diff --git a/src/cthulhu/input_event_manager.py b/src/cthulhu/input_event_manager.py index a483654..62dc199 100644 --- a/src/cthulhu/input_event_manager.py +++ b/src/cthulhu/input_event_manager.py @@ -52,8 +52,10 @@ from . import input_event from . import script_manager from . import settings from . import cthulhu_state +from .wnck_support import load_wnck from .ax_object import AXObject from .ax_utilities import AXUtilities +from .ax_utilities_application import AXUtilitiesApplication if TYPE_CHECKING: from . import keybindings @@ -70,6 +72,8 @@ class InputEventManager: self._mapped_keysyms: List[int] = [] self._grabbed_bindings: Dict[int, keybindings.KeyBinding] = {} self._paused: bool = False + self._wnck = None + self._did_attempt_wnck_load: bool = False def activate_device(self) -> Atspi.Device: """Creates and returns the AT-SPI device used by this manager.""" @@ -367,6 +371,162 @@ class InputEventManager: or AXUtilities.is_dialog_or_alert(x), ) + def _get_wnck(self): + """Returns Wnck when available for X11 active-window checks.""" + + if not self._did_attempt_wnck_load: + self._did_attempt_wnck_load = True + try: + self._wnck = load_wnck() + except Exception as error: + msg = f"INPUT EVENT MANAGER: Wnck unavailable for active-window check: {error}" + debug.print_message(debug.LEVEL_INFO, msg, True) + self._wnck = None + + return self._wnck + + def _get_active_x11_window_pid(self) -> int: + """Returns the PID of the active X11 window if Wnck can provide it.""" + + wnck = self._get_wnck() + if wnck is None: + return -1 + + try: + screen = wnck.Screen.get_default() + if screen is None: + return -1 + screen.force_update() + window = screen.get_active_window() + if window is None: + return -1 + return int(window.get_pid()) + except Exception as error: + msg = f"INPUT EVENT MANAGER: Could not obtain active X11 window PID: {error}" + debug.print_message(debug.LEVEL_INFO, msg, True) + return -1 + + def _find_active_x11_atspi_window(self) -> Optional[Atspi.Accessible]: + """Returns the focused AT-SPI window for the active X11 PID, if possible.""" + + x11Pid = self._get_active_x11_window_pid() + if x11Pid < 1: + return None + + app = AXUtilitiesApplication.get_application_with_pid(x11Pid) + if app is None: + return None + + candidates = [ + child for child in AXObject.iter_children(app) + if AXUtilities.is_frame(child) + or AXUtilities.is_window(child) + or AXUtilities.is_dialog_or_alert(child) + ] + if not candidates: + tokens = ["INPUT EVENT MANAGER: No AT-SPI windows found for active X11 pid", x11Pid] + debug.print_tokens(debug.LEVEL_INFO, tokens, True) + return None + + focusedCandidates = [] + for window in candidates: + if AXUtilities.is_focused(window) or AXUtilities.get_focused_object(window) is not None: + focusedCandidates.append(window) + + if len(focusedCandidates) == 1: + tokens = [ + "INPUT EVENT MANAGER: Recovered active AT-SPI window from X11 pid", + x11Pid, + focusedCandidates[0], + ] + debug.print_tokens(debug.LEVEL_INFO, tokens, True) + return focusedCandidates[0] + + if focusedCandidates: + tokens = [ + "INPUT EVENT MANAGER: Multiple focused AT-SPI windows for active X11 pid", + x11Pid, + focusedCandidates, + ] + debug.print_tokens(debug.LEVEL_INFO, tokens, True) + return focusedCandidates[0] + + tokens = [ + "INPUT EVENT MANAGER: Active X11 pid", + x11Pid, + "has AT-SPI windows but none report focus", + candidates, + ] + debug.print_tokens(debug.LEVEL_INFO, tokens, True) + return None + + def _active_x11_window_differs_from(self, obj: Optional[Atspi.Accessible]) -> bool: + """Returns True when X11 focus is known to be outside the AT-SPI context.""" + + if obj is None: + return False + + x11Pid = self._get_active_x11_window_pid() + if x11Pid < 1: + return False + + axPid = AXObject.get_process_id(obj) + if axPid < 1: + return False + + result = x11Pid != axPid + if result: + tokens = [ + "INPUT EVENT MANAGER: Active X11 window PID", + x11Pid, + "differs from AT-SPI active window PID", + axPid, + "for", + obj, + ] + debug.print_tokens(debug.LEVEL_INFO, tokens, True) + + return result + + @staticmethod + def _get_active_script_app() -> Optional[Atspi.Accessible]: + """Returns the app for the active script, if one is available.""" + + script = script_manager.get_manager().get_active_script() + if script is None: + return None + + return getattr(script, "app", None) + + def _lacks_atspi_context( + self, + window: Optional[Atspi.Accessible], + focus: Optional[Atspi.Accessible], + pendingFocus: Optional[Atspi.Accessible], + ) -> bool: + """Returns True when Cthulhu has no meaningful AT-SPI input context.""" + + if window is not None or focus is not None or pendingFocus is not None: + return False + + return self._get_active_script_app() is None + + @staticmethod + def _suspend_active_script_key_grabs(reason: str) -> None: + """Removes active-script key grabs while Cthulhu has no input context.""" + + script = script_manager.get_manager().get_active_script() + if script is None: + return + + removeGrabs = getattr(script, "removeKeyGrabs", None) + if not callable(removeGrabs): + return + + msg = f"INPUT EVENT MANAGER: Removing active script key grabs. Reason: {reason}" + debug.print_message(debug.LEVEL_INFO, msg, True) + removeGrabs() + # pylint: disable=too-many-arguments # pylint: disable=too-many-positional-arguments def process_keyboard_event( @@ -400,10 +560,24 @@ class InputEventManager: if pendingFocus is not None: tokens = ["INPUT EVENT MANAGER: Using pending self-hosted focus for keyboard event:", pendingFocus] debug.print_tokens(debug.LEVEL_INFO, tokens, True) + if not pressed and self._lacks_atspi_context( + manager.get_active_window(), + manager.get_locus_of_focus(), + pendingFocus, + ): + msg = ( + "INPUT EVENT MANAGER: Passing through keyboard release; " + "no AT-SPI focus context is available." + ) + debug.print_message(debug.LEVEL_INFO, msg, True) + self._suspend_active_script_key_grabs("no AT-SPI focus context") + return False if pressed: window = manager.get_active_window() if not AXUtilities.can_be_active_window(window, clear_cache=True): new_window = AXUtilities.find_active_window() + if new_window is None: + new_window = self._find_active_x11_atspi_window() if new_window is not None: window = new_window tokens = ["INPUT EVENT MANAGER: Updating window and active window to", window] @@ -423,6 +597,30 @@ class InputEventManager: # One example: Brave's popup menus live in frames which lack the active # state. Failing to revalidate the window on a key press is inconclusive; # do not wipe out the last known window and focus state. + focus = pendingFocus or manager.get_locus_of_focus() + if self._lacks_atspi_context(window, focus, pendingFocus): + msg = ( + "INPUT EVENT MANAGER: Passing through keyboard event; " + "no AT-SPI focus context is available." + ) + debug.print_message(debug.LEVEL_INFO, msg, True) + self._suspend_active_script_key_grabs("no AT-SPI focus context") + return False + + staleContext = window or self._get_active_script_app() + if self._active_x11_window_differs_from(staleContext): + msg = ( + "INPUT EVENT MANAGER: Clearing stale AT-SPI focus context; " + "X11 focus moved to an untracked window." + ) + debug.print_message(debug.LEVEL_INFO, msg, True) + script_manager.get_manager().set_active_script( + None, + "active X11 window not found in AT-SPI", + ) + manager.clear_state("active X11 window not found in AT-SPI") + return False + tokens = [ "WARNING:", window, diff --git a/src/cthulhu/scripts/default.py b/src/cthulhu/scripts/default.py index 2c6b037..b665f4a 100644 --- a/src/cthulhu/scripts/default.py +++ b/src/cthulhu/scripts/default.py @@ -1562,7 +1562,7 @@ class Script(script.Script): return f"" @dbus_service.command - def getDiagnosticState(self, script=None, event=None, notify_user=True) -> str: + def get_diagnostic_state(self, script=None, event=None, notify_user=True) -> str: """Dumps runtime state useful for diagnosing sluggish web-app behavior.""" app = cthulhu.cthulhuApp