Add guarded AT-SPI pointer mouse review backend

This commit is contained in:
Storm Dragon
2026-04-07 17:15:27 -04:00
parent d7d26c57f4
commit ceb03668b9
2 changed files with 293 additions and 79 deletions

View File

@@ -32,30 +32,31 @@ __copyright__ = "Copyright (c) 2008 Eitan Isaacson" \
"Copyright (c) 2016 Igalia, S.L."
__license__ = "LGPL"
from collections import deque
import gi
gi.require_version("Atspi", "2.0")
from gi.repository import Atspi
from gi.repository import Atspi, GLib
import math
import time
from gi.repository import Gdk
try:
gi.require_version("Wnck", "3.0")
from gi.repository import Wnck
_mouseReviewCapable = True
except Exception:
_mouseReviewCapable = False
from .wnck_support import load_wnck
Wnck = load_wnck()
from . import cmdnames
from . import debug
from . import keybindings
from . import input_event
from . import input_event_manager
from . import messages
from . import cthulhu
from . import cthulhu_state
from . import script_manager
from . import settings_manager
from .ax_component import AXComponent
from .ax_object import AXObject
from .ax_text import AXText
from .ax_utilities import AXUtilities
@@ -82,8 +83,9 @@ class _StringContext:
self._start = start
self._end = end
self._boundingBox = 0, 0, 0, 0
if script:
self._boundingBox = script.utilities.getTextBoundingBox(obj, start, end)
if AXObject.supports_text(obj):
rect = AXText.get_range_rect(obj, start, end)
self._boundingBox = rect.x, rect.y, rect.width, rect.height
def __eq__(self, other):
return other is not None \
@@ -183,8 +185,9 @@ class _ItemContext:
self._string = self._getStringContext()
self._time = time.time()
self._boundingBox = 0, 0, 0, 0
if script:
self._boundingBox = script.utilities.getBoundingBox(obj)
if AXObject.supports_component(obj):
rect = AXComponent.get_rect(obj)
self._boundingBox = rect.x, rect.y, rect.width, rect.height
def __eq__(self, other):
return other is not None \
@@ -237,7 +240,12 @@ class _ItemContext:
return _StringContext(self._obj, self._script)
string, start, end = self._script.utilities.textAtPoint(
self._obj, self._x, self._y, boundary=self._boundary)
self._obj,
self._x,
self._y,
coordType=Atspi.CoordType.WINDOW,
boundary=self._boundary,
)
if string:
string = self._script.utilities.expandEOCs(self._obj, start, end)
@@ -351,31 +359,65 @@ class MouseReviewer:
self._handlerIds = {}
self._eventListener = Atspi.EventListener.new(self._listener)
self.inMouseEvent = False
self._eventQueue = deque()
self._mouseReviewCapable = False
self._useAtspi = False
self._handlers = self._setup_handlers()
self._bindings = self._setup_bindings()
if not _mouseReviewCapable:
msg = "MOUSE REVIEW ERROR: Wnck is not available"
debug.printMessage(debug.LEVEL_INFO, msg, True)
return
atspiVersion = Atspi.get_version()
capabilityEnum = getattr(Atspi, "DeviceCapability", None)
pointerMonitor = getattr(capabilityEnum, "POINTER_MONITOR", 0) if capabilityEnum else 0
atspiSupported = pointerMonitor and (
atspiVersion[0] > 2
or atspiVersion[1] >= 60
or (atspiVersion[0] == 2 and atspiVersion[1] == 59 and atspiVersion[2] >= 90)
)
display = Gdk.Display.get_default()
try:
seat = Gdk.Display.get_default_seat(display)
self._pointer = seat.get_pointer()
except AttributeError:
msg = "MOUSE REVIEW ERROR: Gtk+ 3.20 is not available"
debug.printMessage(debug.LEVEL_INFO, msg, True)
return
if atspiSupported:
manager = input_event_manager.get_manager()
manager.activate_device()
if manager.enable_pointer_monitoring():
self._useAtspi = True
self._mouseReviewCapable = True
else:
self._mouseReviewCapable = Wnck is not None
else:
self._mouseReviewCapable = Wnck is not None
except Exception:
msg = "MOUSE REVIEW ERROR: Exception getting pointer for default seat."
self._mouseReviewCapable = False
if not self._mouseReviewCapable:
msg = (
"MOUSE REVIEW ERROR: Not supported by AT-SPI device"
if atspiSupported
else "MOUSE REVIEW ERROR: Wnck or at-spi2-core >= 2.60 required"
)
debug.printMessage(debug.LEVEL_INFO, msg, True)
return
if not self._pointer:
msg = "MOUSE REVIEW ERROR: No pointer for default seat."
debug.printMessage(debug.LEVEL_INFO, msg, True)
return
if not self._useAtspi:
display = Gdk.Display.get_default()
try:
seat = Gdk.Display.get_default_seat(display)
self._pointer = seat.get_pointer()
except AttributeError:
msg = "MOUSE REVIEW ERROR: Gtk+ 3.20 is not available"
debug.printMessage(debug.LEVEL_INFO, msg, True)
self._mouseReviewCapable = False
return
except Exception:
msg = "MOUSE REVIEW ERROR: Exception getting pointer for default seat."
debug.printMessage(debug.LEVEL_INFO, msg, True)
self._mouseReviewCapable = False
return
if not self._pointer:
msg = "MOUSE REVIEW ERROR: No pointer for default seat."
debug.printMessage(debug.LEVEL_INFO, msg, True)
self._mouseReviewCapable = False
return
if not self._active:
return
@@ -421,9 +463,14 @@ class MouseReviewer:
def activate(self):
"""Activates mouse review."""
if not _mouseReviewCapable:
msg = "MOUSE REVIEW ERROR: Wnck is not available"
if not self._mouseReviewCapable:
msg = (
"MOUSE REVIEW ERROR: Not supported by AT-SPI device"
if self._useAtspi
else "MOUSE REVIEW ERROR: Wnck or at-spi2-core >= 2.60 required"
)
debug.printMessage(debug.LEVEL_INFO, msg, True)
self._active = False
return
# Set up the initial object as the one with the focus to avoid
@@ -437,6 +484,11 @@ class MouseReviewer:
frame = script.utilities.topLevelObject(obj)
self._currentMouseOver = _ItemContext(obj=obj, frame=frame, script=script)
if self._useAtspi:
input_event_manager.get_manager().start_pointer_watcher(self._on_pointer_moved)
self._active = True
return
self._eventListener.register("mouse:abs")
screen = Wnck.Screen.get_default()
if screen:
@@ -462,20 +514,29 @@ class MouseReviewer:
def deactivate(self):
"""Deactivates mouse review."""
self._eventListener.deregister("mouse:abs")
for key, value in self._handlerIds.items():
value.disconnect(key)
self._handlerIds = {}
self._workspace = None
self._windows = []
self._all_windows = []
if self._useAtspi:
input_event_manager.get_manager().stop_pointer_watcher()
else:
try:
self._eventListener.deregister("mouse:abs")
except GLib.GError as error:
msg = f"MOUSE REVIEW: Exception deregistering 'mouse:abs' listener: {error}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
for key, value in self._handlerIds.items():
value.disconnect(key)
self._handlerIds = {}
self._workspace = None
self._windows = []
self._all_windows = []
self._eventQueue.clear()
self._active = False
def getCurrentItem(self):
"""Returns the accessible object being reviewed."""
if not _mouseReviewCapable:
if not self._mouseReviewCapable:
return None
if not self._active:
@@ -493,7 +554,7 @@ class MouseReviewer:
def toggle(self, script=None, event=None):
"""Toggle mouse reviewing on or off."""
if not _mouseReviewCapable:
if not self._mouseReviewCapable:
return
self._active = not self._active
@@ -570,52 +631,65 @@ class MouseReviewer:
return [extents.x, extents.y, extents.width, extents.height] == list(bounds)
def _accessible_window_at_point(self, pX, pY):
"""Returns the accessible window at the specified coordinates."""
def _accessible_window_at_point_deprecated(self, pX, pY):
"""Returns the accessible window and local coordinates for screen coordinates."""
window = None
for w in self._windows:
if w.is_minimized():
continue
x, y, width, height = w.get_geometry()
x, y, width, height = w.get_client_window_geometry()
if x <= pX <= x + width and y <= pY <= y + height:
window = w
break
if not window:
return None
return None, -1, -1
windowApp = window.get_application()
if not windowApp:
return None
app = AXUtilities.get_application_with_pid(windowApp.get_pid())
app = AXUtilities.get_application_with_pid(windowApp.get_pid()) if windowApp else None
if not app:
return None
return None, -1, -1
windowX = pX - x
windowY = pY - y
candidates = [o for o in AXObject.iter_children(
app, lambda x: self._contains_point(x, pX, pY))]
app, lambda obj: self._contains_point(obj, windowX, windowY, Atspi.CoordType.WINDOW))]
if len(candidates) == 1:
return candidates[0]
return candidates[0], windowX, windowY
name = window.get_name()
matches = [o for o in candidates if AXObject.get_name(o) == name]
if len(matches) == 1:
return matches[0]
return matches[0], windowX, windowY
bbox = window.get_client_window_geometry()
matches = [o for o in candidates if self._has_bounds(o, bbox)]
matches = [o for o in candidates if AXUtilities.is_active(o)]
if len(matches) == 1:
return matches[0]
return matches[0], windowX, windowY
return None
return None, -1, -1
def _on_mouse_moved(self, event):
"""Callback for mouse:abs events."""
def _accessible_window_at_point(self, app, pX, pY):
"""Returns the accessible window and local coordinates for pointer-moved events."""
screen, pX, pY = self._pointer.get_position()
window = self._accessible_window_at_point(pX, pY)
def getTuple(obj, x, y):
rect = AXComponent.get_rect(obj)
return obj, x - rect.x, y - rect.y
candidates = [o for o in AXObject.iter_children(
app, lambda obj: self._contains_point(obj, pX, pY, Atspi.CoordType.WINDOW))]
if len(candidates) == 1:
return getTuple(candidates[0], pX, pY)
matches = [o for o in candidates if AXUtilities.is_active(o)]
if len(matches) == 1:
return getTuple(matches[0], pX, pY)
return None, -1, -1
def _mouse_moved_common(self, window, pX, pY):
tokens = [f"MOUSE REVIEW: Window at ({pX}, {pY}) is", window]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
if not window:
@@ -632,16 +706,16 @@ class MouseReviewer:
else:
menu = AXObject.find_ancestor(cthulhu_state.locusOfFocus, AXUtilities.is_menu)
screen, nowX, nowY = self._pointer.get_position()
if (pX, pY) != (nowX, nowY):
msg = f"MOUSE REVIEW: Pointer moved again: ({nowX}, {nowY})"
debug.printMessage(debug.LEVEL_INFO, msg, True)
return
obj = None
if menu:
obj = script.utilities.descendantAtPoint(menu, pX, pY, Atspi.CoordType.WINDOW)
tokens = ["MOUSE REVIEW: Object in", menu, f"at ({pX}, {pY}) is", obj]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
obj = script.utilities.descendantAtPoint(menu, pX, pY) \
or script.utilities.descendantAtPoint(window, pX, pY)
tokens = [f"MOUSE REVIEW: Object at ({pX}, {pY}) is", obj]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
if obj is None:
obj = script.utilities.descendantAtPoint(window, pX, pY, Atspi.CoordType.WINDOW)
tokens = ["MOUSE REVIEW: Object in", window, f"at ({pX}, {pY}) is", obj]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
script = self.app.getScriptManager().get_script(AXObject.get_application(window), obj)
if menu and obj and not AXObject.find_ancestor(obj, AXUtilities.is_menu):
@@ -658,9 +732,8 @@ class MouseReviewer:
debug.printTokens(debug.LEVEL_INFO, tokens, True)
return
screen, nowX, nowY = self._pointer.get_position()
if (pX, pY) != (nowX, nowY):
msg = f"MOUSE REVIEW: Pointer moved again: ({nowX}, {nowY})"
if len(self._eventQueue):
msg = "MOUSE REVIEW: Mouse moved again."
debug.printMessage(debug.LEVEL_INFO, msg, True)
return
@@ -679,22 +752,76 @@ class MouseReviewer:
if new.present(self._currentMouseOver):
self._currentMouseOver = new
def _listener(self, event):
"""Generic listener, mainly to output debugging info."""
def _on_mouse_moved_deprecated(self, event):
"""Callback for mouse:abs events."""
pX, pY = event.detail1, event.detail2
window, windowX, windowY = self._accessible_window_at_point_deprecated(pX, pY)
self._mouse_moved_common(window, windowX, windowY)
def _on_mouse_moved(self, obj, pX, pY):
"""Callback for pointer-moved events."""
if AXObject.get_role(obj) == Atspi.Role.APPLICATION:
window, windowX, windowY = self._accessible_window_at_point(obj, pX, pY)
self._mouse_moved_common(window, windowX, windowY)
return
self._mouse_moved_common(obj, pX, pY)
def _process_event_deprecated(self):
if not self._eventQueue:
return
event = self._eventQueue.popleft()
if len(self._eventQueue):
return
startTime = time.time()
tokens = ["\nvvvvv PROCESS OBJECT EVENT", event.type, "vvvvv"]
debug.printTokens(debug.LEVEL_INFO, tokens, False)
if event.type.startswith("mouse:abs"):
self.inMouseEvent = True
self._on_mouse_moved(event)
self.inMouseEvent = False
self.inMouseEvent = True
self._on_mouse_moved_deprecated(event)
self.inMouseEvent = False
msg = f"TOTAL PROCESSING TIME: {time.time() - startTime:.4f}\n"
msg += f"^^^^^ PROCESS OBJECT EVENT {event.type} ^^^^^\n"
debug.printMessage(debug.LEVEL_INFO, msg, False)
def _process_pointer_event(self):
if not self._eventQueue:
return
obj, x, y = self._eventQueue.popleft()
if len(self._eventQueue):
return
startTime = time.time()
tokens = ["\nvvvvv PROCESS POINTER-MOVED EVENT", "vvvvv"]
debug.printTokens(debug.LEVEL_INFO, tokens, False)
self.inMouseEvent = True
self._on_mouse_moved(obj, x, y)
self.inMouseEvent = False
msg = f"TOTAL PROCESSING TIME: {time.time() - startTime:.4f}\n"
msg += "^^^^^ PROCESS POINTER-MOVED EVENT ^^^^^\n"
debug.printMessage(debug.LEVEL_INFO, msg, False)
def _listener(self, event):
"""Generic listener, mainly to output debugging info."""
if event.type.startswith("mouse:abs"):
self._eventQueue.append(event)
GLib.timeout_add(50, self._process_event_deprecated)
def _on_pointer_moved(self, _device, obj, x, y):
"""Listener for pointer-moved events from devices."""
self._eventQueue.append([obj, x, y])
GLib.timeout_add(50, self._process_pointer_event)
_reviewer = None
def getReviewer():
"""Returns the Mouse Reviewer"""
@@ -704,5 +831,3 @@ def getReviewer():
from . import cthulhu
_reviewer = MouseReviewer(cthulhu.cthulhuApp)
return _reviewer

View File

@@ -7,6 +7,7 @@ from unittest import mock
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
from cthulhu import input_event_manager
from cthulhu import mouse_review
class FakeDevice:
@@ -131,5 +132,93 @@ class InputEventManagerPointerMonitorTests(unittest.TestCase):
self.assertEqual(device.disconnect_calls, [17])
class MouseReviewBackendSelectionTests(unittest.TestCase):
@staticmethod
def _make_app(enabled=False):
app = mock.Mock()
settingsManager = mock.Mock()
settingsManager.getSetting.return_value = enabled
app.getSettingsManager.return_value = settingsManager
app.getScriptManager.return_value = mock.Mock()
return app
def test_prefers_atspi_backend_when_version_and_capability_are_available(self):
listener = mock.Mock()
deviceManager = mock.Mock()
deviceManager.enable_pointer_monitoring.return_value = True
with (
mock.patch.object(mouse_review.Atspi.EventListener, "new", return_value=listener),
mock.patch.object(mouse_review.Atspi, "get_version", return_value=(2, 60, 0)),
mock.patch.object(
mouse_review.Atspi,
"DeviceCapability",
new=types.SimpleNamespace(POINTER_MONITOR=8),
create=True,
),
mock.patch.object(mouse_review.input_event_manager, "get_manager", return_value=deviceManager),
mock.patch.object(mouse_review, "Wnck", None),
):
reviewer = mouse_review.MouseReviewer(self._make_app())
self.assertTrue(reviewer._useAtspi)
self.assertTrue(reviewer._mouseReviewCapable)
deviceManager.activate_device.assert_called_once_with()
deviceManager.enable_pointer_monitoring.assert_called_once_with()
def test_activate_uses_pointer_watcher_for_atspi_backend(self):
listener = mock.Mock()
deviceManager = mock.Mock()
deviceManager.enable_pointer_monitoring.return_value = True
with (
mock.patch.object(mouse_review.Atspi.EventListener, "new", return_value=listener),
mock.patch.object(mouse_review.Atspi, "get_version", return_value=(2, 60, 0)),
mock.patch.object(
mouse_review.Atspi,
"DeviceCapability",
new=types.SimpleNamespace(POINTER_MONITOR=8),
create=True,
),
mock.patch.object(mouse_review.input_event_manager, "get_manager", return_value=deviceManager),
mock.patch.object(mouse_review.cthulhu_state, "locusOfFocus", None),
mock.patch.object(mouse_review, "Wnck", None),
):
reviewer = mouse_review.MouseReviewer(self._make_app(enabled=False))
reviewer.activate()
reviewer.deactivate()
deviceManager.start_pointer_watcher.assert_called_once_with(reviewer._on_pointer_moved)
deviceManager.stop_pointer_watcher.assert_called_once_with()
listener.register.assert_not_called()
def test_keeps_legacy_backend_when_atspi_pointer_monitor_is_unavailable(self):
listener = mock.Mock()
pointer = mock.Mock()
seat = mock.Mock()
seat.get_pointer.return_value = pointer
screen = mock.Mock()
screen.get_windows_stacked.return_value = []
screen.get_active_workspace.return_value = None
fakeWnck = mock.Mock()
fakeWnck.Screen.get_default.return_value = screen
with (
mock.patch.object(mouse_review.Atspi.EventListener, "new", return_value=listener),
mock.patch.object(mouse_review.Atspi, "get_version", return_value=(2, 58, 4)),
mock.patch.object(mouse_review.input_event_manager, "get_manager"),
mock.patch.object(mouse_review.cthulhu_state, "locusOfFocus", None),
mock.patch.object(mouse_review.Gdk.Display, "get_default", return_value=mock.Mock()),
mock.patch.object(mouse_review.Gdk.Display, "get_default_seat", return_value=seat),
mock.patch.object(mouse_review, "Wnck", fakeWnck),
):
reviewer = mouse_review.MouseReviewer(self._make_app(enabled=False))
reviewer.activate()
self.assertFalse(reviewer._useAtspi)
self.assertTrue(reviewer._mouseReviewCapable)
listener.register.assert_called_once_with("mouse:abs")
if __name__ == "__main__":
unittest.main()