feat: Add notification email compression feature

Add intelligent notification email detection and compression:
- Detect notification emails from GitLab, GitHub, Jira, Confluence, Datadog, Renovate
- Extract structured summaries from notification emails
- Compress notifications into terminal-friendly markdown format
- Add configuration options for notification compression mode

Features:
- Rule-based detection using sender domains and subject patterns
- Type-specific extractors for each notification platform
- Configurable compression modes (summary, detailed, off)
- Integrated with ContentContainer for seamless display

Files added:
- src/mail/notification_detector.py: Notification type detection
- src/mail/notification_compressor.py: Content compression

Modified:
- src/mail/config.py: Add notification_compression_mode config
- src/mail/widgets/ContentContainer.py: Integrate compressor
- src/mail/app.py: Pass envelope data to display_content
- PROJECT_PLAN.md: Document new feature
This commit is contained in:
Bendt
2025-12-28 10:49:25 -05:00
parent 504e0d534d
commit 1c1b86b96b
6 changed files with 603 additions and 5 deletions

View File

@@ -265,9 +265,17 @@ class EmailViewerApp(App):
content_container = self.query_one(ContentContainer)
folder = self.folder if self.folder else None
account = self.current_account if self.current_account else None
content_container.display_content(message_id, folder=folder, account=account)
# Get envelope data for notification compression
metadata = self.message_store.get_metadata(message_id)
envelope = None
if metadata:
envelope = self.message_store.envelopes.get(metadata["index"])
content_container.display_content(
message_id, folder=folder, account=account, envelope=envelope
)
if metadata:
message_date = metadata["date"]
if self.current_message_index != metadata["index"]:

View File

@@ -86,6 +86,10 @@ class ContentDisplayConfig(BaseModel):
compress_urls: bool = True
max_url_length: int = 50 # Maximum length before URL is compressed
# Notification compression: compress notification emails into summaries
compress_notifications: bool = True
notification_compression_mode: Literal["summary", "detailed", "off"] = "summary"
class LinkPanelConfig(BaseModel):
"""Configuration for the link panel."""

View File

@@ -0,0 +1,221 @@
"""Notification email compressor for terminal-friendly display."""
from typing import Dict, Any, Optional
from textual.widgets import Markdown
from .notification_detector import (
is_notification_email,
classify_notification,
extract_notification_summary,
NotificationType,
)
class NotificationCompressor:
"""Compress notification emails into terminal-friendly summaries."""
def __init__(self, mode: str = "summary"):
"""Initialize compressor.
Args:
mode: Compression mode - "summary", "detailed", or "off"
"""
self.mode = mode
def should_compress(self, envelope: Dict[str, Any]) -> bool:
"""Check if email should be compressed.
Args:
envelope: Email envelope metadata
Returns:
True if email should be compressed
"""
if self.mode == "off":
return False
return is_notification_email(envelope)
def compress(
self, content: str, envelope: Dict[str, Any]
) -> tuple[str, Optional[NotificationType]]:
"""Compress notification email content.
Args:
content: Raw email content
envelope: Email envelope metadata
Returns:
Tuple of (compressed content, notification_type)
"""
if not self.should_compress(envelope):
return content, None
# Classify notification type
notif_type = classify_notification(envelope, content)
# Extract summary
summary = extract_notification_summary(content, notif_type)
# Format as markdown
compressed = self._format_as_markdown(summary, envelope, notif_type)
return compressed, notif_type
def _format_as_markdown(
self,
summary: Dict[str, Any],
envelope: Dict[str, Any],
notif_type: Optional[NotificationType],
) -> str:
"""Format summary as markdown for terminal display.
Args:
summary: Extracted summary data
envelope: Email envelope metadata
notif_type: Classified notification type
Returns:
Markdown-formatted compressed email
"""
from_addr = envelope.get("from", {}).get("name") or envelope.get(
"from", {}
).get("addr", "")
subject = envelope.get("subject", "")
# Get icon
icon = notif_type.icon if notif_type else "\uf0f3"
# Build markdown
lines = []
# Header with icon
if notif_type:
lines.append(f"## {icon} {notif_type.name.title()} Notification")
else:
lines.append(f"## {icon} Notification")
lines.append("")
# Title/subject
if summary.get("title"):
lines.append(f"**{summary['title']}**")
else:
lines.append(f"**{subject}**")
lines.append("")
# Metadata section
if summary.get("metadata"):
lines.append("### Details")
for key, value in summary["metadata"].items():
# Format key nicely
key_formatted = key.replace("_", " ").title()
lines.append(f"- **{key_formatted}**: {value}")
lines.append("")
# Action items
if summary.get("action_items"):
lines.append("### Actions")
for i, action in enumerate(summary["action_items"], 1):
lines.append(f"{i}. {action}")
lines.append("")
# Add footer
lines.append("---")
lines.append("")
lines.append(f"*From: {from_addr}*")
lines.append(
"*This is a compressed notification view. Press `m` to toggle full view.*"
)
return "\n".join(lines)
class DetailedCompressor(NotificationCompressor):
"""Compressor that includes more detail in summaries."""
def _format_as_markdown(
self,
summary: Dict[str, Any],
envelope: Dict[str, Any],
notif_type: Optional[NotificationType],
) -> str:
"""Format summary with more detail."""
from_addr = envelope.get("from", {}).get("name") or envelope.get(
"from", {}
).get("addr", "")
subject = envelope.get("subject", "")
date = envelope.get("date", "")
icon = notif_type.icon if notif_type else "\uf0f3"
lines = []
# Header
lines.append(
f"## {icon} {notif_type.name.title()} Notification"
if notif_type
else f"## {icon} Notification"
)
lines.append("")
# Subject and from
lines.append(f"**Subject:** {subject}")
lines.append(f"**From:** {from_addr}")
lines.append(f"**Date:** {date}")
lines.append("")
# Summary title
if summary.get("title"):
lines.append(f"### {summary['title']}")
lines.append("")
# Metadata table
if summary.get("metadata"):
lines.append("| Property | Value |")
lines.append("|----------|-------|")
for key, value in summary["metadata"].items():
key_formatted = key.replace("_", " ").title()
lines.append(f"| {key_formatted} | {value} |")
lines.append("")
# Action items
if summary.get("action_items"):
lines.append("### Action Items")
for i, action in enumerate(summary["action_items"], 1):
lines.append(f"- [ ] {action}")
lines.append("")
# Key links
if summary.get("key_links"):
lines.append("### Important Links")
for link in summary["key_links"]:
lines.append(f"- [{link.get('text', 'Link')}]({link.get('url', '#')})")
lines.append("")
# Footer
lines.append("---")
lines.append(
"*This is a compressed notification view. Press `m` to toggle full view.*"
)
return "\n".join(lines)
def create_compressor(mode: str) -> NotificationCompressor:
"""Factory function to create appropriate compressor.
Args:
mode: Compression mode - "summary", "detailed", or "off"
Returns:
NotificationCompressor instance
"""
if mode == "detailed":
return DetailedCompressor(mode=mode)
else:
return NotificationCompressor(mode=mode)

View File

@@ -0,0 +1,336 @@
"""Email notification detection utilities."""
import re
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
@dataclass
class NotificationType:
"""Classification of notification email types."""
name: str
patterns: List[str]
domains: List[str]
icon: str
def matches(self, envelope: Dict[str, Any], content: Optional[str] = None) -> bool:
"""Check if envelope matches this notification type."""
# Check sender domain
from_addr = envelope.get("from", {}).get("addr", "").lower()
if any(domain in from_addr for domain in self.domains):
return True
# Check subject patterns
subject = envelope.get("subject", "").lower()
if any(re.search(pattern, subject, re.IGNORECASE) for pattern in self.patterns):
return True
return False
# Common notification types
NOTIFICATION_TYPES = [
NotificationType(
name="gitlab",
patterns=[r"\[gitlab\]", r"pipeline", r"merge request", r"mention.*you"],
domains=["gitlab.com", "@gitlab"],
icon="\uf296",
),
NotificationType(
name="github",
patterns=[r"\[github\]", r"pr #", r"pull request", r"issue #", r"mention"],
domains=["github.com", "noreply@github.com"],
icon="\uf09b",
),
NotificationType(
name="jira",
patterns=[r"\[jira\]", r"[a-z]+-\d+", r"issue updated", r"comment added"],
domains=["atlassian.net", "jira"],
icon="\uf1b3",
),
NotificationType(
name="confluence",
patterns=[r"\[confluence\]", r"page created", r"page updated", r"comment"],
domains=["atlassian.net", "confluence"],
icon="\uf298",
),
NotificationType(
name="datadog",
patterns=[r"alert", r"monitor", r"incident", r"downtime"],
domains=["datadoghq.com", "datadog"],
icon="\uf1b0",
),
NotificationType(
name="renovate",
patterns=[r"renovate", r"dependency update", r"lock file"],
domains=["renovate", "renovatebot"],
icon="\uf1e6",
),
NotificationType(
name="general",
patterns=[r"\[.*?\]", r"notification", r"digest", r"summary"],
domains=["noreply@", "no-reply@", "notifications@"],
icon="\uf0f3",
),
]
def is_notification_email(
envelope: Dict[str, Any], content: Optional[str] = None
) -> bool:
"""Check if an email is a notification-style email.
Args:
envelope: Email envelope metadata from himalaya
content: Optional email content for content-based detection
Returns:
True if email appears to be a notification
"""
# Check against known notification types
for notif_type in NOTIFICATION_TYPES:
if notif_type.matches(envelope, content):
return True
# Check for generic notification indicators
subject = envelope.get("subject", "").lower()
from_addr = envelope.get("from", {}).get("addr", "").lower()
# Generic notification patterns
generic_patterns = [
r"^\[.*?\]", # Brackets at start
r"weekly|daily|monthly.*report|digest|summary",
r"you were mentioned",
r"this is an automated message",
r"do not reply|don't reply",
]
if any(re.search(pattern, subject, re.IGNORECASE) for pattern in generic_patterns):
return True
# Check for notification senders
notification_senders = ["noreply", "no-reply", "notifications", "robot", "bot"]
if any(sender in from_addr for sender in notification_senders):
return True
return False
def classify_notification(
envelope: Dict[str, Any], content: Optional[str] = None
) -> Optional[NotificationType]:
"""Classify the type of notification email.
Args:
envelope: Email envelope metadata from himalaya
content: Optional email content for content-based detection
Returns:
NotificationType if classified, None if not a notification
"""
for notif_type in NOTIFICATION_TYPES:
if notif_type.matches(envelope, content):
return notif_type
return None
def extract_notification_summary(
content: str, notification_type: Optional[NotificationType] = None
) -> Dict[str, Any]:
"""Extract structured summary from notification email content.
Args:
content: Email body content
notification_type: Classified notification type (optional)
Returns:
Dictionary with extracted summary fields
"""
summary = {
"title": None,
"action_items": [],
"key_links": [],
"metadata": {},
}
# Extract based on notification type
if notification_type and notification_type.name == "gitlab":
summary.update(_extract_gitlab_summary(content))
elif notification_type and notification_type.name == "github":
summary.update(_extract_github_summary(content))
elif notification_type and notification_type.name == "jira":
summary.update(_extract_jira_summary(content))
elif notification_type and notification_type.name == "confluence":
summary.update(_extract_confluence_summary(content))
elif notification_type and notification_type.name == "datadog":
summary.update(_extract_datadog_summary(content))
elif notification_type and notification_type.name == "renovate":
summary.update(_extract_renovate_summary(content))
else:
summary.update(_extract_general_notification_summary(content))
return summary
def _extract_gitlab_summary(content: str) -> Dict[str, Any]:
"""Extract summary from GitLab notification."""
summary = {"action_items": [], "key_links": [], "metadata": {}}
# Pipeline patterns
pipeline_match = re.search(
r"Pipeline #(\d+).*?(?:failed|passed|canceled) by (.+?)[\n\r]",
content,
re.IGNORECASE,
)
if pipeline_match:
summary["metadata"]["pipeline_id"] = pipeline_match.group(1)
summary["metadata"]["triggered_by"] = pipeline_match.group(2)
summary["title"] = f"Pipeline #{pipeline_match.group(1)}"
# Merge request patterns
mr_match = re.search(r"Merge request #(\d+):\s*(.+?)[\n\r]", content, re.IGNORECASE)
if mr_match:
summary["metadata"]["mr_id"] = mr_match.group(1)
summary["metadata"]["mr_title"] = mr_match.group(2)
summary["title"] = f"MR #{mr_match.group(1)}: {mr_match.group(2)}"
# Mention patterns
mention_match = re.search(
r"<@(.+?)> mentioned you in (?:#|@)(.+?)[\n\r]", content, re.IGNORECASE
)
if mention_match:
summary["metadata"]["mentioned_by"] = mention_match.group(1)
summary["metadata"]["mentioned_in"] = mention_match.group(2)
summary["title"] = f"Mention by {mention_match.group(1)}"
return summary
def _extract_github_summary(content: str) -> Dict[str, Any]:
"""Extract summary from GitHub notification."""
summary = {"action_items": [], "key_links": [], "metadata": {}}
# PR/Issue patterns
pr_match = re.search(r"(?:PR|Issue) #(\d+):\s*(.+?)[\n\r]", content, re.IGNORECASE)
if pr_match:
summary["metadata"]["number"] = pr_match.group(1)
summary["metadata"]["title"] = pr_match.group(2)
summary["title"] = f"#{pr_match.group(1)}: {pr_match.group(2)}"
# Review requested
if "review requested" in content.lower():
summary["action_items"].append("Review requested")
return summary
def _extract_jira_summary(content: str) -> Dict[str, Any]:
"""Extract summary from Jira notification."""
summary = {"action_items": [], "key_links": [], "metadata": {}}
# Issue patterns
issue_match = re.search(r"([A-Z]+-\d+):\s*(.+?)[\n\r]", content, re.IGNORECASE)
if issue_match:
summary["metadata"]["issue_key"] = issue_match.group(1)
summary["metadata"]["issue_title"] = issue_match.group(2)
summary["title"] = f"{issue_match.group(1)}: {issue_match.group(2)}"
# Status changes
if "status changed" in content.lower():
status_match = re.search(
r"status changed from (.+?) to (.+)", content, re.IGNORECASE
)
if status_match:
summary["metadata"]["status_from"] = status_match.group(1)
summary["metadata"]["status_to"] = status_match.group(2)
summary["action_items"].append(
f"Status: {status_match.group(1)}{status_match.group(2)}"
)
return summary
def _extract_confluence_summary(content: str) -> Dict[str, Any]:
"""Extract summary from Confluence notification."""
summary = {"action_items": [], "key_links": [], "metadata": {}}
# Page patterns
page_match = re.search(r"Page \"(.+?)\"", content, re.IGNORECASE)
if page_match:
summary["metadata"]["page_title"] = page_match.group(1)
summary["title"] = f"Page: {page_match.group(1)}"
# Author
author_match = re.search(
r"(?:created|updated) by (.+?)[\n\r]", content, re.IGNORECASE
)
if author_match:
summary["metadata"]["author"] = author_match.group(1)
return summary
def _extract_datadog_summary(content: str) -> Dict[str, Any]:
"""Extract summary from Datadog notification."""
summary = {"action_items": [], "key_links": [], "metadata": {}}
# Alert status
if "triggered" in content.lower():
summary["metadata"]["status"] = "Triggered"
summary["action_items"].append("Alert triggered - investigate")
elif "recovered" in content.lower():
summary["metadata"]["status"] = "Recovered"
# Monitor name
monitor_match = re.search(r"Monitor: (.+?)[\n\r]", content, re.IGNORECASE)
if monitor_match:
summary["metadata"]["monitor"] = monitor_match.group(1)
summary["title"] = f"Alert: {monitor_match.group(1)}"
return summary
def _extract_renovate_summary(content: str) -> Dict[str, Any]:
"""Extract summary from Renovate notification."""
summary = {"action_items": [], "key_links": [], "metadata": {}}
# Dependency patterns
dep_match = re.search(
r"Update (?:.+) dependency (.+?) to (v?\d+\.\d+\.?\d*)", content, re.IGNORECASE
)
if dep_match:
summary["metadata"]["dependency"] = dep_match.group(2)
summary["metadata"]["version"] = dep_match.group(3)
summary["title"] = f"Update {dep_match.group(2)} to {dep_match.group(3)}"
summary["action_items"].append("Review and merge dependency update")
return summary
def _extract_general_notification_summary(content: str) -> Dict[str, Any]:
"""Extract summary from general notification."""
summary = {"action_items": [], "key_links": [], "metadata": {}}
# Look for action-oriented phrases
action_patterns = [
r"you need to (.+)",
r"please (.+)",
r"action required",
r"review requested",
r"approval needed",
]
for pattern in action_patterns:
matches = re.findall(pattern, content, re.IGNORECASE)
summary["action_items"].extend(matches)
# Limit action items
summary["action_items"] = summary["action_items"][:5]
return summary

View File

@@ -11,9 +11,11 @@ from src.mail.screens.LinkPanel import (
LinkItem,
LinkItem as LinkItemClass,
)
from src.mail.notification_compressor import create_compressor
from src.mail.notification_detector import NotificationType
import logging
from datetime import datetime
from typing import Literal, List, Dict
from typing import Literal, List, Dict, Optional
from urllib.parse import urlparse
import re
import os
@@ -173,10 +175,16 @@ class ContentContainer(ScrollableContainer):
self.current_folder: str | None = None
self.current_account: str | None = None
self.content_worker = None
self.current_envelope: Optional[Dict[str, Any]] = None
self.current_notification_type: Optional[NotificationType] = None
self.is_compressed_view: bool = False
# Load default view mode from config
# Load default view mode and notification compression from config
config = get_config()
self.current_mode = config.content_display.default_view_mode
self.compressor = create_compressor(
config.content_display.notification_compression_mode
)
def compose(self):
yield self.content
@@ -243,6 +251,7 @@ class ContentContainer(ScrollableContainer):
message_id: int,
folder: str | None = None,
account: str | None = None,
envelope: Optional[Dict[str, Any]] = None,
) -> None:
"""Display the content of a message."""
if not message_id:
@@ -251,6 +260,7 @@ class ContentContainer(ScrollableContainer):
self.current_message_id = message_id
self.current_folder = folder
self.current_account = account
self.current_envelope = envelope
# Immediately show a loading message
if self.current_mode == "markdown":
@@ -282,6 +292,18 @@ class ContentContainer(ScrollableContainer):
# Store the raw content for link extraction
self.current_content = content
# Apply notification compression if enabled
if self.compressor.mode != "off" and self.current_envelope:
compressed_content, notif_type = self.compressor.compress(
content, self.current_envelope
)
self.current_notification_type = notif_type
content = compressed_content
self.is_compressed_view = True
else:
self.current_notification_type = None
self.is_compressed_view = False
# Get URL compression settings from config
config = get_config()
compress_urls = config.content_display.compress_urls
@@ -291,7 +313,8 @@ class ContentContainer(ScrollableContainer):
if self.current_mode == "markdown":
# For markdown mode, use the Markdown widget
display_content = content
if compress_urls:
if compress_urls and not self.is_compressed_view:
# Don't compress URLs in notification summaries (they're already formatted)
display_content = compress_urls_in_content(content, max_url_len)
self.content.update(display_content)
else: