wip refactoring

Tim Bendt
2025-05-12 17:19:34 -06:00
parent d75f16c25d
commit 7123ff1f43
13 changed files with 1258 additions and 421 deletions

apis/microsoft_graph/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""
Microsoft Graph API module for interacting with Microsoft 365 services.
"""

apis/microsoft_graph/auth.py Normal file
View File

@@ -0,0 +1,64 @@
"""
Authentication module for Microsoft Graph API.
"""
import os
import msal
def get_access_token(scopes):
"""
Authenticate with Microsoft Graph API and obtain an access token.
Args:
scopes (list): List of scopes to request.
Returns:
tuple: (access_token, headers) where access_token is the token string
and headers is a dict with Authorization header.
Raises:
ValueError: If environment variables are missing.
Exception: If authentication fails.
"""
# Read Azure app credentials from environment variables
client_id = os.getenv('AZURE_CLIENT_ID')
tenant_id = os.getenv('AZURE_TENANT_ID')
if not client_id or not tenant_id:
raise ValueError("Please set the AZURE_CLIENT_ID and AZURE_TENANT_ID environment variables.")
# Token cache
cache = msal.SerializableTokenCache()
cache_file = 'token_cache.bin'
if os.path.exists(cache_file):
with open(cache_file, 'r') as f:
cache.deserialize(f.read())
# Authentication
authority = f'https://login.microsoftonline.com/{tenant_id}'
app = msal.PublicClientApplication(client_id, authority=authority, token_cache=cache)
accounts = app.get_accounts()
token_response = None
if accounts:
token_response = app.acquire_token_silent(scopes, account=accounts[0])
if not token_response:
# Fall back to the device-code flow when there is no usable cached token
flow = app.initiate_device_flow(scopes=scopes)
if 'user_code' not in flow:
raise Exception("Failed to create device flow")
from rich import print
from rich.panel import Panel
print(Panel(flow['message'], border_style="magenta", padding=2, title="MSAL Login Flow Link"))
token_response = app.acquire_token_by_device_flow(flow)
if 'access_token' not in token_response:
raise Exception("Failed to acquire token")
# Save token cache
with open(cache_file, 'w') as f:
f.write(cache.serialize())
access_token = token_response['access_token']
headers = {'Authorization': f'Bearer {access_token}', 'Prefer': 'outlook.body-content-type="text"'}
return access_token, headers

apis/microsoft_graph/calendar.py Normal file
View File

@@ -0,0 +1,56 @@
"""
Calendar operations for Microsoft Graph API.
"""
import os
from datetime import datetime, timedelta
from apis.microsoft_graph.client import fetch_with_aiohttp
async def fetch_calendar_events(headers, days_back=1, days_forward=6, start_date=None, end_date=None):
"""
Fetch calendar events from Microsoft Graph API.
Args:
headers (dict): Headers including authentication.
days_back (int): Number of days to look back.
days_forward (int): Number of days to look forward.
start_date (datetime): Optional start date, overrides days_back if provided.
end_date (datetime): Optional end date, overrides days_forward if provided.
Returns:
tuple: (events, total_count) where events is a list of event dictionaries
and total_count is the total number of events.
"""
# Calculate date range
if start_date is None:
start_date = datetime.now() - timedelta(days=days_back)
if end_date is None:
end_date = start_date + timedelta(days=days_forward)
# Format dates for API
start_date_str = start_date.strftime('%Y-%m-%dT00:00:00Z')
end_date_str = end_date.strftime('%Y-%m-%dT23:59:59Z')
# Prepare the API query
calendar_url = (
f'https://graph.microsoft.com/v1.0/me/calendarView?'
f'startDateTime={start_date_str}&endDateTime={end_date_str}&'
f'$select=id,subject,organizer,start,end,location,isAllDay,showAs,sensitivity'
)
events = []
# Make the API request (fetch_with_aiohttp may return None on an incomplete response)
response_data = await fetch_with_aiohttp(calendar_url, headers) or {}
events.extend(response_data.get('value', []))
# Check if there are more events (pagination)
next_link = response_data.get('@odata.nextLink')
while next_link:
response_data = await fetch_with_aiohttp(next_link, headers) or {}
events.extend(response_data.get('value', []))
next_link = response_data.get('@odata.nextLink')
# Return events and total count
return events, len(events)

apis/microsoft_graph/client.py Normal file
View File

@@ -0,0 +1,85 @@
"""
HTTP client for Microsoft Graph API.
"""
import aiohttp
import asyncio
import orjson
# Define a global semaphore for throttling
semaphore = asyncio.Semaphore(4)
async def fetch_with_aiohttp(url, headers):
"""
Fetch data from Microsoft Graph API.
Args:
url (str): The URL to fetch data from.
headers (dict): Headers including authentication.
Returns:
dict: JSON response data.
Raises:
Exception: If the request fails.
"""
async with semaphore:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
if response.status != 200:
raise Exception(f"Failed to fetch {url}: {response.status} {await response.text()}")
raw_bytes = await response.read()
content_length = response.headers.get('Content-Length')
if content_length and len(raw_bytes) != int(content_length):
print("Warning: Incomplete response received!")
return None  # Callers must be prepared to handle a None response
return orjson.loads(raw_bytes)
async def post_with_aiohttp(url, headers, json_data):
"""
Post data to Microsoft Graph API.
Args:
url (str): The URL to post data to.
headers (dict): Headers including authentication.
json_data (dict): JSON data to post.
Returns:
int: HTTP status code.
"""
async with semaphore:
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=json_data) as response:
return response.status
async def patch_with_aiohttp(url, headers, json_data):
"""
Patch data to Microsoft Graph API.
Args:
url (str): The URL to patch data to.
headers (dict): Headers including authentication.
json_data (dict): JSON data to patch.
Returns:
int: HTTP status code.
"""
async with semaphore:
async with aiohttp.ClientSession() as session:
async with session.patch(url, headers=headers, json=json_data) as response:
return response.status
async def delete_with_aiohttp(url, headers):
"""
Delete data from Microsoft Graph API.
Args:
url (str): The URL to delete data from.
headers (dict): Headers including authentication.
Returns:
int: HTTP status code.
"""
async with semaphore:
async with aiohttp.ClientSession() as session:
async with session.delete(url, headers=headers) as response:
return response.status

apis/microsoft_graph/mail.py Normal file
View File

@@ -0,0 +1,204 @@
"""
Mail operations for Microsoft Graph API.
"""
import os
import re
import glob
from apis.microsoft_graph.client import fetch_with_aiohttp, patch_with_aiohttp, post_with_aiohttp, delete_with_aiohttp
async def fetch_mail_async(maildir_path, attachments_dir, headers, progress, task_id, dry_run=False, download_attachments=False):
"""
Fetch mail from Microsoft Graph API and save to Maildir.
Args:
maildir_path (str): Path to the Maildir.
attachments_dir (str): Path to save attachments.
headers (dict): Headers including authentication.
progress: Progress instance for updating progress bars.
task_id: ID of the task in the progress bar.
dry_run (bool): If True, don't actually make changes.
download_attachments (bool): If True, download email attachments.
Returns:
None
"""
from utils.mail_utils.maildir import save_mime_to_maildir_async
from utils.mail_utils.helpers import truncate_id
mail_url = 'https://graph.microsoft.com/v1.0/me/mailFolders/inbox/messages?$top=100&$orderby=receivedDateTime asc&$select=id,subject,from,toRecipients,ccRecipients,receivedDateTime,isRead'
messages = []
# Fetch the total count of messages in the inbox
inbox_url = 'https://graph.microsoft.com/v1.0/me/mailFolders/inbox'
response = await fetch_with_aiohttp(inbox_url, headers)
total_messages = response.get('totalItemCount', 0)
progress.update(task_id, total=total_messages)
while mail_url:
try:
response_data = await fetch_with_aiohttp(mail_url, headers)
except Exception as e:
# Stop paging rather than retrying the same URL forever
progress.console.print(f"Error fetching messages: {e}")
break
messages.extend(response_data.get('value', []))
progress.advance(task_id, len(response_data.get('value', [])))
# Get the next page URL from @odata.nextLink
mail_url = response_data.get('@odata.nextLink')
inbox_msg_ids = set(message['id'] for message in messages)
progress.update(task_id, completed=(len(messages) / 2))
new_dir = os.path.join(maildir_path, 'new')
cur_dir = os.path.join(maildir_path, 'cur')
new_files = set(glob.glob(os.path.join(new_dir, '*.eml*')))
cur_files = set(glob.glob(os.path.join(cur_dir, '*.eml*')))
for filename in cur_files | new_files:
message_id = os.path.basename(filename).split('.')[0] # Extract the Message-ID from the filename
if (message_id not in inbox_msg_ids):
if not dry_run:
progress.console.print(f"Deleting {filename} from inbox")
os.remove(filename)
else:
progress.console.print(f"[DRY-RUN] Would delete {filename} from inbox")
for message in messages:
progress.console.print(f"Processing message: {message.get('subject', 'No Subject')}", end='\r')
await save_mime_to_maildir_async(maildir_path, message, attachments_dir, headers, progress, dry_run, download_attachments)
progress.update(task_id, advance=0.5)
progress.update(task_id, completed=len(messages))
progress.console.print(f"\nFinished saving {len(messages)} messages.")
async def archive_mail_async(maildir_path, headers, progress, task_id, dry_run=False):
"""
Archive mail from Maildir to Microsoft Graph API archive folder.
Args:
maildir_path (str): Path to the Maildir.
headers (dict): Headers including authentication.
progress: Progress instance for updating progress bars.
task_id: ID of the task in the progress bar.
dry_run (bool): If True, don't actually make changes.
Returns:
None
"""
archive_dir = os.path.join(maildir_path, '.Archives')
archive_files = glob.glob(os.path.join(archive_dir, '**', '*.eml*'), recursive=True)
progress.update(task_id, total=len(archive_files))
folder_response = await fetch_with_aiohttp('https://graph.microsoft.com/v1.0/me/mailFolders', headers)
folders = folder_response.get('value', [])
archive_folder_id = next((folder.get('id') for folder in folders if folder.get('displayName', '').lower() == 'archive'), None)
if not archive_folder_id:
raise Exception("No folder named 'Archive' found on the server.")
for filepath in archive_files:
message_id = os.path.basename(filepath).split('.')[0] # Extract the Message-ID from the filename
if not dry_run:
status = await post_with_aiohttp(
f'https://graph.microsoft.com/v1.0/me/messages/{message_id}/microsoft.graph.move',
headers,
{'destinationId': archive_folder_id}
)
if status == 201: # 201 Created indicates a successful move
progress.console.print(f"Moved message to 'Archive': {message_id}")
elif status == 404:
os.remove(filepath) # Remove the local copy if the message no longer exists on the server
progress.console.print(f"Message not found on server, removed local copy: {message_id}")
else:
progress.console.print(f"Failed to move message to 'Archive': {message_id}, {status}")
else:
progress.console.print(f"[DRY-RUN] Would move message to 'Archive' folder: {message_id}")
progress.advance(task_id)
async def delete_mail_async(maildir_path, headers, progress, task_id, dry_run=False):
"""
Delete mail from Maildir and Microsoft Graph API.
Args:
maildir_path (str): Path to the Maildir.
headers (dict): Headers including authentication.
progress: Progress instance for updating progress bars.
task_id: ID of the task in the progress bar.
dry_run (bool): If True, don't actually make changes.
Returns:
None
"""
trash_dir = os.path.join(maildir_path, '.Trash', 'cur')
trash_files = set(glob.glob(os.path.join(trash_dir, '*.eml*')))
progress.update(task_id, total=len(trash_files))
for filepath in trash_files:
message_id = os.path.basename(filepath).split('.')[0] # Extract the Message-ID from the filename
if not dry_run:
progress.console.print(f"Moving message to trash: {message_id}")
status = await delete_with_aiohttp(
f'https://graph.microsoft.com/v1.0/me/messages/{message_id}',
headers
)
if status == 204 or status == 404:
os.remove(filepath) # Remove the file from local trash
else:
progress.console.print(f"[DRY-RUN] Would delete message: {message_id}")
progress.advance(task_id)
async def synchronize_maildir_async(maildir_path, headers, progress, task_id, dry_run=False):
"""
Synchronize Maildir with Microsoft Graph API.
Args:
maildir_path (str): Path to the Maildir.
headers (dict): Headers including authentication.
progress: Progress instance for updating progress bars.
task_id: ID of the task in the progress bar.
dry_run (bool): If True, don't actually make changes.
Returns:
None
"""
from utils.mail_utils.helpers import load_last_sync_timestamp, save_sync_timestamp, truncate_id
last_sync = load_last_sync_timestamp()
# Find messages moved from "new" to "cur" and mark them as read
new_dir = os.path.join(maildir_path, 'new')
cur_dir = os.path.join(maildir_path, 'cur')
new_files = set(glob.glob(os.path.join(new_dir, '*.eml*')))
cur_files = set(glob.glob(os.path.join(cur_dir, '*.eml*')))
moved_to_cur = [os.path.basename(f) for f in cur_files - new_files]
progress.update(task_id, total=len(moved_to_cur))
for filename in moved_to_cur:
# TODO: this isn't scalable, we should use a more efficient way to check if the file was modified
if os.path.getmtime(os.path.join(cur_dir, filename)) < last_sync:
progress.update(task_id, advance=1)
continue
message_id = re.sub(r":2.+", "", filename.split('.')[0]) # Strip maildir flags to recover the Message-ID
if not dry_run:
status = await patch_with_aiohttp(
f'https://graph.microsoft.com/v1.0/me/messages/{message_id}',
headers,
{'isRead': True}
)
if status == 404:
os.remove(os.path.join(cur_dir, filename))
else:
progress.console.print(f"[DRY-RUN] Would mark message as read: {truncate_id(message_id)}")
progress.advance(task_id)
# Save the current sync timestamp
if not dry_run:
save_sync_timestamp()
else:
progress.console.print("[DRY-RUN] Would save sync timestamp.")

View File

@@ -128,6 +128,7 @@ class OneDriveTUI(App):
table = self.query_one("#items_table")
table.cursor_type = "row"
table.add_columns("", "Name", "Last Modified", "Size", "Web URL")
table.focus()
# Load cached token if available
if os.path.exists(self.cache_file):

fetch_outlook.py
View File

@@ -1,460 +1,188 @@
"""
Fetch and synchronize emails and calendar events from Microsoft Outlook (Graph API).
"""
import os
import argparse
import asyncio
from datetime import datetime, timedelta
from rich.progress import Progress, SpinnerColumn, MofNCompleteColumn
# Import the refactored modules
from apis.microsoft_graph.auth import get_access_token
from apis.microsoft_graph.mail import fetch_mail_async, archive_mail_async, delete_mail_async, synchronize_maildir_async
from apis.microsoft_graph.calendar import fetch_calendar_events
from utils.calendar_utils import save_events_to_vdir, save_events_to_file
from utils.mail_utils.helpers import ensure_directory_exists
# Add argument parsing for dry-run mode
arg_parser = argparse.ArgumentParser(description="Fetch and synchronize emails.")
arg_parser.add_argument("--dry-run", action="store_true", help="Run in dry-run mode without making changes.", default=False)
arg_parser.add_argument("--vdir", help="Output calendar events in vdir format to the specified directory (each event in its own file)", default=None)
arg_parser.add_argument("--icsfile", help="Output calendar events into this ics file path.", default=None)
arg_parser.add_argument("--org", help="Specify the organization name for the subfolder to store emails and calendar events", default="corteva")
arg_parser.add_argument("--days-back", type=int, help="Number of days to look back for calendar events", default=1)
arg_parser.add_argument("--days-forward", type=int, help="Number of days to look forward for calendar events", default=6)
arg_parser.add_argument("--continue-iteration", action="store_true", help="Enable interactive mode to continue fetching more date ranges", default=False)
arg_parser.add_argument("--download-attachments", action="store_true", help="Download email attachments", default=False)
args = arg_parser.parse_args()
# Bind parsed arguments to the module-level settings used by the coroutines below
dry_run = args.dry_run
vdir_path = args.vdir
ics_path = args.icsfile
org_name = args.org
days_back = args.days_back
days_forward = args.days_forward
continue_iteration = args.continue_iteration
download_attachments = args.download_attachments
async def fetch_calendar_async(headers, progress, task_id):
"""
Fetch calendar events and save them in the appropriate format.
Args:
headers: Authentication headers for Microsoft Graph API
progress: Progress instance for updating progress bars
task_id: ID of the task in the progress bar
Returns:
List of event dictionaries
Raises:
Exception: If there's an error fetching or saving events
"""
try:
# Use the utility function to fetch calendar events
progress.console.print("[cyan]Fetching events from Microsoft Graph API...[/cyan]")
events, total_events = await fetch_calendar_events(
headers=headers,
days_back=days_back,
days_forward=days_forward
)
progress.console.print(f"[cyan]Got {len(events)} events from API (reported total: {total_events})[/cyan]")
# Update progress bar with total events
progress.update(task_id, total=total_events)
# Resolve the org-specific vdir path up front so the interactive loop below can reuse it
org_vdir_path = os.path.join(vdir_path, org_name) if vdir_path else None
# Save events to the appropriate format
if not dry_run:
if vdir_path:
progress.console.print(f"[cyan]Saving events to vdir: {org_vdir_path}[/cyan]")
save_events_to_vdir(events, org_vdir_path, progress, task_id, dry_run)
progress.console.print(f"[green]Finished saving events to vdir: {org_vdir_path}[/green]")
elif ics_path:
# Save to a single ICS file
progress.console.print(f"[cyan]Saving events to ICS file: {ics_path}/events_latest.ics[/cyan]")
save_events_to_file(events, f"{ics_path}/events_latest.ics", progress, task_id, dry_run)
progress.console.print("[green]Finished saving events to ICS file[/green]")
else:
# No destination specified
progress.console.print("[yellow]Warning: No destination path (--vdir or --icsfile) specified for calendar events.[/yellow]")
else:
progress.console.print(f"[DRY-RUN] Would save {len(events)} events to {'vdir format' if vdir_path else 'single ICS file'}")
progress.update(task_id, advance=len(events))
# Interactive mode: ask whether to continue with the next date range
if continue_iteration:
# Move to the next date range
next_start_date = datetime.now() - timedelta(days=days_back)
next_end_date = next_start_date + timedelta(days=days_forward)
progress.console.print(f"\nCurrent date range: {next_start_date.strftime('%Y-%m-%d')} to {next_end_date.strftime('%Y-%m-%d')}")
user_response = input("\nContinue to iterate? [y/N]: ").strip().lower()
while user_response == 'y':
progress.console.print(f"\nFetching events for {next_start_date.strftime('%Y-%m-%d')} to {next_end_date.strftime('%Y-%m-%d')}...")
# Reset the progress bar for the new fetch
progress.update(task_id, completed=0, total=0)
# Fetch events for the next date range
next_events, next_total_events = await fetch_calendar_events(
headers=headers,
days_back=0,
days_forward=days_forward,
start_date=next_start_date,
end_date=next_end_date
)
# Update progress bar with total events
progress.update(task_id, total=next_total_events)
if not dry_run:
if vdir_path:
save_events_to_vdir(next_events, org_vdir_path, progress, task_id, dry_run)
else:
save_events_to_file(next_events, f'output_ics/outlook_events_{next_start_date.strftime("%Y%m%d")}.ics', progress, task_id, dry_run)
else:
target = 'vdir format' if vdir_path else 'a dated ICS file under output_ics/'
progress.console.print(f"[DRY-RUN] Would save {len(next_events)} events to {target}")
progress.update(task_id, advance=len(next_events))
# Calculate the next date range
next_start_date = next_end_date
next_end_date = next_start_date + timedelta(days=days_forward)
progress.console.print(f"\nNext date range would be: {next_start_date.strftime('%Y-%m-%d')} to {next_end_date.strftime('%Y-%m-%d')}")
user_response = input("\nContinue to iterate? [y/N]: ").strip().lower()
return events
except Exception as e:
progress.console.print(f"[red]Error fetching or saving calendar events: {str(e)}[/red]")
import traceback
progress.console.print(f"[red]{traceback.format_exc()}[/red]")
progress.update(task_id, completed=True)
return []
# Function to create Maildir structure
def create_maildir_structure(base_path):
"""
Create the standard Maildir directory structure.
Args:
base_path (str): Base path for the Maildir.
Returns:
None
"""
ensure_directory_exists(os.path.join(base_path, 'cur'))
ensure_directory_exists(os.path.join(base_path, 'new'))
ensure_directory_exists(os.path.join(base_path, 'tmp'))
ensure_directory_exists(os.path.join(base_path, '.Archives'))
ensure_directory_exists(os.path.join(base_path, '.Trash', 'cur'))
async def main():
"""
Main function to run the script.
Returns:
None
"""
# Save emails to Maildir
maildir_path = os.getenv('MAILDIR_PATH', os.path.expanduser('~/Mail')) + f"/{org_name}"
attachments_dir = os.path.join(maildir_path, 'attachments')
ensure_directory_exists(attachments_dir)
create_maildir_structure(maildir_path)
# Define scopes for Microsoft Graph API
scopes = ['https://graph.microsoft.com/Calendars.Read', 'https://graph.microsoft.com/Mail.ReadWrite']
# Authenticate and get access token
access_token, headers = get_access_token(scopes)
# Set up the progress bars
progress = Progress(
SpinnerColumn(),
MofNCompleteColumn(),
*Progress.get_default_columns()
)
with progress:
task_fetch = progress.add_task("[green]Syncing Inbox...", total=0)
task_calendar = progress.add_task("[cyan]Fetching calendar...", total=0)
@@ -463,10 +191,10 @@ async def main():
task_delete = progress.add_task("[red]Deleting mail...", total=0)
await asyncio.gather(
synchronize_maildir_async(maildir_path, headers, progress, task_read, dry_run),
archive_mail_async(maildir_path, headers, progress, task_archive, dry_run),
delete_mail_async(maildir_path, headers, progress, task_delete, dry_run),
fetch_mail_async(maildir_path, attachments_dir, headers, progress, task_fetch, dry_run, download_attachments),
fetch_calendar_async(headers, progress, task_calendar)
)

View File

@@ -104,6 +104,7 @@ class DocumentViewerScreen(Screen):
def on_mount(self) -> None:
"""Handle screen mount event."""
self.query_one("#content_container").focus()
self.download_document()
def on_button_pressed(self, event: Button.Pressed) -> None:

test_refactored.sh Executable file
View File

@@ -0,0 +1,8 @@
#!/bin/bash
# Test script for the refactored code
echo "Testing the refactored code with a dry run (no attachment download)..."
python fetch_outlook.py --dry-run
echo -e "\nTesting with attachment downloading enabled..."
python fetch_outlook.py --dry-run --download-attachments

utils/calendar_utils.py Normal file
View File

@@ -0,0 +1,300 @@
"""
Utility module for handling calendar events and iCalendar operations.
"""
import re
import os
from datetime import datetime, timedelta
from dateutil import parser
from dateutil.tz import UTC
import glob
def truncate_id(text, first=8, last=8):
"""
Truncate long IDs or filenames to show just the first and last few characters.
Args:
text: The ID or filename to truncate
first: Number of characters to keep from the beginning
last: Number of characters to keep from the end
Returns:
Truncated string with ellipsis in the middle
"""
if not text or len(text) <= first + last + 3:
return text
return f"{text[:first]}...{text[-last:]}"
def clean_text(text):
"""
Clean text by removing instances of 3 or more consecutive underscores
which can affect readability.
Args:
text: Text to clean
Returns:
Cleaned text
"""
if not text:
return ""
# Replace 3 or more consecutive underscores with 2 underscores
return re.sub(r'_{3,}', '__', text)
def escape_ical_text(text):
"""
Escape text for iCalendar format according to RFC 5545.
Args:
text: Text to escape
Returns:
Escaped text
"""
if not text:
return ""
# First clean multiple underscores
text = clean_text(text)
text = text.replace("\\", "\\\\")
text = text.replace("\n", "\\n")
text = text.replace(",", "\\,")
text = text.replace(";", "\\;")
return text
async def fetch_calendar_events(headers, days_back=1, days_forward=6, fetch_function=None,
start_date=None, end_date=None):
"""
Fetch calendar events from Microsoft Graph API.
Args:
headers: Authentication headers for Microsoft Graph API
days_back: Number of days to look back (default: 1)
days_forward: Number of days to look forward (default: 6)
fetch_function: Async function used to fetch data; required (a ValueError is raised if omitted).
Should accept URL and headers as parameters.
start_date: Optional explicit start date (datetime object)
end_date: Optional explicit end date (datetime object)
Returns:
Tuple of (events list, total_events count)
"""
if fetch_function is None:
raise ValueError("fetch_function is required for API calls")
# Calculate date range
if start_date is None:
start_date = datetime.now().replace(hour=0, minute=0, second=0) - timedelta(days=days_back)
if end_date is None:
end_of_today = datetime.now().replace(hour=23, minute=59, second=59)
end_date = end_of_today + timedelta(days=days_forward)
# Build the API URL
event_base_url = f"https://graph.microsoft.com/v1.0/me/calendarView?startDateTime={start_date.isoformat()}&endDateTime={end_date.isoformat()}"
calendar_url = f"{event_base_url}&$top=100&$select=start,end,id,iCalUId,subject,bodyPreview,webLink,location,recurrence,showAs,responseStatus,onlineMeeting,lastModifiedDateTime"
# Fetch total count for progress reporting (if needed)
total_event_url = f"{event_base_url}&$count=true&$select=id"
try:
total_response = await fetch_function(total_event_url, headers)
total_events = total_response.get('@odata.count', 0)
except Exception as e:
print(f"Error fetching total events count: {e}")
total_events = 0
# Fetch all calendar events, handling pagination
events = []
while calendar_url:
try:
response_data = await fetch_function(calendar_url, headers)
if response_data:
events.extend(response_data.get('value', []))
# Get the next page URL from @odata.nextLink
calendar_url = response_data.get('@odata.nextLink')
else:
print("Received empty response from calendar API")
break
except Exception as e:
print(f"Error fetching calendar events: {e}")
break
# Only return the events and total_events
return events, total_events
def write_event_to_ical(f, event, start, end):
"""
Write a single event to an iCalendar file.
Args:
f: File-like object to write to
event: Dictionary containing event data
start: Start datetime with timezone information
end: End datetime with timezone information
"""
# Preserve the original timezones
start_tz = start.tzinfo
end_tz = end.tzinfo
f.write(f"BEGIN:VEVENT\nSUMMARY:{escape_ical_text(event['subject'])}\n")
# Handle multi-line description properly
description = event.get('bodyPreview', '')
if description:
escaped_description = escape_ical_text(description)
f.write(f"DESCRIPTION:{escaped_description}\n")
f.write(f"UID:{event.get('iCalUId', '')}\n")
f.write(f"LOCATION:{escape_ical_text(event.get('location', {}).get('displayName', ''))}\n")
f.write(f"CLASS:{event.get('showAs', '')}\n")
f.write(f"STATUS:{event.get('responseStatus', {}).get('response', '')}\n")
if 'onlineMeeting' in event and event['onlineMeeting']:
f.write(f"URL:{event.get('onlineMeeting', {}).get('joinUrl', '')}\n")
# Write start and end times with timezone info in iCalendar format
if start.tzinfo == UTC:
f.write(f"DTSTART:{start.strftime('%Y%m%dT%H%M%SZ')}\n")
else:
tz_name = start_tz.tzname(None) if start_tz else 'UTC'
f.write(f"DTSTART;TZID={tz_name}:{start.strftime('%Y%m%dT%H%M%S')}\n")
if end.tzinfo == UTC:
f.write(f"DTEND:{end.strftime('%Y%m%dT%H%M%SZ')}\n")
else:
tz_name = end_tz.tzname(None) if end_tz else 'UTC'
f.write(f"DTEND;TZID={tz_name}:{end.strftime('%Y%m%dT%H%M%S')}\n")
# Handle recurrence rules
if 'recurrence' in event and event['recurrence']:
for rule in event['recurrence']:
if rule.startswith('RRULE'):
rule_parts = rule.split(';')
new_rule_parts = []
for part in rule_parts:
if part.startswith('UNTIL='):
until_value = part.split('=')[1]
until_date = parser.isoparse(until_value)
if start.tzinfo is not None and until_date.tzinfo is None:
until_date = until_date.replace(tzinfo=start.tzinfo)
new_rule_parts.append(f"UNTIL={until_date.strftime('%Y%m%dT%H%M%SZ')}")
else:
new_rule_parts.append(part)
rule = ';'.join(new_rule_parts)
f.write(f"{rule}\n")
f.write("END:VEVENT\n")
def save_events_to_vdir(events, org_vdir_path, progress, task_id, dry_run=False):
"""
Save events to vdir format (one file per event).
Args:
events: List of event dictionaries
org_vdir_path: Path to save the event files
progress: Progress object for updating UI
task_id: Task ID for progress tracking
dry_run: If True, don't actually write files
Returns:
Number of events processed
"""
if dry_run:
progress.console.print(f"[DRY-RUN] Would save {len(events)} events to vdir format in {org_vdir_path}")
return len(events)
os.makedirs(org_vdir_path, exist_ok=True)
progress.console.print(f"Saving events to vdir format in {org_vdir_path}...")
# Create a dictionary to track existing files and their metadata
existing_files = {}
for file_path in glob.glob(os.path.join(org_vdir_path, "*.ics")):
file_name = os.path.basename(file_path)
file_mod_time = os.path.getmtime(file_path)
existing_files[file_name] = {
'path': file_path,
'mtime': file_mod_time
}
processed_files = set()
for event in events:
progress.advance(task_id)
if 'start' not in event or 'end' not in event:
continue
# Parse start and end times with timezone information
start = parser.isoparse(event['start']['dateTime'])
end = parser.isoparse(event['end']['dateTime'])
uid = event.get('iCalUId', '')
if not uid:
# Generate a unique ID if none exists
uid = f"outlook-{event.get('id', '')}"
# Create a filename based on the UID
safe_filename = re.sub(r'[^\w\-]', '_', uid) + ".ics"
event_path = os.path.join(org_vdir_path, safe_filename)
processed_files.add(safe_filename)
# Check if we need to update this file
should_update = True
if safe_filename in existing_files:
# Only update if the event has been modified since the file was last updated
if 'lastModifiedDateTime' in event:
last_modified = parser.isoparse(event['lastModifiedDateTime']).timestamp()
file_mtime = existing_files[safe_filename]['mtime']
if last_modified <= file_mtime:
should_update = False
progress.console.print(f"Skipping unchanged event: {event['subject']}")
if should_update:
with open(event_path, 'w') as f:
f.write("BEGIN:VCALENDAR\nVERSION:2.0\n")
write_event_to_ical(f, event, start, end)
f.write("END:VCALENDAR\n")
# Remove files for events that no longer exist in the calendar view
for file_name in existing_files:
if file_name not in processed_files:
progress.console.print(f"Removing obsolete event file: {truncate_id(file_name)}")
os.remove(existing_files[file_name]['path'])
progress.console.print(f"Saved {len(events)} events to {org_vdir_path}")
return len(events)
def save_events_to_file(events, output_file, progress, task_id, dry_run=False):
"""
Save all events to a single iCalendar file.
Args:
events: List of event dictionaries
output_file: Path to the output file
progress: Progress object for updating UI
task_id: Task ID for progress tracking
dry_run: If True, don't actually write the file
Returns:
Number of events processed
"""
if dry_run:
progress.console.print(f"[DRY-RUN] Would save events to {output_file}")
return len(events)
os.makedirs(os.path.dirname(output_file), exist_ok=True)
progress.console.print(f"Saving events to {output_file}...")
with open(output_file, 'w') as f:
f.write("BEGIN:VCALENDAR\nVERSION:2.0\n")
for event in events:
progress.advance(task_id)
if 'start' in event and 'end' in event:
# Parse start and end times with timezone information
start = parser.isoparse(event['start']['dateTime'])
end = parser.isoparse(event['end']['dateTime'])
write_event_to_ical(f, event, start, end)
f.write("END:VCALENDAR\n")
progress.console.print(f"Saved events to {output_file}")
return len(events)

utils/mail_utils/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""
Mail utilities module for email operations.
"""

utils/mail_utils/helpers.py Normal file
View File

@@ -0,0 +1,114 @@
"""
Mail utility helper functions.
"""
import os
import json
import time
from datetime import datetime
def truncate_id(message_id, length=8):
"""
Truncate a message ID to a reasonable length for display.
Args:
message_id (str): The message ID to truncate.
length (int): The number of characters to keep.
Returns:
str: The truncated message ID.
"""
if not message_id:
return ""
if len(message_id) <= length:
return message_id
return f"{message_id[:length]}..."
def load_last_sync_timestamp():
"""
Load the last synchronization timestamp from a file.
Returns:
float: The timestamp of the last synchronization, or 0 if not available.
"""
try:
with open('sync_timestamp.json', 'r') as f:
data = json.load(f)
return data.get('timestamp', 0)
except (FileNotFoundError, json.JSONDecodeError):
return 0
def save_sync_timestamp():
"""
Save the current timestamp as the last synchronization timestamp.
Returns:
None
"""
current_time = time.time()
with open('sync_timestamp.json', 'w') as f:
json.dump({'timestamp': current_time}, f)
def format_datetime(dt_str, format_string="%m/%d %I:%M %p"):
"""
Format a datetime string from ISO format.
Args:
dt_str (str): ISO format datetime string.
format_string (str): Format string for the output.
Returns:
str: Formatted datetime string.
"""
if not dt_str:
return ""
try:
dt = datetime.fromisoformat(dt_str.replace('Z', '+00:00'))
return dt.strftime(format_string)
except (ValueError, AttributeError):
return dt_str
def safe_filename(filename):
"""
Convert a string to a safe filename.
Args:
filename (str): Original filename.
Returns:
str: Safe filename with invalid characters replaced.
"""
invalid_chars = '<>:"/\\|?*'
for char in invalid_chars:
filename = filename.replace(char, '_')
return filename
def ensure_directory_exists(directory):
"""
Ensure that a directory exists, creating it if necessary.
Args:
directory (str): The directory path to check/create.
Returns:
None
"""
if not os.path.exists(directory):
os.makedirs(directory)
def parse_maildir_name(filename):
"""
Parse a Maildir filename to extract components.
Args:
filename (str): The maildir filename.
Returns:
tuple: (message_id, flags) components of the filename.
"""
# Maildir filename format: unique-id:flags
if ':' in filename:
message_id, flags = filename.split(':', 1)
else:
message_id = filename
flags = ''
return message_id, flags

utils/mail_utils/maildir.py Normal file
View File

@@ -0,0 +1,270 @@
"""
Maildir operations for handling local mail storage.
"""
import os
import re
import base64
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
from email.utils import format_datetime as rfc2822_datetime
import aiohttp
from utils.calendar_utils import truncate_id
from utils.mail_utils.helpers import safe_filename, ensure_directory_exists
async def save_mime_to_maildir_async(maildir_path, message, attachments_dir, headers, progress, dry_run=False, download_attachments=False):
"""
Save a message from Microsoft Graph API to a Maildir.
Args:
maildir_path (str): Path to the Maildir.
message (dict): Message data from Microsoft Graph API.
attachments_dir (str): Path to save attachments.
headers (dict): Headers including authentication.
progress: Progress instance for updating progress bars.
dry_run (bool): If True, don't actually save files.
download_attachments (bool): If True, download email attachments.
Returns:
None
"""
message_id = message.get('id', '')
# Determine target directory based on read status
target_dir = os.path.join(maildir_path, 'cur' if message.get('isRead', False) else 'new')
ensure_directory_exists(target_dir)
# Check if the file already exists in either new or cur
new_path = os.path.join(maildir_path, 'new', f"{message_id}.eml")
cur_path = os.path.join(maildir_path, 'cur', f"{message_id}.eml")
if os.path.exists(new_path) or os.path.exists(cur_path):
return # Skip if already exists
# Create MIME email
mime_msg = await create_mime_message_async(message, headers, attachments_dir, progress, download_attachments)
# Only save file if not in dry run mode
if not dry_run:
with open(os.path.join(target_dir, f"{message_id}.eml"), 'wb') as f:
f.write(mime_msg.as_bytes())
else:
progress.console.print(f"[DRY-RUN] Would save message: {message.get('subject', 'No Subject')}")
async def create_mime_message_async(message, headers, attachments_dir, progress, download_attachments=False):
"""
Create a MIME message from Microsoft Graph API message data.
Args:
message (dict): Message data from Microsoft Graph API.
headers (dict): Headers including authentication.
attachments_dir (str): Path to save attachments.
progress: Progress instance for updating progress bars.
download_attachments (bool): If True, download email attachments.
Returns:
MIMEMultipart: The MIME message.
"""
# Create a new MIMEMultipart message
mime_msg = MIMEMultipart()
# Message headers
mime_msg['Message-ID'] = message.get('id', '')
mime_msg['Subject'] = message.get('subject', 'No Subject')
# Sender information
sender = message.get('from', {}).get('emailAddress', {})
if sender:
mime_msg['From'] = f"{sender.get('name', '')} <{sender.get('address', '')}>".strip()
# Recipients
to_recipients = message.get('toRecipients', [])
cc_recipients = message.get('ccRecipients', [])
if to_recipients:
to_list = [f"{r.get('emailAddress', {}).get('name', '')} <{r.get('emailAddress', {}).get('address', '')}>".strip() for r in to_recipients]
mime_msg['To'] = ', '.join(to_list)
if cc_recipients:
cc_list = [f"{r.get('emailAddress', {}).get('name', '')} <{r.get('emailAddress', {}).get('address', '')}>".strip() for r in cc_recipients]
mime_msg['Cc'] = ', '.join(cc_list)
# Date (convert Graph's ISO 8601 timestamp into an RFC 2822 Date header)
received_datetime = message.get('receivedDateTime', '')
if received_datetime:
mime_msg['Date'] = rfc2822_datetime(datetime.fromisoformat(received_datetime.replace('Z', '+00:00')))
# First try the direct body content approach
message_id = message.get('id', '')
try:
# First get the message with body content
body_url = f"https://graph.microsoft.com/v1.0/me/messages/{message_id}?$select=body,bodyPreview"
async with aiohttp.ClientSession() as session:
async with session.get(body_url, headers=headers) as response:
if response.status == 200:
body_data = await response.json()
# Get body content
body_content = body_data.get('body', {}).get('content', '')
body_type = body_data.get('body', {}).get('contentType', 'text')
body_preview = body_data.get('bodyPreview', '')
# If we have body content, use it
if body_content:
if body_type.lower() == 'html':
# Add both HTML and plain text versions
# Plain text conversion
plain_text = re.sub(r'<br\s*/?>', '\n', body_content)
plain_text = re.sub(r'<[^>]*>', '', plain_text)
mime_msg.attach(MIMEText(plain_text, 'plain'))
mime_msg.attach(MIMEText(body_content, 'html'))
else:
# Just plain text
mime_msg.attach(MIMEText(body_content, 'plain'))
elif body_preview:
# Use preview if we have it
mime_msg.attach(MIMEText(f"{body_preview}\n\n[Message preview only. Full content not available.]", 'plain'))
else:
# Fallback to MIME content
progress.console.print(f"No direct body content for message {truncate_id(message_id)}, trying MIME content...")
await fetch_mime_content(mime_msg, message_id, headers, progress)
else:
progress.console.print(f"Failed to get message body: {response.status}. Trying MIME content...")
await fetch_mime_content(mime_msg, message_id, headers, progress)
except Exception as e:
progress.console.print(f"Error getting message body: {e}. Trying MIME content...")
await fetch_mime_content(mime_msg, message_id, headers, progress)
# Handle attachments only if we want to download them
if download_attachments:
await add_attachments_async(mime_msg, message, headers, attachments_dir, progress)
else:
# Add a header to indicate attachment info was skipped
mime_msg['X-Attachments-Skipped'] = 'True'
return mime_msg
async def fetch_mime_content(mime_msg, message_id, headers, progress):
"""
Fetch and add MIME content to a message when direct body access fails.
Args:
mime_msg (MIMEMultipart): The message to add content to.
message_id (str): Message ID.
headers (dict): Headers including authentication.
progress: Progress instance for updating progress bars.
"""
# Fallback to getting the MIME content
message_content_url = f"https://graph.microsoft.com/v1.0/me/messages/{message_id}/$value"
try:
async with aiohttp.ClientSession() as session:
async with session.get(message_content_url, headers=headers) as response:
if response.status == 200:
full_content = await response.text()
# Check for body tags
body_match = re.search(r'<body[^>]*>(.*?)</body>', full_content, re.DOTALL | re.IGNORECASE)
if body_match:
body_content = body_match.group(1)
# Simple HTML to text conversion
body_text = re.sub(r'<br\s*/?>', '\n', body_content)
body_text = re.sub(r'<[^>]*>', '', body_text)
# Add the plain text body
mime_msg.attach(MIMEText(body_text, 'plain'))
# Also add the HTML body
mime_msg.attach(MIMEText(full_content, 'html'))
else:
# Fallback - try to find content between Content-Type: text/html and next boundary
html_parts = re.findall(r'Content-Type: text/html.*?\r?\n\r?\n(.*?)(?:\r?\n\r?\n|$)',
full_content, re.DOTALL | re.IGNORECASE)
if html_parts:
html_content = html_parts[0]
mime_msg.attach(MIMEText(html_content, 'html'))
# Also make plain text version
plain_text = re.sub(r'<br\s*/?>', '\n', html_content)
plain_text = re.sub(r'<[^>]*>', '', plain_text)
mime_msg.attach(MIMEText(plain_text, 'plain'))
else:
# Just use the raw content as text if nothing else works
mime_msg.attach(MIMEText(full_content, 'plain'))
progress.console.print(f"Using raw content for message {message_id} - no body tags found")
else:
error_text = await response.text()
progress.console.print(f"Failed to get MIME content: {response.status} {error_text}")
mime_msg.attach(MIMEText(f"Failed to retrieve message body: HTTP {response.status}", 'plain'))
except Exception as e:
progress.console.print(f"Error retrieving MIME content: {e}")
mime_msg.attach(MIMEText(f"Failed to retrieve message body: {str(e)}", 'plain'))
async def add_attachments_async(mime_msg, message, headers, attachments_dir, progress):
"""
Add attachments to a MIME message.
Args:
mime_msg (MIMEMultipart): The MIME message to add attachments to.
message (dict): Message data from Microsoft Graph API.
headers (dict): Headers including authentication.
attachments_dir (str): Path to save attachments.
progress: Progress instance for updating progress bars.
Returns:
None
"""
message_id = message.get('id', '')
# Get attachments list
attachments_url = f"https://graph.microsoft.com/v1.0/me/messages/{message_id}/attachments"
async with aiohttp.ClientSession() as session:
async with session.get(attachments_url, headers=headers) as response:
if response.status != 200:
return
attachments_data = await response.json()
attachments = attachments_data.get('value', [])
if not attachments:
return
# Create a directory for this message's attachments
message_attachments_dir = os.path.join(attachments_dir, message_id)
ensure_directory_exists(message_attachments_dir)
# Add a header with attachment count
mime_msg['X-Attachment-Count'] = str(len(attachments))
for idx, attachment in enumerate(attachments):
attachment_name = safe_filename(attachment.get('name', 'attachment'))
attachment_type = attachment.get('contentType', 'application/octet-stream')
# Add attachment info to headers for reference
mime_msg[f'X-Attachment-{idx+1}-Name'] = attachment_name
mime_msg[f'X-Attachment-{idx+1}-Type'] = attachment_type
attachment_part = MIMEBase(*attachment_type.split('/', 1))
# Get attachment content
if 'contentBytes' in attachment:
attachment_content = base64.b64decode(attachment['contentBytes'])
# Save attachment to disk
attachment_path = os.path.join(message_attachments_dir, attachment_name)
with open(attachment_path, 'wb') as f:
f.write(attachment_content)
# Add to MIME message
attachment_part.set_payload(attachment_content)
encoders.encode_base64(attachment_part)
attachment_part.add_header('Content-Disposition', f'attachment; filename="{attachment_name}"')
mime_msg.attach(attachment_part)
progress.console.print(f"Downloaded attachment: {attachment_name}")
else:
progress.console.print(f"Skipping attachment with no content: {attachment_name}")