Files
luk/maildir_gtd/screens/DocumentViewer.py
2025-05-12 17:19:34 -06:00

316 lines
14 KiB
Python

import os
import io
import asyncio
import tempfile
from typing import Optional, Tuple, Set
from pathlib import Path
import aiohttp
import mammoth
from docx import Document
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Container, ScrollableContainer, Horizontal, Vertical
from textual.screen import Screen
from textual.widgets import Label, Markdown, LoadingIndicator, Button, Footer
from textual.worker import Worker, get_current_worker
from textual import work
from textual.reactive import Reactive, reactive
# File extensions (lower-case, without the leading dot) that the viewer treats
# as exportable. The viewer passes the chosen target ("pdf" or "jpg") as the
# ``format`` query parameter when it requests the item's content.

# Extensions that can be exported as PDF. This set is consulted first.
PDF_CONVERTIBLE_FORMATS = {
    "doc", "docx", "epub", "eml", "htm", "html", "md", "msg", "odp",
    "ods", "odt", "pps", "ppsx", "ppt", "pptx", "rtf", "tif", "tiff",
    "xls", "xlsm", "xlsx",
}

# Extensions that can be exported as a JPG image. Used only when the
# extension is not already covered by the PDF set above.
JPG_CONVERTIBLE_FORMATS = {
    "3g2", "3gp", "3gp2", "3gpp", "3mf", "ai", "arw", "asf", "avi",
    "bas", "bash", "bat", "bmp", "c", "cbl", "cmd", "cool", "cpp",
    "cr2", "crw", "cs", "css", "csv", "cur", "dcm", "dcm30", "dic",
    "dicm", "dicom", "dng", "doc", "docx", "dwg", "eml", "epi", "eps",
    "epsf", "epsi", "epub", "erf", "fbx", "fppx", "gif", "glb", "h",
    "hcp", "heic", "heif", "htm", "html", "ico", "icon", "java", "jfif",
    "jpeg", "jpg", "js", "json", "key", "log", "m2ts", "m4a", "m4v",
    "markdown", "md", "mef", "mov", "movie", "mp3", "mp4", "mp4v", "mrw",
    "msg", "mts", "nef", "nrw", "numbers", "obj", "odp", "odt", "ogg",
    "orf", "pages", "pano", "pdf", "pef", "php", "pict", "pl", "ply",
    "png", "pot", "potm", "potx", "pps", "ppsx", "ppsxm", "ppt", "pptm",
    "pptx", "ps", "ps1", "psb", "psd", "py", "raw", "rb", "rtf", "rw1",
    "rw2", "sh", "sketch", "sql", "sr2", "stl", "tif", "tiff", "ts",
    "txt", "vb", "webm", "wma", "wmv", "xaml", "xbm", "xcf", "xd", "xml",
    "xpm", "yaml", "yml",
}
class DocumentViewerScreen(Screen):
    """Screen for viewing document content from OneDrive items.

    On mount the screen fetches the item's metadata and raw content from the
    Microsoft Graph API, then renders it either as Markdown or as plain text.
    DOCX files are converted locally; other convertible formats can be
    exported to PDF/JPG through Graph and opened externally.
    """

    # Populated from the item's metadata once it has been fetched.
    web_url: Reactive[str] = reactive("")
    download_url: Reactive[str] = reactive("")

    BINDINGS = [
        Binding("escape", "close", "Close"),
        Binding("q", "close", "Close"),
        Binding("m", "toggle_mode", "Toggle Mode"),
        Binding("e", "export_and_open", "Export & Open"),
    ]

    # MIME type reported for Word (.docx) documents.
    _DOCX_MIME_TYPE = (
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
    )

    def __init__(self, item_id: str, item_name: str, access_token: str, drive_id: str):
        """Initialize the document viewer screen.

        Args:
            item_id: The ID of the item to view.
            item_name: The name of the item to display.
            access_token: The access token for API requests.
            drive_id: The ID of the drive containing the item.
        """
        super().__init__()
        self.item_id = item_id
        self.drive_id = drive_id
        self.item_name = item_name
        self.access_token = access_token
        # Markdown rendering of the document (shown in Markdown mode).
        self.document_content = ""
        # Plain-text rendering of the document (shown in plain-text mode).
        self.plain_text_content = ""
        self.is_markdown_mode = False
        self.content_type: Optional[str] = None
        self.raw_content: Optional[bytes] = None
        # Lower-case extension without the leading dot, e.g. "docx".
        self.file_extension = Path(item_name).suffix.lower().lstrip('.')

    def _link_markup(self) -> str:
        """Return the markup for the "Open on Web | Download File" row."""
        return (
            f'[link="{self.web_url}"]Open on Web[/link] | '
            f'[link="{self.download_url}"]Download File[/link]'
        )

    def _refresh_link_label(self) -> None:
        """Re-render the link row so it reflects the current URLs.

        The label is first built in ``compose`` while both URLs are still
        empty, so it has to be updated once the metadata has been fetched.
        Does nothing if the label is not (or no longer) in the DOM.
        """
        for label in self.query("#document_link").results(Label):
            label.update(self._link_markup())

    def compose(self) -> ComposeResult:
        """Compose the document viewer screen."""
        yield Container(
            Horizontal(
                Container(
                    Button("", id="close_button"),
                    id="button_container"
                ),
                Container(
                    Label(f"Viewing: {self.item_name}", id="document_title"),
                    Label(self._link_markup(), id="document_link"),
                ),
                id="top_container"
            ),
            ScrollableContainer(
                Markdown("", id="markdown_content"),
                # markup=False so document text is never parsed as markup.
                Label("", id="plaintext_content", classes="hidden", markup=False),
                id="content_container",
            ),
            id="document_viewer"
        )
        yield Footer()

    def on_mount(self) -> None:
        """Handle screen mount event: focus the content and start the download."""
        self.query_one("#content_container").focus()
        self.download_document()

    async def on_button_pressed(self, event: Button.Pressed) -> None:
        """Handle button press events.

        The toggle/export actions are coroutines, so they must be awaited;
        calling them without ``await`` would silently do nothing.
        """
        button_id = event.button.id
        if button_id == "close_button":
            self.dismiss()
        elif button_id == "toggle_mode_button":
            await self.action_toggle_mode()
        elif button_id == "export_button":
            await self.action_export_and_open()

    def is_convertible_format(self) -> bool:
        """Check if the current file is convertible to PDF or JPG."""
        return self.get_conversion_format() is not None

    def get_conversion_format(self) -> Optional[str]:
        """Get the appropriate conversion format for the current file.

        Returns:
            ``"pdf"`` if the extension is PDF-convertible, otherwise ``"jpg"``
            if it is JPG-convertible, otherwise ``None``.
        """
        if self.file_extension in PDF_CONVERTIBLE_FORMATS:
            return "pdf"
        if self.file_extension in JPG_CONVERTIBLE_FORMATS:
            return "jpg"
        return None

    @work
    async def download_document(self) -> None:
        """Fetch the item's metadata, then download and process its content.

        A failed metadata response (non-200) aborts the whole operation; an
        exception while fetching metadata is reported but the content
        download is still attempted.
        """
        headers = {"Authorization": f"Bearer {self.access_token}"}
        item_url = f"https://graph.microsoft.com/v1.0/drives/{self.drive_id}/items/{self.item_id}"

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(item_url, headers=headers) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        self.notify(f"Failed to fetch document metadata: {error_text}", severity="error")
                        return
                    metadata = await response.json()
            self.item_name = metadata.get("name", self.item_name)
            self.file_extension = Path(self.item_name).suffix.lower().lstrip('.')
            self.download_url = metadata.get("@microsoft.graph.downloadUrl", "")
            self.web_url = metadata.get("webUrl", "")
            # The link row was composed with empty URLs; show the real ones.
            self._refresh_link_label()
        except Exception as e:
            self.notify(f"Error downloading document: {str(e)}", severity="error")

        try:
            url = f"{item_url}/content"
            # Show loading indicator
            self.query_one("#content_container").loading = True
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        self.notify(f"Failed to download document: {error_text}", severity="error")
                        return
                    self.content_type = response.headers.get("content-type", "")
                    self.raw_content = await response.read()
            # Process the content based on content type
            self.process_content()
        except Exception as e:
            self.notify(f"Error downloading document: {str(e)}", severity="error")
        finally:
            # Hide loading indicator (also runs on the early return above)
            self.query_one("#content_container").loading = False

    @work
    async def process_content(self) -> None:
        """Process the downloaded content based on its type.

        Fills ``document_content`` and ``plain_text_content`` so that both
        display modes have something to show, then refreshes the display.
        """
        if not self.raw_content:
            self.notify("No content to display", severity="warning")
            return
        # Drop MIME parameters such as "; charset=utf-8" before comparing.
        mime_type = (self.content_type or "").split(";", 1)[0].strip().lower()
        try:
            if mime_type == self._DOCX_MIME_TYPE:
                # Process as DOCX (updates the display itself when done)
                self.process_docx()
            elif mime_type.startswith("text/"):
                # Plain text: the same text serves both display modes, so the
                # default plain-text mode is not left showing an empty label.
                text_content = self.raw_content.decode("utf-8", errors="replace")
                self.document_content = text_content
                self.plain_text_content = text_content
                self.update_content_display()
            elif mime_type.startswith("image/"):
                # For images, just display a message
                message = f"*Image file: {self.item_name}*\n\nUse the 'Open URL' command to view this image in your browser."
                self.document_content = message
                self.plain_text_content = message
                # The message uses Markdown emphasis, so render it as Markdown.
                self.is_markdown_mode = True
                self.update_content_display()
            else:
                # For other types, display a generic message
                conversion_info = ""
                conversion_format = self.get_conversion_format()
                if conversion_format:
                    conversion_info = f"\n\nThis file can be converted to {conversion_format.upper()}. Press 'e' or click 'Export & Open' to convert and view."
                message = f"*File: {self.item_name}*\n\nContent type: {self.content_type}{conversion_info}\n\nThis file type cannot be displayed directly in the viewer. You could [open in your browser]({self.web_url}), or [download the file]({self.download_url})."
                self.document_content = message
                self.plain_text_content = message
                self.is_markdown_mode = True
                self.update_content_display()
        except Exception as e:
            self.notify(f"Error processing content: {str(e)}", severity="error")

    @work
    async def process_docx(self) -> None:
        """Process DOCX content and convert to Markdown and plain text.

        The raw bytes are written to a temporary file, which is always
        removed afterwards, even when conversion fails.
        """
        temp_path: Optional[str] = None
        try:
            # Save the DOCX content to a temporary file
            with tempfile.NamedTemporaryFile(suffix=".docx", delete=False) as temp_file:
                temp_file.write(self.raw_content)
                temp_path = temp_file.name
            # Convert DOCX to Markdown using mammoth
            with open(temp_path, "rb") as docx_file:
                result = mammoth.convert_to_markdown(docx_file)
            # Read the document structure with python-docx for plain text
            doc = Document(temp_path)
            self.plain_text_content = "\n\n".join(
                para.text for para in doc.paragraphs if para.text
            )
            self.document_content = result.value
            self.update_content_display()
        except Exception as e:
            self.notify(f"Error processing DOCX: {str(e)}", severity="error")
        finally:
            # Clean up temporary file
            if temp_path is not None:
                try:
                    os.unlink(temp_path)
                except OSError:
                    # Best-effort cleanup: a leftover temp file is harmless.
                    pass

    def update_content_display(self) -> None:
        """Update the content display with the processed document content.

        Shows the Markdown widget or the plain-text label depending on
        ``is_markdown_mode`` and hides the other one.
        """
        markdown_widget = self.query_one("#markdown_content", Markdown)
        plaintext_widget = self.query_one("#plaintext_content", Label)
        if self.is_markdown_mode:
            markdown_widget.update(self.document_content)
            markdown_widget.remove_class("hidden")
            plaintext_widget.add_class("hidden")
        else:
            plaintext_widget.update(self.plain_text_content)
            plaintext_widget.remove_class("hidden")
            markdown_widget.add_class("hidden")

    @work
    async def export_and_open_converted_file(self) -> None:
        """Export the file in converted format and open it.

        The loading indicator on the content container is cleared on every
        exit path (unsupported format, HTTP error, exception or success).
        """
        try:
            conversion_format = self.get_conversion_format()
            if conversion_format is None:
                self.notify("This file format cannot be converted.", severity="warning")
                return
            # Build the URL with the format parameter
            url = f"https://graph.microsoft.com/v1.0/drives/{self.drive_id}/items/{self.item_id}/content?format={conversion_format}"
            headers = {"Authorization": f"Bearer {self.access_token}"}
            # Download the converted file
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        self.notify(f"Failed to export document: {error_text}", severity="error")
                        return
                    converted_content = await response.read()
            # Create temporary file with the right extension. It is kept
            # (delete=False) because the external application opens it later.
            file_name = f"{os.path.splitext(self.item_name)[0]}.{conversion_format}"
            with tempfile.NamedTemporaryFile(
                suffix=f".{conversion_format}",
                delete=False,
                prefix="onedrive_export_",
            ) as temp_file:
                temp_file.write(converted_content)
                temp_path = temp_file.name
            # Open the file using the system default application
            self.notify(f"Opening exported {conversion_format.upper()} file: {file_name}")
            # as_uri() yields a correctly escaped, platform-appropriate file URL.
            self.app.open_url(Path(temp_path).as_uri())
        except Exception as e:
            self.notify(f"Error exporting document: {str(e)}", severity="error")
        finally:
            self.query_one("#content_container").loading = False

    async def action_toggle_mode(self) -> None:
        """Toggle between Markdown and plaintext display modes."""
        self.notify("Switching Modes", severity="info")
        self.is_markdown_mode = not self.is_markdown_mode
        self.update_content_display()
        mode_name = "Markdown" if self.is_markdown_mode else "Plain Text"
        self.notify(f"Switched to {mode_name} mode")

    async def action_export_and_open(self) -> None:
        """Show the loading indicator and start the export worker."""
        self.query_one("#content_container").loading = True
        self.notify("Exporting and opening the converted file...")
        self.export_and_open_converted_file()

    async def action_close(self) -> None:
        """Close the document viewer screen."""
        self.dismiss()