More type hints added.

This commit is contained in:
Storm Dragon
2026-01-20 16:34:18 -05:00
parent 75b2d482eb
commit fa77d76aa5
7 changed files with 520 additions and 363 deletions
+53 -23
View File
@@ -36,13 +36,26 @@ __copyright__ = "Copyright (c) 2004-2009 Sun Microsystems Inc." \
__license__ = "LGPL" __license__ = "LGPL"
import faulthandler import faulthandler
from typing import TYPE_CHECKING, Any, Callable, Optional from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union
from . import dbus_service from . import dbus_service
if TYPE_CHECKING: if TYPE_CHECKING:
from types import FrameType from types import FrameType
from gi.repository.Gio import Settings as GSettings from gi.repository.Gio import Settings as GSettings
from gi.repository import Gtk
from .settings_manager import SettingsManager
from .script_manager import ScriptManager
from .plugin_system_manager import PluginSystemManager
from .input_event import InputEvent
from .input_event_manager import InputEventManager
from .event_manager import EventManager
from .signal_manager import SignalManager
from .dynamic_api_manager import DynamicApiManager
from .speech import Speech
from .braille import Braille
from .script import Script
class APIHelper: class APIHelper:
"""Helper class for plugin API interactions, including keybindings.""" """Helper class for plugin API interactions, including keybindings."""
@@ -54,7 +67,7 @@ class APIHelper:
- app: the Cthulhu application - app: the Cthulhu application
""" """
self.app: Cthulhu = app self.app: Cthulhu = app
self._gestureBindings: dict[Optional[str], list[Any]] = {} self._gestureBindings: Dict[Optional[str], List[Any]] = {}
def registerGestureByString( def registerGestureByString(
self, self,
@@ -914,7 +927,7 @@ def main() -> int:
class Cthulhu(GObject.Object): class Cthulhu(GObject.Object):
# basic signals # basic signals
__gsignals__: dict[str, tuple[Any, ...]] = { __gsignals__: Dict[str, Tuple[Any, ...]] = {
"start-application-completed": (GObject.SignalFlags.RUN_LAST, None, ()), "start-application-completed": (GObject.SignalFlags.RUN_LAST, None, ()),
"stop-application-completed": (GObject.SignalFlags.RUN_LAST, None, ()), "stop-application-completed": (GObject.SignalFlags.RUN_LAST, None, ()),
"load-setting-begin": (GObject.SignalFlags.RUN_LAST, None, ()), "load-setting-begin": (GObject.SignalFlags.RUN_LAST, None, ()),
@@ -927,52 +940,69 @@ class Cthulhu(GObject.Object):
def __init__(self) -> None: def __init__(self) -> None:
GObject.Object.__init__(self) GObject.Object.__init__(self)
# add members # add members
self.resourceManager: Any = resource_manager.ResourceManager(self) self.resourceManager: resource_manager.ResourceManager = resource_manager.ResourceManager(self)
self.settingsManager: Any = settings_manager.SettingsManager(self) # Directly instantiate self.settingsManager: SettingsManager = settings_manager.SettingsManager(self) # Directly instantiate
self.eventManager: Any = event_manager.EventManager(self) # Directly instantiate self.eventManager: EventManager = event_manager.EventManager(self) # Directly instantiate
self.scriptManager: Any = script_manager.ScriptManager(self) # Directly instantiate self.scriptManager: ScriptManager = script_manager.ScriptManager(self) # Directly instantiate
self.logger: Any = logger.Logger() # Directly instantiate self.logger: logger.Logger = logger.Logger() # Directly instantiate
self.signalManager: Any = signal_manager.SignalManager(self) self.signalManager: SignalManager = signal_manager.SignalManager(self)
self.dynamicApiManager: Any = dynamic_api_manager.DynamicApiManager(self) self.dynamicApiManager: DynamicApiManager = dynamic_api_manager.DynamicApiManager(self)
self.translationManager: Any = translation_manager.TranslationManager(self) self.translationManager: TranslationManager = translation_manager.TranslationManager(self)
self.debugManager: Any = debug self.debugManager: Any = debug
self.APIHelper: APIHelper = APIHelper(self) self.APIHelper: APIHelper = APIHelper(self)
self.createCompatAPI() self.createCompatAPI()
self.pluginSystemManager: Any = plugin_system_manager.PluginSystemManager(self) self.pluginSystemManager: PluginSystemManager = plugin_system_manager.PluginSystemManager(self)
# Scan for available plugins at startup # Scan for available plugins at startup
self.pluginSystemManager.rescanPlugins() self.pluginSystemManager.rescanPlugins()
def getAPIHelper(self) -> APIHelper: def getAPIHelper(self) -> APIHelper:
return self.APIHelper return self.APIHelper
def getPluginSystemManager(self) -> Any:
def getPluginSystemManager(self) -> PluginSystemManager:
return self.pluginSystemManager return self.pluginSystemManager
def getDynamicApiManager(self) -> Any:
def getDynamicApiManager(self) -> DynamicApiManager:
return self.dynamicApiManager return self.dynamicApiManager
def getSignalManager(self) -> Any:
def getSignalManager(self) -> SignalManager:
return self.signalManager return self.signalManager
def getEventManager(self) -> Any:
def getEventManager(self) -> EventManager:
return self.eventManager return self.eventManager
def getSettingsManager(self) -> Any:
def getSettingsManager(self) -> SettingsManager:
return self.settingsManager return self.settingsManager
def getScriptManager(self) -> Any:
def getScriptManager(self) -> ScriptManager:
return self.scriptManager return self.scriptManager
def get_scriptManager(self) -> Any:
def get_scriptManager(self) -> ScriptManager:
return self.scriptManager return self.scriptManager
def getDebugManager(self) -> Any: def getDebugManager(self) -> Any:
return self.debugManager return self.debugManager
def getTranslationManager(self) -> Any:
def getTranslationManager(self) -> TranslationManager:
return self.translationManager return self.translationManager
def getResourceManager(self) -> Any:
def getResourceManager(self) -> resource_manager.ResourceManager:
return self.resourceManager return self.resourceManager
def getLogger(self) -> Any: # New getter for the logger
def getLogger(self) -> logger.Logger: # New getter for the logger
return self.logger return self.logger
def addKeyGrab(self, binding: Any) -> list[int]:
def addKeyGrab(self, binding: Any) -> List[int]:
return addKeyGrab(binding) return addKeyGrab(binding)
def removeKeyGrab(self, grab_id: int) -> None: def removeKeyGrab(self, grab_id: int) -> None:
return removeKeyGrab(grab_id) return removeKeyGrab(grab_id)
def run(self, cacheValues: bool = True) -> int: def run(self, cacheValues: bool = True) -> int:
return main() return main()
def stop(self) -> None: def stop(self) -> None:
pass pass
def createCompatAPI(self) -> None: def createCompatAPI(self) -> None:
# for now add compatibility layer using Dynamic API # for now add compatibility layer using Dynamic API
# should be removed step by step # should be removed step by step
+52 -41
View File
@@ -23,6 +23,8 @@
# Forked from Orca screen reader. # Forked from Orca screen reader.
# Cthulhu project: https://git.stormux.org/storm/cthulhu # Cthulhu project: https://git.stormux.org/storm/cthulhu
from __future__ import annotations
__id__ = "$Id$" __id__ = "$Id$"
__version__ = "$Revision$" __version__ = "$Revision$"
__date__ = "$Date$" __date__ = "$Date$"
@@ -36,7 +38,7 @@ from gi.repository import GLib
import queue import queue
import threading import threading
import time import time
from typing import Optional, Dict, List, Tuple, Any from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any, Union
from . import cthulhu from . import cthulhu
from . import debug from . import debug
@@ -48,21 +50,26 @@ from . import settings
from .ax_object import AXObject from .ax_object import AXObject
from .ax_utilities import AXUtilities from .ax_utilities import AXUtilities
if TYPE_CHECKING:
from .cthulhu import Cthulhu
from .script import Script
from .input_event_manager import InputEventManager
class EventManager: class EventManager:
EMBEDDED_OBJECT_CHARACTER: str = '\ufffc' EMBEDDED_OBJECT_CHARACTER: str = '\ufffc'
def __init__(self, app: Any, asyncMode: bool = True) -> None: # app is CthulhuApp instance def __init__(self, app: Cthulhu, asyncMode: bool = True) -> None:
debug.printMessage(debug.LEVEL_INFO, 'EVENT MANAGER: Initializing', True) debug.printMessage(debug.LEVEL_INFO, 'EVENT MANAGER: Initializing', True)
debug.printMessage(debug.LEVEL_INFO, f'EVENT MANAGER: Async Mode is {asyncMode}', True) debug.printMessage(debug.LEVEL_INFO, f'EVENT MANAGER: Async Mode is {asyncMode}', True)
self.app: Any = app self.app: Cthulhu = app
self._asyncMode: bool = asyncMode self._asyncMode: bool = asyncMode
self._scriptListenerCounts: Dict[str, int] = {} self._scriptListenerCounts: Dict[str, int] = {}
self._active: bool = False self._active: bool = False
self._enqueueCount: int = 0 self._enqueueCount: int = 0
self._dequeueCount: int = 0 self._dequeueCount: int = 0
self._cmdlineCache: Dict[int, str] = {} self._cmdlineCache: Dict[int, str] = {}
self._eventQueue: queue.Queue = queue.Queue(0) self._eventQueue: queue.Queue[Any] = queue.Queue(0)
self._gidleId: int = 0 self._gidleId: int = 0
self._gidleLock: threading.Lock = threading.Lock() self._gidleLock: threading.Lock = threading.Lock()
self._gilSleepTime: float = 0.00001 self._gilSleepTime: float = 0.00001
@@ -81,15 +88,15 @@ class EventManager:
'object:state-changed:sensitive', 'object:state-changed:sensitive',
'object:state-changed:showing', 'object:state-changed:showing',
'object:text-changed:delete'] 'object:text-changed:delete']
self._eventsTriggeringSuspension: List[Any] = [] # List of events self._eventsTriggeringSuspension: List[Atspi.Event] = []
self._ignoredEvents: List[str] = ['object:bounds-changed', self._ignoredEvents: List[str] = ['object:bounds-changed',
'object:state-changed:defunct', 'object:state-changed:defunct',
'object:property-change:accessible-parent'] 'object:property-change:accessible-parent']
self._parentsOfDefunctDescendants: List[Any] = [] # List[Atspi.Accessible] self._parentsOfDefunctDescendants: List[Atspi.Accessible] = []
cthulhu_state.device = None cthulhu_state.device = None
self._keyHandlingActive: bool = False self._keyHandlingActive: bool = False
self._inputEventManager: Optional[Any] = None # Optional[InputEventManager] self._inputEventManager: Optional[InputEventManager] = None
debug.printMessage(debug.LEVEL_INFO, 'Event manager initialized', True) debug.printMessage(debug.LEVEL_INFO, 'Event manager initialized', True)
@@ -174,7 +181,7 @@ class EventManager:
if eventType in self._ignoredEvents: if eventType in self._ignoredEvents:
self._ignoredEvents.remove(eventType) self._ignoredEvents.remove(eventType)
def _isDuplicateEvent(self, event: Any) -> bool: # event: Atspi.Event def _isDuplicateEvent(self, event: Atspi.Event) -> bool:
"""Returns True if this event is already in the event queue.""" """Returns True if this event is already in the event queue."""
if self._inFlood() and self._prioritizeDuringFlood(event): if self._inFlood() and self._prioritizeDuringFlood(event):
@@ -193,7 +200,7 @@ class EventManager:
return False return False
def _getAppCmdline(self, app: Any) -> str: # app: Atspi.Accessible def _getAppCmdline(self, app: Atspi.Accessible) -> str:
pid = AXObject.get_process_id(app) pid = AXObject.get_process_id(app)
if pid == -1: if pid == -1:
return "" return ""
@@ -204,7 +211,7 @@ class EventManager:
self._cmdlineCache[pid] = cmdline self._cmdlineCache[pid] = cmdline
return cmdline return cmdline
def _isSteamApp(self, app: Any) -> bool: # app: Atspi.Accessible def _isSteamApp(self, app: Atspi.Accessible) -> bool:
name = AXObject.get_name(app) name = AXObject.get_name(app)
if not name: if not name:
nameLower = "" nameLower = ""
@@ -217,7 +224,7 @@ class EventManager:
cmdline = self._getAppCmdline(app) cmdline = self._getAppCmdline(app)
return "steamwebhelper" in cmdline return "steamwebhelper" in cmdline
def _isSteamNotificationEvent(self, event: Any) -> bool: # event: Atspi.Event def _isSteamNotificationEvent(self, event: Atspi.Event) -> bool:
for obj in (event.any_data, event.source): for obj in (event.any_data, event.source):
if not isinstance(obj, Atspi.Accessible): if not isinstance(obj, Atspi.Accessible):
continue continue
@@ -230,7 +237,7 @@ class EventManager:
return False return False
def _ignore(self, event: Any) -> bool: # event: Atspi.Event def _ignore(self, event: Atspi.Event) -> bool:
"""Returns True if this event should be ignored.""" """Returns True if this event should be ignored."""
app = AXObject.get_application(event.source) app = AXObject.get_application(event.source)
@@ -238,19 +245,19 @@ class EventManager:
tokens = ["EVENT MANAGER:", event.type, "from", app] tokens = ["EVENT MANAGER:", event.type, "from", app]
debug.printTokens(debug.LEVEL_INFO, tokens, True) debug.printTokens(debug.LEVEL_INFO, tokens, True)
def _log_ignore(reason, message): def _log_ignore(reason: str, message: str) -> None:
debug.print_log(debug.LEVEL_INFO, "EVENT MANAGER", debug.print_log(debug.LEVEL_INFO, "EVENT MANAGER",
f"Ignoring - {message}", reason, True) f"Ignoring - {message}", reason, True)
def _log_allow(reason, message): def _log_allow(reason: str, message: str) -> None:
debug.print_log(debug.LEVEL_INFO, "EVENT MANAGER", debug.print_log(debug.LEVEL_INFO, "EVENT MANAGER",
f"Not ignoring - {message}", reason, True) f"Not ignoring - {message}", reason, True)
def _ignore_with_reason(reason, message): def _ignore_with_reason(reason: str, message: str) -> bool:
_log_ignore(reason, message) _log_ignore(reason, message)
return True return True
def _allow_with_reason(reason, message): def _allow_with_reason(reason: str, message: str) -> bool:
_log_allow(reason, message) _log_allow(reason, message)
return False return False
@@ -430,7 +437,7 @@ class EventManager:
return _allow_with_reason("no-cause", "no ignore condition met") return _allow_with_reason("no-cause", "no ignore condition met")
def _addToQueue(self, event, asyncMode): def _addToQueue(self, event: Any, asyncMode: bool) -> None:
debugging = debug.debugEventQueue debugging = debug.debugEventQueue
if debugging: if debugging:
debug.printMessage(debug.LEVEL_ALL, " acquiring lock...") debug.printMessage(debug.LEVEL_ALL, " acquiring lock...")
@@ -457,7 +464,7 @@ class EventManager:
if debug.debugEventQueue: if debug.debugEventQueue:
debug.printMessage(debug.LEVEL_ALL, " ...released") debug.printMessage(debug.LEVEL_ALL, " ...released")
def _queuePrintln(self, e, isEnqueue=True, isPrune=None): def _queuePrintln(self, e: Any, isEnqueue: bool = True, isPrune: Optional[bool] = None) -> None:
"""Convenience method to output queue-related debugging info.""" """Convenience method to output queue-related debugging info."""
if debug.LEVEL_INFO < debug.debugLevel: if debug.LEVEL_INFO < debug.debugLevel:
@@ -483,7 +490,7 @@ class EventManager:
tokens[0:0] = ["EVENT MANAGER: Dequeued"] tokens[0:0] = ["EVENT MANAGER: Dequeued"]
debug.printTokens(debug.LEVEL_INFO, tokens, True) debug.printTokens(debug.LEVEL_INFO, tokens, True)
def _suspendEvents(self, triggeringEvent): def _suspendEvents(self, triggeringEvent: Atspi.Event) -> None:
self._eventsTriggeringSuspension.append(triggeringEvent) self._eventsTriggeringSuspension.append(triggeringEvent)
if self._eventsSuspended: if self._eventsSuspended:
@@ -499,7 +506,7 @@ class EventManager:
self._eventsSuspended = True self._eventsSuspended = True
def _unsuspendEvents(self, triggeringEvent, force=False): def _unsuspendEvents(self, triggeringEvent: Atspi.Event, force: bool = False) -> None:
if triggeringEvent in self._eventsTriggeringSuspension: if triggeringEvent in self._eventsTriggeringSuspension:
self._eventsTriggeringSuspension.remove(triggeringEvent) self._eventsTriggeringSuspension.remove(triggeringEvent)
@@ -521,7 +528,7 @@ class EventManager:
self._eventsSuspended = False self._eventsSuspended = False
def _shouldSuspendEventsFor(self, event): def _shouldSuspendEventsFor(self, event: Atspi.Event) -> bool:
if AXUtilities.is_frame(event.source) \ if AXUtilities.is_frame(event.source) \
or (AXUtilities.is_window(event.source) \ or (AXUtilities.is_window(event.source) \
and AXUtilities.get_application_toolkit_name(event.source) == "clutter"): and AXUtilities.get_application_toolkit_name(event.source) == "clutter"):
@@ -541,7 +548,7 @@ class EventManager:
return False return False
def _shouldUnsuspendEventsFor(self, event): def _shouldUnsuspendEventsFor(self, event: Atspi.Event) -> bool:
if event.type.startswith("object:state-changed:focused") and event.detail1: if event.type.startswith("object:state-changed:focused") and event.detail1:
msg = "EVENT MANAGER: Should unsuspend events for newly-focused object." msg = "EVENT MANAGER: Should unsuspend events for newly-focused object."
debug.printMessage(debug.LEVEL_INFO, msg, True) debug.printMessage(debug.LEVEL_INFO, msg, True)
@@ -559,10 +566,10 @@ class EventManager:
return False return False
def _didSuspendEventsFor(self, event): def _didSuspendEventsFor(self, event: Atspi.Event) -> bool:
return event in self._eventsTriggeringSuspension return event in self._eventsTriggeringSuspension
def _enqueue(self, e): def _enqueue(self, e: Atspi.Event) -> None:
"""Handles the enqueueing of all events destined for scripts. """Handles the enqueueing of all events destined for scripts.
Arguments: Arguments:
@@ -621,7 +628,7 @@ class EventManager:
if debug.debugEventQueue: if debug.debugEventQueue:
self._enqueueCount -= 1 self._enqueueCount -= 1
def _isNoFocus(self): def _isNoFocus(self) -> bool:
if cthulhu_state.locusOfFocus or cthulhu_state.activeWindow or cthulhu_state.activeScript: if cthulhu_state.locusOfFocus or cthulhu_state.activeWindow or cthulhu_state.activeScript:
return False return False
@@ -629,7 +636,7 @@ class EventManager:
debug.printMessage(debug.LEVEL_SEVERE, msg, True) debug.printMessage(debug.LEVEL_SEVERE, msg, True)
return True return True
def _onNoFocus(self): def _onNoFocus(self) -> bool:
if not self._isNoFocus(): if not self._isNoFocus():
return False return False
@@ -638,7 +645,7 @@ class EventManager:
defaultScript.idleMessage() defaultScript.idleMessage()
return False return False
def _dequeue(self): def _dequeue(self) -> bool:
"""Handles all events destined for scripts. Called by the GTK """Handles all events destined for scripts. Called by the GTK
idle thread.""" idle thread."""
@@ -736,7 +743,7 @@ class EventManager:
self._listener.deregister(eventType) self._listener.deregister(eventType)
del self._scriptListenerCounts[eventType] del self._scriptListenerCounts[eventType]
def registerScriptListeners(self, script: Any) -> None: # script: Script def registerScriptListeners(self, script: Script) -> None:
"""Tells the event manager to start listening for all the event types """Tells the event manager to start listening for all the event types
of interest to the script. of interest to the script.
@@ -750,7 +757,7 @@ class EventManager:
for eventType in script.listeners.keys(): for eventType in script.listeners.keys():
self.registerListener(eventType) self.registerListener(eventType)
def deregisterScriptListeners(self, script: Any) -> None: # script: Script def deregisterScriptListeners(self, script: Script) -> None:
"""Tells the event manager to stop listening for all the event types """Tells the event manager to stop listening for all the event types
of interest to the script. of interest to the script.
@@ -764,7 +771,7 @@ class EventManager:
for eventType in script.listeners.keys(): for eventType in script.listeners.keys():
self.deregisterListener(eventType) self.deregisterListener(eventType)
def _processInputEvent(self, event): def _processInputEvent(self, event: Any) -> None:
"""Processes the given input event based on the keybinding from the """Processes the given input event based on the keybinding from the
currently-active script. currently-active script.
@@ -798,7 +805,7 @@ class EventManager:
debug.printMessage(debug.eventDebugLevel, msg, False) debug.printMessage(debug.eventDebugLevel, msg, False)
@staticmethod @staticmethod
def _get_scriptForEvent(event: Any) -> Optional[Any]: # Returns Optional[Script] def _get_scriptForEvent(event: Any) -> Optional[Script]:
"""Returns the script associated with event.""" """Returns the script associated with event."""
if event.type.startswith("mouse:"): if event.type.startswith("mouse:"):
@@ -836,7 +843,7 @@ class EventManager:
debug.printTokens(debug.LEVEL_INFO, tokens, True) debug.printTokens(debug.LEVEL_INFO, tokens, True)
return script return script
def _isActivatableEvent(self, event: Any, script: Optional[Any] = None) -> Tuple[bool, str]: def _isActivatableEvent(self, event: Atspi.Event, script: Optional[Script] = None) -> Tuple[bool, str]:
"""Determines if the event is one which should cause us to """Determines if the event is one which should cause us to
change which script is currently active. change which script is currently active.
@@ -902,7 +909,7 @@ class EventManager:
return False, "No reason found to activate a different script." return False, "No reason found to activate a different script."
def _eventSourceIsDead(self, event: Any) -> bool: # event: Atspi.Event def _eventSourceIsDead(self, event: Atspi.Event) -> bool:
if AXObject.is_dead(event.source): if AXObject.is_dead(event.source):
tokens = ["EVENT MANAGER: source of", event.type, "is dead"] tokens = ["EVENT MANAGER: source of", event.type, "is dead"]
debug.printTokens(debug.LEVEL_INFO, tokens, True) debug.printTokens(debug.LEVEL_INFO, tokens, True)
@@ -910,7 +917,7 @@ class EventManager:
return False return False
def _ignoreDuringDeluge(self, event: Any) -> bool: # event: Atspi.Event def _ignoreDuringDeluge(self, event: Atspi.Event) -> bool:
"""Returns true if this event should be ignored during a deluge.""" """Returns true if this event should be ignored during a deluge."""
if self._eventSourceIsDead(event): if self._eventSourceIsDead(event):
@@ -946,7 +953,7 @@ class EventManager:
return False return False
def _processDuringFlood(self, event: Any) -> bool: # event: Atspi.Event def _processDuringFlood(self, event: Atspi.Event) -> bool:
"""Returns true if this event should be processed during a flood.""" """Returns true if this event should be processed during a flood."""
if self._eventSourceIsDead(event): if self._eventSourceIsDead(event):
@@ -973,7 +980,7 @@ class EventManager:
return event.source == cthulhu_state.locusOfFocus return event.source == cthulhu_state.locusOfFocus
def _prioritizeDuringFlood(self, event: Any) -> bool: # event: Atspi.Event def _prioritizeDuringFlood(self, event: Atspi.Event) -> bool:
"""Returns true if this event should be prioritized during a flood.""" """Returns true if this event should be prioritized during a flood."""
if event.type.startswith("object:state-changed:focused"): if event.type.startswith("object:state-changed:focused"):
@@ -1007,7 +1014,7 @@ class EventManager:
oldSize = self._eventQueue.qsize() oldSize = self._eventQueue.qsize()
newQueue = queue.Queue(0) newQueue: queue.Queue[Any] = queue.Queue(0)
while not self._eventQueue.empty(): while not self._eventQueue.empty():
try: try:
event = self._eventQueue.get() event = self._eventQueue.get()
@@ -1034,7 +1041,7 @@ class EventManager:
return False return False
def _processObjectEvent(self, event): def _processObjectEvent(self, event: Atspi.Event) -> None:
"""Handles all object events destined for scripts. """Handles all object events destined for scripts.
Arguments: Arguments:
@@ -1133,7 +1140,7 @@ class EventManager:
msg = f"EVENT MANAGER: {key}: {value}" msg = f"EVENT MANAGER: {key}: {value}"
debug.printMessage(debug.LEVEL_INFO, msg, True) debug.printMessage(debug.LEVEL_INFO, msg, True)
def processBrailleEvent(self, brailleEvent: Any) -> bool: # brailleEvent: BrailleEvent def processBrailleEvent(self, brailleEvent: input_event.BrailleEvent) -> bool:
"""Called whenever a cursor key is pressed on the Braille display. """Called whenever a cursor key is pressed on the Braille display.
Arguments: Arguments:
@@ -1151,8 +1158,12 @@ class EventManager:
_manager: Optional[EventManager] = None _manager: Optional[EventManager] = None
def getManager() -> EventManager: def getManager() -> Optional[EventManager]:
global _manager global _manager
if _manager is None: if _manager is None:
_manager = cthulhu.cthulhuApp.eventManager try:
if cthulhu.cthulhuApp:
_manager = cthulhu.cthulhuApp.eventManager
except AttributeError:
pass
return _manager return _manager
+25 -19
View File
@@ -40,7 +40,7 @@ __copyright__ = "Copyright (c) 2005-2008 Sun Microsystems Inc." \
"Copyright (c) 2016-2023 Igalia, S.L." "Copyright (c) 2016-2023 Igalia, S.L."
__license__ = "LGPL" __license__ = "LGPL"
from typing import TYPE_CHECKING, Optional from typing import TYPE_CHECKING, Optional, Tuple, Any
import gi import gi
gi.require_version("Atspi", "2.0") gi.require_version("Atspi", "2.0")
@@ -54,7 +54,13 @@ from . import script_manager
from .ax_object import AXObject from .ax_object import AXObject
from .ax_table import AXTable from .ax_table import AXTable
from .ax_text import AXText from .ax_text import AXText
def _get_ax_utilities():
if TYPE_CHECKING:
from .cthulhu import Cthulhu
from .input_event import InputEvent
from .scripts import default
def _get_ax_utilities() -> Any:
# Avoid circular import with ax_utilities -> ax_utilities_event -> focus_manager. # Avoid circular import with ax_utilities -> ax_utilities_event -> focus_manager.
from .ax_utilities import AXUtilities from .ax_utilities import AXUtilities
return AXUtilities return AXUtilities
@@ -65,10 +71,6 @@ def _log(message: str, reason: Optional[str] = None, timestamp: bool = True, sta
def _log_tokens(tokens: list, reason: Optional[str] = None, timestamp: bool = True, stack: bool = False) -> None: def _log_tokens(tokens: list, reason: Optional[str] = None, timestamp: bool = True, stack: bool = False) -> None:
debug.print_log_tokens(debug.LEVEL_INFO, "FOCUS MANAGER", tokens, reason, timestamp, stack) debug.print_log_tokens(debug.LEVEL_INFO, "FOCUS MANAGER", tokens, reason, timestamp, stack)
if TYPE_CHECKING:
from .input_event import InputEvent
from .scripts import default
CARET_TRACKING = "caret-tracking" CARET_TRACKING = "caret-tracking"
FOCUS_TRACKING = "focus-tracking" FOCUS_TRACKING = "focus-tracking"
FLAT_REVIEW = "flat-review" FLAT_REVIEW = "flat-review"
@@ -80,15 +82,15 @@ SAY_ALL = "say-all"
class FocusManager: class FocusManager:
"""Manages the focused object, window, etc.""" """Manages the focused object, window, etc."""
def __init__(self, app) -> None: # Added app argument def __init__(self, app: Cthulhu) -> None:
self.app = app # Store app instance self.app: Cthulhu = app # Store app instance
self._window: Optional[Atspi.Accessible] = cthulhu_state.activeWindow self._window: Optional[Atspi.Accessible] = cthulhu_state.activeWindow
self._focus: Optional[Atspi.Accessible] = cthulhu_state.locusOfFocus self._focus: Optional[Atspi.Accessible] = cthulhu_state.locusOfFocus
self._object_of_interest: Optional[Atspi.Accessible] = cthulhu_state.objOfInterest self._object_of_interest: Optional[Atspi.Accessible] = cthulhu_state.objOfInterest
self._active_mode: Optional[str] = cthulhu_state.activeMode self._active_mode: Optional[str] = cthulhu_state.activeMode
self._last_cell_coordinates: tuple[int, int] = (-1, -1) self._last_cell_coordinates: Tuple[int, int] = (-1, -1)
self._last_cursor_position: tuple[Optional[Atspi.Accessible], int] = (None, -1) self._last_cursor_position: Tuple[Optional[Atspi.Accessible], int] = (None, -1)
self._penultimate_cursor_position: tuple[Optional[Atspi.Accessible], int] = (None, -1) self._penultimate_cursor_position: Tuple[Optional[Atspi.Accessible], int] = (None, -1)
_log("Registering D-Bus commands.") _log("Registering D-Bus commands.")
controller = dbus_service.get_remote_controller() controller = dbus_service.get_remote_controller()
@@ -188,20 +190,20 @@ class FocusManager:
def get_active_mode_and_object_of_interest( def get_active_mode_and_object_of_interest(
self self
) -> tuple[Optional[str], Optional[Atspi.Accessible]]: ) -> Tuple[Optional[str], Optional[Atspi.Accessible]]:
"""Returns the current mode and associated object of interest""" """Returns the current mode and associated object of interest"""
_log_tokens(["Active mode:", self._active_mode, "Object of interest:", self._object_of_interest]) _log_tokens(["Active mode:", self._active_mode, "Object of interest:", self._object_of_interest])
return self._active_mode, self._object_of_interest return self._active_mode, self._object_of_interest
def get_penultimate_cursor_position(self) -> tuple[Optional[Atspi.Accessible], int]: def get_penultimate_cursor_position(self) -> Tuple[Optional[Atspi.Accessible], int]:
"""Returns the penultimate cursor position as a tuple of (object, offset).""" """Returns the penultimate cursor position as a tuple of (object, offset)."""
obj, offset = self._penultimate_cursor_position obj, offset = self._penultimate_cursor_position
_log_tokens(["Penultimate cursor position:", obj, offset]) _log_tokens(["Penultimate cursor position:", obj, offset])
return obj, offset return obj, offset
def get_last_cursor_position(self) -> tuple[Optional[Atspi.Accessible], int]: def get_last_cursor_position(self) -> Tuple[Optional[Atspi.Accessible], int]:
"""Returns the last cursor position as a tuple of (object, offset).""" """Returns the last cursor position as a tuple of (object, offset)."""
obj, offset = self._last_cursor_position obj, offset = self._last_cursor_position
@@ -215,7 +217,7 @@ class FocusManager:
self._penultimate_cursor_position = self._last_cursor_position self._penultimate_cursor_position = self._last_cursor_position
self._last_cursor_position = obj, offset self._last_cursor_position = obj, offset
def get_last_cell_coordinates(self) -> tuple[int, int]: def get_last_cell_coordinates(self) -> Tuple[int, int]:
"""Returns the last known cell coordinates as a tuple of (row, column).""" """Returns the last known cell coordinates as a tuple of (row, column)."""
row, column = self._last_cell_coordinates row, column = self._last_cell_coordinates
@@ -437,12 +439,16 @@ class FocusManager:
return script.browse_mode_is_sticky() return script.browse_mode_is_sticky()
return False return False
_manager = None _manager: Optional[FocusManager] = None
def get_manager(): def get_manager() -> Optional[FocusManager]:
"""Returns the Focus Manager""" """Returns the Focus Manager"""
global _manager global _manager
if _manager is None: if _manager is None:
from . import cthulhu try:
_manager = FocusManager(cthulhu.cthulhuApp) from . import cthulhu
if cthulhu.cthulhuApp:
_manager = FocusManager(cthulhu.cthulhuApp)
except (ImportError, AttributeError):
pass
return _manager return _manager
+76 -69
View File
@@ -39,7 +39,7 @@ __copyright__ = "Copyright (c) 2024 Igalia, S.L." \
"Copyright (c) 2024 GNOME Foundation Inc." "Copyright (c) 2024 GNOME Foundation Inc."
__license__ = "LGPL" __license__ = "LGPL"
from typing import TYPE_CHECKING, Optional from typing import TYPE_CHECKING, Optional, Union, Tuple, List, Dict
import gi import gi
gi.require_version("Atspi", "2.0") gi.require_version("Atspi", "2.0")
@@ -64,9 +64,9 @@ class InputEventManager:
self._last_input_event: Optional[input_event.InputEvent] = None self._last_input_event: Optional[input_event.InputEvent] = None
self._last_non_modifier_key_event: Optional[input_event.KeyboardEvent] = None self._last_non_modifier_key_event: Optional[input_event.KeyboardEvent] = None
self._device: Optional[Atspi.Device] = None self._device: Optional[Atspi.Device] = None
self._mapped_keycodes: list[int] = [] self._mapped_keycodes: List[int] = []
self._mapped_keysyms: list[int] = [] self._mapped_keysyms: List[int] = []
self._grabbed_bindings: dict[int, keybindings.KeyBinding] = {} self._grabbed_bindings: Dict[int, keybindings.KeyBinding] = {}
self._paused: bool = False self._paused: bool = False
def start_key_watcher(self) -> None: def start_key_watcher(self) -> None:
@@ -100,14 +100,14 @@ class InputEventManager:
msg = f"INPUT EVENT MANAGER: {grab_id} for: {binding}" msg = f"INPUT EVENT MANAGER: {grab_id} for: {binding}"
debug.print_message(debug.LEVEL_INFO, msg, True) debug.print_message(debug.LEVEL_INFO, msg, True)
def _get_key_definitions(self, binding: keybindings.KeyBinding) -> list[Atspi.KeyDefinition]: def _get_key_definitions(self, binding: keybindings.KeyBinding) -> List[Atspi.KeyDefinition]:
if hasattr(binding, "key_definitions"): if hasattr(binding, "key_definitions"):
return list(binding.key_definitions()) return list(binding.key_definitions())
if hasattr(binding, "keyDefs"): if hasattr(binding, "keyDefs"):
return list(binding.keyDefs()) return list(binding.keyDefs())
return [] return []
def add_grabs_for_keybinding(self, binding: keybindings.KeyBinding) -> list[int]: def add_grabs_for_keybinding(self, binding: keybindings.KeyBinding) -> List[int]:
"""Adds grabs for binding if it is enabled, returns grab IDs.""" """Adds grabs for binding if it is enabled, returns grab IDs."""
if self._device is None: if self._device is None:
@@ -427,7 +427,7 @@ class InputEventManager:
return isinstance(self._last_input_event, input_event.MouseButtonEvent) return isinstance(self._last_input_event, input_event.MouseButtonEvent)
def is_release_for(self, event1, event2): def is_release_for(self, event1: Optional[input_event.InputEvent], event2: Optional[input_event.InputEvent]) -> bool:
"""Returns True if event1 is a release for event2.""" """Returns True if event1 is a release for event2."""
if event1 is None or event2 is None: if event1 is None or event2 is None:
@@ -454,7 +454,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True) debug.print_message(debug.LEVEL_INFO, msg, True)
return result return result
def last_event_equals_or_is_release_for_event(self, event): def last_event_equals_or_is_release_for_event(self, event: Optional[input_event.InputEvent]) -> bool:
"""Returns True if the last event equals the provided event, or is the release for it.""" """Returns True if the last event equals the provided event, or is the release for it."""
if self._last_input_event is event: if self._last_input_event is event:
@@ -471,7 +471,7 @@ class InputEventManager:
return self.is_release_for(self._last_non_modifier_key_event, event) return self.is_release_for(self._last_non_modifier_key_event, event)
def _last_key_and_modifiers(self): def _last_key_and_modifiers(self) -> Tuple[str, int]:
"""Returns the last keyval name and modifiers""" """Returns the last keyval name and modifiers"""
if self._last_non_modifier_key_event is None: if self._last_non_modifier_key_event is None:
@@ -480,9 +480,12 @@ class InputEventManager:
if not self.last_event_was_keyboard(): if not self.last_event_was_keyboard():
return "", 0 return "", 0
if self._last_input_event is None:
return self._last_non_modifier_key_event.keyval_name, 0
return self._last_non_modifier_key_event.keyval_name, self._last_input_event.modifiers return self._last_non_modifier_key_event.keyval_name, self._last_input_event.modifiers
def last_event_was_command(self): def last_event_was_command(self) -> bool:
"""Returns True if the last event is believed to be a command.""" """Returns True if the last event is believed to be a command."""
if bool(self._last_key_and_modifiers()[1] & 1 << Atspi.ModifierType.CONTROL): if bool(self._last_key_and_modifiers()[1] & 1 << Atspi.ModifierType.CONTROL):
@@ -492,7 +495,7 @@ class InputEventManager:
return False return False
def last_event_was_shortcut_for(self, obj): def last_event_was_shortcut_for(self, obj: Atspi.Accessible) -> bool:
"""Returns True if the last event is believed to be a shortcut key for obj.""" """Returns True if the last event is believed to be a shortcut key for obj."""
string = self._last_key_and_modifiers()[0] string = self._last_key_and_modifiers()[0]
@@ -511,12 +514,13 @@ class InputEventManager:
debug.print_tokens(debug.LEVEL_INFO, tokens, True) debug.print_tokens(debug.LEVEL_INFO, tokens, True)
return rv return rv
def last_event_was_printable_key(self): def last_event_was_printable_key(self) -> bool:
"""Returns True if the last event is believed to be a printable key.""" """Returns True if the last event is believed to be a printable key."""
if not self.last_event_was_keyboard(): if not self.last_event_was_keyboard():
return False return False
assert isinstance(self._last_input_event, input_event.KeyboardEvent)
if self._last_input_event.is_printable_key(): if self._last_input_event.is_printable_key():
msg = "INPUT EVENT MANAGER: Last event was printable key" msg = "INPUT EVENT MANAGER: Last event was printable key"
debug.print_message(debug.LEVEL_INFO, msg, True) debug.print_message(debug.LEVEL_INFO, msg, True)
@@ -524,7 +528,7 @@ class InputEventManager:
return False return False
def last_event_was_caret_navigation(self): def last_event_was_caret_navigation(self) -> bool:
"""Returns True if the last event is believed to be caret navigation.""" """Returns True if the last event is believed to be caret navigation."""
return self.last_event_was_character_navigation() \ return self.last_event_was_character_navigation() \
@@ -534,7 +538,7 @@ class InputEventManager:
or self.last_event_was_file_boundary_navigation() \ or self.last_event_was_file_boundary_navigation() \
or self.last_event_was_page_navigation() or self.last_event_was_page_navigation()
def last_event_was_caret_selection(self): def last_event_was_caret_selection(self) -> bool:
"""Returns True if the last event is believed to be caret selection.""" """Returns True if the last event is believed to be caret selection."""
string, mods = self._last_key_and_modifiers() string, mods = self._last_key_and_modifiers()
@@ -548,7 +552,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True) debug.print_message(debug.LEVEL_INFO, msg, True)
return rv return rv
def last_event_was_backward_caret_navigation(self): def last_event_was_backward_caret_navigation(self) -> bool:
"""Returns True if the last event is believed to be backward caret navigation.""" """Returns True if the last event is believed to be backward caret navigation."""
string, mods = self._last_key_and_modifiers() string, mods = self._last_key_and_modifiers()
@@ -562,7 +566,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True) debug.print_message(debug.LEVEL_INFO, msg, True)
return rv return rv
def last_event_was_forward_caret_navigation(self): def last_event_was_forward_caret_navigation(self) -> bool:
"""Returns True if the last event is believed to be forward caret navigation.""" """Returns True if the last event is believed to be forward caret navigation."""
string, mods = self._last_key_and_modifiers() string, mods = self._last_key_and_modifiers()
@@ -576,7 +580,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True) debug.print_message(debug.LEVEL_INFO, msg, True)
return rv return rv
def last_event_was_forward_caret_selection(self): def last_event_was_forward_caret_selection(self) -> bool:
"""Returns True if the last event is believed to be forward caret selection.""" """Returns True if the last event is believed to be forward caret selection."""
string, mods = self._last_key_and_modifiers() string, mods = self._last_key_and_modifiers()
@@ -590,7 +594,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True) debug.print_message(debug.LEVEL_INFO, msg, True)
return rv return rv
def last_event_was_character_navigation(self): def last_event_was_character_navigation(self) -> bool:
"""Returns True if the last event is believed to be character navigation.""" """Returns True if the last event is believed to be character navigation."""
string, mods = self._last_key_and_modifiers() string, mods = self._last_key_and_modifiers()
@@ -606,7 +610,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True) debug.print_message(debug.LEVEL_INFO, msg, True)
return rv return rv
def last_event_was_word_navigation(self): def last_event_was_word_navigation(self) -> bool:
"""Returns True if the last event is believed to be word navigation.""" """Returns True if the last event is believed to be word navigation."""
string, mods = self._last_key_and_modifiers() string, mods = self._last_key_and_modifiers()
@@ -620,7 +624,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True) debug.print_message(debug.LEVEL_INFO, msg, True)
return rv return rv
def last_event_was_previous_word_navigation(self): def last_event_was_previous_word_navigation(self) -> bool:
"""Returns True if the last event is believed to be previous-word navigation.""" """Returns True if the last event is believed to be previous-word navigation."""
string, mods = self._last_key_and_modifiers() string, mods = self._last_key_and_modifiers()
@@ -634,7 +638,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True) debug.print_message(debug.LEVEL_INFO, msg, True)
return rv return rv
def last_event_was_next_word_navigation(self): def last_event_was_next_word_navigation(self) -> bool:
"""Returns True if the last event is believed to be next-word navigation.""" """Returns True if the last event is believed to be next-word navigation."""
string, mods = self._last_key_and_modifiers() string, mods = self._last_key_and_modifiers()
@@ -648,7 +652,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True) debug.print_message(debug.LEVEL_INFO, msg, True)
return rv return rv
def last_event_was_line_navigation(self): def last_event_was_line_navigation(self) -> bool:
"""Returns True if the last event is believed to be line navigation.""" """Returns True if the last event is believed to be line navigation."""
string, mods = self._last_key_and_modifiers() string, mods = self._last_key_and_modifiers()
@@ -668,7 +672,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True) debug.print_message(debug.LEVEL_INFO, msg, True)
return rv return rv
def last_event_was_paragraph_navigation(self): def last_event_was_paragraph_navigation(self) -> bool:
"""Returns True if the last event is believed to be paragraph navigation.""" """Returns True if the last event is believed to be paragraph navigation."""
string, mods = self._last_key_and_modifiers() string, mods = self._last_key_and_modifiers()
@@ -682,7 +686,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True) debug.print_message(debug.LEVEL_INFO, msg, True)
return rv return rv
def last_event_was_line_boundary_navigation(self): def last_event_was_line_boundary_navigation(self) -> bool:
"""Returns True if the last event is believed to be navigation to start/end of line.""" """Returns True if the last event is believed to be navigation to start/end of line."""
string, mods = self._last_key_and_modifiers() string, mods = self._last_key_and_modifiers()
@@ -696,7 +700,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True) debug.print_message(debug.LEVEL_INFO, msg, True)
return rv return rv
def last_event_was_file_boundary_navigation(self): def last_event_was_file_boundary_navigation(self) -> bool:
"""Returns True if the last event is believed to be navigation to top/bottom of file.""" """Returns True if the last event is believed to be navigation to top/bottom of file."""
string, mods = self._last_key_and_modifiers() string, mods = self._last_key_and_modifiers()
@@ -710,7 +714,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True) debug.print_message(debug.LEVEL_INFO, msg, True)
return rv return rv
def last_event_was_page_navigation(self): def last_event_was_page_navigation(self) -> bool:
"""Returns True if the last event is believed to be page navigation.""" """Returns True if the last event is believed to be page navigation."""
string, mods = self._last_key_and_modifiers() string, mods = self._last_key_and_modifiers()
@@ -730,7 +734,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True) debug.print_message(debug.LEVEL_INFO, msg, True)
return rv return rv
def last_event_was_page_switch(self): def last_event_was_page_switch(self) -> bool:
"""Returns True if the last event is believed to be a page switch.""" """Returns True if the last event is believed to be a page switch."""
string, mods = self._last_key_and_modifiers() string, mods = self._last_key_and_modifiers()
@@ -746,7 +750,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True) debug.print_message(debug.LEVEL_INFO, msg, True)
return rv return rv
def last_event_was_tab_navigation(self): def last_event_was_tab_navigation(self) -> bool:
"""Returns True if the last event is believed to be Tab navigation.""" """Returns True if the last event is believed to be Tab navigation."""
string, mods = self._last_key_and_modifiers() string, mods = self._last_key_and_modifiers()
@@ -762,7 +766,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True) debug.print_message(debug.LEVEL_INFO, msg, True)
return rv return rv
def last_event_was_table_sort(self): def last_event_was_table_sort(self) -> bool:
"""Returns True if the last event is believed to be a table sort.""" """Returns True if the last event is believed to be a table sort."""
focus = focus_manager.get_manager().get_locus_of_focus() focus = focus_manager.get_manager().get_locus_of_focus()
@@ -780,7 +784,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True) debug.print_message(debug.LEVEL_INFO, msg, True)
return rv return rv
def last_event_was_unmodified_arrow(self): def last_event_was_unmodified_arrow(self) -> bool:
"""Returns True if the last event is an unmodified arrow.""" """Returns True if the last event is an unmodified arrow."""
string, mods = self._last_key_and_modifiers() string, mods = self._last_key_and_modifiers()
@@ -799,83 +803,83 @@ class InputEventManager:
return True return True
def last_event_was_alt_modified(self): def last_event_was_alt_modified(self) -> bool:
"""Returns True if the last event was alt-modified.""" """Returns True if the last event was alt-modified."""
mods = self._last_key_and_modifiers()[-1] mods = self._last_key_and_modifiers()[-1]
return mods & 1 << Atspi.ModifierType.ALT return bool(mods & 1 << Atspi.ModifierType.ALT)
def last_event_was_backspace(self): def last_event_was_backspace(self) -> bool:
"""Returns True if the last event is BackSpace.""" """Returns True if the last event is BackSpace."""
return self._last_key_and_modifiers()[0] == "BackSpace" return self._last_key_and_modifiers()[0] == "BackSpace"
def last_event_was_down(self): def last_event_was_down(self) -> bool:
"""Returns True if the last event is Down.""" """Returns True if the last event is Down."""
return self._last_key_and_modifiers()[0] == "Down" return self._last_key_and_modifiers()[0] == "Down"
def last_event_was_f1(self): def last_event_was_f1(self) -> bool:
"""Returns True if the last event is F1.""" """Returns True if the last event is F1."""
return self._last_key_and_modifiers()[0] == "F1" return self._last_key_and_modifiers()[0] == "F1"
def last_event_was_left(self): def last_event_was_left(self) -> bool:
"""Returns True if the last event is Left.""" """Returns True if the last event is Left."""
return self._last_key_and_modifiers()[0] == "Left" return self._last_key_and_modifiers()[0] == "Left"
def last_event_was_left_or_right(self): def last_event_was_left_or_right(self) -> bool:
"""Returns True if the last event is Left or Right.""" """Returns True if the last event is Left or Right."""
return self._last_key_and_modifiers()[0] in ["Left", "Right"] return self._last_key_and_modifiers()[0] in ["Left", "Right"]
def last_event_was_page_up_or_page_down(self): def last_event_was_page_up_or_page_down(self) -> bool:
"""Returns True if the last event is Page_Up or Page_Down.""" """Returns True if the last event is Page_Up or Page_Down."""
return self._last_key_and_modifiers()[0] in ["Page_Up", "Page_Down"] return self._last_key_and_modifiers()[0] in ["Page_Up", "Page_Down"]
def last_event_was_right(self): def last_event_was_right(self) -> bool:
"""Returns True if the last event is Right.""" """Returns True if the last event is Right."""
return self._last_key_and_modifiers()[0] == "Right" return self._last_key_and_modifiers()[0] == "Right"
def last_event_was_return(self): def last_event_was_return(self) -> bool:
"""Returns True if the last event is Return.""" """Returns True if the last event is Return."""
return self._last_key_and_modifiers()[0] == "Return" return self._last_key_and_modifiers()[0] == "Return"
def last_event_was_return_or_space(self): def last_event_was_return_or_space(self) -> bool:
"""Returns True if the last event is Return or space.""" """Returns True if the last event is Return or space."""
return self._last_key_and_modifiers()[0] in ["Return", "space", " "] return self._last_key_and_modifiers()[0] in ["Return", "space", " "]
def last_event_was_return_tab_or_space(self): def last_event_was_return_tab_or_space(self) -> bool:
"""Returns True if the last event is Return, Tab, or space.""" """Returns True if the last event is Return, Tab, or space."""
return self._last_key_and_modifiers()[0] in ["Return", "Tab", "space", " "] return self._last_key_and_modifiers()[0] in ["Return", "Tab", "space", " "]
def last_event_was_space(self): def last_event_was_space(self) -> bool:
"""Returns True if the last event is space.""" """Returns True if the last event is space."""
return self._last_key_and_modifiers()[0] in [" ", "space"] return self._last_key_and_modifiers()[0] in [" ", "space"]
def last_event_was_tab(self): def last_event_was_tab(self) -> bool:
"""Returns True if the last event is Tab.""" """Returns True if the last event is Tab."""
return self._last_key_and_modifiers()[0] == "Tab" return self._last_key_and_modifiers()[0] == "Tab"
def last_event_was_up(self): def last_event_was_up(self) -> bool:
"""Returns True if the last event is Up.""" """Returns True if the last event is Up."""
return self._last_key_and_modifiers()[0] == "Up" return self._last_key_and_modifiers()[0] == "Up"
def last_event_was_up_or_down(self): def last_event_was_up_or_down(self) -> bool:
"""Returns True if the last event is Up or Down.""" """Returns True if the last event is Up or Down."""
return self._last_key_and_modifiers()[0] in ["Up", "Down"] return self._last_key_and_modifiers()[0] in ["Up", "Down"]
def last_event_was_delete(self): def last_event_was_delete(self) -> bool:
"""Returns True if the last event is believed to be delete.""" """Returns True if the last event is believed to be delete."""
string, mods = self._last_key_and_modifiers() string, mods = self._last_key_and_modifiers()
@@ -891,7 +895,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True) debug.print_message(debug.LEVEL_INFO, msg, True)
return rv return rv
def last_event_was_cut(self): def last_event_was_cut(self) -> bool:
"""Returns True if the last event is believed to be the cut command.""" """Returns True if the last event is believed to be the cut command."""
string, mods = self._last_key_and_modifiers() string, mods = self._last_key_and_modifiers()
@@ -904,39 +908,39 @@ class InputEventManager:
return True return True
return False return False
def last_event_was_copy(self): def last_event_was_copy(self) -> bool:
"""Returns True if the last event is believed to be the copy command.""" """Returns True if the last event is believed to be the copy command."""
string, mods = self._last_key_and_modifiers() string, mods = self._last_key_and_modifiers()
if string.lower() != "c" or not mods & 1 << Atspi.ModifierType.CONTROL: if string.lower() != "c" or not mods & 1 << Atspi.ModifierType.CONTROL:
rv = False rv = False
elif AXUtilities.is_terminal(self._last_input_event.get_object()): elif AXUtilities.is_terminal(self._last_input_event.get_object()):
rv = mods & 1 << Atspi.ModifierType.SHIFT rv = bool(mods & 1 << Atspi.ModifierType.SHIFT)
else: else:
rv = not mods & 1 << Atspi.ModifierType.SHIFT rv = not bool(mods & 1 << Atspi.ModifierType.SHIFT)
if rv: if rv:
msg = "INPUT EVENT MANAGER: Last event was copy" msg = "INPUT EVENT MANAGER: Last event was copy"
debug.print_message(debug.LEVEL_INFO, msg, True) debug.print_message(debug.LEVEL_INFO, msg, True)
return rv return rv
def last_event_was_paste(self): def last_event_was_paste(self) -> bool:
"""Returns True if the last event is believed to be the paste command.""" """Returns True if the last event is believed to be the paste command."""
string, mods = self._last_key_and_modifiers() string, mods = self._last_key_and_modifiers()
if string.lower() != "v" or not mods & 1 << Atspi.ModifierType.CONTROL: if string.lower() != "v" or not mods & 1 << Atspi.ModifierType.CONTROL:
rv = False rv = False
elif AXUtilities.is_terminal(self._last_input_event.get_object()): elif AXUtilities.is_terminal(self._last_input_event.get_object()):
rv = mods & 1 << Atspi.ModifierType.SHIFT rv = bool(mods & 1 << Atspi.ModifierType.SHIFT)
else: else:
rv = not mods & 1 << Atspi.ModifierType.SHIFT rv = not bool(mods & 1 << Atspi.ModifierType.SHIFT)
if rv: if rv:
msg = "INPUT EVENT MANAGER: Last event was paste" msg = "INPUT EVENT MANAGER: Last event was paste"
debug.print_message(debug.LEVEL_INFO, msg, True) debug.print_message(debug.LEVEL_INFO, msg, True)
return rv return rv
def last_event_was_undo(self): def last_event_was_undo(self) -> bool:
"""Returns True if the last event is believed to be the undo command.""" """Returns True if the last event is believed to be the undo command."""
string, mods = self._last_key_and_modifiers() string, mods = self._last_key_and_modifiers()
@@ -948,16 +952,16 @@ class InputEventManager:
return True return True
return False return False
def last_event_was_redo(self): def last_event_was_redo(self) -> bool:
"""Returns True if the last event is believed to be the redo command.""" """Returns True if the last event is believed to be the redo command."""
string, mods = self._last_key_and_modifiers() string, mods = self._last_key_and_modifiers()
if string.lower() == "z": if string.lower() == "z":
rv = mods & 1 << Atspi.ModifierType.CONTROL and mods & 1 << Atspi.ModifierType.SHIFT rv = bool(mods & 1 << Atspi.ModifierType.CONTROL and mods & 1 << Atspi.ModifierType.SHIFT)
elif string.lower() == "y": elif string.lower() == "y":
# LibreOffice # LibreOffice
rv = mods & 1 << Atspi.ModifierType.CONTROL \ rv = bool(mods & 1 << Atspi.ModifierType.CONTROL \
and not mods & 1 << Atspi.ModifierType.SHIFT and not mods & 1 << Atspi.ModifierType.SHIFT)
else: else:
rv = False rv = False
@@ -966,7 +970,7 @@ class InputEventManager:
debug.print_message(debug.LEVEL_INFO, msg, True) debug.print_message(debug.LEVEL_INFO, msg, True)
return rv return rv
def last_event_was_select_all(self): def last_event_was_select_all(self) -> bool:
"""Returns True if the last event is believed to be the select all command.""" """Returns True if the last event is believed to be the select all command."""
string, mods = self._last_key_and_modifiers() string, mods = self._last_key_and_modifiers()
@@ -979,7 +983,7 @@ class InputEventManager:
return True return True
return False return False
def last_event_was_primary_click(self): def last_event_was_primary_click(self) -> bool:
"""Returns True if the last event is a primary mouse click.""" """Returns True if the last event is a primary mouse click."""
if not self.last_event_was_mouse_button(): if not self.last_event_was_mouse_button():
@@ -991,7 +995,7 @@ class InputEventManager:
return True return True
return False return False
def last_event_was_primary_release(self): def last_event_was_primary_release(self) -> bool:
"""Returns True if the last event is a primary mouse release.""" """Returns True if the last event is a primary mouse release."""
if not self.last_event_was_mouse_button(): if not self.last_event_was_mouse_button():
@@ -1003,7 +1007,7 @@ class InputEventManager:
return True return True
return False return False
def last_event_was_primary_click_or_release(self): def last_event_was_primary_click_or_release(self) -> bool:
"""Returns True if the last event is a primary mouse click or release.""" """Returns True if the last event is a primary mouse click or release."""
if not self.last_event_was_mouse_button(): if not self.last_event_was_mouse_button():
@@ -1015,7 +1019,7 @@ class InputEventManager:
return True return True
return False return False
def last_event_was_middle_click(self): def last_event_was_middle_click(self) -> bool:
"""Returns True if the last event is a middle mouse click.""" """Returns True if the last event is a middle mouse click."""
if not self.last_event_was_mouse_button(): if not self.last_event_was_mouse_button():
@@ -1027,7 +1031,7 @@ class InputEventManager:
return True return True
return False return False
def last_event_was_middle_release(self): def last_event_was_middle_release(self) -> bool:
"""Returns True if the last event is a middle mouse release.""" """Returns True if the last event is a middle mouse release."""
if not self.last_event_was_mouse_button(): if not self.last_event_was_mouse_button():
@@ -1039,7 +1043,7 @@ class InputEventManager:
return True return True
return False return False
def last_event_was_secondary_click(self): def last_event_was_secondary_click(self) -> bool:
"""Returns True if the last event is a secondary mouse click.""" """Returns True if the last event is a secondary mouse click."""
if not self.last_event_was_mouse_button(): if not self.last_event_was_mouse_button():
@@ -1051,7 +1055,7 @@ class InputEventManager:
return True return True
return False return False
def last_event_was_secondary_release(self): def last_event_was_secondary_release(self) -> bool:
"""Returns True if the last event is a secondary mouse release.""" """Returns True if the last event is a secondary mouse release."""
if not self.last_event_was_mouse_button(): if not self.last_event_was_mouse_button():
@@ -1064,7 +1068,10 @@ class InputEventManager:
return False return False
_manager = InputEventManager() _manager: Optional[InputEventManager] = None
def get_manager(): if _manager is None:
_manager = InputEventManager()
def get_manager() -> InputEventManager:
"""Returns the Input Event Manager singleton.""" """Returns the Input Event Manager singleton."""
return _manager return _manager
+53 -37
View File
@@ -23,6 +23,8 @@
# Forked from Orca screen reader. # Forked from Orca screen reader.
# Cthulhu project: https://git.stormux.org/storm/cthulhu # Cthulhu project: https://git.stormux.org/storm/cthulhu
from __future__ import annotations
__id__ = "$Id$" __id__ = "$Id$"
__version__ = "$Revision$" __version__ = "$Revision$"
__date__ = "$Date$" __date__ = "$Date$"
@@ -30,18 +32,24 @@ __copyright__ = "Copyright (c) 2011. Cthulhu Team."
__license__ = "LGPL" __license__ = "LGPL"
import importlib import importlib
from typing import Optional, Dict, Any from typing import TYPE_CHECKING, Optional, Dict, Any, List, Union
from . import debug from . import debug
from . import cthulhu_state from . import cthulhu_state
from .ax_object import AXObject from .ax_object import AXObject
from .scripts import apps, toolkits from .scripts import apps, toolkits
if TYPE_CHECKING:
from gi.repository import Atspi
from .cthulhu import Cthulhu
from .script import Script
from .input_event import InputEvent
# Forward references to avoid circular imports # Forward references to avoid circular imports
# Script is defined in script.py # Script is defined in script.py
# Atspi.Accessible comes from AT-SPI # Atspi.Accessible comes from AT-SPI
def _get_ax_utilities(): def _get_ax_utilities() -> Any:
# Avoid circular import with ax_utilities -> ax_utilities_event -> focus_manager -> braille -> settings_manager -> script_manager. # Avoid circular import with ax_utilities -> ax_utilities_event -> focus_manager -> braille -> settings_manager -> script_manager.
from .ax_utilities import AXUtilities from .ax_utilities import AXUtilities
return AXUtilities return AXUtilities
@@ -49,22 +57,22 @@ def _get_ax_utilities():
def _log(message: str, reason: Optional[str] = None, timestamp: bool = True, stack: bool = False) -> None: def _log(message: str, reason: Optional[str] = None, timestamp: bool = True, stack: bool = False) -> None:
debug.print_log(debug.LEVEL_INFO, "SCRIPT MANAGER", message, reason, timestamp, stack) debug.print_log(debug.LEVEL_INFO, "SCRIPT MANAGER", message, reason, timestamp, stack)
def _log_tokens(tokens: list, reason: Optional[str] = None, timestamp: bool = True, stack: bool = False) -> None: def _log_tokens(tokens: list[Any], reason: Optional[str] = None, timestamp: bool = True, stack: bool = False) -> None:
debug.print_log_tokens(debug.LEVEL_INFO, "SCRIPT MANAGER", tokens, reason, timestamp, stack) debug.print_log_tokens(debug.LEVEL_INFO, "SCRIPT MANAGER", tokens, reason, timestamp, stack)
class ScriptManager: class ScriptManager:
def __init__(self, app: Any) -> None: # app is the CthulhuApp instance def __init__(self, app: Cthulhu) -> None:
_log("Initializing") _log("Initializing")
self.app: Any = app # Store app instance self.app: Cthulhu = app # Store app instance
self.appScripts: Dict[Any, Any] = {} # Dict[Atspi.Accessible, Script] self.appScripts: Dict[Atspi.Accessible, Script] = {}
self.toolkitScripts: Dict[Any, Dict[str, Any]] = {} # Dict[Atspi.Accessible, Dict[str, Script]] self.toolkitScripts: Dict[Atspi.Accessible, Dict[str, Script]] = {}
self.customScripts: Dict[Any, Dict[str, Any]] = {} # Dict[Atspi.Accessible, Dict[str, Script]] self.customScripts: Dict[Atspi.Accessible, Dict[str, Script]] = {}
self._sleepModeScripts: Dict[Any, Any] = {} # Dict[Atspi.Accessible, Script] self._sleepModeScripts: Dict[Atspi.Accessible, Script] = {}
self._appModules: list = apps.__all__ self._appModules: List[str] = apps.__all__
self._toolkitModules: list = toolkits.__all__ self._toolkitModules: List[str] = toolkits.__all__
self._defaultScript: Optional[Any] = None # Optional[Script] self._defaultScript: Optional[Script] = None
self._scriptPackages: list[str] = \ self._scriptPackages: List[str] = \
["cthulhu-scripts", ["cthulhu-scripts",
"cthulhu.scripts", "cthulhu.scripts",
"cthulhu.scripts.apps", "cthulhu.scripts.apps",
@@ -92,7 +100,8 @@ class ScriptManager:
_log("Activating") _log("Activating")
self._defaultScript = self.get_script(None) self._defaultScript = self.get_script(None)
self._defaultScript.registerEventListeners() if self._defaultScript:
self._defaultScript.registerEventListeners()
self.set_active_script(self._defaultScript, "lifecycle: activate") self.set_active_script(self._defaultScript, "lifecycle: activate")
self._active = True self._active = True
_log("Activated") _log("Activated")
@@ -111,7 +120,7 @@ class ScriptManager:
self._active = False self._active = False
_log("Deactivated") _log("Deactivated")
def get_module_name(self, app: Optional[Any]) -> Optional[str]: # app: Optional[Atspi.Accessible] def get_module_name(self, app: Optional[Atspi.Accessible]) -> Optional[str]:
"""Returns the module name of the script to use for application app.""" """Returns the module name of the script to use for application app."""
if app is None: if app is None:
@@ -148,19 +157,20 @@ class ScriptManager:
_log_tokens(["Mapped", app, "to", name]) _log_tokens(["Mapped", app, "to", name])
return name return name
def _toolkit_for_object(self, obj: Optional[Any]) -> str: # obj: Optional[Atspi.Accessible] def _toolkit_for_object(self, obj: Optional[Atspi.Accessible]) -> Optional[str]:
"""Returns the name of the toolkit associated with obj.""" """Returns the name of the toolkit associated with obj."""
if obj is None:
return None
name = AXObject.get_attribute(obj, 'toolkit') name = AXObject.get_attribute(obj, 'toolkit')
return self._toolkitNames.get(name, name) return self._toolkitNames.get(name, name)
def _script_for_role(self, obj: Optional[Any]) -> str: # obj: Optional[Atspi.Accessible] def _script_for_role(self, obj: Optional[Atspi.Accessible]) -> str:
if _get_ax_utilities().is_terminal(obj): if _get_ax_utilities().is_terminal(obj):
return 'terminal' return 'terminal'
return '' return ''
def _new_named_script(self, app: Optional[Any], name: Optional[str]) -> Optional[Any]: # Returns Optional[Script] def _new_named_script(self, app: Optional[Atspi.Accessible], name: Optional[str]) -> Optional[Script]:
"""Attempts to locate and load the named module. If successful, returns """Attempts to locate and load the named module. If successful, returns
a script based on this module.""" a script based on this module."""
@@ -189,7 +199,7 @@ class ScriptManager:
return script return script
def _create_script(self, app: Optional[Any], obj: Optional[Any] = None) -> Any: # Returns Script def _create_script(self, app: Optional[Atspi.Accessible], obj: Optional[Atspi.Accessible] = None) -> Script:
"""For the given application, create a new script instance.""" """For the given application, create a new script instance."""
moduleName = self.get_module_name(app) moduleName = self.get_module_name(app)
@@ -212,7 +222,7 @@ class ScriptManager:
return script return script
def get_default_script(self, app: Optional[Any] = None) -> Any: # Returns Script def get_default_script(self, app: Optional[Atspi.Accessible] = None) -> Script:
if not app and self._defaultScript: if not app and self._defaultScript:
return self._defaultScript return self._defaultScript
@@ -224,7 +234,7 @@ class ScriptManager:
return script return script
def sanity_check_script(self, script: Any) -> Any: # Returns Script def sanity_check_script(self, script: Script) -> Script:
if not self._active: if not self._active:
return script return script
@@ -238,7 +248,8 @@ class ScriptManager:
_log_tokens(["Failed to get a replacement script for", script.app], "replacement-missing") _log_tokens(["Failed to get a replacement script for", script.app], "replacement-missing")
return script return script
def get_script_for_mouse_button_event(self, event: Any) -> Any: # Returns Script def get_script_for_mouse_button_event(self, event: Any) -> Script:
# Note: event type unspecified in original code, likely InputEvent or similar
isActive = _get_ax_utilities().is_active(cthulhu_state.activeWindow) isActive = _get_ax_utilities().is_active(cthulhu_state.activeWindow)
_log_tokens([cthulhu_state.activeWindow, "is active:", isActive]) _log_tokens([cthulhu_state.activeWindow, "is active:", isActive])
@@ -256,10 +267,10 @@ class ScriptManager:
return self.get_script(AXObject.get_application(activeWindow), activeWindow) return self.get_script(AXObject.get_application(activeWindow), activeWindow)
def get_active_script(self) -> Optional[Any]: # Returns Optional[Script] def get_active_script(self) -> Optional[Script]:
return cthulhu_state.activeScript return cthulhu_state.activeScript
def get_script(self, app: Optional[Any], obj: Optional[Any] = None, sanity_check: bool = False) -> Any: # Returns Script def get_script(self, app: Optional[Atspi.Accessible], obj: Optional[Atspi.Accessible] = None, sanity_check: bool = False) -> Script:
"""Get a script for an app (and make it if necessary). This is used """Get a script for an app (and make it if necessary). This is used
instead of a simple calls to Script's constructor. instead of a simple calls to Script's constructor.
@@ -275,21 +286,24 @@ class ScriptManager:
roleName = self._script_for_role(obj) roleName = self._script_for_role(obj)
if roleName: if roleName:
customScripts = self.customScripts.get(app, {}) customScripts = self.customScripts.get(app, {}) # type: ignore
customScript = customScripts.get(roleName) customScript = customScripts.get(roleName)
if not customScript: if not customScript:
customScript = self._new_named_script(app, roleName) customScript = self._new_named_script(app, roleName)
customScripts[roleName] = customScript if customScript:
self.customScripts[app] = customScripts customScripts[roleName] = customScript
if app:
self.customScripts[app] = customScripts
objToolkit = self._toolkit_for_object(obj) objToolkit = self._toolkit_for_object(obj)
if objToolkit: if objToolkit:
toolkitScripts = self.toolkitScripts.get(app, {}) toolkitScripts = self.toolkitScripts.get(app, {}) # type: ignore
toolkitScript = toolkitScripts.get(objToolkit) toolkitScript = toolkitScripts.get(objToolkit)
if not toolkitScript: if not toolkitScript:
toolkitScript = self._create_script(app, obj) toolkitScript = self._create_script(app, obj)
toolkitScripts[objToolkit] = toolkitScript toolkitScripts[objToolkit] = toolkitScript
self.toolkitScripts[app] = toolkitScripts if app:
self.toolkitScripts[app] = toolkitScripts
try: try:
if not app: if not app:
@@ -317,7 +331,7 @@ class ScriptManager:
return appScript return appScript
def get_or_create_sleep_mode_script(self, app: Any) -> Any: # Returns Script def get_or_create_sleep_mode_script(self, app: Atspi.Accessible) -> Script:
"""Gets or creates the sleep mode script.""" """Gets or creates the sleep mode script."""
script = self._sleepModeScripts.get(app) script = self._sleepModeScripts.get(app)
if script is not None: if script is not None:
@@ -329,7 +343,7 @@ class ScriptManager:
self._sleepModeScripts[app] = script self._sleepModeScripts[app] = script
return script return script
def set_active_script(self, newScript: Optional[Any], reason: Optional[str] = None) -> None: # newScript: Optional[Script] def set_active_script(self, newScript: Optional[Script], reason: Optional[str] = None) -> None:
"""Set the new active script. """Set the new active script.
Arguments: Arguments:
@@ -351,12 +365,13 @@ class ScriptManager:
# Emit signal that active script has changed, so PluginSystemManager can update keybindings # Emit signal that active script has changed, so PluginSystemManager can update keybindings
from . import cthulhu from . import cthulhu
cthulhu.cthulhuApp.getSignalManager().emitSignal('active-script-changed', newScript) if cthulhu.cthulhuApp:
cthulhu.cthulhuApp.getSignalManager().emitSignal('active-script-changed', newScript)
_log_tokens(["Setting active script to", newScript], reason) _log_tokens(["Setting active script to", newScript], reason)
self._log_active_state(reason) self._log_active_state(reason)
def activate_script_for_context(self, app: Optional[Any], obj: Optional[Any], reason: Optional[str] = None) -> Any: # Returns Script def activate_script_for_context(self, app: Optional[Atspi.Accessible], obj: Optional[Atspi.Accessible], reason: Optional[str] = None) -> Script:
script = self.get_script(app, obj) script = self.get_script(app, obj)
self.set_active_script(script, reason) self.set_active_script(script, reason)
return script return script
@@ -369,7 +384,7 @@ class ScriptManager:
reason reason
) )
def _get_script_for_app_replicant(self, app: Any) -> Optional[Any]: # Returns Optional[Script] def _get_script_for_app_replicant(self, app: Atspi.Accessible) -> Optional[Script]:
if not self._active: if not self._active:
return None return None
@@ -439,11 +454,12 @@ class ScriptManager:
_manager: Optional[ScriptManager] = None _manager: Optional[ScriptManager] = None
def get_manager() -> ScriptManager: def get_manager() -> Optional[ScriptManager]:
"""Returns the Script Manager""" """Returns the Script Manager"""
global _manager global _manager
if _manager is None: if _manager is None:
from . import cthulhu from . import cthulhu
_manager = ScriptManager(cthulhu.cthulhuApp) if cthulhu.cthulhuApp:
_manager = ScriptManager(cthulhu.cthulhuApp)
return _manager return _manager
+170 -113
View File
@@ -26,6 +26,8 @@
"""Settings manager module. This will load/save user settings from a """Settings manager module. This will load/save user settings from a
defined settings backend.""" defined settings backend."""
from __future__ import annotations
__id__ = "$Id$" __id__ = "$Id$"
__version__ = "$Revision$" __version__ = "$Revision$"
__date__ = "$Date$" __date__ = "$Date$"
@@ -36,6 +38,8 @@ import copy
import importlib import importlib
import json import json
import os import os
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from gi.repository import Gio, GLib from gi.repository import Gio, GLib
from . import debug from . import debug
@@ -46,6 +50,11 @@ from .acss import ACSS
from .ax_object import AXObject from .ax_object import AXObject
from .keybindings import KeyBinding from .keybindings import KeyBinding
if TYPE_CHECKING:
from .cthulhu import Cthulhu
from .script import Script
from .input_event import InputEventHandler
# Removed global cthulhuApp.scriptManager declaration. # Removed global cthulhuApp.scriptManager declaration.
# Note: Do not import cthulhu module here to avoid circular import # Note: Do not import cthulhu module here to avoid circular import
@@ -53,14 +62,14 @@ class SettingsManager(object):
"""Settings backend manager. This class manages cthulhu user's settings """Settings backend manager. This class manages cthulhu user's settings
using different backends""" using different backends"""
def __init__(self, app, backend='json'): # Modified signature def __init__(self, app: Cthulhu, backend: str = 'json') -> None: # Modified signature
debug.printMessage(debug.LEVEL_INFO, 'SETTINGS MANAGER: Initializing', True) debug.printMessage(debug.LEVEL_INFO, 'SETTINGS MANAGER: Initializing', True)
self.app = app # Store app instance self.app: Cthulhu = app # Store app instance
# Move _proxy initialization here # Move _proxy initialization here
try: try:
self._proxy = Gio.DBusProxy.new_for_bus_sync( self._proxy: Optional[Gio.DBusProxy] = Gio.DBusProxy.new_for_bus_sync(
Gio.BusType.SESSION, Gio.BusType.SESSION,
Gio.DBusProxyFlags.NONE, Gio.DBusProxyFlags.NONE,
None, None,
@@ -71,51 +80,51 @@ class SettingsManager(object):
except Exception: except Exception:
self._proxy = None self._proxy = None
self.backendModule = None self.backendModule: Optional[Any] = None
self._backend = None self._backend: Optional[Any] = None
self.profile = None self.profile: Optional[str] = None
self.backendName = backend self.backendName: str = backend
self._prefsDir = None self._prefsDir: Optional[str] = None
# Dictionaries for store the default values # Dictionaries for store the default values
# The keys and values are defined at cthulhu.settings # The keys and values are defined at cthulhu.settings
# #
self.defaultGeneral = {} self.defaultGeneral: Dict[str, Any] = {}
self.defaultPronunciations = {} self.defaultPronunciations: Dict[str, Any] = {}
self.defaultKeybindings = {} self.defaultKeybindings: Dict[str, Any] = {}
# Dictionaries that store the key:value pairs which values are # Dictionaries that store the key:value pairs which values are
# different from the current profile and the default ones # different from the current profile and the default ones
# #
self.profileGeneral = {} self.profileGeneral: Dict[str, Any] = {}
self.profilePronunciations = {} self.profilePronunciations: Dict[str, Any] = {}
self.profileKeybindings = {} self.profileKeybindings: Dict[str, Any] = {}
# Dictionaries that store the current settings. # Dictionaries that store the current settings.
# They are result to overwrite the default values with # They are result to overwrite the default values with
# the ones from the current active profile # the ones from the current active profile
self.general = {} self.general: Dict[str, Any] = {}
self.pronunciations = {} self.pronunciations: Dict[str, Any] = {}
self.keybindings = {} self.keybindings: Dict[str, Any] = {}
self._activeApp = "" self._activeApp: str = ""
self._appGeneral = {} self._appGeneral: Dict[str, Any] = {}
self._appPronunciations = {} self._appPronunciations: Dict[str, Any] = {}
self._appKeybindings = {} self._appKeybindings: Dict[str, Any] = {}
self._lastRoleSoundPresentation = None self._lastRoleSoundPresentation: Optional[Any] = None
if not self._loadBackend(): if not self._loadBackend():
raise Exception('SettingsManager._loadBackend failed.') raise Exception('SettingsManager._loadBackend failed.')
self.customizedSettings = {} self.customizedSettings: Dict[str, Any] = {}
self._customizationCompleted = False self._customizationCompleted: bool = False
# For handling the currently-"classic" application settings # For handling the currently-"classic" application settings
self.settingsPackages = ["app-settings"] self.settingsPackages: List[str] = ["app-settings"]
debug.printMessage(debug.LEVEL_INFO, 'SETTINGS MANAGER: Initialized', True) debug.printMessage(debug.LEVEL_INFO, 'SETTINGS MANAGER: Initialized', True)
def activate(self, prefsDir=None, customSettings={}): def activate(self, prefsDir: Optional[str] = None, customSettings: Dict[str, Any] = {}) -> None:
debug.printMessage(debug.LEVEL_INFO, 'SETTINGS MANAGER: Activating', True) debug.printMessage(debug.LEVEL_INFO, 'SETTINGS MANAGER: Activating', True)
self.customizedSettings.update(customSettings) self.customizedSettings.update(customSettings)
@@ -123,12 +132,13 @@ class SettingsManager(object):
or os.path.join(GLib.get_user_data_dir(), "cthulhu") or os.path.join(GLib.get_user_data_dir(), "cthulhu")
# Load the backend and the default values # Load the backend and the default values
self._backend = self.backendModule.Backend(self._prefsDir) if self.backendModule:
self._backend = self.backendModule.Backend(self._prefsDir)
self._setDefaultGeneral() self._setDefaultGeneral()
self._setDefaultPronunciations() self._setDefaultPronunciations()
self._setDefaultKeybindings() self._setDefaultKeybindings()
self.general = self.defaultGeneral.copy() self.general = self.defaultGeneral.copy()
if not self.isFirstStart(): if not self.isFirstStart() and self._backend:
self.general.update(self._backend.getGeneral()) self.general.update(self._backend.getGeneral())
self.pronunciations = self.defaultPronunciations.copy() self.pronunciations = self.defaultPronunciations.copy()
self.keybindings = self.defaultKeybindings.copy() self.keybindings = self.defaultKeybindings.copy()
@@ -146,13 +156,14 @@ class SettingsManager(object):
debug.printTokens(debug.LEVEL_INFO, tokens, True) debug.printTokens(debug.LEVEL_INFO, tokens, True)
if self.profile is None: if self.profile is None:
self.profile = self.general.get('startingProfile')[1] self.profile = self.general.get('startingProfile')[1] # type: ignore
tokens = ["SETTINGS MANAGER: Current profile is now", self.profile] tokens = ["SETTINGS MANAGER: Current profile is now", self.profile]
debug.printTokens(debug.LEVEL_INFO, tokens, True) debug.printTokens(debug.LEVEL_INFO, tokens, True)
self.setProfile(self.profile) if self.profile:
self.setProfile(self.profile)
def _loadBackend(self): def _loadBackend(self) -> bool:
"""Load specific backend for manage user settings""" """Load specific backend for manage user settings"""
try: try:
@@ -162,14 +173,17 @@ class SettingsManager(object):
except Exception: except Exception:
return False return False
def _createDefaults(self): def _createDefaults(self) -> None:
"""Let the active backend to create the initial structure """Let the active backend to create the initial structure
for storing the settings and save the default ones from for storing the settings and save the default ones from
cthulhu.settings""" cthulhu.settings"""
def _createDir(dirName): def _createDir(dirName: str) -> None:
if not os.path.isdir(dirName): if not os.path.isdir(dirName):
os.makedirs(dirName) os.makedirs(dirName)
if not self._prefsDir:
return
# Set up the user's preferences directory # Set up the user's preferences directory
# ($XDG_DATA_HOME/cthulhu by default). # ($XDG_DATA_HOME/cthulhu by default).
# #
@@ -200,20 +214,20 @@ class SettingsManager(object):
if not os.path.exists(userCustomFile): if not os.path.exists(userCustomFile):
os.close(os.open(userCustomFile, os.O_CREAT, 0o700)) os.close(os.open(userCustomFile, os.O_CREAT, 0o700))
if self.isFirstStart(): if self.isFirstStart() and self._backend:
self._backend.saveDefaultSettings(self.defaultGeneral, self._backend.saveDefaultSettings(self.defaultGeneral,
self.defaultPronunciations, self.defaultPronunciations,
self.defaultKeybindings) self.defaultKeybindings)
def _setDefaultPronunciations(self): def _setDefaultPronunciations(self) -> None:
"""Get the pronunciations by default from cthulhu.settings""" """Get the pronunciations by default from cthulhu.settings"""
self.defaultPronunciations = {} self.defaultPronunciations = {}
def _setDefaultKeybindings(self): def _setDefaultKeybindings(self) -> None:
"""Get the keybindings by default from cthulhu.settings""" """Get the keybindings by default from cthulhu.settings"""
self.defaultKeybindings = {} self.defaultKeybindings = {}
def _setDefaultGeneral(self): def _setDefaultGeneral(self) -> None:
"""Get the general settings by default from cthulhu.settings""" """Get the general settings by default from cthulhu.settings"""
self._getCustomizedSettings() self._getCustomizedSettings()
self.defaultGeneral = {} self.defaultGeneral = {}
@@ -241,7 +255,7 @@ class SettingsManager(object):
if default_active_plugins is not None: if default_active_plugins is not None:
self.defaultGeneral["activePlugins"] = default_active_plugins self.defaultGeneral["activePlugins"] = default_active_plugins
def _load_default_general_overrides(self): def _load_default_general_overrides(self) -> Dict[str, Any]:
if not self._prefsDir: if not self._prefsDir:
return {} return {}
@@ -261,7 +275,7 @@ class SettingsManager(object):
if not isinstance(general, dict): if not isinstance(general, dict):
return {} return {}
if hasattr(self._backend, "_migrateSettings"): if self._backend and hasattr(self._backend, "_migrateSettings"):
try: try:
general = self._backend._migrateSettings(dict(general)) general = self._backend._migrateSettings(dict(general))
except Exception as error: except Exception as error:
@@ -283,12 +297,12 @@ class SettingsManager(object):
return general return general
def getDefaultSetting(self, settingName): def getDefaultSetting(self, settingName: str) -> Any:
if settingName in self.defaultGeneral: if settingName in self.defaultGeneral:
return self.defaultGeneral.get(settingName) return self.defaultGeneral.get(settingName)
return getattr(settings, settingName, None) return getattr(settings, settingName, None)
def _getCustomizedSettings(self): def _getCustomizedSettings(self) -> Dict[str, Any]:
if self._customizationCompleted: if self._customizationCompleted:
return self.customizedSettings return self.customizedSettings
@@ -302,20 +316,25 @@ class SettingsManager(object):
customValue = settings.__dict__.get(key) customValue = settings.__dict__.get(key)
if value != customValue: if value != customValue:
self.customizedSettings[key] = customValue self.customizedSettings[key] = customValue
return self.customizedSettings
def _loadUserCustomizations(self): def _loadUserCustomizations(self) -> bool:
"""Attempt to load the user's cthulhu-customizations. Returns a boolean """Attempt to load the user's cthulhu-customizations. Returns a boolean
indicating our success at doing so, where success is measured by the indicating our success at doing so, where success is measured by the
likelihood that the results won't be different if we keep trying.""" likelihood that the results won't be different if we keep trying."""
success = False success = False
if not self._prefsDir:
return False
pathList = [self._prefsDir] pathList = [self._prefsDir]
tokens = ["SETTINGS MANAGER: Attempt to load cthulhu-customizations"] tokens = ["SETTINGS MANAGER: Attempt to load cthulhu-customizations"]
module_path = pathList[0] + "/cthulhu-customizations.py" module_path = pathList[0] + "/cthulhu-customizations.py"
try: try:
spec = importlib.util.spec_from_file_location("cthulhu-customizations", module_path) spec = importlib.util.spec_from_file_location("cthulhu-customizations", module_path)
if spec is not None: if spec is not None and spec.loader:
module = importlib.util.module_from_spec(spec) module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module) spec.loader.exec_module(module)
tokens.extend(["from", module_path, "succeeded."]) tokens.extend(["from", module_path, "succeeded."])
@@ -327,21 +346,21 @@ class SettingsManager(object):
except Exception as error: except Exception as error:
# Treat this failure as a "success" so that we don't stomp on the existing file. # Treat this failure as a "success" so that we don't stomp on the existing file.
success = True success = True
tokens.extend(["failed due to:", error, ". Not loading customizations."]) tokens.extend(["failed due to:", str(error), ". Not loading customizations."])
debug.printTokens(debug.LEVEL_ALL, tokens, True) debug.printTokens(debug.LEVEL_ALL, tokens, True)
return success return success
def getPrefsDir(self): def getPrefsDir(self) -> Optional[str]:
return self._prefsDir return self._prefsDir
def setSetting(self, settingName, settingValue): def setSetting(self, settingName: str, settingValue: Any) -> None:
self._setSettingsRuntime({settingName:settingValue}) self._setSettingsRuntime({settingName:settingValue})
def getSetting(self, settingName): def getSetting(self, settingName: str) -> Any:
return getattr(settings, settingName, None) return getattr(settings, settingName, None)
def getVoiceLocale(self, voice='default'): def getVoiceLocale(self, voice: str = 'default') -> str:
voices = self.getSetting('voices') voices = self.getSetting('voices')
v = ACSS(voices.get(voice, {})) v = ACSS(voices.get(voice, {}))
lang = v.getLocale() lang = v.getLocale()
@@ -350,7 +369,7 @@ class SettingsManager(object):
lang = f"{lang}_{dialect.upper()}" lang = f"{lang}_{dialect.upper()}"
return lang return lang
def getSpeechServerFactories(self): def getSpeechServerFactories(self) -> List[Any]:
"""Imports all known SpeechServer factory modules.""" """Imports all known SpeechServer factory modules."""
factories = [] factories = []
@@ -366,7 +385,7 @@ class SettingsManager(object):
return factories return factories
def _loadProfileSettings(self, profile=None): def _loadProfileSettings(self, profile: Optional[str] = None) -> None:
"""Get from the active backend all the settings for the current """Get from the active backend all the settings for the current
profile and store them in the object's attributes. profile and store them in the object's attributes.
A profile can be passed as a parameter. This could be useful for A profile can be passed as a parameter. This could be useful for
@@ -377,14 +396,16 @@ class SettingsManager(object):
if profile is None: if profile is None:
profile = self.profile profile = self.profile
self.profileGeneral = self.getGeneralSettings(profile) or {}
self.profilePronunciations = self.getPronunciations(profile) or {} if profile:
self.profileKeybindings = self.getKeybindings(profile) or {} self.profileGeneral = self.getGeneralSettings(profile) or {}
self.profilePronunciations = self.getPronunciations(profile) or {}
self.profileKeybindings = self.getKeybindings(profile) or {}
tokens = ["SETTINGS MANAGER: Settings for", profile, "profile loaded"] tokens = ["SETTINGS MANAGER: Settings for", profile, "profile loaded"]
debug.printTokens(debug.LEVEL_INFO, tokens, True) debug.printTokens(debug.LEVEL_INFO, tokens, True)
def _mergeSettings(self): def _mergeSettings(self) -> None:
"""Update the changed values on the profile settings """Update the changed values on the profile settings
over the current and active settings""" over the current and active settings"""
@@ -402,7 +423,7 @@ class SettingsManager(object):
msg = 'SETTINGS MANAGER: Settings merged.' msg = 'SETTINGS MANAGER: Settings merged.'
debug.printMessage(debug.LEVEL_INFO, msg, True) debug.printMessage(debug.LEVEL_INFO, msg, True)
def _enableAccessibility(self): def _enableAccessibility(self) -> bool:
"""Enables the GNOME accessibility flag. Users need to log out and """Enables the GNOME accessibility flag. Users need to log out and
then back in for this to take effect. then back in for this to take effect.
@@ -416,22 +437,27 @@ class SettingsManager(object):
return not alreadyEnabled return not alreadyEnabled
def isAccessibilityEnabled(self): def isAccessibilityEnabled(self) -> bool:
msg = 'SETTINGS MANAGER: Checking if accessibility is enabled.' msg = 'SETTINGS MANAGER: Checking if accessibility is enabled.'
debug.printMessage(debug.LEVEL_INFO, msg, True) debug.printMessage(debug.LEVEL_INFO, msg, True)
msg = 'SETTINGS MANAGER: Accessibility enabled: ' msg = 'SETTINGS MANAGER: Accessibility enabled: '
rv = False
if not self._proxy: if not self._proxy:
rv = False rv = False
msg += 'Error (no proxy)' msg += 'Error (no proxy)'
else: else:
rv = self._proxy.Get('(ss)', 'org.a11y.Status', 'IsEnabled') try:
msg += str(rv) rv = self._proxy.Get('(ss)', 'org.a11y.Status', 'IsEnabled')
msg += str(rv)
except Exception:
rv = False
msg += 'Error calling DBus'
debug.printMessage(debug.LEVEL_INFO, msg, True) debug.printMessage(debug.LEVEL_INFO, msg, True)
return rv return rv
def setAccessibility(self, enable): def setAccessibility(self, enable: bool) -> Union[bool, None]:
msg = f'SETTINGS MANAGER: Attempting to set accessibility to {enable}.' msg = f'SETTINGS MANAGER: Attempting to set accessibility to {enable}.'
debug.printMessage(debug.LEVEL_INFO, msg, True) debug.printMessage(debug.LEVEL_INFO, msg, True)
@@ -440,38 +466,47 @@ class SettingsManager(object):
debug.printMessage(debug.LEVEL_INFO, msg, True) debug.printMessage(debug.LEVEL_INFO, msg, True)
return False return False
vEnable = GLib.Variant('b', enable) try:
self._proxy.Set('(ssv)', 'org.a11y.Status', 'IsEnabled', vEnable) vEnable = GLib.Variant('b', enable)
self._proxy.Set('(ssv)', 'org.a11y.Status', 'IsEnabled', vEnable)
except Exception:
pass
msg = f'SETTINGS MANAGER: Finished setting accessibility to {enable}.' msg = f'SETTINGS MANAGER: Finished setting accessibility to {enable}.'
debug.printMessage(debug.LEVEL_INFO, msg, True) debug.printMessage(debug.LEVEL_INFO, msg, True)
return None
def isScreenReaderServiceEnabled(self): def isScreenReaderServiceEnabled(self) -> bool:
"""Returns True if the screen reader service is enabled. Note that """Returns True if the screen reader service is enabled. Note that
this does not necessarily mean that Cthulhu (or any other screen reader) this does not necessarily mean that Cthulhu (or any other screen reader)
is running at the moment.""" is running at the moment."""
msg = 'SETTINGS MANAGER: Is screen reader service enabled? ' msg = 'SETTINGS MANAGER: Is screen reader service enabled? '
rv = False
if not self._proxy: if not self._proxy:
rv = False rv = False
msg += 'Error (no proxy)' msg += 'Error (no proxy)'
else: else:
rv = self._proxy.Get('(ss)', 'org.a11y.Status', 'ScreenReaderEnabled') try:
msg += str(rv) rv = self._proxy.Get('(ss)', 'org.a11y.Status', 'ScreenReaderEnabled')
msg += str(rv)
except Exception:
rv = False
debug.printMessage(debug.LEVEL_INFO, msg, True) debug.printMessage(debug.LEVEL_INFO, msg, True)
return rv return rv
def setStartingProfile(self, profile=None): def setStartingProfile(self, profile: Optional[tuple] = None) -> None:
if profile is None: if profile is None:
profile = settings.profile profile = settings.profile
self._backend._setProfileKey('startingProfile', profile) if self._backend:
self._backend._setProfileKey('startingProfile', profile)
def getProfile(self): def getProfile(self) -> Optional[str]:
return self.profile return self.profile
def setProfile(self, profile='default', updateLocale=False): def setProfile(self, profile: str = 'default', updateLocale: bool = False) -> None:
"""Set a specific profile as the active one. """Set a specific profile as the active one.
Also the settings from that profile will be loading Also the settings from that profile will be loading
and updated the current settings with them.""" and updated the current settings with them."""
@@ -497,10 +532,11 @@ class SettingsManager(object):
tokens = ["SETTINGS MANAGER: Profile set to:", profile] tokens = ["SETTINGS MANAGER: Profile set to:", profile]
debug.printTokens(debug.LEVEL_INFO, tokens, True) debug.printTokens(debug.LEVEL_INFO, tokens, True)
def removeProfile(self, profile): def removeProfile(self, profile: str) -> None:
self._backend.removeProfile(profile) if self._backend:
self._backend.removeProfile(profile)
def _setSettingsRuntime(self, settingsDict): def _setSettingsRuntime(self, settingsDict: Dict[str, Any]) -> None:
msg = 'SETTINGS MANAGER: Setting runtime settings.' msg = 'SETTINGS MANAGER: Setting runtime settings.'
debug.printMessage(debug.LEVEL_INFO, msg, True) debug.printMessage(debug.LEVEL_INFO, msg, True)
@@ -514,7 +550,7 @@ class SettingsManager(object):
debug.printMessage(debug.LEVEL_INFO, msg, True) debug.printMessage(debug.LEVEL_INFO, msg, True)
self._logRoleSoundPresentationChange() self._logRoleSoundPresentationChange()
def _logRoleSoundPresentationChange(self): def _logRoleSoundPresentationChange(self) -> None:
current = getattr(settings, "roleSoundPresentation", None) current = getattr(settings, "roleSoundPresentation", None)
if current == self._lastRoleSoundPresentation: if current == self._lastRoleSoundPresentation:
return return
@@ -526,31 +562,37 @@ class SettingsManager(object):
debug.printMessage(debug.LEVEL_INFO, msg, True) debug.printMessage(debug.LEVEL_INFO, msg, True)
self._lastRoleSoundPresentation = current self._lastRoleSoundPresentation = current
def _setPronunciationsRuntime(self, pronunciationsDict): def _setPronunciationsRuntime(self, pronunciationsDict: Dict[str, Any]) -> None:
pronunciation_dict.pronunciation_dict = {} pronunciation_dict.pronunciation_dict = {}
for key, value in pronunciationsDict.values(): for key, value in pronunciationsDict.values():
if key and value: if key and value:
pronunciation_dict.setPronunciation(key, value) pronunciation_dict.setPronunciation(key, value)
def getGeneralSettings(self, profile='default'): def getGeneralSettings(self, profile: str = 'default') -> Dict[str, Any]:
"""Return the current general settings. """Return the current general settings.
Those settings comes from updating the default settings Those settings comes from updating the default settings
with the profiles' ones""" with the profiles' ones"""
return self._backend.getGeneral(profile) if self._backend:
return self._backend.getGeneral(profile)
return {}
def getPronunciations(self, profile='default'): def getPronunciations(self, profile: str = 'default') -> Dict[str, Any]:
"""Return the current pronunciations settings. """Return the current pronunciations settings.
Those settings comes from updating the default settings Those settings comes from updating the default settings
with the profiles' ones""" with the profiles' ones"""
return self._backend.getPronunciations(profile) if self._backend:
return self._backend.getPronunciations(profile)
return {}
def getKeybindings(self, profile='default'): def getKeybindings(self, profile: str = 'default') -> Dict[str, Any]:
"""Return the current keybindings settings. """Return the current keybindings settings.
Those settings comes from updating the default settings Those settings comes from updating the default settings
with the profiles' ones""" with the profiles' ones"""
return self._backend.getKeybindings(profile) if self._backend:
return self._backend.getKeybindings(profile)
return {}
def _setProfileGeneral(self, general): def _setProfileGeneral(self, general: Dict[str, Any]) -> None:
"""Set the changed general settings from the defaults' ones """Set the changed general settings from the defaults' ones
as the profile's.""" as the profile's."""
@@ -572,7 +614,7 @@ class SettingsManager(object):
msg = 'SETTINGS MANAGER: General settings for profile set' msg = 'SETTINGS MANAGER: General settings for profile set'
debug.printMessage(debug.LEVEL_INFO, msg, True) debug.printMessage(debug.LEVEL_INFO, msg, True)
def _setProfilePronunciations(self, pronunciations): def _setProfilePronunciations(self, pronunciations: Dict[str, Any]) -> None:
"""Set the changed pronunciations settings from the defaults' ones """Set the changed pronunciations settings from the defaults' ones
as the profile's.""" as the profile's."""
@@ -585,7 +627,7 @@ class SettingsManager(object):
msg = 'SETTINGS MANAGER: Pronunciation settings for profile set.' msg = 'SETTINGS MANAGER: Pronunciation settings for profile set.'
debug.printMessage(debug.LEVEL_INFO, msg, True) debug.printMessage(debug.LEVEL_INFO, msg, True)
def _setProfileKeybindings(self, keybindings): def _setProfileKeybindings(self, keybindings: Dict[str, Any]) -> None:
"""Set the changed keybindings settings from the defaults' ones """Set the changed keybindings settings from the defaults' ones
as the profile's.""" as the profile's."""
@@ -598,21 +640,24 @@ class SettingsManager(object):
msg = 'SETTINGS MANAGER: Keybindings settings for profile set.' msg = 'SETTINGS MANAGER: Keybindings settings for profile set.'
debug.printMessage(debug.LEVEL_INFO, msg, True) debug.printMessage(debug.LEVEL_INFO, msg, True)
def _saveAppSettings(self, appName, general, pronunciations, keybindings): def _saveAppSettings(self, appName: str, general: Dict[str, Any], pronunciations: Dict[str, Any], keybindings: Dict[str, Any]) -> None:
if not self._backend:
return
appGeneral = {} appGeneral = {}
profileGeneral = self.getGeneralSettings(self.profile) profileGeneral = self.getGeneralSettings(self.profile) if self.profile else {}
for key, value in general.items(): for key, value in general.items():
if value != profileGeneral.get(key): if value != profileGeneral.get(key):
appGeneral[key] = value appGeneral[key] = value
appPronunciations = {} appPronunciations = {}
profilePronunciations = self.getPronunciations(self.profile) profilePronunciations = self.getPronunciations(self.profile) if self.profile else {}
for key, value in pronunciations.items(): for key, value in pronunciations.items():
if value != profilePronunciations.get(key): if value != profilePronunciations.get(key):
appPronunciations[key] = value appPronunciations[key] = value
appKeybindings = {} appKeybindings = {}
profileKeybindings = self.getKeybindings(self.profile) profileKeybindings = self.getKeybindings(self.profile) if self.profile else {}
for key, value in keybindings.items(): for key, value in keybindings.items():
if value != profileKeybindings.get(key): if value != profileKeybindings.get(key):
appKeybindings[key] = value appKeybindings[key] = value
@@ -623,15 +668,17 @@ class SettingsManager(object):
appPronunciations, appPronunciations,
appKeybindings) appKeybindings)
def saveSettings(self, script, general, pronunciations, keybindings): def saveSettings(self, script: Script, general: Dict[str, Any], pronunciations: Dict[str, Any], keybindings: Dict[str, Any]) -> Optional[bool]:
"""Save the settings provided for the script provided.""" """Save the settings provided for the script provided."""
tokens = ["SETTINGS MANAGER: Saving settings for", script, "(app:", script.app, ")"] tokens = ["SETTINGS MANAGER: Saving settings for", script, "(app:", script.app, ")"]
debug.printTokens(debug.LEVEL_INFO, tokens, True) debug.printTokens(debug.LEVEL_INFO, tokens, True)
app = script.app app = script.app
if app: if app:
self._saveAppSettings(AXObject.get_name(app), general, pronunciations, keybindings) appName = AXObject.get_name(app)
return if appName:
self._saveAppSettings(appName, general, pronunciations, keybindings)
return None
# Assign current profile # Assign current profile
_profile = general.get('profile', settings.profile) _profile = general.get('profile', settings.profile)
@@ -650,16 +697,17 @@ class SettingsManager(object):
tokens = ["SETTINGS MANAGER: Saving for backend", self._backend] tokens = ["SETTINGS MANAGER: Saving for backend", self._backend]
debug.printTokens(debug.LEVEL_INFO, tokens, True) debug.printTokens(debug.LEVEL_INFO, tokens, True)
self._backend.saveProfileSettings(self.profile, if self._backend and self.profile:
self.profileGeneral, self._backend.saveProfileSettings(self.profile,
self.profilePronunciations, self.profileGeneral,
self.profileKeybindings) self.profilePronunciations,
self.profileKeybindings)
tokens = ["SETTINGS MANAGER: Settings for", script, "(app:", script.app, ") saved"] tokens = ["SETTINGS MANAGER: Settings for", script, "(app:", script.app, ") saved"]
debug.printTokens(debug.LEVEL_INFO, tokens, True) debug.printTokens(debug.LEVEL_INFO, tokens, True)
return self._enableAccessibility() return self._enableAccessibility()
def _adjustBindingTupleValues(self, bindingTuple): def _adjustBindingTupleValues(self, bindingTuple: tuple) -> tuple:
"""Converts the values of bindingTuple into KeyBinding-ready values.""" """Converts the values of bindingTuple into KeyBinding-ready values."""
keysym, mask, mods, clicks = bindingTuple keysym, mask, mods, clicks = bindingTuple
@@ -670,10 +718,10 @@ class SettingsManager(object):
return bindingTuple return bindingTuple
def overrideKeyBindings(self, script, scriptKeyBindings): def overrideKeyBindings(self, script: Script, scriptKeyBindings: Any) -> Any:
keybindingsSettings = self.profileKeybindings keybindingsSettings = self.profileKeybindings
for handlerString, bindingTuples in keybindingsSettings.items(): for handlerString, bindingTuples in keybindingsSettings.items():
handler = script.inputEventHandlers.get(handlerString) handler: Optional[InputEventHandler] = script.inputEventHandlers.get(handlerString)
if not handler: if not handler:
continue continue
@@ -686,23 +734,27 @@ class SettingsManager(object):
return scriptKeyBindings return scriptKeyBindings
def isFirstStart(self): def isFirstStart(self) -> bool:
"""Check if the firstStart key is True or false""" """Check if the firstStart key is True or false"""
return self._backend.isFirstStart() if self._backend:
return self._backend.isFirstStart()
return False
def setFirstStart(self, value=False): def setFirstStart(self, value: bool = False) -> None:
"""Set firstStart. This user-configurable setting is primarily """Set firstStart. This user-configurable setting is primarily
intended to serve as an indication as to whether or not initial intended to serve as an indication as to whether or not initial
configuration is needed.""" configuration is needed."""
self._backend.setFirstStart(value) if self._backend:
self._backend.setFirstStart(value)
def availableProfiles(self): def availableProfiles(self) -> List[str]:
"""Get available profiles from active backend""" """Get available profiles from active backend"""
if self._backend:
return self._backend.availableProfiles()
return []
return self._backend.availableProfiles() def getAppSetting(self, app: Any, settingName: str, fallbackOnDefault: bool = True) -> Any:
if not app or not self._backend:
def getAppSetting(self, app, settingName, fallbackOnDefault=True):
if not app:
return None return None
appPrefs = self._backend.getAppSettings(AXObject.get_name(app)) appPrefs = self._backend.getAppSettings(AXObject.get_name(app))
@@ -710,13 +762,13 @@ class SettingsManager(object):
profilePrefs = profiles.get(self.profile, {}) profilePrefs = profiles.get(self.profile, {})
general = profilePrefs.get('general', {}) general = profilePrefs.get('general', {})
appSetting = general.get(settingName) appSetting = general.get(settingName)
if appSetting is None and fallbackOnDefault: if appSetting is None and fallbackOnDefault and self.profile:
general = self._backend.getGeneral(self.profile) general = self._backend.getGeneral(self.profile)
appSetting = general.get(settingName) appSetting = general.get(settingName)
return appSetting return appSetting
def loadAppSettings(self, script): def loadAppSettings(self, script: Script) -> None:
"""Load the users application specific settings for an app. """Load the users application specific settings for an app.
Arguments: Arguments:
@@ -726,8 +778,12 @@ class SettingsManager(object):
if not (script and script.app): if not (script and script.app):
return return
for key in self._appPronunciations.keys(): if not self._backend:
self.pronunciations.pop(key) return
for key in list(self._appPronunciations.keys()):
if key in self.pronunciations:
self.pronunciations.pop(key)
prefs = self._backend.getAppSettings(AXObject.get_name(script.app)) prefs = self._backend.getAppSettings(AXObject.get_name(script.app))
profiles = prefs.get('profiles', {}) profiles = prefs.get('profiles', {})
@@ -736,7 +792,7 @@ class SettingsManager(object):
self._appGeneral = profilePrefs.get('general', {}) self._appGeneral = profilePrefs.get('general', {})
self._appKeybindings = profilePrefs.get('keybindings', {}) self._appKeybindings = profilePrefs.get('keybindings', {})
self._appPronunciations = profilePrefs.get('pronunciations', {}) self._appPronunciations = profilePrefs.get('pronunciations', {})
self._activeApp = AXObject.get_name(script.app) self._activeApp = AXObject.get_name(script.app) or ""
self._loadProfileSettings() self._loadProfileSettings()
self._mergeSettings() self._mergeSettings()
@@ -744,9 +800,9 @@ class SettingsManager(object):
self._setPronunciationsRuntime(self.pronunciations) self._setPronunciationsRuntime(self.pronunciations)
script.keyBindings = self.overrideKeyBindings(script, script.getKeyBindings()) script.keyBindings = self.overrideKeyBindings(script, script.getKeyBindings())
_managerInstance = None _managerInstance: Optional[SettingsManager] = None
def getManager(): def getManager() -> Optional[SettingsManager]:
"""Get the settings manager instance. Compatibility function. """Get the settings manager instance. Compatibility function.
This function provides backward compatibility for existing code that uses This function provides backward compatibility for existing code that uses
@@ -759,7 +815,8 @@ def getManager():
if _managerInstance is None: if _managerInstance is None:
try: try:
from . import cthulhu from . import cthulhu
_managerInstance = cthulhu.cthulhuApp.settingsManager if cthulhu.cthulhuApp:
_managerInstance = cthulhu.cthulhuApp.settingsManager
except (ImportError, AttributeError): except (ImportError, AttributeError):
# During import phase, cthulhuApp may not exist yet # During import phase, cthulhuApp may not exist yet
pass pass
+91 -61
View File
@@ -26,6 +26,8 @@
"""Manages the default speech server for cthulhu. A script can use this """Manages the default speech server for cthulhu. A script can use this
as its speech server, or it can feel free to create one of its own.""" as its speech server, or it can feel free to create one of its own."""
from __future__ import annotations
__id__ = "$Id$" __id__ = "$Id$"
__version__ = "$Revision$" __version__ = "$Revision$"
__date__ = "$Date$" __date__ = "$Date$"
@@ -34,6 +36,7 @@ __license__ = "LGPL"
import importlib import importlib
import time import time
from typing import TYPE_CHECKING, Optional, List, Dict, Any, Union
from . import debug from . import debug
from . import logger from . import logger
@@ -46,26 +49,31 @@ from .speechserver import VoiceFamily
from .acss import ACSS from .acss import ACSS
from . import speech_history from . import speech_history
# Lazy initialization to avoid circular imports if TYPE_CHECKING:
_logger = None from .speechserver import SpeechServer
log = None from .logger import Logger
def _ensureLogger(): # Lazy initialization to avoid circular imports
_logger: Optional[Logger] = None
log: Optional[Any] = None # Logger.newLog returns a log object, keeping Any for now as logger isn't typed
def _ensureLogger() -> None:
"""Ensure logger is initialized.""" """Ensure logger is initialized."""
global _logger, log global _logger, log
if _logger is None: if _logger is None:
from . import cthulhu from . import cthulhu
_logger = cthulhu.cthulhuApp.logger if cthulhu.cthulhuApp:
log = _logger.newLog("speech") _logger = cthulhu.cthulhuApp.logger
log = _logger.newLog("speech")
# The speech server to use for all speech operations. # The speech server to use for all speech operations.
# #
_speechserver = None _speechserver: Optional[SpeechServer] = None
# The last time something was spoken. # The last time something was spoken.
_timestamp = 0 _timestamp: float = 0.0
def _initSpeechServer(moduleName, speechServerInfo): def _initSpeechServer(moduleName: Optional[str], speechServerInfo: Optional[Any]) -> None:
global _speechserver global _speechserver
@@ -84,19 +92,20 @@ def _initSpeechServer(moduleName, speechServerInfo):
# Now, get the speech server we care about. # Now, get the speech server we care about.
# #
speechServerInfo = settings.speechServerInfo speechServerInfo = settings.speechServerInfo
if speechServerInfo: if factory:
_speechserver = factory.SpeechServer.getSpeechServer(speechServerInfo)
if not _speechserver:
_speechserver = factory.SpeechServer.getSpeechServer()
if speechServerInfo: if speechServerInfo:
tokens = ["SPEECH: Invalid speechServerInfo:", speechServerInfo] _speechserver = factory.SpeechServer.getSpeechServer(speechServerInfo) # type: ignore
debug.printTokens(debug.LEVEL_INFO, tokens, True)
if not _speechserver:
_speechserver = factory.SpeechServer.getSpeechServer() # type: ignore
if speechServerInfo:
tokens = ["SPEECH: Invalid speechServerInfo:", speechServerInfo]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
if not _speechserver: if not _speechserver:
raise Exception(f"ERROR: No speech server for factory: {moduleName}") raise Exception(f"ERROR: No speech server for factory: {moduleName}")
def init(): def init() -> None:
debug.printMessage(debug.LEVEL_INFO, 'SPEECH: Initializing', True) debug.printMessage(debug.LEVEL_INFO, 'SPEECH: Initializing', True)
if _speechserver: if _speechserver:
debug.printMessage(debug.LEVEL_INFO, 'SPEECH: Already initialized', True) debug.printMessage(debug.LEVEL_INFO, 'SPEECH: Already initialized', True)
@@ -134,16 +143,35 @@ def init():
debug.printMessage(debug.LEVEL_INFO, 'SPEECH: Initialized', True) debug.printMessage(debug.LEVEL_INFO, 'SPEECH: Initialized', True)
def checkSpeechSetting(): def checkSpeechSetting() -> None:
msg = "SPEECH: Checking speech setting." msg = "SPEECH: Checking speech setting."
debug.printMessage(debug.LEVEL_INFO, msg, True) debug.printMessage(debug.LEVEL_INFO, msg, True)
if not settings.enableSpeech: if not settings.enableSpeech:
if _speechserver:
shutdown()
return
if not _speechserver:
init()
def getSpeechServer() -> Optional[SpeechServer]:
"""Returns the speech server instance."""
return _speechserver
def setSpeechServer(speechServer: SpeechServer) -> None:
"""Sets the speech server to be used.
Arguments:
- speechServer: the speech server to use.
"""
global _speechserver
_speechserver = speechServer
shutdown() shutdown()
else: else:
init() init()
def __resolveACSS(acss=None): def __resolveACSS(acss: Optional[Any] = None) -> ACSS:
if isinstance(acss, ACSS): if isinstance(acss, ACSS):
family = acss.get(acss.FAMILY) family = acss.get(acss.FAMILY)
try: try:
@@ -160,35 +188,36 @@ def __resolveACSS(acss=None):
voices = settings.voices voices = settings.voices
return ACSS(voices[settings.DEFAULT_VOICE]) return ACSS(voices[settings.DEFAULT_VOICE])
def sayAll(utteranceIterator, progressCallback): def sayAll(utteranceIterator: Any, progressCallback: Any) -> None:
_ensureLogger() _ensureLogger()
if settings.silenceSpeech: if settings.silenceSpeech:
return return
if _speechserver: if _speechserver:
def _speechHistorySayAllWrapper(): def _speechHistorySayAllWrapper() -> Any:
for [context, acss] in utteranceIterator: for [context, acss] in utteranceIterator:
try: try:
utterance = getattr(context, "utterance", None) utterance = getattr(context, "utterance", None)
if isinstance(utterance, str) and utterance.strip(): if isinstance(utterance, str) and utterance.strip():
speech_history.add(utterance, source="sayAll") speech_history.add(utterance, source="sayAll") # type: ignore
except Exception: except Exception:
debug.printException(debug.LEVEL_INFO) debug.printException(debug.LEVEL_INFO)
yield [context, acss] yield [context, acss]
_speechserver.sayAll(_speechHistorySayAllWrapper(), progressCallback) _speechserver.sayAll(_speechHistorySayAllWrapper(), progressCallback) # type: ignore
else: else:
for [context, acss] in utteranceIterator: if log:
logLine = f"SPEECH OUTPUT: '{context.utterance}'" for [context, acss] in utteranceIterator:
debug.printMessage(debug.LEVEL_INFO, logLine, True) logLine = f"SPEECH OUTPUT: '{context.utterance}'"
log.info(logLine) debug.printMessage(debug.LEVEL_INFO, logLine, True)
try: log.info(logLine)
utterance = getattr(context, "utterance", None) try:
if isinstance(utterance, str) and utterance.strip(): utterance = getattr(context, "utterance", None)
speech_history.add(utterance, source="sayAll-fallback") if isinstance(utterance, str) and utterance.strip():
except Exception: speech_history.add(utterance, source="sayAll-fallback") # type: ignore
debug.printException(debug.LEVEL_INFO) except Exception:
debug.printException(debug.LEVEL_INFO)
def _speak(text, acss, interrupt): def _speak(text: str, acss: Optional[Any], interrupt: bool) -> None:
"""Speaks the individual string using the given ACSS.""" """Speaks the individual string using the given ACSS."""
_ensureLogger() _ensureLogger()
@@ -198,7 +227,7 @@ def _speak(text, acss, interrupt):
from . import sleep_mode_manager from . import sleep_mode_manager
if cthulhu_state.activeScript and hasattr(cthulhu_state.activeScript, 'app'): if cthulhu_state.activeScript and hasattr(cthulhu_state.activeScript, 'app'):
sleepModeManager = sleep_mode_manager.getManager() sleepModeManager = sleep_mode_manager.getManager()
if sleepModeManager.isActiveForApp(cthulhu_state.activeScript.app): if sleepModeManager and sleepModeManager.isActiveForApp(cthulhu_state.activeScript.app):
# Allow sleep mode status messages to get through # Allow sleep mode status messages to get through
if "Sleep mode enabled" in text or "Sleep mode disabled" in text: if "Sleep mode enabled" in text or "Sleep mode disabled" in text:
debug.printMessage(debug.LEVEL_INFO, f"SPEECH: Allowing sleep mode status: '{text}'", True) debug.printMessage(debug.LEVEL_INFO, f"SPEECH: Allowing sleep mode status: '{text}'", True)
@@ -207,14 +236,15 @@ def _speak(text, acss, interrupt):
return return
try: try:
speech_history.add(text, source="speak") speech_history.add(text, source="speak") # type: ignore
except Exception: except Exception:
debug.printException(debug.LEVEL_INFO) debug.printException(debug.LEVEL_INFO)
if not _speechserver: if not _speechserver:
logLine = f"SPEECH OUTPUT: '{text}' {acss}" logLine = f"SPEECH OUTPUT: '{text}' {acss}"
debug.printMessage(debug.LEVEL_INFO, logLine, True) debug.printMessage(debug.LEVEL_INFO, logLine, True)
log.info(logLine) if log:
log.info(logLine)
return return
voice = ACSS(settings.voices.get(settings.DEFAULT_VOICE)) voice = ACSS(settings.voices.get(settings.DEFAULT_VOICE))
@@ -227,9 +257,9 @@ def _speak(text, acss, interrupt):
resolvedVoice = __resolveACSS(voice) resolvedVoice = __resolveACSS(voice)
msg = f"SPEECH OUTPUT: '{text}' {resolvedVoice}" msg = f"SPEECH OUTPUT: '{text}' {resolvedVoice}"
debug.printMessage(debug.LEVEL_INFO, msg, True) debug.printMessage(debug.LEVEL_INFO, msg, True)
_speechserver.speak(text, resolvedVoice, interrupt) _speechserver.speak(text, resolvedVoice, interrupt) # type: ignore
def speak(content, acss=None, interrupt=True): def speak(content: Union[str, List[Any]], acss: Optional[Any] = None, interrupt: bool = True) -> None:
"""Speaks the given content. The content can be either a simple """Speaks the given content. The content can be either a simple
string or an array of arrays of objects returned by a speech string or an array of arrays of objects returned by a speech
generator.""" generator."""
@@ -242,7 +272,7 @@ def speak(content, acss=None, interrupt=True):
from . import sleep_mode_manager from . import sleep_mode_manager
if cthulhu_state.activeScript and hasattr(cthulhu_state.activeScript, 'app'): if cthulhu_state.activeScript and hasattr(cthulhu_state.activeScript, 'app'):
sleepModeManager = sleep_mode_manager.getManager() sleepModeManager = sleep_mode_manager.getManager()
if sleepModeManager.isActiveForApp(cthulhu_state.activeScript.app): if sleepModeManager and sleepModeManager.isActiveForApp(cthulhu_state.activeScript.app):
# Allow sleep mode status messages to get through # Allow sleep mode status messages to get through
if isinstance(content, str): if isinstance(content, str):
if "Sleep mode enabled" in content or "Sleep mode disabled" in content: if "Sleep mode enabled" in content or "Sleep mode disabled" in content:
@@ -265,7 +295,7 @@ def speak(content, acss=None, interrupt=True):
validTypes = (str, list, speech_generator.Pause, validTypes = (str, list, speech_generator.Pause,
speech_generator.LineBreak, ACSS, Icon) speech_generator.LineBreak, ACSS, Icon)
error = "SPEECH: bad content sent to speak(): '%s'" error = "SPEECH: bad content sent to speak(): '%s'"
if not isinstance(content, validTypes): if not isinstance(content, validTypes): # type: ignore
debug.printMessage(debug.LEVEL_INFO, error % content, True) debug.printMessage(debug.LEVEL_INFO, error % content, True)
return return
@@ -288,13 +318,13 @@ def speak(content, acss=None, interrupt=True):
return return
shouldInterrupt = interrupt shouldInterrupt = interrupt
toSpeak = [] toSpeak: List[str] = []
activeVoice = acss activeVoice = acss
if acss is not None: if acss is not None:
activeVoice = ACSS(acss) activeVoice = ACSS(acss)
for element in content: for element in content:
if not isinstance(element, validTypes): if not isinstance(element, validTypes): # type: ignore
debug.printMessage(debug.LEVEL_INFO, error % element, True) debug.printMessage(debug.LEVEL_INFO, error % element, True)
elif isinstance(element, list): elif isinstance(element, list):
speak(element, acss, shouldInterrupt) speak(element, acss, shouldInterrupt)
@@ -310,7 +340,8 @@ def speak(content, acss=None, interrupt=True):
toSpeak = [] toSpeak = []
if element.isValid(): if element.isValid():
player = sound.getPlayer() player = sound.getPlayer()
player.play(element, interrupt=interrupt) if player:
player.play(element, interrupt=interrupt)
elif toSpeak: elif toSpeak:
newVoice = ACSS(acss) newVoice = ACSS(acss)
newItemsToSpeak = [] newItemsToSpeak = []
@@ -338,7 +369,7 @@ def speak(content, acss=None, interrupt=True):
string = " ".join(toSpeak) string = " ".join(toSpeak)
_speak(string, activeVoice, shouldInterrupt) _speak(string, activeVoice, shouldInterrupt)
def speakKeyEvent(event, acss=None): def speakKeyEvent(event: Any, acss: Optional[Any] = None) -> None:
"""Speaks a key event immediately. """Speaks a key event immediately.
Arguments: Arguments:
@@ -362,12 +393,13 @@ def speakKeyEvent(event, acss=None):
msg = f"{keyname} {lockingStateString}" msg = f"{keyname} {lockingStateString}"
logLine = f"SPEECH OUTPUT: '{msg.strip()}' {acss}" logLine = f"SPEECH OUTPUT: '{msg.strip()}' {acss}"
debug.printMessage(debug.LEVEL_INFO, logLine, True) debug.printMessage(debug.LEVEL_INFO, logLine, True)
log.info(logLine) if log:
log.info(logLine)
if _speechserver: if _speechserver:
_speechserver.speakKeyEvent(event, acss) _speechserver.speakKeyEvent(event, acss) # type: ignore
def speakCharacter(character, acss=None): def speakCharacter(character: str, acss: Optional[Any] = None) -> None:
"""Speaks a single character immediately. """Speaks a single character immediately.
Arguments: Arguments:
@@ -393,32 +425,30 @@ def speakCharacter(character, acss=None):
acss = __resolveACSS(acss) acss = __resolveACSS(acss)
tokens = [f"SPEECH OUTPUT: '{character}'", acss] tokens = [f"SPEECH OUTPUT: '{character}'", acss]
debug.printTokens(debug.LEVEL_INFO, tokens, True) debug.printTokens(debug.LEVEL_INFO, tokens, True)
log.info(f"SPEECH OUTPUT: '{character}'") if log:
log.info(f"SPEECH OUTPUT: '{character}'")
if _speechserver: if _speechserver:
_speechserver.speakCharacter(character, acss=acss) _speechserver.speakCharacter(character, acss=acss) # type: ignore
def getInfo(): def getInfo() -> Optional[Any]:
info = None info = None
if _speechserver: if _speechserver:
info = _speechserver.getInfo() info = _speechserver.getInfo() # type: ignore
return info return info
def stop(): def stop() -> None:
if _speechserver: if _speechserver:
_speechserver.stop() _speechserver.stop() # type: ignore
def shutdown(): def shutdown() -> None:
debug.printMessage(debug.LEVEL_INFO, 'SPEECH: Shutting down', True) debug.printMessage(debug.LEVEL_INFO, 'SPEECH: Shutting down', True)
global _speechserver global _speechserver
if _speechserver: if _speechserver:
_speechserver.shutdownActiveServers() _speechserver.shutdownActiveServers() # type: ignore
_speechserver = None _speechserver = None
def reset(text=None, acss=None): def reset(text: Optional[str] = None, acss: Optional[Any] = None) -> None:
if _speechserver: if _speechserver:
_speechserver.reset(text, acss) _speechserver.reset(text, acss) # type: ignore
def getSpeechServer():
return _speechserver