Tab completion fixes.
This commit is contained in:
@@ -48,7 +48,7 @@ class command:
|
||||
for curr_key in self.env["input"]["curr_input"]:
|
||||
if curr_key not in filter_list:
|
||||
return
|
||||
self.env["runtime"]["OutputManager"].interrupt_output()
|
||||
self.env["runtime"]["OutputManager"].interrupt_output_async()
|
||||
|
||||
def set_callback(self, callback):
|
||||
pass
|
||||
|
||||
@@ -163,6 +163,14 @@ class InputManager:
|
||||
def get_last_event(self):
|
||||
return self.lastEvent
|
||||
|
||||
def record_unmanaged_keypress(self, event_name):
|
||||
self.lastEvent = {
|
||||
"event_name": event_name,
|
||||
"event_state": 1,
|
||||
}
|
||||
self.set_last_deepest_input([event_name])
|
||||
self.lastInputTime = time.time()
|
||||
|
||||
def handle_input_event(self, event_data):
|
||||
if not event_data:
|
||||
return
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
|
||||
import re
|
||||
import string
|
||||
import threading
|
||||
import time
|
||||
|
||||
from fenrirscreenreader.core import debug
|
||||
@@ -16,6 +17,11 @@ from fenrirscreenreader.utils import line_utils
|
||||
class OutputManager:
|
||||
def __init__(self):
|
||||
self.last_echo = ""
|
||||
self.interrupt_lock = threading.Lock()
|
||||
self.interrupt_running = False
|
||||
self.interrupt_thread = None
|
||||
self.interrupt_done = None
|
||||
self.interrupt_wait_timeout = 0.1
|
||||
|
||||
def initialize(self, environment):
|
||||
self.env = environment
|
||||
@@ -280,7 +286,40 @@ class OutputManager:
|
||||
str(e), debug.DebugLevel.ERROR
|
||||
)
|
||||
|
||||
def interrupt_output(self):
|
||||
def interrupt_output(self, wait=True):
|
||||
interrupt_done, started = self.start_interrupt_output()
|
||||
if wait and started and interrupt_done:
|
||||
interrupt_done.wait(timeout=self.interrupt_wait_timeout)
|
||||
|
||||
def interrupt_output_async(self):
|
||||
self.start_interrupt_output()
|
||||
|
||||
def start_interrupt_output(self):
|
||||
with self.interrupt_lock:
|
||||
if self.interrupt_running:
|
||||
return self.interrupt_done, False
|
||||
self.interrupt_running = True
|
||||
self.interrupt_done = threading.Event()
|
||||
self.interrupt_thread = threading.Thread(
|
||||
target=self.run_interrupt_output,
|
||||
args=(self.interrupt_done,),
|
||||
daemon=True,
|
||||
)
|
||||
interrupt_thread = self.interrupt_thread
|
||||
interrupt_done = self.interrupt_done
|
||||
interrupt_thread.start()
|
||||
return interrupt_done, True
|
||||
|
||||
def run_interrupt_output(self, interrupt_done):
|
||||
try:
|
||||
self.cancel_speech()
|
||||
finally:
|
||||
interrupt_done.set()
|
||||
with self.interrupt_lock:
|
||||
if self.interrupt_done is interrupt_done:
|
||||
self.interrupt_running = False
|
||||
|
||||
def cancel_speech(self):
|
||||
try:
|
||||
self.env["runtime"]["SpeechDriver"].cancel()
|
||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||
|
||||
@@ -333,9 +333,23 @@ class driver(screenDriver):
|
||||
def handle_stdin_input(self, msg_bytes, event_queue):
|
||||
if self.synthesize_backspace_shortcut(msg_bytes, event_queue):
|
||||
return
|
||||
self.record_stdin_keypress(msg_bytes)
|
||||
self.interrupt_output_on_stdin_input(msg_bytes)
|
||||
self.inject_text_to_screen(msg_bytes)
|
||||
|
||||
def record_stdin_keypress(self, msg_bytes):
|
||||
if msg_bytes != b"\t":
|
||||
return
|
||||
try:
|
||||
self.env["runtime"]["InputManager"].record_unmanaged_keypress(
|
||||
"KEY_TAB"
|
||||
)
|
||||
except Exception as e:
|
||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||
"ptyDriver record_stdin_keypress: " + str(e),
|
||||
debug.DebugLevel.ERROR,
|
||||
)
|
||||
|
||||
def synthesize_backspace_shortcut(self, msg_bytes, event_queue):
|
||||
if msg_bytes not in [b"\x7f", b"\x08"]:
|
||||
return False
|
||||
|
||||
@@ -1,3 +1,7 @@
|
||||
import importlib.util
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from unittest.mock import Mock
|
||||
|
||||
import pytest
|
||||
@@ -10,6 +14,7 @@ def build_output_manager():
|
||||
settings_manager.get_setting_as_bool.return_value = True
|
||||
settings_manager.get_setting_as_float.return_value = 1.0
|
||||
sound_driver = Mock()
|
||||
speech_driver = Mock()
|
||||
output_manager = OutputManager()
|
||||
output_manager.env = {
|
||||
"soundIcons": {
|
||||
@@ -19,16 +24,33 @@ def build_output_manager():
|
||||
"runtime": {
|
||||
"SettingsManager": settings_manager,
|
||||
"SoundDriver": sound_driver,
|
||||
"SpeechDriver": Mock(),
|
||||
"SpeechDriver": speech_driver,
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
},
|
||||
}
|
||||
return output_manager, sound_driver
|
||||
return output_manager, sound_driver, speech_driver
|
||||
|
||||
|
||||
def load_key_interrupt_module():
|
||||
module_path = (
|
||||
Path(__file__).resolve().parents[2]
|
||||
/ "src"
|
||||
/ "fenrirscreenreader"
|
||||
/ "commands"
|
||||
/ "onKeyInput"
|
||||
/ "10000-shut_up.py"
|
||||
)
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
"fenrir_key_interrupt", module_path
|
||||
)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
return module
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_present_text_allows_sound_only_feedback():
|
||||
output_manager, sound_driver = build_output_manager()
|
||||
output_manager, sound_driver, _speech_driver = build_output_manager()
|
||||
|
||||
output_manager.present_text("", sound_icon="Accept", interrupt=False)
|
||||
|
||||
@@ -39,10 +61,90 @@ def test_present_text_allows_sound_only_feedback():
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_play_sound_supports_error_alias():
|
||||
output_manager, sound_driver = build_output_manager()
|
||||
output_manager, sound_driver, _speech_driver = build_output_manager()
|
||||
|
||||
assert output_manager.play_sound("Error") is True
|
||||
|
||||
sound_driver.play_sound_file.assert_called_once_with(
|
||||
"/tmp/ErrorScreen.wav", True
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_interrupt_output_async_does_not_block_on_slow_cancel():
|
||||
output_manager, _sound_driver, speech_driver = build_output_manager()
|
||||
interrupt_started = threading.Event()
|
||||
release_interrupt = threading.Event()
|
||||
|
||||
def slow_cancel():
|
||||
interrupt_started.set()
|
||||
release_interrupt.wait(timeout=1.0)
|
||||
|
||||
speech_driver.cancel.side_effect = slow_cancel
|
||||
|
||||
start_time = time.monotonic()
|
||||
output_manager.interrupt_output_async()
|
||||
elapsed = time.monotonic() - start_time
|
||||
|
||||
try:
|
||||
assert interrupt_started.wait(timeout=0.2)
|
||||
assert elapsed < 0.2
|
||||
output_manager.interrupt_output_async()
|
||||
assert speech_driver.cancel.call_count == 1
|
||||
finally:
|
||||
release_interrupt.set()
|
||||
output_manager.interrupt_thread.join(timeout=1.0)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_interrupt_output_waits_only_briefly_for_slow_cancel():
|
||||
output_manager, _sound_driver, speech_driver = build_output_manager()
|
||||
interrupt_started = threading.Event()
|
||||
release_interrupt = threading.Event()
|
||||
|
||||
def slow_cancel():
|
||||
interrupt_started.set()
|
||||
release_interrupt.wait(timeout=1.0)
|
||||
|
||||
speech_driver.cancel.side_effect = slow_cancel
|
||||
|
||||
start_time = time.monotonic()
|
||||
output_manager.interrupt_output()
|
||||
elapsed = time.monotonic() - start_time
|
||||
|
||||
try:
|
||||
assert interrupt_started.wait(timeout=0.2)
|
||||
assert elapsed < 0.2
|
||||
output_manager.interrupt_output()
|
||||
assert speech_driver.cancel.call_count == 1
|
||||
finally:
|
||||
release_interrupt.set()
|
||||
output_manager.interrupt_thread.join(timeout=1.0)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_key_interrupt_command_uses_nonblocking_interrupt():
|
||||
module = load_key_interrupt_module()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = True
|
||||
settings_manager.get_setting.return_value = ""
|
||||
output_manager = Mock()
|
||||
env = {
|
||||
"input": {
|
||||
"curr_input": ["KEY_A"],
|
||||
"prev_input": [],
|
||||
},
|
||||
"runtime": {
|
||||
"InputManager": Mock(no_key_pressed=Mock(return_value=False)),
|
||||
"OutputManager": output_manager,
|
||||
"ScreenManager": Mock(is_screen_change=Mock(return_value=False)),
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
command = module.command()
|
||||
command.initialize(env)
|
||||
|
||||
command.run()
|
||||
|
||||
output_manager.interrupt_output_async.assert_called_once_with()
|
||||
output_manager.interrupt_output.assert_not_called()
|
||||
|
||||
@@ -116,6 +116,50 @@ def test_pty_stdin_input_interrupt_does_not_block_input_injection():
|
||||
pty_driver.stdin_interrupt_thread.join(timeout=1.0)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_raw_tab_records_recent_tab_keypress():
|
||||
pty_driver = PtyDriver()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = False
|
||||
input_manager = Mock()
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": []},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"InputManager": input_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"\t", Mock())
|
||||
|
||||
input_manager.record_unmanaged_keypress.assert_called_once_with("KEY_TAB")
|
||||
pty_driver.inject_text_to_screen.assert_called_once_with(b"\t")
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_plain_stdin_does_not_record_tab_keypress():
|
||||
pty_driver = PtyDriver()
|
||||
settings_manager = Mock()
|
||||
settings_manager.get_setting_as_bool.return_value = False
|
||||
input_manager = Mock()
|
||||
pty_driver.env = {
|
||||
"input": {"curr_input": []},
|
||||
"runtime": {
|
||||
"DebugManager": Mock(write_debug_out=Mock()),
|
||||
"InputManager": input_manager,
|
||||
"SettingsManager": settings_manager,
|
||||
},
|
||||
}
|
||||
pty_driver.inject_text_to_screen = Mock()
|
||||
|
||||
pty_driver.handle_stdin_input(b"a", Mock())
|
||||
|
||||
input_manager.record_unmanaged_keypress.assert_not_called()
|
||||
pty_driver.inject_text_to_screen.assert_called_once_with(b"a")
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pty_stdin_input_honors_interrupt_disabled():
|
||||
pty_driver = PtyDriver()
|
||||
|
||||
@@ -212,6 +212,27 @@ def test_recent_tab_screen_update_works_without_key_input_snapshot():
|
||||
assert manager.process_update() == "cuments/"
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_recent_raw_pty_tab_speaks_short_completion_without_capture():
|
||||
manager, env, input_manager = _build_env(
|
||||
"cd Documents/".ljust(20), {"x": 13, "y": 0}
|
||||
)
|
||||
input_manager.get_last_deepest_input.return_value = ["KEY_TAB"]
|
||||
input_manager.get_last_event.return_value = None
|
||||
env["screen"]["old_content_text"] = "cd Docume".ljust(20)
|
||||
env["screen"]["old_cursor"] = {"x": 9, "y": 0}
|
||||
env["commandBuffer"]["tabCompletion"]["pending"] = None
|
||||
_set_screen_update(
|
||||
env,
|
||||
"cd Documents/".ljust(20),
|
||||
{"x": 13, "y": 0},
|
||||
delta="nts/",
|
||||
typing=True,
|
||||
)
|
||||
|
||||
assert manager.process_update() == "nts/"
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_large_insertion_echo_speaks_pasted_cursor_text():
|
||||
large_insertion_module = _load_large_insertion_module()
|
||||
|
||||
Reference in New Issue
Block a user