This commit is contained in:
Bendt
2025-12-18 22:11:47 -05:00
parent 0ed7800575
commit a41d59e529
26 changed files with 4187 additions and 373 deletions

6
src/calendar/__init__.py Normal file
View File

@@ -0,0 +1,6 @@
"""Calendar TUI package."""
from .backend import CalendarBackend, Event
from .app import CalendarApp, run_app
__all__ = ["CalendarBackend", "Event", "CalendarApp", "run_app"]

401
src/calendar/app.py Normal file
View File

@@ -0,0 +1,401 @@
"""Calendar TUI application.
A Textual-based TUI for viewing calendar events via khal.
"""
import logging
import sys
import os
from datetime import date, datetime, timedelta
from typing import Optional
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Container, Horizontal, Vertical
from textual.logging import TextualHandler
from textual.widgets import Footer, Header, Static
from textual.reactive import reactive
from src.calendar.backend import CalendarBackend, Event
from src.calendar.widgets.WeekGrid import WeekGrid
from src.calendar.widgets.AddEventForm import EventFormData
from src.utils.shared_config import get_theme_name
# Add the parent directory to the system path to resolve relative imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
logging.basicConfig(
level="NOTSET",
handlers=[TextualHandler()],
)
logger = logging.getLogger(__name__)
class CalendarStatusBar(Static):
"""Status bar showing current week and selected event."""
week_label: str = ""
event_info: str = ""
def render(self) -> str:
if self.event_info:
return f"{self.week_label} | {self.event_info}"
return self.week_label
class CalendarApp(App):
"""A TUI for viewing calendar events via khal."""
CSS = """
Screen {
layout: vertical;
}
#week-grid {
height: 1fr;
}
#week-grid > WeekGridHeader {
height: 1;
dock: top;
background: $surface;
}
#week-grid > WeekGridBody {
height: 1fr;
}
#status-bar {
dock: bottom;
height: 1;
background: $surface;
color: $text-muted;
padding: 0 1;
}
#event-detail {
dock: bottom;
height: auto;
max-height: 12;
border-top: solid $primary;
padding: 1;
background: $surface;
}
#event-detail.hidden {
display: none;
}
"""
BINDINGS = [
Binding("q", "quit", "Quit", show=True),
Binding("j", "cursor_down", "Down", show=False),
Binding("k", "cursor_up", "Up", show=False),
Binding("h", "cursor_left", "Left", show=False),
Binding("l", "cursor_right", "Right", show=False),
Binding("H", "prev_week", "Prev Week", show=True),
Binding("L", "next_week", "Next Week", show=True),
Binding("g", "goto_today", "Today", show=True),
Binding("w", "toggle_weekends", "Weekends", show=True),
Binding("r", "refresh", "Refresh", show=True),
Binding("enter", "view_event", "View", show=True),
Binding("a", "add_event", "Add", show=True),
Binding("?", "help", "Help", show=True),
]
# Reactive attributes
include_weekends: reactive[bool] = reactive(True)
# Instance attributes
backend: Optional[CalendarBackend]
def __init__(self, backend: Optional[CalendarBackend] = None):
super().__init__()
if backend:
self.backend = backend
else:
# Create backend from config (default: khal)
from src.services.khal import KhalClient
self.backend = KhalClient()
def compose(self) -> ComposeResult:
"""Create the app layout."""
yield Header()
yield WeekGrid(id="week-grid")
yield Static(id="event-detail", classes="hidden")
yield CalendarStatusBar(id="status-bar")
yield Footer()
def on_mount(self) -> None:
"""Initialize the app on mount."""
self.theme = get_theme_name()
# Load events for current week
self.load_events()
# Update status bar and title
self._update_status()
self._update_title()
def load_events(self) -> None:
"""Load events from backend for the current week."""
if not self.backend:
return
grid = self.query_one("#week-grid", WeekGrid)
week_start = grid.week_start
# Get events using backend's helper method
events_by_date = self.backend.get_week_events(
week_start, include_weekends=self.include_weekends
)
# Set events on grid
grid.set_events(events_by_date)
# Update status bar with week label
self._update_status()
def _update_status(self) -> None:
"""Update the status bar."""
grid = self.query_one("#week-grid", WeekGrid)
status = self.query_one("#status-bar", CalendarStatusBar)
# Week label
week_start = grid.week_start
week_end = week_start + timedelta(days=6)
status.week_label = (
f"Week of {week_start.strftime('%b %d')} - {week_end.strftime('%b %d, %Y')}"
)
# Event info
event = grid.get_event_at_cursor()
if event:
time_str = event.start.strftime("%H:%M") + "-" + event.end.strftime("%H:%M")
status.event_info = f"{time_str} {event.title}"
else:
status.event_info = ""
status.refresh()
# Also update title when status changes
self._update_title()
def _update_title(self) -> None:
"""Update the app title with full date range and week number."""
grid = self.query_one("#week-grid", WeekGrid)
week_start = grid.week_start
week_end = week_start + timedelta(days=6)
week_num = week_start.isocalendar()[1]
# Format: "2025 December 14 - 20 (Week 48)"
if week_start.month == week_end.month:
# Same month
self.title = (
f"{week_start.year} {week_start.strftime('%B')} "
f"{week_start.day} - {week_end.day} (Week {week_num})"
)
else:
# Different months
self.title = (
f"{week_start.strftime('%B %d')} - "
f"{week_end.strftime('%B %d, %Y')} (Week {week_num})"
)
def _update_event_detail(self, event: Optional[Event]) -> None:
"""Update the event detail pane."""
detail = self.query_one("#event-detail", Static)
if event:
detail.remove_class("hidden")
# Format event details
date_str = event.start.strftime("%A, %B %d")
time_str = (
event.start.strftime("%H:%M") + " - " + event.end.strftime("%H:%M")
)
duration = event.duration_minutes
hours, mins = divmod(duration, 60)
dur_str = f"{hours}h {mins}m" if hours else f"{mins}m"
lines = [
f"[bold]{event.title}[/bold]",
f"{date_str}",
f"{time_str} ({dur_str})",
]
if event.location:
lines.append(f"[dim]Location:[/dim] {event.location}")
if event.organizer:
lines.append(f"[dim]Organizer:[/dim] {event.organizer}")
if event.categories:
lines.append(f"[dim]Categories:[/dim] {event.categories}")
if event.url:
lines.append(f"[dim]URL:[/dim] {event.url}")
if event.status:
lines.append(f"[dim]Status:[/dim] {event.status}")
if event.recurring:
lines.append("[dim]Recurring:[/dim] Yes")
if event.description:
# Truncate long descriptions
desc = (
event.description[:200] + "..."
if len(event.description) > 200
else event.description
)
lines.append(f"[dim]Description:[/dim] {desc}")
detail.update("\n".join(lines))
else:
detail.add_class("hidden")
# Handle WeekGrid messages
def on_week_grid_week_changed(self, message: WeekGrid.WeekChanged) -> None:
"""Handle week change - reload events."""
self.load_events()
def on_week_grid_event_selected(self, message: WeekGrid.EventSelected) -> None:
"""Handle event selection."""
self._update_event_detail(message.event)
# Navigation actions (forwarded to grid)
def action_cursor_down(self) -> None:
"""Move cursor down."""
grid = self.query_one("#week-grid", WeekGrid)
grid.action_cursor_down()
self._update_status()
def action_cursor_up(self) -> None:
"""Move cursor up."""
grid = self.query_one("#week-grid", WeekGrid)
grid.action_cursor_up()
self._update_status()
def action_cursor_left(self) -> None:
"""Move cursor left."""
grid = self.query_one("#week-grid", WeekGrid)
grid.action_cursor_left()
self._update_status()
def action_cursor_right(self) -> None:
"""Move cursor right."""
grid = self.query_one("#week-grid", WeekGrid)
grid.action_cursor_right()
self._update_status()
def action_prev_week(self) -> None:
"""Navigate to previous week."""
grid = self.query_one("#week-grid", WeekGrid)
grid.action_prev_week()
def action_next_week(self) -> None:
"""Navigate to next week."""
grid = self.query_one("#week-grid", WeekGrid)
grid.action_next_week()
def action_goto_today(self) -> None:
"""Navigate to today."""
grid = self.query_one("#week-grid", WeekGrid)
grid.action_goto_today()
self.load_events()
def action_toggle_weekends(self) -> None:
"""Toggle weekend display."""
self.include_weekends = not self.include_weekends
grid = self.query_one("#week-grid", WeekGrid)
grid.include_weekends = self.include_weekends
self.load_events()
mode = "7 days" if self.include_weekends else "5 days (weekdays)"
self.notify(f"Showing {mode}")
def action_refresh(self) -> None:
"""Refresh events from backend."""
self.load_events()
self.notify("Refreshed")
def action_view_event(self) -> None:
"""View the selected event details."""
grid = self.query_one("#week-grid", WeekGrid)
event = grid.get_event_at_cursor()
if event:
self._update_event_detail(event)
else:
self.notify("No event at cursor")
def action_add_event(self) -> None:
"""Open the add event modal."""
from src.calendar.screens.AddEventScreen import AddEventScreen
# Get calendars from backend
calendars: list[str] = []
if self.backend:
try:
calendars = self.backend.get_calendars()
except Exception:
pass
# Get current cursor date/time for initial values
grid = self.query_one("#week-grid", WeekGrid)
cursor_date = grid.get_cursor_date()
cursor_time = grid.get_cursor_time()
def handle_result(data: EventFormData | None) -> None:
if data is None:
return
if not self.backend:
self.notify("No calendar backend available", severity="error")
return
try:
self.backend.create_event(
title=data.title,
start=data.start_datetime,
end=data.end_datetime,
calendar=data.calendar,
location=data.location,
description=data.description,
all_day=data.all_day,
)
self.notify(f"Created event: {data.title}")
self.load_events() # Refresh to show new event
except Exception as e:
self.notify(f"Failed to create event: {e}", severity="error")
self.push_screen(
AddEventScreen(
calendars=calendars,
initial_date=cursor_date,
initial_time=cursor_time,
),
handle_result,
)
def action_help(self) -> None:
"""Show help."""
help_text = """
Keybindings:
j/k - Move cursor up/down (time)
h/l - Move cursor left/right (day)
H/L - Previous/Next week
g - Go to today
w - Toggle weekends (5/7 days)
Enter - View event details
a - Add new event
r - Refresh
q - Quit
"""
self.notify(help_text.strip(), timeout=10)
def run_app(backend: Optional[CalendarBackend] = None) -> None:
"""Run the Calendar TUI application."""
app = CalendarApp(backend=backend)
app.run()
if __name__ == "__main__":
run_app()

218
src/calendar/backend.py Normal file
View File

@@ -0,0 +1,218 @@
"""Calendar backend abstraction for Calendar TUI.
This module defines the abstract interface that all calendar backends must implement,
allowing the TUI to work with different calendar systems (khal, calcurse, etc.)
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, date, time, timedelta
from typing import Optional, List, Tuple
@dataclass
class Event:
"""Unified calendar event representation across backends."""
uid: str
title: str
start: datetime
end: datetime
location: str = ""
description: str = ""
calendar: str = ""
all_day: bool = False
recurring: bool = False
organizer: str = ""
url: str = ""
categories: str = ""
status: str = "" # CONFIRMED, TENTATIVE, CANCELLED
@property
def duration_minutes(self) -> int:
"""Get duration in minutes."""
delta = self.end - self.start
return int(delta.total_seconds() / 60)
@property
def start_time(self) -> time:
"""Get start time."""
return self.start.time()
@property
def end_time(self) -> time:
"""Get end time."""
return self.end.time()
@property
def date(self) -> date:
"""Get the date of the event."""
return self.start.date()
def overlaps(self, other: "Event") -> bool:
"""Check if this event overlaps with another."""
return self.start < other.end and self.end > other.start
def get_row_span(self, minutes_per_row: int = 30) -> Tuple[int, int]:
"""Get the row range for this event in a grid.
Args:
minutes_per_row: Minutes each row represents (default 30)
Returns:
Tuple of (start_row, end_row) where rows are 0-indexed from midnight
"""
start_minutes = self.start.hour * 60 + self.start.minute
end_minutes = self.end.hour * 60 + self.end.minute
# Handle events ending at midnight (next day)
if end_minutes == 0 and self.end.date() > self.start.date():
end_minutes = 24 * 60
start_row = start_minutes // minutes_per_row
end_row = (end_minutes + minutes_per_row - 1) // minutes_per_row # Round up
return start_row, end_row
class CalendarBackend(ABC):
"""Abstract base class for calendar backends."""
@abstractmethod
def get_events(
self,
start_date: date,
end_date: date,
calendar: Optional[str] = None,
) -> List[Event]:
"""Get events in a date range.
Args:
start_date: Start of range (inclusive)
end_date: End of range (inclusive)
calendar: Optional calendar name to filter by
Returns:
List of events in the range, sorted by start time
"""
pass
@abstractmethod
def get_event(self, uid: str) -> Optional[Event]:
"""Get a single event by UID.
Args:
uid: Event unique identifier
Returns:
Event if found, None otherwise
"""
pass
@abstractmethod
def get_calendars(self) -> List[str]:
"""Get list of available calendar names.
Returns:
List of calendar names
"""
pass
@abstractmethod
def create_event(
self,
title: str,
start: datetime,
end: datetime,
calendar: Optional[str] = None,
location: Optional[str] = None,
description: Optional[str] = None,
all_day: bool = False,
) -> Event:
"""Create a new event.
Args:
title: Event title
start: Start datetime
end: End datetime
calendar: Calendar to add event to
location: Event location
description: Event description
all_day: Whether this is an all-day event
Returns:
The created event
"""
pass
@abstractmethod
def delete_event(self, uid: str) -> bool:
"""Delete an event.
Args:
uid: Event unique identifier
Returns:
True if deleted successfully
"""
pass
@abstractmethod
def update_event(
self,
uid: str,
title: Optional[str] = None,
start: Optional[datetime] = None,
end: Optional[datetime] = None,
location: Optional[str] = None,
description: Optional[str] = None,
) -> Optional[Event]:
"""Update an existing event.
Args:
uid: Event unique identifier
title: New title (if provided)
start: New start time (if provided)
end: New end time (if provided)
location: New location (if provided)
description: New description (if provided)
Returns:
Updated event if successful, None otherwise
"""
pass
def get_week_events(
self,
week_start: date,
include_weekends: bool = True,
) -> dict[date, List[Event]]:
"""Get events for a week, grouped by date.
Args:
week_start: First day of the week
include_weekends: Whether to include Saturday/Sunday
Returns:
Dict mapping dates to lists of events
"""
days = 7 if include_weekends else 5
end_date = week_start + timedelta(days=days - 1)
events = self.get_events(week_start, end_date)
# Group by date
by_date: dict[date, List[Event]] = {}
for i in range(days):
d = week_start + timedelta(days=i)
by_date[d] = []
for event in events:
event_date = event.date
if event_date in by_date:
by_date[event_date].append(event)
# Sort each day's events by start time
for d in by_date:
by_date[d].sort(key=lambda e: e.start)
return by_date

110
src/calendar/config.py Normal file
View File

@@ -0,0 +1,110 @@
"""Calendar TUI configuration."""
import os
from pathlib import Path
from typing import Optional
try:
import toml
except ImportError:
toml = None # type: ignore
# Default configuration values
DEFAULT_CONFIG = {
"display": {
"work_day_start_hour": 7, # 7 AM
"work_day_end_hour": 19, # 7 PM
"include_weekends": True,
"minutes_per_row": 30,
"day_column_width": 20,
"week_start_day": 0, # 0=Sunday, 1=Monday, ..., 6=Saturday
},
"backend": {
"type": "khal", # khal, calcurse, etc.
"calendar_path": "~/Calendar/corteva",
},
"theme": {
"event_color": "blue",
"overlap_color": "dark_orange",
"cursor_style": "reverse",
"work_hours_time_color": "blue",
"off_hours_time_color": "bright_black",
},
}
def get_config_path() -> Path:
"""Get the calendar config file path."""
# Check XDG_CONFIG_HOME first, then fall back to ~/.config
config_home = os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config"))
return Path(config_home) / "luk" / "calendar.toml"
def load_config() -> dict:
"""Load calendar configuration from TOML file.
Returns merged config with defaults for any missing values.
"""
config = DEFAULT_CONFIG.copy()
config_path = get_config_path()
if config_path.exists() and toml is not None:
try:
user_config = toml.load(config_path)
# Deep merge user config into defaults
for section, values in user_config.items():
if section in config and isinstance(config[section], dict):
config[section].update(values)
else:
config[section] = values
except Exception:
pass # Use defaults on error
return config
def get_display_config() -> dict:
"""Get display-related configuration."""
return load_config().get("display", DEFAULT_CONFIG["display"])
def get_backend_config() -> dict:
"""Get backend-related configuration."""
return load_config().get("backend", DEFAULT_CONFIG["backend"])
def get_theme_config() -> dict:
"""Get theme-related configuration."""
return load_config().get("theme", DEFAULT_CONFIG["theme"])
# Convenience accessors
def work_day_start_hour() -> int:
"""Get the work day start hour (for initial scroll position)."""
return get_display_config().get("work_day_start_hour", 7)
def work_day_end_hour() -> int:
"""Get the work day end hour."""
return get_display_config().get("work_day_end_hour", 19)
def include_weekends_default() -> bool:
"""Get default for including weekends."""
return get_display_config().get("include_weekends", True)
def minutes_per_row() -> int:
"""Get minutes per row (default 30)."""
return get_display_config().get("minutes_per_row", 30)
def day_column_width() -> int:
"""Get day column width."""
return get_display_config().get("day_column_width", 20)
def week_start_day() -> int:
"""Get the week start day (0=Sunday, 1=Monday, ..., 6=Saturday)."""
return get_display_config().get("week_start_day", 0)

View File

@@ -0,0 +1,155 @@
"""Add Event modal screen for Calendar TUI."""
from datetime import date, time
from typing import Optional
from textual import on
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal, Vertical
from textual.screen import ModalScreen
from textual.widgets import Button, Input, Label
from src.calendar.widgets.AddEventForm import AddEventForm, EventFormData
class AddEventScreen(ModalScreen[Optional[EventFormData]]):
"""Modal screen for adding a new calendar event."""
BINDINGS = [
Binding("escape", "cancel", "Cancel"),
Binding("ctrl+s", "submit", "Save"),
]
DEFAULT_CSS = """
AddEventScreen {
align: center middle;
}
AddEventScreen #add-event-container {
width: 80%;
height: auto;
max-height: 85%;
background: $surface;
border: thick $primary;
padding: 1;
}
AddEventScreen #add-event-title {
text-style: bold;
width: 100%;
height: 1;
text-align: center;
margin-bottom: 1;
}
AddEventScreen #add-event-content {
width: 100%;
height: auto;
}
AddEventScreen #add-event-form {
width: 1fr;
}
AddEventScreen #add-event-sidebar {
width: 16;
height: auto;
padding: 1;
align: center top;
}
AddEventScreen #add-event-sidebar Button {
width: 100%;
margin-bottom: 1;
}
AddEventScreen #help-text {
width: 100%;
height: 1;
color: $text-muted;
text-align: center;
margin-top: 1;
}
"""
def __init__(
self,
calendars: list[str] | None = None,
initial_date: date | None = None,
initial_time: time | None = None,
initial_data: EventFormData | None = None,
**kwargs,
):
"""Initialize the add event screen.
Args:
calendars: List of available calendar names for the dropdown
initial_date: Pre-populate with this date
initial_time: Pre-populate with this time
initial_data: Pre-populate form with this data (overrides date/time)
"""
super().__init__(**kwargs)
self._calendars = calendars or []
self._initial_date = initial_date
self._initial_time = initial_time
self._initial_data = initial_data
def compose(self) -> ComposeResult:
with Vertical(id="add-event-container"):
yield Label("Add New Event", id="add-event-title")
with Horizontal(id="add-event-content"):
yield AddEventForm(
calendars=self._calendars,
initial_date=self._initial_date,
initial_time=self._initial_time,
initial_data=self._initial_data,
id="add-event-form",
)
with Vertical(id="add-event-sidebar"):
yield Button("Create", id="create", variant="primary")
yield Button("Cancel", id="cancel", variant="default")
yield Label("Ctrl+S to save, Escape to cancel", id="help-text")
def on_mount(self) -> None:
"""Focus the title input."""
try:
form = self.query_one("#add-event-form", AddEventForm)
title_input = form.query_one("#title-input")
title_input.focus()
except Exception:
pass
@on(Button.Pressed, "#create")
def handle_create(self) -> None:
"""Handle create button press."""
self.action_submit()
@on(Button.Pressed, "#cancel")
def handle_cancel(self) -> None:
"""Handle cancel button press."""
self.action_cancel()
@on(Input.Submitted, "#title-input")
def handle_title_submit(self) -> None:
"""Handle Enter key in title input."""
self.action_submit()
def action_submit(self) -> None:
"""Validate and submit the form."""
form = self.query_one("#add-event-form", AddEventForm)
is_valid, error = form.validate()
if not is_valid:
self.notify(error, severity="error")
return
data = form.get_form_data()
self.dismiss(data)
def action_cancel(self) -> None:
"""Cancel and dismiss."""
self.dismiss(None)

View File

@@ -0,0 +1,5 @@
"""Calendar TUI screens."""
from .AddEventScreen import AddEventScreen
__all__ = ["AddEventScreen"]

View File

@@ -0,0 +1,377 @@
"""Reusable Add Event form widget for Calendar TUI.
This widget can be used standalone in modals or embedded in other screens.
"""
from dataclasses import dataclass
from datetime import datetime, date, time, timedelta
from typing import Optional
from textual.app import ComposeResult
from textual.containers import Horizontal, Vertical, ScrollableContainer
from textual.message import Message
from textual.widget import Widget
from textual.widgets import Input, Label, Select, TextArea, Checkbox, MaskedInput
@dataclass
class EventFormData:
"""Data from the add event form."""
title: str
start_date: date
start_time: time
end_date: date
end_time: time
location: Optional[str] = None
description: Optional[str] = None
calendar: Optional[str] = None
all_day: bool = False
@property
def start_datetime(self) -> datetime:
"""Get start as datetime."""
return datetime.combine(self.start_date, self.start_time)
@property
def end_datetime(self) -> datetime:
"""Get end as datetime."""
return datetime.combine(self.end_date, self.end_time)
class AddEventForm(Widget):
"""A reusable form widget for creating/editing calendar events.
This widget emits EventFormData when submitted and can be embedded
in various contexts (modal screens, sidebars, etc.)
"""
DEFAULT_CSS = """
AddEventForm {
width: 100%;
height: auto;
padding: 1;
}
AddEventForm ScrollableContainer {
height: auto;
max-height: 100%;
}
AddEventForm .form-row {
width: 100%;
height: auto;
margin-bottom: 1;
}
AddEventForm .form-label {
width: 12;
height: 1;
padding-right: 1;
}
AddEventForm .form-input {
width: 1fr;
}
AddEventForm #title-input {
width: 1fr;
}
AddEventForm .date-input {
width: 14;
}
AddEventForm .time-input {
width: 10;
}
AddEventForm #calendar-select {
width: 1fr;
}
AddEventForm #location-input {
width: 1fr;
}
AddEventForm #description-textarea {
width: 1fr;
height: 6;
}
AddEventForm .required {
color: $error;
}
AddEventForm .datetime-row {
width: 100%;
height: auto;
}
AddEventForm .datetime-group {
width: auto;
height: auto;
margin-right: 2;
}
AddEventForm .datetime-label {
width: auto;
padding-right: 1;
color: $text-muted;
}
"""
class Submitted(Message):
"""Message emitted when the form is submitted."""
def __init__(self, data: EventFormData) -> None:
super().__init__()
self.data = data
class Cancelled(Message):
"""Message emitted when the form is cancelled."""
pass
def __init__(
self,
calendars: list[str] | None = None,
initial_date: date | None = None,
initial_time: time | None = None,
initial_data: EventFormData | None = None,
**kwargs,
):
"""Initialize the add event form.
Args:
calendars: List of available calendar names for the dropdown
initial_date: Pre-populate with this date
initial_time: Pre-populate with this time
initial_data: Pre-populate form with this data (overrides date/time)
"""
super().__init__(**kwargs)
self._calendars = calendars or []
self._initial_date = initial_date or date.today()
self._initial_time = initial_time or time(9, 0)
self._initial_data = initial_data
def compose(self) -> ComposeResult:
"""Compose the form layout."""
if self._initial_data:
initial = self._initial_data
start_date = initial.start_date
start_time = initial.start_time
end_date = initial.end_date
end_time = initial.end_time
title = initial.title
location = initial.location or ""
description = initial.description or ""
calendar = initial.calendar or ""
all_day = initial.all_day
else:
start_date = self._initial_date
start_time = self._initial_time
# Default to 1 hour duration
end_date = start_date
end_time = time(start_time.hour + 1, start_time.minute)
if start_time.hour >= 23:
end_time = time(23, 59)
title = ""
location = ""
description = ""
calendar = ""
all_day = False
with ScrollableContainer():
# Title (required)
with Horizontal(classes="form-row"):
yield Label("Title", classes="form-label")
yield Label("*", classes="required")
yield Input(
value=title,
placeholder="Event title...",
id="title-input",
classes="form-input",
)
# Start Date/Time
with Vertical(classes="form-row"):
yield Label("Start", classes="form-label")
with Horizontal(classes="datetime-row"):
with Horizontal(classes="datetime-group"):
yield Label("Date:", classes="datetime-label")
yield MaskedInput(
template="9999-99-99",
value=start_date.strftime("%Y-%m-%d"),
id="start-date-input",
classes="date-input",
)
with Horizontal(classes="datetime-group"):
yield Label("Time:", classes="datetime-label")
yield MaskedInput(
template="99:99",
value=start_time.strftime("%H:%M"),
id="start-time-input",
classes="time-input",
)
# End Date/Time
with Vertical(classes="form-row"):
yield Label("End", classes="form-label")
with Horizontal(classes="datetime-row"):
with Horizontal(classes="datetime-group"):
yield Label("Date:", classes="datetime-label")
yield MaskedInput(
template="9999-99-99",
value=end_date.strftime("%Y-%m-%d"),
id="end-date-input",
classes="date-input",
)
with Horizontal(classes="datetime-group"):
yield Label("Time:", classes="datetime-label")
yield MaskedInput(
template="99:99",
value=end_time.strftime("%H:%M"),
id="end-time-input",
classes="time-input",
)
# All day checkbox
with Horizontal(classes="form-row"):
yield Label("", classes="form-label")
yield Checkbox("All day event", value=all_day, id="all-day-checkbox")
# Calendar selection (optional dropdown)
if self._calendars:
with Horizontal(classes="form-row"):
yield Label("Calendar", classes="form-label")
options = [("(default)", "")] + [(c, c) for c in self._calendars]
yield Select(
options=options,
value=calendar,
id="calendar-select",
allow_blank=True,
)
# Location (optional)
with Horizontal(classes="form-row"):
yield Label("Location", classes="form-label")
yield Input(
value=location,
placeholder="Event location...",
id="location-input",
classes="form-input",
)
# Description (optional textarea)
with Vertical(classes="form-row"):
yield Label("Description", classes="form-label")
yield TextArea(
description,
id="description-textarea",
)
def get_form_data(self) -> EventFormData:
"""Extract current form data.
Returns:
EventFormData with current form values
"""
title = self.query_one("#title-input", Input).value.strip()
# Parse start date/time from MaskedInput
start_date_input = self.query_one("#start-date-input", MaskedInput)
start_time_input = self.query_one("#start-time-input", MaskedInput)
start_date_str = start_date_input.value.strip()
start_time_str = start_time_input.value.strip()
try:
start_date = datetime.strptime(start_date_str, "%Y-%m-%d").date()
except ValueError:
start_date = date.today()
try:
start_time = datetime.strptime(start_time_str, "%H:%M").time()
except ValueError:
start_time = time(9, 0)
# Parse end date/time from MaskedInput
end_date_input = self.query_one("#end-date-input", MaskedInput)
end_time_input = self.query_one("#end-time-input", MaskedInput)
end_date_str = end_date_input.value.strip()
end_time_str = end_time_input.value.strip()
try:
end_date = datetime.strptime(end_date_str, "%Y-%m-%d").date()
except ValueError:
end_date = start_date
try:
end_time = datetime.strptime(end_time_str, "%H:%M").time()
except ValueError:
end_time = time(start_time.hour + 1, start_time.minute)
# All day
all_day = self.query_one("#all-day-checkbox", Checkbox).value
# Calendar
calendar: str | None = None
try:
calendar_select = self.query_one("#calendar-select", Select)
cal_value = calendar_select.value
if isinstance(cal_value, str) and cal_value:
calendar = cal_value
except Exception:
pass # No calendar select
# Location
location = self.query_one("#location-input", Input).value.strip() or None
# Description
try:
desc_area = self.query_one("#description-textarea", TextArea)
description = desc_area.text.strip() or None
except Exception:
description = None
return EventFormData(
title=title,
start_date=start_date,
start_time=start_time,
end_date=end_date,
end_time=end_time,
location=location,
description=description,
calendar=calendar,
all_day=all_day,
)
def validate(self) -> tuple[bool, str]:
"""Validate the form data.
Returns:
Tuple of (is_valid, error_message)
"""
data = self.get_form_data()
if not data.title:
return False, "Title is required"
# Validate that end is after start
if data.end_datetime <= data.start_datetime:
return False, "End time must be after start time"
return True, ""
def submit(self) -> bool:
"""Validate and submit the form.
Returns:
True if form was valid and submitted, False otherwise
"""
is_valid, error = self.validate()
if not is_valid:
return False
self.post_message(self.Submitted(self.get_form_data()))
return True
def cancel(self) -> None:
"""Cancel the form."""
self.post_message(self.Cancelled())

View File

@@ -0,0 +1,242 @@
"""Mini month calendar widget for Calendar TUI sidebar.
Displays a compact month view with day numbers, highlighting:
- Today
- Current week
- Selected day
"""
from datetime import date, timedelta
from typing import Optional
from rich.segment import Segment
from rich.style import Style
from textual.message import Message
from textual.reactive import reactive
from textual.strip import Strip
from textual.widget import Widget
def get_month_calendar(year: int, month: int) -> list[list[Optional[date]]]:
"""Generate a calendar grid for a month.
Returns a list of weeks, where each week is a list of 7 dates (or None for empty cells).
Week starts on Monday.
"""
import calendar
# Get first day of month and number of days
first_day = date(year, month, 1)
if month == 12:
last_day = date(year + 1, 1, 1) - timedelta(days=1)
else:
last_day = date(year, month + 1, 1) - timedelta(days=1)
# Monday = 0, Sunday = 6
first_weekday = first_day.weekday()
weeks: list[list[Optional[date]]] = []
current_week: list[Optional[date]] = [None] * first_weekday
current = first_day
while current <= last_day:
current_week.append(current)
if len(current_week) == 7:
weeks.append(current_week)
current_week = []
current += timedelta(days=1)
# Fill remaining days in last week
if current_week:
while len(current_week) < 7:
current_week.append(None)
weeks.append(current_week)
return weeks
class MonthCalendar(Widget):
"""A compact month calendar widget for sidebars."""
DEFAULT_CSS = """
MonthCalendar {
width: 24;
height: auto;
padding: 0 1;
}
"""
# Reactive attributes
display_month: reactive[date] = reactive(lambda: date.today().replace(day=1))
selected_date: reactive[date] = reactive(date.today)
week_start: reactive[date] = reactive(lambda: date.today())
class DateSelected(Message):
"""A date was clicked/selected."""
def __init__(self, selected: date) -> None:
super().__init__()
self.date = selected
class MonthChanged(Message):
"""Month navigation occurred."""
def __init__(self, month: date) -> None:
super().__init__()
self.month = month
def __init__(
self,
selected_date: Optional[date] = None,
week_start: Optional[date] = None,
name: Optional[str] = None,
id: Optional[str] = None,
classes: Optional[str] = None,
) -> None:
super().__init__(name=name, id=id, classes=classes)
if selected_date:
self.selected_date = selected_date
self.display_month = selected_date.replace(day=1)
if week_start:
self.week_start = week_start
def _get_theme_color(self, color_name: str) -> str:
"""Get a color from the current theme."""
try:
theme = self.app.current_theme
color = getattr(theme, color_name, None)
if color:
return str(color)
except Exception:
pass
# Fallback colors
fallbacks = {
"secondary": "#81A1C1",
"primary": "#88C0D0",
"accent": "#B48EAD",
"foreground": "#D8DEE9",
"surface": "#3B4252",
}
return fallbacks.get(color_name, "white")
@property
def _weeks(self) -> list[list[Optional[date]]]:
"""Get the weeks for the current display month."""
return get_month_calendar(self.display_month.year, self.display_month.month)
def get_content_height(self, container, viewport, width: int) -> int:
"""Calculate height: header + day names + weeks."""
return 2 + len(self._weeks) # Month header + day names + week rows
def render_line(self, y: int) -> Strip:
"""Render a line of the calendar."""
if y == 0:
return self._render_month_header()
elif y == 1:
return self._render_day_names()
else:
week_idx = y - 2
weeks = self._weeks
if 0 <= week_idx < len(weeks):
return self._render_week(weeks[week_idx])
return Strip.blank(self.size.width)
def _render_month_header(self) -> Strip:
"""Render the month/year header with navigation arrows."""
month_name = self.display_month.strftime("%B %Y")
header = f"< {month_name:^16} >"
header = header[: self.size.width].ljust(self.size.width)
primary_color = self._get_theme_color("primary")
style = Style(bold=True, color=primary_color)
return Strip([Segment(header, style)])
def _render_day_names(self) -> Strip:
"""Render the day name headers (Mo Tu We ...)."""
day_names = "Mo Tu We Th Fr Sa Su"
# Pad to widget width
line = day_names[: self.size.width].ljust(self.size.width)
style = Style(color="bright_black")
return Strip([Segment(line, style)])
def _render_week(self, week: list[Optional[date]]) -> Strip:
"""Render a week row."""
segments = []
today = date.today()
# Calculate the week containing week_start
week_end = self.week_start + timedelta(days=6)
secondary_color = self._get_theme_color("secondary")
primary_color = self._get_theme_color("primary")
for i, day in enumerate(week):
if day is None:
segments.append(Segment(" "))
else:
day_str = f"{day.day:2d} "
# Determine styling
if day == self.selected_date:
# Selected date - reverse video
style = Style(bold=True, reverse=True)
elif day == today:
# Today - highlighted with secondary color
style = Style(bold=True, color=secondary_color)
elif self.week_start <= day <= week_end:
# In current week view - subtle highlight
style = Style(color=primary_color)
elif day.weekday() >= 5:
# Weekend
style = Style(color="bright_black")
else:
# Normal day
style = Style()
segments.append(Segment(day_str, style))
# Pad remaining width
current_width = sum(len(s.text) for s in segments)
if current_width < self.size.width:
segments.append(Segment(" " * (self.size.width - current_width)))
return Strip(segments)
def update_week(self, week_start: date) -> None:
"""Update the current week highlight.
Also updates display_month if the week is in a different month.
"""
self.week_start = week_start
# Optionally auto-update display month to show the week
week_month = week_start.replace(day=1)
if week_month != self.display_month:
self.display_month = week_month
self.refresh()
def update_selected(self, selected: date) -> None:
"""Update the selected date."""
self.selected_date = selected
self.refresh()
def next_month(self) -> None:
"""Navigate to next month."""
year = self.display_month.year
month = self.display_month.month + 1
if month > 12:
month = 1
year += 1
self.display_month = date(year, month, 1)
self.post_message(self.MonthChanged(self.display_month))
self.refresh()
def prev_month(self) -> None:
"""Navigate to previous month."""
year = self.display_month.year
month = self.display_month.month - 1
if month < 1:
month = 12
year -= 1
self.display_month = date(year, month, 1)
self.post_message(self.MonthChanged(self.display_month))
self.refresh()

View File

@@ -0,0 +1,751 @@
"""Week view grid widget for Calendar TUI.
Displays a week of calendar events in a grid layout where:
- Columns represent days (5 or 7)
- Rows represent time slots (30 minutes per row)
- Events span multiple rows proportionally to their duration
"""
from dataclasses import dataclass, field
from datetime import date, datetime, timedelta
from typing import List, Optional, Tuple
from rich.style import Style
from rich.segment import Segment
from textual.binding import Binding
from textual.containers import Vertical
from textual.geometry import Size
from textual.message import Message
from textual.reactive import reactive
from textual.scroll_view import ScrollView
from textual.strip import Strip
from textual.widget import Widget
from src.calendar.backend import Event
from src.calendar import config
# Column widths
TIME_COLUMN_WIDTH = 6 # "HH:MM "
MIN_DAY_COLUMN_WIDTH = 10 # Minimum width for each day column
DEFAULT_DAY_COLUMN_WIDTH = 20 # Default/preferred width for each day column
def get_rows_per_hour() -> int:
"""Get rows per hour from config."""
return 60 // config.minutes_per_row()
def get_total_rows() -> int:
"""Get total rows for 24 hours."""
return 24 * get_rows_per_hour()
def get_week_start_for_date(target_date: date) -> date:
"""Get the week start date for a given date based on config.
Config uses: 0=Sunday, 1=Monday, ..., 6=Saturday
Python weekday() uses: 0=Monday, ..., 6=Sunday
"""
week_start_cfg = config.week_start_day() # 0=Sunday, 1=Monday, etc.
python_weekday = target_date.weekday() # 0=Monday, 6=Sunday
# Convert config week start to python weekday
# Sunday(0) -> 6, Monday(1) -> 0, Tuesday(2) -> 1, etc.
python_week_start = (week_start_cfg - 1) % 7
# Calculate days since week start
days_since_week_start = (python_weekday - python_week_start) % 7
return target_date - timedelta(days=days_since_week_start)
def get_day_column_for_date(target_date: date, week_start: date) -> int:
"""Get the column index for a date within its week.
Returns the number of days since week_start.
"""
return (target_date - week_start).days
@dataclass
class DayColumn:
"""Events and layout for a single day column."""
day: date
events: List[Event] = field(default_factory=list)
# 2D grid: row -> list of events at that row
grid: List[List[Event]] = field(default_factory=list)
def __post_init__(self):
# Initialize grid with rows for 24 hours
self.grid = [[] for _ in range(get_total_rows())]
def layout_events(self) -> None:
"""Layout events handling overlaps."""
total_rows = get_total_rows()
minutes_per_row = config.minutes_per_row()
# Clear the grid
self.grid = [[] for _ in range(total_rows)]
# Sort events by start time, then by duration (longer first)
sorted_events = sorted(
self.events, key=lambda e: (e.start, -(e.end - e.start).total_seconds())
)
for event in sorted_events:
if event.all_day:
continue # Handle all-day events separately
start_row, end_row = event.get_row_span(minutes_per_row)
# Clamp to valid range
start_row = max(0, min(start_row, total_rows - 1))
end_row = max(start_row + 1, min(end_row, total_rows))
# Add event to each row it spans
for row in range(start_row, end_row):
if event not in self.grid[row]:
self.grid[row].append(event)
class WeekGridHeader(Widget):
"""Fixed header widget showing day names."""
DEFAULT_CSS = """
WeekGridHeader {
height: 1;
background: $surface;
}
"""
def __init__(
self,
days: List[date],
cursor_col: int = 0,
include_weekends: bool = True,
name: Optional[str] = None,
id: Optional[str] = None,
) -> None:
super().__init__(name=name, id=id)
self._days = days
self._cursor_col = cursor_col
self._include_weekends = include_weekends
def update_days(self, days: List[date], cursor_col: int) -> None:
"""Update the displayed days."""
self._days = days
self._cursor_col = cursor_col
self.refresh()
def set_include_weekends(self, include_weekends: bool) -> None:
"""Update the include_weekends setting."""
self._include_weekends = include_weekends
self.refresh()
@property
def num_days(self) -> int:
return 7 if self._include_weekends else 5
def _get_day_column_width(self) -> int:
"""Calculate day column width based on available space."""
available_width = self.size.width - TIME_COLUMN_WIDTH
if available_width <= 0 or self.num_days == 0:
return DEFAULT_DAY_COLUMN_WIDTH
width_per_day = available_width // self.num_days
return max(MIN_DAY_COLUMN_WIDTH, width_per_day)
def _get_theme_color(self, color_name: str) -> str:
"""Get a color from the current theme."""
try:
theme = self.app.current_theme
color = getattr(theme, color_name, None)
if color:
return str(color)
except Exception:
pass
# Fallback colors
fallbacks = {
"secondary": "#81A1C1",
"primary": "#88C0D0",
"foreground": "#D8DEE9",
"surface": "#3B4252",
}
return fallbacks.get(color_name, "white")
def render_line(self, y: int) -> Strip:
"""Render the header row."""
day_col_width = self._get_day_column_width()
if y != 0:
return Strip.blank(TIME_COLUMN_WIDTH + (day_col_width * self.num_days))
segments = []
# Time column spacer
segments.append(Segment(" " * TIME_COLUMN_WIDTH))
# Get theme colors
secondary_color = self._get_theme_color("secondary")
# Day headers
today = date.today()
for i, day in enumerate(self._days):
day_name = day.strftime("%a %m/%d")
# Style based on selection and today
if i == self._cursor_col:
style = Style(bold=True, reverse=True)
elif day == today:
# Highlight today with theme secondary color
style = Style(bold=True, color="white", bgcolor=secondary_color)
elif day.weekday() >= 5: # Weekend
style = Style(color="bright_black")
else:
style = Style()
# Center the day name in the column
header = day_name.center(day_col_width)
segments.append(Segment(header, style))
return Strip(segments)
class WeekGridBody(ScrollView):
"""Scrollable body of the week grid showing time slots and events."""
# Reactive attributes
cursor_row: reactive[int] = reactive(0)
cursor_col: reactive[int] = reactive(0)
# Messages
class CursorMoved(Message):
"""Cursor position changed."""
def __init__(self, row: int, col: int) -> None:
super().__init__()
self.row = row
self.col = col
def __init__(
self,
include_weekends: bool = True,
name: Optional[str] = None,
id: Optional[str] = None,
classes: Optional[str] = None,
) -> None:
super().__init__(name=name, id=id, classes=classes)
self._days: List[DayColumn] = []
self._include_weekends = include_weekends
self._work_day_start = config.work_day_start_hour()
self._work_day_end = config.work_day_end_hour()
def _get_theme_color(self, color_name: str) -> str:
"""Get a color from the current theme."""
try:
theme = self.app.current_theme
color = getattr(theme, color_name, None)
if color:
return str(color)
except Exception:
pass
# Fallback colors
fallbacks = {
"secondary": "#81A1C1",
"primary": "#88C0D0",
"accent": "#B48EAD",
"foreground": "#D8DEE9",
"surface": "#3B4252",
"warning": "#EBCB8B",
"error": "#BF616A",
}
return fallbacks.get(color_name, "white")
@property
def num_days(self) -> int:
return 7 if self._include_weekends else 5
def _get_day_column_width(self) -> int:
"""Calculate day column width based on available space."""
available_width = self.size.width - TIME_COLUMN_WIDTH
if available_width <= 0 or self.num_days == 0:
return DEFAULT_DAY_COLUMN_WIDTH
width_per_day = available_width // self.num_days
return max(MIN_DAY_COLUMN_WIDTH, width_per_day)
@property
def content_width(self) -> int:
return TIME_COLUMN_WIDTH + (self._get_day_column_width() * self.num_days)
def get_content_height(self, container: Size, viewport: Size, width: int) -> int:
return get_total_rows()
def on_mount(self) -> None:
"""Set up virtual size for scrolling."""
self._update_virtual_size()
def _update_virtual_size(self) -> None:
"""Update the virtual size based on content dimensions."""
self.virtual_size = Size(self.content_width, get_total_rows())
def set_days(self, days: List[DayColumn]) -> None:
"""Set the day columns to display."""
self._days = days
self._update_virtual_size()
self.refresh()
def set_include_weekends(self, include_weekends: bool) -> None:
"""Update the include_weekends setting."""
self._include_weekends = include_weekends
self._update_virtual_size()
self.refresh()
def watch_cursor_row(self, old: int, new: int) -> None:
"""Handle cursor row changes."""
total_rows = get_total_rows()
# Clamp cursor row
if new < 0:
self.cursor_row = 0
elif new >= total_rows:
self.cursor_row = total_rows - 1
else:
# Scroll to keep cursor visible with a 2-row margin from viewport edges
self._scroll_to_keep_cursor_visible(new)
self.post_message(self.CursorMoved(new, self.cursor_col))
self.refresh()
def _scroll_to_keep_cursor_visible(self, cursor_row: int) -> None:
"""Scroll viewport only when cursor gets within 2 rows of the edge."""
margin = 2 # Number of rows to keep between cursor and viewport edge
scroll_y = int(self.scroll_offset.y)
viewport_height = self.size.height
# Calculate visible range
visible_top = scroll_y
visible_bottom = scroll_y + viewport_height - 1
# Check if cursor is too close to the top edge
if cursor_row < visible_top + margin:
# Scroll up to keep margin above cursor
new_scroll_y = max(0, cursor_row - margin)
self.scroll_to(y=new_scroll_y, animate=False)
# Check if cursor is too close to the bottom edge
elif cursor_row > visible_bottom - margin:
# Scroll down to keep margin below cursor
new_scroll_y = cursor_row - viewport_height + margin + 1
self.scroll_to(y=new_scroll_y, animate=False)
def watch_cursor_col(self, old: int, new: int) -> None:
"""Handle cursor column changes."""
self.post_message(self.CursorMoved(self.cursor_row, new))
self.refresh()
def render_line(self, y: int) -> Strip:
"""Render a single line of the grid."""
scroll_y = int(self.scroll_offset.y)
row_index = y + scroll_y
total_rows = get_total_rows()
if row_index < 0 or row_index >= total_rows:
return Strip.blank(self.content_width)
return self._render_time_row(row_index)
def _render_time_row(self, row_index: int) -> Strip:
"""Render a time row with events."""
rows_per_hour = get_rows_per_hour()
minutes_per_row = config.minutes_per_row()
segments = []
# Check if this is the current time row
now = datetime.now()
current_row = (now.hour * rows_per_hour) + (now.minute // minutes_per_row)
is_current_time_row = row_index == current_row
# Time label (only show on the hour)
if row_index % rows_per_hour == 0:
hour = row_index // rows_per_hour
time_str = f"{hour:02d}:00 "
else:
time_str = " " # Blank for half-hour
# Style time label - highlight current time, dim outside work hours
if is_current_time_row:
secondary_color = self._get_theme_color("secondary")
time_style = Style(color=secondary_color, bold=True)
elif (
row_index < self._work_day_start * rows_per_hour
or row_index >= self._work_day_end * rows_per_hour
):
time_style = Style(color="bright_black")
else:
primary_color = self._get_theme_color("primary")
time_style = Style(color=primary_color)
segments.append(Segment(time_str, time_style))
# Event cells for each day
for col_idx, day_col in enumerate(self._days):
cell_text, cell_style = self._render_event_cell(day_col, row_index, col_idx)
segments.append(Segment(cell_text, cell_style))
return Strip(segments)
def _render_event_cell(
self, day_col: DayColumn, row_index: int, col_idx: int
) -> Tuple[str, Style]:
"""Render a single cell for a day/time slot."""
events_at_row = day_col.grid[row_index] if row_index < len(day_col.grid) else []
rows_per_hour = get_rows_per_hour()
minutes_per_row = config.minutes_per_row()
day_col_width = self._get_day_column_width()
is_cursor = col_idx == self.cursor_col and row_index == self.cursor_row
if not events_at_row:
# Empty cell
if is_cursor:
return ">" + " " * (day_col_width - 1), Style(reverse=True)
else:
# Grid line style
if row_index % rows_per_hour == 0:
return "-" * day_col_width, Style(color="bright_black")
else:
return " " * day_col_width, Style()
# Get the event to display (first one if multiple)
event = events_at_row[0]
# Determine if this is the start row for this event
start_row, _ = event.get_row_span(minutes_per_row)
is_start = row_index == max(0, start_row)
# Build cell text
if is_start:
# Show event title with time
time_str = event.start.strftime("%H:%M")
title = event.title[: day_col_width - 7] # Leave room for time
cell_text = f"{time_str} {title}"
else:
# Continuation of event
cell_text = "" + event.title[: day_col_width - 3]
# Pad/truncate to column width
cell_text = cell_text[:day_col_width].ljust(day_col_width)
# Style based on event and cursor
if is_cursor:
style = Style(bold=True, reverse=True)
elif len(events_at_row) > 1:
# Overlapping events - use warning color
warning_color = self._get_theme_color("warning")
style = Style(bgcolor=warning_color, color="black")
else:
# Normal event - use primary color
primary_color = self._get_theme_color("primary")
style = Style(bgcolor=primary_color, color="black")
return cell_text, style
def get_event_at_cursor(self) -> Optional[Event]:
"""Get the event at the current cursor position."""
if self.cursor_col < 0 or self.cursor_col >= len(self._days):
return None
day_col = self._days[self.cursor_col]
if self.cursor_row < 0 or self.cursor_row >= len(day_col.grid):
return None
events = day_col.grid[self.cursor_row]
return events[0] if events else None
class WeekGrid(Vertical):
"""Week view calendar grid widget with fixed header."""
DEFAULT_CSS = """
WeekGrid {
height: 1fr;
}
WeekGridHeader {
height: 1;
}
WeekGridBody {
height: 1fr;
scrollbar-gutter: stable;
}
"""
BINDINGS = [
Binding("j", "cursor_down", "Down", show=False),
Binding("k", "cursor_up", "Up", show=False),
Binding("h", "cursor_left", "Left", show=False),
Binding("l", "cursor_right", "Right", show=False),
Binding("H", "prev_week", "Prev Week", show=True),
Binding("L", "next_week", "Next Week", show=True),
Binding("g", "goto_today", "Today", show=True),
Binding("enter", "select_event", "View", show=True),
]
# Reactive attributes
week_start: reactive[date] = reactive(date.today)
include_weekends: reactive[bool] = reactive(True)
# Messages
class EventSelected(Message):
"""Event was selected."""
def __init__(self, event: Event) -> None:
super().__init__()
self.event = event
class WeekChanged(Message):
"""Week was changed via navigation."""
def __init__(self, week_start: date) -> None:
super().__init__()
self.week_start = week_start
def __init__(
self,
week_start: Optional[date] = None,
include_weekends: bool = True,
name: Optional[str] = None,
id: Optional[str] = None,
classes: Optional[str] = None,
) -> None:
super().__init__(name=name, id=id, classes=classes)
# Initialize state BEFORE setting reactive attributes
self._days: List[DayColumn] = []
self._events_by_date: dict[date, List[Event]] = {}
self._header: Optional[WeekGridHeader] = None
self._body: Optional[WeekGridBody] = None
# Set week start based on config.week_start_day() if not provided
if week_start is None:
today = date.today()
week_start = get_week_start_for_date(today)
self.include_weekends = include_weekends
self.week_start = week_start
@property
def num_days(self) -> int:
return 7 if self.include_weekends else 5
@property
def cursor_row(self) -> int:
"""Get current cursor row."""
if self._body:
return self._body.cursor_row
return 0
@cursor_row.setter
def cursor_row(self, value: int) -> None:
"""Set cursor row."""
if self._body:
self._body.cursor_row = value
@property
def cursor_col(self) -> int:
"""Get current cursor column."""
if self._body:
return self._body.cursor_col
return 0
@cursor_col.setter
def cursor_col(self, value: int) -> None:
"""Set cursor column."""
if self._body:
self._body.cursor_col = value
if self._header:
self._header.update_days([d.day for d in self._days], value)
def compose(self):
"""Compose the widget."""
days = [d.day for d in self._days] if self._days else []
self._header = WeekGridHeader(
days=days,
cursor_col=0,
include_weekends=self.include_weekends,
)
self._body = WeekGridBody(
include_weekends=self.include_weekends,
)
yield self._header
yield self._body
def on_mount(self) -> None:
"""Initialize on mount - set cursor to current day/time."""
self._init_week()
self.goto_now()
def _init_week(self) -> None:
"""Initialize the week's day columns."""
self._days = []
# Always iterate through 7 days from week_start
for i in range(7):
day = self.week_start + timedelta(days=i)
# Skip weekend days (Saturday=5, Sunday=6) when not including weekends
if not self.include_weekends and day.weekday() >= 5:
continue
col = DayColumn(day=day)
if day in self._events_by_date:
col.events = self._events_by_date[day]
col.layout_events()
self._days.append(col)
# Update child widgets
if self._header:
self._header.update_days(
[d.day for d in self._days], self._body.cursor_col if self._body else 0
)
if self._body:
self._body.set_days(self._days)
def set_events(self, events_by_date: dict[date, List[Event]]) -> None:
"""Set the events to display."""
self._events_by_date = events_by_date
self._init_week()
self.refresh()
def goto_now(self) -> None:
"""Set cursor to current day and time, scroll to work day start."""
today = date.today()
now = datetime.now()
rows_per_hour = get_rows_per_hour()
minutes_per_row = config.minutes_per_row()
# Set week to contain today using configurable week start day
week_start_date = get_week_start_for_date(today)
if self.week_start != week_start_date:
self.week_start = week_start_date
# Set cursor column to today (relative to week start)
col = get_day_column_for_date(today, self.week_start)
if not self.include_weekends and col >= 5:
col = 4 # Last weekday if weekend
self.cursor_col = col
# Set cursor row to current time
current_row = (now.hour * rows_per_hour) + (now.minute // minutes_per_row)
self.cursor_row = current_row
# Scroll to show work day start initially
if self._body:
work_start_row = config.work_day_start_hour() * rows_per_hour
# If current time is before work day start, scroll to work day start
# Otherwise scroll to show current time
scroll_target = min(work_start_row, current_row)
self._body.scroll_to(y=scroll_target, animate=False)
def watch_week_start(self, old: date, new: date) -> None:
"""Handle week_start changes."""
self._init_week()
self.post_message(self.WeekChanged(new))
self.refresh()
def watch_include_weekends(self, old: bool, new: bool) -> None:
"""Handle include_weekends changes."""
if self._header:
self._header.set_include_weekends(new)
if self._body:
self._body.set_include_weekends(new)
self._init_week()
self.refresh()
def on_week_grid_body_cursor_moved(self, message: WeekGridBody.CursorMoved) -> None:
"""Handle cursor moves in body - update header."""
if self._header:
self._header.update_days([d.day for d in self._days], message.col)
def get_event_at_cursor(self) -> Optional[Event]:
"""Get the event at the current cursor position."""
if self._body:
return self._body.get_event_at_cursor()
return None
def get_cursor_date(self) -> date:
"""Get the date at the current cursor column."""
if self._days and 0 <= self.cursor_col < len(self._days):
return self._days[self.cursor_col].day
return date.today()
def get_cursor_time(self):
"""Get the time at the current cursor row.
Returns:
A time object for the cursor row position.
"""
from datetime import time as time_type
minutes_per_row = config.minutes_per_row()
total_minutes = self.cursor_row * minutes_per_row
hour = total_minutes // 60
minute = total_minutes % 60
# Clamp to valid range
hour = max(0, min(23, hour))
minute = max(0, min(59, minute))
return time_type(hour, minute)
# Actions
def action_cursor_down(self) -> None:
"""Move cursor down."""
if self._body:
self._body.cursor_row += 1
def action_cursor_up(self) -> None:
"""Move cursor up."""
if self._body:
self._body.cursor_row -= 1
def action_cursor_left(self) -> None:
"""Move cursor left (wraps to previous week)."""
if self._body:
if self._body.cursor_col <= 0:
# Wrap to previous week
self._body.cursor_col = self.num_days - 1
self.action_prev_week()
else:
self._body.cursor_col -= 1
if self._header:
self._header.update_days(
[d.day for d in self._days], self._body.cursor_col
)
def action_cursor_right(self) -> None:
"""Move cursor right (wraps to next week)."""
if self._body:
if self._body.cursor_col >= self.num_days - 1:
# Wrap to next week
self._body.cursor_col = 0
self.action_next_week()
else:
self._body.cursor_col += 1
if self._header:
self._header.update_days(
[d.day for d in self._days], self._body.cursor_col
)
def action_prev_week(self) -> None:
"""Navigate to previous week."""
self.week_start = self.week_start - timedelta(weeks=1)
def action_next_week(self) -> None:
"""Navigate to next week."""
self.week_start = self.week_start + timedelta(weeks=1)
def action_goto_today(self) -> None:
"""Navigate to current week and today's column/time."""
self.goto_now()
def action_select_event(self) -> None:
"""Select the event at cursor."""
event = self.get_event_at_cursor()
if event:
self.post_message(self.EventSelected(event))

View File

@@ -0,0 +1,7 @@
"""Calendar TUI widgets."""
from .WeekGrid import WeekGrid
from .AddEventForm import AddEventForm, EventFormData
from .MonthCalendar import MonthCalendar
__all__ = ["WeekGrid", "AddEventForm", "EventFormData", "MonthCalendar"]

View File

@@ -1,8 +1,32 @@
"""Calendar CLI commands."""
import click
import subprocess
@click.command()
def calendar():
"""Open the calendar (khal interactive)."""
click.echo("Opening calendar...")
subprocess.run(["khal", "interactive"])
@click.option("--interactive", "-i", is_flag=True, help="Use khal interactive mode")
@click.option("--weekdays", "-w", is_flag=True, help="Show only weekdays (Mon-Fri)")
def calendar(interactive: bool, weekdays: bool):
"""Open the calendar TUI.
Displays a week view of calendar events from khal.
Navigation:
j/k - Move up/down (time)
h/l - Move left/right (day)
H/L - Previous/Next week
g - Go to today
w - Toggle weekends
Enter - View event details
q - Quit
"""
if interactive:
# Fallback to khal interactive mode
import subprocess
click.echo("Opening khal interactive...")
subprocess.run(["khal", "interactive"])
else:
from src.calendar import run_app
run_app()

View File

@@ -20,6 +20,7 @@ from src.services.microsoft_graph.calendar import (
)
from src.services.microsoft_graph.mail import (
fetch_mail_async,
fetch_archive_mail_async,
archive_mail_async,
delete_mail_async,
synchronize_maildir_async,
@@ -216,7 +217,9 @@ def create_maildir_structure(base_path):
ensure_directory_exists(os.path.join(base_path, "cur"))
ensure_directory_exists(os.path.join(base_path, "new"))
ensure_directory_exists(os.path.join(base_path, "tmp"))
ensure_directory_exists(os.path.join(base_path, ".Archives"))
ensure_directory_exists(os.path.join(base_path, ".Archive", "cur"))
ensure_directory_exists(os.path.join(base_path, ".Archive", "new"))
ensure_directory_exists(os.path.join(base_path, ".Archive", "tmp"))
ensure_directory_exists(os.path.join(base_path, ".Trash", "cur"))
# Create outbox structure for sending emails
ensure_directory_exists(os.path.join(base_path, "outbox", "new"))
@@ -436,6 +439,7 @@ async def _sync_outlook_data(
with progress:
task_fetch = progress.add_task("[green]Syncing Inbox...", total=0)
task_fetch_archive = progress.add_task("[green]Syncing Archive...", total=0)
task_calendar = progress.add_task("[cyan]Fetching calendar...", total=0)
task_local_calendar = progress.add_task(
"[magenta]Syncing local calendar...", total=0
@@ -515,6 +519,15 @@ async def _sync_outlook_data(
dry_run,
download_attachments,
),
fetch_archive_mail_async(
maildir_path,
attachments_dir,
headers,
progress,
task_fetch_archive,
dry_run,
download_attachments,
),
fetch_calendar_async(
headers,
progress,

View File

@@ -17,15 +17,45 @@ from textual.binding import Binding
from rich.text import Text
from datetime import datetime, timedelta
import asyncio
import json
import os
import sys
import time
from typing import Dict, Any, Optional, List, Callable
from pathlib import Path
from src.utils.shared_config import get_theme_name
# Default sync interval in seconds (5 minutes)
DEFAULT_SYNC_INTERVAL = 300
# Sync tasks config file path
SYNC_TASKS_CONFIG_PATH = os.path.expanduser("~/.config/luk/sync_tasks.json")
def load_sync_tasks_config() -> Dict[str, bool]:
"""Load sync tasks enabled/disabled state from config file.
Returns:
Dict mapping task_id to enabled state (True = enabled, False = disabled)
"""
if os.path.exists(SYNC_TASKS_CONFIG_PATH):
try:
with open(SYNC_TASKS_CONFIG_PATH, "r") as f:
return json.load(f)
except Exception:
pass
# Default: all tasks enabled
return {}
def save_sync_tasks_config(config: Dict[str, bool]) -> None:
"""Save sync tasks enabled/disabled state to config file."""
os.makedirs(os.path.dirname(SYNC_TASKS_CONFIG_PATH), exist_ok=True)
with open(SYNC_TASKS_CONFIG_PATH, "w") as f:
json.dump(config, f, indent=2)
# Futuristic spinner frames
# SPINNER_FRAMES = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]
# Alternative spinners you could use:
@@ -73,7 +103,9 @@ class TaskStatus:
class TaskListItem(ListItem):
"""A list item representing a sync task."""
def __init__(self, task_id: str, task_name: str, *args, **kwargs):
def __init__(
self, task_id: str, task_name: str, enabled: bool = True, *args, **kwargs
):
super().__init__(*args, **kwargs)
self.task_id = task_id
self.task_name = task_name
@@ -81,6 +113,7 @@ class TaskListItem(ListItem):
self.progress = 0
self.total = 100
self.spinner_frame = 0
self.enabled = enabled
def compose(self) -> ComposeResult:
"""Compose the task item layout."""
@@ -88,6 +121,8 @@ class TaskListItem(ListItem):
def _get_status_icon(self) -> str:
"""Get icon based on status."""
if not self.enabled:
return "" # Disabled icon
if self.status == TaskStatus.RUNNING:
return SPINNER_FRAMES[self.spinner_frame % len(SPINNER_FRAMES)]
icons = {
@@ -103,6 +138,8 @@ class TaskListItem(ListItem):
def _get_status_color(self) -> str:
"""Get color based on status."""
if not self.enabled:
return "dim italic"
colors = {
TaskStatus.PENDING: "dim",
TaskStatus.RUNNING: "cyan",
@@ -116,6 +153,13 @@ class TaskListItem(ListItem):
icon = self._get_status_icon()
color = self._get_status_color()
# Disabled tasks show differently
if not self.enabled:
text = Text()
text.append(f"{icon} ", style="dim")
text.append(f"{self.task_name} [Disabled]", style=color)
return text
# Use green checkmark for completed, but white text for readability
if self.status == TaskStatus.RUNNING:
progress_pct = (
@@ -156,6 +200,7 @@ class SyncDashboard(App):
Binding("s", "sync_now", "Sync Now"),
Binding("d", "daemonize", "Daemonize"),
Binding("r", "refresh", "Refresh"),
Binding("t", "toggle_task", "Toggle"),
Binding("+", "increase_interval", "+Interval"),
Binding("-", "decrease_interval", "-Interval"),
Binding("up", "cursor_up", "Up", show=False),
@@ -277,10 +322,16 @@ class SyncDashboard(App):
self._initial_sync_interval = sync_interval
self._notify = notify
self._demo_mode = demo_mode
# Load task enabled/disabled config
self._tasks_config = load_sync_tasks_config()
# Merge provided config with defaults
self._sync_config = {**DEFAULT_SYNC_CONFIG, **(sync_config or {})}
self._sync_config["notify"] = notify
def _is_task_enabled(self, task_id: str) -> bool:
"""Check if a task is enabled (default: True)."""
return self._tasks_config.get(task_id, True)
def compose(self) -> ComposeResult:
"""Compose the dashboard layout."""
yield Header()
@@ -291,15 +342,56 @@ class SyncDashboard(App):
yield Static("Tasks", classes="sidebar-title")
yield ListView(
# Stage 1: Sync local changes to server
TaskListItem("archive", "Archive Mail", id="task-archive"),
TaskListItem("outbox", "Outbox Send", id="task-outbox"),
TaskListItem(
"archive",
"Archive Mail",
enabled=self._is_task_enabled("archive"),
id="task-archive",
),
TaskListItem(
"outbox",
"Outbox Send",
enabled=self._is_task_enabled("outbox"),
id="task-outbox",
),
# Stage 2: Fetch from server
TaskListItem("inbox", "Inbox Sync", id="task-inbox"),
TaskListItem("calendar", "Calendar Sync", id="task-calendar"),
TaskListItem(
"inbox",
"Inbox Sync",
enabled=self._is_task_enabled("inbox"),
id="task-inbox",
),
TaskListItem(
"archive-fetch",
"Archive Sync",
enabled=self._is_task_enabled("archive-fetch"),
id="task-archive-fetch",
),
TaskListItem(
"calendar",
"Calendar Sync",
enabled=self._is_task_enabled("calendar"),
id="task-calendar",
),
# Stage 3: Task management
TaskListItem("godspeed", "Godspeed Sync", id="task-godspeed"),
TaskListItem("dstask", "dstask Sync", id="task-dstask"),
TaskListItem("sweep", "Task Sweep", id="task-sweep"),
TaskListItem(
"godspeed",
"Godspeed Sync",
enabled=self._is_task_enabled("godspeed"),
id="task-godspeed",
),
TaskListItem(
"dstask",
"dstask Sync",
enabled=self._is_task_enabled("dstask"),
id="task-dstask",
),
TaskListItem(
"sweep",
"Task Sweep",
enabled=self._is_task_enabled("sweep"),
id="task-sweep",
),
id="task-list",
)
# Countdown timer at bottom of sidebar
@@ -332,6 +424,9 @@ class SyncDashboard(App):
def on_mount(self) -> None:
"""Initialize the dashboard."""
# Set theme from shared config
self.theme = get_theme_name()
# Store references to task items
task_list = self.query_one("#task-list", ListView)
for item in task_list.children:
@@ -436,6 +531,12 @@ class SyncDashboard(App):
if task_id == self.selected_task:
self._update_main_panel()
def is_task_enabled(self, task_id: str) -> bool:
"""Check if a task is enabled."""
if task_id in self._task_items:
return self._task_items[task_id].enabled
return self._is_task_enabled(task_id)
def update_task(self, task_id: str, progress: int, message: str = "") -> None:
"""Update task progress."""
if task_id in self._task_items:
@@ -495,6 +596,25 @@ class SyncDashboard(App):
task_list = self.query_one("#task-list", ListView)
task_list.action_cursor_down()
def action_toggle_task(self) -> None:
"""Toggle the selected task enabled/disabled state."""
if self.selected_task not in self._task_items:
return
item = self._task_items[self.selected_task]
# Toggle enabled state
item.enabled = not item.enabled
item.update_display()
# Update config and save
self._tasks_config[self.selected_task] = item.enabled
save_sync_tasks_config(self._tasks_config)
# Log the change
state = "enabled" if item.enabled else "disabled"
self._log_to_task(self.selected_task, f"Task {state}")
self._update_main_panel()
def action_sync_now(self) -> None:
"""Trigger an immediate sync."""
if self._sync_callback:
@@ -770,6 +890,10 @@ class SyncProgressTracker:
"""Mark a task as skipped."""
self.dashboard.skip_task(task_id, reason)
def is_task_enabled(self, task_id: str) -> bool:
"""Check if a task is enabled."""
return self.dashboard.is_task_enabled(task_id)
# Global dashboard instance
_dashboard_instance: Optional[SyncDashboard] = None
@@ -823,68 +947,99 @@ async def run_dashboard_sync(
# Stage 1: Sync local changes to server
# Archive mail
tracker.start_task("archive", 100)
tracker.update_task("archive", 50, "Scanning for archived messages...")
await asyncio.sleep(0.3)
tracker.update_task("archive", 100, "Moving 3 messages to archive...")
await asyncio.sleep(0.2)
tracker.complete_task("archive", "3 messages archived")
if tracker.is_task_enabled("archive"):
tracker.start_task("archive", 100)
tracker.update_task("archive", 50, "Scanning for archived messages...")
await asyncio.sleep(0.3)
tracker.update_task("archive", 100, "Moving 3 messages to archive...")
await asyncio.sleep(0.2)
tracker.complete_task("archive", "3 messages archived")
else:
tracker.skip_task("archive", "Disabled")
# Outbox
tracker.start_task("outbox", 100)
tracker.update_task("outbox", 50, "Checking outbox...")
await asyncio.sleep(0.2)
tracker.complete_task("outbox", "No pending emails")
if tracker.is_task_enabled("outbox"):
tracker.start_task("outbox", 100)
tracker.update_task("outbox", 50, "Checking outbox...")
await asyncio.sleep(0.2)
tracker.complete_task("outbox", "No pending emails")
else:
tracker.skip_task("outbox", "Disabled")
# Stage 2: Fetch from server
# Inbox sync - simulate finding new messages
tracker.start_task("inbox", 100)
for i in range(0, 101, 20):
tracker.update_task("inbox", i, f"Fetching emails... {i}%")
await asyncio.sleep(0.3)
if tracker.is_task_enabled("inbox"):
tracker.start_task("inbox", 100)
for i in range(0, 101, 20):
tracker.update_task("inbox", i, f"Fetching emails... {i}%")
await asyncio.sleep(0.3)
new_message_count = random.randint(0, 5)
if new_message_count > 0:
tracker.complete_task("inbox", f"{new_message_count} new emails")
if dashboard._notify:
from src.utils.notifications import notify_new_emails
new_message_count = random.randint(0, 5)
if new_message_count > 0:
tracker.complete_task("inbox", f"{new_message_count} new emails")
if dashboard._notify:
from src.utils.notifications import notify_new_emails
notify_new_emails(new_message_count, "")
notify_new_emails(new_message_count, "")
else:
tracker.complete_task("inbox", "No new emails")
else:
tracker.complete_task("inbox", "No new emails")
tracker.skip_task("inbox", "Disabled")
# Archive fetch
if tracker.is_task_enabled("archive-fetch"):
tracker.start_task("archive-fetch", 100)
for i in range(0, 101, 25):
tracker.update_task("archive-fetch", i, f"Fetching archive... {i}%")
await asyncio.sleep(0.2)
tracker.complete_task("archive-fetch", "Archive synced")
else:
tracker.skip_task("archive-fetch", "Disabled")
# Calendar sync
tracker.start_task("calendar", 100)
for i in range(0, 101, 25):
tracker.update_task("calendar", i, f"Syncing events... {i}%")
await asyncio.sleep(0.3)
tracker.complete_task("calendar", "25 events synced")
if tracker.is_task_enabled("calendar"):
tracker.start_task("calendar", 100)
for i in range(0, 101, 25):
tracker.update_task("calendar", i, f"Syncing events... {i}%")
await asyncio.sleep(0.3)
tracker.complete_task("calendar", "25 events synced")
else:
tracker.skip_task("calendar", "Disabled")
# Stage 3: Task management
# Godspeed sync
tracker.start_task("godspeed", 100)
for i in range(0, 101, 33):
tracker.update_task(
"godspeed", min(i, 100), f"Syncing tasks... {min(i, 100)}%"
)
await asyncio.sleep(0.3)
tracker.complete_task("godspeed", "42 tasks synced")
if tracker.is_task_enabled("godspeed"):
tracker.start_task("godspeed", 100)
for i in range(0, 101, 33):
tracker.update_task(
"godspeed", min(i, 100), f"Syncing tasks... {min(i, 100)}%"
)
await asyncio.sleep(0.3)
tracker.complete_task("godspeed", "42 tasks synced")
else:
tracker.skip_task("godspeed", "Disabled")
# dstask sync
tracker.start_task("dstask", 100)
tracker.update_task("dstask", 30, "Running dstask sync...")
await asyncio.sleep(0.3)
tracker.update_task("dstask", 70, "Pushing changes...")
await asyncio.sleep(0.2)
tracker.complete_task("dstask", "Sync completed")
if tracker.is_task_enabled("dstask"):
tracker.start_task("dstask", 100)
tracker.update_task("dstask", 30, "Running dstask sync...")
await asyncio.sleep(0.3)
tracker.update_task("dstask", 70, "Pushing changes...")
await asyncio.sleep(0.2)
tracker.complete_task("dstask", "Sync completed")
else:
tracker.skip_task("dstask", "Disabled")
# Task sweep
tracker.start_task("sweep")
tracker.update_task("sweep", 50, "Scanning notes directory...")
await asyncio.sleep(0.2)
tracker.skip_task("sweep", "Before 6 PM, skipping daily sweep")
if tracker.is_task_enabled("sweep"):
tracker.start_task("sweep")
tracker.update_task("sweep", 50, "Scanning notes directory...")
await asyncio.sleep(0.2)
tracker.skip_task("sweep", "Before 6 PM, skipping daily sweep")
else:
tracker.skip_task("sweep", "Disabled")
# Schedule next sync
dashboard.schedule_next_sync()
@@ -902,6 +1057,7 @@ async def run_dashboard_sync(
synchronize_maildir_async,
process_outbox_async,
fetch_mail_async,
fetch_archive_mail_async,
)
from src.services.microsoft_graph.calendar import (
fetch_calendar_events,
@@ -955,32 +1111,46 @@ async def run_dashboard_sync(
# ===== STAGE 1: Sync local changes to server =====
# Archive mail
tracker.start_task("archive", 100)
tracker.update_task("archive", 10, "Checking for archived messages...")
try:
archive_progress = DashboardProgressAdapter(tracker, "archive")
await archive_mail_async(
maildir_path, headers, archive_progress, None, dry_run
)
tracker.complete_task("archive", "Archive sync complete")
except Exception as e:
tracker.error_task("archive", str(e))
if tracker.is_task_enabled("archive"):
tracker.start_task("archive", 100)
tracker.update_task("archive", 10, "Checking for archived messages...")
try:
archive_progress = DashboardProgressAdapter(tracker, "archive")
await archive_mail_async(
maildir_path,
headers,
archive_progress,
None,
dry_run,
is_cancelled=lambda: not tracker.is_task_enabled("archive"),
)
if tracker.is_task_enabled("archive"):
tracker.complete_task("archive", "Archive sync complete")
else:
tracker.skip_task("archive", "Cancelled")
except Exception as e:
tracker.error_task("archive", str(e))
else:
tracker.skip_task("archive", "Disabled")
# Process outbox (send pending emails)
tracker.start_task("outbox", 100)
tracker.update_task("outbox", 10, "Checking outbox...")
try:
outbox_progress = DashboardProgressAdapter(tracker, "outbox")
result = await process_outbox_async(
base_maildir_path, org, headers, outbox_progress, None, dry_run
)
sent_count, failed_count = result if result else (0, 0)
if sent_count > 0:
tracker.complete_task("outbox", f"{sent_count} emails sent")
else:
tracker.complete_task("outbox", "No pending emails")
except Exception as e:
tracker.error_task("outbox", str(e))
if tracker.is_task_enabled("outbox"):
tracker.start_task("outbox", 100)
tracker.update_task("outbox", 10, "Checking outbox...")
try:
outbox_progress = DashboardProgressAdapter(tracker, "outbox")
result = await process_outbox_async(
base_maildir_path, org, headers, outbox_progress, None, dry_run
)
sent_count, failed_count = result if result else (0, 0)
if sent_count > 0:
tracker.complete_task("outbox", f"{sent_count} emails sent")
else:
tracker.complete_task("outbox", "No pending emails")
except Exception as e:
tracker.error_task("outbox", str(e))
else:
tracker.skip_task("outbox", "Disabled")
# ===== STAGE 2: Fetch from server =====
@@ -994,160 +1164,216 @@ async def run_dashboard_sync(
messages_before += len([f for f in os.listdir(cur_dir) if ".eml" in f])
# Inbox sync
tracker.start_task("inbox", 100)
tracker.update_task("inbox", 10, "Fetching emails from server...")
try:
inbox_progress = DashboardProgressAdapter(tracker, "inbox")
await fetch_mail_async(
maildir_path,
attachments_dir,
headers,
inbox_progress,
None,
dry_run,
download_attachments,
)
tracker.update_task("inbox", 80, "Processing messages...")
# Count new messages
messages_after = 0
if os.path.exists(new_dir):
messages_after += len(
[f for f in os.listdir(new_dir) if ".eml" in f]
)
if os.path.exists(cur_dir):
messages_after += len(
[f for f in os.listdir(cur_dir) if ".eml" in f]
if tracker.is_task_enabled("inbox"):
tracker.start_task("inbox", 100)
tracker.update_task("inbox", 10, "Fetching emails from server...")
try:
inbox_progress = DashboardProgressAdapter(tracker, "inbox")
await fetch_mail_async(
maildir_path,
attachments_dir,
headers,
inbox_progress,
None,
dry_run,
download_attachments,
is_cancelled=lambda: not tracker.is_task_enabled("inbox"),
)
new_message_count = messages_after - messages_before
# Check if cancelled before completing
if not tracker.is_task_enabled("inbox"):
tracker.skip_task("inbox", "Cancelled")
else:
tracker.update_task("inbox", 80, "Processing messages...")
if new_message_count > 0:
tracker.complete_task("inbox", f"{new_message_count} new emails")
if dashboard._notify and not dry_run:
notify_new_emails(new_message_count, org)
else:
tracker.complete_task("inbox", "No new emails")
except Exception as e:
tracker.error_task("inbox", str(e))
# Count new messages
messages_after = 0
if os.path.exists(new_dir):
messages_after += len(
[f for f in os.listdir(new_dir) if ".eml" in f]
)
if os.path.exists(cur_dir):
messages_after += len(
[f for f in os.listdir(cur_dir) if ".eml" in f]
)
new_message_count = messages_after - messages_before
if new_message_count > 0:
tracker.complete_task(
"inbox", f"{new_message_count} new emails"
)
if dashboard._notify and not dry_run:
notify_new_emails(new_message_count, org)
else:
tracker.complete_task("inbox", "No new emails")
except Exception as e:
tracker.error_task("inbox", str(e))
else:
tracker.skip_task("inbox", "Disabled")
# Archive sync (fetch archived messages from server)
if tracker.is_task_enabled("archive-fetch"):
tracker.start_task("archive-fetch", 100)
tracker.update_task("archive-fetch", 10, "Fetching archived emails...")
try:
archive_progress = DashboardProgressAdapter(
tracker, "archive-fetch"
)
await fetch_archive_mail_async(
maildir_path,
attachments_dir,
headers,
archive_progress,
None,
dry_run,
download_attachments,
is_cancelled=lambda: not tracker.is_task_enabled(
"archive-fetch"
),
)
if tracker.is_task_enabled("archive-fetch"):
tracker.complete_task("archive-fetch", "Archive synced")
else:
tracker.skip_task("archive-fetch", "Cancelled")
except Exception as e:
tracker.error_task("archive-fetch", str(e))
else:
tracker.skip_task("archive-fetch", "Disabled")
# Calendar sync
tracker.start_task("calendar", 100)
tracker.update_task("calendar", 10, "Fetching calendar events...")
try:
events, total_events = await fetch_calendar_events(
headers=headers, days_back=days_back, days_forward=days_forward
)
tracker.update_task(
"calendar", 50, f"Processing {len(events)} events..."
)
if tracker.is_task_enabled("calendar"):
tracker.start_task("calendar", 100)
tracker.update_task("calendar", 10, "Fetching calendar events...")
try:
events, total_events = await fetch_calendar_events(
headers=headers, days_back=days_back, days_forward=days_forward
)
tracker.update_task(
"calendar", 50, f"Processing {len(events)} events..."
)
if not dry_run:
calendar_progress = DashboardProgressAdapter(tracker, "calendar")
org_vdir_path = os.path.join(vdir, org) if vdir else None
if vdir and org_vdir_path:
save_events_to_vdir(
events, org_vdir_path, calendar_progress, None, dry_run
)
elif icsfile:
save_events_to_file(
events,
f"{icsfile}/events_latest.ics",
calendar_progress,
None,
dry_run,
if not dry_run:
calendar_progress = DashboardProgressAdapter(
tracker, "calendar"
)
org_vdir_path = os.path.join(vdir, org) if vdir else None
if vdir and org_vdir_path:
save_events_to_vdir(
events, org_vdir_path, calendar_progress, None, dry_run
)
elif icsfile:
save_events_to_file(
events,
f"{icsfile}/events_latest.ics",
calendar_progress,
None,
dry_run,
)
tracker.complete_task("calendar", f"{len(events)} events synced")
except Exception as e:
tracker.error_task("calendar", str(e))
tracker.complete_task("calendar", f"{len(events)} events synced")
except Exception as e:
tracker.error_task("calendar", str(e))
else:
tracker.skip_task("calendar", "Disabled")
# ===== STAGE 3: Godspeed operations =====
# Godspeed sync (runs every 15 minutes)
tracker.start_task("godspeed", 100)
if should_run_godspeed_sync():
tracker.update_task("godspeed", 10, "Syncing with Godspeed...")
try:
email, password, token = get_godspeed_credentials()
if token or (email and password):
from src.services.godspeed.client import GodspeedClient
from src.services.godspeed.sync import GodspeedSync
if tracker.is_task_enabled("godspeed"):
tracker.start_task("godspeed", 100)
if should_run_godspeed_sync():
tracker.update_task("godspeed", 10, "Syncing with Godspeed...")
try:
email, password, token = get_godspeed_credentials()
if token or (email and password):
from src.services.godspeed.client import GodspeedClient
from src.services.godspeed.sync import GodspeedSync
sync_dir = get_godspeed_sync_directory()
client = GodspeedClient(
email=email, password=password, token=token
)
sync_engine = GodspeedSync(client, sync_dir)
sync_engine.sync_bidirectional()
sync_dir = get_godspeed_sync_directory()
client = GodspeedClient(
email=email, password=password, token=token
)
sync_engine = GodspeedSync(client, sync_dir)
sync_engine.sync_bidirectional()
state = load_sync_state()
state["last_godspeed_sync"] = time.time()
save_sync_state(state)
state = load_sync_state()
state["last_godspeed_sync"] = time.time()
save_sync_state(state)
tracker.complete_task("godspeed", "Sync completed")
else:
tracker.skip_task("godspeed", "No credentials configured")
except Exception as e:
tracker.error_task("godspeed", str(e))
tracker.complete_task("godspeed", "Sync completed")
else:
tracker.skip_task("godspeed", "No credentials configured")
except Exception as e:
tracker.error_task("godspeed", str(e))
else:
tracker.skip_task("godspeed", "Not due yet (every 15 min)")
else:
tracker.skip_task("godspeed", "Not due yet (every 15 min)")
tracker.skip_task("godspeed", "Disabled")
# dstask sync
tracker.start_task("dstask", 100)
try:
from src.services.dstask.client import DstaskClient
if tracker.is_task_enabled("dstask"):
tracker.start_task("dstask", 100)
try:
from src.services.dstask.client import DstaskClient
dstask_client = DstaskClient()
if dstask_client.is_available():
tracker.update_task("dstask", 30, "Running dstask sync...")
success = dstask_client.sync()
if success:
tracker.complete_task("dstask", "Sync completed")
dstask_client = DstaskClient()
if dstask_client.is_available():
tracker.update_task("dstask", 30, "Running dstask sync...")
success = dstask_client.sync()
if success:
tracker.complete_task("dstask", "Sync completed")
else:
tracker.error_task("dstask", "Sync failed")
else:
tracker.error_task("dstask", "Sync failed")
else:
tracker.skip_task("dstask", "dstask not installed")
except Exception as e:
tracker.error_task("dstask", str(e))
tracker.skip_task("dstask", "dstask not installed")
except Exception as e:
tracker.error_task("dstask", str(e))
else:
tracker.skip_task("dstask", "Disabled")
# Task sweep (runs once daily after 6 PM)
tracker.start_task("sweep", 100)
if should_run_sweep():
tracker.update_task("sweep", 10, "Sweeping tasks from notes...")
try:
from src.cli.godspeed import TaskSweeper
if tracker.is_task_enabled("sweep"):
tracker.start_task("sweep", 100)
if should_run_sweep():
tracker.update_task("sweep", 10, "Sweeping tasks from notes...")
try:
from src.cli.godspeed import TaskSweeper
from datetime import datetime
notes_dir_env = os.getenv("NOTES_DIR")
if notes_dir_env and Path(notes_dir_env).exists():
godspeed_dir = get_godspeed_sync_directory()
sweeper = TaskSweeper(
Path(notes_dir_env), godspeed_dir, dry_run=dry_run
)
result = sweeper.sweep_tasks()
state = load_sync_state()
state["last_sweep_date"] = datetime.now().strftime(
"%Y-%m-%d"
)
save_sync_state(state)
swept = result.get("swept_tasks", 0)
if swept > 0:
tracker.complete_task("sweep", f"{swept} tasks swept")
else:
tracker.complete_task("sweep", "No tasks to sweep")
else:
tracker.skip_task("sweep", "$NOTES_DIR not configured")
except Exception as e:
tracker.error_task("sweep", str(e))
else:
from datetime import datetime
notes_dir_env = os.getenv("NOTES_DIR")
if notes_dir_env and Path(notes_dir_env).exists():
godspeed_dir = get_godspeed_sync_directory()
sweeper = TaskSweeper(
Path(notes_dir_env), godspeed_dir, dry_run=dry_run
)
result = sweeper.sweep_tasks()
state = load_sync_state()
state["last_sweep_date"] = datetime.now().strftime("%Y-%m-%d")
save_sync_state(state)
swept = result.get("swept_tasks", 0)
if swept > 0:
tracker.complete_task("sweep", f"{swept} tasks swept")
else:
tracker.complete_task("sweep", "No tasks to sweep")
current_hour = datetime.now().hour
if current_hour < 18:
tracker.skip_task("sweep", "Before 6 PM")
else:
tracker.skip_task("sweep", "$NOTES_DIR not configured")
except Exception as e:
tracker.error_task("sweep", str(e))
tracker.skip_task("sweep", "Already completed today")
else:
from datetime import datetime
current_hour = datetime.now().hour
if current_hour < 18:
tracker.skip_task("sweep", "Before 6 PM")
else:
tracker.skip_task("sweep", "Already completed today")
tracker.skip_task("sweep", "Disabled")
# Schedule next sync
dashboard.schedule_next_sync()

View File

@@ -9,6 +9,7 @@ from .actions.open import action_open
from .actions.delete import delete_current
from src.services.taskwarrior import client as taskwarrior_client
from src.services.himalaya import client as himalaya_client
from src.utils.shared_config import get_theme_name
from textual.containers import Container, ScrollableContainer, Vertical, Horizontal
from textual.timer import Timer
from textual.binding import Binding
@@ -147,7 +148,7 @@ class EmailViewerApp(App):
async def on_mount(self) -> None:
self.alert_timer: Timer | None = None # Timer to throttle alerts
self.theme = "monokai"
self.theme = get_theme_name()
self.title = "MaildirGTD"
self.query_one("#main_content").border_title = self.status_title
sort_indicator = "" if self.sort_order_ascending else ""

View File

@@ -0,0 +1,5 @@
"""Khal service package for calendar operations."""
from .client import KhalClient
__all__ = ["KhalClient"]

332
src/services/khal/client.py Normal file
View File

@@ -0,0 +1,332 @@
"""Khal CLI client for calendar operations.
This module provides a client that uses the khal CLI tool to interact with
calendar data stored in vdir format.
"""
import subprocess
import logging
from datetime import datetime, date, timedelta
from typing import Optional, List
from src.calendar.backend import CalendarBackend, Event
logger = logging.getLogger(__name__)
class KhalClient(CalendarBackend):
"""Calendar backend using khal CLI."""
def __init__(self, config_path: Optional[str] = None):
"""Initialize the Khal client.
Args:
config_path: Optional path to khal config file
"""
self.config_path = config_path
def _run_khal(
self, args: List[str], capture_output: bool = True
) -> subprocess.CompletedProcess:
"""Run a khal command.
Args:
args: Command arguments (after 'khal')
capture_output: Whether to capture stdout/stderr
Returns:
CompletedProcess result
"""
cmd = ["khal"] + args
if self.config_path:
cmd.extend(["-c", self.config_path])
logger.debug(f"Running khal command: {' '.join(cmd)}")
return subprocess.run(
cmd,
capture_output=capture_output,
text=True,
)
def _parse_event_line(
self, line: str, day_header_date: Optional[date] = None
) -> Optional[Event]:
"""Parse a single event line from khal list output.
Expected format: title|start-time|end-time|start|end|location|uid|description|organizer|url|categories|status|recurring
Args:
line: The line to parse
day_header_date: Current day being parsed (from day headers)
Returns:
Event if successfully parsed, None otherwise
"""
# Skip empty lines and day headers
if not line or "|" not in line:
return None
parts = line.split("|")
if len(parts) < 5:
return None
try:
title = parts[0].strip()
start_str = parts[3].strip() # Full datetime
end_str = parts[4].strip() # Full datetime
location = parts[5].strip() if len(parts) > 5 else ""
uid = parts[6].strip() if len(parts) > 6 else ""
description = parts[7].strip() if len(parts) > 7 else ""
organizer = parts[8].strip() if len(parts) > 8 else ""
url = parts[9].strip() if len(parts) > 9 else ""
categories = parts[10].strip() if len(parts) > 10 else ""
status = parts[11].strip() if len(parts) > 11 else ""
recurring_symbol = parts[12].strip() if len(parts) > 12 else ""
# Parse datetimes (format: YYYY-MM-DD HH:MM)
start = datetime.strptime(start_str, "%Y-%m-%d %H:%M")
end = datetime.strptime(end_str, "%Y-%m-%d %H:%M")
# Check for all-day events (typically start at 00:00 and end at 00:00 next day)
all_day = (
start.hour == 0
and start.minute == 0
and end.hour == 0
and end.minute == 0
and (end.date() - start.date()).days >= 1
)
# Check if event is recurring (repeat symbol is typically a loop arrow)
recurring = bool(recurring_symbol)
return Event(
uid=uid or f"{title}_{start_str}",
title=title,
start=start,
end=end,
location=location,
description=description,
organizer=organizer,
url=url,
categories=categories,
status=status,
all_day=all_day,
recurring=recurring,
)
except (ValueError, IndexError) as e:
logger.warning(f"Failed to parse event line '{line}': {e}")
return None
def get_events(
self,
start_date: date,
end_date: date,
calendar: Optional[str] = None,
) -> List[Event]:
"""Get events in a date range.
Args:
start_date: Start of range (inclusive)
end_date: End of range (inclusive)
calendar: Optional calendar name to filter by
Returns:
List of events in the range, sorted by start time
"""
# Format dates for khal
start_str = start_date.strftime("%Y-%m-%d")
# Add one day to end_date to make it inclusive
end_dt = end_date + timedelta(days=1)
end_str = end_dt.strftime("%Y-%m-%d")
# Build command
# Format: title|start-time|end-time|start|end|location|uid|description|organizer|url|categories|status|recurring
format_str = "{title}|{start-time}|{end-time}|{start}|{end}|{location}|{uid}|{description}|{organizer}|{url}|{categories}|{status}|{repeat-symbol}"
args = ["list", "-f", format_str, start_str, end_str]
if calendar:
args.extend(["-a", calendar])
result = self._run_khal(args)
if result.returncode != 0:
logger.error(f"khal list failed: {result.stderr}")
return []
events = []
current_day: Optional[date] = None
for line in result.stdout.strip().split("\n"):
line = line.strip()
if not line:
continue
# Check for day headers (e.g., "Today, 2025-12-18" or "Monday, 2025-12-22")
if ", " in line and "|" not in line:
try:
# Extract date from header
date_part = line.split(", ")[-1]
current_day = datetime.strptime(date_part, "%Y-%m-%d").date()
except ValueError:
pass
continue
event = self._parse_event_line(line, current_day)
if event:
events.append(event)
# Sort by start time
events.sort(key=lambda e: e.start)
return events
def get_event(self, uid: str) -> Optional[Event]:
"""Get a single event by UID.
Args:
uid: Event unique identifier
Returns:
Event if found, None otherwise
"""
# khal doesn't have a direct "get by uid" command
# We search for it instead
result = self._run_khal(["search", uid])
if result.returncode != 0 or not result.stdout.strip():
return None
# Parse the first result
# Search output format is different, so we need to handle it
lines = result.stdout.strip().split("\n")
if lines:
# For now, return None - would need more parsing
# This is a limitation of khal's CLI
return None
return None
def get_calendars(self) -> List[str]:
"""Get list of available calendar names.
Returns:
List of calendar names
"""
result = self._run_khal(["printcalendars"])
if result.returncode != 0:
logger.error(f"khal printcalendars failed: {result.stderr}")
return []
calendars = []
for line in result.stdout.strip().split("\n"):
line = line.strip()
if line:
calendars.append(line)
return calendars
def create_event(
self,
title: str,
start: datetime,
end: datetime,
calendar: Optional[str] = None,
location: Optional[str] = None,
description: Optional[str] = None,
all_day: bool = False,
) -> Event:
"""Create a new event.
Args:
title: Event title
start: Start datetime
end: End datetime
calendar: Calendar to add event to
location: Event location
description: Event description
all_day: Whether this is an all-day event
Returns:
The created event
"""
# Build khal new command
# Format: khal new [-a calendar] start end title [:: description] [-l location]
if all_day:
start_str = start.strftime("%Y-%m-%d")
end_str = end.strftime("%Y-%m-%d")
else:
start_str = start.strftime("%Y-%m-%d %H:%M")
end_str = end.strftime("%H:%M") # End time only if same day
if end.date() != start.date():
end_str = end.strftime("%Y-%m-%d %H:%M")
args = ["new"]
if calendar:
args.extend(["-a", calendar])
if location:
args.extend(["-l", location])
args.extend([start_str, end_str, title])
if description:
args.extend(["::", description])
result = self._run_khal(args)
if result.returncode != 0:
raise RuntimeError(f"Failed to create event: {result.stderr}")
# Return a constructed event (khal doesn't return the created event)
return Event(
uid=f"new_{title}_{start.isoformat()}",
title=title,
start=start,
end=end,
location=location or "",
description=description or "",
calendar=calendar or "",
all_day=all_day,
)
def delete_event(self, uid: str) -> bool:
"""Delete an event.
Args:
uid: Event unique identifier
Returns:
True if deleted successfully
"""
# khal edit with --delete flag
# This is tricky because khal edit is interactive
# We might need to use khal's Python API directly for this
logger.warning("delete_event not fully implemented for khal CLI")
return False
def update_event(
self,
uid: str,
title: Optional[str] = None,
start: Optional[datetime] = None,
end: Optional[datetime] = None,
location: Optional[str] = None,
description: Optional[str] = None,
) -> Optional[Event]:
"""Update an existing event.
Args:
uid: Event unique identifier
title: New title (if provided)
start: New start time (if provided)
end: New end time (if provided)
location: New location (if provided)
description: New description (if provided)
Returns:
Updated event if successful, None otherwise
"""
# khal edit is interactive, so this is limited via CLI
logger.warning("update_event not fully implemented for khal CLI")
return None

View File

@@ -5,10 +5,11 @@ Mail operations for Microsoft Graph API.
import os
import re
import glob
import json
import asyncio
from email.parser import Parser
from email.utils import getaddresses
from typing import List, Dict, Any
from typing import List, Dict, Any, Set
from .client import (
fetch_with_aiohttp,
@@ -27,6 +28,7 @@ async def fetch_mail_async(
task_id,
dry_run=False,
download_attachments=False,
is_cancelled=None,
):
"""
Fetch mail from Microsoft Graph API and save to Maildir.
@@ -39,6 +41,7 @@ async def fetch_mail_async(
task_id: ID of the task in the progress bar.
dry_run (bool): If True, don't actually make changes.
download_attachments (bool): If True, download email attachments.
is_cancelled (callable, optional): Callback that returns True if task should stop.
Returns:
None
@@ -105,8 +108,14 @@ async def fetch_mail_async(
# Update progress to reflect only the messages we actually need to download
progress.update(task_id, total=len(messages_to_download), completed=0)
downloaded_count = 0
for message in messages_to_download:
# Check if task was cancelled/disabled
if is_cancelled and is_cancelled():
progress.console.print("Task cancelled, stopping inbox fetch")
break
progress.console.print(
f"Processing message: {message.get('subject', 'No Subject')}", end="\r"
)
@@ -120,44 +129,92 @@ async def fetch_mail_async(
download_attachments,
)
progress.update(task_id, advance=1)
progress.update(task_id, completed=len(messages_to_download))
progress.console.print(
f"\nFinished downloading {len(messages_to_download)} new messages."
)
downloaded_count += 1
progress.update(task_id, completed=downloaded_count)
progress.console.print(f"\nFinished downloading {downloaded_count} new messages.")
progress.console.print(
f"Total messages on server: {len(messages)}, Already local: {len(local_msg_ids)}"
)
async def archive_mail_async(maildir_path, headers, progress, task_id, dry_run=False):
def _get_archive_sync_state_path(maildir_path: str) -> str:
"""Get the path to the archive sync state file."""
return os.path.join(maildir_path, ".Archive", ".sync_state.json")
def _load_archive_sync_state(maildir_path: str) -> Set[str]:
"""Load the set of message IDs that have been synced to server."""
state_path = _get_archive_sync_state_path(maildir_path)
if os.path.exists(state_path):
try:
with open(state_path, "r") as f:
data = json.load(f)
return set(data.get("synced_to_server", []))
except Exception:
pass
return set()
def _save_archive_sync_state(maildir_path: str, synced_ids: Set[str]) -> None:
"""Save the set of message IDs that have been synced to server."""
state_path = _get_archive_sync_state_path(maildir_path)
os.makedirs(os.path.dirname(state_path), exist_ok=True)
with open(state_path, "w") as f:
json.dump({"synced_to_server": list(synced_ids)}, f, indent=2)
async def archive_mail_async(
maildir_path, headers, progress, task_id, dry_run=False, is_cancelled=None
):
"""
Archive mail from Maildir to Microsoft Graph API archive folder using batch operations.
Messages are moved to the server's Archive folder, but local copies are kept.
A sync state file tracks which messages have already been synced to avoid
re-processing them on subsequent runs.
Args:
maildir_path (str): Path to the Maildir.
headers (dict): Headers including authentication.
progress: Progress instance for updating progress bars.
task_id: ID of the task in the progress bar.
dry_run (bool): If True, don't actually make changes.
is_cancelled (callable, optional): Callback that returns True if task should stop.
Returns:
None
"""
# Check both possible archive folder names locally
# Load already-synced message IDs
synced_ids = _load_archive_sync_state(maildir_path)
# Check both possible archive folder names locally (prefer .Archive)
archive_files = []
for archive_folder_name in [".Archives", ".Archive"]:
for archive_folder_name in [".Archive", ".Archives"]:
archive_dir = os.path.join(maildir_path, archive_folder_name)
if os.path.exists(archive_dir):
archive_files.extend(
glob.glob(os.path.join(archive_dir, "**", "*.eml*"), recursive=True)
)
if not archive_files:
# Filter out already-synced messages
files_to_sync = []
for filepath in archive_files:
message_id = os.path.basename(filepath).split(".")[0]
if message_id not in synced_ids:
files_to_sync.append(filepath)
if not files_to_sync:
progress.update(task_id, total=0, completed=0)
progress.console.print("No messages to archive")
progress.console.print(
f"No new messages to archive ({len(archive_files)} already synced)"
)
return
progress.update(task_id, total=len(archive_files))
progress.update(task_id, total=len(files_to_sync))
progress.console.print(
f"Found {len(files_to_sync)} new messages to sync to server Archive"
)
# Get archive folder ID from server
folder_response = await fetch_with_aiohttp(
@@ -179,9 +236,15 @@ async def archive_mail_async(maildir_path, headers, progress, task_id, dry_run=F
# Process files in batches of 20 (Microsoft Graph batch limit)
batch_size = 20
successful_moves = []
newly_synced_ids: Set[str] = set()
for i in range(0, len(archive_files), batch_size):
batch_files = archive_files[i : i + batch_size]
for i in range(0, len(files_to_sync), batch_size):
# Check if task was cancelled/disabled
if is_cancelled and is_cancelled():
progress.console.print("Task cancelled, stopping archive sync")
break
batch_files = files_to_sync[i : i + batch_size]
# Add small delay between batches to respect API limits
if i > 0:
@@ -216,23 +279,22 @@ async def archive_mail_async(maildir_path, headers, progress, task_id, dry_run=F
status = response["status"]
if status == 201: # 201 Created indicates successful move
os.remove(
filepath
) # Remove the local file since it's now archived on server
# Keep local file, just mark as synced
newly_synced_ids.add(message_id)
successful_moves.append(message_id)
progress.console.print(
f"Moved message to 'Archive': {message_id}"
f"Moved message to server Archive: {message_id}"
)
elif status == 404:
os.remove(
filepath
) # Remove the file from local archive if not found on server
# Message not in Inbox (maybe already archived or deleted on server)
# Mark as synced so we don't retry, but keep local copy
newly_synced_ids.add(message_id)
progress.console.print(
f"Message not found on server, removed local copy: {message_id}"
f"Message not in Inbox (already archived?): {message_id}"
)
else:
progress.console.print(
f"Failed to move message to 'Archive': {message_id}, status: {status}"
f"Failed to move message to Archive: {message_id}, status: {status}"
)
except Exception as e:
@@ -247,19 +309,19 @@ async def archive_mail_async(maildir_path, headers, progress, task_id, dry_run=F
{"destinationId": archive_folder_id},
)
if status == 201:
os.remove(filepath)
newly_synced_ids.add(message_id)
successful_moves.append(message_id)
progress.console.print(
f"Moved message to 'Archive' (fallback): {message_id}"
f"Moved message to server Archive (fallback): {message_id}"
)
elif status == 404:
os.remove(filepath)
newly_synced_ids.add(message_id)
progress.console.print(
f"Message not found on server, removed local copy: {message_id}"
f"Message not in Inbox (already archived?): {message_id}"
)
else:
progress.console.print(
f"Failed to move message to 'Archive': {message_id}, status: {status}"
f"Failed to move message to Archive: {message_id}, status: {status}"
)
except Exception as individual_error:
progress.console.print(
@@ -270,18 +332,184 @@ async def archive_mail_async(maildir_path, headers, progress, task_id, dry_run=F
for filepath in batch_files:
message_id = os.path.basename(filepath).split(".")[0]
progress.console.print(
f"[DRY-RUN] Would move message to 'Archive' folder: {message_id}"
f"[DRY-RUN] Would move message to server Archive: {message_id}"
)
progress.advance(task_id, len(batch_files))
if not dry_run:
# Save sync state after each batch for resilience
if not dry_run and newly_synced_ids:
synced_ids.update(newly_synced_ids)
_save_archive_sync_state(maildir_path, synced_ids)
# Final summary
if not dry_run and successful_moves:
progress.console.print(
f"Successfully archived {len(successful_moves)} messages in batches"
f"Successfully synced {len(successful_moves)} messages to server Archive (kept local copies)"
)
return
async def fetch_archive_mail_async(
maildir_path,
attachments_dir,
headers,
progress,
task_id,
dry_run=False,
download_attachments=False,
max_messages=None,
is_cancelled=None,
):
"""
Fetch archived mail from Microsoft Graph API Archive folder and save to local .Archive Maildir.
Args:
maildir_path (str): Path to the Maildir.
attachments_dir (str): Path to save attachments.
headers (dict): Headers including authentication.
progress: Progress instance for updating progress bars.
task_id: ID of the task in the progress bar.
dry_run (bool): If True, don't actually make changes.
download_attachments (bool): If True, download email attachments.
max_messages (int, optional): Maximum number of messages to fetch. None = all.
is_cancelled (callable, optional): Callback that returns True if task should stop.
Returns:
None
"""
from src.utils.mail_utils.maildir import save_mime_to_maildir_async
# Use the well-known 'archive' folder name
mail_url = "https://graph.microsoft.com/v1.0/me/mailFolders/archive/messages?$top=100&$orderby=receivedDateTime desc&$select=id,subject,from,toRecipients,ccRecipients,receivedDateTime,isRead"
messages = []
# Fetch the total count of messages in the archive
archive_info_url = "https://graph.microsoft.com/v1.0/me/mailFolders/archive"
try:
response = await fetch_with_aiohttp(archive_info_url, headers)
total_messages = response.get("totalItemCount", 0) if response else 0
except Exception as e:
progress.console.print(f"Error fetching archive folder info: {e}")
total_messages = 0
# Apply max_messages limit if specified
effective_total = (
min(total_messages, max_messages) if max_messages else total_messages
)
progress.update(task_id, total=effective_total)
progress.console.print(
f"Archive folder has {total_messages} messages"
+ (f", fetching up to {max_messages}" if max_messages else "")
)
# Fetch messages from archive
fetched_count = 0
while mail_url:
try:
response_data = await fetch_with_aiohttp(mail_url, headers)
except Exception as e:
progress.console.print(f"Error fetching archive messages: {e}")
break
batch = response_data.get("value", []) if response_data else []
# Apply max_messages limit
if max_messages and fetched_count + len(batch) > max_messages:
batch = batch[: max_messages - fetched_count]
messages.extend(batch)
fetched_count += len(batch)
break
messages.extend(batch)
fetched_count += len(batch)
progress.advance(task_id, len(batch))
# Get the next page URL from @odata.nextLink
mail_url = response_data.get("@odata.nextLink") if response_data else None
# Set up local archive directory paths
archive_dir = os.path.join(maildir_path, ".Archive")
cur_dir = os.path.join(archive_dir, "cur")
new_dir = os.path.join(archive_dir, "new")
# Ensure directories exist
os.makedirs(cur_dir, exist_ok=True)
os.makedirs(new_dir, exist_ok=True)
os.makedirs(os.path.join(archive_dir, "tmp"), exist_ok=True)
# Get local message IDs in archive
cur_files = set(glob.glob(os.path.join(cur_dir, "*.eml*")))
new_files = set(glob.glob(os.path.join(new_dir, "*.eml*")))
local_msg_ids = set()
for filename in set.union(cur_files, new_files):
message_id = os.path.basename(filename).split(".")[0]
local_msg_ids.add(message_id)
# Filter messages to only include those not already local
messages_to_download = [msg for msg in messages if msg["id"] not in local_msg_ids]
progress.console.print(
f"Found {len(messages)} messages on server Archive, {len(local_msg_ids)} already local"
)
progress.console.print(
f"Downloading {len(messages_to_download)} new archived messages"
)
# Update progress to reflect only the messages we actually need to download
progress.update(task_id, total=len(messages_to_download), completed=0)
# Load sync state once, we'll update it incrementally
synced_ids = _load_archive_sync_state(maildir_path) if not dry_run else set()
downloaded_count = 0
for message in messages_to_download:
# Check if task was cancelled/disabled
if is_cancelled and is_cancelled():
progress.console.print("Task cancelled, stopping archive fetch")
break
progress.console.print(
f"Processing archived message: {message.get('subject', 'No Subject')[:50]}",
end="\r",
)
# Save to .Archive folder instead of main maildir
await save_mime_to_maildir_async(
archive_dir, # Use archive_dir instead of maildir_path
message,
attachments_dir,
headers,
progress,
dry_run,
download_attachments,
)
progress.update(task_id, advance=1)
downloaded_count += 1
# Update sync state after each message for resilience
# This ensures we don't try to re-upload this message in archive_mail_async
if not dry_run:
synced_ids.add(message["id"])
_save_archive_sync_state(maildir_path, synced_ids)
progress.update(task_id, completed=downloaded_count)
progress.console.print(
f"\nFinished downloading {downloaded_count} archived messages."
)
progress.console.print(
f"Total in server Archive: {total_messages}, Already local: {len(local_msg_ids)}"
)
# Also add any messages we already had locally (from the full server list)
# to ensure they're marked as synced
if not dry_run and messages:
for msg in messages:
synced_ids.add(msg["id"])
_save_archive_sync_state(maildir_path, synced_ids)
async def delete_mail_async(maildir_path, headers, progress, task_id, dry_run=False):
"""
Delete mail from Maildir and Microsoft Graph API using batch operations.

View File

@@ -10,12 +10,14 @@ from typing import Optional
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import ScrollableContainer
from textual.containers import ScrollableContainer, Vertical, Horizontal
from textual.logging import TextualHandler
from textual.widgets import DataTable, Footer, Header, Static, Markdown
from .config import get_config, TasksAppConfig
from .backend import Task, TaskBackend, TaskPriority, TaskStatus, Project
from .widgets.FilterSidebar import FilterSidebar
from src.utils.shared_config import get_theme_name
# Add the parent directory to the system path to resolve relative imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@@ -45,69 +47,119 @@ class TasksApp(App):
CSS = """
Screen {
layout: grid;
grid-size: 1;
grid-size: 2;
grid-columns: auto 1fr;
grid-rows: auto 1fr auto auto;
}
Header {
column-span: 2;
}
Footer {
column-span: 2;
}
#sidebar {
width: 28;
height: 100%;
row-span: 1;
}
#sidebar.hidden {
display: none;
}
#main-area {
height: 100%;
width: 1fr;
}
#task-table {
height: 100%;
}
DataTable > .datatable--cursor {
background: $accent;
color: $text;
}
.priority-p0 {
color: red;
}
.priority-p1 {
color: orange;
}
.priority-p2 {
color: yellow;
}
.priority-p3 {
color: gray;
}
.overdue {
color: red;
text-style: bold;
}
.status-active {
color: cyan;
text-style: bold;
}
#status-bar {
dock: bottom;
height: 1;
background: $surface;
color: $text-muted;
padding: 0 1;
column-span: 2;
}
#detail-pane {
dock: bottom;
height: 50%;
border-top: solid $primary;
background: $surface;
column-span: 2;
}
#detail-pane.hidden {
display: none;
}
#task-details {
height: auto;
max-height: 8;
padding: 1;
border-bottom: solid $primary-darken-2;
}
#notes-container {
height: 1fr;
padding: 1;
}
#notes-content {
height: 100%;
width: 100%;
}
#notes-pane {
dock: bottom;
height: 50%;
border-top: solid $primary;
padding: 1;
background: $surface;
column-span: 2;
}
#notes-pane.hidden {
display: none;
}
#notes-content {
height: 100%;
width: 100%;
}
"""
BINDINGS = [
@@ -124,9 +176,7 @@ class TasksApp(App):
Binding("n", "toggle_notes", "Notes", show=True),
Binding("N", "edit_notes", "Edit Notes", show=False),
Binding("x", "delete_task", "Delete", show=False),
Binding("p", "filter_project", "Project", show=True),
Binding("t", "filter_tag", "Tag", show=True),
Binding("o", "sort_tasks", "Sort", show=True),
Binding("w", "toggle_sidebar", "Filters", show=True),
Binding("c", "clear_filters", "Clear", show=True),
Binding("r", "refresh", "Refresh", show=True),
Binding("y", "sync", "Sync", show=True),
@@ -143,6 +193,8 @@ class TasksApp(App):
current_sort_column: str
current_sort_ascending: bool
notes_visible: bool
detail_visible: bool
sidebar_visible: bool
backend: Optional[TaskBackend]
config: Optional[TasksAppConfig]
@@ -157,6 +209,8 @@ class TasksApp(App):
self.current_sort_column = "priority"
self.current_sort_ascending = True
self.notes_visible = False
self.detail_visible = False
self.sidebar_visible = True # Start with sidebar visible
self.config = get_config()
if backend:
@@ -170,9 +224,22 @@ class TasksApp(App):
def compose(self) -> ComposeResult:
"""Create the app layout."""
yield Header()
yield DataTable(id="task-table", cursor_type="row")
yield FilterSidebar(id="sidebar")
yield Vertical(
DataTable(id="task-table", cursor_type="row"),
id="main-area",
)
yield Vertical(
Static("", id="task-details"),
ScrollableContainer(
Markdown("*No notes*", id="notes-content"),
id="notes-container",
),
id="detail-pane",
classes="hidden",
)
yield ScrollableContainer(
Markdown("*No task selected*", id="notes-content"),
Markdown("*No task selected*", id="notes-only-content"),
id="notes-pane",
classes="hidden",
)
@@ -181,22 +248,17 @@ class TasksApp(App):
def on_mount(self) -> None:
"""Initialize the app on mount."""
self.theme = get_theme_name()
table = self.query_one("#task-table", DataTable)
# Setup columns based on config
# Setup columns based on config with dynamic widths
columns = (
self.config.display.columns
if self.config
else ["id", "priority", "project", "tags", "summary", "due"]
else ["id", "priority", "summary", "due", "project", "tags"]
)
for col in columns:
width = None
if self.config and col in self.config.display.column_widths:
w = self.config.display.column_widths[col]
if w > 0:
width = w
table.add_column(col.capitalize(), width=width, key=col)
self._setup_columns(table, columns)
# Set notes pane height from config
if self.config:
@@ -206,9 +268,54 @@ class TasksApp(App):
height = max(10, min(90, height))
notes_pane.styles.height = f"{height}%"
# Load tasks
# Load tasks (this will also update the sidebar)
self.load_tasks()
def _setup_columns(self, table: DataTable, columns: list[str]) -> None:
"""Setup table columns with dynamic widths based on available space."""
# Minimum widths for each column type
min_widths = {
"id": 3,
"priority": 5,
"project": 8,
"tags": 8,
"summary": 20,
"due": 10,
"status": 8,
}
# Preferred widths (used when space allows)
preferred_widths = {
"id": 3,
"priority": 5,
"project": 12,
"tags": 12,
"summary": 0, # 0 means take remaining space
"due": 10,
"status": 10,
}
# Calculate available width (approximate, will be refined on resize)
# Use config widths if available, otherwise use preferred
for col in columns:
if self.config and col in self.config.display.column_widths:
config_width = self.config.display.column_widths[col]
if config_width > 0:
# Use config width but enforce minimum
width = max(config_width, min_widths.get(col, 4))
else:
# 0 means auto/flexible - let DataTable handle it
width = None
else:
# Use preferred width
pref = preferred_widths.get(col, 10)
if pref == 0:
width = None
else:
width = max(pref, min_widths.get(col, 4))
table.add_column(col.capitalize(), width=width, key=col)
def _format_priority(self, priority: TaskPriority) -> str:
"""Format priority with icon."""
if not self.config:
@@ -288,9 +395,22 @@ class TasksApp(App):
self.projects = self.backend.get_projects()
self.tags = self.backend.get_tags()
# Update sidebar with available filters
self._update_sidebar()
# Update table
self._update_table()
def _update_sidebar(self) -> None:
"""Update the filter sidebar with current projects and tags."""
try:
sidebar = self.query_one("#sidebar", FilterSidebar)
# Convert projects to (name, count) tuples
project_data = [(p.name, p.task_count) for p in self.projects if p.name]
sidebar.update_filters(projects=project_data, tags=self.tags)
except Exception:
pass # Sidebar may not be mounted yet
def _sort_tasks(self) -> None:
"""Sort tasks based on current sort settings."""
@@ -462,82 +582,139 @@ class TasksApp(App):
)
def action_view_task(self) -> None:
"""View task details."""
"""Toggle task detail pane showing full details and notes."""
task = self._get_selected_task()
if not task:
return
# TODO: Push TaskDetail screen
self.notify(f"Task: {task.summary}\nNotes: {task.notes or 'None'}")
# Filter actions
def action_filter_project(self) -> None:
"""Open project filter dialog."""
from .screens.FilterScreens import ProjectFilterScreen
if not self.projects:
self.notify("No projects found", severity="warning")
self.notify("No task selected", severity="warning")
return
project_data = [(p.name, p.task_count) for p in self.projects if p.name]
detail_pane = self.query_one("#detail-pane")
def handle_project_selection(project: str | None) -> None:
if project != self.current_project_filter:
self.current_project_filter = project
self.load_tasks()
if project:
self.notify(f"Filtering by project: {project}")
else:
self.notify("Project filter cleared")
# Toggle visibility
self.detail_visible = not self.detail_visible
self.push_screen(
ProjectFilterScreen(project_data, self.current_project_filter),
handle_project_selection,
if self.detail_visible:
# Hide notes-only pane if visible
if self.notes_visible:
self.query_one("#notes-pane").add_class("hidden")
self.notes_visible = False
detail_pane.remove_class("hidden")
self._update_detail_display(task)
else:
detail_pane.add_class("hidden")
def _update_detail_display(self, task: Task) -> None:
"""Update the detail pane with task information."""
details_widget = self.query_one("#task-details", Static)
notes_widget = self.query_one("#notes-content", Markdown)
# Format task details
lines = []
# Title/Summary
lines.append(f"[bold]{task.summary}[/bold]")
lines.append("")
# Priority and Status
priority_colors = {"P0": "red", "P1": "orange", "P2": "yellow", "P3": "dim"}
p_color = priority_colors.get(task.priority.value, "white")
lines.append(
f"[dim]Priority:[/dim] [{p_color}]{task.priority.value}[/{p_color}] [dim]Status:[/dim] {task.status.value}"
)
def action_filter_tag(self) -> None:
"""Open tag filter dialog."""
from .screens.FilterScreens import TagFilterScreen
# Due date
if task.due:
date_format = self.config.display.date_format if self.config else "%Y-%m-%d"
due_str = task.due.strftime(date_format)
if task.is_overdue:
lines.append(
f"[dim]Due:[/dim] [red bold]{due_str} (OVERDUE)[/red bold]"
)
else:
lines.append(f"[dim]Due:[/dim] {due_str}")
if not self.tags:
self.notify("No tags found", severity="warning")
return
# Project
if task.project:
lines.append(f"[dim]Project:[/dim] {task.project}")
def handle_tag_selection(tags: list[str]) -> None:
if tags != self.current_tag_filters:
self.current_tag_filters = tags
self.load_tasks()
if tags:
self.notify(f"Filtering by tags: {', '.join(tags)}")
else:
self.notify("Tag filters cleared")
# Tags
if task.tags:
tags_str = " ".join(f"+{t}" for t in task.tags)
lines.append(f"[dim]Tags:[/dim] {tags_str}")
self.push_screen(
TagFilterScreen(self.tags, self.current_tag_filters),
handle_tag_selection,
)
# Created date
if task.created:
created_str = task.created.strftime("%Y-%m-%d %H:%M")
lines.append(f"[dim]Created:[/dim] {created_str}")
def action_sort_tasks(self) -> None:
"""Open sort dialog."""
from .screens.FilterScreens import SortScreen, SortConfig
# UUID (for reference)
lines.append(f"[dim]UUID:[/dim] {task.uuid[:8]}...")
def handle_sort_selection(config: SortConfig | None) -> None:
if config is not None:
self.current_sort_column = config.column
self.current_sort_ascending = config.ascending
self._sort_tasks()
self._update_table()
direction = "asc" if config.ascending else "desc"
self.notify(f"Sorted by {config.column} ({direction})")
details_widget.update("\n".join(lines))
self.push_screen(
SortScreen(self.current_sort_column, self.current_sort_ascending),
handle_sort_selection,
)
# Update notes
if task.notes:
notes_widget.update(task.notes)
else:
notes_widget.update("*No notes for this task*")
# Sidebar actions and handlers
def action_toggle_sidebar(self) -> None:
"""Toggle the filter sidebar visibility."""
sidebar = self.query_one("#sidebar", FilterSidebar)
self.sidebar_visible = not self.sidebar_visible
if self.sidebar_visible:
sidebar.remove_class("hidden")
else:
sidebar.add_class("hidden")
def on_filter_sidebar_project_filter_changed(
self, event: FilterSidebar.ProjectFilterChanged
) -> None:
"""Handle project filter changes from sidebar."""
if event.project != self.current_project_filter:
self.current_project_filter = event.project
self.load_tasks()
if event.project:
self.notify(f"Filtering by project: {event.project}")
else:
self.notify("Project filter cleared")
def on_filter_sidebar_tag_filter_changed(
self, event: FilterSidebar.TagFilterChanged
) -> None:
"""Handle tag filter changes from sidebar."""
if event.tags != self.current_tag_filters:
self.current_tag_filters = event.tags
self.load_tasks()
if event.tags:
self.notify(f"Filtering by tags: {', '.join(event.tags)}")
else:
self.notify("Tag filters cleared")
def on_filter_sidebar_sort_changed(self, event: FilterSidebar.SortChanged) -> None:
"""Handle sort changes from sidebar."""
self.current_sort_column = event.column
self.current_sort_ascending = event.ascending
self._sort_tasks()
self._update_table()
direction = "asc" if event.ascending else "desc"
self.notify(f"Sorted by {event.column} ({direction})")
def action_clear_filters(self) -> None:
"""Clear all filters."""
self.current_project_filter = None
self.current_tag_filters = []
# Also clear sidebar selections
try:
sidebar = self.query_one("#sidebar", FilterSidebar)
sidebar.clear_all_filters()
except Exception:
pass
self.load_tasks()
self.notify("Filters cleared", severity="information")
@@ -571,10 +748,8 @@ Keybindings:
n - Toggle notes pane
N - Edit notes
x - Delete task
p - Filter by project
t - Filter by tag
o - Sort tasks
c - Clear filters
w - Toggle filter sidebar
c - Clear all filters
r - Refresh
y - Sync with remote
Enter - View task details
@@ -584,13 +759,18 @@ Keybindings:
# Notes actions
def action_toggle_notes(self) -> None:
"""Toggle the notes pane visibility."""
"""Toggle the notes-only pane visibility."""
notes_pane = self.query_one("#notes-pane")
self.notes_visible = not self.notes_visible
if self.notes_visible:
# Hide detail pane if visible
if self.detail_visible:
self.query_one("#detail-pane").add_class("hidden")
self.detail_visible = False
notes_pane.remove_class("hidden")
self._update_notes_display()
self._update_notes_only_display()
else:
notes_pane.add_class("hidden")
@@ -617,7 +797,11 @@ Keybindings:
# Reload task to get updated notes
self.load_tasks()
if self.notes_visible:
self._update_notes_display()
self._update_notes_only_display()
if self.detail_visible:
task = self._get_selected_task()
if task:
self._update_detail_display(task)
def _edit_notes_builtin(self, task: Task) -> None:
"""Edit notes using built-in TextArea widget."""
@@ -629,7 +813,11 @@ Keybindings:
self.backend.modify_task(str(task.id), notes=new_notes)
self.load_tasks()
if self.notes_visible:
self._update_notes_display()
self._update_notes_only_display()
if self.detail_visible:
updated_task = self._get_selected_task()
if updated_task:
self._update_detail_display(updated_task)
self.notify("Notes saved", severity="information")
self.push_screen(
@@ -637,10 +825,10 @@ Keybindings:
handle_notes_save,
)
def _update_notes_display(self) -> None:
"""Update the notes pane with the selected task's notes."""
def _update_notes_only_display(self) -> None:
"""Update the notes-only pane with the selected task's notes."""
task = self._get_selected_task()
notes_widget = self.query_one("#notes-content", Markdown)
notes_widget = self.query_one("#notes-only-content", Markdown)
if task:
if task.notes:
@@ -651,9 +839,13 @@ Keybindings:
notes_widget.update("*No task selected*")
def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None:
"""Handle row highlight changes to update notes display."""
"""Handle row highlight changes to update pane displays."""
if self.notes_visible:
self._update_notes_display()
self._update_notes_only_display()
if self.detail_visible:
task = self._get_selected_task()
if task:
self._update_detail_display(task)
def run_app(backend: Optional[TaskBackend] = None) -> None:

View File

@@ -32,17 +32,17 @@ class DisplayConfig(BaseModel):
# Columns to show in the task table
# Available: id, priority, project, tags, summary, due, status
columns: list[str] = Field(
default_factory=lambda: ["id", "priority", "project", "tags", "summary", "due"]
default_factory=lambda: ["id", "priority", "summary", "due", "project", "tags"]
)
# Column widths (0 = auto)
# Column widths (0 = auto/flexible, takes remaining space)
column_widths: dict[str, int] = Field(
default_factory=lambda: {
"id": 4,
"priority": 3,
"project": 15,
"tags": 15,
"summary": 0, # auto-expand
"id": 3,
"priority": 5,
"project": 12,
"tags": 12,
"summary": 0, # auto-expand to fill remaining space
"due": 10,
"status": 8,
}

View File

@@ -0,0 +1,374 @@
"""Filter sidebar widget for Tasks TUI.
A collapsible sidebar containing project filter, tag filter, and sort options.
Changes are applied immediately when selections change.
Uses bordered list containers similar to the mail app sidebar.
"""
from typing import Optional
from textual import on
from textual.app import ComposeResult
from textual.containers import Vertical, ScrollableContainer
from textual.message import Message
from textual.reactive import reactive
from textual.widget import Widget
from textual.widgets import Label, SelectionList, RadioButton, RadioSet, Static
from textual.widgets.selection_list import Selection
class FilterSidebar(Widget):
"""Collapsible sidebar with project filter, tag filter, and sort options."""
DEFAULT_CSS = """
FilterSidebar {
width: 30;
height: 100%;
background: $surface;
}
FilterSidebar.hidden {
display: none;
}
FilterSidebar #sidebar-scroll {
height: 100%;
width: 100%;
scrollbar-size: 1 1;
}
/* Bordered list containers like mail app */
FilterSidebar .filter-list {
height: auto;
max-height: 8;
min-height: 3;
border: round rgb(117, 106, 129);
margin: 0 0 1 0;
scrollbar-size: 1 1;
}
FilterSidebar .filter-list:focus {
border: round $secondary;
background: rgb(55, 53, 57);
border-title-style: bold;
}
FilterSidebar .sort-section {
height: auto;
border: round rgb(117, 106, 129);
margin: 0 0 1 0;
padding: 0 1;
}
FilterSidebar .sort-section:focus-within {
border: round $secondary;
background: rgb(55, 53, 57);
border-title-style: bold;
}
FilterSidebar SelectionList {
height: auto;
max-height: 8;
background: transparent;
border: none;
padding: 0;
}
FilterSidebar RadioSet {
height: auto;
background: transparent;
border: none;
padding: 0;
width: 100%;
}
FilterSidebar RadioButton {
height: 1;
background: transparent;
padding: 0;
}
FilterSidebar .direction-label {
margin-top: 1;
color: $text-muted;
height: 1;
}
"""
# Messages for filter/sort changes
class ProjectFilterChanged(Message):
"""Sent when project filter selection changes."""
def __init__(self, project: Optional[str]) -> None:
self.project = project
super().__init__()
class TagFilterChanged(Message):
"""Sent when tag filter selection changes."""
def __init__(self, tags: list[str]) -> None:
self.tags = tags
super().__init__()
class SortChanged(Message):
"""Sent when sort settings change."""
def __init__(self, column: str, ascending: bool) -> None:
self.column = column
self.ascending = ascending
super().__init__()
# Available sort columns
SORT_COLUMNS = [
("priority", "Priority"),
("project", "Project"),
("summary", "Summary"),
("due", "Due Date"),
("status", "Status"),
]
# Reactive properties - use factory functions for mutable defaults
projects: reactive[list[tuple[str, int]]] = reactive(list)
tags: reactive[list[str]] = reactive(list)
current_project: reactive[Optional[str]] = reactive(None)
current_tags: reactive[list[str]] = reactive(list)
current_sort_column: reactive[str] = reactive("priority")
current_sort_ascending: reactive[bool] = reactive(True)
def __init__(
self,
projects: Optional[list[tuple[str, int]]] = None,
tags: Optional[list[str]] = None,
current_project: Optional[str] = None,
current_tags: Optional[list[str]] = None,
current_sort_column: str = "priority",
current_sort_ascending: bool = True,
**kwargs,
):
super().__init__(**kwargs)
self.projects = projects or []
self.tags = tags or []
self.current_project = current_project
self.current_tags = current_tags or []
self.current_sort_column = current_sort_column
self.current_sort_ascending = current_sort_ascending
def compose(self) -> ComposeResult:
with ScrollableContainer(id="sidebar-scroll"):
# Project filter section - bordered list
yield SelectionList[str](id="project-list", classes="filter-list")
# Tag filter section - bordered list
yield SelectionList[str](id="tag-list", classes="filter-list")
# Sort section - bordered container
with Vertical(id="sort-section", classes="sort-section"):
with RadioSet(id="sort-column-set"):
for key, display in self.SORT_COLUMNS:
yield RadioButton(
display,
value=key == self.current_sort_column,
id=f"sort-{key}",
)
yield Label("Direction", classes="direction-label")
with RadioSet(id="sort-direction-set"):
yield RadioButton(
"Ascending",
value=self.current_sort_ascending,
id="sort-asc",
)
yield RadioButton(
"Descending",
value=not self.current_sort_ascending,
id="sort-desc",
)
def on_mount(self) -> None:
"""Initialize the sidebar with current filter state and set border titles."""
# Set border titles like mail app
project_list = self.query_one("#project-list", SelectionList)
project_list.border_title = "Projects"
tag_list = self.query_one("#tag-list", SelectionList)
tag_list.border_title = "Tags"
sort_section = self.query_one("#sort-section")
sort_section.border_title = "Sort"
# Update the lists
self._update_project_list()
self._update_tag_list()
self._update_subtitles()
def _update_subtitles(self) -> None:
"""Update border subtitles to show selection counts."""
project_list = self.query_one("#project-list", SelectionList)
if self.current_project:
project_list.border_subtitle = f"[b]{self.current_project}[/b]"
else:
project_list.border_subtitle = f"{len(self.projects)} available"
tag_list = self.query_one("#tag-list", SelectionList)
if self.current_tags:
tag_list.border_subtitle = f"[b]{len(self.current_tags)} selected[/b]"
else:
tag_list.border_subtitle = f"{len(self.tags)} available"
sort_section = self.query_one("#sort-section")
direction = "" if self.current_sort_ascending else ""
# Get display name for current column
col_display = next(
(d for k, d in self.SORT_COLUMNS if k == self.current_sort_column),
self.current_sort_column,
)
sort_section.border_subtitle = f"{col_display} {direction}"
def _update_project_list(self) -> None:
"""Update the project selection list."""
project_list = self.query_one("#project-list", SelectionList)
project_list.clear_options()
for name, count in self.projects:
project_list.add_option(
Selection(
f"{name} ({count})",
name,
initial_state=name == self.current_project,
)
)
def _update_tag_list(self) -> None:
"""Update the tag selection list."""
tag_list = self.query_one("#tag-list", SelectionList)
tag_list.clear_options()
for tag in self.tags:
tag_list.add_option(
Selection(
f"+{tag}",
tag,
initial_state=tag in self.current_tags,
)
)
def update_filters(
self,
projects: Optional[list[tuple[str, int]]] = None,
tags: Optional[list[str]] = None,
) -> None:
"""Update available projects and tags."""
if projects is not None:
self.projects = projects
self._update_project_list()
if tags is not None:
self.tags = tags
self._update_tag_list()
self._update_subtitles()
def set_current_project(self, project: Optional[str]) -> None:
"""Set the current project filter (updates UI to match)."""
self.current_project = project
self._update_project_list()
self._update_subtitles()
def set_current_tags(self, tags: list[str]) -> None:
"""Set the current tag filters (updates UI to match)."""
self.current_tags = tags
self._update_tag_list()
self._update_subtitles()
def set_sort_settings(self, column: str, ascending: bool) -> None:
"""Set the current sort settings (updates UI to match)."""
self.current_sort_column = column
self.current_sort_ascending = ascending
# Update radio buttons
column_set = self.query_one("#sort-column-set", RadioSet)
for button in column_set.query(RadioButton):
if button.id == f"sort-{column}":
button.value = True
direction_set = self.query_one("#sort-direction-set", RadioSet)
asc_btn = direction_set.query_one("#sort-asc", RadioButton)
desc_btn = direction_set.query_one("#sort-desc", RadioButton)
asc_btn.value = ascending
desc_btn.value = not ascending
self._update_subtitles()
@on(SelectionList.SelectedChanged, "#project-list")
def _on_project_selection_changed(
self, event: SelectionList.SelectedChanged
) -> None:
"""Handle project selection changes."""
selected = list(event.selection_list.selected)
# For project, we only allow single selection
if selected:
new_project = selected[0]
# If same project clicked again, deselect it
if new_project == self.current_project:
self.current_project = None
event.selection_list.deselect(new_project)
else:
# Deselect previous if any
if self.current_project:
event.selection_list.deselect(self.current_project)
self.current_project = new_project
else:
self.current_project = None
self._update_subtitles()
self.post_message(self.ProjectFilterChanged(self.current_project))
@on(SelectionList.SelectedChanged, "#tag-list")
def _on_tag_selection_changed(self, event: SelectionList.SelectedChanged) -> None:
"""Handle tag selection changes."""
selected = list(event.selection_list.selected)
self.current_tags = selected
self._update_subtitles()
self.post_message(self.TagFilterChanged(self.current_tags))
@on(RadioSet.Changed, "#sort-column-set")
def _on_sort_column_changed(self, event: RadioSet.Changed) -> None:
"""Handle sort column changes."""
if event.pressed and event.pressed.id:
column = event.pressed.id.replace("sort-", "")
if column in [c[0] for c in self.SORT_COLUMNS]:
self.current_sort_column = column
self._update_subtitles()
self.post_message(
self.SortChanged(
self.current_sort_column, self.current_sort_ascending
)
)
@on(RadioSet.Changed, "#sort-direction-set")
def _on_sort_direction_changed(self, event: RadioSet.Changed) -> None:
"""Handle sort direction changes."""
if event.pressed and event.pressed.id:
self.current_sort_ascending = event.pressed.id == "sort-asc"
self._update_subtitles()
self.post_message(
self.SortChanged(self.current_sort_column, self.current_sort_ascending)
)
def clear_all_filters(self) -> None:
"""Clear all project and tag filters."""
# Clear project
project_list = self.query_one("#project-list", SelectionList)
if self.current_project:
project_list.deselect(self.current_project)
self.current_project = None
# Clear tags
tag_list = self.query_one("#tag-list", SelectionList)
for tag in self.current_tags:
tag_list.deselect(tag)
self.current_tags = []
self._update_subtitles()
# Notify app
self.post_message(self.ProjectFilterChanged(None))
self.post_message(self.TagFilterChanged([]))

View File

@@ -1,5 +1,6 @@
"""Widget components for Tasks TUI."""
from .AddTaskForm import AddTaskForm, TaskFormData
from .FilterSidebar import FilterSidebar
__all__ = ["AddTaskForm", "TaskFormData"]
__all__ = ["AddTaskForm", "TaskFormData", "FilterSidebar"]

114
src/utils/shared_config.py Normal file
View File

@@ -0,0 +1,114 @@
"""Shared configuration for all LUK TUI applications.
This module provides shared settings that apply across all apps,
such as theme configuration.
"""
import logging
import os
from pathlib import Path
from typing import Optional
try:
import toml
except ImportError:
toml = None # type: ignore
logger = logging.getLogger(__name__)
# Available Textual themes
AVAILABLE_THEMES = [
"textual-dark",
"textual-light",
"nord",
"gruvbox",
"catppuccin-mocha",
"dracula",
"monokai",
"solarized-light",
"tokyo-night",
]
# Default shared configuration
DEFAULT_SHARED_CONFIG = {
"theme": {
"name": "monokai", # Default Textual theme
},
}
def get_shared_config_path() -> Path:
"""Get the shared config file path."""
config_home = os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config"))
return Path(config_home) / "luk" / "shared.toml"
def load_shared_config() -> dict:
"""Load shared configuration from TOML file.
Returns merged config with defaults for any missing values.
"""
config = DEFAULT_SHARED_CONFIG.copy()
config_path = get_shared_config_path()
if config_path.exists() and toml is not None:
try:
user_config = toml.load(config_path)
# Deep merge user config into defaults
for section, values in user_config.items():
if section in config and isinstance(config[section], dict):
config[section].update(values)
else:
config[section] = values
logger.info(f"Loaded shared config from {config_path}")
except Exception as e:
logger.warning(f"Error loading shared config: {e}")
else:
logger.debug(f"No shared config at {config_path}, using defaults")
return config
def get_theme_name() -> str:
"""Get the configured theme name."""
config = load_shared_config()
return config.get("theme", {}).get("name", "monokai")
def create_default_shared_config() -> None:
"""Create a default shared.toml config file if it doesn't exist."""
config_path = get_shared_config_path()
if config_path.exists():
return
if toml is None:
logger.warning("toml module not available, cannot create config")
return
# Ensure parent directory exists
config_path.parent.mkdir(parents=True, exist_ok=True)
with open(config_path, "w") as f:
toml.dump(DEFAULT_SHARED_CONFIG, f)
logger.info(f"Created default shared config at {config_path}")
# Global cached config
_shared_config: Optional[dict] = None
def get_shared_config() -> dict:
"""Get the cached shared config, loading if necessary."""
global _shared_config
if _shared_config is None:
_shared_config = load_shared_config()
return _shared_config
def reload_shared_config() -> dict:
"""Force reload of shared config from disk."""
global _shared_config
_shared_config = load_shared_config()
return _shared_config