Improve mail sync performance with connection pooling and larger batches

This commit is contained in:
Bendt
2025-12-19 15:53:34 -05:00
parent aaabd83fc7
commit 8933dadcd0
2 changed files with 176 additions and 138 deletions

View File

@@ -111,7 +111,8 @@ async def fetch_mail_async(
downloaded_count = 0 downloaded_count = 0
# Download messages in parallel batches for better performance # Download messages in parallel batches for better performance
BATCH_SIZE = 5 # Using 10 concurrent downloads with connection pooling for better throughput
BATCH_SIZE = 10
for i in range(0, len(messages_to_download), BATCH_SIZE): for i in range(0, len(messages_to_download), BATCH_SIZE):
# Check if task was cancelled/disabled # Check if task was cancelled/disabled
@@ -487,7 +488,8 @@ async def fetch_archive_mail_async(
downloaded_count = 0 downloaded_count = 0
# Download messages in parallel batches for better performance # Download messages in parallel batches for better performance
BATCH_SIZE = 5 # Using 10 concurrent downloads with connection pooling for better throughput
BATCH_SIZE = 10
for i in range(0, len(messages_to_download), BATCH_SIZE): for i in range(0, len(messages_to_download), BATCH_SIZE):
# Check if task was cancelled/disabled # Check if task was cancelled/disabled

View File

@@ -27,6 +27,31 @@ from src.utils.mail_utils.helpers import (
) )
# Module-level session for reuse
_shared_session: aiohttp.ClientSession | None = None
async def get_shared_session() -> aiohttp.ClientSession:
"""Get or create a shared aiohttp session for connection reuse."""
global _shared_session
if _shared_session is None or _shared_session.closed:
connector = aiohttp.TCPConnector(
limit=20, # Max concurrent connections
limit_per_host=10, # Max connections per host
ttl_dns_cache=300, # Cache DNS for 5 minutes
)
_shared_session = aiohttp.ClientSession(connector=connector)
return _shared_session
async def close_shared_session():
"""Close the shared session when done."""
global _shared_session
if _shared_session and not _shared_session.closed:
await _shared_session.close()
_shared_session = None
async def save_mime_to_maildir_async( async def save_mime_to_maildir_async(
maildir_path, maildir_path,
message, message,
@@ -136,10 +161,13 @@ async def create_mime_message_async(
# First try the direct body content approach # First try the direct body content approach
message_id = message.get("id", "") message_id = message.get("id", "")
# Get shared session for connection reuse
session = await get_shared_session()
try: try:
# First get the message with body content # First get the message with body content
body_url = f"https://graph.microsoft.com/v1.0/me/messages/{message_id}?$select=body,bodyPreview" body_url = f"https://graph.microsoft.com/v1.0/me/messages/{message_id}?$select=body,bodyPreview"
async with aiohttp.ClientSession() as session:
async with session.get(body_url, headers=headers) as response: async with session.get(body_url, headers=headers) as response:
if response.status == 200: if response.status == 200:
body_data = await response.json() body_data = await response.json()
@@ -176,23 +204,25 @@ async def create_mime_message_async(
f"No direct body content for message {truncate_id(message_id)}, trying MIME content..." f"No direct body content for message {truncate_id(message_id)}, trying MIME content..."
) )
await fetch_mime_content( await fetch_mime_content(
mime_msg, message_id, headers, progress mime_msg, message_id, headers, progress, session
) )
else: else:
progress.console.print( progress.console.print(
f"Failed to get message body: {response.status}. Trying MIME content..." f"Failed to get message body: {response.status}. Trying MIME content..."
) )
await fetch_mime_content(mime_msg, message_id, headers, progress) await fetch_mime_content(
mime_msg, message_id, headers, progress, session
)
except Exception as e: except Exception as e:
progress.console.print( progress.console.print(
f"Error getting message body: {e}. Trying MIME content..." f"Error getting message body: {e}. Trying MIME content..."
) )
await fetch_mime_content(mime_msg, message_id, headers, progress) await fetch_mime_content(mime_msg, message_id, headers, progress, session)
# Handle attachments only if we want to download them # Handle attachments only if we want to download them
if download_attachments: if download_attachments:
await add_attachments_async( await add_attachments_async(
mime_msg, message, headers, attachments_dir, progress mime_msg, message, headers, attachments_dir, progress, session
) )
else: else:
# Add a header to indicate attachment info was skipped # Add a header to indicate attachment info was skipped
@@ -201,7 +231,7 @@ async def create_mime_message_async(
return mime_msg return mime_msg
async def fetch_mime_content(mime_msg, message_id, headers, progress): async def fetch_mime_content(mime_msg, message_id, headers, progress, session=None):
""" """
Fetch and add MIME content to a message when direct body access fails. Fetch and add MIME content to a message when direct body access fails.
@@ -210,13 +240,17 @@ async def fetch_mime_content(mime_msg, message_id, headers, progress):
message_id (str): Message ID. message_id (str): Message ID.
headers (dict): Headers including authentication. headers (dict): Headers including authentication.
progress: Progress instance for updating progress bars. progress: Progress instance for updating progress bars.
session (aiohttp.ClientSession, optional): Shared session to use.
""" """
# Fallback to getting the MIME content # Fallback to getting the MIME content
message_content_url = ( message_content_url = (
f"https://graph.microsoft.com/v1.0/me/messages/{message_id}/$value" f"https://graph.microsoft.com/v1.0/me/messages/{message_id}/$value"
) )
try: try:
async with aiohttp.ClientSession() as session: # Use provided session or get shared session
if session is None:
session = await get_shared_session()
async with session.get(message_content_url, headers=headers) as response: async with session.get(message_content_url, headers=headers) as response:
if response.status == 200: if response.status == 200:
full_content = await response.text() full_content = await response.text()
@@ -275,7 +309,9 @@ async def fetch_mime_content(mime_msg, message_id, headers, progress):
mime_msg.attach(MIMEText(f"Failed to retrieve message body: {str(e)}", "plain")) mime_msg.attach(MIMEText(f"Failed to retrieve message body: {str(e)}", "plain"))
async def add_attachments_async(mime_msg, message, headers, attachments_dir, progress): async def add_attachments_async(
mime_msg, message, headers, attachments_dir, progress, session=None
):
""" """
Add attachments to a MIME message. Add attachments to a MIME message.
@@ -285,6 +321,7 @@ async def add_attachments_async(mime_msg, message, headers, attachments_dir, pro
headers (dict): Headers including authentication. headers (dict): Headers including authentication.
attachments_dir (str): Path to save attachments. attachments_dir (str): Path to save attachments.
progress: Progress instance for updating progress bars. progress: Progress instance for updating progress bars.
session (aiohttp.ClientSession, optional): Shared session to use.
Returns: Returns:
None None
@@ -296,7 +333,10 @@ async def add_attachments_async(mime_msg, message, headers, attachments_dir, pro
f"https://graph.microsoft.com/v1.0/me/messages/{message_id}/attachments" f"https://graph.microsoft.com/v1.0/me/messages/{message_id}/attachments"
) )
async with aiohttp.ClientSession() as session: # Use provided session or get shared session
if session is None:
session = await get_shared_session()
async with session.get(attachments_url, headers=headers) as response: async with session.get(attachments_url, headers=headers) as response:
if response.status != 200: if response.status != 200:
return return
@@ -316,9 +356,7 @@ async def add_attachments_async(mime_msg, message, headers, attachments_dir, pro
for idx, attachment in enumerate(attachments): for idx, attachment in enumerate(attachments):
attachment_name = safe_filename(attachment.get("name", "attachment")) attachment_name = safe_filename(attachment.get("name", "attachment"))
attachment_type = attachment.get( attachment_type = attachment.get("contentType", "application/octet-stream")
"contentType", "application/octet-stream"
)
# Add attachment info to headers for reference # Add attachment info to headers for reference
mime_msg[f"X-Attachment-{idx + 1}-Name"] = attachment_name mime_msg[f"X-Attachment-{idx + 1}-Name"] = attachment_name
@@ -331,9 +369,7 @@ async def add_attachments_async(mime_msg, message, headers, attachments_dir, pro
attachment_content = base64.b64decode(attachment["contentBytes"]) attachment_content = base64.b64decode(attachment["contentBytes"])
# Save attachment to disk # Save attachment to disk
attachment_path = os.path.join( attachment_path = os.path.join(message_attachments_dir, attachment_name)
message_attachments_dir, attachment_name
)
with open(attachment_path, "wb") as f: with open(attachment_path, "wb") as f:
f.write(attachment_content) f.write(attachment_content)