Reworked the xterm hand off so it's faster and more reliable.

This commit is contained in:
Storm Dragon
2026-05-07 18:05:16 -04:00
parent e6b6b1051e
commit e54600ff4d
2 changed files with 454 additions and 43 deletions
+126 -43
View File
@@ -39,7 +39,8 @@ __copyright__ = "Copyright (c) 2024 Igalia, S.L." \
"Copyright (c) 2024 GNOME Foundation Inc."
__license__ = "LGPL"
from typing import TYPE_CHECKING, Optional, Union, Tuple, List, Dict
import os
from typing import TYPE_CHECKING, Optional, Union, Tuple, List, Dict, Any
import gi
gi.require_version("Atspi", "2.0")
@@ -74,6 +75,7 @@ class InputEventManager:
self._paused: bool = False
self._wnck = None
self._did_attempt_wnck_load: bool = False
self._scriptWithSuspendedGrabsForXterm = None
def activate_device(self) -> Atspi.Device:
"""Creates and returns the AT-SPI device used by this manager."""
@@ -385,27 +387,94 @@ class InputEventManager:
return self._wnck
def _get_active_x11_window_pid(self) -> int:
"""Returns the PID of the active X11 window if Wnck can provide it."""
def _get_active_x11_window(self) -> Optional[Any]:
"""Returns the active X11 window if Wnck can provide it."""
wnck = self._get_wnck()
if wnck is None:
return -1
return None
try:
screen = wnck.Screen.get_default()
if screen is None:
return -1
return None
screen.force_update()
window = screen.get_active_window()
if window is None:
return -1
return screen.get_active_window()
except Exception as error:
msg = f"INPUT EVENT MANAGER: Could not obtain active X11 window: {error}"
debug.print_message(debug.LEVEL_INFO, msg, True)
return None
def _get_active_x11_window_pid(self) -> int:
"""Returns the PID of the active X11 window if Wnck can provide it."""
window = self._get_active_x11_window()
if window is None:
return -1
try:
return int(window.get_pid())
except Exception as error:
msg = f"INPUT EVENT MANAGER: Could not obtain active X11 window PID: {error}"
debug.print_message(debug.LEVEL_INFO, msg, True)
return -1
@staticmethod
def _identifier_is_xterm(value: Any) -> bool:
"""Returns True if value identifies XTerm."""
if not isinstance(value, str):
return False
identifier = os.path.basename(value.strip().lower())
return identifier == "xterm"
def _active_x11_window_is_xterm(self) -> bool:
"""Returns True when the active X11 window appears to be XTerm."""
window = self._get_active_x11_window()
if window is None:
return False
for attrName in ("get_class_group_name", "get_class_instance_name", "get_name"):
attr = getattr(window, attrName, None)
if not callable(attr):
continue
try:
if self._identifier_is_xterm(attr()):
return True
except Exception:
continue
getClassGroup = getattr(window, "get_class_group", None)
if callable(getClassGroup):
try:
classGroup = getClassGroup()
except Exception:
classGroup = None
for attrName in ("get_name", "get_res_class"):
attr = getattr(classGroup, attrName, None)
if not callable(attr):
continue
try:
if self._identifier_is_xterm(attr()):
return True
except Exception:
continue
pid = self._get_active_x11_window_pid()
if pid < 1:
return False
try:
with open(f"/proc/{pid}/cmdline", "rb") as cmdlineFile:
executable = cmdlineFile.read().split(b"\0", 1)[0].decode(errors="ignore")
except OSError:
return False
return self._identifier_is_xterm(executable)
def _find_active_x11_atspi_window(self) -> Optional[Atspi.Accessible]:
"""Returns the focused AT-SPI window for the active X11 PID, if possible."""
@@ -498,8 +567,8 @@ class InputEventManager:
return getattr(script, "app", None)
@staticmethod
def _lacks_atspi_context(
self,
window: Optional[Atspi.Accessible],
focus: Optional[Atspi.Accessible],
pendingFocus: Optional[Atspi.Accessible],
@@ -509,23 +578,57 @@ class InputEventManager:
if window is not None or focus is not None or pendingFocus is not None:
return False
return self._get_active_script_app() is None
return True
@staticmethod
def _suspend_active_script_key_grabs(reason: str) -> None:
"""Removes active-script key grabs while Cthulhu has no input context."""
def _should_pass_through_for_active_xterm(
self,
window: Optional[Atspi.Accessible],
focus: Optional[Atspi.Accessible],
pendingFocus: Optional[Atspi.Accessible],
) -> bool:
"""Returns True when XTerm is active and Cthulhu lacks matching AT-SPI context."""
if pendingFocus is not None:
return False
return self._active_x11_window_is_xterm()
def _suspend_key_grabs_for_xterm(self) -> None:
"""Suspends active-script key grabs so XTerm/Fenrir can receive them."""
script = script_manager.get_manager().get_active_script()
if script is None:
if script is None or script is self._scriptWithSuspendedGrabsForXterm:
return
if self._scriptWithSuspendedGrabsForXterm is not None:
self._restore_key_grabs_after_xterm()
removeGrabs = getattr(script, "removeKeyGrabs", None)
if not callable(removeGrabs):
return
msg = f"INPUT EVENT MANAGER: Removing active script key grabs. Reason: {reason}"
msg = "INPUT EVENT MANAGER: Removing active script key grabs while XTerm is focused."
debug.print_message(debug.LEVEL_INFO, msg, True)
removeGrabs()
self._scriptWithSuspendedGrabsForXterm = script
def _restore_key_grabs_after_xterm(self) -> None:
"""Restores key grabs after leaving XTerm."""
script = self._scriptWithSuspendedGrabsForXterm
self._scriptWithSuspendedGrabsForXterm = None
if script is None:
return
if script is not script_manager.get_manager().get_active_script():
return
addGrabs = getattr(script, "addKeyGrabs", None)
if not callable(addGrabs):
return
msg = "INPUT EVENT MANAGER: Restoring active script key grabs after leaving XTerm."
debug.print_message(debug.LEVEL_INFO, msg, True)
addGrabs()
# pylint: disable=too-many-arguments
# pylint: disable=too-many-positional-arguments
@@ -560,20 +663,15 @@ class InputEventManager:
if pendingFocus is not None:
tokens = ["INPUT EVENT MANAGER: Using pending self-hosted focus for keyboard event:", pendingFocus]
debug.print_tokens(debug.LEVEL_INFO, tokens, True)
if not pressed and self._lacks_atspi_context(
manager.get_active_window(),
manager.get_locus_of_focus(),
pendingFocus,
):
msg = (
"INPUT EVENT MANAGER: Passing through keyboard release; "
"no AT-SPI focus context is available."
)
window = manager.get_active_window()
focus = manager.get_locus_of_focus()
if self._should_pass_through_for_active_xterm(window, focus, pendingFocus):
msg = "INPUT EVENT MANAGER: Passing through keyboard event; active XTerm is focused."
debug.print_message(debug.LEVEL_INFO, msg, True)
self._suspend_active_script_key_grabs("no AT-SPI focus context")
self._suspend_key_grabs_for_xterm()
return False
self._restore_key_grabs_after_xterm()
if pressed:
window = manager.get_active_window()
if not AXUtilities.can_be_active_window(window, clear_cache=True):
new_window = AXUtilities.find_active_window()
if new_window is None:
@@ -598,28 +696,13 @@ class InputEventManager:
# state. Failing to revalidate the window on a key press is inconclusive;
# do not wipe out the last known window and focus state.
focus = pendingFocus or manager.get_locus_of_focus()
if self._lacks_atspi_context(window, focus, pendingFocus):
msg = (
"INPUT EVENT MANAGER: Passing through keyboard event; "
"no AT-SPI focus context is available."
)
debug.print_message(debug.LEVEL_INFO, msg, True)
self._suspend_active_script_key_grabs("no AT-SPI focus context")
return False
staleContext = window or self._get_active_script_app()
if self._active_x11_window_differs_from(staleContext):
msg = (
"INPUT EVENT MANAGER: Clearing stale AT-SPI focus context; "
"X11 focus moved to an untracked window."
"INPUT EVENT MANAGER: X11 focus moved to an untracked window; "
"preserving Cthulhu key handling."
)
debug.print_message(debug.LEVEL_INFO, msg, True)
script_manager.get_manager().set_active_script(
None,
"active X11 window not found in AT-SPI",
)
manager.clear_state("active X11 window not found in AT-SPI")
return False
tokens = [
"WARNING:",