diff --git a/src/fenrirscreenreader/speechDriver/hardwareSerialDriver.py b/src/fenrirscreenreader/speechDriver/hardwareSerialDriver.py index e006b865..a32cdd8b 100644 --- a/src/fenrirscreenreader/speechDriver/hardwareSerialDriver.py +++ b/src/fenrirscreenreader/speechDriver/hardwareSerialDriver.py @@ -6,6 +6,7 @@ import glob import os +import select import termios import threading import tty @@ -28,6 +29,8 @@ class SpeakQueue(Queue): class hardware_serial_driver(speech_driver): cancel_command = b"" default_baud_rate = 9600 + hardware_probe_command = b"" + hardware_probe_timeout = 0.2 def __init__(self): speech_driver.__init__(self) @@ -43,14 +46,22 @@ class hardware_serial_driver(speech_driver): self.env = environment self._is_initialized = False settings_manager = self.env["runtime"]["SettingsManager"] - self.device = settings_manager.get_setting( - "speech", "hardware_device" + self.device = self._clean_device_setting( + settings_manager.get_setting("speech", "hardware_device") ) self.baud_rate = settings_manager.get_setting_as_int( "speech", "hardware_baud_rate" ) + self._debug( + "Hardware speech initialize: " + f"requested_device={self.device}, baud_rate={self.baud_rate}", + debug.DebugLevel.INFO, + on_any_level=True, + ) self._open_serial_port() self._is_initialized = self.serial_port is not None + if not self._is_initialized: + raise RuntimeError("hardware speech device is not available") if self._is_initialized: self._stop_worker = False self.worker_thread = threading.Thread( @@ -58,6 +69,12 @@ class hardware_serial_driver(speech_driver): ) self.worker_thread.start() + def _clean_device_setting(self, device): + if not isinstance(device, str): + return "auto" + device = device.split("#", 1)[0].split(";", 1)[0].strip() + return device or "auto" + def shutdown(self): if not self._is_initialized: return @@ -76,6 +93,12 @@ class hardware_serial_driver(speech_driver): self.cancel() if not isinstance(text, str) or text == "": return + self._debug( + "Hardware speech queued text: " + f"{len(text)} chars, queue_size={self.text_queue.qsize()}", + debug.DebugLevel.INFO, + on_any_level=True, + ) self.text_queue.put(text) def cancel(self): @@ -83,7 +106,7 @@ class hardware_serial_driver(speech_driver): return self.clear_buffer() if self.cancel_command: - self._write_bytes(self.cancel_command) + self._write_bytes(self.cancel_command, "cancel") def clear_buffer(self): if not self._is_initialized: @@ -95,37 +118,88 @@ class hardware_serial_driver(speech_driver): return if not isinstance(rate, float): return - self._write_bytes(self._rate_command(rate)) + self._write_bytes(self._rate_command(rate), "rate") def set_pitch(self, pitch): if not self._is_initialized: return if not isinstance(pitch, float): return - self._write_bytes(self._pitch_command(pitch)) + self._write_bytes(self._pitch_command(pitch), "pitch") def set_volume(self, volume): if not self._is_initialized: return if not isinstance(volume, float): return - self._write_bytes(self._volume_command(volume)) + self._write_bytes(self._volume_command(volume), "volume") def _worker(self): while not self._stop_worker: text = self.text_queue.get() if text is None: return - self._write_bytes(self._speak_bytes(text)) + try: + data = self._speak_bytes(text) + self._debug( + "Hardware speech worker prepared speech bytes: " + f"{len(data)} bytes", + debug.DebugLevel.INFO, + on_any_level=True, + ) + self._write_bytes(data, "speech") + except Exception as error: + self._debug( + f"Hardware speech worker failed: {error}", + debug.DebugLevel.ERROR, + on_any_level=True, + ) def _open_serial_port(self): - device = self._resolve_device(self.device) - if not device: + auto_detect = self.device == "auto" + devices = self._resolve_devices(self.device) + if not devices: self._debug( "Hardware speech device not found", debug.DebugLevel.ERROR, + on_any_level=True, ) return + + fallback_device = None + fallback_port = None + for device in devices: + port = self._open_configured_serial_port(device) + if port is None: + continue + if not auto_detect: + self._activate_serial_port( + device, port, probe_matched=False + ) + return + if self._probe_serial_port(port): + if fallback_port is not None: + self._close_port(fallback_port) + self._activate_serial_port(device, port, probe_matched=True) + return + if fallback_port is None: + fallback_device = device + fallback_port = port + else: + self._close_port(port) + + if fallback_port is not None: + self._debug( + "Hardware speech probe did not identify a synth; " + f"falling back to {fallback_device}", + debug.DebugLevel.WARNING, + on_any_level=True, + ) + self._activate_serial_port( + fallback_device, fallback_port, probe_matched=False + ) + + def _open_configured_serial_port(self, device): try: port = os.open(device, os.O_RDWR | os.O_NOCTTY) tty.setraw(port) @@ -138,51 +212,147 @@ class hardware_serial_driver(speech_driver): attrs[6][termios.VTIME] = 0 attrs[0] &= ~(termios.IXON | termios.IXOFF | termios.IXANY) termios.tcsetattr(port, termios.TCSANOW, attrs) - self.serial_port = port - self.device = device - except OSError as error: + return port + except (OSError, termios.error) as error: self._debug( f"Hardware speech device open failed: {device}: {error}", debug.DebugLevel.ERROR, + on_any_level=True, ) - self.serial_port = None + return None + + def _activate_serial_port(self, device, port, probe_matched=False): + self.serial_port = port + self.device = device + probe_status = "matched" if probe_matched else "not matched" + self._debug( + "Hardware speech device opened: " + f"{device}, baud_rate={self.baud_rate}, probe={probe_status}", + debug.DebugLevel.INFO, + on_any_level=True, + ) + + def _probe_serial_port(self, port): + if not self.hardware_probe_command: + return True + try: + self._drain_serial_input(port) + os.write(port, self.hardware_probe_command) + readable, _, _ = select.select( + [port], [], [], self.hardware_probe_timeout + ) + if not readable: + self._debug( + "Hardware speech probe got no response", + debug.DebugLevel.INFO, + on_any_level=True, + ) + return False + response = os.read(port, 256) + self._debug( + "Hardware speech probe response: " + f"{self._format_bytes_preview(response)}", + debug.DebugLevel.INFO, + on_any_level=True, + ) + return self._is_hardware_probe_response(response) + except OSError as error: + self._debug( + f"Hardware speech probe failed: {error}", + debug.DebugLevel.WARNING, + on_any_level=True, + ) + return False + + def _drain_serial_input(self, port): + while True: + readable, _, _ = select.select([port], [], [], 0) + if not readable: + return + if not os.read(port, 256): + return + + def _is_hardware_probe_response(self, response): + return bool(response) def _close_serial_port(self): with self.lock: if self.serial_port is None: return - try: - os.close(self.serial_port) - except OSError as error: - self._debug( - f"Hardware speech device close failed: {error}", - debug.DebugLevel.WARNING, - ) - finally: - self.serial_port = None + self._close_port(self.serial_port) + self.serial_port = None - def _write_bytes(self, data): + def _close_port(self, port): + if port is None: + return + try: + os.close(port) + except OSError as error: + self._debug( + f"Hardware speech device close failed: {error}", + debug.DebugLevel.WARNING, + ) + + def _write_bytes(self, data, description="data"): if not data: return with self.lock: if self.serial_port is None: return try: - os.write(self.serial_port, data) + total_written = 0 + while total_written < len(data): + bytes_written = os.write( + self.serial_port, data[total_written:] + ) + if bytes_written == 0: + raise OSError("serial write returned 0 bytes") + total_written += bytes_written + preview = self._format_bytes_preview(data) + self._debug( + "Hardware speech wrote " + f"{total_written} {description} bytes: {preview}", + debug.DebugLevel.INFO, + on_any_level=True, + ) except OSError as error: self._debug( f"Hardware speech write failed: {error}", debug.DebugLevel.ERROR, + on_any_level=True, ) - def _resolve_device(self, device): + def _resolve_devices(self, device): if device and device != "auto": - return device - for pattern in ("/dev/ttyACM*", "/dev/ttyUSB*"): + self._debug( + f"Hardware speech using configured device: {device}", + debug.DebugLevel.INFO, + on_any_level=True, + ) + return [device] + devices = [] + for pattern in ("/dev/ttyACM*", "/dev/ttyUSB*", "/dev/ttyS*"): matches = sorted(glob.glob(pattern)) + self._debug( + f"Hardware speech auto scan {pattern}: {matches}", + debug.DebugLevel.INFO, + on_any_level=True, + ) if matches: - return matches[0] - return "" + if len(matches) > 1: + self._debug( + "Hardware speech auto found multiple devices for " + f"{pattern}: {matches}; probing in order", + debug.DebugLevel.WARNING, + on_any_level=True, + ) + self._debug( + f"Hardware speech auto candidate devices: {matches}", + debug.DebugLevel.INFO, + on_any_level=True, + ) + devices.extend(matches) + return devices def _termios_baud_rate(self, baud_rate): baud_name = f"B{baud_rate}" @@ -205,10 +375,23 @@ class hardware_serial_driver(speech_driver): value = max(0.0, min(1.0, value)) return int(round(minimum + value * (maximum - minimum))) - def _debug(self, message, level): + def _format_bytes_preview(self, data, limit=32): + preview = data[:limit] + hex_preview = " ".join(f"{byte:02x}" for byte in preview) + ascii_preview = "".join( + chr(byte) if 0x20 <= byte <= 0x7E else "." + for byte in preview + ) + suffix = "" if len(data) <= limit else " ..." + return ( + f"hex=[{hex_preview}{suffix}] " + f"ascii=[{ascii_preview}{suffix}]" + ) + + def _debug(self, message, level, on_any_level=False): try: self.env["runtime"]["DebugManager"].write_debug_out( - message, level + message, level, on_any_level=on_any_level ) except Exception: pass diff --git a/src/fenrirscreenreader/speechDriver/litetalkDriver.py b/src/fenrirscreenreader/speechDriver/litetalkDriver.py index 6dadeda2..75397781 100644 --- a/src/fenrirscreenreader/speechDriver/litetalkDriver.py +++ b/src/fenrirscreenreader/speechDriver/litetalkDriver.py @@ -11,6 +11,7 @@ from fenrirscreenreader.speechDriver.hardwareSerialDriver import ( class driver(hardware_serial_driver): cancel_command = b"\x18" + hardware_probe_command = b"\x01I" def _speak_bytes(self, text): return self._clean_text(text).encode("ascii", errors="replace") + b"\r" @@ -26,3 +27,10 @@ class driver(hardware_serial_driver): def _setting_command(self, value, command): return b"\x01" + str(value).encode("ascii") + command + + def _is_hardware_probe_response(self, response): + response_text = response.decode("ascii", errors="ignore").lower() + return any( + token in response_text + for token in ("litetalk", "lite talk", "rpitalk", "doubletalk") + ) diff --git a/tests/unit/test_hardware_speech_drivers.py b/tests/unit/test_hardware_speech_drivers.py index 8f5fce86..100b4947 100644 --- a/tests/unit/test_hardware_speech_drivers.py +++ b/tests/unit/test_hardware_speech_drivers.py @@ -1,6 +1,8 @@ import os import select +import termios import time +from unittest.mock import ANY from unittest.mock import Mock import pytest @@ -103,6 +105,280 @@ def test_litetalk_driver_writes_settings_and_cancel(serial_pair): speech_driver.shutdown() +def test_auto_device_detection_includes_classic_serial( + monkeypatch, serial_pair +): + master_fd, slave_name = serial_pair + + def fake_glob(pattern): + if pattern == "/dev/ttyS*": + return [slave_name] + return [] + + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.glob.glob", + fake_glob, + ) + speech_driver = litetalkDriver.driver() + speech_driver.initialize(build_environment("auto")) + try: + assert speech_driver.device == slave_name + speech_driver.speak("Serial") + assert read_available(master_fd, 9) == b"\x01ISerial\r" + finally: + speech_driver.shutdown() + + +def test_configured_device_strips_inline_comment(serial_pair): + master_fd, slave_name = serial_pair + device_setting = f"{slave_name} # built-in serial port" + speech_driver = litetalkDriver.driver() + speech_driver.initialize(build_environment(device_setting)) + try: + assert speech_driver.device == slave_name + speech_driver.speak("Specific") + assert read_available(master_fd, 9) == b"Specific\r" + finally: + speech_driver.shutdown() + + +def test_auto_device_detection_prefers_probe_response(monkeypatch): + opened_ports = [] + closed_ports = [] + writes = {} + + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.glob.glob", + lambda pattern: ["/dev/ttyUSB0", "/dev/ttyUSB1"] + if pattern == "/dev/ttyUSB*" + else [], + ) + + def fake_open(device, flags): + port = 100 + len(opened_ports) + opened_ports.append((device, port)) + return port + + def fake_write(port, data): + writes.setdefault(port, b"") + writes[port] += data + return len(data) + + def fake_select(readable, writable, exceptional, timeout): + port = readable[0] + if port == 101 and writes.get(port) == b"\x01I": + return readable, writable, exceptional + return [], writable, exceptional + + def fake_read(port, size): + if port == 101: + return b"RPItalk 1.3\r" + return b"" + + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.os.open", + fake_open, + ) + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.os.close", + lambda port: closed_ports.append(port), + ) + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.os.write", + fake_write, + ) + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.os.read", + fake_read, + ) + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.select.select", + fake_select, + ) + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.termios.tcgetattr", + lambda port: [0, 0, 0, 0, 0, 0, [0] * 32], + ) + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.termios.tcsetattr", + lambda port, when, attrs: None, + ) + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.tty.setraw", + lambda port: None, + ) + + speech_driver = litetalkDriver.driver() + speech_driver.initialize(build_environment("auto")) + try: + assert opened_ports == [("/dev/ttyUSB0", 100), ("/dev/ttyUSB1", 101)] + assert writes == {100: b"\x01I", 101: b"\x01I"} + assert closed_ports == [100] + assert speech_driver.device == "/dev/ttyUSB1" + finally: + speech_driver.shutdown() + + +def test_auto_device_detection_falls_back_without_probe_response( + monkeypatch +): + opened_ports = [] + closed_ports = [] + + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.glob.glob", + lambda pattern: ["/dev/ttyUSB0", "/dev/ttyUSB1"] + if pattern == "/dev/ttyUSB*" + else [], + ) + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.os.open", + lambda device, flags: opened_ports.append(device) or len(opened_ports), + ) + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.os.close", + lambda port: closed_ports.append(port), + ) + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.os.write", + lambda port, data: len(data), + ) + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.select.select", + lambda readable, writable, exceptional, timeout: ( + [], + writable, + exceptional, + ), + ) + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.termios.tcgetattr", + lambda port: [0, 0, 0, 0, 0, 0, [0] * 32], + ) + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.termios.tcsetattr", + lambda port, when, attrs: None, + ) + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.tty.setraw", + lambda port: None, + ) + + speech_driver = litetalkDriver.driver() + speech_driver.initialize(build_environment("auto")) + try: + assert opened_ports == ["/dev/ttyUSB0", "/dev/ttyUSB1"] + assert closed_ports == [2] + assert speech_driver.device == "/dev/ttyUSB0" + finally: + speech_driver.shutdown() + + +def test_auto_device_detection_skips_termios_failures(monkeypatch): + opened_ports = [] + closed_ports = [] + + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.glob.glob", + lambda pattern: ["/dev/ttyUSB0"] + if pattern == "/dev/ttyUSB*" + else ["/dev/ttyS0"] + if pattern == "/dev/ttyS*" + else [], + ) + + def fake_open(device, flags): + port = 100 + len(opened_ports) + opened_ports.append((device, port)) + return port + + def fake_tcgetattr(port): + if port == 101: + raise termios.error(5, "Input/output error") + return [0, 0, 0, 0, 0, 0, [0] * 32] + + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.os.open", + fake_open, + ) + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.os.close", + lambda port: closed_ports.append(port), + ) + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.os.write", + lambda port, data: len(data), + ) + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.select.select", + lambda readable, writable, exceptional, timeout: ( + [], + writable, + exceptional, + ), + ) + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.termios.tcgetattr", + fake_tcgetattr, + ) + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.termios.tcsetattr", + lambda port, when, attrs: None, + ) + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.tty.setraw", + lambda port: None, + ) + + speech_driver = litetalkDriver.driver() + speech_driver.initialize(build_environment("auto")) + try: + assert opened_ports == [("/dev/ttyUSB0", 100), ("/dev/ttyS0", 101)] + assert speech_driver.device == "/dev/ttyUSB0" + finally: + speech_driver.shutdown() + + +def test_auto_device_detection_fails_when_no_serial_device(monkeypatch): + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.glob.glob", + lambda pattern: [], + ) + speech_driver = litetalkDriver.driver() + + with pytest.raises(RuntimeError, match="hardware speech device"): + speech_driver.initialize(build_environment("auto")) + + debug_manager = speech_driver.env["runtime"]["DebugManager"] + debug_manager.write_debug_out.assert_called_with( + "Hardware speech device not found", + ANY, + on_any_level=True, + ) + + +def test_hardware_driver_retries_partial_serial_writes(monkeypatch): + speech_driver = litetalkDriver.driver() + speech_driver.env = build_environment("/dev/ttyUSB0") + speech_driver.serial_port = 12 + written_chunks = [] + + def fake_write(port, data): + assert port == 12 + chunk = data[:2] + written_chunks.append(chunk) + return len(chunk) + + monkeypatch.setattr( + "fenrirscreenreader.speechDriver.hardwareSerialDriver.os.write", + fake_write, + ) + + speech_driver._write_bytes(b"abcdef", "speech") + + assert written_chunks == [b"ab", b"cd", b"ef"] + + @pytest.mark.parametrize("driver_class", [doubletalkDriver, tripletalkDriver]) def test_litetalk_compatible_alias_drivers(driver_class, serial_pair): speech_driver, master_fd = initialized_driver(driver_class, serial_pair)