"""dstask CLI client implementation.

This module implements the TaskBackend interface for dstask,
a local-first task manager with Git sync support.
"""

import json
import logging
import re
import shutil
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Optional

from src.tasks.backend import (
    Project,
    Task,
    TaskBackend,
    TaskPriority,
    TaskStatus,
)

logger = logging.getLogger(__name__)

# Fractional-seconds part of a timestamp ("." followed by digits).
_FRACTION_RE = re.compile(r"\.(\d+)")


class DstaskClient(TaskBackend):
    """Client for interacting with dstask CLI."""

    # dstask status strings mapped onto the backend-neutral TaskStatus.
    _STATUS_BY_NAME = {
        "pending": TaskStatus.PENDING,
        "active": TaskStatus.ACTIVE,
        "resolved": TaskStatus.DONE,
        "deleted": TaskStatus.DELETED,
    }

    def __init__(self, dstask_path: Optional[str] = None):
        """Initialize dstask client.

        Args:
            dstask_path: Path to dstask binary. Defaults to ~/.local/bin/dstask
        """
        if dstask_path is None:
            dstask_path = str(Path.home() / ".local" / "bin" / "dstask")
        self.dstask_path = dstask_path

    def is_available(self) -> bool:
        """Check if dstask binary is available and executable.

        Returns:
            True when ``dstask_path`` resolves to an executable file. A path
            with a directory component is checked directly; a bare command
            name is looked up on PATH, mirroring how ``subprocess`` would
            resolve it.
        """
        return shutil.which(self.dstask_path) is not None

    def _run_command(
        self, args: list[str], capture_output: bool = True
    ) -> subprocess.CompletedProcess:
        """Run a dstask command.

        Args:
            args: Command arguments (without dstask binary)
            capture_output: Whether to capture stdout/stderr

        Returns:
            CompletedProcess result. A non-zero exit status is logged as a
            warning but not raised; callers inspect ``returncode``.

        Raises:
            OSError: Propagated from ``subprocess.run`` when the binary cannot
                be started (for example, it does not exist).
        """
        cmd = [self.dstask_path, *args]
        logger.debug("Running: %s", " ".join(cmd))
        result = subprocess.run(
            cmd,
            capture_output=capture_output,
            text=True,
        )
        if result.returncode != 0:
            # stderr is None when output is not captured; fall back to the
            # exit code so the log line is still informative.
            detail = (result.stderr or "").strip()
            if not detail:
                detail = f"exit code {result.returncode}"
            logger.warning("dstask command failed: %s", detail)
        return result

    def _load_json(self, args: list[str]) -> list[dict]:
        """Run a dstask command and decode its stdout as a JSON array.

        Args:
            args: Command arguments (without dstask binary)

        Returns:
            The decoded list of JSON objects. An empty list is returned when
            the command fails, the output is not valid JSON, or the payload is
            not an array (including JSON ``null``). Non-object array entries
            are dropped.
        """
        result = self._run_command(args)
        if result.returncode != 0:
            logger.error("Failed to get %s output: %s", args[0], result.stderr)
            return []

        try:
            payload = json.loads(result.stdout)
        except json.JSONDecodeError as e:
            logger.error("Failed to parse dstask output: %s", e)
            return []

        if payload is None:
            return []
        if not isinstance(payload, list):
            logger.error(
                "Unexpected dstask output: expected a JSON array, got %s",
                type(payload).__name__,
            )
            return []
        return [entry for entry in payload if isinstance(entry, dict)]

    @staticmethod
    def _attribute_args(
        project: Optional[str] = None,
        tags: Optional[list[str]] = None,
        priority: Optional[TaskPriority] = None,
        due: Optional[datetime] = None,
    ) -> list[str]:
        """Translate task attributes into dstask command-line operands.

        Args:
            project: Project name, emitted as ``project:<name>``
            tags: Tag names, each emitted as ``+<tag>``
            priority: Priority, emitted as its enum ``value``
            due: Due date, emitted as ``due:YYYY-MM-DD``

        Returns:
            Operands for every attribute that was supplied, in the order
            project, tags, priority, due.
        """
        operands = []
        if project:
            operands.append(f"project:{project}")
        operands.extend(f"+{tag}" for tag in tags or [])
        if priority:
            operands.append(priority.value)
        if due:
            # dstask uses various date formats
            operands.append(f"due:{due.strftime('%Y-%m-%d')}")
        return operands

    def _parse_datetime(self, value: str) -> Optional[datetime]:
        """Parse datetime from dstask JSON format.

        Args:
            value: Timestamp string (dstask uses RFC3339 format)

        Returns:
            The parsed datetime, or None when the value is empty, not a
            string, unparseable, or the year-1 "no date" marker.
        """
        if not value or not isinstance(value, str):
            return None

        text = value.strip()
        # Handle Z suffix
        if text.endswith(("Z", "z")):
            text = text[:-1] + "+00:00"
        # Before Python 3.11, fromisoformat() only accepts 3 or 6 fractional
        # digits; normalize to exactly 6 (truncating extra precision) so
        # timestamps with other precisions parse on every version.
        text = _FRACTION_RE.sub(
            lambda match: "." + match.group(1)[:6].ljust(6, "0"),
            text,
            count=1,
        )

        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            logger.warning("Failed to parse datetime: %s", value)
            return None

        # dstask uses year 1 (0001-01-01) to indicate no date set
        if parsed.year == 1:
            return None
        return parsed

    def _parse_task(self, data: dict) -> Task:
        """Parse a task from dstask JSON output.

        Args:
            data: One decoded JSON object describing a task

        Returns:
            A Task. Keys that are missing or explicitly null fall back to
            neutral defaults (pending status, "P2" priority, empty strings
            and lists, id 0).
        """
        # ``or`` (rather than a .get default) also covers explicit JSON nulls.
        status = self._STATUS_BY_NAME.get(
            data.get("status") or "pending", TaskStatus.PENDING
        )
        priority = TaskPriority.from_string(data.get("priority") or "P2")

        return Task(
            uuid=data.get("uuid") or "",
            id=data.get("id") or 0,
            summary=data.get("summary") or "",
            status=status,
            priority=priority,
            project=data.get("project") or "",
            tags=data.get("tags") or [],
            notes=data.get("notes") or "",
            due=self._parse_datetime(data.get("due") or ""),
            created=self._parse_datetime(data.get("created") or ""),
            resolved=self._parse_datetime(data.get("resolved") or ""),
        )

    def _get_tasks_json(self, command: str = "show-open") -> list[Task]:
        """Get tasks using a dstask command that outputs JSON.

        Args:
            command: dstask sub-command to run

        Returns:
            Parsed tasks, or an empty list when the command fails or its
            output cannot be decoded.
        """
        return [self._parse_task(entry) for entry in self._load_json([command])]

    def get_tasks(
        self,
        project: Optional[str] = None,
        tags: Optional[list[str]] = None,
        status: Optional[TaskStatus] = None,
    ) -> list[Task]:
        """Get tasks, optionally filtered by project, tags, or status.

        Project and tag filters are passed to ``show-open``; the status
        filter is applied afterwards to whatever that command returned.

        Args:
            project: Only return tasks in this project
            tags: Only return tasks carrying all of these tags
            status: Only return tasks with this status

        Returns:
            Matching tasks, or an empty list on failure.
        """
        args = ["show-open", *self._attribute_args(project, tags)]
        tasks = [self._parse_task(entry) for entry in self._load_json(args)]

        # Filter by status if specified
        if status:
            tasks = [task for task in tasks if task.status == status]
        return tasks

    def get_next_tasks(self) -> list[Task]:
        """Get the 'next' tasks to work on."""
        return self._get_tasks_json("next")

    def get_task(self, task_id: str) -> Optional[Task]:
        """Get a single task by ID or UUID.

        Only tasks listed by ``show-open`` are searched.

        Args:
            task_id: Numeric ID (as a string) or UUID of the task

        Returns:
            The matching task, or None when it is not found or the lookup
            fails.
        """
        wanted = str(task_id)
        for entry in self._load_json(["show-open"]):
            if str(entry.get("id")) == wanted or entry.get("uuid") == wanted:
                return self._parse_task(entry)
        return None

    def add_task(
        self,
        summary: str,
        project: Optional[str] = None,
        tags: Optional[list[str]] = None,
        priority: Optional[TaskPriority] = None,
        due: Optional[datetime] = None,
        notes: Optional[str] = None,
    ) -> Task:
        """Create a new task.

        Args:
            summary: Task summary text
            project: Project to file the task under
            tags: Tags to attach
            priority: Task priority
            due: Due date (only the date part is sent to dstask)
            notes: Note text to attach once the task has been created

        Returns:
            The created task as re-read from dstask (matched by summary, best
            effort), or a placeholder with empty uuid and id 0 when it cannot
            be located.

        Raises:
            RuntimeError: If the ``add`` command fails.
        """
        args = ["add", summary, *self._attribute_args(project, tags, priority, due)]
        result = self._run_command(args)
        if result.returncode != 0:
            raise RuntimeError(f"Failed to add task: {result.stderr}")

        # Get the newly created task (it should be the last one), matching by
        # summary as a best effort.
        for task in reversed(self.get_next_tasks()):
            if task.summary != summary:
                continue
            # Add notes if provided
            if notes:
                note_result = self._run_command(["note", str(task.id), notes])
                # Only report the notes on the task if they were stored.
                if note_result.returncode == 0:
                    task.notes = notes
            return task

        # Return a placeholder if we can't find it
        logger.warning("Could not locate newly added task: %s", summary)
        return Task(
            uuid="",
            id=0,
            summary=summary,
            project=project or "",
            tags=tags or [],
            priority=priority or TaskPriority.P2,
            notes=notes or "",
            due=due,
        )

    def complete_task(self, task_id: str) -> bool:
        """Mark a task as complete.

        Returns:
            True if the ``done`` command exited successfully.
        """
        return self._run_command(["done", task_id]).returncode == 0

    def delete_task(self, task_id: str) -> bool:
        """Delete a task.

        Returns:
            True if the command exited successfully.
        """
        # dstask uses 'remove' for deletion
        return self._run_command(["remove", task_id]).returncode == 0

    def start_task(self, task_id: str) -> bool:
        """Start working on a task (mark as active).

        Returns:
            True if the ``start`` command exited successfully.
        """
        return self._run_command(["start", task_id]).returncode == 0

    def stop_task(self, task_id: str) -> bool:
        """Stop working on a task (mark as pending).

        Returns:
            True if the ``stop`` command exited successfully.
        """
        return self._run_command(["stop", task_id]).returncode == 0

    def modify_task(
        self,
        task_id: str,
        summary: Optional[str] = None,
        project: Optional[str] = None,
        tags: Optional[list[str]] = None,
        priority: Optional[TaskPriority] = None,
        due: Optional[datetime] = None,
        notes: Optional[str] = None,
    ) -> bool:
        """Modify a task.

        Args:
            task_id: ID of the task to modify
            summary: New summary text
            project: New project
            tags: Tags to add
            priority: New priority
            due: New due date (only the date part is sent to dstask)
            notes: Note text to attach via the separate ``note`` command;
                an empty string is treated as "no notes"

        Returns:
            True only if every command that was run exited successfully.
        """
        changes = [summary] if summary else []
        changes.extend(self._attribute_args(project, tags, priority, due))

        # For a notes-only update, skip the operand-less ``modify`` call so a
        # failure there cannot block the note.
        # NOTE(review): dstask appears to reject ``modify`` without operands
        # - confirm. When nothing at all was requested the bare command is
        # still issued, leaving the verdict to dstask as before.
        if changes or not notes:
            result = self._run_command(["modify", task_id, *changes])
            if result.returncode != 0:
                return False

        # Handle notes separately. Empty text is skipped, matching add_task.
        # NOTE(review): an empty note argument may make dstask open an
        # editor - confirm.
        if notes:
            return self._run_command(["note", task_id, notes]).returncode == 0
        return True

    def get_projects(self) -> list[Project]:
        """Get all projects.

        Returns:
            Projects parsed from the ``show-projects`` JSON output, or an
            empty list on failure.
        """
        return [
            Project(
                name=entry.get("name", ""),
                task_count=entry.get("taskCount", 0),
                resolved_count=entry.get("resolvedCount", 0),
                active=entry.get("active", True),
                priority=TaskPriority.from_string(entry.get("priority") or "P2"),
            )
            for entry in self._load_json(["show-projects"])
        ]

    def get_tags(self) -> list[str]:
        """Get all tags.

        Returns:
            Tag names with surrounding whitespace removed, or an empty list
            on failure.
        """
        result = self._run_command(["show-tags"])
        if result.returncode != 0:
            return []

        # show-tags outputs plain text, one tag per line
        stripped = (line.strip() for line in result.stdout.splitlines())
        return [tag for tag in stripped if tag]

    def sync(self) -> bool:
        """Sync tasks with Git remote.

        Returns:
            True if the ``sync`` command exited successfully.
        """
        return self._run_command(["sync"]).returncode == 0

    def edit_task_interactive(self, task_id: str) -> bool:
        """Open task in editor for interactive editing.

        Returns:
            True if the ``edit`` command exited successfully.
        """
        # This needs to run without capturing output
        result = self._run_command(["edit", task_id], capture_output=False)
        return result.returncode == 0