18 Commits

Author SHA1 Message Date
Storm Dragon bd54ec0edb Fixed version. 2026-05-24 17:14:58 -04:00
Storm Dragon b9518f52ec Vmenu fixed I think. Hopefully last thing before new version. 2026-05-24 17:13:38 -04:00
Storm Dragon c143c9a561 Found a vmenu bug in -x. I thought we were close to a new release... 2026-05-24 17:03:41 -04:00
Storm Dragon 7e2f927596 fixed version. 2026-05-24 14:15:02 -04:00
Storm Dragon 788e678ed6 Attempted fix for some progress bars that were being skipped by progress bar detection. 2026-05-24 14:13:29 -04:00
Storm Dragon ea89e90c2f Merge branch 'testing' Hopefully final release candidate for the new version. 2026-05-23 18:59:14 -04:00
Storm Dragon ce43d64e77 Removed auto as a hardware synth device option. It was too flakey. 2026-05-23 18:58:55 -04:00
Storm Dragon 618987546a Adjust timeout for auto detection. I forgot these devices would be slow because most of them are very old with much less speed than would be expected today. 2026-05-23 18:41:42 -04:00
Storm Dragon 604221a29d Attempt to make auto at least somewhat more reliable. Recommend that device be explicitly set if possible. 2026-05-23 18:23:58 -04:00
Storm Dragon 89b85c6f17 Hardware synth code now verified working. New release candidate. 2026-05-23 18:05:55 -04:00
Storm Dragon 6e3d7fee94 Parse the settings correctly lol. 2026-05-23 17:57:02 -04:00
Storm Dragon 089850ac18 More hw synth refinement. 2026-05-23 17:39:16 -04:00
Storm Dragon 5b7c08260a Another iteration based on feedback from hardware synth testing. 2026-05-23 17:23:52 -04:00
Storm Dragon d4b2fec1db speculative fixes for hardware speech. 2026-05-23 17:10:46 -04:00
Storm Dragon 1f7aa99cc0 Release candidate. 2026-05-23 16:13:51 -04:00
Storm Dragon d853e1b24d Fixed the -x keyboard problem for real this time I'm pretty sure. 2026-05-22 20:23:31 -04:00
Storm Dragon e8bc34eaf5 Merge bug fixes, fix version. 2026-05-21 01:08:27 -04:00
Storm Dragon f84167a7fb Of course, soon as I feel things are stable enough to merge to master bugs come crawling out of the woodwork. Fix for being sure Fenrir switches out of its modal mode completely when leaving speech history. 2026-05-21 01:07:01 -04:00
19 changed files with 535 additions and 65 deletions
+3 -2
View File
@@ -460,13 +460,14 @@ setting <action> [parameters]
- `speech#voice=voice_name` - Voice selection (e.g., "en-us+f3")
- `speech#module=module_name` - TTS module (e.g., "espeak-ng")
- `speech#driver=driver_name` - Speech driver (speechdDriver/genericDriver/dectalkDriver/litetalkDriver/doubletalkDriver/tripletalkDriver)
- `speech#hardware_device=auto` - Hardware synth serial device for dectalkDriver/litetalkDriver
- `speech#hardware_device=/dev/ttyS0` - Hardware synth serial device for dectalkDriver/litetalkDriver
- `speech#hardware_baud_rate=9600` - Hardware synth serial baud rate
- `speech#history_size=50` - Number of spoken items kept in runtime speech history
USB hardware synths are supported only when Linux exposes them as a serial tty
such as `/dev/ttyACM0` or `/dev/ttyUSB0`. A USB-only TripleTalk with no tty
device would require a separate USB protocol driver.
device would require a separate USB protocol driver. Use an explicit
`speech#hardware_device` path for hardware speech.
- `speech#auto_read_incoming=True/False` - Auto-read new text
*Sound Settings:*
+2 -2
View File
@@ -79,12 +79,12 @@ volume=1.0
# Used by dectalkDriver, litetalkDriver, doubletalkDriver, and tripletalkDriver.
# USB serial devices are supported if Linux exposes them as /dev/ttyACM*
# or /dev/ttyUSB*. USB-only synths with no tty device need a separate driver.
# auto checks /dev/ttyACM* first, then /dev/ttyUSB*.
# Set an explicit device for hardware speech.
# Examples:
# hardware_device=/dev/ttyACM0 # RPITalk USB gadget mode
# hardware_device=/dev/ttyUSB0 # USB serial adapter
# hardware_device=/dev/ttyS0 # built-in serial port
hardware_device=auto
hardware_device=/dev/ttyS0
# Serial baud rate for hardware speech synthesizers.
hardware_baud_rate=9600
+1 -4
View File
@@ -1684,12 +1684,9 @@ the pico module:
language=de-DE
....
Hardware speech drivers use a serial device. The default `+auto+` checks
`+/dev/ttyACM*+` first, then `+/dev/ttyUSB*+`. Set an explicit path for
stable systems.
Hardware speech drivers use a serial device. Set an explicit path.
....
hardware_device=auto
hardware_device=/dev/ttyACM0
hardware_device=/dev/ttyUSB0
hardware_device=/dev/ttyS0
+5 -5
View File
@@ -101,7 +101,7 @@ driver=speechdDriver
rate=0.5
pitch=0.5
volume=1.0
hardware_device=auto
hardware_device=/dev/ttyS0
hardware_baud_rate=9600
history_size=50
@@ -341,10 +341,10 @@ Fenrir automatically detects and provides audio feedback for progress indicators
- **doubletalkDriver** - Serial DoubleTalk LT-compatible hardware speech
- **tripletalkDriver** - Serial TripleTalk-compatible hardware speech
For hardware speech, set `speech#hardware_device` to `auto` or an explicit
serial path. RPITalk gadget mode usually appears as `/dev/ttyACM0`; USB serial
adapters usually appear as `/dev/ttyUSB0`; built-in serial ports may be
`/dev/ttyS0`. The default baud rate is `9600`. `doubletalkDriver` targets
For hardware speech, set `speech#hardware_device` to an explicit serial path.
RPITalk gadget mode usually appears as `/dev/ttyACM0`; USB serial adapters
usually appear as `/dev/ttyUSB0`; built-in serial ports may be `/dev/ttyS0`.
The default baud rate is `9600`. `doubletalkDriver` targets
DoubleTalk LT-style serial devices, not the internal DoubleTalk PC ISA card.
USB TripleTalk devices work only if Linux exposes them as a serial tty such as
`/dev/ttyACM0` or `/dev/ttyUSB0`; USB-only models with no tty device need a
+1 -2
View File
@@ -927,8 +927,7 @@ Select the language you want Fenrir to use.
language=english-us
Values: Text, see your TTS synths documentation what is available.
Hardware speech drivers use a serial device. The default ''auto'' checks /dev/ttyACM* first, then /dev/ttyUSB*. Set an explicit path for stable systems.
hardware_device=auto
Hardware speech drivers use a serial device. Set an explicit path.
hardware_device=/dev/ttyACM0
hardware_device=/dev/ttyUSB0
hardware_device=/dev/ttyS0
@@ -82,11 +82,12 @@ class command:
if (
delta_length > 200
): # Allow longer progress lines like Claude Code's status
self.env["runtime"]["DebugManager"].write_debug_out(
f"Progress filter: delta too long ({delta_length})",
debug.DebugLevel.INFO,
)
return False
if not self.is_explicit_progress_delta(delta_text):
self.env["runtime"]["DebugManager"].write_debug_out(
f"Progress filter: delta too long ({delta_length})",
debug.DebugLevel.INFO,
)
return False
# If delta contains newlines and is substantial, let incoming handler
# deal with it to avoid interfering with multi-line text output
@@ -107,6 +108,25 @@ class command:
return True
def is_explicit_progress_delta(self, text):
"""Allow long single-line deltas that still look like progress output."""
import re
if "\n" in text or self.contains_url(text):
return False
has_percentage = re.search(r"(^|\s)\d+(?:\.\d+)?\s*%", text)
if not has_percentage:
return False
return bool(
re.search(
r"[|\[\]#=*>█▉▊▋▌▍▎▏▒▓░]"
r"|\b\d+(?:\.\d+)?\s*[kKmMgGtT](?:i?B)?/s\b",
text,
)
)
def reset_progress_state(self):
"""Reset progress state when a prompt is detected, allowing new progress operations to start fresh"""
self.env["runtime"]["DebugManager"].write_debug_out(
@@ -127,7 +127,7 @@ class config_command:
self.config.set("speech", "rate", "0.75")
self.config.set("speech", "pitch", "0.5")
self.config.set("speech", "volume", "1.0")
self.config.set("speech", "hardware_device", "auto")
self.config.set("speech", "hardware_device", "/dev/ttyS0")
self.config.set("speech", "hardware_baud_rate", "9600")
self.config.add_section("sound")
@@ -108,7 +108,7 @@ class command(config_command):
"rate": "0.5",
"pitch": "0.5",
"volume": "1.0",
"hardware_device": "auto",
"hardware_device": "/dev/ttyS0",
"hardware_baud_rate": "9600",
"auto_read_incoming": "True",
}
+4 -1
View File
@@ -311,7 +311,10 @@ class FenrirManager:
self.singleKeyCommand = True
elif (
(
self.environment["runtime"]["DiffReviewManager"].is_active()
self.environment["runtime"]["VmenuManager"].get_active()
or self.environment["runtime"][
"DiffReviewManager"
].is_active()
or self.environment["runtime"][
"SpeechHistoryManager"
].is_active()
+1 -1
View File
@@ -28,7 +28,7 @@ settings_data = {
"module": "",
"voice": "en-us",
"language": "",
"hardware_device": "auto",
"hardware_device": "/dev/ttyS0",
"hardware_baud_rate": 9600,
"auto_read_incoming": True,
"read_numbers_as_digits": False,
@@ -164,7 +164,7 @@ class SpeechHistoryManager:
str([1, ["KEY_KPENTER"]]): "SPEECH_HISTORY_COPY",
str([1, ["KEY_ESC"]]): "SPEECH_HISTORY_CLOSE",
}
self.env["rawBindings"] = {
modal_raw_bindings = {
str([1, ["KEY_UP"]]): [1, ["KEY_UP"]],
str([1, ["KEY_DOWN"]]): [1, ["KEY_DOWN"]],
str([1, ["KEY_SPACE"]]): [1, ["KEY_SPACE"]],
@@ -172,6 +172,9 @@ class SpeechHistoryManager:
str([1, ["KEY_KPENTER"]]): [1, ["KEY_KPENTER"]],
str([1, ["KEY_ESC"]]): [1, ["KEY_ESC"]],
}
self.env["rawBindings"] = self.raw_bindings_backup.copy()
self.env["rawBindings"].update(modal_raw_bindings)
self._refresh_input_bindings()
def _restore_bindings(self):
if self.bindings_backup is not None:
@@ -180,3 +183,21 @@ class SpeechHistoryManager:
self.env["rawBindings"] = self.raw_bindings_backup
self.bindings_backup = None
self.raw_bindings_backup = None
self._reset_input_state()
self._refresh_input_bindings()
def _reset_input_state(self):
try:
self.env["runtime"]["InputManager"].reset_input_state()
except Exception:
pass
def _refresh_input_bindings(self):
try:
refresh_grabs = getattr(
self.env["runtime"]["InputDriver"], "refresh_grabs", None
)
if refresh_grabs:
refresh_grabs(force=True)
except Exception:
pass
+2 -2
View File
@@ -4,5 +4,5 @@
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
version = "2026.05.21"
code_name = "testing"
version = "2026.05.24"
code_name = "master"
@@ -333,10 +333,79 @@ class driver(screenDriver):
def handle_stdin_input(self, msg_bytes, event_queue):
if self.synthesize_backspace_shortcut(msg_bytes, event_queue):
return
if self.handle_vmenu_stdin_input(msg_bytes, event_queue):
return
self.record_stdin_keypress(msg_bytes)
self.interrupt_output_on_stdin_input(msg_bytes)
self.inject_text_to_screen(msg_bytes)
def handle_vmenu_stdin_input(self, msg_bytes, event_queue):
if not self.is_vmenu_active():
return False
key_name = self.vmenu_stdin_key_name(msg_bytes)
if key_name and not self.vmenu_key_already_handled(key_name):
self.queue_keypress(key_name, event_queue)
return True
def is_vmenu_active(self):
try:
return self.env["runtime"]["VmenuManager"].get_active()
except Exception:
return False
def vmenu_stdin_key_name(self, msg_bytes):
key_map = {
b"\x1b": "KEY_ESC",
b"\x1b[A": "KEY_UP",
b"\x1b[B": "KEY_DOWN",
b"\x1b[C": "KEY_RIGHT",
b"\x1b[D": "KEY_LEFT",
b"\x1b[5~": "KEY_PAGEUP",
b"\x1b[6~": "KEY_PAGEDOWN",
b"\r": "KEY_ENTER",
b"\n": "KEY_ENTER",
b" ": "KEY_SPACE",
}
if msg_bytes in key_map:
return key_map[msg_bytes]
if len(msg_bytes) != 1:
return None
char = chr(msg_bytes[0])
if "a" <= char <= "z" or "A" <= char <= "Z":
return "KEY_" + char.upper()
return None
def vmenu_key_already_handled(self, key_name):
try:
return key_name in self.env["input"]["curr_input"]
except Exception:
return False
def queue_keypress(self, key_name, event_queue):
event_time = time.time()
for event_state in [1, 0]:
try:
event_queue.put(
{
"Type": FenrirEventType.keyboard_input,
"data": {
"event_name": key_name,
"event_value": 0,
"event_sec": int(event_time),
"event_usec": int((event_time % 1) * 1000000),
"event_state": event_state,
"event_type": 0,
},
},
block=False,
)
except Full:
self.env["runtime"]["DebugManager"].write_debug_out(
"ptyDriver queue_keypress: Event queue full, dropping "
+ key_name,
debug.DebugLevel.WARNING,
)
def record_stdin_keypress(self, msg_bytes):
if msg_bytes != b"\t":
return
@@ -4,7 +4,6 @@
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
import glob
import os
import termios
import threading
@@ -43,14 +42,22 @@ class hardware_serial_driver(speech_driver):
self.env = environment
self._is_initialized = False
settings_manager = self.env["runtime"]["SettingsManager"]
self.device = settings_manager.get_setting(
"speech", "hardware_device"
self.device = self._clean_device_setting(
settings_manager.get_setting("speech", "hardware_device")
)
self.baud_rate = settings_manager.get_setting_as_int(
"speech", "hardware_baud_rate"
)
self._debug(
"Hardware speech initialize: "
f"requested_device={self.device}, baud_rate={self.baud_rate}",
debug.DebugLevel.INFO,
on_any_level=True,
)
self._open_serial_port()
self._is_initialized = self.serial_port is not None
if not self._is_initialized:
raise RuntimeError("hardware speech device is not available")
if self._is_initialized:
self._stop_worker = False
self.worker_thread = threading.Thread(
@@ -58,6 +65,12 @@ class hardware_serial_driver(speech_driver):
)
self.worker_thread.start()
def _clean_device_setting(self, device):
if not isinstance(device, str):
return ""
device = device.split("#", 1)[0].split(";", 1)[0].strip()
return device
def shutdown(self):
if not self._is_initialized:
return
@@ -76,6 +89,12 @@ class hardware_serial_driver(speech_driver):
self.cancel()
if not isinstance(text, str) or text == "":
return
self._debug(
"Hardware speech queued text: "
f"{len(text)} chars, queue_size={self.text_queue.qsize()}",
debug.DebugLevel.INFO,
on_any_level=True,
)
self.text_queue.put(text)
def cancel(self):
@@ -83,7 +102,7 @@ class hardware_serial_driver(speech_driver):
return
self.clear_buffer()
if self.cancel_command:
self._write_bytes(self.cancel_command)
self._write_bytes(self.cancel_command, "cancel")
def clear_buffer(self):
if not self._is_initialized:
@@ -95,37 +114,58 @@ class hardware_serial_driver(speech_driver):
return
if not isinstance(rate, float):
return
self._write_bytes(self._rate_command(rate))
self._write_bytes(self._rate_command(rate), "rate")
def set_pitch(self, pitch):
if not self._is_initialized:
return
if not isinstance(pitch, float):
return
self._write_bytes(self._pitch_command(pitch))
self._write_bytes(self._pitch_command(pitch), "pitch")
def set_volume(self, volume):
if not self._is_initialized:
return
if not isinstance(volume, float):
return
self._write_bytes(self._volume_command(volume))
self._write_bytes(self._volume_command(volume), "volume")
def _worker(self):
while not self._stop_worker:
text = self.text_queue.get()
if text is None:
return
self._write_bytes(self._speak_bytes(text))
try:
data = self._speak_bytes(text)
self._debug(
"Hardware speech worker prepared speech bytes: "
f"{len(data)} bytes",
debug.DebugLevel.INFO,
on_any_level=True,
)
self._write_bytes(data, "speech")
except Exception as error:
self._debug(
f"Hardware speech worker failed: {error}",
debug.DebugLevel.ERROR,
on_any_level=True,
)
def _open_serial_port(self):
device = self._resolve_device(self.device)
if not device:
if not self.device or self.device == "auto":
self._debug(
"Hardware speech device not found",
"Hardware speech requires an explicit serial device",
debug.DebugLevel.ERROR,
on_any_level=True,
)
return
port = self._open_configured_serial_port(self.device)
if port is not None:
self._activate_serial_port(self.device, port)
def _open_configured_serial_port(self, device):
port = None
try:
port = os.open(device, os.O_RDWR | os.O_NOCTTY)
tty.setraw(port)
@@ -138,52 +178,73 @@ class hardware_serial_driver(speech_driver):
attrs[6][termios.VTIME] = 0
attrs[0] &= ~(termios.IXON | termios.IXOFF | termios.IXANY)
termios.tcsetattr(port, termios.TCSANOW, attrs)
self.serial_port = port
self.device = device
except OSError as error:
return port
except (OSError, termios.error) as error:
self._close_port(port)
self._debug(
f"Hardware speech device open failed: {device}: {error}",
debug.DebugLevel.ERROR,
on_any_level=True,
)
self.serial_port = None
return None
def _activate_serial_port(self, device, port):
self.serial_port = port
self.device = device
self._debug(
"Hardware speech device opened: "
f"{device}, baud_rate={self.baud_rate}",
debug.DebugLevel.INFO,
on_any_level=True,
)
def _close_serial_port(self):
with self.lock:
if self.serial_port is None:
return
try:
os.close(self.serial_port)
except OSError as error:
self._debug(
f"Hardware speech device close failed: {error}",
debug.DebugLevel.WARNING,
)
finally:
self.serial_port = None
self._close_port(self.serial_port)
self.serial_port = None
def _write_bytes(self, data):
def _close_port(self, port):
if port is None:
return
try:
os.close(port)
except OSError as error:
self._debug(
f"Hardware speech device close failed: {error}",
debug.DebugLevel.WARNING,
)
def _write_bytes(self, data, description="data"):
if not data:
return
with self.lock:
if self.serial_port is None:
return
try:
os.write(self.serial_port, data)
total_written = 0
while total_written < len(data):
bytes_written = os.write(
self.serial_port, data[total_written:]
)
if bytes_written == 0:
raise OSError("serial write returned 0 bytes")
total_written += bytes_written
preview = self._format_bytes_preview(data)
self._debug(
"Hardware speech wrote "
f"{total_written} {description} bytes: {preview}",
debug.DebugLevel.INFO,
on_any_level=True,
)
except OSError as error:
self._debug(
f"Hardware speech write failed: {error}",
debug.DebugLevel.ERROR,
on_any_level=True,
)
def _resolve_device(self, device):
if device and device != "auto":
return device
for pattern in ("/dev/ttyACM*", "/dev/ttyUSB*"):
matches = sorted(glob.glob(pattern))
if matches:
return matches[0]
return ""
def _termios_baud_rate(self, baud_rate):
baud_name = f"B{baud_rate}"
if hasattr(termios, baud_name):
@@ -205,10 +266,23 @@ class hardware_serial_driver(speech_driver):
value = max(0.0, min(1.0, value))
return int(round(minimum + value * (maximum - minimum)))
def _debug(self, message, level):
def _format_bytes_preview(self, data, limit=32):
preview = data[:limit]
hex_preview = " ".join(f"{byte:02x}" for byte in preview)
ascii_preview = "".join(
chr(byte) if 0x20 <= byte <= 0x7E else "."
for byte in preview
)
suffix = "" if len(data) <= limit else " ..."
return (
f"hex=[{hex_preview}{suffix}] "
f"ascii=[{ascii_preview}{suffix}]"
)
def _debug(self, message, level, on_any_level=False):
try:
self.env["runtime"]["DebugManager"].write_debug_out(
message, level
message, level, on_any_level=on_any_level
)
except Exception:
pass
@@ -22,6 +22,7 @@ def test_speech_history_plain_key_modal_command_is_dispatched():
)
speech_history_manager = Mock(is_active=Mock(return_value=True))
diff_review_manager = Mock(is_active=Mock(return_value=False))
vmenu_manager = Mock(get_active=Mock(return_value=False))
manager.environment = {
"input": {
@@ -32,6 +33,7 @@ def test_speech_history_plain_key_modal_command_is_dispatched():
"runtime": {
"InputManager": input_manager,
"EventManager": event_manager,
"VmenuManager": vmenu_manager,
"DiffReviewManager": diff_review_manager,
"SpeechHistoryManager": speech_history_manager,
},
@@ -42,3 +44,43 @@ def test_speech_history_plain_key_modal_command_is_dispatched():
event_manager.put_to_event_queue.assert_called_once_with(
FenrirEventType.execute_command, "SPEECH_HISTORY_PREV"
)
@pytest.mark.unit
def test_vmenu_plain_key_modal_command_is_dispatched():
manager = FenrirManager.__new__(FenrirManager)
manager.modifierInput = False
manager.singleKeyCommand = False
manager.command = ""
event_manager = Mock(put_to_event_queue=Mock())
input_manager = Mock(
is_key_press=Mock(return_value=False),
no_key_pressed=Mock(return_value=False),
get_curr_shortcut=Mock(return_value=str([1, ["KEY_UP"]])),
get_command_for_shortcut=Mock(return_value="PREV_VMENU_ENTRY"),
)
vmenu_manager = Mock(get_active=Mock(return_value=True))
speech_history_manager = Mock(is_active=Mock(return_value=False))
diff_review_manager = Mock(is_active=Mock(return_value=False))
manager.environment = {
"input": {
"key_forward": 0,
"prev_input": ["KEY_UP"],
"curr_input": ["KEY_UP"],
},
"runtime": {
"InputManager": input_manager,
"EventManager": event_manager,
"VmenuManager": vmenu_manager,
"DiffReviewManager": diff_review_manager,
"SpeechHistoryManager": speech_history_manager,
},
}
manager.detect_shortcut_command()
event_manager.put_to_event_queue.assert_called_once_with(
FenrirEventType.execute_command, "PREV_VMENU_ENTRY"
)
@@ -1,6 +1,7 @@
import os
import select
import time
from unittest.mock import ANY
from unittest.mock import Mock
import pytest
@@ -103,6 +104,67 @@ def test_litetalk_driver_writes_settings_and_cancel(serial_pair):
speech_driver.shutdown()
def test_configured_device_supports_classic_serial(serial_pair):
master_fd, slave_name = serial_pair
speech_driver = litetalkDriver.driver()
speech_driver.initialize(build_environment(slave_name))
try:
assert speech_driver.device == slave_name
speech_driver.speak("Serial")
assert read_available(master_fd, 7) == b"Serial\r"
finally:
speech_driver.shutdown()
def test_configured_device_strips_inline_comment(serial_pair):
master_fd, slave_name = serial_pair
device_setting = f"{slave_name} # built-in serial port"
speech_driver = litetalkDriver.driver()
speech_driver.initialize(build_environment(device_setting))
try:
assert speech_driver.device == slave_name
speech_driver.speak("Specific")
assert read_available(master_fd, 9) == b"Specific\r"
finally:
speech_driver.shutdown()
def test_auto_device_is_rejected():
speech_driver = litetalkDriver.driver()
with pytest.raises(RuntimeError, match="hardware speech device"):
speech_driver.initialize(build_environment("auto"))
debug_manager = speech_driver.env["runtime"]["DebugManager"]
debug_manager.write_debug_out.assert_called_with(
"Hardware speech requires an explicit serial device",
ANY,
on_any_level=True,
)
def test_hardware_driver_retries_partial_serial_writes(monkeypatch):
speech_driver = litetalkDriver.driver()
speech_driver.env = build_environment("/dev/ttyUSB0")
speech_driver.serial_port = 12
written_chunks = []
def fake_write(port, data):
assert port == 12
chunk = data[:2]
written_chunks.append(chunk)
return len(chunk)
monkeypatch.setattr(
"fenrirscreenreader.speechDriver.hardwareSerialDriver.os.write",
fake_write,
)
speech_driver._write_bytes(b"abcdef", "speech")
assert written_chunks == [b"ab", b"cd", b"ef"]
@pytest.mark.parametrize("driver_class", [doubletalkDriver, tripletalkDriver])
def test_litetalk_compatible_alias_drivers(driver_class, serial_pair):
speech_driver, master_fd = initialized_driver(driver_class, serial_pair)
+70
View File
@@ -39,3 +39,73 @@ def test_progress_detector_skips_typing_delta():
command.is_real_progress_update.assert_not_called()
command.detect_progress.assert_not_called()
@pytest.mark.unit
def test_progress_detector_allows_long_tqdm_transfer_delta():
progress_module = _load_progress_module()
command = progress_module.command()
sample = (
"88%|"
"████████████████████████████████████████████████████████████████"
"████████████████████████████████████████████████████████████████"
"████████████████████████████████████████████████████████████████"
"█████████████████████████▊ "
"| 843M/954M [00:54<00:07, 15.2MB/s]"
)
command.env = {
"commandBuffer": {"progress_monitoring": True},
"runtime": {
"DebugManager": Mock(write_debug_out=Mock()),
"ScreenManager": Mock(is_screen_change=Mock(return_value=False)),
"CursorManager": Mock(is_cursor_vertical_move=Mock(return_value=False)),
},
"screen": {
"new_delta": sample,
"new_content_text": sample,
"old_cursor": {"x": 0, "y": 0},
"new_cursor": {"x": 0, "y": 0},
},
}
assert len(sample) > 200
assert command.is_real_progress_update()
@pytest.mark.unit
def test_progress_detector_beeps_for_long_tqdm_transfer_delta():
progress_module = _load_progress_module()
command = progress_module.command()
sample = (
"90%|"
"████████████████████████████████████████████████████████████████"
"████████████████████████████████████████████████████████████████"
"████████████████████████████████████████████████████████████████"
"█████████████████████████████████████▍ "
"| 856M/954M [00:56<00:14, 6.78MB/s]"
)
command.env = {
"commandBuffer": {
"progress_monitoring": True,
"lastProgressValue": -1,
"lastProgressTime": 0,
},
"runtime": {
"DebugManager": Mock(write_debug_out=Mock()),
"ScreenManager": Mock(is_screen_change=Mock(return_value=False)),
"CursorManager": Mock(is_cursor_vertical_move=Mock(return_value=False)),
},
"screen": {
"new_delta": sample,
"new_delta_is_typing": False,
"new_content_text": sample,
"old_cursor": {"x": 0, "y": 0},
"new_cursor": {"x": 0, "y": 0},
},
}
command.play_progress_tone = Mock()
command.run()
command.play_progress_tone.assert_called_once_with(90.0)
assert command.env["commandBuffer"]["lastProgressValue"] == 90.0
+84
View File
@@ -160,6 +160,90 @@ def test_pty_plain_stdin_does_not_record_tab_keypress():
pty_driver.inject_text_to_screen.assert_called_once_with(b"a")
@pytest.mark.unit
@pytest.mark.parametrize(
("sequence", "key_name"),
[
(b"\x1b[A", "KEY_UP"),
(b"\x1b[B", "KEY_DOWN"),
(b"\x1b[C", "KEY_RIGHT"),
(b"\x1b[D", "KEY_LEFT"),
(b"\x1b[5~", "KEY_PAGEUP"),
(b"\x1b[6~", "KEY_PAGEDOWN"),
(b"\x1b", "KEY_ESC"),
(b"\r", "KEY_ENTER"),
(b" ", "KEY_SPACE"),
(b"a", "KEY_A"),
(b"Z", "KEY_Z"),
],
)
def test_pty_vmenu_stdin_is_consumed_and_synthesizes_key_events(
sequence,
key_name,
):
pty_driver = PtyDriver()
event_queue = Mock()
settings_manager = Mock()
settings_manager.get_setting_as_bool.return_value = False
pty_driver.env = {
"input": {"curr_input": []},
"runtime": {
"DebugManager": Mock(write_debug_out=Mock()),
"SettingsManager": settings_manager,
"VmenuManager": Mock(get_active=Mock(return_value=True)),
},
}
pty_driver.inject_text_to_screen = Mock()
pty_driver.handle_stdin_input(sequence, event_queue)
pty_driver.inject_text_to_screen.assert_not_called()
assert event_queue.put.call_count == 2
first_event = event_queue.put.call_args_list[0].args[0]
second_event = event_queue.put.call_args_list[1].args[0]
assert first_event["Type"] == FenrirEventType.keyboard_input
assert first_event["data"]["event_name"] == key_name
assert first_event["data"]["event_state"] == 1
assert second_event["data"]["event_name"] == key_name
assert second_event["data"]["event_state"] == 0
@pytest.mark.unit
def test_pty_vmenu_unknown_stdin_is_consumed_without_injection():
pty_driver = PtyDriver()
event_queue = Mock()
pty_driver.env = {
"input": {"curr_input": []},
"runtime": {
"VmenuManager": Mock(get_active=Mock(return_value=True)),
},
}
pty_driver.inject_text_to_screen = Mock()
pty_driver.handle_stdin_input(b"\x1b[1;5A", event_queue)
pty_driver.inject_text_to_screen.assert_not_called()
event_queue.put.assert_not_called()
@pytest.mark.unit
def test_pty_vmenu_stdin_does_not_duplicate_current_x11_key():
pty_driver = PtyDriver()
event_queue = Mock()
pty_driver.env = {
"input": {"curr_input": ["KEY_RIGHT"]},
"runtime": {
"VmenuManager": Mock(get_active=Mock(return_value=True)),
},
}
pty_driver.inject_text_to_screen = Mock()
pty_driver.handle_stdin_input(b"\x1b[C", event_queue)
pty_driver.inject_text_to_screen.assert_not_called()
event_queue.put.assert_not_called()
@pytest.mark.unit
def test_pty_stdin_input_honors_interrupt_disabled():
pty_driver = PtyDriver()
+29 -1
View File
@@ -16,11 +16,15 @@ def build_speech_history_manager(history_size=3):
settings_manager = Mock()
settings_manager.get_setting_as_int.return_value = history_size
memory_manager = Mock(add_value_to_first_index=Mock())
input_manager = Mock(reset_input_state=Mock())
input_driver = Mock(refresh_grabs=Mock())
env = {
"runtime": {
"OutputManager": output_manager,
"SettingsManager": settings_manager,
"MemoryManager": memory_manager,
"InputManager": input_manager,
"InputDriver": input_driver,
},
"bindings": {"original": "COMMAND"},
"rawBindings": {"original": [1, ["KEY_FENRIR"]]},
@@ -90,6 +94,7 @@ def test_open_history_installs_modal_bindings_and_replay_is_not_recorded():
manager, env, spoken_messages, _memory_manager = (
build_speech_history_manager()
)
env["rawBindings"]["ctrl_shut_up"] = [1, ["KEY_CTRL"]]
manager.add_text("first")
manager.add_text("second")
@@ -101,10 +106,14 @@ def test_open_history_installs_modal_bindings_and_replay_is_not_recorded():
assert manager.curr_index == -1
assert manager.history == ["second", "first"]
assert "original" not in env["bindings"]
assert "original" not in env["rawBindings"]
assert env["rawBindings"]["original"] == [1, ["KEY_FENRIR"]]
assert env["rawBindings"]["ctrl_shut_up"] == [1, ["KEY_CTRL"]]
assert env["bindings"][str([1, ["KEY_UP"]])] == "SPEECH_HISTORY_PREV"
assert env["bindings"][str([1, ["KEY_ENTER"]])] == "SPEECH_HISTORY_COPY"
assert env["bindings"][str([1, ["KEY_ESC"]])] == "SPEECH_HISTORY_CLOSE"
assert env["rawBindings"][str([1, ["KEY_UP"]])] == [1, ["KEY_UP"]]
input_driver = env["runtime"]["InputDriver"]
input_driver.refresh_grabs.assert_called_once_with(force=True)
@pytest.mark.unit
@@ -145,3 +154,22 @@ def test_copy_current_adds_clipboard_and_restores_bindings():
assert not manager.is_active()
assert env["bindings"] == {"original": "COMMAND"}
assert env["rawBindings"] == {"original": [1, ["KEY_FENRIR"]]}
env["runtime"]["InputManager"].reset_input_state.assert_called_once_with()
assert env["runtime"]["InputDriver"].refresh_grabs.call_count == 2
@pytest.mark.unit
def test_close_history_restores_keyboard_state_and_grabs():
manager, env, _spoken_messages, _memory_manager = (
build_speech_history_manager()
)
manager.add_text("first")
manager.open_history()
manager.close_history()
assert not manager.is_active()
assert env["bindings"] == {"original": "COMMAND"}
assert env["rawBindings"] == {"original": [1, ["KEY_FENRIR"]]}
env["runtime"]["InputManager"].reset_input_state.assert_called_once_with()
assert env["runtime"]["InputDriver"].refresh_grabs.call_count == 2