feat: Add CalendarInvitePanel to display invite details in mail app
- Create CalendarInvitePanel widget showing event summary, time, location, organizer, and attendees with accept/decline/tentative buttons - Add is_calendar_email() to notification_detector for detecting invite emails - Add get_raw_message() to himalaya client for exporting full MIME content - Refactor calendar_parser.py with proper icalendar parsing (METHOD at VCALENDAR level, not VEVENT) - Integrate calendar panel into ContentContainer.display_content flow - Update tests for new calendar parsing API - Minor: fix today's header style in calendar WeekGrid
This commit is contained in:
@@ -27,6 +27,7 @@ dependencies = [
|
||||
"certifi>=2025.4.26",
|
||||
"click>=8.1.0",
|
||||
"html2text>=2025.4.15",
|
||||
"icalendar>=6.0.0",
|
||||
"mammoth>=1.9.0",
|
||||
"markitdown[all]>=0.1.1",
|
||||
"msal>=1.32.3",
|
||||
|
||||
@@ -199,7 +199,7 @@ class WeekGridHeader(Widget):
|
||||
style = Style(bold=True, reverse=True)
|
||||
elif day == today:
|
||||
# Highlight today with theme secondary color
|
||||
style = Style(bold=True, color="white", bgcolor=secondary_color)
|
||||
style = Style(bold=True, color=secondary_color)
|
||||
elif day.weekday() >= 5: # Weekend
|
||||
style = Style(color="bright_black")
|
||||
else:
|
||||
|
||||
@@ -84,9 +84,7 @@ NOTIFICATION_TYPES = [
|
||||
]
|
||||
|
||||
|
||||
def is_notification_email(
|
||||
envelope: dict[str, Any], content: str | None = None
|
||||
) -> bool:
|
||||
def is_notification_email(envelope: dict[str, Any], content: str | None = None) -> bool:
|
||||
"""Check if an email is a notification-style email.
|
||||
|
||||
Args:
|
||||
@@ -341,3 +339,43 @@ def _extract_general_notification_summary(content: str) -> dict[str, Any]:
|
||||
summary["action_items"] = summary["action_items"][:5]
|
||||
|
||||
return summary
|
||||
|
||||
|
||||
# Calendar email patterns
|
||||
CALENDAR_SUBJECT_PATTERNS = [
|
||||
r"^canceled:",
|
||||
r"^cancelled:",
|
||||
r"^accepted:",
|
||||
r"^declined:",
|
||||
r"^tentative:",
|
||||
r"^updated:",
|
||||
r"^invitation:",
|
||||
r"^meeting\s+(request|update|cancel)",
|
||||
r"^\[calendar\]",
|
||||
r"invite\s+you\s+to",
|
||||
r"has\s+invited\s+you",
|
||||
]
|
||||
|
||||
|
||||
def is_calendar_email(envelope: dict[str, Any]) -> bool:
|
||||
"""Check if an email is a calendar invite/update/cancellation.
|
||||
|
||||
Args:
|
||||
envelope: Email envelope metadata from himalaya
|
||||
|
||||
Returns:
|
||||
True if email appears to be a calendar-related email
|
||||
"""
|
||||
subject = envelope.get("subject", "").lower().strip()
|
||||
|
||||
# Check subject patterns
|
||||
for pattern in CALENDAR_SUBJECT_PATTERNS:
|
||||
if re.search(pattern, subject, re.IGNORECASE):
|
||||
return True
|
||||
|
||||
# Check for meeting-related keywords in subject
|
||||
meeting_keywords = ["meeting", "appointment", "calendar", "invite", "rsvp"]
|
||||
if any(keyword in subject for keyword in meeting_keywords):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
@@ -1,11 +1,16 @@
|
||||
"""Calendar ICS file parser utilities."""
|
||||
|
||||
import base64
|
||||
import re
|
||||
from typing import Optional, List
|
||||
from dataclasses import dataclass
|
||||
from icalendar import Calendar
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
try:
|
||||
from icalendar import Calendar
|
||||
except ImportError:
|
||||
Calendar = None # type: ignore
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -16,11 +21,11 @@ class ParsedCalendarEvent:
|
||||
summary: Optional[str] = None
|
||||
location: Optional[str] = None
|
||||
description: Optional[str] = None
|
||||
start: Optional[str] = None
|
||||
end: Optional[str] = None
|
||||
start: Optional[datetime] = None
|
||||
end: Optional[datetime] = None
|
||||
all_day: bool = False
|
||||
|
||||
# Calendar method
|
||||
# Calendar method (REQUEST, CANCEL, REPLY, etc.)
|
||||
method: Optional[str] = None
|
||||
|
||||
# Organizer
|
||||
@@ -28,75 +33,266 @@ class ParsedCalendarEvent:
|
||||
organizer_email: Optional[str] = None
|
||||
|
||||
# Attendees
|
||||
attendees: List[str] = []
|
||||
attendees: List[str] = field(default_factory=list)
|
||||
|
||||
# Status
|
||||
# Status (CONFIRMED, TENTATIVE, CANCELLED)
|
||||
status: Optional[str] = None
|
||||
|
||||
# UID for matching with Graph API
|
||||
uid: Optional[str] = None
|
||||
|
||||
def parse_calendar_part(content: str) -> Optional[ParsedCalendarEvent]:
|
||||
"""Parse calendar MIME part content."""
|
||||
|
||||
try:
|
||||
calendar = Calendar.from_ical(content)
|
||||
def extract_ics_from_mime(raw_message: str) -> Optional[str]:
|
||||
"""Extract ICS calendar content from raw MIME message.
|
||||
|
||||
# Get first event (most invites are single events)
|
||||
if calendar.events:
|
||||
event = calendar.events[0]
|
||||
Looks for text/calendar parts and base64-decoded .ics attachments.
|
||||
|
||||
# Extract organizer
|
||||
organizer = event.get("organizer")
|
||||
if organizer:
|
||||
organizer_name = organizer.cn if organizer else None
|
||||
organizer_email = organizer.email if organizer else None
|
||||
Args:
|
||||
raw_message: Full raw email in EML/MIME format
|
||||
|
||||
# Extract attendees
|
||||
attendees = []
|
||||
if event.get("attendees"):
|
||||
for attendee in event.attendees:
|
||||
email = attendee.email if attendee else None
|
||||
name = attendee.cn if attendee else None
|
||||
if email:
|
||||
attendees.append(f"{name} ({email})" if name else email)
|
||||
else:
|
||||
attendees.append(email)
|
||||
return ParsedCalendarEvent(
|
||||
summary=event.get("summary"),
|
||||
location=event.get("location"),
|
||||
description=event.get("description"),
|
||||
start=str(event.get("dtstart")) if event.get("dtstart") else None,
|
||||
end=str(event.get("dtend")) if event.get("dtend") else None,
|
||||
all_day=event.get("x-google", "all-day") == "true",
|
||||
method=event.get("method"),
|
||||
organizer_name=organizer_name,
|
||||
organizer_email=organizer_email,
|
||||
attendees=attendees,
|
||||
status=event.get("status"),
|
||||
Returns:
|
||||
ICS content string if found, None otherwise
|
||||
"""
|
||||
# Pattern 1: Look for inline text/calendar content
|
||||
# Content-Type: text/calendar followed by the ICS content
|
||||
calendar_pattern = re.compile(
|
||||
r"Content-Type:\s*text/calendar[^\n]*\n"
|
||||
r"(?:Content-Transfer-Encoding:\s*(\w+)[^\n]*\n)?"
|
||||
r"(?:[^\n]+\n)*?" # Other headers
|
||||
r"\n" # Empty line before content
|
||||
r"(BEGIN:VCALENDAR.*?END:VCALENDAR)",
|
||||
re.DOTALL | re.IGNORECASE,
|
||||
)
|
||||
|
||||
match = calendar_pattern.search(raw_message)
|
||||
if match:
|
||||
encoding = match.group(1)
|
||||
ics_content = match.group(2)
|
||||
|
||||
if encoding and encoding.lower() == "base64":
|
||||
try:
|
||||
# Remove line breaks and decode
|
||||
ics_bytes = base64.b64decode(
|
||||
ics_content.replace("\n", "").replace("\r", "")
|
||||
)
|
||||
return ics_bytes.decode("utf-8", errors="replace")
|
||||
except Exception as e:
|
||||
logging.error(f"Error parsing calendar ICS {e}")
|
||||
logging.debug(f"Failed to decode base64 ICS: {e}")
|
||||
else:
|
||||
return ics_content
|
||||
|
||||
# Pattern 2: Look for base64-encoded text/calendar
|
||||
base64_pattern = re.compile(
|
||||
r"Content-Type:\s*text/calendar[^\n]*\n"
|
||||
r"Content-Transfer-Encoding:\s*base64[^\n]*\n"
|
||||
r"(?:[^\n]+\n)*?" # Other headers
|
||||
r"\n" # Empty line before content
|
||||
r"([A-Za-z0-9+/=\s]+)",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
match = base64_pattern.search(raw_message)
|
||||
if match:
|
||||
try:
|
||||
b64_content = (
|
||||
match.group(1).replace("\n", "").replace("\r", "").replace(" ", "")
|
||||
)
|
||||
ics_bytes = base64.b64decode(b64_content)
|
||||
return ics_bytes.decode("utf-8", errors="replace")
|
||||
except Exception as e:
|
||||
logging.debug(f"Failed to decode base64 calendar: {e}")
|
||||
|
||||
# Pattern 3: Just look for raw VCALENDAR block
|
||||
vcal_pattern = re.compile(r"(BEGIN:VCALENDAR.*?END:VCALENDAR)", re.DOTALL)
|
||||
match = vcal_pattern.search(raw_message)
|
||||
if match:
|
||||
return match.group(1)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def parse_calendar_attachment(attachment_content: str) -> Optional[ParsedCalendarEvent]:
|
||||
"""Parse calendar file attachment."""
|
||||
def parse_ics_content(ics_content: str) -> Optional[ParsedCalendarEvent]:
|
||||
"""Parse ICS calendar content into a ParsedCalendarEvent.
|
||||
|
||||
# Handle base64 encoded ICS files
|
||||
Args:
|
||||
ics_content: Raw ICS/iCalendar content string
|
||||
|
||||
Returns:
|
||||
ParsedCalendarEvent if parsing succeeded, None otherwise
|
||||
"""
|
||||
if Calendar is None:
|
||||
logging.warning("icalendar library not installed, cannot parse ICS")
|
||||
return None
|
||||
|
||||
try:
|
||||
# Handle bytes input
|
||||
if isinstance(ics_content, bytes):
|
||||
ics_content = ics_content.decode("utf-8", errors="replace")
|
||||
|
||||
calendar = Calendar.from_ical(ics_content)
|
||||
|
||||
# METHOD is a calendar-level property, not event-level
|
||||
method = str(calendar.get("method", "")).upper() or None
|
||||
|
||||
# Get first VEVENT component
|
||||
events = [c for c in calendar.walk() if c.name == "VEVENT"]
|
||||
if not events:
|
||||
logging.debug("No VEVENT found in calendar")
|
||||
return None
|
||||
|
||||
event = events[0]
|
||||
|
||||
# Extract organizer info
|
||||
organizer_name = None
|
||||
organizer_email = None
|
||||
organizer = event.get("organizer")
|
||||
if organizer:
|
||||
# Organizer can be a vCalAddress object
|
||||
organizer_name = (
|
||||
str(organizer.params.get("CN", ""))
|
||||
if hasattr(organizer, "params")
|
||||
else None
|
||||
)
|
||||
# Extract email from mailto: URI
|
||||
organizer_str = str(organizer)
|
||||
if organizer_str.lower().startswith("mailto:"):
|
||||
organizer_email = organizer_str[7:]
|
||||
else:
|
||||
organizer_email = organizer_str
|
||||
|
||||
# Extract attendees
|
||||
attendees = []
|
||||
attendee_list = event.get("attendee")
|
||||
if attendee_list:
|
||||
# Can be a single attendee or a list
|
||||
if not isinstance(attendee_list, list):
|
||||
attendee_list = [attendee_list]
|
||||
for att in attendee_list:
|
||||
att_str = str(att)
|
||||
if att_str.lower().startswith("mailto:"):
|
||||
att_email = att_str[7:]
|
||||
else:
|
||||
att_email = att_str
|
||||
att_name = (
|
||||
str(att.params.get("CN", "")) if hasattr(att, "params") else None
|
||||
)
|
||||
if att_name and att_email:
|
||||
attendees.append(f"{att_name} <{att_email}>")
|
||||
elif att_email:
|
||||
attendees.append(att_email)
|
||||
|
||||
# Extract start/end times
|
||||
start_dt = None
|
||||
end_dt = None
|
||||
all_day = False
|
||||
|
||||
dtstart = event.get("dtstart")
|
||||
if dtstart:
|
||||
dt_val = dtstart.dt
|
||||
if hasattr(dt_val, "hour"):
|
||||
start_dt = dt_val
|
||||
else:
|
||||
# Date only = all day event
|
||||
start_dt = dt_val
|
||||
all_day = True
|
||||
|
||||
dtend = event.get("dtend")
|
||||
if dtend:
|
||||
end_dt = dtend.dt
|
||||
|
||||
return ParsedCalendarEvent(
|
||||
summary=str(event.get("summary", "")) or None,
|
||||
location=str(event.get("location", "")) or None,
|
||||
description=str(event.get("description", "")) or None,
|
||||
start=start_dt,
|
||||
end=end_dt,
|
||||
all_day=all_day,
|
||||
method=method,
|
||||
organizer_name=organizer_name,
|
||||
organizer_email=organizer_email,
|
||||
attendees=attendees,
|
||||
status=str(event.get("status", "")).upper() or None,
|
||||
uid=str(event.get("uid", "")) or None,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Error parsing calendar ICS: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def parse_calendar_from_raw_message(raw_message: str) -> Optional[ParsedCalendarEvent]:
|
||||
"""Extract and parse calendar event from raw email message.
|
||||
|
||||
Args:
|
||||
raw_message: Full raw email in EML/MIME format
|
||||
|
||||
Returns:
|
||||
ParsedCalendarEvent if found and parsed, None otherwise
|
||||
"""
|
||||
ics_content = extract_ics_from_mime(raw_message)
|
||||
if ics_content:
|
||||
return parse_ics_content(ics_content)
|
||||
return None
|
||||
|
||||
|
||||
# Legacy function names for compatibility
|
||||
def parse_calendar_part(content: str) -> Optional[ParsedCalendarEvent]:
|
||||
"""Parse calendar MIME part content. Legacy wrapper for parse_ics_content."""
|
||||
return parse_ics_content(content)
|
||||
|
||||
|
||||
def parse_calendar_attachment(attachment_content: str) -> Optional[ParsedCalendarEvent]:
|
||||
"""Parse base64-encoded calendar file attachment."""
|
||||
try:
|
||||
decoded = base64.b64decode(attachment_content)
|
||||
return parse_calendar_part(decoded)
|
||||
|
||||
return parse_ics_content(decoded.decode("utf-8", errors="replace"))
|
||||
except Exception as e:
|
||||
logging.error(f"Error decoding calendar attachment: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def is_cancelled_event(event: ParsedCalendarEvent) -> bool:
|
||||
"""Check if event is cancelled."""
|
||||
return event.method == "CANCEL"
|
||||
"""Check if event is a cancellation."""
|
||||
return event.method == "CANCEL" or event.status == "CANCELLED"
|
||||
|
||||
|
||||
def is_event_request(event: ParsedCalendarEvent) -> bool:
|
||||
"""Check if event is an invite request."""
|
||||
return event.method == "REQUEST"
|
||||
|
||||
|
||||
def format_event_time(event: ParsedCalendarEvent) -> str:
|
||||
"""Format event time for display.
|
||||
|
||||
Returns a human-readable string like:
|
||||
- "Mon, Dec 30, 2025 2:00 PM - 3:00 PM"
|
||||
- "All day: Mon, Dec 30, 2025"
|
||||
"""
|
||||
if not event.start:
|
||||
return "Time not specified"
|
||||
|
||||
if event.all_day:
|
||||
if hasattr(event.start, "strftime"):
|
||||
return f"All day: {event.start.strftime('%a, %b %d, %Y')}"
|
||||
return f"All day: {event.start}"
|
||||
|
||||
try:
|
||||
start_str = (
|
||||
event.start.strftime("%a, %b %d, %Y %I:%M %p")
|
||||
if hasattr(event.start, "strftime")
|
||||
else str(event.start)
|
||||
)
|
||||
if event.end and hasattr(event.end, "strftime"):
|
||||
# Same day? Just show end time
|
||||
if (
|
||||
hasattr(event.start, "date")
|
||||
and hasattr(event.end, "date")
|
||||
and event.start.date() == event.end.date()
|
||||
):
|
||||
end_str = event.end.strftime("%I:%M %p")
|
||||
else:
|
||||
end_str = event.end.strftime("%a, %b %d, %Y %I:%M %p")
|
||||
return f"{start_str} - {end_str}"
|
||||
return start_str
|
||||
except Exception:
|
||||
return str(event.start)
|
||||
|
||||
190
src/mail/widgets/CalendarInvitePanel.py
Normal file
190
src/mail/widgets/CalendarInvitePanel.py
Normal file
@@ -0,0 +1,190 @@
|
||||
"""Calendar invite panel widget for displaying calendar event details with actions."""
|
||||
|
||||
from textual.app import ComposeResult
|
||||
from textual.containers import Horizontal, Vertical
|
||||
from textual.widgets import Static, Button, Label
|
||||
from textual.reactive import reactive
|
||||
from textual.message import Message
|
||||
|
||||
from src.mail.utils.calendar_parser import (
|
||||
ParsedCalendarEvent,
|
||||
is_cancelled_event,
|
||||
is_event_request,
|
||||
format_event_time,
|
||||
)
|
||||
|
||||
|
||||
class CalendarInvitePanel(Vertical):
|
||||
"""Panel displaying calendar invite details with accept/decline/tentative actions.
|
||||
|
||||
This widget shows at the top of the ContentContainer when viewing a calendar email.
|
||||
"""
|
||||
|
||||
DEFAULT_CSS = """
|
||||
CalendarInvitePanel {
|
||||
height: auto;
|
||||
max-height: 14;
|
||||
padding: 1;
|
||||
margin-bottom: 1;
|
||||
background: $surface;
|
||||
border: solid $primary;
|
||||
}
|
||||
|
||||
CalendarInvitePanel.cancelled {
|
||||
border: solid $error;
|
||||
}
|
||||
|
||||
CalendarInvitePanel.request {
|
||||
border: solid $success;
|
||||
}
|
||||
|
||||
CalendarInvitePanel .event-badge {
|
||||
padding: 0 1;
|
||||
margin-right: 1;
|
||||
}
|
||||
|
||||
CalendarInvitePanel .event-badge.cancelled {
|
||||
background: $error;
|
||||
color: $text;
|
||||
}
|
||||
|
||||
CalendarInvitePanel .event-badge.request {
|
||||
background: $success;
|
||||
color: $text;
|
||||
}
|
||||
|
||||
CalendarInvitePanel .event-badge.reply {
|
||||
background: $warning;
|
||||
color: $text;
|
||||
}
|
||||
|
||||
CalendarInvitePanel .event-title {
|
||||
text-style: bold;
|
||||
width: 100%;
|
||||
}
|
||||
|
||||
CalendarInvitePanel .event-detail {
|
||||
color: $text-muted;
|
||||
}
|
||||
|
||||
CalendarInvitePanel .action-buttons {
|
||||
height: auto;
|
||||
margin-top: 1;
|
||||
}
|
||||
|
||||
CalendarInvitePanel .action-buttons Button {
|
||||
margin-right: 1;
|
||||
}
|
||||
"""
|
||||
|
||||
class InviteAction(Message):
|
||||
"""Message sent when user takes an action on the invite."""
|
||||
|
||||
def __init__(self, action: str, event: ParsedCalendarEvent) -> None:
|
||||
self.action = action # "accept", "decline", "tentative"
|
||||
self.event = event
|
||||
super().__init__()
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
event: ParsedCalendarEvent,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
super().__init__(**kwargs)
|
||||
self.event = event
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
"""Compose the calendar invite panel."""
|
||||
# Determine badge and styling based on method
|
||||
badge_text, badge_class = self._get_badge_info()
|
||||
|
||||
with Horizontal():
|
||||
yield Label(badge_text, classes=f"event-badge {badge_class}")
|
||||
yield Label(
|
||||
self.event.summary or "Calendar Event",
|
||||
classes="event-title",
|
||||
)
|
||||
|
||||
# Event time
|
||||
time_str = format_event_time(self.event)
|
||||
yield Static(f"\uf017 {time_str}", classes="event-detail") # nf-fa-clock_o
|
||||
|
||||
# Location if present
|
||||
if self.event.location:
|
||||
yield Static(
|
||||
f"\uf041 {self.event.location}", # nf-fa-map_marker
|
||||
classes="event-detail",
|
||||
)
|
||||
|
||||
# Organizer
|
||||
if self.event.organizer_name or self.event.organizer_email:
|
||||
organizer = self.event.organizer_name or self.event.organizer_email
|
||||
yield Static(
|
||||
f"\uf007 {organizer}", # nf-fa-user
|
||||
classes="event-detail",
|
||||
)
|
||||
|
||||
# Attendees count
|
||||
if self.event.attendees:
|
||||
count = len(self.event.attendees)
|
||||
yield Static(
|
||||
f"\uf0c0 {count} attendee{'s' if count != 1 else ''}", # nf-fa-users
|
||||
classes="event-detail",
|
||||
)
|
||||
|
||||
# Action buttons (only for REQUEST method, not for CANCEL)
|
||||
if is_event_request(self.event):
|
||||
with Horizontal(classes="action-buttons"):
|
||||
yield Button(
|
||||
"\uf00c Accept", # nf-fa-check
|
||||
id="btn-accept",
|
||||
variant="success",
|
||||
)
|
||||
yield Button(
|
||||
"? Tentative",
|
||||
id="btn-tentative",
|
||||
variant="warning",
|
||||
)
|
||||
yield Button(
|
||||
"\uf00d Decline", # nf-fa-times
|
||||
id="btn-decline",
|
||||
variant="error",
|
||||
)
|
||||
elif is_cancelled_event(self.event):
|
||||
yield Static(
|
||||
"[dim]This meeting has been cancelled[/dim]",
|
||||
classes="event-detail",
|
||||
)
|
||||
|
||||
def _get_badge_info(self) -> tuple[str, str]:
|
||||
"""Get badge text and CSS class based on event method."""
|
||||
method = self.event.method or ""
|
||||
|
||||
if method == "CANCEL" or self.event.status == "CANCELLED":
|
||||
return "CANCELLED", "cancelled"
|
||||
elif method == "REQUEST":
|
||||
return "INVITE", "request"
|
||||
elif method == "REPLY":
|
||||
return "REPLY", "reply"
|
||||
elif method == "COUNTER":
|
||||
return "COUNTER", "reply"
|
||||
else:
|
||||
return "EVENT", ""
|
||||
|
||||
def on_mount(self) -> None:
|
||||
"""Apply styling based on event type."""
|
||||
if is_cancelled_event(self.event):
|
||||
self.add_class("cancelled")
|
||||
elif is_event_request(self.event):
|
||||
self.add_class("request")
|
||||
|
||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||
"""Handle action button presses."""
|
||||
button_id = event.button.id
|
||||
|
||||
if button_id == "btn-accept":
|
||||
self.post_message(self.InviteAction("accept", self.event))
|
||||
elif button_id == "btn-tentative":
|
||||
self.post_message(self.InviteAction("tentative", self.event))
|
||||
elif button_id == "btn-decline":
|
||||
self.post_message(self.InviteAction("decline", self.event))
|
||||
@@ -12,7 +12,12 @@ from src.mail.screens.LinkPanel import (
|
||||
LinkItem as LinkItemClass,
|
||||
)
|
||||
from src.mail.notification_compressor import create_compressor
|
||||
from src.mail.notification_detector import NotificationType
|
||||
from src.mail.notification_detector import NotificationType, is_calendar_email
|
||||
from src.mail.utils.calendar_parser import (
|
||||
ParsedCalendarEvent,
|
||||
parse_calendar_from_raw_message,
|
||||
)
|
||||
from src.mail.widgets.CalendarInvitePanel import CalendarInvitePanel
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import Literal, List, Dict, Any, Optional
|
||||
@@ -177,6 +182,10 @@ class ContentContainer(ScrollableContainer):
|
||||
self.current_notification_type: Optional[NotificationType] = None
|
||||
self.is_compressed_view: bool = False
|
||||
|
||||
# Calendar invite state
|
||||
self.calendar_panel: Optional[CalendarInvitePanel] = None
|
||||
self.current_calendar_event: Optional[ParsedCalendarEvent] = None
|
||||
|
||||
# Load default view mode and notification compression from config
|
||||
config = get_config()
|
||||
self.current_mode = config.content_display.default_view_mode
|
||||
@@ -236,6 +245,23 @@ class ContentContainer(ScrollableContainer):
|
||||
self.notify("No message ID provided.")
|
||||
return
|
||||
|
||||
# Check if this is a calendar email and fetch raw message for ICS parsing
|
||||
if self.current_envelope and is_calendar_email(self.current_envelope):
|
||||
raw_content, raw_success = await himalaya_client.get_raw_message(
|
||||
message_id, folder=self.current_folder, account=self.current_account
|
||||
)
|
||||
if raw_success and raw_content:
|
||||
calendar_event = parse_calendar_from_raw_message(raw_content)
|
||||
if calendar_event:
|
||||
self.current_calendar_event = calendar_event
|
||||
self._show_calendar_panel(calendar_event)
|
||||
else:
|
||||
self._hide_calendar_panel()
|
||||
else:
|
||||
self._hide_calendar_panel()
|
||||
else:
|
||||
self._hide_calendar_panel()
|
||||
|
||||
content, success = await himalaya_client.get_message_content(
|
||||
message_id, folder=self.current_folder, account=self.current_account
|
||||
)
|
||||
@@ -349,3 +375,40 @@ class ContentContainer(ScrollableContainer):
|
||||
if not self.current_content:
|
||||
return []
|
||||
return extract_links_from_content(self.current_content)
|
||||
|
||||
def _show_calendar_panel(self, event: ParsedCalendarEvent) -> None:
|
||||
"""Show the calendar invite panel at the top of the content."""
|
||||
# Remove existing panel if any
|
||||
self._hide_calendar_panel()
|
||||
|
||||
# Create and mount new panel at the beginning
|
||||
self.calendar_panel = CalendarInvitePanel(event, id="calendar_invite_panel")
|
||||
self.mount(self.calendar_panel, before=0)
|
||||
|
||||
def _hide_calendar_panel(self) -> None:
|
||||
"""Hide/remove the calendar invite panel."""
|
||||
self.current_calendar_event = None
|
||||
if self.calendar_panel:
|
||||
try:
|
||||
self.calendar_panel.remove()
|
||||
except Exception:
|
||||
pass # Panel may already be removed
|
||||
self.calendar_panel = None
|
||||
|
||||
def on_calendar_invite_panel_invite_action(
|
||||
self, event: CalendarInvitePanel.InviteAction
|
||||
) -> None:
|
||||
"""Handle calendar invite actions from the panel.
|
||||
|
||||
Bubbles the action up to the app level for processing.
|
||||
"""
|
||||
# Get the app and call the appropriate action
|
||||
action = event.action
|
||||
calendar_event = event.event
|
||||
|
||||
if action == "accept":
|
||||
self.app.action_accept_invite()
|
||||
elif action == "decline":
|
||||
self.app.action_decline_invite()
|
||||
elif action == "tentative":
|
||||
self.app.action_tentative_invite()
|
||||
|
||||
@@ -482,3 +482,62 @@ def sync_himalaya():
|
||||
print("Himalaya sync completed successfully.")
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"Error during Himalaya sync: {e}")
|
||||
|
||||
|
||||
async def get_raw_message(
|
||||
message_id: int,
|
||||
folder: Optional[str] = None,
|
||||
account: Optional[str] = None,
|
||||
) -> Tuple[Optional[str], bool]:
|
||||
"""
|
||||
Retrieve the full raw message (EML format) by its ID.
|
||||
|
||||
This exports the complete MIME message including all parts (text, HTML,
|
||||
attachments like ICS calendar files). Useful for parsing calendar invites.
|
||||
|
||||
Args:
|
||||
message_id: The ID of the message to retrieve
|
||||
folder: The folder containing the message
|
||||
account: The account to use
|
||||
|
||||
Returns:
|
||||
Tuple containing:
|
||||
- Raw message content (EML format) or None if retrieval failed
|
||||
- Success status (True if operation was successful)
|
||||
"""
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
try:
|
||||
# Create a temporary directory for the export
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
eml_path = os.path.join(tmpdir, f"message_{message_id}.eml")
|
||||
|
||||
cmd = f"himalaya message export -F -d '{eml_path}' {message_id}"
|
||||
if folder:
|
||||
cmd += f" -f '{folder}'"
|
||||
if account:
|
||||
cmd += f" -a '{account}'"
|
||||
|
||||
process = await asyncio.create_subprocess_shell(
|
||||
cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
stdout, stderr = await process.communicate()
|
||||
|
||||
if process.returncode == 0:
|
||||
# Read the exported EML file
|
||||
if os.path.exists(eml_path):
|
||||
with open(eml_path, "r", encoding="utf-8", errors="replace") as f:
|
||||
content = f.read()
|
||||
return content, True
|
||||
else:
|
||||
logging.error(f"EML file not created at {eml_path}")
|
||||
return None, False
|
||||
else:
|
||||
logging.error(f"Error exporting raw message: {stderr.decode()}")
|
||||
return None, False
|
||||
except Exception as e:
|
||||
logging.error(f"Exception during raw message export: {e}")
|
||||
return None, False
|
||||
|
||||
@@ -1,7 +1,14 @@
|
||||
"""Unit tests for calendar email detection and ICS parsing."""
|
||||
|
||||
import pytest
|
||||
from src.mail.utils import calendar_parser
|
||||
from src.mail.utils.calendar_parser import (
|
||||
parse_ics_content,
|
||||
parse_calendar_from_raw_message,
|
||||
extract_ics_from_mime,
|
||||
is_cancelled_event,
|
||||
is_event_request,
|
||||
ParsedCalendarEvent,
|
||||
)
|
||||
from src.mail.notification_detector import is_calendar_email
|
||||
|
||||
|
||||
@@ -44,58 +51,91 @@ class TestICSParsing:
|
||||
"""Test ICS file parsing."""
|
||||
|
||||
def test_parse_cancellation_ics(self):
|
||||
"""Test parsing of cancellation ICS from test fixture."""
|
||||
import base64
|
||||
from pathlib import Path
|
||||
"""Test parsing of cancellation ICS."""
|
||||
ics_content = """BEGIN:VCALENDAR
|
||||
VERSION:2.0
|
||||
PRODID:-//Test//Test//EN
|
||||
METHOD:CANCEL
|
||||
BEGIN:VEVENT
|
||||
UID:test-uid-001
|
||||
SUMMARY:Technical Refinement Meeting
|
||||
DTSTART:20251230T140000Z
|
||||
DTEND:20251230T150000Z
|
||||
STATUS:CANCELLED
|
||||
ORGANIZER;CN=Test Organizer:mailto:organizer@example.com
|
||||
ATTENDEE;CN=Test Attendee:mailto:attendee@example.com
|
||||
END:VEVENT
|
||||
END:VCALENDAR"""
|
||||
|
||||
fixture_path = Path(
|
||||
"tests/fixtures/test_mailbox/INBOX/cur/17051226-calendar-invite-001.test:2"
|
||||
)
|
||||
if not fixture_path.exists():
|
||||
pytest.skip("Test fixture file not found")
|
||||
return
|
||||
|
||||
with open(fixture_path, "r") as f:
|
||||
content = f.read()
|
||||
|
||||
event = parse_calendar_part(content)
|
||||
event = parse_ics_content(ics_content)
|
||||
assert event is not None
|
||||
assert is_cancelled_event(event) is True
|
||||
assert event.method == "CANCEL"
|
||||
assert event.summary == "Technical Refinement Meeting"
|
||||
|
||||
def test_parse_invite_ics(self):
|
||||
"""Test parsing of invite ICS from test fixture."""
|
||||
import base64
|
||||
from pathlib import Path
|
||||
"""Test parsing of invite/request ICS."""
|
||||
ics_content = """BEGIN:VCALENDAR
|
||||
VERSION:2.0
|
||||
PRODID:-//Test//Test//EN
|
||||
METHOD:REQUEST
|
||||
BEGIN:VEVENT
|
||||
UID:test-uid-002
|
||||
SUMMARY:Team Standup
|
||||
DTSTART:20251230T100000Z
|
||||
DTEND:20251230T103000Z
|
||||
STATUS:CONFIRMED
|
||||
ORGANIZER;CN=Test Organizer:mailto:organizer@example.com
|
||||
ATTENDEE;CN=Test Attendee:mailto:attendee@example.com
|
||||
LOCATION:Conference Room A
|
||||
END:VEVENT
|
||||
END:VCALENDAR"""
|
||||
|
||||
fixture_path = Path(
|
||||
"tests/fixtures/test_mailbox/INBOX/cur/17051226-calendar-invite-001.test:2"
|
||||
)
|
||||
if not fixture_path.exists():
|
||||
pytest.skip("Test fixture file not found")
|
||||
return
|
||||
|
||||
with open(fixture_path, "r") as f:
|
||||
content = f.read()
|
||||
|
||||
event = parse_calendar_part(content)
|
||||
event = parse_ics_content(ics_content)
|
||||
assert event is not None
|
||||
assert is_event_request(event) is True
|
||||
assert event.method == "REQUEST"
|
||||
assert event.summary == "Technical Refinement Meeting"
|
||||
assert event.summary == "Team Standup"
|
||||
assert event.location == "Conference Room A"
|
||||
|
||||
def test_invalid_ics(self):
|
||||
"""Test parsing of invalid ICS content."""
|
||||
event = parse_calendar_part("invalid ics content")
|
||||
|
||||
event = parse_ics_content("invalid ics content")
|
||||
assert event is None # Should return None for invalid ICS
|
||||
|
||||
def test_base64_decoding(self):
|
||||
"""Test base64 decoding of ICS attachment."""
|
||||
# Test that we can decode base64
|
||||
encoded = "BASE64ENCODED_I_TEST"
|
||||
import base64
|
||||
def test_extract_ics_from_mime(self):
|
||||
"""Test extraction of ICS from raw MIME message."""
|
||||
raw_message = """From: organizer@example.com
|
||||
To: attendee@example.com
|
||||
Subject: Meeting Invite
|
||||
Content-Type: multipart/mixed; boundary="boundary123"
|
||||
|
||||
decoded = base64.b64decode(encoded)
|
||||
assert decoded == encoded
|
||||
--boundary123
|
||||
Content-Type: text/plain
|
||||
|
||||
You have been invited to a meeting.
|
||||
|
||||
--boundary123
|
||||
Content-Type: text/calendar
|
||||
|
||||
BEGIN:VCALENDAR
|
||||
VERSION:2.0
|
||||
METHOD:REQUEST
|
||||
BEGIN:VEVENT
|
||||
UID:mime-test-001
|
||||
SUMMARY:MIME Test Meeting
|
||||
DTSTART:20251230T140000Z
|
||||
DTEND:20251230T150000Z
|
||||
END:VEVENT
|
||||
END:VCALENDAR
|
||||
--boundary123--
|
||||
"""
|
||||
ics = extract_ics_from_mime(raw_message)
|
||||
assert ics is not None
|
||||
assert "BEGIN:VCALENDAR" in ics
|
||||
assert "MIME Test Meeting" in ics
|
||||
|
||||
event = parse_ics_content(ics)
|
||||
assert event is not None
|
||||
assert event.summary == "MIME Test Meeting"
|
||||
assert event.method == "REQUEST"
|
||||
|
||||
Reference in New Issue
Block a user