Compare commits
2 Commits
37be42884f
...
36d48c18d1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
36d48c18d1 | ||
|
|
fe65183fb7 |
@@ -37,7 +37,7 @@ from textual.widgets.option_list import Option
|
||||
from src.utils.file_icons import get_file_icon
|
||||
|
||||
# Import our DocumentViewerScreen
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), "src", "maildir_gtd"))
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), "src", "mail"))
|
||||
from screens.DocumentViewer import DocumentViewerScreen
|
||||
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ from .calendar import calendar
|
||||
from .ticktick import ticktick
|
||||
from .godspeed import godspeed
|
||||
from .gitlab_monitor import gitlab_monitor
|
||||
from .tasks import tasks
|
||||
|
||||
|
||||
@click.group()
|
||||
@@ -24,7 +25,10 @@ cli.add_command(calendar)
|
||||
cli.add_command(ticktick)
|
||||
cli.add_command(godspeed)
|
||||
cli.add_command(gitlab_monitor)
|
||||
cli.add_command(tasks)
|
||||
|
||||
# Add 'mail' as an alias for email
|
||||
cli.add_command(email, name="mail")
|
||||
# Add 'tt' as a short alias for ticktick
|
||||
cli.add_command(ticktick, name="tt")
|
||||
# Add 'gs' as a short alias for godspeed
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import click
|
||||
from src.maildir_gtd.app import launch_email_viewer
|
||||
from src.mail.app import launch_email_viewer
|
||||
|
||||
|
||||
@click.command()
|
||||
|
||||
@@ -298,6 +298,7 @@ class SyncDashboard(App):
|
||||
TaskListItem("calendar", "Calendar Sync", id="task-calendar"),
|
||||
# Stage 3: Task management
|
||||
TaskListItem("godspeed", "Godspeed Sync", id="task-godspeed"),
|
||||
TaskListItem("dstask", "dstask Sync", id="task-dstask"),
|
||||
TaskListItem("sweep", "Task Sweep", id="task-sweep"),
|
||||
id="task-list",
|
||||
)
|
||||
@@ -871,6 +872,14 @@ async def run_dashboard_sync(
|
||||
await asyncio.sleep(0.3)
|
||||
tracker.complete_task("godspeed", "42 tasks synced")
|
||||
|
||||
# dstask sync
|
||||
tracker.start_task("dstask", 100)
|
||||
tracker.update_task("dstask", 30, "Running dstask sync...")
|
||||
await asyncio.sleep(0.3)
|
||||
tracker.update_task("dstask", 70, "Pushing changes...")
|
||||
await asyncio.sleep(0.2)
|
||||
tracker.complete_task("dstask", "Sync completed")
|
||||
|
||||
# Task sweep
|
||||
tracker.start_task("sweep")
|
||||
tracker.update_task("sweep", 50, "Scanning notes directory...")
|
||||
@@ -1084,6 +1093,24 @@ async def run_dashboard_sync(
|
||||
else:
|
||||
tracker.skip_task("godspeed", "Not due yet (every 15 min)")
|
||||
|
||||
# dstask sync
|
||||
tracker.start_task("dstask", 100)
|
||||
try:
|
||||
from src.services.dstask.client import DstaskClient
|
||||
|
||||
dstask_client = DstaskClient()
|
||||
if dstask_client.is_available():
|
||||
tracker.update_task("dstask", 30, "Running dstask sync...")
|
||||
success = dstask_client.sync()
|
||||
if success:
|
||||
tracker.complete_task("dstask", "Sync completed")
|
||||
else:
|
||||
tracker.error_task("dstask", "Sync failed")
|
||||
else:
|
||||
tracker.skip_task("dstask", "dstask not installed")
|
||||
except Exception as e:
|
||||
tracker.error_task("dstask", str(e))
|
||||
|
||||
# Task sweep (runs once daily after 6 PM)
|
||||
tracker.start_task("sweep", 100)
|
||||
if should_run_sweep():
|
||||
@@ -1133,6 +1160,7 @@ async def run_dashboard_sync(
|
||||
"inbox",
|
||||
"calendar",
|
||||
"godspeed",
|
||||
"dstask",
|
||||
"sweep",
|
||||
]:
|
||||
if task_id in dashboard._task_items:
|
||||
|
||||
11
src/cli/tasks.py
Normal file
11
src/cli/tasks.py
Normal file
@@ -0,0 +1,11 @@
|
||||
"""CLI command for Tasks TUI."""
|
||||
|
||||
import click
|
||||
|
||||
|
||||
@click.command()
|
||||
def tasks():
|
||||
"""Launch the Tasks TUI for managing tasks via dstask."""
|
||||
from src.tasks import run_app
|
||||
|
||||
run_app()
|
||||
1
src/mail/__init__.py
Normal file
1
src/mail/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# Initialize the mail package
|
||||
@@ -1,4 +1,4 @@
|
||||
from .config import get_config, MaildirGTDConfig
|
||||
from .config import get_config, MailAppConfig
|
||||
from .message_store import MessageStore
|
||||
from .widgets.ContentContainer import ContentContainer
|
||||
from .widgets.EnvelopeListItem import EnvelopeListItem, GroupHeader
|
||||
@@ -1,4 +1,4 @@
|
||||
"""Configuration system for MaildirGTD email reader using Pydantic."""
|
||||
"""Configuration system for Mail email reader using Pydantic."""
|
||||
|
||||
import logging
|
||||
import os
|
||||
@@ -90,7 +90,7 @@ class LinkPanelConfig(BaseModel):
|
||||
close_on_open: bool = False
|
||||
|
||||
|
||||
class MailConfig(BaseModel):
|
||||
class MailOperationsConfig(BaseModel):
|
||||
"""Configuration for mail operations."""
|
||||
|
||||
# Folder to move messages to when archiving
|
||||
@@ -103,8 +103,8 @@ class ThemeConfig(BaseModel):
|
||||
theme_name: str = "monokai"
|
||||
|
||||
|
||||
class MaildirGTDConfig(BaseModel):
|
||||
"""Main configuration for MaildirGTD email reader."""
|
||||
class MailAppConfig(BaseModel):
|
||||
"""Main configuration for Mail email reader."""
|
||||
|
||||
task: TaskBackendConfig = Field(default_factory=TaskBackendConfig)
|
||||
envelope_display: EnvelopeDisplayConfig = Field(
|
||||
@@ -112,7 +112,7 @@ class MaildirGTDConfig(BaseModel):
|
||||
)
|
||||
content_display: ContentDisplayConfig = Field(default_factory=ContentDisplayConfig)
|
||||
link_panel: LinkPanelConfig = Field(default_factory=LinkPanelConfig)
|
||||
mail: MailConfig = Field(default_factory=MailConfig)
|
||||
mail: MailOperationsConfig = Field(default_factory=MailOperationsConfig)
|
||||
keybindings: KeybindingsConfig = Field(default_factory=KeybindingsConfig)
|
||||
theme: ThemeConfig = Field(default_factory=ThemeConfig)
|
||||
|
||||
@@ -120,15 +120,15 @@ class MaildirGTDConfig(BaseModel):
|
||||
def get_config_path(cls) -> Path:
|
||||
"""Get the path to the config file."""
|
||||
# Check environment variable first
|
||||
env_path = os.getenv("MAILDIR_GTD_CONFIG")
|
||||
env_path = os.getenv("LUK_MAIL_CONFIG")
|
||||
if env_path:
|
||||
return Path(env_path)
|
||||
|
||||
# Default to ~/.config/luk/maildir_gtd.toml
|
||||
return Path.home() / ".config" / "luk" / "maildir_gtd.toml"
|
||||
# Default to ~/.config/luk/mail.toml
|
||||
return Path.home() / ".config" / "luk" / "mail.toml"
|
||||
|
||||
@classmethod
|
||||
def load(cls, config_path: Optional[Path] = None) -> "MaildirGTDConfig":
|
||||
def load(cls, config_path: Optional[Path] = None) -> "MailAppConfig":
|
||||
"""Load config from TOML file with defaults for missing values."""
|
||||
if config_path is None:
|
||||
config_path = cls.get_config_path()
|
||||
@@ -161,19 +161,19 @@ class MaildirGTDConfig(BaseModel):
|
||||
|
||||
|
||||
# Global config instance (lazy-loaded)
|
||||
_config: Optional[MaildirGTDConfig] = None
|
||||
_config: Optional[MailAppConfig] = None
|
||||
|
||||
|
||||
def get_config() -> MaildirGTDConfig:
|
||||
def get_config() -> MailAppConfig:
|
||||
"""Get the global config instance, loading it if necessary."""
|
||||
global _config
|
||||
if _config is None:
|
||||
_config = MaildirGTDConfig.load()
|
||||
_config = MailAppConfig.load()
|
||||
return _config
|
||||
|
||||
|
||||
def reload_config() -> MaildirGTDConfig:
|
||||
def reload_config() -> MailAppConfig:
|
||||
"""Force reload of the config from disk."""
|
||||
global _config
|
||||
_config = MaildirGTDConfig.load()
|
||||
_config = MailAppConfig.load()
|
||||
return _config
|
||||
@@ -13,7 +13,7 @@ from textual.containers import Container, Vertical
|
||||
from textual.screen import ModalScreen
|
||||
from textual.widgets import Label, ListView, ListItem, Static
|
||||
|
||||
from src.maildir_gtd.config import get_config
|
||||
from src.mail.config import get_config
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -5,8 +5,8 @@ from textual.containers import Vertical, ScrollableContainer
|
||||
from textual.widgets import Static, Markdown, Label
|
||||
from textual.reactive import reactive
|
||||
from src.services.himalaya import client as himalaya_client
|
||||
from src.maildir_gtd.config import get_config
|
||||
from src.maildir_gtd.screens.LinkPanel import extract_links_from_content, LinkItem
|
||||
from src.mail.config import get_config
|
||||
from src.mail.screens.LinkPanel import extract_links_from_content, LinkItem
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import Literal, List
|
||||
@@ -7,7 +7,7 @@ from textual.app import ComposeResult
|
||||
from textual.containers import Horizontal, Vertical
|
||||
from textual.widgets import Label, Static
|
||||
|
||||
from src.maildir_gtd.config import EnvelopeDisplayConfig, get_config
|
||||
from src.mail.config import EnvelopeDisplayConfig, get_config
|
||||
|
||||
|
||||
class EnvelopeListItem(Static):
|
||||
@@ -1 +0,0 @@
|
||||
# Initialize the maildir_gtd package
|
||||
5
src/services/dstask/__init__.py
Normal file
5
src/services/dstask/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
"""dstask client service for Tasks TUI."""
|
||||
|
||||
from .client import DstaskClient
|
||||
|
||||
__all__ = ["DstaskClient"]
|
||||
345
src/services/dstask/client.py
Normal file
345
src/services/dstask/client.py
Normal file
@@ -0,0 +1,345 @@
|
||||
"""dstask CLI client implementation.
|
||||
|
||||
This module implements the TaskBackend interface for dstask,
|
||||
a local-first task manager with Git sync support.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import subprocess
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from src.tasks.backend import (
|
||||
Project,
|
||||
Task,
|
||||
TaskBackend,
|
||||
TaskPriority,
|
||||
TaskStatus,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DstaskClient(TaskBackend):
|
||||
"""Client for interacting with dstask CLI."""
|
||||
|
||||
def __init__(self, dstask_path: Optional[str] = None):
|
||||
"""Initialize dstask client.
|
||||
|
||||
Args:
|
||||
dstask_path: Path to dstask binary. Defaults to ~/.local/bin/dstask
|
||||
"""
|
||||
if dstask_path is None:
|
||||
dstask_path = str(Path.home() / ".local" / "bin" / "dstask")
|
||||
self.dstask_path = dstask_path
|
||||
|
||||
def is_available(self) -> bool:
|
||||
"""Check if dstask binary is available and executable."""
|
||||
path = Path(self.dstask_path)
|
||||
return path.exists() and path.is_file()
|
||||
|
||||
def _run_command(
|
||||
self, args: list[str], capture_output: bool = True
|
||||
) -> subprocess.CompletedProcess:
|
||||
"""Run a dstask command.
|
||||
|
||||
Args:
|
||||
args: Command arguments (without dstask binary)
|
||||
capture_output: Whether to capture stdout/stderr
|
||||
|
||||
Returns:
|
||||
CompletedProcess result
|
||||
"""
|
||||
cmd = [self.dstask_path] + args
|
||||
logger.debug(f"Running: {' '.join(cmd)}")
|
||||
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
capture_output=capture_output,
|
||||
text=True,
|
||||
)
|
||||
|
||||
if result.returncode != 0:
|
||||
logger.warning(f"dstask command failed: {result.stderr}")
|
||||
|
||||
return result
|
||||
|
||||
def _parse_datetime(self, value: str) -> Optional[datetime]:
|
||||
"""Parse datetime from dstask JSON format."""
|
||||
if not value:
|
||||
return None
|
||||
|
||||
# dstask uses RFC3339 format
|
||||
try:
|
||||
# Handle Z suffix
|
||||
if value.endswith("Z"):
|
||||
value = value[:-1] + "+00:00"
|
||||
dt = datetime.fromisoformat(value)
|
||||
# dstask uses year 1 (0001-01-01) to indicate no date set
|
||||
if dt.year == 1:
|
||||
return None
|
||||
return dt
|
||||
except ValueError:
|
||||
logger.warning(f"Failed to parse datetime: {value}")
|
||||
return None
|
||||
|
||||
def _parse_task(self, data: dict) -> Task:
|
||||
"""Parse a task from dstask JSON output."""
|
||||
# Map dstask status to TaskStatus
|
||||
status_map = {
|
||||
"pending": TaskStatus.PENDING,
|
||||
"active": TaskStatus.ACTIVE,
|
||||
"resolved": TaskStatus.DONE,
|
||||
"deleted": TaskStatus.DELETED,
|
||||
}
|
||||
status = status_map.get(data.get("status", "pending"), TaskStatus.PENDING)
|
||||
|
||||
# Parse priority
|
||||
priority_str = data.get("priority", "P2")
|
||||
priority = TaskPriority.from_string(priority_str)
|
||||
|
||||
return Task(
|
||||
uuid=data.get("uuid", ""),
|
||||
id=data.get("id", 0),
|
||||
summary=data.get("summary", ""),
|
||||
status=status,
|
||||
priority=priority,
|
||||
project=data.get("project", ""),
|
||||
tags=data.get("tags", []) or [],
|
||||
notes=data.get("notes", ""),
|
||||
due=self._parse_datetime(data.get("due", "")),
|
||||
created=self._parse_datetime(data.get("created", "")),
|
||||
resolved=self._parse_datetime(data.get("resolved", "")),
|
||||
)
|
||||
|
||||
def _get_tasks_json(self, command: str = "show-open") -> list[Task]:
|
||||
"""Get tasks using a dstask command that outputs JSON."""
|
||||
result = self._run_command([command])
|
||||
|
||||
if result.returncode != 0:
|
||||
logger.error(f"Failed to get tasks: {result.stderr}")
|
||||
return []
|
||||
|
||||
try:
|
||||
data = json.loads(result.stdout)
|
||||
return [self._parse_task(t) for t in data]
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Failed to parse dstask output: {e}")
|
||||
return []
|
||||
|
||||
def get_tasks(
|
||||
self,
|
||||
project: Optional[str] = None,
|
||||
tags: Optional[list[str]] = None,
|
||||
status: Optional[TaskStatus] = None,
|
||||
) -> list[Task]:
|
||||
"""Get tasks, optionally filtered by project, tags, or status."""
|
||||
# Build filter arguments
|
||||
args = ["show-open"]
|
||||
|
||||
if project:
|
||||
args.append(f"project:{project}")
|
||||
|
||||
if tags:
|
||||
for tag in tags:
|
||||
args.append(f"+{tag}")
|
||||
|
||||
result = self._run_command(args)
|
||||
|
||||
if result.returncode != 0:
|
||||
return []
|
||||
|
||||
try:
|
||||
data = json.loads(result.stdout)
|
||||
tasks = [self._parse_task(t) for t in data]
|
||||
|
||||
# Filter by status if specified
|
||||
if status:
|
||||
tasks = [t for t in tasks if t.status == status]
|
||||
|
||||
return tasks
|
||||
except json.JSONDecodeError:
|
||||
return []
|
||||
|
||||
def get_next_tasks(self) -> list[Task]:
|
||||
"""Get the 'next' tasks to work on."""
|
||||
return self._get_tasks_json("next")
|
||||
|
||||
def get_task(self, task_id: str) -> Optional[Task]:
|
||||
"""Get a single task by ID or UUID."""
|
||||
result = self._run_command(["show-open"])
|
||||
|
||||
if result.returncode != 0:
|
||||
return None
|
||||
|
||||
try:
|
||||
data = json.loads(result.stdout)
|
||||
for t in data:
|
||||
if str(t.get("id")) == task_id or t.get("uuid") == task_id:
|
||||
return self._parse_task(t)
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
def add_task(
|
||||
self,
|
||||
summary: str,
|
||||
project: Optional[str] = None,
|
||||
tags: Optional[list[str]] = None,
|
||||
priority: Optional[TaskPriority] = None,
|
||||
due: Optional[datetime] = None,
|
||||
notes: Optional[str] = None,
|
||||
) -> Task:
|
||||
"""Create a new task."""
|
||||
args = ["add", summary]
|
||||
|
||||
if project:
|
||||
args.append(f"project:{project}")
|
||||
|
||||
if tags:
|
||||
for tag in tags:
|
||||
args.append(f"+{tag}")
|
||||
|
||||
if priority:
|
||||
args.append(priority.value)
|
||||
|
||||
if due:
|
||||
# dstask uses various date formats
|
||||
args.append(f"due:{due.strftime('%Y-%m-%d')}")
|
||||
|
||||
result = self._run_command(args)
|
||||
|
||||
if result.returncode != 0:
|
||||
raise RuntimeError(f"Failed to add task: {result.stderr}")
|
||||
|
||||
# Get the newly created task (it should be the last one)
|
||||
tasks = self.get_next_tasks()
|
||||
if tasks:
|
||||
# Find task by summary (best effort)
|
||||
for task in reversed(tasks):
|
||||
if task.summary == summary:
|
||||
# Add notes if provided
|
||||
if notes:
|
||||
self._run_command(["note", str(task.id), notes])
|
||||
task.notes = notes
|
||||
return task
|
||||
|
||||
# Return a placeholder if we can't find it
|
||||
return Task(
|
||||
uuid="",
|
||||
id=0,
|
||||
summary=summary,
|
||||
project=project or "",
|
||||
tags=tags or [],
|
||||
priority=priority or TaskPriority.P2,
|
||||
notes=notes or "",
|
||||
due=due,
|
||||
)
|
||||
|
||||
def complete_task(self, task_id: str) -> bool:
|
||||
"""Mark a task as complete."""
|
||||
result = self._run_command(["done", task_id])
|
||||
return result.returncode == 0
|
||||
|
||||
def delete_task(self, task_id: str) -> bool:
|
||||
"""Delete a task."""
|
||||
# dstask uses 'remove' for deletion
|
||||
result = self._run_command(["remove", task_id])
|
||||
return result.returncode == 0
|
||||
|
||||
def start_task(self, task_id: str) -> bool:
|
||||
"""Start working on a task (mark as active)."""
|
||||
result = self._run_command(["start", task_id])
|
||||
return result.returncode == 0
|
||||
|
||||
def stop_task(self, task_id: str) -> bool:
|
||||
"""Stop working on a task (mark as pending)."""
|
||||
result = self._run_command(["stop", task_id])
|
||||
return result.returncode == 0
|
||||
|
||||
def modify_task(
|
||||
self,
|
||||
task_id: str,
|
||||
summary: Optional[str] = None,
|
||||
project: Optional[str] = None,
|
||||
tags: Optional[list[str]] = None,
|
||||
priority: Optional[TaskPriority] = None,
|
||||
due: Optional[datetime] = None,
|
||||
notes: Optional[str] = None,
|
||||
) -> bool:
|
||||
"""Modify a task."""
|
||||
args = ["modify", task_id]
|
||||
|
||||
if summary:
|
||||
args.append(summary)
|
||||
|
||||
if project:
|
||||
args.append(f"project:{project}")
|
||||
|
||||
if tags:
|
||||
for tag in tags:
|
||||
args.append(f"+{tag}")
|
||||
|
||||
if priority:
|
||||
args.append(priority.value)
|
||||
|
||||
if due:
|
||||
args.append(f"due:{due.strftime('%Y-%m-%d')}")
|
||||
|
||||
result = self._run_command(args)
|
||||
|
||||
# Handle notes separately
|
||||
if notes is not None and result.returncode == 0:
|
||||
self._run_command(["note", task_id, notes])
|
||||
|
||||
return result.returncode == 0
|
||||
|
||||
def get_projects(self) -> list[Project]:
|
||||
"""Get all projects."""
|
||||
result = self._run_command(["show-projects"])
|
||||
|
||||
if result.returncode != 0:
|
||||
return []
|
||||
|
||||
try:
|
||||
data = json.loads(result.stdout)
|
||||
projects = []
|
||||
for p in data:
|
||||
priority = TaskPriority.from_string(p.get("priority", "P2"))
|
||||
projects.append(
|
||||
Project(
|
||||
name=p.get("name", ""),
|
||||
task_count=p.get("taskCount", 0),
|
||||
resolved_count=p.get("resolvedCount", 0),
|
||||
active=p.get("active", True),
|
||||
priority=priority,
|
||||
)
|
||||
)
|
||||
return projects
|
||||
except json.JSONDecodeError:
|
||||
return []
|
||||
|
||||
def get_tags(self) -> list[str]:
|
||||
"""Get all tags."""
|
||||
result = self._run_command(["show-tags"])
|
||||
|
||||
if result.returncode != 0:
|
||||
return []
|
||||
|
||||
# show-tags outputs plain text, one tag per line
|
||||
tags = result.stdout.strip().split("\n")
|
||||
return [t.strip() for t in tags if t.strip()]
|
||||
|
||||
def sync(self) -> bool:
|
||||
"""Sync tasks with Git remote."""
|
||||
result = self._run_command(["sync"])
|
||||
return result.returncode == 0
|
||||
|
||||
def edit_task_interactive(self, task_id: str) -> bool:
|
||||
"""Open task in editor for interactive editing."""
|
||||
# This needs to run without capturing output
|
||||
result = self._run_command(["edit", task_id], capture_output=False)
|
||||
return result.returncode == 0
|
||||
@@ -4,7 +4,7 @@ import json
|
||||
import logging
|
||||
import subprocess
|
||||
|
||||
from src.maildir_gtd.config import get_config
|
||||
from src.mail.config import get_config
|
||||
|
||||
|
||||
async def list_envelopes(limit: int = 9999) -> Tuple[List[Dict[str, Any]], bool]:
|
||||
|
||||
@@ -6,7 +6,7 @@ import logging
|
||||
import shlex
|
||||
from typing import Tuple, List, Dict, Any, Optional
|
||||
|
||||
from src.maildir_gtd.config import get_config
|
||||
from src.mail.config import get_config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
18
src/tasks/__init__.py
Normal file
18
src/tasks/__init__.py
Normal file
@@ -0,0 +1,18 @@
|
||||
"""Tasks TUI module for managing tasks via dstask/taskwarrior."""
|
||||
|
||||
from .config import TasksAppConfig, get_config, reload_config
|
||||
from .backend import Task, TaskBackend, TaskPriority, TaskStatus, Project
|
||||
from .app import TasksApp, run_app
|
||||
|
||||
__all__ = [
|
||||
"TasksAppConfig",
|
||||
"get_config",
|
||||
"reload_config",
|
||||
"Task",
|
||||
"TaskBackend",
|
||||
"TaskPriority",
|
||||
"TaskStatus",
|
||||
"Project",
|
||||
"TasksApp",
|
||||
"run_app",
|
||||
]
|
||||
476
src/tasks/app.py
Normal file
476
src/tasks/app.py
Normal file
@@ -0,0 +1,476 @@
|
||||
"""Tasks TUI application.
|
||||
|
||||
A Textual-based TUI for managing tasks via dstask/taskwarrior.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import sys
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
from textual.app import App, ComposeResult
|
||||
from textual.binding import Binding
|
||||
from textual.logging import TextualHandler
|
||||
from textual.widgets import DataTable, Footer, Header, Static
|
||||
|
||||
from .config import get_config, TasksAppConfig
|
||||
from .backend import Task, TaskBackend, TaskPriority, TaskStatus, Project
|
||||
|
||||
# Add the parent directory to the system path to resolve relative imports
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
logging.basicConfig(
|
||||
level="NOTSET",
|
||||
handlers=[TextualHandler()],
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TasksStatusBar(Static):
|
||||
"""Status bar showing task counts and current filters."""
|
||||
|
||||
total_tasks: int = 0
|
||||
active_filters: list[str] = []
|
||||
|
||||
def render(self) -> str:
|
||||
filter_text = " | ".join(self.active_filters) if self.active_filters else "All"
|
||||
return f"Tasks: {self.total_tasks} | Filter: {filter_text}"
|
||||
|
||||
|
||||
class TasksApp(App):
|
||||
"""A TUI for managing tasks via dstask/taskwarrior."""
|
||||
|
||||
CSS = """
|
||||
Screen {
|
||||
layout: grid;
|
||||
grid-size: 1;
|
||||
grid-rows: auto 1fr auto auto;
|
||||
}
|
||||
|
||||
#task-table {
|
||||
height: 100%;
|
||||
}
|
||||
|
||||
DataTable > .datatable--cursor {
|
||||
background: $accent;
|
||||
color: $text;
|
||||
}
|
||||
|
||||
.priority-p0 {
|
||||
color: red;
|
||||
}
|
||||
|
||||
.priority-p1 {
|
||||
color: orange;
|
||||
}
|
||||
|
||||
.priority-p2 {
|
||||
color: yellow;
|
||||
}
|
||||
|
||||
.priority-p3 {
|
||||
color: gray;
|
||||
}
|
||||
|
||||
.overdue {
|
||||
color: red;
|
||||
text-style: bold;
|
||||
}
|
||||
|
||||
.status-active {
|
||||
color: cyan;
|
||||
text-style: bold;
|
||||
}
|
||||
|
||||
#status-bar {
|
||||
dock: bottom;
|
||||
height: 1;
|
||||
background: $surface;
|
||||
color: $text-muted;
|
||||
padding: 0 1;
|
||||
}
|
||||
"""
|
||||
|
||||
BINDINGS = [
|
||||
Binding("q", "quit", "Quit", show=True),
|
||||
Binding("j", "cursor_down", "Down", show=False),
|
||||
Binding("k", "cursor_up", "Up", show=False),
|
||||
Binding("g", "first_task", "First", show=False),
|
||||
Binding("G", "last_task", "Last", show=False),
|
||||
Binding("d", "complete_task", "Done", show=True),
|
||||
Binding("s", "start_task", "Start", show=True),
|
||||
Binding("S", "stop_task", "Stop", show=False),
|
||||
Binding("a", "add_task", "Add", show=True),
|
||||
Binding("e", "edit_task", "Edit", show=True),
|
||||
Binding("x", "delete_task", "Delete", show=False),
|
||||
Binding("p", "filter_project", "Project", show=True),
|
||||
Binding("t", "filter_tag", "Tag", show=True),
|
||||
Binding("c", "clear_filters", "Clear", show=True),
|
||||
Binding("r", "refresh", "Refresh", show=True),
|
||||
Binding("y", "sync", "Sync", show=True),
|
||||
Binding("?", "help", "Help", show=True),
|
||||
Binding("enter", "view_task", "View", show=False),
|
||||
]
|
||||
|
||||
# Class-level type hints (instance variables initialized in __init__)
|
||||
tasks: list[Task]
|
||||
projects: list[Project]
|
||||
tags: list[str]
|
||||
current_project_filter: Optional[str]
|
||||
current_tag_filters: list[str]
|
||||
backend: Optional[TaskBackend]
|
||||
config: Optional[TasksAppConfig]
|
||||
|
||||
def __init__(self, backend: Optional[TaskBackend] = None):
|
||||
super().__init__()
|
||||
# Initialize instance variables
|
||||
self.tasks = []
|
||||
self.projects = []
|
||||
self.tags = []
|
||||
self.current_project_filter = None
|
||||
self.current_tag_filters = []
|
||||
self.config = get_config()
|
||||
|
||||
if backend:
|
||||
self.backend = backend
|
||||
else:
|
||||
# Create backend from config
|
||||
from src.services.dstask import DstaskClient
|
||||
|
||||
self.backend = DstaskClient(self.config.backend.dstask_path)
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
"""Create the app layout."""
|
||||
yield Header()
|
||||
yield DataTable(id="task-table", cursor_type="row")
|
||||
yield TasksStatusBar(id="status-bar")
|
||||
yield Footer()
|
||||
|
||||
def on_mount(self) -> None:
|
||||
"""Initialize the app on mount."""
|
||||
table = self.query_one("#task-table", DataTable)
|
||||
|
||||
# Setup columns based on config
|
||||
columns = (
|
||||
self.config.display.columns
|
||||
if self.config
|
||||
else ["id", "priority", "project", "tags", "summary", "due"]
|
||||
)
|
||||
|
||||
for col in columns:
|
||||
width = None
|
||||
if self.config and col in self.config.display.column_widths:
|
||||
w = self.config.display.column_widths[col]
|
||||
if w > 0:
|
||||
width = w
|
||||
table.add_column(col.capitalize(), width=width, key=col)
|
||||
|
||||
# Load tasks
|
||||
self.load_tasks()
|
||||
|
||||
def _format_priority(self, priority: TaskPriority) -> str:
|
||||
"""Format priority with icon."""
|
||||
if not self.config:
|
||||
return priority.value
|
||||
|
||||
icons = self.config.icons
|
||||
icon_map = {
|
||||
TaskPriority.P0: icons.priority_p0,
|
||||
TaskPriority.P1: icons.priority_p1,
|
||||
TaskPriority.P2: icons.priority_p2,
|
||||
TaskPriority.P3: icons.priority_p3,
|
||||
}
|
||||
return f"{icon_map.get(priority, '')} {priority.value}"
|
||||
|
||||
def _format_tags(self, tags: list[str]) -> str:
|
||||
"""Format tags list."""
|
||||
if not tags:
|
||||
return ""
|
||||
return " ".join(f"+{t}" for t in tags[:3]) # Show max 3 tags
|
||||
|
||||
def _format_due(self, task: Task) -> str:
|
||||
"""Format due date."""
|
||||
if not task.due:
|
||||
return ""
|
||||
|
||||
date_str = task.due.strftime(
|
||||
self.config.display.date_format if self.config else "%Y-%m-%d"
|
||||
)
|
||||
|
||||
if task.is_overdue:
|
||||
return f"! {date_str}"
|
||||
return date_str
|
||||
|
||||
def _get_row_data(self, task: Task) -> list[str]:
|
||||
"""Get row data for a task based on configured columns."""
|
||||
columns = (
|
||||
self.config.display.columns
|
||||
if self.config
|
||||
else ["id", "priority", "project", "tags", "summary", "due"]
|
||||
)
|
||||
|
||||
data = []
|
||||
for col in columns:
|
||||
if col == "id":
|
||||
data.append(str(task.id))
|
||||
elif col == "priority":
|
||||
data.append(self._format_priority(task.priority))
|
||||
elif col == "project":
|
||||
data.append(task.project or "")
|
||||
elif col == "tags":
|
||||
data.append(self._format_tags(task.tags))
|
||||
elif col == "summary":
|
||||
data.append(task.summary)
|
||||
elif col == "due":
|
||||
data.append(self._format_due(task))
|
||||
elif col == "status":
|
||||
data.append(task.status.value)
|
||||
else:
|
||||
data.append("")
|
||||
return data
|
||||
|
||||
def load_tasks(self) -> None:
|
||||
"""Load tasks from backend."""
|
||||
if not self.backend:
|
||||
return
|
||||
|
||||
# Get tasks with current filters
|
||||
self.tasks = self.backend.get_tasks(
|
||||
project=self.current_project_filter,
|
||||
tags=self.current_tag_filters if self.current_tag_filters else None,
|
||||
)
|
||||
|
||||
# Also load projects and tags for filtering
|
||||
self.projects = self.backend.get_projects()
|
||||
self.tags = self.backend.get_tags()
|
||||
|
||||
# Update table
|
||||
self._update_table()
|
||||
|
||||
def _update_table(self) -> None:
|
||||
"""Update the task table with current tasks."""
|
||||
table = self.query_one("#task-table", DataTable)
|
||||
table.clear()
|
||||
|
||||
for task in self.tasks:
|
||||
row_data = self._get_row_data(task)
|
||||
table.add_row(*row_data, key=str(task.id))
|
||||
|
||||
# Update status bar
|
||||
status_bar = self.query_one("#status-bar", TasksStatusBar)
|
||||
status_bar.total_tasks = len(self.tasks)
|
||||
|
||||
filters = []
|
||||
if self.current_project_filter:
|
||||
filters.append(f"project:{self.current_project_filter}")
|
||||
for tag in self.current_tag_filters:
|
||||
filters.append(f"+{tag}")
|
||||
status_bar.active_filters = filters
|
||||
status_bar.refresh()
|
||||
|
||||
def _get_selected_task(self) -> Optional[Task]:
|
||||
"""Get the currently selected task."""
|
||||
table = self.query_one("#task-table", DataTable)
|
||||
if table.cursor_row is None or table.cursor_row >= len(self.tasks):
|
||||
return None
|
||||
return self.tasks[table.cursor_row]
|
||||
|
||||
# Navigation actions
|
||||
def action_cursor_down(self) -> None:
|
||||
"""Move cursor down."""
|
||||
table = self.query_one("#task-table", DataTable)
|
||||
table.action_cursor_down()
|
||||
|
||||
def action_cursor_up(self) -> None:
|
||||
"""Move cursor up."""
|
||||
table = self.query_one("#task-table", DataTable)
|
||||
table.action_cursor_up()
|
||||
|
||||
def action_first_task(self) -> None:
|
||||
"""Go to first task."""
|
||||
table = self.query_one("#task-table", DataTable)
|
||||
table.move_cursor(row=0)
|
||||
|
||||
def action_last_task(self) -> None:
|
||||
"""Go to last task."""
|
||||
table = self.query_one("#task-table", DataTable)
|
||||
if self.tasks:
|
||||
table.move_cursor(row=len(self.tasks) - 1)
|
||||
|
||||
# Task actions
|
||||
def action_complete_task(self) -> None:
|
||||
"""Mark selected task as complete."""
|
||||
task = self._get_selected_task()
|
||||
if not task or not self.backend:
|
||||
return
|
||||
|
||||
if self.backend.complete_task(str(task.id)):
|
||||
self.notify(f"Task {task.id} completed", severity="information")
|
||||
self.load_tasks()
|
||||
else:
|
||||
self.notify(f"Failed to complete task {task.id}", severity="error")
|
||||
|
||||
def action_start_task(self) -> None:
|
||||
"""Start working on selected task."""
|
||||
task = self._get_selected_task()
|
||||
if not task or not self.backend:
|
||||
return
|
||||
|
||||
if self.backend.start_task(str(task.id)):
|
||||
self.notify(f"Started task {task.id}", severity="information")
|
||||
self.load_tasks()
|
||||
else:
|
||||
self.notify(f"Failed to start task {task.id}", severity="error")
|
||||
|
||||
def action_stop_task(self) -> None:
|
||||
"""Stop working on selected task."""
|
||||
task = self._get_selected_task()
|
||||
if not task or not self.backend:
|
||||
return
|
||||
|
||||
if self.backend.stop_task(str(task.id)):
|
||||
self.notify(f"Stopped task {task.id}", severity="information")
|
||||
self.load_tasks()
|
||||
else:
|
||||
self.notify(f"Failed to stop task {task.id}", severity="error")
|
||||
|
||||
def action_delete_task(self) -> None:
|
||||
"""Delete selected task."""
|
||||
task = self._get_selected_task()
|
||||
if not task or not self.backend:
|
||||
return
|
||||
|
||||
# TODO: Add confirmation dialog
|
||||
if self.backend.delete_task(str(task.id)):
|
||||
self.notify(f"Deleted task {task.id}", severity="warning")
|
||||
self.load_tasks()
|
||||
else:
|
||||
self.notify(f"Failed to delete task {task.id}", severity="error")
|
||||
|
||||
def action_edit_task(self) -> None:
|
||||
"""Edit selected task in editor."""
|
||||
task = self._get_selected_task()
|
||||
if not task or not self.backend:
|
||||
return
|
||||
|
||||
# Suspend the app, open editor, then resume
|
||||
with self.suspend():
|
||||
self.backend.edit_task_interactive(str(task.id))
|
||||
|
||||
self.load_tasks()
|
||||
|
||||
def action_add_task(self) -> None:
|
||||
"""Add a new task."""
|
||||
# TODO: Push AddTask screen
|
||||
self.notify("Add task not yet implemented", severity="warning")
|
||||
|
||||
def action_view_task(self) -> None:
|
||||
"""View task details."""
|
||||
task = self._get_selected_task()
|
||||
if not task:
|
||||
return
|
||||
# TODO: Push TaskDetail screen
|
||||
self.notify(f"Task: {task.summary}\nNotes: {task.notes or 'None'}")
|
||||
|
||||
# Filter actions
|
||||
def action_filter_project(self) -> None:
|
||||
"""Open project filter dialog."""
|
||||
from .screens.FilterScreens import ProjectFilterScreen
|
||||
|
||||
if not self.projects:
|
||||
self.notify("No projects found", severity="warning")
|
||||
return
|
||||
|
||||
project_data = [(p.name, p.task_count) for p in self.projects if p.name]
|
||||
|
||||
def handle_project_selection(project: str | None) -> None:
|
||||
if project != self.current_project_filter:
|
||||
self.current_project_filter = project
|
||||
self.load_tasks()
|
||||
if project:
|
||||
self.notify(f"Filtering by project: {project}")
|
||||
else:
|
||||
self.notify("Project filter cleared")
|
||||
|
||||
self.push_screen(
|
||||
ProjectFilterScreen(project_data, self.current_project_filter),
|
||||
handle_project_selection,
|
||||
)
|
||||
|
||||
def action_filter_tag(self) -> None:
|
||||
"""Open tag filter dialog."""
|
||||
from .screens.FilterScreens import TagFilterScreen
|
||||
|
||||
if not self.tags:
|
||||
self.notify("No tags found", severity="warning")
|
||||
return
|
||||
|
||||
def handle_tag_selection(tags: list[str]) -> None:
|
||||
if tags != self.current_tag_filters:
|
||||
self.current_tag_filters = tags
|
||||
self.load_tasks()
|
||||
if tags:
|
||||
self.notify(f"Filtering by tags: {', '.join(tags)}")
|
||||
else:
|
||||
self.notify("Tag filters cleared")
|
||||
|
||||
self.push_screen(
|
||||
TagFilterScreen(self.tags, self.current_tag_filters),
|
||||
handle_tag_selection,
|
||||
)
|
||||
|
||||
def action_clear_filters(self) -> None:
|
||||
"""Clear all filters."""
|
||||
self.current_project_filter = None
|
||||
self.current_tag_filters = []
|
||||
self.load_tasks()
|
||||
self.notify("Filters cleared", severity="information")
|
||||
|
||||
# Other actions
|
||||
def action_refresh(self) -> None:
|
||||
"""Refresh task list."""
|
||||
self.load_tasks()
|
||||
self.notify("Refreshed", severity="information")
|
||||
|
||||
def action_sync(self) -> None:
|
||||
"""Sync tasks with remote."""
|
||||
if not self.backend:
|
||||
return
|
||||
|
||||
if self.backend.sync():
|
||||
self.notify("Sync complete", severity="information")
|
||||
self.load_tasks()
|
||||
else:
|
||||
self.notify("Sync failed", severity="error")
|
||||
|
||||
def action_help(self) -> None:
|
||||
"""Show help."""
|
||||
help_text = """
|
||||
Keybindings:
|
||||
j/k - Navigate up/down
|
||||
g/G - First/Last task
|
||||
d - Mark task done
|
||||
s/S - Start/Stop task
|
||||
a - Add new task
|
||||
e - Edit task in editor
|
||||
x - Delete task
|
||||
p - Filter by project
|
||||
t - Filter by tag
|
||||
c - Clear filters
|
||||
r - Refresh
|
||||
y - Sync with remote
|
||||
Enter - View task details
|
||||
q - Quit
|
||||
"""
|
||||
self.notify(help_text.strip(), timeout=10)
|
||||
|
||||
|
||||
def run_app(backend: Optional[TaskBackend] = None) -> None:
|
||||
"""Run the Tasks TUI application."""
|
||||
app = TasksApp(backend=backend)
|
||||
app.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
run_app()
|
||||
259
src/tasks/backend.py
Normal file
259
src/tasks/backend.py
Normal file
@@ -0,0 +1,259 @@
|
||||
"""Task backend abstraction for Tasks TUI.
|
||||
|
||||
This module defines the abstract interface that all task backends must implement,
|
||||
allowing the TUI to work with different task management systems (dstask, taskwarrior, etc.)
|
||||
"""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class TaskStatus(Enum):
|
||||
"""Task status values."""
|
||||
|
||||
PENDING = "pending"
|
||||
ACTIVE = "active"
|
||||
DONE = "done"
|
||||
DELETED = "deleted"
|
||||
|
||||
|
||||
class TaskPriority(Enum):
|
||||
"""Task priority levels (P0 = highest, P3 = lowest)."""
|
||||
|
||||
P0 = "P0" # Critical
|
||||
P1 = "P1" # High
|
||||
P2 = "P2" # Normal
|
||||
P3 = "P3" # Low
|
||||
|
||||
@classmethod
|
||||
def from_string(cls, value: str) -> "TaskPriority":
|
||||
"""Parse priority from string."""
|
||||
value = value.upper().strip()
|
||||
if value in ("P0", "0", "CRITICAL"):
|
||||
return cls.P0
|
||||
elif value in ("P1", "1", "HIGH", "H"):
|
||||
return cls.P1
|
||||
elif value in ("P2", "2", "NORMAL", "MEDIUM", "M"):
|
||||
return cls.P2
|
||||
elif value in ("P3", "3", "LOW", "L"):
|
||||
return cls.P3
|
||||
return cls.P2 # Default to normal
|
||||
|
||||
|
||||
@dataclass
|
||||
class Task:
|
||||
"""Unified task representation across backends."""
|
||||
|
||||
uuid: str
|
||||
id: int # Short numeric ID for display
|
||||
summary: str
|
||||
status: TaskStatus = TaskStatus.PENDING
|
||||
priority: TaskPriority = TaskPriority.P2
|
||||
project: str = ""
|
||||
tags: list[str] = field(default_factory=list)
|
||||
notes: str = ""
|
||||
due: Optional[datetime] = None
|
||||
created: Optional[datetime] = None
|
||||
resolved: Optional[datetime] = None
|
||||
|
||||
@property
|
||||
def is_overdue(self) -> bool:
|
||||
"""Check if task is overdue."""
|
||||
if self.due is None or self.status == TaskStatus.DONE:
|
||||
return False
|
||||
# Use timezone-aware now() if due date is timezone-aware
|
||||
now = datetime.now(timezone.utc) if self.due.tzinfo else datetime.now()
|
||||
return now > self.due
|
||||
|
||||
|
||||
@dataclass
|
||||
class Project:
|
||||
"""Project information."""
|
||||
|
||||
name: str
|
||||
task_count: int = 0
|
||||
resolved_count: int = 0
|
||||
active: bool = True
|
||||
priority: TaskPriority = TaskPriority.P2
|
||||
|
||||
|
||||
class TaskBackend(ABC):
|
||||
"""Abstract base class for task management backends."""
|
||||
|
||||
@abstractmethod
|
||||
def get_tasks(
|
||||
self,
|
||||
project: Optional[str] = None,
|
||||
tags: Optional[list[str]] = None,
|
||||
status: Optional[TaskStatus] = None,
|
||||
) -> list[Task]:
|
||||
"""Get tasks, optionally filtered by project, tags, or status.
|
||||
|
||||
Args:
|
||||
project: Filter by project name
|
||||
tags: Filter by tags (tasks must have all specified tags)
|
||||
status: Filter by status
|
||||
|
||||
Returns:
|
||||
List of matching tasks
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_next_tasks(self) -> list[Task]:
|
||||
"""Get the 'next' tasks to work on (priority-sorted actionable tasks)."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_task(self, task_id: str) -> Optional[Task]:
|
||||
"""Get a single task by ID or UUID.
|
||||
|
||||
Args:
|
||||
task_id: Task ID (numeric) or UUID
|
||||
|
||||
Returns:
|
||||
Task if found, None otherwise
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def add_task(
|
||||
self,
|
||||
summary: str,
|
||||
project: Optional[str] = None,
|
||||
tags: Optional[list[str]] = None,
|
||||
priority: Optional[TaskPriority] = None,
|
||||
due: Optional[datetime] = None,
|
||||
notes: Optional[str] = None,
|
||||
) -> Task:
|
||||
"""Create a new task.
|
||||
|
||||
Args:
|
||||
summary: Task description
|
||||
project: Project name
|
||||
tags: List of tags
|
||||
priority: Task priority
|
||||
due: Due date
|
||||
notes: Additional notes
|
||||
|
||||
Returns:
|
||||
The created task
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def complete_task(self, task_id: str) -> bool:
|
||||
"""Mark a task as complete.
|
||||
|
||||
Args:
|
||||
task_id: Task ID or UUID
|
||||
|
||||
Returns:
|
||||
True if successful
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def delete_task(self, task_id: str) -> bool:
|
||||
"""Delete a task.
|
||||
|
||||
Args:
|
||||
task_id: Task ID or UUID
|
||||
|
||||
Returns:
|
||||
True if successful
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def start_task(self, task_id: str) -> bool:
|
||||
"""Start working on a task (mark as active).
|
||||
|
||||
Args:
|
||||
task_id: Task ID or UUID
|
||||
|
||||
Returns:
|
||||
True if successful
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def stop_task(self, task_id: str) -> bool:
|
||||
"""Stop working on a task (mark as pending).
|
||||
|
||||
Args:
|
||||
task_id: Task ID or UUID
|
||||
|
||||
Returns:
|
||||
True if successful
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def modify_task(
|
||||
self,
|
||||
task_id: str,
|
||||
summary: Optional[str] = None,
|
||||
project: Optional[str] = None,
|
||||
tags: Optional[list[str]] = None,
|
||||
priority: Optional[TaskPriority] = None,
|
||||
due: Optional[datetime] = None,
|
||||
notes: Optional[str] = None,
|
||||
) -> bool:
|
||||
"""Modify a task.
|
||||
|
||||
Args:
|
||||
task_id: Task ID or UUID
|
||||
summary: New summary (if provided)
|
||||
project: New project (if provided)
|
||||
tags: New tags (if provided)
|
||||
priority: New priority (if provided)
|
||||
due: New due date (if provided)
|
||||
notes: New notes (if provided)
|
||||
|
||||
Returns:
|
||||
True if successful
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_projects(self) -> list[Project]:
|
||||
"""Get all projects.
|
||||
|
||||
Returns:
|
||||
List of projects with task counts
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_tags(self) -> list[str]:
|
||||
"""Get all tags.
|
||||
|
||||
Returns:
|
||||
List of tag names
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def sync(self) -> bool:
|
||||
"""Sync tasks with remote (if supported).
|
||||
|
||||
Returns:
|
||||
True if successful (or not applicable)
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def edit_task_interactive(self, task_id: str) -> bool:
|
||||
"""Open task in editor for interactive editing.
|
||||
|
||||
Args:
|
||||
task_id: Task ID or UUID
|
||||
|
||||
Returns:
|
||||
True if successful
|
||||
"""
|
||||
pass
|
||||
201
src/tasks/config.py
Normal file
201
src/tasks/config.py
Normal file
@@ -0,0 +1,201 @@
|
||||
"""Configuration system for Tasks TUI using Pydantic."""
|
||||
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Literal, Optional
|
||||
|
||||
import toml
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BackendConfig(BaseModel):
|
||||
"""Configuration for task management backend."""
|
||||
|
||||
# Which backend to use: dstask or taskwarrior
|
||||
backend: Literal["dstask", "taskwarrior"] = "dstask"
|
||||
|
||||
# Path to dstask binary
|
||||
dstask_path: str = Field(
|
||||
default_factory=lambda: str(Path.home() / ".local" / "bin" / "dstask")
|
||||
)
|
||||
|
||||
# Path to taskwarrior binary
|
||||
taskwarrior_path: str = "task"
|
||||
|
||||
|
||||
class DisplayConfig(BaseModel):
|
||||
"""Configuration for task list display."""
|
||||
|
||||
# Columns to show in the task table
|
||||
# Available: id, priority, project, tags, summary, due, status
|
||||
columns: list[str] = Field(
|
||||
default_factory=lambda: ["id", "priority", "project", "tags", "summary", "due"]
|
||||
)
|
||||
|
||||
# Column widths (0 = auto)
|
||||
column_widths: dict[str, int] = Field(
|
||||
default_factory=lambda: {
|
||||
"id": 4,
|
||||
"priority": 3,
|
||||
"project": 15,
|
||||
"tags": 15,
|
||||
"summary": 0, # auto-expand
|
||||
"due": 10,
|
||||
"status": 8,
|
||||
}
|
||||
)
|
||||
|
||||
# Date format for due dates
|
||||
date_format: str = "%Y-%m-%d"
|
||||
|
||||
# Show completed tasks
|
||||
show_completed: bool = False
|
||||
|
||||
# Default sort column
|
||||
default_sort: str = "priority"
|
||||
|
||||
# Sort direction (asc or desc)
|
||||
sort_direction: Literal["asc", "desc"] = "asc"
|
||||
|
||||
|
||||
class IconsConfig(BaseModel):
|
||||
"""NerdFont icons for task display."""
|
||||
|
||||
# Priority icons (P0 = highest, P3 = lowest)
|
||||
priority_p0: str = "\uf06a" # nf-fa-exclamation_circle (critical)
|
||||
priority_p1: str = "\uf062" # nf-fa-arrow_up (high)
|
||||
priority_p2: str = "\uf068" # nf-fa-minus (normal)
|
||||
priority_p3: str = "\uf063" # nf-fa-arrow_down (low)
|
||||
|
||||
# Status icons
|
||||
status_pending: str = "\uf10c" # nf-fa-circle_o (empty circle)
|
||||
status_active: str = "\uf192" # nf-fa-dot_circle_o (dot circle)
|
||||
status_done: str = "\uf058" # nf-fa-check_circle (checked)
|
||||
|
||||
# Other icons
|
||||
project: str = "\uf07b" # nf-fa-folder
|
||||
tag: str = "\uf02b" # nf-fa-tag
|
||||
due: str = "\uf073" # nf-fa-calendar
|
||||
overdue: str = "\uf071" # nf-fa-warning
|
||||
|
||||
|
||||
class KeybindingsConfig(BaseModel):
|
||||
"""Keybinding customization."""
|
||||
|
||||
# Navigation
|
||||
next_task: str = "j"
|
||||
prev_task: str = "k"
|
||||
first_task: str = "g"
|
||||
last_task: str = "G"
|
||||
|
||||
# Actions
|
||||
complete_task: str = "d"
|
||||
edit_task: str = "e"
|
||||
add_task: str = "a"
|
||||
delete_task: str = "x"
|
||||
start_task: str = "s"
|
||||
stop_task: str = "S"
|
||||
|
||||
# Filtering
|
||||
filter_project: str = "p"
|
||||
filter_tag: str = "t"
|
||||
clear_filters: str = "c"
|
||||
|
||||
# Other
|
||||
refresh: str = "r"
|
||||
sync: str = "y"
|
||||
quit: str = "q"
|
||||
help: str = "?"
|
||||
|
||||
|
||||
class ThemeConfig(BaseModel):
|
||||
"""Theme/appearance settings."""
|
||||
|
||||
# Priority colors (CSS color names or hex)
|
||||
color_p0: str = "red"
|
||||
color_p1: str = "orange"
|
||||
color_p2: str = "yellow"
|
||||
color_p3: str = "gray"
|
||||
|
||||
# Status colors
|
||||
color_pending: str = "white"
|
||||
color_active: str = "cyan"
|
||||
color_done: str = "green"
|
||||
|
||||
# Overdue color
|
||||
color_overdue: str = "red"
|
||||
|
||||
|
||||
class TasksAppConfig(BaseModel):
|
||||
"""Main configuration for Tasks TUI."""
|
||||
|
||||
backend: BackendConfig = Field(default_factory=BackendConfig)
|
||||
display: DisplayConfig = Field(default_factory=DisplayConfig)
|
||||
icons: IconsConfig = Field(default_factory=IconsConfig)
|
||||
keybindings: KeybindingsConfig = Field(default_factory=KeybindingsConfig)
|
||||
theme: ThemeConfig = Field(default_factory=ThemeConfig)
|
||||
|
||||
@classmethod
|
||||
def get_config_path(cls) -> Path:
|
||||
"""Get the path to the config file."""
|
||||
# Check environment variable first
|
||||
env_path = os.getenv("LUK_TASKS_CONFIG")
|
||||
if env_path:
|
||||
return Path(env_path)
|
||||
|
||||
# Default to ~/.config/luk/tasks.toml
|
||||
return Path.home() / ".config" / "luk" / "tasks.toml"
|
||||
|
||||
@classmethod
|
||||
def load(cls, config_path: Optional[Path] = None) -> "TasksAppConfig":
|
||||
"""Load config from TOML file with defaults for missing values."""
|
||||
if config_path is None:
|
||||
config_path = cls.get_config_path()
|
||||
|
||||
if config_path.exists():
|
||||
try:
|
||||
with open(config_path, "r") as f:
|
||||
data = toml.load(f)
|
||||
logger.info(f"Loaded config from {config_path}")
|
||||
return cls.model_validate(data)
|
||||
except Exception as e:
|
||||
logger.warning(f"Error loading config from {config_path}: {e}")
|
||||
logger.warning("Using default configuration")
|
||||
return cls()
|
||||
else:
|
||||
logger.info(f"No config file at {config_path}, using defaults")
|
||||
return cls()
|
||||
|
||||
def save(self, config_path: Optional[Path] = None) -> None:
|
||||
"""Save current config to TOML file."""
|
||||
if config_path is None:
|
||||
config_path = self.get_config_path()
|
||||
|
||||
# Ensure parent directory exists
|
||||
config_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
with open(config_path, "w") as f:
|
||||
toml.dump(self.model_dump(), f)
|
||||
logger.info(f"Saved config to {config_path}")
|
||||
|
||||
|
||||
# Global config instance (lazy-loaded)
|
||||
_config: Optional[TasksAppConfig] = None
|
||||
|
||||
|
||||
def get_config() -> TasksAppConfig:
|
||||
"""Get the global config instance, loading it if necessary."""
|
||||
global _config
|
||||
if _config is None:
|
||||
_config = TasksAppConfig.load()
|
||||
return _config
|
||||
|
||||
|
||||
def reload_config() -> TasksAppConfig:
|
||||
"""Force reload of the config from disk."""
|
||||
global _config
|
||||
_config = TasksAppConfig.load()
|
||||
return _config
|
||||
232
src/tasks/screens/FilterScreens.py
Normal file
232
src/tasks/screens/FilterScreens.py
Normal file
@@ -0,0 +1,232 @@
|
||||
"""Filter selection screens for Tasks TUI."""
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from textual import on
|
||||
from textual.app import ComposeResult
|
||||
from textual.binding import Binding
|
||||
from textual.containers import Container, Vertical
|
||||
from textual.screen import ModalScreen
|
||||
from textual.widgets import Label, SelectionList, Button
|
||||
from textual.widgets.selection_list import Selection
|
||||
|
||||
|
||||
class ProjectFilterScreen(ModalScreen[Optional[str]]):
|
||||
"""Modal screen for selecting a project filter."""
|
||||
|
||||
BINDINGS = [
|
||||
Binding("escape", "cancel", "Cancel"),
|
||||
Binding("enter", "select", "Select"),
|
||||
]
|
||||
|
||||
DEFAULT_CSS = """
|
||||
ProjectFilterScreen {
|
||||
align: center middle;
|
||||
}
|
||||
|
||||
ProjectFilterScreen #filter-container {
|
||||
width: 50;
|
||||
height: auto;
|
||||
max-height: 80%;
|
||||
background: $surface;
|
||||
border: thick $primary;
|
||||
padding: 1 2;
|
||||
}
|
||||
|
||||
ProjectFilterScreen #filter-title {
|
||||
text-style: bold;
|
||||
width: 1fr;
|
||||
height: 1;
|
||||
text-align: center;
|
||||
margin-bottom: 1;
|
||||
}
|
||||
|
||||
ProjectFilterScreen SelectionList {
|
||||
height: auto;
|
||||
max-height: 15;
|
||||
margin-bottom: 1;
|
||||
}
|
||||
|
||||
ProjectFilterScreen #filter-buttons {
|
||||
width: 1fr;
|
||||
height: auto;
|
||||
align: center middle;
|
||||
margin-top: 1;
|
||||
}
|
||||
|
||||
ProjectFilterScreen Button {
|
||||
margin: 0 1;
|
||||
}
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
projects: list[tuple[str, int]], # List of (project_name, task_count)
|
||||
current_filter: Optional[str] = None,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the project filter screen.
|
||||
|
||||
Args:
|
||||
projects: List of (project_name, task_count) tuples
|
||||
current_filter: Currently selected project filter
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
self._projects = projects
|
||||
self._current_filter = current_filter
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
with Container(id="filter-container"):
|
||||
yield Label("Select Project", id="filter-title")
|
||||
|
||||
selections = [
|
||||
Selection(
|
||||
f"{name} ({count})",
|
||||
name,
|
||||
initial_state=name == self._current_filter,
|
||||
)
|
||||
for name, count in self._projects
|
||||
]
|
||||
|
||||
yield SelectionList[str](*selections, id="project-list")
|
||||
|
||||
with Container(id="filter-buttons"):
|
||||
yield Button("Cancel", id="cancel", variant="default")
|
||||
yield Button("Clear", id="clear", variant="warning")
|
||||
yield Button("Apply", id="apply", variant="primary")
|
||||
|
||||
def on_mount(self) -> None:
|
||||
"""Focus the selection list."""
|
||||
self.query_one("#project-list", SelectionList).focus()
|
||||
|
||||
@on(Button.Pressed, "#apply")
|
||||
def handle_apply(self) -> None:
|
||||
selection_list = self.query_one("#project-list", SelectionList)
|
||||
selected = list(selection_list.selected)
|
||||
if selected:
|
||||
self.dismiss(selected[0]) # Return first selected project
|
||||
else:
|
||||
self.dismiss(None)
|
||||
|
||||
@on(Button.Pressed, "#clear")
|
||||
def handle_clear(self) -> None:
|
||||
self.dismiss(None)
|
||||
|
||||
@on(Button.Pressed, "#cancel")
|
||||
def handle_cancel(self) -> None:
|
||||
self.dismiss(self._current_filter) # Return unchanged
|
||||
|
||||
def action_cancel(self) -> None:
|
||||
self.dismiss(self._current_filter)
|
||||
|
||||
def action_select(self) -> None:
|
||||
self.handle_apply()
|
||||
|
||||
|
||||
class TagFilterScreen(ModalScreen[list[str]]):
|
||||
"""Modal screen for selecting tag filters (multi-select)."""
|
||||
|
||||
BINDINGS = [
|
||||
Binding("escape", "cancel", "Cancel"),
|
||||
Binding("enter", "select", "Select"),
|
||||
]
|
||||
|
||||
DEFAULT_CSS = """
|
||||
TagFilterScreen {
|
||||
align: center middle;
|
||||
}
|
||||
|
||||
TagFilterScreen #filter-container {
|
||||
width: 50;
|
||||
height: auto;
|
||||
max-height: 80%;
|
||||
background: $surface;
|
||||
border: thick $primary;
|
||||
padding: 1 2;
|
||||
}
|
||||
|
||||
TagFilterScreen #filter-title {
|
||||
text-style: bold;
|
||||
width: 1fr;
|
||||
height: 1;
|
||||
text-align: center;
|
||||
margin-bottom: 1;
|
||||
}
|
||||
|
||||
TagFilterScreen SelectionList {
|
||||
height: auto;
|
||||
max-height: 15;
|
||||
margin-bottom: 1;
|
||||
}
|
||||
|
||||
TagFilterScreen #filter-buttons {
|
||||
width: 1fr;
|
||||
height: auto;
|
||||
align: center middle;
|
||||
margin-top: 1;
|
||||
}
|
||||
|
||||
TagFilterScreen Button {
|
||||
margin: 0 1;
|
||||
}
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
tags: list[str],
|
||||
current_filters: list[str],
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the tag filter screen.
|
||||
|
||||
Args:
|
||||
tags: List of available tags
|
||||
current_filters: Currently selected tag filters
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
self._tags = tags
|
||||
self._current_filters = current_filters
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
with Container(id="filter-container"):
|
||||
yield Label("Select Tags (multi-select)", id="filter-title")
|
||||
|
||||
selections = [
|
||||
Selection(
|
||||
f"+{tag}",
|
||||
tag,
|
||||
initial_state=tag in self._current_filters,
|
||||
)
|
||||
for tag in self._tags
|
||||
]
|
||||
|
||||
yield SelectionList[str](*selections, id="tag-list")
|
||||
|
||||
with Container(id="filter-buttons"):
|
||||
yield Button("Cancel", id="cancel", variant="default")
|
||||
yield Button("Clear", id="clear", variant="warning")
|
||||
yield Button("Apply", id="apply", variant="primary")
|
||||
|
||||
def on_mount(self) -> None:
|
||||
"""Focus the selection list."""
|
||||
self.query_one("#tag-list", SelectionList).focus()
|
||||
|
||||
@on(Button.Pressed, "#apply")
|
||||
def handle_apply(self) -> None:
|
||||
selection_list = self.query_one("#tag-list", SelectionList)
|
||||
selected = list(selection_list.selected)
|
||||
self.dismiss(selected)
|
||||
|
||||
@on(Button.Pressed, "#clear")
|
||||
def handle_clear(self) -> None:
|
||||
self.dismiss([])
|
||||
|
||||
@on(Button.Pressed, "#cancel")
|
||||
def handle_cancel(self) -> None:
|
||||
self.dismiss(self._current_filters) # Return unchanged
|
||||
|
||||
def action_cancel(self) -> None:
|
||||
self.dismiss(self._current_filters)
|
||||
|
||||
def action_select(self) -> None:
|
||||
self.handle_apply()
|
||||
5
src/tasks/screens/__init__.py
Normal file
5
src/tasks/screens/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
"""Screen components for Tasks TUI."""
|
||||
|
||||
from .FilterScreens import ProjectFilterScreen, TagFilterScreen
|
||||
|
||||
__all__ = ["ProjectFilterScreen", "TagFilterScreen"]
|
||||
1
src/tasks/widgets/__init__.py
Normal file
1
src/tasks/widgets/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Widget components for Tasks TUI."""
|
||||
Reference in New Issue
Block a user