Files
luk/src/utils/calendar_utils.py
Tim Bendt d7ca5e451d fix sync timestamp creation in calendar downloads
Ensures .sync_timestamp file is created during regular calendar sync (server → local) so daemon can properly detect local calendar changes and track sync history.

🤖 Generated with [opencode](https://opencode.ai)

Co-Authored-By: opencode <noreply@opencode.ai>
2025-07-16 10:08:32 -04:00

332 lines
11 KiB
Python

"""
Utility module for handling calendar events and iCalendar operations.
"""
import re
import os
from datetime import datetime, timedelta
from dateutil import parser
from dateutil.tz import UTC
import glob
def truncate_id(text, first=8, last=8):
"""
Truncate long IDs or filenames to show just the first and last few characters.
Args:
text: The ID or filename to truncate
first: Number of characters to keep from the beginning
last: Number of characters to keep from the end
Returns:
Truncated string with ellipsis in the middle
"""
if not text or len(text) <= first + last + 3:
return text
return f"{text[:first]}...{text[-last:]}"
def clean_text(text):
"""
Clean text by removing instances of 3 or more consecutive underscores
which can affect readability.
Args:
text: Text to clean
Returns:
Cleaned text
"""
if not text:
return ""
# Replace 3 or more consecutive underscores with 2 underscores
return re.sub(r"_{3,}", "__", text)
def escape_ical_text(text):
"""
Escape text for iCalendar format according to RFC 5545.
Args:
text: Text to escape
Returns:
Escaped text
"""
if not text:
return ""
# First clean multiple underscores
text = clean_text(text)
text = text.replace("\\", "\\\\")
text = text.replace("\n", "\\n")
text = text.replace(",", "\\,")
text = text.replace(";", "\\;")
return text
async def fetch_calendar_events(
headers,
days_back=1,
days_forward=6,
fetch_function=None,
start_date=None,
end_date=None,
):
"""
Fetch calendar events from Microsoft Graph API.
Args:
headers: Authentication headers for Microsoft Graph API
days_back: Number of days to look back (default: 1)
days_forward: Number of days to look forward (default: 6)
fetch_function: Async function to use for fetching data (default: None)
Should accept URL and headers as parameters
start_date: Optional explicit start date (datetime object)
end_date: Optional explicit end date (datetime object)
Returns:
Tuple of (events list, total_events count)
"""
if fetch_function is None:
raise ValueError("fetch_function is required for API calls")
# Calculate date range
if start_date is None:
start_date = datetime.now().replace(hour=0, minute=0, second=0) - timedelta(
days=days_back
)
if end_date is None:
end_of_today = datetime.now().replace(hour=23, minute=59, second=59)
end_date = end_of_today + timedelta(days=days_forward)
# Build the API URL
event_base_url = f"https://graph.microsoft.com/v1.0/me/calendarView?startDateTime={start_date.isoformat()}&endDateTime={end_date.isoformat()}"
calendar_url = f"{event_base_url}&$top=100&$select=start,end,id,iCalUId,subject,bodyPreview,webLink,location,recurrence,showAs,responseStatus,onlineMeeting,lastModifiedDateTime"
# Fetch total count for progress reporting (if needed)
total_event_url = f"{event_base_url}&$count=true&$select=id"
try:
total_response = await fetch_function(total_event_url, headers)
total_events = total_response.get("@odata.count", 0)
except Exception as e:
print(f"Error fetching total events count: {e}")
total_events = 0
# Fetch all calendar events, handling pagination
events = []
while calendar_url:
try:
response_data = await fetch_function(calendar_url, headers)
if response_data:
events.extend(response_data.get("value", []))
# Get the next page URL from @odata.nextLink
calendar_url = response_data.get("@odata.nextLink")
else:
print("Received empty response from calendar API")
break
except Exception as e:
print(f"Error fetching calendar events: {e}")
break
# Only return the events and total_events
return events, total_events
def write_event_to_ical(f, event, start, end):
"""
Write a single event to an iCalendar file.
Args:
f: File-like object to write to
event: Dictionary containing event data
start: Start datetime with timezone information
end: End datetime with timezone information
"""
# Preserve the original timezones
start_tz = start.tzinfo
end_tz = end.tzinfo
f.write(f"BEGIN:VEVENT\nSUMMARY:{escape_ical_text(event['subject'])}\n")
# Handle multi-line description properly
description = event.get("bodyPreview", "")
if description:
escaped_description = escape_ical_text(description)
f.write(f"DESCRIPTION:{escaped_description}\n")
f.write(f"UID:{event.get('iCalUId', '')}\n")
f.write(
f"LOCATION:{escape_ical_text(event.get('location', {}).get('displayName', ''))}\n"
)
f.write(f"CLASS:{event.get('showAs', '')}\n")
f.write(f"STATUS:{event.get('responseStatus', {}).get('response', '')}\n")
if "onlineMeeting" in event and event["onlineMeeting"]:
f.write(f"URL:{event.get('onlineMeeting', {}).get('joinUrl', '')}\n")
# Write start and end times with timezone info in iCalendar format
if start.tzinfo == UTC:
f.write(f"DTSTART:{start.strftime('%Y%m%dT%H%M%SZ')}\n")
else:
tz_name = start_tz.tzname(None) if start_tz else "UTC"
f.write(f"DTSTART;TZID={tz_name}:{start.strftime('%Y%m%dT%H%M%S')}\n")
if end.tzinfo == UTC:
f.write(f"DTEND:{end.strftime('%Y%m%dT%H%M%SZ')}\n")
else:
tz_name = end_tz.tzname(None) if end_tz else "UTC"
f.write(f"DTEND;TZID={tz_name}:{end.strftime('%Y%m%dT%H%M%S')}\n")
# Handle recurrence rules
if "recurrence" in event and event["recurrence"]:
for rule in event["recurrence"]:
if rule.startswith("RRULE"):
rule_parts = rule.split(";")
new_rule_parts = []
for part in rule_parts:
if part.startswith("UNTIL="):
until_value = part.split("=")[1]
until_date = parser.isoparse(until_value)
if start.tzinfo is not None and until_date.tzinfo is None:
until_date = until_date.replace(tzinfo=start.tzinfo)
new_rule_parts.append(
f"UNTIL={until_date.strftime('%Y%m%dT%H%M%SZ')}"
)
else:
new_rule_parts.append(part)
rule = ";".join(new_rule_parts)
f.write(f"{rule}\n")
f.write("END:VEVENT\n")
def save_events_to_vdir(events, org_vdir_path, progress, task_id, dry_run=False):
"""
Save events to vdir format (one file per event).
Args:
events: List of event dictionaries
org_vdir_path: Path to save the event files
progress: Progress object for updating UI
task_id: Task ID for progress tracking
dry_run: If True, don't actually write files
Returns:
Number of events processed
"""
if dry_run:
progress.console.print(
f"[DRY-RUN] Would save {len(events)} events to vdir format in {org_vdir_path}"
)
return len(events)
os.makedirs(org_vdir_path, exist_ok=True)
progress.console.print(f"Saving events to vdir format in {org_vdir_path}...")
# Create a dictionary to track existing files and their metadata
existing_files = {}
for file_path in glob.glob(os.path.join(org_vdir_path, "*.ics")):
file_name = os.path.basename(file_path)
file_mod_time = os.path.getmtime(file_path)
existing_files[file_name] = {"path": file_path, "mtime": file_mod_time}
processed_files = set()
for event in events:
progress.advance(task_id)
if "start" not in event or "end" not in event:
continue
# Parse start and end times with timezone information
start = parser.isoparse(event["start"]["dateTime"])
end = parser.isoparse(event["end"]["dateTime"])
uid = event.get("iCalUId", "")
if not uid:
# Generate a unique ID if none exists
uid = f"outlook-{event.get('id', '')}"
# Create a filename based on the UID
safe_filename = re.sub(r"[^\w\-]", "_", uid) + ".ics"
event_path = os.path.join(org_vdir_path, safe_filename)
processed_files.add(safe_filename)
# Check if we need to update this file
should_update = True
if safe_filename in existing_files:
# Only update if the event has been modified since the file was last updated
if "lastModifiedDateTime" in event:
last_modified = parser.isoparse(
event["lastModifiedDateTime"]
).timestamp()
file_mtime = existing_files[safe_filename]["mtime"]
if last_modified <= file_mtime:
should_update = False
progress.console.print(
f"Skipping unchanged event: {event['subject']}"
)
if should_update:
with open(event_path, "w") as f:
f.write("BEGIN:VCALENDAR\nVERSION:2.0\n")
write_event_to_ical(f, event, start, end)
f.write("END:VCALENDAR\n")
# Remove files for events that no longer exist in the calendar view
for file_name in existing_files:
if file_name not in processed_files:
progress.console.print(
f"Removing obsolete event file: {truncate_id(file_name)}"
)
os.remove(existing_files[file_name]["path"])
# Create sync timestamp to track when this download sync completed
timestamp_file = os.path.join(org_vdir_path, ".sync_timestamp")
try:
with open(timestamp_file, "w") as f:
f.write(str(datetime.now().timestamp()))
progress.console.print(f"Updated sync timestamp for calendar monitoring")
except IOError as e:
progress.console.print(f"Warning: Could not create sync timestamp: {e}")
progress.console.print(f"Saved {len(events)} events to {org_vdir_path}")
return len(events)
def save_events_to_file(events, output_file, progress, task_id, dry_run=False):
"""
Save all events to a single iCalendar file.
Args:
events: List of event dictionaries
output_file: Path to the output file
progress: Progress object for updating UI
task_id: Task ID for progress tracking
dry_run: If True, don't actually write the file
Returns:
Number of events processed
"""
if dry_run:
progress.console.print(f"[DRY-RUN] Would save events to {output_file}")
return len(events)
os.makedirs(os.path.dirname(output_file), exist_ok=True)
progress.console.print(f"Saving events to {output_file}...")
with open(output_file, "w") as f:
f.write("BEGIN:VCALENDAR\nVERSION:2.0\n")
for event in events:
progress.advance(task_id)
if "start" in event and "end" in event:
# Parse start and end times with timezone information
start = parser.isoparse(event["start"]["dateTime"])
end = parser.isoparse(event["end"]["dateTime"])
write_event_to_ical(f, event, start, end)
f.write("END:VCALENDAR\n")
progress.console.print(f"Saved events to {output_file}")
return len(events)