Merge branch 'testing'

This commit is contained in:
Storm Dragon
2026-01-10 19:52:27 -05:00
989 changed files with 2080 additions and 94011 deletions
+5 -141
View File
@@ -186,9 +186,7 @@ class APIHelper:
import gi
import importlib
import os
import re
import signal
import subprocess
import sys
gi.require_version("Atspi", "2.0")
@@ -212,6 +210,7 @@ from . import logger
from . import messages
from . import notification_presenter
from . import focus_manager
from . import cthulhu_modifier_manager
from . import cthulhu_state
from . import cthulhu_platform
from . import script_manager
@@ -280,14 +279,6 @@ EXIT_CODE_HANG = 50
#
_userSettings = None
# A subset of the original Xmodmap info prior to our stomping on it.
# Right now, this is just for the user's chosen Cthulhu modifier(s).
#
_originalXmodmap = ""
_cthulhuModifiers = settings.DESKTOP_MODIFIER_KEYS + settings.LAPTOP_MODIFIER_KEYS
_capsLockCleared = False
_restoreCthulhuKeys = False
########################################################################
# #
# METHODS TO HANDLE APPLICATION LIST AND FOCUSED OBJECTS #
@@ -382,125 +373,7 @@ def deviceChangeHandler(deviceManager, device):
if source == Gdk.InputSource.KEYBOARD:
msg = "CTHULHU: Keyboard change detected, re-creating the xmodmap"
debug.printMessage(debug.LEVEL_INFO, msg, True)
_createCthulhuXmodmap()
def updateKeyMap(keyboardEvent):
"""Unsupported convenience method to call sad hacks which should go away."""
global _restoreCthulhuKeys
if keyboardEvent.is_pressed_key():
return
if keyboardEvent.event_string in settings.cthulhuModifierKeys \
and cthulhu_state.bypassNextCommand:
_restoreXmodmap()
_restoreCthulhuKeys = True
return
if _restoreCthulhuKeys and not cthulhu_state.bypassNextCommand:
_createCthulhuXmodmap()
_restoreCthulhuKeys = False
def _setXmodmap(xkbmap):
"""Set the keyboard map using xkbcomp."""
p = subprocess.Popen(['xkbcomp', '-w0', '-', os.environ['DISPLAY']],
stdin=subprocess.PIPE, stdout=None, stderr=None)
p.communicate(xkbmap)
def _setCapsLockAsCthulhuModifier(enable):
"""Enable or disable use of the caps lock key as an Cthulhu modifier key."""
interpretCapsLineProg = re.compile(
r'^\s*interpret\s+Caps[_+]Lock[_+]AnyOfOrNone\s*\(all\)\s*{\s*$', re.I)
normalCapsLineProg = re.compile(
r'^\s*action\s*=\s*LockMods\s*\(\s*modifiers\s*=\s*Lock\s*\)\s*;\s*$', re.I)
interpretShiftLineProg = re.compile(
r'^\s*interpret\s+Shift[_+]Lock[_+]AnyOf\s*\(\s*Shift\s*\+\s*Lock\s*\)\s*{\s*$', re.I)
normalShiftLineProg = re.compile(
r'^\s*action\s*=\s*LockMods\s*\(\s*modifiers\s*=\s*Shift\s*\)\s*;\s*$', re.I)
disabledModLineProg = re.compile(
r'^\s*action\s*=\s*NoAction\s*\(\s*\)\s*;\s*$', re.I)
normalCapsLine = ' action= LockMods(modifiers=Lock);'
normalShiftLine = ' action= LockMods(modifiers=Shift);'
disabledModLine = ' action= NoAction();'
lines = _originalXmodmap.decode('UTF-8').split('\n')
foundCapsInterpretSection = False
foundShiftInterpretSection = False
modified = False
for i, line in enumerate(lines):
if not foundCapsInterpretSection and not foundShiftInterpretSection:
if interpretCapsLineProg.match(line):
foundCapsInterpretSection = True
elif interpretShiftLineProg.match(line):
foundShiftInterpretSection = True
elif foundCapsInterpretSection:
if enable:
if normalCapsLineProg.match(line):
lines[i] = disabledModLine
modified = True
else:
if disabledModLineProg.match(line):
lines[i] = normalCapsLine
modified = True
if line.find('}'):
foundCapsInterpretSection = False
else: # foundShiftInterpretSection
if enable:
if normalShiftLineProg.match(line):
lines[i] = disabledModLine
modified = True
else:
if disabledModLineProg.match(line):
lines[i] = normalShiftLine
modified = True
if line.find('}'):
foundShiftInterpretSection = False
if modified:
_setXmodmap(bytes('\n'.join(lines), 'UTF-8'))
def _createCthulhuXmodmap():
"""Makes an Cthulhu-specific Xmodmap so that the keys behave as we
need them to do. This is especially the case for the Cthulhu modifier.
"""
global _capsLockCleared
if "Caps_Lock" in settings.cthulhuModifierKeys \
or "Shift_Lock" in settings.cthulhuModifierKeys:
_setCapsLockAsCthulhuModifier(True)
_capsLockCleared = True
elif _capsLockCleared:
_setCapsLockAsCthulhuModifier(False)
_capsLockCleared = False
def _storeXmodmap(keyList):
"""Save the original xmodmap for the keys in keyList before we alter it.
Arguments:
- keyList: A list of named keys to look for.
"""
global _originalXmodmap
_originalXmodmap = subprocess.check_output(['xkbcomp', os.environ['DISPLAY'], '-'])
def _restoreXmodmap(keyList=[]):
"""Restore the original xmodmap values for the keys in keyList.
Arguments:
- keyList: A list of named keys to look for. An empty list means
to restore the entire saved xmodmap.
"""
msg = "CTHULHU: Attempting to restore original xmodmap"
debug.printMessage(debug.LEVEL_INFO, msg, True)
global _capsLockCleared
_capsLockCleared = False
p = subprocess.Popen(['xkbcomp', '-w0', '-', os.environ['DISPLAY']],
stdin=subprocess.PIPE, stdout=None, stderr=None)
p.communicate(_originalXmodmap)
msg = "CTHULHU: Original xmodmap restored"
debug.printMessage(debug.LEVEL_INFO, msg, True)
cthulhu_modifier_manager.getManager().refreshCthulhuModifiers("Keyboard change detected.")
def setKeyHandling(new):
"""Toggle use of the new vs. legacy key handling mode.
@@ -588,16 +461,7 @@ def loadUserSettings(script=None, inputEvent=None, skipReloadMessage=False):
if _settingsManager.getSetting('enableSound'):
player.init()
global _cthulhuModifiers
custom = [k for k in settings.cthulhuModifierKeys if k not in _cthulhuModifiers]
_cthulhuModifiers += custom
# Handle the case where a change was made in the Cthulhu Preferences dialog.
#
if _originalXmodmap:
_restoreXmodmap(_cthulhuModifiers)
_storeXmodmap(_cthulhuModifiers)
_createCthulhuXmodmap()
cthulhu_modifier_manager.getManager().refreshCthulhuModifiers("Loading user settings.")
# Activate core systems FIRST before loading plugins
_scriptManager.activate()
@@ -894,7 +758,7 @@ def shutdown(script=None, inputEvent=None):
signal.alarm(0)
_initialized = False
_restoreXmodmap(_cthulhuModifiers)
cthulhu_modifier_manager.getManager().unsetCthulhuModifiers("Shutting down.")
debug.printMessage(debug.LEVEL_INFO, 'CTHULHU: Quitting Atspi main event loop', True)
Atspi.event_quit()
@@ -948,7 +812,7 @@ def crashOnSignal(signum, frame):
msg = f"CTHULHU: Shutting down and exiting due to signal={signum} {signalString}"
debug.printMessage(debug.LEVEL_SEVERE, msg, True)
debug.printStack(debug.LEVEL_SEVERE)
_restoreXmodmap(_cthulhuModifiers)
cthulhu_modifier_manager.getManager().unsetCthulhuModifiers("Crashed")
sys.exit(1)
def main():
+225
View File
@@ -0,0 +1,225 @@
#!/usr/bin/env python3
#
# Copyright (c) 2026 Stormux
# Copyright (c) 2023 Igalia, S.L.
# Author: Joanmarie Diggs <jdiggs@igalia.com>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the
# Free Software Foundation, Inc., Franklin Street, Fifth Floor,
# Boston MA 02110-1301 USA.
#
# Forked from Orca screen reader.
# Cthulhu project: https://git.stormux.org/storm/cthulhu
"""Manages the Cthulhu modifier key(s)."""
__id__ = "$Id$"
__version__ = "$Revision$"
__date__ = "$Date$"
__copyright__ = "Copyright (c) 2026 Stormux"
__license__ = "LGPL"
import os
import re
import subprocess
import gi
gi.require_version("Atspi", "2.0")
from gi.repository import Atspi
from gi.repository import GLib
import cthulhu.debug as debug
import cthulhu.settings as settings
class CthulhuModifierManager:
"""Manages the Cthulhu modifier."""
def __init__(self):
self._originalXmodmap = b""
self._capsLockCleared = False
def refreshCthulhuModifiers(self, reason=""):
"""Refreshes the Cthulhu modifier keys."""
msg = "CTHULHU MODIFIER MANAGER: Refreshing Cthulhu modifiers"
if reason:
msg += f": {reason}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
display = os.environ.get("DISPLAY")
if not display:
msg = "CTHULHU MODIFIER MANAGER: DISPLAY not set, skipping xkbcomp operations"
debug.printMessage(debug.LEVEL_INFO, msg, True)
return
self.unsetCthulhuModifiers(reason)
with subprocess.Popen(["xkbcomp", display, "-"],
stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) as process:
self._originalXmodmap, _ = process.communicate()
self._createCthulhuXmodmap()
def unsetCthulhuModifiers(self, reason=""):
"""Turns the Cthulhu modifiers back into their original purpose."""
msg = "CTHULHU MODIFIER MANAGER: Attempting to restore original xmodmap"
if reason:
msg += f": {reason}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
if not self._originalXmodmap:
msg = "CTHULHU MODIFIER MANAGER: No stored xmodmap found"
debug.printMessage(debug.LEVEL_INFO, msg, True)
return
display = os.environ.get("DISPLAY")
if not display:
msg = "CTHULHU MODIFIER MANAGER: DISPLAY not set, skipping xmodmap restoration"
debug.printMessage(debug.LEVEL_INFO, msg, True)
return
self._capsLockCleared = False
with subprocess.Popen(["xkbcomp", "-w0", "-", display],
stdin=subprocess.PIPE, stdout=None, stderr=None) as process:
process.communicate(self._originalXmodmap)
msg = "CTHULHU MODIFIER MANAGER: Original xmodmap restored"
debug.printMessage(debug.LEVEL_INFO, msg, True)
def _createCthulhuXmodmap(self):
"""Makes a Cthulhu-specific Xmodmap so that the modifier works."""
msg = "CTHULHU MODIFIER MANAGER: Creating Cthulhu xmodmap"
debug.printMessage(debug.LEVEL_INFO, msg, True)
if "Caps_Lock" in settings.cthulhuModifierKeys \
or "Shift_Lock" in settings.cthulhuModifierKeys:
self.setCapsLockAsCthulhuModifier(True)
self._capsLockCleared = True
elif self._capsLockCleared:
self.setCapsLockAsCthulhuModifier(False)
self._capsLockCleared = False
def setCapsLockAsCthulhuModifier(self, enable):
"""Enable or disable use of the caps lock key as a Cthulhu modifier key."""
msg = "CTHULHU MODIFIER MANAGER: Setting caps lock as the Cthulhu modifier"
debug.printMessage(debug.LEVEL_INFO, msg, True)
display = os.environ.get("DISPLAY")
if not display:
msg = "CTHULHU MODIFIER MANAGER: DISPLAY not set, cannot modify caps lock"
debug.printMessage(debug.LEVEL_INFO, msg, True)
return
if not self._originalXmodmap:
msg = "CTHULHU MODIFIER MANAGER: No xmodmap available, cannot modify caps lock"
debug.printMessage(debug.LEVEL_INFO, msg, True)
return
interpretCapsLineProg = re.compile(
r'^\s*interpret\s+Caps[_+]Lock[_+]AnyOfOrNone\s*\(all\)\s*{\s*$', re.I)
normalCapsLineProg = re.compile(
r'^\s*action\s*=\s*LockMods\s*\(\s*modifiers\s*=\s*Lock\s*\)\s*;\s*$', re.I)
interpretShiftLineProg = re.compile(
r'^\s*interpret\s+Shift[_+]Lock[_+]AnyOf\s*\(\s*Shift\s*\+\s*Lock\s*\)\s*{\s*$', re.I)
normalShiftLineProg = re.compile(
r'^\s*action\s*=\s*LockMods\s*\(\s*modifiers\s*=\s*Shift\s*\)\s*;\s*$', re.I)
disabledModLineProg = re.compile(
r'^\s*action\s*=\s*NoAction\s*\(\s*\)\s*;\s*$', re.I)
normalCapsLine = ' action= LockMods(modifiers=Lock);'
normalShiftLine = ' action= LockMods(modifiers=Shift);'
disabledModLine = ' action= NoAction();'
lines = self._originalXmodmap.decode('UTF-8').split('\n')
foundCapsInterpretSection = False
foundShiftInterpretSection = False
modified = False
for i, line in enumerate(lines):
if not foundCapsInterpretSection and not foundShiftInterpretSection:
if interpretCapsLineProg.match(line):
foundCapsInterpretSection = True
elif interpretShiftLineProg.match(line):
foundShiftInterpretSection = True
elif foundCapsInterpretSection:
if enable:
if normalCapsLineProg.match(line):
lines[i] = disabledModLine
modified = True
else:
if disabledModLineProg.match(line):
lines[i] = normalCapsLine
modified = True
if line.find('}'):
foundCapsInterpretSection = False
elif foundShiftInterpretSection:
if enable:
if normalShiftLineProg.match(line):
lines[i] = disabledModLine
modified = True
else:
if disabledModLineProg.match(line):
lines[i] = normalShiftLine
modified = True
if line.find('}'):
foundShiftInterpretSection = False
if modified:
msg = "CTHULHU MODIFIER MANAGER: Updating xmodmap"
debug.printMessage(debug.LEVEL_INFO, msg, True)
with subprocess.Popen(["xkbcomp", "-w0", "-", display],
stdin=subprocess.PIPE, stdout=None, stderr=None) as process:
process.communicate(bytes('\n'.join(lines), 'UTF-8'))
else:
msg = "CTHULHU MODIFIER MANAGER: Not updating xmodmap"
debug.printMessage(debug.LEVEL_INFO, msg, True)
def toggleModifier(self, keyboardEvent):
"""Toggles the modifier to enable double-clicking causing normal behavior."""
if keyboardEvent.keyval_name in ["Caps_Lock", "Shift_Lock"]:
self._toggleModifierLock(keyboardEvent)
return
def _toggleModifierLock(self, keyboardEvent):
"""Toggles the lock for a modifier to enable double-clicking causing normal behavior."""
if not keyboardEvent.is_pressed_key():
return
def toggle(modifiers, modifier):
if modifiers & modifier:
lock = Atspi.KeySynthType.UNLOCKMODIFIERS
msg = "CTHULHU MODIFIER MANAGER: Unlocking CapsLock"
debug.printMessage(debug.LEVEL_INFO, msg, True)
else:
lock = Atspi.KeySynthType.LOCKMODIFIERS
msg = "CTHULHU MODIFIER MANAGER: Locking CapsLock"
debug.printMessage(debug.LEVEL_INFO, msg, True)
Atspi.generate_keyboard_event(modifier, "", lock)
if keyboardEvent.keyval_name == "Caps_Lock":
modifier = 1 << Atspi.ModifierType.SHIFTLOCK
elif keyboardEvent.keyval_name == "Shift_Lock":
modifier = 1 << Atspi.ModifierType.SHIFT
else:
return
msg = "CTHULHU MODIFIER MANAGER: Scheduling lock change"
debug.printMessage(debug.LEVEL_INFO, msg, True)
GLib.timeout_add(1, toggle, keyboardEvent.modifiers, modifier)
_manager = CthulhuModifierManager()
def getManager():
"""Returns the CthulhuModifierManager singleton."""
return _manager
+4
View File
@@ -851,6 +851,10 @@ SPEECH_VOICE_TYPE_UPPERCASE = C_("VoiceType", "Uppercase")
# system. (http://devel.freebsoft.org/speechd)
SPEECH_DISPATCHER = _("Speech Dispatcher")
# Translators: This label refers to the Piper neural text-to-speech system.
# (https://github.com/rhasspy/piper)
PIPER_TTS = _("Piper Neural TTS")
# Translators: This is a label for a group of options related to Cthulhu's behavior
# when presenting an application's spell check dialog.
SPELL_CHECK = C_("OptionGroup", "Spell Check")
+6 -30
View File
@@ -47,6 +47,7 @@ from . import keybindings
from . import keynames
from . import messages
from . import cthulhu
from . import cthulhu_modifier_manager
from . import cthulhu_state
from . import script_manager
from . import settings
@@ -347,7 +348,10 @@ class KeyboardEvent(InputEvent):
KeyboardEvent.secondCthulhuModifierTime <
KeyboardEvent.lastCthulhuModifierAloneTime + 0.5):
# double-cthulhu, let the real action happen
self._bypassCthulhu = True
if self.event_string in ["Caps_Lock", "Shift_Lock"]:
cthulhu_modifier_manager.getManager().toggleModifier(self)
else:
self._bypassCthulhu = True
if not _isPressed:
KeyboardEvent.lastCthulhuModifierAlone = False
KeyboardEvent.lastCthulhuModifierAloneTime = False
@@ -955,7 +959,6 @@ class KeyboardEvent(InputEvent):
if (self.event_string == "Caps_Lock" \
or self.event_string == "Shift_Lock") \
and self.type == Atspi.EventType.KEY_PRESSED_EVENT:
self._lock_mod()
self.keyType = KeyboardEvent.TYPE_LOCKING
self._present()
return False, 'Bypassed cthulhu modifier'
@@ -984,6 +987,7 @@ class KeyboardEvent(InputEvent):
if cthulhu_state.bypassNextCommand:
if not self.is_modifier_key():
cthulhu_state.bypassNextCommand = False
cthulhu_modifier_manager.getManager().refreshCthulhuModifiers("Bypass next command disabled")
self._script.addKeyGrabs()
return False, 'Bypass next command'
@@ -999,34 +1003,6 @@ class KeyboardEvent(InputEvent):
return False, 'Unaddressed case'
def _lock_mod(self):
def lock_mod(modifiers, modifier):
def lockit():
try:
if modifiers & modifier:
lock = Atspi.KeySynthType.UNLOCKMODIFIERS
debug.printMessage(debug.LEVEL_INFO, "Unlocking capslock", True)
else:
lock = Atspi.KeySynthType.LOCKMODIFIERS
debug.printMessage(debug.LEVEL_INFO, "Locking capslock", True)
Atspi.generate_keyboard_event(modifier, "", lock)
debug.printMessage(debug.LEVEL_INFO, "Done with capslock", True)
except Exception:
debug.printMessage(debug.LEVEL_INFO, "Could not trigger capslock, " \
"at-spi2-core >= 2.32 is needed for triggering capslock", True)
pass
return lockit
if self.event_string == "Caps_Lock":
modifier = 1 << Atspi.ModifierType.SHIFTLOCK
elif self.event_string == "Shift_Lock":
modifier = 1 << Atspi.ModifierType.SHIFT
else:
tokens = ["Unknown locking key", self.event_string]
debug.printTokens(debug.LEVEL_WARNING, tokens, True)
return
debug.printMessage(debug.LEVEL_INFO, "Scheduling capslock", True)
GLib.timeout_add(1, lock_mod(self.modifiers, modifier))
def _consume(self):
startTime = time.time()
data = "'%s' (%d)" % (self.event_string, self.hw_code)
+19 -1
View File
@@ -37,8 +37,11 @@ import gi
gi.require_version("Atspi", "2.0")
gi.require_version("Gdk", "3.0")
gi.require_version("Gtk", "3.0")
gi.require_version("Gio", "2.0")
from gi.repository import Atspi
from gi.repository import Gdk
from gi.repository import Gio
from gi.repository import GLib
from gi.repository import GObject
from gi.repository import Gtk
@@ -305,7 +308,22 @@ class LearnModePresenter:
uri = "help:cthulhu"
if page:
uri += f"/{page}"
Gtk.show_uri(Gdk.Screen.get_default(), uri, Gtk.get_current_event_time())
try:
Gtk.show_uri(Gdk.Screen.get_default(), uri, Gtk.get_current_event_time())
return True
except GLib.GError as error:
msg = f"LEARN MODE PRESENTER: Failed to open help URI {uri}: {error}"
debug.printMessage(debug.LEVEL_WARNING, msg, True)
try:
Gio.AppInfo.launch_default_for_uri(uri, None)
return True
except GLib.GError as error:
msg = f"LEARN MODE PRESENTER: Failed to launch help URI {uri}: {error}"
debug.printMessage(debug.LEVEL_WARNING, msg, True)
if script:
script.presentMessage(messages.HELP_NOT_AVAILABLE)
return True
class CommandListGUI:
+4
View File
@@ -34,6 +34,7 @@ cthulhu_python_sources = files([
'colornames.py',
'common_keyboardmap.py',
'cthulhuVersion.py',
'cthulhu_modifier_manager.py',
'cthulhu_state.py',
'date_and_time_presenter.py',
'dbus_service.py',
@@ -93,6 +94,9 @@ cthulhu_python_sources = files([
'speechdispatcherfactory.py',
'speech_generator.py',
'speechserver.py',
'piperfactory.py',
'piper_voice_manager.py',
'piper_audio_player.py',
'structural_navigation.py',
'text_attribute_names.py',
'translation_context.py',
+9
View File
@@ -1498,6 +1498,9 @@ LINE_UNSELECTED_UP = _("line unselected up from cursor position")
# exiting Learn Mode.
LEARN_MODE_STOP = _("Exiting learn mode.")
# Translators: This message is presented when help cannot be opened.
HELP_NOT_AVAILABLE = _("Help is not available.")
# Translators: when the user selects (highlights) or unselects text in a
# document, Cthulhu will speak information about what they have selected or
# unselected. This message is presented when the user selects from the
@@ -2290,12 +2293,18 @@ SPEECH_MODULE_VALUE = _("Speech-dispatcher module %s")
# Translators: This string announces the current speech-dispatcher voice.
SPEECH_VOICE_VALUE = _("Speech-dispatcher voice %s")
# Translators: This string announces the current voice for non-speech-dispatcher engines.
SPEECH_VOICE_VALUE_GENERIC = _("Voice %s")
# Translators: This string is presented when speech-dispatcher modules are unavailable.
SPEECH_MODULES_UNAVAILABLE = _("No speech-dispatcher modules available")
# Translators: This string is presented when speech-dispatcher voices are unavailable.
SPEECH_VOICES_UNAVAILABLE = _("No speech-dispatcher voices available")
# Translators: This string is presented when voices are unavailable for non-speech-dispatcher engines.
SPEECH_VOICES_UNAVAILABLE_GENERIC = _("No voices available")
# Translators: This string confirms speech settings have been saved.
SPEECH_SETTINGS_SAVED = _("Speech settings saved")
+302
View File
@@ -0,0 +1,302 @@
#!/usr/bin/env python3
#
# Copyright (c) 2024 Stormux
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the
# Free Software Foundation, Inc., Franklin Street, Fifth Floor,
# Boston MA 02110-1301 USA.
#
# Cthulhu project: https://git.stormux.org/storm/cthulhu
"""GStreamer-based audio player for Piper TTS synthesis output."""
__id__ = "$Id:$"
__version__ = "$Revision:$"
__date__ = "$Date:$"
__copyright__ = "Copyright (c) 2024 Stormux"
__license__ = "LGPL"
import threading
import gi
from gi.repository import GLib
try:
gi.require_version('Gst', '1.0')
from gi.repository import Gst
except Exception:
_gstreamerAvailable = False
else:
_gstreamerAvailable, args = Gst.init_check(None)
from . import debug
class PiperAudioPlayer:
"""GStreamer-based audio player for Piper TTS output.
Handles raw PCM audio data from Piper synthesis and plays it through
a GStreamer pipeline with volume control.
"""
def __init__(self, sampleRate=22050):
"""Initialize the audio player.
Arguments:
- sampleRate: Audio sample rate in Hz (default 22050, common for Piper)
"""
self._sampleRate = sampleRate
self._pipeline = None
self._appsrc = None
self._volume = None
self._initialized = False
self._playing = False
self._lock = threading.Lock()
self._completionCallback = None
if not _gstreamerAvailable:
msg = 'PIPER AUDIO: GStreamer is not available'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return
self._init()
def _init(self):
"""Initialize the GStreamer pipeline."""
if self._initialized:
return True
if not _gstreamerAvailable:
return False
self._resetPipeline()
try:
self._pipeline = Gst.Pipeline.new("piper-audio")
self._appsrc = Gst.ElementFactory.make("appsrc", "source")
if self._appsrc is None:
msg = 'PIPER AUDIO: Failed to create appsrc element'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
self._appsrc.set_property("format", Gst.Format.TIME)
self._appsrc.set_property("is-live", False)
self._appsrc.set_property("block", False)
caps = Gst.Caps.from_string(
f"audio/x-raw,format=S16LE,channels=1,"
f"rate={self._sampleRate},layout=interleaved"
)
self._appsrc.set_property("caps", caps)
convert = Gst.ElementFactory.make("audioconvert", "convert")
resample = Gst.ElementFactory.make("audioresample", "resample")
self._volume = Gst.ElementFactory.make("volume", "volume")
if self._volume is None:
msg = 'PIPER AUDIO: Failed to create volume element'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
sink = Gst.ElementFactory.make("autoaudiosink", "sink")
if sink is None:
msg = 'PIPER AUDIO: Failed to create autoaudiosink element'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
for element in [self._appsrc, convert, resample, self._volume, sink]:
self._pipeline.add(element)
if not self._appsrc.link(convert):
msg = 'PIPER AUDIO: Failed to link appsrc to convert'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
if not convert.link(resample):
msg = 'PIPER AUDIO: Failed to link convert to resample'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
if not resample.link(self._volume):
msg = 'PIPER AUDIO: Failed to link resample to volume'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
if not self._volume.link(sink):
msg = 'PIPER AUDIO: Failed to link volume to sink'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
bus = self._pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", self._onMessage)
self._initialized = True
msg = 'PIPER AUDIO: Pipeline initialized successfully'
debug.printMessage(debug.LEVEL_INFO, msg, True)
return True
except Exception as e:
msg = f'PIPER AUDIO: Failed to initialize pipeline: {e}'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
def _resetPipeline(self):
"""Reset the GStreamer pipeline and related elements."""
if self._pipeline is not None:
self._pipeline.set_state(Gst.State.NULL)
self._pipeline = None
self._appsrc = None
self._volume = None
self._initialized = False
def _onMessage(self, bus, message):
"""Handle GStreamer bus messages."""
if message.type == Gst.MessageType.EOS:
self._pipeline.set_state(Gst.State.NULL)
with self._lock:
self._playing = False
if self._completionCallback:
GLib.idle_add(self._completionCallback)
self._completionCallback = None
elif message.type == Gst.MessageType.ERROR:
self._pipeline.set_state(Gst.State.NULL)
with self._lock:
self._playing = False
error, info = message.parse_error()
msg = f'PIPER AUDIO ERROR: {error}'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
def setSampleRate(self, sampleRate):
"""Update the sample rate for the audio stream.
Arguments:
- sampleRate: New sample rate in Hz
"""
if sampleRate != self._sampleRate:
self._sampleRate = sampleRate
self.stop()
self._resetPipeline()
self._init()
def setVolume(self, volumeLevel):
"""Set the playback volume.
Arguments:
- volumeLevel: Volume level from 0.0 to 1.0
"""
if self._volume is not None:
volume = max(0.0, min(1.0, volumeLevel))
self._volume.set_property("volume", volume)
def play(self, audioData, interrupt=True, completionCallback=None):
"""Play raw PCM audio data.
Arguments:
- audioData: Raw PCM audio data as bytes (16-bit signed, little-endian)
- interrupt: If True, stop any current playback first
- completionCallback: Optional callback to invoke when playback completes
"""
if not self._initialized:
if not self._init():
msg = 'PIPER AUDIO: Cannot play - not initialized'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
if interrupt:
self.stop()
with self._lock:
self._playing = True
self._completionCallback = completionCallback
self._pipeline.set_state(Gst.State.PLAYING)
buf = Gst.Buffer.new_wrapped(audioData)
result = self._appsrc.emit("push-buffer", buf)
if result != Gst.FlowReturn.OK:
msg = f'PIPER AUDIO: Failed to push buffer: {result}'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
self._appsrc.emit("end-of-stream")
return True
def playStream(self, audioGenerator, interrupt=True, completionCallback=None):
"""Play audio from a streaming generator.
Arguments:
- audioGenerator: Iterator/generator yielding audio chunks as bytes
- interrupt: If True, stop any current playback first
- completionCallback: Optional callback when playback completes
"""
if not self._initialized:
if not self._init():
msg = 'PIPER AUDIO: Cannot play stream - not initialized'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
if interrupt:
self.stop()
with self._lock:
self._playing = True
self._completionCallback = completionCallback
self._pipeline.set_state(Gst.State.PLAYING)
def feedThread():
try:
for audioChunk in audioGenerator:
with self._lock:
if not self._playing:
break
buf = Gst.Buffer.new_wrapped(bytes(audioChunk))
result = self._appsrc.emit("push-buffer", buf)
if result != Gst.FlowReturn.OK:
msg = f'PIPER AUDIO: Stream push failed: {result}'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
break
except Exception as e:
msg = f'PIPER AUDIO: Stream feed error: {e}'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
finally:
self._appsrc.emit("end-of-stream")
thread = threading.Thread(target=feedThread, daemon=True)
thread.start()
return True
def isPlaying(self):
"""Check if audio is currently playing.
Returns True if playback is in progress.
"""
with self._lock:
return self._playing
def stop(self):
"""Stop any current playback."""
with self._lock:
self._playing = False
if self._pipeline is not None:
self._pipeline.set_state(Gst.State.NULL)
self._completionCallback = None
def shutdown(self):
"""Shut down the audio player and release resources."""
self.stop()
self._resetPipeline()
+347
View File
@@ -0,0 +1,347 @@
#!/usr/bin/env python3
#
# Copyright (c) 2024 Stormux
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the
# Free Software Foundation, Inc., Franklin Street, Fifth Floor,
# Boston MA 02110-1301 USA.
#
# Cthulhu project: https://git.stormux.org/storm/cthulhu
"""Voice discovery and management for Piper TTS."""
__id__ = "$Id:$"
__version__ = "$Revision:$"
__date__ = "$Date:$"
__copyright__ = "Copyright (c) 2024 Stormux"
__license__ = "LGPL"
import json
import os
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional
from . import debug
from . import speechserver
@dataclass
class PiperVoiceInfo:
"""Metadata for a Piper voice model."""
name: str
language: str
dialect: str
quality: str
modelPath: Path
configPath: Path
sampleRate: int = 22050
speakers: Dict[int, str] = field(default_factory=dict)
@property
def key(self) -> str:
"""Return a unique key for this voice."""
dialectPart = f"-{self.dialect}" if self.dialect else ""
return f"{self.language}{dialectPart}-{self.name}-{self.quality}"
@property
def displayName(self) -> str:
"""Return a human-readable display name."""
dialectPart = f" ({self.dialect})" if self.dialect else ""
return f"{self.name} - {self.language}{dialectPart} [{self.quality}]"
@property
def isMultiSpeaker(self) -> bool:
"""Return True if this is a multi-speaker model."""
return len(self.speakers) > 1
class PiperVoiceManager:
"""Discovers and manages Piper voice models.
Searches standard paths for Piper voice models (.onnx files with
companion .onnx.json config files) and provides methods to list
and load them.
"""
VOICE_SEARCH_PATHS = [
"~/.local/share/piper/voices",
"~/.local/share/piper-voices",
"~/.local/share/piper-tts/voices",
"~/.config/piper/voices",
"~/.config/piper-tts/voices",
"$XDG_DATA_HOME/piper/voices",
"$XDG_DATA_HOME/piper-voices",
"$XDG_DATA_HOME/piper-tts/voices",
"$XDG_DATA_HOME/cthulhu/piper-voices",
"/usr/share/piper/voices",
"/usr/share/piper-voices",
"/usr/share/piper-tts/voices",
"/usr/local/share/piper/voices",
"/usr/local/share/piper-voices",
"/usr/local/share/piper-tts/voices",
]
VOICE_FILENAME_PATTERN = re.compile(
r'^(?P<lang>[a-z]{2})(?:_(?P<dialect>[A-Z]{2}))?'
r'-(?P<name>[a-zA-Z0-9_]+)'
r'-(?P<quality>low|medium|high|x_low)\.onnx$'
)
def __init__(self, customPath=None):
"""Initialize the voice manager.
Arguments:
- customPath: Optional additional path to search for voices
"""
self._customPath = customPath
self._voices = []
self._voiceCache = {}
def discoverVoices(self) -> List[PiperVoiceInfo]:
"""Discover all available Piper voices.
Searches standard paths and returns a list of PiperVoiceInfo
objects for each valid voice found.
"""
self._voices = []
searchPaths = self._getSearchPaths()
for searchPath in searchPaths:
path = Path(os.path.expandvars(os.path.expanduser(searchPath)))
if not path.exists():
continue
msg = f'PIPER VOICES: Searching {path}'
debug.printMessage(debug.LEVEL_INFO, msg, True)
for onnxFile in path.rglob("*.onnx"):
configFile = Path(str(onnxFile) + ".json")
if not configFile.exists():
configFile = onnxFile.with_suffix(".onnx.json")
if configFile.exists():
try:
voice = self._parseVoice(onnxFile, configFile)
if voice:
self._voices.append(voice)
msg = f'PIPER VOICES: Found voice {voice.displayName}'
debug.printMessage(debug.LEVEL_INFO, msg, True)
except Exception as e:
msg = f'PIPER VOICES: Failed to parse {onnxFile}: {e}'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
else:
voice = self._parseVoiceFromFilename(onnxFile)
if voice:
self._voices.append(voice)
msg = f'PIPER VOICES: Found voice {voice.displayName} (no config)'
debug.printMessage(debug.LEVEL_INFO, msg, True)
self._voices.sort(key=lambda v: (v.language, v.name, v.quality))
msg = f'PIPER VOICES: Discovered {len(self._voices)} voice(s)'
debug.printMessage(debug.LEVEL_INFO, msg, True)
return self._voices
def getVoices(self) -> List[PiperVoiceInfo]:
"""Get the list of discovered voices.
Returns cached list; call discoverVoices() first to refresh.
"""
if not self._voices:
self.discoverVoices()
return self._voices
def getVoiceByKey(self, key: str) -> Optional[PiperVoiceInfo]:
"""Get a voice by its unique key.
Arguments:
- key: Voice key (e.g., "en_US-lessac-medium")
"""
for voice in self.getVoices():
if voice.key == key:
return voice
return None
def getVoiceByName(self, name: str) -> Optional[PiperVoiceInfo]:
"""Get a voice by name (first match).
Arguments:
- name: Voice name (e.g., "lessac")
"""
for voice in self.getVoices():
if voice.name == name:
return voice
return None
def getVoicesForLanguage(self, lang: str) -> List[PiperVoiceInfo]:
"""Get all voices for a specific language.
Arguments:
- lang: Language code (e.g., "en")
"""
return [v for v in self.getVoices() if v.language == lang]
def voiceToVoiceFamily(self, voiceInfo: PiperVoiceInfo) -> speechserver.VoiceFamily:
"""Convert a PiperVoiceInfo to a Cthulhu VoiceFamily.
Arguments:
- voiceInfo: The PiperVoiceInfo to convert
"""
return speechserver.VoiceFamily({
speechserver.VoiceFamily.NAME: voiceInfo.displayName,
speechserver.VoiceFamily.LANG: voiceInfo.language,
speechserver.VoiceFamily.DIALECT: voiceInfo.dialect or "",
speechserver.VoiceFamily.VARIANT: voiceInfo.quality,
})
def _getSearchPaths(self) -> List[str]:
"""Get the list of paths to search for voices."""
paths = list(self.VOICE_SEARCH_PATHS)
if self._customPath:
paths.insert(0, self._customPath)
xdgDataHome = os.environ.get("XDG_DATA_HOME", "~/.local/share")
paths = [p.replace("$XDG_DATA_HOME", xdgDataHome) for p in paths]
return paths
def _parseVoice(self, modelPath: Path, configPath: Path) -> Optional[PiperVoiceInfo]:
"""Parse a voice from its model and config files.
Arguments:
- modelPath: Path to the .onnx model file
- configPath: Path to the .json config file
"""
try:
with open(configPath, 'r', encoding='utf-8') as f:
config = json.load(f)
except (json.JSONDecodeError, IOError) as e:
msg = f'PIPER VOICES: Failed to read config {configPath}: {e}'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return None
language = ""
dialect = ""
name = modelPath.stem
quality = "medium"
sampleRate = 22050
speakers = {}
if "language" in config:
langInfo = config["language"]
if isinstance(langInfo, dict):
language = langInfo.get("code", "")
if "_" in language:
parts = language.split("_")
language = parts[0]
dialect = parts[1] if len(parts) > 1 else ""
elif isinstance(langInfo, str):
if "_" in langInfo:
parts = langInfo.split("_")
language = parts[0]
dialect = parts[1] if len(parts) > 1 else ""
else:
language = langInfo
if "audio" in config:
sampleRate = config["audio"].get("sample_rate", 22050)
if "speaker_id_map" in config:
speakers = {v: k for k, v in config["speaker_id_map"].items()}
match = self.VOICE_FILENAME_PATTERN.match(modelPath.name)
if match:
if not language:
language = match.group("lang")
if not dialect:
dialect = match.group("dialect") or ""
name = match.group("name")
quality = match.group("quality").replace("x_low", "x-low")
else:
stem = modelPath.stem
for q in ["low", "medium", "high", "x_low", "x-low"]:
if stem.endswith(f"-{q}"):
quality = q.replace("x_low", "x-low")
stem = stem[:-len(q)-1]
break
parts = stem.split("-")
if parts:
langPart = parts[0]
if "_" in langPart:
langParts = langPart.split("_")
if not language:
language = langParts[0]
if not dialect:
dialect = langParts[1] if len(langParts) > 1 else ""
elif not language:
language = langPart
if len(parts) > 1:
name = parts[1]
if not language:
language = "unknown"
return PiperVoiceInfo(
name=name,
language=language,
dialect=dialect,
quality=quality,
modelPath=modelPath,
configPath=configPath,
sampleRate=sampleRate,
speakers=speakers
)
def _parseVoiceFromFilename(self, modelPath: Path) -> Optional[PiperVoiceInfo]:
"""Parse voice info from filename only (no config file).
Arguments:
- modelPath: Path to the .onnx model file
"""
match = self.VOICE_FILENAME_PATTERN.match(modelPath.name)
if not match:
return None
return PiperVoiceInfo(
name=match.group("name"),
language=match.group("lang"),
dialect=match.group("dialect") or "",
quality=match.group("quality").replace("x_low", "x-low"),
modelPath=modelPath,
configPath=modelPath.with_suffix(".onnx.json"),
sampleRate=22050,
speakers={}
)
_manager = None
def getManager(customPath=None) -> PiperVoiceManager:
"""Get the singleton voice manager instance.
Arguments:
- customPath: Optional custom path to search for voices
"""
global _manager
if _manager is None:
_manager = PiperVoiceManager(customPath)
return _manager
+741
View File
@@ -0,0 +1,741 @@
#!/usr/bin/env python3
#
# Copyright (c) 2024 Stormux
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the
# Free Software Foundation, Inc., Franklin Street, Fifth Floor,
# Boston MA 02110-1301 USA.
#
# Cthulhu project: https://git.stormux.org/storm/cthulhu
"""Provides a Cthulhu speech server for Piper TTS backend."""
__id__ = "$Id$"
__version__ = "$Revision$"
__date__ = "$Date$"
__copyright__ = "Copyright (c) 2024 Stormux"
__license__ = "LGPL"
import io
import os
import sys
import threading
import time
import wave
from concurrent.futures import ThreadPoolExecutor
from gi.repository import GLib
from . import chnames
from . import debug
from . import guilabels
from . import messages
from . import settings
from . import speechserver
from . import cthulhu_state
from .acss import ACSS
from . import piper_voice_manager
from . import piper_audio_player
def _getPipxSitePackages():
pipxHome = os.environ.get("PIPX_HOME", os.path.expanduser("~/.local/pipx"))
pythonVersion = f"python{sys.version_info.major}.{sys.version_info.minor}"
sitePackages = os.path.join(
pipxHome,
"venvs",
"piper-tts",
"lib",
pythonVersion,
"site-packages"
)
if os.path.isdir(sitePackages):
return sitePackages
return None
def _tryImportPiper():
try:
from piper.voice import PiperVoice
return PiperVoice
except ImportError:
pass
sitePackages = _getPipxSitePackages()
if not sitePackages:
return None
addedPath = False
if sitePackages not in sys.path:
sys.path.insert(0, sitePackages)
addedPath = True
try:
from piper.voice import PiperVoice
msg = f'PIPER: Loaded piper-tts from pipx venv ({sitePackages})'
debug.printMessage(debug.LEVEL_INFO, msg, True)
return PiperVoice
except ImportError:
if addedPath:
try:
sys.path.remove(sitePackages)
except ValueError:
pass
return None
PiperVoice = _tryImportPiper()
_piperAvailable = PiperVoice is not None
if _piperAvailable:
try:
from piper.config import SynthesisConfig as _PiperSynthesisConfig
except Exception:
_PiperSynthesisConfig = None
else:
_PiperSynthesisConfig = None
if not _piperAvailable:
msg = 'PIPER: piper-tts library not available'
debug.printMessage(debug.LEVEL_INFO, msg, True)
class SpeechServer(speechserver.SpeechServer):
"""Piper TTS speech server implementation.
Provides speech synthesis using Piper neural text-to-speech,
implementing the Cthulhu SpeechServer interface.
"""
_active_servers = {}
DEFAULT_SERVER_ID = 'piper-default'
@staticmethod
def getFactoryName():
"""Returns a localized name describing this factory."""
return guilabels.PIPER_TTS
@staticmethod
def getSpeechServers():
"""Gets available speech servers as a list."""
servers = []
if not _piperAvailable:
msg = 'PIPER: Cannot list servers - piper-tts not installed'
debug.printMessage(debug.LEVEL_INFO, msg, True)
return servers
manager = piper_voice_manager.getManager()
voices = manager.discoverVoices()
if not voices:
msg = 'PIPER: No voice models found'
debug.printMessage(debug.LEVEL_INFO, msg, True)
return servers
server = SpeechServer._getSpeechServer(SpeechServer.DEFAULT_SERVER_ID)
if server is not None:
servers.append(server)
return servers
@classmethod
def _getSpeechServer(cls, serverId):
"""Return an active server for given id.
Attempt to create the server if it doesn't exist yet.
Returns None when it is not possible to create the server.
"""
if serverId not in cls._active_servers:
cls(serverId)
return cls._active_servers.get(serverId)
@staticmethod
def getSpeechServer(info=None):
"""Gets a given SpeechServer based upon the info."""
thisId = info[1] if info is not None else SpeechServer.DEFAULT_SERVER_ID
return SpeechServer._getSpeechServer(thisId)
@staticmethod
def shutdownActiveServers():
"""Cleans up and shuts down this factory."""
servers = list(SpeechServer._active_servers.values())
for server in servers:
server.shutdown()
def __init__(self, serverId):
"""Initialize the Piper speech server.
Arguments:
- serverId: Identifier for this server instance
"""
super(SpeechServer, self).__init__()
self._id = serverId
self._voice = None
self._voiceInfo = None
self._voiceManager = piper_voice_manager.getManager()
self._audioPlayer = None
self._executor = None
self._currentFuture = None
self._stopEvent = threading.Event()
self._lock = threading.Lock()
self._speakGeneration = 0
self._currentVoiceProperties = {}
self._acssManipulators = (
(ACSS.RATE, self._setRate),
(ACSS.AVERAGE_PITCH, self._setPitch),
(ACSS.GAIN, self._setVolume),
(ACSS.FAMILY, self._setFamily),
)
self._rate = 50
self._pitch = 5.0
self._volume = 1.0
if not _piperAvailable:
msg = 'PIPER: piper-tts library not available'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return
try:
self._init()
except Exception as e:
debug.printException(debug.LEVEL_WARNING)
msg = f'PIPER: Failed to initialize server: {e}'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
else:
SpeechServer._active_servers[serverId] = self
def _init(self):
"""Initialize the speech server components."""
voices = self._voiceManager.getVoices()
if not voices:
raise RuntimeError("No Piper voices found")
self._voiceInfo = voices[0]
self._loadVoice(self._voiceInfo)
self._audioPlayer = piper_audio_player.PiperAudioPlayer(
self._voiceInfo.sampleRate
)
self._executor = ThreadPoolExecutor(
max_workers=1,
thread_name_prefix="piper"
)
msg = f'PIPER: Initialized with voice {self._voiceInfo.displayName}'
debug.printMessage(debug.LEVEL_INFO, msg, True)
def _loadVoice(self, voiceInfo):
"""Load a Piper voice model.
Arguments:
- voiceInfo: PiperVoiceInfo for the voice to load
"""
msg = f'PIPER: Loading voice {voiceInfo.displayName}'
debug.printMessage(debug.LEVEL_INFO, msg, True)
self._voice = PiperVoice.load(str(voiceInfo.modelPath))
self._voiceInfo = voiceInfo
detectedRate = self._getVoiceSampleRate(self._voice, voiceInfo)
if detectedRate and detectedRate != self._voiceInfo.sampleRate:
msg = (
f'PIPER: Using detected sample rate {detectedRate} '
f'for {voiceInfo.displayName}'
)
debug.printMessage(debug.LEVEL_INFO, msg, True)
self._voiceInfo.sampleRate = detectedRate
if self._audioPlayer:
self._audioPlayer.setSampleRate(voiceInfo.sampleRate)
def _getVoiceSampleRate(self, voice, voiceInfo):
if voice is None:
return voiceInfo.sampleRate if voiceInfo else None
for attr in ("sample_rate", "sampleRate"):
value = getattr(voice, attr, None)
if isinstance(value, (int, float)) and value > 0:
return int(value)
config = getattr(voice, "config", None)
sampleRate = None
if isinstance(config, dict):
audio = config.get("audio")
if isinstance(audio, dict):
sampleRate = audio.get("sample_rate")
elif audio is not None and hasattr(audio, "sample_rate"):
sampleRate = getattr(audio, "sample_rate", None)
if sampleRate is None:
sampleRate = config.get("sample_rate")
else:
if hasattr(config, "audio"):
audio = getattr(config, "audio")
if isinstance(audio, dict):
sampleRate = audio.get("sample_rate")
elif audio is not None and hasattr(audio, "sample_rate"):
sampleRate = getattr(audio, "sample_rate", None)
if sampleRate is None and hasattr(config, "sample_rate"):
sampleRate = getattr(config, "sample_rate", None)
try:
sampleRate = int(sampleRate) if sampleRate is not None else None
except (TypeError, ValueError):
sampleRate = None
if sampleRate and sampleRate > 0:
return sampleRate
return voiceInfo.sampleRate if voiceInfo else None
def _mapRate(self, acssRate):
"""Map ACSS rate (0-99) to Piper length_scale.
ACSS rate 50 (default) = length_scale 1.0
Higher ACSS rate = lower length_scale (faster)
Lower ACSS rate = higher length_scale (slower)
Arguments:
- acssRate: Rate value from 0-99
"""
rate = acssRate if acssRate is not None else 50
rate = max(0, min(99, rate))
lengthScale = 2.0 - (rate / 99.0) * 1.5
return max(0.5, min(2.0, lengthScale))
def _mapPitch(self, acssPitch):
"""Map ACSS pitch (0-9) to pitch adjustment factor.
Note: Piper's native pitch control is limited.
This maps to a factor that could be used for post-processing.
Arguments:
- acssPitch: Pitch value from 0-9
"""
pitch = acssPitch if acssPitch is not None else 5.0
pitch = max(0, min(9, pitch))
return pitch
def _mapVolume(self, acssGain):
"""Map ACSS gain (0-9) to volume (0.0-1.0).
Arguments:
- acssGain: Gain value from 0-9
"""
gain = acssGain if acssGain is not None else 10
gain = max(0, min(10, gain))
return gain / 10.0
def _setRate(self, acssRate):
"""Set the speech rate.
Arguments:
- acssRate: ACSS rate value (0-99)
"""
self._rate = acssRate if acssRate is not None else 50
msg = f'PIPER: Rate set to {self._rate}'
debug.printMessage(debug.LEVEL_INFO, msg, True)
def _setPitch(self, acssPitch):
"""Set the speech pitch.
Arguments:
- acssPitch: ACSS pitch value (0-9)
"""
self._pitch = acssPitch if acssPitch is not None else 5.0
msg = f'PIPER: Pitch set to {self._pitch}'
debug.printMessage(debug.LEVEL_INFO, msg, True)
def _setVolume(self, acssGain):
"""Set the speech volume.
Arguments:
- acssGain: ACSS gain value (0-9)
"""
self._volume = self._mapVolume(acssGain)
if self._audioPlayer:
self._audioPlayer.setVolume(self._volume)
msg = f'PIPER: Volume set to {self._volume}'
debug.printMessage(debug.LEVEL_INFO, msg, True)
def _setFamily(self, acssFamily):
"""Set the voice family.
Arguments:
- acssFamily: ACSS family dict with voice info
"""
if not acssFamily:
return
name = acssFamily.get(speechserver.VoiceFamily.NAME)
if not name:
return
for voice in self._voiceManager.getVoices():
if voice.displayName == name:
if voice != self._voiceInfo:
try:
self._loadVoice(voice)
except Exception as e:
msg = f'PIPER: Failed to load voice {name}: {e}'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
break
def _applyAcss(self, acss):
"""Apply ACSS voice settings.
Arguments:
- acss: ACSS settings to apply
"""
if acss is None:
acss = settings.voices[settings.DEFAULT_VOICE]
with self._lock:
current = self._currentVoiceProperties
for acssProperty, method in self._acssManipulators:
value = acss.get(acssProperty)
if value is not None:
if current.get(acssProperty) != value:
method(value)
current[acssProperty] = value
elif acssProperty == ACSS.AVERAGE_PITCH:
method(5.0)
current[acssProperty] = 5.0
elif acssProperty == ACSS.GAIN:
method(10)
current[acssProperty] = 10
elif acssProperty == ACSS.RATE:
method(50)
current[acssProperty] = 50
def _synthesize(self, text):
"""Synthesize text to audio data.
Arguments:
- text: Text to synthesize
Returns raw PCM audio data as bytes.
"""
if not self._voice or not text:
return None
lengthScale = self._mapRate(self._rate)
if _PiperSynthesisConfig is not None:
try:
synConfig = _PiperSynthesisConfig(length_scale=lengthScale)
audioChunks = self._voice.synthesize(text, syn_config=synConfig)
audioParts = []
sampleRate = None
for chunk in audioChunks:
if sampleRate is None:
sampleRate = chunk.sample_rate
audioParts.append(chunk.audio_int16_bytes)
if sampleRate and sampleRate != self._voiceInfo.sampleRate:
self._voiceInfo.sampleRate = sampleRate
if self._audioPlayer:
self._audioPlayer.setSampleRate(sampleRate)
audioData = b"".join(audioParts)
return audioData if audioData else None
except TypeError:
pass
wavBuffer = io.BytesIO()
with wave.open(wavBuffer, 'wb') as wavFile:
wavFile.setnchannels(1)
wavFile.setsampwidth(2)
wavFile.setframerate(self._voiceInfo.sampleRate)
self._voice.synthesize(
text,
wavFile,
length_scale=lengthScale
)
wavBuffer.seek(44)
return wavBuffer.read()
def _synthesizeAndPlay(self, text, acss, completionCallback=None, generation=0):
"""Synthesize and play text (runs in worker thread).
Arguments:
- text: Text to synthesize
- acss: ACSS settings
- completionCallback: Optional callback when complete
"""
try:
if self._stopEvent.is_set() or generation != self._speakGeneration:
return
self._applyAcss(acss)
audioData = self._synthesize(text)
if not audioData or self._stopEvent.is_set() or generation != self._speakGeneration:
return
if self._audioPlayer:
self._audioPlayer.play(audioData, False)
while self._audioPlayer.isPlaying():
if self._stopEvent.is_set() or generation != self._speakGeneration:
self._audioPlayer.stop()
return
time.sleep(0.01)
if completionCallback and not self._stopEvent.is_set() and generation == self._speakGeneration:
GLib.idle_add(completionCallback)
except Exception as e:
msg = f'PIPER: Synthesis error: {e}'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
def getInfo(self):
"""Returns [name, id]."""
return [guilabels.PIPER_TTS, self._id]
def getVoiceFamilies(self):
"""Returns a list of VoiceFamily instances."""
families = []
for voice in self._voiceManager.getVoices():
family = self._voiceManager.voiceToVoiceFamily(voice)
families.append(family)
return families
def speak(self, text=None, acss=None, interrupt=True):
"""Speaks the given text.
Arguments:
- text: Text to speak
- acss: ACSS voice settings
- interrupt: If True, stop any current speech first
"""
if not text or not self._voice:
return
if interrupt:
self.stop()
with self._lock:
self._stopEvent.clear()
generation = self._speakGeneration
msg = f"PIPER: Speaking '{text}'"
debug.printMessage(debug.LEVEL_INFO, msg, True)
self._currentFuture = self._executor.submit(
self._synthesizeAndPlay, text, acss, None, generation
)
def sayAll(self, utteranceIterator, progressCallback):
"""Iterates through utterances, speaking each one.
Arguments:
- utteranceIterator: Iterator yielding [SayAllContext, acss] tuples
- progressCallback: Called with progress updates
"""
GLib.idle_add(self._sayAllWorker, utteranceIterator, progressCallback)
def _sayAllWorker(self, iterator, callback):
"""Process one utterance at a time (called via GLib.idle_add).
Arguments:
- iterator: Utterance iterator
- callback: Progress callback
"""
try:
context, acss = next(iterator)
except StopIteration:
return False
def onComplete():
context.currentOffset = context.endOffset
callback(context.copy(), speechserver.SayAllContext.COMPLETED)
GLib.idle_add(self._sayAllWorker, iterator, callback)
context.currentOffset = context.startOffset
callback(context.copy(), speechserver.SayAllContext.PROGRESS)
with self._lock:
self._stopEvent.clear()
generation = self._speakGeneration
self._currentFuture = self._executor.submit(
self._synthesizeAndPlay, context.utterance, acss, onComplete, generation
)
return False
def speakCharacter(self, character, acss=None):
"""Speaks a single character immediately.
Arguments:
- character: Character to speak
- acss: ACSS voice settings
"""
name = chnames.getCharacterName(character)
if name and name != character:
if cthulhu_state.activeScript:
name = cthulhu_state.activeScript.utilities.adjustForPronunciation(name)
self.speak(name, acss)
else:
self.speak(character, acss)
def speakKeyEvent(self, event, acss=None):
"""Speaks a key event immediately.
Arguments:
- event: The KeyboardEvent to speak
- acss: ACSS voice settings
"""
eventString = event.getKeyName()
lockingStateString = event.getLockingStateString()
eventString = f"{eventString} {lockingStateString}".strip()
self.speak(eventString, acss)
def _changeDefaultSpeechRate(self, step, decrease=False):
"""Change the default speech rate.
Arguments:
- step: Amount to change
- decrease: If True, decrease rate; otherwise increase
"""
acss = settings.voices[settings.DEFAULT_VOICE]
delta = step * (-1 if decrease else 1)
try:
rate = acss[ACSS.RATE]
except KeyError:
rate = 50
acss[ACSS.RATE] = max(0, min(99, rate + delta))
msg = f"PIPER: Rate set to {acss[ACSS.RATE]}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
self.speak(
messages.SPEECH_SLOWER if decrease else messages.SPEECH_FASTER,
acss=acss
)
def _changeDefaultSpeechPitch(self, step, decrease=False):
"""Change the default speech pitch.
Arguments:
- step: Amount to change
- decrease: If True, decrease pitch; otherwise increase
"""
acss = settings.voices[settings.DEFAULT_VOICE]
delta = step * (-1 if decrease else 1)
try:
pitch = acss[ACSS.AVERAGE_PITCH]
except KeyError:
pitch = 5
acss[ACSS.AVERAGE_PITCH] = max(0, min(9, pitch + delta))
msg = f"PIPER: Pitch set to {acss[ACSS.AVERAGE_PITCH]}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
self.speak(
messages.SPEECH_LOWER if decrease else messages.SPEECH_HIGHER,
acss=acss
)
def _changeDefaultSpeechVolume(self, step, decrease=False):
"""Change the default speech volume.
Arguments:
- step: Amount to change
- decrease: If True, decrease volume; otherwise increase
"""
acss = settings.voices[settings.DEFAULT_VOICE]
delta = step * (-1 if decrease else 1)
try:
volume = acss[ACSS.GAIN]
except KeyError:
volume = 10
acss[ACSS.GAIN] = max(0, min(9, volume + delta))
msg = f"PIPER: Volume set to {acss[ACSS.GAIN]}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
self.speak(
messages.SPEECH_SOFTER if decrease else messages.SPEECH_LOUDER,
acss=acss
)
def increaseSpeechRate(self, step=5):
"""Increases the speech rate."""
self._changeDefaultSpeechRate(step)
def decreaseSpeechRate(self, step=5):
"""Decreases the speech rate."""
self._changeDefaultSpeechRate(step, decrease=True)
def increaseSpeechPitch(self, step=0.5):
"""Increases the speech pitch."""
self._changeDefaultSpeechPitch(step)
def decreaseSpeechPitch(self, step=0.5):
"""Decreases the speech pitch."""
self._changeDefaultSpeechPitch(step, decrease=True)
def increaseSpeechVolume(self, step=0.5):
"""Increases the speech volume."""
self._changeDefaultSpeechVolume(step)
def decreaseSpeechVolume(self, step=0.5):
"""Decreases the speech volume."""
self._changeDefaultSpeechVolume(step, decrease=True)
def updateCapitalizationStyle(self):
"""Updates the capitalization style used by the speech server."""
pass
def updatePunctuationLevel(self):
"""Punctuation level changed, inform this speechServer."""
pass
def stop(self):
"""Stops ongoing speech and flushes the queue."""
with self._lock:
self._speakGeneration += 1
self._stopEvent.set()
if self._currentFuture:
self._currentFuture.cancel()
self._currentFuture = None
if self._audioPlayer:
self._audioPlayer.stop()
def shutdown(self):
"""Shuts down the speech engine."""
self.stop()
if self._executor:
self._executor.shutdown(wait=False)
self._executor = None
if self._audioPlayer:
self._audioPlayer.shutdown()
self._audioPlayer = None
self._voice = None
self._voiceInfo = None
if self._id in SpeechServer._active_servers:
del SpeechServer._active_servers[self._id]
msg = 'PIPER: Server shutdown complete'
debug.printMessage(debug.LEVEL_INFO, msg, True)
def reset(self, text=None, acss=None):
"""Resets the speech engine."""
self.stop()
if self._voiceInfo:
try:
self._loadVoice(self._voiceInfo)
except Exception as e:
msg = f'PIPER: Failed to reset voice: {e}'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
+21 -6
View File
@@ -458,19 +458,34 @@ class PluginManager(Plugin):
active_plugins.remove(plugin_name)
_settingsManager.setSetting('activePlugins', active_plugins)
if hasattr(_settingsManager, "general") and isinstance(_settingsManager.general, dict):
_settingsManager.general['activePlugins'] = active_plugins
try:
current_general = _settingsManager.getGeneralSettings()
active_profile = _settingsManager.getSetting('activeProfile')
if isinstance(active_profile, (list, tuple)) and len(active_profile) > 1:
profile_name = active_profile[1]
else:
profile_name = _settingsManager.profile or 'default'
current_general = _settingsManager.getGeneralSettings(profile_name) or {}
current_general['activePlugins'] = active_plugins
_settingsManager.profile = profile_name
_settingsManager._setProfileGeneral(current_general)
pronunciations = _settingsManager.getPronunciations(profile_name) or {}
keybindings = _settingsManager.getKeybindings(profile_name) or {}
backend = _settingsManager._backend
if backend:
backend.saveDefaultSettings(
current_general,
_settingsManager.getPronunciations(),
_settingsManager.getKeybindings()
backend.saveProfileSettings(
profile_name,
_settingsManager.profileGeneral,
pronunciations,
keybindings
)
debug.printMessage(debug.LEVEL_INFO, "PluginManager: Settings saved to backend", True)
debug.printMessage(debug.LEVEL_INFO, f"PluginManager: Settings saved to backend (profile {profile_name})", True)
else:
debug.printMessage(debug.LEVEL_INFO, "PluginManager: No backend available for saving", True)
@@ -19,3 +19,4 @@ will not start the server.
- The server listens on 127.0.0.1 and uses the port from NVDA2SPEECHD_HOST
if set; otherwise it defaults to 3457.
- Toggle interrupt/no-interrupt mode with cthulhu+shift+n.
- Toggle translation with cthulhu+control+shift+t (requires translate-shell).
@@ -23,6 +23,9 @@
import asyncio
import logging
import os
import shutil
from collections import OrderedDict
import subprocess
import threading
import urllib.parse
@@ -49,6 +52,9 @@ from cthulhu import settings_manager
logger = logging.getLogger(__name__)
DEFAULT_PORT = 3457
TRANSLATE_COMMAND = ("trans", "-no-autocorrect", "-no-warn", "-brief")
TRANSLATE_TIMEOUT = 5.0
TRANSLATE_CACHE_MAX = 512
def _coerce_text(value):
@@ -92,6 +98,8 @@ class Nvda2Cthulhu(Plugin):
self.httpServer = None
self.ioLoop = None
self.asyncioLoop = None
self.translationCache = OrderedDict()
self.translationCacheLock = threading.Lock()
@cthulhu_hookimpl
def activate(self, plugin=None):
@@ -105,6 +113,12 @@ class Nvda2Cthulhu(Plugin):
"kb:cthulhu+shift+n",
learnModeEnabled=True
)
self.registerGestureByString(
self.toggle_translation,
"NVDA to Cthulhu translation",
"kb:cthulhu+control+shift+t",
learnModeEnabled=True
)
if not self._dependencies_available():
self._present_message("NVDA to Cthulhu missing dependencies: python-msgpack and python-tornado")
logger.warning("NVDA to Cthulhu dependencies missing: msgpack or tornado")
@@ -160,6 +174,19 @@ class Nvda2Cthulhu(Plugin):
self._present_message(f"NVDA to Cthulhu {mode}")
return True
def toggle_translation(self, script=None, inputEvent=None):
if not self.settingsManager:
return False
if not self._translation_command_available():
self._present_message("NVDA to Cthulhu translation unavailable (missing translate-shell)")
return True
currentValue = bool(self.settingsManager.getSetting('nvda2cthulhuTranslateEnabled'))
newValue = not currentValue
self.settingsManager.setSetting('nvda2cthulhuTranslateEnabled', newValue)
mode = "translation enabled" if newValue else "translation disabled"
self._present_message(f"NVDA to Cthulhu {mode}")
return True
def handle_message(self, message):
request = self._parse_request(message)
if not request:
@@ -272,6 +299,8 @@ class Nvda2Cthulhu(Plugin):
def _handle_speak(self, text):
if not text or not text.strip():
return
if self._translation_enabled():
text = self._translate_text(text)
speech.speak(text, interrupt=self.interruptEnabled)
def _handle_braille(self, text):
@@ -307,3 +336,62 @@ class Nvda2Cthulhu(Plugin):
def _dependencies_available(self):
return msgpack is not None and tornado is not None
def _translation_enabled(self):
if not self.settingsManager:
return False
return bool(self.settingsManager.getSetting('nvda2cthulhuTranslateEnabled'))
def _translation_command_available(self):
return shutil.which(TRANSLATE_COMMAND[0]) is not None
def _translate_text(self, text):
cached = self._get_cached_translation(text)
if cached is not None:
return cached
if not self._translation_command_available():
logger.warning("NVDA to Cthulhu translation failed: translate-shell not available")
return text
try:
result = subprocess.run(
TRANSLATE_COMMAND,
input=text,
text=True,
capture_output=True,
check=False,
timeout=TRANSLATE_TIMEOUT
)
except subprocess.TimeoutExpired:
logger.warning("NVDA to Cthulhu translation failed: timed out")
return text
except Exception as exc:
logger.warning(f"NVDA to Cthulhu translation failed: {exc}")
return text
if result.returncode != 0:
stderr = result.stderr.strip()
if stderr:
logger.warning(f"NVDA to Cthulhu translation failed: {stderr}")
return text
output = result.stdout.strip()
if not output:
return text
self._set_cached_translation(text, output)
return output
def _get_cached_translation(self, text):
with self.translationCacheLock:
cached = self.translationCache.get(text)
if cached is None:
return None
self.translationCache.move_to_end(text)
return cached
def _set_cached_translation(self, text, translated):
with self.translationCacheLock:
if text in self.translationCache:
self.translationCache.move_to_end(text)
self.translationCache[text] = translated
while len(self.translationCache) > TRANSLATE_CACHE_MAX:
self.translationCache.popitem(last=False)
@@ -28,6 +28,9 @@ __copyright__ = "Copyright (c) 2024 Stormux"
__license__ = "LGPL"
import time
import re
from gi.repository import GLib
from cthulhu import debug
from cthulhu import settings
@@ -78,6 +81,12 @@ class Script(Chromium.Script):
# is not the focused application.
self.presentIfInactive = True
self._lastSteamNotification = ("", 0.0)
self._steamPendingNotification = None
self._steamNotificationDelayMs = 500
self._steamRelativeTimePattern = re.compile(
r"^(?:just now|now|\d+\s+(?:second|minute|hour|day|week|month|year)s?\s+ago)$",
re.IGNORECASE
)
def onShowingChanged(self, event):
"""Callback for object:state-changed:showing accessibility events."""
@@ -133,6 +142,18 @@ class Script(Chromium.Script):
super().onTextInserted(event)
def onSelectionChanged(self, event):
"""Callback for object:selection-changed accessibility events."""
self._logSteamNavigationEvent("selection-changed", event)
return super().onSelectionChanged(event)
def onActiveDescendantChanged(self, event):
"""Callback for object:active-descendant-changed accessibility events."""
self._logSteamNavigationEvent("active-descendant-changed", event)
return super().onActiveDescendantChanged(event)
def _isSteamNotification(self, obj):
"""Detect if object is a Steam notification.
@@ -175,6 +196,27 @@ class Script(Chromium.Script):
return False
def _findSteamNotificationRoot(self, obj):
if obj is None:
return None
if AXUtilities.is_notification(obj) or AXUtilities.is_alert(obj):
return obj
liveAttr = AXObject.get_attribute(obj, 'live')
containerLive = AXObject.get_attribute(obj, 'container-live')
if liveAttr in ['assertive', 'polite'] or containerLive in ['assertive', 'polite']:
return obj
def isNotificationCandidate(candidate):
if AXUtilities.is_notification(candidate) or AXUtilities.is_alert(candidate):
return True
candidateLive = AXObject.get_attribute(candidate, 'live')
candidateContainerLive = AXObject.get_attribute(candidate, 'container-live')
return candidateLive in ['assertive', 'polite'] or candidateContainerLive in ['assertive', 'polite']
return AXObject.find_ancestor(obj, isNotificationCandidate)
def _presentSteamNotification(self, obj):
"""Speak and save the notification.
@@ -189,6 +231,47 @@ class Script(Chromium.Script):
self._presentSteamNotificationText(text, obj)
def _logSteamNavigationEvent(self, label, event):
if not event:
return
sourceInfo = self._describeSteamObject(event.source)
anyDataInfo = self._describeSteamEventAnyData(event.any_data)
msg = f"STEAM: {label}: source={sourceInfo}; any_data={anyDataInfo}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
def _describeSteamEventAnyData(self, anyData):
if isinstance(anyData, str):
text = self._normalizeSteamNotificationText(anyData)
if not text:
return "string('')"
return f"string('{text}')"
if anyData is None:
return "None"
return self._describeSteamObject(anyData)
def _describeSteamObject(self, obj):
if obj is None:
return "None"
name = AXObject.get_name(obj) or ""
description = AXObject.get_description(obj) or ""
roleName = AXObject.get_role_name(obj) or ""
text = self.utilities.displayedText(obj) or ""
path = AXObject.get_path(obj)
name = self._normalizeSteamNotificationText(name)
description = self._normalizeSteamNotificationText(description)
text = self._normalizeSteamNotificationText(text)
return (
f"role='{roleName}' "
f"name='{name}' "
f"description='{description}' "
f"text='{text}' "
f"path={path}"
)
def _presentSteamLiveRegionText(self, event):
if not isinstance(event.any_data, str):
return False
@@ -200,9 +283,16 @@ class Script(Chromium.Script):
if not self._isSteamLiveRegion(event.source):
return False
notificationRoot = self._findSteamNotificationRoot(event.source)
sourceObj = notificationRoot or event.source
if notificationRoot:
fullText = self._getNotificationText(notificationRoot)
if fullText:
text = fullText
msg = f"STEAM: Live region text inserted: {text}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
self._presentSteamNotificationText(text, event.source)
self._presentSteamNotificationText(text, sourceObj)
return True
def _isSteamLiveRegion(self, obj):
@@ -223,6 +313,18 @@ class Script(Chromium.Script):
return AXObject.find_ancestor(obj, isLiveRegion) is not None
def _presentSteamNotificationText(self, text, obj):
self._queueSteamNotification(text, obj)
def _isDuplicateSteamNotification(self, text):
lastText, lastTime = self._lastSteamNotification
now = time.monotonic()
if text == lastText and (now - lastTime) < 1.0:
return True
self._lastSteamNotification = (text, now)
return False
def _presentSteamNotificationTextNow(self, text, obj):
if self._isDuplicateSteamNotification(text):
msg = "STEAM: Suppressing duplicate notification"
debug.printMessage(debug.LEVEL_INFO, msg, True)
@@ -243,15 +345,126 @@ class Script(Chromium.Script):
msg = f"STEAM: Presented notification: {text}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
def _isDuplicateSteamNotification(self, text):
lastText, lastTime = self._lastSteamNotification
now = time.monotonic()
if text == lastText and (now - lastTime) < 1.0:
return True
def _normalizeSteamNotificationText(self, text):
if not text:
return ""
return " ".join(text.split())
self._lastSteamNotification = (text, now)
def _isSteamRelativeTimestamp(self, text):
text = self._normalizeSteamNotificationText(text)
if not text:
return False
return self._steamRelativeTimePattern.match(text) is not None
def _appendSteamTimestamp(self, baseText, timestampText):
if not baseText:
return timestampText
if baseText[-1] in ".!?":
return f"{baseText} {timestampText}"
return f"{baseText}. {timestampText}"
def _steamTextContains(self, text, other):
textNorm = self._normalizeSteamNotificationText(text).lower()
otherNorm = self._normalizeSteamNotificationText(other).lower()
if not textNorm or not otherNorm:
return False
if len(otherNorm) < 4:
return textNorm == otherNorm
return otherNorm in textNorm
def _queueSteamNotification(self, text, obj):
text = self._normalizeSteamNotificationText(text)
if not text:
return
pending = self._steamPendingNotification
if self._isSteamRelativeTimestamp(text):
if pending:
if pending.get("timestampOnly"):
pending["text"] = text
else:
pending["text"] = self._appendSteamTimestamp(pending["text"], text)
if obj:
pending["obj"] = obj
self._resetSteamPendingTimer()
return
self._steamPendingNotification = {
"text": text,
"obj": obj,
"timerId": None,
"timestampOnly": True
}
self._resetSteamPendingTimer()
return
if pending:
pendingText = pending["text"]
if pending.get("timestampOnly"):
pending["text"] = self._appendSteamTimestamp(text, pendingText)
pending["timestampOnly"] = False
if obj:
pending["obj"] = obj
self._resetSteamPendingTimer()
return
if text == pendingText:
if obj:
pending["obj"] = obj
return
if self._steamTextContains(text, pendingText):
pending["text"] = text
if obj:
pending["obj"] = obj
self._resetSteamPendingTimer()
return
if self._steamTextContains(pendingText, text):
if obj:
pending["obj"] = obj
return
self._flushSteamPendingNotification(fromTimer=False)
self._steamPendingNotification = {
"text": text,
"obj": obj,
"timerId": None,
"timestampOnly": False
}
self._resetSteamPendingTimer()
def _resetSteamPendingTimer(self):
pending = self._steamPendingNotification
if not pending:
return
timerId = pending.get("timerId")
if timerId is not None:
GLib.source_remove(timerId)
pending["timerId"] = GLib.timeout_add(
self._steamNotificationDelayMs,
self._onSteamPendingTimeout
)
def _onSteamPendingTimeout(self):
self._flushSteamPendingNotification(fromTimer=True)
return False
def _flushSteamPendingNotification(self, fromTimer):
pending = self._steamPendingNotification
if not pending:
return
timerId = pending.get("timerId")
if not fromTimer and timerId is not None:
GLib.source_remove(timerId)
self._steamPendingNotification = None
if pending.get("timestampOnly"):
msg = "STEAM: Dropping timestamp-only notification"
debug.printMessage(debug.LEVEL_INFO, msg, True)
return
self._presentSteamNotificationTextNow(pending["text"], pending["obj"])
def _getNotificationText(self, obj):
"""Extract text from notification element.
+2
View File
@@ -55,6 +55,7 @@ import cthulhu.input_event_manager as input_event_manager
import cthulhu.keybindings as keybindings
import cthulhu.messages as messages
import cthulhu.cthulhu as cthulhu
import cthulhu.cthulhu_modifier_manager as cthulhu_modifier_manager
import cthulhu.cthulhu_state as cthulhu_state
import cthulhu.phonnames as phonnames
import cthulhu.script as script
@@ -834,6 +835,7 @@ class Script(script.Script):
self.presentMessage(messages.BYPASS_MODE_ENABLED)
cthulhu_state.bypassNextCommand = True
cthulhu_modifier_manager.getManager().unsetCthulhuModifiers("Bypass next command enabled")
self.removeKeyGrabs()
return True
+4 -1
View File
@@ -37,6 +37,7 @@ __license__ = "LGPL"
import cthulhu.scripts.default as default
import cthulhu.debug as debug
import cthulhu.cthulhu_modifier_manager as cthulhu_modifier_manager
import cthulhu.sleep_mode_manager as sleep_mode_manager
class Script(default.Script):
@@ -55,6 +56,7 @@ class Script(default.Script):
debug.printMessage(debug.LEVEL_INFO, "SLEEP MODE SCRIPT: Activating", True)
super().activate()
cthulhu_modifier_manager.getManager().unsetCthulhuModifiers("Entering sleep mode.")
# Get the manager and add its bindings and handlers
manager = sleep_mode_manager.getManager()
@@ -76,6 +78,7 @@ class Script(default.Script):
# Restore key grabs
self.addKeyGrabs()
cthulhu_modifier_manager.getManager().refreshCthulhuModifiers("Exiting sleep mode.")
super().deactivate()
@@ -153,4 +156,4 @@ class Script(default.Script):
def get_script(app):
"""Returns the script for the given application."""
return Script(app)
return Script(app)
+3 -1
View File
@@ -65,6 +65,7 @@ userCustomizableSettings = [
"enableEchoBySentence",
"enableKeyEcho",
"gameMode",
"nvda2cthulhuTranslateEnabled",
"enableAlphabeticKeys",
"enableNumericKeys",
"enablePunctuationKeys",
@@ -274,7 +275,7 @@ activeProfile = ['Default', 'default']
profile = ['Default', 'default']
# Speech
speechFactoryModules = ["speechdispatcherfactory"]
speechFactoryModules = ["speechdispatcherfactory", "piperfactory"]
speechServerFactory = "speechdispatcherfactory"
speechServerInfo = None # None means let the factory decide.
enableSpeech = True
@@ -319,6 +320,7 @@ speechVerbosityLevel = VERBOSITY_LEVEL_VERBOSE
messagesAreDetailed = True
enablePauseBreaks = True
gameMode = False
nvda2cthulhuTranslateEnabled = False
speakDescription = True
speakContextBlockquote = True
speakContextPanel = True
+20 -7
View File
@@ -93,9 +93,9 @@ def init():
debug.printMessage(debug.LEVEL_INFO, 'SPEECH: Already initialized', True)
return
chosenModuleName = settings.speechServerFactory
try:
moduleName = settings.speechServerFactory
_initSpeechServer(moduleName, settings.speechServerInfo)
_initSpeechServer(chosenModuleName, settings.speechServerInfo)
except Exception:
moduleNames = settings.speechFactoryModules
for moduleName in moduleNames:
@@ -103,12 +103,21 @@ def init():
try:
_initSpeechServer(moduleName, None)
if _speechserver:
chosenModuleName = moduleName
break
except Exception:
debug.printException(debug.LEVEL_SEVERE)
if _speechserver:
tokens = ["SPEECH: Using speech server factory:", moduleName]
if chosenModuleName != settings.speechServerFactory:
settings.speechServerFactory = chosenModuleName
settings.speechServerInfo = None
tokens = [
"SPEECH: Falling back to speech server factory:",
chosenModuleName
]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
tokens = ["SPEECH: Using speech server factory:", chosenModuleName]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
else:
msg = 'SPEECH: Not available'
@@ -266,6 +275,7 @@ def speak(content, acss=None, interrupt=True):
if not isinstance(content, list):
return
shouldInterrupt = interrupt
toSpeak = []
activeVoice = acss
if acss is not None:
@@ -275,14 +285,16 @@ def speak(content, acss=None, interrupt=True):
if not isinstance(element, validTypes):
debug.printMessage(debug.LEVEL_INFO, error % element, True)
elif isinstance(element, list):
speak(element, acss, interrupt)
speak(element, acss, shouldInterrupt)
shouldInterrupt = False
elif isinstance(element, str):
if len(element):
toSpeak.append(element)
elif isinstance(element, Icon):
if toSpeak:
string = " ".join(toSpeak)
_speak(string, activeVoice, interrupt)
_speak(string, activeVoice, shouldInterrupt)
shouldInterrupt = False
toSpeak = []
if element.isValid():
player = sound.getPlayer()
@@ -305,13 +317,14 @@ def speak(content, acss=None, interrupt=True):
if toSpeak:
string = " ".join(toSpeak)
_speak(string, activeVoice, interrupt)
_speak(string, activeVoice, shouldInterrupt)
shouldInterrupt = False
activeVoice = newVoice
toSpeak = newItemsToSpeak
if toSpeak:
string = " ".join(toSpeak)
_speak(string, activeVoice, interrupt)
_speak(string, activeVoice, shouldInterrupt)
def speakKeyEvent(event, acss=None):
"""Speaks a key event immediately.
+53 -11
View File
@@ -346,11 +346,11 @@ class SpeechAndVerbosityManager:
return f"{value:.1f}".rstrip("0").rstrip(".")
return str(value)
def _present_message(self, script, message):
def _present_message(self, script, message, voice=None):
if script:
script.presentMessage(message)
script.presentMessage(message, voice=voice)
else:
speech.speak(message)
speech.speak(message, voice)
def _get_default_voice(self):
from . import acss
@@ -370,7 +370,23 @@ class SpeechAndVerbosityManager:
default_voice['established'] = True
def _get_current_speech_setting(self):
return self._speech_settings_order[self._current_speech_setting_index]
order = self._get_speech_settings_order()
if not order:
return ""
if self._current_speech_setting_index < 0:
self._current_speech_setting_index = 0
elif self._current_speech_setting_index >= len(order):
self._current_speech_setting_index = len(order) - 1
return order[self._current_speech_setting_index]
def _get_speech_settings_order(self):
order = ["rate", "pitch", "volume"]
server = self._get_server()
if server and hasattr(server, "list_output_modules") and hasattr(server, "getOutputModule"):
order.append("module")
if server and hasattr(server, "getVoiceFamilies"):
order.append("voice")
return order
def _get_rate_value(self):
from . import acss
@@ -410,14 +426,28 @@ class SpeechAndVerbosityManager:
default_voice = self._get_default_voice()
family = default_voice.get(acss.ACSS.FAMILY, {}) or {}
name = family.get(speechserver.VoiceFamily.NAME)
if name and server:
voices = self._get_available_voices(server)
if voices:
for voice in voices:
if voice.get(speechserver.VoiceFamily.NAME) == name:
return name
self._set_default_voice_family(voices[0])
return voices[0].get(speechserver.VoiceFamily.NAME)
if name:
return name
if server:
voices = self._get_available_voices(server)
if voices:
self._set_default_voice_family(voices[0])
return voices[0].get(speechserver.VoiceFamily.NAME)
return ""
def _get_voice_messages(self, server):
if server and hasattr(server, "list_output_modules"):
return messages.SPEECH_VOICE_VALUE, messages.SPEECH_VOICES_UNAVAILABLE
return messages.SPEECH_VOICE_VALUE_GENERIC, messages.SPEECH_VOICES_UNAVAILABLE_GENERIC
def _get_available_modules(self, server):
if server is None or not hasattr(server, 'list_output_modules'):
return []
@@ -539,27 +569,37 @@ class SpeechAndVerbosityManager:
elif setting == "voice":
server = self._get_server()
voices = self._get_available_voices(server)
voice_value_message, voice_unavailable_message = self._get_voice_messages(server)
if not voices:
message = messages.SPEECH_VOICES_UNAVAILABLE
message = voice_unavailable_message
else:
name = self._get_current_voice_name(server)
message = messages.SPEECH_VOICE_VALUE % name
message = voice_value_message % name
else:
message = ""
if message:
self._present_message(script, message)
voice = self._get_default_voice() if setting == "voice" else None
self._present_message(script, message, voice=voice)
@dbus_service.command
def select_previous_speech_setting(self, script=None, event=None):
if self._current_speech_setting_index > 0:
order = self._get_speech_settings_order()
if not order:
return True
if self._current_speech_setting_index >= len(order):
self._current_speech_setting_index = len(order) - 1
elif self._current_speech_setting_index > 0:
self._current_speech_setting_index -= 1
self._announce_current_speech_setting(script)
return True
@dbus_service.command
def select_next_speech_setting(self, script=None, event=None):
if self._current_speech_setting_index < len(self._speech_settings_order) - 1:
order = self._get_speech_settings_order()
if not order:
return True
if self._current_speech_setting_index < len(order) - 1:
self._current_speech_setting_index += 1
self._announce_current_speech_setting(script)
return True
@@ -663,7 +703,8 @@ class SpeechAndVerbosityManager:
server = self._get_server()
voices = self._get_available_voices(server)
if not voices:
self._present_message(script, messages.SPEECH_VOICES_UNAVAILABLE)
_, voice_unavailable_message = self._get_voice_messages(server)
self._present_message(script, voice_unavailable_message)
return True
current_name = self._get_current_voice_name(server)
@@ -680,7 +721,8 @@ class SpeechAndVerbosityManager:
name = new_voice.get(speechserver.VoiceFamily.NAME, "")
msg = f"SPEECH AND VERBOSITY MANAGER: Voice set to {name}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
self._present_message(script, messages.SPEECH_VOICE_VALUE % name)
voice_value_message, _ = self._get_voice_messages(server)
self._present_message(script, voice_value_message % name, voice=self._get_default_voice())
return True
@dbus_service.command