More hw synth refinement.

This commit is contained in:
Storm Dragon
2026-05-23 17:39:16 -04:00
parent 5b7c08260a
commit 089850ac18
3 changed files with 264 additions and 26 deletions
@@ -6,6 +6,7 @@
import glob import glob
import os import os
import select
import termios import termios
import threading import threading
import tty import tty
@@ -28,6 +29,8 @@ class SpeakQueue(Queue):
class hardware_serial_driver(speech_driver): class hardware_serial_driver(speech_driver):
cancel_command = b"" cancel_command = b""
default_baud_rate = 9600 default_baud_rate = 9600
hardware_probe_command = b""
hardware_probe_timeout = 0.2
def __init__(self): def __init__(self):
speech_driver.__init__(self) speech_driver.__init__(self)
@@ -147,14 +150,50 @@ class hardware_serial_driver(speech_driver):
) )
def _open_serial_port(self): def _open_serial_port(self):
device = self._resolve_device(self.device) auto_detect = self.device == "auto"
if not device: devices = self._resolve_devices(self.device)
if not devices:
self._debug( self._debug(
"Hardware speech device not found", "Hardware speech device not found",
debug.DebugLevel.ERROR, debug.DebugLevel.ERROR,
on_any_level=True, on_any_level=True,
) )
return return
fallback_device = None
fallback_port = None
for device in devices:
port = self._open_configured_serial_port(device)
if port is None:
continue
if not auto_detect:
self._activate_serial_port(
device, port, probe_matched=False
)
return
if self._probe_serial_port(port):
if fallback_port is not None:
self._close_port(fallback_port)
self._activate_serial_port(device, port, probe_matched=True)
return
if fallback_port is None:
fallback_device = device
fallback_port = port
else:
self._close_port(port)
if fallback_port is not None:
self._debug(
"Hardware speech probe did not identify a synth; "
f"falling back to {fallback_device}",
debug.DebugLevel.WARNING,
on_any_level=True,
)
self._activate_serial_port(
fallback_device, fallback_port, probe_matched=False
)
def _open_configured_serial_port(self, device):
try: try:
port = os.open(device, os.O_RDWR | os.O_NOCTTY) port = os.open(device, os.O_RDWR | os.O_NOCTTY)
tty.setraw(port) tty.setraw(port)
@@ -167,35 +206,86 @@ class hardware_serial_driver(speech_driver):
attrs[6][termios.VTIME] = 0 attrs[6][termios.VTIME] = 0
attrs[0] &= ~(termios.IXON | termios.IXOFF | termios.IXANY) attrs[0] &= ~(termios.IXON | termios.IXOFF | termios.IXANY)
termios.tcsetattr(port, termios.TCSANOW, attrs) termios.tcsetattr(port, termios.TCSANOW, attrs)
self.serial_port = port return port
self.device = device
self._debug(
"Hardware speech device opened: "
f"{device}, baud_rate={self.baud_rate}",
debug.DebugLevel.INFO,
on_any_level=True,
)
except OSError as error: except OSError as error:
self._debug( self._debug(
f"Hardware speech device open failed: {device}: {error}", f"Hardware speech device open failed: {device}: {error}",
debug.DebugLevel.ERROR, debug.DebugLevel.ERROR,
on_any_level=True, on_any_level=True,
) )
self.serial_port = None return None
def _activate_serial_port(self, device, port, probe_matched=False):
self.serial_port = port
self.device = device
probe_status = "matched" if probe_matched else "not matched"
self._debug(
"Hardware speech device opened: "
f"{device}, baud_rate={self.baud_rate}, probe={probe_status}",
debug.DebugLevel.INFO,
on_any_level=True,
)
def _probe_serial_port(self, port):
if not self.hardware_probe_command:
return True
try:
self._drain_serial_input(port)
os.write(port, self.hardware_probe_command)
readable, _, _ = select.select(
[port], [], [], self.hardware_probe_timeout
)
if not readable:
self._debug(
"Hardware speech probe got no response",
debug.DebugLevel.INFO,
on_any_level=True,
)
return False
response = os.read(port, 256)
self._debug(
"Hardware speech probe response: "
f"{self._format_bytes_preview(response)}",
debug.DebugLevel.INFO,
on_any_level=True,
)
return self._is_hardware_probe_response(response)
except OSError as error:
self._debug(
f"Hardware speech probe failed: {error}",
debug.DebugLevel.WARNING,
on_any_level=True,
)
return False
def _drain_serial_input(self, port):
while True:
readable, _, _ = select.select([port], [], [], 0)
if not readable:
return
if not os.read(port, 256):
return
def _is_hardware_probe_response(self, response):
return bool(response)
def _close_serial_port(self): def _close_serial_port(self):
with self.lock: with self.lock:
if self.serial_port is None: if self.serial_port is None:
return return
self._close_port(self.serial_port)
self.serial_port = None
def _close_port(self, port):
if port is None:
return
try: try:
os.close(self.serial_port) os.close(port)
except OSError as error: except OSError as error:
self._debug( self._debug(
f"Hardware speech device close failed: {error}", f"Hardware speech device close failed: {error}",
debug.DebugLevel.WARNING, debug.DebugLevel.WARNING,
) )
finally:
self.serial_port = None
def _write_bytes(self, data, description="data"): def _write_bytes(self, data, description="data"):
if not data: if not data:
@@ -226,14 +316,15 @@ class hardware_serial_driver(speech_driver):
on_any_level=True, on_any_level=True,
) )
def _resolve_device(self, device): def _resolve_devices(self, device):
if device and device != "auto": if device and device != "auto":
self._debug( self._debug(
f"Hardware speech using configured device: {device}", f"Hardware speech using configured device: {device}",
debug.DebugLevel.INFO, debug.DebugLevel.INFO,
on_any_level=True, on_any_level=True,
) )
return device return [device]
devices = []
for pattern in ("/dev/ttyACM*", "/dev/ttyUSB*", "/dev/ttyS*"): for pattern in ("/dev/ttyACM*", "/dev/ttyUSB*", "/dev/ttyS*"):
matches = sorted(glob.glob(pattern)) matches = sorted(glob.glob(pattern))
self._debug( self._debug(
@@ -242,13 +333,20 @@ class hardware_serial_driver(speech_driver):
on_any_level=True, on_any_level=True,
) )
if matches: if matches:
if len(matches) > 1:
self._debug( self._debug(
f"Hardware speech auto selected device: {matches[0]}", "Hardware speech auto found multiple devices for "
f"{pattern}: {matches}; probing in order",
debug.DebugLevel.WARNING,
on_any_level=True,
)
self._debug(
f"Hardware speech auto candidate devices: {matches}",
debug.DebugLevel.INFO, debug.DebugLevel.INFO,
on_any_level=True, on_any_level=True,
) )
return matches[0] devices.extend(matches)
return "" return devices
def _termios_baud_rate(self, baud_rate): def _termios_baud_rate(self, baud_rate):
baud_name = f"B{baud_rate}" baud_name = f"B{baud_rate}"
@@ -11,6 +11,7 @@ from fenrirscreenreader.speechDriver.hardwareSerialDriver import (
class driver(hardware_serial_driver): class driver(hardware_serial_driver):
cancel_command = b"\x18" cancel_command = b"\x18"
hardware_probe_command = b"\x01I"
def _speak_bytes(self, text): def _speak_bytes(self, text):
return self._clean_text(text).encode("ascii", errors="replace") + b"\r" return self._clean_text(text).encode("ascii", errors="replace") + b"\r"
@@ -26,3 +27,10 @@ class driver(hardware_serial_driver):
def _setting_command(self, value, command): def _setting_command(self, value, command):
return b"\x01" + str(value).encode("ascii") + command return b"\x01" + str(value).encode("ascii") + command
def _is_hardware_probe_response(self, response):
response_text = response.decode("ascii", errors="ignore").lower()
return any(
token in response_text
for token in ("litetalk", "lite talk", "rpitalk", "doubletalk")
)
+133 -1
View File
@@ -123,7 +123,139 @@ def test_auto_device_detection_includes_classic_serial(
try: try:
assert speech_driver.device == slave_name assert speech_driver.device == slave_name
speech_driver.speak("Serial") speech_driver.speak("Serial")
assert read_available(master_fd, 7) == b"Serial\r" assert read_available(master_fd, 9) == b"\x01ISerial\r"
finally:
speech_driver.shutdown()
def test_auto_device_detection_prefers_probe_response(monkeypatch):
opened_ports = []
closed_ports = []
writes = {}
monkeypatch.setattr(
"fenrirscreenreader.speechDriver.hardwareSerialDriver.glob.glob",
lambda pattern: ["/dev/ttyUSB0", "/dev/ttyUSB1"]
if pattern == "/dev/ttyUSB*"
else [],
)
def fake_open(device, flags):
port = 100 + len(opened_ports)
opened_ports.append((device, port))
return port
def fake_write(port, data):
writes.setdefault(port, b"")
writes[port] += data
return len(data)
def fake_select(readable, writable, exceptional, timeout):
port = readable[0]
if port == 101 and writes.get(port) == b"\x01I":
return readable, writable, exceptional
return [], writable, exceptional
def fake_read(port, size):
if port == 101:
return b"RPItalk 1.3\r"
return b""
monkeypatch.setattr(
"fenrirscreenreader.speechDriver.hardwareSerialDriver.os.open",
fake_open,
)
monkeypatch.setattr(
"fenrirscreenreader.speechDriver.hardwareSerialDriver.os.close",
lambda port: closed_ports.append(port),
)
monkeypatch.setattr(
"fenrirscreenreader.speechDriver.hardwareSerialDriver.os.write",
fake_write,
)
monkeypatch.setattr(
"fenrirscreenreader.speechDriver.hardwareSerialDriver.os.read",
fake_read,
)
monkeypatch.setattr(
"fenrirscreenreader.speechDriver.hardwareSerialDriver.select.select",
fake_select,
)
monkeypatch.setattr(
"fenrirscreenreader.speechDriver.hardwareSerialDriver.termios.tcgetattr",
lambda port: [0, 0, 0, 0, 0, 0, [0] * 32],
)
monkeypatch.setattr(
"fenrirscreenreader.speechDriver.hardwareSerialDriver.termios.tcsetattr",
lambda port, when, attrs: None,
)
monkeypatch.setattr(
"fenrirscreenreader.speechDriver.hardwareSerialDriver.tty.setraw",
lambda port: None,
)
speech_driver = litetalkDriver.driver()
speech_driver.initialize(build_environment("auto"))
try:
assert opened_ports == [("/dev/ttyUSB0", 100), ("/dev/ttyUSB1", 101)]
assert writes == {100: b"\x01I", 101: b"\x01I"}
assert closed_ports == [100]
assert speech_driver.device == "/dev/ttyUSB1"
finally:
speech_driver.shutdown()
def test_auto_device_detection_falls_back_without_probe_response(
monkeypatch
):
opened_ports = []
closed_ports = []
monkeypatch.setattr(
"fenrirscreenreader.speechDriver.hardwareSerialDriver.glob.glob",
lambda pattern: ["/dev/ttyUSB0", "/dev/ttyUSB1"]
if pattern == "/dev/ttyUSB*"
else [],
)
monkeypatch.setattr(
"fenrirscreenreader.speechDriver.hardwareSerialDriver.os.open",
lambda device, flags: opened_ports.append(device) or len(opened_ports),
)
monkeypatch.setattr(
"fenrirscreenreader.speechDriver.hardwareSerialDriver.os.close",
lambda port: closed_ports.append(port),
)
monkeypatch.setattr(
"fenrirscreenreader.speechDriver.hardwareSerialDriver.os.write",
lambda port, data: len(data),
)
monkeypatch.setattr(
"fenrirscreenreader.speechDriver.hardwareSerialDriver.select.select",
lambda readable, writable, exceptional, timeout: (
[],
writable,
exceptional,
),
)
monkeypatch.setattr(
"fenrirscreenreader.speechDriver.hardwareSerialDriver.termios.tcgetattr",
lambda port: [0, 0, 0, 0, 0, 0, [0] * 32],
)
monkeypatch.setattr(
"fenrirscreenreader.speechDriver.hardwareSerialDriver.termios.tcsetattr",
lambda port, when, attrs: None,
)
monkeypatch.setattr(
"fenrirscreenreader.speechDriver.hardwareSerialDriver.tty.setraw",
lambda port: None,
)
speech_driver = litetalkDriver.driver()
speech_driver.initialize(build_environment("auto"))
try:
assert opened_ports == ["/dev/ttyUSB0", "/dev/ttyUSB1"]
assert closed_ports == [2]
assert speech_driver.device == "/dev/ttyUSB0"
finally: finally:
speech_driver.shutdown() speech_driver.shutdown()