Attempt to improve steam notifications. Added debugging to see what's actually happening when "view notifications" is pressed in the steam client itself.

This commit is contained in:
Storm Dragon
2026-01-09 11:11:11 -05:00
parent 59484782c0
commit eba1ddc419

View File

@@ -28,6 +28,9 @@ __copyright__ = "Copyright (c) 2024 Stormux"
__license__ = "LGPL"
import time
import re
from gi.repository import GLib
from cthulhu import debug
from cthulhu import settings
@@ -78,6 +81,12 @@ class Script(Chromium.Script):
# is not the focused application.
self.presentIfInactive = True
self._lastSteamNotification = ("", 0.0)
self._steamPendingNotification = None
self._steamNotificationDelayMs = 500
self._steamRelativeTimePattern = re.compile(
r"^(?:just now|now|\d+\s+(?:second|minute|hour|day|week|month|year)s?\s+ago)$",
re.IGNORECASE
)
def onShowingChanged(self, event):
"""Callback for object:state-changed:showing accessibility events."""
@@ -133,6 +142,18 @@ class Script(Chromium.Script):
super().onTextInserted(event)
def onSelectionChanged(self, event):
"""Callback for object:selection-changed accessibility events."""
self._logSteamNavigationEvent("selection-changed", event)
return super().onSelectionChanged(event)
def onActiveDescendantChanged(self, event):
"""Callback for object:active-descendant-changed accessibility events."""
self._logSteamNavigationEvent("active-descendant-changed", event)
return super().onActiveDescendantChanged(event)
def _isSteamNotification(self, obj):
"""Detect if object is a Steam notification.
@@ -175,6 +196,27 @@ class Script(Chromium.Script):
return False
def _findSteamNotificationRoot(self, obj):
if obj is None:
return None
if AXUtilities.is_notification(obj) or AXUtilities.is_alert(obj):
return obj
liveAttr = AXObject.get_attribute(obj, 'live')
containerLive = AXObject.get_attribute(obj, 'container-live')
if liveAttr in ['assertive', 'polite'] or containerLive in ['assertive', 'polite']:
return obj
def isNotificationCandidate(candidate):
if AXUtilities.is_notification(candidate) or AXUtilities.is_alert(candidate):
return True
candidateLive = AXObject.get_attribute(candidate, 'live')
candidateContainerLive = AXObject.get_attribute(candidate, 'container-live')
return candidateLive in ['assertive', 'polite'] or candidateContainerLive in ['assertive', 'polite']
return AXObject.find_ancestor(obj, isNotificationCandidate)
def _presentSteamNotification(self, obj):
"""Speak and save the notification.
@@ -189,6 +231,47 @@ class Script(Chromium.Script):
self._presentSteamNotificationText(text, obj)
def _logSteamNavigationEvent(self, label, event):
if not event:
return
sourceInfo = self._describeSteamObject(event.source)
anyDataInfo = self._describeSteamEventAnyData(event.any_data)
msg = f"STEAM: {label}: source={sourceInfo}; any_data={anyDataInfo}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
def _describeSteamEventAnyData(self, anyData):
if isinstance(anyData, str):
text = self._normalizeSteamNotificationText(anyData)
if not text:
return "string('')"
return f"string('{text}')"
if anyData is None:
return "None"
return self._describeSteamObject(anyData)
def _describeSteamObject(self, obj):
if obj is None:
return "None"
name = AXObject.get_name(obj) or ""
description = AXObject.get_description(obj) or ""
roleName = AXObject.get_role_name(obj) or ""
text = self.utilities.displayedText(obj) or ""
path = AXObject.get_path(obj)
name = self._normalizeSteamNotificationText(name)
description = self._normalizeSteamNotificationText(description)
text = self._normalizeSteamNotificationText(text)
return (
f"role='{roleName}' "
f"name='{name}' "
f"description='{description}' "
f"text='{text}' "
f"path={path}"
)
def _presentSteamLiveRegionText(self, event):
if not isinstance(event.any_data, str):
return False
@@ -200,9 +283,16 @@ class Script(Chromium.Script):
if not self._isSteamLiveRegion(event.source):
return False
notificationRoot = self._findSteamNotificationRoot(event.source)
sourceObj = notificationRoot or event.source
if notificationRoot:
fullText = self._getNotificationText(notificationRoot)
if fullText:
text = fullText
msg = f"STEAM: Live region text inserted: {text}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
self._presentSteamNotificationText(text, event.source)
self._presentSteamNotificationText(text, sourceObj)
return True
def _isSteamLiveRegion(self, obj):
@@ -223,6 +313,18 @@ class Script(Chromium.Script):
return AXObject.find_ancestor(obj, isLiveRegion) is not None
def _presentSteamNotificationText(self, text, obj):
self._queueSteamNotification(text, obj)
def _isDuplicateSteamNotification(self, text):
lastText, lastTime = self._lastSteamNotification
now = time.monotonic()
if text == lastText and (now - lastTime) < 1.0:
return True
self._lastSteamNotification = (text, now)
return False
def _presentSteamNotificationTextNow(self, text, obj):
if self._isDuplicateSteamNotification(text):
msg = "STEAM: Suppressing duplicate notification"
debug.printMessage(debug.LEVEL_INFO, msg, True)
@@ -243,15 +345,126 @@ class Script(Chromium.Script):
msg = f"STEAM: Presented notification: {text}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
def _isDuplicateSteamNotification(self, text):
lastText, lastTime = self._lastSteamNotification
now = time.monotonic()
if text == lastText and (now - lastTime) < 1.0:
return True
def _normalizeSteamNotificationText(self, text):
if not text:
return ""
return " ".join(text.split())
self._lastSteamNotification = (text, now)
def _isSteamRelativeTimestamp(self, text):
text = self._normalizeSteamNotificationText(text)
if not text:
return False
return self._steamRelativeTimePattern.match(text) is not None
def _appendSteamTimestamp(self, baseText, timestampText):
if not baseText:
return timestampText
if baseText[-1] in ".!?":
return f"{baseText} {timestampText}"
return f"{baseText}. {timestampText}"
def _steamTextContains(self, text, other):
textNorm = self._normalizeSteamNotificationText(text).lower()
otherNorm = self._normalizeSteamNotificationText(other).lower()
if not textNorm or not otherNorm:
return False
if len(otherNorm) < 4:
return textNorm == otherNorm
return otherNorm in textNorm
def _queueSteamNotification(self, text, obj):
text = self._normalizeSteamNotificationText(text)
if not text:
return
pending = self._steamPendingNotification
if self._isSteamRelativeTimestamp(text):
if pending:
if pending.get("timestampOnly"):
pending["text"] = text
else:
pending["text"] = self._appendSteamTimestamp(pending["text"], text)
if obj:
pending["obj"] = obj
self._resetSteamPendingTimer()
return
self._steamPendingNotification = {
"text": text,
"obj": obj,
"timerId": None,
"timestampOnly": True
}
self._resetSteamPendingTimer()
return
if pending:
pendingText = pending["text"]
if pending.get("timestampOnly"):
pending["text"] = self._appendSteamTimestamp(text, pendingText)
pending["timestampOnly"] = False
if obj:
pending["obj"] = obj
self._resetSteamPendingTimer()
return
if text == pendingText:
if obj:
pending["obj"] = obj
return
if self._steamTextContains(text, pendingText):
pending["text"] = text
if obj:
pending["obj"] = obj
self._resetSteamPendingTimer()
return
if self._steamTextContains(pendingText, text):
if obj:
pending["obj"] = obj
return
self._flushSteamPendingNotification(fromTimer=False)
self._steamPendingNotification = {
"text": text,
"obj": obj,
"timerId": None,
"timestampOnly": False
}
self._resetSteamPendingTimer()
def _resetSteamPendingTimer(self):
pending = self._steamPendingNotification
if not pending:
return
timerId = pending.get("timerId")
if timerId is not None:
GLib.source_remove(timerId)
pending["timerId"] = GLib.timeout_add(
self._steamNotificationDelayMs,
self._onSteamPendingTimeout
)
def _onSteamPendingTimeout(self):
self._flushSteamPendingNotification(fromTimer=True)
return False
def _flushSteamPendingNotification(self, fromTimer):
pending = self._steamPendingNotification
if not pending:
return
timerId = pending.get("timerId")
if not fromTimer and timerId is not None:
GLib.source_remove(timerId)
self._steamPendingNotification = None
if pending.get("timestampOnly"):
msg = "STEAM: Dropping timestamp-only notification"
debug.printMessage(debug.LEVEL_INFO, msg, True)
return
self._presentSteamNotificationTextNow(pending["text"], pending["obj"])
def _getNotificationText(self, obj):
"""Extract text from notification element.