#!/usr/bin/env python3 """ Test script for Godspeed sync functionality. This script demonstrates the Godspeed sync tool by creating sample data and testing various sync scenarios. """ import os import tempfile from pathlib import Path import json # Mock data for testing without real API calls MOCK_LISTS = [ {"id": "list-1", "name": "Personal"}, {"id": "list-2", "name": "Work Projects"}, {"id": "list-3", "name": "Shopping"}, ] MOCK_TASKS = [ { "id": "task-1", "title": "Buy groceries", "list_id": "list-3", "is_complete": False, "notes": "Don't forget milk and eggs", }, { "id": "task-2", "title": "Finish quarterly report", "list_id": "list-2", "is_complete": False, "notes": "Due Friday", }, { "id": "task-3", "title": "Call dentist", "list_id": "list-1", "is_complete": True, "notes": "", }, { "id": "task-4", "title": "Review pull requests", "list_id": "list-2", "is_complete": False, "notes": "Check PR #123 and #124", }, ] class MockGodspeedClient: """Mock client for testing without hitting real API.""" def __init__(self, **kwargs): pass def get_lists(self): return MOCK_LISTS def get_tasks(self, **kwargs): filtered_tasks = MOCK_TASKS if kwargs.get("list_id"): filtered_tasks = [ t for t in MOCK_TASKS if t["list_id"] == kwargs["list_id"] ] if kwargs.get("status"): if kwargs["status"] == "complete": filtered_tasks = [t for t in filtered_tasks if t["is_complete"]] elif kwargs["status"] == "incomplete": filtered_tasks = [t for t in filtered_tasks if not t["is_complete"]] # Mock the API response format lists_dict = {lst["id"]: lst for lst in MOCK_LISTS} return {"tasks": filtered_tasks, "lists": lists_dict} def create_task(self, **kwargs): new_task = { "id": f"task-{len(MOCK_TASKS) + 1}", "title": kwargs["title"], "list_id": kwargs.get("list_id"), "is_complete": False, "notes": kwargs.get("notes", ""), } MOCK_TASKS.append(new_task) return new_task def update_task(self, task_id, **kwargs): for task in MOCK_TASKS: if task["id"] == task_id: task.update(kwargs) return task raise Exception(f"Task {task_id} not found") def complete_task(self, task_id): return self.update_task(task_id, is_complete=True) def test_markdown_parsing(): """Test markdown task parsing functionality.""" print("Testing markdown parsing...") from src.services.godspeed.sync import GodspeedSync # Create temporary directory with tempfile.TemporaryDirectory() as temp_dir: sync_dir = Path(temp_dir) sync_engine = GodspeedSync(None, sync_dir) # Test task line parsing test_lines = [ "- [ ] Simple task ", "- [x] Completed task ", "- [ ] Task with notes \n Some additional notes here", "- [ ] New task without ID", ] for line in test_lines: parsed = sync_engine._parse_task_line(line) if parsed: local_id, is_complete, title, notes = parsed print(f" Parsed: {title} (ID: {local_id}, Complete: {is_complete})") if notes: print(f" Notes: {notes}") else: print(f" Failed to parse: {line}") def test_file_operations(): """Test file reading and writing operations.""" print("\nTesting file operations...") from src.services.godspeed.sync import GodspeedSync with tempfile.TemporaryDirectory() as temp_dir: sync_dir = Path(temp_dir) sync_engine = GodspeedSync(None, sync_dir) # Create test tasks test_tasks = [ ("abc123", False, "Buy milk", "From the grocery store"), ("def456", True, "Call mom", ""), ("ghi789", False, "Finish project", "Due next week"), ] # Write tasks to file test_file = sync_dir / "test_list.md" sync_engine._write_list_file(test_file, test_tasks) print(f" Created test file: {test_file}") # Read tasks back read_tasks = sync_engine._read_list_file(test_file) print(f" Read {len(read_tasks)} tasks back from file") for i, (original, read_back) in enumerate(zip(test_tasks, read_tasks)): if original == read_back: print(f" Task {i + 1}: ✓ Match") else: print(f" Task {i + 1}: ✗ Mismatch") print(f" Original: {original}") print(f" Read back: {read_back}") def test_mock_sync(): """Test sync operations with mock data.""" print("\nTesting sync with mock data...") # Temporarily replace the real client import src.services.godspeed.sync as sync_module original_client_class = sync_module.GodspeedClient sync_module.GodspeedClient = MockGodspeedClient try: from src.services.godspeed.sync import GodspeedSync with tempfile.TemporaryDirectory() as temp_dir: sync_dir = Path(temp_dir) # Create mock client and sync engine mock_client = MockGodspeedClient() sync_engine = GodspeedSync(mock_client, sync_dir) # Test download print(" Testing download...") sync_engine.download_from_api() # Check created files md_files = list(sync_dir.glob("*.md")) print(f" Created {len(md_files)} markdown files") for md_file in md_files: tasks = sync_engine._read_list_file(md_file) print(f" {md_file.name}: {len(tasks)} tasks") # Test status status = sync_engine.get_sync_status() print( f" Status: {status['local_files']} files, {status['total_local_tasks']} tasks" ) # Test upload (modify a file first) if md_files: first_file = md_files[0] with open(first_file, "a") as f: f.write("- [ ] New local task \n") print(" Testing upload...") sync_engine.upload_to_api() print( f" Upload completed, now {len(MOCK_TASKS)} total tasks in mock data" ) finally: # Restore original client sync_module.GodspeedClient = original_client_class def test_cli_integration(): """Test CLI commands (without real API calls).""" print("\nTesting CLI integration...") # Test that imports work try: from src.cli.godspeed import godspeed, get_sync_directory print(" ✓ CLI imports successful") # Test sync directory detection sync_dir = get_sync_directory() print(f" ✓ Sync directory: {sync_dir}") except ImportError as e: print(f" ✗ CLI import failed: {e}") def main(): """Run all tests.""" print("=== Godspeed Sync Test Suite ===\n") try: test_markdown_parsing() test_file_operations() test_mock_sync() test_cli_integration() print("\n=== Test Summary ===") print("✓ All tests completed successfully!") print("\nTo use the real Godspeed sync:") print("1. Set environment variables:") print(" export GODSPEED_EMAIL='your@email.com'") print(" export GODSPEED_PASSWORD='your-password'") print(" # OR") print(" export GODSPEED_TOKEN='your-api-token'") print("") print("2. Run sync commands:") print(" python -m src.cli.godspeed download") print(" python -m src.cli.godspeed status") print(" python -m src.cli.godspeed sync") except Exception as e: print(f"\n✗ Test failed: {e}") import traceback traceback.print_exc() if __name__ == "__main__": main()