""" Mail operations for Microsoft Graph API. """ import os import re import glob import asyncio from typing import Set import aiohttp from .client import ( fetch_with_aiohttp, patch_with_aiohttp, post_with_aiohttp, delete_with_aiohttp, batch_with_aiohttp, ) async def fetch_mail_async( maildir_path, attachments_dir, headers, progress, task_id, dry_run=False, download_attachments=False, ): """ Fetch mail from Microsoft Graph API and save to Maildir. Args: maildir_path (str): Path to the Maildir. attachments_dir (str): Path to save attachments. headers (dict): Headers including authentication. progress: Progress instance for updating progress bars. task_id: ID of the task in the progress bar. dry_run (bool): If True, don't actually make changes. download_attachments (bool): If True, download email attachments. Returns: None """ from src.utils.mail_utils.maildir import save_mime_to_maildir_async from src.utils.mail_utils.helpers import truncate_id mail_url = "https://graph.microsoft.com/v1.0/me/mailFolders/inbox/messages?$top=100&$orderby=receivedDateTime asc&$select=id,subject,from,toRecipients,ccRecipients,receivedDateTime,isRead" messages = [] # Fetch the total count of messages in the inbox inbox_url = "https://graph.microsoft.com/v1.0/me/mailFolders/inbox" response = await fetch_with_aiohttp(inbox_url, headers) total_messages = response.get("totalItemCount", 0) progress.update(task_id, total=total_messages) while mail_url: try: response_data = await fetch_with_aiohttp(mail_url, headers) except Exception as e: progress.console.print(f"Error fetching messages: {e}") continue messages.extend(response_data.get("value", [])) progress.advance(task_id, len(response_data.get("value", []))) # Get the next page URL from @odata.nextLink mail_url = response_data.get("@odata.nextLink") inbox_msg_ids = set(message["id"] for message in messages) progress.update(task_id, completed=(len(messages) / 2)) new_dir = os.path.join(maildir_path, "new") cur_dir = os.path.join(maildir_path, "cur") new_files = set(glob.glob(os.path.join(new_dir, "*.eml*"))) cur_files = set(glob.glob(os.path.join(cur_dir, "*.eml*"))) # Get local message IDs (filename without extension) local_msg_ids = set() for filename in set.union(cur_files, new_files): message_id = os.path.basename(filename).split(".")[ 0 ] # Extract the Message-ID from the filename local_msg_ids.add(message_id) # Delete local files that no longer exist on server for filename in set.union(cur_files, new_files): message_id = os.path.basename(filename).split(".")[ 0 ] # Extract the Message-ID from the filename if message_id not in inbox_msg_ids: if not dry_run: progress.console.print(f"Deleting {filename} from inbox") os.remove(filename) else: progress.console.print(f"[DRY-RUN] Would delete {filename} from inbox") # Filter messages to only include those not already local messages_to_download = [msg for msg in messages if msg["id"] not in local_msg_ids] progress.console.print( f"Found {len(messages)} total messages on server, {len(local_msg_ids)} already local" ) progress.console.print(f"Downloading {len(messages_to_download)} new messages") # Update progress to reflect only the messages we actually need to download progress.update(task_id, total=len(messages_to_download), completed=0) for message in messages_to_download: progress.console.print( f"Processing message: {message.get('subject', 'No Subject')}", end="\r" ) await save_mime_to_maildir_async( maildir_path, message, attachments_dir, headers, progress, dry_run, download_attachments, ) progress.update(task_id, advance=1) progress.update(task_id, completed=len(messages_to_download)) progress.console.print( f"\nFinished downloading {len(messages_to_download)} new messages." ) progress.console.print( f"Total messages on server: {len(messages)}, Already local: {len(local_msg_ids)}" ) async def archive_mail_async(maildir_path, headers, progress, task_id, dry_run=False): """ Archive mail from Maildir to Microsoft Graph API archive folder using batch operations. Args: maildir_path (str): Path to the Maildir. headers (dict): Headers including authentication. progress: Progress instance for updating progress bars. task_id: ID of the task in the progress bar. dry_run (bool): If True, don't actually make changes. Returns: None """ # Check both possible archive folder names locally archive_files = [] for archive_folder_name in [".Archives", ".Archive"]: archive_dir = os.path.join(maildir_path, archive_folder_name) if os.path.exists(archive_dir): archive_files.extend( glob.glob(os.path.join(archive_dir, "**", "*.eml*"), recursive=True) ) if not archive_files: progress.update(task_id, total=0, completed=0) progress.console.print("No messages to archive") return progress.update(task_id, total=len(archive_files)) # Get archive folder ID from server folder_response = await fetch_with_aiohttp( "https://graph.microsoft.com/v1.0/me/mailFolders", headers ) folders = folder_response.get("value", []) archive_folder_id = next( ( folder.get("id") for folder in folders if folder.get("displayName", "").lower() in ["archive", "archives"] ), None, ) if not archive_folder_id: raise Exception("No folder named 'Archive' or 'Archives' found on the server.") # Process files in batches of 20 (Microsoft Graph batch limit) batch_size = 20 successful_moves = [] for i in range(0, len(archive_files), batch_size): batch_files = archive_files[i : i + batch_size] # Add small delay between batches to respect API limits if i > 0: await asyncio.sleep(0.5) if not dry_run: # Prepare batch requests batch_requests = [] for idx, filepath in enumerate(batch_files): message_id = os.path.basename(filepath).split(".")[0] batch_requests.append( { "id": str(idx + 1), "method": "POST", "url": f"/me/messages/{message_id}/microsoft.graph.move", "body": {"destinationId": archive_folder_id}, "headers": {"Content-Type": "application/json"}, } ) try: # Execute batch request batch_response = await batch_with_aiohttp(batch_requests, headers) # Process batch results for response in batch_response.get("responses", []): request_id = ( int(response["id"]) - 1 ) # Convert back to 0-based index filepath = batch_files[request_id] message_id = os.path.basename(filepath).split(".")[0] status = response["status"] if status == 201: # 201 Created indicates successful move os.remove( filepath ) # Remove the local file since it's now archived on server successful_moves.append(message_id) progress.console.print( f"Moved message to 'Archive': {message_id}" ) elif status == 404: os.remove( filepath ) # Remove the file from local archive if not found on server progress.console.print( f"Message not found on server, removed local copy: {message_id}" ) else: progress.console.print( f"Failed to move message to 'Archive': {message_id}, status: {status}" ) except Exception as e: progress.console.print(f"Batch archive request failed: {str(e)}") # Fall back to individual requests for this batch for filepath in batch_files: message_id = os.path.basename(filepath).split(".")[0] try: status = await post_with_aiohttp( f"https://graph.microsoft.com/v1.0/me/messages/{message_id}/microsoft.graph.move", headers, {"destinationId": archive_folder_id}, ) if status == 201: os.remove(filepath) successful_moves.append(message_id) progress.console.print( f"Moved message to 'Archive' (fallback): {message_id}" ) elif status == 404: os.remove(filepath) progress.console.print( f"Message not found on server, removed local copy: {message_id}" ) else: progress.console.print( f"Failed to move message to 'Archive': {message_id}, status: {status}" ) except Exception as individual_error: progress.console.print( f"Failed to archive {message_id}: {str(individual_error)}" ) else: # Dry run - just log what would be done for filepath in batch_files: message_id = os.path.basename(filepath).split(".")[0] progress.console.print( f"[DRY-RUN] Would move message to 'Archive' folder: {message_id}" ) progress.advance(task_id, len(batch_files)) if not dry_run: progress.console.print( f"Successfully archived {len(successful_moves)} messages in batches" ) return async def delete_mail_async(maildir_path, headers, progress, task_id, dry_run=False): """ Delete mail from Maildir and Microsoft Graph API using batch operations. Args: maildir_path (str): Path to the Maildir. headers (dict): Headers including authentication. progress: Progress instance for updating progress bars. task_id: ID of the task in the progress bar. dry_run (bool): If True, don't actually make changes. Returns: None """ trash_dir = os.path.join(maildir_path, ".Trash", "cur") trash_files = set(glob.glob(os.path.join(trash_dir, "*.eml*"))) if not trash_files: progress.update(task_id, total=0, completed=0) progress.console.print("No messages to delete") return progress.update(task_id, total=len(trash_files)) # Process files in batches of 20 (Microsoft Graph batch limit) batch_size = 20 trash_files_list = list(trash_files) successful_deletes = [] for i in range(0, len(trash_files_list), batch_size): batch_files = trash_files_list[i : i + batch_size] # Add small delay between batches to respect API limits if i > 0: await asyncio.sleep(0.5) if not dry_run: # Prepare batch requests batch_requests = [] for idx, filepath in enumerate(batch_files): message_id = os.path.basename(filepath).split(".")[0] batch_requests.append( { "id": str(idx + 1), "method": "DELETE", "url": f"/me/messages/{message_id}", } ) try: # Execute batch request batch_response = await batch_with_aiohttp(batch_requests, headers) # Process batch results for response in batch_response.get("responses", []): request_id = ( int(response["id"]) - 1 ) # Convert back to 0-based index filepath = batch_files[request_id] message_id = os.path.basename(filepath).split(".")[0] status = response["status"] if ( status == 204 or status == 404 ): # 204 No Content or 404 Not Found (already deleted) os.remove(filepath) # Remove the file from local trash successful_deletes.append(message_id) progress.console.print(f"Deleted message: {message_id}") else: progress.console.print( f"Failed to delete message: {message_id}, status: {status}" ) except Exception as e: progress.console.print(f"Batch delete request failed: {str(e)}") # Fall back to individual requests for this batch for filepath in batch_files: message_id = os.path.basename(filepath).split(".")[0] try: status = await delete_with_aiohttp( f"https://graph.microsoft.com/v1.0/me/messages/{message_id}", headers, ) if status == 204 or status == 404: os.remove(filepath) successful_deletes.append(message_id) progress.console.print( f"Deleted message (fallback): {message_id}" ) else: progress.console.print( f"Failed to delete message: {message_id}, status: {status}" ) except Exception as individual_error: progress.console.print( f"Failed to delete {message_id}: {str(individual_error)}" ) else: # Dry run - just log what would be done for filepath in batch_files: message_id = os.path.basename(filepath).split(".")[0] progress.console.print(f"[DRY-RUN] Would delete message: {message_id}") progress.advance(task_id, len(batch_files)) if not dry_run: progress.console.print( f"Successfully deleted {len(successful_deletes)} messages in batches" ) async def get_inbox_count_async(headers): """ Get the number of messages in the inbox. Args: headers (dict): Headers including authentication. Returns: int: The number of messages in the inbox. """ inbox_url = "https://graph.microsoft.com/v1.0/me/mailFolders/inbox" response = await fetch_with_aiohttp(inbox_url, headers) return response.get("totalItemCount", 0) async def synchronize_maildir_async( maildir_path, headers, progress, task_id, dry_run=False ): """ Synchronize Maildir with Microsoft Graph API using batch operations. Args: maildir_path (str): Path to the Maildir. headers (dict): Headers including authentication. progress: Progress instance for updating progress bars. task_id: ID of the task in the progress bar. dry_run (bool): If True, don't actually make changes. Returns: None """ from src.utils.mail_utils.helpers import ( load_last_sync_timestamp, save_sync_timestamp, truncate_id, ) last_sync = load_last_sync_timestamp() # Find messages moved from "new" to "cur" and mark them as read new_dir = os.path.join(maildir_path, "new") cur_dir = os.path.join(maildir_path, "cur") new_files = set(glob.glob(os.path.join(new_dir, "*.eml*"))) cur_files = set(glob.glob(os.path.join(cur_dir, "*.eml*"))) moved_to_cur = [os.path.basename(f) for f in cur_files - new_files] # Filter out files that haven't been modified since last sync files_to_process = [] for filename in moved_to_cur: if os.path.getmtime(os.path.join(cur_dir, filename)) >= last_sync: files_to_process.append(filename) if not files_to_process: progress.update(task_id, total=0, completed=0) progress.console.print("No messages to mark as read") # Save timestamp even if no work was done if not dry_run: save_sync_timestamp() return progress.update(task_id, total=len(files_to_process)) # Process files in batches of 20 (Microsoft Graph batch limit) batch_size = 20 successful_reads = [] for i in range(0, len(files_to_process), batch_size): batch_files = files_to_process[i : i + batch_size] # Add small delay between batches to respect API limits if i > 0: await asyncio.sleep(0.5) if not dry_run: # Prepare batch requests batch_requests = [] for idx, filename in enumerate(batch_files): message_id = re.sub(r"\:2.+", "", filename.split(".")[0]) batch_requests.append( { "id": str(idx + 1), "method": "PATCH", "url": f"/me/messages/{message_id}", "body": {"isRead": True}, "headers": {"Content-Type": "application/json"}, } ) try: # Execute batch request batch_response = await batch_with_aiohttp(batch_requests, headers) # Process batch results for response in batch_response.get("responses", []): request_id = ( int(response["id"]) - 1 ) # Convert back to 0-based index filename = batch_files[request_id] message_id = re.sub(r"\:2.+", "", filename.split(".")[0]) status = response["status"] if status == 200: # 200 OK indicates successful update successful_reads.append(message_id) progress.console.print( f"Marked message as read: {truncate_id(message_id)}" ) elif status == 404: os.remove( os.path.join(cur_dir, filename) ) # Remove file if message doesn't exist on server progress.console.print( f"Message not found on server, removed local copy: {truncate_id(message_id)}" ) else: progress.console.print( f"Failed to mark message as read: {truncate_id(message_id)}, status: {status}" ) except Exception as e: progress.console.print(f"Batch read-status request failed: {str(e)}") # Fall back to individual requests for this batch for filename in batch_files: message_id = re.sub(r"\:2.+", "", filename.split(".")[0]) try: status = await patch_with_aiohttp( f"https://graph.microsoft.com/v1.0/me/messages/{message_id}", headers, {"isRead": True}, ) if status == 200: successful_reads.append(message_id) progress.console.print( f"Marked message as read (fallback): {truncate_id(message_id)}" ) elif status == 404: os.remove(os.path.join(cur_dir, filename)) progress.console.print( f"Message not found on server, removed local copy: {truncate_id(message_id)}" ) else: progress.console.print( f"Failed to mark message as read: {truncate_id(message_id)}, status: {status}" ) except Exception as individual_error: progress.console.print( f"Failed to update read status for {truncate_id(message_id)}: {str(individual_error)}" ) else: # Dry run - just log what would be done for filename in batch_files: message_id = re.sub(r"\:2.+", "", filename.split(".")[0]) progress.console.print( f"[DRY-RUN] Would mark message as read: {truncate_id(message_id)}" ) progress.advance(task_id, len(batch_files)) # Save the current sync timestamp if not dry_run: save_sync_timestamp() progress.console.print( f"Successfully marked {len(successful_reads)} messages as read in batches" ) else: progress.console.print("[DRY-RUN] Would save sync timestamp.")