refactor email viewer app

This commit is contained in:
Tim Bendt
2025-04-30 13:11:00 -04:00
parent 3f48ef8e11
commit a158fad485
27 changed files with 362 additions and 303 deletions

1
maildir_gtd/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Initialize the maildir_gtd package

Binary file not shown.

View File

@@ -0,0 +1 @@
# Initialize the actions subpackage

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,21 @@
from textual.widgets import Static
import subprocess
from maildir_gtd.actions.next import action_next
def action_archive(app) -> None:
"""Archive the current email message."""
app.show_status(f"Archiving message {app.current_message_id}...")
try:
result = subprocess.run(
["himalaya", "message", "move", "Archives", str(app.current_message_id)],
capture_output=True,
text=True
)
if result.returncode == 0:
app.query_one("#main_content", Static).update(f"Message {app.current_message_id} archived.")
app.show_status(f"Message {app.current_message_id} archived.")
action_next(app) # Automatically show the next message
else:
app.query_one("#main_content", Static).update(f"Failed to archive message {app.current_message_id}.")
except Exception as e:
app.query_one("#main_content", Static).update(f"Error: {e}")

View File

@@ -0,0 +1,23 @@
import subprocess
from textual.widgets import Static
from maildir_gtd.actions.next import action_next
def action_delete(app) -> None:
"""Delete the current email message."""
app.show_status(f"Deleting message {app.current_message_id}...")
app.query_one("#main_content", Static).loading = True
try:
result = subprocess.run(
["himalaya", "message", "delete", str(app.current_message_id)],
capture_output=True,
text=True
)
if result.returncode == 0:
app.query_one("#main_content", Static).loading = False
app.query_one("#main_content", Static).update(f"Message {app.current_message_id} deleted.")
action_next(app) # Automatically show the next message
else:
app.query_one("#main_content", Static).update(f"Failed to delete message {app.current_message_id}.")
except Exception as e:
app.query_one("#main_content", Static).update(f"Error: {e}")

View File

@@ -0,0 +1,36 @@
import logging
from typing import Iterable
from textual import on
from textual.app import App, ComposeResult, SystemCommand
from textual.logging import TextualHandler
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Label, Input, Button
from textual.reactive import Reactive
from textual.binding import Binding
from textual.timer import Timer
from textual.containers import ScrollableContainer, Horizontal, Vertical, Grid
import subprocess
def action_next(app) -> None:
"""Show the next email message by finding the next higher ID from the list of envelope IDs."""
try:
result = subprocess.run(
["himalaya", "envelope", "list", "-o", "json"],
capture_output=True,
text=True
)
if result.returncode == 0:
import json
envelopes = json.loads(result.stdout)
ids = sorted(int(envelope['id']) for envelope in envelopes)
for envelope_id in ids:
if envelope_id > app.current_message_id:
app.current_message_id = envelope_id
app.show_message(app.current_message_id)
app.show_status(f"Showing next message: {app.current_message_id}")
return
app.show_status("No newer messages found.", severity="warning")
else:
app.show_status("Failed to fetch envelope list.", severity="error")
except Exception as e:
app.show_status(f"Error: {e}", severity="error")

View File

@@ -0,0 +1,16 @@
from maildir_gtd.screens.OpenMessage import OpenMessageScreen
def action_open(app) -> None:
"""Show the input modal for opening a specific message by ID."""
def check_id(message_id: str) -> bool:
try:
int(message_id)
app.show_status(f"Opening message {message_id}...")
app.current_message_id = message_id
app.show_message(app.current_message_id)
except ValueError:
app.show_status("Invalid message ID. Please enter an integer.", severity="error")
return True
return False
app.push_screen(OpenMessageScreen(), check_id)

View File

@@ -0,0 +1,26 @@
from textual.widgets import Static
import subprocess
def action_previous(app) -> None:
"""Show the previous email message by finding the next lower ID from the list of envelope IDs."""
try:
result = subprocess.run(
["himalaya", "envelope", "list", "-o", "json"],
capture_output=True,
text=True
)
if result.returncode == 0:
import json
envelopes = json.loads(result.stdout)
ids = sorted((int(envelope['id']) for envelope in envelopes), reverse=True)
for envelope_id in ids:
if envelope_id < app.current_message_id:
app.current_message_id = envelope_id
app.show_message(app.current_message_id)
app.show_status(f"Showing previous message: {app.current_message_id}")
return
app.show_status("No older messages found.", severity="warning")
else:
app.show_status("Failed to fetch envelope list.", severity="error")
except Exception as e:
app.show_status(f"Error: {e}", severity="error")

View File

@@ -0,0 +1,22 @@
from textual.widgets import Static
import subprocess
def show_message(app, message_id: int) -> None:
"""Fetch and display the email message by ID."""
app.query_one("#main_content", Static).loading = True
try:
result = subprocess.run(
["himalaya", "message", "read", str(message_id)],
capture_output=True,
text=True
)
if result.returncode == 0:
# Render the email content as Markdown
from rich.markdown import Markdown
markdown_content = Markdown(result.stdout, justify=True)
app.query_one("#main_content", Static).loading = False
app.query_one("#main_content", Static).update(markdown_content)
else:
app.query_one("#main_content", Static).update(f"Failed to fetch message {message_id}.")
except Exception as e:
app.query_one("#main_content", Static).update(f"Error: {e}")

View File

@@ -0,0 +1,22 @@
import subprocess
from maildir_gtd.screens.CreateTask import CreateTaskScreen
def action_create_task(app) -> None:
"""Show the input modal for creating a task."""
def check_task(task_args: str) -> bool:
try:
result = subprocess.run(
["task", "add"] + task_args.split(" "),
capture_output=True,
text=True
)
if result.returncode == 0:
app.show_status("Task created successfully.")
else:
app.show_status(f"Failed to create task: {result.stderr}")
except Exception as e:
app.show_status(f"Error: {e}", severity="error")
return True
return False
app.push_screen(CreateTaskScreen(), check_task)

142
maildir_gtd/app.py Normal file
View File

@@ -0,0 +1,142 @@
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
import logging
from typing import Iterable
from textual import on
from textual.app import App, ComposeResult, SystemCommand
from textual.logging import TextualHandler
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Label, Input, Button
from textual.reactive import Reactive
from textual.binding import Binding
from textual.timer import Timer
from textual.containers import ScrollableContainer, Horizontal, Vertical, Grid
import subprocess
from maildir_gtd.actions.archive import action_archive
from maildir_gtd.actions.delete import action_delete
from maildir_gtd.actions.open import action_open
from maildir_gtd.actions.show_message import show_message
from maildir_gtd.actions.next import action_next
from maildir_gtd.actions.previous import action_previous
from maildir_gtd.actions.task import action_create_task
logging.basicConfig(
level="NOTSET",
handlers=[TextualHandler()],
)
class EmailViewerApp(App):
"""A simple email viewer app using the Himalaya CLI."""
title = "Maildir GTD Reader"
CSS_PATH = "email_viewer.tcss" # Optional: For styling
current_message_id: Reactive[int] = Reactive(1)
def get_system_commands(self, screen: Screen) -> Iterable[SystemCommand]:
yield from super().get_system_commands(screen)
yield SystemCommand("Next Message", "Navigate to Next ID", self.action_next)
yield SystemCommand("Previous Message", "Navigate to Previous ID", self.action_previous)
yield SystemCommand("Delete Message", "Delete the current message", self.action_delete)
yield SystemCommand("Archive Message", "Archive the current message", self.action_archive)
yield SystemCommand("Open Message", "Open a specific message by ID", self.action_open)
yield SystemCommand("Create Task", "Create a task using the task CLI", self.action_create_task)
BINDINGS = [
Binding("n", "next", "Next message"),
Binding("p", "previous", "Previous message"),
Binding("d", "delete", "Delete message"),
Binding("a", "archive", "Archive message"),
Binding("o", "open", "Open message", show=False),
Binding("q", "quit", "Quit application"),
Binding("t", "create_task", "Create Task")
]
BINDINGS.extend([
Binding("j", "scroll_down", "Scroll down"),
Binding("k", "scroll_up", "Scroll up"),
Binding("down", "scroll_down", "Scroll down"),
Binding("up", "scroll_up", "Scroll up"),
Binding("space", "scroll_page_down", "Scroll page down"),
Binding("b", "scroll_page_up", "Scroll page up")
])
def compose(self) -> ComposeResult:
"""Create child widgets for the app."""
yield Header(show_clock=True)
yield Footer(Label("[n] Next | [p] Previous | [d] Delete | [a] Archive | [o] Open | [q] Quit | [c] Create Task"))
yield ScrollableContainer(Static(Label("Email Viewer App"), id="main_content"))
def on_mount(self) -> None:
"""Called when the app is mounted."""
self.alert_timer: Timer | None = None # Timer to throttle alerts
# Fetch the ID of the most recent message using the Himalaya CLI
try:
result = subprocess.run(
["himalaya", "envelope", "list", "-o", "json"],
capture_output=True,
text=True
)
if result.returncode == 0:
import json
envelopes = json.loads(result.stdout)
if envelopes:
self.current_message_id = int(envelopes[0]['id']) # Get the first envelope's ID
else:
self.query_one("#main_content", Static).update("Failed to fetch the most recent message ID.")
except Exception as e:
self.query_one("#main_content", Static).update(f"Error: {e}")
self.show_message(self.current_message_id)
def show_message(self, message_id: int) -> None:
show_message(self, message_id)
def show_status(self, message: str, severity: str = "information") -> None:
"""Display a status message using the built-in notify function."""
self.notify(message, title="Status", severity=severity, timeout=1, markup=True)
def action_next(self) -> None:
action_next(self)
def action_previous(self) -> None:
action_previous(self)
def action_delete(self) -> None:
action_delete(self)
def action_archive(self) -> None:
action_archive(self)
def action_open(self) -> None:
action_open(self)
def action_create_task(self) -> None:
action_create_task(self)
def action_scroll_down(self) -> None:
"""Scroll the main content down."""
self.query_one("#main_content", Static).scroll_down()
def action_scroll_up(self) -> None:
"""Scroll the main content up."""
self.query_one("#main_content", Static).scroll_up()
def action_scroll_page_down(self) -> None:
"""Scroll the main content down by a page."""
self.query_one("#main_content", Static).scroll_page_down()
def action_scroll_page_up(self) -> None:
"""Scroll the main content up by a page."""
self.query_one("#main_content", Static).scroll_page_up()
def action_quit(self) -> None:
"""Quit the application."""
self.exit()
if __name__ == "__main__":
app = EmailViewerApp()
app.run()

View File

@@ -0,0 +1,15 @@
/* Basic stylesheet for the Textual Email Viewer App */
Label#task_prompt {
padding: 1;
color: rgb(128,128,128);
}
Label#task_prompt_label {
padding: 1;
color: rgb(255, 216, 102);
}
Label#message_label {
padding: 1;
}

View File

@@ -0,0 +1,26 @@
from textual import on
from textual.app import ComposeResult, Screen
from textual.widgets import Input, Label
from textual.containers import Horizontal
class CreateTaskScreen(Screen[str]):
def compose(self) -> ComposeResult:
yield Horizontal(
Label("$>", id="task_prompt"),
Label("task add ", id="task_prompt_label"),
Input(placeholder="arguments", id="task_input")
)
@on(Input.Submitted)
def handle_task_args(self) -> None:
input_widget = self.query_one("#task_input", Input)
self.disabled = True
self.loading = True
task_args = input_widget.value
self.dismiss(task_args)
@on(Input._on_key)
def handle_close(self, event) -> None:
if (event.key == "escape" or event.key == "ctrl+c"):
self.dismiss()

View File

@@ -0,0 +1,25 @@
from textual import on
from textual.app import ComposeResult, Screen
from textual.widgets import Input, Label, Button
from textual.containers import Horizontal
class OpenMessageScreen(Screen[int]):
def compose(self) -> ComposeResult:
yield Horizontal(
Label("📨", id="message_label"),
Input(placeholder="Enter message ID (integer only)", type="integer", id="open_message_input"),
Button("Open", variant="primary", id="open_message_button")
)
@on(Input.Submitted)
def handle_message_id(self) -> None:
input_widget = self.query_one("#open_message_input", Input)
self.disabled = True
self.loading = True
message_id = int(input_widget.value)
self.dismiss(message_id)
@on(Input._on_key)
def handle_close(self, event) -> None:
if (event.key == "escape" or event.key == "ctrl+c"):
self.dismiss()

View File

@@ -0,0 +1 @@
# Initialize the screens subpackage