Bug fixes in -x, things should read better now.

This commit is contained in:
Storm Dragon
2026-06-04 14:21:49 -04:00
parent fd5fe5b328
commit 191fdbe8fd
20 changed files with 969 additions and 22 deletions
@@ -67,7 +67,7 @@ class command:
)
# is has attribute it enabled?
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
"general", "hasattributes"
"general", "has_attributes"
):
cursor_pos = self.env["screen"]["newCursorReview"]
@@ -119,7 +119,7 @@ class command:
)
# is has attribute it enabled?
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
"general", "hasattributes"
"general", "has_attributes"
):
cursor_pos = self.env["screen"]["newCursorReview"]
@@ -125,7 +125,7 @@ class command:
)
# is has attribute it enabled?
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
"general", "hasattributes"
"general", "has_attributes"
):
cursor_pos = self.env["screen"]["newCursorReview"]
@@ -23,15 +23,15 @@ class command:
def run(self):
self.env["runtime"]["SettingsManager"].set_setting(
"general",
"hasattributes",
"has_attributes",
str(
not self.env["runtime"]["SettingsManager"].get_setting_as_bool(
"general", "hasattributes"
"general", "has_attributes"
)
),
)
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
"general", "hasattributes"
"general", "has_attributes"
):
self.env["runtime"]["OutputManager"].present_text(
_("announcement of attributes enabled"),
@@ -62,6 +62,7 @@ class command:
announce_capital=True,
flush=False,
)
self.env["commandsIgnore"]["onScreenUpdate"]["INCOMING_IGNORE"] = True
def _is_recent_tab_input(self):
input_manager = self.env["runtime"].get("InputManager")
@@ -4,6 +4,8 @@
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
import re
from fenrirscreenreader.core.i18n import _
from fenrirscreenreader.utils import line_utils
from fenrirscreenreader.utils import word_utils
@@ -30,14 +32,21 @@ class command:
if self.env["runtime"]["ScreenManager"].is_screen_change():
self.lastIdent = 0
return
# Don't announce cursor movements when auto-read is handling incoming text
# This prevents interrupting ongoing auto-read announcements
if self.env["runtime"]["ScreenManager"].is_delta():
return
# is a vertical change?
if not self.env["runtime"]["CursorManager"].is_cursor_vertical_move():
return
# Don't announce cursor movements when auto-read is handling incoming text.
# In PTY mode, TUI navigation often arrives as cursor movement plus a
# small repaint delta, so allow the line announcement there.
if (
self.env["runtime"]["ScreenManager"].is_delta()
and self.env["screen"].get("newTTY") != "pty"
):
return
pty_repaint_delta = (
self.env["screen"].get("newTTY") == "pty"
and self.env["runtime"]["ScreenManager"].is_delta()
)
x, y, curr_line = line_utils.get_current_line(
self.env["screen"]["new_cursor"]["x"],
@@ -52,10 +61,26 @@ class command:
do_interrupt = False
if curr_line.isspace():
if pty_repaint_delta and self._delta_has_nonblank_text():
return
self.env["runtime"]["OutputManager"].present_text(
_("blank"), sound_icon="EmptyLine", interrupt=do_interrupt, flush=False
)
if pty_repaint_delta:
self.env["commandsIgnore"]["onScreenUpdate"][
"INCOMING_IGNORE"
] = True
else:
dialog_text = self._get_pty_dialog_text(curr_line, y)
if dialog_text:
self.env["runtime"]["OutputManager"].present_text(
dialog_text, interrupt=do_interrupt, flush=False
)
if pty_repaint_delta:
self.env["commandsIgnore"]["onScreenUpdate"][
"INCOMING_IGNORE"
] = True
return
# ident
curr_ident = len(curr_line) - len(curr_line.lstrip())
if self.lastIdent == -1:
@@ -98,7 +123,53 @@ class command:
self.env["runtime"]["OutputManager"].present_text(
say_line, interrupt=do_interrupt, flush=False
)
if pty_repaint_delta:
self.env["commandsIgnore"]["onScreenUpdate"][
"INCOMING_IGNORE"
] = True
self.lastIdent = curr_ident
def _get_pty_dialog_text(self, curr_line, curr_y):
if self.env["screen"].get("newTTY") != "pty":
return ""
if not self._is_focus_control_line(curr_line):
return ""
screen_lines = self.env["screen"]["new_content_text"].split("\n")
start = max(0, curr_y - 6)
candidate_lines = []
for line in screen_lines[start: curr_y + 1]:
normalized = self._normalize_line(line)
if normalized:
candidate_lines.append(normalized)
if len(candidate_lines) < 2:
return ""
if all(self._is_focus_control_line(line) for line in candidate_lines):
return ""
return "\n".join(candidate_lines)
def _delta_has_nonblank_text(self):
return bool(self.env["screen"].get("new_delta", "").strip())
def _normalize_line(self, line):
return " ".join(line.split())
def _is_focus_control_line(self, text):
stripped = self._normalize_line(text)
if not stripped:
return False
control = r"<\s*[A-Za-z][A-Za-z0-9 _.-]{0,30}\s*>"
if re.fullmatch(rf"(?:{control}\s*)+", stripped):
return True
return stripped.lower() in {
"ok",
"cancel",
"yes",
"no",
"retry",
"abort",
"ignore",
"continue",
}
def set_callback(self, callback):
pass
@@ -24,7 +24,7 @@ class command:
def run(self):
# is it enabled?
if not self.env["runtime"]["SettingsManager"].get_setting_as_bool(
"general", "hasattributes"
"general", "has_attributes"
):
return
# is a vertical change?
@@ -33,6 +33,8 @@ class command:
self.env["input"]["prev_input"]
):
return
if self._current_input_runs_fenrir_command():
return
# if the filter is set
if (
self.env["runtime"]["SettingsManager"]
@@ -50,5 +52,16 @@ class command:
return
self.env["runtime"]["OutputManager"].interrupt_output_async()
def _current_input_runs_fenrir_command(self):
input_manager = self.env["runtime"].get("InputManager")
if input_manager is None:
return False
try:
shortcut = input_manager.get_curr_shortcut()
command = input_manager.get_command_for_shortcut(shortcut)
except Exception:
return False
return isinstance(command, str) and command != ""
def set_callback(self, callback):
pass
+3 -1
View File
@@ -19,7 +19,9 @@ command_buffer = {
# used by the command_manager
command_info = {
# 'curr_command': '',
"lastCommand": "",
"lastCommandSection": "",
"lastCommandRunTime": time.time(),
"lastCommandExecutionTime": time.time(),
"lastCommandRequestTime": time.time(),
}
@@ -489,7 +489,10 @@ class CommandManager:
+ str(e),
debug.DebugLevel.ERROR,
)
self.env["commandInfo"]["lastCommandExecutionTime"] = time.time()
self.env["commandInfo"]["lastCommand"] = command
self.env["commandInfo"]["lastCommandSection"] = section
self.env["commandInfo"]["lastCommandRunTime"] = time.time()
self.env["commandInfo"]["lastCommandExecutionTime"] = time.time()
def get_command_description(self, command, section="commands"):
if self.command_exists(command, section):
@@ -66,6 +66,8 @@ class OutputManager:
"present_text:\nsoundIcon:'" + sound_icon + "'\nText:\n" + text,
debug.DebugLevel.INFO,
)
if interrupt and self._sound_icon_will_play(sound_icon):
self.interrupt_output()
if self.play_sound_icon(sound_icon, interrupt):
self.env["runtime"]["DebugManager"].write_debug_out(
"sound_icon found", debug.DebugLevel.INFO
@@ -121,6 +123,18 @@ class OutputManager:
return False
return text.isupper()
def _sound_icon_will_play(self, sound_icon):
if sound_icon == "":
return False
sound_icon = sound_icon.upper()
if not self.env["runtime"]["SettingsManager"].get_setting_as_bool(
"sound", "enabled"
):
return False
if sound_icon not in self.env["soundIcons"]:
return False
return self.env["runtime"]["SoundDriver"] is not None
def get_last_echo(self):
return self.last_echo
+16 -2
View File
@@ -359,7 +359,21 @@ class ScreenManager:
# Keep those as typing so incoming speech does not
# announce the repainted prompt line.
if temp_new_delta != expected_typing:
if expected_typing.strip() != "":
old_cursor_x = self.env["screen"]["old_cursor"][
"x"
]
prefix_changed_before_cursor = (
old_screen_text[:old_cursor_x]
!= new_screen_text[:old_cursor_x]
)
likely_line_repaint = (
prefix_changed_before_cursor
and len(expected_typing.strip()) > 4
)
if (
expected_typing.strip() != ""
and not likely_line_repaint
):
diff_list = ["+ " + expected_typing]
else:
# Fallback: treat entire current line as new
@@ -370,7 +384,7 @@ class ScreenManager:
self.env["screen"]["new_cursor"]["y"]
]
diff_list = [
"+ " + current_line + "\n"
"+ " + current_line.rstrip()
]
typing = False
else:
+2 -2
View File
@@ -4,5 +4,5 @@
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
version = "2026.06.01"
code_name = "master"
version = "2026.06.04"
code_name = "xim"
@@ -8,6 +8,7 @@ import fcntl
import getpass
import os
import pty
import re
import shlex
from queue import Full
import signal
@@ -35,6 +36,7 @@ class PTYConstants:
SELECT_TIMEOUT = 0.05
PROCESS_TERMINATION_TIMEOUT = 3.0
PROCESS_KILL_DELAY = 0.5
FENRIR_SHORTCUT_STDIN_TAIL_TIMEOUT = 0.3
# Polling intervals (in seconds)
MIN_POLL_INTERVAL = 0.001
@@ -75,6 +77,8 @@ class Terminal:
)
self.stream = pyte.ByteStream()
self.stream.attach(self.screen)
self._pending_control_bytes = b""
self._discarding_control_string = False
def _log_error(self, message, level=None):
"""Log error message using proper debug manager if available."""
@@ -93,7 +97,53 @@ class Terminal:
print(f"PTY Terminal: {message}")
def feed(self, data):
self.stream.feed(data)
self.stream.feed(self._filter_terminal_control_strings(data))
def _filter_terminal_control_strings(self, data):
if not data:
return data
data = self._pending_control_bytes + data
self._pending_control_bytes = b""
output = bytearray()
index = 0
while index < len(data):
if self._discarding_control_string:
end = self._find_control_string_end(data, index)
if end == -1:
return bytes(output)
index = end
self._discarding_control_string = False
continue
byte = data[index]
if byte == 0x90:
self._discarding_control_string = True
index += 1
continue
if byte == 0x1B:
if index + 1 >= len(data):
self._pending_control_bytes = data[index:]
break
next_byte = data[index + 1]
if next_byte in b"P^_X":
self._discarding_control_string = True
index += 2
continue
output.append(byte)
index += 1
return bytes(output)
def _find_control_string_end(self, data, start):
c1_end = data.find(b"\x9c", start)
st_end = data.find(b"\x1b\\", start)
ends = [
end + 1 for end in [c1_end] if end != -1
] + [
end + 2 for end in [st_end] if end != -1
]
if not ends:
return -1
return min(ends)
def update_attributes(self, initialize=False):
buffer = self.screen.buffer
@@ -105,8 +155,8 @@ class Terminal:
try:
self.attributes = [
[
list(attribute[1:]) + [False, "default", "default"]
if len(attribute) > 1 else [False, "default", "default"]
self._attribute_from_pyte_char(attribute)
if len(attribute) > 1 else self._default_attribute[:]
for attribute in line.values()
]
for line in buffer.values()
@@ -143,7 +193,7 @@ class Terminal:
try:
self.attributes[y] = [
list(attribute[1:]) + [False, "default", "default"]
self._attribute_from_pyte_char(attribute)
for attribute in (buffer[y].values())
]
except Exception as e:
@@ -155,6 +205,27 @@ class Terminal:
# Use pre-created template for efficiency
self.attributes[y] += [self._default_attribute[:] for _ in range(diff)]
def _attribute_from_pyte_char(self, attribute):
fg = attribute.fg
bg = attribute.bg
reverse = bool(attribute.reverse)
if reverse:
fg, bg = bg, fg
if fg == "default" and bg == "default":
bg = "reverse"
return [
fg,
bg,
bool(attribute.bold),
bool(attribute.italics),
bool(attribute.underscore),
bool(attribute.strikethrough),
reverse,
bool(attribute.blink),
"default",
"default",
]
def resize(self, lines, columns):
self.screen.resize(lines, columns)
self.set_cursor()
@@ -205,6 +276,8 @@ class driver(screenDriver):
self.stdin_interrupt_lock = threading.Lock()
self.stdin_interrupt_running = False
self.stdin_interrupt_thread = None
self.last_fenrir_stdin_command_time = 0.0
self.fenrir_stdin_sequence_prefix = b""
signal.signal(signal.SIGWINCH, self.handle_sigwinch)
# Runtime configuration storage
@@ -295,6 +368,8 @@ class driver(screenDriver):
def interrupt_output_on_stdin_input(self, msg_bytes):
if not msg_bytes:
return
if self.is_terminal_response_sequence(msg_bytes):
return
settings_manager = self.env["runtime"]["SettingsManager"]
if not settings_manager.get_setting_as_bool(
"keyboard", "interrupt_on_key_press"
@@ -317,6 +392,29 @@ class driver(screenDriver):
)
self.stdin_interrupt_thread.start()
def is_terminal_response_sequence(self, msg_bytes):
if not msg_bytes or not msg_bytes.startswith(b"\x1b"):
return False
if msg_bytes.startswith((b"\x1b]", b"\x1bP", b"\x1b_", b"\x1b^")):
return True
if not msg_bytes.startswith(b"\x1b["):
return False
try:
sequence = msg_bytes.decode("ascii", errors="ignore")
except Exception:
return False
if len(sequence) < 3:
return False
final_byte = sequence[-1]
if final_byte not in "cRnt":
return False
body = sequence[2:-1]
if final_byte == "R":
return bool(re.fullmatch(r"\??\d+;\d+", body))
if final_byte in "cnt":
return bool(re.fullmatch(r"[?>=0-9;]*", body))
return False
def run_stdin_interrupt(self):
try:
self.env["runtime"]["OutputManager"].interrupt_output()
@@ -335,10 +433,120 @@ class driver(screenDriver):
return
if self.handle_vmenu_stdin_input(msg_bytes, event_queue):
return
if self.stdin_matches_fenrir_command(msg_bytes):
return
if self.is_late_fenrir_shortcut_sequence(msg_bytes):
return
self.record_stdin_keypress(msg_bytes)
self.interrupt_output_on_stdin_input(msg_bytes)
self.inject_text_to_screen(msg_bytes)
def stdin_matches_fenrir_command(self, msg_bytes):
input_manager = self.env["runtime"].get("InputManager")
if input_manager is None:
return False
try:
shortcut = input_manager.get_curr_shortcut()
command = input_manager.get_command_for_shortcut(shortcut)
except Exception:
return False
if not isinstance(command, str) or command == "":
return False
self.record_consumed_fenrir_stdin_sequence(msg_bytes)
return True
def is_late_fenrir_shortcut_sequence(self, msg_bytes):
if not self.recent_fenrir_stdin_command():
if not self.recent_review_command_execution():
return False
if self.is_terminal_response_sequence(msg_bytes):
return False
if self.consume_keyboard_escape_sequence_fragment(msg_bytes):
return True
return False
def record_consumed_fenrir_stdin_sequence(self, msg_bytes):
self.last_fenrir_stdin_command_time = time.monotonic()
self.fenrir_stdin_sequence_prefix = self.remaining_keyboard_sequence_prefix(
msg_bytes
)
def recent_fenrir_stdin_command(self):
return (
time.monotonic() - self.last_fenrir_stdin_command_time
<= PTYConstants.FENRIR_SHORTCUT_STDIN_TAIL_TIMEOUT
)
def recent_review_command_execution(self):
command_info = self.env.get("commandInfo", {})
last_command = command_info.get("lastCommand", "")
last_section = command_info.get("lastCommandSection", "")
if last_section != "commands" or not last_command.startswith("REVIEW_"):
return False
try:
last_time = float(command_info.get("lastCommandRunTime", 0))
except (TypeError, ValueError):
return False
return (
time.time() - last_time
<= PTYConstants.FENRIR_SHORTCUT_STDIN_TAIL_TIMEOUT
)
def is_keyboard_escape_sequence(self, msg_bytes):
if msg_bytes == b"\x1b":
return True
if not msg_bytes or len(msg_bytes) < 2:
return False
if msg_bytes.startswith(b"\x1bO") and 3 <= len(msg_bytes) <= 4:
return True
if not msg_bytes.startswith(b"\x1b["):
return False
try:
sequence = msg_bytes.decode("ascii", errors="ignore")
except Exception:
return False
if len(sequence) < 3:
return False
return sequence[-1] in "~ABCDHFZPQRS"
def consume_keyboard_escape_sequence_fragment(self, msg_bytes):
if self.is_keyboard_escape_sequence(msg_bytes):
self.record_consumed_fenrir_stdin_sequence(msg_bytes)
return True
if self.fenrir_stdin_sequence_prefix:
combined = self.fenrir_stdin_sequence_prefix + msg_bytes
prefix = self.remaining_keyboard_sequence_prefix(combined)
if self.is_keyboard_escape_sequence(combined) or prefix != b"":
self.last_fenrir_stdin_command_time = time.monotonic()
self.fenrir_stdin_sequence_prefix = prefix
return True
if msg_bytes.startswith((b"[", b"O")) and len(msg_bytes) > 1:
combined = b"\x1b" + msg_bytes
if self.is_keyboard_escape_sequence(combined):
self.record_consumed_fenrir_stdin_sequence(combined)
return True
return False
def remaining_keyboard_sequence_prefix(self, msg_bytes):
if not msg_bytes:
return b""
if msg_bytes == b"\x1b":
return msg_bytes
if msg_bytes.startswith(b"\x1bO") and len(msg_bytes) < 3:
return msg_bytes
if self.is_keyboard_escape_sequence(msg_bytes):
return b""
if msg_bytes.startswith(b"\x1b["):
try:
sequence = msg_bytes.decode("ascii", errors="ignore")
except Exception:
return b""
if len(sequence) < 3:
return msg_bytes
if sequence[-1] not in "~ABCDHFZPQRS":
return msg_bytes
return b""
def handle_vmenu_stdin_input(self, msg_bytes, event_queue):
if not self.is_vmenu_active():
return False
+42
View File
@@ -0,0 +1,42 @@
import importlib.util
from pathlib import Path
from unittest.mock import Mock
import pytest
def _load_command():
module_path = (
Path(__file__).resolve().parents[2]
/ "src"
/ "fenrirscreenreader"
/ "commands"
/ "onCursorChange"
/ "85000-has_attribute.py"
)
spec = importlib.util.spec_from_file_location(
"fenrir_has_attribute", module_path
)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module.command()
@pytest.mark.unit
def test_has_attribute_uses_configured_setting_name():
settings_manager = Mock()
settings_manager.get_setting_as_bool.return_value = False
command = _load_command()
command.initialize(
{
"runtime": {
"SettingsManager": settings_manager,
},
}
)
command.run()
settings_manager.get_setting_as_bool.assert_called_once_with(
"general", "has_attributes"
)
+47
View File
@@ -67,6 +67,21 @@ def test_present_text_allows_sound_only_feedback():
)
@pytest.mark.unit
def test_present_text_sound_icon_with_interrupt_cancels_speech():
output_manager, sound_driver, speech_driver = build_output_manager()
output_manager.present_text(
"end of screen", sound_icon="Accept", interrupt=True
)
speech_driver.cancel.assert_called_once_with()
sound_driver.play_sound_file.assert_called_once_with(
"/tmp/Accept.wav", True
)
speech_driver.speak.assert_not_called()
@pytest.mark.unit
def test_play_sound_supports_error_alias():
output_manager, sound_driver, _speech_driver = build_output_manager()
@@ -227,3 +242,35 @@ def test_key_interrupt_command_uses_nonblocking_interrupt():
output_manager.interrupt_output_async.assert_called_once_with()
output_manager.interrupt_output.assert_not_called()
@pytest.mark.unit
def test_key_interrupt_command_ignores_fenrir_shortcuts():
module = load_key_interrupt_module()
settings_manager = Mock()
settings_manager.get_setting_as_bool.return_value = True
settings_manager.get_setting.return_value = ""
input_manager = Mock(
no_key_pressed=Mock(return_value=False),
get_curr_shortcut=Mock(return_value=[1, ["KEY_KP9"]]),
get_command_for_shortcut=Mock(return_value="REVIEW_NEXT_LINE"),
)
output_manager = Mock()
env = {
"input": {
"curr_input": ["KEY_KP9"],
"prev_input": [],
},
"runtime": {
"InputManager": input_manager,
"OutputManager": output_manager,
"ScreenManager": Mock(is_screen_change=Mock(return_value=False)),
"SettingsManager": settings_manager,
},
}
command = module.command()
command.initialize(env)
command.run()
output_manager.interrupt_output_async.assert_not_called()
+387
View File
@@ -39,6 +39,29 @@ def test_private_sgr_sequence_from_fullscreen_apps_does_not_crash():
assert screen["text"].splitlines()[0] == "X "
@pytest.mark.unit
def test_dcs_terminal_queries_do_not_render_as_text():
terminal = Terminal(20, 3, DummyProcessInput())
terminal.feed(b"\x1bP+q6b32\x1b\\X")
screen = terminal.get_screen_content()
assert screen["text"].splitlines()[0] == "X "
assert "+q6b32" not in screen["text"]
@pytest.mark.unit
def test_split_dcs_terminal_query_does_not_render_as_text():
terminal = Terminal(20, 3, DummyProcessInput())
terminal.feed(b"\x1bP+q6")
terminal.feed(b"b32\x1b\\X")
screen = terminal.get_screen_content()
assert screen["text"].splitlines()[0] == "X "
assert "+q6b32" not in screen["text"]
@pytest.mark.unit
def test_optional_float_setting_uses_default_when_missing():
settings_manager = type(
@@ -79,6 +102,35 @@ def test_pty_stdin_input_interrupts_output_when_all_keys_interrupt_enabled():
output_manager.interrupt_output.assert_called_once_with()
@pytest.mark.unit
@pytest.mark.parametrize(
"sequence",
[
b"\x1b[12;40R",
b"\x1b[?1;2c",
b"\x1b[>85;95;0c",
b"\x1b[4;80;24t",
b"\x1b]10;rgb:ffff/ffff/ffff\x1b\\",
],
)
def test_pty_terminal_response_stdin_does_not_interrupt_output(sequence):
pty_driver = PtyDriver()
settings_manager = Mock()
settings_manager.get_setting_as_bool.return_value = True
settings_manager.get_setting.return_value = ""
output_manager = Mock()
pty_driver.env = {
"runtime": {
"SettingsManager": settings_manager,
"OutputManager": output_manager,
}
}
pty_driver.interrupt_output_on_stdin_input(sequence)
output_manager.interrupt_output.assert_not_called()
@pytest.mark.unit
def test_pty_stdin_input_interrupt_does_not_block_input_injection():
pty_driver = PtyDriver()
@@ -160,6 +212,325 @@ def test_pty_plain_stdin_does_not_record_tab_keypress():
pty_driver.inject_text_to_screen.assert_called_once_with(b"a")
@pytest.mark.unit
def test_pty_stdin_consumes_fenrir_shortcut_input():
pty_driver = PtyDriver()
settings_manager = Mock()
settings_manager.get_setting_as_bool.return_value = True
input_manager = Mock(
get_curr_shortcut=Mock(return_value=[1, ["KEY_KP9"]]),
get_command_for_shortcut=Mock(return_value="REVIEW_NEXT_LINE"),
)
output_manager = Mock()
pty_driver.env = {
"input": {"curr_input": ["KEY_KP9"]},
"runtime": {
"DebugManager": Mock(write_debug_out=Mock()),
"InputManager": input_manager,
"OutputManager": output_manager,
"SettingsManager": settings_manager,
},
}
pty_driver.inject_text_to_screen = Mock()
pty_driver.handle_stdin_input(b"\x1b[6~", Mock())
output_manager.interrupt_output.assert_not_called()
pty_driver.inject_text_to_screen.assert_not_called()
@pytest.mark.unit
def test_pty_stdin_consumes_late_fenrir_shortcut_tail_after_release():
pty_driver = PtyDriver()
settings_manager = Mock()
settings_manager.get_setting_as_bool.return_value = True
input_manager = Mock(
get_curr_shortcut=Mock(return_value=[1, []]),
get_command_for_shortcut=Mock(return_value=""),
)
output_manager = Mock()
pty_driver.env = {
"input": {"curr_input": []},
"runtime": {
"DebugManager": Mock(write_debug_out=Mock()),
"InputManager": input_manager,
"OutputManager": output_manager,
"SettingsManager": settings_manager,
},
}
pty_driver.last_fenrir_stdin_command_time = time.monotonic()
pty_driver.inject_text_to_screen = Mock()
pty_driver.handle_stdin_input(b"\x1b[6~", Mock())
output_manager.interrupt_output.assert_not_called()
pty_driver.inject_text_to_screen.assert_not_called()
@pytest.mark.unit
def test_pty_stdin_consumes_split_fenrir_shortcut_tail_after_release():
pty_driver = PtyDriver()
settings_manager = Mock()
settings_manager.get_setting_as_bool.return_value = True
input_manager = Mock(
get_curr_shortcut=Mock(
side_effect=[
[1, ["KEY_KP7"]],
[1, []],
]
),
get_command_for_shortcut=Mock(
side_effect=[
"REVIEW_PREV_LINE",
"",
]
),
)
output_manager = Mock()
pty_driver.env = {
"input": {"curr_input": ["KEY_KP7"]},
"runtime": {
"DebugManager": Mock(write_debug_out=Mock()),
"InputManager": input_manager,
"OutputManager": output_manager,
"SettingsManager": settings_manager,
},
}
pty_driver.inject_text_to_screen = Mock()
pty_driver.handle_stdin_input(b"\x1b", Mock())
pty_driver.env["input"]["curr_input"] = []
pty_driver.handle_stdin_input(b"[H", Mock())
output_manager.interrupt_output.assert_not_called()
pty_driver.inject_text_to_screen.assert_not_called()
@pytest.mark.unit
def test_pty_stdin_consumes_split_ss3_fenrir_shortcut_tail_after_release():
pty_driver = PtyDriver()
settings_manager = Mock()
settings_manager.get_setting_as_bool.return_value = True
input_manager = Mock(
get_curr_shortcut=Mock(
side_effect=[
[1, ["KEY_KP7"]],
[1, []],
[1, []],
]
),
get_command_for_shortcut=Mock(
side_effect=[
"REVIEW_PREV_LINE",
"",
"",
]
),
)
output_manager = Mock()
pty_driver.env = {
"input": {"curr_input": ["KEY_KP7"]},
"runtime": {
"DebugManager": Mock(write_debug_out=Mock()),
"InputManager": input_manager,
"OutputManager": output_manager,
"SettingsManager": settings_manager,
},
}
pty_driver.inject_text_to_screen = Mock()
pty_driver.handle_stdin_input(b"\x1b", Mock())
pty_driver.env["input"]["curr_input"] = []
pty_driver.handle_stdin_input(b"O", Mock())
pty_driver.handle_stdin_input(b"w", Mock())
output_manager.interrupt_output.assert_not_called()
pty_driver.inject_text_to_screen.assert_not_called()
@pytest.mark.unit
def test_pty_stdin_does_not_consume_printable_input_after_fenrir_shortcut():
pty_driver = PtyDriver()
settings_manager = Mock()
settings_manager.get_setting_as_bool.return_value = False
input_manager = Mock(
get_curr_shortcut=Mock(return_value=[1, []]),
get_command_for_shortcut=Mock(return_value=""),
)
output_manager = Mock()
pty_driver.env = {
"input": {"curr_input": []},
"runtime": {
"DebugManager": Mock(write_debug_out=Mock()),
"InputManager": input_manager,
"OutputManager": output_manager,
"SettingsManager": settings_manager,
},
}
pty_driver.last_fenrir_stdin_command_time = time.monotonic()
pty_driver.inject_text_to_screen = Mock()
pty_driver.handle_stdin_input(b"a", Mock())
output_manager.interrupt_output.assert_not_called()
pty_driver.inject_text_to_screen.assert_called_once_with(b"a")
@pytest.mark.unit
def test_pty_stdin_does_not_consume_stale_fenrir_shortcut_tail():
pty_driver = PtyDriver()
settings_manager = Mock()
settings_manager.get_setting_as_bool.return_value = False
input_manager = Mock(
get_curr_shortcut=Mock(return_value=[1, []]),
get_command_for_shortcut=Mock(return_value=""),
)
pty_driver.env = {
"input": {"curr_input": []},
"runtime": {
"DebugManager": Mock(write_debug_out=Mock()),
"InputManager": input_manager,
"SettingsManager": settings_manager,
},
}
pty_driver.last_fenrir_stdin_command_time = (
time.monotonic()
- PTYConstants.FENRIR_SHORTCUT_STDIN_TAIL_TIMEOUT
- 0.1
)
pty_driver.inject_text_to_screen = Mock()
pty_driver.handle_stdin_input(b"\x1b[6~", Mock())
pty_driver.inject_text_to_screen.assert_called_once_with(b"\x1b[6~")
@pytest.mark.unit
def test_pty_stdin_consumes_late_tail_after_recent_review_command():
pty_driver = PtyDriver()
settings_manager = Mock()
settings_manager.get_setting_as_bool.return_value = True
input_manager = Mock(
get_curr_shortcut=Mock(return_value=[1, []]),
get_command_for_shortcut=Mock(return_value=""),
)
output_manager = Mock()
pty_driver.env = {
"input": {"curr_input": []},
"commandInfo": {
"lastCommand": "REVIEW_NEXT_LINE",
"lastCommandSection": "commands",
"lastCommandRunTime": time.time(),
},
"runtime": {
"DebugManager": Mock(write_debug_out=Mock()),
"InputManager": input_manager,
"OutputManager": output_manager,
"SettingsManager": settings_manager,
},
}
pty_driver.inject_text_to_screen = Mock()
pty_driver.handle_stdin_input(b"\x1b[6~", Mock())
output_manager.interrupt_output.assert_not_called()
pty_driver.inject_text_to_screen.assert_not_called()
@pytest.mark.unit
@pytest.mark.parametrize("sequence", [b"[H", b"[1~", b"Ow"])
def test_pty_stdin_consumes_split_tail_after_recent_review_command(sequence):
pty_driver = PtyDriver()
settings_manager = Mock()
settings_manager.get_setting_as_bool.return_value = True
input_manager = Mock(
get_curr_shortcut=Mock(return_value=[1, []]),
get_command_for_shortcut=Mock(return_value=""),
)
output_manager = Mock()
pty_driver.env = {
"input": {"curr_input": []},
"commandInfo": {
"lastCommand": "REVIEW_NEXT_LINE",
"lastCommandSection": "commands",
"lastCommandRunTime": time.time(),
},
"runtime": {
"DebugManager": Mock(write_debug_out=Mock()),
"InputManager": input_manager,
"OutputManager": output_manager,
"SettingsManager": settings_manager,
},
}
pty_driver.inject_text_to_screen = Mock()
pty_driver.handle_stdin_input(sequence, Mock())
output_manager.interrupt_output.assert_not_called()
pty_driver.inject_text_to_screen.assert_not_called()
@pytest.mark.unit
def test_pty_stdin_consumes_lone_escape_after_recent_review_command():
pty_driver = PtyDriver()
settings_manager = Mock()
settings_manager.get_setting_as_bool.return_value = True
input_manager = Mock(
get_curr_shortcut=Mock(return_value=[1, []]),
get_command_for_shortcut=Mock(return_value=""),
)
output_manager = Mock()
pty_driver.env = {
"input": {"curr_input": []},
"commandInfo": {
"lastCommand": "REVIEW_CURR_LINE",
"lastCommandSection": "commands",
"lastCommandRunTime": time.time(),
},
"runtime": {
"DebugManager": Mock(write_debug_out=Mock()),
"InputManager": input_manager,
"OutputManager": output_manager,
"SettingsManager": settings_manager,
},
}
pty_driver.inject_text_to_screen = Mock()
pty_driver.handle_stdin_input(b"\x1b", Mock())
output_manager.interrupt_output.assert_not_called()
pty_driver.inject_text_to_screen.assert_not_called()
@pytest.mark.unit
def test_pty_stdin_does_not_consume_late_tail_after_non_review_command():
pty_driver = PtyDriver()
settings_manager = Mock()
settings_manager.get_setting_as_bool.return_value = False
input_manager = Mock(
get_curr_shortcut=Mock(return_value=[1, []]),
get_command_for_shortcut=Mock(return_value=""),
)
pty_driver.env = {
"input": {"curr_input": []},
"commandInfo": {
"lastCommand": "CURSOR_POSITION",
"lastCommandSection": "commands",
"lastCommandRunTime": time.time(),
},
"runtime": {
"DebugManager": Mock(write_debug_out=Mock()),
"InputManager": input_manager,
"SettingsManager": settings_manager,
},
}
pty_driver.inject_text_to_screen = Mock()
pty_driver.handle_stdin_input(b"\x1b[6~", Mock())
pty_driver.inject_text_to_screen.assert_called_once_with(b"\x1b[6~")
@pytest.mark.unit
@pytest.mark.parametrize(
("sequence", "key_name"),
@@ -281,6 +652,22 @@ def test_pty_stdin_input_leaves_filtered_interrupts_to_key_events():
output_manager.interrupt_output.assert_not_called()
@pytest.mark.unit
def test_pty_terminal_attributes_use_fenrir_attribute_shape():
terminal = Terminal(10, 2, DummyProcessInput())
terminal.feed(b"plain\r\n\x1b[7mfocus\x1b[0m")
screen_content = terminal.get_screen_content()
plain_attribute = screen_content["attributes"][0][0]
focused_attribute = screen_content["attributes"][1][0]
assert len(plain_attribute) == 10
assert len(focused_attribute) == 10
assert plain_attribute[1] == "default"
assert focused_attribute[1] == "reverse"
assert focused_attribute[6] is True
@pytest.mark.unit
def test_pty_backspace_with_fenrir_key_synthesizes_shortcut_events():
pty_driver = PtyDriver()
+24
View File
@@ -195,3 +195,27 @@ def test_tui_input_line_typing_is_filtered_from_mixed_repaint_delta():
assert "Username" not in env["screen"]["new_delta"]
assert "#channel" not in env["screen"]["new_delta"]
assert env["screen"]["new_delta_is_typing"] is False
@pytest.mark.unit
def test_pty_prompt_repaint_from_blank_line_keeps_full_prompt():
manager, env = _build_screen_manager(
" ".ljust(40),
{"x": 2, "y": 0},
)
manager.update(
{
"bytes": b"",
"lines": 1,
"columns": 40,
"textCursor": {"x": 23, "y": 0},
"screen": "pty",
"text": "[storm@fenrir fenrir] $ ".ljust(40),
"attributes": [],
},
"onScreenUpdate",
)
assert env["screen"]["new_delta"] == "[storm@fenrir fenrir] $"
assert env["screen"]["new_delta_is_typing"] is False
@@ -334,6 +334,7 @@ def test_large_insertion_echo_speaks_pasted_cursor_text():
is_delta=Mock(return_value=True),
)
env = {
"commandsIgnore": {"onScreenUpdate": {"INCOMING_IGNORE": False}},
"runtime": {
"InputManager": input_manager,
"OutputManager": output_manager,
@@ -357,6 +358,7 @@ def test_large_insertion_echo_speaks_pasted_cursor_text():
announce_capital=True,
flush=False,
)
assert env["commandsIgnore"]["onScreenUpdate"]["INCOMING_IGNORE"] is True
@pytest.mark.unit
@@ -377,6 +379,7 @@ def test_large_insertion_echo_defers_recent_tab_to_tab_completion():
is_delta=Mock(return_value=True),
)
env = {
"commandsIgnore": {"onScreenUpdate": {"INCOMING_IGNORE": False}},
"runtime": {
"InputManager": input_manager,
"OutputManager": output_manager,
@@ -395,3 +398,4 @@ def test_large_insertion_echo_defers_recent_tab_to_tab_completion():
command.run()
output_manager.present_text.assert_not_called()
assert env["commandsIgnore"]["onScreenUpdate"]["INCOMING_IGNORE"] is False
@@ -0,0 +1,117 @@
import importlib.util
from pathlib import Path
from unittest.mock import Mock
import pytest
def _load_command():
module_path = (
Path(__file__).resolve().parents[2]
/ "src"
/ "fenrirscreenreader"
/ "commands"
/ "onCursorChange"
/ "65000-present_line_if_cursor_change_vertical.py"
)
spec = importlib.util.spec_from_file_location(
"fenrir_present_line_if_cursor_change_vertical", module_path
)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module.command()
def _build_environment(screen_name):
settings_manager = Mock()
settings_manager.get_setting_as_bool.side_effect = (
lambda section, setting: section == "focus" and setting == "cursor"
)
output_manager = Mock()
return {
"commandsIgnore": {"onScreenUpdate": {"INCOMING_IGNORE": False}},
"screen": {
"newTTY": screen_name,
"new_cursor": {"x": 0, "y": 1},
"new_content_text": "first line\nsecond line",
},
"runtime": {
"BarrierManager": Mock(),
"CursorManager": Mock(is_cursor_vertical_move=Mock(return_value=True)),
"OutputManager": output_manager,
"ScreenManager": Mock(
is_screen_change=Mock(return_value=False),
is_delta=Mock(return_value=True),
),
"SettingsManager": settings_manager,
},
}
@pytest.mark.unit
def test_pty_vertical_cursor_move_speaks_line_despite_repaint_delta():
command = _load_command()
env = _build_environment("pty")
command.initialize(env)
command.run()
env["runtime"]["OutputManager"].present_text.assert_called_once_with(
"second line", interrupt=True, flush=False
)
assert env["commandsIgnore"]["onScreenUpdate"]["INCOMING_IGNORE"] is True
@pytest.mark.unit
def test_non_pty_vertical_cursor_move_still_suppresses_delta():
command = _load_command()
env = _build_environment("1")
command.initialize(env)
command.run()
env["runtime"]["OutputManager"].present_text.assert_not_called()
assert env["commandsIgnore"]["onScreenUpdate"]["INCOMING_IGNORE"] is False
@pytest.mark.unit
def test_pty_dialog_button_row_speaks_nearby_question():
command = _load_command()
env = _build_environment("pty")
env["screen"]["new_cursor"] = {"x": 30, "y": 4}
env["screen"]["new_content_text"] = "\n".join(
[
"".ljust(80),
"".ljust(80),
"Do you want to save changes?".center(80),
"".ljust(80),
"< Yes > < No >".center(80),
"".ljust(80),
]
)
command.initialize(env)
command.run()
env["runtime"]["OutputManager"].present_text.assert_called_once_with(
"Do you want to save changes?\n< Yes > < No >",
interrupt=True,
flush=False,
)
assert env["commandsIgnore"]["onScreenUpdate"]["INCOMING_IGNORE"] is True
@pytest.mark.unit
def test_pty_blank_cursor_line_does_not_suppress_nonblank_delta():
command = _load_command()
env = _build_environment("pty")
env["screen"]["new_cursor"] = {"x": 0, "y": 1}
env["screen"]["new_content_text"] = "Birthday soon\n".ljust(80)
env["screen"]["new_delta"] = "Birthday soon"
command.initialize(env)
command.run()
env["runtime"]["OutputManager"].present_text.assert_not_called()
assert env["commandsIgnore"]["onScreenUpdate"]["INCOMING_IGNORE"] is False