Files
luk/src/cli/sync.py
Bendt 48d2455b9c Make TUI the default mode for luk sync command
- luk sync now launches interactive TUI dashboard by default
- Add --once flag for single sync (non-interactive)
- Add --daemon flag for background daemon mode
- Keep 'luk sync run' as legacy subcommand for backwards compatibility
- Move common options (org, vdir, notify, etc.) to group level
2025-12-19 10:33:48 -05:00

1234 lines
39 KiB
Python

import click
import asyncio
import os
import sys
import signal
import json
import time
from datetime import datetime, timedelta
from pathlib import Path
from rich.progress import Progress, SpinnerColumn, MofNCompleteColumn
from src.utils.mail_utils.helpers import ensure_directory_exists
from src.utils.calendar_utils import save_events_to_vdir, save_events_to_file
from src.utils.notifications import notify_new_emails
from src.services.microsoft_graph.calendar import (
fetch_calendar_events,
sync_local_calendar_changes,
get_last_sync_time,
detect_deleted_events,
)
from src.services.microsoft_graph.mail import (
fetch_mail_async,
fetch_archive_mail_async,
archive_mail_async,
delete_mail_async,
synchronize_maildir_async,
process_outbox_async,
)
from src.services.microsoft_graph.auth import get_access_token
from src.services.godspeed.client import GodspeedClient
from src.services.godspeed.sync import GodspeedSync
# Timing state management
def get_sync_state_file():
"""Get the path to the sync state file."""
return os.path.expanduser("~/.local/share/luk/sync_state.json")
def load_sync_state():
"""Load the sync state from file."""
state_file = get_sync_state_file()
if os.path.exists(state_file):
try:
with open(state_file, "r") as f:
return json.load(f)
except Exception:
pass
return {
"last_godspeed_sync": 0,
"last_sweep_date": None,
"sweep_completed_today": False,
}
def save_sync_state(state):
"""Save the sync state to file."""
state_file = get_sync_state_file()
os.makedirs(os.path.dirname(state_file), exist_ok=True)
with open(state_file, "w") as f:
json.dump(state, f, indent=2)
def should_run_godspeed_sync():
"""Check if Godspeed sync should run (every 15 minutes)."""
state = load_sync_state()
current_time = time.time()
last_sync = state.get("last_godspeed_sync", 0)
return current_time - last_sync >= 900 # 15 minutes in seconds
def should_run_sweep():
"""Check if sweep should run (once after 6pm each day)."""
state = load_sync_state()
current_time = datetime.now()
# Check if it's after 6 PM
if current_time.hour < 18:
return False
# Check if we've already swept today
today_str = current_time.strftime("%Y-%m-%d")
last_sweep_date = state.get("last_sweep_date")
return last_sweep_date != today_str
def get_godspeed_sync_directory():
"""Get Godspeed sync directory from environment or default."""
sync_dir = os.getenv("GODSPEED_SYNC_DIR")
if sync_dir:
return Path(sync_dir)
# Default to ~/Documents/Godspeed or ~/.local/share/gtd-terminal-tools/godspeed
home = Path.home()
# Try Documents first
docs_dir = home / "Documents" / "Godspeed"
if docs_dir.parent.exists():
return docs_dir
# Fall back to data directory
data_dir = home / ".local" / "share" / "luk" / "godspeed"
return data_dir
def get_godspeed_credentials():
"""Get Godspeed credentials from environment."""
email = os.getenv("GODSPEED_EMAIL")
password = os.getenv("GODSPEED_PASSWORD")
token = os.getenv("GODSPEED_TOKEN")
return email, password, token
async def run_godspeed_sync(progress=None):
"""Run Godspeed bidirectional sync."""
try:
email, password, token = get_godspeed_credentials()
if not (token or (email and password)):
if progress:
progress.console.print(
"[yellow]⚠️ Skipping Godspeed sync: No credentials configured[/yellow]"
)
return False
sync_dir = get_godspeed_sync_directory()
if progress:
progress.console.print(
f"[cyan]🔄 Running Godspeed sync to {sync_dir}...[/cyan]"
)
client = GodspeedClient(email=email, password=password, token=token)
sync_engine = GodspeedSync(client, sync_dir)
sync_engine.sync_bidirectional()
# Update sync state
state = load_sync_state()
state["last_godspeed_sync"] = time.time()
save_sync_state(state)
if progress:
progress.console.print("[green]✅ Godspeed sync completed[/green]")
return True
except Exception as e:
if progress:
progress.console.print(f"[red]❌ Godspeed sync failed: {e}[/red]")
return False
async def run_task_sweep(progress=None):
"""Run task sweep from notes directory to Godspeed inbox."""
try:
from src.cli.godspeed import TaskSweeper
notes_dir_env = os.getenv("NOTES_DIR")
if not notes_dir_env:
if progress:
progress.console.print(
"[yellow]⚠️ Skipping task sweep: $NOTES_DIR not configured[/yellow]"
)
return False
notes_dir = Path(notes_dir_env)
if not notes_dir.exists():
if progress:
progress.console.print(
f"[yellow]⚠️ Skipping task sweep: Notes directory does not exist: {notes_dir}[/yellow]"
)
return False
godspeed_dir = get_godspeed_sync_directory()
if progress:
progress.console.print(
f"[cyan]🧹 Running task sweep from {notes_dir} to {godspeed_dir}...[/cyan]"
)
sweeper = TaskSweeper(notes_dir, godspeed_dir, dry_run=False)
result = sweeper.sweep_tasks()
# Update sweep state
state = load_sync_state()
state["last_sweep_date"] = datetime.now().strftime("%Y-%m-%d")
save_sync_state(state)
if result["swept_tasks"] > 0:
if progress:
progress.console.print(
f"[green]✅ Task sweep completed: {result['swept_tasks']} tasks swept[/green]"
)
else:
if progress:
progress.console.print(
"[green]✅ Task sweep completed: No tasks to sweep[/green]"
)
return True
except Exception as e:
if progress:
progress.console.print(f"[red]❌ Task sweep failed: {e}[/red]")
return False
# Function to create Maildir structure
def create_maildir_structure(base_path):
"""
Create the standard Maildir directory structure.
Args:
base_path (str): Base path for the Maildir.
Returns:
None
"""
ensure_directory_exists(os.path.join(base_path, "cur"))
ensure_directory_exists(os.path.join(base_path, "new"))
ensure_directory_exists(os.path.join(base_path, "tmp"))
ensure_directory_exists(os.path.join(base_path, ".Archive", "cur"))
ensure_directory_exists(os.path.join(base_path, ".Archive", "new"))
ensure_directory_exists(os.path.join(base_path, ".Archive", "tmp"))
ensure_directory_exists(os.path.join(base_path, ".Trash", "cur"))
# Create outbox structure for sending emails
ensure_directory_exists(os.path.join(base_path, "outbox", "new"))
ensure_directory_exists(os.path.join(base_path, "outbox", "cur"))
ensure_directory_exists(os.path.join(base_path, "outbox", "tmp"))
ensure_directory_exists(os.path.join(base_path, "outbox", "failed"))
async def fetch_calendar_async(
headers,
progress,
task_id,
dry_run,
vdir_path,
ics_path,
org_name,
days_back,
days_forward,
continue_iteration,
):
"""
Fetch calendar events and save them in the appropriate format.
Args:
headers: Authentication headers for Microsoft Graph API
progress: Progress instance for updating progress bars
task_id: ID of the task in the progress bar
Returns:
List of event dictionaries
Raises:
Exception: If there's an error fetching or saving events
"""
try:
# Use the utility function to fetch calendar events
progress.console.print(
"[cyan]Fetching events from Microsoft Graph API...[/cyan]"
)
events, total_events = await fetch_calendar_events(
headers=headers, days_back=days_back, days_forward=days_forward
)
progress.console.print(
f"[cyan]Got {len(events)} events from API (reported total: {total_events})[/cyan]"
)
# Update progress bar with total events
progress.update(task_id, total=total_events)
# Define org_vdir_path up front if vdir_path is specified
org_vdir_path = os.path.join(vdir_path, org_name) if vdir_path else None
# Save events to appropriate format
if not dry_run:
if vdir_path and org_vdir_path:
progress.console.print(
f"[cyan]Saving events to vdir: {org_vdir_path}[/cyan]"
)
save_events_to_vdir(events, org_vdir_path, progress, task_id, dry_run)
progress.console.print(
f"[green]Finished saving events to vdir: {org_vdir_path}[/green]"
)
elif ics_path:
# Save to a single ICS file in the output_ics directory
progress.console.print(
f"[cyan]Saving events to ICS file: {ics_path}/events_latest.ics[/cyan]"
)
save_events_to_file(
events, f"{ics_path}/events_latest.ics", progress, task_id, dry_run
)
progress.console.print(
"[green]Finished saving events to ICS file[/green]"
)
else:
# No destination specified
progress.console.print(
"[yellow]Warning: No destination path (--vdir or --icsfile) specified for calendar events.[/yellow]"
)
else:
progress.console.print(
f"[DRY-RUN] Would save {len(events)} events to {
'vdir format' if vdir_path else 'single ICS file'
}"
)
progress.update(task_id, advance=len(events))
# Interactive mode: Ask if the user wants to continue with the next date range
if continue_iteration:
# Move to the next date range
next_start_date = datetime.now() - timedelta(days=days_back)
next_end_date = next_start_date + timedelta(days=days_forward)
progress.console.print(
f"\nCurrent date range: {next_start_date.strftime('%Y-%m-%d')} to {
next_end_date.strftime('%Y-%m-%d')
}"
)
user_response = (
click.prompt("\nContinue to iterate? [y/N]", default="N")
.strip()
.lower()
)
while user_response == "y":
progress.console.print(
f"\nFetching events for {next_start_date.strftime('%Y-%m-%d')} to {
next_end_date.strftime('%Y-%m-%d')
}..."
)
# Reset the progress bar for the new fetch
progress.update(task_id, completed=0, total=0)
# Fetch events for the next date range
next_events, next_total_events = await fetch_calendar_events(
headers=headers,
days_back=0,
days_forward=days_forward,
start_date=next_start_date,
end_date=next_end_date,
)
# Update progress bar with total events
progress.update(task_id, total=next_total_events)
if not dry_run:
if vdir_path and org_vdir_path:
save_events_to_vdir(
next_events, org_vdir_path, progress, task_id, dry_run
)
else:
save_events_to_file(
next_events,
f"output_ics/outlook_events_{next_start_date.strftime('%Y%m%d')}.ics",
progress,
task_id,
dry_run,
)
else:
progress.console.print(
f"[DRY-RUN] Would save {len(next_events)} events to {
'vdir format'
if vdir_path
else 'output_ics/outlook_events_'
+ next_start_date.strftime('%Y%m%d')
+ '.ics'
}"
)
progress.update(task_id, advance=len(next_events))
# Calculate the next date range
next_start_date = next_end_date
next_end_date = next_start_date + timedelta(days=days_forward)
progress.console.print(
f"\nNext date range would be: {
next_start_date.strftime('%Y-%m-%d')
} to {next_end_date.strftime('%Y-%m-%d')}"
)
user_response = (
click.prompt("\nContinue to iterate? [y/N]", default="N")
.strip()
.lower()
)
return events
except Exception as e:
progress.console.print(
f"[red]Error fetching or saving calendar events: {str(e)}[/red]"
)
import traceback
progress.console.print(f"[red]{traceback.format_exc()}[/red]")
progress.update(task_id, completed=True)
return []
async def _sync_outlook_data(
dry_run,
vdir,
icsfile,
org,
days_back,
days_forward,
continue_iteration,
download_attachments,
two_way_calendar,
notify,
):
"""Synchronize data from external sources."""
# Expand the user home directory in vdir path
vdir = os.path.expanduser(vdir)
# Save emails to Maildir
base_maildir_path = os.getenv("MAILDIR_PATH", os.path.expanduser("~/Mail"))
maildir_path = base_maildir_path + f"/{org}"
attachments_dir = os.path.join(maildir_path, "attachments")
ensure_directory_exists(attachments_dir)
create_maildir_structure(maildir_path)
# Define scopes for Microsoft Graph API
scopes = [
"https://graph.microsoft.com/Calendars.Read",
"https://graph.microsoft.com/Mail.ReadWrite",
]
# Authenticate and get access token
access_token, headers = get_access_token(scopes)
# Set up the progress bars
progress = Progress(
SpinnerColumn(), MofNCompleteColumn(), *Progress.get_default_columns()
)
with progress:
task_fetch = progress.add_task("[green]Syncing Inbox...", total=0)
task_fetch_archive = progress.add_task("[green]Syncing Archive...", total=0)
task_calendar = progress.add_task("[cyan]Fetching calendar...", total=0)
task_local_calendar = progress.add_task(
"[magenta]Syncing local calendar...", total=0
)
task_read = progress.add_task("[blue]Marking as read...", total=0)
task_archive = progress.add_task("[yellow]Archiving mail...", total=0)
task_delete = progress.add_task("[red]Deleting mail...", total=0)
task_outbox = progress.add_task(
"[bright_green]Sending outbound mail...", total=0
)
# Stage 1: Synchronize local changes (read, archive, delete, calendar) to the server
progress.console.print(
"[bold cyan]Step 1: Syncing local changes to server...[/bold cyan]"
)
# Handle calendar sync first (if vdir is specified and two-way sync is enabled)
calendar_sync_results = (0, 0)
if vdir and two_way_calendar:
org_vdir_path = os.path.join(os.path.expanduser(vdir), org)
progress.console.print(
f"[magenta]Checking for local calendar changes in {org_vdir_path}...[/magenta]"
)
calendar_sync_results = await sync_local_calendar_changes(
headers, org_vdir_path, progress, task_local_calendar, dry_run
)
# Handle mail changes and outbound email in parallel
await asyncio.gather(
synchronize_maildir_async(
maildir_path, headers, progress, task_read, dry_run
),
archive_mail_async(maildir_path, headers, progress, task_archive, dry_run),
delete_mail_async(maildir_path, headers, progress, task_delete, dry_run),
process_outbox_async(
base_maildir_path, org, headers, progress, task_outbox, dry_run
),
)
progress.console.print("[bold green]Step 1: Local changes synced.[/bold green]")
# Report calendar sync results
created, deleted = calendar_sync_results
if two_way_calendar and (created > 0 or deleted > 0):
progress.console.print(
f"[magenta]📅 Two-way calendar sync: {created} events created, {deleted} events deleted[/magenta]"
)
elif two_way_calendar:
progress.console.print(
"[magenta]📅 Two-way calendar sync: No local changes detected[/magenta]"
)
# Stage 2: Fetch new data from the server
progress.console.print(
"\n[bold cyan]Step 2: Fetching new data from server...[/bold cyan]"
)
# Track messages before sync for notifications
maildir_path = (
os.getenv("MAILDIR_PATH", os.path.expanduser("~/Mail")) + f"/{org}"
)
messages_before = 0
new_dir = os.path.join(maildir_path, "new")
cur_dir = os.path.join(maildir_path, "cur")
if notify:
if os.path.exists(new_dir):
messages_before += len([f for f in os.listdir(new_dir) if ".eml" in f])
if os.path.exists(cur_dir):
messages_before += len([f for f in os.listdir(cur_dir) if ".eml" in f])
await asyncio.gather(
fetch_mail_async(
maildir_path,
attachments_dir,
headers,
progress,
task_fetch,
dry_run,
download_attachments,
),
fetch_archive_mail_async(
maildir_path,
attachments_dir,
headers,
progress,
task_fetch_archive,
dry_run,
download_attachments,
),
fetch_calendar_async(
headers,
progress,
task_calendar,
dry_run,
vdir,
icsfile,
org,
days_back,
days_forward,
continue_iteration,
),
)
# Send notification for new emails if enabled
if notify and not dry_run:
messages_after = 0
if os.path.exists(new_dir):
messages_after += len([f for f in os.listdir(new_dir) if ".eml" in f])
if os.path.exists(cur_dir):
messages_after += len([f for f in os.listdir(cur_dir) if ".eml" in f])
new_message_count = messages_after - messages_before
if new_message_count > 0:
notify_new_emails(new_message_count, org)
progress.console.print("[bold green]Step 2: New data fetched.[/bold green]")
# Stage 3: Run Godspeed operations based on timing
progress.console.print(
"\n[bold cyan]Step 3: Running Godspeed operations...[/bold cyan]"
)
# Check if Godspeed sync should run (every 15 minutes)
if should_run_godspeed_sync():
await run_godspeed_sync(progress)
else:
progress.console.print("[dim]⏭️ Skipping Godspeed sync (not due yet)[/dim]")
# Check if task sweep should run (once after 6pm daily)
if should_run_sweep():
await run_task_sweep(progress)
else:
current_hour = datetime.now().hour
if current_hour < 18:
progress.console.print(
"[dim]⏭️ Skipping task sweep (before 6 PM)[/dim]"
)
else:
progress.console.print(
"[dim]⏭️ Skipping task sweep (already completed today)[/dim]"
)
progress.console.print(
"[bold green]Step 3: Godspeed operations completed.[/bold green]"
)
click.echo("Sync complete.")
@click.group(invoke_without_command=True)
@click.option(
"--once",
is_flag=True,
help="Run a single sync and exit (non-interactive).",
default=False,
)
@click.option(
"--daemon",
is_flag=True,
help="Run in background daemon mode.",
default=False,
)
@click.option(
"--org",
help="Specify the organization name for the subfolder to store emails and calendar events",
default="corteva",
)
@click.option(
"--vdir",
help="Output calendar events in vdir format to the specified directory",
default="~/Calendar",
)
@click.option(
"--notify/--no-notify",
help="Send macOS notifications for new email messages",
default=True,
)
@click.option(
"--dry-run",
is_flag=True,
help="Run in dry-run mode without making changes.",
default=False,
)
@click.option(
"--demo",
is_flag=True,
help="Run with simulated sync (demo mode)",
default=False,
)
@click.option(
"--days-back",
type=int,
help="Number of days to look back for calendar events",
default=1,
)
@click.option(
"--days-forward",
type=int,
help="Number of days to look forward for calendar events",
default=30,
)
@click.option(
"--download-attachments",
is_flag=True,
help="Download email attachments",
default=False,
)
@click.option(
"--two-way-calendar",
is_flag=True,
help="Enable two-way calendar sync (sync local changes to server)",
default=False,
)
@click.pass_context
def sync(
ctx,
once,
daemon,
org,
vdir,
notify,
dry_run,
demo,
days_back,
days_forward,
download_attachments,
two_way_calendar,
):
"""Email and calendar synchronization.
By default, opens the interactive TUI dashboard.
Use --once for a single sync, or --daemon for background mode.
"""
# If a subcommand is invoked, let it handle everything
if ctx.invoked_subcommand is not None:
return
# Handle the default behavior (no subcommand)
if daemon:
# Run in daemon mode
from .sync_daemon import create_daemon_config, SyncDaemon
config = create_daemon_config(
dry_run=dry_run,
vdir=vdir,
icsfile=None,
org=org,
days_back=days_back,
days_forward=days_forward,
continue_iteration=False,
download_attachments=download_attachments,
two_way_calendar=two_way_calendar,
notify=notify,
)
daemon_instance = SyncDaemon(config)
daemon_instance.start()
elif once:
# Run a single sync (non-interactive)
asyncio.run(
_sync_outlook_data(
dry_run,
vdir,
None, # icsfile
org,
days_back,
days_forward,
False, # continue_iteration
download_attachments,
two_way_calendar,
notify,
)
)
else:
# Default: Launch interactive TUI dashboard
from .sync_dashboard import run_dashboard_sync
sync_config = {
"org": org,
"vdir": vdir,
"notify": notify,
"dry_run": dry_run,
"days_back": days_back,
"days_forward": days_forward,
"download_attachments": download_attachments,
"two_way_calendar": two_way_calendar,
"continue_iteration": False,
"icsfile": None,
}
asyncio.run(
run_dashboard_sync(notify=notify, sync_config=sync_config, demo_mode=demo)
)
def daemonize():
"""Properly daemonize the process for Unix systems."""
# First fork
try:
pid = os.fork()
if pid > 0:
# Parent exits
sys.exit(0)
except OSError as e:
sys.stderr.write(f"Fork #1 failed: {e}\n")
sys.exit(1)
# Decouple from parent environment
os.chdir("/")
os.setsid()
os.umask(0)
# Second fork
try:
pid = os.fork()
if pid > 0:
# Parent exits
sys.exit(0)
except OSError as e:
sys.stderr.write(f"Fork #2 failed: {e}\n")
sys.exit(1)
# Redirect standard file descriptors
sys.stdout.flush()
sys.stderr.flush()
si = open(os.devnull, "r")
so = open(os.devnull, "a+")
se = open(os.devnull, "a+")
os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno())
@sync.command()
@click.option(
"--dry-run",
is_flag=True,
help="Run in dry-run mode without making changes.",
default=False,
)
@click.option(
"--vdir",
help="Output calendar events in vdir format to the specified directory (each event in its own file)",
default="~/Calendar",
)
@click.option(
"--icsfile", help="Output calendar events into this ics file path.", default=None
)
@click.option(
"--org",
help="Specify the organization name for the subfolder to store emails and calendar events",
default="corteva",
)
@click.option(
"--days-back",
type=int,
help="Number of days to look back for calendar events",
default=1,
)
@click.option(
"--days-forward",
type=int,
help="Number of days to look forward for calendar events",
default=30,
)
@click.option(
"--continue-iteration",
is_flag=True,
help="Enable interactive mode to continue fetching more date ranges",
default=False,
)
@click.option(
"--download-attachments",
is_flag=True,
help="Download email attachments",
default=False,
)
@click.option(
"--two-way-calendar",
is_flag=True,
help="Enable two-way calendar sync (sync local changes to server)",
default=False,
)
@click.option(
"--notify",
is_flag=True,
help="Send macOS notifications for new email messages",
default=False,
)
def run(
dry_run,
vdir,
icsfile,
org,
days_back,
days_forward,
continue_iteration,
download_attachments,
two_way_calendar,
notify,
):
"""Run a single sync operation (legacy command, prefer 'luk sync --once')."""
asyncio.run(
_sync_outlook_data(
dry_run,
vdir,
icsfile,
org,
days_back,
days_forward,
continue_iteration,
download_attachments,
two_way_calendar,
notify,
)
)
@sync.command()
def stop():
"""Stop the sync daemon."""
pid_file = os.path.expanduser("~/.config/luk/luk.pid")
if not os.path.exists(pid_file):
click.echo("Daemon is not running (no PID file found)")
return
try:
with open(pid_file, "r") as f:
pid = int(f.read().strip())
# Send SIGTERM to process
os.kill(pid, signal.SIGTERM)
# Remove PID file
os.unlink(pid_file)
click.echo(f"Daemon stopped (PID {pid})")
except (ValueError, ProcessLookupError, OSError) as e:
click.echo(f"Error stopping daemon: {e}")
# Clean up stale PID file
if os.path.exists(pid_file):
os.unlink(pid_file)
@sync.command()
def status():
"""Check the status of the sync daemon."""
pid_file = os.path.expanduser("~/.config/luk/luk.pid")
if not os.path.exists(pid_file):
click.echo("Daemon is not running")
return
try:
with open(pid_file, "r") as f:
pid = int(f.read().strip())
# Check if process exists
os.kill(pid, 0) # Send signal 0 to check if process exists
click.echo(f"Daemon is running (PID {pid})")
except (ValueError, ProcessLookupError, OSError):
click.echo("Daemon is not running (stale PID file)")
# Clean up stale PID file
os.unlink(pid_file)
@sync.command(name="interactive")
@click.option(
"--org",
help="Specify the organization name for the subfolder to store emails and calendar events",
default="corteva",
)
@click.option(
"--vdir",
help="Output calendar events in vdir format to the specified directory",
default="~/Calendar",
)
@click.option(
"--notify/--no-notify",
help="Send macOS notifications for new email messages",
default=True,
)
@click.option(
"--dry-run",
is_flag=True,
help="Run in dry-run mode without making changes.",
default=False,
)
@click.option(
"--demo",
is_flag=True,
help="Run with simulated sync (demo mode)",
default=False,
)
def interactive(org, vdir, notify, dry_run, demo):
"""Launch interactive TUI dashboard for sync operations."""
from .sync_dashboard import run_dashboard_sync
sync_config = {
"org": org,
"vdir": vdir,
"notify": notify,
"dry_run": dry_run,
"days_back": 1,
"days_forward": 30,
"download_attachments": False,
"two_way_calendar": False,
"continue_iteration": False,
"icsfile": None,
}
asyncio.run(
run_dashboard_sync(notify=notify, sync_config=sync_config, demo_mode=demo)
)
# Alias 'i' for 'interactive'
sync.add_command(interactive, name="i")
def check_calendar_changes(vdir_path, org):
"""
Check if there are local calendar changes that need syncing.
Args:
vdir_path (str): Base vdir path
org (str): Organization name
Returns:
tuple: (has_changes, change_description)
"""
if not vdir_path:
return False, "No vdir path configured"
org_vdir_path = os.path.join(os.path.expanduser(vdir_path), org)
if not os.path.exists(org_vdir_path):
return False, "Calendar directory does not exist"
try:
# Get last sync time
last_sync_time = get_last_sync_time(org_vdir_path)
# Check if vdir directory has been modified since last sync
vdir_mtime = os.path.getmtime(org_vdir_path)
if vdir_mtime > last_sync_time:
# Check for specific types of changes
deleted_events = detect_deleted_events(org_vdir_path)
# Count .ics files to detect new events
import glob
ics_files = glob.glob(os.path.join(org_vdir_path, "*.ics"))
# Load previous state to compare
state_file = os.path.join(org_vdir_path, ".sync_state.json")
previous_state = {}
if os.path.exists(state_file):
try:
import json
with open(state_file, "r") as f:
previous_state = json.load(f)
except Exception:
pass
new_event_count = len(ics_files) - len(previous_state) + len(deleted_events)
if deleted_events or new_event_count > 0:
changes = []
if new_event_count > 0:
changes.append(f"{new_event_count} new events")
if deleted_events:
changes.append(f"{len(deleted_events)} deleted events")
return True, ", ".join(changes)
else:
return True, "directory modified"
return False, "no changes detected"
except Exception as e:
return False, f"error checking calendar: {str(e)}"
async def daemon_mode(
dry_run,
vdir,
icsfile,
org,
days_back,
days_forward,
continue_iteration,
download_attachments,
two_way_calendar,
notify,
):
"""
Run the script in daemon mode, periodically syncing emails and calendar.
"""
from src.services.microsoft_graph.mail import get_inbox_count_async
from rich.console import Console
from rich.panel import Panel
from rich.text import Text
from datetime import datetime
import time
console = Console()
sync_interval = 300 # 5 minutes
check_interval = 10 # 10 seconds
last_sync_time = time.time() - sync_interval # Force initial sync
def create_status_display(status_text, status_color="cyan"):
"""Create a status panel for daemon mode."""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
content = Text()
content.append(f"[{timestamp}] ", style="dim")
content.append(status_text, style=status_color)
return Panel(
content,
title="📧 Email & Calendar Sync Daemon",
border_style="blue",
padding=(0, 1),
)
# Initial display
console.print(create_status_display("Starting daemon mode...", "green"))
while True:
if time.time() - last_sync_time >= sync_interval:
# Show full sync status
console.clear()
console.print(create_status_display("Performing full sync...", "green"))
# Perform a full sync
await _sync_outlook_data(
dry_run,
vdir,
icsfile,
org,
days_back,
days_forward,
continue_iteration,
download_attachments,
two_way_calendar,
notify,
)
last_sync_time = time.time()
# Show completion
console.print(create_status_display("Full sync completed ✅", "green"))
else:
# Show checking status
console.clear()
console.print(create_status_display("Checking for changes...", "cyan"))
try:
# Authenticate and get access token for mail check
scopes = ["https://graph.microsoft.com/Mail.Read"]
access_token, headers = get_access_token(scopes)
remote_message_count = await get_inbox_count_async(headers)
maildir_path = os.path.expanduser(f"~/Mail/{org}")
# Count local messages
new_dir = os.path.join(maildir_path, "new")
cur_dir = os.path.join(maildir_path, "cur")
local_message_count = 0
if os.path.exists(new_dir):
local_message_count += len(
[f for f in os.listdir(new_dir) if ".eml" in f]
)
if os.path.exists(cur_dir):
local_message_count += len(
[f for f in os.listdir(cur_dir) if ".eml" in f]
)
mail_changes = remote_message_count != local_message_count
# Check for calendar changes if two-way sync is enabled
calendar_changes = False
calendar_change_desc = ""
if two_way_calendar and vdir:
calendar_changes, calendar_change_desc = check_calendar_changes(
vdir, org
)
# Check for outbound emails in outbox
base_maildir_path = os.getenv(
"MAILDIR_PATH", os.path.expanduser("~/Mail")
)
outbox_new_dir = os.path.join(base_maildir_path, org, "outbox", "new")
outbox_changes = False
pending_email_count = 0
if os.path.exists(outbox_new_dir):
pending_emails = [
f for f in os.listdir(outbox_new_dir) if not f.startswith(".")
]
pending_email_count = len(pending_emails)
outbox_changes = pending_email_count > 0
# Check Godspeed operations
godspeed_sync_due = should_run_godspeed_sync()
sweep_due = should_run_sweep()
# Determine what changed and show appropriate status
changes_detected = (
mail_changes
or calendar_changes
or outbox_changes
or godspeed_sync_due
or sweep_due
)
if changes_detected:
change_parts = []
if mail_changes:
change_parts.append(
f"Mail: Remote {remote_message_count}, Local {local_message_count}"
)
if calendar_changes:
change_parts.append(f"Calendar: {calendar_change_desc}")
if outbox_changes:
change_parts.append(f"Outbox: {pending_email_count} pending")
if godspeed_sync_due:
change_parts.append("Godspeed sync due")
if sweep_due:
change_parts.append("Task sweep due")
console.print(
create_status_display(
f"Changes detected! {' | '.join(change_parts)}. Starting sync...",
"yellow",
)
)
# Sync if any changes detected
if changes_detected:
await _sync_outlook_data(
dry_run,
vdir,
icsfile,
org,
days_back,
days_forward,
continue_iteration,
download_attachments,
two_way_calendar,
notify,
)
last_sync_time = time.time()
console.print(create_status_display("Sync completed ✅", "green"))
else:
status_parts = [
f"Mail: Remote {remote_message_count}, Local {local_message_count}"
]
if two_way_calendar:
status_parts.append(f"Calendar: {calendar_change_desc}")
status_parts.append(f"Outbox: {pending_email_count} pending")
# Add Godspeed status
state = load_sync_state()
last_godspeed = state.get("last_godspeed_sync", 0)
minutes_since_godspeed = int((time.time() - last_godspeed) / 60)
status_parts.append(f"Godspeed: {minutes_since_godspeed}m ago")
last_sweep = state.get("last_sweep_date")
if last_sweep == datetime.now().strftime("%Y-%m-%d"):
status_parts.append("Sweep: done today")
else:
current_hour = datetime.now().hour
if current_hour >= 18:
status_parts.append("Sweep: due")
else:
hours_until_sweep = 18 - current_hour
status_parts.append(f"Sweep: in {hours_until_sweep}h")
console.print(
create_status_display(
f"No changes detected ({', '.join(status_parts)})",
"green",
)
)
except Exception as e:
console.print(
create_status_display(f"Error during check: {str(e)}", "red")
)
time.sleep(check_interval)