Compare commits

...

6 Commits

Author SHA1 Message Date
Storm Dragon
6463e03a1a Fixed some debugging that actually caused a bug. 2025-04-04 02:23:16 -04:00
Storm Dragon
cd76760d9f Try to get around plugins causing the screen reader to hang in infinite loop. 2025-04-04 02:15:26 -04:00
Storm Dragon
bb1ff5c579 Still fighting with plugins. 2025-04-03 22:30:51 -04:00
Storm Dragon
987be0eee2 Anbother attempt to not hang on bad plguins. 2025-04-03 22:18:21 -04:00
Storm Dragon
682d66e08f Fix to stop the screen reader from hanging when plugins fail. 2025-04-03 22:10:48 -04:00
Storm Dragon
df7f4c5e62 Fixed some errors with plugins. 2025-04-03 20:56:48 -04:00
12 changed files with 642 additions and 178 deletions

View File

@ -610,8 +610,44 @@ def loadUserSettings(script=None, inputEvent=None, skipReloadMessage=False):
_storeXmodmap(_cthulhuModifiers)
_createCthulhuXmodmap()
activePlugins = list(_settingsManager.getSetting('activePlugins'))
cthulhuApp.getPluginSystemManager().setActivePlugins(activePlugins)
# Make plugin loading more robust by handling potential exceptions
try:
activePlugins = list(_settingsManager.getSetting('activePlugins'))
# Use debug module for logging instead of the logger module
debug.printMessage(debug.LEVEL_INFO, f"CTHULHU: Loading active plugins: {activePlugins}", True)
# Setup a timeout for plugin activation to prevent hanging
import threading
import time
import logging
# Setup proper logging
log = logging.getLogger("cthulhu.plugins")
activation_completed = [False]
def activate_plugins_with_timeout():
try:
cthulhuApp.getPluginSystemManager().setActivePlugins(activePlugins)
activation_completed[0] = True
except Exception as e:
debug.printMessage(debug.LEVEL_WARNING, f"CTHULHU: Error activating plugins: {e}", True)
import traceback
debug.printMessage(debug.LEVEL_WARNING, traceback.format_exc(), True)
plugin_thread = threading.Thread(target=activate_plugins_with_timeout)
plugin_thread.daemon = True # Make thread daemonic so it doesn't block program exit
plugin_thread.start()
# Give plugins up to 30 seconds to load, then continue regardless
plugin_thread.join(30)
if not activation_completed[0]:
debug.printMessage(debug.LEVEL_WARNING, "CTHULHU: Plugin activation timed out - continuing without all plugins", True)
except Exception as e:
debug.printMessage(debug.LEVEL_WARNING, f"CTHULHU: Error setting up plugin activation: {e}", True)
import traceback
debug.printMessage(debug.LEVEL_WARNING, traceback.format_exc(), True)
_scriptManager.activate()
_eventManager.activate()

View File

@ -23,5 +23,5 @@
# Fork of Orca Screen Reader (GNOME)
# Original source: https://gitlab.gnome.org/GNOME/orca
version = "2025.04.03"
codeName = "testing"
version = "2025.04.04"
codeName = "plugins"

View File

@ -90,3 +90,9 @@ class Plugin:
contextName=self.module_name
)
return None
def connectSignal(self, signal_name, callback):
"""Connect to an application signal."""
if self.app and self.app.getSignalManager():
return self.app.getSignalManager().connectSignal(signal_name, callback)
return None

View File

@ -12,15 +12,23 @@ import os
import inspect
import importlib.util
import logging
import threading
import time
from enum import IntEnum
# Import pluggy if available
try:
import pluggy
PLUGGY_AVAILABLE = True
logging.getLogger(__name__).info("Pluggy loaded successfully")
except ImportError:
PLUGGY_AVAILABLE = False
logging.getLogger(__name__).info("Pluggy not available, plugins will be disabled")
logging.getLogger(__name__).warning("Pluggy not available (ImportError), plugins will be disabled")
except Exception as e:
PLUGGY_AVAILABLE = False
logging.getLogger(__name__).warning(f"Error loading pluggy: {str(e)}, plugins will be disabled")
import traceback
logging.getLogger(__name__).debug(traceback.format_exc())
# Set to True for more detailed plugin loading debug info
PLUGIN_DEBUG = True
@ -145,6 +153,8 @@ class PluginSystemManager:
self._plugins[name].loaded = True
self._plugins[name].instance = old_info.instance
self._plugins[name].module = old_info.module
logger.info(f"Rescanned plugins, found {len(self._plugins)} plugins")
def _scan_plugins_in_directory(self, directory):
"""Scan for plugins in a directory."""
@ -153,44 +163,49 @@ class PluginSystemManager:
return
logger.info(f"Scanning for plugins in directory: {directory}")
for item in os.listdir(directory):
plugin_dir = os.path.join(directory, item)
if not os.path.isdir(plugin_dir):
continue
try:
for item in os.listdir(directory):
plugin_dir = os.path.join(directory, item)
if not os.path.isdir(plugin_dir):
continue
# Check for the traditional structure first (plugin.py & plugin.info)
plugin_file = os.path.join(plugin_dir, "plugin.py")
metadata_file = os.path.join(plugin_dir, "plugin.info")
# Check for the traditional structure first (plugin.py & plugin.info)
plugin_file = os.path.join(plugin_dir, "plugin.py")
metadata_file = os.path.join(plugin_dir, "plugin.info")
# Fall back to [PluginName].py if plugin.py doesn't exist
if not os.path.isfile(plugin_file):
alternative_plugin_file = os.path.join(plugin_dir, f"{item}.py")
if os.path.isfile(alternative_plugin_file):
plugin_file = alternative_plugin_file
logger.info(f"Using alternative plugin file: {alternative_plugin_file}")
# Fall back to [PluginName].py if plugin.py doesn't exist
if not os.path.isfile(plugin_file):
alternative_plugin_file = os.path.join(plugin_dir, f"{item}.py")
if os.path.isfile(alternative_plugin_file):
plugin_file = alternative_plugin_file
logger.info(f"Using alternative plugin file: {alternative_plugin_file}")
# Check if we have any valid plugin file
if os.path.isfile(plugin_file):
# Extract plugin info
module_name = os.path.basename(plugin_dir)
logger.info(f"Found plugin: {module_name} in {plugin_dir}")
metadata = self._load_plugin_metadata(metadata_file)
# Check if we have any valid plugin file
if os.path.isfile(plugin_file):
# Extract plugin info
module_name = os.path.basename(plugin_dir)
logger.info(f"Found plugin: {module_name} in {plugin_dir}")
metadata = self._load_plugin_metadata(metadata_file)
plugin_info = PluginInfo(
metadata.get('name', module_name),
module_name,
plugin_dir,
metadata
)
plugin_info = PluginInfo(
metadata.get('name', module_name),
module_name,
plugin_dir,
metadata
)
# Check if it's a built-in or hidden plugin
plugin_info.builtin = metadata.get('builtin', 'false').lower() == 'true'
plugin_info.hidden = metadata.get('hidden', 'false').lower() == 'true'
# Check if it's a built-in or hidden plugin
plugin_info.builtin = metadata.get('builtin', 'false').lower() == 'true'
plugin_info.hidden = metadata.get('hidden', 'false').lower() == 'true'
logger.info(f"Adding plugin to registry: {module_name}")
self._plugins[module_name] = plugin_info
else:
logger.warning(f"No plugin file found in directory: {plugin_dir}")
logger.info(f"Adding plugin to registry: {module_name}")
self._plugins[module_name] = plugin_info
else:
logger.warning(f"No plugin file found in directory: {plugin_dir}")
except Exception as e:
logger.error(f"Error scanning directory {directory}: {e}")
import traceback
logger.error(traceback.format_exc())
def _load_plugin_metadata(self, metadata_file):
"""Load plugin metadata from a file."""
@ -217,27 +232,122 @@ class PluginSystemManager:
return self._active_plugins
def setActivePlugins(self, activePlugins):
"""Set active plugins and sync their state."""
"""Set active plugins and sync their state with robust error handling."""
logger.info(f"Setting active plugins: {activePlugins}")
# Set a timeout for the entire operation
operation_timeout = 60 # seconds
start_time = time.time()
# Make sure we have scanned for plugins first
# Make sure we have scanned for plugins first, with timeout
if not self._plugins:
logger.info("No plugins found, rescanning...")
self.rescanPlugins()
try:
# Use a separate thread with timeout for rescanning
rescan_success = [False]
def rescan_with_timeout():
try:
self.rescanPlugins()
rescan_success[0] = True
except Exception as e:
logger.error(f"Error in plugin rescan thread: {e}")
import traceback
logger.error(traceback.format_exc())
rescan_thread = threading.Thread(target=rescan_with_timeout)
rescan_thread.daemon = True
rescan_thread.start()
rescan_thread.join(10) # 10 second timeout for rescanning
if not rescan_success[0]:
logger.error("Plugin rescan timed out or failed, continuing with whatever plugins were found")
except Exception as e:
logger.error(f"Error setting up plugin rescan: {e}")
import traceback
logger.error(traceback.format_exc())
# Check if the operation has already timed out
if time.time() - start_time > operation_timeout:
logger.error("Plugin activation operation timed out during rescan, continuing with empty plugin list")
self._active_plugins = []
return
# Create a clean list of valid active plugins
try:
available_plugins = [p.get_module_name() for p in self.plugins]
valid_active_plugins = []
self._active_plugins = activePlugins
# Log active vs available plugins
available_plugins = [p.get_module_name() for p in self.plugins]
logger.info(f"Available plugins: {available_plugins}")
logger.info(f"Active plugins: {self._active_plugins}")
# Find missing plugins
missing_plugins = [p for p in self._active_plugins if p not in available_plugins]
if missing_plugins:
logger.warning(f"Active plugins not found: {missing_plugins}")
for plugin_name in activePlugins:
# Check for exact match first
if plugin_name in available_plugins:
valid_active_plugins.append(plugin_name)
continue
# Try case-insensitive match
plugin_name_lower = plugin_name.lower()
matched = False
for available in available_plugins:
if available.lower() == plugin_name_lower:
# Use the correctly cased name from available plugins
valid_active_plugins.append(available)
matched = True
logger.info(f"Case-insensitive match: requested '{plugin_name}', using '{available}'")
break
if not matched:
logger.warning(f"Plugin '{plugin_name}' not found, skipping")
self.syncAllPluginsActive()
# Only use valid plugins
self._active_plugins = valid_active_plugins
logger.info(f"Using verified active plugins: {self._active_plugins}")
except Exception as e:
logger.error(f"Error validating active plugins: {e}")
import traceback
logger.error(traceback.format_exc())
# Continue with an empty plugin list in case of error
self._active_plugins = []
# Check if the operation has already timed out
if time.time() - start_time > operation_timeout:
logger.error("Plugin activation operation timed out during validation, skipping sync")
return
# Now sync the plugins with timeout protection
try:
# Use a separate thread with timeout for syncing
sync_success = [False]
def sync_with_timeout():
try:
self.syncAllPluginsActive()
sync_success[0] = True
except Exception as e:
logger.error(f"Error in plugin sync thread: {e}")
import traceback
logger.error(traceback.format_exc())
# Make sure we don't exceed our overall operation timeout
remaining_time = operation_timeout - (time.time() - start_time)
if remaining_time <= 0:
logger.error("No time remaining for plugin sync, skipping")
return
sync_thread = threading.Thread(target=sync_with_timeout)
sync_thread.daemon = True
sync_thread.start()
sync_thread.join(min(30, remaining_time)) # Use either 30 seconds or remaining time, whichever is less
if not sync_success[0]:
logger.error("Plugin sync timed out or failed, some plugins may not be properly loaded")
except Exception as e:
logger.error(f"Error setting up plugin sync: {e}")
import traceback
logger.error(traceback.format_exc())
logger.info("Plugin activation operation completed")
return
def setPluginActive(self, pluginInfo, active):
"""Set the active state of a plugin."""
@ -292,6 +402,20 @@ class PluginSystemManager:
"""Sync the active state of all plugins."""
logger.info("Syncing active state of all plugins")
# Skip if pluggy is not available
if not PLUGGY_AVAILABLE:
logger.warning("Pluggy not available, skipping plugin sync")
return
# Skip if no plugins found - don't waste time trying to sync nothing
if not self._plugins:
logger.warning("No plugins found, skipping plugin sync")
return
# Set an overall timeout for the sync operation to prevent the screen reader from hanging
sync_timeout = 30 # seconds
sync_start_time = time.time()
# Log plugin status before syncing
if PLUGIN_DEBUG:
for pluginInfo in self.plugins:
@ -299,24 +423,75 @@ class PluginSystemManager:
is_loaded = pluginInfo.loaded
logger.debug(f"Plugin {pluginInfo.get_module_name()}: active={is_active}, loaded={is_loaded}")
# First unload inactive plugins
for pluginInfo in self.plugins:
if not self.isPluginActive(pluginInfo) and pluginInfo.loaded:
try:
# Create plans first, then execute to avoid changing collection during iteration
to_unload = [p for p in self.plugins if not self.isPluginActive(p) and p.loaded]
to_load = [p for p in self.plugins if self.isPluginActive(p) and not p.loaded]
# First unload inactive plugins
for pluginInfo in to_unload:
# Check if we've exceeded our total sync timeout
if time.time() - sync_start_time > sync_timeout:
logger.error("Plugin sync operation timed out, continuing with partially synced plugins")
break
logger.info(f"Unloading inactive plugin: {pluginInfo.get_module_name()}")
self.unloadPlugin(pluginInfo)
try:
unload_result = self.unloadPlugin(pluginInfo)
logger.info(f"Plugin {pluginInfo.get_module_name()} unload result: {unload_result}")
except Exception as e:
logger.error(f"Error unloading plugin {pluginInfo.get_module_name()}: {e}")
import traceback
logger.error(traceback.format_exc())
# Continue with other plugins even if this one fails
# Then load active plugins
for pluginInfo in self.plugins:
if self.isPluginActive(pluginInfo) and not pluginInfo.loaded:
# Then load active plugins
for pluginInfo in to_load:
# Check if we've exceeded our total sync timeout
if time.time() - sync_start_time > sync_timeout:
logger.error("Plugin sync operation timed out, continuing with partially synced plugins")
break
logger.info(f"Loading active plugin: {pluginInfo.get_module_name()}")
result = self.loadPlugin(pluginInfo)
logger.info(f"Plugin {pluginInfo.get_module_name()} load result: {result}")
# Log final plugin status
active_plugins = [p.get_module_name() for p in self.plugins if p.loaded]
logger.info(f"Active plugins after sync: {active_plugins}")
inactive_plugins = [p.get_module_name() for p in self.plugins if not p.loaded]
logger.info(f"Inactive plugins after sync: {inactive_plugins}")
# Use a separate thread with timeout to load each plugin
# This prevents any single plugin from hanging the sync operation
load_success = [False]
load_result = [False]
def load_plugin_with_timeout():
try:
result = self.loadPlugin(pluginInfo)
load_result[0] = result
load_success[0] = True
except Exception as e:
logger.error(f"Error in plugin loading thread for {pluginInfo.get_module_name()}: {e}")
import traceback
logger.error(traceback.format_exc())
# Each plugin gets up to 5 seconds to load
load_thread = threading.Thread(target=load_plugin_with_timeout)
load_thread.daemon = True
load_thread.start()
load_thread.join(5) # 5 second timeout per plugin
if load_success[0]:
logger.info(f"Plugin {pluginInfo.get_module_name()} load result: {load_result[0]}")
else:
logger.error(f"Plugin {pluginInfo.get_module_name()} load timed out or failed")
# Log final plugin status
active_plugins = [p.get_module_name() for p in self.plugins if p.loaded]
logger.info(f"Active plugins after sync: {active_plugins}")
inactive_plugins = [p.get_module_name() for p in self.plugins if not p.loaded]
logger.info(f"Inactive plugins after sync: {inactive_plugins}")
except Exception as e:
logger.error(f"Error syncing plugins: {e}")
import traceback
logger.error(traceback.format_exc())
logger.info("Plugin sync operation completed")
return
def loadPlugin(self, pluginInfo):
"""Load a plugin."""
@ -326,14 +501,35 @@ class PluginSystemManager:
return False
module_name = pluginInfo.get_module_name()
logger.info(f"Attempting to load plugin: {module_name}")
logger.debug(f"===== PLUGIN LOADING START: {module_name} =====")
# Setup timeout for plugin loading with a proper handler that interrupts the process
load_timeout = 5 # seconds
load_completed = False
load_timed_out = False
def timeout_handler():
nonlocal load_completed, load_timed_out
if not load_completed:
load_timed_out = True
logger.error(f"Plugin loading timed out for {module_name}")
# When we time out, we want to mark this as a failed plugin
# but still allow the screen reader to continue
timer = threading.Timer(load_timeout, timeout_handler)
timer.start()
try:
# Already loaded?
if pluginInfo.loaded:
logger.info(f"Plugin {module_name} already loaded, skipping")
load_completed = True
return True
# Check for timeout on every critical step
if load_timed_out:
return False
# Try to find the plugin file
module_name = pluginInfo.get_module_name()
plugin_dir = pluginInfo.get_module_dir()
@ -350,76 +546,206 @@ class PluginSystemManager:
if not os.path.exists(plugin_file):
logger.error(f"Plugin file not found: {plugin_file}")
load_completed = True
return False
# Check for timeout again
if load_timed_out:
return False
logger.info(f"Loading plugin from: {plugin_file}")
spec = importlib.util.spec_from_file_location(module_name, plugin_file)
if spec is None:
logger.error(f"Failed to create spec for plugin: {module_name}")
return False
try:
spec = importlib.util.spec_from_file_location(module_name, plugin_file)
if spec is None:
logger.error(f"Failed to create spec for plugin: {module_name}")
load_completed = True
return False
module = importlib.util.module_from_spec(spec)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
pluginInfo.module = module
# Use a separate thread with timeout to execute the module
# This prevents hanging on problematic module imports
exec_success = [False]
def exec_module_with_timeout():
try:
spec.loader.exec_module(module)
exec_success[0] = True
except Exception as e:
logger.error(f"Error executing module {module_name}: {e}")
import traceback
logger.error(traceback.format_exc())
exec_thread = threading.Thread(target=exec_module_with_timeout)
exec_thread.daemon = True # Make thread daemonic so it doesn't block program exit
exec_thread.start()
exec_thread.join(3) # 3 second timeout for module execution
if not exec_success[0]:
logger.error(f"Module execution timed out or failed for {module_name}")
load_completed = True
return False
pluginInfo.module = module
except Exception as e:
logger.error(f"Failed to load module {module_name}: {e}")
import traceback
logger.error(traceback.format_exc())
load_completed = True
return False
# Check for timeout
if load_timed_out:
return False
# Find Plugin class
# Find Plugin class - limit the time we spend searching
start_time = time.time()
plugin_class = None
for attr_name in dir(module):
attr = getattr(module, attr_name)
if (inspect.isclass(attr) and
attr.__module__ == module.__name__ and
hasattr(attr, 'activate')):
plugin_class = attr
logger.info(f"Found plugin class: {attr.__name__} in {module_name}")
break
try:
for attr_name in dir(module):
# Prevent infinite loops or excessive time in dir() operation
if time.time() - start_time > 2: # 2 second timeout
logger.error(f"Finding plugin class timed out for {module_name}")
break
try:
attr = getattr(module, attr_name)
if (inspect.isclass(attr) and
attr.__module__ == module.__name__ and
hasattr(attr, 'activate')):
plugin_class = attr
logger.info(f"Found plugin class: {attr.__name__} in {module_name}")
break
except Exception as e:
logger.error(f"Error checking attribute {attr_name} in {module_name}: {e}")
continue
except Exception as e:
logger.error(f"Error finding plugin class in {module_name}: {e}")
if not plugin_class:
logger.error(f"No plugin class found in {module_name}")
load_completed = True
return False
# Create and initialize plugin instance
logger.info(f"Creating instance of plugin class: {plugin_class.__name__}")
plugin_instance = plugin_class()
pluginInfo.instance = plugin_instance
# Check for timeout
if load_timed_out:
return False
# Create and initialize plugin instance with error handling
try:
logger.info(f"Creating instance of plugin class: {plugin_class.__name__}")
plugin_instance = plugin_class()
pluginInfo.instance = plugin_instance
except Exception as e:
logger.error(f"Failed to create instance of plugin class {plugin_class.__name__}: {e}")
import traceback
logger.error(traceback.format_exc())
load_completed = True
return False
# Check for timeout
if load_timed_out:
return False
# Ensure plugins have a reference to the app
plugin_instance.app = self.getApp()
logger.info(f"Set app reference for plugin: {module_name}")
if hasattr(plugin_instance, 'set_app'):
plugin_instance.set_app(self.getApp())
logger.info(f"Called set_app() for plugin: {module_name}")
try:
plugin_instance.app = self.getApp()
logger.info(f"Set app reference for plugin: {module_name}")
if hasattr(plugin_instance, 'set_app'):
plugin_instance.set_app(self.getApp())
logger.info(f"Called set_app() for plugin: {module_name}")
if hasattr(plugin_instance, 'set_plugin_info'):
plugin_instance.set_plugin_info(pluginInfo)
logger.info(f"Called set_plugin_info() for plugin: {module_name}")
if hasattr(plugin_instance, 'set_plugin_info'):
plugin_instance.set_plugin_info(pluginInfo)
logger.info(f"Called set_plugin_info() for plugin: {module_name}")
except Exception as e:
logger.error(f"Error setting up plugin instance for {module_name}: {e}")
import traceback
logger.error(traceback.format_exc())
# Check for timeout
if load_timed_out:
return False
# Log plugin methods for debugging
if PLUGIN_DEBUG:
try:
logger.debug(f"Plugin instance methods: {[m for m in dir(plugin_instance) if not m.startswith('_')]}")
logger.debug(f"Checking for activate method: {'activate' in dir(plugin_instance)}")
except Exception as e:
logger.error(f"Error logging plugin debug info for {module_name}: {e}")
# Register with pluggy and activate
if self.plugin_manager is None:
logger.error(f"Plugin manager is None when loading {module_name}")
load_completed = True
return False
# Check for timeout
if load_timed_out:
return False
logger.info(f"Registering plugin with pluggy: {module_name}")
self.plugin_manager.register(plugin_instance)
try:
logger.info(f"Registering plugin with pluggy: {module_name}")
self.plugin_manager.register(plugin_instance)
except Exception as e:
logger.error(f"Error registering plugin {module_name} with pluggy: {e}")
import traceback
logger.error(traceback.format_exc())
load_completed = True
return False
# Check for timeout
if load_timed_out:
return False
try:
# Use a separate thread with timeout to activate the plugin
activation_success = [False]
def activate_with_timeout():
try:
self.plugin_manager.hook.activate(plugin=plugin_instance)
activation_success[0] = True
except Exception as e:
logger.error(f"Error in plugin activation thread for {module_name}: {e}")
import traceback
logger.error(traceback.format_exc())
logger.info(f"Activating plugin: {module_name}")
self.plugin_manager.hook.activate(plugin=plugin_instance)
activate_thread = threading.Thread(target=activate_with_timeout)
activate_thread.daemon = True
activate_thread.start()
activate_thread.join(3) # 3 second timeout for activation
if not activation_success[0]:
logger.error(f"Plugin activation timed out or failed for {module_name}")
# We still consider this partially successful - the plugin is loaded
# but not activated. Mark it as not loaded, but don't halt the program.
load_completed = True
return False
except Exception as e:
logger.error(f"Error activating plugin {module_name}: {e}")
import traceback
logger.error(traceback.format_exc())
load_completed = True
return False
pluginInfo.loaded = True
logger.info(f"Successfully loaded plugin: {module_name}")
logger.debug(f"===== PLUGIN LOADING SUCCESS: {module_name} =====")
load_completed = True
return True
except Exception as e:
logger.error(f"Failed to load plugin {module_name}: {e}")
import traceback
logger.error(traceback.format_exc())
logger.debug(f"===== PLUGIN LOADING FAILED: {module_name} =====")
load_completed = True
return False
finally:
timer.cancel()
def unloadPlugin(self, pluginInfo):
"""Unload a plugin."""

View File

@ -26,13 +26,14 @@
from cthulhu import plugin
import gi, os
gi.require_version('Peas', '1.0')
from gi.repository import GObject
from gi.repository import Peas
import logging
logger = logging.getLogger(__name__)
gi.require_version("Gtk", "3.0")
from gi.repository import Gtk, Gdk
class Clipboard(GObject.Object, Peas.Activatable, plugin.Plugin):
class Clipboard(GObject.Object, plugin.Plugin):
#__gtype_name__ = 'Clipboard'
object = GObject.Property(type=GObject.Object)

View File

@ -26,11 +26,11 @@
from cthulhu import plugin
import gi, time
gi.require_version('Peas', '1.0')
from gi.repository import GObject
from gi.repository import Peas
import logging
class Date(GObject.Object, Peas.Activatable, plugin.Plugin):
logger = logging.getLogger(__name__)
class Date(GObject.Object, plugin.Plugin):
#__gtype_name__ = 'Date'
object = GObject.Property(type=GObject.Object)

View File

@ -26,23 +26,41 @@
from cthulhu import plugin
import gi
gi.require_version('Peas', '1.0')
from gi.repository import GObject
from gi.repository import Peas
import logging
class HelloCthulhu(GObject.Object, Peas.Activatable, plugin.Plugin):
logger = logging.getLogger(__name__)
class HelloCthulhu(GObject.Object, plugin.Plugin):
#__gtype_name__ = 'HelloCthulhu'
object = GObject.Property(type=GObject.Object)
def __init__(self):
plugin.Plugin.__init__(self)
def do_activate(self):
API = self.object
self.connectSignal("start-application-completed", self.process)
def do_deactivate(self):
API = self.object
def do_update_state(self):
API = self.object
@plugin.cthulhu_hookimpl
def activate(self, plugin=None):
# Skip if this activation call isn't for us
if plugin is not None and plugin is not self:
return
logger.info("Activating HelloCthulhu plugin")
try:
if self.app:
signal_manager = self.app.getSignalManager()
signal_manager.connectSignal("start-application-completed", self.process, "default")
else:
logger.error("No app reference available")
except Exception as e:
logger.error(f"Error activating HelloCthulhu plugin: {e}")
@plugin.cthulhu_hookimpl
def deactivate(self, plugin=None):
# Skip if this deactivation call isn't for us
if plugin is not None and plugin is not self:
return
logger.info("Deactivating HelloCthulhu plugin")
# No specific cleanup needed
def process(self, app):
messages = app.getDynamicApiManager().getAPI('Messages')
app.getDynamicApiManager().getAPI('CthulhuState').activeScript.presentMessage(messages.START_CTHULHU, resetStyles=False)

View File

@ -35,9 +35,10 @@ __license__ = "LGPL"
from cthulhu import plugin
import gi, math, time
gi.require_version('Peas', '1.0')
from gi.repository import GObject
from gi.repository import Peas
import logging
logger = logging.getLogger(__name__)
gi.require_version("Atspi", "2.0")
from gi.repository import Atspi
@ -68,7 +69,7 @@ AXUtilities = None
keybindings = None
input_event = None
class MouseReview(GObject.Object, Peas.Activatable, plugin.Plugin):
class MouseReview(GObject.Object, plugin.Plugin):
#__gtype_name__ = 'MouseReview'
object = GObject.Property(type=GObject.Object)

View File

@ -26,25 +26,43 @@
from cthulhu import plugin
import gi
gi.require_version('Peas', '1.0')
from gi.repository import GObject
from gi.repository import Peas
import logging
import PluginManagerUi
class PluginManager(GObject.Object, Peas.Activatable, plugin.Plugin):
logger = logging.getLogger(__name__)
class PluginManager(GObject.Object, plugin.Plugin):
#__gtype_name__ = 'PluginManager'
object = GObject.Property(type=GObject.Object)
def __init__(self):
plugin.Plugin.__init__(self)
self.pluginManagerUi = None
def do_activate(self):
API = self.object
self.registerGestureByString(self.startPluginManagerUi, _('plugin manager'), 'kb:cthulhu+e')
def do_deactivate(self):
API = self.object
@plugin.cthulhu_hookimpl
def activate(self, plugin=None):
# Skip if this activation call isn't for us
if plugin is not None and plugin is not self:
return
logger.info("Activating PluginManager plugin")
try:
if self.app:
self.registerGestureByString(self.startPluginManagerUi, 'plugin manager', 'kb:cthulhu+e')
else:
logger.error("No app reference available")
except Exception as e:
logger.error(f"Error activating PluginManager plugin: {e}")
@plugin.cthulhu_hookimpl
def deactivate(self, plugin=None):
# Skip if this deactivation call isn't for us
if plugin is not None and plugin is not self:
return
logger.info("Deactivating PluginManager plugin")
# No specific cleanup needed
def startPluginManagerUi(self, script=None, inputEvent=None):
self.showUI()

View File

@ -25,7 +25,7 @@
from cthulhu import plugin
from gi.repository import GObject, Peas
from gi.repository import GObject
import glob
import os
import importlib.util
@ -33,21 +33,23 @@ import random
import string
import _thread
from subprocess import Popen, PIPE
import logging
logger = logging.getLogger(__name__)
settings = None
speech = None
braille = None
input_event = None
def outputMessage( Message):
if (settings.enableSpeech):
def outputMessage(Message):
if (settings and settings.enableSpeech and speech):
speech.speak(Message)
if (settings.enableBraille):
if (settings and settings.enableBraille and braille):
braille.displayMessage(Message)
class SimplePluginSystem(GObject.Object, Peas.Activatable, plugin.Plugin):
class SimplePluginSystem(GObject.Object, plugin.Plugin):
__gtype_name__ = 'SimplePluginSystem'
object = GObject.Property(type=GObject.Object)
def __init__(self):
plugin.Plugin.__init__(self)
@ -55,27 +57,56 @@ class SimplePluginSystem(GObject.Object, Peas.Activatable, plugin.Plugin):
self.loaded = False
self.plugin_repo = os.path.expanduser('~') + "/.local/share/cthulhu/simple-plugins-enabled/"
def do_activate(self):
API = self.object
global settings
global speech
global braille
global input_event
settings = API.app.getDynamicApiManager().getAPI('Settings')
speech = API.app.getDynamicApiManager().getAPI('Speech')
braille = API.app.getDynamicApiManager().getAPI('Braille')
input_event = API.app.getDynamicApiManager().getAPI('InputEvent')
"""Required method for plugins"""
if not self.loaded:
self.load_plugins()
@plugin.cthulhu_hookimpl
def activate(self, plugin=None):
"""Activate the plugin."""
# Skip if this activation call isn't for us
if plugin is not None and plugin is not self:
return
logger.info("Activating SimplePluginSystem plugin")
try:
global settings
global speech
global braille
global input_event
if self.app:
settings = self.app.getDynamicApiManager().getAPI('Settings')
speech = self.app.getDynamicApiManager().getAPI('Speech')
braille = self.app.getDynamicApiManager().getAPI('Braille')
input_event = self.app.getDynamicApiManager().getAPI('InputEvent')
if not self.loaded:
self.load_plugins()
else:
logger.error("SimplePluginSystem: No app reference available")
except Exception as e:
logger.error(f"Error activating SimplePluginSystem plugin: {e}")
import traceback
logger.error(traceback.format_exc())
def do_deactivate(self):
"""Required method for plugins"""
# Remove all registered keybindings
for plugin in self.plugin_list:
self.unregisterShortcut(plugin['function'], plugin['shortcut'])
self.loaded = False
self.plugin_list = []
@plugin.cthulhu_hookimpl
def deactivate(self, plugin=None):
"""Deactivate the plugin."""
# Skip if this deactivation call isn't for us
if plugin is not None and plugin is not self:
return
logger.info("Deactivating SimplePluginSystem plugin")
try:
# Remove all registered keybindings
for plugin in self.plugin_list:
if 'function' in plugin and 'shortcut' in plugin:
self.unregisterShortcut(plugin['function'], plugin['shortcut'])
self.loaded = False
self.plugin_list = []
except Exception as e:
logger.error(f"Error deactivating SimplePluginSystem plugin: {e}")
import traceback
logger.error(traceback.format_exc())
def SetupShortcutAndHandle(self, currPluginSetting):
shortcut = ''

View File

@ -24,30 +24,57 @@
# Original source: https://gitlab.gnome.org/GNOME/orca
import gi, time
gi.require_version('Peas', '1.0')
from gi.repository import GObject
from gi.repository import Peas
import logging
from cthulhu import plugin
class Time(GObject.Object, Peas.Activatable, plugin.Plugin):
logger = logging.getLogger(__name__)
class Time(GObject.Object, plugin.Plugin):
#__gtype_name__ = 'Time'
object = GObject.Property(type=GObject.Object)
def __init__(self):
plugin.Plugin.__init__(self)
def do_activate(self):
API = self.object
self.connectSignal("setup-inputeventhandlers-completed", self.setupCompatBinding)
@plugin.cthulhu_hookimpl
def activate(self, plugin=None):
# Skip if this activation call isn't for us
if plugin is not None and plugin is not self:
return
logger.info("Activating Time plugin")
try:
if self.app:
signal_manager = self.app.getSignalManager()
signal_manager.connectSignal("setup-inputeventhandlers-completed", self.setupCompatBinding, "default")
else:
logger.error("No app reference available")
except Exception as e:
logger.error(f"Error activating Time plugin: {e}")
def setupCompatBinding(self, app):
cmdnames = app.getDynamicApiManager().getAPI('Cmdnames')
inputEventHandlers = app.getDynamicApiManager().getAPI('inputEventHandlers')
inputEventHandlers['presentTimeHandler'] = app.getAPIHelper().createInputEventHandler(self.presentTime, cmdnames.PRESENT_CURRENT_TIME)
def do_deactivate(self):
API = self.object
inputEventHandlers = API.app.getDynamicApiManager().getAPI('inputEventHandlers')
del inputEventHandlers['presentTimeHandler']
def do_update_state(self):
try:
cmdnames = app.getDynamicApiManager().getAPI('Cmdnames')
inputEventHandlers = app.getDynamicApiManager().getAPI('inputEventHandlers')
inputEventHandlers['presentTimeHandler'] = app.getAPIHelper().createInputEventHandler(self.presentTime, cmdnames.PRESENT_CURRENT_TIME)
except Exception as e:
logger.error(f"Error setting up Time plugin: {e}")
@plugin.cthulhu_hookimpl
def deactivate(self, plugin=None):
# Skip if this deactivation call isn't for us
if plugin is not None and plugin is not self:
return
logger.info("Deactivating Time plugin")
try:
if self.app:
inputEventHandlers = self.app.getDynamicApiManager().getAPI('inputEventHandlers')
if 'presentTimeHandler' in inputEventHandlers:
del inputEventHandlers['presentTimeHandler']
except Exception as e:
logger.error(f"Error deactivating Time plugin: {e}")
API = self.object
def presentTime(self, script=None, inputEvent=None):
""" Presents the current time. """

View File

@ -16,12 +16,12 @@ logger = logging.getLogger(__name__)
class HelloWorld(Plugin):
"""Hello World plugin."""
def __init__(self, *args, **kwargs):
"""Initialize the plugin."""
super().__init__(*args, **kwargs)
logger.info("HelloWorld plugin initialized")
@cthulhu_hookimpl
def activate(self, plugin=None):
"""Activate the plugin."""
@ -31,7 +31,7 @@ class HelloWorld(Plugin):
try:
logger.info("Activating Hello World plugin")
# Register our keyboard shortcut
self.registerGestureByString(
self.speakTest,
@ -41,19 +41,19 @@ class HelloWorld(Plugin):
)
except Exception as e:
logger.error(f"Error activating Hello World plugin: {e}")
@cthulhu_hookimpl
def deactivate(self, plugin=None):
"""Deactivate the plugin."""
# Skip if this deactivation call isn't for us
if plugin is not None and plugin is not self:
return
try:
logger.info("Deactivating Hello World plugin")
except Exception as e:
logger.error(f"Error deactivating Hello World plugin: {e}")
def speakTest(self, script=None, inputEvent=None):
"""Speak a test message."""
try:
@ -62,7 +62,7 @@ class HelloWorld(Plugin):
'hello world',
resetStyles=False
)
return True
except Exception as e:
logger.error(f"Error in speakTest: {e}")