Compare commits
12 Commits
788e678ed6
..
xim
| Author | SHA1 | Date | |
|---|---|---|---|
| 191fdbe8fd | |||
| fd5fe5b328 | |||
| 4ed3f4d6ab | |||
| 2cb83632f9 | |||
| 0c4fe50606 | |||
| 15f2435749 | |||
| 3897b63068 | |||
| f1a8e6af21 | |||
| bd54ec0edb | |||
| b9518f52ec | |||
| c143c9a561 | |||
| 7e2f927596 |
@@ -653,6 +653,10 @@ Building...
|
||||
- **Non-blocking**: Progress tones don't interrupt speech or other functionality
|
||||
- **Configurable**: Can be enabled/disabled as needed
|
||||
|
||||
Fenrir detects stable progress structures rather than application-specific
|
||||
status formats. Application-specific formats change too frequently to support
|
||||
reliably.
|
||||
|
||||
### Usage Examples
|
||||
|
||||
```bash
|
||||
@@ -736,7 +740,7 @@ send_fenrir_command("setting set speech#rate=0.9")
|
||||
|
||||
**Commands not working:**
|
||||
- Verify `enable_command_remote=True` in settings
|
||||
- Check Fenrir debug logs: `/var/log/fenrir.log`
|
||||
- Check Fenrir debug logs: `/tmp/fenrir.log`
|
||||
- Test with simple command: `echo "command interrupt" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock`
|
||||
|
||||
## Command Line Options
|
||||
|
||||
@@ -3,13 +3,13 @@ https://git.stormux.org/storm/fenrir/issues
|
||||
|
||||
For bugs, please provide a debug file that shows the issue.
|
||||
How to create a debug file:
|
||||
1. first delete old stuff:
|
||||
sudo rm /var/log/fenrir.log
|
||||
2. start fenrir in debug mode
|
||||
1. start fenrir in debug mode
|
||||
sudo fenrir -d
|
||||
<do your stuff>
|
||||
3.
|
||||
2.
|
||||
stop fenrir (fenrirKey + q)
|
||||
the debug file is in /var/log/fenrir.log
|
||||
the debug file is in /tmp/fenrir.log
|
||||
if another Fenrir debug instance is already using it, check /tmp/fenrir2.log,
|
||||
/tmp/fenrir3.log, etc.
|
||||
|
||||
please be as precise as possible to make it easy to solve the problem.
|
||||
|
||||
@@ -183,7 +183,8 @@ double_tap_timeout=0.2
|
||||
# The default is 0, no logging.
|
||||
debug_level=0
|
||||
# debugMode sets where the debug output should send to:
|
||||
# debugMode=File writes to debug_file (Default:/tmp/fenrir-PID.log)
|
||||
# debugMode=File writes to debug_file (Default:/tmp/fenrir.log)
|
||||
# If the default log is already in use, Fenrir uses /tmp/fenrir2.log, etc.
|
||||
# debugMode=Print just prints on the screen
|
||||
debug_mode=File
|
||||
debug_file=
|
||||
|
||||
@@ -114,7 +114,7 @@ sudo ./fenrir -f -d -p
|
||||
|
||||
# Debug output goes to:
|
||||
# - Console (with -p flag)
|
||||
# - /var/log/fenrir.log
|
||||
# - /tmp/fenrir.log
|
||||
```
|
||||
|
||||
## Creating Commands
|
||||
|
||||
+4
-2
@@ -50,7 +50,9 @@ Multiple settings can be separated by semicolons.
|
||||
|
||||
.TP
|
||||
.BR \-d ", " \-\-debug
|
||||
Enable debug mode. Debug information will be logged to /var/log/fenrir.log.
|
||||
Enable debug mode. Debug information will be logged to /tmp/fenrir.log by
|
||||
default. If another Fenrir debug instance is already using it, Fenrir uses
|
||||
/tmp/fenrir2.log, /tmp/fenrir3.log, etc.
|
||||
|
||||
.TP
|
||||
.BR \-p ", " \-\-print
|
||||
@@ -476,7 +478,7 @@ User sound themes
|
||||
User scripts
|
||||
|
||||
.TP
|
||||
.B /var/log/fenrir.log
|
||||
.B /tmp/fenrir.log
|
||||
Debug log file
|
||||
|
||||
.TP
|
||||
|
||||
+3
-3
@@ -2276,13 +2276,13 @@ that shows the issue.
|
||||
|
||||
==== How-to create a debug file
|
||||
|
||||
. Delete old debug stuff +
|
||||
`+sudo rm /var/log/fenrir.log+`
|
||||
. Start fenrir in debug mode +
|
||||
`+sudo fenrir -d+`
|
||||
. Do your stuff to reproduce the problem
|
||||
. Stop fenrir (`+fenrirKey + q+`)
|
||||
|
||||
the debug file is located in `+/var/log/fenrir.log+`
|
||||
the debug file is located in `+/tmp/fenrir.log+`. If another Fenrir debug
|
||||
instance is already using it, check `+/tmp/fenrir2.log+`,
|
||||
`+/tmp/fenrir3.log+`, etc.
|
||||
|
||||
Please be as precise as possible to make it easy to solve the problem.
|
||||
|
||||
+4
-1
@@ -312,6 +312,9 @@ Fenrir automatically detects and provides audio feedback for progress indicators
|
||||
- **Automatic**: Works with downloads, compilations, installations
|
||||
- **Remote control**: Enable via socket commands
|
||||
|
||||
Fenrir detects stable progress structures rather than application-specific
|
||||
status formats, which change too frequently to support reliably.
|
||||
|
||||
### Spell Checking
|
||||
- `Fenrir + S` - Spell check current word
|
||||
- `Fenrir + S S` - Add word to dictionary
|
||||
@@ -428,7 +431,7 @@ For a dedicated PTY/terminal screen reader, see TDSR: https://github.com/tspivey
|
||||
### Debug Mode
|
||||
```bash
|
||||
sudo fenrir -f -d
|
||||
# Debug output goes to /var/log/fenrir.log
|
||||
# Debug output goes to /tmp/fenrir.log
|
||||
```
|
||||
|
||||
## Getting Help
|
||||
|
||||
+3
-2
@@ -1315,10 +1315,11 @@ Please report Bugs and feature requests to:
|
||||
|
||||
for bugs please provide a [[#Howto create a debug file|debug]] file that shows the issue.
|
||||
==== How-to create a debug file ====
|
||||
- Delete old debug stuff\\ ''sudo rm /var/log/fenrir.log''
|
||||
- Start fenrir in debug mode\\ ''sudo fenrir -d''
|
||||
- Do your stuff to reproduce the problem
|
||||
- Stop fenrir (''fenrirKey + q'')
|
||||
the debug file is located in ''/var/log/fenrir.log''
|
||||
the debug file is located in ''/tmp/fenrir.log''. If another Fenrir debug
|
||||
instance is already using it, check ''/tmp/fenrir2.log'',
|
||||
''/tmp/fenrir3.log'', etc.
|
||||
|
||||
Please be as precise as possible to make it easy to solve the problem.
|
||||
|
||||
@@ -67,7 +67,7 @@ class command:
|
||||
)
|
||||
# is has attribute it enabled?
|
||||
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
|
||||
"general", "hasattributes"
|
||||
"general", "has_attributes"
|
||||
):
|
||||
cursor_pos = self.env["screen"]["newCursorReview"]
|
||||
|
||||
|
||||
@@ -119,7 +119,7 @@ class command:
|
||||
)
|
||||
# is has attribute it enabled?
|
||||
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
|
||||
"general", "hasattributes"
|
||||
"general", "has_attributes"
|
||||
):
|
||||
cursor_pos = self.env["screen"]["newCursorReview"]
|
||||
|
||||
|
||||
@@ -112,9 +112,9 @@ class command:
|
||||
"review", "end_of_screen"
|
||||
):
|
||||
self.env["runtime"]["OutputManager"].present_text(
|
||||
_("end of screen"),
|
||||
_("start of screen"),
|
||||
interrupt=True,
|
||||
sound_icon="EndOfScreen",
|
||||
sound_icon="StartOfScreen",
|
||||
)
|
||||
if line_break:
|
||||
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
|
||||
@@ -125,7 +125,7 @@ class command:
|
||||
)
|
||||
# is has attribute it enabled?
|
||||
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
|
||||
"general", "hasattributes"
|
||||
"general", "has_attributes"
|
||||
):
|
||||
cursor_pos = self.env["screen"]["newCursorReview"]
|
||||
|
||||
|
||||
@@ -49,9 +49,9 @@ class command:
|
||||
"review", "end_of_screen"
|
||||
):
|
||||
self.env["runtime"]["OutputManager"].present_text(
|
||||
_("end of screen"),
|
||||
_("start of screen"),
|
||||
interrupt=True,
|
||||
sound_icon="EndOfScreen",
|
||||
sound_icon="StartOfScreen",
|
||||
)
|
||||
if line_break:
|
||||
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
|
||||
|
||||
@@ -50,9 +50,9 @@ class command:
|
||||
"review", "end_of_screen"
|
||||
):
|
||||
self.env["runtime"]["OutputManager"].present_text(
|
||||
_("end of screen"),
|
||||
_("start of screen"),
|
||||
interrupt=True,
|
||||
sound_icon="EndOfScreen",
|
||||
sound_icon="StartOfScreen",
|
||||
)
|
||||
|
||||
def set_callback(self, callback):
|
||||
|
||||
@@ -95,9 +95,9 @@ class command:
|
||||
"review", "end_of_screen"
|
||||
):
|
||||
self.env["runtime"]["OutputManager"].present_text(
|
||||
_("end of screen"),
|
||||
_("start of screen"),
|
||||
interrupt=False,
|
||||
sound_icon="EndOfScreen",
|
||||
sound_icon="StartOfScreen",
|
||||
)
|
||||
if line_break:
|
||||
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
|
||||
|
||||
@@ -60,9 +60,9 @@ class command:
|
||||
"review", "end_of_screen"
|
||||
):
|
||||
self.env["runtime"]["OutputManager"].present_text(
|
||||
_("end of screen"),
|
||||
_("start of screen"),
|
||||
interrupt=True,
|
||||
sound_icon="EndOfScreen",
|
||||
sound_icon="StartOfScreen",
|
||||
)
|
||||
if line_break:
|
||||
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
|
||||
|
||||
@@ -50,9 +50,9 @@ class command:
|
||||
"review", "end_of_screen"
|
||||
):
|
||||
self.env["runtime"]["OutputManager"].present_text(
|
||||
_("end of screen"),
|
||||
_("start of screen"),
|
||||
interrupt=True,
|
||||
sound_icon="EndOfScreen",
|
||||
sound_icon="StartOfScreen",
|
||||
)
|
||||
if line_break:
|
||||
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
|
||||
|
||||
@@ -23,15 +23,15 @@ class command:
|
||||
def run(self):
|
||||
self.env["runtime"]["SettingsManager"].set_setting(
|
||||
"general",
|
||||
"hasattributes",
|
||||
"has_attributes",
|
||||
str(
|
||||
not self.env["runtime"]["SettingsManager"].get_setting_as_bool(
|
||||
"general", "hasattributes"
|
||||
"general", "has_attributes"
|
||||
)
|
||||
),
|
||||
)
|
||||
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
|
||||
"general", "hasattributes"
|
||||
"general", "has_attributes"
|
||||
):
|
||||
self.env["runtime"]["OutputManager"].present_text(
|
||||
_("announcement of attributes enabled"),
|
||||
|
||||
@@ -62,6 +62,7 @@ class command:
|
||||
announce_capital=True,
|
||||
flush=False,
|
||||
)
|
||||
self.env["commandsIgnore"]["onScreenUpdate"]["INCOMING_IGNORE"] = True
|
||||
|
||||
def _is_recent_tab_input(self):
|
||||
input_manager = self.env["runtime"].get("InputManager")
|
||||
|
||||
+76
-5
@@ -4,6 +4,8 @@
|
||||
# Fenrir TTY screen reader
|
||||
# By Chrys, Storm Dragon, and contributors.
|
||||
|
||||
import re
|
||||
|
||||
from fenrirscreenreader.core.i18n import _
|
||||
from fenrirscreenreader.utils import line_utils
|
||||
from fenrirscreenreader.utils import word_utils
|
||||
@@ -30,14 +32,21 @@ class command:
|
||||
if self.env["runtime"]["ScreenManager"].is_screen_change():
|
||||
self.lastIdent = 0
|
||||
return
|
||||
# Don't announce cursor movements when auto-read is handling incoming text
|
||||
# This prevents interrupting ongoing auto-read announcements
|
||||
if self.env["runtime"]["ScreenManager"].is_delta():
|
||||
return
|
||||
|
||||
# is a vertical change?
|
||||
if not self.env["runtime"]["CursorManager"].is_cursor_vertical_move():
|
||||
return
|
||||
# Don't announce cursor movements when auto-read is handling incoming text.
|
||||
# In PTY mode, TUI navigation often arrives as cursor movement plus a
|
||||
# small repaint delta, so allow the line announcement there.
|
||||
if (
|
||||
self.env["runtime"]["ScreenManager"].is_delta()
|
||||
and self.env["screen"].get("newTTY") != "pty"
|
||||
):
|
||||
return
|
||||
pty_repaint_delta = (
|
||||
self.env["screen"].get("newTTY") == "pty"
|
||||
and self.env["runtime"]["ScreenManager"].is_delta()
|
||||
)
|
||||
|
||||
x, y, curr_line = line_utils.get_current_line(
|
||||
self.env["screen"]["new_cursor"]["x"],
|
||||
@@ -52,10 +61,26 @@ class command:
|
||||
do_interrupt = False
|
||||
|
||||
if curr_line.isspace():
|
||||
if pty_repaint_delta and self._delta_has_nonblank_text():
|
||||
return
|
||||
self.env["runtime"]["OutputManager"].present_text(
|
||||
_("blank"), sound_icon="EmptyLine", interrupt=do_interrupt, flush=False
|
||||
)
|
||||
if pty_repaint_delta:
|
||||
self.env["commandsIgnore"]["onScreenUpdate"][
|
||||
"INCOMING_IGNORE"
|
||||
] = True
|
||||
else:
|
||||
dialog_text = self._get_pty_dialog_text(curr_line, y)
|
||||
if dialog_text:
|
||||
self.env["runtime"]["OutputManager"].present_text(
|
||||
dialog_text, interrupt=do_interrupt, flush=False
|
||||
)
|
||||
if pty_repaint_delta:
|
||||
self.env["commandsIgnore"]["onScreenUpdate"][
|
||||
"INCOMING_IGNORE"
|
||||
] = True
|
||||
return
|
||||
# ident
|
||||
curr_ident = len(curr_line) - len(curr_line.lstrip())
|
||||
if self.lastIdent == -1:
|
||||
@@ -98,7 +123,53 @@ class command:
|
||||
self.env["runtime"]["OutputManager"].present_text(
|
||||
say_line, interrupt=do_interrupt, flush=False
|
||||
)
|
||||
if pty_repaint_delta:
|
||||
self.env["commandsIgnore"]["onScreenUpdate"][
|
||||
"INCOMING_IGNORE"
|
||||
] = True
|
||||
self.lastIdent = curr_ident
|
||||
|
||||
def _get_pty_dialog_text(self, curr_line, curr_y):
|
||||
if self.env["screen"].get("newTTY") != "pty":
|
||||
return ""
|
||||
if not self._is_focus_control_line(curr_line):
|
||||
return ""
|
||||
screen_lines = self.env["screen"]["new_content_text"].split("\n")
|
||||
start = max(0, curr_y - 6)
|
||||
candidate_lines = []
|
||||
for line in screen_lines[start: curr_y + 1]:
|
||||
normalized = self._normalize_line(line)
|
||||
if normalized:
|
||||
candidate_lines.append(normalized)
|
||||
if len(candidate_lines) < 2:
|
||||
return ""
|
||||
if all(self._is_focus_control_line(line) for line in candidate_lines):
|
||||
return ""
|
||||
return "\n".join(candidate_lines)
|
||||
|
||||
def _delta_has_nonblank_text(self):
|
||||
return bool(self.env["screen"].get("new_delta", "").strip())
|
||||
|
||||
def _normalize_line(self, line):
|
||||
return " ".join(line.split())
|
||||
|
||||
def _is_focus_control_line(self, text):
|
||||
stripped = self._normalize_line(text)
|
||||
if not stripped:
|
||||
return False
|
||||
control = r"<\s*[A-Za-z][A-Za-z0-9 _.-]{0,30}\s*>"
|
||||
if re.fullmatch(rf"(?:{control}\s*)+", stripped):
|
||||
return True
|
||||
return stripped.lower() in {
|
||||
"ok",
|
||||
"cancel",
|
||||
"yes",
|
||||
"no",
|
||||
"retry",
|
||||
"abort",
|
||||
"ignore",
|
||||
"continue",
|
||||
}
|
||||
|
||||
def set_callback(self, callback):
|
||||
pass
|
||||
|
||||
@@ -24,7 +24,7 @@ class command:
|
||||
def run(self):
|
||||
# is it enabled?
|
||||
if not self.env["runtime"]["SettingsManager"].get_setting_as_bool(
|
||||
"general", "hasattributes"
|
||||
"general", "has_attributes"
|
||||
):
|
||||
return
|
||||
# is a vertical change?
|
||||
|
||||
@@ -33,6 +33,8 @@ class command:
|
||||
self.env["input"]["prev_input"]
|
||||
):
|
||||
return
|
||||
if self._current_input_runs_fenrir_command():
|
||||
return
|
||||
# if the filter is set
|
||||
if (
|
||||
self.env["runtime"]["SettingsManager"]
|
||||
@@ -50,5 +52,16 @@ class command:
|
||||
return
|
||||
self.env["runtime"]["OutputManager"].interrupt_output_async()
|
||||
|
||||
def _current_input_runs_fenrir_command(self):
|
||||
input_manager = self.env["runtime"].get("InputManager")
|
||||
if input_manager is None:
|
||||
return False
|
||||
try:
|
||||
shortcut = input_manager.get_curr_shortcut()
|
||||
command = input_manager.get_command_for_shortcut(shortcut)
|
||||
except Exception:
|
||||
return False
|
||||
return isinstance(command, str) and command != ""
|
||||
|
||||
def set_callback(self, callback):
|
||||
pass
|
||||
|
||||
@@ -81,7 +81,7 @@ class command:
|
||||
delta_length = len(delta_text)
|
||||
if (
|
||||
delta_length > 200
|
||||
): # Allow longer progress lines like Claude Code's status
|
||||
): # Allow longer progress lines such as terminal status output
|
||||
if not self.is_explicit_progress_delta(delta_text):
|
||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||
f"Progress filter: delta too long ({delta_length})",
|
||||
@@ -326,43 +326,27 @@ class command:
|
||||
self.env["commandBuffer"]["lastProgressTime"] = current_time
|
||||
return
|
||||
|
||||
# Pattern 6: Claude Code working indicators (various symbols + activity text + "esc/ctrl+c to interrupt")
|
||||
# Matches any: [symbol] [Task description]… (... to interrupt ...)
|
||||
# Pattern 6: Interruptible terminal activity indicators
|
||||
# Matches any: [symbol] [Task description][…] (... to interrupt ...)
|
||||
# Symbols include: * ✢ ✽ ✶ ✻ · • ◦ ○ ● ◆ and similar decorative characters
|
||||
# Example: ✽ Reviewing script for issues… (ctrl+c to interrupt · 33s · ↑ 1.6k tokens · thought for 4s)
|
||||
claude_progress_match = re.search(
|
||||
r'[*✢✽✶✻·•◦○●◆]\s+\w+.*?…\s*\(.*(?:esc|ctrl\+c) to interrupt.*\)',
|
||||
# Keep this structural rather than adding application-specific formats,
|
||||
# which change too frequently to support reliably.
|
||||
interruptible_activity_match = re.search(
|
||||
r'[*✢✽✶✻·•◦○●◆]\s+\w+.*?(?:…\s*)?\(.*(?:esc|ctrl\+c) to interrupt.*\)',
|
||||
text,
|
||||
re.IGNORECASE,
|
||||
)
|
||||
if claude_progress_match:
|
||||
if interruptible_activity_match:
|
||||
if current_time - self.env["commandBuffer"]["lastProgressTime"] >= 1.0:
|
||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||
"Playing Claude Code activity beep",
|
||||
"Playing interruptible activity beep",
|
||||
debug.DebugLevel.INFO,
|
||||
)
|
||||
self.play_activity_beep()
|
||||
self.env["commandBuffer"]["lastProgressTime"] = current_time
|
||||
return
|
||||
|
||||
# Pattern 6b: Claude Code tool invocation indicators (● Tool Name(...))
|
||||
# Example: ● Web Search("query here")
|
||||
tool_invocation_match = re.search(
|
||||
r'[●○◉•◦]\s+(?:Web\s*Search|Read|Write|Edit|Bash|Glob|Grep|Task|WebFetch)\s*\(',
|
||||
text,
|
||||
re.IGNORECASE,
|
||||
)
|
||||
if tool_invocation_match:
|
||||
if current_time - self.env["commandBuffer"]["lastProgressTime"] >= 1.0:
|
||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||
"Playing Claude Code tool invocation beep",
|
||||
debug.DebugLevel.INFO,
|
||||
)
|
||||
self.play_activity_beep()
|
||||
self.env["commandBuffer"]["lastProgressTime"] = current_time
|
||||
return
|
||||
|
||||
# Pattern 6c: Bullet/white bullet activity lines (•/◦ ...)
|
||||
# Pattern 6b: Bullet/white bullet activity lines (•/◦ ...)
|
||||
bullet_activity_match = re.search(
|
||||
(
|
||||
r'^\s*[•◦]\s+.*(?:…|\.{3,}|\b(?:thinking|working|processing|'
|
||||
|
||||
@@ -19,7 +19,9 @@ command_buffer = {
|
||||
|
||||
# used by the command_manager
|
||||
command_info = {
|
||||
# 'curr_command': '',
|
||||
"lastCommand": "",
|
||||
"lastCommandSection": "",
|
||||
"lastCommandRunTime": time.time(),
|
||||
"lastCommandExecutionTime": time.time(),
|
||||
"lastCommandRequestTime": time.time(),
|
||||
}
|
||||
|
||||
@@ -489,7 +489,10 @@ class CommandManager:
|
||||
+ str(e),
|
||||
debug.DebugLevel.ERROR,
|
||||
)
|
||||
self.env["commandInfo"]["lastCommandExecutionTime"] = time.time()
|
||||
self.env["commandInfo"]["lastCommand"] = command
|
||||
self.env["commandInfo"]["lastCommandSection"] = section
|
||||
self.env["commandInfo"]["lastCommandRunTime"] = time.time()
|
||||
self.env["commandInfo"]["lastCommandExecutionTime"] = time.time()
|
||||
|
||||
def get_command_description(self, command, section="commands"):
|
||||
if self.command_exists(command, section):
|
||||
|
||||
@@ -3,24 +3,22 @@
|
||||
|
||||
import os
|
||||
import pathlib
|
||||
import fcntl
|
||||
from datetime import datetime
|
||||
|
||||
from fenrirscreenreader.core import debug
|
||||
|
||||
|
||||
class DebugManager:
|
||||
DEFAULT_LOG_DIR = "/tmp"
|
||||
DEFAULT_LOG_BASENAME = "fenrir"
|
||||
DEFAULT_LOG_EXTENSION = ".log"
|
||||
|
||||
def __init__(self, file_name=""):
|
||||
self._file = None
|
||||
self._fileOpened = False
|
||||
self._fileName = (
|
||||
"/tmp/fenrir_"
|
||||
+ str(os.getpid())
|
||||
+ "_"
|
||||
+ str(datetime.utcnow().strftime("%Y-%m-%d_%H-%M-%S"))
|
||||
+ ".log"
|
||||
)
|
||||
if file_name != "":
|
||||
self._fileName = file_name
|
||||
self._fileName = file_name
|
||||
self._useDefaultLogName = file_name == ""
|
||||
|
||||
def initialize(self, environment):
|
||||
self.env = environment
|
||||
@@ -39,6 +37,10 @@ class DebugManager:
|
||||
self._fileOpened = False
|
||||
if file_name != "":
|
||||
self._fileName = file_name
|
||||
self._useDefaultLogName = False
|
||||
if self._useDefaultLogName:
|
||||
self._open_default_debug_file()
|
||||
return
|
||||
if self._fileName != "":
|
||||
directory = os.path.dirname(self._fileName)
|
||||
if not os.path.exists(directory):
|
||||
@@ -51,6 +53,43 @@ class DebugManager:
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
def _open_default_debug_file(self):
|
||||
pathlib.Path(self.DEFAULT_LOG_DIR).mkdir(parents=True, exist_ok=True)
|
||||
log_number = 1
|
||||
while True:
|
||||
log_file = self._default_log_file_name(log_number)
|
||||
try:
|
||||
fd = os.open(
|
||||
log_file,
|
||||
os.O_CREAT | os.O_RDWR | os.O_NOFOLLOW,
|
||||
0o644,
|
||||
)
|
||||
file_obj = os.fdopen(fd, "a")
|
||||
fcntl.flock(file_obj.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
file_obj.seek(0)
|
||||
file_obj.truncate()
|
||||
os.chmod(log_file, 0o644)
|
||||
self._file = file_obj
|
||||
self._fileName = log_file
|
||||
self._fileOpened = True
|
||||
return
|
||||
except BlockingIOError:
|
||||
try:
|
||||
file_obj.close()
|
||||
except Exception:
|
||||
pass
|
||||
log_number += 1
|
||||
except OSError as e:
|
||||
print(e)
|
||||
return
|
||||
|
||||
def _default_log_file_name(self, log_number):
|
||||
suffix = "" if log_number == 1 else str(log_number)
|
||||
return os.path.join(
|
||||
self.DEFAULT_LOG_DIR,
|
||||
self.DEFAULT_LOG_BASENAME + suffix + self.DEFAULT_LOG_EXTENSION,
|
||||
)
|
||||
|
||||
def write_debug_out(
|
||||
self, text, level=debug.DebugLevel.DEACTIVE, on_any_level=False
|
||||
):
|
||||
@@ -120,3 +159,4 @@ class DebugManager:
|
||||
def set_debug_file(self, file_name):
|
||||
self.close_debug_file()
|
||||
self._fileName = file_name
|
||||
self._useDefaultLogName = file_name == ""
|
||||
|
||||
@@ -311,7 +311,10 @@ class FenrirManager:
|
||||
self.singleKeyCommand = True
|
||||
elif (
|
||||
(
|
||||
self.environment["runtime"]["DiffReviewManager"].is_active()
|
||||
self.environment["runtime"]["VmenuManager"].get_active()
|
||||
or self.environment["runtime"][
|
||||
"DiffReviewManager"
|
||||
].is_active()
|
||||
or self.environment["runtime"][
|
||||
"SpeechHistoryManager"
|
||||
].is_active()
|
||||
|
||||
@@ -66,6 +66,8 @@ class OutputManager:
|
||||
"present_text:\nsoundIcon:'" + sound_icon + "'\nText:\n" + text,
|
||||
debug.DebugLevel.INFO,
|
||||
)
|
||||
if interrupt and self._sound_icon_will_play(sound_icon):
|
||||
self.interrupt_output()
|
||||
if self.play_sound_icon(sound_icon, interrupt):
|
||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||
"sound_icon found", debug.DebugLevel.INFO
|
||||
@@ -121,6 +123,18 @@ class OutputManager:
|
||||
return False
|
||||
return text.isupper()
|
||||
|
||||
def _sound_icon_will_play(self, sound_icon):
|
||||
if sound_icon == "":
|
||||
return False
|
||||
sound_icon = sound_icon.upper()
|
||||
if not self.env["runtime"]["SettingsManager"].get_setting_as_bool(
|
||||
"sound", "enabled"
|
||||
):
|
||||
return False
|
||||
if sound_icon not in self.env["soundIcons"]:
|
||||
return False
|
||||
return self.env["runtime"]["SoundDriver"] is not None
|
||||
|
||||
def get_last_echo(self):
|
||||
return self.last_echo
|
||||
|
||||
|
||||
@@ -359,7 +359,21 @@ class ScreenManager:
|
||||
# Keep those as typing so incoming speech does not
|
||||
# announce the repainted prompt line.
|
||||
if temp_new_delta != expected_typing:
|
||||
if expected_typing.strip() != "":
|
||||
old_cursor_x = self.env["screen"]["old_cursor"][
|
||||
"x"
|
||||
]
|
||||
prefix_changed_before_cursor = (
|
||||
old_screen_text[:old_cursor_x]
|
||||
!= new_screen_text[:old_cursor_x]
|
||||
)
|
||||
likely_line_repaint = (
|
||||
prefix_changed_before_cursor
|
||||
and len(expected_typing.strip()) > 4
|
||||
)
|
||||
if (
|
||||
expected_typing.strip() != ""
|
||||
and not likely_line_repaint
|
||||
):
|
||||
diff_list = ["+ " + expected_typing]
|
||||
else:
|
||||
# Fallback: treat entire current line as new
|
||||
@@ -370,7 +384,7 @@ class ScreenManager:
|
||||
self.env["screen"]["new_cursor"]["y"]
|
||||
]
|
||||
diff_list = [
|
||||
"+ " + current_line + "\n"
|
||||
"+ " + current_line.rstrip()
|
||||
]
|
||||
typing = False
|
||||
else:
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
|
||||
import inspect
|
||||
import os
|
||||
from argparse import Namespace
|
||||
from configparser import ConfigParser
|
||||
|
||||
from fenrirscreenreader.core import applicationManager
|
||||
@@ -67,6 +68,15 @@ class SettingsManager:
|
||||
def shutdown(self):
|
||||
pass
|
||||
|
||||
def format_cli_args(self, cliArgs):
|
||||
if cliArgs is None:
|
||||
return "{}"
|
||||
if isinstance(cliArgs, Namespace):
|
||||
args = vars(cliArgs)
|
||||
else:
|
||||
args = vars(cliArgs) if hasattr(cliArgs, "__dict__") else {}
|
||||
return str({key: args[key] for key in sorted(args)})
|
||||
|
||||
def get_binding_backup(self):
|
||||
return self.bindingsBackup.copy()
|
||||
|
||||
@@ -644,6 +654,11 @@ class SettingsManager:
|
||||
)
|
||||
)
|
||||
environment["runtime"]["DebugManager"].initialize(environment)
|
||||
environment["runtime"]["DebugManager"].write_debug_out(
|
||||
"Fenrir startup CLI arguments: " + self.format_cli_args(cliArgs),
|
||||
debug.DebugLevel.INFO,
|
||||
on_any_level=True,
|
||||
)
|
||||
|
||||
if cliArgs.force_all_screens:
|
||||
environment["runtime"]["force_all_screens"] = True
|
||||
|
||||
@@ -132,13 +132,6 @@ class TabCompletionManager:
|
||||
if candidate_text:
|
||||
return self._clean_text(candidate_text)
|
||||
|
||||
delta_text = self.env["screen"]["new_delta"]
|
||||
if (
|
||||
delta_text
|
||||
and not self.env["screen"].get("new_delta_is_typing", False)
|
||||
):
|
||||
return self._clean_text(delta_text)
|
||||
|
||||
return ""
|
||||
|
||||
def _get_cursor_line_inserted_text(
|
||||
@@ -184,26 +177,19 @@ class TabCompletionManager:
|
||||
return "".join(inserted_parts)
|
||||
|
||||
def _get_candidate_text(self, old_lines, new_lines, cursor_y):
|
||||
if len(old_lines) != len(new_lines):
|
||||
return self._get_inserted_lines(old_lines, new_lines, cursor_y)
|
||||
|
||||
changed_lines = []
|
||||
old_cursor_line = (
|
||||
old_lines[cursor_y].strip() if cursor_y < len(old_lines) else ""
|
||||
)
|
||||
for index, old_line in enumerate(old_lines):
|
||||
if index == cursor_y:
|
||||
continue
|
||||
if index < len(new_lines) and old_line != new_lines[index]:
|
||||
if new_lines[index].strip() == old_cursor_line:
|
||||
continue
|
||||
changed_lines.append(new_lines[index])
|
||||
|
||||
return "\n".join(
|
||||
line.rstrip() for line in changed_lines if line.strip()
|
||||
return self._get_inserted_lines(
|
||||
old_lines,
|
||||
new_lines,
|
||||
self.env["screen"]["new_cursor"]["y"],
|
||||
old_cursor_line,
|
||||
)
|
||||
|
||||
def _get_inserted_lines(self, old_lines, new_lines, cursor_y):
|
||||
def _get_inserted_lines(
|
||||
self, old_lines, new_lines, new_cursor_y, old_cursor_line
|
||||
):
|
||||
matcher = difflib.SequenceMatcher(
|
||||
None, old_lines, new_lines, autojunk=False
|
||||
)
|
||||
@@ -217,10 +203,15 @@ class TabCompletionManager:
|
||||
) in matcher.get_opcodes():
|
||||
if tag not in ["insert", "replace"]:
|
||||
continue
|
||||
if new_end <= cursor_y:
|
||||
if new_start > new_cursor_y:
|
||||
continue
|
||||
if tag == "replace" and any(
|
||||
line.strip() for line in old_lines[old_start:old_end]
|
||||
):
|
||||
continue
|
||||
for line in new_lines[new_start:new_end]:
|
||||
if line.strip():
|
||||
stripped_line = line.strip()
|
||||
if stripped_line and stripped_line != old_cursor_line:
|
||||
inserted_lines.append(line.rstrip())
|
||||
return "\n".join(inserted_lines)
|
||||
|
||||
|
||||
@@ -4,5 +4,5 @@
|
||||
# Fenrir TTY screen reader
|
||||
# By Chrys, Storm Dragon, and contributors.
|
||||
|
||||
version = "2026.05.24"
|
||||
code_name = "master"
|
||||
version = "2026.06.04"
|
||||
code_name = "xim"
|
||||
|
||||
@@ -162,6 +162,7 @@ class driver(inputDriver):
|
||||
self.fenrir_keys = set()
|
||||
self.failed_grabs = 0
|
||||
self.modifier_state = 0
|
||||
self.modifier_interrupt_state = 0
|
||||
|
||||
def initialize(self, environment):
|
||||
self.env = environment
|
||||
@@ -194,6 +195,7 @@ class driver(inputDriver):
|
||||
)
|
||||
self.num_lock_mask = self.find_num_lock_mask()
|
||||
self.refresh_modifier_state()
|
||||
self.modifier_interrupt_state = self.modifier_state
|
||||
self.refresh_interesting_keys()
|
||||
self.refresh_grabs(force=True)
|
||||
self.env["runtime"]["ProcessManager"].add_custom_event_thread(
|
||||
@@ -274,6 +276,7 @@ class driver(inputDriver):
|
||||
while active.value:
|
||||
try:
|
||||
self.refresh_grabs()
|
||||
self.poll_modifier_interrupt_keys()
|
||||
if not self.display.pending_events():
|
||||
time.sleep(0.01)
|
||||
continue
|
||||
@@ -371,6 +374,56 @@ class driver(inputDriver):
|
||||
"event_x_time": getattr(event, "time", X.CurrentTime),
|
||||
}
|
||||
|
||||
def poll_modifier_interrupt_keys(self):
|
||||
if not self.active or not self.should_poll_modifier_interrupt_keys():
|
||||
return
|
||||
try:
|
||||
pointer = self.root.query_pointer()
|
||||
current_state = getattr(pointer, "mask", 0)
|
||||
except Exception:
|
||||
return
|
||||
previous_state = self.modifier_interrupt_state
|
||||
self.modifier_interrupt_state = current_state
|
||||
self.modifier_state = current_state
|
||||
for key_name, modifier_mask in self.interrupt_modifier_masks():
|
||||
if current_state & modifier_mask and not previous_state & modifier_mask:
|
||||
self.interrupt_output_on_modifier_key(key_name)
|
||||
|
||||
def should_poll_modifier_interrupt_keys(self):
|
||||
try:
|
||||
settings_manager = self.env["runtime"]["SettingsManager"]
|
||||
except Exception:
|
||||
return False
|
||||
if not settings_manager.get_setting_as_bool(
|
||||
"keyboard", "interrupt_on_key_press"
|
||||
):
|
||||
return False
|
||||
return (
|
||||
settings_manager.get_setting(
|
||||
"keyboard", "interrupt_on_key_press_filter"
|
||||
).strip()
|
||||
== ""
|
||||
)
|
||||
|
||||
def interrupt_modifier_masks(self):
|
||||
return [
|
||||
("KEY_CTRL", X.ControlMask),
|
||||
("KEY_SHIFT", X.ShiftMask),
|
||||
("KEY_ALT", X.Mod1Mask),
|
||||
]
|
||||
|
||||
def interrupt_output_on_modifier_key(self, key_name):
|
||||
try:
|
||||
self.env["runtime"]["OutputManager"].interrupt_output_async()
|
||||
except Exception as e:
|
||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||
"x11Driver modifier interrupt failed for "
|
||||
+ key_name
|
||||
+ ": "
|
||||
+ str(e),
|
||||
debug.DebugLevel.ERROR,
|
||||
)
|
||||
|
||||
def refresh_modifier_state(self):
|
||||
try:
|
||||
pointer = self.root.query_pointer()
|
||||
|
||||
@@ -8,6 +8,7 @@ import fcntl
|
||||
import getpass
|
||||
import os
|
||||
import pty
|
||||
import re
|
||||
import shlex
|
||||
from queue import Full
|
||||
import signal
|
||||
@@ -35,6 +36,7 @@ class PTYConstants:
|
||||
SELECT_TIMEOUT = 0.05
|
||||
PROCESS_TERMINATION_TIMEOUT = 3.0
|
||||
PROCESS_KILL_DELAY = 0.5
|
||||
FENRIR_SHORTCUT_STDIN_TAIL_TIMEOUT = 0.3
|
||||
|
||||
# Polling intervals (in seconds)
|
||||
MIN_POLL_INTERVAL = 0.001
|
||||
@@ -75,6 +77,8 @@ class Terminal:
|
||||
)
|
||||
self.stream = pyte.ByteStream()
|
||||
self.stream.attach(self.screen)
|
||||
self._pending_control_bytes = b""
|
||||
self._discarding_control_string = False
|
||||
|
||||
def _log_error(self, message, level=None):
|
||||
"""Log error message using proper debug manager if available."""
|
||||
@@ -93,7 +97,53 @@ class Terminal:
|
||||
print(f"PTY Terminal: {message}")
|
||||
|
||||
def feed(self, data):
|
||||
self.stream.feed(data)
|
||||
self.stream.feed(self._filter_terminal_control_strings(data))
|
||||
|
||||
def _filter_terminal_control_strings(self, data):
|
||||
if not data:
|
||||
return data
|
||||
data = self._pending_control_bytes + data
|
||||
self._pending_control_bytes = b""
|
||||
output = bytearray()
|
||||
index = 0
|
||||
while index < len(data):
|
||||
if self._discarding_control_string:
|
||||
end = self._find_control_string_end(data, index)
|
||||
if end == -1:
|
||||
return bytes(output)
|
||||
index = end
|
||||
self._discarding_control_string = False
|
||||
continue
|
||||
|
||||
byte = data[index]
|
||||
if byte == 0x90:
|
||||
self._discarding_control_string = True
|
||||
index += 1
|
||||
continue
|
||||
if byte == 0x1B:
|
||||
if index + 1 >= len(data):
|
||||
self._pending_control_bytes = data[index:]
|
||||
break
|
||||
next_byte = data[index + 1]
|
||||
if next_byte in b"P^_X":
|
||||
self._discarding_control_string = True
|
||||
index += 2
|
||||
continue
|
||||
output.append(byte)
|
||||
index += 1
|
||||
return bytes(output)
|
||||
|
||||
def _find_control_string_end(self, data, start):
|
||||
c1_end = data.find(b"\x9c", start)
|
||||
st_end = data.find(b"\x1b\\", start)
|
||||
ends = [
|
||||
end + 1 for end in [c1_end] if end != -1
|
||||
] + [
|
||||
end + 2 for end in [st_end] if end != -1
|
||||
]
|
||||
if not ends:
|
||||
return -1
|
||||
return min(ends)
|
||||
|
||||
def update_attributes(self, initialize=False):
|
||||
buffer = self.screen.buffer
|
||||
@@ -105,8 +155,8 @@ class Terminal:
|
||||
try:
|
||||
self.attributes = [
|
||||
[
|
||||
list(attribute[1:]) + [False, "default", "default"]
|
||||
if len(attribute) > 1 else [False, "default", "default"]
|
||||
self._attribute_from_pyte_char(attribute)
|
||||
if len(attribute) > 1 else self._default_attribute[:]
|
||||
for attribute in line.values()
|
||||
]
|
||||
for line in buffer.values()
|
||||
@@ -143,7 +193,7 @@ class Terminal:
|
||||
|
||||
try:
|
||||
self.attributes[y] = [
|
||||
list(attribute[1:]) + [False, "default", "default"]
|
||||
self._attribute_from_pyte_char(attribute)
|
||||
for attribute in (buffer[y].values())
|
||||
]
|
||||
except Exception as e:
|
||||
@@ -155,6 +205,27 @@ class Terminal:
|
||||
# Use pre-created template for efficiency
|
||||
self.attributes[y] += [self._default_attribute[:] for _ in range(diff)]
|
||||
|
||||
def _attribute_from_pyte_char(self, attribute):
|
||||
fg = attribute.fg
|
||||
bg = attribute.bg
|
||||
reverse = bool(attribute.reverse)
|
||||
if reverse:
|
||||
fg, bg = bg, fg
|
||||
if fg == "default" and bg == "default":
|
||||
bg = "reverse"
|
||||
return [
|
||||
fg,
|
||||
bg,
|
||||
bool(attribute.bold),
|
||||
bool(attribute.italics),
|
||||
bool(attribute.underscore),
|
||||
bool(attribute.strikethrough),
|
||||
reverse,
|
||||
bool(attribute.blink),
|
||||
"default",
|
||||
"default",
|
||||
]
|
||||
|
||||
def resize(self, lines, columns):
|
||||
self.screen.resize(lines, columns)
|
||||
self.set_cursor()
|
||||
@@ -205,6 +276,8 @@ class driver(screenDriver):
|
||||
self.stdin_interrupt_lock = threading.Lock()
|
||||
self.stdin_interrupt_running = False
|
||||
self.stdin_interrupt_thread = None
|
||||
self.last_fenrir_stdin_command_time = 0.0
|
||||
self.fenrir_stdin_sequence_prefix = b""
|
||||
signal.signal(signal.SIGWINCH, self.handle_sigwinch)
|
||||
|
||||
# Runtime configuration storage
|
||||
@@ -295,6 +368,8 @@ class driver(screenDriver):
|
||||
def interrupt_output_on_stdin_input(self, msg_bytes):
|
||||
if not msg_bytes:
|
||||
return
|
||||
if self.is_terminal_response_sequence(msg_bytes):
|
||||
return
|
||||
settings_manager = self.env["runtime"]["SettingsManager"]
|
||||
if not settings_manager.get_setting_as_bool(
|
||||
"keyboard", "interrupt_on_key_press"
|
||||
@@ -317,6 +392,29 @@ class driver(screenDriver):
|
||||
)
|
||||
self.stdin_interrupt_thread.start()
|
||||
|
||||
def is_terminal_response_sequence(self, msg_bytes):
|
||||
if not msg_bytes or not msg_bytes.startswith(b"\x1b"):
|
||||
return False
|
||||
if msg_bytes.startswith((b"\x1b]", b"\x1bP", b"\x1b_", b"\x1b^")):
|
||||
return True
|
||||
if not msg_bytes.startswith(b"\x1b["):
|
||||
return False
|
||||
try:
|
||||
sequence = msg_bytes.decode("ascii", errors="ignore")
|
||||
except Exception:
|
||||
return False
|
||||
if len(sequence) < 3:
|
||||
return False
|
||||
final_byte = sequence[-1]
|
||||
if final_byte not in "cRnt":
|
||||
return False
|
||||
body = sequence[2:-1]
|
||||
if final_byte == "R":
|
||||
return bool(re.fullmatch(r"\??\d+;\d+", body))
|
||||
if final_byte in "cnt":
|
||||
return bool(re.fullmatch(r"[?>=0-9;]*", body))
|
||||
return False
|
||||
|
||||
def run_stdin_interrupt(self):
|
||||
try:
|
||||
self.env["runtime"]["OutputManager"].interrupt_output()
|
||||
@@ -333,10 +431,189 @@ class driver(screenDriver):
|
||||
def handle_stdin_input(self, msg_bytes, event_queue):
|
||||
if self.synthesize_backspace_shortcut(msg_bytes, event_queue):
|
||||
return
|
||||
if self.handle_vmenu_stdin_input(msg_bytes, event_queue):
|
||||
return
|
||||
if self.stdin_matches_fenrir_command(msg_bytes):
|
||||
return
|
||||
if self.is_late_fenrir_shortcut_sequence(msg_bytes):
|
||||
return
|
||||
self.record_stdin_keypress(msg_bytes)
|
||||
self.interrupt_output_on_stdin_input(msg_bytes)
|
||||
self.inject_text_to_screen(msg_bytes)
|
||||
|
||||
def stdin_matches_fenrir_command(self, msg_bytes):
|
||||
input_manager = self.env["runtime"].get("InputManager")
|
||||
if input_manager is None:
|
||||
return False
|
||||
try:
|
||||
shortcut = input_manager.get_curr_shortcut()
|
||||
command = input_manager.get_command_for_shortcut(shortcut)
|
||||
except Exception:
|
||||
return False
|
||||
if not isinstance(command, str) or command == "":
|
||||
return False
|
||||
self.record_consumed_fenrir_stdin_sequence(msg_bytes)
|
||||
return True
|
||||
|
||||
def is_late_fenrir_shortcut_sequence(self, msg_bytes):
|
||||
if not self.recent_fenrir_stdin_command():
|
||||
if not self.recent_review_command_execution():
|
||||
return False
|
||||
if self.is_terminal_response_sequence(msg_bytes):
|
||||
return False
|
||||
if self.consume_keyboard_escape_sequence_fragment(msg_bytes):
|
||||
return True
|
||||
return False
|
||||
|
||||
def record_consumed_fenrir_stdin_sequence(self, msg_bytes):
|
||||
self.last_fenrir_stdin_command_time = time.monotonic()
|
||||
self.fenrir_stdin_sequence_prefix = self.remaining_keyboard_sequence_prefix(
|
||||
msg_bytes
|
||||
)
|
||||
|
||||
def recent_fenrir_stdin_command(self):
|
||||
return (
|
||||
time.monotonic() - self.last_fenrir_stdin_command_time
|
||||
<= PTYConstants.FENRIR_SHORTCUT_STDIN_TAIL_TIMEOUT
|
||||
)
|
||||
|
||||
def recent_review_command_execution(self):
|
||||
command_info = self.env.get("commandInfo", {})
|
||||
last_command = command_info.get("lastCommand", "")
|
||||
last_section = command_info.get("lastCommandSection", "")
|
||||
if last_section != "commands" or not last_command.startswith("REVIEW_"):
|
||||
return False
|
||||
try:
|
||||
last_time = float(command_info.get("lastCommandRunTime", 0))
|
||||
except (TypeError, ValueError):
|
||||
return False
|
||||
return (
|
||||
time.time() - last_time
|
||||
<= PTYConstants.FENRIR_SHORTCUT_STDIN_TAIL_TIMEOUT
|
||||
)
|
||||
|
||||
def is_keyboard_escape_sequence(self, msg_bytes):
|
||||
if msg_bytes == b"\x1b":
|
||||
return True
|
||||
if not msg_bytes or len(msg_bytes) < 2:
|
||||
return False
|
||||
if msg_bytes.startswith(b"\x1bO") and 3 <= len(msg_bytes) <= 4:
|
||||
return True
|
||||
if not msg_bytes.startswith(b"\x1b["):
|
||||
return False
|
||||
try:
|
||||
sequence = msg_bytes.decode("ascii", errors="ignore")
|
||||
except Exception:
|
||||
return False
|
||||
if len(sequence) < 3:
|
||||
return False
|
||||
return sequence[-1] in "~ABCDHFZPQRS"
|
||||
|
||||
def consume_keyboard_escape_sequence_fragment(self, msg_bytes):
|
||||
if self.is_keyboard_escape_sequence(msg_bytes):
|
||||
self.record_consumed_fenrir_stdin_sequence(msg_bytes)
|
||||
return True
|
||||
if self.fenrir_stdin_sequence_prefix:
|
||||
combined = self.fenrir_stdin_sequence_prefix + msg_bytes
|
||||
prefix = self.remaining_keyboard_sequence_prefix(combined)
|
||||
if self.is_keyboard_escape_sequence(combined) or prefix != b"":
|
||||
self.last_fenrir_stdin_command_time = time.monotonic()
|
||||
self.fenrir_stdin_sequence_prefix = prefix
|
||||
return True
|
||||
if msg_bytes.startswith((b"[", b"O")) and len(msg_bytes) > 1:
|
||||
combined = b"\x1b" + msg_bytes
|
||||
if self.is_keyboard_escape_sequence(combined):
|
||||
self.record_consumed_fenrir_stdin_sequence(combined)
|
||||
return True
|
||||
return False
|
||||
|
||||
def remaining_keyboard_sequence_prefix(self, msg_bytes):
|
||||
if not msg_bytes:
|
||||
return b""
|
||||
if msg_bytes == b"\x1b":
|
||||
return msg_bytes
|
||||
if msg_bytes.startswith(b"\x1bO") and len(msg_bytes) < 3:
|
||||
return msg_bytes
|
||||
if self.is_keyboard_escape_sequence(msg_bytes):
|
||||
return b""
|
||||
if msg_bytes.startswith(b"\x1b["):
|
||||
try:
|
||||
sequence = msg_bytes.decode("ascii", errors="ignore")
|
||||
except Exception:
|
||||
return b""
|
||||
if len(sequence) < 3:
|
||||
return msg_bytes
|
||||
if sequence[-1] not in "~ABCDHFZPQRS":
|
||||
return msg_bytes
|
||||
return b""
|
||||
|
||||
def handle_vmenu_stdin_input(self, msg_bytes, event_queue):
|
||||
if not self.is_vmenu_active():
|
||||
return False
|
||||
key_name = self.vmenu_stdin_key_name(msg_bytes)
|
||||
if key_name and not self.vmenu_key_already_handled(key_name):
|
||||
self.queue_keypress(key_name, event_queue)
|
||||
return True
|
||||
|
||||
def is_vmenu_active(self):
|
||||
try:
|
||||
return self.env["runtime"]["VmenuManager"].get_active()
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def vmenu_stdin_key_name(self, msg_bytes):
|
||||
key_map = {
|
||||
b"\x1b": "KEY_ESC",
|
||||
b"\x1b[A": "KEY_UP",
|
||||
b"\x1b[B": "KEY_DOWN",
|
||||
b"\x1b[C": "KEY_RIGHT",
|
||||
b"\x1b[D": "KEY_LEFT",
|
||||
b"\x1b[5~": "KEY_PAGEUP",
|
||||
b"\x1b[6~": "KEY_PAGEDOWN",
|
||||
b"\r": "KEY_ENTER",
|
||||
b"\n": "KEY_ENTER",
|
||||
b" ": "KEY_SPACE",
|
||||
}
|
||||
if msg_bytes in key_map:
|
||||
return key_map[msg_bytes]
|
||||
if len(msg_bytes) != 1:
|
||||
return None
|
||||
char = chr(msg_bytes[0])
|
||||
if "a" <= char <= "z" or "A" <= char <= "Z":
|
||||
return "KEY_" + char.upper()
|
||||
return None
|
||||
|
||||
def vmenu_key_already_handled(self, key_name):
|
||||
try:
|
||||
return key_name in self.env["input"]["curr_input"]
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def queue_keypress(self, key_name, event_queue):
|
||||
event_time = time.time()
|
||||
for event_state in [1, 0]:
|
||||
try:
|
||||
event_queue.put(
|
||||
{
|
||||
"Type": FenrirEventType.keyboard_input,
|
||||
"data": {
|
||||
"event_name": key_name,
|
||||
"event_value": 0,
|
||||
"event_sec": int(event_time),
|
||||
"event_usec": int((event_time % 1) * 1000000),
|
||||
"event_state": event_state,
|
||||
"event_type": 0,
|
||||
},
|
||||
},
|
||||
block=False,
|
||||
)
|
||||
except Full:
|
||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||
"ptyDriver queue_keypress: Event queue full, dropping "
|
||||
+ key_name,
|
||||
debug.DebugLevel.WARNING,
|
||||
)
|
||||
|
||||
def record_stdin_keypress(self, msg_bytes):
|
||||
if msg_bytes != b"\t":
|
||||
return
|
||||
|
||||
@@ -47,7 +47,6 @@ def get_up_char(curr_x, curr_y, curr_text):
|
||||
curr_y -= 1
|
||||
if curr_y < 0:
|
||||
curr_y = 0
|
||||
else:
|
||||
end_of_screen = True
|
||||
curr_char = ""
|
||||
if not end_of_screen:
|
||||
@@ -63,7 +62,6 @@ def get_down_char(curr_x, curr_y, curr_text):
|
||||
curr_y += 1
|
||||
if curr_y >= len(wrapped_lines):
|
||||
curr_y = len(wrapped_lines) - 1
|
||||
else:
|
||||
end_of_screen = True
|
||||
curr_char = ""
|
||||
if not end_of_screen:
|
||||
|
||||
@@ -0,0 +1,59 @@
|
||||
from fenrirscreenreader.core.debugManager import DebugManager
|
||||
|
||||
|
||||
def test_default_debug_file_uses_flat_name(tmp_path, monkeypatch):
|
||||
monkeypatch.setattr(DebugManager, "DEFAULT_LOG_DIR", str(tmp_path))
|
||||
manager = DebugManager()
|
||||
try:
|
||||
manager.open_debug_file()
|
||||
|
||||
assert manager.get_debug_file() == str(tmp_path / "fenrir.log")
|
||||
assert (tmp_path / "fenrir.log").exists()
|
||||
finally:
|
||||
manager.close_debug_file()
|
||||
|
||||
|
||||
def test_default_debug_file_uses_next_number_when_locked(
|
||||
tmp_path, monkeypatch
|
||||
):
|
||||
monkeypatch.setattr(DebugManager, "DEFAULT_LOG_DIR", str(tmp_path))
|
||||
first_manager = DebugManager()
|
||||
second_manager = DebugManager()
|
||||
try:
|
||||
first_manager.open_debug_file()
|
||||
second_manager.open_debug_file()
|
||||
|
||||
assert first_manager.get_debug_file() == str(tmp_path / "fenrir.log")
|
||||
assert second_manager.get_debug_file() == str(
|
||||
tmp_path / "fenrir2.log"
|
||||
)
|
||||
assert (tmp_path / "fenrir2.log").exists()
|
||||
finally:
|
||||
second_manager.close_debug_file()
|
||||
first_manager.close_debug_file()
|
||||
|
||||
|
||||
def test_default_debug_file_reuses_unlocked_flat_name(tmp_path, monkeypatch):
|
||||
monkeypatch.setattr(DebugManager, "DEFAULT_LOG_DIR", str(tmp_path))
|
||||
first_manager = DebugManager()
|
||||
second_manager = DebugManager()
|
||||
try:
|
||||
first_manager.open_debug_file()
|
||||
first_manager.close_debug_file()
|
||||
second_manager.open_debug_file()
|
||||
|
||||
assert second_manager.get_debug_file() == str(tmp_path / "fenrir.log")
|
||||
finally:
|
||||
second_manager.close_debug_file()
|
||||
|
||||
|
||||
def test_explicit_debug_file_uses_exact_path(tmp_path):
|
||||
debug_file = tmp_path / "custom.log"
|
||||
manager = DebugManager(str(debug_file))
|
||||
try:
|
||||
manager.open_debug_file()
|
||||
|
||||
assert manager.get_debug_file() == str(debug_file)
|
||||
assert debug_file.exists()
|
||||
finally:
|
||||
manager.close_debug_file()
|
||||
@@ -22,6 +22,7 @@ def test_speech_history_plain_key_modal_command_is_dispatched():
|
||||
)
|
||||
speech_history_manager = Mock(is_active=Mock(return_value=True))
|
||||
diff_review_manager = Mock(is_active=Mock(return_value=False))
|
||||
vmenu_manager = Mock(get_active=Mock(return_value=False))
|
||||
|
||||
manager.environment = {
|
||||
"input": {
|
||||
@@ -32,6 +33,7 @@ def test_speech_history_plain_key_modal_command_is_dispatched():
|
||||
"runtime": {
|
||||
"InputManager": input_manager,
|
||||
"EventManager": event_manager,
|
||||
"VmenuManager": vmenu_manager,
|
||||
"DiffReviewManager": diff_review_manager,
|
||||
"SpeechHistoryManager": speech_history_manager,
|
||||
},
|
||||
@@ -42,3 +44,43 @@ def test_speech_history_plain_key_modal_command_is_dispatched():
|
||||
event_manager.put_to_event_queue.assert_called_once_with(
|
||||
FenrirEventType.execute_command, "SPEECH_HISTORY_PREV"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_vmenu_plain_key_modal_command_is_dispatched():
|
||||
manager = FenrirManager.__new__(FenrirManager)
|
||||
manager.modifierInput = False
|
||||
manager.singleKeyCommand = False
|
||||
manager.command = ""
|
||||
|
||||
event_manager = Mock(put_to_event_queue=Mock())
|
||||
input_manager = Mock(
|
||||
is_key_press=Mock(return_value=False),
|
||||
no_key_pressed=Mock(return_value=False),
|
||||
get_curr_shortcut=Mock(return_value=str([1, ["KEY_UP"]])),
|
||||
get_command_for_shortcut=Mock(return_value="PREV_VMENU_ENTRY"),
|
||||
)
|
||||
vmenu_manager = Mock(get_active=Mock(return_value=True))
|
||||
speech_history_manager = Mock(is_active=Mock(return_value=False))
|
||||
diff_review_manager = Mock(is_active=Mock(return_value=False))
|
||||
|
||||
manager.environment = {
|
||||
"input": {
|
||||
"key_forward": 0,
|
||||
"prev_input": ["KEY_UP"],
|
||||
"curr_input": ["KEY_UP"],
|
||||
},
|
||||
"runtime": {
|
||||
"InputManager": input_manager,
|
||||
"EventManager": event_manager,
|
||||
"VmenuManager": vmenu_manager,
|
||||
"DiffReviewManager": diff_review_manager,
|
||||
"SpeechHistoryManager": speech_history_manager,
|
||||
},
|
||||
}
|
||||
|
||||
manager.detect_shortcut_command()
|
||||
|
||||
event_manager.put_to_event_queue.assert_called_once_with(
|
||||
FenrirEventType.execute_command, "PREV_VMENU_ENTRY"
|
||||
)
|
||||
|
||||
@@ -0,0 +1,42 @@
|
||||
import importlib.util
|
||||
from pathlib import Path
|
||||
from unittest.mock import Mock
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def _load_command():
|
||||
module_path = (
|
||||
Path(__file__).resolve().parents[2]
|
||||
/ "src"
|
||||
/ "fenrirscreenreader"
|
||||
/ "commands"
|
||||
/ "onCursorChange"
|
||||
/ "85000-has_attribute.py"
|
||||
)
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
"fenrir_has_attribute", module_path
|
||||
)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
return module.command()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_has_attribute_uses_configured_setting_name():
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = False
|
||||
command = _load_command()
|
||||
command.initialize(
|
||||
{
|
||||
"runtime": {
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
command.run()
|
||||
|
||||
settings_manager.get_setting_as_bool.assert_called_once_with(
|
||||
"general", "has_attributes"
|
||||
)
|
||||
@@ -67,6 +67,21 @@ def test_present_text_allows_sound_only_feedback():
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_present_text_sound_icon_with_interrupt_cancels_speech():
|
||||
output_manager, sound_driver, speech_driver = build_output_manager()
|
||||
|
||||
output_manager.present_text(
|
||||
"end of screen", sound_icon="Accept", interrupt=True
|
||||
)
|
||||
|
||||
speech_driver.cancel.assert_called_once_with()
|
||||
sound_driver.play_sound_file.assert_called_once_with(
|
||||
"/tmp/Accept.wav", True
|
||||
)
|
||||
speech_driver.speak.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_play_sound_supports_error_alias():
|
||||
output_manager, sound_driver, _speech_driver = build_output_manager()
|
||||
@@ -227,3 +242,35 @@ def test_key_interrupt_command_uses_nonblocking_interrupt():
|
||||
|
||||
output_manager.interrupt_output_async.assert_called_once_with()
|
||||
output_manager.interrupt_output.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_key_interrupt_command_ignores_fenrir_shortcuts():
|
||||
module = load_key_interrupt_module()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = True
|
||||
settings_manager.get_setting.return_value = ""
|
||||
input_manager = Mock(
|
||||
no_key_pressed=Mock(return_value=False),
|
||||
get_curr_shortcut=Mock(return_value=[1, ["KEY_KP9"]]),
|
||||
get_command_for_shortcut=Mock(return_value="REVIEW_NEXT_LINE"),
|
||||
)
|
||||
output_manager = Mock()
|
||||
env = {
|
||||
"input": {
|
||||
"curr_input": ["KEY_KP9"],
|
||||
"prev_input": [],
|
||||
},
|
||||
"runtime": {
|
||||
"InputManager": input_manager,
|
||||
"OutputManager": output_manager,
|
||||
"ScreenManager": Mock(is_screen_change=Mock(return_value=False)),
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
command = module.command()
|
||||
command.initialize(env)
|
||||
|
||||
command.run()
|
||||
|
||||
output_manager.interrupt_output_async.assert_not_called()
|
||||
|
||||
@@ -109,3 +109,34 @@ def test_progress_detector_beeps_for_long_tqdm_transfer_delta():
|
||||
|
||||
command.play_progress_tone.assert_called_once_with(90.0)
|
||||
assert command.env["commandBuffer"]["lastProgressValue"] == 90.0
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_progress_detector_beeps_for_interruptible_status_without_ellipsis():
|
||||
progress_module = _load_progress_module()
|
||||
command = progress_module.command()
|
||||
sample = "◦ Files: (1m 04s • esc to interrupt)"
|
||||
command.env = {
|
||||
"commandBuffer": {
|
||||
"progress_monitoring": True,
|
||||
"lastProgressValue": -1,
|
||||
"lastProgressTime": 0,
|
||||
},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"ScreenManager": Mock(is_screen_change=Mock(return_value=False)),
|
||||
"CursorManager": Mock(is_cursor_vertical_move=Mock(return_value=False)),
|
||||
},
|
||||
"screen": {
|
||||
"new_delta": sample,
|
||||
"new_delta_is_typing": False,
|
||||
"new_content_text": sample,
|
||||
"old_cursor": {"x": 0, "y": 0},
|
||||
"new_cursor": {"x": 0, "y": 0},
|
||||
},
|
||||
}
|
||||
command.play_activity_beep = Mock()
|
||||
|
||||
command.run()
|
||||
|
||||
command.play_activity_beep.assert_called_once_with()
|
||||
|
||||
@@ -39,6 +39,29 @@ def test_private_sgr_sequence_from_fullscreen_apps_does_not_crash():
|
||||
assert screen["text"].splitlines()[0] == "X "
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_dcs_terminal_queries_do_not_render_as_text():
|
||||
terminal = Terminal(20, 3, DummyProcessInput())
|
||||
|
||||
terminal.feed(b"\x1bP+q6b32\x1b\\X")
|
||||
screen = terminal.get_screen_content()
|
||||
|
||||
assert screen["text"].splitlines()[0] == "X "
|
||||
assert "+q6b32" not in screen["text"]
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_split_dcs_terminal_query_does_not_render_as_text():
|
||||
terminal = Terminal(20, 3, DummyProcessInput())
|
||||
|
||||
terminal.feed(b"\x1bP+q6")
|
||||
terminal.feed(b"b32\x1b\\X")
|
||||
screen = terminal.get_screen_content()
|
||||
|
||||
assert screen["text"].splitlines()[0] == "X "
|
||||
assert "+q6b32" not in screen["text"]
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_optional_float_setting_uses_default_when_missing():
|
||||
settings_manager = type(
|
||||
@@ -79,6 +102,35 @@ def test_pty_stdin_input_interrupts_output_when_all_keys_interrupt_enabled():
|
||||
output_manager.interrupt_output.assert_called_once_with()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.parametrize(
|
||||
"sequence",
|
||||
[
|
||||
b"\x1b[12;40R",
|
||||
b"\x1b[?1;2c",
|
||||
b"\x1b[>85;95;0c",
|
||||
b"\x1b[4;80;24t",
|
||||
b"\x1b]10;rgb:ffff/ffff/ffff\x1b\\",
|
||||
],
|
||||
)
|
||||
def test_pty_terminal_response_stdin_does_not_interrupt_output(sequence):
|
||||
pty_driver = PtyDriver()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = True
|
||||
settings_manager.get_setting.return_value = ""
|
||||
output_manager = Mock()
|
||||
pty_driver.env = {
|
||||
"runtime": {
|
||||
"SettingsManager": settings_manager,
|
||||
"OutputManager": output_manager,
|
||||
}
|
||||
}
|
||||
|
||||
pty_driver.interrupt_output_on_stdin_input(sequence)
|
||||
|
||||
output_manager.interrupt_output.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_stdin_input_interrupt_does_not_block_input_injection():
|
||||
pty_driver = PtyDriver()
|
||||
@@ -160,6 +212,409 @@ def test_pty_plain_stdin_does_not_record_tab_keypress():
|
||||
pty_driver.inject_text_to_screen.assert_called_once_with(b"a")
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_stdin_consumes_fenrir_shortcut_input():
|
||||
pty_driver = PtyDriver()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = True
|
||||
input_manager = Mock(
|
||||
get_curr_shortcut=Mock(return_value=[1, ["KEY_KP9"]]),
|
||||
get_command_for_shortcut=Mock(return_value="REVIEW_NEXT_LINE"),
|
||||
)
|
||||
output_manager = Mock()
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": ["KEY_KP9"]},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"InputManager": input_manager,
|
||||
"OutputManager": output_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"\x1b[6~", Mock())
|
||||
|
||||
output_manager.interrupt_output.assert_not_called()
|
||||
pty_driver.inject_text_to_screen.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_stdin_consumes_late_fenrir_shortcut_tail_after_release():
|
||||
pty_driver = PtyDriver()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = True
|
||||
input_manager = Mock(
|
||||
get_curr_shortcut=Mock(return_value=[1, []]),
|
||||
get_command_for_shortcut=Mock(return_value=""),
|
||||
)
|
||||
output_manager = Mock()
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": []},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"InputManager": input_manager,
|
||||
"OutputManager": output_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
pty_driver.last_fenrir_stdin_command_time = time.monotonic()
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"\x1b[6~", Mock())
|
||||
|
||||
output_manager.interrupt_output.assert_not_called()
|
||||
pty_driver.inject_text_to_screen.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_stdin_consumes_split_fenrir_shortcut_tail_after_release():
|
||||
pty_driver = PtyDriver()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = True
|
||||
input_manager = Mock(
|
||||
get_curr_shortcut=Mock(
|
||||
side_effect=[
|
||||
[1, ["KEY_KP7"]],
|
||||
[1, []],
|
||||
]
|
||||
),
|
||||
get_command_for_shortcut=Mock(
|
||||
side_effect=[
|
||||
"REVIEW_PREV_LINE",
|
||||
"",
|
||||
]
|
||||
),
|
||||
)
|
||||
output_manager = Mock()
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": ["KEY_KP7"]},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"InputManager": input_manager,
|
||||
"OutputManager": output_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"\x1b", Mock())
|
||||
pty_driver.env["input"]["curr_input"] = []
|
||||
pty_driver.handle_stdin_input(b"[H", Mock())
|
||||
|
||||
output_manager.interrupt_output.assert_not_called()
|
||||
pty_driver.inject_text_to_screen.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_stdin_consumes_split_ss3_fenrir_shortcut_tail_after_release():
|
||||
pty_driver = PtyDriver()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = True
|
||||
input_manager = Mock(
|
||||
get_curr_shortcut=Mock(
|
||||
side_effect=[
|
||||
[1, ["KEY_KP7"]],
|
||||
[1, []],
|
||||
[1, []],
|
||||
]
|
||||
),
|
||||
get_command_for_shortcut=Mock(
|
||||
side_effect=[
|
||||
"REVIEW_PREV_LINE",
|
||||
"",
|
||||
"",
|
||||
]
|
||||
),
|
||||
)
|
||||
output_manager = Mock()
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": ["KEY_KP7"]},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"InputManager": input_manager,
|
||||
"OutputManager": output_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"\x1b", Mock())
|
||||
pty_driver.env["input"]["curr_input"] = []
|
||||
pty_driver.handle_stdin_input(b"O", Mock())
|
||||
pty_driver.handle_stdin_input(b"w", Mock())
|
||||
|
||||
output_manager.interrupt_output.assert_not_called()
|
||||
pty_driver.inject_text_to_screen.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_stdin_does_not_consume_printable_input_after_fenrir_shortcut():
|
||||
pty_driver = PtyDriver()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = False
|
||||
input_manager = Mock(
|
||||
get_curr_shortcut=Mock(return_value=[1, []]),
|
||||
get_command_for_shortcut=Mock(return_value=""),
|
||||
)
|
||||
output_manager = Mock()
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": []},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"InputManager": input_manager,
|
||||
"OutputManager": output_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
pty_driver.last_fenrir_stdin_command_time = time.monotonic()
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"a", Mock())
|
||||
|
||||
output_manager.interrupt_output.assert_not_called()
|
||||
pty_driver.inject_text_to_screen.assert_called_once_with(b"a")
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_stdin_does_not_consume_stale_fenrir_shortcut_tail():
|
||||
pty_driver = PtyDriver()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = False
|
||||
input_manager = Mock(
|
||||
get_curr_shortcut=Mock(return_value=[1, []]),
|
||||
get_command_for_shortcut=Mock(return_value=""),
|
||||
)
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": []},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"InputManager": input_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
pty_driver.last_fenrir_stdin_command_time = (
|
||||
time.monotonic()
|
||||
- PTYConstants.FENRIR_SHORTCUT_STDIN_TAIL_TIMEOUT
|
||||
- 0.1
|
||||
)
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"\x1b[6~", Mock())
|
||||
|
||||
pty_driver.inject_text_to_screen.assert_called_once_with(b"\x1b[6~")
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_stdin_consumes_late_tail_after_recent_review_command():
|
||||
pty_driver = PtyDriver()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = True
|
||||
input_manager = Mock(
|
||||
get_curr_shortcut=Mock(return_value=[1, []]),
|
||||
get_command_for_shortcut=Mock(return_value=""),
|
||||
)
|
||||
output_manager = Mock()
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": []},
|
||||
"commandInfo": {
|
||||
"lastCommand": "REVIEW_NEXT_LINE",
|
||||
"lastCommandSection": "commands",
|
||||
"lastCommandRunTime": time.time(),
|
||||
},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"InputManager": input_manager,
|
||||
"OutputManager": output_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"\x1b[6~", Mock())
|
||||
|
||||
output_manager.interrupt_output.assert_not_called()
|
||||
pty_driver.inject_text_to_screen.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.parametrize("sequence", [b"[H", b"[1~", b"Ow"])
|
||||
def test_pty_stdin_consumes_split_tail_after_recent_review_command(sequence):
|
||||
pty_driver = PtyDriver()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = True
|
||||
input_manager = Mock(
|
||||
get_curr_shortcut=Mock(return_value=[1, []]),
|
||||
get_command_for_shortcut=Mock(return_value=""),
|
||||
)
|
||||
output_manager = Mock()
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": []},
|
||||
"commandInfo": {
|
||||
"lastCommand": "REVIEW_NEXT_LINE",
|
||||
"lastCommandSection": "commands",
|
||||
"lastCommandRunTime": time.time(),
|
||||
},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"InputManager": input_manager,
|
||||
"OutputManager": output_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(sequence, Mock())
|
||||
|
||||
output_manager.interrupt_output.assert_not_called()
|
||||
pty_driver.inject_text_to_screen.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_stdin_consumes_lone_escape_after_recent_review_command():
|
||||
pty_driver = PtyDriver()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = True
|
||||
input_manager = Mock(
|
||||
get_curr_shortcut=Mock(return_value=[1, []]),
|
||||
get_command_for_shortcut=Mock(return_value=""),
|
||||
)
|
||||
output_manager = Mock()
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": []},
|
||||
"commandInfo": {
|
||||
"lastCommand": "REVIEW_CURR_LINE",
|
||||
"lastCommandSection": "commands",
|
||||
"lastCommandRunTime": time.time(),
|
||||
},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"InputManager": input_manager,
|
||||
"OutputManager": output_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"\x1b", Mock())
|
||||
|
||||
output_manager.interrupt_output.assert_not_called()
|
||||
pty_driver.inject_text_to_screen.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_stdin_does_not_consume_late_tail_after_non_review_command():
|
||||
pty_driver = PtyDriver()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = False
|
||||
input_manager = Mock(
|
||||
get_curr_shortcut=Mock(return_value=[1, []]),
|
||||
get_command_for_shortcut=Mock(return_value=""),
|
||||
)
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": []},
|
||||
"commandInfo": {
|
||||
"lastCommand": "CURSOR_POSITION",
|
||||
"lastCommandSection": "commands",
|
||||
"lastCommandRunTime": time.time(),
|
||||
},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"InputManager": input_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"\x1b[6~", Mock())
|
||||
|
||||
pty_driver.inject_text_to_screen.assert_called_once_with(b"\x1b[6~")
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.parametrize(
|
||||
("sequence", "key_name"),
|
||||
[
|
||||
(b"\x1b[A", "KEY_UP"),
|
||||
(b"\x1b[B", "KEY_DOWN"),
|
||||
(b"\x1b[C", "KEY_RIGHT"),
|
||||
(b"\x1b[D", "KEY_LEFT"),
|
||||
(b"\x1b[5~", "KEY_PAGEUP"),
|
||||
(b"\x1b[6~", "KEY_PAGEDOWN"),
|
||||
(b"\x1b", "KEY_ESC"),
|
||||
(b"\r", "KEY_ENTER"),
|
||||
(b" ", "KEY_SPACE"),
|
||||
(b"a", "KEY_A"),
|
||||
(b"Z", "KEY_Z"),
|
||||
],
|
||||
)
|
||||
def test_pty_vmenu_stdin_is_consumed_and_synthesizes_key_events(
|
||||
sequence,
|
||||
key_name,
|
||||
):
|
||||
pty_driver = PtyDriver()
|
||||
event_queue = Mock()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = False
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": []},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"SettingsManager": settings_manager,
|
||||
"VmenuManager": Mock(get_active=Mock(return_value=True)),
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(sequence, event_queue)
|
||||
|
||||
pty_driver.inject_text_to_screen.assert_not_called()
|
||||
assert event_queue.put.call_count == 2
|
||||
first_event = event_queue.put.call_args_list[0].args[0]
|
||||
second_event = event_queue.put.call_args_list[1].args[0]
|
||||
assert first_event["Type"] == FenrirEventType.keyboard_input
|
||||
assert first_event["data"]["event_name"] == key_name
|
||||
assert first_event["data"]["event_state"] == 1
|
||||
assert second_event["data"]["event_name"] == key_name
|
||||
assert second_event["data"]["event_state"] == 0
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_vmenu_unknown_stdin_is_consumed_without_injection():
|
||||
pty_driver = PtyDriver()
|
||||
event_queue = Mock()
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": []},
|
||||
"runtime": {
|
||||
"VmenuManager": Mock(get_active=Mock(return_value=True)),
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"\x1b[1;5A", event_queue)
|
||||
|
||||
pty_driver.inject_text_to_screen.assert_not_called()
|
||||
event_queue.put.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_vmenu_stdin_does_not_duplicate_current_x11_key():
|
||||
pty_driver = PtyDriver()
|
||||
event_queue = Mock()
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": ["KEY_RIGHT"]},
|
||||
"runtime": {
|
||||
"VmenuManager": Mock(get_active=Mock(return_value=True)),
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"\x1b[C", event_queue)
|
||||
|
||||
pty_driver.inject_text_to_screen.assert_not_called()
|
||||
event_queue.put.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_stdin_input_honors_interrupt_disabled():
|
||||
pty_driver = PtyDriver()
|
||||
@@ -197,6 +652,22 @@ def test_pty_stdin_input_leaves_filtered_interrupts_to_key_events():
|
||||
output_manager.interrupt_output.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_terminal_attributes_use_fenrir_attribute_shape():
|
||||
terminal = Terminal(10, 2, DummyProcessInput())
|
||||
|
||||
terminal.feed(b"plain\r\n\x1b[7mfocus\x1b[0m")
|
||||
screen_content = terminal.get_screen_content()
|
||||
|
||||
plain_attribute = screen_content["attributes"][0][0]
|
||||
focused_attribute = screen_content["attributes"][1][0]
|
||||
assert len(plain_attribute) == 10
|
||||
assert len(focused_attribute) == 10
|
||||
assert plain_attribute[1] == "default"
|
||||
assert focused_attribute[1] == "reverse"
|
||||
assert focused_attribute[6] is True
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_backspace_with_fenrir_key_synthesizes_shortcut_events():
|
||||
pty_driver = PtyDriver()
|
||||
|
||||
@@ -0,0 +1,126 @@
|
||||
import importlib.util
|
||||
from pathlib import Path
|
||||
from unittest.mock import Mock
|
||||
|
||||
import pytest
|
||||
|
||||
from fenrirscreenreader.utils import char_utils
|
||||
|
||||
|
||||
COMMANDS_DIR = (
|
||||
Path(__file__).resolve().parents[2]
|
||||
/ "src"
|
||||
/ "fenrirscreenreader"
|
||||
/ "commands"
|
||||
/ "commands"
|
||||
)
|
||||
|
||||
|
||||
def load_command(name):
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
f"fenrir_{name}", COMMANDS_DIR / f"{name}.py"
|
||||
)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
return module.command()
|
||||
|
||||
|
||||
def build_environment(cursor):
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = True
|
||||
output_manager = Mock()
|
||||
cursor_manager = Mock()
|
||||
cursor_manager.enter_review_mode_curr_text_cursor.return_value = None
|
||||
cursor_manager.get_review_or_text_cursor.return_value = cursor
|
||||
|
||||
return {
|
||||
"punctuation": {"PUNCTDICT": {" ": "space"}},
|
||||
"screen": {
|
||||
"newCursorReview": cursor.copy(),
|
||||
"new_cursor": cursor.copy(),
|
||||
"new_content_text": "abc\ndef",
|
||||
},
|
||||
"runtime": {
|
||||
"AttributeManager": Mock(has_attributes=Mock(return_value=False)),
|
||||
"CursorManager": cursor_manager,
|
||||
"OutputManager": output_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
"TableManager": Mock(is_table_mode=Mock(return_value=False)),
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def run_command(name, cursor):
|
||||
env = build_environment(cursor)
|
||||
command = load_command(name)
|
||||
command.initialize(env)
|
||||
command.run()
|
||||
return env["runtime"]["OutputManager"]
|
||||
|
||||
|
||||
def boundary_call(output_manager):
|
||||
return output_manager.present_text.call_args_list[-1]
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_previous_line_uses_start_of_screen_sound_at_top():
|
||||
output_manager = run_command("review_prev_line", {"x": 0, "y": 0})
|
||||
|
||||
call = boundary_call(output_manager)
|
||||
assert call.args[0] == "start of screen"
|
||||
assert call.kwargs["sound_icon"] == "StartOfScreen"
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_next_line_uses_end_of_screen_sound_at_bottom():
|
||||
output_manager = run_command("review_next_line", {"x": 0, "y": 1})
|
||||
|
||||
call = boundary_call(output_manager)
|
||||
assert call.args[0] == "end of screen"
|
||||
assert call.kwargs["sound_icon"] == "EndOfScreen"
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_previous_character_uses_start_of_screen_sound_at_top_left():
|
||||
output_manager = run_command("review_prev_char", {"x": 0, "y": 0})
|
||||
|
||||
call = boundary_call(output_manager)
|
||||
assert call.args[0] == "start of screen"
|
||||
assert call.kwargs["sound_icon"] == "StartOfScreen"
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_next_character_uses_end_of_screen_sound_at_bottom_right():
|
||||
output_manager = run_command("review_next_char", {"x": 2, "y": 1})
|
||||
|
||||
call = boundary_call(output_manager)
|
||||
assert call.args[0] == "end of screen"
|
||||
assert call.kwargs["sound_icon"] == "EndOfScreen"
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_vertical_character_navigation_reports_boundaries_only_at_edges():
|
||||
assert char_utils.get_up_char(0, 1, "abc\ndef") == (
|
||||
0,
|
||||
0,
|
||||
"a",
|
||||
False,
|
||||
)
|
||||
assert char_utils.get_up_char(0, 0, "abc\ndef") == (
|
||||
0,
|
||||
0,
|
||||
"",
|
||||
True,
|
||||
)
|
||||
assert char_utils.get_down_char(0, 0, "abc\ndef") == (
|
||||
0,
|
||||
1,
|
||||
"d",
|
||||
False,
|
||||
)
|
||||
assert char_utils.get_down_char(0, 1, "abc\ndef") == (
|
||||
0,
|
||||
1,
|
||||
"",
|
||||
True,
|
||||
)
|
||||
@@ -195,3 +195,27 @@ def test_tui_input_line_typing_is_filtered_from_mixed_repaint_delta():
|
||||
assert "Username" not in env["screen"]["new_delta"]
|
||||
assert "#channel" not in env["screen"]["new_delta"]
|
||||
assert env["screen"]["new_delta_is_typing"] is False
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_prompt_repaint_from_blank_line_keeps_full_prompt():
|
||||
manager, env = _build_screen_manager(
|
||||
" ".ljust(40),
|
||||
{"x": 2, "y": 0},
|
||||
)
|
||||
|
||||
manager.update(
|
||||
{
|
||||
"bytes": b"",
|
||||
"lines": 1,
|
||||
"columns": 40,
|
||||
"textCursor": {"x": 23, "y": 0},
|
||||
"screen": "pty",
|
||||
"text": "[storm@fenrir fenrir] $ ".ljust(40),
|
||||
"attributes": [],
|
||||
},
|
||||
"onScreenUpdate",
|
||||
)
|
||||
|
||||
assert env["screen"]["new_delta"] == "[storm@fenrir fenrir] $"
|
||||
assert env["screen"]["new_delta_is_typing"] is False
|
||||
|
||||
@@ -5,6 +5,8 @@ Tests the _validate_setting_value method to ensure proper input validation
|
||||
for all configurable settings that could cause crashes or accessibility issues.
|
||||
"""
|
||||
|
||||
from argparse import Namespace
|
||||
|
||||
import pytest
|
||||
import sys
|
||||
from pathlib import Path
|
||||
@@ -206,6 +208,30 @@ def test_focus_settings_define_tui_toggle():
|
||||
assert settings_data["focus"]["tui"] is False
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.settings
|
||||
def test_format_cli_args_reports_startup_flags_in_stable_order():
|
||||
manager = SettingsManager()
|
||||
cli_args = Namespace(
|
||||
debug=True,
|
||||
foreground=False,
|
||||
force_all_screens=False,
|
||||
ignore_screen=["7"],
|
||||
options="speech#rate=1.2",
|
||||
print=False,
|
||||
setting="/tmp/settings.conf",
|
||||
x11=True,
|
||||
x11_window_id="0x123",
|
||||
)
|
||||
|
||||
assert manager.format_cli_args(cli_args) == (
|
||||
"{'debug': True, 'force_all_screens': False, 'foreground': False, "
|
||||
"'ignore_screen': ['7'], 'options': 'speech#rate=1.2', "
|
||||
"'print': False, 'setting': '/tmp/settings.conf', 'x11': True, "
|
||||
"'x11_window_id': '0x123'}"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.settings
|
||||
class TestSettingsPathSelection:
|
||||
|
||||
@@ -140,6 +140,93 @@ def test_candidate_list_speaks_visible_list_without_cursor_advance():
|
||||
assert manager.process_update() == "Documents/ Downloads/"
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_full_screen_scroll_speaks_only_inserted_candidate_list():
|
||||
old_text = "\n".join(
|
||||
[
|
||||
"old top".ljust(30),
|
||||
"old middle".ljust(30),
|
||||
"old lower".ljust(30),
|
||||
"old bottom".ljust(30),
|
||||
"$ ./Toby".ljust(30),
|
||||
]
|
||||
)
|
||||
manager, env, _input_manager = _build_env(old_text, {"x": 8, "y": 4})
|
||||
|
||||
manager.capture_if_tab()
|
||||
_set_screen_update(
|
||||
env,
|
||||
"\n".join(
|
||||
[
|
||||
"old middle".ljust(30),
|
||||
"old lower".ljust(30),
|
||||
"old bottom".ljust(30),
|
||||
"TobyAccMod_V10-0.pk3 TobyConfig.ini".ljust(30),
|
||||
"$ ./Toby".ljust(30),
|
||||
]
|
||||
),
|
||||
{"x": 8, "y": 4},
|
||||
delta="\n".join(
|
||||
[
|
||||
"old middle",
|
||||
"old lower",
|
||||
"old bottom",
|
||||
"TobyAccMod_V10-0.pk3 TobyConfig.ini",
|
||||
"$ ./Toby",
|
||||
]
|
||||
),
|
||||
)
|
||||
|
||||
assert manager.process_update() == "TobyAccMod_V10-0.pk3 TobyConfig.ini"
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_same_height_repaint_without_inserted_candidates_stays_silent():
|
||||
old_text = "\n".join(
|
||||
[
|
||||
"old top".ljust(30),
|
||||
"old middle".ljust(30),
|
||||
"old lower".ljust(30),
|
||||
"$ ./Toby".ljust(30),
|
||||
]
|
||||
)
|
||||
manager, env, _input_manager = _build_env(old_text, {"x": 8, "y": 3})
|
||||
|
||||
manager.capture_if_tab()
|
||||
_set_screen_update(
|
||||
env,
|
||||
"\n".join(
|
||||
[
|
||||
"status line".ljust(30),
|
||||
"old prompt history".ljust(30),
|
||||
"unrelated output".ljust(30),
|
||||
"$ ./Toby".ljust(30),
|
||||
]
|
||||
),
|
||||
{"x": 8, "y": 3},
|
||||
delta="status line\nold prompt history\nunrelated output\n$ ./Toby",
|
||||
)
|
||||
|
||||
assert manager.process_update() == ""
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_recent_tab_does_not_speak_delta_without_detected_completion():
|
||||
old_text = "\n".join(["$ ./Toby".ljust(20), "".ljust(20)])
|
||||
manager, env, _input_manager = _build_env(old_text, {"x": 8, "y": 0})
|
||||
|
||||
manager.capture_if_tab()
|
||||
_set_screen_update(
|
||||
env,
|
||||
old_text,
|
||||
{"x": 8, "y": 0},
|
||||
delta="old unrelated screen content\n$ ./Toby",
|
||||
)
|
||||
|
||||
assert manager.process_update() == ""
|
||||
assert env["commandBuffer"]["tabCompletion"]["pending"] is not None
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_no_screen_change_stays_silent_and_keeps_pending_briefly():
|
||||
manager, env, _input_manager = _build_env(
|
||||
@@ -247,6 +334,7 @@ def test_large_insertion_echo_speaks_pasted_cursor_text():
|
||||
is_delta=Mock(return_value=True),
|
||||
)
|
||||
env = {
|
||||
"commandsIgnore": {"onScreenUpdate": {"INCOMING_IGNORE": False}},
|
||||
"runtime": {
|
||||
"InputManager": input_manager,
|
||||
"OutputManager": output_manager,
|
||||
@@ -270,6 +358,7 @@ def test_large_insertion_echo_speaks_pasted_cursor_text():
|
||||
announce_capital=True,
|
||||
flush=False,
|
||||
)
|
||||
assert env["commandsIgnore"]["onScreenUpdate"]["INCOMING_IGNORE"] is True
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@@ -290,6 +379,7 @@ def test_large_insertion_echo_defers_recent_tab_to_tab_completion():
|
||||
is_delta=Mock(return_value=True),
|
||||
)
|
||||
env = {
|
||||
"commandsIgnore": {"onScreenUpdate": {"INCOMING_IGNORE": False}},
|
||||
"runtime": {
|
||||
"InputManager": input_manager,
|
||||
"OutputManager": output_manager,
|
||||
@@ -308,3 +398,4 @@ def test_large_insertion_echo_defers_recent_tab_to_tab_completion():
|
||||
command.run()
|
||||
|
||||
output_manager.present_text.assert_not_called()
|
||||
assert env["commandsIgnore"]["onScreenUpdate"]["INCOMING_IGNORE"] is False
|
||||
|
||||
@@ -0,0 +1,117 @@
|
||||
import importlib.util
|
||||
from pathlib import Path
|
||||
from unittest.mock import Mock
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def _load_command():
|
||||
module_path = (
|
||||
Path(__file__).resolve().parents[2]
|
||||
/ "src"
|
||||
/ "fenrirscreenreader"
|
||||
/ "commands"
|
||||
/ "onCursorChange"
|
||||
/ "65000-present_line_if_cursor_change_vertical.py"
|
||||
)
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
"fenrir_present_line_if_cursor_change_vertical", module_path
|
||||
)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
return module.command()
|
||||
|
||||
|
||||
def _build_environment(screen_name):
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.side_effect = (
|
||||
lambda section, setting: section == "focus" and setting == "cursor"
|
||||
)
|
||||
output_manager = Mock()
|
||||
|
||||
return {
|
||||
"commandsIgnore": {"onScreenUpdate": {"INCOMING_IGNORE": False}},
|
||||
"screen": {
|
||||
"newTTY": screen_name,
|
||||
"new_cursor": {"x": 0, "y": 1},
|
||||
"new_content_text": "first line\nsecond line",
|
||||
},
|
||||
"runtime": {
|
||||
"BarrierManager": Mock(),
|
||||
"CursorManager": Mock(is_cursor_vertical_move=Mock(return_value=True)),
|
||||
"OutputManager": output_manager,
|
||||
"ScreenManager": Mock(
|
||||
is_screen_change=Mock(return_value=False),
|
||||
is_delta=Mock(return_value=True),
|
||||
),
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_vertical_cursor_move_speaks_line_despite_repaint_delta():
|
||||
command = _load_command()
|
||||
env = _build_environment("pty")
|
||||
command.initialize(env)
|
||||
|
||||
command.run()
|
||||
|
||||
env["runtime"]["OutputManager"].present_text.assert_called_once_with(
|
||||
"second line", interrupt=True, flush=False
|
||||
)
|
||||
assert env["commandsIgnore"]["onScreenUpdate"]["INCOMING_IGNORE"] is True
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_non_pty_vertical_cursor_move_still_suppresses_delta():
|
||||
command = _load_command()
|
||||
env = _build_environment("1")
|
||||
command.initialize(env)
|
||||
|
||||
command.run()
|
||||
|
||||
env["runtime"]["OutputManager"].present_text.assert_not_called()
|
||||
assert env["commandsIgnore"]["onScreenUpdate"]["INCOMING_IGNORE"] is False
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_dialog_button_row_speaks_nearby_question():
|
||||
command = _load_command()
|
||||
env = _build_environment("pty")
|
||||
env["screen"]["new_cursor"] = {"x": 30, "y": 4}
|
||||
env["screen"]["new_content_text"] = "\n".join(
|
||||
[
|
||||
"".ljust(80),
|
||||
"".ljust(80),
|
||||
"Do you want to save changes?".center(80),
|
||||
"".ljust(80),
|
||||
"< Yes > < No >".center(80),
|
||||
"".ljust(80),
|
||||
]
|
||||
)
|
||||
command.initialize(env)
|
||||
|
||||
command.run()
|
||||
|
||||
env["runtime"]["OutputManager"].present_text.assert_called_once_with(
|
||||
"Do you want to save changes?\n< Yes > < No >",
|
||||
interrupt=True,
|
||||
flush=False,
|
||||
)
|
||||
assert env["commandsIgnore"]["onScreenUpdate"]["INCOMING_IGNORE"] is True
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_blank_cursor_line_does_not_suppress_nonblank_delta():
|
||||
command = _load_command()
|
||||
env = _build_environment("pty")
|
||||
env["screen"]["new_cursor"] = {"x": 0, "y": 1}
|
||||
env["screen"]["new_content_text"] = "Birthday soon\n".ljust(80)
|
||||
env["screen"]["new_delta"] = "Birthday soon"
|
||||
command.initialize(env)
|
||||
|
||||
command.run()
|
||||
|
||||
env["runtime"]["OutputManager"].present_text.assert_not_called()
|
||||
assert env["commandsIgnore"]["onScreenUpdate"]["INCOMING_IGNORE"] is False
|
||||
@@ -217,6 +217,47 @@ def test_x11_build_passive_grabs_for_fenrir_keys_and_shortcuts():
|
||||
assert ("KEY_BACKSPACE", X.Mod4Mask, True) in grabs
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.parametrize(
|
||||
"modifier_mask",
|
||||
[X.ControlMask, X.ShiftMask, X.Mod1Mask],
|
||||
)
|
||||
def test_x11_poll_modifier_interrupt_keys_interrupts_without_input_events(
|
||||
modifier_mask,
|
||||
):
|
||||
x11 = X11Driver()
|
||||
x11.active = True
|
||||
x11.modifier_interrupt_state = 0
|
||||
x11.modifier_state = 0
|
||||
x11.root = Mock()
|
||||
x11.root.query_pointer.return_value = Mock(mask=modifier_mask)
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = True
|
||||
settings_manager.get_setting.return_value = ""
|
||||
output_manager = Mock()
|
||||
x11.env = {
|
||||
"input": {"event_buffer": []},
|
||||
"runtime": {
|
||||
"SettingsManager": settings_manager,
|
||||
"OutputManager": output_manager,
|
||||
"DebugManager": Mock(),
|
||||
},
|
||||
}
|
||||
|
||||
x11.poll_modifier_interrupt_keys()
|
||||
|
||||
output_manager.interrupt_output_async.assert_called_once()
|
||||
assert x11.env["input"]["event_buffer"] == []
|
||||
|
||||
output_manager.interrupt_output_async.reset_mock()
|
||||
x11.root.query_pointer.return_value = Mock(mask=0)
|
||||
|
||||
x11.poll_modifier_interrupt_keys()
|
||||
|
||||
output_manager.interrupt_output_async.assert_not_called()
|
||||
assert x11.env["input"]["event_buffer"] == []
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_x11_optional_modifier_masks_can_exclude_numlock():
|
||||
x11 = X11Driver()
|
||||
|
||||
Reference in New Issue
Block a user