working good state

This commit is contained in:
Tim Bendt
2025-05-01 11:59:28 -04:00
parent 7a5b911414
commit 1f75a03553
20 changed files with 173 additions and 78 deletions

Binary file not shown.

Binary file not shown.

View File

@@ -12,9 +12,8 @@ def action_archive(app) -> None:
text=True text=True
) )
if result.returncode == 0: if result.returncode == 0:
app.query_one("#main_content", Static).update(f"Message {app.current_message_id} archived.")
action_next(app) # Automatically show the next message action_next(app) # Automatically show the next message
else: else:
app.query_one("#main_content", Static).update(f"Failed to archive message {app.current_message_id}.") app.show_status(f"Error archiving message: {result.stderr}", "error")
except Exception as e: except Exception as e:
app.query_one("#main_content", Static).update(f"Error: {e}") app.show_status(f"Error: {e}", "error")

View File

@@ -14,10 +14,9 @@ def action_delete(app) -> None:
text=True text=True
) )
if result.returncode == 0: if result.returncode == 0:
app.query_one("#main_content", Static).loading = False app.query_one("#main_content").loading = False
app.query_one("#main_content", Static).update(f"Message {app.current_message_id} deleted.")
action_next(app) # Automatically show the next message action_next(app) # Automatically show the next message
else: else:
app.query_one("#main_content", Static).update(f"Failed to delete message {app.current_message_id}.") app.show_status(f"Failed to delete message {app.current_message_id}.", "error")
except Exception as e: except Exception as e:
app.query_one("#main_content", Static).update(f"Error: {e}") app.show_status(f"Error: {e}", "error")

View File

@@ -0,0 +1,23 @@
import subprocess
def action_newest(app) -> None:
"""Show the previous email message by finding the next lower ID from the list of envelope IDs."""
try:
result = subprocess.run(
["himalaya", "envelope", "list", "-o", "json", "-s", "9999"],
capture_output=True,
text=True
)
if result.returncode == 0:
import json
envelopes = json.loads(result.stdout)
ids = sorted((int(envelope['id']) for envelope in envelopes), reverse=True)
app.current_message_id = ids[0]
app.show_message(app.current_message_id)
return
else:
app.show_status("Failed to fetch envelope list.", severity="error")
except Exception as e:
app.show_status(f"Error: {e}", severity="error")

View File

@@ -15,7 +15,7 @@ def action_next(app) -> None:
"""Show the next email message by finding the next higher ID from the list of envelope IDs.""" """Show the next email message by finding the next higher ID from the list of envelope IDs."""
try: try:
result = subprocess.run( result = subprocess.run(
["himalaya", "envelope", "list", "-o", "json"], ["himalaya", "envelope", "list", "-o", "json", "-s", "9999"],
capture_output=True, capture_output=True,
text=True text=True
) )
@@ -23,13 +23,13 @@ def action_next(app) -> None:
import json import json
envelopes = json.loads(result.stdout) envelopes = json.loads(result.stdout)
ids = sorted(int(envelope['id']) for envelope in envelopes) ids = sorted(int(envelope['id']) for envelope in envelopes)
for index, envelope_id in enumerate(ids): for envelope_id in ids:
if envelope_id > int(app.current_message_id): if envelope_id > int(app.current_message_id):
app.show_message(envelope_id) app.show_message(envelope_id)
return return
app.show_status("No newer messages found.", severity="warning") app.show_status("No newer messages found.", severity="warning")
app.show_message(envelopes[-1]['id']) # Automatically show the previous message app.action_newest()
else: else:
app.show_status("Failed to fetch envelope list.", severity="error") app.show_status("Failed to fetch envelope list.", severity="error")
except Exception as e: except Exception as e:

View File

@@ -0,0 +1,23 @@
import subprocess
def action_oldest(app) -> None:
"""Show the previous email message by finding the next lower ID from the list of envelope IDs."""
try:
result = subprocess.run(
["himalaya", "envelope", "list", "-o", "json", "-s", "9999"],
capture_output=True,
text=True
)
if result.returncode == 0:
import json
envelopes = json.loads(result.stdout)
ids = sorted((int(envelope['id']) for envelope in envelopes))
app.current_message_id = ids[0]
app.show_message(app.current_message_id)
return
else:
app.show_status("Failed to fetch envelope list.", severity="error")
except Exception as e:
app.show_status(f"Error: {e}", severity="error")

View File

@@ -1,11 +1,11 @@
from textual.widgets import Static
import subprocess import subprocess
def action_previous(app) -> None: def action_previous(app) -> None:
"""Show the previous email message by finding the next lower ID from the list of envelope IDs.""" """Show the previous email message by finding the next lower ID from the list of envelope IDs."""
try: try:
result = subprocess.run( result = subprocess.run(
["himalaya", "envelope", "list", "-o", "json"], ["himalaya", "envelope", "list", "-o", "json", "-s", "9999"],
capture_output=True, capture_output=True,
text=True text=True
) )
@@ -19,6 +19,7 @@ def action_previous(app) -> None:
app.show_message(app.current_message_id) app.show_message(app.current_message_id)
return return
app.show_status("No older messages found.", severity="warning") app.show_status("No older messages found.", severity="warning")
app.action_oldest()
else: else:
app.show_status("Failed to fetch envelope list.", severity="error") app.show_status("Failed to fetch envelope list.", severity="error")
except Exception as e: except Exception as e:

View File

@@ -1,23 +1,14 @@
from textual.widgets import Static import logging
from rich.markdown import Markdown
import subprocess import subprocess
from textual.logging import TextualHandler
logging.basicConfig(
level="NOTSET",
handlers=[TextualHandler()],
)
def show_message(app, message_id: int) -> None: def show_message(app, message_id: int) -> None:
"""Fetch and display the email message by ID.""" """Fetch and display the email message by ID."""
app.current_message_id = message_id app.current_message_id = message_id
app.query_one("#main_content", Static).loading = True logging.info("Showing message ID: " + str(message_id))
try:
result = subprocess.run(
["himalaya", "message", "read", str(message_id)],
capture_output=True,
text=True
)
if result.returncode == 0:
# Render the email content as Markdown
markdown_content = Markdown(result.stdout, justify=True)
app.query_one("#main_content", Static).loading = False
app.query_one("#main_content", Static).update(markdown_content)
else:
app.query_one("#main_content", Static).update(f"Failed to fetch message {message_id}.")
except Exception as e:
app.query_one("#main_content", Static).update(f"Error: {e}")

View File

@@ -1,5 +1,10 @@
import re
import sys import sys
import os import os
from datetime import datetime # Add this import at the top of the file
from actions.newest import action_newest
from actions.oldest import action_oldest
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
import logging import logging
@@ -9,8 +14,8 @@ from textual.widget import Widget
from textual.app import App, ComposeResult, SystemCommand, RenderResult from textual.app import App, ComposeResult, SystemCommand, RenderResult
from textual.logging import TextualHandler from textual.logging import TextualHandler
from textual.screen import Screen from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Label, Input, Button from textual.widgets import Header, Footer, Static, Label, Input, Button, Markdown
from textual.reactive import Reactive from textual.reactive import reactive, Reactive
from textual.binding import Binding from textual.binding import Binding
from textual.timer import Timer from textual.timer import Timer
from textual.containers import ScrollableContainer, Horizontal, Vertical, Grid from textual.containers import ScrollableContainer, Horizontal, Vertical, Grid
@@ -22,25 +27,24 @@ from maildir_gtd.actions.show_message import show_message
from maildir_gtd.actions.next import action_next from maildir_gtd.actions.next import action_next
from maildir_gtd.actions.previous import action_previous from maildir_gtd.actions.previous import action_previous
from maildir_gtd.actions.task import action_create_task from maildir_gtd.actions.task import action_create_task
from maildir_gtd.widgets.EnvelopeHeader import EnvelopeHeader
logging.basicConfig( logging.basicConfig(
level="NOTSET", level="NOTSET",
handlers=[TextualHandler()], handlers=[TextualHandler()],
) )
class Hello(Widget):
"""Display a greeting."""
def render(self) -> RenderResult:
return "Hello, [b]World[/b]!"
class StatusTitle(Static): class StatusTitle(Static):
total_messages: Reactive[int] = Reactive(0) total_messages: Reactive[int] = reactive(0)
current_message_index: Reactive[int] = Reactive(0) current_message_index: Reactive[int] = reactive(0)
current_message_id: Reactive[int] = Reactive(1) current_message_id: Reactive[int] = reactive(1)
folder: Reactive[str] = reactive("INBOX")
def render(self) -> RenderResult: def render(self) -> RenderResult:
return f"Inbox Message ID: {self.current_message_id} | [b]{self.current_message_index}[/b]/{self.total_messages}" return f"{self.folder} | ID: {self.current_message_id} | [b]{self.current_message_index}[/b]/{self.total_messages}"
@@ -48,9 +52,10 @@ class StatusTitle(Static):
class EmailViewerApp(App): class EmailViewerApp(App):
"""A simple email viewer app using the Himalaya CLI.""" """A simple email viewer app using the Himalaya CLI."""
title = "Maildir GTD Reader" title = "Maildir GTD Reader"
current_message_id: Reactive[int] = Reactive(1) current_message_id: Reactive[int] = reactive(1)
CSS_PATH = "email_viewer.tcss" CSS_PATH = "email_viewer.tcss"
folder = reactive("INBOX")
markdown: Reactive[str] = reactive("Loading...")
def get_system_commands(self, screen: Screen) -> Iterable[SystemCommand]: def get_system_commands(self, screen: Screen) -> Iterable[SystemCommand]:
yield from super().get_system_commands(screen) yield from super().get_system_commands(screen)
@@ -60,6 +65,8 @@ class EmailViewerApp(App):
yield SystemCommand("Archive Message", "Archive the current message", self.action_archive) yield SystemCommand("Archive Message", "Archive the current message", self.action_archive)
yield SystemCommand("Open Message", "Open a specific message by ID", self.action_open) yield SystemCommand("Open Message", "Open a specific message by ID", self.action_open)
yield SystemCommand("Create Task", "Create a task using the task CLI", self.action_create_task) yield SystemCommand("Create Task", "Create a task using the task CLI", self.action_create_task)
yield SystemCommand("Oldest Message", "Show the oldest message", self.action_oldest)
yield SystemCommand("Newest Message", "Show the newest message", self.action_newest)
BINDINGS = [ BINDINGS = [
Binding("j", "next", "Next message"), Binding("j", "next", "Next message"),
@@ -80,20 +87,36 @@ class EmailViewerApp(App):
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
"""Create child widgets for the app.""" """Create child widgets for the app."""
yield Header(show_clock=True) # yield Header(show_clock=True)
yield StatusTitle().data_bind(EmailViewerApp.current_message_id) yield StatusTitle().data_bind(EmailViewerApp.current_message_id)
yield ScrollableContainer(Static(Label("Loading Email Viewer App"), id="main_content")) yield EnvelopeHeader()
yield Markdown(id="main_content", markdown=self.markdown)
yield Footer() yield Footer()
def watch_current_message_id(self, message_id: int, old_message_id: int) -> None: def watch_current_message_id(self, old_message_id: int, new_message_id: int) -> None:
"""Called when the current message ID changes.""" """Called when the current message ID changes."""
if (message_id == old_message_id): logging.info(f"Current message ID changed from {old_message_id} to {new_message_id}")
self.query_one("#main_content").loading = True
self.markdown = ""
if (new_message_id == old_message_id):
return return
# self.notify("Current message ID changed", title="Status", severity="information")
logging.info(f"Current message ID changed to {message_id}")
try: try:
rawText = subprocess.run(
["himalaya", "message", "read", str(new_message_id)],
capture_output=True,
text=True
)
if rawText.returncode == 0:
# Render the email content as Markdown
fixedText = rawText.stdout.replace("(https://urldefense.com/v3/", "(")
fixedText = re.sub(r"atlOrigin.+?\)", ")", fixedText)
self.query_one("#main_content").loading = False
self.query_one("#main_content").update(markdown = str(fixedText))
logging.info(fixedText)
result = subprocess.run( result = subprocess.run(
["himalaya", "envelope", "list", "-o", "json"], ["himalaya", "envelope", "list", "-o", "json", "-s", "9999"],
capture_output=True, capture_output=True,
text=True text=True
) )
@@ -102,40 +125,35 @@ class EmailViewerApp(App):
envelopes = json.loads(result.stdout) envelopes = json.loads(result.stdout)
if envelopes: if envelopes:
status = self.query_one(StatusTitle) status = self.query_one(StatusTitle)
status.total_messages = len(envelopes) # Get the first envelope's ID status.total_messages = len(envelopes)
status.current_message_index = next(
(index + 1 for index, envelope in enumerate(envelopes) if int(envelope['id']) == message_id), headers = self.query_one(EnvelopeHeader)
0 # Default to 0 if no match is found # Find the index of the envelope that matches the current_message_id
) for index, envelope in enumerate(sorted(envelopes, key=lambda x: int(x['id']))):
if int(envelope['id']) == new_message_id:
headers.subject = envelope['subject']
headers.from_ = envelope['from']['addr']
headers.to = envelope['to']['addr']
headers.date = datetime.strptime(envelope['date'].replace("+00:00", ""), "%Y-%m-%d %H:%M").strftime("%a %b %d %H:%M")
status.current_message_index = index + 1 # 1-based index
break
status.update()
headers.update()
else: else:
self.query_one("#main_content", Static).update("Failed to fetch the most recent message ID.") self.query_one("#main_content").update("Failed to fetch the most recent message ID.")
except Exception as e: except Exception as e:
self.query_one("#main_content", Static).update(f"Error: {e}") self.query_one("#main_content").update(f"Error: {e}")
def on_mount(self) -> None: def on_mount(self) -> None:
self.alert_timer: Timer | None = None # Timer to throttle alerts self.alert_timer: Timer | None = None # Timer to throttle alerts
self.theme = "monokai" self.theme = "monokai"
self.title = "MaildirGTD"
# self.watch(self.query_one(StatusTitle), "current_message_id", update_progress) # self.watch(self.query_one(StatusTitle), "current_message_id", update_progress)
# Fetch the ID of the most recent message using the Himalaya CLI # Fetch the ID of the most recent message using the Himalaya CLI
try: self.action_oldest()
result = subprocess.run(
["himalaya", "envelope", "list", "-o", "json"],
capture_output=True,
text=True
)
if result.returncode == 0:
import json
envelopes = json.loads(result.stdout)
if envelopes:
self.current_message_id = int(envelopes[0]['id'])
else:
self.query_one("#main_content", Static).update("Failed to fetch the most recent message ID.")
except Exception as e:
self.query_one("#main_content", Static).update(f"Error: {e}")
self.show_message(self.current_message_id)
def show_message(self, message_id: int) -> None: def show_message(self, message_id: int) -> None:
show_message(self, message_id) show_message(self, message_id)
@@ -165,24 +183,30 @@ class EmailViewerApp(App):
def action_scroll_down(self) -> None: def action_scroll_down(self) -> None:
"""Scroll the main content down.""" """Scroll the main content down."""
self.query_one("#main_content", Static).scroll_down() self.query_one("#main_content").scroll_down()
def action_scroll_up(self) -> None: def action_scroll_up(self) -> None:
"""Scroll the main content up.""" """Scroll the main content up."""
self.query_one("#main_content", Static).scroll_up() self.query_one("#main_content").scroll_up()
def action_scroll_page_down(self) -> None: def action_scroll_page_down(self) -> None:
"""Scroll the main content down by a page.""" """Scroll the main content down by a page."""
self.query_one("#main_content", Static).scroll_page_down() self.query_one("#main_content").scroll_page_down()
def action_scroll_page_up(self) -> None: def action_scroll_page_up(self) -> None:
"""Scroll the main content up by a page.""" """Scroll the main content up by a page."""
self.query_one("#main_content", Static).scroll_page_up() self.query_one("#main_content").scroll_page_up()
def action_quit(self) -> None: def action_quit(self) -> None:
"""Quit the application.""" """Quit the application."""
self.exit() self.exit()
def action_oldest(self) -> None:
action_oldest(self)
def action_newest(self) -> None:
action_newest(self)
if __name__ == "__main__": if __name__ == "__main__":
app = EmailViewerApp() app = EmailViewerApp()
app.run() app.run()

View File

@@ -18,6 +18,16 @@ StatusTitle {
width: 100%; width: 100%;
height: 1; height: 1;
color: $text; color: $text;
background: $surface; background: rgb(64, 62, 65);
content-align: center middle; content-align: center middle;
} }
EnvelopeHeader {
width: 100%;
height: auto;
background: $panel;
}
#main_content {
padding: 1 2;
}

View File

@@ -0,0 +1,24 @@
from textual.reactive import Reactive
from textual.app import RenderResult
from textual.widgets import Static, Label
class EnvelopeHeader(Static):
subject = Reactive("")
from_ = Reactive("")
to = Reactive("")
date = Reactive("")
"""Header for the email viewer."""
def on_mount(self) -> None:
"""Mount the header."""
def render(self) -> RenderResult:
return f"[b][dim]Subject:[/dim] {self.subject}[/] \r\n" \
f"[dim]From:[/dim] {self.from_} \r\n" \
f"[dim]To:[/dim] {self.to} \r\n" \
f"[dim]Date:[/dim] {self.date}"

View File

@@ -0,0 +1 @@
# Initialize the screens subpackage