dashboard sync app

This commit is contained in:
Tim Bendt
2025-12-16 17:13:26 -05:00
parent 73079f743a
commit d7c82a0da0
25 changed files with 4181 additions and 69 deletions

View File

@@ -1,4 +1,10 @@
from . import cli
if __name__ == "__main__":
def main():
"""Main entry point for the CLI."""
cli()
if __name__ == "__main__":
main()

View File

@@ -31,7 +31,7 @@ def start(config, daemon):
cmd.extend(["--config", config])
# Create pid file
pid_file = os.path.expanduser("~/.config/gtd-tools/gitlab_monitor.pid")
pid_file = os.path.expanduser("~/.config/luk/gitlab_monitor.pid")
Path(pid_file).parent.mkdir(parents=True, exist_ok=True)
# Start daemon process
@@ -61,7 +61,7 @@ def start(config, daemon):
@gitlab_monitor.command()
def stop():
"""Stop the GitLab pipeline monitoring daemon."""
pid_file = os.path.expanduser("~/.config/gtd-tools/gitlab_monitor.pid")
pid_file = os.path.expanduser("~/.config/luk/gitlab_monitor.pid")
if not os.path.exists(pid_file):
click.echo("Daemon is not running (no PID file found)")
@@ -88,7 +88,7 @@ def stop():
@gitlab_monitor.command()
def status():
"""Check the status of the GitLab pipeline monitoring daemon."""
pid_file = os.path.expanduser("~/.config/gtd-tools/gitlab_monitor.pid")
pid_file = os.path.expanduser("~/.config/luk/gitlab_monitor.pid")
if not os.path.exists(pid_file):
click.echo("Daemon is not running")

View File

@@ -1,6 +1,8 @@
import click
import asyncio
import os
import sys
import signal
import json
import time
from datetime import datetime, timedelta
@@ -31,7 +33,7 @@ from src.services.godspeed.sync import GodspeedSync
# Timing state management
def get_sync_state_file():
"""Get the path to the sync state file."""
return os.path.expanduser("~/.local/share/gtd-terminal-tools/sync_state.json")
return os.path.expanduser("~/.local/share/luk/sync_state.json")
def load_sync_state():
@@ -97,7 +99,7 @@ def get_godspeed_sync_directory():
return docs_dir
# Fall back to data directory
data_dir = home / ".local" / "share" / "gtd-terminal-tools" / "godspeed"
data_dir = home / ".local" / "share" / "luk" / "godspeed"
return data_dir
@@ -265,11 +267,12 @@ async def fetch_calendar_async(
# Update progress bar with total events
progress.update(task_id, total=total_events)
# Define org_vdir_path up front if vdir_path is specified
org_vdir_path = os.path.join(vdir_path, org_name) if vdir_path else None
# Save events to appropriate format
if not dry_run:
if vdir_path:
# Create org-specific directory within vdir path
org_vdir_path = os.path.join(vdir_path, org_name)
if vdir_path and org_vdir_path:
progress.console.print(
f"[cyan]Saving events to vdir: {org_vdir_path}[/cyan]"
)
@@ -342,7 +345,7 @@ async def fetch_calendar_async(
progress.update(task_id, total=next_total_events)
if not dry_run:
if vdir_path:
if vdir_path and org_vdir_path:
save_events_to_vdir(
next_events, org_vdir_path, progress, task_id, dry_run
)
@@ -494,9 +497,9 @@ async def _sync_outlook_data(
os.getenv("MAILDIR_PATH", os.path.expanduser("~/Mail")) + f"/{org}"
)
messages_before = 0
new_dir = os.path.join(maildir_path, "new")
cur_dir = os.path.join(maildir_path, "cur")
if notify:
new_dir = os.path.join(maildir_path, "new")
cur_dir = os.path.join(maildir_path, "cur")
if os.path.exists(new_dir):
messages_before += len([f for f in os.listdir(new_dir) if ".eml" in f])
if os.path.exists(cur_dir):
@@ -572,7 +575,51 @@ async def _sync_outlook_data(
click.echo("Sync complete.")
@click.command()
@click.group()
def sync():
"""Email and calendar synchronization."""
pass
def daemonize():
"""Properly daemonize the process for Unix systems."""
# First fork
try:
pid = os.fork()
if pid > 0:
# Parent exits
sys.exit(0)
except OSError as e:
sys.stderr.write(f"Fork #1 failed: {e}\n")
sys.exit(1)
# Decouple from parent environment
os.chdir("/")
os.setsid()
os.umask(0)
# Second fork
try:
pid = os.fork()
if pid > 0:
# Parent exits
sys.exit(0)
except OSError as e:
sys.stderr.write(f"Fork #2 failed: {e}\n")
sys.exit(1)
# Redirect standard file descriptors
sys.stdout.flush()
sys.stderr.flush()
si = open(os.devnull, "r")
so = open(os.devnull, "a+")
se = open(os.devnull, "a+")
os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno())
@sync.command()
@click.option(
"--dry-run",
is_flag=True,
@@ -628,13 +675,19 @@ async def _sync_outlook_data(
help="Run in daemon mode.",
default=False,
)
@click.option(
"--dashboard",
is_flag=True,
help="Run with TUI dashboard.",
default=False,
)
@click.option(
"--notify",
is_flag=True,
help="Send macOS notifications for new email messages",
default=False,
)
def sync(
def run(
dry_run,
vdir,
icsfile,
@@ -645,23 +698,31 @@ def sync(
download_attachments,
two_way_calendar,
daemon,
dashboard,
notify,
):
if daemon:
asyncio.run(
daemon_mode(
dry_run,
vdir,
icsfile,
org,
days_back,
days_forward,
continue_iteration,
download_attachments,
two_way_calendar,
notify,
)
if dashboard:
from .sync_dashboard import run_dashboard_sync
asyncio.run(run_dashboard_sync())
elif daemon:
from .sync_daemon import create_daemon_config, SyncDaemon
config = create_daemon_config(
dry_run=dry_run,
vdir=vdir,
icsfile=icsfile,
org=org,
days_back=days_back,
days_forward=days_forward,
continue_iteration=continue_iteration,
download_attachments=download_attachments,
two_way_calendar=two_way_calendar,
notify=notify,
)
daemon_instance = SyncDaemon(config)
daemon_instance.start()
else:
asyncio.run(
_sync_outlook_data(
@@ -679,6 +740,55 @@ def sync(
)
@sync.command()
def stop():
"""Stop the sync daemon."""
pid_file = os.path.expanduser("~/.config/luk/luk.pid")
if not os.path.exists(pid_file):
click.echo("Daemon is not running (no PID file found)")
return
try:
with open(pid_file, "r") as f:
pid = int(f.read().strip())
# Send SIGTERM to process
os.kill(pid, signal.SIGTERM)
# Remove PID file
os.unlink(pid_file)
click.echo(f"Daemon stopped (PID {pid})")
except (ValueError, ProcessLookupError, OSError) as e:
click.echo(f"Error stopping daemon: {e}")
# Clean up stale PID file
if os.path.exists(pid_file):
os.unlink(pid_file)
@sync.command()
def status():
"""Check the status of the sync daemon."""
pid_file = os.path.expanduser("~/.config/luk/luk.pid")
if not os.path.exists(pid_file):
click.echo("Daemon is not running")
return
try:
with open(pid_file, "r") as f:
pid = int(f.read().strip())
# Check if process exists
os.kill(pid, 0) # Send signal 0 to check if process exists
click.echo(f"Daemon is running (PID {pid})")
except (ValueError, ProcessLookupError, OSError):
click.echo("Daemon is not running (stale PID file)")
# Clean up stale PID file
os.unlink(pid_file)
def check_calendar_changes(vdir_path, org):
"""
Check if there are local calendar changes that need syncing.

329
src/cli/sync_daemon.py Normal file
View File

@@ -0,0 +1,329 @@
"""Daemon mode with proper Unix logging."""
import os
import sys
import logging
import logging.handlers
import asyncio
import time
import signal
import json
from pathlib import Path
from datetime import datetime
from typing import Optional, Dict, Any
from src.cli.sync import _sync_outlook_data, should_run_godspeed_sync, should_run_sweep
from src.cli.sync import run_godspeed_sync, run_task_sweep, load_sync_state
class SyncDaemon:
"""Proper daemon with Unix logging."""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.running = False
self.pid_file = Path(
config.get("pid_file", "~/.config/luk/luk.pid")
).expanduser()
self.log_file = Path(
config.get("log_file", "~/.local/share/luk/luk.log")
).expanduser()
self.sync_interval = config.get("sync_interval", 300) # 5 minutes
self.check_interval = config.get("check_interval", 10) # 10 seconds
self.logger = self._setup_logging()
def _setup_logging(self) -> logging.Logger:
"""Setup proper Unix logging."""
logger = logging.getLogger("sync_daemon")
logger.setLevel(logging.INFO)
# Ensure log directory exists
self.log_file.parent.mkdir(parents=True, exist_ok=True)
# Rotating file handler (10MB max, keep 5 backups)
handler = logging.handlers.RotatingFileHandler(
self.log_file,
maxBytes=10 * 1024 * 1024, # 10MB
backupCount=5,
encoding="utf-8",
)
# Log format
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def daemonize(self) -> None:
"""Properly daemonize the process for Unix systems."""
# First fork
try:
pid = os.fork()
if pid > 0:
# Parent exits
sys.exit(0)
except OSError as e:
sys.stderr.write(f"Fork #1 failed: {e}\n")
sys.exit(1)
# Decouple from parent environment
os.chdir("/")
os.setsid()
os.umask(0)
# Second fork
try:
pid = os.fork()
if pid > 0:
# Parent exits
sys.exit(0)
except OSError as e:
sys.stderr.write(f"Fork #2 failed: {e}\n")
sys.exit(1)
# Redirect standard file descriptors to /dev/null
sys.stdout.flush()
sys.stderr.flush()
si = open(os.devnull, "r")
so = open(os.devnull, "a+")
se = open(os.devnull, "a+")
os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno())
# Write PID file
self.pid_file.parent.mkdir(parents=True, exist_ok=True)
with open(self.pid_file, "w") as f:
f.write(str(os.getpid()))
def start(self) -> None:
"""Start the daemon."""
# Check if already running
if self.is_running():
print(f"Daemon is already running (PID {self.get_pid()})")
return
print("Starting sync daemon...")
self.daemonize()
# Setup signal handlers
signal.signal(signal.SIGTERM, self._signal_handler)
signal.signal(signal.SIGINT, self._signal_handler)
self.logger.info("Sync daemon started")
self.running = True
# Run the daemon loop
asyncio.run(self._daemon_loop())
def stop(self) -> None:
"""Stop the daemon."""
if not self.is_running():
print("Daemon is not running")
return
try:
pid = self.get_pid()
os.kill(pid, signal.SIGTERM)
# Wait for process to exit
for _ in range(10):
try:
os.kill(pid, 0) # Check if process exists
time.sleep(0.5)
except ProcessLookupError:
break
else:
# Force kill if still running
os.kill(pid, signal.SIGKILL)
# Remove PID file
if self.pid_file.exists():
self.pid_file.unlink()
print(f"Daemon stopped (PID {pid})")
self.logger.info("Sync daemon stopped")
except Exception as e:
print(f"Error stopping daemon: {e}")
def status(self) -> None:
"""Check daemon status."""
if not self.is_running():
print("Daemon is not running")
return
pid = self.get_pid()
print(f"Daemon is running (PID {pid})")
# Show recent log entries
try:
with open(self.log_file, "r") as f:
lines = f.readlines()
if lines:
print("\nRecent log entries:")
for line in lines[-5:]:
print(f" {line.strip()}")
except Exception:
pass
def is_running(self) -> bool:
"""Check if daemon is running."""
if not self.pid_file.exists():
return False
try:
pid = self.get_pid()
os.kill(pid, 0) # Check if process exists
return True
except (ValueError, ProcessLookupError, OSError):
# Stale PID file, remove it
if self.pid_file.exists():
self.pid_file.unlink()
return False
def get_pid(self) -> int:
"""Get PID from file."""
with open(self.pid_file, "r") as f:
return int(f.read().strip())
def _signal_handler(self, signum, frame):
"""Handle shutdown signals."""
self.logger.info(f"Received signal {signum}, shutting down...")
self.running = False
# Remove PID file
if self.pid_file.exists():
self.pid_file.unlink()
sys.exit(0)
async def _daemon_loop(self) -> None:
"""Main daemon loop."""
last_sync_time = time.time() - self.sync_interval # Force initial sync
while self.running:
try:
current_time = time.time()
if current_time - last_sync_time >= self.sync_interval:
self.logger.info("Performing scheduled sync...")
await self._perform_sync()
last_sync_time = current_time
self.logger.info("Scheduled sync completed")
else:
# Check for changes
changes_detected = await self._check_for_changes()
if changes_detected:
self.logger.info("Changes detected, triggering sync...")
await self._perform_sync()
last_sync_time = current_time
else:
self.logger.debug("No changes detected")
await asyncio.sleep(self.check_interval)
except Exception as e:
self.logger.error(f"Error in daemon loop: {e}")
await asyncio.sleep(30) # Wait before retrying
async def _perform_sync(self) -> None:
"""Perform a full sync."""
try:
await _sync_outlook_data(
dry_run=self.config.get("dry_run", False),
vdir=self.config.get("vdir", "~/Calendar"),
icsfile=self.config.get("icsfile"),
org=self.config.get("org", "corteva"),
days_back=self.config.get("days_back", 1),
days_forward=self.config.get("days_forward", 30),
continue_iteration=self.config.get("continue_iteration", False),
download_attachments=self.config.get("download_attachments", False),
two_way_calendar=self.config.get("two_way_calendar", False),
notify=self.config.get("notify", False),
)
self.logger.info("Sync completed successfully")
except Exception as e:
self.logger.error(f"Sync failed: {e}")
async def _check_for_changes(self) -> bool:
"""Check if there are changes that require syncing."""
try:
# Check Godspeed operations
godspeed_sync_due = should_run_godspeed_sync()
sweep_due = should_run_sweep()
if godspeed_sync_due or sweep_due:
self.logger.info("Godspeed operations due")
return True
# Add other change detection logic here
# For now, just return False
return False
except Exception as e:
self.logger.error(f"Error checking for changes: {e}")
return False
def create_daemon_config(**kwargs) -> Dict[str, Any]:
"""Create daemon configuration from command line args."""
return {
"dry_run": kwargs.get("dry_run", False),
"vdir": kwargs.get("vdir", "~/Calendar"),
"icsfile": kwargs.get("icsfile"),
"org": kwargs.get("org", "corteva"),
"days_back": kwargs.get("days_back", 1),
"days_forward": kwargs.get("days_forward", 30),
"continue_iteration": kwargs.get("continue_iteration", False),
"download_attachments": kwargs.get("download_attachments", False),
"two_way_calendar": kwargs.get("two_way_calendar", False),
"notify": kwargs.get("notify", False),
"pid_file": kwargs.get("pid_file", "~/.config/luk/luk.pid"),
"log_file": kwargs.get("log_file", "~/.local/share/luk/luk.log"),
"sync_interval": kwargs.get("sync_interval", 300),
"check_interval": kwargs.get("check_interval", 10),
}
def main():
"""Main daemon entry point."""
import argparse
parser = argparse.ArgumentParser(description="Sync daemon management")
parser.add_argument(
"action", choices=["start", "stop", "status", "logs"], help="Action to perform"
)
parser.add_argument("--dry-run", action="store_true", help="Dry run mode")
parser.add_argument("--org", default="corteva", help="Organization name")
parser.add_argument("--vdir", default="~/Calendar", help="Calendar directory")
parser.add_argument("--notify", action="store_true", help="Enable notifications")
args = parser.parse_args()
config = create_daemon_config(
dry_run=args.dry_run, org=args.org, vdir=args.vdir, notify=args.notify
)
daemon = SyncDaemon(config)
if args.action == "start":
daemon.start()
elif args.action == "stop":
daemon.stop()
elif args.action == "status":
daemon.status()
elif args.action == "logs":
try:
with open(daemon.log_file, "r") as f:
print(f.read())
except Exception as e:
print(f"Error reading logs: {e}")
if __name__ == "__main__":
main()

680
src/cli/sync_dashboard.py Normal file
View File

@@ -0,0 +1,680 @@
"""TUI dashboard for sync progress with scrollable logs."""
from textual.app import App, ComposeResult
from textual.containers import Container, Horizontal, Vertical
from textual.widgets import (
Header,
Footer,
Static,
ProgressBar,
Log,
ListView,
ListItem,
Label,
)
from textual.reactive import reactive
from textual.binding import Binding
from rich.text import Text
from datetime import datetime, timedelta
import asyncio
from typing import Dict, Any, Optional, List, Callable
# Default sync interval in seconds (5 minutes)
DEFAULT_SYNC_INTERVAL = 300
# Futuristic spinner frames
# SPINNER_FRAMES = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]
# Alternative spinners you could use:
# SPINNER_FRAMES = ["◢", "◣", "◤", "◥"] # Rotating triangle
SPINNER_FRAMES = ["▰▱▱▱▱", "▰▰▱▱▱", "▰▰▰▱▱", "▰▰▰▰▱", "▰▰▰▰▰", "▱▰▰▰▰", "▱▱▰▰▰", "▱▱▱▰▰", "▱▱▱▱▰"] # Loading bar
# SPINNER_FRAMES = ["⣾", "⣽", "⣻", "⢿", "⡿", "⣟", "⣯", "⣷"] # Braille dots
# SPINNER_FRAMES = ["◐", "◓", "◑", "◒"] # Circle quarters
# SPINNER_FRAMES = ["⠁", "⠂", "⠄", "⡀", "⢀", "⠠", "⠐", "⠈"] # Braille orbit
class TaskStatus:
"""Status constants for tasks."""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
ERROR = "error"
class TaskListItem(ListItem):
"""A list item representing a sync task."""
def __init__(self, task_id: str, task_name: str, *args, **kwargs):
super().__init__(*args, **kwargs)
self.task_id = task_id
self.task_name = task_name
self.status = TaskStatus.PENDING
self.progress = 0
self.total = 100
self.spinner_frame = 0
def compose(self) -> ComposeResult:
"""Compose the task item layout."""
yield Static(self._build_content_text(), id=f"task-content-{self.task_id}")
def _get_status_icon(self) -> str:
"""Get icon based on status."""
if self.status == TaskStatus.RUNNING:
return SPINNER_FRAMES[self.spinner_frame % len(SPINNER_FRAMES)]
icons = {
TaskStatus.PENDING: "",
TaskStatus.COMPLETED: "",
TaskStatus.ERROR: "",
}
return icons.get(self.status, "")
def advance_spinner(self) -> None:
"""Advance the spinner to the next frame."""
self.spinner_frame = (self.spinner_frame + 1) % len(SPINNER_FRAMES)
def _get_status_color(self) -> str:
"""Get color based on status."""
colors = {
TaskStatus.PENDING: "dim",
TaskStatus.RUNNING: "cyan",
TaskStatus.COMPLETED: "bright_white",
TaskStatus.ERROR: "red",
}
return colors.get(self.status, "white")
def _build_content_text(self) -> Text:
"""Build the task content text."""
icon = self._get_status_icon()
color = self._get_status_color()
# Use green checkmark for completed, but white text for readability
if self.status == TaskStatus.RUNNING:
progress_pct = (
int((self.progress / self.total) * 100) if self.total > 0 else 0
)
text = Text()
text.append(f"{icon} ", style="cyan")
text.append(f"{self.task_name} [{progress_pct}%]", style=color)
return text
elif self.status == TaskStatus.COMPLETED:
text = Text()
text.append(f"{icon} ", style="green") # Green checkmark
text.append(f"{self.task_name} [Done]", style=color)
return text
elif self.status == TaskStatus.ERROR:
text = Text()
text.append(f"{icon} ", style="red")
text.append(f"{self.task_name} [Error]", style=color)
return text
else:
return Text(f"{icon} {self.task_name}", style=color)
def update_display(self) -> None:
"""Update the display of this item."""
try:
content = self.query_one(f"#task-content-{self.task_id}", Static)
content.update(self._build_content_text())
except Exception:
pass
class SyncDashboard(App):
"""TUI dashboard for sync operations."""
BINDINGS = [
Binding("q", "quit", "Quit"),
Binding("ctrl+c", "quit", "Quit"),
Binding("s", "sync_now", "Sync Now"),
Binding("r", "refresh", "Refresh"),
Binding("+", "increase_interval", "+Interval"),
Binding("-", "decrease_interval", "-Interval"),
Binding("up", "cursor_up", "Up", show=False),
Binding("down", "cursor_down", "Down", show=False),
]
CSS = """
.dashboard {
height: 100%;
layout: horizontal;
}
.sidebar {
width: 30;
height: 100%;
border: solid $primary;
padding: 0;
}
.sidebar-title {
text-style: bold;
padding: 1;
background: $primary-darken-2;
}
.countdown-container {
height: 3;
padding: 0 1;
border-top: solid $primary;
background: $surface;
}
.countdown-text {
text-align: center;
}
.main-panel {
width: 1fr;
height: 100%;
padding: 0;
}
.task-header {
height: 5;
padding: 1;
border-bottom: solid $primary;
}
.task-name {
text-style: bold;
}
.progress-row {
height: 3;
padding: 0 1;
}
.log-container {
height: 1fr;
border: solid $primary;
padding: 0;
}
.log-title {
padding: 0 1;
background: $primary-darken-2;
}
ListView {
height: 1fr;
}
ListItem {
padding: 0 1;
}
ListItem:hover {
background: $primary-darken-1;
}
Log {
height: 1fr;
border: none;
}
ProgressBar {
width: 1fr;
padding: 0 1;
}
"""
selected_task: reactive[str] = reactive("archive")
sync_interval: reactive[int] = reactive(DEFAULT_SYNC_INTERVAL)
next_sync_time: reactive[float] = reactive(0.0)
def __init__(self, sync_interval: int = DEFAULT_SYNC_INTERVAL):
super().__init__()
self._mounted: asyncio.Event = asyncio.Event()
self._task_logs: Dict[str, List[str]] = {}
self._task_items: Dict[str, TaskListItem] = {}
self._sync_callback: Optional[Callable] = None
self._countdown_task: Optional[asyncio.Task] = None
self._spinner_task: Optional[asyncio.Task] = None
self._initial_sync_interval = sync_interval
def compose(self) -> ComposeResult:
"""Compose the dashboard layout."""
yield Header()
with Horizontal(classes="dashboard"):
# Sidebar with task list
with Vertical(classes="sidebar"):
yield Static("Tasks", classes="sidebar-title")
yield ListView(
# Stage 1: Sync local changes to server
TaskListItem("archive", "Archive Mail", id="task-archive"),
TaskListItem("outbox", "Outbox Send", id="task-outbox"),
# Stage 2: Fetch from server
TaskListItem("inbox", "Inbox Sync", id="task-inbox"),
TaskListItem("calendar", "Calendar Sync", id="task-calendar"),
# Stage 3: Task management
TaskListItem("godspeed", "Godspeed Sync", id="task-godspeed"),
TaskListItem("sweep", "Task Sweep", id="task-sweep"),
id="task-list",
)
# Countdown timer at bottom of sidebar
with Vertical(classes="countdown-container"):
yield Static(
"Next sync: --:--", id="countdown", classes="countdown-text"
)
# Main panel with selected task details
with Vertical(classes="main-panel"):
# Task header with name and progress
with Vertical(classes="task-header"):
yield Static(
"Archive Mail", id="selected-task-name", classes="task-name"
)
with Horizontal(classes="progress-row"):
yield Static("Progress:", id="progress-label")
yield ProgressBar(total=100, id="task-progress")
yield Static("0%", id="progress-percent")
# Log for selected task
with Vertical(classes="log-container"):
yield Static("Activity Log", classes="log-title")
yield Log(id="task-log")
yield Footer()
def on_mount(self) -> None:
"""Initialize the dashboard."""
# Store references to task items
task_list = self.query_one("#task-list", ListView)
for item in task_list.children:
if isinstance(item, TaskListItem):
self._task_items[item.task_id] = item
self._task_logs[item.task_id] = []
# Initialize sync interval
self.sync_interval = self._initial_sync_interval
self.schedule_next_sync()
# Start countdown timer and spinner animation
self._countdown_task = asyncio.create_task(self._update_countdown())
self._spinner_task = asyncio.create_task(self._animate_spinners())
self._log_to_task("archive", "Dashboard initialized. Waiting to start sync...")
self._mounted.set()
def on_list_view_selected(self, event: ListView.Selected) -> None:
"""Handle task selection from the list."""
if isinstance(event.item, TaskListItem):
self.selected_task = event.item.task_id
self._update_main_panel()
def on_list_view_highlighted(self, event: ListView.Highlighted) -> None:
"""Handle task highlight from the list."""
if isinstance(event.item, TaskListItem):
self.selected_task = event.item.task_id
self._update_main_panel()
def _update_main_panel(self) -> None:
"""Update the main panel to show selected task details."""
task_item = self._task_items.get(self.selected_task)
if not task_item:
return
# Update task name
try:
name_widget = self.query_one("#selected-task-name", Static)
name_widget.update(Text(task_item.task_name, style="bold"))
except Exception:
pass
# Update progress bar
try:
progress_bar = self.query_one("#task-progress", ProgressBar)
progress_bar.total = task_item.total
progress_bar.progress = task_item.progress
percent_widget = self.query_one("#progress-percent", Static)
pct = (
int((task_item.progress / task_item.total) * 100)
if task_item.total > 0
else 0
)
percent_widget.update(f"{pct}%")
except Exception:
pass
# Update log with task-specific logs
try:
log_widget = self.query_one("#task-log", Log)
log_widget.clear()
for entry in self._task_logs.get(self.selected_task, []):
log_widget.write_line(entry)
except Exception:
pass
def _log_to_task(self, task_id: str, message: str, level: str = "INFO") -> None:
"""Add a log entry to a specific task."""
timestamp = datetime.now().strftime("%H:%M:%S")
formatted = f"[{timestamp}] {level}: {message}"
if task_id not in self._task_logs:
self._task_logs[task_id] = []
self._task_logs[task_id].append(formatted)
# If this is the selected task, also write to the visible log
if task_id == self.selected_task:
try:
log_widget = self.query_one("#task-log", Log)
log_widget.write_line(formatted)
except Exception:
pass
def start_task(self, task_id: str, total: int = 100) -> None:
"""Start a task."""
if task_id in self._task_items:
item = self._task_items[task_id]
item.status = TaskStatus.RUNNING
item.progress = 0
item.total = total
item.update_display()
self._log_to_task(task_id, f"Starting {item.task_name}...")
if task_id == self.selected_task:
self._update_main_panel()
def update_task(self, task_id: str, progress: int, message: str = "") -> None:
"""Update task progress."""
if task_id in self._task_items:
item = self._task_items[task_id]
item.progress = progress
item.update_display()
if message:
self._log_to_task(task_id, message)
if task_id == self.selected_task:
self._update_main_panel()
def complete_task(self, task_id: str, message: str = "") -> None:
"""Mark a task as complete."""
if task_id in self._task_items:
item = self._task_items[task_id]
item.status = TaskStatus.COMPLETED
item.progress = item.total
item.update_display()
self._log_to_task(
task_id,
f"Completed: {message}" if message else "Completed successfully",
)
if task_id == self.selected_task:
self._update_main_panel()
def error_task(self, task_id: str, error: str) -> None:
"""Mark a task as errored."""
if task_id in self._task_items:
item = self._task_items[task_id]
item.status = TaskStatus.ERROR
item.update_display()
self._log_to_task(task_id, f"ERROR: {error}", "ERROR")
if task_id == self.selected_task:
self._update_main_panel()
def skip_task(self, task_id: str, reason: str = "") -> None:
"""Mark a task as skipped (completed with no work)."""
if task_id in self._task_items:
item = self._task_items[task_id]
item.status = TaskStatus.COMPLETED
item.update_display()
self._log_to_task(task_id, f"Skipped: {reason}" if reason else "Skipped")
if task_id == self.selected_task:
self._update_main_panel()
def action_refresh(self) -> None:
"""Refresh the dashboard."""
self._update_main_panel()
def action_cursor_up(self) -> None:
"""Move cursor up in task list."""
task_list = self.query_one("#task-list", ListView)
task_list.action_cursor_up()
def action_cursor_down(self) -> None:
"""Move cursor down in task list."""
task_list = self.query_one("#task-list", ListView)
task_list.action_cursor_down()
def action_sync_now(self) -> None:
"""Trigger an immediate sync."""
if self._sync_callback:
asyncio.create_task(self._run_sync_callback())
else:
self._log_to_task("archive", "No sync callback configured")
async def _run_sync_callback(self) -> None:
"""Run the sync callback if set."""
if self._sync_callback:
if asyncio.iscoroutinefunction(self._sync_callback):
await self._sync_callback()
else:
self._sync_callback()
def action_increase_interval(self) -> None:
"""Increase sync interval by 1 minute."""
self.sync_interval = min(self.sync_interval + 60, 3600) # Max 1 hour
self._update_countdown_display()
self._log_to_task(
self.selected_task,
f"Sync interval: {self.sync_interval // 60} min",
)
def action_decrease_interval(self) -> None:
"""Decrease sync interval by 1 minute."""
self.sync_interval = max(self.sync_interval - 60, 60) # Min 1 minute
self._update_countdown_display()
self._log_to_task(
self.selected_task,
f"Sync interval: {self.sync_interval // 60} min",
)
def set_sync_callback(self, callback: Callable) -> None:
"""Set the callback to run when sync is triggered."""
self._sync_callback = callback
def schedule_next_sync(self) -> None:
"""Schedule the next sync time."""
import time
self.next_sync_time = time.time() + self.sync_interval
def reset_all_tasks(self) -> None:
"""Reset all tasks to pending state."""
for task_id, item in self._task_items.items():
item.status = TaskStatus.PENDING
item.progress = 0
item.update_display()
self._update_main_panel()
async def _update_countdown(self) -> None:
"""Update the countdown timer every second."""
import time
while True:
try:
self._update_countdown_display()
await asyncio.sleep(1)
except asyncio.CancelledError:
break
except Exception:
await asyncio.sleep(1)
def _update_countdown_display(self) -> None:
"""Update the countdown display widget."""
import time
try:
countdown_widget = self.query_one("#countdown", Static)
remaining = max(0, self.next_sync_time - time.time())
if remaining <= 0:
countdown_widget.update(f"Syncing... ({self.sync_interval // 60}m)")
else:
minutes = int(remaining // 60)
seconds = int(remaining % 60)
countdown_widget.update(
f"Next: {minutes:02d}:{seconds:02d} ({self.sync_interval // 60}m)"
)
except Exception:
pass
async def _animate_spinners(self) -> None:
"""Animate spinners for running tasks."""
while True:
try:
# Update all running task spinners
for task_id, item in self._task_items.items():
if item.status == TaskStatus.RUNNING:
item.advance_spinner()
item.update_display()
await asyncio.sleep(0.08) # ~12 FPS for smooth animation
except asyncio.CancelledError:
break
except Exception:
await asyncio.sleep(0.08)
class SyncProgressTracker:
"""Track sync progress and update the dashboard."""
def __init__(self, dashboard: SyncDashboard):
self.dashboard = dashboard
def start_task(self, task_id: str, total: int = 100) -> None:
"""Start tracking a task."""
self.dashboard.start_task(task_id, total)
def update_task(self, task_id: str, progress: int, message: str = "") -> None:
"""Update task progress."""
self.dashboard.update_task(task_id, progress, message)
def complete_task(self, task_id: str, message: str = "") -> None:
"""Mark a task as complete."""
self.dashboard.complete_task(task_id, message)
def error_task(self, task_id: str, error: str) -> None:
"""Mark a task as failed."""
self.dashboard.error_task(task_id, error)
def skip_task(self, task_id: str, reason: str = "") -> None:
"""Mark a task as skipped."""
self.dashboard.skip_task(task_id, reason)
# Global dashboard instance
_dashboard_instance: Optional[SyncDashboard] = None
_progress_tracker: Optional[SyncProgressTracker] = None
def get_dashboard() -> Optional[SyncDashboard]:
"""Get the global dashboard instance."""
global _dashboard_instance
return _dashboard_instance
def get_progress_tracker() -> Optional[SyncProgressTracker]:
"""Get the global progress_tracker"""
global _progress_tracker
return _progress_tracker
async def run_dashboard_sync():
"""Run sync with dashboard UI."""
global _dashboard_instance, _progress_tracker
dashboard = SyncDashboard()
tracker = SyncProgressTracker(dashboard)
_dashboard_instance = dashboard
_progress_tracker = tracker
async def do_sync():
"""Run the actual sync process."""
try:
# Reset all tasks before starting
dashboard.reset_all_tasks()
# Simulate sync progress for demo (replace with actual sync calls)
# Stage 1: Sync local changes to server
# Archive mail
tracker.start_task("archive", 100)
tracker.update_task("archive", 50, "Scanning for archived messages...")
await asyncio.sleep(0.3)
tracker.update_task("archive", 100, "Moving 3 messages to archive...")
await asyncio.sleep(0.2)
tracker.complete_task("archive", "3 messages archived")
# Outbox
tracker.start_task("outbox", 100)
tracker.update_task("outbox", 50, "Checking outbox...")
await asyncio.sleep(0.2)
tracker.complete_task("outbox", "No pending emails")
# Stage 2: Fetch from server
# Inbox sync
tracker.start_task("inbox", 100)
for i in range(0, 101, 20):
tracker.update_task("inbox", i, f"Fetching emails... {i}%")
await asyncio.sleep(0.3)
tracker.complete_task("inbox", "150 emails processed")
# Calendar sync
tracker.start_task("calendar", 100)
for i in range(0, 101, 25):
tracker.update_task("calendar", i, f"Syncing events... {i}%")
await asyncio.sleep(0.3)
tracker.complete_task("calendar", "25 events synced")
# Stage 3: Task management
# Godspeed sync
tracker.start_task("godspeed", 100)
for i in range(0, 101, 33):
tracker.update_task(
"godspeed", min(i, 100), f"Syncing tasks... {min(i, 100)}%"
)
await asyncio.sleep(0.3)
tracker.complete_task("godspeed", "42 tasks synced")
# Task sweep
tracker.start_task("sweep")
tracker.update_task("sweep", 50, "Scanning notes directory...")
await asyncio.sleep(0.2)
tracker.skip_task("sweep", "Before 6 PM, skipping daily sweep")
# Schedule next sync
dashboard.schedule_next_sync()
except Exception as e:
tracker.error_task("archive", str(e))
# Set the sync callback so 's' key triggers it
dashboard.set_sync_callback(do_sync)
async def sync_loop():
"""Run sync on interval."""
import time
# Wait for the dashboard to be mounted before updating widgets
await dashboard._mounted.wait()
# Run initial sync
await do_sync()
# Then loop waiting for next sync time
while True:
try:
remaining = dashboard.next_sync_time - time.time()
if remaining <= 0:
await do_sync()
else:
await asyncio.sleep(1)
except asyncio.CancelledError:
break
except Exception:
await asyncio.sleep(1)
# Run dashboard and sync loop concurrently
await asyncio.gather(dashboard.run_async(), sync_loop())

View File

@@ -9,7 +9,7 @@ class GitLabMonitorConfig:
def __init__(self, config_path: Optional[str] = None):
self.config_path = config_path or os.path.expanduser(
"~/.config/gtd-tools/gitlab_monitor.yaml"
"~/.config/luk/gitlab_monitor.yaml"
)
self.config = self._load_config()
@@ -56,9 +56,7 @@ class GitLabMonitorConfig:
},
"logging": {
"level": "INFO",
"log_file": os.path.expanduser(
"~/.config/gtd-tools/gitlab_monitor.log"
),
"log_file": os.path.expanduser("~/.local/share/luk/gitlab_monitor.log"),
},
}

View File

@@ -15,7 +15,12 @@ class GodspeedClient:
BASE_URL = "https://api.godspeedapp.com"
def __init__(self, email: str = None, password: str = None, token: str = None):
def __init__(
self,
email: Optional[str] = None,
password: Optional[str] = None,
token: Optional[str] = None,
):
self.email = email
self.password = password
self.token = token
@@ -60,7 +65,9 @@ class GodspeedClient:
response.raise_for_status()
return response.json()
def get_tasks(self, list_id: str = None, status: str = None) -> Dict[str, Any]:
def get_tasks(
self, list_id: Optional[str] = None, status: Optional[str] = None
) -> Dict[str, Any]:
"""Get tasks with optional filtering."""
params = {}
if list_id:
@@ -81,8 +88,8 @@ class GodspeedClient:
def create_task(
self,
title: str,
list_id: str = None,
notes: str = None,
list_id: Optional[str] = None,
notes: Optional[str] = None,
location: str = "end",
**kwargs,
) -> Dict[str, Any]:

View File

@@ -63,9 +63,22 @@ def get_access_token(scopes):
)
accounts = app.get_accounts()
token_response = None
# Try silent authentication first
if accounts:
token_response = app.acquire_token_silent(scopes, account=accounts[0])
else:
# If silent auth failed or no accounts, clear cache and do device flow
if not token_response or "access_token" not in token_response:
# Clear the cache to force fresh authentication
if os.path.exists(cache_file):
os.remove(cache_file)
cache = msal.SerializableTokenCache() # Create new empty cache
app = msal.PublicClientApplication(
client_id, authority=authority, token_cache=cache
)
flow = app.initiate_device_flow(scopes=scopes)
if "user_code" not in flow:
raise Exception("Failed to create device flow")

View File

@@ -18,16 +18,50 @@ semaphore = asyncio.Semaphore(2)
async def _handle_throttling_retry(func, *args, max_retries=3):
"""Handle 429 throttling with exponential backoff retry."""
"""Handle 429 throttling and 401 authentication errors with exponential backoff retry."""
for attempt in range(max_retries):
try:
return await func(*args)
except Exception as e:
if "429" in str(e) and attempt < max_retries - 1:
error_str = str(e)
if (
"429" in error_str
or "InvalidAuthenticationToken" in error_str
or "401" in error_str
) and attempt < max_retries - 1:
wait_time = (2**attempt) + 1 # Exponential backoff: 2, 5, 9 seconds
print(
f"Rate limited, waiting {wait_time}s before retry {attempt + 1}/{max_retries}"
)
if "429" in error_str:
print(
f"Rate limited, waiting {wait_time}s before retry {attempt + 1}/{max_retries}"
)
elif "InvalidAuthenticationToken" in error_str or "401" in error_str:
print(
f"Authentication failed (token expired), refreshing token and retrying in {wait_time}s (attempt {attempt + 1}/{max_retries})"
)
# Force re-authentication by clearing cache and getting new token
import os
cache_file = "token_cache.bin"
if os.path.exists(cache_file):
os.remove(cache_file)
# Re-import and call get_access_token to refresh
from src.services.microsoft_graph.auth import get_access_token
# We need to get the scopes from somewhere - for now assume standard scopes
scopes = [
"https://graph.microsoft.com/Calendars.Read",
"https://graph.microsoft.com/Mail.ReadWrite",
]
try:
new_token, new_headers = get_access_token(scopes)
# Update the headers in args - this is a bit hacky but should work
if len(args) > 1 and isinstance(args[1], dict):
args = list(args)
args[1] = new_headers
args = tuple(args)
except Exception as auth_error:
print(f"Failed to refresh token: {auth_error}")
raise e # Re-raise original error
await asyncio.sleep(wait_time)
continue
raise e
@@ -55,10 +89,11 @@ async def _fetch_impl(url, headers):
async with semaphore:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
if response.status == 429:
# Let the retry handler deal with throttling
if response.status in [401, 429]:
# Let the retry handler deal with authentication and throttling
response_text = await response.text()
raise Exception(
f"Failed to fetch {url}: {response.status} {await response.text()}"
f"Failed to fetch {url}: {response.status} {response_text}"
)
elif response.status != 200:
raise Exception(
@@ -92,9 +127,10 @@ async def _post_impl(url, headers, json_data):
async with semaphore:
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=json_data) as response:
if response.status == 429:
if response.status in [401, 429]:
response_text = await response.text()
raise Exception(
f"Failed to post {url}: {response.status} {await response.text()}"
f"Failed to post {url}: {response.status} {response_text}"
)
return response.status
@@ -119,9 +155,10 @@ async def _patch_impl(url, headers, json_data):
async with semaphore:
async with aiohttp.ClientSession() as session:
async with session.patch(url, headers=headers, json=json_data) as response:
if response.status == 429:
if response.status in [401, 429]:
response_text = await response.text()
raise Exception(
f"Failed to patch {url}: {response.status} {await response.text()}"
f"Failed to patch {url}: {response.status} {response_text}"
)
return response.status
@@ -145,9 +182,10 @@ async def _delete_impl(url, headers):
async with semaphore:
async with aiohttp.ClientSession() as session:
async with session.delete(url, headers=headers) as response:
if response.status == 429:
if response.status in [401, 429]:
response_text = await response.text()
raise Exception(
f"Failed to delete {url}: {response.status} {await response.text()}"
f"Failed to delete {url}: {response.status} {response_text}"
)
return response.status
@@ -176,9 +214,10 @@ async def _batch_impl(requests, headers):
async with session.post(
batch_url, headers=headers, json=batch_data
) as response:
if response.status == 429:
if response.status in [401, 429]:
response_text = await response.text()
raise Exception(
f"Batch request failed: {response.status} {await response.text()}"
f"Batch request failed: {response.status} {response_text}"
)
elif response.status != 200:
raise Exception(

352
src/utils/platform.py Normal file
View File

@@ -0,0 +1,352 @@
"""Cross-platform compatibility utilities."""
import os
import sys
import platform
import subprocess
from pathlib import Path
from typing import Optional, Dict, Any
def get_platform_info() -> Dict[str, str]:
"""Get platform information for compatibility checks."""
return {
"system": platform.system(),
"release": platform.release(),
"version": platform.version(),
"machine": platform.machine(),
"processor": platform.processor(),
"python_version": platform.python_version(),
"python_implementation": platform.python_implementation(),
}
def is_supported_platform() -> bool:
"""Check if the current platform is supported."""
system = platform.system()
python_version = tuple(map(int, platform.python_version().split(".")))
# Check Python version
if python_version < (3, 12):
return False
# Check operating system
supported_systems = ["Darwin", "Linux", "Windows"]
return system in supported_systems
def get_default_config_dir() -> Path:
"""Get platform-specific config directory."""
system = platform.system()
if system == "Darwin": # macOS
return Path.home() / "Library" / "Application Support" / "luk"
elif system == "Linux":
config_dir = os.environ.get("XDG_CONFIG_HOME")
if config_dir:
return Path(config_dir) / "luk"
return Path.home() / ".config" / "luk"
elif system == "Windows":
return Path(os.environ.get("APPDATA", "")) / "luk"
else:
# Fallback to ~/.config
return Path.home() / ".config" / "luk"
def get_default_data_dir() -> Path:
"""Get platform-specific data directory."""
system = platform.system()
if system == "Darwin": # macOS
return Path.home() / "Library" / "Application Support" / "luk"
elif system == "Linux":
data_dir = os.environ.get("XDG_DATA_HOME")
if data_dir:
return Path(data_dir) / "luk"
return Path.home() / ".local" / "share" / "luk"
elif system == "Windows":
return Path(os.environ.get("LOCALAPPDATA", "")) / "luk"
else:
# Fallback to ~/.local/share
return Path.home() / ".local" / "share" / "luk"
def get_default_log_dir() -> Path:
"""Get platform-specific log directory."""
system = platform.system()
if system == "Darwin": # macOS
return Path.home() / "Library" / "Logs" / "luk"
elif system == "Linux":
data_dir = os.environ.get("XDG_DATA_HOME")
if data_dir:
return Path(data_dir) / "luk" / "logs"
return Path.home() / ".local" / "share" / "luk" / "logs"
elif system == "Windows":
return Path(os.environ.get("LOCALAPPDATA", "")) / "luk" / "logs"
else:
# Fallback to ~/.local/share/logs
return Path.home() / ".local" / "share" / "luk" / "logs"
def get_default_maildir_path() -> Path:
"""Get platform-specific default Maildir path."""
system = platform.system()
if system == "Darwin": # macOS
return Path.home() / "Library" / "Mail"
elif system == "Linux":
return Path.home() / "Mail"
elif system == "Windows":
return Path.home() / "Mail"
else:
return Path.home() / "Mail"
def check_dependencies() -> Dict[str, bool]:
"""Check if required system dependencies are available."""
dependencies = {
"python": True, # We're running Python
"pip": False,
"git": False,
"curl": False,
"wget": False,
}
# Check for pip
try:
subprocess.run(["pip", "--version"], capture_output=True, check=True)
dependencies["pip"] = True
except (subprocess.CalledProcessError, FileNotFoundError):
pass
# Check for git
try:
subprocess.run(["git", "--version"], capture_output=True, check=True)
dependencies["git"] = True
except (subprocess.CalledProcessError, FileNotFoundError):
pass
# Check for curl
try:
subprocess.run(["curl", "--version"], capture_output=True, check=True)
dependencies["curl"] = True
except (subprocess.CalledProcessError, FileNotFoundError):
pass
# Check for wget
try:
subprocess.run(["wget", "--version"], capture_output=True, check=True)
dependencies["wget"] = True
except (subprocess.CalledProcessError, FileNotFoundError):
pass
return dependencies
def get_shell_info() -> Dict[str, str]:
"""Get shell information for completion setup."""
shell = os.environ.get("SHELL", "")
shell_name = Path(shell).name if shell else "unknown"
return {
"shell_path": shell,
"shell_name": shell_name,
"config_file": get_shell_config_file(shell_name),
}
def get_shell_config_file(shell_name: str) -> str:
"""Get the config file for a given shell."""
shell_configs = {
"bash": "~/.bashrc",
"zsh": "~/.zshrc",
"fish": "~/.config/fish/config.fish",
"ksh": "~/.kshrc",
"csh": "~/.cshrc",
"tcsh": "~/.tcshrc",
}
return shell_configs.get(shell_name, "~/.profile")
def setup_platform_specific() -> None:
"""Setup platform-specific configurations."""
system = platform.system()
if system == "Darwin":
setup_macos()
elif system == "Linux":
setup_linux()
elif system == "Windows":
setup_windows()
def setup_macos() -> None:
"""Setup macOS-specific configurations."""
# Ensure macOS-specific directories exist
config_dir = get_default_config_dir()
data_dir = get_default_data_dir()
log_dir = get_default_log_dir()
for directory in [config_dir, data_dir, log_dir]:
directory.mkdir(parents=True, exist_ok=True)
def setup_linux() -> None:
"""Setup Linux-specific configurations."""
# Ensure XDG directories exist
config_dir = get_default_config_dir()
data_dir = get_default_data_dir()
log_dir = get_default_log_dir()
for directory in [config_dir, data_dir, log_dir]:
directory.mkdir(parents=True, exist_ok=True)
def setup_windows() -> None:
"""Setup Windows-specific configurations."""
# Ensure Windows-specific directories exist
config_dir = get_default_config_dir()
data_dir = get_default_data_dir()
log_dir = get_default_log_dir()
for directory in [config_dir, data_dir, log_dir]:
directory.mkdir(parents=True, exist_ok=True)
def get_platform_specific_commands() -> Dict[str, str]:
"""Get platform-specific command equivalents."""
system = platform.system()
if system == "Darwin" or system == "Linux":
return {
"open": "open" if system == "Darwin" else "xdg-open",
"copy": "pbcopy" if system == "Darwin" else "xclip -selection clipboard",
"notify": "osascript -e 'display notification \"%s\"'"
if system == "Darwin"
else 'notify-send "%s"',
}
elif system == "Windows":
return {
"open": "start",
"copy": "clip",
"notify": "powershell -Command \"Add-Type -AssemblyName System.Windows.Forms; [System.Windows.Forms.MessageBox]::Show('%s')\"",
}
else:
return {}
def check_terminal_compatibility() -> Dict[str, bool]:
"""Check terminal compatibility for TUI features."""
return {
"color_support": sys.stdout.isatty(),
"unicode_support": True, # Most modern terminals support Unicode
"mouse_support": check_mouse_support(),
"textual_support": check_textual_support(),
}
def check_mouse_support() -> bool:
"""Check if terminal supports mouse events."""
# This is a basic check - actual mouse support depends on the terminal
return sys.stdout.isatty()
def check_textual_support() -> bool:
"""Check if Textual TUI framework can run."""
try:
import textual
return True
except ImportError:
return False
def get_platform_recommendations() -> list[str]:
"""Get platform-specific recommendations."""
system = platform.system()
recommendations = []
if system == "Darwin":
recommendations.extend(
[
"Install iTerm2 or Terminal.app for best TUI experience",
"Enable 'Terminal > Preferences > Profiles > Text > Unicode Normalization Form' set to 'None'",
"Consider using Homebrew for package management: brew install python3",
]
)
elif system == "Linux":
recommendations.extend(
[
"Use a modern terminal emulator like GNOME Terminal, Konsole, or Alacritty",
"Ensure UTF-8 locale is set: export LANG=en_US.UTF-8",
"Install system packages: sudo apt-get install python3-pip python3-venv",
]
)
elif system == "Windows":
recommendations.extend(
[
"Use Windows Terminal for best TUI experience",
"Enable UTF-8 support in Windows Terminal settings",
"Consider using WSL2 for better Unix compatibility",
"Install Python from python.org or Microsoft Store",
]
)
return recommendations
def validate_environment() -> Dict[str, Any]:
"""Validate the current environment for compatibility."""
platform_info = get_platform_info()
dependencies = check_dependencies()
terminal_compat = check_terminal_compatibility()
return {
"platform_supported": is_supported_platform(),
"platform_info": platform_info,
"dependencies": dependencies,
"terminal_compatibility": terminal_compat,
"recommendations": get_platform_recommendations(),
"config_dir": str(get_default_config_dir()),
"data_dir": str(get_default_data_dir()),
"log_dir": str(get_default_log_dir()),
}
if __name__ == "__main__":
# Print environment validation
env_info = validate_environment()
print("Platform Compatibility Check")
print("=" * 40)
print(
f"Platform: {env_info['platform_info']['system']} {env_info['platform_info']['release']}"
)
print(
f"Python: {env_info['platform_info']['python_version']} ({env_info['platform_info']['python_implementation']})"
)
print(f"Supported: {'' if env_info['platform_supported'] else ''}")
print()
print("Dependencies:")
for dep, available in env_info["dependencies"].items():
print(f" {dep}: {'' if available else ''}")
print()
print("Terminal Compatibility:")
for feature, supported in env_info["terminal_compatibility"].items():
print(f" {feature}: {'' if supported else ''}")
print()
print("Directories:")
print(f" Config: {env_info['config_dir']}")
print(f" Data: {env_info['data_dir']}")
print(f" Logs: {env_info['log_dir']}")
print()
if env_info["recommendations"]:
print("Recommendations:")
for rec in env_info["recommendations"]:
print(f"{rec}")