Add IPC listener to calendar and tasks apps for sync daemon refresh notifications

This commit is contained in:
Bendt
2025-12-19 15:57:46 -05:00
parent f5ad43323c
commit ab55d0836e
2 changed files with 35 additions and 0 deletions

View File

@@ -22,6 +22,7 @@ from src.calendar.widgets.MonthCalendar import MonthCalendar
from src.calendar.widgets.InvitesPanel import InvitesPanel, CalendarInvite
from src.calendar.widgets.AddEventForm import EventFormData
from src.utils.shared_config import get_theme_name
from src.utils.ipc import IPCListener, IPCMessage
# Add the parent directory to the system path to resolve relative imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@@ -171,6 +172,10 @@ class CalendarApp(App):
"""Initialize the app on mount."""
self.theme = get_theme_name()
# Start IPC listener for refresh notifications from sync daemon
self._ipc_listener = IPCListener("calendar", self._on_ipc_message)
self._ipc_listener.start()
# Load events for current week
self.load_events()
@@ -184,6 +189,12 @@ class CalendarApp(App):
self._update_status()
self._update_title()
def _on_ipc_message(self, message: IPCMessage) -> None:
"""Handle IPC messages from sync daemon."""
if message.event == "refresh":
# Schedule a reload on the main thread
self.call_from_thread(self.load_events)
async def _load_invites_async(self) -> None:
"""Load pending calendar invites from Microsoft Graph."""
try:
@@ -528,6 +539,12 @@ Keybindings:
"""
self.notify(help_text.strip(), timeout=10)
async def action_quit(self) -> None:
"""Quit the app and clean up IPC listener."""
if hasattr(self, "_ipc_listener"):
self._ipc_listener.stop()
self.exit()
def action_focus_invites(self) -> None:
"""Focus on the invites panel and show invite count."""
if not self.show_sidebar:

View File

@@ -18,6 +18,7 @@ from .config import get_config, TasksAppConfig
from .backend import Task, TaskBackend, TaskPriority, TaskStatus, Project
from .widgets.FilterSidebar import FilterSidebar
from src.utils.shared_config import get_theme_name
from src.utils.ipc import IPCListener, IPCMessage
# Add the parent directory to the system path to resolve relative imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@@ -246,6 +247,11 @@ class TasksApp(App):
def on_mount(self) -> None:
"""Initialize the app on mount."""
self.theme = get_theme_name()
# Start IPC listener for refresh notifications from sync daemon
self._ipc_listener = IPCListener("tasks", self._on_ipc_message)
self._ipc_listener.start()
table = self.query_one("#task-table", DataTable)
# Setup columns based on config with dynamic widths
@@ -897,6 +903,18 @@ Keybindings:
if task:
self._update_detail_display(task)
def _on_ipc_message(self, message: IPCMessage) -> None:
"""Handle IPC messages from sync daemon."""
if message.event == "refresh":
# Schedule a reload on the main thread
self.call_from_thread(self.load_tasks)
async def action_quit(self) -> None:
"""Quit the app and clean up IPC listener."""
if hasattr(self, "_ipc_listener"):
self._ipc_listener.stop()
self.exit()
def run_app(backend: Optional[TaskBackend] = None) -> None:
"""Run the Tasks TUI application."""