Files
luk/src/cli/sync.py
Tim Bendt fc7d07ae6b add calendar change detection to daemon mode
Enhances the sync daemon to monitor both email and calendar changes, automatically triggering syncs when local calendar events are added or deleted in VDIR format.

🤖 Generated with [opencode](https://opencode.ai)

Co-Authored-By: opencode <noreply@opencode.ai>
2025-07-16 09:46:31 -04:00

650 lines
22 KiB
Python

import click
import asyncio
import os
import sys
from rich.progress import Progress, SpinnerColumn, MofNCompleteColumn
from datetime import datetime, timedelta
from src.utils.mail_utils.helpers import ensure_directory_exists
from src.utils.calendar_utils import save_events_to_vdir, save_events_to_file
from src.services.microsoft_graph.calendar import (
fetch_calendar_events,
sync_local_calendar_changes,
get_last_sync_time,
detect_deleted_events,
)
from src.services.microsoft_graph.mail import (
fetch_mail_async,
archive_mail_async,
delete_mail_async,
synchronize_maildir_async,
)
from src.services.microsoft_graph.auth import get_access_token
def create_maildir_structure(base_path):
    """
    Lay out the Maildir folders used by the sync underneath ``base_path``.

    Args:
        base_path (str): Root directory of the Maildir.

    Returns:
        None
    """
    # Each entry is the path (relative to base_path) of one folder to ensure;
    # the order matches the order in which the folders are created.
    folder_parts = (
        ("cur",),
        ("new",),
        ("tmp",),
        (".Archives",),
        (".Trash", "cur"),
    )
    for parts in folder_parts:
        ensure_directory_exists(os.path.join(base_path, *parts))
async def fetch_calendar_async(
    headers,
    progress,
    task_id,
    dry_run,
    vdir_path,
    ics_path,
    org_name,
    days_back,
    days_forward,
    continue_iteration,
):
    """
    Fetch calendar events and save them in the appropriate format.

    Args:
        headers: Authentication headers for Microsoft Graph API
        progress: Progress instance for updating progress bars
        task_id: ID of the task in the progress bar
        dry_run: When true, nothing is saved; the intended action is printed.
        vdir_path: Base vdir directory. Events are saved into an ``org_name``
            sub-directory of it. Takes precedence over ``ics_path``.
        ics_path: Directory receiving ``events_latest.ics`` when no
            ``vdir_path`` is given.
        org_name: Name of the sub-directory used inside ``vdir_path``.
        days_back: Passed to ``fetch_calendar_events`` for the initial fetch
            and used to compute the first interactive date range.
        days_forward: Passed to ``fetch_calendar_events``; also the length
            (in days) of every interactive date range.
        continue_iteration: When true, prompt the user after the initial
            fetch and keep fetching further date ranges while they answer "y".

    Returns:
        The events returned by the initial fetch, or an empty list if any
        step failed. Errors are printed to the progress console and are not
        propagated to the caller.
    """
    date_format = "%Y-%m-%d"
    try:
        # Use the utility function to fetch calendar events
        progress.console.print(
            "[cyan]Fetching events from Microsoft Graph API...[/cyan]"
        )
        events, total_events = await fetch_calendar_events(
            headers=headers, days_back=days_back, days_forward=days_forward
        )
        progress.console.print(
            f"[cyan]Got {len(events)} events from API (reported total: {total_events})[/cyan]"
        )

        # Update progress bar with total events
        progress.update(task_id, total=total_events)

        # Save events to appropriate format
        if not dry_run:
            if vdir_path:
                # Create org-specific directory within vdir path
                org_vdir_path = os.path.join(vdir_path, org_name)
                progress.console.print(
                    f"[cyan]Saving events to vdir: {org_vdir_path}[/cyan]"
                )
                save_events_to_vdir(events, org_vdir_path, progress, task_id, dry_run)
                progress.console.print(
                    f"[green]Finished saving events to vdir: {org_vdir_path}[/green]"
                )
            elif ics_path:
                # Save to a single ICS file in the output_ics directory
                progress.console.print(
                    f"[cyan]Saving events to ICS file: {ics_path}/events_latest.ics[/cyan]"
                )
                save_events_to_file(
                    events, f"{ics_path}/events_latest.ics", progress, task_id, dry_run
                )
                progress.console.print(
                    "[green]Finished saving events to ICS file[/green]"
                )
            else:
                # No destination specified
                progress.console.print(
                    "[yellow]Warning: No destination path (--vdir or --icsfile) specified for calendar events.[/yellow]"
                )
        else:
            # The description is computed outside the f-string: replacement
            # fields spanning several lines only parse on Python 3.12+.
            destination = "vdir format" if vdir_path else "single ICS file"
            progress.console.print(
                f"[DRY-RUN] Would save {len(events)} events to {destination}"
            )
            progress.update(task_id, advance=len(events))

        # Interactive mode: Ask if the user wants to continue with the next date range
        if continue_iteration:
            # Move to the next date range
            next_start_date = datetime.now() - timedelta(days=days_back)
            next_end_date = next_start_date + timedelta(days=days_forward)
            range_start = next_start_date.strftime(date_format)
            range_end = next_end_date.strftime(date_format)
            progress.console.print(
                f"\nCurrent date range: {range_start} to {range_end}"
            )
            user_response = (
                click.prompt("\nContinue to iterate? [y/N]", default="N")
                .strip()
                .lower()
            )
            while user_response == "y":
                progress.console.print(
                    f"\nFetching events for {range_start} to {range_end}..."
                )
                # Reset the progress bar for the new fetch
                progress.update(task_id, completed=0, total=0)

                # Fetch events for the next date range
                next_events, next_total_events = await fetch_calendar_events(
                    headers=headers,
                    days_back=0,
                    days_forward=days_forward,
                    start_date=next_start_date,
                    end_date=next_end_date,
                )

                # Update progress bar with total events
                progress.update(task_id, total=next_total_events)

                # NOTE(review): follow-up batches go to a relative
                # "output_ics/" path and do not use ics_path — confirm this
                # is intended.
                batch_ics_file = (
                    f"output_ics/outlook_events_{next_start_date.strftime('%Y%m%d')}.ics"
                )
                if not dry_run:
                    if vdir_path:
                        # org_vdir_path was bound above under the same
                        # (not dry_run, vdir_path) conditions.
                        save_events_to_vdir(
                            next_events, org_vdir_path, progress, task_id, dry_run
                        )
                    else:
                        save_events_to_file(
                            next_events,
                            batch_ics_file,
                            progress,
                            task_id,
                            dry_run,
                        )
                else:
                    destination = "vdir format" if vdir_path else batch_ics_file
                    progress.console.print(
                        f"[DRY-RUN] Would save {len(next_events)} events to {destination}"
                    )
                    progress.update(task_id, advance=len(next_events))

                # Calculate the next date range
                next_start_date = next_end_date
                next_end_date = next_start_date + timedelta(days=days_forward)
                range_start = next_start_date.strftime(date_format)
                range_end = next_end_date.strftime(date_format)
                progress.console.print(
                    f"\nNext date range would be: {range_start} to {range_end}"
                )
                user_response = (
                    click.prompt("\nContinue to iterate? [y/N]", default="N")
                    .strip()
                    .lower()
                )
        return events
    except Exception as e:
        # Best-effort boundary: report the failure on the console and return
        # an empty result instead of propagating the exception.
        import traceback

        progress.console.print(
            f"[red]Error fetching or saving calendar events: {str(e)}[/red]"
        )
        progress.console.print(f"[red]{traceback.format_exc()}[/red]")
        # NOTE(review): `completed` receives the boolean True here (numerically
        # 1) — presumably meant to mark the task as finished; confirm.
        progress.update(task_id, completed=True)
        return []
async def _sync_outlook_data(
    dry_run,
    vdir,
    icsfile,
    org,
    days_back,
    days_forward,
    continue_iteration,
    download_attachments,
    two_way_calendar,
):
    """
    Run one complete mail and calendar synchronization pass.

    Step 1 pushes local changes to the server (optional two-way calendar
    sync, then the read/archive/delete mail operations concurrently).
    Step 2 fetches mail and calendar events from the server concurrently.

    Args:
        dry_run: Forwarded to every sync helper so no changes are made.
        vdir: Base vdir directory for calendar events; ``~`` is expanded.
            May be empty or None, in which case calendar output falls back
            to ``icsfile`` and two-way calendar sync is skipped.
        icsfile: Destination for ICS output, forwarded to
            ``fetch_calendar_async``.
        org: Sub-directory name used under both the Maildir root and ``vdir``.
        days_back: Forwarded to ``fetch_calendar_async``.
        days_forward: Forwarded to ``fetch_calendar_async``.
        continue_iteration: Forwarded to ``fetch_calendar_async``.
        download_attachments: Forwarded to ``fetch_mail_async``.
        two_way_calendar: When true (and ``vdir`` is set), local calendar
            changes are synced to the server before fetching.
    """
    # Expand the user home directory in vdir path. The guard keeps an unset
    # vdir (None/"") from raising TypeError inside expanduser; the code below
    # already treats vdir as optional.
    if vdir:
        vdir = os.path.expanduser(vdir)

    # Save emails to Maildir
    maildir_path = os.getenv("MAILDIR_PATH", os.path.expanduser("~/Mail")) + f"/{org}"
    attachments_dir = os.path.join(maildir_path, "attachments")
    ensure_directory_exists(attachments_dir)
    create_maildir_structure(maildir_path)

    # Define scopes for Microsoft Graph API
    # NOTE(review): two-way calendar sync reports created/deleted events, yet
    # only Calendars.Read is requested here — confirm the scope is sufficient.
    scopes = [
        "https://graph.microsoft.com/Calendars.Read",
        "https://graph.microsoft.com/Mail.ReadWrite",
    ]

    # Authenticate and get access token (only the headers are used below)
    _, headers = get_access_token(scopes)

    # Set up the progress bars
    progress = Progress(
        SpinnerColumn(), MofNCompleteColumn(), *Progress.get_default_columns()
    )
    with progress:
        task_fetch = progress.add_task("[green]Syncing Inbox...", total=0)
        task_calendar = progress.add_task("[cyan]Fetching calendar...", total=0)
        task_local_calendar = progress.add_task(
            "[magenta]Syncing local calendar...", total=0
        )
        task_read = progress.add_task("[blue]Marking as read...", total=0)
        task_archive = progress.add_task("[yellow]Archiving mail...", total=0)
        task_delete = progress.add_task("[red]Deleting mail...", total=0)

        # Stage 1: Synchronize local changes (read, archive, delete, calendar) to the server
        progress.console.print(
            "[bold cyan]Step 1: Syncing local changes to server...[/bold cyan]"
        )

        # Handle calendar sync first (if vdir is specified and two-way sync is enabled)
        calendar_sync_results = (0, 0)
        if vdir and two_way_calendar:
            # vdir was already expanded above.
            org_vdir_path = os.path.join(vdir, org)
            progress.console.print(
                f"[magenta]Checking for local calendar changes in {org_vdir_path}...[/magenta]"
            )
            calendar_sync_results = await sync_local_calendar_changes(
                headers, org_vdir_path, progress, task_local_calendar, dry_run
            )

        # Handle mail changes in parallel
        await asyncio.gather(
            synchronize_maildir_async(
                maildir_path, headers, progress, task_read, dry_run
            ),
            archive_mail_async(maildir_path, headers, progress, task_archive, dry_run),
            delete_mail_async(maildir_path, headers, progress, task_delete, dry_run),
        )
        progress.console.print("[bold green]Step 1: Local changes synced.[/bold green]")

        # Report calendar sync results
        created, deleted = calendar_sync_results
        if two_way_calendar and (created > 0 or deleted > 0):
            progress.console.print(
                f"[magenta]📅 Two-way calendar sync: {created} events created, {deleted} events deleted[/magenta]"
            )
        elif two_way_calendar:
            progress.console.print(
                "[magenta]📅 Two-way calendar sync: No local changes detected[/magenta]"
            )

        # Stage 2: Fetch new data from the server
        progress.console.print(
            "\n[bold cyan]Step 2: Fetching new data from server...[/bold cyan]"
        )
        await asyncio.gather(
            fetch_mail_async(
                maildir_path,
                attachments_dir,
                headers,
                progress,
                task_fetch,
                dry_run,
                download_attachments,
            ),
            fetch_calendar_async(
                headers,
                progress,
                task_calendar,
                dry_run,
                vdir,
                icsfile,
                org,
                days_back,
                days_forward,
                continue_iteration,
            ),
        )
        progress.console.print("[bold green]Step 2: New data fetched.[/bold green]")
    click.echo("Sync complete.")
@click.command()
@click.option(
    "--dry-run",
    is_flag=True,
    default=False,
    help="Run in dry-run mode without making changes.",
)
@click.option(
    "--vdir",
    default="~/Calendar",
    help="Output calendar events in vdir format to the specified directory (each event in its own file)",
)
@click.option(
    "--icsfile",
    default=None,
    help="Output calendar events into this ics file path.",
)
@click.option(
    "--org",
    default="corteva",
    help="Specify the organization name for the subfolder to store emails and calendar events",
)
@click.option(
    "--days-back",
    type=int,
    default=1,
    help="Number of days to look back for calendar events",
)
@click.option(
    "--days-forward",
    type=int,
    default=30,
    help="Number of days to look forward for calendar events",
)
@click.option(
    "--continue-iteration",
    is_flag=True,
    default=False,
    help="Enable interactive mode to continue fetching more date ranges",
)
@click.option(
    "--download-attachments",
    is_flag=True,
    default=False,
    help="Download email attachments",
)
@click.option(
    "--two-way-calendar",
    is_flag=True,
    default=False,
    help="Enable two-way calendar sync (sync local changes to server)",
)
@click.option(
    "--daemon",
    is_flag=True,
    default=False,
    help="Run in daemon mode.",
)
def sync(
    dry_run,
    vdir,
    icsfile,
    org,
    days_back,
    days_forward,
    continue_iteration,
    download_attachments,
    two_way_calendar,
    daemon,
):
    # CLI entry point: with --daemon the daemon loop is run, otherwise a
    # single sync pass. Both coroutines take the same positional arguments,
    # so only the target differs. (Kept as a comment rather than a docstring
    # so the command's --help output is unchanged.)
    entry_point = daemon_mode if daemon else _sync_outlook_data
    asyncio.run(
        entry_point(
            dry_run,
            vdir,
            icsfile,
            org,
            days_back,
            days_forward,
            continue_iteration,
            download_attachments,
            two_way_calendar,
        )
    )
def _load_previous_sync_state(state_file):
    """Return the saved sync state, or ``{}`` when it is missing or unusable."""
    import json

    try:
        with open(state_file, "r", encoding="utf-8") as f:
            state = json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file or malformed JSON: deliberately best-effort,
        # fall back to "no previous state".
        return {}
    # Only a collection can be counted by the caller; anything else (for
    # example a bare null or number) is treated as an empty state.
    return state if isinstance(state, (dict, list)) else {}


def check_calendar_changes(vdir_path, org):
    """
    Check if there are local calendar changes that need syncing.

    The org directory's modification time is compared with the last sync
    time; when it is newer, deleted events and the number of ``.ics`` files
    relative to the saved ``.sync_state.json`` are used to describe the change.

    Args:
        vdir_path (str): Base vdir path
        org (str): Organization name

    Returns:
        tuple: (has_changes, change_description). Any error while checking
        is reported as ``(False, "error checking calendar: ...")`` rather
        than raised.
    """
    import glob

    if not vdir_path:
        return False, "No vdir path configured"

    org_vdir_path = os.path.join(os.path.expanduser(vdir_path), org)
    if not os.path.exists(org_vdir_path):
        return False, "Calendar directory does not exist"

    try:
        # Get last sync time
        last_sync_time = get_last_sync_time(org_vdir_path)

        # Check if vdir directory has been modified since last sync
        vdir_mtime = os.path.getmtime(org_vdir_path)
        if vdir_mtime > last_sync_time:
            # Check for specific types of changes
            deleted_events = detect_deleted_events(org_vdir_path)

            # Count .ics files to detect new events
            ics_files = glob.glob(os.path.join(org_vdir_path, "*.ics"))

            # Load previous state to compare
            previous_state = _load_previous_sync_state(
                os.path.join(org_vdir_path, ".sync_state.json")
            )

            # Files present now, minus entries known at the last sync, plus
            # the entries that have since been deleted locally.
            new_event_count = len(ics_files) - len(previous_state) + len(deleted_events)

            if deleted_events or new_event_count > 0:
                changes = []
                if new_event_count > 0:
                    changes.append(f"{new_event_count} new events")
                if deleted_events:
                    changes.append(f"{len(deleted_events)} deleted events")
                return True, ", ".join(changes)
            else:
                # The directory changed but no add/delete could be attributed.
                return True, "directory modified"

        return False, "no changes detected"
    except Exception as e:
        return False, f"error checking calendar: {str(e)}"
async def daemon_mode(
    dry_run,
    vdir,
    icsfile,
    org,
    days_back,
    days_forward,
    continue_iteration,
    download_attachments,
    two_way_calendar,
):
    """
    Run the script in daemon mode, periodically syncing emails and calendar.

    A full sync is performed immediately and then whenever 300 seconds have
    passed since the last sync. In between, every 10 seconds the remote inbox
    count is compared with the number of local messages and (when two-way
    calendar sync is enabled and ``vdir`` is set) the local calendar is
    checked; any difference triggers a sync right away. The loop never
    returns.

    All arguments are forwarded unchanged to ``_sync_outlook_data``; ``org``,
    ``vdir`` and ``two_way_calendar`` are additionally used for the change
    checks.
    """
    from src.services.microsoft_graph.mail import get_inbox_count_async
    from rich.console import Console
    from rich.panel import Panel
    from rich.text import Text
    import time

    console = Console()
    sync_interval = 300  # 5 minutes
    check_interval = 10  # 10 seconds
    # A monotonic clock keeps the interval logic immune to wall-clock jumps.
    last_sync_time = time.monotonic() - sync_interval  # Force initial sync

    def create_status_display(status_text, status_color="cyan"):
        """Create a status panel for daemon mode."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        content = Text()
        content.append(f"[{timestamp}] ", style="dim")
        content.append(status_text, style=status_color)
        return Panel(
            content,
            title="📧 Email & Calendar Sync Daemon",
            border_style="blue",
            padding=(0, 1),
        )

    def count_local_messages(maildir_path):
        """Count entries whose name contains ".eml" in the new/ and cur/ folders."""
        total = 0
        for folder_name in ("new", "cur"):
            folder = os.path.join(maildir_path, folder_name)
            if os.path.exists(folder):
                total += sum(1 for name in os.listdir(folder) if ".eml" in name)
        return total

    async def run_sync():
        """Run one sync pass with the daemon's arguments."""
        await _sync_outlook_data(
            dry_run,
            vdir,
            icsfile,
            org,
            days_back,
            days_forward,
            continue_iteration,
            download_attachments,
            two_way_calendar,
        )

    # Initial display
    console.print(create_status_display("Starting daemon mode...", "green"))

    while True:
        if time.monotonic() - last_sync_time >= sync_interval:
            # Show full sync status
            console.clear()
            console.print(create_status_display("Performing full sync...", "green"))

            # Perform a full sync
            await run_sync()
            last_sync_time = time.monotonic()

            # Show completion
            console.print(create_status_display("Full sync completed ✅", "green"))
        else:
            # Show checking status
            console.clear()
            console.print(create_status_display("Checking for changes...", "cyan"))
            try:
                # Authenticate and get access token for mail check
                scopes = ["https://graph.microsoft.com/Mail.Read"]
                _, headers = get_access_token(scopes)
                remote_message_count = await get_inbox_count_async(headers)

                # Same Maildir location as _sync_outlook_data, so MAILDIR_PATH
                # is honoured here as well.
                maildir_path = (
                    os.getenv("MAILDIR_PATH", os.path.expanduser("~/Mail")) + f"/{org}"
                )

                # Count local messages
                local_message_count = count_local_messages(maildir_path)
                mail_changes = remote_message_count != local_message_count

                # Check for calendar changes if two-way sync is enabled
                calendar_changes = False
                calendar_change_desc = ""
                if two_way_calendar and vdir:
                    calendar_changes, calendar_change_desc = check_calendar_changes(
                        vdir, org
                    )

                # Determine what changed and show appropriate status
                if mail_changes and calendar_changes:
                    console.print(
                        create_status_display(
                            f"Changes detected! Mail: Remote {remote_message_count}, Local {local_message_count} | Calendar: {calendar_change_desc}. Starting sync...",
                            "yellow",
                        )
                    )
                elif mail_changes:
                    console.print(
                        create_status_display(
                            f"New messages detected! Remote: {remote_message_count}, Local: {local_message_count}. Starting sync...",
                            "yellow",
                        )
                    )
                elif calendar_changes:
                    console.print(
                        create_status_display(
                            f"Calendar changes detected! {calendar_change_desc}. Starting sync...",
                            "yellow",
                        )
                    )

                # Sync if any changes detected
                if mail_changes or calendar_changes:
                    await run_sync()
                    last_sync_time = time.monotonic()
                    console.print(create_status_display("Sync completed ✅", "green"))
                else:
                    status_parts = [
                        f"Mail: Remote {remote_message_count}, Local {local_message_count}"
                    ]
                    if two_way_calendar:
                        status_parts.append(f"Calendar: {calendar_change_desc}")
                    console.print(
                        create_status_display(
                            f"No changes detected ({', '.join(status_parts)})",
                            "green",
                        )
                    )
            except Exception as e:
                # Keep the daemon alive: report the failed check and retry on
                # the next iteration.
                console.print(
                    create_status_display(f"Error during check: {str(e)}", "red")
                )

        # Yield to the event loop while waiting; a blocking time.sleep() here
        # would stall every other task scheduled on the loop.
        await asyncio.sleep(check_interval)