From 7e42644224e47df75b902dc6f3a21502296e585f Mon Sep 17 00:00:00 2001
From: Tim Bendt
Date: Tue, 6 May 2025 11:44:36 -0600
Subject: [PATCH] async http requests

Replace the blocking `requests` calls with aiohttp so the coroutines in
`asyncio.gather` actually overlap on network I/O instead of serializing
on the event loop. Shared GET/POST/PATCH/DELETE helpers now raise on
unexpected status codes, which lets the scattered per-call status checks
be dropped. `mark_read_async` is renamed to `synchronize_maildir_async`,
and `--dry-run` now defaults to off.
---
 fetch_outlook.py | 119 ++++++++++++++++++++++++---------------------------
 1 file changed, 56 insertions(+), 63 deletions(-)

diff --git a/fetch_outlook.py b/fetch_outlook.py
index dfd636f..c7eda77 100644
--- a/fetch_outlook.py
+++ b/fetch_outlook.py
@@ -2,7 +2,6 @@ import os
 import re
 from typing import Set
 import msal
-import requests
 import json
 import glob
 from datetime import datetime
@@ -11,12 +10,13 @@ from dateutil.tz import UTC
 from email.message import EmailMessage
 from email.utils import format_datetime
 from rich import print
-from rich.progress import Progress, track, SpinnerColumn, MofNCompleteColumn
+from rich.progress import Progress, SpinnerColumn, MofNCompleteColumn
 from rich.panel import Panel
 import time
 import html2text
 import asyncio
 import argparse
+import aiohttp
 
 # Filepath for caching timestamp
 cache_timestamp_file = 'cache_timestamp.json'
@@ -39,12 +39,39 @@ def save_sync_timestamp():
 
 # Add argument parsing for dry-run mode
 arg_parser = argparse.ArgumentParser(description="Fetch and synchronize emails.")
-arg_parser.add_argument("--dry-run", action="store_true", help="Run in dry-run mode without making changes.", default=True)
+arg_parser.add_argument("--dry-run", action="store_true", help="Run in dry-run mode without making changes.", default=False)
 args = arg_parser.parse_args()
 dry_run = args.dry_run
 
 
-async def mark_read_async(maildir_path, headers, progress, task_id):
+async def fetch_with_aiohttp(url, headers):
+    async with aiohttp.ClientSession() as session:
+        async with session.get(url, headers=headers) as response:
+            if response.status != 200:
+                raise Exception(f"Failed to fetch {url}: {response.status} {await response.text()}")
+            return await response.json()
+
+async def post_with_aiohttp(url, headers, json_data):
+    async with aiohttp.ClientSession() as session:
+        async with session.post(url, headers=headers, json=json_data) as response:
+            if response.status != 201:
+                raise Exception(f"Failed to post to {url}: {response.status} {await response.text()}")
+            return await response.json()
+
+async def patch_with_aiohttp(url, headers, json_data):
+    async with aiohttp.ClientSession() as session:
+        async with session.patch(url, headers=headers, json=json_data) as response:
+            if response.status != 200:
+                raise Exception(f"Failed to patch {url}: {response.status} {await response.text()}")
+            return await response.json()
+
+async def delete_with_aiohttp(url, headers):
+    async with aiohttp.ClientSession() as session:
+        async with session.delete(url, headers=headers) as response:
+            if response.status != 204:
+                raise Exception(f"Failed to delete {url}: {response.status} {await response.text()}")
+
+
+async def synchronize_maildir_async(maildir_path, headers, progress, task_id):
     last_sync = load_last_sync_timestamp()
     current_time = time.time()
 
@@ -59,13 +86,11 @@ async def mark_read_async(maildir_path, headers, progress, task_id):
     for filename in moved_to_cur:
         message_id = filename.split('.')[0]  # Extract the Message-ID from the filename
         if not dry_run:
-            response = requests.patch(
+            await patch_with_aiohttp(
                 f'https://graph.microsoft.com/v1.0/me/messages/{message_id}',
-                headers=headers,
-                json={'isRead': True}
+                headers,
+                {'isRead': True}
             )
-            if response.status_code != 200:
-                progress.console.print(Panel(f"Failed to mark message as read: {message_id}, {response.status_code}, {response.text}", padding=2, border_style="red"))
         else:
             progress.console.print(f"[DRY-RUN] Would mark message as read: {message_id}")
         progress.advance(task_id)
@@ -82,21 +107,16 @@ async def fetch_mail_async(maildir_path, attachments_dir, headers, progress, tas
 
     # Fetch the total count of messages in the inbox
     inbox_url = 'https://graph.microsoft.com/v1.0/me/mailFolders/inbox'
-    response = requests.get(inbox_url, headers=headers)
+    response = await fetch_with_aiohttp(inbox_url, headers)
 
-    if response.status_code != 200:
-        raise Exception(f"Failed to fetch inbox details: {response.status_code} {response.text}")
-
-    total_messages = response.json().get('totalItemCount', 0)
+    total_messages = response.get('totalItemCount', 0)
     progress.update(task_id, total=total_messages)
 
     while mail_url:
-        response = requests.get(mail_url, headers=headers)
-        if response.status_code != 200:
-            raise Exception(f"Failed to fetch mail: {response.status_code} {response.text}")
-        response_data = response.json()
+        response_data = await fetch_with_aiohttp(mail_url, headers)
         messages.extend(response_data.get('value', []))
-        progress.advance(task_id, (len(response_data.get('value', [])) / 2))
+        progress.advance(task_id, len(response_data.get('value', [])))
+
         # Get the next page URL from @odata.nextLink
         mail_url = response_data.get('@odata.nextLink')
 
@@ -123,25 +143,14 @@ async def fetch_mail_async(maildir_path, attachments_dir, headers, progress, tas
     progress.update(task_id, completed=len(messages))
     progress.console.print(f"\nFinished saving {len(messages)} messages.")
 
-
-
 async def archive_mail_async(maildir_path, headers, progress, task_id):
-    # Find messages moved to ".Archives/**/*" and move them to the "Archive" folder on the server
     archive_dir = os.path.join(maildir_path, '.Archives')
     archive_files = glob.glob(os.path.join(archive_dir, '**', '*.eml'), recursive=True)
     progress.update(task_id, total=len(archive_files))
 
-    # Fetch the list of folders to find the "Archive" folder ID
-    progress.console.print("Fetching server folders to locate 'Archive' folder...")
-    folder_response = requests.get('https://graph.microsoft.com/v1.0/me/mailFolders', headers=headers)
-    if folder_response.status_code != 200:
-        raise Exception(f"Failed to fetch mail folders: {folder_response.status_code}, {folder_response.text}")
-    folders = folder_response.json().get('value', [])
-    archive_folder_id = None
-    for folder in folders:
-        if folder.get('displayName', '').lower() == 'archive':
-            archive_folder_id = folder.get('id')
-            break
+    folder_response = await fetch_with_aiohttp('https://graph.microsoft.com/v1.0/me/mailFolders', headers)
+    folders = folder_response.get('value', [])
+    archive_folder_id = next((folder.get('id') for folder in folders if folder.get('displayName', '').lower() == 'archive'), None)
 
     if not archive_folder_id:
         raise Exception("No folder named 'Archive' found on the server.")
@@ -150,10 +159,8 @@ async def archive_mail_async(maildir_path, headers, progress, task_id):
         message_id = os.path.basename(filepath).split('.')[0]  # Extract the Message-ID from the filename
         progress.console.print(f"Moving message to 'Archive' folder: {message_id}")
         if not dry_run:
-            response = requests.post(
+            await post_with_aiohttp(
                 f'https://graph.microsoft.com/v1.0/me/messages/{message_id}/move',
-                headers=headers,
-                json={'destinationId': archive_folder_id}
+                headers,
+                {'destinationId': archive_folder_id}
             )
-            if response.status_code != 201:  # 201 Created indicates success
-                progress.console.print(f"Failed to move message to 'Archive': {message_id}, {response.status_code}, {response.text}")
@@ -165,58 +172,46 @@ async def archive_mail_async(maildir_path, headers, progress, task_id):
     return
 
 async def delete_mail_async(maildir_path, headers, progress, task_id):
-    # Find messages moved to ".Trash/cur" and delete them on the server
     trash_dir = os.path.join(maildir_path, '.Trash', 'cur')
     trash_files = set(glob.glob(os.path.join(trash_dir, '*.eml')))
     progress.update(task_id, total=len(trash_files))
+
     for filepath in trash_files:
         message_id = os.path.basename(filepath).split('.')[0]  # Extract the Message-ID from the filename
         if not dry_run:
             progress.console.print(f"Moving message to trash: {message_id}")
-            response = requests.delete(
+            await delete_with_aiohttp(
                 f'https://graph.microsoft.com/v1.0/me/messages/{message_id}',
-                headers=headers
+                headers
             )
-            if response.status_code == 204:  # 204 No Content indicates success
-                os.remove(filepath)  # Remove the file from local trash
+            os.remove(filepath)  # Remove the file from local trash
         else:
            progress.console.print(f"[DRY-RUN] Would delete message: {message_id}")
         progress.advance(task_id)
 
 async def fetch_calendar_async(headers, progress, task_id):
     total_event_url = 'https://graph.microsoft.com/v1.0/me/events?$count=true'
-    total = requests.get(total_event_url, headers=headers)
-    if (total.status_code != 200):
-        raise Exception(f"Failed to fetch count: {response.status_code} {response.text}")
+    total = await fetch_with_aiohttp(total_event_url, headers)
 
-    total_events = total.json().get('@odata.count', 0)
+    total_events = total.get('@odata.count', 0)
     progress.update(task_id, total=total_events)
 
     calendar_url = 'https://graph.microsoft.com/v1.0/me/events?$top=100&$orderby=start/dateTime asc'
     events = []
     while calendar_url:
-        response = requests.get(calendar_url, headers=headers)
-
-        if response.status_code != 200:
-            raise Exception(f"Failed to fetch calendar: {response.status_code} {response.text}")
-
-        response_data = response.json()
+        response_data = await fetch_with_aiohttp(calendar_url, headers)
         events.extend(response_data.get('value', []))
         progress.advance(task_id, len(response_data.get('value', [])))
 
         # Get the next page URL from @odata.nextLink
         calendar_url = response_data.get('@odata.nextLink')
 
-# Call the synchronization function before fetching mail
-
 async def download_calendar_events(headers, progress, task_id):
     # Fetch the total count of events in the calendar
     total_event_url = 'https://graph.microsoft.com/v1.0/me/events?$count=true'
-    total = requests.get(total_event_url, headers=headers)
-    if (response.status_code != 200):
-        raise Exception(f"Failed to fetch count: {response.status_code} {response.text}")
+    total = await fetch_with_aiohttp(total_event_url, headers)
 
-    total_events = total.json().get('@odata.count', 0)
+    total_events = total.get('@odata.count', 0)
     progress.update(task_id, total=total_events)
 
     print(f"Total events in calendar: {total_events}")
@@ -225,10 +220,8 @@ async def download_calendar_events(headers, progress, task_id):
     events = []
     progress.console.print("Fetching Calendar events...")
     while events_url:
-        response = requests.get(events_url, headers=headers)
-        response_data = response.json()
+        response_data = await fetch_with_aiohttp(events_url, headers)
         events.extend(response_data.get('value', []))
-        # print(f"Fetched {len(events)} events so far...", end='\r')
         events_url = response_data.get('@odata.nextLink')
         progress.advance(task_id, len(response_data.get('value', [])))
     # Save events to a file in iCalendar format
@@ -428,7 +421,7 @@ async def main():
         task_delete = progress.add_task("[red]Deleting mail...", total=0)
 
         await asyncio.gather(
-            mark_read_async(maildir_path, headers, progress, task_read),
+            synchronize_maildir_async(maildir_path, headers, progress, task_read),
             archive_mail_async(maildir_path, headers, progress, task_archive),
             delete_mail_async(maildir_path, headers, progress, task_delete),
            fetch_mail_async(maildir_path, attachments_dir, headers, progress, task_fetch),
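
Note for reviewers, outside the patch: each helper above opens a fresh
aiohttp.ClientSession per request, so none of the Graph calls reuse
connections. A possible follow-up, sketched below under the assumption
that a session created in main() gets threaded through (the `session`
parameter is hypothetical and not part of this change):

    import asyncio
    import aiohttp

    async def fetch_with_aiohttp(session, url, headers):
        # Reuse the caller's session so repeated Graph calls share one
        # connection pool instead of redoing TCP/TLS setup each time.
        async with session.get(url, headers=headers) as response:
            if response.status != 200:
                raise Exception(f"Failed to fetch {url}: {response.status} {await response.text()}")
            return await response.json()

    async def main():
        headers = {'Authorization': 'Bearer <token>'}  # acquired via msal, as in the script
        async with aiohttp.ClientSession() as session:
            inbox = await fetch_with_aiohttp(
                session, 'https://graph.microsoft.com/v1.0/me/mailFolders/inbox', headers)
            print(inbox.get('totalItemCount', 0))

    asyncio.run(main())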
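A related observation: within each coroutine the per-message calls are
still awaited one at a time, so the gather in main() only overlaps the
four top-level tasks. If that ever becomes the bottleneck, a bounded
fan-out along these lines could work (a sketch only; `mark_read_bounded`
is a hypothetical helper, and the cap of 4 is an arbitrary guess chosen
to stay under Graph's throttling limits):

    import asyncio

    async def mark_read_bounded(message_ids, headers, limit=4):
        # Cap the number of in-flight PATCH requests; Graph throttles
        # clients that fire too many requests at once.
        sem = asyncio.Semaphore(limit)

        async def mark_one(message_id):
            async with sem:
                await patch_with_aiohttp(
                    f'https://graph.microsoft.com/v1.0/me/messages/{message_id}',
                    headers,
                    {'isRead': True},
                )

        await asyncio.gather(*(mark_one(m) for m in message_ids))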