Another attempt at improving tab completion detection.

This commit is contained in:
Storm Dragon
2026-05-08 21:50:27 -04:00
parent 6fb8298b9f
commit 42ba3fdad2
6 changed files with 395 additions and 289 deletions
@@ -1,161 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
import time
from fenrirscreenreader.core.i18n import _
class command:
def __init__(self):
pass
def initialize(self, environment):
self.env = environment
# Initialize tab completion state tracking
if "tabCompletion" not in self.env["commandBuffer"]:
self.env["commandBuffer"]["tabCompletion"] = {
"lastTabTime": 0,
"pendingCompletion": None,
"retryCount": 0
}
def shutdown(self):
pass
def get_description(self):
return _("Announces tab completions when detected")
def _is_recent_tab_input(self):
"""Check if TAB was pressed recently (within 200ms window)"""
current_time = time.time()
tab_detected = False
if (self.env["runtime"]["InputManager"].get_last_deepest_input()
in [["KEY_TAB"]]):
tab_detected = True
self.env["commandBuffer"]["tabCompletion"]["lastTabTime"] = current_time
# Check if tab was pressed recently (200ms window)
if not tab_detected:
time_since_tab = current_time - self.env["commandBuffer"]["tabCompletion"]["lastTabTime"]
if time_since_tab <= 0.2: # 200ms window
tab_detected = True
return tab_detected
def _is_flexible_completion_match(self, x_move, delta_text):
"""Use flexible matching instead of strict equality"""
if not delta_text:
return False
delta_len = len(delta_text)
# Exact match (preserve original behavior)
if x_move == delta_len:
return True
# Flexible range: allow ±2 characters difference
# Handles spacing adjustments and unicode width variations
if abs(x_move - delta_len) <= 2 and delta_len > 0:
return True
# For longer completions, allow proportional variance
if delta_len > 10 and abs(x_move - delta_len) <= (delta_len * 0.2):
return True
return False
def _detect_completion_patterns(self, delta_text):
"""Detect common tab completion patterns for improved accuracy"""
if not delta_text:
return False
delta_stripped = delta_text.strip()
# File extension completion
if '.' in delta_stripped and delta_stripped.count('.') <= 2:
return True
# Path completion (contains / or \)
if '/' in delta_stripped or '\\' in delta_stripped:
return True
# Command parameter completion (starts with -)
if delta_stripped.startswith('-') and len(delta_stripped) > 1:
return True
# Word boundary completion (alphanumeric content)
if delta_stripped.isalnum() and len(delta_stripped) >= 2:
return True
return False
def run(self):
"""Enhanced tab completion detection with improved reliability"""
# Basic cursor movement check (preserve original logic)
x_move = (
self.env["screen"]["new_cursor"]["x"]
- self.env["screen"]["old_cursor"]["x"]
)
if x_move <= 0:
return
# Enhanced tab input detection with persistence
tab_detected = self._is_recent_tab_input()
# Fallback for non-tab movements (preserve original thresholds)
if not tab_detected:
if x_move < 5:
return
# Screen delta availability check
if not self.env["runtime"]["ScreenManager"].is_delta():
# If tab was detected but no delta yet, store for potential retry
if tab_detected and self.env["commandBuffer"]["tabCompletion"]["retryCount"] < 2:
self.env["commandBuffer"]["tabCompletion"]["pendingCompletion"] = {
"x_move": x_move,
"timestamp": time.time()
}
self.env["commandBuffer"]["tabCompletion"]["retryCount"] += 1
return
delta_text = self.env["screen"]["new_delta"]
# Enhanced correlation checking with flexible matching
if not self._is_flexible_completion_match(x_move, delta_text):
# Additional pattern-based validation for edge cases
if not (tab_detected and self._detect_completion_patterns(delta_text)):
return
# Reset retry counter on successful detection
self.env["commandBuffer"]["tabCompletion"]["retryCount"] = 0
self.env["commandBuffer"]["tabCompletion"]["pendingCompletion"] = None
# Mark that we've handled this delta to prevent duplicate announcements
# This prevents the incoming text handler from also announcing the same content
self.env["commandBuffer"]["tabCompletion"]["lastProcessedDelta"] = delta_text
self.env["commandBuffer"]["tabCompletion"]["lastProcessedTime"] = time.time()
# Text filtering and announcement (preserve original behavior)
curr_delta = delta_text
if (len(curr_delta.strip()) != len(curr_delta) and curr_delta.strip() != ""):
curr_delta = curr_delta.strip()
# Don't interrupt ongoing auto-read announcements
do_interrupt = True
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
"speech", "auto_read_incoming"
):
do_interrupt = False
# Enhanced announcement with better handling of empty completions
if curr_delta:
self.env["runtime"]["OutputManager"].present_text(
curr_delta, interrupt=do_interrupt, announce_capital=True, flush=False
)
def set_callback(self, callback):
pass
@@ -1,128 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
import time
from fenrirscreenreader.core.i18n import _
class command:
def __init__(self):
pass
def initialize(self, environment):
self.env = environment
def shutdown(self):
pass
def get_description(self):
return _("Handles delayed retry for tab completion detection")
def run(self):
"""Check for and process pending tab completions with slight delay"""
# Only process if we have tab completion state
if "tabCompletion" not in self.env["commandBuffer"]:
return
tab_state = self.env["commandBuffer"]["tabCompletion"]
pending = tab_state.get("pendingCompletion")
if not pending:
return
current_time = time.time()
# Process pending completion after 50ms delay
if current_time - pending["timestamp"] < 0.05:
return
# Check if screen delta is now available
if not self.env["runtime"]["ScreenManager"].is_delta():
# Give up after 200ms total
if current_time - pending["timestamp"] > 0.2:
tab_state["pendingCompletion"] = None
tab_state["retryCount"] = 0
return
# Process the delayed completion
delta_text = self.env["screen"]["new_delta"]
x_move = pending["x_move"]
# Use the same flexible matching logic as main tab completion
match_found = self._is_flexible_completion_match(x_move, delta_text)
if not match_found:
# Try pattern-based detection as final fallback
match_found = self._detect_completion_patterns(delta_text)
if match_found and delta_text:
# Mark that we've handled this delta to prevent duplicate announcements
tab_state["lastProcessedDelta"] = delta_text
tab_state["lastProcessedTime"] = current_time
# Filter and announce the completion
curr_delta = delta_text
if (len(curr_delta.strip()) != len(curr_delta) and
curr_delta.strip() != ""):
curr_delta = curr_delta.strip()
if curr_delta:
self.env["runtime"]["OutputManager"].present_text(
curr_delta, interrupt=True, announce_capital=True, flush=False
)
# Clear pending completion
tab_state["pendingCompletion"] = None
tab_state["retryCount"] = 0
def _is_flexible_completion_match(self, x_move, delta_text):
"""Use flexible matching (duplicated from main command for heartbeat use)"""
if not delta_text:
return False
delta_len = len(delta_text)
# Exact match
if x_move == delta_len:
return True
# Flexible range: allow ±2 characters difference
if abs(x_move - delta_len) <= 2 and delta_len > 0:
return True
# For longer completions, allow proportional variance
if delta_len > 10 and abs(x_move - delta_len) <= (delta_len * 0.2):
return True
return False
def _detect_completion_patterns(self, delta_text):
"""Detect common tab completion patterns (duplicated from main command)"""
if not delta_text:
return False
delta_stripped = delta_text.strip()
# File extension completion
if '.' in delta_stripped and delta_stripped.count('.') <= 2:
return True
# Path completion
if '/' in delta_stripped or '\\' in delta_stripped:
return True
# Command parameter completion
if delta_stripped.startswith('-') and len(delta_stripped) > 1:
return True
# Word boundary completion
if delta_stripped.isalnum() and len(delta_stripped) >= 2:
return True
return False
def set_callback(self, callback):
pass
@@ -0,0 +1,29 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
from fenrirscreenreader.core.i18n import _
from fenrirscreenreader.core.tabCompletionManager import TabCompletionManager
class command:
def __init__(self):
self.manager = TabCompletionManager()
def initialize(self, environment):
self.env = environment
self.manager.initialize(environment)
def shutdown(self):
pass
def get_description(self):
return _("Tracks tab keypresses for completion announcements")
def run(self):
self.manager.capture_if_tab()
def set_callback(self, callback):
pass
@@ -0,0 +1,44 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
from fenrirscreenreader.core.i18n import _
from fenrirscreenreader.core.tabCompletionManager import TabCompletionManager
class command:
def __init__(self):
self.manager = TabCompletionManager()
def initialize(self, environment):
self.env = environment
self.manager.initialize(environment)
def shutdown(self):
pass
def get_description(self):
return _("Announces visible tab completion results")
def run(self):
text = self.manager.process_update()
if not text:
return
do_interrupt = True
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
"speech", "auto_read_incoming"
):
do_interrupt = False
self.env["runtime"]["OutputManager"].present_text(
text,
interrupt=do_interrupt,
announce_capital=True,
flush=False,
)
def set_callback(self, callback):
pass
@@ -0,0 +1,192 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
import difflib
import time
class TabCompletionManager:
def __init__(self, timeout=0.5):
self.timeout = timeout
def initialize(self, environment):
self.env = environment
self._ensure_state()
def _ensure_state(self):
if "tabCompletion" not in self.env["commandBuffer"]:
self.env["commandBuffer"]["tabCompletion"] = {}
state = self.env["commandBuffer"]["tabCompletion"]
state.setdefault("pending", None)
state.setdefault("lastProcessedDelta", "")
state.setdefault("lastProcessedTime", 0)
def capture_if_tab(self):
if not self._last_event_is_tab_press():
return
self._ensure_state()
self.env["commandBuffer"]["tabCompletion"]["pending"] = {
"timestamp": time.time(),
"cursor": self.env["screen"]["new_cursor"].copy(),
"content": self.env["screen"]["new_content_text"],
"screen": self.env["screen"]["newTTY"],
}
def process_update(self):
self._ensure_state()
state = self.env["commandBuffer"]["tabCompletion"]
pending = state.get("pending")
if not pending:
return ""
if time.time() - pending["timestamp"] > self.timeout:
state["pending"] = None
return ""
if pending.get("screen") != self.env["screen"]["newTTY"]:
state["pending"] = None
return ""
spoken_text = self._get_completion_text(pending)
if not spoken_text:
return ""
state["pending"] = None
state["lastProcessedDelta"] = self.env["screen"]["new_delta"]
state["lastProcessedTime"] = time.time()
return spoken_text
def _last_event_is_tab_press(self):
input_manager = self.env["runtime"]["InputManager"]
last_event = input_manager.get_last_event()
if not last_event:
return False
return (
last_event.get("event_name") == "KEY_TAB"
and last_event.get("event_state") == 1
)
def _get_completion_text(self, pending):
old_lines = pending["content"].split("\n")
new_lines = self.env["screen"]["new_content_text"].split("\n")
cursor = pending["cursor"]
cursor_y = cursor["y"]
if cursor_y < len(old_lines) and cursor_y < len(new_lines):
inserted_text = self._get_cursor_line_inserted_text(
old_lines[cursor_y],
new_lines[cursor_y],
cursor["x"],
self.env["screen"]["new_cursor"]["x"],
)
if inserted_text:
return self._clean_text(inserted_text)
candidate_text = self._get_candidate_text(
old_lines, new_lines, cursor_y
)
if candidate_text:
return self._clean_text(candidate_text)
delta_text = self.env["screen"]["new_delta"]
if (
delta_text
and not self.env["screen"].get("new_delta_is_typing", False)
):
return self._clean_text(delta_text)
return ""
def _get_cursor_line_inserted_text(
self, old_line, new_line, old_cursor_x, new_cursor_x
):
if new_cursor_x <= old_cursor_x:
return ""
if old_line == new_line:
return ""
matcher = difflib.SequenceMatcher(
None, old_line, new_line, autojunk=False
)
inserted_parts = []
for (
tag,
old_start,
old_end,
new_start,
new_end,
) in matcher.get_opcodes():
if tag == "equal":
continue
if tag == "insert" and old_cursor_x <= new_start <= new_cursor_x:
inserted_parts.append(new_line[new_start:new_end])
continue
if (
tag == "replace"
and old_line[old_start:old_end].strip() == ""
and old_cursor_x <= new_start <= new_cursor_x
):
inserted_parts.append(new_line[new_start:new_end])
continue
if (
tag == "delete"
and old_line[old_start:old_end].strip() == ""
and old_start >= old_cursor_x
):
continue
if old_start >= old_cursor_x:
return ""
return "".join(inserted_parts)
def _get_candidate_text(self, old_lines, new_lines, cursor_y):
if len(old_lines) != len(new_lines):
return self._get_inserted_lines(old_lines, new_lines, cursor_y)
changed_lines = []
old_cursor_line = (
old_lines[cursor_y].strip() if cursor_y < len(old_lines) else ""
)
for index, old_line in enumerate(old_lines):
if index == cursor_y:
continue
if index < len(new_lines) and old_line != new_lines[index]:
if new_lines[index].strip() == old_cursor_line:
continue
changed_lines.append(new_lines[index])
return "\n".join(
line.rstrip() for line in changed_lines if line.strip()
)
def _get_inserted_lines(self, old_lines, new_lines, cursor_y):
matcher = difflib.SequenceMatcher(
None, old_lines, new_lines, autojunk=False
)
inserted_lines = []
for (
tag,
old_start,
old_end,
new_start,
new_end,
) in matcher.get_opcodes():
if tag not in ["insert", "replace"]:
continue
if new_end <= cursor_y:
continue
for line in new_lines[new_start:new_end]:
if line.strip():
inserted_lines.append(line.rstrip())
return "\n".join(inserted_lines)
def _clean_text(self, text):
if not text:
return ""
if len(text.strip()) != len(text) and text.strip() != "":
return text.strip()
return text
+130
View File
@@ -0,0 +1,130 @@
import time
from unittest.mock import Mock
import pytest
from fenrirscreenreader.core.tabCompletionManager import TabCompletionManager
def _build_env(old_text="", cursor=None, screen="pty"):
if cursor is None:
cursor = {"x": 0, "y": 0}
input_manager = Mock()
input_manager.get_last_event.return_value = {
"event_name": "KEY_TAB",
"event_state": 1,
}
env = {
"runtime": {
"InputManager": input_manager,
},
"screen": {
"new_cursor": cursor.copy(),
"new_content_text": old_text,
"new_delta": "",
"new_delta_is_typing": False,
"newTTY": screen,
},
"commandBuffer": {},
}
manager = TabCompletionManager()
manager.initialize(env)
return manager, env, input_manager
def _set_screen_update(env, text, cursor, delta="", typing=False):
env["screen"]["new_content_text"] = text
env["screen"]["new_cursor"] = cursor.copy()
env["screen"]["new_delta"] = delta
env["screen"]["new_delta_is_typing"] = typing
@pytest.mark.unit
def test_unique_completion_speaks_inserted_suffix():
manager, env, _input_manager = _build_env(
"cd Do".ljust(20), {"x": 5, "y": 0}
)
manager.capture_if_tab()
_set_screen_update(
env,
"cd Documents/".ljust(20),
{"x": 13, "y": 0},
delta="cuments/",
typing=True,
)
assert manager.process_update() == "cuments/"
state = env["commandBuffer"]["tabCompletion"]
assert state["pending"] is None
assert state["lastProcessedDelta"] == "cuments/"
@pytest.mark.unit
def test_candidate_list_speaks_visible_list_without_cursor_advance():
old_text = "\n".join(["$ cd Do".ljust(20), "".ljust(20), "".ljust(20)])
manager, env, _input_manager = _build_env(old_text, {"x": 7, "y": 0})
manager.capture_if_tab()
_set_screen_update(
env,
"\n".join(
[
"$ cd Do".ljust(20),
"Documents/ Downloads/".ljust(20),
"$ cd Do".ljust(20),
]
),
{"x": 7, "y": 2},
delta="Documents/ Downloads/",
)
assert manager.process_update() == "Documents/ Downloads/"
@pytest.mark.unit
def test_no_screen_change_stays_silent_and_keeps_pending_briefly():
manager, env, _input_manager = _build_env(
"cd Do".ljust(20), {"x": 5, "y": 0}
)
manager.capture_if_tab()
assert manager.process_update() == ""
assert env["commandBuffer"]["tabCompletion"]["pending"] is not None
@pytest.mark.unit
def test_timeout_clears_pending_without_speech():
manager, env, _input_manager = _build_env(
"cd Do".ljust(20), {"x": 5, "y": 0}
)
manager.capture_if_tab()
env["commandBuffer"]["tabCompletion"]["pending"]["timestamp"] = (
time.time() - 1
)
_set_screen_update(
env,
"unrelated output".ljust(20),
{"x": 0, "y": 0},
delta="unrelated output",
)
assert manager.process_update() == ""
assert env["commandBuffer"]["tabCompletion"]["pending"] is None
@pytest.mark.unit
def test_non_tab_key_does_not_capture():
manager, env, input_manager = _build_env(
"cd Do".ljust(20), {"x": 5, "y": 0}
)
input_manager.get_last_event.return_value = {
"event_name": "KEY_A",
"event_state": 1,
}
manager.capture_if_tab()
assert env["commandBuffer"]["tabCompletion"]["pending"] is None