Files
luk/drive_view_tui.py
2025-07-15 23:39:53 -04:00

569 lines
21 KiB
Python

import asyncio
import logging
import os
import sys
from datetime import datetime

import aiohttp
import msal
# Keep the authentication, HTTP and event-loop libraries quiet: only records
# at ERROR level or above get through from these loggers.
for _noisy_logger in (
    "msal",
    "urllib3",
    "requests",
    "requests_oauthlib",
    "aiohttp",
    "aiohttp.access",
    "asyncio",
):
    logging.getLogger(_noisy_logger).setLevel(logging.ERROR)
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Container, Horizontal, Vertical
from textual.widgets import (
Header,
Footer,
Label,
DataTable,
Button,
LoadingIndicator,
OptionList,
)
from textual.reactive import reactive
from textual import work
from textual.widgets.option_list import Option
# Import file icons utility - note the updated import
from utils.file_icons import get_file_icon
# Import our DocumentViewerScreen
sys.path.append(os.path.join(os.path.dirname(__file__), "maildir_gtd"))
from maildir_gtd.screens.DocumentViewer import DocumentViewerScreen
class FolderHistoryEntry:
    """Represents an entry in the folder navigation history.

    Two entries are considered equal when they refer to the same
    ``folder_id``; the name and parent are ignored for comparison.

    Attributes:
        folder_id: Identifier of the folder this entry points at.
        folder_name: Display name of the folder (used for breadcrumbs).
        parent_id: Identifier passed back as the drive ID when navigating
            to this entry; callers in this file store the drive ID here.
    """

    def __init__(self, folder_id: str, folder_name: str, parent_id: str = ""):
        self.folder_id = folder_id
        self.folder_name = folder_name
        self.parent_id = parent_id

    def __eq__(self, other):
        if not isinstance(other, FolderHistoryEntry):
            # Let Python try the reflected comparison; ``==`` still ends up
            # False for unrelated types.
            return NotImplemented
        return self.folder_id == other.folder_id

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable; hash on the
        # same field equality uses so equal entries hash equally.
        return hash(self.folder_id)

    def __repr__(self):
        return (
            f"{type(self).__name__}(folder_id={self.folder_id!r}, "
            f"folder_name={self.folder_name!r}, parent_id={self.parent_id!r})"
        )
class OneDriveTUI(App):
    """A Textual app for OneDrive integration with MSAL authentication.

    The app authenticates against Microsoft Graph (silently from a cached
    token when possible, otherwise through the MSAL device code flow), lists
    either the followed items or the drive root in a table, and lets the user
    descend into folders, open an item's web URL, or push a
    ``DocumentViewerScreen`` for a file.
    """

    CSS_PATH = "drive_view_tui.tcss"

    # Reactive variables
    is_authenticated = reactive(False)
    selected_drive_id = reactive("")
    drive_name = reactive("")
    current_view = reactive("Following")  # Track current view: "Following" or "Root"
    current_folder_id = reactive("root")  # Track current folder ID
    current_folder_name = reactive("Root")  # Track current folder name

    # App bindings
    BINDINGS = [
        Binding("q", "quit", "Quit"),
        Binding("r", "refresh", "Refresh"),
        Binding("f", "toggle_follow", "Toggle Follow"),
        Binding("o", "open_url", "Open URL"),
        Binding("enter", "open_url", "Open URL"),
        Binding("v", "view_document", "View Document"),
        Binding("tab", "next_view", "Switch View"),
        Binding("backspace", "navigate_back", "Back"),
        Binding("b", "navigate_back", "Back"),
    ]

    def __init__(self):
        super().__init__()
        self.access_token = None
        self.drives = []
        self.followed_items = []
        # Every item ever listed, keyed by its Graph item ID. Entries are kept
        # across listings because folder names are looked up here when
        # navigating back or refreshing.
        self.current_items = {}
        self.folder_history = []  # History stack for folder navigation
        # ID of the row currently highlighted in the table; set by
        # on_data_table_row_highlighted, None until a row has been highlighted.
        self.selected_item_id = None
        self.msal_app = None
        self.cache = msal.SerializableTokenCache()
        # Read Azure app credentials from environment variables
        self.client_id = os.getenv("AZURE_CLIENT_ID")
        self.tenant_id = os.getenv("AZURE_TENANT_ID")
        self.scopes = ["https://graph.microsoft.com/Files.ReadWrite.All"]
        self.cache_file = "token_cache.bin"

    def compose(self) -> ComposeResult:
        """Create child widgets for the app."""
        yield Header(show_clock=True)
        with Container(id="main_container"):
            with Horizontal(id="top_bar"):
                yield Button("\uf148 Up", id="back_button", classes="hide")
                yield Label(
                    "Authenticating with Microsoft Graph API...", id="status_label"
                )
            yield LoadingIndicator(id="loading")
            yield OptionList(
                Option("Following", id="following"),
                Option("Root", id="root"),
                id="view_options",
            )
            with Container(id="auth_container"):
                yield Label("", id="auth_message")
                yield Button("Login", id="login_button", variant="primary")
            with Container(id="content_container", classes="hide"):
                with Vertical(id="items_container"):
                    yield DataTable(id="items_table")
                    yield Label("No items found", id="no_items_label", classes="hide")
        yield Footer()

    async def on_mount(self) -> None:
        """Initialize the app when mounted."""
        self.query_one("#login_button").styles.width = "20"
        self.query_one("#view_options").border_title = "My Files"
        # Initialize the table
        table = self.query_one("#items_table", DataTable)
        table.cursor_type = "row"
        table.add_columns("", "Name", "Last Modified", "Size", "Web URL")
        table.focus()
        # Load cached token if available
        if os.path.exists(self.cache_file):
            with open(self.cache_file, "r") as f:
                self.cache.deserialize(f.read())
        # Initialize MSAL app
        self.initialize_msal()
        # Try silent authentication first
        self.authenticate_silent()

    # ------------------------------------------------------------------
    # Small UI / persistence helpers
    # ------------------------------------------------------------------

    def _set_loading(self, visible: bool) -> None:
        """Show or hide the loading indicator.

        The indicator is hidden rather than removed from the DOM so it can be
        shown again later (the login flow re-displays it).
        """
        self.query_one("#loading").styles.display = "block" if visible else "none"

    def _show_login_prompt(self, message: str) -> None:
        """Put *message* in the status label and reveal the login controls."""
        self.query_one("#status_label").update(message)
        self.query_one("#auth_container").remove_class("hide")
        self._set_loading(False)

    def _save_token_cache(self) -> None:
        """Write the MSAL token cache to ``self.cache_file`` if it changed."""
        if self.cache.has_state_changed:
            with open(self.cache_file, "w") as f:
                f.write(self.cache.serialize())

    def _reset_navigation(self) -> None:
        """Forget folder history and mark the root as the current folder."""
        self.folder_history.clear()
        self.current_folder_id = "root"
        self.current_folder_name = "Root"
        self.query_one("#back_button").add_class("hide")

    def _update_breadcrumbs(self) -> None:
        """Render the folder history plus current folder into the status label."""
        if self.folder_history:
            breadcrumbs = " / \uf07b ".join(
                [entry.folder_name for entry in self.folder_history]
            )
            self.query_one("#status_label").update(
                f"\uf07b {breadcrumbs} / \uf07b {self.current_folder_name}"
            )
        else:
            self.query_one("#status_label").update(
                f" \uf07b {self.current_folder_name}"
            )

    async def _graph_get(self, url: str):
        """Issue an authenticated GET request against *url*.

        Returns:
            A ``(status, payload)`` tuple. ``payload`` is the decoded JSON
            body when the status is 200 and ``None`` otherwise.

        Exceptions raised by aiohttp or JSON decoding propagate to the caller.
        """
        headers = {"Authorization": f"Bearer {self.access_token}"}
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=headers) as response:
                if response.status != 200:
                    return response.status, None
                return response.status, await response.json()

    @staticmethod
    def _format_modified(raw: str) -> str:
        """Format an ISO timestamp as ``YYYY-MM-DD HH:MM``.

        The raw value is returned unchanged when it is empty or cannot be
        parsed.
        """
        if not raw:
            return raw
        try:
            date_obj = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        except ValueError:
            return raw
        return date_obj.strftime("%Y-%m-%d %H:%M")

    @staticmethod
    def _format_size(size) -> str:
        """Format a byte count as B/KB/MB/GB, or ``N/A`` for a falsy size."""
        if not size:
            return "N/A"
        if size < 1024:
            return f"{size} B"
        if size < 1024 * 1024:
            return f"{size / 1024:.1f} KB"
        if size < 1024 * 1024 * 1024:
            return f"{size / (1024 * 1024):.1f} MB"
        return f"{size / (1024 * 1024 * 1024):.1f} GB"

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------

    def initialize_msal(self) -> None:
        """Initialize the MSAL application.

        Leaves ``self.msal_app`` as ``None`` (after notifying the user) when
        the Azure client or tenant ID is not configured.
        """
        if not self.client_id or not self.tenant_id:
            self.notify(
                "Please set AZURE_CLIENT_ID and AZURE_TENANT_ID environment variables.",
                severity="error",
                timeout=10,
            )
            return
        authority = f"https://login.microsoftonline.com/{self.tenant_id}"
        self.msal_app = msal.PublicClientApplication(
            self.client_id, authority=authority, token_cache=self.cache
        )

    def authenticate_silent(self) -> None:
        """Try silent authentication first."""
        if not self.msal_app:
            # initialize_msal() already told the user what is missing; stop
            # the spinner so the UI does not look like it is still working.
            self.query_one("#status_label").update(
                "Azure credentials are not configured."
            )
            self._set_loading(False)
            return
        accounts = self.msal_app.get_accounts()
        if accounts:
            self.query_one("#status_label").update("Trying silent authentication...")
            self.get_token_silent(accounts[0])
        else:
            self._show_login_prompt("Please log in to continue.")

    @work
    async def get_token_silent(self, account):
        """Get token silently."""
        if not self.msal_app:
            return
        # MSAL is synchronous and may hit the network to refresh the token;
        # run it in the default executor so the UI keeps repainting.
        loop = asyncio.get_running_loop()
        token_response = await loop.run_in_executor(
            None,
            lambda: self.msal_app.acquire_token_silent(self.scopes, account=account),
        )
        if token_response and "access_token" in token_response:
            # A silent refresh can rotate tokens; persist them for next start.
            self._save_token_cache()
            self.access_token = token_response["access_token"]
            self.is_authenticated = True
            self.load_initial_data()
        else:
            self._show_login_prompt("Silent authentication failed. Please log in.")

    async def on_button_pressed(self, event: Button.Pressed) -> None:
        """Handle button presses."""
        if event.button.id == "back_button":
            self.action_navigate_back()
        elif event.button.id == "login_button":
            self.initiate_device_flow()

    @work
    async def initiate_device_flow(self):
        """Initiate the MSAL device code flow."""
        if not self.msal_app:
            self.notify(
                "Please set AZURE_CLIENT_ID and AZURE_TENANT_ID environment variables.",
                severity="error",
                timeout=10,
            )
            return
        self._set_loading(True)
        self.query_one("#status_label").update("Initiating device code flow...")
        loop = asyncio.get_running_loop()
        # Initiate device flow
        flow = await loop.run_in_executor(
            None, lambda: self.msal_app.initiate_device_flow(scopes=self.scopes)
        )
        if "user_code" not in flow:
            self.notify("Failed to create device flow", severity="error")
            self._set_loading(False)
            return
        # Display the device code message
        self.query_one("#auth_message").update(flow["message"])
        # Wait for the user to authenticate. The MSAL call polls and blocks,
        # so it runs in a thread; otherwise the message above would never be
        # painted and the app could not respond to input.
        try:
            token_response = await loop.run_in_executor(
                None, self.msal_app.acquire_token_by_device_flow, flow
            )
        except asyncio.CancelledError:
            # The worker was cancelled while the thread is still polling.
            # Expiring the flow makes MSAL's polling loop exit.
            flow["expires_at"] = 0
            raise
        if "access_token" not in token_response:
            self.notify("Failed to acquire token", severity="error")
            self._set_loading(False)
            return
        # Save token to cache
        self._save_token_cache()
        self.access_token = token_response["access_token"]
        self.is_authenticated = True
        # Proceed with loading drives and followed items
        self.load_initial_data()

    # ------------------------------------------------------------------
    # Data loading
    # ------------------------------------------------------------------

    @work
    async def load_initial_data(self):
        """Load initial data after authentication.

        Fetches the user's drives, switches from the login controls to the
        content view, selects a drive and loads the followed items. If the
        drives cannot be fetched the login prompt is shown again.
        """
        if not self.access_token:
            return
        self.query_one("#status_label").update("Loading drives...")
        # Load drives first
        try:
            status, payload = await self._graph_get(
                "https://graph.microsoft.com/v1.0/me/drives"
            )
        except Exception as e:
            self.notify(f"Error loading drives: {str(e)}", severity="error")
            self._show_login_prompt("Please log in to continue.")
            return
        if status != 200:
            self.notify(f"Failed to load drives: {status}", severity="error")
            self._show_login_prompt("Please log in to continue.")
            return
        self.drives = payload.get("value", [])
        # Hide auth container and show content container
        self.query_one("#auth_container").add_class("hide")
        self.query_one("#content_container").remove_class("hide")
        self._set_loading(False)
        # Prefer the drive named "OneDrive"; fall back to the first drive so
        # accounts whose drive has another name still get a listing.
        drive = next((d for d in self.drives if d.get("name") == "OneDrive"), None)
        if drive is None and self.drives:
            drive = self.drives[0]
        if drive is not None:
            self.selected_drive_id = drive.get("id") or ""
            self.drive_name = drive.get("name") or ""
        # Set Following as default view
        option_list = self.query_one("#view_options")
        option_list.highlighted = 0  # Select "Following" option
        # If we have a selected drive, load followed items
        if self.selected_drive_id:
            self.load_followed_items()
        else:
            self.query_one("#status_label").update("No drives found.")

    async def on_option_list_option_selected(
        self, event: OptionList.OptionSelected
    ) -> None:
        """Handle option list selection."""
        selected_option = event.option.id
        if selected_option == "following":
            self.current_view = "Following"
            if self.selected_drive_id:
                self.load_followed_items()
        elif selected_option == "root":
            self.current_view = "Root"
            if self.selected_drive_id:
                self.load_drive_folder_items()

    async def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
        """Handle row selection in the items table."""
        selected_id = event.row_key.value
        self.open_item(selected_id)

    async def on_data_table_row_highlighted(
        self, event: DataTable.RowHighlighted
    ) -> None:
        """Remember the ID of the highlighted row for the item actions."""
        self.selected_item_id = event.row_key.value

    def open_item(self, selected_id: str):
        """Open the item with the given ID.

        Folders are entered (the folder being left is pushed onto the
        navigation history); files are shown in the document viewer. Unknown
        or empty IDs are ignored.
        """
        if not selected_id:
            return
        # Get an item from current items by ID string
        selected_row = self.current_items.get(selected_id)
        if selected_row is None:
            return
        item_name = selected_row.get("name")
        # Check if it's a folder
        is_folder = bool(selected_row.get("folder"))
        if is_folder:
            # Only folder navigation belongs in the history; recording it for
            # files would corrupt the breadcrumbs and the back stack.
            self.folder_history.append(
                FolderHistoryEntry(
                    self.current_folder_id,
                    self.current_folder_name,
                    self.selected_drive_id,
                )
            )
            self.notify(f"Selected folder: {item_name}", timeout=1)
            # Load items in the folder
            self.query_one("#back_button").remove_class("hide")
            self.query_one("#status_label").update(
                f"Loading items in folder: {item_name}"
            )
            self.load_drive_folder_items(
                folder_id=selected_id,
                drive_id=selected_row.get("parentReference", {}).get(
                    "driveId", self.selected_drive_id
                ),
            )
        else:
            self.notify(f"Selected file: {item_name}")
            # action_view_document reads selected_item_id; make sure it is
            # the item that was actually opened.
            self.selected_item_id = selected_id
            self.action_view_document()

    @work
    async def load_drive_folder_items(
        self, folder_id: str = "", drive_id: str = "", track_history: bool = True
    ):
        """Load the children of a folder into the items table.

        Args:
            folder_id: Item ID of the folder to list. Empty or ``"root"``
                lists the root of the user's default drive.
            drive_id: Drive containing ``folder_id``. When empty the root of
                the default drive is listed instead.
            track_history: Currently unused; kept for interface
                compatibility. History is recorded by ``open_item``.
        """
        if not self.access_token or not self.selected_drive_id:
            return
        self.query_one("#status_label").update("Loading drive folder items...")
        url = "https://graph.microsoft.com/v1.0/me/drive/root/children"
        if folder_id and drive_id and folder_id != "root":
            url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{folder_id}/children"
            self.selected_drive_id = drive_id
            self.current_folder_id = folder_id
            # Keep the existing name when the folder is not a known item
            # instead of failing on the lookup.
            known_folder = self.current_items.get(folder_id)
            if known_folder is not None:
                self.current_folder_name = known_folder.get("name", "Unknown")
        else:
            # A root listing is the top of the hierarchy: drop stale history
            # so the breadcrumbs and back button match what is displayed.
            self._reset_navigation()
        try:
            status, payload = await self._graph_get(url)
            if status != 200:
                self.notify(
                    f"Failed to load drive items: {status}",
                    severity="error",
                )
            else:
                # Update the table with the folder's items
                self.update_items_table(payload.get("value", []))
        except Exception as e:
            self.notify(f"Error loading root items: {str(e)}", severity="error")
        # Update the status label with breadcrumbs from the folder_history
        # (also on failure, so it does not stay on "Loading ...").
        self._update_breadcrumbs()

    @work
    async def load_followed_items(self):
        """Load followed items from the selected drive."""
        if not self.access_token or not self.selected_drive_id:
            return
        self.query_one("#status_label").update("Loading followed items...")
        # The followed list is a top-level listing, so no folder history applies.
        self._reset_navigation()
        try:
            status, payload = await self._graph_get(
                "https://graph.microsoft.com/v1.0/me/drive/following"
            )
            if status != 200:
                self.notify(
                    f"Failed to load followed items: {status}",
                    severity="error",
                )
            else:
                self.followed_items = payload.get("value", [])
                # Update the table with the followed items
                self.update_items_table(self.followed_items)
        except Exception as e:
            self.notify(f"Error loading followed items: {str(e)}", severity="error")
        self.query_one("#status_label").update("Ready")

    def update_items_table(self, items, is_root_view=False):
        """Update the table with the given items.

        Args:
            items: Iterable of item dicts; the keys read are ``id``,
                ``name``, ``folder``, ``lastModifiedDateTime``, ``size`` and
                ``webUrl``.
            is_root_view: Currently unused; kept for interface compatibility.
        """
        table = self.query_one("#items_table", DataTable)
        table.clear()
        no_items_label = self.query_one("#no_items_label")
        if not items:
            no_items_label.remove_class("hide")
            return
        no_items_label.add_class("hide")
        for item in items:
            name = item.get("name", "Unknown")
            is_folder = bool(item.get("folder"))
            # Get icon for the file type
            item_type = get_file_icon(name, is_folder)
            last_modified = self._format_modified(item.get("lastModifiedDateTime", ""))
            size_str = self._format_size(item.get("size", 0))
            web_url = item.get("webUrl", "")
            # Limit the displayed filename to 50 characters
            display_name = name[:50] + "..." if len(name) > 50 else name
            row_key = table.add_row(
                item_type,
                display_name,
                last_modified,
                size_str,
                web_url,
                key=item.get("id"),
            )
            # Remember the full item under its ID (the row key's value) so
            # all of its information can be looked up later.
            self.current_items[row_key.value] = item

    # ------------------------------------------------------------------
    # Actions
    # ------------------------------------------------------------------

    async def action_next_view(self) -> None:
        """Switch to the next view (Following/Root)."""
        option_list = self.query_one("#view_options")
        if self.current_view == "Following":
            option_list.highlighted = 1  # Switch to Root
            self.current_view = "Root"
            self.load_drive_folder_items()
        else:
            option_list.highlighted = 0  # Switch to Following
            self.current_view = "Following"
            self.load_followed_items()
        self.notify(f"Switched to {self.current_view} view")

    async def action_refresh(self) -> None:
        """Refresh the data."""
        if self.is_authenticated and self.selected_drive_id:
            if self.folder_history:
                # Inside a folder: reload that folder rather than jumping
                # back to the top-level listing.
                self.load_drive_folder_items(
                    folder_id=self.current_folder_id,
                    drive_id=self.selected_drive_id,
                    track_history=False,
                )
            elif self.current_view == "Following":
                self.load_followed_items()
            elif self.current_view == "Root":
                self.load_drive_folder_items()
            self.notify("Refreshed items")

    async def action_toggle_follow(self) -> None:
        """Toggle follow status for selected item."""
        # This would be implemented to follow/unfollow the selected item
        # Currently just a placeholder for the key binding
        self.notify("Toggle follow functionality not implemented yet")

    async def action_open_url(self) -> None:
        """Open the web URL of the selected item."""
        table = self.query_one("#items_table")
        # get_row_at() raises for an empty table, so bail out early.
        if not table.row_count or table.cursor_row is None:
            return
        selected_row = table.get_row_at(table.cursor_row)
        if selected_row and len(selected_row) > 4:
            web_url = selected_row[4]
            if web_url:
                self.notify(f"Opening URL: {web_url}")
                # Use Textual's built-in open_url method
                self.app.open_url(web_url)

    def action_view_document(self) -> None:
        """View the selected document using the DocumentViewerScreen."""
        # Look up the highlighted item; nothing to do if there is none.
        selected_row = self.current_items.get(self.selected_item_id)
        if not selected_row:
            return
        selected_name = selected_row.get("name")
        drive_id = selected_row.get("parentReference", {}).get(
            "driveId", self.selected_drive_id
        )
        web_url = selected_row.get("webUrl", "")
        # Open the document viewer screen with all required details
        viewer = DocumentViewerScreen(
            self.selected_item_id, selected_name, self.access_token, drive_id
        )
        viewer.web_url = web_url  # Pass the webUrl to the viewer
        self.push_screen(viewer)

    async def action_quit(self) -> None:
        """Quit the application."""
        self.exit()

    def action_navigate_back(self) -> None:
        """Navigate back to the previous folder."""
        if not self.folder_history:
            self.notify("No previous folder to navigate back to")
            return
        previous_entry = self.folder_history.pop()
        self.current_folder_id = previous_entry.folder_id
        self.current_folder_name = previous_entry.folder_name
        if previous_entry.parent_id:
            # The entry stores the drive that was selected when the folder
            # was left; restore it along with the folder.
            self.selected_drive_id = previous_entry.parent_id
        if not self.folder_history:
            self.query_one("#back_button").add_class("hide")
        self.load_drive_folder_items(
            folder_id=previous_entry.folder_id,
            drive_id=previous_entry.parent_id,
            track_history=False,
        )
if __name__ == "__main__":
    # Script entry point: build the TUI and run it until the user quits.
    app = OneDriveTUI()
    app.run()