import os
import io
import asyncio
import tempfile
from typing import Optional, Tuple, Set
from pathlib import Path

import aiohttp
import mammoth
from docx import Document
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Container, ScrollableContainer, Horizontal, Vertical
from textual.screen import Screen
from textual.widgets import Label, Markdown, LoadingIndicator, Button, Footer
from textual.worker import Worker, get_current_worker
from textual import work
from textual.reactive import Reactive, reactive

# File extensions (lower-case, without the dot) for which this viewer requests
# a PDF rendition via the ``?format=pdf`` query parameter.
PDF_CONVERTIBLE_FORMATS = {
    "doc", "docx", "epub", "eml", "htm", "html", "md", "msg", "odp", "ods",
    "odt", "pps", "ppsx", "ppt", "pptx", "rtf", "tif", "tiff", "xls", "xlsm",
    "xlsx"
}

# File extensions (lower-case, without the dot) for which this viewer requests
# a JPG rendition via the ``?format=jpg`` query parameter. PDF takes precedence
# when an extension appears in both sets (see ``get_conversion_format``).
JPG_CONVERTIBLE_FORMATS = {
    "3g2", "3gp", "3gp2", "3gpp", "3mf", "ai", "arw", "asf", "avi", "bas",
    "bash", "bat", "bmp", "c", "cbl", "cmd", "cool", "cpp", "cr2", "crw",
    "cs", "css", "csv", "cur", "dcm", "dcm30", "dic", "dicm", "dicom", "dng",
    "doc", "docx", "dwg", "eml", "epi", "eps", "epsf", "epsi", "epub", "erf",
    "fbx", "fppx", "gif", "glb", "h", "hcp", "heic", "heif", "htm", "html",
    "ico", "icon", "java", "jfif", "jpeg", "jpg", "js", "json", "key", "log",
    "m2ts", "m4a", "m4v", "markdown", "md", "mef", "mov", "movie", "mp3",
    "mp4", "mp4v", "mrw", "msg", "mts", "nef", "nrw", "numbers", "obj", "odp",
    "odt", "ogg", "orf", "pages", "pano", "pdf", "pef", "php", "pict", "pl",
    "ply", "png", "pot", "potm", "potx", "pps", "ppsx", "ppsxm", "ppt",
    "pptm", "pptx", "ps", "ps1", "psb", "psd", "py", "raw", "rb", "rtf",
    "rw1", "rw2", "sh", "sketch", "sql", "sr2", "stl", "tif", "tiff", "ts",
    "txt", "vb", "webm", "wma", "wmv", "xaml", "xbm", "xcf", "xd", "xml",
    "xpm", "yaml", "yml"
}


class DocumentViewerScreen(Screen):
    """Screen for viewing document content from OneDrive items.

    On mount the screen fetches the item's metadata and raw content from the
    Microsoft Graph API, then renders it either as Markdown or as plain text.
    DOCX files are converted with mammoth / python-docx; other convertible
    formats can be exported to PDF or JPG and opened externally.
    """

    # Populated from the item metadata once it has been fetched; the watchers
    # below refresh the link label whenever either value changes.
    web_url: Reactive[str] = reactive("")
    download_url: Reactive[str] = reactive("")

    BINDINGS = [
        Binding("escape", "close", "Close"),
        Binding("q", "close", "Close"),
        Binding("m", "toggle_mode", "Toggle Mode"),
        Binding("e", "export_and_open", "Export & Open"),
    ]

    def __init__(self, item_id: str, item_name: str, access_token: str, drive_id: str):
        """Initialize the document viewer screen.

        Args:
            item_id: The ID of the item to view.
            item_name: The name of the item to display.
            access_token: The access token for API requests.
            drive_id: The ID of the drive containing the item.
        """
        super().__init__()
        self.item_id = item_id
        self.drive_id = drive_id
        self.item_name = item_name
        self.access_token = access_token
        self.document_content = ""
        self.plain_text_content = ""
        self.is_markdown_mode = False
        self.content_type = None
        self.raw_content = None
        self.file_extension = Path(item_name).suffix.lower().lstrip('.')

    def _links_markup(self) -> str:
        """Return the markup for the "Open on Web | Download File" label."""
        return (
            f'[link="{self.web_url}"]Open on Web[/link] | '
            f'[link="{self.download_url}"]Download File[/link]'
        )

    def _refresh_links(self) -> None:
        """Re-render the link label from the current URL values.

        Iterating the query (instead of ``query_one``) makes this a no-op
        when the label has not been composed yet.
        """
        for link_label in self.query("#document_link").results(Label):
            link_label.update(self._links_markup())

    def watch_web_url(self, value: str) -> None:
        """Refresh the link label when ``web_url`` changes."""
        self._refresh_links()

    def watch_download_url(self, value: str) -> None:
        """Refresh the link label when ``download_url`` changes."""
        self._refresh_links()

    def compose(self) -> ComposeResult:
        """Compose the document viewer screen."""
        yield Container(
            Horizontal(
                Container(
                    Label(f"Viewing: {self.item_name}", id="document_title"),
                    Label(self._links_markup(), id="document_link"),
                ),
                Container(
                    Button("Close", id="close_button"),
                    id="button_container"
                ),
                id="top_container"
            ),
            ScrollableContainer(
                Markdown("", id="markdown_content"),
                Label("", id="plaintext_content", classes="hidden", markup=False),
                id="content_container",
            ),
            id="document_viewer"
        )
        yield Footer()

    def on_mount(self) -> None:
        """Handle screen mount event by starting the download worker."""
        self.download_document()

    async def on_button_pressed(self, event: Button.Pressed) -> None:
        """Handle button press events.

        The toggle/export actions are coroutines, so they must be awaited;
        calling them bare would create coroutine objects that never run.
        """
        button_id = event.button.id
        if button_id == "close_button":
            self.dismiss()
        elif button_id == "toggle_mode_button":
            await self.action_toggle_mode()
        elif button_id == "export_button":
            await self.action_export_and_open()

    def is_convertible_format(self) -> bool:
        """Check if the current file is convertible to PDF or JPG."""
        return (
            self.file_extension in PDF_CONVERTIBLE_FORMATS
            or self.file_extension in JPG_CONVERTIBLE_FORMATS
        )

    def get_conversion_format(self) -> Optional[str]:
        """Get the appropriate conversion format for the current file.

        Returns:
            ``"pdf"`` or ``"jpg"`` (PDF is preferred when both apply), or
            ``None`` when the file extension is not convertible.
        """
        if self.file_extension in PDF_CONVERTIBLE_FORMATS:
            return "pdf"
        if self.file_extension in JPG_CONVERTIBLE_FORMATS:
            return "jpg"
        return None

    @work
    async def download_document(self) -> None:
        """Download the document metadata and content.

        A failure while fetching metadata with a non-200 status aborts the
        download; an exception during the metadata request is reported and
        the content download is still attempted.
        """
        headers = {"Authorization": f"Bearer {self.access_token}"}

        try:
            metadata_url = f"https://graph.microsoft.com/v1.0/drives/{self.drive_id}/items/{self.item_id}"
            async with aiohttp.ClientSession() as session:
                async with session.get(metadata_url, headers=headers) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        self.notify(f"Failed to fetch document metadata: {error_text}", severity="error")
                        return
                    metadata = await response.json()
                    self.item_name = metadata.get("name", self.item_name)
                    self.file_extension = Path(self.item_name).suffix.lower().lstrip('.')
                    self.download_url = metadata.get("@microsoft.graph.downloadUrl", "")
                    self.web_url = metadata.get("webUrl", "")
        except Exception as e:
            self.notify(f"Error downloading document: {str(e)}", severity="error")

        try:
            url = f"https://graph.microsoft.com/v1.0/drives/{self.drive_id}/items/{self.item_id}/content"
            # Show loading indicator
            self.query_one("#content_container").loading = True
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        self.notify(f"Failed to download document: {error_text}", severity="error")
                        return
                    self.content_type = response.headers.get("content-type", "")
                    self.raw_content = await response.read()
                    # Process the content based on content type
                    self.process_content()
        except Exception as e:
            self.notify(f"Error downloading document: {str(e)}", severity="error")
        finally:
            # Hide loading indicator
            self.query_one("#content_container").loading = False

    @work
    async def process_content(self) -> None:
        """Process the downloaded content based on its type.

        Both ``document_content`` (Markdown view) and ``plain_text_content``
        (plain-text view) are populated so that whichever mode is active
        shows something and toggling never yields an empty screen.
        """
        if not self.raw_content:
            self.notify("No content to display", severity="warning")
            return

        try:
            content_type = self.content_type or ""
            # Compare on the bare media type: the header may carry parameters
            # (e.g. "; charset=utf-8") and is case-insensitive.
            media_type = content_type.split(";", 1)[0].strip().lower()

            if media_type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
                # Process as DOCX
                self.process_docx()
            elif media_type.startswith("text/"):
                # Process as plain text
                text_content = self.raw_content.decode("utf-8", errors="replace")
                self.document_content = text_content
                self.plain_text_content = text_content
                self.update_content_display()
            elif media_type.startswith("image/"):
                # For images, just display a message
                self.document_content = f"*Image file: {self.item_name}*\n\nUse the 'Open URL' command to view this image in your browser."
                self.plain_text_content = self.document_content
                self.is_markdown_mode = True
                self.update_content_display()
            else:
                # For other types, display a generic message
                conversion_info = ""
                if self.is_convertible_format():
                    conversion_format = self.get_conversion_format()
                    conversion_info = f"\n\nThis file can be converted to {conversion_format.upper()}. Press 'e' or click 'Export & Open' to convert and view."
                self.document_content = f"*File: {self.item_name}*\n\nContent type: {self.content_type}{conversion_info}\n\nThis file type cannot be displayed directly in the viewer. You could [open in your browser]({self.web_url}), or [download the file]({self.download_url})."
                self.plain_text_content = self.document_content
                self.is_markdown_mode = True
                self.update_content_display()
        except Exception as e:
            self.notify(f"Error processing content: {str(e)}", severity="error")

    @work
    async def process_docx(self) -> None:
        """Process DOCX content and convert to Markdown and plain text."""
        temp_path = None
        try:
            # Save the DOCX content to a temporary file
            with tempfile.NamedTemporaryFile(suffix=".docx", delete=False) as temp_file:
                temp_file.write(self.raw_content)
                temp_path = temp_file.name

            # Convert DOCX to Markdown using mammoth
            with open(temp_path, "rb") as docx_file:
                result = mammoth.convert_to_markdown(docx_file)

            # Read the document structure with python-docx for plain text
            doc = Document(temp_path)
            self.plain_text_content = "\n\n".join(
                para.text for para in doc.paragraphs if para.text
            )
            self.document_content = result.value

            self.update_content_display()
        except Exception as e:
            self.notify(f"Error processing DOCX: {str(e)}", severity="error")
        finally:
            # Always remove the temporary file, including when conversion
            # fails part-way through.
            if temp_path is not None:
                try:
                    os.unlink(temp_path)
                except OSError:
                    # Best-effort cleanup: a leftover temp file must not mask
                    # the real outcome of the conversion.
                    pass

    def update_content_display(self) -> None:
        """Update the content display with the processed document content."""
        markdown_widget = self.query_one("#markdown_content", Markdown)
        plaintext_widget = self.query_one("#plaintext_content", Label)

        if self.is_markdown_mode:
            markdown_widget.update(self.document_content)
            markdown_widget.remove_class("hidden")
            plaintext_widget.add_class("hidden")
        else:
            plaintext_widget.update(self.plain_text_content)
            plaintext_widget.remove_class("hidden")
            markdown_widget.add_class("hidden")

    @work
    async def export_and_open_converted_file(self) -> None:
        """Export the file in converted format and open it.

        The loading indicator switched on by ``action_export_and_open`` is
        cleared on every exit path, not only on success.
        """
        try:
            if not self.is_convertible_format():
                self.notify("This file format cannot be converted.", severity="warning")
                return

            conversion_format = self.get_conversion_format()
            if not conversion_format:
                self.notify("No appropriate conversion format found.", severity="error")
                return

            # Build the URL with the format parameter
            url = f"https://graph.microsoft.com/v1.0/drives/{self.drive_id}/items/{self.item_id}/content?format={conversion_format}"
            headers = {"Authorization": f"Bearer {self.access_token}"}

            # Download the converted file
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        self.notify(f"Failed to export document: {error_text}", severity="error")
                        return
                    converted_content = await response.read()

            # Create temporary file with the right extension. It is kept on
            # disk (delete=False) because an external application opens it.
            file_name = f"{os.path.splitext(self.item_name)[0]}.{conversion_format}"
            with tempfile.NamedTemporaryFile(
                suffix=f".{conversion_format}",
                delete=False,
                prefix="onedrive_export_",
            ) as temp_file:
                temp_file.write(converted_content)
                temp_path = temp_file.name

            # Open the file using the system default application
            self.notify(f"Opening exported {conversion_format.upper()} file: {file_name}")
            # as_uri() yields a well-formed file URI on every platform
            # (drive letters, percent-encoding of special characters).
            self.app.open_url(Path(temp_path).as_uri())
        except Exception as e:
            self.notify(f"Error exporting document: {str(e)}", severity="error")
        finally:
            self.query_one("#content_container").loading = False

    async def action_toggle_mode(self) -> None:
        """Toggle between Markdown and plaintext display modes."""
        self.notify("Switching Modes", severity="info")
        self.is_markdown_mode = not self.is_markdown_mode
        self.update_content_display()
        mode_name = "Markdown" if self.is_markdown_mode else "Plain Text"
        self.notify(f"Switched to {mode_name} mode")

    async def action_export_and_open(self) -> None:
        """Export the file in converted format and open it."""
        self.query_one("#content_container").loading = True
        self.notify("Exporting and opening the converted file...")
        self.export_and_open_converted_file()

    async def action_close(self) -> None:
        """Close the document viewer screen."""
        self.dismiss()