task creation

This commit is contained in:
Bendt
2025-12-18 12:09:21 -05:00
parent 82fbc31683
commit 4dbb7c5fea
4 changed files with 451 additions and 50 deletions

305
src/services/task_client.py Normal file
View File

@@ -0,0 +1,305 @@
"""Unified task client that supports multiple backends (taskwarrior, dstask)."""
import asyncio
import json
import logging
import shlex
from typing import Tuple, List, Dict, Any, Optional
from src.maildir_gtd.config import get_config
logger = logging.getLogger(__name__)
def get_backend_info() -> Tuple[str, str]:
"""Get the configured backend name and command path.
Returns:
Tuple of (backend_name, command_path)
"""
config = get_config()
backend = config.task.backend
if backend == "dstask":
return "dstask", config.task.dstask_path
else:
return "taskwarrior", config.task.taskwarrior_path
async def create_task(
task_description: str,
tags: List[str] = None,
project: str = None,
due: str = None,
priority: str = None,
) -> Tuple[bool, Optional[str]]:
"""
Create a new task using the configured backend.
Args:
task_description: Description of the task
tags: List of tags to apply to the task
project: Project to which the task belongs
due: Due date in the format the backend accepts
priority: Priority of the task (H, M, L)
Returns:
Tuple containing:
- Success status (True if operation was successful)
- Task ID/message or error message
"""
backend, cmd_path = get_backend_info()
try:
if backend == "dstask":
return await _create_task_dstask(
cmd_path, task_description, tags, project, due, priority
)
else:
return await _create_task_taskwarrior(
cmd_path, task_description, tags, project, due, priority
)
except Exception as e:
logger.error(f"Exception during task creation: {e}")
return False, str(e)
async def _create_task_taskwarrior(
cmd_path: str,
task_description: str,
tags: List[str] = None,
project: str = None,
due: str = None,
priority: str = None,
) -> Tuple[bool, Optional[str]]:
"""Create task using taskwarrior."""
cmd = [cmd_path, "add"]
# Add project if specified
if project:
cmd.append(f"project:{project}")
# Add tags if specified
if tags:
for tag in tags:
if tag:
cmd.append(f"+{tag}")
# Add due date if specified
if due:
cmd.append(f"due:{due}")
# Add priority if specified
if priority and priority in ["H", "M", "L"]:
cmd.append(f"priority:{priority}")
# Add task description
cmd.append(task_description)
# Use shlex.join for proper escaping
cmd_str = shlex.join(cmd)
logger.debug(f"Taskwarrior command: {cmd_str}")
process = await asyncio.create_subprocess_shell(
cmd_str,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
return True, stdout.decode().strip()
else:
error_msg = stderr.decode().strip()
logger.error(f"Error creating task: {error_msg}")
return False, error_msg
async def _create_task_dstask(
cmd_path: str,
task_description: str,
tags: List[str] = None,
project: str = None,
due: str = None,
priority: str = None,
) -> Tuple[bool, Optional[str]]:
"""Create task using dstask.
dstask syntax: dstask add [+tag...] [project:X] [priority:X] [due:X] description
"""
cmd = [cmd_path, "add"]
# Add tags if specified (dstask uses +tag syntax like taskwarrior)
if tags:
for tag in tags:
if tag:
cmd.append(f"+{tag}")
# Add project if specified
if project:
cmd.append(f"project:{project}")
# Add priority if specified (dstask uses P1, P2, P3 but also accepts priority:H/M/L)
if priority and priority in ["H", "M", "L"]:
# Map to dstask priority format
priority_map = {"H": "P1", "M": "P2", "L": "P3"}
cmd.append(priority_map[priority])
# Add due date if specified
if due:
cmd.append(f"due:{due}")
# Add task description (must be last for dstask)
cmd.append(task_description)
# Use shlex.join for proper escaping
cmd_str = shlex.join(cmd)
logger.debug(f"dstask command: {cmd_str}")
process = await asyncio.create_subprocess_shell(
cmd_str,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
return True, stdout.decode().strip()
else:
error_msg = stderr.decode().strip()
logger.error(f"Error creating dstask task: {error_msg}")
return False, error_msg
async def list_tasks(filter_str: str = "") -> Tuple[List[Dict[str, Any]], bool]:
"""
List tasks from the configured backend.
Args:
filter_str: Optional filter string
Returns:
Tuple containing:
- List of task dictionaries
- Success status (True if operation was successful)
"""
backend, cmd_path = get_backend_info()
try:
if backend == "dstask":
return await _list_tasks_dstask(cmd_path, filter_str)
else:
return await _list_tasks_taskwarrior(cmd_path, filter_str)
except Exception as e:
logger.error(f"Exception during task listing: {e}")
return [], False
async def _list_tasks_taskwarrior(
cmd_path: str, filter_str: str = ""
) -> Tuple[List[Dict[str, Any]], bool]:
"""List tasks using taskwarrior."""
cmd = f"{shlex.quote(cmd_path)} {filter_str} export"
process = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
tasks = json.loads(stdout.decode())
return tasks, True
else:
logger.error(f"Error listing tasks: {stderr.decode()}")
return [], False
async def _list_tasks_dstask(
cmd_path: str, filter_str: str = ""
) -> Tuple[List[Dict[str, Any]], bool]:
"""List tasks using dstask."""
# dstask uses 'dstask export' for JSON output
cmd = f"{shlex.quote(cmd_path)} {filter_str} export"
process = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
try:
tasks = json.loads(stdout.decode())
return tasks, True
except json.JSONDecodeError:
# dstask might output tasks differently
logger.warning("Could not parse dstask JSON output")
return [], False
else:
logger.error(f"Error listing dstask tasks: {stderr.decode()}")
return [], False
async def complete_task(task_id: str) -> bool:
"""
Mark a task as completed.
Args:
task_id: ID of the task to complete
Returns:
True if task was completed successfully, False otherwise
"""
backend, cmd_path = get_backend_info()
try:
if backend == "dstask":
cmd = f"{shlex.quote(cmd_path)} done {task_id}"
else:
cmd = f"echo 'yes' | {shlex.quote(cmd_path)} {task_id} done"
process = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
return process.returncode == 0
except Exception as e:
logger.error(f"Exception during task completion: {e}")
return False
async def delete_task(task_id: str) -> bool:
"""
Delete a task.
Args:
task_id: ID of the task to delete
Returns:
True if task was deleted successfully, False otherwise
"""
backend, cmd_path = get_backend_info()
try:
if backend == "dstask":
cmd = f"{shlex.quote(cmd_path)} remove {task_id}"
else:
cmd = f"echo 'yes' | {shlex.quote(cmd_path)} {task_id} delete"
process = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
return process.returncode == 0
except Exception as e:
logger.error(f"Exception during task deletion: {e}")
return False