187 lines
5.7 KiB
Python
187 lines
5.7 KiB
Python
import os
|
|
import select
|
|
import time
|
|
from unittest.mock import ANY
|
|
from unittest.mock import Mock
|
|
|
|
import pytest
|
|
|
|
from fenrirscreenreader.speechDriver import dectalkDriver
|
|
from fenrirscreenreader.speechDriver import doubletalkDriver
|
|
from fenrirscreenreader.speechDriver import litetalkDriver
|
|
from fenrirscreenreader.speechDriver import tripletalkDriver
|
|
|
|
|
|
def build_environment(device):
    """Build a minimal mock environment dict for a hardware speech driver.

    Args:
        device: Value reported for the ``speech``/``hardware_device`` setting.

    Returns:
        A dict with a single ``"runtime"`` entry holding a mocked
        ``SettingsManager`` and a mocked ``DebugManager``.
    """

    def lookup_text(section, setting):
        # Only the hardware device is configured; everything else is blank.
        if (section, setting) == ("speech", "hardware_device"):
            return device
        return ""

    def lookup_int(section, setting):
        # Only the baud rate is configured; every other integer reads as 0.
        if (section, setting) == ("speech", "hardware_baud_rate"):
            return 9600
        return 0

    settings = Mock()
    settings.get_setting.side_effect = lookup_text
    settings.get_setting_as_int.side_effect = lookup_int

    runtime = {
        "SettingsManager": settings,
        "DebugManager": Mock(),
    }
    return {"runtime": runtime}
|
|
|
|
|
|
def read_available(fd, expected_length, timeout=1.0):
    """Collect bytes from *fd* until enough arrive, EOF is hit, or time runs out.

    Args:
        fd: Readable file descriptor to poll (the tests pass a pty master).
        expected_length: Stop as soon as at least this many bytes were read.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The bytes read so far. This may be shorter than ``expected_length``
        when the deadline or end-of-file comes first, and longer when a
        single read returns more than was still missing.
    """
    deadline = time.monotonic() + timeout
    chunks = []
    received = 0
    while received < expected_length and time.monotonic() < deadline:
        # Poll in short slices so the deadline is re-checked regularly.
        readable, _, _ = select.select([fd], [], [], 0.05)
        if not readable:
            continue
        chunk = os.read(fd, 1024)
        if not chunk:
            # End-of-file: the descriptor stays "readable" forever, so
            # continuing would only busy-spin until the deadline.
            break
        chunks.append(chunk)
        received += len(chunk)
    return b"".join(chunks)
|
|
|
|
|
|
@pytest.fixture
def serial_pair():
    """Provide a pseudo-terminal pair that stands in for a serial device.

    Yields:
        A ``(master_fd, slave_name)`` tuple: the master file descriptor and
        the terminal device name of the slave side.
    """
    pty_fds = os.openpty()
    try:
        yield pty_fds[0], os.ttyname(pty_fds[1])
    finally:
        # Close the master first, then the slave.
        for descriptor in pty_fds:
            os.close(descriptor)
|
|
|
|
|
|
def initialized_driver(driver_class, serial_pair):
    """Create a driver from *driver_class* and initialize it on the pty slave.

    Args:
        driver_class: Module exposing a ``driver`` class to instantiate.
        serial_pair: ``(master_fd, slave_name)`` tuple from the fixture.

    Returns:
        A ``(driver, master_fd)`` tuple; the driver has been asserted to
        report ``_is_initialized``.
    """
    master_fd, slave_name = serial_pair
    driver = driver_class.driver()
    environment = build_environment(slave_name)
    driver.initialize(environment)
    assert driver._is_initialized
    return driver, master_fd
|
|
|
|
|
|
def test_dectalk_driver_speaks_printable_text(serial_pair):
    """Check the bytes the DECtalk driver writes for text with a newline and a snowman."""
    driver, master_fd = initialized_driver(dectalkDriver, serial_pair)
    expected = b"Hello world \x01"
    try:
        driver.speak("Hello\nworld ☃")
        received = read_available(master_fd, len(expected))
        assert received == expected
    finally:
        driver.shutdown()
|
|
|
|
|
|
def test_dectalk_driver_writes_settings_and_cancel(serial_pair):
    """Check the bytes the DECtalk driver writes for rate, pitch, volume and cancel."""
    speech_driver, master_fd = initialized_driver(dectalkDriver, serial_pair)
    # Complete wire output expected from the four calls below.
    expected = b"[:ra 650][:dv ap 50][:vo 50]\x18"
    try:
        speech_driver.set_rate(1.0)
        speech_driver.set_pitch(0.0)
        speech_driver.set_volume(0.5)
        speech_driver.cancel()
        # Take the byte count from the expectation itself: a larger
        # hand-written count makes read_available wait out its full
        # timeout for bytes that never arrive.
        assert read_available(master_fd, len(expected)) == expected
    finally:
        speech_driver.shutdown()
|
|
|
|
|
|
def test_litetalk_driver_speaks_printable_text(serial_pair):
    """Check that the LiteTalk driver writes the spoken text followed by ``\\r``."""
    driver, master_fd = initialized_driver(litetalkDriver, serial_pair)
    expected = b"Ready\r"
    try:
        driver.speak("Ready")
        received = read_available(master_fd, len(expected))
        assert received == expected
    finally:
        driver.shutdown()
|
|
|
|
|
|
def test_litetalk_driver_writes_settings_and_cancel(serial_pair):
    """Check the bytes the LiteTalk driver writes for rate, pitch, volume and cancel."""
    speech_driver, master_fd = initialized_driver(litetalkDriver, serial_pair)
    # Complete wire output expected from the four calls below (10 bytes).
    expected = b"\x019S\x010P\x014V\x18"
    try:
        speech_driver.set_rate(1.0)
        speech_driver.set_pitch(0.0)
        speech_driver.set_volume(0.5)
        speech_driver.cancel()
        # Take the byte count from the expectation itself: a smaller
        # hand-written count lets read_available stop before the final
        # byte has arrived if the output shows up in pieces.
        assert read_available(master_fd, len(expected)) == expected
    finally:
        speech_driver.shutdown()
|
|
|
|
|
|
def test_configured_device_supports_classic_serial(serial_pair):
    """Check that a plain device path is used as-is and receives the speech bytes."""
    master_fd, slave_name = serial_pair
    driver = litetalkDriver.driver()
    environment = build_environment(slave_name)
    driver.initialize(environment)
    expected = b"Serial\r"
    try:
        assert driver.device == slave_name
        driver.speak("Serial")
        received = read_available(master_fd, len(expected))
        assert received == expected
    finally:
        driver.shutdown()
|
|
|
|
|
|
def test_configured_device_strips_inline_comment(serial_pair):
    """Check that a trailing ``# ...`` comment in the device setting is dropped."""
    master_fd, slave_name = serial_pair
    commented_setting = f"{slave_name} # built-in serial port"
    driver = litetalkDriver.driver()
    environment = build_environment(commented_setting)
    driver.initialize(environment)
    expected = b"Specific\r"
    try:
        # Only the device path should survive, not the inline comment.
        assert driver.device == slave_name
        driver.speak("Specific")
        received = read_available(master_fd, len(expected))
        assert received == expected
    finally:
        driver.shutdown()
|
|
|
|
|
|
def test_auto_device_is_rejected():
    """Check that the ``auto`` device setting raises and logs a debug message."""
    driver = litetalkDriver.driver()
    environment = build_environment("auto")

    with pytest.raises(RuntimeError, match="hardware speech device"):
        driver.initialize(environment)

    # The failure must also have been reported through the debug manager.
    debug_log = driver.env["runtime"]["DebugManager"].write_debug_out
    debug_log.assert_called_with(
        "Hardware speech requires an explicit serial device",
        ANY,
        on_any_level=True,
    )
|
|
|
|
|
|
def test_hardware_driver_retries_partial_serial_writes(monkeypatch):
    """Check that ``_write_bytes`` keeps writing when each write is only partial."""
    fake_port = 12
    accepted = []

    def short_write(port, data):
        # Accept at most two bytes per call to simulate a partial write.
        assert port == fake_port
        piece = data[:2]
        accepted.append(piece)
        return len(piece)

    driver = litetalkDriver.driver()
    driver.env = build_environment("/dev/ttyUSB0")
    driver.serial_port = fake_port

    # NOTE(review): the target goes through the module's `os` attribute, so
    # this presumably replaces os.write process-wide for the test - confirm.
    monkeypatch.setattr(
        "fenrirscreenreader.speechDriver.hardwareSerialDriver.os.write",
        short_write,
    )

    driver._write_bytes(b"abcdef", "speech")

    assert accepted == [b"ab", b"cd", b"ef"]
|
|
|
|
|
|
@pytest.mark.parametrize("driver_class", [doubletalkDriver, tripletalkDriver])
def test_litetalk_compatible_alias_drivers(driver_class, serial_pair):
    """Check that the DoubleTalk and TripleTalk drivers emit the expected bytes."""
    speech_driver, master_fd = initialized_driver(driver_class, serial_pair)
    # NOTE(review): the rate command is expected ahead of the text even
    # though speak() is called first - confirm against the driver.
    expected = b"\x019SAlias\r"
    try:
        speech_driver.speak("Alias")
        speech_driver.set_rate(1.0)
        # Take the byte count from the expectation itself: a larger
        # hand-written count makes read_available wait out its full
        # timeout for bytes that never arrive.
        assert read_available(master_fd, len(expected)) == expected
    finally:
        speech_driver.shutdown()
|
|
|
|
|
|
def test_hardware_driver_ignores_empty_and_non_string_text(serial_pair):
    """Check that speaking ``""`` or ``None`` writes nothing to the device."""
    driver, master_fd = initialized_driver(dectalkDriver, serial_pair)
    try:
        for ignored_text in ("", None):
            driver.speak(ignored_text)
        # A short timeout suffices: the expectation is that nothing arrives.
        leftover = read_available(master_fd, 1, timeout=0.2)
        assert leftover == b""
    finally:
        driver.shutdown()
|