""" Mail operations for Microsoft Graph API. """ import os import re import glob import json import asyncio from email.parser import Parser from email.utils import getaddresses from typing import List, Dict, Any, Set from .client import ( fetch_with_aiohttp, patch_with_aiohttp, post_with_aiohttp, delete_with_aiohttp, batch_with_aiohttp, ) async def fetch_mail_async( maildir_path, attachments_dir, headers, progress, task_id, dry_run=False, download_attachments=False, is_cancelled=None, ): """ Fetch mail from Microsoft Graph API and save to Maildir. Args: maildir_path (str): Path to the Maildir. attachments_dir (str): Path to save attachments. headers (dict): Headers including authentication. progress: Progress instance for updating progress bars. task_id: ID of the task in the progress bar. dry_run (bool): If True, don't actually make changes. download_attachments (bool): If True, download email attachments. is_cancelled (callable, optional): Callback that returns True if task should stop. Returns: None """ from src.utils.mail_utils.maildir import save_mime_to_maildir_async mail_url = "https://graph.microsoft.com/v1.0/me/mailFolders/inbox/messages?$top=100&$orderby=receivedDateTime asc&$select=id,subject,from,toRecipients,ccRecipients,receivedDateTime,isRead" messages = [] # Fetch the total count of messages in the inbox inbox_url = "https://graph.microsoft.com/v1.0/me/mailFolders/inbox" response = await fetch_with_aiohttp(inbox_url, headers) total_messages = response.get("totalItemCount", 0) progress.update(task_id, total=total_messages) while mail_url: try: response_data = await fetch_with_aiohttp(mail_url, headers) except Exception as e: progress.console.print(f"Error fetching messages: {e}") continue messages.extend(response_data.get("value", [])) progress.advance(task_id, len(response_data.get("value", []))) # Get the next page URL from @odata.nextLink mail_url = response_data.get("@odata.nextLink") inbox_msg_ids = set(message["id"] for message in messages) progress.update(task_id, completed=(len(messages) / 2)) new_dir = os.path.join(maildir_path, "new") cur_dir = os.path.join(maildir_path, "cur") new_files = set(glob.glob(os.path.join(new_dir, "*.eml*"))) cur_files = set(glob.glob(os.path.join(cur_dir, "*.eml*"))) # Get local message IDs (filename without extension) local_msg_ids = set() for filename in set.union(cur_files, new_files): message_id = os.path.basename(filename).split(".")[ 0 ] # Extract the Message-ID from the filename local_msg_ids.add(message_id) # Delete local files that no longer exist on server for filename in set.union(cur_files, new_files): message_id = os.path.basename(filename).split(".")[ 0 ] # Extract the Message-ID from the filename if message_id not in inbox_msg_ids: if not dry_run: progress.console.print(f"Deleting {filename} from inbox") os.remove(filename) else: progress.console.print(f"[DRY-RUN] Would delete {filename} from inbox") # Filter messages to only include those not already local messages_to_download = [msg for msg in messages if msg["id"] not in local_msg_ids] progress.console.print( f"Found {len(messages)} total messages on server, {len(local_msg_ids)} already local" ) progress.console.print(f"Downloading {len(messages_to_download)} new messages") # Update progress to reflect only the messages we actually need to download progress.update(task_id, total=len(messages_to_download), completed=0) downloaded_count = 0 # Download messages in parallel batches for better performance BATCH_SIZE = 5 for i in range(0, len(messages_to_download), BATCH_SIZE): # Check if task was cancelled/disabled if is_cancelled and is_cancelled(): progress.console.print("Task cancelled, stopping inbox fetch") break batch = messages_to_download[i : i + BATCH_SIZE] # Create tasks for parallel download async def download_message(message): progress.console.print( f"Processing message: {message.get('subject', 'No Subject')[:50]}", end="\r", ) await save_mime_to_maildir_async( maildir_path, message, attachments_dir, headers, progress, dry_run, download_attachments, ) return 1 # Execute batch in parallel tasks = [download_message(msg) for msg in batch] results = await asyncio.gather(*tasks, return_exceptions=True) # Count successful downloads batch_success = sum(1 for r in results if r == 1) downloaded_count += batch_success progress.update(task_id, advance=len(batch)) # Log any errors for idx, result in enumerate(results): if isinstance(result, Exception): progress.console.print(f"Error downloading message: {result}") progress.update(task_id, completed=downloaded_count) progress.console.print(f"\nFinished downloading {downloaded_count} new messages.") progress.console.print( f"Total messages on server: {len(messages)}, Already local: {len(local_msg_ids)}" ) def _get_archive_sync_state_path(maildir_path: str) -> str: """Get the path to the archive sync state file.""" return os.path.join(maildir_path, ".Archive", ".sync_state.json") def _load_archive_sync_state(maildir_path: str) -> Set[str]: """Load the set of message IDs that have been synced to server.""" state_path = _get_archive_sync_state_path(maildir_path) if os.path.exists(state_path): try: with open(state_path, "r") as f: data = json.load(f) return set(data.get("synced_to_server", [])) except Exception: pass return set() def _save_archive_sync_state(maildir_path: str, synced_ids: Set[str]) -> None: """Save the set of message IDs that have been synced to server.""" state_path = _get_archive_sync_state_path(maildir_path) os.makedirs(os.path.dirname(state_path), exist_ok=True) with open(state_path, "w") as f: json.dump({"synced_to_server": list(synced_ids)}, f, indent=2) async def archive_mail_async( maildir_path, headers, progress, task_id, dry_run=False, is_cancelled=None ): """ Archive mail from Maildir to Microsoft Graph API archive folder using batch operations. Messages are moved to the server's Archive folder, but local copies are kept. A sync state file tracks which messages have already been synced to avoid re-processing them on subsequent runs. Args: maildir_path (str): Path to the Maildir. headers (dict): Headers including authentication. progress: Progress instance for updating progress bars. task_id: ID of the task in the progress bar. dry_run (bool): If True, don't actually make changes. is_cancelled (callable, optional): Callback that returns True if task should stop. Returns: None """ # Load already-synced message IDs synced_ids = _load_archive_sync_state(maildir_path) # Check both possible archive folder names locally (prefer .Archive) archive_files = [] for archive_folder_name in [".Archive", ".Archives"]: archive_dir = os.path.join(maildir_path, archive_folder_name) if os.path.exists(archive_dir): archive_files.extend( glob.glob(os.path.join(archive_dir, "**", "*.eml*"), recursive=True) ) # Filter out already-synced messages files_to_sync = [] for filepath in archive_files: message_id = os.path.basename(filepath).split(".")[0] if message_id not in synced_ids: files_to_sync.append(filepath) if not files_to_sync: progress.update(task_id, total=0, completed=0) progress.console.print( f"No new messages to archive ({len(archive_files)} already synced)" ) return progress.update(task_id, total=len(files_to_sync)) progress.console.print( f"Found {len(files_to_sync)} new messages to sync to server Archive" ) # Get archive folder ID from server folder_response = await fetch_with_aiohttp( "https://graph.microsoft.com/v1.0/me/mailFolders", headers ) folders = folder_response.get("value", []) archive_folder_id = next( ( folder.get("id") for folder in folders if folder.get("displayName", "").lower() in ["archive", "archives"] ), None, ) if not archive_folder_id: raise Exception("No folder named 'Archive' or 'Archives' found on the server.") # Process files in batches of 20 (Microsoft Graph batch limit) batch_size = 20 successful_moves = [] newly_synced_ids: Set[str] = set() for i in range(0, len(files_to_sync), batch_size): # Check if task was cancelled/disabled if is_cancelled and is_cancelled(): progress.console.print("Task cancelled, stopping archive sync") break batch_files = files_to_sync[i : i + batch_size] # Add small delay between batches to respect API limits if i > 0: await asyncio.sleep(0.5) if not dry_run: # Prepare batch requests batch_requests = [] for idx, filepath in enumerate(batch_files): message_id = os.path.basename(filepath).split(".")[0] batch_requests.append( { "id": str(idx + 1), "method": "POST", "url": f"/me/messages/{message_id}/microsoft.graph.move", "body": {"destinationId": archive_folder_id}, "headers": {"Content-Type": "application/json"}, } ) try: # Execute batch request batch_response = await batch_with_aiohttp(batch_requests, headers) # Process batch results for response in batch_response.get("responses", []): request_id = ( int(response["id"]) - 1 ) # Convert back to 0-based index filepath = batch_files[request_id] message_id = os.path.basename(filepath).split(".")[0] status = response["status"] if status == 201: # 201 Created indicates successful move # Keep local file, just mark as synced newly_synced_ids.add(message_id) successful_moves.append(message_id) progress.console.print( f"Moved message to server Archive: {message_id}" ) elif status == 404: # Message not in Inbox (maybe already archived or deleted on server) # Mark as synced so we don't retry, but keep local copy newly_synced_ids.add(message_id) progress.console.print( f"Message not in Inbox (already archived?): {message_id}" ) else: progress.console.print( f"Failed to move message to Archive: {message_id}, status: {status}" ) except Exception as e: progress.console.print(f"Batch archive request failed: {str(e)}") # Fall back to individual requests for this batch for filepath in batch_files: message_id = os.path.basename(filepath).split(".")[0] try: status = await post_with_aiohttp( f"https://graph.microsoft.com/v1.0/me/messages/{message_id}/microsoft.graph.move", headers, {"destinationId": archive_folder_id}, ) if status == 201: newly_synced_ids.add(message_id) successful_moves.append(message_id) progress.console.print( f"Moved message to server Archive (fallback): {message_id}" ) elif status == 404: newly_synced_ids.add(message_id) progress.console.print( f"Message not in Inbox (already archived?): {message_id}" ) else: progress.console.print( f"Failed to move message to Archive: {message_id}, status: {status}" ) except Exception as individual_error: progress.console.print( f"Failed to archive {message_id}: {str(individual_error)}" ) else: # Dry run - just log what would be done for filepath in batch_files: message_id = os.path.basename(filepath).split(".")[0] progress.console.print( f"[DRY-RUN] Would move message to server Archive: {message_id}" ) progress.advance(task_id, len(batch_files)) # Save sync state after each batch for resilience if not dry_run and newly_synced_ids: synced_ids.update(newly_synced_ids) _save_archive_sync_state(maildir_path, synced_ids) # Final summary if not dry_run and successful_moves: progress.console.print( f"Successfully synced {len(successful_moves)} messages to server Archive (kept local copies)" ) return async def fetch_archive_mail_async( maildir_path, attachments_dir, headers, progress, task_id, dry_run=False, download_attachments=False, max_messages=None, is_cancelled=None, ): """ Fetch archived mail from Microsoft Graph API Archive folder and save to local .Archive Maildir. Args: maildir_path (str): Path to the Maildir. attachments_dir (str): Path to save attachments. headers (dict): Headers including authentication. progress: Progress instance for updating progress bars. task_id: ID of the task in the progress bar. dry_run (bool): If True, don't actually make changes. download_attachments (bool): If True, download email attachments. max_messages (int, optional): Maximum number of messages to fetch. None = all. is_cancelled (callable, optional): Callback that returns True if task should stop. Returns: None """ from src.utils.mail_utils.maildir import save_mime_to_maildir_async # Use the well-known 'archive' folder name mail_url = "https://graph.microsoft.com/v1.0/me/mailFolders/archive/messages?$top=100&$orderby=receivedDateTime desc&$select=id,subject,from,toRecipients,ccRecipients,receivedDateTime,isRead" messages = [] # Fetch the total count of messages in the archive archive_info_url = "https://graph.microsoft.com/v1.0/me/mailFolders/archive" try: response = await fetch_with_aiohttp(archive_info_url, headers) total_messages = response.get("totalItemCount", 0) if response else 0 except Exception as e: progress.console.print(f"Error fetching archive folder info: {e}") total_messages = 0 # Apply max_messages limit if specified effective_total = ( min(total_messages, max_messages) if max_messages else total_messages ) progress.update(task_id, total=effective_total) progress.console.print( f"Archive folder has {total_messages} messages" + (f", fetching up to {max_messages}" if max_messages else "") ) # Fetch messages from archive fetched_count = 0 while mail_url: try: response_data = await fetch_with_aiohttp(mail_url, headers) except Exception as e: progress.console.print(f"Error fetching archive messages: {e}") break batch = response_data.get("value", []) if response_data else [] # Apply max_messages limit if max_messages and fetched_count + len(batch) > max_messages: batch = batch[: max_messages - fetched_count] messages.extend(batch) fetched_count += len(batch) break messages.extend(batch) fetched_count += len(batch) progress.advance(task_id, len(batch)) # Get the next page URL from @odata.nextLink mail_url = response_data.get("@odata.nextLink") if response_data else None # Set up local archive directory paths archive_dir = os.path.join(maildir_path, ".Archive") cur_dir = os.path.join(archive_dir, "cur") new_dir = os.path.join(archive_dir, "new") # Ensure directories exist os.makedirs(cur_dir, exist_ok=True) os.makedirs(new_dir, exist_ok=True) os.makedirs(os.path.join(archive_dir, "tmp"), exist_ok=True) # Get local message IDs in archive cur_files = set(glob.glob(os.path.join(cur_dir, "*.eml*"))) new_files = set(glob.glob(os.path.join(new_dir, "*.eml*"))) local_msg_ids = set() for filename in set.union(cur_files, new_files): message_id = os.path.basename(filename).split(".")[0] local_msg_ids.add(message_id) # Filter messages to only include those not already local messages_to_download = [msg for msg in messages if msg["id"] not in local_msg_ids] progress.console.print( f"Found {len(messages)} messages on server Archive, {len(local_msg_ids)} already local" ) progress.console.print( f"Downloading {len(messages_to_download)} new archived messages" ) # Update progress to reflect only the messages we actually need to download progress.update(task_id, total=len(messages_to_download), completed=0) # Load sync state once, we'll update it after each batch for resilience synced_ids = _load_archive_sync_state(maildir_path) if not dry_run else set() downloaded_count = 0 # Download messages in parallel batches for better performance BATCH_SIZE = 5 for i in range(0, len(messages_to_download), BATCH_SIZE): # Check if task was cancelled/disabled if is_cancelled and is_cancelled(): progress.console.print("Task cancelled, stopping archive fetch") break batch = messages_to_download[i : i + BATCH_SIZE] batch_msg_ids = [] # Create tasks for parallel download async def download_message(message): progress.console.print( f"Processing archived message: {message.get('subject', 'No Subject')[:50]}", end="\r", ) # Save to .Archive folder instead of main maildir await save_mime_to_maildir_async( archive_dir, # Use archive_dir instead of maildir_path message, attachments_dir, headers, progress, dry_run, download_attachments, ) return message["id"] # Execute batch in parallel tasks = [download_message(msg) for msg in batch] results = await asyncio.gather(*tasks, return_exceptions=True) # Process results and collect successful message IDs for result in results: if isinstance(result, Exception): progress.console.print(f"Error downloading archived message: {result}") elif result: batch_msg_ids.append(result) downloaded_count += 1 progress.update(task_id, advance=len(batch)) # Update sync state after each batch (not each message) for resilience + performance if not dry_run and batch_msg_ids: synced_ids.update(batch_msg_ids) _save_archive_sync_state(maildir_path, synced_ids) progress.update(task_id, completed=downloaded_count) progress.console.print( f"\nFinished downloading {downloaded_count} archived messages." ) progress.console.print( f"Total in server Archive: {total_messages}, Already local: {len(local_msg_ids)}" ) # Also add any messages we already had locally (from the full server list) # to ensure they're marked as synced if not dry_run and messages: for msg in messages: synced_ids.add(msg["id"]) _save_archive_sync_state(maildir_path, synced_ids) async def delete_mail_async(maildir_path, headers, progress, task_id, dry_run=False): """ Delete mail from Maildir and Microsoft Graph API using batch operations. Args: maildir_path (str): Path to the Maildir. headers (dict): Headers including authentication. progress: Progress instance for updating progress bars. task_id: ID of the task in the progress bar. dry_run (bool): If True, don't actually make changes. Returns: None """ trash_dir = os.path.join(maildir_path, ".Trash", "cur") trash_files = set(glob.glob(os.path.join(trash_dir, "*.eml*"))) if not trash_files: progress.update(task_id, total=0, completed=0) progress.console.print("No messages to delete") return progress.update(task_id, total=len(trash_files)) # Process files in batches of 20 (Microsoft Graph batch limit) batch_size = 20 trash_files_list = list(trash_files) successful_deletes = [] for i in range(0, len(trash_files_list), batch_size): batch_files = trash_files_list[i : i + batch_size] # Add small delay between batches to respect API limits if i > 0: await asyncio.sleep(0.5) if not dry_run: # Prepare batch requests batch_requests = [] for idx, filepath in enumerate(batch_files): message_id = os.path.basename(filepath).split(".")[0] batch_requests.append( { "id": str(idx + 1), "method": "DELETE", "url": f"/me/messages/{message_id}", } ) try: # Execute batch request batch_response = await batch_with_aiohttp(batch_requests, headers) # Process batch results for response in batch_response.get("responses", []): request_id = ( int(response["id"]) - 1 ) # Convert back to 0-based index filepath = batch_files[request_id] message_id = os.path.basename(filepath).split(".")[0] status = response["status"] if ( status == 204 or status == 404 ): # 204 No Content or 404 Not Found (already deleted) os.remove(filepath) # Remove the file from local trash successful_deletes.append(message_id) progress.console.print(f"Deleted message: {message_id}") else: progress.console.print( f"Failed to delete message: {message_id}, status: {status}" ) except Exception as e: progress.console.print(f"Batch delete request failed: {str(e)}") # Fall back to individual requests for this batch for filepath in batch_files: message_id = os.path.basename(filepath).split(".")[0] try: status = await delete_with_aiohttp( f"https://graph.microsoft.com/v1.0/me/messages/{message_id}", headers, ) if status == 204 or status == 404: os.remove(filepath) successful_deletes.append(message_id) progress.console.print( f"Deleted message (fallback): {message_id}" ) else: progress.console.print( f"Failed to delete message: {message_id}, status: {status}" ) except Exception as individual_error: progress.console.print( f"Failed to delete {message_id}: {str(individual_error)}" ) else: # Dry run - just log what would be done for filepath in batch_files: message_id = os.path.basename(filepath).split(".")[0] progress.console.print(f"[DRY-RUN] Would delete message: {message_id}") progress.advance(task_id, len(batch_files)) if not dry_run: progress.console.print( f"Successfully deleted {len(successful_deletes)} messages in batches" ) async def get_inbox_count_async(headers): """ Get the number of messages in the inbox. Args: headers (dict): Headers including authentication. Returns: int: The number of messages in the inbox. """ inbox_url = "https://graph.microsoft.com/v1.0/me/mailFolders/inbox" response = await fetch_with_aiohttp(inbox_url, headers) return response.get("totalItemCount", 0) async def synchronize_maildir_async( maildir_path, headers, progress, task_id, dry_run=False ): """ Synchronize Maildir with Microsoft Graph API using batch operations. Args: maildir_path (str): Path to the Maildir. headers (dict): Headers including authentication. progress: Progress instance for updating progress bars. task_id: ID of the task in the progress bar. dry_run (bool): If True, don't actually make changes. Returns: None """ from src.utils.mail_utils.helpers import ( load_last_sync_timestamp, save_sync_timestamp, truncate_id, ) last_sync = load_last_sync_timestamp() # Find messages moved from "new" to "cur" and mark them as read new_dir = os.path.join(maildir_path, "new") cur_dir = os.path.join(maildir_path, "cur") new_files = set(glob.glob(os.path.join(new_dir, "*.eml*"))) cur_files = set(glob.glob(os.path.join(cur_dir, "*.eml*"))) moved_to_cur = [os.path.basename(f) for f in cur_files - new_files] # Filter out files that haven't been modified since last sync files_to_process = [] for filename in moved_to_cur: if os.path.getmtime(os.path.join(cur_dir, filename)) >= last_sync: files_to_process.append(filename) if not files_to_process: progress.update(task_id, total=0, completed=0) progress.console.print("No messages to mark as read") # Save timestamp even if no work was done if not dry_run: save_sync_timestamp() return progress.update(task_id, total=len(files_to_process)) # Process files in batches of 20 (Microsoft Graph batch limit) batch_size = 20 successful_reads = [] for i in range(0, len(files_to_process), batch_size): batch_files = files_to_process[i : i + batch_size] # Add small delay between batches to respect API limits if i > 0: await asyncio.sleep(0.5) if not dry_run: # Prepare batch requests batch_requests = [] for idx, filename in enumerate(batch_files): message_id = re.sub(r"\:2.+", "", filename.split(".")[0]) batch_requests.append( { "id": str(idx + 1), "method": "PATCH", "url": f"/me/messages/{message_id}", "body": {"isRead": True}, "headers": {"Content-Type": "application/json"}, } ) try: # Execute batch request batch_response = await batch_with_aiohttp(batch_requests, headers) # Process batch results for response in batch_response.get("responses", []): request_id = ( int(response["id"]) - 1 ) # Convert back to 0-based index filename = batch_files[request_id] message_id = re.sub(r"\:2.+", "", filename.split(".")[0]) status = response["status"] if status == 200: # 200 OK indicates successful update successful_reads.append(message_id) progress.console.print( f"Marked message as read: {truncate_id(message_id)}" ) elif status == 404: os.remove( os.path.join(cur_dir, filename) ) # Remove file if message doesn't exist on server progress.console.print( f"Message not found on server, removed local copy: {truncate_id(message_id)}" ) else: progress.console.print( f"Failed to mark message as read: {truncate_id(message_id)}, status: {status}" ) except Exception as e: progress.console.print(f"Batch read-status request failed: {str(e)}") # Fall back to individual requests for this batch for filename in batch_files: message_id = re.sub(r"\:2.+", "", filename.split(".")[0]) try: status = await patch_with_aiohttp( f"https://graph.microsoft.com/v1.0/me/messages/{message_id}", headers, {"isRead": True}, ) if status == 200: successful_reads.append(message_id) progress.console.print( f"Marked message as read (fallback): {truncate_id(message_id)}" ) elif status == 404: os.remove(os.path.join(cur_dir, filename)) progress.console.print( f"Message not found on server, removed local copy: {truncate_id(message_id)}" ) else: progress.console.print( f"Failed to mark message as read: {truncate_id(message_id)}, status: {status}" ) except Exception as individual_error: progress.console.print( f"Failed to update read status for {truncate_id(message_id)}: {str(individual_error)}" ) else: # Dry run - just log what would be done for filename in batch_files: message_id = re.sub(r"\:2.+", "", filename.split(".")[0]) progress.console.print( f"[DRY-RUN] Would mark message as read: {truncate_id(message_id)}" ) progress.advance(task_id, len(batch_files)) # Save the current sync timestamp if not dry_run: save_sync_timestamp() progress.console.print( f"Successfully marked {len(successful_reads)} messages as read in batches" ) else: progress.console.print("[DRY-RUN] Would save sync timestamp.") def parse_email_for_graph_api(email_content: str) -> Dict[str, Any]: """ Parse email content and convert to Microsoft Graph API message format. Args: email_content: Raw email content (RFC 5322 format) Returns: Dictionary formatted for Microsoft Graph API send message """ parser = Parser() msg = parser.parsestr(email_content) # Parse recipients def parse_recipients(header_value: str) -> List[Dict[str, Any]]: if not header_value: return [] addresses = getaddresses([header_value]) return [ {"emailAddress": {"address": addr, "name": name if name else addr}} for name, addr in addresses if addr ] to_recipients = parse_recipients(msg.get("To", "")) cc_recipients = parse_recipients(msg.get("Cc", "")) bcc_recipients = parse_recipients(msg.get("Bcc", "")) # Get body content body_content = "" body_type = "text" if msg.is_multipart(): for part in msg.walk(): if part.get_content_type() == "text/plain": body_content = part.get_payload(decode=True).decode( "utf-8", errors="ignore" ) body_type = "text" break elif part.get_content_type() == "text/html": body_content = part.get_payload(decode=True).decode( "utf-8", errors="ignore" ) body_type = "html" else: body_content = msg.get_payload(decode=True).decode("utf-8", errors="ignore") if msg.get_content_type() == "text/html": body_type = "html" # Build Graph API message message = { "subject": msg.get("Subject", ""), "body": {"contentType": body_type, "content": body_content}, "toRecipients": to_recipients, "ccRecipients": cc_recipients, "bccRecipients": bcc_recipients, } # Add reply-to if present reply_to = msg.get("Reply-To", "") if reply_to: message["replyTo"] = parse_recipients(reply_to) return message async def send_email_async( email_content: str, headers: Dict[str, str], dry_run: bool = False ) -> bool: """ Send email using Microsoft Graph API. Args: email_content: Raw email content (RFC 5322 format) headers: Authentication headers for Microsoft Graph API dry_run: If True, don't actually send the email Returns: True if email was sent successfully, False otherwise """ try: # Parse email content for Graph API message_data = parse_email_for_graph_api(email_content) if dry_run: print(f"[DRY-RUN] Would send email: {message_data['subject']}") print( f"[DRY-RUN] To: {[r['emailAddress']['address'] for r in message_data['toRecipients']]}" ) return True # Send email via Graph API send_url = "https://graph.microsoft.com/v1.0/me/sendMail" # Log attempt import logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[ logging.FileHandler( os.path.expanduser("~/Mail/sendmail.log"), mode="a" ), ], ) logging.info( f"Attempting to send email: {message_data['subject']} to {[r['emailAddress']['address'] for r in message_data['toRecipients']]}" ) response = await post_with_aiohttp(send_url, headers, {"message": message_data}) # Microsoft Graph sendMail returns 202 Accepted on success if response == 202: logging.info(f"Successfully sent email: {message_data['subject']}") return True else: logging.error( f"Unexpected response code {response} when sending email: {message_data['subject']}" ) return False except Exception as e: import logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[ logging.FileHandler( os.path.expanduser("~/Mail/sendmail.log"), mode="a" ), ], ) logging.error(f"Exception sending email: {e}", exc_info=True) print(f"Error sending email: {e}") return False async def process_outbox_async( maildir_path: str, org: str, headers: Dict[str, str], progress, task_id, dry_run: bool = False, ) -> tuple[int, int]: """ Process outbound emails in the outbox queue. Args: maildir_path: Base maildir path org: Organization name headers: Authentication headers for Microsoft Graph API progress: Progress instance for updating progress bars task_id: ID of the task in the progress bar dry_run: If True, don't actually send emails Returns: Tuple of (successful_sends, failed_sends) """ outbox_path = os.path.join(maildir_path, org, "outbox") new_dir = os.path.join(outbox_path, "new") cur_dir = os.path.join(outbox_path, "cur") failed_dir = os.path.join(outbox_path, "failed") # Ensure directories exist from src.utils.mail_utils.helpers import ensure_directory_exists ensure_directory_exists(failed_dir) # Get pending emails pending_emails = [] if os.path.exists(new_dir): pending_emails = [f for f in os.listdir(new_dir) if not f.startswith(".")] if not pending_emails: progress.update(task_id, total=0, completed=0) return 0, 0 progress.update(task_id, total=len(pending_emails)) progress.console.print( f"Processing {len(pending_emails)} outbound emails for {org}" ) successful_sends = 0 failed_sends = 0 for email_file in pending_emails: email_path = os.path.join(new_dir, email_file) try: # Read email content with open(email_path, "r", encoding="utf-8") as f: email_content = f.read() # Send email if await send_email_async(email_content, headers, dry_run): # Move to cur directory on success if not dry_run: cur_path = os.path.join(cur_dir, email_file) os.rename(email_path, cur_path) progress.console.print(f"✓ Sent email: {email_file}") else: progress.console.print(f"[DRY-RUN] Would send email: {email_file}") successful_sends += 1 else: # Move to failed directory on failure if not dry_run: failed_path = os.path.join(failed_dir, email_file) os.rename(email_path, failed_path) progress.console.print(f"✗ Failed to send email: {email_file}") # Log the failure import logging logging.error(f"Failed to send email: {email_file}") # Send notification about failure from src.utils.notifications import send_notification parser = Parser() msg = parser.parsestr(email_content) subject = msg.get("Subject", "Unknown") send_notification( title="Email Send Failed", message=f"Failed to send: {subject}", subtitle=f"Check {failed_dir}", sound="default", ) failed_sends += 1 except Exception as e: progress.console.print(f"✗ Error processing {email_file}: {e}") if not dry_run: # Move to failed directory failed_path = os.path.join(failed_dir, email_file) try: os.rename(email_path, failed_path) except (OSError, FileNotFoundError): pass # File might already be moved or deleted failed_sends += 1 progress.advance(task_id, 1) if not dry_run and successful_sends > 0: progress.console.print(f"✓ Successfully sent {successful_sends} emails") # Send success notification from src.utils.notifications import send_notification if successful_sends == 1: send_notification( title="Email Sent", message="1 email sent successfully", subtitle=f"from {org}", sound="default", ) else: send_notification( title="Emails Sent", message=f"{successful_sends} emails sent successfully", subtitle=f"from {org}", sound="default", ) if failed_sends > 0: progress.console.print(f"✗ Failed to send {failed_sends} emails") return successful_sends, failed_sends