496 lines
19 KiB
Python
496 lines
19 KiB
Python
import os
|
|
import sys
|
|
import json
|
|
import asyncio
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import msal
|
|
import aiohttp
|
|
from rich.panel import Panel
|
|
from rich import print as rprint
|
|
|
|
from textual.app import App, ComposeResult
|
|
from textual.binding import Binding
|
|
from textual.containers import Container, Horizontal, Vertical
|
|
from textual.widgets import (
|
|
Header,
|
|
Footer,
|
|
Static,
|
|
Label,
|
|
DataTable,
|
|
Button,
|
|
ListView,
|
|
ListItem,
|
|
LoadingIndicator,
|
|
OptionList
|
|
)
|
|
from textual.reactive import reactive
|
|
from textual.worker import Worker, get_current_worker
|
|
from textual import work
|
|
from textual.widgets.option_list import Option
|
|
|
|
# Import our DocumentViewerScreen
|
|
sys.path.append(os.path.join(os.path.dirname(__file__), "maildir_gtd"))
|
|
from maildir_gtd.screens.DocumentViewer import DocumentViewerScreen
|
|
|
|
|
|
class FolderHistoryEntry:
|
|
"""Represents an entry in the folder navigation history."""
|
|
|
|
def __init__(self, folder_id: str, folder_name: str, parent_id: str = None):
|
|
self.folder_id = folder_id
|
|
self.folder_name = folder_name
|
|
self.parent_id = parent_id
|
|
|
|
def __eq__(self, other):
|
|
if not isinstance(other, FolderHistoryEntry):
|
|
return False
|
|
return self.folder_id == other.folder_id
|
|
|
|
|
|
class OneDriveTUI(App):
|
|
"""A Textual app for OneDrive integration with MSAL authentication."""
|
|
|
|
CSS_PATH = "drive_view_tui.tcss"
|
|
|
|
# Reactive variables
|
|
is_authenticated = reactive(False)
|
|
selected_drive_id = reactive("")
|
|
drive_name = reactive("")
|
|
current_view = reactive("Following") # Track current view: "Following" or "Root"
|
|
current_folder_id = reactive("root") # Track current folder ID
|
|
current_folder_name = reactive("Root") # Track current folder name
|
|
|
|
# App bindings
|
|
BINDINGS = [
|
|
Binding("q", "quit", "Quit"),
|
|
Binding("r", "refresh", "Refresh"),
|
|
Binding("f", "toggle_follow", "Toggle Follow"),
|
|
Binding("o", "open_url", "Open URL"),
|
|
Binding("enter", "open_url", "Open URL"),
|
|
Binding("v", "view_document", "View Document"),
|
|
Binding("tab", "next_view", "Switch View"),
|
|
Binding("backspace", "navigate_back", "Back"),
|
|
Binding("b", "navigate_back", "Back"),
|
|
]
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.access_token = None
|
|
self.drives = []
|
|
self.followed_items = []
|
|
self.current_items = {} # Store currently displayed items
|
|
self.folder_history = [] # History stack for folder navigation
|
|
self.msal_app = None
|
|
self.cache = msal.SerializableTokenCache()
|
|
# Read Azure app credentials from environment variables
|
|
self.client_id = os.getenv("AZURE_CLIENT_ID")
|
|
self.tenant_id = os.getenv("AZURE_TENANT_ID")
|
|
|
|
self.scopes = ["https://graph.microsoft.com/Files.ReadWrite.All"]
|
|
self.cache_file = "token_cache.bin"
|
|
|
|
def compose(self) -> ComposeResult:
|
|
"""Create child widgets for the app."""
|
|
yield Header(show_clock=True)
|
|
|
|
with Container(id="main_container"):
|
|
yield LoadingIndicator(id="loading")
|
|
yield Label("Authenticating with Microsoft Graph API...", id="status_label")
|
|
|
|
with Container(id="auth_container"):
|
|
yield Label("", id="auth_message")
|
|
yield Button("Login", id="login_button", variant="primary")
|
|
|
|
with Container(id="content_container", classes="hide"):
|
|
with Horizontal():
|
|
with Vertical(id="navigation_container"):
|
|
|
|
yield OptionList(
|
|
Option("Following", id="following"),
|
|
Option("Root", id="root"),
|
|
id="view_options"
|
|
)
|
|
|
|
with Vertical(id="items_container"):
|
|
yield DataTable(id="items_table")
|
|
yield Label("No items found", id="no_items_label", classes="hide")
|
|
|
|
yield Footer()
|
|
|
|
async def on_mount(self) -> None:
|
|
"""Initialize the app when mounted."""
|
|
self.query_one("#login_button").styles.width = "20"
|
|
self.query_one("#view_options").border_title = "My Files"
|
|
# Initialize the table
|
|
table = self.query_one("#items_table")
|
|
table.cursor_type = "row"
|
|
table.add_columns("Type", "Name", "Last Modified", "Size", "Web URL")
|
|
|
|
# Load cached token if available
|
|
if os.path.exists(self.cache_file):
|
|
with open(self.cache_file, "r") as f:
|
|
self.cache.deserialize(f.read())
|
|
|
|
# Initialize MSAL app
|
|
self.initialize_msal()
|
|
|
|
# Try silent authentication first
|
|
self.authenticate_silent()
|
|
|
|
def initialize_msal(self) -> None:
|
|
"""Initialize the MSAL application."""
|
|
if not self.client_id or not self.tenant_id:
|
|
self.notify(
|
|
"Please set AZURE_CLIENT_ID and AZURE_TENANT_ID environment variables.",
|
|
severity="error",
|
|
timeout=10,
|
|
)
|
|
return
|
|
|
|
authority = f"https://login.microsoftonline.com/{self.tenant_id}"
|
|
self.msal_app = msal.PublicClientApplication(
|
|
self.client_id, authority=authority, token_cache=self.cache
|
|
)
|
|
|
|
def authenticate_silent(self) -> None:
|
|
"""Try silent authentication first."""
|
|
if not self.msal_app:
|
|
return
|
|
|
|
accounts = self.msal_app.get_accounts()
|
|
if accounts:
|
|
self.query_one("#status_label").update("Trying silent authentication...")
|
|
self.get_token_silent(accounts[0])
|
|
else:
|
|
self.query_one("#status_label").update("Please log in to continue.")
|
|
self.query_one("#auth_container").remove_class("hide")
|
|
self.query_one("#loading").remove()
|
|
|
|
@work
|
|
async def get_token_silent(self, account):
|
|
"""Get token silently."""
|
|
token_response = self.msal_app.acquire_token_silent(self.scopes, account=account)
|
|
if token_response and "access_token" in token_response:
|
|
self.access_token = token_response["access_token"]
|
|
self.is_authenticated = True
|
|
self.load_initial_data()
|
|
else:
|
|
self.query_one("#status_label").update("Silent authentication failed. Please log in.")
|
|
self.query_one("#auth_container").remove_class("hide")
|
|
self.query_one("#loading").remove()
|
|
|
|
async def on_button_pressed(self, event: Button.Pressed) -> None:
|
|
"""Handle button presses."""
|
|
if event.button.id == "login_button":
|
|
self.initiate_device_flow()
|
|
|
|
@work
|
|
async def initiate_device_flow(self):
|
|
"""Initiate the MSAL device code flow."""
|
|
self.query_one("#loading").styles.display = "block"
|
|
self.query_one("#status_label").update("Initiating device code flow...")
|
|
|
|
# Initiate device flow
|
|
flow = self.msal_app.initiate_device_flow(scopes=self.scopes)
|
|
|
|
if "user_code" not in flow:
|
|
self.notify("Failed to create device flow", severity="error")
|
|
return
|
|
|
|
# Display the device code message
|
|
self.query_one("#auth_message").update(flow["message"])
|
|
|
|
# Wait for the user to authenticate
|
|
token_response = self.msal_app.acquire_token_by_device_flow(flow)
|
|
|
|
if "access_token" not in token_response:
|
|
self.notify("Failed to acquire token", severity="error")
|
|
return
|
|
|
|
# Save token to cache
|
|
with open(self.cache_file, "w") as f:
|
|
f.write(self.cache.serialize())
|
|
|
|
self.access_token = token_response["access_token"]
|
|
self.is_authenticated = True
|
|
|
|
# Proceed with loading drives and followed items
|
|
self.load_initial_data()
|
|
|
|
@work
|
|
async def load_initial_data(self):
|
|
"""Load initial data after authentication."""
|
|
self.query_one("#status_label").update("Loading drives...")
|
|
|
|
# Load drives first
|
|
if not self.access_token:
|
|
return
|
|
|
|
headers = {"Authorization": f"Bearer {self.access_token}"}
|
|
|
|
try:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(
|
|
"https://graph.microsoft.com/v1.0/me/drives",
|
|
headers=headers
|
|
) as response:
|
|
if response.status != 200:
|
|
self.notify(f"Failed to load drives: {response.status}", severity="error")
|
|
return
|
|
|
|
drives_data = await response.json()
|
|
self.drives = drives_data.get("value", [])
|
|
except Exception as e:
|
|
self.notify(f"Error loading drives: {str(e)}", severity="error")
|
|
|
|
# Hide auth container and show content container
|
|
self.query_one("#auth_container").add_class("hide")
|
|
self.query_one("#content_container").remove_class("hide")
|
|
self.query_one("#loading").remove()
|
|
|
|
# Find and select the OneDrive drive
|
|
for drive in self.drives:
|
|
if drive.get("name") == "OneDrive":
|
|
self.selected_drive_id = drive.get("id")
|
|
self.drive_name = drive.get("name")
|
|
break
|
|
|
|
# Set Following as default view
|
|
option_list = self.query_one("#view_options")
|
|
option_list.highlighted = 0 # Select "Following" option
|
|
|
|
# If we have a selected drive, load followed items
|
|
if self.selected_drive_id:
|
|
self.load_followed_items()
|
|
|
|
async def on_option_list_option_selected(self, event: OptionList.OptionSelected) -> None:
|
|
"""Handle option list selection."""
|
|
selected_option = event.option.id
|
|
if selected_option == "following":
|
|
self.current_view = "Following"
|
|
if self.selected_drive_id:
|
|
self.load_followed_items()
|
|
elif selected_option == "root":
|
|
self.current_view = "Root"
|
|
if self.selected_drive_id:
|
|
self.load_root_items()
|
|
|
|
async def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
|
|
"""Handle row selection in the items table."""
|
|
selected_id = event.row_key.value
|
|
self.open_item(selected_id)
|
|
|
|
async def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None:
|
|
self.selected_item_id = event.row_key.value
|
|
|
|
def open_item(self, selected_id: str):
|
|
if selected_id:
|
|
# Get an item from current items by ID string
|
|
selected_row = self.current_items[selected_id]
|
|
|
|
item_name = selected_row.get("name")
|
|
|
|
# Check if it's a folder
|
|
is_folder = bool(selected_row.get("folder"))
|
|
|
|
if is_folder:
|
|
self.notify(f"Selected folder: {item_name}")
|
|
# Load items in the folder
|
|
self.query_one("#status_label").update(f"Loading items in folder: {item_name}")
|
|
self.load_root_items(folder_id=selected_id, drive_id=selected_row.get("parentReference", {}).get("driveId", self.selected_drive_id))
|
|
|
|
else:
|
|
self.notify(f"Selected file: {item_name}")
|
|
self.action_view_document()
|
|
|
|
@work
|
|
async def load_root_items(self, folder_id: str = "", drive_id: str = "", track_history: bool = True):
|
|
"""Load root items from the selected drive."""
|
|
if not self.access_token or not self.selected_drive_id:
|
|
return
|
|
|
|
self.query_one("#status_label").update("Loading drive folder items...")
|
|
headers = {"Authorization": f"Bearer {self.access_token}"}
|
|
url = f"https://graph.microsoft.com/v1.0/me/drive/root/children"
|
|
|
|
if folder_id and drive_id:
|
|
url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{folder_id}/children"
|
|
if track_history:
|
|
self.folder_history.append(FolderHistoryEntry(folder_id, self.current_folder_name, drive_id))
|
|
self.selected_drive_id = drive_id
|
|
self.current_folder_id = folder_id
|
|
self.current_folder_name = self.current_items[folder_id].get("name", "Unknown")
|
|
try:
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(url, headers=headers) as response:
|
|
if response.status != 200:
|
|
self.notify(f"Failed to load drive items: {response.status}", severity="error")
|
|
return
|
|
|
|
items_data = await response.json()
|
|
|
|
|
|
# Update the table with the root items
|
|
self.update_items_table(items_data.get("value", []))
|
|
except Exception as e:
|
|
self.notify(f"Error loading root items: {str(e)}", severity="error")
|
|
|
|
self.query_one("#status_label").update("Ready")
|
|
|
|
@work
|
|
async def load_followed_items(self):
|
|
"""Load followed items from the selected drive."""
|
|
if not self.access_token or not self.selected_drive_id:
|
|
return
|
|
|
|
self.query_one("#status_label").update("Loading followed items...")
|
|
headers = {"Authorization": f"Bearer {self.access_token}"}
|
|
|
|
try:
|
|
url = f"https://graph.microsoft.com/v1.0/me/drive/following"
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(url, headers=headers) as response:
|
|
if response.status != 200:
|
|
self.notify(f"Failed to load followed items: {response.status}", severity="error")
|
|
return
|
|
|
|
items_data = await response.json()
|
|
followed_items = items_data.get("value", [])
|
|
|
|
|
|
# Update the table with the followed items
|
|
self.update_items_table(followed_items)
|
|
except Exception as e:
|
|
self.notify(f"Error loading followed items: {str(e)}", severity="error")
|
|
|
|
self.query_one("#status_label").update("Ready")
|
|
|
|
def update_items_table(self, items, is_root_view=False):
|
|
"""Update the table with the given items."""
|
|
table = self.query_one(DataTable)
|
|
table.clear()
|
|
|
|
if not items:
|
|
self.query_one("#no_items_label").remove_class("hide")
|
|
return
|
|
|
|
self.query_one("#no_items_label").add_class("hide")
|
|
|
|
for item in items:
|
|
name = item.get("name", "Unknown")
|
|
is_folder = bool(item.get("folder"))
|
|
|
|
# Add folder icon if it's a folder and we're in root view
|
|
|
|
item_type = "📁" if is_folder else "📄"
|
|
|
|
# Format the last modified date
|
|
last_modified = item.get("lastModifiedDateTime", "")
|
|
if last_modified:
|
|
try:
|
|
date_obj = datetime.fromisoformat(last_modified.replace('Z', '+00:00'))
|
|
last_modified = date_obj.strftime("%Y-%m-%d %H:%M")
|
|
except:
|
|
pass
|
|
|
|
# Format the size
|
|
size = item.get("size", 0)
|
|
if size:
|
|
if size < 1024:
|
|
size_str = f"{size} B"
|
|
elif size < 1024 * 1024:
|
|
size_str = f"{size / 1024:.1f} KB"
|
|
elif size < 1024 * 1024 * 1024:
|
|
size_str = f"{size / (1024 * 1024):.1f} MB"
|
|
else:
|
|
size_str = f"{size / (1024 * 1024 * 1024):.1f} GB"
|
|
else:
|
|
size_str = "N/A"
|
|
|
|
web_url = item.get("webUrl", "")
|
|
item_id = item.get("id")
|
|
item_drive_id = item.get("parentReference", {}).get("driveId", self.selected_drive_id)
|
|
row_key = table.add_row(item_type, name, last_modified, size_str, web_url, key=item.get("id"))
|
|
# add item to to the list of current items keyed by row_key so we can look up all information later
|
|
self.current_items[row_key] = item
|
|
|
|
async def action_next_view(self) -> None:
|
|
"""Switch to the next view (Following/Root)."""
|
|
option_list = self.query_one("#view_options")
|
|
if self.current_view == "Following":
|
|
option_list.highlighted = 1 # Switch to Root
|
|
self.current_view = "Root"
|
|
self.load_root_items()
|
|
else:
|
|
option_list.highlighted = 0 # Switch to Following
|
|
self.current_view = "Following"
|
|
self.load_followed_items()
|
|
|
|
self.notify(f"Switched to {self.current_view} view")
|
|
|
|
async def action_refresh(self) -> None:
|
|
"""Refresh the data."""
|
|
if self.is_authenticated and self.selected_drive_id:
|
|
if self.current_view == "Following":
|
|
self.load_followed_items()
|
|
elif self.current_view == "Root":
|
|
self.load_root_items()
|
|
self.notify("Refreshed items")
|
|
|
|
async def action_toggle_follow(self) -> None:
|
|
"""Toggle follow status for selected item."""
|
|
# This would be implemented to follow/unfollow the selected item
|
|
# Currently just a placeholder for the key binding
|
|
self.notify("Toggle follow functionality not implemented yet")
|
|
|
|
async def action_open_url(self) -> None:
|
|
"""Open the web URL of the selected item."""
|
|
table = self.query_one("#items_table")
|
|
if table.cursor_row is not None:
|
|
selected_row = table.get_row_at(table.cursor_row)
|
|
if selected_row and len(selected_row) > 4:
|
|
web_url = selected_row[4]
|
|
if web_url:
|
|
self.notify(f"Opening URL: {web_url}")
|
|
# Use Textual's built-in open_url method
|
|
self.app.open_url(web_url)
|
|
|
|
def action_view_document(self) -> None:
|
|
"""View the selected document using the DocumentViewerScreen."""
|
|
# Get the name of the selected item
|
|
|
|
selected_row = self.current_items.get(self.selected_item_id)
|
|
if not selected_row:
|
|
return
|
|
|
|
selected_name = selected_row.get("name")
|
|
drive_id = selected_row.get("parentReference", {}).get("driveId", self.selected_drive_id)
|
|
web_url = selected_row.get("webUrl", "")
|
|
# Open the document viewer screen with all required details
|
|
viewer = DocumentViewerScreen(self.selected_item_id, selected_name, self.access_token, drive_id)
|
|
viewer.web_url = web_url # Pass the webUrl to the viewer
|
|
self.push_screen(viewer)
|
|
|
|
async def action_quit(self) -> None:
|
|
"""Quit the application."""
|
|
self.exit()
|
|
|
|
async def action_navigate_back(self) -> None:
|
|
"""Navigate back to the previous folder."""
|
|
if self.folder_history:
|
|
previous_entry = self.folder_history.pop()
|
|
self.current_folder_id = previous_entry.folder_id
|
|
self.current_folder_name = previous_entry.folder_name
|
|
self.load_root_items(folder_id=previous_entry.folder_id, drive_id=previous_entry.parent_id, track_history=False)
|
|
else:
|
|
self.notify("No previous folder to navigate back to")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
app = OneDriveTUI()
|
|
app.run()
|