Files
fenrir/tests/unit/test_pty_terminal_sequences.py
2026-06-04 14:21:49 -04:00

746 lines
22 KiB
Python

import threading
import time
from unittest.mock import Mock
import pytest
from fenrirscreenreader.core.eventData import FenrirEventType
from fenrirscreenreader.screenDriver.ptyDriver import PTYConstants
from fenrirscreenreader.screenDriver.ptyDriver import Terminal
from fenrirscreenreader.screenDriver.ptyDriver import driver as PtyDriver
class DummyProcessInput:
    """Stand-in for the process-input object handed to ``Terminal``.

    Nothing is forwarded anywhere: every payload passed to :meth:`write`
    is appended to ``data`` so a test can inspect it afterwards.
    """

    def __init__(self) -> None:
        # Payloads received through write(), oldest first.
        self.data: list = []

    def write(self, data) -> None:
        """Record *data* in ``self.data``."""
        self.data.append(data)
@pytest.mark.unit
def test_csi_sequences_with_intermediate_characters_do_not_render_final_byte():
    """Feed two CSI sequences ending in ``$p`` followed by ``X``.

    The first screen row must equal the expected literal and the letter
    ``p`` must not appear anywhere in the rendered text.
    """
    process_input = DummyProcessInput()
    terminal = Terminal(10, 3, process_input)

    terminal.feed(b"\x1b[2026$p\x1b[2048$pX")

    rendered_text = terminal.get_screen_content()["text"]
    first_row = rendered_text.splitlines()[0]
    # NOTE(review): the expected literal may have been whitespace-collapsed
    # in this copy of the file - confirm against the terminal width.
    assert first_row == "X "
    assert "p" not in rendered_text
@pytest.mark.unit
def test_private_sgr_sequence_from_fullscreen_apps_does_not_crash():
    """Feed ``ESC [ ? 25 ; 7 m`` followed by ``X`` and check the first row."""
    process_input = DummyProcessInput()
    terminal = Terminal(10, 3, process_input)

    terminal.feed(b"\x1b[?25;7mX")

    rows = terminal.get_screen_content()["text"].splitlines()
    # NOTE(review): the expected literal may have been whitespace-collapsed
    # in this copy of the file - confirm against the terminal width.
    assert rows[0] == "X "
@pytest.mark.unit
def test_dcs_terminal_queries_do_not_render_as_text():
    """Feed a complete ``ESC P ... ESC \\`` string followed by ``X``.

    Only ``X`` may reach the first row; the string payload must not show
    up in the rendered text.
    """
    process_input = DummyProcessInput()
    terminal = Terminal(20, 3, process_input)

    terminal.feed(b"\x1bP+q6b32\x1b\\X")

    rendered_text = terminal.get_screen_content()["text"]
    first_row = rendered_text.splitlines()[0]
    # NOTE(review): the expected literal may have been whitespace-collapsed
    # in this copy of the file - confirm against the terminal width.
    assert first_row == "X "
    assert "+q6b32" not in rendered_text
@pytest.mark.unit
def test_split_dcs_terminal_query_does_not_render_as_text():
    """Same payload as the unsplit DCS test, delivered in two ``feed`` calls."""
    process_input = DummyProcessInput()
    terminal = Terminal(20, 3, process_input)

    # The string is cut in the middle of its payload.
    for chunk in (b"\x1bP+q6", b"b32\x1b\\X"):
        terminal.feed(chunk)

    rendered_text = terminal.get_screen_content()["text"]
    first_row = rendered_text.splitlines()[0]
    # NOTE(review): the expected literal may have been whitespace-collapsed
    # in this copy of the file - confirm against the terminal width.
    assert first_row == "X "
    assert "+q6b32" not in rendered_text
@pytest.mark.unit
def test_optional_float_setting_uses_default_when_missing():
    """An empty-string setting makes the helper return the supplied default."""

    class SettingsManager:
        """Minimal settings stub: every lookup yields an empty string."""

        def get_setting(self, section, setting):
            return ""

    settings_manager = SettingsManager()
    pty_driver = PtyDriver()
    fallback = PTYConstants.OUTPUT_READ_TIMEOUT

    resolved = pty_driver._get_optional_float_setting(
        settings_manager,
        "screen",
        "ptyOutputTimeout",
        fallback,
    )

    assert resolved == fallback
@pytest.mark.unit
def test_pty_stdin_input_interrupts_output_when_all_keys_interrupt_enabled():
    """A plain byte triggers exactly one ``interrupt_output`` call.

    The settings mock answers ``True`` to every boolean lookup and ``""``
    to every string lookup. The test waits on the driver's
    ``stdin_interrupt_thread`` before checking the output manager.
    """
    pty_driver = PtyDriver()
    output_manager = Mock()
    settings_manager = Mock()
    settings_manager.configure_mock(
        **{
            "get_setting_as_bool.return_value": True,
            "get_setting.return_value": "",
        }
    )
    runtime = {
        "SettingsManager": settings_manager,
        "OutputManager": output_manager,
    }
    pty_driver.env = {"runtime": runtime}

    pty_driver.interrupt_output_on_stdin_input(b"a")
    pty_driver.stdin_interrupt_thread.join(timeout=1.0)

    output_manager.interrupt_output.assert_called_once_with()
@pytest.mark.unit
@pytest.mark.parametrize(
    "sequence",
    [
        b"\x1b[12;40R",
        b"\x1b[?1;2c",
        b"\x1b[>85;95;0c",
        b"\x1b[4;80;24t",
        b"\x1b]10;rgb:ffff/ffff/ffff\x1b\\",
    ],
)
def test_pty_terminal_response_stdin_does_not_interrupt_output(sequence):
    """None of the parametrized byte strings may interrupt speech output.

    The settings mock is configured exactly as in the positive
    interrupt test (boolean ``True``, string ``""``), so only the input
    bytes differ.
    """
    pty_driver = PtyDriver()
    output_manager = Mock()
    settings_manager = Mock()
    settings_manager.configure_mock(
        **{
            "get_setting_as_bool.return_value": True,
            "get_setting.return_value": "",
        }
    )
    runtime = {
        "SettingsManager": settings_manager,
        "OutputManager": output_manager,
    }
    pty_driver.env = {"runtime": runtime}

    pty_driver.interrupt_output_on_stdin_input(sequence)

    output_manager.interrupt_output.assert_not_called()
@pytest.mark.unit
def test_pty_stdin_input_interrupt_does_not_block_input_injection():
    """``handle_stdin_input`` must return while the interrupt still runs.

    ``interrupt_output`` is replaced by a callable that signals its start
    and then waits (up to one second) for the test to release it. The
    call has to come back in under 0.2 s, the interrupt has to have
    started, and the byte has to have been handed to
    ``inject_text_to_screen``.
    """
    pty_driver = PtyDriver()
    settings_manager = Mock()
    settings_manager.configure_mock(
        **{
            "get_setting_as_bool.return_value": True,
            "get_setting.return_value": "",
        }
    )
    interrupt_started = threading.Event()
    release_interrupt = threading.Event()

    def slow_interrupt():
        interrupt_started.set()
        release_interrupt.wait(timeout=1.0)

    output_manager = Mock()
    output_manager.interrupt_output = Mock(side_effect=slow_interrupt)
    debug_manager = Mock(write_debug_out=Mock())
    pty_driver.env = {
        "input": {"curr_input": []},
        "runtime": {
            "SettingsManager": settings_manager,
            "OutputManager": output_manager,
            "DebugManager": debug_manager,
        },
    }
    pty_driver.inject_text_to_screen = Mock()

    began = time.monotonic()
    pty_driver.handle_stdin_input(b"a", Mock())
    elapsed = time.monotonic() - began

    try:
        assert interrupt_started.wait(timeout=0.2)
        assert elapsed < 0.2
        pty_driver.inject_text_to_screen.assert_called_once_with(b"a")
    finally:
        # Always unblock the interrupt so the worker thread can finish.
        release_interrupt.set()
        pty_driver.stdin_interrupt_thread.join(timeout=1.0)
@pytest.mark.unit
def test_pty_raw_tab_records_recent_tab_keypress():
    """A raw tab byte is reported as ``KEY_TAB`` and still injected."""
    pty_driver = PtyDriver()
    input_manager = Mock()
    settings_manager = Mock()
    settings_manager.get_setting_as_bool.return_value = False
    runtime = {
        "DebugManager": Mock(write_debug_out=Mock()),
        "InputManager": input_manager,
        "SettingsManager": settings_manager,
    }
    pty_driver.env = {"input": {"curr_input": []}, "runtime": runtime}
    pty_driver.inject_text_to_screen = Mock()

    pty_driver.handle_stdin_input(b"\t", Mock())

    input_manager.record_unmanaged_keypress.assert_called_once_with("KEY_TAB")
    pty_driver.inject_text_to_screen.assert_called_once_with(b"\t")
@pytest.mark.unit
def test_pty_plain_stdin_does_not_record_tab_keypress():
    """A plain ``a`` is injected without recording an unmanaged keypress."""
    pty_driver = PtyDriver()
    input_manager = Mock()
    settings_manager = Mock()
    settings_manager.get_setting_as_bool.return_value = False
    runtime = {
        "DebugManager": Mock(write_debug_out=Mock()),
        "InputManager": input_manager,
        "SettingsManager": settings_manager,
    }
    pty_driver.env = {"input": {"curr_input": []}, "runtime": runtime}
    pty_driver.inject_text_to_screen = Mock()

    pty_driver.handle_stdin_input(b"a", Mock())

    input_manager.record_unmanaged_keypress.assert_not_called()
    pty_driver.inject_text_to_screen.assert_called_once_with(b"a")
@pytest.mark.unit
def test_pty_stdin_consumes_fenrir_shortcut_input():
    """Bytes that arrive while a bound shortcut is current are swallowed.

    The input manager reports ``KEY_KP9`` as the current shortcut and maps
    it to ``REVIEW_NEXT_LINE``; the stdin bytes must neither interrupt
    output nor be injected.
    """
    pty_driver = PtyDriver()
    settings_manager = Mock()
    settings_manager.get_setting_as_bool.return_value = True
    input_manager = Mock()
    input_manager.get_curr_shortcut = Mock(return_value=[1, ["KEY_KP9"]])
    input_manager.get_command_for_shortcut = Mock(
        return_value="REVIEW_NEXT_LINE"
    )
    output_manager = Mock()
    runtime = {
        "DebugManager": Mock(write_debug_out=Mock()),
        "InputManager": input_manager,
        "OutputManager": output_manager,
        "SettingsManager": settings_manager,
    }
    pty_driver.env = {
        "input": {"curr_input": ["KEY_KP9"]},
        "runtime": runtime,
    }
    pty_driver.inject_text_to_screen = Mock()

    pty_driver.handle_stdin_input(b"\x1b[6~", Mock())

    output_manager.interrupt_output.assert_not_called()
    pty_driver.inject_text_to_screen.assert_not_called()
@pytest.mark.unit
def test_pty_stdin_consumes_late_fenrir_shortcut_tail_after_release():
    """An escape sequence right after a recorded command time is swallowed.

    No key is held and no shortcut resolves, but
    ``last_fenrir_stdin_command_time`` is set to "now" before the bytes
    arrive.
    """
    pty_driver = PtyDriver()
    settings_manager = Mock()
    settings_manager.get_setting_as_bool.return_value = True
    input_manager = Mock()
    input_manager.get_curr_shortcut = Mock(return_value=[1, []])
    input_manager.get_command_for_shortcut = Mock(return_value="")
    output_manager = Mock()
    runtime = {
        "DebugManager": Mock(write_debug_out=Mock()),
        "InputManager": input_manager,
        "OutputManager": output_manager,
        "SettingsManager": settings_manager,
    }
    pty_driver.env = {"input": {"curr_input": []}, "runtime": runtime}
    pty_driver.last_fenrir_stdin_command_time = time.monotonic()
    pty_driver.inject_text_to_screen = Mock()

    pty_driver.handle_stdin_input(b"\x1b[6~", Mock())

    output_manager.interrupt_output.assert_not_called()
    pty_driver.inject_text_to_screen.assert_not_called()
@pytest.mark.unit
def test_pty_stdin_consumes_split_fenrir_shortcut_tail_after_release():
    """A sequence split as ``ESC`` then ``[H`` is swallowed in both parts.

    ``ESC`` arrives while ``KEY_KP7`` is current and bound to
    ``REVIEW_PREV_LINE``; ``curr_input`` is emptied before the rest of
    the sequence is delivered.
    """
    pty_driver = PtyDriver()
    settings_manager = Mock()
    settings_manager.get_setting_as_bool.return_value = True
    input_manager = Mock()
    # One answer per handle_stdin_input call, in order.
    input_manager.get_curr_shortcut = Mock(
        side_effect=[[1, ["KEY_KP7"]], [1, []]]
    )
    input_manager.get_command_for_shortcut = Mock(
        side_effect=["REVIEW_PREV_LINE", ""]
    )
    output_manager = Mock()
    runtime = {
        "DebugManager": Mock(write_debug_out=Mock()),
        "InputManager": input_manager,
        "OutputManager": output_manager,
        "SettingsManager": settings_manager,
    }
    pty_driver.env = {
        "input": {"curr_input": ["KEY_KP7"]},
        "runtime": runtime,
    }
    pty_driver.inject_text_to_screen = Mock()

    pty_driver.handle_stdin_input(b"\x1b", Mock())
    pty_driver.env["input"]["curr_input"] = []
    pty_driver.handle_stdin_input(b"[H", Mock())

    output_manager.interrupt_output.assert_not_called()
    pty_driver.inject_text_to_screen.assert_not_called()
@pytest.mark.unit
def test_pty_stdin_consumes_split_ss3_fenrir_shortcut_tail_after_release():
    """A sequence split as ``ESC``, ``O``, ``w`` is swallowed in all parts.

    ``ESC`` arrives while ``KEY_KP7`` is current and bound to
    ``REVIEW_PREV_LINE``; ``curr_input`` is emptied before the two
    remaining bytes are delivered one at a time.
    """
    pty_driver = PtyDriver()
    settings_manager = Mock()
    settings_manager.get_setting_as_bool.return_value = True
    input_manager = Mock()
    # One answer per handle_stdin_input call, in order.
    input_manager.get_curr_shortcut = Mock(
        side_effect=[[1, ["KEY_KP7"]], [1, []], [1, []]]
    )
    input_manager.get_command_for_shortcut = Mock(
        side_effect=["REVIEW_PREV_LINE", "", ""]
    )
    output_manager = Mock()
    runtime = {
        "DebugManager": Mock(write_debug_out=Mock()),
        "InputManager": input_manager,
        "OutputManager": output_manager,
        "SettingsManager": settings_manager,
    }
    pty_driver.env = {
        "input": {"curr_input": ["KEY_KP7"]},
        "runtime": runtime,
    }
    pty_driver.inject_text_to_screen = Mock()

    pty_driver.handle_stdin_input(b"\x1b", Mock())
    pty_driver.env["input"]["curr_input"] = []
    pty_driver.handle_stdin_input(b"O", Mock())
    pty_driver.handle_stdin_input(b"w", Mock())

    output_manager.interrupt_output.assert_not_called()
    pty_driver.inject_text_to_screen.assert_not_called()
@pytest.mark.unit
def test_pty_stdin_does_not_consume_printable_input_after_fenrir_shortcut():
    """A printable ``a`` is injected even right after a command time.

    ``last_fenrir_stdin_command_time`` is set to "now", yet the byte must
    still reach ``inject_text_to_screen`` and output must not be
    interrupted (boolean settings report ``False``).
    """
    pty_driver = PtyDriver()
    settings_manager = Mock()
    settings_manager.get_setting_as_bool.return_value = False
    input_manager = Mock()
    input_manager.get_curr_shortcut = Mock(return_value=[1, []])
    input_manager.get_command_for_shortcut = Mock(return_value="")
    output_manager = Mock()
    runtime = {
        "DebugManager": Mock(write_debug_out=Mock()),
        "InputManager": input_manager,
        "OutputManager": output_manager,
        "SettingsManager": settings_manager,
    }
    pty_driver.env = {"input": {"curr_input": []}, "runtime": runtime}
    pty_driver.last_fenrir_stdin_command_time = time.monotonic()
    pty_driver.inject_text_to_screen = Mock()

    pty_driver.handle_stdin_input(b"a", Mock())

    output_manager.interrupt_output.assert_not_called()
    pty_driver.inject_text_to_screen.assert_called_once_with(b"a")
@pytest.mark.unit
def test_pty_stdin_does_not_consume_stale_fenrir_shortcut_tail():
    """An escape sequence is injected once the command time is too old.

    The recorded command time is placed 0.1 s beyond
    ``PTYConstants.FENRIR_SHORTCUT_STDIN_TAIL_TIMEOUT`` in the past.
    """
    pty_driver = PtyDriver()
    settings_manager = Mock()
    settings_manager.get_setting_as_bool.return_value = False
    input_manager = Mock()
    input_manager.get_curr_shortcut = Mock(return_value=[1, []])
    input_manager.get_command_for_shortcut = Mock(return_value="")
    runtime = {
        "DebugManager": Mock(write_debug_out=Mock()),
        "InputManager": input_manager,
        "SettingsManager": settings_manager,
    }
    pty_driver.env = {"input": {"curr_input": []}, "runtime": runtime}
    now = time.monotonic()
    pty_driver.last_fenrir_stdin_command_time = (
        now - PTYConstants.FENRIR_SHORTCUT_STDIN_TAIL_TIMEOUT - 0.1
    )
    pty_driver.inject_text_to_screen = Mock()

    pty_driver.handle_stdin_input(b"\x1b[6~", Mock())

    pty_driver.inject_text_to_screen.assert_called_once_with(b"\x1b[6~")
@pytest.mark.unit
def test_pty_stdin_consumes_late_tail_after_recent_review_command():
    """A full escape sequence is swallowed just after a review command.

    ``commandInfo`` records ``REVIEW_NEXT_LINE`` with a run time of
    "now"; no key is held and no shortcut resolves.
    """
    pty_driver = PtyDriver()
    settings_manager = Mock()
    settings_manager.get_setting_as_bool.return_value = True
    input_manager = Mock()
    input_manager.get_curr_shortcut = Mock(return_value=[1, []])
    input_manager.get_command_for_shortcut = Mock(return_value="")
    output_manager = Mock()
    command_info = {
        "lastCommand": "REVIEW_NEXT_LINE",
        "lastCommandSection": "commands",
        "lastCommandRunTime": time.time(),
    }
    runtime = {
        "DebugManager": Mock(write_debug_out=Mock()),
        "InputManager": input_manager,
        "OutputManager": output_manager,
        "SettingsManager": settings_manager,
    }
    pty_driver.env = {
        "input": {"curr_input": []},
        "commandInfo": command_info,
        "runtime": runtime,
    }
    pty_driver.inject_text_to_screen = Mock()

    pty_driver.handle_stdin_input(b"\x1b[6~", Mock())

    output_manager.interrupt_output.assert_not_called()
    pty_driver.inject_text_to_screen.assert_not_called()
@pytest.mark.unit
@pytest.mark.parametrize("sequence", [b"[H", b"[1~", b"Ow"])
def test_pty_stdin_consumes_split_tail_after_recent_review_command(sequence):
    """Sequence tails without their ``ESC`` are swallowed after a review.

    ``commandInfo`` records ``REVIEW_NEXT_LINE`` with a run time of
    "now"; each parametrized tail must be neither injected nor allowed to
    interrupt output.
    """
    pty_driver = PtyDriver()
    settings_manager = Mock()
    settings_manager.get_setting_as_bool.return_value = True
    input_manager = Mock()
    input_manager.get_curr_shortcut = Mock(return_value=[1, []])
    input_manager.get_command_for_shortcut = Mock(return_value="")
    output_manager = Mock()
    command_info = {
        "lastCommand": "REVIEW_NEXT_LINE",
        "lastCommandSection": "commands",
        "lastCommandRunTime": time.time(),
    }
    runtime = {
        "DebugManager": Mock(write_debug_out=Mock()),
        "InputManager": input_manager,
        "OutputManager": output_manager,
        "SettingsManager": settings_manager,
    }
    pty_driver.env = {
        "input": {"curr_input": []},
        "commandInfo": command_info,
        "runtime": runtime,
    }
    pty_driver.inject_text_to_screen = Mock()

    pty_driver.handle_stdin_input(sequence, Mock())

    output_manager.interrupt_output.assert_not_called()
    pty_driver.inject_text_to_screen.assert_not_called()
@pytest.mark.unit
def test_pty_stdin_consumes_lone_escape_after_recent_review_command():
    """A single ``ESC`` byte is swallowed just after a review command.

    ``commandInfo`` records ``REVIEW_CURR_LINE`` with a run time of
    "now".
    """
    pty_driver = PtyDriver()
    settings_manager = Mock()
    settings_manager.get_setting_as_bool.return_value = True
    input_manager = Mock()
    input_manager.get_curr_shortcut = Mock(return_value=[1, []])
    input_manager.get_command_for_shortcut = Mock(return_value="")
    output_manager = Mock()
    command_info = {
        "lastCommand": "REVIEW_CURR_LINE",
        "lastCommandSection": "commands",
        "lastCommandRunTime": time.time(),
    }
    runtime = {
        "DebugManager": Mock(write_debug_out=Mock()),
        "InputManager": input_manager,
        "OutputManager": output_manager,
        "SettingsManager": settings_manager,
    }
    pty_driver.env = {
        "input": {"curr_input": []},
        "commandInfo": command_info,
        "runtime": runtime,
    }
    pty_driver.inject_text_to_screen = Mock()

    pty_driver.handle_stdin_input(b"\x1b", Mock())

    output_manager.interrupt_output.assert_not_called()
    pty_driver.inject_text_to_screen.assert_not_called()
@pytest.mark.unit
def test_pty_stdin_does_not_consume_late_tail_after_non_review_command():
    """The escape sequence is injected when the last command was not a review.

    ``commandInfo`` records ``CURSOR_POSITION`` with a run time of "now".
    """
    pty_driver = PtyDriver()
    settings_manager = Mock()
    settings_manager.get_setting_as_bool.return_value = False
    input_manager = Mock()
    input_manager.get_curr_shortcut = Mock(return_value=[1, []])
    input_manager.get_command_for_shortcut = Mock(return_value="")
    command_info = {
        "lastCommand": "CURSOR_POSITION",
        "lastCommandSection": "commands",
        "lastCommandRunTime": time.time(),
    }
    runtime = {
        "DebugManager": Mock(write_debug_out=Mock()),
        "InputManager": input_manager,
        "SettingsManager": settings_manager,
    }
    pty_driver.env = {
        "input": {"curr_input": []},
        "commandInfo": command_info,
        "runtime": runtime,
    }
    pty_driver.inject_text_to_screen = Mock()

    pty_driver.handle_stdin_input(b"\x1b[6~", Mock())

    pty_driver.inject_text_to_screen.assert_called_once_with(b"\x1b[6~")
@pytest.mark.unit
@pytest.mark.parametrize(
    ("sequence", "key_name"),
    [
        (b"\x1b[A", "KEY_UP"),
        (b"\x1b[B", "KEY_DOWN"),
        (b"\x1b[C", "KEY_RIGHT"),
        (b"\x1b[D", "KEY_LEFT"),
        (b"\x1b[5~", "KEY_PAGEUP"),
        (b"\x1b[6~", "KEY_PAGEDOWN"),
        (b"\x1b", "KEY_ESC"),
        (b"\r", "KEY_ENTER"),
        (b" ", "KEY_SPACE"),
        (b"a", "KEY_A"),
        (b"Z", "KEY_Z"),
    ],
)
def test_pty_vmenu_stdin_is_consumed_and_synthesizes_key_events(
    sequence,
    key_name,
):
    """With the vmenu active, stdin becomes two queued key events.

    Nothing may be injected into the screen. Exactly two events are put
    on the queue, both named *key_name*, with ``event_state`` 1 and then
    0; the first must be of type ``FenrirEventType.keyboard_input``.
    """
    pty_driver = PtyDriver()
    event_queue = Mock()
    settings_manager = Mock()
    settings_manager.get_setting_as_bool.return_value = False
    vmenu_manager = Mock()
    vmenu_manager.get_active = Mock(return_value=True)
    runtime = {
        "DebugManager": Mock(write_debug_out=Mock()),
        "SettingsManager": settings_manager,
        "VmenuManager": vmenu_manager,
    }
    pty_driver.env = {"input": {"curr_input": []}, "runtime": runtime}
    pty_driver.inject_text_to_screen = Mock()

    pty_driver.handle_stdin_input(sequence, event_queue)

    pty_driver.inject_text_to_screen.assert_not_called()
    assert event_queue.put.call_count == 2
    queued_calls = event_queue.put.call_args_list
    leading_event = queued_calls[0].args[0]
    trailing_event = queued_calls[1].args[0]
    assert leading_event["Type"] == FenrirEventType.keyboard_input
    assert leading_event["data"]["event_name"] == key_name
    assert leading_event["data"]["event_state"] == 1
    assert trailing_event["data"]["event_name"] == key_name
    assert trailing_event["data"]["event_state"] == 0
@pytest.mark.unit
def test_pty_vmenu_unknown_stdin_is_consumed_without_injection():
    """With the vmenu active, ``ESC [ 1 ; 5 A`` is dropped silently.

    The bytes are neither injected nor turned into queued events.
    """
    pty_driver = PtyDriver()
    event_queue = Mock()
    vmenu_manager = Mock()
    vmenu_manager.get_active = Mock(return_value=True)
    pty_driver.env = {
        "input": {"curr_input": []},
        "runtime": {"VmenuManager": vmenu_manager},
    }
    pty_driver.inject_text_to_screen = Mock()

    pty_driver.handle_stdin_input(b"\x1b[1;5A", event_queue)

    pty_driver.inject_text_to_screen.assert_not_called()
    event_queue.put.assert_not_called()
@pytest.mark.unit
def test_pty_vmenu_stdin_does_not_duplicate_current_x11_key():
    """No event is queued when the key is already in ``curr_input``.

    ``KEY_RIGHT`` is listed as current input and the matching ``ESC [ C``
    arrives on stdin while the vmenu is active.
    """
    pty_driver = PtyDriver()
    event_queue = Mock()
    vmenu_manager = Mock()
    vmenu_manager.get_active = Mock(return_value=True)
    pty_driver.env = {
        "input": {"curr_input": ["KEY_RIGHT"]},
        "runtime": {"VmenuManager": vmenu_manager},
    }
    pty_driver.inject_text_to_screen = Mock()

    pty_driver.handle_stdin_input(b"\x1b[C", event_queue)

    pty_driver.inject_text_to_screen.assert_not_called()
    event_queue.put.assert_not_called()
@pytest.mark.unit
def test_pty_stdin_input_honors_interrupt_disabled():
    """No interrupt happens when every boolean setting reports ``False``."""
    pty_driver = PtyDriver()
    output_manager = Mock()
    settings_manager = Mock()
    settings_manager.get_setting_as_bool.return_value = False
    runtime = {
        "SettingsManager": settings_manager,
        "OutputManager": output_manager,
    }
    pty_driver.env = {"runtime": runtime}

    pty_driver.interrupt_output_on_stdin_input(b"a")

    output_manager.interrupt_output.assert_not_called()
@pytest.mark.unit
def test_pty_stdin_input_leaves_filtered_interrupts_to_key_events():
    """No stdin-driven interrupt when the string setting is non-empty.

    Boolean settings report ``True`` but ``get_setting`` returns
    ``"KEY_ENTER"``; a plain ``a`` must not interrupt output.
    """
    pty_driver = PtyDriver()
    output_manager = Mock()
    settings_manager = Mock()
    settings_manager.configure_mock(
        **{
            "get_setting_as_bool.return_value": True,
            "get_setting.return_value": "KEY_ENTER",
        }
    )
    runtime = {
        "SettingsManager": settings_manager,
        "OutputManager": output_manager,
    }
    pty_driver.env = {"runtime": runtime}

    pty_driver.interrupt_output_on_stdin_input(b"a")

    output_manager.interrupt_output.assert_not_called()
@pytest.mark.unit
def test_pty_terminal_attributes_use_fenrir_attribute_shape():
    """Each cell attribute has ten entries; SGR 7 shows up in slots 1 and 6.

    Row 0 holds unstyled text, row 1 text written between ``ESC [ 7 m``
    and ``ESC [ 0 m``. Only the first cell of each row is inspected.
    """
    terminal = Terminal(10, 2, DummyProcessInput())

    terminal.feed(b"plain\r\n\x1b[7mfocus\x1b[0m")

    attributes = terminal.get_screen_content()["attributes"]
    plain_attribute = attributes[0][0]
    focused_attribute = attributes[1][0]
    assert len(plain_attribute) == 10
    assert len(focused_attribute) == 10
    assert plain_attribute[1] == "default"
    assert focused_attribute[1] == "reverse"
    assert focused_attribute[6] is True
@pytest.mark.unit
def test_pty_backspace_with_fenrir_key_synthesizes_shortcut_events():
    """``0x7f`` with ``KEY_FENRIR`` current yields two queued events.

    The call must report ``True``. The first event is a
    ``KEY_BACKSPACE`` keyboard-input event with ``event_state`` 1; the
    second has ``event_state`` 0.
    """
    pty_driver = PtyDriver()
    event_queue = Mock()
    pty_driver.env = {"input": {"curr_input": ["KEY_FENRIR"]}}

    handled = pty_driver.synthesize_backspace_shortcut(b"\x7f", event_queue)

    assert handled is True
    assert event_queue.put.call_count == 2
    queued_calls = event_queue.put.call_args_list
    leading_event = queued_calls[0].args[0]
    trailing_event = queued_calls[1].args[0]
    assert leading_event["Type"] == FenrirEventType.keyboard_input
    assert leading_event["data"]["event_name"] == "KEY_BACKSPACE"
    assert leading_event["data"]["event_state"] == 1
    assert trailing_event["data"]["event_state"] == 0
@pytest.mark.unit
@pytest.mark.parametrize(
    "sequence",
    [
        b"\x08",
        b"\x1b[3~",
        b"\x1b[3;5~",
        b"\x1b[27;5;8~",
        b"\x1b[27;5;127~",
        b"\x1b[8;5u",
        b"\x1b[127;5u",
    ],
)
def test_pty_xterm_backspace_variants_with_fenrir_key_synthesize_shortcut_events(
    sequence,
):
    """Each parametrized encoding is handled as ``KEY_BACKSPACE``.

    ``KEY_FENRIR`` is current; the call must report ``True`` and the
    first queued event must be named ``KEY_BACKSPACE``.
    """
    pty_driver = PtyDriver()
    event_queue = Mock()
    pty_driver.env = {"input": {"curr_input": ["KEY_FENRIR"]}}

    handled = pty_driver.synthesize_backspace_shortcut(sequence, event_queue)

    assert handled is True
    leading_event = event_queue.put.call_args_list[0].args[0]
    assert leading_event["data"]["event_name"] == "KEY_BACKSPACE"
@pytest.mark.unit
def test_pty_plain_backspace_is_not_synthesized():
    """``0x7f`` with no current input is not handled and queues nothing."""
    pty_driver = PtyDriver()
    event_queue = Mock()
    pty_driver.env = {"input": {"curr_input": []}}

    handled = pty_driver.synthesize_backspace_shortcut(b"\x7f", event_queue)

    assert handled is False
    event_queue.put.assert_not_called()
@pytest.mark.unit
def test_pty_plain_delete_sequence_is_not_synthesized():
    """``ESC [ 3 ~`` with no current input is not handled, queues nothing."""
    pty_driver = PtyDriver()
    event_queue = Mock()
    pty_driver.env = {"input": {"curr_input": []}}

    handled = pty_driver.synthesize_backspace_shortcut(b"\x1b[3~", event_queue)

    assert handled is False
    event_queue.put.assert_not_called()