Attempt to make the sound system more robust. Improve some web component detection. Fixed a place where Cthulhu gets stuck on netgear's web interface.

This commit is contained in:
Storm Dragon
2026-04-04 18:31:11 -04:00
parent 7043f08dab
commit 518d2b3bb6
16 changed files with 1546 additions and 225 deletions
+48 -3
View File
@@ -1053,6 +1053,51 @@
<property name="can_focus">False</property>
<property name="orientation">vertical</property>
<property name="spacing">6</property>
<child>
<object class="GtkBox" id="soundSinkHBox">
<property name="visible">True</property>
<property name="can_focus">False</property>
<property name="spacing">12</property>
<child>
<object class="GtkLabel" id="soundSinkLabel">
<property name="visible">True</property>
<property name="can_focus">False</property>
<property name="xalign">0</property>
<property name="label" translatable="yes" comments="Translators: This is the label for a combo box where users can choose which audio backend Cthulhu should use.">Audio _backend:</property>
<property name="use_underline">True</property>
<property name="mnemonic_widget">soundSinkCombo</property>
<accessibility>
<relation type="label-for" target="soundSinkCombo"/>
</accessibility>
</object>
<packing>
<property name="expand">False</property>
<property name="fill">True</property>
<property name="position">0</property>
</packing>
</child>
<child>
<object class="GtkComboBoxText" id="soundSinkCombo">
<property name="visible">True</property>
<property name="can_focus">False</property>
<signal name="changed" handler="soundSinkComboChanged" swapped="no"/>
<accessibility>
<relation type="labelled-by" target="soundSinkLabel"/>
</accessibility>
</object>
<packing>
<property name="expand">False</property>
<property name="fill">True</property>
<property name="position">1</property>
</packing>
</child>
</object>
<packing>
<property name="expand">False</property>
<property name="fill">True</property>
<property name="position">0</property>
</packing>
</child>
<child>
<object class="GtkBox" id="soundThemeHBox">
<property name="visible">True</property>
@@ -1095,7 +1140,7 @@
<packing>
<property name="expand">False</property>
<property name="fill">True</property>
<property name="position">0</property>
<property name="position">1</property>
</packing>
</child>
<child>
@@ -1140,7 +1185,7 @@
<packing>
<property name="expand">False</property>
<property name="fill">True</property>
<property name="position">1</property>
<property name="position">2</property>
</packing>
</child>
</object>
@@ -1151,7 +1196,7 @@
<object class="GtkLabel" id="soundThemeTitleLabel">
<property name="visible">True</property>
<property name="can_focus">False</property>
<property name="label" translatable="yes" comments="Translators: This is the title of a section in the preferences dialog containing sound theme options.">Sound Theme</property>
<property name="label" translatable="yes" comments="Translators: This is the title of a section in the preferences dialog containing sound options.">Sound</property>
<attributes>
<attribute name="weight" value="bold"/>
</attributes>
+2 -2
View File
@@ -769,7 +769,7 @@ def shutdown(script: Optional[Any] = None, inputEvent: Optional[Any] = None) ->
cthulhu_state.activeScript.presentationInterrupt()
cthulhuApp.getSignalManager().emitSignal('stop-application-completed')
sound_theme_manager.getManager().playStopSound(wait=True)
sound_theme_manager.getManager().playStopSound(wait=True, timeoutSeconds=1)
cthulhuApp.getPluginSystemManager().unloadAllPlugins(ForceAllPlugins=True)
# Deactivate the event manager first so that it clears its queue and will not
@@ -893,7 +893,7 @@ def main() -> int:
debug.printMessage(debug.LEVEL_INFO, "CTHULHU: Initialized.", True)
script = cthulhu_state.activeScript
sound_theme_manager.getManager().playStartSound(wait=True)
sound_theme_manager.getManager().playStartSound(wait=False)
soundFailureReason = sound.getSoundSystemFailureReason()
if soundFailureReason:
debug.printMessage(
+52 -1
View File
@@ -60,6 +60,7 @@ from . import cthulhu_gui_profile
from . import cthulhu_state
from . import settings
from . import settings_manager
from . import sound_sink
from . import input_event
from . import input_event_manager
from . import keybindings
@@ -195,6 +196,7 @@ class CthulhuSetupGUI(cthulhu_gtkbuilder.GtkBuilderWrapper):
self.savedPitch = None
self.savedRate = None
self.soundThemeCombo = None
self.soundSinkCombo = None
self.roleSoundPresentationCombo = None
self._isInitialSetup = False
self._updatingSpeechFamilies = False
@@ -3102,15 +3104,37 @@ print(json.dumps(result))
self.ocrCopyToClipboardCheckButton.set_active(copyToClipboard)
def _initSoundThemeState(self):
"""Initialize Sound Theme widgets with current settings."""
"""Initialize sound widgets with current settings."""
prefs = self.prefsDict
# Get widget references
self.soundSinkCombo = self.get_widget("soundSinkCombo")
self.soundThemeCombo = self.get_widget("soundThemeCombo")
self.roleSoundPresentationCombo = self.get_widget("roleSoundPresentationCombo")
self.soundSinkCombo.set_can_focus(False)
self.soundThemeCombo.set_can_focus(False)
self.roleSoundPresentationCombo.set_can_focus(False)
self._soundSinkChoices = [
(settings.SOUND_SINK_AUTO, guilabels.SOUND_BACKEND_AUTO),
(settings.SOUND_SINK_PIPEWIRE, guilabels.SOUND_BACKEND_PIPEWIRE),
(settings.SOUND_SINK_PULSE, guilabels.SOUND_BACKEND_PULSE),
(settings.SOUND_SINK_ALSA, guilabels.SOUND_BACKEND_ALSA),
]
self.soundSinkCombo.remove_all()
for _, label in self._soundSinkChoices:
self.soundSinkCombo.append_text(label)
runtimeSoundSink = cthulhu.cthulhuApp.settingsManager.getSetting("soundSink")
currentSink = sound_sink.normalize_sound_sink_choice(
prefs.get("soundSink", runtimeSoundSink if runtimeSoundSink is not None else settings.soundSink)
)
sinkIndex = 0
for index, (value, _) in enumerate(self._soundSinkChoices):
if value == currentSink:
sinkIndex = index
break
self.soundSinkCombo.set_active(sinkIndex)
# Populate sound theme combo box
themeManager = sound_theme_manager.getManager()
availableThemes = themeManager.getAvailableThemes()
@@ -3159,6 +3183,14 @@ print(json.dumps(result))
if activeText:
self.prefsDict["soundTheme"] = activeText
def soundSinkComboChanged(self, widget):
"""Signal handler for the sound backend combo box."""
activeIndex = widget.get_active()
if activeIndex < 0:
return
value = self._soundSinkChoices[activeIndex][0]
self.prefsDict["soundSink"] = value
def roleSoundPresentationComboChanged(self, widget):
"""Signal handler for the role sound presentation combo box."""
activeIndex = widget.get_active()
@@ -4844,6 +4876,11 @@ print(json.dumps(result))
if not self._isInitialSetup:
self.restoreSettings()
if self.soundSinkCombo is not None:
activeIndex = self.soundSinkCombo.get_active()
if activeIndex >= 0:
self.prefsDict["soundSink"] = self._soundSinkChoices[activeIndex][0]
enable = self.get_widget("speechSupportCheckButton").get_active()
self.prefsDict["enableSpeech"] = enable
@@ -4855,6 +4892,20 @@ print(json.dumps(result))
if speechServerChoice:
self.prefsDict["speechServerInfo"] = \
speechServerChoice.getInfo()
else:
activeSpeechInfo = speech.getInfo()
if activeSpeechInfo:
self.prefsDict["speechServerInfo"] = activeSpeechInfo
runtimeSpeechServerInfo = cthulhu.cthulhuApp.settingsManager.getSetting(
"speechServerInfo"
)
if runtimeSpeechServerInfo and not self.prefsDict.get("speechServerInfo"):
self.prefsDict["speechServerInfo"] = runtimeSpeechServerInfo
existingSpeechServerInfo = self.prefsDict.get("speechServerInfo")
if existingSpeechServerInfo:
self.prefsDict["speechServerInfo"] = existingSpeechServerInfo
if self.defaultVoice is not None:
self.prefsDict["voices"] = {
+19 -2
View File
@@ -938,9 +938,26 @@ USE_STRUCTURAL_NAVIGATION = _("Enable _structural navigation")
# audio files that Cthulhu plays for various events.
SOUND_THEME = _("Sound _theme:")
# Translators: This is the label for a combo box in the preferences dialog
# where users can choose which audio backend Cthulhu should use.
SOUND_BACKEND = _("Audio _backend:")
# Translators: This is a sound backend option which lets Cthulhu choose a
# backend automatically.
SOUND_BACKEND_AUTO = _("Automatic")
# Translators: This is a sound backend option which forces PipeWire.
SOUND_BACKEND_PIPEWIRE = _("PipeWire")
# Translators: This is a sound backend option which forces PulseAudio.
SOUND_BACKEND_PULSE = _("PulseAudio")
# Translators: This is a sound backend option which forces ALSA.
SOUND_BACKEND_ALSA = _("ALSA")
# Translators: This is the title of a frame in the preferences dialog
# containing sound theme options.
SOUND_THEME_TITLE = _("Sound Theme")
# containing sound options.
SOUND_THEME_TITLE = _("Sound")
# Translators: This refers to the amount of information Cthulhu provides about a
# particular object that receives focus.
+2
View File
@@ -86,6 +86,8 @@ cthulhu_python_sources = files([
'signal_manager.py',
'sleep_mode_manager.py',
'sound.py',
'sound_helper.py',
'sound_sink.py',
'sound_generator.py',
'sound_theme_manager.py',
'speech_and_verbosity_manager.py',
+8 -3
View File
@@ -41,6 +41,7 @@ else:
_gstreamerAvailable, args = Gst.init_check(None)
from . import debug
from . import sound_sink
class PiperAudioPlayer:
@@ -84,6 +85,7 @@ class PiperAudioPlayer:
try:
self._pipeline = Gst.Pipeline.new("piper-audio")
configuredSink = sound_sink.get_configured_sound_sink()
self._appsrc = Gst.ElementFactory.make("appsrc", "source")
if self._appsrc is None:
@@ -110,9 +112,9 @@ class PiperAudioPlayer:
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
sink = Gst.ElementFactory.make("autoaudiosink", "sink")
sink, sinkName, sinkError = sound_sink.create_audio_sink("sink", configuredSink)
if sink is None:
msg = 'PIPER AUDIO: Failed to create autoaudiosink element'
msg = f'PIPER AUDIO: {sinkError or "Failed to create audio sink"}'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
@@ -141,7 +143,10 @@ class PiperAudioPlayer:
bus.connect("message", self._onMessage)
self._initialized = True
msg = 'PIPER AUDIO: Pipeline initialized successfully'
msg = (
f'PIPER AUDIO: Pipeline initialized successfully using '
f'{sinkName} (soundSink={configuredSink})'
)
debug.printMessage(debug.LEVEL_INFO, msg, True)
return True
+44 -13
View File
@@ -1450,10 +1450,36 @@ class Script(default.Script):
return None
def _getClickableActivationTarget(self):
obj = cthulhu_state.locusOfFocus
if self.inFocusMode():
return obj
if not self.utilities.inDocumentContent(obj):
return obj
contextObj, _ = self.utilities.getCaretContext(searchIfNeeded=False)
if contextObj and self.utilities.inDocumentContent(contextObj):
return contextObj
return obj
def _performClickableAction(self, obj):
from cthulhu import ax_object
actionNames = ["click", "click-ancestor", "press", "jump", "open", "activate"]
for actionName in actionNames:
if not ax_object.AXObject.has_action(obj, actionName):
continue
if ax_object.AXObject.do_named_action(obj, actionName):
return True
return False
def _tryClickableActivation(self, keyboardEvent):
"""Try to activate clickable element - returns True if we should consume the event."""
obj = cthulhu_state.locusOfFocus
obj = self._getClickableActivationTarget()
if not obj or not self.utilities.inDocumentContent(obj):
return False
@@ -1474,31 +1500,36 @@ class Script(default.Script):
# First try the standard clickable detection
if self.utilities.isClickableElement(obj):
from cthulhu import ax_event_synthesizer
# Give immediate feedback that activation is starting
self.presentMessage("Activating...")
result = self._performClickableAction(obj)
if result:
self._presentDelayedMessage("Element activated", 50)
return True
from cthulhu import ax_event_synthesizer
result = ax_event_synthesizer.AXEventSynthesizer.click_object(obj)
if result:
# Schedule success message after a brief delay
self._presentDelayedMessage("Element activated", 50)
# Try to restore focus to the clicked element after a brief delay
self._restoreFocusAfterClick(original_focus)
return True
# If that didn't work, try a more permissive approach for any element with click action
from cthulhu import ax_object
if ax_object.AXObject.has_action(obj, "click"):
from cthulhu import ax_event_synthesizer
# Give immediate feedback that activation is starting
if ax_object.AXObject.has_action(obj, "click") \
or ax_object.AXObject.has_action(obj, "click-ancestor"):
self.presentMessage("Activating...")
result = self._performClickableAction(obj)
if result:
self._presentDelayedMessage("Element activated", 50)
return True
from cthulhu import ax_event_synthesizer
result = ax_event_synthesizer.AXEventSynthesizer.click_object(obj)
if result:
# Schedule success message after a brief delay
self._presentDelayedMessage("Element activated", 50)
# Try to restore focus to the clicked element after a brief delay
self._restoreFocusAfterClick(original_focus)
return True
return False
def _restoreFocusAfterClick(self, original_focus):
+34 -8
View File
@@ -618,8 +618,24 @@ class Utilities(script_utilities.Utilities):
return self.queryNonEmptyText(obj, False) is None
def isHidden(self, obj):
attrs = self.objectAttributes(obj, False)
return attrs.get('hidden', False)
hiddenValues = {"true", "1", "yes"}
hiddenStyles = {"none", "hidden", "collapse"}
current = obj
while current and self.inDocumentContent(current):
attrs = self.objectAttributes(current, False)
hidden = str(attrs.get("hidden", "")).lower()
display = str(attrs.get("display", "")).lower()
visibility = str(attrs.get("visibility", "")).lower()
if hidden in hiddenValues or display in hiddenStyles or visibility in hiddenStyles:
return True
parent = AXObject.get_parent(current)
if parent == current:
break
current = parent
return False
def _isOrIsIn(self, child, parent):
if not (child and parent):
@@ -4772,13 +4788,23 @@ class Utilities(script_utilities.Utilities):
startTime = time.time()
rv = None
if AXUtilities.is_focusable(obj):
tokens = ["WEB: Focusable object can have caret context", obj]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
rv = True
if self.isHidden(obj):
tokens = ["WEB: Hidden object cannot have caret context", obj]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
rv = False
else:
tokens = ["WEB: Focusable object can have caret context", obj]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
rv = True
elif AXUtilities.is_editable(obj):
tokens = ["WEB: Editable object can have caret context", obj]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
rv = True
if self.isHidden(obj):
tokens = ["WEB: Hidden object cannot have caret context", obj]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
rv = False
else:
tokens = ["WEB: Editable object can have caret context", obj]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
rv = True
elif AXUtilities.is_landmark(obj):
tokens = ["WEB: Landmark can have caret context", obj]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
+12
View File
@@ -85,6 +85,7 @@ userCustomizableSettings = [
"brailleLinkIndicator",
"enableSound",
"soundVolume",
"soundSink",
"playSoundForRole",
"playSoundForState",
"playSoundForPositionInSet",
@@ -333,9 +334,20 @@ brailleVerbosityLevel = VERBOSITY_LEVEL_VERBOSE
ROLE_SOUND_PRESENTATION_SOUND_AND_SPEECH = "sound_and_speech"
ROLE_SOUND_PRESENTATION_SPEECH_ONLY = "speech_only"
ROLE_SOUND_PRESENTATION_SOUND_ONLY = "sound_only"
SOUND_SINK_AUTO = "auto"
SOUND_SINK_PIPEWIRE = "pipewire"
SOUND_SINK_PULSE = "pulse"
SOUND_SINK_ALSA = "alsa"
SOUND_SINK_VALUES = (
SOUND_SINK_AUTO,
SOUND_SINK_PIPEWIRE,
SOUND_SINK_PULSE,
SOUND_SINK_ALSA,
)
enableSound = True
soundVolume = 0.5
soundSink = SOUND_SINK_AUTO
playSoundForRole = False
playSoundForState = False
playSoundForPositionInSet = False
+472 -180
View File
@@ -31,9 +31,14 @@ __date__ = "$Date:$"
__copyright__ = "Copyright (c) 2016 Cthulhu Team"
__license__ = "LGPL"
import json
import os
import subprocess
import sys
import threading
from typing import Any, Optional, Tuple
import gi
from gi.repository import GLib
from typing import Optional, Any
try:
gi.require_version('Gst', '1.0')
@@ -41,54 +46,40 @@ try:
except Exception:
_gstreamerAvailable: bool = False
else:
_gstreamerAvailable, args = Gst.init_check(None)
_gstreamerAvailable, _args = Gst.init_check(None)
from . import debug
from . import settings
from . import sound_sink
from .sound_generator import Icon, Tone
_soundSystemFailureReason: Optional[str] = None
class _PendingResponse:
def __init__(self) -> None:
self.event = threading.Event()
self.response: Optional[dict[str, Any]] = None
class Player:
"""Plays Icons and Tones."""
"""Plays Icons and Tones through a persistent worker process."""
def __init__(self) -> None:
self._initialized: bool = False
self._source: Optional[Any] = None # Optional[Gst.Element]
self._volume: Optional[Any] = None # Optional[Gst.Element]
self._sink: Optional[Any] = None # Optional[Gst.Element]
self._player: Optional[Any] = None # Optional[Gst.Element]
self._pipeline: Optional[Any] = None # Optional[Gst.Pipeline]
self._initialized = False
self._workerProcess: Optional[subprocess.Popen[str]] = None
self._workerSink: Optional[str] = None
self._workerRestartRequired = False
self._workerRestartReason: Optional[str] = None
self._workerLock = threading.RLock()
self._responseLock = threading.Lock()
self._pendingResponses: dict[int, _PendingResponse] = {}
self._nextRequestId = 1
if not _gstreamerAvailable:
msg = 'SOUND ERROR: Gstreamer is not available'
debug.printMessage(debug.LEVEL_INFO, msg, True)
debug.printMessage(debug.LEVEL_INFO, 'SOUND ERROR: Gstreamer is not available', True)
return
self.init()
def _onPlayerMessage(self, bus: Any, message: Any) -> None: # bus: Gst.Bus, message: Gst.Message
if message.type == Gst.MessageType.EOS:
self._player.set_state(Gst.State.NULL)
elif message.type == Gst.MessageType.ERROR:
self._player.set_state(Gst.State.NULL)
error, info = message.parse_error()
msg = f'SOUND ERROR: {error}'
debug.printMessage(debug.LEVEL_INFO, msg, True)
def _onPipelineMessage(self, bus: Any, message: Any) -> None: # bus: Gst.Bus, message: Gst.Message
if message.type == Gst.MessageType.EOS:
self._pipeline.set_state(Gst.State.NULL)
elif message.type == Gst.MessageType.ERROR:
self._pipeline.set_state(Gst.State.NULL)
error, info = message.parse_error()
msg = f'SOUND ERROR: {error}'
debug.printMessage(debug.LEVEL_INFO, msg, True)
def _onTimeout(self, element: Any) -> bool: # element: Gst.Element
element.set_state(Gst.State.NULL)
return False
@staticmethod
def _get_configured_volume() -> float:
"""Returns the configured sound volume with a safe fallback."""
@@ -103,117 +94,18 @@ class Player:
return max(0.0, float(settings.soundVolume))
def _playIcon(self, icon: Icon, interrupt: bool = True) -> None:
"""Plays a sound icon, interrupting the current play first unless specified."""
if not self._player:
return
if interrupt:
self._player.set_state(Gst.State.NULL)
self._player.set_property('volume', self._get_configured_volume())
self._player.set_property('uri', f'file://{icon.path}')
self._player.set_state(Gst.State.PLAYING)
def _playIconAndWait(self, icon: Icon, interrupt: bool = True, timeout_seconds: Optional[int] = 10) -> bool:
"""Plays a sound icon and waits for completion."""
if not self._player:
return False
if interrupt:
self._player.set_state(Gst.State.NULL)
self._player.set_property('volume', self._get_configured_volume())
self._player.set_property('uri', f'file://{icon.path}')
self._player.set_state(Gst.State.PLAYING)
bus = self._player.get_bus()
if not bus:
return False
if timeout_seconds is None:
timeout_ns = Gst.CLOCK_TIME_NONE
else:
timeout_ns = int(timeout_seconds * Gst.SECOND)
message = bus.timed_pop_filtered(
timeout_ns,
Gst.MessageType.EOS | Gst.MessageType.ERROR
)
if message and message.type == Gst.MessageType.ERROR:
error, info = message.parse_error()
msg = f'SOUND ERROR: {error}'
debug.printMessage(debug.LEVEL_INFO, msg, True)
self._player.set_state(Gst.State.NULL)
return message is not None and message.type == Gst.MessageType.EOS
def _playTone(self, tone: Tone, interrupt: bool = True) -> None:
"""Plays a tone, interrupting the current play first unless specified."""
if not self._pipeline or not self._source:
return
if interrupt:
self._pipeline.set_state(Gst.State.NULL)
self._pipeline.set_state(Gst.State.NULL)
if self._volume is not None:
self._volume.set_property('volume', tone.volume)
self._source.set_property('volume', 1.0)
else:
self._source.set_property('volume', tone.volume)
self._source.set_property('freq', tone.frequency)
self._source.set_property('wave', tone.wave)
self._pipeline.set_state(Gst.State.PLAYING)
duration = int(1000 * tone.duration)
GLib.timeout_add(duration, self._onTimeout, self._pipeline)
def init(self) -> None:
"""(Re)Initializes the Player."""
if self._initialized:
return
"""(Re)Initializes the persistent worker."""
if not _gstreamerAvailable:
return
self._player = Gst.ElementFactory.make('playbin', 'player')
if self._player is None:
msg = 'SOUND ERROR: Gstreamer is available, but player is None'
debug.printMessage(debug.LEVEL_INFO, msg, True)
return
with self._workerLock:
if self._ensureWorkerLocked():
self._initialized = True
clearSoundSystemFailure()
bus = self._player.get_bus()
bus.add_signal_watch()
bus.connect("message", self._onPlayerMessage)
self._pipeline = Gst.Pipeline(name='cthulhu-pipeline')
bus = self._pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", self._onPipelineMessage)
self._source = Gst.ElementFactory.make('audiotestsrc', 'src')
self._volume = Gst.ElementFactory.make('volume', 'volume')
self._sink = Gst.ElementFactory.make('autoaudiosink', 'output')
if self._source is None or self._sink is None:
return
self._pipeline.add(self._source)
if self._volume is not None:
self._pipeline.add(self._volume)
self._pipeline.add(self._sink)
if self._volume is not None:
self._source.link(self._volume)
self._volume.link(self._sink)
else:
self._source.link(self._sink)
self._initialized = True
def play(self, item, interrupt=True):
def play(self, item: Any, interrupt: bool = True) -> None:
"""Plays a sound, interrupting the current play first unless specified."""
if isinstance(item, Icon):
@@ -221,59 +113,406 @@ class Player:
elif isinstance(item, Tone):
self._playTone(item, interrupt)
else:
tokens = ["SOUND ERROR:", item, "is not an Icon or Tone"]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
debug.printTokens(debug.LEVEL_INFO, ["SOUND ERROR:", item, "is not an Icon or Tone"], True)
def playAndWait(self, item, interrupt=True, timeout_seconds=10):
def playAndWait(self, item: Any, interrupt: bool = True, timeout_seconds: int = 10) -> bool:
"""Plays a sound and blocks until completion or timeout."""
if not self._player:
if _gstreamerAvailable and not self._initialized:
self.init()
if not self._player:
return False
if isinstance(item, Icon):
return self._playIconAndWait(
item,
interrupt=interrupt,
timeout_seconds=timeout_seconds
)
return self._playIconAndWait(item, interrupt=interrupt, timeout_seconds=timeout_seconds)
if isinstance(item, Tone):
return self._playToneAndWait(item, interrupt=interrupt, timeout_seconds=timeout_seconds)
self.play(item, interrupt)
return False
def stop(self, element=None):
"""Stops play."""
def stop(self, _element: Any = None) -> None:
"""Stops current sound playback."""
if not _gstreamerAvailable:
return
self._sendWorkerCommand({"action": "stop"}, waitForResponse=False)
if element:
element.set_state(Gst.State.NULL)
return
if self._player:
self._player.set_state(Gst.State.NULL)
if self._pipeline:
self._pipeline.set_state(Gst.State.NULL)
def shutdown(self):
def shutdown(self) -> None:
"""Shuts down the sound utilities."""
global _gstreamerAvailable
if not _gstreamerAvailable:
return
self.stop()
self._initialized = False
_gstreamerAvailable = False
with self._workerLock:
self._stopWorkerLocked("Sound system shutdown")
self._initialized = False
_player = Player()
def _playIcon(self, icon: Icon, interrupt: bool = True) -> None:
if not icon.isValid():
return
success, reason = self._sendWorkerCommand(
{
"action": "play_file",
"path": icon.path,
"volume": self._get_configured_volume(),
"interrupt": interrupt,
},
waitForResponse=False,
)
if not success and reason:
debug.printMessage(debug.LEVEL_INFO, f"SOUND ERROR: {reason}", True)
def _playIconAndWait(
self,
icon: Icon,
interrupt: bool = True,
timeout_seconds: Optional[int] = 10,
) -> bool:
if not icon.isValid():
return False
timeout = float((timeout_seconds or 10) + 2)
success, reason = self._sendWorkerCommand(
{
"action": "play_file",
"path": icon.path,
"volume": self._get_configured_volume(),
"interrupt": interrupt,
},
waitForResponse=True,
timeout=timeout,
)
if not success and reason:
debug.printMessage(debug.LEVEL_INFO, f"SOUND ERROR: {reason}", True)
return success
def _playTone(self, tone: Tone, interrupt: bool = True) -> None:
success, reason = self._sendWorkerCommand(
self._buildToneCommand(tone, interrupt),
waitForResponse=False,
)
if not success and reason:
debug.printMessage(debug.LEVEL_INFO, f"SOUND ERROR: {reason}", True)
def _playToneAndWait(
self,
tone: Tone,
interrupt: bool = True,
timeout_seconds: Optional[int] = 10,
) -> bool:
timeout = max(float(timeout_seconds or 10), float(tone.duration) + 2.0)
success, reason = self._sendWorkerCommand(
self._buildToneCommand(tone, interrupt),
waitForResponse=True,
timeout=timeout,
)
if not success and reason:
debug.printMessage(debug.LEVEL_INFO, f"SOUND ERROR: {reason}", True)
return success
def _buildToneCommand(self, tone: Tone, interrupt: bool) -> dict[str, Any]:
return {
"action": "play_tone",
"duration": tone.duration,
"frequency": tone.frequency,
"volume": tone.volume,
"wave": tone.wave,
"interrupt": interrupt,
}
def _ensureWorkerLocked(self) -> bool:
configuredSink = sound_sink.get_configured_sound_sink()
if self._workerProcess is not None and self._workerProcess.poll() is None:
if self._workerRestartRequired:
reason = self._consumeWorkerRestartReason()
debug.printMessage(
debug.LEVEL_INFO,
f"SOUND: Restarting persistent worker after recovery request: {reason}",
True,
)
self._stopWorkerLocked(reason)
elif self._workerSink == configuredSink:
return True
else:
debug.printMessage(
debug.LEVEL_INFO,
f"SOUND: Restarting persistent worker for soundSink={configuredSink}",
True,
)
self._stopWorkerLocked("Sound sink changed")
return self._startWorkerLocked(configuredSink)
def _startWorkerLocked(self, configuredSink: str) -> bool:
environment = _buildSoundHelperEnvironment()
command = [
sys.executable,
"-m",
"cthulhu.sound_helper",
"--worker",
"--sound-sink",
configuredSink,
]
try:
process = subprocess.Popen(
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
env=environment,
)
except Exception as error:
reason = f"Failed to start persistent sound worker: {error}"
disableSoundSystem(reason)
debug.printMessage(debug.LEVEL_INFO, f"SOUND ERROR: {reason}", True)
return False
if process.stdin is None or process.stdout is None or process.stderr is None:
try:
process.terminate()
except Exception:
pass
reason = "Persistent sound worker is missing stdio pipes"
disableSoundSystem(reason)
debug.printMessage(debug.LEVEL_INFO, f"SOUND ERROR: {reason}", True)
return False
self._workerProcess = process
self._workerSink = configuredSink
self._workerRestartRequired = False
self._workerRestartReason = None
self._startWorkerThreads(process)
success, reason = self._sendWorkerCommandLocked(
{"action": "ping"},
waitForResponse=True,
timeout=2.0,
allowRestart=False,
)
if not success:
self._stopWorkerLocked(reason or "Persistent sound worker failed to initialize")
disableSoundSystem(reason or "Persistent sound worker failed to initialize")
if reason:
debug.printMessage(debug.LEVEL_INFO, f"SOUND ERROR: {reason}", True)
return False
debug.printMessage(
debug.LEVEL_INFO,
f"SOUND: Using persistent worker for icon playback (soundSink={configuredSink})",
True,
)
debug.printMessage(
debug.LEVEL_INFO,
f"SOUND: Using persistent worker for tone playback (soundSink={configuredSink})",
True,
)
return True
def _startWorkerThreads(self, process: subprocess.Popen[str]) -> None:
threading.Thread(
target=self._readWorkerStdout,
args=(process,),
daemon=True,
).start()
threading.Thread(
target=self._readWorkerStderr,
args=(process,),
daemon=True,
).start()
threading.Thread(
target=self._watchWorkerExit,
args=(process,),
daemon=True,
).start()
def _readWorkerStdout(self, process: subprocess.Popen[str]) -> None:
assert process.stdout is not None
for line in process.stdout:
line = line.strip()
if not line:
continue
try:
response = json.loads(line)
except Exception:
debug.printMessage(debug.LEVEL_INFO, f"SOUND ERROR: Invalid worker response: {line}", True)
continue
requestId = response.get("id")
if requestId is None:
continue
with self._responseLock:
pending = self._pendingResponses.pop(int(requestId), None)
if pending is None:
continue
pending.response = response
pending.event.set()
def _readWorkerStderr(self, process: subprocess.Popen[str]) -> None:
assert process.stderr is not None
for line in process.stderr:
message = line.strip()
if message:
self._handleWorkerDiagnostic(message)
debug.printMessage(debug.LEVEL_INFO, f"SOUND WORKER: {message}", True)
def _watchWorkerExit(self, process: subprocess.Popen[str]) -> None:
returnCode = process.wait()
reason = _formatWorkerFailure(returnCode)
with self._workerLock:
if self._workerProcess is process:
self._workerProcess = None
self._workerSink = None
self._failPendingResponses(reason)
if returnCode != 0:
debug.printMessage(debug.LEVEL_INFO, f"SOUND ERROR: {reason}", True)
def _failPendingResponses(self, reason: str) -> None:
with self._responseLock:
pendingResponses = list(self._pendingResponses.values())
self._pendingResponses.clear()
for pending in pendingResponses:
pending.response = {"ok": False, "error": reason}
pending.event.set()
def _stopWorkerLocked(self, reason: str) -> None:
process = self._workerProcess
self._workerProcess = None
self._workerSink = None
self._workerRestartRequired = False
self._workerRestartReason = None
if process is None:
return
if process.poll() is None:
try:
assert process.stdin is not None
process.stdin.write(json.dumps({"action": "shutdown"}) + "\n")
process.stdin.flush()
process.wait(timeout=1.0)
except Exception:
try:
process.terminate()
process.wait(timeout=1.0)
except Exception:
try:
process.kill()
process.wait(timeout=1.0)
except Exception:
pass
self._failPendingResponses(reason)
def _sendWorkerCommand(
self,
command: dict[str, Any],
waitForResponse: bool,
timeout: float = 2.0,
) -> Tuple[bool, Optional[str]]:
with self._workerLock:
return self._sendWorkerCommandLocked(command, waitForResponse, timeout)
def _sendWorkerCommandLocked(
self,
command: dict[str, Any],
waitForResponse: bool,
timeout: float = 2.0,
allowRestart: bool = True,
) -> Tuple[bool, Optional[str]]:
if not self._ensureWorkerLocked():
return False, getSoundSystemFailureReason() or "Persistent sound worker is unavailable"
process = self._workerProcess
if process is None or process.stdin is None:
if allowRestart:
self._stopWorkerLocked("Worker pipes disappeared")
if self._ensureWorkerLocked():
return self._sendWorkerCommandLocked(command, waitForResponse, timeout, allowRestart=False)
return False, "Persistent sound worker stdin is unavailable"
requestId: Optional[int] = None
pending: Optional[_PendingResponse] = None
payload = dict(command)
if waitForResponse:
requestId = self._allocateRequestId()
pending = _PendingResponse()
payload["id"] = requestId
with self._responseLock:
self._pendingResponses[requestId] = pending
try:
process.stdin.write(json.dumps(payload) + "\n")
process.stdin.flush()
except Exception as error:
if requestId is not None:
with self._responseLock:
self._pendingResponses.pop(requestId, None)
if allowRestart:
self._stopWorkerLocked(f"Worker write failed: {error}")
if self._ensureWorkerLocked():
return self._sendWorkerCommandLocked(command, waitForResponse, timeout, allowRestart=False)
return False, f"Worker write failed: {error}"
if not waitForResponse or pending is None:
return True, None
if pending.event.wait(timeout):
response = pending.response or {"ok": False, "error": "No worker response"}
if not bool(response.get("ok")):
self._maybeMarkWorkerRestartRequired(response.get("error"))
return bool(response.get("ok")), response.get("error")
with self._responseLock:
self._pendingResponses.pop(requestId, None)
if allowRestart and (self._workerProcess is None or self._workerProcess.poll() is not None):
self._stopWorkerLocked("Worker exited while waiting for response")
if self._ensureWorkerLocked():
return self._sendWorkerCommandLocked(command, waitForResponse, timeout, allowRestart=False)
reason = f"Persistent sound worker timed out after {timeout:.1f} seconds"
self._markWorkerRestartRequired(reason)
return False, reason
def _allocateRequestId(self) -> int:
with self._responseLock:
requestId = self._nextRequestId
self._nextRequestId += 1
return requestId
def _handleWorkerDiagnostic(self, message: str) -> None:
normalizedMessage = str(message).strip()
if not normalizedMessage.startswith("RECOVERY REQUIRED:"):
return
reason = normalizedMessage.split(":", 1)[1].strip() or "Worker requested recovery"
self._markWorkerRestartRequired(reason)
def _markWorkerRestartRequired(self, reason: Optional[str]) -> None:
normalizedReason = str(reason or "").strip() or "Worker requested recovery"
with self._workerLock:
self._workerRestartRequired = True
self._workerRestartReason = normalizedReason
def _consumeWorkerRestartReason(self) -> str:
reason = self._workerRestartReason or "Worker requested recovery"
self._workerRestartRequired = False
self._workerRestartReason = None
return reason
def _maybeMarkWorkerRestartRequired(self, reason: Optional[str]) -> None:
normalizedReason = str(reason or "").strip().lower()
if normalizedReason in {"", "interrupted", "stopped", "shutdown", "stdin closed"}:
return
self._markWorkerRestartRequired(reason)
def getPlayer():
return _player
def disableSoundSystem(reason: str) -> None:
global _soundSystemFailureReason
@@ -282,11 +521,64 @@ def disableSoundSystem(reason: str) -> None:
return
_soundSystemFailureReason = reason
msg = f"SOUND: Disabling sound system. Reason: {reason}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
debug.printMessage(debug.LEVEL_INFO, f"SOUND: Disabling sound system. Reason: {reason}", True)
def getSoundSystemFailureReason() -> Optional[str]:
return _soundSystemFailureReason
def isSoundSystemAvailable() -> bool:
return _soundSystemFailureReason is None
def clearSoundSystemFailure() -> None:
global _soundSystemFailureReason
_soundSystemFailureReason = None
def _buildSoundHelperEnvironment() -> dict[str, str]:
pythonPathEntries = []
for path in sys.path:
if not path:
path = os.getcwd()
if os.path.isdir(path) and path not in pythonPathEntries:
pythonPathEntries.append(path)
environment = os.environ.copy()
if pythonPathEntries:
environment["PYTHONPATH"] = os.pathsep.join(pythonPathEntries)
return environment
def _formatWorkerFailure(returnCode: int) -> str:
if returnCode < 0:
return f"Sound worker exited via signal {-returnCode}"
return f"Sound worker exited with status {returnCode}"
_player = Player()
def getPlayer() -> Player:
return _player
def play(item: Any, interrupt: bool = True) -> None:
_player.play(item, interrupt)
def playIconSafely(icon: Icon, timeoutSeconds: int = 10) -> Tuple[bool, Optional[str]]:
if not icon.isValid():
return False, f"Invalid sound icon: {icon.path}"
return _player._sendWorkerCommand(
{
"action": "play_file",
"path": icon.path,
"volume": Player._get_configured_volume(),
"interrupt": True,
},
waitForResponse=True,
timeout=max(0.1, float(timeoutSeconds)),
)
+443
View File
@@ -0,0 +1,443 @@
#!/usr/bin/env python3
#
# Copyright (c) 2026 Stormux
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
"""Persistent sound helper used to isolate theme and tone playback crashes."""
import argparse
from collections import deque
import json
import pathlib
import sys
import threading
from typing import Any, Optional
import gi
gi.require_version('GLib', '2.0')
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
from . import settings
from . import sound_sink
def _clamp_volume(volume: Any) -> float:
try:
return max(0.0, float(volume))
except Exception:
return 1.0
def _write_json_line(payload: dict[str, Any]) -> None:
sys.stdout.write(json.dumps(payload) + "\n")
sys.stdout.flush()
def _report_recovery_required(message: str) -> None:
print(f"RECOVERY REQUIRED: {message}", file=sys.stderr, flush=True)
def _create_file_player(
playerId: str,
soundSink: Optional[str] = None,
) -> tuple[Optional[Any], Optional[str], Optional[str]]:
player = Gst.ElementFactory.make("playbin", playerId)
if player is None:
return None, None, "Failed to create playbin for file playback"
configuredSink = sound_sink.normalize_sound_sink_choice(soundSink)
if configuredSink == settings.SOUND_SINK_AUTO:
return player, "playbin-default", None
audioSink, sinkName, sinkError = sound_sink.create_audio_sink(
f"{playerId}-output",
configuredSink,
)
if audioSink is None:
return None, sinkName, sinkError or f"Failed to create audio sink for {configuredSink}"
player.set_property("audio-sink", audioSink)
return player, sinkName or configuredSink, None
class SoundWorker:
"""Runs a long-lived GStreamer worker for icon and tone playback."""
def __init__(self, soundSink: Optional[str] = None) -> None:
available, _args = Gst.init_check(None)
if not available:
raise RuntimeError("GStreamer is not available")
self._loop = GLib.MainLoop()
self._queue: deque[dict[str, Any]] = deque()
self._currentCommand: Optional[dict[str, Any]] = None
self._toneTimeoutId = 0
self._filePlayer, fileSinkName, fileSinkError = _create_file_player(
"cthulhu-sound-worker-file",
soundSink,
)
if self._filePlayer is None:
raise RuntimeError(fileSinkError or "Failed to create file playback player")
fileBus = self._filePlayer.get_bus()
if fileBus is None:
raise RuntimeError("No bus available for file playback player")
fileBus.add_signal_watch()
fileBus.connect("message", self._onFileMessage)
self._tonePipeline = Gst.Pipeline.new('cthulhu-sound-worker-tone')
if self._tonePipeline is None:
raise RuntimeError("Failed to create tone pipeline")
self._toneSource = Gst.ElementFactory.make('audiotestsrc', 'cthulhu-sound-worker-source')
self._toneVolume = Gst.ElementFactory.make('volume', 'cthulhu-sound-worker-volume')
toneSink, toneSinkName, toneSinkError = sound_sink.create_audio_sink(
'cthulhu-sound-worker-tone-output',
soundSink
)
if self._toneSource is None or toneSink is None:
raise RuntimeError(toneSinkError or "Failed to create tone playback pipeline")
self._toneSink = toneSink
self._tonePipeline.add(self._toneSource)
if self._toneVolume is not None:
self._tonePipeline.add(self._toneVolume)
self._tonePipeline.add(self._toneSink)
if self._toneVolume is not None:
if not self._toneSource.link(self._toneVolume):
raise RuntimeError("Failed to link tone source to volume")
if not self._toneVolume.link(self._toneSink):
raise RuntimeError("Failed to link tone volume to sink")
elif not self._toneSource.link(self._toneSink):
raise RuntimeError("Failed to link tone source to sink")
toneBus = self._tonePipeline.get_bus()
if toneBus is None:
raise RuntimeError("No bus available for tone pipeline")
toneBus.add_signal_watch()
toneBus.connect("message", self._onToneMessage)
self._fileSinkName = fileSinkName or "unknown"
self._toneSinkName = toneSinkName or "unknown"
def run(self) -> int:
self._report_worker_ready()
readerThread = threading.Thread(target=self._read_commands, daemon=True)
readerThread.start()
try:
self._loop.run()
return 0
finally:
self._clear_tone_timeout()
self._filePlayer.set_state(Gst.State.NULL)
self._tonePipeline.set_state(Gst.State.NULL)
def _report_worker_ready(self) -> None:
print(
f"Persistent sound worker ready: file={self._fileSinkName} tone={self._toneSinkName}",
file=sys.stderr,
flush=True,
)
def _read_commands(self) -> None:
try:
for line in sys.stdin:
line = line.strip()
if not line:
continue
try:
command = json.loads(line)
except Exception as error:
print(f"Invalid worker command: {error}", file=sys.stderr, flush=True)
continue
GLib.idle_add(self._handle_command, command)
finally:
GLib.idle_add(self._quit_from_stdin_close)
def _quit_from_stdin_close(self) -> bool:
self._interrupt_current("stdin closed")
self._interrupt_queued("stdin closed")
self._loop.quit()
return False
def _handle_command(self, command: dict[str, Any]) -> bool:
action = str(command.get("action", "")).strip().lower()
requestId = command.get("id")
if action == "ping":
self._respond(requestId, True)
return False
if action == "shutdown":
self._interrupt_current("shutdown")
self._interrupt_queued("shutdown")
self._respond(requestId, True)
self._loop.quit()
return False
if action == "stop":
self._interrupt_current("stopped")
self._interrupt_queued("stopped")
self._respond(requestId, True)
return False
if action not in {"play_file", "play_tone"}:
self._respond(requestId, False, f"Unknown worker action: {action}")
return False
interrupt = bool(command.get("interrupt", True))
if interrupt:
self._interrupt_current("interrupted")
self._interrupt_queued("interrupted")
elif self._currentCommand is not None:
self._queue.append(command)
return False
if self._currentCommand is None:
self._start_command(command)
else:
self._queue.appendleft(command)
return False
def _start_command(self, command: dict[str, Any]) -> None:
action = command["action"]
if action == "play_file":
self._start_file_command(command)
return
if action == "play_tone":
self._start_tone_command(command)
return
self._respond(command.get("id"), False, f"Unsupported worker action: {action}")
self._start_next_command()
def _start_file_command(self, command: dict[str, Any]) -> None:
soundPath = pathlib.Path(str(command.get("path", ""))).expanduser()
if not soundPath.is_file():
self._respond(command.get("id"), False, f"Missing sound file: {soundPath}")
self._start_next_command()
return
self._clear_tone_timeout()
self._tonePipeline.set_state(Gst.State.NULL)
self._filePlayer.set_state(Gst.State.NULL)
self._currentCommand = command
self._filePlayer.set_property("uri", soundPath.resolve().as_uri())
self._filePlayer.set_property("volume", _clamp_volume(command.get("volume", 1.0)))
stateChange = self._filePlayer.set_state(Gst.State.PLAYING)
if stateChange == Gst.StateChangeReturn.FAILURE:
self._currentCommand = None
reason = f"Failed to start playback for {soundPath}"
_report_recovery_required(reason)
self._respond(command.get("id"), False, reason)
self._start_next_command()
def _start_tone_command(self, command: dict[str, Any]) -> None:
self._filePlayer.set_state(Gst.State.NULL)
self._tonePipeline.set_state(Gst.State.NULL)
self._clear_tone_timeout()
try:
durationSeconds = max(0.0, float(command.get("duration", 0.0)))
frequency = max(0, min(20000, int(command.get("frequency", 0))))
wave = int(command.get("wave", 0))
except Exception as error:
self._respond(command.get("id"), False, f"Invalid tone request: {error}")
self._start_next_command()
return
self._currentCommand = command
if self._toneVolume is not None:
self._toneVolume.set_property('volume', _clamp_volume(command.get("volume", 1.0)))
self._toneSource.set_property('volume', 1.0)
else:
self._toneSource.set_property('volume', _clamp_volume(command.get("volume", 1.0)))
self._toneSource.set_property('freq', frequency)
self._toneSource.set_property('wave', wave)
stateChange = self._tonePipeline.set_state(Gst.State.PLAYING)
if stateChange == Gst.StateChangeReturn.FAILURE:
self._currentCommand = None
reason = "Failed to start tone playback"
_report_recovery_required(reason)
self._respond(command.get("id"), False, reason)
self._start_next_command()
return
durationMs = max(1, int(durationSeconds * 1000))
self._toneTimeoutId = GLib.timeout_add(durationMs, self._finish_tone_playback)
def _finish_tone_playback(self) -> bool:
self._toneTimeoutId = 0
self._tonePipeline.set_state(Gst.State.NULL)
self._finish_current(True)
return False
def _clear_tone_timeout(self) -> None:
if self._toneTimeoutId:
GLib.source_remove(self._toneTimeoutId)
self._toneTimeoutId = 0
def _interrupt_current(self, reason: str) -> None:
if self._currentCommand is None:
self._filePlayer.set_state(Gst.State.NULL)
self._tonePipeline.set_state(Gst.State.NULL)
self._clear_tone_timeout()
return
current = self._currentCommand
self._currentCommand = None
self._filePlayer.set_state(Gst.State.NULL)
self._tonePipeline.set_state(Gst.State.NULL)
self._clear_tone_timeout()
self._respond(current.get("id"), False, reason)
def _interrupt_queued(self, reason: str) -> None:
while self._queue:
queued = self._queue.popleft()
self._respond(queued.get("id"), False, reason)
def _finish_current(self, success: bool, error: Optional[str] = None) -> None:
current = self._currentCommand
self._currentCommand = None
if current is not None:
self._respond(current.get("id"), success, error)
self._start_next_command()
def _start_next_command(self) -> None:
if self._currentCommand is not None or not self._queue:
return
self._start_command(self._queue.popleft())
def _respond(self, requestId: Any, success: bool, error: Optional[str] = None) -> None:
if requestId is None:
return
payload: dict[str, Any] = {
"id": requestId,
"ok": bool(success),
}
if error:
payload["error"] = str(error)
_write_json_line(payload)
def _onFileMessage(self, _bus: Any, message: Any) -> None:
if self._currentCommand is None or self._currentCommand.get("action") != "play_file":
return
if message.type == Gst.MessageType.EOS:
self._filePlayer.set_state(Gst.State.NULL)
self._finish_current(True)
return
if message.type == Gst.MessageType.ERROR:
self._filePlayer.set_state(Gst.State.NULL)
error, _info = message.parse_error()
_report_recovery_required(str(error))
print(str(error), file=sys.stderr, flush=True)
self._finish_current(False, str(error))
def _onToneMessage(self, _bus: Any, message: Any) -> None:
if self._currentCommand is None or self._currentCommand.get("action") != "play_tone":
return
if message.type != Gst.MessageType.ERROR:
return
self._tonePipeline.set_state(Gst.State.NULL)
self._clear_tone_timeout()
error, _info = message.parse_error()
_report_recovery_required(str(error))
print(str(error), file=sys.stderr, flush=True)
self._finish_current(False, str(error))
def play_file_once(
soundPath: str,
timeoutSeconds: int = 10,
soundSink: Optional[str] = None,
volume: float = 1.0,
) -> int:
available, _args = Gst.init_check(None)
if not available:
print("GStreamer is not available", file=sys.stderr)
return 1
player, sinkName, sinkError = _create_file_player(
"cthulhu-sound-helper-once",
soundSink,
)
if player is None:
print(sinkError or "Failed to create one-shot file playback player", file=sys.stderr)
return 1
player.set_property("uri", pathlib.Path(soundPath).resolve().as_uri())
player.set_property("volume", _clamp_volume(volume))
player.set_state(Gst.State.PLAYING)
bus = player.get_bus()
if bus is None:
player.set_state(Gst.State.NULL)
print("No bus available for one-shot playback", file=sys.stderr)
return 1
timeoutNs = int(timeoutSeconds * Gst.SECOND)
try:
message = bus.timed_pop_filtered(
timeoutNs,
Gst.MessageType.EOS | Gst.MessageType.ERROR
)
if message is None:
print(
f"Sound helper timed out after {timeoutSeconds} seconds on sink {sinkName}",
file=sys.stderr
)
return 1
if message.type == Gst.MessageType.ERROR:
error, _info = message.parse_error()
print(str(error), file=sys.stderr)
return 1
return 0
finally:
player.set_state(Gst.State.NULL)
def main() -> int:
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("--worker", action="store_true")
parser.add_argument("--play-file")
parser.add_argument("--sound-sink")
parser.add_argument("--volume", type=float, default=1.0)
parser.add_argument("--timeout-seconds", type=int, default=10)
args, _unknown = parser.parse_known_args()
if args.worker:
worker = SoundWorker(args.sound_sink)
return worker.run()
if args.play_file:
return play_file_once(
args.play_file,
args.timeout_seconds,
args.sound_sink,
args.volume,
)
return 0
if __name__ == "__main__":
raise SystemExit(main())
+115
View File
@@ -0,0 +1,115 @@
#!/usr/bin/env python3
#
# Copyright (c) 2026 Stormux
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
"""Utilities for selecting GStreamer audio sinks."""
from __future__ import annotations
from typing import Any, Optional, Tuple
import gi
try:
gi.require_version('Gst', '1.0')
from gi.repository import Gst
except Exception:
_gstreamerAvailable: bool = False
else:
_gstreamerAvailable, _args = Gst.init_check(None)
from . import settings
_SINK_ELEMENT_BY_SETTING = {
settings.SOUND_SINK_PIPEWIRE: "pipewiresink",
settings.SOUND_SINK_PULSE: "pulsesink",
settings.SOUND_SINK_ALSA: "alsasink",
}
_AUTO_SINK_ELEMENTS = [
"autoaudiosink",
"pipewiresink",
"pulsesink",
"alsasink",
]
def normalize_sound_sink_choice(soundSink: Optional[str]) -> str:
"""Return a valid sound sink setting value."""
if soundSink is None:
return settings.SOUND_SINK_AUTO
choice = str(soundSink).strip().lower()
if choice in settings.SOUND_SINK_VALUES:
return choice
return settings.SOUND_SINK_AUTO
def get_configured_sound_sink() -> str:
"""Return the configured sound sink, falling back to defaults."""
try:
from . import cthulhu
if cthulhu.cthulhuApp is not None:
configured = cthulhu.cthulhuApp.settingsManager.getSetting("soundSink")
return normalize_sound_sink_choice(configured)
except Exception:
pass
return normalize_sound_sink_choice(getattr(settings, "soundSink", settings.SOUND_SINK_AUTO))
def _get_sink_element_candidates(soundSink: str) -> list[str]:
if soundSink == settings.SOUND_SINK_AUTO:
return list(_AUTO_SINK_ELEMENTS)
elementName = _SINK_ELEMENT_BY_SETTING.get(soundSink)
if elementName:
return [elementName]
return list(_AUTO_SINK_ELEMENTS)
def create_audio_sink(
sinkId: str,
soundSink: Optional[str] = None,
) -> Tuple[Optional[Any], Optional[str], Optional[str]]:
"""Create a configured GStreamer audio sink."""
if not _gstreamerAvailable:
return None, None, "GStreamer is not available"
configuredSink = normalize_sound_sink_choice(soundSink or get_configured_sound_sink())
triedCandidates = []
for elementName in _get_sink_element_candidates(configuredSink):
factory = Gst.ElementFactory.find(elementName)
if factory is None:
triedCandidates.append(f"{elementName} (unavailable)")
continue
sink = Gst.ElementFactory.make(elementName, sinkId)
if sink is None:
triedCandidates.append(f"{elementName} (failed to create)")
continue
return sink, elementName, None
triedText = ", ".join(triedCandidates) if triedCandidates else "none"
reason = (
f"No usable audio sink for soundSink={configuredSink}. "
f"Tried: {triedText}"
)
return None, None, reason
+19 -13
View File
@@ -317,7 +317,7 @@ class SoundThemeManager:
return None
def _playThemeSound(self, soundName, interrupt=True, wait=False,
requireSoundSetting=False):
requireSoundSetting=False, timeoutSeconds=10):
"""Play a themed sound with optional gating and blocking.
Args:
@@ -349,14 +349,17 @@ class SoundThemeManager:
try:
icon = Icon(os.path.dirname(soundPath), os.path.basename(soundPath))
if icon.isValid():
player = sound.getPlayer()
if wait and hasattr(player, "playAndWait"):
success = player.playAndWait(icon, interrupt=interrupt)
if wait:
success, reason = sound.playIconSafely(icon, timeoutSeconds=timeoutSeconds)
if not success:
reason = "Failed to play theme sound via GStreamer"
sound.disableSoundSystem(reason)
self.app.getSettingsManager().setSetting('enableSound', False)
failureReason = reason or f"Failed to play sound '{soundName}'"
msg = (
"SOUND THEME: Failed to play helper-isolated sound "
f"'{soundName}'. Continuing with sound enabled. Reason: {failureReason}"
)
debug.printMessage(debug.LEVEL_INFO, msg, True)
return success
player = sound.getPlayer()
player.play(icon, interrupt=interrupt)
return True
except Exception as e:
@@ -365,7 +368,7 @@ class SoundThemeManager:
return False
def playSound(self, soundName, interrupt=True, wait=False):
def playSound(self, soundName, interrupt=True, wait=False, timeoutSeconds=10):
"""Play a sound from the current theme if enabled.
Args:
@@ -380,7 +383,8 @@ class SoundThemeManager:
soundName,
interrupt=interrupt,
wait=wait,
requireSoundSetting=True
requireSoundSetting=True,
timeoutSeconds=timeoutSeconds
)
def playFocusModeSound(self):
@@ -395,22 +399,24 @@ class SoundThemeManager:
"""Play sound for button focus (future use)."""
return self.playSound(SOUND_BUTTON)
def playStartSound(self, wait=False):
def playStartSound(self, wait=False, timeoutSeconds=10):
"""Play sound for application startup."""
return self._playThemeSound(
SOUND_START,
interrupt=True,
wait=wait,
requireSoundSetting=True
requireSoundSetting=True,
timeoutSeconds=timeoutSeconds
)
def playStopSound(self, wait=False):
def playStopSound(self, wait=False, timeoutSeconds=10):
"""Play sound for application shutdown."""
return self._playThemeSound(
SOUND_STOP,
interrupt=True,
wait=wait,
requireSoundSetting=True
requireSoundSetting=True,
timeoutSeconds=timeoutSeconds
)
_manager = None