fix all syntax issues in SOPS

This commit is contained in:
Chrys 2024-10-24 00:08:25 +02:00
parent 087dcc3ab2
commit 0abe30791d
2 changed files with 234 additions and 255 deletions

View File

@ -7,3 +7,5 @@ Authors=Chrys <chrys@linux-a11y.org>;Storm Dragon <storm_dragon@stormux.org>
Copyright=Copyright Â2024 Chrys, Storm Dragon Copyright=Copyright Â2024 Chrys, Storm Dragon
Website=https://git.stormux.org/storm/cthulhu Website=https://git.stormux.org/storm/cthulhu
Version=1.0 Version=1.0
Builtin=true

View File

@ -1,295 +1,272 @@
from gi.repository import GObject, Peas, Cthulhu from cthulhu import plugin
from gi.repository import GObject, Peas
import glob import glob
import os import os
import importlib.util import importlib.util
import random import random
import string import string
import threading import _thread
from subprocess import Popen, PIPE from subprocess import Popen, PIPE
class SimplePluginSystem(GObject.Object, Cthulhu.PluginInterface): settings = None
keybindings = None
speech = None
braille = None
input_event = None
def outputMessage( Message):
if (settings.enableSpeech):
speech.speak(Message)
if (settings.enableBraille):
braille.displayMessage(Message)
class SimplePluginSystem(GObject.Object, Peas.Activatable, plugin.Plugin):
__gtype_name__ = 'SimplePluginSystem' __gtype_name__ = 'SimplePluginSystem'
object = GObject.Property(type=GObject.Object)
def __init__(self): def __init__(self):
super().__init__() plugin.Plugin.__init__(self)
self.plugin_list = [] self.plugin_list = []
self.loaded = False self.loaded = False
self.my_key_bindings = Cthulhu.keybindings.KeyBindings() self.my_key_bindings = None
self.plugin_repo = os.path.expanduser('~') + "/.local/share/cthulhu/plugins/" self.plugin_repo = os.path.expanduser('~') + "/.local/share/cthulhu/simple-plugins-enabled/"
def do_activate(self): def do_activate(self):
"""Required method for Cthulhu plugins""" API = self.object
global settings
global keybindings
global speech
global braille
global input_event
settings = API.app.getDynamicApiManager().getAPI('Settings')
keybindings = API.app.getDynamicApiManager().getAPI('Keybindings')
speech = API.app.getDynamicApiManager().getAPI('Speech')
braille = API.app.getDynamicApiManager().getAPI('Braille')
input_event = API.app.getDynamicApiManager().getAPI('InputEvent')
"""Required method for plugins"""
self.my_key_bindings = keybindings.KeyBindings()
if not self.loaded: if not self.loaded:
self._load_plugins() self.load_plugins()
def do_deactivate(self): def do_deactivate(self):
"""Required method for Cthulhu plugins""" """Required method for plugins"""
# Remove all registered keybindings # Remove all registered keybindings
for plugin in self.plugin_list: for plugin in self.plugin_list:
if plugin.get('inputeventhandler'): if plugin.get('inputeventhandler'):
self.my_key_bindings.remove(plugin['inputeventhandler']) self.my_key_bindings.remove(plugin['inputeventhandler'])
Cthulhu.settings.keyBindingsMap["default"] = Cthulhu.keybindings.KeyBindings() settings.keyBindingsMap["default"] = keybindings.KeyBindings()
self.loaded = False self.loaded = False
self.plugin_list = [] self.plugin_list = []
def output_message(self, message): def SetupShortcutAndHandle(self, currPluginSetting):
"""Output message through speech and/or braille""" currPluginSetting['inputeventhandler'] = input_event.InputEventHandler(currPluginSetting['function'], currPluginSetting['pluginname'])
if Cthulhu.settings.enableSpeech: # just the modifier
Cthulhu.speech.speak(message) if not currPluginSetting['shiftkey'] and not currPluginSetting['ctrlkey'] and not currPluginSetting['altkey']:
if Cthulhu.settings.enableBraille: self.my_key_bindings.add(keybindings.KeyBinding(currPluginSetting['key'], keybindings.defaultModifierMask, keybindings.CTHULHU_MODIFIER_MASK, currPluginSetting['inputeventhandler']))
Cthulhu.braille.displayMessage(message) # + alt
if not currPluginSetting['shiftkey'] and not currPluginSetting['ctrlkey'] and currPluginSetting['altkey']:
self.my_key_bindings.add(keybindings.KeyBinding(currPluginSetting['key'], keybindings.defaultModifierMask, keybindings.CTHULHU_ALT_MODIFIER_MASK, currPluginSetting['inputeventhandler']))
# + CTRL
if not currPluginSetting['shiftkey'] and currPluginSetting['ctrlkey'] and not currPluginSetting['altkey']:
self.my_key_bindings.add(keybindings.KeyBinding(currPluginSetting['key'], keybindings.defaultModifierMask, keybindings.CTHULHU_CTRL_MODIFIER_MASK, currPluginSetting['inputeventhandler']))
# + alt + CTRL
if not currPluginSetting['shiftkey'] and currPluginSetting['ctrlkey'] and currPluginSetting['altkey']:
self.my_key_bindings.add(keybindings.KeyBinding(currPluginSetting['key'], keybindings.defaultModifierMask, keybindings.CTHULHU_CTRL_ALT_MODIFIER_MASK, currPluginSetting['inputeventhandler']))
# + shift
if currPluginSetting['shiftkey'] and not currPluginSetting['ctrlkey'] and not currPluginSetting['altkey']:
self.my_key_bindings.add(keybindings.KeyBinding(currPluginSetting['key'], keybindings.defaultModifierMask, keybindings.CTHULHU_SHIFT_MODIFIER_MASK, currPluginSetting['inputeventhandler']))
# alt + shift
if currPluginSetting['shiftkey'] and not currPluginSetting['ctrlkey'] and currPluginSetting['altkey']:
self.my_key_bindings.add(keybindings.KeyBinding(currPluginSetting['key'], keybindings.defaultModifierMask, keybindings.SHIFT_ALT_MODIFIER_MASK, currPluginSetting['inputeventhandler']))
print(self.my_key_bindings, currPluginSetting['function'])
def setup_shortcut_and_handle(self, settings): settings.keyBindingsMap["default"] = self.my_key_bindings
"""Setup keyboard shortcuts for plugins""" return currPluginSetting
settings['inputeventhandler'] = Cthulhu.input_event.InputEventHandler(
settings['function'], settings['pluginname'])
# Create key binding based on modifier combinations
modifiers = Cthulhu.keybindings.defaultModifierMask
if settings['shiftkey'] and not settings['ctrlkey'] and not settings['altkey']:
mask = Cthulhu.keybindings.CTHULHU_SHIFT_MODIFIER_MASK
elif not settings['shiftkey'] and settings['ctrlkey'] and not settings['altkey']:
mask = Cthulhu.keybindings.CTHULHU_CTRL_MODIFIER_MASK
elif not settings['shiftkey'] and not settings['ctrlkey'] and settings['altkey']:
mask = Cthulhu.keybindings.CTHULHU_ALT_MODIFIER_MASK
elif not settings['shiftkey'] and settings['ctrlkey'] and settings['altkey']:
mask = Cthulhu.keybindings.CTHULHU_CTRL_ALT_MODIFIER_MASK
elif settings['shiftkey'] and not settings['ctrlkey'] and settings['altkey']:
mask = Cthulhu.keybindings.SHIFT_ALT_MODIFIER_MASK
else:
mask = Cthulhu.keybindings.CTHULHU_MODIFIER_MASK
self.my_key_bindings.add(
Cthulhu.keybindings.KeyBinding(
settings['key'], modifiers, mask, settings['inputeventhandler']
)
)
Cthulhu.settings.keyBindingsMap["default"] = self.my_key_bindings
def id_generator(self, size=7, chars=string.ascii_letters): def id_generator(self, size=7, chars=string.ascii_letters):
"""Generate random identifier"""
return ''.join(random.choice(chars) for _ in range(size)) return ''.join(random.choice(chars) for _ in range(size))
def init_settings(self): def initSettings(self):
"""Initialize default settings dictionary""" currPluginSetting={
return { 'filepath':'',
'filepath': '', 'pluginname':'',
'pluginname': '', 'functionname':'',
'functionname': '', 'key':'',
'key': '', 'shiftkey':False,
'shiftkey': False, 'ctrlkey':False,
'ctrlkey': False, 'altkey':False,
'altkey': False, 'startnotify':False,
'startnotify': False, 'stopnotify':False,
'stopnotify': False, 'blockcall':False,
'blockcall': False, 'error':False,
'error': False, 'exec': False,
'exec': False, 'parameters':'',
'executeable': False, 'function':None,
'parameters': '', 'inputeventhandler':None,
'function': None, 'valid':False,
'inputeventhandler': None, 'supressoutput':False
'valid': False,
'supressoutput': False
} }
return currPluginSetting
def get_plugin_settings(self, filepath): def getPluginSettings(self, filepath, currPluginSetting):
"""Parse plugin settings from filename and content"""
settings = self.init_settings()
try: try:
settings['file'] = filepath currPluginSetting['file'] = filepath
fileName, fileExtension = os.path.splitext(filepath) fileName, fileExtension = os.path.splitext(filepath)
if fileExtension and fileExtension != '': if (fileExtension and (fileExtension != '')): #if there is an extension
settings['loadable'] = (fileExtension.lower() == '.py') currPluginSetting['loadable'] = (fileExtension.lower() == '.py') # only python is loadable
filename = os.path.basename(filepath) #filename
filename = os.path.basename(filepath) filename = os.path.splitext(filename)[0] #remove extension if we have one
filename = os.path.splitext(filename)[0] #remove pluginname seperated by __-__
filenamehelper = filename.split('__-__')
# Parse filename components filename = filenamehelper[len(filenamehelper) - 1 ]
name_parts = filename.split('__-__') currPluginSetting['permission'] = os.access(filepath, os.X_OK )
if len(name_parts) == 2: currPluginSetting['pluginname'] = 'NoNameAvailable'
settings['pluginname'] = name_parts[0] if len(filenamehelper) == 2:
else: currPluginSetting['pluginname'] = filenamehelper[0]
settings['pluginname'] = 'NoNameAvailable' #now get shortcuts seperated by __+__
filenamehelper = filename.split('__+__')
# Parse settings from filename if len([y for y in filenamehelper if 'parameters_' in y.lower()]) == 1 and\
setting_parts = name_parts[-1].split('__+__') len([y for y in filenamehelper if 'parameters_' in y.lower()][0]) > 11:
currPluginSetting['parameters'] = [y for y in filenamehelper if 'parameters_' in y.lower()][0][11:]
# Extract parameters and key if len([y for y in filenamehelper if 'key_' in y.lower()]) == 1 and\
for part in setting_parts: len([y for y in filenamehelper if 'key_' in y.lower()][0]) > 4 :
if part.lower().startswith('parameters_'): currPluginSetting['key'] = [y for y in filenamehelper if 'key_' in y.lower()][0][4]
settings['parameters'] = part[11:] if currPluginSetting['key'] == '':
elif part.lower().startswith('key_'): settcurrPluginSetting = 'shift' in map(str.lower, filenamehelper)
settings['key'] = part[4:] currPluginSetting['ctrlkey'] = 'control' in map(str.lower, filenamehelper)
currPluginSetting['altkey'] = 'alt' in map(str.lower, filenamehelper)
if not settings['key']: currPluginSetting['startnotify'] = 'startnotify' in map(str.lower, filenamehelper)
settings['key'] = setting_parts[-1].lower() currPluginSetting['stopnotify'] = 'stopnotify' in map(str.lower, filenamehelper)
currPluginSetting['blockcall'] = 'blockcall' in map(str.lower, filenamehelper)
# Parse boolean flags currPluginSetting['error'] = 'error' in map(str.lower, filenamehelper)
lower_parts = list(map(str.lower, setting_parts)) currPluginSetting['supressoutput'] = 'supressoutput' in map(str.lower, filenamehelper)
settings.update({ currPluginSetting['exec'] = 'exec' in map(str.lower, filenamehelper)
'shiftkey': 'shift' in lower_parts, currPluginSetting['loadmodule'] = 'loadmodule' in map(str.lower, filenamehelper)
'ctrlkey': 'control' in lower_parts, currPluginSetting = self.readSettingsFromPlugin(currPluginSetting)
'altkey': 'alt' in lower_parts, if not currPluginSetting['loadmodule']:
'startnotify': 'startnotify' in lower_parts, if not currPluginSetting['permission']: #subprocessing only works with exec permission
'stopnotify': 'stopnotify' in lower_parts, return self.initSettings()
'blockcall': 'blockcall' in lower_parts, if currPluginSetting['loadmodule'] and not currPluginSetting['loadable']: #sorry.. its not loadable only .py is loadable
'error': 'error' in lower_parts, return self.initSettings()
'supressoutput': 'supressoutput' in lower_parts, if (len(currPluginSetting['key']) > 1): #no shortcut
'exec': 'exec' in lower_parts, if not currPluginSetting['exec']: # and no exec -> the plugin make no sense because it isnt hooked anywhere
'loadmodule': 'loadmodule' in lower_parts return self.initSettings() #so not load it (sets valid = False)
}) else:
currPluginSetting['key'] = '' #there is a strange key, but exec? ignore the key..
settings = self._read_settings_from_plugin(settings) currPluginSetting['valid'] = True # we could load everything
return currPluginSetting
# Validate settings except:
if not settings['loadmodule'] and not os.access(filepath, os.X_OK): return self.initSettings()
return self.init_settings()
if settings['loadmodule'] and not settings['loadable']:
return self.init_settings()
if len(settings['key']) < 1 and not settings['exec']:
return self.init_settings()
settings['valid'] = True
return settings
except Exception:
return self.init_settings()
def _read_settings_from_plugin(self, settings): def readSettingsFromPlugin(self, currPluginSetting):
"""Read additional settings from plugin file content""" if not os.access(currPluginSetting['file'], os.R_OK ):
if not os.access(settings['file'], os.R_OK): return currPluginSetting
return settings fileName, fileExtension = os.path.splitext(currPluginSetting['file'])
if (fileExtension and (fileExtension != '')): #if there is an extension
_, ext = os.path.splitext(settings['file']) if (fileExtension.lower() != '.py') and \
if ext and ext.lower() not in ['.py', '.sh']: (fileExtension.lower() != '.sh'):
return settings return currPluginSetting
else:
try: return currPluginSetting
with open(settings['file'], "r") as plugin_file:
for line in plugin_file:
line = line.lower().replace(" ", "")
if 'sopsproperty:' in line:
prop = line.split('sopsproperty:')[1].strip()
if prop in settings:
settings[prop] = True
except Exception:
pass
return settings
def _build_plugin_subprocess(self, settings): with open(currPluginSetting['file'], "r") as pluginFile:
"""Build function body for subprocess-based plugins""" for line in pluginFile:
curr_plugin = f"'\"{settings['file']}\" {settings['parameters']}'" currPluginSetting['shiftkey'] = ('sopsproperty:shift' in line.lower().replace(" ", "")) or currPluginSetting['shiftkey']
plugin_name = f"blocking {settings['pluginname']}" if settings['blockcall'] else settings['pluginname'] currPluginSetting['ctrlkey'] = ('sopsproperty:control' in line.lower().replace(" ", "")) or currPluginSetting['ctrlkey']
currPluginSetting['altkey'] = ('sopsproperty:alt' in line.lower().replace(" ", "")) or currPluginSetting['altkey']
func_body = [ currPluginSetting['startnotify'] = ('sopsproperty:startnotify' in line.lower().replace(" ", "")) or currPluginSetting['startnotify']
f"def {settings['functionname']}(script=None, inputEvent=None):" currPluginSetting['stopnotify'] = ('sopsproperty:stopnotify' in line.lower().replace(" ", "")) or currPluginSetting['stopnotify']
] currPluginSetting['blockcall'] = ('sopsproperty:blockcall' in line.lower().replace(" ", "")) or currPluginSetting['blockcall']
currPluginSetting['error'] = ('sopsproperty:error' in line.lower().replace(" ", "")) or currPluginSetting['error']
if settings['startnotify']: currPluginSetting['supressoutput'] = ('sopsproperty:supressoutput' in line.lower().replace(" ", "")) or currPluginSetting['supressoutput']
func_body.append(f" self.output_message('start {plugin_name}')") currPluginSetting['exec'] = ('sopsproperty:exec' in line.lower().replace(" ", "")) or currPluginSetting['exec']
currPluginSetting['loadmodule'] = ('sopsproperty:loadmodule' in line.lower().replace(" ", "")) or currPluginSetting['loadmodule']
func_body.extend([ return currPluginSetting
f" p = Popen({curr_plugin}, stdout=PIPE, stderr=PIPE, shell=True)",
" stdout, stderr = p.communicate()",
" message = ''",
f" if not {settings['supressoutput']} and stdout:",
' message += str(stdout, "utf-8")',
f" if {settings['error']} and stderr:",
' message += \' error: \' + str(stderr, "utf-8")',
" self.output_message(message)"
])
if settings['stopnotify']:
func_body.append(f" self.output_message('finish {plugin_name}')")
func_body.append(" return True\n")
# Add threaded version
func_body.extend([
f"def {settings['functionname']}T(script=None, inputEvent=None):",
f" threading.Thread(target={settings['functionname']}, args=(script, inputEvent)).start()"
])
return "\n".join(func_body)
def _build_plugin_exec(self, settings): def buildPluginSubprocess(self, currPluginSetting):
"""Build function body for Python module-based plugins""" currplugin = "\'\"" + currPluginSetting['file'] + "\" " + currPluginSetting['parameters'] + "\'"
plugin_name = f"blocking {settings['pluginname']}" if settings['blockcall'] else settings['pluginname'] pluginname = currPluginSetting['pluginname']
if currPluginSetting['blockcall']:
func_body = [ pluginname = "blocking " + pluginname
f"def {settings['functionname']}(script=None, inputEvent=None):" fun_body = "global " + currPluginSetting['functionname']+"\n"
] fun_body += "def " + currPluginSetting['functionname'] + "(script=None, inputEvent=None):\n"
if currPluginSetting['startnotify']:
if settings['startnotify']: fun_body +=" outputMessage('start " + pluginname + "')\n"
func_body.append(f" self.output_message('start {plugin_name}')") fun_body +=" p = Popen(" + currplugin + ", stdout=PIPE, stderr=PIPE, shell=True)\n"
fun_body +=" stdout, stderr = p.communicate()\n"
func_body.extend([ fun_body +=" message = ''\n"
" try:", fun_body +=" if not " + str(currPluginSetting['supressoutput']) + " and stdout:\n"
f' spec = importlib.util.spec_from_file_location("{settings["functionname"]}", "{settings["file"]}")', fun_body +=" message += str(stdout, \"utf-8\")\n"
f" {settings['functionname']}Module = importlib.util.module_from_spec(spec)", fun_body +=" if " + str(currPluginSetting['error']) + " and stderr:\n"
f" spec.loader.exec_module({settings['functionname']}Module)", fun_body +=" message += ' error: ' + str(stderr, \"utf-8\")\n"
" except:", fun_body +=" outputMessage( message)\n"
" pass" if currPluginSetting['stopnotify']:
]) fun_body +=" outputMessage('finish " + pluginname + "')\n"
fun_body +=" return True\n\n"
if settings['error']: fun_body += "global " + currPluginSetting['functionname']+"T\n"
func_body.append(f' self.output_message("Error while executing {plugin_name}")') fun_body +="def " + currPluginSetting['functionname'] + "T(script=None, inputEvent=None):\n"
fun_body +=" _thread.start_new_thread("+ currPluginSetting['functionname'] + ",(script, inputEvent))\n\n"
if settings['stopnotify']: return fun_body
func_body.append(f" self.output_message('finish {plugin_name}')")
func_body.append(" return True\n")
# Add threaded version
func_body.extend([
f"def {settings['functionname']}T(script=None, inputEvent=None):",
f" threading.Thread(target={settings['functionname']}, args=(script, inputEvent)).start()"
])
return "\n".join(func_body)
def _get_function_name(self, settings): def buildPluginExec(self, currPluginSetting):
"""Generate unique function name for plugin""" pluginname = currPluginSetting['pluginname']
fn = "_" + settings['pluginname'].replace("-", "_") if currPluginSetting['blockcall']:
settings['functionname'] = fn pluginname = "blocking " + pluginname
while (settings['functionname'] == fn or fun_body = "global " + currPluginSetting['functionname']+"\n"
settings['functionname'] + 'T' in globals() or fun_body += "def " + currPluginSetting['functionname'] + "(script=None, inputEvent=None):\n"
settings['functionname'] in globals()): if currPluginSetting['startnotify']:
settings['functionname'] = self.id_generator() + fn fun_body +=" outputMessage('start " + pluginname + "')\n"
return settings fun_body += " try:\n"
fun_body += " spec = importlib.util.spec_from_file_location(\"" + currPluginSetting['functionname'] + "\",\""+ currPluginSetting['file']+"\")\n"
fun_body += " "+currPluginSetting['functionname'] + "Module = importlib.util.module_from_spec(spec)\n"
fun_body += " spec.loader.exec_module(" + currPluginSetting['functionname'] + "Module)\n"
fun_body += " except:\n"
fun_body += " pass\n"
if currPluginSetting['error']:
fun_body += " outputMessage(\"Error while executing " + pluginname + "\")\n"
if currPluginSetting['stopnotify']:
fun_body +=" outputMessage('finish " + pluginname + "')\n"
fun_body += " return True\n\n"
fun_body += "global " + currPluginSetting['functionname']+"T\n"
fun_body +="def " + currPluginSetting['functionname'] + "T(script=None, inputEvent=None):\n"
fun_body +=" _thread.start_new_thread("+ currPluginSetting['functionname'] + ",(script, inputEvent))\n\n"
return fun_body
def _load_plugins(self): def getFunctionName(self, currPluginSetting):
"""Load all plugins from the plugin directory""" currPluginSetting['functionname'] = ''
plugin_list = glob.glob(self.plugin_repo + '*') while currPluginSetting['functionname'] == '' or currPluginSetting['functionname'] + 'T' in globals() or currPluginSetting['functionname'] in globals():
for curr_plugin in plugin_list: currPluginSetting['functionname'] = self.id_generator()
settings = self.get_plugin_settings(curr_plugin) return currPluginSetting
if not settings['valid']:
continue def load_plugins(self):
if not self.loaded:
self.plugin_list = glob.glob(self.plugin_repo+'*')
for currplugin in self.plugin_list:
currPluginSetting = self.initSettings()
currPluginSetting = self.getPluginSettings(currplugin, currPluginSetting)
if not currPluginSetting['valid']:
continue
currPluginSetting = self.getFunctionName(currPluginSetting)
settings = self._get_function_name(settings) if currPluginSetting['loadmodule']:
exec(self.buildPluginExec(currPluginSetting)) # load as python module
# Build and execute function definition else:
if settings['loadmodule']: exec(self.buildPluginSubprocess(currPluginSetting)) # run as subprocess
exec(self._build_plugin_exec(settings))
else: if currPluginSetting['blockcall']:
exec(self._build_plugin_subprocess(settings)) currPluginSetting['function'] = globals()[currPluginSetting['functionname']] # non threaded
else:
currPluginSetting['function'] = globals()[currPluginSetting['functionname']+"T"] # T = Threaded
# Set up function reference if currPluginSetting['exec']: # exec on load if we want
if settings['blockcall']: currPluginSetting['function']()
settings['function'] = globals()[settings['functionname']]
else: if not currPluginSetting['key'] == '':
settings['function'] = globals()[settings['functionname'] + "T"] currPluginSetting = self.SetupShortcutAndHandle(currPluginSetting)
print(currPluginSetting)
# Execute if needed self.plugin_list.append(currPluginSetting) # store in a list
if settings['exec']: self.loaded = True
settings['function']()
# Set up keybinding if needed
if settings['key']:
self.setup_shortcut_and_handle(settings)
self.plugin_list.append(settings)
self.loaded = True