Initial attempt at chaning SOPS into a plugin for Cthulhu.
This commit is contained in:
parent
9f2a349239
commit
8a9bbefeac
7
src/cthulhu/plugins/SimplePluginSystem/Makefile.am
Normal file
7
src/cthulhu/plugins/SimplePluginSystem/Makefile.am
Normal file
@ -0,0 +1,7 @@
|
||||
cthulhu_python_PYTHON = \
|
||||
__init__.py \
|
||||
SimplePluginSystem.plugin \
|
||||
SimplePluginSystem.py
|
||||
|
||||
cthulhu_pythondir=$(pkgpythondir)/plugins/SimplePluginSystem
|
||||
|
@ -0,0 +1,9 @@
|
||||
[Plugin]
|
||||
Module=SimplePluginSystem
|
||||
Loader=python3
|
||||
Name=Simple Plugin System
|
||||
Description=Simple plugin system implementation for Cthulhu
|
||||
Authors=Chrys <chrys@linux-a11y.org>
|
||||
Copyright=Copyright Â2024 Chrys, Storm Dragon
|
||||
Website=https://git.stormux.org/storm/cthulhu
|
||||
Version=1.0
|
295
src/cthulhu/plugins/SimplePluginSystem/SimplePluginSystem.py
Normal file
295
src/cthulhu/plugins/SimplePluginSystem/SimplePluginSystem.py
Normal file
@ -0,0 +1,295 @@
|
||||
from gi.repository import GObject, Peas, Cthulhu
|
||||
import glob
|
||||
import os
|
||||
import importlib.util
|
||||
import random
|
||||
import string
|
||||
import threading
|
||||
from subprocess import Popen, PIPE
|
||||
|
||||
class SimplePluginSystem(GObject.Object, Cthulhu.PluginInterface):
|
||||
__gtype_name__ = 'SimplePluginSystem'
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.plugin_list = []
|
||||
self.loaded = False
|
||||
self.my_key_bindings = Cthulhu.keybindings.KeyBindings()
|
||||
self.plugin_repo = os.path.expanduser('~') + "/.local/share/cthulhu/plugins/"
|
||||
|
||||
def do_activate(self):
|
||||
"""Required method for Cthulhu plugins"""
|
||||
if not self.loaded:
|
||||
self._load_plugins()
|
||||
|
||||
def do_deactivate(self):
|
||||
"""Required method for Cthulhu plugins"""
|
||||
# Remove all registered keybindings
|
||||
for plugin in self.plugin_list:
|
||||
if plugin.get('inputeventhandler'):
|
||||
self.my_key_bindings.remove(plugin['inputeventhandler'])
|
||||
Cthulhu.settings.keyBindingsMap["default"] = Cthulhu.keybindings.KeyBindings()
|
||||
self.loaded = False
|
||||
self.plugin_list = []
|
||||
|
||||
def output_message(self, message):
|
||||
"""Output message through speech and/or braille"""
|
||||
if Cthulhu.settings.enableSpeech:
|
||||
Cthulhu.speech.speak(message)
|
||||
if Cthulhu.settings.enableBraille:
|
||||
Cthulhu.braille.displayMessage(message)
|
||||
|
||||
def setup_shortcut_and_handle(self, settings):
|
||||
"""Setup keyboard shortcuts for plugins"""
|
||||
settings['inputeventhandler'] = Cthulhu.input_event.InputEventHandler(
|
||||
settings['function'], settings['pluginname'])
|
||||
|
||||
# Create key binding based on modifier combinations
|
||||
modifiers = Cthulhu.keybindings.defaultModifierMask
|
||||
if settings['shiftkey'] and not settings['ctrlkey'] and not settings['altkey']:
|
||||
mask = Cthulhu.keybindings.ORCA_SHIFT_MODIFIER_MASK
|
||||
elif not settings['shiftkey'] and settings['ctrlkey'] and not settings['altkey']:
|
||||
mask = Cthulhu.keybindings.ORCA_CTRL_MODIFIER_MASK
|
||||
elif not settings['shiftkey'] and not settings['ctrlkey'] and settings['altkey']:
|
||||
mask = Cthulhu.keybindings.ORCA_ALT_MODIFIER_MASK
|
||||
elif not settings['shiftkey'] and settings['ctrlkey'] and settings['altkey']:
|
||||
mask = Cthulhu.keybindings.ORCA_CTRL_ALT_MODIFIER_MASK
|
||||
elif settings['shiftkey'] and not settings['ctrlkey'] and settings['altkey']:
|
||||
mask = Cthulhu.keybindings.SHIFT_ALT_MODIFIER_MASK
|
||||
else:
|
||||
mask = Cthulhu.keybindings.ORCA_MODIFIER_MASK
|
||||
|
||||
self.my_key_bindings.add(
|
||||
Cthulhu.keybindings.KeyBinding(
|
||||
settings['key'], modifiers, mask, settings['inputeventhandler']
|
||||
)
|
||||
)
|
||||
Cthulhu.settings.keyBindingsMap["default"] = self.my_key_bindings
|
||||
|
||||
def id_generator(self, size=7, chars=string.ascii_letters):
|
||||
"""Generate random identifier"""
|
||||
return ''.join(random.choice(chars) for _ in range(size))
|
||||
|
||||
def init_settings(self):
|
||||
"""Initialize default settings dictionary"""
|
||||
return {
|
||||
'filepath': '',
|
||||
'pluginname': '',
|
||||
'functionname': '',
|
||||
'key': '',
|
||||
'shiftkey': False,
|
||||
'ctrlkey': False,
|
||||
'altkey': False,
|
||||
'startnotify': False,
|
||||
'stopnotify': False,
|
||||
'blockcall': False,
|
||||
'error': False,
|
||||
'exec': False,
|
||||
'executeable': False,
|
||||
'parameters': '',
|
||||
'function': None,
|
||||
'inputeventhandler': None,
|
||||
'valid': False,
|
||||
'supressoutput': False
|
||||
}
|
||||
|
||||
def get_plugin_settings(self, filepath):
|
||||
"""Parse plugin settings from filename and content"""
|
||||
settings = self.init_settings()
|
||||
try:
|
||||
settings['file'] = filepath
|
||||
fileName, fileExtension = os.path.splitext(filepath)
|
||||
if fileExtension and fileExtension != '':
|
||||
settings['loadable'] = (fileExtension.lower() == '.py')
|
||||
|
||||
filename = os.path.basename(filepath)
|
||||
filename = os.path.splitext(filename)[0]
|
||||
|
||||
# Parse filename components
|
||||
name_parts = filename.split('__-__')
|
||||
if len(name_parts) == 2:
|
||||
settings['pluginname'] = name_parts[0]
|
||||
else:
|
||||
settings['pluginname'] = 'NoNameAvailable'
|
||||
|
||||
# Parse settings from filename
|
||||
setting_parts = name_parts[-1].split('__+__')
|
||||
|
||||
# Extract parameters and key
|
||||
for part in setting_parts:
|
||||
if part.lower().startswith('parameters_'):
|
||||
settings['parameters'] = part[11:]
|
||||
elif part.lower().startswith('key_'):
|
||||
settings['key'] = part[4:]
|
||||
|
||||
if not settings['key']:
|
||||
settings['key'] = setting_parts[-1].lower()
|
||||
|
||||
# Parse boolean flags
|
||||
lower_parts = list(map(str.lower, setting_parts))
|
||||
settings.update({
|
||||
'shiftkey': 'shift' in lower_parts,
|
||||
'ctrlkey': 'control' in lower_parts,
|
||||
'altkey': 'alt' in lower_parts,
|
||||
'startnotify': 'startnotify' in lower_parts,
|
||||
'stopnotify': 'stopnotify' in lower_parts,
|
||||
'blockcall': 'blockcall' in lower_parts,
|
||||
'error': 'error' in lower_parts,
|
||||
'supressoutput': 'supressoutput' in lower_parts,
|
||||
'exec': 'exec' in lower_parts,
|
||||
'loadmodule': 'loadmodule' in lower_parts
|
||||
})
|
||||
|
||||
settings = self._read_settings_from_plugin(settings)
|
||||
|
||||
# Validate settings
|
||||
if not settings['loadmodule'] and not os.access(filepath, os.X_OK):
|
||||
return self.init_settings()
|
||||
if settings['loadmodule'] and not settings['loadable']:
|
||||
return self.init_settings()
|
||||
if len(settings['key']) < 1 and not settings['exec']:
|
||||
return self.init_settings()
|
||||
|
||||
settings['valid'] = True
|
||||
return settings
|
||||
|
||||
except Exception:
|
||||
return self.init_settings()
|
||||
|
||||
def _read_settings_from_plugin(self, settings):
|
||||
"""Read additional settings from plugin file content"""
|
||||
if not os.access(settings['file'], os.R_OK):
|
||||
return settings
|
||||
|
||||
_, ext = os.path.splitext(settings['file'])
|
||||
if ext and ext.lower() not in ['.py', '.sh']:
|
||||
return settings
|
||||
|
||||
try:
|
||||
with open(settings['file'], "r") as plugin_file:
|
||||
for line in plugin_file:
|
||||
line = line.lower().replace(" ", "")
|
||||
if 'sopsproperty:' in line:
|
||||
prop = line.split('sopsproperty:')[1].strip()
|
||||
if prop in settings:
|
||||
settings[prop] = True
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return settings
|
||||
|
||||
def _build_plugin_subprocess(self, settings):
|
||||
"""Build function body for subprocess-based plugins"""
|
||||
curr_plugin = f"'\"{settings['file']}\" {settings['parameters']}'"
|
||||
plugin_name = f"blocking {settings['pluginname']}" if settings['blockcall'] else settings['pluginname']
|
||||
|
||||
func_body = [
|
||||
f"def {settings['functionname']}(script=None, inputEvent=None):"
|
||||
]
|
||||
|
||||
if settings['startnotify']:
|
||||
func_body.append(f" self.output_message('start {plugin_name}')")
|
||||
|
||||
func_body.extend([
|
||||
f" p = Popen({curr_plugin}, stdout=PIPE, stderr=PIPE, shell=True)",
|
||||
" stdout, stderr = p.communicate()",
|
||||
" message = ''",
|
||||
f" if not {settings['supressoutput']} and stdout:",
|
||||
' message += str(stdout, "utf-8")',
|
||||
f" if {settings['error']} and stderr:",
|
||||
' message += \' error: \' + str(stderr, "utf-8")',
|
||||
" self.output_message(message)"
|
||||
])
|
||||
|
||||
if settings['stopnotify']:
|
||||
func_body.append(f" self.output_message('finish {plugin_name}')")
|
||||
|
||||
func_body.append(" return True\n")
|
||||
|
||||
# Add threaded version
|
||||
func_body.extend([
|
||||
f"def {settings['functionname']}T(script=None, inputEvent=None):",
|
||||
f" threading.Thread(target={settings['functionname']}, args=(script, inputEvent)).start()"
|
||||
])
|
||||
|
||||
return "\n".join(func_body)
|
||||
|
||||
def _build_plugin_exec(self, settings):
|
||||
"""Build function body for Python module-based plugins"""
|
||||
plugin_name = f"blocking {settings['pluginname']}" if settings['blockcall'] else settings['pluginname']
|
||||
|
||||
func_body = [
|
||||
f"def {settings['functionname']}(script=None, inputEvent=None):"
|
||||
]
|
||||
|
||||
if settings['startnotify']:
|
||||
func_body.append(f" self.output_message('start {plugin_name}')")
|
||||
|
||||
func_body.extend([
|
||||
" try:",
|
||||
f' spec = importlib.util.spec_from_file_location("{settings["functionname"]}", "{settings["file"]}")',
|
||||
f" {settings['functionname']}Module = importlib.util.module_from_spec(spec)",
|
||||
f" spec.loader.exec_module({settings['functionname']}Module)",
|
||||
" except:",
|
||||
" pass"
|
||||
])
|
||||
|
||||
if settings['error']:
|
||||
func_body.append(f' self.output_message("Error while executing {plugin_name}")')
|
||||
|
||||
if settings['stopnotify']:
|
||||
func_body.append(f" self.output_message('finish {plugin_name}')")
|
||||
|
||||
func_body.append(" return True\n")
|
||||
|
||||
# Add threaded version
|
||||
func_body.extend([
|
||||
f"def {settings['functionname']}T(script=None, inputEvent=None):",
|
||||
f" threading.Thread(target={settings['functionname']}, args=(script, inputEvent)).start()"
|
||||
])
|
||||
|
||||
return "\n".join(func_body)
|
||||
|
||||
def _get_function_name(self, settings):
|
||||
"""Generate unique function name for plugin"""
|
||||
fn = "_" + settings['pluginname'].replace("-", "_")
|
||||
settings['functionname'] = fn
|
||||
while (settings['functionname'] == fn or
|
||||
settings['functionname'] + 'T' in globals() or
|
||||
settings['functionname'] in globals()):
|
||||
settings['functionname'] = self.id_generator() + fn
|
||||
return settings
|
||||
|
||||
def _load_plugins(self):
|
||||
"""Load all plugins from the plugin directory"""
|
||||
plugin_list = glob.glob(self.plugin_repo + '*')
|
||||
for curr_plugin in plugin_list:
|
||||
settings = self.get_plugin_settings(curr_plugin)
|
||||
if not settings['valid']:
|
||||
continue
|
||||
|
||||
settings = self._get_function_name(settings)
|
||||
|
||||
# Build and execute function definition
|
||||
if settings['loadmodule']:
|
||||
exec(self._build_plugin_exec(settings))
|
||||
else:
|
||||
exec(self._build_plugin_subprocess(settings))
|
||||
|
||||
# Set up function reference
|
||||
if settings['blockcall']:
|
||||
settings['function'] = globals()[settings['functionname']]
|
||||
else:
|
||||
settings['function'] = globals()[settings['functionname'] + "T"]
|
||||
|
||||
# Execute if needed
|
||||
if settings['exec']:
|
||||
settings['function']()
|
||||
|
||||
# Set up keybinding if needed
|
||||
if settings['key']:
|
||||
self.setup_shortcut_and_handle(settings)
|
||||
|
||||
self.plugin_list.append(settings)
|
||||
|
||||
self.loaded = True
|
0
src/cthulhu/plugins/SimplePluginSystem/__init__.py
Normal file
0
src/cthulhu/plugins/SimplePluginSystem/__init__.py
Normal file
Loading…
Reference in New Issue
Block a user