godspeed app sync

This commit is contained in:
Tim Bendt
2025-08-20 08:30:54 -04:00
parent ca6e4cdf5d
commit c46d53b261
17 changed files with 3170 additions and 45 deletions

View File

View File

@@ -0,0 +1,129 @@
"""Godspeed API client for task and list management."""
import json
import os
import re
import requests
from pathlib import Path
from typing import Dict, List, Optional, Any
from datetime import datetime
import urllib3
class GodspeedClient:
"""Client for interacting with the Godspeed API."""
BASE_URL = "https://api.godspeedapp.com"
def __init__(self, email: str = None, password: str = None, token: str = None):
self.email = email
self.password = password
self.token = token
self.session = requests.Session()
# Handle SSL verification bypass for corporate networks
disable_ssl = os.getenv("GODSPEED_DISABLE_SSL_VERIFY", "").lower() == "true"
if disable_ssl:
self.session.verify = False
# Suppress only the specific warning about unverified HTTPS requests
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
print("⚠️ SSL verification disabled for Godspeed API")
if token:
self.session.headers.update({"Authorization": f"Bearer {token}"})
elif email and password:
self._authenticate()
def _authenticate(self) -> str:
"""Authenticate and get access token."""
if not self.email or not self.password:
raise ValueError("Email and password required for authentication")
response = self.session.post(
f"{self.BASE_URL}/sessions/sign_in",
json={"email": self.email, "password": self.password},
headers={"Content-Type": "application/json"},
)
response.raise_for_status()
data = response.json()
if not data.get("success"):
raise Exception("Authentication failed")
self.token = data["token"]
self.session.headers.update({"Authorization": f"Bearer {self.token}"})
return self.token
def get_lists(self) -> List[Dict[str, Any]]:
"""Get all lists."""
response = self.session.get(f"{self.BASE_URL}/lists")
response.raise_for_status()
return response.json()
def get_tasks(self, list_id: str = None, status: str = None) -> Dict[str, Any]:
"""Get tasks with optional filtering."""
params = {}
if list_id:
params["list_id"] = list_id
if status:
params["status"] = status
response = self.session.get(f"{self.BASE_URL}/tasks", params=params)
response.raise_for_status()
return response.json()
def get_task(self, task_id: str) -> Dict[str, Any]:
"""Get a single task by ID."""
response = self.session.get(f"{self.BASE_URL}/tasks/{task_id}")
response.raise_for_status()
return response.json()
def create_task(
self,
title: str,
list_id: str = None,
notes: str = None,
location: str = "end",
**kwargs,
) -> Dict[str, Any]:
"""Create a new task."""
data = {"title": title, "location": location}
if list_id:
data["list_id"] = list_id
if notes:
data["notes"] = notes
# Add any additional kwargs
data.update(kwargs)
response = self.session.post(
f"{self.BASE_URL}/tasks",
json=data,
headers={"Content-Type": "application/json"},
)
response.raise_for_status()
return response.json()
def update_task(self, task_id: str, **kwargs) -> Dict[str, Any]:
"""Update an existing task."""
response = self.session.patch(
f"{self.BASE_URL}/tasks/{task_id}",
json=kwargs,
headers={"Content-Type": "application/json"},
)
response.raise_for_status()
return response.json()
def delete_task(self, task_id: str) -> None:
"""Delete a task."""
response = self.session.delete(f"{self.BASE_URL}/tasks/{task_id}")
response.raise_for_status()
def complete_task(self, task_id: str) -> Dict[str, Any]:
"""Mark a task as complete."""
return self.update_task(task_id, is_complete=True)
def incomplete_task(self, task_id: str) -> Dict[str, Any]:
"""Mark a task as incomplete."""
return self.update_task(task_id, is_complete=False)

View File

@@ -0,0 +1,87 @@
"""Configuration and credential management for Godspeed sync."""
import json
import os
from pathlib import Path
from typing import Optional, Dict, Any
class GodspeedConfig:
"""Manages configuration and credentials for Godspeed sync."""
def __init__(self, config_dir: Optional[Path] = None):
if config_dir is None:
config_dir = Path.home() / ".local" / "share" / "gtd-terminal-tools"
self.config_dir = Path(config_dir)
self.config_file = self.config_dir / "godspeed_config.json"
self.config = self._load_config()
def _load_config(self) -> Dict[str, Any]:
"""Load configuration from file."""
if self.config_file.exists():
with open(self.config_file, "r") as f:
return json.load(f)
return {}
def _save_config(self):
"""Save configuration to file."""
self.config_dir.mkdir(parents=True, exist_ok=True)
with open(self.config_file, "w") as f:
json.dump(self.config, f, indent=2)
def get_email(self) -> Optional[str]:
"""Get stored email or from environment."""
return os.getenv("GODSPEED_EMAIL") or self.config.get("email")
def set_email(self, email: str):
"""Store email in config."""
self.config["email"] = email
self._save_config()
def get_token(self) -> Optional[str]:
"""Get stored token or from environment."""
return os.getenv("GODSPEED_TOKEN") or self.config.get("token")
def set_token(self, token: str):
"""Store token in config."""
self.config["token"] = token
self._save_config()
def get_sync_directory(self) -> Path:
"""Get sync directory from config or environment."""
sync_dir = os.getenv("GODSPEED_SYNC_DIR") or self.config.get("sync_directory")
if sync_dir:
return Path(sync_dir)
# Default to ~/Documents/Godspeed or ~/.local/share/gtd-terminal-tools/godspeed
home = Path.home()
# Try Documents first
docs_dir = home / "Documents" / "Godspeed"
if docs_dir.parent.exists():
return docs_dir
# Fall back to data directory
return home / ".local" / "share" / "gtd-terminal-tools" / "godspeed"
def set_sync_directory(self, sync_dir: Path):
"""Store sync directory in config."""
self.config["sync_directory"] = str(sync_dir)
self._save_config()
def clear_credentials(self):
"""Clear stored credentials."""
self.config.pop("email", None)
self.config.pop("token", None)
self._save_config()
def get_all_settings(self) -> Dict[str, Any]:
"""Get all current settings."""
return {
"email": self.get_email(),
"has_token": bool(self.get_token()),
"sync_directory": str(self.get_sync_directory()),
"config_file": str(self.config_file),
}

View File

@@ -0,0 +1,395 @@
"""Two-way synchronization engine for Godspeed API and local markdown files."""
import json
import os
import re
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
from datetime import datetime
from .client import GodspeedClient
class GodspeedSync:
"""Handles bidirectional sync between Godspeed API and local markdown files."""
def __init__(self, client: GodspeedClient, sync_dir: Path):
self.client = client
self.sync_dir = Path(sync_dir)
self.metadata_file = self.sync_dir / ".godspeed_metadata.json"
self.metadata = self._load_metadata()
def _load_metadata(self) -> Dict:
"""Load sync metadata from local file."""
if self.metadata_file.exists():
with open(self.metadata_file, "r") as f:
return json.load(f)
return {
"task_mapping": {}, # local_id -> godspeed_id
"list_mapping": {}, # list_name -> list_id
"last_sync": None,
}
def _save_metadata(self):
"""Save sync metadata to local file."""
self.sync_dir.mkdir(parents=True, exist_ok=True)
with open(self.metadata_file, "w") as f:
json.dump(self.metadata, f, indent=2)
def _sanitize_filename(self, name: str) -> str:
"""Convert list name to safe filename."""
# Replace special characters with underscores
sanitized = re.sub(r'[<>:"/\\|?*]', "_", name)
# Remove multiple underscores
sanitized = re.sub(r"_+", "_", sanitized)
# Strip leading/trailing underscores and spaces
return sanitized.strip("_ ")
def _generate_local_id(self) -> str:
"""Generate a unique local ID for tracking."""
import uuid
return str(uuid.uuid4())[:8]
def _parse_task_line(self, line: str) -> Optional[Tuple[str, str, str, str]]:
"""Parse a markdown task line and extract components.
Returns: (local_id, status, title, notes) or None if invalid
status can be: 'incomplete', 'complete', or 'cleared'
"""
# Match patterns like:
# - [ ] Task title <!-- id:abc123 -->
# - [x] Completed task <!-- id:def456 -->
# - [-] Cleared/cancelled task <!-- id:ghi789 -->
# - [ ] Task with notes <!-- id:jkl012 --> Some notes here
task_pattern = r"^\s*-\s*\[([xX\s\-])\]\s*(.+?)(?:\s*<!--\s*id:(\w+)\s*-->)?\s*(?:\n\s*(.+))?$"
match = re.match(task_pattern, line.strip(), re.MULTILINE | re.DOTALL)
if not match:
return None
checkbox, title_and_maybe_notes, local_id, extra_notes = match.groups()
# Determine status from checkbox
if checkbox.lower() == "x":
status = "complete"
elif checkbox == "-":
status = "cleared"
else:
status = "incomplete"
# Split title and inline notes if present
title_parts = title_and_maybe_notes.split("<!--")[0].strip()
notes = extra_notes.strip() if extra_notes else ""
if not local_id:
local_id = self._generate_local_id()
return local_id, status, title_parts, notes
def _format_task_line(
self, local_id: str, status: str, title: str, notes: str = ""
) -> str:
"""Format a task as a markdown line with ID tracking."""
if status == "complete":
checkbox = "[x]"
elif status == "cleared":
checkbox = "[-]"
else:
checkbox = "[ ]"
line = f"- {checkbox} {title} <!-- id:{local_id} -->"
if notes:
line += f"\n {notes}"
return line
def _read_list_file(self, list_path: Path) -> List[Tuple[str, str, str, str]]:
"""Read and parse tasks from a markdown file."""
if not list_path.exists():
return []
tasks = []
with open(list_path, "r", encoding="utf-8") as f:
content = f.read()
# Split into potential task blocks
lines = content.split("\n")
current_task_lines = []
for line in lines:
if line.strip().startswith("- ["):
# Process previous task if exists
if current_task_lines:
task_block = "\n".join(current_task_lines)
parsed = self._parse_task_line(task_block)
if parsed:
tasks.append(parsed)
current_task_lines = []
current_task_lines = [line]
elif current_task_lines and line.strip():
# Continuation of current task (notes)
current_task_lines.append(line)
elif current_task_lines:
# Empty line ends the current task
task_block = "\n".join(current_task_lines)
parsed = self._parse_task_line(task_block)
if parsed:
tasks.append(parsed)
current_task_lines = []
# Process last task if exists
if current_task_lines:
task_block = "\n".join(current_task_lines)
parsed = self._parse_task_line(task_block)
if parsed:
tasks.append(parsed)
return tasks
def _write_list_file(self, list_path: Path, tasks: List[Tuple[str, str, str, str]]):
"""Write tasks to a markdown file."""
list_path.parent.mkdir(parents=True, exist_ok=True)
with open(list_path, "w", encoding="utf-8") as f:
for local_id, status, title, notes in tasks:
f.write(self._format_task_line(local_id, status, title, notes))
f.write("\n")
def download_from_api(self) -> None:
"""Download all lists and tasks from Godspeed API to local files."""
print("Downloading from Godspeed API...")
# Get all lists
lists_data = self.client.get_lists()
lists = (
lists_data if isinstance(lists_data, list) else lists_data.get("lists", [])
)
# Update list mapping
for list_item in lists:
list_name = list_item["name"]
list_id = list_item["id"]
self.metadata["list_mapping"][list_name] = list_id
# Get only incomplete tasks (hide completed/cleared from local files)
all_tasks_data = self.client.get_tasks(status="incomplete")
tasks = all_tasks_data.get("tasks", [])
task_lists = all_tasks_data.get("lists", {})
# Group tasks by list
tasks_by_list = {}
for task in tasks:
list_id = task.get("list_id")
if list_id in task_lists:
list_name = task_lists[list_id]["name"]
else:
# Find list name from our mapping
list_name = None
for name, lid in self.metadata["list_mapping"].items():
if lid == list_id:
list_name = name
break
if not list_name:
list_name = "Unknown"
if list_name not in tasks_by_list:
tasks_by_list[list_name] = []
tasks_by_list[list_name].append(task)
# Create directory structure and files
for list_name, list_tasks in tasks_by_list.items():
safe_name = self._sanitize_filename(list_name)
list_path = self.sync_dir / f"{safe_name}.md"
# Convert API tasks to our format
local_tasks = []
for task in list_tasks:
# Find existing local ID or create new one
godspeed_id = task["id"]
local_id = None
for lid, gid in self.metadata["task_mapping"].items():
if gid == godspeed_id:
local_id = lid
break
if not local_id:
local_id = self._generate_local_id()
self.metadata["task_mapping"][local_id] = godspeed_id
# Convert API task status to our format
is_complete = task.get("is_complete", False)
is_cleared = task.get("is_cleared", False)
if is_cleared:
status = "cleared"
elif is_complete:
status = "complete"
else:
status = "incomplete"
title = task["title"]
notes = task.get("notes", "")
local_tasks.append((local_id, status, title, notes))
self._write_list_file(list_path, local_tasks)
print(f" Downloaded {len(local_tasks)} tasks to {list_path}")
self.metadata["last_sync"] = datetime.now().isoformat()
self._save_metadata()
print(f"Download complete. Synced {len(tasks_by_list)} lists.")
def upload_to_api(self) -> None:
"""Upload local markdown files to Godspeed API."""
print("Uploading to Godspeed API...")
# Find all markdown files
md_files = list(self.sync_dir.glob("*.md"))
for md_file in md_files:
if md_file.name.startswith("."):
continue # Skip hidden files
list_name = md_file.stem
local_tasks = self._read_list_file(md_file)
# Get or create list ID
list_id = self.metadata["list_mapping"].get(list_name)
if not list_id:
print(
f" Warning: No list ID found for '{list_name}', tasks will go to Inbox"
)
list_id = None
for local_id, status, title, notes in local_tasks:
# Skip tasks with empty titles
if not title or not title.strip():
print(f" Skipping task with empty title (id: {local_id})")
continue
godspeed_id = self.metadata["task_mapping"].get(local_id)
if godspeed_id:
# Update existing task
try:
update_data = {"title": title.strip()}
# Handle status conversion to API format
if status == "complete":
update_data["is_complete"] = True
update_data["is_cleared"] = False
elif status == "cleared":
# Note: API requires task to be complete before clearing
update_data["is_complete"] = True
update_data["is_cleared"] = True
else: # incomplete
update_data["is_complete"] = False
update_data["is_cleared"] = False
if notes and notes.strip():
update_data["notes"] = notes.strip()
self.client.update_task(godspeed_id, **update_data)
action = {
"complete": "completed",
"cleared": "cleared",
"incomplete": "reopened",
}[status]
print(f" Updated task ({action}): {title}")
except Exception as e:
print(f" Error updating task '{title}': {e}")
else:
# Create new task
try:
create_data = {
"title": title.strip(),
"list_id": list_id,
}
# Only add notes if they exist and are not empty
if notes and notes.strip():
create_data["notes"] = notes.strip()
print(f" Creating task: '{title}' with data: {create_data}")
response = self.client.create_task(**create_data)
print(f" API response: {response}")
# Handle different response formats
if isinstance(response, dict):
if "id" in response:
new_godspeed_id = response["id"]
elif "task" in response and "id" in response["task"]:
new_godspeed_id = response["task"]["id"]
else:
print(
f" Warning: No ID found in response: {response}"
)
continue
else:
print(
f" Warning: Unexpected response format: {response}"
)
continue
self.metadata["task_mapping"][local_id] = new_godspeed_id
# Set status if not incomplete
if status == "complete":
self.client.update_task(new_godspeed_id, is_complete=True)
print(f" Created completed task: {title}")
elif status == "cleared":
# Mark complete first, then clear
self.client.update_task(
new_godspeed_id, is_complete=True, is_cleared=True
)
print(f" Created cleared task: {title}")
else:
print(f" Created task: {title}")
except Exception as e:
print(f" Error creating task '{title}': {e}")
import traceback
traceback.print_exc()
self.metadata["last_sync"] = datetime.now().isoformat()
self._save_metadata()
print("Upload complete.")
def sync_bidirectional(self) -> None:
"""Perform a full bidirectional sync."""
print("Starting bidirectional sync...")
# Download first to get latest state
self.download_from_api()
# Then upload any local changes
self.upload_to_api()
print("Bidirectional sync complete.")
def list_local_files(self) -> List[Path]:
"""List all markdown files in sync directory."""
if not self.sync_dir.exists():
return []
return list(self.sync_dir.glob("*.md"))
def get_sync_status(self) -> Dict:
"""Get current sync status and statistics."""
local_files = self.list_local_files()
total_local_tasks = 0
for file_path in local_files:
tasks = self._read_list_file(file_path)
total_local_tasks += len(tasks)
return {
"sync_directory": str(self.sync_dir),
"local_files": len(local_files),
"total_local_tasks": total_local_tasks,
"tracked_tasks": len(self.metadata["task_mapping"]),
"tracked_lists": len(self.metadata["list_mapping"]),
"last_sync": self.metadata.get("last_sync"),
}