From ceb03668b927f2eaa416350ae52d5b6b1d107d3b Mon Sep 17 00:00:00 2001 From: Storm Dragon Date: Tue, 7 Apr 2026 17:15:27 -0400 Subject: [PATCH] Add guarded AT-SPI pointer mouse review backend --- src/cthulhu/mouse_review.py | 283 +++++++++++++----- ...ouse_review_pointer_monitor_regressions.py | 89 ++++++ 2 files changed, 293 insertions(+), 79 deletions(-) diff --git a/src/cthulhu/mouse_review.py b/src/cthulhu/mouse_review.py index a6ed955..569c8fe 100644 --- a/src/cthulhu/mouse_review.py +++ b/src/cthulhu/mouse_review.py @@ -32,30 +32,31 @@ __copyright__ = "Copyright (c) 2008 Eitan Isaacson" \ "Copyright (c) 2016 Igalia, S.L." __license__ = "LGPL" +from collections import deque + import gi gi.require_version("Atspi", "2.0") -from gi.repository import Atspi +from gi.repository import Atspi, GLib import math import time from gi.repository import Gdk -try: - gi.require_version("Wnck", "3.0") - from gi.repository import Wnck - _mouseReviewCapable = True -except Exception: - _mouseReviewCapable = False +from .wnck_support import load_wnck + +Wnck = load_wnck() from . import cmdnames from . import debug from . import keybindings from . import input_event +from . import input_event_manager from . import messages from . import cthulhu from . import cthulhu_state from . import script_manager from . import settings_manager +from .ax_component import AXComponent from .ax_object import AXObject from .ax_text import AXText from .ax_utilities import AXUtilities @@ -82,8 +83,9 @@ class _StringContext: self._start = start self._end = end self._boundingBox = 0, 0, 0, 0 - if script: - self._boundingBox = script.utilities.getTextBoundingBox(obj, start, end) + if AXObject.supports_text(obj): + rect = AXText.get_range_rect(obj, start, end) + self._boundingBox = rect.x, rect.y, rect.width, rect.height def __eq__(self, other): return other is not None \ @@ -183,8 +185,9 @@ class _ItemContext: self._string = self._getStringContext() self._time = time.time() self._boundingBox = 0, 0, 0, 0 - if script: - self._boundingBox = script.utilities.getBoundingBox(obj) + if AXObject.supports_component(obj): + rect = AXComponent.get_rect(obj) + self._boundingBox = rect.x, rect.y, rect.width, rect.height def __eq__(self, other): return other is not None \ @@ -237,7 +240,12 @@ class _ItemContext: return _StringContext(self._obj, self._script) string, start, end = self._script.utilities.textAtPoint( - self._obj, self._x, self._y, boundary=self._boundary) + self._obj, + self._x, + self._y, + coordType=Atspi.CoordType.WINDOW, + boundary=self._boundary, + ) if string: string = self._script.utilities.expandEOCs(self._obj, start, end) @@ -351,31 +359,65 @@ class MouseReviewer: self._handlerIds = {} self._eventListener = Atspi.EventListener.new(self._listener) self.inMouseEvent = False + self._eventQueue = deque() + self._mouseReviewCapable = False + self._useAtspi = False self._handlers = self._setup_handlers() self._bindings = self._setup_bindings() - if not _mouseReviewCapable: - msg = "MOUSE REVIEW ERROR: Wnck is not available" - debug.printMessage(debug.LEVEL_INFO, msg, True) - return + atspiVersion = Atspi.get_version() + capabilityEnum = getattr(Atspi, "DeviceCapability", None) + pointerMonitor = getattr(capabilityEnum, "POINTER_MONITOR", 0) if capabilityEnum else 0 + atspiSupported = pointerMonitor and ( + atspiVersion[0] > 2 + or atspiVersion[1] >= 60 + or (atspiVersion[0] == 2 and atspiVersion[1] == 59 and atspiVersion[2] >= 90) + ) - display = Gdk.Display.get_default() try: - seat = Gdk.Display.get_default_seat(display) - self._pointer = seat.get_pointer() - except AttributeError: - msg = "MOUSE REVIEW ERROR: Gtk+ 3.20 is not available" - debug.printMessage(debug.LEVEL_INFO, msg, True) - return + if atspiSupported: + manager = input_event_manager.get_manager() + manager.activate_device() + if manager.enable_pointer_monitoring(): + self._useAtspi = True + self._mouseReviewCapable = True + else: + self._mouseReviewCapable = Wnck is not None + else: + self._mouseReviewCapable = Wnck is not None except Exception: - msg = "MOUSE REVIEW ERROR: Exception getting pointer for default seat." + self._mouseReviewCapable = False + + if not self._mouseReviewCapable: + msg = ( + "MOUSE REVIEW ERROR: Not supported by AT-SPI device" + if atspiSupported + else "MOUSE REVIEW ERROR: Wnck or at-spi2-core >= 2.60 required" + ) debug.printMessage(debug.LEVEL_INFO, msg, True) return - if not self._pointer: - msg = "MOUSE REVIEW ERROR: No pointer for default seat." - debug.printMessage(debug.LEVEL_INFO, msg, True) - return + if not self._useAtspi: + display = Gdk.Display.get_default() + try: + seat = Gdk.Display.get_default_seat(display) + self._pointer = seat.get_pointer() + except AttributeError: + msg = "MOUSE REVIEW ERROR: Gtk+ 3.20 is not available" + debug.printMessage(debug.LEVEL_INFO, msg, True) + self._mouseReviewCapable = False + return + except Exception: + msg = "MOUSE REVIEW ERROR: Exception getting pointer for default seat." + debug.printMessage(debug.LEVEL_INFO, msg, True) + self._mouseReviewCapable = False + return + + if not self._pointer: + msg = "MOUSE REVIEW ERROR: No pointer for default seat." + debug.printMessage(debug.LEVEL_INFO, msg, True) + self._mouseReviewCapable = False + return if not self._active: return @@ -421,9 +463,14 @@ class MouseReviewer: def activate(self): """Activates mouse review.""" - if not _mouseReviewCapable: - msg = "MOUSE REVIEW ERROR: Wnck is not available" + if not self._mouseReviewCapable: + msg = ( + "MOUSE REVIEW ERROR: Not supported by AT-SPI device" + if self._useAtspi + else "MOUSE REVIEW ERROR: Wnck or at-spi2-core >= 2.60 required" + ) debug.printMessage(debug.LEVEL_INFO, msg, True) + self._active = False return # Set up the initial object as the one with the focus to avoid @@ -437,6 +484,11 @@ class MouseReviewer: frame = script.utilities.topLevelObject(obj) self._currentMouseOver = _ItemContext(obj=obj, frame=frame, script=script) + if self._useAtspi: + input_event_manager.get_manager().start_pointer_watcher(self._on_pointer_moved) + self._active = True + return + self._eventListener.register("mouse:abs") screen = Wnck.Screen.get_default() if screen: @@ -462,20 +514,29 @@ class MouseReviewer: def deactivate(self): """Deactivates mouse review.""" - self._eventListener.deregister("mouse:abs") - for key, value in self._handlerIds.items(): - value.disconnect(key) - self._handlerIds = {} - self._workspace = None - self._windows = [] - self._all_windows = [] + if self._useAtspi: + input_event_manager.get_manager().stop_pointer_watcher() + else: + try: + self._eventListener.deregister("mouse:abs") + except GLib.GError as error: + msg = f"MOUSE REVIEW: Exception deregistering 'mouse:abs' listener: {error}" + debug.printMessage(debug.LEVEL_INFO, msg, True) + for key, value in self._handlerIds.items(): + value.disconnect(key) + self._handlerIds = {} + self._workspace = None + self._windows = [] + self._all_windows = [] + + self._eventQueue.clear() self._active = False def getCurrentItem(self): """Returns the accessible object being reviewed.""" - if not _mouseReviewCapable: + if not self._mouseReviewCapable: return None if not self._active: @@ -493,7 +554,7 @@ class MouseReviewer: def toggle(self, script=None, event=None): """Toggle mouse reviewing on or off.""" - if not _mouseReviewCapable: + if not self._mouseReviewCapable: return self._active = not self._active @@ -570,52 +631,65 @@ class MouseReviewer: return [extents.x, extents.y, extents.width, extents.height] == list(bounds) - def _accessible_window_at_point(self, pX, pY): - """Returns the accessible window at the specified coordinates.""" + def _accessible_window_at_point_deprecated(self, pX, pY): + """Returns the accessible window and local coordinates for screen coordinates.""" window = None for w in self._windows: if w.is_minimized(): continue - x, y, width, height = w.get_geometry() + x, y, width, height = w.get_client_window_geometry() if x <= pX <= x + width and y <= pY <= y + height: window = w break if not window: - return None + return None, -1, -1 windowApp = window.get_application() - if not windowApp: - return None - - app = AXUtilities.get_application_with_pid(windowApp.get_pid()) + app = AXUtilities.get_application_with_pid(windowApp.get_pid()) if windowApp else None if not app: - return None + return None, -1, -1 + + windowX = pX - x + windowY = pY - y candidates = [o for o in AXObject.iter_children( - app, lambda x: self._contains_point(x, pX, pY))] + app, lambda obj: self._contains_point(obj, windowX, windowY, Atspi.CoordType.WINDOW))] if len(candidates) == 1: - return candidates[0] + return candidates[0], windowX, windowY name = window.get_name() matches = [o for o in candidates if AXObject.get_name(o) == name] if len(matches) == 1: - return matches[0] + return matches[0], windowX, windowY - bbox = window.get_client_window_geometry() - matches = [o for o in candidates if self._has_bounds(o, bbox)] + matches = [o for o in candidates if AXUtilities.is_active(o)] if len(matches) == 1: - return matches[0] + return matches[0], windowX, windowY - return None + return None, -1, -1 - def _on_mouse_moved(self, event): - """Callback for mouse:abs events.""" + def _accessible_window_at_point(self, app, pX, pY): + """Returns the accessible window and local coordinates for pointer-moved events.""" - screen, pX, pY = self._pointer.get_position() - window = self._accessible_window_at_point(pX, pY) + def getTuple(obj, x, y): + rect = AXComponent.get_rect(obj) + return obj, x - rect.x, y - rect.y + + candidates = [o for o in AXObject.iter_children( + app, lambda obj: self._contains_point(obj, pX, pY, Atspi.CoordType.WINDOW))] + if len(candidates) == 1: + return getTuple(candidates[0], pX, pY) + + matches = [o for o in candidates if AXUtilities.is_active(o)] + if len(matches) == 1: + return getTuple(matches[0], pX, pY) + + return None, -1, -1 + + def _mouse_moved_common(self, window, pX, pY): tokens = [f"MOUSE REVIEW: Window at ({pX}, {pY}) is", window] debug.printTokens(debug.LEVEL_INFO, tokens, True) if not window: @@ -632,16 +706,16 @@ class MouseReviewer: else: menu = AXObject.find_ancestor(cthulhu_state.locusOfFocus, AXUtilities.is_menu) - screen, nowX, nowY = self._pointer.get_position() - if (pX, pY) != (nowX, nowY): - msg = f"MOUSE REVIEW: Pointer moved again: ({nowX}, {nowY})" - debug.printMessage(debug.LEVEL_INFO, msg, True) - return + obj = None + if menu: + obj = script.utilities.descendantAtPoint(menu, pX, pY, Atspi.CoordType.WINDOW) + tokens = ["MOUSE REVIEW: Object in", menu, f"at ({pX}, {pY}) is", obj] + debug.printTokens(debug.LEVEL_INFO, tokens, True) - obj = script.utilities.descendantAtPoint(menu, pX, pY) \ - or script.utilities.descendantAtPoint(window, pX, pY) - tokens = [f"MOUSE REVIEW: Object at ({pX}, {pY}) is", obj] - debug.printTokens(debug.LEVEL_INFO, tokens, True) + if obj is None: + obj = script.utilities.descendantAtPoint(window, pX, pY, Atspi.CoordType.WINDOW) + tokens = ["MOUSE REVIEW: Object in", window, f"at ({pX}, {pY}) is", obj] + debug.printTokens(debug.LEVEL_INFO, tokens, True) script = self.app.getScriptManager().get_script(AXObject.get_application(window), obj) if menu and obj and not AXObject.find_ancestor(obj, AXUtilities.is_menu): @@ -658,9 +732,8 @@ class MouseReviewer: debug.printTokens(debug.LEVEL_INFO, tokens, True) return - screen, nowX, nowY = self._pointer.get_position() - if (pX, pY) != (nowX, nowY): - msg = f"MOUSE REVIEW: Pointer moved again: ({nowX}, {nowY})" + if len(self._eventQueue): + msg = "MOUSE REVIEW: Mouse moved again." debug.printMessage(debug.LEVEL_INFO, msg, True) return @@ -679,22 +752,76 @@ class MouseReviewer: if new.present(self._currentMouseOver): self._currentMouseOver = new - def _listener(self, event): - """Generic listener, mainly to output debugging info.""" + def _on_mouse_moved_deprecated(self, event): + """Callback for mouse:abs events.""" + + pX, pY = event.detail1, event.detail2 + window, windowX, windowY = self._accessible_window_at_point_deprecated(pX, pY) + self._mouse_moved_common(window, windowX, windowY) + + def _on_mouse_moved(self, obj, pX, pY): + """Callback for pointer-moved events.""" + + if AXObject.get_role(obj) == Atspi.Role.APPLICATION: + window, windowX, windowY = self._accessible_window_at_point(obj, pX, pY) + self._mouse_moved_common(window, windowX, windowY) + return + + self._mouse_moved_common(obj, pX, pY) + + def _process_event_deprecated(self): + if not self._eventQueue: + return + + event = self._eventQueue.popleft() + if len(self._eventQueue): + return startTime = time.time() tokens = ["\nvvvvv PROCESS OBJECT EVENT", event.type, "vvvvv"] debug.printTokens(debug.LEVEL_INFO, tokens, False) - if event.type.startswith("mouse:abs"): - self.inMouseEvent = True - self._on_mouse_moved(event) - self.inMouseEvent = False + self.inMouseEvent = True + self._on_mouse_moved_deprecated(event) + self.inMouseEvent = False msg = f"TOTAL PROCESSING TIME: {time.time() - startTime:.4f}\n" msg += f"^^^^^ PROCESS OBJECT EVENT {event.type} ^^^^^\n" debug.printMessage(debug.LEVEL_INFO, msg, False) + def _process_pointer_event(self): + if not self._eventQueue: + return + + obj, x, y = self._eventQueue.popleft() + if len(self._eventQueue): + return + + startTime = time.time() + tokens = ["\nvvvvv PROCESS POINTER-MOVED EVENT", "vvvvv"] + debug.printTokens(debug.LEVEL_INFO, tokens, False) + + self.inMouseEvent = True + self._on_mouse_moved(obj, x, y) + self.inMouseEvent = False + + msg = f"TOTAL PROCESSING TIME: {time.time() - startTime:.4f}\n" + msg += "^^^^^ PROCESS POINTER-MOVED EVENT ^^^^^\n" + debug.printMessage(debug.LEVEL_INFO, msg, False) + + def _listener(self, event): + """Generic listener, mainly to output debugging info.""" + + if event.type.startswith("mouse:abs"): + self._eventQueue.append(event) + GLib.timeout_add(50, self._process_event_deprecated) + + def _on_pointer_moved(self, _device, obj, x, y): + """Listener for pointer-moved events from devices.""" + + self._eventQueue.append([obj, x, y]) + GLib.timeout_add(50, self._process_pointer_event) + _reviewer = None def getReviewer(): """Returns the Mouse Reviewer""" @@ -704,5 +831,3 @@ def getReviewer(): from . import cthulhu _reviewer = MouseReviewer(cthulhu.cthulhuApp) return _reviewer - - diff --git a/tests/test_mouse_review_pointer_monitor_regressions.py b/tests/test_mouse_review_pointer_monitor_regressions.py index 5f88ca7..4104fd5 100644 --- a/tests/test_mouse_review_pointer_monitor_regressions.py +++ b/tests/test_mouse_review_pointer_monitor_regressions.py @@ -7,6 +7,7 @@ from unittest import mock sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src")) from cthulhu import input_event_manager +from cthulhu import mouse_review class FakeDevice: @@ -131,5 +132,93 @@ class InputEventManagerPointerMonitorTests(unittest.TestCase): self.assertEqual(device.disconnect_calls, [17]) +class MouseReviewBackendSelectionTests(unittest.TestCase): + @staticmethod + def _make_app(enabled=False): + app = mock.Mock() + settingsManager = mock.Mock() + settingsManager.getSetting.return_value = enabled + app.getSettingsManager.return_value = settingsManager + app.getScriptManager.return_value = mock.Mock() + return app + + def test_prefers_atspi_backend_when_version_and_capability_are_available(self): + listener = mock.Mock() + deviceManager = mock.Mock() + deviceManager.enable_pointer_monitoring.return_value = True + + with ( + mock.patch.object(mouse_review.Atspi.EventListener, "new", return_value=listener), + mock.patch.object(mouse_review.Atspi, "get_version", return_value=(2, 60, 0)), + mock.patch.object( + mouse_review.Atspi, + "DeviceCapability", + new=types.SimpleNamespace(POINTER_MONITOR=8), + create=True, + ), + mock.patch.object(mouse_review.input_event_manager, "get_manager", return_value=deviceManager), + mock.patch.object(mouse_review, "Wnck", None), + ): + reviewer = mouse_review.MouseReviewer(self._make_app()) + + self.assertTrue(reviewer._useAtspi) + self.assertTrue(reviewer._mouseReviewCapable) + deviceManager.activate_device.assert_called_once_with() + deviceManager.enable_pointer_monitoring.assert_called_once_with() + + def test_activate_uses_pointer_watcher_for_atspi_backend(self): + listener = mock.Mock() + deviceManager = mock.Mock() + deviceManager.enable_pointer_monitoring.return_value = True + + with ( + mock.patch.object(mouse_review.Atspi.EventListener, "new", return_value=listener), + mock.patch.object(mouse_review.Atspi, "get_version", return_value=(2, 60, 0)), + mock.patch.object( + mouse_review.Atspi, + "DeviceCapability", + new=types.SimpleNamespace(POINTER_MONITOR=8), + create=True, + ), + mock.patch.object(mouse_review.input_event_manager, "get_manager", return_value=deviceManager), + mock.patch.object(mouse_review.cthulhu_state, "locusOfFocus", None), + mock.patch.object(mouse_review, "Wnck", None), + ): + reviewer = mouse_review.MouseReviewer(self._make_app(enabled=False)) + reviewer.activate() + reviewer.deactivate() + + deviceManager.start_pointer_watcher.assert_called_once_with(reviewer._on_pointer_moved) + deviceManager.stop_pointer_watcher.assert_called_once_with() + listener.register.assert_not_called() + + def test_keeps_legacy_backend_when_atspi_pointer_monitor_is_unavailable(self): + listener = mock.Mock() + pointer = mock.Mock() + seat = mock.Mock() + seat.get_pointer.return_value = pointer + screen = mock.Mock() + screen.get_windows_stacked.return_value = [] + screen.get_active_workspace.return_value = None + fakeWnck = mock.Mock() + fakeWnck.Screen.get_default.return_value = screen + + with ( + mock.patch.object(mouse_review.Atspi.EventListener, "new", return_value=listener), + mock.patch.object(mouse_review.Atspi, "get_version", return_value=(2, 58, 4)), + mock.patch.object(mouse_review.input_event_manager, "get_manager"), + mock.patch.object(mouse_review.cthulhu_state, "locusOfFocus", None), + mock.patch.object(mouse_review.Gdk.Display, "get_default", return_value=mock.Mock()), + mock.patch.object(mouse_review.Gdk.Display, "get_default_seat", return_value=seat), + mock.patch.object(mouse_review, "Wnck", fakeWnck), + ): + reviewer = mouse_review.MouseReviewer(self._make_app(enabled=False)) + reviewer.activate() + + self.assertFalse(reviewer._useAtspi) + self.assertTrue(reviewer._mouseReviewCapable) + listener.register.assert_called_once_with("mouse:abs") + + if __name__ == "__main__": unittest.main()