Another iteration based on feedback from hardware synth testing.

This commit is contained in:
Storm Dragon
2026-05-23 17:23:52 -04:00
parent d4b2fec1db
commit 5b7c08260a
2 changed files with 99 additions and 8 deletions
@@ -49,6 +49,12 @@ class hardware_serial_driver(speech_driver):
self.baud_rate = settings_manager.get_setting_as_int( self.baud_rate = settings_manager.get_setting_as_int(
"speech", "hardware_baud_rate" "speech", "hardware_baud_rate"
) )
self._debug(
"Hardware speech initialize: "
f"requested_device={self.device}, baud_rate={self.baud_rate}",
debug.DebugLevel.INFO,
on_any_level=True,
)
self._open_serial_port() self._open_serial_port()
self._is_initialized = self.serial_port is not None self._is_initialized = self.serial_port is not None
if not self._is_initialized: if not self._is_initialized:
@@ -78,6 +84,12 @@ class hardware_serial_driver(speech_driver):
self.cancel() self.cancel()
if not isinstance(text, str) or text == "": if not isinstance(text, str) or text == "":
return return
self._debug(
"Hardware speech queued text: "
f"{len(text)} chars, queue_size={self.text_queue.qsize()}",
debug.DebugLevel.INFO,
on_any_level=True,
)
self.text_queue.put(text) self.text_queue.put(text)
def cancel(self): def cancel(self):
@@ -85,7 +97,7 @@ class hardware_serial_driver(speech_driver):
return return
self.clear_buffer() self.clear_buffer()
if self.cancel_command: if self.cancel_command:
self._write_bytes(self.cancel_command) self._write_bytes(self.cancel_command, "cancel")
def clear_buffer(self): def clear_buffer(self):
if not self._is_initialized: if not self._is_initialized:
@@ -97,28 +109,42 @@ class hardware_serial_driver(speech_driver):
return return
if not isinstance(rate, float): if not isinstance(rate, float):
return return
self._write_bytes(self._rate_command(rate)) self._write_bytes(self._rate_command(rate), "rate")
def set_pitch(self, pitch): def set_pitch(self, pitch):
if not self._is_initialized: if not self._is_initialized:
return return
if not isinstance(pitch, float): if not isinstance(pitch, float):
return return
self._write_bytes(self._pitch_command(pitch)) self._write_bytes(self._pitch_command(pitch), "pitch")
def set_volume(self, volume): def set_volume(self, volume):
if not self._is_initialized: if not self._is_initialized:
return return
if not isinstance(volume, float): if not isinstance(volume, float):
return return
self._write_bytes(self._volume_command(volume)) self._write_bytes(self._volume_command(volume), "volume")
def _worker(self): def _worker(self):
while not self._stop_worker: while not self._stop_worker:
text = self.text_queue.get() text = self.text_queue.get()
if text is None: if text is None:
return return
self._write_bytes(self._speak_bytes(text)) try:
data = self._speak_bytes(text)
self._debug(
"Hardware speech worker prepared speech bytes: "
f"{len(data)} bytes",
debug.DebugLevel.INFO,
on_any_level=True,
)
self._write_bytes(data, "speech")
except Exception as error:
self._debug(
f"Hardware speech worker failed: {error}",
debug.DebugLevel.ERROR,
on_any_level=True,
)
def _open_serial_port(self): def _open_serial_port(self):
device = self._resolve_device(self.device) device = self._resolve_device(self.device)
@@ -144,7 +170,8 @@ class hardware_serial_driver(speech_driver):
self.serial_port = port self.serial_port = port
self.device = device self.device = device
self._debug( self._debug(
f"Hardware speech device opened: {device}", "Hardware speech device opened: "
f"{device}, baud_rate={self.baud_rate}",
debug.DebugLevel.INFO, debug.DebugLevel.INFO,
on_any_level=True, on_any_level=True,
) )
@@ -170,14 +197,28 @@ class hardware_serial_driver(speech_driver):
finally: finally:
self.serial_port = None self.serial_port = None
def _write_bytes(self, data): def _write_bytes(self, data, description="data"):
if not data: if not data:
return return
with self.lock: with self.lock:
if self.serial_port is None: if self.serial_port is None:
return return
try: try:
os.write(self.serial_port, data) total_written = 0
while total_written < len(data):
bytes_written = os.write(
self.serial_port, data[total_written:]
)
if bytes_written == 0:
raise OSError("serial write returned 0 bytes")
total_written += bytes_written
preview = self._format_bytes_preview(data)
self._debug(
"Hardware speech wrote "
f"{total_written} {description} bytes: {preview}",
debug.DebugLevel.INFO,
on_any_level=True,
)
except OSError as error: except OSError as error:
self._debug( self._debug(
f"Hardware speech write failed: {error}", f"Hardware speech write failed: {error}",
@@ -187,10 +228,25 @@ class hardware_serial_driver(speech_driver):
def _resolve_device(self, device): def _resolve_device(self, device):
if device and device != "auto": if device and device != "auto":
self._debug(
f"Hardware speech using configured device: {device}",
debug.DebugLevel.INFO,
on_any_level=True,
)
return device return device
for pattern in ("/dev/ttyACM*", "/dev/ttyUSB*", "/dev/ttyS*"): for pattern in ("/dev/ttyACM*", "/dev/ttyUSB*", "/dev/ttyS*"):
matches = sorted(glob.glob(pattern)) matches = sorted(glob.glob(pattern))
self._debug(
f"Hardware speech auto scan {pattern}: {matches}",
debug.DebugLevel.INFO,
on_any_level=True,
)
if matches: if matches:
self._debug(
f"Hardware speech auto selected device: {matches[0]}",
debug.DebugLevel.INFO,
on_any_level=True,
)
return matches[0] return matches[0]
return "" return ""
@@ -215,6 +271,19 @@ class hardware_serial_driver(speech_driver):
value = max(0.0, min(1.0, value)) value = max(0.0, min(1.0, value))
return int(round(minimum + value * (maximum - minimum))) return int(round(minimum + value * (maximum - minimum)))
def _format_bytes_preview(self, data, limit=32):
preview = data[:limit]
hex_preview = " ".join(f"{byte:02x}" for byte in preview)
ascii_preview = "".join(
chr(byte) if 0x20 <= byte <= 0x7E else "."
for byte in preview
)
suffix = "" if len(data) <= limit else " ..."
return (
f"hex=[{hex_preview}{suffix}] "
f"ascii=[{ascii_preview}{suffix}]"
)
def _debug(self, message, level, on_any_level=False): def _debug(self, message, level, on_any_level=False):
try: try:
self.env["runtime"]["DebugManager"].write_debug_out( self.env["runtime"]["DebugManager"].write_debug_out(
@@ -146,6 +146,28 @@ def test_auto_device_detection_fails_when_no_serial_device(monkeypatch):
) )
def test_hardware_driver_retries_partial_serial_writes(monkeypatch):
speech_driver = litetalkDriver.driver()
speech_driver.env = build_environment("/dev/ttyUSB0")
speech_driver.serial_port = 12
written_chunks = []
def fake_write(port, data):
assert port == 12
chunk = data[:2]
written_chunks.append(chunk)
return len(chunk)
monkeypatch.setattr(
"fenrirscreenreader.speechDriver.hardwareSerialDriver.os.write",
fake_write,
)
speech_driver._write_bytes(b"abcdef", "speech")
assert written_chunks == [b"ab", b"cd", b"ef"]
@pytest.mark.parametrize("driver_class", [doubletalkDriver, tripletalkDriver]) @pytest.mark.parametrize("driver_class", [doubletalkDriver, tripletalkDriver])
def test_litetalk_compatible_alias_drivers(driver_class, serial_pair): def test_litetalk_compatible_alias_drivers(driver_class, serial_pair):
speech_driver, master_fd = initialized_driver(driver_class, serial_pair) speech_driver, master_fd = initialized_driver(driver_class, serial_pair)