From 8933dadcd010891a5c6cfddfe74e4d3575f431a9 Mon Sep 17 00:00:00 2001 From: Bendt Date: Fri, 19 Dec 2025 15:53:34 -0500 Subject: [PATCH] Improve mail sync performance with connection pooling and larger batches --- src/services/microsoft_graph/mail.py | 6 +- src/utils/mail_utils/maildir.py | 308 +++++++++++++++------------ 2 files changed, 176 insertions(+), 138 deletions(-) diff --git a/src/services/microsoft_graph/mail.py b/src/services/microsoft_graph/mail.py index deeeddb..26948b5 100644 --- a/src/services/microsoft_graph/mail.py +++ b/src/services/microsoft_graph/mail.py @@ -111,7 +111,8 @@ async def fetch_mail_async( downloaded_count = 0 # Download messages in parallel batches for better performance - BATCH_SIZE = 5 + # Using 10 concurrent downloads with connection pooling for better throughput + BATCH_SIZE = 10 for i in range(0, len(messages_to_download), BATCH_SIZE): # Check if task was cancelled/disabled @@ -487,7 +488,8 @@ async def fetch_archive_mail_async( downloaded_count = 0 # Download messages in parallel batches for better performance - BATCH_SIZE = 5 + # Using 10 concurrent downloads with connection pooling for better throughput + BATCH_SIZE = 10 for i in range(0, len(messages_to_download), BATCH_SIZE): # Check if task was cancelled/disabled diff --git a/src/utils/mail_utils/maildir.py b/src/utils/mail_utils/maildir.py index a846a1e..a2d83c2 100644 --- a/src/utils/mail_utils/maildir.py +++ b/src/utils/mail_utils/maildir.py @@ -27,6 +27,31 @@ from src.utils.mail_utils.helpers import ( ) +# Module-level session for reuse +_shared_session: aiohttp.ClientSession | None = None + + +async def get_shared_session() -> aiohttp.ClientSession: + """Get or create a shared aiohttp session for connection reuse.""" + global _shared_session + if _shared_session is None or _shared_session.closed: + connector = aiohttp.TCPConnector( + limit=20, # Max concurrent connections + limit_per_host=10, # Max connections per host + ttl_dns_cache=300, # Cache DNS for 5 minutes + ) + _shared_session = aiohttp.ClientSession(connector=connector) + return _shared_session + + +async def close_shared_session(): + """Close the shared session when done.""" + global _shared_session + if _shared_session and not _shared_session.closed: + await _shared_session.close() + _shared_session = None + + async def save_mime_to_maildir_async( maildir_path, message, @@ -136,63 +161,68 @@ async def create_mime_message_async( # First try the direct body content approach message_id = message.get("id", "") + + # Get shared session for connection reuse + session = await get_shared_session() + try: # First get the message with body content body_url = f"https://graph.microsoft.com/v1.0/me/messages/{message_id}?$select=body,bodyPreview" - async with aiohttp.ClientSession() as session: - async with session.get(body_url, headers=headers) as response: - if response.status == 200: - body_data = await response.json() + async with session.get(body_url, headers=headers) as response: + if response.status == 200: + body_data = await response.json() - # Get body content - body_content = body_data.get("body", {}).get("content", "") - body_type = body_data.get("body", {}).get("contentType", "text") - body_preview = body_data.get("bodyPreview", "") + # Get body content + body_content = body_data.get("body", {}).get("content", "") + body_type = body_data.get("body", {}).get("contentType", "text") + body_preview = body_data.get("bodyPreview", "") - # If we have body content, use it - if body_content: - if body_type.lower() == "html": - # Add both HTML and plain text versions - # Plain text conversion - plain_text = re.sub(r"", "\n", body_content) - plain_text = re.sub(r"<[^>]*>", "", plain_text) + # If we have body content, use it + if body_content: + if body_type.lower() == "html": + # Add both HTML and plain text versions + # Plain text conversion + plain_text = re.sub(r"", "\n", body_content) + plain_text = re.sub(r"<[^>]*>", "", plain_text) - mime_msg.attach(MIMEText(plain_text, "plain")) - mime_msg.attach(MIMEText(body_content, "html")) - else: - # Just plain text - mime_msg.attach(MIMEText(body_content, "plain")) - elif body_preview: - # Use preview if we have it - mime_msg.attach( - MIMEText( - f"{body_preview}\n\n[Message preview only. Full content not available.]", - "plain", - ) - ) + mime_msg.attach(MIMEText(plain_text, "plain")) + mime_msg.attach(MIMEText(body_content, "html")) else: - # Fallback to MIME content - progress.console.print( - f"No direct body content for message {truncate_id(message_id)}, trying MIME content..." + # Just plain text + mime_msg.attach(MIMEText(body_content, "plain")) + elif body_preview: + # Use preview if we have it + mime_msg.attach( + MIMEText( + f"{body_preview}\n\n[Message preview only. Full content not available.]", + "plain", ) - await fetch_mime_content( - mime_msg, message_id, headers, progress - ) - else: - progress.console.print( - f"Failed to get message body: {response.status}. Trying MIME content..." ) - await fetch_mime_content(mime_msg, message_id, headers, progress) + else: + # Fallback to MIME content + progress.console.print( + f"No direct body content for message {truncate_id(message_id)}, trying MIME content..." + ) + await fetch_mime_content( + mime_msg, message_id, headers, progress, session + ) + else: + progress.console.print( + f"Failed to get message body: {response.status}. Trying MIME content..." + ) + await fetch_mime_content( + mime_msg, message_id, headers, progress, session + ) except Exception as e: progress.console.print( f"Error getting message body: {e}. Trying MIME content..." ) - await fetch_mime_content(mime_msg, message_id, headers, progress) + await fetch_mime_content(mime_msg, message_id, headers, progress, session) # Handle attachments only if we want to download them if download_attachments: await add_attachments_async( - mime_msg, message, headers, attachments_dir, progress + mime_msg, message, headers, attachments_dir, progress, session ) else: # Add a header to indicate attachment info was skipped @@ -201,7 +231,7 @@ async def create_mime_message_async( return mime_msg -async def fetch_mime_content(mime_msg, message_id, headers, progress): +async def fetch_mime_content(mime_msg, message_id, headers, progress, session=None): """ Fetch and add MIME content to a message when direct body access fails. @@ -210,72 +240,78 @@ async def fetch_mime_content(mime_msg, message_id, headers, progress): message_id (str): Message ID. headers (dict): Headers including authentication. progress: Progress instance for updating progress bars. + session (aiohttp.ClientSession, optional): Shared session to use. """ # Fallback to getting the MIME content message_content_url = ( f"https://graph.microsoft.com/v1.0/me/messages/{message_id}/$value" ) try: - async with aiohttp.ClientSession() as session: - async with session.get(message_content_url, headers=headers) as response: - if response.status == 200: - full_content = await response.text() + # Use provided session or get shared session + if session is None: + session = await get_shared_session() - # Check for body tags - body_match = re.search( - r"]*>(.*?)", + async with session.get(message_content_url, headers=headers) as response: + if response.status == 200: + full_content = await response.text() + + # Check for body tags + body_match = re.search( + r"]*>(.*?)", + full_content, + re.DOTALL | re.IGNORECASE, + ) + if body_match: + body_content = body_match.group(1) + # Simple HTML to text conversion + body_text = re.sub(r"", "\n", body_content) + body_text = re.sub(r"<[^>]*>", "", body_text) + + # Add the plain text body + mime_msg.attach(MIMEText(body_text, "plain")) + + # Also add the HTML body + mime_msg.attach(MIMEText(full_content, "html")) + else: + # Fallback - try to find content between Content-Type: text/html and next boundary + html_parts = re.findall( + r"Content-Type: text/html.*?\r?\n\r?\n(.*?)(?:\r?\n\r?\n|$)", full_content, re.DOTALL | re.IGNORECASE, ) - if body_match: - body_content = body_match.group(1) - # Simple HTML to text conversion - body_text = re.sub(r"", "\n", body_content) - body_text = re.sub(r"<[^>]*>", "", body_text) + if html_parts: + html_content = html_parts[0] + mime_msg.attach(MIMEText(html_content, "html")) - # Add the plain text body - mime_msg.attach(MIMEText(body_text, "plain")) - - # Also add the HTML body - mime_msg.attach(MIMEText(full_content, "html")) + # Also make plain text version + plain_text = re.sub(r"", "\n", html_content) + plain_text = re.sub(r"<[^>]*>", "", plain_text) + mime_msg.attach(MIMEText(plain_text, "plain")) else: - # Fallback - try to find content between Content-Type: text/html and next boundary - html_parts = re.findall( - r"Content-Type: text/html.*?\r?\n\r?\n(.*?)(?:\r?\n\r?\n|$)", - full_content, - re.DOTALL | re.IGNORECASE, - ) - if html_parts: - html_content = html_parts[0] - mime_msg.attach(MIMEText(html_content, "html")) - - # Also make plain text version - plain_text = re.sub(r"", "\n", html_content) - plain_text = re.sub(r"<[^>]*>", "", plain_text) - mime_msg.attach(MIMEText(plain_text, "plain")) - else: - # Just use the raw content as text if nothing else works - mime_msg.attach(MIMEText(full_content, "plain")) - progress.console.print( - f"Using raw content for message {message_id} - no body tags found" - ) - else: - error_text = await response.text() - progress.console.print( - f"Failed to get MIME content: {response.status} {error_text}" - ) - mime_msg.attach( - MIMEText( - f"Failed to retrieve message body: HTTP {response.status}", - "plain", + # Just use the raw content as text if nothing else works + mime_msg.attach(MIMEText(full_content, "plain")) + progress.console.print( + f"Using raw content for message {message_id} - no body tags found" ) + else: + error_text = await response.text() + progress.console.print( + f"Failed to get MIME content: {response.status} {error_text}" + ) + mime_msg.attach( + MIMEText( + f"Failed to retrieve message body: HTTP {response.status}", + "plain", ) + ) except Exception as e: progress.console.print(f"Error retrieving MIME content: {e}") mime_msg.attach(MIMEText(f"Failed to retrieve message body: {str(e)}", "plain")) -async def add_attachments_async(mime_msg, message, headers, attachments_dir, progress): +async def add_attachments_async( + mime_msg, message, headers, attachments_dir, progress, session=None +): """ Add attachments to a MIME message. @@ -285,6 +321,7 @@ async def add_attachments_async(mime_msg, message, headers, attachments_dir, pro headers (dict): Headers including authentication. attachments_dir (str): Path to save attachments. progress: Progress instance for updating progress bars. + session (aiohttp.ClientSession, optional): Shared session to use. Returns: None @@ -296,58 +333,57 @@ async def add_attachments_async(mime_msg, message, headers, attachments_dir, pro f"https://graph.microsoft.com/v1.0/me/messages/{message_id}/attachments" ) - async with aiohttp.ClientSession() as session: - async with session.get(attachments_url, headers=headers) as response: - if response.status != 200: - return + # Use provided session or get shared session + if session is None: + session = await get_shared_session() - attachments_data = await response.json() - attachments = attachments_data.get("value", []) + async with session.get(attachments_url, headers=headers) as response: + if response.status != 200: + return - if not attachments: - return + attachments_data = await response.json() + attachments = attachments_data.get("value", []) - # Create a directory for this message's attachments - message_attachments_dir = os.path.join(attachments_dir, message_id) - ensure_directory_exists(message_attachments_dir) + if not attachments: + return - # Add a header with attachment count - mime_msg["X-Attachment-Count"] = str(len(attachments)) + # Create a directory for this message's attachments + message_attachments_dir = os.path.join(attachments_dir, message_id) + ensure_directory_exists(message_attachments_dir) - for idx, attachment in enumerate(attachments): - attachment_name = safe_filename(attachment.get("name", "attachment")) - attachment_type = attachment.get( - "contentType", "application/octet-stream" + # Add a header with attachment count + mime_msg["X-Attachment-Count"] = str(len(attachments)) + + for idx, attachment in enumerate(attachments): + attachment_name = safe_filename(attachment.get("name", "attachment")) + attachment_type = attachment.get("contentType", "application/octet-stream") + + # Add attachment info to headers for reference + mime_msg[f"X-Attachment-{idx + 1}-Name"] = attachment_name + mime_msg[f"X-Attachment-{idx + 1}-Type"] = attachment_type + + attachment_part = MIMEBase(*attachment_type.split("/", 1)) + + # Get attachment content + if "contentBytes" in attachment: + attachment_content = base64.b64decode(attachment["contentBytes"]) + + # Save attachment to disk + attachment_path = os.path.join(message_attachments_dir, attachment_name) + with open(attachment_path, "wb") as f: + f.write(attachment_content) + + # Add to MIME message + attachment_part.set_payload(attachment_content) + encoders.encode_base64(attachment_part) + attachment_part.add_header( + "Content-Disposition", + f'attachment; filename="{attachment_name}"', ) + mime_msg.attach(attachment_part) - # Add attachment info to headers for reference - mime_msg[f"X-Attachment-{idx + 1}-Name"] = attachment_name - mime_msg[f"X-Attachment-{idx + 1}-Type"] = attachment_type - - attachment_part = MIMEBase(*attachment_type.split("/", 1)) - - # Get attachment content - if "contentBytes" in attachment: - attachment_content = base64.b64decode(attachment["contentBytes"]) - - # Save attachment to disk - attachment_path = os.path.join( - message_attachments_dir, attachment_name - ) - with open(attachment_path, "wb") as f: - f.write(attachment_content) - - # Add to MIME message - attachment_part.set_payload(attachment_content) - encoders.encode_base64(attachment_part) - attachment_part.add_header( - "Content-Disposition", - f'attachment; filename="{attachment_name}"', - ) - mime_msg.attach(attachment_part) - - progress.console.print(f"Downloaded attachment: {attachment_name}") - else: - progress.console.print( - f"Skipping attachment with no content: {attachment_name}" - ) + progress.console.print(f"Downloaded attachment: {attachment_name}") + else: + progress.console.print( + f"Skipping attachment with no content: {attachment_name}" + )