Files
luk/src/services/dstask/client.py
2026-01-07 13:08:15 -05:00

400 lines
12 KiB
Python

"""dstask CLI client implementation.
This module implements the TaskBackend interface for dstask,
a local-first task manager with Git sync support.
"""
import json
import logging
import re
import shutil
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Optional

from src.tasks.backend import (
    Project,
    Task,
    TaskBackend,
    TaskPriority,
    TaskStatus,
)
logger = logging.getLogger(__name__)
class DstaskClient(TaskBackend):
    """Client for interacting with the dstask CLI.

    Every operation shells out to the dstask binary through
    ``_run_command``. Query methods parse the JSON the binary prints on
    stdout; mutating methods report success through the process exit code.
    """

    # dstask status string -> TaskStatus. Unknown strings fall back to
    # PENDING in _parse_task.
    _STATUS_MAP = {
        "pending": TaskStatus.PENDING,
        "active": TaskStatus.ACTIVE,
        "resolved": TaskStatus.DONE,
        "deleted": TaskStatus.DELETED,
    }

    # Fractional-seconds part of a timestamp (".123456789").
    _FRACTION_RE = re.compile(r"\.(\d+)")

    def __init__(self, dstask_path: Optional[str] = None):
        """Initialize dstask client.

        Args:
            dstask_path: Path to dstask binary. Defaults to ~/.local/bin/dstask
        """
        if dstask_path is None:
            dstask_path = str(Path.home() / ".local" / "bin" / "dstask")
        self.dstask_path = dstask_path

    def is_available(self) -> bool:
        """Check if dstask binary is available and executable.

        ``shutil.which`` verifies the execute permission and, for a bare
        command name, searches ``PATH`` the same way ``subprocess`` will
        when the command is actually run.
        """
        return shutil.which(self.dstask_path) is not None

    def _run_command(
        self, args: list[str], capture_output: bool = True
    ) -> subprocess.CompletedProcess:
        """Run a dstask command.

        Args:
            args: Command arguments (without dstask binary)
            capture_output: Whether to capture stdout/stderr

        Returns:
            CompletedProcess result. If the binary cannot be launched at
            all (missing, not executable), a synthetic result with
            returncode 127 is returned so callers only ever need to
            inspect ``returncode``.
        """
        cmd = [self.dstask_path, *args]
        logger.debug(f"Running: {' '.join(cmd)}")
        try:
            result = subprocess.run(
                cmd,
                capture_output=capture_output,
                text=True,
            )
        except OSError as exc:
            # Every caller is written around returncode checks; surface a
            # launch failure the same way instead of letting it escape.
            logger.error(f"Failed to execute dstask: {exc}")
            return subprocess.CompletedProcess(
                args=cmd,
                returncode=127,
                stdout="",
                stderr=str(exc),
            )
        if result.returncode != 0:
            logger.warning(f"dstask command failed: {result.stderr}")
        return result

    @staticmethod
    def _decode_json_list(payload: Optional[str]) -> list[dict]:
        """Decode dstask stdout that is expected to hold a JSON array of objects.

        Args:
            payload: Raw stdout text.

        Returns:
            The decoded objects. Returns an empty list when the text is not
            valid JSON or the top-level value is not a list (e.g. ``null``);
            entries that are not JSON objects are dropped.
        """
        try:
            decoded = json.loads(payload)
        except (json.JSONDecodeError, TypeError) as e:
            logger.error(f"Failed to parse dstask output: {e}")
            return []
        if not isinstance(decoded, list):
            return []
        return [entry for entry in decoded if isinstance(entry, dict)]

    def _parse_datetime(self, value: Optional[str]) -> Optional[datetime]:
        """Parse datetime from dstask JSON format.

        Args:
            value: Timestamp string; empty/None means "not set".

        Returns:
            The parsed datetime, or None when the value is empty, is not a
            string, cannot be parsed, or falls in year 1.
        """
        if not value or not isinstance(value, str):
            return None
        # dstask uses RFC3339 format
        text = value
        # fromisoformat() only understands the Z suffix from Python 3.11 on.
        if text.endswith(("Z", "z")):
            text = text[:-1] + "+00:00"
        # RFC3339 allows any number of fractional digits, but before
        # Python 3.11 fromisoformat() accepts exactly 3 or 6. Normalise to
        # microseconds so parsing works on every supported interpreter.
        text = self._FRACTION_RE.sub(
            lambda m: "." + m.group(1)[:6].ljust(6, "0"), text, count=1
        )
        try:
            dt = datetime.fromisoformat(text)
        except ValueError:
            logger.warning(f"Failed to parse datetime: {value}")
            return None
        # dstask uses year 1 (0001-01-01) to indicate no date set
        if dt.year == 1:
            return None
        return dt

    def _parse_task(self, data: dict) -> Task:
        """Parse a task from dstask JSON output.

        Missing or null fields fall back to neutral defaults: pending
        status, "P2" priority, empty strings, an empty tag list and id 0.
        """
        status = self._STATUS_MAP.get(
            data.get("status", "pending"), TaskStatus.PENDING
        )
        # `or` rather than a .get() default so an explicit JSON null is
        # treated the same as an absent key.
        priority = TaskPriority.from_string(data.get("priority") or "P2")
        return Task(
            uuid=data.get("uuid") or "",
            id=data.get("id", 0),
            summary=data.get("summary") or "",
            status=status,
            priority=priority,
            project=data.get("project") or "",
            tags=data.get("tags") or [],
            notes=data.get("notes") or "",
            due=self._parse_datetime(data.get("due", "")),
            created=self._parse_datetime(data.get("created", "")),
            resolved=self._parse_datetime(data.get("resolved", "")),
        )

    def _get_tasks_json(self, command: str = "show-open") -> list[Task]:
        """Get tasks using a dstask command that outputs JSON.

        Args:
            command: dstask sub-command to run.

        Returns:
            Parsed tasks, or an empty list if the command fails or its
            output cannot be decoded.
        """
        result = self._run_command([command])
        if result.returncode != 0:
            logger.error(f"Failed to get tasks: {result.stderr}")
            return []
        return [
            self._parse_task(entry)
            for entry in self._decode_json_list(result.stdout)
        ]

    def get_tasks(
        self,
        project: Optional[str] = None,
        tags: Optional[list[str]] = None,
        status: Optional[TaskStatus] = None,
    ) -> list[Task]:
        """Get tasks, optionally filtered by project, tags, or status.

        Project and tag filters are passed to ``dstask show-open``; the
        status filter is applied afterwards to whatever that command
        returned.

        Returns:
            Matching tasks, or an empty list if the command fails or its
            output cannot be decoded.
        """
        args = ["show-open"]
        if project:
            args.append(f"project:{project}")
        if tags:
            args.extend(f"+{tag}" for tag in tags)
        result = self._run_command(args)
        if result.returncode != 0:
            return []
        tasks = [
            self._parse_task(entry)
            for entry in self._decode_json_list(result.stdout)
        ]
        # `is not None` so a falsy enum member would still filter.
        if status is not None:
            tasks = [task for task in tasks if task.status == status]
        return tasks

    def get_next_tasks(self) -> list[Task]:
        """Get the 'next' tasks to work on."""
        return self._get_tasks_json("next")

    def get_task(self, task_id: str) -> Optional[Task]:
        """Get a single task by ID or UUID.

        Only the tasks listed by ``dstask show-open`` are searched.

        Returns:
            The matching task, or None if it is not found or the command
            fails.
        """
        result = self._run_command(["show-open"])
        if result.returncode != 0:
            return None
        # Callers sometimes hold the numeric id as an int; compare as text.
        wanted = str(task_id)
        for entry in self._decode_json_list(result.stdout):
            if str(entry.get("id")) == wanted or entry.get("uuid") == wanted:
                return self._parse_task(entry)
        return None

    def add_task(
        self,
        summary: str,
        project: Optional[str] = None,
        tags: Optional[list[str]] = None,
        priority: Optional[TaskPriority] = None,
        due: Optional[datetime] = None,
        notes: Optional[str] = None,
    ) -> Task:
        """Create a new task.

        Notes are added using dstask's / syntax during creation, where
        everything after / becomes the note content. Each word must be
        a separate argument for this to work.

        Returns:
            The created task as reported by ``get_next_tasks`` (matched by
            summary, best effort), or a placeholder Task with an empty
            uuid and id 0 built from the arguments when no match is found.

        Raises:
            RuntimeError: If the dstask ``add`` command fails.
        """
        args = ["add", summary]
        if project:
            args.append(f"project:{project}")
        if tags:
            args.extend(f"+{tag}" for tag in tags)
        if priority is not None:
            args.append(priority.value)
        if due is not None:
            # dstask uses various date formats
            args.append(f"due:{due.strftime('%Y-%m-%d')}")
        # Add notes using / syntax - each word must be a separate argument
        # dstask interprets everything after "/" as note content
        if notes:
            args.append("/")
            args.extend(notes.split())
        result = self._run_command(args)
        if result.returncode != 0:
            raise RuntimeError(f"Failed to add task: {result.stderr}")
        # The add command's output is not parsed for the new id, so look
        # the task up by summary, scanning from the end of the list.
        for task in reversed(self.get_next_tasks()):
            if task.summary == summary:
                return task
        # Return a placeholder if we can't find it
        return Task(
            uuid="",
            id=0,
            summary=summary,
            project=project or "",
            tags=tags or [],
            priority=priority or TaskPriority.P2,
            notes=notes or "",
            due=due,
        )

    def complete_task(self, task_id: str) -> bool:
        """Mark a task as complete. Returns True if the command succeeded."""
        return self._run_command(["done", task_id]).returncode == 0

    def delete_task(self, task_id: str) -> bool:
        """Delete a task. Returns True if the command succeeded."""
        # dstask uses 'remove' for deletion
        return self._run_command(["remove", task_id]).returncode == 0

    def start_task(self, task_id: str) -> bool:
        """Start working on a task (mark as active)."""
        return self._run_command(["start", task_id]).returncode == 0

    def stop_task(self, task_id: str) -> bool:
        """Stop working on a task (mark as pending)."""
        return self._run_command(["stop", task_id]).returncode == 0

    def modify_task(
        self,
        task_id: str,
        summary: Optional[str] = None,
        project: Optional[str] = None,
        tags: Optional[list[str]] = None,
        priority: Optional[TaskPriority] = None,
        due: Optional[datetime] = None,
        notes: Optional[str] = None,
    ) -> bool:
        """Modify a task.

        Field changes go through ``dstask modify``; non-empty notes are
        then sent with a separate ``dstask note`` call.

        Returns:
            True only if every command that was run succeeded.
        """
        args = ["modify", task_id]
        if summary:
            args.append(summary)
        if project:
            args.append(f"project:{project}")
        if tags:
            args.extend(f"+{tag}" for tag in tags)
        if priority is not None:
            args.append(priority.value)
        if due is not None:
            args.append(f"due:{due.strftime('%Y-%m-%d')}")
        if self._run_command(args).returncode != 0:
            return False
        # Handle notes separately. `note` without text is the interactive
        # editor form (see edit_note_interactive), so an empty string is
        # skipped rather than risk starting an editor under captured output.
        if notes:
            note_result = self._run_command(["note", task_id, notes])
            return note_result.returncode == 0
        return True

    def get_projects(self) -> list[Project]:
        """Get all projects.

        Returns:
            Projects parsed from ``dstask show-projects``, or an empty list
            if the command fails or its output cannot be decoded.
        """
        result = self._run_command(["show-projects"])
        if result.returncode != 0:
            return []
        return [
            Project(
                name=entry.get("name", ""),
                task_count=entry.get("taskCount", 0),
                resolved_count=entry.get("resolvedCount", 0),
                active=entry.get("active", True),
                priority=TaskPriority.from_string(entry.get("priority") or "P2"),
            )
            for entry in self._decode_json_list(result.stdout)
        ]

    def get_tags(self) -> list[str]:
        """Get all tags.

        Returns:
            Non-empty, whitespace-trimmed lines of ``dstask show-tags``
            output, or an empty list if the command fails.
        """
        result = self._run_command(["show-tags"])
        if result.returncode != 0:
            return []
        # show-tags outputs plain text, one tag per line
        stripped = (line.strip() for line in result.stdout.splitlines())
        return [tag for tag in stripped if tag]

    def sync(self) -> bool:
        """Sync tasks with Git remote."""
        return self._run_command(["sync"]).returncode == 0

    def edit_task_interactive(self, task_id: str) -> bool:
        """Open task in editor for interactive editing."""
        # This needs to run without capturing output
        result = self._run_command(["edit", task_id], capture_output=False)
        return result.returncode == 0

    def edit_note_interactive(self, task_id: str) -> bool:
        """Open task notes in editor for interactive editing."""
        # This needs to run without capturing output
        result = self._run_command(["note", task_id], capture_output=False)
        return result.returncode == 0

    def get_context(self) -> Optional[str]:
        """Get the current context filter.

        Returns:
            The trimmed output of ``dstask context``, or None if that
            output is empty or the command fails.
        """
        result = self._run_command(["context"])
        if result.returncode != 0:
            return None
        return result.stdout.strip() or None

    def set_context(self, context: Optional[str]) -> bool:
        """Set the context filter.

        Args:
            context: Context string (e.g., "+work", "project:foo") or None to clear

        Returns:
            True if successful
        """
        # None, "" and any casing of "none" all mean "clear the context".
        clearing = not context or context.lower() == "none"
        target = "none" if clearing else context
        return self._run_command(["context", target]).returncode == 0

    def get_contexts(self) -> list[str]:
        """Get available contexts based on tags.

        For dstask, contexts are typically tag-based filters like "+work".
        We derive available contexts from the existing tags.

        Returns:
            List of context strings (tag-based)
        """
        return [f"+{tag}" for tag in self.get_tags() if tag]