drive viewer
This commit is contained in:
385
drive_view_tui.py
Normal file
385
drive_view_tui.py
Normal file
@@ -0,0 +1,385 @@
|
||||
import os
|
||||
from select import select
|
||||
import sys
|
||||
import json
|
||||
import asyncio
|
||||
from datetime import datetime
|
||||
|
||||
import msal
|
||||
import aiohttp
|
||||
|
||||
|
||||
from textual.app import App, ComposeResult
|
||||
from textual.binding import Binding
|
||||
from textual.containers import Container, Horizontal, Vertical
|
||||
from textual.widgets import (
|
||||
Header,
|
||||
Footer,
|
||||
Static,
|
||||
Label,
|
||||
DataTable,
|
||||
Button,
|
||||
ListView,
|
||||
ListItem,
|
||||
LoadingIndicator
|
||||
)
|
||||
from textual.reactive import reactive
|
||||
from textual.worker import Worker, get_current_worker
|
||||
from textual import work
|
||||
|
||||
|
||||
class OneDriveTUI(App):
|
||||
"""A Textual app for OneDrive integration with MSAL authentication."""
|
||||
|
||||
CSS_PATH = "drive_view_tui.tcss"
|
||||
|
||||
# Reactive variables
|
||||
is_authenticated = reactive(False)
|
||||
selected_drive_id = reactive("")
|
||||
drive_name = reactive("")
|
||||
|
||||
# App bindings
|
||||
BINDINGS = [
|
||||
Binding("q", "quit", "Quit"),
|
||||
Binding("r", "refresh", "Refresh"),
|
||||
Binding("f", "toggle_follow", "Toggle Follow"),
|
||||
]
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.access_token = None
|
||||
self.drives = []
|
||||
self.followed_items = []
|
||||
self.msal_app = None
|
||||
self.cache = None
|
||||
# Read Azure app credentials from environment variables
|
||||
self.client_id = os.getenv("AZURE_CLIENT_ID")
|
||||
self.tenant_id = os.getenv("AZURE_TENANT_ID")
|
||||
self.scopes = ["https://graph.microsoft.com/Files.ReadWrite.All"]
|
||||
self.cache_file = "token_cache.bin"
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
"""Create child widgets for the app."""
|
||||
yield Header(show_clock=True)
|
||||
|
||||
with Container(id="main_container"):
|
||||
yield Label("Authenticating with Microsoft Graph API...", id="status_label")
|
||||
with Container(id="auth_container"):
|
||||
yield Label("", id="auth_message")
|
||||
yield Button("Login", id="login_button", variant="primary")
|
||||
|
||||
with Horizontal(id="content_container", classes="hide"):
|
||||
with Vertical(id="drive_container"):
|
||||
yield ListView(id="drive_list")
|
||||
with Vertical(id="items_container"):
|
||||
yield DataTable(id="items_table")
|
||||
yield Label("No items found", id="no_items_label", classes="hide")
|
||||
|
||||
yield Footer()
|
||||
|
||||
def on_mount(self) -> None:
|
||||
"""Initialize the app when mounted."""
|
||||
self.cache = msal.SerializableTokenCache()
|
||||
self.query_one("#auth_container").ALLOW_SELECT = True
|
||||
# Initialize the table
|
||||
self.query_one("#drive_list").border_title = "Available Drives"
|
||||
self.query_one("#content_container").border_title = "Followed Items"
|
||||
table = self.query_one("#items_table")
|
||||
table.add_columns("Name", "Type", "Last Modified", "Size", "Web URL")
|
||||
|
||||
# Load cached token if available
|
||||
if os.path.exists(self.cache_file):
|
||||
with open(self.cache_file, "r") as f:
|
||||
self.cache.deserialize(f.read())
|
||||
|
||||
# Initialize MSAL app
|
||||
self.initialize_msal()
|
||||
self.notify("Initializing MSAL app...", severity="info")
|
||||
|
||||
|
||||
|
||||
def initialize_msal(self) -> None:
|
||||
"""Initialize the MSAL application."""
|
||||
if not self.client_id or not self.tenant_id:
|
||||
self.notify(
|
||||
"Please set AZURE_CLIENT_ID and AZURE_TENANT_ID environment variables.",
|
||||
severity="error",
|
||||
timeout=10,
|
||||
)
|
||||
return
|
||||
|
||||
authority = f"https://login.microsoftonline.com/{self.tenant_id}"
|
||||
self.msal_app = msal.PublicClientApplication(
|
||||
self.client_id, authority=authority, token_cache=self.cache
|
||||
)
|
||||
# Try silent authentication first
|
||||
if not self.msal_app:
|
||||
return
|
||||
|
||||
accounts = self.msal_app.get_accounts()
|
||||
if accounts:
|
||||
self.query_one("#status_label").update("Trying silent authentication...")
|
||||
worker = self.get_token_silent(accounts[0])
|
||||
worker.wait()
|
||||
self.query_one("#status_label").update("Authenticated successfully.")
|
||||
self.query_one("#auth_container").add_class("hide")
|
||||
self.notify("Authenticated successfully.", severity="success")
|
||||
else:
|
||||
self.query_one("#status_label").update("Please log in to continue.")
|
||||
self.query_one("#auth_container").remove_class("hide")
|
||||
|
||||
|
||||
|
||||
@work
|
||||
async def get_token_silent(self, account):
|
||||
"""Get token silently."""
|
||||
token_response = self.msal_app.acquire_token_silent(self.scopes, account=account)
|
||||
if token_response and "access_token" in token_response:
|
||||
self.access_token = token_response["access_token"]
|
||||
self.is_authenticated = True
|
||||
self.load_initial_data()
|
||||
else:
|
||||
self.query_one("#status_label").update("Silent authentication failed. Please log in.")
|
||||
self.query_one("#auth_container").remove_class("hide")
|
||||
self.query_one("#content_container").loading = False
|
||||
|
||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||
"""Handle button presses."""
|
||||
if event.button.id == "login_button":
|
||||
self.initiate_device_flow()
|
||||
|
||||
|
||||
def initiate_device_flow(self):
|
||||
self.notify("Starting device code flow...", severity="info")
|
||||
"""Initiate the MSAL device code flow."""
|
||||
self.query_one("#content_container").loading = True
|
||||
self.query_one("#status_label").update("Initiating device code flow...")
|
||||
|
||||
# Initiate device flow
|
||||
flow = self.msal_app.initiate_device_flow(scopes=self.scopes)
|
||||
|
||||
if "user_code" not in flow:
|
||||
self.notify("Failed to create device flow", severity="error")
|
||||
return
|
||||
# self.notify(str(flow), severity="info")
|
||||
# Display the device code message
|
||||
self.query_one("#auth_message").update(flow["message"])
|
||||
self.query_one("#status_label").update("Waiting for authentication...")
|
||||
self.wait_for_device_code(flow)
|
||||
|
||||
|
||||
@work(thread=True)
|
||||
async def wait_for_device_code(self, flow):
|
||||
"""Wait for the user to authenticate using the device code."""
|
||||
|
||||
# Poll for token acquisition
|
||||
result = self.msal_app.acquire_token_by_device_flow(flow)
|
||||
if "access_token" in result:
|
||||
self.access_token = result["access_token"]
|
||||
self.is_authenticated = True
|
||||
|
||||
elif "error" in result and result["error"] == "authorization_pending":
|
||||
# Wait before polling again
|
||||
asyncio.sleep(5)
|
||||
else:
|
||||
self.notify(f"Authentication failed: {result.get('error_description', 'Unknown error')}", severity="error")
|
||||
return
|
||||
|
||||
# Save the token to cache
|
||||
with open(self.cache_file, "w") as f:
|
||||
f.write(self.cache.serialize())
|
||||
|
||||
# Load initial data after authentication
|
||||
self.load_initial_data()
|
||||
|
||||
@work
|
||||
async def load_initial_data(self):
|
||||
"""Load initial data after authentication."""
|
||||
|
||||
|
||||
# Load drives first
|
||||
|
||||
# Hide auth container and show content container
|
||||
self.query_one("#auth_container").add_class("hide")
|
||||
self.query_one("#content_container").remove_class("hide")
|
||||
self.query_one("#content_container").loading = False
|
||||
|
||||
worker = self.load_drives()
|
||||
await worker.wait()
|
||||
# Find and select the OneDrive drive
|
||||
for drive in self.drives:
|
||||
if drive.get("name") == "OneDrive":
|
||||
self.selected_drive_id = drive.get("id")
|
||||
self.drive_name = drive.get("name")
|
||||
break
|
||||
|
||||
# If we have a selected drive, load followed items
|
||||
if self.selected_drive_id:
|
||||
self.load_followed_items()
|
||||
|
||||
@work
|
||||
async def load_drives(self):
|
||||
"""Load OneDrive drives."""
|
||||
if not self.access_token:
|
||||
return
|
||||
|
||||
headers = {"Authorization": f"Bearer {self.access_token}"}
|
||||
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(
|
||||
"https://graph.microsoft.com/v1.0/me/drives",
|
||||
headers=headers
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
self.notify(f"Failed to load drives: {response.status}", severity="error")
|
||||
return
|
||||
|
||||
drives_data = await response.json()
|
||||
self.drives = drives_data.get("value", [])
|
||||
for drive in self.drives:
|
||||
drive_name = drive.get("name", "Unknown")
|
||||
drive_id = drive.get("id", "Unknown")
|
||||
# Add the drive to the list
|
||||
self.query_one("#drive_list").append(
|
||||
ListItem(Label(drive_name))
|
||||
)
|
||||
|
||||
# Update the drives label
|
||||
if self.drives:
|
||||
self.query_one("#drive_list").border_subtitle = f"Available: {len(self.drives)}"
|
||||
except Exception as e:
|
||||
self.notify(f"Error loading drives: {str(e)}", severity="error")
|
||||
|
||||
@work
|
||||
async def load_followed_items(self):
|
||||
"""Load followed items from the selected drive."""
|
||||
if not self.access_token or not self.selected_drive_id:
|
||||
return
|
||||
|
||||
self.query_one("#status_label").update("Loading followed items...")
|
||||
headers = {"Authorization": f"Bearer {self.access_token}"}
|
||||
|
||||
# Update drive label
|
||||
self.query_one("#drive_list").index = 0
|
||||
|
||||
try:
|
||||
url = f"https://graph.microsoft.com/v1.0/me/drives/{self.selected_drive_id}/following"
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url, headers=headers) as response:
|
||||
if response.status != 200:
|
||||
self.notify(f"Failed to load followed items: {await response.text()}", severity="error")
|
||||
return
|
||||
|
||||
items_data = await response.json()
|
||||
self.followed_items = items_data.get("value", [])
|
||||
|
||||
# Update the table with the followed items
|
||||
# self.update_items_table()
|
||||
table = self.query_one("#items_table")
|
||||
table.clear()
|
||||
|
||||
if not self.followed_items:
|
||||
self.query_one("#no_items_label").remove_class("hide")
|
||||
return
|
||||
|
||||
self.query_one("#no_items_label").add_class("hide")
|
||||
|
||||
for item in self.followed_items:
|
||||
name = item.get("name", "Unknown")
|
||||
item_type = "Folder" if item.get("folder") else "File"
|
||||
|
||||
# Format the last modified date
|
||||
last_modified = item.get("lastModifiedDateTime", "")
|
||||
if last_modified:
|
||||
try:
|
||||
date_obj = datetime.fromisoformat(last_modified.replace('Z', '+00:00'))
|
||||
last_modified = date_obj.strftime("%Y-%m-%d %H:%M")
|
||||
except:
|
||||
pass
|
||||
|
||||
# Format the size
|
||||
size = item.get("size", 0)
|
||||
if size:
|
||||
if size < 1024:
|
||||
size_str = f"{size} B"
|
||||
elif size < 1024 * 1024:
|
||||
size_str = f"{size / 1024:.1f} KB"
|
||||
elif size < 1024 * 1024 * 1024:
|
||||
size_str = f"{size / (1024 * 1024):.1f} MB"
|
||||
else:
|
||||
size_str = f"{size / (1024 * 1024 * 1024):.1f} GB"
|
||||
else:
|
||||
size_str = "N/A"
|
||||
|
||||
web_url = item.get("webUrl", "")
|
||||
|
||||
table.add_row(name, item_type, last_modified, size_str, web_url)
|
||||
except Exception as e:
|
||||
self.notify(f"Error loading followed items: {str(e)}", severity="error")
|
||||
|
||||
self.query_one("#status_label").update("Ready")
|
||||
|
||||
async def update_items_table(self):
|
||||
|
||||
table = self.query_one("#items_table")
|
||||
table.clear()
|
||||
|
||||
if not self.followed_items:
|
||||
self.query_one("#no_items_label").remove_class("hide")
|
||||
return
|
||||
|
||||
self.query_one("#no_items_label").add_class("hide")
|
||||
|
||||
for item in self.followed_items:
|
||||
name = item.get("name", "Unknown")
|
||||
item_type = "Folder" if item.get("folder") else "File"
|
||||
|
||||
# Format the last modified date
|
||||
last_modified = item.get("lastModifiedDateTime", "")
|
||||
if last_modified:
|
||||
try:
|
||||
date_obj = datetime.fromisoformat(last_modified.replace('Z', '+00:00'))
|
||||
last_modified = date_obj.strftime("%Y-%m-%d %H:%M")
|
||||
except:
|
||||
pass
|
||||
|
||||
# Format the size
|
||||
size = item.get("size", 0)
|
||||
if size:
|
||||
if size < 1024:
|
||||
size_str = f"{size} B"
|
||||
elif size < 1024 * 1024:
|
||||
size_str = f"{size / 1024:.1f} KB"
|
||||
elif size < 1024 * 1024 * 1024:
|
||||
size_str = f"{size / (1024 * 1024):.1f} MB"
|
||||
else:
|
||||
size_str = f"{size / (1024 * 1024 * 1024):.1f} GB"
|
||||
else:
|
||||
size_str = "N/A"
|
||||
|
||||
web_url = item.get("webUrl", "")
|
||||
|
||||
table.add_row(name, item_type, last_modified, size_str, web_url)
|
||||
|
||||
async def action_refresh(self) -> None:
|
||||
"""Refresh the data."""
|
||||
if self.is_authenticated and self.selected_drive_id:
|
||||
self.load_followed_items()
|
||||
self.notify("Refreshed followed items")
|
||||
|
||||
async def action_toggle_follow(self) -> None:
|
||||
"""Toggle follow status for selected item."""
|
||||
# This would be implemented to follow/unfollow the selected item
|
||||
# Currently just a placeholder for the key binding
|
||||
self.notify("Toggle follow functionality not implemented yet")
|
||||
|
||||
async def action_quit(self) -> None:
|
||||
"""Quit the application."""
|
||||
self.exit()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app = OneDriveTUI()
|
||||
app.run()
|
||||
Reference in New Issue
Block a user