"""Textual screen that downloads a OneDrive item and renders it in the terminal."""

import io
import os
import tempfile
from enum import Enum
from pathlib import Path
from typing import ByteString

import aiohttp
import mammoth
from docx import Document
from textual_image.renderable import Image
from openai import OpenAI
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Container, ScrollableContainer, Horizontal
from textual.screen import Screen
from textual.widgets import Label, Markdown, Button, Footer, Static
from textual import work
from textual.reactive import reactive
from PIL import Image as PILImage

# File extensions for which the export action requests a PDF rendition.
PDF_CONVERTIBLE_FORMATS = {
    "doc", "docx", "epub", "eml", "htm", "html", "md", "msg", "odp", "ods",
    "odt", "pps", "ppsx", "ppt", "pptx", "rtf", "tif", "tiff", "xls", "xlsm",
    "xlsx",
}

# File extensions for which the export action requests a JPG rendition
# (only consulted when the extension is not PDF-convertible).
JPG_CONVERTIBLE_FORMATS = {
    "3g2", "3gp", "3gp2", "3gpp", "3mf", "ai", "arw", "asf", "avi", "bas",
    "bash", "bat", "bmp", "c", "cbl", "cmd", "cool", "cpp", "cr2", "crw",
    "cs", "css", "csv", "cur", "dcm", "dcm30", "dic", "dicm", "dicom", "dng",
    "doc", "docx", "dwg", "eml", "epi", "eps", "epsf", "epsi", "epub", "erf",
    "fbx", "fppx", "gif", "glb", "h", "hcp", "heic", "heif", "htm", "html",
    "ico", "icon", "java", "jfif", "jpeg", "jpg", "js", "json", "key", "log",
    "m2ts", "m4a", "m4v", "markdown", "md", "mef", "mov", "movie", "mp3",
    "mp4", "mp4v", "mrw", "msg", "mts", "nef", "nrw", "numbers", "obj", "odp",
    "odt", "ogg", "orf", "pages", "pano", "pdf", "pef", "php", "pict", "pl",
    "ply", "png", "pot", "potm", "potx", "pps", "ppsx", "ppsxm", "ppt",
    "pptm", "pptx", "ps", "ps1", "psb", "psd", "py", "raw", "rb", "rtf",
    "rw1", "rw2", "sh", "sketch", "sql", "sr2", "stl", "tif", "tiff", "ts",
    "txt", "vb", "webm", "wma", "wmv", "xaml", "xbm", "xcf", "xd", "xml",
    "xpm", "yaml", "yml",
}


class DisplayMode(str, Enum):
    """Which of the three content widgets is currently shown.

    The ``str`` mixin keeps members equal to their plain string values
    (``DisplayMode.TEXT == "text"``), while being a real ``Enum`` provides the
    ``.name`` attribute that ``action_toggle_mode`` relies on.
    """

    IMAGE = "image"
    TEXT = "text"
    MARKDOWN = "markdown"


class DocumentViewerScreen(Screen):
    """Screen for viewing document content from OneDrive items."""

    # Filled in from the item metadata once ``download_document`` has run.
    web_url = reactive("")
    download_url = reactive("")

    # When true, conversion through MarkItDown is attempted before the
    # built-in fallbacks.
    use_markitdown = True

    # Raw bytes of the most recently rendered image.
    image_bytes: ByteString = b""

    BINDINGS = [
        Binding("escape", "close", "Close"),
        Binding("q", "close", "Close"),
        Binding("m", "toggle_mode", "Toggle Mode"),
        Binding("e", "export_and_open", "Export & Open"),
    ]

    def __init__(self, item_id: str, item_name: str, access_token: str, drive_id: str):
        """Initialize the document viewer screen.

        Args:
            item_id: The ID of the item to view.
            item_name: The name of the item to display.
            access_token: The access token for API requests.
            drive_id: The ID of the drive containing the item.
        """
        super().__init__()
        self.item_id = item_id
        self.drive_id = drive_id
        self.item_name = item_name
        self.access_token = access_token
        self.document_content = ""
        self.plain_text_content = ""
        self.content_type = None
        self.raw_content = None
        self.file_extension = Path(item_name).suffix.lower().lstrip(".")
        self.mode: DisplayMode = DisplayMode.TEXT

    # ------------------------------------------------------------------ #
    # Small private helpers
    # ------------------------------------------------------------------ #

    def _item_url(self, suffix: str = "") -> str:
        """Return the Graph API URL of the viewed item, plus ``suffix``."""
        return (
            f"https://graph.microsoft.com/v1.0/drives/{self.drive_id}"
            f"/items/{self.item_id}{suffix}"
        )

    def _auth_headers(self) -> dict:
        """Return the bearer-token header sent with every Graph request."""
        return {"Authorization": f"Bearer {self.access_token}"}

    def _link_markup(self) -> str:
        """Return the markup for the 'Open on Web | Download File' header line."""
        return (
            f'[link="{self.web_url}"]Open on Web[/link] | '
            f'[link="{self.download_url}"]Download File[/link]'
        )

    def _refresh_header(self) -> None:
        """Re-render the title and link labels from the current attributes.

        ``compose`` runs before the metadata request, so at that point the
        URLs are still empty; this pushes the fetched values into the labels.
        """
        self.query_one("#document_title", Label).update(f"Viewing: {self.item_name}")
        self.query_one("#document_link", Label).update(self._link_markup())

    @staticmethod
    def _remove_temp_file(path) -> None:
        """Delete a temporary file, ignoring filesystem errors."""
        if not path:
            return
        try:
            os.unlink(path)
        except OSError:
            # Best-effort cleanup: a leftover temp file must not mask the
            # result (or the error) of the conversion itself.
            pass

    # ------------------------------------------------------------------ #
    # Layout and events
    # ------------------------------------------------------------------ #

    def compose(self) -> ComposeResult:
        """Compose the document viewer screen."""
        yield Container(
            Horizontal(
                Container(Button("✕", id="close_button"), id="button_container"),
                Container(
                    Label(f"Viewing: {self.item_name}", id="document_title"),
                    Label(self._link_markup(), id="document_link"),
                ),
                id="top_container",
            ),
            ScrollableContainer(
                Markdown("", id="markdown_content"),
                Static(
                    "",
                    id="image_content",
                    expand=True,
                ),
                Label("", id="plaintext_content", classes="hidden", markup=False),
                id="content_container",
            ),
            id="document_viewer",
        )
        yield Footer()

    def on_mount(self) -> None:
        """Focus the content area and start downloading the document."""
        self.query_one("#content_container").focus()
        self.download_document()

    async def on_button_pressed(self, event: Button.Pressed) -> None:
        """Handle button press events.

        The ``action_*`` methods are coroutines, so they have to be awaited;
        calling them without ``await`` would only create a coroutine object
        that never runs.
        """
        button_id = event.button.id
        if button_id == "close_button":
            self.dismiss()
        elif button_id == "toggle_mode_button":
            await self.action_toggle_mode()
        elif button_id == "export_button":
            await self.action_export_and_open()

    # ------------------------------------------------------------------ #
    # Conversion capability checks
    # ------------------------------------------------------------------ #

    def is_convertible_format(self) -> bool:
        """Check if the current file is convertible to PDF or JPG."""
        return (
            self.file_extension in PDF_CONVERTIBLE_FORMATS
            or self.file_extension in JPG_CONVERTIBLE_FORMATS
        )

    def get_conversion_format(self) -> str:
        """Get the conversion format for the current file.

        Returns:
            ``"pdf"`` when the extension is PDF-convertible, otherwise
            ``"jpg"`` when it is JPG-convertible, otherwise ``""``.
        """
        if self.file_extension in PDF_CONVERTIBLE_FORMATS:
            return "pdf"
        if self.file_extension in JPG_CONVERTIBLE_FORMATS:
            return "jpg"
        return ""

    # ------------------------------------------------------------------ #
    # Download
    # ------------------------------------------------------------------ #

    @work
    async def download_document(self) -> None:
        """Fetch the item's metadata, then its content, and start processing.

        A non-200 metadata response aborts the whole download. An exception
        during the metadata request is reported and the content download is
        still attempted.
        """
        headers = self._auth_headers()

        try:
            metadata_url = self._item_url()
            async with aiohttp.ClientSession() as session:
                async with session.get(metadata_url, headers=headers) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        self.notify(
                            f"Failed to fetch document metadata: {error_text}",
                            severity="error",
                        )
                        return

                    metadata = await response.json()
                    self.item_name = metadata.get("name", self.item_name)
                    self.file_extension = (
                        Path(self.item_name).suffix.lower().lstrip(".")
                    )
                    self.download_url = metadata.get("@microsoft.graph.downloadUrl", "")
                    self.web_url = metadata.get("webUrl", "")
                    # The header was composed with empty URLs; show the real ones.
                    self._refresh_header()
        except Exception as e:
            self.notify(f"Error fetching document metadata: {str(e)}", severity="error")

        try:
            url = self._item_url("/content")

            # Show loading indicator
            self.query_one("#content_container").loading = True

            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        self.notify(
                            f"Failed to download document: {error_text}",
                            severity="error",
                        )
                        return

                    self.content_type = response.headers.get("content-type", "")
                    self.raw_content = await response.read()

                    # Process the content based on content type
                    self.process_content()
        except Exception as e:
            self.notify(f"Error downloading document: {str(e)}", severity="error")
        finally:
            # Hide loading indicator
            self.query_one("#content_container").loading = False

    # ------------------------------------------------------------------ #
    # Content processing
    # ------------------------------------------------------------------ #

    @work
    async def process_content(self) -> None:
        """Process the downloaded content based on its type.

        Strategies are tried in order and the first one that succeeds wins:
        terminal image rendering (``image/*`` only), MarkItDown conversion
        (when ``use_markitdown`` is set), then the built-in fallbacks.
        """
        if not self.raw_content:
            self.notify("No content to display", severity="warning")
            return

        # ``content_type`` is None until a download has completed.
        content_type = self.content_type or ""

        if content_type.startswith("image/") and self._try_render_image():
            return

        if self.use_markitdown and self._try_markitdown():
            return

        try:
            self._process_fallback(content_type)
        except Exception as e:
            self.notify(f"Error processing content: {str(e)}", severity="error")

    def _try_render_image(self) -> bool:
        """Render ``raw_content`` as a terminal image; return True on success."""
        try:
            self.notify("Attempting to display image in terminal")
            self.image_bytes = self.raw_content

            # Decode the image using BytesIO and Pillow
            img = PILImage.open(io.BytesIO(self.image_bytes))

            # Convert the image to RGB mode if it's not already
            if img.mode != "RGB":
                img = img.convert("RGB")

            # Create a Textual Image renderable
            textual_img = Image(img)
            textual_img.expand = True
            textual_img.width = 120

            self.query_one("#image_content", Static).update(textual_img)
            self.mode = DisplayMode.IMAGE
            self.update_content_display()
            return True
        except Exception as e:
            self.notify(
                f"Error displaying image in terminal: {str(e)}", severity="error"
            )
            return False

    def _try_markitdown(self) -> bool:
        """Convert ``raw_content`` to Markdown with MarkItDown; return True on success."""
        temp_path = None
        try:
            self.notify(
                "Attempting to convert file into Markdown with Markitdown...",
                title="This could take a moment",
                severity="info",
            )
            # Imported lazily so a failed import is reported like any other
            # conversion error and the fallbacks still run.
            from markitdown import MarkItDown

            with tempfile.NamedTemporaryFile(
                suffix=f".{self.file_extension}", delete=False
            ) as temp_file:
                # Record the path first so cleanup happens even if write fails.
                temp_path = temp_file.name
                temp_file.write(self.raw_content)

            client = OpenAI()
            md = MarkItDown(
                enable_plugins=True, llm_client=client, llm_model="gpt-4o"
            )
            result = md.convert(temp_path)

            self.mode = DisplayMode.MARKDOWN
            self.document_content = result.markdown
            self.plain_text_content = result.text_content
            self.update_content_display()
            return True
        except Exception as e:
            self.notify(f"Error using MarkItDown: {str(e)}", severity="error")
            return False
        finally:
            # The file is created with delete=False, so remove it explicitly.
            self._remove_temp_file(temp_path)

    def _process_fallback(self, content_type: str) -> None:
        """Display the content using the built-in, per-content-type handling."""
        if (
            content_type
            == "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
        ):
            self.notify(
                "Processing DOCX file into Markdown using Mammoth...",
                severity="info",
            )
            self.process_docx()
        elif content_type.startswith("text/"):
            # Process as plain text
            text_content = self.raw_content.decode("utf-8", errors="replace")
            self.document_content = text_content
            # TEXT mode renders ``plain_text_content``; without this the
            # viewer would show an empty label for plain-text files.
            self.plain_text_content = text_content
            self.mode = DisplayMode.TEXT
            self.update_content_display()
        elif content_type.startswith("image/"):
            # For images, just display a message
            self.document_content = (
                f"*Image file: {self.item_name}*\n\n"
                "Use the 'Open URL' command to view this image in your browser."
            )
            self.mode = DisplayMode.MARKDOWN
            self.update_content_display()
        else:
            # For other types, display a generic message
            conversion_info = ""
            if self.is_convertible_format():
                conversion_format = self.get_conversion_format()
                conversion_info = (
                    f"\n\nThis file can be converted to {conversion_format.upper()}. "
                    "Press 'e' or click 'Export & Open' to convert and view."
                )

            self.document_content = (
                f"*File: {self.item_name}*\n\n"
                f"Content type: {self.content_type}{conversion_info}\n\n"
                "This file type cannot be displayed directly in the viewer. "
                f"You could [open in your browser]({self.web_url}), "
                f"or [download the file]({self.download_url})."
            )
            self.mode = DisplayMode.MARKDOWN
            self.update_content_display()

    @work
    async def process_docx(self) -> None:
        """Process DOCX content and convert to Markdown and plain text.

        Mammoth produces the Markdown version and python-docx the plain-text
        version. The result is shown in Markdown mode; the plain text remains
        available through the mode toggle.
        """
        temp_path = None
        try:
            # Save the DOCX content to a temporary file
            with tempfile.NamedTemporaryFile(suffix=".docx", delete=False) as temp_file:
                temp_path = temp_file.name
                temp_file.write(self.raw_content)

            # Convert DOCX to Markdown using mammoth
            with open(temp_path, "rb") as docx_file:
                result = mammoth.convert_to_markdown(docx_file)

            # Read the document structure with python-docx for plain text
            doc = Document(temp_path)
            self.plain_text_content = "\n\n".join(
                para.text for para in doc.paragraphs if para.text
            )
            self.document_content = result.value

            self.mode = DisplayMode.MARKDOWN
            self.update_content_display()
        except Exception as e:
            self.notify(f"Error processing DOCX: {str(e)}", severity="error")
        finally:
            # Clean up the temporary file on success and on failure alike.
            self._remove_temp_file(temp_path)

    def update_content_display(self) -> None:
        """Show the widget matching ``self.mode`` and hide the other two."""
        markdown_widget = self.query_one("#markdown_content", Markdown)
        plaintext_widget = self.query_one("#plaintext_content", Label)
        image_widget = self.query_one("#image_content", Static)

        if self.mode == DisplayMode.IMAGE:
            image_widget.remove_class("hidden")
            markdown_widget.add_class("hidden")
            plaintext_widget.add_class("hidden")
        elif self.mode == DisplayMode.MARKDOWN:
            markdown_widget.update(self.document_content)
            markdown_widget.remove_class("hidden")
            image_widget.add_class("hidden")
            plaintext_widget.add_class("hidden")
        else:
            plaintext_widget.update(self.plain_text_content)
            plaintext_widget.remove_class("hidden")
            image_widget.add_class("hidden")
            markdown_widget.add_class("hidden")

    # ------------------------------------------------------------------ #
    # Export
    # ------------------------------------------------------------------ #

    @work
    async def export_and_open_converted_file(self) -> None:
        """Export the file in converted format and open it."""
        try:
            if not self.is_convertible_format():
                self.notify("This file format cannot be converted.", severity="warning")
                return

            conversion_format = self.get_conversion_format()
            if not conversion_format:
                self.notify("No appropriate conversion format found.", severity="error")
                return

            # Build the URL with the format parameter
            url = self._item_url(f"/content?format={conversion_format}")

            # Download the converted file
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=self._auth_headers()) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        self.notify(
                            f"Failed to export document: {error_text}", severity="error"
                        )
                        return

                    converted_content = await response.read()

            # Create temporary file with the right extension. It is kept on
            # disk on purpose: the external application opens it after this
            # method has returned.
            file_name = f"{os.path.splitext(self.item_name)[0]}.{conversion_format}"
            with tempfile.NamedTemporaryFile(
                suffix=f".{conversion_format}",
                delete=False,
                prefix="onedrive_export_",
            ) as temp_file:
                temp_file.write(converted_content)
                temp_path = temp_file.name

            # Open the file using the system default application
            self.notify(
                f"Opening exported {conversion_format.upper()} file: {file_name}"
            )
            # as_uri() yields a correctly escaped file:// URI on every
            # platform, unlike concatenating "file://" with the raw path.
            self.app.open_url(Path(temp_path).as_uri())
        except Exception as e:
            self.notify(f"Error exporting document: {str(e)}", severity="error")
        finally:
            # ``action_export_and_open`` switches the spinner on; switch it
            # off on every exit path so it cannot get stuck.
            self.query_one("#content_container").loading = False

    # ------------------------------------------------------------------ #
    # Actions
    # ------------------------------------------------------------------ #

    async def action_toggle_mode(self) -> None:
        """Toggle between Markdown and plaintext display modes.

        Any mode other than Markdown (including image mode) switches to
        Markdown; Markdown switches to plain text.
        """
        self.notify("Switching Modes", severity="info")
        self.mode = (
            DisplayMode.MARKDOWN
            if self.mode != DisplayMode.MARKDOWN
            else DisplayMode.TEXT
        )
        self.update_content_display()
        mode_name = self.mode.name.capitalize()
        self.notify(f"Switched to {mode_name} mode")

    async def action_export_and_open(self) -> None:
        """Show the loading indicator and start the export worker."""
        self.query_one("#content_container").loading = True
        self.notify("Exporting and opening the converted file...")
        self.export_and_open_converted_file()

    async def action_close(self) -> None:
        """Close the document viewer screen."""
        self.dismiss()