From d7d26c57f41c2bdc0f6c01b37321ec57e9126476 Mon Sep 17 00:00:00 2001 From: Storm Dragon Date: Tue, 7 Apr 2026 17:11:13 -0400 Subject: [PATCH] Add AT-SPI pointer monitor wrappers --- src/cthulhu/input_event_manager.py | 83 ++++++++++- ...ouse_review_pointer_monitor_regressions.py | 135 ++++++++++++++++++ 2 files changed, 213 insertions(+), 5 deletions(-) create mode 100644 tests/test_mouse_review_pointer_monitor_regressions.py diff --git a/src/cthulhu/input_event_manager.py b/src/cthulhu/input_event_manager.py index 8a4711c..a483654 100644 --- a/src/cthulhu/input_event_manager.py +++ b/src/cthulhu/input_event_manager.py @@ -44,6 +44,7 @@ from typing import TYPE_CHECKING, Optional, Union, Tuple, List, Dict import gi gi.require_version("Atspi", "2.0") from gi.repository import Atspi +from gi.repository import GLib from . import debug from . import focus_manager @@ -64,21 +65,36 @@ class InputEventManager: self._last_input_event: Optional[input_event.InputEvent] = None self._last_non_modifier_key_event: Optional[input_event.KeyboardEvent] = None self._device: Optional[Atspi.Device] = None + self._pointer_moved_id: int = 0 self._mapped_keycodes: List[int] = [] self._mapped_keysyms: List[int] = [] self._grabbed_bindings: Dict[int, keybindings.KeyBinding] = {} self._paused: bool = False + def activate_device(self) -> Atspi.Device: + """Creates and returns the AT-SPI device used by this manager.""" + + if self._device is not None: + return self._device + + if Atspi.get_version() >= (2, 55, 90): + self._device = Atspi.Device.new_full("org.stormux.Cthulhu") + else: + self._device = Atspi.Device.new() + + return self._device + + def get_device(self) -> Optional[Atspi.Device]: + """Returns the active AT-SPI device, if any.""" + + return self._device + def start_key_watcher(self) -> None: """Starts the watcher for keyboard input events.""" msg = "INPUT EVENT MANAGER: Starting key watcher." debug.print_message(debug.LEVEL_INFO, msg, True) - if Atspi.get_version() >= (2, 55, 90): - self._device = Atspi.Device.new_full("org.stormux.Cthulhu") - else: - self._device = Atspi.Device.new() - self._device.add_key_watcher(self.process_keyboard_event) + self.activate_device().add_key_watcher(self.process_keyboard_event) def stop_key_watcher(self) -> None: """Starts the watcher for keyboard input events.""" @@ -87,6 +103,63 @@ class InputEventManager: debug.print_message(debug.LEVEL_INFO, msg, True) self._device = None + def enable_pointer_monitoring(self) -> bool: + """Enables pointer monitoring on the current device, if possible.""" + + device = self.get_device() + if device is None: + return False + + deviceCapability = getattr(Atspi, "DeviceCapability", None) + if deviceCapability is None: + return False + + pointerMonitor = getattr(deviceCapability, "POINTER_MONITOR", None) + if pointerMonitor is None: + return False + + setCapabilities = getattr(device, "set_capabilities", None) + if not callable(setCapabilities): + return False + + currentCapabilities = 0 + getCapabilities = getattr(device, "get_capabilities", None) + if callable(getCapabilities): + currentCapabilities = getCapabilities() + + try: + grantedCapabilities = setCapabilities(currentCapabilities | pointerMonitor) + except GLib.GError: + return False + + if isinstance(grantedCapabilities, bool): + return grantedCapabilities + + try: + return bool(int(grantedCapabilities) & int(pointerMonitor)) + except (TypeError, ValueError): + return False + + def start_pointer_watcher(self, callback) -> None: + """Starts the watcher for pointer movement events.""" + + device = self.get_device() + if device is None: + return + + self._pointer_moved_id = device.connect("pointer-moved", callback) + + def stop_pointer_watcher(self) -> None: + """Stops the watcher for pointer movement events.""" + + device = self.get_device() + if device is None or not self._pointer_moved_id: + self._pointer_moved_id = 0 + return + + device.disconnect(self._pointer_moved_id) + self._pointer_moved_id = 0 + def pause_key_watcher(self, pause: bool = True, reason: str = "") -> None: """Pauses processing of keyboard input events.""" diff --git a/tests/test_mouse_review_pointer_monitor_regressions.py b/tests/test_mouse_review_pointer_monitor_regressions.py new file mode 100644 index 0000000..5f88ca7 --- /dev/null +++ b/tests/test_mouse_review_pointer_monitor_regressions.py @@ -0,0 +1,135 @@ +import sys +import types +import unittest +from pathlib import Path +from unittest import mock + +sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src")) + +from cthulhu import input_event_manager + + +class FakeDevice: + def __init__(self): + self.add_key_watcher_calls = [] + self.connect_calls = [] + self.disconnect_calls = [] + self.set_capabilities_calls = [] + self.capabilities = 0 + self.next_handler_id = 17 + + def add_key_watcher(self, callback, user_data=None): + self.add_key_watcher_calls.append((callback, user_data)) + + def connect(self, signalName, callback): + self.connect_calls.append((signalName, callback)) + return self.next_handler_id + + def disconnect(self, handlerId): + self.disconnect_calls.append(handlerId) + + def get_capabilities(self): + return self.capabilities + + def set_capabilities(self, capabilities): + self.set_capabilities_calls.append(capabilities) + self.capabilities = capabilities + return capabilities + + +class FakeDeviceFactory: + def __init__(self, device): + self.device = device + self.new_calls = 0 + self.new_full_calls = 0 + self.app_ids = [] + + def new(self): + self.new_calls += 1 + return self.device + + def new_full(self, appId): + self.new_full_calls += 1 + self.app_ids.append(appId) + return self.device + + +class InputEventManagerPointerMonitorTests(unittest.TestCase): + def setUp(self): + self.manager = input_event_manager.InputEventManager() + + def test_activate_device_creates_the_device_only_once(self): + device = FakeDevice() + deviceFactory = FakeDeviceFactory(device) + fakeAtspi = types.SimpleNamespace( + get_version=lambda: (2, 58, 4), + Device=deviceFactory, + ) + + with mock.patch.object(input_event_manager, "Atspi", fakeAtspi): + firstDevice = self.manager.activate_device() + secondDevice = self.manager.activate_device() + + self.assertIs(firstDevice, device) + self.assertIs(secondDevice, device) + self.assertEqual(deviceFactory.new_full_calls, 1) + self.assertEqual(deviceFactory.new_calls, 0) + self.assertEqual(deviceFactory.app_ids, ["org.stormux.Cthulhu"]) + + def test_enable_pointer_monitoring_returns_false_without_a_device(self): + fakeAtspi = types.SimpleNamespace() + + with mock.patch.object(input_event_manager, "Atspi", fakeAtspi): + self.assertFalse(self.manager.enable_pointer_monitoring()) + + def test_enable_pointer_monitoring_returns_false_when_device_capability_is_missing(self): + device = FakeDevice() + deviceFactory = FakeDeviceFactory(device) + fakeAtspi = types.SimpleNamespace( + get_version=lambda: (2, 58, 4), + Device=deviceFactory, + ) + + with mock.patch.object(input_event_manager, "Atspi", fakeAtspi): + self.manager.activate_device() + self.assertFalse(self.manager.enable_pointer_monitoring()) + + self.assertEqual(device.set_capabilities_calls, []) + + def test_enable_pointer_monitoring_returns_true_when_pointer_monitor_is_granted(self): + device = FakeDevice() + deviceFactory = FakeDeviceFactory(device) + fakeDeviceCapability = types.SimpleNamespace(POINTER_MONITOR=8) + fakeAtspi = types.SimpleNamespace( + get_version=lambda: (2, 58, 4), + Device=deviceFactory, + DeviceCapability=fakeDeviceCapability, + ) + + with mock.patch.object(input_event_manager, "Atspi", fakeAtspi): + self.manager.activate_device() + self.assertTrue(self.manager.enable_pointer_monitoring()) + + self.assertEqual(device.set_capabilities_calls, [8]) + self.assertEqual(device.capabilities, 8) + + def test_start_and_stop_pointer_watcher_connect_and_disconnect_pointer_moved(self): + device = FakeDevice() + deviceFactory = FakeDeviceFactory(device) + fakeAtspi = types.SimpleNamespace( + get_version=lambda: (2, 58, 4), + Device=deviceFactory, + ) + callback = mock.Mock() + + with mock.patch.object(input_event_manager, "Atspi", fakeAtspi): + self.manager.activate_device() + self.manager.start_pointer_watcher(callback) + self.manager.stop_pointer_watcher() + + self.assertEqual(device.connect_calls, [("pointer-moved", callback)]) + self.assertEqual(device.disconnect_calls, [17]) + + +if __name__ == "__main__": + unittest.main()