From 5b7c08260ac7f0a61d7da34187269df594f92263 Mon Sep 17 00:00:00 2001 From: Storm Dragon Date: Sat, 23 May 2026 17:23:52 -0400 Subject: [PATCH] Another iteration based on feedback from hardware synth testing. --- .../speechDriver/hardwareSerialDriver.py | 85 +++++++++++++++++-- tests/unit/test_hardware_speech_drivers.py | 22 +++++ 2 files changed, 99 insertions(+), 8 deletions(-) diff --git a/src/fenrirscreenreader/speechDriver/hardwareSerialDriver.py b/src/fenrirscreenreader/speechDriver/hardwareSerialDriver.py index d733d270..29d0fbb5 100644 --- a/src/fenrirscreenreader/speechDriver/hardwareSerialDriver.py +++ b/src/fenrirscreenreader/speechDriver/hardwareSerialDriver.py @@ -49,6 +49,12 @@ class hardware_serial_driver(speech_driver): self.baud_rate = settings_manager.get_setting_as_int( "speech", "hardware_baud_rate" ) + self._debug( + "Hardware speech initialize: " + f"requested_device={self.device}, baud_rate={self.baud_rate}", + debug.DebugLevel.INFO, + on_any_level=True, + ) self._open_serial_port() self._is_initialized = self.serial_port is not None if not self._is_initialized: @@ -78,6 +84,12 @@ class hardware_serial_driver(speech_driver): self.cancel() if not isinstance(text, str) or text == "": return + self._debug( + "Hardware speech queued text: " + f"{len(text)} chars, queue_size={self.text_queue.qsize()}", + debug.DebugLevel.INFO, + on_any_level=True, + ) self.text_queue.put(text) def cancel(self): @@ -85,7 +97,7 @@ class hardware_serial_driver(speech_driver): return self.clear_buffer() if self.cancel_command: - self._write_bytes(self.cancel_command) + self._write_bytes(self.cancel_command, "cancel") def clear_buffer(self): if not self._is_initialized: @@ -97,28 +109,42 @@ class hardware_serial_driver(speech_driver): return if not isinstance(rate, float): return - self._write_bytes(self._rate_command(rate)) + self._write_bytes(self._rate_command(rate), "rate") def set_pitch(self, pitch): if not self._is_initialized: return if not isinstance(pitch, float): return - self._write_bytes(self._pitch_command(pitch)) + self._write_bytes(self._pitch_command(pitch), "pitch") def set_volume(self, volume): if not self._is_initialized: return if not isinstance(volume, float): return - self._write_bytes(self._volume_command(volume)) + self._write_bytes(self._volume_command(volume), "volume") def _worker(self): while not self._stop_worker: text = self.text_queue.get() if text is None: return - self._write_bytes(self._speak_bytes(text)) + try: + data = self._speak_bytes(text) + self._debug( + "Hardware speech worker prepared speech bytes: " + f"{len(data)} bytes", + debug.DebugLevel.INFO, + on_any_level=True, + ) + self._write_bytes(data, "speech") + except Exception as error: + self._debug( + f"Hardware speech worker failed: {error}", + debug.DebugLevel.ERROR, + on_any_level=True, + ) def _open_serial_port(self): device = self._resolve_device(self.device) @@ -144,7 +170,8 @@ class hardware_serial_driver(speech_driver): self.serial_port = port self.device = device self._debug( - f"Hardware speech device opened: {device}", + "Hardware speech device opened: " + f"{device}, baud_rate={self.baud_rate}", debug.DebugLevel.INFO, on_any_level=True, ) @@ -170,14 +197,28 @@ class hardware_serial_driver(speech_driver): finally: self.serial_port = None - def _write_bytes(self, data): + def _write_bytes(self, data, description="data"): if not data: return with self.lock: if self.serial_port is None: return try: - os.write(self.serial_port, data) + total_written = 0 + while total_written < len(data): + bytes_written = os.write( + self.serial_port, data[total_written:] + ) + if bytes_written == 0: + raise OSError("serial write returned 0 bytes") + total_written += bytes_written + preview = self._format_bytes_preview(data) + self._debug( + "Hardware speech wrote " + f"{total_written} {description} bytes: {preview}", + debug.DebugLevel.INFO, + on_any_level=True, + ) except OSError as error: self._debug( f"Hardware speech write failed: {error}", @@ -187,10 +228,25 @@ class hardware_serial_driver(speech_driver): def _resolve_device(self, device): if device and device != "auto": + self._debug( + f"Hardware speech using configured device: {device}", + debug.DebugLevel.INFO, + on_any_level=True, + ) return device for pattern in ("/dev/ttyACM*", "/dev/ttyUSB*", "/dev/ttyS*"): matches = sorted(glob.glob(pattern)) + self._debug( + f"Hardware speech auto scan {pattern}: {matches}", + debug.DebugLevel.INFO, + on_any_level=True, + ) if matches: + self._debug( + f"Hardware speech auto selected device: {matches[0]}", + debug.DebugLevel.INFO, + on_any_level=True, + ) return matches[0] return "" @@ -215,6 +271,19 @@ class hardware_serial_driver(speech_driver): value = max(0.0, min(1.0, value)) return int(round(minimum + value * (maximum - minimum))) + def _format_bytes_preview(self, data, limit=32): + preview = data[:limit] + hex_preview = " ".join(f"{byte:02x}" for byte in preview) + ascii_preview = "".join( + chr(byte) if 0x20 <= byte <= 0x7E else "." + for byte in preview + ) + suffix = "" if len(data) <= limit else " ..." + return ( + f"hex=[{hex_preview}{suffix}] " + f"ascii=[{ascii_preview}{suffix}]" + ) + def _debug(self, message, level, on_any_level=False): try: self.env["runtime"]["DebugManager"].write_debug_out( diff --git a/tests/unit/test_hardware_speech_drivers.py b/tests/unit/test_hardware_speech_drivers.py index e4a5a8b5..3650fdc7 100644 --- a/tests/unit/test_hardware_speech_drivers.py +++ b/tests/unit/test_hardware_speech_drivers.py @@ -146,6 +146,28 @@ def test_auto_device_detection_fails_when_no_serial_device(monkeypatch): ) +def test_hardware_driver_retries_partial_serial_writes(monkeypatch): + speech_driver = litetalkDriver.driver() + speech_driver.env = build_environment("/dev/ttyUSB0") + speech_driver.serial_port = 12 + written_chunks = [] + + def fake_write(port, data): + assert port == 12 + chunk = data[:2] + written_chunks.append(chunk) + return len(chunk) + + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.os.write", + fake_write, + ) + + speech_driver._write_bytes(b"abcdef", "speech") + + assert written_chunks == [b"ab", b"cd", b"ef"] + + @pytest.mark.parametrize("driver_class", [doubletalkDriver, tripletalkDriver]) def test_litetalk_compatible_alias_drivers(driver_class, serial_pair): speech_driver, master_fd = initialized_driver(driver_class, serial_pair)