diff --git a/src/cthulhu/cthulhu-setup.ui b/src/cthulhu/cthulhu-setup.ui
index 002ade0..295958b 100644
--- a/src/cthulhu/cthulhu-setup.ui
+++ b/src/cthulhu/cthulhu-setup.ui
@@ -1053,6 +1053,51 @@
False
vertical
6
+
+
+
+ False
+ True
+ 0
+
+
True
@@ -1095,7 +1140,7 @@
False
True
- 0
+ 1
@@ -1140,7 +1185,7 @@
False
True
- 1
+ 2
@@ -1151,7 +1196,7 @@
True
False
- Sound Theme
+ Sound
diff --git a/src/cthulhu/cthulhu.py b/src/cthulhu/cthulhu.py
index 70a91aa..d9f9ceb 100644
--- a/src/cthulhu/cthulhu.py
+++ b/src/cthulhu/cthulhu.py
@@ -769,7 +769,7 @@ def shutdown(script: Optional[Any] = None, inputEvent: Optional[Any] = None) ->
cthulhu_state.activeScript.presentationInterrupt()
cthulhuApp.getSignalManager().emitSignal('stop-application-completed')
- sound_theme_manager.getManager().playStopSound(wait=True)
+ sound_theme_manager.getManager().playStopSound(wait=True, timeoutSeconds=1)
cthulhuApp.getPluginSystemManager().unloadAllPlugins(ForceAllPlugins=True)
# Deactivate the event manager first so that it clears its queue and will not
@@ -893,7 +893,7 @@ def main() -> int:
debug.printMessage(debug.LEVEL_INFO, "CTHULHU: Initialized.", True)
script = cthulhu_state.activeScript
- sound_theme_manager.getManager().playStartSound(wait=True)
+ sound_theme_manager.getManager().playStartSound(wait=False)
soundFailureReason = sound.getSoundSystemFailureReason()
if soundFailureReason:
debug.printMessage(
diff --git a/src/cthulhu/cthulhu_gui_prefs.py b/src/cthulhu/cthulhu_gui_prefs.py
index 97eb24f..412a5d1 100644
--- a/src/cthulhu/cthulhu_gui_prefs.py
+++ b/src/cthulhu/cthulhu_gui_prefs.py
@@ -60,6 +60,7 @@ from . import cthulhu_gui_profile
from . import cthulhu_state
from . import settings
from . import settings_manager
+from . import sound_sink
from . import input_event
from . import input_event_manager
from . import keybindings
@@ -195,6 +196,7 @@ class CthulhuSetupGUI(cthulhu_gtkbuilder.GtkBuilderWrapper):
self.savedPitch = None
self.savedRate = None
self.soundThemeCombo = None
+ self.soundSinkCombo = None
self.roleSoundPresentationCombo = None
self._isInitialSetup = False
self._updatingSpeechFamilies = False
@@ -3102,15 +3104,37 @@ print(json.dumps(result))
self.ocrCopyToClipboardCheckButton.set_active(copyToClipboard)
def _initSoundThemeState(self):
- """Initialize Sound Theme widgets with current settings."""
+ """Initialize sound widgets with current settings."""
prefs = self.prefsDict
# Get widget references
+ self.soundSinkCombo = self.get_widget("soundSinkCombo")
self.soundThemeCombo = self.get_widget("soundThemeCombo")
self.roleSoundPresentationCombo = self.get_widget("roleSoundPresentationCombo")
+ self.soundSinkCombo.set_can_focus(False)
self.soundThemeCombo.set_can_focus(False)
self.roleSoundPresentationCombo.set_can_focus(False)
+ self._soundSinkChoices = [
+ (settings.SOUND_SINK_AUTO, guilabels.SOUND_BACKEND_AUTO),
+ (settings.SOUND_SINK_PIPEWIRE, guilabels.SOUND_BACKEND_PIPEWIRE),
+ (settings.SOUND_SINK_PULSE, guilabels.SOUND_BACKEND_PULSE),
+ (settings.SOUND_SINK_ALSA, guilabels.SOUND_BACKEND_ALSA),
+ ]
+ self.soundSinkCombo.remove_all()
+ for _, label in self._soundSinkChoices:
+ self.soundSinkCombo.append_text(label)
+ runtimeSoundSink = cthulhu.cthulhuApp.settingsManager.getSetting("soundSink")
+ currentSink = sound_sink.normalize_sound_sink_choice(
+ prefs.get("soundSink", runtimeSoundSink if runtimeSoundSink is not None else settings.soundSink)
+ )
+ sinkIndex = 0
+ for index, (value, _) in enumerate(self._soundSinkChoices):
+ if value == currentSink:
+ sinkIndex = index
+ break
+ self.soundSinkCombo.set_active(sinkIndex)
+
# Populate sound theme combo box
themeManager = sound_theme_manager.getManager()
availableThemes = themeManager.getAvailableThemes()
@@ -3159,6 +3183,14 @@ print(json.dumps(result))
if activeText:
self.prefsDict["soundTheme"] = activeText
+ def soundSinkComboChanged(self, widget):
+ """Signal handler for the sound backend combo box."""
+ activeIndex = widget.get_active()
+ if activeIndex < 0:
+ return
+ value = self._soundSinkChoices[activeIndex][0]
+ self.prefsDict["soundSink"] = value
+
def roleSoundPresentationComboChanged(self, widget):
"""Signal handler for the role sound presentation combo box."""
activeIndex = widget.get_active()
@@ -4844,6 +4876,11 @@ print(json.dumps(result))
if not self._isInitialSetup:
self.restoreSettings()
+ if self.soundSinkCombo is not None:
+ activeIndex = self.soundSinkCombo.get_active()
+ if activeIndex >= 0:
+ self.prefsDict["soundSink"] = self._soundSinkChoices[activeIndex][0]
+
enable = self.get_widget("speechSupportCheckButton").get_active()
self.prefsDict["enableSpeech"] = enable
@@ -4855,6 +4892,20 @@ print(json.dumps(result))
if speechServerChoice:
self.prefsDict["speechServerInfo"] = \
speechServerChoice.getInfo()
+ else:
+ activeSpeechInfo = speech.getInfo()
+ if activeSpeechInfo:
+ self.prefsDict["speechServerInfo"] = activeSpeechInfo
+
+ runtimeSpeechServerInfo = cthulhu.cthulhuApp.settingsManager.getSetting(
+ "speechServerInfo"
+ )
+ if runtimeSpeechServerInfo and not self.prefsDict.get("speechServerInfo"):
+ self.prefsDict["speechServerInfo"] = runtimeSpeechServerInfo
+
+ existingSpeechServerInfo = self.prefsDict.get("speechServerInfo")
+ if existingSpeechServerInfo:
+ self.prefsDict["speechServerInfo"] = existingSpeechServerInfo
if self.defaultVoice is not None:
self.prefsDict["voices"] = {
diff --git a/src/cthulhu/guilabels.py b/src/cthulhu/guilabels.py
index dc2fd60..0cb9f1c 100644
--- a/src/cthulhu/guilabels.py
+++ b/src/cthulhu/guilabels.py
@@ -938,9 +938,26 @@ USE_STRUCTURAL_NAVIGATION = _("Enable _structural navigation")
# audio files that Cthulhu plays for various events.
SOUND_THEME = _("Sound _theme:")
+# Translators: This is the label for a combo box in the preferences dialog
+# where users can choose which audio backend Cthulhu should use.
+SOUND_BACKEND = _("Audio _backend:")
+
+# Translators: This is a sound backend option which lets Cthulhu choose a
+# backend automatically.
+SOUND_BACKEND_AUTO = _("Automatic")
+
+# Translators: This is a sound backend option which forces PipeWire.
+SOUND_BACKEND_PIPEWIRE = _("PipeWire")
+
+# Translators: This is a sound backend option which forces PulseAudio.
+SOUND_BACKEND_PULSE = _("PulseAudio")
+
+# Translators: This is a sound backend option which forces ALSA.
+SOUND_BACKEND_ALSA = _("ALSA")
+
# Translators: This is the title of a frame in the preferences dialog
-# containing sound theme options.
-SOUND_THEME_TITLE = _("Sound Theme")
+# containing sound options.
+SOUND_THEME_TITLE = _("Sound")
# Translators: This refers to the amount of information Cthulhu provides about a
# particular object that receives focus.
diff --git a/src/cthulhu/meson.build b/src/cthulhu/meson.build
index e1d13b1..9f1ed5d 100644
--- a/src/cthulhu/meson.build
+++ b/src/cthulhu/meson.build
@@ -86,6 +86,8 @@ cthulhu_python_sources = files([
'signal_manager.py',
'sleep_mode_manager.py',
'sound.py',
+ 'sound_helper.py',
+ 'sound_sink.py',
'sound_generator.py',
'sound_theme_manager.py',
'speech_and_verbosity_manager.py',
diff --git a/src/cthulhu/piper_audio_player.py b/src/cthulhu/piper_audio_player.py
index c0744e4..55ad83f 100644
--- a/src/cthulhu/piper_audio_player.py
+++ b/src/cthulhu/piper_audio_player.py
@@ -41,6 +41,7 @@ else:
_gstreamerAvailable, args = Gst.init_check(None)
from . import debug
+from . import sound_sink
class PiperAudioPlayer:
@@ -84,6 +85,7 @@ class PiperAudioPlayer:
try:
self._pipeline = Gst.Pipeline.new("piper-audio")
+ configuredSink = sound_sink.get_configured_sound_sink()
self._appsrc = Gst.ElementFactory.make("appsrc", "source")
if self._appsrc is None:
@@ -110,9 +112,9 @@ class PiperAudioPlayer:
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
- sink = Gst.ElementFactory.make("autoaudiosink", "sink")
+ sink, sinkName, sinkError = sound_sink.create_audio_sink("sink", configuredSink)
if sink is None:
- msg = 'PIPER AUDIO: Failed to create autoaudiosink element'
+ msg = f'PIPER AUDIO: {sinkError or "Failed to create audio sink"}'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
@@ -141,7 +143,10 @@ class PiperAudioPlayer:
bus.connect("message", self._onMessage)
self._initialized = True
- msg = 'PIPER AUDIO: Pipeline initialized successfully'
+ msg = (
+ f'PIPER AUDIO: Pipeline initialized successfully using '
+ f'{sinkName} (soundSink={configuredSink})'
+ )
debug.printMessage(debug.LEVEL_INFO, msg, True)
return True
diff --git a/src/cthulhu/scripts/web/script.py b/src/cthulhu/scripts/web/script.py
index 48fd087..c4bbdac 100644
--- a/src/cthulhu/scripts/web/script.py
+++ b/src/cthulhu/scripts/web/script.py
@@ -1450,10 +1450,36 @@ class Script(default.Script):
return None
+ def _getClickableActivationTarget(self):
+ obj = cthulhu_state.locusOfFocus
+ if self.inFocusMode():
+ return obj
+
+ if not self.utilities.inDocumentContent(obj):
+ return obj
+
+ contextObj, _ = self.utilities.getCaretContext(searchIfNeeded=False)
+ if contextObj and self.utilities.inDocumentContent(contextObj):
+ return contextObj
+
+ return obj
+
+ def _performClickableAction(self, obj):
+ from cthulhu import ax_object
+
+ actionNames = ["click", "click-ancestor", "press", "jump", "open", "activate"]
+ for actionName in actionNames:
+ if not ax_object.AXObject.has_action(obj, actionName):
+ continue
+ if ax_object.AXObject.do_named_action(obj, actionName):
+ return True
+
+ return False
+
def _tryClickableActivation(self, keyboardEvent):
"""Try to activate clickable element - returns True if we should consume the event."""
-
- obj = cthulhu_state.locusOfFocus
+
+ obj = self._getClickableActivationTarget()
if not obj or not self.utilities.inDocumentContent(obj):
return False
@@ -1474,31 +1500,36 @@ class Script(default.Script):
# First try the standard clickable detection
if self.utilities.isClickableElement(obj):
- from cthulhu import ax_event_synthesizer
- # Give immediate feedback that activation is starting
self.presentMessage("Activating...")
+ result = self._performClickableAction(obj)
+ if result:
+ self._presentDelayedMessage("Element activated", 50)
+ return True
+
+ from cthulhu import ax_event_synthesizer
result = ax_event_synthesizer.AXEventSynthesizer.click_object(obj)
if result:
- # Schedule success message after a brief delay
self._presentDelayedMessage("Element activated", 50)
- # Try to restore focus to the clicked element after a brief delay
self._restoreFocusAfterClick(original_focus)
return True
-
+
# If that didn't work, try a more permissive approach for any element with click action
from cthulhu import ax_object
- if ax_object.AXObject.has_action(obj, "click"):
- from cthulhu import ax_event_synthesizer
- # Give immediate feedback that activation is starting
+ if ax_object.AXObject.has_action(obj, "click") \
+ or ax_object.AXObject.has_action(obj, "click-ancestor"):
self.presentMessage("Activating...")
+ result = self._performClickableAction(obj)
+ if result:
+ self._presentDelayedMessage("Element activated", 50)
+ return True
+
+ from cthulhu import ax_event_synthesizer
result = ax_event_synthesizer.AXEventSynthesizer.click_object(obj)
if result:
- # Schedule success message after a brief delay
self._presentDelayedMessage("Element activated", 50)
- # Try to restore focus to the clicked element after a brief delay
self._restoreFocusAfterClick(original_focus)
return True
-
+
return False
def _restoreFocusAfterClick(self, original_focus):
diff --git a/src/cthulhu/scripts/web/script_utilities.py b/src/cthulhu/scripts/web/script_utilities.py
index dc24331..2537a9b 100644
--- a/src/cthulhu/scripts/web/script_utilities.py
+++ b/src/cthulhu/scripts/web/script_utilities.py
@@ -618,8 +618,24 @@ class Utilities(script_utilities.Utilities):
return self.queryNonEmptyText(obj, False) is None
def isHidden(self, obj):
- attrs = self.objectAttributes(obj, False)
- return attrs.get('hidden', False)
+ hiddenValues = {"true", "1", "yes"}
+ hiddenStyles = {"none", "hidden", "collapse"}
+
+ current = obj
+ while current and self.inDocumentContent(current):
+ attrs = self.objectAttributes(current, False)
+ hidden = str(attrs.get("hidden", "")).lower()
+ display = str(attrs.get("display", "")).lower()
+ visibility = str(attrs.get("visibility", "")).lower()
+ if hidden in hiddenValues or display in hiddenStyles or visibility in hiddenStyles:
+ return True
+
+ parent = AXObject.get_parent(current)
+ if parent == current:
+ break
+ current = parent
+
+ return False
def _isOrIsIn(self, child, parent):
if not (child and parent):
@@ -4772,13 +4788,23 @@ class Utilities(script_utilities.Utilities):
startTime = time.time()
rv = None
if AXUtilities.is_focusable(obj):
- tokens = ["WEB: Focusable object can have caret context", obj]
- debug.printTokens(debug.LEVEL_INFO, tokens, True)
- rv = True
+ if self.isHidden(obj):
+ tokens = ["WEB: Hidden object cannot have caret context", obj]
+ debug.printTokens(debug.LEVEL_INFO, tokens, True)
+ rv = False
+ else:
+ tokens = ["WEB: Focusable object can have caret context", obj]
+ debug.printTokens(debug.LEVEL_INFO, tokens, True)
+ rv = True
elif AXUtilities.is_editable(obj):
- tokens = ["WEB: Editable object can have caret context", obj]
- debug.printTokens(debug.LEVEL_INFO, tokens, True)
- rv = True
+ if self.isHidden(obj):
+ tokens = ["WEB: Hidden object cannot have caret context", obj]
+ debug.printTokens(debug.LEVEL_INFO, tokens, True)
+ rv = False
+ else:
+ tokens = ["WEB: Editable object can have caret context", obj]
+ debug.printTokens(debug.LEVEL_INFO, tokens, True)
+ rv = True
elif AXUtilities.is_landmark(obj):
tokens = ["WEB: Landmark can have caret context", obj]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
diff --git a/src/cthulhu/settings.py b/src/cthulhu/settings.py
index 2b6b7ed..f3e0265 100644
--- a/src/cthulhu/settings.py
+++ b/src/cthulhu/settings.py
@@ -85,6 +85,7 @@ userCustomizableSettings = [
"brailleLinkIndicator",
"enableSound",
"soundVolume",
+ "soundSink",
"playSoundForRole",
"playSoundForState",
"playSoundForPositionInSet",
@@ -333,9 +334,20 @@ brailleVerbosityLevel = VERBOSITY_LEVEL_VERBOSE
ROLE_SOUND_PRESENTATION_SOUND_AND_SPEECH = "sound_and_speech"
ROLE_SOUND_PRESENTATION_SPEECH_ONLY = "speech_only"
ROLE_SOUND_PRESENTATION_SOUND_ONLY = "sound_only"
+SOUND_SINK_AUTO = "auto"
+SOUND_SINK_PIPEWIRE = "pipewire"
+SOUND_SINK_PULSE = "pulse"
+SOUND_SINK_ALSA = "alsa"
+SOUND_SINK_VALUES = (
+ SOUND_SINK_AUTO,
+ SOUND_SINK_PIPEWIRE,
+ SOUND_SINK_PULSE,
+ SOUND_SINK_ALSA,
+)
enableSound = True
soundVolume = 0.5
+soundSink = SOUND_SINK_AUTO
playSoundForRole = False
playSoundForState = False
playSoundForPositionInSet = False
diff --git a/src/cthulhu/sound.py b/src/cthulhu/sound.py
index a7d0279..4077b2d 100644
--- a/src/cthulhu/sound.py
+++ b/src/cthulhu/sound.py
@@ -31,9 +31,14 @@ __date__ = "$Date:$"
__copyright__ = "Copyright (c) 2016 Cthulhu Team"
__license__ = "LGPL"
+import json
+import os
+import subprocess
+import sys
+import threading
+from typing import Any, Optional, Tuple
+
import gi
-from gi.repository import GLib
-from typing import Optional, Any
try:
gi.require_version('Gst', '1.0')
@@ -41,54 +46,40 @@ try:
except Exception:
_gstreamerAvailable: bool = False
else:
- _gstreamerAvailable, args = Gst.init_check(None)
+ _gstreamerAvailable, _args = Gst.init_check(None)
from . import debug
from . import settings
+from . import sound_sink
from .sound_generator import Icon, Tone
_soundSystemFailureReason: Optional[str] = None
+
+class _PendingResponse:
+ def __init__(self) -> None:
+ self.event = threading.Event()
+ self.response: Optional[dict[str, Any]] = None
+
+
class Player:
- """Plays Icons and Tones."""
+ """Plays Icons and Tones through a persistent worker process."""
def __init__(self) -> None:
- self._initialized: bool = False
- self._source: Optional[Any] = None # Optional[Gst.Element]
- self._volume: Optional[Any] = None # Optional[Gst.Element]
- self._sink: Optional[Any] = None # Optional[Gst.Element]
- self._player: Optional[Any] = None # Optional[Gst.Element]
- self._pipeline: Optional[Any] = None # Optional[Gst.Pipeline]
+ self._initialized = False
+ self._workerProcess: Optional[subprocess.Popen[str]] = None
+ self._workerSink: Optional[str] = None
+ self._workerRestartRequired = False
+ self._workerRestartReason: Optional[str] = None
+ self._workerLock = threading.RLock()
+ self._responseLock = threading.Lock()
+ self._pendingResponses: dict[int, _PendingResponse] = {}
+ self._nextRequestId = 1
if not _gstreamerAvailable:
- msg = 'SOUND ERROR: Gstreamer is not available'
- debug.printMessage(debug.LEVEL_INFO, msg, True)
+ debug.printMessage(debug.LEVEL_INFO, 'SOUND ERROR: Gstreamer is not available', True)
return
- self.init()
-
- def _onPlayerMessage(self, bus: Any, message: Any) -> None: # bus: Gst.Bus, message: Gst.Message
- if message.type == Gst.MessageType.EOS:
- self._player.set_state(Gst.State.NULL)
- elif message.type == Gst.MessageType.ERROR:
- self._player.set_state(Gst.State.NULL)
- error, info = message.parse_error()
- msg = f'SOUND ERROR: {error}'
- debug.printMessage(debug.LEVEL_INFO, msg, True)
-
- def _onPipelineMessage(self, bus: Any, message: Any) -> None: # bus: Gst.Bus, message: Gst.Message
- if message.type == Gst.MessageType.EOS:
- self._pipeline.set_state(Gst.State.NULL)
- elif message.type == Gst.MessageType.ERROR:
- self._pipeline.set_state(Gst.State.NULL)
- error, info = message.parse_error()
- msg = f'SOUND ERROR: {error}'
- debug.printMessage(debug.LEVEL_INFO, msg, True)
-
- def _onTimeout(self, element: Any) -> bool: # element: Gst.Element
- element.set_state(Gst.State.NULL)
- return False
-
@staticmethod
def _get_configured_volume() -> float:
"""Returns the configured sound volume with a safe fallback."""
@@ -103,117 +94,18 @@ class Player:
return max(0.0, float(settings.soundVolume))
- def _playIcon(self, icon: Icon, interrupt: bool = True) -> None:
- """Plays a sound icon, interrupting the current play first unless specified."""
-
- if not self._player:
- return
-
- if interrupt:
- self._player.set_state(Gst.State.NULL)
-
- self._player.set_property('volume', self._get_configured_volume())
- self._player.set_property('uri', f'file://{icon.path}')
- self._player.set_state(Gst.State.PLAYING)
-
- def _playIconAndWait(self, icon: Icon, interrupt: bool = True, timeout_seconds: Optional[int] = 10) -> bool:
- """Plays a sound icon and waits for completion."""
-
- if not self._player:
- return False
-
- if interrupt:
- self._player.set_state(Gst.State.NULL)
-
- self._player.set_property('volume', self._get_configured_volume())
- self._player.set_property('uri', f'file://{icon.path}')
- self._player.set_state(Gst.State.PLAYING)
-
- bus = self._player.get_bus()
- if not bus:
- return False
-
- if timeout_seconds is None:
- timeout_ns = Gst.CLOCK_TIME_NONE
- else:
- timeout_ns = int(timeout_seconds * Gst.SECOND)
-
- message = bus.timed_pop_filtered(
- timeout_ns,
- Gst.MessageType.EOS | Gst.MessageType.ERROR
- )
- if message and message.type == Gst.MessageType.ERROR:
- error, info = message.parse_error()
- msg = f'SOUND ERROR: {error}'
- debug.printMessage(debug.LEVEL_INFO, msg, True)
-
- self._player.set_state(Gst.State.NULL)
- return message is not None and message.type == Gst.MessageType.EOS
-
- def _playTone(self, tone: Tone, interrupt: bool = True) -> None:
- """Plays a tone, interrupting the current play first unless specified."""
-
- if not self._pipeline or not self._source:
- return
-
- if interrupt:
- self._pipeline.set_state(Gst.State.NULL)
-
- self._pipeline.set_state(Gst.State.NULL)
- if self._volume is not None:
- self._volume.set_property('volume', tone.volume)
- self._source.set_property('volume', 1.0)
- else:
- self._source.set_property('volume', tone.volume)
- self._source.set_property('freq', tone.frequency)
- self._source.set_property('wave', tone.wave)
- self._pipeline.set_state(Gst.State.PLAYING)
- duration = int(1000 * tone.duration)
- GLib.timeout_add(duration, self._onTimeout, self._pipeline)
-
def init(self) -> None:
- """(Re)Initializes the Player."""
-
- if self._initialized:
- return
+ """(Re)Initializes the persistent worker."""
if not _gstreamerAvailable:
return
- self._player = Gst.ElementFactory.make('playbin', 'player')
- if self._player is None:
- msg = 'SOUND ERROR: Gstreamer is available, but player is None'
- debug.printMessage(debug.LEVEL_INFO, msg, True)
- return
+ with self._workerLock:
+ if self._ensureWorkerLocked():
+ self._initialized = True
+ clearSoundSystemFailure()
- bus = self._player.get_bus()
- bus.add_signal_watch()
- bus.connect("message", self._onPlayerMessage)
-
- self._pipeline = Gst.Pipeline(name='cthulhu-pipeline')
- bus = self._pipeline.get_bus()
- bus.add_signal_watch()
- bus.connect("message", self._onPipelineMessage)
-
- self._source = Gst.ElementFactory.make('audiotestsrc', 'src')
- self._volume = Gst.ElementFactory.make('volume', 'volume')
- self._sink = Gst.ElementFactory.make('autoaudiosink', 'output')
- if self._source is None or self._sink is None:
- return
-
- self._pipeline.add(self._source)
- if self._volume is not None:
- self._pipeline.add(self._volume)
- self._pipeline.add(self._sink)
- if self._volume is not None:
- self._source.link(self._volume)
- self._volume.link(self._sink)
- else:
- self._source.link(self._sink)
-
- self._initialized = True
-
- def play(self, item, interrupt=True):
+ def play(self, item: Any, interrupt: bool = True) -> None:
"""Plays a sound, interrupting the current play first unless specified."""
if isinstance(item, Icon):
@@ -221,59 +113,406 @@ class Player:
elif isinstance(item, Tone):
self._playTone(item, interrupt)
else:
- tokens = ["SOUND ERROR:", item, "is not an Icon or Tone"]
- debug.printTokens(debug.LEVEL_INFO, tokens, True)
+ debug.printTokens(debug.LEVEL_INFO, ["SOUND ERROR:", item, "is not an Icon or Tone"], True)
- def playAndWait(self, item, interrupt=True, timeout_seconds=10):
+ def playAndWait(self, item: Any, interrupt: bool = True, timeout_seconds: int = 10) -> bool:
"""Plays a sound and blocks until completion or timeout."""
- if not self._player:
- if _gstreamerAvailable and not self._initialized:
- self.init()
- if not self._player:
- return False
-
if isinstance(item, Icon):
- return self._playIconAndWait(
- item,
- interrupt=interrupt,
- timeout_seconds=timeout_seconds
- )
-
+ return self._playIconAndWait(item, interrupt=interrupt, timeout_seconds=timeout_seconds)
+ if isinstance(item, Tone):
+ return self._playToneAndWait(item, interrupt=interrupt, timeout_seconds=timeout_seconds)
self.play(item, interrupt)
return False
- def stop(self, element=None):
- """Stops play."""
+ def stop(self, _element: Any = None) -> None:
+ """Stops current sound playback."""
- if not _gstreamerAvailable:
- return
+ self._sendWorkerCommand({"action": "stop"}, waitForResponse=False)
- if element:
- element.set_state(Gst.State.NULL)
- return
-
- if self._player:
- self._player.set_state(Gst.State.NULL)
-
- if self._pipeline:
- self._pipeline.set_state(Gst.State.NULL)
-
- def shutdown(self):
+ def shutdown(self) -> None:
"""Shuts down the sound utilities."""
- global _gstreamerAvailable
if not _gstreamerAvailable:
return
- self.stop()
- self._initialized = False
- _gstreamerAvailable = False
+ with self._workerLock:
+ self._stopWorkerLocked("Sound system shutdown")
+ self._initialized = False
-_player = Player()
+ def _playIcon(self, icon: Icon, interrupt: bool = True) -> None:
+ if not icon.isValid():
+ return
+
+ success, reason = self._sendWorkerCommand(
+ {
+ "action": "play_file",
+ "path": icon.path,
+ "volume": self._get_configured_volume(),
+ "interrupt": interrupt,
+ },
+ waitForResponse=False,
+ )
+ if not success and reason:
+ debug.printMessage(debug.LEVEL_INFO, f"SOUND ERROR: {reason}", True)
+
+ def _playIconAndWait(
+ self,
+ icon: Icon,
+ interrupt: bool = True,
+ timeout_seconds: Optional[int] = 10,
+ ) -> bool:
+ if not icon.isValid():
+ return False
+
+ timeout = float((timeout_seconds or 10) + 2)
+ success, reason = self._sendWorkerCommand(
+ {
+ "action": "play_file",
+ "path": icon.path,
+ "volume": self._get_configured_volume(),
+ "interrupt": interrupt,
+ },
+ waitForResponse=True,
+ timeout=timeout,
+ )
+ if not success and reason:
+ debug.printMessage(debug.LEVEL_INFO, f"SOUND ERROR: {reason}", True)
+ return success
+
+ def _playTone(self, tone: Tone, interrupt: bool = True) -> None:
+ success, reason = self._sendWorkerCommand(
+ self._buildToneCommand(tone, interrupt),
+ waitForResponse=False,
+ )
+ if not success and reason:
+ debug.printMessage(debug.LEVEL_INFO, f"SOUND ERROR: {reason}", True)
+
+ def _playToneAndWait(
+ self,
+ tone: Tone,
+ interrupt: bool = True,
+ timeout_seconds: Optional[int] = 10,
+ ) -> bool:
+ timeout = max(float(timeout_seconds or 10), float(tone.duration) + 2.0)
+ success, reason = self._sendWorkerCommand(
+ self._buildToneCommand(tone, interrupt),
+ waitForResponse=True,
+ timeout=timeout,
+ )
+ if not success and reason:
+ debug.printMessage(debug.LEVEL_INFO, f"SOUND ERROR: {reason}", True)
+ return success
+
+ def _buildToneCommand(self, tone: Tone, interrupt: bool) -> dict[str, Any]:
+ return {
+ "action": "play_tone",
+ "duration": tone.duration,
+ "frequency": tone.frequency,
+ "volume": tone.volume,
+ "wave": tone.wave,
+ "interrupt": interrupt,
+ }
+
+ def _ensureWorkerLocked(self) -> bool:
+ configuredSink = sound_sink.get_configured_sound_sink()
+
+ if self._workerProcess is not None and self._workerProcess.poll() is None:
+ if self._workerRestartRequired:
+ reason = self._consumeWorkerRestartReason()
+ debug.printMessage(
+ debug.LEVEL_INFO,
+ f"SOUND: Restarting persistent worker after recovery request: {reason}",
+ True,
+ )
+ self._stopWorkerLocked(reason)
+ elif self._workerSink == configuredSink:
+ return True
+ else:
+ debug.printMessage(
+ debug.LEVEL_INFO,
+ f"SOUND: Restarting persistent worker for soundSink={configuredSink}",
+ True,
+ )
+ self._stopWorkerLocked("Sound sink changed")
+
+ return self._startWorkerLocked(configuredSink)
+
+ def _startWorkerLocked(self, configuredSink: str) -> bool:
+ environment = _buildSoundHelperEnvironment()
+ command = [
+ sys.executable,
+ "-m",
+ "cthulhu.sound_helper",
+ "--worker",
+ "--sound-sink",
+ configuredSink,
+ ]
+
+ try:
+ process = subprocess.Popen(
+ command,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ text=True,
+ bufsize=1,
+ env=environment,
+ )
+ except Exception as error:
+ reason = f"Failed to start persistent sound worker: {error}"
+ disableSoundSystem(reason)
+ debug.printMessage(debug.LEVEL_INFO, f"SOUND ERROR: {reason}", True)
+ return False
+
+ if process.stdin is None or process.stdout is None or process.stderr is None:
+ try:
+ process.terminate()
+ except Exception:
+ pass
+ reason = "Persistent sound worker is missing stdio pipes"
+ disableSoundSystem(reason)
+ debug.printMessage(debug.LEVEL_INFO, f"SOUND ERROR: {reason}", True)
+ return False
+
+ self._workerProcess = process
+ self._workerSink = configuredSink
+ self._workerRestartRequired = False
+ self._workerRestartReason = None
+ self._startWorkerThreads(process)
+
+ success, reason = self._sendWorkerCommandLocked(
+ {"action": "ping"},
+ waitForResponse=True,
+ timeout=2.0,
+ allowRestart=False,
+ )
+ if not success:
+ self._stopWorkerLocked(reason or "Persistent sound worker failed to initialize")
+ disableSoundSystem(reason or "Persistent sound worker failed to initialize")
+ if reason:
+ debug.printMessage(debug.LEVEL_INFO, f"SOUND ERROR: {reason}", True)
+ return False
+
+ debug.printMessage(
+ debug.LEVEL_INFO,
+ f"SOUND: Using persistent worker for icon playback (soundSink={configuredSink})",
+ True,
+ )
+ debug.printMessage(
+ debug.LEVEL_INFO,
+ f"SOUND: Using persistent worker for tone playback (soundSink={configuredSink})",
+ True,
+ )
+ return True
+
+ def _startWorkerThreads(self, process: subprocess.Popen[str]) -> None:
+ threading.Thread(
+ target=self._readWorkerStdout,
+ args=(process,),
+ daemon=True,
+ ).start()
+ threading.Thread(
+ target=self._readWorkerStderr,
+ args=(process,),
+ daemon=True,
+ ).start()
+ threading.Thread(
+ target=self._watchWorkerExit,
+ args=(process,),
+ daemon=True,
+ ).start()
+
+ def _readWorkerStdout(self, process: subprocess.Popen[str]) -> None:
+ assert process.stdout is not None
+ for line in process.stdout:
+ line = line.strip()
+ if not line:
+ continue
+ try:
+ response = json.loads(line)
+ except Exception:
+ debug.printMessage(debug.LEVEL_INFO, f"SOUND ERROR: Invalid worker response: {line}", True)
+ continue
+
+ requestId = response.get("id")
+ if requestId is None:
+ continue
+
+ with self._responseLock:
+ pending = self._pendingResponses.pop(int(requestId), None)
+
+ if pending is None:
+ continue
+
+ pending.response = response
+ pending.event.set()
+
+ def _readWorkerStderr(self, process: subprocess.Popen[str]) -> None:
+ assert process.stderr is not None
+ for line in process.stderr:
+ message = line.strip()
+ if message:
+ self._handleWorkerDiagnostic(message)
+ debug.printMessage(debug.LEVEL_INFO, f"SOUND WORKER: {message}", True)
+
+ def _watchWorkerExit(self, process: subprocess.Popen[str]) -> None:
+ returnCode = process.wait()
+ reason = _formatWorkerFailure(returnCode)
+
+ with self._workerLock:
+ if self._workerProcess is process:
+ self._workerProcess = None
+ self._workerSink = None
+
+ self._failPendingResponses(reason)
+
+ if returnCode != 0:
+ debug.printMessage(debug.LEVEL_INFO, f"SOUND ERROR: {reason}", True)
+
+ def _failPendingResponses(self, reason: str) -> None:
+ with self._responseLock:
+ pendingResponses = list(self._pendingResponses.values())
+ self._pendingResponses.clear()
+
+ for pending in pendingResponses:
+ pending.response = {"ok": False, "error": reason}
+ pending.event.set()
+
+ def _stopWorkerLocked(self, reason: str) -> None:
+ process = self._workerProcess
+ self._workerProcess = None
+ self._workerSink = None
+ self._workerRestartRequired = False
+ self._workerRestartReason = None
+
+ if process is None:
+ return
+
+ if process.poll() is None:
+ try:
+ assert process.stdin is not None
+ process.stdin.write(json.dumps({"action": "shutdown"}) + "\n")
+ process.stdin.flush()
+ process.wait(timeout=1.0)
+ except Exception:
+ try:
+ process.terminate()
+ process.wait(timeout=1.0)
+ except Exception:
+ try:
+ process.kill()
+ process.wait(timeout=1.0)
+ except Exception:
+ pass
+
+ self._failPendingResponses(reason)
+
+ def _sendWorkerCommand(
+ self,
+ command: dict[str, Any],
+ waitForResponse: bool,
+ timeout: float = 2.0,
+ ) -> Tuple[bool, Optional[str]]:
+ with self._workerLock:
+ return self._sendWorkerCommandLocked(command, waitForResponse, timeout)
+
+ def _sendWorkerCommandLocked(
+ self,
+ command: dict[str, Any],
+ waitForResponse: bool,
+ timeout: float = 2.0,
+ allowRestart: bool = True,
+ ) -> Tuple[bool, Optional[str]]:
+ if not self._ensureWorkerLocked():
+ return False, getSoundSystemFailureReason() or "Persistent sound worker is unavailable"
+
+ process = self._workerProcess
+ if process is None or process.stdin is None:
+ if allowRestart:
+ self._stopWorkerLocked("Worker pipes disappeared")
+ if self._ensureWorkerLocked():
+ return self._sendWorkerCommandLocked(command, waitForResponse, timeout, allowRestart=False)
+ return False, "Persistent sound worker stdin is unavailable"
+
+ requestId: Optional[int] = None
+ pending: Optional[_PendingResponse] = None
+ payload = dict(command)
+
+ if waitForResponse:
+ requestId = self._allocateRequestId()
+ pending = _PendingResponse()
+ payload["id"] = requestId
+ with self._responseLock:
+ self._pendingResponses[requestId] = pending
+
+ try:
+ process.stdin.write(json.dumps(payload) + "\n")
+ process.stdin.flush()
+ except Exception as error:
+ if requestId is not None:
+ with self._responseLock:
+ self._pendingResponses.pop(requestId, None)
+ if allowRestart:
+ self._stopWorkerLocked(f"Worker write failed: {error}")
+ if self._ensureWorkerLocked():
+ return self._sendWorkerCommandLocked(command, waitForResponse, timeout, allowRestart=False)
+ return False, f"Worker write failed: {error}"
+
+ if not waitForResponse or pending is None:
+ return True, None
+
+ if pending.event.wait(timeout):
+ response = pending.response or {"ok": False, "error": "No worker response"}
+ if not bool(response.get("ok")):
+ self._maybeMarkWorkerRestartRequired(response.get("error"))
+ return bool(response.get("ok")), response.get("error")
+
+ with self._responseLock:
+ self._pendingResponses.pop(requestId, None)
+
+ if allowRestart and (self._workerProcess is None or self._workerProcess.poll() is not None):
+ self._stopWorkerLocked("Worker exited while waiting for response")
+ if self._ensureWorkerLocked():
+ return self._sendWorkerCommandLocked(command, waitForResponse, timeout, allowRestart=False)
+
+ reason = f"Persistent sound worker timed out after {timeout:.1f} seconds"
+ self._markWorkerRestartRequired(reason)
+ return False, reason
+
+ def _allocateRequestId(self) -> int:
+ with self._responseLock:
+ requestId = self._nextRequestId
+ self._nextRequestId += 1
+ return requestId
+
+ def _handleWorkerDiagnostic(self, message: str) -> None:
+ normalizedMessage = str(message).strip()
+ if not normalizedMessage.startswith("RECOVERY REQUIRED:"):
+ return
+
+ reason = normalizedMessage.split(":", 1)[1].strip() or "Worker requested recovery"
+ self._markWorkerRestartRequired(reason)
+
+ def _markWorkerRestartRequired(self, reason: Optional[str]) -> None:
+ normalizedReason = str(reason or "").strip() or "Worker requested recovery"
+ with self._workerLock:
+ self._workerRestartRequired = True
+ self._workerRestartReason = normalizedReason
+
+ def _consumeWorkerRestartReason(self) -> str:
+ reason = self._workerRestartReason or "Worker requested recovery"
+ self._workerRestartRequired = False
+ self._workerRestartReason = None
+ return reason
+
+ def _maybeMarkWorkerRestartRequired(self, reason: Optional[str]) -> None:
+ normalizedReason = str(reason or "").strip().lower()
+ if normalizedReason in {"", "interrupted", "stopped", "shutdown", "stdin closed"}:
+ return
+
+ self._markWorkerRestartRequired(reason)
-def getPlayer():
- return _player
def disableSoundSystem(reason: str) -> None:
global _soundSystemFailureReason
@@ -282,11 +521,64 @@ def disableSoundSystem(reason: str) -> None:
return
_soundSystemFailureReason = reason
- msg = f"SOUND: Disabling sound system. Reason: {reason}"
- debug.printMessage(debug.LEVEL_INFO, msg, True)
+ debug.printMessage(debug.LEVEL_INFO, f"SOUND: Disabling sound system. Reason: {reason}", True)
+
def getSoundSystemFailureReason() -> Optional[str]:
return _soundSystemFailureReason
+
def isSoundSystemAvailable() -> bool:
return _soundSystemFailureReason is None
+
+
+def clearSoundSystemFailure() -> None:
+ global _soundSystemFailureReason
+ _soundSystemFailureReason = None
+
+
+def _buildSoundHelperEnvironment() -> dict[str, str]:
+ pythonPathEntries = []
+ for path in sys.path:
+ if not path:
+ path = os.getcwd()
+ if os.path.isdir(path) and path not in pythonPathEntries:
+ pythonPathEntries.append(path)
+
+ environment = os.environ.copy()
+ if pythonPathEntries:
+ environment["PYTHONPATH"] = os.pathsep.join(pythonPathEntries)
+ return environment
+
+
+def _formatWorkerFailure(returnCode: int) -> str:
+ if returnCode < 0:
+ return f"Sound worker exited via signal {-returnCode}"
+ return f"Sound worker exited with status {returnCode}"
+
+
+_player = Player()
+
+
+def getPlayer() -> Player:
+ return _player
+
+
+def play(item: Any, interrupt: bool = True) -> None:
+ _player.play(item, interrupt)
+
+
+def playIconSafely(icon: Icon, timeoutSeconds: int = 10) -> Tuple[bool, Optional[str]]:
+ if not icon.isValid():
+ return False, f"Invalid sound icon: {icon.path}"
+
+ return _player._sendWorkerCommand(
+ {
+ "action": "play_file",
+ "path": icon.path,
+ "volume": Player._get_configured_volume(),
+ "interrupt": True,
+ },
+ waitForResponse=True,
+ timeout=max(0.1, float(timeoutSeconds)),
+ )
diff --git a/src/cthulhu/sound_helper.py b/src/cthulhu/sound_helper.py
new file mode 100644
index 0000000..2e1ec80
--- /dev/null
+++ b/src/cthulhu/sound_helper.py
@@ -0,0 +1,443 @@
+#!/usr/bin/env python3
+#
+# Copyright (c) 2026 Stormux
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+
+"""Persistent sound helper used to isolate theme and tone playback crashes."""
+
+import argparse
+from collections import deque
+import json
+import pathlib
+import sys
+import threading
+from typing import Any, Optional
+
+import gi
+
+gi.require_version('GLib', '2.0')
+gi.require_version('Gst', '1.0')
+from gi.repository import GLib, Gst
+
+from . import settings
+from . import sound_sink
+
+
+def _clamp_volume(volume: Any) -> float:
+ try:
+ return max(0.0, float(volume))
+ except Exception:
+ return 1.0
+
+
+def _write_json_line(payload: dict[str, Any]) -> None:
+ sys.stdout.write(json.dumps(payload) + "\n")
+ sys.stdout.flush()
+
+
+def _report_recovery_required(message: str) -> None:
+ print(f"RECOVERY REQUIRED: {message}", file=sys.stderr, flush=True)
+
+
+def _create_file_player(
+ playerId: str,
+ soundSink: Optional[str] = None,
+) -> tuple[Optional[Any], Optional[str], Optional[str]]:
+ player = Gst.ElementFactory.make("playbin", playerId)
+ if player is None:
+ return None, None, "Failed to create playbin for file playback"
+
+ configuredSink = sound_sink.normalize_sound_sink_choice(soundSink)
+ if configuredSink == settings.SOUND_SINK_AUTO:
+ return player, "playbin-default", None
+
+ audioSink, sinkName, sinkError = sound_sink.create_audio_sink(
+ f"{playerId}-output",
+ configuredSink,
+ )
+ if audioSink is None:
+ return None, sinkName, sinkError or f"Failed to create audio sink for {configuredSink}"
+
+ player.set_property("audio-sink", audioSink)
+ return player, sinkName or configuredSink, None
+
+
+class SoundWorker:
+ """Runs a long-lived GStreamer worker for icon and tone playback."""
+
+ def __init__(self, soundSink: Optional[str] = None) -> None:
+ available, _args = Gst.init_check(None)
+ if not available:
+ raise RuntimeError("GStreamer is not available")
+
+ self._loop = GLib.MainLoop()
+ self._queue: deque[dict[str, Any]] = deque()
+ self._currentCommand: Optional[dict[str, Any]] = None
+ self._toneTimeoutId = 0
+
+ self._filePlayer, fileSinkName, fileSinkError = _create_file_player(
+ "cthulhu-sound-worker-file",
+ soundSink,
+ )
+ if self._filePlayer is None:
+ raise RuntimeError(fileSinkError or "Failed to create file playback player")
+
+ fileBus = self._filePlayer.get_bus()
+ if fileBus is None:
+ raise RuntimeError("No bus available for file playback player")
+ fileBus.add_signal_watch()
+ fileBus.connect("message", self._onFileMessage)
+
+ self._tonePipeline = Gst.Pipeline.new('cthulhu-sound-worker-tone')
+ if self._tonePipeline is None:
+ raise RuntimeError("Failed to create tone pipeline")
+
+ self._toneSource = Gst.ElementFactory.make('audiotestsrc', 'cthulhu-sound-worker-source')
+ self._toneVolume = Gst.ElementFactory.make('volume', 'cthulhu-sound-worker-volume')
+ toneSink, toneSinkName, toneSinkError = sound_sink.create_audio_sink(
+ 'cthulhu-sound-worker-tone-output',
+ soundSink
+ )
+ if self._toneSource is None or toneSink is None:
+ raise RuntimeError(toneSinkError or "Failed to create tone playback pipeline")
+
+ self._toneSink = toneSink
+ self._tonePipeline.add(self._toneSource)
+ if self._toneVolume is not None:
+ self._tonePipeline.add(self._toneVolume)
+ self._tonePipeline.add(self._toneSink)
+ if self._toneVolume is not None:
+ if not self._toneSource.link(self._toneVolume):
+ raise RuntimeError("Failed to link tone source to volume")
+ if not self._toneVolume.link(self._toneSink):
+ raise RuntimeError("Failed to link tone volume to sink")
+ elif not self._toneSource.link(self._toneSink):
+ raise RuntimeError("Failed to link tone source to sink")
+
+ toneBus = self._tonePipeline.get_bus()
+ if toneBus is None:
+ raise RuntimeError("No bus available for tone pipeline")
+ toneBus.add_signal_watch()
+ toneBus.connect("message", self._onToneMessage)
+
+ self._fileSinkName = fileSinkName or "unknown"
+ self._toneSinkName = toneSinkName or "unknown"
+
+ def run(self) -> int:
+ self._report_worker_ready()
+ readerThread = threading.Thread(target=self._read_commands, daemon=True)
+ readerThread.start()
+ try:
+ self._loop.run()
+ return 0
+ finally:
+ self._clear_tone_timeout()
+ self._filePlayer.set_state(Gst.State.NULL)
+ self._tonePipeline.set_state(Gst.State.NULL)
+
+ def _report_worker_ready(self) -> None:
+ print(
+ f"Persistent sound worker ready: file={self._fileSinkName} tone={self._toneSinkName}",
+ file=sys.stderr,
+ flush=True,
+ )
+
+ def _read_commands(self) -> None:
+ try:
+ for line in sys.stdin:
+ line = line.strip()
+ if not line:
+ continue
+ try:
+ command = json.loads(line)
+ except Exception as error:
+ print(f"Invalid worker command: {error}", file=sys.stderr, flush=True)
+ continue
+ GLib.idle_add(self._handle_command, command)
+ finally:
+ GLib.idle_add(self._quit_from_stdin_close)
+
+ def _quit_from_stdin_close(self) -> bool:
+ self._interrupt_current("stdin closed")
+ self._interrupt_queued("stdin closed")
+ self._loop.quit()
+ return False
+
+ def _handle_command(self, command: dict[str, Any]) -> bool:
+ action = str(command.get("action", "")).strip().lower()
+ requestId = command.get("id")
+
+ if action == "ping":
+ self._respond(requestId, True)
+ return False
+
+ if action == "shutdown":
+ self._interrupt_current("shutdown")
+ self._interrupt_queued("shutdown")
+ self._respond(requestId, True)
+ self._loop.quit()
+ return False
+
+ if action == "stop":
+ self._interrupt_current("stopped")
+ self._interrupt_queued("stopped")
+ self._respond(requestId, True)
+ return False
+
+ if action not in {"play_file", "play_tone"}:
+ self._respond(requestId, False, f"Unknown worker action: {action}")
+ return False
+
+ interrupt = bool(command.get("interrupt", True))
+ if interrupt:
+ self._interrupt_current("interrupted")
+ self._interrupt_queued("interrupted")
+ elif self._currentCommand is not None:
+ self._queue.append(command)
+ return False
+
+ if self._currentCommand is None:
+ self._start_command(command)
+ else:
+ self._queue.appendleft(command)
+
+ return False
+
+ def _start_command(self, command: dict[str, Any]) -> None:
+ action = command["action"]
+ if action == "play_file":
+ self._start_file_command(command)
+ return
+ if action == "play_tone":
+ self._start_tone_command(command)
+ return
+ self._respond(command.get("id"), False, f"Unsupported worker action: {action}")
+ self._start_next_command()
+
+ def _start_file_command(self, command: dict[str, Any]) -> None:
+ soundPath = pathlib.Path(str(command.get("path", ""))).expanduser()
+ if not soundPath.is_file():
+ self._respond(command.get("id"), False, f"Missing sound file: {soundPath}")
+ self._start_next_command()
+ return
+
+ self._clear_tone_timeout()
+ self._tonePipeline.set_state(Gst.State.NULL)
+ self._filePlayer.set_state(Gst.State.NULL)
+
+ self._currentCommand = command
+ self._filePlayer.set_property("uri", soundPath.resolve().as_uri())
+ self._filePlayer.set_property("volume", _clamp_volume(command.get("volume", 1.0)))
+ stateChange = self._filePlayer.set_state(Gst.State.PLAYING)
+ if stateChange == Gst.StateChangeReturn.FAILURE:
+ self._currentCommand = None
+ reason = f"Failed to start playback for {soundPath}"
+ _report_recovery_required(reason)
+ self._respond(command.get("id"), False, reason)
+ self._start_next_command()
+
+ def _start_tone_command(self, command: dict[str, Any]) -> None:
+ self._filePlayer.set_state(Gst.State.NULL)
+ self._tonePipeline.set_state(Gst.State.NULL)
+ self._clear_tone_timeout()
+
+ try:
+ durationSeconds = max(0.0, float(command.get("duration", 0.0)))
+ frequency = max(0, min(20000, int(command.get("frequency", 0))))
+ wave = int(command.get("wave", 0))
+ except Exception as error:
+ self._respond(command.get("id"), False, f"Invalid tone request: {error}")
+ self._start_next_command()
+ return
+
+ self._currentCommand = command
+ if self._toneVolume is not None:
+ self._toneVolume.set_property('volume', _clamp_volume(command.get("volume", 1.0)))
+ self._toneSource.set_property('volume', 1.0)
+ else:
+ self._toneSource.set_property('volume', _clamp_volume(command.get("volume", 1.0)))
+ self._toneSource.set_property('freq', frequency)
+ self._toneSource.set_property('wave', wave)
+
+ stateChange = self._tonePipeline.set_state(Gst.State.PLAYING)
+ if stateChange == Gst.StateChangeReturn.FAILURE:
+ self._currentCommand = None
+ reason = "Failed to start tone playback"
+ _report_recovery_required(reason)
+ self._respond(command.get("id"), False, reason)
+ self._start_next_command()
+ return
+
+ durationMs = max(1, int(durationSeconds * 1000))
+ self._toneTimeoutId = GLib.timeout_add(durationMs, self._finish_tone_playback)
+
+ def _finish_tone_playback(self) -> bool:
+ self._toneTimeoutId = 0
+ self._tonePipeline.set_state(Gst.State.NULL)
+ self._finish_current(True)
+ return False
+
+ def _clear_tone_timeout(self) -> None:
+ if self._toneTimeoutId:
+ GLib.source_remove(self._toneTimeoutId)
+ self._toneTimeoutId = 0
+
+ def _interrupt_current(self, reason: str) -> None:
+ if self._currentCommand is None:
+ self._filePlayer.set_state(Gst.State.NULL)
+ self._tonePipeline.set_state(Gst.State.NULL)
+ self._clear_tone_timeout()
+ return
+
+ current = self._currentCommand
+ self._currentCommand = None
+ self._filePlayer.set_state(Gst.State.NULL)
+ self._tonePipeline.set_state(Gst.State.NULL)
+ self._clear_tone_timeout()
+ self._respond(current.get("id"), False, reason)
+
+ def _interrupt_queued(self, reason: str) -> None:
+ while self._queue:
+ queued = self._queue.popleft()
+ self._respond(queued.get("id"), False, reason)
+
+ def _finish_current(self, success: bool, error: Optional[str] = None) -> None:
+ current = self._currentCommand
+ self._currentCommand = None
+ if current is not None:
+ self._respond(current.get("id"), success, error)
+ self._start_next_command()
+
+ def _start_next_command(self) -> None:
+ if self._currentCommand is not None or not self._queue:
+ return
+ self._start_command(self._queue.popleft())
+
+ def _respond(self, requestId: Any, success: bool, error: Optional[str] = None) -> None:
+ if requestId is None:
+ return
+
+ payload: dict[str, Any] = {
+ "id": requestId,
+ "ok": bool(success),
+ }
+ if error:
+ payload["error"] = str(error)
+ _write_json_line(payload)
+
+ def _onFileMessage(self, _bus: Any, message: Any) -> None:
+ if self._currentCommand is None or self._currentCommand.get("action") != "play_file":
+ return
+
+ if message.type == Gst.MessageType.EOS:
+ self._filePlayer.set_state(Gst.State.NULL)
+ self._finish_current(True)
+ return
+
+ if message.type == Gst.MessageType.ERROR:
+ self._filePlayer.set_state(Gst.State.NULL)
+ error, _info = message.parse_error()
+ _report_recovery_required(str(error))
+ print(str(error), file=sys.stderr, flush=True)
+ self._finish_current(False, str(error))
+
+ def _onToneMessage(self, _bus: Any, message: Any) -> None:
+ if self._currentCommand is None or self._currentCommand.get("action") != "play_tone":
+ return
+
+ if message.type != Gst.MessageType.ERROR:
+ return
+
+ self._tonePipeline.set_state(Gst.State.NULL)
+ self._clear_tone_timeout()
+ error, _info = message.parse_error()
+ _report_recovery_required(str(error))
+ print(str(error), file=sys.stderr, flush=True)
+ self._finish_current(False, str(error))
+
+
+def play_file_once(
+ soundPath: str,
+ timeoutSeconds: int = 10,
+ soundSink: Optional[str] = None,
+ volume: float = 1.0,
+) -> int:
+ available, _args = Gst.init_check(None)
+ if not available:
+ print("GStreamer is not available", file=sys.stderr)
+ return 1
+
+ player, sinkName, sinkError = _create_file_player(
+ "cthulhu-sound-helper-once",
+ soundSink,
+ )
+ if player is None:
+ print(sinkError or "Failed to create one-shot file playback player", file=sys.stderr)
+ return 1
+
+ player.set_property("uri", pathlib.Path(soundPath).resolve().as_uri())
+ player.set_property("volume", _clamp_volume(volume))
+ player.set_state(Gst.State.PLAYING)
+
+ bus = player.get_bus()
+ if bus is None:
+ player.set_state(Gst.State.NULL)
+ print("No bus available for one-shot playback", file=sys.stderr)
+ return 1
+
+ timeoutNs = int(timeoutSeconds * Gst.SECOND)
+ try:
+ message = bus.timed_pop_filtered(
+ timeoutNs,
+ Gst.MessageType.EOS | Gst.MessageType.ERROR
+ )
+ if message is None:
+ print(
+ f"Sound helper timed out after {timeoutSeconds} seconds on sink {sinkName}",
+ file=sys.stderr
+ )
+ return 1
+ if message.type == Gst.MessageType.ERROR:
+ error, _info = message.parse_error()
+ print(str(error), file=sys.stderr)
+ return 1
+ return 0
+ finally:
+ player.set_state(Gst.State.NULL)
+
+
+def main() -> int:
+ parser = argparse.ArgumentParser(add_help=False)
+ parser.add_argument("--worker", action="store_true")
+ parser.add_argument("--play-file")
+ parser.add_argument("--sound-sink")
+ parser.add_argument("--volume", type=float, default=1.0)
+ parser.add_argument("--timeout-seconds", type=int, default=10)
+ args, _unknown = parser.parse_known_args()
+
+ if args.worker:
+ worker = SoundWorker(args.sound_sink)
+ return worker.run()
+
+ if args.play_file:
+ return play_file_once(
+ args.play_file,
+ args.timeout_seconds,
+ args.sound_sink,
+ args.volume,
+ )
+
+ return 0
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
diff --git a/src/cthulhu/sound_sink.py b/src/cthulhu/sound_sink.py
new file mode 100644
index 0000000..f78a7c4
--- /dev/null
+++ b/src/cthulhu/sound_sink.py
@@ -0,0 +1,115 @@
+#!/usr/bin/env python3
+#
+# Copyright (c) 2026 Stormux
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+
+"""Utilities for selecting GStreamer audio sinks."""
+
+from __future__ import annotations
+
+from typing import Any, Optional, Tuple
+
+import gi
+
+try:
+ gi.require_version('Gst', '1.0')
+ from gi.repository import Gst
+except Exception:
+ _gstreamerAvailable: bool = False
+else:
+ _gstreamerAvailable, _args = Gst.init_check(None)
+
+from . import settings
+
+_SINK_ELEMENT_BY_SETTING = {
+ settings.SOUND_SINK_PIPEWIRE: "pipewiresink",
+ settings.SOUND_SINK_PULSE: "pulsesink",
+ settings.SOUND_SINK_ALSA: "alsasink",
+}
+
+_AUTO_SINK_ELEMENTS = [
+ "autoaudiosink",
+ "pipewiresink",
+ "pulsesink",
+ "alsasink",
+]
+
+
+def normalize_sound_sink_choice(soundSink: Optional[str]) -> str:
+ """Return a valid sound sink setting value."""
+
+ if soundSink is None:
+ return settings.SOUND_SINK_AUTO
+
+ choice = str(soundSink).strip().lower()
+ if choice in settings.SOUND_SINK_VALUES:
+ return choice
+
+ return settings.SOUND_SINK_AUTO
+
+
+def get_configured_sound_sink() -> str:
+ """Return the configured sound sink, falling back to defaults."""
+
+ try:
+ from . import cthulhu
+ if cthulhu.cthulhuApp is not None:
+ configured = cthulhu.cthulhuApp.settingsManager.getSetting("soundSink")
+ return normalize_sound_sink_choice(configured)
+ except Exception:
+ pass
+
+ return normalize_sound_sink_choice(getattr(settings, "soundSink", settings.SOUND_SINK_AUTO))
+
+
+def _get_sink_element_candidates(soundSink: str) -> list[str]:
+ if soundSink == settings.SOUND_SINK_AUTO:
+ return list(_AUTO_SINK_ELEMENTS)
+
+ elementName = _SINK_ELEMENT_BY_SETTING.get(soundSink)
+ if elementName:
+ return [elementName]
+
+ return list(_AUTO_SINK_ELEMENTS)
+
+
+def create_audio_sink(
+ sinkId: str,
+ soundSink: Optional[str] = None,
+) -> Tuple[Optional[Any], Optional[str], Optional[str]]:
+ """Create a configured GStreamer audio sink."""
+
+ if not _gstreamerAvailable:
+ return None, None, "GStreamer is not available"
+
+ configuredSink = normalize_sound_sink_choice(soundSink or get_configured_sound_sink())
+ triedCandidates = []
+
+ for elementName in _get_sink_element_candidates(configuredSink):
+ factory = Gst.ElementFactory.find(elementName)
+ if factory is None:
+ triedCandidates.append(f"{elementName} (unavailable)")
+ continue
+
+ sink = Gst.ElementFactory.make(elementName, sinkId)
+ if sink is None:
+ triedCandidates.append(f"{elementName} (failed to create)")
+ continue
+
+ return sink, elementName, None
+
+ triedText = ", ".join(triedCandidates) if triedCandidates else "none"
+ reason = (
+ f"No usable audio sink for soundSink={configuredSink}. "
+ f"Tried: {triedText}"
+ )
+ return None, None, reason
diff --git a/src/cthulhu/sound_theme_manager.py b/src/cthulhu/sound_theme_manager.py
index e963030..c66efd3 100644
--- a/src/cthulhu/sound_theme_manager.py
+++ b/src/cthulhu/sound_theme_manager.py
@@ -317,7 +317,7 @@ class SoundThemeManager:
return None
def _playThemeSound(self, soundName, interrupt=True, wait=False,
- requireSoundSetting=False):
+ requireSoundSetting=False, timeoutSeconds=10):
"""Play a themed sound with optional gating and blocking.
Args:
@@ -349,14 +349,17 @@ class SoundThemeManager:
try:
icon = Icon(os.path.dirname(soundPath), os.path.basename(soundPath))
if icon.isValid():
- player = sound.getPlayer()
- if wait and hasattr(player, "playAndWait"):
- success = player.playAndWait(icon, interrupt=interrupt)
+ if wait:
+ success, reason = sound.playIconSafely(icon, timeoutSeconds=timeoutSeconds)
if not success:
- reason = "Failed to play theme sound via GStreamer"
- sound.disableSoundSystem(reason)
- self.app.getSettingsManager().setSetting('enableSound', False)
+ failureReason = reason or f"Failed to play sound '{soundName}'"
+ msg = (
+ "SOUND THEME: Failed to play helper-isolated sound "
+ f"'{soundName}'. Continuing with sound enabled. Reason: {failureReason}"
+ )
+ debug.printMessage(debug.LEVEL_INFO, msg, True)
return success
+ player = sound.getPlayer()
player.play(icon, interrupt=interrupt)
return True
except Exception as e:
@@ -365,7 +368,7 @@ class SoundThemeManager:
return False
- def playSound(self, soundName, interrupt=True, wait=False):
+ def playSound(self, soundName, interrupt=True, wait=False, timeoutSeconds=10):
"""Play a sound from the current theme if enabled.
Args:
@@ -380,7 +383,8 @@ class SoundThemeManager:
soundName,
interrupt=interrupt,
wait=wait,
- requireSoundSetting=True
+ requireSoundSetting=True,
+ timeoutSeconds=timeoutSeconds
)
def playFocusModeSound(self):
@@ -395,22 +399,24 @@ class SoundThemeManager:
"""Play sound for button focus (future use)."""
return self.playSound(SOUND_BUTTON)
- def playStartSound(self, wait=False):
+ def playStartSound(self, wait=False, timeoutSeconds=10):
"""Play sound for application startup."""
return self._playThemeSound(
SOUND_START,
interrupt=True,
wait=wait,
- requireSoundSetting=True
+ requireSoundSetting=True,
+ timeoutSeconds=timeoutSeconds
)
- def playStopSound(self, wait=False):
+ def playStopSound(self, wait=False, timeoutSeconds=10):
"""Play sound for application shutdown."""
return self._playThemeSound(
SOUND_STOP,
interrupt=True,
wait=wait,
- requireSoundSetting=True
+ requireSoundSetting=True,
+ timeoutSeconds=timeoutSeconds
)
_manager = None
diff --git a/tests/test_sound_helper_backend.py b/tests/test_sound_helper_backend.py
new file mode 100644
index 0000000..ceafd7d
--- /dev/null
+++ b/tests/test_sound_helper_backend.py
@@ -0,0 +1,71 @@
+import sys
+import unittest
+from pathlib import Path
+from unittest import mock
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
+
+from cthulhu import settings
+from cthulhu import sound_helper
+
+
+class _FakePlaybin:
+ def __init__(self):
+ self.properties = {}
+
+ def set_property(self, name, value):
+ self.properties[name] = value
+
+
+class SoundHelperBackendTests(unittest.TestCase):
+ def test_create_file_player_uses_playbin_default_sink_for_auto(self):
+ fakePlaybin = _FakePlaybin()
+
+ with (
+ mock.patch.object(
+ sound_helper.Gst.ElementFactory,
+ "make",
+ return_value=fakePlaybin,
+ ) as makeElement,
+ mock.patch.object(
+ sound_helper.sound_sink,
+ "create_audio_sink",
+ ) as createAudioSink,
+ ):
+ player, sinkName, error = sound_helper._create_file_player("worker-file", settings.SOUND_SINK_AUTO)
+
+ self.assertIs(player, fakePlaybin)
+ self.assertEqual(sinkName, "playbin-default")
+ self.assertIsNone(error)
+ createAudioSink.assert_not_called()
+ makeElement.assert_called_once_with("playbin", "worker-file")
+ self.assertNotIn("audio-sink", fakePlaybin.properties)
+
+ def test_create_file_player_sets_explicit_sink_when_requested(self):
+ fakePlaybin = _FakePlaybin()
+ fakeSink = object()
+
+ with (
+ mock.patch.object(
+ sound_helper.Gst.ElementFactory,
+ "make",
+ return_value=fakePlaybin,
+ ) as makeElement,
+ mock.patch.object(
+ sound_helper.sound_sink,
+ "create_audio_sink",
+ return_value=(fakeSink, "pulsesink", None),
+ ) as createAudioSink,
+ ):
+ player, sinkName, error = sound_helper._create_file_player("worker-file", settings.SOUND_SINK_PULSE)
+
+ self.assertIs(player, fakePlaybin)
+ self.assertEqual(sinkName, "pulsesink")
+ self.assertIsNone(error)
+ makeElement.assert_called_once_with("playbin", "worker-file")
+ createAudioSink.assert_called_once()
+ self.assertIs(fakePlaybin.properties["audio-sink"], fakeSink)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/test_sound_recovery.py b/tests/test_sound_recovery.py
new file mode 100644
index 0000000..86910b2
--- /dev/null
+++ b/tests/test_sound_recovery.py
@@ -0,0 +1,93 @@
+import sys
+import types
+import unittest
+from pathlib import Path
+from unittest import mock
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
+
+soundGeneratorStub = types.ModuleType("cthulhu.sound_generator")
+
+
+class _Icon:
+ def __init__(self, path="", name=""):
+ self.path = path
+ self.name = name
+
+ def isValid(self):
+ return True
+
+
+class _Tone:
+ def __init__(self, duration=0.1, frequency=440, volume=1.0, wave=0):
+ self.duration = duration
+ self.frequency = frequency
+ self.volume = volume
+ self.wave = wave
+
+
+soundGeneratorStub.Icon = _Icon
+soundGeneratorStub.Tone = _Tone
+sys.modules.setdefault("cthulhu.sound_generator", soundGeneratorStub)
+
+from cthulhu import settings
+from cthulhu import sound
+from cthulhu import sound_sink
+
+
+class _FakeProcess:
+ def poll(self):
+ return None
+
+
+class SoundSinkTests(unittest.TestCase):
+ def test_auto_sink_prefers_autoaudiosink(self):
+ candidates = sound_sink._get_sink_element_candidates(settings.SOUND_SINK_AUTO)
+ self.assertGreaterEqual(len(candidates), 1)
+ self.assertEqual(candidates[0], "autoaudiosink")
+
+
+class PlayerRecoveryTests(unittest.TestCase):
+ def test_worker_diagnostic_marks_restart_required(self):
+ player = sound.Player()
+
+ player._handleWorkerDiagnostic("RECOVERY REQUIRED: lost audio sink")
+
+ self.assertTrue(player._workerRestartRequired)
+ self.assertEqual(player._workerRestartReason, "lost audio sink")
+
+ def test_ensure_worker_restarts_when_recovery_is_required(self):
+ player = sound.Player()
+ player._workerProcess = _FakeProcess()
+ player._workerSink = settings.SOUND_SINK_AUTO
+ player._workerRestartRequired = True
+ player._workerRestartReason = "lost audio sink"
+
+ stopReasons = []
+ startedSinks = []
+
+ with (
+ mock.patch.object(
+ sound_sink,
+ "get_configured_sound_sink",
+ return_value=settings.SOUND_SINK_AUTO,
+ ),
+ mock.patch.object(
+ player,
+ "_stopWorkerLocked",
+ side_effect=lambda reason: stopReasons.append(reason),
+ ),
+ mock.patch.object(
+ player,
+ "_startWorkerLocked",
+ side_effect=lambda configuredSink: startedSinks.append(configuredSink) or True,
+ ),
+ ):
+ self.assertTrue(player._ensureWorkerLocked())
+
+ self.assertEqual(stopReasons, ["lost audio sink"])
+ self.assertEqual(startedSinks, [settings.SOUND_SINK_AUTO])
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/test_web_router_regressions.py b/tests/test_web_router_regressions.py
new file mode 100644
index 0000000..31c1782
--- /dev/null
+++ b/tests/test_web_router_regressions.py
@@ -0,0 +1,112 @@
+import sys
+import unittest
+from pathlib import Path
+from unittest import mock
+
+import gi
+
+gi.require_version("Gdk", "3.0")
+gi.require_version("Gtk", "3.0")
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
+
+soundGeneratorModule = sys.modules.get("cthulhu.sound_generator")
+if soundGeneratorModule is not None and not hasattr(soundGeneratorModule, "SoundGenerator"):
+ class _StubSoundGenerator:
+ pass
+
+ soundGeneratorModule.SoundGenerator = _StubSoundGenerator
+
+from cthulhu import ax_object
+from cthulhu import ax_utilities
+from cthulhu.scripts.web import script as web_script
+from cthulhu.scripts.web import script_utilities
+
+
+class WebClickableActivationTests(unittest.TestCase):
+ def test_return_activates_click_ancestor_on_caret_context(self):
+ testScript = web_script.Script.__new__(web_script.Script)
+ caretObject = object()
+ documentObject = object()
+
+ testScript.utilities = mock.Mock()
+ testScript.utilities.inDocumentContent.return_value = True
+ testScript.utilities.getCaretContext.return_value = (caretObject, 0)
+ testScript.utilities.isClickableElement.return_value = False
+ testScript.inFocusMode = mock.Mock(return_value=False)
+ testScript.presentMessage = mock.Mock()
+ testScript._presentDelayedMessage = mock.Mock()
+ testScript._restoreFocusAfterClick = mock.Mock()
+
+ def has_action(obj, action_name):
+ return obj is caretObject and action_name == "click-ancestor"
+
+ with (
+ mock.patch.object(web_script.cthulhu_state, "locusOfFocus", documentObject),
+ mock.patch.object(ax_utilities.AXUtilities, "is_entry", return_value=False),
+ mock.patch.object(ax_utilities.AXUtilities, "is_text", return_value=False),
+ mock.patch.object(ax_utilities.AXUtilities, "is_password_text", return_value=False),
+ mock.patch.object(ax_utilities.AXUtilities, "is_combo_box", return_value=False),
+ mock.patch.object(ax_utilities.AXUtilities, "is_button", return_value=False),
+ mock.patch.object(ax_utilities.AXUtilities, "is_push_button", return_value=False),
+ mock.patch.object(ax_utilities.AXUtilities, "is_link", return_value=False),
+ mock.patch.object(ax_object.AXObject, "has_action", side_effect=has_action),
+ mock.patch.object(ax_object.AXObject, "do_named_action", return_value=True) as doAction,
+ mock.patch("cthulhu.ax_event_synthesizer.AXEventSynthesizer.click_object") as clickObject,
+ ):
+ self.assertTrue(testScript._tryClickableActivation(mock.Mock(event_string="Return")))
+
+ doAction.assert_called_once_with(caretObject, "click-ancestor")
+ clickObject.assert_not_called()
+
+
+class WebHiddenPopupTests(unittest.TestCase):
+ def test_hidden_ancestor_marks_descendant_hidden(self):
+ utilities = script_utilities.Utilities.__new__(script_utilities.Utilities)
+ child = object()
+ hiddenParent = object()
+
+ utilities.inDocumentContent = mock.Mock(return_value=True)
+ utilities.objectAttributes = mock.Mock(
+ side_effect=lambda obj, useCache=False: {
+ child: {"display": "block"},
+ hiddenParent: {"display": "none"},
+ }.get(obj, {})
+ )
+
+ with mock.patch.object(
+ script_utilities.AXObject,
+ "get_parent",
+ side_effect=lambda obj: hiddenParent if obj is child else None,
+ ):
+ self.assertTrue(utilities.isHidden(child))
+
+ def test_focusable_hidden_object_cannot_have_caret_context(self):
+ utilities = script_utilities.Utilities.__new__(script_utilities.Utilities)
+ hiddenObject = object()
+
+ utilities._canHaveCaretContextDecision = {}
+ utilities.isZombie = mock.Mock(return_value=False)
+ utilities.isStaticTextLeaf = mock.Mock(return_value=False)
+ utilities.isUselessEmptyElement = mock.Mock(return_value=False)
+ utilities.isOffScreenLabel = mock.Mock(return_value=False)
+ utilities.isNonNavigablePopup = mock.Mock(return_value=False)
+ utilities.isUselessImage = mock.Mock(return_value=False)
+ utilities.isEmptyAnchor = mock.Mock(return_value=False)
+ utilities.isEmptyToolTip = mock.Mock(return_value=False)
+ utilities.isParentOfNullChild = mock.Mock(return_value=False)
+ utilities.isPseudoElement = mock.Mock(return_value=False)
+ utilities.isFakePlaceholderForEntry = mock.Mock(return_value=False)
+ utilities.isNonInteractiveDescendantOfControl = mock.Mock(return_value=False)
+ utilities.isHidden = mock.Mock(return_value=True)
+ utilities.hasNoSize = mock.Mock(return_value=False)
+
+ with (
+ mock.patch.object(script_utilities.AXObject, "is_dead", return_value=False),
+ mock.patch.object(script_utilities.AXUtilities, "is_focusable", return_value=True),
+ ):
+ self.assertFalse(utilities._canHaveCaretContext(hiddenObject))
+
+
+if __name__ == "__main__":
+ unittest.main()