"""Unit tests for the PTY screen driver's terminal parsing and stdin handling."""

import threading
import time
from unittest.mock import Mock

import pytest

from fenrirscreenreader.core.eventData import FenrirEventType
from fenrirscreenreader.screenDriver.ptyDriver import PTYConstants
from fenrirscreenreader.screenDriver.ptyDriver import Terminal
from fenrirscreenreader.screenDriver.ptyDriver import driver as PtyDriver

# Sentinel meaning "leave the mock attribute unconfigured".
_UNSET = object()

# Byte sequences fed to interrupt_output_on_stdin_input() that must not
# trigger an output interrupt.
_TERMINAL_RESPONSES = [
    b"\x1b[12;40R",
    b"\x1b[?1;2c",
    b"\x1b[>85;95;0c",
    b"\x1b[4;80;24t",
    b"\x1b]10;rgb:ffff/ffff/ffff\x1b\\",
]

# (stdin bytes, expected synthesized key name) pairs for the vmenu tests.
_VMENU_KEY_CASES = [
    (b"\x1b[A", "KEY_UP"),
    (b"\x1b[B", "KEY_DOWN"),
    (b"\x1b[C", "KEY_RIGHT"),
    (b"\x1b[D", "KEY_LEFT"),
    (b"\x1b[5~", "KEY_PAGEUP"),
    (b"\x1b[6~", "KEY_PAGEDOWN"),
    (b"\x1b", "KEY_ESC"),
    (b"\r", "KEY_ENTER"),
    (b" ", "KEY_SPACE"),
    (b"a", "KEY_A"),
    (b"Z", "KEY_Z"),
]

# Byte sequences that, with KEY_FENRIR held, are expected to be reported
# as KEY_BACKSPACE by synthesize_backspace_shortcut().
_BACKSPACE_VARIANTS = [
    b"\x08",
    b"\x1b[3~",
    b"\x1b[3;5~",
    b"\x1b[27;5;8~",
    b"\x1b[27;5;127~",
    b"\x1b[8;5u",
    b"\x1b[127;5u",
]


class DummyProcessInput:
    """Stand-in for the process input object; records everything written."""

    def __init__(self):
        self.data = []

    def write(self, data):
        """Append *data* to the recorded writes."""
        self.data.append(data)


# NOTE(review): the expected first-row literals ("X ") in the terminal tests
# are copied verbatim from the source; if rows are padded to the terminal
# size, confirm that the trailing whitespace of these literals is intact.
def _render(dimensions, *chunks):
    """Feed *chunks* to a fresh Terminal and return its screen content.

    *dimensions* holds the two leading positional arguments of ``Terminal``.
    """
    terminal = Terminal(*dimensions, DummyProcessInput())
    for chunk in chunks:
        terminal.feed(chunk)
    return terminal.get_screen_content()


def _settings(as_bool, setting=_UNSET):
    """Build a settings-manager mock.

    ``get_setting_as_bool`` always returns *as_bool*; ``get_setting`` is only
    configured when *setting* is supplied, otherwise it stays a plain Mock.
    """
    manager = Mock()
    manager.get_setting_as_bool.return_value = as_bool
    if setting is not _UNSET:
        manager.get_setting.return_value = setting
    return manager


def _debug_manager():
    """Build a debug-manager mock exposing ``write_debug_out``."""
    return Mock(write_debug_out=Mock())


def _active_vmenu():
    """Build a vmenu-manager mock whose ``get_active`` returns True."""
    return Mock(get_active=Mock(return_value=True))


def _input_manager(shortcut_keys, command):
    """Build an input-manager mock with fixed shortcut/command answers."""
    return Mock(
        get_curr_shortcut=Mock(return_value=[1, shortcut_keys]),
        get_command_for_shortcut=Mock(return_value=command),
    )


def _scripted_input_manager(script):
    """Build an input-manager mock answering from *script* call by call.

    *script* is a sequence of ``(shortcut_keys, command)`` pairs consumed in
    order through ``side_effect``.
    """
    return Mock(
        get_curr_shortcut=Mock(
            side_effect=[[1, keys] for keys, _ in script]
        ),
        get_command_for_shortcut=Mock(
            side_effect=[command for _, command in script]
        ),
    )


def _shortcut_runtime(settings, input_manager, output_manager=None):
    """Assemble the ``runtime`` mapping used by the shortcut/tail tests.

    ``OutputManager`` is only present when *output_manager* is given.
    """
    runtime = {
        "DebugManager": _debug_manager(),
        "InputManager": input_manager,
    }
    if output_manager is not None:
        runtime["OutputManager"] = output_manager
    runtime["SettingsManager"] = settings
    return runtime


def _recent_command(name):
    """Return a ``commandInfo`` mapping for *name* stamped with time.time()."""
    return {
        "lastCommand": name,
        "lastCommandSection": "commands",
        "lastCommandRunTime": time.time(),
    }


def _stdin_driver(runtime, curr_input=None, command_info=None):
    """Create a driver wired for ``handle_stdin_input`` tests.

    The env always has ``input`` and ``runtime``; ``commandInfo`` is added
    only when *command_info* is given. ``inject_text_to_screen`` is replaced
    by a Mock so injections can be asserted on.
    """
    pty_driver = PtyDriver()
    env = {"input": {"curr_input": [] if curr_input is None else curr_input}}
    if command_info is not None:
        env["commandInfo"] = command_info
    env["runtime"] = runtime
    pty_driver.env = env
    pty_driver.inject_text_to_screen = Mock()
    return pty_driver


def _interrupt_driver(as_bool, setting=_UNSET):
    """Create a driver whose env holds only settings and output managers.

    Returns the ``(driver, output_manager)`` pair.
    """
    pty_driver = PtyDriver()
    output_manager = Mock()
    pty_driver.env = {
        "runtime": {
            "SettingsManager": _settings(as_bool, setting),
            "OutputManager": output_manager,
        }
    }
    return pty_driver, output_manager


def _backspace_driver(curr_input):
    """Create a driver whose env contains only the current input list."""
    pty_driver = PtyDriver()
    pty_driver.env = {
        "input": {"curr_input": curr_input},
    }
    return pty_driver


def _assert_swallowed(pty_driver, output_manager):
    """Assert that nothing was interrupted and nothing was injected."""
    output_manager.interrupt_output.assert_not_called()
    pty_driver.inject_text_to_screen.assert_not_called()


@pytest.mark.unit
def test_csi_sequences_with_intermediate_characters_do_not_render_final_byte():
    """The final byte ``p`` of the fed CSI sequences must not reach the text."""
    screen = _render((10, 3), b"\x1b[2026$p\x1b[2048$pX")

    assert screen["text"].splitlines()[0] == "X "
    assert "p" not in screen["text"]


@pytest.mark.unit
def test_private_sgr_sequence_from_fullscreen_apps_does_not_crash():
    """Feeding ``ESC [ ? 25 ; 7 m`` still leaves the trailing X on row 0."""
    screen = _render((10, 3), b"\x1b[?25;7mX")

    assert screen["text"].splitlines()[0] == "X "


@pytest.mark.unit
def test_dcs_terminal_queries_do_not_render_as_text():
    """The payload of the fed DCS query must not appear in the screen text."""
    screen = _render((20, 3), b"\x1bP+q6b32\x1b\\X")

    assert screen["text"].splitlines()[0] == "X "
    assert "+q6b32" not in screen["text"]


@pytest.mark.unit
def test_split_dcs_terminal_query_does_not_render_as_text():
    """Same as the DCS test, with the query split across two feed() calls."""
    screen = _render((20, 3), b"\x1bP+q6", b"b32\x1b\\X")

    assert screen["text"].splitlines()[0] == "X "
    assert "+q6b32" not in screen["text"]


@pytest.mark.unit
def test_optional_float_setting_uses_default_when_missing():
    """An empty setting string yields the supplied default value."""

    class SettingsManager:
        def get_setting(self, section, setting):
            return ""

    settings_manager = SettingsManager()
    pty_driver = PtyDriver()

    resolved = pty_driver._get_optional_float_setting(
        settings_manager,
        "screen",
        "ptyOutputTimeout",
        PTYConstants.OUTPUT_READ_TIMEOUT,
    )

    assert resolved == PTYConstants.OUTPUT_READ_TIMEOUT


@pytest.mark.unit
def test_pty_stdin_input_interrupts_output_when_all_keys_interrupt_enabled():
    """A plain key triggers exactly one interrupt_output() call."""
    pty_driver, output_manager = _interrupt_driver(True, "")

    pty_driver.interrupt_output_on_stdin_input(b"a")
    # Wait for the driver's interrupt thread before inspecting the mock.
    pty_driver.stdin_interrupt_thread.join(timeout=1.0)

    output_manager.interrupt_output.assert_called_once_with()


@pytest.mark.unit
@pytest.mark.parametrize("sequence", _TERMINAL_RESPONSES)
def test_pty_terminal_response_stdin_does_not_interrupt_output(sequence):
    """None of the listed response sequences may interrupt output."""
    pty_driver, output_manager = _interrupt_driver(True, "")

    pty_driver.interrupt_output_on_stdin_input(sequence)

    output_manager.interrupt_output.assert_not_called()


@pytest.mark.unit
def test_pty_stdin_input_interrupt_does_not_block_input_injection():
    """handle_stdin_input() returns quickly even if the interrupt is slow."""
    interrupt_started = threading.Event()
    release_interrupt = threading.Event()

    def slow_interrupt():
        # Signal that the interrupt began, then stall until released.
        interrupt_started.set()
        release_interrupt.wait(timeout=1.0)

    output_manager = Mock(interrupt_output=Mock(side_effect=slow_interrupt))
    pty_driver = _stdin_driver(
        {
            "SettingsManager": _settings(True, ""),
            "OutputManager": output_manager,
            "DebugManager": _debug_manager(),
        }
    )

    began = time.monotonic()
    pty_driver.handle_stdin_input(b"a", Mock())
    elapsed = time.monotonic() - began

    try:
        assert interrupt_started.wait(timeout=0.2)
        assert elapsed < 0.2
        pty_driver.inject_text_to_screen.assert_called_once_with(b"a")
    finally:
        # Always unblock and reap the interrupt thread.
        release_interrupt.set()
        pty_driver.stdin_interrupt_thread.join(timeout=1.0)


@pytest.mark.unit
def test_pty_raw_tab_records_recent_tab_keypress():
    """A raw tab byte is recorded as KEY_TAB and still injected."""
    input_manager = Mock()
    pty_driver = _stdin_driver(
        _shortcut_runtime(_settings(False), input_manager)
    )

    pty_driver.handle_stdin_input(b"\t", Mock())

    input_manager.record_unmanaged_keypress.assert_called_once_with("KEY_TAB")
    pty_driver.inject_text_to_screen.assert_called_once_with(b"\t")


@pytest.mark.unit
def test_pty_plain_stdin_does_not_record_tab_keypress():
    """A plain ``a`` is injected without recording any unmanaged keypress."""
    input_manager = Mock()
    pty_driver = _stdin_driver(
        _shortcut_runtime(_settings(False), input_manager)
    )

    pty_driver.handle_stdin_input(b"a", Mock())

    input_manager.record_unmanaged_keypress.assert_not_called()
    pty_driver.inject_text_to_screen.assert_called_once_with(b"a")


@pytest.mark.unit
def test_pty_stdin_consumes_fenrir_shortcut_input():
    """Input matching a bound shortcut is neither injected nor interrupts."""
    output_manager = Mock()
    pty_driver = _stdin_driver(
        _shortcut_runtime(
            _settings(True),
            _input_manager(["KEY_KP9"], "REVIEW_NEXT_LINE"),
            output_manager,
        ),
        curr_input=["KEY_KP9"],
    )

    pty_driver.handle_stdin_input(b"\x1b[6~", Mock())

    _assert_swallowed(pty_driver, output_manager)


@pytest.mark.unit
def test_pty_stdin_consumes_late_fenrir_shortcut_tail_after_release():
    """A sequence arriving right after a recorded command time is dropped."""
    output_manager = Mock()
    pty_driver = _stdin_driver(
        _shortcut_runtime(
            _settings(True), _input_manager([], ""), output_manager
        )
    )
    pty_driver.last_fenrir_stdin_command_time = time.monotonic()

    pty_driver.handle_stdin_input(b"\x1b[6~", Mock())

    _assert_swallowed(pty_driver, output_manager)


@pytest.mark.unit
def test_pty_stdin_consumes_split_fenrir_shortcut_tail_after_release():
    """A shortcut sequence split as ESC + ``[H`` is dropped in both parts."""
    output_manager = Mock()
    input_manager = _scripted_input_manager(
        [
            (["KEY_KP7"], "REVIEW_PREV_LINE"),
            ([], ""),
        ]
    )
    pty_driver = _stdin_driver(
        _shortcut_runtime(_settings(True), input_manager, output_manager),
        curr_input=["KEY_KP7"],
    )

    pty_driver.handle_stdin_input(b"\x1b", Mock())
    # The key is released before the rest of the sequence arrives.
    pty_driver.env["input"]["curr_input"] = []
    pty_driver.handle_stdin_input(b"[H", Mock())

    _assert_swallowed(pty_driver, output_manager)


@pytest.mark.unit
def test_pty_stdin_consumes_split_ss3_fenrir_shortcut_tail_after_release():
    """A shortcut sequence split as ESC, ``O``, ``w`` is dropped entirely."""
    output_manager = Mock()
    input_manager = _scripted_input_manager(
        [
            (["KEY_KP7"], "REVIEW_PREV_LINE"),
            ([], ""),
            ([], ""),
        ]
    )
    pty_driver = _stdin_driver(
        _shortcut_runtime(_settings(True), input_manager, output_manager),
        curr_input=["KEY_KP7"],
    )

    pty_driver.handle_stdin_input(b"\x1b", Mock())
    # The key is released before the rest of the sequence arrives.
    pty_driver.env["input"]["curr_input"] = []
    pty_driver.handle_stdin_input(b"O", Mock())
    pty_driver.handle_stdin_input(b"w", Mock())

    _assert_swallowed(pty_driver, output_manager)


@pytest.mark.unit
def test_pty_stdin_does_not_consume_printable_input_after_fenrir_shortcut():
    """Printable input right after a command time is still injected."""
    output_manager = Mock()
    pty_driver = _stdin_driver(
        _shortcut_runtime(
            _settings(False), _input_manager([], ""), output_manager
        )
    )
    pty_driver.last_fenrir_stdin_command_time = time.monotonic()

    pty_driver.handle_stdin_input(b"a", Mock())

    output_manager.interrupt_output.assert_not_called()
    pty_driver.inject_text_to_screen.assert_called_once_with(b"a")


@pytest.mark.unit
def test_pty_stdin_does_not_consume_stale_fenrir_shortcut_tail():
    """A sequence arriving after the tail timeout elapsed is injected."""
    pty_driver = _stdin_driver(
        _shortcut_runtime(_settings(False), _input_manager([], ""))
    )
    # Place the last command just beyond the tail timeout window.
    pty_driver.last_fenrir_stdin_command_time = (
        time.monotonic() - PTYConstants.FENRIR_SHORTCUT_STDIN_TAIL_TIMEOUT - 0.1
    )

    pty_driver.handle_stdin_input(b"\x1b[6~", Mock())

    pty_driver.inject_text_to_screen.assert_called_once_with(b"\x1b[6~")


@pytest.mark.unit
def test_pty_stdin_consumes_late_tail_after_recent_review_command():
    """A sequence following a just-run REVIEW_NEXT_LINE command is dropped."""
    output_manager = Mock()
    pty_driver = _stdin_driver(
        _shortcut_runtime(
            _settings(True), _input_manager([], ""), output_manager
        ),
        command_info=_recent_command("REVIEW_NEXT_LINE"),
    )

    pty_driver.handle_stdin_input(b"\x1b[6~", Mock())

    _assert_swallowed(pty_driver, output_manager)


@pytest.mark.unit
@pytest.mark.parametrize("sequence", [b"[H", b"[1~", b"Ow"])
def test_pty_stdin_consumes_split_tail_after_recent_review_command(sequence):
    """Escape-less sequence tails after a recent review command are dropped."""
    output_manager = Mock()
    pty_driver = _stdin_driver(
        _shortcut_runtime(
            _settings(True), _input_manager([], ""), output_manager
        ),
        command_info=_recent_command("REVIEW_NEXT_LINE"),
    )

    pty_driver.handle_stdin_input(sequence, Mock())

    _assert_swallowed(pty_driver, output_manager)


@pytest.mark.unit
def test_pty_stdin_consumes_lone_escape_after_recent_review_command():
    """A lone ESC byte after a just-run REVIEW_CURR_LINE command is dropped."""
    output_manager = Mock()
    pty_driver = _stdin_driver(
        _shortcut_runtime(
            _settings(True), _input_manager([], ""), output_manager
        ),
        command_info=_recent_command("REVIEW_CURR_LINE"),
    )

    pty_driver.handle_stdin_input(b"\x1b", Mock())

    _assert_swallowed(pty_driver, output_manager)


@pytest.mark.unit
def test_pty_stdin_does_not_consume_late_tail_after_non_review_command():
    """After a recent CURSOR_POSITION command the sequence is injected."""
    pty_driver = _stdin_driver(
        _shortcut_runtime(_settings(False), _input_manager([], "")),
        command_info=_recent_command("CURSOR_POSITION"),
    )

    pty_driver.handle_stdin_input(b"\x1b[6~", Mock())

    pty_driver.inject_text_to_screen.assert_called_once_with(b"\x1b[6~")


@pytest.mark.unit
@pytest.mark.parametrize(("sequence", "key_name"), _VMENU_KEY_CASES)
def test_pty_vmenu_stdin_is_consumed_and_synthesizes_key_events(
    sequence,
    key_name,
):
    """With vmenu active, stdin becomes a press/release event pair."""
    event_queue = Mock()
    pty_driver = _stdin_driver(
        {
            "DebugManager": _debug_manager(),
            "SettingsManager": _settings(False),
            "VmenuManager": _active_vmenu(),
        }
    )

    pty_driver.handle_stdin_input(sequence, event_queue)

    pty_driver.inject_text_to_screen.assert_not_called()
    assert event_queue.put.call_count == 2
    queued = event_queue.put.call_args_list
    press = queued[0].args[0]
    release = queued[1].args[0]
    assert press["Type"] == FenrirEventType.keyboard_input
    assert press["data"]["event_name"] == key_name
    assert press["data"]["event_state"] == 1
    assert release["data"]["event_name"] == key_name
    assert release["data"]["event_state"] == 0


@pytest.mark.unit
def test_pty_vmenu_unknown_stdin_is_consumed_without_injection():
    """With vmenu active, ``ESC [ 1 ; 5 A`` yields no injection or event."""
    event_queue = Mock()
    pty_driver = _stdin_driver({"VmenuManager": _active_vmenu()})

    pty_driver.handle_stdin_input(b"\x1b[1;5A", event_queue)

    pty_driver.inject_text_to_screen.assert_not_called()
    event_queue.put.assert_not_called()


@pytest.mark.unit
def test_pty_vmenu_stdin_does_not_duplicate_current_x11_key():
    """No event is queued when the key is already in ``curr_input``."""
    event_queue = Mock()
    pty_driver = _stdin_driver(
        {"VmenuManager": _active_vmenu()},
        curr_input=["KEY_RIGHT"],
    )

    pty_driver.handle_stdin_input(b"\x1b[C", event_queue)

    pty_driver.inject_text_to_screen.assert_not_called()
    event_queue.put.assert_not_called()


@pytest.mark.unit
def test_pty_stdin_input_honors_interrupt_disabled():
    """No interrupt happens when the boolean setting reads False."""
    pty_driver, output_manager = _interrupt_driver(False)

    pty_driver.interrupt_output_on_stdin_input(b"a")

    output_manager.interrupt_output.assert_not_called()


@pytest.mark.unit
def test_pty_stdin_input_leaves_filtered_interrupts_to_key_events():
    """No interrupt happens here when get_setting returns ``KEY_ENTER``."""
    pty_driver, output_manager = _interrupt_driver(True, "KEY_ENTER")

    pty_driver.interrupt_output_on_stdin_input(b"a")

    output_manager.interrupt_output.assert_not_called()


@pytest.mark.unit
def test_pty_terminal_attributes_use_fenrir_attribute_shape():
    """Cell attributes are 10-item sequences; reverse video is reflected."""
    screen_content = _render((10, 2), b"plain\r\n\x1b[7mfocus\x1b[0m")

    attributes = screen_content["attributes"]
    plain_attribute = attributes[0][0]
    focused_attribute = attributes[1][0]
    assert len(plain_attribute) == 10
    assert len(focused_attribute) == 10
    assert plain_attribute[1] == "default"
    assert focused_attribute[1] == "reverse"
    assert focused_attribute[6] is True


@pytest.mark.unit
def test_pty_backspace_with_fenrir_key_synthesizes_shortcut_events():
    """DEL with KEY_FENRIR held queues a KEY_BACKSPACE press and a release."""
    event_queue = Mock()
    pty_driver = _backspace_driver(["KEY_FENRIR"])

    handled = pty_driver.synthesize_backspace_shortcut(b"\x7f", event_queue)

    assert handled is True
    assert event_queue.put.call_count == 2
    queued = event_queue.put.call_args_list
    press = queued[0].args[0]
    release = queued[1].args[0]
    assert press["Type"] == FenrirEventType.keyboard_input
    assert press["data"]["event_name"] == "KEY_BACKSPACE"
    assert press["data"]["event_state"] == 1
    assert release["data"]["event_state"] == 0


@pytest.mark.unit
@pytest.mark.parametrize("sequence", _BACKSPACE_VARIANTS)
def test_pty_xterm_backspace_variants_with_fenrir_key_synthesize_shortcut_events(
    sequence,
):
    """Each listed variant is handled and reported as KEY_BACKSPACE."""
    event_queue = Mock()
    pty_driver = _backspace_driver(["KEY_FENRIR"])

    handled = pty_driver.synthesize_backspace_shortcut(sequence, event_queue)

    assert handled is True
    press = event_queue.put.call_args_list[0].args[0]
    assert press["data"]["event_name"] == "KEY_BACKSPACE"


@pytest.mark.unit
def test_pty_plain_backspace_is_not_synthesized():
    """DEL without KEY_FENRIR held is not handled and queues nothing."""
    event_queue = Mock()
    pty_driver = _backspace_driver([])

    handled = pty_driver.synthesize_backspace_shortcut(b"\x7f", event_queue)

    assert handled is False
    event_queue.put.assert_not_called()


@pytest.mark.unit
def test_pty_plain_delete_sequence_is_not_synthesized():
    """``ESC [ 3 ~`` without KEY_FENRIR is not handled and queues nothing."""
    event_queue = Mock()
    pty_driver = _backspace_driver([])

    handled = pty_driver.synthesize_backspace_shortcut(b"\x1b[3~", event_queue)

    assert handled is False
    event_queue.put.assert_not_called()