Implement mail search using Himalaya CLI with auto-select first result

This commit is contained in:
Bendt
2025-12-19 14:18:40 -05:00
parent d3468f7395
commit 0cd7cf6984
2 changed files with 109 additions and 28 deletions

View File

@@ -518,13 +518,7 @@ class EmailViewerApp(App):
config = get_config()
# Use filtered envelopes if search is active
if self.search_query:
display_envelopes = self.message_store.filter_by_query(self.search_query)
else:
display_envelopes = self.message_store.envelopes
for item in display_envelopes:
for item in self.message_store.envelopes:
if item and item.get("type") == "header":
# Use the new GroupHeader widget for date groupings
envelopes_list.append(ListItem(GroupHeader(label=item["label"])))
@@ -885,17 +879,14 @@ class EmailViewerApp(App):
self._update_list_view_subtitle()
def action_clear_selection(self) -> None:
"""Clear all selected messages and search filter."""
"""Clear all selected messages."""
if self.selected_messages:
self.selected_messages.clear()
self.refresh_list_view_items() # Refresh all items to uncheck checkboxes
self._update_list_view_subtitle()
elif self.search_query:
# Clear search if no selection
if self.search_query:
self.search_query = ""
self._populate_list_view()
self._update_list_view_subtitle()
self.show_status("Search cleared")
def action_oldest(self) -> None:
self.fetch_envelopes() if self.reload_needed else None
@@ -906,38 +897,77 @@ class EmailViewerApp(App):
self.show_message(self.message_store.get_newest_id())
def action_search(self) -> None:
"""Open search dialog to filter messages."""
"""Open search dialog to search messages via Himalaya."""
def handle_search_result(query: str | None) -> None:
if query is None:
return # User cancelled
if not query.strip():
# Empty query - clear search
self.search_query = ""
self._update_list_view_subtitle()
return
self.search_query = query
self._apply_search_filter()
self._perform_search(query)
self.push_screen(
SearchScreen(
title="Search Messages",
placeholder="Search by subject, sender, or recipient...",
placeholder="Search by sender, recipient, subject, or body...",
initial_value=self.search_query,
),
handle_search_result,
)
def _apply_search_filter(self) -> None:
"""Apply the current search filter to the envelope list."""
self._populate_list_view()
@work(exclusive=True)
async def _perform_search(self, query: str) -> None:
"""Perform search using Himalaya and select first result."""
self.show_status(f"Searching for '{query}'...")
# Update the title to show search status
if self.search_query:
self.query_one("#envelopes_list").border_subtitle = f"[{self.search_query}]"
folder = self.folder if self.folder else None
account = self.current_account if self.current_account else None
results, success = await himalaya_client.search_envelopes(
query, folder=folder, account=account
)
if not success:
self.show_status("Search failed", "error")
return
if not results:
self.show_status(f"No messages found matching '{query}'")
return
# Get the first result's ID
first_result = results[0]
result_id = int(first_result.get("id", 0))
if result_id == 0:
self.show_status("Search returned invalid result", "error")
return
# Find this ID in our current envelope list and select it
metadata = self.message_store.get_metadata(result_id)
if metadata:
# Message is in current view - select it
self.current_message_id = result_id
self.current_message_index = metadata["index"]
# Update list view selection
list_view = self.query_one("#envelopes_list", ListView)
list_view.index = metadata["index"]
self.show_status(f"Found {len(results)} message(s) - showing first match")
self.action_focus_4()
else:
self._update_list_view_subtitle()
# Focus the list and select first message
self.query_one("#envelopes_list").focus()
envelopes_list = self.query_one("#envelopes_list", ListView)
if envelopes_list.children:
envelopes_list.index = 0
# Message not in current view (maybe filtered or not loaded)
# Just open it directly
self.current_message_id = result_id
self.show_status(
f"Found {len(results)} message(s) - ID {result_id} (not in current list)"
)
self.action_focus_4()
def action_focus_1(self) -> None:
self.query_one("#envelopes_list").focus()

View File

@@ -312,6 +312,57 @@ async def mark_as_read(
return str(e), False
async def search_envelopes(
query: str,
folder: Optional[str] = None,
account: Optional[str] = None,
limit: int = 100,
) -> Tuple[List[Dict[str, Any]], bool]:
"""
Search for envelopes matching a query using Himalaya CLI.
The query is searched across from, to, subject, and body fields.
Args:
query: The search term to look for
folder: The folder to search in (defaults to INBOX)
account: The account to use (defaults to default account)
limit: Maximum number of results to return
Returns:
Tuple containing:
- List of matching envelope dictionaries
- Success status (True if operation was successful)
"""
try:
# Build a compound query to search from, to, subject, and body
# Himalaya query syntax: from <pattern> or to <pattern> or subject <pattern> or body <pattern>
search_query = f"from {query} or to {query} or subject {query} or body {query}"
cmd = f"himalaya envelope list -o json -s {limit} {search_query}"
if folder:
cmd += f" -f '{folder}'"
if account:
cmd += f" -a '{account}'"
process = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
envelopes = json.loads(stdout.decode())
return envelopes, True
else:
logging.error(f"Error searching envelopes: {stderr.decode()}")
return [], False
except Exception as e:
logging.error(f"Exception during envelope search: {e}")
return [], False
def sync_himalaya():
"""This command does not exist. Halucinated by AI."""
try: