Piper-tts added to speech engine options.

This commit is contained in:
Storm Dragon
2026-01-09 16:11:25 -05:00
parent b1b9ffce22
commit 2939526c03
10 changed files with 1490 additions and 20 deletions
+4
View File
@@ -851,6 +851,10 @@ SPEECH_VOICE_TYPE_UPPERCASE = C_("VoiceType", "Uppercase")
# system. (http://devel.freebsoft.org/speechd)
SPEECH_DISPATCHER = _("Speech Dispatcher")
# Translators: This label refers to the Piper neural text-to-speech system.
# (https://github.com/rhasspy/piper)
PIPER_TTS = _("Piper Neural TTS")
# Translators: This is a label for a group of options related to Cthulhu's behavior
# when presenting an application's spell check dialog.
SPELL_CHECK = C_("OptionGroup", "Spell Check")
+19 -1
View File
@@ -37,8 +37,11 @@ import gi
gi.require_version("Atspi", "2.0")
gi.require_version("Gdk", "3.0")
gi.require_version("Gtk", "3.0")
gi.require_version("Gio", "2.0")
from gi.repository import Atspi
from gi.repository import Gdk
from gi.repository import Gio
from gi.repository import GLib
from gi.repository import GObject
from gi.repository import Gtk
@@ -305,7 +308,22 @@ class LearnModePresenter:
uri = "help:cthulhu"
if page:
uri += f"/{page}"
Gtk.show_uri(Gdk.Screen.get_default(), uri, Gtk.get_current_event_time())
try:
Gtk.show_uri(Gdk.Screen.get_default(), uri, Gtk.get_current_event_time())
return True
except GLib.GError as error:
msg = f"LEARN MODE PRESENTER: Failed to open help URI {uri}: {error}"
debug.printMessage(debug.LEVEL_WARNING, msg, True)
try:
Gio.AppInfo.launch_default_for_uri(uri, None)
return True
except GLib.GError as error:
msg = f"LEARN MODE PRESENTER: Failed to launch help URI {uri}: {error}"
debug.printMessage(debug.LEVEL_WARNING, msg, True)
if script:
script.presentMessage(messages.HELP_NOT_AVAILABLE)
return True
class CommandListGUI:
+3
View File
@@ -94,6 +94,9 @@ cthulhu_python_sources = files([
'speechdispatcherfactory.py',
'speech_generator.py',
'speechserver.py',
'piperfactory.py',
'piper_voice_manager.py',
'piper_audio_player.py',
'structural_navigation.py',
'text_attribute_names.py',
'translation_context.py',
+9
View File
@@ -1498,6 +1498,9 @@ LINE_UNSELECTED_UP = _("line unselected up from cursor position")
# exiting Learn Mode.
LEARN_MODE_STOP = _("Exiting learn mode.")
# Translators: This message is presented when help cannot be opened.
HELP_NOT_AVAILABLE = _("Help is not available.")
# Translators: when the user selects (highlights) or unselects text in a
# document, Cthulhu will speak information about what they have selected or
# unselected. This message is presented when the user selects from the
@@ -2290,12 +2293,18 @@ SPEECH_MODULE_VALUE = _("Speech-dispatcher module %s")
# Translators: This string announces the current speech-dispatcher voice.
SPEECH_VOICE_VALUE = _("Speech-dispatcher voice %s")
# Translators: This string announces the current voice for non-speech-dispatcher engines.
SPEECH_VOICE_VALUE_GENERIC = _("Voice %s")
# Translators: This string is presented when speech-dispatcher modules are unavailable.
SPEECH_MODULES_UNAVAILABLE = _("No speech-dispatcher modules available")
# Translators: This string is presented when speech-dispatcher voices are unavailable.
SPEECH_VOICES_UNAVAILABLE = _("No speech-dispatcher voices available")
# Translators: This string is presented when voices are unavailable for non-speech-dispatcher engines.
SPEECH_VOICES_UNAVAILABLE_GENERIC = _("No voices available")
# Translators: This string confirms speech settings have been saved.
SPEECH_SETTINGS_SAVED = _("Speech settings saved")
+297
View File
@@ -0,0 +1,297 @@
#!/usr/bin/env python3
#
# Copyright (c) 2024 Stormux
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the
# Free Software Foundation, Inc., Franklin Street, Fifth Floor,
# Boston MA 02110-1301 USA.
#
# Cthulhu project: https://git.stormux.org/storm/cthulhu
"""GStreamer-based audio player for Piper TTS synthesis output."""
__id__ = "$Id:$"
__version__ = "$Revision:$"
__date__ = "$Date:$"
__copyright__ = "Copyright (c) 2024 Stormux"
__license__ = "LGPL"
import threading
import gi
from gi.repository import GLib
try:
gi.require_version('Gst', '1.0')
from gi.repository import Gst
except Exception:
_gstreamerAvailable = False
else:
_gstreamerAvailable, args = Gst.init_check(None)
from . import debug
class PiperAudioPlayer:
"""GStreamer-based audio player for Piper TTS output.
Handles raw PCM audio data from Piper synthesis and plays it through
a GStreamer pipeline with volume control.
"""
def __init__(self, sampleRate=22050):
"""Initialize the audio player.
Arguments:
- sampleRate: Audio sample rate in Hz (default 22050, common for Piper)
"""
self._sampleRate = sampleRate
self._pipeline = None
self._appsrc = None
self._volume = None
self._initialized = False
self._playing = False
self._lock = threading.Lock()
self._completionCallback = None
if not _gstreamerAvailable:
msg = 'PIPER AUDIO: GStreamer is not available'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return
self._init()
def _init(self):
"""Initialize the GStreamer pipeline."""
if self._initialized:
return True
if not _gstreamerAvailable:
return False
try:
self._pipeline = Gst.Pipeline.new("piper-audio")
self._appsrc = Gst.ElementFactory.make("appsrc", "source")
if self._appsrc is None:
msg = 'PIPER AUDIO: Failed to create appsrc element'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
self._appsrc.set_property("format", Gst.Format.TIME)
self._appsrc.set_property("is-live", False)
self._appsrc.set_property("block", False)
caps = Gst.Caps.from_string(
f"audio/x-raw,format=S16LE,channels=1,"
f"rate={self._sampleRate},layout=interleaved"
)
self._appsrc.set_property("caps", caps)
convert = Gst.ElementFactory.make("audioconvert", "convert")
resample = Gst.ElementFactory.make("audioresample", "resample")
self._volume = Gst.ElementFactory.make("volume", "volume")
if self._volume is None:
msg = 'PIPER AUDIO: Failed to create volume element'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
sink = Gst.ElementFactory.make("autoaudiosink", "sink")
if sink is None:
msg = 'PIPER AUDIO: Failed to create autoaudiosink element'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
for element in [self._appsrc, convert, resample, self._volume, sink]:
self._pipeline.add(element)
if not self._appsrc.link(convert):
msg = 'PIPER AUDIO: Failed to link appsrc to convert'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
if not convert.link(resample):
msg = 'PIPER AUDIO: Failed to link convert to resample'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
if not resample.link(self._volume):
msg = 'PIPER AUDIO: Failed to link resample to volume'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
if not self._volume.link(sink):
msg = 'PIPER AUDIO: Failed to link volume to sink'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
bus = self._pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", self._onMessage)
self._initialized = True
msg = 'PIPER AUDIO: Pipeline initialized successfully'
debug.printMessage(debug.LEVEL_INFO, msg, True)
return True
except Exception as e:
msg = f'PIPER AUDIO: Failed to initialize pipeline: {e}'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
def _onMessage(self, bus, message):
"""Handle GStreamer bus messages."""
if message.type == Gst.MessageType.EOS:
self._pipeline.set_state(Gst.State.NULL)
with self._lock:
self._playing = False
if self._completionCallback:
GLib.idle_add(self._completionCallback)
self._completionCallback = None
elif message.type == Gst.MessageType.ERROR:
self._pipeline.set_state(Gst.State.NULL)
with self._lock:
self._playing = False
error, info = message.parse_error()
msg = f'PIPER AUDIO ERROR: {error}'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
def setSampleRate(self, sampleRate):
"""Update the sample rate for the audio stream.
Arguments:
- sampleRate: New sample rate in Hz
"""
if sampleRate != self._sampleRate:
self._sampleRate = sampleRate
self._initialized = False
self.stop()
self._init()
def setVolume(self, volumeLevel):
"""Set the playback volume.
Arguments:
- volumeLevel: Volume level from 0.0 to 1.0
"""
if self._volume is not None:
volume = max(0.0, min(1.0, volumeLevel))
self._volume.set_property("volume", volume)
def play(self, audioData, interrupt=True, completionCallback=None):
"""Play raw PCM audio data.
Arguments:
- audioData: Raw PCM audio data as bytes (16-bit signed, little-endian)
- interrupt: If True, stop any current playback first
- completionCallback: Optional callback to invoke when playback completes
"""
if not self._initialized:
if not self._init():
msg = 'PIPER AUDIO: Cannot play - not initialized'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
if interrupt:
self.stop()
with self._lock:
self._playing = True
self._completionCallback = completionCallback
self._pipeline.set_state(Gst.State.PLAYING)
buf = Gst.Buffer.new_wrapped(audioData)
result = self._appsrc.emit("push-buffer", buf)
if result != Gst.FlowReturn.OK:
msg = f'PIPER AUDIO: Failed to push buffer: {result}'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
self._appsrc.emit("end-of-stream")
return True
def playStream(self, audioGenerator, interrupt=True, completionCallback=None):
"""Play audio from a streaming generator.
Arguments:
- audioGenerator: Iterator/generator yielding audio chunks as bytes
- interrupt: If True, stop any current playback first
- completionCallback: Optional callback when playback completes
"""
if not self._initialized:
if not self._init():
msg = 'PIPER AUDIO: Cannot play stream - not initialized'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
if interrupt:
self.stop()
with self._lock:
self._playing = True
stopRequested = False
self._completionCallback = completionCallback
self._pipeline.set_state(Gst.State.PLAYING)
def feedThread():
try:
for audioChunk in audioGenerator:
with self._lock:
if not self._playing:
break
buf = Gst.Buffer.new_wrapped(bytes(audioChunk))
result = self._appsrc.emit("push-buffer", buf)
if result != Gst.FlowReturn.OK:
msg = f'PIPER AUDIO: Stream push failed: {result}'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
break
except Exception as e:
msg = f'PIPER AUDIO: Stream feed error: {e}'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
finally:
self._appsrc.emit("end-of-stream")
thread = threading.Thread(target=feedThread, daemon=True)
thread.start()
return True
def isPlaying(self):
"""Check if audio is currently playing.
Returns True if playback is in progress.
"""
with self._lock:
return self._playing
def stop(self):
"""Stop any current playback."""
with self._lock:
self._playing = False
if self._pipeline is not None:
self._pipeline.set_state(Gst.State.NULL)
self._completionCallback = None
def shutdown(self):
"""Shut down the audio player and release resources."""
self.stop()
self._initialized = False
if self._pipeline is not None:
self._pipeline.set_state(Gst.State.NULL)
self._pipeline = None
self._appsrc = None
self._volume = None
+343
View File
@@ -0,0 +1,343 @@
#!/usr/bin/env python3
#
# Copyright (c) 2024 Stormux
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the
# Free Software Foundation, Inc., Franklin Street, Fifth Floor,
# Boston MA 02110-1301 USA.
#
# Cthulhu project: https://git.stormux.org/storm/cthulhu
"""Voice discovery and management for Piper TTS."""
__id__ = "$Id:$"
__version__ = "$Revision:$"
__date__ = "$Date:$"
__copyright__ = "Copyright (c) 2024 Stormux"
__license__ = "LGPL"
import json
import os
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional
from . import debug
from . import speechserver
@dataclass
class PiperVoiceInfo:
"""Metadata for a Piper voice model."""
name: str
language: str
dialect: str
quality: str
modelPath: Path
configPath: Path
sampleRate: int = 22050
speakers: Dict[int, str] = field(default_factory=dict)
@property
def key(self) -> str:
"""Return a unique key for this voice."""
dialectPart = f"-{self.dialect}" if self.dialect else ""
return f"{self.language}{dialectPart}-{self.name}-{self.quality}"
@property
def displayName(self) -> str:
"""Return a human-readable display name."""
dialectPart = f" ({self.dialect})" if self.dialect else ""
return f"{self.name} - {self.language}{dialectPart} [{self.quality}]"
@property
def isMultiSpeaker(self) -> bool:
"""Return True if this is a multi-speaker model."""
return len(self.speakers) > 1
class PiperVoiceManager:
"""Discovers and manages Piper voice models.
Searches standard paths for Piper voice models (.onnx files with
companion .onnx.json config files) and provides methods to list
and load them.
"""
VOICE_SEARCH_PATHS = [
"~/.local/share/piper/voices",
"~/.local/share/piper-tts/voices",
"~/.config/piper/voices",
"$XDG_DATA_HOME/piper/voices",
"$XDG_DATA_HOME/piper-tts/voices",
"$XDG_DATA_HOME/cthulhu/piper-voices",
"/usr/share/piper/voices",
"/usr/share/piper-voices",
"/usr/share/piper-tts/voices",
"/usr/local/share/piper/voices",
"/usr/local/share/piper-tts/voices",
]
VOICE_FILENAME_PATTERN = re.compile(
r'^(?P<lang>[a-z]{2})(?:_(?P<dialect>[A-Z]{2}))?'
r'-(?P<name>[a-zA-Z0-9_]+)'
r'-(?P<quality>low|medium|high|x_low)\.onnx$'
)
def __init__(self, customPath=None):
"""Initialize the voice manager.
Arguments:
- customPath: Optional additional path to search for voices
"""
self._customPath = customPath
self._voices = []
self._voiceCache = {}
def discoverVoices(self) -> List[PiperVoiceInfo]:
"""Discover all available Piper voices.
Searches standard paths and returns a list of PiperVoiceInfo
objects for each valid voice found.
"""
self._voices = []
searchPaths = self._getSearchPaths()
for searchPath in searchPaths:
path = Path(os.path.expandvars(os.path.expanduser(searchPath)))
if not path.exists():
continue
msg = f'PIPER VOICES: Searching {path}'
debug.printMessage(debug.LEVEL_INFO, msg, True)
for onnxFile in path.rglob("*.onnx"):
configFile = Path(str(onnxFile) + ".json")
if not configFile.exists():
configFile = onnxFile.with_suffix(".onnx.json")
if configFile.exists():
try:
voice = self._parseVoice(onnxFile, configFile)
if voice:
self._voices.append(voice)
msg = f'PIPER VOICES: Found voice {voice.displayName}'
debug.printMessage(debug.LEVEL_INFO, msg, True)
except Exception as e:
msg = f'PIPER VOICES: Failed to parse {onnxFile}: {e}'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
else:
voice = self._parseVoiceFromFilename(onnxFile)
if voice:
self._voices.append(voice)
msg = f'PIPER VOICES: Found voice {voice.displayName} (no config)'
debug.printMessage(debug.LEVEL_INFO, msg, True)
self._voices.sort(key=lambda v: (v.language, v.name, v.quality))
msg = f'PIPER VOICES: Discovered {len(self._voices)} voice(s)'
debug.printMessage(debug.LEVEL_INFO, msg, True)
return self._voices
def getVoices(self) -> List[PiperVoiceInfo]:
"""Get the list of discovered voices.
Returns cached list; call discoverVoices() first to refresh.
"""
if not self._voices:
self.discoverVoices()
return self._voices
def getVoiceByKey(self, key: str) -> Optional[PiperVoiceInfo]:
"""Get a voice by its unique key.
Arguments:
- key: Voice key (e.g., "en_US-lessac-medium")
"""
for voice in self.getVoices():
if voice.key == key:
return voice
return None
def getVoiceByName(self, name: str) -> Optional[PiperVoiceInfo]:
"""Get a voice by name (first match).
Arguments:
- name: Voice name (e.g., "lessac")
"""
for voice in self.getVoices():
if voice.name == name:
return voice
return None
def getVoicesForLanguage(self, lang: str) -> List[PiperVoiceInfo]:
"""Get all voices for a specific language.
Arguments:
- lang: Language code (e.g., "en")
"""
return [v for v in self.getVoices() if v.language == lang]
def voiceToVoiceFamily(self, voiceInfo: PiperVoiceInfo) -> speechserver.VoiceFamily:
"""Convert a PiperVoiceInfo to a Cthulhu VoiceFamily.
Arguments:
- voiceInfo: The PiperVoiceInfo to convert
"""
return speechserver.VoiceFamily({
speechserver.VoiceFamily.NAME: voiceInfo.displayName,
speechserver.VoiceFamily.LANG: voiceInfo.language,
speechserver.VoiceFamily.DIALECT: voiceInfo.dialect or "",
speechserver.VoiceFamily.VARIANT: voiceInfo.quality,
})
def _getSearchPaths(self) -> List[str]:
"""Get the list of paths to search for voices."""
paths = list(self.VOICE_SEARCH_PATHS)
if self._customPath:
paths.insert(0, self._customPath)
xdgDataHome = os.environ.get("XDG_DATA_HOME", "~/.local/share")
paths = [p.replace("$XDG_DATA_HOME", xdgDataHome) for p in paths]
return paths
def _parseVoice(self, modelPath: Path, configPath: Path) -> Optional[PiperVoiceInfo]:
"""Parse a voice from its model and config files.
Arguments:
- modelPath: Path to the .onnx model file
- configPath: Path to the .json config file
"""
try:
with open(configPath, 'r', encoding='utf-8') as f:
config = json.load(f)
except (json.JSONDecodeError, IOError) as e:
msg = f'PIPER VOICES: Failed to read config {configPath}: {e}'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return None
language = ""
dialect = ""
name = modelPath.stem
quality = "medium"
sampleRate = 22050
speakers = {}
if "language" in config:
langInfo = config["language"]
if isinstance(langInfo, dict):
language = langInfo.get("code", "")
if "_" in language:
parts = language.split("_")
language = parts[0]
dialect = parts[1] if len(parts) > 1 else ""
elif isinstance(langInfo, str):
if "_" in langInfo:
parts = langInfo.split("_")
language = parts[0]
dialect = parts[1] if len(parts) > 1 else ""
else:
language = langInfo
if "audio" in config:
sampleRate = config["audio"].get("sample_rate", 22050)
if "speaker_id_map" in config:
speakers = {v: k for k, v in config["speaker_id_map"].items()}
match = self.VOICE_FILENAME_PATTERN.match(modelPath.name)
if match:
if not language:
language = match.group("lang")
if not dialect:
dialect = match.group("dialect") or ""
name = match.group("name")
quality = match.group("quality").replace("x_low", "x-low")
else:
stem = modelPath.stem
for q in ["low", "medium", "high", "x_low", "x-low"]:
if stem.endswith(f"-{q}"):
quality = q.replace("x_low", "x-low")
stem = stem[:-len(q)-1]
break
parts = stem.split("-")
if parts:
langPart = parts[0]
if "_" in langPart:
langParts = langPart.split("_")
if not language:
language = langParts[0]
if not dialect:
dialect = langParts[1] if len(langParts) > 1 else ""
elif not language:
language = langPart
if len(parts) > 1:
name = parts[1]
if not language:
language = "unknown"
return PiperVoiceInfo(
name=name,
language=language,
dialect=dialect,
quality=quality,
modelPath=modelPath,
configPath=configPath,
sampleRate=sampleRate,
speakers=speakers
)
def _parseVoiceFromFilename(self, modelPath: Path) -> Optional[PiperVoiceInfo]:
"""Parse voice info from filename only (no config file).
Arguments:
- modelPath: Path to the .onnx model file
"""
match = self.VOICE_FILENAME_PATTERN.match(modelPath.name)
if not match:
return None
return PiperVoiceInfo(
name=match.group("name"),
language=match.group("lang"),
dialect=match.group("dialect") or "",
quality=match.group("quality").replace("x_low", "x-low"),
modelPath=modelPath,
configPath=modelPath.with_suffix(".onnx.json"),
sampleRate=22050,
speakers={}
)
_manager = None
def getManager(customPath=None) -> PiperVoiceManager:
"""Get the singleton voice manager instance.
Arguments:
- customPath: Optional custom path to search for voices
"""
global _manager
if _manager is None:
_manager = PiperVoiceManager(customPath)
return _manager
+741
View File
@@ -0,0 +1,741 @@
#!/usr/bin/env python3
#
# Copyright (c) 2024 Stormux
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the
# Free Software Foundation, Inc., Franklin Street, Fifth Floor,
# Boston MA 02110-1301 USA.
#
# Cthulhu project: https://git.stormux.org/storm/cthulhu
"""Provides a Cthulhu speech server for Piper TTS backend."""
__id__ = "$Id$"
__version__ = "$Revision$"
__date__ = "$Date$"
__copyright__ = "Copyright (c) 2024 Stormux"
__license__ = "LGPL"
import io
import os
import sys
import threading
import time
import wave
from concurrent.futures import ThreadPoolExecutor
from gi.repository import GLib
from . import chnames
from . import debug
from . import guilabels
from . import messages
from . import settings
from . import speechserver
from . import cthulhu_state
from .acss import ACSS
from . import piper_voice_manager
from . import piper_audio_player
def _getPipxSitePackages():
pipxHome = os.environ.get("PIPX_HOME", os.path.expanduser("~/.local/pipx"))
pythonVersion = f"python{sys.version_info.major}.{sys.version_info.minor}"
sitePackages = os.path.join(
pipxHome,
"venvs",
"piper-tts",
"lib",
pythonVersion,
"site-packages"
)
if os.path.isdir(sitePackages):
return sitePackages
return None
def _tryImportPiper():
try:
from piper.voice import PiperVoice
return PiperVoice
except ImportError:
pass
sitePackages = _getPipxSitePackages()
if not sitePackages:
return None
addedPath = False
if sitePackages not in sys.path:
sys.path.insert(0, sitePackages)
addedPath = True
try:
from piper.voice import PiperVoice
msg = f'PIPER: Loaded piper-tts from pipx venv ({sitePackages})'
debug.printMessage(debug.LEVEL_INFO, msg, True)
return PiperVoice
except ImportError:
if addedPath:
try:
sys.path.remove(sitePackages)
except ValueError:
pass
return None
PiperVoice = _tryImportPiper()
_piperAvailable = PiperVoice is not None
if _piperAvailable:
try:
from piper.config import SynthesisConfig as _PiperSynthesisConfig
except Exception:
_PiperSynthesisConfig = None
else:
_PiperSynthesisConfig = None
if not _piperAvailable:
msg = 'PIPER: piper-tts library not available'
debug.printMessage(debug.LEVEL_INFO, msg, True)
class SpeechServer(speechserver.SpeechServer):
"""Piper TTS speech server implementation.
Provides speech synthesis using Piper neural text-to-speech,
implementing the Cthulhu SpeechServer interface.
"""
_active_servers = {}
DEFAULT_SERVER_ID = 'piper-default'
@staticmethod
def getFactoryName():
"""Returns a localized name describing this factory."""
return guilabels.PIPER_TTS
@staticmethod
def getSpeechServers():
"""Gets available speech servers as a list."""
servers = []
if not _piperAvailable:
msg = 'PIPER: Cannot list servers - piper-tts not installed'
debug.printMessage(debug.LEVEL_INFO, msg, True)
return servers
manager = piper_voice_manager.getManager()
voices = manager.discoverVoices()
if not voices:
msg = 'PIPER: No voice models found'
debug.printMessage(debug.LEVEL_INFO, msg, True)
return servers
server = SpeechServer._getSpeechServer(SpeechServer.DEFAULT_SERVER_ID)
if server is not None:
servers.append(server)
return servers
@classmethod
def _getSpeechServer(cls, serverId):
"""Return an active server for given id.
Attempt to create the server if it doesn't exist yet.
Returns None when it is not possible to create the server.
"""
if serverId not in cls._active_servers:
cls(serverId)
return cls._active_servers.get(serverId)
@staticmethod
def getSpeechServer(info=None):
"""Gets a given SpeechServer based upon the info."""
thisId = info[1] if info is not None else SpeechServer.DEFAULT_SERVER_ID
return SpeechServer._getSpeechServer(thisId)
@staticmethod
def shutdownActiveServers():
"""Cleans up and shuts down this factory."""
servers = list(SpeechServer._active_servers.values())
for server in servers:
server.shutdown()
def __init__(self, serverId):
"""Initialize the Piper speech server.
Arguments:
- serverId: Identifier for this server instance
"""
super(SpeechServer, self).__init__()
self._id = serverId
self._voice = None
self._voiceInfo = None
self._voiceManager = piper_voice_manager.getManager()
self._audioPlayer = None
self._executor = None
self._currentFuture = None
self._stopEvent = threading.Event()
self._lock = threading.Lock()
self._speakGeneration = 0
self._currentVoiceProperties = {}
self._acssManipulators = (
(ACSS.RATE, self._setRate),
(ACSS.AVERAGE_PITCH, self._setPitch),
(ACSS.GAIN, self._setVolume),
(ACSS.FAMILY, self._setFamily),
)
self._rate = 50
self._pitch = 5.0
self._volume = 1.0
if not _piperAvailable:
msg = 'PIPER: piper-tts library not available'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return
try:
self._init()
except Exception as e:
debug.printException(debug.LEVEL_WARNING)
msg = f'PIPER: Failed to initialize server: {e}'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
else:
SpeechServer._active_servers[serverId] = self
def _init(self):
"""Initialize the speech server components."""
voices = self._voiceManager.getVoices()
if not voices:
raise RuntimeError("No Piper voices found")
self._voiceInfo = voices[0]
self._loadVoice(self._voiceInfo)
self._audioPlayer = piper_audio_player.PiperAudioPlayer(
self._voiceInfo.sampleRate
)
self._executor = ThreadPoolExecutor(
max_workers=1,
thread_name_prefix="piper"
)
msg = f'PIPER: Initialized with voice {self._voiceInfo.displayName}'
debug.printMessage(debug.LEVEL_INFO, msg, True)
def _loadVoice(self, voiceInfo):
"""Load a Piper voice model.
Arguments:
- voiceInfo: PiperVoiceInfo for the voice to load
"""
msg = f'PIPER: Loading voice {voiceInfo.displayName}'
debug.printMessage(debug.LEVEL_INFO, msg, True)
self._voice = PiperVoice.load(str(voiceInfo.modelPath))
self._voiceInfo = voiceInfo
detectedRate = self._getVoiceSampleRate(self._voice, voiceInfo)
if detectedRate and detectedRate != self._voiceInfo.sampleRate:
msg = (
f'PIPER: Using detected sample rate {detectedRate} '
f'for {voiceInfo.displayName}'
)
debug.printMessage(debug.LEVEL_INFO, msg, True)
self._voiceInfo.sampleRate = detectedRate
if self._audioPlayer:
self._audioPlayer.setSampleRate(voiceInfo.sampleRate)
def _getVoiceSampleRate(self, voice, voiceInfo):
if voice is None:
return voiceInfo.sampleRate if voiceInfo else None
for attr in ("sample_rate", "sampleRate"):
value = getattr(voice, attr, None)
if isinstance(value, (int, float)) and value > 0:
return int(value)
config = getattr(voice, "config", None)
sampleRate = None
if isinstance(config, dict):
audio = config.get("audio")
if isinstance(audio, dict):
sampleRate = audio.get("sample_rate")
elif audio is not None and hasattr(audio, "sample_rate"):
sampleRate = getattr(audio, "sample_rate", None)
if sampleRate is None:
sampleRate = config.get("sample_rate")
else:
if hasattr(config, "audio"):
audio = getattr(config, "audio")
if isinstance(audio, dict):
sampleRate = audio.get("sample_rate")
elif audio is not None and hasattr(audio, "sample_rate"):
sampleRate = getattr(audio, "sample_rate", None)
if sampleRate is None and hasattr(config, "sample_rate"):
sampleRate = getattr(config, "sample_rate", None)
try:
sampleRate = int(sampleRate) if sampleRate is not None else None
except (TypeError, ValueError):
sampleRate = None
if sampleRate and sampleRate > 0:
return sampleRate
return voiceInfo.sampleRate if voiceInfo else None
def _mapRate(self, acssRate):
"""Map ACSS rate (0-99) to Piper length_scale.
ACSS rate 50 (default) = length_scale 1.0
Higher ACSS rate = lower length_scale (faster)
Lower ACSS rate = higher length_scale (slower)
Arguments:
- acssRate: Rate value from 0-99
"""
rate = acssRate if acssRate is not None else 50
rate = max(0, min(99, rate))
lengthScale = 2.0 - (rate / 99.0) * 1.5
return max(0.5, min(2.0, lengthScale))
def _mapPitch(self, acssPitch):
"""Map ACSS pitch (0-9) to pitch adjustment factor.
Note: Piper's native pitch control is limited.
This maps to a factor that could be used for post-processing.
Arguments:
- acssPitch: Pitch value from 0-9
"""
pitch = acssPitch if acssPitch is not None else 5.0
pitch = max(0, min(9, pitch))
return pitch
def _mapVolume(self, acssGain):
"""Map ACSS gain (0-9) to volume (0.0-1.0).
Arguments:
- acssGain: Gain value from 0-9
"""
gain = acssGain if acssGain is not None else 10
gain = max(0, min(10, gain))
return gain / 10.0
def _setRate(self, acssRate):
"""Set the speech rate.
Arguments:
- acssRate: ACSS rate value (0-99)
"""
self._rate = acssRate if acssRate is not None else 50
msg = f'PIPER: Rate set to {self._rate}'
debug.printMessage(debug.LEVEL_INFO, msg, True)
def _setPitch(self, acssPitch):
"""Set the speech pitch.
Arguments:
- acssPitch: ACSS pitch value (0-9)
"""
self._pitch = acssPitch if acssPitch is not None else 5.0
msg = f'PIPER: Pitch set to {self._pitch}'
debug.printMessage(debug.LEVEL_INFO, msg, True)
def _setVolume(self, acssGain):
"""Set the speech volume.
Arguments:
- acssGain: ACSS gain value (0-9)
"""
self._volume = self._mapVolume(acssGain)
if self._audioPlayer:
self._audioPlayer.setVolume(self._volume)
msg = f'PIPER: Volume set to {self._volume}'
debug.printMessage(debug.LEVEL_INFO, msg, True)
def _setFamily(self, acssFamily):
"""Set the voice family.
Arguments:
- acssFamily: ACSS family dict with voice info
"""
if not acssFamily:
return
name = acssFamily.get(speechserver.VoiceFamily.NAME)
if not name:
return
for voice in self._voiceManager.getVoices():
if voice.displayName == name:
if voice != self._voiceInfo:
try:
self._loadVoice(voice)
except Exception as e:
msg = f'PIPER: Failed to load voice {name}: {e}'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
break
def _applyAcss(self, acss):
"""Apply ACSS voice settings.
Arguments:
- acss: ACSS settings to apply
"""
if acss is None:
acss = settings.voices[settings.DEFAULT_VOICE]
with self._lock:
current = self._currentVoiceProperties
for acssProperty, method in self._acssManipulators:
value = acss.get(acssProperty)
if value is not None:
if current.get(acssProperty) != value:
method(value)
current[acssProperty] = value
elif acssProperty == ACSS.AVERAGE_PITCH:
method(5.0)
current[acssProperty] = 5.0
elif acssProperty == ACSS.GAIN:
method(10)
current[acssProperty] = 10
elif acssProperty == ACSS.RATE:
method(50)
current[acssProperty] = 50
def _synthesize(self, text):
"""Synthesize text to audio data.
Arguments:
- text: Text to synthesize
Returns raw PCM audio data as bytes.
"""
if not self._voice or not text:
return None
lengthScale = self._mapRate(self._rate)
if _PiperSynthesisConfig is not None:
try:
synConfig = _PiperSynthesisConfig(length_scale=lengthScale)
audioChunks = self._voice.synthesize(text, syn_config=synConfig)
audioParts = []
sampleRate = None
for chunk in audioChunks:
if sampleRate is None:
sampleRate = chunk.sample_rate
audioParts.append(chunk.audio_int16_bytes)
if sampleRate and sampleRate != self._voiceInfo.sampleRate:
self._voiceInfo.sampleRate = sampleRate
if self._audioPlayer:
self._audioPlayer.setSampleRate(sampleRate)
audioData = b"".join(audioParts)
return audioData if audioData else None
except TypeError:
pass
wavBuffer = io.BytesIO()
with wave.open(wavBuffer, 'wb') as wavFile:
wavFile.setnchannels(1)
wavFile.setsampwidth(2)
wavFile.setframerate(self._voiceInfo.sampleRate)
self._voice.synthesize(
text,
wavFile,
length_scale=lengthScale
)
wavBuffer.seek(44)
return wavBuffer.read()
def _synthesizeAndPlay(self, text, acss, completionCallback=None, generation=0):
"""Synthesize and play text (runs in worker thread).
Arguments:
- text: Text to synthesize
- acss: ACSS settings
- completionCallback: Optional callback when complete
"""
try:
if self._stopEvent.is_set() or generation != self._speakGeneration:
return
self._applyAcss(acss)
audioData = self._synthesize(text)
if not audioData or self._stopEvent.is_set() or generation != self._speakGeneration:
return
if self._audioPlayer:
self._audioPlayer.play(audioData, False)
while self._audioPlayer.isPlaying():
if self._stopEvent.is_set() or generation != self._speakGeneration:
self._audioPlayer.stop()
return
time.sleep(0.01)
if completionCallback and not self._stopEvent.is_set() and generation == self._speakGeneration:
GLib.idle_add(completionCallback)
except Exception as e:
msg = f'PIPER: Synthesis error: {e}'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
def getInfo(self):
"""Returns [name, id]."""
return [guilabels.PIPER_TTS, self._id]
def getVoiceFamilies(self):
"""Returns a list of VoiceFamily instances."""
families = []
for voice in self._voiceManager.getVoices():
family = self._voiceManager.voiceToVoiceFamily(voice)
families.append(family)
return families
def speak(self, text=None, acss=None, interrupt=True):
"""Speaks the given text.
Arguments:
- text: Text to speak
- acss: ACSS voice settings
- interrupt: If True, stop any current speech first
"""
if not text or not self._voice:
return
if interrupt:
self.stop()
with self._lock:
self._stopEvent.clear()
generation = self._speakGeneration
msg = f"PIPER: Speaking '{text}'"
debug.printMessage(debug.LEVEL_INFO, msg, True)
self._currentFuture = self._executor.submit(
self._synthesizeAndPlay, text, acss, None, generation
)
def sayAll(self, utteranceIterator, progressCallback):
"""Iterates through utterances, speaking each one.
Arguments:
- utteranceIterator: Iterator yielding [SayAllContext, acss] tuples
- progressCallback: Called with progress updates
"""
GLib.idle_add(self._sayAllWorker, utteranceIterator, progressCallback)
def _sayAllWorker(self, iterator, callback):
"""Process one utterance at a time (called via GLib.idle_add).
Arguments:
- iterator: Utterance iterator
- callback: Progress callback
"""
try:
context, acss = next(iterator)
except StopIteration:
return False
def onComplete():
context.currentOffset = context.endOffset
callback(context.copy(), speechserver.SayAllContext.COMPLETED)
GLib.idle_add(self._sayAllWorker, iterator, callback)
context.currentOffset = context.startOffset
callback(context.copy(), speechserver.SayAllContext.PROGRESS)
with self._lock:
self._stopEvent.clear()
generation = self._speakGeneration
self._currentFuture = self._executor.submit(
self._synthesizeAndPlay, context.utterance, acss, onComplete, generation
)
return False
def speakCharacter(self, character, acss=None):
"""Speaks a single character immediately.
Arguments:
- character: Character to speak
- acss: ACSS voice settings
"""
name = chnames.getCharacterName(character)
if name and name != character:
if cthulhu_state.activeScript:
name = cthulhu_state.activeScript.utilities.adjustForPronunciation(name)
self.speak(name, acss)
else:
self.speak(character, acss)
def speakKeyEvent(self, event, acss=None):
"""Speaks a key event immediately.
Arguments:
- event: The KeyboardEvent to speak
- acss: ACSS voice settings
"""
eventString = event.getKeyName()
lockingStateString = event.getLockingStateString()
eventString = f"{eventString} {lockingStateString}".strip()
self.speak(eventString, acss)
def _changeDefaultSpeechRate(self, step, decrease=False):
"""Change the default speech rate.
Arguments:
- step: Amount to change
- decrease: If True, decrease rate; otherwise increase
"""
acss = settings.voices[settings.DEFAULT_VOICE]
delta = step * (-1 if decrease else 1)
try:
rate = acss[ACSS.RATE]
except KeyError:
rate = 50
acss[ACSS.RATE] = max(0, min(99, rate + delta))
msg = f"PIPER: Rate set to {acss[ACSS.RATE]}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
self.speak(
messages.SPEECH_SLOWER if decrease else messages.SPEECH_FASTER,
acss=acss
)
def _changeDefaultSpeechPitch(self, step, decrease=False):
"""Change the default speech pitch.
Arguments:
- step: Amount to change
- decrease: If True, decrease pitch; otherwise increase
"""
acss = settings.voices[settings.DEFAULT_VOICE]
delta = step * (-1 if decrease else 1)
try:
pitch = acss[ACSS.AVERAGE_PITCH]
except KeyError:
pitch = 5
acss[ACSS.AVERAGE_PITCH] = max(0, min(9, pitch + delta))
msg = f"PIPER: Pitch set to {acss[ACSS.AVERAGE_PITCH]}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
self.speak(
messages.SPEECH_LOWER if decrease else messages.SPEECH_HIGHER,
acss=acss
)
def _changeDefaultSpeechVolume(self, step, decrease=False):
"""Change the default speech volume.
Arguments:
- step: Amount to change
- decrease: If True, decrease volume; otherwise increase
"""
acss = settings.voices[settings.DEFAULT_VOICE]
delta = step * (-1 if decrease else 1)
try:
volume = acss[ACSS.GAIN]
except KeyError:
volume = 10
acss[ACSS.GAIN] = max(0, min(9, volume + delta))
msg = f"PIPER: Volume set to {acss[ACSS.GAIN]}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
self.speak(
messages.SPEECH_SOFTER if decrease else messages.SPEECH_LOUDER,
acss=acss
)
def increaseSpeechRate(self, step=5):
"""Increases the speech rate."""
self._changeDefaultSpeechRate(step)
def decreaseSpeechRate(self, step=5):
"""Decreases the speech rate."""
self._changeDefaultSpeechRate(step, decrease=True)
def increaseSpeechPitch(self, step=0.5):
"""Increases the speech pitch."""
self._changeDefaultSpeechPitch(step)
def decreaseSpeechPitch(self, step=0.5):
"""Decreases the speech pitch."""
self._changeDefaultSpeechPitch(step, decrease=True)
def increaseSpeechVolume(self, step=0.5):
"""Increases the speech volume."""
self._changeDefaultSpeechVolume(step)
def decreaseSpeechVolume(self, step=0.5):
"""Decreases the speech volume."""
self._changeDefaultSpeechVolume(step, decrease=True)
def updateCapitalizationStyle(self):
"""Updates the capitalization style used by the speech server."""
pass
def updatePunctuationLevel(self):
"""Punctuation level changed, inform this speechServer."""
pass
def stop(self):
"""Stops ongoing speech and flushes the queue."""
with self._lock:
self._speakGeneration += 1
self._stopEvent.set()
if self._currentFuture:
self._currentFuture.cancel()
self._currentFuture = None
if self._audioPlayer:
self._audioPlayer.stop()
def shutdown(self):
"""Shuts down the speech engine."""
self.stop()
if self._executor:
self._executor.shutdown(wait=False)
self._executor = None
if self._audioPlayer:
self._audioPlayer.shutdown()
self._audioPlayer = None
self._voice = None
self._voiceInfo = None
if self._id in SpeechServer._active_servers:
del SpeechServer._active_servers[self._id]
msg = 'PIPER: Server shutdown complete'
debug.printMessage(debug.LEVEL_INFO, msg, True)
def reset(self, text=None, acss=None):
"""Resets the speech engine."""
self.stop()
if self._voiceInfo:
try:
self._loadVoice(self._voiceInfo)
except Exception as e:
msg = f'PIPER: Failed to reset voice: {e}'
debug.printMessage(debug.LEVEL_WARNING, msg, True)
+1 -1
View File
@@ -274,7 +274,7 @@ activeProfile = ['Default', 'default']
profile = ['Default', 'default']
# Speech
speechFactoryModules = ["speechdispatcherfactory"]
speechFactoryModules = ["speechdispatcherfactory", "piperfactory"]
speechServerFactory = "speechdispatcherfactory"
speechServerInfo = None # None means let the factory decide.
enableSpeech = True
+20 -7
View File
@@ -93,9 +93,9 @@ def init():
debug.printMessage(debug.LEVEL_INFO, 'SPEECH: Already initialized', True)
return
chosenModuleName = settings.speechServerFactory
try:
moduleName = settings.speechServerFactory
_initSpeechServer(moduleName, settings.speechServerInfo)
_initSpeechServer(chosenModuleName, settings.speechServerInfo)
except Exception:
moduleNames = settings.speechFactoryModules
for moduleName in moduleNames:
@@ -103,12 +103,21 @@ def init():
try:
_initSpeechServer(moduleName, None)
if _speechserver:
chosenModuleName = moduleName
break
except Exception:
debug.printException(debug.LEVEL_SEVERE)
if _speechserver:
tokens = ["SPEECH: Using speech server factory:", moduleName]
if chosenModuleName != settings.speechServerFactory:
settings.speechServerFactory = chosenModuleName
settings.speechServerInfo = None
tokens = [
"SPEECH: Falling back to speech server factory:",
chosenModuleName
]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
tokens = ["SPEECH: Using speech server factory:", chosenModuleName]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
else:
msg = 'SPEECH: Not available'
@@ -266,6 +275,7 @@ def speak(content, acss=None, interrupt=True):
if not isinstance(content, list):
return
shouldInterrupt = interrupt
toSpeak = []
activeVoice = acss
if acss is not None:
@@ -275,14 +285,16 @@ def speak(content, acss=None, interrupt=True):
if not isinstance(element, validTypes):
debug.printMessage(debug.LEVEL_INFO, error % element, True)
elif isinstance(element, list):
speak(element, acss, interrupt)
speak(element, acss, shouldInterrupt)
shouldInterrupt = False
elif isinstance(element, str):
if len(element):
toSpeak.append(element)
elif isinstance(element, Icon):
if toSpeak:
string = " ".join(toSpeak)
_speak(string, activeVoice, interrupt)
_speak(string, activeVoice, shouldInterrupt)
shouldInterrupt = False
toSpeak = []
if element.isValid():
player = sound.getPlayer()
@@ -305,13 +317,14 @@ def speak(content, acss=None, interrupt=True):
if toSpeak:
string = " ".join(toSpeak)
_speak(string, activeVoice, interrupt)
_speak(string, activeVoice, shouldInterrupt)
shouldInterrupt = False
activeVoice = newVoice
toSpeak = newItemsToSpeak
if toSpeak:
string = " ".join(toSpeak)
_speak(string, activeVoice, interrupt)
_speak(string, activeVoice, shouldInterrupt)
def speakKeyEvent(event, acss=None):
"""Speaks a key event immediately.
+53 -11
View File
@@ -346,11 +346,11 @@ class SpeechAndVerbosityManager:
return f"{value:.1f}".rstrip("0").rstrip(".")
return str(value)
def _present_message(self, script, message):
def _present_message(self, script, message, voice=None):
if script:
script.presentMessage(message)
script.presentMessage(message, voice=voice)
else:
speech.speak(message)
speech.speak(message, voice)
def _get_default_voice(self):
from . import acss
@@ -370,7 +370,23 @@ class SpeechAndVerbosityManager:
default_voice['established'] = True
def _get_current_speech_setting(self):
return self._speech_settings_order[self._current_speech_setting_index]
order = self._get_speech_settings_order()
if not order:
return ""
if self._current_speech_setting_index < 0:
self._current_speech_setting_index = 0
elif self._current_speech_setting_index >= len(order):
self._current_speech_setting_index = len(order) - 1
return order[self._current_speech_setting_index]
def _get_speech_settings_order(self):
order = ["rate", "pitch", "volume"]
server = self._get_server()
if server and hasattr(server, "list_output_modules") and hasattr(server, "getOutputModule"):
order.append("module")
if server and hasattr(server, "getVoiceFamilies"):
order.append("voice")
return order
def _get_rate_value(self):
from . import acss
@@ -410,14 +426,28 @@ class SpeechAndVerbosityManager:
default_voice = self._get_default_voice()
family = default_voice.get(acss.ACSS.FAMILY, {}) or {}
name = family.get(speechserver.VoiceFamily.NAME)
if name and server:
voices = self._get_available_voices(server)
if voices:
for voice in voices:
if voice.get(speechserver.VoiceFamily.NAME) == name:
return name
self._set_default_voice_family(voices[0])
return voices[0].get(speechserver.VoiceFamily.NAME)
if name:
return name
if server:
voices = self._get_available_voices(server)
if voices:
self._set_default_voice_family(voices[0])
return voices[0].get(speechserver.VoiceFamily.NAME)
return ""
def _get_voice_messages(self, server):
if server and hasattr(server, "list_output_modules"):
return messages.SPEECH_VOICE_VALUE, messages.SPEECH_VOICES_UNAVAILABLE
return messages.SPEECH_VOICE_VALUE_GENERIC, messages.SPEECH_VOICES_UNAVAILABLE_GENERIC
def _get_available_modules(self, server):
if server is None or not hasattr(server, 'list_output_modules'):
return []
@@ -539,27 +569,37 @@ class SpeechAndVerbosityManager:
elif setting == "voice":
server = self._get_server()
voices = self._get_available_voices(server)
voice_value_message, voice_unavailable_message = self._get_voice_messages(server)
if not voices:
message = messages.SPEECH_VOICES_UNAVAILABLE
message = voice_unavailable_message
else:
name = self._get_current_voice_name(server)
message = messages.SPEECH_VOICE_VALUE % name
message = voice_value_message % name
else:
message = ""
if message:
self._present_message(script, message)
voice = self._get_default_voice() if setting == "voice" else None
self._present_message(script, message, voice=voice)
@dbus_service.command
def select_previous_speech_setting(self, script=None, event=None):
if self._current_speech_setting_index > 0:
order = self._get_speech_settings_order()
if not order:
return True
if self._current_speech_setting_index >= len(order):
self._current_speech_setting_index = len(order) - 1
elif self._current_speech_setting_index > 0:
self._current_speech_setting_index -= 1
self._announce_current_speech_setting(script)
return True
@dbus_service.command
def select_next_speech_setting(self, script=None, event=None):
if self._current_speech_setting_index < len(self._speech_settings_order) - 1:
order = self._get_speech_settings_order()
if not order:
return True
if self._current_speech_setting_index < len(order) - 1:
self._current_speech_setting_index += 1
self._announce_current_speech_setting(script)
return True
@@ -663,7 +703,8 @@ class SpeechAndVerbosityManager:
server = self._get_server()
voices = self._get_available_voices(server)
if not voices:
self._present_message(script, messages.SPEECH_VOICES_UNAVAILABLE)
_, voice_unavailable_message = self._get_voice_messages(server)
self._present_message(script, voice_unavailable_message)
return True
current_name = self._get_current_voice_name(server)
@@ -680,7 +721,8 @@ class SpeechAndVerbosityManager:
name = new_voice.get(speechserver.VoiceFamily.NAME, "")
msg = f"SPEECH AND VERBOSITY MANAGER: Voice set to {name}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
self._present_message(script, messages.SPEECH_VOICE_VALUE % name)
voice_value_message, _ = self._get_voice_messages(server)
self._present_message(script, voice_value_message % name, voice=self._get_default_voice())
return True
@dbus_service.command