diff --git a/.coverage b/.coverage index eca4f3d..c5909dd 100644 Binary files a/.coverage and b/.coverage differ diff --git a/src/cli/__init__.py b/src/cli/__init__.py index a456b5d..6d8a5af 100644 --- a/src/cli/__init__.py +++ b/src/cli/__init__.py @@ -9,6 +9,7 @@ from .calendar import calendar from .ticktick import ticktick from .godspeed import godspeed from .gitlab_monitor import gitlab_monitor +from .tasks import tasks @click.group() @@ -24,6 +25,7 @@ cli.add_command(calendar) cli.add_command(ticktick) cli.add_command(godspeed) cli.add_command(gitlab_monitor) +cli.add_command(tasks) # Add 'mail' as an alias for email cli.add_command(email, name="mail") diff --git a/src/cli/sync_dashboard.py b/src/cli/sync_dashboard.py index 439f542..821ccb2 100644 --- a/src/cli/sync_dashboard.py +++ b/src/cli/sync_dashboard.py @@ -298,6 +298,7 @@ class SyncDashboard(App): TaskListItem("calendar", "Calendar Sync", id="task-calendar"), # Stage 3: Task management TaskListItem("godspeed", "Godspeed Sync", id="task-godspeed"), + TaskListItem("dstask", "dstask Sync", id="task-dstask"), TaskListItem("sweep", "Task Sweep", id="task-sweep"), id="task-list", ) @@ -871,6 +872,14 @@ async def run_dashboard_sync( await asyncio.sleep(0.3) tracker.complete_task("godspeed", "42 tasks synced") + # dstask sync + tracker.start_task("dstask", 100) + tracker.update_task("dstask", 30, "Running dstask sync...") + await asyncio.sleep(0.3) + tracker.update_task("dstask", 70, "Pushing changes...") + await asyncio.sleep(0.2) + tracker.complete_task("dstask", "Sync completed") + # Task sweep tracker.start_task("sweep") tracker.update_task("sweep", 50, "Scanning notes directory...") @@ -1084,6 +1093,24 @@ async def run_dashboard_sync( else: tracker.skip_task("godspeed", "Not due yet (every 15 min)") + # dstask sync + tracker.start_task("dstask", 100) + try: + from src.services.dstask.client import DstaskClient + + dstask_client = DstaskClient() + if dstask_client.is_available(): + tracker.update_task("dstask", 30, "Running dstask sync...") + success = dstask_client.sync() + if success: + tracker.complete_task("dstask", "Sync completed") + else: + tracker.error_task("dstask", "Sync failed") + else: + tracker.skip_task("dstask", "dstask not installed") + except Exception as e: + tracker.error_task("dstask", str(e)) + # Task sweep (runs once daily after 6 PM) tracker.start_task("sweep", 100) if should_run_sweep(): @@ -1133,6 +1160,7 @@ async def run_dashboard_sync( "inbox", "calendar", "godspeed", + "dstask", "sweep", ]: if task_id in dashboard._task_items: diff --git a/src/cli/tasks.py b/src/cli/tasks.py new file mode 100644 index 0000000..2325985 --- /dev/null +++ b/src/cli/tasks.py @@ -0,0 +1,11 @@ +"""CLI command for Tasks TUI.""" + +import click + + +@click.command() +def tasks(): + """Launch the Tasks TUI for managing tasks via dstask.""" + from src.tasks import run_app + + run_app() diff --git a/src/services/dstask/__init__.py b/src/services/dstask/__init__.py new file mode 100644 index 0000000..b0c44ca --- /dev/null +++ b/src/services/dstask/__init__.py @@ -0,0 +1,5 @@ +"""dstask client service for Tasks TUI.""" + +from .client import DstaskClient + +__all__ = ["DstaskClient"] diff --git a/src/services/dstask/client.py b/src/services/dstask/client.py new file mode 100644 index 0000000..bcddb40 --- /dev/null +++ b/src/services/dstask/client.py @@ -0,0 +1,345 @@ +"""dstask CLI client implementation. + +This module implements the TaskBackend interface for dstask, +a local-first task manager with Git sync support. +""" + +import json +import logging +import subprocess +from datetime import datetime +from pathlib import Path +from typing import Optional + +from src.tasks.backend import ( + Project, + Task, + TaskBackend, + TaskPriority, + TaskStatus, +) + +logger = logging.getLogger(__name__) + + +class DstaskClient(TaskBackend): + """Client for interacting with dstask CLI.""" + + def __init__(self, dstask_path: Optional[str] = None): + """Initialize dstask client. + + Args: + dstask_path: Path to dstask binary. Defaults to ~/.local/bin/dstask + """ + if dstask_path is None: + dstask_path = str(Path.home() / ".local" / "bin" / "dstask") + self.dstask_path = dstask_path + + def is_available(self) -> bool: + """Check if dstask binary is available and executable.""" + path = Path(self.dstask_path) + return path.exists() and path.is_file() + + def _run_command( + self, args: list[str], capture_output: bool = True + ) -> subprocess.CompletedProcess: + """Run a dstask command. + + Args: + args: Command arguments (without dstask binary) + capture_output: Whether to capture stdout/stderr + + Returns: + CompletedProcess result + """ + cmd = [self.dstask_path] + args + logger.debug(f"Running: {' '.join(cmd)}") + + result = subprocess.run( + cmd, + capture_output=capture_output, + text=True, + ) + + if result.returncode != 0: + logger.warning(f"dstask command failed: {result.stderr}") + + return result + + def _parse_datetime(self, value: str) -> Optional[datetime]: + """Parse datetime from dstask JSON format.""" + if not value: + return None + + # dstask uses RFC3339 format + try: + # Handle Z suffix + if value.endswith("Z"): + value = value[:-1] + "+00:00" + dt = datetime.fromisoformat(value) + # dstask uses year 1 (0001-01-01) to indicate no date set + if dt.year == 1: + return None + return dt + except ValueError: + logger.warning(f"Failed to parse datetime: {value}") + return None + + def _parse_task(self, data: dict) -> Task: + """Parse a task from dstask JSON output.""" + # Map dstask status to TaskStatus + status_map = { + "pending": TaskStatus.PENDING, + "active": TaskStatus.ACTIVE, + "resolved": TaskStatus.DONE, + "deleted": TaskStatus.DELETED, + } + status = status_map.get(data.get("status", "pending"), TaskStatus.PENDING) + + # Parse priority + priority_str = data.get("priority", "P2") + priority = TaskPriority.from_string(priority_str) + + return Task( + uuid=data.get("uuid", ""), + id=data.get("id", 0), + summary=data.get("summary", ""), + status=status, + priority=priority, + project=data.get("project", ""), + tags=data.get("tags", []) or [], + notes=data.get("notes", ""), + due=self._parse_datetime(data.get("due", "")), + created=self._parse_datetime(data.get("created", "")), + resolved=self._parse_datetime(data.get("resolved", "")), + ) + + def _get_tasks_json(self, command: str = "show-open") -> list[Task]: + """Get tasks using a dstask command that outputs JSON.""" + result = self._run_command([command]) + + if result.returncode != 0: + logger.error(f"Failed to get tasks: {result.stderr}") + return [] + + try: + data = json.loads(result.stdout) + return [self._parse_task(t) for t in data] + except json.JSONDecodeError as e: + logger.error(f"Failed to parse dstask output: {e}") + return [] + + def get_tasks( + self, + project: Optional[str] = None, + tags: Optional[list[str]] = None, + status: Optional[TaskStatus] = None, + ) -> list[Task]: + """Get tasks, optionally filtered by project, tags, or status.""" + # Build filter arguments + args = ["show-open"] + + if project: + args.append(f"project:{project}") + + if tags: + for tag in tags: + args.append(f"+{tag}") + + result = self._run_command(args) + + if result.returncode != 0: + return [] + + try: + data = json.loads(result.stdout) + tasks = [self._parse_task(t) for t in data] + + # Filter by status if specified + if status: + tasks = [t for t in tasks if t.status == status] + + return tasks + except json.JSONDecodeError: + return [] + + def get_next_tasks(self) -> list[Task]: + """Get the 'next' tasks to work on.""" + return self._get_tasks_json("next") + + def get_task(self, task_id: str) -> Optional[Task]: + """Get a single task by ID or UUID.""" + result = self._run_command(["show-open"]) + + if result.returncode != 0: + return None + + try: + data = json.loads(result.stdout) + for t in data: + if str(t.get("id")) == task_id or t.get("uuid") == task_id: + return self._parse_task(t) + except json.JSONDecodeError: + pass + + return None + + def add_task( + self, + summary: str, + project: Optional[str] = None, + tags: Optional[list[str]] = None, + priority: Optional[TaskPriority] = None, + due: Optional[datetime] = None, + notes: Optional[str] = None, + ) -> Task: + """Create a new task.""" + args = ["add", summary] + + if project: + args.append(f"project:{project}") + + if tags: + for tag in tags: + args.append(f"+{tag}") + + if priority: + args.append(priority.value) + + if due: + # dstask uses various date formats + args.append(f"due:{due.strftime('%Y-%m-%d')}") + + result = self._run_command(args) + + if result.returncode != 0: + raise RuntimeError(f"Failed to add task: {result.stderr}") + + # Get the newly created task (it should be the last one) + tasks = self.get_next_tasks() + if tasks: + # Find task by summary (best effort) + for task in reversed(tasks): + if task.summary == summary: + # Add notes if provided + if notes: + self._run_command(["note", str(task.id), notes]) + task.notes = notes + return task + + # Return a placeholder if we can't find it + return Task( + uuid="", + id=0, + summary=summary, + project=project or "", + tags=tags or [], + priority=priority or TaskPriority.P2, + notes=notes or "", + due=due, + ) + + def complete_task(self, task_id: str) -> bool: + """Mark a task as complete.""" + result = self._run_command(["done", task_id]) + return result.returncode == 0 + + def delete_task(self, task_id: str) -> bool: + """Delete a task.""" + # dstask uses 'remove' for deletion + result = self._run_command(["remove", task_id]) + return result.returncode == 0 + + def start_task(self, task_id: str) -> bool: + """Start working on a task (mark as active).""" + result = self._run_command(["start", task_id]) + return result.returncode == 0 + + def stop_task(self, task_id: str) -> bool: + """Stop working on a task (mark as pending).""" + result = self._run_command(["stop", task_id]) + return result.returncode == 0 + + def modify_task( + self, + task_id: str, + summary: Optional[str] = None, + project: Optional[str] = None, + tags: Optional[list[str]] = None, + priority: Optional[TaskPriority] = None, + due: Optional[datetime] = None, + notes: Optional[str] = None, + ) -> bool: + """Modify a task.""" + args = ["modify", task_id] + + if summary: + args.append(summary) + + if project: + args.append(f"project:{project}") + + if tags: + for tag in tags: + args.append(f"+{tag}") + + if priority: + args.append(priority.value) + + if due: + args.append(f"due:{due.strftime('%Y-%m-%d')}") + + result = self._run_command(args) + + # Handle notes separately + if notes is not None and result.returncode == 0: + self._run_command(["note", task_id, notes]) + + return result.returncode == 0 + + def get_projects(self) -> list[Project]: + """Get all projects.""" + result = self._run_command(["show-projects"]) + + if result.returncode != 0: + return [] + + try: + data = json.loads(result.stdout) + projects = [] + for p in data: + priority = TaskPriority.from_string(p.get("priority", "P2")) + projects.append( + Project( + name=p.get("name", ""), + task_count=p.get("taskCount", 0), + resolved_count=p.get("resolvedCount", 0), + active=p.get("active", True), + priority=priority, + ) + ) + return projects + except json.JSONDecodeError: + return [] + + def get_tags(self) -> list[str]: + """Get all tags.""" + result = self._run_command(["show-tags"]) + + if result.returncode != 0: + return [] + + # show-tags outputs plain text, one tag per line + tags = result.stdout.strip().split("\n") + return [t.strip() for t in tags if t.strip()] + + def sync(self) -> bool: + """Sync tasks with Git remote.""" + result = self._run_command(["sync"]) + return result.returncode == 0 + + def edit_task_interactive(self, task_id: str) -> bool: + """Open task in editor for interactive editing.""" + # This needs to run without capturing output + result = self._run_command(["edit", task_id], capture_output=False) + return result.returncode == 0 diff --git a/src/tasks/__init__.py b/src/tasks/__init__.py new file mode 100644 index 0000000..664a6e4 --- /dev/null +++ b/src/tasks/__init__.py @@ -0,0 +1,18 @@ +"""Tasks TUI module for managing tasks via dstask/taskwarrior.""" + +from .config import TasksAppConfig, get_config, reload_config +from .backend import Task, TaskBackend, TaskPriority, TaskStatus, Project +from .app import TasksApp, run_app + +__all__ = [ + "TasksAppConfig", + "get_config", + "reload_config", + "Task", + "TaskBackend", + "TaskPriority", + "TaskStatus", + "Project", + "TasksApp", + "run_app", +] diff --git a/src/tasks/app.py b/src/tasks/app.py new file mode 100644 index 0000000..8544dd1 --- /dev/null +++ b/src/tasks/app.py @@ -0,0 +1,476 @@ +"""Tasks TUI application. + +A Textual-based TUI for managing tasks via dstask/taskwarrior. +""" + +import logging +import sys +import os +from typing import Optional + +from textual.app import App, ComposeResult +from textual.binding import Binding +from textual.logging import TextualHandler +from textual.widgets import DataTable, Footer, Header, Static + +from .config import get_config, TasksAppConfig +from .backend import Task, TaskBackend, TaskPriority, TaskStatus, Project + +# Add the parent directory to the system path to resolve relative imports +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +logging.basicConfig( + level="NOTSET", + handlers=[TextualHandler()], +) + +logger = logging.getLogger(__name__) + + +class TasksStatusBar(Static): + """Status bar showing task counts and current filters.""" + + total_tasks: int = 0 + active_filters: list[str] = [] + + def render(self) -> str: + filter_text = " | ".join(self.active_filters) if self.active_filters else "All" + return f"Tasks: {self.total_tasks} | Filter: {filter_text}" + + +class TasksApp(App): + """A TUI for managing tasks via dstask/taskwarrior.""" + + CSS = """ + Screen { + layout: grid; + grid-size: 1; + grid-rows: auto 1fr auto auto; + } + + #task-table { + height: 100%; + } + + DataTable > .datatable--cursor { + background: $accent; + color: $text; + } + + .priority-p0 { + color: red; + } + + .priority-p1 { + color: orange; + } + + .priority-p2 { + color: yellow; + } + + .priority-p3 { + color: gray; + } + + .overdue { + color: red; + text-style: bold; + } + + .status-active { + color: cyan; + text-style: bold; + } + + #status-bar { + dock: bottom; + height: 1; + background: $surface; + color: $text-muted; + padding: 0 1; + } + """ + + BINDINGS = [ + Binding("q", "quit", "Quit", show=True), + Binding("j", "cursor_down", "Down", show=False), + Binding("k", "cursor_up", "Up", show=False), + Binding("g", "first_task", "First", show=False), + Binding("G", "last_task", "Last", show=False), + Binding("d", "complete_task", "Done", show=True), + Binding("s", "start_task", "Start", show=True), + Binding("S", "stop_task", "Stop", show=False), + Binding("a", "add_task", "Add", show=True), + Binding("e", "edit_task", "Edit", show=True), + Binding("x", "delete_task", "Delete", show=False), + Binding("p", "filter_project", "Project", show=True), + Binding("t", "filter_tag", "Tag", show=True), + Binding("c", "clear_filters", "Clear", show=True), + Binding("r", "refresh", "Refresh", show=True), + Binding("y", "sync", "Sync", show=True), + Binding("?", "help", "Help", show=True), + Binding("enter", "view_task", "View", show=False), + ] + + # Class-level type hints (instance variables initialized in __init__) + tasks: list[Task] + projects: list[Project] + tags: list[str] + current_project_filter: Optional[str] + current_tag_filters: list[str] + backend: Optional[TaskBackend] + config: Optional[TasksAppConfig] + + def __init__(self, backend: Optional[TaskBackend] = None): + super().__init__() + # Initialize instance variables + self.tasks = [] + self.projects = [] + self.tags = [] + self.current_project_filter = None + self.current_tag_filters = [] + self.config = get_config() + + if backend: + self.backend = backend + else: + # Create backend from config + from src.services.dstask import DstaskClient + + self.backend = DstaskClient(self.config.backend.dstask_path) + + def compose(self) -> ComposeResult: + """Create the app layout.""" + yield Header() + yield DataTable(id="task-table", cursor_type="row") + yield TasksStatusBar(id="status-bar") + yield Footer() + + def on_mount(self) -> None: + """Initialize the app on mount.""" + table = self.query_one("#task-table", DataTable) + + # Setup columns based on config + columns = ( + self.config.display.columns + if self.config + else ["id", "priority", "project", "tags", "summary", "due"] + ) + + for col in columns: + width = None + if self.config and col in self.config.display.column_widths: + w = self.config.display.column_widths[col] + if w > 0: + width = w + table.add_column(col.capitalize(), width=width, key=col) + + # Load tasks + self.load_tasks() + + def _format_priority(self, priority: TaskPriority) -> str: + """Format priority with icon.""" + if not self.config: + return priority.value + + icons = self.config.icons + icon_map = { + TaskPriority.P0: icons.priority_p0, + TaskPriority.P1: icons.priority_p1, + TaskPriority.P2: icons.priority_p2, + TaskPriority.P3: icons.priority_p3, + } + return f"{icon_map.get(priority, '')} {priority.value}" + + def _format_tags(self, tags: list[str]) -> str: + """Format tags list.""" + if not tags: + return "" + return " ".join(f"+{t}" for t in tags[:3]) # Show max 3 tags + + def _format_due(self, task: Task) -> str: + """Format due date.""" + if not task.due: + return "" + + date_str = task.due.strftime( + self.config.display.date_format if self.config else "%Y-%m-%d" + ) + + if task.is_overdue: + return f"! {date_str}" + return date_str + + def _get_row_data(self, task: Task) -> list[str]: + """Get row data for a task based on configured columns.""" + columns = ( + self.config.display.columns + if self.config + else ["id", "priority", "project", "tags", "summary", "due"] + ) + + data = [] + for col in columns: + if col == "id": + data.append(str(task.id)) + elif col == "priority": + data.append(self._format_priority(task.priority)) + elif col == "project": + data.append(task.project or "") + elif col == "tags": + data.append(self._format_tags(task.tags)) + elif col == "summary": + data.append(task.summary) + elif col == "due": + data.append(self._format_due(task)) + elif col == "status": + data.append(task.status.value) + else: + data.append("") + return data + + def load_tasks(self) -> None: + """Load tasks from backend.""" + if not self.backend: + return + + # Get tasks with current filters + self.tasks = self.backend.get_tasks( + project=self.current_project_filter, + tags=self.current_tag_filters if self.current_tag_filters else None, + ) + + # Also load projects and tags for filtering + self.projects = self.backend.get_projects() + self.tags = self.backend.get_tags() + + # Update table + self._update_table() + + def _update_table(self) -> None: + """Update the task table with current tasks.""" + table = self.query_one("#task-table", DataTable) + table.clear() + + for task in self.tasks: + row_data = self._get_row_data(task) + table.add_row(*row_data, key=str(task.id)) + + # Update status bar + status_bar = self.query_one("#status-bar", TasksStatusBar) + status_bar.total_tasks = len(self.tasks) + + filters = [] + if self.current_project_filter: + filters.append(f"project:{self.current_project_filter}") + for tag in self.current_tag_filters: + filters.append(f"+{tag}") + status_bar.active_filters = filters + status_bar.refresh() + + def _get_selected_task(self) -> Optional[Task]: + """Get the currently selected task.""" + table = self.query_one("#task-table", DataTable) + if table.cursor_row is None or table.cursor_row >= len(self.tasks): + return None + return self.tasks[table.cursor_row] + + # Navigation actions + def action_cursor_down(self) -> None: + """Move cursor down.""" + table = self.query_one("#task-table", DataTable) + table.action_cursor_down() + + def action_cursor_up(self) -> None: + """Move cursor up.""" + table = self.query_one("#task-table", DataTable) + table.action_cursor_up() + + def action_first_task(self) -> None: + """Go to first task.""" + table = self.query_one("#task-table", DataTable) + table.move_cursor(row=0) + + def action_last_task(self) -> None: + """Go to last task.""" + table = self.query_one("#task-table", DataTable) + if self.tasks: + table.move_cursor(row=len(self.tasks) - 1) + + # Task actions + def action_complete_task(self) -> None: + """Mark selected task as complete.""" + task = self._get_selected_task() + if not task or not self.backend: + return + + if self.backend.complete_task(str(task.id)): + self.notify(f"Task {task.id} completed", severity="information") + self.load_tasks() + else: + self.notify(f"Failed to complete task {task.id}", severity="error") + + def action_start_task(self) -> None: + """Start working on selected task.""" + task = self._get_selected_task() + if not task or not self.backend: + return + + if self.backend.start_task(str(task.id)): + self.notify(f"Started task {task.id}", severity="information") + self.load_tasks() + else: + self.notify(f"Failed to start task {task.id}", severity="error") + + def action_stop_task(self) -> None: + """Stop working on selected task.""" + task = self._get_selected_task() + if not task or not self.backend: + return + + if self.backend.stop_task(str(task.id)): + self.notify(f"Stopped task {task.id}", severity="information") + self.load_tasks() + else: + self.notify(f"Failed to stop task {task.id}", severity="error") + + def action_delete_task(self) -> None: + """Delete selected task.""" + task = self._get_selected_task() + if not task or not self.backend: + return + + # TODO: Add confirmation dialog + if self.backend.delete_task(str(task.id)): + self.notify(f"Deleted task {task.id}", severity="warning") + self.load_tasks() + else: + self.notify(f"Failed to delete task {task.id}", severity="error") + + def action_edit_task(self) -> None: + """Edit selected task in editor.""" + task = self._get_selected_task() + if not task or not self.backend: + return + + # Suspend the app, open editor, then resume + with self.suspend(): + self.backend.edit_task_interactive(str(task.id)) + + self.load_tasks() + + def action_add_task(self) -> None: + """Add a new task.""" + # TODO: Push AddTask screen + self.notify("Add task not yet implemented", severity="warning") + + def action_view_task(self) -> None: + """View task details.""" + task = self._get_selected_task() + if not task: + return + # TODO: Push TaskDetail screen + self.notify(f"Task: {task.summary}\nNotes: {task.notes or 'None'}") + + # Filter actions + def action_filter_project(self) -> None: + """Open project filter dialog.""" + from .screens.FilterScreens import ProjectFilterScreen + + if not self.projects: + self.notify("No projects found", severity="warning") + return + + project_data = [(p.name, p.task_count) for p in self.projects if p.name] + + def handle_project_selection(project: str | None) -> None: + if project != self.current_project_filter: + self.current_project_filter = project + self.load_tasks() + if project: + self.notify(f"Filtering by project: {project}") + else: + self.notify("Project filter cleared") + + self.push_screen( + ProjectFilterScreen(project_data, self.current_project_filter), + handle_project_selection, + ) + + def action_filter_tag(self) -> None: + """Open tag filter dialog.""" + from .screens.FilterScreens import TagFilterScreen + + if not self.tags: + self.notify("No tags found", severity="warning") + return + + def handle_tag_selection(tags: list[str]) -> None: + if tags != self.current_tag_filters: + self.current_tag_filters = tags + self.load_tasks() + if tags: + self.notify(f"Filtering by tags: {', '.join(tags)}") + else: + self.notify("Tag filters cleared") + + self.push_screen( + TagFilterScreen(self.tags, self.current_tag_filters), + handle_tag_selection, + ) + + def action_clear_filters(self) -> None: + """Clear all filters.""" + self.current_project_filter = None + self.current_tag_filters = [] + self.load_tasks() + self.notify("Filters cleared", severity="information") + + # Other actions + def action_refresh(self) -> None: + """Refresh task list.""" + self.load_tasks() + self.notify("Refreshed", severity="information") + + def action_sync(self) -> None: + """Sync tasks with remote.""" + if not self.backend: + return + + if self.backend.sync(): + self.notify("Sync complete", severity="information") + self.load_tasks() + else: + self.notify("Sync failed", severity="error") + + def action_help(self) -> None: + """Show help.""" + help_text = """ +Keybindings: + j/k - Navigate up/down + g/G - First/Last task + d - Mark task done + s/S - Start/Stop task + a - Add new task + e - Edit task in editor + x - Delete task + p - Filter by project + t - Filter by tag + c - Clear filters + r - Refresh + y - Sync with remote + Enter - View task details + q - Quit +""" + self.notify(help_text.strip(), timeout=10) + + +def run_app(backend: Optional[TaskBackend] = None) -> None: + """Run the Tasks TUI application.""" + app = TasksApp(backend=backend) + app.run() + + +if __name__ == "__main__": + run_app() diff --git a/src/tasks/backend.py b/src/tasks/backend.py new file mode 100644 index 0000000..5803abf --- /dev/null +++ b/src/tasks/backend.py @@ -0,0 +1,259 @@ +"""Task backend abstraction for Tasks TUI. + +This module defines the abstract interface that all task backends must implement, +allowing the TUI to work with different task management systems (dstask, taskwarrior, etc.) +""" + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from datetime import datetime, timezone +from enum import Enum +from typing import Optional + + +class TaskStatus(Enum): + """Task status values.""" + + PENDING = "pending" + ACTIVE = "active" + DONE = "done" + DELETED = "deleted" + + +class TaskPriority(Enum): + """Task priority levels (P0 = highest, P3 = lowest).""" + + P0 = "P0" # Critical + P1 = "P1" # High + P2 = "P2" # Normal + P3 = "P3" # Low + + @classmethod + def from_string(cls, value: str) -> "TaskPriority": + """Parse priority from string.""" + value = value.upper().strip() + if value in ("P0", "0", "CRITICAL"): + return cls.P0 + elif value in ("P1", "1", "HIGH", "H"): + return cls.P1 + elif value in ("P2", "2", "NORMAL", "MEDIUM", "M"): + return cls.P2 + elif value in ("P3", "3", "LOW", "L"): + return cls.P3 + return cls.P2 # Default to normal + + +@dataclass +class Task: + """Unified task representation across backends.""" + + uuid: str + id: int # Short numeric ID for display + summary: str + status: TaskStatus = TaskStatus.PENDING + priority: TaskPriority = TaskPriority.P2 + project: str = "" + tags: list[str] = field(default_factory=list) + notes: str = "" + due: Optional[datetime] = None + created: Optional[datetime] = None + resolved: Optional[datetime] = None + + @property + def is_overdue(self) -> bool: + """Check if task is overdue.""" + if self.due is None or self.status == TaskStatus.DONE: + return False + # Use timezone-aware now() if due date is timezone-aware + now = datetime.now(timezone.utc) if self.due.tzinfo else datetime.now() + return now > self.due + + +@dataclass +class Project: + """Project information.""" + + name: str + task_count: int = 0 + resolved_count: int = 0 + active: bool = True + priority: TaskPriority = TaskPriority.P2 + + +class TaskBackend(ABC): + """Abstract base class for task management backends.""" + + @abstractmethod + def get_tasks( + self, + project: Optional[str] = None, + tags: Optional[list[str]] = None, + status: Optional[TaskStatus] = None, + ) -> list[Task]: + """Get tasks, optionally filtered by project, tags, or status. + + Args: + project: Filter by project name + tags: Filter by tags (tasks must have all specified tags) + status: Filter by status + + Returns: + List of matching tasks + """ + pass + + @abstractmethod + def get_next_tasks(self) -> list[Task]: + """Get the 'next' tasks to work on (priority-sorted actionable tasks).""" + pass + + @abstractmethod + def get_task(self, task_id: str) -> Optional[Task]: + """Get a single task by ID or UUID. + + Args: + task_id: Task ID (numeric) or UUID + + Returns: + Task if found, None otherwise + """ + pass + + @abstractmethod + def add_task( + self, + summary: str, + project: Optional[str] = None, + tags: Optional[list[str]] = None, + priority: Optional[TaskPriority] = None, + due: Optional[datetime] = None, + notes: Optional[str] = None, + ) -> Task: + """Create a new task. + + Args: + summary: Task description + project: Project name + tags: List of tags + priority: Task priority + due: Due date + notes: Additional notes + + Returns: + The created task + """ + pass + + @abstractmethod + def complete_task(self, task_id: str) -> bool: + """Mark a task as complete. + + Args: + task_id: Task ID or UUID + + Returns: + True if successful + """ + pass + + @abstractmethod + def delete_task(self, task_id: str) -> bool: + """Delete a task. + + Args: + task_id: Task ID or UUID + + Returns: + True if successful + """ + pass + + @abstractmethod + def start_task(self, task_id: str) -> bool: + """Start working on a task (mark as active). + + Args: + task_id: Task ID or UUID + + Returns: + True if successful + """ + pass + + @abstractmethod + def stop_task(self, task_id: str) -> bool: + """Stop working on a task (mark as pending). + + Args: + task_id: Task ID or UUID + + Returns: + True if successful + """ + pass + + @abstractmethod + def modify_task( + self, + task_id: str, + summary: Optional[str] = None, + project: Optional[str] = None, + tags: Optional[list[str]] = None, + priority: Optional[TaskPriority] = None, + due: Optional[datetime] = None, + notes: Optional[str] = None, + ) -> bool: + """Modify a task. + + Args: + task_id: Task ID or UUID + summary: New summary (if provided) + project: New project (if provided) + tags: New tags (if provided) + priority: New priority (if provided) + due: New due date (if provided) + notes: New notes (if provided) + + Returns: + True if successful + """ + pass + + @abstractmethod + def get_projects(self) -> list[Project]: + """Get all projects. + + Returns: + List of projects with task counts + """ + pass + + @abstractmethod + def get_tags(self) -> list[str]: + """Get all tags. + + Returns: + List of tag names + """ + pass + + @abstractmethod + def sync(self) -> bool: + """Sync tasks with remote (if supported). + + Returns: + True if successful (or not applicable) + """ + pass + + @abstractmethod + def edit_task_interactive(self, task_id: str) -> bool: + """Open task in editor for interactive editing. + + Args: + task_id: Task ID or UUID + + Returns: + True if successful + """ + pass diff --git a/src/tasks/config.py b/src/tasks/config.py new file mode 100644 index 0000000..b10de27 --- /dev/null +++ b/src/tasks/config.py @@ -0,0 +1,201 @@ +"""Configuration system for Tasks TUI using Pydantic.""" + +import logging +import os +from pathlib import Path +from typing import Literal, Optional + +import toml +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) + + +class BackendConfig(BaseModel): + """Configuration for task management backend.""" + + # Which backend to use: dstask or taskwarrior + backend: Literal["dstask", "taskwarrior"] = "dstask" + + # Path to dstask binary + dstask_path: str = Field( + default_factory=lambda: str(Path.home() / ".local" / "bin" / "dstask") + ) + + # Path to taskwarrior binary + taskwarrior_path: str = "task" + + +class DisplayConfig(BaseModel): + """Configuration for task list display.""" + + # Columns to show in the task table + # Available: id, priority, project, tags, summary, due, status + columns: list[str] = Field( + default_factory=lambda: ["id", "priority", "project", "tags", "summary", "due"] + ) + + # Column widths (0 = auto) + column_widths: dict[str, int] = Field( + default_factory=lambda: { + "id": 4, + "priority": 3, + "project": 15, + "tags": 15, + "summary": 0, # auto-expand + "due": 10, + "status": 8, + } + ) + + # Date format for due dates + date_format: str = "%Y-%m-%d" + + # Show completed tasks + show_completed: bool = False + + # Default sort column + default_sort: str = "priority" + + # Sort direction (asc or desc) + sort_direction: Literal["asc", "desc"] = "asc" + + +class IconsConfig(BaseModel): + """NerdFont icons for task display.""" + + # Priority icons (P0 = highest, P3 = lowest) + priority_p0: str = "\uf06a" # nf-fa-exclamation_circle (critical) + priority_p1: str = "\uf062" # nf-fa-arrow_up (high) + priority_p2: str = "\uf068" # nf-fa-minus (normal) + priority_p3: str = "\uf063" # nf-fa-arrow_down (low) + + # Status icons + status_pending: str = "\uf10c" # nf-fa-circle_o (empty circle) + status_active: str = "\uf192" # nf-fa-dot_circle_o (dot circle) + status_done: str = "\uf058" # nf-fa-check_circle (checked) + + # Other icons + project: str = "\uf07b" # nf-fa-folder + tag: str = "\uf02b" # nf-fa-tag + due: str = "\uf073" # nf-fa-calendar + overdue: str = "\uf071" # nf-fa-warning + + +class KeybindingsConfig(BaseModel): + """Keybinding customization.""" + + # Navigation + next_task: str = "j" + prev_task: str = "k" + first_task: str = "g" + last_task: str = "G" + + # Actions + complete_task: str = "d" + edit_task: str = "e" + add_task: str = "a" + delete_task: str = "x" + start_task: str = "s" + stop_task: str = "S" + + # Filtering + filter_project: str = "p" + filter_tag: str = "t" + clear_filters: str = "c" + + # Other + refresh: str = "r" + sync: str = "y" + quit: str = "q" + help: str = "?" + + +class ThemeConfig(BaseModel): + """Theme/appearance settings.""" + + # Priority colors (CSS color names or hex) + color_p0: str = "red" + color_p1: str = "orange" + color_p2: str = "yellow" + color_p3: str = "gray" + + # Status colors + color_pending: str = "white" + color_active: str = "cyan" + color_done: str = "green" + + # Overdue color + color_overdue: str = "red" + + +class TasksAppConfig(BaseModel): + """Main configuration for Tasks TUI.""" + + backend: BackendConfig = Field(default_factory=BackendConfig) + display: DisplayConfig = Field(default_factory=DisplayConfig) + icons: IconsConfig = Field(default_factory=IconsConfig) + keybindings: KeybindingsConfig = Field(default_factory=KeybindingsConfig) + theme: ThemeConfig = Field(default_factory=ThemeConfig) + + @classmethod + def get_config_path(cls) -> Path: + """Get the path to the config file.""" + # Check environment variable first + env_path = os.getenv("LUK_TASKS_CONFIG") + if env_path: + return Path(env_path) + + # Default to ~/.config/luk/tasks.toml + return Path.home() / ".config" / "luk" / "tasks.toml" + + @classmethod + def load(cls, config_path: Optional[Path] = None) -> "TasksAppConfig": + """Load config from TOML file with defaults for missing values.""" + if config_path is None: + config_path = cls.get_config_path() + + if config_path.exists(): + try: + with open(config_path, "r") as f: + data = toml.load(f) + logger.info(f"Loaded config from {config_path}") + return cls.model_validate(data) + except Exception as e: + logger.warning(f"Error loading config from {config_path}: {e}") + logger.warning("Using default configuration") + return cls() + else: + logger.info(f"No config file at {config_path}, using defaults") + return cls() + + def save(self, config_path: Optional[Path] = None) -> None: + """Save current config to TOML file.""" + if config_path is None: + config_path = self.get_config_path() + + # Ensure parent directory exists + config_path.parent.mkdir(parents=True, exist_ok=True) + + with open(config_path, "w") as f: + toml.dump(self.model_dump(), f) + logger.info(f"Saved config to {config_path}") + + +# Global config instance (lazy-loaded) +_config: Optional[TasksAppConfig] = None + + +def get_config() -> TasksAppConfig: + """Get the global config instance, loading it if necessary.""" + global _config + if _config is None: + _config = TasksAppConfig.load() + return _config + + +def reload_config() -> TasksAppConfig: + """Force reload of the config from disk.""" + global _config + _config = TasksAppConfig.load() + return _config diff --git a/src/tasks/screens/FilterScreens.py b/src/tasks/screens/FilterScreens.py new file mode 100644 index 0000000..848fbd2 --- /dev/null +++ b/src/tasks/screens/FilterScreens.py @@ -0,0 +1,232 @@ +"""Filter selection screens for Tasks TUI.""" + +from typing import Optional + +from textual import on +from textual.app import ComposeResult +from textual.binding import Binding +from textual.containers import Container, Vertical +from textual.screen import ModalScreen +from textual.widgets import Label, SelectionList, Button +from textual.widgets.selection_list import Selection + + +class ProjectFilterScreen(ModalScreen[Optional[str]]): + """Modal screen for selecting a project filter.""" + + BINDINGS = [ + Binding("escape", "cancel", "Cancel"), + Binding("enter", "select", "Select"), + ] + + DEFAULT_CSS = """ + ProjectFilterScreen { + align: center middle; + } + + ProjectFilterScreen #filter-container { + width: 50; + height: auto; + max-height: 80%; + background: $surface; + border: thick $primary; + padding: 1 2; + } + + ProjectFilterScreen #filter-title { + text-style: bold; + width: 1fr; + height: 1; + text-align: center; + margin-bottom: 1; + } + + ProjectFilterScreen SelectionList { + height: auto; + max-height: 15; + margin-bottom: 1; + } + + ProjectFilterScreen #filter-buttons { + width: 1fr; + height: auto; + align: center middle; + margin-top: 1; + } + + ProjectFilterScreen Button { + margin: 0 1; + } + """ + + def __init__( + self, + projects: list[tuple[str, int]], # List of (project_name, task_count) + current_filter: Optional[str] = None, + **kwargs, + ): + """Initialize the project filter screen. + + Args: + projects: List of (project_name, task_count) tuples + current_filter: Currently selected project filter + """ + super().__init__(**kwargs) + self._projects = projects + self._current_filter = current_filter + + def compose(self) -> ComposeResult: + with Container(id="filter-container"): + yield Label("Select Project", id="filter-title") + + selections = [ + Selection( + f"{name} ({count})", + name, + initial_state=name == self._current_filter, + ) + for name, count in self._projects + ] + + yield SelectionList[str](*selections, id="project-list") + + with Container(id="filter-buttons"): + yield Button("Cancel", id="cancel", variant="default") + yield Button("Clear", id="clear", variant="warning") + yield Button("Apply", id="apply", variant="primary") + + def on_mount(self) -> None: + """Focus the selection list.""" + self.query_one("#project-list", SelectionList).focus() + + @on(Button.Pressed, "#apply") + def handle_apply(self) -> None: + selection_list = self.query_one("#project-list", SelectionList) + selected = list(selection_list.selected) + if selected: + self.dismiss(selected[0]) # Return first selected project + else: + self.dismiss(None) + + @on(Button.Pressed, "#clear") + def handle_clear(self) -> None: + self.dismiss(None) + + @on(Button.Pressed, "#cancel") + def handle_cancel(self) -> None: + self.dismiss(self._current_filter) # Return unchanged + + def action_cancel(self) -> None: + self.dismiss(self._current_filter) + + def action_select(self) -> None: + self.handle_apply() + + +class TagFilterScreen(ModalScreen[list[str]]): + """Modal screen for selecting tag filters (multi-select).""" + + BINDINGS = [ + Binding("escape", "cancel", "Cancel"), + Binding("enter", "select", "Select"), + ] + + DEFAULT_CSS = """ + TagFilterScreen { + align: center middle; + } + + TagFilterScreen #filter-container { + width: 50; + height: auto; + max-height: 80%; + background: $surface; + border: thick $primary; + padding: 1 2; + } + + TagFilterScreen #filter-title { + text-style: bold; + width: 1fr; + height: 1; + text-align: center; + margin-bottom: 1; + } + + TagFilterScreen SelectionList { + height: auto; + max-height: 15; + margin-bottom: 1; + } + + TagFilterScreen #filter-buttons { + width: 1fr; + height: auto; + align: center middle; + margin-top: 1; + } + + TagFilterScreen Button { + margin: 0 1; + } + """ + + def __init__( + self, + tags: list[str], + current_filters: list[str], + **kwargs, + ): + """Initialize the tag filter screen. + + Args: + tags: List of available tags + current_filters: Currently selected tag filters + """ + super().__init__(**kwargs) + self._tags = tags + self._current_filters = current_filters + + def compose(self) -> ComposeResult: + with Container(id="filter-container"): + yield Label("Select Tags (multi-select)", id="filter-title") + + selections = [ + Selection( + f"+{tag}", + tag, + initial_state=tag in self._current_filters, + ) + for tag in self._tags + ] + + yield SelectionList[str](*selections, id="tag-list") + + with Container(id="filter-buttons"): + yield Button("Cancel", id="cancel", variant="default") + yield Button("Clear", id="clear", variant="warning") + yield Button("Apply", id="apply", variant="primary") + + def on_mount(self) -> None: + """Focus the selection list.""" + self.query_one("#tag-list", SelectionList).focus() + + @on(Button.Pressed, "#apply") + def handle_apply(self) -> None: + selection_list = self.query_one("#tag-list", SelectionList) + selected = list(selection_list.selected) + self.dismiss(selected) + + @on(Button.Pressed, "#clear") + def handle_clear(self) -> None: + self.dismiss([]) + + @on(Button.Pressed, "#cancel") + def handle_cancel(self) -> None: + self.dismiss(self._current_filters) # Return unchanged + + def action_cancel(self) -> None: + self.dismiss(self._current_filters) + + def action_select(self) -> None: + self.handle_apply() diff --git a/src/tasks/screens/__init__.py b/src/tasks/screens/__init__.py new file mode 100644 index 0000000..974cd63 --- /dev/null +++ b/src/tasks/screens/__init__.py @@ -0,0 +1,5 @@ +"""Screen components for Tasks TUI.""" + +from .FilterScreens import ProjectFilterScreen, TagFilterScreen + +__all__ = ["ProjectFilterScreen", "TagFilterScreen"] diff --git a/src/tasks/widgets/__init__.py b/src/tasks/widgets/__init__.py new file mode 100644 index 0000000..7ce3c23 --- /dev/null +++ b/src/tasks/widgets/__init__.py @@ -0,0 +1 @@ +"""Widget components for Tasks TUI."""