Hardware synth code now verified working. New release candidate.
This commit is contained in:
@@ -6,6 +6,7 @@
|
|||||||
|
|
||||||
import glob
|
import glob
|
||||||
import os
|
import os
|
||||||
|
import select
|
||||||
import termios
|
import termios
|
||||||
import threading
|
import threading
|
||||||
import tty
|
import tty
|
||||||
@@ -28,6 +29,8 @@ class SpeakQueue(Queue):
|
|||||||
class hardware_serial_driver(speech_driver):
|
class hardware_serial_driver(speech_driver):
|
||||||
cancel_command = b""
|
cancel_command = b""
|
||||||
default_baud_rate = 9600
|
default_baud_rate = 9600
|
||||||
|
hardware_probe_command = b""
|
||||||
|
hardware_probe_timeout = 0.2
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
speech_driver.__init__(self)
|
speech_driver.__init__(self)
|
||||||
@@ -43,14 +46,22 @@ class hardware_serial_driver(speech_driver):
|
|||||||
self.env = environment
|
self.env = environment
|
||||||
self._is_initialized = False
|
self._is_initialized = False
|
||||||
settings_manager = self.env["runtime"]["SettingsManager"]
|
settings_manager = self.env["runtime"]["SettingsManager"]
|
||||||
self.device = settings_manager.get_setting(
|
self.device = self._clean_device_setting(
|
||||||
"speech", "hardware_device"
|
settings_manager.get_setting("speech", "hardware_device")
|
||||||
)
|
)
|
||||||
self.baud_rate = settings_manager.get_setting_as_int(
|
self.baud_rate = settings_manager.get_setting_as_int(
|
||||||
"speech", "hardware_baud_rate"
|
"speech", "hardware_baud_rate"
|
||||||
)
|
)
|
||||||
|
self._debug(
|
||||||
|
"Hardware speech initialize: "
|
||||||
|
f"requested_device={self.device}, baud_rate={self.baud_rate}",
|
||||||
|
debug.DebugLevel.INFO,
|
||||||
|
on_any_level=True,
|
||||||
|
)
|
||||||
self._open_serial_port()
|
self._open_serial_port()
|
||||||
self._is_initialized = self.serial_port is not None
|
self._is_initialized = self.serial_port is not None
|
||||||
|
if not self._is_initialized:
|
||||||
|
raise RuntimeError("hardware speech device is not available")
|
||||||
if self._is_initialized:
|
if self._is_initialized:
|
||||||
self._stop_worker = False
|
self._stop_worker = False
|
||||||
self.worker_thread = threading.Thread(
|
self.worker_thread = threading.Thread(
|
||||||
@@ -58,6 +69,12 @@ class hardware_serial_driver(speech_driver):
|
|||||||
)
|
)
|
||||||
self.worker_thread.start()
|
self.worker_thread.start()
|
||||||
|
|
||||||
|
def _clean_device_setting(self, device):
|
||||||
|
if not isinstance(device, str):
|
||||||
|
return "auto"
|
||||||
|
device = device.split("#", 1)[0].split(";", 1)[0].strip()
|
||||||
|
return device or "auto"
|
||||||
|
|
||||||
def shutdown(self):
|
def shutdown(self):
|
||||||
if not self._is_initialized:
|
if not self._is_initialized:
|
||||||
return
|
return
|
||||||
@@ -76,6 +93,12 @@ class hardware_serial_driver(speech_driver):
|
|||||||
self.cancel()
|
self.cancel()
|
||||||
if not isinstance(text, str) or text == "":
|
if not isinstance(text, str) or text == "":
|
||||||
return
|
return
|
||||||
|
self._debug(
|
||||||
|
"Hardware speech queued text: "
|
||||||
|
f"{len(text)} chars, queue_size={self.text_queue.qsize()}",
|
||||||
|
debug.DebugLevel.INFO,
|
||||||
|
on_any_level=True,
|
||||||
|
)
|
||||||
self.text_queue.put(text)
|
self.text_queue.put(text)
|
||||||
|
|
||||||
def cancel(self):
|
def cancel(self):
|
||||||
@@ -83,7 +106,7 @@ class hardware_serial_driver(speech_driver):
|
|||||||
return
|
return
|
||||||
self.clear_buffer()
|
self.clear_buffer()
|
||||||
if self.cancel_command:
|
if self.cancel_command:
|
||||||
self._write_bytes(self.cancel_command)
|
self._write_bytes(self.cancel_command, "cancel")
|
||||||
|
|
||||||
def clear_buffer(self):
|
def clear_buffer(self):
|
||||||
if not self._is_initialized:
|
if not self._is_initialized:
|
||||||
@@ -95,37 +118,88 @@ class hardware_serial_driver(speech_driver):
|
|||||||
return
|
return
|
||||||
if not isinstance(rate, float):
|
if not isinstance(rate, float):
|
||||||
return
|
return
|
||||||
self._write_bytes(self._rate_command(rate))
|
self._write_bytes(self._rate_command(rate), "rate")
|
||||||
|
|
||||||
def set_pitch(self, pitch):
|
def set_pitch(self, pitch):
|
||||||
if not self._is_initialized:
|
if not self._is_initialized:
|
||||||
return
|
return
|
||||||
if not isinstance(pitch, float):
|
if not isinstance(pitch, float):
|
||||||
return
|
return
|
||||||
self._write_bytes(self._pitch_command(pitch))
|
self._write_bytes(self._pitch_command(pitch), "pitch")
|
||||||
|
|
||||||
def set_volume(self, volume):
|
def set_volume(self, volume):
|
||||||
if not self._is_initialized:
|
if not self._is_initialized:
|
||||||
return
|
return
|
||||||
if not isinstance(volume, float):
|
if not isinstance(volume, float):
|
||||||
return
|
return
|
||||||
self._write_bytes(self._volume_command(volume))
|
self._write_bytes(self._volume_command(volume), "volume")
|
||||||
|
|
||||||
def _worker(self):
|
def _worker(self):
|
||||||
while not self._stop_worker:
|
while not self._stop_worker:
|
||||||
text = self.text_queue.get()
|
text = self.text_queue.get()
|
||||||
if text is None:
|
if text is None:
|
||||||
return
|
return
|
||||||
self._write_bytes(self._speak_bytes(text))
|
try:
|
||||||
|
data = self._speak_bytes(text)
|
||||||
|
self._debug(
|
||||||
|
"Hardware speech worker prepared speech bytes: "
|
||||||
|
f"{len(data)} bytes",
|
||||||
|
debug.DebugLevel.INFO,
|
||||||
|
on_any_level=True,
|
||||||
|
)
|
||||||
|
self._write_bytes(data, "speech")
|
||||||
|
except Exception as error:
|
||||||
|
self._debug(
|
||||||
|
f"Hardware speech worker failed: {error}",
|
||||||
|
debug.DebugLevel.ERROR,
|
||||||
|
on_any_level=True,
|
||||||
|
)
|
||||||
|
|
||||||
def _open_serial_port(self):
|
def _open_serial_port(self):
|
||||||
device = self._resolve_device(self.device)
|
auto_detect = self.device == "auto"
|
||||||
if not device:
|
devices = self._resolve_devices(self.device)
|
||||||
|
if not devices:
|
||||||
self._debug(
|
self._debug(
|
||||||
"Hardware speech device not found",
|
"Hardware speech device not found",
|
||||||
debug.DebugLevel.ERROR,
|
debug.DebugLevel.ERROR,
|
||||||
|
on_any_level=True,
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
fallback_device = None
|
||||||
|
fallback_port = None
|
||||||
|
for device in devices:
|
||||||
|
port = self._open_configured_serial_port(device)
|
||||||
|
if port is None:
|
||||||
|
continue
|
||||||
|
if not auto_detect:
|
||||||
|
self._activate_serial_port(
|
||||||
|
device, port, probe_matched=False
|
||||||
|
)
|
||||||
|
return
|
||||||
|
if self._probe_serial_port(port):
|
||||||
|
if fallback_port is not None:
|
||||||
|
self._close_port(fallback_port)
|
||||||
|
self._activate_serial_port(device, port, probe_matched=True)
|
||||||
|
return
|
||||||
|
if fallback_port is None:
|
||||||
|
fallback_device = device
|
||||||
|
fallback_port = port
|
||||||
|
else:
|
||||||
|
self._close_port(port)
|
||||||
|
|
||||||
|
if fallback_port is not None:
|
||||||
|
self._debug(
|
||||||
|
"Hardware speech probe did not identify a synth; "
|
||||||
|
f"falling back to {fallback_device}",
|
||||||
|
debug.DebugLevel.WARNING,
|
||||||
|
on_any_level=True,
|
||||||
|
)
|
||||||
|
self._activate_serial_port(
|
||||||
|
fallback_device, fallback_port, probe_matched=False
|
||||||
|
)
|
||||||
|
|
||||||
|
def _open_configured_serial_port(self, device):
|
||||||
try:
|
try:
|
||||||
port = os.open(device, os.O_RDWR | os.O_NOCTTY)
|
port = os.open(device, os.O_RDWR | os.O_NOCTTY)
|
||||||
tty.setraw(port)
|
tty.setraw(port)
|
||||||
@@ -138,51 +212,147 @@ class hardware_serial_driver(speech_driver):
|
|||||||
attrs[6][termios.VTIME] = 0
|
attrs[6][termios.VTIME] = 0
|
||||||
attrs[0] &= ~(termios.IXON | termios.IXOFF | termios.IXANY)
|
attrs[0] &= ~(termios.IXON | termios.IXOFF | termios.IXANY)
|
||||||
termios.tcsetattr(port, termios.TCSANOW, attrs)
|
termios.tcsetattr(port, termios.TCSANOW, attrs)
|
||||||
self.serial_port = port
|
return port
|
||||||
self.device = device
|
except (OSError, termios.error) as error:
|
||||||
except OSError as error:
|
|
||||||
self._debug(
|
self._debug(
|
||||||
f"Hardware speech device open failed: {device}: {error}",
|
f"Hardware speech device open failed: {device}: {error}",
|
||||||
debug.DebugLevel.ERROR,
|
debug.DebugLevel.ERROR,
|
||||||
|
on_any_level=True,
|
||||||
)
|
)
|
||||||
self.serial_port = None
|
return None
|
||||||
|
|
||||||
|
def _activate_serial_port(self, device, port, probe_matched=False):
|
||||||
|
self.serial_port = port
|
||||||
|
self.device = device
|
||||||
|
probe_status = "matched" if probe_matched else "not matched"
|
||||||
|
self._debug(
|
||||||
|
"Hardware speech device opened: "
|
||||||
|
f"{device}, baud_rate={self.baud_rate}, probe={probe_status}",
|
||||||
|
debug.DebugLevel.INFO,
|
||||||
|
on_any_level=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _probe_serial_port(self, port):
|
||||||
|
if not self.hardware_probe_command:
|
||||||
|
return True
|
||||||
|
try:
|
||||||
|
self._drain_serial_input(port)
|
||||||
|
os.write(port, self.hardware_probe_command)
|
||||||
|
readable, _, _ = select.select(
|
||||||
|
[port], [], [], self.hardware_probe_timeout
|
||||||
|
)
|
||||||
|
if not readable:
|
||||||
|
self._debug(
|
||||||
|
"Hardware speech probe got no response",
|
||||||
|
debug.DebugLevel.INFO,
|
||||||
|
on_any_level=True,
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
response = os.read(port, 256)
|
||||||
|
self._debug(
|
||||||
|
"Hardware speech probe response: "
|
||||||
|
f"{self._format_bytes_preview(response)}",
|
||||||
|
debug.DebugLevel.INFO,
|
||||||
|
on_any_level=True,
|
||||||
|
)
|
||||||
|
return self._is_hardware_probe_response(response)
|
||||||
|
except OSError as error:
|
||||||
|
self._debug(
|
||||||
|
f"Hardware speech probe failed: {error}",
|
||||||
|
debug.DebugLevel.WARNING,
|
||||||
|
on_any_level=True,
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _drain_serial_input(self, port):
|
||||||
|
while True:
|
||||||
|
readable, _, _ = select.select([port], [], [], 0)
|
||||||
|
if not readable:
|
||||||
|
return
|
||||||
|
if not os.read(port, 256):
|
||||||
|
return
|
||||||
|
|
||||||
|
def _is_hardware_probe_response(self, response):
|
||||||
|
return bool(response)
|
||||||
|
|
||||||
def _close_serial_port(self):
|
def _close_serial_port(self):
|
||||||
with self.lock:
|
with self.lock:
|
||||||
if self.serial_port is None:
|
if self.serial_port is None:
|
||||||
return
|
return
|
||||||
try:
|
self._close_port(self.serial_port)
|
||||||
os.close(self.serial_port)
|
self.serial_port = None
|
||||||
except OSError as error:
|
|
||||||
self._debug(
|
|
||||||
f"Hardware speech device close failed: {error}",
|
|
||||||
debug.DebugLevel.WARNING,
|
|
||||||
)
|
|
||||||
finally:
|
|
||||||
self.serial_port = None
|
|
||||||
|
|
||||||
def _write_bytes(self, data):
|
def _close_port(self, port):
|
||||||
|
if port is None:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
os.close(port)
|
||||||
|
except OSError as error:
|
||||||
|
self._debug(
|
||||||
|
f"Hardware speech device close failed: {error}",
|
||||||
|
debug.DebugLevel.WARNING,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _write_bytes(self, data, description="data"):
|
||||||
if not data:
|
if not data:
|
||||||
return
|
return
|
||||||
with self.lock:
|
with self.lock:
|
||||||
if self.serial_port is None:
|
if self.serial_port is None:
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
os.write(self.serial_port, data)
|
total_written = 0
|
||||||
|
while total_written < len(data):
|
||||||
|
bytes_written = os.write(
|
||||||
|
self.serial_port, data[total_written:]
|
||||||
|
)
|
||||||
|
if bytes_written == 0:
|
||||||
|
raise OSError("serial write returned 0 bytes")
|
||||||
|
total_written += bytes_written
|
||||||
|
preview = self._format_bytes_preview(data)
|
||||||
|
self._debug(
|
||||||
|
"Hardware speech wrote "
|
||||||
|
f"{total_written} {description} bytes: {preview}",
|
||||||
|
debug.DebugLevel.INFO,
|
||||||
|
on_any_level=True,
|
||||||
|
)
|
||||||
except OSError as error:
|
except OSError as error:
|
||||||
self._debug(
|
self._debug(
|
||||||
f"Hardware speech write failed: {error}",
|
f"Hardware speech write failed: {error}",
|
||||||
debug.DebugLevel.ERROR,
|
debug.DebugLevel.ERROR,
|
||||||
|
on_any_level=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _resolve_device(self, device):
|
def _resolve_devices(self, device):
|
||||||
if device and device != "auto":
|
if device and device != "auto":
|
||||||
return device
|
self._debug(
|
||||||
for pattern in ("/dev/ttyACM*", "/dev/ttyUSB*"):
|
f"Hardware speech using configured device: {device}",
|
||||||
|
debug.DebugLevel.INFO,
|
||||||
|
on_any_level=True,
|
||||||
|
)
|
||||||
|
return [device]
|
||||||
|
devices = []
|
||||||
|
for pattern in ("/dev/ttyACM*", "/dev/ttyUSB*", "/dev/ttyS*"):
|
||||||
matches = sorted(glob.glob(pattern))
|
matches = sorted(glob.glob(pattern))
|
||||||
|
self._debug(
|
||||||
|
f"Hardware speech auto scan {pattern}: {matches}",
|
||||||
|
debug.DebugLevel.INFO,
|
||||||
|
on_any_level=True,
|
||||||
|
)
|
||||||
if matches:
|
if matches:
|
||||||
return matches[0]
|
if len(matches) > 1:
|
||||||
return ""
|
self._debug(
|
||||||
|
"Hardware speech auto found multiple devices for "
|
||||||
|
f"{pattern}: {matches}; probing in order",
|
||||||
|
debug.DebugLevel.WARNING,
|
||||||
|
on_any_level=True,
|
||||||
|
)
|
||||||
|
self._debug(
|
||||||
|
f"Hardware speech auto candidate devices: {matches}",
|
||||||
|
debug.DebugLevel.INFO,
|
||||||
|
on_any_level=True,
|
||||||
|
)
|
||||||
|
devices.extend(matches)
|
||||||
|
return devices
|
||||||
|
|
||||||
def _termios_baud_rate(self, baud_rate):
|
def _termios_baud_rate(self, baud_rate):
|
||||||
baud_name = f"B{baud_rate}"
|
baud_name = f"B{baud_rate}"
|
||||||
@@ -205,10 +375,23 @@ class hardware_serial_driver(speech_driver):
|
|||||||
value = max(0.0, min(1.0, value))
|
value = max(0.0, min(1.0, value))
|
||||||
return int(round(minimum + value * (maximum - minimum)))
|
return int(round(minimum + value * (maximum - minimum)))
|
||||||
|
|
||||||
def _debug(self, message, level):
|
def _format_bytes_preview(self, data, limit=32):
|
||||||
|
preview = data[:limit]
|
||||||
|
hex_preview = " ".join(f"{byte:02x}" for byte in preview)
|
||||||
|
ascii_preview = "".join(
|
||||||
|
chr(byte) if 0x20 <= byte <= 0x7E else "."
|
||||||
|
for byte in preview
|
||||||
|
)
|
||||||
|
suffix = "" if len(data) <= limit else " ..."
|
||||||
|
return (
|
||||||
|
f"hex=[{hex_preview}{suffix}] "
|
||||||
|
f"ascii=[{ascii_preview}{suffix}]"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _debug(self, message, level, on_any_level=False):
|
||||||
try:
|
try:
|
||||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||||
message, level
|
message, level, on_any_level=on_any_level
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ from fenrirscreenreader.speechDriver.hardwareSerialDriver import (
|
|||||||
|
|
||||||
class driver(hardware_serial_driver):
|
class driver(hardware_serial_driver):
|
||||||
cancel_command = b"\x18"
|
cancel_command = b"\x18"
|
||||||
|
hardware_probe_command = b"\x01I"
|
||||||
|
|
||||||
def _speak_bytes(self, text):
|
def _speak_bytes(self, text):
|
||||||
return self._clean_text(text).encode("ascii", errors="replace") + b"\r"
|
return self._clean_text(text).encode("ascii", errors="replace") + b"\r"
|
||||||
@@ -26,3 +27,10 @@ class driver(hardware_serial_driver):
|
|||||||
|
|
||||||
def _setting_command(self, value, command):
|
def _setting_command(self, value, command):
|
||||||
return b"\x01" + str(value).encode("ascii") + command
|
return b"\x01" + str(value).encode("ascii") + command
|
||||||
|
|
||||||
|
def _is_hardware_probe_response(self, response):
|
||||||
|
response_text = response.decode("ascii", errors="ignore").lower()
|
||||||
|
return any(
|
||||||
|
token in response_text
|
||||||
|
for token in ("litetalk", "lite talk", "rpitalk", "doubletalk")
|
||||||
|
)
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
import os
|
import os
|
||||||
import select
|
import select
|
||||||
|
import termios
|
||||||
import time
|
import time
|
||||||
|
from unittest.mock import ANY
|
||||||
from unittest.mock import Mock
|
from unittest.mock import Mock
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
@@ -103,6 +105,280 @@ def test_litetalk_driver_writes_settings_and_cancel(serial_pair):
|
|||||||
speech_driver.shutdown()
|
speech_driver.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
def test_auto_device_detection_includes_classic_serial(
|
||||||
|
monkeypatch, serial_pair
|
||||||
|
):
|
||||||
|
master_fd, slave_name = serial_pair
|
||||||
|
|
||||||
|
def fake_glob(pattern):
|
||||||
|
if pattern == "/dev/ttyS*":
|
||||||
|
return [slave_name]
|
||||||
|
return []
|
||||||
|
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.glob.glob",
|
||||||
|
fake_glob,
|
||||||
|
)
|
||||||
|
speech_driver = litetalkDriver.driver()
|
||||||
|
speech_driver.initialize(build_environment("auto"))
|
||||||
|
try:
|
||||||
|
assert speech_driver.device == slave_name
|
||||||
|
speech_driver.speak("Serial")
|
||||||
|
assert read_available(master_fd, 9) == b"\x01ISerial\r"
|
||||||
|
finally:
|
||||||
|
speech_driver.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
def test_configured_device_strips_inline_comment(serial_pair):
|
||||||
|
master_fd, slave_name = serial_pair
|
||||||
|
device_setting = f"{slave_name} # built-in serial port"
|
||||||
|
speech_driver = litetalkDriver.driver()
|
||||||
|
speech_driver.initialize(build_environment(device_setting))
|
||||||
|
try:
|
||||||
|
assert speech_driver.device == slave_name
|
||||||
|
speech_driver.speak("Specific")
|
||||||
|
assert read_available(master_fd, 9) == b"Specific\r"
|
||||||
|
finally:
|
||||||
|
speech_driver.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
def test_auto_device_detection_prefers_probe_response(monkeypatch):
|
||||||
|
opened_ports = []
|
||||||
|
closed_ports = []
|
||||||
|
writes = {}
|
||||||
|
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.glob.glob",
|
||||||
|
lambda pattern: ["/dev/ttyUSB0", "/dev/ttyUSB1"]
|
||||||
|
if pattern == "/dev/ttyUSB*"
|
||||||
|
else [],
|
||||||
|
)
|
||||||
|
|
||||||
|
def fake_open(device, flags):
|
||||||
|
port = 100 + len(opened_ports)
|
||||||
|
opened_ports.append((device, port))
|
||||||
|
return port
|
||||||
|
|
||||||
|
def fake_write(port, data):
|
||||||
|
writes.setdefault(port, b"")
|
||||||
|
writes[port] += data
|
||||||
|
return len(data)
|
||||||
|
|
||||||
|
def fake_select(readable, writable, exceptional, timeout):
|
||||||
|
port = readable[0]
|
||||||
|
if port == 101 and writes.get(port) == b"\x01I":
|
||||||
|
return readable, writable, exceptional
|
||||||
|
return [], writable, exceptional
|
||||||
|
|
||||||
|
def fake_read(port, size):
|
||||||
|
if port == 101:
|
||||||
|
return b"RPItalk 1.3\r"
|
||||||
|
return b""
|
||||||
|
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.os.open",
|
||||||
|
fake_open,
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.os.close",
|
||||||
|
lambda port: closed_ports.append(port),
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.os.write",
|
||||||
|
fake_write,
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.os.read",
|
||||||
|
fake_read,
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.select.select",
|
||||||
|
fake_select,
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.termios.tcgetattr",
|
||||||
|
lambda port: [0, 0, 0, 0, 0, 0, [0] * 32],
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.termios.tcsetattr",
|
||||||
|
lambda port, when, attrs: None,
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.tty.setraw",
|
||||||
|
lambda port: None,
|
||||||
|
)
|
||||||
|
|
||||||
|
speech_driver = litetalkDriver.driver()
|
||||||
|
speech_driver.initialize(build_environment("auto"))
|
||||||
|
try:
|
||||||
|
assert opened_ports == [("/dev/ttyUSB0", 100), ("/dev/ttyUSB1", 101)]
|
||||||
|
assert writes == {100: b"\x01I", 101: b"\x01I"}
|
||||||
|
assert closed_ports == [100]
|
||||||
|
assert speech_driver.device == "/dev/ttyUSB1"
|
||||||
|
finally:
|
||||||
|
speech_driver.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
def test_auto_device_detection_falls_back_without_probe_response(
|
||||||
|
monkeypatch
|
||||||
|
):
|
||||||
|
opened_ports = []
|
||||||
|
closed_ports = []
|
||||||
|
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.glob.glob",
|
||||||
|
lambda pattern: ["/dev/ttyUSB0", "/dev/ttyUSB1"]
|
||||||
|
if pattern == "/dev/ttyUSB*"
|
||||||
|
else [],
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.os.open",
|
||||||
|
lambda device, flags: opened_ports.append(device) or len(opened_ports),
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.os.close",
|
||||||
|
lambda port: closed_ports.append(port),
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.os.write",
|
||||||
|
lambda port, data: len(data),
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.select.select",
|
||||||
|
lambda readable, writable, exceptional, timeout: (
|
||||||
|
[],
|
||||||
|
writable,
|
||||||
|
exceptional,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.termios.tcgetattr",
|
||||||
|
lambda port: [0, 0, 0, 0, 0, 0, [0] * 32],
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.termios.tcsetattr",
|
||||||
|
lambda port, when, attrs: None,
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.tty.setraw",
|
||||||
|
lambda port: None,
|
||||||
|
)
|
||||||
|
|
||||||
|
speech_driver = litetalkDriver.driver()
|
||||||
|
speech_driver.initialize(build_environment("auto"))
|
||||||
|
try:
|
||||||
|
assert opened_ports == ["/dev/ttyUSB0", "/dev/ttyUSB1"]
|
||||||
|
assert closed_ports == [2]
|
||||||
|
assert speech_driver.device == "/dev/ttyUSB0"
|
||||||
|
finally:
|
||||||
|
speech_driver.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
def test_auto_device_detection_skips_termios_failures(monkeypatch):
|
||||||
|
opened_ports = []
|
||||||
|
closed_ports = []
|
||||||
|
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.glob.glob",
|
||||||
|
lambda pattern: ["/dev/ttyUSB0"]
|
||||||
|
if pattern == "/dev/ttyUSB*"
|
||||||
|
else ["/dev/ttyS0"]
|
||||||
|
if pattern == "/dev/ttyS*"
|
||||||
|
else [],
|
||||||
|
)
|
||||||
|
|
||||||
|
def fake_open(device, flags):
|
||||||
|
port = 100 + len(opened_ports)
|
||||||
|
opened_ports.append((device, port))
|
||||||
|
return port
|
||||||
|
|
||||||
|
def fake_tcgetattr(port):
|
||||||
|
if port == 101:
|
||||||
|
raise termios.error(5, "Input/output error")
|
||||||
|
return [0, 0, 0, 0, 0, 0, [0] * 32]
|
||||||
|
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.os.open",
|
||||||
|
fake_open,
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.os.close",
|
||||||
|
lambda port: closed_ports.append(port),
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.os.write",
|
||||||
|
lambda port, data: len(data),
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.select.select",
|
||||||
|
lambda readable, writable, exceptional, timeout: (
|
||||||
|
[],
|
||||||
|
writable,
|
||||||
|
exceptional,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.termios.tcgetattr",
|
||||||
|
fake_tcgetattr,
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.termios.tcsetattr",
|
||||||
|
lambda port, when, attrs: None,
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.tty.setraw",
|
||||||
|
lambda port: None,
|
||||||
|
)
|
||||||
|
|
||||||
|
speech_driver = litetalkDriver.driver()
|
||||||
|
speech_driver.initialize(build_environment("auto"))
|
||||||
|
try:
|
||||||
|
assert opened_ports == [("/dev/ttyUSB0", 100), ("/dev/ttyS0", 101)]
|
||||||
|
assert speech_driver.device == "/dev/ttyUSB0"
|
||||||
|
finally:
|
||||||
|
speech_driver.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
def test_auto_device_detection_fails_when_no_serial_device(monkeypatch):
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.glob.glob",
|
||||||
|
lambda pattern: [],
|
||||||
|
)
|
||||||
|
speech_driver = litetalkDriver.driver()
|
||||||
|
|
||||||
|
with pytest.raises(RuntimeError, match="hardware speech device"):
|
||||||
|
speech_driver.initialize(build_environment("auto"))
|
||||||
|
|
||||||
|
debug_manager = speech_driver.env["runtime"]["DebugManager"]
|
||||||
|
debug_manager.write_debug_out.assert_called_with(
|
||||||
|
"Hardware speech device not found",
|
||||||
|
ANY,
|
||||||
|
on_any_level=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_hardware_driver_retries_partial_serial_writes(monkeypatch):
|
||||||
|
speech_driver = litetalkDriver.driver()
|
||||||
|
speech_driver.env = build_environment("/dev/ttyUSB0")
|
||||||
|
speech_driver.serial_port = 12
|
||||||
|
written_chunks = []
|
||||||
|
|
||||||
|
def fake_write(port, data):
|
||||||
|
assert port == 12
|
||||||
|
chunk = data[:2]
|
||||||
|
written_chunks.append(chunk)
|
||||||
|
return len(chunk)
|
||||||
|
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"fenrirscreenreader.speechDriver.hardwareSerialDriver.os.write",
|
||||||
|
fake_write,
|
||||||
|
)
|
||||||
|
|
||||||
|
speech_driver._write_bytes(b"abcdef", "speech")
|
||||||
|
|
||||||
|
assert written_chunks == [b"ab", b"cd", b"ef"]
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("driver_class", [doubletalkDriver, tripletalkDriver])
|
@pytest.mark.parametrize("driver_class", [doubletalkDriver, tripletalkDriver])
|
||||||
def test_litetalk_compatible_alias_drivers(driver_class, serial_pair):
|
def test_litetalk_compatible_alias_drivers(driver_class, serial_pair):
|
||||||
speech_driver, master_fd = initialized_driver(driver_class, serial_pair)
|
speech_driver, master_fd = initialized_driver(driver_class, serial_pair)
|
||||||
|
|||||||
Reference in New Issue
Block a user