Add AT-SPI pointer monitor wrappers

This commit is contained in:
Storm Dragon
2026-04-07 17:11:13 -04:00
parent 8e792dd4e2
commit d7d26c57f4
2 changed files with 213 additions and 5 deletions

View File

@@ -44,6 +44,7 @@ from typing import TYPE_CHECKING, Optional, Union, Tuple, List, Dict
import gi
gi.require_version("Atspi", "2.0")
from gi.repository import Atspi
from gi.repository import GLib
from . import debug
from . import focus_manager
@@ -64,21 +65,36 @@ class InputEventManager:
self._last_input_event: Optional[input_event.InputEvent] = None
self._last_non_modifier_key_event: Optional[input_event.KeyboardEvent] = None
self._device: Optional[Atspi.Device] = None
self._pointer_moved_id: int = 0
self._mapped_keycodes: List[int] = []
self._mapped_keysyms: List[int] = []
self._grabbed_bindings: Dict[int, keybindings.KeyBinding] = {}
self._paused: bool = False
def activate_device(self) -> Atspi.Device:
"""Creates and returns the AT-SPI device used by this manager."""
if self._device is not None:
return self._device
if Atspi.get_version() >= (2, 55, 90):
self._device = Atspi.Device.new_full("org.stormux.Cthulhu")
else:
self._device = Atspi.Device.new()
return self._device
def get_device(self) -> Optional[Atspi.Device]:
"""Returns the active AT-SPI device, if any."""
return self._device
def start_key_watcher(self) -> None:
"""Starts the watcher for keyboard input events."""
msg = "INPUT EVENT MANAGER: Starting key watcher."
debug.print_message(debug.LEVEL_INFO, msg, True)
if Atspi.get_version() >= (2, 55, 90):
self._device = Atspi.Device.new_full("org.stormux.Cthulhu")
else:
self._device = Atspi.Device.new()
self._device.add_key_watcher(self.process_keyboard_event)
self.activate_device().add_key_watcher(self.process_keyboard_event)
def stop_key_watcher(self) -> None:
"""Starts the watcher for keyboard input events."""
@@ -87,6 +103,63 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True)
self._device = None
def enable_pointer_monitoring(self) -> bool:
"""Enables pointer monitoring on the current device, if possible."""
device = self.get_device()
if device is None:
return False
deviceCapability = getattr(Atspi, "DeviceCapability", None)
if deviceCapability is None:
return False
pointerMonitor = getattr(deviceCapability, "POINTER_MONITOR", None)
if pointerMonitor is None:
return False
setCapabilities = getattr(device, "set_capabilities", None)
if not callable(setCapabilities):
return False
currentCapabilities = 0
getCapabilities = getattr(device, "get_capabilities", None)
if callable(getCapabilities):
currentCapabilities = getCapabilities()
try:
grantedCapabilities = setCapabilities(currentCapabilities | pointerMonitor)
except GLib.GError:
return False
if isinstance(grantedCapabilities, bool):
return grantedCapabilities
try:
return bool(int(grantedCapabilities) & int(pointerMonitor))
except (TypeError, ValueError):
return False
def start_pointer_watcher(self, callback) -> None:
"""Starts the watcher for pointer movement events."""
device = self.get_device()
if device is None:
return
self._pointer_moved_id = device.connect("pointer-moved", callback)
def stop_pointer_watcher(self) -> None:
"""Stops the watcher for pointer movement events."""
device = self.get_device()
if device is None or not self._pointer_moved_id:
self._pointer_moved_id = 0
return
device.disconnect(self._pointer_moved_id)
self._pointer_moved_id = 0
def pause_key_watcher(self, pause: bool = True, reason: str = "") -> None:
"""Pauses processing of keyboard input events."""

View File

@@ -0,0 +1,135 @@
import sys
import types
import unittest
from pathlib import Path
from unittest import mock
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
from cthulhu import input_event_manager
class FakeDevice:
def __init__(self):
self.add_key_watcher_calls = []
self.connect_calls = []
self.disconnect_calls = []
self.set_capabilities_calls = []
self.capabilities = 0
self.next_handler_id = 17
def add_key_watcher(self, callback, user_data=None):
self.add_key_watcher_calls.append((callback, user_data))
def connect(self, signalName, callback):
self.connect_calls.append((signalName, callback))
return self.next_handler_id
def disconnect(self, handlerId):
self.disconnect_calls.append(handlerId)
def get_capabilities(self):
return self.capabilities
def set_capabilities(self, capabilities):
self.set_capabilities_calls.append(capabilities)
self.capabilities = capabilities
return capabilities
class FakeDeviceFactory:
def __init__(self, device):
self.device = device
self.new_calls = 0
self.new_full_calls = 0
self.app_ids = []
def new(self):
self.new_calls += 1
return self.device
def new_full(self, appId):
self.new_full_calls += 1
self.app_ids.append(appId)
return self.device
class InputEventManagerPointerMonitorTests(unittest.TestCase):
def setUp(self):
self.manager = input_event_manager.InputEventManager()
def test_activate_device_creates_the_device_only_once(self):
device = FakeDevice()
deviceFactory = FakeDeviceFactory(device)
fakeAtspi = types.SimpleNamespace(
get_version=lambda: (2, 58, 4),
Device=deviceFactory,
)
with mock.patch.object(input_event_manager, "Atspi", fakeAtspi):
firstDevice = self.manager.activate_device()
secondDevice = self.manager.activate_device()
self.assertIs(firstDevice, device)
self.assertIs(secondDevice, device)
self.assertEqual(deviceFactory.new_full_calls, 1)
self.assertEqual(deviceFactory.new_calls, 0)
self.assertEqual(deviceFactory.app_ids, ["org.stormux.Cthulhu"])
def test_enable_pointer_monitoring_returns_false_without_a_device(self):
fakeAtspi = types.SimpleNamespace()
with mock.patch.object(input_event_manager, "Atspi", fakeAtspi):
self.assertFalse(self.manager.enable_pointer_monitoring())
def test_enable_pointer_monitoring_returns_false_when_device_capability_is_missing(self):
device = FakeDevice()
deviceFactory = FakeDeviceFactory(device)
fakeAtspi = types.SimpleNamespace(
get_version=lambda: (2, 58, 4),
Device=deviceFactory,
)
with mock.patch.object(input_event_manager, "Atspi", fakeAtspi):
self.manager.activate_device()
self.assertFalse(self.manager.enable_pointer_monitoring())
self.assertEqual(device.set_capabilities_calls, [])
def test_enable_pointer_monitoring_returns_true_when_pointer_monitor_is_granted(self):
device = FakeDevice()
deviceFactory = FakeDeviceFactory(device)
fakeDeviceCapability = types.SimpleNamespace(POINTER_MONITOR=8)
fakeAtspi = types.SimpleNamespace(
get_version=lambda: (2, 58, 4),
Device=deviceFactory,
DeviceCapability=fakeDeviceCapability,
)
with mock.patch.object(input_event_manager, "Atspi", fakeAtspi):
self.manager.activate_device()
self.assertTrue(self.manager.enable_pointer_monitoring())
self.assertEqual(device.set_capabilities_calls, [8])
self.assertEqual(device.capabilities, 8)
def test_start_and_stop_pointer_watcher_connect_and_disconnect_pointer_moved(self):
device = FakeDevice()
deviceFactory = FakeDeviceFactory(device)
fakeAtspi = types.SimpleNamespace(
get_version=lambda: (2, 58, 4),
Device=deviceFactory,
)
callback = mock.Mock()
with mock.patch.object(input_event_manager, "Atspi", fakeAtspi):
self.manager.activate_device()
self.manager.start_pointer_watcher(callback)
self.manager.stop_pointer_watcher()
self.assertEqual(device.connect_calls, [("pointer-moved", callback)])
self.assertEqual(device.disconnect_calls, [17])
if __name__ == "__main__":
unittest.main()