New hardware synth support added. Untested, so consider this experimental.
This commit is contained in:
@@ -127,6 +127,8 @@ class config_command:
|
||||
self.config.set("speech", "rate", "0.75")
|
||||
self.config.set("speech", "pitch", "0.5")
|
||||
self.config.set("speech", "volume", "1.0")
|
||||
self.config.set("speech", "hardware_device", "auto")
|
||||
self.config.set("speech", "hardware_baud_rate", "9600")
|
||||
|
||||
self.config.add_section("sound")
|
||||
self.config.set("sound", "driver", "genericDriver")
|
||||
|
||||
@@ -108,6 +108,8 @@ class command(config_command):
|
||||
"rate": "0.5",
|
||||
"pitch": "0.5",
|
||||
"volume": "1.0",
|
||||
"hardware_device": "auto",
|
||||
"hardware_baud_rate": "9600",
|
||||
"auto_read_incoming": "True",
|
||||
}
|
||||
|
||||
|
||||
@@ -28,6 +28,8 @@ settings_data = {
|
||||
"module": "",
|
||||
"voice": "en-us",
|
||||
"language": "",
|
||||
"hardware_device": "auto",
|
||||
"hardware_baud_rate": 9600,
|
||||
"auto_read_incoming": True,
|
||||
"read_numbers_as_digits": False,
|
||||
"rapid_update_threshold": 5,
|
||||
|
||||
@@ -508,6 +508,10 @@ class SettingsManager:
|
||||
valid_drivers = [
|
||||
"speechdDriver",
|
||||
"genericDriver",
|
||||
"dectalkDriver",
|
||||
"doubletalkDriver",
|
||||
"litetalkDriver",
|
||||
"tripletalkDriver",
|
||||
"dummyDriver",
|
||||
]
|
||||
if value not in valid_drivers:
|
||||
|
||||
@@ -4,5 +4,5 @@
|
||||
# Fenrir TTY screen reader
|
||||
# By Chrys, Storm Dragon, and contributors.
|
||||
|
||||
version = "2026.05.19"
|
||||
version = "2026.05.20"
|
||||
code_name = "testing"
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Fenrir TTY screen reader
|
||||
# By Chrys, Storm Dragon, and contributors.
|
||||
|
||||
from fenrirscreenreader.speechDriver.hardwareSerialDriver import (
|
||||
hardware_serial_driver,
|
||||
)
|
||||
|
||||
|
||||
class driver(hardware_serial_driver):
|
||||
cancel_command = b"\x18"
|
||||
|
||||
def _speak_bytes(self, text):
|
||||
return self._clean_text(text).encode("ascii", errors="replace") + b"\x01"
|
||||
|
||||
def _rate_command(self, rate):
|
||||
return self._setting_command("ra", self._scale(rate, 75, 650))
|
||||
|
||||
def _pitch_command(self, pitch):
|
||||
return self._setting_command("dv ap", self._scale(pitch, 50, 180))
|
||||
|
||||
def _volume_command(self, volume):
|
||||
return self._setting_command("vo", self._scale(volume, 0, 100))
|
||||
|
||||
def _setting_command(self, command, value):
|
||||
return f"[:{command} {value}]".encode("ascii")
|
||||
@@ -0,0 +1,7 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Fenrir TTY screen reader
|
||||
# By Chrys, Storm Dragon, and contributors.
|
||||
|
||||
from fenrirscreenreader.speechDriver.litetalkDriver import driver
|
||||
@@ -0,0 +1,226 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Fenrir TTY screen reader
|
||||
# By Chrys, Storm Dragon, and contributors.
|
||||
|
||||
import glob
|
||||
import os
|
||||
import termios
|
||||
import threading
|
||||
import tty
|
||||
from queue import Empty
|
||||
from queue import Queue
|
||||
|
||||
from fenrirscreenreader.core import debug
|
||||
from fenrirscreenreader.core.speechDriver import speech_driver
|
||||
|
||||
|
||||
class SpeakQueue(Queue):
|
||||
def clear(self):
|
||||
try:
|
||||
while True:
|
||||
self.get_nowait()
|
||||
except Empty:
|
||||
pass
|
||||
|
||||
|
||||
class hardware_serial_driver(speech_driver):
|
||||
cancel_command = b""
|
||||
default_baud_rate = 9600
|
||||
|
||||
def __init__(self):
|
||||
speech_driver.__init__(self)
|
||||
self.device = ""
|
||||
self.baud_rate = self.default_baud_rate
|
||||
self.serial_port = None
|
||||
self.text_queue = SpeakQueue()
|
||||
self.lock = threading.Lock()
|
||||
self.worker_thread = None
|
||||
self._stop_worker = False
|
||||
|
||||
def initialize(self, environment):
|
||||
self.env = environment
|
||||
self._is_initialized = False
|
||||
settings_manager = self.env["runtime"]["SettingsManager"]
|
||||
self.device = settings_manager.get_setting(
|
||||
"speech", "hardware_device"
|
||||
)
|
||||
self.baud_rate = settings_manager.get_setting_as_int(
|
||||
"speech", "hardware_baud_rate"
|
||||
)
|
||||
self._open_serial_port()
|
||||
self._is_initialized = self.serial_port is not None
|
||||
if self._is_initialized:
|
||||
self._stop_worker = False
|
||||
self.worker_thread = threading.Thread(
|
||||
target=self._worker, daemon=True
|
||||
)
|
||||
self.worker_thread.start()
|
||||
|
||||
def shutdown(self):
|
||||
if not self._is_initialized:
|
||||
return
|
||||
self._stop_worker = True
|
||||
self.clear_buffer()
|
||||
self.text_queue.put(None)
|
||||
if self.worker_thread:
|
||||
self.worker_thread.join(timeout=0.5)
|
||||
self._close_serial_port()
|
||||
self._is_initialized = False
|
||||
|
||||
def speak(self, text, queueable=True, ignore_punctuation=False):
|
||||
if not self._is_initialized:
|
||||
return
|
||||
if not queueable:
|
||||
self.cancel()
|
||||
if not isinstance(text, str) or text == "":
|
||||
return
|
||||
self.text_queue.put(text)
|
||||
|
||||
def cancel(self):
|
||||
if not self._is_initialized:
|
||||
return
|
||||
self.clear_buffer()
|
||||
if self.cancel_command:
|
||||
self._write_bytes(self.cancel_command)
|
||||
|
||||
def clear_buffer(self):
|
||||
if not self._is_initialized:
|
||||
return
|
||||
self.text_queue.clear()
|
||||
|
||||
def set_rate(self, rate):
|
||||
if not self._is_initialized:
|
||||
return
|
||||
if not isinstance(rate, float):
|
||||
return
|
||||
self._write_bytes(self._rate_command(rate))
|
||||
|
||||
def set_pitch(self, pitch):
|
||||
if not self._is_initialized:
|
||||
return
|
||||
if not isinstance(pitch, float):
|
||||
return
|
||||
self._write_bytes(self._pitch_command(pitch))
|
||||
|
||||
def set_volume(self, volume):
|
||||
if not self._is_initialized:
|
||||
return
|
||||
if not isinstance(volume, float):
|
||||
return
|
||||
self._write_bytes(self._volume_command(volume))
|
||||
|
||||
def _worker(self):
|
||||
while not self._stop_worker:
|
||||
text = self.text_queue.get()
|
||||
if text is None:
|
||||
return
|
||||
self._write_bytes(self._speak_bytes(text))
|
||||
|
||||
def _open_serial_port(self):
|
||||
device = self._resolve_device(self.device)
|
||||
if not device:
|
||||
self._debug(
|
||||
"Hardware speech device not found",
|
||||
debug.DebugLevel.ERROR,
|
||||
)
|
||||
return
|
||||
try:
|
||||
port = os.open(device, os.O_RDWR | os.O_NOCTTY)
|
||||
tty.setraw(port)
|
||||
attrs = termios.tcgetattr(port)
|
||||
attrs[2] |= termios.CLOCAL | termios.CREAD
|
||||
baud_rate = self._termios_baud_rate(self.baud_rate)
|
||||
attrs[4] = baud_rate
|
||||
attrs[5] = baud_rate
|
||||
attrs[6][termios.VMIN] = 0
|
||||
attrs[6][termios.VTIME] = 0
|
||||
attrs[0] &= ~(termios.IXON | termios.IXOFF | termios.IXANY)
|
||||
termios.tcsetattr(port, termios.TCSANOW, attrs)
|
||||
self.serial_port = port
|
||||
self.device = device
|
||||
except OSError as error:
|
||||
self._debug(
|
||||
f"Hardware speech device open failed: {device}: {error}",
|
||||
debug.DebugLevel.ERROR,
|
||||
)
|
||||
self.serial_port = None
|
||||
|
||||
def _close_serial_port(self):
|
||||
with self.lock:
|
||||
if self.serial_port is None:
|
||||
return
|
||||
try:
|
||||
os.close(self.serial_port)
|
||||
except OSError as error:
|
||||
self._debug(
|
||||
f"Hardware speech device close failed: {error}",
|
||||
debug.DebugLevel.WARNING,
|
||||
)
|
||||
finally:
|
||||
self.serial_port = None
|
||||
|
||||
def _write_bytes(self, data):
|
||||
if not data:
|
||||
return
|
||||
with self.lock:
|
||||
if self.serial_port is None:
|
||||
return
|
||||
try:
|
||||
os.write(self.serial_port, data)
|
||||
except OSError as error:
|
||||
self._debug(
|
||||
f"Hardware speech write failed: {error}",
|
||||
debug.DebugLevel.ERROR,
|
||||
)
|
||||
|
||||
def _resolve_device(self, device):
|
||||
if device and device != "auto":
|
||||
return device
|
||||
for pattern in ("/dev/ttyACM*", "/dev/ttyUSB*"):
|
||||
matches = sorted(glob.glob(pattern))
|
||||
if matches:
|
||||
return matches[0]
|
||||
return ""
|
||||
|
||||
def _termios_baud_rate(self, baud_rate):
|
||||
baud_name = f"B{baud_rate}"
|
||||
if hasattr(termios, baud_name):
|
||||
return getattr(termios, baud_name)
|
||||
self._debug(
|
||||
f"Unsupported hardware speech baud rate {baud_rate}; using 9600",
|
||||
debug.DebugLevel.WARNING,
|
||||
)
|
||||
return termios.B9600
|
||||
|
||||
def _clean_text(self, text):
|
||||
text = text.replace("\r", " ").replace("\n", " ")
|
||||
return "".join(
|
||||
char if 0x20 <= ord(char) <= 0x7E else " "
|
||||
for char in text
|
||||
)
|
||||
|
||||
def _scale(self, value, minimum, maximum):
|
||||
value = max(0.0, min(1.0, value))
|
||||
return int(round(minimum + value * (maximum - minimum)))
|
||||
|
||||
def _debug(self, message, level):
|
||||
try:
|
||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||
message, level
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _speak_bytes(self, text):
|
||||
raise NotImplementedError
|
||||
|
||||
def _rate_command(self, rate):
|
||||
return b""
|
||||
|
||||
def _pitch_command(self, pitch):
|
||||
return b""
|
||||
|
||||
def _volume_command(self, volume):
|
||||
return b""
|
||||
@@ -0,0 +1,28 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Fenrir TTY screen reader
|
||||
# By Chrys, Storm Dragon, and contributors.
|
||||
|
||||
from fenrirscreenreader.speechDriver.hardwareSerialDriver import (
|
||||
hardware_serial_driver,
|
||||
)
|
||||
|
||||
|
||||
class driver(hardware_serial_driver):
|
||||
cancel_command = b"\x18"
|
||||
|
||||
def _speak_bytes(self, text):
|
||||
return self._clean_text(text).encode("ascii", errors="replace") + b"\r"
|
||||
|
||||
def _rate_command(self, rate):
|
||||
return self._setting_command(self._scale(rate, 0, 9), b"S")
|
||||
|
||||
def _pitch_command(self, pitch):
|
||||
return self._setting_command(self._scale(pitch, 0, 99), b"P")
|
||||
|
||||
def _volume_command(self, volume):
|
||||
return self._setting_command(self._scale(volume, 0, 9), b"V")
|
||||
|
||||
def _setting_command(self, value, command):
|
||||
return b"\x01" + str(value).encode("ascii") + command
|
||||
@@ -0,0 +1,7 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Fenrir TTY screen reader
|
||||
# By Chrys, Storm Dragon, and contributors.
|
||||
|
||||
from fenrirscreenreader.speechDriver.litetalkDriver import driver
|
||||
Reference in New Issue
Block a user