from typing import Tuple, List, Dict, Any, Optional import asyncio import json import logging import subprocess from src.mail.config import get_config async def list_envelopes( folder: Optional[str] = None, account: Optional[str] = None, limit: int = 9999, ) -> Tuple[List[Dict[str, Any]], bool]: """ Retrieve a list of email envelopes using the Himalaya CLI. Args: folder: The folder to list envelopes from (defaults to INBOX) account: The account to use (defaults to default account) limit: Maximum number of envelopes to retrieve Returns: Tuple containing: - List of envelope dictionaries - Success status (True if operation was successful) """ try: cmd = f"himalaya envelope list -o json -s {limit}" if folder: cmd += f" -f '{folder}'" if account: cmd += f" -a '{account}'" process = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() if process.returncode == 0: envelopes = json.loads(stdout.decode()) return envelopes, True else: logging.error(f"Error listing envelopes: {stderr.decode()}") return [], False except Exception as e: logging.error(f"Exception during envelope listing: {e}") return [], False async def list_accounts() -> Tuple[List[Dict[str, Any]], bool]: """ Retrieve a list of accounts configured in Himalaya. Returns: Tuple containing: - List of account dictionaries - Success status (True if operation was successful) """ try: process = await asyncio.create_subprocess_shell( "himalaya account list -o json", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() if process.returncode == 0: accounts = json.loads(stdout.decode()) return accounts, True else: logging.error(f"Error listing accounts: {stderr.decode()}") return [], False except Exception as e: logging.error(f"Exception during account listing: {e}") return [], False async def list_folders( account: Optional[str] = None, ) -> Tuple[List[Dict[str, Any]], bool]: """ Retrieve a list of folders available in Himalaya. Args: account: The account to list folders for (defaults to default account) Returns: Tuple containing: - List of folder dictionaries - Success status (True if operation was successful) """ try: cmd = "himalaya folder list -o json" if account: cmd += f" -a '{account}'" process = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() if process.returncode == 0: folders = json.loads(stdout.decode()) return folders, True else: logging.error(f"Error listing folders: {stderr.decode()}") return [], False except Exception as e: logging.error(f"Exception during folder listing: {e}") return [], False async def delete_message( message_id: int, folder: Optional[str] = None, account: Optional[str] = None, ) -> Tuple[Optional[str], bool]: """ Delete a message by its ID. Args: message_id: The ID of the message to delete folder: The folder containing the message account: The account to use Returns: Tuple containing: - Result message or error - Success status (True if deletion was successful) """ try: cmd = f"himalaya message delete {message_id}" if folder: cmd += f" -f '{folder}'" if account: cmd += f" -a '{account}'" process = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() if process.returncode == 0: return stdout.decode().strip() or "Deleted successfully", True else: error_msg = stderr.decode().strip() logging.error(f"Error deleting message: {error_msg}") return error_msg or "Unknown error", False except Exception as e: logging.error(f"Exception during message deletion: {e}") return str(e), False # async def archive_message(message_id: int) -> [str, bool]: # """ # Archive a message by its ID. # Args: # message_id: The ID of the message to archive # Returns: # True if archiving was successful, False otherwise # """ # try: # process = await asyncio.create_subprocess_shell( # f"himalaya message move Archives {message_id}", # stdout=asyncio.subprocess.PIPE, # stderr=asyncio.subprocess.PIPE, # ) # stdout, stderr = await process.communicate() # return [stdout.decode(), process.returncode == 0] # except Exception as e: # logging.error(f"Exception during message archiving: {e}") # return False async def archive_messages( message_ids: List[str], folder: Optional[str] = None, account: Optional[str] = None, ) -> Tuple[Optional[str], bool]: """ Archive multiple messages by their IDs. Args: message_ids: A list of message IDs to archive. folder: The source folder containing the messages account: The account to use Returns: A tuple containing an optional output string and a boolean indicating success. """ try: config = get_config() archive_folder = config.mail.archive_folder ids_str = " ".join(message_ids) cmd = f"himalaya message move {archive_folder} {ids_str}" if folder: cmd += f" -f '{folder}'" if account: cmd += f" -a '{account}'" process = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() if process.returncode == 0: return stdout.decode().strip() or "Archived successfully", True else: error_msg = stderr.decode().strip() logging.error(f"Error archiving messages: {error_msg}") return error_msg or "Unknown error", False except Exception as e: logging.error(f"Exception during message archiving: {e}") return str(e), False async def get_message_content( message_id: int, folder: Optional[str] = None, account: Optional[str] = None, ) -> Tuple[Optional[str], bool]: """ Retrieve the content of a message by its ID. Args: message_id: The ID of the message to retrieve folder: The folder containing the message account: The account to use Returns: Tuple containing: - Message content (or None if retrieval failed) - Success status (True if operation was successful) """ try: cmd = f"himalaya message read {message_id}" if folder: cmd += f" -f '{folder}'" if account: cmd += f" -a '{account}'" process = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() if process.returncode == 0: content = stdout.decode() return content, True else: logging.error(f"Error retrieving message content: {stderr.decode()}") return None, False except Exception as e: logging.error(f"Exception during message content retrieval: {e}") return None, False async def mark_as_read( message_id: int, folder: Optional[str] = None, account: Optional[str] = None, ) -> Tuple[Optional[str], bool]: """ Mark a message as read by adding the 'seen' flag. Args: message_id: The ID of the message to mark as read folder: The folder containing the message account: The account to use Returns: Tuple containing: - Result message or error - Success status (True if operation was successful) """ try: cmd = f"himalaya flag add seen {message_id}" if folder: cmd += f" -f '{folder}'" if account: cmd += f" -a '{account}'" process = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() if process.returncode == 0: return stdout.decode().strip() or "Marked as read", True else: error_msg = stderr.decode().strip() logging.error(f"Error marking message as read: {error_msg}") return error_msg or "Unknown error", False except Exception as e: logging.error(f"Exception during marking message as read: {e}") return str(e), False def sync_himalaya(): """This command does not exist. Halucinated by AI.""" try: # subprocess.run(["himalaya", "sync"], check=True) print("Himalaya sync completed successfully.") except subprocess.CalledProcessError as e: print(f"Error during Himalaya sync: {e}")