diff --git a/src/cthulhu/plugins/SimplePluginSystem/SimplePluginSystem.plugin b/src/cthulhu/plugins/SimplePluginSystem/SimplePluginSystem.plugin index 2fe4d1a..60dde6d 100644 --- a/src/cthulhu/plugins/SimplePluginSystem/SimplePluginSystem.plugin +++ b/src/cthulhu/plugins/SimplePluginSystem/SimplePluginSystem.plugin @@ -7,3 +7,5 @@ Authors=Chrys ;Storm Dragon Copyright=Copyright Â2024 Chrys, Storm Dragon Website=https://git.stormux.org/storm/cthulhu Version=1.0 +Builtin=true + diff --git a/src/cthulhu/plugins/SimplePluginSystem/SimplePluginSystem.py b/src/cthulhu/plugins/SimplePluginSystem/SimplePluginSystem.py index fe03322..b5d5ab0 100644 --- a/src/cthulhu/plugins/SimplePluginSystem/SimplePluginSystem.py +++ b/src/cthulhu/plugins/SimplePluginSystem/SimplePluginSystem.py @@ -1,295 +1,272 @@ -from gi.repository import GObject, Peas, Cthulhu +from cthulhu import plugin + +from gi.repository import GObject, Peas import glob import os import importlib.util import random import string -import threading +import _thread from subprocess import Popen, PIPE -class SimplePluginSystem(GObject.Object, Cthulhu.PluginInterface): +settings = None +keybindings = None +speech = None +braille = None +input_event = None + +def outputMessage( Message): + if (settings.enableSpeech): + speech.speak(Message) + if (settings.enableBraille): + braille.displayMessage(Message) + +class SimplePluginSystem(GObject.Object, Peas.Activatable, plugin.Plugin): __gtype_name__ = 'SimplePluginSystem' + object = GObject.Property(type=GObject.Object) def __init__(self): - super().__init__() + plugin.Plugin.__init__(self) self.plugin_list = [] self.loaded = False - self.my_key_bindings = Cthulhu.keybindings.KeyBindings() - self.plugin_repo = os.path.expanduser('~') + "/.local/share/cthulhu/plugins/" + self.my_key_bindings = None + self.plugin_repo = os.path.expanduser('~') + "/.local/share/cthulhu/simple-plugins-enabled/" def do_activate(self): - """Required method for Cthulhu plugins""" + API = self.object + global settings + global keybindings + global speech + global braille + global input_event + settings = API.app.getDynamicApiManager().getAPI('Settings') + keybindings = API.app.getDynamicApiManager().getAPI('Keybindings') + speech = API.app.getDynamicApiManager().getAPI('Speech') + braille = API.app.getDynamicApiManager().getAPI('Braille') + input_event = API.app.getDynamicApiManager().getAPI('InputEvent') + """Required method for plugins""" + self.my_key_bindings = keybindings.KeyBindings() if not self.loaded: - self._load_plugins() + self.load_plugins() def do_deactivate(self): - """Required method for Cthulhu plugins""" + """Required method for plugins""" # Remove all registered keybindings for plugin in self.plugin_list: if plugin.get('inputeventhandler'): self.my_key_bindings.remove(plugin['inputeventhandler']) - Cthulhu.settings.keyBindingsMap["default"] = Cthulhu.keybindings.KeyBindings() + settings.keyBindingsMap["default"] = keybindings.KeyBindings() self.loaded = False self.plugin_list = [] - def output_message(self, message): - """Output message through speech and/or braille""" - if Cthulhu.settings.enableSpeech: - Cthulhu.speech.speak(message) - if Cthulhu.settings.enableBraille: - Cthulhu.braille.displayMessage(message) + def SetupShortcutAndHandle(self, currPluginSetting): + currPluginSetting['inputeventhandler'] = input_event.InputEventHandler(currPluginSetting['function'], currPluginSetting['pluginname']) + # just the modifier + if not currPluginSetting['shiftkey'] and not currPluginSetting['ctrlkey'] and not currPluginSetting['altkey']: + self.my_key_bindings.add(keybindings.KeyBinding(currPluginSetting['key'], keybindings.defaultModifierMask, keybindings.CTHULHU_MODIFIER_MASK, currPluginSetting['inputeventhandler'])) + # + alt + if not currPluginSetting['shiftkey'] and not currPluginSetting['ctrlkey'] and currPluginSetting['altkey']: + self.my_key_bindings.add(keybindings.KeyBinding(currPluginSetting['key'], keybindings.defaultModifierMask, keybindings.CTHULHU_ALT_MODIFIER_MASK, currPluginSetting['inputeventhandler'])) + # + CTRL + if not currPluginSetting['shiftkey'] and currPluginSetting['ctrlkey'] and not currPluginSetting['altkey']: + self.my_key_bindings.add(keybindings.KeyBinding(currPluginSetting['key'], keybindings.defaultModifierMask, keybindings.CTHULHU_CTRL_MODIFIER_MASK, currPluginSetting['inputeventhandler'])) + # + alt + CTRL + if not currPluginSetting['shiftkey'] and currPluginSetting['ctrlkey'] and currPluginSetting['altkey']: + self.my_key_bindings.add(keybindings.KeyBinding(currPluginSetting['key'], keybindings.defaultModifierMask, keybindings.CTHULHU_CTRL_ALT_MODIFIER_MASK, currPluginSetting['inputeventhandler'])) + # + shift + if currPluginSetting['shiftkey'] and not currPluginSetting['ctrlkey'] and not currPluginSetting['altkey']: + self.my_key_bindings.add(keybindings.KeyBinding(currPluginSetting['key'], keybindings.defaultModifierMask, keybindings.CTHULHU_SHIFT_MODIFIER_MASK, currPluginSetting['inputeventhandler'])) + # alt + shift + if currPluginSetting['shiftkey'] and not currPluginSetting['ctrlkey'] and currPluginSetting['altkey']: + self.my_key_bindings.add(keybindings.KeyBinding(currPluginSetting['key'], keybindings.defaultModifierMask, keybindings.SHIFT_ALT_MODIFIER_MASK, currPluginSetting['inputeventhandler'])) + print(self.my_key_bindings, currPluginSetting['function']) - def setup_shortcut_and_handle(self, settings): - """Setup keyboard shortcuts for plugins""" - settings['inputeventhandler'] = Cthulhu.input_event.InputEventHandler( - settings['function'], settings['pluginname']) - - # Create key binding based on modifier combinations - modifiers = Cthulhu.keybindings.defaultModifierMask - if settings['shiftkey'] and not settings['ctrlkey'] and not settings['altkey']: - mask = Cthulhu.keybindings.CTHULHU_SHIFT_MODIFIER_MASK - elif not settings['shiftkey'] and settings['ctrlkey'] and not settings['altkey']: - mask = Cthulhu.keybindings.CTHULHU_CTRL_MODIFIER_MASK - elif not settings['shiftkey'] and not settings['ctrlkey'] and settings['altkey']: - mask = Cthulhu.keybindings.CTHULHU_ALT_MODIFIER_MASK - elif not settings['shiftkey'] and settings['ctrlkey'] and settings['altkey']: - mask = Cthulhu.keybindings.CTHULHU_CTRL_ALT_MODIFIER_MASK - elif settings['shiftkey'] and not settings['ctrlkey'] and settings['altkey']: - mask = Cthulhu.keybindings.SHIFT_ALT_MODIFIER_MASK - else: - mask = Cthulhu.keybindings.CTHULHU_MODIFIER_MASK - - self.my_key_bindings.add( - Cthulhu.keybindings.KeyBinding( - settings['key'], modifiers, mask, settings['inputeventhandler'] - ) - ) - Cthulhu.settings.keyBindingsMap["default"] = self.my_key_bindings + settings.keyBindingsMap["default"] = self.my_key_bindings + return currPluginSetting def id_generator(self, size=7, chars=string.ascii_letters): - """Generate random identifier""" return ''.join(random.choice(chars) for _ in range(size)) - def init_settings(self): - """Initialize default settings dictionary""" - return { - 'filepath': '', - 'pluginname': '', - 'functionname': '', - 'key': '', - 'shiftkey': False, - 'ctrlkey': False, - 'altkey': False, - 'startnotify': False, - 'stopnotify': False, - 'blockcall': False, - 'error': False, - 'exec': False, - 'executeable': False, - 'parameters': '', - 'function': None, - 'inputeventhandler': None, - 'valid': False, - 'supressoutput': False + def initSettings(self): + currPluginSetting={ + 'filepath':'', + 'pluginname':'', + 'functionname':'', + 'key':'', + 'shiftkey':False, + 'ctrlkey':False, + 'altkey':False, + 'startnotify':False, + 'stopnotify':False, + 'blockcall':False, + 'error':False, + 'exec': False, + 'parameters':'', + 'function':None, + 'inputeventhandler':None, + 'valid':False, + 'supressoutput':False } + return currPluginSetting - def get_plugin_settings(self, filepath): - """Parse plugin settings from filename and content""" - settings = self.init_settings() + def getPluginSettings(self, filepath, currPluginSetting): try: - settings['file'] = filepath + currPluginSetting['file'] = filepath fileName, fileExtension = os.path.splitext(filepath) - if fileExtension and fileExtension != '': - settings['loadable'] = (fileExtension.lower() == '.py') - - filename = os.path.basename(filepath) - filename = os.path.splitext(filename)[0] - - # Parse filename components - name_parts = filename.split('__-__') - if len(name_parts) == 2: - settings['pluginname'] = name_parts[0] - else: - settings['pluginname'] = 'NoNameAvailable' - - # Parse settings from filename - setting_parts = name_parts[-1].split('__+__') - - # Extract parameters and key - for part in setting_parts: - if part.lower().startswith('parameters_'): - settings['parameters'] = part[11:] - elif part.lower().startswith('key_'): - settings['key'] = part[4:] - - if not settings['key']: - settings['key'] = setting_parts[-1].lower() - - # Parse boolean flags - lower_parts = list(map(str.lower, setting_parts)) - settings.update({ - 'shiftkey': 'shift' in lower_parts, - 'ctrlkey': 'control' in lower_parts, - 'altkey': 'alt' in lower_parts, - 'startnotify': 'startnotify' in lower_parts, - 'stopnotify': 'stopnotify' in lower_parts, - 'blockcall': 'blockcall' in lower_parts, - 'error': 'error' in lower_parts, - 'supressoutput': 'supressoutput' in lower_parts, - 'exec': 'exec' in lower_parts, - 'loadmodule': 'loadmodule' in lower_parts - }) - - settings = self._read_settings_from_plugin(settings) - - # Validate settings - if not settings['loadmodule'] and not os.access(filepath, os.X_OK): - return self.init_settings() - if settings['loadmodule'] and not settings['loadable']: - return self.init_settings() - if len(settings['key']) < 1 and not settings['exec']: - return self.init_settings() - - settings['valid'] = True - return settings - - except Exception: - return self.init_settings() + if (fileExtension and (fileExtension != '')): #if there is an extension + currPluginSetting['loadable'] = (fileExtension.lower() == '.py') # only python is loadable + filename = os.path.basename(filepath) #filename + filename = os.path.splitext(filename)[0] #remove extension if we have one + #remove pluginname seperated by __-__ + filenamehelper = filename.split('__-__') + filename = filenamehelper[len(filenamehelper) - 1 ] + currPluginSetting['permission'] = os.access(filepath, os.X_OK ) + currPluginSetting['pluginname'] = 'NoNameAvailable' + if len(filenamehelper) == 2: + currPluginSetting['pluginname'] = filenamehelper[0] + #now get shortcuts seperated by __+__ + filenamehelper = filename.split('__+__') + if len([y for y in filenamehelper if 'parameters_' in y.lower()]) == 1 and\ + len([y for y in filenamehelper if 'parameters_' in y.lower()][0]) > 11: + currPluginSetting['parameters'] = [y for y in filenamehelper if 'parameters_' in y.lower()][0][11:] + if len([y for y in filenamehelper if 'key_' in y.lower()]) == 1 and\ + len([y for y in filenamehelper if 'key_' in y.lower()][0]) > 4 : + currPluginSetting['key'] = [y for y in filenamehelper if 'key_' in y.lower()][0][4] + if currPluginSetting['key'] == '': + settcurrPluginSetting = 'shift' in map(str.lower, filenamehelper) + currPluginSetting['ctrlkey'] = 'control' in map(str.lower, filenamehelper) + currPluginSetting['altkey'] = 'alt' in map(str.lower, filenamehelper) + currPluginSetting['startnotify'] = 'startnotify' in map(str.lower, filenamehelper) + currPluginSetting['stopnotify'] = 'stopnotify' in map(str.lower, filenamehelper) + currPluginSetting['blockcall'] = 'blockcall' in map(str.lower, filenamehelper) + currPluginSetting['error'] = 'error' in map(str.lower, filenamehelper) + currPluginSetting['supressoutput'] = 'supressoutput' in map(str.lower, filenamehelper) + currPluginSetting['exec'] = 'exec' in map(str.lower, filenamehelper) + currPluginSetting['loadmodule'] = 'loadmodule' in map(str.lower, filenamehelper) + currPluginSetting = self.readSettingsFromPlugin(currPluginSetting) + if not currPluginSetting['loadmodule']: + if not currPluginSetting['permission']: #subprocessing only works with exec permission + return self.initSettings() + if currPluginSetting['loadmodule'] and not currPluginSetting['loadable']: #sorry.. its not loadable only .py is loadable + return self.initSettings() + if (len(currPluginSetting['key']) > 1): #no shortcut + if not currPluginSetting['exec']: # and no exec -> the plugin make no sense because it isnt hooked anywhere + return self.initSettings() #so not load it (sets valid = False) + else: + currPluginSetting['key'] = '' #there is a strange key, but exec? ignore the key.. + currPluginSetting['valid'] = True # we could load everything + return currPluginSetting + except: + return self.initSettings() - def _read_settings_from_plugin(self, settings): - """Read additional settings from plugin file content""" - if not os.access(settings['file'], os.R_OK): - return settings - - _, ext = os.path.splitext(settings['file']) - if ext and ext.lower() not in ['.py', '.sh']: - return settings - - try: - with open(settings['file'], "r") as plugin_file: - for line in plugin_file: - line = line.lower().replace(" ", "") - if 'sopsproperty:' in line: - prop = line.split('sopsproperty:')[1].strip() - if prop in settings: - settings[prop] = True - except Exception: - pass - - return settings + def readSettingsFromPlugin(self, currPluginSetting): + if not os.access(currPluginSetting['file'], os.R_OK ): + return currPluginSetting + fileName, fileExtension = os.path.splitext(currPluginSetting['file']) + if (fileExtension and (fileExtension != '')): #if there is an extension + if (fileExtension.lower() != '.py') and \ + (fileExtension.lower() != '.sh'): + return currPluginSetting + else: + return currPluginSetting - def _build_plugin_subprocess(self, settings): - """Build function body for subprocess-based plugins""" - curr_plugin = f"'\"{settings['file']}\" {settings['parameters']}'" - plugin_name = f"blocking {settings['pluginname']}" if settings['blockcall'] else settings['pluginname'] - - func_body = [ - f"def {settings['functionname']}(script=None, inputEvent=None):" - ] - - if settings['startnotify']: - func_body.append(f" self.output_message('start {plugin_name}')") - - func_body.extend([ - f" p = Popen({curr_plugin}, stdout=PIPE, stderr=PIPE, shell=True)", - " stdout, stderr = p.communicate()", - " message = ''", - f" if not {settings['supressoutput']} and stdout:", - ' message += str(stdout, "utf-8")', - f" if {settings['error']} and stderr:", - ' message += \' error: \' + str(stderr, "utf-8")', - " self.output_message(message)" - ]) - - if settings['stopnotify']: - func_body.append(f" self.output_message('finish {plugin_name}')") - - func_body.append(" return True\n") - - # Add threaded version - func_body.extend([ - f"def {settings['functionname']}T(script=None, inputEvent=None):", - f" threading.Thread(target={settings['functionname']}, args=(script, inputEvent)).start()" - ]) - - return "\n".join(func_body) + with open(currPluginSetting['file'], "r") as pluginFile: + for line in pluginFile: + currPluginSetting['shiftkey'] = ('sopsproperty:shift' in line.lower().replace(" ", "")) or currPluginSetting['shiftkey'] + currPluginSetting['ctrlkey'] = ('sopsproperty:control' in line.lower().replace(" ", "")) or currPluginSetting['ctrlkey'] + currPluginSetting['altkey'] = ('sopsproperty:alt' in line.lower().replace(" ", "")) or currPluginSetting['altkey'] + currPluginSetting['startnotify'] = ('sopsproperty:startnotify' in line.lower().replace(" ", "")) or currPluginSetting['startnotify'] + currPluginSetting['stopnotify'] = ('sopsproperty:stopnotify' in line.lower().replace(" ", "")) or currPluginSetting['stopnotify'] + currPluginSetting['blockcall'] = ('sopsproperty:blockcall' in line.lower().replace(" ", "")) or currPluginSetting['blockcall'] + currPluginSetting['error'] = ('sopsproperty:error' in line.lower().replace(" ", "")) or currPluginSetting['error'] + currPluginSetting['supressoutput'] = ('sopsproperty:supressoutput' in line.lower().replace(" ", "")) or currPluginSetting['supressoutput'] + currPluginSetting['exec'] = ('sopsproperty:exec' in line.lower().replace(" ", "")) or currPluginSetting['exec'] + currPluginSetting['loadmodule'] = ('sopsproperty:loadmodule' in line.lower().replace(" ", "")) or currPluginSetting['loadmodule'] + return currPluginSetting - def _build_plugin_exec(self, settings): - """Build function body for Python module-based plugins""" - plugin_name = f"blocking {settings['pluginname']}" if settings['blockcall'] else settings['pluginname'] - - func_body = [ - f"def {settings['functionname']}(script=None, inputEvent=None):" - ] - - if settings['startnotify']: - func_body.append(f" self.output_message('start {plugin_name}')") - - func_body.extend([ - " try:", - f' spec = importlib.util.spec_from_file_location("{settings["functionname"]}", "{settings["file"]}")', - f" {settings['functionname']}Module = importlib.util.module_from_spec(spec)", - f" spec.loader.exec_module({settings['functionname']}Module)", - " except:", - " pass" - ]) - - if settings['error']: - func_body.append(f' self.output_message("Error while executing {plugin_name}")') - - if settings['stopnotify']: - func_body.append(f" self.output_message('finish {plugin_name}')") - - func_body.append(" return True\n") - - # Add threaded version - func_body.extend([ - f"def {settings['functionname']}T(script=None, inputEvent=None):", - f" threading.Thread(target={settings['functionname']}, args=(script, inputEvent)).start()" - ]) - - return "\n".join(func_body) + def buildPluginSubprocess(self, currPluginSetting): + currplugin = "\'\"" + currPluginSetting['file'] + "\" " + currPluginSetting['parameters'] + "\'" + pluginname = currPluginSetting['pluginname'] + if currPluginSetting['blockcall']: + pluginname = "blocking " + pluginname + fun_body = "global " + currPluginSetting['functionname']+"\n" + fun_body += "def " + currPluginSetting['functionname'] + "(script=None, inputEvent=None):\n" + if currPluginSetting['startnotify']: + fun_body +=" outputMessage('start " + pluginname + "')\n" + fun_body +=" p = Popen(" + currplugin + ", stdout=PIPE, stderr=PIPE, shell=True)\n" + fun_body +=" stdout, stderr = p.communicate()\n" + fun_body +=" message = ''\n" + fun_body +=" if not " + str(currPluginSetting['supressoutput']) + " and stdout:\n" + fun_body +=" message += str(stdout, \"utf-8\")\n" + fun_body +=" if " + str(currPluginSetting['error']) + " and stderr:\n" + fun_body +=" message += ' error: ' + str(stderr, \"utf-8\")\n" + fun_body +=" outputMessage( message)\n" + if currPluginSetting['stopnotify']: + fun_body +=" outputMessage('finish " + pluginname + "')\n" + fun_body +=" return True\n\n" + fun_body += "global " + currPluginSetting['functionname']+"T\n" + fun_body +="def " + currPluginSetting['functionname'] + "T(script=None, inputEvent=None):\n" + fun_body +=" _thread.start_new_thread("+ currPluginSetting['functionname'] + ",(script, inputEvent))\n\n" + return fun_body - def _get_function_name(self, settings): - """Generate unique function name for plugin""" - fn = "_" + settings['pluginname'].replace("-", "_") - settings['functionname'] = fn - while (settings['functionname'] == fn or - settings['functionname'] + 'T' in globals() or - settings['functionname'] in globals()): - settings['functionname'] = self.id_generator() + fn - return settings + def buildPluginExec(self, currPluginSetting): + pluginname = currPluginSetting['pluginname'] + if currPluginSetting['blockcall']: + pluginname = "blocking " + pluginname + fun_body = "global " + currPluginSetting['functionname']+"\n" + fun_body += "def " + currPluginSetting['functionname'] + "(script=None, inputEvent=None):\n" + if currPluginSetting['startnotify']: + fun_body +=" outputMessage('start " + pluginname + "')\n" + fun_body += " try:\n" + fun_body += " spec = importlib.util.spec_from_file_location(\"" + currPluginSetting['functionname'] + "\",\""+ currPluginSetting['file']+"\")\n" + fun_body += " "+currPluginSetting['functionname'] + "Module = importlib.util.module_from_spec(spec)\n" + fun_body += " spec.loader.exec_module(" + currPluginSetting['functionname'] + "Module)\n" + fun_body += " except:\n" + fun_body += " pass\n" + if currPluginSetting['error']: + fun_body += " outputMessage(\"Error while executing " + pluginname + "\")\n" + if currPluginSetting['stopnotify']: + fun_body +=" outputMessage('finish " + pluginname + "')\n" + fun_body += " return True\n\n" + fun_body += "global " + currPluginSetting['functionname']+"T\n" + fun_body +="def " + currPluginSetting['functionname'] + "T(script=None, inputEvent=None):\n" + fun_body +=" _thread.start_new_thread("+ currPluginSetting['functionname'] + ",(script, inputEvent))\n\n" + return fun_body - def _load_plugins(self): - """Load all plugins from the plugin directory""" - plugin_list = glob.glob(self.plugin_repo + '*') - for curr_plugin in plugin_list: - settings = self.get_plugin_settings(curr_plugin) - if not settings['valid']: - continue + def getFunctionName(self, currPluginSetting): + currPluginSetting['functionname'] = '' + while currPluginSetting['functionname'] == '' or currPluginSetting['functionname'] + 'T' in globals() or currPluginSetting['functionname'] in globals(): + currPluginSetting['functionname'] = self.id_generator() + return currPluginSetting + + def load_plugins(self): + if not self.loaded: + self.plugin_list = glob.glob(self.plugin_repo+'*') + for currplugin in self.plugin_list: + currPluginSetting = self.initSettings() + currPluginSetting = self.getPluginSettings(currplugin, currPluginSetting) + + if not currPluginSetting['valid']: + continue + + currPluginSetting = self.getFunctionName(currPluginSetting) - settings = self._get_function_name(settings) - - # Build and execute function definition - if settings['loadmodule']: - exec(self._build_plugin_exec(settings)) - else: - exec(self._build_plugin_subprocess(settings)) + if currPluginSetting['loadmodule']: + exec(self.buildPluginExec(currPluginSetting)) # load as python module + else: + exec(self.buildPluginSubprocess(currPluginSetting)) # run as subprocess + + if currPluginSetting['blockcall']: + currPluginSetting['function'] = globals()[currPluginSetting['functionname']] # non threaded + else: + currPluginSetting['function'] = globals()[currPluginSetting['functionname']+"T"] # T = Threaded + - # Set up function reference - if settings['blockcall']: - settings['function'] = globals()[settings['functionname']] - else: - settings['function'] = globals()[settings['functionname'] + "T"] - - # Execute if needed - if settings['exec']: - settings['function']() - - # Set up keybinding if needed - if settings['key']: - self.setup_shortcut_and_handle(settings) - - self.plugin_list.append(settings) - - self.loaded = True + if currPluginSetting['exec']: # exec on load if we want + currPluginSetting['function']() + + if not currPluginSetting['key'] == '': + currPluginSetting = self.SetupShortcutAndHandle(currPluginSetting) + print(currPluginSetting) + self.plugin_list.append(currPluginSetting) # store in a list + self.loaded = True