# File: luk/drive_view_tui.py
# Snapshot: 2025-05-12 10:40:01 -06:00 (496 lines, 19 KiB, Python)

import os
import sys
import json
import asyncio
from datetime import datetime
from pathlib import Path
import msal
import aiohttp
from rich.panel import Panel
from rich import print as rprint
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Container, Horizontal, Vertical
from textual.widgets import (
Header,
Footer,
Static,
Label,
DataTable,
Button,
ListView,
ListItem,
LoadingIndicator,
OptionList
)
from textual.reactive import reactive
from textual.worker import Worker, get_current_worker
from textual import work
from textual.widgets.option_list import Option
# Import our DocumentViewerScreen
sys.path.append(os.path.join(os.path.dirname(__file__), "maildir_gtd"))
from maildir_gtd.screens.DocumentViewer import DocumentViewerScreen
class FolderHistoryEntry:
    """Represents an entry in the folder navigation history.

    Two entries compare equal when they refer to the same ``folder_id``;
    the folder name and parent are descriptive only and do not take part
    in equality or hashing.

    Attributes:
        folder_id: Identifier of the folder this entry points at.
        folder_name: Display name recorded for the folder.
        parent_id: Optional identifier of the container the folder lives
            in. NOTE(review): callers in this file pass the drive ID here
            -- confirm before relying on it as a parent *folder* ID.
    """

    def __init__(self, folder_id: str, folder_name: str, parent_id: "str | None" = None):
        self.folder_id = folder_id
        self.folder_name = folder_name
        self.parent_id = parent_id

    def __eq__(self, other):
        if not isinstance(other, FolderHistoryEntry):
            # Let Python fall back to its default comparison; `==` against
            # an unrelated type still evaluates to False.
            return NotImplemented
        return self.folder_id == other.folder_id

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable; hash on the
        # same field equality uses so equal entries share a hash.
        return hash(self.folder_id)

    def __repr__(self):
        return (
            f"{type(self).__name__}(folder_id={self.folder_id!r}, "
            f"folder_name={self.folder_name!r}, parent_id={self.parent_id!r})"
        )
class OneDriveTUI(App):
    """A Textual app for OneDrive integration with MSAL authentication.

    The app authenticates against Microsoft Graph (silently from a cached
    token when possible, otherwise through the MSAL device-code flow), then
    lists either the user's followed items or the contents of a drive folder
    in a table. Folders can be entered and left again through a small
    history stack; files are handed to ``DocumentViewerScreen``.
    """

    CSS_PATH = "drive_view_tui.tcss"

    # Reactive variables
    is_authenticated = reactive(False)
    selected_drive_id = reactive("")
    drive_name = reactive("")
    current_view = reactive("Following")  # Track current view: "Following" or "Root"
    current_folder_id = reactive("root")  # Track current folder ID
    current_folder_name = reactive("Root")  # Track current folder name

    # App bindings
    BINDINGS = [
        Binding("q", "quit", "Quit"),
        Binding("r", "refresh", "Refresh"),
        Binding("f", "toggle_follow", "Toggle Follow"),
        Binding("o", "open_url", "Open URL"),
        Binding("enter", "open_url", "Open URL"),
        Binding("v", "view_document", "View Document"),
        Binding("tab", "next_view", "Switch View"),
        Binding("backspace", "navigate_back", "Back"),
        Binding("b", "navigate_back", "Back"),
    ]

    def __init__(self):
        """Set up empty state and read the Azure app settings from the environment."""
        super().__init__()
        self.access_token = None
        self.drives = []
        self.followed_items = []
        self.current_items = {}  # Items currently shown in the table, keyed by item ID
        self.folder_history = []  # Stack of FolderHistoryEntry for the folders we left
        # ID of the highlighted table row; None until a row has been highlighted.
        self.selected_item_id = None
        # Device-code flow currently being polled (if any), kept so that
        # quitting can abort the background polling thread.
        self._device_flow = None
        self.msal_app = None
        self.cache = msal.SerializableTokenCache()
        # Read Azure app credentials from environment variables
        self.client_id = os.getenv("AZURE_CLIENT_ID")
        self.tenant_id = os.getenv("AZURE_TENANT_ID")
        self.scopes = ["https://graph.microsoft.com/Files.ReadWrite.All"]
        self.cache_file = "token_cache.bin"

    def compose(self) -> ComposeResult:
        """Create child widgets for the app."""
        yield Header(show_clock=True)
        with Container(id="main_container"):
            yield LoadingIndicator(id="loading")
            yield Label("Authenticating with Microsoft Graph API...", id="status_label")
            with Container(id="auth_container"):
                yield Label("", id="auth_message")
                yield Button("Login", id="login_button", variant="primary")
            with Container(id="content_container", classes="hide"):
                with Horizontal():
                    with Vertical(id="navigation_container"):
                        yield OptionList(
                            Option("Following", id="following"),
                            Option("Root", id="root"),
                            id="view_options",
                        )
                    with Vertical(id="items_container"):
                        yield DataTable(id="items_table")
                        yield Label("No items found", id="no_items_label", classes="hide")
        yield Footer()

    async def on_mount(self) -> None:
        """Initialize the app when mounted."""
        self.query_one("#login_button").styles.width = "20"
        self.query_one("#view_options").border_title = "My Files"
        # Initialize the table
        table = self.query_one("#items_table")
        table.cursor_type = "row"
        table.add_columns("Type", "Name", "Last Modified", "Size", "Web URL")
        # Load cached token if available
        if os.path.exists(self.cache_file):
            with open(self.cache_file, "r") as f:
                self.cache.deserialize(f.read())
        # Initialize MSAL app
        self.initialize_msal()
        # Try silent authentication first
        self.authenticate_silent()

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    async def _run_blocking(func, *args, **kwargs):
        """Run a blocking callable in the default executor and await its result.

        The MSAL client is synchronous; calling it directly inside a
        coroutine would stall the event loop (and with it the whole UI).
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

    def _set_loading(self, visible: bool) -> None:
        """Show or hide the loading indicator.

        The indicator is hidden rather than removed so that it can be shown
        again later (for example when the user presses Login).
        """
        self.query_one("#loading").display = visible

    def _show_login_prompt(self, message: str) -> None:
        """Display ``message`` and reveal the login controls."""
        self.query_one("#status_label").update(message)
        self.query_one("#auth_container").remove_class("hide")
        self.query_one("#login_button").disabled = False
        self._set_loading(False)

    def _persist_token_cache(self) -> None:
        """Write the MSAL token cache to ``self.cache_file`` if it changed."""
        if self.cache.has_state_changed:
            with open(self.cache_file, "w") as f:
                f.write(self.cache.serialize())

    async def _fetch_collection(self, url: str, failure_prefix: str, error_prefix: str):
        """GET a Graph collection and return the concatenated ``value`` lists.

        Follows ``@odata.nextLink`` so that collections spanning several
        pages are returned in full.

        Args:
            url: Address of the first page.
            failure_prefix: Notification text used for a non-200 response;
                the status code is appended.
            error_prefix: Notification text used when the request raises;
                the exception text is appended.

        Returns:
            The list of items, or ``None`` when the request failed (the user
            has already been notified in that case).
        """
        headers = {"Authorization": f"Bearer {self.access_token}"}
        collected = []
        try:
            async with aiohttp.ClientSession() as session:
                while url:
                    async with session.get(url, headers=headers) as response:
                        if response.status != 200:
                            self.notify(f"{failure_prefix}: {response.status}", severity="error")
                            return None
                        payload = await response.json()
                    collected.extend(payload.get("value", []))
                    url = payload.get("@odata.nextLink")
        except Exception as e:
            # UI boundary: report any network/decoding problem to the user
            # instead of letting the worker crash the app.
            self.notify(f"{error_prefix}: {str(e)}", severity="error")
            return None
        return collected

    @staticmethod
    def _format_last_modified(raw) -> str:
        """Render an ISO-8601 timestamp as ``YYYY-MM-DD HH:MM``.

        Values that cannot be parsed are returned unchanged.
        """
        if not raw:
            return raw
        try:
            parsed = datetime.fromisoformat(raw.replace('Z', '+00:00'))
        except (ValueError, AttributeError):
            return raw
        return parsed.strftime("%Y-%m-%d %H:%M")

    @staticmethod
    def _format_size(size) -> str:
        """Render a byte count with a B/KB/MB/GB unit; falsy sizes give ``N/A``."""
        if not size:
            return "N/A"
        if size < 1024:
            return f"{size} B"
        if size < 1024 * 1024:
            return f"{size / 1024:.1f} KB"
        if size < 1024 * 1024 * 1024:
            return f"{size / (1024 * 1024):.1f} MB"
        return f"{size / (1024 * 1024 * 1024):.1f} GB"

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------

    def initialize_msal(self) -> None:
        """Initialize the MSAL application.

        Leaves ``self.msal_app`` as ``None`` (after notifying the user) when
        the client or tenant ID is not configured.
        """
        if not self.client_id or not self.tenant_id:
            self.notify(
                "Please set AZURE_CLIENT_ID and AZURE_TENANT_ID environment variables.",
                severity="error",
                timeout=10,
            )
            return
        authority = f"https://login.microsoftonline.com/{self.tenant_id}"
        self.msal_app = msal.PublicClientApplication(
            self.client_id, authority=authority, token_cache=self.cache
        )

    def authenticate_silent(self) -> None:
        """Try silent authentication first."""
        if not self.msal_app:
            # Nothing can be authenticated without an MSAL app; the missing
            # configuration was already reported, so just stop the spinner.
            self._set_loading(False)
            return
        accounts = self.msal_app.get_accounts()
        if accounts:
            self.query_one("#status_label").update("Trying silent authentication...")
            self.get_token_silent(accounts[0])
        else:
            self._show_login_prompt("Please log in to continue.")

    @work
    async def get_token_silent(self, account):
        """Get token silently.

        On success the token is stored and the initial data load starts;
        otherwise the login controls are shown.
        """
        token_response = await self._run_blocking(
            self.msal_app.acquire_token_silent, self.scopes, account=account
        )
        if token_response and "access_token" in token_response:
            # A silent call may have refreshed the token; keep the file current.
            self._persist_token_cache()
            self.access_token = token_response["access_token"]
            self.is_authenticated = True
            self.load_initial_data()
        else:
            self._show_login_prompt("Silent authentication failed. Please log in.")

    async def on_button_pressed(self, event: Button.Pressed) -> None:
        """Handle button presses."""
        if event.button.id == "login_button":
            self.initiate_device_flow()

    @work
    async def initiate_device_flow(self):
        """Initiate the MSAL device code flow.

        Shows the device-code instructions, waits (off the event loop) for
        the user to complete the sign-in, saves the token cache and then
        starts the initial data load.
        """
        if not self.msal_app:
            self.notify(
                "Please set AZURE_CLIENT_ID and AZURE_TENANT_ID environment variables.",
                severity="error",
                timeout=10,
            )
            return
        login_button = self.query_one("#login_button")
        # Prevent a second flow from being started while this one is pending.
        login_button.disabled = True
        self._set_loading(True)
        self.query_one("#status_label").update("Initiating device code flow...")
        # Initiate device flow
        flow = await self._run_blocking(self.msal_app.initiate_device_flow, scopes=self.scopes)
        if "user_code" not in flow:
            self.notify("Failed to create device flow", severity="error")
            login_button.disabled = False
            self._set_loading(False)
            return
        # Display the device code message
        self.query_one("#auth_message").update(flow["message"])
        # Wait for the user to authenticate. This polls until the flow
        # expires, so it must not run on the event loop.
        self._device_flow = flow
        try:
            token_response = await self._run_blocking(
                self.msal_app.acquire_token_by_device_flow, flow
            )
        finally:
            self._device_flow = None
        if "access_token" not in token_response:
            self.notify("Failed to acquire token", severity="error")
            login_button.disabled = False
            self._set_loading(False)
            return
        # Save token to cache
        self._persist_token_cache()
        self.access_token = token_response["access_token"]
        self.is_authenticated = True
        # Proceed with loading drives and followed items
        self.load_initial_data()

    # ------------------------------------------------------------------
    # Data loading
    # ------------------------------------------------------------------

    @work
    async def load_initial_data(self):
        """Load initial data after authentication.

        Fetches the user's drives, switches the UI from the login controls
        to the content area, selects the drive named "OneDrive" and loads
        the followed items for it.
        """
        self.query_one("#status_label").update("Loading drives...")
        # Load drives first
        if not self.access_token:
            return
        drives = await self._fetch_collection(
            "https://graph.microsoft.com/v1.0/me/drives",
            "Failed to load drives",
            "Error loading drives",
        )
        if drives is None:
            # Stay on the login screen so the user can retry.
            self.query_one("#login_button").disabled = False
            self._set_loading(False)
            return
        self.drives = drives
        # Hide auth container and show content container
        self.query_one("#auth_container").add_class("hide")
        self.query_one("#content_container").remove_class("hide")
        self._set_loading(False)
        # Find and select the OneDrive drive
        for drive in self.drives:
            if drive.get("name") == "OneDrive":
                self.selected_drive_id = drive.get("id")
                self.drive_name = drive.get("name")
                break
        # Set Following as default view
        option_list = self.query_one("#view_options")
        option_list.highlighted = 0  # Select "Following" option
        # If we have a selected drive, load followed items
        if self.selected_drive_id:
            self.load_followed_items()

    async def on_option_list_option_selected(self, event: OptionList.OptionSelected) -> None:
        """Handle option list selection."""
        selected_option = event.option.id
        if selected_option == "following":
            self.current_view = "Following"
            if self.selected_drive_id:
                self.load_followed_items()
        elif selected_option == "root":
            self.current_view = "Root"
            if self.selected_drive_id:
                self.load_root_items()

    async def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
        """Handle row selection in the items table."""
        selected_id = event.row_key.value
        self.open_item(selected_id)

    async def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None:
        """Remember which item the table cursor is on."""
        self.selected_item_id = event.row_key.value

    def open_item(self, selected_id: str):
        """Open the item with the given ID: enter a folder or view a file.

        Unknown or empty IDs are ignored.
        """
        if not selected_id:
            return
        # Get an item from current items by ID string
        selected_row = self.current_items.get(selected_id)
        if selected_row is None:
            return
        item_name = selected_row.get("name")
        # Check if it's a folder
        is_folder = bool(selected_row.get("folder"))
        if is_folder:
            self.notify(f"Selected folder: {item_name}")
            # Load items in the folder
            self.query_one("#status_label").update(f"Loading items in folder: {item_name}")
            self.load_root_items(
                folder_id=selected_id,
                drive_id=selected_row.get("parentReference", {}).get(
                    "driveId", self.selected_drive_id
                ),
            )
        else:
            self.notify(f"Selected file: {item_name}")
            # Make sure the viewer opens the item that was actually selected.
            self.selected_item_id = selected_id
            self.action_view_document()

    @work
    async def load_root_items(self, folder_id: str = "", drive_id: str = "", track_history: bool = True):
        """Load root items from the selected drive.

        Args:
            folder_id: Folder whose children should be listed. When empty
                (or when ``drive_id`` is empty) the drive root is listed.
            drive_id: Drive that ``folder_id`` belongs to.
            track_history: When true and a folder is being entered, the
                folder being left is pushed onto ``folder_history`` so that
                "back" can return to it.
        """
        if not self.access_token or not self.selected_drive_id:
            return
        status = self.query_one("#status_label")
        status.update("Loading drive folder items...")
        url = "https://graph.microsoft.com/v1.0/me/drive/root/children"
        if folder_id and drive_id:
            url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{folder_id}/children"
            if track_history:
                # Record where we are coming from, not where we are going,
                # so that popping the stack yields the previous location.
                self.folder_history.append(
                    FolderHistoryEntry(
                        self.current_folder_id,
                        self.current_folder_name,
                        self.selected_drive_id,
                    )
                )
            if folder_id in self.current_items:
                self.current_folder_name = self.current_items[folder_id].get("name", "Unknown")
            self.selected_drive_id = drive_id
            self.current_folder_id = folder_id
        else:
            # Top-level listing: there is nothing to go back to from here.
            self.current_folder_id = "root"
            self.current_folder_name = "Root"
            self.folder_history.clear()
        items = await self._fetch_collection(
            url, "Failed to load drive items", "Error loading root items"
        )
        if items is not None:
            # Update the table with the root items
            self.update_items_table(items)
        status.update("Ready")

    @work
    async def load_followed_items(self):
        """Load followed items from the selected drive."""
        if not self.access_token or not self.selected_drive_id:
            return
        status = self.query_one("#status_label")
        status.update("Loading followed items...")
        # The followed list is a top-level view; reset folder navigation.
        self.current_folder_id = "root"
        self.current_folder_name = "Root"
        self.folder_history.clear()
        followed_items = await self._fetch_collection(
            "https://graph.microsoft.com/v1.0/me/drive/following",
            "Failed to load followed items",
            "Error loading followed items",
        )
        if followed_items is not None:
            self.followed_items = followed_items
            # Update the table with the followed items
            self.update_items_table(followed_items)
        status.update("Ready")

    def update_items_table(self, items, is_root_view=False):
        """Update the table with the given items.

        Replaces the table rows and the ``current_items`` lookup with
        ``items``; shows the "No items found" label when ``items`` is empty.

        Args:
            items: Item dictionaries to display.
            is_root_view: Accepted for compatibility; not used.
        """
        table = self.query_one("#items_table")
        table.clear()
        # Drop entries from the previous listing so the lookup only ever
        # describes what is on screen.
        self.current_items.clear()
        no_items_label = self.query_one("#no_items_label")
        if not items:
            no_items_label.remove_class("hide")
            return
        no_items_label.add_class("hide")
        for item in items:
            name = item.get("name", "Unknown")
            item_type = "📁" if item.get("folder") else "📄"
            last_modified = self._format_last_modified(item.get("lastModifiedDateTime", ""))
            size_str = self._format_size(item.get("size", 0))
            web_url = item.get("webUrl", "")
            item_id = item.get("id")
            table.add_row(item_type, name, last_modified, size_str, web_url, key=item_id)
            # Keep the full item keyed by its ID so we can look up all
            # information later; rows without an ID cannot be opened anyway.
            if item_id:
                self.current_items[item_id] = item

    # ------------------------------------------------------------------
    # Actions
    # ------------------------------------------------------------------

    async def action_next_view(self) -> None:
        """Switch to the next view (Following/Root)."""
        option_list = self.query_one("#view_options")
        if self.current_view == "Following":
            option_list.highlighted = 1  # Switch to Root
            self.current_view = "Root"
            self.load_root_items()
        else:
            option_list.highlighted = 0  # Switch to Following
            self.current_view = "Following"
            self.load_followed_items()
        self.notify(f"Switched to {self.current_view} view")

    async def action_refresh(self) -> None:
        """Refresh the data.

        Reloads the folder currently shown, or the active top-level view
        when no folder has been entered.
        """
        if not (self.is_authenticated and self.selected_drive_id):
            return
        if self.current_folder_id != "root":
            self.load_root_items(
                folder_id=self.current_folder_id,
                drive_id=self.selected_drive_id,
                track_history=False,
            )
        elif self.current_view == "Following":
            self.load_followed_items()
        elif self.current_view == "Root":
            self.load_root_items()
        self.notify("Refreshed items")

    async def action_toggle_follow(self) -> None:
        """Toggle follow status for selected item."""
        # This would be implemented to follow/unfollow the selected item
        # Currently just a placeholder for the key binding
        self.notify("Toggle follow functionality not implemented yet")

    async def action_open_url(self) -> None:
        """Open the web URL of the selected item."""
        table = self.query_one("#items_table")
        # An empty table has no row to read the URL from.
        if table.row_count == 0 or table.cursor_row is None:
            return
        selected_row = table.get_row_at(table.cursor_row)
        if selected_row and len(selected_row) > 4:
            web_url = selected_row[4]
            if web_url:
                self.notify(f"Opening URL: {web_url}")
                # Use Textual's built-in open_url method
                self.app.open_url(web_url)

    def action_view_document(self) -> None:
        """View the selected document using the DocumentViewerScreen."""
        # Get the name of the selected item
        selected_row = self.current_items.get(self.selected_item_id)
        if not selected_row:
            return
        selected_name = selected_row.get("name")
        drive_id = selected_row.get("parentReference", {}).get("driveId", self.selected_drive_id)
        web_url = selected_row.get("webUrl", "")
        # Open the document viewer screen with all required details
        viewer = DocumentViewerScreen(self.selected_item_id, selected_name, self.access_token, drive_id)
        viewer.web_url = web_url  # Pass the webUrl to the viewer
        self.push_screen(viewer)

    async def action_quit(self) -> None:
        """Quit the application."""
        if self._device_flow is not None:
            # Expire the pending flow so the background polling thread
            # stops promptly instead of delaying shutdown.
            self._device_flow["expires_at"] = 0
        self.exit()

    async def action_navigate_back(self) -> None:
        """Navigate back to the previous folder."""
        if not self.folder_history:
            self.notify("No previous folder to navigate back to")
            return
        previous_entry = self.folder_history.pop()
        if previous_entry.folder_id == "root":
            # We came from a top-level listing: restore the drive that was
            # selected at the time and reload whichever view is active.
            if previous_entry.parent_id:
                self.selected_drive_id = previous_entry.parent_id
            if self.current_view == "Following":
                self.load_followed_items()
            else:
                self.load_root_items(track_history=False)
            return
        self.current_folder_id = previous_entry.folder_id
        self.current_folder_name = previous_entry.folder_name
        self.load_root_items(
            folder_id=previous_entry.folder_id,
            drive_id=previous_entry.parent_id,
            track_history=False,
        )
def main() -> None:
    """Create the OneDrive TUI and run it until the user exits."""
    OneDriveTUI().run()


# Only launch the UI when this file is executed as a script.
if __name__ == "__main__":
    main()