move and rename module

This commit is contained in:
Bendt
2025-12-18 14:00:54 -05:00
parent 37be42884f
commit fe65183fb7
33 changed files with 26 additions and 24 deletions

1
src/mail/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Initialize the mail package

View File

@@ -0,0 +1 @@
# Initialize the actions subpackage

View File

@@ -0,0 +1,39 @@
from textual import work
from src.services.himalaya import client as himalaya_client
@work(exclusive=True)
async def archive_current(app):
"""Archive the current message."""
if not app.current_message_id:
app.show_status("No message selected to archive.", "error")
return
# Store the current message ID and index
current_message_id = app.current_message_id
current_index = app.current_message_index
# Find the next message to display after archiving
next_id, next_idx = app.message_store.find_next_valid_id(current_index)
if next_id is None or next_idx is None:
# If there's no next message, try to find a previous one
next_id, next_idx = app.message_store.find_prev_valid_id(current_index)
# Archive the message using our Himalaya client module
success = await himalaya_client.archive_messages([str(current_message_id)])
if success:
app.show_status(f"Message {current_message_id} archived.", "success")
app.message_store.remove_envelope(current_message_id)
app.refresh_list_view_items()
# Select the next available message if it exists
if next_id is not None and next_idx is not None:
app.current_message_id = next_id
app.current_message_index = next_idx
else:
# If there are no other messages, reset the UI
app.current_message_id = 0
app.show_status("No more messages available.", "warning")
else:
app.show_status(f"Failed to archive message {current_message_id}.", "error")

View File

@@ -0,0 +1,43 @@
import asyncio
import logging
from textual import work
from src.services.himalaya import client as himalaya_client
@work(exclusive=True)
async def delete_current(app):
"""Delete the current message."""
if not app.current_message_id:
app.show_status("No message selected to delete.", "error")
return
# Store the current message ID and index
current_message_id = app.current_message_id
current_index = app.current_message_index
# Find the next message to display after deletion
next_id, next_idx = app.message_store.find_next_valid_id(current_index)
if next_id is None or next_idx is None:
# If there's no next message, try to find a previous one
next_id, next_idx = app.message_store.find_prev_valid_id(current_index)
# Delete the message using our Himalaya client module
message, success = await himalaya_client.delete_message(current_message_id)
if success:
app.show_status(f"Message {current_message_id} deleted.", "success")
app.message_store.remove_envelope(current_message_id)
app.refresh_list_view()
# Select the next available message if it exists
if next_id is not None and next_idx is not None:
app.current_message_id = next_id
app.current_message_index = next_idx
else:
# If there are no other messages, reset the UI
app.current_message_id = 0
app.show_status("No more messages available.", "warning")
else:
app.show_status(
f"Failed to delete message {current_message_id}: {message}", "error"
)

View File

@@ -0,0 +1,17 @@
async def action_newest(app) -> None:
"""Show the previous email message by finding the next lower ID from the list of envelope IDs."""
try:
if app.reload_needed:
await app.action_fetch_list()
ids = sorted(
(int(envelope["id"]) for envelope in app.all_envelopes), reverse=True
)
app.current_message_id = ids[0]
app.show_message(app.current_message_id)
return
else:
app.show_status("Failed to fetch envelope list.", severity="error")
except Exception as e:
app.show_status(f"Error: {e}", severity="error")

17
src/mail/actions/next.py Normal file
View File

@@ -0,0 +1,17 @@
async def action_next(app) -> None:
"""Show the next email message by finding the next higher ID from the list of envelope IDs."""
try:
if app.reload_needed:
app.action_fetch_list()
ids = sorted(int(envelope["id"]) for envelope in app.all_envelopes)
for envelope_id in ids:
if envelope_id > int(app.current_message_id):
app.show_message(envelope_id)
return
app.show_status("No newer messages found.", severity="warning")
app.action_newest()
else:
app.show_status("Failed to fetch envelope list.", severity="error")
except Exception as e:
app.show_status(f"Error: {e}", severity="error")

View File

@@ -0,0 +1,15 @@
def action_oldest(app) -> None:
"""Show the previous email message by finding the next lower ID from the list of envelope IDs."""
try:
if app.reload_needed:
app.action_fetch_list()
ids = sorted((int(envelope["id"]) for envelope in app.all_envelopes))
app.current_message_id = ids[0]
app.show_message(app.current_message_id)
return
else:
app.show_status("Failed to fetch envelope list.", severity="error")
except Exception as e:
app.show_status(f"Error: {e}", severity="error")

21
src/mail/actions/open.py Normal file
View File

@@ -0,0 +1,21 @@
from ..screens.OpenMessage import OpenMessageScreen
def action_open(app) -> None:
"""Show the input modal for opening a specific message by ID."""
def check_id(message_id: str | None) -> bool:
try:
int(message_id)
app.show_message(message_id)
if message_id is not None and message_id > 0:
app.show_message(message_id)
except ValueError:
app.bell()
app.show_status(
"Invalid message ID. Please enter an integer.", severity="error"
)
return True
return False
app.push_screen(OpenMessageScreen(), check_id)

View File

@@ -0,0 +1,20 @@
def action_previous(app) -> None:
"""Show the previous email message by finding the next lower ID from the list of envelope IDs."""
try:
if app.reload_needed:
app.action_fetch_list()
ids = sorted(
(int(envelope["id"]) for envelope in app.all_envelopes), reverse=True
)
for envelope_id in ids:
if envelope_id < int(app.current_message_id):
app.current_message_id = envelope_id
app.show_message(app.current_message_id)
return
app.show_status("No older messages found.", severity="warning")
app.action_oldest()
else:
app.show_status("Failed to fetch envelope list.", severity="error")
except Exception as e:
app.show_status(f"Error: {e}", severity="error")

View File

@@ -0,0 +1,14 @@
import logging
from textual.logging import TextualHandler
logging.basicConfig(
level="NOTSET",
handlers=[TextualHandler()],
)
def show_message(app, message_id: int) -> None:
"""Fetch and display the email message by ID."""
logging.info("Showing message ID: " + str(message_id))
app.current_message_id = message_id

54
src/mail/actions/task.py Normal file
View File

@@ -0,0 +1,54 @@
import asyncio
import logging
from textual import work
from textual.screen import ModalScreen
from src.services.taskwarrior import client as taskwarrior_client
from ..screens.CreateTask import CreateTaskScreen
class TaskAction:
def __init__(self, app):
self.app = app
def action_create_task(app):
"""Show the create task screen."""
current_message_id = app.current_message_id
if not current_message_id:
app.show_status("No message selected to create task from.", "error")
return
# Prepare data for the create task screen
metadata = app.message_store.get_metadata(current_message_id)
subject = metadata.get("subject", "No subject") if metadata else "No subject"
from_addr = metadata["from"].get("addr", "Unknown") if metadata else "Unknown"
# Show the create task screen with the current message data
app.push_screen(CreateTaskScreen(subject=subject, from_addr=from_addr))
@work(exclusive=True)
async def create_task(
subject, description=None, tags=None, project=None, due=None, priority=None
):
"""
Create a task with the Taskwarrior API client.
"""
try:
success, result = await taskwarrior_client.create_task(
task_description=subject,
tags=tags or [],
project=project,
due=due,
priority=priority,
)
if success:
return True, result
else:
logging.error(f"Failed to create task: {result}")
return False, result
except Exception as e:
logging.error(f"Exception creating task: {e}")
return False, str(e)

814
src/mail/app.py Normal file
View File

@@ -0,0 +1,814 @@
from .config import get_config, MailAppConfig
from .message_store import MessageStore
from .widgets.ContentContainer import ContentContainer
from .widgets.EnvelopeListItem import EnvelopeListItem, GroupHeader
from .screens.LinkPanel import LinkPanel
from .screens.ConfirmDialog import ConfirmDialog
from .actions.task import action_create_task
from .actions.open import action_open
from .actions.delete import delete_current
from src.services.taskwarrior import client as taskwarrior_client
from src.services.himalaya import client as himalaya_client
from textual.containers import Container, ScrollableContainer, Vertical, Horizontal
from textual.timer import Timer
from textual.binding import Binding
from textual.reactive import reactive, Reactive
from textual.widgets import Footer, Static, Label, Markdown, ListView, ListItem
from textual.screen import Screen
from textual.logging import TextualHandler
from textual.app import App, ComposeResult, SystemCommand, RenderResult
from textual.worker import Worker
from textual import work
import sys
import os
from datetime import UTC, datetime
import logging
from typing import Iterable, Optional, List, Dict, Any, Generator, Tuple
# Add the parent directory to the system path to resolve relative imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Import our new API modules
# Updated imports with correct relative paths
logging.basicConfig(
level="NOTSET",
handlers=[TextualHandler()],
)
class StatusTitle(Static):
total_messages: Reactive[int] = reactive(0)
current_message_index: Reactive[int] = reactive(0)
current_message_id: Reactive[int] = reactive(1)
folder: Reactive[str] = reactive("INBOX")
def render(self) -> RenderResult:
return f"{self.folder} | ID: {self.current_message_id} | [b]{self.current_message_index}[/b]/{self.total_messages}"
class EmailViewerApp(App):
"""A simple email viewer app using the Himalaya CLI."""
CSS_PATH = "email_viewer.tcss"
title = "Maildir GTD Reader"
current_message_id: Reactive[int] = reactive(0)
current_message_index: Reactive[int] = reactive(0)
highlighted_message_index: Reactive[int] = reactive(0)
folder = reactive("INBOX")
header_expanded = reactive(False)
reload_needed = reactive(True)
message_store = MessageStore()
oldest_id: Reactive[int] = reactive(0)
newest_id: Reactive[int] = reactive(0)
msg_worker: Worker | None = None
total_messages: Reactive[int] = reactive(0)
status_title = reactive("Message View")
sort_order_ascending: Reactive[bool] = reactive(True)
selected_messages: Reactive[set[int]] = reactive(set())
main_content_visible: Reactive[bool] = reactive(True)
def get_system_commands(self, screen: Screen) -> Iterable[SystemCommand]:
yield from super().get_system_commands(screen)
yield SystemCommand("Next Message", "Navigate to Next ID", self.action_next)
yield SystemCommand(
"Previous Message", "Navigate to Previous ID", self.action_previous
)
yield SystemCommand(
"Delete Message", "Delete the current message", self.action_delete
)
yield SystemCommand(
"Archive Message", "Archive the current message", self.action_archive
)
yield SystemCommand(
"Open Message", "Open a specific message by ID", self.action_open
)
yield SystemCommand(
"Create Task", "Create a task using the task CLI", self.action_create_task
)
yield SystemCommand(
"Oldest Message", "Show the oldest message", self.action_oldest
)
yield SystemCommand(
"Newest Message", "Show the newest message", self.action_newest
)
yield SystemCommand("Reload", "Reload the message list", self.fetch_envelopes)
BINDINGS = [
Binding("j", "next", "Next message"),
Binding("k", "previous", "Previous message"),
Binding("#", "delete", "Delete message"),
Binding("e", "archive", "Archive message"),
Binding("o", "open", "Open message", show=False),
Binding("q", "quit", "Quit application"),
Binding("h", "toggle_header", "Toggle Envelope Header"),
Binding("t", "create_task", "Create Task"),
Binding("l", "open_links", "Show Links"),
Binding("%", "reload", "Reload message list"),
Binding("1", "focus_1", "Focus Accounts Panel"),
Binding("2", "focus_2", "Focus Folders Panel"),
Binding("3", "focus_3", "Focus Envelopes Panel"),
Binding("4", "focus_4", "Focus Main Content"),
Binding("w", "toggle_main_content", "Toggle Message View Window"),
]
BINDINGS.extend(
[
Binding("pagedown", "scroll_page_down", "Scroll page down"),
Binding("b", "scroll_page_up", "Scroll page up"),
Binding("s", "toggle_sort_order", "Toggle Sort Order"),
Binding("x", "toggle_selection", "Toggle selection", show=False),
Binding("space", "toggle_selection", "Toggle selection"),
Binding("escape", "clear_selection", "Clear selection"),
]
)
def compose(self) -> ComposeResult:
yield Horizontal(
Vertical(
ListView(
ListItem(Label("All emails...")),
id="envelopes_list",
classes="list_view",
initial_index=0,
),
ListView(id="accounts_list", classes="list_view"),
ListView(id="folders_list", classes="list_view"),
id="sidebar",
),
ContentContainer(id="main_content"),
id="outer-wrapper",
)
yield Footer()
async def on_mount(self) -> None:
self.alert_timer: Timer | None = None # Timer to throttle alerts
self.theme = "monokai"
self.title = "MaildirGTD"
self.query_one("#main_content").border_title = self.status_title
sort_indicator = "" if self.sort_order_ascending else ""
self.query_one("#envelopes_list").border_title = f"1⃣ Emails {sort_indicator}"
self.query_one("#accounts_list").border_title = "2⃣ Accounts"
self.query_one("#folders_list").border_title = "3⃣ Folders"
self.fetch_accounts()
self.fetch_folders()
worker = self.fetch_envelopes()
await worker.wait()
self.query_one("#envelopes_list").focus()
self.action_oldest()
def compute_status_title(self):
metadata = self.message_store.get_metadata(self.current_message_id)
message_date = metadata["date"] if metadata else "N/A"
return f"✉️ Message ID: {self.current_message_id} | Date: {message_date}"
def watch_status_title(self, old_status_title: str, new_status_title: str) -> None:
self.query_one(ContentContainer).border_title = new_status_title
def watch_sort_order_ascending(self, old_value: bool, new_value: bool) -> None:
"""Update the border title of the envelopes list when the sort order changes."""
sort_indicator = "" if new_value else ""
self.query_one("#envelopes_list").border_title = f"1⃣ Emails {sort_indicator}"
def watch_current_message_index(self, old_index: int, new_index: int) -> None:
if new_index < 0:
new_index = 0
self.current_message_index = new_index
if new_index > self.total_messages:
new_index = self.total_messages
self.current_message_index = new_index
self._update_list_view_subtitle()
self.query_one("#envelopes_list", ListView).index = new_index
def watch_selected_messages(
self, old_messages: set[int], new_messages: set[int]
) -> None:
self._update_list_view_subtitle()
def _update_list_view_subtitle(self) -> None:
subtitle = f"[b]{self.current_message_index}[/b]/{self.total_messages}"
if self.selected_messages:
subtitle = f"(✓{len(self.selected_messages)}) {subtitle}"
self.query_one("#envelopes_list").border_subtitle = subtitle
def watch_total_messages(self, old_total: int, new_total: int) -> None:
"""Called when the total_messages reactive attribute changes."""
self._update_list_view_subtitle()
def watch_reload_needed(
self, old_reload_needed: bool, new_reload_needed: bool
) -> None:
logging.info(f"Reload needed: {new_reload_needed}")
if not old_reload_needed and new_reload_needed:
self.fetch_envelopes()
def watch_current_message_id(
self, old_message_id: int, new_message_id: int
) -> None:
"""Called when the current message ID changes."""
logging.info(
f"Current message ID changed from {old_message_id} to {new_message_id}"
)
if new_message_id == old_message_id:
return
# If the main content view is not visible, don't load the message
if not self.main_content_visible:
return
# Cancel any existing message loading worker
if self.msg_worker:
self.msg_worker.cancel()
# Start a new worker to load the message content
self.msg_worker = self.load_message_content(new_message_id)
@work(exclusive=True)
async def load_message_content(self, message_id: int) -> None:
"""Worker to load message content asynchronously."""
content_container = self.query_one(ContentContainer)
content_container.display_content(message_id)
metadata = self.message_store.get_metadata(message_id)
if metadata:
message_date = metadata["date"]
if self.current_message_index != metadata["index"]:
self.current_message_index = metadata["index"]
list_view = self.query_one("#envelopes_list", ListView)
if list_view.index != metadata["index"]:
list_view.index = metadata["index"]
# Mark message as read
await self._mark_message_as_read(message_id, metadata["index"])
else:
logging.warning(f"Message ID {message_id} not found in metadata.")
async def _mark_message_as_read(self, message_id: int, index: int) -> None:
"""Mark a message as read and update the UI."""
# Check if already read
envelope_data = self.message_store.envelopes[index]
if envelope_data and envelope_data.get("type") != "header":
flags = envelope_data.get("flags", [])
if "Seen" in flags:
return # Already read
# Mark as read via himalaya
_, success = await himalaya_client.mark_as_read(message_id)
if success:
# Update the envelope flags in the store
if envelope_data:
if "flags" not in envelope_data:
envelope_data["flags"] = []
if "Seen" not in envelope_data["flags"]:
envelope_data["flags"].append("Seen")
# Update the visual state of the list item
try:
list_view = self.query_one("#envelopes_list", ListView)
list_item = list_view.children[index]
envelope_widget = list_item.query_one(EnvelopeListItem)
envelope_widget.is_read = True
envelope_widget.remove_class("unread")
except Exception:
pass # Widget may not exist
def on_list_view_selected(self, event: ListView.Selected) -> None:
"""Called when an item in the list view is selected."""
if event.list_view.index is None:
return
# Only handle selection from the envelopes list
if event.list_view.id != "envelopes_list":
return
selected_index = event.list_view.index
# Check bounds before accessing
if selected_index < 0 or selected_index >= len(self.message_store.envelopes):
return
current_item = self.message_store.envelopes[selected_index]
if current_item is None or current_item.get("type") == "header":
return
message_id = int(current_item["id"])
self.current_message_id = message_id
self.current_message_index = selected_index
# Focus the main content panel after selecting a message
self.action_focus_4()
def on_list_view_highlighted(self, event: ListView.Highlighted) -> None:
"""Called when an item in the list view is highlighted (e.g., via arrow keys)."""
if event.list_view.index is None:
return
# Only handle highlights from the envelopes list
if event.list_view.id != "envelopes_list":
return
highlighted_index = event.list_view.index
# Check bounds before accessing
if highlighted_index < 0 or highlighted_index >= len(
self.message_store.envelopes
):
return
current_item = self.message_store.envelopes[highlighted_index]
if current_item is None or current_item.get("type") == "header":
return
# message_id = int(current_item["id"])
# self.current_message_id = message_id
self.highlighted_message_index = highlighted_index
def on_key(self, event) -> None:
"""Handle key events to intercept space on the envelopes list."""
# Intercept space key when envelopes list is focused to prevent default select behavior
if event.key == "space":
focused = self.focused
if focused and focused.id == "envelopes_list":
event.prevent_default()
event.stop()
self.action_toggle_selection()
@work(exclusive=False)
async def fetch_envelopes(self) -> None:
msglist = self.query_one("#envelopes_list", ListView)
try:
msglist.loading = True
# Use the Himalaya client to fetch envelopes
envelopes, success = await himalaya_client.list_envelopes()
if success:
self.reload_needed = False
# Ensure envelopes is a list, even if it's None from the client
envelopes_list = envelopes if envelopes is not None else []
self.message_store.load(envelopes_list, self.sort_order_ascending)
self.total_messages = self.message_store.total_messages
# Use the centralized refresh method to update the ListView
self._populate_list_view()
# Restore the current index
msglist.index = self.current_message_index
else:
self.show_status("Failed to fetch envelopes.", "error")
except Exception as e:
self.show_status(f"Error fetching message list: {e}", "error")
finally:
msglist.loading = False
@work(exclusive=False)
async def fetch_accounts(self) -> None:
accounts_list = self.query_one("#accounts_list", ListView)
try:
accounts_list.loading = True
# Use the Himalaya client to fetch accounts
accounts, success = await himalaya_client.list_accounts()
if success and accounts:
for account in accounts:
item = ListItem(
Label(
str(account["name"]).strip(),
classes="account_name",
markup=False,
)
)
accounts_list.append(item)
else:
self.show_status("Failed to fetch accounts.", "error")
except Exception as e:
self.show_status(f"Error fetching account list: {e}", "error")
finally:
accounts_list.loading = False
@work(exclusive=False)
async def fetch_folders(self) -> None:
folders_list = self.query_one("#folders_list", ListView)
folders_list.clear()
folders_list.append(
ListItem(Label("INBOX", classes="folder_name", markup=False))
)
try:
folders_list.loading = True
# Use the Himalaya client to fetch folders
folders, success = await himalaya_client.list_folders()
if success and folders:
for folder in folders:
item = ListItem(
Label(
str(folder["name"]).strip(),
classes="folder_name",
markup=False,
)
)
folders_list.append(item)
else:
self.show_status("Failed to fetch folders.", "error")
except Exception as e:
self.show_status(f"Error fetching folder list: {e}", "error")
finally:
folders_list.loading = False
def _populate_list_view(self) -> None:
"""Populate the ListView with new items using the new EnvelopeListItem widget."""
envelopes_list = self.query_one("#envelopes_list", ListView)
envelopes_list.clear()
config = get_config()
for item in self.message_store.envelopes:
if item and item.get("type") == "header":
# Use the new GroupHeader widget for date groupings
envelopes_list.append(ListItem(GroupHeader(label=item["label"])))
elif item:
# Use the new EnvelopeListItem widget
message_id = int(item.get("id", 0))
is_selected = message_id in self.selected_messages
envelope_widget = EnvelopeListItem(
envelope=item,
config=config.envelope_display,
is_selected=is_selected,
)
envelopes_list.append(ListItem(envelope_widget))
self.refresh_list_view_items() # Initial refresh of item states
def refresh_list_view_items(self) -> None:
"""Update the visual state of existing ListItems without clearing the list."""
envelopes_list = self.query_one("#envelopes_list", ListView)
for i, list_item in enumerate(envelopes_list.children):
if isinstance(list_item, ListItem):
item_data = self.message_store.envelopes[i]
if item_data and item_data.get("type") != "header":
message_id = int(item_data["id"])
is_selected = message_id in self.selected_messages or False
list_item.set_class(is_selected, "selection")
# Try to update the EnvelopeListItem's selection state
try:
envelope_widget = list_item.query_one(EnvelopeListItem)
envelope_widget.set_selected(is_selected)
except Exception:
pass # Widget may not exist or be of old type
def show_message(self, message_id: int, new_index=None) -> None:
if new_index:
self.current_message_index = new_index
self.action_focus_4()
self.current_message_id = message_id
def show_status(self, message: str, severity: str = "information") -> None:
"""Display a status message using the built-in notify function."""
self.notify(
message, title="Status", severity=severity, timeout=2.6, markup=True
)
async def action_toggle_sort_order(self) -> None:
"""Toggle the sort order of the envelope list."""
self.sort_order_ascending = not self.sort_order_ascending
worker = self.fetch_envelopes()
await worker.wait()
if self.sort_order_ascending:
self.action_oldest()
else:
self.action_newest()
async def action_toggle_mode(self) -> None:
"""Toggle the content mode between plaintext and markdown."""
content_container = self.query_one(ContentContainer)
await content_container.action_toggle_mode()
def action_next(self) -> None:
if not self.current_message_index >= 0:
return
next_id, next_idx = self.message_store.find_next_valid_id(
self.current_message_index
)
if next_id is not None and next_idx is not None:
self.current_message_id = next_id
self.current_message_index = next_idx
def action_previous(self) -> None:
if not self.current_message_index >= 0:
return
prev_id, prev_idx = self.message_store.find_prev_valid_id(
self.current_message_index
)
if prev_id is not None and prev_idx is not None:
self.current_message_id = prev_id
self.current_message_index = prev_idx
async def action_delete(self) -> None:
"""Delete the current or selected messages."""
if self.selected_messages:
# --- Multi-message delete: show confirmation ---
count = len(self.selected_messages)
def do_delete(confirmed: bool) -> None:
if confirmed:
self.run_worker(self._delete_selected_messages())
self.push_screen(
ConfirmDialog(
title="Delete Messages",
message=f"Delete {count} selected message{'s' if count > 1 else ''}?",
confirm_label="Delete",
cancel_label="Cancel",
),
do_delete,
)
else:
# --- Single message delete (no confirmation needed) ---
worker = delete_current(self)
await worker.wait()
async def _delete_selected_messages(self) -> None:
"""Delete all selected messages."""
message_ids_to_delete = list(self.selected_messages)
next_id_to_select = None
if message_ids_to_delete:
highest_deleted_id = max(message_ids_to_delete)
metadata = self.message_store.get_metadata(highest_deleted_id)
if metadata:
next_id, _ = self.message_store.find_next_valid_id(metadata["index"])
if next_id is None:
next_id, _ = self.message_store.find_prev_valid_id(
metadata["index"]
)
next_id_to_select = next_id
# Delete each message
success_count = 0
for mid in message_ids_to_delete:
message, success = await himalaya_client.delete_message(mid)
if success:
success_count += 1
else:
self.show_status(f"Failed to delete message {mid}: {message}", "error")
if success_count > 0:
self.show_status(
f"Deleted {success_count} message{'s' if success_count > 1 else ''}"
)
self.selected_messages.clear()
self.refresh_list_view_items()
# Refresh the envelope list
worker = self.fetch_envelopes()
await worker.wait()
# After refresh, select the next message
if next_id_to_select:
new_metadata = self.message_store.get_metadata(next_id_to_select)
if new_metadata:
self.current_message_id = next_id_to_select
else:
self.action_oldest()
else:
self.action_oldest()
async def action_archive(self) -> None:
"""Archive the current or selected messages and update UI consistently."""
if self.selected_messages:
# --- Multi-message archive: show confirmation ---
count = len(self.selected_messages)
def do_archive(confirmed: bool) -> None:
if confirmed:
self.run_worker(self._archive_selected_messages())
self.push_screen(
ConfirmDialog(
title="Archive Messages",
message=f"Archive {count} selected message{'s' if count > 1 else ''}?",
confirm_label="Archive",
cancel_label="Cancel",
),
do_archive,
)
else:
# --- Single message archive (no confirmation needed) ---
await self._archive_single_message()
async def _archive_selected_messages(self) -> None:
"""Archive all selected messages."""
message_ids_to_archive = list(self.selected_messages)
next_id_to_select = None
if message_ids_to_archive:
highest_archived_id = max(message_ids_to_archive)
metadata = self.message_store.get_metadata(highest_archived_id)
if metadata:
next_id, _ = self.message_store.find_next_valid_id(metadata["index"])
if next_id is None:
next_id, _ = self.message_store.find_prev_valid_id(
metadata["index"]
)
next_id_to_select = next_id
message, success = await himalaya_client.archive_messages(
[str(mid) for mid in message_ids_to_archive]
)
if success:
self.show_status(
message or f"Archived {len(message_ids_to_archive)} messages"
)
self.selected_messages.clear()
self.refresh_list_view_items()
else:
self.show_status(f"Failed to archive messages: {message}", "error")
return
# Refresh the envelope list
worker = self.fetch_envelopes()
await worker.wait()
# After refresh, select the next message
if next_id_to_select:
new_metadata = self.message_store.get_metadata(next_id_to_select)
if new_metadata:
self.current_message_id = next_id_to_select
else:
self.action_oldest()
else:
self.action_oldest()
async def _archive_single_message(self) -> None:
"""Archive the current single message."""
if not self.current_message_id:
self.show_status("No message selected to archive.", "error")
return
current_id = self.current_message_id
current_idx = self.current_message_index
next_id, _ = self.message_store.find_next_valid_id(current_idx)
if next_id is None:
next_id, _ = self.message_store.find_prev_valid_id(current_idx)
next_id_to_select = next_id
message, success = await himalaya_client.archive_messages([str(current_id)])
if success:
self.show_status(message or "Archived")
else:
self.show_status(
f"Failed to archive message {current_id}: {message}", "error"
)
return
# Refresh the envelope list
worker = self.fetch_envelopes()
await worker.wait()
# After refresh, select the next message
if next_id_to_select:
new_metadata = self.message_store.get_metadata(next_id_to_select)
if new_metadata:
self.current_message_id = next_id_to_select
else:
self.action_oldest()
else:
self.action_oldest()
def action_open(self) -> None:
action_open(self)
def action_create_task(self) -> None:
action_create_task(self)
def action_open_links(self) -> None:
"""Open the link panel showing links from the current message."""
content_container = self.query_one(ContentContainer)
links = content_container.get_links()
self.push_screen(LinkPanel(links))
def action_scroll_down(self) -> None:
"""Scroll the main content down."""
self.query_one("#main_content").scroll_down()
def action_scroll_up(self) -> None:
"""Scroll the main content up."""
self.query_one("#main_content").scroll_up()
def action_scroll_page_down(self) -> None:
"""Scroll the main content down by a page."""
self.query_one("#main_content").scroll_page_down()
def action_scroll_page_up(self) -> None:
"""Scroll the main content up by a page."""
self.query_one("#main_content").scroll_page_up()
def action_toggle_main_content(self) -> None:
"""Toggle the visibility of the main content pane."""
self.main_content_visible = not self.main_content_visible
def watch_main_content_visible(self, visible: bool) -> None:
"""Called when main_content_visible changes."""
main_content = self.query_one("#main_content")
accounts_list = self.query_one("#accounts_list")
folders_list = self.query_one("#folders_list")
main_content.display = visible
accounts_list.display = visible
folders_list.display = visible
self.query_one("#envelopes_list").focus()
def action_quit(self) -> None:
self.exit()
def action_toggle_selection(self) -> None:
"""Toggle selection for the current message."""
current_item_data = self.message_store.envelopes[self.highlighted_message_index]
if current_item_data and current_item_data.get("type") != "header":
message_id = int(current_item_data["id"])
envelopes_list = self.query_one("#envelopes_list", ListView)
current_list_item = envelopes_list.children[self.highlighted_message_index]
# Toggle selection state
if message_id in self.selected_messages:
self.selected_messages.remove(message_id)
is_selected = False
else:
self.selected_messages.add(message_id)
is_selected = True
# Update the EnvelopeListItem widget
try:
envelope_widget = current_list_item.query_one(EnvelopeListItem)
envelope_widget.set_selected(is_selected)
except Exception:
# Fallback for old-style widgets
try:
checkbox_label = current_list_item.query_one(".checkbox", Label)
if is_selected:
checkbox_label.add_class("x-list")
checkbox_label.update("\uf4a7")
else:
checkbox_label.remove_class("x-list")
checkbox_label.update("\ue640")
except Exception:
pass
self._update_list_view_subtitle()
def action_clear_selection(self) -> None:
"""Clear all selected messages."""
self.selected_messages.clear()
self.refresh_list_view_items() # Refresh all items to uncheck checkboxes
self._update_list_view_subtitle()
def action_oldest(self) -> None:
self.fetch_envelopes() if self.reload_needed else None
self.show_message(self.message_store.get_oldest_id())
def action_newest(self) -> None:
self.fetch_envelopes() if self.reload_needed else None
self.show_message(self.message_store.get_newest_id())
def action_focus_1(self) -> None:
self.query_one("#envelopes_list").focus()
def action_focus_2(self) -> None:
self.query_one("#accounts_list").focus()
def action_focus_3(self) -> None:
self.query_one("#folders_list").focus()
def action_focus_4(self) -> None:
self.query_one("#main_content").focus()
if __name__ == "__main__":
app = EmailViewerApp()
app.run()
def launch_email_viewer():
app = EmailViewerApp()
app.run()

179
src/mail/config.py Normal file
View File

@@ -0,0 +1,179 @@
"""Configuration system for Mail email reader using Pydantic."""
import logging
import os
from pathlib import Path
from typing import Literal, Optional
import toml
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
class TaskBackendConfig(BaseModel):
"""Configuration for task management backend."""
backend: Literal["taskwarrior", "dstask"] = "taskwarrior"
taskwarrior_path: str = "task"
dstask_path: str = Field(
default_factory=lambda: str(Path.home() / ".local" / "bin" / "dstask")
)
class EnvelopeDisplayConfig(BaseModel):
"""Configuration for envelope list item rendering."""
# Sender display
max_sender_length: int = 25
# Date/time display
date_format: str = "%m/%d"
time_format: str = "%H:%M"
show_date: bool = True
show_time: bool = True
# Grouping
group_by: Literal["relative", "absolute"] = "relative"
# relative: "Today", "Yesterday", "This Week", etc.
# absolute: "December 2025", "November 2025", etc.
# Layout
lines: Literal[2, 3] = 2
# 2: sender/date on line 1, subject on line 2
# 3: sender/date on line 1, subject on line 2, preview on line 3
show_checkbox: bool = True
show_preview: bool = False # Only used when lines=3
# NerdFont icons for status
icon_unread: str = "\uf0e0" # nf-fa-envelope (filled)
icon_read: str = "\uf2b6" # nf-fa-envelope_open (open)
icon_flagged: str = "\uf024" # nf-fa-flag
icon_attachment: str = "\uf0c6" # nf-fa-paperclip
class KeybindingsConfig(BaseModel):
"""Keybinding customization."""
next_message: str = "j"
prev_message: str = "k"
delete: str = "#"
archive: str = "e"
open_by_id: str = "o"
quit: str = "q"
toggle_header: str = "h"
create_task: str = "t"
reload: str = "%"
toggle_sort: str = "s"
toggle_selection: str = "space"
clear_selection: str = "escape"
scroll_page_down: str = "pagedown"
scroll_page_down: str = "space"
scroll_page_up: str = "b"
toggle_main_content: str = "w"
open_links: str = "l"
toggle_view_mode: str = "m"
class ContentDisplayConfig(BaseModel):
"""Configuration for message content display."""
# View mode: "markdown" for pretty rendering, "html" for raw/plain display
default_view_mode: Literal["markdown", "html"] = "markdown"
class LinkPanelConfig(BaseModel):
"""Configuration for the link panel."""
# Whether to close the panel after opening a link
close_on_open: bool = False
class MailOperationsConfig(BaseModel):
"""Configuration for mail operations."""
# Folder to move messages to when archiving
archive_folder: str = "Archive"
class ThemeConfig(BaseModel):
"""Theme/appearance settings."""
theme_name: str = "monokai"
class MailAppConfig(BaseModel):
"""Main configuration for Mail email reader."""
task: TaskBackendConfig = Field(default_factory=TaskBackendConfig)
envelope_display: EnvelopeDisplayConfig = Field(
default_factory=EnvelopeDisplayConfig
)
content_display: ContentDisplayConfig = Field(default_factory=ContentDisplayConfig)
link_panel: LinkPanelConfig = Field(default_factory=LinkPanelConfig)
mail: MailOperationsConfig = Field(default_factory=MailOperationsConfig)
keybindings: KeybindingsConfig = Field(default_factory=KeybindingsConfig)
theme: ThemeConfig = Field(default_factory=ThemeConfig)
@classmethod
def get_config_path(cls) -> Path:
"""Get the path to the config file."""
# Check environment variable first
env_path = os.getenv("LUK_MAIL_CONFIG")
if env_path:
return Path(env_path)
# Default to ~/.config/luk/mail.toml
return Path.home() / ".config" / "luk" / "mail.toml"
@classmethod
def load(cls, config_path: Optional[Path] = None) -> "MailAppConfig":
"""Load config from TOML file with defaults for missing values."""
if config_path is None:
config_path = cls.get_config_path()
if config_path.exists():
try:
with open(config_path, "r") as f:
data = toml.load(f)
logger.info(f"Loaded config from {config_path}")
return cls.model_validate(data)
except Exception as e:
logger.warning(f"Error loading config from {config_path}: {e}")
logger.warning("Using default configuration")
return cls()
else:
logger.info(f"No config file at {config_path}, using defaults")
return cls()
def save(self, config_path: Optional[Path] = None) -> None:
"""Save current config to TOML file."""
if config_path is None:
config_path = self.get_config_path()
# Ensure parent directory exists
config_path.parent.mkdir(parents=True, exist_ok=True)
with open(config_path, "w") as f:
toml.dump(self.model_dump(), f)
logger.info(f"Saved config to {config_path}")
# Global config instance (lazy-loaded)
_config: Optional[MailAppConfig] = None
def get_config() -> MailAppConfig:
"""Get the global config instance, loading it if necessary."""
global _config
if _config is None:
_config = MailAppConfig.load()
return _config
def reload_config() -> MailAppConfig:
"""Force reload of the config from disk."""
global _config
_config = MailAppConfig.load()
return _config

305
src/mail/email_viewer.tcss Normal file
View File

@@ -0,0 +1,305 @@
/* Basic stylesheet for the Textual Email Viewer App */
#main_content, .list_view {
scrollbar-size: 1 1;
border: round rgb(117, 106, 129);
height: 1fr;
}
#sidebar {
width: 1fr
}
.list_view {
height: 3;
}
#main_content {
width: 2fr;
}
.envelope-selected {
tint: $accent 20%;
}
#sidebar:focus-within {
background: $panel;
.list_view:blur {
height: 3;
}
.list_view:focus {
height: 2fr;
}
}
#envelopes_list {
height: 2fr;
}
#main_content:focus, .list_view:focus {
border: round $secondary;
background: rgb(55, 53, 57);
border-title-style: bold;
}
Label#task_prompt {
padding: 1;
color: rgb(128,128,128);
}
Label#task_prompt_label {
padding: 1;
color: rgb(255, 216, 102);
}
Label#message_label {
padding: 1;
}
StatusTitle {
dock: top;
width: 100%;
height: 1;
color: $text;
background: rgb(64, 62, 65);
content-align: center middle;
}
EnvelopeHeader {
dock: top;
width: 100%;
max-height: 2;
tint: $primary 10%;
}
Markdown {
padding: 1 2;
}
/* =====================================================
NEW EnvelopeListItem and GroupHeader styles
===================================================== */
/* EnvelopeListItem - the main envelope display widget */
EnvelopeListItem {
height: auto;
width: 1fr;
padding: 0;
}
EnvelopeListItem .envelope-content {
height: auto;
width: 1fr;
}
EnvelopeListItem .envelope-row-1 {
height: 1;
width: 1fr;
}
EnvelopeListItem .envelope-row-2 {
height: 1;
width: 1fr;
}
EnvelopeListItem .envelope-row-3 {
height: 1;
width: 1fr;
}
EnvelopeListItem .status-icon {
width: 3;
padding: 0 1 0 0;
color: $text-muted;
}
EnvelopeListItem .status-icon.unread {
color: $accent;
}
EnvelopeListItem .checkbox {
width: 2;
padding: 0 1 0 0;
}
EnvelopeListItem .sender-name {
width: 1fr;
}
EnvelopeListItem .message-datetime {
width: auto;
padding: 0 1;
color: $secondary;
}
EnvelopeListItem .email-subject {
width: 1fr;
padding: 0 4;
}
EnvelopeListItem .email-preview {
width: 1fr;
padding: 0 4;
color: $text-muted;
}
/* Unread message styling */
EnvelopeListItem.unread .sender-name {
text-style: bold;
}
EnvelopeListItem.unread .email-subject {
text-style: bold;
}
/* Selected message styling */
EnvelopeListItem.selected {
tint: $accent 20%;
}
/* GroupHeader - date group separator */
GroupHeader {
height: 1;
width: 1fr;
background: rgb(64, 62, 65);
}
GroupHeader .group-header-label {
color: rgb(160, 160, 160);
text-style: bold;
padding: 0 1;
width: 1fr;
}
/* =====================================================
END NEW styles
===================================================== */
/* Legacy styles (keeping for backward compatibility) */
.email_subject {
width: 1fr;
padding: 0 2;
text-style: bold;
}
.sender_name {
tint: gray 30%;
}
.message_date {
padding: 0 2;
color: $secondary;
}
.header_key {
tint: gray 20%;
min-width: 10;
text-style:bold;
}
.header_value {
padding:0 1 0 0;
height: auto;
width: auto;
}
.modal_screen {
align: center middle;
margin: 1;
padding: 2;
border: round $border;
background: $panel;
width: auto;
height: auto;
}
#envelopes_list {
ListItem:odd {
background: rgb(45, 45, 46);
}
ListItem:even {
background: rgb(50, 50, 56);
}
& > ListItem {
&.-highlight, .selection {
color: $block-cursor-blurred-foreground;
background: $block-cursor-blurred-background;
text-style: $block-cursor-blurred-text-style;
}
}
}
.envelope_item_row {
height: auto;
width: 1fr;
.envelope_header_row, .envelope_subject_row {
height: auto;
}
}
.x-list {
tint: $accent 20%;
}
#open_message_container {
border: panel $border;
dock: right;
width: 25%;
min-width: 60;
padding: 0 1;
height: 100%;
Input {
width: 1fr;
}
Label, Button {
width: auto;
}
}
Label.group_header {
color: rgb(140, 140, 140);
text-style: bold;
background: rgb(64, 62, 65);
width: 100%;
padding: 0 1;
}
#plaintext_content {
padding: 1 2;
height: auto;
width: 100%;
}
#html_content {
padding: 1 2;
height: auto;
width: 100%;
}
.hidden {
display: none;
}
#markdown_content {
padding: 1 2;
}
ContentContainer {
width: 100%;
height: 1fr;
}
.checkbox {
padding-right: 1;
}

150
src/mail/message_store.py Normal file
View File

@@ -0,0 +1,150 @@
import logging
from typing import List, Dict, Any, Tuple, Optional
from datetime import datetime, UTC
from src.services.himalaya import client as himalaya_client
class MessageStore:
"""Store and manage message envelopes"""
def __init__(self):
self.envelopes: List[Dict[str, Any]] = []
self.metadata_by_id: Dict[int, Dict[str, Any]] = {}
self.total_messages = 0
def load(
self, envelopes: List[Dict[str, Any]], sort_ascending: bool = True
) -> None:
"""Load envelopes from Himalaya client and process them"""
if not envelopes:
self.envelopes = []
self.metadata_by_id = {}
self.total_messages = 0
return
# Sort by date
envelopes.sort(
key=lambda x: x.get("date", ""),
reverse=not sort_ascending,
)
# Group envelopes by month
grouped_envelopes = []
months = {}
for envelope in envelopes:
if "id" not in envelope:
continue
# Extract date and determine month group
date_str = envelope.get("date", "")
try:
date = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
month_key = date.strftime("%B %Y")
except (ValueError, TypeError):
month_key = "Unknown Date"
# Add month header if this is a new month
if month_key not in months:
months[month_key] = True
grouped_envelopes.append({"type": "header", "label": month_key})
# Add the envelope
grouped_envelopes.append(envelope)
# Store metadata for quick access
envelope_id = int(envelope["id"])
self.metadata_by_id[envelope_id] = {
"id": envelope_id,
"subject": envelope.get("subject", ""),
"from": envelope.get("from", {}),
"to": envelope.get("to", {}),
"cc": envelope.get("cc", {}),
"date": date_str,
"index": len(grouped_envelopes) - 1,
}
self.envelopes = grouped_envelopes
self.total_messages = len(self.metadata_by_id)
async def reload(self, sort_ascending: bool = True) -> None:
"""Reload envelopes from the Himalaya client"""
envelopes, success = await himalaya_client.list_envelopes()
if success:
self.load(envelopes, sort_ascending)
else:
logging.error("Failed to reload envelopes")
def get_metadata(self, message_id: int) -> Optional[Dict[str, Any]]:
"""Get metadata for a message by ID"""
return self.metadata_by_id.get(message_id)
def find_next_valid_id(
self, current_index: int
) -> Tuple[Optional[int], Optional[int]]:
"""Find the next valid message ID and its index"""
if not self.envelopes or current_index >= len(self.envelopes) - 1:
return None, None
# Start from current index + 1
for idx in range(current_index + 1, len(self.envelopes)):
item = self.envelopes[idx]
# Skip header items
if item and item.get("type") != "header" and "id" in item:
return int(item["id"]), idx
return None, None
def find_prev_valid_id(
self, current_index: int
) -> Tuple[Optional[int], Optional[int]]:
"""Find the previous valid message ID and its index"""
if not self.envelopes or current_index <= 0:
return None, None
# Start from current index - 1
for idx in range(current_index - 1, -1, -1):
item = self.envelopes[idx]
# Skip header items
if item and item.get("type") != "header" and "id" in item:
return int(item["id"]), idx
return None, None
def get_oldest_id(self) -> int:
"""Get the ID of the oldest message (first non-header item)"""
for item in self.envelopes:
if item and item.get("type") != "header" and "id" in item:
return int(item["id"])
return 0
def get_newest_id(self) -> int:
"""Get the ID of the newest message (last non-header item)"""
for item in reversed(self.envelopes):
if item and item.get("type") != "header" and "id" in item:
return int(item["id"])
return 0
def remove_envelope(self, message_id: int) -> None:
"""Remove an envelope from the store"""
metadata = self.metadata_by_id.get(message_id)
if not metadata:
return
index = metadata["index"]
if 0 <= index < len(self.envelopes):
# Remove from the envelopes list
self.envelopes.pop(index)
# Remove from metadata dictionary
del self.metadata_by_id[message_id]
# Update indexes for all subsequent messages
for id_, meta in self.metadata_by_id.items():
if meta["index"] > index:
meta["index"] -= 1
# Update total message count
self.total_messages = len(self.metadata_by_id)
else:
logging.warning(f"Invalid index {index} for message ID {message_id}")

View File

@@ -0,0 +1,108 @@
"""Confirmation dialog screen for destructive actions."""
from textual import on
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal, Vertical, Container
from textual.screen import ModalScreen
from textual.widgets import Button, Label, Static
class ConfirmDialog(ModalScreen[bool]):
"""A modal confirmation dialog that returns True if confirmed, False otherwise."""
BINDINGS = [
Binding("escape", "cancel", "Cancel"),
Binding("enter", "confirm", "Confirm"),
Binding("y", "confirm", "Yes"),
Binding("n", "cancel", "No"),
]
DEFAULT_CSS = """
ConfirmDialog {
align: center middle;
}
ConfirmDialog #confirm-container {
width: 50;
height: auto;
min-height: 7;
background: $surface;
border: thick $primary;
padding: 1 2;
}
ConfirmDialog #confirm-title {
text-style: bold;
width: 1fr;
height: 1;
text-align: center;
margin-bottom: 1;
}
ConfirmDialog #confirm-message {
width: 1fr;
height: 1;
text-align: center;
margin-bottom: 1;
}
ConfirmDialog #confirm-buttons {
width: 1fr;
height: 3;
align: center middle;
margin-top: 1;
}
ConfirmDialog Button {
margin: 0 1;
}
"""
def __init__(
self,
title: str = "Confirm",
message: str = "Are you sure?",
confirm_label: str = "Yes",
cancel_label: str = "No",
**kwargs,
):
"""Initialize the confirmation dialog.
Args:
title: The dialog title
message: The confirmation message
confirm_label: Label for the confirm button
cancel_label: Label for the cancel button
"""
super().__init__(**kwargs)
self._title = title
self._message = message
self._confirm_label = confirm_label
self._cancel_label = cancel_label
def compose(self) -> ComposeResult:
with Container(id="confirm-container"):
yield Label(self._title, id="confirm-title")
yield Label(self._message, id="confirm-message")
with Horizontal(id="confirm-buttons"):
yield Button(self._cancel_label, id="cancel", variant="default")
yield Button(self._confirm_label, id="confirm", variant="error")
def on_mount(self) -> None:
"""Focus the cancel button by default (safer option)."""
self.query_one("#cancel", Button).focus()
@on(Button.Pressed, "#confirm")
def handle_confirm(self) -> None:
self.dismiss(True)
@on(Button.Pressed, "#cancel")
def handle_cancel(self) -> None:
self.dismiss(False)
def action_confirm(self) -> None:
self.dismiss(True)
def action_cancel(self) -> None:
self.dismiss(False)

View File

@@ -0,0 +1,213 @@
import logging
from textual.screen import ModalScreen
from textual.widgets import Input, Label, Button, ListView, ListItem
from textual.containers import Vertical, Horizontal, Container
from textual.binding import Binding
from textual import on, work
from src.services.task_client import create_task, get_backend_info
class CreateTaskScreen(ModalScreen):
"""Screen for creating a new task."""
BINDINGS = [
Binding("escape", "cancel", "Close"),
Binding("ctrl+s", "submit", "Create Task"),
]
DEFAULT_CSS = """
CreateTaskScreen {
align: right middle;
}
CreateTaskScreen #create_task_container {
dock: right;
width: 40%;
min-width: 50;
max-width: 80;
height: 100%;
background: $surface;
border: round $primary;
padding: 1 2;
}
CreateTaskScreen #create_task_container:focus-within {
border: round $accent;
}
CreateTaskScreen #create_task_form {
height: auto;
width: 1fr;
}
CreateTaskScreen .form-field {
height: auto;
width: 1fr;
margin-bottom: 1;
}
CreateTaskScreen .form-field Label {
height: 1;
width: 1fr;
color: $text-muted;
margin-bottom: 0;
}
CreateTaskScreen .form-field Input {
width: 1fr;
}
CreateTaskScreen .form-field Input:focus {
border: tall $accent;
}
CreateTaskScreen .button-row {
height: auto;
width: 1fr;
align: center middle;
margin-top: 1;
}
CreateTaskScreen .button-row Button {
margin: 0 1;
}
CreateTaskScreen .form-hint {
height: 1;
width: 1fr;
color: $text-muted;
text-align: center;
margin-top: 1;
}
"""
def __init__(self, subject="", from_addr="", **kwargs):
super().__init__(**kwargs)
self.subject = subject
self.from_addr = from_addr
self.selected_project = None
def compose(self):
with Container(id="create_task_container"):
with Vertical(id="create_task_form"):
# Subject field
with Vertical(classes="form-field"):
yield Label("Subject")
yield Input(
placeholder="Task description",
value=self.subject,
id="subject_input",
)
# Project field
with Vertical(classes="form-field"):
yield Label("Project")
yield Input(placeholder="e.g., work, home", id="project_input")
# Tags field
with Vertical(classes="form-field"):
yield Label("Tags")
yield Input(placeholder="tag1, tag2, ...", id="tags_input")
# Due date field
with Vertical(classes="form-field"):
yield Label("Due")
yield Input(
placeholder="today, tomorrow, fri, 2024-01-15",
id="due_input",
)
# Priority field
with Vertical(classes="form-field"):
yield Label("Priority")
yield Input(placeholder="H, M, or L", id="priority_input")
# Buttons
with Horizontal(classes="button-row"):
yield Button("Create", id="create_btn", variant="primary")
yield Button("Cancel", id="cancel_btn", variant="error")
yield Label("ctrl+s: create, esc: cancel", classes="form-hint")
def on_mount(self):
backend_name, _ = get_backend_info()
container = self.query_one("#create_task_container", Container)
container.border_title = "\uf0ae New Task" # nf-fa-tasks
container.border_subtitle = backend_name
# Focus the subject input
self.query_one("#subject_input", Input).focus()
def action_cancel(self):
"""Close the screen."""
self.dismiss()
def action_submit(self):
"""Submit the form."""
self._create_task()
@on(Input.Submitted)
def on_input_submitted(self, event: Input.Submitted):
"""Handle Enter key in any input field."""
self._create_task()
@on(Button.Pressed, "#create_btn")
def on_create_pressed(self):
"""Create the task when the Create button is pressed."""
self._create_task()
def _create_task(self):
"""Gather form data and create the task."""
# Get input values
subject = self.query_one("#subject_input", Input).value
project = self.query_one("#project_input", Input).value
tags_input = self.query_one("#tags_input", Input).value
due = self.query_one("#due_input", Input).value
priority = self.query_one("#priority_input", Input).value
# Process tags (split by commas and trim whitespace)
tags = [tag.strip() for tag in tags_input.split(",")] if tags_input else []
# Add a tag for the sender, if provided
if self.from_addr and "@" in self.from_addr:
domain = self.from_addr.split("@")[1].split(".")[0]
if domain and domain not in ["gmail", "yahoo", "hotmail", "outlook"]:
tags.append(domain)
# Create the task
self.create_task_worker(subject, tags, project, due, priority)
@on(Button.Pressed, "#cancel_btn")
def on_cancel_pressed(self):
"""Dismiss the screen when Cancel is pressed."""
self.dismiss()
@work(exclusive=True)
async def create_task_worker(
self, subject, tags=None, project=None, due=None, priority=None
):
"""Worker to create a task using the configured backend."""
if not subject:
self.app.show_status("Task subject cannot be empty.", "error")
return
# Validate priority
if priority and priority.upper() not in ["H", "M", "L"]:
self.app.show_status("Priority must be H, M, or L.", "warning")
priority = None
elif priority:
priority = priority.upper()
# Create the task using the unified client
success, result = await create_task(
task_description=subject,
tags=tags or [],
project=project,
due=due,
priority=priority,
)
if success:
self.app.show_status(f"Task created: {subject}", "success")
self.dismiss()
else:
self.app.show_status(f"Failed to create task: {result}", "error")

View File

@@ -0,0 +1,561 @@
import io
import os
import tempfile
from pathlib import Path
from typing import ByteString
import aiohttp
import mammoth
from docx import Document
from textual_image.renderable import Image
from openai import OpenAI
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Container, ScrollableContainer, Horizontal
from textual.screen import Screen
from textual.widgets import Label, Markdown, Button, Footer, Static
from textual import work
from textual.reactive import reactive
from PIL import Image as PILImage
# Define convertible formats
PDF_CONVERTIBLE_FORMATS = {
"doc",
"docx",
"epub",
"eml",
"htm",
"html",
"md",
"msg",
"odp",
"ods",
"odt",
"pps",
"ppsx",
"ppt",
"pptx",
"rtf",
"tif",
"tiff",
"xls",
"xlsm",
"xlsx",
}
JPG_CONVERTIBLE_FORMATS = {
"3g2",
"3gp",
"3gp2",
"3gpp",
"3mf",
"ai",
"arw",
"asf",
"avi",
"bas",
"bash",
"bat",
"bmp",
"c",
"cbl",
"cmd",
"cool",
"cpp",
"cr2",
"crw",
"cs",
"css",
"csv",
"cur",
"dcm",
"dcm30",
"dic",
"dicm",
"dicom",
"dng",
"doc",
"docx",
"dwg",
"eml",
"epi",
"eps",
"epsf",
"epsi",
"epub",
"erf",
"fbx",
"fppx",
"gif",
"glb",
"h",
"hcp",
"heic",
"heif",
"htm",
"html",
"ico",
"icon",
"java",
"jfif",
"jpeg",
"jpg",
"js",
"json",
"key",
"log",
"m2ts",
"m4a",
"m4v",
"markdown",
"md",
"mef",
"mov",
"movie",
"mp3",
"mp4",
"mp4v",
"mrw",
"msg",
"mts",
"nef",
"nrw",
"numbers",
"obj",
"odp",
"odt",
"ogg",
"orf",
"pages",
"pano",
"pdf",
"pef",
"php",
"pict",
"pl",
"ply",
"png",
"pot",
"potm",
"potx",
"pps",
"ppsx",
"ppsxm",
"ppt",
"pptm",
"pptx",
"ps",
"ps1",
"psb",
"psd",
"py",
"raw",
"rb",
"rtf",
"rw1",
"rw2",
"sh",
"sketch",
"sql",
"sr2",
"stl",
"tif",
"tiff",
"ts",
"txt",
"vb",
"webm",
"wma",
"wmv",
"xaml",
"xbm",
"xcf",
"xd",
"xml",
"xpm",
"yaml",
"yml",
}
# Enum for display modes
class DisplayMode:
IMAGE = "image"
TEXT = "text"
MARKDOWN = "markdown"
class DocumentViewerScreen(Screen):
"""Screen for viewing document content from OneDrive items."""
web_url = reactive("")
download_url = reactive("")
use_markitdown = True
image_bytes: ByteString = b""
BINDINGS = [
Binding("escape", "close", "Close"),
Binding("q", "close", "Close"),
Binding("m", "toggle_mode", "Toggle Mode"),
Binding("e", "export_and_open", "Export & Open"),
]
def __init__(self, item_id: str, item_name: str, access_token: str, drive_id: str):
"""Initialize the document viewer screen.
Args:
item_id: The ID of the item to view.
item_name: The name of the item to display.
access_token: The access token for API requests.
drive_id: The ID of the drive containing the item.
"""
super().__init__()
self.item_id = item_id
self.drive_id = drive_id
self.item_name = item_name
self.access_token = access_token
self.document_content = ""
self.plain_text_content = ""
self.content_type = None
self.raw_content = None
self.file_extension = Path(item_name).suffix.lower().lstrip(".")
self.mode: DisplayMode = DisplayMode.TEXT
def compose(self) -> ComposeResult:
"""Compose the document viewer screen."""
yield Container(
Horizontal(
Container(Button("", id="close_button"), id="button_container"),
Container(
Label(f"Viewing: {self.item_name}", id="document_title"),
Label(
f'[link="{self.web_url}"]Open on Web[/link] | [link="{self.download_url}"]Download File[/link]',
id="document_link",
),
),
Button("Toggle Mode", id="toggle_mode_button"),
id="top_container",
),
ScrollableContainer(
Markdown("", id="markdown_content"),
Static(
"",
id="image_content",
expand=True,
),
Label("", id="plaintext_content", classes="hidden", markup=False),
id="content_container",
),
id="document_viewer",
)
yield Footer()
def on_mount(self) -> None:
"""Handle screen mount event."""
self.query_one("#content_container").focus()
self.download_document()
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button press events."""
if event.button.id == "close_button":
self.dismiss()
elif event.button.id == "toggle_mode_button":
self.action_toggle_mode()
elif event.button.id == "export_button":
self.action_export_and_open()
def is_convertible_format(self) -> bool:
"""Check if the current file is convertible to PDF or JPG."""
return (
self.file_extension in PDF_CONVERTIBLE_FORMATS
or self.file_extension in JPG_CONVERTIBLE_FORMATS
)
def get_conversion_format(self) -> str:
"""Get the appropriate conversion format (pdf or jpg) for the current file."""
if self.file_extension in PDF_CONVERTIBLE_FORMATS:
return "pdf"
elif self.file_extension in JPG_CONVERTIBLE_FORMATS:
return "jpg"
return ""
@work
async def download_document(self) -> None:
"""Download the document content."""
headers = {"Authorization": f"Bearer {self.access_token}"}
try:
metadataUrl = f"https://graph.microsoft.com/v1.0/drives/{self.drive_id}/items/{self.item_id}"
async with aiohttp.ClientSession() as session:
async with session.get(metadataUrl, headers=headers) as response:
if response.status != 200:
error_text = await response.text()
self.notify(
f"Failed to fetch document metadata: {error_text}",
severity="error",
)
return
metadata = await response.json()
self.item_name = metadata.get("name", self.item_name)
self.file_extension = (
Path(self.item_name).suffix.lower().lstrip(".")
)
self.download_url = metadata.get("@microsoft.graph.downloadUrl", "")
self.web_url = metadata.get("webUrl", "")
except Exception as e:
self.notify(f"Error downloading document: {str(e)}", severity="error")
try:
url = f"https://graph.microsoft.com/v1.0/drives/{self.drive_id}/items/{self.item_id}/content"
# Show loading indicator
self.query_one("#content_container").loading = True
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
if response.status != 200:
error_text = await response.text()
self.notify(
f"Failed to download document: {error_text}",
severity="error",
)
return
self.content_type = response.headers.get("content-type", "")
self.raw_content = await response.read()
# Process the content based on content type
self.process_content()
except Exception as e:
self.notify(f"Error downloading document: {str(e)}", severity="error")
finally:
# Hide loading indicator
self.query_one("#content_container").loading = False
@work
async def process_content(self) -> None:
"""Process the downloaded content based on its type."""
if not self.raw_content:
self.notify("No content to display", severity="warning")
return
try:
if self.content_type.startswith("image/"):
from PIL import Image as PILImage
from io import BytesIO
self.notify("Attempting to display image in terminal")
if self.raw_content and len(self.raw_content) > 0:
self.image_bytes = self.raw_content
self.mode = DisplayMode.IMAGE
# Decode the image using BytesIO and Pillow
img = PILImage.open(BytesIO(self.image_bytes))
# Convert the image to RGB mode if it's not already
if img.mode != "RGB":
img = img.convert("RGB")
# Create a Textual Image renderable
textual_img = Image(img)
textual_img.expand = True
textual_img.width = 120
self.query_one("#image_content", Static).update(textual_img)
self.update_content_display()
return
except Exception as e:
self.notify(
f"Error displaying image in terminal: {str(e)}", severity="error"
)
try:
if self.use_markitdown:
self.notify(
"Attempting to convert file into Markdown with Markitdown...",
title="This could take a moment",
severity="info",
)
from markitdown import MarkItDown
with tempfile.NamedTemporaryFile(
suffix=f".{self.file_extension}", delete=False
) as temp_file:
temp_file.write(self.raw_content)
temp_path = temp_file.name
client = OpenAI()
md = MarkItDown(
enable_plugins=True, llm_client=client, llm_model="gpt-4o"
) # Set to True to enable plugins
result = md.convert(
temp_path,
)
self.mode = DisplayMode.MARKDOWN
self.document_content = result.markdown
self.plain_text_content = result.text_content
self.update_content_display()
return
except Exception as e:
self.notify(f"Error using MarkItDown: {str(e)}", severity="error")
try:
if (
self.content_type
== "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
):
self.notify(
"Processing DOCX file into Markdown using Mammoth...",
severity="info",
)
self.process_docx()
elif self.content_type.startswith("text/"):
# Process as plain text
text_content = self.raw_content.decode("utf-8", errors="replace")
self.document_content = text_content
self.mode = DisplayMode.TEXT
self.update_content_display()
elif self.content_type.startswith("image/"):
# For images, just display a message
self.document_content = f"*Image file: {self.item_name}*\n\nUse the 'Open URL' command to view this image in your browser."
self.mode = DisplayMode.MARKDOWN
self.update_content_display()
else:
# For other types, display a generic message
conversion_info = ""
if self.is_convertible_format():
conversion_format = self.get_conversion_format()
conversion_info = f"\n\nThis file can be converted to {conversion_format.upper()}. Press 'e' or click 'Export & Open' to convert and view."
self.document_content = f"*File: {self.item_name}*\n\nContent type: {self.content_type}{conversion_info}\n\nThis file type cannot be displayed directly in the viewer. You could [open in your browser]({self.web_url}), or [download the file]({self.download_url})."
self.mode = DisplayMode.MARKDOWN
self.update_content_display()
except Exception as e:
self.notify(f"Error processing content: {str(e)}", severity="error")
@work
async def process_docx(self) -> None:
"""Process DOCX content and convert to Markdown and plain text."""
try:
# Save the DOCX content to a temporary file
with tempfile.NamedTemporaryFile(suffix=".docx", delete=False) as temp_file:
temp_file.write(self.raw_content)
temp_path = temp_file.name
# Convert DOCX to Markdown using mammoth
with open(temp_path, "rb") as docx_file:
result = mammoth.convert_to_markdown(docx_file)
markdown_text = result.value
# Read the document structure with python-docx for plain text
doc = Document(temp_path)
self.plain_text_content = "\n\n".join(
[para.text for para in doc.paragraphs if para.text]
)
self.document_content = markdown_text
# Clean up temporary file
os.unlink(temp_path)
# Store both versions
self.update_content_display()
except Exception as e:
self.notify(f"Error processing DOCX: {str(e)}", severity="error")
def update_content_display(self) -> None:
"""Update the content display with the processed document content."""
markdown_widget = self.query_one("#markdown_content", Markdown)
plaintext_widget = self.query_one("#plaintext_content", Label)
image_widget = self.query_one("#image_content", Static)
toggle_button = self.query_one("#toggle_mode_button", Button)
if self.mode == DisplayMode.IMAGE:
toggle_button.label = "\U000f02e9"
image_widget.remove_class("hidden")
markdown_widget.add_class("hidden")
plaintext_widget.add_class("hidden")
elif self.mode == DisplayMode.MARKDOWN:
toggle_button.label = "Mode \U000f0354|\U000f09ed"
markdown_widget.update(self.document_content)
markdown_widget.remove_class("hidden")
image_widget.add_class("hidden")
plaintext_widget.add_class("hidden")
else:
toggle_button.label = "Mode \U000f0f5b|\U000f021a"
plaintext_widget.update(self.plain_text_content)
plaintext_widget.remove_class("hidden")
image_widget.add_class("hidden")
markdown_widget.add_class("hidden")
@work
async def export_and_open_converted_file(self) -> None:
"""Export the file in converted format and open it."""
if not self.is_convertible_format():
self.notify("This file format cannot be converted.", severity="warning")
return
conversion_format = self.get_conversion_format()
if not conversion_format:
self.notify("No appropriate conversion format found.", severity="error")
return
try:
# Build the URL with the format parameter
url = f"https://graph.microsoft.com/v1.0/drives/{self.drive_id}/items/{self.item_id}/content?format={conversion_format}"
headers = {"Authorization": f"Bearer {self.access_token}"}
# Download the converted file
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
if response.status != 200:
error_text = await response.text()
self.notify(
f"Failed to export document: {error_text}", severity="error"
)
return
converted_content = await response.read()
# Create temporary file with the right extension
file_name = (
f"{os.path.splitext(self.item_name)[0]}.{conversion_format}"
)
with tempfile.NamedTemporaryFile(
suffix=f".{conversion_format}",
delete=False,
prefix=f"onedrive_export_",
) as temp_file:
temp_file.write(converted_content)
temp_path = temp_file.name
# Open the file using the system default application
self.notify(
f"Opening exported {conversion_format.upper()} file: {file_name}"
)
self.app.open_url(f"file://{temp_path}")
self.query_one("#content_container").loading = False
except Exception as e:
self.notify(f"Error exporting document: {str(e)}", severity="error")
async def action_toggle_mode(self) -> None:
"""Toggle between Markdown and plaintext display modes."""
self.notify("Switching Modes", severity="info")
self.mode = (
DisplayMode.MARKDOWN
if self.mode != DisplayMode.MARKDOWN
else DisplayMode.TEXT
)
self.update_content_display()
mode_name = str(self.mode).capitalize()
self.notify(f"Switched to {mode_name} mode")
async def action_export_and_open(self) -> None:
"""Export the file in converted format and open it."""
self.query_one("#content_container").loading = True
self.notify("Exporting and opening the converted file...")
self.export_and_open_converted_file()
async def action_close(self) -> None:
"""Close the document viewer screen."""
self.dismiss()

View File

@@ -0,0 +1,555 @@
"""Link panel for viewing and opening URLs from email messages."""
import re
import webbrowser
from dataclasses import dataclass, field
from typing import List, Optional
from urllib.parse import urlparse
from textual import on
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Container, Vertical
from textual.screen import ModalScreen
from textual.widgets import Label, ListView, ListItem, Static
from src.mail.config import get_config
@dataclass
class LinkItem:
"""Represents a link extracted from an email."""
url: str
label: str # Derived from anchor text or URL
domain: str # Extracted for display
short_display: str # Truncated/friendly display
context: str = "" # Surrounding text for context
mnemonic: str = "" # Quick-select key hint
@classmethod
def from_url(
cls,
url: str,
anchor_text: str = "",
context: str = "",
max_display_len: int = 60,
) -> "LinkItem":
"""Create a LinkItem from a URL and optional anchor text."""
parsed = urlparse(url)
domain = parsed.netloc.replace("www.", "")
# Use anchor text as label if available, otherwise derive from URL
if anchor_text and anchor_text.strip():
label = anchor_text.strip()
else:
# Try to derive a meaningful label from the URL path
label = cls._derive_label_from_url(parsed)
# Create short display version
short_display = cls._shorten_url(url, domain, parsed.path, max_display_len)
return cls(
url=url,
label=label,
domain=domain,
short_display=short_display,
context=context[:80] if context else "",
)
@staticmethod
def _derive_label_from_url(parsed) -> str:
"""Derive a human-readable label from URL components."""
path = parsed.path.strip("/")
if not path:
return parsed.netloc
# Split path and take last meaningful segment
segments = [s for s in path.split("/") if s]
if segments:
last = segments[-1]
# Remove file extensions
last = re.sub(r"\.[a-zA-Z0-9]+$", "", last)
# Replace common separators with spaces
last = re.sub(r"[-_]", " ", last)
# Capitalize words
return last.title()[:40]
return parsed.netloc
@staticmethod
def _shorten_url(url: str, domain: str, path: str, max_len: int) -> str:
"""Create a shortened, readable version of the URL.
Intelligently shortens URLs by:
- Special handling for known sites (GitHub, Google Docs, Jira, GitLab)
- Keeping first and last path segments, eliding middle only if needed
- Adapting to available width
"""
# Special handling for common sites
path = path.strip("/")
# GitHub: user/repo/issues/123 -> user/repo #123
if "github.com" in domain:
match = re.match(r"([^/]+/[^/]+)/(issues|pull)/(\d+)", path)
if match:
repo, type_, num = match.groups()
icon = "#" if type_ == "issues" else "PR#"
return f"{domain} > {repo} {icon}{num}"
match = re.match(r"([^/]+/[^/]+)", path)
if match:
return f"{domain} > {match.group(1)}"
# Google Docs
if "docs.google.com" in domain:
if "/document/" in path:
return f"{domain} > Document"
if "/spreadsheets/" in path:
return f"{domain} > Spreadsheet"
if "/presentation/" in path:
return f"{domain} > Slides"
# Jira/Atlassian
if "atlassian.net" in domain or "jira" in domain.lower():
match = re.search(r"([A-Z]+-\d+)", path)
if match:
return f"{domain} > {match.group(1)}"
# GitLab
if "gitlab" in domain.lower():
match = re.match(r"([^/]+/[^/]+)/-/(issues|merge_requests)/(\d+)", path)
if match:
repo, type_, num = match.groups()
icon = "#" if type_ == "issues" else "MR!"
return f"{domain} > {repo} {icon}{num}"
# Generic shortening - keep URL readable
if len(url) <= max_len:
return url
# Build shortened path, keeping as many segments as fit
path_parts = [p for p in path.split("/") if p]
if not path_parts:
return domain
# Try to fit the full path first
full_path = "/".join(path_parts)
result = f"{domain} > {full_path}"
if len(result) <= max_len:
return result
# Keep first segment + last two segments if possible
if len(path_parts) >= 3:
short_path = f"{path_parts[0]}/.../{path_parts[-2]}/{path_parts[-1]}"
result = f"{domain} > {short_path}"
if len(result) <= max_len:
return result
# Keep first + last segment
if len(path_parts) >= 2:
short_path = f"{path_parts[0]}/.../{path_parts[-1]}"
result = f"{domain} > {short_path}"
if len(result) <= max_len:
return result
# Just last segment
result = f"{domain} > .../{path_parts[-1]}"
if len(result) <= max_len:
return result
# Truncate with ellipsis as last resort
result = f"{domain} > {path_parts[-1]}"
if len(result) > max_len:
result = result[: max_len - 3] + "..."
return result
def extract_links_from_content(content: str) -> List[LinkItem]:
"""Extract all links from HTML or markdown content."""
links: List[LinkItem] = []
seen_urls: set = set()
# Pattern for HTML links: <a href="...">text</a>
html_pattern = r'<a\s+[^>]*href=["\']([^"\']+)["\'][^>]*>([^<]*)</a>'
for match in re.finditer(html_pattern, content, re.IGNORECASE):
url, anchor_text = match.groups()
if url and url not in seen_urls and _is_valid_url(url):
# Get surrounding context
start = max(0, match.start() - 40)
end = min(len(content), match.end() + 40)
context = _clean_context(content[start:end])
links.append(LinkItem.from_url(url, anchor_text, context))
seen_urls.add(url)
# Pattern for markdown links: [text](url)
md_pattern = r"\[([^\]]+)\]\(([^)]+)\)"
for match in re.finditer(md_pattern, content):
anchor_text, url = match.groups()
if url and url not in seen_urls and _is_valid_url(url):
start = max(0, match.start() - 40)
end = min(len(content), match.end() + 40)
context = _clean_context(content[start:end])
links.append(LinkItem.from_url(url, anchor_text, context))
seen_urls.add(url)
# Pattern for bare URLs
url_pattern = r'https?://[^\s<>"\'\)]+[^\s<>"\'\.\,\)\]]'
for match in re.finditer(url_pattern, content):
url = match.group(0)
if url not in seen_urls and _is_valid_url(url):
start = max(0, match.start() - 40)
end = min(len(content), match.end() + 40)
context = _clean_context(content[start:end])
links.append(LinkItem.from_url(url, "", context))
seen_urls.add(url)
# Assign mnemonic hints
_assign_mnemonics(links)
return links
def _is_valid_url(url: str) -> bool:
"""Check if a URL is valid and worth displaying."""
if not url:
return False
# Skip mailto, tel, javascript, etc.
if re.match(r"^(mailto|tel|javascript|data|#):", url, re.IGNORECASE):
return False
# Skip very short URLs or fragments
if len(url) < 10:
return False
# Must start with http/https
if not url.startswith(("http://", "https://")):
return False
return True
def _clean_context(context: str) -> str:
"""Clean up context string for display."""
# Remove HTML tags
context = re.sub(r"<[^>]+>", "", context)
# Normalize whitespace
context = " ".join(context.split())
return context.strip()
def _assign_mnemonics(links: List[LinkItem]) -> None:
"""Assign unique mnemonic key hints to links."""
used_mnemonics: set = set()
# Characters to use for mnemonics (easily typeable)
# Exclude keys used by app/panel bindings: h,j,k,l (navigation), q (quit),
# b (page up), e (archive), o (open), s (sort), t (task), w (toggle), x (select)
reserved_keys = set("hjklqbeostwx")
available_chars = "".join(
c for c in "asdfgqwertyuiopzxcvbnm" if c not in reserved_keys
)
for link in links:
mnemonic = None
# Try first letter of label (prioritize link text over domain)
if link.label:
first = link.label[0].lower()
if first in available_chars and first not in used_mnemonics:
mnemonic = first
used_mnemonics.add(first)
# Try first letter of domain as fallback
if not mnemonic and link.domain:
first = link.domain[0].lower()
if first in available_chars and first not in used_mnemonics:
mnemonic = first
used_mnemonics.add(first)
# Try other letters from label
if not mnemonic and link.label:
for char in link.label.lower():
if char in available_chars and char not in used_mnemonics:
mnemonic = char
used_mnemonics.add(char)
break
# Try first two letters combined logic
if not mnemonic:
# Try label word initials
candidates = []
if link.label:
words = link.label.split()
if len(words) >= 2:
candidates.append((words[0][0] + words[1][0]).lower())
if link.domain and len(link.domain) > 1:
candidates.append(link.domain[:2].lower())
for candidate in candidates:
if len(candidate) == 2 and candidate not in used_mnemonics:
# Check both chars are available
if all(c in available_chars for c in candidate):
mnemonic = candidate
used_mnemonics.add(candidate)
break
# Fallback: find any unused character
if not mnemonic:
for char in available_chars:
if char not in used_mnemonics:
mnemonic = char
used_mnemonics.add(char)
break
link.mnemonic = mnemonic or ""
class LinkListItem(Static):
"""Widget for displaying a single link in the list."""
DEFAULT_CSS = """
LinkListItem {
height: auto;
width: 1fr;
padding: 0 1;
}
"""
def __init__(self, link: LinkItem, index: int, **kwargs):
super().__init__(**kwargs)
self.link = link
self.index = index
def render(self) -> str:
"""Render the link item using Rich markup."""
mnemonic = self.link.mnemonic if self.link.mnemonic else "?"
# Line 1: [mnemonic] domain - label
line1 = (
f"[bold cyan]\\[{mnemonic}][/] [dim]{self.link.domain}[/] {self.link.label}"
)
# Line 2: shortened URL (indented)
line2 = f" [dim italic]{self.link.short_display}[/]"
return f"{line1}\n{line2}"
class LinkPanel(ModalScreen):
"""Side panel for viewing and opening links from the current message."""
BINDINGS = [
Binding("escape", "dismiss", "Close"),
Binding("enter", "open_selected", "Open Link"),
Binding("j", "next_link", "Next"),
Binding("k", "prev_link", "Previous"),
]
DEFAULT_CSS = """
LinkPanel {
align: right middle;
}
LinkPanel #link-panel-container {
dock: right;
width: 50%;
min-width: 60;
max-width: 100;
height: 100%;
background: $surface;
border: round $primary;
padding: 1;
}
LinkPanel #link-panel-container:focus-within {
border: round $accent;
}
LinkPanel .link-panel-title {
text-style: bold;
padding: 0 0 1 0;
color: $text;
}
LinkPanel .link-panel-hint {
color: $text-muted;
padding: 0 0 1 0;
}
LinkPanel #link-list {
height: 1fr;
scrollbar-size: 1 1;
}
LinkPanel #link-list > ListItem {
height: auto;
padding: 0;
}
LinkPanel #link-list > ListItem:hover {
background: $boost;
}
LinkPanel #link-list > ListItem.-highlight {
background: $accent 30%;
}
LinkPanel .no-links-label {
color: $text-muted;
padding: 2;
text-align: center;
}
"""
def __init__(self, links: List[LinkItem], **kwargs):
super().__init__(**kwargs)
self.links = links
self._mnemonic_map: dict[str, LinkItem] = {
link.mnemonic: link for link in links if link.mnemonic
}
self._key_buffer: str = ""
self._key_timer = None
# Check if we have any multi-char mnemonics
self._has_multi_char = any(len(m) > 1 for m in self._mnemonic_map.keys())
def compose(self) -> ComposeResult:
with Container(id="link-panel-container"):
yield Label("\uf0c1 Links", classes="link-panel-title") # nf-fa-link
yield Label(
"j/k: navigate, enter: open, esc: close",
classes="link-panel-hint",
)
if self.links:
with ListView(id="link-list"):
for i, link in enumerate(self.links):
yield ListItem(LinkListItem(link, i))
else:
yield Label("No links found in this message.", classes="no-links-label")
def on_mount(self) -> None:
self.query_one("#link-panel-container").border_title = "Links"
self.query_one(
"#link-panel-container"
).border_subtitle = f"{len(self.links)} found"
if self.links:
self.query_one("#link-list").focus()
def on_key(self, event) -> None:
"""Handle mnemonic key presses with buffering for multi-char mnemonics."""
key = event.key.lower()
# Only buffer alphabetic keys
if not key.isalpha() or len(key) != 1:
return
# Cancel any pending timer
if self._key_timer:
self._key_timer.stop()
self._key_timer = None
# Add key to buffer
self._key_buffer += key
# Check for exact match with buffered keys
if self._key_buffer in self._mnemonic_map:
# If no multi-char mnemonics exist, open immediately
if not self._has_multi_char:
self._open_link(self._mnemonic_map[self._key_buffer])
self._key_buffer = ""
event.prevent_default()
return
# Check if any longer mnemonic starts with our buffer
has_longer_match = any(
m.startswith(self._key_buffer) and len(m) > len(self._key_buffer)
for m in self._mnemonic_map.keys()
)
if has_longer_match:
# Wait for possible additional keys
self._key_timer = self.set_timer(0.4, self._flush_key_buffer)
else:
# No longer matches possible, open immediately
self._open_link(self._mnemonic_map[self._key_buffer])
self._key_buffer = ""
event.prevent_default()
return
# Check if buffer could still match something
could_match = any(
m.startswith(self._key_buffer) for m in self._mnemonic_map.keys()
)
if could_match:
# Wait for more keys
self._key_timer = self.set_timer(0.4, self._flush_key_buffer)
event.prevent_default()
else:
# No possible match, clear buffer
self._key_buffer = ""
def _flush_key_buffer(self) -> None:
"""Called after timeout to process buffered keys."""
self._key_timer = None
if self._key_buffer and self._key_buffer in self._mnemonic_map:
self._open_link(self._mnemonic_map[self._key_buffer])
self._key_buffer = ""
def action_open_selected(self) -> None:
"""Open the currently selected link."""
if not self.links:
return
try:
link_list = self.query_one("#link-list", ListView)
if link_list.index is not None and 0 <= link_list.index < len(self.links):
self._open_link(self.links[link_list.index])
except Exception:
pass
def action_next_link(self) -> None:
"""Move to next link."""
try:
link_list = self.query_one("#link-list", ListView)
if link_list.index is not None and link_list.index < len(self.links) - 1:
link_list.index += 1
except Exception:
pass
def action_prev_link(self) -> None:
"""Move to previous link."""
try:
link_list = self.query_one("#link-list", ListView)
if link_list.index is not None and link_list.index > 0:
link_list.index -= 1
except Exception:
pass
def _open_link(self, link: LinkItem) -> None:
"""Open a link in the default browser."""
try:
webbrowser.open(link.url)
self.app.notify(f"Opened: {link.short_display}", title="Link Opened")
# Only dismiss if configured to close on open
config = get_config()
if config.link_panel.close_on_open:
self.dismiss()
except Exception as e:
self.app.notify(f"Failed to open link: {e}", severity="error")
@on(ListView.Selected)
def on_list_selected(self, event: ListView.Selected) -> None:
"""Handle list item selection (Enter key or click)."""
if event.list_view.index is not None and 0 <= event.list_view.index < len(
self.links
):
self._open_link(self.links[event.list_view.index])

View File

@@ -0,0 +1,35 @@
from textual import on
from textual.app import ComposeResult
from textual.screen import ModalScreen
from textual.widgets import Input, Label, Button
from textual.containers import Horizontal
class OpenMessageScreen(ModalScreen[int | None]):
def compose(self) -> ComposeResult:
yield Horizontal(
Label("📨 ID", id="message_label"),
Input(
placeholder="Enter message ID (integer only)",
type="integer",
id="open_message_input",
),
Button("Cancel", id="cancel"),
Button("Open", variant="primary", id="submit"),
id="open_message_container",
classes="modal_screen",
)
@on(Input.Submitted)
def handle_message_id(self, event) -> None:
input_widget = self.query_one("#open_message_input", Input)
message_id = int(input_widget.value if input_widget.value else 0)
self.dismiss(message_id)
def button_on_click(self, event) -> None:
if event.button.id == "cancel":
self.dismiss()
elif event.button.id == "submit":
input_widget = self.query_one("#open_message_input", Input)
message_id = int(input_widget.value if input_widget.value else 0)
self.dismiss(message_id)

View File

@@ -0,0 +1,16 @@
# Initialize the screens package
from .CreateTask import CreateTaskScreen
from .OpenMessage import OpenMessageScreen
from .DocumentViewer import DocumentViewerScreen
from .LinkPanel import LinkPanel, LinkItem, extract_links_from_content
from .ConfirmDialog import ConfirmDialog
__all__ = [
"CreateTaskScreen",
"OpenMessageScreen",
"DocumentViewerScreen",
"LinkPanel",
"LinkItem",
"extract_links_from_content",
"ConfirmDialog",
]

42
src/mail/utils.py Normal file
View File

@@ -0,0 +1,42 @@
from datetime import UTC, datetime, timedelta
import re
from typing import List, Dict
def group_envelopes_by_date(envelopes: List[Dict]) -> List[Dict]:
"""Group envelopes by date and add headers for each group."""
grouped_envelopes = []
today = datetime.now().astimezone(UTC)
yesterday = today - timedelta(days=1)
start_of_week = today - timedelta(days=today.weekday())
start_of_last_week = start_of_week - timedelta(weeks=1)
start_of_month = today.replace(day=1)
start_of_last_month = (start_of_month - timedelta(days=1)).replace(day=1)
def get_group_label(date: datetime) -> str:
if date.date() == today.date():
return "Today"
elif date.date() == yesterday.date():
return "Yesterday"
elif date >= start_of_week:
return "This Week"
elif date >= start_of_last_week:
return "Last Week"
elif date >= start_of_month:
return "This Month"
elif date >= start_of_last_month:
return "Last Month"
else:
return "Older"
current_group = None
for envelope in envelopes:
envelope_date = re.sub(r"[\+\-]\d\d:\d\d", "", envelope["date"])
envelope_date = datetime.strptime(envelope_date, "%Y-%m-%d %H:%M").astimezone(UTC)
group_label = get_group_label(envelope_date)
if group_label != current_group:
grouped_envelopes.append({"type": "header", "label": group_label})
current_group = group_label
grouped_envelopes.append(envelope)
return grouped_envelopes

View File

@@ -0,0 +1,213 @@
from markitdown import MarkItDown
from textual import work
from textual.binding import Binding
from textual.containers import Vertical, ScrollableContainer
from textual.widgets import Static, Markdown, Label
from textual.reactive import reactive
from src.services.himalaya import client as himalaya_client
from src.mail.config import get_config
from src.mail.screens.LinkPanel import extract_links_from_content, LinkItem
import logging
from datetime import datetime
from typing import Literal, List
import re
import os
import sys
# Add the parent directory to the system path to resolve relative imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
class EnvelopeHeader(Vertical):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.subject_label = Label("")
self.from_label = Label("")
self.to_label = Label("")
self.date_label = Label("")
self.cc_label = Label("")
def on_mount(self):
self.styles.height = "auto"
self.mount(self.subject_label)
self.mount(self.from_label)
self.mount(self.to_label)
self.mount(self.cc_label)
self.mount(self.date_label)
def update(self, subject, from_, to, date, cc=None):
self.subject_label.update(f"[b]Subject:[/b] {subject}")
self.from_label.update(f"[b]From:[/b] {from_}")
self.to_label.update(f"[b]To:[/b] {to}")
# Format the date for better readability
if date:
try:
# Try to convert the date string to a datetime object
date_obj = datetime.fromisoformat(date.replace("Z", "+00:00"))
formatted_date = date_obj.strftime("%a, %d %b %Y %H:%M:%S %Z")
self.date_label.update(f"[b]Date:[/b] {formatted_date}")
except (ValueError, TypeError):
# If parsing fails, just use the original date string
self.date_label.update(f"[b]Date:[/b] {date}")
else:
self.date_label.update("[b]Date:[/b] Unknown")
if cc:
self.cc_label.update(f"[b]CC:[/b] {cc}")
self.cc_label.styles.display = "block"
else:
self.cc_label.styles.display = "none"
class ContentContainer(ScrollableContainer):
"""Container for displaying email content with toggleable view modes."""
can_focus = True
# Reactive to track view mode and update UI
current_mode: reactive[Literal["markdown", "html"]] = reactive("markdown")
BINDINGS = [
Binding("m", "toggle_mode", "Toggle View Mode"),
]
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.md = MarkItDown()
self.header = EnvelopeHeader(id="envelope_header")
self.content = Markdown("", id="markdown_content")
self.html_content = Static("", id="html_content", markup=False)
self.current_content = None
self.current_message_id = None
self.content_worker = None
# Load default view mode from config
config = get_config()
self.current_mode = config.content_display.default_view_mode
def compose(self):
yield self.content
yield self.html_content
def on_mount(self):
# Set initial display based on config default
self._apply_view_mode()
self._update_mode_indicator()
def watch_current_mode(self, old_mode: str, new_mode: str) -> None:
"""React to mode changes."""
self._apply_view_mode()
self._update_mode_indicator()
def _apply_view_mode(self) -> None:
"""Apply the current view mode to widget visibility."""
if self.current_mode == "markdown":
self.html_content.styles.display = "none"
self.content.styles.display = "block"
else:
self.content.styles.display = "none"
self.html_content.styles.display = "block"
def _update_mode_indicator(self) -> None:
"""Update the border subtitle to show current mode."""
mode_label = "Markdown" if self.current_mode == "markdown" else "HTML/Text"
mode_icon = (
"\ue73e" if self.current_mode == "markdown" else "\uf121"
) # nf-md-language_markdown / nf-fa-code
self.border_subtitle = f"{mode_icon} {mode_label}"
async def action_toggle_mode(self):
"""Toggle between markdown and HTML viewing modes."""
if self.current_mode == "html":
self.current_mode = "markdown"
else:
self.current_mode = "html"
# Reload the content if we have a message ID
if self.current_message_id:
self.display_content(self.current_message_id)
def update_header(self, subject, from_, to, date, cc=None):
self.header.update(subject, from_, to, date, cc)
@work(exclusive=True)
async def fetch_message_content(self, message_id: int, format: str):
"""Fetch message content using the Himalaya client module."""
if not message_id:
self.notify("No message ID provided.")
return
content, success = await himalaya_client.get_message_content(message_id)
if success:
self._update_content(content)
else:
self.notify(f"Failed to fetch content for message ID {message_id}.")
def display_content(self, message_id: int) -> None:
"""Display the content of a message."""
if not message_id:
return
self.current_message_id = message_id
# Immediately show a loading message
if self.current_mode == "markdown":
self.content.update("Loading...")
else:
self.html_content.update("Loading...")
# Cancel any existing content fetch operations
if self.content_worker:
self.content_worker.cancel()
# Fetch content in the current mode
format_type = "text" if self.current_mode == "markdown" else "html"
self.content_worker = self.fetch_message_content(message_id, format_type)
def _update_content(self, content: str | None) -> None:
"""Update the content widgets with the fetched content."""
if content is None:
content = "(No content)"
# Store the raw content for link extraction
self.current_content = content
try:
if self.current_mode == "markdown":
# For markdown mode, use the Markdown widget
self.content.update(content)
else:
# For HTML mode, use the Static widget with markup
# First, try to extract the body content if it's HTML
body_match = re.search(
r"<body[^>]*>(.*?)</body>", content, re.DOTALL | re.IGNORECASE
)
if body_match:
content = body_match.group(1)
# Replace some common HTML elements with Textual markup
content = content.replace("<b>", "[b]").replace("</b>", "[/b]")
content = content.replace("<i>", "[i]").replace("</i>", "[/i]")
content = content.replace("<u>", "[u]").replace("</u>", "[/u]")
# Convert links to a readable format
content = re.sub(
r'<a href="([^"]+)"[^>]*>([^<]+)</a>', r"[\2](\1)", content
)
# Add CSS for better readability
self.html_content.update(content)
except Exception as e:
logging.error(f"Error updating content: {e}")
if self.current_mode == "markdown":
self.content.update(f"Error displaying content: {e}")
else:
self.html_content.update(f"Error displaying content: {e}")
def get_links(self) -> List[LinkItem]:
"""Extract and return links from the current message content."""
if not self.current_content:
return []
return extract_links_from_content(self.current_content)

View File

@@ -0,0 +1,115 @@
from textual.reactive import Reactive
from textual.app import ComposeResult
from textual.widgets import Label
from textual.containers import Horizontal, ScrollableContainer
from datetime import datetime
import re
from datetime import UTC
class EnvelopeHeader(ScrollableContainer):
subject = Reactive("")
from_ = Reactive("")
to = Reactive("")
date = Reactive("")
cc = Reactive("")
bcc = Reactive("")
"""Header for the email viewer."""
def on_mount(self) -> None:
"""Mount the header."""
def compose(self) -> ComposeResult:
yield Horizontal(
Label("Subject:", classes="header_key"),
Label(self.subject, classes="header_value",
markup=False, id="subject"),
)
yield Horizontal(
Label("Date:", classes="header_key"),
Label(self.date, classes="header_value", markup=False, id="date"),
)
# yield Horizontal(
# Label("From:", classes="header_key"),
# Label(self.from_,
# classes="header_value", markup=False, id="from"),
# )
# yield Horizontal(
# Label("To:", classes="header_key"),
# Label(self.to, classes="header_value",
# markup=False, id="to"),
# )
# yield Horizontal(
# )
# yield Horizontal(
# Label("CC:", classes="header_key"),
# Label(self.cc, classes="header_value",
# markup=False, id="cc"),
# )
def watch_subject(self, subject: str) -> None:
"""Watch the subject for changes."""
self.query_one("#subject", Label).update(subject)
# def watch_to(self, to: str) -> None:
# """Watch the to field for changes."""
# self.query_one("#to").update(to)
# def watch_from(self, from_: str) -> None:
# """Watch the from field for changes."""
# self.query_one("#from").update(from_)
def watch_date(self, date: str) -> None:
"""Watch the date for changes and convert to local timezone."""
if date:
try:
# If date already has timezone info, parse it
if any(x in date for x in ['+', '-', 'Z']):
# Try parsing with timezone info
try:
# Handle ISO format with Z suffix
if 'Z' in date:
parsed_date = datetime.fromisoformat(
date.replace('Z', '+00:00'))
else:
parsed_date = datetime.fromisoformat(date)
except ValueError:
# Try another common format
parsed_date = datetime.strptime(
date, "%Y-%m-%d %H:%M%z")
else:
# No timezone info, assume UTC
try:
parsed_date = datetime.strptime(
date, "%Y-%m-%d %H:%M").replace(tzinfo=UTC)
except ValueError:
# If regular parsing fails, try to extract date cmpnts
match = re.search(
r"(\d{4}-\d{2}-\d{2})\s+(\d{2}:\d{2})", date)
if match:
date_part, time_part = match.groups()
parsed_date = datetime.strptime(
f"{date_part} {time_part}", "%Y-%m-%d %H:%M"
).replace(tzinfo=UTC)
else:
# If all else fails, just use the original string
self.query_one("#date", Label).update(date)
return
# Convert to local timezone
local_date = parsed_date.astimezone()
# Format for display
formatted_date = local_date.strftime("%a %b %d %H:%M (%Z)")
self.query_one("#date", Label).update(formatted_date)
except Exception:
# If parsing fails, just display the original date
self.query_one("#date", Label).update(f"{date}")
else:
self.query_one("#date", Label).update("")
# def watch_cc(self, cc: str) -> None:
# """Watch the cc field for changes."""
# self.query_one("#cc").update(cc)

View File

@@ -0,0 +1,258 @@
"""Custom widget for rendering envelope list items with configurable display."""
from datetime import datetime
from typing import Any, Dict, Optional
from textual.app import ComposeResult
from textual.containers import Horizontal, Vertical
from textual.widgets import Label, Static
from src.mail.config import EnvelopeDisplayConfig, get_config
class EnvelopeListItem(Static):
"""A widget for rendering a single envelope in the list.
Supports configurable layout:
- 2-line mode: sender/date on line 1, subject on line 2
- 3-line mode: adds a preview line
Displays read/unread status with NerdFont icons.
"""
DEFAULT_CSS = """
EnvelopeListItem {
height: auto;
width: 1fr;
padding: 0;
}
EnvelopeListItem .envelope-row-1 {
height: 1;
width: 1fr;
}
EnvelopeListItem .envelope-row-2 {
height: 1;
width: 1fr;
}
EnvelopeListItem .envelope-row-3 {
height: 1;
width: 1fr;
}
EnvelopeListItem .status-icon {
width: 2;
padding: 0 1 0 0;
}
EnvelopeListItem .checkbox {
width: 2;
padding: 0 1 0 0;
}
EnvelopeListItem .sender-name {
width: 1fr;
color: $text-muted;
}
EnvelopeListItem .message-datetime {
width: auto;
padding: 0 1;
color: $text-disabled;
}
EnvelopeListItem .email-subject {
width: 1fr;
padding: 0 3;
color: $text-muted;
}
EnvelopeListItem .email-preview {
width: 1fr;
padding: 0 3;
color: $text-muted;
}
EnvelopeListItem.unread .sender-name {
text-style: bold;
color: $text;
}
EnvelopeListItem.unread .message-datetime {
color: $text-muted;
}
EnvelopeListItem.unread .email-subject {
text-style: bold;
color: $text;
}
"""
def __init__(
self,
envelope: Dict[str, Any],
config: Optional[EnvelopeDisplayConfig] = None,
is_selected: bool = False,
**kwargs,
):
"""Initialize the envelope list item.
Args:
envelope: The envelope data dictionary from himalaya
config: Display configuration (uses global config if not provided)
is_selected: Whether this item is currently selected
"""
super().__init__(**kwargs)
self.envelope = envelope
self.config = config or get_config().envelope_display
self._is_selected = is_selected
# Parse envelope data
self._parse_envelope()
def _parse_envelope(self) -> None:
"""Parse envelope data into display-ready values."""
# Get sender info
from_data = self.envelope.get("from", {})
self.sender_name = from_data.get("name") or from_data.get("addr", "Unknown")
if not self.sender_name:
self.sender_name = from_data.get("addr", "Unknown")
# Truncate sender name if needed
max_len = self.config.max_sender_length
if len(self.sender_name) > max_len:
self.sender_name = self.sender_name[: max_len - 1] + "\u2026" # ellipsis
# Get subject
self.subject = str(self.envelope.get("subject", "")).strip() or "(No subject)"
# Parse date
self.formatted_datetime = self._format_datetime()
# Get read/unread status (himalaya uses "flags" field)
flags = self.envelope.get("flags", [])
self.is_read = "Seen" in flags if isinstance(flags, list) else False
self.is_flagged = "Flagged" in flags if isinstance(flags, list) else False
self.has_attachment = (
"Attachments" in flags if isinstance(flags, list) else False
)
# Message ID for selection tracking
self.message_id = int(self.envelope.get("id", 0))
def _format_datetime(self) -> str:
"""Format the message date/time according to config."""
date_str = self.envelope.get("date", "")
if not date_str:
return ""
try:
# Parse ISO format date
if "Z" in date_str:
date_str = date_str.replace("Z", "+00:00")
dt = datetime.fromisoformat(date_str)
# Convert to local timezone
dt = dt.astimezone()
parts = []
if self.config.show_date:
parts.append(dt.strftime(self.config.date_format))
if self.config.show_time:
parts.append(dt.strftime(self.config.time_format))
return " ".join(parts)
except (ValueError, TypeError):
return "Invalid Date"
def compose(self) -> ComposeResult:
"""Compose the widget layout."""
# Determine status icon
if self.is_read:
status_icon = self.config.icon_read
status_class = "status-icon read"
else:
status_icon = self.config.icon_unread
status_class = "status-icon unread"
# Add flagged/attachment indicators
extra_icons = ""
if self.is_flagged:
extra_icons += f" {self.config.icon_flagged}"
if self.has_attachment:
extra_icons += f" {self.config.icon_attachment}"
# Build the layout based on config.lines
with Vertical(classes="envelope-content"):
# Row 1: Status icon, checkbox, sender, datetime
with Horizontal(classes="envelope-row-1"):
yield Label(status_icon + extra_icons, classes=status_class)
if self.config.show_checkbox:
checkbox_char = "\uf4a7" if self._is_selected else "\ue640"
yield Label(checkbox_char, classes="checkbox")
yield Label(self.sender_name, classes="sender-name", markup=False)
yield Label(self.formatted_datetime, classes="message-datetime")
# Row 2: Subject
with Horizontal(classes="envelope-row-2"):
yield Label(self.subject, classes="email-subject", markup=False)
# Row 3: Preview (only in 3-line mode with preview enabled)
if self.config.lines == 3 and self.config.show_preview:
preview = self.envelope.get("preview", "")[:60]
if preview:
with Horizontal(classes="envelope-row-3"):
yield Label(preview, classes="email-preview", markup=False)
def on_mount(self) -> None:
"""Set up classes on mount."""
if not self.is_read:
self.add_class("unread")
if self._is_selected:
self.add_class("selected")
def set_selected(self, selected: bool) -> None:
"""Update the selection state."""
self._is_selected = selected
if selected:
self.add_class("selected")
else:
self.remove_class("selected")
# Update checkbox display
if self.config.show_checkbox:
try:
checkbox = self.query_one(".checkbox", Label)
checkbox.update("\uf4a7" if selected else "\ue640")
except Exception:
pass # Widget may not be mounted yet
class GroupHeader(Static):
"""A header widget for grouping envelopes by date."""
DEFAULT_CSS = """
GroupHeader {
height: 1;
width: 1fr;
background: $surface;
color: $text-muted;
text-style: bold;
padding: 0 1;
}
"""
def __init__(self, label: str, **kwargs):
"""Initialize the group header.
Args:
label: The header label (e.g., "Today", "December 2025")
"""
super().__init__(**kwargs)
self.label = label
def compose(self) -> ComposeResult:
"""Compose just returns itself as a Static."""
yield Label(self.label, classes="group-header-label")

View File

@@ -0,0 +1,6 @@
# Initialize the widgets subpackage
from .ContentContainer import ContentContainer
from .EnvelopeHeader import EnvelopeHeader
from .EnvelopeListItem import EnvelopeListItem, GroupHeader
__all__ = ["ContentContainer", "EnvelopeHeader", "EnvelopeListItem", "GroupHeader"]