Compare commits
9 Commits
2026.05.14
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| 30e1667f77 | |||
| edc195c46b | |||
| cb553d3031 | |||
| b055933d6d | |||
| 009938c495 | |||
| a84aec94a9 | |||
| 4145e9375b | |||
| 0acba6a733 | |||
| b224d699c0 |
@@ -2,7 +2,7 @@
|
||||
|
||||
pkgname=cthulhu-git
|
||||
_pkgname=cthulhu
|
||||
pkgver=2026.05.06.r394.ga5f7c9a
|
||||
pkgver=2026.05.14.r396.ge2f9a7c
|
||||
pkgrel=1
|
||||
pkgdesc="Desktop-agnostic screen reader with plugin system, forked from Orca"
|
||||
url="https://git.stormux.org/storm/cthulhu"
|
||||
|
||||
@@ -148,14 +148,42 @@ class AXObject:
|
||||
if not toolkit_name.startswith("qt"):
|
||||
return False
|
||||
|
||||
if not AXObject._can_reach_application(obj):
|
||||
tokens = ["AXObject:", obj, "has broken ancestry. See qt bug 130116."]
|
||||
debug.print_tokens(debug.LEVEL_INFO, tokens, True)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _can_reach_application(obj: Atspi.Accessible) -> bool:
|
||||
"""Returns True if obj's ancestry reaches the application."""
|
||||
|
||||
reached_app = False
|
||||
parent = AXObject.get_parent(obj)
|
||||
while parent and not reached_app:
|
||||
reached_app = AXObject.get_role(parent) == Atspi.Role.APPLICATION
|
||||
parent = AXObject.get_parent(parent)
|
||||
|
||||
if not reached_app:
|
||||
tokens = ["AXObject:", obj, "has broken ancestry. See qt bug 130116."]
|
||||
return reached_app
|
||||
|
||||
@staticmethod
|
||||
def has_broken_popup_ancestry(obj: Atspi.Accessible) -> bool:
|
||||
"""Returns True if obj is a popup item whose ancestry is broken."""
|
||||
|
||||
if obj is None or AXObject.is_dead(obj):
|
||||
return False
|
||||
|
||||
# Chromium Omnibox popups can lose their path back to the frame after
|
||||
# the popup is closed and reopened.
|
||||
if not AXObject.get_toolkit_name(obj).startswith("chromium"):
|
||||
return False
|
||||
|
||||
if AXObject.get_role(obj) != Atspi.Role.LIST_ITEM:
|
||||
return False
|
||||
|
||||
if not AXObject._can_reach_application(obj):
|
||||
tokens = ["AXObject:", obj, "has broken popup ancestry."]
|
||||
debug.print_tokens(debug.LEVEL_INFO, tokens, True)
|
||||
return True
|
||||
|
||||
|
||||
@@ -348,6 +348,13 @@ class FocusManager:
|
||||
tokens.extend(["in", app])
|
||||
_log_tokens(tokens)
|
||||
|
||||
if frame is None and AXObject.has_broken_popup_ancestry(self._focus):
|
||||
_log_tokens(
|
||||
["Not clearing active window; focus", self._focus, "is in popup with broken ancestry"],
|
||||
"broken-popup-ancestry",
|
||||
)
|
||||
return
|
||||
|
||||
if frame == self._window:
|
||||
_log("Setting active window to existing active window", "no-change")
|
||||
elif frame is None:
|
||||
|
||||
@@ -0,0 +1,206 @@
|
||||
# Cthulhu Remote
|
||||
|
||||
Cthulhu Remote is an NVDA Remote-style assistive technology relay plugin. It is
|
||||
intended for remote help between screen reader users: one user can control
|
||||
another user's desktop through keyboard input and receive the remote screen
|
||||
reader's speech feedback.
|
||||
|
||||
This is not graphical screen sharing. It does not forward a framebuffer, window
|
||||
image, VNC/RDP session, or screenshots. The useful channel is screen reader
|
||||
output and remote input.
|
||||
|
||||
## Current Status
|
||||
|
||||
### Working
|
||||
|
||||
- Plugin installs and loads as `CthulhuRemote`.
|
||||
- The plugin is registered as a D-Bus module when loaded.
|
||||
- The transport connects outbound to NVDA Remote-compatible relay servers over
|
||||
TLS.
|
||||
- The default relay port is `6837`.
|
||||
- Protocol version `2` is used.
|
||||
- `cthulhuremote://` URLs are parsed.
|
||||
- `nvdaremote://` URLs are accepted and rewritten to `cthulhuremote://`.
|
||||
- `master` and `slave` connection modes are represented.
|
||||
- Random connection keys can be generated.
|
||||
- Invite URLs can be copied for the opposite role.
|
||||
- The local clipboard can be pushed to connected peers.
|
||||
- Incoming remote speech is spoken through Cthulhu.
|
||||
- Incoming remote speech is suppressed from speech monitor callbacks to avoid
|
||||
echo loops.
|
||||
- In `slave` mode, local Cthulhu speech is forwarded to the relay as `speak`
|
||||
messages.
|
||||
- In `slave` mode, incoming remote key messages are mapped from common Windows
|
||||
virtual-key codes or keysyms to AT-SPI key events.
|
||||
- Disconnect and mute gestures are registered:
|
||||
- `cthulhu+alt+page_down`: disconnect
|
||||
- `cthulhu+alt+delete`: mute or unmute incoming remote output
|
||||
- `cthulhu+control+shift+c`: push clipboard text
|
||||
|
||||
### Partially Implemented
|
||||
|
||||
- NVDA Remote compatibility is intentional at the relay-message level, but
|
||||
cross-client interoperability with NVDA has not been verified.
|
||||
- Remote key injection exists on the controlled/slave side, but master-side key
|
||||
capture and forwarding is not implemented yet.
|
||||
- Braille message types exist, but braille routing is not implemented.
|
||||
- TLS certificate verification is enabled by default, and an insecure mode
|
||||
exists, but there is no trust-on-first-use certificate workflow.
|
||||
- Connection state is exposed, but there is no complete user-facing connection
|
||||
dialog.
|
||||
|
||||
### Not Implemented
|
||||
|
||||
- Graphical screen sharing.
|
||||
- Screenshot forwarding.
|
||||
- VNC/RDP integration.
|
||||
- Master-side keyboard forwarding.
|
||||
- Remote braille display output.
|
||||
- Remote braille input routing.
|
||||
- Tone, wave, and other non-speech remote audio events.
|
||||
- Ping/keepalive handling.
|
||||
- Relay error presentation beyond basic logging/messages.
|
||||
- Preferences UI for host, port, key, mode, certificate trust, and invite
|
||||
management.
|
||||
|
||||
## Intended User Model
|
||||
|
||||
The typical assistive remote-support flow is:
|
||||
|
||||
1. The person needing help runs Cthulhu and connects as `slave`.
|
||||
2. The helper runs Cthulhu and connects as `master`.
|
||||
3. The helper sends keyboard commands through the relay.
|
||||
4. The controlled desktop receives those keys locally.
|
||||
5. The controlled machine's Cthulhu speech is sent back to the helper.
|
||||
|
||||
Examples this is meant to support:
|
||||
|
||||
- Teaching someone how to use a program.
|
||||
- Helping with a stuck dialog or inaccessible workflow.
|
||||
- Typing into the remote user's editor or terminal.
|
||||
- Navigating menus and controls with speech feedback.
|
||||
|
||||
Things it does not solve by itself:
|
||||
|
||||
- Visual captchas that require seeing an image.
|
||||
- Visual inspection of a remote screen.
|
||||
- Mouse-driven visual troubleshooting without another channel.
|
||||
|
||||
## Implementation Checklist
|
||||
|
||||
### Core Protocol
|
||||
|
||||
- [x] Define protocol constants and message types.
|
||||
- [x] Serialize and parse newline-delimited JSON messages.
|
||||
- [x] Connect to relay servers over TLS.
|
||||
- [x] Send `protocol_version` and `join` messages after connecting.
|
||||
- [x] Dispatch inbound messages on the GLib main loop.
|
||||
- [ ] Add ping/keepalive handling.
|
||||
- [ ] Present relay `error` and `nvda_not_connected` messages clearly.
|
||||
- [ ] Verify exact payload compatibility with current NVDA Remote clients.
|
||||
|
||||
### Connection Management
|
||||
|
||||
- [x] Parse `cthulhuremote://` URLs.
|
||||
- [x] Accept `nvdaremote://` URLs.
|
||||
- [x] Support host, port, key, mode, and insecure TLS fields.
|
||||
- [x] Track connection states.
|
||||
- [x] Expose D-Bus commands for connect, disconnect, state, key generation, and
|
||||
invite URL copying.
|
||||
- [ ] Add an accessible GTK connection dialog.
|
||||
- [ ] Add saved/recent relay configuration if desired.
|
||||
- [ ] Add trust-on-first-use certificate handling or a clear certificate trust
|
||||
workflow.
|
||||
- [ ] Add better reconnect/backoff status reporting.
|
||||
|
||||
### Speech
|
||||
|
||||
- [x] Speak incoming remote `speak` messages locally.
|
||||
- [x] Support muting incoming remote speech.
|
||||
- [x] Prevent inbound remote speech from being echoed back to the relay.
|
||||
- [x] Forward local speech to the relay in `slave` mode.
|
||||
- [ ] Decide whether master mode should ever forward local speech.
|
||||
- [ ] Preserve richer speech sequence details if needed instead of flattening to
|
||||
plain text.
|
||||
- [ ] Verify behavior with speech interruption and cancellation across two live
|
||||
clients.
|
||||
|
||||
### Keyboard Control
|
||||
|
||||
- [x] Receive remote key messages in `slave` mode.
|
||||
- [x] Map common Windows virtual-key codes to Linux keysyms.
|
||||
- [x] Inject remote key presses/releases with AT-SPI.
|
||||
- [ ] Capture local keyboard events in `master` mode.
|
||||
- [ ] Forward master key events as remote `key` messages.
|
||||
- [ ] Prevent forwarded keys from also acting on the master's local desktop,
|
||||
unless intentionally passed through.
|
||||
- [ ] Preserve modifier press/release ordering.
|
||||
- [ ] Verify Xorg behavior.
|
||||
- [ ] Verify Wayland behavior without weakening Xorg support.
|
||||
- [ ] Add tests for key payload generation and modifier handling.
|
||||
|
||||
### Clipboard
|
||||
|
||||
- [x] Push local clipboard text to connected peers.
|
||||
- [x] Receive remote clipboard text into the local clipboard.
|
||||
- [ ] Add a command or dialog control for pull/request clipboard if protocol
|
||||
support is available.
|
||||
- [ ] Decide how to handle large clipboard payloads.
|
||||
- [ ] Add tests for empty, plain text, and multiline clipboard text.
|
||||
|
||||
### Braille
|
||||
|
||||
- [ ] Route incoming remote `display` messages to Cthulhu braille output.
|
||||
- [ ] Implement `set_display_size`.
|
||||
- [ ] Implement `set_braille_info` if required for compatibility.
|
||||
- [ ] Route local braille input as remote `braille_input` messages when acting
|
||||
as master.
|
||||
- [ ] Verify routing keys and cursor-routing behavior.
|
||||
- [ ] Add tests around braille payload parsing and ignored/unsupported fields.
|
||||
|
||||
### Audio And Miscellaneous Messages
|
||||
|
||||
- [ ] Implement or intentionally ignore `tone`.
|
||||
- [ ] Implement or intentionally ignore `wave`.
|
||||
- [ ] Decide whether `index` is relevant to Cthulhu speech.
|
||||
- [ ] Handle `send_SAS` with a clear Linux-specific message.
|
||||
- [ ] Log unsupported message types at debug level without spamming users.
|
||||
|
||||
### User Interface
|
||||
|
||||
- [x] Register basic gestures for disconnect, mute, and clipboard push.
|
||||
- [ ] Add connect/disconnect controls to plugin preferences.
|
||||
- [ ] Add host, port, key, mode, and insecure TLS fields.
|
||||
- [ ] Add generate-key and copy-invite controls.
|
||||
- [ ] Add connection status text suitable for screen reader users.
|
||||
- [ ] Ensure Tab and Shift+Tab navigate the entire dialog.
|
||||
- [ ] Associate GTK labels with their controls.
|
||||
|
||||
### Tests And Verification
|
||||
|
||||
- [x] Test URL parsing.
|
||||
- [x] Test serializer message type values.
|
||||
- [x] Test common key mapping.
|
||||
- [x] Test plugin loading through the plugin manager.
|
||||
- [x] Test additive speech monitor callbacks.
|
||||
- [x] Test remote speech echo suppression.
|
||||
- [x] Test slave-mode local speech forwarding.
|
||||
- [x] Test that master mode does not forward local speech.
|
||||
- [ ] Add transport tests with a fake relay socket.
|
||||
- [ ] Add connection-state transition tests.
|
||||
- [ ] Add D-Bus introspection or command exposure tests for the plugin module.
|
||||
- [ ] Test against a live NVDA Remote-compatible relay.
|
||||
- [ ] Test Cthulhu-to-Cthulhu master/slave operation.
|
||||
- [ ] Test NVDA master to Cthulhu slave.
|
||||
- [ ] Test Cthulhu master to NVDA slave after master key forwarding exists.
|
||||
|
||||
## Suggested Next Steps
|
||||
|
||||
1. Add an accessible connection dialog so the plugin can be used without manual
|
||||
D-Bus calls.
|
||||
2. Implement ping/error handling to improve relay behavior and diagnostics.
|
||||
3. Design master-side key forwarding carefully around Cthulhu's existing input
|
||||
event manager.
|
||||
4. Add fake-relay tests before broad live testing.
|
||||
5. Perform live two-client testing and record exact compatibility gaps.
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
"""Cthulhu Remote plugin package."""
|
||||
@@ -0,0 +1,78 @@
|
||||
"""Connection metadata and URL parsing for Cthulhu Remote."""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
|
||||
|
||||
from .protocol import SERVER_PORT, URL_PREFIX
|
||||
from .socket_utils import host_port_to_address
|
||||
|
||||
|
||||
class URLParsingError(Exception):
|
||||
"""Raised when a remote connection URL is incomplete or invalid."""
|
||||
|
||||
|
||||
class ConnectionMode(Enum):
|
||||
"""Remote session role."""
|
||||
|
||||
MASTER = "master"
|
||||
SLAVE = "slave"
|
||||
|
||||
|
||||
class ConnectionState(Enum):
|
||||
"""Remote session connection state."""
|
||||
|
||||
CONNECTED = "connected"
|
||||
CONNECTING = "connecting"
|
||||
DISCONNECTED = "disconnected"
|
||||
DISCONNECTING = "disconnecting"
|
||||
|
||||
|
||||
@dataclass
|
||||
class ConnectionInfo:
|
||||
"""Relay connection settings."""
|
||||
|
||||
hostname: str
|
||||
mode: ConnectionMode
|
||||
key: str
|
||||
port: int = SERVER_PORT
|
||||
insecure: bool = False
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
self.port = self.port or SERVER_PORT
|
||||
self.mode = ConnectionMode(self.mode)
|
||||
|
||||
@classmethod
|
||||
def from_url(cls, url: str) -> "ConnectionInfo":
|
||||
parsedUrl = urlparse(url)
|
||||
parsedQuery = parse_qs(parsedUrl.query)
|
||||
hostname = parsedUrl.hostname
|
||||
port = parsedUrl.port or SERVER_PORT
|
||||
key = parsedQuery.get("key", [""])[0]
|
||||
mode = parsedQuery.get("mode", [""])[0].lower()
|
||||
insecure = parsedQuery.get("insecure", ["false"])[0].lower() == "true"
|
||||
if not hostname:
|
||||
raise URLParsingError("No hostname provided")
|
||||
if not key:
|
||||
raise URLParsingError("No key provided")
|
||||
if not mode:
|
||||
raise URLParsingError("No mode provided")
|
||||
try:
|
||||
ConnectionMode(mode)
|
||||
except ValueError as error:
|
||||
raise URLParsingError(f"Invalid mode provided: {mode!r}") from error
|
||||
return cls(hostname=hostname, mode=ConnectionMode(mode), key=key, port=port, insecure=insecure)
|
||||
|
||||
def get_address(self) -> str:
|
||||
return host_port_to_address((self.hostname, self.port))
|
||||
|
||||
def get_url(self, mode: ConnectionMode | None = None) -> str:
|
||||
mode = mode or self.mode
|
||||
query = {"key": self.key, "mode": mode.value}
|
||||
if self.insecure:
|
||||
query["insecure"] = "true"
|
||||
return urlunparse((URL_PREFIX.split("://", 1)[0], self.get_address(), "", "", urlencode(query), ""))
|
||||
|
||||
def get_url_to_connect(self) -> str:
|
||||
mode = ConnectionMode.SLAVE if self.mode == ConnectionMode.MASTER else ConnectionMode.MASTER
|
||||
return self.get_url(mode)
|
||||
@@ -0,0 +1,157 @@
|
||||
"""Linux desktop integration for Cthulhu Remote commands."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
import gi
|
||||
|
||||
gi.require_version("Atspi", "2.0")
|
||||
gi.require_version("Gdk", "3.0")
|
||||
gi.require_version("Gtk", "3.0")
|
||||
from gi.repository import Atspi, Gdk, GLib, Gtk
|
||||
|
||||
from cthulhu import speech
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
WINDOWS_VK_TO_ATSPI_KEYSYM = {
|
||||
0x08: "BackSpace",
|
||||
0x09: "Tab",
|
||||
0x0D: "Return",
|
||||
0x10: "Shift_L",
|
||||
0x11: "Control_L",
|
||||
0x12: "Alt_L",
|
||||
0x13: "Pause",
|
||||
0x14: "Caps_Lock",
|
||||
0x1B: "Escape",
|
||||
0x20: "space",
|
||||
0x21: "Page_Up",
|
||||
0x22: "Page_Down",
|
||||
0x23: "End",
|
||||
0x24: "Home",
|
||||
0x25: "Left",
|
||||
0x26: "Up",
|
||||
0x27: "Right",
|
||||
0x28: "Down",
|
||||
0x2D: "Insert",
|
||||
0x2E: "Delete",
|
||||
0x5B: "Super_L",
|
||||
0x5C: "Super_R",
|
||||
0x60: "KP_0",
|
||||
0x61: "KP_1",
|
||||
0x62: "KP_2",
|
||||
0x63: "KP_3",
|
||||
0x64: "KP_4",
|
||||
0x65: "KP_5",
|
||||
0x66: "KP_6",
|
||||
0x67: "KP_7",
|
||||
0x68: "KP_8",
|
||||
0x69: "KP_9",
|
||||
0x6A: "KP_Multiply",
|
||||
0x6B: "KP_Add",
|
||||
0x6D: "KP_Subtract",
|
||||
0x6E: "KP_Decimal",
|
||||
0x6F: "KP_Divide",
|
||||
}
|
||||
|
||||
for _digit in range(0x30, 0x3A):
|
||||
WINDOWS_VK_TO_ATSPI_KEYSYM[_digit] = chr(_digit)
|
||||
for _letter in range(0x41, 0x5B):
|
||||
WINDOWS_VK_TO_ATSPI_KEYSYM[_letter] = chr(_letter).lower()
|
||||
for _functionKey in range(1, 25):
|
||||
WINDOWS_VK_TO_ATSPI_KEYSYM[0x6F + _functionKey] = f"F{_functionKey}"
|
||||
|
||||
|
||||
class LocalMachine:
|
||||
"""Apply incoming remote commands to the local Linux desktop."""
|
||||
|
||||
def __init__(self, presenter) -> None:
|
||||
self._presenter = presenter
|
||||
self.isMuted = False
|
||||
|
||||
def speak(self, sequence: Any = None, **kwargs: Any) -> None:
|
||||
if self.isMuted:
|
||||
return
|
||||
text = self._speech_sequence_to_text(sequence)
|
||||
if text:
|
||||
with speech.suppress_monitor_callbacks():
|
||||
speech.speak(text, interrupt=False)
|
||||
|
||||
def cancel_speech(self, **kwargs: Any) -> None:
|
||||
if not self.isMuted:
|
||||
speech.stop()
|
||||
|
||||
def pause_speech(self, switch: bool = False, **kwargs: Any) -> None:
|
||||
if switch:
|
||||
speech.stop()
|
||||
|
||||
def set_clipboard_text(self, text: str = "", **kwargs: Any) -> None:
|
||||
clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
|
||||
clipboard.set_text(text, -1)
|
||||
clipboard.store()
|
||||
self._presenter("Remote clipboard received")
|
||||
|
||||
def send_key(
|
||||
self,
|
||||
vk_code: int | None = None,
|
||||
keysym: str | None = None,
|
||||
pressed: bool | None = True,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
keyval = self._resolve_keyval(vk_code=vk_code, keysym=keysym)
|
||||
if keyval is None:
|
||||
logger.debug("Ignoring unmapped remote key: vk_code=%r keysym=%r", vk_code, keysym)
|
||||
return
|
||||
eventType = Atspi.KeySynthType.PRESS if pressed else Atspi.KeySynthType.RELEASE
|
||||
GLib.idle_add(self._generate_key_event, keyval, eventType)
|
||||
|
||||
def display(self, cells: list[int] | None = None, **kwargs: Any) -> None:
|
||||
logger.debug("Remote braille display update ignored until braille routing is implemented: %r", cells)
|
||||
|
||||
def braille_input(self, **kwargs: Any) -> None:
|
||||
logger.debug("Remote braille input ignored until braille routing is implemented: %r", kwargs)
|
||||
|
||||
def set_braille_display_size(self, **kwargs: Any) -> None:
|
||||
logger.debug("Remote braille size negotiation ignored until braille routing is implemented: %r", kwargs)
|
||||
|
||||
def send_secure_attention_sequence(self, **kwargs: Any) -> None:
|
||||
self._presenter("Ctrl Alt Delete is not available on Linux desktops")
|
||||
|
||||
def _generate_key_event(self, keyval: int, eventType: Atspi.KeySynthType) -> bool:
|
||||
try:
|
||||
Atspi.generate_keyboard_event(keyval, None, eventType)
|
||||
except Exception:
|
||||
logger.exception("Failed to generate remote key event")
|
||||
return False
|
||||
|
||||
def _resolve_keyval(self, vk_code: int | None, keysym: str | None) -> int | None:
|
||||
if keysym:
|
||||
keyval = Gdk.keyval_from_name(keysym)
|
||||
return keyval if keyval else None
|
||||
if vk_code is None:
|
||||
return None
|
||||
mappedKeysym = WINDOWS_VK_TO_ATSPI_KEYSYM.get(int(vk_code))
|
||||
if not mappedKeysym:
|
||||
return None
|
||||
keyval = Gdk.keyval_from_name(mappedKeysym)
|
||||
return keyval if keyval else None
|
||||
|
||||
def _speech_sequence_to_text(self, sequence: Any) -> str:
|
||||
if sequence is None:
|
||||
return ""
|
||||
if isinstance(sequence, str):
|
||||
return sequence
|
||||
if not isinstance(sequence, list):
|
||||
return str(sequence)
|
||||
parts = []
|
||||
for item in sequence:
|
||||
if isinstance(item, str):
|
||||
parts.append(item)
|
||||
elif isinstance(item, list):
|
||||
nested = self._speech_sequence_to_text(item)
|
||||
if nested:
|
||||
parts.append(nested)
|
||||
return " ".join(parts)
|
||||
@@ -0,0 +1,20 @@
|
||||
cthulhu_remote_python_sources = files([
|
||||
'__init__.py',
|
||||
'connection_info.py',
|
||||
'local_machine.py',
|
||||
'plugin.py',
|
||||
'protocol.py',
|
||||
'serializer.py',
|
||||
'socket_utils.py',
|
||||
'transport.py'
|
||||
])
|
||||
|
||||
python3.install_sources(
|
||||
cthulhu_remote_python_sources,
|
||||
subdir: 'cthulhu/plugins/CthulhuRemote'
|
||||
)
|
||||
|
||||
install_data(
|
||||
'plugin.info',
|
||||
install_dir: python3.get_install_dir() / 'cthulhu' / 'plugins' / 'CthulhuRemote'
|
||||
)
|
||||
@@ -0,0 +1,7 @@
|
||||
[Plugin]
|
||||
Name = Cthulhu Remote
|
||||
Module = CthulhuRemote
|
||||
Description = Remote access plugin compatible with NVDA Remote relay servers
|
||||
Authors = Storm Dragon <storm_dragon@stormux.org>
|
||||
Version = 0.1
|
||||
Category = Utilities
|
||||
@@ -0,0 +1,327 @@
|
||||
#!/usr/bin/env python3
|
||||
#
|
||||
# Copyright (c) 2026 Stormux
|
||||
#
|
||||
# This library is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU Lesser General Public
|
||||
# License as published by the Free Software Foundation; either
|
||||
# version 2.1 of the License, or (at your option) any later version.
|
||||
|
||||
"""Cthulhu Remote plugin."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import secrets
|
||||
from typing import Any
|
||||
|
||||
import gi
|
||||
|
||||
gi.require_version("Gdk", "3.0")
|
||||
gi.require_version("Gtk", "3.0")
|
||||
from gi.repository import Gdk, Gtk
|
||||
|
||||
from cthulhu import dbus_service
|
||||
from cthulhu import speech
|
||||
from cthulhu.plugin import Plugin, cthulhu_hookimpl
|
||||
|
||||
from cthulhu.plugins.CthulhuRemote.connection_info import (
|
||||
ConnectionInfo,
|
||||
ConnectionMode,
|
||||
ConnectionState,
|
||||
URLParsingError,
|
||||
)
|
||||
from cthulhu.plugins.CthulhuRemote.local_machine import LocalMachine
|
||||
from cthulhu.plugins.CthulhuRemote.protocol import RemoteMessageType, SERVER_PORT
|
||||
from cthulhu.plugins.CthulhuRemote.serializer import JSONSerializer
|
||||
from cthulhu.plugins.CthulhuRemote.socket_utils import address_to_host_port
|
||||
from cthulhu.plugins.CthulhuRemote.transport import RelayTransport
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CthulhuRemote(Plugin):
|
||||
"""Remote access plugin compatible with NVDA Remote relay servers."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self._transport: RelayTransport | None = None
|
||||
self._connectionInfo: ConnectionInfo | None = None
|
||||
self._connectionState = ConnectionState.DISCONNECTED
|
||||
self._localMachine = LocalMachine(self._present_message)
|
||||
self._muted = False
|
||||
self._speechMonitorRegistered = False
|
||||
|
||||
@cthulhu_hookimpl
|
||||
def activate(self, plugin=None):
|
||||
if plugin is not None and plugin is not self:
|
||||
return
|
||||
self.registerGestureByString(
|
||||
self.disconnect,
|
||||
"Disconnect Cthulhu Remote",
|
||||
"kb:cthulhu+alt+page_down",
|
||||
)
|
||||
self.registerGestureByString(
|
||||
self.toggle_mute,
|
||||
"Toggle Cthulhu Remote mute",
|
||||
"kb:cthulhu+alt+delete",
|
||||
)
|
||||
self.registerGestureByString(
|
||||
self.push_clipboard,
|
||||
"Push clipboard to Cthulhu Remote peers",
|
||||
"kb:cthulhu+control+shift+c",
|
||||
)
|
||||
logger.info("Cthulhu Remote activated")
|
||||
|
||||
@cthulhu_hookimpl
|
||||
def deactivate(self, plugin=None):
|
||||
if plugin is not None and plugin is not self:
|
||||
return
|
||||
self.disconnect(notify_user=False)
|
||||
logger.info("Cthulhu Remote deactivated")
|
||||
|
||||
@dbus_service.parameterized_command
|
||||
def connect(
|
||||
self,
|
||||
host: str,
|
||||
key: str,
|
||||
mode: str = "master",
|
||||
port: int = SERVER_PORT,
|
||||
insecure: bool = False,
|
||||
notify_user: bool = True,
|
||||
) -> bool:
|
||||
"""Connect to a Cthulhu Remote relay server."""
|
||||
|
||||
if not host or not key:
|
||||
if notify_user:
|
||||
self._present_message("Cthulhu Remote requires host and key")
|
||||
return False
|
||||
try:
|
||||
connectionInfo = ConnectionInfo(
|
||||
hostname=host,
|
||||
port=port,
|
||||
key=key,
|
||||
mode=ConnectionMode(mode.lower()),
|
||||
insecure=insecure,
|
||||
)
|
||||
except ValueError:
|
||||
if notify_user:
|
||||
self._present_message("Cthulhu Remote mode must be master or slave")
|
||||
return False
|
||||
return self._connect(connectionInfo, notify_user=notify_user)
|
||||
|
||||
@dbus_service.parameterized_command
|
||||
def connect_to_address(
|
||||
self,
|
||||
address: str,
|
||||
key: str,
|
||||
mode: str = "master",
|
||||
insecure: bool = False,
|
||||
notify_user: bool = True,
|
||||
) -> bool:
|
||||
"""Connect using host[:port] address text."""
|
||||
|
||||
host, port = address_to_host_port(address)
|
||||
return self.connect(host, key, mode=mode, port=port, insecure=insecure, notify_user=notify_user)
|
||||
|
||||
@dbus_service.parameterized_command
|
||||
def connect_to_url(self, url: str, notify_user: bool = True) -> bool:
|
||||
"""Connect using a cthulhuremote:// or nvdaremote:// URL."""
|
||||
|
||||
if url.startswith("nvdaremote://"):
|
||||
url = "cthulhuremote://" + url.split("://", 1)[1]
|
||||
try:
|
||||
connectionInfo = ConnectionInfo.from_url(url)
|
||||
except URLParsingError as error:
|
||||
logger.warning("Invalid Cthulhu Remote URL: %s", error)
|
||||
if notify_user:
|
||||
self._present_message("Invalid Cthulhu Remote URL")
|
||||
return False
|
||||
return self._connect(connectionInfo, notify_user=notify_user)
|
||||
|
||||
@dbus_service.command
|
||||
def disconnect(self, script=None, inputEvent=None, notify_user: bool = True) -> bool:
|
||||
"""Disconnect the current Cthulhu Remote session."""
|
||||
|
||||
if self._transport is not None:
|
||||
self._connectionState = ConnectionState.DISCONNECTING
|
||||
self._transport.close()
|
||||
self._transport = None
|
||||
self._deregister_speech_monitor()
|
||||
self._connectionState = ConnectionState.DISCONNECTED
|
||||
if notify_user:
|
||||
self._present_message("Cthulhu Remote disconnected")
|
||||
return True
|
||||
|
||||
@dbus_service.command
|
||||
def toggle_mute(self, script=None, inputEvent=None) -> bool:
|
||||
"""Mute or unmute incoming remote output."""
|
||||
|
||||
self._muted = not self._muted
|
||||
self._localMachine.isMuted = self._muted
|
||||
self._present_message("Cthulhu Remote muted" if self._muted else "Cthulhu Remote unmuted")
|
||||
return True
|
||||
|
||||
@dbus_service.command
|
||||
def push_clipboard(self, script=None, inputEvent=None) -> bool:
|
||||
"""Push local clipboard text to connected remote peers."""
|
||||
|
||||
if not self._transport or not self._transport.connected:
|
||||
self._present_message("Cthulhu Remote is not connected")
|
||||
return True
|
||||
clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
|
||||
text = clipboard.wait_for_text()
|
||||
if not text:
|
||||
self._present_message("Clipboard does not contain text")
|
||||
return True
|
||||
self._transport.send(RemoteMessageType.set_clipboard_text, text=text)
|
||||
self._present_message("Clipboard pushed")
|
||||
return True
|
||||
|
||||
@dbus_service.command
|
||||
def copy_invite_url(self, script=None, inputEvent=None) -> bool:
|
||||
"""Copy a remote invite URL for the opposite connection role."""
|
||||
|
||||
if not self._connectionInfo:
|
||||
self._present_message("Cthulhu Remote is not connected")
|
||||
return True
|
||||
clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
|
||||
clipboard.set_text(self._connectionInfo.get_url_to_connect(), -1)
|
||||
clipboard.store()
|
||||
self._present_message("Cthulhu Remote invite copied")
|
||||
return True
|
||||
|
||||
@dbus_service.command
|
||||
def generate_key(self) -> str:
|
||||
"""Generate a random remote connection key."""
|
||||
|
||||
return secrets.token_urlsafe(18)
|
||||
|
||||
@dbus_service.getter
|
||||
def get_state(self) -> str:
|
||||
"""Return the Cthulhu Remote connection state."""
|
||||
|
||||
return self._connectionState.value
|
||||
|
||||
@dbus_service.getter
|
||||
def get_connection_url(self) -> str:
|
||||
"""Return the current connection URL, if connected."""
|
||||
|
||||
if not self._connectionInfo:
|
||||
return ""
|
||||
return self._connectionInfo.get_url()
|
||||
|
||||
def _connect(self, connectionInfo: ConnectionInfo, notify_user: bool = True) -> bool:
|
||||
self.disconnect(notify_user=False)
|
||||
serializer = JSONSerializer()
|
||||
transport = RelayTransport.create(connectionInfo, serializer)
|
||||
self._register_transport_handlers(transport, connectionInfo.mode)
|
||||
transport.transportConnected.register(self._handle_transport_connected)
|
||||
transport.transportDisconnected.register(self._handle_transport_disconnected)
|
||||
transport.transportConnectionFailed.register(self._handle_transport_failed)
|
||||
transport.transportCertificateAuthenticationFailed.register(self._handle_certificate_failed)
|
||||
self._transport = transport
|
||||
self._connectionInfo = connectionInfo
|
||||
self._connectionState = ConnectionState.CONNECTING
|
||||
self._register_speech_monitor(connectionInfo.mode)
|
||||
transport.start()
|
||||
if notify_user:
|
||||
self._present_message("Cthulhu Remote connecting")
|
||||
return True
|
||||
|
||||
def _register_speech_monitor(self, mode: ConnectionMode) -> None:
|
||||
if mode != ConnectionMode.SLAVE or self._speechMonitorRegistered:
|
||||
return
|
||||
|
||||
speech.add_monitor_callback(self._send_local_speech)
|
||||
self._speechMonitorRegistered = True
|
||||
|
||||
def _deregister_speech_monitor(self) -> None:
|
||||
if not self._speechMonitorRegistered:
|
||||
return
|
||||
|
||||
speech.remove_monitor_callback(self._send_local_speech)
|
||||
self._speechMonitorRegistered = False
|
||||
|
||||
def _send_local_speech(self, text: str) -> None:
|
||||
if not text or not text.strip():
|
||||
return
|
||||
if not self._connectionInfo or self._connectionInfo.mode != ConnectionMode.SLAVE:
|
||||
return
|
||||
if not self._transport or not self._transport.connected:
|
||||
return
|
||||
|
||||
self._transport.send(RemoteMessageType.speak, sequence=[text])
|
||||
|
||||
def _register_transport_handlers(self, transport: RelayTransport, mode: ConnectionMode) -> None:
|
||||
transport.register_inbound(RemoteMessageType.speak, self._localMachine.speak)
|
||||
transport.register_inbound(RemoteMessageType.cancel, self._localMachine.cancel_speech)
|
||||
transport.register_inbound(RemoteMessageType.pause_speech, self._localMachine.pause_speech)
|
||||
transport.register_inbound(RemoteMessageType.set_clipboard_text, self._localMachine.set_clipboard_text)
|
||||
transport.register_inbound(RemoteMessageType.motd, self._handle_motd)
|
||||
transport.register_inbound(RemoteMessageType.version_mismatch, self._handle_version_mismatch)
|
||||
transport.register_inbound(RemoteMessageType.channel_joined, self._handle_channel_joined)
|
||||
transport.register_inbound(RemoteMessageType.client_joined, self._handle_client_joined)
|
||||
transport.register_inbound(RemoteMessageType.client_left, self._handle_client_left)
|
||||
if mode == ConnectionMode.SLAVE:
|
||||
transport.register_inbound(RemoteMessageType.key, self._localMachine.send_key)
|
||||
transport.register_inbound(RemoteMessageType.braille_input, self._localMachine.braille_input)
|
||||
transport.register_inbound(
|
||||
RemoteMessageType.send_SAS,
|
||||
self._localMachine.send_secure_attention_sequence,
|
||||
)
|
||||
else:
|
||||
transport.register_inbound(RemoteMessageType.display, self._localMachine.display)
|
||||
transport.register_inbound(
|
||||
RemoteMessageType.set_display_size,
|
||||
self._localMachine.set_braille_display_size,
|
||||
)
|
||||
|
||||
def _handle_transport_connected(self, **kwargs: Any) -> None:
|
||||
self._connectionState = ConnectionState.CONNECTED
|
||||
self._present_message("Cthulhu Remote connected")
|
||||
|
||||
def _handle_transport_disconnected(self, **kwargs: Any) -> None:
|
||||
if self._connectionState != ConnectionState.DISCONNECTING:
|
||||
self._connectionState = ConnectionState.DISCONNECTED
|
||||
self._present_message("Cthulhu Remote disconnected")
|
||||
|
||||
def _handle_transport_failed(self, **kwargs: Any) -> None:
|
||||
self._connectionState = ConnectionState.DISCONNECTED
|
||||
self._present_message("Cthulhu Remote connection failed")
|
||||
|
||||
def _handle_certificate_failed(self, **kwargs: Any) -> None:
|
||||
self._connectionState = ConnectionState.DISCONNECTED
|
||||
fingerprint = self._transport.lastFailFingerprint if self._transport else None
|
||||
if fingerprint:
|
||||
logger.warning("Cthulhu Remote certificate fingerprint: %s", fingerprint)
|
||||
self._present_message("Cthulhu Remote certificate verification failed")
|
||||
|
||||
def _handle_motd(self, motd: str = "", **kwargs: Any) -> None:
|
||||
if motd:
|
||||
self._present_message(motd)
|
||||
|
||||
def _handle_version_mismatch(self, **kwargs: Any) -> None:
|
||||
self._present_message("Cthulhu Remote relay protocol mismatch")
|
||||
self.disconnect(notify_user=False)
|
||||
|
||||
def _handle_channel_joined(self, **kwargs: Any) -> None:
|
||||
logger.info("Cthulhu Remote channel joined: %r", kwargs)
|
||||
|
||||
def _handle_client_joined(self, client: dict[str, Any] | None = None, **kwargs: Any) -> None:
|
||||
logger.info("Cthulhu Remote client joined: %r", client or kwargs)
|
||||
self._present_message("Cthulhu Remote client joined")
|
||||
|
||||
def _handle_client_left(self, client: dict[str, Any] | None = None, **kwargs: Any) -> None:
|
||||
logger.info("Cthulhu Remote client left: %r", client or kwargs)
|
||||
self._present_message("Cthulhu Remote client left")
|
||||
|
||||
def _present_message(self, message: str) -> None:
|
||||
if not self.app:
|
||||
return
|
||||
try:
|
||||
state = self.app.getDynamicApiManager().getAPI("CthulhuState")
|
||||
if state and state.activeScript:
|
||||
state.activeScript.presentMessage(message, resetStyles=False)
|
||||
except Exception:
|
||||
logger.exception("Failed to present Cthulhu Remote message")
|
||||
@@ -0,0 +1,41 @@
|
||||
"""Protocol constants for Cthulhu Remote.
|
||||
|
||||
The wire protocol is intentionally compatible with NVDA Remote protocol
|
||||
version 2 so relay servers can be shared.
|
||||
"""
|
||||
|
||||
from enum import Enum
|
||||
|
||||
|
||||
PROTOCOL_VERSION = 2
|
||||
SERVER_PORT = 6837
|
||||
URL_PREFIX = "cthulhuremote://"
|
||||
|
||||
|
||||
class RemoteMessageType(Enum):
|
||||
"""Message types used by the remote relay protocol."""
|
||||
|
||||
protocol_version = "protocol_version"
|
||||
join = "join"
|
||||
channel_joined = "channel_joined"
|
||||
client_joined = "client_joined"
|
||||
client_left = "client_left"
|
||||
generate_key = "generate_key"
|
||||
key = "key"
|
||||
speak = "speak"
|
||||
cancel = "cancel"
|
||||
pause_speech = "pause_speech"
|
||||
tone = "tone"
|
||||
wave = "wave"
|
||||
send_SAS = "send_SAS"
|
||||
index = "index"
|
||||
display = "display"
|
||||
braille_input = "braille_input"
|
||||
set_braille_info = "set_braille_info"
|
||||
set_display_size = "set_display_size"
|
||||
set_clipboard_text = "set_clipboard_text"
|
||||
motd = "motd"
|
||||
version_mismatch = "version_mismatch"
|
||||
ping = "ping"
|
||||
error = "error"
|
||||
nvda_not_connected = "nvda_not_connected"
|
||||
@@ -0,0 +1,20 @@
|
||||
"""JSON message serialization for Cthulhu Remote."""
|
||||
|
||||
import json
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
|
||||
class JSONSerializer:
|
||||
"""Serialize newline-delimited JSON remote messages."""
|
||||
|
||||
separator = b"\n"
|
||||
|
||||
def serialize(self, messageType=None, **payload: Any) -> bytes:
|
||||
if isinstance(messageType, Enum):
|
||||
messageType = messageType.value
|
||||
payload["type"] = messageType
|
||||
return json.dumps(payload).encode("utf-8") + self.separator
|
||||
|
||||
def deserialize(self, data: bytes) -> dict[str, Any]:
|
||||
return json.loads(data.decode("utf-8"))
|
||||
@@ -0,0 +1,23 @@
|
||||
"""Socket address helpers for Cthulhu Remote."""
|
||||
|
||||
import urllib.parse
|
||||
|
||||
from .protocol import SERVER_PORT
|
||||
|
||||
|
||||
def address_to_host_port(address: str) -> tuple[str, int]:
|
||||
"""Convert host[:port] text to a host, port tuple."""
|
||||
|
||||
parsedAddress = urllib.parse.urlparse("//" + address)
|
||||
return parsedAddress.hostname or "", parsedAddress.port or SERVER_PORT
|
||||
|
||||
|
||||
def host_port_to_address(hostPort: tuple[str, int]) -> str:
|
||||
"""Convert a host, port tuple to compact host[:port] text."""
|
||||
|
||||
host, port = hostPort
|
||||
if ":" in host:
|
||||
host = f"[{host}]"
|
||||
if port != SERVER_PORT:
|
||||
return f"{host}:{port}"
|
||||
return host
|
||||
@@ -0,0 +1,269 @@
|
||||
"""Threaded TLS transport for Cthulhu Remote relay connections."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import logging
|
||||
import queue
|
||||
import select
|
||||
import socket
|
||||
import ssl
|
||||
import threading
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
from gi.repository import GLib
|
||||
|
||||
from .connection_info import ConnectionInfo
|
||||
from .protocol import PROTOCOL_VERSION, RemoteMessageType
|
||||
from .serializer import JSONSerializer
|
||||
from .socket_utils import host_port_to_address
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Action:
|
||||
"""Small callback registrar used by the transport layer."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._handlers: list[Callable[..., Any]] = []
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def register(self, handler: Callable[..., Any]) -> None:
|
||||
with self._lock:
|
||||
if handler not in self._handlers:
|
||||
self._handlers.append(handler)
|
||||
|
||||
def unregister(self, handler: Callable[..., Any]) -> None:
|
||||
with self._lock:
|
||||
if handler in self._handlers:
|
||||
self._handlers.remove(handler)
|
||||
|
||||
def notify(self, **kwargs: Any) -> None:
|
||||
with self._lock:
|
||||
handlers = list(self._handlers)
|
||||
for handler in handlers:
|
||||
handler(**kwargs)
|
||||
|
||||
|
||||
class RelayTransport:
|
||||
"""Connect to an NVDA Remote compatible relay server over TLS."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
serializer: JSONSerializer,
|
||||
address: tuple[str, int],
|
||||
channel: str,
|
||||
connectionType: str,
|
||||
insecure: bool = False,
|
||||
timeout: int = 0,
|
||||
protocolVersion: int = PROTOCOL_VERSION,
|
||||
) -> None:
|
||||
self.serializer = serializer
|
||||
self.address = address
|
||||
self.channel = channel
|
||||
self.connectionType = connectionType
|
||||
self.insecure = insecure
|
||||
self.timeout = timeout
|
||||
self.protocolVersion = protocolVersion
|
||||
self.connected = False
|
||||
self.closed = True
|
||||
self.successfulConnects = 0
|
||||
self.lastFailFingerprint: str | None = None
|
||||
self._buffer = b""
|
||||
self._socket: ssl.SSLSocket | None = None
|
||||
self._socketLock = threading.Lock()
|
||||
self._sendQueue: queue.Queue[bytes | None] = queue.Queue()
|
||||
self._sendThread: threading.Thread | None = None
|
||||
self._connectorThread: threading.Thread | None = None
|
||||
self._running = threading.Event()
|
||||
self._handlers: dict[RemoteMessageType, Action] = {}
|
||||
self.transportConnected = Action()
|
||||
self.transportDisconnected = Action()
|
||||
self.transportCertificateAuthenticationFailed = Action()
|
||||
self.transportConnectionFailed = Action()
|
||||
self.transportClosing = Action()
|
||||
self.transportConnected.register(self._send_join_messages)
|
||||
|
||||
@classmethod
|
||||
def create(cls, connectionInfo: ConnectionInfo, serializer: JSONSerializer) -> "RelayTransport":
|
||||
return cls(
|
||||
serializer=serializer,
|
||||
address=(connectionInfo.hostname, connectionInfo.port),
|
||||
channel=connectionInfo.key,
|
||||
connectionType=connectionInfo.mode.value,
|
||||
insecure=connectionInfo.insecure,
|
||||
)
|
||||
|
||||
def start(self) -> None:
|
||||
if self._connectorThread and self._connectorThread.is_alive():
|
||||
return
|
||||
self.closed = False
|
||||
self._running.set()
|
||||
self._connectorThread = threading.Thread(
|
||||
target=self._connector_loop,
|
||||
name="CthulhuRemoteConnector",
|
||||
daemon=True,
|
||||
)
|
||||
self._connectorThread.start()
|
||||
|
||||
def close(self) -> None:
|
||||
self.transportClosing.notify()
|
||||
self.closed = True
|
||||
self._running.clear()
|
||||
self._disconnect()
|
||||
|
||||
def register_inbound(self, messageType: RemoteMessageType, handler: Callable[..., Any]) -> None:
|
||||
self._handlers.setdefault(messageType, Action()).register(handler)
|
||||
|
||||
def unregister_inbound(self, messageType: RemoteMessageType, handler: Callable[..., Any]) -> None:
|
||||
if messageType in self._handlers:
|
||||
self._handlers[messageType].unregister(handler)
|
||||
|
||||
def send(self, messageType: str | Enum, **payload: Any) -> None:
|
||||
if not self.connected:
|
||||
logger.debug("Dropping remote message while disconnected: %r", messageType)
|
||||
return
|
||||
self._sendQueue.put(self.serializer.serialize(messageType=messageType, **payload))
|
||||
|
||||
def _connector_loop(self) -> None:
|
||||
while self._running.is_set():
|
||||
try:
|
||||
self._run_once()
|
||||
except ssl.SSLCertVerificationError:
|
||||
logger.warning("Cthulhu Remote certificate verification failed")
|
||||
except OSError as error:
|
||||
logger.info("Cthulhu Remote connection failed: %s", error)
|
||||
except Exception:
|
||||
logger.exception("Cthulhu Remote connector failed")
|
||||
if self._running.is_set():
|
||||
time.sleep(5)
|
||||
|
||||
def _run_once(self) -> None:
|
||||
try:
|
||||
self._socket = self._create_socket()
|
||||
self._socket.connect(self.address)
|
||||
except ssl.SSLCertVerificationError:
|
||||
self._capture_failed_fingerprint()
|
||||
self.transportCertificateAuthenticationFailed.notify()
|
||||
raise
|
||||
except Exception:
|
||||
self.transportConnectionFailed.notify()
|
||||
raise
|
||||
|
||||
self.connected = True
|
||||
self.successfulConnects += 1
|
||||
self.transportConnected.notify()
|
||||
self._sendThread = threading.Thread(target=self._send_loop, name="CthulhuRemoteSend", daemon=True)
|
||||
self._sendThread.start()
|
||||
|
||||
try:
|
||||
while self._running.is_set() and self._socket is not None:
|
||||
readers, _, errors = select.select([self._socket], [], [self._socket], 1.0)
|
||||
if errors:
|
||||
break
|
||||
if readers:
|
||||
self._read_available()
|
||||
finally:
|
||||
self.connected = False
|
||||
self.transportDisconnected.notify()
|
||||
self._disconnect()
|
||||
|
||||
def _create_socket(self, insecure: bool | None = None) -> ssl.SSLSocket:
|
||||
insecure = self.insecure if insecure is None else insecure
|
||||
host, port = self.address
|
||||
family, socktype, proto, _, _ = socket.getaddrinfo(host, port)[0]
|
||||
plainSocket = socket.socket(family, socktype, proto)
|
||||
if self.timeout:
|
||||
plainSocket.settimeout(self.timeout)
|
||||
plainSocket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||
plainSocket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
|
||||
context = ssl._create_unverified_context() if insecure else ssl.create_default_context()
|
||||
return context.wrap_socket(plainSocket, server_hostname=host)
|
||||
|
||||
def _capture_failed_fingerprint(self) -> None:
|
||||
try:
|
||||
socketForCert = self._create_socket(insecure=True)
|
||||
socketForCert.connect(self.address)
|
||||
certificate = socketForCert.getpeercert(binary_form=True)
|
||||
socketForCert.close()
|
||||
if certificate:
|
||||
self.lastFailFingerprint = hashlib.sha256(certificate).hexdigest().lower()
|
||||
except Exception:
|
||||
self.lastFailFingerprint = None
|
||||
|
||||
def _send_join_messages(self) -> None:
|
||||
self.send(RemoteMessageType.protocol_version, version=self.protocolVersion)
|
||||
self.send(RemoteMessageType.join, channel=self.channel, connection_type=self.connectionType)
|
||||
|
||||
def _read_available(self) -> None:
|
||||
if self._socket is None:
|
||||
return
|
||||
with self._socketLock:
|
||||
self._socket.setblocking(False)
|
||||
try:
|
||||
data = self._buffer + self._socket.recv(16384)
|
||||
except ssl.SSLWantReadError:
|
||||
return
|
||||
finally:
|
||||
self._socket.setblocking(True)
|
||||
self._buffer = b""
|
||||
if not data:
|
||||
self._disconnect()
|
||||
return
|
||||
while b"\n" in data:
|
||||
line, _, data = data.partition(b"\n")
|
||||
self._parse_line(line)
|
||||
self._buffer = data
|
||||
|
||||
def _parse_line(self, line: bytes) -> None:
|
||||
try:
|
||||
message = self.serializer.deserialize(line)
|
||||
messageType = RemoteMessageType(message.pop("type"))
|
||||
except Exception:
|
||||
logger.exception("Ignoring invalid remote message: %r", line)
|
||||
return
|
||||
action = self._handlers.get(messageType)
|
||||
if action is None:
|
||||
logger.debug("No handler registered for remote message type: %s", messageType.value)
|
||||
return
|
||||
GLib.idle_add(self._notify_action, action, message)
|
||||
|
||||
def _notify_action(self, action: Action, payload: dict[str, Any]) -> bool:
|
||||
action.notify(**payload)
|
||||
return False
|
||||
|
||||
def _send_loop(self) -> None:
|
||||
while True:
|
||||
item = self._sendQueue.get()
|
||||
if item is None:
|
||||
return
|
||||
try:
|
||||
with self._socketLock:
|
||||
if self._socket is not None:
|
||||
self._socket.sendall(item)
|
||||
except OSError:
|
||||
return
|
||||
|
||||
def _disconnect(self) -> None:
|
||||
if self._sendThread is not None:
|
||||
self._sendQueue.put(None)
|
||||
self._sendThread.join(timeout=2)
|
||||
self._sendThread = None
|
||||
self._clear_send_queue()
|
||||
if self._socket is not None:
|
||||
try:
|
||||
self._socket.close()
|
||||
except OSError:
|
||||
pass
|
||||
self._socket = None
|
||||
self._buffer = b""
|
||||
|
||||
def _clear_send_queue(self) -> None:
|
||||
try:
|
||||
while True:
|
||||
self._sendQueue.get_nowait()
|
||||
except queue.Empty:
|
||||
pass
|
||||
@@ -2,6 +2,7 @@
|
||||
subdir('AIAssistant')
|
||||
subdir('ByeCthulhu')
|
||||
subdir('Clipboard')
|
||||
subdir('CthulhuRemote')
|
||||
subdir('DisplayVersion')
|
||||
subdir('HelloCthulhu')
|
||||
subdir('GameMode')
|
||||
|
||||
@@ -3470,7 +3470,7 @@ class Script(script.Script):
|
||||
return True
|
||||
|
||||
def presentMessage(self, fullMessage, briefMessage=None, voice=None, resetStyles=True,
|
||||
force=False, interrupt=True):
|
||||
force=False, interrupt=False):
|
||||
"""Convenience method to speak a message and 'flash' it in braille.
|
||||
|
||||
Arguments:
|
||||
@@ -3484,6 +3484,8 @@ class Script(script.Script):
|
||||
the briefMessage should set briefMessage to an empty string.
|
||||
- voice: The voice to use when speaking this message. By default, the
|
||||
"system" voice will be used.
|
||||
- interrupt: If True, any current speech should be interrupted
|
||||
prior to speaking the new text. The default queues the message.
|
||||
"""
|
||||
|
||||
if not fullMessage:
|
||||
@@ -3959,7 +3961,7 @@ class Script(script.Script):
|
||||
voice = self.speechGenerator.voice(string=character)
|
||||
speech.speakCharacter(character, voice)
|
||||
|
||||
def speakMessage(self, string, voice=None, interrupt=True, resetStyles=True, force=False):
|
||||
def speakMessage(self, string, voice=None, interrupt=False, resetStyles=True, force=False):
|
||||
"""Method to speak a single string. Scripts should use this
|
||||
method rather than calling speech.speak directly.
|
||||
|
||||
@@ -3967,7 +3969,7 @@ class Script(script.Script):
|
||||
- voice: The voice to use. By default, the "system" voice will
|
||||
be used.
|
||||
- interrupt: If True, any current speech should be interrupted
|
||||
prior to speaking the new text.
|
||||
prior to speaking the new text. The default queues the message.
|
||||
"""
|
||||
|
||||
if not cthulhu.cthulhuApp.settingsManager.getSetting('enableSpeech') \
|
||||
|
||||
@@ -215,7 +215,7 @@ class Script(default.Script):
|
||||
self._clearSyntheticWebSelection()
|
||||
if oldContents:
|
||||
self.speakContents(oldContents)
|
||||
self.speakMessage(messages.TEXT_UNSELECTED, interrupt=False)
|
||||
self.speakMessage(messages.TEXT_UNSELECTED)
|
||||
return True
|
||||
|
||||
self.pointOfReference["syntheticWebSelection"] = {
|
||||
@@ -247,7 +247,7 @@ class Script(default.Script):
|
||||
|
||||
if deltaContents:
|
||||
self.speakContents(deltaContents)
|
||||
self.speakMessage(message, interrupt=False)
|
||||
self.speakMessage(message)
|
||||
|
||||
return True
|
||||
|
||||
@@ -1083,7 +1083,7 @@ class Script(default.Script):
|
||||
"""Speaks the specified contents."""
|
||||
|
||||
utterances = self.speechGenerator.generateContents(contents, **args)
|
||||
speech.speak(utterances)
|
||||
speech.speak(utterances, interrupt=args.get("interrupt", False))
|
||||
|
||||
def sayCharacter(self, obj):
|
||||
"""Speaks the character at the current caret position."""
|
||||
@@ -1511,6 +1511,9 @@ class Script(default.Script):
|
||||
elif AXUtilities.is_menu(parent):
|
||||
self.utilities.setCaretContext(AXObject.get_parent(parent), -1)
|
||||
if not self._loadingDocumentContent:
|
||||
if inputEvent is not None:
|
||||
self.presentMessage(messages.MODE_BROWSE, interrupt=True)
|
||||
else:
|
||||
self.presentMessage(messages.MODE_BROWSE)
|
||||
if not self._shouldSuppressBrowseModeSound(obj, inputEvent):
|
||||
sound_theme_manager.getManager().playBrowseModeSound()
|
||||
@@ -1521,6 +1524,9 @@ class Script(default.Script):
|
||||
or inputEvent):
|
||||
self.utilities.grabFocus(obj)
|
||||
|
||||
if inputEvent is not None:
|
||||
self.presentMessage(messages.MODE_FOCUS, interrupt=True)
|
||||
else:
|
||||
self.presentMessage(messages.MODE_FOCUS)
|
||||
sound_theme_manager.getManager().playFocusModeSound()
|
||||
self._inFocusMode = not self._inFocusMode
|
||||
@@ -1873,11 +1879,15 @@ class Script(default.Script):
|
||||
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||
return True
|
||||
|
||||
if self.utilities.shouldInterruptForLocusOfFocusChange(oldFocus, newFocus, event):
|
||||
self.presentationInterrupt()
|
||||
|
||||
args["interrupt"] = False
|
||||
if contents:
|
||||
self.speakContents(contents, **args)
|
||||
else:
|
||||
utterances = self.speechGenerator.generateSpeech(newFocus, **args)
|
||||
speech.speak(utterances)
|
||||
speech.speak(utterances, interrupt=False)
|
||||
|
||||
self._saveFocusedObjectInfo(newFocus)
|
||||
|
||||
@@ -2266,7 +2276,10 @@ class Script(default.Script):
|
||||
tokens = ["WEB: Dumping cache and context: source is focus",
|
||||
cthulhu_state.locusOfFocus]
|
||||
debug.printTokens(debug.LEVEL_INFO, tokens, True)
|
||||
self.utilities.dumpCache(document, preserveContext=False)
|
||||
self.utilities.dumpCache(
|
||||
document,
|
||||
preserveContext=not AXObject.is_dead(event.source),
|
||||
)
|
||||
elif AXObject.is_dead(cthulhu_state.locusOfFocus):
|
||||
msg = "WEB: Dumping cache: dead focus"
|
||||
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||
@@ -2379,7 +2392,10 @@ class Script(default.Script):
|
||||
tokens = ["WEB: Dumping cache and context: source is focus",
|
||||
cthulhu_state.locusOfFocus]
|
||||
debug.printTokens(debug.LEVEL_INFO, tokens, True)
|
||||
self.utilities.dumpCache(document, preserveContext=False)
|
||||
self.utilities.dumpCache(
|
||||
document,
|
||||
preserveContext=not AXObject.is_dead(event.source),
|
||||
)
|
||||
elif AXObject.is_dead(cthulhu_state.locusOfFocus):
|
||||
msg = "WEB: Dumping cache: dead focus"
|
||||
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||
@@ -2584,6 +2600,14 @@ class Script(default.Script):
|
||||
else:
|
||||
msg = "WEB: Search for caret context failed"
|
||||
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||
if AXUtilities.is_focusable(event.source) \
|
||||
and AXUtilities.is_focused(event.source) \
|
||||
and self.utilities.inDocumentContent(event.source):
|
||||
msg = "WEB: Falling back to focused event source as caret context"
|
||||
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||
cthulhu.setLocusOfFocus(event, event.source, False)
|
||||
self.utilities.setCaretContext(event.source, 0)
|
||||
obj, offset = event.source, 0
|
||||
|
||||
if self._lastCommandWasCaretNav:
|
||||
msg = "WEB: Event ignored: Last command was caret nav"
|
||||
@@ -2649,6 +2673,44 @@ class Script(default.Script):
|
||||
self._lastCommandWasMouseButton = True
|
||||
return False
|
||||
|
||||
def onDescriptionChanged(self, event):
|
||||
"""Callback for object:property-change:accessible-description events."""
|
||||
|
||||
if self.utilities.eventIsBrowserUINoise(event):
|
||||
msg = "WEB: Ignoring event believed to be browser UI noise"
|
||||
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||
return True
|
||||
|
||||
obj = event.source
|
||||
if not self.utilities.inDocumentContent(obj):
|
||||
return False
|
||||
|
||||
descriptions = self.pointOfReference.get('descriptions', {})
|
||||
oldDescription = descriptions.get(hash(obj))
|
||||
if oldDescription == event.any_data:
|
||||
tokens = ["WEB: Old description (", oldDescription, ") is the same as new one"]
|
||||
debug.printTokens(debug.LEVEL_INFO, tokens, True)
|
||||
return True
|
||||
|
||||
descriptions[hash(obj)] = event.any_data
|
||||
self.pointOfReference['descriptions'] = descriptions
|
||||
if obj != cthulhu_state.locusOfFocus:
|
||||
msg = "WEB: Description change is for object other than locusOfFocus"
|
||||
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||
return True
|
||||
|
||||
if not event.any_data:
|
||||
return True
|
||||
|
||||
name = AXObject.get_name(obj)
|
||||
if self.utilities.stringsAreRedundant(name, event.any_data):
|
||||
tokens = ["WEB: Description change is redundant with name for", obj]
|
||||
debug.printTokens(debug.LEVEL_INFO, tokens, True)
|
||||
return True
|
||||
|
||||
self.presentMessage(event.any_data)
|
||||
return True
|
||||
|
||||
def onNameChanged(self, event):
|
||||
"""Callback for object:property-change:accessible-name events."""
|
||||
|
||||
|
||||
@@ -276,6 +276,14 @@ class Utilities(script_utilities.Utilities):
|
||||
|
||||
def sanityCheckActiveWindow(self):
|
||||
app = self._script.app
|
||||
if app is None:
|
||||
app = AXObject.get_application(cthulhu_state.activeWindow) \
|
||||
or AXObject.get_application(cthulhu_state.locusOfFocus)
|
||||
if app is not None:
|
||||
tokens = ["WEB: recovered script app for active window check:", app]
|
||||
debug.printTokens(debug.LEVEL_INFO, tokens, True)
|
||||
self._script.app = app
|
||||
|
||||
if AXObject.get_parent(cthulhu_state.activeWindow) == app:
|
||||
return True
|
||||
|
||||
@@ -299,6 +307,14 @@ class Utilities(script_utilities.Utilities):
|
||||
setattr(self._script, attr, value)
|
||||
|
||||
window = self.activeWindow(app)
|
||||
if window is None:
|
||||
tokens = [
|
||||
"WARNING: WEB could not confirm a replacement active window; preserving",
|
||||
cthulhu_state.activeWindow,
|
||||
]
|
||||
debug.printTokens(debug.LEVEL_INFO, tokens, True)
|
||||
return cthulhu_state.activeWindow is not None
|
||||
|
||||
self._script.app = AXObject.get_application(window)
|
||||
tokens = ["WEB: updating script's app to", self._script.app]
|
||||
debug.printTokens(debug.LEVEL_INFO, tokens, True)
|
||||
@@ -5034,7 +5050,8 @@ class Utilities(script_utilities.Utilities):
|
||||
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||
return False
|
||||
|
||||
if not AXObject.is_dead(cthulhu_state.locusOfFocus):
|
||||
if not AXObject.is_dead(cthulhu_state.locusOfFocus) \
|
||||
and not self.isSameObject(replicant, cthulhu_state.locusOfFocus, True, True):
|
||||
tokens = ["WEB: Not event from context replicant. locusOfFocus",
|
||||
cthulhu_state.locusOfFocus, "is not dead."]
|
||||
debug.printTokens(debug.LEVEL_INFO, tokens, True)
|
||||
|
||||
+37
-4
@@ -36,6 +36,7 @@ __license__ = "LGPL"
|
||||
|
||||
import importlib
|
||||
import time
|
||||
from contextlib import contextmanager
|
||||
from typing import TYPE_CHECKING, Optional, List, Dict, Any, Union, Callable
|
||||
|
||||
from . import debug
|
||||
@@ -76,6 +77,8 @@ _timestamp: float = 0.0
|
||||
|
||||
# Optional callback for live monitoring of spoken text.
|
||||
_monitorWriteTextCallback: Optional[Callable[[str], None]] = None
|
||||
_monitorWriteTextListeners: List[Callable[[str], None]] = []
|
||||
_monitorSuppressionDepth = 0
|
||||
|
||||
def _isSpeechDispatcherFactory(moduleName: Optional[str]) -> bool:
|
||||
if not moduleName:
|
||||
@@ -325,13 +328,42 @@ def set_monitor_callbacks(writeText: Optional[Callable[[str], None]] = None) ->
|
||||
global _monitorWriteTextCallback
|
||||
_monitorWriteTextCallback = writeText
|
||||
|
||||
def add_monitor_callback(writeText: Callable[[str], None]) -> None:
|
||||
"""Adds a runtime callback for live speech monitoring."""
|
||||
if writeText not in _monitorWriteTextListeners:
|
||||
_monitorWriteTextListeners.append(writeText)
|
||||
|
||||
def remove_monitor_callback(writeText: Callable[[str], None]) -> None:
|
||||
"""Removes a runtime callback for live speech monitoring."""
|
||||
if writeText in _monitorWriteTextListeners:
|
||||
_monitorWriteTextListeners.remove(writeText)
|
||||
|
||||
@contextmanager
|
||||
def suppress_monitor_callbacks():
|
||||
"""Temporarily suppresses live speech monitoring callbacks."""
|
||||
global _monitorSuppressionDepth
|
||||
_monitorSuppressionDepth += 1
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
_monitorSuppressionDepth = max(0, _monitorSuppressionDepth - 1)
|
||||
|
||||
def _write_to_monitor(text: str) -> None:
|
||||
"""Writes text to the active speech monitor callback if set."""
|
||||
if _monitorWriteTextCallback is None:
|
||||
if _monitorSuppressionDepth:
|
||||
return
|
||||
|
||||
callbacks = []
|
||||
if _monitorWriteTextCallback is not None:
|
||||
callbacks.append(_monitorWriteTextCallback)
|
||||
callbacks.extend(
|
||||
callback for callback in _monitorWriteTextListeners
|
||||
if callback != _monitorWriteTextCallback
|
||||
)
|
||||
|
||||
for callback in callbacks:
|
||||
try:
|
||||
_monitorWriteTextCallback(text)
|
||||
callback(text)
|
||||
except Exception:
|
||||
debug.printException(debug.LEVEL_INFO)
|
||||
|
||||
@@ -425,10 +457,11 @@ def _speak(text: str, acss: Optional[Any], interrupt: bool) -> None:
|
||||
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||
_speechserver.speak(text, resolvedVoice, interrupt) # type: ignore
|
||||
|
||||
def speak(content: Union[str, List[Any]], acss: Optional[Any] = None, interrupt: bool = True) -> None:
|
||||
def speak(content: Union[str, List[Any]], acss: Optional[Any] = None, interrupt: bool = False) -> None:
|
||||
"""Speaks the given content. The content can be either a simple
|
||||
string or an array of arrays of objects returned by a speech
|
||||
generator."""
|
||||
generator. Speech queues by default; callers that need to cancel
|
||||
current output should pass interrupt=True or call presentationInterrupt()."""
|
||||
|
||||
if settings.silenceSpeech:
|
||||
return
|
||||
|
||||
@@ -632,7 +632,7 @@ class SpeechServer(speechserver.SpeechServer):
|
||||
|
||||
return families
|
||||
|
||||
def speak(self, text=None, acss=None, interrupt=True):
|
||||
def speak(self, text=None, acss=None, interrupt=False):
|
||||
if not text:
|
||||
return
|
||||
|
||||
|
||||
@@ -935,9 +935,9 @@ class StructuralNavigation:
|
||||
return
|
||||
|
||||
if not isNext:
|
||||
self._script.presentMessage(messages.WRAPPING_TO_BOTTOM)
|
||||
wrapMessage = messages.WRAPPING_TO_BOTTOM
|
||||
else:
|
||||
self._script.presentMessage(messages.WRAPPING_TO_TOP)
|
||||
wrapMessage = messages.WRAPPING_TO_TOP
|
||||
|
||||
matches = self._getAll(structuralNavigationObject, arg)
|
||||
if not isNext:
|
||||
@@ -946,6 +946,7 @@ class StructuralNavigation:
|
||||
for match in matches:
|
||||
if _isValidMatch(match):
|
||||
structuralNavigationObject.present(match, arg)
|
||||
self._script.presentMessage(wrapMessage)
|
||||
return
|
||||
|
||||
structuralNavigationObject.present(None, arg)
|
||||
@@ -2218,14 +2219,13 @@ class StructuralNavigation:
|
||||
if settings.speakCellCoordinates:
|
||||
[row, col] = self.getCellCoordinates(cell)
|
||||
self._script.presentMessage(
|
||||
messages.TABLE_CELL_COORDINATES % {"row": row + 1, "column": col + 1},
|
||||
interrupt=False,
|
||||
messages.TABLE_CELL_COORDINATES % {"row": row + 1, "column": col + 1}
|
||||
)
|
||||
|
||||
rowspan, colspan = self._script.utilities.rowAndColumnSpan(cell)
|
||||
spanString = messages.cellSpan(rowspan, colspan)
|
||||
if spanString and settings.speakCellSpan:
|
||||
self._script.presentMessage(spanString, interrupt=False)
|
||||
self._script.presentMessage(spanString)
|
||||
|
||||
########################
|
||||
# #
|
||||
|
||||
@@ -0,0 +1,68 @@
|
||||
import unittest
|
||||
from unittest import mock
|
||||
|
||||
import gi
|
||||
gi.require_version("Atspi", "2.0")
|
||||
from gi.repository import Atspi
|
||||
|
||||
from cthulhu import ax_object
|
||||
from cthulhu import cthulhu_state
|
||||
from cthulhu import focus_manager
|
||||
|
||||
|
||||
class ChromiumOmniboxRegressionTests(unittest.TestCase):
|
||||
def tearDown(self):
|
||||
cthulhu_state.activeWindow = None
|
||||
cthulhu_state.locusOfFocus = None
|
||||
|
||||
def test_detects_chromium_popup_item_with_broken_ancestry(self):
|
||||
popupItem = object()
|
||||
|
||||
with (
|
||||
mock.patch.object(ax_object.AXObject, "is_dead", return_value=False),
|
||||
mock.patch.object(ax_object.AXObject, "get_toolkit_name", return_value="chromium"),
|
||||
mock.patch.object(ax_object.AXObject, "get_role", return_value=Atspi.Role.LIST_ITEM),
|
||||
mock.patch.object(ax_object.AXObject, "get_parent", return_value=None),
|
||||
mock.patch.object(ax_object.debug, "print_tokens"),
|
||||
):
|
||||
self.assertTrue(ax_object.AXObject.has_broken_popup_ancestry(popupItem))
|
||||
|
||||
def test_ignores_chromium_popup_item_with_valid_ancestry(self):
|
||||
popupItem = object()
|
||||
parent = object()
|
||||
|
||||
def get_role(obj):
|
||||
if obj is popupItem:
|
||||
return Atspi.Role.LIST_ITEM
|
||||
return Atspi.Role.APPLICATION
|
||||
|
||||
with (
|
||||
mock.patch.object(ax_object.AXObject, "is_dead", return_value=False),
|
||||
mock.patch.object(ax_object.AXObject, "get_toolkit_name", return_value="chromium"),
|
||||
mock.patch.object(ax_object.AXObject, "get_role", side_effect=get_role),
|
||||
mock.patch.object(ax_object.AXObject, "get_parent", side_effect=[parent, None]),
|
||||
):
|
||||
self.assertFalse(ax_object.AXObject.has_broken_popup_ancestry(popupItem))
|
||||
|
||||
def test_preserves_active_window_when_chromium_popup_ancestry_is_broken(self):
|
||||
activeWindow = object()
|
||||
popupItem = object()
|
||||
cthulhu_state.activeWindow = activeWindow
|
||||
cthulhu_state.locusOfFocus = popupItem
|
||||
|
||||
controller = mock.Mock()
|
||||
app = mock.Mock()
|
||||
manager = None
|
||||
with (
|
||||
mock.patch.object(focus_manager.dbus_service, "get_remote_controller", return_value=controller),
|
||||
mock.patch.object(focus_manager.AXObject, "has_broken_popup_ancestry", return_value=True),
|
||||
):
|
||||
manager = focus_manager.FocusManager(app)
|
||||
manager.set_active_window(None)
|
||||
|
||||
self.assertIs(manager.get_active_window(), activeWindow)
|
||||
self.assertIs(cthulhu_state.activeWindow, activeWindow)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,129 @@
|
||||
import sys
|
||||
import types
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from unittest import mock
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
|
||||
|
||||
input_event_manager_stub = types.ModuleType("cthulhu.input_event_manager")
|
||||
input_event_manager_stub.get_manager = mock.Mock(return_value=mock.Mock())
|
||||
sys.modules["cthulhu.input_event_manager"] = input_event_manager_stub
|
||||
|
||||
from cthulhu.plugin_system_manager import PluginSystemManager
|
||||
from cthulhu import cthulhu_state
|
||||
from cthulhu import speech
|
||||
from cthulhu.plugins.CthulhuRemote.connection_info import ConnectionInfo, ConnectionMode
|
||||
from cthulhu.plugins.CthulhuRemote.local_machine import LocalMachine
|
||||
from cthulhu.plugins.CthulhuRemote.plugin import CthulhuRemote
|
||||
from cthulhu.plugins.CthulhuRemote.protocol import RemoteMessageType
|
||||
from cthulhu.plugins.CthulhuRemote.serializer import JSONSerializer
|
||||
|
||||
|
||||
class CthulhuRemotePluginTests(unittest.TestCase):
|
||||
def test_connection_info_accepts_nvda_remote_style_fields(self):
|
||||
info = ConnectionInfo.from_url("cthulhuremote://example.com:1234?key=abc&mode=slave")
|
||||
|
||||
self.assertEqual(info.hostname, "example.com")
|
||||
self.assertEqual(info.port, 1234)
|
||||
self.assertEqual(info.key, "abc")
|
||||
self.assertEqual(info.mode, ConnectionMode.SLAVE)
|
||||
self.assertEqual(
|
||||
info.get_url_to_connect(),
|
||||
"cthulhuremote://example.com:1234?key=abc&mode=master",
|
||||
)
|
||||
|
||||
def test_json_serializer_uses_remote_message_type_values(self):
|
||||
serializer = JSONSerializer()
|
||||
|
||||
payload = serializer.deserialize(serializer.serialize(RemoteMessageType.join, channel="abc"))
|
||||
|
||||
self.assertEqual(payload["type"], "join")
|
||||
self.assertEqual(payload["channel"], "abc")
|
||||
|
||||
def test_local_machine_maps_common_windows_vk_codes_to_keyvals(self):
|
||||
machine = LocalMachine(lambda message: None)
|
||||
|
||||
self.assertEqual(machine._resolve_keyval(0x41, None), machine._resolve_keyval(None, "a"))
|
||||
self.assertEqual(machine._resolve_keyval(0x70, None), machine._resolve_keyval(None, "F1"))
|
||||
self.assertIsNone(machine._resolve_keyval(0xFF, None))
|
||||
|
||||
def test_speech_monitor_callbacks_are_additive(self):
|
||||
primary = mock.Mock()
|
||||
listener = mock.Mock()
|
||||
speech.set_monitor_callbacks(writeText=primary)
|
||||
speech.add_monitor_callback(listener)
|
||||
self.addCleanup(speech.set_monitor_callbacks, None)
|
||||
self.addCleanup(speech.remove_monitor_callback, listener)
|
||||
|
||||
speech._write_to_monitor("status")
|
||||
|
||||
primary.assert_called_once_with("status")
|
||||
listener.assert_called_once_with("status")
|
||||
|
||||
def test_remote_speech_does_not_echo_to_monitor_callbacks(self):
|
||||
listener = mock.Mock()
|
||||
speech.add_monitor_callback(listener)
|
||||
self.addCleanup(speech.remove_monitor_callback, listener)
|
||||
|
||||
with (
|
||||
mock.patch.object(speech, "_speechserver", None),
|
||||
mock.patch.object(speech.speech_history, "add"),
|
||||
mock.patch.object(cthulhu_state, "activeScript", None),
|
||||
):
|
||||
LocalMachine(lambda message: None).speak(["remote", "status"])
|
||||
|
||||
listener.assert_not_called()
|
||||
|
||||
def test_slave_mode_forwards_local_speech_to_relay(self):
|
||||
plugin = CthulhuRemote()
|
||||
plugin._connectionInfo = ConnectionInfo(
|
||||
hostname="example.com",
|
||||
port=1234,
|
||||
key="abc",
|
||||
mode=ConnectionMode.SLAVE,
|
||||
)
|
||||
plugin._transport = mock.Mock()
|
||||
plugin._transport.connected = True
|
||||
|
||||
plugin._send_local_speech("focused button")
|
||||
|
||||
plugin._transport.send.assert_called_once_with(
|
||||
RemoteMessageType.speak,
|
||||
sequence=["focused button"],
|
||||
)
|
||||
|
||||
def test_master_mode_does_not_forward_local_speech_to_relay(self):
|
||||
plugin = CthulhuRemote()
|
||||
plugin._connectionInfo = ConnectionInfo(
|
||||
hostname="example.com",
|
||||
port=1234,
|
||||
key="abc",
|
||||
mode=ConnectionMode.MASTER,
|
||||
)
|
||||
plugin._transport = mock.Mock()
|
||||
plugin._transport.connected = True
|
||||
|
||||
plugin._send_local_speech("local status")
|
||||
|
||||
plugin._transport.send.assert_not_called()
|
||||
|
||||
@mock.patch("cthulhu.plugin_system_manager.dbus_service.get_remote_controller")
|
||||
def test_plugin_manager_can_load_cthulhu_remote(self, remote_controller):
|
||||
remote_controller.return_value = mock.Mock()
|
||||
app = mock.Mock()
|
||||
app.getSignalManager.return_value = mock.Mock()
|
||||
app.getAPIHelper.return_value = None
|
||||
manager = PluginSystemManager(app)
|
||||
manager.rescanPlugins()
|
||||
plugin_info = manager._resolve_plugin_info("CthulhuRemote")
|
||||
|
||||
self.assertIsNotNone(plugin_info)
|
||||
self.assertTrue(manager.loadPlugin(plugin_info))
|
||||
|
||||
self.assertEqual(plugin_info.instance.__class__.__name__, "CthulhuRemote")
|
||||
self.assertTrue(manager.unloadPlugin(plugin_info))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,91 @@
|
||||
import sys
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from unittest import mock
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
|
||||
|
||||
from cthulhu import settings
|
||||
from cthulhu import speech
|
||||
from cthulhu import cthulhu_state
|
||||
from cthulhu.scripts import default
|
||||
|
||||
|
||||
class SpeechDefaultPolicyRegressionTests(unittest.TestCase):
|
||||
def _make_settings_manager(self):
|
||||
values = {
|
||||
"enableSpeech": True,
|
||||
"onlySpeakDisplayedText": False,
|
||||
"messagesAreDetailed": True,
|
||||
"enableBraille": False,
|
||||
"enableBrailleMonitor": False,
|
||||
"enableFlashMessages": False,
|
||||
"voices": {settings.SYSTEM_VOICE: "system"},
|
||||
"capitalizationStyle": settings.CAPITALIZATION_STYLE_NONE,
|
||||
"verbalizePunctuationStyle": settings.PUNCTUATION_STYLE_NONE,
|
||||
}
|
||||
|
||||
manager = mock.Mock()
|
||||
manager.getSetting.side_effect = values.get
|
||||
return manager
|
||||
|
||||
def _make_script(self):
|
||||
testScript = default.Script.__new__(default.Script)
|
||||
testScript.speechAndVerbosityManager = mock.Mock()
|
||||
return testScript
|
||||
|
||||
def test_speech_speak_queues_by_default(self):
|
||||
server = mock.Mock()
|
||||
|
||||
with (
|
||||
mock.patch.object(speech, "_speechserver", server),
|
||||
mock.patch.object(speech, "_write_to_monitor"),
|
||||
mock.patch.object(speech.speech_history, "add"),
|
||||
mock.patch.object(cthulhu_state, "activeScript", None),
|
||||
):
|
||||
speech.speak("status")
|
||||
|
||||
server.speak.assert_called_once()
|
||||
self.assertFalse(server.speak.call_args.args[2])
|
||||
|
||||
def test_speech_speak_explicit_interrupt_is_preserved(self):
|
||||
server = mock.Mock()
|
||||
|
||||
with (
|
||||
mock.patch.object(speech, "_speechserver", server),
|
||||
mock.patch.object(speech, "_write_to_monitor"),
|
||||
mock.patch.object(speech.speech_history, "add"),
|
||||
mock.patch.object(cthulhu_state, "activeScript", None),
|
||||
):
|
||||
speech.speak("status", interrupt=True)
|
||||
|
||||
server.speak.assert_called_once()
|
||||
self.assertTrue(server.speak.call_args.args[2])
|
||||
|
||||
def test_present_message_queues_by_default(self):
|
||||
testScript = self._make_script()
|
||||
manager = self._make_settings_manager()
|
||||
|
||||
with (
|
||||
mock.patch.object(default.cthulhu.cthulhuApp, "settingsManager", manager),
|
||||
mock.patch.object(default.speech, "speak") as speak,
|
||||
):
|
||||
default.Script.presentMessage(testScript, "Status")
|
||||
|
||||
speak.assert_called_once_with("Status", "system", False)
|
||||
|
||||
def test_present_message_explicit_interrupt_is_preserved(self):
|
||||
testScript = self._make_script()
|
||||
manager = self._make_settings_manager()
|
||||
|
||||
with (
|
||||
mock.patch.object(default.cthulhu.cthulhuApp, "settingsManager", manager),
|
||||
mock.patch.object(default.speech, "speak") as speak,
|
||||
):
|
||||
default.Script.presentMessage(testScript, "Status", interrupt=True)
|
||||
|
||||
speak.assert_called_once_with("Status", "system", True)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -30,6 +30,14 @@ class SpeechDispatcherInterruptRegressionTests(unittest.TestCase):
|
||||
server._cancel.assert_called_once_with()
|
||||
server._speak.assert_called_once_with("long utterance", None)
|
||||
|
||||
def test_string_speech_queues_by_default(self):
|
||||
server = self._make_server()
|
||||
|
||||
server.speak("next")
|
||||
|
||||
server._cancel.assert_not_called()
|
||||
server._speak.assert_called_once_with("next", None)
|
||||
|
||||
def test_recent_key_echo_suppresses_backend_cancel(self):
|
||||
server = self._make_server()
|
||||
server._lastKeyEchoTime = time.time()
|
||||
|
||||
@@ -38,8 +38,47 @@ class StructuralNavigationTableRegressionTests(unittest.TestCase):
|
||||
|
||||
navigator._presentObject.assert_called_once_with("cell", 0)
|
||||
navigator._script.presentMessage.assert_called_once_with(
|
||||
messages.TABLE_CELL_COORDINATES % {"row": 2, "column": 3},
|
||||
interrupt=False,
|
||||
messages.TABLE_CELL_COORDINATES % {"row": 2, "column": 3}
|
||||
)
|
||||
|
||||
def test_wrapping_announcement_does_not_interrupt_wrapped_object(self):
|
||||
navigator = structural_navigation.StructuralNavigation.__new__(
|
||||
structural_navigation.StructuralNavigation
|
||||
)
|
||||
script = mock.Mock()
|
||||
script.utilities.isZombie.return_value = False
|
||||
script.utilities.isHidden.return_value = False
|
||||
script.utilities.isEmpty.return_value = False
|
||||
script.utilities.pathComparison.return_value = 0
|
||||
navigator._script = script
|
||||
|
||||
first = object()
|
||||
current = object()
|
||||
structuralNavigationObject = mock.Mock()
|
||||
structuralNavigationObject.predicate = None
|
||||
events = []
|
||||
structuralNavigationObject.present.side_effect = lambda obj, arg=None: events.append(
|
||||
("present", obj, arg)
|
||||
)
|
||||
script.presentMessage.side_effect = lambda message, **kwargs: events.append(
|
||||
("message", message, kwargs)
|
||||
)
|
||||
navigator._getAll = mock.Mock(return_value=[first, current])
|
||||
|
||||
with (
|
||||
mock.patch.object(structural_navigation.settings, "wrappedStructuralNavigation", True),
|
||||
mock.patch.object(structural_navigation.AXObject, "is_dead", return_value=False),
|
||||
mock.patch.object(structural_navigation.AXObject, "get_parent", return_value=None),
|
||||
mock.patch.object(structural_navigation.AXObject, "get_path", side_effect=lambda obj: [id(obj)]),
|
||||
):
|
||||
navigator.goObject(structuralNavigationObject, True, current, "arg")
|
||||
|
||||
self.assertEqual(
|
||||
events,
|
||||
[
|
||||
("present", first, "arg"),
|
||||
("message", messages.WRAPPING_TO_TOP, {}),
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -100,6 +100,338 @@ class WebKeyGrabRegressionTests(unittest.TestCase):
|
||||
testScript.refreshKeyGrabs.assert_called_once_with()
|
||||
|
||||
|
||||
class WebActiveWindowRegressionTests(unittest.TestCase):
|
||||
def test_sanity_check_recovers_missing_script_app_from_active_window(self):
|
||||
testScript = mock.Mock(app=None)
|
||||
utilities = web_script_utilities.Utilities.__new__(web_script_utilities.Utilities)
|
||||
utilities._script = testScript
|
||||
activeWindow = object()
|
||||
app = object()
|
||||
|
||||
with (
|
||||
mock.patch.object(web_script_utilities.cthulhu_state, "activeWindow", activeWindow),
|
||||
mock.patch.object(web_script_utilities.cthulhu_state, "locusOfFocus", None),
|
||||
mock.patch.object(
|
||||
web_script_utilities.AXObject,
|
||||
"get_application",
|
||||
side_effect=lambda obj: app if obj is activeWindow else None,
|
||||
),
|
||||
mock.patch.object(
|
||||
web_script_utilities.AXObject,
|
||||
"get_parent",
|
||||
side_effect=lambda obj: app if obj is activeWindow else None,
|
||||
),
|
||||
mock.patch.object(web_script_utilities.cthulhu, "setActiveWindow") as setActiveWindow,
|
||||
):
|
||||
result = web_script_utilities.Utilities.sanityCheckActiveWindow(utilities)
|
||||
|
||||
self.assertTrue(result)
|
||||
self.assertIs(testScript.app, app)
|
||||
setActiveWindow.assert_not_called()
|
||||
|
||||
def test_sanity_check_preserves_current_window_when_recovery_is_inconclusive(self):
|
||||
oldApp = object()
|
||||
testScript = mock.Mock(app=oldApp)
|
||||
utilities = web_script_utilities.Utilities.__new__(web_script_utilities.Utilities)
|
||||
utilities._script = testScript
|
||||
activeWindow = object()
|
||||
|
||||
with (
|
||||
mock.patch.object(web_script_utilities.cthulhu_state, "activeWindow", activeWindow),
|
||||
mock.patch.object(
|
||||
web_script_utilities.AXObject,
|
||||
"get_parent",
|
||||
return_value=object(),
|
||||
),
|
||||
mock.patch.object(utilities, "activeWindow", return_value=None),
|
||||
mock.patch.object(web_script_utilities.cthulhu, "setActiveWindow") as setActiveWindow,
|
||||
):
|
||||
result = web_script_utilities.Utilities.sanityCheckActiveWindow(utilities)
|
||||
|
||||
self.assertTrue(result)
|
||||
self.assertIs(testScript.app, oldApp)
|
||||
setActiveWindow.assert_not_called()
|
||||
|
||||
|
||||
class WebPresentationModeSpeechRegressionTests(unittest.TestCase):
|
||||
def _make_script(self, inFocusMode):
|
||||
testScript = web_script.Script.__new__(web_script.Script)
|
||||
testScript._inFocusMode = inFocusMode
|
||||
testScript._focusModeIsSticky = True
|
||||
testScript._browseModeIsSticky = True
|
||||
testScript._loadingDocumentContent = False
|
||||
testScript._lastCommandWasCaretNav = False
|
||||
testScript._lastCommandWasStructNav = False
|
||||
testScript.utilities = mock.Mock()
|
||||
testScript.utilities.getCaretContext.return_value = ("button", 0)
|
||||
testScript.utilities.grabFocusWhenSettingCaret.return_value = True
|
||||
testScript.presentMessage = mock.Mock()
|
||||
testScript.refreshKeyGrabs = mock.Mock()
|
||||
testScript._shouldSuppressBrowseModeSound = mock.Mock(return_value=False)
|
||||
return testScript
|
||||
|
||||
def test_automatic_browse_mode_announcement_does_not_interrupt_focus_presentation(self):
|
||||
testScript = self._make_script(inFocusMode=True)
|
||||
soundManager = mock.Mock()
|
||||
|
||||
with (
|
||||
mock.patch.object(web_script.AXObject, "get_parent", return_value=None),
|
||||
mock.patch.object(web_script.AXUtilities, "is_list_box", return_value=False),
|
||||
mock.patch.object(web_script.AXUtilities, "is_menu", return_value=False),
|
||||
mock.patch.object(web_script.sound_theme_manager, "getManager", return_value=soundManager),
|
||||
):
|
||||
web_script.Script.togglePresentationMode(testScript, None, "document")
|
||||
|
||||
testScript.presentMessage.assert_called_once_with(messages.MODE_BROWSE)
|
||||
soundManager.playBrowseModeSound.assert_called_once_with()
|
||||
self.assertFalse(testScript._inFocusMode)
|
||||
self.assertFalse(testScript._focusModeIsSticky)
|
||||
self.assertFalse(testScript._browseModeIsSticky)
|
||||
testScript.refreshKeyGrabs.assert_called_once_with()
|
||||
|
||||
def test_automatic_focus_mode_announcement_does_not_interrupt_focus_presentation(self):
|
||||
testScript = self._make_script(inFocusMode=False)
|
||||
soundManager = mock.Mock()
|
||||
|
||||
with mock.patch.object(
|
||||
web_script.sound_theme_manager,
|
||||
"getManager",
|
||||
return_value=soundManager,
|
||||
):
|
||||
web_script.Script.togglePresentationMode(testScript, None, "document")
|
||||
|
||||
testScript.presentMessage.assert_called_once_with(messages.MODE_FOCUS)
|
||||
soundManager.playFocusModeSound.assert_called_once_with()
|
||||
self.assertTrue(testScript._inFocusMode)
|
||||
self.assertFalse(testScript._focusModeIsSticky)
|
||||
self.assertFalse(testScript._browseModeIsSticky)
|
||||
testScript.refreshKeyGrabs.assert_called_once_with()
|
||||
|
||||
def test_manual_presentation_mode_toggle_still_interrupts(self):
|
||||
testScript = self._make_script(inFocusMode=False)
|
||||
|
||||
with mock.patch.object(web_script.sound_theme_manager, "getManager", return_value=mock.Mock()):
|
||||
web_script.Script.togglePresentationMode(testScript, object(), "document")
|
||||
|
||||
testScript.presentMessage.assert_called_once_with(messages.MODE_FOCUS, interrupt=True)
|
||||
|
||||
|
||||
class WebFocusSpeechInterruptionRegressionTests(unittest.TestCase):
|
||||
def _make_script(self):
|
||||
testScript = web_script.Script.__new__(web_script.Script)
|
||||
testScript._navSuspended = False
|
||||
testScript._lastCommandWasCaretNav = False
|
||||
testScript._lastCommandWasStructNav = False
|
||||
testScript._lastCommandWasMouseButton = False
|
||||
testScript._inFocusMode = True
|
||||
testScript._focusModeIsSticky = True
|
||||
testScript._browseModeIsSticky = False
|
||||
testScript.flatReviewPresenter = mock.Mock()
|
||||
testScript.flatReviewPresenter.is_active.return_value = False
|
||||
testScript.utilities = mock.Mock()
|
||||
testScript.utilities.isZombie.return_value = False
|
||||
testScript.utilities.isDocument.return_value = False
|
||||
testScript.utilities.getTopLevelDocumentForObject.return_value = "document"
|
||||
testScript.utilities.inFindContainer.return_value = False
|
||||
testScript.utilities.queryNonEmptyText.return_value = None
|
||||
testScript.utilities.isContentEditableWithEmbeddedObjects.return_value = False
|
||||
testScript.utilities.isAnchor.return_value = False
|
||||
testScript.utilities.lastInputEventWasPageNav.return_value = False
|
||||
testScript.utilities.isFocusedWithMathChild.return_value = False
|
||||
testScript.utilities.caretMovedToSamePageFragment.return_value = False
|
||||
testScript.utilities.lastInputEventWasLineNav.return_value = False
|
||||
testScript.utilities.inDocumentContent.return_value = True
|
||||
testScript.utilities.shouldInterruptForLocusOfFocusChange.return_value = True
|
||||
testScript.speechGenerator = mock.Mock()
|
||||
testScript.speechGenerator.generateSpeech.return_value = ["focus speech"]
|
||||
testScript.updateBraille = mock.Mock()
|
||||
testScript.presentationInterrupt = mock.Mock()
|
||||
testScript._saveFocusedObjectInfo = mock.Mock()
|
||||
testScript.refreshKeyGrabs = mock.Mock()
|
||||
return testScript
|
||||
|
||||
def test_focus_generated_speech_uses_explicit_interrupt_then_appends(self):
|
||||
testScript = self._make_script()
|
||||
oldFocus = object()
|
||||
newFocus = object()
|
||||
event = mock.Mock(type="object:state-changed:focused", source=newFocus)
|
||||
|
||||
with (
|
||||
mock.patch.object(web_script.AXObject, "is_dead", return_value=False),
|
||||
mock.patch.object(web_script.AXUtilities, "is_unknown_or_redundant", return_value=False),
|
||||
mock.patch.object(web_script.AXUtilities, "is_heading", return_value=False),
|
||||
mock.patch.object(web_script.speech, "speak") as speak,
|
||||
mock.patch.object(web_script.cthulhu, "emitRegionChanged"),
|
||||
):
|
||||
result = web_script.Script.locus_of_focus_changed(
|
||||
testScript, event, oldFocus, newFocus
|
||||
)
|
||||
|
||||
self.assertTrue(result)
|
||||
testScript.presentationInterrupt.assert_called_once_with()
|
||||
speak.assert_called_once_with(["focus speech"], interrupt=False)
|
||||
|
||||
|
||||
class WebDescriptionChangeRegressionTests(unittest.TestCase):
|
||||
def _make_script(self):
|
||||
testScript = web_script.Script.__new__(web_script.Script)
|
||||
testScript.pointOfReference = {}
|
||||
testScript.utilities = mock.Mock()
|
||||
testScript.utilities.eventIsBrowserUINoise.return_value = False
|
||||
testScript.utilities.inDocumentContent.return_value = True
|
||||
testScript.utilities.stringsAreRedundant.side_effect = (
|
||||
lambda first, second: first == second
|
||||
)
|
||||
testScript.presentMessage = mock.Mock()
|
||||
return testScript
|
||||
|
||||
def test_document_description_change_matching_name_is_ignored(self):
|
||||
testScript = self._make_script()
|
||||
source = object()
|
||||
event = mock.Mock(source=source, any_data="More")
|
||||
|
||||
with (
|
||||
mock.patch.object(web_script.cthulhu_state, "locusOfFocus", source),
|
||||
mock.patch.object(web_script.AXObject, "get_name", return_value="More"),
|
||||
):
|
||||
result = web_script.Script.onDescriptionChanged(testScript, event)
|
||||
|
||||
self.assertTrue(result)
|
||||
testScript.presentMessage.assert_not_called()
|
||||
|
||||
def test_document_description_change_appends_to_focus_presentation(self):
|
||||
testScript = self._make_script()
|
||||
source = object()
|
||||
event = mock.Mock(source=source, any_data="Helpful tooltip")
|
||||
|
||||
with (
|
||||
mock.patch.object(web_script.cthulhu_state, "locusOfFocus", source),
|
||||
mock.patch.object(web_script.AXObject, "get_name", return_value="Button"),
|
||||
):
|
||||
result = web_script.Script.onDescriptionChanged(testScript, event)
|
||||
|
||||
self.assertTrue(result)
|
||||
testScript.presentMessage.assert_called_once_with("Helpful tooltip")
|
||||
|
||||
|
||||
class WebDynamicContentRecoveryRegressionTests(unittest.TestCase):
|
||||
def _make_dynamic_script(self):
|
||||
testScript = web_script.Script.__new__(web_script.Script)
|
||||
testScript._loadingDocumentContent = False
|
||||
testScript._lastCommandWasCaretNav = False
|
||||
testScript._lastCommandWasStructNav = False
|
||||
testScript._lastMouseButtonContext = (None, -1)
|
||||
testScript.lastMouseRoutingTime = 0
|
||||
testScript.utilities = mock.Mock()
|
||||
testScript.utilities.eventIsBrowserUINoise.return_value = False
|
||||
testScript.utilities.isLiveRegion.return_value = False
|
||||
testScript.utilities.getTopLevelDocumentForObject.return_value = "document"
|
||||
testScript.utilities.isZombie.return_value = False
|
||||
testScript.utilities.handleEventFromContextReplicant.return_value = False
|
||||
testScript.utilities.handleEventForRemovedChild.return_value = False
|
||||
testScript.utilities.inDocumentContent.return_value = True
|
||||
return testScript
|
||||
|
||||
def test_children_added_to_live_focus_preserves_caret_context(self):
|
||||
testScript = self._make_dynamic_script()
|
||||
focus = object()
|
||||
event = mock.Mock(source=focus, any_data=object())
|
||||
|
||||
with (
|
||||
mock.patch.object(web_script.cthulhu_state, "locusOfFocus", focus),
|
||||
mock.patch.object(web_script.AXObject, "is_dead", return_value=False),
|
||||
mock.patch.object(web_script.AXUtilities, "is_busy", return_value=False),
|
||||
mock.patch.object(web_script.AXUtilities, "is_alert", return_value=False),
|
||||
):
|
||||
result = web_script.Script.onChildrenAdded(testScript, event)
|
||||
|
||||
self.assertFalse(result)
|
||||
testScript.utilities.dumpCache.assert_called_once_with(
|
||||
"document",
|
||||
preserveContext=True,
|
||||
)
|
||||
|
||||
def test_children_removed_from_live_focus_preserves_caret_context(self):
|
||||
testScript = self._make_dynamic_script()
|
||||
focus = object()
|
||||
event = mock.Mock(source=focus, any_data=object())
|
||||
|
||||
with (
|
||||
mock.patch.object(web_script.cthulhu_state, "locusOfFocus", focus),
|
||||
mock.patch.object(web_script.AXObject, "is_dead", return_value=False),
|
||||
):
|
||||
result = web_script.Script.onChildrenRemoved(testScript, event)
|
||||
|
||||
self.assertFalse(result)
|
||||
testScript.utilities.dumpCache.assert_called_once_with(
|
||||
"document",
|
||||
preserveContext=True,
|
||||
)
|
||||
|
||||
def test_focused_event_source_becomes_context_when_search_fails(self):
|
||||
testScript = self._make_dynamic_script()
|
||||
source = object()
|
||||
oldFocus = object()
|
||||
event = mock.Mock(detail1=1, source=source)
|
||||
testScript._lastCommandWasCaretNav = True
|
||||
testScript.utilities.getDocumentForObject.return_value = "document"
|
||||
testScript.utilities.isWebAppDescendant.return_value = False
|
||||
testScript.utilities.handleEventFromContextReplicant.return_value = False
|
||||
testScript.utilities.getCaretContext.return_value = (None, -1)
|
||||
testScript.utilities.searchForCaretContext.return_value = (None, -1)
|
||||
testScript.utilities.inFindContainer.return_value = False
|
||||
|
||||
with (
|
||||
mock.patch.object(web_script.cthulhu_state, "locusOfFocus", oldFocus),
|
||||
mock.patch.object(web_script.AXUtilities, "is_editable", return_value=False),
|
||||
mock.patch.object(web_script.AXUtilities, "is_dialog_or_alert", return_value=False),
|
||||
mock.patch.object(web_script.AXUtilities, "is_focusable", return_value=True),
|
||||
mock.patch.object(web_script.AXUtilities, "is_focused", return_value=True),
|
||||
mock.patch.object(web_script.cthulhu, "setLocusOfFocus") as setLocusOfFocus,
|
||||
):
|
||||
result = web_script.Script.onFocusedChanged(testScript, event)
|
||||
|
||||
self.assertTrue(result)
|
||||
setLocusOfFocus.assert_called_once_with(event, source, False)
|
||||
testScript.utilities.setCaretContext.assert_called_once_with(source, 0)
|
||||
|
||||
|
||||
class WebContextReplicantRegressionTests(unittest.TestCase):
|
||||
def test_same_object_replicant_can_recover_before_old_focus_is_dead(self):
|
||||
utilities = web_script_utilities.Utilities.__new__(web_script_utilities.Utilities)
|
||||
document = object()
|
||||
parent = object()
|
||||
oldFocus = object()
|
||||
replicant = object()
|
||||
event = mock.Mock()
|
||||
utilities._caretContexts = {hash(parent): (oldFocus, 0)}
|
||||
utilities.getCaretContextPathRoleAndName = mock.Mock(
|
||||
return_value=([1, 2, 3], "button", "storm, microphone on")
|
||||
)
|
||||
utilities.documentFrame = mock.Mock(return_value=document)
|
||||
utilities.isSameObject = mock.Mock(return_value=True)
|
||||
utilities.setCaretContext = mock.Mock()
|
||||
|
||||
with (
|
||||
mock.patch.object(web_script_utilities.cthulhu_state, "locusOfFocus", oldFocus),
|
||||
mock.patch.object(web_script_utilities.AXObject, "is_dead", return_value=False),
|
||||
mock.patch.object(web_script_utilities.AXObject, "get_path", return_value=[1, 2, 3]),
|
||||
mock.patch.object(web_script_utilities.AXObject, "get_role", return_value="button"),
|
||||
mock.patch.object(web_script_utilities.AXObject, "get_name", return_value="storm, microphone on"),
|
||||
mock.patch.object(web_script_utilities.AXObject, "get_parent", return_value=parent),
|
||||
mock.patch.object(web_script_utilities.cthulhu, "setLocusOfFocus") as setLocusOfFocus,
|
||||
):
|
||||
result = web_script_utilities.Utilities.handleEventFromContextReplicant(
|
||||
utilities,
|
||||
event,
|
||||
replicant,
|
||||
)
|
||||
|
||||
self.assertTrue(result)
|
||||
setLocusOfFocus.assert_called_once_with(event, replicant, False)
|
||||
utilities.setCaretContext.assert_called_once_with(replicant, 0, document)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -164,7 +496,7 @@ class WebSelectionRegressionTests(unittest.TestCase):
|
||||
testScript.utilities.setCaretContext.assert_called_once_with(section, 1, document)
|
||||
setLocusOfFocus.assert_called_once_with(event, section, False, True)
|
||||
testScript.speakContents.assert_called_once_with([[section, 0, 1, "D"]])
|
||||
testScript.speakMessage.assert_called_once_with(messages.TEXT_SELECTED, interrupt=False)
|
||||
testScript.speakMessage.assert_called_once_with(messages.TEXT_SELECTED)
|
||||
self.assertEqual(
|
||||
testScript.pointOfReference["syntheticWebSelection"]["string"],
|
||||
"D",
|
||||
@@ -200,7 +532,7 @@ class WebSelectionRegressionTests(unittest.TestCase):
|
||||
testScript.utilities.setCaretContext.assert_called_once_with(link, 0, document)
|
||||
setLocusOfFocus.assert_called_once_with(event, link, False, True)
|
||||
testScript.speakContents.assert_called_once_with([[link, 0, 1, "S"]])
|
||||
testScript.speakMessage.assert_called_once_with(messages.TEXT_SELECTED, interrupt=False)
|
||||
testScript.speakMessage.assert_called_once_with(messages.TEXT_SELECTED)
|
||||
self.assertEqual(
|
||||
testScript.pointOfReference["syntheticWebSelection"]["string"],
|
||||
"S",
|
||||
|
||||
Reference in New Issue
Block a user