diff --git a/.gitignore b/.gitignore index d9e3999..8edfa92 100644 --- a/.gitignore +++ b/.gitignore @@ -102,3 +102,4 @@ po/insert-header.sed !/help/C/*.xml /help/*/*.mo /help/*/*.stamp +.aider* diff --git a/distro-packages/Arch-Linux/PKGBUILD b/distro-packages/Arch-Linux/PKGBUILD index b4ee574..3c7bd34 100644 --- a/distro-packages/Arch-Linux/PKGBUILD +++ b/distro-packages/Arch-Linux/PKGBUILD @@ -1,7 +1,7 @@ # Maintainer: Storm Dragon pkgname=cthulhu -pkgver=2026.01.12 +pkgver=2026.01.26 pkgrel=1 pkgdesc="Desktop-agnostic screen reader with plugin system, forked from Orca" url="https://git.stormux.org/storm/cthulhu" diff --git a/meson.build b/meson.build index 95e96d4..ec3cac6 100644 --- a/meson.build +++ b/meson.build @@ -1,5 +1,5 @@ project('cthulhu', - version: '2026.01.12-master', + version: '2026.01.26-master', meson_version: '>= 1.0.0', ) diff --git a/src/cthulhu/cthulhu.py b/src/cthulhu/cthulhu.py index 1dd740f..a5122a3 100644 --- a/src/cthulhu/cthulhu.py +++ b/src/cthulhu/cthulhu.py @@ -36,13 +36,26 @@ __copyright__ = "Copyright (c) 2004-2009 Sun Microsystems Inc." \ __license__ = "LGPL" import faulthandler -from typing import TYPE_CHECKING, Any, Callable, Optional +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union from . import dbus_service if TYPE_CHECKING: from types import FrameType from gi.repository.Gio import Settings as GSettings + from gi.repository import Gtk + + from .settings_manager import SettingsManager + from .script_manager import ScriptManager + from .plugin_system_manager import PluginSystemManager + from .input_event import InputEvent + from .input_event_manager import InputEventManager + from .event_manager import EventManager + from .signal_manager import SignalManager + from .dynamic_api_manager import DynamicApiManager + from .speech import Speech + from .braille import Braille + from .script import Script class APIHelper: """Helper class for plugin API interactions, including keybindings.""" @@ -54,7 +67,7 @@ class APIHelper: - app: the Cthulhu application """ self.app: Cthulhu = app - self._gestureBindings: dict[Optional[str], list[Any]] = {} + self._gestureBindings: Dict[Optional[str], List[Any]] = {} def registerGestureByString( self, @@ -914,7 +927,7 @@ def main() -> int: class Cthulhu(GObject.Object): # basic signals - __gsignals__: dict[str, tuple[Any, ...]] = { + __gsignals__: Dict[str, Tuple[Any, ...]] = { "start-application-completed": (GObject.SignalFlags.RUN_LAST, None, ()), "stop-application-completed": (GObject.SignalFlags.RUN_LAST, None, ()), "load-setting-begin": (GObject.SignalFlags.RUN_LAST, None, ()), @@ -927,52 +940,69 @@ class Cthulhu(GObject.Object): def __init__(self) -> None: GObject.Object.__init__(self) # add members - self.resourceManager: Any = resource_manager.ResourceManager(self) - self.settingsManager: Any = settings_manager.SettingsManager(self) # Directly instantiate - self.eventManager: Any = event_manager.EventManager(self) # Directly instantiate - self.scriptManager: Any = script_manager.ScriptManager(self) # Directly instantiate - self.logger: Any = logger.Logger() # Directly instantiate - self.signalManager: Any = signal_manager.SignalManager(self) - self.dynamicApiManager: Any = dynamic_api_manager.DynamicApiManager(self) - self.translationManager: Any = translation_manager.TranslationManager(self) + self.resourceManager: resource_manager.ResourceManager = resource_manager.ResourceManager(self) + self.settingsManager: SettingsManager = settings_manager.SettingsManager(self) # Directly instantiate + self.eventManager: EventManager = event_manager.EventManager(self) # Directly instantiate + self.scriptManager: ScriptManager = script_manager.ScriptManager(self) # Directly instantiate + self.logger: logger.Logger = logger.Logger() # Directly instantiate + self.signalManager: SignalManager = signal_manager.SignalManager(self) + self.dynamicApiManager: DynamicApiManager = dynamic_api_manager.DynamicApiManager(self) + self.translationManager: TranslationManager = translation_manager.TranslationManager(self) self.debugManager: Any = debug self.APIHelper: APIHelper = APIHelper(self) self.createCompatAPI() - self.pluginSystemManager: Any = plugin_system_manager.PluginSystemManager(self) + self.pluginSystemManager: PluginSystemManager = plugin_system_manager.PluginSystemManager(self) # Scan for available plugins at startup self.pluginSystemManager.rescanPlugins() + def getAPIHelper(self) -> APIHelper: return self.APIHelper - def getPluginSystemManager(self) -> Any: + + def getPluginSystemManager(self) -> PluginSystemManager: return self.pluginSystemManager - def getDynamicApiManager(self) -> Any: + + def getDynamicApiManager(self) -> DynamicApiManager: return self.dynamicApiManager - def getSignalManager(self) -> Any: + + def getSignalManager(self) -> SignalManager: return self.signalManager - def getEventManager(self) -> Any: + + def getEventManager(self) -> EventManager: return self.eventManager - def getSettingsManager(self) -> Any: + + def getSettingsManager(self) -> SettingsManager: return self.settingsManager - def getScriptManager(self) -> Any: + + def getScriptManager(self) -> ScriptManager: return self.scriptManager - def get_scriptManager(self) -> Any: + + def get_scriptManager(self) -> ScriptManager: return self.scriptManager + def getDebugManager(self) -> Any: return self.debugManager - def getTranslationManager(self) -> Any: + + def getTranslationManager(self) -> TranslationManager: return self.translationManager - def getResourceManager(self) -> Any: + + def getResourceManager(self) -> resource_manager.ResourceManager: return self.resourceManager - def getLogger(self) -> Any: # New getter for the logger + + def getLogger(self) -> logger.Logger: # New getter for the logger return self.logger - def addKeyGrab(self, binding: Any) -> list[int]: + + def addKeyGrab(self, binding: Any) -> List[int]: return addKeyGrab(binding) + def removeKeyGrab(self, grab_id: int) -> None: return removeKeyGrab(grab_id) + def run(self, cacheValues: bool = True) -> int: return main() + def stop(self) -> None: pass + def createCompatAPI(self) -> None: # for now add compatibility layer using Dynamic API # should be removed step by step diff --git a/src/cthulhu/cthulhuVersion.py b/src/cthulhu/cthulhuVersion.py index aa31ed5..9f57825 100644 --- a/src/cthulhu/cthulhuVersion.py +++ b/src/cthulhu/cthulhuVersion.py @@ -23,5 +23,5 @@ # Forked from Orca screen reader. # Cthulhu project: https://git.stormux.org/storm/cthulhu -version = "2026.01.12" +version = "2026.01.26" codeName = "master" diff --git a/src/cthulhu/event_manager.py b/src/cthulhu/event_manager.py index 827acc5..5f5c1da 100644 --- a/src/cthulhu/event_manager.py +++ b/src/cthulhu/event_manager.py @@ -23,6 +23,8 @@ # Forked from Orca screen reader. # Cthulhu project: https://git.stormux.org/storm/cthulhu +from __future__ import annotations + __id__ = "$Id$" __version__ = "$Revision$" __date__ = "$Date$" @@ -36,7 +38,7 @@ from gi.repository import GLib import queue import threading import time -from typing import Optional, Dict, List, Tuple, Any +from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any, Union from . import cthulhu from . import debug @@ -48,21 +50,26 @@ from . import settings from .ax_object import AXObject from .ax_utilities import AXUtilities +if TYPE_CHECKING: + from .cthulhu import Cthulhu + from .script import Script + from .input_event_manager import InputEventManager + class EventManager: EMBEDDED_OBJECT_CHARACTER: str = '\ufffc' - def __init__(self, app: Any, asyncMode: bool = True) -> None: # app is CthulhuApp instance + def __init__(self, app: Cthulhu, asyncMode: bool = True) -> None: debug.printMessage(debug.LEVEL_INFO, 'EVENT MANAGER: Initializing', True) debug.printMessage(debug.LEVEL_INFO, f'EVENT MANAGER: Async Mode is {asyncMode}', True) - self.app: Any = app + self.app: Cthulhu = app self._asyncMode: bool = asyncMode self._scriptListenerCounts: Dict[str, int] = {} self._active: bool = False self._enqueueCount: int = 0 self._dequeueCount: int = 0 self._cmdlineCache: Dict[int, str] = {} - self._eventQueue: queue.Queue = queue.Queue(0) + self._eventQueue: queue.Queue[Any] = queue.Queue(0) self._gidleId: int = 0 self._gidleLock: threading.Lock = threading.Lock() self._gilSleepTime: float = 0.00001 @@ -81,15 +88,15 @@ class EventManager: 'object:state-changed:sensitive', 'object:state-changed:showing', 'object:text-changed:delete'] - self._eventsTriggeringSuspension: List[Any] = [] # List of events + self._eventsTriggeringSuspension: List[Atspi.Event] = [] self._ignoredEvents: List[str] = ['object:bounds-changed', 'object:state-changed:defunct', 'object:property-change:accessible-parent'] - self._parentsOfDefunctDescendants: List[Any] = [] # List[Atspi.Accessible] + self._parentsOfDefunctDescendants: List[Atspi.Accessible] = [] cthulhu_state.device = None self._keyHandlingActive: bool = False - self._inputEventManager: Optional[Any] = None # Optional[InputEventManager] + self._inputEventManager: Optional[InputEventManager] = None debug.printMessage(debug.LEVEL_INFO, 'Event manager initialized', True) @@ -174,7 +181,7 @@ class EventManager: if eventType in self._ignoredEvents: self._ignoredEvents.remove(eventType) - def _isDuplicateEvent(self, event: Any) -> bool: # event: Atspi.Event + def _isDuplicateEvent(self, event: Atspi.Event) -> bool: """Returns True if this event is already in the event queue.""" if self._inFlood() and self._prioritizeDuringFlood(event): @@ -193,7 +200,7 @@ class EventManager: return False - def _getAppCmdline(self, app: Any) -> str: # app: Atspi.Accessible + def _getAppCmdline(self, app: Atspi.Accessible) -> str: pid = AXObject.get_process_id(app) if pid == -1: return "" @@ -204,7 +211,7 @@ class EventManager: self._cmdlineCache[pid] = cmdline return cmdline - def _isSteamApp(self, app: Any) -> bool: # app: Atspi.Accessible + def _isSteamApp(self, app: Atspi.Accessible) -> bool: name = AXObject.get_name(app) if not name: nameLower = "" @@ -217,7 +224,7 @@ class EventManager: cmdline = self._getAppCmdline(app) return "steamwebhelper" in cmdline - def _isSteamNotificationEvent(self, event: Any) -> bool: # event: Atspi.Event + def _isSteamNotificationEvent(self, event: Atspi.Event) -> bool: for obj in (event.any_data, event.source): if not isinstance(obj, Atspi.Accessible): continue @@ -230,7 +237,7 @@ class EventManager: return False - def _ignore(self, event: Any) -> bool: # event: Atspi.Event + def _ignore(self, event: Atspi.Event) -> bool: """Returns True if this event should be ignored.""" app = AXObject.get_application(event.source) @@ -238,19 +245,19 @@ class EventManager: tokens = ["EVENT MANAGER:", event.type, "from", app] debug.printTokens(debug.LEVEL_INFO, tokens, True) - def _log_ignore(reason, message): + def _log_ignore(reason: str, message: str) -> None: debug.print_log(debug.LEVEL_INFO, "EVENT MANAGER", f"Ignoring - {message}", reason, True) - def _log_allow(reason, message): + def _log_allow(reason: str, message: str) -> None: debug.print_log(debug.LEVEL_INFO, "EVENT MANAGER", f"Not ignoring - {message}", reason, True) - def _ignore_with_reason(reason, message): + def _ignore_with_reason(reason: str, message: str) -> bool: _log_ignore(reason, message) return True - def _allow_with_reason(reason, message): + def _allow_with_reason(reason: str, message: str) -> bool: _log_allow(reason, message) return False @@ -430,7 +437,7 @@ class EventManager: return _allow_with_reason("no-cause", "no ignore condition met") - def _addToQueue(self, event, asyncMode): + def _addToQueue(self, event: Any, asyncMode: bool) -> None: debugging = debug.debugEventQueue if debugging: debug.printMessage(debug.LEVEL_ALL, " acquiring lock...") @@ -457,7 +464,7 @@ class EventManager: if debug.debugEventQueue: debug.printMessage(debug.LEVEL_ALL, " ...released") - def _queuePrintln(self, e, isEnqueue=True, isPrune=None): + def _queuePrintln(self, e: Any, isEnqueue: bool = True, isPrune: Optional[bool] = None) -> None: """Convenience method to output queue-related debugging info.""" if debug.LEVEL_INFO < debug.debugLevel: @@ -483,7 +490,7 @@ class EventManager: tokens[0:0] = ["EVENT MANAGER: Dequeued"] debug.printTokens(debug.LEVEL_INFO, tokens, True) - def _suspendEvents(self, triggeringEvent): + def _suspendEvents(self, triggeringEvent: Atspi.Event) -> None: self._eventsTriggeringSuspension.append(triggeringEvent) if self._eventsSuspended: @@ -499,7 +506,7 @@ class EventManager: self._eventsSuspended = True - def _unsuspendEvents(self, triggeringEvent, force=False): + def _unsuspendEvents(self, triggeringEvent: Atspi.Event, force: bool = False) -> None: if triggeringEvent in self._eventsTriggeringSuspension: self._eventsTriggeringSuspension.remove(triggeringEvent) @@ -521,7 +528,7 @@ class EventManager: self._eventsSuspended = False - def _shouldSuspendEventsFor(self, event): + def _shouldSuspendEventsFor(self, event: Atspi.Event) -> bool: if AXUtilities.is_frame(event.source) \ or (AXUtilities.is_window(event.source) \ and AXUtilities.get_application_toolkit_name(event.source) == "clutter"): @@ -541,7 +548,7 @@ class EventManager: return False - def _shouldUnsuspendEventsFor(self, event): + def _shouldUnsuspendEventsFor(self, event: Atspi.Event) -> bool: if event.type.startswith("object:state-changed:focused") and event.detail1: msg = "EVENT MANAGER: Should unsuspend events for newly-focused object." debug.printMessage(debug.LEVEL_INFO, msg, True) @@ -559,10 +566,10 @@ class EventManager: return False - def _didSuspendEventsFor(self, event): + def _didSuspendEventsFor(self, event: Atspi.Event) -> bool: return event in self._eventsTriggeringSuspension - def _enqueue(self, e): + def _enqueue(self, e: Atspi.Event) -> None: """Handles the enqueueing of all events destined for scripts. Arguments: @@ -621,7 +628,7 @@ class EventManager: if debug.debugEventQueue: self._enqueueCount -= 1 - def _isNoFocus(self): + def _isNoFocus(self) -> bool: if cthulhu_state.locusOfFocus or cthulhu_state.activeWindow or cthulhu_state.activeScript: return False @@ -629,7 +636,7 @@ class EventManager: debug.printMessage(debug.LEVEL_SEVERE, msg, True) return True - def _onNoFocus(self): + def _onNoFocus(self) -> bool: if not self._isNoFocus(): return False @@ -638,7 +645,7 @@ class EventManager: defaultScript.idleMessage() return False - def _dequeue(self): + def _dequeue(self) -> bool: """Handles all events destined for scripts. Called by the GTK idle thread.""" @@ -736,7 +743,7 @@ class EventManager: self._listener.deregister(eventType) del self._scriptListenerCounts[eventType] - def registerScriptListeners(self, script: Any) -> None: # script: Script + def registerScriptListeners(self, script: Script) -> None: """Tells the event manager to start listening for all the event types of interest to the script. @@ -750,7 +757,7 @@ class EventManager: for eventType in script.listeners.keys(): self.registerListener(eventType) - def deregisterScriptListeners(self, script: Any) -> None: # script: Script + def deregisterScriptListeners(self, script: Script) -> None: """Tells the event manager to stop listening for all the event types of interest to the script. @@ -764,7 +771,7 @@ class EventManager: for eventType in script.listeners.keys(): self.deregisterListener(eventType) - def _processInputEvent(self, event): + def _processInputEvent(self, event: Any) -> None: """Processes the given input event based on the keybinding from the currently-active script. @@ -798,7 +805,7 @@ class EventManager: debug.printMessage(debug.eventDebugLevel, msg, False) @staticmethod - def _get_scriptForEvent(event: Any) -> Optional[Any]: # Returns Optional[Script] + def _get_scriptForEvent(event: Any) -> Optional[Script]: """Returns the script associated with event.""" if event.type.startswith("mouse:"): @@ -836,7 +843,7 @@ class EventManager: debug.printTokens(debug.LEVEL_INFO, tokens, True) return script - def _isActivatableEvent(self, event: Any, script: Optional[Any] = None) -> Tuple[bool, str]: + def _isActivatableEvent(self, event: Atspi.Event, script: Optional[Script] = None) -> Tuple[bool, str]: """Determines if the event is one which should cause us to change which script is currently active. @@ -902,7 +909,7 @@ class EventManager: return False, "No reason found to activate a different script." - def _eventSourceIsDead(self, event: Any) -> bool: # event: Atspi.Event + def _eventSourceIsDead(self, event: Atspi.Event) -> bool: if AXObject.is_dead(event.source): tokens = ["EVENT MANAGER: source of", event.type, "is dead"] debug.printTokens(debug.LEVEL_INFO, tokens, True) @@ -910,7 +917,7 @@ class EventManager: return False - def _ignoreDuringDeluge(self, event: Any) -> bool: # event: Atspi.Event + def _ignoreDuringDeluge(self, event: Atspi.Event) -> bool: """Returns true if this event should be ignored during a deluge.""" if self._eventSourceIsDead(event): @@ -946,7 +953,7 @@ class EventManager: return False - def _processDuringFlood(self, event: Any) -> bool: # event: Atspi.Event + def _processDuringFlood(self, event: Atspi.Event) -> bool: """Returns true if this event should be processed during a flood.""" if self._eventSourceIsDead(event): @@ -973,7 +980,7 @@ class EventManager: return event.source == cthulhu_state.locusOfFocus - def _prioritizeDuringFlood(self, event: Any) -> bool: # event: Atspi.Event + def _prioritizeDuringFlood(self, event: Atspi.Event) -> bool: """Returns true if this event should be prioritized during a flood.""" if event.type.startswith("object:state-changed:focused"): @@ -1007,7 +1014,7 @@ class EventManager: oldSize = self._eventQueue.qsize() - newQueue = queue.Queue(0) + newQueue: queue.Queue[Any] = queue.Queue(0) while not self._eventQueue.empty(): try: event = self._eventQueue.get() @@ -1034,7 +1041,7 @@ class EventManager: return False - def _processObjectEvent(self, event): + def _processObjectEvent(self, event: Atspi.Event) -> None: """Handles all object events destined for scripts. Arguments: @@ -1133,7 +1140,7 @@ class EventManager: msg = f"EVENT MANAGER: {key}: {value}" debug.printMessage(debug.LEVEL_INFO, msg, True) - def processBrailleEvent(self, brailleEvent: Any) -> bool: # brailleEvent: BrailleEvent + def processBrailleEvent(self, brailleEvent: input_event.BrailleEvent) -> bool: """Called whenever a cursor key is pressed on the Braille display. Arguments: @@ -1151,8 +1158,12 @@ class EventManager: _manager: Optional[EventManager] = None -def getManager() -> EventManager: +def getManager() -> Optional[EventManager]: global _manager if _manager is None: - _manager = cthulhu.cthulhuApp.eventManager + try: + if cthulhu.cthulhuApp: + _manager = cthulhu.cthulhuApp.eventManager + except AttributeError: + pass return _manager diff --git a/src/cthulhu/focus_manager.py b/src/cthulhu/focus_manager.py index 2068a2f..b81473f 100644 --- a/src/cthulhu/focus_manager.py +++ b/src/cthulhu/focus_manager.py @@ -40,7 +40,7 @@ __copyright__ = "Copyright (c) 2005-2008 Sun Microsystems Inc." \ "Copyright (c) 2016-2023 Igalia, S.L." __license__ = "LGPL" -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Optional, Tuple, Any import gi gi.require_version("Atspi", "2.0") @@ -54,7 +54,13 @@ from . import script_manager from .ax_object import AXObject from .ax_table import AXTable from .ax_text import AXText -def _get_ax_utilities(): + +if TYPE_CHECKING: + from .cthulhu import Cthulhu + from .input_event import InputEvent + from .scripts import default + +def _get_ax_utilities() -> Any: # Avoid circular import with ax_utilities -> ax_utilities_event -> focus_manager. from .ax_utilities import AXUtilities return AXUtilities @@ -65,10 +71,6 @@ def _log(message: str, reason: Optional[str] = None, timestamp: bool = True, sta def _log_tokens(tokens: list, reason: Optional[str] = None, timestamp: bool = True, stack: bool = False) -> None: debug.print_log_tokens(debug.LEVEL_INFO, "FOCUS MANAGER", tokens, reason, timestamp, stack) -if TYPE_CHECKING: - from .input_event import InputEvent - from .scripts import default - CARET_TRACKING = "caret-tracking" FOCUS_TRACKING = "focus-tracking" FLAT_REVIEW = "flat-review" @@ -80,15 +82,15 @@ SAY_ALL = "say-all" class FocusManager: """Manages the focused object, window, etc.""" - def __init__(self, app) -> None: # Added app argument - self.app = app # Store app instance + def __init__(self, app: Cthulhu) -> None: + self.app: Cthulhu = app # Store app instance self._window: Optional[Atspi.Accessible] = cthulhu_state.activeWindow self._focus: Optional[Atspi.Accessible] = cthulhu_state.locusOfFocus self._object_of_interest: Optional[Atspi.Accessible] = cthulhu_state.objOfInterest self._active_mode: Optional[str] = cthulhu_state.activeMode - self._last_cell_coordinates: tuple[int, int] = (-1, -1) - self._last_cursor_position: tuple[Optional[Atspi.Accessible], int] = (None, -1) - self._penultimate_cursor_position: tuple[Optional[Atspi.Accessible], int] = (None, -1) + self._last_cell_coordinates: Tuple[int, int] = (-1, -1) + self._last_cursor_position: Tuple[Optional[Atspi.Accessible], int] = (None, -1) + self._penultimate_cursor_position: Tuple[Optional[Atspi.Accessible], int] = (None, -1) _log("Registering D-Bus commands.") controller = dbus_service.get_remote_controller() @@ -188,20 +190,20 @@ class FocusManager: def get_active_mode_and_object_of_interest( self - ) -> tuple[Optional[str], Optional[Atspi.Accessible]]: + ) -> Tuple[Optional[str], Optional[Atspi.Accessible]]: """Returns the current mode and associated object of interest""" _log_tokens(["Active mode:", self._active_mode, "Object of interest:", self._object_of_interest]) return self._active_mode, self._object_of_interest - def get_penultimate_cursor_position(self) -> tuple[Optional[Atspi.Accessible], int]: + def get_penultimate_cursor_position(self) -> Tuple[Optional[Atspi.Accessible], int]: """Returns the penultimate cursor position as a tuple of (object, offset).""" obj, offset = self._penultimate_cursor_position _log_tokens(["Penultimate cursor position:", obj, offset]) return obj, offset - def get_last_cursor_position(self) -> tuple[Optional[Atspi.Accessible], int]: + def get_last_cursor_position(self) -> Tuple[Optional[Atspi.Accessible], int]: """Returns the last cursor position as a tuple of (object, offset).""" obj, offset = self._last_cursor_position @@ -215,7 +217,7 @@ class FocusManager: self._penultimate_cursor_position = self._last_cursor_position self._last_cursor_position = obj, offset - def get_last_cell_coordinates(self) -> tuple[int, int]: + def get_last_cell_coordinates(self) -> Tuple[int, int]: """Returns the last known cell coordinates as a tuple of (row, column).""" row, column = self._last_cell_coordinates @@ -437,12 +439,16 @@ class FocusManager: return script.browse_mode_is_sticky() return False -_manager = None -def get_manager(): +_manager: Optional[FocusManager] = None +def get_manager() -> Optional[FocusManager]: """Returns the Focus Manager""" global _manager if _manager is None: - from . import cthulhu - _manager = FocusManager(cthulhu.cthulhuApp) + try: + from . import cthulhu + if cthulhu.cthulhuApp: + _manager = FocusManager(cthulhu.cthulhuApp) + except (ImportError, AttributeError): + pass return _manager diff --git a/src/cthulhu/input_event_manager.py b/src/cthulhu/input_event_manager.py index d680677..21ebb7f 100644 --- a/src/cthulhu/input_event_manager.py +++ b/src/cthulhu/input_event_manager.py @@ -39,7 +39,7 @@ __copyright__ = "Copyright (c) 2024 Igalia, S.L." \ "Copyright (c) 2024 GNOME Foundation Inc." __license__ = "LGPL" -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Optional, Union, Tuple, List, Dict import gi gi.require_version("Atspi", "2.0") @@ -64,9 +64,9 @@ class InputEventManager: self._last_input_event: Optional[input_event.InputEvent] = None self._last_non_modifier_key_event: Optional[input_event.KeyboardEvent] = None self._device: Optional[Atspi.Device] = None - self._mapped_keycodes: list[int] = [] - self._mapped_keysyms: list[int] = [] - self._grabbed_bindings: dict[int, keybindings.KeyBinding] = {} + self._mapped_keycodes: List[int] = [] + self._mapped_keysyms: List[int] = [] + self._grabbed_bindings: Dict[int, keybindings.KeyBinding] = {} self._paused: bool = False def start_key_watcher(self) -> None: @@ -100,14 +100,14 @@ class InputEventManager: msg = f"INPUT EVENT MANAGER: {grab_id} for: {binding}" debug.print_message(debug.LEVEL_INFO, msg, True) - def _get_key_definitions(self, binding: keybindings.KeyBinding) -> list[Atspi.KeyDefinition]: + def _get_key_definitions(self, binding: keybindings.KeyBinding) -> List[Atspi.KeyDefinition]: if hasattr(binding, "key_definitions"): return list(binding.key_definitions()) if hasattr(binding, "keyDefs"): return list(binding.keyDefs()) return [] - def add_grabs_for_keybinding(self, binding: keybindings.KeyBinding) -> list[int]: + def add_grabs_for_keybinding(self, binding: keybindings.KeyBinding) -> List[int]: """Adds grabs for binding if it is enabled, returns grab IDs.""" if self._device is None: @@ -427,7 +427,7 @@ class InputEventManager: return isinstance(self._last_input_event, input_event.MouseButtonEvent) - def is_release_for(self, event1, event2): + def is_release_for(self, event1: Optional[input_event.InputEvent], event2: Optional[input_event.InputEvent]) -> bool: """Returns True if event1 is a release for event2.""" if event1 is None or event2 is None: @@ -454,7 +454,7 @@ class InputEventManager: debug.print_message(debug.LEVEL_INFO, msg, True) return result - def last_event_equals_or_is_release_for_event(self, event): + def last_event_equals_or_is_release_for_event(self, event: Optional[input_event.InputEvent]) -> bool: """Returns True if the last event equals the provided event, or is the release for it.""" if self._last_input_event is event: @@ -471,7 +471,7 @@ class InputEventManager: return self.is_release_for(self._last_non_modifier_key_event, event) - def _last_key_and_modifiers(self): + def _last_key_and_modifiers(self) -> Tuple[str, int]: """Returns the last keyval name and modifiers""" if self._last_non_modifier_key_event is None: @@ -480,9 +480,12 @@ class InputEventManager: if not self.last_event_was_keyboard(): return "", 0 + if self._last_input_event is None: + return self._last_non_modifier_key_event.keyval_name, 0 + return self._last_non_modifier_key_event.keyval_name, self._last_input_event.modifiers - def last_event_was_command(self): + def last_event_was_command(self) -> bool: """Returns True if the last event is believed to be a command.""" if bool(self._last_key_and_modifiers()[1] & 1 << Atspi.ModifierType.CONTROL): @@ -492,7 +495,7 @@ class InputEventManager: return False - def last_event_was_shortcut_for(self, obj): + def last_event_was_shortcut_for(self, obj: Atspi.Accessible) -> bool: """Returns True if the last event is believed to be a shortcut key for obj.""" string = self._last_key_and_modifiers()[0] @@ -511,12 +514,13 @@ class InputEventManager: debug.print_tokens(debug.LEVEL_INFO, tokens, True) return rv - def last_event_was_printable_key(self): + def last_event_was_printable_key(self) -> bool: """Returns True if the last event is believed to be a printable key.""" if not self.last_event_was_keyboard(): return False + assert isinstance(self._last_input_event, input_event.KeyboardEvent) if self._last_input_event.is_printable_key(): msg = "INPUT EVENT MANAGER: Last event was printable key" debug.print_message(debug.LEVEL_INFO, msg, True) @@ -524,7 +528,7 @@ class InputEventManager: return False - def last_event_was_caret_navigation(self): + def last_event_was_caret_navigation(self) -> bool: """Returns True if the last event is believed to be caret navigation.""" return self.last_event_was_character_navigation() \ @@ -534,7 +538,7 @@ class InputEventManager: or self.last_event_was_file_boundary_navigation() \ or self.last_event_was_page_navigation() - def last_event_was_caret_selection(self): + def last_event_was_caret_selection(self) -> bool: """Returns True if the last event is believed to be caret selection.""" string, mods = self._last_key_and_modifiers() @@ -548,7 +552,7 @@ class InputEventManager: debug.print_message(debug.LEVEL_INFO, msg, True) return rv - def last_event_was_backward_caret_navigation(self): + def last_event_was_backward_caret_navigation(self) -> bool: """Returns True if the last event is believed to be backward caret navigation.""" string, mods = self._last_key_and_modifiers() @@ -562,7 +566,7 @@ class InputEventManager: debug.print_message(debug.LEVEL_INFO, msg, True) return rv - def last_event_was_forward_caret_navigation(self): + def last_event_was_forward_caret_navigation(self) -> bool: """Returns True if the last event is believed to be forward caret navigation.""" string, mods = self._last_key_and_modifiers() @@ -576,7 +580,7 @@ class InputEventManager: debug.print_message(debug.LEVEL_INFO, msg, True) return rv - def last_event_was_forward_caret_selection(self): + def last_event_was_forward_caret_selection(self) -> bool: """Returns True if the last event is believed to be forward caret selection.""" string, mods = self._last_key_and_modifiers() @@ -590,7 +594,7 @@ class InputEventManager: debug.print_message(debug.LEVEL_INFO, msg, True) return rv - def last_event_was_character_navigation(self): + def last_event_was_character_navigation(self) -> bool: """Returns True if the last event is believed to be character navigation.""" string, mods = self._last_key_and_modifiers() @@ -606,7 +610,7 @@ class InputEventManager: debug.print_message(debug.LEVEL_INFO, msg, True) return rv - def last_event_was_word_navigation(self): + def last_event_was_word_navigation(self) -> bool: """Returns True if the last event is believed to be word navigation.""" string, mods = self._last_key_and_modifiers() @@ -620,7 +624,7 @@ class InputEventManager: debug.print_message(debug.LEVEL_INFO, msg, True) return rv - def last_event_was_previous_word_navigation(self): + def last_event_was_previous_word_navigation(self) -> bool: """Returns True if the last event is believed to be previous-word navigation.""" string, mods = self._last_key_and_modifiers() @@ -634,7 +638,7 @@ class InputEventManager: debug.print_message(debug.LEVEL_INFO, msg, True) return rv - def last_event_was_next_word_navigation(self): + def last_event_was_next_word_navigation(self) -> bool: """Returns True if the last event is believed to be next-word navigation.""" string, mods = self._last_key_and_modifiers() @@ -648,7 +652,7 @@ class InputEventManager: debug.print_message(debug.LEVEL_INFO, msg, True) return rv - def last_event_was_line_navigation(self): + def last_event_was_line_navigation(self) -> bool: """Returns True if the last event is believed to be line navigation.""" string, mods = self._last_key_and_modifiers() @@ -668,7 +672,7 @@ class InputEventManager: debug.print_message(debug.LEVEL_INFO, msg, True) return rv - def last_event_was_paragraph_navigation(self): + def last_event_was_paragraph_navigation(self) -> bool: """Returns True if the last event is believed to be paragraph navigation.""" string, mods = self._last_key_and_modifiers() @@ -682,7 +686,7 @@ class InputEventManager: debug.print_message(debug.LEVEL_INFO, msg, True) return rv - def last_event_was_line_boundary_navigation(self): + def last_event_was_line_boundary_navigation(self) -> bool: """Returns True if the last event is believed to be navigation to start/end of line.""" string, mods = self._last_key_and_modifiers() @@ -696,7 +700,7 @@ class InputEventManager: debug.print_message(debug.LEVEL_INFO, msg, True) return rv - def last_event_was_file_boundary_navigation(self): + def last_event_was_file_boundary_navigation(self) -> bool: """Returns True if the last event is believed to be navigation to top/bottom of file.""" string, mods = self._last_key_and_modifiers() @@ -710,7 +714,7 @@ class InputEventManager: debug.print_message(debug.LEVEL_INFO, msg, True) return rv - def last_event_was_page_navigation(self): + def last_event_was_page_navigation(self) -> bool: """Returns True if the last event is believed to be page navigation.""" string, mods = self._last_key_and_modifiers() @@ -730,7 +734,7 @@ class InputEventManager: debug.print_message(debug.LEVEL_INFO, msg, True) return rv - def last_event_was_page_switch(self): + def last_event_was_page_switch(self) -> bool: """Returns True if the last event is believed to be a page switch.""" string, mods = self._last_key_and_modifiers() @@ -746,7 +750,7 @@ class InputEventManager: debug.print_message(debug.LEVEL_INFO, msg, True) return rv - def last_event_was_tab_navigation(self): + def last_event_was_tab_navigation(self) -> bool: """Returns True if the last event is believed to be Tab navigation.""" string, mods = self._last_key_and_modifiers() @@ -762,7 +766,7 @@ class InputEventManager: debug.print_message(debug.LEVEL_INFO, msg, True) return rv - def last_event_was_table_sort(self): + def last_event_was_table_sort(self) -> bool: """Returns True if the last event is believed to be a table sort.""" focus = focus_manager.get_manager().get_locus_of_focus() @@ -780,7 +784,7 @@ class InputEventManager: debug.print_message(debug.LEVEL_INFO, msg, True) return rv - def last_event_was_unmodified_arrow(self): + def last_event_was_unmodified_arrow(self) -> bool: """Returns True if the last event is an unmodified arrow.""" string, mods = self._last_key_and_modifiers() @@ -799,83 +803,83 @@ class InputEventManager: return True - def last_event_was_alt_modified(self): + def last_event_was_alt_modified(self) -> bool: """Returns True if the last event was alt-modified.""" mods = self._last_key_and_modifiers()[-1] - return mods & 1 << Atspi.ModifierType.ALT + return bool(mods & 1 << Atspi.ModifierType.ALT) - def last_event_was_backspace(self): + def last_event_was_backspace(self) -> bool: """Returns True if the last event is BackSpace.""" return self._last_key_and_modifiers()[0] == "BackSpace" - def last_event_was_down(self): + def last_event_was_down(self) -> bool: """Returns True if the last event is Down.""" return self._last_key_and_modifiers()[0] == "Down" - def last_event_was_f1(self): + def last_event_was_f1(self) -> bool: """Returns True if the last event is F1.""" return self._last_key_and_modifiers()[0] == "F1" - def last_event_was_left(self): + def last_event_was_left(self) -> bool: """Returns True if the last event is Left.""" return self._last_key_and_modifiers()[0] == "Left" - def last_event_was_left_or_right(self): + def last_event_was_left_or_right(self) -> bool: """Returns True if the last event is Left or Right.""" return self._last_key_and_modifiers()[0] in ["Left", "Right"] - def last_event_was_page_up_or_page_down(self): + def last_event_was_page_up_or_page_down(self) -> bool: """Returns True if the last event is Page_Up or Page_Down.""" return self._last_key_and_modifiers()[0] in ["Page_Up", "Page_Down"] - def last_event_was_right(self): + def last_event_was_right(self) -> bool: """Returns True if the last event is Right.""" return self._last_key_and_modifiers()[0] == "Right" - def last_event_was_return(self): + def last_event_was_return(self) -> bool: """Returns True if the last event is Return.""" return self._last_key_and_modifiers()[0] == "Return" - def last_event_was_return_or_space(self): + def last_event_was_return_or_space(self) -> bool: """Returns True if the last event is Return or space.""" return self._last_key_and_modifiers()[0] in ["Return", "space", " "] - def last_event_was_return_tab_or_space(self): + def last_event_was_return_tab_or_space(self) -> bool: """Returns True if the last event is Return, Tab, or space.""" return self._last_key_and_modifiers()[0] in ["Return", "Tab", "space", " "] - def last_event_was_space(self): + def last_event_was_space(self) -> bool: """Returns True if the last event is space.""" return self._last_key_and_modifiers()[0] in [" ", "space"] - def last_event_was_tab(self): + def last_event_was_tab(self) -> bool: """Returns True if the last event is Tab.""" return self._last_key_and_modifiers()[0] == "Tab" - def last_event_was_up(self): + def last_event_was_up(self) -> bool: """Returns True if the last event is Up.""" return self._last_key_and_modifiers()[0] == "Up" - def last_event_was_up_or_down(self): + def last_event_was_up_or_down(self) -> bool: """Returns True if the last event is Up or Down.""" return self._last_key_and_modifiers()[0] in ["Up", "Down"] - def last_event_was_delete(self): + def last_event_was_delete(self) -> bool: """Returns True if the last event is believed to be delete.""" string, mods = self._last_key_and_modifiers() @@ -891,7 +895,7 @@ class InputEventManager: debug.print_message(debug.LEVEL_INFO, msg, True) return rv - def last_event_was_cut(self): + def last_event_was_cut(self) -> bool: """Returns True if the last event is believed to be the cut command.""" string, mods = self._last_key_and_modifiers() @@ -904,39 +908,39 @@ class InputEventManager: return True return False - def last_event_was_copy(self): + def last_event_was_copy(self) -> bool: """Returns True if the last event is believed to be the copy command.""" string, mods = self._last_key_and_modifiers() if string.lower() != "c" or not mods & 1 << Atspi.ModifierType.CONTROL: rv = False elif AXUtilities.is_terminal(self._last_input_event.get_object()): - rv = mods & 1 << Atspi.ModifierType.SHIFT + rv = bool(mods & 1 << Atspi.ModifierType.SHIFT) else: - rv = not mods & 1 << Atspi.ModifierType.SHIFT + rv = not bool(mods & 1 << Atspi.ModifierType.SHIFT) if rv: msg = "INPUT EVENT MANAGER: Last event was copy" debug.print_message(debug.LEVEL_INFO, msg, True) return rv - def last_event_was_paste(self): + def last_event_was_paste(self) -> bool: """Returns True if the last event is believed to be the paste command.""" string, mods = self._last_key_and_modifiers() if string.lower() != "v" or not mods & 1 << Atspi.ModifierType.CONTROL: rv = False elif AXUtilities.is_terminal(self._last_input_event.get_object()): - rv = mods & 1 << Atspi.ModifierType.SHIFT + rv = bool(mods & 1 << Atspi.ModifierType.SHIFT) else: - rv = not mods & 1 << Atspi.ModifierType.SHIFT + rv = not bool(mods & 1 << Atspi.ModifierType.SHIFT) if rv: msg = "INPUT EVENT MANAGER: Last event was paste" debug.print_message(debug.LEVEL_INFO, msg, True) return rv - def last_event_was_undo(self): + def last_event_was_undo(self) -> bool: """Returns True if the last event is believed to be the undo command.""" string, mods = self._last_key_and_modifiers() @@ -948,16 +952,16 @@ class InputEventManager: return True return False - def last_event_was_redo(self): + def last_event_was_redo(self) -> bool: """Returns True if the last event is believed to be the redo command.""" string, mods = self._last_key_and_modifiers() if string.lower() == "z": - rv = mods & 1 << Atspi.ModifierType.CONTROL and mods & 1 << Atspi.ModifierType.SHIFT + rv = bool(mods & 1 << Atspi.ModifierType.CONTROL and mods & 1 << Atspi.ModifierType.SHIFT) elif string.lower() == "y": # LibreOffice - rv = mods & 1 << Atspi.ModifierType.CONTROL \ - and not mods & 1 << Atspi.ModifierType.SHIFT + rv = bool(mods & 1 << Atspi.ModifierType.CONTROL \ + and not mods & 1 << Atspi.ModifierType.SHIFT) else: rv = False @@ -966,7 +970,7 @@ class InputEventManager: debug.print_message(debug.LEVEL_INFO, msg, True) return rv - def last_event_was_select_all(self): + def last_event_was_select_all(self) -> bool: """Returns True if the last event is believed to be the select all command.""" string, mods = self._last_key_and_modifiers() @@ -979,7 +983,7 @@ class InputEventManager: return True return False - def last_event_was_primary_click(self): + def last_event_was_primary_click(self) -> bool: """Returns True if the last event is a primary mouse click.""" if not self.last_event_was_mouse_button(): @@ -991,7 +995,7 @@ class InputEventManager: return True return False - def last_event_was_primary_release(self): + def last_event_was_primary_release(self) -> bool: """Returns True if the last event is a primary mouse release.""" if not self.last_event_was_mouse_button(): @@ -1003,7 +1007,7 @@ class InputEventManager: return True return False - def last_event_was_primary_click_or_release(self): + def last_event_was_primary_click_or_release(self) -> bool: """Returns True if the last event is a primary mouse click or release.""" if not self.last_event_was_mouse_button(): @@ -1015,7 +1019,7 @@ class InputEventManager: return True return False - def last_event_was_middle_click(self): + def last_event_was_middle_click(self) -> bool: """Returns True if the last event is a middle mouse click.""" if not self.last_event_was_mouse_button(): @@ -1027,7 +1031,7 @@ class InputEventManager: return True return False - def last_event_was_middle_release(self): + def last_event_was_middle_release(self) -> bool: """Returns True if the last event is a middle mouse release.""" if not self.last_event_was_mouse_button(): @@ -1039,7 +1043,7 @@ class InputEventManager: return True return False - def last_event_was_secondary_click(self): + def last_event_was_secondary_click(self) -> bool: """Returns True if the last event is a secondary mouse click.""" if not self.last_event_was_mouse_button(): @@ -1051,7 +1055,7 @@ class InputEventManager: return True return False - def last_event_was_secondary_release(self): + def last_event_was_secondary_release(self) -> bool: """Returns True if the last event is a secondary mouse release.""" if not self.last_event_was_mouse_button(): @@ -1064,7 +1068,10 @@ class InputEventManager: return False -_manager = InputEventManager() -def get_manager(): +_manager: Optional[InputEventManager] = None +if _manager is None: + _manager = InputEventManager() + +def get_manager() -> InputEventManager: """Returns the Input Event Manager singleton.""" return _manager diff --git a/src/cthulhu/script_manager.py b/src/cthulhu/script_manager.py index 20ac2e4..9ef82eb 100644 --- a/src/cthulhu/script_manager.py +++ b/src/cthulhu/script_manager.py @@ -23,6 +23,8 @@ # Forked from Orca screen reader. # Cthulhu project: https://git.stormux.org/storm/cthulhu +from __future__ import annotations + __id__ = "$Id$" __version__ = "$Revision$" __date__ = "$Date$" @@ -30,18 +32,24 @@ __copyright__ = "Copyright (c) 2011. Cthulhu Team." __license__ = "LGPL" import importlib -from typing import Optional, Dict, Any +from typing import TYPE_CHECKING, Optional, Dict, Any, List, Union from . import debug from . import cthulhu_state from .ax_object import AXObject from .scripts import apps, toolkits +if TYPE_CHECKING: + from gi.repository import Atspi + from .cthulhu import Cthulhu + from .script import Script + from .input_event import InputEvent + # Forward references to avoid circular imports # Script is defined in script.py # Atspi.Accessible comes from AT-SPI -def _get_ax_utilities(): +def _get_ax_utilities() -> Any: # Avoid circular import with ax_utilities -> ax_utilities_event -> focus_manager -> braille -> settings_manager -> script_manager. from .ax_utilities import AXUtilities return AXUtilities @@ -49,22 +57,22 @@ def _get_ax_utilities(): def _log(message: str, reason: Optional[str] = None, timestamp: bool = True, stack: bool = False) -> None: debug.print_log(debug.LEVEL_INFO, "SCRIPT MANAGER", message, reason, timestamp, stack) -def _log_tokens(tokens: list, reason: Optional[str] = None, timestamp: bool = True, stack: bool = False) -> None: +def _log_tokens(tokens: list[Any], reason: Optional[str] = None, timestamp: bool = True, stack: bool = False) -> None: debug.print_log_tokens(debug.LEVEL_INFO, "SCRIPT MANAGER", tokens, reason, timestamp, stack) class ScriptManager: - def __init__(self, app: Any) -> None: # app is the CthulhuApp instance + def __init__(self, app: Cthulhu) -> None: _log("Initializing") - self.app: Any = app # Store app instance - self.appScripts: Dict[Any, Any] = {} # Dict[Atspi.Accessible, Script] - self.toolkitScripts: Dict[Any, Dict[str, Any]] = {} # Dict[Atspi.Accessible, Dict[str, Script]] - self.customScripts: Dict[Any, Dict[str, Any]] = {} # Dict[Atspi.Accessible, Dict[str, Script]] - self._sleepModeScripts: Dict[Any, Any] = {} # Dict[Atspi.Accessible, Script] - self._appModules: list = apps.__all__ - self._toolkitModules: list = toolkits.__all__ - self._defaultScript: Optional[Any] = None # Optional[Script] - self._scriptPackages: list[str] = \ + self.app: Cthulhu = app # Store app instance + self.appScripts: Dict[Atspi.Accessible, Script] = {} + self.toolkitScripts: Dict[Atspi.Accessible, Dict[str, Script]] = {} + self.customScripts: Dict[Atspi.Accessible, Dict[str, Script]] = {} + self._sleepModeScripts: Dict[Atspi.Accessible, Script] = {} + self._appModules: List[str] = apps.__all__ + self._toolkitModules: List[str] = toolkits.__all__ + self._defaultScript: Optional[Script] = None + self._scriptPackages: List[str] = \ ["cthulhu-scripts", "cthulhu.scripts", "cthulhu.scripts.apps", @@ -73,6 +81,8 @@ class ScriptManager: {'Icedove': 'Thunderbird', 'Nereid': 'Banshee', 'gnome-calculator': 'gcalctool', + 'Steam': 'steamwebhelper', + 'Steam Web Helper': 'steamwebhelper', 'gtk-window-decorator': 'switcher', 'marco': 'switcher', 'xfce4-notifyd': 'notification-daemon', @@ -92,7 +102,8 @@ class ScriptManager: _log("Activating") self._defaultScript = self.get_script(None) - self._defaultScript.registerEventListeners() + if self._defaultScript: + self._defaultScript.registerEventListeners() self.set_active_script(self._defaultScript, "lifecycle: activate") self._active = True _log("Activated") @@ -111,7 +122,7 @@ class ScriptManager: self._active = False _log("Deactivated") - def get_module_name(self, app: Optional[Any]) -> Optional[str]: # app: Optional[Atspi.Accessible] + def get_module_name(self, app: Optional[Atspi.Accessible]) -> Optional[str]: """Returns the module name of the script to use for application app.""" if app is None: @@ -148,19 +159,20 @@ class ScriptManager: _log_tokens(["Mapped", app, "to", name]) return name - def _toolkit_for_object(self, obj: Optional[Any]) -> str: # obj: Optional[Atspi.Accessible] + def _toolkit_for_object(self, obj: Optional[Atspi.Accessible]) -> Optional[str]: """Returns the name of the toolkit associated with obj.""" - + if obj is None: + return None name = AXObject.get_attribute(obj, 'toolkit') return self._toolkitNames.get(name, name) - def _script_for_role(self, obj: Optional[Any]) -> str: # obj: Optional[Atspi.Accessible] + def _script_for_role(self, obj: Optional[Atspi.Accessible]) -> str: if _get_ax_utilities().is_terminal(obj): return 'terminal' return '' - def _new_named_script(self, app: Optional[Any], name: Optional[str]) -> Optional[Any]: # Returns Optional[Script] + def _new_named_script(self, app: Optional[Atspi.Accessible], name: Optional[str]) -> Optional[Script]: """Attempts to locate and load the named module. If successful, returns a script based on this module.""" @@ -189,7 +201,7 @@ class ScriptManager: return script - def _create_script(self, app: Optional[Any], obj: Optional[Any] = None) -> Any: # Returns Script + def _create_script(self, app: Optional[Atspi.Accessible], obj: Optional[Atspi.Accessible] = None) -> Script: """For the given application, create a new script instance.""" moduleName = self.get_module_name(app) @@ -212,7 +224,7 @@ class ScriptManager: return script - def get_default_script(self, app: Optional[Any] = None) -> Any: # Returns Script + def get_default_script(self, app: Optional[Atspi.Accessible] = None) -> Script: if not app and self._defaultScript: return self._defaultScript @@ -224,7 +236,7 @@ class ScriptManager: return script - def sanity_check_script(self, script: Any) -> Any: # Returns Script + def sanity_check_script(self, script: Script) -> Script: if not self._active: return script @@ -238,7 +250,8 @@ class ScriptManager: _log_tokens(["Failed to get a replacement script for", script.app], "replacement-missing") return script - def get_script_for_mouse_button_event(self, event: Any) -> Any: # Returns Script + def get_script_for_mouse_button_event(self, event: Any) -> Script: + # Note: event type unspecified in original code, likely InputEvent or similar isActive = _get_ax_utilities().is_active(cthulhu_state.activeWindow) _log_tokens([cthulhu_state.activeWindow, "is active:", isActive]) @@ -256,10 +269,10 @@ class ScriptManager: return self.get_script(AXObject.get_application(activeWindow), activeWindow) - def get_active_script(self) -> Optional[Any]: # Returns Optional[Script] + def get_active_script(self) -> Optional[Script]: return cthulhu_state.activeScript - def get_script(self, app: Optional[Any], obj: Optional[Any] = None, sanity_check: bool = False) -> Any: # Returns Script + def get_script(self, app: Optional[Atspi.Accessible], obj: Optional[Atspi.Accessible] = None, sanity_check: bool = False) -> Script: """Get a script for an app (and make it if necessary). This is used instead of a simple calls to Script's constructor. @@ -275,21 +288,24 @@ class ScriptManager: roleName = self._script_for_role(obj) if roleName: - customScripts = self.customScripts.get(app, {}) + customScripts = self.customScripts.get(app, {}) # type: ignore customScript = customScripts.get(roleName) if not customScript: customScript = self._new_named_script(app, roleName) - customScripts[roleName] = customScript - self.customScripts[app] = customScripts + if customScript: + customScripts[roleName] = customScript + if app: + self.customScripts[app] = customScripts objToolkit = self._toolkit_for_object(obj) if objToolkit: - toolkitScripts = self.toolkitScripts.get(app, {}) + toolkitScripts = self.toolkitScripts.get(app, {}) # type: ignore toolkitScript = toolkitScripts.get(objToolkit) if not toolkitScript: toolkitScript = self._create_script(app, obj) toolkitScripts[objToolkit] = toolkitScript - self.toolkitScripts[app] = toolkitScripts + if app: + self.toolkitScripts[app] = toolkitScripts try: if not app: @@ -317,7 +333,7 @@ class ScriptManager: return appScript - def get_or_create_sleep_mode_script(self, app: Any) -> Any: # Returns Script + def get_or_create_sleep_mode_script(self, app: Atspi.Accessible) -> Script: """Gets or creates the sleep mode script.""" script = self._sleepModeScripts.get(app) if script is not None: @@ -329,7 +345,7 @@ class ScriptManager: self._sleepModeScripts[app] = script return script - def set_active_script(self, newScript: Optional[Any], reason: Optional[str] = None) -> None: # newScript: Optional[Script] + def set_active_script(self, newScript: Optional[Script], reason: Optional[str] = None) -> None: """Set the new active script. Arguments: @@ -351,12 +367,13 @@ class ScriptManager: # Emit signal that active script has changed, so PluginSystemManager can update keybindings from . import cthulhu - cthulhu.cthulhuApp.getSignalManager().emitSignal('active-script-changed', newScript) + if cthulhu.cthulhuApp: + cthulhu.cthulhuApp.getSignalManager().emitSignal('active-script-changed', newScript) _log_tokens(["Setting active script to", newScript], reason) self._log_active_state(reason) - def activate_script_for_context(self, app: Optional[Any], obj: Optional[Any], reason: Optional[str] = None) -> Any: # Returns Script + def activate_script_for_context(self, app: Optional[Atspi.Accessible], obj: Optional[Atspi.Accessible], reason: Optional[str] = None) -> Script: script = self.get_script(app, obj) self.set_active_script(script, reason) return script @@ -369,7 +386,7 @@ class ScriptManager: reason ) - def _get_script_for_app_replicant(self, app: Any) -> Optional[Any]: # Returns Optional[Script] + def _get_script_for_app_replicant(self, app: Atspi.Accessible) -> Optional[Script]: if not self._active: return None @@ -439,11 +456,12 @@ class ScriptManager: _manager: Optional[ScriptManager] = None -def get_manager() -> ScriptManager: +def get_manager() -> Optional[ScriptManager]: """Returns the Script Manager""" global _manager if _manager is None: from . import cthulhu - _manager = ScriptManager(cthulhu.cthulhuApp) + if cthulhu.cthulhuApp: + _manager = ScriptManager(cthulhu.cthulhuApp) return _manager diff --git a/src/cthulhu/settings_manager.py b/src/cthulhu/settings_manager.py index 40f9b88..3c6e1bb 100644 --- a/src/cthulhu/settings_manager.py +++ b/src/cthulhu/settings_manager.py @@ -26,6 +26,8 @@ """Settings manager module. This will load/save user settings from a defined settings backend.""" +from __future__ import annotations + __id__ = "$Id$" __version__ = "$Revision$" __date__ = "$Date$" @@ -36,6 +38,8 @@ import copy import importlib import json import os +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union + from gi.repository import Gio, GLib from . import debug @@ -46,6 +50,11 @@ from .acss import ACSS from .ax_object import AXObject from .keybindings import KeyBinding +if TYPE_CHECKING: + from .cthulhu import Cthulhu + from .script import Script + from .input_event import InputEventHandler + # Removed global cthulhuApp.scriptManager declaration. # Note: Do not import cthulhu module here to avoid circular import @@ -53,14 +62,14 @@ class SettingsManager(object): """Settings backend manager. This class manages cthulhu user's settings using different backends""" - def __init__(self, app, backend='json'): # Modified signature + def __init__(self, app: Cthulhu, backend: str = 'json') -> None: # Modified signature debug.printMessage(debug.LEVEL_INFO, 'SETTINGS MANAGER: Initializing', True) - self.app = app # Store app instance + self.app: Cthulhu = app # Store app instance # Move _proxy initialization here try: - self._proxy = Gio.DBusProxy.new_for_bus_sync( + self._proxy: Optional[Gio.DBusProxy] = Gio.DBusProxy.new_for_bus_sync( Gio.BusType.SESSION, Gio.DBusProxyFlags.NONE, None, @@ -71,51 +80,51 @@ class SettingsManager(object): except Exception: self._proxy = None - self.backendModule = None - self._backend = None - self.profile = None - self.backendName = backend - self._prefsDir = None + self.backendModule: Optional[Any] = None + self._backend: Optional[Any] = None + self.profile: Optional[str] = None + self.backendName: str = backend + self._prefsDir: Optional[str] = None # Dictionaries for store the default values # The keys and values are defined at cthulhu.settings # - self.defaultGeneral = {} - self.defaultPronunciations = {} - self.defaultKeybindings = {} + self.defaultGeneral: Dict[str, Any] = {} + self.defaultPronunciations: Dict[str, Any] = {} + self.defaultKeybindings: Dict[str, Any] = {} # Dictionaries that store the key:value pairs which values are # different from the current profile and the default ones # - self.profileGeneral = {} - self.profilePronunciations = {} - self.profileKeybindings = {} + self.profileGeneral: Dict[str, Any] = {} + self.profilePronunciations: Dict[str, Any] = {} + self.profileKeybindings: Dict[str, Any] = {} # Dictionaries that store the current settings. # They are result to overwrite the default values with # the ones from the current active profile - self.general = {} - self.pronunciations = {} - self.keybindings = {} + self.general: Dict[str, Any] = {} + self.pronunciations: Dict[str, Any] = {} + self.keybindings: Dict[str, Any] = {} - self._activeApp = "" - self._appGeneral = {} - self._appPronunciations = {} - self._appKeybindings = {} - self._lastRoleSoundPresentation = None + self._activeApp: str = "" + self._appGeneral: Dict[str, Any] = {} + self._appPronunciations: Dict[str, Any] = {} + self._appKeybindings: Dict[str, Any] = {} + self._lastRoleSoundPresentation: Optional[Any] = None if not self._loadBackend(): raise Exception('SettingsManager._loadBackend failed.') - self.customizedSettings = {} - self._customizationCompleted = False + self.customizedSettings: Dict[str, Any] = {} + self._customizationCompleted: bool = False # For handling the currently-"classic" application settings - self.settingsPackages = ["app-settings"] + self.settingsPackages: List[str] = ["app-settings"] debug.printMessage(debug.LEVEL_INFO, 'SETTINGS MANAGER: Initialized', True) - def activate(self, prefsDir=None, customSettings={}): + def activate(self, prefsDir: Optional[str] = None, customSettings: Dict[str, Any] = {}) -> None: debug.printMessage(debug.LEVEL_INFO, 'SETTINGS MANAGER: Activating', True) self.customizedSettings.update(customSettings) @@ -123,12 +132,13 @@ class SettingsManager(object): or os.path.join(GLib.get_user_data_dir(), "cthulhu") # Load the backend and the default values - self._backend = self.backendModule.Backend(self._prefsDir) + if self.backendModule: + self._backend = self.backendModule.Backend(self._prefsDir) self._setDefaultGeneral() self._setDefaultPronunciations() self._setDefaultKeybindings() self.general = self.defaultGeneral.copy() - if not self.isFirstStart(): + if not self.isFirstStart() and self._backend: self.general.update(self._backend.getGeneral()) self.pronunciations = self.defaultPronunciations.copy() self.keybindings = self.defaultKeybindings.copy() @@ -146,13 +156,14 @@ class SettingsManager(object): debug.printTokens(debug.LEVEL_INFO, tokens, True) if self.profile is None: - self.profile = self.general.get('startingProfile')[1] + self.profile = self.general.get('startingProfile')[1] # type: ignore tokens = ["SETTINGS MANAGER: Current profile is now", self.profile] debug.printTokens(debug.LEVEL_INFO, tokens, True) - self.setProfile(self.profile) + if self.profile: + self.setProfile(self.profile) - def _loadBackend(self): + def _loadBackend(self) -> bool: """Load specific backend for manage user settings""" try: @@ -162,14 +173,17 @@ class SettingsManager(object): except Exception: return False - def _createDefaults(self): + def _createDefaults(self) -> None: """Let the active backend to create the initial structure for storing the settings and save the default ones from cthulhu.settings""" - def _createDir(dirName): + def _createDir(dirName: str) -> None: if not os.path.isdir(dirName): os.makedirs(dirName) + if not self._prefsDir: + return + # Set up the user's preferences directory # ($XDG_DATA_HOME/cthulhu by default). # @@ -200,20 +214,20 @@ class SettingsManager(object): if not os.path.exists(userCustomFile): os.close(os.open(userCustomFile, os.O_CREAT, 0o700)) - if self.isFirstStart(): + if self.isFirstStart() and self._backend: self._backend.saveDefaultSettings(self.defaultGeneral, self.defaultPronunciations, self.defaultKeybindings) - def _setDefaultPronunciations(self): + def _setDefaultPronunciations(self) -> None: """Get the pronunciations by default from cthulhu.settings""" self.defaultPronunciations = {} - def _setDefaultKeybindings(self): + def _setDefaultKeybindings(self) -> None: """Get the keybindings by default from cthulhu.settings""" self.defaultKeybindings = {} - def _setDefaultGeneral(self): + def _setDefaultGeneral(self) -> None: """Get the general settings by default from cthulhu.settings""" self._getCustomizedSettings() self.defaultGeneral = {} @@ -241,7 +255,7 @@ class SettingsManager(object): if default_active_plugins is not None: self.defaultGeneral["activePlugins"] = default_active_plugins - def _load_default_general_overrides(self): + def _load_default_general_overrides(self) -> Dict[str, Any]: if not self._prefsDir: return {} @@ -261,7 +275,7 @@ class SettingsManager(object): if not isinstance(general, dict): return {} - if hasattr(self._backend, "_migrateSettings"): + if self._backend and hasattr(self._backend, "_migrateSettings"): try: general = self._backend._migrateSettings(dict(general)) except Exception as error: @@ -283,12 +297,12 @@ class SettingsManager(object): return general - def getDefaultSetting(self, settingName): + def getDefaultSetting(self, settingName: str) -> Any: if settingName in self.defaultGeneral: return self.defaultGeneral.get(settingName) return getattr(settings, settingName, None) - def _getCustomizedSettings(self): + def _getCustomizedSettings(self) -> Dict[str, Any]: if self._customizationCompleted: return self.customizedSettings @@ -302,20 +316,25 @@ class SettingsManager(object): customValue = settings.__dict__.get(key) if value != customValue: self.customizedSettings[key] = customValue + + return self.customizedSettings - def _loadUserCustomizations(self): + def _loadUserCustomizations(self) -> bool: """Attempt to load the user's cthulhu-customizations. Returns a boolean indicating our success at doing so, where success is measured by the likelihood that the results won't be different if we keep trying.""" success = False + if not self._prefsDir: + return False + pathList = [self._prefsDir] tokens = ["SETTINGS MANAGER: Attempt to load cthulhu-customizations"] module_path = pathList[0] + "/cthulhu-customizations.py" try: spec = importlib.util.spec_from_file_location("cthulhu-customizations", module_path) - if spec is not None: + if spec is not None and spec.loader: module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) tokens.extend(["from", module_path, "succeeded."]) @@ -327,21 +346,21 @@ class SettingsManager(object): except Exception as error: # Treat this failure as a "success" so that we don't stomp on the existing file. success = True - tokens.extend(["failed due to:", error, ". Not loading customizations."]) + tokens.extend(["failed due to:", str(error), ". Not loading customizations."]) debug.printTokens(debug.LEVEL_ALL, tokens, True) return success - def getPrefsDir(self): + def getPrefsDir(self) -> Optional[str]: return self._prefsDir - def setSetting(self, settingName, settingValue): + def setSetting(self, settingName: str, settingValue: Any) -> None: self._setSettingsRuntime({settingName:settingValue}) - def getSetting(self, settingName): + def getSetting(self, settingName: str) -> Any: return getattr(settings, settingName, None) - def getVoiceLocale(self, voice='default'): + def getVoiceLocale(self, voice: str = 'default') -> str: voices = self.getSetting('voices') v = ACSS(voices.get(voice, {})) lang = v.getLocale() @@ -350,7 +369,7 @@ class SettingsManager(object): lang = f"{lang}_{dialect.upper()}" return lang - def getSpeechServerFactories(self): + def getSpeechServerFactories(self) -> List[Any]: """Imports all known SpeechServer factory modules.""" factories = [] @@ -366,7 +385,7 @@ class SettingsManager(object): return factories - def _loadProfileSettings(self, profile=None): + def _loadProfileSettings(self, profile: Optional[str] = None) -> None: """Get from the active backend all the settings for the current profile and store them in the object's attributes. A profile can be passed as a parameter. This could be useful for @@ -377,14 +396,16 @@ class SettingsManager(object): if profile is None: profile = self.profile - self.profileGeneral = self.getGeneralSettings(profile) or {} - self.profilePronunciations = self.getPronunciations(profile) or {} - self.profileKeybindings = self.getKeybindings(profile) or {} + + if profile: + self.profileGeneral = self.getGeneralSettings(profile) or {} + self.profilePronunciations = self.getPronunciations(profile) or {} + self.profileKeybindings = self.getKeybindings(profile) or {} tokens = ["SETTINGS MANAGER: Settings for", profile, "profile loaded"] debug.printTokens(debug.LEVEL_INFO, tokens, True) - def _mergeSettings(self): + def _mergeSettings(self) -> None: """Update the changed values on the profile settings over the current and active settings""" @@ -402,7 +423,7 @@ class SettingsManager(object): msg = 'SETTINGS MANAGER: Settings merged.' debug.printMessage(debug.LEVEL_INFO, msg, True) - def _enableAccessibility(self): + def _enableAccessibility(self) -> bool: """Enables the GNOME accessibility flag. Users need to log out and then back in for this to take effect. @@ -416,22 +437,27 @@ class SettingsManager(object): return not alreadyEnabled - def isAccessibilityEnabled(self): + def isAccessibilityEnabled(self) -> bool: msg = 'SETTINGS MANAGER: Checking if accessibility is enabled.' debug.printMessage(debug.LEVEL_INFO, msg, True) msg = 'SETTINGS MANAGER: Accessibility enabled: ' + rv = False if not self._proxy: rv = False msg += 'Error (no proxy)' else: - rv = self._proxy.Get('(ss)', 'org.a11y.Status', 'IsEnabled') - msg += str(rv) + try: + rv = self._proxy.Get('(ss)', 'org.a11y.Status', 'IsEnabled') + msg += str(rv) + except Exception: + rv = False + msg += 'Error calling DBus' debug.printMessage(debug.LEVEL_INFO, msg, True) return rv - def setAccessibility(self, enable): + def setAccessibility(self, enable: bool) -> Union[bool, None]: msg = f'SETTINGS MANAGER: Attempting to set accessibility to {enable}.' debug.printMessage(debug.LEVEL_INFO, msg, True) @@ -440,38 +466,47 @@ class SettingsManager(object): debug.printMessage(debug.LEVEL_INFO, msg, True) return False - vEnable = GLib.Variant('b', enable) - self._proxy.Set('(ssv)', 'org.a11y.Status', 'IsEnabled', vEnable) + try: + vEnable = GLib.Variant('b', enable) + self._proxy.Set('(ssv)', 'org.a11y.Status', 'IsEnabled', vEnable) + except Exception: + pass msg = f'SETTINGS MANAGER: Finished setting accessibility to {enable}.' debug.printMessage(debug.LEVEL_INFO, msg, True) + return None - def isScreenReaderServiceEnabled(self): + def isScreenReaderServiceEnabled(self) -> bool: """Returns True if the screen reader service is enabled. Note that this does not necessarily mean that Cthulhu (or any other screen reader) is running at the moment.""" msg = 'SETTINGS MANAGER: Is screen reader service enabled? ' + rv = False if not self._proxy: rv = False msg += 'Error (no proxy)' else: - rv = self._proxy.Get('(ss)', 'org.a11y.Status', 'ScreenReaderEnabled') - msg += str(rv) + try: + rv = self._proxy.Get('(ss)', 'org.a11y.Status', 'ScreenReaderEnabled') + msg += str(rv) + except Exception: + rv = False debug.printMessage(debug.LEVEL_INFO, msg, True) return rv - def setStartingProfile(self, profile=None): + def setStartingProfile(self, profile: Optional[tuple] = None) -> None: if profile is None: profile = settings.profile - self._backend._setProfileKey('startingProfile', profile) + if self._backend: + self._backend._setProfileKey('startingProfile', profile) - def getProfile(self): + def getProfile(self) -> Optional[str]: return self.profile - def setProfile(self, profile='default', updateLocale=False): + def setProfile(self, profile: str = 'default', updateLocale: bool = False) -> None: """Set a specific profile as the active one. Also the settings from that profile will be loading and updated the current settings with them.""" @@ -497,10 +532,11 @@ class SettingsManager(object): tokens = ["SETTINGS MANAGER: Profile set to:", profile] debug.printTokens(debug.LEVEL_INFO, tokens, True) - def removeProfile(self, profile): - self._backend.removeProfile(profile) + def removeProfile(self, profile: str) -> None: + if self._backend: + self._backend.removeProfile(profile) - def _setSettingsRuntime(self, settingsDict): + def _setSettingsRuntime(self, settingsDict: Dict[str, Any]) -> None: msg = 'SETTINGS MANAGER: Setting runtime settings.' debug.printMessage(debug.LEVEL_INFO, msg, True) @@ -514,7 +550,7 @@ class SettingsManager(object): debug.printMessage(debug.LEVEL_INFO, msg, True) self._logRoleSoundPresentationChange() - def _logRoleSoundPresentationChange(self): + def _logRoleSoundPresentationChange(self) -> None: current = getattr(settings, "roleSoundPresentation", None) if current == self._lastRoleSoundPresentation: return @@ -526,31 +562,37 @@ class SettingsManager(object): debug.printMessage(debug.LEVEL_INFO, msg, True) self._lastRoleSoundPresentation = current - def _setPronunciationsRuntime(self, pronunciationsDict): + def _setPronunciationsRuntime(self, pronunciationsDict: Dict[str, Any]) -> None: pronunciation_dict.pronunciation_dict = {} for key, value in pronunciationsDict.values(): if key and value: pronunciation_dict.setPronunciation(key, value) - def getGeneralSettings(self, profile='default'): + def getGeneralSettings(self, profile: str = 'default') -> Dict[str, Any]: """Return the current general settings. Those settings comes from updating the default settings with the profiles' ones""" - return self._backend.getGeneral(profile) + if self._backend: + return self._backend.getGeneral(profile) + return {} - def getPronunciations(self, profile='default'): + def getPronunciations(self, profile: str = 'default') -> Dict[str, Any]: """Return the current pronunciations settings. Those settings comes from updating the default settings with the profiles' ones""" - return self._backend.getPronunciations(profile) + if self._backend: + return self._backend.getPronunciations(profile) + return {} - def getKeybindings(self, profile='default'): + def getKeybindings(self, profile: str = 'default') -> Dict[str, Any]: """Return the current keybindings settings. Those settings comes from updating the default settings with the profiles' ones""" - return self._backend.getKeybindings(profile) + if self._backend: + return self._backend.getKeybindings(profile) + return {} - def _setProfileGeneral(self, general): + def _setProfileGeneral(self, general: Dict[str, Any]) -> None: """Set the changed general settings from the defaults' ones as the profile's.""" @@ -572,7 +614,7 @@ class SettingsManager(object): msg = 'SETTINGS MANAGER: General settings for profile set' debug.printMessage(debug.LEVEL_INFO, msg, True) - def _setProfilePronunciations(self, pronunciations): + def _setProfilePronunciations(self, pronunciations: Dict[str, Any]) -> None: """Set the changed pronunciations settings from the defaults' ones as the profile's.""" @@ -585,7 +627,7 @@ class SettingsManager(object): msg = 'SETTINGS MANAGER: Pronunciation settings for profile set.' debug.printMessage(debug.LEVEL_INFO, msg, True) - def _setProfileKeybindings(self, keybindings): + def _setProfileKeybindings(self, keybindings: Dict[str, Any]) -> None: """Set the changed keybindings settings from the defaults' ones as the profile's.""" @@ -598,21 +640,24 @@ class SettingsManager(object): msg = 'SETTINGS MANAGER: Keybindings settings for profile set.' debug.printMessage(debug.LEVEL_INFO, msg, True) - def _saveAppSettings(self, appName, general, pronunciations, keybindings): + def _saveAppSettings(self, appName: str, general: Dict[str, Any], pronunciations: Dict[str, Any], keybindings: Dict[str, Any]) -> None: + if not self._backend: + return + appGeneral = {} - profileGeneral = self.getGeneralSettings(self.profile) + profileGeneral = self.getGeneralSettings(self.profile) if self.profile else {} for key, value in general.items(): if value != profileGeneral.get(key): appGeneral[key] = value appPronunciations = {} - profilePronunciations = self.getPronunciations(self.profile) + profilePronunciations = self.getPronunciations(self.profile) if self.profile else {} for key, value in pronunciations.items(): if value != profilePronunciations.get(key): appPronunciations[key] = value appKeybindings = {} - profileKeybindings = self.getKeybindings(self.profile) + profileKeybindings = self.getKeybindings(self.profile) if self.profile else {} for key, value in keybindings.items(): if value != profileKeybindings.get(key): appKeybindings[key] = value @@ -623,15 +668,17 @@ class SettingsManager(object): appPronunciations, appKeybindings) - def saveSettings(self, script, general, pronunciations, keybindings): + def saveSettings(self, script: Script, general: Dict[str, Any], pronunciations: Dict[str, Any], keybindings: Dict[str, Any]) -> Optional[bool]: """Save the settings provided for the script provided.""" tokens = ["SETTINGS MANAGER: Saving settings for", script, "(app:", script.app, ")"] debug.printTokens(debug.LEVEL_INFO, tokens, True) app = script.app if app: - self._saveAppSettings(AXObject.get_name(app), general, pronunciations, keybindings) - return + appName = AXObject.get_name(app) + if appName: + self._saveAppSettings(appName, general, pronunciations, keybindings) + return None # Assign current profile _profile = general.get('profile', settings.profile) @@ -650,16 +697,17 @@ class SettingsManager(object): tokens = ["SETTINGS MANAGER: Saving for backend", self._backend] debug.printTokens(debug.LEVEL_INFO, tokens, True) - self._backend.saveProfileSettings(self.profile, - self.profileGeneral, - self.profilePronunciations, - self.profileKeybindings) + if self._backend and self.profile: + self._backend.saveProfileSettings(self.profile, + self.profileGeneral, + self.profilePronunciations, + self.profileKeybindings) tokens = ["SETTINGS MANAGER: Settings for", script, "(app:", script.app, ") saved"] debug.printTokens(debug.LEVEL_INFO, tokens, True) return self._enableAccessibility() - def _adjustBindingTupleValues(self, bindingTuple): + def _adjustBindingTupleValues(self, bindingTuple: tuple) -> tuple: """Converts the values of bindingTuple into KeyBinding-ready values.""" keysym, mask, mods, clicks = bindingTuple @@ -670,10 +718,10 @@ class SettingsManager(object): return bindingTuple - def overrideKeyBindings(self, script, scriptKeyBindings): + def overrideKeyBindings(self, script: Script, scriptKeyBindings: Any) -> Any: keybindingsSettings = self.profileKeybindings for handlerString, bindingTuples in keybindingsSettings.items(): - handler = script.inputEventHandlers.get(handlerString) + handler: Optional[InputEventHandler] = script.inputEventHandlers.get(handlerString) if not handler: continue @@ -686,23 +734,27 @@ class SettingsManager(object): return scriptKeyBindings - def isFirstStart(self): + def isFirstStart(self) -> bool: """Check if the firstStart key is True or false""" - return self._backend.isFirstStart() + if self._backend: + return self._backend.isFirstStart() + return False - def setFirstStart(self, value=False): + def setFirstStart(self, value: bool = False) -> None: """Set firstStart. This user-configurable setting is primarily intended to serve as an indication as to whether or not initial configuration is needed.""" - self._backend.setFirstStart(value) + if self._backend: + self._backend.setFirstStart(value) - def availableProfiles(self): + def availableProfiles(self) -> List[str]: """Get available profiles from active backend""" + if self._backend: + return self._backend.availableProfiles() + return [] - return self._backend.availableProfiles() - - def getAppSetting(self, app, settingName, fallbackOnDefault=True): - if not app: + def getAppSetting(self, app: Any, settingName: str, fallbackOnDefault: bool = True) -> Any: + if not app or not self._backend: return None appPrefs = self._backend.getAppSettings(AXObject.get_name(app)) @@ -710,13 +762,13 @@ class SettingsManager(object): profilePrefs = profiles.get(self.profile, {}) general = profilePrefs.get('general', {}) appSetting = general.get(settingName) - if appSetting is None and fallbackOnDefault: + if appSetting is None and fallbackOnDefault and self.profile: general = self._backend.getGeneral(self.profile) appSetting = general.get(settingName) return appSetting - def loadAppSettings(self, script): + def loadAppSettings(self, script: Script) -> None: """Load the users application specific settings for an app. Arguments: @@ -726,8 +778,12 @@ class SettingsManager(object): if not (script and script.app): return - for key in self._appPronunciations.keys(): - self.pronunciations.pop(key) + if not self._backend: + return + + for key in list(self._appPronunciations.keys()): + if key in self.pronunciations: + self.pronunciations.pop(key) prefs = self._backend.getAppSettings(AXObject.get_name(script.app)) profiles = prefs.get('profiles', {}) @@ -736,7 +792,7 @@ class SettingsManager(object): self._appGeneral = profilePrefs.get('general', {}) self._appKeybindings = profilePrefs.get('keybindings', {}) self._appPronunciations = profilePrefs.get('pronunciations', {}) - self._activeApp = AXObject.get_name(script.app) + self._activeApp = AXObject.get_name(script.app) or "" self._loadProfileSettings() self._mergeSettings() @@ -744,9 +800,9 @@ class SettingsManager(object): self._setPronunciationsRuntime(self.pronunciations) script.keyBindings = self.overrideKeyBindings(script, script.getKeyBindings()) -_managerInstance = None +_managerInstance: Optional[SettingsManager] = None -def getManager(): +def getManager() -> Optional[SettingsManager]: """Get the settings manager instance. Compatibility function. This function provides backward compatibility for existing code that uses @@ -759,7 +815,8 @@ def getManager(): if _managerInstance is None: try: from . import cthulhu - _managerInstance = cthulhu.cthulhuApp.settingsManager + if cthulhu.cthulhuApp: + _managerInstance = cthulhu.cthulhuApp.settingsManager except (ImportError, AttributeError): # During import phase, cthulhuApp may not exist yet pass diff --git a/src/cthulhu/speech.py b/src/cthulhu/speech.py index b4d3248..9ddf27c 100644 --- a/src/cthulhu/speech.py +++ b/src/cthulhu/speech.py @@ -26,6 +26,8 @@ """Manages the default speech server for cthulhu. A script can use this as its speech server, or it can feel free to create one of its own.""" +from __future__ import annotations + __id__ = "$Id$" __version__ = "$Revision$" __date__ = "$Date$" @@ -34,6 +36,7 @@ __license__ = "LGPL" import importlib import time +from typing import TYPE_CHECKING, Optional, List, Dict, Any, Union from . import debug from . import logger @@ -46,26 +49,31 @@ from .speechserver import VoiceFamily from .acss import ACSS from . import speech_history -# Lazy initialization to avoid circular imports -_logger = None -log = None +if TYPE_CHECKING: + from .speechserver import SpeechServer + from .logger import Logger -def _ensureLogger(): +# Lazy initialization to avoid circular imports +_logger: Optional[Logger] = None +log: Optional[Any] = None # Logger.newLog returns a log object, keeping Any for now as logger isn't typed + +def _ensureLogger() -> None: """Ensure logger is initialized.""" global _logger, log if _logger is None: from . import cthulhu - _logger = cthulhu.cthulhuApp.logger - log = _logger.newLog("speech") + if cthulhu.cthulhuApp: + _logger = cthulhu.cthulhuApp.logger + log = _logger.newLog("speech") # The speech server to use for all speech operations. # -_speechserver = None +_speechserver: Optional[SpeechServer] = None # The last time something was spoken. -_timestamp = 0 +_timestamp: float = 0.0 -def _initSpeechServer(moduleName, speechServerInfo): +def _initSpeechServer(moduleName: Optional[str], speechServerInfo: Optional[Any]) -> None: global _speechserver @@ -84,19 +92,20 @@ def _initSpeechServer(moduleName, speechServerInfo): # Now, get the speech server we care about. # speechServerInfo = settings.speechServerInfo - if speechServerInfo: - _speechserver = factory.SpeechServer.getSpeechServer(speechServerInfo) - - if not _speechserver: - _speechserver = factory.SpeechServer.getSpeechServer() + if factory: if speechServerInfo: - tokens = ["SPEECH: Invalid speechServerInfo:", speechServerInfo] - debug.printTokens(debug.LEVEL_INFO, tokens, True) + _speechserver = factory.SpeechServer.getSpeechServer(speechServerInfo) # type: ignore + + if not _speechserver: + _speechserver = factory.SpeechServer.getSpeechServer() # type: ignore + if speechServerInfo: + tokens = ["SPEECH: Invalid speechServerInfo:", speechServerInfo] + debug.printTokens(debug.LEVEL_INFO, tokens, True) if not _speechserver: raise Exception(f"ERROR: No speech server for factory: {moduleName}") -def init(): +def init() -> None: debug.printMessage(debug.LEVEL_INFO, 'SPEECH: Initializing', True) if _speechserver: debug.printMessage(debug.LEVEL_INFO, 'SPEECH: Already initialized', True) @@ -134,16 +143,32 @@ def init(): debug.printMessage(debug.LEVEL_INFO, 'SPEECH: Initialized', True) -def checkSpeechSetting(): +def checkSpeechSetting() -> None: msg = "SPEECH: Checking speech setting." debug.printMessage(debug.LEVEL_INFO, msg, True) if not settings.enableSpeech: - shutdown() - else: + if _speechserver: + shutdown() + return + + if not _speechserver: init() -def __resolveACSS(acss=None): +def getSpeechServer() -> Optional[SpeechServer]: + """Returns the speech server instance.""" + return _speechserver + +def setSpeechServer(speechServer: SpeechServer) -> None: + """Sets the speech server to be used. + + Arguments: + - speechServer: the speech server to use. + """ + global _speechserver + _speechserver = speechServer + +def __resolveACSS(acss: Optional[Any] = None) -> ACSS: if isinstance(acss, ACSS): family = acss.get(acss.FAMILY) try: @@ -160,35 +185,36 @@ def __resolveACSS(acss=None): voices = settings.voices return ACSS(voices[settings.DEFAULT_VOICE]) -def sayAll(utteranceIterator, progressCallback): +def sayAll(utteranceIterator: Any, progressCallback: Any) -> None: _ensureLogger() if settings.silenceSpeech: return if _speechserver: - def _speechHistorySayAllWrapper(): + def _speechHistorySayAllWrapper() -> Any: for [context, acss] in utteranceIterator: try: utterance = getattr(context, "utterance", None) if isinstance(utterance, str) and utterance.strip(): - speech_history.add(utterance, source="sayAll") + speech_history.add(utterance, source="sayAll") # type: ignore except Exception: debug.printException(debug.LEVEL_INFO) yield [context, acss] - _speechserver.sayAll(_speechHistorySayAllWrapper(), progressCallback) + _speechserver.sayAll(_speechHistorySayAllWrapper(), progressCallback) # type: ignore else: - for [context, acss] in utteranceIterator: - logLine = f"SPEECH OUTPUT: '{context.utterance}'" - debug.printMessage(debug.LEVEL_INFO, logLine, True) - log.info(logLine) - try: - utterance = getattr(context, "utterance", None) - if isinstance(utterance, str) and utterance.strip(): - speech_history.add(utterance, source="sayAll-fallback") - except Exception: - debug.printException(debug.LEVEL_INFO) + if log: + for [context, acss] in utteranceIterator: + logLine = f"SPEECH OUTPUT: '{context.utterance}'" + debug.printMessage(debug.LEVEL_INFO, logLine, True) + log.info(logLine) + try: + utterance = getattr(context, "utterance", None) + if isinstance(utterance, str) and utterance.strip(): + speech_history.add(utterance, source="sayAll-fallback") # type: ignore + except Exception: + debug.printException(debug.LEVEL_INFO) -def _speak(text, acss, interrupt): +def _speak(text: str, acss: Optional[Any], interrupt: bool) -> None: """Speaks the individual string using the given ACSS.""" _ensureLogger() @@ -198,7 +224,7 @@ def _speak(text, acss, interrupt): from . import sleep_mode_manager if cthulhu_state.activeScript and hasattr(cthulhu_state.activeScript, 'app'): sleepModeManager = sleep_mode_manager.getManager() - if sleepModeManager.isActiveForApp(cthulhu_state.activeScript.app): + if sleepModeManager and sleepModeManager.isActiveForApp(cthulhu_state.activeScript.app): # Allow sleep mode status messages to get through if "Sleep mode enabled" in text or "Sleep mode disabled" in text: debug.printMessage(debug.LEVEL_INFO, f"SPEECH: Allowing sleep mode status: '{text}'", True) @@ -207,14 +233,15 @@ def _speak(text, acss, interrupt): return try: - speech_history.add(text, source="speak") + speech_history.add(text, source="speak") # type: ignore except Exception: debug.printException(debug.LEVEL_INFO) if not _speechserver: logLine = f"SPEECH OUTPUT: '{text}' {acss}" debug.printMessage(debug.LEVEL_INFO, logLine, True) - log.info(logLine) + if log: + log.info(logLine) return voice = ACSS(settings.voices.get(settings.DEFAULT_VOICE)) @@ -227,9 +254,9 @@ def _speak(text, acss, interrupt): resolvedVoice = __resolveACSS(voice) msg = f"SPEECH OUTPUT: '{text}' {resolvedVoice}" debug.printMessage(debug.LEVEL_INFO, msg, True) - _speechserver.speak(text, resolvedVoice, interrupt) + _speechserver.speak(text, resolvedVoice, interrupt) # type: ignore -def speak(content, acss=None, interrupt=True): +def speak(content: Union[str, List[Any]], acss: Optional[Any] = None, interrupt: bool = True) -> None: """Speaks the given content. The content can be either a simple string or an array of arrays of objects returned by a speech generator.""" @@ -242,7 +269,7 @@ def speak(content, acss=None, interrupt=True): from . import sleep_mode_manager if cthulhu_state.activeScript and hasattr(cthulhu_state.activeScript, 'app'): sleepModeManager = sleep_mode_manager.getManager() - if sleepModeManager.isActiveForApp(cthulhu_state.activeScript.app): + if sleepModeManager and sleepModeManager.isActiveForApp(cthulhu_state.activeScript.app): # Allow sleep mode status messages to get through if isinstance(content, str): if "Sleep mode enabled" in content or "Sleep mode disabled" in content: @@ -265,7 +292,7 @@ def speak(content, acss=None, interrupt=True): validTypes = (str, list, speech_generator.Pause, speech_generator.LineBreak, ACSS, Icon) error = "SPEECH: bad content sent to speak(): '%s'" - if not isinstance(content, validTypes): + if not isinstance(content, validTypes): # type: ignore debug.printMessage(debug.LEVEL_INFO, error % content, True) return @@ -288,13 +315,13 @@ def speak(content, acss=None, interrupt=True): return shouldInterrupt = interrupt - toSpeak = [] + toSpeak: List[str] = [] activeVoice = acss if acss is not None: activeVoice = ACSS(acss) for element in content: - if not isinstance(element, validTypes): + if not isinstance(element, validTypes): # type: ignore debug.printMessage(debug.LEVEL_INFO, error % element, True) elif isinstance(element, list): speak(element, acss, shouldInterrupt) @@ -310,7 +337,8 @@ def speak(content, acss=None, interrupt=True): toSpeak = [] if element.isValid(): player = sound.getPlayer() - player.play(element, interrupt=interrupt) + if player: + player.play(element, interrupt=interrupt) elif toSpeak: newVoice = ACSS(acss) newItemsToSpeak = [] @@ -338,7 +366,7 @@ def speak(content, acss=None, interrupt=True): string = " ".join(toSpeak) _speak(string, activeVoice, shouldInterrupt) -def speakKeyEvent(event, acss=None): +def speakKeyEvent(event: Any, acss: Optional[Any] = None) -> None: """Speaks a key event immediately. Arguments: @@ -362,12 +390,13 @@ def speakKeyEvent(event, acss=None): msg = f"{keyname} {lockingStateString}" logLine = f"SPEECH OUTPUT: '{msg.strip()}' {acss}" debug.printMessage(debug.LEVEL_INFO, logLine, True) - log.info(logLine) + if log: + log.info(logLine) if _speechserver: - _speechserver.speakKeyEvent(event, acss) + _speechserver.speakKeyEvent(event, acss) # type: ignore -def speakCharacter(character, acss=None): +def speakCharacter(character: str, acss: Optional[Any] = None) -> None: """Speaks a single character immediately. Arguments: @@ -393,32 +422,30 @@ def speakCharacter(character, acss=None): acss = __resolveACSS(acss) tokens = [f"SPEECH OUTPUT: '{character}'", acss] debug.printTokens(debug.LEVEL_INFO, tokens, True) - log.info(f"SPEECH OUTPUT: '{character}'") + if log: + log.info(f"SPEECH OUTPUT: '{character}'") if _speechserver: - _speechserver.speakCharacter(character, acss=acss) + _speechserver.speakCharacter(character, acss=acss) # type: ignore -def getInfo(): +def getInfo() -> Optional[Any]: info = None if _speechserver: - info = _speechserver.getInfo() + info = _speechserver.getInfo() # type: ignore return info -def stop(): +def stop() -> None: if _speechserver: - _speechserver.stop() + _speechserver.stop() # type: ignore -def shutdown(): +def shutdown() -> None: debug.printMessage(debug.LEVEL_INFO, 'SPEECH: Shutting down', True) global _speechserver if _speechserver: - _speechserver.shutdownActiveServers() + _speechserver.shutdownActiveServers() # type: ignore _speechserver = None -def reset(text=None, acss=None): +def reset(text: Optional[str] = None, acss: Optional[Any] = None) -> None: if _speechserver: - _speechserver.reset(text, acss) - -def getSpeechServer(): - return _speechserver + _speechserver.reset(text, acss) # type: ignore diff --git a/src/cthulhu/structural_navigation.py b/src/cthulhu/structural_navigation.py index 49998fc..9764d4c 100644 --- a/src/cthulhu/structural_navigation.py +++ b/src/cthulhu/structural_navigation.py @@ -1487,9 +1487,13 @@ class StructuralNavigation: def _chunkCriteria(self, arg=None): return AXCollection.create_match_rule(roles=self.OBJECT_ROLES + self.CONTAINER_ROLES) - def _chunkPredicate(self, obj, arg=None): + def _chunkPredicate(self, obj: Atspi.Accessible, arg=None) -> bool: if AXUtilities.is_heading(obj): return True + if AXUtilities.is_list(obj): + return True + if AXUtilities.is_table(obj): + return True text = self._script.utilities.queryNonEmptyText(obj) if not text: