From a8950c42e23d4c4eaa1e229f8c0ef0fcab535df8 Mon Sep 17 00:00:00 2001 From: Hunter Jozwiak Date: Mon, 6 Apr 2026 21:09:41 -0400 Subject: [PATCH] Initial support for mako notification daemon. --- src/cthulhu/cthulhu.py | 9 + src/cthulhu/guilabels.py | 16 + src/cthulhu/mako_notification_monitor.py | 419 ++++++++++++++ src/cthulhu/meson.build | 1 + src/cthulhu/messages.py | 15 + src/cthulhu/notification_presenter.py | 541 +++++++++++++++--- tests/conftest.py | 65 +++ tests/test_mako_notification_monitor.py | 160 ++++++ ...notification_presenter_mako_regressions.py | 124 ++++ 9 files changed, 1265 insertions(+), 85 deletions(-) create mode 100644 src/cthulhu/mako_notification_monitor.py create mode 100644 tests/conftest.py create mode 100644 tests/test_mako_notification_monitor.py create mode 100644 tests/test_notification_presenter_mako_regressions.py diff --git a/src/cthulhu/cthulhu.py b/src/cthulhu/cthulhu.py index d9f9ceb..711803a 100644 --- a/src/cthulhu/cthulhu.py +++ b/src/cthulhu/cthulhu.py @@ -261,6 +261,7 @@ from . import event_manager from . import keybindings from . import learn_mode_presenter from . import logger +from . import mako_notification_monitor from . import messages from . import notification_presenter from . import focus_manager @@ -719,6 +720,7 @@ def start() -> None: debug.printMessage(debug.LEVEL_INFO, 'CTHULHU: Starting Atspi main event loop', True) # Start D-Bus remote controller service after ATSPI is ready + cthulhuApp.getMakoNotificationMonitor().start() GObject.idle_add(_start_dbus_service) Atspi.event_main() @@ -771,6 +773,7 @@ def shutdown(script: Optional[Any] = None, inputEvent: Optional[Any] = None) -> cthulhuApp.getSignalManager().emitSignal('stop-application-completed') sound_theme_manager.getManager().playStopSound(wait=True, timeoutSeconds=1) cthulhuApp.getPluginSystemManager().unloadAllPlugins(ForceAllPlugins=True) + cthulhuApp.getMakoNotificationMonitor().stop() # Deactivate the event manager first so that it clears its queue and will not # accept new events. Then let the script manager unregister script event listeners. @@ -958,6 +961,8 @@ class Cthulhu(GObject.Object): self.translationManager: TranslationManager = translation_manager.TranslationManager(self) self.debugManager: Any = debug self.APIHelper: APIHelper = APIHelper(self) + self.makoNotificationMonitor: mako_notification_monitor.MakoNotificationMonitor = \ + mako_notification_monitor.MakoNotificationMonitor(self) self.createCompatAPI() self.pluginSystemManager: PluginSystemManager = plugin_system_manager.PluginSystemManager(self) # Scan for available plugins at startup @@ -969,6 +974,9 @@ class Cthulhu(GObject.Object): def getPluginSystemManager(self) -> PluginSystemManager: return self.pluginSystemManager + def getMakoNotificationMonitor(self) -> mako_notification_monitor.MakoNotificationMonitor: + return self.makoNotificationMonitor + def getDynamicApiManager(self) -> DynamicApiManager: return self.dynamicApiManager @@ -1027,6 +1035,7 @@ class Cthulhu(GObject.Object): self.getDynamicApiManager().registerAPI('Messages', messages) self.getDynamicApiManager().registerAPI('Cmdnames', cmdnames) self.getDynamicApiManager().registerAPI('NotificationPresenter', notification_presenter) + self.getDynamicApiManager().registerAPI('MakoNotificationMonitor', self.makoNotificationMonitor) self.getDynamicApiManager().registerAPI('CthulhuState', cthulhu_state) self.getDynamicApiManager().registerAPI('CthulhuPlatform', cthulhu_platform) self.getDynamicApiManager().registerAPI('Settings', settings) diff --git a/src/cthulhu/guilabels.py b/src/cthulhu/guilabels.py index 0cb9f1c..5946025 100644 --- a/src/cthulhu/guilabels.py +++ b/src/cthulhu/guilabels.py @@ -376,6 +376,22 @@ NOTIFICATIONS_COLUMN_HEADER = C_("notification presenter", "Notifications") # for the time, which will be relative (e.g. "10 minutes ago") or absolute. NOTIFICATIONS_RECEIVED_TIME = C_("notification presenter", "Received") +# Translators: This is the label for a button which dismisses the selected live +# notification from mako's queue. +NOTIFICATIONS_DISMISS_BUTTON = C_("notification presenter", "Dismiss Notification") + +# Translators: This is the label for a button which opens the list of actions +# for the selected live notification. +NOTIFICATIONS_ACTIONS_BUTTON = C_("notification presenter", "Notification Actions") + +# Translators: This is the title for the dialog listing the actions associated +# with the selected live notification. +NOTIFICATIONS_ACTIONS_TITLE = C_("notification presenter", "Notification Actions") + +# Translators: This is the label for a button which invokes the selected action +# from the notification actions dialog. +NOTIFICATIONS_INVOKE_ACTION_BUTTON = C_("notification presenter", "Invoke Action") + # Translators: This string is a label for the group of Cthulhu commands which # are associated with presenting notifications. KB_GROUP_NOTIFICATIONS = _("Notification presenter") diff --git a/src/cthulhu/mako_notification_monitor.py b/src/cthulhu/mako_notification_monitor.py new file mode 100644 index 0000000..066b102 --- /dev/null +++ b/src/cthulhu/mako_notification_monitor.py @@ -0,0 +1,419 @@ +#!/usr/bin/env python3 +# +# Copyright (c) 2026 Stormux +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the +# Free Software Foundation, Inc., Franklin Street, Fifth Floor, +# Boston MA 02110-1301 USA. +# + +"""Core support for speaking mako notifications via D-Bus.""" + +from __future__ import annotations + +import html +import re +from typing import Any, Optional + +from gi.repository import Gio, GLib + +from . import cthulhu_state +from . import debug +from . import messages +from . import notification_presenter +from . import settings + + +class MakoNotificationMonitor: + """Tracks mako's live D-Bus queue and presents new notifications.""" + + BUS_NAME = "org.freedesktop.Notifications" + OBJECT_PATH = "/fr/emersion/Mako" + INTERFACE_NAME = "fr.emersion.Mako" + SOURCE_NAME = "mako" + + def __init__(self, app) -> None: + self._app = app + self._presenter = None + self._connection: Optional[Gio.DBusConnection] = None + self._watch_id: int = 0 + self._properties_subscription_id: int = 0 + self._known_ids: set[int] = set() + self._is_running: bool = False + self._is_available: bool = False + self._generation: int = 0 + self._tag_pattern = re.compile(r"<[^>]+>") + self._whitespace_pattern = re.compile(r"\s+") + + def start(self) -> None: + """Starts watching the session bus for mako.""" + + if self._is_running: + return + + try: + self._connection = Gio.bus_get_sync(Gio.BusType.SESSION, None) + except Exception as error: + msg = f"MAKO MONITOR: Failed to connect to session bus: {error}" + debug.printMessage(debug.LEVEL_INFO, msg, True) + return + + self._watch_id = Gio.bus_watch_name_on_connection( + self._connection, + self.BUS_NAME, + Gio.BusNameWatcherFlags.NONE, + self._on_name_appeared, + self._on_name_vanished, + ) + self._is_running = True + debug.printMessage(debug.LEVEL_INFO, "MAKO MONITOR: Started", True) + + def stop(self, reset_live_state: bool = True) -> None: + """Stops watching the session bus for mako.""" + + if self._properties_subscription_id and self._connection is not None: + self._connection.signal_unsubscribe(self._properties_subscription_id) + self._properties_subscription_id = 0 + + if self._watch_id: + Gio.bus_unwatch_name(self._watch_id) + self._watch_id = 0 + + if reset_live_state: + self._get_presenter().mark_source_unavailable(self.SOURCE_NAME) + + self._known_ids.clear() + self._is_available = False + self._is_running = False + self._connection = None + debug.printMessage(debug.LEVEL_INFO, "MAKO MONITOR: Stopped", True) + + def refresh(self, announce_new: bool = False) -> bool: + """Refreshes mako's live queue and optionally speaks new notifications.""" + + if self._connection is None or not self._is_available: + return False + + live_notifications = self._fetch_live_notifications() + if live_notifications is None: + return False + + self._sync_notifications(live_notifications, announce_new) + return True + + def dismiss_notification(self, notification_id: int) -> bool: + """Dismisses a live mako notification by ID.""" + + if self._connection is None or not self._is_available: + return False + + try: + options = { + "id": GLib.Variant("u", notification_id), + "group": GLib.Variant("b", False), + "history": GLib.Variant("b", True), + "all": GLib.Variant("b", False), + } + self._connection.call_sync( + self.BUS_NAME, + self.OBJECT_PATH, + self.INTERFACE_NAME, + "DismissNotifications", + GLib.Variant("(a{sv})", (options,)), + None, + Gio.DBusCallFlags.NONE, + -1, + None, + ) + except Exception as error: + msg = f"MAKO MONITOR: Failed to dismiss notification {notification_id}: {error}" + debug.printMessage(debug.LEVEL_INFO, msg, True) + return False + + self.refresh(announce_new=False) + return True + + def invoke_action(self, notification_id: int, action_key: str) -> bool: + """Invokes a live mako notification action.""" + + if self._connection is None or not self._is_available: + return False + + try: + self._connection.call_sync( + self.BUS_NAME, + self.OBJECT_PATH, + self.INTERFACE_NAME, + "InvokeAction", + GLib.Variant("(us)", (notification_id, action_key)), + None, + Gio.DBusCallFlags.NONE, + -1, + None, + ) + except Exception as error: + msg = ( + f"MAKO MONITOR: Failed to invoke action {action_key} " + f"for notification {notification_id}: {error}" + ) + debug.printMessage(debug.LEVEL_INFO, msg, True) + return False + + self.refresh(announce_new=False) + return True + + def is_current_entry(self, entry) -> bool: + """Returns True if entry belongs to the current mako session.""" + + return ( + self._is_available + and entry.source == self.SOURCE_NAME + and entry.source_generation == self._generation + ) + + def get_generation(self) -> int: + """Returns the current mako session generation.""" + + return self._generation + + def _get_presenter(self): + presenter = self._presenter or notification_presenter.getPresenter() + if presenter is not self._presenter: + self._presenter = presenter + + if getattr(presenter, "_mako_monitor", None) is not self: + presenter.set_mako_monitor(self) + + return presenter + + def _on_name_appeared( + self, + connection: Gio.DBusConnection, + name: str, + name_owner: str, + user_data: Optional[Any] = None, + ) -> None: + msg = f"MAKO MONITOR: {name} appeared as {name_owner}" + debug.printMessage(debug.LEVEL_INFO, msg, True) + + self._connection = connection + self._subscribe_to_properties() + + live_notifications = self._fetch_live_notifications() + if live_notifications is None: + self._unsubscribe_from_properties() + self._get_presenter().mark_source_unavailable(self.SOURCE_NAME) + self._known_ids.clear() + self._is_available = False + debug.printMessage( + debug.LEVEL_INFO, + "MAKO MONITOR: Notification daemon is not exposing the mako interface", + True, + ) + return + + self._generation += 1 + self._is_available = True + self._known_ids.clear() + self._sync_notifications(live_notifications, announce_new=False) + + def _on_name_vanished( + self, + connection: Gio.DBusConnection, + name: str, + user_data: Optional[Any] = None, + ) -> None: + msg = f"MAKO MONITOR: {name} vanished" + debug.printMessage(debug.LEVEL_INFO, msg, True) + self._unsubscribe_from_properties() + self._get_presenter().mark_source_unavailable(self.SOURCE_NAME) + self._known_ids.clear() + self._is_available = False + + def _subscribe_to_properties(self) -> None: + if self._connection is None or self._properties_subscription_id: + return + + self._properties_subscription_id = self._connection.signal_subscribe( + self.BUS_NAME, + "org.freedesktop.DBus.Properties", + "PropertiesChanged", + self.OBJECT_PATH, + self.INTERFACE_NAME, + Gio.DBusSignalFlags.NONE, + self._on_properties_changed, + ) + + def _unsubscribe_from_properties(self) -> None: + if self._connection is None or not self._properties_subscription_id: + return + + self._connection.signal_unsubscribe(self._properties_subscription_id) + self._properties_subscription_id = 0 + + def _on_properties_changed( + self, + connection: Gio.DBusConnection, + sender_name: Optional[str], + object_path: str, + interface_name: str, + signal_name: str, + parameters: GLib.Variant, + user_data: Optional[Any] = None, + ) -> None: + del connection, sender_name, object_path, interface_name, signal_name, parameters, user_data + self.refresh(announce_new=True) + + def _fetch_live_notifications(self) -> Optional[list[dict[str, Any]]]: + if self._connection is None: + return None + + try: + reply = self._connection.call_sync( + self.BUS_NAME, + self.OBJECT_PATH, + self.INTERFACE_NAME, + "ListNotifications", + None, + None, + Gio.DBusCallFlags.NONE, + -1, + None, + ) + except Exception as error: + msg = f"MAKO MONITOR: Failed to list notifications: {error}" + debug.printMessage(debug.LEVEL_INFO, msg, True) + return None + + try: + unpacked = reply.unpack() + except Exception as error: + msg = f"MAKO MONITOR: Failed to unpack notification list: {error}" + debug.printMessage(debug.LEVEL_INFO, msg, True) + return None + + if not unpacked: + return [] + + return list(unpacked[0]) + + def _sync_notifications( + self, + live_notifications: list[dict[str, Any]], + announce_new: bool, + ) -> None: + live_by_id: dict[int, dict[str, Any]] = {} + ordered_ids: list[int] = [] + + for notification in live_notifications: + parsed = self._parse_notification(notification) + notification_id = parsed.get("notification_id") + if notification_id is None: + continue + live_by_id[notification_id] = parsed + ordered_ids.append(notification_id) + + self._get_presenter().sync_live_notifications( + self.SOURCE_NAME, + live_by_id, + self._generation, + ) + + if announce_new: + for notification_id in ordered_ids: + if notification_id in self._known_ids: + continue + self._announce_notification(live_by_id[notification_id]) + + self._known_ids = set(live_by_id) + + def _announce_notification(self, notification: dict[str, Any]) -> None: + message = notification.get("message") or "" + if not message: + return + + self._get_presenter().save_notification( + message, + source=self.SOURCE_NAME, + source_generation=self._generation, + notification_id=notification.get("notification_id"), + live=True, + actions=notification.get("actions"), + app_name=notification.get("app_name", ""), + summary=notification.get("summary", ""), + body=notification.get("body", ""), + urgency=notification.get("urgency", -1), + desktop_entry=notification.get("desktop_entry", ""), + ) + + script = cthulhu_state.activeScript or self._app.getScriptManager().get_default_script() + if script is None: + return + + voice = script.speechGenerator.voice(string=message) + script.speakMessage(message, voice=voice) + script.displayBrailleMessage(message, flashTime=settings.brailleFlashTime) + + def _parse_notification(self, notification: dict[str, Any]) -> dict[str, Any]: + actions = notification.get("actions") or {} + if not isinstance(actions, dict): + actions = {} + + app_name = self._normalize_text(notification.get("app-name", "")) + summary = self._normalize_text(notification.get("summary", "")) + body = self._normalize_text(notification.get("body", "")) + desktop_entry = self._normalize_text(notification.get("desktop-entry", "")) + urgency = notification.get("urgency", -1) + notification_id = notification.get("id") + + return { + "notification_id": notification_id if isinstance(notification_id, int) else None, + "actions": { + str(key): self._normalize_text(value) or str(key) + for key, value in actions.items() + }, + "app_name": app_name, + "summary": summary, + "body": body, + "urgency": urgency if isinstance(urgency, int) else -1, + "desktop_entry": desktop_entry, + "message": self._build_message(app_name, summary, body, desktop_entry), + } + + def _build_message( + self, + app_name: str, + summary: str, + body: str, + desktop_entry: str, + ) -> str: + parts = [messages.NOTIFICATION] + if summary: + parts.append(summary) + if body and body != summary: + parts.append(body) + if len(parts) == 1: + fallback = app_name or desktop_entry + if fallback: + parts.append(fallback) + return " ".join(part for part in parts if part).strip() + + def _normalize_text(self, text: Any) -> str: + if not isinstance(text, str): + return "" + + text = html.unescape(text) + text = self._tag_pattern.sub(" ", text) + text = self._whitespace_pattern.sub(" ", text) + return text.strip() diff --git a/src/cthulhu/meson.build b/src/cthulhu/meson.build index 3b9456a..e9adcfe 100644 --- a/src/cthulhu/meson.build +++ b/src/cthulhu/meson.build @@ -60,6 +60,7 @@ cthulhu_python_sources = files([ 'learn_mode_presenter.py', 'liveregions.py', 'logger.py', + 'mako_notification_monitor.py', 'mathsymbols.py', 'messages.py', 'mouse_review.py', diff --git a/src/cthulhu/messages.py b/src/cthulhu/messages.py index 1454c5f..8493477 100644 --- a/src/cthulhu/messages.py +++ b/src/cthulhu/messages.py @@ -2011,6 +2011,21 @@ NOTIFICATION_LIST_BOTTOM = C_("notification", "Bottom") # list of notifications is reached. NOTIFICATION_LIST_TOP = C_("notification", "Top") +# Translators: This message is presented when a live notification can no longer +# be controlled because it has already been closed or replaced. +NOTIFICATION_UNAVAILABLE = _("Notification is no longer available") + +# Translators: This message is presented when the selected live notification +# does not expose any actions. +NOTIFICATION_NO_ACTIONS = _("No notification actions available") + +# Translators: This message is presented after dismissing a live notification. +NOTIFICATION_DISMISSED = _("Notification dismissed") + +# Translators: This message is presented after invoking an action on a live +# notification. +NOTIFICATION_ACTION_INVOKED = _("Notification action invoked") + # Translators: This message is presented to the user when the notifications list # is empty. NOTIFICATION_NO_MESSAGES = _("No notification messages") diff --git a/src/cthulhu/notification_presenter.py b/src/cthulhu/notification_presenter.py index 0228511..59f74ac 100644 --- a/src/cthulhu/notification_presenter.py +++ b/src/cthulhu/notification_presenter.py @@ -23,24 +23,18 @@ # Forked from Orca screen reader. # Cthulhu project: https://git.stormux.org/storm/cthulhu -"""Module for notification messages""" - -__id__ = "$Id$" -__version__ = "$Revision$" -__date__ = "$Date$" -__copyright__ = "Copyright (c) 2023 Igalia, S.L." \ - "Copyright (c) 2010 Informal Informatica LTDA." -__license__ = "LGPL" +"""Module for notification messages.""" +from dataclasses import dataclass, field import time from typing import Optional, Dict, List, Any, Callable import gi -gi.require_version('Gtk', '3.0') + +gi.require_version("Gtk", "3.0") from gi.repository import GObject from gi.repository import Gtk from gi.repository import Gdk -from gi.repository import Gdk from . import cmdnames from . import debug @@ -48,13 +42,32 @@ from . import guilabels from . import input_event from . import keybindings from . import messages -from . import cthulhu_state + + +@dataclass +class NotificationEntry: + """Represents a notification saved in Cthulhu's history.""" + + message: str + timestamp: float + source: str = "generic" + source_generation: int = 0 + notification_id: Optional[int] = None + live: bool = False + actions: Dict[str, str] = field(default_factory=dict) + app_name: str = "" + summary: str = "" + body: str = "" + urgency: int = -1 + desktop_entry: str = "" + class NotificationPresenter: """Provides access to the notification history.""" def __init__(self) -> None: - self._gui: Optional[Any] = None # Optional[Gtk.Window] + self._gui: Optional[Any] = None + self._mako_monitor: Optional[Any] = None self._handlers: Dict[str, Callable] = self._setup_handlers() self._bindings: keybindings.KeyBindings = self._setup_bindings() self._max_size: int = 55 @@ -63,7 +76,7 @@ class NotificationPresenter: # the list. The current index is relative to, and used directly, with the # python list, i.e. self._notifications[-3] would return the third-to-last # notification message. - self._notifications: List[List[Any]] = [] # List of [message: str, time: float] + self._notifications: List[NotificationEntry] = [] self._current_index: int = -1 def get_bindings(self) -> keybindings.KeyBindings: @@ -76,14 +89,47 @@ class NotificationPresenter: return self._handlers - def save_notification(self, message: str) -> None: + def set_mako_monitor(self, monitor: Any) -> None: + """Associates the mako D-Bus monitor with this presenter.""" + + self._mako_monitor = monitor + + def save_notification( + self, + message: str, + source: str = "generic", + source_generation: int = 0, + notification_id: Optional[int] = None, + live: bool = False, + actions: Optional[Dict[str, str]] = None, + app_name: str = "", + summary: str = "", + body: str = "", + urgency: int = -1, + desktop_entry: str = "", + ) -> NotificationEntry: """Adds message to the list of notification messages.""" tokens = ["NOTIFICATION PRESENTER: Adding '", message, "'."] debug.printTokens(debug.LEVEL_INFO, tokens, True) to_remove = max(len(self._notifications) - self._max_size + 1, 0) self._notifications = self._notifications[to_remove:] - self._notifications.append([message, time.time()]) + entry = NotificationEntry( + message=message, + timestamp=time.time(), + source=source, + source_generation=source_generation, + notification_id=notification_id, + live=live, + actions=dict(actions or {}), + app_name=app_name, + summary=summary, + body=body, + urgency=urgency, + desktop_entry=desktop_entry, + ) + self._notifications.append(entry) + return entry def clear_list(self) -> None: """Clears the notifications list.""" @@ -93,30 +139,117 @@ class NotificationPresenter: self._notifications = [] self._current_index = -1 + def refresh_live_notifications(self) -> bool: + """Refreshes live mako state without announcing new notifications.""" + + if self._mako_monitor is None: + return False + + return self._mako_monitor.refresh(announce_new=False) + + def sync_live_notifications( + self, + source: str, + live_notifications: Dict[int, Dict[str, Any]], + source_generation: int, + ) -> None: + """Synchronizes current live notification metadata with history.""" + + live_ids = set(live_notifications) + for entry in self._notifications: + if entry.source != source or entry.source_generation != source_generation: + continue + + if entry.notification_id in live_ids: + metadata = live_notifications[entry.notification_id] + entry.live = True + entry.message = metadata.get("message", entry.message) + entry.actions = dict(metadata.get("actions") or {}) + entry.app_name = metadata.get("app_name", entry.app_name) + entry.summary = metadata.get("summary", entry.summary) + entry.body = metadata.get("body", entry.body) + entry.urgency = metadata.get("urgency", entry.urgency) + entry.desktop_entry = metadata.get("desktop_entry", entry.desktop_entry) + elif entry.live: + entry.live = False + + def mark_source_unavailable(self, source: str) -> None: + """Marks all live notifications from source as no longer live.""" + + for entry in self._notifications: + if entry.source == source and entry.live: + entry.live = False + + def can_control_entry(self, entry: Optional[NotificationEntry]) -> bool: + """Returns True if entry can still be controlled through mako.""" + + if entry is None or not entry.live or entry.notification_id is None: + return False + + if self._mako_monitor is None: + return False + + return self._mako_monitor.is_current_entry(entry) + + def get_actions_for_entry(self, entry: Optional[NotificationEntry]) -> Dict[str, str]: + """Returns the live action map for entry, if it is controllable.""" + + if not self.can_control_entry(entry): + return {} + + return dict(entry.actions) + + def dismiss_entry(self, script: Any, entry: Optional[NotificationEntry]) -> bool: + """Dismisses the live notification represented by entry.""" + + if not self.can_control_entry(entry) or self._mako_monitor is None: + return False + + result = self._mako_monitor.dismiss_notification(entry.notification_id) + if result: + script.presentMessage(messages.NOTIFICATION_DISMISSED) + return result + + def invoke_action_for_entry( + self, + script: Any, + entry: Optional[NotificationEntry], + action_key: str, + ) -> bool: + """Invokes action_key for the live notification represented by entry.""" + + if not self.can_control_entry(entry) or self._mako_monitor is None: + return False + + result = self._mako_monitor.invoke_action(entry.notification_id, action_key) + if result: + script.presentMessage(messages.NOTIFICATION_ACTION_INVOKED) + return result + def _setup_handlers(self) -> Dict[str, Callable]: """Sets up and returns the notification-presenter input event handlers.""" handlers = {} - handlers["present_last_notification"] = \ - input_event.InputEventHandler( - self._present_last_notification, - cmdnames.NOTIFICATION_MESSAGES_LAST) + handlers["present_last_notification"] = input_event.InputEventHandler( + self._present_last_notification, + cmdnames.NOTIFICATION_MESSAGES_LAST, + ) - handlers["present_next_notification"] = \ - input_event.InputEventHandler( - self._present_next_notification, - cmdnames.NOTIFICATION_MESSAGES_NEXT) + handlers["present_next_notification"] = input_event.InputEventHandler( + self._present_next_notification, + cmdnames.NOTIFICATION_MESSAGES_NEXT, + ) - handlers["present_previous_notification"] = \ - input_event.InputEventHandler( - self._present_previous_notification, - cmdnames.NOTIFICATION_MESSAGES_PREVIOUS) + handlers["present_previous_notification"] = input_event.InputEventHandler( + self._present_previous_notification, + cmdnames.NOTIFICATION_MESSAGES_PREVIOUS, + ) - handlers["show_notification_list"] = \ - input_event.InputEventHandler( - self._show_notification_list, - cmdnames.NOTIFICATION_MESSAGES_LIST) + handlers["show_notification_list"] = input_event.InputEventHandler( + self._show_notification_list, + cmdnames.NOTIFICATION_MESSAGES_LIST, + ) return handlers @@ -130,28 +263,36 @@ class NotificationPresenter: "n", keybindings.defaultModifierMask, keybindings.CTHULHU_MODIFIER_MASK, - self._handlers.get("present_last_notification"))) + self._handlers.get("present_last_notification"), + ) + ) bindings.add( keybindings.KeyBinding( "", keybindings.defaultModifierMask, keybindings.NO_MODIFIER_MASK, - self._handlers.get("present_next_notification"))) + self._handlers.get("present_next_notification"), + ) + ) bindings.add( keybindings.KeyBinding( "", keybindings.defaultModifierMask, keybindings.NO_MODIFIER_MASK, - self._handlers.get("present_previous_notification"))) + self._handlers.get("present_previous_notification"), + ) + ) bindings.add( keybindings.KeyBinding( "n", keybindings.defaultModifierMask, keybindings.CTHULHU_CTRL_MODIFIER_MASK, - self._handlers.get("show_notification_list"))) + self._handlers.get("show_notification_list"), + ) + ) return bindings @@ -171,6 +312,9 @@ class NotificationPresenter: days = round(diff / 86400) return messages.daysAgo(days) + def _entry_to_string(self, entry: NotificationEntry) -> str: + return f"{entry.message} {self._timestamp_to_string(entry.timestamp)}" + def _present_last_notification(self, script, event=None): """Presents the last notification.""" @@ -181,9 +325,8 @@ class NotificationPresenter: msg = "NOTIFICATION PRESENTER: Presenting last notification." debug.printMessage(debug.LEVEL_INFO, msg, True) - message, timestamp = self._notifications[-1] - string = f"{message} {self._timestamp_to_string(timestamp)}" - script.presentMessage(string) + entry = self._notifications[-1] + script.presentMessage(self._entry_to_string(entry)) self._current_index = -1 return True @@ -200,23 +343,21 @@ class NotificationPresenter: ) debug.printMessage(debug.LEVEL_INFO, msg, True) - # This is the first (oldest) message in the list. - if self._current_index == 0 : + if self._current_index == 0: script.presentMessage(messages.NOTIFICATION_LIST_TOP) - message, timestamp = self._notifications[self._current_index] + entry = self._notifications[self._current_index] else: try: index = self._current_index - 1 - message, timestamp = self._notifications[index] + entry = self._notifications[index] self._current_index -= 1 except IndexError: msg = "NOTIFICATION PRESENTER: Handling IndexError exception." debug.printMessage(debug.LEVEL_INFO, msg, True) script.presentMessage(messages.NOTIFICATION_LIST_TOP) - message, timestamp = self._notifications[self._current_index] + entry = self._notifications[self._current_index] - string = f"{message} {self._timestamp_to_string(timestamp)}" - script.presentMessage(string) + script.presentMessage(self._entry_to_string(entry)) return True def _present_next_notification(self, script, event=None): @@ -232,28 +373,27 @@ class NotificationPresenter: ) debug.printMessage(debug.LEVEL_INFO, msg, True) - # This is the last (newest) message in the list. if self._current_index == -1: script.presentMessage(messages.NOTIFICATION_LIST_BOTTOM) - message, timestamp = self._notifications[self._current_index] + entry = self._notifications[self._current_index] else: try: index = self._current_index + 1 - message, timestamp = self._notifications[index] + entry = self._notifications[index] self._current_index += 1 except IndexError: msg = "NOTIFICATION PRESENTER: Handling IndexError exception." debug.printMessage(debug.LEVEL_INFO, msg, True) script.presentMessage(messages.NOTIFICATION_LIST_BOTTOM) - message, timestamp = self._notifications[self._current_index] + entry = self._notifications[self._current_index] - string = f"{message} {self._timestamp_to_string(timestamp)}" - script.presentMessage(string) + script.presentMessage(self._entry_to_string(entry)) return True def _show_notification_list(self, script, event=None): """Opens a dialog with a list of the notifications.""" + self.refresh_live_notifications() if not self._notifications: script.presentMessage(messages.NOTIFICATION_NO_MESSAGES) return True @@ -261,39 +401,54 @@ class NotificationPresenter: msg = "NOTIFICATION PRESENTER: Showing notification list." debug.printMessage(debug.LEVEL_INFO, msg, True) - rows = [(message, self._timestamp_to_string(timestamp)) \ - for message, timestamp in reversed(self._notifications)] + entries = list(reversed(self._notifications)) title = guilabels.notifications_count(len(self._notifications)) - column_headers = [guilabels.NOTIFICATIONS_COLUMN_HEADER, - guilabels.NOTIFICATIONS_RECEIVED_TIME] - self._gui = NotificationListGUI(script, title, column_headers, rows) + column_headers = [ + guilabels.NOTIFICATIONS_COLUMN_HEADER, + guilabels.NOTIFICATIONS_RECEIVED_TIME, + ] + self._gui = NotificationListGUI(self, script, title, column_headers, entries) self._gui.show_gui() return True def on_dialog_destroyed(self): - """Handler for the 'destroyed' signal of the dialog.""" + """Handler for the dialog being destroyed.""" self._gui = None + class NotificationListGUI: """The dialog containing the notifications list.""" RESPONSE_COPY = 1 + RESPONSE_ACTIONS = 2 + RESPONSE_DISMISS = 3 - def __init__(self, script, title, column_headers, rows): + def __init__(self, presenter, script, title, column_headers, entries): + self._presenter = presenter self._script = script self._model = None self._tree = None - self._gui = self._create_dialog(title, column_headers, rows) + self._selection = None + self._dismiss_button = None + self._actions_button = None + self._gui = self._create_dialog(title, column_headers, entries) - def _create_dialog(self, title, column_headers, rows): - dialog = Gtk.Dialog(title, - None, - Gtk.DialogFlags.MODAL, - (Gtk.STOCK_COPY, self.RESPONSE_COPY, - Gtk.STOCK_CLEAR, Gtk.ResponseType.APPLY, - Gtk.STOCK_CLOSE, Gtk.ResponseType.CLOSE)) + def _create_dialog(self, title, column_headers, entries): + dialog = Gtk.Dialog(title, None, Gtk.DialogFlags.MODAL) dialog.set_default_size(600, 400) + dialog.add_button(Gtk.STOCK_COPY, self.RESPONSE_COPY) + self._actions_button = dialog.add_button( + guilabels.NOTIFICATIONS_ACTIONS_BUTTON, + self.RESPONSE_ACTIONS, + ) + self._dismiss_button = dialog.add_button( + guilabels.NOTIFICATIONS_DISMISS_BUTTON, + self.RESPONSE_DISMISS, + ) + dialog.add_button(Gtk.STOCK_CLEAR, Gtk.ResponseType.APPLY) + dialog.add_button(Gtk.STOCK_CLOSE, Gtk.ResponseType.CLOSE) + dialog.set_default_response(Gtk.ResponseType.CLOSE) grid = Gtk.Grid() content_area = dialog.get_content_area() @@ -305,9 +460,12 @@ class NotificationListGUI: tree = Gtk.TreeView() tree.set_hexpand(True) tree.set_vexpand(True) + tree_accessible = tree.get_accessible() + if tree_accessible: + tree_accessible.set_name(title) scrolled_window.add(tree) - cols = len(column_headers) * [GObject.TYPE_STRING] + cols = (GObject.TYPE_STRING, GObject.TYPE_STRING, GObject.TYPE_PYOBJECT) for i, header in enumerate(column_headers): cell = Gtk.CellRendererText() column = Gtk.TreeViewColumn(header, cell, text=i) @@ -316,18 +474,27 @@ class NotificationListGUI: column.set_sort_column_id(i) self._model = Gtk.ListStore(*cols) - for row in rows: + for entry in entries: row_iter = self._model.append(None) - for i, cell in enumerate(row): - self._model.set_value(row_iter, i, cell) + self._model.set_value(row_iter, 0, entry.message) + self._model.set_value( + row_iter, + 1, + self._presenter._timestamp_to_string(entry.timestamp), + ) + self._model.set_value(row_iter, 2, entry) tree.set_model(self._model) selection = tree.get_selection() selection.set_mode(Gtk.SelectionMode.SINGLE) + selection.connect("changed", self._on_selection_changed) if self._model.iter_n_children(None) > 0: selection.select_path(0) + self._selection = selection self._tree = tree dialog.connect("response", self.on_response) + dialog.connect("destroy", self._on_destroy) + self._update_action_buttons() return dialog def on_response(self, dialog, response): @@ -341,9 +508,17 @@ class NotificationListGUI: self._copy_selected_notification() return + if response == self.RESPONSE_ACTIONS: + self._show_selected_actions() + return + + if response == self.RESPONSE_DISMISS: + self._dismiss_selected_notification() + return + if response == Gtk.ResponseType.APPLY and self._model is not None: self._model.clear() - getPresenter().clear_list() + self._presenter.clear_list() self._script.presentMessage(messages.NOTIFICATION_NO_MESSAGES) time.sleep(1) self._gui.destroy() @@ -357,40 +532,236 @@ class NotificationListGUI: time_stamp = Gdk.CURRENT_TIME self._gui.present_with_time(int(time_stamp)) - def _copy_selected_notification(self): - if self._model is None or self._tree is None: - return + def _on_destroy(self, widget): + self._presenter.on_dialog_destroyed() - selection = self._tree.get_selection() - model, paths = selection.get_selected_rows() + def _on_selection_changed(self, selection): + self._presenter.refresh_live_notifications() + self._update_action_buttons() + + def _get_selected_entry(self) -> Optional[NotificationEntry]: + if self._selection is None or self._model is None: + return None + + model, paths = self._selection.get_selected_rows() if not paths and self._model.iter_n_children(None) > 0: - selection.select_path(0) - model, paths = selection.get_selected_rows() + self._selection.select_path(0) + model, paths = self._selection.get_selected_rows() if not paths: - return + return None iter_for_path = model.get_iter(paths[0]) if iter_for_path is None: + return None + + return model.get_value(iter_for_path, 2) + + def _update_action_buttons(self) -> None: + entry = self._get_selected_entry() + can_control = self._presenter.can_control_entry(entry) + has_actions = bool(self._presenter.get_actions_for_entry(entry)) + + if self._dismiss_button is not None: + self._dismiss_button.set_sensitive(can_control) + if self._actions_button is not None: + self._actions_button.set_sensitive(can_control and has_actions) + + def _copy_selected_notification(self): + entry = self._get_selected_entry() + if entry is None: return - message = model.get_value(iter_for_path, 0) - timestamp = model.get_value(iter_for_path, 1) + timestamp = self._presenter._timestamp_to_string(entry.timestamp) if timestamp: - text = f"{message}\t{timestamp}" + text = f"{entry.message}\t{timestamp}" else: - text = f"{message}" + text = f"{entry.message}" clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD) clipboard.set_text(text, -1) clipboard.store() self._script.presentMessage(messages.CLIPBOARD_COPIED_FULL) + def _dismiss_selected_notification(self) -> None: + self._presenter.refresh_live_notifications() + entry = self._get_selected_entry() + if not self._presenter.can_control_entry(entry): + self._script.presentMessage(messages.NOTIFICATION_UNAVAILABLE) + self._update_action_buttons() + return + + if not self._presenter.dismiss_entry(self._script, entry): + self._script.presentMessage(messages.NOTIFICATION_UNAVAILABLE) + + self._update_action_buttons() + + def _show_selected_actions(self) -> None: + self._presenter.refresh_live_notifications() + entry = self._get_selected_entry() + actions = self._presenter.get_actions_for_entry(entry) + if not self._presenter.can_control_entry(entry): + self._script.presentMessage(messages.NOTIFICATION_UNAVAILABLE) + self._update_action_buttons() + return + + if not actions: + self._script.presentMessage(messages.NOTIFICATION_NO_ACTIONS) + self._update_action_buttons() + return + + dialog = NotificationActionsGUI( + self._gui, + self._script, + self._presenter, + entry, + actions, + ) + dialog.show_gui() + + +class NotificationActionsGUI: + """Dialog listing live mako actions for a notification.""" + + RESPONSE_INVOKE = 1 + + def __init__( + self, + parent: Gtk.Dialog, + script: Any, + presenter: NotificationPresenter, + entry: NotificationEntry, + actions: Dict[str, str], + ) -> None: + self._parent = parent + self._script = script + self._presenter = presenter + self._entry = entry + self._actions = dict(actions) + self._list_box = None + self._invoke_button = None + self._dialog = self._create_dialog() + + def _create_dialog(self): + dialog = Gtk.Dialog( + guilabels.NOTIFICATIONS_ACTIONS_TITLE, + self._parent, + Gtk.DialogFlags.MODAL | Gtk.DialogFlags.DESTROY_WITH_PARENT, + ) + dialog.set_default_size(420, 260) + self._invoke_button = dialog.add_button( + guilabels.NOTIFICATIONS_INVOKE_ACTION_BUTTON, + self.RESPONSE_INVOKE, + ) + dialog.add_button(Gtk.STOCK_CLOSE, Gtk.ResponseType.CLOSE) + + content_area = dialog.get_content_area() + scrolled_window = Gtk.ScrolledWindow() + scrolled_window.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.AUTOMATIC) + content_area.add(scrolled_window) + + self._list_box = Gtk.ListBox() + self._list_box.set_selection_mode(Gtk.SelectionMode.SINGLE) + self._list_box.connect("row-activated", self._on_row_activated) + self._list_box.connect("selected-rows-changed", self._on_selection_changed) + list_accessible = self._list_box.get_accessible() + if list_accessible: + list_accessible.set_name(guilabels.NOTIFICATIONS_ACTIONS_TITLE) + scrolled_window.add(self._list_box) + + for action_key, label_text in self._actions.items(): + row = Gtk.ListBoxRow() + row._action_key = action_key # type: ignore[attr-defined] + label = Gtk.Label(label=label_text or action_key, xalign=0) + label.set_margin_start(10) + label.set_margin_end(10) + label.set_margin_top(6) + label.set_margin_bottom(6) + row.add(label) + self._list_box.add(row) + + first_row = self._list_box.get_row_at_index(0) + if first_row is not None: + self._list_box.select_row(first_row) + + dialog.connect("response", self._on_response) + self._update_invoke_button() + return dialog + + def show_gui(self) -> None: + self._dialog.show_all() + time_stamp = Gtk.get_current_event_time() + if not time_stamp or time_stamp > 0xFFFFFFFF: + time_stamp = Gdk.CURRENT_TIME + self._dialog.present_with_time(int(time_stamp)) + + def _on_response(self, dialog, response) -> None: + if response == Gtk.ResponseType.CLOSE: + self._dialog.destroy() + return + + if response == self.RESPONSE_INVOKE: + self._invoke_selected_action() + + def _on_row_activated(self, list_box, row) -> None: + self._invoke_selected_action() + + def _on_selection_changed(self, list_box) -> None: + self._update_invoke_button() + + def _get_selected_action_key(self) -> Optional[str]: + if self._list_box is None: + return None + + row = self._list_box.get_selected_row() + if row is None: + return None + + return getattr(row, "_action_key", None) + + def _update_invoke_button(self) -> None: + if self._invoke_button is None: + return + + self._invoke_button.set_sensitive(self._get_selected_action_key() is not None) + + def _invoke_selected_action(self) -> None: + self._presenter.refresh_live_notifications() + action_key = self._get_selected_action_key() + if not action_key: + return + + if not self._presenter.can_control_entry(self._entry): + self._script.presentMessage(messages.NOTIFICATION_UNAVAILABLE) + self._dialog.destroy() + return + + if not self._presenter.invoke_action_for_entry(self._script, self._entry, action_key): + self._script.presentMessage(messages.NOTIFICATION_UNAVAILABLE) + + self._dialog.destroy() + + _presenter = None + + def getPresenter(): - """Returns the Notification Presenter""" - + """Returns the Notification Presenter.""" + global _presenter if _presenter is None: _presenter = NotificationPresenter() + + if _presenter._mako_monitor is None: + try: + from . import cthulhu + + app = getattr(cthulhu, "cthulhuApp", None) + if app is not None: + monitor = getattr(app, "makoNotificationMonitor", None) + if monitor is not None: + _presenter.set_mako_monitor(monitor) + except Exception: + pass + return _presenter diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..eada025 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,65 @@ +import importlib.util +import sys +import types +from pathlib import Path + + +REPO_ROOT = Path(__file__).resolve().parents[1] +SRC_ROOT = REPO_ROOT / "src" +BUILD_PACKAGE_ROOT = REPO_ROOT / "_build" / "src" / "cthulhu" + + +def _load_generated_module(module_name): + module_path = BUILD_PACKAGE_ROOT / f"{module_name}.py" + full_name = f"cthulhu.{module_name}" + + if module_path.exists(): + spec = importlib.util.spec_from_file_location(full_name, module_path) + module = importlib.util.module_from_spec(spec) + sys.modules[full_name] = module + spec.loader.exec_module(module) + return module + + module = types.ModuleType(full_name) + if module_name == "cthulhu_i18n": + module._ = lambda text: text + module.ngettext = lambda singular, plural, count: singular if count == 1 else plural + module.cgettext = lambda text: text + module.C_ = lambda _context, text: text + module.localedir = str(REPO_ROOT / "po") + module.setModuleLocale = lambda *_args, **_kwargs: None + module.setLocaleForMessages = lambda *_args, **_kwargs: None + module.setLocaleForNames = lambda *_args, **_kwargs: None + module.setLocaleForGUI = lambda *_args, **_kwargs: None + return module + + if module_name == "cthulhu_platform": + from cthulhu import cthulhuVersion + + module.version = ( + f"Cthulhu screen reader version " + f"{cthulhuVersion.version}-{cthulhuVersion.codeName}" + ) + module.revision = "" + module.prefix = str(REPO_ROOT) + module.package = "cthulhu" + module.datadir = str(REPO_ROOT) + module.tablesdir = "/usr/share/liblouis/tables" + return module + + raise ImportError(f"Unsupported generated module: {module_name}") + + +if str(SRC_ROOT) not in sys.path: + sys.path.insert(0, str(SRC_ROOT)) + +import cthulhu # noqa: E402 + + +for generated_module in ("cthulhu_i18n", "cthulhu_platform"): + if f"cthulhu.{generated_module}" in sys.modules: + continue + + loaded_module = _load_generated_module(generated_module) + sys.modules[f"cthulhu.{generated_module}"] = loaded_module + setattr(cthulhu, generated_module, loaded_module) diff --git a/tests/test_mako_notification_monitor.py b/tests/test_mako_notification_monitor.py new file mode 100644 index 0000000..bac570f --- /dev/null +++ b/tests/test_mako_notification_monitor.py @@ -0,0 +1,160 @@ +import sys +import unittest +from pathlib import Path +from unittest import mock + +sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src")) + +from cthulhu import notification_presenter +from cthulhu.mako_notification_monitor import MakoNotificationMonitor + + +class _FakeScript: + def __init__(self): + self.speechGenerator = mock.Mock() + self.speechGenerator.voice.return_value = object() + self.speakMessage = mock.Mock() + self.displayBrailleMessage = mock.Mock() + + +class _FakeScriptManager: + def __init__(self, script): + self._script = script + + def get_default_script(self): + return self._script + + +class _FakeApp: + def __init__(self, script): + self._script_manager = _FakeScriptManager(script) + + def getScriptManager(self): + return self._script_manager + + +class MakoNotificationMonitorTests(unittest.TestCase): + def setUp(self): + self.presenter = notification_presenter.NotificationPresenter() + self.script = _FakeScript() + self.app = _FakeApp(self.script) + + presenter_patcher = mock.patch( + "cthulhu.mako_notification_monitor.notification_presenter.getPresenter", + return_value=self.presenter, + ) + self.addCleanup(presenter_patcher.stop) + presenter_patcher.start() + + self.monitor = MakoNotificationMonitor(self.app) + self.monitor._generation = 1 + + def test_initial_seed_does_not_announce_existing_notifications(self): + self.monitor._sync_notifications( + [ + { + "id": 10, + "app-name": "discord", + "summary": "hello", + "body": "world", + "actions": {"default": "View"}, + } + ], + announce_new=False, + ) + + self.assertEqual(self.monitor._known_ids, {10}) + self.assertEqual(self.presenter._notifications, []) + self.script.speakMessage.assert_not_called() + self.script.displayBrailleMessage.assert_not_called() + + def test_new_notification_is_spoken_and_saved(self): + self.monitor._sync_notifications( + [ + { + "id": 10, + "app-name": "discord", + "summary": "old", + "body": "", + "actions": {}, + } + ], + announce_new=False, + ) + + self.monitor._sync_notifications( + [ + { + "id": 10, + "app-name": "discord", + "summary": "old", + "body": "", + "actions": {}, + }, + { + "id": 11, + "app-name": "discord", + "summary": "new summary", + "body": "new body", + "actions": {"default": "View"}, + }, + ], + announce_new=True, + ) + + self.assertEqual(len(self.presenter._notifications), 1) + entry = self.presenter._notifications[0] + self.assertEqual(entry.notification_id, 11) + self.assertTrue(entry.live) + self.assertEqual(entry.source, "mako") + self.assertEqual(entry.actions, {"default": "View"}) + self.assertEqual(entry.message, "Notification new summary new body") + self.script.speakMessage.assert_called_once() + self.script.displayBrailleMessage.assert_called_once() + + def test_removed_notification_is_marked_not_live(self): + self.monitor._sync_notifications( + [ + { + "id": 11, + "app-name": "discord", + "summary": "new summary", + "body": "new body", + "actions": {"default": "View"}, + }, + ], + announce_new=True, + ) + + self.assertTrue(self.presenter._notifications[0].live) + + self.monitor._sync_notifications([], announce_new=False) + + self.assertFalse(self.presenter._notifications[0].live) + self.assertEqual(self.monitor._known_ids, set()) + + def test_parse_notification_strips_markup_and_normalizes_actions(self): + parsed = self.monitor._parse_notification( + { + "id": 22, + "app-name": "Crash Reporting System", + "summary": "Service Crash", + "body": "/usr/bin/python3.14 crashed", + "desktop-entry": "crash-handler", + "urgency": 2, + "actions": {"1": "Details"}, + } + ) + + self.assertEqual(parsed["notification_id"], 22) + self.assertEqual(parsed["summary"], "Service Crash") + self.assertEqual(parsed["body"], "/usr/bin/python3.14 crashed") + self.assertEqual(parsed["actions"], {"1": "Details"}) + self.assertEqual( + parsed["message"], + "Notification Service Crash /usr/bin/python3.14 crashed", + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_notification_presenter_mako_regressions.py b/tests/test_notification_presenter_mako_regressions.py new file mode 100644 index 0000000..05f8571 --- /dev/null +++ b/tests/test_notification_presenter_mako_regressions.py @@ -0,0 +1,124 @@ +import sys +import unittest +from pathlib import Path +from unittest import mock + +sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src")) + +from cthulhu import messages +from cthulhu.notification_presenter import NotificationPresenter + + +class _FakeMonitor: + def __init__(self): + self.dismissed = [] + self.invoked = [] + self.current_generation = 2 + + def refresh(self, announce_new=False): + return True + + def is_current_entry(self, entry): + return entry.source == "mako" and entry.source_generation == self.current_generation + + def dismiss_notification(self, notification_id): + self.dismissed.append(notification_id) + return True + + def invoke_action(self, notification_id, action_key): + self.invoked.append((notification_id, action_key)) + return True + + +class NotificationPresenterMakoTests(unittest.TestCase): + def setUp(self): + self.presenter = NotificationPresenter() + self.monitor = _FakeMonitor() + self.presenter.set_mako_monitor(self.monitor) + + def test_sync_updates_current_generation_only(self): + old_entry = self.presenter.save_notification( + "Notification old", + source="mako", + source_generation=1, + notification_id=5, + live=False, + actions={"default": "Old"}, + ) + current_entry = self.presenter.save_notification( + "Notification current", + source="mako", + source_generation=2, + notification_id=6, + live=True, + actions={"default": "View"}, + ) + + self.presenter.sync_live_notifications( + "mako", + { + 5: { + "message": "Notification replaced", + "actions": {"default": "New"}, + "app_name": "discord", + "summary": "replaced", + "body": "", + "urgency": 1, + "desktop_entry": "discord", + } + }, + source_generation=2, + ) + + self.assertEqual(old_entry.message, "Notification old") + self.assertEqual(old_entry.actions, {"default": "Old"}) + self.assertFalse(current_entry.live) + + def test_can_control_entry_and_get_actions_require_current_monitor_generation(self): + entry = self.presenter.save_notification( + "Notification current", + source="mako", + source_generation=2, + notification_id=6, + live=True, + actions={"default": "View"}, + ) + + self.assertTrue(self.presenter.can_control_entry(entry)) + self.assertEqual(self.presenter.get_actions_for_entry(entry), {"default": "View"}) + + stale_entry = self.presenter.save_notification( + "Notification stale", + source="mako", + source_generation=1, + notification_id=7, + live=True, + actions={"default": "Open"}, + ) + + self.assertFalse(self.presenter.can_control_entry(stale_entry)) + self.assertEqual(self.presenter.get_actions_for_entry(stale_entry), {}) + + def test_dismiss_and_invoke_action_route_through_monitor(self): + entry = self.presenter.save_notification( + "Notification current", + source="mako", + source_generation=2, + notification_id=6, + live=True, + actions={"default": "View"}, + ) + script = mock.Mock() + + self.assertTrue(self.presenter.dismiss_entry(script, entry)) + self.assertEqual(self.monitor.dismissed, [6]) + script.presentMessage.assert_called_with(messages.NOTIFICATION_DISMISSED) + + script.reset_mock() + self.assertTrue(self.presenter.invoke_action_for_entry(script, entry, "default")) + self.assertEqual(self.monitor.invoked, [(6, "default")]) + script.presentMessage.assert_called_with(messages.NOTIFICATION_ACTION_INVOKED) + + +if __name__ == "__main__": + unittest.main()