Insure launcher still works with orca dbus call changes.

This commit is contained in:
Storm Dragon
2026-05-06 17:31:39 -04:00
parent 279b22ea02
commit 8939922030
+208 -250
View File
@@ -49,266 +49,224 @@ from PySide6.QtWidgets import (QApplication, QMainWindow, QWidget, QHBoxLayout,
from PySide6.QtCore import Qt, QTimer
import webbrowser
class OrcaRemoteController:
class ScreenReaderRemoteController:
"""D-Bus interface for screen reader remote control using dasbus library"""
_instance = None
_availability_checked = False
_is_available = False
service_configs = []
process_name = ""
display_name = "screen reader"
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
cls = type(self)
if cls._availability_checked:
self.available = cls._is_available
return
self.service_name = ""
self.main_path = ""
self.proxy = None
self.speech_proxy = None
self.speech_proxies = []
self.speech_command_style = "direct"
self.available = self._test_availability()
cls._is_available = self.available
cls._availability_checked = True
def _call_with_timeout(self, func, timeout_seconds=2):
"""Execute a function with timeout using threading"""
result = [None]
exception = [None]
def wrapper():
try:
result[0] = func()
except Exception as e:
exception[0] = e
thread = threading.Thread(target=wrapper, daemon=True)
thread.start()
thread.join(timeout=timeout_seconds)
if thread.is_alive():
return None
if exception[0]:
raise exception[0]
return result[0]
def _get_dbus_address(self):
"""Try to find the screen reader's D-Bus session address from its process"""
try:
result = subprocess.run(
["pgrep", "-x", self.process_name],
capture_output=True,
text=True,
timeout=1
)
if result.returncode == 0:
pid = result.stdout.strip().split('\n')[0]
with open(f"/proc/{pid}/environ", 'r') as f:
environ = f.read()
for var in environ.split('\0'):
if var.startswith('DBUS_SESSION_BUS_ADDRESS='):
return var.split('=', 1)[1]
except Exception:
pass
return None
def _test_service_proxy(self, proxy):
for method_name in ("GetVersion", "ListCommands"):
try:
getattr(proxy, method_name)()
return True
except Exception:
pass
return False
def _connect_with_bus(self, bus):
for config in self.service_configs:
try:
proxy = bus.get_proxy(config["service_name"], config["main_path"])
if not self._test_service_proxy(proxy):
continue
speech_proxies = []
for speech_module, command_style in config["speech_modules"]:
speech_proxies.append((
bus.get_proxy(
config["service_name"],
f"{config['main_path']}/{speech_module}"
),
command_style
))
except Exception:
continue
self.service_name = config["service_name"]
self.main_path = config["main_path"]
self.proxy = proxy
self.speech_proxy = speech_proxies[0][0] if speech_proxies else None
self.speech_proxies = speech_proxies
self.speech_command_style = speech_proxies[0][1] if speech_proxies else "direct"
return True
return False
def _test_availability(self):
"""Test if the screen reader remote controller is available"""
def test_connection():
from dasbus.connection import SessionMessageBus
try:
if self._connect_with_bus(SessionMessageBus()):
return True
except Exception:
pass
dbus_address = self._get_dbus_address()
if not dbus_address:
return False
old_address = os.environ.get('DBUS_SESSION_BUS_ADDRESS')
try:
os.environ['DBUS_SESSION_BUS_ADDRESS'] = dbus_address
return self._connect_with_bus(SessionMessageBus())
finally:
if old_address:
os.environ['DBUS_SESSION_BUS_ADDRESS'] = old_address
elif 'DBUS_SESSION_BUS_ADDRESS' in os.environ:
del os.environ['DBUS_SESSION_BUS_ADDRESS']
try:
return self._call_with_timeout(test_connection, timeout_seconds=2) or False
except Exception:
return False
def present_message(self, message):
"""Present a message via screen reader speech/braille output"""
if not self.available or not self.proxy:
return False
try:
return self._call_with_timeout(lambda: self.proxy.PresentMessage(message), timeout_seconds=2) or False
except Exception:
return False
def interrupt_speech(self):
"""Interrupt current speech via the screen reader's speech module"""
if not self.available or not self.speech_proxies:
return False
for speech_proxy, command_style in self.speech_proxies:
try:
if command_style == "direct":
result = self._call_with_timeout(lambda: speech_proxy.InterruptSpeech(False), timeout_seconds=2)
else:
result = self._call_with_timeout(lambda: speech_proxy.ExecuteCommand("InterruptSpeech", False), timeout_seconds=2)
if result:
self.speech_proxy = speech_proxy
self.speech_command_style = command_style
return True
except Exception:
pass
return False
class OrcaRemoteController(ScreenReaderRemoteController):
"""D-Bus interface for Orca screen reader remote control using dasbus library"""
_instance = None
_availability_checked = False
_is_available = False
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if self._availability_checked:
return
self.service_name = "org.gnome.Orca.Service"
self.main_path = "/org/gnome/Orca/Service"
self.proxy = None
self.speech_proxy = None
self.available = self._test_availability()
OrcaRemoteController._is_available = self.available
OrcaRemoteController._availability_checked = True
def _call_with_timeout(self, func, timeout_seconds=2):
"""Execute a function with timeout using threading"""
result = [None]
exception = [None]
def wrapper():
try:
result[0] = func()
except Exception as e:
exception[0] = e
thread = threading.Thread(target=wrapper, daemon=True)
thread.start()
thread.join(timeout=timeout_seconds)
if thread.is_alive():
# Timeout occurred
return None
if exception[0]:
raise exception[0]
return result[0]
def _get_orca_dbus_address(self):
"""Try to find Orca's D-Bus session address from its process"""
try:
# Find Orca process
result = subprocess.run(
["pgrep", "-x", "orca"],
capture_output=True,
text=True,
timeout=1
)
if result.returncode == 0:
pid = result.stdout.strip().split('\n')[0]
# Read Orca's environment
with open(f"/proc/{pid}/environ", 'r') as f:
environ = f.read()
for var in environ.split('\0'):
if var.startswith('DBUS_SESSION_BUS_ADDRESS='):
return var.split('=', 1)[1]
except Exception:
pass
return None
def _test_availability(self):
"""Test if Orca remote controller is available"""
def test_connection():
from dasbus.connection import SessionMessageBus
# First try with current session bus
try:
bus = SessionMessageBus()
self.proxy = bus.get_proxy(self.service_name, self.main_path)
self.proxy.ListCommands()
self.speech_proxy = bus.get_proxy(
self.service_name,
f"{self.main_path}/SpeechAndVerbosityManager"
)
return True
except Exception:
# If that fails, try to find Orca's D-Bus session
orca_bus_address = self._get_orca_dbus_address()
if orca_bus_address:
# Temporarily set the environment variable
old_address = os.environ.get('DBUS_SESSION_BUS_ADDRESS')
try:
os.environ['DBUS_SESSION_BUS_ADDRESS'] = orca_bus_address
bus = SessionMessageBus()
self.proxy = bus.get_proxy(self.service_name, self.main_path)
self.proxy.ListCommands()
self.speech_proxy = bus.get_proxy(
self.service_name,
f"{self.main_path}/SpeechAndVerbosityManager"
)
return True
finally:
# Restore original address
if old_address:
os.environ['DBUS_SESSION_BUS_ADDRESS'] = old_address
elif 'DBUS_SESSION_BUS_ADDRESS' in os.environ:
del os.environ['DBUS_SESSION_BUS_ADDRESS']
raise
try:
return self._call_with_timeout(test_connection, timeout_seconds=2) or False
except Exception:
return False
def present_message(self, message):
"""Present a message via Orca speech/braille output"""
if not self.available or not self.proxy:
return False
try:
return self._call_with_timeout(lambda: self.proxy.PresentMessage(message), timeout_seconds=2) or False
except Exception:
return False
def interrupt_speech(self):
"""Interrupt current speech via SpeechAndVerbosityManager"""
if not self.available or not self.speech_proxy:
return False
try:
return self._call_with_timeout(lambda: self.speech_proxy.ExecuteCommand("InterruptSpeech", False), timeout_seconds=2) or False
except Exception:
return False
process_name = "orca"
display_name = "Orca"
service_configs = [
{
"service_name": "org.gnome.Orca1.Service",
"main_path": "/org/gnome/Orca1/Service",
"speech_modules": [
("SpeechManager", "direct"),
("SpeechAndVerbosityManager", "direct"),
],
},
{
"service_name": "org.gnome.Orca.Service",
"main_path": "/org/gnome/Orca/Service",
"speech_modules": [
("SpeechAndVerbosityManager", "execute"),
("SpeechManager", "direct"),
],
},
]
class CthulhuRemoteController:
class CthulhuRemoteController(ScreenReaderRemoteController):
"""D-Bus interface for Cthulhu screen reader remote control using dasbus library"""
_instance = None
_availability_checked = False
_is_available = False
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if self._availability_checked:
return
self.service_name = "org.stormux.Cthulhu.Service"
self.main_path = "/org/stormux/Cthulhu/Service"
self.proxy = None
self.speech_proxy = None
self.available = self._test_availability()
CthulhuRemoteController._is_available = self.available
CthulhuRemoteController._availability_checked = True
def _call_with_timeout(self, func, timeout_seconds=2):
"""Execute a function with timeout using threading"""
result = [None]
exception = [None]
def wrapper():
try:
result[0] = func()
except Exception as e:
exception[0] = e
thread = threading.Thread(target=wrapper, daemon=True)
thread.start()
thread.join(timeout=timeout_seconds)
if thread.is_alive():
# Timeout occurred
return None
if exception[0]:
raise exception[0]
return result[0]
def _get_cthulhu_dbus_address(self):
"""Try to find Cthulhu's D-Bus session address from its process"""
try:
# Find Cthulhu process
result = subprocess.run(
["pgrep", "-x", "cthulhu"],
capture_output=True,
text=True,
timeout=1
)
if result.returncode == 0:
pid = result.stdout.strip().split('\n')[0]
# Read Cthulhu's environment
with open(f"/proc/{pid}/environ", 'r') as f:
environ = f.read()
for var in environ.split('\0'):
if var.startswith('DBUS_SESSION_BUS_ADDRESS='):
return var.split('=', 1)[1]
except Exception:
pass
return None
def _test_availability(self):
"""Test if Cthulhu remote controller is available"""
def test_connection():
from dasbus.connection import SessionMessageBus
# First try with current session bus
try:
bus = SessionMessageBus()
self.proxy = bus.get_proxy(self.service_name, self.main_path)
self.proxy.ListCommands()
self.speech_proxy = bus.get_proxy(
self.service_name,
f"{self.main_path}/SpeechAndVerbosityManager"
)
return True
except Exception:
# If that fails, try to find Cthulhu's D-Bus session
cthulhu_bus_address = self._get_cthulhu_dbus_address()
if cthulhu_bus_address:
# Temporarily set the environment variable
old_address = os.environ.get('DBUS_SESSION_BUS_ADDRESS')
try:
os.environ['DBUS_SESSION_BUS_ADDRESS'] = cthulhu_bus_address
bus = SessionMessageBus()
self.proxy = bus.get_proxy(self.service_name, self.main_path)
self.proxy.ListCommands()
self.speech_proxy = bus.get_proxy(
self.service_name,
f"{self.main_path}/SpeechAndVerbosityManager"
)
return True
finally:
# Restore original address
if old_address:
os.environ['DBUS_SESSION_BUS_ADDRESS'] = old_address
elif 'DBUS_SESSION_BUS_ADDRESS' in os.environ:
del os.environ['DBUS_SESSION_BUS_ADDRESS']
raise
try:
return self._call_with_timeout(test_connection, timeout_seconds=2) or False
except Exception:
return False
def present_message(self, message):
"""Present a message via Cthulhu speech/braille output"""
if not self.available or not self.proxy:
return False
try:
return self._call_with_timeout(lambda: self.proxy.PresentMessage(message), timeout_seconds=2) or False
except Exception:
return False
def interrupt_speech(self):
"""Interrupt current speech via SpeechAndVerbosityManager"""
if not self.available or not self.speech_proxy:
return False
try:
return self._call_with_timeout(lambda: self.speech_proxy.ExecuteCommand("InterruptSpeech", False), timeout_seconds=2) or False
except Exception:
return False
process_name = "cthulhu"
display_name = "Cthulhu"
service_configs = [
{
"service_name": "org.stormux.Cthulhu1.Service",
"main_path": "/org/stormux/Cthulhu1/Service",
"speech_modules": [
("SpeechManager", "direct"),
("SpeechAndVerbosityManager", "direct"),
],
},
{
"service_name": "org.stormux.Cthulhu.Service",
"main_path": "/org/stormux/Cthulhu/Service",
"speech_modules": [
("SpeechAndVerbosityManager", "execute"),
("SpeechManager", "direct"),
],
},
]
# Initialize speech provider based on platform