This commit is contained in:
2025-12-22 15:23:44 -05:00
parent dd3f0dea56
commit cef92e9144
9 changed files with 670 additions and 103 deletions

61
hubstaff-cli/README.md Normal file
View File

@@ -0,0 +1,61 @@
# Hubstaff CLI
This project is a command-line interface (CLI) tool for fetching timesheet data from the Hubstaff API. It provides a simple way to interact with the API and retrieve timesheet information formatted according to specified requirements.
## Table of Contents
- [Installation](#installation)
- [Usage](#usage)
- [Output Format](#output-format)
- [Dependencies](#dependencies)
## Installation
1. Clone the repository:
```
git clone <repository-url>
cd hubstaff-cli
```
2. Install the required dependencies:
```
pip install -r requirements.txt
```
3. Set up your environment variables for the Hubstaff API:
- Create a `.env` file in the root directory and add your Hubstaff API key:
```
HUBSTAFF_API_KEY=your_api_key_here
```
## Usage
To run the timesheet data fetching script, execute the following command:
```
python src/timesheet.py
```
This will log in to the Hubstaff API and fetch the timesheet data for the specified user and date range.
## Output Format
The output will be displayed in a tabular format with the following columns:
- **Date**: The date of the time entry.
- **Started At**: The time the entry started.
- **Tracked (minutes)**: The total minutes tracked for the entry.
Example output:
```
| Date | Started At | Tracked (minutes) |
|------------|------------|--------------------|
| 2023-10-01 | 09:00 | 120 |
| 2023-10-01 | 13:00 | 60 |
```
## Dependencies
- `requests`: For making HTTP requests to the Hubstaff API.
- `python-dotenv`: For loading environment variables from a `.env` file.

View File

@@ -0,0 +1,2 @@
requests
python-dotenv

View File

@@ -0,0 +1,32 @@
class HubstaffAPI:
def __init__(self, refresh_token: str):
self.refresh_token = refresh_token
self.base_url = "https://api.hubstaff.com/v2"
self.bearer_token = None
async def fetch_bearer_token(self):
# Logic to fetch bearer token using the refresh token
pass
async def get_current_user(self):
# Logic to get current user information
pass
async def get_activities(self, organization_id: int, user_id: int, start: str, stop: str):
# Logic to fetch activities from Hubstaff API
pass
async def get_timesheets(self, organization_id: int, user_id: int, start: str, stop: str):
# Logic to fetch timesheet data from Hubstaff API
url = f"{self.base_url}/organizations/{organization_id}/users/{user_id}/timesheets"
params = {
"start": start,
"stop": stop
}
# Make an API request to fetch timesheets
response = await self._make_request(url, params)
return response
async def _make_request(self, url: str, params: dict):
# Logic to make an HTTP request to the Hubstaff API
pass

View File

@@ -0,0 +1,112 @@
from services.hubstaff_api import HubstaffAPI
import os
import shutil
import asyncio
from datetime import datetime, timedelta
async def main():
# Load the Hubstaff API key from environment variables
api_key = os.getenv("HUBSTAFF_API_KEY")
if not api_key:
print("Error: HUBSTAFF_API_KEY environment variable not set.")
return
# Load the Hubstaff organization ID from environment variables
org_id = os.getenv("HUBSTAFF_ORG_ID")
if not org_id:
print("Error: HUBSTAFF_ORG_ID environment variable not set.")
return
# Initialize the Hubstaff API client
hubstaff = HubstaffAPI(refresh_token=api_key)
try:
# Fetch user info
user_info = await hubstaff.get_current_user()
user_id = user_info["user"]["id"]
# Define the date range for the timesheet
end_date = datetime.now()
start_date = end_date - timedelta(days=7)
# Fetch timesheet data
timesheet_data = await hubstaff.get_timesheets(
user_id, start=start_date.isoformat(), stop=end_date.isoformat()
)
# Check if timesheet_data is valid and "times" is a list before proceeding
times = timesheet_data.get("times") if timesheet_data else None
if not times or not isinstance(times, list):
print("Error: No timesheet data found or invalid format received.")
return
# Prepare invoice configuration based on timesheet data
invoice_logo = os.getenv("INVOICE_LOGO", "/path/to/logo.png")
invoice_from = os.getenv("INVOICE_FROM", "Default Company")
invoice_to = os.getenv("INVOICE_TO", "Client Company")
invoice_tax = float(os.getenv("INVOICE_TAX", "0.0"))
invoice_rate = float(os.getenv("INVOICE_RATE", "25"))
config = {
"logo": invoice_logo,
"from": invoice_from,
"to": invoice_to,
"tax": invoice_tax,
"items": [],
"quantities": [],
"rates": [],
}
# For each timesheet entry, we'll use the note as item.
# If note is missing or "N/A", we'll use the date instead.
for entry in times:
entry_date = datetime.fromtimestamp(int(entry["date"])).strftime("%Y-%m-%d")
note = entry.get("note", "N/A")
item = note if note != "N/A" else f"Work on {entry_date}"
minutes = entry.get("minutes", 0)
config["items"].append(item)
config["quantities"].append(minutes)
config["rates"].append(invoice_rate)
# Generate YAML output
yaml_lines = [
f"logo: {config['logo']}",
f"from: {config['from']}",
f"to: {config['to']}",
f"tax: {config['tax']}",
"items:",
]
for item in config["items"]:
yaml_lines.append(f' - "{item}"')
yaml_lines.append("quantities:")
for qty in config["quantities"]:
yaml_lines.append(f" - {qty}")
yaml_lines.append("rates:")
for rate in config["rates"]:
yaml_lines.append(f" - {rate}")
yaml_content = "\n".join(yaml_lines)
config_filename = "invoice_config.yaml"
with open(config_filename, "w") as f:
f.write(yaml_content)
print(f"Invoice configuration written to {config_filename}")
# Call the invoice command if available locally
if shutil.which("invoice"):
os.system(
f"invoice generate --import {config_filename} --output timesheet-invoice.pdf"
)
else:
print(
"Invoice command not found locally. Please install it to generate invoices."
)
except Exception as e:
print(f"Error fetching timesheet data: {e}")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,22 +1,33 @@
from typing import List from typing import List
from textual.app import App, ComposeResult from textual.app import App, ComposeResult
from textual.widgets import Footer, Header, Tabs, TabbedContent, TabPane, Select from textual.widgets import Footer, Header, TabbedContent, TabPane, Select, Button
from textual.containers import Horizontal, Vertical, ScrollableContainer from textual.containers import Horizontal, Vertical, ScrollableContainer, Container
from textual.widgets import ListView, DataTable, ListItem, Label, Static from textual.widgets import ListView, DataTable, ListItem, Label
from textual.reactive import Reactive from textual.reactive import Reactive
from textual import work from textual import work
from services.pancake import ListResponse, PancakeAPI, ClientResponse, ProjectResponse from services.pancake import PancakeAPI, ClientResponse, ProjectResponse
from datetime import datetime, UTC, timedelta from datetime import datetime, UTC, timedelta
import os
from services.hubstaff_api import HubstaffAPI
class InvoicesApp(App): class InvoicesApp(App):
"""A Textual app to manage stopwatches.""" """A Textual app to manage stopwatches."""
CSS_PATH = "styles/invoices.tcss" CSS_PATH = "styles/invoices.tcss"
clients: List[ClientResponse] clients: List[ClientResponse]
projects: List[ProjectResponse] projects: List[ProjectResponse]
client_project_id: str client_project_id: Reactive[str] = Reactive("")
selected_client_id: Reactive[str] = Reactive(None)
selected_project_id: Reactive[str] = Reactive(None)
hubstaff_user_id: Reactive[int] = Reactive(None)
hubstaff_organization_id: Reactive[int] = Reactive(None)
start_date: Reactive[datetime] = Reactive(None)
stop_date: Reactive[datetime] = Reactive(None)
BINDINGS = [ BINDINGS = [
("d", "toggle_dark", "Toggle dark mode"), ("d", "toggle_dark", "Toggle dark mode"),
("q", "quit", "Quit the App"), ("q", "quit", "Quit the App"),
] ]
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
"""Create child widgets for the app.""" """Create child widgets for the app."""
@@ -28,96 +39,157 @@ class InvoicesApp(App):
) )
with ScrollableContainer(id="main"): with ScrollableContainer(id="main"):
with TabbedContent(): with TabbedContent():
with TabPane("Projects"): with TabPane("\uf503 Projects"):
yield DataTable(id="project_list") yield DataTable(id="project_list")
with TabPane("Invoices"): with TabPane("\uee3a Invoices"):
yield DataTable(id="invoice_list") yield DataTable(id="invoice_list")
with TabPane("Timesheet"): with TabPane("\U000f12e1 Timesheet"):
with ScrollableContainer(id="timesheets"): with Horizontal(classes="split_pane"):
yield Select([("loading", 0)],id="timesheet_select") with Container(id="timesheet_select_container"):
yield DataTable(id="timesheet_list") yield Select(
[("loading", 0)],
id="timesheet_select",
compact=True,
prompt="Client Project",
)
yield DataTable(id="timesheet_list")
with Container(id="hubstaff_container"):
with Horizontal(id="timesheet_controls"):
yield Button(
"\U000f0f28 Previous Week", id="prev_week"
)
yield Label("start", id="week_start")
yield Label("end", id="week_end")
yield Button(
"Next Week \U000f0f27 ", id="next_week"
)
yield DataTable(id="hubstaff_list")
yield Footer() yield Footer()
def on_mount(self) -> None: async def on_mount(self) -> None:
#load data from pancake api # Load data from Pancake API
self.pancake = PancakeAPI() self.pancake = PancakeAPI()
self.load_sidebar() self.load_sidebar()
self.query_one(
"#timesheet_select_container", Container
).border_title = "Time Entries in Project"
self.query_one("#timesheet_list").add_columns( self.query_one("#timesheet_list").add_columns(
"ID", "ID", "Is Billed", "Date", "Minutes", "Notes"
"Is Billed",
"Date",
"Minutes",
"Notes"
) )
self.query_one("#hubstaff_list", DataTable).add_columns(
"Date", "started_at", "Tracked (minutes)"
)
self.query_one("#hubstaff_container", Container).border_title = "Hubstaff"
self.query_one("#invoice_list", DataTable).add_columns( self.query_one("#invoice_list", DataTable).add_columns(
"#", "#",
"Payment Date", "Payment Date",
"Amount", "Amount",
"Due Date", "Due Date",
"Is Paid", "Is Paid",
"Is Viewable", "Is Viewable",
"Last Viewed", "Last Viewed",
"Paid", "Paid",
"Overdue") "Overdue",
self.query_one("#project_list", DataTable).add_columns(
"ID",
"Name",
"Is Archived",
"Is Completed",
"Created At",
"Updated At"
) )
self.query_one("#project_list", DataTable).add_columns(
"ID", "Name", "Is Archived", "Is Completed", "Created At", "Updated At"
)
self.load_hubstaff()
@work
async def load_hubstaff(self) -> None:
# Initialize Hubstaff API
try:
self.hubstaff = HubstaffAPI(refresh_token=os.getenv("HUBSTAFF_API_KEY"))
await self.hubstaff.fetch_bearer_token()
# Fetch user info
user_info = await self.hubstaff.get_current_user()
self.hubstaff_user_id = user_info["user"]["id"]
# Fetch organization info
organizations = await self.hubstaff.get_organizations()
self.hubstaff_organization_id = organizations["organizations"][0]["id"]
self.start_date = (datetime.now(UTC) - timedelta(days=7)).strftime(
"%Y-%m-%dT%H:%M:%S"
)
self.stop_date = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S")
except Exception as e:
self.notify(f"Error initializing Hubstaff API: {e}")
@work @work
async def load_sidebar(self) -> None: async def load_sidebar(self) -> None:
client_resp = await self.pancake.get_all_clients({"limit": 100, "sort_by": "id", "sort_dir": "asc"}) client_resp = await self.pancake.get_all_clients(
self.clients = sorted(client_resp.get("clients", []), key=lambda x: x['total'], reverse=True) {"limit": 100, "sort_by": "id", "sort_dir": "asc"}
)
self.clients = sorted(
client_resp.get("clients", []), key=lambda x: x["total"], reverse=True
)
project_resp = await self.pancake.get_all_projects({"limit": 100}) project_resp = await self.pancake.get_all_projects({"limit": 100})
self.projects = project_resp.get("projects", []) self.projects = project_resp.get("projects", [])
# self.query_one("#invoice_list", ListView).extend(ListItem(f"{invoice['id']} {invoice['client']}") for invoice in self.invoices) # self.query_one("#invoice_list", ListView).extend(ListItem(f"{invoice['id']} {invoice['client']}") for invoice in self.invoices)
client_list = self.query_one("#client_list", ListView) client_list = self.query_one("#client_list", ListView)
client_list.clear() client_list.clear()
client_list.border_title = "Clients" client_list.border_title = "Clients"
self.selected_client_id = self.clients[0]["id"] if self.clients else None
for client in self.clients: for client in self.clients:
client_list.append(ListItem(Label(f"{client.get('company', '-')} {client.get('email', '@')}"), id="client_" + str(client["id"]))) if client.get("archived") != "1" else None client_list.append(
ListItem(
Label(f"{client.get('company', '-')} {client.get('email', '@')}"),
id="client_" + str(client["id"]),
)
) if client.get("archived") != "1" else None
def watch_selected_client_id(self, selected_client_id: str) -> None:
"""Watch for changes to the selected client ID."""
self.query_one("#client_list").selected_id = "client_" + str(selected_client_id)
def watch_start_date(self, start_date: str) -> None:
self.query_one("#week_start").update(start_date.strftime("%Y-%m-%d"))
# if self.clients: def watch_end_date(self, end_date: str) -> None:
# self.fetch_invoices(self.clients[0]["id"]) self.query_one("#week_end").update(end_date.strftime("%Y-%m-%d"))
def on_list_view_selected(self, event: ListView.Selected) -> None: def on_list_view_selected(self, event: ListView.Selected) -> None:
"""Handle the selection of an invoice.""" """Handle the selection of an invoice."""
if event.list_view.id == "client_list": if event.list_view.id == "client_list":
selected_client_id = event.item.id.split("_")[1] selected_client_id = event.item.id.split("_")[1]
if selected_client_id: if selected_client_id:
self.selected_client_id = selected_client_id
self.fetch_invoices(selected_client_id) self.fetch_invoices(selected_client_id)
self.show_projects(selected_client_id) self.show_projects(selected_client_id)
self.query_one(
"#timesheet_select", Select
).clear() # Clear the timesheet select options
@work @work
async def fetch_invoices(self, client_id: str) -> None: async def fetch_invoices(self, client_id: str) -> None:
invoices = await self.pancake.get_all_invoices(params={"client_id": client_id, "limit": 100, "sort_by": "id", "sort_dir": "desc"}) invoices = await self.pancake.get_all_invoices(
invoice_details = invoices.get("invoices", []) params={
table = self.query_one("#invoice_list", DataTable) "client_id": client_id,
table.clear() "limit": 100,
table.border_title = "Invoices" "sort_by": "id",
"sort_dir": "desc",
}
for detail in invoice_details: )
table.add_row( invoice_details = invoices.get("invoices", [])
detail["invoice_number"], table = self.query_one("#invoice_list", DataTable)
detail["payment_date"], table.clear()
str(detail["amount"]), table.border_title = "Invoices"
detail["due_date"],
detail["is_paid"],
detail["is_viewable"],
detail.get("last_viewed", "N/A"),
detail.get("paid", "No"),
detail.get("overdue", "No"),
key=detail["id"]
)
for detail in invoice_details:
table.add_row(
detail["invoice_number"],
detail["payment_date"],
str(detail["amount"]),
detail["due_date"],
detail["is_paid"],
detail["is_viewable"],
detail.get("last_viewed", "N/A"),
detail.get("paid", "No"),
detail.get("overdue", "No"),
key=detail["id"],
)
def show_projects(self, client_id: str) -> None: def show_projects(self, client_id: str) -> None:
"""Show projects for the selected client.""" """Show projects for the selected client."""
@@ -127,25 +199,27 @@ class InvoicesApp(App):
table.border_title = "Projects" table.border_title = "Projects"
# create a new list that is sorted by id and only contains the projects for the selected client # create a new list that is sorted by id and only contains the projects for the selected client
client_projects = sorted( client_projects = sorted(self.projects, key=lambda x: x["id"], reverse=True)
self.projects,
key=lambda x: x["id"],
reverse=True
)
# filter the projects by client_id # filter the projects by client_id
client_projects = [project for project in client_projects if project["client_id"] == client_id] client_projects = [
project for project in client_projects if project["client_id"] == client_id
]
select = self.query_one("#timesheet_select", Select) select = self.query_one("#timesheet_select", Select)
select.clear() select.clear()
select.set_options((project["name"], project["id"],) for project in client_projects) select.set_options(
(
project["name"],
project["id"],
)
for project in client_projects
)
self.selected_project_id = client_projects[0]["id"] if client_projects else None self.selected_project_id = client_projects[0]["id"] if client_projects else None
if (self.selected_project_id): if self.selected_project_id:
select.value = self.selected_project_id select.value = self.selected_project_id
self.fetch_timesheet(self.selected_project_id) self.fetch_timesheet(self.selected_project_id)
for project in client_projects: for project in client_projects:
if project["client_id"] == client_id: if project["client_id"] == client_id:
table.add_row( table.add_row(
str(project["id"]), str(project["id"]),
project["name"], project["name"],
@@ -158,27 +232,71 @@ class InvoicesApp(App):
@work @work
async def fetch_timesheet(self, project_id: str) -> None: async def fetch_timesheet(self, project_id: str) -> None:
"""Fetch timesheet for the selected project.""" """Fetch timesheet for the selected project."""
#is project_id a string of digits # is project_id a string of digits
if not project_id or not str(project_id).isdigit(): if not project_id or not str(project_id).isdigit():
return return
table = self.query_one("#timesheet_list", DataTable) table = self.query_one("#timesheet_list", DataTable)
table.clear() table.clear()
table.border_title = "Timesheets" table.border_title = "Time Entries for Project ID: " + str(project_id)
# fetch timesheet data from pancake api # fetch timesheet data from pancake api
timesheet_resp = await self.pancake.get_one_project(project_id) timesheet_resp = await self.pancake.get_one_project(project_id)
timesheet_details = timesheet_resp.get("times", []) timesheet_details = timesheet_resp.get("times", [])
timesheet_details = sorted(timesheet_details, key=lambda x: x["date"], reverse=True) timesheet_details = sorted(
timesheet_details, key=lambda x: x["date"], reverse=True
)
for detail in timesheet_details: for detail in timesheet_details:
table.add_row( table.add_row(
str(detail["id"]), str(detail["id"]),
"Yes" if detail.get("invoice_item_id") else "No", "Yes" if detail.get("invoice_item_id") else "No",
datetime.fromtimestamp(int(detail["date"])).strftime("%Y-%m-%d %H:%M:%S"), datetime.fromtimestamp(int(detail["date"])).strftime(
round(float(detail.get("minutes", 0))), #round the minutes to int "%Y-%m-%d %H:%M:%S"
detail.get("note", "N/A") ),
round(float(detail.get("minutes", 0))), # round the minutes to int
detail.get("note", "N/A"),
) )
self.fetch_hubstaff_activities(
self.hubstaff_organization_id,
self.hubstaff_user_id,
self.start_date,
self.stop_date,
)
@work
async def fetch_hubstaff_activities(
self, organization_id: int, user_id: int, start: str, stop: str
) -> None:
"""Fetch activities from Hubstaff and load them into the hubstaff_list table."""
table = self.query_one("#hubstaff_list", DataTable)
table.border_title = f"Hubstaff Activities ({start} to {stop})"
try:
# Initialize Hubstaff API
# Fetch activities
activities = await self.hubstaff.get_activities(
organization_id=organization_id,
user_id=user_id,
start=start,
stop=stop,
)
# Populate the table with activities
for activity in activities.get("activities", []):
table.add_row(
str(
activity.get("date", "N/A")
), # convert start time to local time
datetime.strptime(
activity.get("started_at", "N/A"), "%Y-%m-%dT%H:%M:%S.%fZ"
).strftime("%H:%M"),
int(activity.get("tracked", 0)) / 60, # Convert seconds to minutes
)
except Exception as e:
self.notify(f"Error fetching Hubstaff activities: {e}")
def on_select_changed(self, event: Select.Changed) -> None: def on_select_changed(self, event: Select.Changed) -> None:
"""Handle the selection of a project.""" """Handle the selection of a project."""
@@ -187,7 +305,6 @@ class InvoicesApp(App):
if selected_project_id: if selected_project_id:
self.fetch_timesheet(selected_project_id) self.fetch_timesheet(selected_project_id)
def action_toggle_dark(self) -> None: def action_toggle_dark(self) -> None:
"""An action to toggle dark mode.""" """An action to toggle dark mode."""
self.theme = ( self.theme = (

View File

@@ -0,0 +1,127 @@
import os
import aiohttp
import json
from typing import Dict, List, TypedDict, Optional
TOKEN_CACHE_PATH = os.path.expanduser("~/.hubstaff_tokens.json")
class Activity(TypedDict):
id: int
date: str
created_at: str
updated_at: str
time_slot: str
starts_at: str
user_id: int
project_id: int
task_id: Optional[int]
keyboard: int
mouse: int
overall: int
tracked: int
input_tracked: int
tracks_input: bool
billable: bool
paid: bool
client_invoiced: bool
team_invoiced: bool
immutable: bool
time_type: str
client: str
class ActivitiesResponse(TypedDict):
activities: List[Activity]
class HubstaffAPI:
def __init__(self, refresh_token: str):
self.refresh_token = refresh_token
self.api_root = "https://api.hubstaff.com"
self.token_endpoint = "https://account.hubstaff.com/access_tokens"
self.bearer_token = None
self._load_tokens_from_cache()
def _load_tokens_from_cache(self):
if os.path.exists(TOKEN_CACHE_PATH):
try:
with open(TOKEN_CACHE_PATH, "r") as f:
tokens = json.load(f)
self.bearer_token = tokens.get("bearer_token")
self.refresh_token = tokens.get("refresh_token", self.refresh_token)
except Exception as e:
print(f"Failed to load tokens from cache: {e}")
def _save_tokens_to_cache(self, bearer_token: str):
try:
with open(TOKEN_CACHE_PATH, "w") as f:
json.dump(
{"bearer_token": bearer_token, "refresh_token": self.refresh_token},
f,
)
except Exception as e:
print(f"Failed to save tokens to cache: {e}")
async def fetch_bearer_token(self) -> str:
"""Fetch a new bearer token using the cached refresh token."""
async with aiohttp.ClientSession() as session:
async with session.post(
self.token_endpoint,
data={
"grant_type": "refresh_token",
"refresh_token": self.refresh_token,
},
headers={"Content-Type": "application/x-www-form-urlencoded"},
) as response:
if response.status == 200:
data = await response.json()
self.bearer_token = data["access_token"]
self._save_tokens_to_cache(self.bearer_token)
return self.bearer_token
else:
raise Exception(f"Failed to fetch bearer token: {response.status}")
async def _request_with_reauth(self, method: str, url: str, **kwargs) -> Dict:
"""Make an HTTP request with automatic re-authentication on 401 errors."""
if not self.bearer_token:
await self.fetch_bearer_token()
headers = kwargs.pop("headers", {})
headers["Authorization"] = f"Bearer {self.bearer_token}"
async with aiohttp.ClientSession() as session:
async with session.request(
method, url, headers=headers, **kwargs
) as response:
if response.status == 200:
return await response.json()
elif response.status == 401:
await self.fetch_bearer_token()
return await self._request_with_reauth(method, url, **kwargs)
else:
raise Exception(
f"Request failed with status {response.status}: {await response.text()}"
)
async def get_current_user(self) -> Dict:
"""Fetch the current user info."""
url = f"{self.api_root}/v2/users/me"
return await self._request_with_reauth("GET", url)
async def get_organization_id(self) -> List[Dict]:
"""Fetch the list of organizations the user belongs to."""
url = f"{self.api_root}/organizations"
return await self._request_with_reauth("GET", url)
async def get_activities(
self, organization_id: int, user_id: int, start: str, stop: str
) -> ActivitiesResponse:
"""Fetch activities for a specific organization and user within a time range."""
url = f"{self.api_root}/v2/organizations/{organization_id}/activities"
params = {
"time_slot[start]": start,
"time_slot[stop]": stop,
"user_ids": user_id,
}
return await self._request_with_reauth("GET", url, params=params)

View File

@@ -59,17 +59,25 @@ class ListResponse(TypedDict, total=False):
class PancakeAPI: class PancakeAPI:
def __init__(self): def __init__(self):
if not API_KEY or not API_URL:
raise EnvironmentError(
"PANCAKE_API_KEY and PANCAKE_API_URL must be set as environment variables."
)
self.headers = {"x-api-key": API_KEY} self.headers = {"x-api-key": API_KEY}
self.prefix_url = API_URL self.prefix_url = API_URL
async def _get(self, url: str, params: Optional[Dict] = None) -> Dict: async def _get(self, url: str, params: Optional[Dict] = None) -> Dict:
async with ClientSession() as session: async with ClientSession() as session:
async with session.get(f"{self.prefix_url}/{url}", headers=self.headers, params=params) as response: async with session.get(
f"{self.prefix_url}/{url}", headers=self.headers, params=params
) as response:
return await response.json() return await response.json()
async def _post(self, url: str, json: Optional[Dict] = None) -> Dict: async def _post(self, url: str, json: Optional[Dict] = None) -> Dict:
async with ClientSession() as session: async with ClientSession() as session:
async with session.post(f"{self.prefix_url}/{url}", headers=self.headers, json=json) as response: async with session.post(
f"{self.prefix_url}/{url}", headers=self.headers, json=json
) as response:
return await response.json() return await response.json()
# Clients # Clients

View File

@@ -1,15 +1,85 @@
import fs from "fs";
import path from "path";
import ky from "ky"; import ky from "ky";
import { InvoiceDetailsResponse } from "./invoiceResponse.js"; import { InvoiceDetailsResponse } from "./invoiceResponse.js";
const API_KEY = process.env.PANCAKE_API_KEY; const API_KEY = process.env.PANCAKE_API_KEY;
const API_URL = process.env.PANCAKE_API_URL; const API_URL = process.env.PANCAKE_API_URL;
const TOKEN_CACHE_PATH = path.resolve(".pancake_tokens.json");
console.log("🚀 ~ API_URL:", API_URL); console.log("🚀 ~ API_URL:", API_URL);
let cachedTokens: {
refreshToken: string;
bearerToken: string;
expiresAt: number;
} | null = null;
if (fs.existsSync(TOKEN_CACHE_PATH)) {
try {
cachedTokens = JSON.parse(fs.readFileSync(TOKEN_CACHE_PATH, "utf-8"));
} catch (error) {
console.error("Failed to read token cache:", error);
}
}
const api = ky.create({ const api = ky.create({
prefixUrl: API_URL, prefixUrl: API_URL,
headers: { "x-api-key": `${API_KEY}` }, headers: async () => {
const token = await getBearerToken();
return { "x-api-key": `${API_KEY}`, Authorization: `Bearer ${token}` };
},
}); });
async function getBearerToken(): Promise<string> {
if (
cachedTokens &&
cachedTokens.bearerToken &&
Date.now() < cachedTokens.expiresAt
) {
return cachedTokens.bearerToken;
}
if (!cachedTokens || !cachedTokens.refreshToken) {
throw new Error("No refresh token available. Please authenticate again.");
}
const response = await ky.post("https://account.hubstaff.com/access_tokens", {
headers: { "Content-Type": "application/x-www-form-urlencoded" },
body: new URLSearchParams({
grant_type: "refresh_token",
refresh_token: cachedTokens.refreshToken,
}),
});
if (!response.ok) {
throw new Error("Failed to refresh bearer token.");
}
const data = await response.json();
cachedTokens.bearerToken = data.access_token;
cachedTokens.expiresAt = Date.now() + data.expires_in * 1000;
fs.writeFileSync(TOKEN_CACHE_PATH, JSON.stringify(cachedTokens));
return cachedTokens.bearerToken;
}
async function handleRequestWithRetry<T>(
requestFn: () => Promise<T>,
): Promise<T> {
try {
return await requestFn();
} catch (error: any) {
if (error.response?.status === 401) {
console.warn("Authorization failed. Retrying with refreshed token...");
await getBearerToken();
return requestFn();
}
throw error;
}
}
type PaginationParams = { type PaginationParams = {
limit?: number; limit?: number;
start?: number; start?: number;
@@ -28,40 +98,44 @@ export type ClientResponse = {
async function getAllClients( async function getAllClients(
params: PaginationParams, params: PaginationParams,
): Promise<ListResponse<"clients", ClientResponse>> { ): Promise<ListResponse<"clients", ClientResponse>> {
const { limit = 100, start = 0, sort_by = "id", sort_dir = "desc" } = params; return handleRequestWithRetry(() => {
const url = `clients?limit=${limit}&start=${start}&sort_by=${sort_by}&sort_dir=${sort_dir}`; const {
const response = api.get(url); limit = 100,
return response.json(); start = 0,
sort_by = "id",
sort_dir = "desc",
} = params;
const url = `clients?limit=${limit}&start=${start}&sort_by=${sort_by}&sort_dir=${sort_dir}`;
return api.get(url).json();
});
} }
async function getOneClient(id: string) { async function getOneClient(id: string) {
const url = `clients/show?id=${id}`; return handleRequestWithRetry(() => {
const response = await api.get(url); const url = `clients/show?id=${id}`;
return response.json(); return api.get(url).json();
});
} }
async function createNewClient(data) { async function createNewClient(data) {
const url = `clients/new`; return handleRequestWithRetry(() => {
const response = await api.post(url, { const url = `clients/new`;
json: data, return api.post(url, { json: data }).json();
}); });
return response.json();
} }
async function updateClient(data) { async function updateClient(data) {
const url = `clients/edit`; return handleRequestWithRetry(() => {
const response = await api.post(url, { const url = `clients/edit`;
json: data, return api.post(url, { json: data }).json();
}); });
return response.json();
} }
async function deleteClient(id: string) { async function deleteClient(id: string) {
const url = `clients/delete`; return handleRequestWithRetry(() => {
const response = await api.post(url, { const url = `clients/delete`;
json: { id }, return api.post(url, { json: { id } }).json();
}); });
return response.json();
} }
type ListResponse<A extends string, T> = { type ListResponse<A extends string, T> = {
status: boolean; status: boolean;

View File

@@ -11,6 +11,40 @@
margin-bottom: 1; margin-bottom: 1;
} }
#timesheets {
align: right top;
}
#timesheet_controls {
height: 1;
Button {
height: 1;
padding: 0;
border: none;
}
}
#timesheet_select {
height: 1;
color: rgb(200,20,20);
border: none;
padding: 0;
width: 50;
margin: 0;
}
#timesheet_select_container, #hubstaff_container {
height: 1fr;
border: round $primary;
}
#timesheet_select_container:focus-within, #hubstaff_container:focus-within {
border: round $accent;
}
#main { #main {
width: 4fr; width: 4fr;
} }