Compare commits

...

3 Commits

Author SHA1 Message Date
Bendt
4dbb7c5fea task creation 2025-12-18 12:09:21 -05:00
Bendt
82fbc31683 better link display and opening 2025-12-18 11:57:29 -05:00
Bendt
e08f552386 link panel 2025-12-18 11:43:37 -05:00
13 changed files with 1629 additions and 170 deletions

BIN
.coverage

Binary file not shown.

View File

@@ -33,6 +33,8 @@ dependencies = [
"openai>=1.78.1",
"orjson>=3.10.18",
"pillow>=11.2.1",
"pydantic>=2.0.0",
"pydantic-settings>=2.0.0",
"python-dateutil>=2.9.0.post0",
"python-docx>=1.1.2",
"requests>=2.31.0",
@@ -40,6 +42,7 @@ dependencies = [
"textual>=3.2.0",
"textual-image>=0.8.2",
"ticktick-py>=2.0.0",
"toml>=0.10.0",
]
[project.scripts]

View File

@@ -1,5 +1,8 @@
from .config import get_config, MaildirGTDConfig
from .message_store import MessageStore
from .widgets.ContentContainer import ContentContainer
from .widgets.EnvelopeListItem import EnvelopeListItem, GroupHeader
from .screens.LinkPanel import LinkPanel
from .actions.task import action_create_task
from .actions.open import action_open
from .actions.delete import delete_current
@@ -102,6 +105,7 @@ class EmailViewerApp(App):
Binding("q", "quit", "Quit application"),
Binding("h", "toggle_header", "Toggle Envelope Header"),
Binding("t", "create_task", "Create Task"),
Binding("l", "open_links", "Show Links"),
Binding("%", "reload", "Reload message list"),
Binding("1", "focus_1", "Focus Accounts Panel"),
Binding("2", "focus_2", "Focus Folders Panel"),
@@ -358,68 +362,27 @@ class EmailViewerApp(App):
folders_list.loading = False
def _populate_list_view(self) -> None:
"""Populate the ListView with new items. This clears existing items."""
"""Populate the ListView with new items using the new EnvelopeListItem widget."""
envelopes_list = self.query_one("#envelopes_list", ListView)
envelopes_list.clear()
config = get_config()
for item in self.message_store.envelopes:
if item and item.get("type") == "header":
envelopes_list.append(
ListItem(
Container(
Label("", classes="checkbox"), # Hidden checkbox for header
Label(
item["label"],
classes="group_header",
markup=False,
),
classes="envelope_item_row",
)
)
# Use the new GroupHeader widget for date groupings
envelopes_list.append(ListItem(GroupHeader(label=item["label"])))
elif item:
# Use the new EnvelopeListItem widget
message_id = int(item.get("id", 0))
is_selected = message_id in self.selected_messages
envelope_widget = EnvelopeListItem(
envelope=item,
config=config.envelope_display,
is_selected=is_selected,
)
elif item: # Check if not None
# Extract sender and date
sender_name = item.get("from", {}).get(
"name", item.get("from", {}).get("addr", "Unknown")
)
if not sender_name:
sender_name = item.get("from", {}).get("addr", "Unknown")
envelopes_list.append(ListItem(envelope_widget))
# Truncate sender name
max_sender_len = 25 # Adjust as needed
if len(sender_name) > max_sender_len:
sender_name = sender_name[: max_sender_len - 3] + "..."
message_date_str = item.get("date", "")
formatted_date = ""
if message_date_str:
try:
# Parse the date string, handling potential timezone info
dt_object = datetime.fromisoformat(message_date_str)
formatted_date = dt_object.strftime("%m/%d %H:%M")
except ValueError:
formatted_date = "Invalid Date"
list_item = ListItem(
Container(
Container(
Label("", classes="checkbox"), # Placeholder for checkbox
Label(sender_name, classes="sender_name"),
Label(formatted_date, classes="message_date"),
classes="envelope_header_row",
),
Container(
Label(
str(item.get("subject", "")).strip(),
classes="email_subject",
markup=False,
),
classes="envelope_subject_row",
),
classes="envelope_item_row",
)
)
envelopes_list.append(list_item)
self.refresh_list_view_items() # Initial refresh of item states
def refresh_list_view_items(self) -> None:
@@ -429,46 +392,17 @@ class EmailViewerApp(App):
if isinstance(list_item, ListItem):
item_data = self.message_store.envelopes[i]
# Find the checkbox label within the ListItem's children
# checkbox_label = list_item.query_one(".checkbox", Label)
if item_data and item_data.get("type") != "header":
message_id = int(item_data["id"])
is_selected = message_id in self.selected_messages or False
list_item.set_class(is_selected, "selection")
# if checkbox_label:
# checkbox_label.update("\uf4a7" if is_selected else "\ue640")
# checkbox_label.display = True # Always display checkbox
# list_item.highlighted = is_selected
# # Update sender and date labels
# sender_name = item_data.get("from", {}).get("name", item_data.get("from", {}).get("addr", "Unknown"))
# if not sender_name:
# sender_name = item_data.get("from", {}).get("addr", "Unknown")
# max_sender_len = 25
# if len(sender_name) > max_sender_len:
# sender_name = sender_name[:max_sender_len-3] + "..."
# list_item.query_one(".sender_name", Label).update(sender_name)
# message_date_str = item_data.get("date", "")
# formatted_date = ""
# if message_date_str:
# try:
# dt_object = datetime.fromisoformat(message_date_str)
# formatted_date = dt_object.strftime("%m/%d %H:%M")
# except ValueError:
# formatted_date = "Invalid Date"
# list_item.query_one(".message_date", Label).update(formatted_date)
# else:
# # For header items, checkbox should be unchecked and visible
# checkbox_label.update("\ue640") # Always unchecked for headers
# checkbox_label.display = True # Always display checkbox
# list_item.highlighted = False # Headers are never highlighted for selection
# Update total messages count (this is still fine here)
# self.total_messages = self.message_store.total_messages
# Try to update the EnvelopeListItem's selection state
try:
envelope_widget = list_item.query_one(EnvelopeListItem)
envelope_widget.set_selected(is_selected)
except Exception:
pass # Widget may not exist or be of old type
def show_message(self, message_id: int, new_index=None) -> None:
if new_index:
@@ -602,6 +536,12 @@ class EmailViewerApp(App):
def action_create_task(self) -> None:
action_create_task(self)
def action_open_links(self) -> None:
"""Open the link panel showing links from the current message."""
content_container = self.query_one(ContentContainer)
links = content_container.get_links()
self.push_screen(LinkPanel(links))
def action_scroll_down(self) -> None:
"""Scroll the main content down."""
self.query_one("#main_content").scroll_down()
@@ -644,15 +584,31 @@ class EmailViewerApp(App):
message_id = int(current_item_data["id"])
envelopes_list = self.query_one("#envelopes_list", ListView)
current_list_item = envelopes_list.children[self.highlighted_message_index]
checkbox_label = current_list_item.query_one(".checkbox", Label)
# Toggle selection state
if message_id in self.selected_messages:
self.selected_messages.remove(message_id)
checkbox_label.remove_class("x-list")
checkbox_label.update("\ue640")
is_selected = False
else:
self.selected_messages.add(message_id)
checkbox_label.add_class("x-list")
checkbox_label.update("\uf4a7")
is_selected = True
# Update the EnvelopeListItem widget
try:
envelope_widget = current_list_item.query_one(EnvelopeListItem)
envelope_widget.set_selected(is_selected)
except Exception:
# Fallback for old-style widgets
try:
checkbox_label = current_list_item.query_one(".checkbox", Label)
if is_selected:
checkbox_label.add_class("x-list")
checkbox_label.update("\uf4a7")
else:
checkbox_label.remove_class("x-list")
checkbox_label.update("\ue640")
except Exception:
pass
self._update_list_view_subtitle()

170
src/maildir_gtd/config.py Normal file
View File

@@ -0,0 +1,170 @@
"""Configuration system for MaildirGTD email reader using Pydantic."""
import logging
import os
from pathlib import Path
from typing import Literal, Optional
import toml
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
class TaskBackendConfig(BaseModel):
"""Configuration for task management backend."""
backend: Literal["taskwarrior", "dstask"] = "taskwarrior"
taskwarrior_path: str = "task"
dstask_path: str = Field(
default_factory=lambda: str(Path.home() / ".local" / "bin" / "dstask")
)
class EnvelopeDisplayConfig(BaseModel):
"""Configuration for envelope list item rendering."""
# Sender display
max_sender_length: int = 25
# Date/time display
date_format: str = "%m/%d"
time_format: str = "%H:%M"
show_date: bool = True
show_time: bool = True
# Grouping
group_by: Literal["relative", "absolute"] = "relative"
# relative: "Today", "Yesterday", "This Week", etc.
# absolute: "December 2025", "November 2025", etc.
# Layout
lines: Literal[2, 3] = 2
# 2: sender/date on line 1, subject on line 2
# 3: sender/date on line 1, subject on line 2, preview on line 3
show_checkbox: bool = True
show_preview: bool = False # Only used when lines=3
# NerdFont icons for status
icon_unread: str = "\uf0e0" # nf-fa-envelope (filled)
icon_read: str = "\uf2b6" # nf-fa-envelope_open (open)
icon_flagged: str = "\uf024" # nf-fa-flag
icon_attachment: str = "\uf0c6" # nf-fa-paperclip
class KeybindingsConfig(BaseModel):
"""Keybinding customization."""
next_message: str = "j"
prev_message: str = "k"
delete: str = "#"
archive: str = "e"
open_by_id: str = "o"
quit: str = "q"
toggle_header: str = "h"
create_task: str = "t"
reload: str = "%"
toggle_sort: str = "s"
toggle_selection: str = "x"
clear_selection: str = "escape"
scroll_page_down: str = "space"
scroll_page_up: str = "b"
toggle_main_content: str = "w"
open_links: str = "l"
toggle_view_mode: str = "m"
class ContentDisplayConfig(BaseModel):
"""Configuration for message content display."""
# View mode: "markdown" for pretty rendering, "html" for raw/plain display
default_view_mode: Literal["markdown", "html"] = "markdown"
class LinkPanelConfig(BaseModel):
"""Configuration for the link panel."""
# Whether to close the panel after opening a link
close_on_open: bool = False
class ThemeConfig(BaseModel):
"""Theme/appearance settings."""
theme_name: str = "monokai"
class MaildirGTDConfig(BaseModel):
"""Main configuration for MaildirGTD email reader."""
task: TaskBackendConfig = Field(default_factory=TaskBackendConfig)
envelope_display: EnvelopeDisplayConfig = Field(
default_factory=EnvelopeDisplayConfig
)
content_display: ContentDisplayConfig = Field(default_factory=ContentDisplayConfig)
link_panel: LinkPanelConfig = Field(default_factory=LinkPanelConfig)
keybindings: KeybindingsConfig = Field(default_factory=KeybindingsConfig)
theme: ThemeConfig = Field(default_factory=ThemeConfig)
@classmethod
def get_config_path(cls) -> Path:
"""Get the path to the config file."""
# Check environment variable first
env_path = os.getenv("MAILDIR_GTD_CONFIG")
if env_path:
return Path(env_path)
# Default to ~/.config/luk/maildir_gtd.toml
return Path.home() / ".config" / "luk" / "maildir_gtd.toml"
@classmethod
def load(cls, config_path: Optional[Path] = None) -> "MaildirGTDConfig":
"""Load config from TOML file with defaults for missing values."""
if config_path is None:
config_path = cls.get_config_path()
if config_path.exists():
try:
with open(config_path, "r") as f:
data = toml.load(f)
logger.info(f"Loaded config from {config_path}")
return cls.model_validate(data)
except Exception as e:
logger.warning(f"Error loading config from {config_path}: {e}")
logger.warning("Using default configuration")
return cls()
else:
logger.info(f"No config file at {config_path}, using defaults")
return cls()
def save(self, config_path: Optional[Path] = None) -> None:
"""Save current config to TOML file."""
if config_path is None:
config_path = self.get_config_path()
# Ensure parent directory exists
config_path.parent.mkdir(parents=True, exist_ok=True)
with open(config_path, "w") as f:
toml.dump(self.model_dump(), f)
logger.info(f"Saved config to {config_path}")
# Global config instance (lazy-loaded)
_config: Optional[MaildirGTDConfig] = None
def get_config() -> MaildirGTDConfig:
"""Get the global config instance, loading it if necessary."""
global _config
if _config is None:
_config = MaildirGTDConfig.load()
return _config
def reload_config() -> MaildirGTDConfig:
"""Force reload of the config from disk."""
global _config
_config = MaildirGTDConfig.load()
return _config

View File

@@ -70,6 +70,106 @@ Markdown {
padding: 1 2;
}
/* =====================================================
NEW EnvelopeListItem and GroupHeader styles
===================================================== */
/* EnvelopeListItem - the main envelope display widget */
EnvelopeListItem {
height: auto;
width: 1fr;
padding: 0;
}
EnvelopeListItem .envelope-content {
height: auto;
width: 1fr;
}
EnvelopeListItem .envelope-row-1 {
height: 1;
width: 1fr;
}
EnvelopeListItem .envelope-row-2 {
height: 1;
width: 1fr;
}
EnvelopeListItem .envelope-row-3 {
height: 1;
width: 1fr;
}
EnvelopeListItem .status-icon {
width: 3;
padding: 0 1 0 0;
color: $text-muted;
}
EnvelopeListItem .status-icon.unread {
color: $accent;
}
EnvelopeListItem .checkbox {
width: 2;
padding: 0 1 0 0;
}
EnvelopeListItem .sender-name {
width: 1fr;
}
EnvelopeListItem .message-datetime {
width: auto;
padding: 0 1;
color: $secondary;
}
EnvelopeListItem .email-subject {
width: 1fr;
padding: 0 4;
}
EnvelopeListItem .email-preview {
width: 1fr;
padding: 0 4;
color: $text-muted;
}
/* Unread message styling */
EnvelopeListItem.unread .sender-name {
text-style: bold;
}
EnvelopeListItem.unread .email-subject {
text-style: bold;
}
/* Selected message styling */
EnvelopeListItem.selected {
tint: $accent 20%;
}
/* GroupHeader - date group separator */
GroupHeader {
height: 1;
width: 1fr;
background: rgb(64, 62, 65);
}
GroupHeader .group-header-label {
color: rgb(160, 160, 160);
text-style: bold;
padding: 0 1;
width: 1fr;
}
/* =====================================================
END NEW styles
===================================================== */
/* Legacy styles (keeping for backward compatibility) */
.email_subject {
width: 1fr;
padding: 0 2;
@@ -141,7 +241,7 @@ Markdown {
tint: $accent 20%;
}
#open_message_container, #create_task_container {
#open_message_container {
border: panel $border;
dock: right;
width: 25%;

View File

@@ -2,13 +2,85 @@ import logging
from textual.screen import ModalScreen
from textual.widgets import Input, Label, Button, ListView, ListItem
from textual.containers import Vertical, Horizontal, Container
from textual.binding import Binding
from textual import on, work
from src.services.taskwarrior import client as taskwarrior_client
from src.services.task_client import create_task, get_backend_info
class CreateTaskScreen(ModalScreen):
"""Screen for creating a new task."""
BINDINGS = [
Binding("escape", "cancel", "Close"),
Binding("ctrl+s", "submit", "Create Task"),
]
DEFAULT_CSS = """
CreateTaskScreen {
align: right middle;
}
CreateTaskScreen #create_task_container {
dock: right;
width: 40%;
min-width: 50;
max-width: 80;
height: 100%;
background: $surface;
border: round $primary;
padding: 1 2;
}
CreateTaskScreen #create_task_container:focus-within {
border: round $accent;
}
CreateTaskScreen #create_task_form {
height: auto;
width: 1fr;
}
CreateTaskScreen .form-field {
height: auto;
width: 1fr;
margin-bottom: 1;
}
CreateTaskScreen .form-field Label {
height: 1;
width: 1fr;
color: $text-muted;
margin-bottom: 0;
}
CreateTaskScreen .form-field Input {
width: 1fr;
}
CreateTaskScreen .form-field Input:focus {
border: tall $accent;
}
CreateTaskScreen .button-row {
height: auto;
width: 1fr;
align: center middle;
margin-top: 1;
}
CreateTaskScreen .button-row Button {
margin: 0 1;
}
CreateTaskScreen .form-hint {
height: 1;
width: 1fr;
color: $text-muted;
text-align: center;
margin-top: 1;
}
"""
def __init__(self, subject="", from_addr="", **kwargs):
super().__init__(**kwargs)
self.subject = subject
@@ -16,62 +88,84 @@ class CreateTaskScreen(ModalScreen):
self.selected_project = None
def compose(self):
yield Container(
Vertical(
Horizontal(
Label("Subject:"),
Input(
placeholder="Task subject",
with Container(id="create_task_container"):
with Vertical(id="create_task_form"):
# Subject field
with Vertical(classes="form-field"):
yield Label("Subject")
yield Input(
placeholder="Task description",
value=self.subject,
id="subject_input",
),
),
Horizontal(
Label("Project:"),
Input(placeholder="Project name", id="project_input"),
),
Horizontal(
Label("Tags:"),
Input(placeholder="Comma-separated tags", id="tags_input"),
),
Horizontal(
Label("Due:"),
Input(
placeholder="Due date (e.g., today, tomorrow, fri)",
)
# Project field
with Vertical(classes="form-field"):
yield Label("Project")
yield Input(placeholder="e.g., work, home", id="project_input")
# Tags field
with Vertical(classes="form-field"):
yield Label("Tags")
yield Input(placeholder="tag1, tag2, ...", id="tags_input")
# Due date field
with Vertical(classes="form-field"):
yield Label("Due")
yield Input(
placeholder="today, tomorrow, fri, 2024-01-15",
id="due_input",
),
),
Horizontal(
Label("Priority:"),
Input(placeholder="Priority (H, M, L)", id="priority_input"),
),
Horizontal(
Button("Create", id="create_btn", variant="primary"),
Button("Cancel", id="cancel_btn", variant="error"),
),
id="create_task_form",
),
id="create_task_container",
)
)
# Priority field
with Vertical(classes="form-field"):
yield Label("Priority")
yield Input(placeholder="H, M, or L", id="priority_input")
# Buttons
with Horizontal(classes="button-row"):
yield Button("Create", id="create_btn", variant="primary")
yield Button("Cancel", id="cancel_btn", variant="error")
yield Label("ctrl+s: create, esc: cancel", classes="form-hint")
def on_mount(self):
self.query_one("#create_task_container",
Container).border_title = "New Task (taskwarrior)"
self.styles.align = ("center", "middle")
backend_name, _ = get_backend_info()
container = self.query_one("#create_task_container", Container)
container.border_title = "\uf0ae New Task" # nf-fa-tasks
container.border_subtitle = backend_name
# Focus the subject input
self.query_one("#subject_input", Input).focus()
def action_cancel(self):
"""Close the screen."""
self.dismiss()
def action_submit(self):
"""Submit the form."""
self._create_task()
@on(Input.Submitted)
def on_input_submitted(self, event: Input.Submitted):
"""Handle Enter key in any input field."""
self._create_task()
@on(Button.Pressed, "#create_btn")
def on_create_pressed(self):
"""Create the task when the Create button is pressed."""
self._create_task()
def _create_task(self):
"""Gather form data and create the task."""
# Get input values
subject = self.query_one("#subject_input").value
project = self.query_one("#project_input").value
tags_input = self.query_one("#tags_input").value
due = self.query_one("#due_input").value
priority = self.query_one("#priority_input").value
subject = self.query_one("#subject_input", Input).value
project = self.query_one("#project_input", Input).value
tags_input = self.query_one("#tags_input", Input).value
due = self.query_one("#due_input", Input).value
priority = self.query_one("#priority_input", Input).value
# Process tags (split by commas and trim whitespace)
tags = [tag.strip()
for tag in tags_input.split(",")] if tags_input else []
tags = [tag.strip() for tag in tags_input.split(",")] if tags_input else []
# Add a tag for the sender, if provided
if self.from_addr and "@" in self.from_addr:
@@ -91,18 +185,20 @@ class CreateTaskScreen(ModalScreen):
async def create_task_worker(
self, subject, tags=None, project=None, due=None, priority=None
):
"""Worker to create a task using the Taskwarrior API client."""
"""Worker to create a task using the configured backend."""
if not subject:
self.app.show_status("Task subject cannot be empty.", "error")
return
# Validate priority
if priority and priority not in ["H", "M", "L"]:
if priority and priority.upper() not in ["H", "M", "L"]:
self.app.show_status("Priority must be H, M, or L.", "warning")
priority = None
elif priority:
priority = priority.upper()
# Create the task
success, result = await taskwarrior_client.create_task(
# Create the task using the unified client
success, result = await create_task(
task_description=subject,
tags=tags or [],
project=project,

View File

@@ -0,0 +1,501 @@
"""Link panel for viewing and opening URLs from email messages."""
import re
import webbrowser
from dataclasses import dataclass, field
from typing import List, Optional
from urllib.parse import urlparse
from textual import on
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Container, Vertical
from textual.screen import ModalScreen
from textual.widgets import Label, ListView, ListItem, Static
from src.maildir_gtd.config import get_config
@dataclass
class LinkItem:
"""Represents a link extracted from an email."""
url: str
label: str # Derived from anchor text or URL
domain: str # Extracted for display
short_display: str # Truncated/friendly display
context: str = "" # Surrounding text for context
mnemonic: str = "" # Quick-select key hint
@classmethod
def from_url(
cls,
url: str,
anchor_text: str = "",
context: str = "",
max_display_len: int = 60,
) -> "LinkItem":
"""Create a LinkItem from a URL and optional anchor text."""
parsed = urlparse(url)
domain = parsed.netloc.replace("www.", "")
# Use anchor text as label if available, otherwise derive from URL
if anchor_text and anchor_text.strip():
label = anchor_text.strip()
else:
# Try to derive a meaningful label from the URL path
label = cls._derive_label_from_url(parsed)
# Create short display version
short_display = cls._shorten_url(url, domain, parsed.path, max_display_len)
return cls(
url=url,
label=label,
domain=domain,
short_display=short_display,
context=context[:80] if context else "",
)
@staticmethod
def _derive_label_from_url(parsed) -> str:
"""Derive a human-readable label from URL components."""
path = parsed.path.strip("/")
if not path:
return parsed.netloc
# Split path and take last meaningful segment
segments = [s for s in path.split("/") if s]
if segments:
last = segments[-1]
# Remove file extensions
last = re.sub(r"\.[a-zA-Z0-9]+$", "", last)
# Replace common separators with spaces
last = re.sub(r"[-_]", " ", last)
# Capitalize words
return last.title()[:40]
return parsed.netloc
@staticmethod
def _shorten_url(url: str, domain: str, path: str, max_len: int) -> str:
"""Create a shortened, readable version of the URL.
Intelligently shortens URLs by:
- Special handling for known sites (GitHub, Google Docs, Jira, GitLab)
- Keeping first and last path segments, eliding middle only if needed
- Adapting to available width
"""
# Special handling for common sites
path = path.strip("/")
# GitHub: user/repo/issues/123 -> user/repo #123
if "github.com" in domain:
match = re.match(r"([^/]+/[^/]+)/(issues|pull)/(\d+)", path)
if match:
repo, type_, num = match.groups()
icon = "#" if type_ == "issues" else "PR#"
return f"{domain} > {repo} {icon}{num}"
match = re.match(r"([^/]+/[^/]+)", path)
if match:
return f"{domain} > {match.group(1)}"
# Google Docs
if "docs.google.com" in domain:
if "/document/" in path:
return f"{domain} > Document"
if "/spreadsheets/" in path:
return f"{domain} > Spreadsheet"
if "/presentation/" in path:
return f"{domain} > Slides"
# Jira/Atlassian
if "atlassian.net" in domain or "jira" in domain.lower():
match = re.search(r"([A-Z]+-\d+)", path)
if match:
return f"{domain} > {match.group(1)}"
# GitLab
if "gitlab" in domain.lower():
match = re.match(r"([^/]+/[^/]+)/-/(issues|merge_requests)/(\d+)", path)
if match:
repo, type_, num = match.groups()
icon = "#" if type_ == "issues" else "MR!"
return f"{domain} > {repo} {icon}{num}"
# Generic shortening - keep URL readable
if len(url) <= max_len:
return url
# Build shortened path, keeping as many segments as fit
path_parts = [p for p in path.split("/") if p]
if not path_parts:
return domain
# Try to fit the full path first
full_path = "/".join(path_parts)
result = f"{domain} > {full_path}"
if len(result) <= max_len:
return result
# Keep first segment + last two segments if possible
if len(path_parts) >= 3:
short_path = f"{path_parts[0]}/.../{path_parts[-2]}/{path_parts[-1]}"
result = f"{domain} > {short_path}"
if len(result) <= max_len:
return result
# Keep first + last segment
if len(path_parts) >= 2:
short_path = f"{path_parts[0]}/.../{path_parts[-1]}"
result = f"{domain} > {short_path}"
if len(result) <= max_len:
return result
# Just last segment
result = f"{domain} > .../{path_parts[-1]}"
if len(result) <= max_len:
return result
# Truncate with ellipsis as last resort
result = f"{domain} > {path_parts[-1]}"
if len(result) > max_len:
result = result[: max_len - 3] + "..."
return result
def extract_links_from_content(content: str) -> List[LinkItem]:
"""Extract all links from HTML or markdown content."""
links: List[LinkItem] = []
seen_urls: set = set()
# Pattern for HTML links: <a href="...">text</a>
html_pattern = r'<a\s+[^>]*href=["\']([^"\']+)["\'][^>]*>([^<]*)</a>'
for match in re.finditer(html_pattern, content, re.IGNORECASE):
url, anchor_text = match.groups()
if url and url not in seen_urls and _is_valid_url(url):
# Get surrounding context
start = max(0, match.start() - 40)
end = min(len(content), match.end() + 40)
context = _clean_context(content[start:end])
links.append(LinkItem.from_url(url, anchor_text, context))
seen_urls.add(url)
# Pattern for markdown links: [text](url)
md_pattern = r"\[([^\]]+)\]\(([^)]+)\)"
for match in re.finditer(md_pattern, content):
anchor_text, url = match.groups()
if url and url not in seen_urls and _is_valid_url(url):
start = max(0, match.start() - 40)
end = min(len(content), match.end() + 40)
context = _clean_context(content[start:end])
links.append(LinkItem.from_url(url, anchor_text, context))
seen_urls.add(url)
# Pattern for bare URLs
url_pattern = r'https?://[^\s<>"\'\)]+[^\s<>"\'\.\,\)\]]'
for match in re.finditer(url_pattern, content):
url = match.group(0)
if url not in seen_urls and _is_valid_url(url):
start = max(0, match.start() - 40)
end = min(len(content), match.end() + 40)
context = _clean_context(content[start:end])
links.append(LinkItem.from_url(url, "", context))
seen_urls.add(url)
# Assign mnemonic hints
_assign_mnemonics(links)
return links
def _is_valid_url(url: str) -> bool:
"""Check if a URL is valid and worth displaying."""
if not url:
return False
# Skip mailto, tel, javascript, etc.
if re.match(r"^(mailto|tel|javascript|data|#):", url, re.IGNORECASE):
return False
# Skip very short URLs or fragments
if len(url) < 10:
return False
# Must start with http/https
if not url.startswith(("http://", "https://")):
return False
return True
def _clean_context(context: str) -> str:
"""Clean up context string for display."""
# Remove HTML tags
context = re.sub(r"<[^>]+>", "", context)
# Normalize whitespace
context = " ".join(context.split())
return context.strip()
def _assign_mnemonics(links: List[LinkItem]) -> None:
"""Assign unique mnemonic key hints to links."""
used_mnemonics: set = set()
# Characters to use for mnemonics (easily typeable)
# Exclude keys used by app/panel bindings: h,j,k,l (navigation), q (quit),
# b (page up), e (archive), o (open), s (sort), t (task), w (toggle), x (select)
reserved_keys = set("hjklqbeostwx")
available_chars = "".join(
c for c in "asdfgqwertyuiopzxcvbnm" if c not in reserved_keys
)
for link in links:
mnemonic = None
# Try first letter of label (prioritize link text over domain)
if link.label:
first = link.label[0].lower()
if first in available_chars and first not in used_mnemonics:
mnemonic = first
used_mnemonics.add(first)
# Try first letter of domain as fallback
if not mnemonic and link.domain:
first = link.domain[0].lower()
if first in available_chars and first not in used_mnemonics:
mnemonic = first
used_mnemonics.add(first)
# Try other letters from label
if not mnemonic and link.label:
for char in link.label.lower():
if char in available_chars and char not in used_mnemonics:
mnemonic = char
used_mnemonics.add(char)
break
# Try first two letters combined logic
if not mnemonic:
# Try label word initials
candidates = []
if link.label:
words = link.label.split()
if len(words) >= 2:
candidates.append((words[0][0] + words[1][0]).lower())
if link.domain and len(link.domain) > 1:
candidates.append(link.domain[:2].lower())
for candidate in candidates:
if len(candidate) == 2 and candidate not in used_mnemonics:
# Check both chars are available
if all(c in available_chars for c in candidate):
mnemonic = candidate
used_mnemonics.add(candidate)
break
# Fallback: find any unused character
if not mnemonic:
for char in available_chars:
if char not in used_mnemonics:
mnemonic = char
used_mnemonics.add(char)
break
link.mnemonic = mnemonic or ""
class LinkListItem(Static):
"""Widget for displaying a single link in the list."""
DEFAULT_CSS = """
LinkListItem {
height: auto;
width: 1fr;
padding: 0 1;
}
"""
def __init__(self, link: LinkItem, index: int, **kwargs):
super().__init__(**kwargs)
self.link = link
self.index = index
def render(self) -> str:
"""Render the link item using Rich markup."""
mnemonic = self.link.mnemonic if self.link.mnemonic else "?"
# Line 1: [mnemonic] domain - label
line1 = (
f"[bold cyan]\\[{mnemonic}][/] [dim]{self.link.domain}[/] {self.link.label}"
)
# Line 2: shortened URL (indented)
line2 = f" [dim italic]{self.link.short_display}[/]"
return f"{line1}\n{line2}"
class LinkPanel(ModalScreen):
"""Side panel for viewing and opening links from the current message."""
BINDINGS = [
Binding("escape", "dismiss", "Close"),
Binding("enter", "open_selected", "Open Link"),
Binding("j", "next_link", "Next"),
Binding("k", "prev_link", "Previous"),
]
DEFAULT_CSS = """
LinkPanel {
align: right middle;
}
LinkPanel #link-panel-container {
dock: right;
width: 50%;
min-width: 60;
max-width: 100;
height: 100%;
background: $surface;
border: round $primary;
padding: 1;
}
LinkPanel #link-panel-container:focus-within {
border: round $accent;
}
LinkPanel .link-panel-title {
text-style: bold;
padding: 0 0 1 0;
color: $text;
}
LinkPanel .link-panel-hint {
color: $text-muted;
padding: 0 0 1 0;
}
LinkPanel #link-list {
height: 1fr;
scrollbar-size: 1 1;
}
LinkPanel #link-list > ListItem {
height: auto;
padding: 0;
}
LinkPanel #link-list > ListItem:hover {
background: $boost;
}
LinkPanel #link-list > ListItem.-highlight {
background: $accent 30%;
}
LinkPanel .no-links-label {
color: $text-muted;
padding: 2;
text-align: center;
}
"""
def __init__(self, links: List[LinkItem], **kwargs):
super().__init__(**kwargs)
self.links = links
self._mnemonic_map: dict[str, LinkItem] = {
link.mnemonic: link for link in links if link.mnemonic
}
def compose(self) -> ComposeResult:
with Container(id="link-panel-container"):
yield Label("\uf0c1 Links", classes="link-panel-title") # nf-fa-link
yield Label(
"j/k: navigate, enter: open, esc: close",
classes="link-panel-hint",
)
if self.links:
with ListView(id="link-list"):
for i, link in enumerate(self.links):
yield ListItem(LinkListItem(link, i))
else:
yield Label("No links found in this message.", classes="no-links-label")
def on_mount(self) -> None:
self.query_one("#link-panel-container").border_title = "Links"
self.query_one(
"#link-panel-container"
).border_subtitle = f"{len(self.links)} found"
if self.links:
self.query_one("#link-list").focus()
def on_key(self, event) -> None:
"""Handle mnemonic key presses."""
key = event.key.lower()
# Check for single-char mnemonic
if key in self._mnemonic_map:
self._open_link(self._mnemonic_map[key])
event.prevent_default()
return
# Check for two-char mnemonics (accumulate?)
# For simplicity, we'll just support single-char for now
# A more sophisticated approach would use a timeout buffer
def action_open_selected(self) -> None:
"""Open the currently selected link."""
if not self.links:
return
try:
link_list = self.query_one("#link-list", ListView)
if link_list.index is not None and 0 <= link_list.index < len(self.links):
self._open_link(self.links[link_list.index])
except Exception:
pass
def action_next_link(self) -> None:
"""Move to next link."""
try:
link_list = self.query_one("#link-list", ListView)
if link_list.index is not None and link_list.index < len(self.links) - 1:
link_list.index += 1
except Exception:
pass
def action_prev_link(self) -> None:
"""Move to previous link."""
try:
link_list = self.query_one("#link-list", ListView)
if link_list.index is not None and link_list.index > 0:
link_list.index -= 1
except Exception:
pass
def _open_link(self, link: LinkItem) -> None:
"""Open a link in the default browser."""
try:
webbrowser.open(link.url)
self.app.notify(f"Opened: {link.short_display}", title="Link Opened")
# Only dismiss if configured to close on open
config = get_config()
if config.link_panel.close_on_open:
self.dismiss()
except Exception as e:
self.app.notify(f"Failed to open link: {e}", severity="error")
@on(ListView.Selected)
def on_list_selected(self, event: ListView.Selected) -> None:
"""Handle list item selection (Enter key or click)."""
if event.list_view.index is not None and 0 <= event.list_view.index < len(
self.links
):
self._open_link(self.links[event.list_view.index])

View File

@@ -2,5 +2,13 @@
from .CreateTask import CreateTaskScreen
from .OpenMessage import OpenMessageScreen
from .DocumentViewer import DocumentViewerScreen
from .LinkPanel import LinkPanel, LinkItem, extract_links_from_content
__all__ = ["CreateTaskScreen", "OpenMessageScreen", "DocumentViewerScreen"]
__all__ = [
"CreateTaskScreen",
"OpenMessageScreen",
"DocumentViewerScreen",
"LinkPanel",
"LinkItem",
"extract_links_from_content",
]

View File

@@ -3,9 +3,13 @@ from textual import work
from textual.binding import Binding
from textual.containers import Vertical, ScrollableContainer
from textual.widgets import Static, Markdown, Label
from textual.reactive import reactive
from src.services.himalaya import client as himalaya_client
from src.maildir_gtd.config import get_config
from src.maildir_gtd.screens.LinkPanel import extract_links_from_content, LinkItem
import logging
from datetime import datetime
from typing import Literal, List
import re
import os
import sys
@@ -57,9 +61,15 @@ class EnvelopeHeader(Vertical):
class ContentContainer(ScrollableContainer):
"""Container for displaying email content with toggleable view modes."""
can_focus = True
# Reactive to track view mode and update UI
current_mode: reactive[Literal["markdown", "html"]] = reactive("markdown")
BINDINGS = [
Binding("m", "toggle_mode", "Toggle View Mode")
Binding("m", "toggle_mode", "Toggle View Mode"),
]
def __init__(self, **kwargs):
@@ -68,34 +78,53 @@ class ContentContainer(ScrollableContainer):
self.header = EnvelopeHeader(id="envelope_header")
self.content = Markdown("", id="markdown_content")
self.html_content = Static("", id="html_content", markup=False)
self.current_mode = "html" # Default to text mode
self.current_content = None
self.current_message_id = None
self.content_worker = None
# Load default view mode from config
config = get_config()
self.current_mode = config.content_display.default_view_mode
def compose(self):
yield self.content
yield self.html_content
def on_mount(self):
# Hide markdown content initially
# self.action_notify("loading message...")
self.content.styles.display = "none"
self.html_content.styles.display = "block"
# Set initial display based on config default
self._apply_view_mode()
self._update_mode_indicator()
async def action_toggle_mode(self):
"""Toggle between plaintext and HTML viewing modes."""
if self.current_mode == "html":
self.current_mode = "text"
def watch_current_mode(self, old_mode: str, new_mode: str) -> None:
"""React to mode changes."""
self._apply_view_mode()
self._update_mode_indicator()
def _apply_view_mode(self) -> None:
"""Apply the current view mode to widget visibility."""
if self.current_mode == "markdown":
self.html_content.styles.display = "none"
self.content.styles.display = "block"
else:
self.current_mode = "html"
self.content.styles.display = "none"
self.html_content.styles.display = "block"
# self.action_notify(f"switched to mode {self.current_mode}")
def _update_mode_indicator(self) -> None:
"""Update the border subtitle to show current mode."""
mode_label = "Markdown" if self.current_mode == "markdown" else "HTML/Text"
mode_icon = (
"\ue73e" if self.current_mode == "markdown" else "\uf121"
) # nf-md-language_markdown / nf-fa-code
self.border_subtitle = f"{mode_icon} {mode_label}"
async def action_toggle_mode(self):
"""Toggle between markdown and HTML viewing modes."""
if self.current_mode == "html":
self.current_mode = "markdown"
else:
self.current_mode = "html"
# Reload the content if we have a message ID
self.border_sibtitle = self.current_mode;
if self.current_message_id:
self.display_content(self.current_message_id)
@@ -113,19 +142,17 @@ class ContentContainer(ScrollableContainer):
if success:
self._update_content(content)
else:
self.notify(
f"Failed to fetch content for message ID {message_id}.")
self.notify(f"Failed to fetch content for message ID {message_id}.")
def display_content(self, message_id: int) -> None:
"""Display the content of a message."""
# self.action_notify(f"recieved message_id to display {message_id}")
if not message_id:
return
self.current_message_id = message_id
# Immediately show a loading message
if self.current_mode == "text":
if self.current_mode == "markdown":
self.content.update("Loading...")
else:
self.html_content.update("Loading...")
@@ -135,15 +162,20 @@ class ContentContainer(ScrollableContainer):
self.content_worker.cancel()
# Fetch content in the current mode
format_type = "text" if self.current_mode == "text" else "html"
self.content_worker = self.fetch_message_content(
message_id, format_type)
format_type = "text" if self.current_mode == "markdown" else "html"
self.content_worker = self.fetch_message_content(message_id, format_type)
def _update_content(self, content: str | None) -> None:
"""Update the content widgets with the fetched content."""
if content is None:
content = "(No content)"
# Store the raw content for link extraction
self.current_content = content
try:
if self.current_mode == "text":
# For text mode, use the Markdown widget
if self.current_mode == "markdown":
# For markdown mode, use the Markdown widget
self.content.update(content)
else:
# For HTML mode, use the Static widget with markup
@@ -169,7 +201,13 @@ class ContentContainer(ScrollableContainer):
except Exception as e:
logging.error(f"Error updating content: {e}")
if self.current_mode == "text":
if self.current_mode == "markdown":
self.content.update(f"Error displaying content: {e}")
else:
self.html_content.update(f"Error displaying content: {e}")
def get_links(self) -> List[LinkItem]:
"""Extract and return links from the current message content."""
if not self.current_content:
return []
return extract_links_from_content(self.current_content)

View File

@@ -0,0 +1,248 @@
"""Custom widget for rendering envelope list items with configurable display."""
from datetime import datetime
from typing import Any, Dict, Optional
from textual.app import ComposeResult
from textual.containers import Horizontal, Vertical
from textual.widgets import Label, Static
from src.maildir_gtd.config import EnvelopeDisplayConfig, get_config
class EnvelopeListItem(Static):
"""A widget for rendering a single envelope in the list.
Supports configurable layout:
- 2-line mode: sender/date on line 1, subject on line 2
- 3-line mode: adds a preview line
Displays read/unread status with NerdFont icons.
"""
DEFAULT_CSS = """
EnvelopeListItem {
height: auto;
width: 1fr;
padding: 0;
}
EnvelopeListItem .envelope-row-1 {
height: 1;
width: 1fr;
}
EnvelopeListItem .envelope-row-2 {
height: 1;
width: 1fr;
}
EnvelopeListItem .envelope-row-3 {
height: 1;
width: 1fr;
}
EnvelopeListItem .status-icon {
width: 2;
padding: 0 1 0 0;
}
EnvelopeListItem .checkbox {
width: 2;
padding: 0 1 0 0;
}
EnvelopeListItem .sender-name {
width: 1fr;
}
EnvelopeListItem .message-datetime {
width: auto;
padding: 0 1;
color: $text-muted;
}
EnvelopeListItem .email-subject {
width: 1fr;
padding: 0 3;
text-style: bold;
}
EnvelopeListItem .email-preview {
width: 1fr;
padding: 0 3;
color: $text-muted;
}
EnvelopeListItem.unread .sender-name {
text-style: bold;
}
EnvelopeListItem.unread .email-subject {
text-style: bold;
}
"""
def __init__(
self,
envelope: Dict[str, Any],
config: Optional[EnvelopeDisplayConfig] = None,
is_selected: bool = False,
**kwargs,
):
"""Initialize the envelope list item.
Args:
envelope: The envelope data dictionary from himalaya
config: Display configuration (uses global config if not provided)
is_selected: Whether this item is currently selected
"""
super().__init__(**kwargs)
self.envelope = envelope
self.config = config or get_config().envelope_display
self._is_selected = is_selected
# Parse envelope data
self._parse_envelope()
def _parse_envelope(self) -> None:
"""Parse envelope data into display-ready values."""
# Get sender info
from_data = self.envelope.get("from", {})
self.sender_name = from_data.get("name") or from_data.get("addr", "Unknown")
if not self.sender_name:
self.sender_name = from_data.get("addr", "Unknown")
# Truncate sender name if needed
max_len = self.config.max_sender_length
if len(self.sender_name) > max_len:
self.sender_name = self.sender_name[: max_len - 1] + "\u2026" # ellipsis
# Get subject
self.subject = str(self.envelope.get("subject", "")).strip() or "(No subject)"
# Parse date
self.formatted_datetime = self._format_datetime()
# Get read/unread status (himalaya uses "flags" field)
flags = self.envelope.get("flags", [])
self.is_read = "Seen" in flags if isinstance(flags, list) else False
self.is_flagged = "Flagged" in flags if isinstance(flags, list) else False
self.has_attachment = (
"Attachments" in flags if isinstance(flags, list) else False
)
# Message ID for selection tracking
self.message_id = int(self.envelope.get("id", 0))
def _format_datetime(self) -> str:
"""Format the message date/time according to config."""
date_str = self.envelope.get("date", "")
if not date_str:
return ""
try:
# Parse ISO format date
if "Z" in date_str:
date_str = date_str.replace("Z", "+00:00")
dt = datetime.fromisoformat(date_str)
parts = []
if self.config.show_date:
parts.append(dt.strftime(self.config.date_format))
if self.config.show_time:
parts.append(dt.strftime(self.config.time_format))
return " ".join(parts)
except (ValueError, TypeError):
return "Invalid Date"
def compose(self) -> ComposeResult:
"""Compose the widget layout."""
# Determine status icon
if self.is_read:
status_icon = self.config.icon_read
status_class = "status-icon read"
else:
status_icon = self.config.icon_unread
status_class = "status-icon unread"
# Add flagged/attachment indicators
extra_icons = ""
if self.is_flagged:
extra_icons += f" {self.config.icon_flagged}"
if self.has_attachment:
extra_icons += f" {self.config.icon_attachment}"
# Build the layout based on config.lines
with Vertical(classes="envelope-content"):
# Row 1: Status icon, checkbox, sender, datetime
with Horizontal(classes="envelope-row-1"):
yield Label(status_icon + extra_icons, classes=status_class)
if self.config.show_checkbox:
checkbox_char = "\uf4a7" if self._is_selected else "\ue640"
yield Label(checkbox_char, classes="checkbox")
yield Label(self.sender_name, classes="sender-name", markup=False)
yield Label(self.formatted_datetime, classes="message-datetime")
# Row 2: Subject
with Horizontal(classes="envelope-row-2"):
yield Label(self.subject, classes="email-subject", markup=False)
# Row 3: Preview (only in 3-line mode with preview enabled)
if self.config.lines == 3 and self.config.show_preview:
preview = self.envelope.get("preview", "")[:60]
if preview:
with Horizontal(classes="envelope-row-3"):
yield Label(preview, classes="email-preview", markup=False)
def on_mount(self) -> None:
"""Set up classes on mount."""
if not self.is_read:
self.add_class("unread")
if self._is_selected:
self.add_class("selected")
def set_selected(self, selected: bool) -> None:
"""Update the selection state."""
self._is_selected = selected
if selected:
self.add_class("selected")
else:
self.remove_class("selected")
# Update checkbox display
if self.config.show_checkbox:
try:
checkbox = self.query_one(".checkbox", Label)
checkbox.update("\uf4a7" if selected else "\ue640")
except Exception:
pass # Widget may not be mounted yet
class GroupHeader(Static):
"""A header widget for grouping envelopes by date."""
DEFAULT_CSS = """
GroupHeader {
height: 1;
width: 1fr;
background: $surface;
color: $text-muted;
text-style: bold;
padding: 0 1;
}
"""
def __init__(self, label: str, **kwargs):
"""Initialize the group header.
Args:
label: The header label (e.g., "Today", "December 2025")
"""
super().__init__(**kwargs)
self.label = label
def compose(self) -> ComposeResult:
"""Compose just returns itself as a Static."""
yield Label(self.label, classes="group-header-label")

View File

@@ -1 +1,6 @@
# Initialize the screens subpackage
# Initialize the widgets subpackage
from .ContentContainer import ContentContainer
from .EnvelopeHeader import EnvelopeHeader
from .EnvelopeListItem import EnvelopeListItem, GroupHeader
__all__ = ["ContentContainer", "EnvelopeHeader", "EnvelopeListItem", "GroupHeader"]

305
src/services/task_client.py Normal file
View File

@@ -0,0 +1,305 @@
"""Unified task client that supports multiple backends (taskwarrior, dstask)."""
import asyncio
import json
import logging
import shlex
from typing import Tuple, List, Dict, Any, Optional
from src.maildir_gtd.config import get_config
logger = logging.getLogger(__name__)
def get_backend_info() -> Tuple[str, str]:
"""Get the configured backend name and command path.
Returns:
Tuple of (backend_name, command_path)
"""
config = get_config()
backend = config.task.backend
if backend == "dstask":
return "dstask", config.task.dstask_path
else:
return "taskwarrior", config.task.taskwarrior_path
async def create_task(
task_description: str,
tags: List[str] = None,
project: str = None,
due: str = None,
priority: str = None,
) -> Tuple[bool, Optional[str]]:
"""
Create a new task using the configured backend.
Args:
task_description: Description of the task
tags: List of tags to apply to the task
project: Project to which the task belongs
due: Due date in the format the backend accepts
priority: Priority of the task (H, M, L)
Returns:
Tuple containing:
- Success status (True if operation was successful)
- Task ID/message or error message
"""
backend, cmd_path = get_backend_info()
try:
if backend == "dstask":
return await _create_task_dstask(
cmd_path, task_description, tags, project, due, priority
)
else:
return await _create_task_taskwarrior(
cmd_path, task_description, tags, project, due, priority
)
except Exception as e:
logger.error(f"Exception during task creation: {e}")
return False, str(e)
async def _create_task_taskwarrior(
cmd_path: str,
task_description: str,
tags: List[str] = None,
project: str = None,
due: str = None,
priority: str = None,
) -> Tuple[bool, Optional[str]]:
"""Create task using taskwarrior."""
cmd = [cmd_path, "add"]
# Add project if specified
if project:
cmd.append(f"project:{project}")
# Add tags if specified
if tags:
for tag in tags:
if tag:
cmd.append(f"+{tag}")
# Add due date if specified
if due:
cmd.append(f"due:{due}")
# Add priority if specified
if priority and priority in ["H", "M", "L"]:
cmd.append(f"priority:{priority}")
# Add task description
cmd.append(task_description)
# Use shlex.join for proper escaping
cmd_str = shlex.join(cmd)
logger.debug(f"Taskwarrior command: {cmd_str}")
process = await asyncio.create_subprocess_shell(
cmd_str,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
return True, stdout.decode().strip()
else:
error_msg = stderr.decode().strip()
logger.error(f"Error creating task: {error_msg}")
return False, error_msg
async def _create_task_dstask(
cmd_path: str,
task_description: str,
tags: List[str] = None,
project: str = None,
due: str = None,
priority: str = None,
) -> Tuple[bool, Optional[str]]:
"""Create task using dstask.
dstask syntax: dstask add [+tag...] [project:X] [priority:X] [due:X] description
"""
cmd = [cmd_path, "add"]
# Add tags if specified (dstask uses +tag syntax like taskwarrior)
if tags:
for tag in tags:
if tag:
cmd.append(f"+{tag}")
# Add project if specified
if project:
cmd.append(f"project:{project}")
# Add priority if specified (dstask uses P1, P2, P3 but also accepts priority:H/M/L)
if priority and priority in ["H", "M", "L"]:
# Map to dstask priority format
priority_map = {"H": "P1", "M": "P2", "L": "P3"}
cmd.append(priority_map[priority])
# Add due date if specified
if due:
cmd.append(f"due:{due}")
# Add task description (must be last for dstask)
cmd.append(task_description)
# Use shlex.join for proper escaping
cmd_str = shlex.join(cmd)
logger.debug(f"dstask command: {cmd_str}")
process = await asyncio.create_subprocess_shell(
cmd_str,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
return True, stdout.decode().strip()
else:
error_msg = stderr.decode().strip()
logger.error(f"Error creating dstask task: {error_msg}")
return False, error_msg
async def list_tasks(filter_str: str = "") -> Tuple[List[Dict[str, Any]], bool]:
"""
List tasks from the configured backend.
Args:
filter_str: Optional filter string
Returns:
Tuple containing:
- List of task dictionaries
- Success status (True if operation was successful)
"""
backend, cmd_path = get_backend_info()
try:
if backend == "dstask":
return await _list_tasks_dstask(cmd_path, filter_str)
else:
return await _list_tasks_taskwarrior(cmd_path, filter_str)
except Exception as e:
logger.error(f"Exception during task listing: {e}")
return [], False
async def _list_tasks_taskwarrior(
cmd_path: str, filter_str: str = ""
) -> Tuple[List[Dict[str, Any]], bool]:
"""List tasks using taskwarrior."""
cmd = f"{shlex.quote(cmd_path)} {filter_str} export"
process = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
tasks = json.loads(stdout.decode())
return tasks, True
else:
logger.error(f"Error listing tasks: {stderr.decode()}")
return [], False
async def _list_tasks_dstask(
cmd_path: str, filter_str: str = ""
) -> Tuple[List[Dict[str, Any]], bool]:
"""List tasks using dstask."""
# dstask uses 'dstask export' for JSON output
cmd = f"{shlex.quote(cmd_path)} {filter_str} export"
process = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
try:
tasks = json.loads(stdout.decode())
return tasks, True
except json.JSONDecodeError:
# dstask might output tasks differently
logger.warning("Could not parse dstask JSON output")
return [], False
else:
logger.error(f"Error listing dstask tasks: {stderr.decode()}")
return [], False
async def complete_task(task_id: str) -> bool:
"""
Mark a task as completed.
Args:
task_id: ID of the task to complete
Returns:
True if task was completed successfully, False otherwise
"""
backend, cmd_path = get_backend_info()
try:
if backend == "dstask":
cmd = f"{shlex.quote(cmd_path)} done {task_id}"
else:
cmd = f"echo 'yes' | {shlex.quote(cmd_path)} {task_id} done"
process = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
return process.returncode == 0
except Exception as e:
logger.error(f"Exception during task completion: {e}")
return False
async def delete_task(task_id: str) -> bool:
"""
Delete a task.
Args:
task_id: ID of the task to delete
Returns:
True if task was deleted successfully, False otherwise
"""
backend, cmd_path = get_backend_info()
try:
if backend == "dstask":
cmd = f"{shlex.quote(cmd_path)} remove {task_id}"
else:
cmd = f"echo 'yes' | {shlex.quote(cmd_path)} {task_id} delete"
process = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
return process.returncode == 0
except Exception as e:
logger.error(f"Exception during task deletion: {e}")
return False

29
uv.lock generated
View File

@@ -864,6 +864,8 @@ dependencies = [
{ name = "openai" },
{ name = "orjson" },
{ name = "pillow" },
{ name = "pydantic" },
{ name = "pydantic-settings" },
{ name = "python-dateutil" },
{ name = "python-docx" },
{ name = "requests" },
@@ -871,6 +873,7 @@ dependencies = [
{ name = "textual" },
{ name = "textual-image" },
{ name = "ticktick-py" },
{ name = "toml" },
]
[package.dev-dependencies]
@@ -899,6 +902,8 @@ requires-dist = [
{ name = "openai", specifier = ">=1.78.1" },
{ name = "orjson", specifier = ">=3.10.18" },
{ name = "pillow", specifier = ">=11.2.1" },
{ name = "pydantic", specifier = ">=2.0.0" },
{ name = "pydantic-settings", specifier = ">=2.0.0" },
{ name = "python-dateutil", specifier = ">=2.9.0.post0" },
{ name = "python-docx", specifier = ">=1.1.2" },
{ name = "requests", specifier = ">=2.31.0" },
@@ -906,6 +911,7 @@ requires-dist = [
{ name = "textual", specifier = ">=3.2.0" },
{ name = "textual-image", specifier = ">=0.8.2" },
{ name = "ticktick-py", specifier = ">=2.0.0" },
{ name = "toml", specifier = ">=0.10.0" },
]
[package.metadata.requires-dev]
@@ -1686,6 +1692,20 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/6f/9a/e73262f6c6656262b5fdd723ad90f518f579b7bc8622e43a942eec53c938/pydantic_core-2.33.2-cp313-cp313t-win_amd64.whl", hash = "sha256:c2fc0a768ef76c15ab9238afa6da7f69895bb5d1ee83aeea2e3509af4472d0b9", size = 1935777, upload-time = "2025-04-23T18:32:25.088Z" },
]
[[package]]
name = "pydantic-settings"
version = "2.12.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "pydantic" },
{ name = "python-dotenv" },
{ name = "typing-inspection" },
]
sdist = { url = "https://files.pythonhosted.org/packages/43/4b/ac7e0aae12027748076d72a8764ff1c9d82ca75a7a52622e67ed3f765c54/pydantic_settings-2.12.0.tar.gz", hash = "sha256:005538ef951e3c2a68e1c08b292b5f2e71490def8589d4221b95dab00dafcfd0", size = 194184, upload-time = "2025-11-10T14:25:47.013Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c1/60/5d4751ba3f4a40a6891f24eec885f51afd78d208498268c734e256fb13c4/pydantic_settings-2.12.0-py3-none-any.whl", hash = "sha256:fddb9fd99a5b18da837b29710391e945b1e30c135477f484084ee513adb93809", size = 51880, upload-time = "2025-11-10T14:25:45.546Z" },
]
[[package]]
name = "pydub"
version = "0.25.1"
@@ -2126,6 +2146,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/de/cb/6291e38d14a52c73a4bf62a5cde88855741c1294f4a68cf38b46861d8480/ticktick_py-2.0.1-py3-none-any.whl", hash = "sha256:676c603322010ba9e508eda71698e917a3e2ba472bcfd26be2e5db198455fda5", size = 45675, upload-time = "2021-06-24T20:24:07.355Z" },
]
[[package]]
name = "toml"
version = "0.10.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/be/ba/1f744cdc819428fc6b5084ec34d9b30660f6f9daaf70eead706e3203ec3c/toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f", size = 22253, upload-time = "2020-11-01T01:40:22.204Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/44/6f/7120676b6d73228c96e17f1f794d8ab046fc910d781c8d151120c3f1569e/toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b", size = 16588, upload-time = "2020-11-01T01:40:20.672Z" },
]
[[package]]
name = "tqdm"
version = "4.67.1"