diff --git a/src/cthulhu/guilabels.py b/src/cthulhu/guilabels.py index a057472..dc2fd60 100644 --- a/src/cthulhu/guilabels.py +++ b/src/cthulhu/guilabels.py @@ -851,6 +851,10 @@ SPEECH_VOICE_TYPE_UPPERCASE = C_("VoiceType", "Uppercase") # system. (http://devel.freebsoft.org/speechd) SPEECH_DISPATCHER = _("Speech Dispatcher") +# Translators: This label refers to the Piper neural text-to-speech system. +# (https://github.com/rhasspy/piper) +PIPER_TTS = _("Piper Neural TTS") + # Translators: This is a label for a group of options related to Cthulhu's behavior # when presenting an application's spell check dialog. SPELL_CHECK = C_("OptionGroup", "Spell Check") diff --git a/src/cthulhu/learn_mode_presenter.py b/src/cthulhu/learn_mode_presenter.py index e64c067..4e28902 100644 --- a/src/cthulhu/learn_mode_presenter.py +++ b/src/cthulhu/learn_mode_presenter.py @@ -37,8 +37,11 @@ import gi gi.require_version("Atspi", "2.0") gi.require_version("Gdk", "3.0") gi.require_version("Gtk", "3.0") +gi.require_version("Gio", "2.0") from gi.repository import Atspi from gi.repository import Gdk +from gi.repository import Gio +from gi.repository import GLib from gi.repository import GObject from gi.repository import Gtk @@ -305,7 +308,22 @@ class LearnModePresenter: uri = "help:cthulhu" if page: uri += f"/{page}" - Gtk.show_uri(Gdk.Screen.get_default(), uri, Gtk.get_current_event_time()) + try: + Gtk.show_uri(Gdk.Screen.get_default(), uri, Gtk.get_current_event_time()) + return True + except GLib.GError as error: + msg = f"LEARN MODE PRESENTER: Failed to open help URI {uri}: {error}" + debug.printMessage(debug.LEVEL_WARNING, msg, True) + + try: + Gio.AppInfo.launch_default_for_uri(uri, None) + return True + except GLib.GError as error: + msg = f"LEARN MODE PRESENTER: Failed to launch help URI {uri}: {error}" + debug.printMessage(debug.LEVEL_WARNING, msg, True) + + if script: + script.presentMessage(messages.HELP_NOT_AVAILABLE) return True class CommandListGUI: diff --git 
a/src/cthulhu/meson.build b/src/cthulhu/meson.build index 9b15cf9..c239c0e 100644 --- a/src/cthulhu/meson.build +++ b/src/cthulhu/meson.build @@ -94,6 +94,9 @@ cthulhu_python_sources = files([ 'speechdispatcherfactory.py', 'speech_generator.py', 'speechserver.py', + 'piperfactory.py', + 'piper_voice_manager.py', + 'piper_audio_player.py', 'structural_navigation.py', 'text_attribute_names.py', 'translation_context.py', diff --git a/src/cthulhu/messages.py b/src/cthulhu/messages.py index 26d8466..1454c5f 100644 --- a/src/cthulhu/messages.py +++ b/src/cthulhu/messages.py @@ -1498,6 +1498,9 @@ LINE_UNSELECTED_UP = _("line unselected up from cursor position") # exiting Learn Mode. LEARN_MODE_STOP = _("Exiting learn mode.") +# Translators: This message is presented when help cannot be opened. +HELP_NOT_AVAILABLE = _("Help is not available.") + # Translators: when the user selects (highlights) or unselects text in a # document, Cthulhu will speak information about what they have selected or # unselected. This message is presented when the user selects from the @@ -2290,12 +2293,18 @@ SPEECH_MODULE_VALUE = _("Speech-dispatcher module %s") # Translators: This string announces the current speech-dispatcher voice. SPEECH_VOICE_VALUE = _("Speech-dispatcher voice %s") +# Translators: This string announces the current voice for non-speech-dispatcher engines. +SPEECH_VOICE_VALUE_GENERIC = _("Voice %s") + # Translators: This string is presented when speech-dispatcher modules are unavailable. SPEECH_MODULES_UNAVAILABLE = _("No speech-dispatcher modules available") # Translators: This string is presented when speech-dispatcher voices are unavailable. SPEECH_VOICES_UNAVAILABLE = _("No speech-dispatcher voices available") +# Translators: This string is presented when voices are unavailable for non-speech-dispatcher engines. +SPEECH_VOICES_UNAVAILABLE_GENERIC = _("No voices available") + # Translators: This string confirms speech settings have been saved. 
class PiperAudioPlayer:
    """GStreamer-based audio player for Piper TTS output.

    Accepts raw PCM audio (16-bit signed little-endian, mono) and plays it
    through the pipeline ``appsrc ! audioconvert ! audioresample ! volume !
    autoaudiosink``.  Playback can be fed in one piece (play) or from a
    generator on a background thread (playStream).
    """

    # (label used in log messages, GStreamer factory, element name)
    _ELEMENT_CHAIN = (
        ("appsrc", "appsrc", "source"),
        ("convert", "audioconvert", "convert"),
        ("resample", "audioresample", "resample"),
        ("volume", "volume", "volume"),
        ("sink", "autoaudiosink", "sink"),
    )

    def __init__(self, sampleRate=22050):
        """Initialize the audio player.

        Arguments:
        - sampleRate: Audio sample rate in Hz (default 22050, common for Piper)
        """
        self._sampleRate = sampleRate
        self._pipeline = None
        self._appsrc = None
        self._volume = None
        self._bus = None
        self._busHandlerId = None
        # Remembered so the level survives (re)building the pipeline.
        self._volumeLevel = 1.0
        # Bumped on every stop(); lets a stream feeder notice it is stale.
        self._generation = 0
        self._initialized = False
        self._playing = False
        self._lock = threading.Lock()
        self._completionCallback = None

        if not _gstreamerAvailable:
            msg = 'PIPER AUDIO: GStreamer is not available'
            debug.printMessage(debug.LEVEL_WARNING, msg, True)
            return

        self._init()

    def _makeCaps(self):
        """Return the appsrc caps describing the current PCM format."""
        return Gst.Caps.from_string(
            f"audio/x-raw,format=S16LE,channels=1,"
            f"rate={self._sampleRate},layout=interleaved"
        )

    def _init(self):
        """Build the GStreamer pipeline.

        Returns True when the pipeline is ready, False otherwise.  Nothing is
        stored on the instance unless the whole pipeline was built, so a
        failed attempt never leaves a half-constructed pipeline behind.
        """
        if self._initialized:
            return True

        if not _gstreamerAvailable:
            return False

        try:
            pipeline = Gst.Pipeline.new("piper-audio")

            elements = []
            for label, factory, elementName in self._ELEMENT_CHAIN:
                element = Gst.ElementFactory.make(factory, elementName)
                if element is None:
                    msg = f'PIPER AUDIO: Failed to create {factory} element'
                    debug.printMessage(debug.LEVEL_WARNING, msg, True)
                    return False
                elements.append((label, element))

            appsrc = elements[0][1]
            volume = elements[3][1]

            appsrc.set_property("format", Gst.Format.TIME)
            appsrc.set_property("is-live", False)
            appsrc.set_property("block", False)
            appsrc.set_property("caps", self._makeCaps())
            volume.set_property("volume", self._volumeLevel)

            for _label, element in elements:
                pipeline.add(element)

            for (srcLabel, src), (dstLabel, dst) in zip(elements, elements[1:]):
                if not src.link(dst):
                    msg = f'PIPER AUDIO: Failed to link {srcLabel} to {dstLabel}'
                    debug.printMessage(debug.LEVEL_WARNING, msg, True)
                    return False

            bus = pipeline.get_bus()
            bus.add_signal_watch()
            handlerId = bus.connect("message", self._onMessage)

            self._pipeline = pipeline
            self._appsrc = appsrc
            self._volume = volume
            self._bus = bus
            self._busHandlerId = handlerId
            self._initialized = True
            msg = 'PIPER AUDIO: Pipeline initialized successfully'
            debug.printMessage(debug.LEVEL_INFO, msg, True)
            return True

        except Exception as e:
            msg = f'PIPER AUDIO: Failed to initialize pipeline: {e}'
            debug.printMessage(debug.LEVEL_WARNING, msg, True)
            return False

    def _onMessage(self, bus, message):
        """Handle GStreamer bus messages (end of stream and errors)."""
        if message.type == Gst.MessageType.EOS:
            if self._pipeline is not None:
                self._pipeline.set_state(Gst.State.NULL)
            with self._lock:
                self._playing = False
                callback = self._completionCallback
                self._completionCallback = None
            if callback:
                def _runOnce():
                    callback()
                    # Returning False removes the idle source, so the
                    # callback runs once whatever it returns itself.
                    return False
                GLib.idle_add(_runOnce)
        elif message.type == Gst.MessageType.ERROR:
            if self._pipeline is not None:
                self._pipeline.set_state(Gst.State.NULL)
            with self._lock:
                self._playing = False
                self._completionCallback = None
            error, _debugInfo = message.parse_error()
            msg = f'PIPER AUDIO ERROR: {error}'
            debug.printMessage(debug.LEVEL_WARNING, msg, True)

    def _isCurrent(self, generation):
        """Return True while the playback identified by generation is live."""
        with self._lock:
            return self._playing and self._generation == generation

    def _beginPlayback(self, interrupt, completionCallback, action):
        """Prepare the pipeline for a new playback.

        Arguments:
        - interrupt: If True, stop any current playback first
        - completionCallback: Callback to invoke on end of stream, or None
        - action: Verb used in the failure log message

        Returns the generation token of the new playback, or None when the
        pipeline could not be initialized.
        """
        if not self._initialized and not self._init():
            msg = f'PIPER AUDIO: Cannot {action} - not initialized'
            debug.printMessage(debug.LEVEL_WARNING, msg, True)
            return None

        if interrupt:
            self.stop()

        with self._lock:
            self._playing = True
            self._completionCallback = completionCallback
            generation = self._generation

        self._pipeline.set_state(Gst.State.PLAYING)
        return generation

    def setSampleRate(self, sampleRate):
        """Update the sample rate for the audio stream.

        Stops any current playback.  An existing pipeline is kept and only
        the appsrc caps are replaced, so the volume setting and the bus watch
        are preserved.

        Arguments:
        - sampleRate: New sample rate in Hz
        """
        if sampleRate == self._sampleRate:
            return

        self._sampleRate = sampleRate
        self.stop()
        if self._initialized and self._appsrc is not None:
            self._appsrc.set_property("caps", self._makeCaps())
        else:
            self._init()

    def setVolume(self, volumeLevel):
        """Set the playback volume.

        The level is remembered even when no pipeline exists yet and is
        applied as soon as one is built.

        Arguments:
        - volumeLevel: Volume level from 0.0 to 1.0 (clamped to that range)
        """
        level = max(0.0, min(1.0, volumeLevel))
        self._volumeLevel = level
        if self._volume is not None:
            self._volume.set_property("volume", level)

    def play(self, audioData, interrupt=True, completionCallback=None):
        """Play raw PCM audio data.

        Arguments:
        - audioData: Raw PCM audio data as bytes (16-bit signed, little-endian)
        - interrupt: If True, stop any current playback first
        - completionCallback: Optional callback to invoke when playback completes

        Returns True if playback was started, False if the pipeline is not
        available.
        """
        generation = self._beginPlayback(interrupt, completionCallback, "play")
        if generation is None:
            return False

        # With nothing to play, only end-of-stream is sent so that the
        # completion callback still fires.
        if audioData:
            buf = Gst.Buffer.new_wrapped(bytes(audioData))
            result = self._appsrc.emit("push-buffer", buf)
            if result != Gst.FlowReturn.OK:
                msg = f'PIPER AUDIO: Failed to push buffer: {result}'
                debug.printMessage(debug.LEVEL_WARNING, msg, True)

        self._appsrc.emit("end-of-stream")
        return True

    def playStream(self, audioGenerator, interrupt=True, completionCallback=None):
        """Play audio from a streaming generator.

        The generator is consumed on a daemon thread.  Feeding stops as soon
        as this playback is stopped or superseded.

        Arguments:
        - audioGenerator: Iterator/generator yielding audio chunks as bytes
        - interrupt: If True, stop any current playback first
        - completionCallback: Optional callback when playback completes

        Returns True if playback was started, False if the pipeline is not
        available.
        """
        generation = self._beginPlayback(
            interrupt, completionCallback, "play stream")
        if generation is None:
            return False

        appsrc = self._appsrc

        def feedThread():
            try:
                for audioChunk in audioGenerator:
                    if not self._isCurrent(generation):
                        break

                    buf = Gst.Buffer.new_wrapped(bytes(audioChunk))
                    result = appsrc.emit("push-buffer", buf)
                    if result != Gst.FlowReturn.OK:
                        msg = f'PIPER AUDIO: Stream push failed: {result}'
                        debug.printMessage(debug.LEVEL_WARNING, msg, True)
                        break
            except Exception as e:
                msg = f'PIPER AUDIO: Stream feed error: {e}'
                debug.printMessage(debug.LEVEL_WARNING, msg, True)
            finally:
                # A stale feeder must not end a newer playback's stream.
                if self._isCurrent(generation):
                    appsrc.emit("end-of-stream")

        thread = threading.Thread(target=feedThread, daemon=True)
        thread.start()
        return True

    def isPlaying(self):
        """Check if audio is currently playing.

        Returns True if playback is in progress.
        """
        with self._lock:
            return self._playing

    def stop(self):
        """Stop any current playback and drop its completion callback."""
        with self._lock:
            self._playing = False
            self._generation += 1
            self._completionCallback = None

        if self._pipeline is not None:
            self._pipeline.set_state(Gst.State.NULL)

    def shutdown(self):
        """Shut down the audio player and release resources."""
        self.stop()
        self._initialized = False
        if self._bus is not None:
            if self._busHandlerId is not None:
                self._bus.disconnect(self._busHandlerId)
            # Pairs with add_signal_watch() in _init().
            self._bus.remove_signal_watch()
        self._bus = None
        self._busHandlerId = None
        self._pipeline = None
        self._appsrc = None
        self._volume = None
@dataclass
class PiperVoiceInfo:
    """Metadata for a Piper voice model."""

    name: str
    language: str
    dialect: str
    quality: str
    modelPath: Path
    configPath: Path
    sampleRate: int = 22050
    # Maps speaker id -> speaker name for multi-speaker models.
    speakers: Dict[int, str] = field(default_factory=dict)

    @property
    def key(self) -> str:
        """Return a unique key for this voice, e.g. "en-US-lessac-medium"."""
        dialectPart = f"-{self.dialect}" if self.dialect else ""
        return f"{self.language}{dialectPart}-{self.name}-{self.quality}"

    @property
    def displayName(self) -> str:
        """Return a human-readable display name."""
        dialectPart = f" ({self.dialect})" if self.dialect else ""
        return f"{self.name} - {self.language}{dialectPart} [{self.quality}]"

    @property
    def isMultiSpeaker(self) -> bool:
        """Return True if this is a multi-speaker model."""
        return len(self.speakers) > 1


class PiperVoiceManager:
    """Discovers and manages Piper voice models.

    Searches standard paths for Piper voice models (.onnx files with
    companion .onnx.json config files) and provides methods to list
    and load them.
    """

    VOICE_SEARCH_PATHS = [
        "~/.local/share/piper/voices",
        "~/.local/share/piper-tts/voices",
        "~/.config/piper/voices",
        "$XDG_DATA_HOME/piper/voices",
        "$XDG_DATA_HOME/piper-tts/voices",
        "$XDG_DATA_HOME/cthulhu/piper-voices",
        "/usr/share/piper/voices",
        "/usr/share/piper-voices",
        "/usr/share/piper-tts/voices",
        "/usr/local/share/piper/voices",
        "/usr/local/share/piper-tts/voices",
    ]

    # <lang>[_<DIALECT>]-<name>-<quality>.onnx, e.g. en_US-lessac-medium.onnx
    VOICE_FILENAME_PATTERN = re.compile(
        r'^(?P<lang>[a-z]{2})(?:_(?P<dialect>[A-Z]{2}))?'
        r'-(?P<name>[a-zA-Z0-9_]+)'
        r'-(?P<quality>low|medium|high|x_low)\.onnx$'
    )

    # The "x" variants come first so "-x-low" is not mistaken for "-low".
    _QUALITY_SUFFIXES = ("x_low", "x-low", "low", "medium", "high")

    def __init__(self, customPath=None):
        """Initialize the voice manager.

        Arguments:
        - customPath: Optional additional path to search for voices
        """
        self._customPath = customPath
        self._voices = []
        self._voiceCache = {}

    def discoverVoices(self) -> List[PiperVoiceInfo]:
        """Discover all available Piper voices.

        Searches the configured paths recursively and returns a list of
        PiperVoiceInfo objects, sorted by language, name and quality.  A
        directory or model file reachable through several search paths is
        only reported once.
        """
        self._voices = []
        seenDirs = set()
        seenModels = set()

        for searchPath in self._getSearchPaths():
            path = Path(os.path.expandvars(os.path.expanduser(searchPath)))
            if not path.exists():
                continue

            resolvedDir = path.resolve()
            if resolvedDir in seenDirs:
                continue
            seenDirs.add(resolvedDir)

            msg = f'PIPER VOICES: Searching {path}'
            debug.printMessage(debug.LEVEL_INFO, msg, True)

            # Sorted so that discovery order does not depend on the
            # filesystem's directory ordering.
            for onnxFile in sorted(path.rglob("*.onnx")):
                resolvedModel = onnxFile.resolve()
                if resolvedModel in seenModels:
                    continue
                seenModels.add(resolvedModel)

                # Piper's convention is "<model>.onnx.json"; also accept
                # "<model>.json" next to the model.
                configFile = Path(str(onnxFile) + ".json")
                if not configFile.exists():
                    configFile = onnxFile.with_suffix(".json")

                if configFile.exists():
                    try:
                        voice = self._parseVoice(onnxFile, configFile)
                        if voice:
                            self._voices.append(voice)
                            msg = f'PIPER VOICES: Found voice {voice.displayName}'
                            debug.printMessage(debug.LEVEL_INFO, msg, True)
                    except Exception as e:
                        msg = f'PIPER VOICES: Failed to parse {onnxFile}: {e}'
                        debug.printMessage(debug.LEVEL_WARNING, msg, True)
                else:
                    voice = self._parseVoiceFromFilename(onnxFile)
                    if voice:
                        self._voices.append(voice)
                        msg = f'PIPER VOICES: Found voice {voice.displayName} (no config)'
                        debug.printMessage(debug.LEVEL_INFO, msg, True)

        self._voices.sort(key=lambda v: (v.language, v.name, v.quality))

        msg = f'PIPER VOICES: Discovered {len(self._voices)} voice(s)'
        debug.printMessage(debug.LEVEL_INFO, msg, True)

        return self._voices

    def getVoices(self) -> List[PiperVoiceInfo]:
        """Get the list of discovered voices.

        Returns the cached list, running discovery first when the cache is
        empty.  Call discoverVoices() to force a refresh.
        """
        if not self._voices:
            self.discoverVoices()
        return self._voices

    def getVoiceByKey(self, key: str) -> Optional[PiperVoiceInfo]:
        """Get a voice by its unique key.

        Arguments:
        - key: Voice key as produced by PiperVoiceInfo.key
          (e.g., "en-US-lessac-medium")
        """
        for voice in self.getVoices():
            if voice.key == key:
                return voice
        return None

    def getVoiceByName(self, name: str) -> Optional[PiperVoiceInfo]:
        """Get a voice by name (first match).

        Arguments:
        - name: Voice name (e.g., "lessac")
        """
        for voice in self.getVoices():
            if voice.name == name:
                return voice
        return None

    def getVoicesForLanguage(self, lang: str) -> List[PiperVoiceInfo]:
        """Get all voices for a specific language.

        Arguments:
        - lang: Language code (e.g., "en")
        """
        return [v for v in self.getVoices() if v.language == lang]

    def voiceToVoiceFamily(self, voiceInfo: PiperVoiceInfo) -> "speechserver.VoiceFamily":
        """Convert a PiperVoiceInfo to a Cthulhu VoiceFamily.

        Arguments:
        - voiceInfo: The PiperVoiceInfo to convert
        """
        return speechserver.VoiceFamily({
            speechserver.VoiceFamily.NAME: voiceInfo.displayName,
            speechserver.VoiceFamily.LANG: voiceInfo.language,
            speechserver.VoiceFamily.DIALECT: voiceInfo.dialect or "",
            speechserver.VoiceFamily.VARIANT: voiceInfo.quality,
        })

    def _getSearchPaths(self) -> List[str]:
        """Get the list of paths to search for voices.

        The custom path (if any) comes first.  "$XDG_DATA_HOME" is replaced
        by the environment value, or by "~/.local/share" when it is unset or
        empty.  Duplicates produced by that substitution are removed while
        keeping the original order.
        """
        paths = list(self.VOICE_SEARCH_PATHS)

        if self._customPath:
            paths.insert(0, self._customPath)

        xdgDataHome = os.environ.get("XDG_DATA_HOME") or "~/.local/share"
        paths = [p.replace("$XDG_DATA_HOME", xdgDataHome) for p in paths]

        return list(dict.fromkeys(paths))

    @staticmethod
    def _splitLanguageCode(langInfo):
        """Split a config "language" entry into (language, dialect).

        Accepts either a dict carrying a "code" entry or a plain string such
        as "en_US" or "en".  Anything else yields ("", "").
        """
        code = langInfo.get("code", "") if isinstance(langInfo, dict) else langInfo
        if not isinstance(code, str) or not code:
            return "", ""
        parts = code.split("_")
        return parts[0], (parts[1] if len(parts) > 1 else "")

    def _parseVoice(self, modelPath: Path, configPath: Path) -> Optional[PiperVoiceInfo]:
        """Parse a voice from its model and config files.

        Values from the config take precedence for language and dialect.
        The voice name and quality come from the file name.

        Arguments:
        - modelPath: Path to the .onnx model file
        - configPath: Path to the .json config file

        Returns None when the config cannot be read or is not a JSON object.
        """
        try:
            with open(configPath, 'r', encoding='utf-8') as f:
                config = json.load(f)
        except (ValueError, OSError) as e:
            # ValueError covers both invalid JSON and undecodable bytes.
            msg = f'PIPER VOICES: Failed to read config {configPath}: {e}'
            debug.printMessage(debug.LEVEL_WARNING, msg, True)
            return None

        if not isinstance(config, dict):
            msg = f'PIPER VOICES: Failed to read config {configPath}: not a JSON object'
            debug.printMessage(debug.LEVEL_WARNING, msg, True)
            return None

        language, dialect = self._splitLanguageCode(config.get("language"))
        name = modelPath.stem
        quality = "medium"
        sampleRate = 22050
        speakers = {}

        audio = config.get("audio")
        if isinstance(audio, dict):
            try:
                sampleRate = int(audio.get("sample_rate", 22050))
            except (TypeError, ValueError):
                sampleRate = 22050

        speakerMap = config.get("speaker_id_map")
        if isinstance(speakerMap, dict):
            # The config maps name -> id; lookups here go id -> name.
            speakers = {v: k for k, v in speakerMap.items()}

        match = self.VOICE_FILENAME_PATTERN.match(modelPath.name)
        if match:
            if not language:
                language = match.group("lang")
            if not dialect:
                dialect = match.group("dialect") or ""
            name = match.group("name")
            quality = match.group("quality").replace("x_low", "x-low")
        else:
            # Non-standard file name: recover what we can from its parts.
            stem = modelPath.stem
            for q in self._QUALITY_SUFFIXES:
                if stem.endswith(f"-{q}"):
                    quality = q.replace("x_low", "x-low")
                    stem = stem[:-len(q) - 1]
                    break

            parts = stem.split("-")
            langPart = parts[0]
            if "_" in langPart:
                langParts = langPart.split("_")
                if not language:
                    language = langParts[0]
                if not dialect:
                    dialect = langParts[1] if len(langParts) > 1 else ""
            elif not language:
                language = langPart

            if len(parts) > 1:
                name = parts[1]

        if not language:
            language = "unknown"

        return PiperVoiceInfo(
            name=name,
            language=language,
            dialect=dialect,
            quality=quality,
            modelPath=modelPath,
            configPath=configPath,
            sampleRate=sampleRate,
            speakers=speakers
        )

    def _parseVoiceFromFilename(self, modelPath: Path) -> Optional[PiperVoiceInfo]:
        """Parse voice info from filename only (no config file).

        Arguments:
        - modelPath: Path to the .onnx model file

        Returns None when the file name does not follow the
        "<lang>[_<DIALECT>]-<name>-<quality>.onnx" convention.
        """
        match = self.VOICE_FILENAME_PATTERN.match(modelPath.name)
        if not match:
            return None

        return PiperVoiceInfo(
            name=match.group("name"),
            language=match.group("lang"),
            dialect=match.group("dialect") or "",
            quality=match.group("quality").replace("x_low", "x-low"),
            modelPath=modelPath,
            configPath=modelPath.with_suffix(".onnx.json"),
            sampleRate=22050,
            speakers={}
        )


_manager = None


def getManager(customPath=None) -> PiperVoiceManager:
    """Get the singleton voice manager instance.

    Arguments:
    - customPath: Optional custom path to search for voices.  When the
      manager already exists and a different path is given, the manager
      adopts it and drops its cached voices so the next lookup rescans.
    """
    global _manager
    if _manager is None:
        _manager = PiperVoiceManager(customPath)
    elif customPath and customPath != _manager._customPath:
        _manager._customPath = customPath
        _manager._voices = []
    return _manager
def _getPipxSitePackages():
    """Locate the site-packages directory of a pipx-installed piper-tts.

    When PIPX_HOME is set, only that location is probed.  Otherwise the
    legacy default (~/.local/pipx) is tried first, followed by the XDG data
    directory ($XDG_DATA_HOME/pipx, defaulting to ~/.local/share/pipx).
    Only a venv built for the running Python version is considered.

    Returns the directory path, or None when no such venv exists.
    """
    pythonVersion = f"python{sys.version_info.major}.{sys.version_info.minor}"

    explicitHome = os.environ.get("PIPX_HOME")
    if explicitHome:
        pipxHomes = [os.path.expanduser(explicitHome)]
    else:
        xdgDataHome = (
            os.environ.get("XDG_DATA_HOME")
            or os.path.expanduser("~/.local/share")
        )
        pipxHomes = [
            os.path.expanduser("~/.local/pipx"),
            os.path.join(xdgDataHome, "pipx"),
        ]

    for pipxHome in pipxHomes:
        sitePackages = os.path.join(
            pipxHome,
            "venvs",
            "piper-tts",
            "lib",
            pythonVersion,
            "site-packages"
        )
        if os.path.isdir(sitePackages):
            return sitePackages
    return None


def _tryImportPiper():
    """Import and return the PiperVoice class, or None if unavailable.

    The regular import path is tried first.  If that fails, the pipx venv of
    piper-tts (when present) is put at the front of sys.path and the import
    is retried.  On failure sys.path and sys.modules are restored.
    """
    import importlib

    def _piperModuleNames():
        return [n for n in sys.modules if n == "piper" or n.startswith("piper.")]

    try:
        from piper.voice import PiperVoice
        return PiperVoice
    except ImportError:
        pass

    sitePackages = _getPipxSitePackages()
    if not sitePackages:
        return None

    addedPath = sitePackages not in sys.path
    if addedPath:
        sys.path.insert(0, sitePackages)

    # An unrelated "piper" package may have been cached by the failed import
    # above; it would shadow the one in the pipx venv on the retry.
    stale = {n: sys.modules[n] for n in _piperModuleNames()}
    for moduleName in stale:
        del sys.modules[moduleName]
    importlib.invalidate_caches()

    try:
        from piper.voice import PiperVoice
    except ImportError:
        # Undo every change made for the retry.
        for moduleName in _piperModuleNames():
            del sys.modules[moduleName]
        sys.modules.update(stale)
        if addedPath:
            try:
                sys.path.remove(sitePackages)
            except ValueError:
                pass
        return None

    msg = f'PIPER: Loaded piper-tts from pipx venv ({sitePackages})'
    debug.printMessage(debug.LEVEL_INFO, msg, True)
    return PiperVoice
_piperAvailable: + try: + from piper.config import SynthesisConfig as _PiperSynthesisConfig + except Exception: + _PiperSynthesisConfig = None +else: + _PiperSynthesisConfig = None +if not _piperAvailable: + msg = 'PIPER: piper-tts library not available' + debug.printMessage(debug.LEVEL_INFO, msg, True) + + +class SpeechServer(speechserver.SpeechServer): + """Piper TTS speech server implementation. + + Provides speech synthesis using Piper neural text-to-speech, + implementing the Cthulhu SpeechServer interface. + """ + + _active_servers = {} + + DEFAULT_SERVER_ID = 'piper-default' + + @staticmethod + def getFactoryName(): + """Returns a localized name describing this factory.""" + return guilabels.PIPER_TTS + + @staticmethod + def getSpeechServers(): + """Gets available speech servers as a list.""" + servers = [] + + if not _piperAvailable: + msg = 'PIPER: Cannot list servers - piper-tts not installed' + debug.printMessage(debug.LEVEL_INFO, msg, True) + return servers + + manager = piper_voice_manager.getManager() + voices = manager.discoverVoices() + + if not voices: + msg = 'PIPER: No voice models found' + debug.printMessage(debug.LEVEL_INFO, msg, True) + return servers + + server = SpeechServer._getSpeechServer(SpeechServer.DEFAULT_SERVER_ID) + if server is not None: + servers.append(server) + + return servers + + @classmethod + def _getSpeechServer(cls, serverId): + """Return an active server for given id. + + Attempt to create the server if it doesn't exist yet. + Returns None when it is not possible to create the server. 
+ """ + if serverId not in cls._active_servers: + cls(serverId) + return cls._active_servers.get(serverId) + + @staticmethod + def getSpeechServer(info=None): + """Gets a given SpeechServer based upon the info.""" + thisId = info[1] if info is not None else SpeechServer.DEFAULT_SERVER_ID + return SpeechServer._getSpeechServer(thisId) + + @staticmethod + def shutdownActiveServers(): + """Cleans up and shuts down this factory.""" + servers = list(SpeechServer._active_servers.values()) + for server in servers: + server.shutdown() + + def __init__(self, serverId): + """Initialize the Piper speech server. + + Arguments: + - serverId: Identifier for this server instance + """ + super(SpeechServer, self).__init__() + self._id = serverId + self._voice = None + self._voiceInfo = None + self._voiceManager = piper_voice_manager.getManager() + self._audioPlayer = None + self._executor = None + self._currentFuture = None + self._stopEvent = threading.Event() + self._lock = threading.Lock() + self._speakGeneration = 0 + + self._currentVoiceProperties = {} + self._acssManipulators = ( + (ACSS.RATE, self._setRate), + (ACSS.AVERAGE_PITCH, self._setPitch), + (ACSS.GAIN, self._setVolume), + (ACSS.FAMILY, self._setFamily), + ) + + self._rate = 50 + self._pitch = 5.0 + self._volume = 1.0 + + if not _piperAvailable: + msg = 'PIPER: piper-tts library not available' + debug.printMessage(debug.LEVEL_WARNING, msg, True) + return + + try: + self._init() + except Exception as e: + debug.printException(debug.LEVEL_WARNING) + msg = f'PIPER: Failed to initialize server: {e}' + debug.printMessage(debug.LEVEL_WARNING, msg, True) + else: + SpeechServer._active_servers[serverId] = self + + def _init(self): + """Initialize the speech server components.""" + voices = self._voiceManager.getVoices() + if not voices: + raise RuntimeError("No Piper voices found") + + self._voiceInfo = voices[0] + self._loadVoice(self._voiceInfo) + + self._audioPlayer = piper_audio_player.PiperAudioPlayer( + 
self._voiceInfo.sampleRate + ) + + self._executor = ThreadPoolExecutor( + max_workers=1, + thread_name_prefix="piper" + ) + + msg = f'PIPER: Initialized with voice {self._voiceInfo.displayName}' + debug.printMessage(debug.LEVEL_INFO, msg, True) + + def _loadVoice(self, voiceInfo): + """Load a Piper voice model. + + Arguments: + - voiceInfo: PiperVoiceInfo for the voice to load + """ + msg = f'PIPER: Loading voice {voiceInfo.displayName}' + debug.printMessage(debug.LEVEL_INFO, msg, True) + + self._voice = PiperVoice.load(str(voiceInfo.modelPath)) + self._voiceInfo = voiceInfo + detectedRate = self._getVoiceSampleRate(self._voice, voiceInfo) + if detectedRate and detectedRate != self._voiceInfo.sampleRate: + msg = ( + f'PIPER: Using detected sample rate {detectedRate} ' + f'for {voiceInfo.displayName}' + ) + debug.printMessage(debug.LEVEL_INFO, msg, True) + self._voiceInfo.sampleRate = detectedRate + + if self._audioPlayer: + self._audioPlayer.setSampleRate(voiceInfo.sampleRate) + + def _getVoiceSampleRate(self, voice, voiceInfo): + if voice is None: + return voiceInfo.sampleRate if voiceInfo else None + + for attr in ("sample_rate", "sampleRate"): + value = getattr(voice, attr, None) + if isinstance(value, (int, float)) and value > 0: + return int(value) + + config = getattr(voice, "config", None) + sampleRate = None + if isinstance(config, dict): + audio = config.get("audio") + if isinstance(audio, dict): + sampleRate = audio.get("sample_rate") + elif audio is not None and hasattr(audio, "sample_rate"): + sampleRate = getattr(audio, "sample_rate", None) + if sampleRate is None: + sampleRate = config.get("sample_rate") + else: + if hasattr(config, "audio"): + audio = getattr(config, "audio") + if isinstance(audio, dict): + sampleRate = audio.get("sample_rate") + elif audio is not None and hasattr(audio, "sample_rate"): + sampleRate = getattr(audio, "sample_rate", None) + if sampleRate is None and hasattr(config, "sample_rate"): + sampleRate = getattr(config, 
"sample_rate", None) + + try: + sampleRate = int(sampleRate) if sampleRate is not None else None + except (TypeError, ValueError): + sampleRate = None + + if sampleRate and sampleRate > 0: + return sampleRate + + return voiceInfo.sampleRate if voiceInfo else None + + def _mapRate(self, acssRate): + """Map ACSS rate (0-99) to Piper length_scale. + + ACSS rate 50 (default) = length_scale 1.0 + Higher ACSS rate = lower length_scale (faster) + Lower ACSS rate = higher length_scale (slower) + + Arguments: + - acssRate: Rate value from 0-99 + """ + rate = acssRate if acssRate is not None else 50 + rate = max(0, min(99, rate)) + lengthScale = 2.0 - (rate / 99.0) * 1.5 + return max(0.5, min(2.0, lengthScale)) + + def _mapPitch(self, acssPitch): + """Map ACSS pitch (0-9) to pitch adjustment factor. + + Note: Piper's native pitch control is limited. + This maps to a factor that could be used for post-processing. + + Arguments: + - acssPitch: Pitch value from 0-9 + """ + pitch = acssPitch if acssPitch is not None else 5.0 + pitch = max(0, min(9, pitch)) + return pitch + + def _mapVolume(self, acssGain): + """Map ACSS gain (0-9) to volume (0.0-1.0). + + Arguments: + - acssGain: Gain value from 0-9 + """ + gain = acssGain if acssGain is not None else 10 + gain = max(0, min(10, gain)) + return gain / 10.0 + + def _setRate(self, acssRate): + """Set the speech rate. + + Arguments: + - acssRate: ACSS rate value (0-99) + """ + self._rate = acssRate if acssRate is not None else 50 + msg = f'PIPER: Rate set to {self._rate}' + debug.printMessage(debug.LEVEL_INFO, msg, True) + + def _setPitch(self, acssPitch): + """Set the speech pitch. + + Arguments: + - acssPitch: ACSS pitch value (0-9) + """ + self._pitch = acssPitch if acssPitch is not None else 5.0 + msg = f'PIPER: Pitch set to {self._pitch}' + debug.printMessage(debug.LEVEL_INFO, msg, True) + + def _setVolume(self, acssGain): + """Set the speech volume. 
+ + Arguments: + - acssGain: ACSS gain value (0-10) + """ + self._volume = self._mapVolume(acssGain) + if self._audioPlayer: + self._audioPlayer.setVolume(self._volume) + msg = f'PIPER: Volume set to {self._volume}' + debug.printMessage(debug.LEVEL_INFO, msg, True) + + def _setFamily(self, acssFamily): + """Set the voice family. + + Arguments: + - acssFamily: ACSS family dict with voice info + """ + if not acssFamily: + return + + name = acssFamily.get(speechserver.VoiceFamily.NAME) + if not name: + return + + for voice in self._voiceManager.getVoices(): + if voice.displayName == name: + if voice != self._voiceInfo: + try: + self._loadVoice(voice) + except Exception as e: + msg = f'PIPER: Failed to load voice {name}: {e}' + debug.printMessage(debug.LEVEL_WARNING, msg, True) + break + + def _applyAcss(self, acss): + """Apply ACSS voice settings. + + Arguments: + - acss: ACSS settings to apply + """ + if acss is None: + acss = settings.voices[settings.DEFAULT_VOICE] + + with self._lock: + current = self._currentVoiceProperties + for acssProperty, method in self._acssManipulators: + value = acss.get(acssProperty) + if value is not None: + if current.get(acssProperty) != value: + method(value) + current[acssProperty] = value + elif acssProperty == ACSS.AVERAGE_PITCH: + method(5.0) + current[acssProperty] = 5.0 + elif acssProperty == ACSS.GAIN: + method(10) + current[acssProperty] = 10 + elif acssProperty == ACSS.RATE: + method(50) + current[acssProperty] = 50 + + def _synthesize(self, text): + """Synthesize text to audio data. + + Arguments: + - text: Text to synthesize + + Returns raw PCM audio data as bytes. 
+ """ + if not self._voice or not text: + return None + + lengthScale = self._mapRate(self._rate) + + if _PiperSynthesisConfig is not None: + try: + synConfig = _PiperSynthesisConfig(length_scale=lengthScale) + audioChunks = self._voice.synthesize(text, syn_config=synConfig) + audioParts = [] + sampleRate = None + for chunk in audioChunks: + if sampleRate is None: + sampleRate = chunk.sample_rate + audioParts.append(chunk.audio_int16_bytes) + if sampleRate and sampleRate != self._voiceInfo.sampleRate: + self._voiceInfo.sampleRate = sampleRate + if self._audioPlayer: + self._audioPlayer.setSampleRate(sampleRate) + audioData = b"".join(audioParts) + return audioData if audioData else None + except TypeError: + pass + + wavBuffer = io.BytesIO() + with wave.open(wavBuffer, 'wb') as wavFile: + wavFile.setnchannels(1) + wavFile.setsampwidth(2) + wavFile.setframerate(self._voiceInfo.sampleRate) + + self._voice.synthesize( + text, + wavFile, + length_scale=lengthScale + ) + + wavBuffer.seek(44) + return wavBuffer.read() + + def _synthesizeAndPlay(self, text, acss, completionCallback=None, generation=0): + """Synthesize and play text (runs in worker thread). 
+ + Arguments: + - text: Text to synthesize + - acss: ACSS settings + - completionCallback: Optional callback when complete + """ + try: + if self._stopEvent.is_set() or generation != self._speakGeneration: + return + + self._applyAcss(acss) + + audioData = self._synthesize(text) + if not audioData or self._stopEvent.is_set() or generation != self._speakGeneration: + return + + if self._audioPlayer: + self._audioPlayer.play(audioData, False) + while self._audioPlayer.isPlaying(): + if self._stopEvent.is_set() or generation != self._speakGeneration: + self._audioPlayer.stop() + return + time.sleep(0.01) + + if completionCallback and not self._stopEvent.is_set() and generation == self._speakGeneration: + GLib.idle_add(completionCallback) + + except Exception as e: + msg = f'PIPER: Synthesis error: {e}' + debug.printMessage(debug.LEVEL_WARNING, msg, True) + + def getInfo(self): + """Returns [name, id].""" + return [guilabels.PIPER_TTS, self._id] + + def getVoiceFamilies(self): + """Returns a list of VoiceFamily instances.""" + families = [] + for voice in self._voiceManager.getVoices(): + family = self._voiceManager.voiceToVoiceFamily(voice) + families.append(family) + return families + + def speak(self, text=None, acss=None, interrupt=True): + """Speaks the given text. + + Arguments: + - text: Text to speak + - acss: ACSS voice settings + - interrupt: If True, stop any current speech first + """ + if not text or not self._voice: + return + + if interrupt: + self.stop() + + with self._lock: + self._stopEvent.clear() + generation = self._speakGeneration + + msg = f"PIPER: Speaking '{text}'" + debug.printMessage(debug.LEVEL_INFO, msg, True) + + self._currentFuture = self._executor.submit( + self._synthesizeAndPlay, text, acss, None, generation + ) + + def sayAll(self, utteranceIterator, progressCallback): + """Iterates through utterances, speaking each one. 
+ + Arguments: + - utteranceIterator: Iterator yielding [SayAllContext, acss] tuples + - progressCallback: Called with progress updates + """ + GLib.idle_add(self._sayAllWorker, utteranceIterator, progressCallback) + + def _sayAllWorker(self, iterator, callback): + """Process one utterance at a time (called via GLib.idle_add). + + Arguments: + - iterator: Utterance iterator + - callback: Progress callback + """ + try: + context, acss = next(iterator) + except StopIteration: + return False + + def onComplete(): + context.currentOffset = context.endOffset + callback(context.copy(), speechserver.SayAllContext.COMPLETED) + GLib.idle_add(self._sayAllWorker, iterator, callback) + + context.currentOffset = context.startOffset + callback(context.copy(), speechserver.SayAllContext.PROGRESS) + + with self._lock: + self._stopEvent.clear() + generation = self._speakGeneration + self._currentFuture = self._executor.submit( + self._synthesizeAndPlay, context.utterance, acss, onComplete, generation + ) + + return False + + def speakCharacter(self, character, acss=None): + """Speaks a single character immediately. + + Arguments: + - character: Character to speak + - acss: ACSS voice settings + """ + name = chnames.getCharacterName(character) + if name and name != character: + if cthulhu_state.activeScript: + name = cthulhu_state.activeScript.utilities.adjustForPronunciation(name) + self.speak(name, acss) + else: + self.speak(character, acss) + + def speakKeyEvent(self, event, acss=None): + """Speaks a key event immediately. + + Arguments: + - event: The KeyboardEvent to speak + - acss: ACSS voice settings + """ + eventString = event.getKeyName() + lockingStateString = event.getLockingStateString() + eventString = f"{eventString} {lockingStateString}".strip() + self.speak(eventString, acss) + + def _changeDefaultSpeechRate(self, step, decrease=False): + """Change the default speech rate. 
+ + Arguments: + - step: Amount to change + - decrease: If True, decrease rate; otherwise increase + """ + acss = settings.voices[settings.DEFAULT_VOICE] + delta = step * (-1 if decrease else 1) + try: + rate = acss[ACSS.RATE] + except KeyError: + rate = 50 + acss[ACSS.RATE] = max(0, min(99, rate + delta)) + msg = f"PIPER: Rate set to {acss[ACSS.RATE]}" + debug.printMessage(debug.LEVEL_INFO, msg, True) + self.speak( + messages.SPEECH_SLOWER if decrease else messages.SPEECH_FASTER, + acss=acss + ) + + def _changeDefaultSpeechPitch(self, step, decrease=False): + """Change the default speech pitch. + + Arguments: + - step: Amount to change + - decrease: If True, decrease pitch; otherwise increase + """ + acss = settings.voices[settings.DEFAULT_VOICE] + delta = step * (-1 if decrease else 1) + try: + pitch = acss[ACSS.AVERAGE_PITCH] + except KeyError: + pitch = 5 + acss[ACSS.AVERAGE_PITCH] = max(0, min(9, pitch + delta)) + msg = f"PIPER: Pitch set to {acss[ACSS.AVERAGE_PITCH]}" + debug.printMessage(debug.LEVEL_INFO, msg, True) + self.speak( + messages.SPEECH_LOWER if decrease else messages.SPEECH_HIGHER, + acss=acss + ) + + def _changeDefaultSpeechVolume(self, step, decrease=False): + """Change the default speech volume. 
+ + Arguments: + - step: Amount to change + - decrease: If True, decrease volume; otherwise increase + """ + acss = settings.voices[settings.DEFAULT_VOICE] + delta = step * (-1 if decrease else 1) + try: + volume = acss[ACSS.GAIN] + except KeyError: + volume = 10 + acss[ACSS.GAIN] = max(0, min(10, volume + delta)) + msg = f"PIPER: Volume set to {acss[ACSS.GAIN]}" + debug.printMessage(debug.LEVEL_INFO, msg, True) + self.speak( + messages.SPEECH_SOFTER if decrease else messages.SPEECH_LOUDER, + acss=acss + ) + + def increaseSpeechRate(self, step=5): + """Increases the speech rate.""" + self._changeDefaultSpeechRate(step) + + def decreaseSpeechRate(self, step=5): + """Decreases the speech rate.""" + self._changeDefaultSpeechRate(step, decrease=True) + + def increaseSpeechPitch(self, step=0.5): + """Increases the speech pitch.""" + self._changeDefaultSpeechPitch(step) + + def decreaseSpeechPitch(self, step=0.5): + """Decreases the speech pitch.""" + self._changeDefaultSpeechPitch(step, decrease=True) + + def increaseSpeechVolume(self, step=0.5): + """Increases the speech volume.""" + self._changeDefaultSpeechVolume(step) + + def decreaseSpeechVolume(self, step=0.5): + """Decreases the speech volume.""" + self._changeDefaultSpeechVolume(step, decrease=True) + + def updateCapitalizationStyle(self): + """Updates the capitalization style used by the speech server.""" + pass + + def updatePunctuationLevel(self): + """Punctuation level changed, inform this speechServer.""" + pass + + def stop(self): + """Stops ongoing speech and flushes the queue.""" + with self._lock: + self._speakGeneration += 1 + self._stopEvent.set() + + if self._currentFuture: + self._currentFuture.cancel() + self._currentFuture = None + + if self._audioPlayer: + self._audioPlayer.stop() + + def shutdown(self): + """Shuts down the speech engine.""" + self.stop() + + if self._executor: + self._executor.shutdown(wait=False) + self._executor = None + + if self._audioPlayer: + self._audioPlayer.shutdown() + 
self._audioPlayer = None + + self._voice = None + self._voiceInfo = None + + if self._id in SpeechServer._active_servers: + del SpeechServer._active_servers[self._id] + + msg = 'PIPER: Server shutdown complete' + debug.printMessage(debug.LEVEL_INFO, msg, True) + + def reset(self, text=None, acss=None): + """Resets the speech engine.""" + self.stop() + if self._voiceInfo: + try: + self._loadVoice(self._voiceInfo) + except Exception as e: + msg = f'PIPER: Failed to reset voice: {e}' + debug.printMessage(debug.LEVEL_WARNING, msg, True) diff --git a/src/cthulhu/settings.py b/src/cthulhu/settings.py index f0ae224..082d8ba 100644 --- a/src/cthulhu/settings.py +++ b/src/cthulhu/settings.py @@ -274,7 +274,7 @@ activeProfile = ['Default', 'default'] profile = ['Default', 'default'] # Speech -speechFactoryModules = ["speechdispatcherfactory"] +speechFactoryModules = ["speechdispatcherfactory", "piperfactory"] speechServerFactory = "speechdispatcherfactory" speechServerInfo = None # None means let the factory decide. 
enableSpeech = True diff --git a/src/cthulhu/speech.py b/src/cthulhu/speech.py index 85abe8e..44aa89b 100644 --- a/src/cthulhu/speech.py +++ b/src/cthulhu/speech.py @@ -93,9 +93,9 @@ def init(): debug.printMessage(debug.LEVEL_INFO, 'SPEECH: Already initialized', True) return + chosenModuleName = settings.speechServerFactory try: - moduleName = settings.speechServerFactory - _initSpeechServer(moduleName, settings.speechServerInfo) + _initSpeechServer(chosenModuleName, settings.speechServerInfo) except Exception: moduleNames = settings.speechFactoryModules for moduleName in moduleNames: @@ -103,12 +103,21 @@ def init(): try: _initSpeechServer(moduleName, None) if _speechserver: + chosenModuleName = moduleName break except Exception: debug.printException(debug.LEVEL_SEVERE) if _speechserver: - tokens = ["SPEECH: Using speech server factory:", moduleName] + if chosenModuleName != settings.speechServerFactory: + settings.speechServerFactory = chosenModuleName + settings.speechServerInfo = None + tokens = [ + "SPEECH: Falling back to speech server factory:", + chosenModuleName + ] + debug.printTokens(debug.LEVEL_INFO, tokens, True) + tokens = ["SPEECH: Using speech server factory:", chosenModuleName] debug.printTokens(debug.LEVEL_INFO, tokens, True) else: msg = 'SPEECH: Not available' @@ -266,6 +275,7 @@ def speak(content, acss=None, interrupt=True): if not isinstance(content, list): return + shouldInterrupt = interrupt toSpeak = [] activeVoice = acss if acss is not None: @@ -275,14 +285,16 @@ def speak(content, acss=None, interrupt=True): if not isinstance(element, validTypes): debug.printMessage(debug.LEVEL_INFO, error % element, True) elif isinstance(element, list): - speak(element, acss, interrupt) + speak(element, acss, shouldInterrupt) + shouldInterrupt = False elif isinstance(element, str): if len(element): toSpeak.append(element) elif isinstance(element, Icon): if toSpeak: string = " ".join(toSpeak) - _speak(string, activeVoice, interrupt) + _speak(string, 
activeVoice, shouldInterrupt) + shouldInterrupt = False toSpeak = [] if element.isValid(): player = sound.getPlayer() @@ -305,13 +317,14 @@ def speak(content, acss=None, interrupt=True): if toSpeak: string = " ".join(toSpeak) - _speak(string, activeVoice, interrupt) + _speak(string, activeVoice, shouldInterrupt) + shouldInterrupt = False activeVoice = newVoice toSpeak = newItemsToSpeak if toSpeak: string = " ".join(toSpeak) - _speak(string, activeVoice, interrupt) + _speak(string, activeVoice, shouldInterrupt) def speakKeyEvent(event, acss=None): """Speaks a key event immediately. diff --git a/src/cthulhu/speech_and_verbosity_manager.py b/src/cthulhu/speech_and_verbosity_manager.py index 0b28914..2a54002 100644 --- a/src/cthulhu/speech_and_verbosity_manager.py +++ b/src/cthulhu/speech_and_verbosity_manager.py @@ -346,11 +346,11 @@ class SpeechAndVerbosityManager: return f"{value:.1f}".rstrip("0").rstrip(".") return str(value) - def _present_message(self, script, message): + def _present_message(self, script, message, voice=None): if script: - script.presentMessage(message) + script.presentMessage(message, voice=voice) else: - speech.speak(message) + speech.speak(message, voice) def _get_default_voice(self): from . 
import acss @@ -370,7 +370,23 @@ class SpeechAndVerbosityManager: default_voice['established'] = True def _get_current_speech_setting(self): - return self._speech_settings_order[self._current_speech_setting_index] + order = self._get_speech_settings_order() + if not order: + return "" + if self._current_speech_setting_index < 0: + self._current_speech_setting_index = 0 + elif self._current_speech_setting_index >= len(order): + self._current_speech_setting_index = len(order) - 1 + return order[self._current_speech_setting_index] + + def _get_speech_settings_order(self): + order = ["rate", "pitch", "volume"] + server = self._get_server() + if server and hasattr(server, "list_output_modules") and hasattr(server, "getOutputModule"): + order.append("module") + if server and hasattr(server, "getVoiceFamilies"): + order.append("voice") + return order def _get_rate_value(self): from . import acss @@ -410,14 +426,28 @@ class SpeechAndVerbosityManager: default_voice = self._get_default_voice() family = default_voice.get(acss.ACSS.FAMILY, {}) or {} name = family.get(speechserver.VoiceFamily.NAME) + if name and server: + voices = self._get_available_voices(server) + if voices: + for voice in voices: + if voice.get(speechserver.VoiceFamily.NAME) == name: + return name + self._set_default_voice_family(voices[0]) + return voices[0].get(speechserver.VoiceFamily.NAME) if name: return name if server: voices = self._get_available_voices(server) if voices: + self._set_default_voice_family(voices[0]) return voices[0].get(speechserver.VoiceFamily.NAME) return "" + def _get_voice_messages(self, server): + if server and hasattr(server, "list_output_modules"): + return messages.SPEECH_VOICE_VALUE, messages.SPEECH_VOICES_UNAVAILABLE + return messages.SPEECH_VOICE_VALUE_GENERIC, messages.SPEECH_VOICES_UNAVAILABLE_GENERIC + def _get_available_modules(self, server): if server is None or not hasattr(server, 'list_output_modules'): return [] @@ -539,27 +569,37 @@ class SpeechAndVerbosityManager: 
elif setting == "voice": server = self._get_server() voices = self._get_available_voices(server) + voice_value_message, voice_unavailable_message = self._get_voice_messages(server) if not voices: - message = messages.SPEECH_VOICES_UNAVAILABLE + message = voice_unavailable_message else: name = self._get_current_voice_name(server) - message = messages.SPEECH_VOICE_VALUE % name + message = voice_value_message % name else: message = "" if message: - self._present_message(script, message) + voice = self._get_default_voice() if setting == "voice" else None + self._present_message(script, message, voice=voice) @dbus_service.command def select_previous_speech_setting(self, script=None, event=None): - if self._current_speech_setting_index > 0: + order = self._get_speech_settings_order() + if not order: + return True + if self._current_speech_setting_index >= len(order): + self._current_speech_setting_index = len(order) - 1 + elif self._current_speech_setting_index > 0: self._current_speech_setting_index -= 1 self._announce_current_speech_setting(script) return True @dbus_service.command def select_next_speech_setting(self, script=None, event=None): - if self._current_speech_setting_index < len(self._speech_settings_order) - 1: + order = self._get_speech_settings_order() + if not order: + return True + if self._current_speech_setting_index < len(order) - 1: self._current_speech_setting_index += 1 self._announce_current_speech_setting(script) return True @@ -663,7 +703,8 @@ class SpeechAndVerbosityManager: server = self._get_server() voices = self._get_available_voices(server) if not voices: - self._present_message(script, messages.SPEECH_VOICES_UNAVAILABLE) + _, voice_unavailable_message = self._get_voice_messages(server) + self._present_message(script, voice_unavailable_message) return True current_name = self._get_current_voice_name(server) @@ -680,7 +721,8 @@ class SpeechAndVerbosityManager: name = new_voice.get(speechserver.VoiceFamily.NAME, "") msg = f"SPEECH AND 
VERBOSITY MANAGER: Voice set to {name}" debug.printMessage(debug.LEVEL_INFO, msg, True) - self._present_message(script, messages.SPEECH_VOICE_VALUE % name) + voice_value_message, _ = self._get_voice_messages(server) + self._present_message(script, voice_value_message % name, voice=self._get_default_voice()) return True @dbus_service.command