diff --git a/src/fenrirscreenreader/commands/onKeyInput/10000-shut_up.py b/src/fenrirscreenreader/commands/onKeyInput/10000-shut_up.py index 3b763c50..7c05fc61 100644 --- a/src/fenrirscreenreader/commands/onKeyInput/10000-shut_up.py +++ b/src/fenrirscreenreader/commands/onKeyInput/10000-shut_up.py @@ -48,7 +48,7 @@ class command: for curr_key in self.env["input"]["curr_input"]: if curr_key not in filter_list: return - self.env["runtime"]["OutputManager"].interrupt_output() + self.env["runtime"]["OutputManager"].interrupt_output_async() def set_callback(self, callback): pass diff --git a/src/fenrirscreenreader/core/inputManager.py b/src/fenrirscreenreader/core/inputManager.py index 48cf35bf..1324668e 100644 --- a/src/fenrirscreenreader/core/inputManager.py +++ b/src/fenrirscreenreader/core/inputManager.py @@ -163,6 +163,14 @@ class InputManager: def get_last_event(self): return self.lastEvent + def record_unmanaged_keypress(self, event_name): + self.lastEvent = { + "event_name": event_name, + "event_state": 1, + } + self.set_last_deepest_input([event_name]) + self.lastInputTime = time.time() + def handle_input_event(self, event_data): if not event_data: return diff --git a/src/fenrirscreenreader/core/outputManager.py b/src/fenrirscreenreader/core/outputManager.py index 3df2bc9a..1b4a66c8 100644 --- a/src/fenrirscreenreader/core/outputManager.py +++ b/src/fenrirscreenreader/core/outputManager.py @@ -6,6 +6,7 @@ import re import string +import threading import time from fenrirscreenreader.core import debug @@ -16,6 +17,11 @@ from fenrirscreenreader.utils import line_utils class OutputManager: def __init__(self): self.last_echo = "" + self.interrupt_lock = threading.Lock() + self.interrupt_running = False + self.interrupt_thread = None + self.interrupt_done = None + self.interrupt_wait_timeout = 0.1 def initialize(self, environment): self.env = environment @@ -280,7 +286,40 @@ class OutputManager: str(e), debug.DebugLevel.ERROR ) - def interrupt_output(self): + def interrupt_output(self, wait=True): + interrupt_done, started = self.start_interrupt_output() + if wait and started and interrupt_done: + interrupt_done.wait(timeout=self.interrupt_wait_timeout) + + def interrupt_output_async(self): + self.start_interrupt_output() + + def start_interrupt_output(self): + with self.interrupt_lock: + if self.interrupt_running: + return self.interrupt_done, False + self.interrupt_running = True + self.interrupt_done = threading.Event() + self.interrupt_thread = threading.Thread( + target=self.run_interrupt_output, + args=(self.interrupt_done,), + daemon=True, + ) + interrupt_thread = self.interrupt_thread + interrupt_done = self.interrupt_done + interrupt_thread.start() + return interrupt_done, True + + def run_interrupt_output(self, interrupt_done): + try: + self.cancel_speech() + finally: + interrupt_done.set() + with self.interrupt_lock: + if self.interrupt_done is interrupt_done: + self.interrupt_running = False + + def cancel_speech(self): try: self.env["runtime"]["SpeechDriver"].cancel() self.env["runtime"]["DebugManager"].write_debug_out( diff --git a/src/fenrirscreenreader/screenDriver/ptyDriver.py b/src/fenrirscreenreader/screenDriver/ptyDriver.py index 54c914d1..af1fe2e1 100644 --- a/src/fenrirscreenreader/screenDriver/ptyDriver.py +++ b/src/fenrirscreenreader/screenDriver/ptyDriver.py @@ -333,9 +333,23 @@ class driver(screenDriver): def handle_stdin_input(self, msg_bytes, event_queue): if self.synthesize_backspace_shortcut(msg_bytes, event_queue): return + self.record_stdin_keypress(msg_bytes) self.interrupt_output_on_stdin_input(msg_bytes) self.inject_text_to_screen(msg_bytes) + def record_stdin_keypress(self, msg_bytes): + if msg_bytes != b"\t": + return + try: + self.env["runtime"]["InputManager"].record_unmanaged_keypress( + "KEY_TAB" + ) + except Exception as e: + self.env["runtime"]["DebugManager"].write_debug_out( + "ptyDriver record_stdin_keypress: " + str(e), + debug.DebugLevel.ERROR, + ) + def synthesize_backspace_shortcut(self, msg_bytes, event_queue): if msg_bytes not in [b"\x7f", b"\x08"]: return False diff --git a/tests/unit/test_output_manager.py b/tests/unit/test_output_manager.py index a540bd08..6d6b21d5 100644 --- a/tests/unit/test_output_manager.py +++ b/tests/unit/test_output_manager.py @@ -1,3 +1,7 @@ +import importlib.util +import threading +import time +from pathlib import Path from unittest.mock import Mock import pytest @@ -10,6 +14,7 @@ def build_output_manager(): settings_manager.get_setting_as_bool.return_value = True settings_manager.get_setting_as_float.return_value = 1.0 sound_driver = Mock() + speech_driver = Mock() output_manager = OutputManager() output_manager.env = { "soundIcons": { @@ -19,16 +24,33 @@ def build_output_manager(): "runtime": { "SettingsManager": settings_manager, "SoundDriver": sound_driver, - "SpeechDriver": Mock(), + "SpeechDriver": speech_driver, "DebugManager": Mock(write_debug_out=Mock()), }, } - return output_manager, sound_driver + return output_manager, sound_driver, speech_driver + + +def load_key_interrupt_module(): + module_path = ( + Path(__file__).resolve().parents[2] + / "src" + / "fenrirscreenreader" + / "commands" + / "onKeyInput" + / "10000-shut_up.py" + ) + spec = importlib.util.spec_from_file_location( + "fenrir_key_interrupt", module_path + ) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module @pytest.mark.unit def test_present_text_allows_sound_only_feedback(): - output_manager, sound_driver = build_output_manager() + output_manager, sound_driver, _speech_driver = build_output_manager() output_manager.present_text("", sound_icon="Accept", interrupt=False) @@ -39,10 +61,90 @@ def test_present_text_allows_sound_only_feedback(): @pytest.mark.unit def test_play_sound_supports_error_alias(): - output_manager, sound_driver = build_output_manager() + output_manager, sound_driver, _speech_driver = build_output_manager() assert output_manager.play_sound("Error") is True sound_driver.play_sound_file.assert_called_once_with( "/tmp/ErrorScreen.wav", True ) + + +@pytest.mark.unit +def test_interrupt_output_async_does_not_block_on_slow_cancel(): + output_manager, _sound_driver, speech_driver = build_output_manager() + interrupt_started = threading.Event() + release_interrupt = threading.Event() + + def slow_cancel(): + interrupt_started.set() + release_interrupt.wait(timeout=1.0) + + speech_driver.cancel.side_effect = slow_cancel + + start_time = time.monotonic() + output_manager.interrupt_output_async() + elapsed = time.monotonic() - start_time + + try: + assert interrupt_started.wait(timeout=0.2) + assert elapsed < 0.2 + output_manager.interrupt_output_async() + assert speech_driver.cancel.call_count == 1 + finally: + release_interrupt.set() + output_manager.interrupt_thread.join(timeout=1.0) + + +@pytest.mark.unit +def test_interrupt_output_waits_only_briefly_for_slow_cancel(): + output_manager, _sound_driver, speech_driver = build_output_manager() + interrupt_started = threading.Event() + release_interrupt = threading.Event() + + def slow_cancel(): + interrupt_started.set() + release_interrupt.wait(timeout=1.0) + + speech_driver.cancel.side_effect = slow_cancel + + start_time = time.monotonic() + output_manager.interrupt_output() + elapsed = time.monotonic() - start_time + + try: + assert interrupt_started.wait(timeout=0.2) + assert elapsed < 0.2 + output_manager.interrupt_output() + assert speech_driver.cancel.call_count == 1 + finally: + release_interrupt.set() + output_manager.interrupt_thread.join(timeout=1.0) + + +@pytest.mark.unit +def test_key_interrupt_command_uses_nonblocking_interrupt(): + module = load_key_interrupt_module() + settings_manager = Mock() + settings_manager.get_setting_as_bool.return_value = True + settings_manager.get_setting.return_value = "" + output_manager = Mock() + env = { + "input": { + "curr_input": ["KEY_A"], + "prev_input": [], + }, + "runtime": { + "InputManager": Mock(no_key_pressed=Mock(return_value=False)), + "OutputManager": output_manager, + "ScreenManager": Mock(is_screen_change=Mock(return_value=False)), + "SettingsManager": settings_manager, + }, + } + command = module.command() + command.initialize(env) + + command.run() + + output_manager.interrupt_output_async.assert_called_once_with() + output_manager.interrupt_output.assert_not_called() diff --git a/tests/unit/test_pty_terminal_sequences.py b/tests/unit/test_pty_terminal_sequences.py index 6e1cca3c..5ddcd2b1 100644 --- a/tests/unit/test_pty_terminal_sequences.py +++ b/tests/unit/test_pty_terminal_sequences.py @@ -116,6 +116,50 @@ def test_pty_stdin_input_interrupt_does_not_block_input_injection(): pty_driver.stdin_interrupt_thread.join(timeout=1.0) +@pytest.mark.unit +def test_pty_raw_tab_records_recent_tab_keypress(): + pty_driver = PtyDriver() + settings_manager = Mock() + settings_manager.get_setting_as_bool.return_value = False + input_manager = Mock() + pty_driver.env = { + "input": {"curr_input": []}, + "runtime": { + "DebugManager": Mock(write_debug_out=Mock()), + "InputManager": input_manager, + "SettingsManager": settings_manager, + }, + } + pty_driver.inject_text_to_screen = Mock() + + pty_driver.handle_stdin_input(b"\t", Mock()) + + input_manager.record_unmanaged_keypress.assert_called_once_with("KEY_TAB") + pty_driver.inject_text_to_screen.assert_called_once_with(b"\t") + + +@pytest.mark.unit +def test_pty_plain_stdin_does_not_record_tab_keypress(): + pty_driver = PtyDriver() + settings_manager = Mock() + settings_manager.get_setting_as_bool.return_value = False + input_manager = Mock() + pty_driver.env = { + "input": {"curr_input": []}, + "runtime": { + "DebugManager": Mock(write_debug_out=Mock()), + "InputManager": input_manager, + "SettingsManager": settings_manager, + }, + } + pty_driver.inject_text_to_screen = Mock() + + pty_driver.handle_stdin_input(b"a", Mock()) + + input_manager.record_unmanaged_keypress.assert_not_called() + pty_driver.inject_text_to_screen.assert_called_once_with(b"a") + + @pytest.mark.unit def test_pty_stdin_input_honors_interrupt_disabled(): pty_driver = PtyDriver() diff --git a/tests/unit/test_tab_completion_manager.py b/tests/unit/test_tab_completion_manager.py index 1eb9241f..10072f6a 100644 --- a/tests/unit/test_tab_completion_manager.py +++ b/tests/unit/test_tab_completion_manager.py @@ -212,6 +212,27 @@ def test_recent_tab_screen_update_works_without_key_input_snapshot(): assert manager.process_update() == "cuments/" +@pytest.mark.unit +def test_recent_raw_pty_tab_speaks_short_completion_without_capture(): + manager, env, input_manager = _build_env( + "cd Documents/".ljust(20), {"x": 13, "y": 0} + ) + input_manager.get_last_deepest_input.return_value = ["KEY_TAB"] + input_manager.get_last_event.return_value = None + env["screen"]["old_content_text"] = "cd Docume".ljust(20) + env["screen"]["old_cursor"] = {"x": 9, "y": 0} + env["commandBuffer"]["tabCompletion"]["pending"] = None + _set_screen_update( + env, + "cd Documents/".ljust(20), + {"x": 13, "y": 0}, + delta="nts/", + typing=True, + ) + + assert manager.process_update() == "nts/" + + @pytest.mark.unit def test_large_insertion_echo_speaks_pasted_cursor_text(): large_insertion_module = _load_large_insertion_module()