From e54600ff4da5460d18b12c1dffdd723ddb110779 Mon Sep 17 00:00:00 2001 From: Storm Dragon Date: Thu, 7 May 2026 18:05:16 -0400 Subject: [PATCH] Reworked the xterm hand off so it's faster and more reliable. --- src/cthulhu/input_event_manager.py | 169 ++++++--- ...put_event_manager_x11_focus_regressions.py | 328 ++++++++++++++++++ 2 files changed, 454 insertions(+), 43 deletions(-) create mode 100644 tests/test_input_event_manager_x11_focus_regressions.py diff --git a/src/cthulhu/input_event_manager.py b/src/cthulhu/input_event_manager.py index 62dc199..7f4f970 100644 --- a/src/cthulhu/input_event_manager.py +++ b/src/cthulhu/input_event_manager.py @@ -39,7 +39,8 @@ __copyright__ = "Copyright (c) 2024 Igalia, S.L." \ "Copyright (c) 2024 GNOME Foundation Inc." __license__ = "LGPL" -from typing import TYPE_CHECKING, Optional, Union, Tuple, List, Dict +import os +from typing import TYPE_CHECKING, Optional, Union, Tuple, List, Dict, Any import gi gi.require_version("Atspi", "2.0") @@ -74,6 +75,7 @@ class InputEventManager: self._paused: bool = False self._wnck = None self._did_attempt_wnck_load: bool = False + self._scriptWithSuspendedGrabsForXterm = None def activate_device(self) -> Atspi.Device: """Creates and returns the AT-SPI device used by this manager.""" @@ -385,27 +387,94 @@ class InputEventManager: return self._wnck - def _get_active_x11_window_pid(self) -> int: - """Returns the PID of the active X11 window if Wnck can provide it.""" + def _get_active_x11_window(self) -> Optional[Any]: + """Returns the active X11 window if Wnck can provide it.""" wnck = self._get_wnck() if wnck is None: - return -1 + return None try: screen = wnck.Screen.get_default() if screen is None: - return -1 + return None screen.force_update() - window = screen.get_active_window() - if window is None: - return -1 + return screen.get_active_window() + except Exception as error: + msg = f"INPUT EVENT MANAGER: Could not obtain active X11 window: {error}" + debug.print_message(debug.LEVEL_INFO, msg, True) + return None + + def _get_active_x11_window_pid(self) -> int: + """Returns the PID of the active X11 window if Wnck can provide it.""" + + window = self._get_active_x11_window() + if window is None: + return -1 + + try: return int(window.get_pid()) except Exception as error: msg = f"INPUT EVENT MANAGER: Could not obtain active X11 window PID: {error}" debug.print_message(debug.LEVEL_INFO, msg, True) return -1 + @staticmethod + def _identifier_is_xterm(value: Any) -> bool: + """Returns True if value identifies XTerm.""" + + if not isinstance(value, str): + return False + + identifier = os.path.basename(value.strip().lower()) + return identifier == "xterm" + + def _active_x11_window_is_xterm(self) -> bool: + """Returns True when the active X11 window appears to be XTerm.""" + + window = self._get_active_x11_window() + if window is None: + return False + + for attrName in ("get_class_group_name", "get_class_instance_name", "get_name"): + attr = getattr(window, attrName, None) + if not callable(attr): + continue + try: + if self._identifier_is_xterm(attr()): + return True + except Exception: + continue + + getClassGroup = getattr(window, "get_class_group", None) + if callable(getClassGroup): + try: + classGroup = getClassGroup() + except Exception: + classGroup = None + + for attrName in ("get_name", "get_res_class"): + attr = getattr(classGroup, attrName, None) + if not callable(attr): + continue + try: + if self._identifier_is_xterm(attr()): + return True + except Exception: + continue + + pid = self._get_active_x11_window_pid() + if pid < 1: + return False + + try: + with open(f"/proc/{pid}/cmdline", "rb") as cmdlineFile: + executable = cmdlineFile.read().split(b"\0", 1)[0].decode(errors="ignore") + except OSError: + return False + + return self._identifier_is_xterm(executable) + def _find_active_x11_atspi_window(self) -> Optional[Atspi.Accessible]: """Returns the focused AT-SPI window for the active X11 PID, if possible.""" @@ -498,8 +567,8 @@ class InputEventManager: return getattr(script, "app", None) + @staticmethod def _lacks_atspi_context( - self, window: Optional[Atspi.Accessible], focus: Optional[Atspi.Accessible], pendingFocus: Optional[Atspi.Accessible], @@ -509,23 +578,57 @@ class InputEventManager: if window is not None or focus is not None or pendingFocus is not None: return False - return self._get_active_script_app() is None + return True - @staticmethod - def _suspend_active_script_key_grabs(reason: str) -> None: - """Removes active-script key grabs while Cthulhu has no input context.""" + def _should_pass_through_for_active_xterm( + self, + window: Optional[Atspi.Accessible], + focus: Optional[Atspi.Accessible], + pendingFocus: Optional[Atspi.Accessible], + ) -> bool: + """Returns True when XTerm is active and Cthulhu lacks matching AT-SPI context.""" + + if pendingFocus is not None: + return False + return self._active_x11_window_is_xterm() + + def _suspend_key_grabs_for_xterm(self) -> None: + """Suspends active-script key grabs so XTerm/Fenrir can receive them.""" script = script_manager.get_manager().get_active_script() - if script is None: + if script is None or script is self._scriptWithSuspendedGrabsForXterm: return + if self._scriptWithSuspendedGrabsForXterm is not None: + self._restore_key_grabs_after_xterm() + removeGrabs = getattr(script, "removeKeyGrabs", None) if not callable(removeGrabs): return - msg = f"INPUT EVENT MANAGER: Removing active script key grabs. Reason: {reason}" + msg = "INPUT EVENT MANAGER: Removing active script key grabs while XTerm is focused." debug.print_message(debug.LEVEL_INFO, msg, True) removeGrabs() + self._scriptWithSuspendedGrabsForXterm = script + + def _restore_key_grabs_after_xterm(self) -> None: + """Restores key grabs after leaving XTerm.""" + + script = self._scriptWithSuspendedGrabsForXterm + self._scriptWithSuspendedGrabsForXterm = None + if script is None: + return + + if script is not script_manager.get_manager().get_active_script(): + return + + addGrabs = getattr(script, "addKeyGrabs", None) + if not callable(addGrabs): + return + + msg = "INPUT EVENT MANAGER: Restoring active script key grabs after leaving XTerm." + debug.print_message(debug.LEVEL_INFO, msg, True) + addGrabs() # pylint: disable=too-many-arguments # pylint: disable=too-many-positional-arguments @@ -560,20 +663,15 @@ class InputEventManager: if pendingFocus is not None: tokens = ["INPUT EVENT MANAGER: Using pending self-hosted focus for keyboard event:", pendingFocus] debug.print_tokens(debug.LEVEL_INFO, tokens, True) - if not pressed and self._lacks_atspi_context( - manager.get_active_window(), - manager.get_locus_of_focus(), - pendingFocus, - ): - msg = ( - "INPUT EVENT MANAGER: Passing through keyboard release; " - "no AT-SPI focus context is available." - ) + window = manager.get_active_window() + focus = manager.get_locus_of_focus() + if self._should_pass_through_for_active_xterm(window, focus, pendingFocus): + msg = "INPUT EVENT MANAGER: Passing through keyboard event; active XTerm is focused." debug.print_message(debug.LEVEL_INFO, msg, True) - self._suspend_active_script_key_grabs("no AT-SPI focus context") + self._suspend_key_grabs_for_xterm() return False + self._restore_key_grabs_after_xterm() if pressed: - window = manager.get_active_window() if not AXUtilities.can_be_active_window(window, clear_cache=True): new_window = AXUtilities.find_active_window() if new_window is None: @@ -598,28 +696,13 @@ class InputEventManager: # state. Failing to revalidate the window on a key press is inconclusive; # do not wipe out the last known window and focus state. focus = pendingFocus or manager.get_locus_of_focus() - if self._lacks_atspi_context(window, focus, pendingFocus): - msg = ( - "INPUT EVENT MANAGER: Passing through keyboard event; " - "no AT-SPI focus context is available." - ) - debug.print_message(debug.LEVEL_INFO, msg, True) - self._suspend_active_script_key_grabs("no AT-SPI focus context") - return False - staleContext = window or self._get_active_script_app() if self._active_x11_window_differs_from(staleContext): msg = ( - "INPUT EVENT MANAGER: Clearing stale AT-SPI focus context; " - "X11 focus moved to an untracked window." + "INPUT EVENT MANAGER: X11 focus moved to an untracked window; " + "preserving Cthulhu key handling." ) debug.print_message(debug.LEVEL_INFO, msg, True) - script_manager.get_manager().set_active_script( - None, - "active X11 window not found in AT-SPI", - ) - manager.clear_state("active X11 window not found in AT-SPI") - return False tokens = [ "WARNING:", diff --git a/tests/test_input_event_manager_x11_focus_regressions.py b/tests/test_input_event_manager_x11_focus_regressions.py new file mode 100644 index 0000000..4958bea --- /dev/null +++ b/tests/test_input_event_manager_x11_focus_regressions.py @@ -0,0 +1,328 @@ +import sys +import unittest +from pathlib import Path +from unittest import mock + +sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src")) + +from cthulhu import input_event_manager + + +class InputEventManagerX11FocusRegressionTests(unittest.TestCase): + def test_active_x11_window_differs_from_cached_atspi_window_by_pid(self): + manager = input_event_manager.InputEventManager() + cachedWindow = object() + + with ( + mock.patch.object(manager, "_get_active_x11_window_pid", return_value=1002), + mock.patch.object(input_event_manager.AXObject, "get_process_id", return_value=1001), + mock.patch.object(input_event_manager.debug, "print_tokens"), + ): + self.assertTrue(manager._active_x11_window_differs_from(cachedWindow)) + + def test_keyboard_event_preserves_key_handling_when_unknown_x11_window_is_not_xterm(self): + manager = input_event_manager.InputEventManager() + staleWindow = object() + staleFocus = object() + focusManager = mock.Mock() + focusManager.get_active_window.return_value = staleWindow + focusManager.get_locus_of_focus.return_value = staleFocus + scriptManager = mock.Mock() + keyboardEvent = mock.Mock() + keyboardEvent.is_modifier_key.return_value = False + + with ( + mock.patch.object(input_event_manager.cthulhu_state, "capturingKeys", False), + mock.patch.object(input_event_manager.focus_manager, "get_manager", return_value=focusManager), + mock.patch.object(input_event_manager.script_manager, "get_manager", return_value=scriptManager), + mock.patch.object(input_event_manager.input_event, "KeyboardEvent", return_value=keyboardEvent), + mock.patch.object(input_event_manager.AXUtilities, "can_be_active_window", return_value=False), + mock.patch.object(input_event_manager.AXUtilities, "find_active_window", return_value=None), + mock.patch.object(manager, "_get_top_level_window", return_value=None), + mock.patch.object(manager, "_should_pass_through_for_active_xterm", return_value=False), + mock.patch.object(manager, "_active_x11_window_differs_from", return_value=True), + mock.patch.object(manager, "last_event_was_keyboard", return_value=False), + mock.patch.object(input_event_manager.debug, "print_message"), + ): + result = manager.process_keyboard_event( + mock.Mock(), + True, + 36, + 65293, + 0, + "Return", + ) + + self.assertTrue(result) + scriptManager.set_active_script.assert_not_called() + focusManager.clear_state.assert_not_called() + keyboardEvent.process.assert_called_once_with() + + def test_keyboard_event_uses_active_script_app_when_cached_window_is_missing(self): + manager = input_event_manager.InputEventManager() + staleApp = object() + staleScript = mock.Mock(app=staleApp) + focusManager = mock.Mock() + focusManager.get_active_window.return_value = None + focusManager.get_locus_of_focus.return_value = None + scriptManager = mock.Mock() + scriptManager.get_active_script.return_value = staleScript + keyboardEvent = mock.Mock() + keyboardEvent.is_modifier_key.return_value = False + + with ( + mock.patch.object(input_event_manager.cthulhu_state, "capturingKeys", False), + mock.patch.object(input_event_manager.focus_manager, "get_manager", return_value=focusManager), + mock.patch.object(input_event_manager.script_manager, "get_manager", return_value=scriptManager), + mock.patch.object(input_event_manager.input_event, "KeyboardEvent", return_value=keyboardEvent), + mock.patch.object(input_event_manager.AXUtilities, "can_be_active_window", return_value=False), + mock.patch.object(input_event_manager.AXUtilities, "find_active_window", return_value=None), + mock.patch.object(manager, "_get_top_level_window", return_value=None), + mock.patch.object(manager, "_should_pass_through_for_active_xterm", return_value=False), + mock.patch.object(manager, "_active_x11_window_differs_from", return_value=True) as differs, + mock.patch.object(manager, "last_event_was_keyboard", return_value=False), + mock.patch.object(input_event_manager.debug, "print_message"), + ): + result = manager.process_keyboard_event( + mock.Mock(), + True, + 81, + 65434, + 0, + "KP_Page_Up", + ) + + self.assertTrue(result) + differs.assert_called_once_with(staleApp) + scriptManager.set_active_script.assert_not_called() + focusManager.clear_state.assert_not_called() + keyboardEvent.process.assert_called_once_with() + + def test_keyboard_press_keeps_cthulhu_commands_when_unknown_window_is_not_xterm(self): + manager = input_event_manager.InputEventManager() + focusManager = mock.Mock() + focusManager.get_active_window.return_value = None + focusManager.get_locus_of_focus.return_value = None + scriptManager = mock.Mock() + activeScript = mock.Mock(app=None) + scriptManager.get_active_script.return_value = activeScript + keyboardEvent = mock.Mock() + + with ( + mock.patch.object(input_event_manager.cthulhu_state, "capturingKeys", False), + mock.patch.object(input_event_manager.cthulhu_state, "pendingSelfHostedFocus", None), + mock.patch.object(input_event_manager.focus_manager, "get_manager", return_value=focusManager), + mock.patch.object(input_event_manager.script_manager, "get_manager", return_value=scriptManager), + mock.patch.object(input_event_manager.input_event, "KeyboardEvent", return_value=keyboardEvent), + mock.patch.object(input_event_manager.AXUtilities, "can_be_active_window", return_value=False), + mock.patch.object(input_event_manager.AXUtilities, "find_active_window", return_value=None), + mock.patch.object(manager, "_get_top_level_window", return_value=None), + mock.patch.object(manager, "_should_pass_through_for_active_xterm", return_value=False), + mock.patch.object(manager, "_active_x11_window_differs_from", return_value=False), + mock.patch.object(manager, "last_event_was_keyboard", return_value=False), + mock.patch.object(input_event_manager.debug, "print_message"), + ): + result = manager.process_keyboard_event( + mock.Mock(), + True, + 79, + 65429, + 0, + "KP_Home", + ) + + self.assertTrue(result) + activeScript.removeKeyGrabs.assert_not_called() + keyboardEvent.process.assert_called_once_with() + + def test_keyboard_press_suspends_grabs_and_passes_through_when_active_xterm_is_focused(self): + manager = input_event_manager.InputEventManager() + staleWindow = object() + staleFocus = object() + focusManager = mock.Mock() + focusManager.get_active_window.return_value = staleWindow + focusManager.get_locus_of_focus.return_value = staleFocus + scriptManager = mock.Mock() + activeScript = mock.Mock(app=None) + scriptManager.get_active_script.return_value = activeScript + keyboardEvent = mock.Mock() + + with ( + mock.patch.object(input_event_manager.cthulhu_state, "capturingKeys", False), + mock.patch.object(input_event_manager.cthulhu_state, "pendingSelfHostedFocus", None), + mock.patch.object(input_event_manager.focus_manager, "get_manager", return_value=focusManager), + mock.patch.object(input_event_manager.script_manager, "get_manager", return_value=scriptManager), + mock.patch.object(input_event_manager.input_event, "KeyboardEvent", return_value=keyboardEvent), + mock.patch.object(manager, "_should_pass_through_for_active_xterm", return_value=True), + mock.patch.object(input_event_manager.AXUtilities, "can_be_active_window") as canBeActiveWindow, + mock.patch.object(input_event_manager.AXUtilities, "find_active_window") as findActiveWindow, + mock.patch.object(input_event_manager.debug, "print_message"), + ): + result = manager.process_keyboard_event( + mock.Mock(), + True, + 79, + 65429, + 0, + "KP_Home", + ) + + self.assertFalse(result) + activeScript.removeKeyGrabs.assert_called_once_with() + canBeActiveWindow.assert_not_called() + findActiveWindow.assert_not_called() + keyboardEvent.process.assert_not_called() + + def test_finds_focused_atspi_window_for_active_x11_pid(self): + manager = input_event_manager.InputEventManager() + app = object() + unfocusedWindow = object() + focusedWindow = object() + + with ( + mock.patch.object(manager, "_get_active_x11_window_pid", return_value=23875), + mock.patch.object( + input_event_manager.AXUtilitiesApplication, + "get_application_with_pid", + return_value=app, + ), + mock.patch.object( + input_event_manager.AXObject, + "iter_children", + return_value=iter([unfocusedWindow, focusedWindow]), + ), + mock.patch.object(input_event_manager.AXUtilities, "is_frame", return_value=True), + mock.patch.object(input_event_manager.AXUtilities, "is_window", return_value=False), + mock.patch.object(input_event_manager.AXUtilities, "is_dialog_or_alert", return_value=False), + mock.patch.object(input_event_manager.AXUtilities, "is_focused", return_value=False), + mock.patch.object( + input_event_manager.AXUtilities, + "get_focused_object", + side_effect=[None, object()], + ), + mock.patch.object(input_event_manager.debug, "print_tokens"), + ): + result = manager._find_active_x11_atspi_window() + + self.assertIs(result, focusedWindow) + + def test_keyboard_event_recovers_context_from_active_x11_pid_window(self): + manager = input_event_manager.InputEventManager() + recoveredWindow = object() + focusManager = mock.Mock() + focusManager.get_active_window.return_value = None + focusManager.get_locus_of_focus.return_value = None + scriptManager = mock.Mock() + activeScript = mock.Mock(app=None) + scriptManager.get_active_script.return_value = activeScript + keyboardEvent = mock.Mock() + keyboardEvent.is_modifier_key.return_value = False + + with ( + mock.patch.object(input_event_manager.cthulhu_state, "capturingKeys", False), + mock.patch.object(input_event_manager.cthulhu_state, "pendingSelfHostedFocus", None), + mock.patch.object(input_event_manager.focus_manager, "get_manager", return_value=focusManager), + mock.patch.object(input_event_manager.script_manager, "get_manager", return_value=scriptManager), + mock.patch.object(input_event_manager.input_event, "KeyboardEvent", return_value=keyboardEvent), + mock.patch.object(input_event_manager.AXUtilities, "can_be_active_window", return_value=False), + mock.patch.object(input_event_manager.AXUtilities, "find_active_window", return_value=None), + mock.patch.object(manager, "_find_active_x11_atspi_window", return_value=recoveredWindow), + mock.patch.object(manager, "last_event_was_keyboard", return_value=False), + mock.patch.object(input_event_manager.debug, "print_message"), + mock.patch.object(input_event_manager.debug, "print_tokens"), + ): + result = manager.process_keyboard_event( + mock.Mock(), + True, + 79, + 65429, + 0, + "KP_Home", + ) + + self.assertTrue(result) + focusManager.set_active_window.assert_called_once_with(recoveredWindow) + activeScript.removeKeyGrabs.assert_not_called() + keyboardEvent.set_window.assert_called_once_with(recoveredWindow) + keyboardEvent.process.assert_called_once_with() + + def test_keyboard_release_suspends_grabs_and_passes_through_when_active_xterm_is_focused(self): + manager = input_event_manager.InputEventManager() + focusManager = mock.Mock() + focusManager.get_active_window.return_value = None + focusManager.get_locus_of_focus.return_value = None + scriptManager = mock.Mock() + activeScript = mock.Mock(app=None) + scriptManager.get_active_script.return_value = activeScript + keyboardEvent = mock.Mock() + + with ( + mock.patch.object(input_event_manager.cthulhu_state, "capturingKeys", False), + mock.patch.object(input_event_manager.cthulhu_state, "pendingSelfHostedFocus", None), + mock.patch.object(input_event_manager.focus_manager, "get_manager", return_value=focusManager), + mock.patch.object(input_event_manager.script_manager, "get_manager", return_value=scriptManager), + mock.patch.object(input_event_manager.input_event, "KeyboardEvent", return_value=keyboardEvent), + mock.patch.object(manager, "_should_pass_through_for_active_xterm", return_value=True), + mock.patch.object(input_event_manager.debug, "print_message"), + ): + result = manager.process_keyboard_event( + mock.Mock(), + False, + 79, + 65429, + 0, + "KP_Home", + ) + + self.assertFalse(result) + activeScript.removeKeyGrabs.assert_called_once_with() + keyboardEvent.process.assert_not_called() + + def test_xterm_grab_suspension_is_idempotent(self): + manager = input_event_manager.InputEventManager() + activeScript = mock.Mock() + scriptManager = mock.Mock() + scriptManager.get_active_script.return_value = activeScript + + with mock.patch.object(input_event_manager.script_manager, "get_manager", return_value=scriptManager): + manager._suspend_key_grabs_for_xterm() + manager._suspend_key_grabs_for_xterm() + + activeScript.removeKeyGrabs.assert_called_once_with() + + def test_xterm_grab_restore_readds_grabs_for_same_active_script(self): + manager = input_event_manager.InputEventManager() + activeScript = mock.Mock() + scriptManager = mock.Mock() + scriptManager.get_active_script.return_value = activeScript + + with mock.patch.object(input_event_manager.script_manager, "get_manager", return_value=scriptManager): + manager._suspend_key_grabs_for_xterm() + manager._restore_key_grabs_after_xterm() + + activeScript.removeKeyGrabs.assert_called_once_with() + activeScript.addKeyGrabs.assert_called_once_with() + + def test_xterm_grab_restore_does_not_readd_grabs_for_inactive_script(self): + manager = input_event_manager.InputEventManager() + oldScript = mock.Mock() + newScript = mock.Mock() + scriptManager = mock.Mock() + scriptManager.get_active_script.side_effect = [oldScript, newScript] + + with mock.patch.object(input_event_manager.script_manager, "get_manager", return_value=scriptManager): + manager._suspend_key_grabs_for_xterm() + manager._restore_key_grabs_after_xterm() + + oldScript.removeKeyGrabs.assert_called_once_with() + oldScript.addKeyGrabs.assert_not_called() + + def test_identifier_is_xterm_matches_exact_xterm_only(self): + self.assertTrue(input_event_manager.InputEventManager._identifier_is_xterm("xterm")) + self.assertTrue(input_event_manager.InputEventManager._identifier_is_xterm("/usr/bin/xterm")) + self.assertFalse(input_event_manager.InputEventManager._identifier_is_xterm("uxterm")) + self.assertFalse(input_event_manager.InputEventManager._identifier_is_xterm("gnome-terminal")) + + +if __name__ == "__main__": + unittest.main()