From 1f306fffd742f2011ea6d11b56b7ab46f27e7b65 Mon Sep 17 00:00:00 2001 From: Tim Bendt Date: Tue, 15 Jul 2025 23:39:53 -0400 Subject: [PATCH] add vdir sync feature --- README.md | 13 + benchmark_list_update.py | 315 ----------------- drive_view_tui.py | 10 + src/cli/sync.py | 166 ++++++--- src/services/microsoft_graph/auth.py | 55 ++- src/services/microsoft_graph/calendar.py | 413 +++++++++++++++++++++- src/services/microsoft_graph/client.py | 122 ++++++- src/services/microsoft_graph/mail.py | 414 +++++++++++++++++++---- src/utils/mail_utils/maildir.py | 225 ++++++++---- 9 files changed, 1212 insertions(+), 521 deletions(-) delete mode 100755 benchmark_list_update.py diff --git a/README.md b/README.md index e69de29..6c92beb 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,13 @@ +# LŪK + +> Pronunced Look... + +A collection of tools for syncing microsoft Mail and Calendars with local file-system based PIM standards like Maildir, VDIR, etc so you can use local TUI and CLI tools to read write and manage your mail and events. + +## Features + +- Download Emails to a local maildir +- Download Events to a local VDIR +- two-way sync of locally changed files +- View OneDrive Folders and Files in your terminal +- a couple different ways to view email messages locally, but you should probably be using [aerc] diff --git a/benchmark_list_update.py b/benchmark_list_update.py deleted file mode 100755 index b9d5e14..0000000 --- a/benchmark_list_update.py +++ /dev/null @@ -1,315 +0,0 @@ -#!/usr/bin/env python3 -""" -Benchmark script to compare two approaches for updating envelopes list in maildir_gtd. -This script compares: -1. Using .pop() to remove items from ListView -2. Using refresh_list_view() to rebuild the entire ListView - -It tests with different numbers of envelopes (100, 1000, 2000) and measures: -- Time to remove a single item -- Time to remove multiple items in sequence -- Memory usage -""" - -import sys -import os -import time -import random -import gc -import tracemalloc -from datetime import datetime, timedelta, UTC -from typing import List, Dict, Any, Callable, Tuple -import json - -# Add parent directory to path so we can import modules correctly -sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - -# Import required classes and functions -from textual.widgets import ListView, ListItem, Label -from textual.app import App, ComposeResult -from textual.containers import Vertical - -# Import our application's modules -from maildir_gtd.app import MessageStore -from maildir_gtd.utils import group_envelopes_by_date - -# Mock class to simulate the ListView behavior -class MockListView: - def __init__(self): - self.items = [] - self.index = 0 - - def append(self, item): - self.items.append(item) - - def pop(self, idx=None): - if idx is None: - return self.items.pop() - return self.items.pop(idx) - - def clear(self): - self.items = [] - - def __len__(self): - return len(self.items) - -# Helper functions to generate test data -def generate_envelope(idx: int) -> Dict[str, Any]: - """Generate a synthetic envelope with predictable data.""" - now = datetime.now(UTC) - # Distribute dates over the last 60 days to create realistic grouping - date = now - timedelta(days=random.randint(0, 60), - hours=random.randint(0, 23), - minutes=random.randint(0, 59)) - - return { - "id": str(idx), - "subject": f"Test Subject {idx}", - "from": {"addr": f"sender{idx}@example.com"}, - "to": {"addr": f"recipient{idx}@example.com"}, - "date": date.strftime("%Y-%m-%d %H:%M"), - "cc": {}, - 
"type": "message" - } - -def generate_test_envelopes(count: int) -> List[Dict[str, Any]]: - """Generate a specified number of test envelopes.""" - return [generate_envelope(i) for i in range(1, count + 1)] - -# Benchmark functions -def benchmark_pop_approach(store: MessageStore, list_view: MockListView, indices_to_remove: List[int]) -> float: - """Benchmark the .pop() approach.""" - start_time = time.time() - - for idx in sorted(indices_to_remove, reverse=True): # Remove from highest to lowest to avoid index shifting issues - msg_id = int(store.envelopes[idx]["id"]) - store.remove(msg_id) - list_view.pop(idx) - - end_time = time.time() - return end_time - start_time - -def benchmark_refresh_approach(store: MessageStore, list_view: MockListView, indices_to_remove: List[int]) -> float: - """Benchmark the refresh_list_view approach.""" - start_time = time.time() - - for idx in indices_to_remove: - msg_id = int(store.envelopes[idx]["id"]) - store.remove(msg_id) - - # Simulate refresh_list_view by clearing and rebuilding the list - list_view.clear() - for item in store.envelopes: - if item and item.get("type") == "header": - list_view.append(f"Header: {item['label']}") - elif item: # Check if not None - list_view.append(f"Email: {item.get('subject', '')}") - - end_time = time.time() - return end_time - start_time - -def run_memory_benchmark(func, *args): - """Run a function with memory tracking.""" - tracemalloc.start() - result = func(*args) - current, peak = tracemalloc.get_traced_memory() - tracemalloc.stop() - return result, current, peak - -def run_benchmark(envelope_count: int, num_operations: int = 10): - """Run benchmarks for a specific number of envelopes.""" - print(f"\n{'=' * 50}") - print(f"Running benchmark with {envelope_count} envelopes") - print(f"{'=' * 50}") - - # Generate test data - envelopes = generate_test_envelopes(envelope_count) - - # Set up for pop approach - pop_store = MessageStore() - pop_store.load(envelopes.copy()) - pop_list_view = MockListView() - - # Build initial list view - for item in pop_store.envelopes: - if item and item.get("type") == "header": - pop_list_view.append(f"Header: {item['label']}") - elif item: - pop_list_view.append(f"Email: {item.get('subject', '')}") - - # Set up for refresh approach - refresh_store = MessageStore() - refresh_store.load(envelopes.copy()) - refresh_list_view = MockListView() - - # Build initial list view - for item in refresh_store.envelopes: - if item and item.get("type") == "header": - refresh_list_view.append(f"Header: {item['label']}") - elif item: - refresh_list_view.append(f"Email: {item.get('subject', '')}") - - # Generate random indices to remove (ensure they're valid message indices, not headers) - valid_indices = [] - for idx, item in enumerate(pop_store.envelopes): - if item and item.get("type") != "header" and item is not None: - valid_indices.append(idx) - - if len(valid_indices) < num_operations: - num_operations = len(valid_indices) - print(f"Warning: Only {num_operations} valid messages available for removal") - - indices_to_remove = random.sample(valid_indices, num_operations) - - # Single operation benchmark - print("\n🔹 Single operation benchmark (removing 1 item):") - - # Pop approach - single operation - gc.collect() # Ensure clean state - single_pop_time, pop_current, pop_peak = run_memory_benchmark( - benchmark_pop_approach, pop_store, pop_list_view, [indices_to_remove[0]] - ) - print(f" Pop approach: {single_pop_time*1000:.2f} ms (Memory - Current: {pop_current/1024:.1f} KB, Peak: 
{pop_peak/1024:.1f} KB)") - - # Refresh approach - single operation - gc.collect() # Ensure clean state - single_refresh_time, refresh_current, refresh_peak = run_memory_benchmark( - benchmark_refresh_approach, refresh_store, refresh_list_view, [indices_to_remove[0]] - ) - print(f" Refresh approach: {single_refresh_time*1000:.2f} ms (Memory - Current: {refresh_current/1024:.1f} KB, Peak: {refresh_peak/1024:.1f} KB)") - - # Determine which is better for single operation - if single_pop_time < single_refresh_time: - print(f" 🥇 Pop is {single_refresh_time/single_pop_time:.1f}x faster for single operation") - else: - print(f" 🥇 Refresh is {single_pop_time/single_refresh_time:.1f}x faster for single operation") - - # Reset for multi-operation benchmark - gc.collect() - pop_store = MessageStore() - pop_store.load(envelopes.copy()) - pop_list_view = MockListView() - for item in pop_store.envelopes: - if item and item.get("type") == "header": - pop_list_view.append(f"Header: {item['label']}") - elif item: - pop_list_view.append(f"Email: {item.get('subject', '')}") - - refresh_store = MessageStore() - refresh_store.load(envelopes.copy()) - refresh_list_view = MockListView() - for item in refresh_store.envelopes: - if item and item.get("type") == "header": - refresh_list_view.append(f"Header: {item['label']}") - elif item: - refresh_list_view.append(f"Email: {item.get('subject', '')}") - - # Multiple operations benchmark - print(f"\n🔹 Multiple operations benchmark (removing {num_operations} items):") - - # Pop approach - multiple operations - gc.collect() - multi_pop_time, pop_current, pop_peak = run_memory_benchmark( - benchmark_pop_approach, pop_store, pop_list_view, indices_to_remove - ) - print(f" Pop approach: {multi_pop_time*1000:.2f} ms (Memory - Current: {pop_current/1024:.1f} KB, Peak: {pop_peak/1024:.1f} KB)") - - # Refresh approach - multiple operations - gc.collect() - multi_refresh_time, refresh_current, refresh_peak = run_memory_benchmark( - benchmark_refresh_approach, refresh_store, refresh_list_view, indices_to_remove - ) - print(f" Refresh approach: {multi_refresh_time*1000:.2f} ms (Memory - Current: {refresh_current/1024:.1f} KB, Peak: {refresh_peak/1024:.1f} KB)") - - # Determine which is better for multiple operations - if multi_pop_time < multi_refresh_time: - print(f" 🥇 Pop is {multi_refresh_time/multi_pop_time:.1f}x faster for multiple operations") - else: - print(f" 🥇 Refresh is {multi_pop_time/multi_refresh_time:.1f}x faster for multiple operations") - - return { - "envelope_count": envelope_count, - "num_operations": num_operations, - "single_operation": { - "pop_time_ms": single_pop_time * 1000, - "refresh_time_ms": single_refresh_time * 1000, - "pop_memory_kb": pop_peak / 1024, - "refresh_memory_kb": refresh_peak / 1024 - }, - "multiple_operations": { - "pop_time_ms": multi_pop_time * 1000, - "refresh_time_ms": multi_refresh_time * 1000, - "pop_memory_kb": pop_peak / 1024, - "refresh_memory_kb": refresh_peak / 1024 - } - } - -def main(): - print("\n📊 MAILDIR GTD LIST UPDATE BENCHMARK 📊") - print("Comparing .pop() vs refresh_list_view() approaches") - print("=" * 60) - - # Define test cases - envelope_counts = [100, 1000, 2000] - results = [] - - for count in envelope_counts: - result = run_benchmark(count) - results.append(result) - - # Print summary - print("\n" + "=" * 60) - print("📊 BENCHMARK SUMMARY") - print("=" * 60) - - # Console table formatting - print(f"{'Size':<10} | {'Single Op (pop)':<15} | {'Single Op (refresh)':<20} | {'Multi Op (pop)':<15} | {'Multi Op 
(refresh)':<20}") - print("-" * 90) - - for result in results: - count = result["envelope_count"] - single_pop = f"{result['single_operation']['pop_time_ms']:.2f} ms" - single_refresh = f"{result['single_operation']['refresh_time_ms']:.2f} ms" - multi_pop = f"{result['multiple_operations']['pop_time_ms']:.2f} ms" - multi_refresh = f"{result['multiple_operations']['refresh_time_ms']:.2f} ms" - - print(f"{count:<10} | {single_pop:<15} | {single_refresh:<20} | {multi_pop:<15} | {multi_refresh:<20}") - - # Display conclusions - print("\n🔍 CONCLUSIONS:") - for result in results: - count = result["envelope_count"] - single_ratio = result['single_operation']['refresh_time_ms'] / result['single_operation']['pop_time_ms'] - multi_ratio = result['multiple_operations']['refresh_time_ms'] / result['multiple_operations']['pop_time_ms'] - - print(f"\nFor {count} envelopes:") - - if single_ratio > 1: - print(f"- Single operation: .pop() is {single_ratio:.1f}x faster") - else: - print(f"- Single operation: refresh_list_view() is {1/single_ratio:.1f}x faster") - - if multi_ratio > 1: - print(f"- Multiple operations: .pop() is {multi_ratio:.1f}x faster") - else: - print(f"- Multiple operations: refresh_list_view() is {1/multi_ratio:.1f}x faster") - - print("\n🔑 RECOMMENDATION:") - # Calculate average performance difference across all tests - avg_single_ratio = sum(r['single_operation']['refresh_time_ms'] / r['single_operation']['pop_time_ms'] for r in results) / len(results) - avg_multi_ratio = sum(r['multiple_operations']['refresh_time_ms'] / r['multiple_operations']['pop_time_ms'] for r in results) / len(results) - - if avg_single_ratio > 1 and avg_multi_ratio > 1: - print("The .pop() approach is generally faster, but consider the following:") - print("- .pop() risks index misalignment issues with the message_store") - print("- refresh_list_view() ensures UI and data structure stay synchronized") - print("- The performance difference may not be noticeable to users") - print("👉 Recommendation: Use refresh_list_view() for reliability unless performance becomes a real issue") - else: - print("The refresh_list_view() approach is not only safer but also performs competitively:") - print("- It ensures perfect synchronization between UI and data model") - print("- It eliminates the risk of index misalignment") - print("👉 Recommendation: Use refresh_list_view() approach as it's more reliable and performs well") - -if __name__ == "__main__": - main() diff --git a/drive_view_tui.py b/drive_view_tui.py index fb56e53..f24fe68 100644 --- a/drive_view_tui.py +++ b/drive_view_tui.py @@ -1,11 +1,21 @@ import os import sys +import logging from datetime import datetime import msal import aiohttp +# Suppress debug logging from authentication and HTTP libraries +logging.getLogger("msal").setLevel(logging.ERROR) +logging.getLogger("urllib3").setLevel(logging.ERROR) +logging.getLogger("requests").setLevel(logging.ERROR) +logging.getLogger("requests_oauthlib").setLevel(logging.ERROR) +logging.getLogger("aiohttp").setLevel(logging.ERROR) +logging.getLogger("aiohttp.access").setLevel(logging.ERROR) +logging.getLogger("asyncio").setLevel(logging.ERROR) + from textual.app import App, ComposeResult from textual.binding import Binding diff --git a/src/cli/sync.py b/src/cli/sync.py index ca0b630..e139479 100644 --- a/src/cli/sync.py +++ b/src/cli/sync.py @@ -7,7 +7,10 @@ from datetime import datetime, timedelta from src.utils.mail_utils.helpers import ensure_directory_exists from src.utils.calendar_utils import save_events_to_vdir, 
save_events_to_file -from src.services.microsoft_graph.calendar import fetch_calendar_events +from src.services.microsoft_graph.calendar import ( + fetch_calendar_events, + sync_local_calendar_changes, +) from src.services.microsoft_graph.mail import ( fetch_mail_async, archive_mail_async, @@ -214,6 +217,7 @@ async def _sync_outlook_data( days_forward, continue_iteration, download_attachments, + two_way_calendar, ): """Synchronize data from external sources.""" @@ -243,14 +247,30 @@ async def _sync_outlook_data( with progress: task_fetch = progress.add_task("[green]Syncing Inbox...", total=0) task_calendar = progress.add_task("[cyan]Fetching calendar...", total=0) + task_local_calendar = progress.add_task( + "[magenta]Syncing local calendar...", total=0 + ) task_read = progress.add_task("[blue]Marking as read...", total=0) task_archive = progress.add_task("[yellow]Archiving mail...", total=0) task_delete = progress.add_task("[red]Deleting mail...", total=0) - # Stage 1: Synchronize local changes (read, archive, delete) to the server + # Stage 1: Synchronize local changes (read, archive, delete, calendar) to the server progress.console.print( "[bold cyan]Step 1: Syncing local changes to server...[/bold cyan]" ) + + # Handle calendar sync first (if vdir is specified and two-way sync is enabled) + calendar_sync_results = (0, 0) + if vdir and two_way_calendar: + org_vdir_path = os.path.join(os.path.expanduser(vdir), org) + progress.console.print( + f"[magenta]Checking for local calendar changes in {org_vdir_path}...[/magenta]" + ) + calendar_sync_results = await sync_local_calendar_changes( + headers, org_vdir_path, progress, task_local_calendar, dry_run + ) + + # Handle mail changes in parallel await asyncio.gather( synchronize_maildir_async( maildir_path, headers, progress, task_read, dry_run @@ -260,6 +280,17 @@ async def _sync_outlook_data( ) progress.console.print("[bold green]Step 1: Local changes synced.[/bold green]") + # Report calendar sync results + created, deleted = calendar_sync_results + if two_way_calendar and (created > 0 or deleted > 0): + progress.console.print( + f"[magenta]📅 Two-way calendar sync: {created} events created, {deleted} events deleted[/magenta]" + ) + elif two_way_calendar: + progress.console.print( + "[magenta]📅 Two-way calendar sync: No local changes detected[/magenta]" + ) + # Stage 2: Fetch new data from the server progress.console.print( "\n[bold cyan]Step 2: Fetching new data from server...[/bold cyan]" @@ -335,6 +366,12 @@ async def _sync_outlook_data( help="Download email attachments", default=False, ) +@click.option( + "--two-way-calendar", + is_flag=True, + help="Enable two-way calendar sync (sync local changes to server)", + default=False, +) @click.option( "--daemon", is_flag=True, @@ -350,6 +387,7 @@ def sync( days_forward, continue_iteration, download_attachments, + two_way_calendar, daemon, ): if daemon: @@ -363,6 +401,7 @@ def sync( days_forward, continue_iteration, download_attachments, + two_way_calendar, ) ) else: @@ -376,6 +415,7 @@ def sync( days_forward, continue_iteration, download_attachments, + two_way_calendar, ) ) @@ -389,20 +429,44 @@ async def daemon_mode( days_forward, continue_iteration, download_attachments, + two_way_calendar, ): """ Run the script in daemon mode, periodically syncing emails. 
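+
+    As sketched in the loop below, a full sync runs roughly every sync_interval
+    seconds (300 by default) and, in between, a lightweight check every
+    check_interval seconds (10) compares the remote inbox count against the
+    local maildir; a mismatch triggers an early full sync.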
""" from src.services.microsoft_graph.mail import get_inbox_count_async + from rich.console import Console + from rich.live import Live + from rich.panel import Panel + from rich.text import Text + from datetime import datetime import time + console = Console() sync_interval = 300 # 5 minutes check_interval = 10 # 10 seconds last_sync_time = time.time() - sync_interval # Force initial sync + def create_status_display(status_text, status_color="cyan"): + """Create a status panel for daemon mode.""" + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + content = Text() + content.append(f"[{timestamp}] ", style="dim") + content.append(status_text, style=status_color) + + return Panel( + content, title="📧 Email Sync Daemon", border_style="blue", padding=(0, 1) + ) + + # Initial display + console.print(create_status_display("Starting daemon mode...", "green")) + while True: if time.time() - last_sync_time >= sync_interval: - click.echo("[green]Performing full sync...[/green]") + # Show full sync status + console.clear() + console.print(create_status_display("Performing full sync...", "green")) + # Perform a full sync await _sync_outlook_data( dry_run, @@ -413,45 +477,69 @@ async def daemon_mode( days_forward, continue_iteration, download_attachments, + two_way_calendar, ) last_sync_time = time.time() + + # Show completion + console.print(create_status_display("Full sync completed ✅", "green")) else: - # Perform a quick check - click.echo("[cyan]Checking for new messages...[/cyan]") - # Authenticate and get access token - scopes = ["https://graph.microsoft.com/Mail.Read"] - access_token, headers = get_access_token(scopes) - remote_message_count = await get_inbox_count_async(headers) - maildir_path = os.path.expanduser(f"~/Mail/{org}") - local_message_count = len( - [ - f - for f in os.listdir(os.path.join(maildir_path, "new")) - if ".eml" in f - ] - ) + len( - [ - f - for f in os.listdir(os.path.join(maildir_path, "cur")) - if ".eml" in f - ] - ) - if remote_message_count != local_message_count: - click.echo( - f"[yellow]New messages detected ({remote_message_count} / {local_message_count}), performing full sync...[/yellow]" + # Show checking status + console.clear() + console.print(create_status_display("Checking for new messages...", "cyan")) + + try: + # Authenticate and get access token + scopes = ["https://graph.microsoft.com/Mail.Read"] + access_token, headers = get_access_token(scopes) + remote_message_count = await get_inbox_count_async(headers) + maildir_path = os.path.expanduser(f"~/Mail/{org}") + + # Count local messages + new_dir = os.path.join(maildir_path, "new") + cur_dir = os.path.join(maildir_path, "cur") + local_message_count = 0 + + if os.path.exists(new_dir): + local_message_count += len( + [f for f in os.listdir(new_dir) if ".eml" in f] + ) + if os.path.exists(cur_dir): + local_message_count += len( + [f for f in os.listdir(cur_dir) if ".eml" in f] + ) + + if remote_message_count != local_message_count: + console.print( + create_status_display( + f"New messages detected! Remote: {remote_message_count}, Local: {local_message_count}. 
Starting sync...", + "yellow", + ) + ) + + await _sync_outlook_data( + dry_run, + vdir, + icsfile, + org, + days_back, + days_forward, + continue_iteration, + download_attachments, + two_way_calendar, + ) + last_sync_time = time.time() + console.print(create_status_display("Sync completed ✅", "green")) + else: + console.print( + create_status_display( + f"No new messages (Remote: {remote_message_count}, Local: {local_message_count})", + "green", + ) + ) + except Exception as e: + console.print( + create_status_display(f"Error during check: {str(e)}", "red") ) - await _sync_outlook_data( - dry_run, - vdir, - icsfile, - org, - days_back, - days_forward, - continue_iteration, - download_attachments, - ) - last_sync_time = time.time() - else: - click.echo("[green]No new messages detected.[/green]") time.sleep(check_interval) diff --git a/src/services/microsoft_graph/auth.py b/src/services/microsoft_graph/auth.py index 5afc8e3..98bee93 100644 --- a/src/services/microsoft_graph/auth.py +++ b/src/services/microsoft_graph/auth.py @@ -1,15 +1,30 @@ """ Authentication module for Microsoft Graph API. """ + import os import msal +import logging from rich import print from rich.panel import Panel +# Comprehensive logging suppression for authentication-related libraries +logging.getLogger("msal").setLevel(logging.ERROR) +logging.getLogger("urllib3").setLevel(logging.ERROR) +logging.getLogger("requests").setLevel(logging.ERROR) +logging.getLogger("requests_oauthlib").setLevel(logging.ERROR) +logging.getLogger("aiohttp").setLevel(logging.ERROR) +logging.getLogger("aiohttp.access").setLevel(logging.ERROR) +logging.getLogger("asyncio").setLevel(logging.ERROR) +logging.getLogger("azure").setLevel(logging.ERROR) +logging.getLogger("azure.core").setLevel(logging.ERROR) + + def ensure_directory_exists(path): if not os.path.exists(path): os.makedirs(path) + def get_access_token(scopes): """ Authenticate with Microsoft Graph API and obtain an access token. @@ -26,43 +41,57 @@ def get_access_token(scopes): Exception: If authentication fails. """ # Read Azure app credentials from environment variables - client_id = os.getenv('AZURE_CLIENT_ID') - tenant_id = os.getenv('AZURE_TENANT_ID') + client_id = os.getenv("AZURE_CLIENT_ID") + tenant_id = os.getenv("AZURE_TENANT_ID") if not client_id or not tenant_id: - raise ValueError("Please set the AZURE_CLIENT_ID and AZURE_TENANT_ID environment variables.") + raise ValueError( + "Please set the AZURE_CLIENT_ID and AZURE_TENANT_ID environment variables." 
+ ) # Token cache cache = msal.SerializableTokenCache() - cache_file = 'token_cache.bin' + cache_file = "token_cache.bin" if os.path.exists(cache_file): - cache.deserialize(open(cache_file, 'r').read()) + cache.deserialize(open(cache_file, "r").read()) # Authentication - authority = f'https://login.microsoftonline.com/{tenant_id}' - app = msal.PublicClientApplication(client_id, authority=authority, token_cache=cache) + authority = f"https://login.microsoftonline.com/{tenant_id}" + app = msal.PublicClientApplication( + client_id, authority=authority, token_cache=cache + ) accounts = app.get_accounts() if accounts: token_response = app.acquire_token_silent(scopes, account=accounts[0]) else: flow = app.initiate_device_flow(scopes=scopes) - if 'user_code' not in flow: + if "user_code" not in flow: raise Exception("Failed to create device flow") - print(Panel(flow['message'], border_style="magenta", padding=2, title="MSAL Login Flow Link")) + print( + Panel( + flow["message"], + border_style="magenta", + padding=2, + title="MSAL Login Flow Link", + ) + ) token_response = app.acquire_token_by_device_flow(flow) - if 'access_token' not in token_response: + if "access_token" not in token_response: raise Exception("Failed to acquire token") # Save token cache - with open(cache_file, 'w') as f: + with open(cache_file, "w") as f: f.write(cache.serialize()) - access_token = token_response['access_token'] - headers = {'Authorization': f'Bearer {access_token}', 'Prefer': 'outlook.body-content-type="text",IdType="ImmutableId"'} + access_token = token_response["access_token"] + headers = { + "Authorization": f"Bearer {access_token}", + "Prefer": 'outlook.body-content-type="text",IdType="ImmutableId"', + } return access_token, headers diff --git a/src/services/microsoft_graph/calendar.py b/src/services/microsoft_graph/calendar.py index 0925daa..1ff481b 100644 --- a/src/services/microsoft_graph/calendar.py +++ b/src/services/microsoft_graph/calendar.py @@ -3,9 +3,13 @@ Calendar operations for Microsoft Graph API. """ import os +import json +import re +import glob from datetime import datetime, timedelta +from dateutil import parser -from .client import fetch_with_aiohttp +from .client import fetch_with_aiohttp, post_with_aiohttp, delete_with_aiohttp async def fetch_calendar_events( @@ -40,7 +44,7 @@ async def fetch_calendar_events( calendar_url = ( f"https://graph.microsoft.com/v1.0/me/calendarView?" f"startDateTime={start_date_str}&endDateTime={end_date_str}&" - f"$select=id,subject,organizer,start,end,location,isAllDay,showAs,sensitivity&$count=true" + f"$select=id,subject,organizer,start,end,location,isAllDay,showAs,sensitivity,iCalUId,lastModifiedDateTime&$count=true" ) events = [] @@ -59,3 +63,408 @@ async def fetch_calendar_events( # Return events and total count total_count = response_data.get("@odata.count", len(events)) return events, total_count + + +def parse_ical_file(file_path): + """ + Parse a single iCalendar file and extract event data. 
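+
+    Only a handful of properties are extracted (UID, SUMMARY, DTSTART, DTEND,
+    LOCATION, DESCRIPTION). A minimal event this parser understands looks like
+    the following (values are illustrative only):
+
+        BEGIN:VCALENDAR
+        BEGIN:VEVENT
+        UID:abc123@example.com
+        SUMMARY:Team standup
+        DTSTART:20250716T140000Z
+        DTEND:20250716T143000Z
+        LOCATION:Room 4
+        END:VEVENT
+        END:VCALENDAR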
+ + Args: + file_path (str): Path to the .ics file + + Returns: + dict: Event data or None if parsing fails + """ + try: + with open(file_path, "r", encoding="utf-8") as f: + content = f.read() + + event_data = {} + in_event = False + + for line in content.split("\n"): + line = line.strip() + + if line == "BEGIN:VEVENT": + in_event = True + continue + elif line == "END:VEVENT": + break + elif not in_event: + continue + + if ":" in line: + key, value = line.split(":", 1) + + # Handle special cases + if key == "UID": + event_data["uid"] = value + elif key == "SUMMARY": + event_data["subject"] = ( + value.replace("\\,", ",") + .replace("\\;", ";") + .replace("\\n", "\n") + ) + elif key.startswith("DTSTART"): + event_data["start"] = _parse_ical_datetime(key, value) + elif key.startswith("DTEND"): + event_data["end"] = _parse_ical_datetime(key, value) + elif key == "LOCATION": + event_data["location"] = value.replace("\\,", ",").replace( + "\\;", ";" + ) + elif key == "DESCRIPTION": + event_data["description"] = ( + value.replace("\\,", ",") + .replace("\\;", ";") + .replace("\\n", "\n") + ) + + # Get file modification time for tracking local changes + event_data["local_mtime"] = os.path.getmtime(file_path) + event_data["local_file"] = file_path + + return event_data if "uid" in event_data else None + + except Exception as e: + print(f"Error parsing {file_path}: {e}") + return None + + +def _parse_ical_datetime(key, value): + """Parse iCalendar datetime format.""" + try: + if "TZID=" in key: + # Extract timezone info if present + tz_part = ( + key.split("TZID=")[1].split(":")[0] + if ":" in key + else key.split("TZID=")[1] + ) + # For now, treat as naive datetime and let dateutil handle it + return parser.parse(value.replace("Z", "")) + elif value.endswith("Z"): + # UTC time + return parser.parse(value) + else: + # Naive datetime + return parser.parse(value.replace("Z", "")) + except Exception: + return None + + +def get_local_calendar_events(vdir_path): + """ + Get all local calendar events from vdir format. + + Args: + vdir_path (str): Path to vdir calendar directory + + Returns: + dict: Dictionary mapping UIDs to event data + """ + local_events = {} + + if not os.path.exists(vdir_path): + return local_events + + ics_files = glob.glob(os.path.join(vdir_path, "*.ics")) + + for file_path in ics_files: + event_data = parse_ical_file(file_path) + if event_data and "uid" in event_data: + local_events[event_data["uid"]] = event_data + + return local_events + + +async def create_calendar_event(headers, event_data): + """ + Create a new calendar event on Microsoft Graph. 
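+
+    The local event data is translated into a minimal Microsoft Graph event
+    payload and POSTed to /me/events. The payload built below has roughly this
+    shape (values are illustrative):
+
+        {
+            "subject": "Team standup",
+            "start": {"dateTime": "2025-07-16T14:00:00", "timeZone": "UTC"},
+            "end": {"dateTime": "2025-07-16T14:30:00", "timeZone": "UTC"},
+            "location": {"displayName": "Room 4"},
+            "body": {"contentType": "text", "content": "Daily sync"},
+        }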
+ + Args: + headers (dict): Authentication headers + event_data (dict): Event data from local file + + Returns: + dict: Created event response or None if failed + """ + try: + # Convert local event data to Microsoft Graph format + graph_event = { + "subject": event_data.get("subject", "Untitled Event"), + "start": {"dateTime": event_data["start"].isoformat(), "timeZone": "UTC"}, + "end": {"dateTime": event_data["end"].isoformat(), "timeZone": "UTC"}, + } + + if event_data.get("location"): + graph_event["location"] = {"displayName": event_data["location"]} + + if event_data.get("description"): + graph_event["body"] = { + "contentType": "text", + "content": event_data["description"], + } + + # Create the event + create_url = "https://graph.microsoft.com/v1.0/me/events" + status = await post_with_aiohttp(create_url, headers, graph_event) + + if status == 201: + return graph_event + else: + print(f"Failed to create event: HTTP {status}") + return None + + except Exception as e: + print(f"Error creating event: {e}") + return None + + +async def delete_calendar_event_by_uid(headers, ical_uid): + """ + Delete a calendar event by its iCalUId. + + Args: + headers (dict): Authentication headers + ical_uid (str): The iCalUId of the event to delete + + Returns: + bool: True if deleted successfully, False otherwise + """ + try: + # First, find the event by iCalUId + search_url = f"https://graph.microsoft.com/v1.0/me/events?$filter=iCalUId eq '{ical_uid}'" + response = await fetch_with_aiohttp(search_url, headers) + + events = response.get("value", []) + if not events: + print(f"Event with UID {ical_uid} not found on server") + return False + + # Delete the event using its Graph ID + event_id = events[0]["id"] + delete_url = f"https://graph.microsoft.com/v1.0/me/events/{event_id}" + status = await delete_with_aiohttp(delete_url, headers) + + if status == 204: + print(f"Successfully deleted event with UID {ical_uid}") + return True + else: + print(f"Failed to delete event: HTTP {status}") + return False + + except Exception as e: + print(f"Error deleting event: {e}") + return False + + +def get_sync_timestamp_file(vdir_path): + """Get the path to the sync timestamp file.""" + return os.path.join(vdir_path, ".sync_timestamp") + + +def get_last_sync_time(vdir_path): + """ + Get the timestamp of the last sync. + + Args: + vdir_path (str): Path to vdir calendar directory + + Returns: + float: Unix timestamp of last sync, or 0 if never synced + """ + timestamp_file = get_sync_timestamp_file(vdir_path) + if os.path.exists(timestamp_file): + try: + with open(timestamp_file, "r") as f: + return float(f.read().strip()) + except (ValueError, IOError): + return 0 + return 0 + + +def update_sync_timestamp(vdir_path): + """ + Update the sync timestamp to current time. + + Args: + vdir_path (str): Path to vdir calendar directory + """ + timestamp_file = get_sync_timestamp_file(vdir_path) + try: + with open(timestamp_file, "w") as f: + f.write(str(datetime.now().timestamp())) + except IOError as e: + print(f"Warning: Could not update sync timestamp: {e}") + + +def detect_deleted_events(vdir_path): + """ + Detect events that have been deleted from vdir since last sync. + Uses sync state and file modification times to determine deletions. 
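+
+    The previous sync state is read from .sync_state.json in the vdir, a mapping
+    of event UIDs to the file mtimes recorded at the last sync, e.g.
+    (illustrative values):
+
+        {
+            "abc123@example.com": 1752629993.12,
+            "def456@example.com": 1752630001.87
+        }
+
+    A UID present in that state but absent from the current .ics files is
+    treated as deleted, provided the vdir directory itself was modified after
+    the last recorded sync time.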
+ + Args: + vdir_path (str): Path to vdir calendar directory + + Returns: + list: List of UIDs that were deleted locally + """ + if not os.path.exists(vdir_path): + return [] + + state_file = os.path.join(vdir_path, ".sync_state.json") + last_sync_time = get_last_sync_time(vdir_path) + + # Load previous sync state + previous_state = {} + if os.path.exists(state_file): + try: + with open(state_file, "r") as f: + previous_state = json.load(f) + except Exception: + return [] + + if not previous_state: + return [] # No previous state to compare against + + # Get current local events + current_local_events = get_local_calendar_events(vdir_path) + + deleted_events = [] + + # Check each event from previous state + for uid in previous_state: + if uid not in current_local_events: + # Event is no longer in local files + # Check if the vdir has been modified since last sync + # This ensures we only delete events that were intentionally removed + vdir_mtime = os.path.getmtime(vdir_path) + if vdir_mtime > last_sync_time: + deleted_events.append(uid) + + return deleted_events + + +async def sync_local_calendar_changes( + headers, vdir_path, progress, task_id, dry_run=False +): + """ + Sync local calendar changes (new events and deletions) to Microsoft Graph. + + Args: + headers (dict): Authentication headers + vdir_path (str): Path to local vdir calendar directory + progress: Progress instance for updates + task_id: Progress task ID + dry_run (bool): If True, only report what would be done + + Returns: + tuple: (created_count, deleted_count) + """ + if not os.path.exists(vdir_path): + progress.console.print( + f"[yellow]Local calendar directory not found: {vdir_path}[/yellow]" + ) + return 0, 0 + + # Track state file for knowing what was previously synced + state_file = os.path.join(vdir_path, ".sync_state.json") + + # Load previous sync state + previous_state = {} + if os.path.exists(state_file): + try: + with open(state_file, "r") as f: + previous_state = json.load(f) + except Exception as e: + progress.console.print(f"[yellow]Could not load sync state: {e}[/yellow]") + + # Detect deleted events using enhanced detection + deleted_events = detect_deleted_events(vdir_path) + + # Get current local events + current_local_events = get_local_calendar_events(vdir_path) + + # Get current remote events to avoid duplicates + try: + remote_events, _ = await fetch_calendar_events( + headers, days_back=30, days_forward=90 + ) + remote_uids = { + event.get("iCalUId", event.get("id", "")) for event in remote_events + } + except Exception as e: + progress.console.print(f"[red]Error fetching remote events: {e}[/red]") + return 0, 0 + + created_count = 0 + deleted_count = 0 + + # Find new local events (not in previous state and not on server) + new_local_events = [] + for uid, event_data in current_local_events.items(): + if uid not in previous_state and uid not in remote_uids: + # This is a new local event + new_local_events.append((uid, event_data)) + + progress.update(task_id, total=len(new_local_events) + len(deleted_events)) + + # Handle deletions FIRST to clean up server before adding new events + for uid in deleted_events: + if dry_run: + progress.console.print(f"[DRY-RUN] Would delete event with UID: {uid}") + else: + result = await delete_calendar_event_by_uid(headers, uid) + if result: + deleted_count += 1 + progress.console.print(f"[green]Deleted event with UID: {uid}[/green]") + else: + progress.console.print( + f"[red]Failed to delete event with UID: {uid}[/red]" + ) + + progress.advance(task_id) + + # 
Create new events on server + for uid, event_data in new_local_events: + if dry_run: + progress.console.print( + f"[DRY-RUN] Would create event: {event_data.get('subject', 'Untitled')}" + ) + else: + result = await create_calendar_event(headers, event_data) + if result: + created_count += 1 + progress.console.print( + f"[green]Created event: {event_data.get('subject', 'Untitled')}[/green]" + ) + else: + progress.console.print( + f"[red]Failed to create event: {event_data.get('subject', 'Untitled')}[/red]" + ) + + progress.advance(task_id) + + # Update sync state and timestamp + if not dry_run: + new_state = { + uid: event_data.get("local_mtime", 0) + for uid, event_data in current_local_events.items() + } + try: + with open(state_file, "w") as f: + json.dump(new_state, f, indent=2) + + # Update sync timestamp to mark when this sync completed + update_sync_timestamp(vdir_path) + + except Exception as e: + progress.console.print(f"[yellow]Could not save sync state: {e}[/yellow]") + + if created_count > 0 or deleted_count > 0: + progress.console.print( + f"[cyan]Local calendar sync completed: {created_count} created, {deleted_count} deleted[/cyan]" + ) + + return created_count, deleted_count diff --git a/src/services/microsoft_graph/client.py b/src/services/microsoft_graph/client.py index ba35123..4431056 100644 --- a/src/services/microsoft_graph/client.py +++ b/src/services/microsoft_graph/client.py @@ -1,16 +1,41 @@ """ HTTP client for Microsoft Graph API. """ + import aiohttp import asyncio +import logging import orjson -# Define a global semaphore for throttling -semaphore = asyncio.Semaphore(4) +# Suppress debug logging from HTTP libraries +logging.getLogger("aiohttp").setLevel(logging.ERROR) +logging.getLogger("aiohttp.access").setLevel(logging.ERROR) +logging.getLogger("urllib3").setLevel(logging.ERROR) +logging.getLogger("asyncio").setLevel(logging.ERROR) + +# Define a global semaphore for throttling - reduced for better compliance +semaphore = asyncio.Semaphore(2) + + +async def _handle_throttling_retry(func, *args, max_retries=3): + """Handle 429 throttling with exponential backoff retry.""" + for attempt in range(max_retries): + try: + return await func(*args) + except Exception as e: + if "429" in str(e) and attempt < max_retries - 1: + wait_time = (2**attempt) + 1 # Exponential backoff: 2, 5, 9 seconds + print( + f"Rate limited, waiting {wait_time}s before retry {attempt + 1}/{max_retries}" + ) + await asyncio.sleep(wait_time) + continue + raise e + async def fetch_with_aiohttp(url, headers): """ - Fetch data from Microsoft Graph API. + Fetch data from Microsoft Graph API with throttling and retry logic. Args: url (str): The URL to fetch data from. @@ -20,23 +45,36 @@ async def fetch_with_aiohttp(url, headers): dict: JSON response data. Raises: - Exception: If the request fails. + Exception: If the request fails after retries. 
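+
+    Example (illustrative; any Graph GET endpoint works the same way):
+
+        data = await fetch_with_aiohttp(
+            "https://graph.microsoft.com/v1.0/me/mailFolders", headers
+        )
+        for folder in data.get("value", []):
+            print(folder["displayName"])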
""" + return await _handle_throttling_retry(_fetch_impl, url, headers) + + +async def _fetch_impl(url, headers): + """Internal fetch implementation.""" async with semaphore: async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers) as response: - if response.status != 200: - raise Exception(f"Failed to fetch {url}: {response.status} {await response.text()}") + if response.status == 429: + # Let the retry handler deal with throttling + raise Exception( + f"Failed to fetch {url}: {response.status} {await response.text()}" + ) + elif response.status != 200: + raise Exception( + f"Failed to fetch {url}: {response.status} {await response.text()}" + ) raw_bytes = await response.read() - content_length = response.headers.get('Content-Length') + content_length = response.headers.get("Content-Length") if content_length and len(raw_bytes) != int(content_length): print("Warning: Incomplete response received!") return None return orjson.loads(raw_bytes) + async def post_with_aiohttp(url, headers, json_data): """ - Post data to Microsoft Graph API. + Post data to Microsoft Graph API with throttling and retry logic. Args: url (str): The URL to post data to. @@ -46,14 +84,24 @@ async def post_with_aiohttp(url, headers, json_data): Returns: int: HTTP status code. """ + return await _handle_throttling_retry(_post_impl, url, headers, json_data) + + +async def _post_impl(url, headers, json_data): + """Internal post implementation.""" async with semaphore: async with aiohttp.ClientSession() as session: async with session.post(url, headers=headers, json=json_data) as response: + if response.status == 429: + raise Exception( + f"Failed to post {url}: {response.status} {await response.text()}" + ) return response.status + async def patch_with_aiohttp(url, headers, json_data): """ - Patch data to Microsoft Graph API. + Patch data to Microsoft Graph API with throttling and retry logic. Args: url (str): The URL to patch data to. @@ -63,14 +111,24 @@ async def patch_with_aiohttp(url, headers, json_data): Returns: int: HTTP status code. """ + return await _handle_throttling_retry(_patch_impl, url, headers, json_data) + + +async def _patch_impl(url, headers, json_data): + """Internal patch implementation.""" async with semaphore: async with aiohttp.ClientSession() as session: async with session.patch(url, headers=headers, json=json_data) as response: + if response.status == 429: + raise Exception( + f"Failed to patch {url}: {response.status} {await response.text()}" + ) return response.status + async def delete_with_aiohttp(url, headers): """ - Delete data from Microsoft Graph API. + Delete data from Microsoft Graph API with throttling and retry logic. Args: url (str): The URL to delete data from. @@ -79,7 +137,51 @@ async def delete_with_aiohttp(url, headers): Returns: int: HTTP status code. """ + return await _handle_throttling_retry(_delete_impl, url, headers) + + +async def _delete_impl(url, headers): + """Internal delete implementation.""" async with semaphore: async with aiohttp.ClientSession() as session: async with session.delete(url, headers=headers) as response: + if response.status == 429: + raise Exception( + f"Failed to delete {url}: {response.status} {await response.text()}" + ) return response.status + + +async def batch_with_aiohttp(requests, headers): + """ + Execute multiple requests in a single batch call to Microsoft Graph API with throttling and retry logic. 
+ + Args: + requests (list): List of request dictionaries with 'id', 'method', 'url', and optional 'body' keys. + headers (dict): Headers including authentication. + + Returns: + dict: Batch response with individual request responses. + """ + return await _handle_throttling_retry(_batch_impl, requests, headers) + + +async def _batch_impl(requests, headers): + """Internal batch implementation.""" + batch_url = "https://graph.microsoft.com/v1.0/$batch" + batch_data = {"requests": requests} + + async with semaphore: + async with aiohttp.ClientSession() as session: + async with session.post( + batch_url, headers=headers, json=batch_data + ) as response: + if response.status == 429: + raise Exception( + f"Batch request failed: {response.status} {await response.text()}" + ) + elif response.status != 200: + raise Exception( + f"Batch request failed: {response.status} {await response.text()}" + ) + return await response.json() diff --git a/src/services/microsoft_graph/mail.py b/src/services/microsoft_graph/mail.py index c2a5f75..bc14ff2 100644 --- a/src/services/microsoft_graph/mail.py +++ b/src/services/microsoft_graph/mail.py @@ -5,6 +5,7 @@ Mail operations for Microsoft Graph API. import os import re import glob +import asyncio from typing import Set import aiohttp @@ -13,6 +14,7 @@ from .client import ( patch_with_aiohttp, post_with_aiohttp, delete_with_aiohttp, + batch_with_aiohttp, ) @@ -73,9 +75,18 @@ async def fetch_mail_async( new_files = set(glob.glob(os.path.join(new_dir, "*.eml*"))) cur_files = set(glob.glob(os.path.join(cur_dir, "*.eml*"))) - for filename in Set.union(cur_files, new_files): - message_id = filename.split(".")[0].split("/")[ - -1 + # Get local message IDs (filename without extension) + local_msg_ids = set() + for filename in set.union(cur_files, new_files): + message_id = os.path.basename(filename).split(".")[ + 0 + ] # Extract the Message-ID from the filename + local_msg_ids.add(message_id) + + # Delete local files that no longer exist on server + for filename in set.union(cur_files, new_files): + message_id = os.path.basename(filename).split(".")[ + 0 ] # Extract the Message-ID from the filename if message_id not in inbox_msg_ids: if not dry_run: @@ -84,7 +95,18 @@ async def fetch_mail_async( else: progress.console.print(f"[DRY-RUN] Would delete {filename} from inbox") - for message in messages: + # Filter messages to only include those not already local + messages_to_download = [msg for msg in messages if msg["id"] not in local_msg_ids] + + progress.console.print( + f"Found {len(messages)} total messages on server, {len(local_msg_ids)} already local" + ) + progress.console.print(f"Downloading {len(messages_to_download)} new messages") + + # Update progress to reflect only the messages we actually need to download + progress.update(task_id, total=len(messages_to_download), completed=0) + + for message in messages_to_download: progress.console.print( f"Processing message: {message.get('subject', 'No Subject')}", end="\r" ) @@ -97,14 +119,19 @@ async def fetch_mail_async( dry_run, download_attachments, ) - progress.update(task_id, advance=0.5) - progress.update(task_id, completed=len(messages)) - progress.console.print(f"\nFinished saving {len(messages)} messages.") + progress.update(task_id, advance=1) + progress.update(task_id, completed=len(messages_to_download)) + progress.console.print( + f"\nFinished downloading {len(messages_to_download)} new messages." 
+ ) + progress.console.print( + f"Total messages on server: {len(messages)}, Already local: {len(local_msg_ids)}" + ) async def archive_mail_async(maildir_path, headers, progress, task_id, dry_run=False): """ - Archive mail from Maildir to Microsoft Graph API archive folder. + Archive mail from Maildir to Microsoft Graph API archive folder using batch operations. Args: maildir_path (str): Path to the Maildir. @@ -125,8 +152,14 @@ async def archive_mail_async(maildir_path, headers, progress, task_id, dry_run=F glob.glob(os.path.join(archive_dir, "**", "*.eml*"), recursive=True) ) + if not archive_files: + progress.update(task_id, total=0, completed=0) + progress.console.print("No messages to archive") + return + progress.update(task_id, total=len(archive_files)) + # Get archive folder ID from server folder_response = await fetch_with_aiohttp( "https://graph.microsoft.com/v1.0/me/mailFolders", headers ) @@ -143,44 +176,115 @@ async def archive_mail_async(maildir_path, headers, progress, task_id, dry_run=F if not archive_folder_id: raise Exception("No folder named 'Archive' or 'Archives' found on the server.") - for filepath in archive_files: - message_id = os.path.basename(filepath).split(".")[ - 0 - ] # Extract the Message-ID from the filename + # Process files in batches of 20 (Microsoft Graph batch limit) + batch_size = 20 + successful_moves = [] + + for i in range(0, len(archive_files), batch_size): + batch_files = archive_files[i : i + batch_size] + + # Add small delay between batches to respect API limits + if i > 0: + await asyncio.sleep(0.5) if not dry_run: - status = await post_with_aiohttp( - f"https://graph.microsoft.com/v1.0/me/messages/{message_id}/microsoft.graph.move", - headers, - {"destinationId": archive_folder_id}, - ) - if status == 201: # 201 Created indicates successful move - os.remove( - filepath - ) # Remove the local file since it's now archived on server - progress.console.print(f"Moved message to 'Archive': {message_id}") - elif status == 404: - os.remove( - filepath - ) # Remove the file from local archive if not found on server - progress.console.print( - f"Message not found on server, removed local copy: {message_id}" - ) - else: - progress.console.print( - f"Failed to move message to 'Archive': {message_id}, status: {status}" + # Prepare batch requests + batch_requests = [] + for idx, filepath in enumerate(batch_files): + message_id = os.path.basename(filepath).split(".")[0] + batch_requests.append( + { + "id": str(idx + 1), + "method": "POST", + "url": f"/me/messages/{message_id}/microsoft.graph.move", + "body": {"destinationId": archive_folder_id}, + "headers": {"Content-Type": "application/json"}, + } ) + + try: + # Execute batch request + batch_response = await batch_with_aiohttp(batch_requests, headers) + + # Process batch results + for response in batch_response.get("responses", []): + request_id = ( + int(response["id"]) - 1 + ) # Convert back to 0-based index + filepath = batch_files[request_id] + message_id = os.path.basename(filepath).split(".")[0] + status = response["status"] + + if status == 201: # 201 Created indicates successful move + os.remove( + filepath + ) # Remove the local file since it's now archived on server + successful_moves.append(message_id) + progress.console.print( + f"Moved message to 'Archive': {message_id}" + ) + elif status == 404: + os.remove( + filepath + ) # Remove the file from local archive if not found on server + progress.console.print( + f"Message not found on server, removed local copy: {message_id}" + ) + else: + 
progress.console.print( + f"Failed to move message to 'Archive': {message_id}, status: {status}" + ) + + except Exception as e: + progress.console.print(f"Batch archive request failed: {str(e)}") + # Fall back to individual requests for this batch + for filepath in batch_files: + message_id = os.path.basename(filepath).split(".")[0] + try: + status = await post_with_aiohttp( + f"https://graph.microsoft.com/v1.0/me/messages/{message_id}/microsoft.graph.move", + headers, + {"destinationId": archive_folder_id}, + ) + if status == 201: + os.remove(filepath) + successful_moves.append(message_id) + progress.console.print( + f"Moved message to 'Archive' (fallback): {message_id}" + ) + elif status == 404: + os.remove(filepath) + progress.console.print( + f"Message not found on server, removed local copy: {message_id}" + ) + else: + progress.console.print( + f"Failed to move message to 'Archive': {message_id}, status: {status}" + ) + except Exception as individual_error: + progress.console.print( + f"Failed to archive {message_id}: {str(individual_error)}" + ) else: - progress.console.print( - f"[DRY-RUN] Would move message to 'Archive' folder: {message_id}" - ) - progress.advance(task_id) + # Dry run - just log what would be done + for filepath in batch_files: + message_id = os.path.basename(filepath).split(".")[0] + progress.console.print( + f"[DRY-RUN] Would move message to 'Archive' folder: {message_id}" + ) + + progress.advance(task_id, len(batch_files)) + + if not dry_run: + progress.console.print( + f"Successfully archived {len(successful_moves)} messages in batches" + ) return async def delete_mail_async(maildir_path, headers, progress, task_id, dry_run=False): """ - Delete mail from Maildir and Microsoft Graph API. + Delete mail from Maildir and Microsoft Graph API using batch operations. Args: maildir_path (str): Path to the Maildir. 
@@ -194,22 +298,99 @@ async def delete_mail_async(maildir_path, headers, progress, task_id, dry_run=Fa """ trash_dir = os.path.join(maildir_path, ".Trash", "cur") trash_files = set(glob.glob(os.path.join(trash_dir, "*.eml*"))) + + if not trash_files: + progress.update(task_id, total=0, completed=0) + progress.console.print("No messages to delete") + return + progress.update(task_id, total=len(trash_files)) - for filepath in trash_files: - message_id = os.path.basename(filepath).split(".")[ - 0 - ] # Extract the Message-ID from the filename + # Process files in batches of 20 (Microsoft Graph batch limit) + batch_size = 20 + trash_files_list = list(trash_files) + successful_deletes = [] + + for i in range(0, len(trash_files_list), batch_size): + batch_files = trash_files_list[i : i + batch_size] + + # Add small delay between batches to respect API limits + if i > 0: + await asyncio.sleep(0.5) + if not dry_run: - progress.console.print(f"Moving message to trash: {message_id}") - status = await delete_with_aiohttp( - f"https://graph.microsoft.com/v1.0/me/messages/{message_id}", headers - ) - if status == 204 or status == 404: - os.remove(filepath) # Remove the file from local trash + # Prepare batch requests + batch_requests = [] + for idx, filepath in enumerate(batch_files): + message_id = os.path.basename(filepath).split(".")[0] + batch_requests.append( + { + "id": str(idx + 1), + "method": "DELETE", + "url": f"/me/messages/{message_id}", + } + ) + + try: + # Execute batch request + batch_response = await batch_with_aiohttp(batch_requests, headers) + + # Process batch results + for response in batch_response.get("responses", []): + request_id = ( + int(response["id"]) - 1 + ) # Convert back to 0-based index + filepath = batch_files[request_id] + message_id = os.path.basename(filepath).split(".")[0] + status = response["status"] + + if ( + status == 204 or status == 404 + ): # 204 No Content or 404 Not Found (already deleted) + os.remove(filepath) # Remove the file from local trash + successful_deletes.append(message_id) + progress.console.print(f"Deleted message: {message_id}") + else: + progress.console.print( + f"Failed to delete message: {message_id}, status: {status}" + ) + + except Exception as e: + progress.console.print(f"Batch delete request failed: {str(e)}") + # Fall back to individual requests for this batch + for filepath in batch_files: + message_id = os.path.basename(filepath).split(".")[0] + try: + status = await delete_with_aiohttp( + f"https://graph.microsoft.com/v1.0/me/messages/{message_id}", + headers, + ) + if status == 204 or status == 404: + os.remove(filepath) + successful_deletes.append(message_id) + progress.console.print( + f"Deleted message (fallback): {message_id}" + ) + else: + progress.console.print( + f"Failed to delete message: {message_id}, status: {status}" + ) + except Exception as individual_error: + progress.console.print( + f"Failed to delete {message_id}: {str(individual_error)}" + ) else: - progress.console.print(f"[DRY-RUN] Would delete message: {message_id}") - progress.advance(task_id) + # Dry run - just log what would be done + for filepath in batch_files: + message_id = os.path.basename(filepath).split(".")[0] + progress.console.print(f"[DRY-RUN] Would delete message: {message_id}") + + progress.advance(task_id, len(batch_files)) + + if not dry_run: + progress.console.print( + f"Successfully deleted {len(successful_deletes)} messages in batches" + ) async def get_inbox_count_async(headers): @@ -231,7 +412,7 @@ async def synchronize_maildir_async( 
maildir_path, headers, progress, task_id, dry_run=False ): """ - Synchronize Maildir with Microsoft Graph API. + Synchronize Maildir with Microsoft Graph API using batch operations. Args: maildir_path (str): Path to the Maildir. @@ -258,32 +439,123 @@ async def synchronize_maildir_async( cur_files = set(glob.glob(os.path.join(cur_dir, "*.eml*"))) moved_to_cur = [os.path.basename(f) for f in cur_files - new_files] - progress.update(task_id, total=len(moved_to_cur)) - for filename in moved_to_cur: - # TODO: this isn't scalable, we should use a more efficient way to check if the file was modified - if os.path.getmtime(os.path.join(cur_dir, filename)) < last_sync: - progress.update(task_id, advance=1) - continue - message_id = re.sub( - r"\:2.+", "", filename.split(".")[0] - ) # Extract the Message-ID from the filename - if not dry_run: - status = await patch_with_aiohttp( - f"https://graph.microsoft.com/v1.0/me/messages/{message_id}", - headers, - {"isRead": True}, - ) - if status == 404: - os.remove(os.path.join(cur_dir, filename)) + # Filter out files that haven't been modified since last sync + files_to_process = [] + for filename in moved_to_cur: + if os.path.getmtime(os.path.join(cur_dir, filename)) >= last_sync: + files_to_process.append(filename) + + if not files_to_process: + progress.update(task_id, total=0, completed=0) + progress.console.print("No messages to mark as read") + # Save timestamp even if no work was done + if not dry_run: + save_sync_timestamp() + return + + progress.update(task_id, total=len(files_to_process)) + + # Process files in batches of 20 (Microsoft Graph batch limit) + batch_size = 20 + successful_reads = [] + + for i in range(0, len(files_to_process), batch_size): + batch_files = files_to_process[i : i + batch_size] + + # Add small delay between batches to respect API limits + if i > 0: + await asyncio.sleep(0.5) + + if not dry_run: + # Prepare batch requests + batch_requests = [] + for idx, filename in enumerate(batch_files): + message_id = re.sub(r"\:2.+", "", filename.split(".")[0]) + batch_requests.append( + { + "id": str(idx + 1), + "method": "PATCH", + "url": f"/me/messages/{message_id}", + "body": {"isRead": True}, + "headers": {"Content-Type": "application/json"}, + } + ) + + try: + # Execute batch request + batch_response = await batch_with_aiohttp(batch_requests, headers) + + # Process batch results + for response in batch_response.get("responses", []): + request_id = ( + int(response["id"]) - 1 + ) # Convert back to 0-based index + filename = batch_files[request_id] + message_id = re.sub(r"\:2.+", "", filename.split(".")[0]) + status = response["status"] + + if status == 200: # 200 OK indicates successful update + successful_reads.append(message_id) + progress.console.print( + f"Marked message as read: {truncate_id(message_id)}" + ) + elif status == 404: + os.remove( + os.path.join(cur_dir, filename) + ) # Remove file if message doesn't exist on server + progress.console.print( + f"Message not found on server, removed local copy: {truncate_id(message_id)}" + ) + else: + progress.console.print( + f"Failed to mark message as read: {truncate_id(message_id)}, status: {status}" + ) + + except Exception as e: + progress.console.print(f"Batch read-status request failed: {str(e)}") + # Fall back to individual requests for this batch + for filename in batch_files: + message_id = re.sub(r"\:2.+", "", filename.split(".")[0]) + try: + status = await patch_with_aiohttp( + f"https://graph.microsoft.com/v1.0/me/messages/{message_id}", + headers, + {"isRead": 
True}, + ) + if status == 200: + successful_reads.append(message_id) + progress.console.print( + f"Marked message as read (fallback): {truncate_id(message_id)}" + ) + elif status == 404: + os.remove(os.path.join(cur_dir, filename)) + progress.console.print( + f"Message not found on server, removed local copy: {truncate_id(message_id)}" + ) + else: + progress.console.print( + f"Failed to mark message as read: {truncate_id(message_id)}, status: {status}" + ) + except Exception as individual_error: + progress.console.print( + f"Failed to update read status for {truncate_id(message_id)}: {str(individual_error)}" + ) else: - progress.console.print( - f"[DRY-RUN] Would mark message as read: {truncate_id(message_id)}" - ) - progress.advance(task_id) + # Dry run - just log what would be done + for filename in batch_files: + message_id = re.sub(r"\:2.+", "", filename.split(".")[0]) + progress.console.print( + f"[DRY-RUN] Would mark message as read: {truncate_id(message_id)}" + ) + + progress.advance(task_id, len(batch_files)) # Save the current sync timestamp if not dry_run: save_sync_timestamp() + progress.console.print( + f"Successfully marked {len(successful_reads)} messages as read in batches" + ) else: progress.console.print("[DRY-RUN] Would save sync timestamp.") diff --git a/src/utils/mail_utils/maildir.py b/src/utils/mail_utils/maildir.py index ce86142..a846a1e 100644 --- a/src/utils/mail_utils/maildir.py +++ b/src/utils/mail_utils/maildir.py @@ -1,6 +1,7 @@ """ Maildir operations for handling local mail storage. """ + import os import email import base64 @@ -11,11 +12,30 @@ from email import encoders import time import aiohttp import re +import logging + +# Suppress HTTP library debug logging +logging.getLogger("aiohttp").setLevel(logging.ERROR) +logging.getLogger("aiohttp.access").setLevel(logging.ERROR) from src.utils.calendar_utils import truncate_id -from src.utils.mail_utils.helpers import safe_filename, ensure_directory_exists, format_datetime, format_mime_date +from src.utils.mail_utils.helpers import ( + safe_filename, + ensure_directory_exists, + format_datetime, + format_mime_date, +) -async def save_mime_to_maildir_async(maildir_path, message, attachments_dir, headers, progress, dry_run=False, download_attachments=False): + +async def save_mime_to_maildir_async( + maildir_path, + message, + attachments_dir, + headers, + progress, + dry_run=False, + download_attachments=False, +): """ Save a message from Microsoft Graph API to a Maildir. 
@@ -31,30 +51,39 @@ async def save_mime_to_maildir_async(maildir_path, message, attachments_dir, hea Returns: None """ - message_id = message.get('id', '') + message_id = message.get("id", "") # Determine target directory based on read status - target_dir = os.path.join(maildir_path, 'cur' if message.get('isRead', False) else 'new') + target_dir = os.path.join( + maildir_path, "cur" if message.get("isRead", False) else "new" + ) ensure_directory_exists(target_dir) # Check if the file already exists in either new or cur - new_path = os.path.join(maildir_path, 'new', f"{message_id}.eml") - cur_path = os.path.join(maildir_path, 'cur', f"{message_id}.eml") + new_path = os.path.join(maildir_path, "new", f"{message_id}.eml") + cur_path = os.path.join(maildir_path, "cur", f"{message_id}.eml") if os.path.exists(new_path) or os.path.exists(cur_path): return # Skip if already exists # Create MIME email - mime_msg = await create_mime_message_async(message, headers, attachments_dir, progress, download_attachments) + mime_msg = await create_mime_message_async( + message, headers, attachments_dir, progress, download_attachments + ) # Only save file if not in dry run mode if not dry_run: - with open(os.path.join(target_dir, f"{message_id}.eml"), 'wb') as f: + with open(os.path.join(target_dir, f"{message_id}.eml"), "wb") as f: f.write(mime_msg.as_bytes()) else: - progress.console.print(f"[DRY-RUN] Would save message: {message.get('subject', 'No Subject')}") + progress.console.print( + f"[DRY-RUN] Would save message: {message.get('subject', 'No Subject')}" + ) -async def create_mime_message_async(message, headers, attachments_dir, progress, download_attachments=False): + +async def create_mime_message_async( + message, headers, attachments_dir, progress, download_attachments=False +): """ Create a MIME message from Microsoft Graph API message data. 
@@ -72,33 +101,41 @@ async def create_mime_message_async(message, headers, attachments_dir, progress,
     mime_msg = MIMEMultipart()
     # Message headers
-    mime_msg['Message-ID'] = message.get('id', '')
-    mime_msg['Subject'] = message.get('subject', 'No Subject')
+    mime_msg["Message-ID"] = message.get("id", "")
+    mime_msg["Subject"] = message.get("subject", "No Subject")
     # Sender information
-    sender = message.get('from', {}).get('emailAddress', {})
+    sender = message.get("from", {}).get("emailAddress", {})
     if sender:
-        mime_msg['From'] = f"{sender.get('name', '')} <{sender.get('address', '')}>".strip()
+        mime_msg["From"] = (
+            f"{sender.get('name', '')} <{sender.get('address', '')}>".strip()
+        )
     # Recipients
-    to_recipients = message.get('toRecipients', [])
-    cc_recipients = message.get('ccRecipients', [])
+    to_recipients = message.get("toRecipients", [])
+    cc_recipients = message.get("ccRecipients", [])
     if to_recipients:
-        to_list = [f"{r.get('emailAddress', {}).get('name', '')} <{r.get('emailAddress', {}).get('address', '')}>".strip() for r in to_recipients]
-        mime_msg['To'] = ', '.join(to_list)
+        to_list = [
+            f"{r.get('emailAddress', {}).get('name', '')} <{r.get('emailAddress', {}).get('address', '')}>".strip()
+            for r in to_recipients
+        ]
+        mime_msg["To"] = ", ".join(to_list)
     if cc_recipients:
-        cc_list = [f"{r.get('emailAddress', {}).get('name', '')} <{r.get('emailAddress', {}).get('address', '')}>".strip() for r in cc_recipients]
-        mime_msg['Cc'] = ', '.join(cc_list)
+        cc_list = [
+            f"{r.get('emailAddress', {}).get('name', '')} <{r.get('emailAddress', {}).get('address', '')}>".strip()
+            for r in cc_recipients
+        ]
+        mime_msg["Cc"] = ", ".join(cc_list)
     # Date - using the new format_mime_date function to ensure RFC 5322 compliance
-    received_datetime = message.get('receivedDateTime', '')
+    received_datetime = message.get("receivedDateTime", "")
     if received_datetime:
-        mime_msg['Date'] = format_mime_date(received_datetime)
+        mime_msg["Date"] = format_mime_date(received_datetime)
     # First try the direct body content approach
-    message_id = message.get('id', '')
+    message_id = message.get("id", "")
     try:
         # First get the message with body content
         body_url = f"https://graph.microsoft.com/v1.0/me/messages/{message_id}?$select=body,bodyPreview"
@@ -108,46 +145,62 @@ async def create_mime_message_async(message, headers, attachments_dir, progress,
                     body_data = await response.json()
                     # Get body content
-                    body_content = body_data.get('body', {}).get('content', '')
-                    body_type = body_data.get('body', {}).get('contentType', 'text')
-                    body_preview = body_data.get('bodyPreview', '')
+                    body_content = body_data.get("body", {}).get("content", "")
+                    body_type = body_data.get("body", {}).get("contentType", "text")
+                    body_preview = body_data.get("bodyPreview", "")
                     # If we have body content, use it
                     if body_content:
-                        if body_type.lower() == 'html':
+                        if body_type.lower() == "html":
                             # Add both HTML and plain text versions
                             # Plain text conversion
-                            plain_text = re.sub(r'<br\s*/?>', '\n', body_content)
-                            plain_text = re.sub(r'<[^>]*>', '', plain_text)
+                            plain_text = re.sub(r"<br\s*/?>", "\n", body_content)
+                            plain_text = re.sub(r"<[^>]*>", "", plain_text)
-                            mime_msg.attach(MIMEText(plain_text, 'plain'))
-                            mime_msg.attach(MIMEText(body_content, 'html'))
+                            mime_msg.attach(MIMEText(plain_text, "plain"))
+                            mime_msg.attach(MIMEText(body_content, "html"))
                         else:
                             # Just plain text
-                            mime_msg.attach(MIMEText(body_content, 'plain'))
+                            mime_msg.attach(MIMEText(body_content, "plain"))
                     elif body_preview:
                         # Use preview if we have it
-
mime_msg.attach(MIMEText(f"{body_preview}\n\n[Message preview only. Full content not available.]", 'plain')) + mime_msg.attach( + MIMEText( + f"{body_preview}\n\n[Message preview only. Full content not available.]", + "plain", + ) + ) else: # Fallback to MIME content - progress.console.print(f"No direct body content for message {truncate_id(message_id)}, trying MIME content...") - await fetch_mime_content(mime_msg, message_id, headers, progress) + progress.console.print( + f"No direct body content for message {truncate_id(message_id)}, trying MIME content..." + ) + await fetch_mime_content( + mime_msg, message_id, headers, progress + ) else: - progress.console.print(f"Failed to get message body: {response.status}. Trying MIME content...") + progress.console.print( + f"Failed to get message body: {response.status}. Trying MIME content..." + ) await fetch_mime_content(mime_msg, message_id, headers, progress) except Exception as e: - progress.console.print(f"Error getting message body: {e}. Trying MIME content...") + progress.console.print( + f"Error getting message body: {e}. Trying MIME content..." + ) await fetch_mime_content(mime_msg, message_id, headers, progress) # Handle attachments only if we want to download them if download_attachments: - await add_attachments_async(mime_msg, message, headers, attachments_dir, progress) + await add_attachments_async( + mime_msg, message, headers, attachments_dir, progress + ) else: # Add a header to indicate attachment info was skipped - mime_msg['X-Attachments-Skipped'] = 'True' + mime_msg["X-Attachments-Skipped"] = "True" return mime_msg + async def fetch_mime_content(mime_msg, message_id, headers, progress): """ Fetch and add MIME content to a message when direct body access fails. @@ -159,7 +212,9 @@ async def fetch_mime_content(mime_msg, message_id, headers, progress): progress: Progress instance for updating progress bars. 
""" # Fallback to getting the MIME content - message_content_url = f"https://graph.microsoft.com/v1.0/me/messages/{message_id}/$value" + message_content_url = ( + f"https://graph.microsoft.com/v1.0/me/messages/{message_id}/$value" + ) try: async with aiohttp.ClientSession() as session: async with session.get(message_content_url, headers=headers) as response: @@ -167,41 +222,58 @@ async def fetch_mime_content(mime_msg, message_id, headers, progress): full_content = await response.text() # Check for body tags - body_match = re.search(r']*>(.*?)', full_content, re.DOTALL | re.IGNORECASE) + body_match = re.search( + r"]*>(.*?)", + full_content, + re.DOTALL | re.IGNORECASE, + ) if body_match: body_content = body_match.group(1) # Simple HTML to text conversion - body_text = re.sub(r'', '\n', body_content) - body_text = re.sub(r'<[^>]*>', '', body_text) + body_text = re.sub(r"", "\n", body_content) + body_text = re.sub(r"<[^>]*>", "", body_text) # Add the plain text body - mime_msg.attach(MIMEText(body_text, 'plain')) + mime_msg.attach(MIMEText(body_text, "plain")) # Also add the HTML body - mime_msg.attach(MIMEText(full_content, 'html')) + mime_msg.attach(MIMEText(full_content, "html")) else: # Fallback - try to find content between Content-Type: text/html and next boundary - html_parts = re.findall(r'Content-Type: text/html.*?\r?\n\r?\n(.*?)(?:\r?\n\r?\n|$)', - full_content, re.DOTALL | re.IGNORECASE) + html_parts = re.findall( + r"Content-Type: text/html.*?\r?\n\r?\n(.*?)(?:\r?\n\r?\n|$)", + full_content, + re.DOTALL | re.IGNORECASE, + ) if html_parts: html_content = html_parts[0] - mime_msg.attach(MIMEText(html_content, 'html')) + mime_msg.attach(MIMEText(html_content, "html")) # Also make plain text version - plain_text = re.sub(r'', '\n', html_content) - plain_text = re.sub(r'<[^>]*>', '', plain_text) - mime_msg.attach(MIMEText(plain_text, 'plain')) + plain_text = re.sub(r"", "\n", html_content) + plain_text = re.sub(r"<[^>]*>", "", plain_text) + mime_msg.attach(MIMEText(plain_text, "plain")) else: # Just use the raw content as text if nothing else works - mime_msg.attach(MIMEText(full_content, 'plain')) - progress.console.print(f"Using raw content for message {message_id} - no body tags found") + mime_msg.attach(MIMEText(full_content, "plain")) + progress.console.print( + f"Using raw content for message {message_id} - no body tags found" + ) else: error_text = await response.text() - progress.console.print(f"Failed to get MIME content: {response.status} {error_text}") - mime_msg.attach(MIMEText(f"Failed to retrieve message body: HTTP {response.status}", 'plain')) + progress.console.print( + f"Failed to get MIME content: {response.status} {error_text}" + ) + mime_msg.attach( + MIMEText( + f"Failed to retrieve message body: HTTP {response.status}", + "plain", + ) + ) except Exception as e: progress.console.print(f"Error retrieving MIME content: {e}") - mime_msg.attach(MIMEText(f"Failed to retrieve message body: {str(e)}", 'plain')) + mime_msg.attach(MIMEText(f"Failed to retrieve message body: {str(e)}", "plain")) + async def add_attachments_async(mime_msg, message, headers, attachments_dir, progress): """ @@ -217,10 +289,12 @@ async def add_attachments_async(mime_msg, message, headers, attachments_dir, pro Returns: None """ - message_id = message.get('id', '') + message_id = message.get("id", "") # Get attachments list - attachments_url = f"https://graph.microsoft.com/v1.0/me/messages/{message_id}/attachments" + attachments_url = ( + 
f"https://graph.microsoft.com/v1.0/me/messages/{message_id}/attachments" + ) async with aiohttp.ClientSession() as session: async with session.get(attachments_url, headers=headers) as response: @@ -228,7 +302,7 @@ async def add_attachments_async(mime_msg, message, headers, attachments_dir, pro return attachments_data = await response.json() - attachments = attachments_data.get('value', []) + attachments = attachments_data.get("value", []) if not attachments: return @@ -238,33 +312,42 @@ async def add_attachments_async(mime_msg, message, headers, attachments_dir, pro ensure_directory_exists(message_attachments_dir) # Add a header with attachment count - mime_msg['X-Attachment-Count'] = str(len(attachments)) + mime_msg["X-Attachment-Count"] = str(len(attachments)) for idx, attachment in enumerate(attachments): - attachment_name = safe_filename(attachment.get('name', 'attachment')) - attachment_type = attachment.get('contentType', 'application/octet-stream') + attachment_name = safe_filename(attachment.get("name", "attachment")) + attachment_type = attachment.get( + "contentType", "application/octet-stream" + ) # Add attachment info to headers for reference - mime_msg[f'X-Attachment-{idx+1}-Name'] = attachment_name - mime_msg[f'X-Attachment-{idx+1}-Type'] = attachment_type + mime_msg[f"X-Attachment-{idx + 1}-Name"] = attachment_name + mime_msg[f"X-Attachment-{idx + 1}-Type"] = attachment_type - attachment_part = MIMEBase(*attachment_type.split('/', 1)) + attachment_part = MIMEBase(*attachment_type.split("/", 1)) # Get attachment content - if 'contentBytes' in attachment: - attachment_content = base64.b64decode(attachment['contentBytes']) + if "contentBytes" in attachment: + attachment_content = base64.b64decode(attachment["contentBytes"]) # Save attachment to disk - attachment_path = os.path.join(message_attachments_dir, attachment_name) - with open(attachment_path, 'wb') as f: + attachment_path = os.path.join( + message_attachments_dir, attachment_name + ) + with open(attachment_path, "wb") as f: f.write(attachment_content) # Add to MIME message attachment_part.set_payload(attachment_content) encoders.encode_base64(attachment_part) - attachment_part.add_header('Content-Disposition', f'attachment; filename="{attachment_name}"') + attachment_part.add_header( + "Content-Disposition", + f'attachment; filename="{attachment_name}"', + ) mime_msg.attach(attachment_part) progress.console.print(f"Downloaded attachment: {attachment_name}") else: - progress.console.print(f"Skipping attachment with no content: {attachment_name}") + progress.console.print( + f"Skipping attachment with no content: {attachment_name}" + )