Compare commits

...

11 Commits

Author SHA1 Message Date
Bendt
8be4b4785c Add live search panel with debounced Himalaya search and help modal 2025-12-19 14:31:21 -05:00
Bendt
0cd7cf6984 Implement mail search using Himalaya CLI with auto-select first result 2025-12-19 14:18:40 -05:00
Bendt
d3468f7395 Fix mail search crash when envelope fields are None 2025-12-19 14:09:10 -05:00
Bendt
b75c069035 Fix mini-calendar to respect week_start_day config setting 2025-12-19 13:00:42 -05:00
Bendt
3629757e70 Add context filter to Tasks TUI and fix calendar UI bugs
Tasks TUI:
- Add context support to TaskBackend interface (get_context, set_context,
  get_contexts methods)
- Implement context methods in DstaskClient
- Add Context section to FilterSidebar (above projects/tags)
- Context changes persist via backend CLI

Calendar TUI:
- Remove duplicate header from InvitesPanel (use border_title instead)
- Fix border_title color to use $primary
- Fix WeekGrid to always scroll to work day start (7am) on mount
2025-12-19 11:51:53 -05:00
Bendt
be2f67bb7b Fix TUI bugs: folder selection, filter stability, UI consistency
- Mail: Fix folder/account selector not triggering reload (use direct
  fetch instead of reactive reload_needed flag)
- Tasks: Store all_projects/all_tags on mount so filters don't change
  when filtering; add OR search for multiple tags
- Sync: Use rounded borders and border_title for sidebar/activity log
- Calendar: Remove padding from mini-calendar, add rounded border and
  border_title to invites panel
2025-12-19 11:24:15 -05:00
Bendt
25385c6482 Add search functionality to Mail TUI with / keybinding
- Add reusable SearchScreen modal and ClearableSearchInput widget
- Implement filter_by_query in MessageStore for client-side filtering
- Search matches subject, sender name/email, recipient name/email
- Press / to open search, Escape to clear search filter
- Shows search query in list subtitle when filter is active
2025-12-19 11:01:05 -05:00
Bendt
3c45e2a154 Add calendar invites panel to Calendar TUI sidebar
- Create InvitesPanel widget showing pending invites from Microsoft Graph
- Add fetch_pending_invites() and respond_to_invite() API functions
- Invites load asynchronously in background on app mount
- Display invite subject, date/time, and organizer
- Add 'i' keybinding to focus invites panel
- Style: tentative invites shown in warning color
2025-12-19 10:51:15 -05:00
Bendt
a82f001918 Add mini-calendar sidebar to Calendar TUI
- Add MonthCalendar widget as a collapsible sidebar (toggle with 's')
- Sidebar syncs with main week grid (week highlight, selected date)
- Click dates in sidebar to navigate week grid to that date
- Click month navigation arrows to change displayed month
- Add goto_date() method to WeekGrid for date navigation
2025-12-19 10:40:33 -05:00
Bendt
48d2455b9c Make TUI the default mode for luk sync command
- luk sync now launches interactive TUI dashboard by default
- Add --once flag for single sync (non-interactive)
- Add --daemon flag for background daemon mode
- Keep 'luk sync run' as legacy subcommand for backwards compatibility
- Move common options (org, vdir, notify, etc.) to group level
2025-12-19 10:33:48 -05:00
Bendt
d4226caf0a Complete Phase 1: parallel sync, IPC, theme colors, lazy CLI loading
- Sync: Parallelize message downloads with asyncio.gather (batch size 5)
- Sync: Increase HTTP semaphore from 2 to 5 concurrent requests
- Sync: Add IPC notifications to sync daemon after sync completes
- Mail: Replace all hardcoded RGB colors with theme variables
- Mail: Remove envelope icon/checkbox gap (padding cleanup)
- Mail: Add IPC listener for refresh notifications from sync
- Calendar: Style current time line with error color and solid line
- Tasks: Fix table not displaying (CSS grid to horizontal layout)
- CLI: Implement lazy command loading for faster startup (~12s to ~0.3s)
- Add PROJECT_PLAN.md with full improvement roadmap
- Add src/utils/ipc.py for Unix socket cross-app communication
2025-12-19 10:29:53 -05:00
26 changed files with 2790 additions and 229 deletions

587
PROJECT_PLAN.md Normal file
View File

@@ -0,0 +1,587 @@
# LUK Project Plan
This document outlines planned improvements across the LUK applications (Mail, Calendar, Sync, Tasks).
---
## Bug Fixes
### Tasks App - Table Not Displaying (FIXED)
**Priority:** Critical
**File:** `src/tasks/app.py`
The tasks app was not showing the task table due to a CSS grid layout issue. The grid layout with mixed `dock` and `grid` positioning caused the main content area to have 0 height.
**Fix:** Changed from grid layout to horizontal layout with docked header/footer.
---
## Sync App
### Performance Improvements
#### 1. Parallelize Message Downloads
**Priority:** High
**Files:** `src/services/microsoft_graph/mail.py`, `src/services/microsoft_graph/client.py`
Currently, message downloads are sequential (`for` loop with `await`). This is a significant bottleneck.
**Current code pattern:**
```python
for msg_id in message_ids:
content = await client.get_message(msg_id)
await save_to_maildir(content)
```
**Proposed changes:**
1. Increase semaphore limit in `client.py` from 2 to 5 concurrent HTTP requests
2. Batch parallel downloads using `asyncio.gather()` with batches of 5-10 messages
3. Batch sync state writes every N messages instead of after each message (currently an I/O bottleneck in archive sync)
**Example implementation:**
```python
BATCH_SIZE = 5
async def fetch_batch(batch_ids):
return await asyncio.gather(*[client.get_message(id) for id in batch_ids])
for i in range(0, len(message_ids), BATCH_SIZE):
batch = message_ids[i:i + BATCH_SIZE]
results = await fetch_batch(batch)
for content in results:
await save_to_maildir(content)
# Write sync state every batch instead of every message
save_sync_state()
```
#### 2. Optimize Maildir Writes
**Priority:** Medium
**File:** `src/utils/mail_utils/maildir.py`
The `save_mime_to_maildir_async` function is a potential bottleneck. Consider:
- Batching file writes
- Using thread pool for I/O operations
---
### CLI Improvements
#### 3. Default to TUI Mode
**Priority:** Medium
**File:** `src/cli/sync.py`
Currently `luk sync` requires subcommands. Change to:
- `luk sync` → Opens TUI (interactive mode) by default
- `luk sync --once` / `luk sync -1` → One-shot sync
- `luk sync --daemon` / `luk sync -d` → Daemon mode
- `luk sync status` → Show sync status
---
### UI Consistency
#### 4. Navigation and Styling
**Priority:** Low
**File:** `src/cli/sync_dashboard.py`
- Add `j`/`k` keybindings for list navigation (vim-style)
- Use `border: round` in TCSS for consistency with other apps
- Add `.border_title` styling for list containers
#### 5. Notifications Toggle
**Priority:** Low
**Files:** `src/cli/sync_dashboard.py`, `src/utils/notifications.py`
Add a UI switch to enable/disable desktop notifications during sync.
---
## Inter-Process Communication (IPC)
### Overview
**Priority:** High
**Goal:** Enable real-time updates between apps when data changes:
- Mail app refreshes when sync downloads new messages
- Tasks app refreshes when syncing or when tasks are added from mail app
- Calendar app reloads when sync updates events
### Platform Options
#### macOS Options
1. **Distributed Notifications (NSDistributedNotificationCenter)**
- Native macOS IPC mechanism
- Simple pub/sub model
- Lightweight, no server needed
- Python: Use `pyobjc` (`Foundation.NSDistributedNotificationCenter`)
- Example: `CFNotificationCenterPostNotification()`
2. **Unix Domain Sockets**
- Works on both macOS and Linux
- Requires a listener process (daemon or embedded in sync)
- More complex but more flexible
- Can send structured data (JSON messages)
3. **Named Pipes (FIFO)**
- Simple, works on both platforms
- One-way communication
- Less suitable for bidirectional messaging
4. **File-based Signaling**
- Write a "dirty" file that apps watch via `watchdog` or `fsevents`
- Simple but has latency
- Works cross-platform
#### Linux Options
1. **D-Bus**
- Standard Linux IPC
- Python: Use `dbus-python` or `pydbus`
- Powerful but more complex
- Not available on macOS by default
2. **Unix Domain Sockets**
- Same as macOS, fully compatible
- Recommended for cross-platform compatibility
3. **systemd Socket Activation**
- If running as a systemd service
- Clean integration with Linux init
### Recommended Approach
Use **Unix Domain Sockets** for cross-platform compatibility:
```python
# Socket path
SOCKET_PATH = Path.home() / ".cache" / "luk" / "ipc.sock"
# Message types
class IPCMessage:
MAIL_UPDATED = "mail.updated"
TASKS_UPDATED = "tasks.updated"
CALENDAR_UPDATED = "calendar.updated"
# Sync daemon sends notifications
async def notify_mail_updated(folder: str):
await send_ipc_message({"type": IPCMessage.MAIL_UPDATED, "folder": folder})
# Mail app listens
async def listen_for_updates():
async for message in ipc_listener():
if message["type"] == IPCMessage.MAIL_UPDATED:
self.load_messages()
```
**Implementation files:**
- `src/utils/ipc.py` - IPC client/server utilities
- `src/cli/sync_daemon.py` - Add notification sending
- `src/mail/app.py` - Add listener for mail updates
- `src/tasks/app.py` - Add listener for task updates
- `src/calendar/app.py` - Add listener for calendar updates
---
## Calendar App
### Visual Improvements
#### 1. Current Time Hour Line Styling
**Priority:** High
**File:** `src/calendar/widgets/WeekGrid.py`
The current time indicator hour line should have a subtle contrasting background color to make it more visible.
#### 2. Cursor Hour Header Highlighting
**Priority:** Medium
**File:** `src/calendar/widgets/WeekGrid.py`
The hour header at cursor position should have a brighter background, similar to how the day header is highlighted when selected.
---
### Layout Improvements
#### 3. Responsive Detail Panel
**Priority:** Medium
**Files:** `src/calendar/app.py`, `src/calendar/widgets/`
When the terminal is wider than X characters (e.g., 120), show a side-docked detail panel for the selected event instead of a modal/overlay.
#### 4. Sidebar Mini-Calendar
**Priority:** Medium
**Files:** `src/calendar/app.py`, `src/calendar/widgets/MonthCalendar.py`
When the sidebar is toggled on, display a mini-calendar in the top-left corner showing:
- Current day highlighted
- The week(s) currently visible in the main WeekGrid pane
- Click/navigate to jump to a specific date
**Implementation:**
- Reuse existing `MonthCalendar` widget in compact mode
- Add reactive property to sync selected week with main pane
- Add to sidebar composition when toggled
---
### Microsoft Graph Integration
#### 5. Calendar Invites Sidebar
**Priority:** Medium
**Files:** `src/calendar/app.py`, `src/services/microsoft_graph/calendar.py`
Display a list of pending calendar invites from Microsoft Graph API in a sidebar panel:
- List pending invites (meeting requests)
- Show invite details (organizer, time, subject)
- Accept/Decline/Tentative actions
- No sync needed - fetch on-demand from API
**API Endpoints:**
- `GET /me/calendar/events?$filter=responseStatus/response eq 'notResponded'`
- `POST /me/events/{id}/accept`
- `POST /me/events/{id}/decline`
- `POST /me/events/{id}/tentativelyAccept`
**UI:**
- Toggle with keybinding (e.g., `i` for invites)
- Sidebar shows list of pending invites
- Detail view shows full invite info
- Action buttons/keys for response
---
### Search Feature
#### 6. Calendar Search
**Priority:** Medium
**Files:** `src/calendar/app.py`, `src/services/khal/client.py` (if khal supports search)
Add search functionality if the underlying backend (khal) supports it:
- `/` keybinding to open search input
- Search by event title, description, location
- Display search results in a modal or replace main view
- Navigate to selected event
**Check:** Does `khal search` command exist?
---
### Help System
#### 7. Help Toast (Keep Current Implementation)
**Priority:** Low
**File:** `src/calendar/app.py`
The `?` key shows a help toast using `self.notify()`. This pattern should be implemented in other apps (Mail, Tasks, Sync) for consistency.
---
## Mail App
### Layout Fixes
#### 1. Remove Envelope Icon/Checkbox Gap
**Priority:** High
**File:** `src/mail/widgets/EnvelopeListItem.py`
There's a 1-character space between the envelope icon and checkbox that should be removed for tighter layout.
---
### Theme Improvements
#### 2. Replace Hardcoded RGB Colors
**Priority:** High
**File:** `src/mail/email_viewer.tcss`
Multiple hardcoded RGB values should use Textual theme variables for better theming support:
| Line | Current | Replacement |
|------|---------|-------------|
| 6 | `border: round rgb(117, 106, 129)` | `border: round $border` |
| 46 | `background: rgb(55, 53, 57)` | `background: $surface` |
| 52, 57 | RGB label colors | Theme variables |
| 69 | `background: rgb(64, 62, 65)` | `background: $panel` |
| 169, 225, 228, 272, 274 | Various RGB colors | Theme variables |
---
### Keybindings
#### 3. Add Refresh Keybinding
**Priority:** Medium
**File:** `src/mail/app.py`
Add `r` keybinding to refresh/reload the message list.
#### 4. Add Mark Read/Unread Action
**Priority:** Medium
**Files:** `src/mail/app.py`, `src/mail/actions/` (new file)
Add action to toggle read/unread status on selected message(s).
---
### Search Feature
#### 5. Mail Search
**Priority:** Medium
**Files:** `src/mail/app.py`, backend integration
Add search functionality if the underlying mail backend supports it:
- `/` keybinding to open search input
- Search by subject, sender, body
- Display results in message list
- Check: Does himalaya or configured backend support search?
---
### UI Enhancements
#### 6. Folder Message Counts
**Priority:** Medium
**Files:** `src/mail/app.py`, `src/mail/widgets/`
Display total message count next to each folder name (e.g., "Inbox (42)").
#### 7. Sort Setting in Config/UI
**Priority:** Low
**Files:** `src/mail/config.py`, `src/mail/app.py`
Add configurable sort order (date, sender, subject) with UI toggle.
---
### Message Display
#### 8. URL Compression in Markdown View
**Priority:** Medium
**Files:** `src/mail/widgets/ContentContainer.py`, `src/mail/screens/LinkPanel.py`
Compress long URLs in the markdown view to ~50 characters with a nerdfont icon. The `_shorten_url` algorithm in `LinkPanel.py` can be reused.
**Considerations:**
- Cache processed markdown to avoid re-processing on scroll
- Store URL mapping for click handling
#### 9. Remove Emoji from Border Title
**Priority:** Low
**File:** `src/mail/widgets/ContentContainer.py` or `EnvelopeHeader.py`
Remove the envelope emoji prefix before message ID in border titles.
#### 10. Enhance Subject Styling
**Priority:** Medium
**File:** `src/mail/widgets/EnvelopeHeader.py`
- Move subject line to the top of the header
- Make it bolder/brighter for better visual hierarchy
---
## Tasks App
### Search Feature
#### 1. Task Search
**Priority:** Medium
**Files:** `src/tasks/app.py`, `src/services/dstask/client.py`
Add search functionality:
- `/` keybinding to open search input
- Search by task summary, notes, project, tags
- Display matching tasks
- Check: dstask likely supports filtering which can be used for search
**Implementation:**
- Add search input widget (TextInput)
- Filter tasks locally or via dstask command
- Update table to show only matching tasks
- Clear search with Escape
---
### Help System
#### 2. Implement Help Toast
**Priority:** Low
**File:** `src/tasks/app.py`
Add `?` keybinding to show help toast (matching Calendar app pattern).
**Note:** This is already implemented in the current code.
---
## Cross-App Improvements
### 1. Consistent Help System
Implement `?` key help toast in all apps using `self.notify()`:
- Mail: `src/mail/app.py`
- Tasks: `src/tasks/app.py` (already has it)
- Sync: `src/cli/sync_dashboard.py`
### 2. Consistent Navigation
Add vim-style `j`/`k` navigation to all list views across apps.
### 3. Consistent Border Styling
Use `border: round` and `.border_title` styling consistently in all TCSS files.
### 4. Consistent Search Interface
Implement `/` keybinding for search across all apps with similar UX:
- `/` opens search input
- Enter executes search
- Escape clears/closes search
- Results displayed in main view or filtered list
---
## Implementation Priority
### Phase 1: Critical/High Priority
1. ~~Tasks App: Fix table display~~ (DONE)
2. Sync: Parallelize message downloads
3. Mail: Replace hardcoded RGB colors
4. Mail: Remove envelope icon/checkbox gap
5. Calendar: Current time hour line styling
6. IPC: Implement cross-app refresh notifications
### Phase 2: Medium Priority
1. Sync: Default to TUI mode
2. Calendar: Cursor hour header highlighting
3. Calendar: Responsive detail panel
4. Calendar: Sidebar mini-calendar
5. Calendar: Calendar invites sidebar
6. Mail: Add refresh keybinding
7. Mail: Add mark read/unread action
8. Mail: Folder message counts
9. Mail: URL compression in markdown view
10. Mail: Enhance subject styling
11. Mail: Search feature
12. Tasks: Search feature
13. Calendar: Search feature
### Phase 3: Low Priority
1. Sync: UI consistency (j/k navigation, borders)
2. Sync: Notifications toggle
3. Calendar: Help toast (already implemented, replicate to other apps)
4. Mail: Remove emoji from border title
5. Mail: Sort setting in config/UI
6. Cross-app: Consistent help, navigation, and styling
---
## Technical Notes
### IPC Implementation Details
For macOS-first with Linux compatibility:
```python
# src/utils/ipc.py
import asyncio
import json
from pathlib import Path
SOCKET_PATH = Path.home() / ".cache" / "luk" / "ipc.sock"
class IPCServer:
"""Server for sending notifications to listening apps."""
async def broadcast(self, message: dict):
"""Send message to all connected clients."""
pass
class IPCClient:
"""Client for receiving notifications in apps."""
async def listen(self, callback):
"""Listen for messages and call callback."""
pass
```
### Backend Search Capabilities
| Backend | Search Support |
|---------|---------------|
| dstask | Filter by project/tag, summary search via shell |
| himalaya | Check `himalaya search` command |
| khal | Check `khal search` command |
| Microsoft Graph | Full text search via `$search` parameter |
---
## Notes
- All UI improvements should be tested with different terminal sizes
- Theme changes should be tested with multiple Textual themes
- Performance improvements should include before/after benchmarks
- New keybindings should be documented in each app's help toast
- IPC should gracefully handle missing socket (apps work standalone)
- Search should be responsive and not block UI
---
## Library Updates & Python Version Review
### Priority: Medium (Scheduled Review)
Periodically review the latest releases of heavily-used libraries to identify:
- Bug fixes that address issues we've encountered
- New features that could improve the codebase
- Deprecation warnings that need to be addressed
- Security updates
### Key Libraries to Review
| Library | Current Use | Review Focus |
|---------|-------------|--------------|
| **Textual** | All TUI apps | New widgets, performance improvements, theming changes, CSS features |
| **aiohttp** | Microsoft Graph API client | Async improvements, connection pooling |
| **msal** | Microsoft authentication | Token caching, auth flow improvements |
| **rich** | Console output (via Textual) | New formatting options |
| **orjson** | Fast JSON parsing | Performance improvements |
| **pyobjc** (macOS) | Notifications | API changes, compatibility |
### Textual Changelog Review Checklist
When reviewing Textual releases, check for:
1. **New widgets** - Could replace custom implementations
2. **CSS features** - New selectors, pseudo-classes, properties
3. **Theming updates** - New theme variables, design token changes
4. **Performance** - Rendering optimizations, memory improvements
5. **Breaking changes** - Deprecated APIs, signature changes
6. **Worker improvements** - Background task handling
### Python Version Upgrade
#### Current Status
- Check `.python-version` and `pyproject.toml` for current Python version
- Evaluate upgrade to Python 3.13 or 3.14 when stable
#### Python 3.13 Features to Consider
- Improved error messages
- Type system enhancements (`typing` module improvements)
- Performance optimizations (PEP 709 - inline comprehensions)
#### Python 3.14 Considerations
- **Status:** Currently in alpha/beta (as of Dec 2024)
- **Expected stable release:** October 2025
- **Recommendation:** Wait for stable release before adopting
- **Pre-release testing:** Can test compatibility in CI/CD before adoption
#### Upgrade Checklist
1. [ ] Review Python release notes for breaking changes
2. [ ] Check library compatibility (especially `pyobjc`, `textual`, `msal`)
3. [ ] Update `.python-version` (mise/pyenv)
4. [ ] Update `pyproject.toml` `requires-python` field
5. [ ] Run full test suite
6. [ ] Test on both macOS and Linux (if applicable)
7. [ ] Update CI/CD Python version
### Action Items
1. **Quarterly Review** - Schedule quarterly reviews of library changelogs
2. **Dependabot/Renovate** - Consider adding automated dependency update PRs
3. **Changelog Reading** - Before updating, read changelogs for breaking changes
4. **Test Coverage** - Ensure adequate test coverage before major updates

View File

@@ -18,6 +18,8 @@ from textual.reactive import reactive
from src.calendar.backend import CalendarBackend, Event
from src.calendar.widgets.WeekGrid import WeekGrid
from src.calendar.widgets.MonthCalendar import MonthCalendar
from src.calendar.widgets.InvitesPanel import InvitesPanel, CalendarInvite
from src.calendar.widgets.AddEventForm import EventFormData
from src.utils.shared_config import get_theme_name
@@ -51,6 +53,33 @@ class CalendarApp(App):
Screen {
layout: vertical;
}
#main-content {
layout: horizontal;
height: 1fr;
}
#sidebar {
width: 26;
border-right: solid $surface-darken-1;
background: $surface;
padding: 1 0;
}
#sidebar.hidden {
display: none;
}
#sidebar-calendar {
height: auto;
}
#sidebar-invites {
height: auto;
margin-top: 1;
border-top: solid $surface-darken-1;
padding-top: 1;
}
#week-grid {
height: 1fr;
@@ -98,6 +127,8 @@ class CalendarApp(App):
Binding("L", "next_week", "Next Week", show=True),
Binding("g", "goto_today", "Today", show=True),
Binding("w", "toggle_weekends", "Weekends", show=True),
Binding("s", "toggle_sidebar", "Sidebar", show=True),
Binding("i", "focus_invites", "Invites", show=True),
Binding("r", "refresh", "Refresh", show=True),
Binding("enter", "view_event", "View", show=True),
Binding("a", "add_event", "Add", show=True),
@@ -106,12 +137,15 @@ class CalendarApp(App):
# Reactive attributes
include_weekends: reactive[bool] = reactive(True)
show_sidebar: reactive[bool] = reactive(True)
# Instance attributes
backend: Optional[CalendarBackend]
_invites: list[CalendarInvite]
def __init__(self, backend: Optional[CalendarBackend] = None):
super().__init__()
self._invites = []
if backend:
self.backend = backend
@@ -124,7 +158,11 @@ class CalendarApp(App):
def compose(self) -> ComposeResult:
"""Create the app layout."""
yield Header()
yield WeekGrid(id="week-grid")
with Horizontal(id="main-content"):
with Vertical(id="sidebar"):
yield MonthCalendar(id="sidebar-calendar")
yield InvitesPanel(id="sidebar-invites")
yield WeekGrid(id="week-grid")
yield Static(id="event-detail", classes="hidden")
yield CalendarStatusBar(id="status-bar")
yield Footer()
@@ -136,10 +174,88 @@ class CalendarApp(App):
# Load events for current week
self.load_events()
# Sync sidebar calendar with current week
self._sync_sidebar_calendar()
# Load invites in background
self.run_worker(self._load_invites_async(), exclusive=True)
# Update status bar and title
self._update_status()
self._update_title()
async def _load_invites_async(self) -> None:
"""Load pending calendar invites from Microsoft Graph."""
try:
from src.services.microsoft_graph.auth import get_access_token
from src.services.microsoft_graph.calendar import fetch_pending_invites
from dateutil import parser as date_parser
# Get auth token
scopes = ["https://graph.microsoft.com/Calendars.Read"]
_, headers = get_access_token(scopes)
# Fetch invites
raw_invites = await fetch_pending_invites(headers, days_forward=30)
# Convert to CalendarInvite objects
invites = []
for inv in raw_invites:
try:
start_str = inv.get("start", {}).get("dateTime", "")
end_str = inv.get("end", {}).get("dateTime", "")
start_dt = (
date_parser.parse(start_str) if start_str else datetime.now()
)
end_dt = date_parser.parse(end_str) if end_str else start_dt
organizer_data = inv.get("organizer", {}).get("emailAddress", {})
organizer_name = organizer_data.get(
"name", organizer_data.get("address", "Unknown")
)
invite = CalendarInvite(
id=inv.get("id", ""),
subject=inv.get("subject", "No Subject"),
organizer=organizer_name,
start=start_dt,
end=end_dt,
location=inv.get("location", {}).get("displayName"),
is_all_day=inv.get("isAllDay", False),
response_status=inv.get("responseStatus", {}).get(
"response", "notResponded"
),
)
invites.append(invite)
except Exception as e:
logger.warning(f"Failed to parse invite: {e}")
# Update the panel
self._invites = invites
self.call_from_thread(self._update_invites_panel)
except Exception as e:
logger.warning(f"Failed to load invites: {e}")
# Silently fail - invites are optional
def _update_invites_panel(self) -> None:
"""Update the invites panel with loaded invites."""
try:
panel = self.query_one("#sidebar-invites", InvitesPanel)
panel.set_invites(self._invites)
except Exception:
pass
def _sync_sidebar_calendar(self) -> None:
"""Sync the sidebar calendar with the main week grid."""
try:
grid = self.query_one("#week-grid", WeekGrid)
calendar = self.query_one("#sidebar-calendar", MonthCalendar)
calendar.update_week(grid.week_start)
calendar.update_selected(grid.get_cursor_date())
except Exception:
pass # Sidebar might not exist yet
def load_events(self) -> None:
"""Load events from backend for the current week."""
if not self.backend:
@@ -255,11 +371,22 @@ class CalendarApp(App):
def on_week_grid_week_changed(self, message: WeekGrid.WeekChanged) -> None:
"""Handle week change - reload events."""
self.load_events()
self._sync_sidebar_calendar()
def on_week_grid_event_selected(self, message: WeekGrid.EventSelected) -> None:
"""Handle event selection."""
self._update_event_detail(message.event)
# Handle MonthCalendar messages
def on_month_calendar_date_selected(
self, message: MonthCalendar.DateSelected
) -> None:
"""Handle date selection from sidebar calendar."""
grid = self.query_one("#week-grid", WeekGrid)
grid.goto_date(message.date)
self.load_events()
self._sync_sidebar_calendar()
# Navigation actions (forwarded to grid)
def action_cursor_down(self) -> None:
"""Move cursor down."""
@@ -311,6 +438,15 @@ class CalendarApp(App):
mode = "7 days" if self.include_weekends else "5 days (weekdays)"
self.notify(f"Showing {mode}")
def action_toggle_sidebar(self) -> None:
"""Toggle sidebar visibility."""
self.show_sidebar = not self.show_sidebar
sidebar = self.query_one("#sidebar", Vertical)
if self.show_sidebar:
sidebar.remove_class("hidden")
else:
sidebar.add_class("hidden")
def action_refresh(self) -> None:
"""Refresh events from backend."""
self.load_events()
@@ -383,6 +519,8 @@ Keybindings:
H/L - Previous/Next week
g - Go to today
w - Toggle weekends (5/7 days)
s - Toggle sidebar
i - Focus invites panel
Enter - View event details
a - Add new event
r - Refresh
@@ -390,6 +528,16 @@ Keybindings:
"""
self.notify(help_text.strip(), timeout=10)
def action_focus_invites(self) -> None:
"""Focus on the invites panel and show invite count."""
if not self.show_sidebar:
self.action_toggle_sidebar()
if self._invites:
self.notify(f"You have {len(self._invites)} pending invite(s)")
else:
self.notify("No pending invites")
def run_app(backend: Optional[CalendarBackend] = None) -> None:
"""Run the Calendar TUI application."""

View File

@@ -0,0 +1,208 @@
"""Calendar invites panel widget for Calendar TUI sidebar.
Displays pending calendar invites that need a response.
"""
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional
from rich.segment import Segment
from rich.style import Style
from textual.message import Message
from textual.reactive import reactive
from textual.strip import Strip
from textual.widget import Widget
@dataclass
class CalendarInvite:
"""A calendar invite pending response."""
id: str
subject: str
organizer: str
start: datetime
end: datetime
location: Optional[str] = None
is_all_day: bool = False
response_status: str = "notResponded" # notResponded, tentativelyAccepted
class InvitesPanel(Widget):
"""Panel showing pending calendar invites."""
DEFAULT_CSS = """
InvitesPanel {
width: 100%;
height: auto;
min-height: 3;
padding: 0 1;
border: round $primary;
border-title-color: $primary;
}
"""
# Reactive attributes
invites: reactive[List[CalendarInvite]] = reactive(list)
selected_index: reactive[int] = reactive(0)
class InviteSelected(Message):
"""An invite was selected."""
def __init__(self, invite: CalendarInvite) -> None:
super().__init__()
self.invite = invite
class InviteRespond(Message):
"""User wants to respond to an invite."""
def __init__(self, invite: CalendarInvite, response: str) -> None:
super().__init__()
self.invite = invite
self.response = response # accept, tentativelyAccept, decline
def __init__(
self,
invites: Optional[List[CalendarInvite]] = None,
name: Optional[str] = None,
id: Optional[str] = None,
classes: Optional[str] = None,
) -> None:
super().__init__(name=name, id=id, classes=classes)
if invites:
self.invites = invites
def on_mount(self) -> None:
"""Set border title on mount."""
self._update_border_title()
def _update_border_title(self) -> None:
"""Update border title with invite count."""
count = len(self.invites)
self.border_title = f"Invites ({count})" if count else "Invites"
def _get_theme_color(self, color_name: str) -> str:
"""Get a color from the current theme."""
try:
theme = self.app.current_theme
color = getattr(theme, color_name, None)
if color:
return str(color)
except Exception:
pass
# Fallback colors
fallbacks = {
"secondary": "#81A1C1",
"primary": "#88C0D0",
"accent": "#B48EAD",
"foreground": "#D8DEE9",
"surface": "#3B4252",
"warning": "#EBCB8B",
}
return fallbacks.get(color_name, "white")
def get_content_height(self, container, viewport, width: int) -> int:
"""Calculate height: invite rows only (no internal header)."""
if not self.invites:
return 1 # "No pending invites"
return len(self.invites) * 2 # 2 lines per invite
def render_line(self, y: int) -> Strip:
"""Render a line of the panel."""
if not self.invites:
if y == 0:
return self._render_empty_message()
return Strip.blank(self.size.width)
# Each invite takes 2 lines
invite_idx = y // 2
line_in_invite = y % 2
if 0 <= invite_idx < len(self.invites):
return self._render_invite_line(
self.invites[invite_idx],
line_in_invite,
invite_idx == self.selected_index,
)
return Strip.blank(self.size.width)
def _render_empty_message(self) -> Strip:
"""Render empty state message."""
msg = "No pending invites"
msg = msg[: self.size.width].ljust(self.size.width)
style = Style(color="bright_black")
return Strip([Segment(msg, style)])
def _render_invite_line(
self, invite: CalendarInvite, line: int, is_selected: bool
) -> Strip:
"""Render a line of an invite."""
if line == 0:
# First line: subject (truncated)
text = invite.subject[: self.size.width - 2]
if is_selected:
text = "> " + text[: self.size.width - 2]
text = text[: self.size.width].ljust(self.size.width)
if is_selected:
style = Style(bold=True, reverse=True)
else:
style = Style()
return Strip([Segment(text, style)])
else:
# Second line: date/time and organizer
date_str = invite.start.strftime("%m/%d %H:%M")
organizer = invite.organizer[:15] if invite.organizer else ""
info = f" {date_str} - {organizer}"
info = info[: self.size.width].ljust(self.size.width)
warning_color = self._get_theme_color("warning")
if invite.response_status == "tentativelyAccepted":
style = Style(color=warning_color, italic=True)
else:
style = Style(color="bright_black")
return Strip([Segment(info, style)])
def set_invites(self, invites: List[CalendarInvite]) -> None:
"""Update the list of invites."""
self.invites = invites
if self.selected_index >= len(invites):
self.selected_index = max(0, len(invites) - 1)
self._update_border_title()
self.refresh()
def select_next(self) -> None:
"""Select the next invite."""
if self.invites and self.selected_index < len(self.invites) - 1:
self.selected_index += 1
self.refresh()
def select_previous(self) -> None:
"""Select the previous invite."""
if self.invites and self.selected_index > 0:
self.selected_index -= 1
self.refresh()
def get_selected_invite(self) -> Optional[CalendarInvite]:
"""Get the currently selected invite."""
if self.invites and 0 <= self.selected_index < len(self.invites):
return self.invites[self.selected_index]
return None
def on_click(self, event) -> None:
"""Handle mouse clicks."""
y = event.y
if not self.invites:
return
# Calculate which invite was clicked (2 lines per invite)
invite_idx = y // 2
if 0 <= invite_idx < len(self.invites):
self.selected_index = invite_idx
self.post_message(self.InviteSelected(self.invites[invite_idx]))
self.refresh()

View File

@@ -16,15 +16,21 @@ from textual.reactive import reactive
from textual.strip import Strip
from textual.widget import Widget
from src.calendar import config
def get_month_calendar(year: int, month: int) -> list[list[Optional[date]]]:
def get_month_calendar(
year: int, month: int, week_start_day: int = 0
) -> list[list[Optional[date]]]:
"""Generate a calendar grid for a month.
Returns a list of weeks, where each week is a list of 7 dates (or None for empty cells).
Week starts on Monday.
"""
import calendar
Args:
year: The year
month: The month (1-12)
week_start_day: Config format (0=Sunday, 1=Monday, ..., 6=Saturday)
"""
# Get first day of month and number of days
first_day = date(year, month, 1)
if month == 12:
@@ -32,11 +38,17 @@ def get_month_calendar(year: int, month: int) -> list[list[Optional[date]]]:
else:
last_day = date(year, month + 1, 1) - timedelta(days=1)
# Monday = 0, Sunday = 6
first_weekday = first_day.weekday()
# Convert config week start to python weekday
# Config: 0=Sunday, 1=Monday, ..., 6=Saturday
# Python: 0=Monday, 1=Tuesday, ..., 6=Sunday
python_week_start = (week_start_day - 1) % 7
# Calculate which day of the week the first day falls on (relative to week start)
first_python_weekday = first_day.weekday() # 0=Monday, 6=Sunday
days_offset = (first_python_weekday - python_week_start) % 7
weeks: list[list[Optional[date]]] = []
current_week: list[Optional[date]] = [None] * first_weekday
current_week: list[Optional[date]] = [None] * days_offset
current = first_day
while current <= last_day:
@@ -55,6 +67,22 @@ def get_month_calendar(year: int, month: int) -> list[list[Optional[date]]]:
return weeks
def get_day_names(week_start_day: int = 0) -> str:
"""Get day name headers based on week start day.
Args:
week_start_day: Config format (0=Sunday, 1=Monday, ..., 6=Saturday)
Returns:
String like "Su Mo Tu We Th Fr Sa" or "Mo Tu We Th Fr Sa Su"
"""
# Full list starting from Sunday (config day 0)
all_days = ["Su", "Mo", "Tu", "We", "Th", "Fr", "Sa"]
# Rotate to start from week_start_day
rotated = all_days[week_start_day:] + all_days[:week_start_day]
return " ".join(rotated)
class MonthCalendar(Widget):
"""A compact month calendar widget for sidebars."""
@@ -62,7 +90,7 @@ class MonthCalendar(Widget):
MonthCalendar {
width: 24;
height: auto;
padding: 0 1;
padding: 0;
}
"""
@@ -122,7 +150,11 @@ class MonthCalendar(Widget):
@property
def _weeks(self) -> list[list[Optional[date]]]:
"""Get the weeks for the current display month."""
return get_month_calendar(self.display_month.year, self.display_month.month)
return get_month_calendar(
self.display_month.year,
self.display_month.month,
config.week_start_day(),
)
def get_content_height(self, container, viewport, width: int) -> int:
"""Calculate height: header + day names + weeks."""
@@ -152,8 +184,8 @@ class MonthCalendar(Widget):
return Strip([Segment(header, style)])
def _render_day_names(self) -> Strip:
"""Render the day name headers (Mo Tu We ...)."""
day_names = "Mo Tu We Th Fr Sa Su"
"""Render the day name headers based on week start setting."""
day_names = get_day_names(config.week_start_day())
# Pad to widget width
line = day_names[: self.size.width].ljust(self.size.width)
style = Style(color="bright_black")
@@ -240,3 +272,41 @@ class MonthCalendar(Widget):
self.display_month = date(year, month, 1)
self.post_message(self.MonthChanged(self.display_month))
self.refresh()
def on_click(self, event) -> None:
"""Handle mouse clicks on the calendar."""
# Row 0 is the month header (< Month Year >)
# Row 1 is day names (Mo Tu We ...)
# Row 2+ are the week rows
y = event.y
x = event.x
if y == 0:
# Month header - check for navigation arrows
if x < 2:
self.prev_month()
elif x >= self.size.width - 2:
self.next_month()
return
if y == 1:
# Day names row - ignore
return
# Week row - calculate which date was clicked
week_idx = y - 2
weeks = self._weeks
if week_idx < 0 or week_idx >= len(weeks):
return
week = weeks[week_idx]
# Each day takes 3 characters ("DD "), so find which day column
day_col = x // 3
if day_col < 0 or day_col >= 7:
return
clicked_date = week[day_col]
if clicked_date is not None:
self.selected_date = clicked_date
self.post_message(self.DateSelected(clicked_date))
self.refresh()

View File

@@ -373,8 +373,8 @@ class WeekGridBody(ScrollView):
# Style time label - highlight current time, dim outside work hours
if is_current_time_row:
secondary_color = self._get_theme_color("secondary")
time_style = Style(color=secondary_color, bold=True)
error_color = self._get_theme_color("error")
time_style = Style(color=error_color, bold=True)
elif (
row_index < self._work_day_start * rows_per_hour
or row_index >= self._work_day_end * rows_per_hour
@@ -388,13 +388,19 @@ class WeekGridBody(ScrollView):
# Event cells for each day
for col_idx, day_col in enumerate(self._days):
cell_text, cell_style = self._render_event_cell(day_col, row_index, col_idx)
cell_text, cell_style = self._render_event_cell(
day_col, row_index, col_idx, is_current_time_row
)
segments.append(Segment(cell_text, cell_style))
return Strip(segments)
def _render_event_cell(
self, day_col: DayColumn, row_index: int, col_idx: int
self,
day_col: DayColumn,
row_index: int,
col_idx: int,
is_current_time_row: bool = False,
) -> Tuple[str, Style]:
"""Render a single cell for a day/time slot."""
events_at_row = day_col.grid[row_index] if row_index < len(day_col.grid) else []
@@ -404,10 +410,16 @@ class WeekGridBody(ScrollView):
is_cursor = col_idx == self.cursor_col and row_index == self.cursor_row
# Get colors for current time line
error_color = self._get_theme_color("error") if is_current_time_row else None
if not events_at_row:
# Empty cell
if is_cursor:
return ">" + " " * (day_col_width - 1), Style(reverse=True)
elif is_current_time_row:
# Current time indicator line
return "" * day_col_width, Style(color=error_color, bold=True)
else:
# Grid line style
if row_index % rows_per_hour == 0:
@@ -636,13 +648,10 @@ class WeekGrid(Vertical):
current_row = (now.hour * rows_per_hour) + (now.minute // minutes_per_row)
self.cursor_row = current_row
# Scroll to show work day start initially
# Always scroll to work day start initially (e.g., 7am)
if self._body:
work_start_row = config.work_day_start_hour() * rows_per_hour
# If current time is before work day start, scroll to work day start
# Otherwise scroll to show current time
scroll_target = min(work_start_row, current_row)
self._body.scroll_to(y=scroll_target, animate=False)
self._body.scroll_to(y=work_start_row, animate=False)
def watch_week_start(self, old: date, new: date) -> None:
"""Handle week_start changes."""
@@ -749,3 +758,20 @@ class WeekGrid(Vertical):
event = self.get_event_at_cursor()
if event:
self.post_message(self.EventSelected(event))
def goto_date(self, target_date: date) -> None:
"""Navigate to a specific date.
Sets the week to contain the target date and places cursor on that day.
"""
# Get the week start for the target date
week_start_date = get_week_start_for_date(target_date)
if self.week_start != week_start_date:
self.week_start = week_start_date
# Set cursor column to the target date
col = get_day_column_for_date(target_date, self.week_start)
if not self.include_weekends and col >= 5:
col = 4 # Last weekday if weekend
self.cursor_col = col

View File

@@ -3,5 +3,13 @@
from .WeekGrid import WeekGrid
from .AddEventForm import AddEventForm, EventFormData
from .MonthCalendar import MonthCalendar
from .InvitesPanel import InvitesPanel, CalendarInvite
__all__ = ["WeekGrid", "AddEventForm", "EventFormData", "MonthCalendar"]
__all__ = [
"WeekGrid",
"AddEventForm",
"EventFormData",
"MonthCalendar",
"InvitesPanel",
"CalendarInvite",
]

View File

@@ -1,37 +1,55 @@
# CLI module for the application
# Uses lazy imports to speed up startup time
import click
from .sync import sync
from .drive import drive
from .email import email
from .calendar import calendar
from .ticktick import ticktick
from .godspeed import godspeed
from .gitlab_monitor import gitlab_monitor
from .tasks import tasks
import importlib
@click.group()
class LazyGroup(click.Group):
"""A click Group that lazily loads subcommands."""
def __init__(self, *args, lazy_subcommands=None, **kwargs):
super().__init__(*args, **kwargs)
self._lazy_subcommands = lazy_subcommands or {}
def list_commands(self, ctx):
base = super().list_commands(ctx)
lazy = list(self._lazy_subcommands.keys())
return sorted(base + lazy)
def get_command(self, ctx, cmd_name):
if cmd_name in self._lazy_subcommands:
return self._load_command(cmd_name)
return super().get_command(ctx, cmd_name)
def _load_command(self, cmd_name):
module_path, attr_name = self._lazy_subcommands[cmd_name]
# Handle relative imports
if module_path.startswith("."):
module = importlib.import_module(module_path, package="src.cli")
else:
module = importlib.import_module(module_path)
return getattr(module, attr_name)
# Create CLI with lazy loading - commands only imported when invoked
@click.group(
cls=LazyGroup,
lazy_subcommands={
"sync": (".sync", "sync"),
"drive": (".drive", "drive"),
"email": (".email", "email"),
"mail": (".email", "email"), # alias
"calendar": (".calendar", "calendar"),
"ticktick": (".ticktick", "ticktick"),
"tt": (".ticktick", "ticktick"), # alias
"godspeed": (".godspeed", "godspeed"),
"gs": (".godspeed", "godspeed"), # alias
"gitlab_monitor": (".gitlab_monitor", "gitlab_monitor"),
"glm": (".gitlab_monitor", "gitlab_monitor"), # alias
"tasks": (".tasks", "tasks"),
},
)
def cli():
"""Root command for the CLI."""
"""LUK - Local Unix Kit for productivity."""
pass
cli.add_command(sync)
cli.add_command(drive)
cli.add_command(email)
cli.add_command(calendar)
cli.add_command(ticktick)
cli.add_command(godspeed)
cli.add_command(gitlab_monitor)
cli.add_command(tasks)
# Add 'mail' as an alias for email
cli.add_command(email, name="mail")
# Add 'tt' as a short alias for ticktick
cli.add_command(ticktick, name="tt")
# Add 'gs' as a short alias for godspeed
cli.add_command(godspeed, name="gs")
# Add 'glm' as a short alias for gitlab_monitor
cli.add_command(gitlab_monitor, name="glm")

View File

@@ -588,10 +588,149 @@ async def _sync_outlook_data(
click.echo("Sync complete.")
@click.group()
def sync():
"""Email and calendar synchronization."""
pass
@click.group(invoke_without_command=True)
@click.option(
"--once",
is_flag=True,
help="Run a single sync and exit (non-interactive).",
default=False,
)
@click.option(
"--daemon",
is_flag=True,
help="Run in background daemon mode.",
default=False,
)
@click.option(
"--org",
help="Specify the organization name for the subfolder to store emails and calendar events",
default="corteva",
)
@click.option(
"--vdir",
help="Output calendar events in vdir format to the specified directory",
default="~/Calendar",
)
@click.option(
"--notify/--no-notify",
help="Send macOS notifications for new email messages",
default=True,
)
@click.option(
"--dry-run",
is_flag=True,
help="Run in dry-run mode without making changes.",
default=False,
)
@click.option(
"--demo",
is_flag=True,
help="Run with simulated sync (demo mode)",
default=False,
)
@click.option(
"--days-back",
type=int,
help="Number of days to look back for calendar events",
default=1,
)
@click.option(
"--days-forward",
type=int,
help="Number of days to look forward for calendar events",
default=30,
)
@click.option(
"--download-attachments",
is_flag=True,
help="Download email attachments",
default=False,
)
@click.option(
"--two-way-calendar",
is_flag=True,
help="Enable two-way calendar sync (sync local changes to server)",
default=False,
)
@click.pass_context
def sync(
ctx,
once,
daemon,
org,
vdir,
notify,
dry_run,
demo,
days_back,
days_forward,
download_attachments,
two_way_calendar,
):
"""Email and calendar synchronization.
By default, opens the interactive TUI dashboard.
Use --once for a single sync, or --daemon for background mode.
"""
# If a subcommand is invoked, let it handle everything
if ctx.invoked_subcommand is not None:
return
# Handle the default behavior (no subcommand)
if daemon:
# Run in daemon mode
from .sync_daemon import create_daemon_config, SyncDaemon
config = create_daemon_config(
dry_run=dry_run,
vdir=vdir,
icsfile=None,
org=org,
days_back=days_back,
days_forward=days_forward,
continue_iteration=False,
download_attachments=download_attachments,
two_way_calendar=two_way_calendar,
notify=notify,
)
daemon_instance = SyncDaemon(config)
daemon_instance.start()
elif once:
# Run a single sync (non-interactive)
asyncio.run(
_sync_outlook_data(
dry_run,
vdir,
None, # icsfile
org,
days_back,
days_forward,
False, # continue_iteration
download_attachments,
two_way_calendar,
notify,
)
)
else:
# Default: Launch interactive TUI dashboard
from .sync_dashboard import run_dashboard_sync
sync_config = {
"org": org,
"vdir": vdir,
"notify": notify,
"dry_run": dry_run,
"days_back": days_back,
"days_forward": days_forward,
"download_attachments": download_attachments,
"two_way_calendar": two_way_calendar,
"continue_iteration": False,
"icsfile": None,
}
asyncio.run(
run_dashboard_sync(notify=notify, sync_config=sync_config, demo_mode=demo)
)
def daemonize():
@@ -682,18 +821,6 @@ def daemonize():
help="Enable two-way calendar sync (sync local changes to server)",
default=False,
)
@click.option(
"--daemon",
is_flag=True,
help="Run in daemon mode.",
default=False,
)
@click.option(
"--dashboard",
is_flag=True,
help="Run with TUI dashboard.",
default=False,
)
@click.option(
"--notify",
is_flag=True,
@@ -710,59 +837,23 @@ def run(
continue_iteration,
download_attachments,
two_way_calendar,
daemon,
dashboard,
notify,
):
if dashboard:
from .sync_dashboard import run_dashboard_sync
sync_config = {
"dry_run": dry_run,
"vdir": vdir,
"icsfile": icsfile,
"org": org,
"days_back": days_back,
"days_forward": days_forward,
"continue_iteration": continue_iteration,
"download_attachments": download_attachments,
"two_way_calendar": two_way_calendar,
"notify": notify,
}
asyncio.run(run_dashboard_sync(notify=notify, sync_config=sync_config))
elif daemon:
from .sync_daemon import create_daemon_config, SyncDaemon
config = create_daemon_config(
dry_run=dry_run,
vdir=vdir,
icsfile=icsfile,
org=org,
days_back=days_back,
days_forward=days_forward,
continue_iteration=continue_iteration,
download_attachments=download_attachments,
two_way_calendar=two_way_calendar,
notify=notify,
)
daemon_instance = SyncDaemon(config)
daemon_instance.start()
else:
asyncio.run(
_sync_outlook_data(
dry_run,
vdir,
icsfile,
org,
days_back,
days_forward,
continue_iteration,
download_attachments,
two_way_calendar,
notify,
)
"""Run a single sync operation (legacy command, prefer 'luk sync --once')."""
asyncio.run(
_sync_outlook_data(
dry_run,
vdir,
icsfile,
org,
days_back,
days_forward,
continue_iteration,
download_attachments,
two_way_calendar,
notify,
)
)
@sync.command()

View File

@@ -14,6 +14,7 @@ from typing import Optional, Dict, Any
from src.cli.sync import _sync_outlook_data, should_run_godspeed_sync, should_run_sweep
from src.cli.sync import run_godspeed_sync, run_task_sweep, load_sync_state
from src.utils.ipc import notify_all, notify_refresh
class SyncDaemon:
@@ -247,6 +248,13 @@ class SyncDaemon:
notify=self.config.get("notify", False),
)
self.logger.info("Sync completed successfully")
# Notify all running TUI apps to refresh their data
results = await notify_all({"source": "sync_daemon"})
notified = [app for app, success in results.items() if success]
if notified:
self.logger.info(f"Notified apps to refresh: {', '.join(notified)}")
except Exception as e:
self.logger.error(f"Sync failed: {e}")

View File

@@ -216,16 +216,10 @@ class SyncDashboard(App):
.sidebar {
width: 30;
height: 100%;
border: solid $primary;
border: round $primary;
padding: 0;
}
.sidebar-title {
text-style: bold;
padding: 1;
background: $primary-darken-2;
}
.countdown-container {
height: 5;
padding: 0 1;
@@ -269,15 +263,10 @@ class SyncDashboard(App):
.log-container {
height: 1fr;
border: solid $primary;
border: round $primary;
padding: 0;
}
.log-title {
padding: 0 1;
background: $primary-darken-2;
}
ListView {
height: 1fr;
}
@@ -338,8 +327,7 @@ class SyncDashboard(App):
with Horizontal(classes="dashboard"):
# Sidebar with task list
with Vertical(classes="sidebar"):
yield Static("Tasks", classes="sidebar-title")
with Vertical(classes="sidebar", id="tasks-sidebar"):
yield ListView(
# Stage 1: Sync local changes to server
TaskListItem(
@@ -416,8 +404,7 @@ class SyncDashboard(App):
yield Static("0%", id="progress-percent")
# Log for selected task
with Vertical(classes="log-container"):
yield Static("Activity Log", classes="log-title")
with Vertical(classes="log-container", id="log-container"):
yield Log(id="task-log")
yield Footer()
@@ -427,6 +414,13 @@ class SyncDashboard(App):
# Set theme from shared config
self.theme = get_theme_name()
# Set border titles
try:
self.query_one("#tasks-sidebar").border_title = "Tasks"
self.query_one("#log-container").border_title = "Activity Log"
except Exception:
pass
# Store references to task items
task_list = self.query_one("#task-list", ListView)
for item in task_list.children:

View File

@@ -4,12 +4,14 @@ from .widgets.ContentContainer import ContentContainer
from .widgets.EnvelopeListItem import EnvelopeListItem, GroupHeader
from .screens.LinkPanel import LinkPanel
from .screens.ConfirmDialog import ConfirmDialog
from .screens.SearchPanel import SearchPanel
from .actions.task import action_create_task
from .actions.open import action_open
from .actions.delete import delete_current
from src.services.taskwarrior import client as taskwarrior_client
from src.services.himalaya import client as himalaya_client
from src.utils.shared_config import get_theme_name
from src.utils.ipc import IPCListener, IPCMessage
from textual.containers import Container, ScrollableContainer, Vertical, Horizontal
from textual.timer import Timer
from textual.binding import Binding
@@ -72,6 +74,9 @@ class EmailViewerApp(App):
sort_order_ascending: Reactive[bool] = reactive(True)
selected_messages: Reactive[set[int]] = reactive(set())
main_content_visible: Reactive[bool] = reactive(True)
search_query: Reactive[str] = reactive("") # Current search filter
search_mode: Reactive[bool] = reactive(False) # True when showing search results
_cached_envelopes: List[Dict[str, Any]] = [] # Cached envelopes before search
def get_system_commands(self, screen: Screen) -> Iterable[SystemCommand]:
yield from super().get_system_commands(screen)
@@ -125,10 +130,12 @@ class EmailViewerApp(App):
Binding("x", "toggle_selection", "Toggle selection", show=False),
Binding("space", "toggle_selection", "Toggle selection"),
Binding("escape", "clear_selection", "Clear selection"),
Binding("/", "search", "Search"),
]
)
def compose(self) -> ComposeResult:
yield SearchPanel(id="search_panel")
yield Horizontal(
Vertical(
ListView(
@@ -149,7 +156,7 @@ class EmailViewerApp(App):
async def on_mount(self) -> None:
self.alert_timer: Timer | None = None # Timer to throttle alerts
self.theme = get_theme_name()
self.title = "MaildirGTD"
self.title = "LUK Mail"
self.query_one("#main_content").border_title = self.status_title
sort_indicator = "" if self.sort_order_ascending else ""
self.query_one("#envelopes_list").border_title = f"1⃣ Emails {sort_indicator}"
@@ -157,6 +164,10 @@ class EmailViewerApp(App):
self.query_one("#folders_list").border_title = "3⃣ Folders"
# Start IPC listener for refresh notifications from sync daemon
self._ipc_listener = IPCListener("mail", self._on_ipc_message)
self._ipc_listener.start()
self.fetch_accounts()
self.fetch_folders()
worker = self.fetch_envelopes()
@@ -164,6 +175,12 @@ class EmailViewerApp(App):
self.query_one("#envelopes_list").focus()
self.action_oldest()
def _on_ipc_message(self, message: IPCMessage) -> None:
"""Handle IPC messages from sync daemon."""
if message.event == "refresh":
# Schedule a reload on the main thread
self.call_from_thread(self.fetch_envelopes)
def compute_status_title(self):
metadata = self.message_store.get_metadata(self.current_message_id)
message_date = metadata["date"] if metadata else "N/A"
@@ -339,7 +356,9 @@ class EmailViewerApp(App):
self.current_message_id = 0
self.current_message_index = 0
self.selected_messages.clear()
self.reload_needed = True
self.search_query = "" # Clear search when switching folders
# Directly fetch instead of relying on reload_needed watcher
self.fetch_envelopes()
except Exception as e:
logging.error(f"Error selecting folder: {e}")
@@ -358,9 +377,11 @@ class EmailViewerApp(App):
self.current_message_id = 0
self.current_message_index = 0
self.selected_messages.clear()
self.search_query = "" # Clear search when switching accounts
# Refresh folders for new account
self.fetch_folders()
self.reload_needed = True
# Directly fetch instead of relying on reload_needed watcher
self.fetch_envelopes()
except Exception as e:
logging.error(f"Error selecting account: {e}")
@@ -820,6 +841,9 @@ class EmailViewerApp(App):
self.query_one("#envelopes_list").focus()
def action_quit(self) -> None:
# Stop IPC listener before exiting
if hasattr(self, "_ipc_listener"):
self._ipc_listener.stop()
self.exit()
def action_toggle_selection(self) -> None:
@@ -858,10 +882,31 @@ class EmailViewerApp(App):
self._update_list_view_subtitle()
def action_clear_selection(self) -> None:
"""Clear all selected messages."""
self.selected_messages.clear()
self.refresh_list_view_items() # Refresh all items to uncheck checkboxes
self._update_list_view_subtitle()
"""Clear all selected messages and exit search mode."""
if self.selected_messages:
self.selected_messages.clear()
self.refresh_list_view_items() # Refresh all items to uncheck checkboxes
self._update_list_view_subtitle()
# Exit search mode if active
if self.search_mode:
search_panel = self.query_one("#search_panel", SearchPanel)
search_panel.hide()
self.search_mode = False
self.search_query = ""
# Restore cached envelopes
if self._cached_envelopes:
self.message_store.envelopes = self._cached_envelopes
self._cached_envelopes = []
self._populate_list_view()
# Restore envelope list title
sort_indicator = "" if self.sort_order_ascending else ""
self.query_one(
"#envelopes_list"
).border_title = f"1⃣ Emails {sort_indicator}"
self._update_list_view_subtitle()
def action_oldest(self) -> None:
self.fetch_envelopes() if self.reload_needed else None
@@ -871,6 +916,127 @@ class EmailViewerApp(App):
self.fetch_envelopes() if self.reload_needed else None
self.show_message(self.message_store.get_newest_id())
def action_search(self) -> None:
"""Open the search panel."""
search_panel = self.query_one("#search_panel", SearchPanel)
if not search_panel.is_visible:
# Cache current envelopes before searching
self._cached_envelopes = list(self.message_store.envelopes)
search_panel.show(self.search_query)
def on_search_panel_search_requested(
self, event: SearchPanel.SearchRequested
) -> None:
"""Handle live search request from search panel."""
self._perform_search(event.query, focus_results=False)
def on_search_panel_search_confirmed(
self, event: SearchPanel.SearchConfirmed
) -> None:
"""Handle confirmed search (Enter key) - search and focus results."""
self._perform_search(event.query, focus_results=True)
def on_search_panel_search_cancelled(
self, event: SearchPanel.SearchCancelled
) -> None:
"""Handle search cancellation - restore previous envelope list."""
self.search_mode = False
self.search_query = ""
# Restore cached envelopes
if self._cached_envelopes:
self.message_store.envelopes = self._cached_envelopes
self._cached_envelopes = []
self._populate_list_view()
# Restore envelope list title
sort_indicator = "" if self.sort_order_ascending else ""
self.query_one("#envelopes_list").border_title = f"1⃣ Emails {sort_indicator}"
self._update_list_view_subtitle()
self.query_one("#envelopes_list").focus()
@work(exclusive=True)
async def _perform_search(self, query: str, focus_results: bool = False) -> None:
"""Perform search using Himalaya and display results in envelope list."""
search_panel = self.query_one("#search_panel", SearchPanel)
search_panel.update_status(-1, searching=True)
folder = self.folder if self.folder else None
account = self.current_account if self.current_account else None
results, success = await himalaya_client.search_envelopes(
query, folder=folder, account=account
)
if not success:
search_panel.update_status(0, searching=False)
self.show_status("Search failed", "error")
return
# Update search panel status
search_panel.update_status(len(results), searching=False)
if not results:
# Clear the envelope list and show "no results"
self._display_search_results([], query)
return
self.search_query = query
self.search_mode = True
self._display_search_results(results, query)
if focus_results:
# Focus the main content and select first result
if results:
first_id = int(results[0].get("id", 0))
if first_id:
self.current_message_id = first_id
self.action_focus_4()
def _display_search_results(
self, results: List[Dict[str, Any]], query: str
) -> None:
"""Display search results in the envelope list with a header."""
envelopes_list = self.query_one("#envelopes_list", ListView)
envelopes_list.clear()
config = get_config()
# Add search results header
header_label = f"Search: '{query}' ({len(results)} result{'s' if len(results) != 1 else ''})"
envelopes_list.append(ListItem(GroupHeader(label=header_label)))
# Create a temporary message store for search results
search_store = MessageStore()
search_store.load(results, self.sort_order_ascending)
# Store for navigation (replace main store temporarily)
self.message_store.envelopes = search_store.envelopes
self.total_messages = len(results)
for item in search_store.envelopes:
if item and item.get("type") == "header":
envelopes_list.append(ListItem(GroupHeader(label=item["label"])))
elif item:
message_id = int(item.get("id", 0))
is_selected = message_id in self.selected_messages
envelope_widget = EnvelopeListItem(
envelope=item,
config=config.envelope_display,
is_selected=is_selected,
)
envelopes_list.append(ListItem(envelope_widget))
# Update border title to show search mode
sort_indicator = "" if self.sort_order_ascending else ""
self.query_one(
"#envelopes_list"
).border_title = f"Search Results {sort_indicator}"
# Select first result if available
if len(envelopes_list.children) > 1:
envelopes_list.index = 1 # Skip header
def action_focus_1(self) -> None:
self.query_one("#envelopes_list").focus()

View File

@@ -3,7 +3,7 @@
#main_content, .list_view {
scrollbar-size: 1 1;
border: round rgb(117, 106, 129);
border: round $border;
height: 1fr;
}
@@ -43,18 +43,18 @@
#main_content:focus, .list_view:focus {
border: round $secondary;
background: rgb(55, 53, 57);
background: $surface;
border-title-style: bold;
}
Label#task_prompt {
padding: 1;
color: rgb(128,128,128);
color: $text-muted;
}
Label#task_prompt_label {
padding: 1;
color: rgb(255, 216, 102);
color: $warning;
}
Label#message_label {
@@ -66,7 +66,7 @@ StatusTitle {
width: 100%;
height: 1;
color: $text;
background: rgb(64, 62, 65);
background: $panel;
content-align: center middle;
}
@@ -113,8 +113,8 @@ EnvelopeListItem .envelope-row-3 {
}
EnvelopeListItem .status-icon {
width: 3;
padding: 0 1 0 0;
width: 2;
padding: 0;
color: $text-muted;
}
@@ -124,7 +124,7 @@ EnvelopeListItem .status-icon.unread {
EnvelopeListItem .checkbox {
width: 2;
padding: 0 1 0 0;
padding: 0;
}
EnvelopeListItem .sender-name {
@@ -166,11 +166,11 @@ EnvelopeListItem.selected {
GroupHeader {
height: 1;
width: 1fr;
background: rgb(64, 62, 65);
background: $panel;
}
GroupHeader .group-header-label {
color: rgb(160, 160, 160);
color: $text-muted;
text-style: bold;
padding: 0 1;
width: 1fr;
@@ -222,10 +222,10 @@ GroupHeader .group-header-label {
#envelopes_list {
ListItem:odd {
background: rgb(45, 45, 46);
background: $surface;
}
ListItem:even {
background: rgb(50, 50, 56);
background: $surface-darken-1;
}
& > ListItem {
@@ -269,9 +269,9 @@ GroupHeader .group-header-label {
}
Label.group_header {
color: rgb(140, 140, 140);
color: $text-muted;
text-style: bold;
background: rgb(64, 62, 65);
background: $panel;
width: 100%;
padding: 0 1;
}
@@ -300,6 +300,3 @@ ContentContainer {
width: 100%;
height: 1fr;
}
.checkbox {
padding-right: 1;
}

View File

@@ -148,3 +148,69 @@ class MessageStore:
self.total_messages = len(self.metadata_by_id)
else:
logging.warning(f"Invalid index {index} for message ID {message_id}")
def filter_by_query(self, query: str) -> List[Dict[str, Any]]:
"""Filter envelopes by search query.
Searches subject, from name, from address, to name, and to address.
Returns a new list of filtered envelopes (with headers regenerated).
"""
if not query or not query.strip():
return self.envelopes
query_lower = query.lower().strip()
filtered = []
current_month = None
for item in self.envelopes:
if item is None:
continue
# Skip headers - we'll regenerate them
if item.get("type") == "header":
continue
# Check if envelope matches query
# Use "or ''" to handle None values (key exists but value is None)
subject = (item.get("subject") or "").lower()
from_info = item.get("from", {})
from_name = (
(from_info.get("name") or "").lower()
if isinstance(from_info, dict)
else ""
)
from_addr = (
(from_info.get("addr") or "").lower()
if isinstance(from_info, dict)
else ""
)
to_info = item.get("to", {})
to_name = (
(to_info.get("name") or "").lower() if isinstance(to_info, dict) else ""
)
to_addr = (
(to_info.get("addr") or "").lower() if isinstance(to_info, dict) else ""
)
if (
query_lower in subject
or query_lower in from_name
or query_lower in from_addr
or query_lower in to_name
or query_lower in to_addr
):
# Regenerate month header if needed
date_str = item.get("date", "")
try:
date = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
month_key = date.strftime("%B %Y")
except (ValueError, TypeError):
month_key = "Unknown Date"
if month_key != current_month:
current_month = month_key
filtered.append({"type": "header", "label": month_key})
filtered.append(item)
return filtered

View File

@@ -0,0 +1,300 @@
"""Docked search panel for mail app with live search.
Provides a search input docked to the top of the window with:
- Live search with 1 second debounce
- Cancel button to restore previous state
- Help button showing Himalaya search syntax
"""
from typing import Optional, List, Dict, Any
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal, Vertical
from textual.message import Message
from textual.reactive import reactive
from textual.screen import ModalScreen
from textual.timer import Timer
from textual.widget import Widget
from textual.widgets import Button, Input, Label, Static
HIMALAYA_SEARCH_HELP = """
## Himalaya Search Query Syntax
A filter query is composed of operators and conditions:
### Operators
- `not <condition>` - filter envelopes that do NOT match the condition
- `<condition> and <condition>` - filter envelopes matching BOTH conditions
- `<condition> or <condition>` - filter envelopes matching EITHER condition
### Conditions
- `date <yyyy-mm-dd>` - match the given date
- `before <yyyy-mm-dd>` - date strictly before the given date
- `after <yyyy-mm-dd>` - date strictly after the given date
- `from <pattern>` - senders matching the pattern
- `to <pattern>` - recipients matching the pattern
- `subject <pattern>` - subject matching the pattern
- `body <pattern>` - text body matching the pattern
- `flag <flag>` - envelopes with the given flag (e.g., `seen`, `flagged`)
### Examples
- `from john` - emails from anyone named John
- `subject meeting and after 2025-01-01` - meetings after Jan 1st
- `not flag seen` - unread emails
- `from boss or from manager` - emails from boss or manager
- `body urgent and before 2025-12-01` - urgent emails before Dec 1st
### Sort Query
Start with `order by`, followed by:
- `date [asc|desc]`
- `from [asc|desc]`
- `to [asc|desc]`
- `subject [asc|desc]`
Example: `from john order by date desc`
""".strip()
class SearchHelpModal(ModalScreen[None]):
"""Modal showing Himalaya search syntax help."""
DEFAULT_CSS = """
SearchHelpModal {
align: center middle;
}
SearchHelpModal > Vertical {
width: 80;
max-width: 90%;
height: auto;
max-height: 80%;
border: solid $primary;
background: $surface;
padding: 1 2;
}
SearchHelpModal > Vertical > Static {
height: auto;
max-height: 100%;
}
SearchHelpModal > Vertical > Horizontal {
height: auto;
align: center middle;
margin-top: 1;
}
"""
BINDINGS = [
Binding("escape", "close", "Close"),
]
def compose(self) -> ComposeResult:
from textual.widgets import Markdown
with Vertical():
yield Markdown(HIMALAYA_SEARCH_HELP)
with Horizontal():
yield Button("Close", variant="primary", id="close-btn")
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "close-btn":
self.dismiss(None)
def action_close(self) -> None:
self.dismiss(None)
class SearchPanel(Widget):
"""Docked search panel with live search capability."""
DEFAULT_CSS = """
SearchPanel {
dock: top;
height: auto;
width: 100%;
background: $surface;
border-bottom: solid $primary;
padding: 0 1;
display: none;
}
SearchPanel.visible {
display: block;
}
SearchPanel > Horizontal {
height: auto;
width: 100%;
align: left middle;
}
SearchPanel .search-label {
width: auto;
padding: 0 1;
color: $primary;
}
SearchPanel Input {
width: 1fr;
margin: 0 1;
}
SearchPanel Button {
width: auto;
min-width: 8;
margin: 0 0 0 1;
}
SearchPanel .search-status {
width: auto;
padding: 0 1;
color: $text-muted;
}
"""
BINDINGS = [
Binding("escape", "cancel", "Cancel search", show=False),
]
# Reactive to track search state
is_searching: reactive[bool] = reactive(False)
result_count: reactive[int] = reactive(-1) # -1 = no search yet
class SearchRequested(Message):
"""Fired when a search should be performed."""
def __init__(self, query: str) -> None:
super().__init__()
self.query = query
class SearchCancelled(Message):
"""Fired when the search is cancelled."""
pass
class SearchConfirmed(Message):
"""Fired when user presses Enter to confirm search and focus results."""
def __init__(self, query: str) -> None:
super().__init__()
self.query = query
def __init__(
self,
name: Optional[str] = None,
id: Optional[str] = None,
classes: Optional[str] = None,
) -> None:
super().__init__(name=name, id=id, classes=classes)
self._debounce_timer: Optional[Timer] = None
self._last_query: str = ""
def compose(self) -> ComposeResult:
with Horizontal():
yield Label("Search:", classes="search-label")
yield Input(
placeholder="from <name> or subject <text> or body <text>...",
id="search-input",
)
yield Label("", classes="search-status", id="search-status")
yield Button("?", variant="default", id="help-btn")
yield Button("Cancel", variant="warning", id="cancel-btn")
def show(self, initial_query: str = "") -> None:
"""Show the search panel and focus the input."""
self.add_class("visible")
input_widget = self.query_one("#search-input", Input)
input_widget.value = initial_query
self._last_query = initial_query
input_widget.focus()
def hide(self) -> None:
"""Hide the search panel."""
self.remove_class("visible")
self._cancel_debounce()
self.result_count = -1
@property
def is_visible(self) -> bool:
"""Check if the panel is visible."""
return self.has_class("visible")
@property
def search_query(self) -> str:
"""Get the current search query."""
return self.query_one("#search-input", Input).value
def _cancel_debounce(self) -> None:
"""Cancel any pending debounced search."""
if self._debounce_timer:
self._debounce_timer.stop()
self._debounce_timer = None
def _trigger_search(self) -> None:
"""Trigger the actual search after debounce."""
query = self.query_one("#search-input", Input).value.strip()
if query and query != self._last_query:
self._last_query = query
self.is_searching = True
self.post_message(self.SearchRequested(query))
def on_input_changed(self, event: Input.Changed) -> None:
"""Handle input changes with debounce."""
if event.input.id != "search-input":
return
# Cancel any existing timer
self._cancel_debounce()
# Don't search empty queries
if not event.value.strip():
return
# Set up new debounce timer (1 second)
self._debounce_timer = self.set_timer(1.0, self._trigger_search)
def on_input_submitted(self, event: Input.Submitted) -> None:
"""Handle Enter key - trigger search immediately and confirm."""
if event.input.id != "search-input":
return
self._cancel_debounce()
query = event.value.strip()
if query:
self._last_query = query
self.is_searching = True
self.post_message(self.SearchConfirmed(query))
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses."""
if event.button.id == "cancel-btn":
self.action_cancel()
elif event.button.id == "help-btn":
self.app.push_screen(SearchHelpModal())
def action_cancel(self) -> None:
"""Cancel the search and restore previous state."""
self._cancel_debounce()
self.hide()
self.post_message(self.SearchCancelled())
def update_status(self, count: int, searching: bool = False) -> None:
"""Update the search status display."""
self.is_searching = searching
self.result_count = count
status = self.query_one("#search-status", Label)
if searching:
status.update("Searching...")
elif count >= 0:
status.update(f"{count} result{'s' if count != 1 else ''}")
else:
status.update("")
def watch_is_searching(self, searching: bool) -> None:
"""Update UI when searching state changes."""
status = self.query_one("#search-status", Label)
if searching:
status.update("Searching...")

View File

@@ -4,6 +4,7 @@ from .OpenMessage import OpenMessageScreen
from .DocumentViewer import DocumentViewerScreen
from .LinkPanel import LinkPanel, LinkItem, extract_links_from_content
from .ConfirmDialog import ConfirmDialog
from .SearchPanel import SearchPanel, SearchHelpModal
__all__ = [
"CreateTaskScreen",
@@ -13,4 +14,6 @@ __all__ = [
"LinkItem",
"extract_links_from_content",
"ConfirmDialog",
"SearchPanel",
"SearchHelpModal",
]

View File

@@ -44,12 +44,17 @@ class EnvelopeListItem(Static):
EnvelopeListItem .status-icon {
width: 2;
padding: 0 1 0 0;
padding: 0;
}
EnvelopeListItem .checkbox {
width: 2;
padding: 0 1 0 0;
padding: 0;
}
EnvelopeListItem .checkbox {
width: 2;
padding: 0;
}
EnvelopeListItem .sender-name {

View File

@@ -349,3 +349,43 @@ class DstaskClient(TaskBackend):
# This needs to run without capturing output
result = self._run_command(["note", task_id], capture_output=False)
return result.returncode == 0
def get_context(self) -> Optional[str]:
"""Get the current context filter.
Returns:
Current context string, or None if no context is set
"""
result = self._run_command(["context"])
if result.returncode == 0:
context = result.stdout.strip()
return context if context else None
return None
def set_context(self, context: Optional[str]) -> bool:
"""Set the context filter.
Args:
context: Context string (e.g., "+work", "project:foo") or None to clear
Returns:
True if successful
"""
if context is None or context.lower() == "none" or context == "":
result = self._run_command(["context", "none"])
else:
result = self._run_command(["context", context])
return result.returncode == 0
def get_contexts(self) -> list[str]:
"""Get available contexts based on tags.
For dstask, contexts are typically tag-based filters like "+work".
We derive available contexts from the existing tags.
Returns:
List of context strings (tag-based)
"""
# Get all tags and convert to context format
tags = self.get_tags()
return [f"+{tag}" for tag in tags if tag]

View File

@@ -312,6 +312,57 @@ async def mark_as_read(
return str(e), False
async def search_envelopes(
query: str,
folder: Optional[str] = None,
account: Optional[str] = None,
limit: int = 100,
) -> Tuple[List[Dict[str, Any]], bool]:
"""
Search for envelopes matching a query using Himalaya CLI.
The query is searched across from, to, subject, and body fields.
Args:
query: The search term to look for
folder: The folder to search in (defaults to INBOX)
account: The account to use (defaults to default account)
limit: Maximum number of results to return
Returns:
Tuple containing:
- List of matching envelope dictionaries
- Success status (True if operation was successful)
"""
try:
# Build a compound query to search from, to, subject, and body
# Himalaya query syntax: from <pattern> or to <pattern> or subject <pattern> or body <pattern>
search_query = f"from {query} or to {query} or subject {query} or body {query}"
cmd = f"himalaya envelope list -o json -s {limit} {search_query}"
if folder:
cmd += f" -f '{folder}'"
if account:
cmd += f" -a '{account}'"
process = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
envelopes = json.loads(stdout.decode())
return envelopes, True
else:
logging.error(f"Error searching envelopes: {stderr.decode()}")
return [], False
except Exception as e:
logging.error(f"Exception during envelope search: {e}")
return [], False
def sync_himalaya():
"""This command does not exist. Halucinated by AI."""
try:

View File

@@ -468,3 +468,76 @@ async def sync_local_calendar_changes(
)
return created_count, deleted_count
async def fetch_pending_invites(headers, days_forward=30):
"""
Fetch calendar invites that need a response (pending/tentative).
Args:
headers (dict): Headers including authentication.
days_forward (int): Number of days to look forward.
Returns:
list: List of invite dictionaries with response status info.
"""
start_date = datetime.now()
end_date = start_date + timedelta(days=days_forward)
start_date_str = start_date.strftime("%Y-%m-%dT00:00:00Z")
end_date_str = end_date.strftime("%Y-%m-%dT23:59:59Z")
# Fetch events with response status
calendar_url = (
f"https://graph.microsoft.com/v1.0/me/calendarView?"
f"startDateTime={start_date_str}&endDateTime={end_date_str}&"
f"$select=id,subject,organizer,start,end,location,isAllDay,responseStatus,isCancelled&"
f"$filter=responseStatus/response eq 'notResponded' or responseStatus/response eq 'tentativelyAccepted'&"
f"$orderby=start/dateTime"
)
invites = []
try:
response_data = await fetch_with_aiohttp(calendar_url, headers)
invites.extend(response_data.get("value", []))
# Handle pagination
next_link = response_data.get("@odata.nextLink")
while next_link:
response_data = await fetch_with_aiohttp(next_link, headers)
invites.extend(response_data.get("value", []))
next_link = response_data.get("@odata.nextLink")
except Exception as e:
print(f"Error fetching pending invites: {e}")
return invites
async def respond_to_invite(headers, event_id, response):
"""
Respond to a calendar invite.
Args:
headers (dict): Authentication headers
event_id (str): The ID of the event to respond to
response (str): Response type - 'accept', 'tentativelyAccept', or 'decline'
Returns:
bool: True if response was successful
"""
valid_responses = ["accept", "tentativelyAccept", "decline"]
if response not in valid_responses:
print(f"Invalid response type: {response}. Must be one of {valid_responses}")
return False
try:
response_url = (
f"https://graph.microsoft.com/v1.0/me/events/{event_id}/{response}"
)
status = await post_with_aiohttp(response_url, headers, {})
return status in (200, 202)
except Exception as e:
print(f"Error responding to invite: {e}")
return False

View File

@@ -13,8 +13,8 @@ logging.getLogger("aiohttp.access").setLevel(logging.ERROR)
logging.getLogger("urllib3").setLevel(logging.ERROR)
logging.getLogger("asyncio").setLevel(logging.ERROR)
# Define a global semaphore for throttling - reduced for better compliance
semaphore = asyncio.Semaphore(2)
# Define a global semaphore for throttling - increased for better parallelization
semaphore = asyncio.Semaphore(5)
async def _handle_throttling_retry(func, *args, max_retries=3):

View File

@@ -110,26 +110,47 @@ async def fetch_mail_async(
progress.update(task_id, total=len(messages_to_download), completed=0)
downloaded_count = 0
for message in messages_to_download:
# Download messages in parallel batches for better performance
BATCH_SIZE = 5
for i in range(0, len(messages_to_download), BATCH_SIZE):
# Check if task was cancelled/disabled
if is_cancelled and is_cancelled():
progress.console.print("Task cancelled, stopping inbox fetch")
break
progress.console.print(
f"Processing message: {message.get('subject', 'No Subject')}", end="\r"
)
await save_mime_to_maildir_async(
maildir_path,
message,
attachments_dir,
headers,
progress,
dry_run,
download_attachments,
)
progress.update(task_id, advance=1)
downloaded_count += 1
batch = messages_to_download[i : i + BATCH_SIZE]
# Create tasks for parallel download
async def download_message(message):
progress.console.print(
f"Processing message: {message.get('subject', 'No Subject')[:50]}",
end="\r",
)
await save_mime_to_maildir_async(
maildir_path,
message,
attachments_dir,
headers,
progress,
dry_run,
download_attachments,
)
return 1
# Execute batch in parallel
tasks = [download_message(msg) for msg in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Count successful downloads
batch_success = sum(1 for r in results if r == 1)
downloaded_count += batch_success
progress.update(task_id, advance=len(batch))
# Log any errors
for idx, result in enumerate(results):
if isinstance(result, Exception):
progress.console.print(f"Error downloading message: {result}")
progress.update(task_id, completed=downloaded_count)
progress.console.print(f"\nFinished downloading {downloaded_count} new messages.")
@@ -461,37 +482,57 @@ async def fetch_archive_mail_async(
# Update progress to reflect only the messages we actually need to download
progress.update(task_id, total=len(messages_to_download), completed=0)
# Load sync state once, we'll update it incrementally
# Load sync state once, we'll update it after each batch for resilience
synced_ids = _load_archive_sync_state(maildir_path) if not dry_run else set()
downloaded_count = 0
for message in messages_to_download:
# Download messages in parallel batches for better performance
BATCH_SIZE = 5
for i in range(0, len(messages_to_download), BATCH_SIZE):
# Check if task was cancelled/disabled
if is_cancelled and is_cancelled():
progress.console.print("Task cancelled, stopping archive fetch")
break
progress.console.print(
f"Processing archived message: {message.get('subject', 'No Subject')[:50]}",
end="\r",
)
# Save to .Archive folder instead of main maildir
await save_mime_to_maildir_async(
archive_dir, # Use archive_dir instead of maildir_path
message,
attachments_dir,
headers,
progress,
dry_run,
download_attachments,
)
progress.update(task_id, advance=1)
downloaded_count += 1
batch = messages_to_download[i : i + BATCH_SIZE]
batch_msg_ids = []
# Update sync state after each message for resilience
# This ensures we don't try to re-upload this message in archive_mail_async
if not dry_run:
synced_ids.add(message["id"])
# Create tasks for parallel download
async def download_message(message):
progress.console.print(
f"Processing archived message: {message.get('subject', 'No Subject')[:50]}",
end="\r",
)
# Save to .Archive folder instead of main maildir
await save_mime_to_maildir_async(
archive_dir, # Use archive_dir instead of maildir_path
message,
attachments_dir,
headers,
progress,
dry_run,
download_attachments,
)
return message["id"]
# Execute batch in parallel
tasks = [download_message(msg) for msg in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results and collect successful message IDs
for result in results:
if isinstance(result, Exception):
progress.console.print(f"Error downloading archived message: {result}")
elif result:
batch_msg_ids.append(result)
downloaded_count += 1
progress.update(task_id, advance=len(batch))
# Update sync state after each batch (not each message) for resilience + performance
if not dry_run and batch_msg_ids:
synced_ids.update(batch_msg_ids)
_save_archive_sync_state(maildir_path, synced_ids)
progress.update(task_id, completed=downloaded_count)

View File

@@ -46,24 +46,20 @@ class TasksApp(App):
CSS = """
Screen {
layout: grid;
grid-size: 2;
grid-columns: auto 1fr;
grid-rows: auto 1fr auto auto;
layout: horizontal;
}
Header {
column-span: 2;
dock: top;
}
Footer {
column-span: 2;
dock: bottom;
}
#sidebar {
width: 28;
height: 100%;
row-span: 1;
}
#sidebar.hidden {
@@ -116,7 +112,6 @@ class TasksApp(App):
background: $surface;
color: $text-muted;
padding: 0 1;
column-span: 2;
}
#detail-pane {
@@ -124,7 +119,6 @@ class TasksApp(App):
height: 50%;
border-top: solid $primary;
background: $surface;
column-span: 2;
}
#detail-pane.hidden {
@@ -154,7 +148,6 @@ class TasksApp(App):
border-top: solid $primary;
padding: 1;
background: $surface;
column-span: 2;
}
#notes-pane.hidden {
@@ -204,6 +197,10 @@ class TasksApp(App):
self.tasks = []
self.projects = []
self.tags = []
self.all_projects = [] # Stable list of all projects (not filtered)
self.all_tags = [] # Stable list of all tags (not filtered)
self.all_contexts = [] # Available contexts from backend
self.current_context = None # Current active context
self.current_project_filter = None
self.current_tag_filters = []
self.current_sort_column = "priority"
@@ -268,7 +265,10 @@ class TasksApp(App):
height = max(10, min(90, height))
notes_pane.styles.height = f"{height}%"
# Load tasks (this will also update the sidebar)
# Load ALL projects and tags once for stable sidebar
self._load_all_filters()
# Load tasks (filtered by current filters)
self.load_tasks()
def _setup_columns(self, table: DataTable, columns: list[str]) -> None:
@@ -382,32 +382,67 @@ class TasksApp(App):
if not self.backend:
return
# Get tasks with current filters
self.tasks = self.backend.get_tasks(
project=self.current_project_filter,
tags=self.current_tag_filters if self.current_tag_filters else None,
)
# Get ALL tasks first (unfiltered)
all_tasks = self.backend.get_tasks()
# Apply client-side filtering for OR logic
self.tasks = self._filter_tasks(all_tasks)
# Sort tasks
self._sort_tasks()
# Also load projects and tags for filtering
self.projects = self.backend.get_projects()
self.tags = self.backend.get_tags()
# Update sidebar with available filters
self._update_sidebar()
# Update table
self._update_table()
def _load_all_filters(self) -> None:
"""Load all projects, tags, and contexts once for stable sidebar."""
if not self.backend:
return
self.all_projects = self.backend.get_projects()
self.all_tags = self.backend.get_tags()
self.all_contexts = self.backend.get_contexts()
self.current_context = self.backend.get_context()
# Update sidebar with stable filter options
self._update_sidebar()
def _filter_tasks(self, tasks: list[Task]) -> list[Task]:
"""Filter tasks by current project and tag filters using OR logic.
- If project filter is set, only show tasks from that project
- If tag filters are set, show tasks that have ANY of the selected tags (OR)
"""
filtered = tasks
# Filter by project (single project filter is AND)
if self.current_project_filter:
filtered = [t for t in filtered if t.project == self.current_project_filter]
# Filter by tags using OR logic - show tasks with ANY of the selected tags
if self.current_tag_filters:
filtered = [
t
for t in filtered
if any(tag in t.tags for tag in self.current_tag_filters)
]
return filtered
def _update_sidebar(self) -> None:
"""Update the filter sidebar with current projects and tags."""
"""Update the filter sidebar with all available projects, tags, and contexts."""
try:
sidebar = self.query_one("#sidebar", FilterSidebar)
# Convert projects to (name, count) tuples
project_data = [(p.name, p.task_count) for p in self.projects if p.name]
sidebar.update_filters(projects=project_data, tags=self.tags)
# Use stable all_projects/all_tags/all_contexts, not filtered ones
project_data = [(p.name, p.task_count) for p in self.all_projects if p.name]
sidebar.update_filters(
contexts=self.all_contexts,
projects=project_data,
tags=self.all_tags,
)
# Set current context in sidebar if loaded from backend
if self.current_context:
sidebar.set_current_context(self.current_context)
except Exception:
pass # Sidebar may not be mounted yet
@@ -670,6 +705,21 @@ class TasksApp(App):
else:
sidebar.add_class("hidden")
def on_filter_sidebar_context_changed(
self, event: FilterSidebar.ContextChanged
) -> None:
"""Handle context changes from sidebar."""
if event.context != self.current_context:
# Set context via backend (this persists it)
if self.backend:
self.backend.set_context(event.context)
self.current_context = event.context
self.load_tasks()
if event.context:
self.notify(f"Context set: {event.context}")
else:
self.notify("Context cleared")
def on_filter_sidebar_project_filter_changed(
self, event: FilterSidebar.ProjectFilterChanged
) -> None:

View File

@@ -269,3 +269,36 @@ class TaskBackend(ABC):
True if successful
"""
pass
@abstractmethod
def get_context(self) -> Optional[str]:
"""Get the current context filter.
Returns:
Current context string, or None if no context is set
"""
pass
@abstractmethod
def set_context(self, context: Optional[str]) -> bool:
"""Set the context filter.
Args:
context: Context string (e.g., "+work", "project:foo") or None to clear
Returns:
True if successful
"""
pass
@abstractmethod
def get_contexts(self) -> list[str]:
"""Get available predefined contexts.
For taskwarrior, returns named contexts from config.
For dstask, may return common tag-based contexts.
Returns:
List of context names/filters
"""
pass

View File

@@ -18,7 +18,7 @@ from textual.widgets.selection_list import Selection
class FilterSidebar(Widget):
"""Collapsible sidebar with project filter, tag filter, and sort options."""
"""Collapsible sidebar with context, project filter, tag filter, and sort options."""
DEFAULT_CSS = """
FilterSidebar {
@@ -53,6 +53,13 @@ class FilterSidebar(Widget):
border-title-style: bold;
}
/* Context section - single selection list */
FilterSidebar #context-list {
height: auto;
max-height: 6;
min-height: 3;
}
FilterSidebar .sort-section {
height: auto;
border: round rgb(117, 106, 129);
@@ -110,6 +117,13 @@ class FilterSidebar(Widget):
self.tags = tags
super().__init__()
class ContextChanged(Message):
"""Sent when context selection changes."""
def __init__(self, context: Optional[str]) -> None:
self.context = context
super().__init__()
class SortChanged(Message):
"""Sent when sort settings change."""
@@ -128,8 +142,10 @@ class FilterSidebar(Widget):
]
# Reactive properties - use factory functions for mutable defaults
contexts: reactive[list[str]] = reactive(list)
projects: reactive[list[tuple[str, int]]] = reactive(list)
tags: reactive[list[str]] = reactive(list)
current_context: reactive[Optional[str]] = reactive(None)
current_project: reactive[Optional[str]] = reactive(None)
current_tags: reactive[list[str]] = reactive(list)
current_sort_column: reactive[str] = reactive("priority")
@@ -137,8 +153,10 @@ class FilterSidebar(Widget):
def __init__(
self,
contexts: Optional[list[str]] = None,
projects: Optional[list[tuple[str, int]]] = None,
tags: Optional[list[str]] = None,
current_context: Optional[str] = None,
current_project: Optional[str] = None,
current_tags: Optional[list[str]] = None,
current_sort_column: str = "priority",
@@ -146,8 +164,10 @@ class FilterSidebar(Widget):
**kwargs,
):
super().__init__(**kwargs)
self.contexts = contexts or []
self.projects = projects or []
self.tags = tags or []
self.current_context = current_context
self.current_project = current_project
self.current_tags = current_tags or []
self.current_sort_column = current_sort_column
@@ -155,6 +175,9 @@ class FilterSidebar(Widget):
def compose(self) -> ComposeResult:
with ScrollableContainer(id="sidebar-scroll"):
# Context filter section - bordered list (at top since it's global)
yield SelectionList[str](id="context-list", classes="filter-list")
# Project filter section - bordered list
yield SelectionList[str](id="project-list", classes="filter-list")
@@ -187,6 +210,9 @@ class FilterSidebar(Widget):
def on_mount(self) -> None:
"""Initialize the sidebar with current filter state and set border titles."""
# Set border titles like mail app
context_list = self.query_one("#context-list", SelectionList)
context_list.border_title = "Context"
project_list = self.query_one("#project-list", SelectionList)
project_list.border_title = "Projects"
@@ -197,12 +223,19 @@ class FilterSidebar(Widget):
sort_section.border_title = "Sort"
# Update the lists
self._update_context_list()
self._update_project_list()
self._update_tag_list()
self._update_subtitles()
def _update_subtitles(self) -> None:
"""Update border subtitles to show selection counts."""
context_list = self.query_one("#context-list", SelectionList)
if self.current_context:
context_list.border_subtitle = f"[b]{self.current_context}[/b]"
else:
context_list.border_subtitle = "none"
project_list = self.query_one("#project-list", SelectionList)
if self.current_project:
project_list.border_subtitle = f"[b]{self.current_project}[/b]"
@@ -224,6 +257,20 @@ class FilterSidebar(Widget):
)
sort_section.border_subtitle = f"{col_display} {direction}"
def _update_context_list(self) -> None:
"""Update the context selection list."""
context_list = self.query_one("#context-list", SelectionList)
context_list.clear_options()
for ctx in self.contexts:
context_list.add_option(
Selection(
ctx,
ctx,
initial_state=ctx == self.current_context,
)
)
def _update_project_list(self) -> None:
"""Update the project selection list."""
project_list = self.query_one("#project-list", SelectionList)
@@ -254,10 +301,14 @@ class FilterSidebar(Widget):
def update_filters(
self,
contexts: Optional[list[str]] = None,
projects: Optional[list[tuple[str, int]]] = None,
tags: Optional[list[str]] = None,
) -> None:
"""Update available projects and tags."""
"""Update available contexts, projects and tags."""
if contexts is not None:
self.contexts = contexts
self._update_context_list()
if projects is not None:
self.projects = projects
self._update_project_list()
@@ -272,6 +323,12 @@ class FilterSidebar(Widget):
self._update_project_list()
self._update_subtitles()
def set_current_context(self, context: Optional[str]) -> None:
"""Set the current context (updates UI to match)."""
self.current_context = context
self._update_context_list()
self._update_subtitles()
def set_current_tags(self, tags: list[str]) -> None:
"""Set the current tag filters (updates UI to match)."""
self.current_tags = tags
@@ -297,6 +354,30 @@ class FilterSidebar(Widget):
self._update_subtitles()
@on(SelectionList.SelectedChanged, "#context-list")
def _on_context_selection_changed(
self, event: SelectionList.SelectedChanged
) -> None:
"""Handle context selection changes."""
selected = list(event.selection_list.selected)
# For context, we only allow single selection
if selected:
new_context = selected[0]
# If same context clicked again, deselect it (clear context)
if new_context == self.current_context:
self.current_context = None
event.selection_list.deselect(new_context)
else:
# Deselect previous if any
if self.current_context:
event.selection_list.deselect(self.current_context)
self.current_context = new_context
else:
self.current_context = None
self._update_subtitles()
self.post_message(self.ContextChanged(self.current_context))
@on(SelectionList.SelectedChanged, "#project-list")
def _on_project_selection_changed(
self, event: SelectionList.SelectedChanged

318
src/utils/ipc.py Normal file
View File

@@ -0,0 +1,318 @@
"""Inter-Process Communication using Unix Domain Sockets.
This module provides a simple pub/sub mechanism for cross-app notifications.
The sync daemon can broadcast messages when data changes, and TUI apps can
listen for these messages to refresh their displays.
Usage:
# In sync daemon (publisher):
from src.utils.ipc import notify_refresh
await notify_refresh("mail") # Notify mail app to refresh
await notify_refresh("calendar") # Notify calendar app to refresh
await notify_refresh("tasks") # Notify tasks app to refresh
# In TUI apps (subscriber):
from src.utils.ipc import IPCListener
class MyApp(App):
def on_mount(self):
self.ipc_listener = IPCListener("mail", self.on_refresh)
self.ipc_listener.start()
def on_unmount(self):
self.ipc_listener.stop()
async def on_refresh(self, message):
# Refresh the app's data
await self.refresh_data()
"""
import asyncio
import json
import os
import socket
import threading
from pathlib import Path
from typing import Callable, Optional, Any, Dict
# Socket paths for each app type
SOCKET_DIR = Path("~/.local/share/luk/ipc").expanduser()
SOCKET_PATHS = {
"mail": SOCKET_DIR / "mail.sock",
"calendar": SOCKET_DIR / "calendar.sock",
"tasks": SOCKET_DIR / "tasks.sock",
}
def ensure_socket_dir():
"""Ensure the socket directory exists."""
SOCKET_DIR.mkdir(parents=True, exist_ok=True)
def get_socket_path(app_type: str) -> Path:
"""Get the socket path for a given app type."""
if app_type not in SOCKET_PATHS:
raise ValueError(
f"Unknown app type: {app_type}. Must be one of: {list(SOCKET_PATHS.keys())}"
)
return SOCKET_PATHS[app_type]
class IPCMessage:
"""A message sent via IPC."""
def __init__(self, event: str, data: Optional[Dict[str, Any]] = None):
self.event = event
self.data = data or {}
def to_json(self) -> str:
return json.dumps({"event": self.event, "data": self.data})
@classmethod
def from_json(cls, json_str: str) -> "IPCMessage":
parsed = json.loads(json_str)
return cls(event=parsed["event"], data=parsed.get("data", {}))
async def notify_refresh(app_type: str, data: Optional[Dict[str, Any]] = None) -> bool:
"""Send a refresh notification to a specific app.
Args:
app_type: The type of app to notify ("mail", "calendar", "tasks")
data: Optional data to include with the notification
Returns:
True if the notification was sent successfully, False otherwise
"""
socket_path = get_socket_path(app_type)
if not socket_path.exists():
# No listener, that's okay
return False
try:
message = IPCMessage("refresh", data)
# Connect to the socket and send the message
reader, writer = await asyncio.open_unix_connection(str(socket_path))
writer.write((message.to_json() + "\n").encode())
await writer.drain()
writer.close()
await writer.wait_closed()
return True
except (ConnectionRefusedError, FileNotFoundError, OSError):
# Socket exists but no one is listening, or other error
return False
except Exception:
return False
async def notify_all(data: Optional[Dict[str, Any]] = None) -> Dict[str, bool]:
"""Send a refresh notification to all apps.
Args:
data: Optional data to include with the notification
Returns:
Dictionary of app_type -> success status
"""
results = {}
for app_type in SOCKET_PATHS:
results[app_type] = await notify_refresh(app_type, data)
return results
class IPCListener:
"""Listens for IPC messages on a Unix socket.
Usage:
listener = IPCListener("mail", on_message_callback)
listener.start()
# ... later ...
listener.stop()
"""
def __init__(
self,
app_type: str,
callback: Callable[[IPCMessage], Any],
):
"""Initialize the IPC listener.
Args:
app_type: The type of app ("mail", "calendar", "tasks")
callback: Function to call when a message is received.
Can be sync or async.
"""
self.app_type = app_type
self.callback = callback
self.socket_path = get_socket_path(app_type)
self._server: Optional[asyncio.AbstractServer] = None
self._running = False
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._thread: Optional[threading.Thread] = None
async def _handle_client(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
):
"""Handle an incoming client connection."""
try:
data = await reader.readline()
if data:
message_str = data.decode().strip()
if message_str:
message = IPCMessage.from_json(message_str)
# Call the callback (handle both sync and async)
result = self.callback(message)
if asyncio.iscoroutine(result):
await result
except Exception:
pass # Ignore errors from malformed messages
finally:
writer.close()
try:
await writer.wait_closed()
except Exception:
pass
async def _run_server(self):
"""Run the Unix socket server."""
ensure_socket_dir()
# Remove stale socket file if it exists
if self.socket_path.exists():
self.socket_path.unlink()
self._server = await asyncio.start_unix_server(
self._handle_client, path=str(self.socket_path)
)
# Set socket permissions (readable/writable by owner only)
os.chmod(self.socket_path, 0o600)
async with self._server:
await self._server.serve_forever()
def _run_in_thread(self):
"""Run the event loop in a separate thread."""
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
try:
self._loop.run_until_complete(self._run_server())
except asyncio.CancelledError:
pass
finally:
self._loop.close()
def start(self):
"""Start listening for IPC messages in a background thread."""
if self._running:
return
self._running = True
self._thread = threading.Thread(target=self._run_in_thread, daemon=True)
self._thread.start()
def stop(self):
"""Stop listening for IPC messages."""
if not self._running:
return
self._running = False
# Cancel the server
if self._server and self._loop:
self._loop.call_soon_threadsafe(self._server.close)
# Stop the event loop
if self._loop:
self._loop.call_soon_threadsafe(self._loop.stop)
# Wait for thread to finish
if self._thread:
self._thread.join(timeout=1.0)
# Clean up socket file
if self.socket_path.exists():
try:
self.socket_path.unlink()
except Exception:
pass
class AsyncIPCListener:
"""Async version of IPCListener for use within an existing event loop.
Usage in a Textual app:
class MyApp(App):
async def on_mount(self):
self.ipc_listener = AsyncIPCListener("mail", self.on_refresh)
await self.ipc_listener.start()
async def on_unmount(self):
await self.ipc_listener.stop()
async def on_refresh(self, message):
self.refresh_data()
"""
def __init__(
self,
app_type: str,
callback: Callable[[IPCMessage], Any],
):
self.app_type = app_type
self.callback = callback
self.socket_path = get_socket_path(app_type)
self._server: Optional[asyncio.AbstractServer] = None
async def _handle_client(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
):
"""Handle an incoming client connection."""
try:
data = await reader.readline()
if data:
message_str = data.decode().strip()
if message_str:
message = IPCMessage.from_json(message_str)
result = self.callback(message)
if asyncio.iscoroutine(result):
await result
except Exception:
pass
finally:
writer.close()
try:
await writer.wait_closed()
except Exception:
pass
async def start(self):
"""Start the Unix socket server."""
ensure_socket_dir()
if self.socket_path.exists():
self.socket_path.unlink()
self._server = await asyncio.start_unix_server(
self._handle_client, path=str(self.socket_path)
)
os.chmod(self.socket_path, 0o600)
async def stop(self):
"""Stop the server and clean up."""
if self._server:
self._server.close()
await self._server.wait_closed()
if self.socket_path.exists():
try:
self.socket_path.unlink()
except Exception:
pass

179
src/utils/search.py Normal file
View File

@@ -0,0 +1,179 @@
"""Reusable search input screen for TUI apps.
A modal input dialog that can be used for search across all apps.
"""
from typing import Optional
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Vertical, Horizontal
from textual.screen import ModalScreen
from textual.widgets import Input, Static, Label, Button
class SearchScreen(ModalScreen[str | None]):
"""A modal screen for search input.
Returns the search query string on submit, or None on cancel.
"""
DEFAULT_CSS = """
SearchScreen {
align: center middle;
}
SearchScreen > Vertical {
width: 60;
height: auto;
border: solid $primary;
background: $surface;
padding: 1 2;
}
SearchScreen > Vertical > Label {
margin-bottom: 1;
}
SearchScreen > Vertical > Input {
margin-bottom: 1;
}
SearchScreen > Vertical > Horizontal {
height: auto;
align: right middle;
}
SearchScreen > Vertical > Horizontal > Button {
margin-left: 1;
}
"""
BINDINGS = [
Binding("escape", "cancel", "Cancel", show=True),
]
def __init__(
self,
title: str = "Search",
placeholder: str = "Enter search query...",
initial_value: str = "",
name: Optional[str] = None,
id: Optional[str] = None,
classes: Optional[str] = None,
) -> None:
super().__init__(name=name, id=id, classes=classes)
self._title = title
self._placeholder = placeholder
self._initial_value = initial_value
def compose(self) -> ComposeResult:
with Vertical():
yield Label(self._title)
yield Input(
placeholder=self._placeholder,
value=self._initial_value,
id="search-input",
)
with Horizontal():
yield Button("Search", variant="primary", id="search-btn")
yield Button("Cancel", variant="default", id="cancel-btn")
def on_mount(self) -> None:
"""Focus the input on mount."""
self.query_one("#search-input", Input).focus()
def on_input_submitted(self, event: Input.Submitted) -> None:
"""Handle Enter key in input."""
self.dismiss(event.value)
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses."""
if event.button.id == "search-btn":
query = self.query_one("#search-input", Input).value
self.dismiss(query)
elif event.button.id == "cancel-btn":
self.dismiss(None)
def action_cancel(self) -> None:
"""Cancel the search."""
self.dismiss(None)
class ClearableSearchInput(Static):
"""A search input widget with clear button for use in sidebars/headers.
Emits SearchInput.Submitted message when user submits a query.
Emits SearchInput.Cleared message when user clears the search.
"""
DEFAULT_CSS = """
ClearableSearchInput {
height: 3;
padding: 0 1;
}
ClearableSearchInput > Horizontal {
height: auto;
}
ClearableSearchInput > Horizontal > Input {
width: 1fr;
}
ClearableSearchInput > Horizontal > Button {
width: 3;
min-width: 3;
}
"""
from textual.message import Message
class Submitted(Message):
"""Search query was submitted."""
def __init__(self, query: str) -> None:
super().__init__()
self.query = query
class Cleared(Message):
"""Search was cleared."""
pass
def __init__(
self,
placeholder: str = "Search...",
name: Optional[str] = None,
id: Optional[str] = None,
classes: Optional[str] = None,
) -> None:
super().__init__(name=name, id=id, classes=classes)
self._placeholder = placeholder
def compose(self) -> ComposeResult:
with Horizontal():
yield Input(placeholder=self._placeholder, id="search-input")
yield Button("X", id="clear-btn", variant="error")
def on_input_submitted(self, event: Input.Submitted) -> None:
"""Handle search submission."""
self.post_message(self.Submitted(event.value))
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle clear button."""
if event.button.id == "clear-btn":
input_widget = self.query_one("#search-input", Input)
input_widget.value = ""
input_widget.focus()
self.post_message(self.Cleared())
@property
def value(self) -> str:
"""Get the current search value."""
return self.query_one("#search-input", Input).value
@value.setter
def value(self, new_value: str) -> None:
"""Set the search value."""
self.query_one("#search-input", Input).value = new_value