import asyncio
import functools
import os
import sys
import logging
from datetime import datetime

import msal
import aiohttp

# Suppress debug logging from authentication and HTTP libraries
logging.getLogger("msal").setLevel(logging.ERROR)
logging.getLogger("urllib3").setLevel(logging.ERROR)
logging.getLogger("requests").setLevel(logging.ERROR)
logging.getLogger("requests_oauthlib").setLevel(logging.ERROR)
logging.getLogger("aiohttp").setLevel(logging.ERROR)
logging.getLogger("aiohttp.access").setLevel(logging.ERROR)
logging.getLogger("asyncio").setLevel(logging.ERROR)

from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Container, Horizontal, Vertical
from textual.widgets import (
    Header,
    Footer,
    Label,
    DataTable,
    Button,
    LoadingIndicator,
    OptionList,
)
from textual.reactive import reactive
from textual import work
from textual.widgets.option_list import Option

# Import file icons utility - note the updated import
from src.utils.file_icons import get_file_icon

# Import our DocumentViewerScreen
sys.path.append(os.path.join(os.path.dirname(__file__), "src", "maildir_gtd"))
from screens.DocumentViewer import DocumentViewerScreen

# Maximum number of characters of an item name shown in the table before
# the name is truncated with an ellipsis.
MAX_DISPLAY_NAME_LENGTH = 50


def _format_size(size) -> str:
    """Return a human-readable size string, or "N/A" for a missing/zero size."""
    if not size:
        return "N/A"
    if size < 1024:
        return f"{size} B"
    if size < 1024 * 1024:
        return f"{size / 1024:.1f} KB"
    if size < 1024 * 1024 * 1024:
        return f"{size / (1024 * 1024):.1f} MB"
    return f"{size / (1024 * 1024 * 1024):.1f} GB"


def _format_last_modified(raw):
    """Return an ISO-8601 timestamp as "YYYY-MM-DD HH:MM".

    The value is returned unchanged when it is empty or cannot be parsed,
    so an unexpected timestamp format never prevents a row from rendering.
    """
    if not raw:
        return raw
    try:
        # fromisoformat() on older Pythons rejects a trailing "Z".
        parsed = datetime.fromisoformat(raw.replace("Z", "+00:00"))
    except (ValueError, AttributeError, TypeError):
        return raw
    return parsed.strftime("%Y-%m-%d %H:%M")


class FolderHistoryEntry:
    """Represents an entry in the folder navigation history.

    Attributes:
        folder_id: ID of the folder that was being displayed.
        folder_name: Display name of that folder (used for breadcrumbs).
        parent_id: Identifier passed back as ``drive_id`` when navigating
            back to this entry; callers in this file store the drive ID here.

    Two entries compare equal when their ``folder_id`` matches.
    """

    def __init__(self, folder_id: str, folder_name: str, parent_id: str = ""):
        self.folder_id = folder_id
        self.folder_name = folder_name
        self.parent_id = parent_id

    def __eq__(self, other):
        if not isinstance(other, FolderHistoryEntry):
            return False
        return self.folder_id == other.folder_id

    def __hash__(self):
        # Keep hashing consistent with __eq__ (defining __eq__ alone would
        # make instances unhashable).
        return hash(self.folder_id)

    def __repr__(self):
        return (
            f"{type(self).__name__}(folder_id={self.folder_id!r}, "
            f"folder_name={self.folder_name!r}, parent_id={self.parent_id!r})"
        )


class OneDriveTUI(App):
    """A Textual app for OneDrive integration with MSAL authentication."""

    CSS_PATH = "drive_view_tui.tcss"

    # Reactive variables
    is_authenticated = reactive(False)
    selected_drive_id = reactive("")
    drive_name = reactive("")
    current_view = reactive("Following")  # Track current view: "Following" or "Root"
    current_folder_id = reactive("root")  # Track current folder ID
    current_folder_name = reactive("Root")  # Track current folder name

    # App bindings
    BINDINGS = [
        Binding("q", "quit", "Quit"),
        Binding("r", "refresh", "Refresh"),
        Binding("f", "toggle_follow", "Toggle Follow"),
        Binding("o", "open_url", "Open URL"),
        Binding("enter", "open_url", "Open URL"),
        Binding("v", "view_document", "View Document"),
        Binding("tab", "next_view", "Switch View"),
        Binding("backspace", "navigate_back", "Back"),
        Binding("b", "navigate_back", "Back"),
    ]

    def __init__(self):
        super().__init__()
        self.access_token = None
        self.drives = []
        self.followed_items = []
        self.current_items = {}  # Store currently displayed items
        self.folder_history = []  # History stack for folder navigation
        # ID of the row currently highlighted in the items table; None until
        # the first highlight event arrives.
        self.selected_item_id = None
        # Device-code flow currently being polled (if any), so that quitting
        # can abort the background polling thread.
        self._pending_device_flow = None
        self.msal_app = None
        self.cache = msal.SerializableTokenCache()

        # Read Azure app credentials from environment variables
        self.client_id = os.getenv("AZURE_CLIENT_ID")
        self.tenant_id = os.getenv("AZURE_TENANT_ID")
        self.scopes = ["https://graph.microsoft.com/Files.ReadWrite.All"]
        self.cache_file = "token_cache.bin"

    def compose(self) -> ComposeResult:
        """Create child widgets for the app."""
        yield Header(show_clock=True)

        with Container(id="main_container"):
            with Horizontal(id="top_bar"):
                yield Button("\uf148 Up", id="back_button", classes="hide")
                yield Label(
                    "Authenticating with Microsoft Graph API...", id="status_label"
                )
            yield LoadingIndicator(id="loading")
            yield OptionList(
                Option("Following", id="following"),
                Option("Root", id="root"),
                id="view_options",
            )

            with Container(id="auth_container"):
                yield Label("", id="auth_message")
                yield Button("Login", id="login_button", variant="primary")

            with Container(id="content_container", classes="hide"):
                with Vertical(id="items_container"):
                    yield DataTable(id="items_table")
                    yield Label("No items found", id="no_items_label", classes="hide")

        yield Footer()

    async def on_mount(self) -> None:
        """Initialize the app when mounted."""
        self.query_one("#login_button").styles.width = "20"
        self.query_one("#view_options").border_title = "My Files"

        # Initialize the table
        table = self.query_one("#items_table", DataTable)
        table.cursor_type = "row"
        table.add_columns("◇", "Name", "Last Modified", "Size", "Web URL")
        table.focus()

        # Load cached token if available
        if os.path.exists(self.cache_file):
            with open(self.cache_file, "r") as f:
                self.cache.deserialize(f.read())

        # Initialize MSAL app
        self.initialize_msal()

        # Try silent authentication first
        self.authenticate_silent()

    def initialize_msal(self) -> None:
        """Initialize the MSAL application.

        Leaves ``self.msal_app`` as ``None`` (and notifies the user) when
        the Azure client/tenant IDs are not configured.
        """
        if not self.client_id or not self.tenant_id:
            self.notify(
                "Please set AZURE_CLIENT_ID and AZURE_TENANT_ID environment variables.",
                severity="error",
                timeout=10,
            )
            return

        authority = f"https://login.microsoftonline.com/{self.tenant_id}"
        self.msal_app = msal.PublicClientApplication(
            self.client_id, authority=authority, token_cache=self.cache
        )

    def _set_loading_visible(self, visible: bool) -> None:
        """Show or hide the loading indicator.

        The indicator is hidden rather than removed from the DOM because the
        login flow needs to show it again later; querying a removed widget
        would raise.
        """
        for indicator in self.query("#loading"):
            indicator.styles.display = "block" if visible else "none"

    async def _run_blocking(self, func, *args, **kwargs):
        """Run a blocking callable in the default executor and await its result.

        MSAL's API is synchronous (and the device-code flow polls for
        minutes); calling it directly would stall the event loop and freeze
        the UI.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, functools.partial(func, *args, **kwargs)
        )

    def _save_token_cache(self) -> None:
        """Persist the MSAL token cache to disk if it changed."""
        if not self.cache.has_state_changed:
            return
        try:
            with open(self.cache_file, "w") as f:
                f.write(self.cache.serialize())
        except OSError as e:
            # Failing to persist the cache only costs a re-login next time.
            self.notify(f"Could not save token cache: {e}", severity="warning")

    def authenticate_silent(self) -> None:
        """Try silent authentication first."""
        if not self.msal_app:
            return

        accounts = self.msal_app.get_accounts()
        if accounts:
            self.query_one("#status_label").update("Trying silent authentication...")
            self.get_token_silent(accounts[0])
        else:
            self.query_one("#status_label").update("Please log in to continue.")
            self.query_one("#auth_container").remove_class("hide")
            self._set_loading_visible(False)

    @work
    async def get_token_silent(self, account):
        """Get token silently.

        On success stores the access token and starts loading data; on
        failure prompts the user to log in interactively.
        """
        token_response = await self._run_blocking(
            self.msal_app.acquire_token_silent, self.scopes, account=account
        )

        if token_response and "access_token" in token_response:
            # A silent acquisition may have refreshed tokens in the cache.
            self._save_token_cache()
            self.access_token = token_response["access_token"]
            self.is_authenticated = True
            self.load_initial_data()
        else:
            self.query_one("#status_label").update(
                "Silent authentication failed. Please log in."
            )
            self.query_one("#auth_container").remove_class("hide")
            self._set_loading_visible(False)

    async def on_button_pressed(self, event: Button.Pressed) -> None:
        """Handle button presses."""
        if event.button.id == "back_button":
            self.action_navigate_back()
        elif event.button.id == "login_button":
            self.initiate_device_flow()

    @work
    async def initiate_device_flow(self):
        """Initiate the MSAL device code flow."""
        if not self.msal_app:
            # initialize_msal() bailed out because credentials are missing.
            self.notify(
                "Please set AZURE_CLIENT_ID and AZURE_TENANT_ID environment variables.",
                severity="error",
                timeout=10,
            )
            return

        self._set_loading_visible(True)
        self.query_one("#status_label").update("Initiating device code flow...")

        # Initiate device flow
        flow = await self._run_blocking(
            self.msal_app.initiate_device_flow, scopes=self.scopes
        )

        if "user_code" not in flow:
            self._set_loading_visible(False)
            self.notify("Failed to create device flow", severity="error")
            return

        # Display the device code message
        self.query_one("#auth_message").update(flow["message"])

        # Wait for the user to authenticate. Polling happens in a worker
        # thread so the message above is actually rendered meanwhile.
        self._pending_device_flow = flow
        try:
            token_response = await self._run_blocking(
                self.msal_app.acquire_token_by_device_flow, flow
            )
        finally:
            self._pending_device_flow = None

        if "access_token" not in token_response:
            self._set_loading_visible(False)
            self.notify("Failed to acquire token", severity="error")
            return

        # Save token to cache
        self._save_token_cache()

        self.access_token = token_response["access_token"]
        self.is_authenticated = True

        # Proceed with loading drives and followed items
        self.load_initial_data()

    @work
    async def load_initial_data(self):
        """Load initial data after authentication."""
        self.query_one("#status_label").update("Loading drives...")

        # Load drives first
        if not self.access_token:
            return

        headers = {"Authorization": f"Bearer {self.access_token}"}

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    "https://graph.microsoft.com/v1.0/me/drives", headers=headers
                ) as response:
                    if response.status != 200:
                        self.notify(
                            f"Failed to load drives: {response.status}",
                            severity="error",
                        )
                        return

                    drives_data = await response.json()
                    self.drives = drives_data.get("value", [])
        except Exception as e:
            self.notify(f"Error loading drives: {str(e)}", severity="error")

        # Hide auth container and show content container
        self.query_one("#auth_container").add_class("hide")
        self.query_one("#content_container").remove_class("hide")
        self._set_loading_visible(False)

        # Find and select the OneDrive drive
        for drive in self.drives:
            if drive.get("name") == "OneDrive":
                self.selected_drive_id = drive.get("id")
                self.drive_name = drive.get("name")
                break

        # Set Following as default view
        option_list = self.query_one("#view_options")
        option_list.highlighted = 0  # Select "Following" option

        # If we have a selected drive, load followed items
        if self.selected_drive_id:
            self.load_followed_items()

    async def on_option_list_option_selected(
        self, event: OptionList.OptionSelected
    ) -> None:
        """Handle option list selection."""
        selected_option = event.option.id
        if selected_option == "following":
            self.current_view = "Following"
            if self.selected_drive_id:
                self.load_followed_items()
        elif selected_option == "root":
            self.current_view = "Root"
            if self.selected_drive_id:
                self.load_drive_folder_items()

    async def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
        """Handle row selection in the items table."""
        selected_id = event.row_key.value
        self.open_item(selected_id)

    async def on_data_table_row_highlighted(
        self, event: DataTable.RowHighlighted
    ) -> None:
        """Remember which item is highlighted for the view-document action."""
        self.selected_item_id = event.row_key.value

    def open_item(self, selected_id: str):
        """Open the item with the given ID.

        Folders are navigated into (pushing the current folder onto the
        history stack); files are opened in the document viewer. Unknown or
        empty IDs are ignored.
        """
        if not selected_id:
            return

        # Get an item from current items by ID string
        selected_row = self.current_items.get(selected_id)
        if not selected_row:
            return

        item_name = selected_row.get("name")

        # Check if it's a folder
        is_folder = bool(selected_row.get("folder"))

        if is_folder:
            # Only folder navigation belongs in the history; recording file
            # opens would make "Back" step through phantom entries.
            self.folder_history.append(
                FolderHistoryEntry(
                    self.current_folder_id,
                    self.current_folder_name,
                    self.selected_drive_id,
                )
            )
            self.notify(f"Selected folder: {item_name}", timeout=1)
            # Load items in the folder
            self.query_one("#back_button").remove_class("hide")
            self.query_one("#status_label").update(
                f"Loading items in folder: {item_name}"
            )
            self.load_drive_folder_items(
                folder_id=selected_id,
                drive_id=selected_row.get("parentReference", {}).get(
                    "driveId", self.selected_drive_id
                ),
            )
        else:
            self.notify(f"Selected file: {item_name}")
            # Make sure the viewer opens the item that was actually selected.
            self.selected_item_id = selected_id
            self.action_view_document()

    @work
    async def load_drive_folder_items(
        self, folder_id: str = "", drive_id: str = "", track_history: bool = True
    ):
        """Load the children of a folder into the items table.

        Args:
            folder_id: Folder to list. Empty or ``"root"`` lists the root of
                the signed-in user's drive and resets folder navigation.
            drive_id: Drive containing ``folder_id``; required for any
                non-root folder.
            track_history: Currently unused; kept so existing callers that
                pass it keep working.
        """
        if not self.access_token or not self.selected_drive_id:
            return

        self.query_one("#status_label").update("Loading drive folder items...")
        headers = {"Authorization": f"Bearer {self.access_token}"}

        if folder_id and drive_id and folder_id != "root":
            url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{folder_id}/children"
            self.selected_drive_id = drive_id
            self.current_folder_id = folder_id
            # The folder is normally among the items listed so far; if it is
            # not, keep the name the caller already set rather than failing.
            known_folder = self.current_items.get(folder_id)
            if known_folder is not None:
                self.current_folder_name = known_folder.get("name", "Unknown")
        else:
            url = "https://graph.microsoft.com/v1.0/me/drive/root/children"
            # Showing the root means there is nothing to go back to; reset
            # navigation state so breadcrumbs and refresh stay consistent.
            self.current_folder_id = "root"
            self.current_folder_name = "Root"
            self.folder_history.clear()
            self.query_one("#back_button").add_class("hide")

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers) as response:
                    if response.status != 200:
                        self.notify(
                            f"Failed to load drive items: {response.status}",
                            severity="error",
                        )
                        return

                    items_data = await response.json()

                    # Update the table with the root items
                    self.update_items_table(items_data.get("value", []))
        except Exception as e:
            self.notify(f"Error loading root items: {str(e)}", severity="error")

        # update the status label with breadcrumbs from the folder_history
        if self.folder_history:
            breadcrumbs = " / \uf07b ".join(
                [entry.folder_name for entry in self.folder_history]
            )
            self.query_one("#status_label").update(
                f"\uf07b {breadcrumbs} / \uf07b {self.current_folder_name}"
            )
        else:
            self.query_one("#status_label").update(
                f" \uf07b {self.current_folder_name}"
            )

    @work
    async def load_followed_items(self):
        """Load followed items from the selected drive."""
        if not self.access_token or not self.selected_drive_id:
            return

        self.query_one("#status_label").update("Loading followed items...")
        headers = {"Authorization": f"Bearer {self.access_token}"}

        try:
            url = "https://graph.microsoft.com/v1.0/me/drive/following"

            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers) as response:
                    if response.status != 200:
                        self.notify(
                            f"Failed to load followed items: {response.status}",
                            severity="error",
                        )
                        return

                    items_data = await response.json()
                    followed_items = items_data.get("value", [])

                    # Update the table with the followed items
                    self.update_items_table(followed_items)
        except Exception as e:
            self.notify(f"Error loading followed items: {str(e)}", severity="error")

        self.query_one("#status_label").update("Ready")

    def update_items_table(self, items, is_root_view=False):
        """Update the table with the given items.

        Args:
            items: Iterable of item dicts; the keys read here are ``id``,
                ``name``, ``folder``, ``lastModifiedDateTime``, ``size`` and
                ``webUrl``.
            is_root_view: Currently unused; kept for caller compatibility.
        """
        table = self.query_one(DataTable)
        table.clear()

        if not items:
            self.query_one("#no_items_label").remove_class("hide")
            return

        self.query_one("#no_items_label").add_class("hide")

        for item in items:
            name = item.get("name", "Unknown")
            is_folder = bool(item.get("folder"))

            # Get icon for the file type
            item_type = get_file_icon(name, is_folder)

            # Format the last modified date
            last_modified = _format_last_modified(
                item.get("lastModifiedDateTime", "")
            )

            # Format the size
            size_str = _format_size(item.get("size", 0))

            web_url = item.get("webUrl", "")

            # Limit the displayed filename length
            if len(name) > MAX_DISPLAY_NAME_LENGTH:
                display_name = name[:MAX_DISPLAY_NAME_LENGTH] + "..."
            else:
                display_name = name

            # Add row to table with the appropriate icon class for styling
            row_key = table.add_row(
                item_type,
                display_name,
                last_modified,
                size_str,
                web_url,
                key=item.get("id"),
            )
            # Add item to the list of current items keyed by row_key so we
            # can look up all information later. Entries from earlier
            # listings are kept on purpose: folder loading looks up names of
            # previously listed folders here.
            self.current_items[row_key] = item

    async def action_next_view(self) -> None:
        """Switch to the next view (Following/Root)."""
        option_list = self.query_one("#view_options")
        if self.current_view == "Following":
            option_list.highlighted = 1  # Switch to Root
            self.current_view = "Root"
            self.load_drive_folder_items()
        else:
            option_list.highlighted = 0  # Switch to Following
            self.current_view = "Following"
            self.load_followed_items()
        self.notify(f"Switched to {self.current_view} view")

    async def action_refresh(self) -> None:
        """Refresh the data."""
        if self.is_authenticated and self.selected_drive_id:
            if self.current_view == "Following":
                self.load_followed_items()
            elif self.current_view == "Root":
                # Reload the folder being displayed, not always the root.
                self.load_drive_folder_items(
                    folder_id=self.current_folder_id,
                    drive_id=self.selected_drive_id,
                )
            self.notify("Refreshed items")

    async def action_toggle_follow(self) -> None:
        """Toggle follow status for selected item."""
        # This would be implemented to follow/unfollow the selected item
        # Currently just a placeholder for the key binding
        self.notify("Toggle follow functionality not implemented yet")

    async def action_open_url(self) -> None:
        """Open the web URL of the selected item."""
        table = self.query_one("#items_table")
        if table.cursor_row is not None:
            selected_row = table.get_row_at(table.cursor_row)
            # Column 4 is "Web URL" (see the columns added in on_mount).
            if selected_row and len(selected_row) > 4:
                web_url = selected_row[4]
                if web_url:
                    self.notify(f"Opening URL: {web_url}")
                    # Use Textual's built-in open_url method
                    self.app.open_url(web_url)

    def action_view_document(self) -> None:
        """View the selected document using the DocumentViewerScreen."""
        # Get the name of the selected item
        selected_row = self.current_items.get(self.selected_item_id)
        if not selected_row:
            return

        selected_name = selected_row.get("name")
        drive_id = selected_row.get("parentReference", {}).get(
            "driveId", self.selected_drive_id
        )
        web_url = selected_row.get("webUrl", "")

        # Open the document viewer screen with all required details
        viewer = DocumentViewerScreen(
            self.selected_item_id, selected_name, self.access_token, drive_id
        )
        viewer.web_url = web_url  # Pass the webUrl to the viewer
        self.push_screen(viewer)

    async def action_quit(self) -> None:
        """Quit the application."""
        if self._pending_device_flow is not None:
            # MSAL stops polling once the flow is expired; without this the
            # background polling thread could keep the process alive.
            self._pending_device_flow["expires_at"] = 0
        self.exit()

    def action_navigate_back(self) -> None:
        """Navigate back to the previous folder."""
        if self.folder_history:
            previous_entry = self.folder_history.pop()
            self.current_folder_id = previous_entry.folder_id
            self.current_folder_name = previous_entry.folder_name
            if not self.folder_history:
                self.query_one("#back_button").add_class("hide")
            self.load_drive_folder_items(
                folder_id=previous_entry.folder_id,
                drive_id=previous_entry.parent_id,
                track_history=False,
            )
        else:
            self.notify("No previous folder to navigate back to")


if __name__ == "__main__":
    app = OneDriveTUI()
    app.run()