Preparing for release.

This commit is contained in:
Storm Dragon
2026-01-26 13:07:33 -05:00
12 changed files with 530 additions and 369 deletions
+1
View File
@@ -102,3 +102,4 @@ po/insert-header.sed
!/help/C/*.xml
/help/*/*.mo
/help/*/*.stamp
.aider*
+1 -1
View File
@@ -1,7 +1,7 @@
# Maintainer: Storm Dragon <storm_dragon@stormux.org>
pkgname=cthulhu
pkgver=2026.01.12
pkgver=2026.01.26
pkgrel=1
pkgdesc="Desktop-agnostic screen reader with plugin system, forked from Orca"
url="https://git.stormux.org/storm/cthulhu"
+1 -1
View File
@@ -1,5 +1,5 @@
project('cthulhu',
version: '2026.01.12-master',
version: '2026.01.26-master',
meson_version: '>= 1.0.0',
)
+53 -23
View File
@@ -36,13 +36,26 @@ __copyright__ = "Copyright (c) 2004-2009 Sun Microsystems Inc." \
__license__ = "LGPL"
import faulthandler
from typing import TYPE_CHECKING, Any, Callable, Optional
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union
from . import dbus_service
if TYPE_CHECKING:
from types import FrameType
from gi.repository.Gio import Settings as GSettings
from gi.repository import Gtk
from .settings_manager import SettingsManager
from .script_manager import ScriptManager
from .plugin_system_manager import PluginSystemManager
from .input_event import InputEvent
from .input_event_manager import InputEventManager
from .event_manager import EventManager
from .signal_manager import SignalManager
from .dynamic_api_manager import DynamicApiManager
from .speech import Speech
from .braille import Braille
from .script import Script
class APIHelper:
"""Helper class for plugin API interactions, including keybindings."""
@@ -54,7 +67,7 @@ class APIHelper:
- app: the Cthulhu application
"""
self.app: Cthulhu = app
self._gestureBindings: dict[Optional[str], list[Any]] = {}
self._gestureBindings: Dict[Optional[str], List[Any]] = {}
def registerGestureByString(
self,
@@ -914,7 +927,7 @@ def main() -> int:
class Cthulhu(GObject.Object):
# basic signals
__gsignals__: dict[str, tuple[Any, ...]] = {
__gsignals__: Dict[str, Tuple[Any, ...]] = {
"start-application-completed": (GObject.SignalFlags.RUN_LAST, None, ()),
"stop-application-completed": (GObject.SignalFlags.RUN_LAST, None, ()),
"load-setting-begin": (GObject.SignalFlags.RUN_LAST, None, ()),
@@ -927,52 +940,69 @@ class Cthulhu(GObject.Object):
def __init__(self) -> None:
GObject.Object.__init__(self)
# add members
self.resourceManager: Any = resource_manager.ResourceManager(self)
self.settingsManager: Any = settings_manager.SettingsManager(self) # Directly instantiate
self.eventManager: Any = event_manager.EventManager(self) # Directly instantiate
self.scriptManager: Any = script_manager.ScriptManager(self) # Directly instantiate
self.logger: Any = logger.Logger() # Directly instantiate
self.signalManager: Any = signal_manager.SignalManager(self)
self.dynamicApiManager: Any = dynamic_api_manager.DynamicApiManager(self)
self.translationManager: Any = translation_manager.TranslationManager(self)
self.resourceManager: resource_manager.ResourceManager = resource_manager.ResourceManager(self)
self.settingsManager: SettingsManager = settings_manager.SettingsManager(self) # Directly instantiate
self.eventManager: EventManager = event_manager.EventManager(self) # Directly instantiate
self.scriptManager: ScriptManager = script_manager.ScriptManager(self) # Directly instantiate
self.logger: logger.Logger = logger.Logger() # Directly instantiate
self.signalManager: SignalManager = signal_manager.SignalManager(self)
self.dynamicApiManager: DynamicApiManager = dynamic_api_manager.DynamicApiManager(self)
self.translationManager: TranslationManager = translation_manager.TranslationManager(self)
self.debugManager: Any = debug
self.APIHelper: APIHelper = APIHelper(self)
self.createCompatAPI()
self.pluginSystemManager: Any = plugin_system_manager.PluginSystemManager(self)
self.pluginSystemManager: PluginSystemManager = plugin_system_manager.PluginSystemManager(self)
# Scan for available plugins at startup
self.pluginSystemManager.rescanPlugins()
def getAPIHelper(self) -> APIHelper:
return self.APIHelper
def getPluginSystemManager(self) -> Any:
def getPluginSystemManager(self) -> PluginSystemManager:
return self.pluginSystemManager
def getDynamicApiManager(self) -> Any:
def getDynamicApiManager(self) -> DynamicApiManager:
return self.dynamicApiManager
def getSignalManager(self) -> Any:
def getSignalManager(self) -> SignalManager:
return self.signalManager
def getEventManager(self) -> Any:
def getEventManager(self) -> EventManager:
return self.eventManager
def getSettingsManager(self) -> Any:
def getSettingsManager(self) -> SettingsManager:
return self.settingsManager
def getScriptManager(self) -> Any:
def getScriptManager(self) -> ScriptManager:
return self.scriptManager
def get_scriptManager(self) -> Any:
def get_scriptManager(self) -> ScriptManager:
return self.scriptManager
def getDebugManager(self) -> Any:
return self.debugManager
def getTranslationManager(self) -> Any:
def getTranslationManager(self) -> TranslationManager:
return self.translationManager
def getResourceManager(self) -> Any:
def getResourceManager(self) -> resource_manager.ResourceManager:
return self.resourceManager
def getLogger(self) -> Any: # New getter for the logger
def getLogger(self) -> logger.Logger: # New getter for the logger
return self.logger
def addKeyGrab(self, binding: Any) -> list[int]:
def addKeyGrab(self, binding: Any) -> List[int]:
return addKeyGrab(binding)
def removeKeyGrab(self, grab_id: int) -> None:
return removeKeyGrab(grab_id)
def run(self, cacheValues: bool = True) -> int:
return main()
def stop(self) -> None:
pass
def createCompatAPI(self) -> None:
# for now add compatibility layer using Dynamic API
# should be removed step by step
+1 -1
View File
@@ -23,5 +23,5 @@
# Forked from Orca screen reader.
# Cthulhu project: https://git.stormux.org/storm/cthulhu
version = "2026.01.12"
version = "2026.01.26"
codeName = "master"
+52 -41
View File
@@ -23,6 +23,8 @@
# Forked from Orca screen reader.
# Cthulhu project: https://git.stormux.org/storm/cthulhu
from __future__ import annotations
__id__ = "$Id$"
__version__ = "$Revision$"
__date__ = "$Date$"
@@ -36,7 +38,7 @@ from gi.repository import GLib
import queue
import threading
import time
from typing import Optional, Dict, List, Tuple, Any
from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any, Union
from . import cthulhu
from . import debug
@@ -48,21 +50,26 @@ from . import settings
from .ax_object import AXObject
from .ax_utilities import AXUtilities
if TYPE_CHECKING:
from .cthulhu import Cthulhu
from .script import Script
from .input_event_manager import InputEventManager
class EventManager:
EMBEDDED_OBJECT_CHARACTER: str = '\ufffc'
def __init__(self, app: Any, asyncMode: bool = True) -> None: # app is CthulhuApp instance
def __init__(self, app: Cthulhu, asyncMode: bool = True) -> None:
debug.printMessage(debug.LEVEL_INFO, 'EVENT MANAGER: Initializing', True)
debug.printMessage(debug.LEVEL_INFO, f'EVENT MANAGER: Async Mode is {asyncMode}', True)
self.app: Any = app
self.app: Cthulhu = app
self._asyncMode: bool = asyncMode
self._scriptListenerCounts: Dict[str, int] = {}
self._active: bool = False
self._enqueueCount: int = 0
self._dequeueCount: int = 0
self._cmdlineCache: Dict[int, str] = {}
self._eventQueue: queue.Queue = queue.Queue(0)
self._eventQueue: queue.Queue[Any] = queue.Queue(0)
self._gidleId: int = 0
self._gidleLock: threading.Lock = threading.Lock()
self._gilSleepTime: float = 0.00001
@@ -81,15 +88,15 @@ class EventManager:
'object:state-changed:sensitive',
'object:state-changed:showing',
'object:text-changed:delete']
self._eventsTriggeringSuspension: List[Any] = [] # List of events
self._eventsTriggeringSuspension: List[Atspi.Event] = []
self._ignoredEvents: List[str] = ['object:bounds-changed',
'object:state-changed:defunct',
'object:property-change:accessible-parent']
self._parentsOfDefunctDescendants: List[Any] = [] # List[Atspi.Accessible]
self._parentsOfDefunctDescendants: List[Atspi.Accessible] = []
cthulhu_state.device = None
self._keyHandlingActive: bool = False
self._inputEventManager: Optional[Any] = None # Optional[InputEventManager]
self._inputEventManager: Optional[InputEventManager] = None
debug.printMessage(debug.LEVEL_INFO, 'Event manager initialized', True)
@@ -174,7 +181,7 @@ class EventManager:
if eventType in self._ignoredEvents:
self._ignoredEvents.remove(eventType)
def _isDuplicateEvent(self, event: Any) -> bool: # event: Atspi.Event
def _isDuplicateEvent(self, event: Atspi.Event) -> bool:
"""Returns True if this event is already in the event queue."""
if self._inFlood() and self._prioritizeDuringFlood(event):
@@ -193,7 +200,7 @@ class EventManager:
return False
def _getAppCmdline(self, app: Any) -> str: # app: Atspi.Accessible
def _getAppCmdline(self, app: Atspi.Accessible) -> str:
pid = AXObject.get_process_id(app)
if pid == -1:
return ""
@@ -204,7 +211,7 @@ class EventManager:
self._cmdlineCache[pid] = cmdline
return cmdline
def _isSteamApp(self, app: Any) -> bool: # app: Atspi.Accessible
def _isSteamApp(self, app: Atspi.Accessible) -> bool:
name = AXObject.get_name(app)
if not name:
nameLower = ""
@@ -217,7 +224,7 @@ class EventManager:
cmdline = self._getAppCmdline(app)
return "steamwebhelper" in cmdline
def _isSteamNotificationEvent(self, event: Any) -> bool: # event: Atspi.Event
def _isSteamNotificationEvent(self, event: Atspi.Event) -> bool:
for obj in (event.any_data, event.source):
if not isinstance(obj, Atspi.Accessible):
continue
@@ -230,7 +237,7 @@ class EventManager:
return False
def _ignore(self, event: Any) -> bool: # event: Atspi.Event
def _ignore(self, event: Atspi.Event) -> bool:
"""Returns True if this event should be ignored."""
app = AXObject.get_application(event.source)
@@ -238,19 +245,19 @@ class EventManager:
tokens = ["EVENT MANAGER:", event.type, "from", app]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
def _log_ignore(reason, message):
def _log_ignore(reason: str, message: str) -> None:
debug.print_log(debug.LEVEL_INFO, "EVENT MANAGER",
f"Ignoring - {message}", reason, True)
def _log_allow(reason, message):
def _log_allow(reason: str, message: str) -> None:
debug.print_log(debug.LEVEL_INFO, "EVENT MANAGER",
f"Not ignoring - {message}", reason, True)
def _ignore_with_reason(reason, message):
def _ignore_with_reason(reason: str, message: str) -> bool:
_log_ignore(reason, message)
return True
def _allow_with_reason(reason, message):
def _allow_with_reason(reason: str, message: str) -> bool:
_log_allow(reason, message)
return False
@@ -430,7 +437,7 @@ class EventManager:
return _allow_with_reason("no-cause", "no ignore condition met")
def _addToQueue(self, event, asyncMode):
def _addToQueue(self, event: Any, asyncMode: bool) -> None:
debugging = debug.debugEventQueue
if debugging:
debug.printMessage(debug.LEVEL_ALL, " acquiring lock...")
@@ -457,7 +464,7 @@ class EventManager:
if debug.debugEventQueue:
debug.printMessage(debug.LEVEL_ALL, " ...released")
def _queuePrintln(self, e, isEnqueue=True, isPrune=None):
def _queuePrintln(self, e: Any, isEnqueue: bool = True, isPrune: Optional[bool] = None) -> None:
"""Convenience method to output queue-related debugging info."""
if debug.LEVEL_INFO < debug.debugLevel:
@@ -483,7 +490,7 @@ class EventManager:
tokens[0:0] = ["EVENT MANAGER: Dequeued"]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
def _suspendEvents(self, triggeringEvent):
def _suspendEvents(self, triggeringEvent: Atspi.Event) -> None:
self._eventsTriggeringSuspension.append(triggeringEvent)
if self._eventsSuspended:
@@ -499,7 +506,7 @@ class EventManager:
self._eventsSuspended = True
def _unsuspendEvents(self, triggeringEvent, force=False):
def _unsuspendEvents(self, triggeringEvent: Atspi.Event, force: bool = False) -> None:
if triggeringEvent in self._eventsTriggeringSuspension:
self._eventsTriggeringSuspension.remove(triggeringEvent)
@@ -521,7 +528,7 @@ class EventManager:
self._eventsSuspended = False
def _shouldSuspendEventsFor(self, event):
def _shouldSuspendEventsFor(self, event: Atspi.Event) -> bool:
if AXUtilities.is_frame(event.source) \
or (AXUtilities.is_window(event.source) \
and AXUtilities.get_application_toolkit_name(event.source) == "clutter"):
@@ -541,7 +548,7 @@ class EventManager:
return False
def _shouldUnsuspendEventsFor(self, event):
def _shouldUnsuspendEventsFor(self, event: Atspi.Event) -> bool:
if event.type.startswith("object:state-changed:focused") and event.detail1:
msg = "EVENT MANAGER: Should unsuspend events for newly-focused object."
debug.printMessage(debug.LEVEL_INFO, msg, True)
@@ -559,10 +566,10 @@ class EventManager:
return False
def _didSuspendEventsFor(self, event):
def _didSuspendEventsFor(self, event: Atspi.Event) -> bool:
return event in self._eventsTriggeringSuspension
def _enqueue(self, e):
def _enqueue(self, e: Atspi.Event) -> None:
"""Handles the enqueueing of all events destined for scripts.
Arguments:
@@ -621,7 +628,7 @@ class EventManager:
if debug.debugEventQueue:
self._enqueueCount -= 1
def _isNoFocus(self):
def _isNoFocus(self) -> bool:
if cthulhu_state.locusOfFocus or cthulhu_state.activeWindow or cthulhu_state.activeScript:
return False
@@ -629,7 +636,7 @@ class EventManager:
debug.printMessage(debug.LEVEL_SEVERE, msg, True)
return True
def _onNoFocus(self):
def _onNoFocus(self) -> bool:
if not self._isNoFocus():
return False
@@ -638,7 +645,7 @@ class EventManager:
defaultScript.idleMessage()
return False
def _dequeue(self):
def _dequeue(self) -> bool:
"""Handles all events destined for scripts. Called by the GTK
idle thread."""
@@ -736,7 +743,7 @@ class EventManager:
self._listener.deregister(eventType)
del self._scriptListenerCounts[eventType]
def registerScriptListeners(self, script: Any) -> None: # script: Script
def registerScriptListeners(self, script: Script) -> None:
"""Tells the event manager to start listening for all the event types
of interest to the script.
@@ -750,7 +757,7 @@ class EventManager:
for eventType in script.listeners.keys():
self.registerListener(eventType)
def deregisterScriptListeners(self, script: Any) -> None: # script: Script
def deregisterScriptListeners(self, script: Script) -> None:
"""Tells the event manager to stop listening for all the event types
of interest to the script.
@@ -764,7 +771,7 @@ class EventManager:
for eventType in script.listeners.keys():
self.deregisterListener(eventType)
def _processInputEvent(self, event):
def _processInputEvent(self, event: Any) -> None:
"""Processes the given input event based on the keybinding from the
currently-active script.
@@ -798,7 +805,7 @@ class EventManager:
debug.printMessage(debug.eventDebugLevel, msg, False)
@staticmethod
def _get_scriptForEvent(event: Any) -> Optional[Any]: # Returns Optional[Script]
def _get_scriptForEvent(event: Any) -> Optional[Script]:
"""Returns the script associated with event."""
if event.type.startswith("mouse:"):
@@ -836,7 +843,7 @@ class EventManager:
debug.printTokens(debug.LEVEL_INFO, tokens, True)
return script
def _isActivatableEvent(self, event: Any, script: Optional[Any] = None) -> Tuple[bool, str]:
def _isActivatableEvent(self, event: Atspi.Event, script: Optional[Script] = None) -> Tuple[bool, str]:
"""Determines if the event is one which should cause us to
change which script is currently active.
@@ -902,7 +909,7 @@ class EventManager:
return False, "No reason found to activate a different script."
def _eventSourceIsDead(self, event: Any) -> bool: # event: Atspi.Event
def _eventSourceIsDead(self, event: Atspi.Event) -> bool:
if AXObject.is_dead(event.source):
tokens = ["EVENT MANAGER: source of", event.type, "is dead"]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
@@ -910,7 +917,7 @@ class EventManager:
return False
def _ignoreDuringDeluge(self, event: Any) -> bool: # event: Atspi.Event
def _ignoreDuringDeluge(self, event: Atspi.Event) -> bool:
"""Returns true if this event should be ignored during a deluge."""
if self._eventSourceIsDead(event):
@@ -946,7 +953,7 @@ class EventManager:
return False
def _processDuringFlood(self, event: Any) -> bool: # event: Atspi.Event
def _processDuringFlood(self, event: Atspi.Event) -> bool:
"""Returns true if this event should be processed during a flood."""
if self._eventSourceIsDead(event):
@@ -973,7 +980,7 @@ class EventManager:
return event.source == cthulhu_state.locusOfFocus
def _prioritizeDuringFlood(self, event: Any) -> bool: # event: Atspi.Event
def _prioritizeDuringFlood(self, event: Atspi.Event) -> bool:
"""Returns true if this event should be prioritized during a flood."""
if event.type.startswith("object:state-changed:focused"):
@@ -1007,7 +1014,7 @@ class EventManager:
oldSize = self._eventQueue.qsize()
newQueue = queue.Queue(0)
newQueue: queue.Queue[Any] = queue.Queue(0)
while not self._eventQueue.empty():
try:
event = self._eventQueue.get()
@@ -1034,7 +1041,7 @@ class EventManager:
return False
def _processObjectEvent(self, event):
def _processObjectEvent(self, event: Atspi.Event) -> None:
"""Handles all object events destined for scripts.
Arguments:
@@ -1133,7 +1140,7 @@ class EventManager:
msg = f"EVENT MANAGER: {key}: {value}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
def processBrailleEvent(self, brailleEvent: Any) -> bool: # brailleEvent: BrailleEvent
def processBrailleEvent(self, brailleEvent: input_event.BrailleEvent) -> bool:
"""Called whenever a cursor key is pressed on the Braille display.
Arguments:
@@ -1151,8 +1158,12 @@ class EventManager:
_manager: Optional[EventManager] = None
def getManager() -> EventManager:
def getManager() -> Optional[EventManager]:
global _manager
if _manager is None:
_manager = cthulhu.cthulhuApp.eventManager
try:
if cthulhu.cthulhuApp:
_manager = cthulhu.cthulhuApp.eventManager
except AttributeError:
pass
return _manager
+25 -19
View File
@@ -40,7 +40,7 @@ __copyright__ = "Copyright (c) 2005-2008 Sun Microsystems Inc." \
"Copyright (c) 2016-2023 Igalia, S.L."
__license__ = "LGPL"
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Optional, Tuple, Any
import gi
gi.require_version("Atspi", "2.0")
@@ -54,7 +54,13 @@ from . import script_manager
from .ax_object import AXObject
from .ax_table import AXTable
from .ax_text import AXText
def _get_ax_utilities():
if TYPE_CHECKING:
from .cthulhu import Cthulhu
from .input_event import InputEvent
from .scripts import default
def _get_ax_utilities() -> Any:
# Avoid circular import with ax_utilities -> ax_utilities_event -> focus_manager.
from .ax_utilities import AXUtilities
return AXUtilities
@@ -65,10 +71,6 @@ def _log(message: str, reason: Optional[str] = None, timestamp: bool = True, sta
def _log_tokens(tokens: list, reason: Optional[str] = None, timestamp: bool = True, stack: bool = False) -> None:
debug.print_log_tokens(debug.LEVEL_INFO, "FOCUS MANAGER", tokens, reason, timestamp, stack)
if TYPE_CHECKING:
from .input_event import InputEvent
from .scripts import default
CARET_TRACKING = "caret-tracking"
FOCUS_TRACKING = "focus-tracking"
FLAT_REVIEW = "flat-review"
@@ -80,15 +82,15 @@ SAY_ALL = "say-all"
class FocusManager:
"""Manages the focused object, window, etc."""
def __init__(self, app) -> None: # Added app argument
self.app = app # Store app instance
def __init__(self, app: Cthulhu) -> None:
self.app: Cthulhu = app # Store app instance
self._window: Optional[Atspi.Accessible] = cthulhu_state.activeWindow
self._focus: Optional[Atspi.Accessible] = cthulhu_state.locusOfFocus
self._object_of_interest: Optional[Atspi.Accessible] = cthulhu_state.objOfInterest
self._active_mode: Optional[str] = cthulhu_state.activeMode
self._last_cell_coordinates: tuple[int, int] = (-1, -1)
self._last_cursor_position: tuple[Optional[Atspi.Accessible], int] = (None, -1)
self._penultimate_cursor_position: tuple[Optional[Atspi.Accessible], int] = (None, -1)
self._last_cell_coordinates: Tuple[int, int] = (-1, -1)
self._last_cursor_position: Tuple[Optional[Atspi.Accessible], int] = (None, -1)
self._penultimate_cursor_position: Tuple[Optional[Atspi.Accessible], int] = (None, -1)
_log("Registering D-Bus commands.")
controller = dbus_service.get_remote_controller()
@@ -188,20 +190,20 @@ class FocusManager:
def get_active_mode_and_object_of_interest(
self
) -> tuple[Optional[str], Optional[Atspi.Accessible]]:
) -> Tuple[Optional[str], Optional[Atspi.Accessible]]:
"""Returns the current mode and associated object of interest"""
_log_tokens(["Active mode:", self._active_mode, "Object of interest:", self._object_of_interest])
return self._active_mode, self._object_of_interest
def get_penultimate_cursor_position(self) -> tuple[Optional[Atspi.Accessible], int]:
def get_penultimate_cursor_position(self) -> Tuple[Optional[Atspi.Accessible], int]:
"""Returns the penultimate cursor position as a tuple of (object, offset)."""
obj, offset = self._penultimate_cursor_position
_log_tokens(["Penultimate cursor position:", obj, offset])
return obj, offset
def get_last_cursor_position(self) -> tuple[Optional[Atspi.Accessible], int]:
def get_last_cursor_position(self) -> Tuple[Optional[Atspi.Accessible], int]:
"""Returns the last cursor position as a tuple of (object, offset)."""
obj, offset = self._last_cursor_position
@@ -215,7 +217,7 @@ class FocusManager:
self._penultimate_cursor_position = self._last_cursor_position
self._last_cursor_position = obj, offset
def get_last_cell_coordinates(self) -> tuple[int, int]:
def get_last_cell_coordinates(self) -> Tuple[int, int]:
"""Returns the last known cell coordinates as a tuple of (row, column)."""
row, column = self._last_cell_coordinates
@@ -437,12 +439,16 @@ class FocusManager:
return script.browse_mode_is_sticky()
return False
_manager = None
def get_manager():
_manager: Optional[FocusManager] = None
def get_manager() -> Optional[FocusManager]:
"""Returns the Focus Manager"""
global _manager
if _manager is None:
from . import cthulhu
_manager = FocusManager(cthulhu.cthulhuApp)
try:
from . import cthulhu
if cthulhu.cthulhuApp:
_manager = FocusManager(cthulhu.cthulhuApp)
except (ImportError, AttributeError):
pass
return _manager
+76 -69
View File
@@ -39,7 +39,7 @@ __copyright__ = "Copyright (c) 2024 Igalia, S.L." \
"Copyright (c) 2024 GNOME Foundation Inc."
__license__ = "LGPL"
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Optional, Union, Tuple, List, Dict
import gi
gi.require_version("Atspi", "2.0")
@@ -64,9 +64,9 @@ class InputEventManager:
self._last_input_event: Optional[input_event.InputEvent] = None
self._last_non_modifier_key_event: Optional[input_event.KeyboardEvent] = None
self._device: Optional[Atspi.Device] = None
self._mapped_keycodes: list[int] = []
self._mapped_keysyms: list[int] = []
self._grabbed_bindings: dict[int, keybindings.KeyBinding] = {}
self._mapped_keycodes: List[int] = []
self._mapped_keysyms: List[int] = []
self._grabbed_bindings: Dict[int, keybindings.KeyBinding] = {}
self._paused: bool = False
def start_key_watcher(self) -> None:
@@ -100,14 +100,14 @@ class InputEventManager:
msg = f"INPUT EVENT MANAGER: {grab_id} for: {binding}"
debug.print_message(debug.LEVEL_INFO, msg, True)
def _get_key_definitions(self, binding: keybindings.KeyBinding) -> list[Atspi.KeyDefinition]:
def _get_key_definitions(self, binding: keybindings.KeyBinding) -> List[Atspi.KeyDefinition]:
if hasattr(binding, "key_definitions"):
return list(binding.key_definitions())
if hasattr(binding, "keyDefs"):
return list(binding.keyDefs())
return []
def add_grabs_for_keybinding(self, binding: keybindings.KeyBinding) -> list[int]:
def add_grabs_for_keybinding(self, binding: keybindings.KeyBinding) -> List[int]:
"""Adds grabs for binding if it is enabled, returns grab IDs."""
if self._device is None:
@@ -427,7 +427,7 @@ class InputEventManager:
return isinstance(self._last_input_event, input_event.MouseButtonEvent)
def is_release_for(self, event1, event2):
def is_release_for(self, event1: Optional[input_event.InputEvent], event2: Optional[input_event.InputEvent]) -> bool:
"""Returns True if event1 is a release for event2."""
if event1 is None or event2 is None:
@@ -454,7 +454,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True)
return result
def last_event_equals_or_is_release_for_event(self, event):
def last_event_equals_or_is_release_for_event(self, event: Optional[input_event.InputEvent]) -> bool:
"""Returns True if the last event equals the provided event, or is the release for it."""
if self._last_input_event is event:
@@ -471,7 +471,7 @@ class InputEventManager:
return self.is_release_for(self._last_non_modifier_key_event, event)
def _last_key_and_modifiers(self):
def _last_key_and_modifiers(self) -> Tuple[str, int]:
"""Returns the last keyval name and modifiers"""
if self._last_non_modifier_key_event is None:
@@ -480,9 +480,12 @@ class InputEventManager:
if not self.last_event_was_keyboard():
return "", 0
if self._last_input_event is None:
return self._last_non_modifier_key_event.keyval_name, 0
return self._last_non_modifier_key_event.keyval_name, self._last_input_event.modifiers
def last_event_was_command(self):
def last_event_was_command(self) -> bool:
"""Returns True if the last event is believed to be a command."""
if bool(self._last_key_and_modifiers()[1] & 1 << Atspi.ModifierType.CONTROL):
@@ -492,7 +495,7 @@ class InputEventManager:
return False
def last_event_was_shortcut_for(self, obj):
def last_event_was_shortcut_for(self, obj: Atspi.Accessible) -> bool:
"""Returns True if the last event is believed to be a shortcut key for obj."""
string = self._last_key_and_modifiers()[0]
@@ -511,12 +514,13 @@ class InputEventManager:
debug.print_tokens(debug.LEVEL_INFO, tokens, True)
return rv
def last_event_was_printable_key(self):
def last_event_was_printable_key(self) -> bool:
"""Returns True if the last event is believed to be a printable key."""
if not self.last_event_was_keyboard():
return False
assert isinstance(self._last_input_event, input_event.KeyboardEvent)
if self._last_input_event.is_printable_key():
msg = "INPUT EVENT MANAGER: Last event was printable key"
debug.print_message(debug.LEVEL_INFO, msg, True)
@@ -524,7 +528,7 @@ class InputEventManager:
return False
def last_event_was_caret_navigation(self):
def last_event_was_caret_navigation(self) -> bool:
"""Returns True if the last event is believed to be caret navigation."""
return self.last_event_was_character_navigation() \
@@ -534,7 +538,7 @@ class InputEventManager:
or self.last_event_was_file_boundary_navigation() \
or self.last_event_was_page_navigation()
def last_event_was_caret_selection(self):
def last_event_was_caret_selection(self) -> bool:
"""Returns True if the last event is believed to be caret selection."""
string, mods = self._last_key_and_modifiers()
@@ -548,7 +552,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True)
return rv
def last_event_was_backward_caret_navigation(self):
def last_event_was_backward_caret_navigation(self) -> bool:
"""Returns True if the last event is believed to be backward caret navigation."""
string, mods = self._last_key_and_modifiers()
@@ -562,7 +566,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True)
return rv
def last_event_was_forward_caret_navigation(self):
def last_event_was_forward_caret_navigation(self) -> bool:
"""Returns True if the last event is believed to be forward caret navigation."""
string, mods = self._last_key_and_modifiers()
@@ -576,7 +580,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True)
return rv
def last_event_was_forward_caret_selection(self):
def last_event_was_forward_caret_selection(self) -> bool:
"""Returns True if the last event is believed to be forward caret selection."""
string, mods = self._last_key_and_modifiers()
@@ -590,7 +594,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True)
return rv
def last_event_was_character_navigation(self):
def last_event_was_character_navigation(self) -> bool:
"""Returns True if the last event is believed to be character navigation."""
string, mods = self._last_key_and_modifiers()
@@ -606,7 +610,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True)
return rv
def last_event_was_word_navigation(self):
def last_event_was_word_navigation(self) -> bool:
"""Returns True if the last event is believed to be word navigation."""
string, mods = self._last_key_and_modifiers()
@@ -620,7 +624,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True)
return rv
def last_event_was_previous_word_navigation(self):
def last_event_was_previous_word_navigation(self) -> bool:
"""Returns True if the last event is believed to be previous-word navigation."""
string, mods = self._last_key_and_modifiers()
@@ -634,7 +638,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True)
return rv
def last_event_was_next_word_navigation(self):
def last_event_was_next_word_navigation(self) -> bool:
"""Returns True if the last event is believed to be next-word navigation."""
string, mods = self._last_key_and_modifiers()
@@ -648,7 +652,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True)
return rv
def last_event_was_line_navigation(self):
def last_event_was_line_navigation(self) -> bool:
"""Returns True if the last event is believed to be line navigation."""
string, mods = self._last_key_and_modifiers()
@@ -668,7 +672,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True)
return rv
def last_event_was_paragraph_navigation(self):
def last_event_was_paragraph_navigation(self) -> bool:
"""Returns True if the last event is believed to be paragraph navigation."""
string, mods = self._last_key_and_modifiers()
@@ -682,7 +686,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True)
return rv
def last_event_was_line_boundary_navigation(self):
def last_event_was_line_boundary_navigation(self) -> bool:
"""Returns True if the last event is believed to be navigation to start/end of line."""
string, mods = self._last_key_and_modifiers()
@@ -696,7 +700,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True)
return rv
def last_event_was_file_boundary_navigation(self):
def last_event_was_file_boundary_navigation(self) -> bool:
"""Returns True if the last event is believed to be navigation to top/bottom of file."""
string, mods = self._last_key_and_modifiers()
@@ -710,7 +714,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True)
return rv
def last_event_was_page_navigation(self):
def last_event_was_page_navigation(self) -> bool:
"""Returns True if the last event is believed to be page navigation."""
string, mods = self._last_key_and_modifiers()
@@ -730,7 +734,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True)
return rv
def last_event_was_page_switch(self):
def last_event_was_page_switch(self) -> bool:
"""Returns True if the last event is believed to be a page switch."""
string, mods = self._last_key_and_modifiers()
@@ -746,7 +750,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True)
return rv
def last_event_was_tab_navigation(self):
def last_event_was_tab_navigation(self) -> bool:
"""Returns True if the last event is believed to be Tab navigation."""
string, mods = self._last_key_and_modifiers()
@@ -762,7 +766,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True)
return rv
def last_event_was_table_sort(self):
def last_event_was_table_sort(self) -> bool:
"""Returns True if the last event is believed to be a table sort."""
focus = focus_manager.get_manager().get_locus_of_focus()
@@ -780,7 +784,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True)
return rv
def last_event_was_unmodified_arrow(self):
def last_event_was_unmodified_arrow(self) -> bool:
"""Returns True if the last event is an unmodified arrow."""
string, mods = self._last_key_and_modifiers()
@@ -799,83 +803,83 @@ class InputEventManager:
return True
def last_event_was_alt_modified(self):
def last_event_was_alt_modified(self) -> bool:
"""Returns True if the last event was alt-modified."""
mods = self._last_key_and_modifiers()[-1]
return mods & 1 << Atspi.ModifierType.ALT
return bool(mods & 1 << Atspi.ModifierType.ALT)
def last_event_was_backspace(self):
def last_event_was_backspace(self) -> bool:
"""Returns True if the last event is BackSpace."""
return self._last_key_and_modifiers()[0] == "BackSpace"
def last_event_was_down(self):
def last_event_was_down(self) -> bool:
"""Returns True if the last event is Down."""
return self._last_key_and_modifiers()[0] == "Down"
def last_event_was_f1(self):
def last_event_was_f1(self) -> bool:
"""Returns True if the last event is F1."""
return self._last_key_and_modifiers()[0] == "F1"
def last_event_was_left(self):
def last_event_was_left(self) -> bool:
"""Returns True if the last event is Left."""
return self._last_key_and_modifiers()[0] == "Left"
def last_event_was_left_or_right(self):
def last_event_was_left_or_right(self) -> bool:
"""Returns True if the last event is Left or Right."""
return self._last_key_and_modifiers()[0] in ["Left", "Right"]
def last_event_was_page_up_or_page_down(self):
def last_event_was_page_up_or_page_down(self) -> bool:
"""Returns True if the last event is Page_Up or Page_Down."""
return self._last_key_and_modifiers()[0] in ["Page_Up", "Page_Down"]
def last_event_was_right(self):
def last_event_was_right(self) -> bool:
"""Returns True if the last event is Right."""
return self._last_key_and_modifiers()[0] == "Right"
def last_event_was_return(self):
def last_event_was_return(self) -> bool:
"""Returns True if the last event is Return."""
return self._last_key_and_modifiers()[0] == "Return"
def last_event_was_return_or_space(self):
def last_event_was_return_or_space(self) -> bool:
"""Returns True if the last event is Return or space."""
return self._last_key_and_modifiers()[0] in ["Return", "space", " "]
def last_event_was_return_tab_or_space(self):
def last_event_was_return_tab_or_space(self) -> bool:
"""Returns True if the last event is Return, Tab, or space."""
return self._last_key_and_modifiers()[0] in ["Return", "Tab", "space", " "]
def last_event_was_space(self):
def last_event_was_space(self) -> bool:
"""Returns True if the last event is space."""
return self._last_key_and_modifiers()[0] in [" ", "space"]
def last_event_was_tab(self):
def last_event_was_tab(self) -> bool:
"""Returns True if the last event is Tab."""
return self._last_key_and_modifiers()[0] == "Tab"
def last_event_was_up(self):
def last_event_was_up(self) -> bool:
"""Returns True if the last event is Up."""
return self._last_key_and_modifiers()[0] == "Up"
def last_event_was_up_or_down(self):
def last_event_was_up_or_down(self) -> bool:
"""Returns True if the last event is Up or Down."""
return self._last_key_and_modifiers()[0] in ["Up", "Down"]
def last_event_was_delete(self):
def last_event_was_delete(self) -> bool:
"""Returns True if the last event is believed to be delete."""
string, mods = self._last_key_and_modifiers()
@@ -891,7 +895,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True)
return rv
def last_event_was_cut(self):
def last_event_was_cut(self) -> bool:
"""Returns True if the last event is believed to be the cut command."""
string, mods = self._last_key_and_modifiers()
@@ -904,39 +908,39 @@ class InputEventManager:
return True
return False
def last_event_was_copy(self):
def last_event_was_copy(self) -> bool:
"""Returns True if the last event is believed to be the copy command."""
string, mods = self._last_key_and_modifiers()
if string.lower() != "c" or not mods & 1 << Atspi.ModifierType.CONTROL:
rv = False
elif AXUtilities.is_terminal(self._last_input_event.get_object()):
rv = mods & 1 << Atspi.ModifierType.SHIFT
rv = bool(mods & 1 << Atspi.ModifierType.SHIFT)
else:
rv = not mods & 1 << Atspi.ModifierType.SHIFT
rv = not bool(mods & 1 << Atspi.ModifierType.SHIFT)
if rv:
msg = "INPUT EVENT MANAGER: Last event was copy"
debug.print_message(debug.LEVEL_INFO, msg, True)
return rv
def last_event_was_paste(self):
def last_event_was_paste(self) -> bool:
"""Returns True if the last event is believed to be the paste command."""
string, mods = self._last_key_and_modifiers()
if string.lower() != "v" or not mods & 1 << Atspi.ModifierType.CONTROL:
rv = False
elif AXUtilities.is_terminal(self._last_input_event.get_object()):
rv = mods & 1 << Atspi.ModifierType.SHIFT
rv = bool(mods & 1 << Atspi.ModifierType.SHIFT)
else:
rv = not mods & 1 << Atspi.ModifierType.SHIFT
rv = not bool(mods & 1 << Atspi.ModifierType.SHIFT)
if rv:
msg = "INPUT EVENT MANAGER: Last event was paste"
debug.print_message(debug.LEVEL_INFO, msg, True)
return rv
def last_event_was_undo(self):
def last_event_was_undo(self) -> bool:
"""Returns True if the last event is believed to be the undo command."""
string, mods = self._last_key_and_modifiers()
@@ -948,16 +952,16 @@ class InputEventManager:
return True
return False
def last_event_was_redo(self):
def last_event_was_redo(self) -> bool:
"""Returns True if the last event is believed to be the redo command."""
string, mods = self._last_key_and_modifiers()
if string.lower() == "z":
rv = mods & 1 << Atspi.ModifierType.CONTROL and mods & 1 << Atspi.ModifierType.SHIFT
rv = bool(mods & 1 << Atspi.ModifierType.CONTROL and mods & 1 << Atspi.ModifierType.SHIFT)
elif string.lower() == "y":
# LibreOffice
rv = mods & 1 << Atspi.ModifierType.CONTROL \
and not mods & 1 << Atspi.ModifierType.SHIFT
rv = bool(mods & 1 << Atspi.ModifierType.CONTROL \
and not mods & 1 << Atspi.ModifierType.SHIFT)
else:
rv = False
@@ -966,7 +970,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True)
return rv
def last_event_was_select_all(self):
def last_event_was_select_all(self) -> bool:
"""Returns True if the last event is believed to be the select all command."""
string, mods = self._last_key_and_modifiers()
@@ -979,7 +983,7 @@ class InputEventManager:
return True
return False
def last_event_was_primary_click(self):
def last_event_was_primary_click(self) -> bool:
"""Returns True if the last event is a primary mouse click."""
if not self.last_event_was_mouse_button():
@@ -991,7 +995,7 @@ class InputEventManager:
return True
return False
def last_event_was_primary_release(self):
def last_event_was_primary_release(self) -> bool:
"""Returns True if the last event is a primary mouse release."""
if not self.last_event_was_mouse_button():
@@ -1003,7 +1007,7 @@ class InputEventManager:
return True
return False
def last_event_was_primary_click_or_release(self):
def last_event_was_primary_click_or_release(self) -> bool:
"""Returns True if the last event is a primary mouse click or release."""
if not self.last_event_was_mouse_button():
@@ -1015,7 +1019,7 @@ class InputEventManager:
return True
return False
def last_event_was_middle_click(self):
def last_event_was_middle_click(self) -> bool:
"""Returns True if the last event is a middle mouse click."""
if not self.last_event_was_mouse_button():
@@ -1027,7 +1031,7 @@ class InputEventManager:
return True
return False
def last_event_was_middle_release(self):
def last_event_was_middle_release(self) -> bool:
"""Returns True if the last event is a middle mouse release."""
if not self.last_event_was_mouse_button():
@@ -1039,7 +1043,7 @@ class InputEventManager:
return True
return False
def last_event_was_secondary_click(self):
def last_event_was_secondary_click(self) -> bool:
"""Returns True if the last event is a secondary mouse click."""
if not self.last_event_was_mouse_button():
@@ -1051,7 +1055,7 @@ class InputEventManager:
return True
return False
def last_event_was_secondary_release(self):
def last_event_was_secondary_release(self) -> bool:
"""Returns True if the last event is a secondary mouse release."""
if not self.last_event_was_mouse_button():
@@ -1064,7 +1068,10 @@ class InputEventManager:
return False
_manager = InputEventManager()
def get_manager():
_manager: Optional[InputEventManager] = None
if _manager is None:
_manager = InputEventManager()
def get_manager() -> InputEventManager:
"""Returns the Input Event Manager singleton."""
return _manager
+55 -37
View File
@@ -23,6 +23,8 @@
# Forked from Orca screen reader.
# Cthulhu project: https://git.stormux.org/storm/cthulhu
from __future__ import annotations
__id__ = "$Id$"
__version__ = "$Revision$"
__date__ = "$Date$"
@@ -30,18 +32,24 @@ __copyright__ = "Copyright (c) 2011. Cthulhu Team."
__license__ = "LGPL"
import importlib
from typing import Optional, Dict, Any
from typing import TYPE_CHECKING, Optional, Dict, Any, List, Union
from . import debug
from . import cthulhu_state
from .ax_object import AXObject
from .scripts import apps, toolkits
if TYPE_CHECKING:
from gi.repository import Atspi
from .cthulhu import Cthulhu
from .script import Script
from .input_event import InputEvent
# Forward references to avoid circular imports
# Script is defined in script.py
# Atspi.Accessible comes from AT-SPI
def _get_ax_utilities():
def _get_ax_utilities() -> Any:
# Avoid circular import with ax_utilities -> ax_utilities_event -> focus_manager -> braille -> settings_manager -> script_manager.
from .ax_utilities import AXUtilities
return AXUtilities
@@ -49,22 +57,22 @@ def _get_ax_utilities():
def _log(message: str, reason: Optional[str] = None, timestamp: bool = True, stack: bool = False) -> None:
debug.print_log(debug.LEVEL_INFO, "SCRIPT MANAGER", message, reason, timestamp, stack)
def _log_tokens(tokens: list, reason: Optional[str] = None, timestamp: bool = True, stack: bool = False) -> None:
def _log_tokens(tokens: list[Any], reason: Optional[str] = None, timestamp: bool = True, stack: bool = False) -> None:
debug.print_log_tokens(debug.LEVEL_INFO, "SCRIPT MANAGER", tokens, reason, timestamp, stack)
class ScriptManager:
def __init__(self, app: Any) -> None: # app is the CthulhuApp instance
def __init__(self, app: Cthulhu) -> None:
_log("Initializing")
self.app: Any = app # Store app instance
self.appScripts: Dict[Any, Any] = {} # Dict[Atspi.Accessible, Script]
self.toolkitScripts: Dict[Any, Dict[str, Any]] = {} # Dict[Atspi.Accessible, Dict[str, Script]]
self.customScripts: Dict[Any, Dict[str, Any]] = {} # Dict[Atspi.Accessible, Dict[str, Script]]
self._sleepModeScripts: Dict[Any, Any] = {} # Dict[Atspi.Accessible, Script]
self._appModules: list = apps.__all__
self._toolkitModules: list = toolkits.__all__
self._defaultScript: Optional[Any] = None # Optional[Script]
self._scriptPackages: list[str] = \
self.app: Cthulhu = app # Store app instance
self.appScripts: Dict[Atspi.Accessible, Script] = {}
self.toolkitScripts: Dict[Atspi.Accessible, Dict[str, Script]] = {}
self.customScripts: Dict[Atspi.Accessible, Dict[str, Script]] = {}
self._sleepModeScripts: Dict[Atspi.Accessible, Script] = {}
self._appModules: List[str] = apps.__all__
self._toolkitModules: List[str] = toolkits.__all__
self._defaultScript: Optional[Script] = None
self._scriptPackages: List[str] = \
["cthulhu-scripts",
"cthulhu.scripts",
"cthulhu.scripts.apps",
@@ -73,6 +81,8 @@ class ScriptManager:
{'Icedove': 'Thunderbird',
'Nereid': 'Banshee',
'gnome-calculator': 'gcalctool',
'Steam': 'steamwebhelper',
'Steam Web Helper': 'steamwebhelper',
'gtk-window-decorator': 'switcher',
'marco': 'switcher',
'xfce4-notifyd': 'notification-daemon',
@@ -92,7 +102,8 @@ class ScriptManager:
_log("Activating")
self._defaultScript = self.get_script(None)
self._defaultScript.registerEventListeners()
if self._defaultScript:
self._defaultScript.registerEventListeners()
self.set_active_script(self._defaultScript, "lifecycle: activate")
self._active = True
_log("Activated")
@@ -111,7 +122,7 @@ class ScriptManager:
self._active = False
_log("Deactivated")
def get_module_name(self, app: Optional[Any]) -> Optional[str]: # app: Optional[Atspi.Accessible]
def get_module_name(self, app: Optional[Atspi.Accessible]) -> Optional[str]:
"""Returns the module name of the script to use for application app."""
if app is None:
@@ -148,19 +159,20 @@ class ScriptManager:
_log_tokens(["Mapped", app, "to", name])
return name
def _toolkit_for_object(self, obj: Optional[Any]) -> str: # obj: Optional[Atspi.Accessible]
def _toolkit_for_object(self, obj: Optional[Atspi.Accessible]) -> Optional[str]:
"""Returns the name of the toolkit associated with obj."""
if obj is None:
return None
name = AXObject.get_attribute(obj, 'toolkit')
return self._toolkitNames.get(name, name)
def _script_for_role(self, obj: Optional[Any]) -> str: # obj: Optional[Atspi.Accessible]
def _script_for_role(self, obj: Optional[Atspi.Accessible]) -> str:
if _get_ax_utilities().is_terminal(obj):
return 'terminal'
return ''
def _new_named_script(self, app: Optional[Any], name: Optional[str]) -> Optional[Any]: # Returns Optional[Script]
def _new_named_script(self, app: Optional[Atspi.Accessible], name: Optional[str]) -> Optional[Script]:
"""Attempts to locate and load the named module. If successful, returns
a script based on this module."""
@@ -189,7 +201,7 @@ class ScriptManager:
return script
def _create_script(self, app: Optional[Any], obj: Optional[Any] = None) -> Any: # Returns Script
def _create_script(self, app: Optional[Atspi.Accessible], obj: Optional[Atspi.Accessible] = None) -> Script:
"""For the given application, create a new script instance."""
moduleName = self.get_module_name(app)
@@ -212,7 +224,7 @@ class ScriptManager:
return script
def get_default_script(self, app: Optional[Any] = None) -> Any: # Returns Script
def get_default_script(self, app: Optional[Atspi.Accessible] = None) -> Script:
if not app and self._defaultScript:
return self._defaultScript
@@ -224,7 +236,7 @@ class ScriptManager:
return script
def sanity_check_script(self, script: Any) -> Any: # Returns Script
def sanity_check_script(self, script: Script) -> Script:
if not self._active:
return script
@@ -238,7 +250,8 @@ class ScriptManager:
_log_tokens(["Failed to get a replacement script for", script.app], "replacement-missing")
return script
def get_script_for_mouse_button_event(self, event: Any) -> Any: # Returns Script
def get_script_for_mouse_button_event(self, event: Any) -> Script:
# Note: event type unspecified in original code, likely InputEvent or similar
isActive = _get_ax_utilities().is_active(cthulhu_state.activeWindow)
_log_tokens([cthulhu_state.activeWindow, "is active:", isActive])
@@ -256,10 +269,10 @@ class ScriptManager:
return self.get_script(AXObject.get_application(activeWindow), activeWindow)
def get_active_script(self) -> Optional[Any]: # Returns Optional[Script]
def get_active_script(self) -> Optional[Script]:
return cthulhu_state.activeScript
def get_script(self, app: Optional[Any], obj: Optional[Any] = None, sanity_check: bool = False) -> Any: # Returns Script
def get_script(self, app: Optional[Atspi.Accessible], obj: Optional[Atspi.Accessible] = None, sanity_check: bool = False) -> Script:
"""Get a script for an app (and make it if necessary). This is used
instead of a simple calls to Script's constructor.
@@ -275,21 +288,24 @@ class ScriptManager:
roleName = self._script_for_role(obj)
if roleName:
customScripts = self.customScripts.get(app, {})
customScripts = self.customScripts.get(app, {}) # type: ignore
customScript = customScripts.get(roleName)
if not customScript:
customScript = self._new_named_script(app, roleName)
customScripts[roleName] = customScript
self.customScripts[app] = customScripts
if customScript:
customScripts[roleName] = customScript
if app:
self.customScripts[app] = customScripts
objToolkit = self._toolkit_for_object(obj)
if objToolkit:
toolkitScripts = self.toolkitScripts.get(app, {})
toolkitScripts = self.toolkitScripts.get(app, {}) # type: ignore
toolkitScript = toolkitScripts.get(objToolkit)
if not toolkitScript:
toolkitScript = self._create_script(app, obj)
toolkitScripts[objToolkit] = toolkitScript
self.toolkitScripts[app] = toolkitScripts
if app:
self.toolkitScripts[app] = toolkitScripts
try:
if not app:
@@ -317,7 +333,7 @@ class ScriptManager:
return appScript
def get_or_create_sleep_mode_script(self, app: Any) -> Any: # Returns Script
def get_or_create_sleep_mode_script(self, app: Atspi.Accessible) -> Script:
"""Gets or creates the sleep mode script."""
script = self._sleepModeScripts.get(app)
if script is not None:
@@ -329,7 +345,7 @@ class ScriptManager:
self._sleepModeScripts[app] = script
return script
def set_active_script(self, newScript: Optional[Any], reason: Optional[str] = None) -> None: # newScript: Optional[Script]
def set_active_script(self, newScript: Optional[Script], reason: Optional[str] = None) -> None:
"""Set the new active script.
Arguments:
@@ -351,12 +367,13 @@ class ScriptManager:
# Emit signal that active script has changed, so PluginSystemManager can update keybindings
from . import cthulhu
cthulhu.cthulhuApp.getSignalManager().emitSignal('active-script-changed', newScript)
if cthulhu.cthulhuApp:
cthulhu.cthulhuApp.getSignalManager().emitSignal('active-script-changed', newScript)
_log_tokens(["Setting active script to", newScript], reason)
self._log_active_state(reason)
def activate_script_for_context(self, app: Optional[Any], obj: Optional[Any], reason: Optional[str] = None) -> Any: # Returns Script
def activate_script_for_context(self, app: Optional[Atspi.Accessible], obj: Optional[Atspi.Accessible], reason: Optional[str] = None) -> Script:
script = self.get_script(app, obj)
self.set_active_script(script, reason)
return script
@@ -369,7 +386,7 @@ class ScriptManager:
reason
)
def _get_script_for_app_replicant(self, app: Any) -> Optional[Any]: # Returns Optional[Script]
def _get_script_for_app_replicant(self, app: Atspi.Accessible) -> Optional[Script]:
if not self._active:
return None
@@ -439,11 +456,12 @@ class ScriptManager:
_manager: Optional[ScriptManager] = None
def get_manager() -> ScriptManager:
def get_manager() -> Optional[ScriptManager]:
"""Returns the Script Manager"""
global _manager
if _manager is None:
from . import cthulhu
_manager = ScriptManager(cthulhu.cthulhuApp)
if cthulhu.cthulhuApp:
_manager = ScriptManager(cthulhu.cthulhuApp)
return _manager
+170 -113
View File
@@ -26,6 +26,8 @@
"""Settings manager module. This will load/save user settings from a
defined settings backend."""
from __future__ import annotations
__id__ = "$Id$"
__version__ = "$Revision$"
__date__ = "$Date$"
@@ -36,6 +38,8 @@ import copy
import importlib
import json
import os
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from gi.repository import Gio, GLib
from . import debug
@@ -46,6 +50,11 @@ from .acss import ACSS
from .ax_object import AXObject
from .keybindings import KeyBinding
if TYPE_CHECKING:
from .cthulhu import Cthulhu
from .script import Script
from .input_event import InputEventHandler
# Removed global cthulhuApp.scriptManager declaration.
# Note: Do not import cthulhu module here to avoid circular import
@@ -53,14 +62,14 @@ class SettingsManager(object):
"""Settings backend manager. This class manages cthulhu user's settings
using different backends"""
def __init__(self, app, backend='json'): # Modified signature
def __init__(self, app: Cthulhu, backend: str = 'json') -> None: # Modified signature
debug.printMessage(debug.LEVEL_INFO, 'SETTINGS MANAGER: Initializing', True)
self.app = app # Store app instance
self.app: Cthulhu = app # Store app instance
# Move _proxy initialization here
try:
self._proxy = Gio.DBusProxy.new_for_bus_sync(
self._proxy: Optional[Gio.DBusProxy] = Gio.DBusProxy.new_for_bus_sync(
Gio.BusType.SESSION,
Gio.DBusProxyFlags.NONE,
None,
@@ -71,51 +80,51 @@ class SettingsManager(object):
except Exception:
self._proxy = None
self.backendModule = None
self._backend = None
self.profile = None
self.backendName = backend
self._prefsDir = None
self.backendModule: Optional[Any] = None
self._backend: Optional[Any] = None
self.profile: Optional[str] = None
self.backendName: str = backend
self._prefsDir: Optional[str] = None
# Dictionaries for store the default values
# The keys and values are defined at cthulhu.settings
#
self.defaultGeneral = {}
self.defaultPronunciations = {}
self.defaultKeybindings = {}
self.defaultGeneral: Dict[str, Any] = {}
self.defaultPronunciations: Dict[str, Any] = {}
self.defaultKeybindings: Dict[str, Any] = {}
# Dictionaries that store the key:value pairs which values are
# different from the current profile and the default ones
#
self.profileGeneral = {}
self.profilePronunciations = {}
self.profileKeybindings = {}
self.profileGeneral: Dict[str, Any] = {}
self.profilePronunciations: Dict[str, Any] = {}
self.profileKeybindings: Dict[str, Any] = {}
# Dictionaries that store the current settings.
# They are result to overwrite the default values with
# the ones from the current active profile
self.general = {}
self.pronunciations = {}
self.keybindings = {}
self.general: Dict[str, Any] = {}
self.pronunciations: Dict[str, Any] = {}
self.keybindings: Dict[str, Any] = {}
self._activeApp = ""
self._appGeneral = {}
self._appPronunciations = {}
self._appKeybindings = {}
self._lastRoleSoundPresentation = None
self._activeApp: str = ""
self._appGeneral: Dict[str, Any] = {}
self._appPronunciations: Dict[str, Any] = {}
self._appKeybindings: Dict[str, Any] = {}
self._lastRoleSoundPresentation: Optional[Any] = None
if not self._loadBackend():
raise Exception('SettingsManager._loadBackend failed.')
self.customizedSettings = {}
self._customizationCompleted = False
self.customizedSettings: Dict[str, Any] = {}
self._customizationCompleted: bool = False
# For handling the currently-"classic" application settings
self.settingsPackages = ["app-settings"]
self.settingsPackages: List[str] = ["app-settings"]
debug.printMessage(debug.LEVEL_INFO, 'SETTINGS MANAGER: Initialized', True)
def activate(self, prefsDir=None, customSettings={}):
def activate(self, prefsDir: Optional[str] = None, customSettings: Dict[str, Any] = {}) -> None:
debug.printMessage(debug.LEVEL_INFO, 'SETTINGS MANAGER: Activating', True)
self.customizedSettings.update(customSettings)
@@ -123,12 +132,13 @@ class SettingsManager(object):
or os.path.join(GLib.get_user_data_dir(), "cthulhu")
# Load the backend and the default values
self._backend = self.backendModule.Backend(self._prefsDir)
if self.backendModule:
self._backend = self.backendModule.Backend(self._prefsDir)
self._setDefaultGeneral()
self._setDefaultPronunciations()
self._setDefaultKeybindings()
self.general = self.defaultGeneral.copy()
if not self.isFirstStart():
if not self.isFirstStart() and self._backend:
self.general.update(self._backend.getGeneral())
self.pronunciations = self.defaultPronunciations.copy()
self.keybindings = self.defaultKeybindings.copy()
@@ -146,13 +156,14 @@ class SettingsManager(object):
debug.printTokens(debug.LEVEL_INFO, tokens, True)
if self.profile is None:
self.profile = self.general.get('startingProfile')[1]
self.profile = self.general.get('startingProfile')[1] # type: ignore
tokens = ["SETTINGS MANAGER: Current profile is now", self.profile]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
self.setProfile(self.profile)
if self.profile:
self.setProfile(self.profile)
def _loadBackend(self):
def _loadBackend(self) -> bool:
"""Load specific backend for manage user settings"""
try:
@@ -162,14 +173,17 @@ class SettingsManager(object):
except Exception:
return False
def _createDefaults(self):
def _createDefaults(self) -> None:
"""Let the active backend to create the initial structure
for storing the settings and save the default ones from
cthulhu.settings"""
def _createDir(dirName):
def _createDir(dirName: str) -> None:
if not os.path.isdir(dirName):
os.makedirs(dirName)
if not self._prefsDir:
return
# Set up the user's preferences directory
# ($XDG_DATA_HOME/cthulhu by default).
#
@@ -200,20 +214,20 @@ class SettingsManager(object):
if not os.path.exists(userCustomFile):
os.close(os.open(userCustomFile, os.O_CREAT, 0o700))
if self.isFirstStart():
if self.isFirstStart() and self._backend:
self._backend.saveDefaultSettings(self.defaultGeneral,
self.defaultPronunciations,
self.defaultKeybindings)
def _setDefaultPronunciations(self):
def _setDefaultPronunciations(self) -> None:
"""Get the pronunciations by default from cthulhu.settings"""
self.defaultPronunciations = {}
def _setDefaultKeybindings(self):
def _setDefaultKeybindings(self) -> None:
"""Get the keybindings by default from cthulhu.settings"""
self.defaultKeybindings = {}
def _setDefaultGeneral(self):
def _setDefaultGeneral(self) -> None:
"""Get the general settings by default from cthulhu.settings"""
self._getCustomizedSettings()
self.defaultGeneral = {}
@@ -241,7 +255,7 @@ class SettingsManager(object):
if default_active_plugins is not None:
self.defaultGeneral["activePlugins"] = default_active_plugins
def _load_default_general_overrides(self):
def _load_default_general_overrides(self) -> Dict[str, Any]:
if not self._prefsDir:
return {}
@@ -261,7 +275,7 @@ class SettingsManager(object):
if not isinstance(general, dict):
return {}
if hasattr(self._backend, "_migrateSettings"):
if self._backend and hasattr(self._backend, "_migrateSettings"):
try:
general = self._backend._migrateSettings(dict(general))
except Exception as error:
@@ -283,12 +297,12 @@ class SettingsManager(object):
return general
def getDefaultSetting(self, settingName):
def getDefaultSetting(self, settingName: str) -> Any:
if settingName in self.defaultGeneral:
return self.defaultGeneral.get(settingName)
return getattr(settings, settingName, None)
def _getCustomizedSettings(self):
def _getCustomizedSettings(self) -> Dict[str, Any]:
if self._customizationCompleted:
return self.customizedSettings
@@ -302,20 +316,25 @@ class SettingsManager(object):
customValue = settings.__dict__.get(key)
if value != customValue:
self.customizedSettings[key] = customValue
return self.customizedSettings
def _loadUserCustomizations(self):
def _loadUserCustomizations(self) -> bool:
"""Attempt to load the user's cthulhu-customizations. Returns a boolean
indicating our success at doing so, where success is measured by the
likelihood that the results won't be different if we keep trying."""
success = False
if not self._prefsDir:
return False
pathList = [self._prefsDir]
tokens = ["SETTINGS MANAGER: Attempt to load cthulhu-customizations"]
module_path = pathList[0] + "/cthulhu-customizations.py"
try:
spec = importlib.util.spec_from_file_location("cthulhu-customizations", module_path)
if spec is not None:
if spec is not None and spec.loader:
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
tokens.extend(["from", module_path, "succeeded."])
@@ -327,21 +346,21 @@ class SettingsManager(object):
except Exception as error:
# Treat this failure as a "success" so that we don't stomp on the existing file.
success = True
tokens.extend(["failed due to:", error, ". Not loading customizations."])
tokens.extend(["failed due to:", str(error), ". Not loading customizations."])
debug.printTokens(debug.LEVEL_ALL, tokens, True)
return success
def getPrefsDir(self):
def getPrefsDir(self) -> Optional[str]:
return self._prefsDir
def setSetting(self, settingName, settingValue):
def setSetting(self, settingName: str, settingValue: Any) -> None:
self._setSettingsRuntime({settingName:settingValue})
def getSetting(self, settingName):
def getSetting(self, settingName: str) -> Any:
return getattr(settings, settingName, None)
def getVoiceLocale(self, voice='default'):
def getVoiceLocale(self, voice: str = 'default') -> str:
voices = self.getSetting('voices')
v = ACSS(voices.get(voice, {}))
lang = v.getLocale()
@@ -350,7 +369,7 @@ class SettingsManager(object):
lang = f"{lang}_{dialect.upper()}"
return lang
def getSpeechServerFactories(self):
def getSpeechServerFactories(self) -> List[Any]:
"""Imports all known SpeechServer factory modules."""
factories = []
@@ -366,7 +385,7 @@ class SettingsManager(object):
return factories
def _loadProfileSettings(self, profile=None):
def _loadProfileSettings(self, profile: Optional[str] = None) -> None:
"""Get from the active backend all the settings for the current
profile and store them in the object's attributes.
A profile can be passed as a parameter. This could be useful for
@@ -377,14 +396,16 @@ class SettingsManager(object):
if profile is None:
profile = self.profile
self.profileGeneral = self.getGeneralSettings(profile) or {}
self.profilePronunciations = self.getPronunciations(profile) or {}
self.profileKeybindings = self.getKeybindings(profile) or {}
if profile:
self.profileGeneral = self.getGeneralSettings(profile) or {}
self.profilePronunciations = self.getPronunciations(profile) or {}
self.profileKeybindings = self.getKeybindings(profile) or {}
tokens = ["SETTINGS MANAGER: Settings for", profile, "profile loaded"]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
def _mergeSettings(self):
def _mergeSettings(self) -> None:
"""Update the changed values on the profile settings
over the current and active settings"""
@@ -402,7 +423,7 @@ class SettingsManager(object):
msg = 'SETTINGS MANAGER: Settings merged.'
debug.printMessage(debug.LEVEL_INFO, msg, True)
def _enableAccessibility(self):
def _enableAccessibility(self) -> bool:
"""Enables the GNOME accessibility flag. Users need to log out and
then back in for this to take effect.
@@ -416,22 +437,27 @@ class SettingsManager(object):
return not alreadyEnabled
def isAccessibilityEnabled(self):
def isAccessibilityEnabled(self) -> bool:
msg = 'SETTINGS MANAGER: Checking if accessibility is enabled.'
debug.printMessage(debug.LEVEL_INFO, msg, True)
msg = 'SETTINGS MANAGER: Accessibility enabled: '
rv = False
if not self._proxy:
rv = False
msg += 'Error (no proxy)'
else:
rv = self._proxy.Get('(ss)', 'org.a11y.Status', 'IsEnabled')
msg += str(rv)
try:
rv = self._proxy.Get('(ss)', 'org.a11y.Status', 'IsEnabled')
msg += str(rv)
except Exception:
rv = False
msg += 'Error calling DBus'
debug.printMessage(debug.LEVEL_INFO, msg, True)
return rv
def setAccessibility(self, enable):
def setAccessibility(self, enable: bool) -> Union[bool, None]:
msg = f'SETTINGS MANAGER: Attempting to set accessibility to {enable}.'
debug.printMessage(debug.LEVEL_INFO, msg, True)
@@ -440,38 +466,47 @@ class SettingsManager(object):
debug.printMessage(debug.LEVEL_INFO, msg, True)
return False
vEnable = GLib.Variant('b', enable)
self._proxy.Set('(ssv)', 'org.a11y.Status', 'IsEnabled', vEnable)
try:
vEnable = GLib.Variant('b', enable)
self._proxy.Set('(ssv)', 'org.a11y.Status', 'IsEnabled', vEnable)
except Exception:
pass
msg = f'SETTINGS MANAGER: Finished setting accessibility to {enable}.'
debug.printMessage(debug.LEVEL_INFO, msg, True)
return None
def isScreenReaderServiceEnabled(self):
def isScreenReaderServiceEnabled(self) -> bool:
"""Returns True if the screen reader service is enabled. Note that
this does not necessarily mean that Cthulhu (or any other screen reader)
is running at the moment."""
msg = 'SETTINGS MANAGER: Is screen reader service enabled? '
rv = False
if not self._proxy:
rv = False
msg += 'Error (no proxy)'
else:
rv = self._proxy.Get('(ss)', 'org.a11y.Status', 'ScreenReaderEnabled')
msg += str(rv)
try:
rv = self._proxy.Get('(ss)', 'org.a11y.Status', 'ScreenReaderEnabled')
msg += str(rv)
except Exception:
rv = False
debug.printMessage(debug.LEVEL_INFO, msg, True)
return rv
def setStartingProfile(self, profile=None):
def setStartingProfile(self, profile: Optional[tuple] = None) -> None:
if profile is None:
profile = settings.profile
self._backend._setProfileKey('startingProfile', profile)
if self._backend:
self._backend._setProfileKey('startingProfile', profile)
def getProfile(self):
def getProfile(self) -> Optional[str]:
return self.profile
def setProfile(self, profile='default', updateLocale=False):
def setProfile(self, profile: str = 'default', updateLocale: bool = False) -> None:
"""Set a specific profile as the active one.
Also the settings from that profile will be loading
and updated the current settings with them."""
@@ -497,10 +532,11 @@ class SettingsManager(object):
tokens = ["SETTINGS MANAGER: Profile set to:", profile]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
def removeProfile(self, profile):
self._backend.removeProfile(profile)
def removeProfile(self, profile: str) -> None:
if self._backend:
self._backend.removeProfile(profile)
def _setSettingsRuntime(self, settingsDict):
def _setSettingsRuntime(self, settingsDict: Dict[str, Any]) -> None:
msg = 'SETTINGS MANAGER: Setting runtime settings.'
debug.printMessage(debug.LEVEL_INFO, msg, True)
@@ -514,7 +550,7 @@ class SettingsManager(object):
debug.printMessage(debug.LEVEL_INFO, msg, True)
self._logRoleSoundPresentationChange()
def _logRoleSoundPresentationChange(self):
def _logRoleSoundPresentationChange(self) -> None:
current = getattr(settings, "roleSoundPresentation", None)
if current == self._lastRoleSoundPresentation:
return
@@ -526,31 +562,37 @@ class SettingsManager(object):
debug.printMessage(debug.LEVEL_INFO, msg, True)
self._lastRoleSoundPresentation = current
def _setPronunciationsRuntime(self, pronunciationsDict):
def _setPronunciationsRuntime(self, pronunciationsDict: Dict[str, Any]) -> None:
pronunciation_dict.pronunciation_dict = {}
for key, value in pronunciationsDict.values():
if key and value:
pronunciation_dict.setPronunciation(key, value)
def getGeneralSettings(self, profile='default'):
def getGeneralSettings(self, profile: str = 'default') -> Dict[str, Any]:
"""Return the current general settings.
Those settings comes from updating the default settings
with the profiles' ones"""
return self._backend.getGeneral(profile)
if self._backend:
return self._backend.getGeneral(profile)
return {}
def getPronunciations(self, profile='default'):
def getPronunciations(self, profile: str = 'default') -> Dict[str, Any]:
"""Return the current pronunciations settings.
Those settings comes from updating the default settings
with the profiles' ones"""
return self._backend.getPronunciations(profile)
if self._backend:
return self._backend.getPronunciations(profile)
return {}
def getKeybindings(self, profile='default'):
def getKeybindings(self, profile: str = 'default') -> Dict[str, Any]:
"""Return the current keybindings settings.
Those settings comes from updating the default settings
with the profiles' ones"""
return self._backend.getKeybindings(profile)
if self._backend:
return self._backend.getKeybindings(profile)
return {}
def _setProfileGeneral(self, general):
def _setProfileGeneral(self, general: Dict[str, Any]) -> None:
"""Set the changed general settings from the defaults' ones
as the profile's."""
@@ -572,7 +614,7 @@ class SettingsManager(object):
msg = 'SETTINGS MANAGER: General settings for profile set'
debug.printMessage(debug.LEVEL_INFO, msg, True)
def _setProfilePronunciations(self, pronunciations):
def _setProfilePronunciations(self, pronunciations: Dict[str, Any]) -> None:
"""Set the changed pronunciations settings from the defaults' ones
as the profile's."""
@@ -585,7 +627,7 @@ class SettingsManager(object):
msg = 'SETTINGS MANAGER: Pronunciation settings for profile set.'
debug.printMessage(debug.LEVEL_INFO, msg, True)
def _setProfileKeybindings(self, keybindings):
def _setProfileKeybindings(self, keybindings: Dict[str, Any]) -> None:
"""Set the changed keybindings settings from the defaults' ones
as the profile's."""
@@ -598,21 +640,24 @@ class SettingsManager(object):
msg = 'SETTINGS MANAGER: Keybindings settings for profile set.'
debug.printMessage(debug.LEVEL_INFO, msg, True)
def _saveAppSettings(self, appName, general, pronunciations, keybindings):
def _saveAppSettings(self, appName: str, general: Dict[str, Any], pronunciations: Dict[str, Any], keybindings: Dict[str, Any]) -> None:
if not self._backend:
return
appGeneral = {}
profileGeneral = self.getGeneralSettings(self.profile)
profileGeneral = self.getGeneralSettings(self.profile) if self.profile else {}
for key, value in general.items():
if value != profileGeneral.get(key):
appGeneral[key] = value
appPronunciations = {}
profilePronunciations = self.getPronunciations(self.profile)
profilePronunciations = self.getPronunciations(self.profile) if self.profile else {}
for key, value in pronunciations.items():
if value != profilePronunciations.get(key):
appPronunciations[key] = value
appKeybindings = {}
profileKeybindings = self.getKeybindings(self.profile)
profileKeybindings = self.getKeybindings(self.profile) if self.profile else {}
for key, value in keybindings.items():
if value != profileKeybindings.get(key):
appKeybindings[key] = value
@@ -623,15 +668,17 @@ class SettingsManager(object):
appPronunciations,
appKeybindings)
def saveSettings(self, script, general, pronunciations, keybindings):
def saveSettings(self, script: Script, general: Dict[str, Any], pronunciations: Dict[str, Any], keybindings: Dict[str, Any]) -> Optional[bool]:
"""Save the settings provided for the script provided."""
tokens = ["SETTINGS MANAGER: Saving settings for", script, "(app:", script.app, ")"]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
app = script.app
if app:
self._saveAppSettings(AXObject.get_name(app), general, pronunciations, keybindings)
return
appName = AXObject.get_name(app)
if appName:
self._saveAppSettings(appName, general, pronunciations, keybindings)
return None
# Assign current profile
_profile = general.get('profile', settings.profile)
@@ -650,16 +697,17 @@ class SettingsManager(object):
tokens = ["SETTINGS MANAGER: Saving for backend", self._backend]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
self._backend.saveProfileSettings(self.profile,
self.profileGeneral,
self.profilePronunciations,
self.profileKeybindings)
if self._backend and self.profile:
self._backend.saveProfileSettings(self.profile,
self.profileGeneral,
self.profilePronunciations,
self.profileKeybindings)
tokens = ["SETTINGS MANAGER: Settings for", script, "(app:", script.app, ") saved"]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
return self._enableAccessibility()
def _adjustBindingTupleValues(self, bindingTuple):
def _adjustBindingTupleValues(self, bindingTuple: tuple) -> tuple:
"""Converts the values of bindingTuple into KeyBinding-ready values."""
keysym, mask, mods, clicks = bindingTuple
@@ -670,10 +718,10 @@ class SettingsManager(object):
return bindingTuple
def overrideKeyBindings(self, script, scriptKeyBindings):
def overrideKeyBindings(self, script: Script, scriptKeyBindings: Any) -> Any:
keybindingsSettings = self.profileKeybindings
for handlerString, bindingTuples in keybindingsSettings.items():
handler = script.inputEventHandlers.get(handlerString)
handler: Optional[InputEventHandler] = script.inputEventHandlers.get(handlerString)
if not handler:
continue
@@ -686,23 +734,27 @@ class SettingsManager(object):
return scriptKeyBindings
def isFirstStart(self):
def isFirstStart(self) -> bool:
"""Check if the firstStart key is True or false"""
return self._backend.isFirstStart()
if self._backend:
return self._backend.isFirstStart()
return False
def setFirstStart(self, value=False):
def setFirstStart(self, value: bool = False) -> None:
"""Set firstStart. This user-configurable setting is primarily
intended to serve as an indication as to whether or not initial
configuration is needed."""
self._backend.setFirstStart(value)
if self._backend:
self._backend.setFirstStart(value)
def availableProfiles(self):
def availableProfiles(self) -> List[str]:
"""Get available profiles from active backend"""
if self._backend:
return self._backend.availableProfiles()
return []
return self._backend.availableProfiles()
def getAppSetting(self, app, settingName, fallbackOnDefault=True):
if not app:
def getAppSetting(self, app: Any, settingName: str, fallbackOnDefault: bool = True) -> Any:
if not app or not self._backend:
return None
appPrefs = self._backend.getAppSettings(AXObject.get_name(app))
@@ -710,13 +762,13 @@ class SettingsManager(object):
profilePrefs = profiles.get(self.profile, {})
general = profilePrefs.get('general', {})
appSetting = general.get(settingName)
if appSetting is None and fallbackOnDefault:
if appSetting is None and fallbackOnDefault and self.profile:
general = self._backend.getGeneral(self.profile)
appSetting = general.get(settingName)
return appSetting
def loadAppSettings(self, script):
def loadAppSettings(self, script: Script) -> None:
"""Load the users application specific settings for an app.
Arguments:
@@ -726,8 +778,12 @@ class SettingsManager(object):
if not (script and script.app):
return
for key in self._appPronunciations.keys():
self.pronunciations.pop(key)
if not self._backend:
return
for key in list(self._appPronunciations.keys()):
if key in self.pronunciations:
self.pronunciations.pop(key)
prefs = self._backend.getAppSettings(AXObject.get_name(script.app))
profiles = prefs.get('profiles', {})
@@ -736,7 +792,7 @@ class SettingsManager(object):
self._appGeneral = profilePrefs.get('general', {})
self._appKeybindings = profilePrefs.get('keybindings', {})
self._appPronunciations = profilePrefs.get('pronunciations', {})
self._activeApp = AXObject.get_name(script.app)
self._activeApp = AXObject.get_name(script.app) or ""
self._loadProfileSettings()
self._mergeSettings()
@@ -744,9 +800,9 @@ class SettingsManager(object):
self._setPronunciationsRuntime(self.pronunciations)
script.keyBindings = self.overrideKeyBindings(script, script.getKeyBindings())
_managerInstance = None
_managerInstance: Optional[SettingsManager] = None
def getManager():
def getManager() -> Optional[SettingsManager]:
"""Get the settings manager instance. Compatibility function.
This function provides backward compatibility for existing code that uses
@@ -759,7 +815,8 @@ def getManager():
if _managerInstance is None:
try:
from . import cthulhu
_managerInstance = cthulhu.cthulhuApp.settingsManager
if cthulhu.cthulhuApp:
_managerInstance = cthulhu.cthulhuApp.settingsManager
except (ImportError, AttributeError):
# During import phase, cthulhuApp may not exist yet
pass
+90 -63
View File
@@ -26,6 +26,8 @@
"""Manages the default speech server for cthulhu. A script can use this
as its speech server, or it can feel free to create one of its own."""
from __future__ import annotations
__id__ = "$Id$"
__version__ = "$Revision$"
__date__ = "$Date$"
@@ -34,6 +36,7 @@ __license__ = "LGPL"
import importlib
import time
from typing import TYPE_CHECKING, Optional, List, Dict, Any, Union
from . import debug
from . import logger
@@ -46,26 +49,31 @@ from .speechserver import VoiceFamily
from .acss import ACSS
from . import speech_history
# Lazy initialization to avoid circular imports
_logger = None
log = None
if TYPE_CHECKING:
from .speechserver import SpeechServer
from .logger import Logger
def _ensureLogger():
# Lazy initialization to avoid circular imports
_logger: Optional[Logger] = None
log: Optional[Any] = None # Logger.newLog returns a log object, keeping Any for now as logger isn't typed
def _ensureLogger() -> None:
"""Ensure logger is initialized."""
global _logger, log
if _logger is None:
from . import cthulhu
_logger = cthulhu.cthulhuApp.logger
log = _logger.newLog("speech")
if cthulhu.cthulhuApp:
_logger = cthulhu.cthulhuApp.logger
log = _logger.newLog("speech")
# The speech server to use for all speech operations.
#
_speechserver = None
_speechserver: Optional[SpeechServer] = None
# The last time something was spoken.
_timestamp = 0
_timestamp: float = 0.0
def _initSpeechServer(moduleName, speechServerInfo):
def _initSpeechServer(moduleName: Optional[str], speechServerInfo: Optional[Any]) -> None:
global _speechserver
@@ -84,19 +92,20 @@ def _initSpeechServer(moduleName, speechServerInfo):
# Now, get the speech server we care about.
#
speechServerInfo = settings.speechServerInfo
if speechServerInfo:
_speechserver = factory.SpeechServer.getSpeechServer(speechServerInfo)
if not _speechserver:
_speechserver = factory.SpeechServer.getSpeechServer()
if factory:
if speechServerInfo:
tokens = ["SPEECH: Invalid speechServerInfo:", speechServerInfo]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
_speechserver = factory.SpeechServer.getSpeechServer(speechServerInfo) # type: ignore
if not _speechserver:
_speechserver = factory.SpeechServer.getSpeechServer() # type: ignore
if speechServerInfo:
tokens = ["SPEECH: Invalid speechServerInfo:", speechServerInfo]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
if not _speechserver:
raise Exception(f"ERROR: No speech server for factory: {moduleName}")
def init():
def init() -> None:
debug.printMessage(debug.LEVEL_INFO, 'SPEECH: Initializing', True)
if _speechserver:
debug.printMessage(debug.LEVEL_INFO, 'SPEECH: Already initialized', True)
@@ -134,16 +143,32 @@ def init():
debug.printMessage(debug.LEVEL_INFO, 'SPEECH: Initialized', True)
def checkSpeechSetting():
def checkSpeechSetting() -> None:
msg = "SPEECH: Checking speech setting."
debug.printMessage(debug.LEVEL_INFO, msg, True)
if not settings.enableSpeech:
shutdown()
else:
if _speechserver:
shutdown()
return
if not _speechserver:
init()
def __resolveACSS(acss=None):
def getSpeechServer() -> Optional[SpeechServer]:
"""Returns the speech server instance."""
return _speechserver
def setSpeechServer(speechServer: SpeechServer) -> None:
"""Sets the speech server to be used.
Arguments:
- speechServer: the speech server to use.
"""
global _speechserver
_speechserver = speechServer
def __resolveACSS(acss: Optional[Any] = None) -> ACSS:
if isinstance(acss, ACSS):
family = acss.get(acss.FAMILY)
try:
@@ -160,35 +185,36 @@ def __resolveACSS(acss=None):
voices = settings.voices
return ACSS(voices[settings.DEFAULT_VOICE])
def sayAll(utteranceIterator, progressCallback):
def sayAll(utteranceIterator: Any, progressCallback: Any) -> None:
_ensureLogger()
if settings.silenceSpeech:
return
if _speechserver:
def _speechHistorySayAllWrapper():
def _speechHistorySayAllWrapper() -> Any:
for [context, acss] in utteranceIterator:
try:
utterance = getattr(context, "utterance", None)
if isinstance(utterance, str) and utterance.strip():
speech_history.add(utterance, source="sayAll")
speech_history.add(utterance, source="sayAll") # type: ignore
except Exception:
debug.printException(debug.LEVEL_INFO)
yield [context, acss]
_speechserver.sayAll(_speechHistorySayAllWrapper(), progressCallback)
_speechserver.sayAll(_speechHistorySayAllWrapper(), progressCallback) # type: ignore
else:
for [context, acss] in utteranceIterator:
logLine = f"SPEECH OUTPUT: '{context.utterance}'"
debug.printMessage(debug.LEVEL_INFO, logLine, True)
log.info(logLine)
try:
utterance = getattr(context, "utterance", None)
if isinstance(utterance, str) and utterance.strip():
speech_history.add(utterance, source="sayAll-fallback")
except Exception:
debug.printException(debug.LEVEL_INFO)
if log:
for [context, acss] in utteranceIterator:
logLine = f"SPEECH OUTPUT: '{context.utterance}'"
debug.printMessage(debug.LEVEL_INFO, logLine, True)
log.info(logLine)
try:
utterance = getattr(context, "utterance", None)
if isinstance(utterance, str) and utterance.strip():
speech_history.add(utterance, source="sayAll-fallback") # type: ignore
except Exception:
debug.printException(debug.LEVEL_INFO)
def _speak(text, acss, interrupt):
def _speak(text: str, acss: Optional[Any], interrupt: bool) -> None:
"""Speaks the individual string using the given ACSS."""
_ensureLogger()
@@ -198,7 +224,7 @@ def _speak(text, acss, interrupt):
from . import sleep_mode_manager
if cthulhu_state.activeScript and hasattr(cthulhu_state.activeScript, 'app'):
sleepModeManager = sleep_mode_manager.getManager()
if sleepModeManager.isActiveForApp(cthulhu_state.activeScript.app):
if sleepModeManager and sleepModeManager.isActiveForApp(cthulhu_state.activeScript.app):
# Allow sleep mode status messages to get through
if "Sleep mode enabled" in text or "Sleep mode disabled" in text:
debug.printMessage(debug.LEVEL_INFO, f"SPEECH: Allowing sleep mode status: '{text}'", True)
@@ -207,14 +233,15 @@ def _speak(text, acss, interrupt):
return
try:
speech_history.add(text, source="speak")
speech_history.add(text, source="speak") # type: ignore
except Exception:
debug.printException(debug.LEVEL_INFO)
if not _speechserver:
logLine = f"SPEECH OUTPUT: '{text}' {acss}"
debug.printMessage(debug.LEVEL_INFO, logLine, True)
log.info(logLine)
if log:
log.info(logLine)
return
voice = ACSS(settings.voices.get(settings.DEFAULT_VOICE))
@@ -227,9 +254,9 @@ def _speak(text, acss, interrupt):
resolvedVoice = __resolveACSS(voice)
msg = f"SPEECH OUTPUT: '{text}' {resolvedVoice}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
_speechserver.speak(text, resolvedVoice, interrupt)
_speechserver.speak(text, resolvedVoice, interrupt) # type: ignore
def speak(content, acss=None, interrupt=True):
def speak(content: Union[str, List[Any]], acss: Optional[Any] = None, interrupt: bool = True) -> None:
"""Speaks the given content. The content can be either a simple
string or an array of arrays of objects returned by a speech
generator."""
@@ -242,7 +269,7 @@ def speak(content, acss=None, interrupt=True):
from . import sleep_mode_manager
if cthulhu_state.activeScript and hasattr(cthulhu_state.activeScript, 'app'):
sleepModeManager = sleep_mode_manager.getManager()
if sleepModeManager.isActiveForApp(cthulhu_state.activeScript.app):
if sleepModeManager and sleepModeManager.isActiveForApp(cthulhu_state.activeScript.app):
# Allow sleep mode status messages to get through
if isinstance(content, str):
if "Sleep mode enabled" in content or "Sleep mode disabled" in content:
@@ -265,7 +292,7 @@ def speak(content, acss=None, interrupt=True):
validTypes = (str, list, speech_generator.Pause,
speech_generator.LineBreak, ACSS, Icon)
error = "SPEECH: bad content sent to speak(): '%s'"
if not isinstance(content, validTypes):
if not isinstance(content, validTypes): # type: ignore
debug.printMessage(debug.LEVEL_INFO, error % content, True)
return
@@ -288,13 +315,13 @@ def speak(content, acss=None, interrupt=True):
return
shouldInterrupt = interrupt
toSpeak = []
toSpeak: List[str] = []
activeVoice = acss
if acss is not None:
activeVoice = ACSS(acss)
for element in content:
if not isinstance(element, validTypes):
if not isinstance(element, validTypes): # type: ignore
debug.printMessage(debug.LEVEL_INFO, error % element, True)
elif isinstance(element, list):
speak(element, acss, shouldInterrupt)
@@ -310,7 +337,8 @@ def speak(content, acss=None, interrupt=True):
toSpeak = []
if element.isValid():
player = sound.getPlayer()
player.play(element, interrupt=interrupt)
if player:
player.play(element, interrupt=interrupt)
elif toSpeak:
newVoice = ACSS(acss)
newItemsToSpeak = []
@@ -338,7 +366,7 @@ def speak(content, acss=None, interrupt=True):
string = " ".join(toSpeak)
_speak(string, activeVoice, shouldInterrupt)
def speakKeyEvent(event, acss=None):
def speakKeyEvent(event: Any, acss: Optional[Any] = None) -> None:
"""Speaks a key event immediately.
Arguments:
@@ -362,12 +390,13 @@ def speakKeyEvent(event, acss=None):
msg = f"{keyname} {lockingStateString}"
logLine = f"SPEECH OUTPUT: '{msg.strip()}' {acss}"
debug.printMessage(debug.LEVEL_INFO, logLine, True)
log.info(logLine)
if log:
log.info(logLine)
if _speechserver:
_speechserver.speakKeyEvent(event, acss)
_speechserver.speakKeyEvent(event, acss) # type: ignore
def speakCharacter(character, acss=None):
def speakCharacter(character: str, acss: Optional[Any] = None) -> None:
"""Speaks a single character immediately.
Arguments:
@@ -393,32 +422,30 @@ def speakCharacter(character, acss=None):
acss = __resolveACSS(acss)
tokens = [f"SPEECH OUTPUT: '{character}'", acss]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
log.info(f"SPEECH OUTPUT: '{character}'")
if log:
log.info(f"SPEECH OUTPUT: '{character}'")
if _speechserver:
_speechserver.speakCharacter(character, acss=acss)
_speechserver.speakCharacter(character, acss=acss) # type: ignore
def getInfo():
def getInfo() -> Optional[Any]:
info = None
if _speechserver:
info = _speechserver.getInfo()
info = _speechserver.getInfo() # type: ignore
return info
def stop():
def stop() -> None:
if _speechserver:
_speechserver.stop()
_speechserver.stop() # type: ignore
def shutdown():
def shutdown() -> None:
debug.printMessage(debug.LEVEL_INFO, 'SPEECH: Shutting down', True)
global _speechserver
if _speechserver:
_speechserver.shutdownActiveServers()
_speechserver.shutdownActiveServers() # type: ignore
_speechserver = None
def reset(text=None, acss=None):
def reset(text: Optional[str] = None, acss: Optional[Any] = None) -> None:
if _speechserver:
_speechserver.reset(text, acss)
def getSpeechServer():
return _speechserver
_speechserver.reset(text, acss) # type: ignore
+5 -1
View File
@@ -1487,9 +1487,13 @@ class StructuralNavigation:
def _chunkCriteria(self, arg=None):
return AXCollection.create_match_rule(roles=self.OBJECT_ROLES + self.CONTAINER_ROLES)
def _chunkPredicate(self, obj, arg=None):
def _chunkPredicate(self, obj: Atspi.Accessible, arg=None) -> bool:
if AXUtilities.is_heading(obj):
return True
if AXUtilities.is_list(obj):
return True
if AXUtilities.is_table(obj):
return True
text = self._script.utilities.queryNonEmptyText(obj)
if not text: