ruff formatted

This commit is contained in:
Tim Bendt
2025-05-08 12:09:43 -06:00
parent e0e7e6ac76
commit 125f500769
17 changed files with 485 additions and 286 deletions

View File

@@ -2,20 +2,22 @@ import edn_format
import os
import sys
def parse_edn_file(file_path):
with open(file_path, 'r') as file:
with open(file_path, "r") as file:
data = edn_format.loads(file.read())
return data
def convert_to_markdown(data, output_dir):
if not os.path.exists(output_dir):
os.makedirs(output_dir)
for i, item in enumerate(data):
file_name = f"output_{i+1}.md"
file_name = f"output_{i + 1}.md"
file_path = os.path.join(output_dir, file_name)
with open(file_path, 'w') as file:
with open(file_path, "w") as file:
file.write("# Data Item\n\n")
if isinstance(item, dict):
for key, value in item.items():
@@ -24,6 +26,7 @@ def convert_to_markdown(data, output_dir):
else:
file.write(f"{item}\n\n")
def main():
if len(sys.argv) < 2:
print("Usage: python edn_to_markdown.py <input_edn_file>")
@@ -41,5 +44,6 @@ def main():
convert_to_markdown(data, output_dir)
print(f"Converted EDN data to Markdown files in '{output_dir}'")
if __name__ == "__main__":
main()

View File

@@ -23,27 +23,35 @@ import msal
import orjson
# Filepath for caching timestamp
cache_timestamp_file = 'cache_timestamp.json'
cache_timestamp_file = "cache_timestamp.json"
# Filepath for sync timestamp
sync_timestamp_file = 'sync_timestamp.json'
sync_timestamp_file = "sync_timestamp.json"
# Function to load the last sync timestamp
def load_last_sync_timestamp():
if os.path.exists(sync_timestamp_file):
with open(sync_timestamp_file, 'r') as f:
return json.load(f).get('last_sync', 0)
with open(sync_timestamp_file, "r") as f:
return json.load(f).get("last_sync", 0)
return 0
# Function to save the current sync timestamp
def save_sync_timestamp():
with open(sync_timestamp_file, 'w') as f:
json.dump({'last_sync': time.time()}, f)
with open(sync_timestamp_file, "w") as f:
json.dump({"last_sync": time.time()}, f)
# Add argument parsing for dry-run mode
arg_parser = argparse.ArgumentParser(description="Fetch and synchronize emails.")
arg_parser.add_argument("--dry-run", action="store_true", help="Run in dry-run mode without making changes.", default=False)
arg_parser.add_argument(
"--dry-run",
action="store_true",
help="Run in dry-run mode without making changes.",
default=False,
)
args = arg_parser.parse_args()
dry_run = args.dry_run
@@ -51,45 +59,52 @@ dry_run = args.dry_run
# Define a global semaphore for throttling
semaphore = asyncio.Semaphore(4)
async def fetch_with_aiohttp(url, headers):
async with semaphore:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
if response.status != 200:
raise Exception(f"Failed to fetch {url}: {response.status} {await response.text()}")
raise Exception(
f"Failed to fetch {url}: {response.status} {await response.text()}"
)
raw_bytes = await response.read()
content_length = response.headers.get('Content-Length')
content_length = response.headers.get("Content-Length")
if content_length and len(raw_bytes) != int(content_length):
print("Warning: Incomplete response received!")
return None
return orjson.loads(raw_bytes)
async def post_with_aiohttp(url, headers, json_data):
async with semaphore:
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=json_data) as response:
return response.status
async def patch_with_aiohttp(url, headers, json_data):
async with semaphore:
async with aiohttp.ClientSession() as session:
async with session.patch(url, headers=headers, json=json_data) as response:
return response.status
async def delete_with_aiohttp(url, headers):
async with semaphore:
async with aiohttp.ClientSession() as session:
async with session.delete(url, headers=headers) as response:
return response.status
async def synchronize_maildir_async(maildir_path, headers, progress, task_id):
last_sync = load_last_sync_timestamp()
# Find messages moved from "new" to "cur" and mark them as read
new_dir = os.path.join(maildir_path, 'new')
cur_dir = os.path.join(maildir_path, 'cur')
new_files = set(glob.glob(os.path.join(new_dir, '*.eml*')))
cur_files = set(glob.glob(os.path.join(cur_dir, '*.eml*')))
new_dir = os.path.join(maildir_path, "new")
cur_dir = os.path.join(maildir_path, "cur")
new_files = set(glob.glob(os.path.join(new_dir, "*.eml*")))
cur_files = set(glob.glob(os.path.join(cur_dir, "*.eml*")))
moved_to_cur = [os.path.basename(f) for f in cur_files - new_files]
progress.update(task_id, total=len(moved_to_cur))
@@ -98,18 +113,22 @@ async def synchronize_maildir_async(maildir_path, headers, progress, task_id):
if os.path.getmtime(os.path.join(cur_dir, filename)) < last_sync:
progress.update(task_id, advance=1)
continue
message_id = re.sub(r"\:2.+", "", filename.split('.')[0]) # Extract the Message-ID from the filename
message_id = re.sub(
r"\:2.+", "", filename.split(".")[0]
) # Extract the Message-ID from the filename
if not dry_run:
status = await patch_with_aiohttp(
f'https://graph.microsoft.com/v1.0/me/messages/{message_id}',
f"https://graph.microsoft.com/v1.0/me/messages/{message_id}",
headers,
{'isRead': True}
{"isRead": True},
)
if status == 404:
os.remove(os.path.join(cur_dir, filename))
else:
progress.console.print(f"[DRY-RUN] Would mark message as read: {message_id}")
progress.console.print(
f"[DRY-RUN] Would mark message as read: {message_id}"
)
progress.advance(task_id)
# Save the current sync timestamp
@@ -118,16 +137,17 @@ async def synchronize_maildir_async(maildir_path, headers, progress, task_id):
else:
progress.console.print("[DRY-RUN] Would save sync timestamp.")
async def fetch_mail_async(maildir_path, attachments_dir, headers, progress, task_id):
mail_url = 'https://graph.microsoft.com/v1.0/me/mailFolders/inbox/messages?$top=100&$orderby=receivedDateTime asc&$select=id,subject,from,toRecipients,ccRecipients,receivedDateTime,isRead'
mail_url = "https://graph.microsoft.com/v1.0/me/mailFolders/inbox/messages?$top=100&$orderby=receivedDateTime asc&$select=id,subject,from,toRecipients,ccRecipients,receivedDateTime,isRead"
messages = []
# Fetch the total count of messages in the inbox
inbox_url = 'https://graph.microsoft.com/v1.0/me/mailFolders/inbox'
inbox_url = "https://graph.microsoft.com/v1.0/me/mailFolders/inbox"
response = await fetch_with_aiohttp(inbox_url, headers)
total_messages = response.get('totalItemCount', 0)
total_messages = response.get("totalItemCount", 0)
progress.update(task_id, total=total_messages)
while mail_url:
@@ -136,22 +156,24 @@ async def fetch_mail_async(maildir_path, attachments_dir, headers, progress, tas
except Exception as e:
progress.console.print(f"Error fetching messages: {e}")
continue
messages.extend(response_data.get('value', []))
progress.advance(task_id, len(response_data.get('value', [])))
messages.extend(response_data.get("value", []))
progress.advance(task_id, len(response_data.get("value", [])))
# Get the next page URL from @odata.nextLink
mail_url = response_data.get('@odata.nextLink')
mail_url = response_data.get("@odata.nextLink")
inbox_msg_ids = set(message['id'] for message in messages)
inbox_msg_ids = set(message["id"] for message in messages)
progress.update(task_id, completed=(len(messages) / 2))
new_dir = os.path.join(maildir_path, 'new')
cur_dir = os.path.join(maildir_path, 'cur')
new_files = set(glob.glob(os.path.join(new_dir, '*.eml*')))
cur_files = set(glob.glob(os.path.join(cur_dir, '*.eml*')))
new_dir = os.path.join(maildir_path, "new")
cur_dir = os.path.join(maildir_path, "cur")
new_files = set(glob.glob(os.path.join(new_dir, "*.eml*")))
cur_files = set(glob.glob(os.path.join(cur_dir, "*.eml*")))
for filename in Set.union(cur_files, new_files):
message_id = filename.split('.')[0].split('/')[-1] # Extract the Message-ID from the filename
if (message_id not in inbox_msg_ids):
message_id = filename.split(".")[0].split("/")[
-1
] # Extract the Message-ID from the filename
if message_id not in inbox_msg_ids:
if not dry_run:
progress.console.print(f"Deleting {filename} from inbox")
os.remove(filename)
@@ -159,57 +181,81 @@ async def fetch_mail_async(maildir_path, attachments_dir, headers, progress, tas
progress.console.print(f"[DRY-RUN] Would delete {filename} from inbox")
for message in messages:
progress.console.print(f"Processing message: {message.get('subject', 'No Subject')}", end='\r')
await save_mime_to_maildir_async(maildir_path, message, attachments_dir, headers, progress)
progress.console.print(
f"Processing message: {message.get('subject', 'No Subject')}", end="\r"
)
await save_mime_to_maildir_async(
maildir_path, message, attachments_dir, headers, progress
)
progress.update(task_id, advance=0.5)
progress.update(task_id, completed=len(messages))
progress.console.print(f"\nFinished saving {len(messages)} messages.")
async def archive_mail_async(maildir_path, headers, progress, task_id):
archive_dir = os.path.join(maildir_path, '.Archives')
archive_files = glob.glob(os.path.join(archive_dir, '**', '*.eml*'), recursive=True)
archive_dir = os.path.join(maildir_path, ".Archives")
archive_files = glob.glob(os.path.join(archive_dir, "**", "*.eml*"), recursive=True)
progress.update(task_id, total=len(archive_files))
folder_response = await fetch_with_aiohttp('https://graph.microsoft.com/v1.0/me/mailFolders', headers)
folders = folder_response.get('value', [])
archive_folder_id = next((folder.get('id') for folder in folders if folder.get('displayName', '').lower() == 'archive'), None)
folder_response = await fetch_with_aiohttp(
"https://graph.microsoft.com/v1.0/me/mailFolders", headers
)
folders = folder_response.get("value", [])
archive_folder_id = next(
(
folder.get("id")
for folder in folders
if folder.get("displayName", "").lower() == "archive"
),
None,
)
if not archive_folder_id:
raise Exception("No folder named 'Archive' found on the server.")
for filepath in archive_files:
message_id = os.path.basename(filepath).split('.')[0] # Extract the Message-ID from the filename
message_id = os.path.basename(filepath).split(".")[
0
] # Extract the Message-ID from the filename
if not dry_run:
status = await post_with_aiohttp(
f'https://graph.microsoft.com/v1.0/me/messages/{message_id}/microsoft.graph.move',
f"https://graph.microsoft.com/v1.0/me/messages/{message_id}/microsoft.graph.move",
headers,
{'destinationId': archive_folder_id}
{"destinationId": archive_folder_id},
)
if status != 201: # 201 Created indicates success
progress.console.print(f"Failed to move message to 'Archive': {message_id}, {status}")
progress.console.print(
f"Failed to move message to 'Archive': {message_id}, {status}"
)
if status == 404:
os.remove(filepath) # Remove the file from local archive if not fo
progress.console.print(f"Message not found on server, removed local copy: {message_id}")
progress.console.print(
f"Message not found on server, removed local copy: {message_id}"
)
elif status == 204:
progress.console.print(f"Moved message to 'Archive': {message_id}")
else:
progress.console.print(f"[DRY-RUN] Would move message to 'Archive' folder: {message_id}")
progress.console.print(
f"[DRY-RUN] Would move message to 'Archive' folder: {message_id}"
)
progress.advance(task_id)
return
async def delete_mail_async(maildir_path, headers, progress, task_id):
trash_dir = os.path.join(maildir_path, '.Trash', 'cur')
trash_files = set(glob.glob(os.path.join(trash_dir, '*.eml*')))
trash_dir = os.path.join(maildir_path, ".Trash", "cur")
trash_files = set(glob.glob(os.path.join(trash_dir, "*.eml*")))
progress.update(task_id, total=len(trash_files))
for filepath in trash_files:
message_id = os.path.basename(filepath).split('.')[0] # Extract the Message-ID from the filename
message_id = os.path.basename(filepath).split(".")[
0
] # Extract the Message-ID from the filename
if not dry_run:
progress.console.print(f"Moving message to trash: {message_id}")
status = await delete_with_aiohttp(
f'https://graph.microsoft.com/v1.0/me/messages/{message_id}',
headers
f"https://graph.microsoft.com/v1.0/me/messages/{message_id}", headers
)
if status == 204 or status == 404:
os.remove(filepath) # Remove the file from local trash
@@ -217,17 +263,18 @@ async def delete_mail_async(maildir_path, headers, progress, task_id):
progress.console.print(f"[DRY-RUN] Would delete message: {message_id}")
progress.advance(task_id)
async def fetch_calendar_async(headers, progress, task_id):
yesterday = datetime.now().replace(hour=0, minute=0, second=0) - timedelta(days=1)
end_of_today = datetime.now().replace(hour=23, minute=59, second=59)
six_days_future = end_of_today + timedelta(days=6)
# example https://graph.microsoft.com/v1.0/me/calendarView?startDateTime=2025-05-06T00:00:00&endDateTime=2025-05-13T23:59:59.999999&$count=true&$select=id
event_base_url =f"https://graph.microsoft.com/v1.0/me/calendarView?startDateTime={yesterday.isoformat()}&endDateTime={six_days_future.isoformat()}"
event_base_url = f"https://graph.microsoft.com/v1.0/me/calendarView?startDateTime={yesterday.isoformat()}&endDateTime={six_days_future.isoformat()}"
total_event_url = f"{event_base_url}&$count=true&$select=id"
total = await fetch_with_aiohttp(total_event_url, headers)
total_events = total.get('@odata.count', 0) + 1
total_events = total.get("@odata.count", 0) + 1
progress.update(task_id, total=total_events)
calendar_url = f"{event_base_url}&$top=100&$select=start,end,iCalUid,subject,bodyPreview,webLink,location,recurrence,showAs,responseStatus,onlineMeeting"
events = []
@@ -235,47 +282,58 @@ async def fetch_calendar_async(headers, progress, task_id):
progress.update(task_id, total=total_events + total_events % 100)
while calendar_url:
response_data = await fetch_with_aiohttp(calendar_url, headers)
events.extend(response_data.get('value', []))
events.extend(response_data.get("value", []))
progress.advance(task_id, 1)
# Get the next page URL from @odata.nextLink
calendar_url = response_data.get('@odata.nextLink')
calendar_url = response_data.get("@odata.nextLink")
output_file = 'output_ics/outlook_events_latest.ics'
output_file = "output_ics/outlook_events_latest.ics"
if not dry_run:
os.makedirs(os.path.dirname(output_file), exist_ok=True)
progress.console.print(f"Saving events to {output_file}...")
with open(output_file, 'w') as f:
with open(output_file, "w") as f:
f.write("BEGIN:VCALENDAR\nVERSION:2.0\n")
for event in events:
progress.advance(task_id)
if 'start' in event and 'end' in event:
start = parser.isoparse(event['start']['dateTime']).astimezone(UTC)
end = parser.isoparse(event['end']['dateTime']).astimezone(UTC)
f.write(f"BEGIN:VEVENT\nSUMMARY:{event['subject']}\nDESCRIPTION:{event.get('bodyPreview', '')}\n")
if "start" in event and "end" in event:
start = parser.isoparse(event["start"]["dateTime"]).astimezone(UTC)
end = parser.isoparse(event["end"]["dateTime"]).astimezone(UTC)
f.write(
f"BEGIN:VEVENT\nSUMMARY:{event['subject']}\nDESCRIPTION:{event.get('bodyPreview', '')}\n"
)
f.write(f"UID:{event.get('iCalUId', '')}\n")
f.write(f"LOCATION:{event.get('location', {})['displayName']}\n")
f.write(f"CLASS:{event.get('showAs', '')}\n")
f.write(f"STATUS:{event.get('responseStatus', {})['response']}\n")
if 'onlineMeeting' in event and event['onlineMeeting']:
f.write(f"URL:{event.get('onlineMeeting', {}).get('joinUrl', '')}\n")
if "onlineMeeting" in event and event["onlineMeeting"]:
f.write(
f"URL:{event.get('onlineMeeting', {}).get('joinUrl', '')}\n"
)
f.write(f"DTSTART:{start.strftime('%Y%m%dT%H%M%S')}\n")
f.write(f"DTEND:{end.strftime('%Y%m%dT%H%M%S')}\n")
if 'recurrence' in event and event['recurrence']: # Check if 'recurrence' exists and is not None
for rule in event['recurrence']:
if rule.startswith('RRULE'):
rule_parts = rule.split(';')
if (
"recurrence" in event and event["recurrence"]
): # Check if 'recurrence' exists and is not None
for rule in event["recurrence"]:
if rule.startswith("RRULE"):
rule_parts = rule.split(";")
new_rule_parts = []
for part in rule_parts:
if part.startswith('UNTIL='):
until_value = part.split('=')[1]
if part.startswith("UNTIL="):
until_value = part.split("=")[1]
until_date = parser.isoparse(until_value)
if start.tzinfo is not None and until_date.tzinfo is None:
if (
start.tzinfo is not None
and until_date.tzinfo is None
):
until_date = until_date.replace(tzinfo=UTC)
new_rule_parts.append(f"UNTIL={until_date.strftime('%Y%m%dT%H%M%SZ')}")
new_rule_parts.append(
f"UNTIL={until_date.strftime('%Y%m%dT%H%M%SZ')}"
)
else:
new_rule_parts.append(part)
rule = ';'.join(new_rule_parts)
rule = ";".join(new_rule_parts)
f.write(f"{rule}\n")
f.write("END:VEVENT\n")
f.write("END:VCALENDAR\n")
@@ -285,18 +343,20 @@ async def fetch_calendar_async(headers, progress, task_id):
progress.console.print(f"[DRY-RUN] Would save events to {output_file}")
# Function to create Maildir structure
def create_maildir_structure(base_path):
os.makedirs(os.path.join(base_path, 'cur'), exist_ok=True)
os.makedirs(os.path.join(base_path, 'new'), exist_ok=True)
os.makedirs(os.path.join(base_path, 'tmp'), exist_ok=True)
os.makedirs(os.path.join(base_path, "cur"), exist_ok=True)
os.makedirs(os.path.join(base_path, "new"), exist_ok=True)
os.makedirs(os.path.join(base_path, "tmp"), exist_ok=True)
async def save_mime_to_maildir_async(maildir_path, email_data, attachments_dir, headers, progress):
async def save_mime_to_maildir_async(
maildir_path, email_data, attachments_dir, headers, progress
):
# Create a new EmailMessage object
# Determine the directory based on isRead
target_dir = 'cur' if email_data.get('isRead', False) else 'new'
id = email_data.get('id', '')
target_dir = "cur" if email_data.get("isRead", False) else "new"
id = email_data.get("id", "")
if not id:
progress.console.print("Message ID not found. Skipping save.")
return
@@ -305,48 +365,67 @@ async def save_mime_to_maildir_async(maildir_path, email_data, attachments_dir,
# Check if the file already exists
if os.path.exists(email_filepath):
progress.console.print(f"Message {id} already exists in {target_dir}. Skipping save.")
progress.console.print(
f"Message {id} already exists in {target_dir}. Skipping save."
)
return
# Fetch the full MIME payload from the API
mime_url = f'https://graph.microsoft.com/v1.0/me/messages/{id}/$value'
mime_url = f"https://graph.microsoft.com/v1.0/me/messages/{id}/$value"
try:
async with aiohttp.ClientSession() as session:
async with session.get(mime_url, headers=headers) as response:
if response.status != 200:
raise Exception(f"Failed to fetch MIME payload for {id}: {response.status} {await response.text()}")
raise Exception(
f"Failed to fetch MIME payload for {id}: {response.status} {await response.text()}"
)
mime_payload = await response.text()
# Save the MIME payload to the Maildir
os.makedirs(os.path.dirname(email_filepath), exist_ok=True)
with open(email_filepath, 'w') as f:
with open(email_filepath, "w") as f:
f.write(mime_payload)
progress.console.print(f"Saved message {id} to {target_dir}.")
except Exception as e:
progress.console.print(f"Failed to save message {id}: {e}")
def save_email_to_maildir(maildir_path, email_data, attachments_dir, progress):
# Create a new EmailMessage object
msg = EmailMessage()
received_datetime = email_data.get('receivedDateTime', '')
received_datetime = email_data.get("receivedDateTime", "")
if received_datetime:
parsed_datetime = parser.isoparse(received_datetime)
msg['Date'] = format_datetime(parsed_datetime)
msg["Date"] = format_datetime(parsed_datetime)
else:
msg['Date'] = ''
msg["Date"] = ""
msg['Message-ID'] = email_data.get('id', '')
msg['Subject'] = email_data.get('subject', 'No Subject')
msg['From'] = email_data.get('from', {}).get('emailAddress', {}).get('address', 'unknown@unknown.com')
msg['To'] = ', '.join([recipient['emailAddress']['address'] for recipient in email_data.get('toRecipients', [])])
msg['Cc'] = ', '.join([recipient['emailAddress']['address'] for recipient in email_data.get('ccRecipients', [])])
msg["Message-ID"] = email_data.get("id", "")
msg["Subject"] = email_data.get("subject", "No Subject")
msg["From"] = (
email_data.get("from", {})
.get("emailAddress", {})
.get("address", "unknown@unknown.com")
)
msg["To"] = ", ".join(
[
recipient["emailAddress"]["address"]
for recipient in email_data.get("toRecipients", [])
]
)
msg["Cc"] = ", ".join(
[
recipient["emailAddress"]["address"]
for recipient in email_data.get("ccRecipients", [])
]
)
# Convert the email body from HTML to Markdown
body_html = email_data.get('body', {}).get('content', '')
if email_data.get('body', {}).get('contentType', '').lower() == 'html':
body_html = email_data.get("body", {}).get("content", "")
if email_data.get("body", {}).get("contentType", "").lower() == "html":
markdown_converter = html2text.HTML2Text()
markdown_converter.ignore_images = True
markdown_converter.ignore_links = True
@@ -355,38 +434,45 @@ def save_email_to_maildir(maildir_path, email_data, attachments_dir, progress):
body_markdown = body_html
# Remove lines between any alphanumeric BannerStart and BannerEnd
body_markdown = re.sub(r'\w+BannerStart.*?\w+BannerEnd', '', body_markdown, flags=re.DOTALL)
body_markdown = re.sub(
r"\w+BannerStart.*?\w+BannerEnd", "", body_markdown, flags=re.DOTALL
)
msg.set_content(body_markdown)
# Download attachments
progress.console.print(f"Downloading attachments for message: {msg['Message-ID']}")
for attachment in email_data.get('attachments', []):
attachment_name = attachment.get('name', 'unknown')
attachment_content = attachment.get('contentBytes')
for attachment in email_data.get("attachments", []):
attachment_name = attachment.get("name", "unknown")
attachment_content = attachment.get("contentBytes")
if attachment_content:
attachment_path = os.path.join(attachments_dir, attachment_name)
if not dry_run:
with open(attachment_path, 'wb') as f:
f.write(attachment_content.encode('utf-8'))
msg.add_attachment(attachment_content.encode('utf-8'), filename=attachment_name)
with open(attachment_path, "wb") as f:
f.write(attachment_content.encode("utf-8"))
msg.add_attachment(
attachment_content.encode("utf-8"), filename=attachment_name
)
else:
progress.console.print(f"[DRY-RUN] Would save attachment to {attachment_path}")
progress.console.print(
f"[DRY-RUN] Would save attachment to {attachment_path}"
)
# Determine the directory based on isRead
target_dir = 'cur' if email_data.get('isRead', False) else 'new'
target_dir = "cur" if email_data.get("isRead", False) else "new"
email_filename = f"{msg['Message-ID']}.eml"
email_filepath = os.path.join(maildir_path, target_dir, email_filename)
# Check if the file already exists in any subfolder
for root, _, files in os.walk(maildir_path):
if email_filename in files:
progress.console.print(f"Message {msg['Message-ID']} already exists in {root}. Skipping save.")
progress.console.print(
f"Message {msg['Message-ID']} already exists in {root}. Skipping save."
)
return
# Save the email to the Maildir
if not dry_run:
with open(email_filepath, 'w') as f:
with open(email_filepath, "w") as f:
f.write(msg.as_string())
progress.console.print(f"Saved message {msg['Message-ID']}")
else:
@@ -394,66 +480,77 @@ def save_email_to_maildir(maildir_path, email_data, attachments_dir, progress):
async def main():
# Save emails to Maildir
maildir_path = os.getenv('MAILDIR_PATH', os.path.expanduser('~/Mail')) + "/corteva"
attachments_dir = os.path.join(maildir_path, 'attachments')
maildir_path = os.getenv("MAILDIR_PATH", os.path.expanduser("~/Mail")) + "/corteva"
attachments_dir = os.path.join(maildir_path, "attachments")
os.makedirs(attachments_dir, exist_ok=True)
create_maildir_structure(maildir_path)
# Read Azure app credentials from environment variables
client_id = os.getenv('AZURE_CLIENT_ID')
tenant_id = os.getenv('AZURE_TENANT_ID')
client_id = os.getenv("AZURE_CLIENT_ID")
tenant_id = os.getenv("AZURE_TENANT_ID")
if not client_id or not tenant_id:
raise ValueError("Please set the AZURE_CLIENT_ID and AZURE_TENANT_ID environment variables.")
raise ValueError(
"Please set the AZURE_CLIENT_ID and AZURE_TENANT_ID environment variables."
)
# Token cache
cache = msal.SerializableTokenCache()
cache_file = 'token_cache.bin'
cache_file = "token_cache.bin"
if os.path.exists(cache_file):
cache.deserialize(open(cache_file, 'r').read())
cache.deserialize(open(cache_file, "r").read())
# Authentication
authority = f'https://login.microsoftonline.com/{tenant_id}'
scopes = ['https://graph.microsoft.com/Calendars.Read', 'https://graph.microsoft.com/Mail.ReadWrite']
authority = f"https://login.microsoftonline.com/{tenant_id}"
scopes = [
"https://graph.microsoft.com/Calendars.Read",
"https://graph.microsoft.com/Mail.ReadWrite",
]
app = msal.PublicClientApplication(client_id, authority=authority, token_cache=cache)
app = msal.PublicClientApplication(
client_id, authority=authority, token_cache=cache
)
accounts = app.get_accounts()
if accounts:
token_response = app.acquire_token_silent(scopes, account=accounts[0])
else:
flow = app.initiate_device_flow(scopes=scopes)
if 'user_code' not in flow:
if "user_code" not in flow:
raise Exception("Failed to create device flow")
print(Panel(flow['message'], border_style="magenta", padding=2, title="MSAL Login Flow Link"))
print(
Panel(
flow["message"],
border_style="magenta",
padding=2,
title="MSAL Login Flow Link",
)
)
token_response = app.acquire_token_by_device_flow(flow)
if 'access_token' not in token_response:
if "access_token" not in token_response:
raise Exception("Failed to acquire token")
# Save token cache
with open(cache_file, 'w') as f:
with open(cache_file, "w") as f:
f.write(cache.serialize())
access_token = token_response['access_token']
headers = {'Authorization': f'Bearer {access_token}', 'Prefer': 'outlook.body-content-type="text"'}
access_token = token_response["access_token"]
headers = {
"Authorization": f"Bearer {access_token}",
"Prefer": 'outlook.body-content-type="text"',
}
accounts = app.get_accounts()
if not accounts:
raise Exception("No accounts found")
maildir_path = os.getenv('MAILDIR_PATH', os.path.expanduser('~/Mail')) + "/corteva"
maildir_path = os.getenv("MAILDIR_PATH", os.path.expanduser("~/Mail")) + "/corteva"
progress = Progress(
SpinnerColumn(),
MofNCompleteColumn(),
*Progress.get_default_columns()
SpinnerColumn(), MofNCompleteColumn(), *Progress.get_default_columns()
)
with progress:
task_fetch = progress.add_task("[green]Syncing Inbox...", total=0)
@@ -466,9 +563,12 @@ async def main():
synchronize_maildir_async(maildir_path, headers, progress, task_read),
archive_mail_async(maildir_path, headers, progress, task_archive),
delete_mail_async(maildir_path, headers, progress, task_delete),
fetch_mail_async(maildir_path, attachments_dir, headers, progress, task_fetch),
fetch_calendar_async(headers, progress, task_calendar)
fetch_mail_async(
maildir_path, attachments_dir, headers, progress, task_fetch
),
fetch_calendar_async(headers, progress, task_calendar),
)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -20,7 +20,7 @@ async def archive_current(app) -> None:
process = await asyncio.create_subprocess_shell(
f"himalaya message move Archives {app.current_message_id}",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
# app.reload_needed = True

View File

@@ -2,6 +2,7 @@ import asyncio
from textual import work
from textual.widgets import ListView
@work(exclusive=False)
async def delete_current(app) -> None:
app.show_status(f"Deleting message {app.current_message_id}...")
@@ -10,7 +11,7 @@ async def delete_current(app) -> None:
process = await asyncio.create_subprocess_shell(
f"himalaya message delete {app.current_message_id}",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
# app.reload_needed = True
@@ -20,6 +21,9 @@ async def delete_current(app) -> None:
app.query_one(ListView).index = index
# app.action_next() # Automatically show the next message
else:
app.show_status(f"Failed to delete message {app.current_message_id}. {stderr.decode()}", "error")
app.show_status(
f"Failed to delete message {app.current_message_id}. {stderr.decode()}",
"error",
)
except Exception as e:
app.show_status(f"Error: {e}", "error")

View File

@@ -1,12 +1,12 @@
async def action_newest(app) -> None:
"""Show the previous email message by finding the next lower ID from the list of envelope IDs."""
try:
if (app.reload_needed):
if app.reload_needed:
await app.action_fetch_list()
ids = sorted((int(envelope['id']) for envelope in app.all_envelopes), reverse=True)
ids = sorted(
(int(envelope["id"]) for envelope in app.all_envelopes), reverse=True
)
app.current_message_id = ids[0]
app.show_message(app.current_message_id)
return

View File

@@ -1,10 +1,9 @@
async def action_next(app) -> None:
"""Show the next email message by finding the next higher ID from the list of envelope IDs."""
try:
if (app.reload_needed):
if app.reload_needed:
app.action_fetch_list()
ids = sorted(int(envelope['id']) for envelope in app.all_envelopes)
ids = sorted(int(envelope["id"]) for envelope in app.all_envelopes)
for envelope_id in ids:
if envelope_id > int(app.current_message_id):
app.show_message(envelope_id)

View File

@@ -1,12 +1,10 @@
def action_oldest(app) -> None:
"""Show the previous email message by finding the next lower ID from the list of envelope IDs."""
try:
if (app.reload_needed):
if app.reload_needed:
app.action_fetch_list()
ids = sorted((int(envelope['id']) for envelope in app.all_envelopes))
ids = sorted((int(envelope["id"]) for envelope in app.all_envelopes))
app.current_message_id = ids[0]
app.show_message(app.current_message_id)
return

View File

@@ -3,15 +3,19 @@ from maildir_gtd.screens.OpenMessage import OpenMessageScreen
def action_open(app) -> None:
"""Show the input modal for opening a specific message by ID."""
def check_id(message_id: str | None) -> bool:
try:
int(message_id)
app.show_message(message_id)
if (message_id is not None and message_id > 0):
if message_id is not None and message_id > 0:
app.show_message(message_id)
except ValueError:
app.bell()
app.show_status("Invalid message ID. Please enter an integer.", severity="error")
app.show_status(
"Invalid message ID. Please enter an integer.", severity="error"
)
return True
return False
app.push_screen(OpenMessageScreen(), check_id)

View File

@@ -1,12 +1,12 @@
def action_previous(app) -> None:
"""Show the previous email message by finding the next lower ID from the list of envelope IDs."""
try:
if (app.reload_needed):
if app.reload_needed:
app.action_fetch_list()
ids = sorted((int(envelope['id']) for envelope in app.all_envelopes), reverse=True)
ids = sorted(
(int(envelope["id"]) for envelope in app.all_envelopes), reverse=True
)
for envelope_id in ids:
if envelope_id < int(app.current_message_id):
app.current_message_id = envelope_id

View File

@@ -7,6 +7,7 @@ logging.basicConfig(
handlers=[TextualHandler()],
)
def show_message(app, message_id: int) -> None:
"""Fetch and display the email message by ID."""
logging.info("Showing message ID: " + str(message_id))

View File

@@ -11,15 +11,18 @@ def action_create_task(app) -> None:
result = await asyncio.create_subprocess_shell(
f"task add {task_args}",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await result.communicate()
if result.returncode == 0:
app.show_status(f"Task created: {stdout.decode()}")
else:
app.show_status(f"Failed to create task: {stderr.decode()}", severity="error")
app.show_status(
f"Failed to create task: {stderr.decode()}", severity="error"
)
except Exception as e:
app.show_status(f"Error: {e}", severity="error")
return True
return False
app.push_screen(CreateTaskScreen(), check_task)

View File

@@ -17,7 +17,7 @@ from textual.worker import Worker
from textual.app import App, ComposeResult, SystemCommand, RenderResult
from textual.logging import TextualHandler
from textual.screen import Screen
from textual.widgets import Footer, Static, Label, Markdown, ListView, ListItem
from textual.widgets import Footer, Static, Label, Markdown, ListView, ListItem
from textual.reactive import reactive, Reactive
from textual.binding import Binding
from textual.timer import Timer
@@ -36,7 +36,6 @@ logging.basicConfig(
)
class StatusTitle(Static):
total_messages: Reactive[int] = reactive(0)
current_message_index: Reactive[int] = reactive(0)
@@ -49,6 +48,7 @@ class StatusTitle(Static):
class EmailViewerApp(App):
"""A simple email viewer app using the Himalaya CLI."""
CSS_PATH = "email_viewer.tcss"
title = "Maildir GTD Reader"
current_message_id: Reactive[int] = reactive(0)
@@ -70,13 +70,27 @@ class EmailViewerApp(App):
def get_system_commands(self, screen: Screen) -> Iterable[SystemCommand]:
yield from super().get_system_commands(screen)
yield SystemCommand("Next Message", "Navigate to Next ID", self.action_next)
yield SystemCommand("Previous Message", "Navigate to Previous ID", self.action_previous)
yield SystemCommand("Delete Message", "Delete the current message", self.action_delete)
yield SystemCommand("Archive Message", "Archive the current message", self.action_archive)
yield SystemCommand("Open Message", "Open a specific message by ID", self.action_open)
yield SystemCommand("Create Task", "Create a task using the task CLI", self.action_create_task)
yield SystemCommand("Oldest Message", "Show the oldest message", self.action_oldest)
yield SystemCommand("Newest Message", "Show the newest message", self.action_newest)
yield SystemCommand(
"Previous Message", "Navigate to Previous ID", self.action_previous
)
yield SystemCommand(
"Delete Message", "Delete the current message", self.action_delete
)
yield SystemCommand(
"Archive Message", "Archive the current message", self.action_archive
)
yield SystemCommand(
"Open Message", "Open a specific message by ID", self.action_open
)
yield SystemCommand(
"Create Task", "Create a task using the task CLI", self.action_create_task
)
yield SystemCommand(
"Oldest Message", "Show the oldest message", self.action_oldest
)
yield SystemCommand(
"Newest Message", "Show the newest message", self.action_newest
)
yield SystemCommand("Reload", "Reload the message list", self.fetch_envelopes)
BINDINGS = [
@@ -91,29 +105,32 @@ class EmailViewerApp(App):
Binding("%", "reload", "Reload message list"),
Binding("1", "focus_1", "Focus Accounts Panel"),
Binding("2", "focus_2", "Focus Folders Panel"),
Binding("3", "focus_3", "Focus Envelopes Panel")
Binding("3", "focus_3", "Focus Envelopes Panel"),
]
BINDINGS.extend([
Binding("space", "scroll_page_down", "Scroll page down"),
Binding("b", "scroll_page_up", "Scroll page up"),
Binding("s", "toggle_sort_order", "Toggle Sort Order")
])
BINDINGS.extend(
[
Binding("space", "scroll_page_down", "Scroll page down"),
Binding("b", "scroll_page_up", "Scroll page up"),
Binding("s", "toggle_sort_order", "Toggle Sort Order"),
]
)
def compose(self) -> ComposeResult:
yield Horizontal(
Vertical(
ListView(ListItem(Label("All emails...")), id="envelopes_list", classes="list_view", initial_index=0),
ListView(id="accounts_list", classes="list_view"),
ListView(id="folders_list", classes="list_view"),
id="sidebar"
ListView(
ListItem(Label("All emails...")),
id="envelopes_list",
classes="list_view",
initial_index=0,
),
ListView(id="accounts_list", classes="list_view"),
ListView(id="folders_list", classes="list_view"),
id="sidebar",
),
ScrollableContainer(
EnvelopeHeader(),
Markdown(),
id="main_content"
),
id="outer-wrapper"
ScrollableContainer(EnvelopeHeader(), Markdown(), id="main_content"),
id="outer-wrapper",
)
yield Footer()
@@ -122,7 +139,7 @@ class EmailViewerApp(App):
self.theme = "monokai"
self.title = "MaildirGTD"
self.query_one("#main_content").border_title = self.status_title
sort_indicator = '\u2191' if self.sort_order_ascending else '\u2193'
sort_indicator = "\u2191" if self.sort_order_ascending else "\u2193"
self.query_one("#envelopes_list").border_title = f"\[1] Emails {sort_indicator}"
self.query_one("#accounts_list").border_title = "\[2] Accounts"
@@ -142,7 +159,7 @@ class EmailViewerApp(App):
return f"✉️ Message ID: {self.current_message_id} "
def compute_valid_envelopes(self) -> None:
return (envelope for envelope in self.all_envelopes if envelope.get('id'))
return (envelope for envelope in self.all_envelopes if envelope.get("id"))
def watch_status_title(self, old_status_title: str, new_status_title: str) -> None:
self.query_one("#main_content").border_title = new_status_title
@@ -153,39 +170,52 @@ class EmailViewerApp(App):
self.query_one("#envelopes_list").border_title = f"\[1] Emails {sort_indicator}"
def watch_current_message_index(self, old_index: int, new_index: int) -> None:
if (new_index < 0):
if new_index < 0:
new_index = 0
self.current_message_index = new_index
if (new_index > self.total_messages):
if new_index > self.total_messages:
new_index = self.total_messages
self.current_message_index = new_index
self.query_one("#envelopes_list").border_subtitle = f"[b]{new_index}[/b]/{self.total_messages}"
self.query_one(
"#envelopes_list"
).border_subtitle = f"[b]{new_index}[/b]/{self.total_messages}"
self.query_one("#envelopes_list").index = new_index
def compute_newest_id(self) -> None:
if not self.all_envelopes:
return 0
items = sorted(self.valid_envelopes, key=lambda x: x['date'], reverse=not self.sort_order_ascending)
return items[-1]['id'] if items else 0
items = sorted(
self.valid_envelopes,
key=lambda x: x["date"],
reverse=not self.sort_order_ascending,
)
return items[-1]["id"] if items else 0
def compute_oldest_id(self) -> None:
if not self.valid_envelopes:
return 0
items = sorted(self.valid_envelopes, key=lambda x: x['date'], reverse=not self.sort_order_ascending)
return items[0]['id'] if items else 0
items = sorted(
self.valid_envelopes,
key=lambda x: x["date"],
reverse=not self.sort_order_ascending,
)
return items[0]["id"] if items else 0
def watch_reload_needed(self, old_reload_needed: bool, new_reload_needed: bool) -> None:
def watch_reload_needed(
self, old_reload_needed: bool, new_reload_needed: bool
) -> None:
logging.info(f"Reload needed: {new_reload_needed}")
if (not old_reload_needed and new_reload_needed):
if not old_reload_needed and new_reload_needed:
self.fetch_envelopes()
def watch_current_message_id(self, old_message_id: int, new_message_id: int) -> None:
def watch_current_message_id(
self, old_message_id: int, new_message_id: int
) -> None:
"""Called when the current message ID changes."""
logging.info(f"Current message ID changed from {old_message_id} to {new_message_id}")
if (new_message_id == old_message_id):
logging.info(
f"Current message ID changed from {old_message_id} to {new_message_id}"
)
if new_message_id == old_message_id:
return
self.msg_worker.cancel() if self.msg_worker else None
headers = self.query_one(EnvelopeHeader)
@@ -193,19 +223,21 @@ class EmailViewerApp(App):
logging.info(f"message_metadata keys: {list(self.message_metadata.keys())}")
if new_message_id in self.message_metadata:
metadata = self.message_metadata[new_message_id]
self.current_message_index = metadata['index']
headers.subject = metadata['subject'].strip()
headers.from_ = metadata['from'].get('addr', '')
headers.to = metadata['to'].get('addr', '')
msgdate = re.sub(r"[\+\-]\d\d:\d\d", "", metadata['date'])
msgdate = datetime.strptime(msgdate, "%Y-%m-%d %H:%M").strftime("%a %b %d %H:%M")
headers.date = msgdate
headers.cc = metadata['cc'].get('addr', '') if 'cc' in metadata else ""
self.query_one(ListView).index = metadata['index']
self.current_message_index = metadata["index"]
headers.subject = metadata["subject"].strip()
headers.from_ = metadata["from"].get("addr", "")
headers.to = metadata["to"].get("addr", "")
message_date = re.sub(r"[\+\-]\d\d:\d\d", "", metadata["date"])
message_date = datetime.strptime(message_date, "%Y-%m-%d %H:%M").strftime(
"%a %b %d %H:%M"
)
headers.date = message_date
headers.cc = metadata["cc"].get("addr", "") if "cc" in metadata else ""
self.query_one(ListView).index = metadata["index"]
else:
logging.warning(f"Message ID {new_message_id} not found in metadata.")
if (self.message_body_cache.get(new_message_id)):
if self.message_body_cache.get(new_message_id):
# If the message body is already cached, use it
msg = self.query_one(Markdown)
msg.update(self.message_body_cache[new_message_id])
@@ -217,21 +249,23 @@ class EmailViewerApp(App):
def on_list_view_selected(self, event: ListView.Selected) -> None:
"""Called when an item in the list view is selected."""
# logging.info(f"Selected item: {self.all_envelopes[event.list_view.index]}")
if self.all_envelopes[event.list_view.index] is None or self.all_envelopes[event.list_view.index].get("type") == "header":
if (
self.all_envelopes[event.list_view.index] is None
or self.all_envelopes[event.list_view.index].get("type") == "header"
):
# If the selected item is a header, do not change the current message ID
return
self.current_message_id = int(self.all_envelopes[event.list_view.index]['id'])
self.current_message_id = int(self.all_envelopes[event.list_view.index]["id"])
@work(exclusive=False)
async def fetch_one_message(self, new_message_id:int) -> None:
async def fetch_one_message(self, new_message_id: int) -> None:
msg = self.query_one(Markdown)
try:
process = await asyncio.create_subprocess_shell(
f"himalaya message read {str(new_message_id)}",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
logging.info(f"stdout: {stdout.decode()[0:50]}...")
@@ -257,38 +291,59 @@ class EmailViewerApp(App):
process = await asyncio.create_subprocess_shell(
"himalaya envelope list -o json -s 9999",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
logging.info(f"stdout: {stdout.decode()[0:50]}")
if process.returncode == 0:
import json
envelopes = json.loads(stdout.decode())
if envelopes:
self.reload_needed = False
self.total_messages = len(envelopes)
msglist.clear()
envelopes = sorted(envelopes, key=lambda x: x['date'], reverse=not self.sort_order_ascending)
envelopes = sorted(
envelopes,
key=lambda x: x["date"],
reverse=not self.sort_order_ascending,
)
grouped_envelopes = group_envelopes_by_date(envelopes)
self.all_envelopes = grouped_envelopes
self.message_metadata = {
int(envelope['id']): {
'subject': envelope.get('subject', ''),
'from': envelope.get('from', {}),
'to': envelope.get('to', {}),
'date': envelope.get('date', ''),
'cc': envelope.get('cc', {}),
'index': index # Store the position index
int(envelope["id"]): {
"subject": envelope.get("subject", ""),
"from": envelope.get("from", {}),
"to": envelope.get("to", {}),
"date": envelope.get("date", ""),
"cc": envelope.get("cc", {}),
"index": index, # Store the position index
}
for index, envelope in enumerate(self.all_envelopes)
if 'id' in envelope
if "id" in envelope
}
for item in grouped_envelopes:
if item.get("type") == "header":
msglist.append(ListItem(Label(item["label"], classes="group_header", markup=False)))
msglist.append(
ListItem(
Label(
item["label"],
classes="group_header",
markup=False,
)
)
)
else:
msglist.append(ListItem(Label(str(item['subject']).strip(), classes="email_subject", markup=False)))
msglist.append(
ListItem(
Label(
str(item["subject"]).strip(),
classes="email_subject",
markup=False,
)
)
)
msglist.index = self.current_message_index
else:
self.show_status("Failed to fetch any envelopes.", "error")
@@ -305,16 +360,23 @@ class EmailViewerApp(App):
process = await asyncio.create_subprocess_shell(
"himalaya account list -o json",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
logging.info(f"stdout: {stdout.decode()[0:50]}")
if process.returncode == 0:
import json
accounts = json.loads(stdout.decode())
if accounts:
for account in accounts:
item = ListItem(Label(str(account['name']).strip(), classes="account_name", markup=False))
item = ListItem(
Label(
str(account["name"]).strip(),
classes="account_name",
markup=False,
)
)
accounts_list.append(item)
except Exception as e:
self.show_status(f"Error fetching account list: {e}", "error")
@@ -325,22 +387,31 @@ class EmailViewerApp(App):
async def fetch_folders(self) -> None:
folders_list = self.query_one("#folders_list")
folders_list.clear()
folders_list.append(ListItem(Label("INBOX", classes="folder_name", markup=False)))
folders_list.append(
ListItem(Label("INBOX", classes="folder_name", markup=False))
)
try:
folders_list.loading = True
process = await asyncio.create_subprocess_shell(
"himalaya folder list -o json",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
logging.info(f"stdout: {stdout.decode()[0:50]}")
if process.returncode == 0:
import json
folders = json.loads(stdout.decode())
if folders:
for folder in folders:
item = ListItem(Label(str(folder['name']).strip(), classes="folder_name", markup=False))
item = ListItem(
Label(
str(folder["name"]).strip(),
classes="folder_name",
markup=False,
)
)
folders_list.append(item)
except Exception as e:
self.show_status(f"Error fetching folder list: {e}", "error")
@@ -354,7 +425,9 @@ class EmailViewerApp(App):
def show_status(self, message: str, severity: str = "information") -> None:
"""Display a status message using the built-in notify function."""
self.notify(message, title="Status", severity=severity, timeout=2.6, markup=True)
self.notify(
message, title="Status", severity=severity, timeout=2.6, markup=True
)
def action_toggle_header(self) -> None:
"""Toggle the visibility of the EnvelopeHeader panel."""
@@ -380,7 +453,10 @@ class EmailViewerApp(App):
modifier = 1
idx = self.current_message_index
try:
if self.all_envelopes[idx + modifier] is None or self.all_envelopes[idx + modifier].get("type") == "header":
if (
self.all_envelopes[idx + modifier] is None
or self.all_envelopes[idx + modifier].get("type") == "header"
):
idx = idx + modifier
except IndexError:
# If we reach the end of the list, wrap around to the beginning
@@ -394,7 +470,10 @@ class EmailViewerApp(App):
modifier = -1
idx = self.current_message_index
try:
if self.all_envelopes[idx + modifier] is None or self.all_envelopes[idx + modifier].get("type") == "header":
if (
self.all_envelopes[idx + modifier] is None
or self.all_envelopes[idx + modifier].get("type") == "header"
):
idx = idx + modifier
except IndexError:
# If we reach the beginning of the list, wrap around to the end
@@ -404,30 +483,40 @@ class EmailViewerApp(App):
async def action_delete(self) -> None:
self.query_one("#envelopes_list").pop(self.current_message_index)
self.all_envelopes = list(filter(lambda x: int(x.get("id", "0")) != self.current_message_id, self.all_envelopes))
self.all_envelopes = list(
filter(
lambda x: int(x.get("id", "0")) != self.current_message_id,
self.all_envelopes,
)
)
self.message_metadata.pop(self.current_message_id, None)
self.message_body_cache.pop(self.current_message_id, None)
self.total_messages = len(self.message_metadata)
delete_current(self)
newmsg = self.all_envelopes[self.current_message_index]
if newmsg.get('type') == "header":
if newmsg.get("type") == "header":
newmsg = self.all_envelopes[self.current_message_index + 1]
return
self.show_message(newmsg['id'])
self.show_message(newmsg["id"])
async def action_archive(self) -> None:
self.query_one("#envelopes_list").pop(self.current_message_index)
self.all_envelopes = list(filter(lambda x: int(x.get("id", "0")) != self.current_message_id, self.all_envelopes))
self.all_envelopes = list(
filter(
lambda x: int(x.get("id", "0")) != self.current_message_id,
self.all_envelopes,
)
)
self.message_metadata.pop(self.current_message_id, None)
self.message_body_cache.pop(self.current_message_id, None)
self.total_messages = len(self.message_metadata)
worker = archive_current(self)
await worker.wait()
newmsg = self.all_envelopes[self.current_message_index]
if newmsg.get('type') == "header":
if newmsg.get("type") == "header":
newmsg = self.all_envelopes[self.current_message_index + 1]
return
self.show_message(newmsg['id'])
self.show_message(newmsg["id"])
def action_open(self) -> None:
action_open(self)
@@ -435,7 +524,6 @@ class EmailViewerApp(App):
def action_create_task(self) -> None:
action_create_task(self)
def action_scroll_down(self) -> None:
"""Scroll the main content down."""
self.query_one("#main_content").scroll_down()
@@ -473,6 +561,7 @@ class EmailViewerApp(App):
def action_focus_3(self) -> None:
self.query_one("#folders_list").focus()
if __name__ == "__main__":
app = EmailViewerApp()
app.run()

View File

@@ -18,22 +18,21 @@ class CreateTaskScreen(ModalScreen[str]):
Button("Submit", id="submit", variant="primary"),
),
id="create_task_container",
classes="modal_screen"
classes="modal_screen",
)
@on(Input.Submitted)
def handle_task_args(self) -> None:
input_widget = self.query_one("#task_input", Input)
self.visible = False
self.disabled = True
self.loading = True
task_args = input_widget.value
self.dismiss(task_args)
input_widget = self.query_one("#task_input", Input)
self.visible = False
self.disabled = True
self.loading = True
task_args = input_widget.value
self.dismiss(task_args)
def on_key(self, event) -> None:
if (event.key == "escape" or event.key == "ctrl+c"):
self.dismiss()
if event.key == "escape" or event.key == "ctrl+c":
self.dismiss()
def button_on_click(self, event):
if event.button.id == "cancel":

View File

@@ -4,24 +4,27 @@ from textual.screen import ModalScreen
from textual.widgets import Input, Label, Button
from textual.containers import Horizontal
class OpenMessageScreen(ModalScreen[int | None]):
class OpenMessageScreen(ModalScreen[int | None]):
def compose(self) -> ComposeResult:
yield Horizontal(
Label("📨 ID", id="message_label"),
Input(placeholder="Enter message ID (integer only)", type="integer", id="open_message_input"),
Input(
placeholder="Enter message ID (integer only)",
type="integer",
id="open_message_input",
),
Button("Cancel", id="cancel"),
Button("Open", variant="primary", id="submit"),
id="open_message_container",
classes="modal_screen"
classes="modal_screen",
)
@on(Input.Submitted)
def handle_message_id(self, event) -> None:
input_widget = self.query_one("#open_message_input", Input)
message_id = int(input_widget.value if input_widget.value else 0)
self.dismiss(message_id)
input_widget = self.query_one("#open_message_input", Input)
message_id = int(input_widget.value if input_widget.value else 0)
self.dismiss(message_id)
def button_on_click(self, event) -> None:
if event.button.id == "cancel":
@@ -30,4 +33,3 @@ class OpenMessageScreen(ModalScreen[int | None]):
input_widget = self.query_one("#open_message_input", Input)
message_id = int(input_widget.value if input_widget.value else 0)
self.dismiss(message_id)

View File

@@ -2,6 +2,7 @@ from datetime import datetime, timedelta
import re
from typing import List, Dict
def group_envelopes_by_date(envelopes: List[Dict]) -> List[Dict]:
"""Group envelopes by date and add headers for each group."""
grouped_envelopes = []
@@ -30,7 +31,7 @@ def group_envelopes_by_date(envelopes: List[Dict]) -> List[Dict]:
current_group = None
for envelope in envelopes:
envelope_date = re.sub(r"[\+\-]\d\d:\d\d", "", envelope['date'])
envelope_date = re.sub(r"[\+\-]\d\d:\d\d", "", envelope["date"])
envelope_date = datetime.strptime(envelope_date, "%Y-%m-%d %H:%M")
group_label = get_group_label(envelope_date)
if group_label != current_group:

View File

@@ -3,8 +3,8 @@ from textual.app import ComposeResult
from textual.widgets import Label
from textual.containers import Horizontal, ScrollableContainer
class EnvelopeHeader(ScrollableContainer):
class EnvelopeHeader(ScrollableContainer):
subject = Reactive("")
from_ = Reactive("")
to = Reactive("")
@@ -13,35 +13,34 @@ class EnvelopeHeader(ScrollableContainer):
bcc = Reactive("")
"""Header for the email viewer."""
def on_mount(self) -> None:
"""Mount the header."""
def compose(self) -> ComposeResult:
yield Horizontal(
Label("Subject:", classes="header_key"),
Label(self.subject, classes="header_value", markup=False, id="subject")
)
yield Horizontal(
Label("Date:", classes="header_key"),
Label(self.date, classes="header_value", markup=False, id="date"),
)
# yield Horizontal(
# Label("From:", classes="header_key"),
# Label(self.from_, classes="header_value", markup=False, id="from"),
# )
# yield Horizontal(
# Label("To:", classes="header_key"),
# Label(self.to, classes="header_value", markup=False, id="to"),
# )
# yield Horizontal(
# )
# yield Horizontal(
# Label("CC:", classes="header_key"),
# Label(self.cc, classes="header_value", markup=False, id="cc"),
# )
yield Horizontal(
Label("Subject:", classes="header_key"),
Label(self.subject, classes="header_value", markup=False, id="subject"),
)
yield Horizontal(
Label("Date:", classes="header_key"),
Label(self.date, classes="header_value", markup=False, id="date"),
)
# yield Horizontal(
# Label("From:", classes="header_key"),
# Label(self.from_, classes="header_value", markup=False, id="from"),
# )
# yield Horizontal(
# Label("To:", classes="header_key"),
# Label(self.to, classes="header_value", markup=False, id="to"),
# )
# yield Horizontal(
# )
# yield Horizontal(
# Label("CC:", classes="header_key"),
# Label(self.cc, classes="header_value", markup=False, id="cc"),
# )
def watch_subject(self, subject: str) -> None:
"""Watch the subject for changes."""
@@ -62,8 +61,3 @@ class EnvelopeHeader(ScrollableContainer):
# def watch_cc(self, cc: str) -> None:
# """Watch the cc field for changes."""
# self.query_one("#cc").update(cc)

1
tui.py
View File

@@ -1,6 +1,7 @@
from textual.app import App, ComposeResult
from textual.widgets import Header, Footer, Static, Label
class MSALApp(App):
"""A Textual app for MSAL authentication."""