# Listing metadata: 617 lines, 21 KiB, Python
"""CLI interface for Godspeed sync functionality."""
|
||
|
||
import click
|
||
import getpass
|
||
import os
|
||
import sys
|
||
from pathlib import Path
|
||
from datetime import datetime
|
||
|
||
from ..services.godspeed.client import GodspeedClient
|
||
from ..services.godspeed.sync import GodspeedSync
|
||
|
||
|
||
def get_credentials():
    """Resolve Godspeed credentials from the environment, prompting if needed.

    Returns:
        A ``(email, password, token)`` tuple.  When ``GODSPEED_TOKEN`` is set
        to a non-empty value the result is ``(None, None, token)``; otherwise
        it is ``(email, password, None)``, where each of ``GODSPEED_EMAIL`` /
        ``GODSPEED_PASSWORD`` that is missing or empty is asked for
        interactively via ``click.prompt``.
    """
    api_token = os.environ.get("GODSPEED_TOKEN")
    if api_token:
        # A token takes precedence; email/password are not consulted.
        return None, None, api_token

    # Email is prompted for before the password, and the password is hidden.
    user_email = os.environ.get("GODSPEED_EMAIL") or click.prompt("Godspeed email")
    user_password = os.environ.get("GODSPEED_PASSWORD") or click.prompt(
        "Godspeed password", hide_input=True
    )
    return user_email, user_password, None
|
||
|
||
|
||
def get_sync_directory():
    """Return the directory used for Godspeed markdown sync.

    Resolution order:
      1. ``GODSPEED_SYNC_DIR`` from the environment, if set and non-empty.
      2. ``~/Documents/Godspeed`` when ``~/Documents`` exists.
      3. ``~/.local/share/gtd-terminal-tools/godspeed`` otherwise.

    The returned path is not created here and may not exist yet.
    """
    override = os.environ.get("GODSPEED_SYNC_DIR")
    if override:
        return Path(override)

    home_dir = Path.home()

    # Prefer a user-visible location when the Documents folder is present.
    documents = home_dir / "Documents"
    if documents.exists():
        return documents / "Godspeed"

    # Otherwise fall back to the per-user data directory.
    return home_dir.joinpath(".local", "share", "gtd-terminal-tools", "godspeed")
|
||
|
||
|
||
@click.group()
def godspeed():
    """Godspeed sync tool - bidirectional sync between Godspeed API and markdown files."""
    # Container group only: the subcommands registered below do the work.
|
||
|
||
|
||
@godspeed.command()
def download():
    """Download tasks from Godspeed API to local files."""
    # Credentials are resolved before the target directory, as prompts may occur.
    email, password, token = get_credentials()
    target_dir = get_sync_directory()

    try:
        api = GodspeedClient(email=email, password=password, token=token)
        GodspeedSync(api, target_dir).download_from_api()

        click.echo(f"\nTasks downloaded to: {target_dir}")
        click.echo(
            "You can now edit the markdown files and run 'godspeed upload' to sync changes back."
        )
    except Exception as exc:
        # Top-level CLI boundary: report any failure and exit non-zero.
        click.echo(f"Error during download: {exc}", err=True)
        sys.exit(1)
|
||
|
||
|
||
@godspeed.command()
def upload():
    """Upload local markdown files to Godspeed API."""
    email, password, token = get_credentials()
    source_dir = get_sync_directory()

    # Nothing to upload until a download/sync has created the directory.
    if not source_dir.exists():
        click.echo(f"Sync directory does not exist: {source_dir}", err=True)
        click.echo("Run 'godspeed download' first to initialize the sync directory.")
        sys.exit(1)

    try:
        api = GodspeedClient(email=email, password=password, token=token)
        GodspeedSync(api, source_dir).upload_to_api()

        click.echo("Local changes uploaded successfully.")
    except Exception as exc:
        # Top-level CLI boundary: report any failure and exit non-zero.
        click.echo(f"Error during upload: {exc}", err=True)
        sys.exit(1)
|
||
|
||
|
||
@godspeed.command()
def sync():
    """Perform bidirectional sync between local files and Godspeed API."""
    email, password, token = get_credentials()
    working_dir = get_sync_directory()

    try:
        api = GodspeedClient(email=email, password=password, token=token)
        GodspeedSync(api, working_dir).sync_bidirectional()

        click.echo(f"\nSync complete. Files are in: {working_dir}")
    except Exception as exc:
        # Top-level CLI boundary: report any failure and exit non-zero.
        click.echo(f"Error during sync: {exc}", err=True)
        sys.exit(1)
|
||
|
||
|
||
@godspeed.command()
def status():
    """Show sync status and directory information."""
    sync_dir = get_sync_directory()

    if not sync_dir.exists():
        click.echo(f"Sync directory does not exist: {sync_dir}")
        click.echo("Run 'godspeed download' or 'godspeed sync' to initialize.")
        return

    # Status is computed from local state only, so no API client is passed.
    engine = GodspeedSync(None, sync_dir)
    info = engine.get_sync_status()

    # (label, key) pairs, printed in this order.
    summary_fields = (
        ("Sync Directory", "sync_directory"),
        ("Local Files", "local_files"),
        ("Total Local Tasks", "total_local_tasks"),
        ("Tracked Tasks", "tracked_tasks"),
        ("Tracked Lists", "tracked_lists"),
    )
    for label, key in summary_fields:
        click.echo(f"{label}: {info[key]}")

    last_sync = info["last_sync"]
    click.echo(f"Last Sync: {last_sync}" if last_sync else "Last Sync: Never")

    click.echo("\nMarkdown Files:")
    finished_states = ["complete", "cleared"]
    for md_path in engine.list_local_files():
        entries = engine._read_list_file(md_path)
        # Each entry is unpacked as a 4-tuple whose second item is the status.
        done = sum(1 for _, state, _, _ in entries if state in finished_states)
        click.echo(f" {md_path.name}: {done}/{len(entries)} completed")
|
||
|
||
|
||
@godspeed.command()
def test_connection():
    """Test connection to Godspeed API with SSL diagnostics."""
    # Imported lazily so the other commands do not need requests at import time.
    import requests

    click.echo("Testing connection to Godspeed API...")

    # Check if SSL bypass is enabled first
    disable_ssl = os.getenv("GODSPEED_DISABLE_SSL_VERIFY", "").lower() == "true"
    if disable_ssl:
        click.echo("⚠️ SSL verification is disabled (GODSPEED_DISABLE_SSL_VERIFY=true)")

    # Test basic connectivity. The first probe always verifies certificates,
    # even when the bypass variable is set, so certificate problems are reported.
    ssl_error_occurred = False
    try:
        requests.get("https://api.godspeedapp.com", timeout=10)
        click.echo("✓ Basic HTTPS connection successful")
    except requests.exceptions.SSLError as e:
        # Must be caught before ConnectionError: in requests, SSLError is a
        # subclass of ConnectionError.
        ssl_error_occurred = True
        click.echo(f"✗ SSL Error: {e}")
        if not disable_ssl:
            click.echo("\n💡 Try setting: export GODSPEED_DISABLE_SSL_VERIFY=true")
    except requests.exceptions.ConnectionError as e:
        click.echo(f"✗ Connection Error: {e}")
        return
    except Exception as e:
        click.echo(f"✗ Unexpected Error: {e}")
        return

    # Test with SSL bypass if enabled and there was an SSL error
    if disable_ssl and ssl_error_occurred:
        try:
            requests.get("https://api.godspeedapp.com", verify=False, timeout=10)
            click.echo("✓ Connection successful with SSL bypass")
        except Exception as e:
            click.echo(f"✗ Connection failed even with SSL bypass: {e}")
            return

    # Test authentication. NOTE(review): get_credentials() prompts for any
    # value missing from the environment, so the "no credentials" branch below
    # is normally not reached - confirm whether prompting here is intended.
    email, password, token = get_credentials()
    if token or (email and password):
        try:
            client = GodspeedClient(email=email, password=password, token=token)
            lists = client.get_lists()
            click.echo(f"✓ Authentication successful, found {len(lists)} lists")
        except Exception as e:
            click.echo(f"✗ Authentication failed: {e}")
    else:
        click.echo("ℹ️ No credentials provided for authentication test")

    click.echo("\nConnection test complete!")
|
||
|
||
|
||
@godspeed.command()
def open():
    """Open the sync directory in the default file manager."""
    target = get_sync_directory()

    if not target.exists():
        click.echo(f"Sync directory does not exist: {target}", err=True)
        click.echo("Run 'godspeed download' or 'godspeed sync' to initialize.")
        return

    import platform
    import subprocess

    # Launcher per platform.system() value; anything else (e.g. Linux) uses xdg-open.
    launchers = {"Darwin": "open", "Windows": "explorer"}
    launcher = launchers.get(platform.system(), "xdg-open")

    try:
        # The launcher's exit status is not checked; only a failure to start
        # the process (or another raised error) reaches the handler below.
        subprocess.run([launcher, str(target)])
        click.echo(f"Opened sync directory: {target}")
    except Exception as exc:
        click.echo(f"Could not open directory: {exc}", err=True)
        click.echo(f"Sync directory is: {target}")
|
||
|
||
|
||
class TaskSweeper:
    """Sweeps incomplete tasks from markdown files into Godspeed Inbox.

    Incomplete ``- [ ]`` items found under ``notes_dir`` are appended to
    ``<godspeed_dir>/Inbox.md`` and removed from their source files.
    Completed/cleared tasks and all other lines stay in the source file at
    their original position.  With ``dry_run`` set, no file is written,
    modified or deleted; the planned actions are only reported.
    """

    # Matches patterns like: - [ ] Task title <!-- id:abc123 -->
    _TASK_PATTERN = r"^\s*-\s*\[([xX\s\-])\]\s*(.+?)(?:\s*<!--\s*id:(\w+)\s*-->)?\s*$"

    # Checkbox text used when the sync engine is not available for formatting.
    _CHECKBOXES = {"incomplete": "[ ]", "complete": "[x]", "cleared": "[-]"}

    def __init__(self, notes_dir: Path, godspeed_dir: Path, dry_run: bool = False):
        """Store the directories and try to attach a sync engine.

        Args:
            notes_dir: Root directory searched (recursively) for ``*.md`` files.
            godspeed_dir: Godspeed sync directory; ``Inbox.md`` lives in it.
            dry_run: When true, report actions without touching any file.
        """
        self.notes_dir = Path(notes_dir)
        self.godspeed_dir = Path(godspeed_dir)
        self.dry_run = dry_run
        self.inbox_file = self.godspeed_dir / "Inbox.md"

        # Try to use the sync engine for consistent ID generation and formatting
        try:
            self.sync_engine = GodspeedSync(None, str(godspeed_dir))
        except Exception:
            # Deliberate best-effort: fall back to the built-in ID generation
            # and formatting if the sync engine cannot be created.
            self.sync_engine = None

    def _new_local_id(self) -> str:
        """Return a new task ID from the sync engine, or 8 chars of a UUID4."""
        if getattr(self, "sync_engine", None):
            return self.sync_engine._generate_local_id()

        import uuid

        return str(uuid.uuid4())[:8]

    def _format_task(self, local_id, status, title, notes) -> str:
        """Format one task as markdown via the sync engine or the fallback."""
        if self.sync_engine:
            return self.sync_engine._format_task_line(local_id, status, title, notes)

        # Fallback formatting
        formatted = f"- {self._CHECKBOXES[status]} {title} <!-- id:{local_id} -->"
        if notes:
            formatted += f"\n {notes}"
        return formatted

    def _parse_task_line_fallback(self, line: str):
        """Parse one markdown checkbox line.

        Returns:
            ``(local_id, status, title, "")`` where ``status`` is
            ``"complete"`` for ``[x]``/``[X]``, ``"cleared"`` for ``[-]`` and
            ``"incomplete"`` otherwise, or ``None`` when the line is not a
            checkbox item.  A new ID is generated when the line carries no
            ``<!-- id:... -->`` marker.  Notes are always returned empty.
        """
        import re

        match = re.match(self._TASK_PATTERN, line.strip())
        if not match:
            return None

        checkbox, title_and_notes, local_id = match.groups()

        # Determine status
        if checkbox.lower() == "x":
            status = "complete"
        elif checkbox == "-":
            status = "cleared"
        else:
            status = "incomplete"

        # Extract title (remove any inline notes after <!--)
        title = title_and_notes.split("<!--")[0].strip()

        # Generate ID if missing
        if not local_id:
            local_id = self._new_local_id()

        return local_id, status, title, ""

    def _classify_lines(self, file_path: Path):
        """Read ``file_path`` and pair every line with its parsed task.

        Returns:
            A list of ``(line, parsed)`` tuples in file order, where ``line``
            is right-stripped and ``parsed`` is the task tuple or ``None``
            for non-task lines.  A missing or unreadable file yields ``[]``
            (a read error is reported with ``click.echo``).
        """
        if not file_path.exists():
            return []

        try:
            # Path.open is used because the module-level name ``open`` is a
            # click command in this file.
            with file_path.open("r", encoding="utf-8") as handle:
                raw_lines = handle.readlines()
        except (OSError, UnicodeDecodeError) as e:
            click.echo(f" ⚠️ Error reading {file_path}: {e}")
            return []

        classified = []
        for raw in raw_lines:
            line = raw.rstrip()
            parsed = None
            # Check if this line looks like a task
            if line.strip().startswith("- ["):
                parsed = self._parse_task_line_fallback(line)
            classified.append((line, parsed))
        return classified

    def _parse_markdown_file(self, file_path: Path):
        """Parse a markdown file and extract tasks and non-task content.

        Returns:
            ``(tasks, non_task_lines)``; both are empty when the file is
            missing or cannot be read.
        """
        tasks = []
        non_task_lines = []
        for line, parsed in self._classify_lines(file_path):
            if parsed is not None:
                tasks.append(parsed)
            else:
                # Not a task, keep as regular content
                non_task_lines.append(line)
        return tasks, non_task_lines

    def _write_tasks_to_file(self, file_path: Path, tasks):
        """Append ``tasks`` to ``file_path``, creating parent dirs and file.

        Existing content is preserved and separated from the new tasks by
        one blank line.  Does nothing when ``tasks`` is empty.
        """
        if not tasks:
            return

        file_path.parent.mkdir(parents=True, exist_ok=True)

        # Read existing content if file exists
        existing_content = ""
        if file_path.exists():
            with file_path.open("r", encoding="utf-8") as handle:
                existing_content = handle.read()

        new_block = "\n".join(self._format_task(*task) for task in tasks) + "\n"

        # Combine with existing content
        if existing_content.strip():
            new_content = existing_content.rstrip() + "\n\n" + new_block
        else:
            new_content = new_block

        with file_path.open("w", encoding="utf-8") as handle:
            handle.write(new_content)

    def _clean_file(self, file_path: Path, non_task_lines):
        """Rewrite ``file_path`` with ``non_task_lines``, or delete it if blank.

        In dry-run mode the file is left untouched and the message states
        what would have happened.
        """
        if not non_task_lines or all(not line.strip() for line in non_task_lines):
            # File would be empty, delete it
            if self.dry_run:
                click.echo(f" 🗑️ Would delete empty file: {file_path}")
            else:
                file_path.unlink()
                click.echo(f" 🗑️ Deleted empty file: {file_path}")
            return

        # Write back the remaining content
        cleaned_content = "\n".join(non_task_lines).strip()
        if cleaned_content:
            cleaned_content += "\n"

        if self.dry_run:
            click.echo(f" ✂️ Would clean file (remove tasks): {file_path}")
        else:
            with file_path.open("w", encoding="utf-8") as handle:
                handle.write(cleaned_content)
            click.echo(f" ✂️ Cleaned file (removed tasks): {file_path}")

    def find_markdown_files(self):
        """Find all markdown files in the notes directory, excluding Godspeed directory.

        Hidden files and directories are skipped, judged only by the path
        components *below* ``notes_dir`` so that a notes directory which
        itself lives under a dot-directory is still searched.

        Returns:
            Sorted list of paths as produced by ``notes_dir.rglob("*.md")``.
        """
        # Resolve once so differently spelled paths to the same place compare equal.
        godspeed_root = self.godspeed_dir.resolve()
        markdown_files = []

        for md_file in self.notes_dir.rglob("*.md"):
            # Skip files in the Godspeed directory
            if godspeed_root in md_file.resolve().parents:
                continue

            # Skip hidden files and directories
            relative_parts = md_file.relative_to(self.notes_dir).parts
            if any(part.startswith(".") for part in relative_parts):
                continue

            markdown_files.append(md_file)

        return sorted(markdown_files)

    def sweep_tasks(self):
        """Sweep incomplete tasks from all markdown files into Inbox.

        Returns:
            Dict with ``"swept_tasks"`` (count), ``"processed_files"`` (list
            of paths relative to ``notes_dir``, as strings) and
            ``"inbox_file"`` (string path).
        """
        click.echo(f"🧹 Sweeping incomplete tasks from: {self.notes_dir}")
        click.echo(f"📥 Target Inbox: {self.inbox_file}")
        click.echo(f"🔍 Dry run: {self.dry_run}")
        click.echo("=" * 60)

        markdown_files = self.find_markdown_files()
        click.echo(f"\n📁 Found {len(markdown_files)} markdown files to process")

        swept_tasks = []
        processed_files = []
        # (file_path, remaining_lines) pairs, applied only after the Inbox is
        # written so a failed Inbox write cannot lose tasks.
        pending_cleanups = []

        for file_path in markdown_files:
            try:
                rel_path_str = str(file_path.relative_to(self.notes_dir))
            except ValueError as e:
                click.echo(f"Error getting relative path for {file_path}: {e}")
                rel_path_str = str(file_path.name)

            click.echo(f"\n📄 Processing: {rel_path_str}")

            classified = self._classify_lines(file_path)
            tasks = [parsed for _, parsed in classified if parsed is not None]

            if not tasks:
                click.echo(" ℹ️ No tasks found")
                continue

            # Separate incomplete tasks from completed/cleared ones
            incomplete_tasks = [task for task in tasks if task[1] == "incomplete"]
            kept_count = len(tasks) - len(incomplete_tasks)

            if incomplete_tasks:
                click.echo(f" 🔄 Found {len(incomplete_tasks)} incomplete tasks:")
                for _, status, title, notes in incomplete_tasks:
                    click.echo(f" • {title}")
                    if notes:
                        click.echo(f" Notes: {notes}")

                # Add source file annotation; swept tasks get a fresh ID to
                # avoid conflicts with IDs already used elsewhere.
                source_notes = f"From: {rel_path_str}"
                for _, status, title, notes in incomplete_tasks:
                    combined_notes = f"{notes}\n{source_notes}" if notes else source_notes
                    swept_tasks.append(
                        (self._new_local_id(), status, title, combined_notes)
                    )

                processed_files.append(rel_path_str)

            if kept_count:
                click.echo(
                    f" ✅ Keeping {kept_count} completed/cleared tasks in place"
                )

            if incomplete_tasks:
                # Everything except the swept lines stays, verbatim and in order.
                remaining_content = [
                    line
                    for line, parsed in classified
                    if parsed is None or parsed[1] != "incomplete"
                ]
                pending_cleanups.append((file_path, remaining_content))

        # Write swept tasks to Inbox
        if swept_tasks:
            click.echo(f"\n📥 Writing {len(swept_tasks)} tasks to Inbox...")
            if not self.dry_run:
                self._write_tasks_to_file(self.inbox_file, swept_tasks)
                click.echo(f" ✅ Inbox updated: {self.inbox_file}")

        # Clean the original files (reports only, in dry-run mode)
        for file_path, remaining_content in pending_cleanups:
            self._clean_file(file_path, remaining_content)

        # Summary
        click.echo("\n" + "=" * 60)
        click.echo("📊 SWEEP SUMMARY:")
        click.echo(f" • Files processed: {len(processed_files)}")
        click.echo(f" • Tasks swept: {len(swept_tasks)}")
        click.echo(f" • Target: {self.inbox_file}")

        if self.dry_run:
            click.echo("\n⚠️ DRY RUN - No files were actually modified")
            click.echo(" Run without --dry-run to perform the sweep")

        return {
            "swept_tasks": len(swept_tasks),
            "processed_files": processed_files,
            "inbox_file": str(self.inbox_file),
        }
|
||
|
||
|
||
@godspeed.command()
@click.argument(
    "notes_dir",
    type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path),
    required=False,
)
@click.argument(
    "godspeed_dir",
    type=click.Path(file_okay=False, dir_okay=True, path_type=Path),
    required=False,
)
@click.option(
    "--dry-run", is_flag=True, help="Show what would be done without making changes"
)
def sweep(notes_dir, godspeed_dir, dry_run):
    """Sweep incomplete tasks from markdown files into Godspeed Inbox.

    NOTES_DIR: Directory containing markdown files with tasks to sweep (optional, defaults to $NOTES_DIR)
    GODSPEED_DIR: Godspeed sync directory (optional, defaults to sync directory)
    """
    # Without an explicit argument, the notes directory comes from $NOTES_DIR
    # and is validated here (click only validates the argument form).
    if notes_dir is None:
        env_notes = os.getenv("NOTES_DIR")
        if not env_notes:
            usage_lines = (
                "❌ No notes directory specified and $NOTES_DIR environment variable not set",
                "Usage: godspeed sweep <notes_dir> [godspeed_dir]",
                " or: export NOTES_DIR=/path/to/notes && godspeed sweep",
            )
            for usage_line in usage_lines:
                click.echo(usage_line, err=True)
            sys.exit(1)

        notes_dir = Path(env_notes)
        if not notes_dir.exists():
            click.echo(
                f"❌ Notes directory from $NOTES_DIR does not exist: {notes_dir}",
                err=True,
            )
            sys.exit(1)
        if not notes_dir.is_dir():
            click.echo(
                f"❌ Notes path from $NOTES_DIR is not a directory: {notes_dir}",
                err=True,
            )
            sys.exit(1)

    if godspeed_dir is None:
        godspeed_dir = get_sync_directory()

    # Normalise both to Path objects before handing them to the sweeper.
    notes_root = Path(notes_dir)
    godspeed_root = Path(godspeed_dir)

    try:
        outcome = TaskSweeper(notes_root, godspeed_root, dry_run).sweep_tasks()

        if outcome["swept_tasks"] > 0:
            click.echo(f"\n🎉 Successfully swept {outcome['swept_tasks']} tasks!")
            if not dry_run:
                click.echo("💡 Next steps:")
                click.echo(f" 1. Review tasks in: {outcome['inbox_file']}")
                click.echo(" 2. Run 'godspeed upload' to sync to API")
                click.echo(
                    " 3. Organize tasks into appropriate lists in Godspeed app"
                )
        else:
            click.echo("\n✨ No incomplete tasks found to sweep.")
    except Exception as exc:
        # Top-level CLI boundary: show the message and the traceback, then fail.
        click.echo(f"❌ Error during sweep: {exc}", err=True)
        import traceback

        traceback.print_exc()
        sys.exit(1)
|
||
|
||
|
||
if __name__ == "__main__":
    # Executed directly as a script: hand control to the click command group.
    godspeed()
|