Compare commits
26 Commits
29a6c3eb42
...
xim
| Author | SHA1 | Date | |
|---|---|---|---|
| 191fdbe8fd | |||
| fd5fe5b328 | |||
| 4ed3f4d6ab | |||
| 2cb83632f9 | |||
| 0c4fe50606 | |||
| 15f2435749 | |||
| 3897b63068 | |||
| f1a8e6af21 | |||
| bd54ec0edb | |||
| b9518f52ec | |||
| c143c9a561 | |||
| 7e2f927596 | |||
| 788e678ed6 | |||
| ea89e90c2f | |||
| ce43d64e77 | |||
| 618987546a | |||
| 604221a29d | |||
| 89b85c6f17 | |||
| 6e3d7fee94 | |||
| 089850ac18 | |||
| 5b7c08260a | |||
| d4b2fec1db | |||
| 1f7aa99cc0 | |||
| d853e1b24d | |||
| e8bc34eaf5 | |||
| f84167a7fb |
@@ -460,13 +460,14 @@ setting <action> [parameters]
|
||||
- `speech#voice=voice_name` - Voice selection (e.g., "en-us+f3")
|
||||
- `speech#module=module_name` - TTS module (e.g., "espeak-ng")
|
||||
- `speech#driver=driver_name` - Speech driver (speechdDriver/genericDriver/dectalkDriver/litetalkDriver/doubletalkDriver/tripletalkDriver)
|
||||
- `speech#hardware_device=auto` - Hardware synth serial device for dectalkDriver/litetalkDriver
|
||||
- `speech#hardware_device=/dev/ttyS0` - Hardware synth serial device for dectalkDriver/litetalkDriver
|
||||
- `speech#hardware_baud_rate=9600` - Hardware synth serial baud rate
|
||||
- `speech#history_size=50` - Number of spoken items kept in runtime speech history
|
||||
|
||||
USB hardware synths are supported only when Linux exposes them as a serial tty
|
||||
such as `/dev/ttyACM0` or `/dev/ttyUSB0`. A USB-only TripleTalk with no tty
|
||||
device would require a separate USB protocol driver.
|
||||
device would require a separate USB protocol driver. Use an explicit
|
||||
`speech#hardware_device` path for hardware speech.
|
||||
- `speech#auto_read_incoming=True/False` - Auto-read new text
|
||||
|
||||
*Sound Settings:*
|
||||
@@ -652,6 +653,10 @@ Building...
|
||||
- **Non-blocking**: Progress tones don't interrupt speech or other functionality
|
||||
- **Configurable**: Can be enabled/disabled as needed
|
||||
|
||||
Fenrir detects stable progress structures rather than application-specific
|
||||
status formats. Application-specific formats change too frequently to support
|
||||
reliably.
|
||||
|
||||
### Usage Examples
|
||||
|
||||
```bash
|
||||
@@ -735,7 +740,7 @@ send_fenrir_command("setting set speech#rate=0.9")
|
||||
|
||||
**Commands not working:**
|
||||
- Verify `enable_command_remote=True` in settings
|
||||
- Check Fenrir debug logs: `/var/log/fenrir.log`
|
||||
- Check Fenrir debug logs: `/tmp/fenrir.log`
|
||||
- Test with simple command: `echo "command interrupt" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock`
|
||||
|
||||
## Command Line Options
|
||||
|
||||
@@ -3,13 +3,13 @@ https://git.stormux.org/storm/fenrir/issues
|
||||
|
||||
For bugs, please provide a debug file that shows the issue.
|
||||
How to create a debug file:
|
||||
1. first delete old stuff:
|
||||
sudo rm /var/log/fenrir.log
|
||||
2. start fenrir in debug mode
|
||||
1. start fenrir in debug mode
|
||||
sudo fenrir -d
|
||||
<do your stuff>
|
||||
3.
|
||||
2.
|
||||
stop fenrir (fenrirKey + q)
|
||||
the debug file is in /var/log/fenrir.log
|
||||
the debug file is in /tmp/fenrir.log
|
||||
if another Fenrir debug instance is already using it, check /tmp/fenrir2.log,
|
||||
/tmp/fenrir3.log, etc.
|
||||
|
||||
please be as precise as possible to make it easy to solve the problem.
|
||||
|
||||
@@ -79,12 +79,12 @@ volume=1.0
|
||||
# Used by dectalkDriver, litetalkDriver, doubletalkDriver, and tripletalkDriver.
|
||||
# USB serial devices are supported if Linux exposes them as /dev/ttyACM*
|
||||
# or /dev/ttyUSB*. USB-only synths with no tty device need a separate driver.
|
||||
# auto checks /dev/ttyACM* first, then /dev/ttyUSB*.
|
||||
# Set an explicit device for hardware speech.
|
||||
# Examples:
|
||||
# hardware_device=/dev/ttyACM0 # RPITalk USB gadget mode
|
||||
# hardware_device=/dev/ttyUSB0 # USB serial adapter
|
||||
# hardware_device=/dev/ttyS0 # built-in serial port
|
||||
hardware_device=auto
|
||||
hardware_device=/dev/ttyS0
|
||||
|
||||
# Serial baud rate for hardware speech synthesizers.
|
||||
hardware_baud_rate=9600
|
||||
@@ -183,7 +183,8 @@ double_tap_timeout=0.2
|
||||
# The default is 0, no logging.
|
||||
debug_level=0
|
||||
# debugMode sets where the debug output should send to:
|
||||
# debugMode=File writes to debug_file (Default:/tmp/fenrir-PID.log)
|
||||
# debugMode=File writes to debug_file (Default:/tmp/fenrir.log)
|
||||
# If the default log is already in use, Fenrir uses /tmp/fenrir2.log, etc.
|
||||
# debugMode=Print just prints on the screen
|
||||
debug_mode=File
|
||||
debug_file=
|
||||
|
||||
@@ -114,7 +114,7 @@ sudo ./fenrir -f -d -p
|
||||
|
||||
# Debug output goes to:
|
||||
# - Console (with -p flag)
|
||||
# - /var/log/fenrir.log
|
||||
# - /tmp/fenrir.log
|
||||
```
|
||||
|
||||
## Creating Commands
|
||||
|
||||
+4
-2
@@ -50,7 +50,9 @@ Multiple settings can be separated by semicolons.
|
||||
|
||||
.TP
|
||||
.BR \-d ", " \-\-debug
|
||||
Enable debug mode. Debug information will be logged to /var/log/fenrir.log.
|
||||
Enable debug mode. Debug information will be logged to /tmp/fenrir.log by
|
||||
default. If another Fenrir debug instance is already using it, Fenrir uses
|
||||
/tmp/fenrir2.log, /tmp/fenrir3.log, etc.
|
||||
|
||||
.TP
|
||||
.BR \-p ", " \-\-print
|
||||
@@ -476,7 +478,7 @@ User sound themes
|
||||
User scripts
|
||||
|
||||
.TP
|
||||
.B /var/log/fenrir.log
|
||||
.B /tmp/fenrir.log
|
||||
Debug log file
|
||||
|
||||
.TP
|
||||
|
||||
+4
-7
@@ -1684,12 +1684,9 @@ the pico module:
|
||||
language=de-DE
|
||||
....
|
||||
|
||||
Hardware speech drivers use a serial device. The default `+auto+` checks
|
||||
`+/dev/ttyACM*+` first, then `+/dev/ttyUSB*+`. Set an explicit path for
|
||||
stable systems.
|
||||
Hardware speech drivers use a serial device. Set an explicit path.
|
||||
|
||||
....
|
||||
hardware_device=auto
|
||||
hardware_device=/dev/ttyACM0
|
||||
hardware_device=/dev/ttyUSB0
|
||||
hardware_device=/dev/ttyS0
|
||||
@@ -2279,13 +2276,13 @@ that shows the issue.
|
||||
|
||||
==== How-to create a debug file
|
||||
|
||||
. Delete old debug stuff +
|
||||
`+sudo rm /var/log/fenrir.log+`
|
||||
. Start fenrir in debug mode +
|
||||
`+sudo fenrir -d+`
|
||||
. Do your stuff to reproduce the problem
|
||||
. Stop fenrir (`+fenrirKey + q+`)
|
||||
|
||||
the debug file is located in `+/var/log/fenrir.log+`
|
||||
the debug file is located in `+/tmp/fenrir.log+`. If another Fenrir debug
|
||||
instance is already using it, check `+/tmp/fenrir2.log+`,
|
||||
`+/tmp/fenrir3.log+`, etc.
|
||||
|
||||
Please be as precise as possible to make it easy to solve the problem.
|
||||
|
||||
+9
-6
@@ -101,7 +101,7 @@ driver=speechdDriver
|
||||
rate=0.5
|
||||
pitch=0.5
|
||||
volume=1.0
|
||||
hardware_device=auto
|
||||
hardware_device=/dev/ttyS0
|
||||
hardware_baud_rate=9600
|
||||
history_size=50
|
||||
|
||||
@@ -312,6 +312,9 @@ Fenrir automatically detects and provides audio feedback for progress indicators
|
||||
- **Automatic**: Works with downloads, compilations, installations
|
||||
- **Remote control**: Enable via socket commands
|
||||
|
||||
Fenrir detects stable progress structures rather than application-specific
|
||||
status formats, which change too frequently to support reliably.
|
||||
|
||||
### Spell Checking
|
||||
- `Fenrir + S` - Spell check current word
|
||||
- `Fenrir + S S` - Add word to dictionary
|
||||
@@ -341,10 +344,10 @@ Fenrir automatically detects and provides audio feedback for progress indicators
|
||||
- **doubletalkDriver** - Serial DoubleTalk LT-compatible hardware speech
|
||||
- **tripletalkDriver** - Serial TripleTalk-compatible hardware speech
|
||||
|
||||
For hardware speech, set `speech#hardware_device` to `auto` or an explicit
|
||||
serial path. RPITalk gadget mode usually appears as `/dev/ttyACM0`; USB serial
|
||||
adapters usually appear as `/dev/ttyUSB0`; built-in serial ports may be
|
||||
`/dev/ttyS0`. The default baud rate is `9600`. `doubletalkDriver` targets
|
||||
For hardware speech, set `speech#hardware_device` to an explicit serial path.
|
||||
RPITalk gadget mode usually appears as `/dev/ttyACM0`; USB serial adapters
|
||||
usually appear as `/dev/ttyUSB0`; built-in serial ports may be `/dev/ttyS0`.
|
||||
The default baud rate is `9600`. `doubletalkDriver` targets
|
||||
DoubleTalk LT-style serial devices, not the internal DoubleTalk PC ISA card.
|
||||
USB TripleTalk devices work only if Linux exposes them as a serial tty such as
|
||||
`/dev/ttyACM0` or `/dev/ttyUSB0`; USB-only models with no tty device need a
|
||||
@@ -428,7 +431,7 @@ For a dedicated PTY/terminal screen reader, see TDSR: https://github.com/tspivey
|
||||
### Debug Mode
|
||||
```bash
|
||||
sudo fenrir -f -d
|
||||
# Debug output goes to /var/log/fenrir.log
|
||||
# Debug output goes to /tmp/fenrir.log
|
||||
```
|
||||
|
||||
## Getting Help
|
||||
|
||||
+4
-4
@@ -927,8 +927,7 @@ Select the language you want Fenrir to use.
|
||||
language=english-us
|
||||
Values: Text, see your TTS synths documentation what is available.
|
||||
|
||||
Hardware speech drivers use a serial device. The default ''auto'' checks /dev/ttyACM* first, then /dev/ttyUSB*. Set an explicit path for stable systems.
|
||||
hardware_device=auto
|
||||
Hardware speech drivers use a serial device. Set an explicit path.
|
||||
hardware_device=/dev/ttyACM0
|
||||
hardware_device=/dev/ttyUSB0
|
||||
hardware_device=/dev/ttyS0
|
||||
@@ -1316,10 +1315,11 @@ Please report Bugs and feature requests to:
|
||||
|
||||
for bugs please provide a [[#Howto create a debug file|debug]] file that shows the issue.
|
||||
==== How-to create a debug file ====
|
||||
- Delete old debug stuff\\ ''sudo rm /var/log/fenrir.log''
|
||||
- Start fenrir in debug mode\\ ''sudo fenrir -d''
|
||||
- Do your stuff to reproduce the problem
|
||||
- Stop fenrir (''fenrirKey + q'')
|
||||
the debug file is located in ''/var/log/fenrir.log''
|
||||
the debug file is located in ''/tmp/fenrir.log''. If another Fenrir debug
|
||||
instance is already using it, check ''/tmp/fenrir2.log'',
|
||||
''/tmp/fenrir3.log'', etc.
|
||||
|
||||
Please be as precise as possible to make it easy to solve the problem.
|
||||
|
||||
@@ -67,7 +67,7 @@ class command:
|
||||
)
|
||||
# is has attribute it enabled?
|
||||
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
|
||||
"general", "hasattributes"
|
||||
"general", "has_attributes"
|
||||
):
|
||||
cursor_pos = self.env["screen"]["newCursorReview"]
|
||||
|
||||
|
||||
@@ -119,7 +119,7 @@ class command:
|
||||
)
|
||||
# is has attribute it enabled?
|
||||
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
|
||||
"general", "hasattributes"
|
||||
"general", "has_attributes"
|
||||
):
|
||||
cursor_pos = self.env["screen"]["newCursorReview"]
|
||||
|
||||
|
||||
@@ -112,9 +112,9 @@ class command:
|
||||
"review", "end_of_screen"
|
||||
):
|
||||
self.env["runtime"]["OutputManager"].present_text(
|
||||
_("end of screen"),
|
||||
_("start of screen"),
|
||||
interrupt=True,
|
||||
sound_icon="EndOfScreen",
|
||||
sound_icon="StartOfScreen",
|
||||
)
|
||||
if line_break:
|
||||
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
|
||||
@@ -125,7 +125,7 @@ class command:
|
||||
)
|
||||
# is has attribute it enabled?
|
||||
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
|
||||
"general", "hasattributes"
|
||||
"general", "has_attributes"
|
||||
):
|
||||
cursor_pos = self.env["screen"]["newCursorReview"]
|
||||
|
||||
|
||||
@@ -49,9 +49,9 @@ class command:
|
||||
"review", "end_of_screen"
|
||||
):
|
||||
self.env["runtime"]["OutputManager"].present_text(
|
||||
_("end of screen"),
|
||||
_("start of screen"),
|
||||
interrupt=True,
|
||||
sound_icon="EndOfScreen",
|
||||
sound_icon="StartOfScreen",
|
||||
)
|
||||
if line_break:
|
||||
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
|
||||
|
||||
@@ -50,9 +50,9 @@ class command:
|
||||
"review", "end_of_screen"
|
||||
):
|
||||
self.env["runtime"]["OutputManager"].present_text(
|
||||
_("end of screen"),
|
||||
_("start of screen"),
|
||||
interrupt=True,
|
||||
sound_icon="EndOfScreen",
|
||||
sound_icon="StartOfScreen",
|
||||
)
|
||||
|
||||
def set_callback(self, callback):
|
||||
|
||||
@@ -95,9 +95,9 @@ class command:
|
||||
"review", "end_of_screen"
|
||||
):
|
||||
self.env["runtime"]["OutputManager"].present_text(
|
||||
_("end of screen"),
|
||||
_("start of screen"),
|
||||
interrupt=False,
|
||||
sound_icon="EndOfScreen",
|
||||
sound_icon="StartOfScreen",
|
||||
)
|
||||
if line_break:
|
||||
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
|
||||
|
||||
@@ -60,9 +60,9 @@ class command:
|
||||
"review", "end_of_screen"
|
||||
):
|
||||
self.env["runtime"]["OutputManager"].present_text(
|
||||
_("end of screen"),
|
||||
_("start of screen"),
|
||||
interrupt=True,
|
||||
sound_icon="EndOfScreen",
|
||||
sound_icon="StartOfScreen",
|
||||
)
|
||||
if line_break:
|
||||
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
|
||||
|
||||
@@ -50,9 +50,9 @@ class command:
|
||||
"review", "end_of_screen"
|
||||
):
|
||||
self.env["runtime"]["OutputManager"].present_text(
|
||||
_("end of screen"),
|
||||
_("start of screen"),
|
||||
interrupt=True,
|
||||
sound_icon="EndOfScreen",
|
||||
sound_icon="StartOfScreen",
|
||||
)
|
||||
if line_break:
|
||||
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
|
||||
|
||||
@@ -23,15 +23,15 @@ class command:
|
||||
def run(self):
|
||||
self.env["runtime"]["SettingsManager"].set_setting(
|
||||
"general",
|
||||
"hasattributes",
|
||||
"has_attributes",
|
||||
str(
|
||||
not self.env["runtime"]["SettingsManager"].get_setting_as_bool(
|
||||
"general", "hasattributes"
|
||||
"general", "has_attributes"
|
||||
)
|
||||
),
|
||||
)
|
||||
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
|
||||
"general", "hasattributes"
|
||||
"general", "has_attributes"
|
||||
):
|
||||
self.env["runtime"]["OutputManager"].present_text(
|
||||
_("announcement of attributes enabled"),
|
||||
|
||||
@@ -62,6 +62,7 @@ class command:
|
||||
announce_capital=True,
|
||||
flush=False,
|
||||
)
|
||||
self.env["commandsIgnore"]["onScreenUpdate"]["INCOMING_IGNORE"] = True
|
||||
|
||||
def _is_recent_tab_input(self):
|
||||
input_manager = self.env["runtime"].get("InputManager")
|
||||
|
||||
+76
-5
@@ -4,6 +4,8 @@
|
||||
# Fenrir TTY screen reader
|
||||
# By Chrys, Storm Dragon, and contributors.
|
||||
|
||||
import re
|
||||
|
||||
from fenrirscreenreader.core.i18n import _
|
||||
from fenrirscreenreader.utils import line_utils
|
||||
from fenrirscreenreader.utils import word_utils
|
||||
@@ -30,14 +32,21 @@ class command:
|
||||
if self.env["runtime"]["ScreenManager"].is_screen_change():
|
||||
self.lastIdent = 0
|
||||
return
|
||||
# Don't announce cursor movements when auto-read is handling incoming text
|
||||
# This prevents interrupting ongoing auto-read announcements
|
||||
if self.env["runtime"]["ScreenManager"].is_delta():
|
||||
return
|
||||
|
||||
# is a vertical change?
|
||||
if not self.env["runtime"]["CursorManager"].is_cursor_vertical_move():
|
||||
return
|
||||
# Don't announce cursor movements when auto-read is handling incoming text.
|
||||
# In PTY mode, TUI navigation often arrives as cursor movement plus a
|
||||
# small repaint delta, so allow the line announcement there.
|
||||
if (
|
||||
self.env["runtime"]["ScreenManager"].is_delta()
|
||||
and self.env["screen"].get("newTTY") != "pty"
|
||||
):
|
||||
return
|
||||
pty_repaint_delta = (
|
||||
self.env["screen"].get("newTTY") == "pty"
|
||||
and self.env["runtime"]["ScreenManager"].is_delta()
|
||||
)
|
||||
|
||||
x, y, curr_line = line_utils.get_current_line(
|
||||
self.env["screen"]["new_cursor"]["x"],
|
||||
@@ -52,10 +61,26 @@ class command:
|
||||
do_interrupt = False
|
||||
|
||||
if curr_line.isspace():
|
||||
if pty_repaint_delta and self._delta_has_nonblank_text():
|
||||
return
|
||||
self.env["runtime"]["OutputManager"].present_text(
|
||||
_("blank"), sound_icon="EmptyLine", interrupt=do_interrupt, flush=False
|
||||
)
|
||||
if pty_repaint_delta:
|
||||
self.env["commandsIgnore"]["onScreenUpdate"][
|
||||
"INCOMING_IGNORE"
|
||||
] = True
|
||||
else:
|
||||
dialog_text = self._get_pty_dialog_text(curr_line, y)
|
||||
if dialog_text:
|
||||
self.env["runtime"]["OutputManager"].present_text(
|
||||
dialog_text, interrupt=do_interrupt, flush=False
|
||||
)
|
||||
if pty_repaint_delta:
|
||||
self.env["commandsIgnore"]["onScreenUpdate"][
|
||||
"INCOMING_IGNORE"
|
||||
] = True
|
||||
return
|
||||
# ident
|
||||
curr_ident = len(curr_line) - len(curr_line.lstrip())
|
||||
if self.lastIdent == -1:
|
||||
@@ -98,7 +123,53 @@ class command:
|
||||
self.env["runtime"]["OutputManager"].present_text(
|
||||
say_line, interrupt=do_interrupt, flush=False
|
||||
)
|
||||
if pty_repaint_delta:
|
||||
self.env["commandsIgnore"]["onScreenUpdate"][
|
||||
"INCOMING_IGNORE"
|
||||
] = True
|
||||
self.lastIdent = curr_ident
|
||||
|
||||
def _get_pty_dialog_text(self, curr_line, curr_y):
|
||||
if self.env["screen"].get("newTTY") != "pty":
|
||||
return ""
|
||||
if not self._is_focus_control_line(curr_line):
|
||||
return ""
|
||||
screen_lines = self.env["screen"]["new_content_text"].split("\n")
|
||||
start = max(0, curr_y - 6)
|
||||
candidate_lines = []
|
||||
for line in screen_lines[start: curr_y + 1]:
|
||||
normalized = self._normalize_line(line)
|
||||
if normalized:
|
||||
candidate_lines.append(normalized)
|
||||
if len(candidate_lines) < 2:
|
||||
return ""
|
||||
if all(self._is_focus_control_line(line) for line in candidate_lines):
|
||||
return ""
|
||||
return "\n".join(candidate_lines)
|
||||
|
||||
def _delta_has_nonblank_text(self):
|
||||
return bool(self.env["screen"].get("new_delta", "").strip())
|
||||
|
||||
def _normalize_line(self, line):
|
||||
return " ".join(line.split())
|
||||
|
||||
def _is_focus_control_line(self, text):
|
||||
stripped = self._normalize_line(text)
|
||||
if not stripped:
|
||||
return False
|
||||
control = r"<\s*[A-Za-z][A-Za-z0-9 _.-]{0,30}\s*>"
|
||||
if re.fullmatch(rf"(?:{control}\s*)+", stripped):
|
||||
return True
|
||||
return stripped.lower() in {
|
||||
"ok",
|
||||
"cancel",
|
||||
"yes",
|
||||
"no",
|
||||
"retry",
|
||||
"abort",
|
||||
"ignore",
|
||||
"continue",
|
||||
}
|
||||
|
||||
def set_callback(self, callback):
|
||||
pass
|
||||
|
||||
@@ -24,7 +24,7 @@ class command:
|
||||
def run(self):
|
||||
# is it enabled?
|
||||
if not self.env["runtime"]["SettingsManager"].get_setting_as_bool(
|
||||
"general", "hasattributes"
|
||||
"general", "has_attributes"
|
||||
):
|
||||
return
|
||||
# is a vertical change?
|
||||
|
||||
@@ -33,6 +33,8 @@ class command:
|
||||
self.env["input"]["prev_input"]
|
||||
):
|
||||
return
|
||||
if self._current_input_runs_fenrir_command():
|
||||
return
|
||||
# if the filter is set
|
||||
if (
|
||||
self.env["runtime"]["SettingsManager"]
|
||||
@@ -50,5 +52,16 @@ class command:
|
||||
return
|
||||
self.env["runtime"]["OutputManager"].interrupt_output_async()
|
||||
|
||||
def _current_input_runs_fenrir_command(self):
|
||||
input_manager = self.env["runtime"].get("InputManager")
|
||||
if input_manager is None:
|
||||
return False
|
||||
try:
|
||||
shortcut = input_manager.get_curr_shortcut()
|
||||
command = input_manager.get_command_for_shortcut(shortcut)
|
||||
except Exception:
|
||||
return False
|
||||
return isinstance(command, str) and command != ""
|
||||
|
||||
def set_callback(self, callback):
|
||||
pass
|
||||
|
||||
@@ -81,12 +81,13 @@ class command:
|
||||
delta_length = len(delta_text)
|
||||
if (
|
||||
delta_length > 200
|
||||
): # Allow longer progress lines like Claude Code's status
|
||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||
f"Progress filter: delta too long ({delta_length})",
|
||||
debug.DebugLevel.INFO,
|
||||
)
|
||||
return False
|
||||
): # Allow longer progress lines such as terminal status output
|
||||
if not self.is_explicit_progress_delta(delta_text):
|
||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||
f"Progress filter: delta too long ({delta_length})",
|
||||
debug.DebugLevel.INFO,
|
||||
)
|
||||
return False
|
||||
|
||||
# If delta contains newlines and is substantial, let incoming handler
|
||||
# deal with it to avoid interfering with multi-line text output
|
||||
@@ -107,6 +108,25 @@ class command:
|
||||
|
||||
return True
|
||||
|
||||
def is_explicit_progress_delta(self, text):
|
||||
"""Allow long single-line deltas that still look like progress output."""
|
||||
import re
|
||||
|
||||
if "\n" in text or self.contains_url(text):
|
||||
return False
|
||||
|
||||
has_percentage = re.search(r"(^|\s)\d+(?:\.\d+)?\s*%", text)
|
||||
if not has_percentage:
|
||||
return False
|
||||
|
||||
return bool(
|
||||
re.search(
|
||||
r"[|\[\]#=*>█▉▊▋▌▍▎▏▒▓░]"
|
||||
r"|\b\d+(?:\.\d+)?\s*[kKmMgGtT](?:i?B)?/s\b",
|
||||
text,
|
||||
)
|
||||
)
|
||||
|
||||
def reset_progress_state(self):
|
||||
"""Reset progress state when a prompt is detected, allowing new progress operations to start fresh"""
|
||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||
@@ -306,43 +326,27 @@ class command:
|
||||
self.env["commandBuffer"]["lastProgressTime"] = current_time
|
||||
return
|
||||
|
||||
# Pattern 6: Claude Code working indicators (various symbols + activity text + "esc/ctrl+c to interrupt")
|
||||
# Matches any: [symbol] [Task description]… (... to interrupt ...)
|
||||
# Pattern 6: Interruptible terminal activity indicators
|
||||
# Matches any: [symbol] [Task description][…] (... to interrupt ...)
|
||||
# Symbols include: * ✢ ✽ ✶ ✻ · • ◦ ○ ● ◆ and similar decorative characters
|
||||
# Example: ✽ Reviewing script for issues… (ctrl+c to interrupt · 33s · ↑ 1.6k tokens · thought for 4s)
|
||||
claude_progress_match = re.search(
|
||||
r'[*✢✽✶✻·•◦○●◆]\s+\w+.*?…\s*\(.*(?:esc|ctrl\+c) to interrupt.*\)',
|
||||
# Keep this structural rather than adding application-specific formats,
|
||||
# which change too frequently to support reliably.
|
||||
interruptible_activity_match = re.search(
|
||||
r'[*✢✽✶✻·•◦○●◆]\s+\w+.*?(?:…\s*)?\(.*(?:esc|ctrl\+c) to interrupt.*\)',
|
||||
text,
|
||||
re.IGNORECASE,
|
||||
)
|
||||
if claude_progress_match:
|
||||
if interruptible_activity_match:
|
||||
if current_time - self.env["commandBuffer"]["lastProgressTime"] >= 1.0:
|
||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||
"Playing Claude Code activity beep",
|
||||
"Playing interruptible activity beep",
|
||||
debug.DebugLevel.INFO,
|
||||
)
|
||||
self.play_activity_beep()
|
||||
self.env["commandBuffer"]["lastProgressTime"] = current_time
|
||||
return
|
||||
|
||||
# Pattern 6b: Claude Code tool invocation indicators (● Tool Name(...))
|
||||
# Example: ● Web Search("query here")
|
||||
tool_invocation_match = re.search(
|
||||
r'[●○◉•◦]\s+(?:Web\s*Search|Read|Write|Edit|Bash|Glob|Grep|Task|WebFetch)\s*\(',
|
||||
text,
|
||||
re.IGNORECASE,
|
||||
)
|
||||
if tool_invocation_match:
|
||||
if current_time - self.env["commandBuffer"]["lastProgressTime"] >= 1.0:
|
||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||
"Playing Claude Code tool invocation beep",
|
||||
debug.DebugLevel.INFO,
|
||||
)
|
||||
self.play_activity_beep()
|
||||
self.env["commandBuffer"]["lastProgressTime"] = current_time
|
||||
return
|
||||
|
||||
# Pattern 6c: Bullet/white bullet activity lines (•/◦ ...)
|
||||
# Pattern 6b: Bullet/white bullet activity lines (•/◦ ...)
|
||||
bullet_activity_match = re.search(
|
||||
(
|
||||
r'^\s*[•◦]\s+.*(?:…|\.{3,}|\b(?:thinking|working|processing|'
|
||||
|
||||
@@ -127,7 +127,7 @@ class config_command:
|
||||
self.config.set("speech", "rate", "0.75")
|
||||
self.config.set("speech", "pitch", "0.5")
|
||||
self.config.set("speech", "volume", "1.0")
|
||||
self.config.set("speech", "hardware_device", "auto")
|
||||
self.config.set("speech", "hardware_device", "/dev/ttyS0")
|
||||
self.config.set("speech", "hardware_baud_rate", "9600")
|
||||
|
||||
self.config.add_section("sound")
|
||||
|
||||
+1
-1
@@ -108,7 +108,7 @@ class command(config_command):
|
||||
"rate": "0.5",
|
||||
"pitch": "0.5",
|
||||
"volume": "1.0",
|
||||
"hardware_device": "auto",
|
||||
"hardware_device": "/dev/ttyS0",
|
||||
"hardware_baud_rate": "9600",
|
||||
"auto_read_incoming": "True",
|
||||
}
|
||||
|
||||
@@ -19,7 +19,9 @@ command_buffer = {
|
||||
|
||||
# used by the command_manager
|
||||
command_info = {
|
||||
# 'curr_command': '',
|
||||
"lastCommand": "",
|
||||
"lastCommandSection": "",
|
||||
"lastCommandRunTime": time.time(),
|
||||
"lastCommandExecutionTime": time.time(),
|
||||
"lastCommandRequestTime": time.time(),
|
||||
}
|
||||
|
||||
@@ -489,7 +489,10 @@ class CommandManager:
|
||||
+ str(e),
|
||||
debug.DebugLevel.ERROR,
|
||||
)
|
||||
self.env["commandInfo"]["lastCommandExecutionTime"] = time.time()
|
||||
self.env["commandInfo"]["lastCommand"] = command
|
||||
self.env["commandInfo"]["lastCommandSection"] = section
|
||||
self.env["commandInfo"]["lastCommandRunTime"] = time.time()
|
||||
self.env["commandInfo"]["lastCommandExecutionTime"] = time.time()
|
||||
|
||||
def get_command_description(self, command, section="commands"):
|
||||
if self.command_exists(command, section):
|
||||
|
||||
@@ -3,24 +3,22 @@
|
||||
|
||||
import os
|
||||
import pathlib
|
||||
import fcntl
|
||||
from datetime import datetime
|
||||
|
||||
from fenrirscreenreader.core import debug
|
||||
|
||||
|
||||
class DebugManager:
|
||||
DEFAULT_LOG_DIR = "/tmp"
|
||||
DEFAULT_LOG_BASENAME = "fenrir"
|
||||
DEFAULT_LOG_EXTENSION = ".log"
|
||||
|
||||
def __init__(self, file_name=""):
|
||||
self._file = None
|
||||
self._fileOpened = False
|
||||
self._fileName = (
|
||||
"/tmp/fenrir_"
|
||||
+ str(os.getpid())
|
||||
+ "_"
|
||||
+ str(datetime.utcnow().strftime("%Y-%m-%d_%H-%M-%S"))
|
||||
+ ".log"
|
||||
)
|
||||
if file_name != "":
|
||||
self._fileName = file_name
|
||||
self._fileName = file_name
|
||||
self._useDefaultLogName = file_name == ""
|
||||
|
||||
def initialize(self, environment):
|
||||
self.env = environment
|
||||
@@ -39,6 +37,10 @@ class DebugManager:
|
||||
self._fileOpened = False
|
||||
if file_name != "":
|
||||
self._fileName = file_name
|
||||
self._useDefaultLogName = False
|
||||
if self._useDefaultLogName:
|
||||
self._open_default_debug_file()
|
||||
return
|
||||
if self._fileName != "":
|
||||
directory = os.path.dirname(self._fileName)
|
||||
if not os.path.exists(directory):
|
||||
@@ -51,6 +53,43 @@ class DebugManager:
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
def _open_default_debug_file(self):
|
||||
pathlib.Path(self.DEFAULT_LOG_DIR).mkdir(parents=True, exist_ok=True)
|
||||
log_number = 1
|
||||
while True:
|
||||
log_file = self._default_log_file_name(log_number)
|
||||
try:
|
||||
fd = os.open(
|
||||
log_file,
|
||||
os.O_CREAT | os.O_RDWR | os.O_NOFOLLOW,
|
||||
0o644,
|
||||
)
|
||||
file_obj = os.fdopen(fd, "a")
|
||||
fcntl.flock(file_obj.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
file_obj.seek(0)
|
||||
file_obj.truncate()
|
||||
os.chmod(log_file, 0o644)
|
||||
self._file = file_obj
|
||||
self._fileName = log_file
|
||||
self._fileOpened = True
|
||||
return
|
||||
except BlockingIOError:
|
||||
try:
|
||||
file_obj.close()
|
||||
except Exception:
|
||||
pass
|
||||
log_number += 1
|
||||
except OSError as e:
|
||||
print(e)
|
||||
return
|
||||
|
||||
def _default_log_file_name(self, log_number):
|
||||
suffix = "" if log_number == 1 else str(log_number)
|
||||
return os.path.join(
|
||||
self.DEFAULT_LOG_DIR,
|
||||
self.DEFAULT_LOG_BASENAME + suffix + self.DEFAULT_LOG_EXTENSION,
|
||||
)
|
||||
|
||||
def write_debug_out(
|
||||
self, text, level=debug.DebugLevel.DEACTIVE, on_any_level=False
|
||||
):
|
||||
@@ -120,3 +159,4 @@ class DebugManager:
|
||||
def set_debug_file(self, file_name):
|
||||
self.close_debug_file()
|
||||
self._fileName = file_name
|
||||
self._useDefaultLogName = file_name == ""
|
||||
|
||||
@@ -311,7 +311,10 @@ class FenrirManager:
|
||||
self.singleKeyCommand = True
|
||||
elif (
|
||||
(
|
||||
self.environment["runtime"]["DiffReviewManager"].is_active()
|
||||
self.environment["runtime"]["VmenuManager"].get_active()
|
||||
or self.environment["runtime"][
|
||||
"DiffReviewManager"
|
||||
].is_active()
|
||||
or self.environment["runtime"][
|
||||
"SpeechHistoryManager"
|
||||
].is_active()
|
||||
|
||||
@@ -66,6 +66,8 @@ class OutputManager:
|
||||
"present_text:\nsoundIcon:'" + sound_icon + "'\nText:\n" + text,
|
||||
debug.DebugLevel.INFO,
|
||||
)
|
||||
if interrupt and self._sound_icon_will_play(sound_icon):
|
||||
self.interrupt_output()
|
||||
if self.play_sound_icon(sound_icon, interrupt):
|
||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||
"sound_icon found", debug.DebugLevel.INFO
|
||||
@@ -121,6 +123,18 @@ class OutputManager:
|
||||
return False
|
||||
return text.isupper()
|
||||
|
||||
def _sound_icon_will_play(self, sound_icon):
|
||||
if sound_icon == "":
|
||||
return False
|
||||
sound_icon = sound_icon.upper()
|
||||
if not self.env["runtime"]["SettingsManager"].get_setting_as_bool(
|
||||
"sound", "enabled"
|
||||
):
|
||||
return False
|
||||
if sound_icon not in self.env["soundIcons"]:
|
||||
return False
|
||||
return self.env["runtime"]["SoundDriver"] is not None
|
||||
|
||||
def get_last_echo(self):
|
||||
return self.last_echo
|
||||
|
||||
|
||||
@@ -359,7 +359,21 @@ class ScreenManager:
|
||||
# Keep those as typing so incoming speech does not
|
||||
# announce the repainted prompt line.
|
||||
if temp_new_delta != expected_typing:
|
||||
if expected_typing.strip() != "":
|
||||
old_cursor_x = self.env["screen"]["old_cursor"][
|
||||
"x"
|
||||
]
|
||||
prefix_changed_before_cursor = (
|
||||
old_screen_text[:old_cursor_x]
|
||||
!= new_screen_text[:old_cursor_x]
|
||||
)
|
||||
likely_line_repaint = (
|
||||
prefix_changed_before_cursor
|
||||
and len(expected_typing.strip()) > 4
|
||||
)
|
||||
if (
|
||||
expected_typing.strip() != ""
|
||||
and not likely_line_repaint
|
||||
):
|
||||
diff_list = ["+ " + expected_typing]
|
||||
else:
|
||||
# Fallback: treat entire current line as new
|
||||
@@ -370,7 +384,7 @@ class ScreenManager:
|
||||
self.env["screen"]["new_cursor"]["y"]
|
||||
]
|
||||
diff_list = [
|
||||
"+ " + current_line + "\n"
|
||||
"+ " + current_line.rstrip()
|
||||
]
|
||||
typing = False
|
||||
else:
|
||||
|
||||
@@ -28,7 +28,7 @@ settings_data = {
|
||||
"module": "",
|
||||
"voice": "en-us",
|
||||
"language": "",
|
||||
"hardware_device": "auto",
|
||||
"hardware_device": "/dev/ttyS0",
|
||||
"hardware_baud_rate": 9600,
|
||||
"auto_read_incoming": True,
|
||||
"read_numbers_as_digits": False,
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
|
||||
import inspect
|
||||
import os
|
||||
from argparse import Namespace
|
||||
from configparser import ConfigParser
|
||||
|
||||
from fenrirscreenreader.core import applicationManager
|
||||
@@ -67,6 +68,15 @@ class SettingsManager:
|
||||
def shutdown(self):
|
||||
pass
|
||||
|
||||
def format_cli_args(self, cliArgs):
|
||||
if cliArgs is None:
|
||||
return "{}"
|
||||
if isinstance(cliArgs, Namespace):
|
||||
args = vars(cliArgs)
|
||||
else:
|
||||
args = vars(cliArgs) if hasattr(cliArgs, "__dict__") else {}
|
||||
return str({key: args[key] for key in sorted(args)})
|
||||
|
||||
def get_binding_backup(self):
|
||||
return self.bindingsBackup.copy()
|
||||
|
||||
@@ -644,6 +654,11 @@ class SettingsManager:
|
||||
)
|
||||
)
|
||||
environment["runtime"]["DebugManager"].initialize(environment)
|
||||
environment["runtime"]["DebugManager"].write_debug_out(
|
||||
"Fenrir startup CLI arguments: " + self.format_cli_args(cliArgs),
|
||||
debug.DebugLevel.INFO,
|
||||
on_any_level=True,
|
||||
)
|
||||
|
||||
if cliArgs.force_all_screens:
|
||||
environment["runtime"]["force_all_screens"] = True
|
||||
|
||||
@@ -164,7 +164,7 @@ class SpeechHistoryManager:
|
||||
str([1, ["KEY_KPENTER"]]): "SPEECH_HISTORY_COPY",
|
||||
str([1, ["KEY_ESC"]]): "SPEECH_HISTORY_CLOSE",
|
||||
}
|
||||
self.env["rawBindings"] = {
|
||||
modal_raw_bindings = {
|
||||
str([1, ["KEY_UP"]]): [1, ["KEY_UP"]],
|
||||
str([1, ["KEY_DOWN"]]): [1, ["KEY_DOWN"]],
|
||||
str([1, ["KEY_SPACE"]]): [1, ["KEY_SPACE"]],
|
||||
@@ -172,6 +172,9 @@ class SpeechHistoryManager:
|
||||
str([1, ["KEY_KPENTER"]]): [1, ["KEY_KPENTER"]],
|
||||
str([1, ["KEY_ESC"]]): [1, ["KEY_ESC"]],
|
||||
}
|
||||
self.env["rawBindings"] = self.raw_bindings_backup.copy()
|
||||
self.env["rawBindings"].update(modal_raw_bindings)
|
||||
self._refresh_input_bindings()
|
||||
|
||||
def _restore_bindings(self):
|
||||
if self.bindings_backup is not None:
|
||||
@@ -180,3 +183,21 @@ class SpeechHistoryManager:
|
||||
self.env["rawBindings"] = self.raw_bindings_backup
|
||||
self.bindings_backup = None
|
||||
self.raw_bindings_backup = None
|
||||
self._reset_input_state()
|
||||
self._refresh_input_bindings()
|
||||
|
||||
def _reset_input_state(self):
|
||||
try:
|
||||
self.env["runtime"]["InputManager"].reset_input_state()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _refresh_input_bindings(self):
|
||||
try:
|
||||
refresh_grabs = getattr(
|
||||
self.env["runtime"]["InputDriver"], "refresh_grabs", None
|
||||
)
|
||||
if refresh_grabs:
|
||||
refresh_grabs(force=True)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@@ -132,13 +132,6 @@ class TabCompletionManager:
|
||||
if candidate_text:
|
||||
return self._clean_text(candidate_text)
|
||||
|
||||
delta_text = self.env["screen"]["new_delta"]
|
||||
if (
|
||||
delta_text
|
||||
and not self.env["screen"].get("new_delta_is_typing", False)
|
||||
):
|
||||
return self._clean_text(delta_text)
|
||||
|
||||
return ""
|
||||
|
||||
def _get_cursor_line_inserted_text(
|
||||
@@ -184,26 +177,19 @@ class TabCompletionManager:
|
||||
return "".join(inserted_parts)
|
||||
|
||||
def _get_candidate_text(self, old_lines, new_lines, cursor_y):
|
||||
if len(old_lines) != len(new_lines):
|
||||
return self._get_inserted_lines(old_lines, new_lines, cursor_y)
|
||||
|
||||
changed_lines = []
|
||||
old_cursor_line = (
|
||||
old_lines[cursor_y].strip() if cursor_y < len(old_lines) else ""
|
||||
)
|
||||
for index, old_line in enumerate(old_lines):
|
||||
if index == cursor_y:
|
||||
continue
|
||||
if index < len(new_lines) and old_line != new_lines[index]:
|
||||
if new_lines[index].strip() == old_cursor_line:
|
||||
continue
|
||||
changed_lines.append(new_lines[index])
|
||||
|
||||
return "\n".join(
|
||||
line.rstrip() for line in changed_lines if line.strip()
|
||||
return self._get_inserted_lines(
|
||||
old_lines,
|
||||
new_lines,
|
||||
self.env["screen"]["new_cursor"]["y"],
|
||||
old_cursor_line,
|
||||
)
|
||||
|
||||
def _get_inserted_lines(self, old_lines, new_lines, cursor_y):
|
||||
def _get_inserted_lines(
|
||||
self, old_lines, new_lines, new_cursor_y, old_cursor_line
|
||||
):
|
||||
matcher = difflib.SequenceMatcher(
|
||||
None, old_lines, new_lines, autojunk=False
|
||||
)
|
||||
@@ -217,10 +203,15 @@ class TabCompletionManager:
|
||||
) in matcher.get_opcodes():
|
||||
if tag not in ["insert", "replace"]:
|
||||
continue
|
||||
if new_end <= cursor_y:
|
||||
if new_start > new_cursor_y:
|
||||
continue
|
||||
if tag == "replace" and any(
|
||||
line.strip() for line in old_lines[old_start:old_end]
|
||||
):
|
||||
continue
|
||||
for line in new_lines[new_start:new_end]:
|
||||
if line.strip():
|
||||
stripped_line = line.strip()
|
||||
if stripped_line and stripped_line != old_cursor_line:
|
||||
inserted_lines.append(line.rstrip())
|
||||
return "\n".join(inserted_lines)
|
||||
|
||||
|
||||
@@ -4,5 +4,5 @@
|
||||
# Fenrir TTY screen reader
|
||||
# By Chrys, Storm Dragon, and contributors.
|
||||
|
||||
version = "2026.05.21"
|
||||
code_name = "testing"
|
||||
version = "2026.06.04"
|
||||
code_name = "xim"
|
||||
|
||||
@@ -162,6 +162,7 @@ class driver(inputDriver):
|
||||
self.fenrir_keys = set()
|
||||
self.failed_grabs = 0
|
||||
self.modifier_state = 0
|
||||
self.modifier_interrupt_state = 0
|
||||
|
||||
def initialize(self, environment):
|
||||
self.env = environment
|
||||
@@ -194,6 +195,7 @@ class driver(inputDriver):
|
||||
)
|
||||
self.num_lock_mask = self.find_num_lock_mask()
|
||||
self.refresh_modifier_state()
|
||||
self.modifier_interrupt_state = self.modifier_state
|
||||
self.refresh_interesting_keys()
|
||||
self.refresh_grabs(force=True)
|
||||
self.env["runtime"]["ProcessManager"].add_custom_event_thread(
|
||||
@@ -274,6 +276,7 @@ class driver(inputDriver):
|
||||
while active.value:
|
||||
try:
|
||||
self.refresh_grabs()
|
||||
self.poll_modifier_interrupt_keys()
|
||||
if not self.display.pending_events():
|
||||
time.sleep(0.01)
|
||||
continue
|
||||
@@ -371,6 +374,56 @@ class driver(inputDriver):
|
||||
"event_x_time": getattr(event, "time", X.CurrentTime),
|
||||
}
|
||||
|
||||
def poll_modifier_interrupt_keys(self):
|
||||
if not self.active or not self.should_poll_modifier_interrupt_keys():
|
||||
return
|
||||
try:
|
||||
pointer = self.root.query_pointer()
|
||||
current_state = getattr(pointer, "mask", 0)
|
||||
except Exception:
|
||||
return
|
||||
previous_state = self.modifier_interrupt_state
|
||||
self.modifier_interrupt_state = current_state
|
||||
self.modifier_state = current_state
|
||||
for key_name, modifier_mask in self.interrupt_modifier_masks():
|
||||
if current_state & modifier_mask and not previous_state & modifier_mask:
|
||||
self.interrupt_output_on_modifier_key(key_name)
|
||||
|
||||
def should_poll_modifier_interrupt_keys(self):
|
||||
try:
|
||||
settings_manager = self.env["runtime"]["SettingsManager"]
|
||||
except Exception:
|
||||
return False
|
||||
if not settings_manager.get_setting_as_bool(
|
||||
"keyboard", "interrupt_on_key_press"
|
||||
):
|
||||
return False
|
||||
return (
|
||||
settings_manager.get_setting(
|
||||
"keyboard", "interrupt_on_key_press_filter"
|
||||
).strip()
|
||||
== ""
|
||||
)
|
||||
|
||||
def interrupt_modifier_masks(self):
|
||||
return [
|
||||
("KEY_CTRL", X.ControlMask),
|
||||
("KEY_SHIFT", X.ShiftMask),
|
||||
("KEY_ALT", X.Mod1Mask),
|
||||
]
|
||||
|
||||
def interrupt_output_on_modifier_key(self, key_name):
|
||||
try:
|
||||
self.env["runtime"]["OutputManager"].interrupt_output_async()
|
||||
except Exception as e:
|
||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||
"x11Driver modifier interrupt failed for "
|
||||
+ key_name
|
||||
+ ": "
|
||||
+ str(e),
|
||||
debug.DebugLevel.ERROR,
|
||||
)
|
||||
|
||||
def refresh_modifier_state(self):
|
||||
try:
|
||||
pointer = self.root.query_pointer()
|
||||
|
||||
@@ -8,6 +8,7 @@ import fcntl
|
||||
import getpass
|
||||
import os
|
||||
import pty
|
||||
import re
|
||||
import shlex
|
||||
from queue import Full
|
||||
import signal
|
||||
@@ -35,6 +36,7 @@ class PTYConstants:
|
||||
SELECT_TIMEOUT = 0.05
|
||||
PROCESS_TERMINATION_TIMEOUT = 3.0
|
||||
PROCESS_KILL_DELAY = 0.5
|
||||
FENRIR_SHORTCUT_STDIN_TAIL_TIMEOUT = 0.3
|
||||
|
||||
# Polling intervals (in seconds)
|
||||
MIN_POLL_INTERVAL = 0.001
|
||||
@@ -75,6 +77,8 @@ class Terminal:
|
||||
)
|
||||
self.stream = pyte.ByteStream()
|
||||
self.stream.attach(self.screen)
|
||||
self._pending_control_bytes = b""
|
||||
self._discarding_control_string = False
|
||||
|
||||
def _log_error(self, message, level=None):
|
||||
"""Log error message using proper debug manager if available."""
|
||||
@@ -93,7 +97,53 @@ class Terminal:
|
||||
print(f"PTY Terminal: {message}")
|
||||
|
||||
def feed(self, data):
|
||||
self.stream.feed(data)
|
||||
self.stream.feed(self._filter_terminal_control_strings(data))
|
||||
|
||||
def _filter_terminal_control_strings(self, data):
|
||||
if not data:
|
||||
return data
|
||||
data = self._pending_control_bytes + data
|
||||
self._pending_control_bytes = b""
|
||||
output = bytearray()
|
||||
index = 0
|
||||
while index < len(data):
|
||||
if self._discarding_control_string:
|
||||
end = self._find_control_string_end(data, index)
|
||||
if end == -1:
|
||||
return bytes(output)
|
||||
index = end
|
||||
self._discarding_control_string = False
|
||||
continue
|
||||
|
||||
byte = data[index]
|
||||
if byte == 0x90:
|
||||
self._discarding_control_string = True
|
||||
index += 1
|
||||
continue
|
||||
if byte == 0x1B:
|
||||
if index + 1 >= len(data):
|
||||
self._pending_control_bytes = data[index:]
|
||||
break
|
||||
next_byte = data[index + 1]
|
||||
if next_byte in b"P^_X":
|
||||
self._discarding_control_string = True
|
||||
index += 2
|
||||
continue
|
||||
output.append(byte)
|
||||
index += 1
|
||||
return bytes(output)
|
||||
|
||||
def _find_control_string_end(self, data, start):
|
||||
c1_end = data.find(b"\x9c", start)
|
||||
st_end = data.find(b"\x1b\\", start)
|
||||
ends = [
|
||||
end + 1 for end in [c1_end] if end != -1
|
||||
] + [
|
||||
end + 2 for end in [st_end] if end != -1
|
||||
]
|
||||
if not ends:
|
||||
return -1
|
||||
return min(ends)
|
||||
|
||||
def update_attributes(self, initialize=False):
|
||||
buffer = self.screen.buffer
|
||||
@@ -105,8 +155,8 @@ class Terminal:
|
||||
try:
|
||||
self.attributes = [
|
||||
[
|
||||
list(attribute[1:]) + [False, "default", "default"]
|
||||
if len(attribute) > 1 else [False, "default", "default"]
|
||||
self._attribute_from_pyte_char(attribute)
|
||||
if len(attribute) > 1 else self._default_attribute[:]
|
||||
for attribute in line.values()
|
||||
]
|
||||
for line in buffer.values()
|
||||
@@ -143,7 +193,7 @@ class Terminal:
|
||||
|
||||
try:
|
||||
self.attributes[y] = [
|
||||
list(attribute[1:]) + [False, "default", "default"]
|
||||
self._attribute_from_pyte_char(attribute)
|
||||
for attribute in (buffer[y].values())
|
||||
]
|
||||
except Exception as e:
|
||||
@@ -155,6 +205,27 @@ class Terminal:
|
||||
# Use pre-created template for efficiency
|
||||
self.attributes[y] += [self._default_attribute[:] for _ in range(diff)]
|
||||
|
||||
def _attribute_from_pyte_char(self, attribute):
|
||||
fg = attribute.fg
|
||||
bg = attribute.bg
|
||||
reverse = bool(attribute.reverse)
|
||||
if reverse:
|
||||
fg, bg = bg, fg
|
||||
if fg == "default" and bg == "default":
|
||||
bg = "reverse"
|
||||
return [
|
||||
fg,
|
||||
bg,
|
||||
bool(attribute.bold),
|
||||
bool(attribute.italics),
|
||||
bool(attribute.underscore),
|
||||
bool(attribute.strikethrough),
|
||||
reverse,
|
||||
bool(attribute.blink),
|
||||
"default",
|
||||
"default",
|
||||
]
|
||||
|
||||
def resize(self, lines, columns):
|
||||
self.screen.resize(lines, columns)
|
||||
self.set_cursor()
|
||||
@@ -205,6 +276,8 @@ class driver(screenDriver):
|
||||
self.stdin_interrupt_lock = threading.Lock()
|
||||
self.stdin_interrupt_running = False
|
||||
self.stdin_interrupt_thread = None
|
||||
self.last_fenrir_stdin_command_time = 0.0
|
||||
self.fenrir_stdin_sequence_prefix = b""
|
||||
signal.signal(signal.SIGWINCH, self.handle_sigwinch)
|
||||
|
||||
# Runtime configuration storage
|
||||
@@ -295,6 +368,8 @@ class driver(screenDriver):
|
||||
def interrupt_output_on_stdin_input(self, msg_bytes):
|
||||
if not msg_bytes:
|
||||
return
|
||||
if self.is_terminal_response_sequence(msg_bytes):
|
||||
return
|
||||
settings_manager = self.env["runtime"]["SettingsManager"]
|
||||
if not settings_manager.get_setting_as_bool(
|
||||
"keyboard", "interrupt_on_key_press"
|
||||
@@ -317,6 +392,29 @@ class driver(screenDriver):
|
||||
)
|
||||
self.stdin_interrupt_thread.start()
|
||||
|
||||
def is_terminal_response_sequence(self, msg_bytes):
|
||||
if not msg_bytes or not msg_bytes.startswith(b"\x1b"):
|
||||
return False
|
||||
if msg_bytes.startswith((b"\x1b]", b"\x1bP", b"\x1b_", b"\x1b^")):
|
||||
return True
|
||||
if not msg_bytes.startswith(b"\x1b["):
|
||||
return False
|
||||
try:
|
||||
sequence = msg_bytes.decode("ascii", errors="ignore")
|
||||
except Exception:
|
||||
return False
|
||||
if len(sequence) < 3:
|
||||
return False
|
||||
final_byte = sequence[-1]
|
||||
if final_byte not in "cRnt":
|
||||
return False
|
||||
body = sequence[2:-1]
|
||||
if final_byte == "R":
|
||||
return bool(re.fullmatch(r"\??\d+;\d+", body))
|
||||
if final_byte in "cnt":
|
||||
return bool(re.fullmatch(r"[?>=0-9;]*", body))
|
||||
return False
|
||||
|
||||
def run_stdin_interrupt(self):
|
||||
try:
|
||||
self.env["runtime"]["OutputManager"].interrupt_output()
|
||||
@@ -333,10 +431,189 @@ class driver(screenDriver):
|
||||
def handle_stdin_input(self, msg_bytes, event_queue):
|
||||
if self.synthesize_backspace_shortcut(msg_bytes, event_queue):
|
||||
return
|
||||
if self.handle_vmenu_stdin_input(msg_bytes, event_queue):
|
||||
return
|
||||
if self.stdin_matches_fenrir_command(msg_bytes):
|
||||
return
|
||||
if self.is_late_fenrir_shortcut_sequence(msg_bytes):
|
||||
return
|
||||
self.record_stdin_keypress(msg_bytes)
|
||||
self.interrupt_output_on_stdin_input(msg_bytes)
|
||||
self.inject_text_to_screen(msg_bytes)
|
||||
|
||||
def stdin_matches_fenrir_command(self, msg_bytes):
|
||||
input_manager = self.env["runtime"].get("InputManager")
|
||||
if input_manager is None:
|
||||
return False
|
||||
try:
|
||||
shortcut = input_manager.get_curr_shortcut()
|
||||
command = input_manager.get_command_for_shortcut(shortcut)
|
||||
except Exception:
|
||||
return False
|
||||
if not isinstance(command, str) or command == "":
|
||||
return False
|
||||
self.record_consumed_fenrir_stdin_sequence(msg_bytes)
|
||||
return True
|
||||
|
||||
def is_late_fenrir_shortcut_sequence(self, msg_bytes):
|
||||
if not self.recent_fenrir_stdin_command():
|
||||
if not self.recent_review_command_execution():
|
||||
return False
|
||||
if self.is_terminal_response_sequence(msg_bytes):
|
||||
return False
|
||||
if self.consume_keyboard_escape_sequence_fragment(msg_bytes):
|
||||
return True
|
||||
return False
|
||||
|
||||
def record_consumed_fenrir_stdin_sequence(self, msg_bytes):
|
||||
self.last_fenrir_stdin_command_time = time.monotonic()
|
||||
self.fenrir_stdin_sequence_prefix = self.remaining_keyboard_sequence_prefix(
|
||||
msg_bytes
|
||||
)
|
||||
|
||||
def recent_fenrir_stdin_command(self):
|
||||
return (
|
||||
time.monotonic() - self.last_fenrir_stdin_command_time
|
||||
<= PTYConstants.FENRIR_SHORTCUT_STDIN_TAIL_TIMEOUT
|
||||
)
|
||||
|
||||
def recent_review_command_execution(self):
|
||||
command_info = self.env.get("commandInfo", {})
|
||||
last_command = command_info.get("lastCommand", "")
|
||||
last_section = command_info.get("lastCommandSection", "")
|
||||
if last_section != "commands" or not last_command.startswith("REVIEW_"):
|
||||
return False
|
||||
try:
|
||||
last_time = float(command_info.get("lastCommandRunTime", 0))
|
||||
except (TypeError, ValueError):
|
||||
return False
|
||||
return (
|
||||
time.time() - last_time
|
||||
<= PTYConstants.FENRIR_SHORTCUT_STDIN_TAIL_TIMEOUT
|
||||
)
|
||||
|
||||
def is_keyboard_escape_sequence(self, msg_bytes):
|
||||
if msg_bytes == b"\x1b":
|
||||
return True
|
||||
if not msg_bytes or len(msg_bytes) < 2:
|
||||
return False
|
||||
if msg_bytes.startswith(b"\x1bO") and 3 <= len(msg_bytes) <= 4:
|
||||
return True
|
||||
if not msg_bytes.startswith(b"\x1b["):
|
||||
return False
|
||||
try:
|
||||
sequence = msg_bytes.decode("ascii", errors="ignore")
|
||||
except Exception:
|
||||
return False
|
||||
if len(sequence) < 3:
|
||||
return False
|
||||
return sequence[-1] in "~ABCDHFZPQRS"
|
||||
|
||||
def consume_keyboard_escape_sequence_fragment(self, msg_bytes):
|
||||
if self.is_keyboard_escape_sequence(msg_bytes):
|
||||
self.record_consumed_fenrir_stdin_sequence(msg_bytes)
|
||||
return True
|
||||
if self.fenrir_stdin_sequence_prefix:
|
||||
combined = self.fenrir_stdin_sequence_prefix + msg_bytes
|
||||
prefix = self.remaining_keyboard_sequence_prefix(combined)
|
||||
if self.is_keyboard_escape_sequence(combined) or prefix != b"":
|
||||
self.last_fenrir_stdin_command_time = time.monotonic()
|
||||
self.fenrir_stdin_sequence_prefix = prefix
|
||||
return True
|
||||
if msg_bytes.startswith((b"[", b"O")) and len(msg_bytes) > 1:
|
||||
combined = b"\x1b" + msg_bytes
|
||||
if self.is_keyboard_escape_sequence(combined):
|
||||
self.record_consumed_fenrir_stdin_sequence(combined)
|
||||
return True
|
||||
return False
|
||||
|
||||
def remaining_keyboard_sequence_prefix(self, msg_bytes):
|
||||
if not msg_bytes:
|
||||
return b""
|
||||
if msg_bytes == b"\x1b":
|
||||
return msg_bytes
|
||||
if msg_bytes.startswith(b"\x1bO") and len(msg_bytes) < 3:
|
||||
return msg_bytes
|
||||
if self.is_keyboard_escape_sequence(msg_bytes):
|
||||
return b""
|
||||
if msg_bytes.startswith(b"\x1b["):
|
||||
try:
|
||||
sequence = msg_bytes.decode("ascii", errors="ignore")
|
||||
except Exception:
|
||||
return b""
|
||||
if len(sequence) < 3:
|
||||
return msg_bytes
|
||||
if sequence[-1] not in "~ABCDHFZPQRS":
|
||||
return msg_bytes
|
||||
return b""
|
||||
|
||||
def handle_vmenu_stdin_input(self, msg_bytes, event_queue):
|
||||
if not self.is_vmenu_active():
|
||||
return False
|
||||
key_name = self.vmenu_stdin_key_name(msg_bytes)
|
||||
if key_name and not self.vmenu_key_already_handled(key_name):
|
||||
self.queue_keypress(key_name, event_queue)
|
||||
return True
|
||||
|
||||
def is_vmenu_active(self):
|
||||
try:
|
||||
return self.env["runtime"]["VmenuManager"].get_active()
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def vmenu_stdin_key_name(self, msg_bytes):
|
||||
key_map = {
|
||||
b"\x1b": "KEY_ESC",
|
||||
b"\x1b[A": "KEY_UP",
|
||||
b"\x1b[B": "KEY_DOWN",
|
||||
b"\x1b[C": "KEY_RIGHT",
|
||||
b"\x1b[D": "KEY_LEFT",
|
||||
b"\x1b[5~": "KEY_PAGEUP",
|
||||
b"\x1b[6~": "KEY_PAGEDOWN",
|
||||
b"\r": "KEY_ENTER",
|
||||
b"\n": "KEY_ENTER",
|
||||
b" ": "KEY_SPACE",
|
||||
}
|
||||
if msg_bytes in key_map:
|
||||
return key_map[msg_bytes]
|
||||
if len(msg_bytes) != 1:
|
||||
return None
|
||||
char = chr(msg_bytes[0])
|
||||
if "a" <= char <= "z" or "A" <= char <= "Z":
|
||||
return "KEY_" + char.upper()
|
||||
return None
|
||||
|
||||
def vmenu_key_already_handled(self, key_name):
|
||||
try:
|
||||
return key_name in self.env["input"]["curr_input"]
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def queue_keypress(self, key_name, event_queue):
|
||||
event_time = time.time()
|
||||
for event_state in [1, 0]:
|
||||
try:
|
||||
event_queue.put(
|
||||
{
|
||||
"Type": FenrirEventType.keyboard_input,
|
||||
"data": {
|
||||
"event_name": key_name,
|
||||
"event_value": 0,
|
||||
"event_sec": int(event_time),
|
||||
"event_usec": int((event_time % 1) * 1000000),
|
||||
"event_state": event_state,
|
||||
"event_type": 0,
|
||||
},
|
||||
},
|
||||
block=False,
|
||||
)
|
||||
except Full:
|
||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||
"ptyDriver queue_keypress: Event queue full, dropping "
|
||||
+ key_name,
|
||||
debug.DebugLevel.WARNING,
|
||||
)
|
||||
|
||||
def record_stdin_keypress(self, msg_bytes):
|
||||
if msg_bytes != b"\t":
|
||||
return
|
||||
|
||||
@@ -4,7 +4,6 @@
|
||||
# Fenrir TTY screen reader
|
||||
# By Chrys, Storm Dragon, and contributors.
|
||||
|
||||
import glob
|
||||
import os
|
||||
import termios
|
||||
import threading
|
||||
@@ -43,14 +42,22 @@ class hardware_serial_driver(speech_driver):
|
||||
self.env = environment
|
||||
self._is_initialized = False
|
||||
settings_manager = self.env["runtime"]["SettingsManager"]
|
||||
self.device = settings_manager.get_setting(
|
||||
"speech", "hardware_device"
|
||||
self.device = self._clean_device_setting(
|
||||
settings_manager.get_setting("speech", "hardware_device")
|
||||
)
|
||||
self.baud_rate = settings_manager.get_setting_as_int(
|
||||
"speech", "hardware_baud_rate"
|
||||
)
|
||||
self._debug(
|
||||
"Hardware speech initialize: "
|
||||
f"requested_device={self.device}, baud_rate={self.baud_rate}",
|
||||
debug.DebugLevel.INFO,
|
||||
on_any_level=True,
|
||||
)
|
||||
self._open_serial_port()
|
||||
self._is_initialized = self.serial_port is not None
|
||||
if not self._is_initialized:
|
||||
raise RuntimeError("hardware speech device is not available")
|
||||
if self._is_initialized:
|
||||
self._stop_worker = False
|
||||
self.worker_thread = threading.Thread(
|
||||
@@ -58,6 +65,12 @@ class hardware_serial_driver(speech_driver):
|
||||
)
|
||||
self.worker_thread.start()
|
||||
|
||||
def _clean_device_setting(self, device):
|
||||
if not isinstance(device, str):
|
||||
return ""
|
||||
device = device.split("#", 1)[0].split(";", 1)[0].strip()
|
||||
return device
|
||||
|
||||
def shutdown(self):
|
||||
if not self._is_initialized:
|
||||
return
|
||||
@@ -76,6 +89,12 @@ class hardware_serial_driver(speech_driver):
|
||||
self.cancel()
|
||||
if not isinstance(text, str) or text == "":
|
||||
return
|
||||
self._debug(
|
||||
"Hardware speech queued text: "
|
||||
f"{len(text)} chars, queue_size={self.text_queue.qsize()}",
|
||||
debug.DebugLevel.INFO,
|
||||
on_any_level=True,
|
||||
)
|
||||
self.text_queue.put(text)
|
||||
|
||||
def cancel(self):
|
||||
@@ -83,7 +102,7 @@ class hardware_serial_driver(speech_driver):
|
||||
return
|
||||
self.clear_buffer()
|
||||
if self.cancel_command:
|
||||
self._write_bytes(self.cancel_command)
|
||||
self._write_bytes(self.cancel_command, "cancel")
|
||||
|
||||
def clear_buffer(self):
|
||||
if not self._is_initialized:
|
||||
@@ -95,37 +114,58 @@ class hardware_serial_driver(speech_driver):
|
||||
return
|
||||
if not isinstance(rate, float):
|
||||
return
|
||||
self._write_bytes(self._rate_command(rate))
|
||||
self._write_bytes(self._rate_command(rate), "rate")
|
||||
|
||||
def set_pitch(self, pitch):
|
||||
if not self._is_initialized:
|
||||
return
|
||||
if not isinstance(pitch, float):
|
||||
return
|
||||
self._write_bytes(self._pitch_command(pitch))
|
||||
self._write_bytes(self._pitch_command(pitch), "pitch")
|
||||
|
||||
def set_volume(self, volume):
|
||||
if not self._is_initialized:
|
||||
return
|
||||
if not isinstance(volume, float):
|
||||
return
|
||||
self._write_bytes(self._volume_command(volume))
|
||||
self._write_bytes(self._volume_command(volume), "volume")
|
||||
|
||||
def _worker(self):
|
||||
while not self._stop_worker:
|
||||
text = self.text_queue.get()
|
||||
if text is None:
|
||||
return
|
||||
self._write_bytes(self._speak_bytes(text))
|
||||
try:
|
||||
data = self._speak_bytes(text)
|
||||
self._debug(
|
||||
"Hardware speech worker prepared speech bytes: "
|
||||
f"{len(data)} bytes",
|
||||
debug.DebugLevel.INFO,
|
||||
on_any_level=True,
|
||||
)
|
||||
self._write_bytes(data, "speech")
|
||||
except Exception as error:
|
||||
self._debug(
|
||||
f"Hardware speech worker failed: {error}",
|
||||
debug.DebugLevel.ERROR,
|
||||
on_any_level=True,
|
||||
)
|
||||
|
||||
def _open_serial_port(self):
|
||||
device = self._resolve_device(self.device)
|
||||
if not device:
|
||||
if not self.device or self.device == "auto":
|
||||
self._debug(
|
||||
"Hardware speech device not found",
|
||||
"Hardware speech requires an explicit serial device",
|
||||
debug.DebugLevel.ERROR,
|
||||
on_any_level=True,
|
||||
)
|
||||
return
|
||||
|
||||
port = self._open_configured_serial_port(self.device)
|
||||
if port is not None:
|
||||
self._activate_serial_port(self.device, port)
|
||||
|
||||
def _open_configured_serial_port(self, device):
|
||||
port = None
|
||||
try:
|
||||
port = os.open(device, os.O_RDWR | os.O_NOCTTY)
|
||||
tty.setraw(port)
|
||||
@@ -138,52 +178,73 @@ class hardware_serial_driver(speech_driver):
|
||||
attrs[6][termios.VTIME] = 0
|
||||
attrs[0] &= ~(termios.IXON | termios.IXOFF | termios.IXANY)
|
||||
termios.tcsetattr(port, termios.TCSANOW, attrs)
|
||||
self.serial_port = port
|
||||
self.device = device
|
||||
except OSError as error:
|
||||
return port
|
||||
except (OSError, termios.error) as error:
|
||||
self._close_port(port)
|
||||
self._debug(
|
||||
f"Hardware speech device open failed: {device}: {error}",
|
||||
debug.DebugLevel.ERROR,
|
||||
on_any_level=True,
|
||||
)
|
||||
self.serial_port = None
|
||||
return None
|
||||
|
||||
def _activate_serial_port(self, device, port):
|
||||
self.serial_port = port
|
||||
self.device = device
|
||||
self._debug(
|
||||
"Hardware speech device opened: "
|
||||
f"{device}, baud_rate={self.baud_rate}",
|
||||
debug.DebugLevel.INFO,
|
||||
on_any_level=True,
|
||||
)
|
||||
|
||||
def _close_serial_port(self):
|
||||
with self.lock:
|
||||
if self.serial_port is None:
|
||||
return
|
||||
try:
|
||||
os.close(self.serial_port)
|
||||
except OSError as error:
|
||||
self._debug(
|
||||
f"Hardware speech device close failed: {error}",
|
||||
debug.DebugLevel.WARNING,
|
||||
)
|
||||
finally:
|
||||
self.serial_port = None
|
||||
self._close_port(self.serial_port)
|
||||
self.serial_port = None
|
||||
|
||||
def _write_bytes(self, data):
|
||||
def _close_port(self, port):
|
||||
if port is None:
|
||||
return
|
||||
try:
|
||||
os.close(port)
|
||||
except OSError as error:
|
||||
self._debug(
|
||||
f"Hardware speech device close failed: {error}",
|
||||
debug.DebugLevel.WARNING,
|
||||
)
|
||||
|
||||
def _write_bytes(self, data, description="data"):
|
||||
if not data:
|
||||
return
|
||||
with self.lock:
|
||||
if self.serial_port is None:
|
||||
return
|
||||
try:
|
||||
os.write(self.serial_port, data)
|
||||
total_written = 0
|
||||
while total_written < len(data):
|
||||
bytes_written = os.write(
|
||||
self.serial_port, data[total_written:]
|
||||
)
|
||||
if bytes_written == 0:
|
||||
raise OSError("serial write returned 0 bytes")
|
||||
total_written += bytes_written
|
||||
preview = self._format_bytes_preview(data)
|
||||
self._debug(
|
||||
"Hardware speech wrote "
|
||||
f"{total_written} {description} bytes: {preview}",
|
||||
debug.DebugLevel.INFO,
|
||||
on_any_level=True,
|
||||
)
|
||||
except OSError as error:
|
||||
self._debug(
|
||||
f"Hardware speech write failed: {error}",
|
||||
debug.DebugLevel.ERROR,
|
||||
on_any_level=True,
|
||||
)
|
||||
|
||||
def _resolve_device(self, device):
|
||||
if device and device != "auto":
|
||||
return device
|
||||
for pattern in ("/dev/ttyACM*", "/dev/ttyUSB*"):
|
||||
matches = sorted(glob.glob(pattern))
|
||||
if matches:
|
||||
return matches[0]
|
||||
return ""
|
||||
|
||||
def _termios_baud_rate(self, baud_rate):
|
||||
baud_name = f"B{baud_rate}"
|
||||
if hasattr(termios, baud_name):
|
||||
@@ -205,10 +266,23 @@ class hardware_serial_driver(speech_driver):
|
||||
value = max(0.0, min(1.0, value))
|
||||
return int(round(minimum + value * (maximum - minimum)))
|
||||
|
||||
def _debug(self, message, level):
|
||||
def _format_bytes_preview(self, data, limit=32):
|
||||
preview = data[:limit]
|
||||
hex_preview = " ".join(f"{byte:02x}" for byte in preview)
|
||||
ascii_preview = "".join(
|
||||
chr(byte) if 0x20 <= byte <= 0x7E else "."
|
||||
for byte in preview
|
||||
)
|
||||
suffix = "" if len(data) <= limit else " ..."
|
||||
return (
|
||||
f"hex=[{hex_preview}{suffix}] "
|
||||
f"ascii=[{ascii_preview}{suffix}]"
|
||||
)
|
||||
|
||||
def _debug(self, message, level, on_any_level=False):
|
||||
try:
|
||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||
message, level
|
||||
message, level, on_any_level=on_any_level
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@@ -47,7 +47,6 @@ def get_up_char(curr_x, curr_y, curr_text):
|
||||
curr_y -= 1
|
||||
if curr_y < 0:
|
||||
curr_y = 0
|
||||
else:
|
||||
end_of_screen = True
|
||||
curr_char = ""
|
||||
if not end_of_screen:
|
||||
@@ -63,7 +62,6 @@ def get_down_char(curr_x, curr_y, curr_text):
|
||||
curr_y += 1
|
||||
if curr_y >= len(wrapped_lines):
|
||||
curr_y = len(wrapped_lines) - 1
|
||||
else:
|
||||
end_of_screen = True
|
||||
curr_char = ""
|
||||
if not end_of_screen:
|
||||
|
||||
@@ -0,0 +1,59 @@
|
||||
from fenrirscreenreader.core.debugManager import DebugManager
|
||||
|
||||
|
||||
def test_default_debug_file_uses_flat_name(tmp_path, monkeypatch):
|
||||
monkeypatch.setattr(DebugManager, "DEFAULT_LOG_DIR", str(tmp_path))
|
||||
manager = DebugManager()
|
||||
try:
|
||||
manager.open_debug_file()
|
||||
|
||||
assert manager.get_debug_file() == str(tmp_path / "fenrir.log")
|
||||
assert (tmp_path / "fenrir.log").exists()
|
||||
finally:
|
||||
manager.close_debug_file()
|
||||
|
||||
|
||||
def test_default_debug_file_uses_next_number_when_locked(
|
||||
tmp_path, monkeypatch
|
||||
):
|
||||
monkeypatch.setattr(DebugManager, "DEFAULT_LOG_DIR", str(tmp_path))
|
||||
first_manager = DebugManager()
|
||||
second_manager = DebugManager()
|
||||
try:
|
||||
first_manager.open_debug_file()
|
||||
second_manager.open_debug_file()
|
||||
|
||||
assert first_manager.get_debug_file() == str(tmp_path / "fenrir.log")
|
||||
assert second_manager.get_debug_file() == str(
|
||||
tmp_path / "fenrir2.log"
|
||||
)
|
||||
assert (tmp_path / "fenrir2.log").exists()
|
||||
finally:
|
||||
second_manager.close_debug_file()
|
||||
first_manager.close_debug_file()
|
||||
|
||||
|
||||
def test_default_debug_file_reuses_unlocked_flat_name(tmp_path, monkeypatch):
|
||||
monkeypatch.setattr(DebugManager, "DEFAULT_LOG_DIR", str(tmp_path))
|
||||
first_manager = DebugManager()
|
||||
second_manager = DebugManager()
|
||||
try:
|
||||
first_manager.open_debug_file()
|
||||
first_manager.close_debug_file()
|
||||
second_manager.open_debug_file()
|
||||
|
||||
assert second_manager.get_debug_file() == str(tmp_path / "fenrir.log")
|
||||
finally:
|
||||
second_manager.close_debug_file()
|
||||
|
||||
|
||||
def test_explicit_debug_file_uses_exact_path(tmp_path):
|
||||
debug_file = tmp_path / "custom.log"
|
||||
manager = DebugManager(str(debug_file))
|
||||
try:
|
||||
manager.open_debug_file()
|
||||
|
||||
assert manager.get_debug_file() == str(debug_file)
|
||||
assert debug_file.exists()
|
||||
finally:
|
||||
manager.close_debug_file()
|
||||
@@ -22,6 +22,7 @@ def test_speech_history_plain_key_modal_command_is_dispatched():
|
||||
)
|
||||
speech_history_manager = Mock(is_active=Mock(return_value=True))
|
||||
diff_review_manager = Mock(is_active=Mock(return_value=False))
|
||||
vmenu_manager = Mock(get_active=Mock(return_value=False))
|
||||
|
||||
manager.environment = {
|
||||
"input": {
|
||||
@@ -32,6 +33,7 @@ def test_speech_history_plain_key_modal_command_is_dispatched():
|
||||
"runtime": {
|
||||
"InputManager": input_manager,
|
||||
"EventManager": event_manager,
|
||||
"VmenuManager": vmenu_manager,
|
||||
"DiffReviewManager": diff_review_manager,
|
||||
"SpeechHistoryManager": speech_history_manager,
|
||||
},
|
||||
@@ -42,3 +44,43 @@ def test_speech_history_plain_key_modal_command_is_dispatched():
|
||||
event_manager.put_to_event_queue.assert_called_once_with(
|
||||
FenrirEventType.execute_command, "SPEECH_HISTORY_PREV"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_vmenu_plain_key_modal_command_is_dispatched():
|
||||
manager = FenrirManager.__new__(FenrirManager)
|
||||
manager.modifierInput = False
|
||||
manager.singleKeyCommand = False
|
||||
manager.command = ""
|
||||
|
||||
event_manager = Mock(put_to_event_queue=Mock())
|
||||
input_manager = Mock(
|
||||
is_key_press=Mock(return_value=False),
|
||||
no_key_pressed=Mock(return_value=False),
|
||||
get_curr_shortcut=Mock(return_value=str([1, ["KEY_UP"]])),
|
||||
get_command_for_shortcut=Mock(return_value="PREV_VMENU_ENTRY"),
|
||||
)
|
||||
vmenu_manager = Mock(get_active=Mock(return_value=True))
|
||||
speech_history_manager = Mock(is_active=Mock(return_value=False))
|
||||
diff_review_manager = Mock(is_active=Mock(return_value=False))
|
||||
|
||||
manager.environment = {
|
||||
"input": {
|
||||
"key_forward": 0,
|
||||
"prev_input": ["KEY_UP"],
|
||||
"curr_input": ["KEY_UP"],
|
||||
},
|
||||
"runtime": {
|
||||
"InputManager": input_manager,
|
||||
"EventManager": event_manager,
|
||||
"VmenuManager": vmenu_manager,
|
||||
"DiffReviewManager": diff_review_manager,
|
||||
"SpeechHistoryManager": speech_history_manager,
|
||||
},
|
||||
}
|
||||
|
||||
manager.detect_shortcut_command()
|
||||
|
||||
event_manager.put_to_event_queue.assert_called_once_with(
|
||||
FenrirEventType.execute_command, "PREV_VMENU_ENTRY"
|
||||
)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import os
|
||||
import select
|
||||
import time
|
||||
from unittest.mock import ANY
|
||||
from unittest.mock import Mock
|
||||
|
||||
import pytest
|
||||
@@ -103,6 +104,67 @@ def test_litetalk_driver_writes_settings_and_cancel(serial_pair):
|
||||
speech_driver.shutdown()
|
||||
|
||||
|
||||
def test_configured_device_supports_classic_serial(serial_pair):
|
||||
master_fd, slave_name = serial_pair
|
||||
speech_driver = litetalkDriver.driver()
|
||||
speech_driver.initialize(build_environment(slave_name))
|
||||
try:
|
||||
assert speech_driver.device == slave_name
|
||||
speech_driver.speak("Serial")
|
||||
assert read_available(master_fd, 7) == b"Serial\r"
|
||||
finally:
|
||||
speech_driver.shutdown()
|
||||
|
||||
|
||||
def test_configured_device_strips_inline_comment(serial_pair):
|
||||
master_fd, slave_name = serial_pair
|
||||
device_setting = f"{slave_name} # built-in serial port"
|
||||
speech_driver = litetalkDriver.driver()
|
||||
speech_driver.initialize(build_environment(device_setting))
|
||||
try:
|
||||
assert speech_driver.device == slave_name
|
||||
speech_driver.speak("Specific")
|
||||
assert read_available(master_fd, 9) == b"Specific\r"
|
||||
finally:
|
||||
speech_driver.shutdown()
|
||||
|
||||
|
||||
def test_auto_device_is_rejected():
|
||||
speech_driver = litetalkDriver.driver()
|
||||
|
||||
with pytest.raises(RuntimeError, match="hardware speech device"):
|
||||
speech_driver.initialize(build_environment("auto"))
|
||||
|
||||
debug_manager = speech_driver.env["runtime"]["DebugManager"]
|
||||
debug_manager.write_debug_out.assert_called_with(
|
||||
"Hardware speech requires an explicit serial device",
|
||||
ANY,
|
||||
on_any_level=True,
|
||||
)
|
||||
|
||||
|
||||
def test_hardware_driver_retries_partial_serial_writes(monkeypatch):
|
||||
speech_driver = litetalkDriver.driver()
|
||||
speech_driver.env = build_environment("/dev/ttyUSB0")
|
||||
speech_driver.serial_port = 12
|
||||
written_chunks = []
|
||||
|
||||
def fake_write(port, data):
|
||||
assert port == 12
|
||||
chunk = data[:2]
|
||||
written_chunks.append(chunk)
|
||||
return len(chunk)
|
||||
|
||||
monkeypatch.setattr(
|
||||
"fenrirscreenreader.speechDriver.hardwareSerialDriver.os.write",
|
||||
fake_write,
|
||||
)
|
||||
|
||||
speech_driver._write_bytes(b"abcdef", "speech")
|
||||
|
||||
assert written_chunks == [b"ab", b"cd", b"ef"]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("driver_class", [doubletalkDriver, tripletalkDriver])
|
||||
def test_litetalk_compatible_alias_drivers(driver_class, serial_pair):
|
||||
speech_driver, master_fd = initialized_driver(driver_class, serial_pair)
|
||||
|
||||
@@ -0,0 +1,42 @@
|
||||
import importlib.util
|
||||
from pathlib import Path
|
||||
from unittest.mock import Mock
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def _load_command():
|
||||
module_path = (
|
||||
Path(__file__).resolve().parents[2]
|
||||
/ "src"
|
||||
/ "fenrirscreenreader"
|
||||
/ "commands"
|
||||
/ "onCursorChange"
|
||||
/ "85000-has_attribute.py"
|
||||
)
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
"fenrir_has_attribute", module_path
|
||||
)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
return module.command()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_has_attribute_uses_configured_setting_name():
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = False
|
||||
command = _load_command()
|
||||
command.initialize(
|
||||
{
|
||||
"runtime": {
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
command.run()
|
||||
|
||||
settings_manager.get_setting_as_bool.assert_called_once_with(
|
||||
"general", "has_attributes"
|
||||
)
|
||||
@@ -67,6 +67,21 @@ def test_present_text_allows_sound_only_feedback():
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_present_text_sound_icon_with_interrupt_cancels_speech():
|
||||
output_manager, sound_driver, speech_driver = build_output_manager()
|
||||
|
||||
output_manager.present_text(
|
||||
"end of screen", sound_icon="Accept", interrupt=True
|
||||
)
|
||||
|
||||
speech_driver.cancel.assert_called_once_with()
|
||||
sound_driver.play_sound_file.assert_called_once_with(
|
||||
"/tmp/Accept.wav", True
|
||||
)
|
||||
speech_driver.speak.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_play_sound_supports_error_alias():
|
||||
output_manager, sound_driver, _speech_driver = build_output_manager()
|
||||
@@ -227,3 +242,35 @@ def test_key_interrupt_command_uses_nonblocking_interrupt():
|
||||
|
||||
output_manager.interrupt_output_async.assert_called_once_with()
|
||||
output_manager.interrupt_output.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_key_interrupt_command_ignores_fenrir_shortcuts():
|
||||
module = load_key_interrupt_module()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = True
|
||||
settings_manager.get_setting.return_value = ""
|
||||
input_manager = Mock(
|
||||
no_key_pressed=Mock(return_value=False),
|
||||
get_curr_shortcut=Mock(return_value=[1, ["KEY_KP9"]]),
|
||||
get_command_for_shortcut=Mock(return_value="REVIEW_NEXT_LINE"),
|
||||
)
|
||||
output_manager = Mock()
|
||||
env = {
|
||||
"input": {
|
||||
"curr_input": ["KEY_KP9"],
|
||||
"prev_input": [],
|
||||
},
|
||||
"runtime": {
|
||||
"InputManager": input_manager,
|
||||
"OutputManager": output_manager,
|
||||
"ScreenManager": Mock(is_screen_change=Mock(return_value=False)),
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
command = module.command()
|
||||
command.initialize(env)
|
||||
|
||||
command.run()
|
||||
|
||||
output_manager.interrupt_output_async.assert_not_called()
|
||||
|
||||
@@ -39,3 +39,104 @@ def test_progress_detector_skips_typing_delta():
|
||||
|
||||
command.is_real_progress_update.assert_not_called()
|
||||
command.detect_progress.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_progress_detector_allows_long_tqdm_transfer_delta():
|
||||
progress_module = _load_progress_module()
|
||||
command = progress_module.command()
|
||||
sample = (
|
||||
"88%|"
|
||||
"████████████████████████████████████████████████████████████████"
|
||||
"████████████████████████████████████████████████████████████████"
|
||||
"████████████████████████████████████████████████████████████████"
|
||||
"█████████████████████████▊ "
|
||||
"| 843M/954M [00:54<00:07, 15.2MB/s]"
|
||||
)
|
||||
command.env = {
|
||||
"commandBuffer": {"progress_monitoring": True},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"ScreenManager": Mock(is_screen_change=Mock(return_value=False)),
|
||||
"CursorManager": Mock(is_cursor_vertical_move=Mock(return_value=False)),
|
||||
},
|
||||
"screen": {
|
||||
"new_delta": sample,
|
||||
"new_content_text": sample,
|
||||
"old_cursor": {"x": 0, "y": 0},
|
||||
"new_cursor": {"x": 0, "y": 0},
|
||||
},
|
||||
}
|
||||
|
||||
assert len(sample) > 200
|
||||
assert command.is_real_progress_update()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_progress_detector_beeps_for_long_tqdm_transfer_delta():
|
||||
progress_module = _load_progress_module()
|
||||
command = progress_module.command()
|
||||
sample = (
|
||||
"90%|"
|
||||
"████████████████████████████████████████████████████████████████"
|
||||
"████████████████████████████████████████████████████████████████"
|
||||
"████████████████████████████████████████████████████████████████"
|
||||
"█████████████████████████████████████▍ "
|
||||
"| 856M/954M [00:56<00:14, 6.78MB/s]"
|
||||
)
|
||||
command.env = {
|
||||
"commandBuffer": {
|
||||
"progress_monitoring": True,
|
||||
"lastProgressValue": -1,
|
||||
"lastProgressTime": 0,
|
||||
},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"ScreenManager": Mock(is_screen_change=Mock(return_value=False)),
|
||||
"CursorManager": Mock(is_cursor_vertical_move=Mock(return_value=False)),
|
||||
},
|
||||
"screen": {
|
||||
"new_delta": sample,
|
||||
"new_delta_is_typing": False,
|
||||
"new_content_text": sample,
|
||||
"old_cursor": {"x": 0, "y": 0},
|
||||
"new_cursor": {"x": 0, "y": 0},
|
||||
},
|
||||
}
|
||||
command.play_progress_tone = Mock()
|
||||
|
||||
command.run()
|
||||
|
||||
command.play_progress_tone.assert_called_once_with(90.0)
|
||||
assert command.env["commandBuffer"]["lastProgressValue"] == 90.0
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_progress_detector_beeps_for_interruptible_status_without_ellipsis():
|
||||
progress_module = _load_progress_module()
|
||||
command = progress_module.command()
|
||||
sample = "◦ Files: (1m 04s • esc to interrupt)"
|
||||
command.env = {
|
||||
"commandBuffer": {
|
||||
"progress_monitoring": True,
|
||||
"lastProgressValue": -1,
|
||||
"lastProgressTime": 0,
|
||||
},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"ScreenManager": Mock(is_screen_change=Mock(return_value=False)),
|
||||
"CursorManager": Mock(is_cursor_vertical_move=Mock(return_value=False)),
|
||||
},
|
||||
"screen": {
|
||||
"new_delta": sample,
|
||||
"new_delta_is_typing": False,
|
||||
"new_content_text": sample,
|
||||
"old_cursor": {"x": 0, "y": 0},
|
||||
"new_cursor": {"x": 0, "y": 0},
|
||||
},
|
||||
}
|
||||
command.play_activity_beep = Mock()
|
||||
|
||||
command.run()
|
||||
|
||||
command.play_activity_beep.assert_called_once_with()
|
||||
|
||||
@@ -39,6 +39,29 @@ def test_private_sgr_sequence_from_fullscreen_apps_does_not_crash():
|
||||
assert screen["text"].splitlines()[0] == "X "
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_dcs_terminal_queries_do_not_render_as_text():
|
||||
terminal = Terminal(20, 3, DummyProcessInput())
|
||||
|
||||
terminal.feed(b"\x1bP+q6b32\x1b\\X")
|
||||
screen = terminal.get_screen_content()
|
||||
|
||||
assert screen["text"].splitlines()[0] == "X "
|
||||
assert "+q6b32" not in screen["text"]
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_split_dcs_terminal_query_does_not_render_as_text():
|
||||
terminal = Terminal(20, 3, DummyProcessInput())
|
||||
|
||||
terminal.feed(b"\x1bP+q6")
|
||||
terminal.feed(b"b32\x1b\\X")
|
||||
screen = terminal.get_screen_content()
|
||||
|
||||
assert screen["text"].splitlines()[0] == "X "
|
||||
assert "+q6b32" not in screen["text"]
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_optional_float_setting_uses_default_when_missing():
|
||||
settings_manager = type(
|
||||
@@ -79,6 +102,35 @@ def test_pty_stdin_input_interrupts_output_when_all_keys_interrupt_enabled():
|
||||
output_manager.interrupt_output.assert_called_once_with()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.parametrize(
|
||||
"sequence",
|
||||
[
|
||||
b"\x1b[12;40R",
|
||||
b"\x1b[?1;2c",
|
||||
b"\x1b[>85;95;0c",
|
||||
b"\x1b[4;80;24t",
|
||||
b"\x1b]10;rgb:ffff/ffff/ffff\x1b\\",
|
||||
],
|
||||
)
|
||||
def test_pty_terminal_response_stdin_does_not_interrupt_output(sequence):
|
||||
pty_driver = PtyDriver()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = True
|
||||
settings_manager.get_setting.return_value = ""
|
||||
output_manager = Mock()
|
||||
pty_driver.env = {
|
||||
"runtime": {
|
||||
"SettingsManager": settings_manager,
|
||||
"OutputManager": output_manager,
|
||||
}
|
||||
}
|
||||
|
||||
pty_driver.interrupt_output_on_stdin_input(sequence)
|
||||
|
||||
output_manager.interrupt_output.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_stdin_input_interrupt_does_not_block_input_injection():
|
||||
pty_driver = PtyDriver()
|
||||
@@ -160,6 +212,409 @@ def test_pty_plain_stdin_does_not_record_tab_keypress():
|
||||
pty_driver.inject_text_to_screen.assert_called_once_with(b"a")
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_stdin_consumes_fenrir_shortcut_input():
|
||||
pty_driver = PtyDriver()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = True
|
||||
input_manager = Mock(
|
||||
get_curr_shortcut=Mock(return_value=[1, ["KEY_KP9"]]),
|
||||
get_command_for_shortcut=Mock(return_value="REVIEW_NEXT_LINE"),
|
||||
)
|
||||
output_manager = Mock()
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": ["KEY_KP9"]},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"InputManager": input_manager,
|
||||
"OutputManager": output_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"\x1b[6~", Mock())
|
||||
|
||||
output_manager.interrupt_output.assert_not_called()
|
||||
pty_driver.inject_text_to_screen.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_stdin_consumes_late_fenrir_shortcut_tail_after_release():
|
||||
pty_driver = PtyDriver()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = True
|
||||
input_manager = Mock(
|
||||
get_curr_shortcut=Mock(return_value=[1, []]),
|
||||
get_command_for_shortcut=Mock(return_value=""),
|
||||
)
|
||||
output_manager = Mock()
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": []},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"InputManager": input_manager,
|
||||
"OutputManager": output_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
pty_driver.last_fenrir_stdin_command_time = time.monotonic()
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"\x1b[6~", Mock())
|
||||
|
||||
output_manager.interrupt_output.assert_not_called()
|
||||
pty_driver.inject_text_to_screen.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_stdin_consumes_split_fenrir_shortcut_tail_after_release():
|
||||
pty_driver = PtyDriver()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = True
|
||||
input_manager = Mock(
|
||||
get_curr_shortcut=Mock(
|
||||
side_effect=[
|
||||
[1, ["KEY_KP7"]],
|
||||
[1, []],
|
||||
]
|
||||
),
|
||||
get_command_for_shortcut=Mock(
|
||||
side_effect=[
|
||||
"REVIEW_PREV_LINE",
|
||||
"",
|
||||
]
|
||||
),
|
||||
)
|
||||
output_manager = Mock()
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": ["KEY_KP7"]},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"InputManager": input_manager,
|
||||
"OutputManager": output_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"\x1b", Mock())
|
||||
pty_driver.env["input"]["curr_input"] = []
|
||||
pty_driver.handle_stdin_input(b"[H", Mock())
|
||||
|
||||
output_manager.interrupt_output.assert_not_called()
|
||||
pty_driver.inject_text_to_screen.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_stdin_consumes_split_ss3_fenrir_shortcut_tail_after_release():
|
||||
pty_driver = PtyDriver()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = True
|
||||
input_manager = Mock(
|
||||
get_curr_shortcut=Mock(
|
||||
side_effect=[
|
||||
[1, ["KEY_KP7"]],
|
||||
[1, []],
|
||||
[1, []],
|
||||
]
|
||||
),
|
||||
get_command_for_shortcut=Mock(
|
||||
side_effect=[
|
||||
"REVIEW_PREV_LINE",
|
||||
"",
|
||||
"",
|
||||
]
|
||||
),
|
||||
)
|
||||
output_manager = Mock()
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": ["KEY_KP7"]},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"InputManager": input_manager,
|
||||
"OutputManager": output_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"\x1b", Mock())
|
||||
pty_driver.env["input"]["curr_input"] = []
|
||||
pty_driver.handle_stdin_input(b"O", Mock())
|
||||
pty_driver.handle_stdin_input(b"w", Mock())
|
||||
|
||||
output_manager.interrupt_output.assert_not_called()
|
||||
pty_driver.inject_text_to_screen.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_stdin_does_not_consume_printable_input_after_fenrir_shortcut():
|
||||
pty_driver = PtyDriver()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = False
|
||||
input_manager = Mock(
|
||||
get_curr_shortcut=Mock(return_value=[1, []]),
|
||||
get_command_for_shortcut=Mock(return_value=""),
|
||||
)
|
||||
output_manager = Mock()
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": []},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"InputManager": input_manager,
|
||||
"OutputManager": output_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
pty_driver.last_fenrir_stdin_command_time = time.monotonic()
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"a", Mock())
|
||||
|
||||
output_manager.interrupt_output.assert_not_called()
|
||||
pty_driver.inject_text_to_screen.assert_called_once_with(b"a")
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_stdin_does_not_consume_stale_fenrir_shortcut_tail():
|
||||
pty_driver = PtyDriver()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = False
|
||||
input_manager = Mock(
|
||||
get_curr_shortcut=Mock(return_value=[1, []]),
|
||||
get_command_for_shortcut=Mock(return_value=""),
|
||||
)
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": []},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"InputManager": input_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
pty_driver.last_fenrir_stdin_command_time = (
|
||||
time.monotonic()
|
||||
- PTYConstants.FENRIR_SHORTCUT_STDIN_TAIL_TIMEOUT
|
||||
- 0.1
|
||||
)
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"\x1b[6~", Mock())
|
||||
|
||||
pty_driver.inject_text_to_screen.assert_called_once_with(b"\x1b[6~")
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_stdin_consumes_late_tail_after_recent_review_command():
|
||||
pty_driver = PtyDriver()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = True
|
||||
input_manager = Mock(
|
||||
get_curr_shortcut=Mock(return_value=[1, []]),
|
||||
get_command_for_shortcut=Mock(return_value=""),
|
||||
)
|
||||
output_manager = Mock()
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": []},
|
||||
"commandInfo": {
|
||||
"lastCommand": "REVIEW_NEXT_LINE",
|
||||
"lastCommandSection": "commands",
|
||||
"lastCommandRunTime": time.time(),
|
||||
},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"InputManager": input_manager,
|
||||
"OutputManager": output_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"\x1b[6~", Mock())
|
||||
|
||||
output_manager.interrupt_output.assert_not_called()
|
||||
pty_driver.inject_text_to_screen.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.parametrize("sequence", [b"[H", b"[1~", b"Ow"])
|
||||
def test_pty_stdin_consumes_split_tail_after_recent_review_command(sequence):
|
||||
pty_driver = PtyDriver()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = True
|
||||
input_manager = Mock(
|
||||
get_curr_shortcut=Mock(return_value=[1, []]),
|
||||
get_command_for_shortcut=Mock(return_value=""),
|
||||
)
|
||||
output_manager = Mock()
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": []},
|
||||
"commandInfo": {
|
||||
"lastCommand": "REVIEW_NEXT_LINE",
|
||||
"lastCommandSection": "commands",
|
||||
"lastCommandRunTime": time.time(),
|
||||
},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"InputManager": input_manager,
|
||||
"OutputManager": output_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(sequence, Mock())
|
||||
|
||||
output_manager.interrupt_output.assert_not_called()
|
||||
pty_driver.inject_text_to_screen.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_stdin_consumes_lone_escape_after_recent_review_command():
|
||||
pty_driver = PtyDriver()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = True
|
||||
input_manager = Mock(
|
||||
get_curr_shortcut=Mock(return_value=[1, []]),
|
||||
get_command_for_shortcut=Mock(return_value=""),
|
||||
)
|
||||
output_manager = Mock()
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": []},
|
||||
"commandInfo": {
|
||||
"lastCommand": "REVIEW_CURR_LINE",
|
||||
"lastCommandSection": "commands",
|
||||
"lastCommandRunTime": time.time(),
|
||||
},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"InputManager": input_manager,
|
||||
"OutputManager": output_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"\x1b", Mock())
|
||||
|
||||
output_manager.interrupt_output.assert_not_called()
|
||||
pty_driver.inject_text_to_screen.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_stdin_does_not_consume_late_tail_after_non_review_command():
|
||||
pty_driver = PtyDriver()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = False
|
||||
input_manager = Mock(
|
||||
get_curr_shortcut=Mock(return_value=[1, []]),
|
||||
get_command_for_shortcut=Mock(return_value=""),
|
||||
)
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": []},
|
||||
"commandInfo": {
|
||||
"lastCommand": "CURSOR_POSITION",
|
||||
"lastCommandSection": "commands",
|
||||
"lastCommandRunTime": time.time(),
|
||||
},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"InputManager": input_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"\x1b[6~", Mock())
|
||||
|
||||
pty_driver.inject_text_to_screen.assert_called_once_with(b"\x1b[6~")
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.parametrize(
|
||||
("sequence", "key_name"),
|
||||
[
|
||||
(b"\x1b[A", "KEY_UP"),
|
||||
(b"\x1b[B", "KEY_DOWN"),
|
||||
(b"\x1b[C", "KEY_RIGHT"),
|
||||
(b"\x1b[D", "KEY_LEFT"),
|
||||
(b"\x1b[5~", "KEY_PAGEUP"),
|
||||
(b"\x1b[6~", "KEY_PAGEDOWN"),
|
||||
(b"\x1b", "KEY_ESC"),
|
||||
(b"\r", "KEY_ENTER"),
|
||||
(b" ", "KEY_SPACE"),
|
||||
(b"a", "KEY_A"),
|
||||
(b"Z", "KEY_Z"),
|
||||
],
|
||||
)
|
||||
def test_pty_vmenu_stdin_is_consumed_and_synthesizes_key_events(
|
||||
sequence,
|
||||
key_name,
|
||||
):
|
||||
pty_driver = PtyDriver()
|
||||
event_queue = Mock()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = False
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": []},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"SettingsManager": settings_manager,
|
||||
"VmenuManager": Mock(get_active=Mock(return_value=True)),
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(sequence, event_queue)
|
||||
|
||||
pty_driver.inject_text_to_screen.assert_not_called()
|
||||
assert event_queue.put.call_count == 2
|
||||
first_event = event_queue.put.call_args_list[0].args[0]
|
||||
second_event = event_queue.put.call_args_list[1].args[0]
|
||||
assert first_event["Type"] == FenrirEventType.keyboard_input
|
||||
assert first_event["data"]["event_name"] == key_name
|
||||
assert first_event["data"]["event_state"] == 1
|
||||
assert second_event["data"]["event_name"] == key_name
|
||||
assert second_event["data"]["event_state"] == 0
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_vmenu_unknown_stdin_is_consumed_without_injection():
|
||||
pty_driver = PtyDriver()
|
||||
event_queue = Mock()
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": []},
|
||||
"runtime": {
|
||||
"VmenuManager": Mock(get_active=Mock(return_value=True)),
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"\x1b[1;5A", event_queue)
|
||||
|
||||
pty_driver.inject_text_to_screen.assert_not_called()
|
||||
event_queue.put.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_vmenu_stdin_does_not_duplicate_current_x11_key():
|
||||
pty_driver = PtyDriver()
|
||||
event_queue = Mock()
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": ["KEY_RIGHT"]},
|
||||
"runtime": {
|
||||
"VmenuManager": Mock(get_active=Mock(return_value=True)),
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"\x1b[C", event_queue)
|
||||
|
||||
pty_driver.inject_text_to_screen.assert_not_called()
|
||||
event_queue.put.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_stdin_input_honors_interrupt_disabled():
|
||||
pty_driver = PtyDriver()
|
||||
@@ -197,6 +652,22 @@ def test_pty_stdin_input_leaves_filtered_interrupts_to_key_events():
|
||||
output_manager.interrupt_output.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_terminal_attributes_use_fenrir_attribute_shape():
|
||||
terminal = Terminal(10, 2, DummyProcessInput())
|
||||
|
||||
terminal.feed(b"plain\r\n\x1b[7mfocus\x1b[0m")
|
||||
screen_content = terminal.get_screen_content()
|
||||
|
||||
plain_attribute = screen_content["attributes"][0][0]
|
||||
focused_attribute = screen_content["attributes"][1][0]
|
||||
assert len(plain_attribute) == 10
|
||||
assert len(focused_attribute) == 10
|
||||
assert plain_attribute[1] == "default"
|
||||
assert focused_attribute[1] == "reverse"
|
||||
assert focused_attribute[6] is True
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_backspace_with_fenrir_key_synthesizes_shortcut_events():
|
||||
pty_driver = PtyDriver()
|
||||
|
||||
@@ -0,0 +1,126 @@
|
||||
import importlib.util
|
||||
from pathlib import Path
|
||||
from unittest.mock import Mock
|
||||
|
||||
import pytest
|
||||
|
||||
from fenrirscreenreader.utils import char_utils
|
||||
|
||||
|
||||
COMMANDS_DIR = (
|
||||
Path(__file__).resolve().parents[2]
|
||||
/ "src"
|
||||
/ "fenrirscreenreader"
|
||||
/ "commands"
|
||||
/ "commands"
|
||||
)
|
||||
|
||||
|
||||
def load_command(name):
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
f"fenrir_{name}", COMMANDS_DIR / f"{name}.py"
|
||||
)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
return module.command()
|
||||
|
||||
|
||||
def build_environment(cursor):
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = True
|
||||
output_manager = Mock()
|
||||
cursor_manager = Mock()
|
||||
cursor_manager.enter_review_mode_curr_text_cursor.return_value = None
|
||||
cursor_manager.get_review_or_text_cursor.return_value = cursor
|
||||
|
||||
return {
|
||||
"punctuation": {"PUNCTDICT": {" ": "space"}},
|
||||
"screen": {
|
||||
"newCursorReview": cursor.copy(),
|
||||
"new_cursor": cursor.copy(),
|
||||
"new_content_text": "abc\ndef",
|
||||
},
|
||||
"runtime": {
|
||||
"AttributeManager": Mock(has_attributes=Mock(return_value=False)),
|
||||
"CursorManager": cursor_manager,
|
||||
"OutputManager": output_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
"TableManager": Mock(is_table_mode=Mock(return_value=False)),
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def run_command(name, cursor):
|
||||
env = build_environment(cursor)
|
||||
command = load_command(name)
|
||||
command.initialize(env)
|
||||
command.run()
|
||||
return env["runtime"]["OutputManager"]
|
||||
|
||||
|
||||
def boundary_call(output_manager):
|
||||
return output_manager.present_text.call_args_list[-1]
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_previous_line_uses_start_of_screen_sound_at_top():
|
||||
output_manager = run_command("review_prev_line", {"x": 0, "y": 0})
|
||||
|
||||
call = boundary_call(output_manager)
|
||||
assert call.args[0] == "start of screen"
|
||||
assert call.kwargs["sound_icon"] == "StartOfScreen"
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_next_line_uses_end_of_screen_sound_at_bottom():
|
||||
output_manager = run_command("review_next_line", {"x": 0, "y": 1})
|
||||
|
||||
call = boundary_call(output_manager)
|
||||
assert call.args[0] == "end of screen"
|
||||
assert call.kwargs["sound_icon"] == "EndOfScreen"
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_previous_character_uses_start_of_screen_sound_at_top_left():
|
||||
output_manager = run_command("review_prev_char", {"x": 0, "y": 0})
|
||||
|
||||
call = boundary_call(output_manager)
|
||||
assert call.args[0] == "start of screen"
|
||||
assert call.kwargs["sound_icon"] == "StartOfScreen"
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_next_character_uses_end_of_screen_sound_at_bottom_right():
|
||||
output_manager = run_command("review_next_char", {"x": 2, "y": 1})
|
||||
|
||||
call = boundary_call(output_manager)
|
||||
assert call.args[0] == "end of screen"
|
||||
assert call.kwargs["sound_icon"] == "EndOfScreen"
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_vertical_character_navigation_reports_boundaries_only_at_edges():
|
||||
assert char_utils.get_up_char(0, 1, "abc\ndef") == (
|
||||
0,
|
||||
0,
|
||||
"a",
|
||||
False,
|
||||
)
|
||||
assert char_utils.get_up_char(0, 0, "abc\ndef") == (
|
||||
0,
|
||||
0,
|
||||
"",
|
||||
True,
|
||||
)
|
||||
assert char_utils.get_down_char(0, 0, "abc\ndef") == (
|
||||
0,
|
||||
1,
|
||||
"d",
|
||||
False,
|
||||
)
|
||||
assert char_utils.get_down_char(0, 1, "abc\ndef") == (
|
||||
0,
|
||||
1,
|
||||
"",
|
||||
True,
|
||||
)
|
||||
@@ -195,3 +195,27 @@ def test_tui_input_line_typing_is_filtered_from_mixed_repaint_delta():
|
||||
assert "Username" not in env["screen"]["new_delta"]
|
||||
assert "#channel" not in env["screen"]["new_delta"]
|
||||
assert env["screen"]["new_delta_is_typing"] is False
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_prompt_repaint_from_blank_line_keeps_full_prompt():
|
||||
manager, env = _build_screen_manager(
|
||||
" ".ljust(40),
|
||||
{"x": 2, "y": 0},
|
||||
)
|
||||
|
||||
manager.update(
|
||||
{
|
||||
"bytes": b"",
|
||||
"lines": 1,
|
||||
"columns": 40,
|
||||
"textCursor": {"x": 23, "y": 0},
|
||||
"screen": "pty",
|
||||
"text": "[storm@fenrir fenrir] $ ".ljust(40),
|
||||
"attributes": [],
|
||||
},
|
||||
"onScreenUpdate",
|
||||
)
|
||||
|
||||
assert env["screen"]["new_delta"] == "[storm@fenrir fenrir] $"
|
||||
assert env["screen"]["new_delta_is_typing"] is False
|
||||
|
||||
@@ -5,6 +5,8 @@ Tests the _validate_setting_value method to ensure proper input validation
|
||||
for all configurable settings that could cause crashes or accessibility issues.
|
||||
"""
|
||||
|
||||
from argparse import Namespace
|
||||
|
||||
import pytest
|
||||
import sys
|
||||
from pathlib import Path
|
||||
@@ -206,6 +208,30 @@ def test_focus_settings_define_tui_toggle():
|
||||
assert settings_data["focus"]["tui"] is False
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.settings
|
||||
def test_format_cli_args_reports_startup_flags_in_stable_order():
|
||||
manager = SettingsManager()
|
||||
cli_args = Namespace(
|
||||
debug=True,
|
||||
foreground=False,
|
||||
force_all_screens=False,
|
||||
ignore_screen=["7"],
|
||||
options="speech#rate=1.2",
|
||||
print=False,
|
||||
setting="/tmp/settings.conf",
|
||||
x11=True,
|
||||
x11_window_id="0x123",
|
||||
)
|
||||
|
||||
assert manager.format_cli_args(cli_args) == (
|
||||
"{'debug': True, 'force_all_screens': False, 'foreground': False, "
|
||||
"'ignore_screen': ['7'], 'options': 'speech#rate=1.2', "
|
||||
"'print': False, 'setting': '/tmp/settings.conf', 'x11': True, "
|
||||
"'x11_window_id': '0x123'}"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.settings
|
||||
class TestSettingsPathSelection:
|
||||
|
||||
@@ -16,11 +16,15 @@ def build_speech_history_manager(history_size=3):
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_int.return_value = history_size
|
||||
memory_manager = Mock(add_value_to_first_index=Mock())
|
||||
input_manager = Mock(reset_input_state=Mock())
|
||||
input_driver = Mock(refresh_grabs=Mock())
|
||||
env = {
|
||||
"runtime": {
|
||||
"OutputManager": output_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
"MemoryManager": memory_manager,
|
||||
"InputManager": input_manager,
|
||||
"InputDriver": input_driver,
|
||||
},
|
||||
"bindings": {"original": "COMMAND"},
|
||||
"rawBindings": {"original": [1, ["KEY_FENRIR"]]},
|
||||
@@ -90,6 +94,7 @@ def test_open_history_installs_modal_bindings_and_replay_is_not_recorded():
|
||||
manager, env, spoken_messages, _memory_manager = (
|
||||
build_speech_history_manager()
|
||||
)
|
||||
env["rawBindings"]["ctrl_shut_up"] = [1, ["KEY_CTRL"]]
|
||||
manager.add_text("first")
|
||||
manager.add_text("second")
|
||||
|
||||
@@ -101,10 +106,14 @@ def test_open_history_installs_modal_bindings_and_replay_is_not_recorded():
|
||||
assert manager.curr_index == -1
|
||||
assert manager.history == ["second", "first"]
|
||||
assert "original" not in env["bindings"]
|
||||
assert "original" not in env["rawBindings"]
|
||||
assert env["rawBindings"]["original"] == [1, ["KEY_FENRIR"]]
|
||||
assert env["rawBindings"]["ctrl_shut_up"] == [1, ["KEY_CTRL"]]
|
||||
assert env["bindings"][str([1, ["KEY_UP"]])] == "SPEECH_HISTORY_PREV"
|
||||
assert env["bindings"][str([1, ["KEY_ENTER"]])] == "SPEECH_HISTORY_COPY"
|
||||
assert env["bindings"][str([1, ["KEY_ESC"]])] == "SPEECH_HISTORY_CLOSE"
|
||||
assert env["rawBindings"][str([1, ["KEY_UP"]])] == [1, ["KEY_UP"]]
|
||||
input_driver = env["runtime"]["InputDriver"]
|
||||
input_driver.refresh_grabs.assert_called_once_with(force=True)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@@ -145,3 +154,22 @@ def test_copy_current_adds_clipboard_and_restores_bindings():
|
||||
assert not manager.is_active()
|
||||
assert env["bindings"] == {"original": "COMMAND"}
|
||||
assert env["rawBindings"] == {"original": [1, ["KEY_FENRIR"]]}
|
||||
env["runtime"]["InputManager"].reset_input_state.assert_called_once_with()
|
||||
assert env["runtime"]["InputDriver"].refresh_grabs.call_count == 2
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_close_history_restores_keyboard_state_and_grabs():
|
||||
manager, env, _spoken_messages, _memory_manager = (
|
||||
build_speech_history_manager()
|
||||
)
|
||||
manager.add_text("first")
|
||||
manager.open_history()
|
||||
|
||||
manager.close_history()
|
||||
|
||||
assert not manager.is_active()
|
||||
assert env["bindings"] == {"original": "COMMAND"}
|
||||
assert env["rawBindings"] == {"original": [1, ["KEY_FENRIR"]]}
|
||||
env["runtime"]["InputManager"].reset_input_state.assert_called_once_with()
|
||||
assert env["runtime"]["InputDriver"].refresh_grabs.call_count == 2
|
||||
|
||||
@@ -140,6 +140,93 @@ def test_candidate_list_speaks_visible_list_without_cursor_advance():
|
||||
assert manager.process_update() == "Documents/ Downloads/"
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_full_screen_scroll_speaks_only_inserted_candidate_list():
|
||||
old_text = "\n".join(
|
||||
[
|
||||
"old top".ljust(30),
|
||||
"old middle".ljust(30),
|
||||
"old lower".ljust(30),
|
||||
"old bottom".ljust(30),
|
||||
"$ ./Toby".ljust(30),
|
||||
]
|
||||
)
|
||||
manager, env, _input_manager = _build_env(old_text, {"x": 8, "y": 4})
|
||||
|
||||
manager.capture_if_tab()
|
||||
_set_screen_update(
|
||||
env,
|
||||
"\n".join(
|
||||
[
|
||||
"old middle".ljust(30),
|
||||
"old lower".ljust(30),
|
||||
"old bottom".ljust(30),
|
||||
"TobyAccMod_V10-0.pk3 TobyConfig.ini".ljust(30),
|
||||
"$ ./Toby".ljust(30),
|
||||
]
|
||||
),
|
||||
{"x": 8, "y": 4},
|
||||
delta="\n".join(
|
||||
[
|
||||
"old middle",
|
||||
"old lower",
|
||||
"old bottom",
|
||||
"TobyAccMod_V10-0.pk3 TobyConfig.ini",
|
||||
"$ ./Toby",
|
||||
]
|
||||
),
|
||||
)
|
||||
|
||||
assert manager.process_update() == "TobyAccMod_V10-0.pk3 TobyConfig.ini"
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_same_height_repaint_without_inserted_candidates_stays_silent():
|
||||
old_text = "\n".join(
|
||||
[
|
||||
"old top".ljust(30),
|
||||
"old middle".ljust(30),
|
||||
"old lower".ljust(30),
|
||||
"$ ./Toby".ljust(30),
|
||||
]
|
||||
)
|
||||
manager, env, _input_manager = _build_env(old_text, {"x": 8, "y": 3})
|
||||
|
||||
manager.capture_if_tab()
|
||||
_set_screen_update(
|
||||
env,
|
||||
"\n".join(
|
||||
[
|
||||
"status line".ljust(30),
|
||||
"old prompt history".ljust(30),
|
||||
"unrelated output".ljust(30),
|
||||
"$ ./Toby".ljust(30),
|
||||
]
|
||||
),
|
||||
{"x": 8, "y": 3},
|
||||
delta="status line\nold prompt history\nunrelated output\n$ ./Toby",
|
||||
)
|
||||
|
||||
assert manager.process_update() == ""
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_recent_tab_does_not_speak_delta_without_detected_completion():
|
||||
old_text = "\n".join(["$ ./Toby".ljust(20), "".ljust(20)])
|
||||
manager, env, _input_manager = _build_env(old_text, {"x": 8, "y": 0})
|
||||
|
||||
manager.capture_if_tab()
|
||||
_set_screen_update(
|
||||
env,
|
||||
old_text,
|
||||
{"x": 8, "y": 0},
|
||||
delta="old unrelated screen content\n$ ./Toby",
|
||||
)
|
||||
|
||||
assert manager.process_update() == ""
|
||||
assert env["commandBuffer"]["tabCompletion"]["pending"] is not None
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_no_screen_change_stays_silent_and_keeps_pending_briefly():
|
||||
manager, env, _input_manager = _build_env(
|
||||
@@ -247,6 +334,7 @@ def test_large_insertion_echo_speaks_pasted_cursor_text():
|
||||
is_delta=Mock(return_value=True),
|
||||
)
|
||||
env = {
|
||||
"commandsIgnore": {"onScreenUpdate": {"INCOMING_IGNORE": False}},
|
||||
"runtime": {
|
||||
"InputManager": input_manager,
|
||||
"OutputManager": output_manager,
|
||||
@@ -270,6 +358,7 @@ def test_large_insertion_echo_speaks_pasted_cursor_text():
|
||||
announce_capital=True,
|
||||
flush=False,
|
||||
)
|
||||
assert env["commandsIgnore"]["onScreenUpdate"]["INCOMING_IGNORE"] is True
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@@ -290,6 +379,7 @@ def test_large_insertion_echo_defers_recent_tab_to_tab_completion():
|
||||
is_delta=Mock(return_value=True),
|
||||
)
|
||||
env = {
|
||||
"commandsIgnore": {"onScreenUpdate": {"INCOMING_IGNORE": False}},
|
||||
"runtime": {
|
||||
"InputManager": input_manager,
|
||||
"OutputManager": output_manager,
|
||||
@@ -308,3 +398,4 @@ def test_large_insertion_echo_defers_recent_tab_to_tab_completion():
|
||||
command.run()
|
||||
|
||||
output_manager.present_text.assert_not_called()
|
||||
assert env["commandsIgnore"]["onScreenUpdate"]["INCOMING_IGNORE"] is False
|
||||
|
||||
@@ -0,0 +1,117 @@
|
||||
import importlib.util
|
||||
from pathlib import Path
|
||||
from unittest.mock import Mock
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def _load_command():
|
||||
module_path = (
|
||||
Path(__file__).resolve().parents[2]
|
||||
/ "src"
|
||||
/ "fenrirscreenreader"
|
||||
/ "commands"
|
||||
/ "onCursorChange"
|
||||
/ "65000-present_line_if_cursor_change_vertical.py"
|
||||
)
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
"fenrir_present_line_if_cursor_change_vertical", module_path
|
||||
)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
return module.command()
|
||||
|
||||
|
||||
def _build_environment(screen_name):
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.side_effect = (
|
||||
lambda section, setting: section == "focus" and setting == "cursor"
|
||||
)
|
||||
output_manager = Mock()
|
||||
|
||||
return {
|
||||
"commandsIgnore": {"onScreenUpdate": {"INCOMING_IGNORE": False}},
|
||||
"screen": {
|
||||
"newTTY": screen_name,
|
||||
"new_cursor": {"x": 0, "y": 1},
|
||||
"new_content_text": "first line\nsecond line",
|
||||
},
|
||||
"runtime": {
|
||||
"BarrierManager": Mock(),
|
||||
"CursorManager": Mock(is_cursor_vertical_move=Mock(return_value=True)),
|
||||
"OutputManager": output_manager,
|
||||
"ScreenManager": Mock(
|
||||
is_screen_change=Mock(return_value=False),
|
||||
is_delta=Mock(return_value=True),
|
||||
),
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_vertical_cursor_move_speaks_line_despite_repaint_delta():
|
||||
command = _load_command()
|
||||
env = _build_environment("pty")
|
||||
command.initialize(env)
|
||||
|
||||
command.run()
|
||||
|
||||
env["runtime"]["OutputManager"].present_text.assert_called_once_with(
|
||||
"second line", interrupt=True, flush=False
|
||||
)
|
||||
assert env["commandsIgnore"]["onScreenUpdate"]["INCOMING_IGNORE"] is True
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_non_pty_vertical_cursor_move_still_suppresses_delta():
|
||||
command = _load_command()
|
||||
env = _build_environment("1")
|
||||
command.initialize(env)
|
||||
|
||||
command.run()
|
||||
|
||||
env["runtime"]["OutputManager"].present_text.assert_not_called()
|
||||
assert env["commandsIgnore"]["onScreenUpdate"]["INCOMING_IGNORE"] is False
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_dialog_button_row_speaks_nearby_question():
|
||||
command = _load_command()
|
||||
env = _build_environment("pty")
|
||||
env["screen"]["new_cursor"] = {"x": 30, "y": 4}
|
||||
env["screen"]["new_content_text"] = "\n".join(
|
||||
[
|
||||
"".ljust(80),
|
||||
"".ljust(80),
|
||||
"Do you want to save changes?".center(80),
|
||||
"".ljust(80),
|
||||
"< Yes > < No >".center(80),
|
||||
"".ljust(80),
|
||||
]
|
||||
)
|
||||
command.initialize(env)
|
||||
|
||||
command.run()
|
||||
|
||||
env["runtime"]["OutputManager"].present_text.assert_called_once_with(
|
||||
"Do you want to save changes?\n< Yes > < No >",
|
||||
interrupt=True,
|
||||
flush=False,
|
||||
)
|
||||
assert env["commandsIgnore"]["onScreenUpdate"]["INCOMING_IGNORE"] is True
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_blank_cursor_line_does_not_suppress_nonblank_delta():
|
||||
command = _load_command()
|
||||
env = _build_environment("pty")
|
||||
env["screen"]["new_cursor"] = {"x": 0, "y": 1}
|
||||
env["screen"]["new_content_text"] = "Birthday soon\n".ljust(80)
|
||||
env["screen"]["new_delta"] = "Birthday soon"
|
||||
command.initialize(env)
|
||||
|
||||
command.run()
|
||||
|
||||
env["runtime"]["OutputManager"].present_text.assert_not_called()
|
||||
assert env["commandsIgnore"]["onScreenUpdate"]["INCOMING_IGNORE"] is False
|
||||
@@ -217,6 +217,47 @@ def test_x11_build_passive_grabs_for_fenrir_keys_and_shortcuts():
|
||||
assert ("KEY_BACKSPACE", X.Mod4Mask, True) in grabs
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.parametrize(
|
||||
"modifier_mask",
|
||||
[X.ControlMask, X.ShiftMask, X.Mod1Mask],
|
||||
)
|
||||
def test_x11_poll_modifier_interrupt_keys_interrupts_without_input_events(
|
||||
modifier_mask,
|
||||
):
|
||||
x11 = X11Driver()
|
||||
x11.active = True
|
||||
x11.modifier_interrupt_state = 0
|
||||
x11.modifier_state = 0
|
||||
x11.root = Mock()
|
||||
x11.root.query_pointer.return_value = Mock(mask=modifier_mask)
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = True
|
||||
settings_manager.get_setting.return_value = ""
|
||||
output_manager = Mock()
|
||||
x11.env = {
|
||||
"input": {"event_buffer": []},
|
||||
"runtime": {
|
||||
"SettingsManager": settings_manager,
|
||||
"OutputManager": output_manager,
|
||||
"DebugManager": Mock(),
|
||||
},
|
||||
}
|
||||
|
||||
x11.poll_modifier_interrupt_keys()
|
||||
|
||||
output_manager.interrupt_output_async.assert_called_once()
|
||||
assert x11.env["input"]["event_buffer"] == []
|
||||
|
||||
output_manager.interrupt_output_async.reset_mock()
|
||||
x11.root.query_pointer.return_value = Mock(mask=0)
|
||||
|
||||
x11.poll_modifier_interrupt_keys()
|
||||
|
||||
output_manager.interrupt_output_async.assert_not_called()
|
||||
assert x11.env["input"]["event_buffer"] == []
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_x11_optional_modifier_masks_can_exclude_numlock():
|
||||
x11 = X11Driver()
|
||||
|
||||
Reference in New Issue
Block a user