From 42ba3fdad2d4352ff6d71ff6ec63a1eee040ccf5 Mon Sep 17 00:00:00 2001 From: Storm Dragon Date: Fri, 8 May 2026 21:50:27 -0400 Subject: [PATCH] Another attempt at improving tab completion detection. --- .../onCursorChange/55000-tab_completion.py | 161 --------------- .../onHeartBeat/55000-tab_completion_retry.py | 128 ------------ .../55000-tab_completion_capture.py | 29 +++ .../onScreenUpdate/64000-tab_completion.py | 44 ++++ .../core/tabCompletionManager.py | 192 ++++++++++++++++++ tests/unit/test_tab_completion_manager.py | 130 ++++++++++++ 6 files changed, 395 insertions(+), 289 deletions(-) delete mode 100644 src/fenrirscreenreader/commands/onCursorChange/55000-tab_completion.py delete mode 100644 src/fenrirscreenreader/commands/onHeartBeat/55000-tab_completion_retry.py create mode 100644 src/fenrirscreenreader/commands/onKeyInput/55000-tab_completion_capture.py create mode 100644 src/fenrirscreenreader/commands/onScreenUpdate/64000-tab_completion.py create mode 100644 src/fenrirscreenreader/core/tabCompletionManager.py create mode 100644 tests/unit/test_tab_completion_manager.py diff --git a/src/fenrirscreenreader/commands/onCursorChange/55000-tab_completion.py b/src/fenrirscreenreader/commands/onCursorChange/55000-tab_completion.py deleted file mode 100644 index a118d5f1..00000000 --- a/src/fenrirscreenreader/commands/onCursorChange/55000-tab_completion.py +++ /dev/null @@ -1,161 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -# Fenrir TTY screen reader -# By Chrys, Storm Dragon, and contributors. - -import time -from fenrirscreenreader.core.i18n import _ - - -class command: - def __init__(self): - pass - - def initialize(self, environment): - self.env = environment - # Initialize tab completion state tracking - if "tabCompletion" not in self.env["commandBuffer"]: - self.env["commandBuffer"]["tabCompletion"] = { - "lastTabTime": 0, - "pendingCompletion": None, - "retryCount": 0 - } - - def shutdown(self): - pass - - def get_description(self): - return _("Announces tab completions when detected") - - def _is_recent_tab_input(self): - """Check if TAB was pressed recently (within 200ms window)""" - current_time = time.time() - tab_detected = False - - if (self.env["runtime"]["InputManager"].get_last_deepest_input() - in [["KEY_TAB"]]): - tab_detected = True - self.env["commandBuffer"]["tabCompletion"]["lastTabTime"] = current_time - - # Check if tab was pressed recently (200ms window) - if not tab_detected: - time_since_tab = current_time - self.env["commandBuffer"]["tabCompletion"]["lastTabTime"] - if time_since_tab <= 0.2: # 200ms window - tab_detected = True - - return tab_detected - - def _is_flexible_completion_match(self, x_move, delta_text): - """Use flexible matching instead of strict equality""" - if not delta_text: - return False - - delta_len = len(delta_text) - - # Exact match (preserve original behavior) - if x_move == delta_len: - return True - - # Flexible range: allow ±2 characters difference - # Handles spacing adjustments and unicode width variations - if abs(x_move - delta_len) <= 2 and delta_len > 0: - return True - - # For longer completions, allow proportional variance - if delta_len > 10 and abs(x_move - delta_len) <= (delta_len * 0.2): - return True - - return False - - def _detect_completion_patterns(self, delta_text): - """Detect common tab completion patterns for improved accuracy""" - if not delta_text: - return False - - delta_stripped = delta_text.strip() - - # File extension completion - if '.' in delta_stripped and delta_stripped.count('.') <= 2: - return True - - # Path completion (contains / or \) - if '/' in delta_stripped or '\\' in delta_stripped: - return True - - # Command parameter completion (starts with -) - if delta_stripped.startswith('-') and len(delta_stripped) > 1: - return True - - # Word boundary completion (alphanumeric content) - if delta_stripped.isalnum() and len(delta_stripped) >= 2: - return True - - return False - - def run(self): - """Enhanced tab completion detection with improved reliability""" - # Basic cursor movement check (preserve original logic) - x_move = ( - self.env["screen"]["new_cursor"]["x"] - - self.env["screen"]["old_cursor"]["x"] - ) - if x_move <= 0: - return - - # Enhanced tab input detection with persistence - tab_detected = self._is_recent_tab_input() - - # Fallback for non-tab movements (preserve original thresholds) - if not tab_detected: - if x_move < 5: - return - - # Screen delta availability check - if not self.env["runtime"]["ScreenManager"].is_delta(): - # If tab was detected but no delta yet, store for potential retry - if tab_detected and self.env["commandBuffer"]["tabCompletion"]["retryCount"] < 2: - self.env["commandBuffer"]["tabCompletion"]["pendingCompletion"] = { - "x_move": x_move, - "timestamp": time.time() - } - self.env["commandBuffer"]["tabCompletion"]["retryCount"] += 1 - return - - delta_text = self.env["screen"]["new_delta"] - - # Enhanced correlation checking with flexible matching - if not self._is_flexible_completion_match(x_move, delta_text): - # Additional pattern-based validation for edge cases - if not (tab_detected and self._detect_completion_patterns(delta_text)): - return - - # Reset retry counter on successful detection - self.env["commandBuffer"]["tabCompletion"]["retryCount"] = 0 - self.env["commandBuffer"]["tabCompletion"]["pendingCompletion"] = None - - # Mark that we've handled this delta to prevent duplicate announcements - # This prevents the incoming text handler from also announcing the same content - self.env["commandBuffer"]["tabCompletion"]["lastProcessedDelta"] = delta_text - self.env["commandBuffer"]["tabCompletion"]["lastProcessedTime"] = time.time() - - # Text filtering and announcement (preserve original behavior) - curr_delta = delta_text - if (len(curr_delta.strip()) != len(curr_delta) and curr_delta.strip() != ""): - curr_delta = curr_delta.strip() - - # Don't interrupt ongoing auto-read announcements - do_interrupt = True - if self.env["runtime"]["SettingsManager"].get_setting_as_bool( - "speech", "auto_read_incoming" - ): - do_interrupt = False - - # Enhanced announcement with better handling of empty completions - if curr_delta: - self.env["runtime"]["OutputManager"].present_text( - curr_delta, interrupt=do_interrupt, announce_capital=True, flush=False - ) - - def set_callback(self, callback): - pass diff --git a/src/fenrirscreenreader/commands/onHeartBeat/55000-tab_completion_retry.py b/src/fenrirscreenreader/commands/onHeartBeat/55000-tab_completion_retry.py deleted file mode 100644 index d81ea994..00000000 --- a/src/fenrirscreenreader/commands/onHeartBeat/55000-tab_completion_retry.py +++ /dev/null @@ -1,128 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -# Fenrir TTY screen reader -# By Chrys, Storm Dragon, and contributors. - -import time -from fenrirscreenreader.core.i18n import _ - - -class command: - def __init__(self): - pass - - def initialize(self, environment): - self.env = environment - - def shutdown(self): - pass - - def get_description(self): - return _("Handles delayed retry for tab completion detection") - - def run(self): - """Check for and process pending tab completions with slight delay""" - # Only process if we have tab completion state - if "tabCompletion" not in self.env["commandBuffer"]: - return - - tab_state = self.env["commandBuffer"]["tabCompletion"] - pending = tab_state.get("pendingCompletion") - - if not pending: - return - - current_time = time.time() - - # Process pending completion after 50ms delay - if current_time - pending["timestamp"] < 0.05: - return - - # Check if screen delta is now available - if not self.env["runtime"]["ScreenManager"].is_delta(): - # Give up after 200ms total - if current_time - pending["timestamp"] > 0.2: - tab_state["pendingCompletion"] = None - tab_state["retryCount"] = 0 - return - - # Process the delayed completion - delta_text = self.env["screen"]["new_delta"] - x_move = pending["x_move"] - - # Use the same flexible matching logic as main tab completion - match_found = self._is_flexible_completion_match(x_move, delta_text) - - if not match_found: - # Try pattern-based detection as final fallback - match_found = self._detect_completion_patterns(delta_text) - - if match_found and delta_text: - # Mark that we've handled this delta to prevent duplicate announcements - tab_state["lastProcessedDelta"] = delta_text - tab_state["lastProcessedTime"] = current_time - - # Filter and announce the completion - curr_delta = delta_text - if (len(curr_delta.strip()) != len(curr_delta) and - curr_delta.strip() != ""): - curr_delta = curr_delta.strip() - - if curr_delta: - self.env["runtime"]["OutputManager"].present_text( - curr_delta, interrupt=True, announce_capital=True, flush=False - ) - - # Clear pending completion - tab_state["pendingCompletion"] = None - tab_state["retryCount"] = 0 - - def _is_flexible_completion_match(self, x_move, delta_text): - """Use flexible matching (duplicated from main command for heartbeat use)""" - if not delta_text: - return False - - delta_len = len(delta_text) - - # Exact match - if x_move == delta_len: - return True - - # Flexible range: allow ±2 characters difference - if abs(x_move - delta_len) <= 2 and delta_len > 0: - return True - - # For longer completions, allow proportional variance - if delta_len > 10 and abs(x_move - delta_len) <= (delta_len * 0.2): - return True - - return False - - def _detect_completion_patterns(self, delta_text): - """Detect common tab completion patterns (duplicated from main command)""" - if not delta_text: - return False - - delta_stripped = delta_text.strip() - - # File extension completion - if '.' in delta_stripped and delta_stripped.count('.') <= 2: - return True - - # Path completion - if '/' in delta_stripped or '\\' in delta_stripped: - return True - - # Command parameter completion - if delta_stripped.startswith('-') and len(delta_stripped) > 1: - return True - - # Word boundary completion - if delta_stripped.isalnum() and len(delta_stripped) >= 2: - return True - - return False - - def set_callback(self, callback): - pass \ No newline at end of file diff --git a/src/fenrirscreenreader/commands/onKeyInput/55000-tab_completion_capture.py b/src/fenrirscreenreader/commands/onKeyInput/55000-tab_completion_capture.py new file mode 100644 index 00000000..51428f69 --- /dev/null +++ b/src/fenrirscreenreader/commands/onKeyInput/55000-tab_completion_capture.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# Fenrir TTY screen reader +# By Chrys, Storm Dragon, and contributors. + +from fenrirscreenreader.core.i18n import _ +from fenrirscreenreader.core.tabCompletionManager import TabCompletionManager + + +class command: + def __init__(self): + self.manager = TabCompletionManager() + + def initialize(self, environment): + self.env = environment + self.manager.initialize(environment) + + def shutdown(self): + pass + + def get_description(self): + return _("Tracks tab keypresses for completion announcements") + + def run(self): + self.manager.capture_if_tab() + + def set_callback(self, callback): + pass diff --git a/src/fenrirscreenreader/commands/onScreenUpdate/64000-tab_completion.py b/src/fenrirscreenreader/commands/onScreenUpdate/64000-tab_completion.py new file mode 100644 index 00000000..021094fe --- /dev/null +++ b/src/fenrirscreenreader/commands/onScreenUpdate/64000-tab_completion.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# Fenrir TTY screen reader +# By Chrys, Storm Dragon, and contributors. + +from fenrirscreenreader.core.i18n import _ +from fenrirscreenreader.core.tabCompletionManager import TabCompletionManager + + +class command: + def __init__(self): + self.manager = TabCompletionManager() + + def initialize(self, environment): + self.env = environment + self.manager.initialize(environment) + + def shutdown(self): + pass + + def get_description(self): + return _("Announces visible tab completion results") + + def run(self): + text = self.manager.process_update() + if not text: + return + + do_interrupt = True + if self.env["runtime"]["SettingsManager"].get_setting_as_bool( + "speech", "auto_read_incoming" + ): + do_interrupt = False + + self.env["runtime"]["OutputManager"].present_text( + text, + interrupt=do_interrupt, + announce_capital=True, + flush=False, + ) + + def set_callback(self, callback): + pass diff --git a/src/fenrirscreenreader/core/tabCompletionManager.py b/src/fenrirscreenreader/core/tabCompletionManager.py new file mode 100644 index 00000000..c81b0a72 --- /dev/null +++ b/src/fenrirscreenreader/core/tabCompletionManager.py @@ -0,0 +1,192 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# Fenrir TTY screen reader +# By Chrys, Storm Dragon, and contributors. + +import difflib +import time + + +class TabCompletionManager: + def __init__(self, timeout=0.5): + self.timeout = timeout + + def initialize(self, environment): + self.env = environment + self._ensure_state() + + def _ensure_state(self): + if "tabCompletion" not in self.env["commandBuffer"]: + self.env["commandBuffer"]["tabCompletion"] = {} + state = self.env["commandBuffer"]["tabCompletion"] + state.setdefault("pending", None) + state.setdefault("lastProcessedDelta", "") + state.setdefault("lastProcessedTime", 0) + + def capture_if_tab(self): + if not self._last_event_is_tab_press(): + return + + self._ensure_state() + self.env["commandBuffer"]["tabCompletion"]["pending"] = { + "timestamp": time.time(), + "cursor": self.env["screen"]["new_cursor"].copy(), + "content": self.env["screen"]["new_content_text"], + "screen": self.env["screen"]["newTTY"], + } + + def process_update(self): + self._ensure_state() + state = self.env["commandBuffer"]["tabCompletion"] + pending = state.get("pending") + if not pending: + return "" + + if time.time() - pending["timestamp"] > self.timeout: + state["pending"] = None + return "" + + if pending.get("screen") != self.env["screen"]["newTTY"]: + state["pending"] = None + return "" + + spoken_text = self._get_completion_text(pending) + if not spoken_text: + return "" + + state["pending"] = None + state["lastProcessedDelta"] = self.env["screen"]["new_delta"] + state["lastProcessedTime"] = time.time() + return spoken_text + + def _last_event_is_tab_press(self): + input_manager = self.env["runtime"]["InputManager"] + last_event = input_manager.get_last_event() + if not last_event: + return False + return ( + last_event.get("event_name") == "KEY_TAB" + and last_event.get("event_state") == 1 + ) + + def _get_completion_text(self, pending): + old_lines = pending["content"].split("\n") + new_lines = self.env["screen"]["new_content_text"].split("\n") + cursor = pending["cursor"] + cursor_y = cursor["y"] + + if cursor_y < len(old_lines) and cursor_y < len(new_lines): + inserted_text = self._get_cursor_line_inserted_text( + old_lines[cursor_y], + new_lines[cursor_y], + cursor["x"], + self.env["screen"]["new_cursor"]["x"], + ) + if inserted_text: + return self._clean_text(inserted_text) + + candidate_text = self._get_candidate_text( + old_lines, new_lines, cursor_y + ) + if candidate_text: + return self._clean_text(candidate_text) + + delta_text = self.env["screen"]["new_delta"] + if ( + delta_text + and not self.env["screen"].get("new_delta_is_typing", False) + ): + return self._clean_text(delta_text) + + return "" + + def _get_cursor_line_inserted_text( + self, old_line, new_line, old_cursor_x, new_cursor_x + ): + if new_cursor_x <= old_cursor_x: + return "" + if old_line == new_line: + return "" + + matcher = difflib.SequenceMatcher( + None, old_line, new_line, autojunk=False + ) + inserted_parts = [] + for ( + tag, + old_start, + old_end, + new_start, + new_end, + ) in matcher.get_opcodes(): + if tag == "equal": + continue + if tag == "insert" and old_cursor_x <= new_start <= new_cursor_x: + inserted_parts.append(new_line[new_start:new_end]) + continue + if ( + tag == "replace" + and old_line[old_start:old_end].strip() == "" + and old_cursor_x <= new_start <= new_cursor_x + ): + inserted_parts.append(new_line[new_start:new_end]) + continue + if ( + tag == "delete" + and old_line[old_start:old_end].strip() == "" + and old_start >= old_cursor_x + ): + continue + if old_start >= old_cursor_x: + return "" + + return "".join(inserted_parts) + + def _get_candidate_text(self, old_lines, new_lines, cursor_y): + if len(old_lines) != len(new_lines): + return self._get_inserted_lines(old_lines, new_lines, cursor_y) + + changed_lines = [] + old_cursor_line = ( + old_lines[cursor_y].strip() if cursor_y < len(old_lines) else "" + ) + for index, old_line in enumerate(old_lines): + if index == cursor_y: + continue + if index < len(new_lines) and old_line != new_lines[index]: + if new_lines[index].strip() == old_cursor_line: + continue + changed_lines.append(new_lines[index]) + + return "\n".join( + line.rstrip() for line in changed_lines if line.strip() + ) + + def _get_inserted_lines(self, old_lines, new_lines, cursor_y): + matcher = difflib.SequenceMatcher( + None, old_lines, new_lines, autojunk=False + ) + inserted_lines = [] + for ( + tag, + old_start, + old_end, + new_start, + new_end, + ) in matcher.get_opcodes(): + if tag not in ["insert", "replace"]: + continue + if new_end <= cursor_y: + continue + for line in new_lines[new_start:new_end]: + if line.strip(): + inserted_lines.append(line.rstrip()) + return "\n".join(inserted_lines) + + def _clean_text(self, text): + if not text: + return "" + if len(text.strip()) != len(text) and text.strip() != "": + return text.strip() + return text diff --git a/tests/unit/test_tab_completion_manager.py b/tests/unit/test_tab_completion_manager.py new file mode 100644 index 00000000..d6a105cf --- /dev/null +++ b/tests/unit/test_tab_completion_manager.py @@ -0,0 +1,130 @@ +import time +from unittest.mock import Mock + +import pytest + +from fenrirscreenreader.core.tabCompletionManager import TabCompletionManager + + +def _build_env(old_text="", cursor=None, screen="pty"): + if cursor is None: + cursor = {"x": 0, "y": 0} + input_manager = Mock() + input_manager.get_last_event.return_value = { + "event_name": "KEY_TAB", + "event_state": 1, + } + env = { + "runtime": { + "InputManager": input_manager, + }, + "screen": { + "new_cursor": cursor.copy(), + "new_content_text": old_text, + "new_delta": "", + "new_delta_is_typing": False, + "newTTY": screen, + }, + "commandBuffer": {}, + } + manager = TabCompletionManager() + manager.initialize(env) + return manager, env, input_manager + + +def _set_screen_update(env, text, cursor, delta="", typing=False): + env["screen"]["new_content_text"] = text + env["screen"]["new_cursor"] = cursor.copy() + env["screen"]["new_delta"] = delta + env["screen"]["new_delta_is_typing"] = typing + + +@pytest.mark.unit +def test_unique_completion_speaks_inserted_suffix(): + manager, env, _input_manager = _build_env( + "cd Do".ljust(20), {"x": 5, "y": 0} + ) + + manager.capture_if_tab() + _set_screen_update( + env, + "cd Documents/".ljust(20), + {"x": 13, "y": 0}, + delta="cuments/", + typing=True, + ) + + assert manager.process_update() == "cuments/" + state = env["commandBuffer"]["tabCompletion"] + assert state["pending"] is None + assert state["lastProcessedDelta"] == "cuments/" + + +@pytest.mark.unit +def test_candidate_list_speaks_visible_list_without_cursor_advance(): + old_text = "\n".join(["$ cd Do".ljust(20), "".ljust(20), "".ljust(20)]) + manager, env, _input_manager = _build_env(old_text, {"x": 7, "y": 0}) + + manager.capture_if_tab() + _set_screen_update( + env, + "\n".join( + [ + "$ cd Do".ljust(20), + "Documents/ Downloads/".ljust(20), + "$ cd Do".ljust(20), + ] + ), + {"x": 7, "y": 2}, + delta="Documents/ Downloads/", + ) + + assert manager.process_update() == "Documents/ Downloads/" + + +@pytest.mark.unit +def test_no_screen_change_stays_silent_and_keeps_pending_briefly(): + manager, env, _input_manager = _build_env( + "cd Do".ljust(20), {"x": 5, "y": 0} + ) + + manager.capture_if_tab() + + assert manager.process_update() == "" + assert env["commandBuffer"]["tabCompletion"]["pending"] is not None + + +@pytest.mark.unit +def test_timeout_clears_pending_without_speech(): + manager, env, _input_manager = _build_env( + "cd Do".ljust(20), {"x": 5, "y": 0} + ) + + manager.capture_if_tab() + env["commandBuffer"]["tabCompletion"]["pending"]["timestamp"] = ( + time.time() - 1 + ) + _set_screen_update( + env, + "unrelated output".ljust(20), + {"x": 0, "y": 0}, + delta="unrelated output", + ) + + assert manager.process_update() == "" + assert env["commandBuffer"]["tabCompletion"]["pending"] is None + + +@pytest.mark.unit +def test_non_tab_key_does_not_capture(): + manager, env, input_manager = _build_env( + "cd Do".ljust(20), {"x": 5, "y": 0} + ) + input_manager.get_last_event.return_value = { + "event_name": "KEY_A", + "event_state": 1, + } + + manager.capture_if_tab() + + assert env["commandBuffer"]["tabCompletion"]["pending"] is None