More fixes to pty reading.
This commit is contained in:
@@ -36,6 +36,7 @@ class command:
|
|||||||
# screen changes
|
# screen changes
|
||||||
elif (
|
elif (
|
||||||
self.env["screen"]["new_delta"]
|
self.env["screen"]["new_delta"]
|
||||||
|
and not self.env["screen"].get("new_delta_is_typing", False)
|
||||||
and self.is_real_progress_update()
|
and self.is_real_progress_update()
|
||||||
):
|
):
|
||||||
self.detect_progress(self.env["screen"]["new_delta"])
|
self.detect_progress(self.env["screen"]["new_delta"])
|
||||||
|
|||||||
@@ -185,6 +185,8 @@ class command:
|
|||||||
# is there something to read?
|
# is there something to read?
|
||||||
if not self.env["runtime"]["ScreenManager"].is_delta(ignoreSpace=True):
|
if not self.env["runtime"]["ScreenManager"].is_delta(ignoreSpace=True):
|
||||||
return
|
return
|
||||||
|
if self.env["screen"].get("new_delta_is_typing", False):
|
||||||
|
return
|
||||||
|
|
||||||
delta_text = self.env["screen"]["new_delta"]
|
delta_text = self.env["screen"]["new_delta"]
|
||||||
|
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ screen_data = {
|
|||||||
"oldApplication": "",
|
"oldApplication": "",
|
||||||
"oldTTY": None,
|
"oldTTY": None,
|
||||||
"new_delta": "",
|
"new_delta": "",
|
||||||
|
"new_delta_is_typing": False,
|
||||||
"newNegativeDelta": "",
|
"newNegativeDelta": "",
|
||||||
"newAttribDelta": "",
|
"newAttribDelta": "",
|
||||||
"newCursorReview": None,
|
"newCursorReview": None,
|
||||||
|
|||||||
@@ -194,6 +194,7 @@ class ScreenManager:
|
|||||||
# initialize current deltas
|
# initialize current deltas
|
||||||
self.env["screen"]["newNegativeDelta"] = ""
|
self.env["screen"]["newNegativeDelta"] = ""
|
||||||
self.env["screen"]["new_delta"] = ""
|
self.env["screen"]["new_delta"] = ""
|
||||||
|
self.env["screen"]["new_delta_is_typing"] = False
|
||||||
self.env["runtime"]["AttributeManager"].reset_attribute_delta()
|
self.env["runtime"]["AttributeManager"].reset_attribute_delta()
|
||||||
|
|
||||||
# Diff generation - critical for screen reader functionality
|
# Diff generation - critical for screen reader functionality
|
||||||
@@ -397,6 +398,7 @@ class ScreenManager:
|
|||||||
self.env["screen"]["new_delta"] = "".join(
|
self.env["screen"]["new_delta"] = "".join(
|
||||||
x[2:] for x in diff_list if x[0] == "+"
|
x[2:] for x in diff_list if x[0] == "+"
|
||||||
)
|
)
|
||||||
|
self.env["screen"]["new_delta_is_typing"] = typing
|
||||||
|
|
||||||
# Negative delta (removed content) - used for backspace/delete
|
# Negative delta (removed content) - used for backspace/delete
|
||||||
# detection
|
# detection
|
||||||
|
|||||||
@@ -207,23 +207,27 @@ class driver(screenDriver):
|
|||||||
try:
|
try:
|
||||||
settings_manager = self.env["runtime"]["SettingsManager"]
|
settings_manager = self.env["runtime"]["SettingsManager"]
|
||||||
|
|
||||||
# Load timeout settings with defaults
|
|
||||||
self.pty_config = {
|
self.pty_config = {
|
||||||
'input_timeout': float(settings_manager.get_setting(
|
'input_timeout': self._get_optional_float_setting(
|
||||||
'screen', 'ptyInputTimeout', PTYConstants.INPUT_READ_TIMEOUT
|
settings_manager, 'screen', 'ptyInputTimeout',
|
||||||
)),
|
PTYConstants.INPUT_READ_TIMEOUT
|
||||||
'output_timeout': float(settings_manager.get_setting(
|
),
|
||||||
'screen', 'ptyOutputTimeout', PTYConstants.OUTPUT_READ_TIMEOUT
|
'output_timeout': self._get_optional_float_setting(
|
||||||
)),
|
settings_manager, 'screen', 'ptyOutputTimeout',
|
||||||
'select_timeout': float(settings_manager.get_setting(
|
PTYConstants.OUTPUT_READ_TIMEOUT
|
||||||
'screen', 'ptySelectTimeout', PTYConstants.SELECT_TIMEOUT
|
),
|
||||||
)),
|
'select_timeout': self._get_optional_float_setting(
|
||||||
'process_termination_timeout': float(settings_manager.get_setting(
|
settings_manager, 'screen', 'ptySelectTimeout',
|
||||||
'screen', 'ptyProcessTimeout', PTYConstants.PROCESS_TERMINATION_TIMEOUT
|
PTYConstants.SELECT_TIMEOUT
|
||||||
)),
|
),
|
||||||
'poll_interval': float(settings_manager.get_setting(
|
'process_termination_timeout': self._get_optional_float_setting(
|
||||||
'screen', 'ptyPollInterval', PTYConstants.MIN_POLL_INTERVAL
|
settings_manager, 'screen', 'ptyProcessTimeout',
|
||||||
))
|
PTYConstants.PROCESS_TERMINATION_TIMEOUT
|
||||||
|
),
|
||||||
|
'poll_interval': self._get_optional_float_setting(
|
||||||
|
settings_manager, 'screen', 'ptyPollInterval',
|
||||||
|
PTYConstants.MIN_POLL_INTERVAL
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
self.env["runtime"]["DebugManager"].write_debug_out(
|
self.env["runtime"]["DebugManager"].write_debug_out(
|
||||||
@@ -245,6 +249,17 @@ class driver(screenDriver):
|
|||||||
'poll_interval': PTYConstants.MIN_POLL_INTERVAL
|
'poll_interval': PTYConstants.MIN_POLL_INTERVAL
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def _get_optional_float_setting(
|
||||||
|
self, settings_manager, section, setting, default
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
value = settings_manager.get_setting(section, setting)
|
||||||
|
if value == "":
|
||||||
|
return default
|
||||||
|
return float(value)
|
||||||
|
except Exception:
|
||||||
|
return default
|
||||||
|
|
||||||
def initialize(self, environment):
|
def initialize(self, environment):
|
||||||
self.env = environment
|
self.env = environment
|
||||||
self.command = self.env["runtime"]["SettingsManager"].get_setting(
|
self.command = self.env["runtime"]["SettingsManager"].get_setting(
|
||||||
|
|||||||
@@ -61,6 +61,7 @@ def incoming_command():
|
|||||||
},
|
},
|
||||||
"screen": {
|
"screen": {
|
||||||
"new_delta": "",
|
"new_delta": "",
|
||||||
|
"new_delta_is_typing": False,
|
||||||
"old_content_text": "",
|
"old_content_text": "",
|
||||||
"new_content_text": "",
|
"new_content_text": "",
|
||||||
"old_cursor": {"x": 0, "y": 0},
|
"old_cursor": {"x": 0, "y": 0},
|
||||||
@@ -138,3 +139,12 @@ class TestIncomingCommand:
|
|||||||
output_manager.present_text.assert_called_once_with(
|
output_manager.present_text.assert_called_once_with(
|
||||||
"Status new\nUsers new", interrupt=False, flush=False
|
"Status new\nUsers new", interrupt=False, flush=False
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_skips_typing_delta(self, incoming_command):
|
||||||
|
command, env, output_manager = incoming_command
|
||||||
|
env["screen"]["new_delta"] = "x"
|
||||||
|
env["screen"]["new_delta_is_typing"] = True
|
||||||
|
|
||||||
|
command.run()
|
||||||
|
|
||||||
|
output_manager.present_text.assert_not_called()
|
||||||
|
|||||||
@@ -0,0 +1,41 @@
|
|||||||
|
import importlib.util
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import Mock
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
def _load_progress_module():
|
||||||
|
module_path = (
|
||||||
|
Path(__file__).resolve().parents[2]
|
||||||
|
/ "src"
|
||||||
|
/ "fenrirscreenreader"
|
||||||
|
/ "commands"
|
||||||
|
/ "onScreenUpdate"
|
||||||
|
/ "65000-progress_detector.py"
|
||||||
|
)
|
||||||
|
spec = importlib.util.spec_from_file_location(
|
||||||
|
"fenrir_progress_detector", module_path
|
||||||
|
)
|
||||||
|
module = importlib.util.module_from_spec(spec)
|
||||||
|
spec.loader.exec_module(module)
|
||||||
|
return module
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.unit
|
||||||
|
def test_progress_detector_skips_typing_delta():
|
||||||
|
progress_module = _load_progress_module()
|
||||||
|
command = progress_module.command()
|
||||||
|
command.env = {
|
||||||
|
"commandBuffer": {"progress_monitoring": True},
|
||||||
|
"runtime": {"DebugManager": Mock(write_debug_out=Mock())},
|
||||||
|
"screen": {"new_delta": "x", "new_delta_is_typing": True},
|
||||||
|
}
|
||||||
|
command.is_current_line_prompt = Mock(return_value=False)
|
||||||
|
command.is_real_progress_update = Mock(return_value=True)
|
||||||
|
command.detect_progress = Mock()
|
||||||
|
|
||||||
|
command.run()
|
||||||
|
|
||||||
|
command.is_real_progress_update.assert_not_called()
|
||||||
|
command.detect_progress.assert_not_called()
|
||||||
@@ -1,6 +1,8 @@
|
|||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
from fenrirscreenreader.screenDriver.ptyDriver import PTYConstants
|
||||||
from fenrirscreenreader.screenDriver.ptyDriver import Terminal
|
from fenrirscreenreader.screenDriver.ptyDriver import Terminal
|
||||||
|
from fenrirscreenreader.screenDriver.ptyDriver import driver as PtyDriver
|
||||||
|
|
||||||
|
|
||||||
class DummyProcessInput:
|
class DummyProcessInput:
|
||||||
@@ -20,3 +22,23 @@ def test_csi_sequences_with_intermediate_characters_do_not_render_final_byte():
|
|||||||
|
|
||||||
assert screen["text"].splitlines()[0] == "X "
|
assert screen["text"].splitlines()[0] == "X "
|
||||||
assert "p" not in screen["text"]
|
assert "p" not in screen["text"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.unit
|
||||||
|
def test_optional_float_setting_uses_default_when_missing():
|
||||||
|
settings_manager = type(
|
||||||
|
"SettingsManager",
|
||||||
|
(),
|
||||||
|
{"get_setting": lambda self, section, setting: ""},
|
||||||
|
)()
|
||||||
|
pty_driver = PtyDriver()
|
||||||
|
|
||||||
|
assert (
|
||||||
|
pty_driver._get_optional_float_setting(
|
||||||
|
settings_manager,
|
||||||
|
"screen",
|
||||||
|
"ptyOutputTimeout",
|
||||||
|
PTYConstants.OUTPUT_READ_TIMEOUT,
|
||||||
|
)
|
||||||
|
== PTYConstants.OUTPUT_READ_TIMEOUT
|
||||||
|
)
|
||||||
|
|||||||
@@ -0,0 +1,63 @@
|
|||||||
|
from unittest.mock import Mock
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from fenrirscreenreader.core.screenManager import ScreenManager
|
||||||
|
|
||||||
|
|
||||||
|
def _build_screen_manager(old_text, old_cursor):
|
||||||
|
manager = ScreenManager()
|
||||||
|
env = {
|
||||||
|
"runtime": {
|
||||||
|
"AttributeManager": Mock(
|
||||||
|
reset_attributes=Mock(),
|
||||||
|
reset_attribute_cursor=Mock(),
|
||||||
|
set_attributes=Mock(),
|
||||||
|
reset_attribute_delta=Mock(),
|
||||||
|
is_attribute_change=Mock(return_value=False),
|
||||||
|
),
|
||||||
|
"CursorManager": Mock(is_application_window_set=Mock(return_value=False)),
|
||||||
|
"DebugManager": Mock(write_debug_out=Mock()),
|
||||||
|
"ScreenManager": manager,
|
||||||
|
"SettingsManager": Mock(get_setting_as_bool=Mock(return_value=False)),
|
||||||
|
},
|
||||||
|
"screen": {
|
||||||
|
"newContentBytes": b"",
|
||||||
|
"oldContentBytes": b"",
|
||||||
|
"new_content_text": old_text,
|
||||||
|
"old_content_text": "",
|
||||||
|
"new_cursor": old_cursor.copy(),
|
||||||
|
"old_cursor": {"x": 0, "y": 0},
|
||||||
|
"new_delta": "",
|
||||||
|
"new_delta_is_typing": False,
|
||||||
|
"oldDelta": "",
|
||||||
|
"newNegativeDelta": "",
|
||||||
|
"oldNegativeDelta": "",
|
||||||
|
"oldTTY": "pty",
|
||||||
|
"newTTY": "pty",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
manager.initialize = Mock()
|
||||||
|
manager.env = env
|
||||||
|
return manager, env
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.unit
|
||||||
|
def test_prompt_repaint_during_typing_keeps_only_typed_delta():
|
||||||
|
manager, env = _build_screen_manager("[0] ", {"x": 4, "y": 0})
|
||||||
|
|
||||||
|
manager.update(
|
||||||
|
{
|
||||||
|
"bytes": b"[1] h",
|
||||||
|
"lines": 1,
|
||||||
|
"columns": 20,
|
||||||
|
"textCursor": {"x": 5, "y": 0},
|
||||||
|
"screen": "pty",
|
||||||
|
"text": "[1] h",
|
||||||
|
"attributes": [],
|
||||||
|
},
|
||||||
|
"onScreenUpdate",
|
||||||
|
)
|
||||||
|
|
||||||
|
assert env["screen"]["new_delta"] == "h"
|
||||||
|
assert env["screen"]["new_delta_is_typing"] is True
|
||||||
Reference in New Issue
Block a user