# Source: luk/tui_email_viewer.py
# Last modified: 2025-04-30 10:03:15 -04:00
# Original listing: 304 lines, 13 KiB, Python

import logging
from typing import Iterable
from textual import on
from textual.app import App, ComposeResult, SystemCommand
from textual.logging import TextualHandler
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Label, Input, Button
from textual.reactive import Reactive
from textual.binding import Binding
from textual.timer import Timer
from textual.containers import ScrollableContainer, Horizontal, Vertical, Grid
import subprocess
# Configure the root logger once at import time: accept records of every
# level (NOTSET) and hand them to Textual's logging handler.
logging.basicConfig(
    handlers=[TextualHandler()],
    level=logging.NOTSET,
)
class OpenMessageScreen(Screen[int]):
    """Screen that prompts for a message ID.

    The screen is dismissed with the entered ID as an ``int`` when the input
    is submitted or the "Open" button is pressed, and with ``None`` when the
    user cancels with Escape or Ctrl+C.
    """

    def compose(self) -> ComposeResult:
        """Lay out the icon, the integer-only input and the "Open" button."""
        yield Horizontal(
            Label("📨", id="message_label"),
            Input(
                placeholder="Enter message ID (integer only)",
                type="integer",
                id="open_message_input",
            ),
            Button("Open", variant="primary", id="open_message_button"),
        )

    @on(Input.Submitted)
    @on(Button.Pressed, "#open_message_button")
    def handle_message_id(self) -> None:
        """Validate the typed ID and dismiss the screen with it.

        An empty or otherwise non-integer value leaves the screen open and
        shows an error notification instead of raising ``ValueError``.
        """
        logging.info("Open message")
        raw_value = self.query_one("#open_message_input", Input).value.strip()
        try:
            message_id = int(raw_value)
        except ValueError:
            # Validate *before* disabling the screen so the user can retry.
            self.notify(
                "Invalid message ID. Please enter an integer.",
                severity="error",
            )
            return
        self.disabled = True
        self.loading = True
        self.dismiss(message_id)

    def on_key(self, event) -> None:
        """Forward key presses that reach the screen to ``handle_close``.

        ``on_key`` is picked up through Textual's ``on_<event>`` naming
        convention; the ``on`` decorator only accepts message classes, so it
        cannot be used with ``Input._on_key`` (a method).
        """
        self.handle_close(event)

    def handle_close(self, event) -> None:
        """Dismiss the screen without a result on Escape or Ctrl+C.

        Args:
            event: The key event; only its ``key`` attribute is inspected.
        """
        if event.key in ("escape", "ctrl+c"):
            event.stop()
            self.dismiss(None)
class CreateTaskScreen(Screen[str]):
    """Screen that prompts for the arguments of a ``task`` command.

    The screen is dismissed with the raw argument string when the input is
    submitted, and with ``None`` when the user cancels with Escape or Ctrl+C.
    """

    def compose(self) -> ComposeResult:
        """Lay out a shell-like prompt followed by the argument input."""
        yield Vertical(
            Label("$>", id="task_prompt"),
            Label("task ", id="task_prompt_label"),
            Input(placeholder="arguments", id="task_input"),
        )

    @on(Input.Submitted)
    def handle_task_args(self) -> None:
        """Dismiss the screen with whatever was typed into the input."""
        task_args = self.query_one("#task_input", Input).value
        self.disabled = True
        self.loading = True
        self.dismiss(task_args)

    def on_key(self, event) -> None:
        """Forward key presses that reach the screen to ``handle_close``.

        ``on_key`` is picked up through Textual's ``on_<event>`` naming
        convention; the ``on`` decorator only accepts message classes, so it
        cannot be used with ``Input._on_key`` (a method).
        """
        self.handle_close(event)

    def handle_close(self, event) -> None:
        """Dismiss the screen without a result on Escape or Ctrl+C.

        Args:
            event: The key event; only its ``key`` attribute is inspected.
        """
        if event.key in ("escape", "ctrl+c"):
            event.stop()
            self.dismiss(None)
class EmailViewerApp(App):
    """A Textual app for viewing and managing emails.

    Messages are listed, read, deleted and moved by running the ``himalaya``
    CLI, and tasks are created by running the ``task`` CLI. Every subprocess
    call is synchronous, so the UI does not refresh while a command runs.
    """

    # Textual takes the header title from the upper-case ``TITLE`` class
    # variable; a lower-case ``title`` class attribute replaces the app's
    # reactive ``title`` attribute instead of setting its value.
    TITLE = "Mail Reader"
    CSS_PATH = "email_viewer.tcss"  # Optional: For styling

    # How many consecutive IDs are probed when stepping to a neighbour.
    MAX_LOOKUP_ATTEMPTS = 100

    current_message_id: Reactive[int] = Reactive(1)

    BINDINGS = [
        Binding("n", "next", "Next message"),
        Binding("p", "previous", "Previous message"),
        Binding("d", "delete", "Delete message"),
        Binding("a", "archive", "Archive message"),
        Binding("o", "open", "Open message", show=False),
        Binding("q", "quit", "Quit application"),
        Binding("c", "create_task", "Create Task"),
        Binding("j", "scroll_down", "Scroll down"),
        Binding("k", "scroll_up", "Scroll up"),
        Binding("down", "scroll_down", "Scroll down"),
        Binding("up", "scroll_up", "Scroll up"),
        Binding("space", "scroll_page_down", "Scroll page down"),
        Binding("b", "scroll_page_up", "Scroll page up"),
    ]

    def get_system_commands(self, screen: Screen) -> Iterable[SystemCommand]:
        """Yield the default system commands plus one per mail action."""
        yield from super().get_system_commands(screen)
        yield SystemCommand("Next Message", "Navigate to Next ID", self.action_next)
        yield SystemCommand("Previous Message", "Navigate to Previous ID", self.action_previous)
        yield SystemCommand("Delete Message", "Delete the current message", self.action_delete)
        yield SystemCommand("Archive Message", "Archive the current message", self.action_archive)
        yield SystemCommand("Open Message", "Open a specific message by ID", self.action_open)
        yield SystemCommand("Create Task", "Create a task using the task CLI", self.action_create_task)

    def compose(self) -> ComposeResult:
        """Create child widgets for the app."""
        yield Header(show_clock=True)
        yield Footer(Label("[n] Next | [p] Previous | [d] Delete | [a] Archive | [o] Open | [q] Quit | [c] Create Task"))
        # Static is given plain text rather than a Label widget: a widget is
        # not a renderable, and the placeholder is replaced on mount anyway.
        yield ScrollableContainer(Static("Email Viewer App", id="main_content"))

    def on_mount(self) -> None:
        """Select the first envelope reported by himalaya and display it.

        If the listing fails or is empty, ``current_message_id`` keeps its
        previous value; that message is then displayed regardless.
        """
        self.alert_timer: Timer | None = None  # Timer to throttle alerts
        content = self._main_content()
        # Fetch the ID of the most recent message using the Himalaya CLI
        try:
            import json

            result = subprocess.run(
                ["himalaya", "envelope", "list", "-o", "json"],
                capture_output=True,
                text=True,
            )
            envelopes = json.loads(result.stdout) if result.returncode == 0 else None
            if envelopes:
                self.current_message_id = int(envelopes[0]["id"])  # Get the first envelope's ID
            else:
                # Covers both a non-zero exit status and an empty listing.
                content.update("Failed to fetch the most recent message ID.")
        except Exception as e:
            content.update(f"Error: {e}")
        self.show_message(self.current_message_id)

    def _main_content(self) -> Static:
        """Return the ``#main_content`` widget that shows the message body."""
        return self.query_one("#main_content", Static)

    def _read_message(self, message_id: int) -> "subprocess.CompletedProcess[str]":
        """Run ``himalaya message read`` for ``message_id`` and return the result."""
        return subprocess.run(
            ["himalaya", "message", "read", str(message_id)],
            capture_output=True,
            text=True,
        )

    def _render_message(self, body: str) -> None:
        """Render ``body`` as Markdown into the main content widget."""
        from rich.markdown import Markdown

        # NOTE(review): rich documents ``justify`` as a justify-method string
        # (e.g. "left" or "full"); ``True`` is kept to preserve the current
        # rendering - confirm the intended value.
        self._main_content().update(Markdown(body, justify=True))

    def show_message(self, message_id: int) -> None:
        """Fetch and display the email message by ID.

        Args:
            message_id: ID passed to ``himalaya message read``.

        On a non-zero exit status or an exception, a failure text is shown
        in place of the message.
        """
        content = self._main_content()
        content.loading = True
        try:
            result = self._read_message(message_id)
            if result.returncode == 0:
                self._render_message(result.stdout)
            else:
                content.update(f"Failed to fetch message {message_id}.")
        except Exception as e:
            content.update(f"Error: {e}")
        finally:
            # Always clear the indicator so failure text is not hidden by it.
            content.loading = False

    def show_status(self, message: str, severity: str = "information") -> None:
        """Display a status message using the built-in notify function.

        Args:
            message: Text of the notification.
            severity: Severity forwarded to ``notify``.
        """
        # NOTE(review): ``markup=True`` means "[...]" sequences in ``message``
        # are treated as markup; raw error text may need escaping - confirm.
        self.notify(message, title="Status", severity=severity, timeout=1, markup=True)

    def _step_message(self, step: int, direction: str) -> None:
        """Display the nearest readable message in the direction of ``step``.

        Args:
            step: ``+1`` to search upwards, ``-1`` to search downwards.
            direction: Word used in the status text ("next" or "previous").

        At most ``MAX_LOOKUP_ATTEMPTS`` IDs are probed and IDs below 1 are
        never tried. ``current_message_id`` changes only when a readable
        message is found, so a failed search leaves the position untouched.
        """
        content = self._main_content()
        candidate = self.current_message_id
        content.loading = True
        try:
            for _ in range(self.MAX_LOOKUP_ATTEMPTS):
                candidate += step
                if candidate < 1:
                    break
                result = self._read_message(candidate)
                if result.returncode == 0:
                    self.current_message_id = candidate
                    # Reuse the text just fetched instead of reading it again.
                    self._render_message(result.stdout)
                    self.show_status(f"Showing {direction} message: {candidate}")
                    return
        except Exception as e:
            content.update(f"Error: {e}")
            return
        finally:
            content.loading = False
        content.update(f"No more messages found after {self.MAX_LOOKUP_ATTEMPTS} attempts.")

    def action_next(self) -> None:
        """Show the next email message, iterating until a valid one is found or giving up after 100 attempts."""
        try:
            # Arguments are passed without a shell, so they must not carry
            # leading spaces or shell-style quotes. Output is captured so
            # nvim cannot write over the TUI.
            refresh = subprocess.run(
                ["nvim", "--server", "/tmp/nvim-server", "--remote-send", ":Himalaya<CR>"],
                capture_output=True,
                text=True,
            )
            if refresh.returncode != 0:
                logging.warning("nvim himalaya refresh command failed: %s", refresh.stderr)
        except Exception as e:
            logging.warning(
                "Error running nvim himalaya refresh command. Maybe the nvim server isn't started? %s",
                e,
            )
            # NOTE(review): navigation is abandoned when nvim cannot be
            # launched at all; kept as-is - confirm this is intended.
            return
        self._step_message(1, "next")

    def action_previous(self) -> None:
        """Show the previous email message, iterating until a valid one is found or giving up after 100 attempts."""
        self.show_status("Loading previous message...")
        self._step_message(-1, "previous")

    def _apply_and_advance(self, command: "list[str]", *, progress: str, done: str, failed: str) -> None:
        """Run a himalaya command on the current message, then move on.

        Args:
            command: Full argument list handed to ``subprocess.run``.
            progress: Status text shown before the command runs.
            done: Text shown when the command exits with status 0.
            failed: Text shown when the command exits with another status.
        """
        content = self._main_content()
        self.show_status(progress)
        content.loading = True
        try:
            result = subprocess.run(command, capture_output=True, text=True)
        except Exception as e:
            content.update(f"Error: {e}")
            return
        finally:
            content.loading = False
        if result.returncode != 0:
            content.update(failed)
            return
        content.update(done)
        self.show_status(done)
        self.action_next()  # Automatically show the next message

    def action_delete(self) -> None:
        """Delete the current email message."""
        message_id = self.current_message_id
        self._apply_and_advance(
            ["himalaya", "message", "delete", str(message_id)],
            progress=f"Deleting message {message_id}...",
            done=f"Message {message_id} deleted.",
            failed=f"Failed to delete message {message_id}.",
        )

    def action_archive(self) -> None:
        """Archive the current email message."""
        message_id = self.current_message_id
        self._apply_and_advance(
            ["himalaya", "message", "move", "Archives", str(message_id)],
            progress=f"Archiving message {message_id}...",
            done=f"Message {message_id} archived.",
            failed=f"Failed to archive message {message_id}.",
        )

    def action_open(self) -> None:
        """Show the input modal for opening a specific message by ID."""

        def check_id(message_id: "int | None") -> bool:
            """Open the chosen message; return whether one was opened."""
            if message_id is None:
                # The screen was dismissed without a result (Escape/Ctrl+C).
                return False
            try:
                chosen_id = int(message_id)
            except (TypeError, ValueError):
                self.show_status("Invalid message ID. Please enter an integer.", severity="error")
                return False
            self.show_status(f"Opening message {chosen_id}...")
            self.current_message_id = chosen_id
            self.show_message(chosen_id)
            return True

        self.push_screen(OpenMessageScreen(), check_id)

    def action_create_task(self) -> None:
        """Show the input modal for creating a task."""

        def check_task(task_args: "str | None") -> bool:
            """Run ``task`` with the entered arguments; return success."""
            if task_args is None or not task_args.strip():
                # Cancelled or nothing entered: do not run a bare ``task``.
                return False
            try:
                import shlex

                # shlex keeps quoted arguments (e.g. a description) together.
                result = subprocess.run(
                    ["task"] + shlex.split(task_args),
                    capture_output=True,
                    text=True,
                )
            except Exception as e:
                self.show_status(f"Error: {e}", severity="error")
                return False
            if result.returncode == 0:
                self.show_status("Task created successfully.")
                return True
            self.show_status(f"Failed to create task: {result.stderr}", severity="error")
            return False

        self.push_screen(CreateTaskScreen(), check_task)

    # NOTE(review): the scroll actions below act on the Static itself, not on
    # the ScrollableContainer that wraps it - confirm which widget actually
    # scrolls under email_viewer.tcss.
    def action_scroll_down(self) -> None:
        """Scroll the main content down."""
        self._main_content().scroll_down()

    def action_scroll_up(self) -> None:
        """Scroll the main content up."""
        self._main_content().scroll_up()

    def action_scroll_page_down(self) -> None:
        """Scroll the main content down by a page."""
        self._main_content().scroll_page_down()

    def action_scroll_page_up(self) -> None:
        """Scroll the main content up by a page."""
        self._main_content().scroll_page_up()

    def action_quit(self) -> None:
        """Quit the application."""
        self.exit()
if __name__ == "__main__":
    # Script entry point: build the app and run it. Nothing is started
    # when the module is merely imported.
    app = EmailViewerApp()
    app.run()