import os import sys import json import asyncio from datetime import datetime from pathlib import Path import msal import aiohttp from rich.panel import Panel from rich import print as rprint from textual.app import App, ComposeResult from textual.binding import Binding from textual.containers import Container, Horizontal, Vertical from textual.widgets import ( Header, Footer, Static, Label, DataTable, Button, ListView, ListItem, LoadingIndicator, OptionList ) from textual.reactive import reactive from textual.worker import Worker, get_current_worker from textual import work from textual.widgets.option_list import Option # Import file icons utility - note the updated import from utils.file_icons import get_file_icon # Import our DocumentViewerScreen sys.path.append(os.path.join(os.path.dirname(__file__), "maildir_gtd")) from maildir_gtd.screens.DocumentViewer import DocumentViewerScreen class FolderHistoryEntry: """Represents an entry in the folder navigation history.""" def __init__(self, folder_id: str, folder_name: str, parent_id: str = None): self.folder_id = folder_id self.folder_name = folder_name self.parent_id = parent_id def __eq__(self, other): if not isinstance(other, FolderHistoryEntry): return False return self.folder_id == other.folder_id class OneDriveTUI(App): """A Textual app for OneDrive integration with MSAL authentication.""" CSS_PATH = "drive_view_tui.tcss" # Reactive variables is_authenticated = reactive(False) selected_drive_id = reactive("") drive_name = reactive("") current_view = reactive("Following") # Track current view: "Following" or "Root" current_folder_id = reactive("root") # Track current folder ID current_folder_name = reactive("Root") # Track current folder name # App bindings BINDINGS = [ Binding("q", "quit", "Quit"), Binding("r", "refresh", "Refresh"), Binding("f", "toggle_follow", "Toggle Follow"), Binding("o", "open_url", "Open URL"), Binding("enter", "open_url", "Open URL"), Binding("v", "view_document", "View Document"), Binding("tab", "next_view", "Switch View"), Binding("backspace", "navigate_back", "Back"), Binding("b", "navigate_back", "Back"), ] def __init__(self): super().__init__() self.access_token = None self.drives = [] self.followed_items = [] self.current_items = {} # Store currently displayed items self.folder_history = [] # History stack for folder navigation self.msal_app = None self.cache = msal.SerializableTokenCache() # Read Azure app credentials from environment variables self.client_id = os.getenv("AZURE_CLIENT_ID") self.tenant_id = os.getenv("AZURE_TENANT_ID") self.scopes = ["https://graph.microsoft.com/Files.ReadWrite.All"] self.cache_file = "token_cache.bin" def compose(self) -> ComposeResult: """Create child widgets for the app.""" yield Header(show_clock=True) with Container(id="main_container"): with Horizontal(id="top_bar"): yield Button("\uf148 Up", id="back_button", classes="hide") yield Label("Authenticating with Microsoft Graph API...", id="status_label") yield LoadingIndicator(id="loading") yield OptionList( Option("Following", id="following"), Option("Root", id="root"), id="view_options" ) with Container(id="auth_container"): yield Label("", id="auth_message") yield Button("Login", id="login_button", variant="primary") with Container(id="content_container", classes="hide"): with Vertical(id="items_container"): yield DataTable(id="items_table") yield Label("No items found", id="no_items_label", classes="hide") yield Footer() async def on_mount(self) -> None: """Initialize the app when mounted.""" self.query_one("#login_button").styles.width = "20" self.query_one("#view_options").border_title = "My Files" # Initialize the table table = self.query_one("#items_table") table.cursor_type = "row" table.add_columns("◇", "Name", "Last Modified", "Size", "Web URL") table.focus() # Load cached token if available if os.path.exists(self.cache_file): with open(self.cache_file, "r") as f: self.cache.deserialize(f.read()) # Initialize MSAL app self.initialize_msal() # Try silent authentication first self.authenticate_silent() def initialize_msal(self) -> None: """Initialize the MSAL application.""" if not self.client_id or not self.tenant_id: self.notify( "Please set AZURE_CLIENT_ID and AZURE_TENANT_ID environment variables.", severity="error", timeout=10, ) return authority = f"https://login.microsoftonline.com/{self.tenant_id}" self.msal_app = msal.PublicClientApplication( self.client_id, authority=authority, token_cache=self.cache ) def authenticate_silent(self) -> None: """Try silent authentication first.""" if not self.msal_app: return accounts = self.msal_app.get_accounts() if accounts: self.query_one("#status_label").update("Trying silent authentication...") self.get_token_silent(accounts[0]) else: self.query_one("#status_label").update("Please log in to continue.") self.query_one("#auth_container").remove_class("hide") self.query_one("#loading").remove() @work async def get_token_silent(self, account): """Get token silently.""" token_response = self.msal_app.acquire_token_silent(self.scopes, account=account) if token_response and "access_token" in token_response: self.access_token = token_response["access_token"] self.is_authenticated = True self.load_initial_data() else: self.query_one("#status_label").update("Silent authentication failed. Please log in.") self.query_one("#auth_container").remove_class("hide") self.query_one("#loading").remove() async def on_button_pressed(self, event: Button.Pressed) -> None: """Handle button presses.""" if event.button.id == "back_button": self.action_navigate_back() if event.button.id == "login_button": self.initiate_device_flow() @work async def initiate_device_flow(self): """Initiate the MSAL device code flow.""" self.query_one("#loading").styles.display = "block" self.query_one("#status_label").update("Initiating device code flow...") # Initiate device flow flow = self.msal_app.initiate_device_flow(scopes=self.scopes) if "user_code" not in flow: self.notify("Failed to create device flow", severity="error") return # Display the device code message self.query_one("#auth_message").update(flow["message"]) # Wait for the user to authenticate token_response = self.msal_app.acquire_token_by_device_flow(flow) if "access_token" not in token_response: self.notify("Failed to acquire token", severity="error") return # Save token to cache with open(self.cache_file, "w") as f: f.write(self.cache.serialize()) self.access_token = token_response["access_token"] self.is_authenticated = True # Proceed with loading drives and followed items self.load_initial_data() @work async def load_initial_data(self): """Load initial data after authentication.""" self.query_one("#status_label").update("Loading drives...") # Load drives first if not self.access_token: return headers = {"Authorization": f"Bearer {self.access_token}"} try: async with aiohttp.ClientSession() as session: async with session.get( "https://graph.microsoft.com/v1.0/me/drives", headers=headers ) as response: if response.status != 200: self.notify(f"Failed to load drives: {response.status}", severity="error") return drives_data = await response.json() self.drives = drives_data.get("value", []) except Exception as e: self.notify(f"Error loading drives: {str(e)}", severity="error") # Hide auth container and show content container self.query_one("#auth_container").add_class("hide") self.query_one("#content_container").remove_class("hide") self.query_one("#loading").remove() # Find and select the OneDrive drive for drive in self.drives: if drive.get("name") == "OneDrive": self.selected_drive_id = drive.get("id") self.drive_name = drive.get("name") break # Set Following as default view option_list = self.query_one("#view_options") option_list.highlighted = 0 # Select "Following" option # If we have a selected drive, load followed items if self.selected_drive_id: self.load_followed_items() async def on_option_list_option_selected(self, event: OptionList.OptionSelected) -> None: """Handle option list selection.""" selected_option = event.option.id if selected_option == "following": self.current_view = "Following" if self.selected_drive_id: self.load_followed_items() elif selected_option == "root": self.current_view = "Root" if self.selected_drive_id: self.load_drive_folder_items() async def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None: """Handle row selection in the items table.""" selected_id = event.row_key.value self.open_item(selected_id) async def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None: self.selected_item_id = event.row_key.value def open_item(self, selected_id: str): if selected_id: self.folder_history.append(FolderHistoryEntry(self.current_folder_id, self.current_folder_name, self.selected_drive_id)) # Get an item from current items by ID string selected_row = self.current_items[selected_id] item_name = selected_row.get("name") # Check if it's a folder is_folder = bool(selected_row.get("folder")) if is_folder: self.notify(f"Selected folder: {item_name}", timeout=1) # Load items in the folder self.query_one("#back_button").remove_class("hide") self.query_one("#status_label").update(f"Loading items in folder: {item_name}") self.load_drive_folder_items(folder_id=selected_id, drive_id=selected_row.get("parentReference", {}).get("driveId", self.selected_drive_id)) else: self.notify(f"Selected file: {item_name}") self.action_view_document() @work async def load_drive_folder_items(self, folder_id: str = "", drive_id: str = "", track_history: bool = True): """Load root items from the selected drive.""" if not self.access_token or not self.selected_drive_id: return self.query_one("#status_label").update("Loading drive folder items...") headers = {"Authorization": f"Bearer {self.access_token}"} url = f"https://graph.microsoft.com/v1.0/me/drive/root/children" if folder_id and drive_id and folder_id != "root": url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{folder_id}/children" # if track_history: # self.folder_history.append(FolderHistoryEntry(folder_id, self.current_folder_name, drive_id)) self.selected_drive_id = drive_id self.current_folder_id = folder_id self.current_folder_name = self.current_items[folder_id].get("name", "Unknown") try: async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers) as response: if response.status != 200: self.notify(f"Failed to load drive items: {response.status}", severity="error") return items_data = await response.json() # Update the table with the root items self.update_items_table(items_data.get("value", [])) except Exception as e: self.notify(f"Error loading root items: {str(e)}", severity="error") #update the status label with breadcrumbs from the folder_history if self.folder_history: breadcrumbs = " / \uf07b ".join([entry.folder_name for entry in self.folder_history]) self.query_one("#status_label").update(f"\uf07b {breadcrumbs} / \uf07b {self.current_folder_name}") else: self.query_one("#status_label").update(f" \uf07b {self.current_folder_name}") @work async def load_followed_items(self): """Load followed items from the selected drive.""" if not self.access_token or not self.selected_drive_id: return self.query_one("#status_label").update("Loading followed items...") headers = {"Authorization": f"Bearer {self.access_token}"} try: url = f"https://graph.microsoft.com/v1.0/me/drive/following" async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers) as response: if response.status != 200: self.notify(f"Failed to load followed items: {response.status}", severity="error") return items_data = await response.json() followed_items = items_data.get("value", []) # Update the table with the followed items self.update_items_table(followed_items) except Exception as e: self.notify(f"Error loading followed items: {str(e)}", severity="error") self.query_one("#status_label").update("Ready") def update_items_table(self, items, is_root_view=False): """Update the table with the given items.""" table = self.query_one(DataTable) table.clear() if not items: self.query_one("#no_items_label").remove_class("hide") return self.query_one("#no_items_label").add_class("hide") for item in items: name = item.get("name", "Unknown") is_folder = bool(item.get("folder")) # Get icon for the file type item_type = get_file_icon(name, is_folder) # Format the last modified date last_modified = item.get("lastModifiedDateTime", "") if last_modified: try: date_obj = datetime.fromisoformat(last_modified.replace('Z', '+00:00')) last_modified = date_obj.strftime("%Y-%m-%d %H:%M") except: pass # Format the size size = item.get("size", 0) if size: if size < 1024: size_str = f"{size} B" elif size < 1024 * 1024: size_str = f"{size / 1024:.1f} KB" elif size < 1024 * 1024 * 1024: size_str = f"{size / (1024 * 1024):.1f} MB" else: size_str = f"{size / (1024 * 1024 * 1024):.1f} GB" else: size_str = "N/A" web_url = item.get("webUrl", "") item_id = item.get("id") # Limit filename length to 160 characters display_name = name[:50] + '...' if len(name) > 50 else name # Add row to table with the appropriate icon class for styling row_key = table.add_row( item_type, display_name, last_modified, size_str, web_url, key=item.get("id") ) # Add item to the list of current items keyed by row_key so we can look up all information later self.current_items[row_key] = item async def action_next_view(self) -> None: """Switch to the next view (Following/Root).""" option_list = self.query_one("#view_options") if self.current_view == "Following": option_list.highlighted = 1 # Switch to Root self.current_view = "Root" self.load_drive_folder_items() else: option_list.highlighted = 0 # Switch to Following self.current_view = "Following" self.load_followed_items() self.notify(f"Switched to {self.current_view} view") async def action_refresh(self) -> None: """Refresh the data.""" if self.is_authenticated and self.selected_drive_id: if self.current_view == "Following": self.load_followed_items() elif self.current_view == "Root": self.load_drive_folder_items() self.notify("Refreshed items") async def action_toggle_follow(self) -> None: """Toggle follow status for selected item.""" # This would be implemented to follow/unfollow the selected item # Currently just a placeholder for the key binding self.notify("Toggle follow functionality not implemented yet") async def action_open_url(self) -> None: """Open the web URL of the selected item.""" table = self.query_one("#items_table") if table.cursor_row is not None: selected_row = table.get_row_at(table.cursor_row) if selected_row and len(selected_row) > 4: web_url = selected_row[4] if web_url: self.notify(f"Opening URL: {web_url}") # Use Textual's built-in open_url method self.app.open_url(web_url) def action_view_document(self) -> None: """View the selected document using the DocumentViewerScreen.""" # Get the name of the selected item selected_row = self.current_items.get(self.selected_item_id) if not selected_row: return selected_name = selected_row.get("name") drive_id = selected_row.get("parentReference", {}).get("driveId", self.selected_drive_id) web_url = selected_row.get("webUrl", "") # Open the document viewer screen with all required details viewer = DocumentViewerScreen(self.selected_item_id, selected_name, self.access_token, drive_id) viewer.web_url = web_url # Pass the webUrl to the viewer self.push_screen(viewer) async def action_quit(self) -> None: """Quit the application.""" self.exit() def action_navigate_back(self) -> None: """Navigate back to the previous folder.""" if self.folder_history: previous_entry = self.folder_history.pop() self.current_folder_id = previous_entry.folder_id self.current_folder_name = previous_entry.folder_name if len(self.folder_history) <= 0: self.query_one("#back_button").add_class("hide") self.load_drive_folder_items(folder_id=previous_entry.folder_id, drive_id=previous_entry.parent_id, track_history=False) else: self.notify("No previous folder to navigate back to") if __name__ == "__main__": app = OneDriveTUI() app.run()