# luk/fetch_outlook.py
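"""Synchronize an Outlook / Microsoft 365 mailbox into a local Maildir using the
Microsoft Graph API, and export calendar events to an iCalendar file.

Authentication uses MSAL (silent token refresh with a device-code fallback).
Inbox sync, read-state sync, archiving, deletion, and calendar fetching run
concurrently under a shared rich progress display. The --dry-run flag previews
every change without touching the server or local files.
"""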
import argparse
import asyncio
import base64  # Graph delivers fileAttachment contentBytes as base64 text
import glob
import json
import os
import re
import time
from email.message import EmailMessage
from email.utils import format_datetime

import aiohttp
import html2text
import msal
from dateutil import parser
from dateutil.tz import UTC
from rich import print
from rich.panel import Panel
from rich.progress import MofNCompleteColumn, Progress, SpinnerColumn

# Filepath for caching timestamp
cache_timestamp_file = 'cache_timestamp.json'
# Filepath for sync timestamp
sync_timestamp_file = 'sync_timestamp.json'
# Function to load the last sync timestamp
def load_last_sync_timestamp():
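    """Return the Unix timestamp of the last successful sync, or 0 if none."""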
    if os.path.exists(sync_timestamp_file):
        with open(sync_timestamp_file, 'r') as f:
            return json.load(f).get('last_sync', 0)
    return 0

# Function to save the current sync timestamp
def save_sync_timestamp():
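    """Persist the current time as the last successful sync timestamp."""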
    with open(sync_timestamp_file, 'w') as f:
        json.dump({'last_sync': time.time()}, f)

# Add argument parsing for dry-run mode
arg_parser = argparse.ArgumentParser(description="Fetch and synchronize emails.")
arg_parser.add_argument("--dry-run", action="store_true", help="Run in dry-run mode without making changes.", default=False)
args = arg_parser.parse_args()
dry_run = args.dry_run
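
# Example invocation (assumes AZURE_CLIENT_ID and AZURE_TENANT_ID are exported,
# and optionally MAILDIR_PATH; see main() below):
#   python luk/fetch_outlook.py --dry-run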
async def fetch_with_aiohttp(url, headers):
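    """GET a URL and return the parsed JSON body; raise on any non-200 status."""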
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers) as response:
            if response.status != 200:
                raise Exception(f"Failed to fetch {url}: {response.status} {await response.text()}")
            return await response.json()

async def post_with_aiohttp(url, headers, json_data):
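    """POST JSON to a URL and return the parsed JSON body; raise on any non-201 status."""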
    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=headers, json=json_data) as response:
            if response.status != 201:
                raise Exception(f"Failed to post to {url}: {response.status} {await response.text()}")
            return await response.json()

async def patch_with_aiohttp(url, headers, json_data):
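    """PATCH a URL with JSON and return the parsed JSON body; raise on any non-200 status."""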
    async with aiohttp.ClientSession() as session:
        async with session.patch(url, headers=headers, json=json_data) as response:
            if response.status != 200:
                raise Exception(f"Failed to patch {url}: {response.status} {await response.text()}")
            return await response.json()

async def delete_with_aiohttp(url, headers):
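    """DELETE a URL; raise on any status other than 204 No Content."""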
    async with aiohttp.ClientSession() as session:
        async with session.delete(url, headers=headers) as response:
            if response.status != 204:
                raise Exception(f"Failed to delete {url}: {response.status} {await response.text()}")

async def synchronize_maildir_async(maildir_path, headers, progress, task_id):
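    """Mark messages sitting in the local "cur" directory as read on the server,
    then record the sync time."""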
    # Any message in "cur" (but not "new") has been read locally; mark it read server-side
    new_dir = os.path.join(maildir_path, 'new')
    cur_dir = os.path.join(maildir_path, 'cur')
    new_names = {os.path.basename(f) for f in glob.glob(os.path.join(new_dir, '*.eml'))}
    cur_names = {os.path.basename(f) for f in glob.glob(os.path.join(cur_dir, '*.eml'))}
    moved_to_cur = sorted(cur_names - new_names)
    progress.update(task_id, total=len(moved_to_cur))
    for filename in moved_to_cur:
        message_id = filename.split('.')[0]  # the filename stem is the Graph message ID
        if not dry_run:
            await patch_with_aiohttp(
                f'https://graph.microsoft.com/v1.0/me/messages/{message_id}',
                headers,
                {'isRead': True}
            )
        else:
            progress.console.print(f"[DRY-RUN] Would mark message as read: {message_id}")
        progress.advance(task_id)
    # Save the current sync timestamp
    if not dry_run:
        save_sync_timestamp()
    else:
        progress.console.print("[DRY-RUN] Would save sync timestamp.")

async def fetch_mail_async(maildir_path, attachments_dir, headers, progress, task_id):
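    """Download the inbox via Graph (paged through @odata.nextLink), delete local
    files whose messages no longer exist server-side, and save each message to
    the Maildir."""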
    mail_url = ('https://graph.microsoft.com/v1.0/me/mailFolders/inbox/messages'
                '?$top=100&$orderby=receivedDateTime asc'
                '&$select=id,subject,from,toRecipients,ccRecipients,receivedDateTime,isRead,body,attachments')
    messages = []
    # Fetch the total count of messages in the inbox
    inbox_url = 'https://graph.microsoft.com/v1.0/me/mailFolders/inbox'
    response = await fetch_with_aiohttp(inbox_url, headers)
    total_messages = response.get('totalItemCount', 0)
    progress.update(task_id, total=total_messages)
    while mail_url:
        response_data = await fetch_with_aiohttp(mail_url, headers)
        messages.extend(response_data.get('value', []))
        progress.advance(task_id, len(response_data.get('value', [])))
        # Get the next page URL from @odata.nextLink
        mail_url = response_data.get('@odata.nextLink')
    inbox_msg_ids = {message['id'] for message in messages}
    # The first half of the bar covered fetching; the second half covers saving
    progress.update(task_id, completed=(len(messages) / 2))
    new_dir = os.path.join(maildir_path, 'new')
    cur_dir = os.path.join(maildir_path, 'cur')
    new_files = set(glob.glob(os.path.join(new_dir, '*.eml')))
    cur_files = set(glob.glob(os.path.join(cur_dir, '*.eml')))
    # Remove local files whose messages are no longer in the server-side inbox
    for filename in cur_files | new_files:
        message_id = os.path.basename(filename).split('.')[0]  # filename stem is the Graph message ID
        if message_id not in inbox_msg_ids:
            if not dry_run:
                progress.console.print(f"Deleting {filename} from inbox")
                os.remove(filename)
            else:
                progress.console.print(f"[DRY-RUN] Would delete {filename} from inbox")
    for message in messages:
        progress.console.print(f"Processing message: {message.get('subject', 'No Subject')}", end='\r')
        save_email_to_maildir(maildir_path, message, attachments_dir, progress)
        progress.update(task_id, advance=0.5)
    progress.update(task_id, completed=len(messages))
    progress.console.print(f"\nFinished saving {len(messages)} messages.")

async def archive_mail_async(maildir_path, headers, progress, task_id):
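    """Move messages filed under the local .Archives folder into the server-side
    'Archive' folder."""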
    archive_dir = os.path.join(maildir_path, '.Archives')
    archive_files = glob.glob(os.path.join(archive_dir, '**', '*.eml'), recursive=True)
    progress.update(task_id, total=len(archive_files))
    folder_response = await fetch_with_aiohttp('https://graph.microsoft.com/v1.0/me/mailFolders', headers)
    folders = folder_response.get('value', [])
    archive_folder_id = next((folder.get('id') for folder in folders if folder.get('displayName', '').lower() == 'archive'), None)
    if not archive_folder_id:
        raise Exception("No folder named 'Archive' found on the server.")
    for filepath in archive_files:
        message_id = os.path.basename(filepath).split('.')[0]  # filename stem is the Graph message ID
        progress.console.print(f"Moving message to 'Archive' folder: {message_id}")
        if not dry_run:
            try:
                await post_with_aiohttp(
                    f'https://graph.microsoft.com/v1.0/me/messages/{message_id}/move',
                    headers,
                    {'destinationId': archive_folder_id}
                )
            except Exception as e:
                progress.console.print(f"Failed to move message to 'Archive': {message_id}, {e}")
                # The helper embeds the HTTP status in its error message; on 404
                # the message is gone server-side, so drop the stale local copy
                if ': 404 ' in str(e):
                    os.remove(filepath)
        else:
            progress.console.print(f"[DRY-RUN] Would move message to 'Archive' folder: {message_id}")
        progress.advance(task_id)

async def delete_mail_async(maildir_path, headers, progress, task_id):
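    """Delete messages filed under the local .Trash folder from the server, then
    remove the local files."""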
    trash_dir = os.path.join(maildir_path, '.Trash', 'cur')
    trash_files = set(glob.glob(os.path.join(trash_dir, '*.eml')))
    progress.update(task_id, total=len(trash_files))
    for filepath in trash_files:
        message_id = os.path.basename(filepath).split('.')[0]  # filename stem is the Graph message ID
        if not dry_run:
            progress.console.print(f"Deleting message: {message_id}")
            await delete_with_aiohttp(
                f'https://graph.microsoft.com/v1.0/me/messages/{message_id}',
                headers
            )
            os.remove(filepath)  # Remove the file from local trash
        else:
            progress.console.print(f"[DRY-RUN] Would delete message: {message_id}")
        progress.advance(task_id)

async def fetch_calendar_async(headers, progress, task_id):
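    """Page through all calendar events; currently the fetched events only drive
    the calendar progress bar (writing them out is handled by
    download_calendar_events)."""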
    total_event_url = 'https://graph.microsoft.com/v1.0/me/events?$count=true'
    total = await fetch_with_aiohttp(total_event_url, headers)
    total_events = total.get('@odata.count', 0)
    progress.update(task_id, total=total_events)
    calendar_url = 'https://graph.microsoft.com/v1.0/me/events?$top=100&$orderby=start/dateTime asc'
    events = []
    while calendar_url:
        response_data = await fetch_with_aiohttp(calendar_url, headers)
        events.extend(response_data.get('value', []))
        progress.advance(task_id, len(response_data.get('value', [])))
        # Get the next page URL from @odata.nextLink
        calendar_url = response_data.get('@odata.nextLink')

async def download_calendar_events(headers, progress, task_id):
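    """Fetch all calendar events (recurring events expanded via $expand=instances)
    and write them to a minimal iCalendar file under output_ics/."""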
    # Fetch the total count of events in the calendar
    total_event_url = 'https://graph.microsoft.com/v1.0/me/events?$count=true'
    total = await fetch_with_aiohttp(total_event_url, headers)
    total_events = total.get('@odata.count', 0)
    progress.update(task_id, total=total_events)
    print(f"Total events in calendar: {total_events}")
    # Fetch events with pagination and expand recurring events
    events_url = 'https://graph.microsoft.com/v1.0/me/events?$top=100&$expand=instances'
    events = []
    progress.console.print("Fetching Calendar events...")
    while events_url:
        response_data = await fetch_with_aiohttp(events_url, headers)
        events.extend(response_data.get('value', []))
        events_url = response_data.get('@odata.nextLink')
        progress.advance(task_id, len(response_data.get('value', [])))
    # Save events to a file in iCalendar format
    output_file = 'output_ics/outlook_events_latest.ics'
    if not dry_run:
        os.makedirs(os.path.dirname(output_file), exist_ok=True)
        progress.console.print(f"Saving events to {output_file}...")
        with open(output_file, 'w') as f:
            f.write("BEGIN:VCALENDAR\nVERSION:2.0\n")
            for event in events:
                if 'start' in event and 'end' in event:
                    start = parser.isoparse(event['start']['dateTime'])
                    end = parser.isoparse(event['end']['dateTime'])
                    f.write(f"BEGIN:VEVENT\nSUMMARY:{event['subject']}\n")
                    f.write(f"DTSTART:{start.strftime('%Y%m%dT%H%M%S')}\n")
                    f.write(f"DTEND:{end.strftime('%Y%m%dT%H%M%S')}\n")
                    if event.get('recurrence'):  # 'recurrence' may be absent or None
                        for rule in event['recurrence']:
                            if rule.startswith('RRULE'):
                                # Normalize UNTIL to UTC when the event start is timezone-aware
                                rule_parts = rule.split(';')
                                new_rule_parts = []
                                for part in rule_parts:
                                    if part.startswith('UNTIL='):
                                        until_value = part.split('=')[1]
                                        until_date = parser.isoparse(until_value)
                                        if start.tzinfo is not None and until_date.tzinfo is None:
                                            until_date = until_date.replace(tzinfo=UTC)
                                        new_rule_parts.append(f"UNTIL={until_date.strftime('%Y%m%dT%H%M%SZ')}")
                                    else:
                                        new_rule_parts.append(part)
                                rule = ';'.join(new_rule_parts)
                            f.write(f"{rule}\n")
                    f.write("END:VEVENT\n")
            f.write("END:VCALENDAR\n")
        progress.console.print(f"Saved events to {output_file}")
    else:
        progress.console.print(f"[DRY-RUN] Would save events to {output_file}")

# Function to check if the cache is still valid
def is_cache_valid(cache_timestamp):
    """Return True if cache_timestamp ('timestamp' plus 'max_age') has not expired."""
    if 'timestamp' in cache_timestamp and 'max_age' in cache_timestamp:
        current_time = time.time()
        cache_expiry_time = cache_timestamp['timestamp'] + cache_timestamp['max_age']
        return current_time < cache_expiry_time
    return False

# Function to create Maildir structure
def create_maildir_structure(base_path):
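    """Create the standard Maildir cur/new/tmp subdirectories under base_path."""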
    os.makedirs(os.path.join(base_path, 'cur'), exist_ok=True)
    os.makedirs(os.path.join(base_path, 'new'), exist_ok=True)
    os.makedirs(os.path.join(base_path, 'tmp'), exist_ok=True)

def save_email_to_maildir(maildir_path, email_data, attachments_dir, progress):
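    """Render one Graph message as an RFC 822 file (HTML body converted to
    Markdown, attachments decoded from base64) and save it into the Maildir,
    skipping messages that already exist locally."""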
    # Create a new EmailMessage object
    msg = EmailMessage()
    received_datetime = email_data.get('receivedDateTime', '')
    if received_datetime:
        parsed_datetime = parser.isoparse(received_datetime)
        msg['Date'] = format_datetime(parsed_datetime)
    else:
        msg['Date'] = ''
    msg['Message-ID'] = email_data.get('id', '')
    msg['Subject'] = email_data.get('subject', 'No Subject')
    msg['From'] = email_data.get('from', {}).get('emailAddress', {}).get('address', 'unknown@unknown.com')
    msg['To'] = ', '.join(recipient['emailAddress']['address'] for recipient in email_data.get('toRecipients', []))
    msg['Cc'] = ', '.join(recipient['emailAddress']['address'] for recipient in email_data.get('ccRecipients', []))
    # Convert the email body from HTML to Markdown
    body_html = email_data.get('body', {}).get('content', '')
    if email_data.get('body', {}).get('contentType', '').lower() == 'html':
        markdown_converter = html2text.HTML2Text()
        markdown_converter.ignore_images = True
        markdown_converter.ignore_links = True
        body_markdown = markdown_converter.handle(body_html)
    else:
        body_markdown = body_html
    # Remove everything between any alphanumeric BannerStart and BannerEnd markers
    body_markdown = re.sub(r'\w+BannerStart.*?\w+BannerEnd', '', body_markdown, flags=re.DOTALL)
    msg.set_content(body_markdown)
    # Download attachments (Graph delivers fileAttachment contentBytes as base64)
    progress.console.print(f"Downloading attachments for message: {msg['Message-ID']}")
    for attachment in email_data.get('attachments', []):
        attachment_name = attachment.get('name', 'unknown')
        attachment_content = attachment.get('contentBytes')
        if attachment_content:
            file_bytes = base64.b64decode(attachment_content)
            attachment_path = os.path.join(attachments_dir, attachment_name)
            if not dry_run:
                with open(attachment_path, 'wb') as f:
                    f.write(file_bytes)
                content_type = attachment.get('contentType') or 'application/octet-stream'
                maintype, _, subtype = content_type.partition('/')
                msg.add_attachment(file_bytes, maintype=maintype, subtype=subtype or 'octet-stream',
                                   filename=attachment_name)
            else:
                progress.console.print(f"[DRY-RUN] Would save attachment to {attachment_path}")
    # Determine the target directory based on the server-side read flag
    target_dir = 'cur' if email_data.get('isRead', False) else 'new'
    email_filename = f"{msg['Message-ID']}.eml"
    email_filepath = os.path.join(maildir_path, target_dir, email_filename)
    # Check if the file already exists in any subfolder
    for root, _, files in os.walk(maildir_path):
        if email_filename in files:
            progress.console.print(f"Message {msg['Message-ID']} already exists in {root}. Skipping save.")
            return
    # Save the email to the Maildir
    if not dry_run:
        with open(email_filepath, 'w') as f:
            f.write(msg.as_string())
        progress.console.print(f"Saved message {msg['Message-ID']}")
    else:
        progress.console.print(f"[DRY-RUN] Would save message {msg['Message-ID']}")

async def main():
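    """Authenticate against Microsoft Graph, then run all sync tasks concurrently."""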
    # Load cached timestamp if it exists
    if os.path.exists(cache_timestamp_file):
        with open(cache_timestamp_file, 'r') as f:
            cache_timestamp = json.load(f)
    else:
        cache_timestamp = {}
    # Save emails to Maildir
    maildir_path = os.getenv('MAILDIR_PATH', os.path.expanduser('~/Mail')) + "/corteva"
    attachments_dir = os.path.join(maildir_path, 'attachments')
    os.makedirs(attachments_dir, exist_ok=True)
    create_maildir_structure(maildir_path)
    # Read Azure app credentials from environment variables
    client_id = os.getenv('AZURE_CLIENT_ID')
    tenant_id = os.getenv('AZURE_TENANT_ID')
    if not client_id or not tenant_id:
        raise ValueError("Please set the AZURE_CLIENT_ID and AZURE_TENANT_ID environment variables.")
    # Token cache
    cache = msal.SerializableTokenCache()
    cache_file = 'token_cache.bin'
    if os.path.exists(cache_file):
        with open(cache_file, 'r') as f:
            cache.deserialize(f.read())
    # Filepath for caching ETag
    etag_cache_file = 'etag_cache.json'
    # Load cached ETag if it exists
    if os.path.exists(etag_cache_file):
        with open(etag_cache_file, 'r') as f:
            etag_cache = json.load(f)
    else:
        etag_cache = {}
    # Authentication: try a silent token first, then fall back to device-code flow
    authority = f'https://login.microsoftonline.com/{tenant_id}'
    scopes = ['https://graph.microsoft.com/Calendars.Read', 'https://graph.microsoft.com/Mail.ReadWrite']
    app = msal.PublicClientApplication(client_id, authority=authority, token_cache=cache)
    accounts = app.get_accounts()
    token_response = None
    if accounts:
        token_response = app.acquire_token_silent(scopes, account=accounts[0])
    if not token_response:
        flow = app.initiate_device_flow(scopes=scopes)
        if 'user_code' not in flow:
            raise Exception("Failed to create device flow")
        print(Panel(flow['message'], border_style="magenta", padding=2, title="MSAL Login Flow Link"))
        token_response = app.acquire_token_by_device_flow(flow)
    if 'access_token' not in token_response:
        raise Exception("Failed to acquire token")
    # Save token cache
    with open(cache_file, 'w') as f:
        f.write(cache.serialize())
    access_token = token_response['access_token']
    headers = {'Authorization': f'Bearer {access_token}'}
    accounts = app.get_accounts()
    if not accounts:
        raise Exception("No accounts found")
    progress = Progress(
        SpinnerColumn(),
        MofNCompleteColumn(),
        *Progress.get_default_columns()
    )
    with progress:
        task_fetch = progress.add_task("[green]Syncing Inbox...", total=0)
        task_calendar = progress.add_task("[cyan]Fetching calendar...", total=0)
        task_read = progress.add_task("[blue]Marking as read...", total=0)
        task_archive = progress.add_task("[yellow]Archiving mail...", total=0)
        task_delete = progress.add_task("[red]Deleting mail...", total=0)
        await asyncio.gather(
            synchronize_maildir_async(maildir_path, headers, progress, task_read),
            archive_mail_async(maildir_path, headers, progress, task_archive),
            delete_mail_async(maildir_path, headers, progress, task_delete),
            fetch_mail_async(maildir_path, attachments_dir, headers, progress, task_fetch),
            fetch_calendar_async(headers, progress, task_calendar)
        )

if __name__ == "__main__":
    asyncio.run(main())