From f4af54228a3a76dae0f59a3e19abd6d660efb63e Mon Sep 17 00:00:00 2001 From: Storm Dragon Date: Sat, 4 Apr 2026 19:04:45 -0400 Subject: [PATCH] Tightened up Steam notifications. Fixed a system notifications regression.? --- .../apps/notification-daemon/script.py | 12 ++- .../scripts/apps/steamwebhelper/script.py | 57 +++++++++++-- tests/test_notification_daemon_regressions.py | 58 +++++++++++++ tests/test_steam_notification_regressions.py | 85 +++++++++++++++++++ 4 files changed, 204 insertions(+), 8 deletions(-) create mode 100644 tests/test_notification_daemon_regressions.py create mode 100644 tests/test_steam_notification_regressions.py diff --git a/src/cthulhu/scripts/apps/notification-daemon/script.py b/src/cthulhu/scripts/apps/notification-daemon/script.py index 52861b7..6c98acf 100644 --- a/src/cthulhu/scripts/apps/notification-daemon/script.py +++ b/src/cthulhu/scripts/apps/notification-daemon/script.py @@ -34,6 +34,7 @@ __license__ = "LGPL" import cthulhu.messages as messages import cthulhu.scripts.default as default import cthulhu.settings as settings +from cthulhu.ax_object import AXObject from cthulhu.ax_utilities import AXUtilities @@ -49,8 +50,15 @@ class Script(default.Script): """Callback for window:create accessibility events.""" allLabels = AXUtilities.find_all_labels(event.source) - texts = [self.utilities.displayedText(acc) for acc in allLabels] - text = f"{messages.NOTIFICATION} {' '.join(texts)}" + texts = [] + for acc in allLabels: + text = self.utilities.displayedText(acc) or AXObject.get_name(acc) + if text: + texts.append(text) + + text = messages.NOTIFICATION + if texts: + text = f"{text} {' '.join(texts)}" voice = self.speechGenerator.voice(obj=event.source, string=text) self.speakMessage(text, voice=voice) diff --git a/src/cthulhu/scripts/apps/steamwebhelper/script.py b/src/cthulhu/scripts/apps/steamwebhelper/script.py index c0a46b0..ed8468d 100644 --- a/src/cthulhu/scripts/apps/steamwebhelper/script.py +++ b/src/cthulhu/scripts/apps/steamwebhelper/script.py @@ -203,19 +203,24 @@ class Script(Chromium.Script): if AXUtilities.is_notification(obj) or AXUtilities.is_alert(obj): return obj + def isNotificationRole(candidate): + return AXUtilities.is_notification(candidate) or AXUtilities.is_alert(candidate) + + ancestorNotification = AXObject.find_ancestor(obj, isNotificationRole) + if ancestorNotification: + return ancestorNotification + liveAttr = AXObject.get_attribute(obj, 'live') containerLive = AXObject.get_attribute(obj, 'container-live') if liveAttr in ['assertive', 'polite'] or containerLive in ['assertive', 'polite']: return obj - def isNotificationCandidate(candidate): - if AXUtilities.is_notification(candidate) or AXUtilities.is_alert(candidate): - return True + def isLiveRegionCandidate(candidate): candidateLive = AXObject.get_attribute(candidate, 'live') candidateContainerLive = AXObject.get_attribute(candidate, 'container-live') return candidateLive in ['assertive', 'polite'] or candidateContainerLive in ['assertive', 'polite'] - return AXObject.find_ancestor(obj, isNotificationCandidate) + return AXObject.find_ancestor(obj, isLiveRegionCandidate) def _presentSteamNotification(self, obj): """Speak and save the notification. @@ -363,6 +368,27 @@ class Script(Chromium.Script): return f"{baseText} {timestampText}" return f"{baseText}. {timestampText}" + def _getSteamNotificationIdentity(self, obj): + if obj is None: + return None + + try: + path = AXObject.get_path(obj) + except Exception: + path = None + + if path is not None: + return tuple(path) + + try: + return hash(obj) + except TypeError: + return id(obj) + + def _combineSteamNotificationFragments(self, firstText, secondText): + combined = f"{firstText} {secondText}" + return self._normalizeSteamNotificationText(combined) + def _steamTextContains(self, text, other): textNorm = self._normalizeSteamNotificationText(text).lower() otherNorm = self._normalizeSteamNotificationText(other).lower() @@ -376,6 +402,7 @@ class Script(Chromium.Script): text = self._normalizeSteamNotificationText(text) if not text: return + sourceKey = self._getSteamNotificationIdentity(obj) pending = self._steamPendingNotification if self._isSteamRelativeTimestamp(text): @@ -386,6 +413,7 @@ class Script(Chromium.Script): pending["text"] = self._appendSteamTimestamp(pending["text"], text) if obj: pending["obj"] = obj + pending["sourceKey"] = sourceKey self._resetSteamPendingTimer() return @@ -393,7 +421,8 @@ class Script(Chromium.Script): "text": text, "obj": obj, "timerId": None, - "timestampOnly": True + "timestampOnly": True, + "sourceKey": sourceKey } self._resetSteamPendingTimer() return @@ -405,24 +434,39 @@ class Script(Chromium.Script): pending["timestampOnly"] = False if obj: pending["obj"] = obj + pending["sourceKey"] = sourceKey self._resetSteamPendingTimer() return if text == pendingText: if obj: pending["obj"] = obj + pending["sourceKey"] = sourceKey return if self._steamTextContains(text, pendingText): pending["text"] = text if obj: pending["obj"] = obj + pending["sourceKey"] = sourceKey self._resetSteamPendingTimer() return if self._steamTextContains(pendingText, text): if obj: pending["obj"] = obj + pending["sourceKey"] = sourceKey + return + + # Steam often emits multi-line toasts as separate live-region fragments. + # Keep fragments from the same toast together instead of speaking the first + # line immediately and the complete toast later. + if sourceKey is not None and sourceKey == pending.get("sourceKey"): + pending["text"] = self._combineSteamNotificationFragments(pendingText, text) + if obj: + pending["obj"] = obj + pending["sourceKey"] = sourceKey + self._resetSteamPendingTimer() return self._flushSteamPendingNotification(fromTimer=False) @@ -431,7 +475,8 @@ class Script(Chromium.Script): "text": text, "obj": obj, "timerId": None, - "timestampOnly": False + "timestampOnly": False, + "sourceKey": sourceKey } self._resetSteamPendingTimer() diff --git a/tests/test_notification_daemon_regressions.py b/tests/test_notification_daemon_regressions.py new file mode 100644 index 0000000..3712285 --- /dev/null +++ b/tests/test_notification_daemon_regressions.py @@ -0,0 +1,58 @@ +import importlib +import sys +import unittest +from pathlib import Path +from unittest import mock + +sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src")) + +from cthulhu import messages + +notification_script = importlib.import_module("cthulhu.scripts.apps.notification-daemon.script") + + +class NotificationDaemonTests(unittest.TestCase): + def test_window_created_ignores_empty_labels(self): + testScript = notification_script.Script.__new__(notification_script.Script) + labelEmpty = object() + labelText = object() + eventSource = object() + event = mock.Mock(source=eventSource) + + testScript.utilities = mock.Mock() + testScript.utilities.displayedText.side_effect = lambda obj: { + labelEmpty: None, + labelText: "this is a test", + }.get(obj) + testScript.speechGenerator = mock.Mock() + testScript.speechGenerator.voice.return_value = object() + testScript.speakMessage = mock.Mock() + testScript.displayBrailleMessage = mock.Mock() + testScript.notificationPresenter = mock.Mock() + + with ( + mock.patch.object( + notification_script.AXUtilities, + "find_all_labels", + return_value=[labelEmpty, labelText], + ), + mock.patch.object( + notification_script.AXObject, + "get_name", + side_effect=lambda obj: { + labelEmpty: "", + labelText: "", + }.get(obj, ""), + ), + ): + testScript.onWindowCreated(event) + + expectedText = f"{messages.NOTIFICATION} this is a test" + testScript.speechGenerator.voice.assert_called_once_with(obj=eventSource, string=expectedText) + testScript.speakMessage.assert_called_once_with(expectedText, voice=testScript.speechGenerator.voice.return_value) + testScript.displayBrailleMessage.assert_called_once() + testScript.notificationPresenter.save_notification.assert_called_once_with(expectedText) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_steam_notification_regressions.py b/tests/test_steam_notification_regressions.py new file mode 100644 index 0000000..79691bb --- /dev/null +++ b/tests/test_steam_notification_regressions.py @@ -0,0 +1,85 @@ +import re +import sys +import unittest +from pathlib import Path +from unittest import mock + +import gi + +gi.require_version("Gdk", "3.0") +gi.require_version("Gtk", "3.0") + +sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src")) + +soundGeneratorModule = sys.modules.get("cthulhu.sound_generator") +if soundGeneratorModule is not None and not hasattr(soundGeneratorModule, "SoundGenerator"): + class _StubSoundGenerator: + pass + + soundGeneratorModule.SoundGenerator = _StubSoundGenerator + +from cthulhu.ax_object import AXObject +from cthulhu.ax_utilities import AXUtilities +from cthulhu.scripts.apps.steamwebhelper import script as steam_script + + +class SteamNotificationRootTests(unittest.TestCase): + def test_prefers_notification_ancestor_over_live_region_descendant(self): + testScript = steam_script.Script.__new__(steam_script.Script) + fragment = object() + notification = object() + + def get_attribute(obj, name): + if obj is fragment and name == "container-live": + return "assertive" + return None + + def find_ancestor(obj, predicate): + if obj is fragment and predicate(notification): + return notification + return None + + with ( + mock.patch.object(AXUtilities, "is_notification", side_effect=lambda obj: obj is notification), + mock.patch.object(AXUtilities, "is_alert", return_value=False), + mock.patch.object(AXObject, "get_attribute", side_effect=get_attribute), + mock.patch.object(AXObject, "find_ancestor", side_effect=find_ancestor), + ): + result = testScript._findSteamNotificationRoot(fragment) + + self.assertIs(result, notification) + + +class SteamNotificationQueueTests(unittest.TestCase): + def test_merges_non_overlapping_fragments_for_same_notification(self): + testScript = steam_script.Script.__new__(steam_script.Script) + notification = object() + + testScript._lastSteamNotification = ("", 0.0) + testScript._steamPendingNotification = None + testScript._steamRelativeTimePattern = re.compile( + r"^(?:just now|now|\d+\s+(?:second|minute|hour|day|week|month|year)s?\s+ago)$", + re.IGNORECASE, + ) + testScript._resetSteamPendingTimer = mock.Mock() + testScript._presentSteamNotificationTextNow = mock.Mock() + + testScript._queueSteamNotification("username", notification) + testScript._queueSteamNotification("Playing: Borderlands 2", notification) + + testScript._presentSteamNotificationTextNow.assert_not_called() + self.assertEqual( + testScript._steamPendingNotification["text"], + "username Playing: Game Title", + ) + + testScript._flushSteamPendingNotification(fromTimer=True) + + testScript._presentSteamNotificationTextNow.assert_called_once_with( + "username Playing: Game Title", + notification, + ) + + +if __name__ == "__main__": + unittest.main()