diff --git a/src/services/microsoft_graph/mail.py b/src/services/microsoft_graph/mail.py
index deeeddb..26948b5 100644
--- a/src/services/microsoft_graph/mail.py
+++ b/src/services/microsoft_graph/mail.py
@@ -111,7 +111,8 @@ async def fetch_mail_async(
downloaded_count = 0
# Download messages in parallel batches for better performance
- BATCH_SIZE = 5
+ # Using 10 concurrent downloads with connection pooling for better throughput
+ BATCH_SIZE = 10
for i in range(0, len(messages_to_download), BATCH_SIZE):
# Check if task was cancelled/disabled
@@ -487,7 +488,8 @@ async def fetch_archive_mail_async(
downloaded_count = 0
# Download messages in parallel batches for better performance
- BATCH_SIZE = 5
+ # Using 10 concurrent downloads with connection pooling for better throughput
+ BATCH_SIZE = 10
for i in range(0, len(messages_to_download), BATCH_SIZE):
# Check if task was cancelled/disabled
diff --git a/src/utils/mail_utils/maildir.py b/src/utils/mail_utils/maildir.py
index a846a1e..a2d83c2 100644
--- a/src/utils/mail_utils/maildir.py
+++ b/src/utils/mail_utils/maildir.py
@@ -27,6 +27,31 @@ from src.utils.mail_utils.helpers import (
)
+# Module-level session for reuse
+_shared_session: aiohttp.ClientSession | None = None
+
+
+async def get_shared_session() -> aiohttp.ClientSession:
+ """Get or create a shared aiohttp session for connection reuse."""
+ global _shared_session
+ if _shared_session is None or _shared_session.closed:
+ connector = aiohttp.TCPConnector(
+ limit=20, # Max concurrent connections
+ limit_per_host=10, # Max connections per host
+ ttl_dns_cache=300, # Cache DNS for 5 minutes
+ )
+ _shared_session = aiohttp.ClientSession(connector=connector)
+ return _shared_session
+
+
+async def close_shared_session():
+ """Close the shared session when done."""
+ global _shared_session
+ if _shared_session and not _shared_session.closed:
+ await _shared_session.close()
+ _shared_session = None
+
+
async def save_mime_to_maildir_async(
maildir_path,
message,
@@ -136,63 +161,68 @@ async def create_mime_message_async(
# First try the direct body content approach
message_id = message.get("id", "")
+
+ # Get shared session for connection reuse
+ session = await get_shared_session()
+
try:
# First get the message with body content
body_url = f"https://graph.microsoft.com/v1.0/me/messages/{message_id}?$select=body,bodyPreview"
- async with aiohttp.ClientSession() as session:
- async with session.get(body_url, headers=headers) as response:
- if response.status == 200:
- body_data = await response.json()
+ async with session.get(body_url, headers=headers) as response:
+ if response.status == 200:
+ body_data = await response.json()
- # Get body content
- body_content = body_data.get("body", {}).get("content", "")
- body_type = body_data.get("body", {}).get("contentType", "text")
- body_preview = body_data.get("bodyPreview", "")
+ # Get body content
+ body_content = body_data.get("body", {}).get("content", "")
+ body_type = body_data.get("body", {}).get("contentType", "text")
+ body_preview = body_data.get("bodyPreview", "")
- # If we have body content, use it
- if body_content:
- if body_type.lower() == "html":
- # Add both HTML and plain text versions
- # Plain text conversion
- plain_text = re.sub(r"
", "\n", body_content)
- plain_text = re.sub(r"<[^>]*>", "", plain_text)
+ # If we have body content, use it
+ if body_content:
+ if body_type.lower() == "html":
+ # Add both HTML and plain text versions
+ # Plain text conversion
+ plain_text = re.sub(r"
", "\n", body_content)
+ plain_text = re.sub(r"<[^>]*>", "", plain_text)
- mime_msg.attach(MIMEText(plain_text, "plain"))
- mime_msg.attach(MIMEText(body_content, "html"))
- else:
- # Just plain text
- mime_msg.attach(MIMEText(body_content, "plain"))
- elif body_preview:
- # Use preview if we have it
- mime_msg.attach(
- MIMEText(
- f"{body_preview}\n\n[Message preview only. Full content not available.]",
- "plain",
- )
- )
+ mime_msg.attach(MIMEText(plain_text, "plain"))
+ mime_msg.attach(MIMEText(body_content, "html"))
else:
- # Fallback to MIME content
- progress.console.print(
- f"No direct body content for message {truncate_id(message_id)}, trying MIME content..."
+ # Just plain text
+ mime_msg.attach(MIMEText(body_content, "plain"))
+ elif body_preview:
+ # Use preview if we have it
+ mime_msg.attach(
+ MIMEText(
+ f"{body_preview}\n\n[Message preview only. Full content not available.]",
+ "plain",
)
- await fetch_mime_content(
- mime_msg, message_id, headers, progress
- )
- else:
- progress.console.print(
- f"Failed to get message body: {response.status}. Trying MIME content..."
)
- await fetch_mime_content(mime_msg, message_id, headers, progress)
+ else:
+ # Fallback to MIME content
+ progress.console.print(
+ f"No direct body content for message {truncate_id(message_id)}, trying MIME content..."
+ )
+ await fetch_mime_content(
+ mime_msg, message_id, headers, progress, session
+ )
+ else:
+ progress.console.print(
+ f"Failed to get message body: {response.status}. Trying MIME content..."
+ )
+ await fetch_mime_content(
+ mime_msg, message_id, headers, progress, session
+ )
except Exception as e:
progress.console.print(
f"Error getting message body: {e}. Trying MIME content..."
)
- await fetch_mime_content(mime_msg, message_id, headers, progress)
+ await fetch_mime_content(mime_msg, message_id, headers, progress, session)
# Handle attachments only if we want to download them
if download_attachments:
await add_attachments_async(
- mime_msg, message, headers, attachments_dir, progress
+ mime_msg, message, headers, attachments_dir, progress, session
)
else:
# Add a header to indicate attachment info was skipped
@@ -201,7 +231,7 @@ async def create_mime_message_async(
return mime_msg
-async def fetch_mime_content(mime_msg, message_id, headers, progress):
+async def fetch_mime_content(mime_msg, message_id, headers, progress, session=None):
"""
Fetch and add MIME content to a message when direct body access fails.
@@ -210,72 +240,78 @@ async def fetch_mime_content(mime_msg, message_id, headers, progress):
message_id (str): Message ID.
headers (dict): Headers including authentication.
progress: Progress instance for updating progress bars.
+ session (aiohttp.ClientSession, optional): Shared session to use.
"""
# Fallback to getting the MIME content
message_content_url = (
f"https://graph.microsoft.com/v1.0/me/messages/{message_id}/$value"
)
try:
- async with aiohttp.ClientSession() as session:
- async with session.get(message_content_url, headers=headers) as response:
- if response.status == 200:
- full_content = await response.text()
+ # Use provided session or get shared session
+ if session is None:
+ session = await get_shared_session()
- # Check for body tags
- body_match = re.search(
- r"