400 lines
15 KiB
Python
400 lines
15 KiB
Python
import os
|
|
from select import select
|
|
import sys
|
|
import json
|
|
import asyncio
|
|
from datetime import datetime
|
|
|
|
import msal
|
|
import aiohttp
|
|
|
|
|
|
from textual.app import App, ComposeResult
|
|
from textual.binding import Binding
|
|
from textual.containers import Container, Horizontal, Vertical
|
|
from textual.widgets import (
|
|
Header,
|
|
Footer,
|
|
Static,
|
|
Label,
|
|
DataTable,
|
|
Button,
|
|
ListView,
|
|
ListItem,
|
|
LoadingIndicator
|
|
)
|
|
from textual.reactive import reactive
|
|
from textual.worker import Worker, get_current_worker
|
|
from textual import work
|
|
|
|
|
|
class OneDriveTUI(App):
    """A Textual app for OneDrive integration with MSAL authentication.

    The app signs in through MSAL (silently from a cached account when one
    exists, otherwise via the device code flow), lists the drives returned by
    Microsoft Graph and shows the followed items of the drive named
    "OneDrive" in a table.
    """

    CSS_PATH = "drive_view_tui.tcss"

    # Reactive variables
    is_authenticated = reactive(False)
    selected_drive_id = reactive("")
    drive_name = reactive("")

    # App bindings
    BINDINGS = [
        Binding("q", "quit", "Quit"),
        Binding("r", "refresh", "Refresh"),
        Binding("f", "toggle_follow", "Toggle Follow"),
        Binding("o", "open_url", "Open URL"),
    ]

    def __init__(self):
        """Set up authentication state and read the Azure app configuration."""
        super().__init__()
        self.access_token = None
        self.drives = []
        self.followed_items = []
        self.msal_app = None
        self.cache = None
        # Device flow dict currently being polled by a worker thread, if any.
        self._pending_flow = None
        # Read Azure app credentials from environment variables
        self.client_id = os.getenv("AZURE_CLIENT_ID")
        self.tenant_id = os.getenv("AZURE_TENANT_ID")
        self.scopes = ["https://graph.microsoft.com/Files.ReadWrite.All"]
        self.cache_file = "token_cache.bin"

    def compose(self) -> ComposeResult:
        """Create child widgets for the app."""
        yield Header(show_clock=True)

        with Container(id="main_container"):
            yield Label("Authenticating with Microsoft Graph API...", id="status_label")
            with Container(id="auth_container"):
                yield Label("", id="auth_message")
                yield Button("Login", id="login_button", variant="primary")

            with Horizontal(id="content_container", classes="hide"):
                with Vertical(id="drive_container"):
                    yield ListView(id="drive_list")
                with Vertical(id="items_container"):
                    yield DataTable(id="items_table")
                    yield Label("No items found", id="no_items_label", classes="hide")

        yield Footer()

    def on_mount(self) -> None:
        """Initialize the table, load the token cache and start authentication."""
        self.cache = msal.SerializableTokenCache()
        self.query_one("#auth_container").ALLOW_SELECT = True
        # Initialize the table
        self.query_one("#drive_list").border_title = "Available Drives"
        self.query_one("#content_container").border_title = "Followed Items"
        table = self.query_one("#items_table", DataTable)
        table.add_columns("Name", "Type", "Last Modified", "Size", "Web URL")

        # Load cached token if available. A corrupt or unreadable cache must
        # not prevent the app from starting; the user can simply log in again.
        if os.path.exists(self.cache_file):
            try:
                with open(self.cache_file, "r") as f:
                    self.cache.deserialize(f.read())
            except (OSError, ValueError) as e:
                self.notify(f"Ignoring unreadable token cache: {e}", severity="warning")

        # Initialize MSAL app
        self.notify("Initializing MSAL app...", severity="information")
        self.initialize_msal()

    def initialize_msal(self) -> None:
        """Initialize the MSAL application and try silent authentication.

        When a cached account exists, ``get_token_silent`` is started as a
        worker; that worker reports success or failure itself, so nothing is
        reported as authenticated before a token was actually obtained.
        """
        status = self.query_one("#status_label", Label)
        if not self.client_id or not self.tenant_id:
            status.update("Missing Azure configuration.")
            self.notify(
                "Please set AZURE_CLIENT_ID and AZURE_TENANT_ID environment variables.",
                severity="error",
                timeout=10,
            )
            return

        authority = f"https://login.microsoftonline.com/{self.tenant_id}"
        self.msal_app = msal.PublicClientApplication(
            self.client_id, authority=authority, token_cache=self.cache
        )

        # Try silent authentication first
        accounts = self.msal_app.get_accounts()
        if accounts:
            status.update("Trying silent authentication...")
            self.get_token_silent(accounts[0])
        else:
            status.update("Please log in to continue.")
            self.query_one("#auth_container").remove_class("hide")

    def _save_token_cache(self) -> None:
        """Write the MSAL token cache to ``self.cache_file`` if it changed."""
        if self.cache is None or not self.cache.has_state_changed:
            return
        try:
            # The cache holds tokens, so create the file readable by the
            # owner only instead of relying on the process umask.
            fd = os.open(
                self.cache_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600
            )
            with os.fdopen(fd, "w") as f:
                f.write(self.cache.serialize())
        except OSError as e:
            self.notify(f"Could not save token cache: {e}", severity="warning")

    def _on_authenticated(self, access_token) -> None:
        """Record a freshly acquired token and start loading data.

        Must run on the app's own thread (it touches widgets and reactives).
        """
        self.access_token = access_token
        self.is_authenticated = True
        self._pending_flow = None
        self._save_token_cache()
        self.query_one("#status_label", Label).update("Authenticated successfully.")
        self.query_one("#auth_container").add_class("hide")
        self.notify("Authenticated successfully.", severity="information")
        self.load_initial_data()

    def _show_login_prompt(self, status_text) -> None:
        """Show the login controls again with ``status_text`` as the status."""
        self._pending_flow = None
        self.query_one("#status_label", Label).update(status_text)
        self.query_one("#auth_container").remove_class("hide")
        self.query_one("#content_container").loading = False
        self.query_one("#login_button", Button).disabled = False

    def _on_device_flow_failed(self, reason) -> None:
        """Report a failed device code flow and let the user retry."""
        self.notify(f"Authentication failed: {reason}", severity="error")
        self.query_one("#auth_message", Label).update("")
        self._show_login_prompt("Please log in to continue.")

    @work
    async def get_token_silent(self, account):
        """Get a token silently for ``account`` from the MSAL cache.

        ``acquire_token_silent`` is a blocking call, so it is run in the
        default executor to keep the UI responsive.
        """
        loop = asyncio.get_running_loop()
        try:
            token_response = await loop.run_in_executor(
                None,
                lambda: self.msal_app.acquire_token_silent(self.scopes, account=account),
            )
        except Exception as e:  # worker boundary: report instead of crashing the app
            self.notify(f"Silent authentication error: {e}", severity="warning")
            token_response = None

        if token_response and "access_token" in token_response:
            self._on_authenticated(token_response["access_token"])
        else:
            self._show_login_prompt("Silent authentication failed. Please log in.")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Handle button presses."""
        if event.button.id == "login_button":
            self.initiate_device_flow()

    def initiate_device_flow(self):
        """Initiate the MSAL device code flow."""
        if self.msal_app is None:
            # initialize_msal() bailed out, typically because the environment
            # variables are missing; there is nothing to start a flow on.
            self.notify(
                "Please set AZURE_CLIENT_ID and AZURE_TENANT_ID environment variables.",
                severity="error",
                timeout=10,
            )
            return

        self.notify("Starting device code flow...", severity="information")
        login_button = self.query_one("#login_button", Button)
        login_button.disabled = True  # avoid starting several flows at once
        self.query_one("#content_container").loading = True
        self.query_one("#status_label", Label).update("Initiating device code flow...")

        # Initiate device flow.
        # NOTE: this is a blocking request made from the UI thread.
        try:
            flow = self.msal_app.initiate_device_flow(scopes=self.scopes)
        except Exception as e:  # UI boundary: report instead of crashing the app
            self.notify(f"Failed to create device flow: {e}", severity="error")
            self._show_login_prompt("Please log in to continue.")
            return

        if "user_code" not in flow:
            self.notify("Failed to create device flow", severity="error")
            self._show_login_prompt("Please log in to continue.")
            return

        # Display the device code message
        self.query_one("#auth_message", Label).update(flow["message"])
        self.query_one("#status_label", Label).update("Waiting for authentication...")
        self._pending_flow = flow
        self.wait_for_device_code(flow)

    @work(thread=True)
    def wait_for_device_code(self, flow):
        """Wait for the user to authenticate using the device code.

        Runs in a worker thread because ``acquire_token_by_device_flow``
        blocks while it polls. All UI work is handed back to the app thread
        with ``call_from_thread``.
        """
        worker = get_current_worker()
        try:
            result = self.msal_app.acquire_token_by_device_flow(flow)
        except Exception as e:  # worker boundary: surface as a failed login
            result = {"error_description": str(e)}

        # The app is going away (worker cancelled, or action_quit aborted the
        # polling by zeroing "expires_at"): do not touch the UI any more.
        if worker.is_cancelled or flow.get("expires_at") == 0:
            return

        if "access_token" in result:
            self.call_from_thread(self._on_authenticated, result["access_token"])
            return

        if result.get("error") == "authorization_pending":
            # Polling stopped while the sign-in was still pending.
            reason = "The device code expired before sign-in completed."
        else:
            reason = result.get("error_description", "Unknown error")
        self.call_from_thread(self._on_device_flow_failed, reason)

    @work
    async def load_initial_data(self):
        """Load initial data after authentication."""
        # Hide auth container and show content container
        self.query_one("#auth_container").add_class("hide")
        content = self.query_one("#content_container")
        content.remove_class("hide")
        content.loading = False

        # Load drives first
        await self.load_drives().wait()

        # Find and select the OneDrive drive
        onedrive = next(
            (drive for drive in self.drives if drive.get("name") == "OneDrive"),
            None,
        )
        if onedrive is not None:
            self.selected_drive_id = onedrive.get("id")
            self.drive_name = onedrive.get("name")
        elif not self.selected_drive_id:
            self.notify("No drive named 'OneDrive' was found.", severity="warning")

        # If we have a selected drive, load followed items
        if self.selected_drive_id:
            self.load_followed_items()

    @work
    async def load_drives(self):
        """Load OneDrive drives into ``self.drives`` and the drive list."""
        if not self.access_token:
            return

        headers = {"Authorization": f"Bearer {self.access_token}"}

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    "https://graph.microsoft.com/v1.0/me/drives",
                    headers=headers,
                ) as response:
                    if response.status != 200:
                        self.notify(f"Failed to load drives: {response.status}", severity="error")
                        return

                    drives_data = await response.json()

            self.drives = drives_data.get("value", [])
            drive_list = self.query_one("#drive_list", ListView)
            for drive in self.drives:
                # Add the drive to the list
                drive_list.append(ListItem(Label(drive.get("name", "Unknown"))))

            # Update the drives label
            if self.drives:
                drive_list.border_subtitle = f"Available: {len(self.drives)}"
        except Exception as e:
            self.notify(f"Error loading drives: {str(e)}", severity="error")

    # Exclusive within its own group so a quick second refresh replaces the
    # first one instead of interleaving table updates; a dedicated group keeps
    # this from cancelling the other workers of the app.
    @work(exclusive=True, group="followed_items")
    async def load_followed_items(self):
        """Load followed items from the selected drive."""
        if not self.access_token or not self.selected_drive_id:
            return

        status = self.query_one("#status_label", Label)
        status.update("Loading followed items...")
        headers = {"Authorization": f"Bearer {self.access_token}"}

        # Highlight the selected drive in the drive list
        selected_index = next(
            (
                index
                for index, drive in enumerate(self.drives)
                if drive.get("id") == self.selected_drive_id
            ),
            None,
        )
        if selected_index is not None:
            self.query_one("#drive_list", ListView).index = selected_index

        url = f"https://graph.microsoft.com/v1.0/me/drives/{self.selected_drive_id}/following"
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers) as response:
                    if response.status == 200:
                        items_data = await response.json()
                        self.followed_items = items_data.get("value", [])
                        # Update the table with the followed items
                        await self.update_items_table()
                    else:
                        self.notify(
                            f"Failed to load followed items: {await response.text()}",
                            severity="error",
                        )
        except Exception as e:
            self.notify(f"Error loading followed items: {str(e)}", severity="error")

        # Reached on every non-cancelled path, so the status never stays stuck
        # on "Loading followed items...".
        status.update("Ready")

    @staticmethod
    def _format_modified(timestamp):
        """Return ``timestamp`` (ISO 8601) as ``YYYY-MM-DD HH:MM``.

        An empty value yields ``""``; a value that cannot be parsed is
        returned unchanged.
        """
        if not timestamp:
            return ""
        try:
            parsed = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
        except ValueError:
            return timestamp
        return parsed.strftime("%Y-%m-%d %H:%M")

    @staticmethod
    def _format_size(size):
        """Return ``size`` in bytes as a B/KB/MB/GB string, or "N/A" if falsy."""
        if not size:
            return "N/A"
        if size < 1024:
            return f"{size} B"
        value = size / 1024
        for unit in ("KB", "MB"):
            if value < 1024:
                return f"{value:.1f} {unit}"
            value /= 1024
        return f"{value:.1f} GB"

    async def update_items_table(self):
        """Rebuild the items table from ``self.followed_items``."""
        table = self.query_one("#items_table", DataTable)
        table.clear()

        no_items_label = self.query_one("#no_items_label")
        if not self.followed_items:
            no_items_label.remove_class("hide")
            return

        no_items_label.add_class("hide")

        for item in self.followed_items:
            table.add_row(
                item.get("name", "Unknown"),
                "Folder" if item.get("folder") else "File",
                self._format_modified(item.get("lastModifiedDateTime", "")),
                self._format_size(item.get("size", 0)),
                item.get("webUrl", ""),
            )

    async def action_refresh(self) -> None:
        """Refresh the data."""
        if self.is_authenticated and self.selected_drive_id:
            self.load_followed_items()
            self.notify("Refreshed followed items")

    async def action_toggle_follow(self) -> None:
        """Toggle follow status for selected item."""
        # This would be implemented to follow/unfollow the selected item
        # Currently just a placeholder for the key binding
        self.notify("Toggle follow functionality not implemented yet")

    async def action_open_url(self) -> None:
        """Open the web URL of the selected item."""
        table = self.query_one("#items_table", DataTable)
        if table.row_count == 0:
            # Nothing to open; get_row_at() would raise on an empty table.
            return

        selected_row = table.get_row_at(table.cursor_row)
        web_url = selected_row[4] if len(selected_row) > 4 else ""
        if web_url:
            self.notify(f"Opening URL: {web_url}")
            # Use Textual's built-in open_url method instead of os.system
            self.open_url(web_url)

    async def action_quit(self) -> None:
        """Quit the application."""
        if self._pending_flow is not None:
            # Setting "expires_at" to 0 makes MSAL leave its device flow
            # polling loop, so the worker thread does not keep the process
            # alive until the device code expires.
            self._pending_flow["expires_at"] = 0
        self.exit()
|
|
|
|
|
|
# Script entry point: build the TUI and run it when executed directly.
if __name__ == "__main__":
    OneDriveTUI().run()
|