working prototype
This commit is contained in:
194
src/services/pancake.py
Normal file
194
src/services/pancake.py
Normal file
@@ -0,0 +1,194 @@
|
||||
import os
|
||||
from aiohttp import ClientSession
|
||||
from typing import Dict, List, Optional, Union, TypedDict
|
||||
|
||||
API_KEY = os.getenv("PANCAKE_API_KEY")
|
||||
API_URL = os.getenv("PANCAKE_API_URL")
|
||||
|
||||
|
||||
class PaginationParams(TypedDict, total=False):
|
||||
limit: int
|
||||
start: int
|
||||
sort_by: str
|
||||
sort_dir: str # "asc" or "desc"
|
||||
|
||||
|
||||
class ClientResponse(TypedDict):
|
||||
id: str
|
||||
first_name: str
|
||||
last_name: str
|
||||
company: str
|
||||
total: float
|
||||
unpaid_total: float
|
||||
|
||||
|
||||
class ProjectResponse(TypedDict):
|
||||
name: str
|
||||
tasks: List[Dict]
|
||||
times: List[Dict]
|
||||
|
||||
|
||||
class InvoiceResponse(TypedDict):
|
||||
id: int
|
||||
invoice_number: str
|
||||
client_id: str
|
||||
amount: str
|
||||
due_date: str
|
||||
description: str
|
||||
is_viewable: str # "0" or "1"
|
||||
has_sent_notification: str # "0" or "1"
|
||||
is_paid: bool
|
||||
date_entered: str
|
||||
overdue: bool
|
||||
payment_status: str
|
||||
payment_date: str
|
||||
company: str
|
||||
email: str
|
||||
last_sent: str
|
||||
last_viewed: str
|
||||
|
||||
|
||||
class ListResponse(TypedDict, total=False):
|
||||
status: bool
|
||||
message: str
|
||||
count: int
|
||||
clients: List[ClientResponse]
|
||||
projects: List[ProjectResponse]
|
||||
invoices: List[InvoiceResponse]
|
||||
|
||||
|
||||
class PancakeAPI:
|
||||
def __init__(self):
|
||||
self.headers = {"x-api-key": API_KEY}
|
||||
self.prefix_url = API_URL
|
||||
|
||||
async def _get(self, url: str, params: Optional[Dict] = None) -> Dict:
|
||||
async with ClientSession() as session:
|
||||
async with session.get(f"{self.prefix_url}/{url}", headers=self.headers, params=params) as response:
|
||||
return await response.json()
|
||||
|
||||
async def _post(self, url: str, json: Optional[Dict] = None) -> Dict:
|
||||
async with ClientSession() as session:
|
||||
async with session.post(f"{self.prefix_url}/{url}", headers=self.headers, json=json) as response:
|
||||
return await response.json()
|
||||
|
||||
# Clients
|
||||
async def get_all_clients(self, params: PaginationParams) -> ListResponse:
|
||||
url = "clients"
|
||||
return await self._get(url, params)
|
||||
|
||||
async def get_one_client(self, client_id: str) -> ClientResponse:
|
||||
url = "clients/show"
|
||||
return await self._get(url, {"id": client_id})
|
||||
|
||||
async def create_new_client(self, data: Dict) -> Dict:
|
||||
url = "clients/new"
|
||||
return await self._post(url, data)
|
||||
|
||||
async def update_client(self, data: Dict) -> Dict:
|
||||
url = "clients/edit"
|
||||
return await self._post(url, data)
|
||||
|
||||
async def delete_client(self, client_id: str) -> Dict:
|
||||
url = "clients/delete"
|
||||
return await self._post(url, {"id": client_id})
|
||||
|
||||
# Projects
|
||||
async def get_all_projects(self, params: PaginationParams) -> ListResponse:
|
||||
url = "projects"
|
||||
return await self._get(url, params)
|
||||
|
||||
async def get_one_project(self, project_id: str) -> ProjectResponse:
|
||||
url = "projects/show"
|
||||
return await self._get(url, {"id": project_id})
|
||||
|
||||
async def create_new_project(self, data: Dict) -> Dict:
|
||||
url = "projects/new"
|
||||
return await self._post(url, data)
|
||||
|
||||
async def update_project(self, data: Dict) -> Dict:
|
||||
url = "projects/edit"
|
||||
return await self._post(url, data)
|
||||
|
||||
# Tasks
|
||||
async def get_tasks_by_project(self, project_id: str) -> Dict:
|
||||
url = "projects/tasks"
|
||||
return await self._get(url, {"id": project_id})
|
||||
|
||||
async def create_task(self, data: Dict) -> Dict:
|
||||
url = "projects/tasks/new"
|
||||
return await self._post(url, data)
|
||||
|
||||
async def update_task(self, data: Dict) -> Dict:
|
||||
url = "projects/tasks/update"
|
||||
return await self._post(url, data)
|
||||
|
||||
async def delete_task(self, task_id: str) -> Dict:
|
||||
url = "projects/tasks/show"
|
||||
return await self._post(url, {"id": task_id})
|
||||
|
||||
async def log_time_on_task(self, data: Dict) -> Dict:
|
||||
url = "projects/tasks/log_time"
|
||||
return await self._post(url, data)
|
||||
|
||||
async def complete_task(self, task_id: str) -> Dict:
|
||||
url = "projects/tasks/compete"
|
||||
return await self._post(url, {"id": task_id})
|
||||
|
||||
async def reopen_task(self, task_id: str) -> Dict:
|
||||
url = "projects/tasks/reopen"
|
||||
return await self._post(url, {"id": task_id})
|
||||
|
||||
# Invoices
|
||||
async def get_all_invoices(self, params: Dict) -> ListResponse:
|
||||
url = "invoices"
|
||||
return await self._get(url, params)
|
||||
|
||||
async def get_one_invoice(self, invoice_id: str) -> Dict:
|
||||
url = "invoices/show"
|
||||
return await self._get(url, {"id": invoice_id})
|
||||
|
||||
async def create_new_invoice(self, data: Dict) -> Dict:
|
||||
url = "invoices/new"
|
||||
return await self._post(url, data)
|
||||
|
||||
async def update_invoice(self, data: Dict) -> Dict:
|
||||
url = "invoices/edit"
|
||||
return await self._post(url, data)
|
||||
|
||||
async def delete_invoice(self, invoice_id: str) -> Dict:
|
||||
url = "invoices/delete"
|
||||
return await self._post(url, {"id": invoice_id})
|
||||
|
||||
async def open_invoice(self, invoice_id: str) -> Dict:
|
||||
url = "invoices/open"
|
||||
return await self._post(url, {"id": invoice_id})
|
||||
|
||||
async def close_invoice(self, invoice_id: str) -> Dict:
|
||||
url = "invoices/close"
|
||||
return await self._post(url, {"id": invoice_id})
|
||||
|
||||
async def mark_invoice_paid(self, invoice_id: str) -> Dict:
|
||||
url = "invoices/paid"
|
||||
return await self._post(url, {"id": invoice_id})
|
||||
|
||||
async def send_invoice(self, invoice_id: str) -> Dict:
|
||||
url = "invoices/send"
|
||||
return await self._post(url, {"id": invoice_id})
|
||||
|
||||
# Users
|
||||
async def get_all_users(self, params: PaginationParams) -> ListResponse:
|
||||
url = "users"
|
||||
return await self._get(url, params)
|
||||
|
||||
async def get_one_user(self, user_id: str) -> Dict:
|
||||
url = "users/show"
|
||||
return await self._get(url, {"id": user_id})
|
||||
|
||||
async def update_user(self, data: Dict) -> Dict:
|
||||
url = "users/edit"
|
||||
return await self._post(url, data)
|
||||
|
||||
async def delete_user(self, user_id: str) -> Dict:
|
||||
url = "users/delete"
|
||||
return await self._post(url, {"id": user_id})
|
||||
Reference in New Issue
Block a user