many many type hints added. Lots more to go.

This commit is contained in:
Storm Dragon
2026-01-16 13:01:05 -05:00
parent 73ddc18114
commit b24744b22b
16 changed files with 969 additions and 909 deletions
+1
View File
@@ -36,6 +36,7 @@ gi.require_version("Atspi", "2.0")
from gi.repository import Atspi
from . import braille
from . import cthulhu # Need access to cthulhuApp
from . import debug
from . import generator
from . import messages
+119 -101
View File
@@ -25,6 +25,8 @@
"""The main module for the Cthulhu screen reader."""
from __future__ import annotations
__id__ = "$Id$"
__version__ = "$Revision$"
__date__ = "$Date$"
@@ -34,22 +36,37 @@ __copyright__ = "Copyright (c) 2004-2009 Sun Microsystems Inc." \
__license__ = "LGPL"
import faulthandler
from typing import TYPE_CHECKING, Any, Callable, Optional
from . import dbus_service
if TYPE_CHECKING:
from types import FrameType
from gi.repository.Gio import Settings as GSettings
class APIHelper:
"""Helper class for plugin API interactions, including keybindings."""
def __init__(self, app):
def __init__(self, app: Cthulhu) -> None:
"""Initialize the APIHelper.
Arguments:
- app: the Cthulhu application
"""
self.app = app
self._gestureBindings = {}
self.app: Cthulhu = app
self._gestureBindings: dict[Optional[str], list[Any]] = {}
def registerGestureByString(self, function, name, gestureString, inputEventType='default', normalizer='cthulhu', learnModeEnabled=True, contextName=None, globalBinding=False):
def registerGestureByString(
self,
function: Callable[..., Any],
name: str,
gestureString: str,
inputEventType: str = 'default',
normalizer: str = 'cthulhu',
learnModeEnabled: bool = True,
contextName: Optional[str] = None,
globalBinding: bool = False
) -> Optional[Any]:
"""Register a gesture by string."""
import logging
logger = logging.getLogger(__name__)
@@ -100,12 +117,12 @@ class APIHelper:
# Create a keybinding handler
class GestureHandler:
def __init__(self, function, description, learnModeEnabled=True):
self.function = function
self.description = description
self.learnModeEnabled = learnModeEnabled
def __init__(self, function: Callable[..., Any], description: str, learnModeEnabled: bool = True) -> None:
self.function: Callable[..., Any] = function
self.description: str = description
self.learnModeEnabled: bool = learnModeEnabled
def __call__(self, script, inputEvent):
def __call__(self, script: Any, inputEvent: Any) -> bool:
try:
logger.info(f"=== Plugin keybinding handler called! ===")
return function(script, inputEvent)
@@ -173,14 +190,14 @@ class APIHelper:
return None
def unregisterShortcut(self, binding, contextName=None):
def unregisterShortcut(self, binding: Any, contextName: Optional[str] = None) -> None:
"""Unregister a previously registered shortcut.
Arguments:
- binding: the binding to unregister
- contextName: the context for this gesture
"""
removed_via_plugin_manager = False
removed_via_plugin_manager: bool = False
if contextName and self.app:
try:
plugin_manager = self.app.getPluginSystemManager()
@@ -221,7 +238,7 @@ from gi.repository import GObject
try:
from gi.repository.Gio import Settings
a11yAppSettings = Settings(schema_id='org.gnome.desktop.a11y.applications')
a11yAppSettings: Optional[GSettings] = Settings(schema_id='org.gnome.desktop.a11y.applications')
except Exception:
a11yAppSettings = None
@@ -266,20 +283,20 @@ from . import resource_manager
# Old global variables removed - now using cthulhuApp.* instead
def onEnabledChanged(gsetting, key):
def onEnabledChanged(gsetting: GSettings, key: str) -> None:
try:
enabled = gsetting.get_boolean(key)
enabled: bool = gsetting.get_boolean(key)
except Exception:
return
if key == 'screen-reader-enabled' and not enabled:
shutdown()
EXIT_CODE_HANG = 50
EXIT_CODE_HANG: int = 50
# The user-settings module (see loadUserSettings).
#
_userSettings = None
_userSettings: Optional[Any] = None
########################################################################
# #
@@ -287,29 +304,29 @@ _userSettings = None
# #
########################################################################
CARET_TRACKING = "caret-tracking"
FOCUS_TRACKING = "focus-tracking"
FLAT_REVIEW = "flat-review"
MOUSE_REVIEW = "mouse-review"
OBJECT_NAVIGATOR = "object-navigator"
SAY_ALL = "say-all"
CARET_TRACKING: str = "caret-tracking"
FOCUS_TRACKING: str = "focus-tracking"
FLAT_REVIEW: str = "flat-review"
MOUSE_REVIEW: str = "mouse-review"
OBJECT_NAVIGATOR: str = "object-navigator"
SAY_ALL: str = "say-all"
def getActiveModeAndObjectOfInterest():
def getActiveModeAndObjectOfInterest() -> tuple[str, Any]:
return focus_manager.get_manager().get_active_mode_and_object_of_interest()
def emitRegionChanged(obj, startOffset=None, endOffset=None, mode=None):
def emitRegionChanged(obj: Any, startOffset: Optional[int] = None, endOffset: Optional[int] = None, mode: Optional[str] = None) -> None:
"""Notifies interested clients that the current region of interest has changed."""
focus_manager.get_manager().emit_region_changed(obj, startOffset, endOffset, mode)
def setActiveWindow(frame, app=None, alsoSetLocusOfFocus=False, notifyScript=False):
real_app = app
real_frame = frame
def setActiveWindow(frame: Any, app: Optional[Any] = None, alsoSetLocusOfFocus: bool = False, notifyScript: bool = False) -> None:
real_app: Optional[Any] = app
real_frame: Any = frame
if frame is not None and hasattr(AXObject, "find_real_app_and_window_for"):
real_app, real_frame = AXObject.find_real_app_and_window_for(frame, app)
focus_manager.get_manager().set_active_window(
real_frame, real_app, set_window_as_focus=alsoSetLocusOfFocus, notify_script=notifyScript)
def setLocusOfFocus(event, obj, notifyScript=True, force=False):
def setLocusOfFocus(event: Optional[Any], obj: Any, notifyScript: bool = True, force: bool = False) -> None:
"""Sets the locus of focus (i.e., the object with visual focus) and
notifies the script of the change should the script wish to present
the change to the user.
@@ -331,7 +348,7 @@ def setLocusOfFocus(event, obj, notifyScript=True, force=False):
# #
########################################################################
def _processBrailleEvent(event):
def _processBrailleEvent(event: Any) -> bool:
"""Called whenever a key is pressed on the Braille display.
Arguments:
@@ -340,7 +357,7 @@ def _processBrailleEvent(event):
Returns True if the event was consumed; otherwise False
"""
consumed = False
consumed: bool = False
# Braille key presses always interrupt speech.
#
@@ -368,16 +385,16 @@ def _processBrailleEvent(event):
# #
########################################################################
def deviceChangeHandler(deviceManager, device):
def deviceChangeHandler(deviceManager: Any, device: Any) -> None:
"""New keyboards being plugged in stomp on our changes to the keymappings,
so we have to re-apply"""
source = device.get_source()
if source == Gdk.InputSource.KEYBOARD:
msg = "CTHULHU: Keyboard change detected, re-creating the xmodmap"
msg: str = "CTHULHU: Keyboard change detected, re-creating the xmodmap"
debug.printMessage(debug.LEVEL_INFO, msg, True)
cthulhu_modifier_manager.getManager().refreshCthulhuModifiers("Keyboard change detected.")
def loadUserSettings(script=None, inputEvent=None, skipReloadMessage=False):
def loadUserSettings(script: Optional[Any] = None, inputEvent: Optional[Any] = None, skipReloadMessage: bool = False) -> bool:
"""Loads (and reloads) the user settings module, reinitializing
things such as speech if necessary.
@@ -398,7 +415,7 @@ def loadUserSettings(script=None, inputEvent=None, skipReloadMessage=False):
cthulhuApp.scriptManager.deactivate()
cthulhuApp.getSignalManager().emitSignal('load-setting-begin')
reloaded = False
reloaded: bool = False
if _userSettings:
_profile = cthulhuApp.settingsManager.getSetting('activeProfile')[1]
try:
@@ -491,7 +508,7 @@ def loadUserSettings(script=None, inputEvent=None, skipReloadMessage=False):
return True
def _showPreferencesUI(script, prefs):
def _showPreferencesUI(script: Any, prefs: dict[str, Any]) -> None:
if cthulhu_state.cthulhuOS:
cthulhu_state.cthulhuOS.showGUI()
return
@@ -502,7 +519,7 @@ def _showPreferencesUI(script, prefs):
debug.printException(debug.LEVEL_SEVERE)
return
uiFile = os.path.join(cthulhu_platform.datadir,
uiFile: str = os.path.join(cthulhu_platform.datadir,
cthulhu_platform.package,
"ui",
"cthulhu-setup.ui")
@@ -511,7 +528,7 @@ def _showPreferencesUI(script, prefs):
cthulhu_state.cthulhuOS.init(script)
cthulhu_state.cthulhuOS.showGUI()
def showAppPreferencesGUI(script=None, inputEvent=None):
def showAppPreferencesGUI(script: Optional[Any] = None, inputEvent: Optional[Any] = None) -> bool:
"""Displays the user interface to configure the settings for a
specific applications within Cthulhu and set up those app-specific
user preferences using a GUI.
@@ -519,7 +536,7 @@ def showAppPreferencesGUI(script=None, inputEvent=None):
Returns True to indicate the input event has been consumed.
"""
prefs = {}
prefs: dict[str, Any] = {}
for key in settings.userCustomizableSettings:
prefs[key] = cthulhuApp.settingsManager.getSetting(key)
@@ -528,36 +545,36 @@ def showAppPreferencesGUI(script=None, inputEvent=None):
return True
def showPreferencesGUI(script=None, inputEvent=None):
def showPreferencesGUI(script: Optional[Any] = None, inputEvent: Optional[Any] = None) -> bool:
"""Displays the user interface to configure Cthulhu and set up
user preferences using a GUI.
Returns True to indicate the input event has been consumed.
"""
prefs = cthulhuApp.settingsManager.getGeneralSettings(cthulhuApp.settingsManager.profile)
prefs: dict[str, Any] = cthulhuApp.settingsManager.getGeneralSettings(cthulhuApp.settingsManager.profile)
script = cthulhuApp.scriptManager.get_default_script()
_showPreferencesUI(script, prefs)
return True
def addKeyGrab(binding):
def addKeyGrab(binding: Any) -> list[int]:
""" Add a key grab for the given key binding."""
manager = input_event_manager.get_manager()
return manager.add_grabs_for_keybinding(binding)
def removeKeyGrab(id):
def removeKeyGrab(id: int) -> None:
""" Remove the key grab for the given key binding."""
manager = input_event_manager.get_manager()
manager.remove_grab_by_id(id)
def mapModifier(keycode):
def mapModifier(keycode: int) -> int:
manager = input_event_manager.get_manager()
return manager.map_keycode_to_modifier(keycode)
def quitCthulhu(script=None, inputEvent=None):
def quitCthulhu(script: Optional[Any] = None, inputEvent: Optional[Any] = None) -> bool:
"""Quit Cthulhu. Check if the user wants to confirm this action.
If so, show the confirmation GUI otherwise just shutdown.
@@ -568,7 +585,7 @@ def quitCthulhu(script=None, inputEvent=None):
return True
def showFindGUI(script=None, inputEvent=None):
def showFindGUI(script: Optional[Any] = None, inputEvent: Optional[Any] = None) -> None:
"""Displays the user interface to perform an Cthulhu Find.
Returns True to indicate the input event has been consumed.
@@ -582,9 +599,9 @@ def showFindGUI(script=None, inputEvent=None):
# If True, this module has been initialized.
#
_initialized = False
_initialized: bool = False
def init():
def init() -> bool:
"""Initialize the cthulhu module, which initializes the speech and braille
modules. Also builds up the application list, registers for AT-SPI events,
and creates scripts for all known applications.
@@ -627,7 +644,7 @@ def init():
return True
def _start_dbus_service():
def _start_dbus_service() -> bool:
"""Starts the D-Bus remote controller service in an idle callback."""
debug.printMessage(debug.LEVEL_INFO, 'CTHULHU: Starting D-Bus remote controller', True)
try:
@@ -652,11 +669,11 @@ def _start_dbus_service():
dbus_service.get_remote_controller().register_decorated_module("PluginSystemManager", plugin_manager)
except Exception as e:
msg = f"CTHULHU: Failed to start D-Bus service: {e}"
msg: str = f"CTHULHU: Failed to start D-Bus service: {e}"
debug.printMessage(debug.LEVEL_SEVERE, msg, True)
return False # Remove the idle callback
def start():
def start() -> None:
"""Starts Cthulhu."""
debug.printMessage(debug.LEVEL_INFO, 'CTHULHU: Starting', True)
@@ -693,8 +710,8 @@ def start():
Atspi.event_main()
def die(exitCode=1):
pid = os.getpid()
def die(exitCode: int = 1) -> None:
pid: int = os.getpid()
if exitCode == EXIT_CODE_HANG:
# Someting is hung and we wish to abort.
os.kill(pid, signal.SIGKILL)
@@ -705,14 +722,14 @@ def die(exitCode=1):
if exitCode > 1:
os.kill(pid, signal.SIGTERM)
def timeout(signum=None, frame=None):
msg = 'TIMEOUT: something has hung. Aborting.'
def timeout(signum: Optional[int] = None, frame: Optional[FrameType] = None) -> None:
msg: str = 'TIMEOUT: something has hung. Aborting.'
debug.printMessage(debug.LEVEL_SEVERE, msg, True)
debug.printStack(debug.LEVEL_SEVERE)
debug.examineProcesses(force=True)
die(EXIT_CODE_HANG)
def shutdown(script=None, inputEvent=None):
def shutdown(script: Optional[Any] = None, inputEvent: Optional[Any] = None) -> bool:
"""Exits Cthulhu. Unregisters any event listeners and cleans up.
Returns True if the shutdown procedure ran or False if this module
@@ -769,12 +786,13 @@ def shutdown(script=None, inputEvent=None):
return True
exitCount = 0
def shutdownOnSignal(signum, frame):
exitCount: int = 0
def shutdownOnSignal(signum: int, frame: Optional[FrameType]) -> None:
global exitCount
signalString = f'({signal.strsignal(signum)})'
msg = f"CTHULHU: Shutting down and exiting due to signal={signum} {signalString}"
signalString: str = f'({signal.strsignal(signum)})'
msg: str = f"CTHULHU: Shutting down and exiting due to signal={signum} {signalString}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
# Well...we'll try to exit nicely, but if we keep getting called,
@@ -800,7 +818,7 @@ def shutdownOnSignal(signum, frame):
#
speech.shutdown()
shutdown()
cleanExit = True
cleanExit: bool = True
except Exception:
cleanExit = False
@@ -810,28 +828,28 @@ def shutdownOnSignal(signum, frame):
if not cleanExit:
die(EXIT_CODE_HANG)
def crashOnSignal(signum, frame):
signalString = f'({signal.strsignal(signum)})'
msg = f"CTHULHU: Shutting down and exiting due to signal={signum} {signalString}"
def crashOnSignal(signum: int, frame: Optional[FrameType]) -> None:
signalString: str = f'({signal.strsignal(signum)})'
msg: str = f"CTHULHU: Shutting down and exiting due to signal={signum} {signalString}"
debug.printMessage(debug.LEVEL_SEVERE, msg, True)
debug.printStack(debug.LEVEL_SEVERE)
cthulhu_modifier_manager.getManager().unsetCthulhuModifiers("Crashed")
sys.exit(1)
def main():
def main() -> int:
"""The main entry point for Cthulhu. The exit codes for Cthulhu will
loosely be based on signals, where the exit code will be the
signal used to terminate Cthulhu (if a signal was used). Otherwise,
an exit code of 0 means normal completion and an exit code of 50
means Cthulhu exited because of a hang."""
msg = f"CTHULHU: Launching version {cthulhu_platform.version}"
msg: str = f"CTHULHU: Launching version {cthulhu_platform.version}"
if cthulhu_platform.revision:
msg += f" (rev {cthulhu_platform.revision})"
sessionType = os.environ.get('XDG_SESSION_TYPE') or ""
sessionDesktop = os.environ.get('XDG_SESSION_DESKTOP') or ""
session = "%s %s".strip() % (sessionType, sessionDesktop)
sessionType: str = os.environ.get('XDG_SESSION_TYPE') or ""
sessionDesktop: str = os.environ.get('XDG_SESSION_DESKTOP') or ""
session: str = "%s %s".strip() % (sessionType, sessionDesktop)
if session:
msg += f" session: {session}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
@@ -896,7 +914,7 @@ def main():
class Cthulhu(GObject.Object):
# basic signals
__gsignals__ = {
__gsignals__: dict[str, tuple[Any, ...]] = {
"start-application-completed": (GObject.SignalFlags.RUN_LAST, None, ()),
"stop-application-completed": (GObject.SignalFlags.RUN_LAST, None, ()),
"load-setting-begin": (GObject.SignalFlags.RUN_LAST, None, ()),
@@ -906,56 +924,56 @@ class Cthulhu(GObject.Object):
"request-application-preferences": (GObject.SignalFlags.RUN_LAST, None, ()),
"active-script-changed": (GObject.SignalFlags.RUN_LAST, None, (GObject.TYPE_PYOBJECT,)), # New signal to indicate active script change
}
def __init__(self):
def __init__(self) -> None:
GObject.Object.__init__(self)
# add members
self.resourceManager = resource_manager.ResourceManager(self)
self.settingsManager = settings_manager.SettingsManager(self) # Directly instantiate
self.eventManager = event_manager.EventManager(self) # Directly instantiate
self.scriptManager = script_manager.ScriptManager(self) # Directly instantiate
self.logger = logger.Logger() # Directly instantiate
self.signalManager = signal_manager.SignalManager(self)
self.dynamicApiManager = dynamic_api_manager.DynamicApiManager(self)
self.translationManager = translation_manager.TranslationManager(self)
self.debugManager = debug
self.APIHelper = APIHelper(self)
self.resourceManager: Any = resource_manager.ResourceManager(self)
self.settingsManager: Any = settings_manager.SettingsManager(self) # Directly instantiate
self.eventManager: Any = event_manager.EventManager(self) # Directly instantiate
self.scriptManager: Any = script_manager.ScriptManager(self) # Directly instantiate
self.logger: Any = logger.Logger() # Directly instantiate
self.signalManager: Any = signal_manager.SignalManager(self)
self.dynamicApiManager: Any = dynamic_api_manager.DynamicApiManager(self)
self.translationManager: Any = translation_manager.TranslationManager(self)
self.debugManager: Any = debug
self.APIHelper: APIHelper = APIHelper(self)
self.createCompatAPI()
self.pluginSystemManager = plugin_system_manager.PluginSystemManager(self)
self.pluginSystemManager: Any = plugin_system_manager.PluginSystemManager(self)
# Scan for available plugins at startup
self.pluginSystemManager.rescanPlugins()
def getAPIHelper(self):
def getAPIHelper(self) -> APIHelper:
return self.APIHelper
def getPluginSystemManager(self):
def getPluginSystemManager(self) -> Any:
return self.pluginSystemManager
def getDynamicApiManager(self):
def getDynamicApiManager(self) -> Any:
return self.dynamicApiManager
def getSignalManager(self):
def getSignalManager(self) -> Any:
return self.signalManager
def getEventManager(self):
def getEventManager(self) -> Any:
return self.eventManager
def getSettingsManager(self):
def getSettingsManager(self) -> Any:
return self.settingsManager
def getScriptManager(self):
def getScriptManager(self) -> Any:
return self.scriptManager
def get_scriptManager(self):
def get_scriptManager(self) -> Any:
return self.scriptManager
def getDebugManager(self):
def getDebugManager(self) -> Any:
return self.debugManager
def getTranslationManager(self):
def getTranslationManager(self) -> Any:
return self.translationManager
def getResourceManager(self):
def getResourceManager(self) -> Any:
return self.resourceManager
def getLogger(self): # New getter for the logger
def getLogger(self) -> Any: # New getter for the logger
return self.logger
def addKeyGrab(self, binding):
def addKeyGrab(self, binding: Any) -> list[int]:
return addKeyGrab(binding)
def removeKeyGrab(self, grab_id):
def removeKeyGrab(self, grab_id: int) -> None:
return removeKeyGrab(grab_id)
def run(self, cacheValues=True):
return main(cacheValues)
def stop(self):
def run(self, cacheValues: bool = True) -> int:
return main()
def stop(self) -> None:
pass
def createCompatAPI(self):
def createCompatAPI(self) -> None:
# for now add compatibility layer using Dynamic API
# should be removed step by step
# use clean objects, getters and setters instead
@@ -990,9 +1008,9 @@ class Cthulhu(GObject.Object):
self.getDynamicApiManager().registerAPI('LoadUserSettings', loadUserSettings)
self.getDynamicApiManager().registerAPI('APIHelper', self.APIHelper)
cthulhuApp = Cthulhu()
cthulhuApp: Cthulhu = Cthulhu()
def getManager():
def getManager() -> Cthulhu:
return cthulhuApp
if __name__ == "__main__":
+19 -13
View File
@@ -26,6 +26,8 @@
"""Holds state that is shared among many modules.
"""
from typing import Optional, Any
__id__ = "$Id$"
__version__ = "$Revision$"
__date__ = "$Date$"
@@ -36,54 +38,58 @@ __license__ = "LGPL"
# NOTE: resist the temptation to do any imports here. They can
# easily cause circular imports.
#
# We use forward references in type hints to avoid importing:
# - Atspi.Accessible for focus and window
# - Script for activeScript
# - InputEvent for lastInputEvent
# The Accessible that has visual focus.
#
locusOfFocus = None
locusOfFocus: Optional[Any] = None # Actually: Optional[Atspi.Accessible]
# The currently active window.
#
activeWindow = None
activeWindow: Optional[Any] = None # Actually: Optional[Atspi.Accessible]
# The currently active script.
#
activeScript = None
activeScript: Optional[Any] = None # Actually: Optional[Script]
# The currently active mode (focus, say all, flat review, etc.) and obj
activeMode = None
objOfInterest = None
activeMode: Optional[str] = None
objOfInterest: Optional[Any] = None # Actually: Optional[Atspi.Accessible]
# Used to capture keys to redefine key bindings by the user.
#
capturingKeys = False
capturingKeys: bool = False
# The last non-modifier key event received.
#
lastNonModifierKeyEvent = None
lastNonModifierKeyEvent: Optional[Any] = None # Actually: Optional[KeyboardEvent]
# The InputEvent instance representing the last input event. This is
# set each time a mouse, keyboard or braille event is received.
#
lastInputEvent = None
lastInputEvent: Optional[Any] = None # Actually: Optional[InputEvent]
# Used to determine if the user wishes Cthulhu to pass the next command
# along to the current application rather than consuming it.
#
bypassNextCommand = False
bypassNextCommand: bool = False
# The last searchQuery
#
searchQuery = None
searchQuery: Optional[str] = None
# Handle to the Cthulhu Preferences Glade GUI object.
#
cthulhuOS = None
cthulhuOS: Optional[Any] = None # Actually: Optional[CthulhuSetupGUI]
# Set to True if the last key opened the preferences dialog
#
openingDialog = False
openingDialog: bool = False
# The AT-SPI device (needed for key grabs). Will be set to None if AT-SPI
# is too old to support the new device API.
#
device = None
device: Optional[Any] = None # Actually: Optional[Atspi.Device]
+13 -11
View File
@@ -33,6 +33,7 @@ __copyright__ = "Copyright (c) 2005-2008 Sun Microsystems Inc." \
__license__ = "LGPL"
import time
from typing import Optional, Dict, Callable, Any
from . import cthulhu # Need access to cthulhuApp
from . import cmdnames
@@ -45,21 +46,21 @@ _settingsManager = None # Removed - use cthulhu.cthulhuApp.settingsManager
class DateAndTimePresenter:
"""Provides commands to present the date and time."""
def __init__(self):
self._handlers = self._setup_handlers()
self._bindings = self._setup_bindings()
def __init__(self) -> None:
self._handlers: Dict[str, input_event.InputEventHandler] = self._setup_handlers()
self._bindings: keybindings.KeyBindings = self._setup_bindings()
def get_bindings(self):
def get_bindings(self) -> keybindings.KeyBindings:
"""Returns the date-and-time-presenter keybindings."""
return self._bindings
def get_handlers(self):
def get_handlers(self) -> Dict[str, input_event.InputEventHandler]:
"""Returns the date-and-time-presenter handlers."""
return self._handlers
def _setup_handlers(self):
def _setup_handlers(self) -> Dict[str, input_event.InputEventHandler]:
"""Sets up and returns the date-and-time-presenter input event handlers."""
handlers = {}
@@ -76,7 +77,7 @@ class DateAndTimePresenter:
return handlers
def _setup_bindings(self):
def _setup_bindings(self) -> keybindings.KeyBindings:
"""Sets up and returns the date-and-time-presenter key bindings."""
bindings = keybindings.KeyBindings()
@@ -99,14 +100,14 @@ class DateAndTimePresenter:
return bindings
def present_time(self, script, event=None):
def present_time(self, script: Any, event: Optional[Any] = None) -> bool:
"""Presents the current time."""
format = cthulhu.cthulhuApp.settingsManager.getSetting('presentTimeFormat')
script.presentMessage(time.strftime(format, time.localtime()))
return True
def present_date(self, script, event=None):
def present_date(self, script: Any, event: Optional[Any] = None) -> bool:
"""Presents the current date."""
format = cthulhu.cthulhuApp.settingsManager.getSetting('presentDateFormat')
@@ -114,8 +115,9 @@ class DateAndTimePresenter:
return True
_presenter = None
def getPresenter():
_presenter: Optional[DateAndTimePresenter] = None
def getPresenter() -> DateAndTimePresenter:
global _presenter
if _presenter is None:
_presenter = DateAndTimePresenter()
+50 -40
View File
@@ -28,6 +28,8 @@ level, which is held in the debugLevel field. All other methods take
a debug level, which is compared to the current debug level to
determine if the content should be output."""
from __future__ import annotations
__id__ = "$Id$"
__version__ = "$Revision$"
__date__ = "$Date$"
@@ -41,16 +43,19 @@ import re
import subprocess
import sys
import types
from typing import Any, Optional, TextIO, Pattern, TYPE_CHECKING
from datetime import datetime
import gi
gi.require_version("Atspi", "2.0")
from gi.repository import Atspi
AXObject = None
if TYPE_CHECKING:
from .ax_object import AXObject as AXObjectType
def _get_ax_object():
AXObject: Optional[type[AXObjectType]] = None
def _get_ax_object() -> type[AXObjectType]:
global AXObject
if AXObject is None:
from .ax_object import AXObject as ax_object
@@ -117,14 +122,14 @@ LEVEL_FINEST = 400
#
LEVEL_ALL = 0
debugLevel = LEVEL_SEVERE
debugLevel: int = LEVEL_SEVERE
# The debug file. If this is not set, then all debug output is done
# via stdout. If this is set, then all debug output is sent to the
# file. This can be useful for debugging because one can pass in a
# non-buffered file to better track down hangs.
#
debugFile = None
debugFile: Optional[TextIO] = None
# The debug filter should be either None (which means to match all
# events) or a compiled regular expression from the 're' module (see
@@ -135,14 +140,14 @@ debugFile = None
#
# debug.eventDebugFilter = rc.compile('focus:|window:activate')
#
eventDebugLevel = LEVEL_FINEST
eventDebugFilter = None
eventDebugLevel: int = LEVEL_FINEST
eventDebugFilter: Optional[Pattern[str]] = None
# If True, we output debug information for the event queue. We
# use this in addition to log level to prevent debug logic from
# bogging down event handling.
#
debugEventQueue = False
debugEventQueue: bool = False
# What module(s) should be traced if traceit is being used. By default
# we'll just attend to ourself. (And by default, we will not enable
@@ -152,17 +157,17 @@ debugEventQueue = False
# specific, immediate issue. Trust me. :-) Disabling braille monitor in
# this case is also strongly advised.
#
TRACE_MODULES = ['cthulhu']
TRACE_MODULES: list[str] = ['cthulhu']
# Specific modules to ignore with traceit.
#
TRACE_IGNORE_MODULES = ['traceback', 'linecache', 'locale', 'gettext',
TRACE_IGNORE_MODULES: list[str] = ['traceback', 'linecache', 'locale', 'gettext',
'logging', 'UserDict', 'encodings', 'posixpath',
'genericpath', 're']
# Specific apps to trace with traceit.
#
TRACE_APPS = []
TRACE_APPS: list[str] = []
# What AT-SPI event(s) should be traced if traceit is being used. By
# default, we'll trace everything. Examples of what you might wish to
@@ -173,7 +178,7 @@ TRACE_APPS = []
# TRACE_EVENTS = ['object:state-changed:selected']
# (if you know the exact event type of interest)
#
TRACE_EVENTS = []
TRACE_EVENTS: list[str] = []
# What role(s) should be traced if traceit is being used. By
# default, we'll trace everything. An example of what you might wish
@@ -181,17 +186,17 @@ TRACE_EVENTS = []
#
# TRACE_ROLES = [Atspi.Role.PUSH_BUTTON, Atspi.Role.TOGGLE_BUTTON]
#
TRACE_ROLES = []
TRACE_ROLES: list[Atspi.Role] = []
# Whether or not traceit should only trace the work being done when
# processing an actual event. This is when most bad things happen.
# So we'll default to True.
#
TRACE_ONLY_PROCESSING_EVENTS = True
TRACE_ONLY_PROCESSING_EVENTS: bool = True
objEvent = None
objEvent: Optional[Atspi.Event] = None
def printException(level):
def printException(level: int) -> None:
"""Prints out information regarding the current exception.
Arguments:
@@ -203,7 +208,7 @@ def printException(level):
traceback.print_exc(100, debugFile)
println(level)
def printStack(level):
def printStack(level: int) -> None:
"""Prints out the current stack.
Arguments:
@@ -215,7 +220,7 @@ def printStack(level):
traceback.print_stack(None, 100, debugFile)
println(level)
def _asString(obj):
def _asString(obj: Any) -> str:
AXObject = _get_ax_object()
if isinstance(obj, Atspi.Accessible):
result = AXObject.get_role_name(obj)
@@ -255,45 +260,47 @@ def _asString(obj):
return str(obj)
def _format_tokens(tokens):
def _format_tokens(tokens: list[Any]) -> str:
text = " ".join(map(_asString, tokens))
text = re.sub(r"[ \u00A0]+", " ", text)
text = re.sub(r" (?=[,.:)])(?![\n])", "", text)
return text
def format_log_message(prefix, message, reason=None):
def format_log_message(prefix: str, message: str, reason: Optional[str] = None) -> str:
text = f"{prefix}: {message}"
if reason:
text = f"{text} (reason={reason})"
return text
def print_log(level, prefix, message, reason=None, timestamp=False, stack=False):
def print_log(level: int, prefix: str, message: str, reason: Optional[str] = None,
timestamp: bool = False, stack: bool = False) -> None:
text = format_log_message(prefix, message, reason)
printMessage(level, text, timestamp, stack)
def print_log_tokens(level, prefix, tokens, reason=None, timestamp=False, stack=False):
def print_log_tokens(level: int, prefix: str, tokens: list[Any], reason: Optional[str] = None,
timestamp: bool = False, stack: bool = False) -> None:
text = format_log_message(prefix, _format_tokens(tokens), reason)
printMessage(level, text, timestamp, stack)
def printTokens(level, tokens, timestamp=False, stack=False):
def printTokens(level: int, tokens: list[Any], timestamp: bool = False, stack: bool = False) -> None:
if level < debugLevel:
return
println(level, _format_tokens(tokens), timestamp, stack)
def print_tokens(level, tokens, timestamp=False, stack=False):
def print_tokens(level: int, tokens: list[Any], timestamp: bool = False, stack: bool = False) -> None:
return printTokens(level, tokens, timestamp, stack)
def printMessage(level, text, timestamp=False, stack=False):
def printMessage(level: int, text: str, timestamp: bool = False, stack: bool = False) -> None:
if level < debugLevel:
return
println(level, text, timestamp, stack)
def print_message(level, text, timestamp=False, stack=False):
def print_message(level: int, text: str, timestamp: bool = False, stack: bool = False) -> None:
return printMessage(level, text, timestamp, stack)
def _stackAsString(max_frames=4):
def _stackAsString(max_frames: int = 4) -> str:
callers = []
current_module = inspect.getmodule(inspect.currentframe())
stack = inspect.stack()
@@ -313,7 +320,7 @@ def _stackAsString(max_frames=4):
callers.reverse()
return " > ".join(map(_asString, callers))
def println(level, text="", timestamp=False, stack=False):
def println(level: int, text: str = "", timestamp: bool = False, stack: bool = False) -> None:
"""Prints the text to stderr unless debug is enabled.
If debug is enabled the text will be redirected to the
@@ -357,7 +364,7 @@ def println(level, text="", timestamp=False, stack=False):
sys.stderr.writelines([text, "\n"])
sys.stderr.flush()
def printResult(level, result=None):
def printResult(level: int, result: Any = None) -> None:
"""Prints the return result, along with information about the
method, arguments, and any errors encountered."""
@@ -380,7 +387,8 @@ def printResult(level, result=None):
string = f'{callString}\nRESULT: {result}'
println(level, f'{string}')
def printObjectEvent(level, event, sourceInfo=None, timestamp=False):
def printObjectEvent(level: int, event: Atspi.Event, sourceInfo: Optional[str] = None,
timestamp: bool = False) -> None:
"""Prints out an Python Event object. The given level may be
overridden if the eventDebugLevel is greater. Furthermore, only
events with event types matching the eventDebugFilter regular
@@ -408,7 +416,7 @@ def printObjectEvent(level, event, sourceInfo=None, timestamp=False):
if sourceInfo:
println(level, f"{' ' * 18}{sourceInfo}", timestamp)
def printInputEvent(level, string, timestamp=False):
def printInputEvent(level: int, string: str, timestamp: bool = False) -> None:
"""Prints out an input event. The given level may be overridden
if the eventDebugLevel (see setEventDebugLevel) is greater.
@@ -419,7 +427,8 @@ def printInputEvent(level, string, timestamp=False):
println(max(level, eventDebugLevel), string, timestamp)
def printDetails(level, indent, accessible, includeApp=True, timestamp=False):
def printDetails(level: int, indent: str, accessible: Atspi.Accessible,
includeApp: bool = True, timestamp: bool = False) -> None:
"""Lists the details of the given accessible with the given
indentation.
@@ -435,7 +444,8 @@ def printDetails(level, indent, accessible, includeApp=True, timestamp=False):
getAccessibleDetails(level, accessible, indent, includeApp),
timestamp)
def getAccessibleDetails(level, acc, indent="", includeApp=True):
def getAccessibleDetails(level: int, acc: Atspi.Accessible, indent: str = "",
includeApp: bool = True) -> str:
"""Returns a string, suitable for printing, that describes the
given accessible.
@@ -458,7 +468,7 @@ def getAccessibleDetails(level, acc, indent="", includeApp=True):
#
import linecache
def _getFileAndModule(frame):
def _getFileAndModule(frame: types.FrameType) -> tuple[Optional[str], Optional[str]]:
filename, module = None, None
try:
filename = frame.f_globals["__file__"]
@@ -471,7 +481,7 @@ def _getFileAndModule(frame):
return filename, module
def _shouldTraceIt():
def _shouldTraceIt() -> bool:
AXObject = _get_ax_object()
if not objEvent:
return not TRACE_ONLY_PROCESSING_EVENTS
@@ -491,7 +501,7 @@ def _shouldTraceIt():
return True
def traceit(frame, event, arg):
def traceit(frame: types.FrameType, event: str, arg: Any) -> Optional[object]:
"""Line tracing utility to output all lines as they are executed by
the interpreter. This is to be used by sys.settrace and is for
debugging purposes.
@@ -543,14 +553,14 @@ def traceit(frame, event, arg):
return traceit
def getOpenFDCount(pid):
def getOpenFDCount(pid: int) -> int:
procs = subprocess.check_output([ 'lsof', '-w', '-Ff', '-p', str(pid)])
procs = procs.decode('UTF-8').split('\n')
files = list(filter(lambda s: s and s[0] == 'f' and s[1:].isdigit(), procs))
return len(files)
def getCmdline(pid):
def getCmdline(pid: int) -> str:
try:
openFile = os.popen(f'cat /proc/{pid}/cmdline')
cmdline = openFile.read()
@@ -561,7 +571,7 @@ def getCmdline(pid):
return cmdline
def pidOf(procName):
def pidOf(procName: str) -> list[int]:
openFile = subprocess.Popen(f'pgrep {procName}',
shell=True,
stdout=subprocess.PIPE).stdout
@@ -569,7 +579,7 @@ def pidOf(procName):
openFile.close()
return [int(p) for p in pids.split()]
def examineProcesses(force=False):
def examineProcesses(force: bool = False) -> None:
AXObject = _get_ax_object()
from .ax_utilities import AXUtilities
if force:
+58 -57
View File
@@ -36,6 +36,7 @@ from gi.repository import GLib
import queue
import threading
import time
from typing import Optional, Dict, List, Tuple, Any
from . import cthulhu
from . import debug
@@ -49,25 +50,25 @@ from .ax_utilities import AXUtilities
class EventManager:
EMBEDDED_OBJECT_CHARACTER = '\ufffc'
EMBEDDED_OBJECT_CHARACTER: str = '\ufffc'
def __init__(self, app, asyncMode=True):
def __init__(self, app: Any, asyncMode: bool = True) -> None: # app is CthulhuApp instance
debug.printMessage(debug.LEVEL_INFO, 'EVENT MANAGER: Initializing', True)
debug.printMessage(debug.LEVEL_INFO, f'EVENT MANAGER: Async Mode is {asyncMode}', True)
self.app = app
self._asyncMode = asyncMode
self._scriptListenerCounts = {}
self._active = False
self._enqueueCount = 0
self._dequeueCount = 0
self._cmdlineCache = {}
self._eventQueue = queue.Queue(0)
self._gidleId = 0
self._gidleLock = threading.Lock()
self._gilSleepTime = 0.00001
self._synchronousToolkits = ['VCL']
self._eventsSuspended = False
self._listener = Atspi.EventListener.new(self._enqueue)
self.app: Any = app
self._asyncMode: bool = asyncMode
self._scriptListenerCounts: Dict[str, int] = {}
self._active: bool = False
self._enqueueCount: int = 0
self._dequeueCount: int = 0
self._cmdlineCache: Dict[int, str] = {}
self._eventQueue: queue.Queue = queue.Queue(0)
self._gidleId: int = 0
self._gidleLock: threading.Lock = threading.Lock()
self._gilSleepTime: float = 0.00001
self._synchronousToolkits: List[str] = ['VCL']
self._eventsSuspended: bool = False
self._listener: Atspi.EventListener = Atspi.EventListener.new(self._enqueue)
# Note: These must match what the scripts registered for, otherwise
# Atspi might segfault.
@@ -75,24 +76,24 @@ class EventManager:
# Events we don't want to suspend include:
# object:text-changed:insert - marco
# object:property-change:accessible-name - gnome-shell issue #6925
self._suspendableEvents = ['object:children-changed:add',
'object:children-changed:remove',
'object:state-changed:sensitive',
'object:state-changed:showing',
'object:text-changed:delete']
self._eventsTriggeringSuspension = []
self._ignoredEvents = ['object:bounds-changed',
'object:state-changed:defunct',
'object:property-change:accessible-parent']
self._parentsOfDefunctDescendants = []
self._suspendableEvents: List[str] = ['object:children-changed:add',
'object:children-changed:remove',
'object:state-changed:sensitive',
'object:state-changed:showing',
'object:text-changed:delete']
self._eventsTriggeringSuspension: List[Any] = [] # List of events
self._ignoredEvents: List[str] = ['object:bounds-changed',
'object:state-changed:defunct',
'object:property-change:accessible-parent']
self._parentsOfDefunctDescendants: List[Any] = [] # List[Atspi.Accessible]
cthulhu_state.device = None
self._keyHandlingActive = False
self._inputEventManager = None
self._keyHandlingActive: bool = False
self._inputEventManager: Optional[Any] = None # Optional[InputEventManager]
debug.printMessage(debug.LEVEL_INFO, 'Event manager initialized', True)
def activate(self):
def activate(self) -> None:
"""Called when this event manager is activated."""
debug.printMessage(debug.LEVEL_INFO, 'EVENT MANAGER: Activating', True)
@@ -101,7 +102,7 @@ class EventManager:
GLib.idle_add(self._sync_focus_on_startup)
debug.printMessage(debug.LEVEL_INFO, 'EVENT MANAGER: Activated', True)
def _sync_focus_on_startup(self):
def _sync_focus_on_startup(self) -> bool:
"""Initialize active window and focus when startup missed focus events."""
focus = cthulhu_state.locusOfFocus
@@ -127,7 +128,7 @@ class EventManager:
return False
def _activateKeyHandling(self):
def _activateKeyHandling(self) -> None:
"""Activates keyboard handling using InputEventManager with Atspi.Device."""
if self._keyHandlingActive:
@@ -140,7 +141,7 @@ class EventManager:
self._keyHandlingActive = True
debug.printMessage(debug.LEVEL_INFO, 'EVENT MANAGER: Keyboard handling activated', True)
def _deactivateKeyHandling(self):
def _deactivateKeyHandling(self) -> None:
"""Deactivates keyboard handling."""
if not self._keyHandlingActive:
@@ -153,7 +154,7 @@ class EventManager:
self._keyHandlingActive = False
debug.printMessage(debug.LEVEL_INFO, 'EVENT MANAGER: Keyboard handling deactivated', True)
def deactivate(self):
def deactivate(self) -> None:
"""Called when this event manager is deactivated."""
debug.printMessage(debug.LEVEL_INFO, 'EVENT MANAGER: Deactivating', True)
@@ -163,23 +164,23 @@ class EventManager:
self._deactivateKeyHandling()
debug.printMessage(debug.LEVEL_INFO, 'EVENT MANAGER: Deactivated', True)
def ignoreEventTypes(self, eventTypeList):
def ignoreEventTypes(self, eventTypeList: List[str]) -> None:
for eventType in eventTypeList:
if eventType not in self._ignoredEvents:
self._ignoredEvents.append(eventType)
def unignoreEventTypes(self, eventTypeList):
def unignoreEventTypes(self, eventTypeList: List[str]) -> None:
for eventType in eventTypeList:
if eventType in self._ignoredEvents:
self._ignoredEvents.remove(eventType)
def _isDuplicateEvent(self, event):
def _isDuplicateEvent(self, event: Any) -> bool: # event: Atspi.Event
"""Returns True if this event is already in the event queue."""
if self._inFlood() and self._prioritizeDuringFlood(event):
return False
def isSame(x):
def isSame(x: Any) -> bool:
return x.type == event.type \
and x.source == event.source \
and x.detail1 == event.detail1 \
@@ -192,7 +193,7 @@ class EventManager:
return False
def _getAppCmdline(self, app):
def _getAppCmdline(self, app: Any) -> str: # app: Atspi.Accessible
pid = AXObject.get_process_id(app)
if pid == -1:
return ""
@@ -203,7 +204,7 @@ class EventManager:
self._cmdlineCache[pid] = cmdline
return cmdline
def _isSteamApp(self, app):
def _isSteamApp(self, app: Any) -> bool: # app: Atspi.Accessible
name = AXObject.get_name(app)
if not name:
nameLower = ""
@@ -216,7 +217,7 @@ class EventManager:
cmdline = self._getAppCmdline(app)
return "steamwebhelper" in cmdline
def _isSteamNotificationEvent(self, event):
def _isSteamNotificationEvent(self, event: Any) -> bool: # event: Atspi.Event
for obj in (event.any_data, event.source):
if not isinstance(obj, Atspi.Accessible):
continue
@@ -229,7 +230,7 @@ class EventManager:
return False
def _ignore(self, event):
def _ignore(self, event: Any) -> bool: # event: Atspi.Event
"""Returns True if this event should be ignored."""
app = AXObject.get_application(event.source)
@@ -701,7 +702,7 @@ class EventManager:
return rerun
def registerListener(self, eventType):
def registerListener(self, eventType: str) -> None:
"""Tells this module to listen for the given event type.
Arguments:
@@ -717,7 +718,7 @@ class EventManager:
self._listener.register(eventType)
self._scriptListenerCounts[eventType] = 1
def deregisterListener(self, eventType):
def deregisterListener(self, eventType: str) -> None:
"""Tells this module to stop listening for the given event type.
Arguments:
@@ -735,7 +736,7 @@ class EventManager:
self._listener.deregister(eventType)
del self._scriptListenerCounts[eventType]
def registerScriptListeners(self, script):
def registerScriptListeners(self, script: Any) -> None: # script: Script
"""Tells the event manager to start listening for all the event types
of interest to the script.
@@ -749,7 +750,7 @@ class EventManager:
for eventType in script.listeners.keys():
self.registerListener(eventType)
def deregisterScriptListeners(self, script):
def deregisterScriptListeners(self, script: Any) -> None: # script: Script
"""Tells the event manager to stop listening for all the event types
of interest to the script.
@@ -797,7 +798,7 @@ class EventManager:
debug.printMessage(debug.eventDebugLevel, msg, False)
@staticmethod
def _get_scriptForEvent(event):
def _get_scriptForEvent(event: Any) -> Optional[Any]: # Returns Optional[Script]
"""Returns the script associated with event."""
if event.type.startswith("mouse:"):
@@ -835,7 +836,7 @@ class EventManager:
debug.printTokens(debug.LEVEL_INFO, tokens, True)
return script
def _isActivatableEvent(self, event, script=None):
def _isActivatableEvent(self, event: Any, script: Optional[Any] = None) -> Tuple[bool, str]:
"""Determines if the event is one which should cause us to
change which script is currently active.
@@ -901,7 +902,7 @@ class EventManager:
return False, "No reason found to activate a different script."
def _eventSourceIsDead(self, event):
def _eventSourceIsDead(self, event: Any) -> bool: # event: Atspi.Event
if AXObject.is_dead(event.source):
tokens = ["EVENT MANAGER: source of", event.type, "is dead"]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
@@ -909,7 +910,7 @@ class EventManager:
return False
def _ignoreDuringDeluge(self, event):
def _ignoreDuringDeluge(self, event: Any) -> bool: # event: Atspi.Event
"""Returns true if this event should be ignored during a deluge."""
if self._eventSourceIsDead(event):
@@ -936,7 +937,7 @@ class EventManager:
return event.source != cthulhu_state.locusOfFocus
def _inDeluge(self):
def _inDeluge(self) -> bool:
size = self._eventQueue.qsize()
if size > 100:
msg = f"EVENT MANAGER: DELUGE! Queue size is {size}"
@@ -945,7 +946,7 @@ class EventManager:
return False
def _processDuringFlood(self, event):
def _processDuringFlood(self, event: Any) -> bool: # event: Atspi.Event
"""Returns true if this event should be processed during a flood."""
if self._eventSourceIsDead(event):
@@ -972,7 +973,7 @@ class EventManager:
return event.source == cthulhu_state.locusOfFocus
def _prioritizeDuringFlood(self, event):
def _prioritizeDuringFlood(self, event: Any) -> bool: # event: Atspi.Event
"""Returns true if this event should be prioritized during a flood."""
if event.type.startswith("object:state-changed:focused"):
@@ -1001,7 +1002,7 @@ class EventManager:
return False
def _pruneEventsDuringFlood(self):
def _pruneEventsDuringFlood(self) -> None:
"""Gets rid of events we don't care about during a flood."""
oldSize = self._eventQueue.qsize()
@@ -1024,7 +1025,7 @@ class EventManager:
msg = f"EVENT MANAGER: {oldSize - newSize} events pruned. New size: {newSize}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
def _inFlood(self):
def _inFlood(self) -> bool:
size = self._eventQueue.qsize()
if size > 50:
msg = f"EVENT MANAGER: FLOOD? Queue size is {size}"
@@ -1132,7 +1133,7 @@ class EventManager:
msg = f"EVENT MANAGER: {key}: {value}"
debug.printMessage(debug.LEVEL_INFO, msg, True)
def processBrailleEvent(self, brailleEvent):
def processBrailleEvent(self, brailleEvent: Any) -> bool: # brailleEvent: BrailleEvent
"""Called whenever a cursor key is pressed on the Braille display.
Arguments:
@@ -1148,9 +1149,9 @@ class EventManager:
else:
return False
_manager = None
_manager: Optional[EventManager] = None
def getManager():
def getManager() -> EventManager:
global _manager
if _manager is None:
_manager = cthulhu.cthulhuApp.eventManager
+11 -10
View File
@@ -33,6 +33,7 @@ __copyright__ = "Copyright (c) 2005-2008 Sun Microsystems Inc." \
__license__ = "LGPL"
import gi
from typing import Optional, Dict, Callable, Any
gi.require_version("Gtk", "3.0")
from gi.repository import Gtk
@@ -55,21 +56,21 @@ _settingsManager = None # Removed - use cthulhu.cthulhuApp.settingsManager
class FlatReviewPresenter:
"""Provides access to on-screen objects via flat-review."""
def __init__(self):
self._context = None
self._current_contents = ""
self._restrict = cthulhu.cthulhuApp.settingsManager.getSetting("flatReviewIsRestricted")
self._handlers = self._setup_handlers()
self._desktop_bindings = self._setup_desktop_bindings()
self._laptop_bindings = self._setup_laptop_bindings()
self._gui = None
def __init__(self) -> None:
self._context: Optional[flat_review.Context] = None
self._current_contents: str = ""
self._restrict: bool = cthulhu.cthulhuApp.settingsManager.getSetting("flatReviewIsRestricted")
self._handlers: Dict[str, Callable] = self._setup_handlers()
self._desktop_bindings: keybindings.KeyBindings = self._setup_desktop_bindings()
self._laptop_bindings: keybindings.KeyBindings = self._setup_laptop_bindings()
self._gui: Optional[Any] = None # Optional[Gtk.Window]
def is_active(self):
def is_active(self) -> bool:
"""Returns True if the flat review presenter is active."""
return self._context is not None
def get_or_create_context(self, script=None):
def get_or_create_context(self, script: Optional[Any] = None) -> Optional[flat_review.Context]:
"""Returns the flat review context, creating one if necessary."""
# TODO - JD: Scripts should not be able to interact with the
+2 -2
View File
@@ -59,10 +59,10 @@ def _get_ax_utilities():
from .ax_utilities import AXUtilities
return AXUtilities
def _log(message, reason=None, timestamp=True, stack=False):
def _log(message: str, reason: Optional[str] = None, timestamp: bool = True, stack: bool = False) -> None:
debug.print_log(debug.LEVEL_INFO, "FOCUS MANAGER", message, reason, timestamp, stack)
def _log_tokens(tokens, reason=None, timestamp=True, stack=False):
def _log_tokens(tokens: list, reason: Optional[str] = None, timestamp: bool = True, stack: bool = False) -> None:
debug.print_log_tokens(debug.LEVEL_INFO, "FOCUS MANAGER", tokens, reason, timestamp, stack)
if TYPE_CHECKING:
+20 -19
View File
@@ -39,6 +39,7 @@ from gi.repository import Atspi
import math
import time
from typing import Optional, Any
from gi.repository import Gdk
from gi.repository import GLib
@@ -54,26 +55,26 @@ from . import settings
from .ax_object import AXObject
from .ax_utilities import AXUtilities
KEYBOARD_EVENT = "keyboard"
BRAILLE_EVENT = "braille"
MOUSE_BUTTON_EVENT = "mouse:button"
REMOTE_CONTROLLER_EVENT = "remote controller"
KEYBOARD_EVENT: str = "keyboard"
BRAILLE_EVENT: str = "braille"
MOUSE_BUTTON_EVENT: str = "mouse:button"
REMOTE_CONTROLLER_EVENT: str = "remote controller"
class InputEvent:
def __init__(self, eventType):
def __init__(self, eventType: str) -> None:
"""Creates a new KEYBOARD_EVENT, BRAILLE_EVENT, or MOUSE_BUTTON_EVENT."""
self.type = eventType
self.time = time.time()
self._clickCount = 0
self.type: str = eventType
self.time: float = time.time()
self._clickCount: int = 0
def get_click_count(self):
def get_click_count(self) -> int:
"""Return the count of the number of clicks a user has made."""
return self._clickCount
def set_click_count(self, count=None):
def set_click_count(self, count: Optional[int] = None) -> None:
"""Updates the count of the number of clicks a user has made."""
if count is None:
@@ -82,21 +83,21 @@ class InputEvent:
self._clickCount = count
class KeyboardEvent(InputEvent):
stickyKeys = False
stickyKeys: bool = False
duplicateCount = 0
cthulhuModifierPressed = False
duplicateCount: int = 0
cthulhuModifierPressed: bool = False
# Whether last press of the Cthulhu modifier was alone
lastCthulhuModifierAlone = False
lastCthulhuModifierAloneTime = None
lastCthulhuModifierAlone: bool = False
lastCthulhuModifierAloneTime: Optional[float] = None
# Whether the current press of the Cthulhu modifier is alone
currentCthulhuModifierAlone = False
currentCthulhuModifierAloneTime = None
currentCthulhuModifierAlone: bool = False
currentCthulhuModifierAloneTime: Optional[float] = None
# When the second cthulhu press happened
secondCthulhuModifierTime = None
secondCthulhuModifierTime: Optional[float] = None
# Sticky modifiers state, to be applied to the next keyboard event
cthulhuStickyModifiers = 0
cthulhuStickyModifiers: int = 0
TYPE_UNKNOWN = "unknown"
TYPE_PRINTABLE = "printable"
+13 -12
View File
@@ -33,6 +33,7 @@ __copyright__ = "Copyright (c) 2023 Igalia, S.L." \
__license__ = "LGPL"
import time
from typing import Optional, Dict, List, Any, Callable
import gi
gi.require_version('Gtk', '3.0')
@@ -52,30 +53,30 @@ from . import cthulhu_state
class NotificationPresenter:
"""Provides access to the notification history."""
def __init__(self):
self._gui = None
self._handlers = self._setup_handlers()
self._bindings = self._setup_bindings()
self._max_size = 55
def __init__(self) -> None:
self._gui: Optional[Any] = None # Optional[Gtk.Window]
self._handlers: Dict[str, Callable] = self._setup_handlers()
self._bindings: keybindings.KeyBindings = self._setup_bindings()
self._max_size: int = 55
# The list is arranged with the most recent message being at the end of
# the list. The current index is relative to, and used directly, with the
# python list, i.e. self._notifications[-3] would return the third-to-last
# notification message.
self._notifications = []
self._current_index = -1
self._notifications: List[List[Any]] = [] # List of [message: str, time: float]
self._current_index: int = -1
def get_bindings(self):
def get_bindings(self) -> keybindings.KeyBindings:
"""Returns the notification-presenter keybindings."""
return self._bindings
def get_handlers(self):
def get_handlers(self) -> Dict[str, Callable]:
"""Returns the notification-presenter handlers."""
return self._handlers
def save_notification(self, message):
def save_notification(self, message: str) -> None:
"""Adds message to the list of notification messages."""
tokens = ["NOTIFICATION PRESENTER: Adding '", message, "'."]
@@ -84,7 +85,7 @@ class NotificationPresenter:
self._notifications = self._notifications[to_remove:]
self._notifications.append([message, time.time()])
def clear_list(self):
def clear_list(self) -> None:
"""Clears the notifications list."""
msg = "NOTIFICATION PRESENTER: Clearing list."
@@ -92,7 +93,7 @@ class NotificationPresenter:
self._notifications = []
self._current_index = -1
def _setup_handlers(self):
def _setup_handlers(self) -> Dict[str, Callable]:
"""Sets up and returns the notification-presenter input event handlers."""
handlers = {}
+67 -57
View File
@@ -18,6 +18,7 @@ import re
import shutil
import subprocess
from enum import IntEnum
from typing import Optional, Dict, List, Any
import pluggy
from . import dbus_service
@@ -25,23 +26,23 @@ from . import input_event_manager
from . import keybindings # Added import
# Set to True for more detailed plugin loading debug info
PLUGIN_DEBUG = True
PLUGIN_DEBUG: bool = True
logger = logging.getLogger(__name__)
logger: logging.Logger = logging.getLogger(__name__)
if PLUGIN_DEBUG:
logger.setLevel(logging.DEBUG)
LEGACY_PLUGIN_NAME_ALIASES = {
LEGACY_PLUGIN_NAME_ALIASES: Dict[str, str] = {
"ocrdesktop": "OCR",
}
LEGACY_PLUGIN_DIR_ALIASES = {
LEGACY_PLUGIN_DIR_ALIASES: Dict[str, str] = {
"OCRDesktop": "OCR",
}
_manager = None
_manager: Optional['PluginSystemManager'] = None
def getManager():
def getManager() -> Optional['PluginSystemManager']:
"""Return the shared PluginSystemManager instance."""
return _manager
@@ -50,7 +51,7 @@ class PluginType(IntEnum):
SYSTEM = 1
USER = 2
def get_root_dir(self):
def get_root_dir(self) -> str:
"""Returns the directory where this type of plugins can be found."""
if self.value == PluginType.SYSTEM:
current_file = inspect.getfile(inspect.currentframe())
@@ -63,75 +64,84 @@ class PluginType(IntEnum):
class PluginInfo:
"""Information about a plugin."""
def __init__(self, name, module_name, module_dir, metadata=None, canonical_name=None, source_id=None, origin=None):
self.name = name
self.module_name = module_name
self.module_dir = module_dir
self.metadata = metadata or {}
self.canonical_name = canonical_name or module_name
self.source_id = source_id or "unknown"
self.origin = origin or "unknown"
self.preferred_alias = False
self.builtin = False
self.hidden = False
self.module = None
self.instance = None
self.loaded = False
def __init__(
self,
name: str,
module_name: str,
module_dir: str,
metadata: Optional[Dict[str, Any]] = None,
canonical_name: Optional[str] = None,
source_id: Optional[str] = None,
origin: Optional[str] = None
) -> None:
self.name: str = name
self.module_name: str = module_name
self.module_dir: str = module_dir
self.metadata: Dict[str, Any] = metadata or {}
self.canonical_name: str = canonical_name or module_name
self.source_id: str = source_id or "unknown"
self.origin: str = origin or "unknown"
self.preferred_alias: bool = False
self.builtin: bool = False
self.hidden: bool = False
self.module: Optional[Any] = None
self.instance: Optional[Any] = None
self.loaded: bool = False
def get_module_name(self):
def get_module_name(self) -> str:
return self.module_name
def get_canonical_name(self):
def get_canonical_name(self) -> str:
return self.canonical_name
def get_name(self):
def get_name(self) -> str:
return self.metadata.get('name', self.name)
def get_version(self):
def get_version(self) -> str:
return self.metadata.get('version', '0.0.0')
def get_description(self):
def get_description(self) -> str:
return self.metadata.get('description', '')
def get_source_id(self):
def get_source_id(self) -> str:
return self.source_id
def get_origin(self):
def get_origin(self) -> str:
return self.origin
def get_source_label(self):
def get_source_label(self) -> str:
if self.origin == "sources":
return self.source_id
return self.origin
def get_module_dir(self):
def get_module_dir(self) -> str:
return self.module_dir
class PluginSystemManager:
"""Cthulhu Plugin Manager using pluggy."""
def __init__(self, app):
def __init__(self, app: Any) -> None: # app is CthulhuApp instance
global _manager
self.app = app
self.app: Any = app
logger.info("Initializing PluginSystemManager")
_manager = self
# Initialize plugin manager
logger.info("Setting up plugin manager")
self.plugin_manager = pluggy.PluginManager("cthulhu")
self.plugin_manager: pluggy.PluginManager = pluggy.PluginManager("cthulhu")
# Define hook specifications
hook_spec = pluggy.HookspecMarker("cthulhu")
class CthulhuHookSpecs:
@hook_spec
def activate(self, plugin=None):
def activate(self, plugin: Optional[Any] = None) -> None:
"""Called when the plugin is activated."""
pass
@hook_spec
def deactivate(self, plugin=None):
def deactivate(self, plugin: Optional[Any] = None) -> None:
"""Called when the plugin is deactivated."""
pass
@@ -139,13 +149,13 @@ class PluginSystemManager:
self.plugin_manager.add_hookspecs(CthulhuHookSpecs)
# Plugin storage
self._plugins = {} # module_name -> PluginInfo
self._plugin_name_index = {} # canonical_name -> [module_name]
self._active_plugins = []
self._plugin_keybindings = {} # plugin_name -> [KeyBinding]
self._global_keybindings = keybindings.KeyBindings()
self._global_bindings = []
self._last_active_script = None
self._plugins: Dict[str, PluginInfo] = {} # module_name -> PluginInfo
self._plugin_name_index: Dict[str, List[str]] = {} # canonical_name -> [module_name]
self._active_plugins: List[str] = []
self._plugin_keybindings: Dict[str, List[Any]] = {} # plugin_name -> [KeyBinding]
self._global_keybindings: keybindings.KeyBindings = keybindings.KeyBindings()
self._global_bindings: List[Any] = [] # List[KeyBinding]
self._last_active_script: Optional[Any] = None # Optional[Script]
# Create plugin directories
self._setup_plugin_dirs()
@@ -153,12 +163,12 @@ class PluginSystemManager:
# Log available plugins directory paths
logger.info(f"System plugins directory: {PluginType.SYSTEM.get_root_dir()}")
logger.info(f"User plugins directory: {PluginType.USER.get_root_dir()}")
# Connect to active-script-changed signal
self.app.getSignalManager().connectSignal(
'active-script-changed', self._on_active_script_changed, 'default')
def add_keybinding(self, plugin_name, binding, global_binding=False):
def add_keybinding(self, plugin_name: str, binding: Any, global_binding: bool = False) -> None: # binding: KeyBinding
"""Add a keybinding associated with a specific plugin."""
if plugin_name not in self._plugin_keybindings:
self._plugin_keybindings[plugin_name] = []
@@ -174,14 +184,14 @@ class PluginSystemManager:
else:
logger.warning(f"Failed to create global key grab for {binding.keysymstring}")
def activate_keybindings_for_plugin(self, plugin_name):
def activate_keybindings_for_plugin(self, plugin_name: str) -> None:
"""Activates keybindings for a single plugin with the active script."""
plugin_info = self._plugins.get(plugin_name)
if not plugin_info or not plugin_info.loaded:
return
self._activate_plugin_keybindings(plugin_info)
def remove_keybinding(self, plugin_name, binding):
def remove_keybinding(self, plugin_name: str, binding: Any) -> None: # binding: KeyBinding
"""Remove a keybinding associated with a specific plugin."""
if plugin_name in self._plugin_keybindings:
if binding in self._plugin_keybindings[plugin_name]:
@@ -204,7 +214,7 @@ class PluginSystemManager:
active_script.getKeyBindings().remove(binding)
input_event_manager.get_manager().remove_grabs_for_keybinding(binding)
def _activate_plugin_keybindings(self, plugin_info):
def _activate_plugin_keybindings(self, plugin_info: PluginInfo) -> None:
"""Activates all keybindings for a given plugin with the active script."""
from . import cthulhu_state # Import here to avoid circular dependency
if not cthulhu_state.activeScript:
@@ -227,7 +237,7 @@ class PluginSystemManager:
logger.warning(f"Failed to create key grab for {binding.keysymstring} for plugin {plugin_name}")
logger.debug(f"Activated keybinding '{binding.asString()}' for plugin '{plugin_name}'")
def _deactivate_plugin_keybindings(self, plugin_info):
def _deactivate_plugin_keybindings(self, plugin_info: PluginInfo) -> None:
"""Deactivates all keybindings for a given plugin from the active script."""
from . import cthulhu_state # Import here to avoid circular dependency
if not cthulhu_state.activeScript:
@@ -252,16 +262,16 @@ class PluginSystemManager:
input_manager.remove_grabs_for_keybinding(binding)
logger.debug(f"Deactivated keybinding '{binding.asString()}' for plugin '{plugin_name}'")
def refresh_active_script_keybindings(self):
def refresh_active_script_keybindings(self) -> None:
"""Public method to refresh keybindings for the currently active script."""
from . import cthulhu_state
if cthulhu_state.activeScript:
self._on_active_script_changed(self.app, cthulhu_state.activeScript)
def get_global_keybindings(self):
def get_global_keybindings(self) -> keybindings.KeyBindings:
return self._global_keybindings
def _on_active_script_changed(self, app, new_script):
def _on_active_script_changed(self, app: Any, new_script: Optional[Any]) -> None:
"""Called when the active script changes. Re-applies keybindings for all active plugins."""
logger.info(f"Active script changed to {new_script.name if new_script else 'None'}. Re-applying plugin keybindings.")
@@ -287,28 +297,28 @@ class PluginSystemManager:
if plugin_info and plugin_info.loaded:
self._activate_plugin_keybindings(plugin_info)
def _setup_plugin_dirs(self):
def _setup_plugin_dirs(self) -> None:
"""Ensure plugin directories exist."""
os.makedirs(PluginType.SYSTEM.get_root_dir(), exist_ok=True)
os.makedirs(PluginType.USER.get_root_dir(), exist_ok=True)
os.makedirs(self._get_plugin_sources_root(), exist_ok=True)
def _get_plugin_sources_root(self):
def _get_plugin_sources_root(self) -> str:
return os.path.expanduser('~/.local/share/cthulhu/plugin-sources')
def _get_additional_plugin_dirs(self):
def _get_additional_plugin_dirs(self) -> List[str]:
return [os.path.expanduser('~/.local/share/plugins')]
def _path_under_root(self, path, root):
def _path_under_root(self, path: str, root: str) -> bool:
try:
return os.path.commonpath([os.path.abspath(path), os.path.abspath(root)]) == os.path.abspath(root)
except ValueError:
return False
def _sanitize_source_id(self, source_id):
def _sanitize_source_id(self, source_id: str) -> str:
return re.sub(r'[^a-zA-Z0-9._-]+', '-', source_id).strip('-') or "source"
def _get_origin_info(self, plugin_dir):
def _get_origin_info(self, plugin_dir: str) -> tuple[str, str]:
system_root = PluginType.SYSTEM.get_root_dir()
user_root = PluginType.USER.get_root_dir()
local_root = os.path.expanduser('~/.local/share/plugins')
+44 -38
View File
@@ -30,41 +30,46 @@ __copyright__ = "Copyright (c) 2011. Cthulhu Team."
__license__ = "LGPL"
import importlib
from typing import Optional, Dict, Any
from . import debug
from . import cthulhu_state
from .ax_object import AXObject
from .scripts import apps, toolkits
# Forward references to avoid circular imports
# Script is defined in script.py
# Atspi.Accessible comes from AT-SPI
def _get_ax_utilities():
# Avoid circular import with ax_utilities -> ax_utilities_event -> focus_manager -> braille -> settings_manager -> script_manager.
from .ax_utilities import AXUtilities
return AXUtilities
def _log(message, reason=None, timestamp=True, stack=False):
def _log(message: str, reason: Optional[str] = None, timestamp: bool = True, stack: bool = False) -> None:
debug.print_log(debug.LEVEL_INFO, "SCRIPT MANAGER", message, reason, timestamp, stack)
def _log_tokens(tokens, reason=None, timestamp=True, stack=False):
def _log_tokens(tokens: list, reason: Optional[str] = None, timestamp: bool = True, stack: bool = False) -> None:
debug.print_log_tokens(debug.LEVEL_INFO, "SCRIPT MANAGER", tokens, reason, timestamp, stack)
class ScriptManager:
def __init__(self, app): # Added app argument
def __init__(self, app: Any) -> None: # app is the CthulhuApp instance
_log("Initializing")
self.app = app # Store app instance
self.appScripts = {}
self.toolkitScripts = {}
self.customScripts = {}
self._sleepModeScripts = {}
self._appModules = apps.__all__
self._toolkitModules = toolkits.__all__
self._defaultScript = None
self._scriptPackages = \
self.app: Any = app # Store app instance
self.appScripts: Dict[Any, Any] = {} # Dict[Atspi.Accessible, Script]
self.toolkitScripts: Dict[Any, Dict[str, Any]] = {} # Dict[Atspi.Accessible, Dict[str, Script]]
self.customScripts: Dict[Any, Dict[str, Any]] = {} # Dict[Atspi.Accessible, Dict[str, Script]]
self._sleepModeScripts: Dict[Any, Any] = {} # Dict[Atspi.Accessible, Script]
self._appModules: list = apps.__all__
self._toolkitModules: list = toolkits.__all__
self._defaultScript: Optional[Any] = None # Optional[Script]
self._scriptPackages: list[str] = \
["cthulhu-scripts",
"cthulhu.scripts",
"cthulhu.scripts.apps",
"cthulhu.scripts.toolkits"]
self._appNames = \
self._appNames: Dict[str, str] = \
{'Icedove': 'Thunderbird',
'Nereid': 'Banshee',
'gnome-calculator': 'gcalctool',
@@ -75,14 +80,14 @@ class ScriptManager:
'metacity': 'switcher',
'pluma': 'gedit',
}
self._toolkitNames = \
self._toolkitNames: Dict[str, str] = \
{'WebKitGTK': 'WebKitGtk', 'GTK': 'gtk'}
self.set_active_script(None, "lifecycle: init")
self._active = False
self._active: bool = False
_log("Initialized")
def activate(self):
def activate(self) -> None:
"""Called when this script manager is activated."""
_log("Activating")
@@ -92,7 +97,7 @@ class ScriptManager:
self._active = True
_log("Activated")
def deactivate(self):
def deactivate(self) -> None:
"""Called when this script manager is deactivated."""
_log("Deactivating")
@@ -106,7 +111,7 @@ class ScriptManager:
self._active = False
_log("Deactivated")
def get_module_name(self, app):
def get_module_name(self, app: Optional[Any]) -> Optional[str]: # app: Optional[Atspi.Accessible]
"""Returns the module name of the script to use for application app."""
if app is None:
@@ -143,19 +148,19 @@ class ScriptManager:
_log_tokens(["Mapped", app, "to", name])
return name
def _toolkit_for_object(self, obj):
def _toolkit_for_object(self, obj: Optional[Any]) -> str: # obj: Optional[Atspi.Accessible]
"""Returns the name of the toolkit associated with obj."""
name = AXObject.get_attribute(obj, 'toolkit')
return self._toolkitNames.get(name, name)
def _script_for_role(self, obj):
def _script_for_role(self, obj: Optional[Any]) -> str: # obj: Optional[Atspi.Accessible]
if _get_ax_utilities().is_terminal(obj):
return 'terminal'
return ''
def _new_named_script(self, app, name):
def _new_named_script(self, app: Optional[Any], name: Optional[str]) -> Optional[Any]: # Returns Optional[Script]
"""Attempts to locate and load the named module. If successful, returns
a script based on this module."""
@@ -184,7 +189,7 @@ class ScriptManager:
return script
def _create_script(self, app, obj=None):
def _create_script(self, app: Optional[Any], obj: Optional[Any] = None) -> Any: # Returns Script
"""For the given application, create a new script instance."""
moduleName = self.get_module_name(app)
@@ -207,7 +212,7 @@ class ScriptManager:
return script
def get_default_script(self, app=None):
def get_default_script(self, app: Optional[Any] = None) -> Any: # Returns Script
if not app and self._defaultScript:
return self._defaultScript
@@ -219,7 +224,7 @@ class ScriptManager:
return script
def sanity_check_script(self, script):
def sanity_check_script(self, script: Any) -> Any: # Returns Script
if not self._active:
return script
@@ -233,7 +238,7 @@ class ScriptManager:
_log_tokens(["Failed to get a replacement script for", script.app], "replacement-missing")
return script
def get_script_for_mouse_button_event(self, event):
def get_script_for_mouse_button_event(self, event: Any) -> Any: # Returns Script
isActive = _get_ax_utilities().is_active(cthulhu_state.activeWindow)
_log_tokens([cthulhu_state.activeWindow, "is active:", isActive])
@@ -251,10 +256,10 @@ class ScriptManager:
return self.get_script(AXObject.get_application(activeWindow), activeWindow)
def get_active_script(self):
def get_active_script(self) -> Optional[Any]: # Returns Optional[Script]
return cthulhu_state.activeScript
def get_script(self, app, obj=None, sanity_check=False):
def get_script(self, app: Optional[Any], obj: Optional[Any] = None, sanity_check: bool = False) -> Any: # Returns Script
"""Get a script for an app (and make it if necessary). This is used
instead of a simple calls to Script's constructor.
@@ -312,19 +317,19 @@ class ScriptManager:
return appScript
def get_or_create_sleep_mode_script(self, app):
def get_or_create_sleep_mode_script(self, app: Any) -> Any: # Returns Script
"""Gets or creates the sleep mode script."""
script = self._sleepModeScripts.get(app)
if script is not None:
return script
# Import sleepmode dynamically to avoid circular imports
from .scripts import sleepmode
script = sleepmode.Script(app)
self._sleepModeScripts[app] = script
return script
def set_active_script(self, newScript, reason=None):
def set_active_script(self, newScript: Optional[Any], reason: Optional[str] = None) -> None: # newScript: Optional[Script]
"""Set the new active script.
Arguments:
@@ -343,20 +348,20 @@ class ScriptManager:
return
newScript.activate()
# Emit signal that active script has changed, so PluginSystemManager can update keybindings
from . import cthulhu
cthulhu.cthulhuApp.getSignalManager().emitSignal('active-script-changed', newScript)
_log_tokens(["Setting active script to", newScript], reason)
self._log_active_state(reason)
def activate_script_for_context(self, app, obj, reason=None):
def activate_script_for_context(self, app: Optional[Any], obj: Optional[Any], reason: Optional[str] = None) -> Any: # Returns Script
script = self.get_script(app, obj)
self.set_active_script(script, reason)
return script
def _log_active_state(self, reason=None):
def _log_active_state(self, reason: Optional[str] = None) -> None:
_log_tokens(
["Active state:", "window", cthulhu_state.activeWindow,
"focus", cthulhu_state.locusOfFocus,
@@ -364,7 +369,7 @@ class ScriptManager:
reason
)
def _get_script_for_app_replicant(self, app):
def _get_script_for_app_replicant(self, app: Any) -> Optional[Any]: # Returns Optional[Script]
if not self._active:
return None
@@ -384,7 +389,7 @@ class ScriptManager:
return None
def reclaim_scripts(self):
def reclaim_scripts(self) -> None:
"""Compares the list of known scripts to the list of known apps,
deleting any scripts as necessary.
"""
@@ -432,8 +437,9 @@ class ScriptManager:
del app
_manager = None
def get_manager():
_manager: Optional[ScriptManager] = None
def get_manager() -> ScriptManager:
"""Returns the Script Manager"""
global _manager
File diff suppressed because it is too large Load Diff
@@ -36,6 +36,7 @@ from gi.repository import Atspi
import re
from cthulhu import debug
from cthulhu import cthulhu
from cthulhu import keybindings
from cthulhu import cthulhu_state
from cthulhu import script_utilities
+28 -10
View File
@@ -25,19 +25,27 @@
import gi
from gi.repository import GObject
from typing import Optional, Any, Callable, Tuple
from cthulhu import resource_manager
class SignalManager():
def __init__(self, app):
self.app = app
self.resourceManager = self.app.getResourceManager()
class SignalManager:
def __init__(self, app: Any) -> None: # app is CthulhuApp instance
self.app: Any = app
self.resourceManager: Any = self.app.getResourceManager() # ResourceManager
def registerSignal(self, signalName, signalFlag = GObject.SignalFlags.RUN_LAST, closure = GObject.TYPE_NONE, accumulator=(), contextName = None):
def registerSignal(
self,
signalName: str,
signalFlag: GObject.SignalFlags = GObject.SignalFlags.RUN_LAST,
closure: Any = GObject.TYPE_NONE,
accumulator: Tuple = (),
contextName: Optional[str] = None
) -> bool:
# register signal
ok = False
if not self.signalExist(signalName):
GObject.signal_new(signalName, self.app, signalFlag, closure,accumulator)
GObject.signal_new(signalName, self.app, signalFlag, closure, accumulator)
ok = True
resourceContext = self.resourceManager.getResourceContext(contextName)
if resourceContext:
@@ -45,9 +53,17 @@ class SignalManager():
resourceContext.addSignal(signalName, resourceEntry)
return ok
def signalExist(self, signalName):
def signalExist(self, signalName: str) -> bool:
return GObject.signal_lookup(signalName, self.app) != 0
def connectSignal(self, signalName, function, profile, param = None, contextName = None):
def connectSignal(
self,
signalName: str,
function: Callable,
profile: Any,
param: Optional[Any] = None,
contextName: Optional[str] = None
) -> Optional[int]:
signalID = None
try:
if self.signalExist(signalName):
@@ -63,7 +79,8 @@ class SignalManager():
print(e)
return signalID
def disconnectSignalByFunction(self, function, contextName = None):
def disconnectSignalByFunction(self, function: Callable, contextName: Optional[str] = None) -> bool:
ok = False
try:
self.app.disconnect_by_func(function)
@@ -74,7 +91,8 @@ class SignalManager():
if resourceContext:
resourceContext.removeSubscriptionByFunction(function)
return ok
def emitSignal(self, signalName, *args):
def emitSignal(self, signalName: str, *args: Any) -> None:
# emit a signal with optional arguments
try:
self.app.emit(signalName, *args)
+15 -14
View File
@@ -33,12 +33,13 @@ __license__ = "LGPL"
import gi
from gi.repository import GLib
from typing import Optional, Any
try:
gi.require_version('Gst', '1.0')
from gi.repository import Gst
except Exception:
_gstreamerAvailable = False
_gstreamerAvailable: bool = False
else:
_gstreamerAvailable, args = Gst.init_check(None)
@@ -48,12 +49,12 @@ from .sound_generator import Icon, Tone
class Player:
"""Plays Icons and Tones."""
def __init__(self):
self._initialized = False
self._source = None
self._sink = None
self._player = None
self._pipeline = None
def __init__(self) -> None:
self._initialized: bool = False
self._source: Optional[Any] = None # Optional[Gst.Element]
self._sink: Optional[Any] = None # Optional[Gst.Element]
self._player: Optional[Any] = None # Optional[Gst.Element]
self._pipeline: Optional[Any] = None # Optional[Gst.Pipeline]
if not _gstreamerAvailable:
msg = 'SOUND ERROR: Gstreamer is not available'
@@ -62,7 +63,7 @@ class Player:
self.init()
def _onPlayerMessage(self, bus, message):
def _onPlayerMessage(self, bus: Any, message: Any) -> None: # bus: Gst.Bus, message: Gst.Message
if message.type == Gst.MessageType.EOS:
self._player.set_state(Gst.State.NULL)
elif message.type == Gst.MessageType.ERROR:
@@ -71,7 +72,7 @@ class Player:
msg = f'SOUND ERROR: {error}'
debug.printMessage(debug.LEVEL_INFO, msg, True)
def _onPipelineMessage(self, bus, message):
def _onPipelineMessage(self, bus: Any, message: Any) -> None: # bus: Gst.Bus, message: Gst.Message
if message.type == Gst.MessageType.EOS:
self._pipeline.set_state(Gst.State.NULL)
elif message.type == Gst.MessageType.ERROR:
@@ -80,11 +81,11 @@ class Player:
msg = f'SOUND ERROR: {error}'
debug.printMessage(debug.LEVEL_INFO, msg, True)
def _onTimeout(self, element):
def _onTimeout(self, element: Any) -> bool: # element: Gst.Element
element.set_state(Gst.State.NULL)
return False
def _playIcon(self, icon, interrupt=True):
def _playIcon(self, icon: Icon, interrupt: bool = True) -> None:
"""Plays a sound icon, interrupting the current play first unless specified."""
if interrupt:
@@ -93,7 +94,7 @@ class Player:
self._player.set_property('uri', f'file://{icon.path}')
self._player.set_state(Gst.State.PLAYING)
def _playIconAndWait(self, icon, interrupt=True, timeout_seconds=10):
def _playIconAndWait(self, icon: Icon, interrupt: bool = True, timeout_seconds: Optional[int] = 10) -> bool:
"""Plays a sound icon and waits for completion."""
if interrupt:
@@ -123,7 +124,7 @@ class Player:
self._player.set_state(Gst.State.NULL)
return message is not None and message.type == Gst.MessageType.EOS
def _playTone(self, tone, interrupt=True):
def _playTone(self, tone: Tone, interrupt: bool = True) -> None:
"""Plays a tone, interrupting the current play first unless specified."""
if interrupt:
@@ -136,7 +137,7 @@ class Player:
duration = int(1000 * tone.duration)
GLib.timeout_add(duration, self._onTimeout, self._pipeline)
def init(self):
def init(self) -> None:
"""(Re)Initializes the Player."""
if self._initialized: