diff --git a/Toby Doom Launcher.py b/Toby Doom Launcher.py index e5273e4..c9225f6 100755 --- a/Toby Doom Launcher.py +++ b/Toby Doom Launcher.py @@ -49,266 +49,224 @@ from PySide6.QtWidgets import (QApplication, QMainWindow, QWidget, QHBoxLayout, from PySide6.QtCore import Qt, QTimer import webbrowser -class OrcaRemoteController: +class ScreenReaderRemoteController: + """D-Bus interface for screen reader remote control using dasbus library""" + _instance = None + _availability_checked = False + _is_available = False + service_configs = [] + process_name = "" + display_name = "screen reader" + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + cls = type(self) + if cls._availability_checked: + self.available = cls._is_available + return + self.service_name = "" + self.main_path = "" + self.proxy = None + self.speech_proxy = None + self.speech_proxies = [] + self.speech_command_style = "direct" + self.available = self._test_availability() + cls._is_available = self.available + cls._availability_checked = True + + def _call_with_timeout(self, func, timeout_seconds=2): + """Execute a function with timeout using threading""" + result = [None] + exception = [None] + + def wrapper(): + try: + result[0] = func() + except Exception as e: + exception[0] = e + + thread = threading.Thread(target=wrapper, daemon=True) + thread.start() + thread.join(timeout=timeout_seconds) + + if thread.is_alive(): + return None + if exception[0]: + raise exception[0] + return result[0] + + def _get_dbus_address(self): + """Try to find the screen reader's D-Bus session address from its process""" + try: + result = subprocess.run( + ["pgrep", "-x", self.process_name], + capture_output=True, + text=True, + timeout=1 + ) + if result.returncode == 0: + pid = result.stdout.strip().split('\n')[0] + with open(f"/proc/{pid}/environ", 'r') as f: + environ = f.read() + for var in environ.split('\0'): + if var.startswith('DBUS_SESSION_BUS_ADDRESS='): + return var.split('=', 1)[1] + except Exception: + pass + return None + + def _test_service_proxy(self, proxy): + for method_name in ("GetVersion", "ListCommands"): + try: + getattr(proxy, method_name)() + return True + except Exception: + pass + return False + + def _connect_with_bus(self, bus): + for config in self.service_configs: + try: + proxy = bus.get_proxy(config["service_name"], config["main_path"]) + if not self._test_service_proxy(proxy): + continue + speech_proxies = [] + for speech_module, command_style in config["speech_modules"]: + speech_proxies.append(( + bus.get_proxy( + config["service_name"], + f"{config['main_path']}/{speech_module}" + ), + command_style + )) + except Exception: + continue + + self.service_name = config["service_name"] + self.main_path = config["main_path"] + self.proxy = proxy + self.speech_proxy = speech_proxies[0][0] if speech_proxies else None + self.speech_proxies = speech_proxies + self.speech_command_style = speech_proxies[0][1] if speech_proxies else "direct" + return True + return False + + def _test_availability(self): + """Test if the screen reader remote controller is available""" + def test_connection(): + from dasbus.connection import SessionMessageBus + + try: + if self._connect_with_bus(SessionMessageBus()): + return True + except Exception: + pass + + dbus_address = self._get_dbus_address() + if not dbus_address: + return False + + old_address = os.environ.get('DBUS_SESSION_BUS_ADDRESS') + try: + os.environ['DBUS_SESSION_BUS_ADDRESS'] = dbus_address + return self._connect_with_bus(SessionMessageBus()) + finally: + if old_address: + os.environ['DBUS_SESSION_BUS_ADDRESS'] = old_address + elif 'DBUS_SESSION_BUS_ADDRESS' in os.environ: + del os.environ['DBUS_SESSION_BUS_ADDRESS'] + + try: + return self._call_with_timeout(test_connection, timeout_seconds=2) or False + except Exception: + return False + + def present_message(self, message): + """Present a message via screen reader speech/braille output""" + if not self.available or not self.proxy: + return False + + try: + return self._call_with_timeout(lambda: self.proxy.PresentMessage(message), timeout_seconds=2) or False + except Exception: + return False + + def interrupt_speech(self): + """Interrupt current speech via the screen reader's speech module""" + if not self.available or not self.speech_proxies: + return False + + for speech_proxy, command_style in self.speech_proxies: + try: + if command_style == "direct": + result = self._call_with_timeout(lambda: speech_proxy.InterruptSpeech(False), timeout_seconds=2) + else: + result = self._call_with_timeout(lambda: speech_proxy.ExecuteCommand("InterruptSpeech", False), timeout_seconds=2) + if result: + self.speech_proxy = speech_proxy + self.speech_command_style = command_style + return True + except Exception: + pass + return False + + +class OrcaRemoteController(ScreenReaderRemoteController): """D-Bus interface for Orca screen reader remote control using dasbus library""" _instance = None _availability_checked = False _is_available = False - - def __new__(cls): - if cls._instance is None: - cls._instance = super().__new__(cls) - return cls._instance - - def __init__(self): - if self._availability_checked: - return - self.service_name = "org.gnome.Orca.Service" - self.main_path = "/org/gnome/Orca/Service" - self.proxy = None - self.speech_proxy = None - self.available = self._test_availability() - OrcaRemoteController._is_available = self.available - OrcaRemoteController._availability_checked = True - - def _call_with_timeout(self, func, timeout_seconds=2): - """Execute a function with timeout using threading""" - result = [None] - exception = [None] - - def wrapper(): - try: - result[0] = func() - except Exception as e: - exception[0] = e - - thread = threading.Thread(target=wrapper, daemon=True) - thread.start() - thread.join(timeout=timeout_seconds) - - if thread.is_alive(): - # Timeout occurred - return None - if exception[0]: - raise exception[0] - return result[0] - - def _get_orca_dbus_address(self): - """Try to find Orca's D-Bus session address from its process""" - try: - # Find Orca process - result = subprocess.run( - ["pgrep", "-x", "orca"], - capture_output=True, - text=True, - timeout=1 - ) - if result.returncode == 0: - pid = result.stdout.strip().split('\n')[0] - # Read Orca's environment - with open(f"/proc/{pid}/environ", 'r') as f: - environ = f.read() - for var in environ.split('\0'): - if var.startswith('DBUS_SESSION_BUS_ADDRESS='): - return var.split('=', 1)[1] - except Exception: - pass - return None - - def _test_availability(self): - """Test if Orca remote controller is available""" - def test_connection(): - from dasbus.connection import SessionMessageBus - - # First try with current session bus - try: - bus = SessionMessageBus() - self.proxy = bus.get_proxy(self.service_name, self.main_path) - self.proxy.ListCommands() - self.speech_proxy = bus.get_proxy( - self.service_name, - f"{self.main_path}/SpeechAndVerbosityManager" - ) - return True - except Exception: - # If that fails, try to find Orca's D-Bus session - orca_bus_address = self._get_orca_dbus_address() - if orca_bus_address: - # Temporarily set the environment variable - old_address = os.environ.get('DBUS_SESSION_BUS_ADDRESS') - try: - os.environ['DBUS_SESSION_BUS_ADDRESS'] = orca_bus_address - bus = SessionMessageBus() - self.proxy = bus.get_proxy(self.service_name, self.main_path) - self.proxy.ListCommands() - self.speech_proxy = bus.get_proxy( - self.service_name, - f"{self.main_path}/SpeechAndVerbosityManager" - ) - return True - finally: - # Restore original address - if old_address: - os.environ['DBUS_SESSION_BUS_ADDRESS'] = old_address - elif 'DBUS_SESSION_BUS_ADDRESS' in os.environ: - del os.environ['DBUS_SESSION_BUS_ADDRESS'] - raise - - try: - return self._call_with_timeout(test_connection, timeout_seconds=2) or False - except Exception: - return False - - def present_message(self, message): - """Present a message via Orca speech/braille output""" - if not self.available or not self.proxy: - return False - - try: - return self._call_with_timeout(lambda: self.proxy.PresentMessage(message), timeout_seconds=2) or False - except Exception: - return False - - def interrupt_speech(self): - """Interrupt current speech via SpeechAndVerbosityManager""" - if not self.available or not self.speech_proxy: - return False - - try: - return self._call_with_timeout(lambda: self.speech_proxy.ExecuteCommand("InterruptSpeech", False), timeout_seconds=2) or False - except Exception: - return False + process_name = "orca" + display_name = "Orca" + service_configs = [ + { + "service_name": "org.gnome.Orca1.Service", + "main_path": "/org/gnome/Orca1/Service", + "speech_modules": [ + ("SpeechManager", "direct"), + ("SpeechAndVerbosityManager", "direct"), + ], + }, + { + "service_name": "org.gnome.Orca.Service", + "main_path": "/org/gnome/Orca/Service", + "speech_modules": [ + ("SpeechAndVerbosityManager", "execute"), + ("SpeechManager", "direct"), + ], + }, + ] -class CthulhuRemoteController: +class CthulhuRemoteController(ScreenReaderRemoteController): """D-Bus interface for Cthulhu screen reader remote control using dasbus library""" _instance = None _availability_checked = False _is_available = False - - def __new__(cls): - if cls._instance is None: - cls._instance = super().__new__(cls) - return cls._instance - - def __init__(self): - if self._availability_checked: - return - self.service_name = "org.stormux.Cthulhu.Service" - self.main_path = "/org/stormux/Cthulhu/Service" - self.proxy = None - self.speech_proxy = None - self.available = self._test_availability() - CthulhuRemoteController._is_available = self.available - CthulhuRemoteController._availability_checked = True - - def _call_with_timeout(self, func, timeout_seconds=2): - """Execute a function with timeout using threading""" - result = [None] - exception = [None] - - def wrapper(): - try: - result[0] = func() - except Exception as e: - exception[0] = e - - thread = threading.Thread(target=wrapper, daemon=True) - thread.start() - thread.join(timeout=timeout_seconds) - - if thread.is_alive(): - # Timeout occurred - return None - if exception[0]: - raise exception[0] - return result[0] - - def _get_cthulhu_dbus_address(self): - """Try to find Cthulhu's D-Bus session address from its process""" - try: - # Find Cthulhu process - result = subprocess.run( - ["pgrep", "-x", "cthulhu"], - capture_output=True, - text=True, - timeout=1 - ) - if result.returncode == 0: - pid = result.stdout.strip().split('\n')[0] - # Read Cthulhu's environment - with open(f"/proc/{pid}/environ", 'r') as f: - environ = f.read() - for var in environ.split('\0'): - if var.startswith('DBUS_SESSION_BUS_ADDRESS='): - return var.split('=', 1)[1] - except Exception: - pass - return None - - def _test_availability(self): - """Test if Cthulhu remote controller is available""" - def test_connection(): - from dasbus.connection import SessionMessageBus - - # First try with current session bus - try: - bus = SessionMessageBus() - self.proxy = bus.get_proxy(self.service_name, self.main_path) - self.proxy.ListCommands() - self.speech_proxy = bus.get_proxy( - self.service_name, - f"{self.main_path}/SpeechAndVerbosityManager" - ) - return True - except Exception: - # If that fails, try to find Cthulhu's D-Bus session - cthulhu_bus_address = self._get_cthulhu_dbus_address() - if cthulhu_bus_address: - # Temporarily set the environment variable - old_address = os.environ.get('DBUS_SESSION_BUS_ADDRESS') - try: - os.environ['DBUS_SESSION_BUS_ADDRESS'] = cthulhu_bus_address - bus = SessionMessageBus() - self.proxy = bus.get_proxy(self.service_name, self.main_path) - self.proxy.ListCommands() - self.speech_proxy = bus.get_proxy( - self.service_name, - f"{self.main_path}/SpeechAndVerbosityManager" - ) - return True - finally: - # Restore original address - if old_address: - os.environ['DBUS_SESSION_BUS_ADDRESS'] = old_address - elif 'DBUS_SESSION_BUS_ADDRESS' in os.environ: - del os.environ['DBUS_SESSION_BUS_ADDRESS'] - raise - - try: - return self._call_with_timeout(test_connection, timeout_seconds=2) or False - except Exception: - return False - - def present_message(self, message): - """Present a message via Cthulhu speech/braille output""" - if not self.available or not self.proxy: - return False - - try: - return self._call_with_timeout(lambda: self.proxy.PresentMessage(message), timeout_seconds=2) or False - except Exception: - return False - - def interrupt_speech(self): - """Interrupt current speech via SpeechAndVerbosityManager""" - if not self.available or not self.speech_proxy: - return False - - try: - return self._call_with_timeout(lambda: self.speech_proxy.ExecuteCommand("InterruptSpeech", False), timeout_seconds=2) or False - except Exception: - return False + process_name = "cthulhu" + display_name = "Cthulhu" + service_configs = [ + { + "service_name": "org.stormux.Cthulhu1.Service", + "main_path": "/org/stormux/Cthulhu1/Service", + "speech_modules": [ + ("SpeechManager", "direct"), + ("SpeechAndVerbosityManager", "direct"), + ], + }, + { + "service_name": "org.stormux.Cthulhu.Service", + "main_path": "/org/stormux/Cthulhu/Service", + "speech_modules": [ + ("SpeechAndVerbosityManager", "execute"), + ("SpeechManager", "direct"), + ], + }, + ] # Initialize speech provider based on platform