calendar replies

This commit is contained in:
Bendt
2026-01-02 10:20:10 -05:00
parent 8a121d7fec
commit efe417b41a
6 changed files with 411 additions and 188 deletions

BIN
.coverage

Binary file not shown.

View File

@@ -425,7 +425,7 @@ async def _sync_outlook_data(
# Define scopes for Microsoft Graph API # Define scopes for Microsoft Graph API
scopes = [ scopes = [
"https://graph.microsoft.com/Calendars.ReadWrite", "https://graph.microsoft.com/Calendars.Read",
"https://graph.microsoft.com/Mail.ReadWrite", "https://graph.microsoft.com/Mail.ReadWrite",
] ]
@@ -721,7 +721,7 @@ def sync(
# This prevents the TUI from appearing to freeze during device flow auth # This prevents the TUI from appearing to freeze during device flow auth
if not demo: if not demo:
scopes = [ scopes = [
"https://graph.microsoft.com/Calendars.ReadWrite", "https://graph.microsoft.com/Calendars.Read",
"https://graph.microsoft.com/Mail.ReadWrite", "https://graph.microsoft.com/Mail.ReadWrite",
] ]
if not has_valid_cached_token(scopes): if not has_valid_cached_token(scopes):
@@ -963,7 +963,7 @@ def interactive(org, vdir, notify, dry_run, demo):
# This prevents the TUI from appearing to freeze during device flow auth # This prevents the TUI from appearing to freeze during device flow auth
if not demo: if not demo:
scopes = [ scopes = [
"https://graph.microsoft.com/Calendars.ReadWrite", "https://graph.microsoft.com/Calendars.Read",
"https://graph.microsoft.com/Mail.ReadWrite", "https://graph.microsoft.com/Mail.ReadWrite",
] ]
if not has_valid_cached_token(scopes): if not has_valid_cached_token(scopes):

View File

@@ -1103,7 +1103,7 @@ async def run_dashboard_sync(
# Get auth token # Get auth token
scopes = [ scopes = [
"https://graph.microsoft.com/Calendars.ReadWrite", "https://graph.microsoft.com/Calendars.Read",
"https://graph.microsoft.com/Mail.ReadWrite", "https://graph.microsoft.com/Mail.ReadWrite",
] ]
access_token, headers = get_access_token(scopes) access_token, headers = get_access_token(scopes)

View File

@@ -1,14 +1,22 @@
"""Calendar invite actions for mail app. """Calendar invite actions for mail app.
Allows responding to calendar invites directly from email. Allows responding to calendar invites directly from email using ICS/SMTP.
Uses the iTIP (iCalendar Transport-Independent Interoperability Protocol)
standard to send REPLY messages via email instead of requiring Calendar.ReadWrite
API permissions.
""" """
import asyncio
import aiohttp
import logging import logging
import os import os
import time
from datetime import datetime, timezone
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Optional, Tuple from typing import Optional, Tuple
from src.mail.utils.calendar_parser import ParsedCalendarEvent
# Set up dedicated RSVP logger # Set up dedicated RSVP logger
rsvp_logger = logging.getLogger("calendar_rsvp") rsvp_logger = logging.getLogger("calendar_rsvp")
rsvp_logger.setLevel(logging.DEBUG) rsvp_logger.setLevel(logging.DEBUG)
@@ -22,145 +30,306 @@ if not rsvp_logger.handlers:
handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")) handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
rsvp_logger.addHandler(handler) rsvp_logger.addHandler(handler)
# Timeout for API calls (seconds)
API_TIMEOUT = 15
# Required scopes for calendar operations def _get_user_email() -> Optional[str]:
CALENDAR_SCOPES = [ """Get the current user's email address from MSAL cache.
"https://graph.microsoft.com/Calendars.ReadWrite",
Returns:
User's email address if found, None otherwise.
"""
import msal
client_id = os.getenv("AZURE_CLIENT_ID")
tenant_id = os.getenv("AZURE_TENANT_ID")
if not client_id or not tenant_id:
rsvp_logger.warning("Azure credentials not configured")
return None
cache_file = os.path.expanduser("~/.local/share/luk/token_cache.bin")
if not os.path.exists(cache_file):
rsvp_logger.warning("Token cache file not found")
return None
try:
cache = msal.SerializableTokenCache()
cache.deserialize(open(cache_file, "r").read())
authority = f"https://login.microsoftonline.com/{tenant_id}"
app = msal.PublicClientApplication(
client_id, authority=authority, token_cache=cache
)
accounts = app.get_accounts()
if accounts:
# The username field contains the user's email
return accounts[0].get("username")
return None
except Exception as e:
rsvp_logger.error(f"Failed to get user email from MSAL: {e}")
return None
def _get_user_display_name() -> Optional[str]:
"""Get the current user's display name from MSAL cache.
Returns:
User's display name if found, None otherwise.
"""
import msal
client_id = os.getenv("AZURE_CLIENT_ID")
tenant_id = os.getenv("AZURE_TENANT_ID")
if not client_id or not tenant_id:
return None
cache_file = os.path.expanduser("~/.local/share/luk/token_cache.bin")
if not os.path.exists(cache_file):
return None
try:
cache = msal.SerializableTokenCache()
cache.deserialize(open(cache_file, "r").read())
authority = f"https://login.microsoftonline.com/{tenant_id}"
app = msal.PublicClientApplication(
client_id, authority=authority, token_cache=cache
)
accounts = app.get_accounts()
if accounts:
# Try to get name from account, fallback to username
name = accounts[0].get("name")
if name:
return name
# Fallback: construct name from email
username = accounts[0].get("username", "")
if "@" in username:
local_part = username.split("@")[0]
# Convert firstname.lastname to Firstname Lastname
parts = local_part.replace(".", " ").replace("_", " ").split()
return " ".join(p.capitalize() for p in parts)
return None
except Exception as e:
rsvp_logger.debug(f"Failed to get display name: {e}")
return None
def generate_ics_reply(
event: ParsedCalendarEvent,
response: str,
attendee_email: str,
attendee_name: Optional[str] = None,
) -> str:
"""Generate an iCalendar REPLY for a calendar invite.
Args:
event: The parsed calendar event from the original invite
response: Response type - 'ACCEPTED', 'TENTATIVE', or 'DECLINED'
attendee_email: The attendee's email address
attendee_name: The attendee's display name (optional)
Returns:
ICS content string formatted as an iTIP REPLY
"""
# Map response to PARTSTAT value
partstat_map = {
"accept": "ACCEPTED",
"tentativelyAccept": "TENTATIVE",
"decline": "DECLINED",
}
partstat = partstat_map.get(response, "ACCEPTED")
# Generate DTSTAMP in UTC format
dtstamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
# Build attendee line with proper formatting
if attendee_name:
attendee_line = (
f'ATTENDEE;PARTSTAT={partstat};CN="{attendee_name}":MAILTO:{attendee_email}'
)
else:
attendee_line = f"ATTENDEE;PARTSTAT={partstat}:MAILTO:{attendee_email}"
# Build organizer line
if event.organizer_name:
organizer_line = (
f'ORGANIZER;CN="{event.organizer_name}":MAILTO:{event.organizer_email}'
)
else:
organizer_line = f"ORGANIZER:MAILTO:{event.organizer_email}"
# Build the response subject prefix
response_prefix = {
"accept": "Accepted",
"tentativelyAccept": "Tentative",
"decline": "Declined",
}.get(response, "Accepted")
summary = f"{response_prefix}: {event.summary or '(no subject)'}"
# Build the ICS content following iTIP REPLY standard
ics_lines = [
"BEGIN:VCALENDAR",
"VERSION:2.0",
"PRODID:-//LUK Mail//Calendar Reply//EN",
"METHOD:REPLY",
"BEGIN:VEVENT",
f"UID:{event.uid}",
f"DTSTAMP:{dtstamp}",
organizer_line,
attendee_line,
f"SEQUENCE:{event.sequence}",
f"SUMMARY:{summary}",
"END:VEVENT",
"END:VCALENDAR",
] ]
return "\r\n".join(ics_lines)
def _get_auth_headers_sync() -> Optional[dict]:
"""Get auth headers synchronously using cached token only.
Returns None if no valid cached token exists (to avoid blocking on device flow).
"""
from src.services.microsoft_graph.auth import (
has_valid_cached_token,
get_access_token,
)
rsvp_logger.debug("Checking for valid cached token...")
if not has_valid_cached_token(CALENDAR_SCOPES):
rsvp_logger.warning("No valid cached token found")
return None
try:
rsvp_logger.debug("Getting access token from cache...")
_, headers = get_access_token(CALENDAR_SCOPES)
rsvp_logger.debug("Got auth headers successfully")
return headers
except Exception as e:
rsvp_logger.error(f"Failed to get auth headers: {e}")
return None
async def find_event_by_uid(uid: str, headers: dict) -> Optional[dict]: def build_calendar_reply_email(
"""Find a calendar event by its iCalUId. event: ParsedCalendarEvent,
response: str,
from_email: str,
to_email: str,
from_name: Optional[str] = None,
) -> str:
"""Build a MIME email with calendar REPLY attachment.
The email is formatted according to iTIP/iMIP standards so that
Exchange/Outlook will recognize it as a calendar action.
Args: Args:
uid: The iCalendar UID from the ICS file event: The parsed calendar event from the original invite
headers: Auth headers for MS Graph API
Returns:
Event dict if found, None otherwise
"""
rsvp_logger.info(f"Looking up event by UID: {uid}")
try:
# Search by iCalUId - this is the unique identifier that should match
uid_escaped = uid.replace("'", "''")
url = (
f"https://graph.microsoft.com/v1.0/me/events?"
f"$filter=iCalUId eq '{uid_escaped}'&"
f"$select=id,subject,organizer,start,end,responseStatus,iCalUId"
)
rsvp_logger.debug(f"Request URL: {url}")
# Use aiohttp directly with timeout
timeout = aiohttp.ClientTimeout(total=API_TIMEOUT)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url, headers=headers) as response:
rsvp_logger.debug(f"Response status: {response.status}")
if response.status != 200:
error_text = await response.text()
rsvp_logger.error(f"API error: {response.status} - {error_text}")
return None
data = await response.json()
events = data.get("value", [])
rsvp_logger.info(f"Found {len(events)} events matching UID")
if events:
event = events[0]
rsvp_logger.debug(
f"Event found: {event.get('subject')} - ID: {event.get('id')}"
)
return event
return None
except asyncio.TimeoutError:
rsvp_logger.error(f"Timeout after {API_TIMEOUT}s looking up event by UID")
return None
except Exception as e:
rsvp_logger.error(f"Error finding event by UID: {e}", exc_info=True)
return None
async def respond_to_calendar_invite(
event_id: str, response: str, headers: dict
) -> Tuple[bool, str]:
"""Respond to a calendar invite.
Args:
event_id: Microsoft Graph event ID
response: Response type - 'accept', 'tentativelyAccept', or 'decline' response: Response type - 'accept', 'tentativelyAccept', or 'decline'
headers: Auth headers for MS Graph API from_email: Sender's email address
to_email: Recipient's email address (the organizer)
from_name: Sender's display name (optional)
Returns: Returns:
Tuple of (success, message) Complete RFC 5322 email as string
""" """
rsvp_logger.info(f"Responding to event {event_id} with: {response}") # Generate the ICS reply content
ics_content = generate_ics_reply(event, response, from_email, from_name)
try: # Build response text for email body
response_url = (
f"https://graph.microsoft.com/v1.0/me/events/{event_id}/{response}"
)
rsvp_logger.debug(f"Response URL: {response_url}")
# Use aiohttp directly with timeout
timeout = aiohttp.ClientTimeout(total=API_TIMEOUT)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post(response_url, headers=headers, json={}) as resp:
rsvp_logger.debug(f"Response status: {resp.status}")
if resp.status in (200, 202):
response_text = { response_text = {
"accept": "accepted", "accept": "accepted",
"tentativelyAccept": "tentatively accepted", "tentativelyAccept": "tentatively accepted",
"decline": "declined", "decline": "declined",
}.get(response, response) }.get(response, "accepted")
rsvp_logger.info(f"Successfully {response_text} the meeting")
return True, f"Successfully {response_text} the meeting" subject_prefix = {
else: "accept": "Accepted",
error_text = await resp.text() "tentativelyAccept": "Tentative",
rsvp_logger.error( "decline": "Declined",
f"Failed to respond: {resp.status} - {error_text}" }.get(response, "Accepted")
)
return False, f"Failed to respond: {resp.status}" subject = f"{subject_prefix}: {event.summary or '(no subject)'}"
# Create the email message
msg = MIMEMultipart("mixed")
# Set headers
if from_name:
msg["From"] = f'"{from_name}" <{from_email}>'
else:
msg["From"] = from_email
msg["To"] = to_email
msg["Subject"] = subject
# Add Content-Class header for Exchange compatibility
msg["Content-Class"] = "urn:content-classes:calendarmessage"
# Create text body
body_text = f"This meeting has been {response_text}."
text_part = MIMEText(body_text, "plain", "utf-8")
msg.attach(text_part)
# Create calendar part with proper iTIP headers
# The content-type must include method=REPLY for Exchange to recognize it
calendar_part = MIMEText(ics_content, "calendar", "utf-8")
calendar_part.set_param("method", "REPLY")
calendar_part.add_header("Content-Disposition", "attachment", filename="invite.ics")
msg.attach(calendar_part)
return msg.as_string()
def queue_calendar_reply(
event: ParsedCalendarEvent,
response: str,
from_email: str,
to_email: str,
from_name: Optional[str] = None,
) -> Tuple[bool, str]:
"""Queue a calendar reply email for sending via the outbox.
Args:
event: The parsed calendar event from the original invite
response: Response type - 'accept', 'tentativelyAccept', or 'decline'
from_email: Sender's email address
to_email: Recipient's email address (the organizer)
from_name: Sender's display name (optional)
Returns:
Tuple of (success, message)
"""
try:
# Build the email
email_content = build_calendar_reply_email(
event, response, from_email, to_email, from_name
)
# Determine organization from email domain
org = "default"
if "@" in from_email:
domain = from_email.split("@")[1].lower()
# Map known domains to org names (matching sendmail script logic)
domain_to_org = {
"corteva.com": "corteva",
}
org = domain_to_org.get(domain, domain.split(".")[0])
# Queue the email in the outbox
base_path = os.path.expanduser(os.getenv("MAILDIR_PATH", "~/Mail"))
outbox_path = os.path.join(base_path, org, "outbox")
# Ensure directories exist
for subdir in ["new", "cur", "tmp", "failed"]:
dir_path = os.path.join(outbox_path, subdir)
os.makedirs(dir_path, exist_ok=True)
# Generate unique filename
timestamp = str(int(time.time() * 1000000))
hostname = os.uname().nodename
filename = f"{timestamp}.{os.getpid()}.{hostname}"
# Write to tmp first, then move to new (atomic operation)
tmp_path = os.path.join(outbox_path, "tmp", filename)
new_path = os.path.join(outbox_path, "new", filename)
with open(tmp_path, "w", encoding="utf-8") as f:
f.write(email_content)
os.rename(tmp_path, new_path)
response_text = {
"accept": "accepted",
"tentativelyAccept": "tentatively accepted",
"decline": "declined",
}.get(response, "accepted")
rsvp_logger.info(
f"Queued calendar reply: {response_text} for '{event.summary}' to {event.organizer_email}"
)
return True, f"Response queued - will be sent on next sync"
except asyncio.TimeoutError:
rsvp_logger.error(f"Timeout after {API_TIMEOUT}s responding to invite")
return False, f"Request timed out after {API_TIMEOUT}s"
except Exception as e: except Exception as e:
rsvp_logger.error(f"Error responding to invite: {e}", exc_info=True) rsvp_logger.error(f"Failed to queue calendar reply: {e}", exc_info=True)
return False, f"Error: {str(e)}" return False, f"Failed to queue response: {str(e)}"
def action_accept_invite(app): def action_accept_invite(app):
@@ -179,7 +348,7 @@ def action_tentative_invite(app):
def _respond_to_current_invite(app, response: str): def _respond_to_current_invite(app, response: str):
"""Helper to respond to the current message's calendar invite.""" """Helper to respond to the current message's calendar invite using ICS/SMTP."""
from src.mail.widgets.ContentContainer import ContentContainer from src.mail.widgets.ContentContainer import ContentContainer
rsvp_logger.info(f"Starting invite response: {response}") rsvp_logger.info(f"Starting invite response: {response}")
@@ -190,18 +359,19 @@ def _respond_to_current_invite(app, response: str):
app.notify("No message selected", severity="warning") app.notify("No message selected", severity="warning")
return return
# Get auth headers FIRST (synchronously, before spawning worker) # Get user's email from MSAL cache
# This uses cached token only - won't block on device flow user_email = _get_user_email()
headers = _get_auth_headers_sync() if not user_email:
if not headers: rsvp_logger.error("Could not determine user email - run 'luk sync' first")
rsvp_logger.error("No valid auth token - user needs to run luk sync first")
app.notify( app.notify(
"Not authenticated. Run 'luk sync' first to login.", severity="error" "Could not determine your email. Run 'luk sync' first.", severity="error"
) )
return return
user_name = _get_user_display_name()
rsvp_logger.debug(f"User: {user_name} <{user_email}>")
# Get the parsed calendar event from ContentContainer # Get the parsed calendar event from ContentContainer
# This has the UID from the ICS which we can use for direct lookup
calendar_event = None calendar_event = None
try: try:
content_container = app.query_one(ContentContainer) content_container = app.query_one(ContentContainer)
@@ -216,61 +386,36 @@ def _respond_to_current_invite(app, response: str):
event_uid = calendar_event.uid event_uid = calendar_event.uid
event_summary = calendar_event.summary or "(no subject)" event_summary = calendar_event.summary or "(no subject)"
organizer_email = calendar_event.organizer_email
rsvp_logger.info(f"Calendar event: {event_summary}, UID: {event_uid}") rsvp_logger.info(
f"Calendar event: {event_summary}, UID: {event_uid}, Organizer: {organizer_email}"
)
if not event_uid: if not event_uid:
rsvp_logger.warning("No UID found in calendar event") rsvp_logger.warning("No UID found in calendar event")
app.notify("Calendar invite missing UID - cannot respond", severity="warning") app.notify("Calendar invite missing UID - cannot respond", severity="warning")
return return
app.run_worker( if not organizer_email:
_async_respond_to_invite(app, event_uid, event_summary, response, headers), rsvp_logger.warning("No organizer email found in calendar event")
exclusive=True,
name="respond_invite",
)
async def _async_respond_to_invite(
app, event_uid: str, event_summary: str, response: str, headers: dict
):
"""Async worker to find and respond to calendar invite using UID."""
rsvp_logger.info(f"Async response started for UID: {event_uid}")
app.notify(f"Looking up event...")
# Find event by UID (direct lookup, no search needed)
graph_event = await find_event_by_uid(event_uid, headers)
if not graph_event:
rsvp_logger.warning(f"Event not found for UID: {event_uid}")
app.notify( app.notify(
f"Event not found in calendar: {event_summary[:40]}", "Calendar invite missing organizer - cannot respond", severity="warning"
severity="warning",
) )
return return
event_id = graph_event.get("id") # Queue the calendar reply (organizer_email is guaranteed non-None here)
if not event_id: success, message = queue_calendar_reply(
rsvp_logger.error("No event ID in response") calendar_event, response, user_email, organizer_email, user_name
app.notify("Could not get event ID from calendar", severity="error") )
return
current_response = graph_event.get("responseStatus", {}).get("response", "")
rsvp_logger.debug(f"Current response status: {current_response}")
# Check if already responded
if current_response == "accepted" and response == "accept":
rsvp_logger.info("Already accepted")
app.notify("Already accepted this invite", severity="information")
return
elif current_response == "declined" and response == "decline":
rsvp_logger.info("Already declined")
app.notify("Already declined this invite", severity="information")
return
# Respond to the invite
success, message = await respond_to_calendar_invite(event_id, response, headers)
severity = "information" if success else "error" severity = "information" if success else "error"
app.notify(message, severity=severity) app.notify(message, severity=severity)
if success:
response_text = {
"accept": "Accepted",
"tentativelyAccept": "Tentatively accepted",
"decline": "Declined",
}.get(response, "Responded to")
rsvp_logger.info(f"{response_text} invite: {event_summary}")

View File

@@ -41,6 +41,9 @@ class ParsedCalendarEvent:
# UID for matching with Graph API # UID for matching with Graph API
uid: Optional[str] = None uid: Optional[str] = None
# Sequence number for iTIP REPLY
sequence: int = 0
def extract_ics_from_mime(raw_message: str) -> Optional[str]: def extract_ics_from_mime(raw_message: str) -> Optional[str]:
"""Extract ICS calendar content from raw MIME message. """Extract ICS calendar content from raw MIME message.
@@ -200,6 +203,15 @@ def parse_ics_content(ics_content: str) -> Optional[ParsedCalendarEvent]:
if dtend: if dtend:
end_dt = dtend.dt end_dt = dtend.dt
# Extract sequence number (defaults to 0)
sequence = 0
seq_val = event.get("sequence")
if seq_val is not None:
try:
sequence = int(seq_val)
except (ValueError, TypeError):
sequence = 0
return ParsedCalendarEvent( return ParsedCalendarEvent(
summary=str(event.get("summary", "")) or None, summary=str(event.get("summary", "")) or None,
location=str(event.get("location", "")) or None, location=str(event.get("location", "")) or None,
@@ -213,6 +225,7 @@ def parse_ics_content(ics_content: str) -> Optional[ParsedCalendarEvent]:
attendees=attendees, attendees=attendees,
status=str(event.get("status", "")).upper() or None, status=str(event.get("status", "")).upper() or None,
uid=str(event.get("uid", "")) or None, uid=str(event.get("uid", "")) or None,
sequence=sequence,
) )
except Exception as e: except Exception as e:

View File

@@ -2,6 +2,7 @@
Mail operations for Microsoft Graph API. Mail operations for Microsoft Graph API.
""" """
import base64
import os import os
import re import re
import glob import glob
@@ -860,30 +861,90 @@ def parse_email_for_graph_api(email_content: str) -> Dict[str, Any]:
cc_recipients = parse_recipients(msg.get("Cc", "")) cc_recipients = parse_recipients(msg.get("Cc", ""))
bcc_recipients = parse_recipients(msg.get("Bcc", "")) bcc_recipients = parse_recipients(msg.get("Bcc", ""))
# Get body content # Get body content and attachments
body_content = "" body_content = ""
body_type = "text" body_type = "text"
attachments: List[Dict[str, Any]] = []
if msg.is_multipart(): if msg.is_multipart():
for part in msg.walk(): for part in msg.walk():
if part.get_content_type() == "text/plain": content_type = part.get_content_type()
body_content = part.get_payload(decode=True).decode( content_disposition = part.get("Content-Disposition", "")
"utf-8", errors="ignore"
) # Skip multipart containers
if content_type.startswith("multipart/"):
continue
# Handle text/plain body
if content_type == "text/plain" and "attachment" not in content_disposition:
payload = part.get_payload(decode=True)
if payload:
body_content = payload.decode("utf-8", errors="ignore")
body_type = "text" body_type = "text"
break
elif part.get_content_type() == "text/html": # Handle text/html body
body_content = part.get_payload(decode=True).decode( elif (
"utf-8", errors="ignore" content_type == "text/html" and "attachment" not in content_disposition
) ):
payload = part.get_payload(decode=True)
if payload:
body_content = payload.decode("utf-8", errors="ignore")
body_type = "html" body_type = "html"
# Handle calendar attachments (text/calendar)
elif content_type == "text/calendar":
payload = part.get_payload(decode=True)
if payload:
# Get filename from Content-Disposition or use default
filename = part.get_filename() or "invite.ics"
# Base64 encode the content for Graph API
content_bytes = (
payload
if isinstance(payload, bytes)
else payload.encode("utf-8")
)
attachments.append(
{
"@odata.type": "#microsoft.graph.fileAttachment",
"name": filename,
"contentType": "text/calendar; method=REPLY",
"contentBytes": base64.b64encode(content_bytes).decode(
"ascii"
),
}
)
# Handle other attachments
elif "attachment" in content_disposition or part.get_filename():
payload = part.get_payload(decode=True)
if payload:
filename = part.get_filename() or "attachment"
content_bytes = (
payload
if isinstance(payload, bytes)
else payload.encode("utf-8")
)
attachments.append(
{
"@odata.type": "#microsoft.graph.fileAttachment",
"name": filename,
"contentType": content_type,
"contentBytes": base64.b64encode(content_bytes).decode(
"ascii"
),
}
)
else: else:
body_content = msg.get_payload(decode=True).decode("utf-8", errors="ignore") payload = msg.get_payload(decode=True)
if payload:
body_content = payload.decode("utf-8", errors="ignore")
if msg.get_content_type() == "text/html": if msg.get_content_type() == "text/html":
body_type = "html" body_type = "html"
# Build Graph API message # Build Graph API message
message = { message: Dict[str, Any] = {
"subject": msg.get("Subject", ""), "subject": msg.get("Subject", ""),
"body": {"contentType": body_type, "content": body_content}, "body": {"contentType": body_type, "content": body_content},
"toRecipients": to_recipients, "toRecipients": to_recipients,
@@ -891,6 +952,10 @@ def parse_email_for_graph_api(email_content: str) -> Dict[str, Any]:
"bccRecipients": bcc_recipients, "bccRecipients": bcc_recipients,
} }
# Add attachments if present
if attachments:
message["attachments"] = attachments
# Add reply-to if present # Add reply-to if present
reply_to = msg.get("Reply-To", "") reply_to = msg.get("Reply-To", "")
if reply_to: if reply_to: