Keyboard seems to be working, same methods as orca now.

This commit is contained in:
Storm Dragon
2025-08-04 00:21:49 -04:00
parent 9c8063c55e
commit ecb1ae4fe5
7 changed files with 774 additions and 85 deletions
+1
View File
@@ -44,6 +44,7 @@ cthulhu_python_PYTHON = \
guilabels.py \
highlighter.py \
input_event.py \
input_event_manager.py \
keybindings.py \
keynames.py \
label_inference.py \
+4 -1
View File
@@ -2039,11 +2039,14 @@ class CthulhuSetupGUI(cthulhu_gtkbuilder.GtkBuilderWrapper):
try:
ts = cthulhu_state.lastInputEvent.timestamp
# Ensure timestamp fits in 32-bit range for GTK
if ts > 4294967295: # 2^32 - 1
ts = ts % 4294967296 # Wrap to 32-bit range
except Exception:
ts = 0
if ts == 0:
ts = Gtk.get_current_event_time()
cthulhuSetupWindow.present_with_time(ts)
cthulhuSetupWindow.present_with_time(int(ts))
# We always want to re-order the text attributes page so that enabled
# items are consistently at the top.
+207 -21
View File
@@ -28,6 +28,7 @@ __copyright__ = "Copyright (c) 2025 Stormux <storm_dragon@stormux.org>"
__license__ = "LGPL"
import enum
import inspect
from typing import Callable
try:
@@ -56,6 +57,7 @@ class HandlerType(enum.Enum):
"""Enumeration of handler types for D-Bus methods."""
COMMAND = enum.auto()
PARAMETERIZED_COMMAND = enum.auto()
GETTER = enum.auto()
SETTER = enum.auto()
@@ -85,6 +87,26 @@ def getter(func):
func.dbus_getter_description = description
return func
def parameterized_command(func):
"""Decorator to mark a method as a D-Bus parameterized command using its docstring.
Usage:
@parameterized_command
def get_voices_for_language(
self,
language,
variant='',
script=None,
event=None,
notify_user=False
):
'''Returns a list of available voices for the specified language.'''
# method implementation
"""
description = func.__doc__ or f"D-Bus parameterized command: {func.__name__}"
func.dbus_parameterized_command_description = description
return func
def setter(func):
"""Decorator to mark a method as a D-Bus setter using its docstring.
@@ -98,6 +120,29 @@ def setter(func):
func.dbus_setter_description = description
return func
def _extract_function_parameters(func: Callable) -> list[tuple[str, str]]:
"""Extract parameter names and types from a function signature."""
sig = inspect.signature(func)
parameters = []
skip_params = {"self", "script", "event"}
for param_name, param in sig.parameters.items():
if param_name in skip_params:
continue
if param.annotation != inspect.Parameter.empty:
if hasattr(param.annotation, "__name__"):
type_str = param.annotation.__name__
else:
type_str = str(param.annotation).replace("typing.", "")
else:
type_str = "Any"
parameters.append((param_name, type_str))
return parameters
class _HandlerInfo:
"""Stores processed information about a function exposed via D-Bus."""
@@ -106,12 +151,14 @@ class _HandlerInfo:
python_function_name: str,
description: str,
action: Callable[..., bool],
handler_type: 'HandlerType' = HandlerType.COMMAND
handler_type: 'HandlerType' = HandlerType.COMMAND,
parameters: list[tuple[str, str]] | None = None
):
self.python_function_name: str = python_function_name
self.description: str = description
self.action: Callable[..., bool] = action
self.handler_type: HandlerType = handler_type
self.parameters: list[tuple[str, str]] = parameters or []
if _dasbus_available:
@@ -125,23 +172,27 @@ if _dasbus_available:
super().__init__()
self._module_name = module_name
self._commands: dict[str, _HandlerInfo] = {}
self._parameterized_commands: dict[str, _HandlerInfo] = {}
self._getters: dict[str, _HandlerInfo] = {}
self._setters: dict[str, _HandlerInfo] = {}
for info in handlers_info:
handler_type = getattr(info, "handler_type", HandlerType.COMMAND)
normalized_name = self._normalize_handler_name(info.python_function_name)
normalized_name = self._normalize_handler_name(info.python_function_name, handler_type)
if handler_type == HandlerType.GETTER:
self._getters[normalized_name] = info
elif handler_type == HandlerType.SETTER:
self._setters[normalized_name] = info
elif handler_type == HandlerType.PARAMETERIZED_COMMAND:
self._parameterized_commands[normalized_name] = info
else:
self._commands[normalized_name] = info
msg = (
f"DBUS SERVICE: CthulhuModuleDBusInterface for {module_name} initialized "
f"with {len(self._commands)} commands, {len(self._getters)} getters, "
f"{len(self._setters)} setters."
f"with {len(self._commands)} command(s), "
f"{len(self._parameterized_commands)} parameterized command(s), "
f"{len(self._getters)} getter(s), {len(self._setters)} setter(s)."
)
debug.printMessage(debug.LEVEL_INFO, msg, True)
@@ -182,6 +233,16 @@ if _dasbus_available:
command_list.append((camel_case_name, info.description))
return command_list
def ListParameterizedCommands( # pylint: disable=invalid-name
self,
) -> list[tuple[str, str, list[tuple[str, str]]]]:
"""Returns a list of (command_name, description, parameters) for this module."""
command_list = []
for camel_case_name, info in self._parameterized_commands.items():
command_list.append((camel_case_name, info.description, info.parameters))
return command_list
def ListRuntimeGetters(self) -> list[tuple[str, str]]: # pylint: disable=invalid-name
"""Returns a list of (getter_name, description) for this module."""
@@ -215,6 +276,30 @@ if _dasbus_available:
debug.printMessage(debug.LEVEL_INFO, msg, True)
return result
def ExecuteParameterizedCommand( # pylint: disable=invalid-name
self,
command_name: str,
parameters: dict[str, GLib.Variant],
notify_user: bool
) -> GLib.Variant:
"""Executes the named command with parameters and returns the result."""
handler_info = self._parameterized_commands.get(command_name)
if not handler_info:
msg = (
f"DBUS SERVICE: Unknown parameterized command '{command_name}' for "
f"'{self._module_name}'."
)
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return GLib.Variant("b", False)
kwargs = {name: variant.unpack() for name, variant in parameters.items()}
kwargs["notify_user"] = notify_user
result = handler_info.action(**kwargs)
msg = f"DBUS SERVICE: Parameterized '{command_name}' in '{self._module_name}' executed."
debug.printMessage(debug.LEVEL_INFO, msg, True)
return self._to_variant(result)
def for_publication(self):
"""Returns the D-Bus interface XML for publication."""
@@ -222,40 +307,51 @@ if _dasbus_available:
@staticmethod
def _normalize_handler_name(function_name: str) -> str:
def _normalize_handler_name(
function_name: str,
handler_type: HandlerType = HandlerType.COMMAND
) -> str:
"""Normalizes a Python function name for D-Bus exposure (getter/setter/command)."""
if function_name.startswith("get_") or function_name.startswith("set_"):
function_name = function_name[4:]
# Only strip prefixes for getters and setters, not for commands
if handler_type in (HandlerType.GETTER, HandlerType.SETTER):
if function_name.startswith("get_") or function_name.startswith("set_"):
function_name = function_name[4:]
return "".join(word.capitalize() for word in function_name.split("_"))
@staticmethod
def _to_variant(result):
"""Converts a Python value to a correctly-typed GLib.Variant for D-Bus marshalling."""
if isinstance(result, bool):
return GLib.Variant("b", result)
elif isinstance(result, int):
if isinstance(result, int):
return GLib.Variant("i", result)
elif isinstance(result, float):
if isinstance(result, float):
return GLib.Variant("d", result)
elif isinstance(result, str):
if isinstance(result, str):
return GLib.Variant("s", result)
elif isinstance(result, dict):
if isinstance(result, dict):
return GLib.Variant(
"a{sv}", {str(k): GLib.Variant("v", v) for k, v in result.items()})
elif isinstance(result, list) or isinstance(result, tuple):
if isinstance(result, (list, tuple)):
if all(isinstance(x, str) for x in result):
return GLib.Variant("as", list(result))
elif all(isinstance(x, int) for x in result):
return GLib.Variant("ax", list(result))
elif all(isinstance(x, bool) for x in result):
if all(isinstance(x, bool) for x in result):
return GLib.Variant("ab", list(result))
else:
return GLib.Variant("av", [GLib.Variant("v", x) for x in result])
elif result is None:
if all(isinstance(x, int) for x in result):
return GLib.Variant("ax", list(result))
if all(isinstance(x, (list, tuple)) for x in result):
if not result:
return GLib.Variant("av", [])
first_len = len(result[0])
converted = [tuple(str(item or "") for item in x) for x in result]
signature = "(" + "s" * first_len + ")"
return GLib.Variant(f"a{signature}", converted)
return GLib.Variant("av", [GLib.Variant("v", x) for x in result])
if result is None:
return GLib.Variant("v", GLib.Variant("s", ""))
else:
return GLib.Variant("s", str(result))
return GLib.Variant("s", str(result))
@dbus_interface("org.stormux.Cthulhu.Service")
@@ -349,6 +445,22 @@ if _dasbus_available:
return sorted(commands)
def ShowPreferences(self) -> bool: # pylint: disable=invalid-name
"""Shows Cthulhu's preferences GUI."""
msg = "DBUS SERVICE: ShowPreferences called."
debug.printMessage(debug.LEVEL_INFO, msg, True)
manager = script_manager.getManager()
script = cthulhu_state.activeScript or manager.getDefaultScript()
if script is None:
msg = "DBUS SERVICE: No script available"
debug.printMessage(debug.LEVEL_WARNING, msg, True)
return False
script.showPreferencesGUI()
return True
def PresentMessage(self, message: str) -> bool: # pylint: disable=invalid-name
"""Presents message to the user."""
@@ -420,6 +532,10 @@ class CthulhuRemoteController:
self._bus: SessionMessageBus | None = None
self._event_loop: EventLoop | None = None
self._pending_registrations: dict[str, object] = {}
self._total_commands: int = 0
self._total_getters: int = 0
self._total_setters: int = 0
self._total_modules: int = 0
self._dasbus_available = _dasbus_available
def start(self) -> bool:
@@ -474,6 +590,7 @@ class CthulhuRemoteController:
)
debug.printMessage(debug.LEVEL_INFO, msg, True)
self._process_pending_registrations()
self._print_registration_summary()
return True
def _process_pending_registrations(self) -> None:
@@ -523,6 +640,10 @@ class CthulhuRemoteController:
return
handlers_info = []
commands_count = 0
getters_count = 0
setters_count = 0
for attr_name in dir(module_instance):
attr = getattr(module_instance, attr_name)
# Command
@@ -541,8 +662,32 @@ class CthulhuRemoteController:
handler_type=HandlerType.COMMAND
)
handlers_info.append(handler_info)
commands_count += 1
msg = f"REMOTE CONTROLLER: Found decorated command '{attr_name}': {description}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
# Parameterized Command
elif callable(attr) and hasattr(attr, "dbus_parameterized_command_description"):
description = attr.dbus_parameterized_command_description
def _create_parameterized_wrapper(method=attr):
def _wrapper(**kwargs):
event = _get_input_event().RemoteControllerEvent()
script = cthulhu_state.activeScript
if script is None:
manager = script_manager.getManager()
script = manager.getDefaultScript()
return method(script=script, event=event, **kwargs)
return _wrapper
handler_info = _HandlerInfo(
python_function_name=attr_name,
description=description,
action=_create_parameterized_wrapper(),
handler_type=HandlerType.PARAMETERIZED_COMMAND,
parameters=_extract_function_parameters(attr)
)
handlers_info.append(handler_info)
commands_count += 1
msg = f"REMOTE CONTROLLER: Found decorated parameterized command '{attr_name}': {description}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
# Getter
elif callable(attr) and hasattr(attr, "dbus_getter_description"):
description = attr.dbus_getter_description
@@ -557,6 +702,7 @@ class CthulhuRemoteController:
handler_type=HandlerType.GETTER
)
handlers_info.append(handler_info)
getters_count += 1
msg = f"REMOTE CONTROLLER: Found decorated getter '{attr_name}': {description}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
# Setter
@@ -573,17 +719,23 @@ class CthulhuRemoteController:
handler_type=HandlerType.SETTER
)
handlers_info.append(handler_info)
setters_count += 1
msg = f"REMOTE CONTROLLER: Found decorated setter '{attr_name}': {description}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
if not handlers_info:
return
self._total_commands += commands_count
self._total_getters += getters_count
self._total_setters += setters_count
self._total_modules += 1
self._dbus_service_interface.add_module_interface(
module_name, handlers_info, self._bus, self.OBJECT_PATH)
msg = (
f"REMOTE CONTROLLER: Successfully registered {len(handlers_info)} "
f"decorated commands/getters/setters for module {module_name}."
f"commands/getters/setters for module {module_name}."
)
debug.printMessage(debug.LEVEL_INFO, msg, True)
@@ -645,12 +797,46 @@ class CthulhuRemoteController:
msg = "REMOTE CONTROLLER: D-Bus service shut down."
debug.printMessage(debug.LEVEL_INFO, msg, True)
self._pending_registrations.clear()
self._total_commands = 0
self._total_getters = 0
self._total_setters = 0
self._total_modules = 0
def is_running(self) -> bool:
"""Checks if the D-Bus service is currently running."""
return self._is_running
def _count_system_commands(self) -> int:
"""Counts the system-wide D-Bus commands available on the main service interface."""
if not self._dbus_service_interface:
return 0
system_commands = 0
for attr_name in dir(self._dbus_service_interface):
if not attr_name.startswith("_") and attr_name[0].isupper():
attr = getattr(self._dbus_service_interface, attr_name)
if callable(attr) and hasattr(attr, "__doc__"):
system_commands += 1
return system_commands
def _print_registration_summary(self) -> None:
"""Prints a summary of all registered D-Bus handlers."""
system_commands_count = self._count_system_commands()
total_handlers = self._total_commands + self._total_getters + self._total_setters
msg = (
f"REMOTE CONTROLLER: Registration complete. Summary: "
f"{self._total_modules} modules, "
f"{self._total_commands} module commands, "
f"{self._total_getters} module getters, "
f"{self._total_setters} module setters, "
f"{system_commands_count} system commands. "
f"Total handlers: {total_handlers + system_commands_count}."
)
debug.printMessage(debug.LEVEL_INFO, msg, True)
_remote_controller: CthulhuRemoteController = CthulhuRemoteController()
def get_remote_controller() -> CthulhuRemoteController:
+51 -20
View File
@@ -40,6 +40,7 @@ import time
from . import debug
from . import input_event
from . import input_event_manager
from . import cthulhu_state
from . import script_manager
from . import settings
@@ -96,7 +97,7 @@ class EventManager:
"""Called when this event manager is activated."""
debug.printMessage(debug.LEVEL_INFO, 'EVENT MANAGER: Activating', True)
self.setKeyHandling(False)
self.setKeyHandling(True) # Enable new InputEventManager for global keyboard capture
self._active = True
debug.printMessage(debug.LEVEL_INFO, 'EVENT MANAGER: Activated', True)
@@ -104,14 +105,21 @@ class EventManager:
def activateNewKeyHandling(self):
if not self.newKeyHandlingActive:
try:
cthulhu_state.device = Atspi.Device.new()
except Exception:
debug.printMessage(debug.LEVEL_INFO, 'EVENT MANAGER: Attempting to activate new keyboard handling', True)
# Use the new InputEventManager instead of direct Atspi.Device
self._inputEventManager = input_event_manager.getManager()
debug.printMessage(debug.LEVEL_INFO, 'EVENT MANAGER: InputEventManager obtained', True)
self._inputEventManager.start_key_watcher()
debug.printMessage(debug.LEVEL_INFO, 'EVENT MANAGER: Key watcher started', True)
cthulhu_state.device = self._inputEventManager._device # For compatibility
except Exception as e:
debug.printMessage(debug.LEVEL_INFO, f'EVENT MANAGER: New keyboard handling failed: {e}', True)
self.forceLegacyKeyHandling = True
self.activateLegacyKeyHandling()
return
cthulhu_state.device.key_watcher = cthulhu_state.device.add_key_watcher(
self._processNewKeyboardEvent)
self.newKeyHandlingActive = True
debug.printMessage(debug.LEVEL_INFO, 'EVENT MANAGER: New keyboard handling activated with InputEventManager', True)
# Notify plugin system that device is now available for keybinding registration
from . import cthulhu
@@ -145,6 +153,9 @@ class EventManager:
def deactivateNewKeyHandling(self):
if self.newKeyHandlingActive:
if hasattr(self, '_inputEventManager'):
self._inputEventManager.stop_key_watcher()
self._inputEventManager = None
cthulhu_state.device = None
self.newKeyHandlingActive = False
@@ -1142,19 +1153,8 @@ class EventManager:
debug.printMessage(debug.LEVEL_INFO, msg, True)
def _processNewKeyboardEvent(self, device, pressed, keycode, keysym, state, text):
event = Atspi.DeviceEvent()
if pressed:
event.type = Atspi.EventType.KEY_PRESSED_EVENT
else:
event.type = Atspi.EventType.KEY_RELEASED_EVENT
event.hw_code = keycode
event.id = keysym
event.modifiers = state
event.event_string = text
if event.event_string is None:
event.event_string = ""
event.timestamp = time.time()
"""Process keyboard event using new direct KeyboardEvent creation."""
if not pressed and text == "Num_Lock" and "KP_Insert" in settings.cthulhuModifierKeys \
and cthulhu_state.activeScript is not None:
cthulhu_state.activeScript.refreshKeyGrabs()
@@ -1163,10 +1163,41 @@ class EventManager:
cthulhu_state.openingDialog = (text == "space" \
and (state & ~(1 << Atspi.ModifierType.NUMLOCK)))
self._processKeyboardEvent(event)
# Create KeyboardEvent directly with new constructor
keyboardEvent = input_event.KeyboardEvent(pressed, keycode, keysym, state, text or "")
# Set context information
keyboardEvent.setWindow(cthulhu_state.activeWindow)
keyboardEvent.setObject(cthulhu_state.locusOfFocus)
keyboardEvent.setScript(cthulhu_state.activeScript)
# Finalize initialization now that context is set
keyboardEvent._finalize_initialization()
if not keyboardEvent.is_duplicate:
debug.printMessage(debug.LEVEL_INFO, f"\n{keyboardEvent}")
rv = keyboardEvent.process()
# Do any needed xmodmap crap. Hopefully this can die soon.
from cthulhu import cthulhu
cthulhu.updateKeyMap(keyboardEvent)
return rv
def _processKeyboardEvent(self, event):
keyboardEvent = input_event.KeyboardEvent(event)
# Convert AT-SPI event to new KeyboardEvent format
pressed = event.type == Atspi.EventType.KEY_PRESSED_EVENT
keyboardEvent = input_event.KeyboardEvent(pressed, event.hw_code, event.id, event.modifiers, event.event_string or "")
# Set context information
keyboardEvent.setWindow(cthulhu_state.activeWindow)
keyboardEvent.setObject(cthulhu_state.locusOfFocus)
keyboardEvent.setScript(cthulhu_state.activeScript)
# Finalize initialization
keyboardEvent._finalize_initialization()
if not keyboardEvent.is_duplicate:
debug.printMessage(debug.LEVEL_INFO, f"\n{keyboardEvent}")
+77 -37
View File
@@ -233,31 +233,34 @@ class KeyboardEvent(InputEvent):
Gdk.KEY_Yacute,
Gdk.KEY_yacute]
def __init__(self, event):
def __init__(self, pressed, keycode, keysym, modifiers, text):
"""Creates a new InputEvent of type KEYBOARD_EVENT.
Arguments:
- event: the AT-SPI keyboard event
- pressed: True if key is pressed, False if released
- keycode: hardware keycode
- keysym: keysym value
- modifiers: modifier mask
- text: text representation of the key
"""
super().__init__(KEYBOARD_EVENT)
self.id = event.id
self.type = event.type
self.hw_code = event.hw_code
self.modifiers = event.modifiers & Gdk.ModifierType.MODIFIER_MASK
if event.modifiers & (1 << Atspi.ModifierType.NUMLOCK):
self.id = keysym
self.type = Atspi.EventType.KEY_PRESSED_EVENT if pressed else Atspi.EventType.KEY_RELEASED_EVENT
self.hw_code = keycode
self.modifiers = modifiers & Gdk.ModifierType.MODIFIER_MASK
if modifiers & (1 << Atspi.ModifierType.NUMLOCK):
self.modifiers |= (1 << Atspi.ModifierType.NUMLOCK)
self.event_string = event.event_string
self.keyval_name = Gdk.keyval_name(event.id)
if self.event_string == "":
self.event_string = text
self.keyval_name = Gdk.keyval_name(keysym)
if self.event_string == "":
self.event_string = self.keyval_name
self.timestamp = event.timestamp
self.is_duplicate = self in [cthulhu_state.lastInputEvent,
cthulhu_state.lastNonModifierKeyEvent]
self._script = cthulhu_state.activeScript
self.timestamp = time.time() * 1000 # Convert to milliseconds
self.is_duplicate = False # Will be set by InputEventManager
self._script = None
self._app = None
self._window = cthulhu_state.activeWindow
self._obj = cthulhu_state.locusOfFocus
self._window = None
self._obj = None
self._handler = None
self._consumer = None
self._should_consume = None
@@ -283,33 +286,26 @@ class KeyboardEvent(InputEvent):
# trying to heuristically hack around this just by looking at the event
# is not reliable. Ditto regarding asking Gdk for the numlock state.
if self.keyval_name.startswith("KP"):
if event.modifiers & (1 << Atspi.ModifierType.NUMLOCK):
if modifiers & (1 << Atspi.ModifierType.NUMLOCK):
self._is_kp_with_numlock = True
if self._script:
self._app = self._script.app
if not self._window:
cthulhu.setActiveWindow(self._script.utilities.activeWindow())
self._window = cthulhu_state.activeWindow
tokens = ["INPUT EVENT: Updated window and active window to", self._window]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
if self._window and self._app != AXObject.get_application(self._window):
self._script = script_manager.getManager().getScript(
AXObject.get_application(self._window))
self._app = self._script.app
tokens = ["INPUT EVENT: Updated script to", self._script]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
self.keyType = None
self.shouldEcho = False
# Initialize key type - will be refined later in _finalize_initialization
self._finalize_initialization()
def _finalize_initialization(self):
"""Finalize initialization after object creation.
This is separated to allow InputEventManager to set additional properties first."""
if self.is_duplicate:
KeyboardEvent.duplicateCount += 1
else:
KeyboardEvent.duplicateCount = 0
self.keyType = None
_isPressed = event.type == Atspi.EventType.KEY_PRESSED_EVENT
role = AXObject.get_role(self._obj)
_isPressed = self.type == Atspi.EventType.KEY_PRESSED_EVENT
role = AXObject.get_role(self._obj) if self._obj else None
_mayEcho = _isPressed or role == Atspi.Role.TERMINAL
if KeyboardEvent.stickyKeys and not self.isCthulhuModifier() \
@@ -427,8 +423,15 @@ class KeyboardEvent(InputEvent):
return lastEvent
return None
def setClickCount(self):
"""Updates the count of the number of clicks a user has made."""
def setClickCount(self, count=None):
"""Updates the count of the number of clicks a user has made.
If count is provided, sets the click count to that value.
Otherwise, calculates the click count based on event timing."""
if count is not None:
self._clickCount = count
return
doubleEvent = self._getDoubleClickCandidate()
if not doubleEvent:
@@ -736,6 +739,43 @@ class KeyboardEvent(InputEvent):
"""Returns the object believed to be associated with this key event."""
return self._obj
def setObject(self, obj):
"""Sets the object believed to be associated with this key event."""
self._obj = obj
def getWindow(self):
"""Returns the window associated with this key event."""
return self._window
def setWindow(self, window):
"""Sets the window associated with this key event."""
self._window = window
def getScript(self):
"""Returns the script associated with this key event."""
return self._script
def setScript(self, script):
"""Sets the script associated with this key event."""
self._script = script
if script:
self._app = script.app
def getClickCount(self):
"""Returns the click count for this event."""
return self._clickCount
def asSingleLineString(self):
"""Returns a single-line string representation of this event."""
return f"KeyboardEvent({self.keyval_name}, pressed={self.isPressedKey()}, modifiers={self.modifiers})"
def getHandler(self):
"""Returns the handler associated with this key event."""
+431
View File
@@ -0,0 +1,431 @@
#!/usr/bin/env python3
#
# Copyright (c) 2024 Stormux
# Copyright (c) 2024 Igalia, S.L.
# Copyright (c) 2024 GNOME Foundation Inc.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the
# Free Software Foundation, Inc., Franklin Street, Fifth Floor,
# Boston MA 02110-1301 USA.
"""Provides utilities for managing input events."""
from __future__ import annotations
__id__ = "$Id$"
__version__ = "$Revision$"
__date__ = "$Date$"
__copyright__ = "Copyright (c) 2024 Stormux" \
"Copyright (c) 2024 Igalia, S.L." \
"Copyright (c) 2024 GNOME Foundation Inc."
__license__ = "LGPL"
from typing import TYPE_CHECKING
import gi
gi.require_version("Atspi", "2.0")
gi.require_version("Gdk", "3.0")
from gi.repository import Atspi
from gi.repository import Gdk
from . import debug
from . import input_event
from . import script_manager
from . import settings
from . import cthulhu_state
from .ax_object import AXObject
from .ax_utilities import AXUtilities
if TYPE_CHECKING:
from . import keybindings
class InputEventManager:
"""Provides utilities for managing input events."""
def __init__(self) -> None:
self._last_input_event: input_event.InputEvent | None = None
self._last_non_modifier_key_event: input_event.KeyboardEvent | None = None
self._device: Atspi.Device | None = None
self._mapped_keycodes: list[int] = []
self._mapped_keysyms: list[int] = []
self._grabbed_bindings: dict[int, keybindings.KeyBinding] = {}
self._paused: bool = False
def start_key_watcher(self) -> None:
"""Starts the watcher for keyboard input events."""
msg = "INPUT EVENT MANAGER: Starting key watcher."
debug.printMessage(debug.LEVEL_INFO, msg, True)
try:
atspi_version = Atspi.get_version()
debug.printMessage(debug.LEVEL_INFO, f"INPUT EVENT MANAGER: AT-SPI version: {atspi_version}", True)
if atspi_version >= (2, 55, 90):
debug.printMessage(debug.LEVEL_INFO, "INPUT EVENT MANAGER: Using Device.new_full", True)
self._device = Atspi.Device.new_full("org.stormux.Cthulhu")
else:
debug.printMessage(debug.LEVEL_INFO, "INPUT EVENT MANAGER: Using Device.new", True)
self._device = Atspi.Device.new()
debug.printMessage(debug.LEVEL_INFO, f"INPUT EVENT MANAGER: Device created: {self._device}", True)
debug.printMessage(debug.LEVEL_INFO, f"INPUT EVENT MANAGER: About to add key watcher callback", True)
result = self._device.add_key_watcher(self.process_keyboard_event)
debug.printMessage(debug.LEVEL_INFO, f"INPUT EVENT MANAGER: add_key_watcher result: {result}", True)
debug.printMessage(debug.LEVEL_INFO, "INPUT EVENT MANAGER: Key watcher added successfully", True)
except Exception as e:
debug.printMessage(debug.LEVEL_INFO, f"INPUT EVENT MANAGER: Error in start_key_watcher: {e}", True)
raise
def stop_key_watcher(self) -> None:
"""Stops the watcher for keyboard input events."""
msg = "INPUT EVENT MANAGER: Stopping key watcher."
debug.printMessage(debug.LEVEL_INFO, msg, True)
self._device = None
def pause_key_watcher(self, pause: bool = True, reason: str = "") -> None:
"""Pauses processing of keyboard input events."""
msg = f"INPUT EVENT MANAGER: Pause processing: {pause}. {reason}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
self._paused = pause
def check_grabbed_bindings(self) -> None:
"""Checks the grabbed key bindings."""
msg = f"INPUT EVENT MANAGER: {len(self._grabbed_bindings)} grabbed key bindings."
debug.printMessage(debug.LEVEL_INFO, msg, True)
for grab_id, binding in self._grabbed_bindings.items():
msg = f"INPUT EVENT MANAGER: {grab_id} for: {binding}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
def add_grabs_for_keybinding(self, binding: keybindings.KeyBinding) -> list[int]:
"""Adds grabs for binding if it is enabled, returns grab IDs."""
if not (binding.is_enabled() and binding.is_bound()):
return []
if binding.has_grabs():
tokens = ["INPUT EVENT MANAGER:", binding, "already has grabs."]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
return []
if self._device is None:
tokens = ["INPUT EVENT MANAGER: No device to add grab for", binding]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
return []
grab_ids = []
for kd in binding.key_definitions():
grab_id = self._device.add_key_grab(kd, None)
grab_ids.append(grab_id)
self._grabbed_bindings[grab_id] = binding
return grab_ids
def remove_grabs_for_keybinding(self, binding: keybindings.KeyBinding) -> None:
"""Removes grabs for binding."""
if self._device is None:
tokens = ["INPUT EVENT MANAGER: No device to remove grab from", binding]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
return
grab_ids = binding.get_grab_ids()
if not grab_ids:
tokens = ["INPUT EVENT MANAGER:", binding, "doesn't have grabs to remove."]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
return
for grab_id in grab_ids:
self._device.remove_key_grab(grab_id)
removed = self._grabbed_bindings.pop(grab_id, None)
if removed is None:
msg = f"INPUT EVENT MANAGER: No key binding for grab id {grab_id}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
def map_keycode_to_modifier(self, keycode: int) -> int:
"""Maps keycode as a modifier, returns the newly-mapped modifier."""
if self._device is None:
msg = f"INPUT EVENT MANAGER: No device to map keycode {keycode} to modifier"
debug.printMessage(debug.LEVEL_INFO, msg, True)
return 0
self._mapped_keycodes.append(keycode)
return self._device.map_modifier(keycode)
def map_keysym_to_modifier(self, keysym: int) -> int:
"""Maps keysym as a modifier, returns the newly-mapped modifier."""
if self._device is None:
msg = f"INPUT EVENT MANAGER: No device to map keysym {keysym} to modifier"
debug.printMessage(debug.LEVEL_INFO, msg, True)
return 0
self._mapped_keysyms.append(keysym)
return self._device.map_keysym_modifier(keysym)
def unmap_all_modifiers(self) -> None:
"""Unmaps all previously mapped modifiers."""
if self._device is None:
msg = "INPUT EVENT MANAGER: No device to unmap all modifiers from"
debug.printMessage(debug.LEVEL_INFO, msg, True)
return
for keycode in self._mapped_keycodes:
self._device.unmap_modifier(keycode)
for keysym in self._mapped_keysyms:
self._device.unmap_keysym_modifier(keysym)
self._mapped_keycodes.clear()
self._mapped_keysyms.clear()
def add_grab_for_modifier(self, modifier: str, keysym: int, keycode: int) -> int:
"""Adds grab for modifier, returns grab id."""
if self._device is None:
msg = f"INPUT EVENT MANAGER: No device to add grab for {modifier}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
return -1
kd = Atspi.KeyDefinition()
kd.keysym = keysym
kd.keycode = keycode
kd.modifiers = 0
grab_id = self._device.add_key_grab(kd)
msg = f"INPUT EVENT MANAGER: Grab id for {modifier}: {grab_id}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
return grab_id
def remove_grab_for_modifier(self, modifier: str, grab_id: int) -> None:
"""Removes grab for modifier."""
if self._device is None:
msg = f"INPUT EVENT MANAGER: No device to remove grab from {modifier}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
return
self._device.remove_key_grab(grab_id)
msg = f"INPUT EVENT MANAGER: Grab id removed for {modifier}: {grab_id}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
def grab_keyboard(self, reason: str = "") -> None:
"""Grabs the keyboard, e.g. when entering learn mode."""
msg = "INPUT EVENT MANAGER: Grabbing keyboard"
if reason:
msg += f" Reason: {reason}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
Atspi.Device.grab_keyboard(self._device)
def ungrab_keyboard(self, reason: str = "") -> None:
"""Removes keyboard grab, e.g. when exiting learn mode."""
msg = "INPUT EVENT MANAGER: Ungrabbing keyboard"
if reason:
msg += f" Reason: {reason}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
Atspi.Device.ungrab_keyboard(self._device)
def process_braille_event(self, event: Atspi.Event) -> bool:
"""Processes this Braille event."""
braille_event = input_event.BrailleEvent(event)
result = braille_event.process()
self._last_input_event = braille_event
self._last_non_modifier_key_event = None
return result
def process_mouse_button_event(self, event: Atspi.Event) -> None:
"""Processes this Mouse event."""
mouse_event = input_event.MouseButtonEvent(event)
mouse_event.setClickCount(self._determine_mouse_event_click_count(mouse_event))
self._last_input_event = mouse_event
def process_keyboard_event(self, _device, pressed, keycode, keysym, modifiers, text):
"""Processes this Atspi keyboard event."""
debug.printMessage(debug.LEVEL_INFO, f"INPUT EVENT MANAGER: Received keyboard event: pressed={pressed}, keycode={keycode}, keysym={keysym}, text='{text}'", True)
if self._paused:
msg = "INPUT EVENT MANAGER: Keyboard event processing is paused."
debug.printMessage(debug.LEVEL_INFO, msg, True)
return False
# Handle Cthulhu-specific logic before creating event
if not pressed and text == "Num_Lock" and "KP_Insert" in settings.cthulhuModifierKeys \
and cthulhu_state.activeScript is not None:
cthulhu_state.activeScript.refreshKeyGrabs()
if pressed:
cthulhu_state.openingDialog = (text == "space" \
and (modifiers & ~(1 << Atspi.ModifierType.NUMLOCK)))
event = input_event.KeyboardEvent(pressed, keycode, keysym, modifiers, text or "")
if event in [self._last_input_event, self._last_non_modifier_key_event]:
msg = "INPUT EVENT MANAGER: Received duplicate event."
debug.printMessage(debug.LEVEL_INFO, msg, True)
return False
if pressed:
# Get active window and focus
window = cthulhu_state.activeWindow
# Use cthulhu_state.activeScript instead of ScriptManager API
active_script = cthulhu_state.activeScript
if active_script and hasattr(active_script, 'utilities') and hasattr(active_script.utilities, 'canBeActiveWindow'):
if not active_script.utilities.canBeActiveWindow(window):
new_window = active_script.utilities.activeWindow()
if new_window is not None:
window = new_window
tokens = ["INPUT EVENT MANAGER: Updating window and active window to", window]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
cthulhu_state.activeWindow = window
else:
tokens = ["WARNING:", window, "cannot be active window. No alternative found."]
debug.printTokens(debug.LEVEL_WARNING, tokens, True)
event.setWindow(window)
event.setObject(cthulhu_state.locusOfFocus)
event.setScript(active_script)
elif self.last_event_was_keyboard():
assert isinstance(self._last_input_event, input_event.KeyboardEvent)
event.setWindow(self._last_input_event.getWindow())
event.setObject(self._last_input_event.getObject())
event.setScript(self._last_input_event.getScript())
else:
event.setWindow(cthulhu_state.activeWindow)
event.setObject(cthulhu_state.locusOfFocus)
event.setScript(cthulhu_state.activeScript)
# Finalize initialization now that context is set
event._finalize_initialization()
if not event.is_duplicate:
debug.printMessage(debug.LEVEL_INFO, f"\n{event}")
event.setClickCount(self._determine_keyboard_event_click_count(event))
rv = event.process()
# Do any needed xmodmap handling
from . import cthulhu
cthulhu.updateKeyMap(event)
if event.isModifierKey():
if self.is_release_for(event, self._last_input_event):
msg = "INPUT EVENT MANAGER: Clearing last non modifier key event"
debug.printMessage(debug.LEVEL_INFO, msg, True)
self._last_non_modifier_key_event = None
else:
self._last_non_modifier_key_event = event
self._last_input_event = event
return rv
def _determine_keyboard_event_click_count(self, event: input_event.KeyboardEvent) -> int:
"""Determines the click count of event."""
if not self.last_event_was_keyboard():
return 1
if event.isModifierKey():
last_event = self._last_input_event
else:
last_event = self._last_non_modifier_key_event or self._last_input_event
assert isinstance(last_event, input_event.KeyboardEvent)
if (event.time - last_event.time > settings.doubleClickTimeout) or \
(event.keyval_name != last_event.keyval_name) or \
(event.getObject() != last_event.getObject()):
return 1
last_count = last_event.getClickCount()
if not event.isPressedKey():
return last_count
if last_event.isPressedKey():
return last_count
if (event.isModifierKey() and last_count == 2) or last_count == 3:
return 1
return last_count + 1
def _determine_mouse_event_click_count(self, event: input_event.MouseButtonEvent) -> int:
"""Determines the click count of event."""
if not self.last_event_was_mouse_button():
return 1
assert isinstance(self._last_input_event, input_event.MouseButtonEvent)
if not event.pressed:
return self._last_input_event.getClickCount()
if self._last_input_event.button != event.button:
return 1
if event.time - self._last_input_event.time > settings.doubleClickTimeout:
return 1
return self._last_input_event.getClickCount() + 1
def last_event_was_keyboard(self) -> bool:
"""Returns True if the last event is a keyboard event."""
return isinstance(self._last_input_event, input_event.KeyboardEvent)
def last_event_was_mouse_button(self) -> bool:
"""Returns True if the last event is a mouse button event."""
return isinstance(self._last_input_event, input_event.MouseButtonEvent)
def is_release_for(self, event1, event2):
"""Returns True if event1 is a release for event2."""
if event1 is None or event2 is None:
return False
if not isinstance(event1, input_event.KeyboardEvent) \
or not isinstance(event2, input_event.KeyboardEvent):
return False
if event1.isPressedKey() or not event2.isPressedKey():
return False
result = event1.id == event2.id \
and event1.hw_code == event2.hw_code \
and event1.keyval_name == event2.keyval_name
if result and not event1.isModifierKey():
result = event1.modifiers == event2.modifiers
msg = (
f"INPUT EVENT MANAGER: {event1.asSingleLineString()} "
f"is release for {event2.asSingleLineString()}: {result}"
)
debug.printMessage(debug.LEVEL_INFO, msg, True)
return result
def last_event_equals_or_is_release_for_event(self, event):
"""Returns True if the last non-modifier event equals, or is the release for, event."""
if self._last_non_modifier_key_event is None:
return False
if event == self._last_non_modifier_key_event:
return True
return self.is_release_for(self._last_non_modifier_key_event, event)
_manager = InputEventManager()
def getManager():
"""Returns the Input Event Manager singleton."""
return _manager
+3 -6
View File
@@ -723,12 +723,9 @@ class Script(script.Script):
self.speechAndVerbosityManager.update_punctuation_level()
self.speechAndVerbosityManager.update_capitalization_style()
# Gtk 4 requrns "GTK", while older versions return "gtk"
# TODO: move this to a toolkit-specific script
if self.app is not None and self.app.toolkitName == "GTK" and self.app.toolkitVersion > "4":
cthulhu.setKeyHandling(True)
else:
cthulhu.setKeyHandling(False)
# Use new InputEventManager for global keyboard capture by default
# Only fall back to legacy handling for problematic applications
cthulhu.setKeyHandling(True)
self.addKeyGrabs()