Files
luk/maildir_gtd/app.py
2025-05-05 09:16:19 -06:00

456 lines
19 KiB
Python

import re
import sys
import os
from datetime import datetime
import asyncio
import logging
from typing import Iterable
# Add the parent directory to the system path to resolve relative imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from textual import work
from textual.worker import Worker
from textual.app import App, ComposeResult, SystemCommand, RenderResult
from textual.logging import TextualHandler
from textual.screen import Screen
from textual.widgets import Footer, Static, Label, Markdown, ListView, ListItem
from textual.reactive import reactive, Reactive
from textual.binding import Binding
from textual.timer import Timer
from textual.containers import ScrollableContainer, Grid, Vertical, Horizontal
from actions.archive import archive_current
from actions.delete import delete_current
from actions.open import action_open
from actions.task import action_create_task
from widgets.EnvelopeHeader import EnvelopeHeader
from maildir_gtd.utils import group_envelopes_by_date
logging.basicConfig(
level="NOTSET",
handlers=[TextualHandler()],
)
class StatusTitle(Static):
total_messages: Reactive[int] = reactive(0)
current_message_index: Reactive[int] = reactive(0)
current_message_id: Reactive[int] = reactive(1)
folder: Reactive[str] = reactive("INBOX")
def render(self) -> RenderResult:
return f"{self.folder} | ID: {self.current_message_id} | [b]{self.current_message_index}[/b]/{self.total_messages}"
class EmailViewerApp(App):
"""A simple email viewer app using the Himalaya CLI."""
CSS_PATH = "email_viewer.tcss"
title = "Maildir GTD Reader"
current_message_id: Reactive[int] = reactive(0)
current_message_index: Reactive[int] = reactive(0)
folder = reactive("INBOX")
header_expanded = reactive(False)
reload_needed = reactive(True)
all_envelopes = reactive([])
next_id: Reactive[int] = reactive(0)
previous_id: Reactive[int] = reactive(0)
oldest_id: Reactive[int] = reactive(0)
newest_id: Reactive[int] = reactive(0)
msg_worker: Worker | None = None
messsage_metadata: dict[int, dict] = {}
message_body_cache: dict[int, str] = {}
total_messages: Reactive[int] = reactive(0)
status_title = reactive("Message View")
sort_order_ascending: Reactive[bool] = reactive(True)
valid_envelopes = reactive([])
def get_system_commands(self, screen: Screen) -> Iterable[SystemCommand]:
yield from super().get_system_commands(screen)
yield SystemCommand("Next Message", "Navigate to Next ID", self.action_next)
yield SystemCommand("Previous Message", "Navigate to Previous ID", self.action_previous)
yield SystemCommand("Delete Message", "Delete the current message", self.action_delete)
yield SystemCommand("Archive Message", "Archive the current message", self.action_archive)
yield SystemCommand("Open Message", "Open a specific message by ID", self.action_open)
yield SystemCommand("Create Task", "Create a task using the task CLI", self.action_create_task)
yield SystemCommand("Oldest Message", "Show the oldest message", self.action_oldest)
yield SystemCommand("Newest Message", "Show the newest message", self.action_newest)
yield SystemCommand("Reload", "Reload the message list", self.fetch_envelopes)
BINDINGS = [
Binding("j", "next", "Next message"),
Binding("k", "previous", "Previous message"),
Binding("#", "delete", "Delete message"),
Binding("e", "archive", "Archive message"),
Binding("o", "open", "Open message", show=False),
Binding("q", "quit", "Quit application"),
Binding("h", "toggle_header", "Toggle Envelope Header"),
Binding("t", "create_task", "Create Task"),
Binding("%", "reload", "Reload message list"),
Binding("1", "focus_1", "Focus Accounts Panel"),
Binding("2", "focus_2", "Focus Folders Panel"),
Binding("3", "focus_3", "Focus Envelopes Panel")
]
BINDINGS.extend([
Binding("space", "scroll_page_down", "Scroll page down"),
Binding("b", "scroll_page_up", "Scroll page up"),
Binding("s", "toggle_sort_order", "Toggle Sort Order")
])
def compose(self) -> ComposeResult:
yield Horizontal(
Vertical(
ListView(ListItem(Label("All emails...")), id="envelopes_list", classes="list_view", initial_index=0),
ListView(id="accounts_list", classes="list_view"),
ListView(id="folders_list", classes="list_view"),
id="sidebar"
),
ScrollableContainer(
EnvelopeHeader(),
Markdown(),
id="main_content"
),
id="outer-wrapper"
)
yield Footer()
async def on_mount(self) -> None:
self.alert_timer: Timer | None = None # Timer to throttle alerts
self.theme = "monokai"
self.title = "MaildirGTD"
self.query_one("#main_content").border_title = self.status_title
sort_indicator = '\u2191' if self.sort_order_ascending else '\u2193'
self.query_one("#envelopes_list").border_title = f"\[1] Emails {sort_indicator}"
self.query_one("#accounts_list").border_title = "\[2] Accounts"
self.query_one("#folders_list").border_title = "\[3] Folders"
# self.query_one(ListView).data_bind(index=EmailViewerApp.current_message_index)
# self.watch(self.query_one(StatusTitle), "current_message_id", update_progress)
# Fetch the ID of the most recent message using the Himalaya CLI
self.fetch_accounts()
self.fetch_folders()
worker = self.fetch_envelopes()
await worker.wait()
self.query_one("#envelopes_list").focus()
self.action_oldest()
def compute_status_title(self) -> None:
sort_indicator = "\u2191" if self.sort_order_ascending else "\u2193"
return f"✉️ Message ID: {self.current_message_id} "
def compute_valid_envelopes(self) -> None:
return (envelope for envelope in self.all_envelopes if envelope.get('id'))
def watch_status_title(self, old_status_title: str, new_status_title: str) -> None:
self.query_one("#main_content").border_title = new_status_title
def watch_sort_order_ascending(self, old_value: bool, new_value: bool) -> None:
"""Update the border title of the envelopes list when the sort order changes."""
sort_indicator = "\u2191" if new_value else "\u2193"
self.query_one("#envelopes_list").border_title = f"\[1] Emails {sort_indicator}"
def watch_current_message_index(self, old_index: int, new_index: int) -> None:
self.query_one("#envelopes_list").border_subtitle = f"[b]{self.current_message_index}[/b]/{self.total_messages}"
def compute_newest_id(self) -> None:
if not self.all_envelopes:
return 0
return sorted((int(envelope['id']) for envelope in self.valid_envelopes))[-1]
def compute_oldest_id(self) -> None:
if not self.valid_envelopes:
return 0
sorted_msgs = sorted((int(envelope['id']) for envelope in self.valid_envelopes))
if len(sorted_msgs) > 0:
return sorted_msgs[0]
return 0
def compute_next_id(self) -> None:
if not self.valid_envelopes:
return 0
for envelope_id in sorted(int(envelope['id']) for envelope in self.valid_envelopes):
if envelope_id > int(self.current_message_id):
return envelope_id
return self.newest_id
def compute_previous_id(self) -> None:
if not self.valid_envelopes:
return 0
for envelope_id in sorted((int(envelope['id']) for envelope in self.valid_envelopes), reverse=True):
if envelope_id < int(self.current_message_id):
return envelope_id
return self.oldest_id
def watch_reload_needed(self, old_reload_needed: bool, new_reload_needed: bool) -> None:
logging.info(f"Reload needed: {new_reload_needed}")
if (old_reload_needed == False and new_reload_needed == True):
self.fetch_envelopes()
def watch_current_message_id(self, old_message_id: int, new_message_id: int) -> None:
"""Called when the current message ID changes."""
logging.info(f"Current message ID changed from {old_message_id} to {new_message_id}")
if (new_message_id == old_message_id):
return
self.msg_worker.cancel() if self.msg_worker else None
headers = self.query_one(EnvelopeHeader)
if new_message_id in self.message_metadata:
metadata = self.message_metadata[new_message_id]
self.current_message_index = metadata['index']
headers.subject = metadata['subject'].strip()
headers.from_ = metadata['from'].get('addr', '')
headers.to = metadata['to'].get('addr', '')
headers.date = datetime.strptime(metadata['date'].replace("+00:00", ""), "%Y-%m-%d %H:%M").strftime("%a %b %d %H:%M")
headers.cc = metadata['cc'].get('addr', '') if 'cc' in metadata else ""
self.query_one(ListView).index = metadata['index']
else:
logging.warning(f"Message ID {new_message_id} not found in metadata.")
if (self.message_body_cache.get(new_message_id)):
# If the message body is already cached, use it
msg = self.query_one(Markdown)
msg.update(self.message_body_cache[new_message_id])
return
else:
self.query_one("#main_content").loading = True
self.msg_worker = self.fetch_one_message(new_message_id)
def on_list_view_selected(self, event: ListView.Selected) -> None:
"""Called when an item in the list view is selected."""
# logging.info(f"Selected item: {self.all_envelopes[event.list_view.index]}")
if self.all_envelopes[event.list_view.index] is None or self.all_envelopes[event.list_view.index].get("type") == "header":
# If the selected item is a header, do not change the current message ID
return
self.current_message_id = int(self.all_envelopes[event.list_view.index]['id'])
@work(exclusive=False)
async def fetch_one_message(self, new_message_id:int) -> None:
msg = self.query_one(Markdown)
try:
process = await asyncio.create_subprocess_shell(
f"himalaya message read {str(new_message_id)}",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
logging.info(f"stdout: {stdout.decode()[0:50]}...")
if process.returncode == 0:
# Render the email content as Markdown
fixedText = stdout.decode().replace("(https://urldefense.com/v3/", "(")
fixedText = re.sub(r"atlOrigin.+?\)", ")", fixedText)
logging.info(f"rendering fixedText: {fixedText[0:50]}")
self.message_body_cache[new_message_id] = fixedText
await msg.update(fixedText)
self.query_one("#main_content").loading = False
logging.info(fixedText)
except Exception as e:
self.show_status(f"Error fetching message content: {e}", "error")
logging.error(f"Error fetching message content: {e}")
@work(exclusive=False)
async def fetch_envelopes(self) -> None:
msglist = self.query_one("#envelopes_list")
try:
msglist.loading = True
process = await asyncio.create_subprocess_shell(
"himalaya envelope list -o json -s 9999",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
logging.info(f"stdout: {stdout.decode()[0:50]}")
if process.returncode == 0:
import json
envelopes = json.loads(stdout.decode())
if envelopes:
self.reload_needed = False
self.total_messages = len(envelopes)
msglist.clear()
envelopes = sorted(envelopes, key=lambda x: int(x['id']), reverse=not self.sort_order_ascending)
grouped_envelopes = group_envelopes_by_date(envelopes)
self.all_envelopes = grouped_envelopes
self.message_metadata = {
envelope['id']: {
'subject': envelope.get('subject', ''),
'from': envelope.get('from', {}),
'to': envelope.get('to', {}),
'date': envelope.get('date', ''),
'cc': envelope.get('cc', {}),
'index': index # Store the position index
}
for index, envelope in enumerate(self.all_envelopes)
if 'id' in envelope
}
for item in grouped_envelopes:
if item.get("type") == "header":
msglist.append(ListItem(Label(item["label"], classes="group_header", markup=False)))
else:
msglist.append(ListItem(Label(str(item['subject']).strip(), classes="email_subject", markup=False)))
msglist.index = self.current_message_index
else:
self.show_status("Failed to fetch any envelopes.", "error")
except Exception as e:
self.show_status(f"Error fetching message list: {e}", "error")
finally:
msglist.loading = False
@work(exclusive=False)
async def fetch_accounts(self) -> None:
accounts_list = self.query_one("#accounts_list")
try:
accounts_list.loading = True
process = await asyncio.create_subprocess_shell(
"himalaya account list -o json",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
logging.info(f"stdout: {stdout.decode()[0:50]}")
if process.returncode == 0:
import json
accounts = json.loads(stdout.decode())
if accounts:
for account in accounts:
item = ListItem(Label(str(account['name']).strip(), classes="account_name", markup=False))
accounts_list.append(item)
except Exception as e:
self.show_status(f"Error fetching account list: {e}", "error")
finally:
accounts_list.loading = False
@work(exclusive=False)
async def fetch_folders(self) -> None:
folders_list = self.query_one("#folders_list")
folders_list.clear()
folders_list.append(ListItem(Label("INBOX", classes="folder_name", markup=False)))
try:
folders_list.loading = True
process = await asyncio.create_subprocess_shell(
"himalaya folder list -o json",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
logging.info(f"stdout: {stdout.decode()[0:50]}")
if process.returncode == 0:
import json
folders = json.loads(stdout.decode())
if folders:
for folder in folders:
item = ListItem(Label(str(folder['name']).strip(), classes="folder_name", markup=False))
folders_list.append(item)
except Exception as e:
self.show_status(f"Error fetching folder list: {e}", "error")
finally:
folders_list.loading = False
def show_message(self, message_id: int) -> None:
self.current_message_id = message_id
def show_status(self, message: str, severity: str = "information") -> None:
"""Display a status message using the built-in notify function."""
self.notify(message, title="Status", severity=severity, timeout=1.6, markup=True)
def action_toggle_header(self) -> None:
"""Toggle the visibility of the EnvelopeHeader panel."""
header = self.query_one(EnvelopeHeader)
header.styles.height = "1" if self.header_expanded else "auto"
self.header_expanded = not self.header_expanded
async def action_toggle_sort_order(self) -> None:
"""Toggle the sort order of the envelope list."""
self.sort_order_ascending = not self.sort_order_ascending
worker = self.fetch_envelopes()
await worker.wait()
# Call action_newest or action_oldest based on the new sort order
if self.sort_order_ascending:
self.action_oldest()
else:
self.action_newest()
def action_next(self) -> None:
if self.sort_order_ascending:
self.show_message(self.next_id)
else:
self.show_message(self.previous_id)
self.fetch_envelopes() if self.reload_needed else None
def action_previous(self) -> None:
if self.sort_order_ascending:
self.show_message(self.previous_id)
else:
self.show_message(self.next_id)
self.fetch_envelopes() if self.reload_needed else None
def action_delete(self) -> None:
self.all_envelopes.remove(self.all_envelopes[self.current_message_index])
self.message_body_cache.pop(self.current_message_id, None)
self.total_messages = len(self.all_envelopes)
delete_current(self)
def action_archive(self) -> None:
self.all_envelopes.remove(self.all_envelopes[self.current_message_index])
self.message_body_cache.pop(self.current_message_id, None)
self.total_messages = len(self.all_envelopes)
archive_current(self)
def action_open(self) -> None:
action_open(self)
def action_create_task(self) -> None:
action_create_task(self)
def action_scroll_down(self) -> None:
"""Scroll the main content down."""
self.query_one("#main_content").scroll_down()
def action_scroll_up(self) -> None:
"""Scroll the main content up."""
self.query_one("#main_content").scroll_up()
def action_scroll_page_down(self) -> None:
"""Scroll the main content down by a page."""
self.query_one("#main_content").scroll_page_down()
def action_scroll_page_up(self) -> None:
"""Scroll the main content up by a page."""
self.query_one("#main_content").scroll_page_up()
def action_quit(self) -> None:
"""Quit the application."""
self.exit()
def action_oldest(self) -> None:
self.fetch_envelopes() if self.reload_needed else None
self.show_message(self.oldest_id)
def action_newest(self) -> None:
self.fetch_envelopes() if self.reload_needed else None
self.show_message(self.newest_id)
def action_focus_1(self) -> None:
self.query_one("#envelopes_list").focus()
def action_focus_2(self) -> None:
self.query_one("#accounts_list").focus()
def action_focus_3(self) -> None:
self.query_one("#folders_list").focus()
if __name__ == "__main__":
app = EmailViewerApp()
app.run()