"""Textual TUI for browsing mail through the Himalaya CLI."""

import asyncio
import json
import logging
import os
import re
import sys
from datetime import datetime
from typing import Iterable

# Add the parent directory to the system path to resolve relative imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from textual import work
from textual.worker import Worker
from textual.app import App, ComposeResult, SystemCommand, RenderResult
from textual.logging import TextualHandler
from textual.screen import Screen
from textual.widgets import Footer, Static, Label, Markdown, ListView, ListItem
from textual.reactive import reactive, Reactive
from textual.binding import Binding
from textual.timer import Timer
from textual.containers import ScrollableContainer, Vertical, Horizontal

from actions.archive import archive_current
from actions.delete import delete_current
from actions.open import action_open
from actions.task import action_create_task
from widgets.EnvelopeHeader import EnvelopeHeader
from widgets.ContentContainer import ContentContainer
from maildir_gtd.utils import group_envelopes_by_date

logging.basicConfig(
    level="NOTSET",
    handlers=[TextualHandler()],
)

# Shell commands used to query Himalaya; each requests JSON output (`-o json`).
_ENVELOPE_LIST_COMMAND = "himalaya envelope list -o json -s 9999"
_ACCOUNT_LIST_COMMAND = "himalaya account list -o json"
_FOLDER_LIST_COMMAND = "himalaya folder list -o json"


def _envelope_id(envelope) -> int | None:
    """Return the integer id of *envelope*, or None when it carries no id.

    Rows without an "id" key (such as the header rows this module detects via
    ``type == "header"``) and ``None`` entries yield None.

    Raises:
        ValueError: if the id is present but not convertible with ``int()``.
    """
    if not envelope:
        return None
    raw_id = envelope.get("id")
    if raw_id is None or raw_id == "":
        return None
    # NOTE(review): ids may arrive as strings or ints depending on the CLI
    # output; normalising here keeps them comparable with the int keys of
    # ``message_metadata`` either way.
    return int(raw_id)


def _is_header(envelope) -> bool:
    """Return True for list rows that are not selectable messages."""
    return envelope is None or envelope.get("type") == "header"


def _format_message_date(raw_date) -> str:
    """Format an envelope date for the header display.

    A trailing UTC offset such as ``+02:00`` is stripped, then the remainder
    is parsed as ``%Y-%m-%d %H:%M`` and rendered as ``%a %b %d %H:%M``.
    If the value does not match that format it is returned unchanged, so an
    unexpected date never aborts displaying the message.
    """
    text = raw_date or ""
    without_offset = re.sub(r"[\+\-]\d\d:\d\d", "", text)
    try:
        parsed = datetime.strptime(without_offset, "%Y-%m-%d %H:%M")
    except ValueError:
        return text
    return parsed.strftime("%a %b %d %H:%M")


def _address_of(party) -> str:
    """Return the "addr" field of an address mapping, or "" when unavailable."""
    if isinstance(party, dict):
        return party.get("addr", "")
    return ""


class StatusTitle(Static):
    """One-line status widget showing folder, message id and position."""

    total_messages: Reactive[int] = reactive(0)
    current_message_index: Reactive[int] = reactive(0)
    current_message_id: Reactive[int] = reactive(1)
    folder: Reactive[str] = reactive("INBOX")

    def render(self) -> RenderResult:
        """Render the status line from the current reactive values."""
        return f"{self.folder} | ID: {self.current_message_id} | [b]{self.current_message_index}[/b]/{self.total_messages}"


class EmailViewerApp(App):
    """A simple email viewer app using the Himalaya CLI."""

    CSS_PATH = "email_viewer.tcss"
    title = "Maildir GTD Reader"

    current_message_id: Reactive[int] = reactive(0)
    current_message_index: Reactive[int] = reactive(0)
    folder = reactive("INBOX")
    header_expanded = reactive(False)
    reload_needed = reactive(True)
    all_envelopes = reactive([])  # Keep as list for compatibility with ListView
    envelope_map = {}  # Maps message IDs to envelope data
    envelope_index_map = {}  # Maps indices in the list to envelope IDs
    oldest_id: Reactive[int] = reactive(0)
    newest_id: Reactive[int] = reactive(0)
    msg_worker: Worker | None = None
    message_metadata: dict[int, dict] = {}
    message_body_cache: dict[int, str] = {}
    total_messages: Reactive[int] = reactive(0)
    status_title = reactive("Message View")
    sort_order_ascending: Reactive[bool] = reactive(True)
    valid_envelopes = reactive([])

    # NOTE(review): "toggle_header" has no matching action_toggle_header in
    # this file, so the "h" key currently does nothing here -- confirm where
    # that action is meant to live.
    BINDINGS = [
        Binding("j", "next", "Next message"),
        Binding("k", "previous", "Previous message"),
        Binding("#", "delete", "Delete message"),
        Binding("e", "archive", "Archive message"),
        Binding("o", "open", "Open message", show=False),
        Binding("q", "quit", "Quit application"),
        Binding("h", "toggle_header", "Toggle Envelope Header"),
        Binding("t", "create_task", "Create Task"),
        Binding("%", "reload", "Reload message list"),
        Binding("1", "focus_1", "Focus Accounts Panel"),
        Binding("2", "focus_2", "Focus Folders Panel"),
        Binding("3", "focus_3", "Focus Envelopes Panel"),
        Binding("m", "toggle_mode", "Toggle Content Mode"),
        Binding("space", "scroll_page_down", "Scroll page down"),
        Binding("b", "scroll_page_up", "Scroll page up"),
        Binding("s", "toggle_sort_order", "Toggle Sort Order"),
    ]

    def get_system_commands(self, screen: Screen) -> Iterable[SystemCommand]:
        """Yield the default system commands followed by the mail commands."""
        yield from super().get_system_commands(screen)
        yield SystemCommand("Next Message", "Navigate to Next ID", self.action_next)
        yield SystemCommand(
            "Previous Message", "Navigate to Previous ID", self.action_previous
        )
        yield SystemCommand(
            "Delete Message", "Delete the current message", self.action_delete
        )
        yield SystemCommand(
            "Archive Message", "Archive the current message", self.action_archive
        )
        yield SystemCommand(
            "Open Message", "Open a specific message by ID", self.action_open
        )
        yield SystemCommand(
            "Create Task", "Create a task using the task CLI", self.action_create_task
        )
        yield SystemCommand(
            "Oldest Message", "Show the oldest message", self.action_oldest
        )
        yield SystemCommand(
            "Newest Message", "Show the newest message", self.action_newest
        )
        yield SystemCommand("Reload", "Reload the message list", self.fetch_envelopes)

    def compose(self) -> ComposeResult:
        """Build the layout: a sidebar of three lists beside the content pane."""
        yield Horizontal(
            Vertical(
                ListView(
                    ListItem(Label("All emails...")),
                    id="envelopes_list",
                    classes="list_view",
                    initial_index=0,
                ),
                ListView(id="accounts_list", classes="list_view"),
                ListView(id="folders_list", classes="list_view"),
                id="sidebar",
            ),
            ContentContainer(id="main_content"),
            id="outer-wrapper",
        )
        yield Footer()

    @staticmethod
    def _envelopes_border_title(ascending: bool) -> str:
        """Return the envelope list title with an arrow for the sort order."""
        sort_indicator = "\u2191" if ascending else "\u2193"
        # The backslash escapes "[" so it is not parsed as markup.
        return f"\\[1] Emails {sort_indicator}"

    async def on_mount(self) -> None:
        """Set titles, start the three fetch workers and show the oldest message."""
        self.alert_timer: Timer | None = None  # Timer to throttle alerts
        self.theme = "monokai"
        self.title = "MaildirGTD"
        self.query_one("#main_content").border_title = self.status_title
        self.query_one("#envelopes_list").border_title = self._envelopes_border_title(
            self.sort_order_ascending
        )
        self.query_one("#accounts_list").border_title = "\\[2] Accounts"
        self.query_one("#folders_list").border_title = "\\[3] Folders"

        self.fetch_accounts()
        self.fetch_folders()
        # Only the envelope fetch is awaited: the initial message selection
        # below needs its result.
        worker = self.fetch_envelopes()
        await worker.wait()
        self.query_one("#envelopes_list").focus()
        self.action_oldest()

    def compute_status_title(self) -> str:
        """Compute the content pane title from the current message id."""
        return f"✉️ Message ID: {self.current_message_id} "

    def compute_valid_envelopes(self) -> list:
        """Return the entries of ``all_envelopes`` that carry a truthy id.

        A list (not a generator) is returned so that emptiness checks on the
        result behave as expected.
        """
        return [
            envelope
            for envelope in self.all_envelopes
            if envelope and envelope.get("id")
        ]

    def watch_status_title(self, old_status_title: str, new_status_title: str) -> None:
        """Mirror the status title onto the content container's border."""
        self.query_one(ContentContainer).border_title = new_status_title

    def watch_sort_order_ascending(self, old_value: bool, new_value: bool) -> None:
        """Update the border title of the envelopes list when the sort order changes."""
        self.query_one("#envelopes_list").border_title = self._envelopes_border_title(
            new_value
        )

    def watch_current_message_index(self, old_index: int, new_index: int) -> None:
        """Clamp the index to the list bounds and sync the envelope ListView."""
        # The index addresses ``all_envelopes`` (which includes header rows),
        # so the upper bound is its last valid position, not the message count.
        last_index = max(len(self.all_envelopes) - 1, 0)
        if new_index < 0:
            new_index = 0
            self.current_message_index = new_index
        if new_index > last_index:
            new_index = last_index
            self.current_message_index = new_index
        envelopes_list = self.query_one("#envelopes_list")
        envelopes_list.border_subtitle = f"[b]{new_index}[/b]/{self.total_messages}"
        envelopes_list.index = new_index

    def _envelopes_by_date(self) -> list:
        """Return the valid envelopes sorted by date, oldest first."""
        return sorted(
            self.valid_envelopes,
            key=lambda envelope: envelope.get("date") or "",
        )

    def compute_newest_id(self) -> int:
        """Return the id of the most recent message, or 0 when there is none.

        The result does not depend on the display sort order.
        """
        dated = self._envelopes_by_date()
        if not dated:
            return 0
        return _envelope_id(dated[-1]) or 0

    def compute_oldest_id(self) -> int:
        """Return the id of the oldest message, or 0 when there is none.

        The result does not depend on the display sort order.
        """
        dated = self._envelopes_by_date()
        if not dated:
            return 0
        return _envelope_id(dated[0]) or 0

    def watch_reload_needed(
        self, old_reload_needed: bool, new_reload_needed: bool
    ) -> None:
        """Start an envelope fetch when the flag flips from False to True."""
        logging.info(f"Reload needed: {new_reload_needed}")
        if not old_reload_needed and new_reload_needed:
            self.fetch_envelopes()

    def watch_current_message_id(
        self, old_message_id: int, new_message_id: int
    ) -> None:
        """Called when the current message ID changes.

        Cancels any tracked message worker, asks the content container to
        display the new message, and, when metadata for the id is known,
        updates the header fields and keeps the sidebar selection in sync.
        """
        logging.info(
            f"Current message ID changed from {old_message_id} to {new_message_id}"
        )
        if new_message_id == old_message_id:
            return
        if self.msg_worker:
            self.msg_worker.cancel()

        logging.info(f"new_message_id: {new_message_id}, type: {type(new_message_id)}")
        logging.info(f"message_metadata keys: {list(self.message_metadata.keys())}")

        content_container = self.query_one(ContentContainer)
        content_container.display_content(new_message_id)

        metadata = self.message_metadata.get(new_message_id)
        if metadata is None:
            logging.warning(f"Message ID {new_message_id} not found in metadata.")
            return

        message_date = _format_message_date(metadata.get("date"))

        # Only update the current_message_index if it differs, so the sidebar
        # selection does not get out of sync with the displayed content.
        if self.current_message_index != metadata["index"]:
            self.current_message_index = metadata["index"]

        content_container.update_header(
            subject=str(metadata.get("subject") or "").strip(),
            from_=_address_of(metadata.get("from")),
            to=_address_of(metadata.get("to")),
            date=message_date,
            cc=_address_of(metadata.get("cc")),
        )

        # Make sure the ListView index matches the current message index.
        list_view = self.query_one("#envelopes_list")
        if list_view.index != metadata["index"]:
            list_view.index = metadata["index"]

    def on_list_view_selected(self, event: ListView.Selected) -> None:
        """Called when an item in the list view is selected."""
        index = event.list_view.index
        # The list can hold rows with no backing envelope (e.g. the
        # placeholder row before the first fetch), so validate first.
        if index is None or not 0 <= index < len(self.all_envelopes):
            return
        current_item = self.all_envelopes[index]
        if _is_header(current_item):
            return
        message_id = _envelope_id(current_item)
        if message_id is None:
            return

        self.current_message_id = message_id
        # Update the index directly from the ListView selection so the sidebar
        # selection and displayed content stay in sync.
        self.current_message_index = index

    async def _run_json_command(self, command: str):
        """Run *command* in a shell and return its stdout parsed as JSON.

        Returns None when the command exits with a non-zero status (the
        status and stderr are logged). Decoding and JSON errors propagate to
        the caller.
        """
        process = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        output = stdout.decode()
        logging.info(f"stdout: {output[0:50]}")
        if process.returncode != 0:
            logging.error(
                f"{command!r} exited with code {process.returncode}: "
                f"{stderr.decode(errors='replace').strip()}"
            )
            return None
        return json.loads(output)

    @work(exclusive=False)
    async def fetch_envelopes(self) -> None:
        """Fetch the envelope list and rebuild the sidebar and lookup tables."""
        msglist = self.query_one("#envelopes_list")
        try:
            msglist.loading = True
            envelopes = await self._run_json_command(_ENVELOPE_LIST_COMMAND)
            if not envelopes:
                self.show_status("Failed to fetch any envelopes.", "error")
                return

            self.reload_needed = False
            self.total_messages = len(envelopes)
            msglist.clear()

            envelopes = sorted(
                envelopes,
                key=lambda x: x["date"],
                reverse=not self.sort_order_ascending,
            )
            grouped_envelopes = group_envelopes_by_date(envelopes)
            self.all_envelopes = grouped_envelopes

            # Build the id/index lookup tables in one pass over the grouped
            # list; rows without an id (headers) are skipped.
            envelope_map = {}
            envelope_index_map = {}
            message_metadata = {}
            for index, envelope in enumerate(grouped_envelopes):
                envelope_id = _envelope_id(envelope)
                if envelope_id is None:
                    continue
                envelope_map[envelope_id] = envelope
                envelope_index_map[index] = envelope_id
                message_metadata[envelope_id] = {
                    "subject": envelope.get("subject", ""),
                    "from": envelope.get("from", {}),
                    "to": envelope.get("to", {}),
                    "date": envelope.get("date", ""),
                    "cc": envelope.get("cc", {}),
                    "index": index,  # Position in the grouped list
                }
            self.envelope_map = envelope_map
            self.envelope_index_map = envelope_index_map
            self.message_metadata = message_metadata

            # Add one ListView row per grouped entry, headers included, so
            # ListView indices line up with ``all_envelopes`` indices.
            for item in grouped_envelopes:
                if item.get("type") == "header":
                    label = Label(
                        item["label"],
                        classes="group_header",
                        markup=False,
                    )
                else:
                    label = Label(
                        str(item.get("subject", "")).strip(),
                        classes="email_subject",
                        markup=False,
                    )
                msglist.append(ListItem(label))
            msglist.index = self.current_message_index
        except Exception as e:
            # Worker boundary: surface any failure to the user instead of
            # letting the worker die silently.
            self.show_status(f"Error fetching message list: {e}", "error")
        finally:
            msglist.loading = False

    @work(exclusive=False)
    async def fetch_accounts(self) -> None:
        """Fetch the account list and append one row per account name."""
        accounts_list = self.query_one("#accounts_list")
        try:
            accounts_list.loading = True
            accounts = await self._run_json_command(_ACCOUNT_LIST_COMMAND)
            for account in accounts or []:
                accounts_list.append(
                    ListItem(
                        Label(
                            str(account["name"]).strip(),
                            classes="account_name",
                            markup=False,
                        )
                    )
                )
        except Exception as e:
            self.show_status(f"Error fetching account list: {e}", "error")
        finally:
            accounts_list.loading = False

    @work(exclusive=False)
    async def fetch_folders(self) -> None:
        """Reset the folder list to "INBOX", then append the fetched folders."""
        folders_list = self.query_one("#folders_list")
        folders_list.clear()
        folders_list.append(
            ListItem(Label("INBOX", classes="folder_name", markup=False))
        )
        try:
            folders_list.loading = True
            folders = await self._run_json_command(_FOLDER_LIST_COMMAND)
            for folder in folders or []:
                folders_list.append(
                    ListItem(
                        Label(
                            str(folder["name"]).strip(),
                            classes="folder_name",
                            markup=False,
                        )
                    )
                )
        except Exception as e:
            self.show_status(f"Error fetching folder list: {e}", "error")
        finally:
            folders_list.loading = False

    def show_message(self, message_id: int, new_index=None) -> None:
        """Make *message_id* current, optionally setting the list index first.

        Args:
            message_id: id of the message to display.
            new_index: position in the envelope list; ``None`` leaves the
                index unchanged (0 is a valid index and is applied).
        """
        if new_index is not None:
            self.current_message_index = new_index
        self.current_message_id = message_id

    def show_status(self, message: str, severity: str = "information") -> None:
        """Display a status message using the built-in notify function."""
        self.notify(
            message, title="Status", severity=severity, timeout=2.6, markup=True
        )

    async def action_toggle_sort_order(self) -> None:
        """Toggle the sort order of the envelope list."""
        self.sort_order_ascending = not self.sort_order_ascending
        worker = self.fetch_envelopes()
        await worker.wait()

        # Call action_oldest or action_newest based on the new sort order.
        if self.sort_order_ascending:
            self.action_oldest()
        else:
            self.action_newest()

    async def action_toggle_mode(self) -> None:
        """Toggle the content mode between plaintext and markdown."""
        content_container = self.query_one(ContentContainer)
        await content_container.toggle_mode()

    def _navigate(self, step: int) -> None:
        """Show the nearest message *step* rows away, wrapping at the ends.

        Header rows and rows without an id are skipped. With an empty list
        nothing is shown. A pending reload is started afterwards either way.
        """
        if not self.current_message_index >= 0:
            return
        envelopes = self.all_envelopes
        count = len(envelopes)
        if count:
            index = self.current_message_index
            # At most one full lap, so a list made only of headers terminates.
            for _ in range(count):
                index = (index + step) % count
                candidate = envelopes[index]
                if _is_header(candidate):
                    continue
                message_id = _envelope_id(candidate)
                if message_id is not None:
                    self.show_message(message_id, index)
                    break
        if self.reload_needed:
            self.fetch_envelopes()

    def action_next(self) -> None:
        """Show the next message in the list, wrapping to the beginning."""
        self._navigate(1)

    def action_previous(self) -> None:
        """Show the previous message in the list, wrapping to the end."""
        self._navigate(-1)

    def _forget_current_message(self) -> None:
        """Drop the current message from every in-memory structure."""
        current_id = self.current_message_id
        self.all_envelopes = [
            item
            for item in self.all_envelopes
            if item and _envelope_id(item) != current_id
        ]
        self.envelope_map.pop(current_id, None)
        self.envelope_index_map = {
            index: envelope_id
            for index, envelope_id in self.envelope_index_map.items()
            if envelope_id != current_id
        }
        self.message_metadata = {
            k: v for k, v in self.message_metadata.items() if k != current_id
        }
        self.message_body_cache = {
            k: v for k, v in self.message_body_cache.items() if k != current_id
        }
        self.total_messages = len(self.message_metadata)

    def _show_message_after_removal(self) -> None:
        """Show the message now occupying the current index, or reload.

        If that row is a header, the following row is used, or the preceding
        one when the header is the last row. When no row is available the
        envelope list is re-fetched.
        """
        try:
            newmsg = self.all_envelopes[self.current_message_index]
            if newmsg.get("type") == "header":
                if self.current_message_index + 1 < len(self.all_envelopes):
                    newmsg = self.all_envelopes[self.current_message_index + 1]
                else:
                    # At the end of the list: fall back to the previous row.
                    newmsg = self.all_envelopes[self.current_message_index - 1]
                    self.current_message_index -= 1
            message_id = _envelope_id(newmsg)
            if message_id is not None:
                self.show_message(message_id)
        except (IndexError, KeyError):
            # No more messages: reload. Flipping the flag from False already
            # starts a fetch via its watcher, so fetch explicitly only when
            # the flag was set beforehand -- this avoids two racing fetches.
            if self.reload_needed:
                self.fetch_envelopes()
            else:
                self.reload_needed = True

    async def action_delete(self) -> None:
        """Delete the current message and show the one that takes its place."""
        self._forget_current_message()
        # NOTE(review): unlike archive_current below, the result of
        # delete_current is not awaited -- confirm whether it returns a worker.
        delete_current(self)
        self._show_message_after_removal()

    async def action_archive(self) -> None:
        """Archive the current message and show the one that takes its place."""
        self._forget_current_message()
        worker = archive_current(self)
        await worker.wait()
        self._show_message_after_removal()

    def action_open(self) -> None:
        """Delegate to the project's open action."""
        action_open(self)

    def action_create_task(self) -> None:
        """Delegate to the project's create-task action."""
        action_create_task(self)

    def action_reload(self) -> None:
        """Reload the message list (handler for the "reload" key binding)."""
        self.fetch_envelopes()

    def action_scroll_down(self) -> None:
        """Scroll the main content down."""
        self.query_one("#main_content").scroll_down()

    def action_scroll_up(self) -> None:
        """Scroll the main content up."""
        self.query_one("#main_content").scroll_up()

    def action_scroll_page_down(self) -> None:
        """Scroll the main content down by a page."""
        self.query_one("#main_content").scroll_page_down()

    def action_scroll_page_up(self) -> None:
        """Scroll the main content up by a page."""
        self.query_one("#main_content").scroll_page_up()

    def action_quit(self) -> None:
        """Quit the application."""
        self.exit()

    def action_oldest(self) -> None:
        """Show the oldest message, starting a reload first if one is pending."""
        if self.reload_needed:
            self.fetch_envelopes()
        self.show_message(self.oldest_id)

    def action_newest(self) -> None:
        """Show the newest message, starting a reload first if one is pending."""
        if self.reload_needed:
            self.fetch_envelopes()
        self.show_message(self.newest_id)

    def action_focus_1(self) -> None:
        """Focus the envelopes list."""
        self.query_one("#envelopes_list").focus()

    def action_focus_2(self) -> None:
        """Focus the accounts list."""
        self.query_one("#accounts_list").focus()

    def action_focus_3(self) -> None:
        """Focus the folders list."""
        self.query_one("#folders_list").focus()


if __name__ == "__main__":
    app = EmailViewerApp()
    app.run()