Merge branch 'testing' Hopefully final release candidate for the new version.

This commit is contained in:
Storm Dragon
2026-05-23 18:59:14 -04:00
11 changed files with 31 additions and 365 deletions
@@ -127,7 +127,7 @@ class config_command:
self.config.set("speech", "rate", "0.75")
self.config.set("speech", "pitch", "0.5")
self.config.set("speech", "volume", "1.0")
self.config.set("speech", "hardware_device", "auto")
self.config.set("speech", "hardware_device", "/dev/ttyS0")
self.config.set("speech", "hardware_baud_rate", "9600")
self.config.add_section("sound")
@@ -108,7 +108,7 @@ class command(config_command):
"rate": "0.5",
"pitch": "0.5",
"volume": "1.0",
"hardware_device": "auto",
"hardware_device": "/dev/ttyS0",
"hardware_baud_rate": "9600",
"auto_read_incoming": "True",
}
+1 -1
View File
@@ -28,7 +28,7 @@ settings_data = {
"module": "",
"voice": "en-us",
"language": "",
"hardware_device": "auto",
"hardware_device": "/dev/ttyS0",
"hardware_baud_rate": 9600,
"auto_read_incoming": True,
"read_numbers_as_digits": False,
@@ -4,9 +4,7 @@
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
import glob
import os
import select
import termios
import threading
import tty
@@ -29,8 +27,6 @@ class SpeakQueue(Queue):
class hardware_serial_driver(speech_driver):
cancel_command = b""
default_baud_rate = 9600
hardware_probe_command = b""
hardware_probe_timeout = 0.2
def __init__(self):
speech_driver.__init__(self)
@@ -71,9 +67,9 @@ class hardware_serial_driver(speech_driver):
def _clean_device_setting(self, device):
if not isinstance(device, str):
return "auto"
return ""
device = device.split("#", 1)[0].split(";", 1)[0].strip()
return device or "auto"
return device
def shutdown(self):
if not self._is_initialized:
@@ -156,50 +152,20 @@ class hardware_serial_driver(speech_driver):
)
def _open_serial_port(self):
auto_detect = self.device == "auto"
devices = self._resolve_devices(self.device)
if not devices:
if not self.device or self.device == "auto":
self._debug(
"Hardware speech device not found",
"Hardware speech requires an explicit serial device",
debug.DebugLevel.ERROR,
on_any_level=True,
)
return
fallback_device = None
fallback_port = None
for device in devices:
port = self._open_configured_serial_port(device)
if port is None:
continue
if not auto_detect:
self._activate_serial_port(
device, port, probe_matched=False
)
return
if self._probe_serial_port(port):
if fallback_port is not None:
self._close_port(fallback_port)
self._activate_serial_port(device, port, probe_matched=True)
return
if fallback_port is None:
fallback_device = device
fallback_port = port
else:
self._close_port(port)
if fallback_port is not None:
self._debug(
"Hardware speech probe did not identify a synth; "
f"falling back to {fallback_device}",
debug.DebugLevel.WARNING,
on_any_level=True,
)
self._activate_serial_port(
fallback_device, fallback_port, probe_matched=False
)
port = self._open_configured_serial_port(self.device)
if port is not None:
self._activate_serial_port(self.device, port)
def _open_configured_serial_port(self, device):
port = None
try:
port = os.open(device, os.O_RDWR | os.O_NOCTTY)
tty.setraw(port)
@@ -214,6 +180,7 @@ class hardware_serial_driver(speech_driver):
termios.tcsetattr(port, termios.TCSANOW, attrs)
return port
except (OSError, termios.error) as error:
self._close_port(port)
self._debug(
f"Hardware speech device open failed: {device}: {error}",
debug.DebugLevel.ERROR,
@@ -221,60 +188,16 @@ class hardware_serial_driver(speech_driver):
)
return None
def _activate_serial_port(self, device, port, probe_matched=False):
def _activate_serial_port(self, device, port):
self.serial_port = port
self.device = device
probe_status = "matched" if probe_matched else "not matched"
self._debug(
"Hardware speech device opened: "
f"{device}, baud_rate={self.baud_rate}, probe={probe_status}",
f"{device}, baud_rate={self.baud_rate}",
debug.DebugLevel.INFO,
on_any_level=True,
)
def _probe_serial_port(self, port):
if not self.hardware_probe_command:
return True
try:
self._drain_serial_input(port)
os.write(port, self.hardware_probe_command)
readable, _, _ = select.select(
[port], [], [], self.hardware_probe_timeout
)
if not readable:
self._debug(
"Hardware speech probe got no response",
debug.DebugLevel.INFO,
on_any_level=True,
)
return False
response = os.read(port, 256)
self._debug(
"Hardware speech probe response: "
f"{self._format_bytes_preview(response)}",
debug.DebugLevel.INFO,
on_any_level=True,
)
return self._is_hardware_probe_response(response)
except OSError as error:
self._debug(
f"Hardware speech probe failed: {error}",
debug.DebugLevel.WARNING,
on_any_level=True,
)
return False
def _drain_serial_input(self, port):
while True:
readable, _, _ = select.select([port], [], [], 0)
if not readable:
return
if not os.read(port, 256):
return
def _is_hardware_probe_response(self, response):
return bool(response)
def _close_serial_port(self):
with self.lock:
if self.serial_port is None:
@@ -322,38 +245,6 @@ class hardware_serial_driver(speech_driver):
on_any_level=True,
)
def _resolve_devices(self, device):
if device and device != "auto":
self._debug(
f"Hardware speech using configured device: {device}",
debug.DebugLevel.INFO,
on_any_level=True,
)
return [device]
devices = []
for pattern in ("/dev/ttyACM*", "/dev/ttyUSB*", "/dev/ttyS*"):
matches = sorted(glob.glob(pattern))
self._debug(
f"Hardware speech auto scan {pattern}: {matches}",
debug.DebugLevel.INFO,
on_any_level=True,
)
if matches:
if len(matches) > 1:
self._debug(
"Hardware speech auto found multiple devices for "
f"{pattern}: {matches}; probing in order",
debug.DebugLevel.WARNING,
on_any_level=True,
)
self._debug(
f"Hardware speech auto candidate devices: {matches}",
debug.DebugLevel.INFO,
on_any_level=True,
)
devices.extend(matches)
return devices
def _termios_baud_rate(self, baud_rate):
baud_name = f"B{baud_rate}"
if hasattr(termios, baud_name):
@@ -11,7 +11,6 @@ from fenrirscreenreader.speechDriver.hardwareSerialDriver import (
class driver(hardware_serial_driver):
cancel_command = b"\x18"
hardware_probe_command = b"\x01I"
def _speak_bytes(self, text):
return self._clean_text(text).encode("ascii", errors="replace") + b"\r"
@@ -27,10 +26,3 @@ class driver(hardware_serial_driver):
def _setting_command(self, value, command):
return b"\x01" + str(value).encode("ascii") + command
def _is_hardware_probe_response(self, response):
response_text = response.decode("ascii", errors="ignore").lower()
return any(
token in response_text
for token in ("litetalk", "lite talk", "rpitalk", "doubletalk")
)