import glob import json import os import re import time from datetime import datetime, timedelta from email.message import EmailMessage from email.utils import format_datetime from typing import Set from dateutil import parser from dateutil.tz import UTC from rich import print from rich.panel import Panel from rich.progress import Progress, SpinnerColumn, MofNCompleteColumn import aiohttp import argparse import asyncio import html2text import msal import orjson # Filepath for caching timestamp cache_timestamp_file = 'cache_timestamp.json' # Filepath for sync timestamp sync_timestamp_file = 'sync_timestamp.json' # Function to load the last sync timestamp def load_last_sync_timestamp(): if os.path.exists(sync_timestamp_file): with open(sync_timestamp_file, 'r') as f: return json.load(f).get('last_sync', 0) return 0 # Function to save the current sync timestamp def save_sync_timestamp(): with open(sync_timestamp_file, 'w') as f: json.dump({'last_sync': time.time()}, f) # Add argument parsing for dry-run mode arg_parser = argparse.ArgumentParser(description="Fetch and synchronize emails.") arg_parser.add_argument("--dry-run", action="store_true", help="Run in dry-run mode without making changes.", default=False) args = arg_parser.parse_args() dry_run = args.dry_run # Define a global semaphore for throttling semaphore = asyncio.Semaphore(4) async def fetch_with_aiohttp(url, headers): async with semaphore: async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers) as response: if response.status != 200: raise Exception(f"Failed to fetch {url}: {response.status} {await response.text()}") raw_bytes = await response.read() content_length = response.headers.get('Content-Length') if content_length and len(raw_bytes) != int(content_length): print("Warning: Incomplete response received!") return None return orjson.loads(raw_bytes) async def post_with_aiohttp(url, headers, json_data): async with semaphore: async with aiohttp.ClientSession() as session: async with session.post(url, headers=headers, json=json_data) as response: return response.status async def patch_with_aiohttp(url, headers, json_data): async with semaphore: async with aiohttp.ClientSession() as session: async with session.patch(url, headers=headers, json=json_data) as response: return response.status async def delete_with_aiohttp(url, headers): async with semaphore: async with aiohttp.ClientSession() as session: async with session.delete(url, headers=headers) as response: return response.status async def synchronize_maildir_async(maildir_path, headers, progress, task_id): last_sync = load_last_sync_timestamp() current_time = time.time() # Find messages moved from "new" to "cur" and mark them as read new_dir = os.path.join(maildir_path, 'new') cur_dir = os.path.join(maildir_path, 'cur') new_files = set(glob.glob(os.path.join(new_dir, '*.eml*'))) cur_files = set(glob.glob(os.path.join(cur_dir, '*.eml*'))) moved_to_cur = [os.path.basename(f) for f in cur_files - new_files] progress.update(task_id, total=len(moved_to_cur)) for filename in moved_to_cur: # TODO: this isn't scalable, we should use a more efficient way to check if the file was modified if os.path.getmtime(os.path.join(cur_dir, filename)) < last_sync: progress.update(task_id, advance=1) continue message_id = re.sub(r"\:2.+", "", filename.split('.')[0]) # Extract the Message-ID from the filename if not dry_run: status = await patch_with_aiohttp( f'https://graph.microsoft.com/v1.0/me/messages/{message_id}', headers, {'isRead': True} ) if status == 404: os.remove(os.path.join(cur_dir, filename)) else: progress.console.print(f"[DRY-RUN] Would mark message as read: {message_id}") progress.advance(task_id) # Save the current sync timestamp if not dry_run: save_sync_timestamp() else: progress.console.print("[DRY-RUN] Would save sync timestamp.") async def fetch_mail_async(maildir_path, attachments_dir, headers, progress, task_id): mail_url = 'https://graph.microsoft.com/v1.0/me/mailFolders/inbox/messages?$top=100&$orderby=receivedDateTime asc&$select=id,subject,from,toRecipients,ccRecipients,receivedDateTime,isRead' messages = [] # Fetch the total count of messages in the inbox inbox_url = 'https://graph.microsoft.com/v1.0/me/mailFolders/inbox' response = await fetch_with_aiohttp(inbox_url, headers) total_messages = response.get('totalItemCount', 0) progress.update(task_id, total=total_messages) while mail_url: try: response_data = await fetch_with_aiohttp(mail_url, headers) except Exception as e: progress.console.print(f"Error fetching messages: {e}") continue messages.extend(response_data.get('value', [])) progress.advance(task_id, len(response_data.get('value', []))) # Get the next page URL from @odata.nextLink mail_url = response_data.get('@odata.nextLink') inbox_msg_ids = set(message['id'] for message in messages) progress.update(task_id, completed=(len(messages) / 2)) new_dir = os.path.join(maildir_path, 'new') cur_dir = os.path.join(maildir_path, 'cur') new_files = set(glob.glob(os.path.join(new_dir, '*.eml*'))) cur_files = set(glob.glob(os.path.join(cur_dir, '*.eml*'))) for filename in Set.union(cur_files, new_files): message_id = filename.split('.')[0].split('/')[-1] # Extract the Message-ID from the filename if (message_id not in inbox_msg_ids): if not dry_run: progress.console.print(f"Deleting {filename} from inbox") os.remove(filename) else: progress.console.print(f"[DRY-RUN] Would delete {filename} from inbox") for message in messages: progress.console.print(f"Processing message: {message.get('subject', 'No Subject')}", end='\r') await save_mime_to_maildir_async(maildir_path, message, attachments_dir, headers, progress) progress.update(task_id, advance=0.5) progress.update(task_id, completed=len(messages)) progress.console.print(f"\nFinished saving {len(messages)} messages.") async def archive_mail_async(maildir_path, headers, progress, task_id): archive_dir = os.path.join(maildir_path, '.Archives') archive_files = glob.glob(os.path.join(archive_dir, '**', '*.eml*'), recursive=True) progress.update(task_id, total=len(archive_files)) folder_response = await fetch_with_aiohttp('https://graph.microsoft.com/v1.0/me/mailFolders', headers) folders = folder_response.get('value', []) archive_folder_id = next((folder.get('id') for folder in folders if folder.get('displayName', '').lower() == 'archive'), None) if not archive_folder_id: raise Exception("No folder named 'Archive' found on the server.") for filepath in archive_files: message_id = os.path.basename(filepath).split('.')[0] # Extract the Message-ID from the filename if not dry_run: status = await post_with_aiohttp( f'https://graph.microsoft.com/v1.0/me/messages/{message_id}/microsoft.graph.move', headers, {'destinationId': archive_folder_id} ) if status != 201: # 201 Created indicates success progress.console.print(f"Failed to move message to 'Archive': {message_id}, {status}") if status == 404: os.remove(filepath) # Remove the file from local archive if not fo progress.console.print(f"Message not found on server, removed local copy: {message_id}") elif status == 204: progress.console.print(f"Moved message to 'Archive': {message_id}") else: progress.console.print(f"[DRY-RUN] Would move message to 'Archive' folder: {message_id}") progress.advance(task_id) return async def delete_mail_async(maildir_path, headers, progress, task_id): trash_dir = os.path.join(maildir_path, '.Trash', 'cur') trash_files = set(glob.glob(os.path.join(trash_dir, '*.eml*'))) progress.update(task_id, total=len(trash_files)) for filepath in trash_files: message_id = os.path.basename(filepath).split('.')[0] # Extract the Message-ID from the filename if not dry_run: progress.console.print(f"Moving message to trash: {message_id}") status = await delete_with_aiohttp( f'https://graph.microsoft.com/v1.0/me/messages/{message_id}', headers ) if status == 204 or status == 404: os.remove(filepath) # Remove the file from local trash else: progress.console.print(f"[DRY-RUN] Would delete message: {message_id}") progress.advance(task_id) async def fetch_calendar_async(headers, progress, task_id): yesterday = datetime.now().replace(hour=0, minute=0, second=0) - timedelta(days=1) end_of_today = datetime.now().replace(hour=23, minute=59, second=59) six_days_future = end_of_today + timedelta(days=6) # example https://graph.microsoft.com/v1.0/me/calendarView?startDateTime=2025-05-06T00:00:00&endDateTime=2025-05-13T23:59:59.999999&$count=true&$select=id event_base_url =f"https://graph.microsoft.com/v1.0/me/calendarView?startDateTime={yesterday.isoformat()}&endDateTime={six_days_future.isoformat()}" total_event_url = f"{event_base_url}&$count=true&$select=id" total = await fetch_with_aiohttp(total_event_url, headers) total_events = total.get('@odata.count', 0) + 1 progress.update(task_id, total=total_events) calendar_url = f"{event_base_url}&$top=100&$select=start,end,iCalUid,subject,bodyPreview,webLink,location,recurrence,showAs,responseStatus,onlineMeeting" events = [] if total_events > 100: progress.update(task_id, total=total_events + total_events % 100) while calendar_url: response_data = await fetch_with_aiohttp(calendar_url, headers) events.extend(response_data.get('value', [])) progress.advance(task_id, 1) # Get the next page URL from @odata.nextLink calendar_url = response_data.get('@odata.nextLink') output_file = f'output_ics/outlook_events_latest.ics' if not dry_run: os.makedirs(os.path.dirname(output_file), exist_ok=True) progress.console.print(f"Saving events to {output_file}...") with open(output_file, 'w') as f: f.write("BEGIN:VCALENDAR\nVERSION:2.0\n") for event in events: progress.advance(task_id) if 'start' in event and 'end' in event: start = parser.isoparse(event['start']['dateTime']).astimezone(UTC) end = parser.isoparse(event['end']['dateTime']).astimezone(UTC) f.write(f"BEGIN:VEVENT\nSUMMARY:{event['subject']}\nDESCRIPTION:{event.get('bodyPreview', '')}\n") f.write(f"UID:{event.get('iCalUId', '')}\n") f.write(f"LOCATION:{event.get('location', {})['displayName']}\n") f.write(f"CLASS:{event.get('showAs', '')}\n") f.write(f"STATUS:{event.get('responseStatus', {})['response']}\n") if 'onlineMeeting' in event and event['onlineMeeting']: f.write(f"URL:{event.get('onlineMeeting', {}).get('joinUrl', '')}\n") f.write(f"DTSTART:{start.strftime('%Y%m%dT%H%M%S')}\n") f.write(f"DTEND:{end.strftime('%Y%m%dT%H%M%S')}\n") if 'recurrence' in event and event['recurrence']: # Check if 'recurrence' exists and is not None for rule in event['recurrence']: if rule.startswith('RRULE'): rule_parts = rule.split(';') new_rule_parts = [] for part in rule_parts: if part.startswith('UNTIL='): until_value = part.split('=')[1] until_date = parser.isoparse(until_value) if start.tzinfo is not None and until_date.tzinfo is None: until_date = until_date.replace(tzinfo=UTC) new_rule_parts.append(f"UNTIL={until_date.strftime('%Y%m%dT%H%M%SZ')}") else: new_rule_parts.append(part) rule = ';'.join(new_rule_parts) f.write(f"{rule}\n") f.write("END:VEVENT\n") f.write("END:VCALENDAR\n") progress.console.print(f"Saved events to {output_file}") else: progress.console.print(f"[DRY-RUN] Would save events to {output_file}") # Function to create Maildir structure def create_maildir_structure(base_path): os.makedirs(os.path.join(base_path, 'cur'), exist_ok=True) os.makedirs(os.path.join(base_path, 'new'), exist_ok=True) os.makedirs(os.path.join(base_path, 'tmp'), exist_ok=True) async def save_mime_to_maildir_async(maildir_path, email_data, attachments_dir, headers, progress): # Create a new EmailMessage object msg = EmailMessage() # Determine the directory based on isRead target_dir = 'cur' if email_data.get('isRead', False) else 'new' id = email_data.get('id', '') if not id: progress.console.print(f"Message ID not found. Skipping save.") return email_filename = f"{id}.eml" email_filepath = os.path.join(maildir_path, target_dir, email_filename) # Check if the file already exists if os.path.exists(email_filepath): progress.console.print(f"Message {id} already exists in {target_dir}. Skipping save.") return # Fetch the full MIME payload from the API mime_url = f'https://graph.microsoft.com/v1.0/me/messages/{id}/$value' try: async with aiohttp.ClientSession() as session: async with session.get(mime_url, headers=headers) as response: if response.status != 200: raise Exception(f"Failed to fetch MIME payload for {id}: {response.status} {await response.text()}") mime_payload = await response.text() # Save the MIME payload to the Maildir os.makedirs(os.path.dirname(email_filepath), exist_ok=True) with open(email_filepath, 'w') as f: f.write(mime_payload) progress.console.print(f"Saved message {id} to {target_dir}.") except Exception as e: progress.console.print(f"Failed to save message {id}: {e}") def save_email_to_maildir(maildir_path, email_data, attachments_dir, progress): # Create a new EmailMessage object msg = EmailMessage() received_datetime = email_data.get('receivedDateTime', '') if received_datetime: parsed_datetime = parser.isoparse(received_datetime) msg['Date'] = format_datetime(parsed_datetime) else: msg['Date'] = '' msg['Message-ID'] = email_data.get('id', '') msg['Subject'] = email_data.get('subject', 'No Subject') msg['From'] = email_data.get('from', {}).get('emailAddress', {}).get('address', 'unknown@unknown.com') msg['To'] = ', '.join([recipient['emailAddress']['address'] for recipient in email_data.get('toRecipients', [])]) msg['Cc'] = ', '.join([recipient['emailAddress']['address'] for recipient in email_data.get('ccRecipients', [])]) # Convert the email body from HTML to Markdown body_html = email_data.get('body', {}).get('content', '') if email_data.get('body', {}).get('contentType', '').lower() == 'html': markdown_converter = html2text.HTML2Text() markdown_converter.ignore_images = True markdown_converter.ignore_links = True body_markdown = markdown_converter.handle(body_html) else: body_markdown = body_html # Remove lines between any alphanumeric BannerStart and BannerEnd body_markdown = re.sub(r'\w+BannerStart.*?\w+BannerEnd', '', body_markdown, flags=re.DOTALL) msg.set_content(body_markdown) # Download attachments progress.console.print(f"Downloading attachments for message: {msg['Message-ID']}") for attachment in email_data.get('attachments', []): attachment_id = attachment.get('id') attachment_name = attachment.get('name', 'unknown') attachment_content = attachment.get('contentBytes') if attachment_content: attachment_path = os.path.join(attachments_dir, attachment_name) if not dry_run: with open(attachment_path, 'wb') as f: f.write(attachment_content.encode('utf-8')) msg.add_attachment(attachment_content.encode('utf-8'), filename=attachment_name) else: progress.console.print(f"[DRY-RUN] Would save attachment to {attachment_path}") # Determine the directory based on isRead target_dir = 'cur' if email_data.get('isRead', False) else 'new' email_filename = f"{msg['Message-ID']}.eml" email_filepath = os.path.join(maildir_path, target_dir, email_filename) # Check if the file already exists in any subfolder for root, _, files in os.walk(maildir_path): if email_filename in files: progress.console.print(f"Message {msg['Message-ID']} already exists in {root}. Skipping save.") return # Save the email to the Maildir if not dry_run: with open(email_filepath, 'w') as f: f.write(msg.as_string()) progress.console.print(f"Saved message {msg['Message-ID']}") else: progress.console.print(f"[DRY-RUN] Would save message {msg['Message-ID']}") async def main(): # Load cached timestamp if it exists if os.path.exists(cache_timestamp_file): with open(cache_timestamp_file, 'r') as f: cache_timestamp = json.load(f) else: cache_timestamp = {} # Save emails to Maildir maildir_path = os.getenv('MAILDIR_PATH', os.path.expanduser('~/Mail')) + "/corteva" attachments_dir = os.path.join(maildir_path, 'attachments') os.makedirs(attachments_dir, exist_ok=True) create_maildir_structure(maildir_path) # Read Azure app credentials from environment variables client_id = os.getenv('AZURE_CLIENT_ID') tenant_id = os.getenv('AZURE_TENANT_ID') if not client_id or not tenant_id: raise ValueError("Please set the AZURE_CLIENT_ID and AZURE_TENANT_ID environment variables.") # Token cache cache = msal.SerializableTokenCache() cache_file = 'token_cache.bin' if os.path.exists(cache_file): cache.deserialize(open(cache_file, 'r').read()) # Filepath for caching ETag etag_cache_file = 'etag_cache.json' # Load cached ETag if it exists if os.path.exists(etag_cache_file): with open(etag_cache_file, 'r') as f: etag_cache = json.load(f) else: etag_cache = {} # Authentication authority = f'https://login.microsoftonline.com/{tenant_id}' scopes = ['https://graph.microsoft.com/Calendars.Read', 'https://graph.microsoft.com/Mail.ReadWrite'] app = msal.PublicClientApplication(client_id, authority=authority, token_cache=cache) accounts = app.get_accounts() if accounts: token_response = app.acquire_token_silent(scopes, account=accounts[0]) else: flow = app.initiate_device_flow(scopes=scopes) if 'user_code' not in flow: raise Exception("Failed to create device flow") print(Panel(flow['message'], border_style="magenta", padding=2, title="MSAL Login Flow Link")) token_response = app.acquire_token_by_device_flow(flow) if 'access_token' not in token_response: raise Exception("Failed to acquire token") # Save token cache with open(cache_file, 'w') as f: f.write(cache.serialize()) access_token = token_response['access_token'] headers = {'Authorization': f'Bearer {access_token}', 'Prefer': 'outlook.body-content-type="text"'} accounts = app.get_accounts() if not accounts: raise Exception("No accounts found") maildir_path = os.getenv('MAILDIR_PATH', os.path.expanduser('~/Mail')) + "/corteva" progress = Progress( SpinnerColumn(), MofNCompleteColumn(), *Progress.get_default_columns() ) with progress: task_fetch = progress.add_task("[green]Syncing Inbox...", total=0) task_calendar = progress.add_task("[cyan]Fetching calendar...", total=0) task_read = progress.add_task("[blue]Marking as read...", total=0) task_archive = progress.add_task("[yellow]Archiving mail...", total=0) task_delete = progress.add_task("[red]Deleting mail...", total=0) await asyncio.gather( synchronize_maildir_async(maildir_path, headers, progress, task_read), archive_mail_async(maildir_path, headers, progress, task_archive), delete_mail_async(maildir_path, headers, progress, task_delete), fetch_mail_async(maildir_path, attachments_dir, headers, progress, task_fetch), fetch_calendar_async(headers, progress, task_calendar) ) if __name__ == "__main__": asyncio.run(main())