From 6ecc775c6dcc04646dfe9c22437d93f6805bd608 Mon Sep 17 00:00:00 2001 From: Storm Dragon Date: Sat, 30 May 2026 18:40:35 -0400 Subject: [PATCH] Initial attempt at porting over fenrir's hardware synth support. Probably buggy. --- AGENTS.md | 39 ++ src/cthulhu/cthulhu-setup.ui | 29 ++ src/cthulhu/cthulhu_gui_prefs.py | 133 +++++ src/cthulhu/guilabels.py | 13 + src/cthulhu/hardwarefactory.py | 571 ++++++++++++++++++++++ src/cthulhu/meson.build | 1 + src/cthulhu/piperfactory.py | 14 +- src/cthulhu/settings.py | 6 +- tests/test_hardwarefactory_regressions.py | 100 ++++ tests/test_piperfactory_rate_mapping.py | 30 ++ 10 files changed, 929 insertions(+), 7 deletions(-) create mode 100644 src/cthulhu/hardwarefactory.py create mode 100644 tests/test_hardwarefactory_regressions.py create mode 100644 tests/test_piperfactory_rate_mapping.py diff --git a/AGENTS.md b/AGENTS.md index bf342b6..d429b97 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -24,6 +24,45 @@ This repository is a screen reader. Prioritize accessibility, correctness, and s - If repo and installed behavior differ, prefer rebuilding with `./build-local.sh` over patching the installed package directly. - Treat direct edits under `~/.local/.../cthulhu/` as an exception path that requires explicit user approval. +## Contribution workflow (including generated patches) +- Assume contributors may use code-generation tools without understanding every changed line. Review the resulting code, not the contributor's confidence in it. +- Start from a reproducible user-visible problem. Capture the exact application, desktop/session type, steps, expected behavior, actual behavior, and relevant logs before changing code. +- Investigate the full confirmed behavior class before implementing a fix. For example, a stale-focus bug seen in one browser may also affect other applications or empty workspaces. +- Prefer the smallest root-cause fix that covers the full confirmed behavior class. 
Do not add app-specific exceptions, desktop-specific branches, compatibility fallbacks, or broad refactors unless the evidence requires them. +- Add or update automated regression tests for the general behavior and the originally reported workflow whenever practical. +- Read every generated diff before committing. Remove unrelated rewrites, speculative cleanup, dead code, debug leftovers, and generated files that are not required by the fix. +- Never commit a change solely because it compiles or because an automated tool says it works. Verify the real user-facing workflow after rebuilding the installed copy. + +## Verification checklist before commit +Run the narrowest relevant checks first, then broaden testing based on the affected behavior: + +1. Inspect the diff: + - `git diff --check` + - `git status --short` + - `git diff --stat` + - `git diff -- PATH` +2. Run syntax checks for each changed Python file: + - `python -m py_compile CHANGED_FILE.py` +3. Run focused automated regression tests: + - `python -m unittest TEST_MODULE` + - For shared input, focus, script lifecycle, settings, plugin loading, or installation changes, run `./test-local.sh` after the focused tests pass. +4. Rebuild the local installed copy: + - `./build-local.sh` +5. Confirm the runtime import resolves to the refreshed local install: + - `python - <<'PY'` + - `import importlib.util` + - `print(importlib.util.find_spec("cthulhu").origin)` + - `PY` +6. Reproduce the original user-visible workflow against the rebuilt copy. +7. Check closely related regressions. For focus, keyboard, or window-tracking changes, manually test: + - Xorg and the user's active window manager or desktop. + - Switching among browser content, terminal windows, GTK applications, dialogs, and empty workspaces. + - Returning to the original application after each switch. + - Cthulhu shortcuts, structural navigation, flat review, and any delegated key handling such as Fenrir in XTerm. 
+ - Both key press and key release behavior for modifiers, NumLock, and keypad keys when relevant. + - Clean shutdown without crashing the browser or leaving grabs behind. +8. Record what was tested, what could not be tested locally, and any remaining uncertainty in the commit message or review notes. + ## Platform support stance - **critical** Robust Xorg support is required and is a merge gate for Cthulhu. - Wayland support is desirable, but it is secondary to keeping Xorg stable and usable. diff --git a/src/cthulhu/cthulhu-setup.ui b/src/cthulhu/cthulhu-setup.ui index 4c3197a..bfc0a40 100644 --- a/src/cthulhu/cthulhu-setup.ui +++ b/src/cthulhu/cthulhu-setup.ui @@ -1787,6 +1787,35 @@ 5 + + + False + False + 1 + Serial _device: + True + right + hardwareDeviceCombo + + + + + + 0 + 9 + + + + + False + False + + + + 1 + 9 + + diff --git a/src/cthulhu/cthulhu_gui_prefs.py b/src/cthulhu/cthulhu_gui_prefs.py index e8a6914..18ab399 100644 --- a/src/cthulhu/cthulhu_gui_prefs.py +++ b/src/cthulhu/cthulhu_gui_prefs.py @@ -168,6 +168,9 @@ class CthulhuSetupGUI(cthulhu_gtkbuilder.GtkBuilderWrapper): self.speechFamiliesChoice = None self.speechFamiliesChoices = None self.speechFamiliesModel = None + self.hardwareDeviceChoice = None + self.hardwareDeviceChoices = None + self.hardwareDeviceModel = None self.speechLanguagesChoice = None self.speechLanguagesChoices = None self.speechLanguagesModel = None @@ -405,6 +408,11 @@ class CthulhuSetupGUI(cthulhu_gtkbuilder.GtkBuilderWrapper): self._initComboBox(self.get_widget("speechLanguages")) self.speechFamiliesModel = \ self._initComboBox(self.get_widget("speechFamilies")) + try: + self.hardwareDeviceModel = \ + self._initComboBox(self.get_widget("hardwareDeviceCombo")) + except AttributeError: + self.hardwareDeviceModel = None self.echoSpeechServersModel = \ self._initComboBox(self.get_widget("echoSpeechServers")) self.echoSpeechFamiliesModel = \ @@ -1703,6 +1711,8 @@ class CthulhuSetupGUI(cthulhu_gtkbuilder.GtkBuilderWrapper): # 
self.initializingSpeech = True self._setupSpeechSystems(factories) + self._setupHardwareDevice() + self._updateHardwareDeviceVisibility() self.initializingSpeech = False def _getSpeechDispatcherFactory(self): @@ -3847,6 +3857,118 @@ print(json.dumps(result)) self.prefsDict["onlySpeakDisplayedText"] = enable self.get_widget("contextOptionsGrid").set_sensitive(not enable) + + def _scanSerialDevices(self): + """Scan for available serial devices and return a list of paths.""" + import glob + devices = [] + patterns = [ + "/dev/ttyUSB*", + "/dev/ttyACM*", + "/dev/ttyS*", + "/dev/ttyAMA*", + "/dev/rfcomm*", + "/dev/serial/by-id/*", + ] + for pattern in patterns: + devices.extend(glob.glob(pattern)) + devices = sorted(set(devices)) + return devices + + def _setupHardwareDevice(self): + """Sets up the hardware device combo box with available serial ports. + + Populates the combo with scanned serial devices and restores the + previously saved selection if still available. + """ + if self.hardwareDeviceModel is None: + return + combobox = self.get_widget("hardwareDeviceCombo") + combobox.set_model(None) + self.hardwareDeviceModel.clear() + self.hardwareDeviceChoices = [] + + devices = self._scanSerialDevices() + saved_device = self.prefsDict.get("hardwareSpeechDevice", + settings.hardwareSpeechDevice) + + # Always include a "(none)" option so the user can clear the device + self.hardwareDeviceChoices.append("") + self.hardwareDeviceModel.append((0, "(none)")) + i = 1 + for device in devices: + self.hardwareDeviceChoices.append(device) + self.hardwareDeviceModel.append((i, device)) + i += 1 + + # If the saved device is not in the scanned list but is non-empty, + # append it so the user still sees their configured device. 
+ if saved_device and saved_device not in devices: + self.hardwareDeviceChoices.append(saved_device) + self.hardwareDeviceModel.append((i, saved_device)) + i += 1 + + combobox.set_model(self.hardwareDeviceModel) + self._setHardwareDeviceChoice(saved_device) + + def _setHardwareDeviceChoice(self, device_name): + """Set the active item in the hardware device combo box. + + Arguments: + - device_name: the device path to select. + """ + if not self.hardwareDeviceChoices: + self.hardwareDeviceChoice = None + return + + for i, choice in enumerate(self.hardwareDeviceChoices): + if choice == device_name: + self.get_widget("hardwareDeviceCombo").set_active(i) + self.hardwareDeviceChoice = choice + return + + self.get_widget("hardwareDeviceCombo").set_active(0) + self.hardwareDeviceChoice = self.hardwareDeviceChoices[0] + + def _updateHardwareDeviceVisibility(self): + """Show or hide the hardware device combo based on speech system. + + The hardware device selector is only visible when the hardware + speech synthesizer factory is active. + """ + if self.hardwareDeviceModel is None: + return + is_hardware = False + if self.speechSystemsChoice: + try: + is_hardware = ( + self.speechSystemsChoice.__name__ == "hardwarefactory" + ) + except Exception: + pass + + self.get_widget("hardwareDeviceLabel").set_visible(is_hardware) + self.get_widget("hardwareDeviceCombo").set_visible(is_hardware) + + def hardwareDeviceChanged(self, widget): + """Signal handler for the hardware device combo box changed signal. + + Arguments: + - widget: the component that generated the signal. 
+ """ + if self.initializingSpeech: + return + + selected_index = widget.get_active() + if selected_index >= 0 and selected_index < len(self.hardwareDeviceChoices): + self.hardwareDeviceChoice = self.hardwareDeviceChoices[selected_index] + else: + self.hardwareDeviceChoice = None + + # Update runtime settings so the factory sees the new device + if self.hardwareDeviceChoice is not None: + settings.hardwareSpeechDevice = self.hardwareDeviceChoice + def speechSystemsChanged(self, widget): """Signal handler for the "changed" signal for the speechSystems GtkComboBox widget. The user has selected a different speech @@ -3866,6 +3988,7 @@ print(json.dumps(result)) self._setupSpeechServers() self._setupEchoSpeechServers() self._setEchoVoiceItems() + self._updateHardwareDeviceVisibility() def speechServersChanged(self, widget): """Signal handler for the "changed" signal for the speechServers @@ -4927,6 +5050,16 @@ print(json.dumps(result)) self.prefsDict["speechServerFactory"] = \ self.speechSystemsChoice.__name__ + # Save hardware speech device setting when hardware factory is active + if self.speechSystemsChoice and \ + self.speechSystemsChoice.__name__ == "hardwarefactory": + if self.hardwareDeviceChoice is not None: + self.prefsDict["hardwareSpeechDevice"] = self.hardwareDeviceChoice + else: + self.prefsDict["hardwareSpeechDevice"] = "" + else: + self.prefsDict["hardwareSpeechDevice"] = settings.hardwareSpeechDevice + speechServerChoice = self._getSpeechServerChoiceForSave() if speechServerChoice: self.prefsDict["speechServerInfo"] = \ diff --git a/src/cthulhu/guilabels.py b/src/cthulhu/guilabels.py index 5946025..d7c0dfd 100644 --- a/src/cthulhu/guilabels.py +++ b/src/cthulhu/guilabels.py @@ -870,6 +870,19 @@ SPEECH_DISPATCHER = _("Speech Dispatcher") # Translators: This label refers to the Piper neural text-to-speech system. 
# (https://github.com/rhasspy/piper) PIPER_TTS = _("Piper Neural TTS") +# Translators: This label refers to external hardware serial speech synthesizers. +HARDWARE_SPEECH = _("Hardware Speech Synthesizer") +# Translators: This label refers to the LiteTalk hardware speech synthesizer. +HARDWARE_LITETALK = _("LiteTalk") +# Translators: This label refers to the DoubleTalk LT hardware speech synthesizer. +HARDWARE_DOUBLETALK = _("DoubleTalk LT") +# Translators: This label refers to the TripleTalk hardware speech synthesizer. +HARDWARE_TRIPLETALK = _("TripleTalk") +# Translators: This label refers to the Dectalk hardware synthesizer. +HARDWARE_DECTALK = _("Dectalk") +# Translators: This is the label for the combo box that lets the user choose +# the serial device used by a hardware speech synthesizer. +HARDWARE_SERIAL_DEVICE = _("Serial _device:") # Translators: This is a label for a group of options related to Cthulhu's behavior # when presenting an application's spell check dialog. diff --git a/src/cthulhu/hardwarefactory.py b/src/cthulhu/hardwarefactory.py new file mode 100644 index 0000000..d0fc025 --- /dev/null +++ b/src/cthulhu/hardwarefactory.py @@ -0,0 +1,571 @@ +#!/usr/bin/env python3 +# +# Copyright (c) 2024 Stormux +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the +# Free Software Foundation, Inc., Franklin Street, Fifth Floor, +# Boston MA 02110-1301 USA. 
+# +# Cthulhu project: https://git.stormux.org/storm/cthulhu + +"""Provides a Cthulhu speech server for hardware serial synthesizers. + +Ports Fenrir's hardware serial drivers (LiteTalk/DoubleTalk/TripleTalk, +Dectalk) to Cthulhu's SpeechServer interface. +""" + +from __future__ import annotations + +import os +import termios +import threading +import tty +from queue import Empty, Queue + +from . import debug +from . import guilabels +from . import messages +from . import settings +from . import speechserver +from .acss import ACSS + + +class _SpeakQueue(Queue): + """Queue with a clear() method.""" + + def clear(self): + try: + while True: + self.get_nowait() + except Empty: + pass + + +class _HardwareSerialDriver: + """Base class for hardware serial speech synthesizers. + + Ported from Fenrir's hardwareSerialDriver.py. + """ + + cancel_command = b"" + default_baud_rate = 9600 + + def __init__(self, device: str, baud_rate: int): + self.device = device + self.baud_rate = baud_rate + self.serial_port: int | None = None + self.text_queue = _SpeakQueue() + self.lock = threading.Lock() + self.worker_thread: threading.Thread | None = None + self._stop_worker = False + self._is_initialized = False + + def initialize(self) -> bool: + self._open_serial_port() + self._is_initialized = self.serial_port is not None + if self._is_initialized: + self._stop_worker = False + self.worker_thread = threading.Thread(target=self._worker, daemon=True) + self.worker_thread.start() + return self._is_initialized + + def shutdown(self) -> None: + if not self._is_initialized: + return + self._stop_worker = True + self.clear_buffer() + self.text_queue.put(None) + if self.worker_thread: + self.worker_thread.join(timeout=0.5) + self._close_serial_port() + self._is_initialized = False + + def speak(self, text: str, interrupt: bool = True) -> None: + if not self._is_initialized: + return + if interrupt: + self.stop() + if not isinstance(text, str) or text == "": + return + 
self.text_queue.put(text) + + def stop(self) -> None: + if not self._is_initialized: + return + self.clear_buffer() + if self.cancel_command: + self._write_bytes(self.cancel_command, "cancel") + + def clear_buffer(self) -> None: + if not self._is_initialized: + return + self.text_queue.clear() + + def set_rate(self, rate: float) -> None: + if not self._is_initialized: + return + self._write_bytes(self._rate_command(rate), "rate") + + def set_pitch(self, pitch: float) -> None: + if not self._is_initialized: + return + self._write_bytes(self._pitch_command(pitch), "pitch") + + def set_volume(self, volume: float) -> None: + if not self._is_initialized: + return + self._write_bytes(self._volume_command(volume), "volume") + + def _worker(self) -> None: + while not self._stop_worker: + text = self.text_queue.get() + if text is None: + return + try: + data = self._speak_bytes(text) + self._write_bytes(data, "speech") + except Exception as error: + msg = f"HARDWARE SPEECH: worker failed: {error}" + debug.printMessage(debug.LEVEL_ERROR, msg, True) + + def _open_serial_port(self) -> None: + if not self.device or self.device == "auto": + msg = "HARDWARE SPEECH: requires an explicit serial device" + debug.printMessage(debug.LEVEL_WARNING, msg, True) + return + + port = self._open_configured_serial_port(self.device) + if port is not None: + self._activate_serial_port(self.device, port) + + def _open_configured_serial_port(self, device: str) -> int | None: + port = None + try: + port = os.open(device, os.O_RDWR | os.O_NOCTTY) + tty.setraw(port) + attrs = termios.tcgetattr(port) + attrs[2] |= termios.CLOCAL | termios.CREAD + baud_rate = self._termios_baud_rate(self.baud_rate) + attrs[4] = baud_rate + attrs[5] = baud_rate + attrs[6][termios.VMIN] = 0 + attrs[6][termios.VTIME] = 0 + attrs[0] &= ~(termios.IXON | termios.IXOFF | termios.IXANY) + termios.tcsetattr(port, termios.TCSANOW, attrs) + return port + except (OSError, termios.error) as error: + self._close_port(port) + msg = 
f"HARDWARE SPEECH: device open failed: {device}: {error}" + debug.printMessage(debug.LEVEL_WARNING, msg, True) + return None + + def _activate_serial_port(self, device: str, port: int) -> None: + self.serial_port = port + self.device = device + msg = f"HARDWARE SPEECH: device opened: {device}, baud_rate={self.baud_rate}" + debug.printMessage(debug.LEVEL_INFO, msg, True) + + def _close_serial_port(self) -> None: + with self.lock: + if self.serial_port is None: + return + self._close_port(self.serial_port) + self.serial_port = None + + def _close_port(self, port: int | None) -> None: + if port is None: + return + try: + os.close(port) + except OSError as error: + msg = f"HARDWARE SPEECH: device close failed: {error}" + debug.printMessage(debug.LEVEL_WARNING, msg, True) + + def _write_bytes(self, data: bytes, description: str = "data") -> None: + if not data: + return + with self.lock: + if self.serial_port is None: + return + try: + total_written = 0 + while total_written < len(data): + bytes_written = os.write(self.serial_port, data[total_written:]) + if bytes_written == 0: + raise OSError("serial write returned 0 bytes") + total_written += bytes_written + preview = self._format_bytes_preview(data) + msg = f"HARDWARE SPEECH: wrote {total_written} {description} bytes: {preview}" + debug.printMessage(debug.LEVEL_INFO, msg, True) + except OSError as error: + msg = f"HARDWARE SPEECH: write failed: {error}" + debug.printMessage(debug.LEVEL_ERROR, msg, True) + + def _termios_baud_rate(self, baud_rate: int) -> int: + baud_name = f"B{baud_rate}" + if hasattr(termios, baud_name): + return getattr(termios, baud_name) + msg = f"HARDWARE SPEECH: unsupported baud rate {baud_rate}; using 9600" + debug.printMessage(debug.LEVEL_WARNING, msg, True) + return termios.B9600 + + @staticmethod + def _clean_text(text: str) -> str: + text = text.replace("\r", " ").replace("\n", " ") + return "".join(char if 0x20 <= ord(char) <= 0x7E else " " for char in text) + + @staticmethod + def 
_scale(value: float, minimum: int, maximum: int) -> int: + value = max(0.0, min(1.0, value)) + return int(round(minimum + value * (maximum - minimum))) + + @staticmethod + def _format_bytes_preview(data: bytes, limit: int = 32) -> str: + preview = data[:limit] + hex_preview = " ".join(f"{byte:02x}" for byte in preview) + ascii_preview = "".join( + chr(byte) if 0x20 <= byte <= 0x7E else "." for byte in preview + ) + suffix = "" if len(data) <= limit else " ..." + return f"hex=[{hex_preview}{suffix}] ascii=[{ascii_preview}{suffix}]" + + def _speak_bytes(self, text: str) -> bytes: + raise NotImplementedError + + def _rate_command(self, rate: float) -> bytes: + return b"" + + def _pitch_command(self, pitch: float) -> bytes: + return b"" + + def _volume_command(self, volume: float) -> bytes: + return b"" + + +class _LiteTalkDriver(_HardwareSerialDriver): + """LiteTalk-compatible serial driver.""" + + cancel_command = b"\x18" + + def _speak_bytes(self, text: str) -> bytes: + return self._clean_text(text).encode("ascii", errors="replace") + b"\r" + + def _rate_command(self, rate: float) -> bytes: + return self._setting_command(self._scale(rate, 0, 9), b"S") + + def _pitch_command(self, pitch: float) -> bytes: + return self._setting_command(self._scale(pitch, 0, 99), b"P") + + def _volume_command(self, volume: float) -> bytes: + return self._setting_command(self._scale(volume, 0, 9), b"V") + + @staticmethod + def _setting_command(value: int, command: bytes) -> bytes: + return b"\x01" + str(value).encode("ascii") + command + + +class _DectalkDriver(_HardwareSerialDriver): + """Dectalk serial driver.""" + + cancel_command = b"\x18" + + def _speak_bytes(self, text: str) -> bytes: + return self._clean_text(text).encode("ascii", errors="replace") + b"\x01" + + def _rate_command(self, rate: float) -> bytes: + return self._setting_command("ra", self._scale(rate, 75, 650)) + + def _pitch_command(self, pitch: float) -> bytes: + return self._setting_command("dv ap", 
self._scale(pitch, 50, 180)) + + def _volume_command(self, volume: float) -> bytes: + return self._setting_command("vo", self._scale(volume, 0, 100)) + + @staticmethod + def _setting_command(command: str, value: int) -> bytes: + return f"[:{command} {value}]".encode("ascii") + + +_DRIVER_MAP: dict[str, type[_HardwareSerialDriver]] = { + "litetalk": _LiteTalkDriver, + "doubletalk": _LiteTalkDriver, + "tripletalk": _LiteTalkDriver, + "dectalk": _DectalkDriver, +} + +_SYNTH_DISPLAY_NAMES = { + "litetalk": guilabels.HARDWARE_LITETALK, + "doubletalk": guilabels.HARDWARE_DOUBLETALK, + "tripletalk": guilabels.HARDWARE_TRIPLETALK, + "dectalk": guilabels.HARDWARE_DECTALK, +} + + +class SpeechServer(speechserver.SpeechServer): + """Hardware serial speech server implementation for Cthulhu.""" + + _active_servers: dict[str, SpeechServer] = {} + + @staticmethod + def getFactoryName() -> str: + """Returns a localized name describing this factory.""" + return guilabels.HARDWARE_SPEECH + + @staticmethod + def getSpeechServers() -> list[SpeechServer]: + """Gets available speech servers as a list.""" + return [ + SpeechServer(server_id, initialize=False, register=False) + for server_id in _DRIVER_MAP + ] + + @classmethod + def _getSpeechServer(cls, server_id: str) -> SpeechServer | None: + """Return an active server for the given id.""" + active_server = cls._active_servers.get(server_id) + if active_server is not None: + if active_server._matches_current_settings(): + return active_server + active_server.shutdown() + + cls(server_id) + return cls._active_servers.get(server_id) + + @staticmethod + def getSpeechServer(info: list[str] | None = None) -> SpeechServer | None: + """Gets a given SpeechServer based upon the info.""" + if info and len(info) >= 2: + server_id = info[1] + else: + server_id = "litetalk" + return SpeechServer._getSpeechServer(server_id) + + @staticmethod + def shutdownActiveServers() -> None: + """Cleans up and shuts down this factory.""" + servers = 
list(SpeechServer._active_servers.values()) + for server in servers: + server.shutdown() + + def __init__( + self, + server_id: str, + initialize: bool = True, + register: bool = True, + ): + super().__init__() + self._id = server_id + self._driver: _HardwareSerialDriver | None = None + self._info: list[str] = [] + self._device = "" + self._baud_rate = settings.hardwareSpeechBaudRate + + driver_class = _DRIVER_MAP.get(server_id) + if driver_class is None: + msg = f"HARDWARE SPEECH: unknown synth type: {server_id}" + debug.printMessage(debug.LEVEL_WARNING, msg, True) + return + + display_name = _SYNTH_DISPLAY_NAMES.get(server_id, server_id) + self._info = [display_name, server_id] + + if not initialize: + return + + self._device = settings.hardwareSpeechDevice + self._baud_rate = settings.hardwareSpeechBaudRate + self._driver = driver_class(self._device, self._baud_rate) + if self._driver.initialize(): + if register: + SpeechServer._active_servers[server_id] = self + msg = f"HARDWARE SPEECH: server initialized: {server_id} on {self._device}" + debug.printMessage(debug.LEVEL_INFO, msg, True) + else: + msg = f"HARDWARE SPEECH: server initialization failed: {server_id}" + debug.printMessage(debug.LEVEL_WARNING, msg, True) + self._driver = None + + def _matches_current_settings(self) -> bool: + return ( + self._driver is not None + and self._device == settings.hardwareSpeechDevice + and self._baud_rate == settings.hardwareSpeechBaudRate + ) + + def getInfo(self) -> list[str]: + """Returns [name, id].""" + return self._info + + def getVoiceFamilies(self) -> list[dict[str, str]]: + """Returns a list of VoiceFamily instances.""" + return [] + + def speakCharacter(self, character: str, acss: dict | None = None) -> None: + """Speaks a single character immediately.""" + if self._driver: + self._apply_acss(acss) + self._driver.speak(character, interrupt=True) + + def speakKeyEvent(self, event, acss: dict | None = None) -> None: + """Speaks a key event immediately.""" + 
event_string = event.getKeyName() + locking_state_string = event.getLockingStateString() + text = f"{event_string} {locking_state_string}".strip() + self.speak(text, acss=acss) + + def speak( + self, + text: str | None = None, + acss: dict | None = None, + interrupt: bool = True, + ) -> None: + """Speaks all queued text immediately.""" + if not self._driver or text is None: + return + self._apply_acss(acss) + self._driver.speak(text, interrupt=interrupt) + + def sayAll(self, utteranceIterator, progressCallback) -> None: + """Iterates through the given utteranceIterator, speaking each utterance.""" + for context, acss in utteranceIterator: + self.speak(context.utterance, acss=acss, interrupt=False) + + def increaseSpeechRate(self, step: int = 5) -> None: + self._change_default_speech_rate(step) + + def decreaseSpeechRate(self, step: int = 5) -> None: + self._change_default_speech_rate(step, decrease=True) + + def increaseSpeechPitch(self, step: float = 0.5) -> None: + self._change_default_speech_pitch(step) + + def decreaseSpeechPitch(self, step: float = 0.5) -> None: + self._change_default_speech_pitch(step, decrease=True) + + def increaseSpeechVolume(self, step: float = 0.5) -> None: + self._change_default_speech_volume(step) + + def decreaseSpeechVolume(self, step: float = 0.5) -> None: + self._change_default_speech_volume(step, decrease=True) + + def updateCapitalizationStyle(self) -> None: + pass + + def updatePunctuationLevel(self) -> None: + pass + + def stop(self) -> None: + if self._driver: + self._driver.stop() + + def shutdown(self) -> None: + if self._driver: + self._driver.shutdown() + self._driver = None + if self._id in SpeechServer._active_servers: + del SpeechServer._active_servers[self._id] + + def reset(self, text: str | None = None, acss: dict | None = None) -> None: + if self._driver: + self._driver.shutdown() + self._driver = None + + driver_class = _DRIVER_MAP.get(self._id) + if driver_class is None: + return + + self._device = 
settings.hardwareSpeechDevice + self._baud_rate = settings.hardwareSpeechBaudRate + self._driver = driver_class(self._device, self._baud_rate) + if not self._driver.initialize(): + self._driver = None + + def _apply_acss(self, acss: dict | None) -> None: + if not self._driver or not acss: + return + try: + rate = acss.get(ACSS.RATE) + if rate is not None: + normalized = max(0.0, min(99.0, float(rate))) / 99.0 + self._driver.set_rate(normalized) + except Exception: + pass + try: + pitch = acss.get(ACSS.AVERAGE_PITCH) + if pitch is not None: + normalized = max(0.0, min(9.0, float(pitch))) / 9.0 + self._driver.set_pitch(normalized) + except Exception: + pass + try: + volume = acss.get(ACSS.GAIN) + if volume is not None: + normalized = max(0.0, min(9.0, float(volume))) / 9.0 + self._driver.set_volume(normalized) + except Exception: + pass + + def _change_default_speech_rate(self, step: float, decrease: bool = False) -> None: + acss = settings.voices[settings.DEFAULT_VOICE] + delta = step * (-1 if decrease else 1) + try: + rate = acss[ACSS.RATE] + except KeyError: + rate = 50.0 + acss[ACSS.RATE] = max(0, min(99, rate + delta)) + msg = f"HARDWARE SPEECH: rate set to {acss[ACSS.RATE]}" + debug.printMessage(debug.LEVEL_INFO, msg, True) + if self._driver: + normalized = acss[ACSS.RATE] / 99.0 + self._driver.set_rate(normalized) + self.speak( + messages.SPEECH_SLOWER if decrease else messages.SPEECH_FASTER, + acss=acss + ) + + def _change_default_speech_pitch(self, step: float, decrease: bool = False) -> None: + acss = settings.voices[settings.DEFAULT_VOICE] + delta = step * (-1 if decrease else 1) + try: + pitch = acss[ACSS.AVERAGE_PITCH] + except KeyError: + pitch = 5.0 + acss[ACSS.AVERAGE_PITCH] = max(0, min(9, pitch + delta)) + msg = f"HARDWARE SPEECH: pitch set to {acss[ACSS.AVERAGE_PITCH]}" + debug.printMessage(debug.LEVEL_INFO, msg, True) + if self._driver: + normalized = acss[ACSS.AVERAGE_PITCH] / 9.0 + self._driver.set_pitch(normalized) + self.speak( + 
messages.SPEECH_LOWER if decrease else messages.SPEECH_HIGHER, + acss=acss + ) + + def _change_default_speech_volume(self, step: float, decrease: bool = False) -> None: + acss = settings.voices[settings.DEFAULT_VOICE] + delta = step * (-1 if decrease else 1) + try: + volume = acss[ACSS.GAIN] + except KeyError: + volume = 10.0 + acss[ACSS.GAIN] = max(0, min(9, volume + delta)) + msg = f"HARDWARE SPEECH: volume set to {acss[ACSS.GAIN]}" + debug.printMessage(debug.LEVEL_INFO, msg, True) + if self._driver: + normalized = acss[ACSS.GAIN] / 9.0 + self._driver.set_volume(normalized) + self.speak( + messages.SPEECH_SOFTER if decrease else messages.SPEECH_LOUDER, + acss=acss + ) diff --git a/src/cthulhu/meson.build b/src/cthulhu/meson.build index d025922..9eaf271 100644 --- a/src/cthulhu/meson.build +++ b/src/cthulhu/meson.build @@ -101,6 +101,7 @@ cthulhu_python_sources = files([ 'speech.py', 'spellcheck.py', 'speechdispatcherfactory.py', + 'hardwarefactory.py', 'speech_generator.py', 'speechserver.py', 'piperfactory.py', diff --git a/src/cthulhu/piperfactory.py b/src/cthulhu/piperfactory.py index 7ee19b2..031461e 100644 --- a/src/cthulhu/piperfactory.py +++ b/src/cthulhu/piperfactory.py @@ -300,19 +300,21 @@ class SpeechServer(speechserver.SpeechServer): return voiceInfo.sampleRate if voiceInfo else None def _mapRate(self, acssRate): - """Map ACSS rate (0-99) to Piper length_scale. + """Map ACSS rate (0-100) to Piper length_scale. 
ACSS rate 50 (default) = length_scale 1.0 Higher ACSS rate = lower length_scale (faster) Lower ACSS rate = higher length_scale (slower) Arguments: - - acssRate: Rate value from 0-99 + - acssRate: Rate value from 0-100 """ rate = acssRate if acssRate is not None else 50 - rate = max(0, min(99, rate)) - lengthScale = 2.0 - (rate / 99.0) * 1.5 - return max(0.5, min(2.0, lengthScale)) + rate = max(0.0, min(100.0, float(rate))) + if rate <= 50.0: + return 2.0 - (rate / 50.0) + + return 1.0 - ((rate - 50.0) / 50.0) * 0.75 def _mapPitch(self, acssPitch): """Map ACSS pitch (0-9) to pitch adjustment factor. @@ -614,7 +616,7 @@ class SpeechServer(speechserver.SpeechServer): rate = acss[ACSS.RATE] except KeyError: rate = 50 - acss[ACSS.RATE] = max(0, min(99, rate + delta)) + acss[ACSS.RATE] = max(0, min(100, rate + delta)) msg = f"PIPER: Rate set to {acss[ACSS.RATE]}" debug.printMessage(debug.LEVEL_INFO, msg, True) self.speak( diff --git a/src/cthulhu/settings.py b/src/cthulhu/settings.py index f073e85..323245b 100644 --- a/src/cthulhu/settings.py +++ b/src/cthulhu/settings.py @@ -42,6 +42,8 @@ userCustomizableSettings = [ "onlySpeakDisplayedText", "speechServerFactory", "speechServerInfo", + "hardwareSpeechDevice", + "hardwareSpeechBaudRate", "voices", "speechVerbosityLevel", "readFullRowInGUITable", @@ -265,9 +267,11 @@ activeProfile = ['Default', 'default'] profile = ['Default', 'default'] # Speech -speechFactoryModules = ["speechdispatcherfactory", "piperfactory"] +speechFactoryModules = ["speechdispatcherfactory", "piperfactory", "hardwarefactory"] speechServerFactory = "speechdispatcherfactory" speechServerInfo = None # None means let the factory decide. 
def read_available(fd, expectedLength, timeout=1.0):
    """Read from fd until expectedLength bytes arrive, EOF, or timeout.

    Arguments:
    - fd: readable file descriptor to poll and read from.
    - expectedLength: number of bytes to wait for. Reads are done in
      chunks of up to 1024 bytes, so more than this may be returned when
      extra data is already pending.
    - timeout: maximum number of seconds to wait overall.

    Returns the bytes collected so far, which may be fewer than
    expectedLength if the deadline passes or the descriptor reaches
    end-of-file first.
    """
    deadline = time.monotonic() + timeout
    data = b""
    while len(data) < expectedLength:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Poll in short slices, but never sleep past the overall deadline.
        readable, _, _ = select.select([fd], [], [], min(0.05, remaining))
        if not readable:
            continue
        chunk = os.read(fd, 1024)
        if not chunk:
            # EOF: select() keeps reporting the descriptor as readable, so
            # without this check the loop would spin until the deadline.
            break
        data += chunk
    return data
class PiperFactoryRateMappingTests(unittest.TestCase):
    # Pins the ACSS-rate -> Piper length_scale mapping done by
    # piperfactory.SpeechServer._mapRate.

    def setUp(self):
        # __new__ bypasses __init__, giving a bare instance on which only
        # _mapRate is exercised.
        serverClass = piperfactory.SpeechServer
        self.server = serverClass.__new__(serverClass)

    def test_default_rate_maps_to_native_piper_speed(self):
        # Rate 50 must leave the length_scale at 1.0.
        lengthScale = self.server._mapRate(50)
        self.assertEqual(1.0, lengthScale)

    def test_rate_scale_uses_full_cthulhu_range(self):
        # Both ends of the 0-100 range map to the extreme length_scales.
        mapRate = self.server._mapRate
        slowest = mapRate(0)
        self.assertEqual(2.0, slowest)
        fastest = mapRate(100)
        self.assertEqual(0.25, fastest)

    def test_high_screen_reader_rate_is_substantially_faster(self):
        # 1.0 - ((89 - 50) / 50) * 0.75 == 0.415
        lengthScale = self.server._mapRate(89)
        self.assertAlmostEqual(0.415, lengthScale, places=3)

    def test_rate_values_are_clamped(self):
        # Out-of-range rates behave like the nearest in-range bound.
        mapRate = self.server._mapRate
        belowRange = mapRate(-1)
        self.assertEqual(2.0, belowRange)
        aboveRange = mapRate(101)
        self.assertEqual(0.25, aboveRange)