wip: refactor

This commit is contained in:
Tim Bendt
2025-07-01 22:24:34 -04:00
parent bec09bade8
commit 5232bf3b09
66 changed files with 842 additions and 165 deletions

20
src/cli/__init__.py Normal file
View File

@@ -0,0 +1,20 @@
# CLI module for the application
import click
from .sync import sync
from .drive import drive
from .email import email
from .calendar import calendar
@click.group()
def cli():
"""Root command for the CLI."""
pass
cli.add_command(sync)
cli.add_command(drive)
cli.add_command(email)
cli.add_command(calendar)

4
src/cli/__main__.py Normal file
View File

@@ -0,0 +1,4 @@
from . import cli
if __name__ == "__main__":
cli()

8
src/cli/calendar.py Normal file
View File

@@ -0,0 +1,8 @@
import click
import subprocess
@click.command()
def calendar():
"""Open the calendar (khal interactive)."""
click.echo("Opening calendar...")
subprocess.run(["khal", "interactive"])

9
src/cli/drive.py Normal file
View File

@@ -0,0 +1,9 @@
import click
import subprocess
@click.command()
def drive():
"""View OneDrive files."""
click.echo("Launching OneDrive viewer...")
subprocess.run(["python3", "src/drive_view_tui.py"])

9
src/cli/email.py Normal file
View File

@@ -0,0 +1,9 @@
import click
from src.maildir_gtd.app import launch_email_viewer
@click.command()
def email():
"""Read emails from Maildir."""
click.echo("Opening email viewer...")
launch_email_viewer()

283
src/cli/sync.py Normal file
View File

@@ -0,0 +1,283 @@
import click
import asyncio
import os
import sys
from rich.progress import Progress, SpinnerColumn, MofNCompleteColumn
from datetime import datetime, timedelta
from src.utils.mail_utils.helpers import ensure_directory_exists
from src.utils.calendar_utils import save_events_to_vdir, save_events_to_file
from src.services.microsoft_graph.calendar import fetch_calendar_events
from src.services.microsoft_graph.mail import (
fetch_mail_async,
archive_mail_async,
delete_mail_async,
synchronize_maildir_async,
)
from src.services.microsoft_graph.auth import get_access_token
# Function to create Maildir structure
def create_maildir_structure(base_path):
"""
Create the standard Maildir directory structure.
Args:
base_path (str): Base path for the Maildir.
Returns:
None
"""
ensure_directory_exists(os.path.join(base_path, "cur"))
ensure_directory_exists(os.path.join(base_path, "new"))
ensure_directory_exists(os.path.join(base_path, "tmp"))
ensure_directory_exists(os.path.join(base_path, ".Archives"))
ensure_directory_exists(os.path.join(base_path, ".Trash", "cur"))
async def fetch_calendar_async(headers, progress, task_id, dry_run, vdir_path, ics_path, org_name, days_back, days_forward, continue_iteration):
"""
Fetch calendar events and save them in the appropriate format.
Args:
headers: Authentication headers for Microsoft Graph API
progress: Progress instance for updating progress bars
task_id: ID of the task in the progress bar
Returns:
List of event dictionaries
Raises:
Exception: If there's an error fetching or saving events
"""
try:
# Use the utility function to fetch calendar events
progress.console.print(
"[cyan]Fetching events from Microsoft Graph API...[/cyan]"
)
events, total_events = await fetch_calendar_events(
headers=headers, days_back=days_back, days_forward=days_forward
)
progress.console.print(
f"[cyan]Got {len(events)} events from API (reported total: {total_events})[/cyan]"
)
# Update progress bar with total events
progress.update(task_id, total=total_events)
# Save events to appropriate format
if not dry_run:
if vdir_path:
# Create org-specific directory within vdir path
org_vdir_path = os.path.join(vdir_path, org_name)
progress.console.print(
f"[cyan]Saving events to vdir: {org_vdir_path}[/cyan]"
)
save_events_to_vdir(events, org_vdir_path,
progress, task_id, dry_run)
progress.console.print(
f"[green]Finished saving events to vdir: {org_vdir_path}[/green]"
)
elif ics_path:
# Save to a single ICS file in the output_ics directory
progress.console.print(
f"[cyan]Saving events to ICS file: {ics_path}/events_latest.ics[/cyan]"
)
save_events_to_file(
events, f"{ics_path}/events_latest.ics", progress, task_id, dry_run
)
progress.console.print(
f"[green]Finished saving events to ICS file[/green]"
)
else:
# No destination specified
progress.console.print(
"[yellow]Warning: No destination path (--vdir or --icsfile) specified for calendar events.[/yellow]"
)
else:
progress.console.print(
f"[DRY-RUN] Would save {len(events)} events to {
'vdir format' if vdir_path else 'single ICS file'}"
)
progress.update(task_id, advance=len(events))
# Interactive mode: Ask if the user wants to continue with the next date range
if continue_iteration:
# Move to the next date range
next_start_date = datetime.now() - timedelta(days=days_back)
next_end_date = next_start_date + timedelta(days=days_forward)
progress.console.print(
f"\nCurrent date range: {next_start_date.strftime(
'%Y-%m-%d')} to {next_end_date.strftime('%Y-%m-%d')}"
)
user_response = click.prompt(
"\nContinue to iterate? [y/N]", default="N").strip().lower()
while user_response == "y":
progress.console.print(
f"\nFetching events for {next_start_date.strftime(
'%Y-%m-%d')} to {next_end_date.strftime('%Y-%m-%d')}..."
)
# Reset the progress bar for the new fetch
progress.update(task_id, completed=0, total=0)
# Fetch events for the next date range
next_events, next_total_events = await fetch_calendar_events(
headers=headers,
days_back=0,
days_forward=days_forward,
start_date=next_start_date,
end_date=next_end_date,
)
# Update progress bar with total events
progress.update(task_id, total=next_total_events)
if not dry_run:
if vdir_path:
save_events_to_vdir(
next_events, org_vdir_path, progress, task_id, dry_run
)
else:
save_events_to_file(
next_events,
f"output_ics/outlook_events_{next_start_date.strftime('%Y%m%d')}.ics",
progress,
task_id,
dry_run,
)
else:
progress.console.print(
f"[DRY-RUN] Would save {len(next_events)} events to {
'vdir format' if vdir_path else 'output_ics/outlook_events_' + next_start_date.strftime('%Y%m%d') + '.ics'}"
)
progress.update(task_id, advance=len(next_events))
# Calculate the next date range
next_start_date = next_end_date
next_end_date = next_start_date + timedelta(days=days_forward)
progress.console.print(
f"\nNext date range would be: {next_start_date.strftime(
'%Y-%m-%d')} to {next_end_date.strftime('%Y-%m-%d')}"
)
user_response = click.prompt(
"\nContinue to iterate? [y/N]", default="N").strip().lower()
return events
except Exception as e:
progress.console.print(
f"[red]Error fetching or saving calendar events: {str(e)}[/red]"
)
import traceback
progress.console.print(f"[red]{traceback.format_exc()}[/red]")
progress.update(task_id, completed=True)
return []
async def _sync_outlook_data(dry_run, vdir, icsfile, org, days_back, days_forward, continue_iteration, download_attachments):
"""Synchronize data from external sources."""
# Save emails to Maildir
maildir_path = (
os.getenv("MAILDIR_PATH", os.path.expanduser(
"~/Mail")) + f"/{org}"
)
attachments_dir = os.path.join(maildir_path, "attachments")
ensure_directory_exists(attachments_dir)
create_maildir_structure(maildir_path)
# Define scopes for Microsoft Graph API
scopes = [
"https://graph.microsoft.com/Calendars.Read",
"https://graph.microsoft.com/Mail.ReadWrite",
]
# Authenticate and get access token
access_token, headers = get_access_token(scopes)
# Set up the progress bars
progress = Progress(
SpinnerColumn(), MofNCompleteColumn(), *Progress.get_default_columns()
)
with progress:
task_fetch = progress.add_task("[green]Syncing Inbox...", total=0)
task_calendar = progress.add_task(
"[cyan]Fetching calendar...", total=0)
task_read = progress.add_task("[blue]Marking as read...", total=0)
task_archive = progress.add_task("[yellow]Archiving mail...", total=0)
task_delete = progress.add_task("[red]Deleting mail...", total=0)
await asyncio.gather(
synchronize_maildir_async(
maildir_path, headers, progress, task_read, dry_run
),
archive_mail_async(maildir_path, headers,
progress, task_archive, dry_run),
delete_mail_async(maildir_path, headers,
progress, task_delete, dry_run),
fetch_mail_async(
maildir_path,
attachments_dir,
headers,
progress,
task_fetch,
dry_run,
download_attachments,
),
fetch_calendar_async(headers, progress, task_calendar, dry_run, vdir, icsfile, org, days_back, days_forward, continue_iteration),
)
click.echo("Sync complete.")
@click.command()
@click.option(
"--dry-run",
is_flag=True,
help="Run in dry-run mode without making changes.",
default=False,
)
@click.option(
"--vdir",
help="Output calendar events in vdir format to the specified directory (each event in its own file)",
default="~/Calendar",
)
@click.option(
"--icsfile", help="Output calendar events into this ics file path.", default=None
)
@click.option(
"--org",
help="Specify the organization name for the subfolder to store emails and calendar events",
default="corteva",
)
@click.option(
"--days-back",
type=int,
help="Number of days to look back for calendar events",
default=1,
)
@click.option(
"--days-forward",
type=int,
help="Number of days to look forward for calendar events",
default=6,
)
@click.option(
"--continue-iteration",
is_flag=True,
help="Enable interactive mode to continue fetching more date ranges",
default=False,
)
@click.option(
"--download-attachments",
is_flag=True,
help="Download email attachments",
default=False,
)
def sync(dry_run, vdir, icsfile, org, days_back, days_forward, continue_iteration, download_attachments):
asyncio.run(_sync_outlook_data(dry_run, vdir, icsfile, org, days_back, days_forward, continue_iteration, download_attachments))

View File

@@ -0,0 +1 @@
# Initialize the maildir_gtd package

View File

@@ -0,0 +1 @@
# Initialize the actions subpackage

View File

@@ -0,0 +1,42 @@
import asyncio
import logging
from textual import work
from src.services.himalaya import client as himalaya_client
@work(exclusive=True)
async def archive_current(app):
"""Archive the current message."""
if not app.current_message_id:
app.show_status("No message selected to archive.", "error")
return
# Store the current message ID and index
current_message_id = app.current_message_id
current_index = app.current_message_index
# Find the next message to display after archiving
next_id, next_idx = app.message_store.find_next_valid_id(current_index)
if next_id is None or next_idx is None:
# If there's no next message, try to find a previous one
next_id, next_idx = app.message_store.find_prev_valid_id(current_index)
# Archive the message using our Himalaya client module
success = await himalaya_client.archive_message(current_message_id)
if success:
app.show_status(f"Message {current_message_id} archived.", "success")
app.message_store.remove_envelope(current_message_id)
app.refresh_list_view()
# Select the next available message if it exists
if next_id is not None and next_idx is not None:
app.current_message_id = next_id
app.current_message_index = next_idx
else:
# If there are no other messages, reset the UI
app.current_message_id = 0
app.show_status("No more messages available.", "warning")
else:
app.show_status(f"Failed to archive message {current_message_id}.", "error")

View File

@@ -0,0 +1,41 @@
import asyncio
import logging
from textual import work
from src.services.himalaya import client as himalaya_client
@work(exclusive=True)
async def delete_current(app):
"""Delete the current message."""
if not app.current_message_id:
app.show_status("No message selected to delete.", "error")
return
# Store the current message ID and index
current_message_id = app.current_message_id
current_index = app.current_message_index
# Find the next message to display after deletion
next_id, next_idx = app.message_store.find_next_valid_id(current_index)
if next_id is None or next_idx is None:
# If there's no next message, try to find a previous one
next_id, next_idx = app.message_store.find_prev_valid_id(current_index)
# Delete the message using our Himalaya client module
success = await himalaya_client.delete_message(current_message_id)
if success:
app.show_status(f"Message {current_message_id} deleted.", "success")
app.message_store.remove_envelope(current_message_id)
app.refresh_list_view()
# Select the next available message if it exists
if next_id is not None and next_idx is not None:
app.current_message_id = next_id
app.current_message_index = next_idx
else:
# If there are no other messages, reset the UI
app.current_message_id = 0
app.show_status("No more messages available.", "warning")
else:
app.show_status(f"Failed to delete message {current_message_id}.", "error")

View File

@@ -0,0 +1,17 @@
async def action_newest(app) -> None:
"""Show the previous email message by finding the next lower ID from the list of envelope IDs."""
try:
if app.reload_needed:
await app.action_fetch_list()
ids = sorted(
(int(envelope["id"]) for envelope in app.all_envelopes), reverse=True
)
app.current_message_id = ids[0]
app.show_message(app.current_message_id)
return
else:
app.show_status("Failed to fetch envelope list.", severity="error")
except Exception as e:
app.show_status(f"Error: {e}", severity="error")

View File

@@ -0,0 +1,17 @@
async def action_next(app) -> None:
"""Show the next email message by finding the next higher ID from the list of envelope IDs."""
try:
if app.reload_needed:
app.action_fetch_list()
ids = sorted(int(envelope["id"]) for envelope in app.all_envelopes)
for envelope_id in ids:
if envelope_id > int(app.current_message_id):
app.show_message(envelope_id)
return
app.show_status("No newer messages found.", severity="warning")
app.action_newest()
else:
app.show_status("Failed to fetch envelope list.", severity="error")
except Exception as e:
app.show_status(f"Error: {e}", severity="error")

View File

@@ -0,0 +1,15 @@
def action_oldest(app) -> None:
"""Show the previous email message by finding the next lower ID from the list of envelope IDs."""
try:
if app.reload_needed:
app.action_fetch_list()
ids = sorted((int(envelope["id"]) for envelope in app.all_envelopes))
app.current_message_id = ids[0]
app.show_message(app.current_message_id)
return
else:
app.show_status("Failed to fetch envelope list.", severity="error")
except Exception as e:
app.show_status(f"Error: {e}", severity="error")

View File

@@ -0,0 +1,21 @@
from ..screens.OpenMessage import OpenMessageScreen
def action_open(app) -> None:
"""Show the input modal for opening a specific message by ID."""
def check_id(message_id: str | None) -> bool:
try:
int(message_id)
app.show_message(message_id)
if message_id is not None and message_id > 0:
app.show_message(message_id)
except ValueError:
app.bell()
app.show_status(
"Invalid message ID. Please enter an integer.", severity="error"
)
return True
return False
app.push_screen(OpenMessageScreen(), check_id)

View File

@@ -0,0 +1,20 @@
def action_previous(app) -> None:
"""Show the previous email message by finding the next lower ID from the list of envelope IDs."""
try:
if app.reload_needed:
app.action_fetch_list()
ids = sorted(
(int(envelope["id"]) for envelope in app.all_envelopes), reverse=True
)
for envelope_id in ids:
if envelope_id < int(app.current_message_id):
app.current_message_id = envelope_id
app.show_message(app.current_message_id)
return
app.show_status("No older messages found.", severity="warning")
app.action_oldest()
else:
app.show_status("Failed to fetch envelope list.", severity="error")
except Exception as e:
app.show_status(f"Error: {e}", severity="error")

View File

@@ -0,0 +1,14 @@
import logging
from textual.logging import TextualHandler
logging.basicConfig(
level="NOTSET",
handlers=[TextualHandler()],
)
def show_message(app, message_id: int) -> None:
"""Fetch and display the email message by ID."""
logging.info("Showing message ID: " + str(message_id))
app.current_message_id = message_id

View File

@@ -0,0 +1,54 @@
import asyncio
import logging
from textual import work
from textual.screen import ModalScreen
from src.services.taskwarrior import client as taskwarrior_client
from ..screens.CreateTask import CreateTaskScreen
class TaskAction:
def __init__(self, app):
self.app = app
def action_create_task(app):
"""Show the create task screen."""
current_message_id = app.current_message_id
if not current_message_id:
app.show_status("No message selected to create task from.", "error")
return
# Prepare data for the create task screen
metadata = app.message_store.get_metadata(current_message_id)
subject = metadata.get("subject", "No subject") if metadata else "No subject"
from_addr = metadata["from"].get("addr", "Unknown") if metadata else "Unknown"
# Show the create task screen with the current message data
app.push_screen(CreateTaskScreen(subject=subject, from_addr=from_addr))
@work(exclusive=True)
async def create_task(
subject, description=None, tags=None, project=None, due=None, priority=None
):
"""
Create a task with the Taskwarrior API client.
"""
try:
success, result = await taskwarrior_client.create_task(
task_description=subject,
tags=tags or [],
project=project,
due=due,
priority=priority,
)
if success:
return True, result
else:
logging.error(f"Failed to create task: {result}")
return False, result
except Exception as e:
logging.error(f"Exception creating task: {e}")
return False, str(e)

468
src/maildir_gtd/app.py Normal file
View File

@@ -0,0 +1,468 @@
from .message_store import MessageStore
from .widgets.ContentContainer import ContentContainer
from .widgets.EnvelopeHeader import EnvelopeHeader
from .actions.task import action_create_task
from .actions.open import action_open
from .actions.delete import delete_current
from .actions.archive import archive_current
from src.services.taskwarrior import client as taskwarrior_client
from src.services.himalaya import client as himalaya_client
from textual.containers import ScrollableContainer, Vertical, Horizontal
from textual.timer import Timer
from textual.binding import Binding
from textual.reactive import reactive, Reactive
from textual.widgets import Footer, Static, Label, Markdown, ListView, ListItem
from textual.screen import Screen
from textual.logging import TextualHandler
from textual.app import App, ComposeResult, SystemCommand, RenderResult
from textual.worker import Worker
from textual import work
import re
import sys
import os
from datetime import UTC, datetime
import asyncio
import logging
from typing import Iterable, Optional, List, Dict, Any, Generator, Tuple
from collections import defaultdict
# Add the parent directory to the system path to resolve relative imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Import our new API modules
# Updated imports with correct relative paths
logging.basicConfig(
level="NOTSET",
handlers=[TextualHandler()],
)
class StatusTitle(Static):
total_messages: Reactive[int] = reactive(0)
current_message_index: Reactive[int] = reactive(0)
current_message_id: Reactive[int] = reactive(1)
folder: Reactive[str] = reactive("INBOX")
def render(self) -> RenderResult:
return f"{self.folder} | ID: {self.current_message_id} | [b]{self.current_message_index}[/b]/{self.total_messages}"
class EmailViewerApp(App):
"""A simple email viewer app using the Himalaya CLI."""
CSS_PATH = "email_viewer.tcss"
title = "Maildir GTD Reader"
current_message_id: Reactive[int] = reactive(0)
current_message_index: Reactive[int] = reactive(0)
folder = reactive("INBOX")
header_expanded = reactive(False)
reload_needed = reactive(True)
message_store = MessageStore()
oldest_id: Reactive[int] = reactive(0)
newest_id: Reactive[int] = reactive(0)
msg_worker: Worker | None = None
total_messages: Reactive[int] = reactive(0)
status_title = reactive("Message View")
sort_order_ascending: Reactive[bool] = reactive(True)
def get_system_commands(self, screen: Screen) -> Iterable[SystemCommand]:
yield from super().get_system_commands(screen)
yield SystemCommand("Next Message", "Navigate to Next ID", self.action_next)
yield SystemCommand(
"Previous Message", "Navigate to Previous ID", self.action_previous
)
yield SystemCommand(
"Delete Message", "Delete the current message", self.action_delete
)
yield SystemCommand(
"Archive Message", "Archive the current message", self.action_archive
)
yield SystemCommand(
"Open Message", "Open a specific message by ID", self.action_open
)
yield SystemCommand(
"Create Task", "Create a task using the task CLI", self.action_create_task
)
yield SystemCommand(
"Oldest Message", "Show the oldest message", self.action_oldest
)
yield SystemCommand(
"Newest Message", "Show the newest message", self.action_newest
)
yield SystemCommand("Reload", "Reload the message list", self.fetch_envelopes)
BINDINGS = [
Binding("j", "next", "Next message"),
Binding("k", "previous", "Previous message"),
Binding("#", "delete", "Delete message"),
Binding("e", "archive", "Archive message"),
Binding("o", "open", "Open message", show=False),
Binding("q", "quit", "Quit application"),
Binding("h", "toggle_header", "Toggle Envelope Header"),
Binding("t", "create_task", "Create Task"),
Binding("%", "reload", "Reload message list"),
Binding("1", "focus_1", "Focus Accounts Panel"),
Binding("2", "focus_2", "Focus Folders Panel"),
Binding("3", "focus_3", "Focus Envelopes Panel"),
Binding("m", "toggle_mode", "Toggle Content Mode"),
]
BINDINGS.extend(
[
Binding("space", "scroll_page_down", "Scroll page down"),
Binding("b", "scroll_page_up", "Scroll page up"),
Binding("s", "toggle_sort_order", "Toggle Sort Order"),
]
)
def compose(self) -> ComposeResult:
yield Horizontal(
Vertical(
ListView(
ListItem(Label("All emails...")),
id="envelopes_list",
classes="list_view",
initial_index=0,
),
ListView(id="accounts_list", classes="list_view"),
ListView(id="folders_list", classes="list_view"),
id="sidebar",
),
ContentContainer(id="main_content"),
id="outer-wrapper",
)
yield Footer()
async def on_mount(self) -> None:
self.alert_timer: Timer | None = None # Timer to throttle alerts
self.theme = "monokai"
self.title = "MaildirGTD"
self.query_one("#main_content").border_title = self.status_title
sort_indicator = "" if self.sort_order_ascending else ""
self.query_one("#envelopes_list").border_title = f"1⃣ Emails {
sort_indicator}"
self.query_one("#accounts_list").border_title = "2⃣ Accounts"
self.query_one("#folders_list").border_title = "3⃣ Folders"
self.fetch_accounts()
self.fetch_folders()
worker = self.fetch_envelopes()
await worker.wait()
self.query_one("#envelopes_list").focus()
self.action_oldest()
def compute_status_title(self):
return f"✉️ Message ID: {self.current_message_id} "
def watch_status_title(self, old_status_title: str, new_status_title: str) -> None:
self.query_one(ContentContainer).border_title = new_status_title
def watch_sort_order_ascending(self, old_value: bool, new_value: bool) -> None:
"""Update the border title of the envelopes list when the sort order changes."""
sort_indicator = "" if new_value else ""
self.query_one("#envelopes_list").border_title = f"1⃣ Emails {
sort_indicator}"
def watch_current_message_index(self, old_index: int, new_index: int) -> None:
if new_index < 0:
new_index = 0
self.current_message_index = new_index
if new_index > self.total_messages:
new_index = self.total_messages
self.current_message_index = new_index
self.query_one(
"#envelopes_list"
).border_subtitle = f"[b]{new_index}[/b]/{self.total_messages}"
self.query_one("#envelopes_list").index = new_index
def watch_reload_needed(
self, old_reload_needed: bool, new_reload_needed: bool
) -> None:
logging.info(f"Reload needed: {new_reload_needed}")
if not old_reload_needed and new_reload_needed:
self.fetch_envelopes()
def watch_current_message_id(
self, old_message_id: int, new_message_id: int
) -> None:
"""Called when the current message ID changes."""
logging.info(
f"Current message ID changed from {
old_message_id} to {new_message_id}"
)
if new_message_id == old_message_id:
return
self.msg_worker.cancel() if self.msg_worker else None
logging.info(f"new_message_id: {new_message_id}, type: {
type(new_message_id)}")
content_container = self.query_one(ContentContainer)
content_container.display_content(new_message_id)
metadata = self.message_store.get_metadata(new_message_id)
if metadata:
# Pass the complete date string with timezone information
message_date = metadata["date"]
if self.current_message_index != metadata["index"]:
self.current_message_index = metadata["index"]
# content_container.update_header(
# subject=metadata.get("subject", "").strip(),
# from_=metadata["from"].get("addr", ""),
# to=metadata["to"].get("addr", ""),
# date=message_date,
# cc=metadata["cc"].get("addr", "") if "cc" in metadata else "",
# )
list_view = self.query_one("#envelopes_list")
if list_view.index != metadata["index"]:
list_view.index = metadata["index"]
else:
logging.warning(
f"Message ID {new_message_id} not found in metadata.")
def on_list_view_selected(self, event: ListView.Selected) -> None:
"""Called when an item in the list view is selected."""
current_item = self.message_store.envelopes[event.list_view.index]
if current_item is None or current_item.get("type") == "header":
return
message_id = int(current_item["id"])
self.current_message_id = message_id
self.current_message_index = event.list_view.index
@work(exclusive=False)
async def fetch_envelopes(self) -> None:
msglist = self.query_one("#envelopes_list")
try:
msglist.loading = True
# Use the Himalaya client to fetch envelopes
envelopes, success = await himalaya_client.list_envelopes()
if success and envelopes:
self.reload_needed = False
self.message_store.load(envelopes, self.sort_order_ascending)
self.total_messages = self.message_store.total_messages
# Use the centralized refresh method to update the ListView
self.refresh_list_view()
# Restore the current index
msglist.index = self.current_message_index
else:
self.show_status("Failed to fetch envelopes.", "error")
except Exception as e:
self.show_status(f"Error fetching message list: {e}", "error")
finally:
msglist.loading = False
@work(exclusive=False)
async def fetch_accounts(self) -> None:
accounts_list = self.query_one("#accounts_list")
try:
accounts_list.loading = True
# Use the Himalaya client to fetch accounts
accounts, success = await himalaya_client.list_accounts()
if success and accounts:
for account in accounts:
item = ListItem(
Label(
str(account["name"]).strip(),
classes="account_name",
markup=False,
)
)
accounts_list.append(item)
else:
self.show_status("Failed to fetch accounts.", "error")
except Exception as e:
self.show_status(f"Error fetching account list: {e}", "error")
finally:
accounts_list.loading = False
@work(exclusive=False)
async def fetch_folders(self) -> None:
folders_list = self.query_one("#folders_list")
folders_list.clear()
folders_list.append(
ListItem(Label("INBOX", classes="folder_name", markup=False))
)
try:
folders_list.loading = True
# Use the Himalaya client to fetch folders
folders, success = await himalaya_client.list_folders()
if success and folders:
for folder in folders:
item = ListItem(
Label(
str(folder["name"]).strip(),
classes="folder_name",
markup=False,
)
)
folders_list.append(item)
else:
self.show_status("Failed to fetch folders.", "error")
except Exception as e:
self.show_status(f"Error fetching folder list: {e}", "error")
finally:
folders_list.loading = False
def refresh_list_view(self) -> None:
"""Refresh the ListView to ensure it matches the MessageStore exactly."""
envelopes_list = self.query_one("#envelopes_list")
envelopes_list.clear()
for item in self.message_store.envelopes:
if item and item.get("type") == "header":
envelopes_list.append(
ListItem(
Label(
item["label"],
classes="group_header",
markup=False,
)
)
)
elif item: # Check if not None
envelopes_list.append(
ListItem(
Label(
str(item.get("subject", "")).strip(),
classes="email_subject",
markup=False,
)
)
)
# Update total messages count
self.total_messages = self.message_store.total_messages
def show_message(self, message_id: int, new_index=None) -> None:
if new_index:
self.current_message_index = new_index
self.current_message_id = message_id
def show_status(self, message: str, severity: str = "information") -> None:
"""Display a status message using the built-in notify function."""
self.notify(
message, title="Status", severity=severity, timeout=2.6, markup=True
)
async def action_toggle_sort_order(self) -> None:
"""Toggle the sort order of the envelope list."""
self.sort_order_ascending = not self.sort_order_ascending
worker = self.fetch_envelopes()
await worker.wait()
if self.sort_order_ascending:
self.action_oldest()
else:
self.action_newest()
async def action_toggle_mode(self) -> None:
"""Toggle the content mode between plaintext and markdown."""
content_container = self.query_one(ContentContainer)
await content_container.toggle_mode()
def action_next(self) -> None:
if not self.current_message_index >= 0:
return
next_id, next_idx = self.message_store.find_next_valid_id(
self.current_message_index
)
if next_id is not None and next_idx is not None:
self.current_message_id = next_id
self.current_message_index = next_idx
self.fetch_envelopes() if self.reload_needed else None
def action_previous(self) -> None:
if not self.current_message_index >= 0:
return
prev_id, prev_idx = self.message_store.find_prev_valid_id(
self.current_message_index
)
if prev_id is not None and prev_idx is not None:
self.current_message_id = prev_id
self.current_message_index = prev_idx
self.fetch_envelopes() if self.reload_needed else None
async def action_delete(self) -> None:
"""Delete the current message and update UI consistently."""
# Call the delete_current function which uses our Himalaya client module
worker = delete_current(self)
await worker.wait()
async def action_archive(self) -> None:
"""Archive the current message and update UI consistently."""
# Call the archive_current function which uses our Himalaya client module
worker = archive_current(self)
await worker.wait()
def action_open(self) -> None:
action_open(self)
def action_create_task(self) -> None:
action_create_task(self)
def action_scroll_down(self) -> None:
"""Scroll the main content down."""
self.query_one("#main_content").scroll_down()
def action_scroll_up(self) -> None:
"""Scroll the main content up."""
self.query_one("#main_content").scroll_up()
def action_scroll_page_down(self) -> None:
"""Scroll the main content down by a page."""
self.query_one("#main_content").scroll_page_down()
def action_scroll_page_up(self) -> None:
"""Scroll the main content up by a page."""
self.query_one("#main_content").scroll_page_up()
def action_quit(self) -> None:
"""Quit the application."""
self.exit()
def action_oldest(self) -> None:
self.fetch_envelopes() if self.reload_needed else None
self.show_message(self.message_store.get_oldest_id())
def action_newest(self) -> None:
self.fetch_envelopes() if self.reload_needed else None
self.show_message(self.message_store.get_newest_id())
def action_focus_1(self) -> None:
self.query_one("#envelopes_list").focus()
def action_focus_2(self) -> None:
self.query_one("#accounts_list").focus()
def action_focus_3(self) -> None:
self.query_one("#folders_list").focus()
if __name__ == "__main__":
app = EmailViewerApp()
app.run()
def launch_email_viewer():
app = EmailViewerApp()
app.run()

View File

@@ -0,0 +1,173 @@
/* Basic stylesheet for the Textual Email Viewer App */
#main_content, .list_view {
scrollbar-size: 1 1;
border: round rgb(117, 106, 129);
height: 1fr;
}
#sidebar {
width: 1fr
}
#main_content {
width: 2fr;
}
#sidebar:focus-within {
background: $panel;
.list_view:blur {
height: 3;
}
.list_view:focus {
height: 2fr;
}
}
#main_content:focus, .list_view:focus {
border: round $secondary;
background: rgb(55, 53, 57);
border-title-style: bold;
}
Label#task_prompt {
padding: 1;
color: rgb(128,128,128);
}
Label#task_prompt_label {
padding: 1;
color: rgb(255, 216, 102);
}
Label#message_label {
padding: 1;
}
StatusTitle {
dock: top;
width: 100%;
height: 1;
color: $text;
background: rgb(64, 62, 65);
content-align: center middle;
}
EnvelopeHeader {
dock: top;
width: 100%;
max-height: 2;
tint: $primary 10%;
}
Markdown {
padding: 1 2;
}
.email_subject {
width: 1fr;
padding: 0
}
.header_key {
tint: gray 20%;
min-width: 10;
text-style:bold;
}
.header_value {
padding:0 1 0 0;
height: auto;
width: auto;
}
.modal_screen {
align: center middle;
margin: 1;
padding: 2;
border: round $border;
background: $panel;
width: auto;
height: auto;
}
#create_task_container {
width: 50%;
height: 50%;
border: heavy $secondary;
layout: horizontal;
align: center middle;
Label {
width: auto;
}
Input {
width: 1fr;
}
}
#envelopes_list {
ListItem:odd {
background: rgb(45, 45, 46);
}
ListItem:even {
background: rgb(50, 50, 56);
}
& > ListItem {
&.-highlight {
color: $block-cursor-blurred-foreground;
background: $block-cursor-blurred-background;
text-style: $block-cursor-blurred-text-style;
}
}
}
#open_message_container, #create_task_container {
dock: bottom;
width: 100%;
padding: 0 1;
height: 5;
Input {
width: 1fr;
}
Label, Button {
width: auto;
}
}
Label.group_header {
color: rgb(140, 140, 140);
text-style: bold;
background: rgb(64, 62, 65);
width: 100%;
padding: 0 1;
}
#plaintext_content {
padding: 1 2;
height: auto;
width: 100%;
}
#html_content {
padding: 1 2;
height: auto;
width: 100%;
}
.hidden {
display: none;
}
#markdown_content {
padding: 1 2;
}
ContentContainer {
width: 100%;
height: 1fr;
}

View File

@@ -0,0 +1,150 @@
import logging
from typing import List, Dict, Any, Tuple, Optional
from datetime import datetime, UTC
from src.services.himalaya import client as himalaya_client
class MessageStore:
"""Store and manage message envelopes"""
def __init__(self):
self.envelopes: List[Dict[str, Any]] = []
self.metadata_by_id: Dict[int, Dict[str, Any]] = {}
self.total_messages = 0
def load(
self, envelopes: List[Dict[str, Any]], sort_ascending: bool = True
) -> None:
"""Load envelopes from Himalaya client and process them"""
if not envelopes:
self.envelopes = []
self.metadata_by_id = {}
self.total_messages = 0
return
# Sort by date
envelopes.sort(
key=lambda x: x.get("date", ""),
reverse=not sort_ascending,
)
# Group envelopes by month
grouped_envelopes = []
months = {}
for envelope in envelopes:
if "id" not in envelope:
continue
# Extract date and determine month group
date_str = envelope.get("date", "")
try:
date = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
month_key = date.strftime("%B %Y")
except (ValueError, TypeError):
month_key = "Unknown Date"
# Add month header if this is a new month
if month_key not in months:
months[month_key] = True
grouped_envelopes.append({"type": "header", "label": month_key})
# Add the envelope
grouped_envelopes.append(envelope)
# Store metadata for quick access
envelope_id = int(envelope["id"])
self.metadata_by_id[envelope_id] = {
"id": envelope_id,
"subject": envelope.get("subject", ""),
"from": envelope.get("from", {}),
"to": envelope.get("to", {}),
"cc": envelope.get("cc", {}),
"date": date_str,
"index": len(grouped_envelopes) - 1,
}
self.envelopes = grouped_envelopes
self.total_messages = len(self.metadata_by_id)
async def reload(self, sort_ascending: bool = True) -> None:
"""Reload envelopes from the Himalaya client"""
envelopes, success = await himalaya_client.list_envelopes()
if success:
self.load(envelopes, sort_ascending)
else:
logging.error("Failed to reload envelopes")
def get_metadata(self, message_id: int) -> Optional[Dict[str, Any]]:
"""Get metadata for a message by ID"""
return self.metadata_by_id.get(message_id)
def find_next_valid_id(
self, current_index: int
) -> Tuple[Optional[int], Optional[int]]:
"""Find the next valid message ID and its index"""
if not self.envelopes or current_index >= len(self.envelopes) - 1:
return None, None
# Start from current index + 1
for idx in range(current_index + 1, len(self.envelopes)):
item = self.envelopes[idx]
# Skip header items
if item and item.get("type") != "header" and "id" in item:
return int(item["id"]), idx
return None, None
def find_prev_valid_id(
self, current_index: int
) -> Tuple[Optional[int], Optional[int]]:
"""Find the previous valid message ID and its index"""
if not self.envelopes or current_index <= 0:
return None, None
# Start from current index - 1
for idx in range(current_index - 1, -1, -1):
item = self.envelopes[idx]
# Skip header items
if item and item.get("type") != "header" and "id" in item:
return int(item["id"]), idx
return None, None
def get_oldest_id(self) -> int:
"""Get the ID of the oldest message (first non-header item)"""
for item in self.envelopes:
if item and item.get("type") != "header" and "id" in item:
return int(item["id"])
return 0
def get_newest_id(self) -> int:
"""Get the ID of the newest message (last non-header item)"""
for item in reversed(self.envelopes):
if item and item.get("type") != "header" and "id" in item:
return int(item["id"])
return 0
def remove_envelope(self, message_id: int) -> None:
"""Remove an envelope from the store"""
metadata = self.metadata_by_id.get(message_id)
if not metadata:
return
index = metadata["index"]
if 0 <= index < len(self.envelopes):
# Remove from the envelopes list
self.envelopes.pop(index)
# Remove from metadata dictionary
del self.metadata_by_id[message_id]
# Update indexes for all subsequent messages
for id_, meta in self.metadata_by_id.items():
if meta["index"] > index:
meta["index"] -= 1
# Update total message count
self.total_messages = len(self.metadata_by_id)
else:
logging.warning(f"Invalid index {index} for message ID {message_id}")

View File

@@ -0,0 +1,115 @@
import logging
from textual.screen import ModalScreen
from textual.widgets import Input, Label, Button, ListView, ListItem
from textual.containers import Vertical, Horizontal, Container
from textual import on, work
from src.services.taskwarrior import client as taskwarrior_client
class CreateTaskScreen(ModalScreen):
"""Screen for creating a new task."""
def __init__(self, subject="", from_addr="", **kwargs):
super().__init__(**kwargs)
self.subject = subject
self.from_addr = from_addr
self.selected_project = None
def compose(self):
yield Container(
Vertical(
Label("Create Task", id="create_task_title"),
Horizontal(
Label("Subject:"),
Input(
placeholder="Task subject",
value=self.subject,
id="subject_input",
),
),
Horizontal(
Label("Project:"),
Input(placeholder="Project name", id="project_input"),
),
Horizontal(
Label("Tags:"),
Input(placeholder="Comma-separated tags", id="tags_input"),
),
Horizontal(
Label("Due:"),
Input(
placeholder="Due date (e.g., today, tomorrow, fri)",
id="due_input",
),
),
Horizontal(
Label("Priority:"),
Input(placeholder="Priority (H, M, L)", id="priority_input"),
),
Horizontal(
Button("Create", id="create_btn", variant="primary"),
Button("Cancel", id="cancel_btn", variant="error"),
),
id="create_task_form",
),
id="create_task_container",
)
def on_mount(self):
self.styles.align = ("center", "middle")
@on(Button.Pressed, "#create_btn")
def on_create_pressed(self):
"""Create the task when the Create button is pressed."""
# Get input values
subject = self.query_one("#subject_input").value
project = self.query_one("#project_input").value
tags_input = self.query_one("#tags_input").value
due = self.query_one("#due_input").value
priority = self.query_one("#priority_input").value
# Process tags (split by commas and trim whitespace)
tags = [tag.strip() for tag in tags_input.split(",")] if tags_input else []
# Add a tag for the sender, if provided
if self.from_addr and "@" in self.from_addr:
domain = self.from_addr.split("@")[1].split(".")[0]
if domain and domain not in ["gmail", "yahoo", "hotmail", "outlook"]:
tags.append(domain)
# Create the task
self.create_task_worker(subject, tags, project, due, priority)
@on(Button.Pressed, "#cancel_btn")
def on_cancel_pressed(self):
"""Dismiss the screen when Cancel is pressed."""
self.dismiss()
@work(exclusive=True)
async def create_task_worker(
self, subject, tags=None, project=None, due=None, priority=None
):
"""Worker to create a task using the Taskwarrior API client."""
if not subject:
self.app.show_status("Task subject cannot be empty.", "error")
return
# Validate priority
if priority and priority not in ["H", "M", "L"]:
self.app.show_status("Priority must be H, M, or L.", "warning")
priority = None
# Create the task
success, result = await taskwarrior_client.create_task(
task_description=subject,
tags=tags or [],
project=project,
due=due,
priority=priority,
)
if success:
self.app.show_status(f"Task created: {subject}", "success")
self.dismiss()
else:
self.app.show_status(f"Failed to create task: {result}", "error")

View File

@@ -0,0 +1,561 @@
import io
import os
import tempfile
from pathlib import Path
from typing import ByteString
import aiohttp
import mammoth
from docx import Document
from textual_image.renderable import Image
from openai import OpenAI
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Container, ScrollableContainer, Horizontal
from textual.screen import Screen
from textual.widgets import Label, Markdown, Button, Footer, Static
from textual import work
from textual.reactive import reactive
from PIL import Image as PILImage
# Define convertible formats
PDF_CONVERTIBLE_FORMATS = {
"doc",
"docx",
"epub",
"eml",
"htm",
"html",
"md",
"msg",
"odp",
"ods",
"odt",
"pps",
"ppsx",
"ppt",
"pptx",
"rtf",
"tif",
"tiff",
"xls",
"xlsm",
"xlsx",
}
JPG_CONVERTIBLE_FORMATS = {
"3g2",
"3gp",
"3gp2",
"3gpp",
"3mf",
"ai",
"arw",
"asf",
"avi",
"bas",
"bash",
"bat",
"bmp",
"c",
"cbl",
"cmd",
"cool",
"cpp",
"cr2",
"crw",
"cs",
"css",
"csv",
"cur",
"dcm",
"dcm30",
"dic",
"dicm",
"dicom",
"dng",
"doc",
"docx",
"dwg",
"eml",
"epi",
"eps",
"epsf",
"epsi",
"epub",
"erf",
"fbx",
"fppx",
"gif",
"glb",
"h",
"hcp",
"heic",
"heif",
"htm",
"html",
"ico",
"icon",
"java",
"jfif",
"jpeg",
"jpg",
"js",
"json",
"key",
"log",
"m2ts",
"m4a",
"m4v",
"markdown",
"md",
"mef",
"mov",
"movie",
"mp3",
"mp4",
"mp4v",
"mrw",
"msg",
"mts",
"nef",
"nrw",
"numbers",
"obj",
"odp",
"odt",
"ogg",
"orf",
"pages",
"pano",
"pdf",
"pef",
"php",
"pict",
"pl",
"ply",
"png",
"pot",
"potm",
"potx",
"pps",
"ppsx",
"ppsxm",
"ppt",
"pptm",
"pptx",
"ps",
"ps1",
"psb",
"psd",
"py",
"raw",
"rb",
"rtf",
"rw1",
"rw2",
"sh",
"sketch",
"sql",
"sr2",
"stl",
"tif",
"tiff",
"ts",
"txt",
"vb",
"webm",
"wma",
"wmv",
"xaml",
"xbm",
"xcf",
"xd",
"xml",
"xpm",
"yaml",
"yml",
}
# Enum for display modes
class DisplayMode:
IMAGE = "image"
TEXT = "text"
MARKDOWN = "markdown"
class DocumentViewerScreen(Screen):
"""Screen for viewing document content from OneDrive items."""
web_url = reactive("")
download_url = reactive("")
use_markitdown = True
image_bytes: ByteString = b""
BINDINGS = [
Binding("escape", "close", "Close"),
Binding("q", "close", "Close"),
Binding("m", "toggle_mode", "Toggle Mode"),
Binding("e", "export_and_open", "Export & Open"),
]
def __init__(self, item_id: str, item_name: str, access_token: str, drive_id: str):
"""Initialize the document viewer screen.
Args:
item_id: The ID of the item to view.
item_name: The name of the item to display.
access_token: The access token for API requests.
drive_id: The ID of the drive containing the item.
"""
super().__init__()
self.item_id = item_id
self.drive_id = drive_id
self.item_name = item_name
self.access_token = access_token
self.document_content = ""
self.plain_text_content = ""
self.content_type = None
self.raw_content = None
self.file_extension = Path(item_name).suffix.lower().lstrip(".")
self.mode: DisplayMode = DisplayMode.TEXT
def compose(self) -> ComposeResult:
"""Compose the document viewer screen."""
yield Container(
Horizontal(
Container(Button("", id="close_button"), id="button_container"),
Container(
Label(f"Viewing: {self.item_name}", id="document_title"),
Label(
f'[link="{self.web_url}"]Open on Web[/link] | [link="{self.download_url}"]Download File[/link]',
id="document_link",
),
),
Button("Toggle Mode", id="toggle_mode_button"),
id="top_container",
),
ScrollableContainer(
Markdown("", id="markdown_content"),
Static(
"",
id="image_content",
expand=True,
),
Label("", id="plaintext_content", classes="hidden", markup=False),
id="content_container",
),
id="document_viewer",
)
yield Footer()
def on_mount(self) -> None:
"""Handle screen mount event."""
self.query_one("#content_container").focus()
self.download_document()
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button press events."""
if event.button.id == "close_button":
self.dismiss()
elif event.button.id == "toggle_mode_button":
self.action_toggle_mode()
elif event.button.id == "export_button":
self.action_export_and_open()
def is_convertible_format(self) -> bool:
"""Check if the current file is convertible to PDF or JPG."""
return (
self.file_extension in PDF_CONVERTIBLE_FORMATS
or self.file_extension in JPG_CONVERTIBLE_FORMATS
)
def get_conversion_format(self) -> str:
"""Get the appropriate conversion format (pdf or jpg) for the current file."""
if self.file_extension in PDF_CONVERTIBLE_FORMATS:
return "pdf"
elif self.file_extension in JPG_CONVERTIBLE_FORMATS:
return "jpg"
return ""
@work
async def download_document(self) -> None:
"""Download the document content."""
headers = {"Authorization": f"Bearer {self.access_token}"}
try:
metadataUrl = f"https://graph.microsoft.com/v1.0/drives/{self.drive_id}/items/{self.item_id}"
async with aiohttp.ClientSession() as session:
async with session.get(metadataUrl, headers=headers) as response:
if response.status != 200:
error_text = await response.text()
self.notify(
f"Failed to fetch document metadata: {error_text}",
severity="error",
)
return
metadata = await response.json()
self.item_name = metadata.get("name", self.item_name)
self.file_extension = (
Path(self.item_name).suffix.lower().lstrip(".")
)
self.download_url = metadata.get("@microsoft.graph.downloadUrl", "")
self.web_url = metadata.get("webUrl", "")
except Exception as e:
self.notify(f"Error downloading document: {str(e)}", severity="error")
try:
url = f"https://graph.microsoft.com/v1.0/drives/{self.drive_id}/items/{self.item_id}/content"
# Show loading indicator
self.query_one("#content_container").loading = True
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
if response.status != 200:
error_text = await response.text()
self.notify(
f"Failed to download document: {error_text}",
severity="error",
)
return
self.content_type = response.headers.get("content-type", "")
self.raw_content = await response.read()
# Process the content based on content type
self.process_content()
except Exception as e:
self.notify(f"Error downloading document: {str(e)}", severity="error")
finally:
# Hide loading indicator
self.query_one("#content_container").loading = False
@work
async def process_content(self) -> None:
"""Process the downloaded content based on its type."""
if not self.raw_content:
self.notify("No content to display", severity="warning")
return
try:
if self.content_type.startswith("image/"):
from PIL import Image as PILImage
from io import BytesIO
self.notify("Attempting to display image in terminal")
if self.raw_content and len(self.raw_content) > 0:
self.image_bytes = self.raw_content
self.mode = DisplayMode.IMAGE
# Decode the image using BytesIO and Pillow
img = PILImage.open(BytesIO(self.image_bytes))
# Convert the image to RGB mode if it's not already
if img.mode != "RGB":
img = img.convert("RGB")
# Create a Textual Image renderable
textual_img = Image(img)
textual_img.expand = True
textual_img.width = 120
self.query_one("#image_content", Static).update(textual_img)
self.update_content_display()
return
except Exception as e:
self.notify(
f"Error displaying image in terminal: {str(e)}", severity="error"
)
try:
if self.use_markitdown:
self.notify(
"Attempting to convert file into Markdown with Markitdown...",
title="This could take a moment",
severity="info",
)
from markitdown import MarkItDown
with tempfile.NamedTemporaryFile(
suffix=f".{self.file_extension}", delete=False
) as temp_file:
temp_file.write(self.raw_content)
temp_path = temp_file.name
client = OpenAI()
md = MarkItDown(
enable_plugins=True, llm_client=client, llm_model="gpt-4o"
) # Set to True to enable plugins
result = md.convert(
temp_path,
)
self.mode = DisplayMode.MARKDOWN
self.document_content = result.markdown
self.plain_text_content = result.text_content
self.update_content_display()
return
except Exception as e:
self.notify(f"Error using MarkItDown: {str(e)}", severity="error")
try:
if (
self.content_type
== "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
):
self.notify(
"Processing DOCX file into Markdown using Mammoth...",
severity="info",
)
self.process_docx()
elif self.content_type.startswith("text/"):
# Process as plain text
text_content = self.raw_content.decode("utf-8", errors="replace")
self.document_content = text_content
self.mode = DisplayMode.TEXT
self.update_content_display()
elif self.content_type.startswith("image/"):
# For images, just display a message
self.document_content = f"*Image file: {self.item_name}*\n\nUse the 'Open URL' command to view this image in your browser."
self.mode = DisplayMode.MARKDOWN
self.update_content_display()
else:
# For other types, display a generic message
conversion_info = ""
if self.is_convertible_format():
conversion_format = self.get_conversion_format()
conversion_info = f"\n\nThis file can be converted to {conversion_format.upper()}. Press 'e' or click 'Export & Open' to convert and view."
self.document_content = f"*File: {self.item_name}*\n\nContent type: {self.content_type}{conversion_info}\n\nThis file type cannot be displayed directly in the viewer. You could [open in your browser]({self.web_url}), or [download the file]({self.download_url})."
self.mode = DisplayMode.MARKDOWN
self.update_content_display()
except Exception as e:
self.notify(f"Error processing content: {str(e)}", severity="error")
@work
async def process_docx(self) -> None:
"""Process DOCX content and convert to Markdown and plain text."""
try:
# Save the DOCX content to a temporary file
with tempfile.NamedTemporaryFile(suffix=".docx", delete=False) as temp_file:
temp_file.write(self.raw_content)
temp_path = temp_file.name
# Convert DOCX to Markdown using mammoth
with open(temp_path, "rb") as docx_file:
result = mammoth.convert_to_markdown(docx_file)
markdown_text = result.value
# Read the document structure with python-docx for plain text
doc = Document(temp_path)
self.plain_text_content = "\n\n".join(
[para.text for para in doc.paragraphs if para.text]
)
self.document_content = markdown_text
# Clean up temporary file
os.unlink(temp_path)
# Store both versions
self.update_content_display()
except Exception as e:
self.notify(f"Error processing DOCX: {str(e)}", severity="error")
def update_content_display(self) -> None:
"""Update the content display with the processed document content."""
markdown_widget = self.query_one("#markdown_content", Markdown)
plaintext_widget = self.query_one("#plaintext_content", Label)
image_widget = self.query_one("#image_content", Static)
toggle_button = self.query_one("#toggle_mode_button", Button)
if self.mode == DisplayMode.IMAGE:
toggle_button.label = "\U000f02e9"
image_widget.remove_class("hidden")
markdown_widget.add_class("hidden")
plaintext_widget.add_class("hidden")
elif self.mode == DisplayMode.MARKDOWN:
toggle_button.label = "Mode \U000f0354|\U000f09ed"
markdown_widget.update(self.document_content)
markdown_widget.remove_class("hidden")
image_widget.add_class("hidden")
plaintext_widget.add_class("hidden")
else:
toggle_button.label = "Mode \U000f0f5b|\U000f021a"
plaintext_widget.update(self.plain_text_content)
plaintext_widget.remove_class("hidden")
image_widget.add_class("hidden")
markdown_widget.add_class("hidden")
@work
async def export_and_open_converted_file(self) -> None:
"""Export the file in converted format and open it."""
if not self.is_convertible_format():
self.notify("This file format cannot be converted.", severity="warning")
return
conversion_format = self.get_conversion_format()
if not conversion_format:
self.notify("No appropriate conversion format found.", severity="error")
return
try:
# Build the URL with the format parameter
url = f"https://graph.microsoft.com/v1.0/drives/{self.drive_id}/items/{self.item_id}/content?format={conversion_format}"
headers = {"Authorization": f"Bearer {self.access_token}"}
# Download the converted file
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
if response.status != 200:
error_text = await response.text()
self.notify(
f"Failed to export document: {error_text}", severity="error"
)
return
converted_content = await response.read()
# Create temporary file with the right extension
file_name = (
f"{os.path.splitext(self.item_name)[0]}.{conversion_format}"
)
with tempfile.NamedTemporaryFile(
suffix=f".{conversion_format}",
delete=False,
prefix=f"onedrive_export_",
) as temp_file:
temp_file.write(converted_content)
temp_path = temp_file.name
# Open the file using the system default application
self.notify(
f"Opening exported {conversion_format.upper()} file: {file_name}"
)
self.app.open_url(f"file://{temp_path}")
self.query_one("#content_container").loading = False
except Exception as e:
self.notify(f"Error exporting document: {str(e)}", severity="error")
async def action_toggle_mode(self) -> None:
"""Toggle between Markdown and plaintext display modes."""
self.notify("Switching Modes", severity="info")
self.mode = (
DisplayMode.MARKDOWN
if self.mode != DisplayMode.MARKDOWN
else DisplayMode.TEXT
)
self.update_content_display()
mode_name = str(self.mode).capitalize()
self.notify(f"Switched to {mode_name} mode")
async def action_export_and_open(self) -> None:
"""Export the file in converted format and open it."""
self.query_one("#content_container").loading = True
self.notify("Exporting and opening the converted file...")
self.export_and_open_converted_file()
async def action_close(self) -> None:
"""Close the document viewer screen."""
self.dismiss()

View File

@@ -0,0 +1,35 @@
from textual import on
from textual.app import ComposeResult
from textual.screen import ModalScreen
from textual.widgets import Input, Label, Button
from textual.containers import Horizontal
class OpenMessageScreen(ModalScreen[int | None]):
def compose(self) -> ComposeResult:
yield Horizontal(
Label("📨 ID", id="message_label"),
Input(
placeholder="Enter message ID (integer only)",
type="integer",
id="open_message_input",
),
Button("Cancel", id="cancel"),
Button("Open", variant="primary", id="submit"),
id="open_message_container",
classes="modal_screen",
)
@on(Input.Submitted)
def handle_message_id(self, event) -> None:
input_widget = self.query_one("#open_message_input", Input)
message_id = int(input_widget.value if input_widget.value else 0)
self.dismiss(message_id)
def button_on_click(self, event) -> None:
if event.button.id == "cancel":
self.dismiss()
elif event.button.id == "submit":
input_widget = self.query_one("#open_message_input", Input)
message_id = int(input_widget.value if input_widget.value else 0)
self.dismiss(message_id)

View File

@@ -0,0 +1,6 @@
# Initialize the screens package
from .CreateTask import CreateTaskScreen
from .OpenMessage import OpenMessageScreen
from .DocumentViewer import DocumentViewerScreen
__all__ = ["CreateTaskScreen", "OpenMessageScreen", "DocumentViewerScreen"]

42
src/maildir_gtd/utils.py Normal file
View File

@@ -0,0 +1,42 @@
from datetime import UTC, datetime, timedelta
import re
from typing import List, Dict
def group_envelopes_by_date(envelopes: List[Dict]) -> List[Dict]:
"""Group envelopes by date and add headers for each group."""
grouped_envelopes = []
today = datetime.now().astimezone(UTC)
yesterday = today - timedelta(days=1)
start_of_week = today - timedelta(days=today.weekday())
start_of_last_week = start_of_week - timedelta(weeks=1)
start_of_month = today.replace(day=1)
start_of_last_month = (start_of_month - timedelta(days=1)).replace(day=1)
def get_group_label(date: datetime) -> str:
if date.date() == today.date():
return "Today"
elif date.date() == yesterday.date():
return "Yesterday"
elif date >= start_of_week:
return "This Week"
elif date >= start_of_last_week:
return "Last Week"
elif date >= start_of_month:
return "This Month"
elif date >= start_of_last_month:
return "Last Month"
else:
return "Older"
current_group = None
for envelope in envelopes:
envelope_date = re.sub(r"[\+\-]\d\d:\d\d", "", envelope["date"])
envelope_date = datetime.strptime(envelope_date, "%Y-%m-%d %H:%M").astimezone(UTC)
group_label = get_group_label(envelope_date)
if group_label != current_group:
grouped_envelopes.append({"type": "header", "label": group_label})
current_group = group_label
grouped_envelopes.append(envelope)
return grouped_envelopes

View File

@@ -0,0 +1,162 @@
from markitdown import MarkItDown
from textual import work
from textual.containers import Vertical, ScrollableContainer
from textual.widgets import Static, Markdown, Label
from src.services.himalaya import client as himalaya_client
import logging
from datetime import datetime
import re
import os
import sys
# Add the parent directory to the system path to resolve relative imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
class EnvelopeHeader(Vertical):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.subject_label = Label("")
self.from_label = Label("")
self.to_label = Label("")
self.date_label = Label("")
self.cc_label = Label("")
def on_mount(self):
self.styles.height = "auto"
self.mount(self.subject_label)
self.mount(self.from_label)
self.mount(self.to_label)
self.mount(self.cc_label)
self.mount(self.date_label)
def update(self, subject, from_, to, date, cc=None):
self.subject_label.update(f"[b]Subject:[/b] {subject}")
self.from_label.update(f"[b]From:[/b] {from_}")
self.to_label.update(f"[b]To:[/b] {to}")
# Format the date for better readability
if date:
try:
# Try to convert the date string to a datetime object
date_obj = datetime.fromisoformat(date.replace("Z", "+00:00"))
formatted_date = date_obj.strftime("%a, %d %b %Y %H:%M:%S %Z")
self.date_label.update(f"[b]Date:[/b] {formatted_date}")
except (ValueError, TypeError):
# If parsing fails, just use the original date string
self.date_label.update(f"[b]Date:[/b] {date}")
else:
self.date_label.update("[b]Date:[/b] Unknown")
if cc:
self.cc_label.update(f"[b]CC:[/b] {cc}")
self.cc_label.styles.display = "block"
else:
self.cc_label.styles.display = "none"
class ContentContainer(ScrollableContainer):
can_focus = True
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.md = MarkItDown()
self.header = EnvelopeHeader(id="envelope_header")
self.content = Markdown("", id="markdown_content")
self.html_content = Static("", id="html_content", markup=False)
self.current_mode = "text" # Default to text mode
self.current_content = None
self.current_message_id = None
self.content_worker = None
def compose(self):
yield self.content
yield self.html_content
def on_mount(self):
# Hide markdown content initially
self.content.styles.display = "none"
self.html_content.styles.display = "block"
async def toggle_mode(self):
"""Toggle between plaintext and HTML viewing modes."""
if self.current_mode == "html":
self.current_mode = "text"
self.html_content.styles.display = "none"
self.content.styles.display = "block"
else:
self.current_mode = "html"
self.content.styles.display = "none"
self.html_content.styles.display = "block"
# Reload the content if we have a message ID
if self.current_message_id:
self.display_content(self.current_message_id)
def update_header(self, subject, from_, to, date, cc=None):
self.header.update(subject, from_, to, date, cc)
@work(exclusive=True)
async def fetch_message_content(self, message_id: int, format: str):
"""Fetch message content using the Himalaya client module."""
if not message_id:
self.notify("No message ID provided.")
return
content, success = await himalaya_client.get_message_content(message_id)
if success:
self._update_content(content)
else:
self.notify(
f"Failed to fetch content for message ID {message_id}.")
def display_content(self, message_id: int) -> None:
"""Display the content of a message."""
if not message_id:
return
self.current_message_id = message_id
# Cancel any existing content fetch operations
if self.content_worker:
self.content_worker.cancel()
# Fetch content in the current mode
format_type = "text" if self.current_mode == "text" else "html"
self.content_worker = self.fetch_message_content(
message_id, format_type)
def _update_content(self, content: str | None) -> None:
"""Update the content widgets with the fetched content."""
try:
if self.current_mode == "text":
# For text mode, use the Markdown widget
self.content.update(content)
else:
# For HTML mode, use the Static widget with markup
# First, try to extract the body content if it's HTML
body_match = re.search(
r"<body[^>]*>(.*?)</body>", content, re.DOTALL | re.IGNORECASE
)
if body_match:
content = body_match.group(1)
# Replace some common HTML elements with Textual markup
content = content.replace("<b>", "[b]").replace("</b>", "[/b]")
content = content.replace("<i>", "[i]").replace("</i>", "[/i]")
content = content.replace("<u>", "[u]").replace("</u>", "[/u]")
# Convert links to a readable format
content = re.sub(
r'<a href="([^"]+)"[^>]*>([^<]+)</a>', r"[\2](\1)", content
)
# Add CSS for better readability
self.html_content.update(content)
except Exception as e:
logging.error(f"Error updating content: {e}")
if self.current_mode == "text":
self.content.update(f"Error displaying content: {e}")
else:
self.html_content.update(f"Error displaying content: {e}")

View File

@@ -0,0 +1,115 @@
from textual.reactive import Reactive
from textual.app import ComposeResult
from textual.widgets import Label
from textual.containers import Horizontal, ScrollableContainer
from datetime import datetime
import re
from datetime import UTC
class EnvelopeHeader(ScrollableContainer):
subject = Reactive("")
from_ = Reactive("")
to = Reactive("")
date = Reactive("")
cc = Reactive("")
bcc = Reactive("")
"""Header for the email viewer."""
def on_mount(self) -> None:
"""Mount the header."""
def compose(self) -> ComposeResult:
yield Horizontal(
Label("Subject:", classes="header_key"),
Label(self.subject, classes="header_value",
markup=False, id="subject"),
)
yield Horizontal(
Label("Date:", classes="header_key"),
Label(self.date, classes="header_value", markup=False, id="date"),
)
# yield Horizontal(
# Label("From:", classes="header_key"),
# Label(self.from_,
# classes="header_value", markup=False, id="from"),
# )
# yield Horizontal(
# Label("To:", classes="header_key"),
# Label(self.to, classes="header_value",
# markup=False, id="to"),
# )
# yield Horizontal(
# )
# yield Horizontal(
# Label("CC:", classes="header_key"),
# Label(self.cc, classes="header_value",
# markup=False, id="cc"),
# )
def watch_subject(self, subject: str) -> None:
"""Watch the subject for changes."""
self.query_one("#subject", Label).update(subject)
# def watch_to(self, to: str) -> None:
# """Watch the to field for changes."""
# self.query_one("#to").update(to)
# def watch_from(self, from_: str) -> None:
# """Watch the from field for changes."""
# self.query_one("#from").update(from_)
def watch_date(self, date: str) -> None:
"""Watch the date for changes and convert to local timezone."""
if date:
try:
# If date already has timezone info, parse it
if any(x in date for x in ['+', '-', 'Z']):
# Try parsing with timezone info
try:
# Handle ISO format with Z suffix
if 'Z' in date:
parsed_date = datetime.fromisoformat(
date.replace('Z', '+00:00'))
else:
parsed_date = datetime.fromisoformat(date)
except ValueError:
# Try another common format
parsed_date = datetime.strptime(
date, "%Y-%m-%d %H:%M%z")
else:
# No timezone info, assume UTC
try:
parsed_date = datetime.strptime(
date, "%Y-%m-%d %H:%M").replace(tzinfo=UTC)
except ValueError:
# If regular parsing fails, try to extract date cmpnts
match = re.search(
r"(\d{4}-\d{2}-\d{2})\s+(\d{2}:\d{2})", date)
if match:
date_part, time_part = match.groups()
parsed_date = datetime.strptime(
f"{date_part} {time_part}", "%Y-%m-%d %H:%M"
).replace(tzinfo=UTC)
else:
# If all else fails, just use the original string
self.query_one("#date", Label).update(date)
return
# Convert to local timezone
local_date = parsed_date.astimezone()
# Format for display
formatted_date = local_date.strftime("%a %b %d %H:%M (%Z)")
self.query_one("#date", Label).update(formatted_date)
except Exception:
# If parsing fails, just display the original date
self.query_one("#date", Label).update(f"{date}")
else:
self.query_one("#date", Label).update("")
# def watch_cc(self, cc: str) -> None:
# """Watch the cc field for changes."""
# self.query_one("#cc").update(cc)

View File

@@ -0,0 +1 @@
# Initialize the screens subpackage

7
src/services/__init__.py Normal file
View File

@@ -0,0 +1,7 @@
"""
APIs package for the GTD Terminal Tools project.
This package contains modules for interacting with various external services like:
- Himalaya email client
- Taskwarrior task manager
"""

View File

@@ -0,0 +1,21 @@
"""
Himalaya API module for interacting with the Himalaya email client.
"""
from .client import (
list_envelopes,
list_accounts,
list_folders,
delete_message,
archive_message,
get_message_content,
)
__all__ = [
"list_envelopes",
"list_accounts",
"list_folders",
"delete_message",
"archive_message",
"get_message_content",
]

View File

@@ -0,0 +1,184 @@
from typing import Tuple, List, Dict, Any, Optional
import asyncio
import json
import logging
import subprocess
async def list_envelopes(limit: int = 9999) -> Tuple[List[Dict[str, Any]], bool]:
"""
Retrieve a list of email envelopes using the Himalaya CLI.
Args:
limit: Maximum number of envelopes to retrieve
Returns:
Tuple containing:
- List of envelope dictionaries
- Success status (True if operation was successful)
"""
try:
process = await asyncio.create_subprocess_shell(
f"himalaya envelope list -o json -s {limit}",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
envelopes = json.loads(stdout.decode())
return envelopes, True
else:
logging.error(f"Error listing envelopes: {stderr.decode()}")
return [], False
except Exception as e:
logging.error(f"Exception during envelope listing: {e}")
return [], False
async def list_accounts() -> Tuple[List[Dict[str, Any]], bool]:
"""
Retrieve a list of accounts configured in Himalaya.
Returns:
Tuple containing:
- List of account dictionaries
- Success status (True if operation was successful)
"""
try:
process = await asyncio.create_subprocess_shell(
"himalaya account list -o json",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
accounts = json.loads(stdout.decode())
return accounts, True
else:
logging.error(f"Error listing accounts: {stderr.decode()}")
return [], False
except Exception as e:
logging.error(f"Exception during account listing: {e}")
return [], False
async def list_folders() -> Tuple[List[Dict[str, Any]], bool]:
"""
Retrieve a list of folders available in Himalaya.
Returns:
Tuple containing:
- List of folder dictionaries
- Success status (True if operation was successful)
"""
try:
process = await asyncio.create_subprocess_shell(
"himalaya folder list -o json",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
folders = json.loads(stdout.decode())
return folders, True
else:
logging.error(f"Error listing folders: {stderr.decode()}")
return [], False
except Exception as e:
logging.error(f"Exception during folder listing: {e}")
return [], False
async def delete_message(message_id: int) -> bool:
"""
Delete a message by its ID.
Args:
message_id: The ID of the message to delete
Returns:
True if deletion was successful, False otherwise
"""
try:
process = await asyncio.create_subprocess_shell(
f"himalaya message delete {message_id}",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
return process.returncode == 0
except Exception as e:
logging.error(f"Exception during message deletion: {e}")
return False
async def archive_message(message_id: int) -> bool:
"""
Archive a message by its ID.
Args:
message_id: The ID of the message to archive
Returns:
True if archiving was successful, False otherwise
"""
try:
process = await asyncio.create_subprocess_shell(
f"himalaya message archive {message_id}",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
return process.returncode == 0
except Exception as e:
logging.error(f"Exception during message archiving: {e}")
return False
async def get_message_content(message_id: int) -> Tuple[Optional[str], bool]:
"""
Retrieve the content of a message by its ID.
Args:
message_id: The ID of the message to retrieve
format: The desired format of the message content ("html" or "text")
Returns:
Tuple containing:
- Message content (or None if retrieval failed)
- Success status (True if operation was successful)
"""
try:
cmd = f"himalaya message read {message_id}"
process = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
content = stdout.decode()
return content, True
else:
logging.error(f"Error retrieving message content: {
stderr.decode()}")
return None, False
except Exception as e:
logging.error(f"Exception during message content retrieval: {e}")
return None, False
def sync_himalaya():
"""Synchronize data using Himalaya."""
try:
# subprocess.run(["himalaya", "sync"], check=True)
print("Himalaya sync completed successfully.")
except subprocess.CalledProcessError as e:
print(f"Error during Himalaya sync: {e}")

View File

@@ -0,0 +1,3 @@
"""
Microsoft Graph API module for interacting with Microsoft 365 services.
"""

View File

@@ -0,0 +1,68 @@
"""
Authentication module for Microsoft Graph API.
"""
import os
import msal
from rich import print
from rich.panel import Panel
def ensure_directory_exists(path):
if not os.path.exists(path):
os.makedirs(path)
def get_access_token(scopes):
"""
Authenticate with Microsoft Graph API and obtain an access token.
Args:
scopes (list): List of scopes to request.
Returns:
tuple: (access_token, headers) where access_token is the token string
and headers is a dict with Authorization header.
Raises:
ValueError: If environment variables are missing.
Exception: If authentication fails.
"""
# Read Azure app credentials from environment variables
client_id = os.getenv('AZURE_CLIENT_ID')
tenant_id = os.getenv('AZURE_TENANT_ID')
if not client_id or not tenant_id:
raise ValueError("Please set the AZURE_CLIENT_ID and AZURE_TENANT_ID environment variables.")
# Token cache
cache = msal.SerializableTokenCache()
cache_file = 'token_cache.bin'
if os.path.exists(cache_file):
cache.deserialize(open(cache_file, 'r').read())
# Authentication
authority = f'https://login.microsoftonline.com/{tenant_id}'
app = msal.PublicClientApplication(client_id, authority=authority, token_cache=cache)
accounts = app.get_accounts()
if accounts:
token_response = app.acquire_token_silent(scopes, account=accounts[0])
else:
flow = app.initiate_device_flow(scopes=scopes)
if 'user_code' not in flow:
raise Exception("Failed to create device flow")
print(Panel(flow['message'], border_style="magenta", padding=2, title="MSAL Login Flow Link"))
token_response = app.acquire_token_by_device_flow(flow)
if 'access_token' not in token_response:
raise Exception("Failed to acquire token")
# Save token cache
with open(cache_file, 'w') as f:
f.write(cache.serialize())
access_token = token_response['access_token']
headers = {'Authorization': f'Bearer {access_token}', 'Prefer': 'outlook.body-content-type="text",IdType="ImmutableId"'}
return access_token, headers

View File

@@ -0,0 +1,60 @@
"""
Calendar operations for Microsoft Graph API.
"""
import os
from datetime import datetime, timedelta
from .client import fetch_with_aiohttp
async def fetch_calendar_events(
headers, days_back=1, days_forward=6, start_date=None, end_date=None
):
"""
Fetch calendar events from Microsoft Graph API.
Args:
headers (dict): Headers including authentication.
days_back (int): Number of days to look back.
days_forward (int): Number of days to look forward.
start_date (datetime): Optional start date, overrides days_back if provided.
end_date (datetime): Optional end date, overrides days_forward if provided.
Returns:
tuple: (events, total_count) where events is a list of event dictionaries
and total_count is the total number of events.
"""
# Calculate date range
if start_date is None:
start_date = datetime.now() - timedelta(days=days_back)
if end_date is None:
end_date = start_date + timedelta(days=days_forward)
# Format dates for API
start_date_str = start_date.strftime("%Y-%m-%dT00:00:00Z")
end_date_str = end_date.strftime("%Y-%m-%dT23:59:59Z")
# Prepare the API query
calendar_url = (
f"https://graph.microsoft.com/v1.0/me/calendarView?"
f"startDateTime={start_date_str}&endDateTime={end_date_str}&"
f"$select=id,subject,organizer,start,end,location,isAllDay,showAs,sensitivity"
)
events = []
# Make the API request
response_data = await fetch_with_aiohttp(calendar_url, headers)
events.extend(response_data.get("value", []))
# Check if there are more events (pagination)
next_link = response_data.get("@odata.nextLink")
while next_link:
response_data = await fetch_with_aiohttp(next_link, headers)
events.extend(response_data.get("value", []))
next_link = response_data.get("@odata.nextLink")
# Return events and total count
return events, len(events)

View File

@@ -0,0 +1,85 @@
"""
HTTP client for Microsoft Graph API.
"""
import aiohttp
import asyncio
import orjson
# Define a global semaphore for throttling
semaphore = asyncio.Semaphore(4)
async def fetch_with_aiohttp(url, headers):
"""
Fetch data from Microsoft Graph API.
Args:
url (str): The URL to fetch data from.
headers (dict): Headers including authentication.
Returns:
dict: JSON response data.
Raises:
Exception: If the request fails.
"""
async with semaphore:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
if response.status != 200:
raise Exception(f"Failed to fetch {url}: {response.status} {await response.text()}")
raw_bytes = await response.read()
content_length = response.headers.get('Content-Length')
if content_length and len(raw_bytes) != int(content_length):
print("Warning: Incomplete response received!")
return None
return orjson.loads(raw_bytes)
async def post_with_aiohttp(url, headers, json_data):
"""
Post data to Microsoft Graph API.
Args:
url (str): The URL to post data to.
headers (dict): Headers including authentication.
json_data (dict): JSON data to post.
Returns:
int: HTTP status code.
"""
async with semaphore:
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=json_data) as response:
return response.status
async def patch_with_aiohttp(url, headers, json_data):
"""
Patch data to Microsoft Graph API.
Args:
url (str): The URL to patch data to.
headers (dict): Headers including authentication.
json_data (dict): JSON data to patch.
Returns:
int: HTTP status code.
"""
async with semaphore:
async with aiohttp.ClientSession() as session:
async with session.patch(url, headers=headers, json=json_data) as response:
return response.status
async def delete_with_aiohttp(url, headers):
"""
Delete data from Microsoft Graph API.
Args:
url (str): The URL to delete data from.
headers (dict): Headers including authentication.
Returns:
int: HTTP status code.
"""
async with semaphore:
async with aiohttp.ClientSession() as session:
async with session.delete(url, headers=headers) as response:
return response.status

View File

@@ -0,0 +1,262 @@
"""
Mail operations for Microsoft Graph API.
"""
import os
import re
import glob
from typing import Set
import aiohttp
from .client import (
fetch_with_aiohttp,
patch_with_aiohttp,
post_with_aiohttp,
delete_with_aiohttp,
)
async def fetch_mail_async(
maildir_path,
attachments_dir,
headers,
progress,
task_id,
dry_run=False,
download_attachments=False,
):
"""
Fetch mail from Microsoft Graph API and save to Maildir.
Args:
maildir_path (str): Path to the Maildir.
attachments_dir (str): Path to save attachments.
headers (dict): Headers including authentication.
progress: Progress instance for updating progress bars.
task_id: ID of the task in the progress bar.
dry_run (bool): If True, don't actually make changes.
download_attachments (bool): If True, download email attachments.
Returns:
None
"""
from src.utils.mail_utils.maildir import save_mime_to_maildir_async
from utils.mail_utils.helpers import truncate_id
mail_url = "https://graph.microsoft.com/v1.0/me/mailFolders/inbox/messages?$top=100&$orderby=receivedDateTime asc&$select=id,subject,from,toRecipients,ccRecipients,receivedDateTime,isRead"
messages = []
# Fetch the total count of messages in the inbox
inbox_url = "https://graph.microsoft.com/v1.0/me/mailFolders/inbox"
response = await fetch_with_aiohttp(inbox_url, headers)
total_messages = response.get("totalItemCount", 0)
progress.update(task_id, total=total_messages)
while mail_url:
try:
response_data = await fetch_with_aiohttp(mail_url, headers)
except Exception as e:
progress.console.print(f"Error fetching messages: {e}")
continue
messages.extend(response_data.get("value", []))
progress.advance(task_id, len(response_data.get("value", [])))
# Get the next page URL from @odata.nextLink
mail_url = response_data.get("@odata.nextLink")
inbox_msg_ids = set(message["id"] for message in messages)
progress.update(task_id, completed=(len(messages) / 2))
new_dir = os.path.join(maildir_path, "new")
cur_dir = os.path.join(maildir_path, "cur")
new_files = set(glob.glob(os.path.join(new_dir, "*.eml*")))
cur_files = set(glob.glob(os.path.join(cur_dir, "*.eml*")))
for filename in Set.union(cur_files, new_files):
message_id = filename.split(".")[0].split("/")[
-1
] # Extract the Message-ID from the filename
if message_id not in inbox_msg_ids:
if not dry_run:
progress.console.print(f"Deleting {filename} from inbox")
os.remove(filename)
else:
progress.console.print(f"[DRY-RUN] Would delete {filename} from inbox")
for message in messages:
progress.console.print(
f"Processing message: {message.get('subject', 'No Subject')}", end="\r"
)
await save_mime_to_maildir_async(
maildir_path,
message,
attachments_dir,
headers,
progress,
dry_run,
download_attachments,
)
progress.update(task_id, advance=0.5)
progress.update(task_id, completed=len(messages))
progress.console.print(f"\nFinished saving {len(messages)} messages.")
async def archive_mail_async(maildir_path, headers, progress, task_id, dry_run=False):
"""
Archive mail from Maildir to Microsoft Graph API archive folder.
Args:
maildir_path (str): Path to the Maildir.
headers (dict): Headers including authentication.
progress: Progress instance for updating progress bars.
task_id: ID of the task in the progress bar.
dry_run (bool): If True, don't actually make changes.
Returns:
None
"""
archive_dir = os.path.join(maildir_path, ".Archives")
archive_files = glob.glob(os.path.join(archive_dir, "**", "*.eml*"), recursive=True)
progress.update(task_id, total=len(archive_files))
folder_response = await fetch_with_aiohttp(
"https://graph.microsoft.com/v1.0/me/mailFolders", headers
)
folders = folder_response.get("value", [])
archive_folder_id = next(
(
folder.get("id")
for folder in folders
if folder.get("displayName", "").lower() == "archive"
),
None,
)
if not archive_folder_id:
raise Exception("No folder named 'Archive' found on the server.")
for filepath in archive_files:
message_id = os.path.basename(filepath).split(".")[
0
] # Extract the Message-ID from the filename
if not dry_run:
status = await post_with_aiohttp(
f"https://graph.microsoft.com/v1.0/me/messages/{message_id}/microsoft.graph.move",
headers,
{"destinationId": archive_folder_id},
)
if status != 201: # 201 Created indicates success
progress.console.print(
f"Failed to move message to 'Archive': {message_id}, {status}"
)
if status == 404:
os.remove(filepath) # Remove the file from local archive if not found
progress.console.print(
f"Message not found on server, removed local copy: {message_id}"
)
elif status == 204:
progress.console.print(f"Moved message to 'Archive': {message_id}")
else:
progress.console.print(
f"[DRY-RUN] Would move message to 'Archive' folder: {message_id}"
)
progress.advance(task_id)
return
async def delete_mail_async(maildir_path, headers, progress, task_id, dry_run=False):
"""
Delete mail from Maildir and Microsoft Graph API.
Args:
maildir_path (str): Path to the Maildir.
headers (dict): Headers including authentication.
progress: Progress instance for updating progress bars.
task_id: ID of the task in the progress bar.
dry_run (bool): If True, don't actually make changes.
Returns:
None
"""
trash_dir = os.path.join(maildir_path, ".Trash", "cur")
trash_files = set(glob.glob(os.path.join(trash_dir, "*.eml*")))
progress.update(task_id, total=len(trash_files))
for filepath in trash_files:
message_id = os.path.basename(filepath).split(".")[
0
] # Extract the Message-ID from the filename
if not dry_run:
progress.console.print(f"Moving message to trash: {message_id}")
status = await delete_with_aiohttp(
f"https://graph.microsoft.com/v1.0/me/messages/{message_id}", headers
)
if status == 204 or status == 404:
os.remove(filepath) # Remove the file from local trash
else:
progress.console.print(f"[DRY-RUN] Would delete message: {message_id}")
progress.advance(task_id)
async def synchronize_maildir_async(
maildir_path, headers, progress, task_id, dry_run=False
):
"""
Synchronize Maildir with Microsoft Graph API.
Args:
maildir_path (str): Path to the Maildir.
headers (dict): Headers including authentication.
progress: Progress instance for updating progress bars.
task_id: ID of the task in the progress bar.
dry_run (bool): If True, don't actually make changes.
Returns:
None
"""
from src.utils.mail_utils.helpers import (
load_last_sync_timestamp,
save_sync_timestamp,
truncate_id,
)
last_sync = load_last_sync_timestamp()
# Find messages moved from "new" to "cur" and mark them as read
new_dir = os.path.join(maildir_path, "new")
cur_dir = os.path.join(maildir_path, "cur")
new_files = set(glob.glob(os.path.join(new_dir, "*.eml*")))
cur_files = set(glob.glob(os.path.join(cur_dir, "*.eml*")))
moved_to_cur = [os.path.basename(f) for f in cur_files - new_files]
progress.update(task_id, total=len(moved_to_cur))
for filename in moved_to_cur:
# TODO: this isn't scalable, we should use a more efficient way to check if the file was modified
if os.path.getmtime(os.path.join(cur_dir, filename)) < last_sync:
progress.update(task_id, advance=1)
continue
message_id = re.sub(
r"\:2.+", "", filename.split(".")[0]
) # Extract the Message-ID from the filename
if not dry_run:
status = await patch_with_aiohttp(
f"https://graph.microsoft.com/v1.0/me/messages/{message_id}",
headers,
{"isRead": True},
)
if status == 404:
os.remove(os.path.join(cur_dir, filename))
else:
progress.console.print(
f"[DRY-RUN] Would mark message as read: {truncate_id(message_id)}"
)
progress.advance(task_id)
# Save the current sync timestamp
if not dry_run:
save_sync_timestamp()
else:
progress.console.print("[DRY-RUN] Would save sync timestamp.")

View File

@@ -0,0 +1,17 @@
"""
Taskwarrior API module for interacting with the Taskwarrior command-line task manager.
"""
from .client import (
create_task,
list_tasks,
complete_task,
delete_task,
)
__all__ = [
"create_task",
"list_tasks",
"complete_task",
"delete_task",
]

View File

@@ -0,0 +1,146 @@
import asyncio
import json
import logging
from typing import Tuple, List, Dict, Any, Optional, Union
async def create_task(task_description: str, tags: List[str] = None, project: str = None,
due: str = None, priority: str = None) -> Tuple[bool, Optional[str]]:
"""
Create a new task using the Taskwarrior CLI.
Args:
task_description: Description of the task
tags: List of tags to apply to the task
project: Project to which the task belongs
due: Due date in the format that Taskwarrior accepts
priority: Priority of the task (H, M, L)
Returns:
Tuple containing:
- Success status (True if operation was successful)
- Task ID or error message
"""
try:
cmd = ["task", "add"]
# Add project if specified
if project:
cmd.append(f"project:{project}")
# Add tags if specified
if tags:
for tag in tags:
cmd.append(f"+{tag}")
# Add due date if specified
if due:
cmd.append(f"due:{due}")
# Add priority if specified
if priority and priority in ["H", "M", "L"]:
cmd.append(f"priority:{priority}")
# Add task description
cmd.append(task_description)
# Convert command list to string
cmd_str = " ".join(cmd)
process = await asyncio.create_subprocess_shell(
cmd_str,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
return True, stdout.decode().strip()
else:
error_msg = stderr.decode().strip()
logging.error(f"Error creating task: {error_msg}")
return False, error_msg
except Exception as e:
logging.error(f"Exception during task creation: {e}")
return False, str(e)
async def list_tasks(filter_str: str = "") -> Tuple[List[Dict[str, Any]], bool]:
"""
List tasks from Taskwarrior.
Args:
filter_str: Optional filter string to pass to Taskwarrior
Returns:
Tuple containing:
- List of task dictionaries
- Success status (True if operation was successful)
"""
try:
cmd = f"task {filter_str} export"
process = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
tasks = json.loads(stdout.decode())
return tasks, True
else:
logging.error(f"Error listing tasks: {stderr.decode()}")
return [], False
except Exception as e:
logging.error(f"Exception during task listing: {e}")
return [], False
async def complete_task(task_id: str) -> bool:
"""
Mark a task as completed.
Args:
task_id: ID of the task to complete
Returns:
True if task was completed successfully, False otherwise
"""
try:
cmd = f"echo 'yes' | task {task_id} done"
process = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
return process.returncode == 0
except Exception as e:
logging.error(f"Exception during task completion: {e}")
return False
async def delete_task(task_id: str) -> bool:
"""
Delete a task.
Args:
task_id: ID of the task to delete
Returns:
True if task was deleted successfully, False otherwise
"""
try:
cmd = f"echo 'yes' | task {task_id} delete"
process = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
return process.returncode == 0
except Exception as e:
logging.error(f"Exception during task deletion: {e}")
return False

300
src/utils/calendar_utils.py Normal file
View File

@@ -0,0 +1,300 @@
"""
Utility module for handling calendar events and iCalendar operations.
"""
import re
import os
from datetime import datetime, timedelta
from dateutil import parser
from dateutil.tz import UTC
import glob
def truncate_id(text, first=8, last=8):
"""
Truncate long IDs or filenames to show just the first and last few characters.
Args:
text: The ID or filename to truncate
first: Number of characters to keep from the beginning
last: Number of characters to keep from the end
Returns:
Truncated string with ellipsis in the middle
"""
if not text or len(text) <= first + last + 3:
return text
return f"{text[:first]}...{text[-last:]}"
def clean_text(text):
"""
Clean text by removing instances of 3 or more consecutive underscores
which can affect readability.
Args:
text: Text to clean
Returns:
Cleaned text
"""
if not text:
return ""
# Replace 3 or more consecutive underscores with 2 underscores
return re.sub(r'_{3,}', '__', text)
def escape_ical_text(text):
"""
Escape text for iCalendar format according to RFC 5545.
Args:
text: Text to escape
Returns:
Escaped text
"""
if not text:
return ""
# First clean multiple underscores
text = clean_text(text)
text = text.replace("\\", "\\\\")
text = text.replace("\n", "\\n")
text = text.replace(",", "\\,")
text = text.replace(";", "\\;")
return text
async def fetch_calendar_events(headers, days_back=1, days_forward=6, fetch_function=None,
start_date=None, end_date=None):
"""
Fetch calendar events from Microsoft Graph API.
Args:
headers: Authentication headers for Microsoft Graph API
days_back: Number of days to look back (default: 1)
days_forward: Number of days to look forward (default: 6)
fetch_function: Async function to use for fetching data (default: None)
Should accept URL and headers as parameters
start_date: Optional explicit start date (datetime object)
end_date: Optional explicit end date (datetime object)
Returns:
Tuple of (events list, total_events count)
"""
if fetch_function is None:
raise ValueError("fetch_function is required for API calls")
# Calculate date range
if start_date is None:
start_date = datetime.now().replace(hour=0, minute=0, second=0) - timedelta(days=days_back)
if end_date is None:
end_of_today = datetime.now().replace(hour=23, minute=59, second=59)
end_date = end_of_today + timedelta(days=days_forward)
# Build the API URL
event_base_url = f"https://graph.microsoft.com/v1.0/me/calendarView?startDateTime={start_date.isoformat()}&endDateTime={end_date.isoformat()}"
calendar_url = f"{event_base_url}&$top=100&$select=start,end,id,iCalUId,subject,bodyPreview,webLink,location,recurrence,showAs,responseStatus,onlineMeeting,lastModifiedDateTime"
# Fetch total count for progress reporting (if needed)
total_event_url = f"{event_base_url}&$count=true&$select=id"
try:
total_response = await fetch_function(total_event_url, headers)
total_events = total_response.get('@odata.count', 0)
except Exception as e:
print(f"Error fetching total events count: {e}")
total_events = 0
# Fetch all calendar events, handling pagination
events = []
while calendar_url:
try:
response_data = await fetch_function(calendar_url, headers)
if response_data:
events.extend(response_data.get('value', []))
# Get the next page URL from @odata.nextLink
calendar_url = response_data.get('@odata.nextLink')
else:
print("Received empty response from calendar API")
break
except Exception as e:
print(f"Error fetching calendar events: {e}")
break
# Only return the events and total_events
return events, total_events
def write_event_to_ical(f, event, start, end):
"""
Write a single event to an iCalendar file.
Args:
f: File-like object to write to
event: Dictionary containing event data
start: Start datetime with timezone information
end: End datetime with timezone information
"""
# Preserve the original timezones
start_tz = start.tzinfo
end_tz = end.tzinfo
f.write(f"BEGIN:VEVENT\nSUMMARY:{escape_ical_text(event['subject'])}\n")
# Handle multi-line description properly
description = event.get('bodyPreview', '')
if description:
escaped_description = escape_ical_text(description)
f.write(f"DESCRIPTION:{escaped_description}\n")
f.write(f"UID:{event.get('iCalUId', '')}\n")
f.write(f"LOCATION:{escape_ical_text(event.get('location', {}).get('displayName', ''))}\n")
f.write(f"CLASS:{event.get('showAs', '')}\n")
f.write(f"STATUS:{event.get('responseStatus', {}).get('response', '')}\n")
if 'onlineMeeting' in event and event['onlineMeeting']:
f.write(f"URL:{event.get('onlineMeeting', {}).get('joinUrl', '')}\n")
# Write start and end times with timezone info in iCalendar format
if start.tzinfo == UTC:
f.write(f"DTSTART:{start.strftime('%Y%m%dT%H%M%SZ')}\n")
else:
tz_name = start_tz.tzname(None) if start_tz else 'UTC'
f.write(f"DTSTART;TZID={tz_name}:{start.strftime('%Y%m%dT%H%M%S')}\n")
if end.tzinfo == UTC:
f.write(f"DTEND:{end.strftime('%Y%m%dT%H%M%SZ')}\n")
else:
tz_name = end_tz.tzname(None) if end_tz else 'UTC'
f.write(f"DTEND;TZID={tz_name}:{end.strftime('%Y%m%dT%H%M%S')}\n")
# Handle recurrence rules
if 'recurrence' in event and event['recurrence']:
for rule in event['recurrence']:
if rule.startswith('RRULE'):
rule_parts = rule.split(';')
new_rule_parts = []
for part in rule_parts:
if part.startswith('UNTIL='):
until_value = part.split('=')[1]
until_date = parser.isoparse(until_value)
if start.tzinfo is not None and until_date.tzinfo is None:
until_date = until_date.replace(tzinfo=start.tzinfo)
new_rule_parts.append(f"UNTIL={until_date.strftime('%Y%m%dT%H%M%SZ')}")
else:
new_rule_parts.append(part)
rule = ';'.join(new_rule_parts)
f.write(f"{rule}\n")
f.write("END:VEVENT\n")
def save_events_to_vdir(events, org_vdir_path, progress, task_id, dry_run=False):
"""
Save events to vdir format (one file per event).
Args:
events: List of event dictionaries
org_vdir_path: Path to save the event files
progress: Progress object for updating UI
task_id: Task ID for progress tracking
dry_run: If True, don't actually write files
Returns:
Number of events processed
"""
if dry_run:
progress.console.print(f"[DRY-RUN] Would save {len(events)} events to vdir format in {org_vdir_path}")
return len(events)
os.makedirs(org_vdir_path, exist_ok=True)
progress.console.print(f"Saving events to vdir format in {org_vdir_path}...")
# Create a dictionary to track existing files and their metadata
existing_files = {}
for file_path in glob.glob(os.path.join(org_vdir_path, "*.ics")):
file_name = os.path.basename(file_path)
file_mod_time = os.path.getmtime(file_path)
existing_files[file_name] = {
'path': file_path,
'mtime': file_mod_time
}
processed_files = set()
for event in events:
progress.advance(task_id)
if 'start' not in event or 'end' not in event:
continue
# Parse start and end times with timezone information
start = parser.isoparse(event['start']['dateTime'])
end = parser.isoparse(event['end']['dateTime'])
uid = event.get('iCalUId', '')
if not uid:
# Generate a unique ID if none exists
uid = f"outlook-{event.get('id', '')}"
# Create a filename based on the UID
safe_filename = re.sub(r'[^\w\-]', '_', uid) + ".ics"
event_path = os.path.join(org_vdir_path, safe_filename)
processed_files.add(safe_filename)
# Check if we need to update this file
should_update = True
if safe_filename in existing_files:
# Only update if the event has been modified since the file was last updated
if 'lastModifiedDateTime' in event:
last_modified = parser.isoparse(event['lastModifiedDateTime']).timestamp()
file_mtime = existing_files[safe_filename]['mtime']
if last_modified <= file_mtime:
should_update = False
progress.console.print(f"Skipping unchanged event: {event['subject']}")
if should_update:
with open(event_path, 'w') as f:
f.write("BEGIN:VCALENDAR\nVERSION:2.0\n")
write_event_to_ical(f, event, start, end)
f.write("END:VCALENDAR\n")
# Remove files for events that no longer exist in the calendar view
for file_name in existing_files:
if file_name not in processed_files:
progress.console.print(f"Removing obsolete event file: {truncate_id(file_name)}")
os.remove(existing_files[file_name]['path'])
progress.console.print(f"Saved {len(events)} events to {org_vdir_path}")
return len(events)
def save_events_to_file(events, output_file, progress, task_id, dry_run=False):
"""
Save all events to a single iCalendar file.
Args:
events: List of event dictionaries
output_file: Path to the output file
progress: Progress object for updating UI
task_id: Task ID for progress tracking
dry_run: If True, don't actually write the file
Returns:
Number of events processed
"""
if dry_run:
progress.console.print(f"[DRY-RUN] Would save events to {output_file}")
return len(events)
os.makedirs(os.path.dirname(output_file), exist_ok=True)
progress.console.print(f"Saving events to {output_file}...")
with open(output_file, 'w') as f:
f.write("BEGIN:VCALENDAR\nVERSION:2.0\n")
for event in events:
progress.advance(task_id)
if 'start' in event and 'end' in event:
# Parse start and end times with timezone information
start = parser.isoparse(event['start']['dateTime'])
end = parser.isoparse(event['end']['dateTime'])
write_event_to_ical(f, event, start, end)
f.write("END:VCALENDAR\n")
progress.console.print(f"Saved events to {output_file}")
return len(events)

123
src/utils/file_icons.py Normal file
View File

@@ -0,0 +1,123 @@
import os
def get_file_icon(name, is_folder, with_color=False):
"""Return a Nerd Font glyph based on file type or extension, optionally with color markup."""
icon = ""
color = ""
if is_folder:
icon = "\uf07b" # Nerd Font folder icon
color = "#FFB86C" # Folder color (orange/yellow)
else:
# Get the file extension
_, ext = os.path.splitext(name.lower())
# Map extensions to icons and colors
icons = {
# Documents
".pdf": ("\uf1c1", "#8BE9FD"), # PDF - cyan
".doc": ("\uf1c2", "#8BE9FD"), ".docx": ("\uf1c2", "#8BE9FD"), # Word - cyan
".xls": ("\uf1c3", "#8BE9FD"), ".xlsx": ("\uf1c3", "#8BE9FD"), # Excel - cyan
".ppt": ("\uf1c4", "#8BE9FD"), ".pptx": ("\uf1c4", "#8BE9FD"), # PowerPoint - cyan
".txt": ("\uf15c", "#8BE9FD"), # Text - cyan
".md": ("\uf48a", "#8BE9FD"), # Markdown - cyan
".rtf": ("\uf15c", "#8BE9FD"), # RTF - cyan
".odt": ("\uf1c2", "#8BE9FD"), # ODT - cyan
# Code/Development
".py": ("\ue73c", "#BD93F9"), # Python - purple
".js": ("\ue781", "#BD93F9"), # JavaScript - purple
".ts": ("\ue628", "#BD93F9"), # TypeScript - purple
".html": ("\uf13b", "#BD93F9"), ".htm": ("\uf13b", "#BD93F9"), # HTML - purple
".css": ("\uf13c", "#BD93F9"), # CSS - purple
".json": ("\ue60b", "#BD93F9"), # JSON - purple
".xml": ("\uf121", "#BD93F9"), # XML - purple
".yml": ("\uf481", "#BD93F9"), ".yaml": ("\uf481", "#BD93F9"), # YAML - purple
".sh": ("\uf489", "#BD93F9"), # Shell script - purple
".bat": ("\uf489", "#BD93F9"), # Batch - purple
".ps1": ("\uf489", "#BD93F9"), # PowerShell - purple
".cpp": ("\ue61d", "#BD93F9"), ".c": ("\ue61e", "#BD93F9"), # C/C++ - purple
".java": ("\ue738", "#BD93F9"), # Java - purple
".rb": ("\ue739", "#BD93F9"), # Ruby - purple
".go": ("\ue724", "#BD93F9"), # Go - purple
".php": ("\ue73d", "#BD93F9"), # PHP - purple
# Images
".jpg": ("\uf1c5", "#50FA7B"), ".jpeg": ("\uf1c5", "#50FA7B"), # JPEG - green
".png": ("\uf1c5", "#50FA7B"), # PNG - green
".gif": ("\uf1c5", "#50FA7B"), # GIF - green
".svg": ("\uf1c5", "#50FA7B"), # SVG - green
".bmp": ("\uf1c5", "#50FA7B"), # BMP - green
".tiff": ("\uf1c5", "#50FA7B"), ".tif": ("\uf1c5", "#50FA7B"), # TIFF - green
".ico": ("\uf1c5", "#50FA7B"), # ICO - green
# Media
".mp3": ("\uf1c7", "#FF79C6"), # Audio - pink
".wav": ("\uf1c7", "#FF79C6"), # Audio - pink
".ogg": ("\uf1c7", "#FF79C6"), # Audio - pink
".mp4": ("\uf1c8", "#FF79C6"), # Video - pink
".avi": ("\uf1c8", "#FF79C6"), # Video - pink
".mov": ("\uf1c8", "#FF79C6"), # Video - pink
".mkv": ("\uf1c8", "#FF79C6"), # Video - pink
".wmv": ("\uf1c8", "#FF79C6"), # Video - pink
# Archives
".zip": ("\uf1c6", "#FF5555"), # ZIP - red
".rar": ("\uf1c6", "#FF5555"), # RAR - red
".7z": ("\uf1c6", "#FF5555"), # 7z - red
".tar": ("\uf1c6", "#FF5555"), ".gz": ("\uf1c6", "#FF5555"), # TAR/GZ - red
".bz2": ("\uf1c6", "#FF5555"), # BZ2 - red
# Others
".exe": ("\uf085", "#F8F8F2"), # Executable - white
".iso": ("\uf0a0", "#F8F8F2"), # ISO - white
".dll": ("\uf085", "#F8F8F2"), # DLL - white
".db": ("\uf1c0", "#F8F8F2"), # Database - white
".sql": ("\uf1c0", "#F8F8F2"), # SQL - white
}
# Set default icon and color for unknown file types
icon = "\uf15b" # Default file icon
color = "#F8F8F2" # Default color (white)
# Get icon and color from the map if the extension exists
if ext in icons:
icon, color = icons[ext]
# Return either plain icon or with color markup
if with_color:
return f"[{color}]{icon}[/]"
else:
return icon
def get_icon_class(name, is_folder):
"""Determine CSS class for the icon based on file type."""
if is_folder:
return "folder-icon"
# Get the file extension
_, ext = os.path.splitext(name.lower())
# Document files
if ext in [".pdf", ".doc", ".docx", ".xls", ".xlsx", ".ppt", ".pptx", ".txt", ".md", ".rtf", ".odt"]:
return "document-icon"
# Code files
elif ext in [".py", ".js", ".ts", ".html", ".htm", ".css", ".json", ".xml", ".yml", ".yaml",
".sh", ".bat", ".ps1", ".cpp", ".c", ".java", ".rb", ".go", ".php"]:
return "code-icon"
# Image files
elif ext in [".jpg", ".jpeg", ".png", ".gif", ".svg", ".bmp", ".tiff", ".tif", ".ico"]:
return "image-icon"
# Archive files
elif ext in [".zip", ".rar", ".7z", ".tar", ".gz", ".bz2"]:
return "archive-icon"
# Media files
elif ext in [".mp3", ".wav", ".ogg", ".mp4", ".avi", ".mov", ".mkv", ".wmv"]:
return "media-icon"
# Default for other file types
return ""

View File

@@ -0,0 +1,3 @@
"""
Mail utilities module for email operations.
"""

View File

@@ -0,0 +1,133 @@
"""
Mail utility helper functions.
"""
import os
import json
import time
from datetime import datetime
import email.utils
def truncate_id(message_id, length=8):
"""
Truncate a message ID to a reasonable length for display.
Args:
message_id (str): The message ID to truncate.
length (int): The number of characters to keep.
Returns:
str: The truncated message ID.
"""
if not message_id:
return ""
if len(message_id) <= length:
return message_id
return f"{message_id[:length]}..."
def load_last_sync_timestamp():
"""
Load the last synchronization timestamp from a file.
Returns:
float: The timestamp of the last synchronization, or 0 if not available.
"""
try:
with open('sync_timestamp.json', 'r') as f:
data = json.load(f)
return data.get('timestamp', 0)
except (FileNotFoundError, json.JSONDecodeError):
return 0
def save_sync_timestamp():
"""
Save the current timestamp as the last synchronization timestamp.
Returns:
None
"""
current_time = time.time()
with open('sync_timestamp.json', 'w') as f:
json.dump({'timestamp': current_time}, f)
def format_datetime(dt_str, format_string="%m/%d %I:%M %p"):
"""
Format a datetime string from ISO format.
Args:
dt_str (str): ISO format datetime string.
format_string (str): Format string for the output.
Returns:
str: Formatted datetime string.
"""
if not dt_str:
return ""
try:
dt = datetime.fromisoformat(dt_str.replace('Z', '+00:00'))
return dt.strftime(format_string)
except (ValueError, AttributeError):
return dt_str
def format_mime_date(dt_str):
"""
Format a datetime string from ISO format to RFC 5322 format for MIME Date headers.
Args:
dt_str (str): ISO format datetime string.
Returns:
str: Formatted datetime string in RFC 5322 format.
"""
if not dt_str:
return ""
try:
dt = datetime.fromisoformat(dt_str.replace('Z', '+00:00'))
return email.utils.format_datetime(dt)
except (ValueError, AttributeError):
return dt_str
def safe_filename(filename):
"""
Convert a string to a safe filename.
Args:
filename (str): Original filename.
Returns:
str: Safe filename with invalid characters replaced.
"""
invalid_chars = '<>:"/\\|?*'
for char in invalid_chars:
filename = filename.replace(char, '_')
return filename
def ensure_directory_exists(directory):
"""
Ensure that a directory exists, creating it if necessary.
Args:
directory (str): The directory path to check/create.
Returns:
None
"""
if not os.path.exists(directory):
os.makedirs(directory)
def parse_maildir_name(filename):
"""
Parse a Maildir filename to extract components.
Args:
filename (str): The maildir filename.
Returns:
tuple: (message_id, flags) components of the filename.
"""
# Maildir filename format: unique-id:flags
if ':' in filename:
message_id, flags = filename.split(':', 1)
else:
message_id = filename
flags = ''
return message_id, flags

View File

@@ -0,0 +1,270 @@
"""
Maildir operations for handling local mail storage.
"""
import os
import email
import base64
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
import time
import aiohttp
import re
from src.utils.calendar_utils import truncate_id
from utils.mail_utils.helpers import safe_filename, ensure_directory_exists, format_datetime, format_mime_date
async def save_mime_to_maildir_async(maildir_path, message, attachments_dir, headers, progress, dry_run=False, download_attachments=False):
"""
Save a message from Microsoft Graph API to a Maildir.
Args:
maildir_path (str): Path to the Maildir.
message (dict): Message data from Microsoft Graph API.
attachments_dir (str): Path to save attachments.
headers (dict): Headers including authentication.
progress: Progress instance for updating progress bars.
dry_run (bool): If True, don't actually save files.
download_attachments (bool): If True, download email attachments.
Returns:
None
"""
message_id = message.get('id', '')
# Determine target directory based on read status
target_dir = os.path.join(maildir_path, 'cur' if message.get('isRead', False) else 'new')
ensure_directory_exists(target_dir)
# Check if the file already exists in either new or cur
new_path = os.path.join(maildir_path, 'new', f"{message_id}.eml")
cur_path = os.path.join(maildir_path, 'cur', f"{message_id}.eml")
if os.path.exists(new_path) or os.path.exists(cur_path):
return # Skip if already exists
# Create MIME email
mime_msg = await create_mime_message_async(message, headers, attachments_dir, progress, download_attachments)
# Only save file if not in dry run mode
if not dry_run:
with open(os.path.join(target_dir, f"{message_id}.eml"), 'wb') as f:
f.write(mime_msg.as_bytes())
else:
progress.console.print(f"[DRY-RUN] Would save message: {message.get('subject', 'No Subject')}")
async def create_mime_message_async(message, headers, attachments_dir, progress, download_attachments=False):
"""
Create a MIME message from Microsoft Graph API message data.
Args:
message (dict): Message data from Microsoft Graph API.
headers (dict): Headers including authentication.
attachments_dir (str): Path to save attachments.
progress: Progress instance for updating progress bars.
download_attachments (bool): If True, download email attachments.
Returns:
MIMEMultipart: The MIME message.
"""
# Create a new MIMEMultipart message
mime_msg = MIMEMultipart()
# Message headers
mime_msg['Message-ID'] = message.get('id', '')
mime_msg['Subject'] = message.get('subject', 'No Subject')
# Sender information
sender = message.get('from', {}).get('emailAddress', {})
if sender:
mime_msg['From'] = f"{sender.get('name', '')} <{sender.get('address', '')}>".strip()
# Recipients
to_recipients = message.get('toRecipients', [])
cc_recipients = message.get('ccRecipients', [])
if to_recipients:
to_list = [f"{r.get('emailAddress', {}).get('name', '')} <{r.get('emailAddress', {}).get('address', '')}>".strip() for r in to_recipients]
mime_msg['To'] = ', '.join(to_list)
if cc_recipients:
cc_list = [f"{r.get('emailAddress', {}).get('name', '')} <{r.get('emailAddress', {}).get('address', '')}>".strip() for r in cc_recipients]
mime_msg['Cc'] = ', '.join(cc_list)
# Date - using the new format_mime_date function to ensure RFC 5322 compliance
received_datetime = message.get('receivedDateTime', '')
if received_datetime:
mime_msg['Date'] = format_mime_date(received_datetime)
# First try the direct body content approach
message_id = message.get('id', '')
try:
# First get the message with body content
body_url = f"https://graph.microsoft.com/v1.0/me/messages/{message_id}?$select=body,bodyPreview"
async with aiohttp.ClientSession() as session:
async with session.get(body_url, headers=headers) as response:
if response.status == 200:
body_data = await response.json()
# Get body content
body_content = body_data.get('body', {}).get('content', '')
body_type = body_data.get('body', {}).get('contentType', 'text')
body_preview = body_data.get('bodyPreview', '')
# If we have body content, use it
if body_content:
if body_type.lower() == 'html':
# Add both HTML and plain text versions
# Plain text conversion
plain_text = re.sub(r'<br\s*/?>', '\n', body_content)
plain_text = re.sub(r'<[^>]*>', '', plain_text)
mime_msg.attach(MIMEText(plain_text, 'plain'))
mime_msg.attach(MIMEText(body_content, 'html'))
else:
# Just plain text
mime_msg.attach(MIMEText(body_content, 'plain'))
elif body_preview:
# Use preview if we have it
mime_msg.attach(MIMEText(f"{body_preview}\n\n[Message preview only. Full content not available.]", 'plain'))
else:
# Fallback to MIME content
progress.console.print(f"No direct body content for message {truncate_id(message_id)}, trying MIME content...")
await fetch_mime_content(mime_msg, message_id, headers, progress)
else:
progress.console.print(f"Failed to get message body: {response.status}. Trying MIME content...")
await fetch_mime_content(mime_msg, message_id, headers, progress)
except Exception as e:
progress.console.print(f"Error getting message body: {e}. Trying MIME content...")
await fetch_mime_content(mime_msg, message_id, headers, progress)
# Handle attachments only if we want to download them
if download_attachments:
await add_attachments_async(mime_msg, message, headers, attachments_dir, progress)
else:
# Add a header to indicate attachment info was skipped
mime_msg['X-Attachments-Skipped'] = 'True'
return mime_msg
async def fetch_mime_content(mime_msg, message_id, headers, progress):
"""
Fetch and add MIME content to a message when direct body access fails.
Args:
mime_msg (MIMEMultipart): The message to add content to.
message_id (str): Message ID.
headers (dict): Headers including authentication.
progress: Progress instance for updating progress bars.
"""
# Fallback to getting the MIME content
message_content_url = f"https://graph.microsoft.com/v1.0/me/messages/{message_id}/$value"
try:
async with aiohttp.ClientSession() as session:
async with session.get(message_content_url, headers=headers) as response:
if response.status == 200:
full_content = await response.text()
# Check for body tags
body_match = re.search(r'<body[^>]*>(.*?)</body>', full_content, re.DOTALL | re.IGNORECASE)
if body_match:
body_content = body_match.group(1)
# Simple HTML to text conversion
body_text = re.sub(r'<br\s*/?>', '\n', body_content)
body_text = re.sub(r'<[^>]*>', '', body_text)
# Add the plain text body
mime_msg.attach(MIMEText(body_text, 'plain'))
# Also add the HTML body
mime_msg.attach(MIMEText(full_content, 'html'))
else:
# Fallback - try to find content between Content-Type: text/html and next boundary
html_parts = re.findall(r'Content-Type: text/html.*?\r?\n\r?\n(.*?)(?:\r?\n\r?\n|$)',
full_content, re.DOTALL | re.IGNORECASE)
if html_parts:
html_content = html_parts[0]
mime_msg.attach(MIMEText(html_content, 'html'))
# Also make plain text version
plain_text = re.sub(r'<br\s*/?>', '\n', html_content)
plain_text = re.sub(r'<[^>]*>', '', plain_text)
mime_msg.attach(MIMEText(plain_text, 'plain'))
else:
# Just use the raw content as text if nothing else works
mime_msg.attach(MIMEText(full_content, 'plain'))
progress.console.print(f"Using raw content for message {message_id} - no body tags found")
else:
error_text = await response.text()
progress.console.print(f"Failed to get MIME content: {response.status} {error_text}")
mime_msg.attach(MIMEText(f"Failed to retrieve message body: HTTP {response.status}", 'plain'))
except Exception as e:
progress.console.print(f"Error retrieving MIME content: {e}")
mime_msg.attach(MIMEText(f"Failed to retrieve message body: {str(e)}", 'plain'))
async def add_attachments_async(mime_msg, message, headers, attachments_dir, progress):
"""
Add attachments to a MIME message.
Args:
mime_msg (MIMEMultipart): The MIME message to add attachments to.
message (dict): Message data from Microsoft Graph API.
headers (dict): Headers including authentication.
attachments_dir (str): Path to save attachments.
progress: Progress instance for updating progress bars.
Returns:
None
"""
message_id = message.get('id', '')
# Get attachments list
attachments_url = f"https://graph.microsoft.com/v1.0/me/messages/{message_id}/attachments"
async with aiohttp.ClientSession() as session:
async with session.get(attachments_url, headers=headers) as response:
if response.status != 200:
return
attachments_data = await response.json()
attachments = attachments_data.get('value', [])
if not attachments:
return
# Create a directory for this message's attachments
message_attachments_dir = os.path.join(attachments_dir, message_id)
ensure_directory_exists(message_attachments_dir)
# Add a header with attachment count
mime_msg['X-Attachment-Count'] = str(len(attachments))
for idx, attachment in enumerate(attachments):
attachment_name = safe_filename(attachment.get('name', 'attachment'))
attachment_type = attachment.get('contentType', 'application/octet-stream')
# Add attachment info to headers for reference
mime_msg[f'X-Attachment-{idx+1}-Name'] = attachment_name
mime_msg[f'X-Attachment-{idx+1}-Type'] = attachment_type
attachment_part = MIMEBase(*attachment_type.split('/', 1))
# Get attachment content
if 'contentBytes' in attachment:
attachment_content = base64.b64decode(attachment['contentBytes'])
# Save attachment to disk
attachment_path = os.path.join(message_attachments_dir, attachment_name)
with open(attachment_path, 'wb') as f:
f.write(attachment_content)
# Add to MIME message
attachment_part.set_payload(attachment_content)
encoders.encode_base64(attachment_part)
attachment_part.add_header('Content-Disposition', f'attachment; filename="{attachment_name}"')
mime_msg.attach(attachment_part)
progress.console.print(f"Downloaded attachment: {attachment_name}")
else:
progress.console.print(f"Skipping attachment with no content: {attachment_name}")