Compare commits
18 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| bd54ec0edb | |||
| b9518f52ec | |||
| c143c9a561 | |||
| 7e2f927596 | |||
| 788e678ed6 | |||
| ea89e90c2f | |||
| ce43d64e77 | |||
| 618987546a | |||
| 604221a29d | |||
| 89b85c6f17 | |||
| 6e3d7fee94 | |||
| 089850ac18 | |||
| 5b7c08260a | |||
| d4b2fec1db | |||
| 1f7aa99cc0 | |||
| d853e1b24d | |||
| e8bc34eaf5 | |||
| f84167a7fb |
@@ -460,13 +460,14 @@ setting <action> [parameters]
|
||||
- `speech#voice=voice_name` - Voice selection (e.g., "en-us+f3")
|
||||
- `speech#module=module_name` - TTS module (e.g., "espeak-ng")
|
||||
- `speech#driver=driver_name` - Speech driver (speechdDriver/genericDriver/dectalkDriver/litetalkDriver/doubletalkDriver/tripletalkDriver)
|
||||
- `speech#hardware_device=auto` - Hardware synth serial device for dectalkDriver/litetalkDriver
|
||||
- `speech#hardware_device=/dev/ttyS0` - Hardware synth serial device for dectalkDriver/litetalkDriver
|
||||
- `speech#hardware_baud_rate=9600` - Hardware synth serial baud rate
|
||||
- `speech#history_size=50` - Number of spoken items kept in runtime speech history
|
||||
|
||||
USB hardware synths are supported only when Linux exposes them as a serial tty
|
||||
such as `/dev/ttyACM0` or `/dev/ttyUSB0`. A USB-only TripleTalk with no tty
|
||||
device would require a separate USB protocol driver.
|
||||
device would require a separate USB protocol driver. Use an explicit
|
||||
`speech#hardware_device` path for hardware speech.
|
||||
- `speech#auto_read_incoming=True/False` - Auto-read new text
|
||||
|
||||
*Sound Settings:*
|
||||
|
||||
@@ -79,12 +79,12 @@ volume=1.0
|
||||
# Used by dectalkDriver, litetalkDriver, doubletalkDriver, and tripletalkDriver.
|
||||
# USB serial devices are supported if Linux exposes them as /dev/ttyACM*
|
||||
# or /dev/ttyUSB*. USB-only synths with no tty device need a separate driver.
|
||||
# auto checks /dev/ttyACM* first, then /dev/ttyUSB*.
|
||||
# Set an explicit device for hardware speech.
|
||||
# Examples:
|
||||
# hardware_device=/dev/ttyACM0 # RPITalk USB gadget mode
|
||||
# hardware_device=/dev/ttyUSB0 # USB serial adapter
|
||||
# hardware_device=/dev/ttyS0 # built-in serial port
|
||||
hardware_device=auto
|
||||
hardware_device=/dev/ttyS0
|
||||
|
||||
# Serial baud rate for hardware speech synthesizers.
|
||||
hardware_baud_rate=9600
|
||||
|
||||
+1
-4
@@ -1684,12 +1684,9 @@ the pico module:
|
||||
language=de-DE
|
||||
....
|
||||
|
||||
Hardware speech drivers use a serial device. The default `+auto+` checks
|
||||
`+/dev/ttyACM*+` first, then `+/dev/ttyUSB*+`. Set an explicit path for
|
||||
stable systems.
|
||||
Hardware speech drivers use a serial device. Set an explicit path.
|
||||
|
||||
....
|
||||
hardware_device=auto
|
||||
hardware_device=/dev/ttyACM0
|
||||
hardware_device=/dev/ttyUSB0
|
||||
hardware_device=/dev/ttyS0
|
||||
|
||||
+5
-5
@@ -101,7 +101,7 @@ driver=speechdDriver
|
||||
rate=0.5
|
||||
pitch=0.5
|
||||
volume=1.0
|
||||
hardware_device=auto
|
||||
hardware_device=/dev/ttyS0
|
||||
hardware_baud_rate=9600
|
||||
history_size=50
|
||||
|
||||
@@ -341,10 +341,10 @@ Fenrir automatically detects and provides audio feedback for progress indicators
|
||||
- **doubletalkDriver** - Serial DoubleTalk LT-compatible hardware speech
|
||||
- **tripletalkDriver** - Serial TripleTalk-compatible hardware speech
|
||||
|
||||
For hardware speech, set `speech#hardware_device` to `auto` or an explicit
|
||||
serial path. RPITalk gadget mode usually appears as `/dev/ttyACM0`; USB serial
|
||||
adapters usually appear as `/dev/ttyUSB0`; built-in serial ports may be
|
||||
`/dev/ttyS0`. The default baud rate is `9600`. `doubletalkDriver` targets
|
||||
For hardware speech, set `speech#hardware_device` to an explicit serial path.
|
||||
RPITalk gadget mode usually appears as `/dev/ttyACM0`; USB serial adapters
|
||||
usually appear as `/dev/ttyUSB0`; built-in serial ports may be `/dev/ttyS0`.
|
||||
The default baud rate is `9600`. `doubletalkDriver` targets
|
||||
DoubleTalk LT-style serial devices, not the internal DoubleTalk PC ISA card.
|
||||
USB TripleTalk devices work only if Linux exposes them as a serial tty such as
|
||||
`/dev/ttyACM0` or `/dev/ttyUSB0`; USB-only models with no tty device need a
|
||||
|
||||
+1
-2
@@ -927,8 +927,7 @@ Select the language you want Fenrir to use.
|
||||
language=english-us
|
||||
Values: Text, see your TTS synths documentation what is available.
|
||||
|
||||
Hardware speech drivers use a serial device. The default ''auto'' checks /dev/ttyACM* first, then /dev/ttyUSB*. Set an explicit path for stable systems.
|
||||
hardware_device=auto
|
||||
Hardware speech drivers use a serial device. Set an explicit path.
|
||||
hardware_device=/dev/ttyACM0
|
||||
hardware_device=/dev/ttyUSB0
|
||||
hardware_device=/dev/ttyS0
|
||||
|
||||
@@ -82,11 +82,12 @@ class command:
|
||||
if (
|
||||
delta_length > 200
|
||||
): # Allow longer progress lines like Claude Code's status
|
||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||
f"Progress filter: delta too long ({delta_length})",
|
||||
debug.DebugLevel.INFO,
|
||||
)
|
||||
return False
|
||||
if not self.is_explicit_progress_delta(delta_text):
|
||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||
f"Progress filter: delta too long ({delta_length})",
|
||||
debug.DebugLevel.INFO,
|
||||
)
|
||||
return False
|
||||
|
||||
# If delta contains newlines and is substantial, let incoming handler
|
||||
# deal with it to avoid interfering with multi-line text output
|
||||
@@ -107,6 +108,25 @@ class command:
|
||||
|
||||
return True
|
||||
|
||||
def is_explicit_progress_delta(self, text):
|
||||
"""Allow long single-line deltas that still look like progress output."""
|
||||
import re
|
||||
|
||||
if "\n" in text or self.contains_url(text):
|
||||
return False
|
||||
|
||||
has_percentage = re.search(r"(^|\s)\d+(?:\.\d+)?\s*%", text)
|
||||
if not has_percentage:
|
||||
return False
|
||||
|
||||
return bool(
|
||||
re.search(
|
||||
r"[|\[\]#=*>█▉▊▋▌▍▎▏▒▓░]"
|
||||
r"|\b\d+(?:\.\d+)?\s*[kKmMgGtT](?:i?B)?/s\b",
|
||||
text,
|
||||
)
|
||||
)
|
||||
|
||||
def reset_progress_state(self):
|
||||
"""Reset progress state when a prompt is detected, allowing new progress operations to start fresh"""
|
||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||
|
||||
@@ -127,7 +127,7 @@ class config_command:
|
||||
self.config.set("speech", "rate", "0.75")
|
||||
self.config.set("speech", "pitch", "0.5")
|
||||
self.config.set("speech", "volume", "1.0")
|
||||
self.config.set("speech", "hardware_device", "auto")
|
||||
self.config.set("speech", "hardware_device", "/dev/ttyS0")
|
||||
self.config.set("speech", "hardware_baud_rate", "9600")
|
||||
|
||||
self.config.add_section("sound")
|
||||
|
||||
+1
-1
@@ -108,7 +108,7 @@ class command(config_command):
|
||||
"rate": "0.5",
|
||||
"pitch": "0.5",
|
||||
"volume": "1.0",
|
||||
"hardware_device": "auto",
|
||||
"hardware_device": "/dev/ttyS0",
|
||||
"hardware_baud_rate": "9600",
|
||||
"auto_read_incoming": "True",
|
||||
}
|
||||
|
||||
@@ -311,7 +311,10 @@ class FenrirManager:
|
||||
self.singleKeyCommand = True
|
||||
elif (
|
||||
(
|
||||
self.environment["runtime"]["DiffReviewManager"].is_active()
|
||||
self.environment["runtime"]["VmenuManager"].get_active()
|
||||
or self.environment["runtime"][
|
||||
"DiffReviewManager"
|
||||
].is_active()
|
||||
or self.environment["runtime"][
|
||||
"SpeechHistoryManager"
|
||||
].is_active()
|
||||
|
||||
@@ -28,7 +28,7 @@ settings_data = {
|
||||
"module": "",
|
||||
"voice": "en-us",
|
||||
"language": "",
|
||||
"hardware_device": "auto",
|
||||
"hardware_device": "/dev/ttyS0",
|
||||
"hardware_baud_rate": 9600,
|
||||
"auto_read_incoming": True,
|
||||
"read_numbers_as_digits": False,
|
||||
|
||||
@@ -164,7 +164,7 @@ class SpeechHistoryManager:
|
||||
str([1, ["KEY_KPENTER"]]): "SPEECH_HISTORY_COPY",
|
||||
str([1, ["KEY_ESC"]]): "SPEECH_HISTORY_CLOSE",
|
||||
}
|
||||
self.env["rawBindings"] = {
|
||||
modal_raw_bindings = {
|
||||
str([1, ["KEY_UP"]]): [1, ["KEY_UP"]],
|
||||
str([1, ["KEY_DOWN"]]): [1, ["KEY_DOWN"]],
|
||||
str([1, ["KEY_SPACE"]]): [1, ["KEY_SPACE"]],
|
||||
@@ -172,6 +172,9 @@ class SpeechHistoryManager:
|
||||
str([1, ["KEY_KPENTER"]]): [1, ["KEY_KPENTER"]],
|
||||
str([1, ["KEY_ESC"]]): [1, ["KEY_ESC"]],
|
||||
}
|
||||
self.env["rawBindings"] = self.raw_bindings_backup.copy()
|
||||
self.env["rawBindings"].update(modal_raw_bindings)
|
||||
self._refresh_input_bindings()
|
||||
|
||||
def _restore_bindings(self):
|
||||
if self.bindings_backup is not None:
|
||||
@@ -180,3 +183,21 @@ class SpeechHistoryManager:
|
||||
self.env["rawBindings"] = self.raw_bindings_backup
|
||||
self.bindings_backup = None
|
||||
self.raw_bindings_backup = None
|
||||
self._reset_input_state()
|
||||
self._refresh_input_bindings()
|
||||
|
||||
def _reset_input_state(self):
|
||||
try:
|
||||
self.env["runtime"]["InputManager"].reset_input_state()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _refresh_input_bindings(self):
|
||||
try:
|
||||
refresh_grabs = getattr(
|
||||
self.env["runtime"]["InputDriver"], "refresh_grabs", None
|
||||
)
|
||||
if refresh_grabs:
|
||||
refresh_grabs(force=True)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@@ -4,5 +4,5 @@
|
||||
# Fenrir TTY screen reader
|
||||
# By Chrys, Storm Dragon, and contributors.
|
||||
|
||||
version = "2026.05.21"
|
||||
code_name = "testing"
|
||||
version = "2026.05.24"
|
||||
code_name = "master"
|
||||
|
||||
@@ -333,10 +333,79 @@ class driver(screenDriver):
|
||||
def handle_stdin_input(self, msg_bytes, event_queue):
|
||||
if self.synthesize_backspace_shortcut(msg_bytes, event_queue):
|
||||
return
|
||||
if self.handle_vmenu_stdin_input(msg_bytes, event_queue):
|
||||
return
|
||||
self.record_stdin_keypress(msg_bytes)
|
||||
self.interrupt_output_on_stdin_input(msg_bytes)
|
||||
self.inject_text_to_screen(msg_bytes)
|
||||
|
||||
def handle_vmenu_stdin_input(self, msg_bytes, event_queue):
|
||||
if not self.is_vmenu_active():
|
||||
return False
|
||||
key_name = self.vmenu_stdin_key_name(msg_bytes)
|
||||
if key_name and not self.vmenu_key_already_handled(key_name):
|
||||
self.queue_keypress(key_name, event_queue)
|
||||
return True
|
||||
|
||||
def is_vmenu_active(self):
|
||||
try:
|
||||
return self.env["runtime"]["VmenuManager"].get_active()
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def vmenu_stdin_key_name(self, msg_bytes):
|
||||
key_map = {
|
||||
b"\x1b": "KEY_ESC",
|
||||
b"\x1b[A": "KEY_UP",
|
||||
b"\x1b[B": "KEY_DOWN",
|
||||
b"\x1b[C": "KEY_RIGHT",
|
||||
b"\x1b[D": "KEY_LEFT",
|
||||
b"\x1b[5~": "KEY_PAGEUP",
|
||||
b"\x1b[6~": "KEY_PAGEDOWN",
|
||||
b"\r": "KEY_ENTER",
|
||||
b"\n": "KEY_ENTER",
|
||||
b" ": "KEY_SPACE",
|
||||
}
|
||||
if msg_bytes in key_map:
|
||||
return key_map[msg_bytes]
|
||||
if len(msg_bytes) != 1:
|
||||
return None
|
||||
char = chr(msg_bytes[0])
|
||||
if "a" <= char <= "z" or "A" <= char <= "Z":
|
||||
return "KEY_" + char.upper()
|
||||
return None
|
||||
|
||||
def vmenu_key_already_handled(self, key_name):
|
||||
try:
|
||||
return key_name in self.env["input"]["curr_input"]
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def queue_keypress(self, key_name, event_queue):
|
||||
event_time = time.time()
|
||||
for event_state in [1, 0]:
|
||||
try:
|
||||
event_queue.put(
|
||||
{
|
||||
"Type": FenrirEventType.keyboard_input,
|
||||
"data": {
|
||||
"event_name": key_name,
|
||||
"event_value": 0,
|
||||
"event_sec": int(event_time),
|
||||
"event_usec": int((event_time % 1) * 1000000),
|
||||
"event_state": event_state,
|
||||
"event_type": 0,
|
||||
},
|
||||
},
|
||||
block=False,
|
||||
)
|
||||
except Full:
|
||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||
"ptyDriver queue_keypress: Event queue full, dropping "
|
||||
+ key_name,
|
||||
debug.DebugLevel.WARNING,
|
||||
)
|
||||
|
||||
def record_stdin_keypress(self, msg_bytes):
|
||||
if msg_bytes != b"\t":
|
||||
return
|
||||
|
||||
@@ -4,7 +4,6 @@
|
||||
# Fenrir TTY screen reader
|
||||
# By Chrys, Storm Dragon, and contributors.
|
||||
|
||||
import glob
|
||||
import os
|
||||
import termios
|
||||
import threading
|
||||
@@ -43,14 +42,22 @@ class hardware_serial_driver(speech_driver):
|
||||
self.env = environment
|
||||
self._is_initialized = False
|
||||
settings_manager = self.env["runtime"]["SettingsManager"]
|
||||
self.device = settings_manager.get_setting(
|
||||
"speech", "hardware_device"
|
||||
self.device = self._clean_device_setting(
|
||||
settings_manager.get_setting("speech", "hardware_device")
|
||||
)
|
||||
self.baud_rate = settings_manager.get_setting_as_int(
|
||||
"speech", "hardware_baud_rate"
|
||||
)
|
||||
self._debug(
|
||||
"Hardware speech initialize: "
|
||||
f"requested_device={self.device}, baud_rate={self.baud_rate}",
|
||||
debug.DebugLevel.INFO,
|
||||
on_any_level=True,
|
||||
)
|
||||
self._open_serial_port()
|
||||
self._is_initialized = self.serial_port is not None
|
||||
if not self._is_initialized:
|
||||
raise RuntimeError("hardware speech device is not available")
|
||||
if self._is_initialized:
|
||||
self._stop_worker = False
|
||||
self.worker_thread = threading.Thread(
|
||||
@@ -58,6 +65,12 @@ class hardware_serial_driver(speech_driver):
|
||||
)
|
||||
self.worker_thread.start()
|
||||
|
||||
def _clean_device_setting(self, device):
|
||||
if not isinstance(device, str):
|
||||
return ""
|
||||
device = device.split("#", 1)[0].split(";", 1)[0].strip()
|
||||
return device
|
||||
|
||||
def shutdown(self):
|
||||
if not self._is_initialized:
|
||||
return
|
||||
@@ -76,6 +89,12 @@ class hardware_serial_driver(speech_driver):
|
||||
self.cancel()
|
||||
if not isinstance(text, str) or text == "":
|
||||
return
|
||||
self._debug(
|
||||
"Hardware speech queued text: "
|
||||
f"{len(text)} chars, queue_size={self.text_queue.qsize()}",
|
||||
debug.DebugLevel.INFO,
|
||||
on_any_level=True,
|
||||
)
|
||||
self.text_queue.put(text)
|
||||
|
||||
def cancel(self):
|
||||
@@ -83,7 +102,7 @@ class hardware_serial_driver(speech_driver):
|
||||
return
|
||||
self.clear_buffer()
|
||||
if self.cancel_command:
|
||||
self._write_bytes(self.cancel_command)
|
||||
self._write_bytes(self.cancel_command, "cancel")
|
||||
|
||||
def clear_buffer(self):
|
||||
if not self._is_initialized:
|
||||
@@ -95,37 +114,58 @@ class hardware_serial_driver(speech_driver):
|
||||
return
|
||||
if not isinstance(rate, float):
|
||||
return
|
||||
self._write_bytes(self._rate_command(rate))
|
||||
self._write_bytes(self._rate_command(rate), "rate")
|
||||
|
||||
def set_pitch(self, pitch):
|
||||
if not self._is_initialized:
|
||||
return
|
||||
if not isinstance(pitch, float):
|
||||
return
|
||||
self._write_bytes(self._pitch_command(pitch))
|
||||
self._write_bytes(self._pitch_command(pitch), "pitch")
|
||||
|
||||
def set_volume(self, volume):
|
||||
if not self._is_initialized:
|
||||
return
|
||||
if not isinstance(volume, float):
|
||||
return
|
||||
self._write_bytes(self._volume_command(volume))
|
||||
self._write_bytes(self._volume_command(volume), "volume")
|
||||
|
||||
def _worker(self):
|
||||
while not self._stop_worker:
|
||||
text = self.text_queue.get()
|
||||
if text is None:
|
||||
return
|
||||
self._write_bytes(self._speak_bytes(text))
|
||||
try:
|
||||
data = self._speak_bytes(text)
|
||||
self._debug(
|
||||
"Hardware speech worker prepared speech bytes: "
|
||||
f"{len(data)} bytes",
|
||||
debug.DebugLevel.INFO,
|
||||
on_any_level=True,
|
||||
)
|
||||
self._write_bytes(data, "speech")
|
||||
except Exception as error:
|
||||
self._debug(
|
||||
f"Hardware speech worker failed: {error}",
|
||||
debug.DebugLevel.ERROR,
|
||||
on_any_level=True,
|
||||
)
|
||||
|
||||
def _open_serial_port(self):
|
||||
device = self._resolve_device(self.device)
|
||||
if not device:
|
||||
if not self.device or self.device == "auto":
|
||||
self._debug(
|
||||
"Hardware speech device not found",
|
||||
"Hardware speech requires an explicit serial device",
|
||||
debug.DebugLevel.ERROR,
|
||||
on_any_level=True,
|
||||
)
|
||||
return
|
||||
|
||||
port = self._open_configured_serial_port(self.device)
|
||||
if port is not None:
|
||||
self._activate_serial_port(self.device, port)
|
||||
|
||||
def _open_configured_serial_port(self, device):
|
||||
port = None
|
||||
try:
|
||||
port = os.open(device, os.O_RDWR | os.O_NOCTTY)
|
||||
tty.setraw(port)
|
||||
@@ -138,52 +178,73 @@ class hardware_serial_driver(speech_driver):
|
||||
attrs[6][termios.VTIME] = 0
|
||||
attrs[0] &= ~(termios.IXON | termios.IXOFF | termios.IXANY)
|
||||
termios.tcsetattr(port, termios.TCSANOW, attrs)
|
||||
self.serial_port = port
|
||||
self.device = device
|
||||
except OSError as error:
|
||||
return port
|
||||
except (OSError, termios.error) as error:
|
||||
self._close_port(port)
|
||||
self._debug(
|
||||
f"Hardware speech device open failed: {device}: {error}",
|
||||
debug.DebugLevel.ERROR,
|
||||
on_any_level=True,
|
||||
)
|
||||
self.serial_port = None
|
||||
return None
|
||||
|
||||
def _activate_serial_port(self, device, port):
|
||||
self.serial_port = port
|
||||
self.device = device
|
||||
self._debug(
|
||||
"Hardware speech device opened: "
|
||||
f"{device}, baud_rate={self.baud_rate}",
|
||||
debug.DebugLevel.INFO,
|
||||
on_any_level=True,
|
||||
)
|
||||
|
||||
def _close_serial_port(self):
|
||||
with self.lock:
|
||||
if self.serial_port is None:
|
||||
return
|
||||
try:
|
||||
os.close(self.serial_port)
|
||||
except OSError as error:
|
||||
self._debug(
|
||||
f"Hardware speech device close failed: {error}",
|
||||
debug.DebugLevel.WARNING,
|
||||
)
|
||||
finally:
|
||||
self.serial_port = None
|
||||
self._close_port(self.serial_port)
|
||||
self.serial_port = None
|
||||
|
||||
def _write_bytes(self, data):
|
||||
def _close_port(self, port):
|
||||
if port is None:
|
||||
return
|
||||
try:
|
||||
os.close(port)
|
||||
except OSError as error:
|
||||
self._debug(
|
||||
f"Hardware speech device close failed: {error}",
|
||||
debug.DebugLevel.WARNING,
|
||||
)
|
||||
|
||||
def _write_bytes(self, data, description="data"):
|
||||
if not data:
|
||||
return
|
||||
with self.lock:
|
||||
if self.serial_port is None:
|
||||
return
|
||||
try:
|
||||
os.write(self.serial_port, data)
|
||||
total_written = 0
|
||||
while total_written < len(data):
|
||||
bytes_written = os.write(
|
||||
self.serial_port, data[total_written:]
|
||||
)
|
||||
if bytes_written == 0:
|
||||
raise OSError("serial write returned 0 bytes")
|
||||
total_written += bytes_written
|
||||
preview = self._format_bytes_preview(data)
|
||||
self._debug(
|
||||
"Hardware speech wrote "
|
||||
f"{total_written} {description} bytes: {preview}",
|
||||
debug.DebugLevel.INFO,
|
||||
on_any_level=True,
|
||||
)
|
||||
except OSError as error:
|
||||
self._debug(
|
||||
f"Hardware speech write failed: {error}",
|
||||
debug.DebugLevel.ERROR,
|
||||
on_any_level=True,
|
||||
)
|
||||
|
||||
def _resolve_device(self, device):
|
||||
if device and device != "auto":
|
||||
return device
|
||||
for pattern in ("/dev/ttyACM*", "/dev/ttyUSB*"):
|
||||
matches = sorted(glob.glob(pattern))
|
||||
if matches:
|
||||
return matches[0]
|
||||
return ""
|
||||
|
||||
def _termios_baud_rate(self, baud_rate):
|
||||
baud_name = f"B{baud_rate}"
|
||||
if hasattr(termios, baud_name):
|
||||
@@ -205,10 +266,23 @@ class hardware_serial_driver(speech_driver):
|
||||
value = max(0.0, min(1.0, value))
|
||||
return int(round(minimum + value * (maximum - minimum)))
|
||||
|
||||
def _debug(self, message, level):
|
||||
def _format_bytes_preview(self, data, limit=32):
|
||||
preview = data[:limit]
|
||||
hex_preview = " ".join(f"{byte:02x}" for byte in preview)
|
||||
ascii_preview = "".join(
|
||||
chr(byte) if 0x20 <= byte <= 0x7E else "."
|
||||
for byte in preview
|
||||
)
|
||||
suffix = "" if len(data) <= limit else " ..."
|
||||
return (
|
||||
f"hex=[{hex_preview}{suffix}] "
|
||||
f"ascii=[{ascii_preview}{suffix}]"
|
||||
)
|
||||
|
||||
def _debug(self, message, level, on_any_level=False):
|
||||
try:
|
||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||
message, level
|
||||
message, level, on_any_level=on_any_level
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@@ -22,6 +22,7 @@ def test_speech_history_plain_key_modal_command_is_dispatched():
|
||||
)
|
||||
speech_history_manager = Mock(is_active=Mock(return_value=True))
|
||||
diff_review_manager = Mock(is_active=Mock(return_value=False))
|
||||
vmenu_manager = Mock(get_active=Mock(return_value=False))
|
||||
|
||||
manager.environment = {
|
||||
"input": {
|
||||
@@ -32,6 +33,7 @@ def test_speech_history_plain_key_modal_command_is_dispatched():
|
||||
"runtime": {
|
||||
"InputManager": input_manager,
|
||||
"EventManager": event_manager,
|
||||
"VmenuManager": vmenu_manager,
|
||||
"DiffReviewManager": diff_review_manager,
|
||||
"SpeechHistoryManager": speech_history_manager,
|
||||
},
|
||||
@@ -42,3 +44,43 @@ def test_speech_history_plain_key_modal_command_is_dispatched():
|
||||
event_manager.put_to_event_queue.assert_called_once_with(
|
||||
FenrirEventType.execute_command, "SPEECH_HISTORY_PREV"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_vmenu_plain_key_modal_command_is_dispatched():
|
||||
manager = FenrirManager.__new__(FenrirManager)
|
||||
manager.modifierInput = False
|
||||
manager.singleKeyCommand = False
|
||||
manager.command = ""
|
||||
|
||||
event_manager = Mock(put_to_event_queue=Mock())
|
||||
input_manager = Mock(
|
||||
is_key_press=Mock(return_value=False),
|
||||
no_key_pressed=Mock(return_value=False),
|
||||
get_curr_shortcut=Mock(return_value=str([1, ["KEY_UP"]])),
|
||||
get_command_for_shortcut=Mock(return_value="PREV_VMENU_ENTRY"),
|
||||
)
|
||||
vmenu_manager = Mock(get_active=Mock(return_value=True))
|
||||
speech_history_manager = Mock(is_active=Mock(return_value=False))
|
||||
diff_review_manager = Mock(is_active=Mock(return_value=False))
|
||||
|
||||
manager.environment = {
|
||||
"input": {
|
||||
"key_forward": 0,
|
||||
"prev_input": ["KEY_UP"],
|
||||
"curr_input": ["KEY_UP"],
|
||||
},
|
||||
"runtime": {
|
||||
"InputManager": input_manager,
|
||||
"EventManager": event_manager,
|
||||
"VmenuManager": vmenu_manager,
|
||||
"DiffReviewManager": diff_review_manager,
|
||||
"SpeechHistoryManager": speech_history_manager,
|
||||
},
|
||||
}
|
||||
|
||||
manager.detect_shortcut_command()
|
||||
|
||||
event_manager.put_to_event_queue.assert_called_once_with(
|
||||
FenrirEventType.execute_command, "PREV_VMENU_ENTRY"
|
||||
)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import os
|
||||
import select
|
||||
import time
|
||||
from unittest.mock import ANY
|
||||
from unittest.mock import Mock
|
||||
|
||||
import pytest
|
||||
@@ -103,6 +104,67 @@ def test_litetalk_driver_writes_settings_and_cancel(serial_pair):
|
||||
speech_driver.shutdown()
|
||||
|
||||
|
||||
def test_configured_device_supports_classic_serial(serial_pair):
|
||||
master_fd, slave_name = serial_pair
|
||||
speech_driver = litetalkDriver.driver()
|
||||
speech_driver.initialize(build_environment(slave_name))
|
||||
try:
|
||||
assert speech_driver.device == slave_name
|
||||
speech_driver.speak("Serial")
|
||||
assert read_available(master_fd, 7) == b"Serial\r"
|
||||
finally:
|
||||
speech_driver.shutdown()
|
||||
|
||||
|
||||
def test_configured_device_strips_inline_comment(serial_pair):
|
||||
master_fd, slave_name = serial_pair
|
||||
device_setting = f"{slave_name} # built-in serial port"
|
||||
speech_driver = litetalkDriver.driver()
|
||||
speech_driver.initialize(build_environment(device_setting))
|
||||
try:
|
||||
assert speech_driver.device == slave_name
|
||||
speech_driver.speak("Specific")
|
||||
assert read_available(master_fd, 9) == b"Specific\r"
|
||||
finally:
|
||||
speech_driver.shutdown()
|
||||
|
||||
|
||||
def test_auto_device_is_rejected():
|
||||
speech_driver = litetalkDriver.driver()
|
||||
|
||||
with pytest.raises(RuntimeError, match="hardware speech device"):
|
||||
speech_driver.initialize(build_environment("auto"))
|
||||
|
||||
debug_manager = speech_driver.env["runtime"]["DebugManager"]
|
||||
debug_manager.write_debug_out.assert_called_with(
|
||||
"Hardware speech requires an explicit serial device",
|
||||
ANY,
|
||||
on_any_level=True,
|
||||
)
|
||||
|
||||
|
||||
def test_hardware_driver_retries_partial_serial_writes(monkeypatch):
|
||||
speech_driver = litetalkDriver.driver()
|
||||
speech_driver.env = build_environment("/dev/ttyUSB0")
|
||||
speech_driver.serial_port = 12
|
||||
written_chunks = []
|
||||
|
||||
def fake_write(port, data):
|
||||
assert port == 12
|
||||
chunk = data[:2]
|
||||
written_chunks.append(chunk)
|
||||
return len(chunk)
|
||||
|
||||
monkeypatch.setattr(
|
||||
"fenrirscreenreader.speechDriver.hardwareSerialDriver.os.write",
|
||||
fake_write,
|
||||
)
|
||||
|
||||
speech_driver._write_bytes(b"abcdef", "speech")
|
||||
|
||||
assert written_chunks == [b"ab", b"cd", b"ef"]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("driver_class", [doubletalkDriver, tripletalkDriver])
|
||||
def test_litetalk_compatible_alias_drivers(driver_class, serial_pair):
|
||||
speech_driver, master_fd = initialized_driver(driver_class, serial_pair)
|
||||
|
||||
@@ -39,3 +39,73 @@ def test_progress_detector_skips_typing_delta():
|
||||
|
||||
command.is_real_progress_update.assert_not_called()
|
||||
command.detect_progress.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_progress_detector_allows_long_tqdm_transfer_delta():
|
||||
progress_module = _load_progress_module()
|
||||
command = progress_module.command()
|
||||
sample = (
|
||||
"88%|"
|
||||
"████████████████████████████████████████████████████████████████"
|
||||
"████████████████████████████████████████████████████████████████"
|
||||
"████████████████████████████████████████████████████████████████"
|
||||
"█████████████████████████▊ "
|
||||
"| 843M/954M [00:54<00:07, 15.2MB/s]"
|
||||
)
|
||||
command.env = {
|
||||
"commandBuffer": {"progress_monitoring": True},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"ScreenManager": Mock(is_screen_change=Mock(return_value=False)),
|
||||
"CursorManager": Mock(is_cursor_vertical_move=Mock(return_value=False)),
|
||||
},
|
||||
"screen": {
|
||||
"new_delta": sample,
|
||||
"new_content_text": sample,
|
||||
"old_cursor": {"x": 0, "y": 0},
|
||||
"new_cursor": {"x": 0, "y": 0},
|
||||
},
|
||||
}
|
||||
|
||||
assert len(sample) > 200
|
||||
assert command.is_real_progress_update()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_progress_detector_beeps_for_long_tqdm_transfer_delta():
|
||||
progress_module = _load_progress_module()
|
||||
command = progress_module.command()
|
||||
sample = (
|
||||
"90%|"
|
||||
"████████████████████████████████████████████████████████████████"
|
||||
"████████████████████████████████████████████████████████████████"
|
||||
"████████████████████████████████████████████████████████████████"
|
||||
"█████████████████████████████████████▍ "
|
||||
"| 856M/954M [00:56<00:14, 6.78MB/s]"
|
||||
)
|
||||
command.env = {
|
||||
"commandBuffer": {
|
||||
"progress_monitoring": True,
|
||||
"lastProgressValue": -1,
|
||||
"lastProgressTime": 0,
|
||||
},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"ScreenManager": Mock(is_screen_change=Mock(return_value=False)),
|
||||
"CursorManager": Mock(is_cursor_vertical_move=Mock(return_value=False)),
|
||||
},
|
||||
"screen": {
|
||||
"new_delta": sample,
|
||||
"new_delta_is_typing": False,
|
||||
"new_content_text": sample,
|
||||
"old_cursor": {"x": 0, "y": 0},
|
||||
"new_cursor": {"x": 0, "y": 0},
|
||||
},
|
||||
}
|
||||
command.play_progress_tone = Mock()
|
||||
|
||||
command.run()
|
||||
|
||||
command.play_progress_tone.assert_called_once_with(90.0)
|
||||
assert command.env["commandBuffer"]["lastProgressValue"] == 90.0
|
||||
|
||||
@@ -160,6 +160,90 @@ def test_pty_plain_stdin_does_not_record_tab_keypress():
|
||||
pty_driver.inject_text_to_screen.assert_called_once_with(b"a")
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.parametrize(
|
||||
("sequence", "key_name"),
|
||||
[
|
||||
(b"\x1b[A", "KEY_UP"),
|
||||
(b"\x1b[B", "KEY_DOWN"),
|
||||
(b"\x1b[C", "KEY_RIGHT"),
|
||||
(b"\x1b[D", "KEY_LEFT"),
|
||||
(b"\x1b[5~", "KEY_PAGEUP"),
|
||||
(b"\x1b[6~", "KEY_PAGEDOWN"),
|
||||
(b"\x1b", "KEY_ESC"),
|
||||
(b"\r", "KEY_ENTER"),
|
||||
(b" ", "KEY_SPACE"),
|
||||
(b"a", "KEY_A"),
|
||||
(b"Z", "KEY_Z"),
|
||||
],
|
||||
)
|
||||
def test_pty_vmenu_stdin_is_consumed_and_synthesizes_key_events(
|
||||
sequence,
|
||||
key_name,
|
||||
):
|
||||
pty_driver = PtyDriver()
|
||||
event_queue = Mock()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = False
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": []},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"SettingsManager": settings_manager,
|
||||
"VmenuManager": Mock(get_active=Mock(return_value=True)),
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(sequence, event_queue)
|
||||
|
||||
pty_driver.inject_text_to_screen.assert_not_called()
|
||||
assert event_queue.put.call_count == 2
|
||||
first_event = event_queue.put.call_args_list[0].args[0]
|
||||
second_event = event_queue.put.call_args_list[1].args[0]
|
||||
assert first_event["Type"] == FenrirEventType.keyboard_input
|
||||
assert first_event["data"]["event_name"] == key_name
|
||||
assert first_event["data"]["event_state"] == 1
|
||||
assert second_event["data"]["event_name"] == key_name
|
||||
assert second_event["data"]["event_state"] == 0
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_vmenu_unknown_stdin_is_consumed_without_injection():
|
||||
pty_driver = PtyDriver()
|
||||
event_queue = Mock()
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": []},
|
||||
"runtime": {
|
||||
"VmenuManager": Mock(get_active=Mock(return_value=True)),
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"\x1b[1;5A", event_queue)
|
||||
|
||||
pty_driver.inject_text_to_screen.assert_not_called()
|
||||
event_queue.put.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_vmenu_stdin_does_not_duplicate_current_x11_key():
|
||||
pty_driver = PtyDriver()
|
||||
event_queue = Mock()
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": ["KEY_RIGHT"]},
|
||||
"runtime": {
|
||||
"VmenuManager": Mock(get_active=Mock(return_value=True)),
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"\x1b[C", event_queue)
|
||||
|
||||
pty_driver.inject_text_to_screen.assert_not_called()
|
||||
event_queue.put.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_stdin_input_honors_interrupt_disabled():
|
||||
pty_driver = PtyDriver()
|
||||
|
||||
@@ -16,11 +16,15 @@ def build_speech_history_manager(history_size=3):
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_int.return_value = history_size
|
||||
memory_manager = Mock(add_value_to_first_index=Mock())
|
||||
input_manager = Mock(reset_input_state=Mock())
|
||||
input_driver = Mock(refresh_grabs=Mock())
|
||||
env = {
|
||||
"runtime": {
|
||||
"OutputManager": output_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
"MemoryManager": memory_manager,
|
||||
"InputManager": input_manager,
|
||||
"InputDriver": input_driver,
|
||||
},
|
||||
"bindings": {"original": "COMMAND"},
|
||||
"rawBindings": {"original": [1, ["KEY_FENRIR"]]},
|
||||
@@ -90,6 +94,7 @@ def test_open_history_installs_modal_bindings_and_replay_is_not_recorded():
|
||||
manager, env, spoken_messages, _memory_manager = (
|
||||
build_speech_history_manager()
|
||||
)
|
||||
env["rawBindings"]["ctrl_shut_up"] = [1, ["KEY_CTRL"]]
|
||||
manager.add_text("first")
|
||||
manager.add_text("second")
|
||||
|
||||
@@ -101,10 +106,14 @@ def test_open_history_installs_modal_bindings_and_replay_is_not_recorded():
|
||||
assert manager.curr_index == -1
|
||||
assert manager.history == ["second", "first"]
|
||||
assert "original" not in env["bindings"]
|
||||
assert "original" not in env["rawBindings"]
|
||||
assert env["rawBindings"]["original"] == [1, ["KEY_FENRIR"]]
|
||||
assert env["rawBindings"]["ctrl_shut_up"] == [1, ["KEY_CTRL"]]
|
||||
assert env["bindings"][str([1, ["KEY_UP"]])] == "SPEECH_HISTORY_PREV"
|
||||
assert env["bindings"][str([1, ["KEY_ENTER"]])] == "SPEECH_HISTORY_COPY"
|
||||
assert env["bindings"][str([1, ["KEY_ESC"]])] == "SPEECH_HISTORY_CLOSE"
|
||||
assert env["rawBindings"][str([1, ["KEY_UP"]])] == [1, ["KEY_UP"]]
|
||||
input_driver = env["runtime"]["InputDriver"]
|
||||
input_driver.refresh_grabs.assert_called_once_with(force=True)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@@ -145,3 +154,22 @@ def test_copy_current_adds_clipboard_and_restores_bindings():
|
||||
assert not manager.is_active()
|
||||
assert env["bindings"] == {"original": "COMMAND"}
|
||||
assert env["rawBindings"] == {"original": [1, ["KEY_FENRIR"]]}
|
||||
env["runtime"]["InputManager"].reset_input_state.assert_called_once_with()
|
||||
assert env["runtime"]["InputDriver"].refresh_grabs.call_count == 2
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_close_history_restores_keyboard_state_and_grabs():
|
||||
manager, env, _spoken_messages, _memory_manager = (
|
||||
build_speech_history_manager()
|
||||
)
|
||||
manager.add_text("first")
|
||||
manager.open_history()
|
||||
|
||||
manager.close_history()
|
||||
|
||||
assert not manager.is_active()
|
||||
assert env["bindings"] == {"original": "COMMAND"}
|
||||
assert env["rawBindings"] == {"original": [1, ["KEY_FENRIR"]]}
|
||||
env["runtime"]["InputManager"].reset_input_state.assert_called_once_with()
|
||||
assert env["runtime"]["InputDriver"].refresh_grabs.call_count == 2
|
||||
|
||||
Reference in New Issue
Block a user