import os
import io
import asyncio
import tempfile
from typing import Optional

import aiohttp
import mammoth
from docx import Document
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Container, ScrollableContainer, Horizontal
from textual.screen import Screen
from textual.widgets import Label, Markdown, LoadingIndicator, Button, Footer
from textual.worker import Worker, get_current_worker
from textual import work


class DocumentViewerScreen(Screen):
    """Screen for viewing document content from OneDrive items.

    The item is downloaded from the Microsoft Graph ``/content`` endpoint and
    shown either rendered as Markdown or as plain text; the ``m`` binding and
    the "Toggle Mode" button switch between the two views.
    """

    BINDINGS = [
        Binding("escape", "close", "Close"),
        Binding("q", "close", "Close"),
        Binding("m", "toggle_mode", "Toggle Mode"),
    ]

    # MIME type that identifies a Word (DOCX) document.
    _DOCX_MIME_TYPE = (
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
    )
    # MIME types that say nothing about the real format; for these the file
    # name extension is consulted instead.
    _UNSPECIFIC_MIME_TYPES = ("", "application/octet-stream")

    def __init__(self, item_id: str, item_name: str, access_token: str, drive_id: str):
        """Initialize the document viewer screen.

        Args:
            item_id: The ID of the item to view.
            item_name: The name of the item to display.
            access_token: The access token for API requests.
            drive_id: The ID of the drive that contains the item.
        """
        super().__init__()
        self.item_id = item_id
        self.drive_id = drive_id
        self.item_name = item_name
        self.access_token = access_token
        # Markdown and plain-text renderings of the downloaded item.
        self.document_content = ""
        self.plain_text_content = ""
        # The screen starts in plain-text mode.
        self.is_markdown_mode = False
        # Raw response data, filled in by download_document().
        self.content_type: Optional[str] = None
        self.raw_content: Optional[bytes] = None

    def compose(self) -> ComposeResult:
        """Compose the document viewer screen."""
        yield Container(
            Horizontal(
                Label(f"Viewing: {self.item_name}", id="document_title"),
                Container(
                    Button("Close", id="close_button"),
                    Button("Toggle Mode", id="toggle_mode_button"),
                    id="button_container"
                ),
                id="top_container"
            ),
            ScrollableContainer(
                Markdown("", id="markdown_content"),
                Label("", id="plaintext_content", classes="hidden", markup=False),
                id="content_container",
            ),
            id="document_viewer"
        )
        yield Footer()

    def on_mount(self) -> None:
        """Handle screen mount event by starting the download."""
        self.download_document()

    async def on_button_pressed(self, event: Button.Pressed) -> None:
        """Handle button press events.

        This handler is a coroutine because ``action_toggle_mode`` is one:
        calling it without ``await`` would only create a coroutine object and
        the display mode would never change.
        """
        if event.button.id == "close_button":
            self.dismiss()
        elif event.button.id == "toggle_mode_button":
            await self.action_toggle_mode()

    @work
    async def download_document(self) -> None:
        """Download the document content and hand it to ``process_content``.

        Failures are reported with an error notification instead of being
        raised. The loading indicator on the content container is shown for
        the duration of the download.
        """
        try:
            url = f"https://graph.microsoft.com/v1.0/drives/{self.drive_id}/items/{self.item_id}/content"
            headers = {"Authorization": f"Bearer {self.access_token}"}

            # Show loading indicator
            self.query_one("#content_container").loading = True

            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        self.notify(f"Failed to download document: {error_text}", severity="error")
                        return

                    self.content_type = response.headers.get("content-type", "")
                    self.raw_content = await response.read()

            # Process the content based on content type
            self.process_content()
        except Exception as e:
            self.notify(f"Error downloading document: {str(e)}", severity="error")
        finally:
            # Hide loading indicator
            self.query_one("#content_container").loading = False

    @work
    async def process_content(self) -> None:
        """Process the downloaded content based on its type.

        DOCX documents are delegated to ``process_docx``; ``text/*`` content
        is decoded as UTF-8 (undecodable bytes are replaced); images and all
        other types produce an explanatory message. Both the Markdown and the
        plain-text representation are filled in, so the content is visible in
        whichever mode is active.
        """
        if not self.raw_content:
            self.notify("No content to display", severity="warning")
            return

        # Drop parameters such as "; charset=utf-8" and ignore case so the
        # comparisons below only look at the bare MIME type.
        mime_type = (self.content_type or "").split(";", 1)[0].strip().lower()

        try:
            if self._is_docx(mime_type):
                # Process as DOCX
                self.process_docx()
            elif mime_type.startswith("text/"):
                # Process as plain text
                text_content = self.raw_content.decode("utf-8", errors="replace")
                self._set_content(text_content, text_content)
            elif mime_type.startswith("image/"):
                # For images, just display a message
                hint = "Use the 'Open URL' command to view this image in your browser."
                self._set_content(
                    f"*Image file: {self.item_name}*\n\n{hint}",
                    f"Image file: {self.item_name}\n\n{hint}",
                )
            else:
                # For other types, display a generic message
                details = (
                    f"Content type: {self.content_type}\n\n"
                    "This file type cannot be displayed in the viewer. "
                    "Use the 'Open URL' command to view this file in your browser."
                )
                self._set_content(
                    f"*File: {self.item_name}*\n\n{details}",
                    f"File: {self.item_name}\n\n{details}",
                )
        except Exception as e:
            self.notify(f"Error processing content: {str(e)}", severity="error")

    def _is_docx(self, mime_type: str) -> bool:
        """Return True when the downloaded item should be treated as DOCX.

        Args:
            mime_type: The normalized (parameter-free, lower-case) MIME type.
        """
        if mime_type == self._DOCX_MIME_TYPE:
            return True
        # Fall back to the file extension only when the type is unspecific.
        return (
            mime_type in self._UNSPECIFIC_MIME_TYPES
            and (self.item_name or "").lower().endswith(".docx")
        )

    def _set_content(self, markdown_text: str, plain_text: str) -> None:
        """Store both renderings of the document and refresh the display."""
        self.document_content = markdown_text
        self.plain_text_content = plain_text
        self.update_content_display()

    @work
    async def process_docx(self) -> None:
        """Process DOCX content and convert to Markdown and plain text.

        The conversion reads from in-memory buffers, so no temporary file is
        written and nothing is left on disk if a converter raises. Errors are
        reported with an error notification.
        """
        try:
            raw_bytes = self.raw_content or b""

            # Convert DOCX to Markdown using mammoth
            result = mammoth.convert_to_markdown(io.BytesIO(raw_bytes))

            # Read the document structure with python-docx for plain text;
            # a second buffer is used so each reader starts at offset 0.
            doc = Document(io.BytesIO(raw_bytes))
            plain_text = "\n\n".join(
                para.text for para in doc.paragraphs if para.text
            )

            # Store both versions only after both conversions succeeded
            self._set_content(result.value, plain_text)
        except Exception as e:
            self.notify(f"Error processing DOCX: {str(e)}", severity="error")

    def update_content_display(self) -> None:
        """Update the content display with the processed document content.

        Only the widget for the active mode is updated and shown; the other
        one is hidden through the ``hidden`` CSS class.
        """
        markdown_widget = self.query_one("#markdown_content", Markdown)
        plaintext_widget = self.query_one("#plaintext_content", Label)

        if self.is_markdown_mode:
            markdown_widget.update(self.document_content)
            markdown_widget.remove_class("hidden")
            plaintext_widget.add_class("hidden")
        else:
            plaintext_widget.update(self.plain_text_content)
            plaintext_widget.remove_class("hidden")
            markdown_widget.add_class("hidden")

    async def action_toggle_mode(self) -> None:
        """Toggle between Markdown and plaintext display modes."""
        self.is_markdown_mode = not self.is_markdown_mode
        self.update_content_display()
        mode_name = "Markdown" if self.is_markdown_mode else "Plain Text"
        self.notify(f"Switched to {mode_name} mode")

    async def action_close(self) -> None:
        """Close the document viewer screen."""
        self.dismiss()