Add Cthulhu Remote plugin

This commit is contained in:
2026-05-20 22:36:17 -04:00
parent 009938c495
commit b055933d6d
13 changed files with 982 additions and 1 deletions
@@ -2,7 +2,7 @@
pkgname=cthulhu-git pkgname=cthulhu-git
_pkgname=cthulhu _pkgname=cthulhu
pkgver=2026.05.06.r394.ga5f7c9a pkgver=2026.05.14.r396.ge2f9a7c
pkgrel=1 pkgrel=1
pkgdesc="Desktop-agnostic screen reader with plugin system, forked from Orca" pkgdesc="Desktop-agnostic screen reader with plugin system, forked from Orca"
url="https://git.stormux.org/storm/cthulhu" url="https://git.stormux.org/storm/cthulhu"
@@ -0,0 +1 @@
"""Cthulhu Remote plugin package."""
@@ -0,0 +1,78 @@
"""Connection metadata and URL parsing for Cthulhu Remote."""
from dataclasses import dataclass
from enum import Enum
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
from .protocol import SERVER_PORT, URL_PREFIX
from .socket_utils import host_port_to_address
class URLParsingError(Exception):
"""Raised when a remote connection URL is incomplete or invalid."""
class ConnectionMode(Enum):
"""Remote session role."""
MASTER = "master"
SLAVE = "slave"
class ConnectionState(Enum):
"""Remote session connection state."""
CONNECTED = "connected"
CONNECTING = "connecting"
DISCONNECTED = "disconnected"
DISCONNECTING = "disconnecting"
@dataclass
class ConnectionInfo:
"""Relay connection settings."""
hostname: str
mode: ConnectionMode
key: str
port: int = SERVER_PORT
insecure: bool = False
def __post_init__(self) -> None:
self.port = self.port or SERVER_PORT
self.mode = ConnectionMode(self.mode)
@classmethod
def from_url(cls, url: str) -> "ConnectionInfo":
parsedUrl = urlparse(url)
parsedQuery = parse_qs(parsedUrl.query)
hostname = parsedUrl.hostname
port = parsedUrl.port or SERVER_PORT
key = parsedQuery.get("key", [""])[0]
mode = parsedQuery.get("mode", [""])[0].lower()
insecure = parsedQuery.get("insecure", ["false"])[0].lower() == "true"
if not hostname:
raise URLParsingError("No hostname provided")
if not key:
raise URLParsingError("No key provided")
if not mode:
raise URLParsingError("No mode provided")
try:
ConnectionMode(mode)
except ValueError as error:
raise URLParsingError(f"Invalid mode provided: {mode!r}") from error
return cls(hostname=hostname, mode=ConnectionMode(mode), key=key, port=port, insecure=insecure)
def get_address(self) -> str:
return host_port_to_address((self.hostname, self.port))
def get_url(self, mode: ConnectionMode | None = None) -> str:
mode = mode or self.mode
query = {"key": self.key, "mode": mode.value}
if self.insecure:
query["insecure"] = "true"
return urlunparse((URL_PREFIX.split("://", 1)[0], self.get_address(), "", "", urlencode(query), ""))
def get_url_to_connect(self) -> str:
mode = ConnectionMode.SLAVE if self.mode == ConnectionMode.MASTER else ConnectionMode.MASTER
return self.get_url(mode)
@@ -0,0 +1,156 @@
"""Linux desktop integration for Cthulhu Remote commands."""
from __future__ import annotations
import logging
from typing import Any
import gi
gi.require_version("Atspi", "2.0")
gi.require_version("Gdk", "3.0")
gi.require_version("Gtk", "3.0")
from gi.repository import Atspi, Gdk, GLib, Gtk
from cthulhu import speech
logger = logging.getLogger(__name__)
WINDOWS_VK_TO_ATSPI_KEYSYM = {
0x08: "BackSpace",
0x09: "Tab",
0x0D: "Return",
0x10: "Shift_L",
0x11: "Control_L",
0x12: "Alt_L",
0x13: "Pause",
0x14: "Caps_Lock",
0x1B: "Escape",
0x20: "space",
0x21: "Page_Up",
0x22: "Page_Down",
0x23: "End",
0x24: "Home",
0x25: "Left",
0x26: "Up",
0x27: "Right",
0x28: "Down",
0x2D: "Insert",
0x2E: "Delete",
0x5B: "Super_L",
0x5C: "Super_R",
0x60: "KP_0",
0x61: "KP_1",
0x62: "KP_2",
0x63: "KP_3",
0x64: "KP_4",
0x65: "KP_5",
0x66: "KP_6",
0x67: "KP_7",
0x68: "KP_8",
0x69: "KP_9",
0x6A: "KP_Multiply",
0x6B: "KP_Add",
0x6D: "KP_Subtract",
0x6E: "KP_Decimal",
0x6F: "KP_Divide",
}
for _digit in range(0x30, 0x3A):
WINDOWS_VK_TO_ATSPI_KEYSYM[_digit] = chr(_digit)
for _letter in range(0x41, 0x5B):
WINDOWS_VK_TO_ATSPI_KEYSYM[_letter] = chr(_letter).lower()
for _functionKey in range(1, 25):
WINDOWS_VK_TO_ATSPI_KEYSYM[0x6F + _functionKey] = f"F{_functionKey}"
class LocalMachine:
"""Apply incoming remote commands to the local Linux desktop."""
def __init__(self, presenter) -> None:
self._presenter = presenter
self.isMuted = False
def speak(self, sequence: Any = None, **kwargs: Any) -> None:
if self.isMuted:
return
text = self._speech_sequence_to_text(sequence)
if text:
speech.speak(text, interrupt=False)
def cancel_speech(self, **kwargs: Any) -> None:
if not self.isMuted:
speech.stop()
def pause_speech(self, switch: bool = False, **kwargs: Any) -> None:
if switch:
speech.stop()
def set_clipboard_text(self, text: str = "", **kwargs: Any) -> None:
clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
clipboard.set_text(text, -1)
clipboard.store()
self._presenter("Remote clipboard received")
def send_key(
self,
vk_code: int | None = None,
keysym: str | None = None,
pressed: bool | None = True,
**kwargs: Any,
) -> None:
keyval = self._resolve_keyval(vk_code=vk_code, keysym=keysym)
if keyval is None:
logger.debug("Ignoring unmapped remote key: vk_code=%r keysym=%r", vk_code, keysym)
return
eventType = Atspi.KeySynthType.PRESS if pressed else Atspi.KeySynthType.RELEASE
GLib.idle_add(self._generate_key_event, keyval, eventType)
def display(self, cells: list[int] | None = None, **kwargs: Any) -> None:
logger.debug("Remote braille display update ignored until braille routing is implemented: %r", cells)
def braille_input(self, **kwargs: Any) -> None:
logger.debug("Remote braille input ignored until braille routing is implemented: %r", kwargs)
def set_braille_display_size(self, **kwargs: Any) -> None:
logger.debug("Remote braille size negotiation ignored until braille routing is implemented: %r", kwargs)
def send_secure_attention_sequence(self, **kwargs: Any) -> None:
self._presenter("Ctrl Alt Delete is not available on Linux desktops")
def _generate_key_event(self, keyval: int, eventType: Atspi.KeySynthType) -> bool:
try:
Atspi.generate_keyboard_event(keyval, None, eventType)
except Exception:
logger.exception("Failed to generate remote key event")
return False
def _resolve_keyval(self, vk_code: int | None, keysym: str | None) -> int | None:
if keysym:
keyval = Gdk.keyval_from_name(keysym)
return keyval if keyval else None
if vk_code is None:
return None
mappedKeysym = WINDOWS_VK_TO_ATSPI_KEYSYM.get(int(vk_code))
if not mappedKeysym:
return None
keyval = Gdk.keyval_from_name(mappedKeysym)
return keyval if keyval else None
def _speech_sequence_to_text(self, sequence: Any) -> str:
if sequence is None:
return ""
if isinstance(sequence, str):
return sequence
if not isinstance(sequence, list):
return str(sequence)
parts = []
for item in sequence:
if isinstance(item, str):
parts.append(item)
elif isinstance(item, list):
nested = self._speech_sequence_to_text(item)
if nested:
parts.append(nested)
return " ".join(parts)
@@ -0,0 +1,20 @@
cthulhu_remote_python_sources = files([
'__init__.py',
'connection_info.py',
'local_machine.py',
'plugin.py',
'protocol.py',
'serializer.py',
'socket_utils.py',
'transport.py'
])
python3.install_sources(
cthulhu_remote_python_sources,
subdir: 'cthulhu/plugins/CthulhuRemote'
)
install_data(
'plugin.info',
install_dir: python3.get_install_dir() / 'cthulhu' / 'plugins' / 'CthulhuRemote'
)
@@ -0,0 +1,7 @@
[Plugin]
Name = Cthulhu Remote
Module = CthulhuRemote
Description = Remote access plugin compatible with NVDA Remote relay servers
Authors = Storm Dragon <storm_dragon@stormux.org>
Version = 0.1
Category = Utilities
+299
View File
@@ -0,0 +1,299 @@
#!/usr/bin/env python3
#
# Copyright (c) 2026 Stormux
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
"""Cthulhu Remote plugin."""
from __future__ import annotations
import logging
import secrets
from typing import Any
import gi
gi.require_version("Gdk", "3.0")
gi.require_version("Gtk", "3.0")
from gi.repository import Gdk, Gtk
from cthulhu import dbus_service
from cthulhu.plugin import Plugin, cthulhu_hookimpl
from cthulhu.plugins.CthulhuRemote.connection_info import (
ConnectionInfo,
ConnectionMode,
ConnectionState,
URLParsingError,
)
from cthulhu.plugins.CthulhuRemote.local_machine import LocalMachine
from cthulhu.plugins.CthulhuRemote.protocol import RemoteMessageType, SERVER_PORT
from cthulhu.plugins.CthulhuRemote.serializer import JSONSerializer
from cthulhu.plugins.CthulhuRemote.socket_utils import address_to_host_port
from cthulhu.plugins.CthulhuRemote.transport import RelayTransport
logger = logging.getLogger(__name__)
class CthulhuRemote(Plugin):
"""Remote access plugin compatible with NVDA Remote relay servers."""
def __init__(self) -> None:
super().__init__()
self._transport: RelayTransport | None = None
self._connectionInfo: ConnectionInfo | None = None
self._connectionState = ConnectionState.DISCONNECTED
self._localMachine = LocalMachine(self._present_message)
self._muted = False
@cthulhu_hookimpl
def activate(self, plugin=None):
if plugin is not None and plugin is not self:
return
self.registerGestureByString(
self.disconnect,
"Disconnect Cthulhu Remote",
"kb:cthulhu+alt+page_down",
)
self.registerGestureByString(
self.toggle_mute,
"Toggle Cthulhu Remote mute",
"kb:cthulhu+alt+delete",
)
self.registerGestureByString(
self.push_clipboard,
"Push clipboard to Cthulhu Remote peers",
"kb:cthulhu+control+shift+c",
)
logger.info("Cthulhu Remote activated")
@cthulhu_hookimpl
def deactivate(self, plugin=None):
if plugin is not None and plugin is not self:
return
self.disconnect(notify_user=False)
logger.info("Cthulhu Remote deactivated")
@dbus_service.parameterized_command
def connect(
self,
host: str,
key: str,
mode: str = "master",
port: int = SERVER_PORT,
insecure: bool = False,
notify_user: bool = True,
) -> bool:
"""Connect to a Cthulhu Remote relay server."""
if not host or not key:
if notify_user:
self._present_message("Cthulhu Remote requires host and key")
return False
try:
connectionInfo = ConnectionInfo(
hostname=host,
port=port,
key=key,
mode=ConnectionMode(mode.lower()),
insecure=insecure,
)
except ValueError:
if notify_user:
self._present_message("Cthulhu Remote mode must be master or slave")
return False
return self._connect(connectionInfo, notify_user=notify_user)
@dbus_service.parameterized_command
def connect_to_address(
self,
address: str,
key: str,
mode: str = "master",
insecure: bool = False,
notify_user: bool = True,
) -> bool:
"""Connect using host[:port] address text."""
host, port = address_to_host_port(address)
return self.connect(host, key, mode=mode, port=port, insecure=insecure, notify_user=notify_user)
@dbus_service.parameterized_command
def connect_to_url(self, url: str, notify_user: bool = True) -> bool:
"""Connect using a cthulhuremote:// or nvdaremote:// URL."""
if url.startswith("nvdaremote://"):
url = "cthulhuremote://" + url.split("://", 1)[1]
try:
connectionInfo = ConnectionInfo.from_url(url)
except URLParsingError as error:
logger.warning("Invalid Cthulhu Remote URL: %s", error)
if notify_user:
self._present_message("Invalid Cthulhu Remote URL")
return False
return self._connect(connectionInfo, notify_user=notify_user)
@dbus_service.command
def disconnect(self, script=None, inputEvent=None, notify_user: bool = True) -> bool:
"""Disconnect the current Cthulhu Remote session."""
if self._transport is not None:
self._connectionState = ConnectionState.DISCONNECTING
self._transport.close()
self._transport = None
self._connectionState = ConnectionState.DISCONNECTED
if notify_user:
self._present_message("Cthulhu Remote disconnected")
return True
@dbus_service.command
def toggle_mute(self, script=None, inputEvent=None) -> bool:
"""Mute or unmute incoming remote output."""
self._muted = not self._muted
self._localMachine.isMuted = self._muted
self._present_message("Cthulhu Remote muted" if self._muted else "Cthulhu Remote unmuted")
return True
@dbus_service.command
def push_clipboard(self, script=None, inputEvent=None) -> bool:
"""Push local clipboard text to connected remote peers."""
if not self._transport or not self._transport.connected:
self._present_message("Cthulhu Remote is not connected")
return True
clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
text = clipboard.wait_for_text()
if not text:
self._present_message("Clipboard does not contain text")
return True
self._transport.send(RemoteMessageType.set_clipboard_text, text=text)
self._present_message("Clipboard pushed")
return True
@dbus_service.command
def copy_invite_url(self, script=None, inputEvent=None) -> bool:
"""Copy a remote invite URL for the opposite connection role."""
if not self._connectionInfo:
self._present_message("Cthulhu Remote is not connected")
return True
clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
clipboard.set_text(self._connectionInfo.get_url_to_connect(), -1)
clipboard.store()
self._present_message("Cthulhu Remote invite copied")
return True
@dbus_service.command
def generate_key(self) -> str:
"""Generate a random remote connection key."""
return secrets.token_urlsafe(18)
@dbus_service.getter
def get_state(self) -> str:
"""Return the Cthulhu Remote connection state."""
return self._connectionState.value
@dbus_service.getter
def get_connection_url(self) -> str:
"""Return the current connection URL, if connected."""
if not self._connectionInfo:
return ""
return self._connectionInfo.get_url()
def _connect(self, connectionInfo: ConnectionInfo, notify_user: bool = True) -> bool:
self.disconnect(notify_user=False)
serializer = JSONSerializer()
transport = RelayTransport.create(connectionInfo, serializer)
self._register_transport_handlers(transport, connectionInfo.mode)
transport.transportConnected.register(self._handle_transport_connected)
transport.transportDisconnected.register(self._handle_transport_disconnected)
transport.transportConnectionFailed.register(self._handle_transport_failed)
transport.transportCertificateAuthenticationFailed.register(self._handle_certificate_failed)
self._transport = transport
self._connectionInfo = connectionInfo
self._connectionState = ConnectionState.CONNECTING
transport.start()
if notify_user:
self._present_message("Cthulhu Remote connecting")
return True
def _register_transport_handlers(self, transport: RelayTransport, mode: ConnectionMode) -> None:
transport.register_inbound(RemoteMessageType.speak, self._localMachine.speak)
transport.register_inbound(RemoteMessageType.cancel, self._localMachine.cancel_speech)
transport.register_inbound(RemoteMessageType.pause_speech, self._localMachine.pause_speech)
transport.register_inbound(RemoteMessageType.set_clipboard_text, self._localMachine.set_clipboard_text)
transport.register_inbound(RemoteMessageType.motd, self._handle_motd)
transport.register_inbound(RemoteMessageType.version_mismatch, self._handle_version_mismatch)
transport.register_inbound(RemoteMessageType.channel_joined, self._handle_channel_joined)
transport.register_inbound(RemoteMessageType.client_joined, self._handle_client_joined)
transport.register_inbound(RemoteMessageType.client_left, self._handle_client_left)
if mode == ConnectionMode.SLAVE:
transport.register_inbound(RemoteMessageType.key, self._localMachine.send_key)
transport.register_inbound(RemoteMessageType.braille_input, self._localMachine.braille_input)
transport.register_inbound(
RemoteMessageType.send_SAS,
self._localMachine.send_secure_attention_sequence,
)
else:
transport.register_inbound(RemoteMessageType.display, self._localMachine.display)
transport.register_inbound(
RemoteMessageType.set_display_size,
self._localMachine.set_braille_display_size,
)
def _handle_transport_connected(self, **kwargs: Any) -> None:
self._connectionState = ConnectionState.CONNECTED
self._present_message("Cthulhu Remote connected")
def _handle_transport_disconnected(self, **kwargs: Any) -> None:
if self._connectionState != ConnectionState.DISCONNECTING:
self._connectionState = ConnectionState.DISCONNECTED
self._present_message("Cthulhu Remote disconnected")
def _handle_transport_failed(self, **kwargs: Any) -> None:
self._connectionState = ConnectionState.DISCONNECTED
self._present_message("Cthulhu Remote connection failed")
def _handle_certificate_failed(self, **kwargs: Any) -> None:
self._connectionState = ConnectionState.DISCONNECTED
fingerprint = self._transport.lastFailFingerprint if self._transport else None
if fingerprint:
logger.warning("Cthulhu Remote certificate fingerprint: %s", fingerprint)
self._present_message("Cthulhu Remote certificate verification failed")
def _handle_motd(self, motd: str = "", **kwargs: Any) -> None:
if motd:
self._present_message(motd)
def _handle_version_mismatch(self, **kwargs: Any) -> None:
self._present_message("Cthulhu Remote relay protocol mismatch")
self.disconnect(notify_user=False)
def _handle_channel_joined(self, **kwargs: Any) -> None:
logger.info("Cthulhu Remote channel joined: %r", kwargs)
def _handle_client_joined(self, client: dict[str, Any] | None = None, **kwargs: Any) -> None:
logger.info("Cthulhu Remote client joined: %r", client or kwargs)
self._present_message("Cthulhu Remote client joined")
def _handle_client_left(self, client: dict[str, Any] | None = None, **kwargs: Any) -> None:
logger.info("Cthulhu Remote client left: %r", client or kwargs)
self._present_message("Cthulhu Remote client left")
def _present_message(self, message: str) -> None:
if not self.app:
return
try:
state = self.app.getDynamicApiManager().getAPI("CthulhuState")
if state and state.activeScript:
state.activeScript.presentMessage(message, resetStyles=False)
except Exception:
logger.exception("Failed to present Cthulhu Remote message")
@@ -0,0 +1,41 @@
"""Protocol constants for Cthulhu Remote.
The wire protocol is intentionally compatible with NVDA Remote protocol
version 2 so relay servers can be shared.
"""
from enum import Enum
PROTOCOL_VERSION = 2
SERVER_PORT = 6837
URL_PREFIX = "cthulhuremote://"
class RemoteMessageType(Enum):
"""Message types used by the remote relay protocol."""
protocol_version = "protocol_version"
join = "join"
channel_joined = "channel_joined"
client_joined = "client_joined"
client_left = "client_left"
generate_key = "generate_key"
key = "key"
speak = "speak"
cancel = "cancel"
pause_speech = "pause_speech"
tone = "tone"
wave = "wave"
send_SAS = "send_SAS"
index = "index"
display = "display"
braille_input = "braille_input"
set_braille_info = "set_braille_info"
set_display_size = "set_display_size"
set_clipboard_text = "set_clipboard_text"
motd = "motd"
version_mismatch = "version_mismatch"
ping = "ping"
error = "error"
nvda_not_connected = "nvda_not_connected"
@@ -0,0 +1,20 @@
"""JSON message serialization for Cthulhu Remote."""
import json
from enum import Enum
from typing import Any
class JSONSerializer:
"""Serialize newline-delimited JSON remote messages."""
separator = b"\n"
def serialize(self, messageType=None, **payload: Any) -> bytes:
if isinstance(messageType, Enum):
messageType = messageType.value
payload["type"] = messageType
return json.dumps(payload).encode("utf-8") + self.separator
def deserialize(self, data: bytes) -> dict[str, Any]:
return json.loads(data.decode("utf-8"))
@@ -0,0 +1,23 @@
"""Socket address helpers for Cthulhu Remote."""
import urllib.parse
from .protocol import SERVER_PORT
def address_to_host_port(address: str) -> tuple[str, int]:
"""Convert host[:port] text to a host, port tuple."""
parsedAddress = urllib.parse.urlparse("//" + address)
return parsedAddress.hostname or "", parsedAddress.port or SERVER_PORT
def host_port_to_address(hostPort: tuple[str, int]) -> str:
"""Convert a host, port tuple to compact host[:port] text."""
host, port = hostPort
if ":" in host:
host = f"[{host}]"
if port != SERVER_PORT:
return f"{host}:{port}"
return host
@@ -0,0 +1,269 @@
"""Threaded TLS transport for Cthulhu Remote relay connections."""
from __future__ import annotations
import hashlib
import logging
import queue
import select
import socket
import ssl
import threading
import time
from collections.abc import Callable
from enum import Enum
from typing import Any
from gi.repository import GLib
from .connection_info import ConnectionInfo
from .protocol import PROTOCOL_VERSION, RemoteMessageType
from .serializer import JSONSerializer
from .socket_utils import host_port_to_address
logger = logging.getLogger(__name__)
class Action:
"""Small callback registrar used by the transport layer."""
def __init__(self) -> None:
self._handlers: list[Callable[..., Any]] = []
self._lock = threading.Lock()
def register(self, handler: Callable[..., Any]) -> None:
with self._lock:
if handler not in self._handlers:
self._handlers.append(handler)
def unregister(self, handler: Callable[..., Any]) -> None:
with self._lock:
if handler in self._handlers:
self._handlers.remove(handler)
def notify(self, **kwargs: Any) -> None:
with self._lock:
handlers = list(self._handlers)
for handler in handlers:
handler(**kwargs)
class RelayTransport:
"""Connect to an NVDA Remote compatible relay server over TLS."""
def __init__(
self,
serializer: JSONSerializer,
address: tuple[str, int],
channel: str,
connectionType: str,
insecure: bool = False,
timeout: int = 0,
protocolVersion: int = PROTOCOL_VERSION,
) -> None:
self.serializer = serializer
self.address = address
self.channel = channel
self.connectionType = connectionType
self.insecure = insecure
self.timeout = timeout
self.protocolVersion = protocolVersion
self.connected = False
self.closed = True
self.successfulConnects = 0
self.lastFailFingerprint: str | None = None
self._buffer = b""
self._socket: ssl.SSLSocket | None = None
self._socketLock = threading.Lock()
self._sendQueue: queue.Queue[bytes | None] = queue.Queue()
self._sendThread: threading.Thread | None = None
self._connectorThread: threading.Thread | None = None
self._running = threading.Event()
self._handlers: dict[RemoteMessageType, Action] = {}
self.transportConnected = Action()
self.transportDisconnected = Action()
self.transportCertificateAuthenticationFailed = Action()
self.transportConnectionFailed = Action()
self.transportClosing = Action()
self.transportConnected.register(self._send_join_messages)
@classmethod
def create(cls, connectionInfo: ConnectionInfo, serializer: JSONSerializer) -> "RelayTransport":
return cls(
serializer=serializer,
address=(connectionInfo.hostname, connectionInfo.port),
channel=connectionInfo.key,
connectionType=connectionInfo.mode.value,
insecure=connectionInfo.insecure,
)
def start(self) -> None:
if self._connectorThread and self._connectorThread.is_alive():
return
self.closed = False
self._running.set()
self._connectorThread = threading.Thread(
target=self._connector_loop,
name="CthulhuRemoteConnector",
daemon=True,
)
self._connectorThread.start()
def close(self) -> None:
self.transportClosing.notify()
self.closed = True
self._running.clear()
self._disconnect()
def register_inbound(self, messageType: RemoteMessageType, handler: Callable[..., Any]) -> None:
self._handlers.setdefault(messageType, Action()).register(handler)
def unregister_inbound(self, messageType: RemoteMessageType, handler: Callable[..., Any]) -> None:
if messageType in self._handlers:
self._handlers[messageType].unregister(handler)
def send(self, messageType: str | Enum, **payload: Any) -> None:
if not self.connected:
logger.debug("Dropping remote message while disconnected: %r", messageType)
return
self._sendQueue.put(self.serializer.serialize(messageType=messageType, **payload))
def _connector_loop(self) -> None:
while self._running.is_set():
try:
self._run_once()
except ssl.SSLCertVerificationError:
logger.warning("Cthulhu Remote certificate verification failed")
except OSError as error:
logger.info("Cthulhu Remote connection failed: %s", error)
except Exception:
logger.exception("Cthulhu Remote connector failed")
if self._running.is_set():
time.sleep(5)
def _run_once(self) -> None:
try:
self._socket = self._create_socket()
self._socket.connect(self.address)
except ssl.SSLCertVerificationError:
self._capture_failed_fingerprint()
self.transportCertificateAuthenticationFailed.notify()
raise
except Exception:
self.transportConnectionFailed.notify()
raise
self.connected = True
self.successfulConnects += 1
self.transportConnected.notify()
self._sendThread = threading.Thread(target=self._send_loop, name="CthulhuRemoteSend", daemon=True)
self._sendThread.start()
try:
while self._running.is_set() and self._socket is not None:
readers, _, errors = select.select([self._socket], [], [self._socket], 1.0)
if errors:
break
if readers:
self._read_available()
finally:
self.connected = False
self.transportDisconnected.notify()
self._disconnect()
def _create_socket(self, insecure: bool | None = None) -> ssl.SSLSocket:
insecure = self.insecure if insecure is None else insecure
host, port = self.address
family, socktype, proto, _, _ = socket.getaddrinfo(host, port)[0]
plainSocket = socket.socket(family, socktype, proto)
if self.timeout:
plainSocket.settimeout(self.timeout)
plainSocket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
plainSocket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
context = ssl._create_unverified_context() if insecure else ssl.create_default_context()
return context.wrap_socket(plainSocket, server_hostname=host)
def _capture_failed_fingerprint(self) -> None:
try:
socketForCert = self._create_socket(insecure=True)
socketForCert.connect(self.address)
certificate = socketForCert.getpeercert(binary_form=True)
socketForCert.close()
if certificate:
self.lastFailFingerprint = hashlib.sha256(certificate).hexdigest().lower()
except Exception:
self.lastFailFingerprint = None
def _send_join_messages(self) -> None:
self.send(RemoteMessageType.protocol_version, version=self.protocolVersion)
self.send(RemoteMessageType.join, channel=self.channel, connection_type=self.connectionType)
def _read_available(self) -> None:
if self._socket is None:
return
with self._socketLock:
self._socket.setblocking(False)
try:
data = self._buffer + self._socket.recv(16384)
except ssl.SSLWantReadError:
return
finally:
self._socket.setblocking(True)
self._buffer = b""
if not data:
self._disconnect()
return
while b"\n" in data:
line, _, data = data.partition(b"\n")
self._parse_line(line)
self._buffer = data
def _parse_line(self, line: bytes) -> None:
try:
message = self.serializer.deserialize(line)
messageType = RemoteMessageType(message.pop("type"))
except Exception:
logger.exception("Ignoring invalid remote message: %r", line)
return
action = self._handlers.get(messageType)
if action is None:
logger.debug("No handler registered for remote message type: %s", messageType.value)
return
GLib.idle_add(self._notify_action, action, message)
def _notify_action(self, action: Action, payload: dict[str, Any]) -> bool:
action.notify(**payload)
return False
def _send_loop(self) -> None:
while True:
item = self._sendQueue.get()
if item is None:
return
try:
with self._socketLock:
if self._socket is not None:
self._socket.sendall(item)
except OSError:
return
def _disconnect(self) -> None:
if self._sendThread is not None:
self._sendQueue.put(None)
self._sendThread.join(timeout=2)
self._sendThread = None
self._clear_send_queue()
if self._socket is not None:
try:
self._socket.close()
except OSError:
pass
self._socket = None
self._buffer = b""
def _clear_send_queue(self) -> None:
try:
while True:
self._sendQueue.get_nowait()
except queue.Empty:
pass
+1
View File
@@ -2,6 +2,7 @@
subdir('AIAssistant') subdir('AIAssistant')
subdir('ByeCthulhu') subdir('ByeCthulhu')
subdir('Clipboard') subdir('Clipboard')
subdir('CthulhuRemote')
subdir('DisplayVersion') subdir('DisplayVersion')
subdir('HelloCthulhu') subdir('HelloCthulhu')
subdir('GameMode') subdir('GameMode')
+66
View File
@@ -0,0 +1,66 @@
import sys
import types
import unittest
from pathlib import Path
from unittest import mock
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
input_event_manager_stub = types.ModuleType("cthulhu.input_event_manager")
input_event_manager_stub.get_manager = mock.Mock(return_value=mock.Mock())
sys.modules["cthulhu.input_event_manager"] = input_event_manager_stub
from cthulhu.plugin_system_manager import PluginSystemManager
from cthulhu.plugins.CthulhuRemote.connection_info import ConnectionInfo, ConnectionMode
from cthulhu.plugins.CthulhuRemote.local_machine import LocalMachine
from cthulhu.plugins.CthulhuRemote.protocol import RemoteMessageType
from cthulhu.plugins.CthulhuRemote.serializer import JSONSerializer
class CthulhuRemotePluginTests(unittest.TestCase):
def test_connection_info_accepts_nvda_remote_style_fields(self):
info = ConnectionInfo.from_url("cthulhuremote://example.com:1234?key=abc&mode=slave")
self.assertEqual(info.hostname, "example.com")
self.assertEqual(info.port, 1234)
self.assertEqual(info.key, "abc")
self.assertEqual(info.mode, ConnectionMode.SLAVE)
self.assertEqual(
info.get_url_to_connect(),
"cthulhuremote://example.com:1234?key=abc&mode=master",
)
def test_json_serializer_uses_remote_message_type_values(self):
serializer = JSONSerializer()
payload = serializer.deserialize(serializer.serialize(RemoteMessageType.join, channel="abc"))
self.assertEqual(payload["type"], "join")
self.assertEqual(payload["channel"], "abc")
def test_local_machine_maps_common_windows_vk_codes_to_keyvals(self):
machine = LocalMachine(lambda message: None)
self.assertEqual(machine._resolve_keyval(0x41, None), machine._resolve_keyval(None, "a"))
self.assertEqual(machine._resolve_keyval(0x70, None), machine._resolve_keyval(None, "F1"))
self.assertIsNone(machine._resolve_keyval(0xFF, None))
@mock.patch("cthulhu.plugin_system_manager.dbus_service.get_remote_controller")
def test_plugin_manager_can_load_cthulhu_remote(self, remote_controller):
remote_controller.return_value = mock.Mock()
app = mock.Mock()
app.getSignalManager.return_value = mock.Mock()
app.getAPIHelper.return_value = None
manager = PluginSystemManager(app)
manager.rescanPlugins()
plugin_info = manager._resolve_plugin_info("CthulhuRemote")
self.assertIsNotNone(plugin_info)
self.assertTrue(manager.loadPlugin(plugin_info))
self.assertEqual(plugin_info.instance.__class__.__name__, "CthulhuRemote")
self.assertTrue(manager.unloadPlugin(plugin_info))
if __name__ == "__main__":
unittest.main()