More trouble with pty vs console behavior.

This commit is contained in:
Storm Dragon
2026-05-08 21:26:58 -04:00
parent 114a7b0da7
commit 6fb8298b9f
2 changed files with 199 additions and 34 deletions
+95 -27
View File
@@ -249,11 +249,7 @@ class ScreenManager:
cursor_line_end = ( cursor_line_end = (
cursor_line_start + self.env["screen"]["columns"] cursor_line_start + self.env["screen"]["columns"]
) )
cursor_moved_horizontally = (
# TYPING DETECTION ALGORITHM
# Determines if this screen change is likely user typing vs other content changes
# All conditions must be met for typing detection:
if (
abs( abs(
self.env["screen"]["old_cursor"]["x"] self.env["screen"]["old_cursor"]["x"]
- self.env["screen"]["new_cursor"]["x"] - self.env["screen"]["new_cursor"]["x"]
@@ -261,6 +257,32 @@ class ScreenManager:
>= 1 >= 1
and self.env["screen"]["old_cursor"]["y"] and self.env["screen"]["old_cursor"]["y"]
== self.env["screen"]["new_cursor"]["y"] == self.env["screen"]["new_cursor"]["y"]
)
cursor_line_typing_delta = (
self._get_cursor_line_typing_delta()
)
cursor_line_is_typing = (
cursor_line_typing_delta is not None
and (
cursor_moved_horizontally
or self._is_recent_input()
)
)
# TYPING DETECTION ALGORITHM
# Determines if this screen change is likely user typing vs other content changes
# All conditions must be met for typing detection:
if (
cursor_line_is_typing
and cursor_line_typing_delta["changed_lines"]
== [cursor_line_typing_delta["cursor_y"]]
):
diff_list = [
"+ " + cursor_line_typing_delta["text"]
]
typing = True
elif (
cursor_moved_horizontally
and self.env["screen"]["new_content_text"][ and self.env["screen"]["new_content_text"][
:cursor_line_start :cursor_line_start
] ]
@@ -356,11 +378,33 @@ class ScreenManager:
# Not typing - handle as line-by-line content change # Not typing - handle as line-by-line content change
# This catches: incoming messages, screen updates, # This catches: incoming messages, screen updates,
# application output, etc. # application output, etc.
appended_text = self._get_recent_cursor_line_append() typing_delta = (
if appended_text is not None: cursor_line_typing_delta
diff_list = ["+ " + appended_text] if cursor_line_is_typing
else None
)
if (
typing_delta is not None
and typing_delta["changed_lines"]
== [typing_delta["cursor_y"]]
):
diff_list = ["+ " + typing_delta["text"]]
typing = True typing = True
else: else:
old_content_text = self.env["screen"][
"old_content_text"
]
new_content_text = self.env["screen"][
"new_content_text"
]
if typing_delta is not None:
old_lines = old_content_text.split("\n")
new_lines = new_content_text.split("\n")
new_lines[typing_delta["cursor_y"]] = old_lines[
typing_delta["cursor_y"]
]
old_content_text = "\n".join(old_lines)
new_content_text = "\n".join(new_lines)
# Pre-process screen text for comparison - collapse multiple spaces to single space # Pre-process screen text for comparison - collapse multiple spaces to single space
# This normalization prevents spurious diffs from spacing # This normalization prevents spurious diffs from spacing
@@ -369,17 +413,13 @@ class ScreenManager:
" ", " ",
self.env["runtime"][ self.env["runtime"][
"ScreenManager" "ScreenManager"
].get_window_area_in_text( ].get_window_area_in_text(old_content_text),
self.env["screen"]["old_content_text"]
),
) )
new_screen_text = self._space_normalize_regex.sub( new_screen_text = self._space_normalize_regex.sub(
" ", " ",
self.env["runtime"][ self.env["runtime"][
"ScreenManager" "ScreenManager"
].get_window_area_in_text( ].get_window_area_in_text(new_content_text),
self.env["screen"]["new_content_text"]
),
) )
diff = self.differ.compare( diff = self.differ.compare(
@@ -458,13 +498,20 @@ class ScreenManager:
) )
return screen in ignore_screens return screen in ignore_screens
def _get_recent_cursor_line_append(self): def _is_recent_input(self):
try: try:
if time.time() - self.env["runtime"][ return (
"InputManager" time.time()
].get_last_input_time() > 0.3: - self.env["runtime"][
return None "InputManager"
].get_last_input_time()
<= 0.3
)
except Exception:
return False
def _get_cursor_line_typing_delta(self):
try:
old_lines = self.env["screen"]["old_content_text"].split("\n") old_lines = self.env["screen"]["old_content_text"].split("\n")
new_lines = self.env["screen"]["new_content_text"].split("\n") new_lines = self.env["screen"]["new_content_text"].split("\n")
cursor_y = self.env["screen"]["new_cursor"]["y"] cursor_y = self.env["screen"]["new_cursor"]["y"]
@@ -480,19 +527,40 @@ class ScreenManager:
for index, old_line in enumerate(old_lines) for index, old_line in enumerate(old_lines)
if index >= len(new_lines) or old_line != new_lines[index] if index >= len(new_lines) or old_line != new_lines[index]
] ]
if changed_lines != [cursor_y]: if cursor_y not in changed_lines:
return None return None
old_line = old_lines[cursor_y] old_line = old_lines[cursor_y].rstrip()
new_line = new_lines[cursor_y] new_line = new_lines[cursor_y].rstrip()
old_line = old_line.rstrip() if old_line == new_line:
if not new_line.startswith(old_line):
return None return None
appended_text = new_line[len(old_line):].strip() matcher = difflib.SequenceMatcher(
if appended_text == "" or len(appended_text) > 4: None,
old_line,
new_line,
autojunk=False,
)
inserted_parts = []
for tag, i1, i2, j1, j2 in matcher.get_opcodes():
if tag == "equal":
continue
if tag == "insert":
inserted_parts.append(new_line[j1:j2])
continue
if tag == "replace" and old_line[i1:i2].strip() == "":
inserted_parts.append(new_line[j1:j2])
continue
return None return None
return appended_text
typed_text = "".join(inserted_parts).strip()
if typed_text == "" or len(typed_text) > 4:
return None
return {
"text": typed_text,
"cursor_y": cursor_y,
"changed_lines": changed_lines,
}
except Exception: except Exception:
return None return None
+104 -7
View File
@@ -70,11 +70,11 @@ def test_tui_input_line_append_is_typing_delta():
manager, env = _build_screen_manager( manager, env = _build_screen_manager(
"\n".join( "\n".join(
[ [
"<UserA> hi".ljust(20), "<UserA> hi".ljust(20),
"[Username] ".ljust(20), "[Username] ".ljust(20),
] ]
), ),
{"x": 11, "y": 1}, {"x": 11, "y": 1},
) )
env["runtime"]["InputManager"].get_last_input_time.return_value = time.time() env["runtime"]["InputManager"].get_last_input_time.return_value = time.time()
@@ -83,11 +83,11 @@ def test_tui_input_line_append_is_typing_delta():
"bytes": b"", "bytes": b"",
"lines": 2, "lines": 2,
"columns": 20, "columns": 20,
"textCursor": {"x": 12, "y": 1}, "textCursor": {"x": 12, "y": 1},
"screen": "pty", "screen": "pty",
"text": "\n".join( "text": "\n".join(
[ [
"<UserA> hi".ljust(20), "<UserA> hi".ljust(20),
"[Username] l".ljust(20), "[Username] l".ljust(20),
] ]
), ),
@@ -98,3 +98,100 @@ def test_tui_input_line_append_is_typing_delta():
assert env["screen"]["new_delta"] == "l" assert env["screen"]["new_delta"] == "l"
assert env["screen"]["new_delta_is_typing"] is True assert env["screen"]["new_delta_is_typing"] is True
@pytest.mark.unit
def test_tui_input_line_cursor_jump_keeps_only_typed_delta():
manager, env = _build_screen_manager(
"[Username] ".ljust(30),
{"x": 0, "y": 0},
)
manager.update(
{
"bytes": b"",
"lines": 1,
"columns": 30,
"textCursor": {"x": 12, "y": 0},
"screen": "pty",
"text": "[Username] l".ljust(30),
"attributes": [],
},
"onScreenUpdate",
)
assert env["screen"]["new_delta"] == "l"
assert env["screen"]["new_delta_is_typing"] is True
@pytest.mark.unit
def test_tui_input_line_insert_with_channel_prefix_is_typing_delta():
manager, env = _build_screen_manager(
"\n".join(
[
"<UserA> hi".ljust(40),
"[#channel] [Username] | 12:00".ljust(40),
]
),
{"x": 22, "y": 1},
)
env["runtime"]["InputManager"].get_last_input_time.return_value = time.time()
manager.update(
{
"bytes": b"",
"lines": 2,
"columns": 40,
"textCursor": {"x": 23, "y": 1},
"screen": "pty",
"text": "\n".join(
[
"<UserA> hi".ljust(40),
"[#channel] [Username] l | 12:00".ljust(40),
]
),
"attributes": [],
},
"onScreenUpdate",
)
assert env["screen"]["new_delta"] == "l"
assert env["screen"]["new_delta_is_typing"] is True
@pytest.mark.unit
def test_tui_input_line_typing_is_filtered_from_mixed_repaint_delta():
manager, env = _build_screen_manager(
"\n".join(
[
"<UserA> hi".ljust(40),
"[#channel] [Username] | 12:00".ljust(40),
"status 12:00".ljust(40),
]
),
{"x": 22, "y": 1},
)
env["runtime"]["InputManager"].get_last_input_time.return_value = time.time()
manager.update(
{
"bytes": b"",
"lines": 3,
"columns": 40,
"textCursor": {"x": 23, "y": 1},
"screen": "pty",
"text": "\n".join(
[
"<UserA> hi".ljust(40),
"[#channel] [Username] l | 12:00".ljust(40),
"status 12:01".ljust(40),
]
),
"attributes": [],
},
"onScreenUpdate",
)
assert "Username" not in env["screen"]["new_delta"]
assert "#channel" not in env["screen"]["new_delta"]
assert env["screen"]["new_delta_is_typing"] is False