Files
luk/src/mail/app.py

1212 lines
48 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from .config import get_config, MailAppConfig
from .message_store import MessageStore
from .widgets.ContentContainer import ContentContainer
from .widgets.EnvelopeListItem import EnvelopeListItem, GroupHeader
from .screens.LinkPanel import LinkPanel
from .screens.ConfirmDialog import ConfirmDialog
from .screens.SearchPanel import SearchPanel
from .actions.task import action_create_task
from .actions.open import action_open
from .actions.delete import delete_current
from src.services.taskwarrior import client as taskwarrior_client
from src.services.himalaya import client as himalaya_client
from src.utils.shared_config import get_theme_name
from src.utils.ipc import IPCListener, IPCMessage
from textual.containers import Container, ScrollableContainer, Vertical, Horizontal
from textual.timer import Timer
from textual.binding import Binding
from textual.reactive import reactive, Reactive
from textual.widgets import Footer, Static, Label, Markdown, ListView, ListItem
from textual.screen import Screen
from textual.logging import TextualHandler
from textual.app import App, ComposeResult, SystemCommand, RenderResult
from textual.worker import Worker
from textual import work
import sys
import os
from datetime import UTC, datetime
import logging
from typing import Iterable, Optional, List, Dict, Any, Generator, Tuple
# Append the directory two levels above this file to sys.path.
# NOTE(review): this runs after the `src.*` imports at the top of the file,
# so it cannot help resolve them — confirm whether it is still needed.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Configure the root logger with no level filtering ("NOTSET") and send every
# record through Textual's logging handler.
logging.basicConfig(
    level="NOTSET",
    handlers=[TextualHandler()],
)
class StatusTitle(Static):
    """One-line status widget: folder, message ID and position in the list."""

    total_messages: Reactive[int] = reactive(0)
    current_message_index: Reactive[int] = reactive(0)
    current_message_id: Reactive[int] = reactive(1)
    folder: Reactive[str] = reactive("INBOX")

    def render(self) -> RenderResult:
        """Format the current reactive values as a single markup string."""
        position = f"[b]{self.current_message_index}[/b]/{self.total_messages}"
        return f"{self.folder} | ID: {self.current_message_id} | {position}"
class EmailViewerApp(App):
    """A simple email viewer app using the Himalaya CLI."""

    CSS_PATH = "email_viewer.tcss"
    title = "Maildir GTD Reader"  # overwritten with "LUK Mail" in on_mount

    # --- Navigation state -------------------------------------------------
    current_message_id: Reactive[int] = reactive(0)  # ID of the message being shown
    current_message_index: Reactive[int] = reactive(0)  # its row in the envelope list
    highlighted_message_index: Reactive[int] = reactive(0)  # row under the list cursor
    folder = reactive("INBOX")
    current_account: Reactive[str] = reactive("")  # Empty string = default account
    header_expanded = reactive(False)
    reload_needed = reactive(True)  # watcher fetches envelopes on False -> True
    # NOTE(review): created once at class-definition time, so every instance
    # of the app shares this store — confirm that is intended.
    message_store = MessageStore()
    oldest_id: Reactive[int] = reactive(0)
    newest_id: Reactive[int] = reactive(0)
    msg_worker: Worker | None = None  # worker currently loading message content
    total_messages: Reactive[int] = reactive(0)
    status_title = reactive("Message View")
    sort_order_ascending: Reactive[bool] = reactive(True)
    # NOTE(review): the default set() is built once here; the code mutates it
    # in place (add/remove/clear) rather than reassigning it — confirm the
    # object is not shared between instances.
    selected_messages: Reactive[set[int]] = reactive(set())
    main_content_visible: Reactive[bool] = reactive(True)

    # --- Search state -----------------------------------------------------
    search_query: Reactive[str] = reactive("")  # Current search filter
    search_mode: Reactive[bool] = reactive(False)  # True when showing search results
    _cached_envelopes: List[Dict[str, Any]] = []  # Cached envelopes before search
    _cached_metadata: Dict[int, Dict[str, Any]] = {}  # Cached metadata before search
def get_system_commands(self, screen: Screen) -> Iterable[SystemCommand]:
    """Yield the inherited system commands followed by the mail-specific ones."""
    yield from super().get_system_commands(screen)
    # (title, help text, callback) for each mail command, in display order.
    mail_commands = (
        ("Next Message", "Navigate to Next ID", self.action_next),
        ("Previous Message", "Navigate to Previous ID", self.action_previous),
        ("Delete Message", "Delete the current message", self.action_delete),
        ("Archive Message", "Archive the current message", self.action_archive),
        ("Open Message", "Open a specific message by ID", self.action_open),
        ("Create Task", "Create a task using the task CLI", self.action_create_task),
        ("Oldest Message", "Show the oldest message", self.action_oldest),
        ("Newest Message", "Show the newest message", self.action_newest),
        ("Reload", "Reload the message list", self.fetch_envelopes),
    )
    for command_title, help_text, callback in mail_commands:
        yield SystemCommand(command_title, help_text, callback)
# Key bindings, declared as a single list (same entries, same order).
BINDINGS = [
    # Message navigation and actions
    Binding("j", "next", "Next message"),
    Binding("k", "previous", "Previous message"),
    Binding("#", "delete", "Delete message"),
    Binding("e", "archive", "Archive message"),
    Binding("o", "open", "Open message", show=False),
    Binding("q", "quit", "Quit application"),
    Binding("h", "toggle_header", "Toggle Envelope Header"),
    Binding("t", "create_task", "Create Task"),
    Binding("l", "open_links", "Show Links"),
    Binding("r", "reload", "Reload message list"),
    Binding("%", "reload", "Reload message list", show=False),
    # Panel focus
    Binding("1", "focus_1", "Focus Accounts Panel"),
    Binding("2", "focus_2", "Focus Folders Panel"),
    Binding("3", "focus_3", "Focus Envelopes Panel"),
    Binding("4", "focus_4", "Focus Main Content"),
    Binding("w", "toggle_main_content", "Toggle Message View Window"),
    # Scrolling, sorting, selection and search
    Binding("pagedown", "scroll_page_down", "Scroll page down"),
    Binding("b", "scroll_page_up", "Scroll page up"),
    Binding("s", "toggle_sort_order", "Toggle Sort Order"),
    Binding("x", "toggle_selection", "Toggle selection", show=False),
    Binding("space", "toggle_selection", "Toggle selection"),
    Binding("escape", "clear_selection", "Clear selection"),
    Binding("/", "search", "Search"),
    Binding("u", "toggle_read", "Toggle read/unread"),
]
def compose(self) -> ComposeResult:
    """Build the layout: search panel, sidebar with three lists, content pane, footer."""
    yield SearchPanel(id="search_panel")

    envelopes = ListView(
        ListItem(Label("All emails...")),
        id="envelopes_list",
        classes="list_view",
        initial_index=0,
    )
    accounts = ListView(id="accounts_list", classes="list_view")
    folders = ListView(id="folders_list", classes="list_view")
    sidebar = Vertical(envelopes, accounts, folders, id="sidebar")
    yield Horizontal(
        sidebar,
        ContentContainer(id="main_content"),
        id="outer-wrapper",
    )

    yield Footer()
async def on_mount(self) -> None:
    """Set theme and titles, start the IPC listener, then load the initial data.

    Waits for the first envelope fetch to finish before focusing the
    envelope list and jumping to the oldest message.
    """
    self.alert_timer: Timer | None = None  # Timer to throttle alerts
    self.theme = get_theme_name()
    self.title = "LUK Mail"
    self.query_one("#main_content").border_title = self.status_title
    # NOTE(review): both branches look like empty strings here; they may hold
    # invisible (e.g. private-use icon) characters — confirm before editing.
    sort_indicator = "" if self.sort_order_ascending else ""
    # NOTE(review): these titles number envelopes/accounts/folders 1/2/3, while
    # BINDINGS describes keys 1/2/3 as accounts/folders/envelopes — confirm.
    self.query_one("#envelopes_list").border_title = f"1⃣ Emails {sort_indicator}"
    self.query_one("#accounts_list").border_title = "2⃣ Accounts"
    self.query_one("#folders_list").border_title = "3⃣ Folders"
    # Start IPC listener for refresh notifications from sync daemon
    self._ipc_listener = IPCListener("mail", self._on_ipc_message)
    self._ipc_listener.start()
    # Accounts and folders load in the background; only the envelope fetch
    # is awaited because the steps below need the populated list.
    self.fetch_accounts()
    self.fetch_folders()
    worker = self.fetch_envelopes()
    await worker.wait()
    self.query_one("#envelopes_list").focus()
    self.action_oldest()
def _on_ipc_message(self, message: IPCMessage) -> None:
    """React to an IPC message; a "refresh" event triggers an envelope reload."""
    if message.event != "refresh":
        return
    # Hand the reload over to the app via call_from_thread.
    self.call_from_thread(self.fetch_envelopes)
def compute_status_title(self):
    """Return the status title text for the current message ID and its date.

    The date comes from the message store's metadata; "N/A" is used when the
    store has no metadata for the current ID.
    """
    metadata = self.message_store.get_metadata(self.current_message_id)
    if metadata:
        message_date = metadata["date"]
    else:
        message_date = "N/A"
    return f"✉️ Message ID: {self.current_message_id} | Date: {message_date}"
def watch_status_title(self, old_status_title: str, new_status_title: str) -> None:
    """Copy a changed status title onto the content container's border title."""
    container = self.query_one(ContentContainer)
    container.border_title = new_status_title
def watch_sort_order_ascending(self, old_value: bool, new_value: bool) -> None:
    """Update the border title of the envelopes list when the sort order changes."""
    # NOTE(review): both branches look like empty strings here; they may hold
    # invisible (e.g. private-use icon) characters — confirm before editing.
    sort_indicator = "" if new_value else ""
    self.query_one("#envelopes_list").border_title = f"1⃣ Emails {sort_indicator}"
def watch_current_message_index(self, old_index: int, new_index: int) -> None:
    """Clamp the new index, then sync the subtitle and the ListView cursor."""
    # Writing the clamped value back to the reactive re-enters this watcher
    # with the corrected index.
    if new_index < 0:
        new_index = 0
        self.current_message_index = new_index
    # NOTE(review): the upper bound is total_messages itself (not
    # total_messages - 1), and the list rows also include group headers —
    # confirm this limit matches the number of rows actually in the ListView.
    if new_index > self.total_messages:
        new_index = self.total_messages
        self.current_message_index = new_index
    self._update_list_view_subtitle()
    self.query_one("#envelopes_list", ListView).index = new_index
def watch_selected_messages(
    self, old_messages: set[int], new_messages: set[int]
) -> None:
    """Refresh the envelope list subtitle when the selection reactive changes."""
    self._update_list_view_subtitle()
def _update_list_view_subtitle(self) -> None:
    """Show "index/total" on the envelope list border, prefixed by the selection count."""
    position = f"[b]{self.current_message_index}[/b]/{self.total_messages}"
    chosen = self.selected_messages
    # The "(✓N) " prefix only appears while something is selected.
    prefix = f"(✓{len(chosen)}) " if chosen else ""
    self.query_one("#envelopes_list").border_subtitle = prefix + position
def watch_total_messages(self, old_total: int, new_total: int) -> None:
    """Refresh the envelope list subtitle when total_messages changes."""
    self._update_list_view_subtitle()
def watch_reload_needed(
    self, old_reload_needed: bool, new_reload_needed: bool
) -> None:
    """Log the flag and fetch envelopes when it flips from false to true."""
    logging.info(f"Reload needed: {new_reload_needed}")
    became_needed = new_reload_needed and not old_reload_needed
    if became_needed:
        self.fetch_envelopes()
def watch_current_message_id(
    self, old_message_id: int, new_message_id: int
) -> None:
    """Start loading the newly selected message, replacing any load in flight.

    Nothing is loaded when the ID did not actually change or while the main
    content pane is hidden.
    """
    logging.info(
        f"Current message ID changed from {old_message_id} to {new_message_id}"
    )
    if new_message_id == old_message_id or not self.main_content_visible:
        return
    # Drop the previous load before starting the next one.
    in_flight = self.msg_worker
    if in_flight:
        in_flight.cancel()
    self.msg_worker = self.load_message_content(new_message_id)
@work(exclusive=True)
async def load_message_content(self, message_id: int) -> None:
    """Worker: show a message, sync the list position and mark it as read.

    Args:
        message_id: ID of the message to display.

    When the message store has no metadata for ``message_id`` the content is
    still requested, but a warning is logged and nothing else is updated.
    """
    content_container = self.query_one(ContentContainer)
    # Empty folder/account strings mean "use the default", passed as None.
    content_container.display_content(
        message_id,
        folder=self.folder or None,
        account=self.current_account or None,
    )

    metadata = self.message_store.get_metadata(message_id)
    if not metadata:
        logging.warning(f"Message ID {message_id} not found in metadata.")
        return

    index = metadata["index"]
    # Only write when the value differs, to avoid redundant updates.
    if self.current_message_index != index:
        self.current_message_index = index
    list_view = self.query_one("#envelopes_list", ListView)
    if list_view.index != index:
        list_view.index = index
    # Mark message as read
    await self._mark_message_as_read(message_id, index)
async def _mark_message_as_read(self, message_id: int, index: int) -> None:
    """Mark one message as read via Himalaya and mirror that in the list row.

    Does nothing for non-positive IDs, out-of-range indices, or envelopes
    that already carry the "Seen" flag.
    """
    if message_id <= 0:
        return
    if not 0 <= index < len(self.message_store.envelopes):
        return

    entry = self.message_store.envelopes[index]
    if entry and entry.get("type") != "header":
        if "Seen" in entry.get("flags", []):
            return  # Already read

    # Empty folder/account strings are passed to the client as None.
    _, ok = await himalaya_client.mark_as_read(
        message_id,
        folder=self.folder or None,
        account=self.current_account or None,
    )
    if not ok:
        return

    # Record the flag on the stored envelope.
    if entry:
        entry.setdefault("flags", [])
        if "Seen" not in entry["flags"]:
            entry["flags"].append("Seen")

    # Best-effort update of the row widget.
    try:
        rows = self.query_one("#envelopes_list", ListView).children
        row_widget = rows[index].query_one(EnvelopeListItem)
        row_widget.is_read = True
        row_widget.remove_class("unread")
    except Exception:
        pass  # Widget may not exist
def on_list_view_selected(self, event: ListView.Selected) -> None:
    """Route a ListView selection to the folder, account or envelope handling."""
    source = event.list_view
    if source.index is None:
        return

    # Folder and account lists have dedicated handlers.
    sidebar_handlers = {
        "folders_list": self._handle_folder_selected,
        "accounts_list": self._handle_account_selected,
    }
    handler = sidebar_handlers.get(source.id)
    if handler is not None:
        handler(event)
        return
    if source.id != "envelopes_list":
        return

    row = source.index
    envelopes = self.message_store.envelopes
    if not 0 <= row < len(envelopes):
        return
    entry = envelopes[row]
    # Group-header rows (and empty slots) cannot be opened.
    if entry is None or entry.get("type") == "header":
        return

    self.current_message_id = int(entry["id"])
    self.current_message_index = row
    # Move focus to the main content panel once a message is chosen.
    self.action_focus_4()
def _handle_folder_selected(self, event: ListView.Selected) -> None:
    """Switch to the folder named by the selected item's label and reload.

    Any exception is logged rather than propagated.
    """
    try:
        name_label = event.item.query_one(Label)
        folder_name = str(name_label.renderable).strip()
        if not folder_name or folder_name == self.folder:
            return
        self.folder = folder_name
        self.show_status(f"Switching to folder: {folder_name}")
        # Reset navigation, selection and search state for the new folder.
        self.current_message_id = 0
        self.current_message_index = 0
        self.selected_messages.clear()
        self.search_query = ""
        # Fetch directly rather than going through the reload_needed watcher.
        self.fetch_envelopes()
    except Exception as e:
        logging.error(f"Error selecting folder: {e}")
def _handle_account_selected(self, event: ListView.Selected) -> None:
    """Switch to the account named by the selected item's label and reload.

    The folder is reset to "INBOX". Any exception is logged rather than
    propagated.
    """
    try:
        name_label = event.item.query_one(Label)
        account_name = str(name_label.renderable).strip()
        if not account_name or account_name == self.current_account:
            return
        self.current_account = account_name
        self.folder = "INBOX"
        self.show_status(f"Switching to account: {account_name}")
        # Reset navigation, selection and search state for the new account.
        self.current_message_id = 0
        self.current_message_index = 0
        self.selected_messages.clear()
        self.search_query = ""
        # The folder list depends on the account, so reload it first.
        self.fetch_folders()
        # Fetch directly rather than going through the reload_needed watcher.
        self.fetch_envelopes()
    except Exception as e:
        logging.error(f"Error selecting account: {e}")
def on_list_view_highlighted(self, event: ListView.Highlighted) -> None:
    """Remember which envelope row is highlighted.

    Only the envelopes list is tracked; header rows and out-of-range indices
    are ignored. The highlighted message is not opened here.
    """
    source = event.list_view
    if source.index is None or source.id != "envelopes_list":
        return

    row = source.index
    envelopes = self.message_store.envelopes
    if not 0 <= row < len(envelopes):
        return

    entry = envelopes[row]
    if entry is None or entry.get("type") == "header":
        return
    self.highlighted_message_index = row
def on_key(self, event) -> None:
    """Turn space on the focused envelopes list into a selection toggle.

    The event is stopped and its default prevented so the list does not
    also treat the key as "select".
    """
    if event.key != "space":
        return
    focused = self.focused
    if not focused or focused.id != "envelopes_list":
        return
    event.prevent_default()
    event.stop()
    self.action_toggle_selection()
@work(exclusive=False)
async def fetch_envelopes(self) -> None:
    """Worker: fetch envelopes for the current folder/account and rebuild the list.

    Failures are reported through show_status; the list's loading indicator
    is always cleared at the end.
    """
    msglist = self.query_one("#envelopes_list", ListView)
    try:
        msglist.loading = True
        # Empty folder/account strings are passed to the client as None.
        envelopes, success = await himalaya_client.list_envelopes(
            folder=self.folder or None,
            account=self.current_account or None,
        )
        if not success:
            self.show_status("Failed to fetch envelopes.", "error")
            return

        self.reload_needed = False
        # The client may hand back None instead of a list.
        if envelopes is None:
            envelopes = []
        self.message_store.load(envelopes, self.sort_order_ascending)
        self.total_messages = self.message_store.total_messages
        self._populate_list_view()
        # Put the cursor back where it was before the rebuild.
        msglist.index = self.current_message_index
    except Exception as e:
        self.show_status(f"Error fetching message list: {e}", "error")
    finally:
        msglist.loading = False
@work(exclusive=False)
async def fetch_accounts(self) -> None:
    """Worker: fetch the account list and append one row per account.

    Failures are reported through show_status; the loading indicator is
    always cleared at the end.
    """
    accounts_list = self.query_one("#accounts_list", ListView)
    try:
        accounts_list.loading = True
        accounts, success = await himalaya_client.list_accounts()
        if not (success and accounts):
            self.show_status("Failed to fetch accounts.", "error")
            return
        for account in accounts:
            name_label = Label(
                str(account["name"]).strip(),
                classes="account_name",
                markup=False,
            )
            accounts_list.append(ListItem(name_label))
    except Exception as e:
        self.show_status(f"Error fetching account list: {e}", "error")
    finally:
        accounts_list.loading = False
@work(exclusive=False)
async def fetch_folders(self) -> None:
    """Worker: rebuild the folder list for the current account.

    "INBOX" is always the first row; an INBOX entry returned by the client
    (any letter case) is skipped so it is not listed twice.
    """
    folders_list = self.query_one("#folders_list", ListView)
    folders_list.clear()
    inbox_row = ListItem(Label("INBOX", classes="folder_name", markup=False))
    folders_list.append(inbox_row)
    try:
        folders_list.loading = True
        # An empty account string is passed to the client as None.
        folders, success = await himalaya_client.list_folders(
            account=self.current_account or None
        )
        if not (success and folders):
            self.show_status("Failed to fetch folders.", "error")
            return
        for folder in folders:
            folder_name = str(folder["name"]).strip()
            if folder_name.upper() == "INBOX":
                continue
            name_label = Label(folder_name, classes="folder_name", markup=False)
            folders_list.append(ListItem(name_label))
    except Exception as e:
        self.show_status(f"Error fetching folder list: {e}", "error")
    finally:
        folders_list.loading = False
def _populate_list_view(self) -> None:
    """Rebuild the envelope ListView from the message store.

    Header entries become GroupHeader rows and all other non-empty entries
    become EnvelopeListItem rows; empty entries are skipped.
    """
    envelopes_list = self.query_one("#envelopes_list", ListView)
    envelopes_list.clear()
    config = get_config()

    for entry in self.message_store.envelopes:
        if not entry:
            continue
        if entry.get("type") == "header":
            row_widget = GroupHeader(label=entry["label"])
        else:
            message_id = int(entry.get("id", 0))
            row_widget = EnvelopeListItem(
                envelope=entry,
                config=config.envelope_display,
                is_selected=message_id in self.selected_messages,
            )
        envelopes_list.append(ListItem(row_widget))

    # Bring the selection styling of the new rows up to date.
    self.refresh_list_view_items()
def refresh_list_view_items(self) -> None:
    """Re-apply selection styling to the existing rows without rebuilding the list."""
    envelopes_list = self.query_one("#envelopes_list", ListView)
    # zip() stops at the shorter sequence, covering the case where the
    # ListView and the message store are briefly out of sync.
    pairs = zip(envelopes_list.children, self.message_store.envelopes)
    for list_item, entry in pairs:
        if not isinstance(list_item, ListItem):
            continue
        if not entry or entry.get("type") == "header":
            continue
        is_selected = int(entry["id"]) in self.selected_messages
        list_item.set_class(is_selected, "selection")
        try:
            list_item.query_one(EnvelopeListItem).set_selected(is_selected)
        except Exception:
            pass  # Widget may not exist or be of old type
def show_message(self, message_id: int, new_index=None) -> None:
    """Focus the content pane and make ``message_id`` the current message.

    Args:
        message_id: ID of the message to display.
        new_index: Optional list index to move to first. ``None`` (the
            default) leaves the current index untouched.
    """
    # Compare against None explicitly: index 0 is a valid position and must
    # not be treated as "no index given".
    if new_index is not None:
        self.current_message_index = new_index
    self.action_focus_4()
    self.current_message_id = message_id
def show_status(self, message: str, severity: str = "information") -> None:
    """Show ``message`` as a "Status" notification with the given severity."""
    options = {
        "title": "Status",
        "severity": severity,
        "timeout": 2.6,
        "markup": True,
    }
    self.notify(message, **options)
async def action_toggle_sort_order(self) -> None:
    """Flip the sort order, reload the envelopes and jump to the matching end.

    After the reload, ascending order shows the oldest message and
    descending order shows the newest.
    """
    self.sort_order_ascending = not self.sort_order_ascending
    await self.fetch_envelopes().wait()
    jump = self.action_oldest if self.sort_order_ascending else self.action_newest
    jump()
async def action_toggle_mode(self) -> None:
    """Delegate the content-mode toggle to the content container."""
    await self.query_one(ContentContainer).action_toggle_mode()
def action_next(self) -> None:
    """Move to the next valid message after the current index, if there is one."""
    position = self.current_message_index
    if position < 0:
        return
    next_id, next_idx = self.message_store.find_next_valid_id(position)
    if next_id is None or next_idx is None:
        return
    self.current_message_id = next_id
    self.current_message_index = next_idx
def action_previous(self) -> None:
    """Move to the previous valid message before the current index, if there is one."""
    position = self.current_message_index
    if position < 0:
        return
    prev_id, prev_idx = self.message_store.find_prev_valid_id(position)
    if prev_id is None or prev_idx is None:
        return
    self.current_message_id = prev_id
    self.current_message_index = prev_idx
async def action_delete(self) -> None:
    """Delete the current message, or the selected messages after confirmation."""
    if not self.selected_messages:
        # Single message: delete straight away and wait for the worker.
        worker = delete_current(self)
        await worker.wait()
        return

    # Several messages: ask first; deletion runs in a worker if confirmed.
    count = len(self.selected_messages)

    def on_answer(confirmed: bool) -> None:
        if confirmed:
            self.run_worker(self._delete_selected_messages())

    dialog = ConfirmDialog(
        title="Delete Messages",
        message=f"Delete {count} selected message{'s' if count > 1 else ''}?",
        confirm_label="Delete",
        cancel_label="Cancel",
    )
    self.push_screen(dialog, on_answer)
async def _delete_selected_messages(self) -> None:
    """Delete all selected messages.

    Picks a message to show afterwards, deletes each selected ID through the
    Himalaya client, refreshes the envelope list and then selects the chosen
    message (or the oldest one when it is no longer available).
    """
    message_ids_to_delete = list(self.selected_messages)
    next_id_to_select = None
    if message_ids_to_delete:
        # The follow-up message is picked relative to the selected message
        # with the numerically highest ID: the next valid one, else the
        # previous valid one.
        # NOTE(review): the candidate may itself be part of the selection; in
        # that case the metadata lookup after the refresh presumably fails and
        # the oldest message is shown instead — confirm.
        highest_deleted_id = max(message_ids_to_delete)
        metadata = self.message_store.get_metadata(highest_deleted_id)
        if metadata:
            next_id, _ = self.message_store.find_next_valid_id(metadata["index"])
            if next_id is None:
                next_id, _ = self.message_store.find_prev_valid_id(
                    metadata["index"]
                )
            next_id_to_select = next_id
    # Delete each message with current folder/account
    folder = self.folder if self.folder else None
    account = self.current_account if self.current_account else None
    success_count = 0
    for mid in message_ids_to_delete:
        message, success = await himalaya_client.delete_message(
            mid, folder=folder, account=account
        )
        if success:
            success_count += 1
        else:
            # Each failure is reported individually; the loop keeps going.
            self.show_status(f"Failed to delete message {mid}: {message}", "error")
    if success_count > 0:
        self.show_status(
            f"Deleted {success_count} message{'s' if success_count > 1 else ''}"
        )
        self.selected_messages.clear()
        self.refresh_list_view_items()
    # Refresh the envelope list
    worker = self.fetch_envelopes()
    await worker.wait()
    # After refresh, select the next message
    if next_id_to_select:
        new_metadata = self.message_store.get_metadata(next_id_to_select)
        if new_metadata:
            self.current_message_id = next_id_to_select
        else:
            self.action_oldest()
    else:
        self.action_oldest()
async def action_archive(self) -> None:
    """Archive the current message, or the selected messages after confirmation."""
    if not self.selected_messages:
        # Single message: archive straight away.
        await self._archive_single_message()
        return

    # Several messages: ask first; archiving runs in a worker if confirmed.
    count = len(self.selected_messages)

    def on_answer(confirmed: bool) -> None:
        if confirmed:
            self.run_worker(self._archive_selected_messages())

    dialog = ConfirmDialog(
        title="Archive Messages",
        message=f"Archive {count} selected message{'s' if count > 1 else ''}?",
        confirm_label="Archive",
        cancel_label="Cancel",
    )
    self.push_screen(dialog, on_answer)
async def _archive_selected_messages(self) -> None:
    """Archive every selected message in one client call, then refresh.

    On failure an error status is shown and nothing else happens. On success
    the selection is cleared, the list is reloaded and a neighbouring message
    (or the oldest one) becomes current.
    """
    message_ids_to_archive = list(self.selected_messages)

    # Pick the follow-up message relative to the highest selected ID: the
    # next valid one, else the previous valid one.
    next_id_to_select = None
    if message_ids_to_archive:
        metadata = self.message_store.get_metadata(max(message_ids_to_archive))
        if metadata:
            candidate, _ = self.message_store.find_next_valid_id(metadata["index"])
            if candidate is None:
                candidate, _ = self.message_store.find_prev_valid_id(
                    metadata["index"]
                )
            next_id_to_select = candidate

    # Empty folder/account strings are passed to the client as None.
    message, success = await himalaya_client.archive_messages(
        [str(mid) for mid in message_ids_to_archive],
        folder=self.folder or None,
        account=self.current_account or None,
    )
    if not success:
        self.show_status(f"Failed to archive messages: {message}", "error")
        return

    self.show_status(
        message or f"Archived {len(message_ids_to_archive)} messages"
    )
    self.selected_messages.clear()
    self.refresh_list_view_items()

    # Reload the list before choosing what to show.
    await self.fetch_envelopes().wait()

    still_present = bool(next_id_to_select) and bool(
        self.message_store.get_metadata(next_id_to_select)
    )
    if still_present:
        self.current_message_id = next_id_to_select
    else:
        self.action_oldest()
async def _archive_single_message(self) -> None:
    """Archive the current message, refresh the list and move to a neighbour.

    Shows an error status when there is no current message or when the
    client reports a failure.
    """
    current_id = self.current_message_id
    if not current_id:
        self.show_status("No message selected to archive.", "error")
        return

    # Decide what to show afterwards: the next valid message, else the
    # previous valid one.
    position = self.current_message_index
    next_id_to_select, _ = self.message_store.find_next_valid_id(position)
    if next_id_to_select is None:
        next_id_to_select, _ = self.message_store.find_prev_valid_id(position)

    # Empty folder/account strings are passed to the client as None.
    message, success = await himalaya_client.archive_messages(
        [str(current_id)],
        folder=self.folder or None,
        account=self.current_account or None,
    )
    if not success:
        self.show_status(
            f"Failed to archive message {current_id}: {message}", "error"
        )
        return
    self.show_status(message or "Archived")

    # Reload the list before choosing what to show.
    await self.fetch_envelopes().wait()

    still_present = bool(next_id_to_select) and bool(
        self.message_store.get_metadata(next_id_to_select)
    )
    if still_present:
        self.current_message_id = next_id_to_select
    else:
        self.action_oldest()
def action_open(self) -> None:
    """Delegate to the imported ``action_open`` helper, passing this app."""
    action_open(self)
def action_create_task(self) -> None:
    """Delegate to the imported ``action_create_task`` helper, passing this app."""
    action_create_task(self)
def action_open_links(self) -> None:
    """Push a LinkPanel screen with the links reported by the content container."""
    links = self.query_one(ContentContainer).get_links()
    self.push_screen(LinkPanel(links))
def action_scroll_down(self) -> None:
    """Scroll the main content pane down."""
    pane = self.query_one("#main_content")
    pane.scroll_down()
def action_scroll_up(self) -> None:
    """Scroll the main content pane up."""
    pane = self.query_one("#main_content")
    pane.scroll_up()
def action_scroll_page_down(self) -> None:
    """Scroll the main content pane down by one page."""
    pane = self.query_one("#main_content")
    pane.scroll_page_down()
def action_scroll_page_up(self) -> None:
    """Scroll the main content pane up by one page."""
    pane = self.query_one("#main_content")
    pane.scroll_page_up()
def action_toggle_main_content(self) -> None:
    """Flip the ``main_content_visible`` flag."""
    currently_visible = self.main_content_visible
    self.main_content_visible = not currently_visible
def watch_main_content_visible(self, visible: bool) -> None:
    """Show or hide the content pane with the account and folder lists.

    Focus always returns to the envelopes list afterwards.
    """
    selectors = ("#main_content", "#accounts_list", "#folders_list")
    # Look up all three widgets before changing any of them.
    panes = [self.query_one(selector) for selector in selectors]
    for pane in panes:
        pane.display = visible
    self.query_one("#envelopes_list").focus()
def action_quit(self) -> None:
    """Stop the IPC listener, if one was created, and exit the app."""
    not_set = object()
    listener = getattr(self, "_ipc_listener", not_set)
    if listener is not not_set:
        listener.stop()
    self.exit()
def action_toggle_selection(self) -> None:
    """Toggle selection of the highlighted message and update its row.

    Does nothing when the highlighted index does not point at a message:
    an empty or stale list, a group-header row, or an empty slot.
    """
    envelopes = self.message_store.envelopes
    row = self.highlighted_message_index
    # The highlight index defaults to 0 and is only updated on highlight
    # events, so it can be out of range (e.g. space pressed on an empty
    # list). Bail out instead of raising IndexError.
    if not 0 <= row < len(envelopes):
        return
    entry = envelopes[row]
    if not entry or entry.get("type") == "header":
        return

    message_id = int(entry["id"])
    # Toggle membership in the selection set.
    if message_id in self.selected_messages:
        self.selected_messages.remove(message_id)
        is_selected = False
    else:
        self.selected_messages.add(message_id)
        is_selected = True

    # Update the row widget when it exists; the ListView can briefly hold
    # fewer rows than the store while it is being rebuilt.
    rows = self.query_one("#envelopes_list", ListView).children
    if row < len(rows):
        current_list_item = rows[row]
        try:
            envelope_widget = current_list_item.query_one(EnvelopeListItem)
            envelope_widget.set_selected(is_selected)
        except Exception:
            # Fallback for old-style widgets that expose a ".checkbox" label.
            try:
                checkbox_label = current_list_item.query_one(".checkbox", Label)
                if is_selected:
                    checkbox_label.add_class("x-list")
                    checkbox_label.update("\uf4a7")
                else:
                    checkbox_label.remove_class("x-list")
                    checkbox_label.update("\ue640")
            except Exception:
                pass  # Best effort: no known widget to update.

    # The set is mutated in place, so refresh the subtitle explicitly.
    self._update_list_view_subtitle()
def action_clear_selection(self) -> None:
    """Clear the selection, or focus the search input while in search mode."""
    if self.search_mode:
        # In search mode the key returns focus to the search box instead.
        self.query_one("#search_panel", SearchPanel).focus_input()
        return
    if not self.selected_messages:
        return
    self.selected_messages.clear()
    self.refresh_list_view_items()  # Uncheck every row
    self._update_list_view_subtitle()
async def action_toggle_read(self) -> None:
    """Toggle read/unread status for the current or selected messages.

    With a selection, every selected message is toggled, a status message is
    shown and the selection is cleared; otherwise only the current message
    (if any) is toggled. The envelope list is reloaded afterwards.
    """
    # Empty folder/account strings are passed on as None.
    folder = self.folder or None
    account = self.current_account or None

    if self.selected_messages:
        # Iterate a snapshot: each toggle awaits, and the selection set is
        # mutated in place elsewhere (e.g. the delete/archive workers clear
        # it). Iterating the live set across awaits would raise
        # "Set changed size during iteration".
        targets = list(self.selected_messages)
        for message_id in targets:
            await self._toggle_message_read_status(message_id, folder, account)
        self.show_status(f"Toggled read status for {len(targets)} messages")
        self.selected_messages.clear()
    elif self.current_message_id:
        await self._toggle_message_read_status(
            self.current_message_id, folder, account
        )

    # Refresh the list to show updated read status
    await self.fetch_envelopes().wait()
async def _toggle_message_read_status(
    self, message_id: int, folder: str | None, account: str | None
) -> None:
    """Flip the "Seen" flag of one message through Himalaya and update its row.

    Returns without doing anything when the message has no metadata, its
    index is out of range, or the entry at that index is a header or empty.
    A failed client call leaves the stored flags and the row untouched.
    """
    metadata = self.message_store.get_metadata(message_id)
    if not metadata:
        return
    index = metadata.get("index", -1)
    if not 0 <= index < len(self.message_store.envelopes):
        return
    entry = self.message_store.envelopes[index]
    if not entry or entry.get("type") == "header":
        return

    currently_read = "Seen" in entry.get("flags", [])
    if currently_read:
        # Read -> unread
        _, ok = await himalaya_client.mark_as_unread(
            message_id, folder=folder, account=account
        )
        if not ok:
            return
        if "Seen" in entry.get("flags", []):
            entry["flags"].remove("Seen")
        self.show_status(f"Marked message {message_id} as unread")
        self._update_envelope_read_state(index, is_read=False)
        return

    # Unread -> read
    _, ok = await himalaya_client.mark_as_read(
        message_id, folder=folder, account=account
    )
    if not ok:
        return
    entry.setdefault("flags", [])
    if "Seen" not in entry["flags"]:
        entry["flags"].append("Seen")
    self.show_status(f"Marked message {message_id} as read")
    self._update_envelope_read_state(index, is_read=True)
def _update_envelope_read_state(self, index: int, is_read: bool) -> None:
    """Best-effort: set the read flag and "unread" class on the row at ``index``."""
    try:
        rows = self.query_one("#envelopes_list", ListView).children
        row_widget = rows[index].query_one(EnvelopeListItem)
        row_widget.is_read = is_read
        # Read rows lose the "unread" class, unread rows gain it.
        apply_class = row_widget.remove_class if is_read else row_widget.add_class
        apply_class("unread")
    except Exception:
        pass  # Widget may not exist
def action_oldest(self) -> None:
    """Show the oldest message, starting an envelope fetch first if one is due."""
    if self.reload_needed:
        self.fetch_envelopes()
    oldest_id = self.message_store.get_oldest_id()
    self.show_message(oldest_id)
def action_newest(self) -> None:
    """Show the newest message, starting an envelope fetch first if one is due."""
    if self.reload_needed:
        self.fetch_envelopes()
    newest_id = self.message_store.get_newest_id()
    self.show_message(newest_id)
def action_reload(self) -> None:
    """Fetch the envelopes again and post a status message saying so."""
    # The fetch is started before the status text is posted.
    self.fetch_envelopes()
    status_text = "Reloading messages..."
    self.show_status(status_text)
def action_search(self) -> None:
    """Open the search panel, remembering the current list for later restore.

    The envelope list and metadata are copied into ``_cached_envelopes`` /
    ``_cached_metadata`` only when the panel is not already visible.
    """
    panel = self.query_one("#search_panel", SearchPanel)
    if not panel.is_visible:
        # Snapshot the store contents before searching.
        store = self.message_store
        self._cached_envelopes = list(store.envelopes)
        self._cached_metadata = dict(store.metadata_by_id)
    self.search_mode = True
    panel.show(self.search_query)
def on_search_panel_search_requested(
    self, event: SearchPanel.SearchRequested
) -> None:
    """Run a live search for the panel's query without focusing the results."""
    live_query = event.query
    self._perform_search(live_query, focus_results=False)
def on_search_panel_search_confirmed(
    self, event: SearchPanel.SearchConfirmed
) -> None:
    """Run the confirmed (Enter key) search and focus the results afterwards."""
    confirmed_query = event.query
    self._perform_search(confirmed_query, focus_results=True)
def on_search_panel_search_cancelled(
    self, event: SearchPanel.SearchCancelled
) -> None:
    """Leave search mode and put the pre-search envelope list back.

    Non-empty cached envelopes / metadata are moved back into the message
    store (and the caches emptied), the list view is repopulated, and the
    list's title, subtitle and focus are restored.
    """
    self.search_mode = False
    self.search_query = ""

    # Hand each non-empty cache back to the store and reset it.
    store = self.message_store
    if self._cached_envelopes:
        store.envelopes, self._cached_envelopes = self._cached_envelopes, []
    if self._cached_metadata:
        store.metadata_by_id, self._cached_metadata = self._cached_metadata, {}
    self._populate_list_view()

    # Put the regular title back on the envelope list.
    # NOTE(review): both branches read as empty strings here; confirm the
    # intended sort glyphs are still present in the file.
    sort_indicator = "" if self.sort_order_ascending else ""
    envelopes_list = self.query_one("#envelopes_list")
    envelopes_list.border_title = f"1⃣ Emails {sort_indicator}"
    self._update_list_view_subtitle()
    envelopes_list.focus()
@work(exclusive=True)
async def _perform_search(self, query: str, focus_results: bool = False) -> None:
    """Search via Himalaya and show the matches in the envelope list.

    Args:
        query: Search text passed to ``himalaya_client.search_envelopes``.
        focus_results: When true and there are matches, make the first
            returned envelope the current message (if it has a non-zero
            id) and move focus to the main content.
    """
    panel = self.query_one("#search_panel", SearchPanel)
    panel.update_status(-1, searching=True)

    # Falsy folder/account values are passed on as None.
    results, success = await himalaya_client.search_envelopes(
        query,
        folder=self.folder or None,
        account=self.current_account or None,
    )
    if not success:
        panel.update_status(0, searching=False)
        self.show_status("Search failed", "error")
        return

    panel.update_status(len(results), searching=False)
    if not results:
        # Empty the envelope list and show the "no results" header.
        self._display_search_results([], query)
        return

    self.search_query = query
    self.search_mode = True
    self._display_search_results(results, query)
    if not focus_results:
        return

    # results is known to be non-empty here.
    first_id = int(results[0].get("id", 0))
    if first_id:
        self.current_message_id = first_id
    self.action_focus_4()
def _display_search_results(
    self, results: List[Dict[str, Any]], query: str
) -> None:
    """Show ``results`` for ``query`` in the envelope list, grouped by month.

    The list view is cleared and rebuilt, and the contents of
    ``self.message_store`` are replaced by the search view: one search
    header row, then for every month a header row followed by that month's
    envelopes. Each ``metadata_by_id`` entry records the envelope's position
    in that combined list, so list-view indices and store indices line up.

    With no results only a "No results found" header is shown, the content
    pane is cleared and the store, ``total_messages`` and
    ``current_message_id`` are reset.

    Args:
        results: Envelope dicts from the search. Entries without an ``"id"``
            key are skipped. A missing, ``None`` or unparsable ``"date"`` is
            tolerated and grouped under "Unknown Date".
        query: The search text, shown in the header row.
    """
    envelopes_list = self.query_one("#envelopes_list", ListView)
    envelopes_list.clear()
    config = get_config()

    if not results:
        header_label = f"Search: '{query}' - No results found"
        envelopes_list.append(ListItem(GroupHeader(label=header_label)))
        # Nothing to show, so also empty the message viewer and the store.
        content_container = self.query_one(ContentContainer)
        content_container.clear_content()
        self.message_store.envelopes = []
        self.message_store.metadata_by_id = {}
        self.total_messages = 0
        self.current_message_id = 0
        return

    result_count = len(results)
    plural = "" if result_count == 1 else "s"
    header_label = f"Search: '{query}' ({result_count} result{plural})"

    # The search header is part of the envelope list so that ListView
    # indices match message_store.envelopes indices.
    grouped_envelopes: List[Dict[str, Any]] = [
        {"type": "header", "label": header_label}
    ]
    metadata_by_id: Dict[int, Dict[str, Any]] = {}

    # Sort on the date text; a null/missing date sorts as the empty string
    # instead of raising when compared with real date strings.
    sorted_results = sorted(
        results,
        key=lambda env: str(env.get("date") or ""),
        reverse=not self.sort_order_ascending,
    )

    seen_months = set()
    for envelope in sorted_results:
        if "id" not in envelope:
            continue

        # Work out which month group the envelope belongs to.
        date_str = envelope.get("date") or ""
        try:
            date = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
            month_key = date.strftime("%B %Y")
        except (ValueError, TypeError, AttributeError):
            # AttributeError covers a non-string date value.
            month_key = "Unknown Date"

        # Emit a month header the first time a month is encountered.
        if month_key not in seen_months:
            seen_months.add(month_key)
            grouped_envelopes.append({"type": "header", "label": month_key})

        grouped_envelopes.append(envelope)

        # Metadata for quick access; "index" points into grouped_envelopes.
        envelope_id = int(envelope["id"])
        metadata_by_id[envelope_id] = {
            "id": envelope_id,
            "subject": envelope.get("subject", ""),
            "from": envelope.get("from", {}),
            "to": envelope.get("to", {}),
            "cc": envelope.get("cc", {}),
            "date": date_str,
            "index": len(grouped_envelopes) - 1,
        }

    # Replace the main store's contents so navigation works on the results.
    self.message_store.envelopes = grouped_envelopes
    self.message_store.metadata_by_id = metadata_by_id
    self.total_messages = len(results)

    # Build the ListView so it mirrors the envelopes list row for row.
    for item in self.message_store.envelopes:
        if not item:
            continue
        if item.get("type") == "header":
            envelopes_list.append(ListItem(GroupHeader(label=item["label"])))
            continue
        message_id = int(item.get("id", 0))
        envelope_widget = EnvelopeListItem(
            envelope=item,
            config=config.envelope_display,
            is_selected=message_id in self.selected_messages,
        )
        envelopes_list.append(ListItem(envelope_widget))

    # Update the border title to show search mode.
    # NOTE(review): both branches read as empty strings here; confirm the
    # intended sort glyphs are still present in the file.
    sort_indicator = "" if self.sort_order_ascending else ""
    envelopes_list.border_title = f"Search Results {sort_indicator}"

    # Move the highlight past the search header.
    # NOTE(review): row 1 is the first month header, not the first envelope
    # (which sits at row 2) - confirm this is the intended selection.
    if len(envelopes_list.children) > 1:
        envelopes_list.index = 1
def action_focus_1(self) -> None:
    """Give keyboard focus to the ``#envelopes_list`` widget."""
    target = self.query_one("#envelopes_list")
    target.focus()
def action_focus_2(self) -> None:
    """Give keyboard focus to the ``#accounts_list`` widget."""
    target = self.query_one("#accounts_list")
    target.focus()
def action_focus_3(self) -> None:
    """Give keyboard focus to the ``#folders_list`` widget."""
    target = self.query_one("#folders_list")
    target.focus()
def action_focus_4(self) -> None:
    """Give keyboard focus to the ``#main_content`` widget."""
    target = self.query_one("#main_content")
    target.focus()
if __name__ == "__main__":
    # Executed as a script: create the email viewer app and run it.
    app = EmailViewerApp()
    app.run()
def launch_email_viewer():
    """Create an ``EmailViewerApp`` and run it."""
    EmailViewerApp().run()