from .config import get_config, MailAppConfig from .message_store import MessageStore from .widgets.ContentContainer import ContentContainer from .widgets.EnvelopeListItem import EnvelopeListItem, GroupHeader from .screens.LinkPanel import LinkPanel from .screens.ConfirmDialog import ConfirmDialog from .screens.SearchPanel import SearchPanel from src.mail.screens.HelpScreen import HelpScreen from .actions.task import action_create_task from .actions.open import action_open from .actions.delete import delete_current from .actions.calendar_invite import ( action_accept_invite, action_decline_invite, action_tentative_invite, ) from src.services.taskwarrior import client as taskwarrior_client from src.services.himalaya import client as himalaya_client from src.utils.shared_config import get_theme_name from src.utils.ipc import IPCListener, IPCMessage from textual.containers import Container, ScrollableContainer, Vertical, Horizontal from textual.timer import Timer from textual.binding import Binding from textual.reactive import reactive, Reactive from textual.widgets import Footer, Static, Label, Markdown, ListView, ListItem from textual.screen import Screen from textual.logging import TextualHandler from textual.app import App, ComposeResult, SystemCommand, RenderResult from textual.worker import Worker from textual import work import sys import os from datetime import UTC, datetime import logging from typing import Iterable, Optional, List, Dict, Any, Generator, Tuple # Add the parent directory to the system path to resolve relative imports sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # Import our new API modules # Updated imports with correct relative paths logging.basicConfig( level="NOTSET", handlers=[TextualHandler()], ) class StatusTitle(Static): total_messages: Reactive[int] = reactive(0) current_message_index: Reactive[int] = reactive(0) current_message_id: Reactive[int] = reactive(1) folder: Reactive[str] = reactive("INBOX") def render(self) -> RenderResult: return f"{self.folder} | ID: {self.current_message_id} | [b]{self.current_message_index}[/b]/{self.total_messages}" class EmailViewerApp(App): """A simple email viewer app using the Himalaya CLI.""" CSS_PATH = "email_viewer.tcss" title = "Maildir GTD Reader" current_message_id: Reactive[int] = reactive(0) current_message_index: Reactive[int] = reactive(0) highlighted_message_index: Reactive[int] = reactive(0) folder = reactive("INBOX") current_account: Reactive[str] = reactive("") # Empty string = default account header_expanded = reactive(False) reload_needed = reactive(True) message_store = MessageStore() oldest_id: Reactive[int] = reactive(0) newest_id: Reactive[int] = reactive(0) msg_worker: Worker | None = None total_messages: Reactive[int] = reactive(0) status_title = reactive("Message View") sort_order_ascending: Reactive[bool] = reactive(True) selected_messages: Reactive[set[int]] = reactive(set()) main_content_visible: Reactive[bool] = reactive(True) search_query: Reactive[str] = reactive("") # Current search filter search_mode: Reactive[bool] = reactive(False) # True when showing search results _cached_envelopes: List[Dict[str, Any]] = [] # Cached envelopes before search _cached_metadata: Dict[int, Dict[str, Any]] = {} # Cached metadata before search def get_system_commands(self, screen: Screen) -> Iterable[SystemCommand]: yield from super().get_system_commands(screen) yield SystemCommand("Next Message", "Navigate to Next ID", self.action_next) yield SystemCommand( "Previous Message", "Navigate to Previous ID", self.action_previous ) yield SystemCommand( "Delete Message", "Delete the current message", self.action_delete ) yield SystemCommand( "Archive Message", "Archive the current message", self.action_archive ) yield SystemCommand( "Open Message", "Open a specific message by ID", self.action_open ) yield SystemCommand( "Create Task", "Create a task using the task CLI", self.action_create_task ) yield SystemCommand( "Oldest Message", "Show the oldest message", self.action_oldest ) yield SystemCommand( "Newest Message", "Show the newest message", self.action_newest ) yield SystemCommand("Reload", "Reload the message list", self.fetch_envelopes) BINDINGS = [ Binding("j", "next", "Next message"), Binding("k", "previous", "Previous message"), Binding("#", "delete", "Delete message"), Binding("e", "archive", "Archive message"), Binding("o", "open", "Open message", show=False), Binding("q", "quit", "Quit application"), Binding("h", "toggle_header", "Toggle Envelope Header"), Binding("t", "create_task", "Create Task"), Binding("l", "open_links", "Show Links"), Binding("r", "reload", "Reload message list"), Binding("%", "reload", "Reload message list", show=False), Binding("1", "focus_1", "Focus Accounts Panel"), Binding("2", "focus_2", "Focus Folders Panel"), Binding("3", "focus_3", "Focus Envelopes Panel"), Binding("4", "focus_4", "Focus Main Content"), Binding("w", "toggle_main_content", "Toggle Message View Window"), Binding("m", "toggle_mode", "Toggle Markdown/HTML"), ] BINDINGS.extend( [ Binding("pagedown", "scroll_page_down", "Scroll page down"), Binding("b", "scroll_page_up", "Scroll page up"), Binding("s", "toggle_sort_order", "Toggle Sort Order"), Binding("x", "toggle_selection", "Toggle selection", show=False), Binding("space", "toggle_selection", "Toggle selection"), Binding("escape", "clear_selection", "Clear selection"), Binding("/", "search", "Search"), Binding("u", "toggle_read", "Toggle read/unread"), Binding("A", "accept_invite", "Accept invite"), Binding("D", "decline_invite", "Decline invite"), Binding("T", "tentative_invite", "Tentative"), Binding("?", "show_help", "Show Help"), ] ) def compose(self) -> ComposeResult: yield SearchPanel(id="search_panel") yield Horizontal( Vertical( ListView( ListItem(Label("All emails...")), id="envelopes_list", classes="list_view", initial_index=0, ), ListView(id="accounts_list", classes="list_view"), ListView(id="folders_list", classes="list_view"), id="sidebar", ), ContentContainer(id="main_content"), id="outer-wrapper", ) yield Footer() async def on_mount(self) -> None: self.alert_timer: Timer | None = None # Timer to throttle alerts self.theme = get_theme_name() self.title = "LUK Mail" self.query_one("#main_content").border_title = self.status_title sort_indicator = "↑" if self.sort_order_ascending else "↓" self.query_one("#envelopes_list").border_title = f"1️⃣ Emails {sort_indicator}" self.query_one("#accounts_list").border_title = "2️⃣ Accounts" self.query_one("#folders_list").border_title = "3️⃣ Folders" # Start IPC listener for refresh notifications from sync daemon self._ipc_listener = IPCListener("mail", self._on_ipc_message) self._ipc_listener.start() self.fetch_accounts() self.fetch_folders() worker = self.fetch_envelopes() await worker.wait() self.query_one("#envelopes_list").focus() self.action_oldest() def _on_ipc_message(self, message: IPCMessage) -> None: """Handle IPC messages from sync daemon.""" if message.event == "refresh": # Schedule a reload on the main thread self.call_from_thread(self.fetch_envelopes) def compute_status_title(self): metadata = self.message_store.get_metadata(self.current_message_id) message_date = metadata["date"] if metadata else "N/A" return f"✉️ Message ID: {self.current_message_id} | Date: {message_date}" def watch_status_title(self, old_status_title: str, new_status_title: str) -> None: self.query_one(ContentContainer).border_title = new_status_title def watch_sort_order_ascending(self, old_value: bool, new_value: bool) -> None: """Update the border title of the envelopes list when the sort order changes.""" sort_indicator = "↑" if new_value else "↓" self.query_one("#envelopes_list").border_title = f"1️⃣ Emails {sort_indicator}" def watch_current_message_index(self, old_index: int, new_index: int) -> None: if new_index < 0: new_index = 0 self.current_message_index = new_index if new_index > self.total_messages: new_index = self.total_messages self.current_message_index = new_index self._update_list_view_subtitle() self.query_one("#envelopes_list", ListView).index = new_index def watch_selected_messages( self, old_messages: set[int], new_messages: set[int] ) -> None: self._update_list_view_subtitle() def _update_list_view_subtitle(self) -> None: subtitle = f"[b]{self.current_message_index}[/b]/{self.total_messages}" if self.selected_messages: subtitle = f"(✓{len(self.selected_messages)}) {subtitle}" self.query_one("#envelopes_list").border_subtitle = subtitle def watch_total_messages(self, old_total: int, new_total: int) -> None: """Called when the total_messages reactive attribute changes.""" self._update_list_view_subtitle() def watch_reload_needed( self, old_reload_needed: bool, new_reload_needed: bool ) -> None: logging.info(f"Reload needed: {new_reload_needed}") if not old_reload_needed and new_reload_needed: self.fetch_envelopes() def watch_current_message_id( self, old_message_id: int, new_message_id: int ) -> None: """Called when the current message ID changes.""" logging.info( f"Current message ID changed from {old_message_id} to {new_message_id}" ) if new_message_id == old_message_id: return # If the main content view is not visible, don't load the message if not self.main_content_visible: return # Cancel any existing message loading worker if self.msg_worker: self.msg_worker.cancel() # Start a new worker to load the message content self.msg_worker = self.load_message_content(new_message_id) @work(exclusive=True) async def load_message_content(self, message_id: int) -> None: """Worker to load message content asynchronously.""" content_container = self.query_one(ContentContainer) folder = self.folder if self.folder else None account = self.current_account if self.current_account else None # Get envelope data for notification compression metadata = self.message_store.get_metadata(message_id) envelope = None if metadata: index = metadata.get("index", 0) # Check bounds before accessing envelopes list if 0 <= index < len(self.message_store.envelopes): envelope = self.message_store.envelopes[index] content_container.display_content( message_id, folder=folder, account=account, envelope=envelope ) if metadata: message_date = metadata["date"] if self.current_message_index != metadata["index"]: self.current_message_index = metadata["index"] list_view = self.query_one("#envelopes_list", ListView) if list_view.index != metadata["index"]: list_view.index = metadata["index"] # Mark message as read await self._mark_message_as_read(message_id, metadata["index"]) else: logging.warning(f"Message ID {message_id} not found in metadata.") async def _mark_message_as_read(self, message_id: int, index: int) -> None: """Mark a message as read and update the UI.""" # Skip if message_id is invalid or index is out of bounds if message_id <= 0: return if index < 0 or index >= len(self.message_store.envelopes): return # Check if already read envelope_data = self.message_store.envelopes[index] if envelope_data and envelope_data.get("type") != "header": flags = envelope_data.get("flags", []) if "Seen" in flags: return # Already read # Mark as read via himalaya with current folder/account folder = self.folder if self.folder else None account = self.current_account if self.current_account else None _, success = await himalaya_client.mark_as_read( message_id, folder=folder, account=account ) if success: # Update the envelope flags in the store if envelope_data: if "flags" not in envelope_data: envelope_data["flags"] = [] if "Seen" not in envelope_data["flags"]: envelope_data["flags"].append("Seen") # Update the visual state of the list item try: list_view = self.query_one("#envelopes_list", ListView) list_item = list_view.children[index] envelope_widget = list_item.query_one(EnvelopeListItem) envelope_widget.is_read = True envelope_widget.remove_class("unread") except Exception: pass # Widget may not exist def on_list_view_selected(self, event: ListView.Selected) -> None: """Called when an item in the list view is selected.""" if event.list_view.index is None: return # Handle folder selection if event.list_view.id == "folders_list": self._handle_folder_selected(event) return # Handle account selection if event.list_view.id == "accounts_list": self._handle_account_selected(event) return # Only handle selection from the envelopes list if event.list_view.id != "envelopes_list": return selected_index = event.list_view.index # Check bounds before accessing if selected_index < 0 or selected_index >= len(self.message_store.envelopes): return current_item = self.message_store.envelopes[selected_index] if current_item is None or current_item.get("type") == "header": return message_id = int(current_item["id"]) self.current_message_id = message_id self.current_message_index = selected_index # Focus the main content panel after selecting a message self.action_focus_4() def _handle_folder_selected(self, event: ListView.Selected) -> None: """Handle folder selection from the folders list.""" try: list_item = event.item label = list_item.query_one(Label) folder_text = str(label.renderable).strip() # Extract folder name (remove count suffix like " [dim](10)[/dim]") # The format is "FolderName [dim](count)[/dim]" or just "FolderName" import re folder_name = re.sub(r"\s*\[dim\]\(\d+\)\[/dim\]$", "", folder_text) if folder_name and folder_name != self.folder: self.folder = folder_name self.show_status(f"Switching to folder: {folder_name}") # Clear current state and reload self.current_message_id = 0 self.current_message_index = 0 self.selected_messages.clear() self.search_query = "" # Clear search when switching folders # Directly fetch instead of relying on reload_needed watcher self.fetch_envelopes() except Exception as e: logging.error(f"Error selecting folder: {e}") def _handle_account_selected(self, event: ListView.Selected) -> None: """Handle account selection from the accounts list.""" try: list_item = event.item label = list_item.query_one(Label) account_name = str(label.renderable).strip() if account_name and account_name != self.current_account: self.current_account = account_name self.folder = "INBOX" # Reset to INBOX when switching accounts self.show_status(f"Switching to account: {account_name}") # Clear current state and reload self.current_message_id = 0 self.current_message_index = 0 self.selected_messages.clear() self.search_query = "" # Clear search when switching accounts # Refresh folders for new account self.fetch_folders() # Directly fetch instead of relying on reload_needed watcher self.fetch_envelopes() except Exception as e: logging.error(f"Error selecting account: {e}") def on_list_view_highlighted(self, event: ListView.Highlighted) -> None: """Called when an item in the list view is highlighted (e.g., via arrow keys).""" if event.list_view.index is None: return # Only handle highlights from the envelopes list if event.list_view.id != "envelopes_list": return highlighted_index = event.list_view.index # Check bounds before accessing if highlighted_index < 0 or highlighted_index >= len( self.message_store.envelopes ): return current_item = self.message_store.envelopes[highlighted_index] if current_item is None or current_item.get("type") == "header": return # message_id = int(current_item["id"]) # self.current_message_id = message_id self.highlighted_message_index = highlighted_index def on_key(self, event) -> None: """Handle key events to intercept space on the envelopes list.""" # Intercept space key when envelopes list is focused to prevent default select behavior if event.key == "space": focused = self.focused if focused and focused.id == "envelopes_list": event.prevent_default() event.stop() self.action_toggle_selection() @work(exclusive=False) async def fetch_envelopes(self) -> None: msglist = self.query_one("#envelopes_list", ListView) try: msglist.loading = True # Use the Himalaya client to fetch envelopes with current folder/account folder = self.folder if self.folder else None account = self.current_account if self.current_account else None envelopes, success = await himalaya_client.list_envelopes( folder=folder, account=account ) if success: self.reload_needed = False # Ensure envelopes is a list, even if it's None from the client envelopes_list = envelopes if envelopes is not None else [] self.message_store.load(envelopes_list, self.sort_order_ascending) self.total_messages = self.message_store.total_messages # Use the centralized refresh method to update the ListView self._populate_list_view() # Restore the current index msglist.index = self.current_message_index else: self.show_status("Failed to fetch envelopes.", "error") except Exception as e: self.show_status(f"Error fetching message list: {e}", "error") finally: msglist.loading = False @work(exclusive=False) async def fetch_accounts(self) -> None: accounts_list = self.query_one("#accounts_list", ListView) try: accounts_list.loading = True # Use the Himalaya client to fetch accounts accounts, success = await himalaya_client.list_accounts() if success and accounts: for account in accounts: item = ListItem( Label( str(account["name"]).strip(), classes="account_name", markup=False, ) ) accounts_list.append(item) else: self.show_status("Failed to fetch accounts.", "error") except Exception as e: self.show_status(f"Error fetching account list: {e}", "error") finally: accounts_list.loading = False @work(exclusive=False) async def fetch_folders(self) -> None: folders_list = self.query_one("#folders_list", ListView) folders_list.clear() # Store folder names for count updates folder_names = ["INBOX"] # Use the Himalaya client to fetch folders for current account account = self.current_account if self.current_account else None folders_list.append( ListItem(Label("INBOX", classes="folder_name", markup=True)) ) try: folders_list.loading = True folders, success = await himalaya_client.list_folders(account=account) if success and folders: for folder in folders: folder_name = str(folder["name"]).strip() # Skip INBOX since we already added it if folder_name.upper() == "INBOX": continue folder_names.append(folder_name) item = ListItem( Label( folder_name, classes="folder_name", markup=True, ) ) folders_list.append(item) else: self.show_status("Failed to fetch folders.", "error") except Exception as e: self.show_status(f"Error fetching folder list: {e}", "error") finally: folders_list.loading = False # Fetch counts in background and update labels self._update_folder_counts(folder_names, account) @work(exclusive=False) async def _update_folder_counts( self, folder_names: List[str], account: str | None ) -> None: """Fetch and display message counts for folders.""" import asyncio folders_list = self.query_one("#folders_list", ListView) async def get_count_for_folder(folder_name: str, index: int): count, success = await himalaya_client.get_folder_count( folder_name, account ) if success and index < len(folders_list.children): try: list_item = folders_list.children[index] label = list_item.query_one(Label) label.update(f"{folder_name} [dim]({count})[/dim]") except Exception: pass # Widget may have been removed # Fetch counts in parallel tasks = [get_count_for_folder(name, i) for i, name in enumerate(folder_names)] await asyncio.gather(*tasks) def _populate_list_view(self) -> None: """Populate the ListView with new items using the new EnvelopeListItem widget.""" envelopes_list = self.query_one("#envelopes_list", ListView) envelopes_list.clear() config = get_config() for item in self.message_store.envelopes: if item and item.get("type") == "header": # Use the new GroupHeader widget for date groupings envelopes_list.append(ListItem(GroupHeader(label=item["label"]))) elif item: # Use the new EnvelopeListItem widget message_id = int(item.get("id", 0)) is_selected = message_id in self.selected_messages envelope_widget = EnvelopeListItem( envelope=item, config=config.envelope_display, is_selected=is_selected, ) envelopes_list.append(ListItem(envelope_widget)) self.refresh_list_view_items() # Initial refresh of item states def refresh_list_view_items(self) -> None: """Update the visual state of existing ListItems without clearing the list.""" envelopes_list = self.query_one("#envelopes_list", ListView) for i, list_item in enumerate(envelopes_list.children): if isinstance(list_item, ListItem): # Bounds check - ListView and message_store may be out of sync during transitions if i >= len(self.message_store.envelopes): break item_data = self.message_store.envelopes[i] if item_data and item_data.get("type") != "header": message_id = int(item_data["id"]) is_selected = message_id in self.selected_messages or False list_item.set_class(is_selected, "selection") # Try to update the EnvelopeListItem's selection state try: envelope_widget = list_item.query_one(EnvelopeListItem) envelope_widget.set_selected(is_selected) except Exception: pass # Widget may not exist or be of old type def show_message(self, message_id: int, new_index=None) -> None: if new_index: self.current_message_index = new_index self.action_focus_4() self.current_message_id = message_id def show_status(self, message: str, severity: str = "information") -> None: """Display a status message using the built-in notify function.""" self.notify( message, title="Status", severity=severity, timeout=2.6, markup=True ) async def action_toggle_sort_order(self) -> None: """Toggle the sort order of the envelope list.""" self.sort_order_ascending = not self.sort_order_ascending worker = self.fetch_envelopes() await worker.wait() if self.sort_order_ascending: self.action_oldest() else: self.action_newest() async def action_toggle_mode(self) -> None: """Toggle of content mode between plaintext and markdown.""" content_container = self.query_one(ContentContainer) await content_container.action_toggle_mode() async def action_show_help(self) -> None: """Show help screen with keyboard shortcuts.""" help_screen = HelpScreen(list(self.BINDINGS)) self.push_screen(help_screen) def action_next(self) -> None: if not self.current_message_index >= 0: return next_id, next_idx = self.message_store.find_next_valid_id( self.current_message_index ) if next_id is not None and next_idx is not None: self.current_message_id = next_id self.current_message_index = next_idx def action_previous(self) -> None: if not self.current_message_index >= 0: return prev_id, prev_idx = self.message_store.find_prev_valid_id( self.current_message_index ) if prev_id is not None and prev_idx is not None: self.current_message_id = prev_id self.current_message_index = prev_idx async def action_delete(self) -> None: """Delete the current or selected messages.""" if self.selected_messages: # --- Multi-message delete: show confirmation --- count = len(self.selected_messages) def do_delete(confirmed: bool) -> None: if confirmed: self.run_worker(self._delete_selected_messages()) self.push_screen( ConfirmDialog( title="Delete Messages", message=f"Delete {count} selected message{'s' if count > 1 else ''}?", confirm_label="Delete", cancel_label="Cancel", ), do_delete, ) else: # --- Single message delete (no confirmation needed) --- worker = delete_current(self) await worker.wait() async def _delete_selected_messages(self) -> None: """Delete all selected messages.""" message_ids_to_delete = list(self.selected_messages) next_id_to_select = None if message_ids_to_delete: highest_deleted_id = max(message_ids_to_delete) metadata = self.message_store.get_metadata(highest_deleted_id) if metadata: next_id, _ = self.message_store.find_next_valid_id(metadata["index"]) if next_id is None: next_id, _ = self.message_store.find_prev_valid_id( metadata["index"] ) next_id_to_select = next_id # Delete each message with current folder/account folder = self.folder if self.folder else None account = self.current_account if self.current_account else None success_count = 0 for mid in message_ids_to_delete: message, success = await himalaya_client.delete_message( mid, folder=folder, account=account ) if success: success_count += 1 else: self.show_status(f"Failed to delete message {mid}: {message}", "error") if success_count > 0: self.show_status( f"Deleted {success_count} message{'s' if success_count > 1 else ''}" ) self.selected_messages.clear() self.refresh_list_view_items() # Refresh the envelope list worker = self.fetch_envelopes() await worker.wait() # After refresh, select the next message if next_id_to_select: new_metadata = self.message_store.get_metadata(next_id_to_select) if new_metadata: self.current_message_id = next_id_to_select else: self.action_oldest() else: self.action_oldest() async def action_archive(self) -> None: """Archive the current or selected messages and update UI consistently.""" if self.selected_messages: # --- Multi-message archive: show confirmation --- count = len(self.selected_messages) def do_archive(confirmed: bool) -> None: if confirmed: self.run_worker(self._archive_selected_messages()) self.push_screen( ConfirmDialog( title="Archive Messages", message=f"Archive {count} selected message{'s' if count > 1 else ''}?", confirm_label="Archive", cancel_label="Cancel", ), do_archive, ) else: # --- Single message archive (no confirmation needed) --- await self._archive_single_message() async def _archive_selected_messages(self) -> None: """Archive all selected messages.""" message_ids_to_archive = list(self.selected_messages) next_id_to_select = None if message_ids_to_archive: highest_archived_id = max(message_ids_to_archive) metadata = self.message_store.get_metadata(highest_archived_id) if metadata: next_id, _ = self.message_store.find_next_valid_id(metadata["index"]) if next_id is None: next_id, _ = self.message_store.find_prev_valid_id( metadata["index"] ) next_id_to_select = next_id # Archive messages with current folder/account folder = self.folder if self.folder else None account = self.current_account if self.current_account else None message, success = await himalaya_client.archive_messages( [str(mid) for mid in message_ids_to_archive], folder=folder, account=account, ) if success: self.show_status( message or f"Archived {len(message_ids_to_archive)} messages" ) self.selected_messages.clear() self.refresh_list_view_items() else: self.show_status(f"Failed to archive messages: {message}", "error") return # Refresh the envelope list worker = self.fetch_envelopes() await worker.wait() # After refresh, select the next message if next_id_to_select: new_metadata = self.message_store.get_metadata(next_id_to_select) if new_metadata: self.current_message_id = next_id_to_select else: self.action_oldest() else: self.action_oldest() async def _archive_single_message(self) -> None: """Archive the current single message.""" if not self.current_message_id: self.show_status("No message selected to archive.", "error") return current_id = self.current_message_id current_idx = self.current_message_index next_id, _ = self.message_store.find_next_valid_id(current_idx) if next_id is None: next_id, _ = self.message_store.find_prev_valid_id(current_idx) next_id_to_select = next_id # Archive with current folder/account folder = self.folder if self.folder else None account = self.current_account if self.current_account else None message, success = await himalaya_client.archive_messages( [str(current_id)], folder=folder, account=account ) if success: self.show_status(message or "Archived") else: self.show_status( f"Failed to archive message {current_id}: {message}", "error" ) return # Refresh the envelope list worker = self.fetch_envelopes() await worker.wait() # After refresh, select the next message if next_id_to_select: new_metadata = self.message_store.get_metadata(next_id_to_select) if new_metadata: self.current_message_id = next_id_to_select else: self.action_oldest() else: self.action_oldest() def action_open(self) -> None: action_open(self) def action_create_task(self) -> None: action_create_task(self) def action_accept_invite(self) -> None: """Accept the calendar invite from the current email.""" action_accept_invite(self) def action_decline_invite(self) -> None: """Decline the calendar invite from the current email.""" action_decline_invite(self) def action_tentative_invite(self) -> None: """Tentatively accept the calendar invite from the current email.""" action_tentative_invite(self) def action_open_links(self) -> None: """Open the link panel showing links from the current message.""" content_container = self.query_one(ContentContainer) links = content_container.get_links() self.push_screen(LinkPanel(links)) def action_scroll_down(self) -> None: """Scroll the main content down.""" self.query_one("#content_scroll").scroll_down() def action_scroll_up(self) -> None: """Scroll the main content up.""" self.query_one("#content_scroll").scroll_up() def action_scroll_page_down(self) -> None: """Scroll the main content down by a page.""" self.query_one("#content_scroll").scroll_page_down() def action_scroll_page_up(self) -> None: """Scroll the main content up by a page.""" self.query_one("#content_scroll").scroll_page_up() def action_toggle_header(self) -> None: """Toggle between compressed and full envelope headers.""" content_container = self.query_one("#main_content", ContentContainer) if hasattr(content_container, "header") and content_container.header: content_container.header.toggle_full_headers() # Provide visual feedback if content_container.header.show_full_headers: self.notify("Showing full headers", timeout=1) else: self.notify("Showing compressed headers", timeout=1) def action_toggle_main_content(self) -> None: """Toggle the visibility of the main content pane.""" self.main_content_visible = not self.main_content_visible def watch_main_content_visible(self, visible: bool) -> None: """Called when main_content_visible changes.""" main_content = self.query_one("#main_content") accounts_list = self.query_one("#accounts_list") folders_list = self.query_one("#folders_list") main_content.display = visible accounts_list.display = visible folders_list.display = visible self.query_one("#envelopes_list").focus() def action_quit(self) -> None: # Stop IPC listener before exiting if hasattr(self, "_ipc_listener"): self._ipc_listener.stop() self.exit() def action_toggle_selection(self) -> None: """Toggle selection for the current message.""" current_item_data = self.message_store.envelopes[self.highlighted_message_index] if current_item_data and current_item_data.get("type") != "header": message_id = int(current_item_data["id"]) envelopes_list = self.query_one("#envelopes_list", ListView) current_list_item = envelopes_list.children[self.highlighted_message_index] # Toggle selection state if message_id in self.selected_messages: self.selected_messages.remove(message_id) is_selected = False else: self.selected_messages.add(message_id) is_selected = True # Update the EnvelopeListItem widget try: envelope_widget = current_list_item.query_one(EnvelopeListItem) envelope_widget.set_selected(is_selected) except Exception: # Fallback for old-style widgets try: checkbox_label = current_list_item.query_one(".checkbox", Label) if is_selected: checkbox_label.add_class("x-list") checkbox_label.update("\uf4a7") else: checkbox_label.remove_class("x-list") checkbox_label.update("\ue640") except Exception: pass self._update_list_view_subtitle() def action_clear_selection(self) -> None: """Clear all selected messages or focus search input if in search mode.""" # If in search mode, focus the search input instead of exiting if self.search_mode: search_panel = self.query_one("#search_panel", SearchPanel) search_panel.focus_input() return if self.selected_messages: self.selected_messages.clear() self.refresh_list_view_items() # Refresh all items to uncheck checkboxes self._update_list_view_subtitle() async def action_toggle_read(self) -> None: """Toggle read/unread status for the current or selected messages.""" folder = self.folder if self.folder else None account = self.current_account if self.current_account else None if self.selected_messages: # Toggle multiple selected messages for message_id in self.selected_messages: await self._toggle_message_read_status(message_id, folder, account) self.show_status( f"Toggled read status for {len(self.selected_messages)} messages" ) self.selected_messages.clear() else: # Toggle current message if self.current_message_id: await self._toggle_message_read_status( self.current_message_id, folder, account ) # Refresh the list to show updated read status await self.fetch_envelopes().wait() async def _toggle_message_read_status( self, message_id: int, folder: str | None, account: str | None ) -> None: """Toggle read status for a single message.""" # Find the message in the store to check current status metadata = self.message_store.get_metadata(message_id) if not metadata: return index = metadata.get("index", -1) if index < 0 or index >= len(self.message_store.envelopes): return envelope_data = self.message_store.envelopes[index] if not envelope_data or envelope_data.get("type") == "header": return flags = envelope_data.get("flags", []) is_read = "Seen" in flags if is_read: # Mark as unread result, success = await himalaya_client.mark_as_unread( message_id, folder=folder, account=account ) if success: if "Seen" in envelope_data.get("flags", []): envelope_data["flags"].remove("Seen") self.show_status(f"Marked message {message_id} as unread") self._update_envelope_read_state(index, is_read=False) else: # Mark as read result, success = await himalaya_client.mark_as_read( message_id, folder=folder, account=account ) if success: if "flags" not in envelope_data: envelope_data["flags"] = [] if "Seen" not in envelope_data["flags"]: envelope_data["flags"].append("Seen") self.show_status(f"Marked message {message_id} as read") self._update_envelope_read_state(index, is_read=True) def _update_envelope_read_state(self, index: int, is_read: bool) -> None: """Update the visual state of an envelope in the list.""" try: list_view = self.query_one("#envelopes_list", ListView) list_item = list_view.children[index] envelope_widget = list_item.query_one(EnvelopeListItem) envelope_widget.is_read = is_read if is_read: envelope_widget.remove_class("unread") else: envelope_widget.add_class("unread") except Exception: pass # Widget may not exist def action_oldest(self) -> None: self.fetch_envelopes() if self.reload_needed else None self.show_message(self.message_store.get_oldest_id()) def action_newest(self) -> None: self.fetch_envelopes() if self.reload_needed else None self.show_message(self.message_store.get_newest_id()) def action_reload(self) -> None: """Reload the message list.""" self.fetch_envelopes() self.show_status("Reloading messages...") def action_search(self) -> None: """Open the search panel.""" search_panel = self.query_one("#search_panel", SearchPanel) if not search_panel.is_visible: # Cache current envelopes before searching self._cached_envelopes = list(self.message_store.envelopes) self._cached_metadata = dict(self.message_store.metadata_by_id) self.search_mode = True search_panel.show(self.search_query) def on_search_panel_search_requested( self, event: SearchPanel.SearchRequested ) -> None: """Handle live search request from search panel.""" self._perform_search(event.query, focus_results=False) def on_search_panel_search_confirmed( self, event: SearchPanel.SearchConfirmed ) -> None: """Handle confirmed search (Enter key) - search and focus results.""" self._perform_search(event.query, focus_results=True) def on_search_panel_search_cancelled( self, event: SearchPanel.SearchCancelled ) -> None: """Handle search cancellation - restore previous envelope list.""" self.search_mode = False self.search_query = "" # Restore cached envelopes and metadata if self._cached_envelopes: self.message_store.envelopes = self._cached_envelopes self._cached_envelopes = [] if self._cached_metadata: self.message_store.metadata_by_id = self._cached_metadata self._cached_metadata = {} self._populate_list_view() # Restore envelope list title sort_indicator = "↑" if self.sort_order_ascending else "↓" self.query_one("#envelopes_list").border_title = f"1️⃣ Emails {sort_indicator}" self._update_list_view_subtitle() self.query_one("#envelopes_list").focus() @work(exclusive=True) async def _perform_search(self, query: str, focus_results: bool = False) -> None: """Perform search using Himalaya and display results in envelope list.""" search_panel = self.query_one("#search_panel", SearchPanel) search_panel.update_status(-1, searching=True) folder = self.folder if self.folder else None account = self.current_account if self.current_account else None results, success = await himalaya_client.search_envelopes( query, folder=folder, account=account ) if not success: search_panel.update_status(0, searching=False) self.show_status("Search failed", "error") return # Update search panel status search_panel.update_status(len(results), searching=False) if not results: # Clear the envelope list and show "no results" self._display_search_results([], query) return self.search_query = query self.search_mode = True self._display_search_results(results, query) if focus_results: # Focus the main content and select first result if results: first_id = int(results[0].get("id", 0)) if first_id: self.current_message_id = first_id self.action_focus_4() def _display_search_results( self, results: List[Dict[str, Any]], query: str ) -> None: """Display search results in the envelope list with a header.""" envelopes_list = self.query_one("#envelopes_list", ListView) envelopes_list.clear() config = get_config() # Build search header label if results: header_label = f"Search: '{query}' ({len(results)} result{'s' if len(results) != 1 else ''})" else: header_label = f"Search: '{query}' - No results found" if not results: # Clear the message viewer when no results envelopes_list.append(ListItem(GroupHeader(label=header_label))) content_container = self.query_one(ContentContainer) content_container.clear_content() self.message_store.envelopes = [] self.message_store.metadata_by_id = {} self.total_messages = 0 self.current_message_id = 0 return # Create a temporary message store for search results # We need to include the search header in the envelopes so indices match search_store = MessageStore() # Manually build envelopes list with search header first # so that ListView indices match message_store.envelopes indices grouped_envelopes = [{"type": "header", "label": header_label}] # Sort results by date sorted_results = sorted( results, key=lambda x: x.get("date", ""), reverse=not self.sort_order_ascending, ) # Group by month and build metadata months: Dict[str, bool] = {} for envelope in sorted_results: if "id" not in envelope: continue # Extract date and determine month group date_str = envelope.get("date", "") try: date = datetime.fromisoformat(date_str.replace("Z", "+00:00")) month_key = date.strftime("%B %Y") except (ValueError, TypeError): month_key = "Unknown Date" # Add month header if this is a new month if month_key not in months: months[month_key] = True grouped_envelopes.append({"type": "header", "label": month_key}) # Add the envelope grouped_envelopes.append(envelope) # Store metadata for quick access (index matches grouped_envelopes) envelope_id = int(envelope["id"]) search_store.metadata_by_id[envelope_id] = { "id": envelope_id, "subject": envelope.get("subject", ""), "from": envelope.get("from", {}), "to": envelope.get("to", {}), "cc": envelope.get("cc", {}), "date": date_str, "index": len(grouped_envelopes) - 1, } search_store.envelopes = grouped_envelopes search_store.total_messages = len(search_store.metadata_by_id) # Store for navigation (replace main store) self.message_store.envelopes = search_store.envelopes self.message_store.metadata_by_id = search_store.metadata_by_id self.total_messages = len(results) # Build ListView to match envelopes list exactly for item in self.message_store.envelopes: if item and item.get("type") == "header": envelopes_list.append(ListItem(GroupHeader(label=item["label"]))) elif item: message_id = int(item.get("id", 0)) is_selected = message_id in self.selected_messages envelope_widget = EnvelopeListItem( envelope=item, config=config.envelope_display, is_selected=is_selected, ) envelopes_list.append(ListItem(envelope_widget)) # Update border title to show search mode sort_indicator = "↑" if self.sort_order_ascending else "↓" self.query_one( "#envelopes_list" ).border_title = f"Search Results {sort_indicator}" # Select first result if available if len(envelopes_list.children) > 1: envelopes_list.index = 1 # Skip header def action_focus_1(self) -> None: self.query_one("#envelopes_list").focus() def action_focus_2(self) -> None: self.query_one("#accounts_list").focus() def action_focus_3(self) -> None: self.query_one("#folders_list").focus() def action_focus_4(self) -> None: self.query_one("#main_content").focus() if __name__ == "__main__": app = EmailViewerApp() app.run() def launch_email_viewer(): app = EmailViewerApp() app.run()