Initial attempt at porting over fenrir's hardware synth support. Probably buggy.
This commit is contained in:
@@ -24,6 +24,45 @@ This repository is a screen reader. Prioritize accessibility, correctness, and s
|
|||||||
- If repo and installed behavior differ, prefer rebuilding with `./build-local.sh` over patching the installed package directly.
|
- If repo and installed behavior differ, prefer rebuilding with `./build-local.sh` over patching the installed package directly.
|
||||||
- Treat direct edits under `~/.local/.../cthulhu/` as an exception path that requires explicit user approval.
|
- Treat direct edits under `~/.local/.../cthulhu/` as an exception path that requires explicit user approval.
|
||||||
|
|
||||||
|
## Contribution workflow (including generated patches)
|
||||||
|
- Assume contributors may use code-generation tools without understanding every changed line. Review the resulting code, not the contributor's confidence in it.
|
||||||
|
- Start from a reproducible user-visible problem. Capture the exact application, desktop/session type, steps, expected behavior, actual behavior, and relevant logs before changing code.
|
||||||
|
- Investigate the full confirmed behavior class before implementing a fix. For example, a stale-focus bug seen in one browser may also affect other applications or empty workspaces.
|
||||||
|
- Prefer the smallest root-cause fix that covers the full confirmed behavior class. Do not add app-specific exceptions, desktop-specific branches, compatibility fallbacks, or broad refactors unless the evidence requires them.
|
||||||
|
- Add or update automated regression tests for the general behavior and the originally reported workflow whenever practical.
|
||||||
|
- Read every generated diff before committing. Remove unrelated rewrites, speculative cleanup, dead code, debug leftovers, and generated files that are not required by the fix.
|
||||||
|
- Never commit a change solely because it compiles or because an automated tool says it works. Verify the real user-facing workflow after rebuilding the installed copy.
|
||||||
|
|
||||||
|
## Verification checklist before commit
|
||||||
|
Run the narrowest relevant checks first, then broaden testing based on the affected behavior:
|
||||||
|
|
||||||
|
1. Inspect the diff:
|
||||||
|
- `git diff --check`
|
||||||
|
- `git status --short`
|
||||||
|
- `git diff --stat`
|
||||||
|
- `git diff -- <changed files>`
|
||||||
|
2. Run syntax checks for each changed Python file:
|
||||||
|
- `python -m py_compile <changed .py files>`
|
||||||
|
3. Run focused automated regression tests:
|
||||||
|
- `python -m unittest <relevant test modules>`
|
||||||
|
- For shared input, focus, script lifecycle, settings, plugin loading, or installation changes, run `./test-local.sh` after the focused tests pass.
|
||||||
|
4. Rebuild the local installed copy:
|
||||||
|
- `./build-local.sh`
|
||||||
|
5. Confirm the runtime import resolves to the refreshed local install:
|
||||||
|
- `python - <<'PY'`
|
||||||
|
- `import importlib.util`
|
||||||
|
- `print(importlib.util.find_spec("cthulhu").origin)`
|
||||||
|
- `PY`
|
||||||
|
6. Reproduce the original user-visible workflow against the rebuilt copy.
|
||||||
|
7. Check closely related regressions. For focus, keyboard, or window-tracking changes, manually test:
|
||||||
|
- Xorg and the user's active window manager or desktop.
|
||||||
|
- Switching among browser content, terminal windows, GTK applications, dialogs, and empty workspaces.
|
||||||
|
- Returning to the original application after each switch.
|
||||||
|
- Cthulhu shortcuts, structural navigation, flat review, and any delegated key handling such as Fenrir in XTerm.
|
||||||
|
- Both key press and key release behavior for modifiers, NumLock, and keypad keys when relevant.
|
||||||
|
- Clean shutdown without crashing the browser or leaving grabs behind.
|
||||||
|
8. Record what was tested, what could not be tested locally, and any remaining uncertainty in the commit message or review notes.
|
||||||
|
|
||||||
## Platform support stance
|
## Platform support stance
|
||||||
- **critical** Robust Xorg support is required and is a merge gate for Cthulhu.
|
- **critical** Robust Xorg support is required and is a merge gate for Cthulhu.
|
||||||
- Wayland support is desirable, but it is secondary to keeping Xorg stable and usable.
|
- Wayland support is desirable, but it is secondary to keeping Xorg stable and usable.
|
||||||
|
|||||||
@@ -1787,6 +1787,35 @@
|
|||||||
<property name="top_attach">5</property>
|
<property name="top_attach">5</property>
|
||||||
</packing>
|
</packing>
|
||||||
</child>
|
</child>
|
||||||
|
<child>
|
||||||
|
<object class="GtkLabel" id="hardwareDeviceLabel">
|
||||||
|
<property name="visible">False</property>
|
||||||
|
<property name="can_focus">False</property>
|
||||||
|
<property name="xalign">1</property>
|
||||||
|
<property name="label" translatable="yes">Serial _device:</property>
|
||||||
|
<property name="use_underline">True</property>
|
||||||
|
<property name="justify">right</property>
|
||||||
|
<property name="mnemonic_widget">hardwareDeviceCombo</property>
|
||||||
|
<accessibility>
|
||||||
|
<relation type="label-for" target="hardwareDeviceCombo"/>
|
||||||
|
</accessibility>
|
||||||
|
</object>
|
||||||
|
<packing>
|
||||||
|
<property name="left_attach">0</property>
|
||||||
|
<property name="top_attach">9</property>
|
||||||
|
</packing>
|
||||||
|
</child>
|
||||||
|
<child>
|
||||||
|
<object class="GtkComboBox" id="hardwareDeviceCombo">
|
||||||
|
<property name="visible">False</property>
|
||||||
|
<property name="can_focus">False</property>
|
||||||
|
<signal name="changed" handler="hardwareDeviceChanged" swapped="no"/>
|
||||||
|
</object>
|
||||||
|
<packing>
|
||||||
|
<property name="left_attach">1</property>
|
||||||
|
<property name="top_attach">9</property>
|
||||||
|
</packing>
|
||||||
|
</child>
|
||||||
</object>
|
</object>
|
||||||
</child>
|
</child>
|
||||||
</object>
|
</object>
|
||||||
|
|||||||
@@ -168,6 +168,9 @@ class CthulhuSetupGUI(cthulhu_gtkbuilder.GtkBuilderWrapper):
|
|||||||
self.speechFamiliesChoice = None
|
self.speechFamiliesChoice = None
|
||||||
self.speechFamiliesChoices = None
|
self.speechFamiliesChoices = None
|
||||||
self.speechFamiliesModel = None
|
self.speechFamiliesModel = None
|
||||||
|
self.hardwareDeviceChoice = None
|
||||||
|
self.hardwareDeviceChoices = None
|
||||||
|
self.hardwareDeviceModel = None
|
||||||
self.speechLanguagesChoice = None
|
self.speechLanguagesChoice = None
|
||||||
self.speechLanguagesChoices = None
|
self.speechLanguagesChoices = None
|
||||||
self.speechLanguagesModel = None
|
self.speechLanguagesModel = None
|
||||||
@@ -405,6 +408,11 @@ class CthulhuSetupGUI(cthulhu_gtkbuilder.GtkBuilderWrapper):
|
|||||||
self._initComboBox(self.get_widget("speechLanguages"))
|
self._initComboBox(self.get_widget("speechLanguages"))
|
||||||
self.speechFamiliesModel = \
|
self.speechFamiliesModel = \
|
||||||
self._initComboBox(self.get_widget("speechFamilies"))
|
self._initComboBox(self.get_widget("speechFamilies"))
|
||||||
|
try:
|
||||||
|
self.hardwareDeviceModel = \
|
||||||
|
self._initComboBox(self.get_widget("hardwareDeviceCombo"))
|
||||||
|
except AttributeError:
|
||||||
|
self.hardwareDeviceModel = None
|
||||||
self.echoSpeechServersModel = \
|
self.echoSpeechServersModel = \
|
||||||
self._initComboBox(self.get_widget("echoSpeechServers"))
|
self._initComboBox(self.get_widget("echoSpeechServers"))
|
||||||
self.echoSpeechFamiliesModel = \
|
self.echoSpeechFamiliesModel = \
|
||||||
@@ -1703,6 +1711,8 @@ class CthulhuSetupGUI(cthulhu_gtkbuilder.GtkBuilderWrapper):
|
|||||||
#
|
#
|
||||||
self.initializingSpeech = True
|
self.initializingSpeech = True
|
||||||
self._setupSpeechSystems(factories)
|
self._setupSpeechSystems(factories)
|
||||||
|
self._setupHardwareDevice()
|
||||||
|
self._updateHardwareDeviceVisibility()
|
||||||
self.initializingSpeech = False
|
self.initializingSpeech = False
|
||||||
|
|
||||||
def _getSpeechDispatcherFactory(self):
|
def _getSpeechDispatcherFactory(self):
|
||||||
@@ -3847,6 +3857,118 @@ print(json.dumps(result))
|
|||||||
self.prefsDict["onlySpeakDisplayedText"] = enable
|
self.prefsDict["onlySpeakDisplayedText"] = enable
|
||||||
self.get_widget("contextOptionsGrid").set_sensitive(not enable)
|
self.get_widget("contextOptionsGrid").set_sensitive(not enable)
|
||||||
|
|
||||||
|
|
||||||
|
def _scanSerialDevices(self):
|
||||||
|
"""Scan for available serial devices and return a list of paths."""
|
||||||
|
import glob
|
||||||
|
devices = []
|
||||||
|
patterns = [
|
||||||
|
"/dev/ttyUSB*",
|
||||||
|
"/dev/ttyACM*",
|
||||||
|
"/dev/ttyS*",
|
||||||
|
"/dev/ttyAMA*",
|
||||||
|
"/dev/rfcomm*",
|
||||||
|
"/dev/serial/by-id/*",
|
||||||
|
]
|
||||||
|
for pattern in patterns:
|
||||||
|
devices.extend(glob.glob(pattern))
|
||||||
|
devices = sorted(set(devices))
|
||||||
|
return devices
|
||||||
|
|
||||||
|
def _setupHardwareDevice(self):
|
||||||
|
"""Sets up the hardware device combo box with available serial ports.
|
||||||
|
|
||||||
|
Populates the combo with scanned serial devices and restores the
|
||||||
|
previously saved selection if still available.
|
||||||
|
"""
|
||||||
|
if self.hardwareDeviceModel is None:
|
||||||
|
return
|
||||||
|
combobox = self.get_widget("hardwareDeviceCombo")
|
||||||
|
combobox.set_model(None)
|
||||||
|
self.hardwareDeviceModel.clear()
|
||||||
|
self.hardwareDeviceChoices = []
|
||||||
|
|
||||||
|
devices = self._scanSerialDevices()
|
||||||
|
saved_device = self.prefsDict.get("hardwareSpeechDevice",
|
||||||
|
settings.hardwareSpeechDevice)
|
||||||
|
|
||||||
|
# Always include a "(none)" option so the user can clear the device
|
||||||
|
self.hardwareDeviceChoices.append("")
|
||||||
|
self.hardwareDeviceModel.append((0, "(none)"))
|
||||||
|
i = 1
|
||||||
|
for device in devices:
|
||||||
|
self.hardwareDeviceChoices.append(device)
|
||||||
|
self.hardwareDeviceModel.append((i, device))
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
# If the saved device is not in the scanned list but is non-empty,
|
||||||
|
# append it so the user still sees their configured device.
|
||||||
|
if saved_device and saved_device not in devices:
|
||||||
|
self.hardwareDeviceChoices.append(saved_device)
|
||||||
|
self.hardwareDeviceModel.append((i, saved_device))
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
combobox.set_model(self.hardwareDeviceModel)
|
||||||
|
self._setHardwareDeviceChoice(saved_device)
|
||||||
|
|
||||||
|
def _setHardwareDeviceChoice(self, device_name):
|
||||||
|
"""Set the active item in the hardware device combo box.
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
- device_name: the device path to select.
|
||||||
|
"""
|
||||||
|
if not self.hardwareDeviceChoices:
|
||||||
|
self.hardwareDeviceChoice = None
|
||||||
|
return
|
||||||
|
|
||||||
|
for i, choice in enumerate(self.hardwareDeviceChoices):
|
||||||
|
if choice == device_name:
|
||||||
|
self.get_widget("hardwareDeviceCombo").set_active(i)
|
||||||
|
self.hardwareDeviceChoice = choice
|
||||||
|
return
|
||||||
|
|
||||||
|
self.get_widget("hardwareDeviceCombo").set_active(0)
|
||||||
|
self.hardwareDeviceChoice = self.hardwareDeviceChoices[0]
|
||||||
|
|
||||||
|
def _updateHardwareDeviceVisibility(self):
|
||||||
|
"""Show or hide the hardware device combo based on speech system.
|
||||||
|
|
||||||
|
The hardware device selector is only visible when the hardware
|
||||||
|
speech synthesizer factory is active.
|
||||||
|
"""
|
||||||
|
if self.hardwareDeviceModel is None:
|
||||||
|
return
|
||||||
|
is_hardware = False
|
||||||
|
if self.speechSystemsChoice:
|
||||||
|
try:
|
||||||
|
is_hardware = (
|
||||||
|
self.speechSystemsChoice.__name__ == "hardwarefactory"
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
self.get_widget("hardwareDeviceLabel").set_visible(is_hardware)
|
||||||
|
self.get_widget("hardwareDeviceCombo").set_visible(is_hardware)
|
||||||
|
|
||||||
|
def hardwareDeviceChanged(self, widget):
|
||||||
|
"""Signal handler for the hardware device combo box changed signal.
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
- widget: the component that generated the signal.
|
||||||
|
"""
|
||||||
|
if self.initializingSpeech:
|
||||||
|
return
|
||||||
|
|
||||||
|
selected_index = widget.get_active()
|
||||||
|
if selected_index >= 0 and selected_index < len(self.hardwareDeviceChoices):
|
||||||
|
self.hardwareDeviceChoice = self.hardwareDeviceChoices[selected_index]
|
||||||
|
else:
|
||||||
|
self.hardwareDeviceChoice = None
|
||||||
|
|
||||||
|
# Update runtime settings so the factory sees the new device
|
||||||
|
if self.hardwareDeviceChoice is not None:
|
||||||
|
settings.hardwareSpeechDevice = self.hardwareDeviceChoice
|
||||||
|
|
||||||
def speechSystemsChanged(self, widget):
|
def speechSystemsChanged(self, widget):
|
||||||
"""Signal handler for the "changed" signal for the speechSystems
|
"""Signal handler for the "changed" signal for the speechSystems
|
||||||
GtkComboBox widget. The user has selected a different speech
|
GtkComboBox widget. The user has selected a different speech
|
||||||
@@ -3866,6 +3988,7 @@ print(json.dumps(result))
|
|||||||
self._setupSpeechServers()
|
self._setupSpeechServers()
|
||||||
self._setupEchoSpeechServers()
|
self._setupEchoSpeechServers()
|
||||||
self._setEchoVoiceItems()
|
self._setEchoVoiceItems()
|
||||||
|
self._updateHardwareDeviceVisibility()
|
||||||
|
|
||||||
def speechServersChanged(self, widget):
|
def speechServersChanged(self, widget):
|
||||||
"""Signal handler for the "changed" signal for the speechServers
|
"""Signal handler for the "changed" signal for the speechServers
|
||||||
@@ -4927,6 +5050,16 @@ print(json.dumps(result))
|
|||||||
self.prefsDict["speechServerFactory"] = \
|
self.prefsDict["speechServerFactory"] = \
|
||||||
self.speechSystemsChoice.__name__
|
self.speechSystemsChoice.__name__
|
||||||
|
|
||||||
|
# Save hardware speech device setting when hardware factory is active
|
||||||
|
if self.speechSystemsChoice and \
|
||||||
|
self.speechSystemsChoice.__name__ == "hardwarefactory":
|
||||||
|
if self.hardwareDeviceChoice is not None:
|
||||||
|
self.prefsDict["hardwareSpeechDevice"] = self.hardwareDeviceChoice
|
||||||
|
else:
|
||||||
|
self.prefsDict["hardwareSpeechDevice"] = ""
|
||||||
|
else:
|
||||||
|
self.prefsDict["hardwareSpeechDevice"] = settings.hardwareSpeechDevice
|
||||||
|
|
||||||
speechServerChoice = self._getSpeechServerChoiceForSave()
|
speechServerChoice = self._getSpeechServerChoiceForSave()
|
||||||
if speechServerChoice:
|
if speechServerChoice:
|
||||||
self.prefsDict["speechServerInfo"] = \
|
self.prefsDict["speechServerInfo"] = \
|
||||||
|
|||||||
@@ -870,6 +870,19 @@ SPEECH_DISPATCHER = _("Speech Dispatcher")
|
|||||||
# Translators: This label refers to the Piper neural text-to-speech system.
|
# Translators: This label refers to the Piper neural text-to-speech system.
|
||||||
# (https://github.com/rhasspy/piper)
|
# (https://github.com/rhasspy/piper)
|
||||||
PIPER_TTS = _("Piper Neural TTS")
|
PIPER_TTS = _("Piper Neural TTS")
|
||||||
|
# Translators: This label refers to external hardware serial speech synthesizers.
|
||||||
|
HARDWARE_SPEECH = _("Hardware Speech Synthesizer")
|
||||||
|
# Translators: This label refers to the LiteTalk hardware speech synthesizer.
|
||||||
|
HARDWARE_LITETALK = _("LiteTalk")
|
||||||
|
# Translators: This label refers to the DoubleTalk LT hardware speech synthesizer.
|
||||||
|
HARDWARE_DOUBLETALK = _("DoubleTalk LT")
|
||||||
|
# Translators: This label refers to the TripleTalk hardware speech synthesizer.
|
||||||
|
HARDWARE_TRIPLETALK = _("TripleTalk")
|
||||||
|
# Translators: This label refers to the Dectalk hardware synthesizer.
|
||||||
|
HARDWARE_DECTALK = _("Dectalk")
|
||||||
|
# Translators: This is the label for the combo box that lets the user choose
|
||||||
|
# the serial device used by a hardware speech synthesizer.
|
||||||
|
HARDWARE_SERIAL_DEVICE = _("Serial _device:")
|
||||||
|
|
||||||
# Translators: This is a label for a group of options related to Cthulhu's behavior
|
# Translators: This is a label for a group of options related to Cthulhu's behavior
|
||||||
# when presenting an application's spell check dialog.
|
# when presenting an application's spell check dialog.
|
||||||
|
|||||||
@@ -0,0 +1,571 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
#
|
||||||
|
# Copyright (c) 2024 Stormux
|
||||||
|
#
|
||||||
|
# This library is free software; you can redistribute it and/or
|
||||||
|
# modify it under the terms of the GNU Lesser General Public
|
||||||
|
# License as published by the Free Software Foundation; either
|
||||||
|
# version 2.1 of the License, or (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This library is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||||
|
# Lesser General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Lesser General Public
|
||||||
|
# License along with this library; if not, write to the
|
||||||
|
# Free Software Foundation, Inc., Franklin Street, Fifth Floor,
|
||||||
|
# Boston MA 02110-1301 USA.
|
||||||
|
#
|
||||||
|
# Cthulhu project: https://git.stormux.org/storm/cthulhu
|
||||||
|
|
||||||
|
"""Provides a Cthulhu speech server for hardware serial synthesizers.
|
||||||
|
|
||||||
|
Ports Fenrir's hardware serial drivers (LiteTalk/DoubleTalk/TripleTalk,
|
||||||
|
Dectalk) to Cthulhu's SpeechServer interface.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
import termios
|
||||||
|
import threading
|
||||||
|
import tty
|
||||||
|
from queue import Empty, Queue
|
||||||
|
|
||||||
|
from . import debug
|
||||||
|
from . import guilabels
|
||||||
|
from . import messages
|
||||||
|
from . import settings
|
||||||
|
from . import speechserver
|
||||||
|
from .acss import ACSS
|
||||||
|
|
||||||
|
|
||||||
|
class _SpeakQueue(Queue):
|
||||||
|
"""Queue with a clear() method."""
|
||||||
|
|
||||||
|
def clear(self):
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
self.get_nowait()
|
||||||
|
except Empty:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class _HardwareSerialDriver:
|
||||||
|
"""Base class for hardware serial speech synthesizers.
|
||||||
|
|
||||||
|
Ported from Fenrir's hardwareSerialDriver.py.
|
||||||
|
"""
|
||||||
|
|
||||||
|
cancel_command = b""
|
||||||
|
default_baud_rate = 9600
|
||||||
|
|
||||||
|
def __init__(self, device: str, baud_rate: int):
|
||||||
|
self.device = device
|
||||||
|
self.baud_rate = baud_rate
|
||||||
|
self.serial_port: int | None = None
|
||||||
|
self.text_queue = _SpeakQueue()
|
||||||
|
self.lock = threading.Lock()
|
||||||
|
self.worker_thread: threading.Thread | None = None
|
||||||
|
self._stop_worker = False
|
||||||
|
self._is_initialized = False
|
||||||
|
|
||||||
|
def initialize(self) -> bool:
|
||||||
|
self._open_serial_port()
|
||||||
|
self._is_initialized = self.serial_port is not None
|
||||||
|
if self._is_initialized:
|
||||||
|
self._stop_worker = False
|
||||||
|
self.worker_thread = threading.Thread(target=self._worker, daemon=True)
|
||||||
|
self.worker_thread.start()
|
||||||
|
return self._is_initialized
|
||||||
|
|
||||||
|
def shutdown(self) -> None:
|
||||||
|
if not self._is_initialized:
|
||||||
|
return
|
||||||
|
self._stop_worker = True
|
||||||
|
self.clear_buffer()
|
||||||
|
self.text_queue.put(None)
|
||||||
|
if self.worker_thread:
|
||||||
|
self.worker_thread.join(timeout=0.5)
|
||||||
|
self._close_serial_port()
|
||||||
|
self._is_initialized = False
|
||||||
|
|
||||||
|
def speak(self, text: str, interrupt: bool = True) -> None:
|
||||||
|
if not self._is_initialized:
|
||||||
|
return
|
||||||
|
if interrupt:
|
||||||
|
self.stop()
|
||||||
|
if not isinstance(text, str) or text == "":
|
||||||
|
return
|
||||||
|
self.text_queue.put(text)
|
||||||
|
|
||||||
|
def stop(self) -> None:
|
||||||
|
if not self._is_initialized:
|
||||||
|
return
|
||||||
|
self.clear_buffer()
|
||||||
|
if self.cancel_command:
|
||||||
|
self._write_bytes(self.cancel_command, "cancel")
|
||||||
|
|
||||||
|
def clear_buffer(self) -> None:
|
||||||
|
if not self._is_initialized:
|
||||||
|
return
|
||||||
|
self.text_queue.clear()
|
||||||
|
|
||||||
|
def set_rate(self, rate: float) -> None:
|
||||||
|
if not self._is_initialized:
|
||||||
|
return
|
||||||
|
self._write_bytes(self._rate_command(rate), "rate")
|
||||||
|
|
||||||
|
def set_pitch(self, pitch: float) -> None:
|
||||||
|
if not self._is_initialized:
|
||||||
|
return
|
||||||
|
self._write_bytes(self._pitch_command(pitch), "pitch")
|
||||||
|
|
||||||
|
def set_volume(self, volume: float) -> None:
|
||||||
|
if not self._is_initialized:
|
||||||
|
return
|
||||||
|
self._write_bytes(self._volume_command(volume), "volume")
|
||||||
|
|
||||||
|
def _worker(self) -> None:
|
||||||
|
while not self._stop_worker:
|
||||||
|
text = self.text_queue.get()
|
||||||
|
if text is None:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
data = self._speak_bytes(text)
|
||||||
|
self._write_bytes(data, "speech")
|
||||||
|
except Exception as error:
|
||||||
|
msg = f"HARDWARE SPEECH: worker failed: {error}"
|
||||||
|
debug.printMessage(debug.LEVEL_ERROR, msg, True)
|
||||||
|
|
||||||
|
def _open_serial_port(self) -> None:
|
||||||
|
if not self.device or self.device == "auto":
|
||||||
|
msg = "HARDWARE SPEECH: requires an explicit serial device"
|
||||||
|
debug.printMessage(debug.LEVEL_WARNING, msg, True)
|
||||||
|
return
|
||||||
|
|
||||||
|
port = self._open_configured_serial_port(self.device)
|
||||||
|
if port is not None:
|
||||||
|
self._activate_serial_port(self.device, port)
|
||||||
|
|
||||||
|
def _open_configured_serial_port(self, device: str) -> int | None:
|
||||||
|
port = None
|
||||||
|
try:
|
||||||
|
port = os.open(device, os.O_RDWR | os.O_NOCTTY)
|
||||||
|
tty.setraw(port)
|
||||||
|
attrs = termios.tcgetattr(port)
|
||||||
|
attrs[2] |= termios.CLOCAL | termios.CREAD
|
||||||
|
baud_rate = self._termios_baud_rate(self.baud_rate)
|
||||||
|
attrs[4] = baud_rate
|
||||||
|
attrs[5] = baud_rate
|
||||||
|
attrs[6][termios.VMIN] = 0
|
||||||
|
attrs[6][termios.VTIME] = 0
|
||||||
|
attrs[0] &= ~(termios.IXON | termios.IXOFF | termios.IXANY)
|
||||||
|
termios.tcsetattr(port, termios.TCSANOW, attrs)
|
||||||
|
return port
|
||||||
|
except (OSError, termios.error) as error:
|
||||||
|
self._close_port(port)
|
||||||
|
msg = f"HARDWARE SPEECH: device open failed: {device}: {error}"
|
||||||
|
debug.printMessage(debug.LEVEL_WARNING, msg, True)
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _activate_serial_port(self, device: str, port: int) -> None:
|
||||||
|
self.serial_port = port
|
||||||
|
self.device = device
|
||||||
|
msg = f"HARDWARE SPEECH: device opened: {device}, baud_rate={self.baud_rate}"
|
||||||
|
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||||
|
|
||||||
|
def _close_serial_port(self) -> None:
|
||||||
|
with self.lock:
|
||||||
|
if self.serial_port is None:
|
||||||
|
return
|
||||||
|
self._close_port(self.serial_port)
|
||||||
|
self.serial_port = None
|
||||||
|
|
||||||
|
def _close_port(self, port: int | None) -> None:
|
||||||
|
if port is None:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
os.close(port)
|
||||||
|
except OSError as error:
|
||||||
|
msg = f"HARDWARE SPEECH: device close failed: {error}"
|
||||||
|
debug.printMessage(debug.LEVEL_WARNING, msg, True)
|
||||||
|
|
||||||
|
def _write_bytes(self, data: bytes, description: str = "data") -> None:
|
||||||
|
if not data:
|
||||||
|
return
|
||||||
|
with self.lock:
|
||||||
|
if self.serial_port is None:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
total_written = 0
|
||||||
|
while total_written < len(data):
|
||||||
|
bytes_written = os.write(self.serial_port, data[total_written:])
|
||||||
|
if bytes_written == 0:
|
||||||
|
raise OSError("serial write returned 0 bytes")
|
||||||
|
total_written += bytes_written
|
||||||
|
preview = self._format_bytes_preview(data)
|
||||||
|
msg = f"HARDWARE SPEECH: wrote {total_written} {description} bytes: {preview}"
|
||||||
|
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||||
|
except OSError as error:
|
||||||
|
msg = f"HARDWARE SPEECH: write failed: {error}"
|
||||||
|
debug.printMessage(debug.LEVEL_ERROR, msg, True)
|
||||||
|
|
||||||
|
def _termios_baud_rate(self, baud_rate: int) -> int:
|
||||||
|
baud_name = f"B{baud_rate}"
|
||||||
|
if hasattr(termios, baud_name):
|
||||||
|
return getattr(termios, baud_name)
|
||||||
|
msg = f"HARDWARE SPEECH: unsupported baud rate {baud_rate}; using 9600"
|
||||||
|
debug.printMessage(debug.LEVEL_WARNING, msg, True)
|
||||||
|
return termios.B9600
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _clean_text(text: str) -> str:
|
||||||
|
text = text.replace("\r", " ").replace("\n", " ")
|
||||||
|
return "".join(char if 0x20 <= ord(char) <= 0x7E else " " for char in text)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _scale(value: float, minimum: int, maximum: int) -> int:
|
||||||
|
value = max(0.0, min(1.0, value))
|
||||||
|
return int(round(minimum + value * (maximum - minimum)))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _format_bytes_preview(data: bytes, limit: int = 32) -> str:
|
||||||
|
preview = data[:limit]
|
||||||
|
hex_preview = " ".join(f"{byte:02x}" for byte in preview)
|
||||||
|
ascii_preview = "".join(
|
||||||
|
chr(byte) if 0x20 <= byte <= 0x7E else "." for byte in preview
|
||||||
|
)
|
||||||
|
suffix = "" if len(data) <= limit else " ..."
|
||||||
|
return f"hex=[{hex_preview}{suffix}] ascii=[{ascii_preview}{suffix}]"
|
||||||
|
|
||||||
|
def _speak_bytes(self, text: str) -> bytes:
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def _rate_command(self, rate: float) -> bytes:
|
||||||
|
return b""
|
||||||
|
|
||||||
|
def _pitch_command(self, pitch: float) -> bytes:
|
||||||
|
return b""
|
||||||
|
|
||||||
|
def _volume_command(self, volume: float) -> bytes:
|
||||||
|
return b""
|
||||||
|
|
||||||
|
|
||||||
|
class _LiteTalkDriver(_HardwareSerialDriver):
|
||||||
|
"""LiteTalk-compatible serial driver."""
|
||||||
|
|
||||||
|
cancel_command = b"\x18"
|
||||||
|
|
||||||
|
def _speak_bytes(self, text: str) -> bytes:
|
||||||
|
return self._clean_text(text).encode("ascii", errors="replace") + b"\r"
|
||||||
|
|
||||||
|
def _rate_command(self, rate: float) -> bytes:
|
||||||
|
return self._setting_command(self._scale(rate, 0, 9), b"S")
|
||||||
|
|
||||||
|
def _pitch_command(self, pitch: float) -> bytes:
|
||||||
|
return self._setting_command(self._scale(pitch, 0, 99), b"P")
|
||||||
|
|
||||||
|
def _volume_command(self, volume: float) -> bytes:
|
||||||
|
return self._setting_command(self._scale(volume, 0, 9), b"V")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _setting_command(value: int, command: bytes) -> bytes:
|
||||||
|
return b"\x01" + str(value).encode("ascii") + command
|
||||||
|
|
||||||
|
|
||||||
|
class _DectalkDriver(_HardwareSerialDriver):
|
||||||
|
"""Dectalk serial driver."""
|
||||||
|
|
||||||
|
cancel_command = b"\x18"
|
||||||
|
|
||||||
|
def _speak_bytes(self, text: str) -> bytes:
|
||||||
|
return self._clean_text(text).encode("ascii", errors="replace") + b"\x01"
|
||||||
|
|
||||||
|
def _rate_command(self, rate: float) -> bytes:
|
||||||
|
return self._setting_command("ra", self._scale(rate, 75, 650))
|
||||||
|
|
||||||
|
def _pitch_command(self, pitch: float) -> bytes:
|
||||||
|
return self._setting_command("dv ap", self._scale(pitch, 50, 180))
|
||||||
|
|
||||||
|
def _volume_command(self, volume: float) -> bytes:
|
||||||
|
return self._setting_command("vo", self._scale(volume, 0, 100))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _setting_command(command: str, value: int) -> bytes:
|
||||||
|
return f"[:{command} {value}]".encode("ascii")
|
||||||
|
|
||||||
|
|
||||||
|
_DRIVER_MAP: dict[str, type[_HardwareSerialDriver]] = {
|
||||||
|
"litetalk": _LiteTalkDriver,
|
||||||
|
"doubletalk": _LiteTalkDriver,
|
||||||
|
"tripletalk": _LiteTalkDriver,
|
||||||
|
"dectalk": _DectalkDriver,
|
||||||
|
}
|
||||||
|
|
||||||
|
_SYNTH_DISPLAY_NAMES = {
|
||||||
|
"litetalk": guilabels.HARDWARE_LITETALK,
|
||||||
|
"doubletalk": guilabels.HARDWARE_DOUBLETALK,
|
||||||
|
"tripletalk": guilabels.HARDWARE_TRIPLETALK,
|
||||||
|
"dectalk": guilabels.HARDWARE_DECTALK,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class SpeechServer(speechserver.SpeechServer):
|
||||||
|
"""Hardware serial speech server implementation for Cthulhu."""
|
||||||
|
|
||||||
|
_active_servers: dict[str, SpeechServer] = {}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def getFactoryName() -> str:
|
||||||
|
"""Returns a localized name describing this factory."""
|
||||||
|
return guilabels.HARDWARE_SPEECH
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def getSpeechServers() -> list[SpeechServer]:
|
||||||
|
"""Gets available speech servers as a list."""
|
||||||
|
return [
|
||||||
|
SpeechServer(server_id, initialize=False, register=False)
|
||||||
|
for server_id in _DRIVER_MAP
|
||||||
|
]
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _getSpeechServer(cls, server_id: str) -> SpeechServer | None:
|
||||||
|
"""Return an active server for the given id."""
|
||||||
|
active_server = cls._active_servers.get(server_id)
|
||||||
|
if active_server is not None:
|
||||||
|
if active_server._matches_current_settings():
|
||||||
|
return active_server
|
||||||
|
active_server.shutdown()
|
||||||
|
|
||||||
|
cls(server_id)
|
||||||
|
return cls._active_servers.get(server_id)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def getSpeechServer(info: list[str] | None = None) -> SpeechServer | None:
|
||||||
|
"""Gets a given SpeechServer based upon the info."""
|
||||||
|
if info and len(info) >= 2:
|
||||||
|
server_id = info[1]
|
||||||
|
else:
|
||||||
|
server_id = "litetalk"
|
||||||
|
return SpeechServer._getSpeechServer(server_id)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def shutdownActiveServers() -> None:
|
||||||
|
"""Cleans up and shuts down this factory."""
|
||||||
|
servers = list(SpeechServer._active_servers.values())
|
||||||
|
for server in servers:
|
||||||
|
server.shutdown()
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
server_id: str,
|
||||||
|
initialize: bool = True,
|
||||||
|
register: bool = True,
|
||||||
|
):
|
||||||
|
super().__init__()
|
||||||
|
self._id = server_id
|
||||||
|
self._driver: _HardwareSerialDriver | None = None
|
||||||
|
self._info: list[str] = []
|
||||||
|
self._device = ""
|
||||||
|
self._baud_rate = settings.hardwareSpeechBaudRate
|
||||||
|
|
||||||
|
driver_class = _DRIVER_MAP.get(server_id)
|
||||||
|
if driver_class is None:
|
||||||
|
msg = f"HARDWARE SPEECH: unknown synth type: {server_id}"
|
||||||
|
debug.printMessage(debug.LEVEL_WARNING, msg, True)
|
||||||
|
return
|
||||||
|
|
||||||
|
display_name = _SYNTH_DISPLAY_NAMES.get(server_id, server_id)
|
||||||
|
self._info = [display_name, server_id]
|
||||||
|
|
||||||
|
if not initialize:
|
||||||
|
return
|
||||||
|
|
||||||
|
self._device = settings.hardwareSpeechDevice
|
||||||
|
self._baud_rate = settings.hardwareSpeechBaudRate
|
||||||
|
self._driver = driver_class(self._device, self._baud_rate)
|
||||||
|
if self._driver.initialize():
|
||||||
|
if register:
|
||||||
|
SpeechServer._active_servers[server_id] = self
|
||||||
|
msg = f"HARDWARE SPEECH: server initialized: {server_id} on {self._device}"
|
||||||
|
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||||
|
else:
|
||||||
|
msg = f"HARDWARE SPEECH: server initialization failed: {server_id}"
|
||||||
|
debug.printMessage(debug.LEVEL_WARNING, msg, True)
|
||||||
|
self._driver = None
|
||||||
|
|
||||||
|
def _matches_current_settings(self) -> bool:
|
||||||
|
return (
|
||||||
|
self._driver is not None
|
||||||
|
and self._device == settings.hardwareSpeechDevice
|
||||||
|
and self._baud_rate == settings.hardwareSpeechBaudRate
|
||||||
|
)
|
||||||
|
|
||||||
|
def getInfo(self) -> list[str]:
|
||||||
|
"""Returns [name, id]."""
|
||||||
|
return self._info
|
||||||
|
|
||||||
|
def getVoiceFamilies(self) -> list[dict[str, str]]:
|
||||||
|
"""Returns a list of VoiceFamily instances."""
|
||||||
|
return []
|
||||||
|
|
||||||
|
def speakCharacter(self, character: str, acss: dict | None = None) -> None:
|
||||||
|
"""Speaks a single character immediately."""
|
||||||
|
if self._driver:
|
||||||
|
self._apply_acss(acss)
|
||||||
|
self._driver.speak(character, interrupt=True)
|
||||||
|
|
||||||
|
def speakKeyEvent(self, event, acss: dict | None = None) -> None:
|
||||||
|
"""Speaks a key event immediately."""
|
||||||
|
event_string = event.getKeyName()
|
||||||
|
locking_state_string = event.getLockingStateString()
|
||||||
|
text = f"{event_string} {locking_state_string}".strip()
|
||||||
|
self.speak(text, acss=acss)
|
||||||
|
|
||||||
|
def speak(
|
||||||
|
self,
|
||||||
|
text: str | None = None,
|
||||||
|
acss: dict | None = None,
|
||||||
|
interrupt: bool = True,
|
||||||
|
) -> None:
|
||||||
|
"""Speaks all queued text immediately."""
|
||||||
|
if not self._driver or text is None:
|
||||||
|
return
|
||||||
|
self._apply_acss(acss)
|
||||||
|
self._driver.speak(text, interrupt=interrupt)
|
||||||
|
|
||||||
|
def sayAll(self, utteranceIterator, progressCallback) -> None:
|
||||||
|
"""Iterates through the given utteranceIterator, speaking each utterance."""
|
||||||
|
for context, acss in utteranceIterator:
|
||||||
|
self.speak(context.utterance, acss=acss, interrupt=False)
|
||||||
|
|
||||||
|
def increaseSpeechRate(self, step: int = 5) -> None:
|
||||||
|
self._change_default_speech_rate(step)
|
||||||
|
|
||||||
|
def decreaseSpeechRate(self, step: int = 5) -> None:
|
||||||
|
self._change_default_speech_rate(step, decrease=True)
|
||||||
|
|
||||||
|
def increaseSpeechPitch(self, step: float = 0.5) -> None:
|
||||||
|
self._change_default_speech_pitch(step)
|
||||||
|
|
||||||
|
def decreaseSpeechPitch(self, step: float = 0.5) -> None:
|
||||||
|
self._change_default_speech_pitch(step, decrease=True)
|
||||||
|
|
||||||
|
def increaseSpeechVolume(self, step: float = 0.5) -> None:
|
||||||
|
self._change_default_speech_volume(step)
|
||||||
|
|
||||||
|
def decreaseSpeechVolume(self, step: float = 0.5) -> None:
|
||||||
|
self._change_default_speech_volume(step, decrease=True)
|
||||||
|
|
||||||
|
def updateCapitalizationStyle(self) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def updatePunctuationLevel(self) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def stop(self) -> None:
|
||||||
|
if self._driver:
|
||||||
|
self._driver.stop()
|
||||||
|
|
||||||
|
def shutdown(self) -> None:
|
||||||
|
if self._driver:
|
||||||
|
self._driver.shutdown()
|
||||||
|
self._driver = None
|
||||||
|
if self._id in SpeechServer._active_servers:
|
||||||
|
del SpeechServer._active_servers[self._id]
|
||||||
|
|
||||||
|
def reset(self, text: str | None = None, acss: dict | None = None) -> None:
|
||||||
|
if self._driver:
|
||||||
|
self._driver.shutdown()
|
||||||
|
self._driver = None
|
||||||
|
|
||||||
|
driver_class = _DRIVER_MAP.get(self._id)
|
||||||
|
if driver_class is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
self._device = settings.hardwareSpeechDevice
|
||||||
|
self._baud_rate = settings.hardwareSpeechBaudRate
|
||||||
|
self._driver = driver_class(self._device, self._baud_rate)
|
||||||
|
if not self._driver.initialize():
|
||||||
|
self._driver = None
|
||||||
|
|
||||||
|
def _apply_acss(self, acss: dict | None) -> None:
|
||||||
|
if not self._driver or not acss:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
rate = acss.get(ACSS.RATE)
|
||||||
|
if rate is not None:
|
||||||
|
normalized = max(0.0, min(99.0, float(rate))) / 99.0
|
||||||
|
self._driver.set_rate(normalized)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
pitch = acss.get(ACSS.AVERAGE_PITCH)
|
||||||
|
if pitch is not None:
|
||||||
|
normalized = max(0.0, min(9.0, float(pitch))) / 9.0
|
||||||
|
self._driver.set_pitch(normalized)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
volume = acss.get(ACSS.GAIN)
|
||||||
|
if volume is not None:
|
||||||
|
normalized = max(0.0, min(9.0, float(volume))) / 9.0
|
||||||
|
self._driver.set_volume(normalized)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _change_default_speech_rate(self, step: float, decrease: bool = False) -> None:
|
||||||
|
acss = settings.voices[settings.DEFAULT_VOICE]
|
||||||
|
delta = step * (-1 if decrease else 1)
|
||||||
|
try:
|
||||||
|
rate = acss[ACSS.RATE]
|
||||||
|
except KeyError:
|
||||||
|
rate = 50.0
|
||||||
|
acss[ACSS.RATE] = max(0, min(99, rate + delta))
|
||||||
|
msg = f"HARDWARE SPEECH: rate set to {acss[ACSS.RATE]}"
|
||||||
|
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||||
|
if self._driver:
|
||||||
|
normalized = acss[ACSS.RATE] / 99.0
|
||||||
|
self._driver.set_rate(normalized)
|
||||||
|
self.speak(
|
||||||
|
messages.SPEECH_SLOWER if decrease else messages.SPEECH_FASTER,
|
||||||
|
acss=acss
|
||||||
|
)
|
||||||
|
|
||||||
|
def _change_default_speech_pitch(self, step: float, decrease: bool = False) -> None:
|
||||||
|
acss = settings.voices[settings.DEFAULT_VOICE]
|
||||||
|
delta = step * (-1 if decrease else 1)
|
||||||
|
try:
|
||||||
|
pitch = acss[ACSS.AVERAGE_PITCH]
|
||||||
|
except KeyError:
|
||||||
|
pitch = 5.0
|
||||||
|
acss[ACSS.AVERAGE_PITCH] = max(0, min(9, pitch + delta))
|
||||||
|
msg = f"HARDWARE SPEECH: pitch set to {acss[ACSS.AVERAGE_PITCH]}"
|
||||||
|
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||||
|
if self._driver:
|
||||||
|
normalized = acss[ACSS.AVERAGE_PITCH] / 9.0
|
||||||
|
self._driver.set_pitch(normalized)
|
||||||
|
self.speak(
|
||||||
|
messages.SPEECH_LOWER if decrease else messages.SPEECH_HIGHER,
|
||||||
|
acss=acss
|
||||||
|
)
|
||||||
|
|
||||||
|
def _change_default_speech_volume(self, step: float, decrease: bool = False) -> None:
|
||||||
|
acss = settings.voices[settings.DEFAULT_VOICE]
|
||||||
|
delta = step * (-1 if decrease else 1)
|
||||||
|
try:
|
||||||
|
volume = acss[ACSS.GAIN]
|
||||||
|
except KeyError:
|
||||||
|
volume = 10.0
|
||||||
|
acss[ACSS.GAIN] = max(0, min(9, volume + delta))
|
||||||
|
msg = f"HARDWARE SPEECH: volume set to {acss[ACSS.GAIN]}"
|
||||||
|
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||||
|
if self._driver:
|
||||||
|
normalized = acss[ACSS.GAIN] / 9.0
|
||||||
|
self._driver.set_volume(normalized)
|
||||||
|
self.speak(
|
||||||
|
messages.SPEECH_SOFTER if decrease else messages.SPEECH_LOUDER,
|
||||||
|
acss=acss
|
||||||
|
)
|
||||||
@@ -101,6 +101,7 @@ cthulhu_python_sources = files([
|
|||||||
'speech.py',
|
'speech.py',
|
||||||
'spellcheck.py',
|
'spellcheck.py',
|
||||||
'speechdispatcherfactory.py',
|
'speechdispatcherfactory.py',
|
||||||
|
'hardwarefactory.py',
|
||||||
'speech_generator.py',
|
'speech_generator.py',
|
||||||
'speechserver.py',
|
'speechserver.py',
|
||||||
'piperfactory.py',
|
'piperfactory.py',
|
||||||
|
|||||||
@@ -300,19 +300,21 @@ class SpeechServer(speechserver.SpeechServer):
|
|||||||
return voiceInfo.sampleRate if voiceInfo else None
|
return voiceInfo.sampleRate if voiceInfo else None
|
||||||
|
|
||||||
def _mapRate(self, acssRate):
|
def _mapRate(self, acssRate):
|
||||||
"""Map ACSS rate (0-99) to Piper length_scale.
|
"""Map ACSS rate (0-100) to Piper length_scale.
|
||||||
|
|
||||||
ACSS rate 50 (default) = length_scale 1.0
|
ACSS rate 50 (default) = length_scale 1.0
|
||||||
Higher ACSS rate = lower length_scale (faster)
|
Higher ACSS rate = lower length_scale (faster)
|
||||||
Lower ACSS rate = higher length_scale (slower)
|
Lower ACSS rate = higher length_scale (slower)
|
||||||
|
|
||||||
Arguments:
|
Arguments:
|
||||||
- acssRate: Rate value from 0-99
|
- acssRate: Rate value from 0-100
|
||||||
"""
|
"""
|
||||||
rate = acssRate if acssRate is not None else 50
|
rate = acssRate if acssRate is not None else 50
|
||||||
rate = max(0, min(99, rate))
|
rate = max(0.0, min(100.0, float(rate)))
|
||||||
lengthScale = 2.0 - (rate / 99.0) * 1.5
|
if rate <= 50.0:
|
||||||
return max(0.5, min(2.0, lengthScale))
|
return 2.0 - (rate / 50.0)
|
||||||
|
|
||||||
|
return 1.0 - ((rate - 50.0) / 50.0) * 0.75
|
||||||
|
|
||||||
def _mapPitch(self, acssPitch):
|
def _mapPitch(self, acssPitch):
|
||||||
"""Map ACSS pitch (0-9) to pitch adjustment factor.
|
"""Map ACSS pitch (0-9) to pitch adjustment factor.
|
||||||
@@ -614,7 +616,7 @@ class SpeechServer(speechserver.SpeechServer):
|
|||||||
rate = acss[ACSS.RATE]
|
rate = acss[ACSS.RATE]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
rate = 50
|
rate = 50
|
||||||
acss[ACSS.RATE] = max(0, min(99, rate + delta))
|
acss[ACSS.RATE] = max(0, min(100, rate + delta))
|
||||||
msg = f"PIPER: Rate set to {acss[ACSS.RATE]}"
|
msg = f"PIPER: Rate set to {acss[ACSS.RATE]}"
|
||||||
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||||
self.speak(
|
self.speak(
|
||||||
|
|||||||
@@ -42,6 +42,8 @@ userCustomizableSettings = [
|
|||||||
"onlySpeakDisplayedText",
|
"onlySpeakDisplayedText",
|
||||||
"speechServerFactory",
|
"speechServerFactory",
|
||||||
"speechServerInfo",
|
"speechServerInfo",
|
||||||
|
"hardwareSpeechDevice",
|
||||||
|
"hardwareSpeechBaudRate",
|
||||||
"voices",
|
"voices",
|
||||||
"speechVerbosityLevel",
|
"speechVerbosityLevel",
|
||||||
"readFullRowInGUITable",
|
"readFullRowInGUITable",
|
||||||
@@ -265,9 +267,11 @@ activeProfile = ['Default', 'default']
|
|||||||
profile = ['Default', 'default']
|
profile = ['Default', 'default']
|
||||||
|
|
||||||
# Speech
|
# Speech
|
||||||
speechFactoryModules = ["speechdispatcherfactory", "piperfactory"]
|
speechFactoryModules = ["speechdispatcherfactory", "piperfactory", "hardwarefactory"]
|
||||||
speechServerFactory = "speechdispatcherfactory"
|
speechServerFactory = "speechdispatcherfactory"
|
||||||
speechServerInfo = None # None means let the factory decide.
|
speechServerInfo = None # None means let the factory decide.
|
||||||
|
hardwareSpeechDevice = ""
|
||||||
|
hardwareSpeechBaudRate = 9600
|
||||||
enableSpeech = True
|
enableSpeech = True
|
||||||
silenceSpeech = False
|
silenceSpeech = False
|
||||||
enableTutorialMessages = False
|
enableTutorialMessages = False
|
||||||
|
|||||||
@@ -0,0 +1,100 @@
|
|||||||
|
import os
|
||||||
|
import select
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import unittest
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
|
||||||
|
|
||||||
|
from cthulhu import hardwarefactory
|
||||||
|
from cthulhu import settings
|
||||||
|
|
||||||
|
|
||||||
|
def read_available(fd, expectedLength, timeout=1.0):
|
||||||
|
deadline = time.monotonic() + timeout
|
||||||
|
data = b""
|
||||||
|
while len(data) < expectedLength and time.monotonic() < deadline:
|
||||||
|
readable, _, _ = select.select([fd], [], [], 0.05)
|
||||||
|
if readable:
|
||||||
|
data += os.read(fd, 1024)
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
class HardwareFactoryRegressionTests(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self._oldDevice = settings.hardwareSpeechDevice
|
||||||
|
self._oldBaudRate = settings.hardwareSpeechBaudRate
|
||||||
|
hardwarefactory.SpeechServer.shutdownActiveServers()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
hardwarefactory.SpeechServer.shutdownActiveServers()
|
||||||
|
settings.hardwareSpeechDevice = self._oldDevice
|
||||||
|
settings.hardwareSpeechBaudRate = self._oldBaudRate
|
||||||
|
|
||||||
|
def test_lists_explicit_synth_choices_without_opening_serial_device(self):
|
||||||
|
settings.hardwareSpeechDevice = ""
|
||||||
|
|
||||||
|
servers = hardwarefactory.SpeechServer.getSpeechServers()
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
["litetalk", "doubletalk", "tripletalk", "dectalk"],
|
||||||
|
[server.getInfo()[1] for server in servers],
|
||||||
|
)
|
||||||
|
self.assertEqual({}, hardwarefactory.SpeechServer._active_servers)
|
||||||
|
self.assertTrue(all(server._driver is None for server in servers))
|
||||||
|
|
||||||
|
def test_failed_initialization_is_not_cached(self):
|
||||||
|
settings.hardwareSpeechDevice = ""
|
||||||
|
|
||||||
|
self.assertIsNone(
|
||||||
|
hardwarefactory.SpeechServer.getSpeechServer(["LiteTalk", "litetalk"])
|
||||||
|
)
|
||||||
|
self.assertEqual({}, hardwarefactory.SpeechServer._active_servers)
|
||||||
|
|
||||||
|
masterFd, slaveFd = os.openpty()
|
||||||
|
try:
|
||||||
|
settings.hardwareSpeechDevice = os.ttyname(slaveFd)
|
||||||
|
server = hardwarefactory.SpeechServer.getSpeechServer(
|
||||||
|
["LiteTalk", "litetalk"]
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertIsNotNone(server)
|
||||||
|
self.assertIsNotNone(server._driver)
|
||||||
|
self.assertIs(
|
||||||
|
server,
|
||||||
|
hardwarefactory.SpeechServer._active_servers.get("litetalk"),
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
os.close(masterFd)
|
||||||
|
os.close(slaveFd)
|
||||||
|
|
||||||
|
def test_explicit_synth_choices_write_expected_serial_bytes(self):
|
||||||
|
expectedBytes = {
|
||||||
|
"litetalk": b"Alias\r",
|
||||||
|
"doubletalk": b"Alias\r",
|
||||||
|
"tripletalk": b"Alias\r",
|
||||||
|
"dectalk": b"Alias\x01",
|
||||||
|
}
|
||||||
|
|
||||||
|
for synthId, expected in expectedBytes.items():
|
||||||
|
with self.subTest(synthId=synthId):
|
||||||
|
hardwarefactory.SpeechServer.shutdownActiveServers()
|
||||||
|
masterFd, slaveFd = os.openpty()
|
||||||
|
try:
|
||||||
|
settings.hardwareSpeechDevice = os.ttyname(slaveFd)
|
||||||
|
server = hardwarefactory.SpeechServer.getSpeechServer(
|
||||||
|
["", synthId]
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertIsNotNone(server)
|
||||||
|
server.speak("Alias", interrupt=False)
|
||||||
|
self.assertEqual(expected, read_available(masterFd, len(expected)))
|
||||||
|
finally:
|
||||||
|
hardwarefactory.SpeechServer.shutdownActiveServers()
|
||||||
|
os.close(masterFd)
|
||||||
|
os.close(slaveFd)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
@@ -0,0 +1,30 @@
|
|||||||
|
import sys
|
||||||
|
import unittest
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
|
||||||
|
|
||||||
|
from cthulhu import piperfactory
|
||||||
|
|
||||||
|
|
||||||
|
class PiperFactoryRateMappingTests(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.server = piperfactory.SpeechServer.__new__(piperfactory.SpeechServer)
|
||||||
|
|
||||||
|
def test_default_rate_maps_to_native_piper_speed(self):
|
||||||
|
self.assertEqual(1.0, self.server._mapRate(50))
|
||||||
|
|
||||||
|
def test_rate_scale_uses_full_cthulhu_range(self):
|
||||||
|
self.assertEqual(2.0, self.server._mapRate(0))
|
||||||
|
self.assertEqual(0.25, self.server._mapRate(100))
|
||||||
|
|
||||||
|
def test_high_screen_reader_rate_is_substantially_faster(self):
|
||||||
|
self.assertAlmostEqual(0.415, self.server._mapRate(89), places=3)
|
||||||
|
|
||||||
|
def test_rate_values_are_clamped(self):
|
||||||
|
self.assertEqual(2.0, self.server._mapRate(-1))
|
||||||
|
self.assertEqual(0.25, self.server._mapRate(101))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
Reference in New Issue
Block a user