diff --git a/src/cthulhu/plugins/SimplePluginSystem/Makefile.am b/src/cthulhu/plugins/SimplePluginSystem/Makefile.am new file mode 100644 index 0000000..6bd1d0a --- /dev/null +++ b/src/cthulhu/plugins/SimplePluginSystem/Makefile.am @@ -0,0 +1,7 @@ +cthulhu_python_PYTHON = \ + __init__.py \ + SimplePluginSystem.plugin \ + SimplePluginSystem.py + +cthulhu_pythondir=$(pkgpythondir)/plugins/SimplePluginSystem + diff --git a/src/cthulhu/plugins/SimplePluginSystem/SimplePluginSystem.plugin b/src/cthulhu/plugins/SimplePluginSystem/SimplePluginSystem.plugin new file mode 100644 index 0000000..48eb592 --- /dev/null +++ b/src/cthulhu/plugins/SimplePluginSystem/SimplePluginSystem.plugin @@ -0,0 +1,9 @@ +[Plugin] +Module=SimplePluginSystem +Loader=python3 +Name=Simple Plugin System +Description=Simple plugin system implementation for Cthulhu +Authors=Chrys +Copyright=Copyright Â2024 Chrys, Storm Dragon +Website=https://git.stormux.org/storm/cthulhu +Version=1.0 diff --git a/src/cthulhu/plugins/SimplePluginSystem/SimplePluginSystem.py b/src/cthulhu/plugins/SimplePluginSystem/SimplePluginSystem.py new file mode 100644 index 0000000..0fe5c9d --- /dev/null +++ b/src/cthulhu/plugins/SimplePluginSystem/SimplePluginSystem.py @@ -0,0 +1,295 @@ +from gi.repository import GObject, Peas, Cthulhu +import glob +import os +import importlib.util +import random +import string +import threading +from subprocess import Popen, PIPE + +class SimplePluginSystem(GObject.Object, Cthulhu.PluginInterface): + __gtype_name__ = 'SimplePluginSystem' + + def __init__(self): + super().__init__() + self.plugin_list = [] + self.loaded = False + self.my_key_bindings = Cthulhu.keybindings.KeyBindings() + self.plugin_repo = os.path.expanduser('~') + "/.local/share/cthulhu/plugins/" + + def do_activate(self): + """Required method for Cthulhu plugins""" + if not self.loaded: + self._load_plugins() + + def do_deactivate(self): + """Required method for Cthulhu plugins""" + # Remove all registered keybindings + for plugin in self.plugin_list: + if plugin.get('inputeventhandler'): + self.my_key_bindings.remove(plugin['inputeventhandler']) + Cthulhu.settings.keyBindingsMap["default"] = Cthulhu.keybindings.KeyBindings() + self.loaded = False + self.plugin_list = [] + + def output_message(self, message): + """Output message through speech and/or braille""" + if Cthulhu.settings.enableSpeech: + Cthulhu.speech.speak(message) + if Cthulhu.settings.enableBraille: + Cthulhu.braille.displayMessage(message) + + def setup_shortcut_and_handle(self, settings): + """Setup keyboard shortcuts for plugins""" + settings['inputeventhandler'] = Cthulhu.input_event.InputEventHandler( + settings['function'], settings['pluginname']) + + # Create key binding based on modifier combinations + modifiers = Cthulhu.keybindings.defaultModifierMask + if settings['shiftkey'] and not settings['ctrlkey'] and not settings['altkey']: + mask = Cthulhu.keybindings.ORCA_SHIFT_MODIFIER_MASK + elif not settings['shiftkey'] and settings['ctrlkey'] and not settings['altkey']: + mask = Cthulhu.keybindings.ORCA_CTRL_MODIFIER_MASK + elif not settings['shiftkey'] and not settings['ctrlkey'] and settings['altkey']: + mask = Cthulhu.keybindings.ORCA_ALT_MODIFIER_MASK + elif not settings['shiftkey'] and settings['ctrlkey'] and settings['altkey']: + mask = Cthulhu.keybindings.ORCA_CTRL_ALT_MODIFIER_MASK + elif settings['shiftkey'] and not settings['ctrlkey'] and settings['altkey']: + mask = Cthulhu.keybindings.SHIFT_ALT_MODIFIER_MASK + else: + mask = Cthulhu.keybindings.ORCA_MODIFIER_MASK + + self.my_key_bindings.add( + Cthulhu.keybindings.KeyBinding( + settings['key'], modifiers, mask, settings['inputeventhandler'] + ) + ) + Cthulhu.settings.keyBindingsMap["default"] = self.my_key_bindings + + def id_generator(self, size=7, chars=string.ascii_letters): + """Generate random identifier""" + return ''.join(random.choice(chars) for _ in range(size)) + + def init_settings(self): + """Initialize default settings dictionary""" + return { + 'filepath': '', + 'pluginname': '', + 'functionname': '', + 'key': '', + 'shiftkey': False, + 'ctrlkey': False, + 'altkey': False, + 'startnotify': False, + 'stopnotify': False, + 'blockcall': False, + 'error': False, + 'exec': False, + 'executeable': False, + 'parameters': '', + 'function': None, + 'inputeventhandler': None, + 'valid': False, + 'supressoutput': False + } + + def get_plugin_settings(self, filepath): + """Parse plugin settings from filename and content""" + settings = self.init_settings() + try: + settings['file'] = filepath + fileName, fileExtension = os.path.splitext(filepath) + if fileExtension and fileExtension != '': + settings['loadable'] = (fileExtension.lower() == '.py') + + filename = os.path.basename(filepath) + filename = os.path.splitext(filename)[0] + + # Parse filename components + name_parts = filename.split('__-__') + if len(name_parts) == 2: + settings['pluginname'] = name_parts[0] + else: + settings['pluginname'] = 'NoNameAvailable' + + # Parse settings from filename + setting_parts = name_parts[-1].split('__+__') + + # Extract parameters and key + for part in setting_parts: + if part.lower().startswith('parameters_'): + settings['parameters'] = part[11:] + elif part.lower().startswith('key_'): + settings['key'] = part[4:] + + if not settings['key']: + settings['key'] = setting_parts[-1].lower() + + # Parse boolean flags + lower_parts = list(map(str.lower, setting_parts)) + settings.update({ + 'shiftkey': 'shift' in lower_parts, + 'ctrlkey': 'control' in lower_parts, + 'altkey': 'alt' in lower_parts, + 'startnotify': 'startnotify' in lower_parts, + 'stopnotify': 'stopnotify' in lower_parts, + 'blockcall': 'blockcall' in lower_parts, + 'error': 'error' in lower_parts, + 'supressoutput': 'supressoutput' in lower_parts, + 'exec': 'exec' in lower_parts, + 'loadmodule': 'loadmodule' in lower_parts + }) + + settings = self._read_settings_from_plugin(settings) + + # Validate settings + if not settings['loadmodule'] and not os.access(filepath, os.X_OK): + return self.init_settings() + if settings['loadmodule'] and not settings['loadable']: + return self.init_settings() + if len(settings['key']) < 1 and not settings['exec']: + return self.init_settings() + + settings['valid'] = True + return settings + + except Exception: + return self.init_settings() + + def _read_settings_from_plugin(self, settings): + """Read additional settings from plugin file content""" + if not os.access(settings['file'], os.R_OK): + return settings + + _, ext = os.path.splitext(settings['file']) + if ext and ext.lower() not in ['.py', '.sh']: + return settings + + try: + with open(settings['file'], "r") as plugin_file: + for line in plugin_file: + line = line.lower().replace(" ", "") + if 'sopsproperty:' in line: + prop = line.split('sopsproperty:')[1].strip() + if prop in settings: + settings[prop] = True + except Exception: + pass + + return settings + + def _build_plugin_subprocess(self, settings): + """Build function body for subprocess-based plugins""" + curr_plugin = f"'\"{settings['file']}\" {settings['parameters']}'" + plugin_name = f"blocking {settings['pluginname']}" if settings['blockcall'] else settings['pluginname'] + + func_body = [ + f"def {settings['functionname']}(script=None, inputEvent=None):" + ] + + if settings['startnotify']: + func_body.append(f" self.output_message('start {plugin_name}')") + + func_body.extend([ + f" p = Popen({curr_plugin}, stdout=PIPE, stderr=PIPE, shell=True)", + " stdout, stderr = p.communicate()", + " message = ''", + f" if not {settings['supressoutput']} and stdout:", + ' message += str(stdout, "utf-8")', + f" if {settings['error']} and stderr:", + ' message += \' error: \' + str(stderr, "utf-8")', + " self.output_message(message)" + ]) + + if settings['stopnotify']: + func_body.append(f" self.output_message('finish {plugin_name}')") + + func_body.append(" return True\n") + + # Add threaded version + func_body.extend([ + f"def {settings['functionname']}T(script=None, inputEvent=None):", + f" threading.Thread(target={settings['functionname']}, args=(script, inputEvent)).start()" + ]) + + return "\n".join(func_body) + + def _build_plugin_exec(self, settings): + """Build function body for Python module-based plugins""" + plugin_name = f"blocking {settings['pluginname']}" if settings['blockcall'] else settings['pluginname'] + + func_body = [ + f"def {settings['functionname']}(script=None, inputEvent=None):" + ] + + if settings['startnotify']: + func_body.append(f" self.output_message('start {plugin_name}')") + + func_body.extend([ + " try:", + f' spec = importlib.util.spec_from_file_location("{settings["functionname"]}", "{settings["file"]}")', + f" {settings['functionname']}Module = importlib.util.module_from_spec(spec)", + f" spec.loader.exec_module({settings['functionname']}Module)", + " except:", + " pass" + ]) + + if settings['error']: + func_body.append(f' self.output_message("Error while executing {plugin_name}")') + + if settings['stopnotify']: + func_body.append(f" self.output_message('finish {plugin_name}')") + + func_body.append(" return True\n") + + # Add threaded version + func_body.extend([ + f"def {settings['functionname']}T(script=None, inputEvent=None):", + f" threading.Thread(target={settings['functionname']}, args=(script, inputEvent)).start()" + ]) + + return "\n".join(func_body) + + def _get_function_name(self, settings): + """Generate unique function name for plugin""" + fn = "_" + settings['pluginname'].replace("-", "_") + settings['functionname'] = fn + while (settings['functionname'] == fn or + settings['functionname'] + 'T' in globals() or + settings['functionname'] in globals()): + settings['functionname'] = self.id_generator() + fn + return settings + + def _load_plugins(self): + """Load all plugins from the plugin directory""" + plugin_list = glob.glob(self.plugin_repo + '*') + for curr_plugin in plugin_list: + settings = self.get_plugin_settings(curr_plugin) + if not settings['valid']: + continue + + settings = self._get_function_name(settings) + + # Build and execute function definition + if settings['loadmodule']: + exec(self._build_plugin_exec(settings)) + else: + exec(self._build_plugin_subprocess(settings)) + + # Set up function reference + if settings['blockcall']: + settings['function'] = globals()[settings['functionname']] + else: + settings['function'] = globals()[settings['functionname'] + "T"] + + # Execute if needed + if settings['exec']: + settings['function']() + + # Set up keybinding if needed + if settings['key']: + self.setup_shortcut_and_handle(settings) + + self.plugin_list.append(settings) + + self.loaded = True diff --git a/src/cthulhu/plugins/SimplePluginSystem/__init__.py b/src/cthulhu/plugins/SimplePluginSystem/__init__.py new file mode 100644 index 0000000..e69de29