From ecb1ae4fe5c744c96555e87ef3ab2ce5da16f534 Mon Sep 17 00:00:00 2001 From: Storm Dragon Date: Mon, 4 Aug 2025 00:21:49 -0400 Subject: [PATCH] Keyboard seems to be working, same methods as orca now. --- src/cthulhu/Makefile.am | 1 + src/cthulhu/cthulhu_gui_prefs.py | 5 +- src/cthulhu/dbus_service.py | 228 +++++++++++++-- src/cthulhu/event_manager.py | 71 +++-- src/cthulhu/input_event.py | 114 +++++--- src/cthulhu/input_event_manager.py | 431 +++++++++++++++++++++++++++++ src/cthulhu/scripts/default.py | 9 +- 7 files changed, 774 insertions(+), 85 deletions(-) create mode 100644 src/cthulhu/input_event_manager.py diff --git a/src/cthulhu/Makefile.am b/src/cthulhu/Makefile.am index 13baf23..8e0f4de 100644 --- a/src/cthulhu/Makefile.am +++ b/src/cthulhu/Makefile.am @@ -44,6 +44,7 @@ cthulhu_python_PYTHON = \ guilabels.py \ highlighter.py \ input_event.py \ + input_event_manager.py \ keybindings.py \ keynames.py \ label_inference.py \ diff --git a/src/cthulhu/cthulhu_gui_prefs.py b/src/cthulhu/cthulhu_gui_prefs.py index 516e6ac..9fb9f0d 100644 --- a/src/cthulhu/cthulhu_gui_prefs.py +++ b/src/cthulhu/cthulhu_gui_prefs.py @@ -2039,11 +2039,14 @@ class CthulhuSetupGUI(cthulhu_gtkbuilder.GtkBuilderWrapper): try: ts = cthulhu_state.lastInputEvent.timestamp + # Ensure timestamp fits in 32-bit range for GTK + if ts > 4294967295: # 2^32 - 1 + ts = ts % 4294967296 # Wrap to 32-bit range except Exception: ts = 0 if ts == 0: ts = Gtk.get_current_event_time() - cthulhuSetupWindow.present_with_time(ts) + cthulhuSetupWindow.present_with_time(int(ts)) # We always want to re-order the text attributes page so that enabled # items are consistently at the top. diff --git a/src/cthulhu/dbus_service.py b/src/cthulhu/dbus_service.py index dce3666..8f6900a 100644 --- a/src/cthulhu/dbus_service.py +++ b/src/cthulhu/dbus_service.py @@ -28,6 +28,7 @@ __copyright__ = "Copyright (c) 2025 Stormux " __license__ = "LGPL" import enum +import inspect from typing import Callable try: @@ -56,6 +57,7 @@ class HandlerType(enum.Enum): """Enumeration of handler types for D-Bus methods.""" COMMAND = enum.auto() + PARAMETERIZED_COMMAND = enum.auto() GETTER = enum.auto() SETTER = enum.auto() @@ -85,6 +87,26 @@ def getter(func): func.dbus_getter_description = description return func +def parameterized_command(func): + """Decorator to mark a method as a D-Bus parameterized command using its docstring. + + Usage: + @parameterized_command + def get_voices_for_language( + self, + language, + variant='', + script=None, + event=None, + notify_user=False + ): + '''Returns a list of available voices for the specified language.''' + # method implementation + """ + description = func.__doc__ or f"D-Bus parameterized command: {func.__name__}" + func.dbus_parameterized_command_description = description + return func + def setter(func): """Decorator to mark a method as a D-Bus setter using its docstring. @@ -98,6 +120,29 @@ def setter(func): func.dbus_setter_description = description return func +def _extract_function_parameters(func: Callable) -> list[tuple[str, str]]: + """Extract parameter names and types from a function signature.""" + + sig = inspect.signature(func) + parameters = [] + + skip_params = {"self", "script", "event"} + for param_name, param in sig.parameters.items(): + if param_name in skip_params: + continue + + if param.annotation != inspect.Parameter.empty: + if hasattr(param.annotation, "__name__"): + type_str = param.annotation.__name__ + else: + type_str = str(param.annotation).replace("typing.", "") + else: + type_str = "Any" + parameters.append((param_name, type_str)) + + return parameters + + class _HandlerInfo: """Stores processed information about a function exposed via D-Bus.""" @@ -106,12 +151,14 @@ class _HandlerInfo: python_function_name: str, description: str, action: Callable[..., bool], - handler_type: 'HandlerType' = HandlerType.COMMAND + handler_type: 'HandlerType' = HandlerType.COMMAND, + parameters: list[tuple[str, str]] | None = None ): self.python_function_name: str = python_function_name self.description: str = description self.action: Callable[..., bool] = action self.handler_type: HandlerType = handler_type + self.parameters: list[tuple[str, str]] = parameters or [] if _dasbus_available: @@ -125,23 +172,27 @@ if _dasbus_available: super().__init__() self._module_name = module_name self._commands: dict[str, _HandlerInfo] = {} + self._parameterized_commands: dict[str, _HandlerInfo] = {} self._getters: dict[str, _HandlerInfo] = {} self._setters: dict[str, _HandlerInfo] = {} for info in handlers_info: handler_type = getattr(info, "handler_type", HandlerType.COMMAND) - normalized_name = self._normalize_handler_name(info.python_function_name) + normalized_name = self._normalize_handler_name(info.python_function_name, handler_type) if handler_type == HandlerType.GETTER: self._getters[normalized_name] = info elif handler_type == HandlerType.SETTER: self._setters[normalized_name] = info + elif handler_type == HandlerType.PARAMETERIZED_COMMAND: + self._parameterized_commands[normalized_name] = info else: self._commands[normalized_name] = info msg = ( f"DBUS SERVICE: CthulhuModuleDBusInterface for {module_name} initialized " - f"with {len(self._commands)} commands, {len(self._getters)} getters, " - f"{len(self._setters)} setters." + f"with {len(self._commands)} command(s), " + f"{len(self._parameterized_commands)} parameterized command(s), " + f"{len(self._getters)} getter(s), {len(self._setters)} setter(s)." ) debug.printMessage(debug.LEVEL_INFO, msg, True) @@ -182,6 +233,16 @@ if _dasbus_available: command_list.append((camel_case_name, info.description)) return command_list + def ListParameterizedCommands( # pylint: disable=invalid-name + self, + ) -> list[tuple[str, str, list[tuple[str, str]]]]: + """Returns a list of (command_name, description, parameters) for this module.""" + + command_list = [] + for camel_case_name, info in self._parameterized_commands.items(): + command_list.append((camel_case_name, info.description, info.parameters)) + return command_list + def ListRuntimeGetters(self) -> list[tuple[str, str]]: # pylint: disable=invalid-name """Returns a list of (getter_name, description) for this module.""" @@ -215,6 +276,30 @@ if _dasbus_available: debug.printMessage(debug.LEVEL_INFO, msg, True) return result + def ExecuteParameterizedCommand( # pylint: disable=invalid-name + self, + command_name: str, + parameters: dict[str, GLib.Variant], + notify_user: bool + ) -> GLib.Variant: + """Executes the named command with parameters and returns the result.""" + + handler_info = self._parameterized_commands.get(command_name) + if not handler_info: + msg = ( + f"DBUS SERVICE: Unknown parameterized command '{command_name}' for " + f"'{self._module_name}'." + ) + debug.printMessage(debug.LEVEL_WARNING, msg, True) + return GLib.Variant("b", False) + + kwargs = {name: variant.unpack() for name, variant in parameters.items()} + kwargs["notify_user"] = notify_user + result = handler_info.action(**kwargs) + msg = f"DBUS SERVICE: Parameterized '{command_name}' in '{self._module_name}' executed." + debug.printMessage(debug.LEVEL_INFO, msg, True) + return self._to_variant(result) + def for_publication(self): """Returns the D-Bus interface XML for publication.""" @@ -222,40 +307,51 @@ if _dasbus_available: @staticmethod - def _normalize_handler_name(function_name: str) -> str: + def _normalize_handler_name( + function_name: str, + handler_type: HandlerType = HandlerType.COMMAND + ) -> str: """Normalizes a Python function name for D-Bus exposure (getter/setter/command).""" - if function_name.startswith("get_") or function_name.startswith("set_"): - function_name = function_name[4:] + # Only strip prefixes for getters and setters, not for commands + if handler_type in (HandlerType.GETTER, HandlerType.SETTER): + if function_name.startswith("get_") or function_name.startswith("set_"): + function_name = function_name[4:] return "".join(word.capitalize() for word in function_name.split("_")) @staticmethod def _to_variant(result): """Converts a Python value to a correctly-typed GLib.Variant for D-Bus marshalling.""" + if isinstance(result, bool): return GLib.Variant("b", result) - elif isinstance(result, int): + if isinstance(result, int): return GLib.Variant("i", result) - elif isinstance(result, float): + if isinstance(result, float): return GLib.Variant("d", result) - elif isinstance(result, str): + if isinstance(result, str): return GLib.Variant("s", result) - elif isinstance(result, dict): + if isinstance(result, dict): return GLib.Variant( "a{sv}", {str(k): GLib.Variant("v", v) for k, v in result.items()}) - elif isinstance(result, list) or isinstance(result, tuple): + if isinstance(result, (list, tuple)): if all(isinstance(x, str) for x in result): return GLib.Variant("as", list(result)) - elif all(isinstance(x, int) for x in result): - return GLib.Variant("ax", list(result)) - elif all(isinstance(x, bool) for x in result): + if all(isinstance(x, bool) for x in result): return GLib.Variant("ab", list(result)) - else: - return GLib.Variant("av", [GLib.Variant("v", x) for x in result]) - elif result is None: + if all(isinstance(x, int) for x in result): + return GLib.Variant("ax", list(result)) + if all(isinstance(x, (list, tuple)) for x in result): + if not result: + return GLib.Variant("av", []) + first_len = len(result[0]) + converted = [tuple(str(item or "") for item in x) for x in result] + signature = "(" + "s" * first_len + ")" + return GLib.Variant(f"a{signature}", converted) + return GLib.Variant("av", [GLib.Variant("v", x) for x in result]) + if result is None: return GLib.Variant("v", GLib.Variant("s", "")) - else: - return GLib.Variant("s", str(result)) + return GLib.Variant("s", str(result)) @dbus_interface("org.stormux.Cthulhu.Service") @@ -349,6 +445,22 @@ if _dasbus_available: return sorted(commands) + def ShowPreferences(self) -> bool: # pylint: disable=invalid-name + """Shows Cthulhu's preferences GUI.""" + + msg = "DBUS SERVICE: ShowPreferences called." + debug.printMessage(debug.LEVEL_INFO, msg, True) + + manager = script_manager.getManager() + script = cthulhu_state.activeScript or manager.getDefaultScript() + if script is None: + msg = "DBUS SERVICE: No script available" + debug.printMessage(debug.LEVEL_WARNING, msg, True) + return False + + script.showPreferencesGUI() + return True + def PresentMessage(self, message: str) -> bool: # pylint: disable=invalid-name """Presents message to the user.""" @@ -420,6 +532,10 @@ class CthulhuRemoteController: self._bus: SessionMessageBus | None = None self._event_loop: EventLoop | None = None self._pending_registrations: dict[str, object] = {} + self._total_commands: int = 0 + self._total_getters: int = 0 + self._total_setters: int = 0 + self._total_modules: int = 0 self._dasbus_available = _dasbus_available def start(self) -> bool: @@ -474,6 +590,7 @@ class CthulhuRemoteController: ) debug.printMessage(debug.LEVEL_INFO, msg, True) self._process_pending_registrations() + self._print_registration_summary() return True def _process_pending_registrations(self) -> None: @@ -523,6 +640,10 @@ class CthulhuRemoteController: return handlers_info = [] + commands_count = 0 + getters_count = 0 + setters_count = 0 + for attr_name in dir(module_instance): attr = getattr(module_instance, attr_name) # Command @@ -541,8 +662,32 @@ class CthulhuRemoteController: handler_type=HandlerType.COMMAND ) handlers_info.append(handler_info) + commands_count += 1 msg = f"REMOTE CONTROLLER: Found decorated command '{attr_name}': {description}" debug.printMessage(debug.LEVEL_INFO, msg, True) + # Parameterized Command + elif callable(attr) and hasattr(attr, "dbus_parameterized_command_description"): + description = attr.dbus_parameterized_command_description + def _create_parameterized_wrapper(method=attr): + def _wrapper(**kwargs): + event = _get_input_event().RemoteControllerEvent() + script = cthulhu_state.activeScript + if script is None: + manager = script_manager.getManager() + script = manager.getDefaultScript() + return method(script=script, event=event, **kwargs) + return _wrapper + handler_info = _HandlerInfo( + python_function_name=attr_name, + description=description, + action=_create_parameterized_wrapper(), + handler_type=HandlerType.PARAMETERIZED_COMMAND, + parameters=_extract_function_parameters(attr) + ) + handlers_info.append(handler_info) + commands_count += 1 + msg = f"REMOTE CONTROLLER: Found decorated parameterized command '{attr_name}': {description}" + debug.printMessage(debug.LEVEL_INFO, msg, True) # Getter elif callable(attr) and hasattr(attr, "dbus_getter_description"): description = attr.dbus_getter_description @@ -557,6 +702,7 @@ class CthulhuRemoteController: handler_type=HandlerType.GETTER ) handlers_info.append(handler_info) + getters_count += 1 msg = f"REMOTE CONTROLLER: Found decorated getter '{attr_name}': {description}" debug.printMessage(debug.LEVEL_INFO, msg, True) # Setter @@ -573,17 +719,23 @@ class CthulhuRemoteController: handler_type=HandlerType.SETTER ) handlers_info.append(handler_info) + setters_count += 1 msg = f"REMOTE CONTROLLER: Found decorated setter '{attr_name}': {description}" debug.printMessage(debug.LEVEL_INFO, msg, True) if not handlers_info: return + self._total_commands += commands_count + self._total_getters += getters_count + self._total_setters += setters_count + self._total_modules += 1 + self._dbus_service_interface.add_module_interface( module_name, handlers_info, self._bus, self.OBJECT_PATH) msg = ( f"REMOTE CONTROLLER: Successfully registered {len(handlers_info)} " - f"decorated commands/getters/setters for module {module_name}." + f"commands/getters/setters for module {module_name}." ) debug.printMessage(debug.LEVEL_INFO, msg, True) @@ -645,12 +797,46 @@ class CthulhuRemoteController: msg = "REMOTE CONTROLLER: D-Bus service shut down." debug.printMessage(debug.LEVEL_INFO, msg, True) self._pending_registrations.clear() + self._total_commands = 0 + self._total_getters = 0 + self._total_setters = 0 + self._total_modules = 0 def is_running(self) -> bool: """Checks if the D-Bus service is currently running.""" return self._is_running + def _count_system_commands(self) -> int: + """Counts the system-wide D-Bus commands available on the main service interface.""" + + if not self._dbus_service_interface: + return 0 + + system_commands = 0 + for attr_name in dir(self._dbus_service_interface): + if not attr_name.startswith("_") and attr_name[0].isupper(): + attr = getattr(self._dbus_service_interface, attr_name) + if callable(attr) and hasattr(attr, "__doc__"): + system_commands += 1 + return system_commands + + def _print_registration_summary(self) -> None: + """Prints a summary of all registered D-Bus handlers.""" + + system_commands_count = self._count_system_commands() + total_handlers = self._total_commands + self._total_getters + self._total_setters + msg = ( + f"REMOTE CONTROLLER: Registration complete. Summary: " + f"{self._total_modules} modules, " + f"{self._total_commands} module commands, " + f"{self._total_getters} module getters, " + f"{self._total_setters} module setters, " + f"{system_commands_count} system commands. " + f"Total handlers: {total_handlers + system_commands_count}." + ) + debug.printMessage(debug.LEVEL_INFO, msg, True) + _remote_controller: CthulhuRemoteController = CthulhuRemoteController() def get_remote_controller() -> CthulhuRemoteController: diff --git a/src/cthulhu/event_manager.py b/src/cthulhu/event_manager.py index 46897a6..8f2db5d 100644 --- a/src/cthulhu/event_manager.py +++ b/src/cthulhu/event_manager.py @@ -40,6 +40,7 @@ import time from . import debug from . import input_event +from . import input_event_manager from . import cthulhu_state from . import script_manager from . import settings @@ -96,7 +97,7 @@ class EventManager: """Called when this event manager is activated.""" debug.printMessage(debug.LEVEL_INFO, 'EVENT MANAGER: Activating', True) - self.setKeyHandling(False) + self.setKeyHandling(True) # Enable new InputEventManager for global keyboard capture self._active = True debug.printMessage(debug.LEVEL_INFO, 'EVENT MANAGER: Activated', True) @@ -104,14 +105,21 @@ class EventManager: def activateNewKeyHandling(self): if not self.newKeyHandlingActive: try: - cthulhu_state.device = Atspi.Device.new() - except Exception: + debug.printMessage(debug.LEVEL_INFO, 'EVENT MANAGER: Attempting to activate new keyboard handling', True) + # Use the new InputEventManager instead of direct Atspi.Device + self._inputEventManager = input_event_manager.getManager() + debug.printMessage(debug.LEVEL_INFO, 'EVENT MANAGER: InputEventManager obtained', True) + self._inputEventManager.start_key_watcher() + debug.printMessage(debug.LEVEL_INFO, 'EVENT MANAGER: Key watcher started', True) + cthulhu_state.device = self._inputEventManager._device # For compatibility + except Exception as e: + debug.printMessage(debug.LEVEL_INFO, f'EVENT MANAGER: New keyboard handling failed: {e}', True) self.forceLegacyKeyHandling = True self.activateLegacyKeyHandling() return - cthulhu_state.device.key_watcher = cthulhu_state.device.add_key_watcher( - self._processNewKeyboardEvent) + self.newKeyHandlingActive = True + debug.printMessage(debug.LEVEL_INFO, 'EVENT MANAGER: New keyboard handling activated with InputEventManager', True) # Notify plugin system that device is now available for keybinding registration from . import cthulhu @@ -145,6 +153,9 @@ class EventManager: def deactivateNewKeyHandling(self): if self.newKeyHandlingActive: + if hasattr(self, '_inputEventManager'): + self._inputEventManager.stop_key_watcher() + self._inputEventManager = None cthulhu_state.device = None self.newKeyHandlingActive = False @@ -1142,19 +1153,8 @@ class EventManager: debug.printMessage(debug.LEVEL_INFO, msg, True) def _processNewKeyboardEvent(self, device, pressed, keycode, keysym, state, text): - event = Atspi.DeviceEvent() - if pressed: - event.type = Atspi.EventType.KEY_PRESSED_EVENT - else: - event.type = Atspi.EventType.KEY_RELEASED_EVENT - event.hw_code = keycode - event.id = keysym - event.modifiers = state - event.event_string = text - if event.event_string is None: - event.event_string = "" - event.timestamp = time.time() - + """Process keyboard event using new direct KeyboardEvent creation.""" + if not pressed and text == "Num_Lock" and "KP_Insert" in settings.cthulhuModifierKeys \ and cthulhu_state.activeScript is not None: cthulhu_state.activeScript.refreshKeyGrabs() @@ -1163,10 +1163,41 @@ class EventManager: cthulhu_state.openingDialog = (text == "space" \ and (state & ~(1 << Atspi.ModifierType.NUMLOCK))) - self._processKeyboardEvent(event) + # Create KeyboardEvent directly with new constructor + keyboardEvent = input_event.KeyboardEvent(pressed, keycode, keysym, state, text or "") + + # Set context information + keyboardEvent.setWindow(cthulhu_state.activeWindow) + keyboardEvent.setObject(cthulhu_state.locusOfFocus) + keyboardEvent.setScript(cthulhu_state.activeScript) + + # Finalize initialization now that context is set + keyboardEvent._finalize_initialization() + + if not keyboardEvent.is_duplicate: + debug.printMessage(debug.LEVEL_INFO, f"\n{keyboardEvent}") + + rv = keyboardEvent.process() + + # Do any needed xmodmap crap. Hopefully this can die soon. + from cthulhu import cthulhu + cthulhu.updateKeyMap(keyboardEvent) + + return rv def _processKeyboardEvent(self, event): - keyboardEvent = input_event.KeyboardEvent(event) + # Convert AT-SPI event to new KeyboardEvent format + pressed = event.type == Atspi.EventType.KEY_PRESSED_EVENT + keyboardEvent = input_event.KeyboardEvent(pressed, event.hw_code, event.id, event.modifiers, event.event_string or "") + + # Set context information + keyboardEvent.setWindow(cthulhu_state.activeWindow) + keyboardEvent.setObject(cthulhu_state.locusOfFocus) + keyboardEvent.setScript(cthulhu_state.activeScript) + + # Finalize initialization + keyboardEvent._finalize_initialization() + if not keyboardEvent.is_duplicate: debug.printMessage(debug.LEVEL_INFO, f"\n{keyboardEvent}") diff --git a/src/cthulhu/input_event.py b/src/cthulhu/input_event.py index b5b37a5..77d3119 100644 --- a/src/cthulhu/input_event.py +++ b/src/cthulhu/input_event.py @@ -233,31 +233,34 @@ class KeyboardEvent(InputEvent): Gdk.KEY_Yacute, Gdk.KEY_yacute] - def __init__(self, event): + def __init__(self, pressed, keycode, keysym, modifiers, text): """Creates a new InputEvent of type KEYBOARD_EVENT. Arguments: - - event: the AT-SPI keyboard event + - pressed: True if key is pressed, False if released + - keycode: hardware keycode + - keysym: keysym value + - modifiers: modifier mask + - text: text representation of the key """ super().__init__(KEYBOARD_EVENT) - self.id = event.id - self.type = event.type - self.hw_code = event.hw_code - self.modifiers = event.modifiers & Gdk.ModifierType.MODIFIER_MASK - if event.modifiers & (1 << Atspi.ModifierType.NUMLOCK): + self.id = keysym + self.type = Atspi.EventType.KEY_PRESSED_EVENT if pressed else Atspi.EventType.KEY_RELEASED_EVENT + self.hw_code = keycode + self.modifiers = modifiers & Gdk.ModifierType.MODIFIER_MASK + if modifiers & (1 << Atspi.ModifierType.NUMLOCK): self.modifiers |= (1 << Atspi.ModifierType.NUMLOCK) - self.event_string = event.event_string - self.keyval_name = Gdk.keyval_name(event.id) - if self.event_string == "": + self.event_string = text + self.keyval_name = Gdk.keyval_name(keysym) + if self.event_string == "": self.event_string = self.keyval_name - self.timestamp = event.timestamp - self.is_duplicate = self in [cthulhu_state.lastInputEvent, - cthulhu_state.lastNonModifierKeyEvent] - self._script = cthulhu_state.activeScript + self.timestamp = time.time() * 1000 # Convert to milliseconds + self.is_duplicate = False # Will be set by InputEventManager + self._script = None self._app = None - self._window = cthulhu_state.activeWindow - self._obj = cthulhu_state.locusOfFocus + self._window = None + self._obj = None self._handler = None self._consumer = None self._should_consume = None @@ -283,33 +286,26 @@ class KeyboardEvent(InputEvent): # trying to heuristically hack around this just by looking at the event # is not reliable. Ditto regarding asking Gdk for the numlock state. if self.keyval_name.startswith("KP"): - if event.modifiers & (1 << Atspi.ModifierType.NUMLOCK): + if modifiers & (1 << Atspi.ModifierType.NUMLOCK): self._is_kp_with_numlock = True - if self._script: - self._app = self._script.app - if not self._window: - cthulhu.setActiveWindow(self._script.utilities.activeWindow()) - self._window = cthulhu_state.activeWindow - tokens = ["INPUT EVENT: Updated window and active window to", self._window] - debug.printTokens(debug.LEVEL_INFO, tokens, True) - - if self._window and self._app != AXObject.get_application(self._window): - self._script = script_manager.getManager().getScript( - AXObject.get_application(self._window)) - self._app = self._script.app - tokens = ["INPUT EVENT: Updated script to", self._script] - debug.printTokens(debug.LEVEL_INFO, tokens, True) + self.keyType = None + self.shouldEcho = False + + # Initialize key type - will be refined later in _finalize_initialization + self._finalize_initialization() + def _finalize_initialization(self): + """Finalize initialization after object creation. + This is separated to allow InputEventManager to set additional properties first.""" + if self.is_duplicate: KeyboardEvent.duplicateCount += 1 else: KeyboardEvent.duplicateCount = 0 - self.keyType = None - - _isPressed = event.type == Atspi.EventType.KEY_PRESSED_EVENT - role = AXObject.get_role(self._obj) + _isPressed = self.type == Atspi.EventType.KEY_PRESSED_EVENT + role = AXObject.get_role(self._obj) if self._obj else None _mayEcho = _isPressed or role == Atspi.Role.TERMINAL if KeyboardEvent.stickyKeys and not self.isCthulhuModifier() \ @@ -427,8 +423,15 @@ class KeyboardEvent(InputEvent): return lastEvent return None - def setClickCount(self): - """Updates the count of the number of clicks a user has made.""" + def setClickCount(self, count=None): + """Updates the count of the number of clicks a user has made. + + If count is provided, sets the click count to that value. + Otherwise, calculates the click count based on event timing.""" + + if count is not None: + self._clickCount = count + return doubleEvent = self._getDoubleClickCandidate() if not doubleEvent: @@ -736,6 +739,43 @@ class KeyboardEvent(InputEvent): """Returns the object believed to be associated with this key event.""" return self._obj + + def setObject(self, obj): + """Sets the object believed to be associated with this key event.""" + + self._obj = obj + + def getWindow(self): + """Returns the window associated with this key event.""" + + return self._window + + def setWindow(self, window): + """Sets the window associated with this key event.""" + + self._window = window + + def getScript(self): + """Returns the script associated with this key event.""" + + return self._script + + def setScript(self, script): + """Sets the script associated with this key event.""" + + self._script = script + if script: + self._app = script.app + + def getClickCount(self): + """Returns the click count for this event.""" + + return self._clickCount + + def asSingleLineString(self): + """Returns a single-line string representation of this event.""" + + return f"KeyboardEvent({self.keyval_name}, pressed={self.isPressedKey()}, modifiers={self.modifiers})" def getHandler(self): """Returns the handler associated with this key event.""" diff --git a/src/cthulhu/input_event_manager.py b/src/cthulhu/input_event_manager.py new file mode 100644 index 0000000..959a2d8 --- /dev/null +++ b/src/cthulhu/input_event_manager.py @@ -0,0 +1,431 @@ +#!/usr/bin/env python3 +# +# Copyright (c) 2024 Stormux +# Copyright (c) 2024 Igalia, S.L. +# Copyright (c) 2024 GNOME Foundation Inc. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the +# Free Software Foundation, Inc., Franklin Street, Fifth Floor, +# Boston MA 02110-1301 USA. + +"""Provides utilities for managing input events.""" + +from __future__ import annotations + +__id__ = "$Id$" +__version__ = "$Revision$" +__date__ = "$Date$" +__copyright__ = "Copyright (c) 2024 Stormux" \ + "Copyright (c) 2024 Igalia, S.L." \ + "Copyright (c) 2024 GNOME Foundation Inc." +__license__ = "LGPL" + +from typing import TYPE_CHECKING + +import gi +gi.require_version("Atspi", "2.0") +gi.require_version("Gdk", "3.0") +from gi.repository import Atspi +from gi.repository import Gdk + +from . import debug +from . import input_event +from . import script_manager +from . import settings +from . import cthulhu_state +from .ax_object import AXObject +from .ax_utilities import AXUtilities + +if TYPE_CHECKING: + from . import keybindings + +class InputEventManager: + """Provides utilities for managing input events.""" + + def __init__(self) -> None: + self._last_input_event: input_event.InputEvent | None = None + self._last_non_modifier_key_event: input_event.KeyboardEvent | None = None + self._device: Atspi.Device | None = None + self._mapped_keycodes: list[int] = [] + self._mapped_keysyms: list[int] = [] + self._grabbed_bindings: dict[int, keybindings.KeyBinding] = {} + self._paused: bool = False + + def start_key_watcher(self) -> None: + """Starts the watcher for keyboard input events.""" + + msg = "INPUT EVENT MANAGER: Starting key watcher." + debug.printMessage(debug.LEVEL_INFO, msg, True) + try: + atspi_version = Atspi.get_version() + debug.printMessage(debug.LEVEL_INFO, f"INPUT EVENT MANAGER: AT-SPI version: {atspi_version}", True) + if atspi_version >= (2, 55, 90): + debug.printMessage(debug.LEVEL_INFO, "INPUT EVENT MANAGER: Using Device.new_full", True) + self._device = Atspi.Device.new_full("org.stormux.Cthulhu") + else: + debug.printMessage(debug.LEVEL_INFO, "INPUT EVENT MANAGER: Using Device.new", True) + self._device = Atspi.Device.new() + debug.printMessage(debug.LEVEL_INFO, f"INPUT EVENT MANAGER: Device created: {self._device}", True) + debug.printMessage(debug.LEVEL_INFO, f"INPUT EVENT MANAGER: About to add key watcher callback", True) + result = self._device.add_key_watcher(self.process_keyboard_event) + debug.printMessage(debug.LEVEL_INFO, f"INPUT EVENT MANAGER: add_key_watcher result: {result}", True) + debug.printMessage(debug.LEVEL_INFO, "INPUT EVENT MANAGER: Key watcher added successfully", True) + except Exception as e: + debug.printMessage(debug.LEVEL_INFO, f"INPUT EVENT MANAGER: Error in start_key_watcher: {e}", True) + raise + + def stop_key_watcher(self) -> None: + """Stops the watcher for keyboard input events.""" + + msg = "INPUT EVENT MANAGER: Stopping key watcher." + debug.printMessage(debug.LEVEL_INFO, msg, True) + self._device = None + + def pause_key_watcher(self, pause: bool = True, reason: str = "") -> None: + """Pauses processing of keyboard input events.""" + + msg = f"INPUT EVENT MANAGER: Pause processing: {pause}. {reason}" + debug.printMessage(debug.LEVEL_INFO, msg, True) + self._paused = pause + + def check_grabbed_bindings(self) -> None: + """Checks the grabbed key bindings.""" + + msg = f"INPUT EVENT MANAGER: {len(self._grabbed_bindings)} grabbed key bindings." + debug.printMessage(debug.LEVEL_INFO, msg, True) + for grab_id, binding in self._grabbed_bindings.items(): + msg = f"INPUT EVENT MANAGER: {grab_id} for: {binding}" + debug.printMessage(debug.LEVEL_INFO, msg, True) + + def add_grabs_for_keybinding(self, binding: keybindings.KeyBinding) -> list[int]: + """Adds grabs for binding if it is enabled, returns grab IDs.""" + + if not (binding.is_enabled() and binding.is_bound()): + return [] + + if binding.has_grabs(): + tokens = ["INPUT EVENT MANAGER:", binding, "already has grabs."] + debug.printTokens(debug.LEVEL_INFO, tokens, True) + return [] + + if self._device is None: + tokens = ["INPUT EVENT MANAGER: No device to add grab for", binding] + debug.printTokens(debug.LEVEL_INFO, tokens, True) + return [] + + grab_ids = [] + for kd in binding.key_definitions(): + grab_id = self._device.add_key_grab(kd, None) + grab_ids.append(grab_id) + self._grabbed_bindings[grab_id] = binding + + return grab_ids + + def remove_grabs_for_keybinding(self, binding: keybindings.KeyBinding) -> None: + """Removes grabs for binding.""" + + if self._device is None: + tokens = ["INPUT EVENT MANAGER: No device to remove grab from", binding] + debug.printTokens(debug.LEVEL_INFO, tokens, True) + return + + grab_ids = binding.get_grab_ids() + if not grab_ids: + tokens = ["INPUT EVENT MANAGER:", binding, "doesn't have grabs to remove."] + debug.printTokens(debug.LEVEL_INFO, tokens, True) + return + + for grab_id in grab_ids: + self._device.remove_key_grab(grab_id) + removed = self._grabbed_bindings.pop(grab_id, None) + if removed is None: + msg = f"INPUT EVENT MANAGER: No key binding for grab id {grab_id}" + debug.printMessage(debug.LEVEL_INFO, msg, True) + + def map_keycode_to_modifier(self, keycode: int) -> int: + """Maps keycode as a modifier, returns the newly-mapped modifier.""" + + if self._device is None: + msg = f"INPUT EVENT MANAGER: No device to map keycode {keycode} to modifier" + debug.printMessage(debug.LEVEL_INFO, msg, True) + return 0 + + self._mapped_keycodes.append(keycode) + return self._device.map_modifier(keycode) + + def map_keysym_to_modifier(self, keysym: int) -> int: + """Maps keysym as a modifier, returns the newly-mapped modifier.""" + + if self._device is None: + msg = f"INPUT EVENT MANAGER: No device to map keysym {keysym} to modifier" + debug.printMessage(debug.LEVEL_INFO, msg, True) + return 0 + + self._mapped_keysyms.append(keysym) + return self._device.map_keysym_modifier(keysym) + + def unmap_all_modifiers(self) -> None: + """Unmaps all previously mapped modifiers.""" + + if self._device is None: + msg = "INPUT EVENT MANAGER: No device to unmap all modifiers from" + debug.printMessage(debug.LEVEL_INFO, msg, True) + return + + for keycode in self._mapped_keycodes: + self._device.unmap_modifier(keycode) + + for keysym in self._mapped_keysyms: + self._device.unmap_keysym_modifier(keysym) + + self._mapped_keycodes.clear() + self._mapped_keysyms.clear() + + def add_grab_for_modifier(self, modifier: str, keysym: int, keycode: int) -> int: + """Adds grab for modifier, returns grab id.""" + + if self._device is None: + msg = f"INPUT EVENT MANAGER: No device to add grab for {modifier}" + debug.printMessage(debug.LEVEL_INFO, msg, True) + return -1 + + kd = Atspi.KeyDefinition() + kd.keysym = keysym + kd.keycode = keycode + kd.modifiers = 0 + grab_id = self._device.add_key_grab(kd) + + msg = f"INPUT EVENT MANAGER: Grab id for {modifier}: {grab_id}" + debug.printMessage(debug.LEVEL_INFO, msg, True) + return grab_id + + def remove_grab_for_modifier(self, modifier: str, grab_id: int) -> None: + """Removes grab for modifier.""" + + if self._device is None: + msg = f"INPUT EVENT MANAGER: No device to remove grab from {modifier}" + debug.printMessage(debug.LEVEL_INFO, msg, True) + return + + self._device.remove_key_grab(grab_id) + msg = f"INPUT EVENT MANAGER: Grab id removed for {modifier}: {grab_id}" + debug.printMessage(debug.LEVEL_INFO, msg, True) + + def grab_keyboard(self, reason: str = "") -> None: + """Grabs the keyboard, e.g. when entering learn mode.""" + + msg = "INPUT EVENT MANAGER: Grabbing keyboard" + if reason: + msg += f" Reason: {reason}" + debug.printMessage(debug.LEVEL_INFO, msg, True) + Atspi.Device.grab_keyboard(self._device) + + def ungrab_keyboard(self, reason: str = "") -> None: + """Removes keyboard grab, e.g. when exiting learn mode.""" + + msg = "INPUT EVENT MANAGER: Ungrabbing keyboard" + if reason: + msg += f" Reason: {reason}" + debug.printMessage(debug.LEVEL_INFO, msg, True) + Atspi.Device.ungrab_keyboard(self._device) + + def process_braille_event(self, event: Atspi.Event) -> bool: + """Processes this Braille event.""" + + braille_event = input_event.BrailleEvent(event) + result = braille_event.process() + self._last_input_event = braille_event + self._last_non_modifier_key_event = None + return result + + def process_mouse_button_event(self, event: Atspi.Event) -> None: + """Processes this Mouse event.""" + + mouse_event = input_event.MouseButtonEvent(event) + mouse_event.setClickCount(self._determine_mouse_event_click_count(mouse_event)) + self._last_input_event = mouse_event + + def process_keyboard_event(self, _device, pressed, keycode, keysym, modifiers, text): + """Processes this Atspi keyboard event.""" + + debug.printMessage(debug.LEVEL_INFO, f"INPUT EVENT MANAGER: Received keyboard event: pressed={pressed}, keycode={keycode}, keysym={keysym}, text='{text}'", True) + + if self._paused: + msg = "INPUT EVENT MANAGER: Keyboard event processing is paused." + debug.printMessage(debug.LEVEL_INFO, msg, True) + return False + + # Handle Cthulhu-specific logic before creating event + if not pressed and text == "Num_Lock" and "KP_Insert" in settings.cthulhuModifierKeys \ + and cthulhu_state.activeScript is not None: + cthulhu_state.activeScript.refreshKeyGrabs() + + if pressed: + cthulhu_state.openingDialog = (text == "space" \ + and (modifiers & ~(1 << Atspi.ModifierType.NUMLOCK))) + + event = input_event.KeyboardEvent(pressed, keycode, keysym, modifiers, text or "") + if event in [self._last_input_event, self._last_non_modifier_key_event]: + msg = "INPUT EVENT MANAGER: Received duplicate event." + debug.printMessage(debug.LEVEL_INFO, msg, True) + return False + + if pressed: + # Get active window and focus + window = cthulhu_state.activeWindow + # Use cthulhu_state.activeScript instead of ScriptManager API + active_script = cthulhu_state.activeScript + if active_script and hasattr(active_script, 'utilities') and hasattr(active_script.utilities, 'canBeActiveWindow'): + if not active_script.utilities.canBeActiveWindow(window): + new_window = active_script.utilities.activeWindow() + if new_window is not None: + window = new_window + tokens = ["INPUT EVENT MANAGER: Updating window and active window to", window] + debug.printTokens(debug.LEVEL_INFO, tokens, True) + cthulhu_state.activeWindow = window + else: + tokens = ["WARNING:", window, "cannot be active window. No alternative found."] + debug.printTokens(debug.LEVEL_WARNING, tokens, True) + + event.setWindow(window) + event.setObject(cthulhu_state.locusOfFocus) + event.setScript(active_script) + elif self.last_event_was_keyboard(): + assert isinstance(self._last_input_event, input_event.KeyboardEvent) + event.setWindow(self._last_input_event.getWindow()) + event.setObject(self._last_input_event.getObject()) + event.setScript(self._last_input_event.getScript()) + else: + event.setWindow(cthulhu_state.activeWindow) + event.setObject(cthulhu_state.locusOfFocus) + event.setScript(cthulhu_state.activeScript) + + # Finalize initialization now that context is set + event._finalize_initialization() + + if not event.is_duplicate: + debug.printMessage(debug.LEVEL_INFO, f"\n{event}") + + event.setClickCount(self._determine_keyboard_event_click_count(event)) + rv = event.process() + + # Do any needed xmodmap handling + from . import cthulhu + cthulhu.updateKeyMap(event) + + if event.isModifierKey(): + if self.is_release_for(event, self._last_input_event): + msg = "INPUT EVENT MANAGER: Clearing last non modifier key event" + debug.printMessage(debug.LEVEL_INFO, msg, True) + self._last_non_modifier_key_event = None + else: + self._last_non_modifier_key_event = event + self._last_input_event = event + return rv + + def _determine_keyboard_event_click_count(self, event: input_event.KeyboardEvent) -> int: + """Determines the click count of event.""" + + if not self.last_event_was_keyboard(): + return 1 + + if event.isModifierKey(): + last_event = self._last_input_event + else: + last_event = self._last_non_modifier_key_event or self._last_input_event + + assert isinstance(last_event, input_event.KeyboardEvent) + if (event.time - last_event.time > settings.doubleClickTimeout) or \ + (event.keyval_name != last_event.keyval_name) or \ + (event.getObject() != last_event.getObject()): + return 1 + + last_count = last_event.getClickCount() + if not event.isPressedKey(): + return last_count + if last_event.isPressedKey(): + return last_count + if (event.isModifierKey() and last_count == 2) or last_count == 3: + return 1 + return last_count + 1 + + def _determine_mouse_event_click_count(self, event: input_event.MouseButtonEvent) -> int: + """Determines the click count of event.""" + + if not self.last_event_was_mouse_button(): + return 1 + + assert isinstance(self._last_input_event, input_event.MouseButtonEvent) + if not event.pressed: + return self._last_input_event.getClickCount() + if self._last_input_event.button != event.button: + return 1 + if event.time - self._last_input_event.time > settings.doubleClickTimeout: + return 1 + + return self._last_input_event.getClickCount() + 1 + + def last_event_was_keyboard(self) -> bool: + """Returns True if the last event is a keyboard event.""" + + return isinstance(self._last_input_event, input_event.KeyboardEvent) + + def last_event_was_mouse_button(self) -> bool: + """Returns True if the last event is a mouse button event.""" + + return isinstance(self._last_input_event, input_event.MouseButtonEvent) + + def is_release_for(self, event1, event2): + """Returns True if event1 is a release for event2.""" + + if event1 is None or event2 is None: + return False + + if not isinstance(event1, input_event.KeyboardEvent) \ + or not isinstance(event2, input_event.KeyboardEvent): + return False + + if event1.isPressedKey() or not event2.isPressedKey(): + return False + + result = event1.id == event2.id \ + and event1.hw_code == event2.hw_code \ + and event1.keyval_name == event2.keyval_name + + if result and not event1.isModifierKey(): + result = event1.modifiers == event2.modifiers + + msg = ( + f"INPUT EVENT MANAGER: {event1.asSingleLineString()} " + f"is release for {event2.asSingleLineString()}: {result}" + ) + debug.printMessage(debug.LEVEL_INFO, msg, True) + return result + + def last_event_equals_or_is_release_for_event(self, event): + """Returns True if the last non-modifier event equals, or is the release for, event.""" + + if self._last_non_modifier_key_event is None: + return False + + if event == self._last_non_modifier_key_event: + return True + + return self.is_release_for(self._last_non_modifier_key_event, event) + + +_manager = InputEventManager() +def getManager(): + """Returns the Input Event Manager singleton.""" + return _manager \ No newline at end of file diff --git a/src/cthulhu/scripts/default.py b/src/cthulhu/scripts/default.py index d17398f..41dc98c 100644 --- a/src/cthulhu/scripts/default.py +++ b/src/cthulhu/scripts/default.py @@ -723,12 +723,9 @@ class Script(script.Script): self.speechAndVerbosityManager.update_punctuation_level() self.speechAndVerbosityManager.update_capitalization_style() - # Gtk 4 requrns "GTK", while older versions return "gtk" - # TODO: move this to a toolkit-specific script - if self.app is not None and self.app.toolkitName == "GTK" and self.app.toolkitVersion > "4": - cthulhu.setKeyHandling(True) - else: - cthulhu.setKeyHandling(False) + # Use new InputEventManager for global keyboard capture by default + # Only fall back to legacy handling for problematic applications + cthulhu.setKeyHandling(True) self.addKeyGrabs()