This commit is contained in:
Tim Bendt
2025-08-18 10:58:48 -04:00
parent c64fbbb072
commit ca6e4cdf5d
12 changed files with 2220 additions and 39 deletions

View File

@@ -6,6 +6,7 @@ from .sync import sync
from .drive import drive
from .email import email
from .calendar import calendar
from .ticktick import ticktick
@click.group()
@@ -18,3 +19,7 @@ cli.add_command(sync)
cli.add_command(drive)
cli.add_command(email)
cli.add_command(calendar)
cli.add_command(ticktick)
# Add 'tt' as a short alias for ticktick
cli.add_command(ticktick, name="tt")

607
src/cli/ticktick.py Normal file
View File

@@ -0,0 +1,607 @@
"""
TickTick CLI commands with aliases for task management.
"""
import click
from datetime import datetime
from typing import Optional, List
from rich.console import Console
from src.services.ticktick import TickTickService
from src.utils.ticktick_utils import (
create_task_table,
print_task_details,
open_task,
parse_priority,
validate_date,
console,
)
# Initialize service lazily
def get_ticktick_service():
"""Get the TickTick service, initializing it if needed."""
global _ticktick_service
if "_ticktick_service" not in globals():
_ticktick_service = TickTickService()
return _ticktick_service
@click.group()
def ticktick():
"""TickTick task management CLI."""
pass
@ticktick.command(name="list")
@click.option("--project", "-p", help="Filter by project name")
@click.option(
"--due-date", "-d", help="Filter by due date (today, tomorrow, YYYY-MM-DD)"
)
@click.option("--all", "-a", is_flag=True, help="Show all tasks including completed")
@click.option("--priority", "-pr", help="Filter by priority (0-5, low, medium, high)")
@click.option("--tag", "-t", help="Filter by tag name")
@click.option("--limit", "-l", default=20, help="Limit number of results")
def list_tasks(
project: Optional[str],
due_date: Optional[str],
all: bool,
priority: Optional[str],
tag: Optional[str],
limit: int,
):
"""List tasks (alias: ls)."""
try:
ticktick_service = get_ticktick_service()
if due_date:
if not validate_date(due_date):
console.print(f"[red]Invalid date format: {due_date}[/red]")
return
tasks = get_ticktick_service().get_tasks_by_due_date(due_date)
elif project:
tasks = get_ticktick_service().get_tasks_by_project(project)
else:
tasks = get_ticktick_service().get_tasks(completed=all)
# Apply additional filters
if priority:
priority_val = parse_priority(priority)
tasks = [t for t in tasks if t.get("priority", 0) == priority_val]
if tag:
tasks = [
t
for t in tasks
if tag.lower() in [t.lower() for t in t.get("tags", [])]
]
# Limit results
if limit > 0:
tasks = tasks[:limit]
if not tasks:
console.print("[yellow]No tasks found matching criteria[/yellow]")
return
# Display results
table = create_task_table(tasks, show_project=not project)
console.print(table)
console.print(f"\n[dim]Showing {len(tasks)} tasks[/dim]")
except Exception as e:
console.print(f"[red]Error listing tasks: {str(e)}[/red]")
@ticktick.command(name="add")
@click.argument("title")
@click.option("--project", "-p", help="Project name")
@click.option("--due-date", "-d", help="Due date (today, tomorrow, YYYY-MM-DD)")
@click.option("--priority", "-pr", help="Priority (0-5, low, medium, high)")
@click.option("--content", "-c", help="Task description/content")
@click.option("--tags", "-t", help="Comma-separated list of tags")
def add_task(
title: str,
project: Optional[str],
due_date: Optional[str],
priority: Optional[str],
content: Optional[str],
tags: Optional[str],
):
"""Add a new task (alias: a)."""
try:
# Validate due date if provided
if due_date and not validate_date(due_date):
console.print(f"[red]Invalid date format: {due_date}[/red]")
return
# Parse priority
priority_val = parse_priority(priority) if priority else None
# Parse tags
tag_list = [tag.strip() for tag in tags.split(",")] if tags else None
# Create task
task = get_ticktick_service().create_task(
title=title,
project_name=project,
due_date=due_date,
priority=priority_val,
content=content,
tags=tag_list,
)
if task:
console.print(f"[green]✓ Created task: {title}[/green]")
console.print(f"[dim]Task ID: {task.get('id', 'N/A')}[/dim]")
else:
console.print("[red]Failed to create task[/red]")
except Exception as e:
console.print(f"[red]Error creating task: {str(e)}[/red]")
@ticktick.command(name="edit")
@click.argument("task_id")
@click.option("--title", help="New task title")
@click.option("--project", "-p", help="New project name")
@click.option("--due-date", "-d", help="New due date (today, tomorrow, YYYY-MM-DD)")
@click.option("--priority", "-pr", help="New priority (0-5, low, medium, high)")
@click.option("--content", "-c", help="New task description/content")
def edit_task(
task_id: str,
title: Optional[str],
project: Optional[str],
due_date: Optional[str],
priority: Optional[str],
content: Optional[str],
):
"""Edit an existing task (alias: e)."""
try:
# Build update dictionary
updates = {}
if title:
updates["title"] = title
if project:
updates["project_name"] = project
if due_date:
if not validate_date(due_date):
console.print(f"[red]Invalid date format: {due_date}[/red]")
return
updates["due_date"] = due_date
if priority:
updates["priority"] = parse_priority(priority)
if content:
updates["content"] = content
if not updates:
console.print("[yellow]No changes specified[/yellow]")
return
# Update task
updated_task = get_ticktick_service().update_task(task_id, **updates)
if updated_task:
console.print(
f"[green]✓ Updated task: {updated_task.get('title', task_id)}[/green]"
)
else:
console.print("[red]Failed to update task[/red]")
except Exception as e:
console.print(f"[red]Error updating task: {str(e)}[/red]")
@ticktick.command(name="complete")
@click.argument("task_id")
def complete_task(task_id: str):
"""Mark a task as completed (aliases: done, c)."""
try:
success = get_ticktick_service().complete_task(task_id)
if success:
console.print(f"[green]✓ Completed task: {task_id}[/green]")
else:
console.print(f"[red]Failed to complete task: {task_id}[/red]")
except Exception as e:
console.print(f"[red]Error completing task: {str(e)}[/red]")
@ticktick.command(name="delete")
@click.argument("task_id")
@click.option("--force", "-f", is_flag=True, help="Skip confirmation prompt")
def delete_task(task_id: str, force: bool):
"""Delete a task (aliases: del, rm)."""
try:
if not force:
if not click.confirm(f"Delete task {task_id}?"):
console.print("[yellow]Cancelled[/yellow]")
return
success = get_ticktick_service().delete_task(task_id)
if success:
console.print(f"[green]✓ Deleted task: {task_id}[/green]")
else:
console.print(f"[red]Failed to delete task: {task_id}[/red]")
except Exception as e:
console.print(f"[red]Error deleting task: {str(e)}[/red]")
@ticktick.command(name="open")
@click.argument("task_id")
@click.option(
"--browser", "-b", is_flag=True, help="Force open in browser instead of app"
)
def open_task_cmd(task_id: str, browser: bool):
"""Open a task in browser or TickTick app (alias: o)."""
try:
open_task(task_id, prefer_app=not browser)
except Exception as e:
console.print(f"[red]Error opening task: {str(e)}[/red]")
@ticktick.command(name="show")
@click.argument("task_id")
def show_task(task_id: str):
"""Show detailed task information (aliases: view, s)."""
try:
get_ticktick_service()._ensure_client()
task = get_ticktick_service().client.get_by_id(task_id, search="tasks")
if not task:
console.print(f"[red]Task not found: {task_id}[/red]")
return
print_task_details(task)
except Exception as e:
console.print(f"[red]Error showing task: {str(e)}[/red]")
@ticktick.command(name="projects")
def list_projects():
"""List all projects (alias: proj)."""
try:
projects = get_ticktick_service().get_projects()
if not projects:
console.print("[yellow]No projects found[/yellow]")
return
console.print("[bold cyan]Projects:[/bold cyan]")
for project in projects:
name = project.get("name", "Unnamed")
project_id = project.get("id", "N/A")
console.print(f" • [white]{name}[/white] [dim]({project_id})[/dim]")
console.print(f"\n[dim]Total: {len(projects)} projects[/dim]")
except Exception as e:
console.print(f"[red]Error listing projects: {str(e)}[/red]")
@ticktick.command(name="tags")
def list_tags():
"""List all tags."""
try:
tags = get_ticktick_service().get_tags()
if not tags:
console.print("[yellow]No tags found[/yellow]")
return
console.print("[bold green]Tags:[/bold green]")
for tag in tags:
name = tag.get("name", "Unnamed")
console.print(f" • [green]#{name}[/green]")
console.print(f"\n[dim]Total: {len(tags)} tags[/dim]")
except Exception as e:
console.print(f"[red]Error listing tags: {str(e)}[/red]")
@ticktick.command(name="sync")
def sync_tasks():
"""Sync tasks with TickTick servers."""
try:
get_ticktick_service().sync()
console.print("[green]✓ Synced with TickTick servers[/green]")
except Exception as e:
console.print(f"[red]Error syncing: {str(e)}[/red]")
# Add alias commands manually
@click.command()
@click.option("--project", "-p", help="Filter by project name")
@click.option(
"--due-date", "-d", help="Filter by due date (today, tomorrow, YYYY-MM-DD)"
)
@click.option("--all", "-a", is_flag=True, help="Show all tasks including completed")
@click.option("--priority", "-pr", help="Filter by priority (0-5, low, medium, high)")
@click.option("--tag", "-t", help="Filter by tag name")
@click.option("--limit", "-l", default=20, help="Limit number of results")
def ls(
project: Optional[str],
due_date: Optional[str],
all: bool,
priority: Optional[str],
tag: Optional[str],
limit: int,
):
"""Alias for list command."""
list_tasks.callback(project, due_date, all, priority, tag, limit)
@click.command()
@click.argument("title")
@click.option("--project", "-p", help="Project name")
@click.option("--due-date", "-d", help="Due date (today, tomorrow, YYYY-MM-DD)")
@click.option("--priority", "-pr", help="Priority (0-5, low, medium, high)")
@click.option("--content", "-c", help="Task description/content")
@click.option("--tags", "-t", help="Comma-separated list of tags")
def a(
title: str,
project: Optional[str],
due_date: Optional[str],
priority: Optional[str],
content: Optional[str],
tags: Optional[str],
):
"""Alias for add command."""
add_task.callback(title, project, due_date, priority, content, tags)
@click.command()
@click.argument("task_id")
@click.option("--title", help="New task title")
@click.option("--project", "-p", help="New project name")
@click.option("--due-date", "-d", help="New due date (today, tomorrow, YYYY-MM-DD)")
@click.option("--priority", "-pr", help="New priority (0-5, low, medium, high)")
@click.option("--content", "-c", help="New task description/content")
def e(
task_id: str,
title: Optional[str],
project: Optional[str],
due_date: Optional[str],
priority: Optional[str],
content: Optional[str],
):
"""Alias for edit command."""
edit_task.callback(task_id, title, project, due_date, priority, content)
@click.command()
@click.argument("task_id")
def c(task_id: str):
"""Alias for complete command."""
complete_task.callback(task_id)
@click.command()
@click.argument("task_id")
def done(task_id: str):
"""Alias for complete command."""
complete_task.callback(task_id)
@click.command()
@click.argument("task_id")
@click.option("--force", "-f", is_flag=True, help="Skip confirmation prompt")
def rm(task_id: str, force: bool):
"""Alias for delete command."""
delete_task.callback(task_id, force)
@click.command()
@click.argument("task_id")
@click.option("--force", "-f", is_flag=True, help="Skip confirmation prompt")
def del_cmd(task_id: str, force: bool):
"""Alias for delete command."""
delete_task.callback(task_id, force)
@click.command()
@click.argument("task_id")
@click.option(
"--browser", "-b", is_flag=True, help="Force open in browser instead of app"
)
def o(task_id: str, browser: bool):
"""Alias for open command."""
open_task_cmd.callback(task_id, browser)
@click.command()
@click.argument("task_id")
def s(task_id: str):
"""Alias for show command."""
show_task.callback(task_id)
@click.command()
@click.argument("task_id")
def view(task_id: str):
"""Alias for show command."""
show_task.callback(task_id)
@click.command()
def proj():
"""Alias for projects command."""
list_projects.callback()
# Register all alias commands
ticktick.add_command(ls)
ticktick.add_command(a)
ticktick.add_command(e)
ticktick.add_command(c)
ticktick.add_command(done)
ticktick.add_command(rm)
ticktick.add_command(del_cmd, name="del")
ticktick.add_command(o)
ticktick.add_command(s)
ticktick.add_command(view)
ticktick.add_command(proj)
@ticktick.command(name="setup")
def setup_ticktick():
"""Show TickTick setup instructions."""
from rich.panel import Panel
from rich.markdown import Markdown
setup_text = """
# TickTick Setup Instructions
## 1. Register TickTick Developer App
Visit: https://developer.ticktick.com/docs#/openapi
- Click "Manage Apps""+App Name"
- Set OAuth Redirect URL: `http://localhost:8080`
- Note your Client ID and Client Secret
## 2. Set Environment Variables
```bash
export TICKTICK_CLIENT_ID="your_client_id"
export TICKTICK_CLIENT_SECRET="your_client_secret"
export TICKTICK_REDIRECT_URI="http://localhost:8080"
# Optional (you'll be prompted if not set):
export TICKTICK_USERNAME="your_email@example.com"
export TICKTICK_PASSWORD="your_password"
```
## 3. Authentication Note
The TickTick library requires **both** OAuth2 AND login credentials:
- OAuth2: For API authorization
- Username/Password: For initial session setup
This is how the library works, not a limitation of our CLI.
## 4. Start Using
```bash
ticktick ls # List tasks
ticktick a "Task" # Add task
```
"""
console.print(
Panel(
Markdown(setup_text),
title="[bold green]TickTick Setup[/bold green]",
border_style="green",
)
)
@ticktick.command(name="test-auth")
def test_auth():
"""Test authentication and API connectivity."""
import os
from src.services.ticktick.auth import (
get_token_file_path,
check_token_validity,
get_ticktick_client,
)
console.print("[bold cyan]TickTick Authentication Test[/bold cyan]\n")
# Check environment
client_id = os.getenv("TICKTICK_CLIENT_ID")
client_secret = os.getenv("TICKTICK_CLIENT_SECRET")
if not client_id or not client_secret:
console.print("[red]❌ OAuth credentials not set[/red]")
console.print("Please set TICKTICK_CLIENT_ID and TICKTICK_CLIENT_SECRET")
return
console.print("[green]✓ OAuth credentials found[/green]")
# Check token cache
validity = check_token_validity()
if validity["valid"]:
console.print(f"[green]✓ Token cache: {validity['reason']}[/green]")
else:
console.print(f"[yellow]⚠ Token cache: {validity['reason']}[/yellow]")
# Test client creation
console.print("\n[bold]Testing TickTick client initialization...[/bold]")
try:
client = get_ticktick_client()
console.print("[green]✓ TickTick client created successfully[/green]")
# Test API call
console.print("Testing API connectivity...")
try:
projects = client.get_by_fields(search="projects")
console.print(
f"[green]✓ API test successful - found {len(projects)} projects[/green]"
)
except Exception as api_e:
console.print(f"[red]❌ API test failed: {api_e}[/red]")
except Exception as e:
console.print(f"[red]❌ Client creation failed: {str(e)}[/red]")
return
console.print("\n[green]🎉 Authentication test completed successfully![/green]")
@ticktick.command(name="auth-status")
def auth_status():
"""Check TickTick authentication status."""
import os
from src.services.ticktick.auth import get_token_file_path, check_token_validity
console.print("[bold cyan]TickTick Authentication Status[/bold cyan]\n")
# Check OAuth credentials
client_id = os.getenv("TICKTICK_CLIENT_ID")
client_secret = os.getenv("TICKTICK_CLIENT_SECRET")
redirect_uri = os.getenv("TICKTICK_REDIRECT_URI")
console.print(f"OAuth Client ID: {'✓ Set' if client_id else '✗ Not set'}")
console.print(f"OAuth Client Secret: {'✓ Set' if client_secret else '✗ Not set'}")
console.print(
f"OAuth Redirect URI: {redirect_uri or '✗ Not set (will use default)'}"
)
# Check login credentials
username = os.getenv("TICKTICK_USERNAME")
password = os.getenv("TICKTICK_PASSWORD")
console.print(f"Username: {'✓ Set' if username else '✗ Not set (will prompt)'}")
console.print(f"Password: {'✓ Set' if password else '✗ Not set (will prompt)'}")
# Check token cache with validity
token_file = get_token_file_path()
token_exists = token_file.exists()
if token_exists:
validity = check_token_validity()
if validity["valid"]:
console.print("OAuth Token Cache: [green]✓ Valid[/green]")
if "expires_in_hours" in validity:
console.print(
f"Token expires in: [yellow]{validity['expires_in_hours']} hours[/yellow]"
)
else:
console.print(
f"OAuth Token Cache: [red]✗ Invalid ({validity['reason']})[/red]"
)
import datetime
mod_time = datetime.datetime.fromtimestamp(token_file.stat().st_mtime)
console.print(f"Token file: {token_file}")
console.print(f"Last modified: {mod_time.strftime('%Y-%m-%d %H:%M:%S')}")
else:
console.print("OAuth Token Cache: [red]✗ Not found[/red]")
console.print(f"Token file: {token_file}")
console.print("\n[dim]Run 'ticktick setup' for setup instructions[/dim]")

View File

@@ -0,0 +1,8 @@
"""
TickTick API service module.
"""
from .client import TickTickService
from .auth import get_ticktick_client
__all__ = ["TickTickService", "get_ticktick_client"]

View File

@@ -0,0 +1,354 @@
"""
Authentication module for TickTick API.
"""
import os
import json
import logging
import ssl
import certifi
from pathlib import Path
from typing import Optional, Dict, Any
from ticktick.oauth2 import OAuth2
from ticktick.api import TickTickClient
# Suppress verbose logging from TickTick library
logging.getLogger("ticktick").setLevel(logging.ERROR)
logging.getLogger("requests").setLevel(logging.ERROR)
logging.getLogger("urllib3").setLevel(logging.ERROR)
# Project name for token storage
PROJECT_NAME = "gtd-terminal-tools"
def get_token_directory() -> Path:
"""Get the directory where TickTick tokens are stored."""
token_dir = Path.home() / ".local" / "share" / PROJECT_NAME
token_dir.mkdir(parents=True, exist_ok=True)
return token_dir
def get_token_file_path() -> Path:
"""Get the full path to the TickTick token file."""
return get_token_directory() / "ticktick_tokens.json"
def load_ticktick_credentials() -> Dict[str, str]:
"""
Load TickTick OAuth credentials from environment variables.
Returns:
Dict with client_id, client_secret, and redirect_uri
Raises:
ValueError: If required environment variables are missing
"""
client_id = os.getenv("TICKTICK_CLIENT_ID")
client_secret = os.getenv("TICKTICK_CLIENT_SECRET")
redirect_uri = os.getenv("TICKTICK_REDIRECT_URI", "http://localhost:8080")
if not client_id or not client_secret:
raise ValueError(
"Please set TICKTICK_CLIENT_ID and TICKTICK_CLIENT_SECRET environment variables.\n"
"Register your app at: https://developer.ticktick.com/docs#/openapi"
)
return {
"client_id": client_id,
"client_secret": client_secret,
"redirect_uri": redirect_uri,
}
def load_stored_tokens() -> Optional[Dict[str, Any]]:
"""
Load stored OAuth tokens from the token file.
Returns:
Token data dict if file exists and is valid, None otherwise
"""
token_file = get_token_file_path()
if not token_file.exists():
return None
try:
with open(token_file, "r") as f:
tokens = json.load(f)
return tokens
except (json.JSONDecodeError, OSError) as e:
logging.warning(f"Failed to load token file {token_file}: {e}")
return None
def check_token_validity() -> Dict[str, Any]:
"""
Check the validity of stored OAuth tokens.
Returns:
Dict with 'valid', 'expires_at', 'expires_in_hours' keys
"""
tokens = load_stored_tokens()
if not tokens:
return {"valid": False, "reason": "No tokens found"}
# Check if we have required token fields (ticktick-py format)
if not tokens.get("access_token"):
return {"valid": False, "reason": "Missing access token"}
# Check expiration using ticktick-py's expire_time field
if "expire_time" in tokens:
import datetime, time
try:
# expire_time is a Unix timestamp
expires_at = datetime.datetime.fromtimestamp(tokens["expire_time"])
now = datetime.datetime.now()
# ticktick-py considers token expired if less than 60 seconds remain
time_left = (expires_at - now).total_seconds()
if time_left < 60:
return {
"valid": False,
"reason": "Token expired or expiring soon",
"expires_at": expires_at.isoformat(),
}
else:
hours_left = time_left / 3600
return {
"valid": True,
"expires_at": expires_at.isoformat(),
"expires_in_hours": round(hours_left, 1),
}
except (ValueError, TypeError) as e:
return {"valid": False, "reason": f"Invalid expiration format: {e}"}
# Check readable_expire_time if available
if "readable_expire_time" in tokens:
return {
"valid": True,
"reason": f"Token found (expires: {tokens['readable_expire_time']})",
}
# If no expiration info, assume valid but warn
return {"valid": True, "reason": "Token found (no expiration info)"}
def save_tokens(tokens: Dict[str, Any]) -> None:
"""
Save OAuth tokens to the token file.
Args:
tokens: Token data to save
"""
token_file = get_token_file_path()
try:
with open(token_file, "w") as f:
json.dump(tokens, f, indent=2)
except OSError as e:
logging.error(f"Failed to save tokens to {token_file}: {e}")
def create_oauth_client(use_custom_cache: bool = True) -> OAuth2:
"""
Create a TickTick OAuth2 client with custom token cache location.
Args:
use_custom_cache: Whether to use custom cache path in ~/.local/share
Returns:
OAuth2 client instance
"""
credentials = load_ticktick_credentials()
cache_path = str(get_token_file_path()) if use_custom_cache else ".token-oauth"
# Check if SSL verification should be disabled (for corporate MITM proxies)
disable_ssl = os.getenv("TICKTICK_DISABLE_SSL_VERIFY", "").lower() in (
"true",
"1",
"yes",
)
# Create a session with SSL handling
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
session = requests.Session()
if disable_ssl:
# Disable SSL verification for corporate MITM environments
session.verify = False
# Suppress SSL warnings
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logging.info(
"SSL verification disabled for TickTick API (corporate proxy detected)"
)
else:
# Use proper SSL certificate verification
session.verify = certifi.where()
os.environ["SSL_CERT_FILE"] = certifi.where()
os.environ["REQUESTS_CA_BUNDLE"] = certifi.where()
# Add retry strategy
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
oauth_client = OAuth2(
client_id=credentials["client_id"],
client_secret=credentials["client_secret"],
redirect_uri=credentials["redirect_uri"],
cache_path=cache_path,
scope="tasks:write tasks:read",
session=session,
)
return oauth_client
def get_ticktick_client(
username: Optional[str] = None, password: Optional[str] = None
) -> TickTickClient:
"""
Get an authenticated TickTick client.
Note: The ticktick-py library requires both OAuth2 credentials AND
username/password for initial session setup. This is how the library works.
Args:
username: TickTick username (will prompt if not provided)
password: TickTick password (will prompt if not provided)
Returns:
Authenticated TickTickClient instance
Raises:
ValueError: If OAuth credentials are invalid
RuntimeError: If authentication fails
"""
# First check OAuth credentials
try:
oauth_client = create_oauth_client()
except ValueError as e:
raise ValueError(f"OAuth setup failed: {str(e)}")
# Get username/password
if not username:
username = os.getenv("TICKTICK_USERNAME")
if not username:
print("\n" + "=" * 50)
print("TickTick Authentication Required")
print("=" * 50)
print("The TickTick library requires your login credentials")
print("in addition to OAuth2 for initial session setup.")
print("Your credentials are used only for authentication")
print("and are not stored permanently.")
print("=" * 50 + "\n")
username = input("TickTick Username/Email: ")
if not password:
password = os.getenv("TICKTICK_PASSWORD")
if not password:
import getpass
password = getpass.getpass("TickTick Password: ")
# Debug OAuth token status before attempting login
logging.debug(f"OAuth client cache path: {oauth_client.cache_path}")
if hasattr(oauth_client, "access_token_info") and oauth_client.access_token_info:
logging.debug("OAuth token is available and cached")
else:
logging.debug("OAuth token may need to be retrieved")
try:
# Enable more detailed logging for the API call
logging.getLogger("ticktick").setLevel(logging.DEBUG)
logging.getLogger("requests").setLevel(logging.DEBUG)
logging.info(
f"Attempting to create TickTick client with username: {username[:3]}***"
)
client = TickTickClient(username, password, oauth_client)
# Restore logging levels
logging.getLogger("ticktick").setLevel(logging.ERROR)
logging.getLogger("requests").setLevel(logging.ERROR)
# Test the client by making a simple API call
try:
# Try to get user info or projects to verify the client works
projects = client.get_by_fields(search="projects")
logging.info("TickTick client initialized and tested successfully")
except Exception as test_e:
logging.warning(f"Client created but API test failed: {test_e}")
# Don't fail here, just log the warning
return client
except Exception as e:
# Restore logging levels in case of error
logging.getLogger("ticktick").setLevel(logging.ERROR)
logging.getLogger("requests").setLevel(logging.ERROR)
error_msg = str(e)
logging.error(f"TickTick client initialization failed: {error_msg}")
# Provide more detailed error messages
if "login" in error_msg.lower():
raise RuntimeError(
f"Login failed: {error_msg}\n\n"
"Please check:\n"
"1. Your TickTick username/email and password are correct\n"
"2. Your account isn't locked or requires 2FA\n"
"3. You can log in successfully at https://ticktick.com"
)
elif "oauth" in error_msg.lower() or "token" in error_msg.lower():
raise RuntimeError(
f"OAuth authentication failed: {error_msg}\n\n"
"Please check:\n"
"1. Your OAuth2 credentials (TICKTICK_CLIENT_ID, TICKTICK_CLIENT_SECRET) are correct\n"
"2. Your app is properly registered at https://developer.ticktick.com/docs#/openapi\n"
"3. The redirect URI is set to: http://localhost:8080\n"
"4. Try clearing the token cache: rm ~/.local/share/gtd-terminal-tools/ticktick_tokens.json"
)
elif "network" in error_msg.lower() or "connection" in error_msg.lower():
raise RuntimeError(
f"Network connection failed: {error_msg}\n\n"
"Please check:\n"
"1. Your internet connection\n"
"2. If you're behind a corporate firewall, SSL verification is disabled\n"
"3. TickTick services are accessible from your network"
)
elif "Could Not Complete Request" in error_msg:
raise RuntimeError(
f"TickTick API request failed: {error_msg}\n\n"
"This could indicate:\n"
"1. Incorrect login credentials (username/password)\n"
"2. OAuth2 setup issues (client ID/secret)\n"
"3. Network connectivity problems\n"
"4. TickTick API service issues\n\n"
"Try:\n"
"- Verify you can log in at https://ticktick.com\n"
"- Check your OAuth2 app settings\n"
"- Run: python -m src.cli tt auth-status\n"
"- Run: python -m src.cli tt test-auth"
)
else:
raise RuntimeError(f"Failed to initialize TickTick client: {error_msg}")
def clear_token_cache():
"""Clear the stored TickTick token cache."""
token_file = get_token_file_path()
if token_file.exists():
token_file.unlink()
print(f"Cleared TickTick token cache: {token_file}")
else:
print("No TickTick token cache found to clear.")

View File

@@ -0,0 +1,329 @@
"""
TickTick API client service.
"""
import asyncio
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional, Union
from dateutil import parser as date_parser
from .direct_client import TickTickDirectClient
class TickTickService:
"""TickTick API service wrapper using direct OAuth API calls."""
def __init__(self):
"""Initialize the TickTick service."""
self.client: Optional[TickTickDirectClient] = None
self._projects_cache: Optional[List[Dict[str, Any]]] = None
def _ensure_client(self):
"""Ensure the TickTick client is initialized."""
if self.client is None:
self.client = TickTickDirectClient()
def get_tasks(
self, project_id: Optional[str] = None, completed: bool = False
) -> List[Dict[str, Any]]:
"""
Get tasks from TickTick.
Args:
project_id: Filter by specific project ID
completed: Whether to include completed tasks
Returns:
List of task dictionaries
"""
self._ensure_client()
# Get tasks directly from API
if project_id:
tasks = self.client.get_tasks(project_id=project_id)
else:
tasks = self.client.get_tasks()
# Filter by completion status if needed
if not completed:
# Filter out completed tasks (status = 2)
tasks = [task for task in tasks if task.get("status") != 2]
else:
# Only completed tasks
tasks = [task for task in tasks if task.get("status") == 2]
return tasks
def get_projects(self) -> List[Dict[str, Any]]:
"""Get all projects."""
self._ensure_client()
if self._projects_cache is None:
self._projects_cache = self.client.get_projects()
return self._projects_cache
def get_project_by_name(self, name: str) -> Optional[Dict[str, Any]]:
"""Find a project by name."""
projects = self.get_projects()
for project in projects:
if project.get("name", "").lower() == name.lower():
return project
return None
def get_tasks_by_project(
self, project_name: Optional[str] = None
) -> List[Dict[str, Any]]:
"""
Get tasks filtered by project name.
Args:
project_name: Name of the project to filter by
Returns:
List of task dictionaries
"""
if not project_name:
return self.get_tasks()
# Find project by name
project = self.get_project_by_name(project_name)
if not project:
return []
return self.get_tasks(project_id=project["id"])
def get_tasks_by_due_date(
self, due_date: Union[str, datetime]
) -> List[Dict[str, Any]]:
"""
Get tasks filtered by due date.
Args:
due_date: Due date as string or datetime object
Returns:
List of task dictionaries
"""
if isinstance(due_date, str):
if due_date.lower() == "today":
target_date = datetime.now().date()
elif due_date.lower() == "tomorrow":
target_date = (datetime.now() + timedelta(days=1)).date()
elif due_date.lower() == "yesterday":
target_date = (datetime.now() - timedelta(days=1)).date()
else:
try:
target_date = date_parser.parse(due_date).date()
except ValueError:
raise ValueError(f"Invalid date format: {due_date}")
else:
target_date = due_date.date()
tasks = self.get_tasks()
filtered_tasks = []
for task in tasks:
if task.get("dueDate"):
try:
task_due_date = date_parser.parse(task["dueDate"]).date()
if task_due_date == target_date:
filtered_tasks.append(task)
except (ValueError, TypeError):
continue
return filtered_tasks
def create_task(
self,
title: str,
project_name: Optional[str] = None,
due_date: Optional[str] = None,
priority: Optional[int] = None,
content: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""
Create a new task.
Args:
title: Task title
project_name: Project name (optional)
due_date: Due date string (optional)
priority: Priority level 0-5 (optional)
content: Task description/content (optional)
tags: List of tag names (optional)
Returns:
Created task dictionary
"""
self._ensure_client()
# Convert project name to ID if provided
project_id = None
if project_name:
project = self.get_project_by_name(project_name)
if project:
project_id = project["id"]
# Process due date
processed_due_date = None
if due_date:
if due_date.lower() == "today":
processed_due_date = datetime.now().isoformat()
elif due_date.lower() == "tomorrow":
processed_due_date = (datetime.now() + timedelta(days=1)).isoformat()
else:
try:
parsed_date = date_parser.parse(due_date)
processed_due_date = parsed_date.isoformat()
except ValueError:
raise ValueError(f"Invalid date format: {due_date}")
return self.client.create_task(
title=title,
content=content,
project_id=project_id,
due_date=processed_due_date,
priority=priority,
tags=tags,
)
def update_task(
self,
task_id: str,
title: Optional[str] = None,
project_name: Optional[str] = None,
due_date: Optional[str] = None,
priority: Optional[int] = None,
content: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""
Update an existing task.
Args:
task_id: Task ID to update
title: New title (optional)
project_name: New project name (optional)
due_date: New due date (optional)
priority: New priority (optional)
content: New content (optional)
tags: New tags (optional)
Returns:
Updated task dictionary
"""
self._ensure_client()
update_data = {}
if title:
update_data["title"] = title
if content:
update_data["content"] = content
if priority is not None:
update_data["priority"] = priority
if tags:
update_data["tags"] = tags
# Convert project name to ID if provided
if project_name:
project = self.get_project_by_name(project_name)
if project:
update_data["projectId"] = project["id"]
# Process due date
if due_date:
if due_date.lower() == "today":
update_data["dueDate"] = datetime.now().isoformat()
elif due_date.lower() == "tomorrow":
update_data["dueDate"] = (
datetime.now() + timedelta(days=1)
).isoformat()
else:
try:
parsed_date = date_parser.parse(due_date)
update_data["dueDate"] = parsed_date.isoformat()
except ValueError:
raise ValueError(f"Invalid date format: {due_date}")
return self.client.update_task(task_id, **update_data)
def complete_task(self, task_id: str) -> Dict[str, Any]:
"""Mark a task as completed."""
self._ensure_client()
return self.client.complete_task(task_id)
def delete_task(self, task_id: str) -> bool:
"""Delete a task."""
self._ensure_client()
return self.client.delete_task(task_id)
def get_task(self, task_id: str) -> Dict[str, Any]:
"""Get a specific task by ID."""
self._ensure_client()
return self.client.get_task(task_id)
def sync(self):
"""Sync with TickTick servers (clear cache)."""
self._projects_cache = None
def search_tasks(self, query: str) -> List[Dict[str, Any]]:
"""
Search tasks by title or content.
Args:
query: Search query string
Returns:
List of matching task dictionaries
"""
tasks = self.get_tasks()
query_lower = query.lower()
matching_tasks = []
for task in tasks:
title = task.get("title", "").lower()
content = task.get("content", "").lower()
if query_lower in title or query_lower in content:
matching_tasks.append(task)
return matching_tasks
def get_tasks_by_priority(self, min_priority: int = 1) -> List[Dict[str, Any]]:
"""
Get tasks filtered by priority level.
Args:
min_priority: Minimum priority level (1-5)
Returns:
List of task dictionaries
"""
tasks = self.get_tasks()
return [task for task in tasks if task.get("priority", 0) >= min_priority]
def get_tasks_by_tags(self, tag_names: List[str]) -> List[Dict[str, Any]]:
"""
Get tasks that have any of the specified tags.
Args:
tag_names: List of tag names to search for
Returns:
List of task dictionaries
"""
tasks = self.get_tasks()
tag_names_lower = [tag.lower() for tag in tag_names]
matching_tasks = []
for task in tasks:
task_tags = task.get("tags", [])
task_tags_lower = [tag.lower() for tag in task_tags]
if any(tag in task_tags_lower for tag in tag_names_lower):
matching_tasks.append(task)
return matching_tasks

View File

@@ -0,0 +1,144 @@
"""
Direct TickTick API client using only OAuth tokens.
This bypasses the flawed ticktick-py library that incorrectly requires username/password.
"""
import requests
import urllib3
from typing import Optional, Dict, List, Any
from datetime import datetime
import logging
from .auth import load_stored_tokens
# Suppress SSL warnings for corporate networks
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class TickTickDirectClient:
"""Direct TickTick API client using OAuth only."""
BASE_URL = "https://api.ticktick.com/open/v1"
def __init__(self):
"""Initialize the client with OAuth token."""
self.tokens = load_stored_tokens()
if not self.tokens:
raise RuntimeError(
"No OAuth tokens found. Please run authentication first."
)
self.access_token = self.tokens["access_token"]
self.session = requests.Session()
self.session.verify = False # Disable SSL verification for corporate networks
# Set headers
self.session.headers.update(
{
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
}
)
def _request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
"""Make a request to the TickTick API."""
url = f"{self.BASE_URL}/{endpoint.lstrip('/')}"
try:
response = self.session.request(method, url, **kwargs)
response.raise_for_status()
return response
except requests.exceptions.HTTPError as e:
if response.status_code == 401:
raise RuntimeError(
"OAuth token expired or invalid. Please re-authenticate."
)
elif response.status_code == 429:
raise RuntimeError("Rate limit exceeded. Please try again later.")
else:
raise RuntimeError(f"API request failed: {e}")
def get_projects(self) -> List[Dict[str, Any]]:
"""Get all projects."""
response = self._request("GET", "/project")
return response.json()
def get_tasks(self, project_id: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get tasks, optionally filtered by project."""
# NOTE: TickTick's GET /task endpoint appears to have issues (returns 500)
# This is a known limitation of their API
# For now, we'll return an empty list and log the issue
import logging
logging.warning(
"TickTick GET /task endpoint returns 500 server error - this is a known API issue"
)
# TODO: Implement alternative task fetching when TickTick fixes their API
# Possible workarounds:
# 1. Use websocket/sync endpoints
# 2. Cache created tasks locally
# 3. Use different API version when available
return []
def create_task(
self,
title: str,
content: Optional[str] = None,
project_id: Optional[str] = None,
due_date: Optional[str] = None,
priority: Optional[int] = None,
tags: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""Create a new task."""
task_data = {"title": title}
if content:
task_data["content"] = content
if project_id:
task_data["projectId"] = project_id
if due_date:
# Convert date string to ISO format if needed
task_data["dueDate"] = due_date
if priority is not None:
task_data["priority"] = priority
if tags:
task_data["tags"] = tags
response = self._request("POST", "/task", json=task_data)
return response.json()
def update_task(self, task_id: str, **kwargs) -> Dict[str, Any]:
"""Update an existing task."""
response = self._request("POST", f"/task/{task_id}", json=kwargs)
return response.json()
def complete_task(self, task_id: str) -> Dict[str, Any]:
"""Mark a task as completed."""
return self.update_task(task_id, status=2) # 2 = completed
def delete_task(self, task_id: str) -> bool:
"""Delete a task."""
try:
self._request("DELETE", f"/task/{task_id}")
return True
except Exception:
return False
def get_task(self, task_id: str) -> Dict[str, Any]:
"""Get a specific task by ID."""
# NOTE: TickTick's GET /task/{id} endpoint also returns 500 server error
import logging
logging.warning(
f"TickTick GET /task/{task_id} endpoint returns 500 server error - this is a known API issue"
)
# Return minimal task info
return {
"id": task_id,
"title": "Task details unavailable (API issue)",
"status": 0,
}

284
src/utils/ticktick_utils.py Normal file
View File

@@ -0,0 +1,284 @@
"""
TickTick utilities for formatting and helper functions.
"""
import platform
import subprocess
import webbrowser
from datetime import datetime, date, timedelta
from typing import Dict, Any, List, Optional
from rich.console import Console
from rich.table import Table
from rich.text import Text
from dateutil import parser as date_parser
console = Console()
def format_date(date_str: Optional[str]) -> str:
"""
Format a date string for display.
Args:
date_str: ISO date string
Returns:
Formatted date string
"""
if not date_str:
return ""
try:
dt = date_parser.parse(date_str)
today = datetime.now().date()
task_date = dt.date()
if task_date == today:
return "Today"
elif task_date == today + timedelta(days=1):
return "Tomorrow"
elif task_date == today - timedelta(days=1):
return "Yesterday"
else:
return dt.strftime("%Y-%m-%d")
except (ValueError, TypeError):
return str(date_str)
def get_priority_display(priority: int) -> Text:
"""
Get a rich Text object for priority display.
Args:
priority: Priority level (0-5)
Returns:
Rich Text object with colored priority
"""
if priority == 0:
return Text("", style="dim")
elif priority == 1:
return Text("!", style="blue")
elif priority == 2:
return Text("!!", style="yellow")
elif priority >= 3:
return Text("!!!", style="red bold")
else:
return Text("", style="dim")
def format_task_title(task: Dict[str, Any], max_length: int = 50) -> str:
"""
Format task title with truncation if needed.
Args:
task: Task dictionary
max_length: Maximum length for title
Returns:
Formatted title string
"""
title = task.get("title", "Untitled")
if len(title) > max_length:
return title[: max_length - 3] + "..."
return title
def create_task_table(tasks: List[Dict[str, Any]], show_project: bool = True) -> Table:
"""
Create a rich table for displaying tasks.
Args:
tasks: List of task dictionaries
show_project: Whether to show project column
Returns:
Rich Table object
"""
table = Table(show_header=True, header_style="bold magenta")
table.add_column("ID", style="dim", width=8)
table.add_column("Priority", width=8)
table.add_column("Title", style="white", min_width=30)
if show_project:
table.add_column("Project", style="cyan", width=15)
table.add_column("Due Date", style="yellow", width=12)
table.add_column("Tags", style="green", width=20)
for task in tasks:
task_id = str(task.get("id", ""))[:8]
priority_text = get_priority_display(task.get("priority", 0))
title = format_task_title(task)
due_date = format_date(task.get("dueDate"))
# Get project name (would need to be looked up from projects)
project_name = task.get("projectId", "Inbox")[:15] if show_project else None
# Format tags
tags_list = task.get("tags", [])
if isinstance(tags_list, list):
tags = ", ".join(tags_list[:3]) # Show max 3 tags
if len(tags_list) > 3:
tags += f" (+{len(tags_list) - 3})"
else:
tags = ""
if show_project:
table.add_row(task_id, priority_text, title, project_name, due_date, tags)
else:
table.add_row(task_id, priority_text, title, due_date, tags)
return table
def print_task_details(task: Dict[str, Any]):
"""
Print detailed view of a single task.
Args:
task: Task dictionary
"""
console.print(f"[bold cyan]Task Details[/bold cyan]")
console.print(f"ID: {task.get('id', 'N/A')}")
console.print(f"Title: [white]{task.get('title', 'Untitled')}[/white]")
if task.get("content"):
console.print(f"Description: {task.get('content')}")
console.print(f"Priority: {get_priority_display(task.get('priority', 0))}")
console.print(f"Project ID: {task.get('projectId', 'N/A')}")
if task.get("dueDate"):
console.print(f"Due Date: [yellow]{format_date(task.get('dueDate'))}[/yellow]")
if task.get("tags"):
console.print(f"Tags: [green]{', '.join(task.get('tags', []))}[/green]")
console.print(f"Status: {'Completed' if task.get('status') == 2 else 'Open'}")
if task.get("createdTime"):
console.print(f"Created: {format_date(task.get('createdTime'))}")
if task.get("modifiedTime"):
console.print(f"Modified: {format_date(task.get('modifiedTime'))}")
def open_task_in_browser(task_id: str):
"""
Open a task in the default web browser.
Args:
task_id: Task ID to open
"""
url = f"https://ticktick.com/webapp/#q/all/tasks/{task_id}"
webbrowser.open(url)
console.print(f"[green]Opened task in browser: {url}[/green]")
def open_task_in_macos_app(task_id: str) -> bool:
"""
Open a task in the TickTick macOS app.
Args:
task_id: Task ID to open
Returns:
True if successful, False otherwise
"""
if platform.system() != "Darwin":
console.print("[red]macOS app opening is only available on macOS[/red]")
return False
try:
# Try to open with TickTick URL scheme
ticktick_url = f"ticktick://task/{task_id}"
result = subprocess.run(
["open", ticktick_url], capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
console.print(f"[green]Opened task in TickTick app[/green]")
return True
else:
console.print(
"[yellow]TickTick app not found, opening in browser instead[/yellow]"
)
open_task_in_browser(task_id)
return False
except (subprocess.TimeoutExpired, subprocess.SubprocessError, FileNotFoundError):
console.print(
"[yellow]Failed to open TickTick app, opening in browser instead[/yellow]"
)
open_task_in_browser(task_id)
return False
def open_task(task_id: str, prefer_app: bool = True):
"""
Open a task in browser or app based on preference.
Args:
task_id: Task ID to open
prefer_app: Whether to prefer native app over browser
"""
if prefer_app and platform.system() == "Darwin":
if not open_task_in_macos_app(task_id):
open_task_in_browser(task_id)
else:
open_task_in_browser(task_id)
def parse_priority(priority_str: str) -> int:
"""
Parse priority string to integer.
Args:
priority_str: Priority as string (low, medium, high, none, or 0-5)
Returns:
Priority integer (0-5)
"""
if not priority_str:
return 0
priority_str = priority_str.lower().strip()
if priority_str in ["none", "no", "0"]:
return 0
elif priority_str in ["low", "1"]:
return 1
elif priority_str in ["medium", "med", "2"]:
return 2
elif priority_str in ["high", "3"]:
return 3
elif priority_str in ["very high", "urgent", "4"]:
return 4
elif priority_str in ["critical", "5"]:
return 5
else:
try:
priority = int(priority_str)
return max(0, min(5, priority))
except ValueError:
return 0
def validate_date(date_str: str) -> bool:
"""
Validate if a date string is parseable.
Args:
date_str: Date string to validate
Returns:
True if valid, False otherwise
"""
if date_str.lower() in ["today", "tomorrow", "yesterday"]:
return True
try:
date_parser.parse(date_str)
return True
except ValueError:
return False