9 Commits

Author SHA1 Message Date
Storm Dragon 30e1667f77 Merge branch 'testing' 2026-05-24 03:47:25 -04:00
Storm Dragon edc195c46b Some web updates. 2026-05-24 03:47:15 -04:00
Storm Dragon cb553d3031 Improve Cthulhu Remote speech relay 2026-05-21 00:32:00 -04:00
ak47 b055933d6d Add Cthulhu Remote plugin 2026-05-20 22:36:17 -04:00
Storm Dragon 009938c495 Changed how speech interruptions are handled hopefully improved things being interrupted when they shouldn't be. 2026-05-19 15:26:44 -04:00
Storm Dragon a84aec94a9 Preserve web active window during transient Brave focus checks 2026-05-16 23:29:51 -04:00
lilmike 4145e9375b Fix some interruption issues in discord. 2026-05-15 13:46:34 -07:00
Storm Dragon 0acba6a733 Backported Chrome omnibox fix from Orca. 2026-05-15 15:48:52 -04:00
Storm Dragon b224d699c0 Fixed status announcements interrupting page information for the web navigation. 2026-05-15 01:54:17 -04:00
27 changed files with 1998 additions and 32 deletions
@@ -2,7 +2,7 @@
pkgname=cthulhu-git pkgname=cthulhu-git
_pkgname=cthulhu _pkgname=cthulhu
pkgver=2026.05.06.r394.ga5f7c9a pkgver=2026.05.14.r396.ge2f9a7c
pkgrel=1 pkgrel=1
pkgdesc="Desktop-agnostic screen reader with plugin system, forked from Orca" pkgdesc="Desktop-agnostic screen reader with plugin system, forked from Orca"
url="https://git.stormux.org/storm/cthulhu" url="https://git.stormux.org/storm/cthulhu"
+30 -2
View File
@@ -148,14 +148,42 @@ class AXObject:
if not toolkit_name.startswith("qt"): if not toolkit_name.startswith("qt"):
return False return False
if not AXObject._can_reach_application(obj):
tokens = ["AXObject:", obj, "has broken ancestry. See qt bug 130116."]
debug.print_tokens(debug.LEVEL_INFO, tokens, True)
return True
return False
@staticmethod
def _can_reach_application(obj: Atspi.Accessible) -> bool:
"""Returns True if obj's ancestry reaches the application."""
reached_app = False reached_app = False
parent = AXObject.get_parent(obj) parent = AXObject.get_parent(obj)
while parent and not reached_app: while parent and not reached_app:
reached_app = AXObject.get_role(parent) == Atspi.Role.APPLICATION reached_app = AXObject.get_role(parent) == Atspi.Role.APPLICATION
parent = AXObject.get_parent(parent) parent = AXObject.get_parent(parent)
if not reached_app: return reached_app
tokens = ["AXObject:", obj, "has broken ancestry. See qt bug 130116."]
@staticmethod
def has_broken_popup_ancestry(obj: Atspi.Accessible) -> bool:
"""Returns True if obj is a popup item whose ancestry is broken."""
if obj is None or AXObject.is_dead(obj):
return False
# Chromium Omnibox popups can lose their path back to the frame after
# the popup is closed and reopened.
if not AXObject.get_toolkit_name(obj).startswith("chromium"):
return False
if AXObject.get_role(obj) != Atspi.Role.LIST_ITEM:
return False
if not AXObject._can_reach_application(obj):
tokens = ["AXObject:", obj, "has broken popup ancestry."]
debug.print_tokens(debug.LEVEL_INFO, tokens, True) debug.print_tokens(debug.LEVEL_INFO, tokens, True)
return True return True
+7
View File
@@ -348,6 +348,13 @@ class FocusManager:
tokens.extend(["in", app]) tokens.extend(["in", app])
_log_tokens(tokens) _log_tokens(tokens)
if frame is None and AXObject.has_broken_popup_ancestry(self._focus):
_log_tokens(
["Not clearing active window; focus", self._focus, "is in popup with broken ancestry"],
"broken-popup-ancestry",
)
return
if frame == self._window: if frame == self._window:
_log("Setting active window to existing active window", "no-change") _log("Setting active window to existing active window", "no-change")
elif frame is None: elif frame is None:
+206
View File
@@ -0,0 +1,206 @@
# Cthulhu Remote
Cthulhu Remote is an NVDA Remote-style assistive technology relay plugin. It is
intended for remote help between screen reader users: one user can control
another user's desktop through keyboard input and receive the remote screen
reader's speech feedback.
This is not graphical screen sharing. It does not forward a framebuffer, window
image, VNC/RDP session, or screenshots. The useful channel is screen reader
output and remote input.
## Current Status
### Working
- Plugin installs and loads as `CthulhuRemote`.
- The plugin is registered as a D-Bus module when loaded.
- The transport connects outbound to NVDA Remote-compatible relay servers over
TLS.
- The default relay port is `6837`.
- Protocol version `2` is used.
- `cthulhuremote://` URLs are parsed.
- `nvdaremote://` URLs are accepted and rewritten to `cthulhuremote://`.
- `master` and `slave` connection modes are represented.
- Random connection keys can be generated.
- Invite URLs can be copied for the opposite role.
- The local clipboard can be pushed to connected peers.
- Incoming remote speech is spoken through Cthulhu.
- Incoming remote speech is suppressed from speech monitor callbacks to avoid
echo loops.
- In `slave` mode, local Cthulhu speech is forwarded to the relay as `speak`
messages.
- In `slave` mode, incoming remote key messages are mapped from common Windows
virtual-key codes or keysyms to AT-SPI key events.
- Disconnect and mute gestures are registered:
- `cthulhu+alt+page_down`: disconnect
- `cthulhu+alt+delete`: mute or unmute incoming remote output
- `cthulhu+control+shift+c`: push clipboard text
### Partially Implemented
- NVDA Remote compatibility is intentional at the relay-message level, but
cross-client interoperability with NVDA has not been verified.
- Remote key injection exists on the controlled/slave side, but master-side key
capture and forwarding is not implemented yet.
- Braille message types exist, but braille routing is not implemented.
- TLS certificate verification is enabled by default, and an insecure mode
exists, but there is no trust-on-first-use certificate workflow.
- Connection state is exposed, but there is no complete user-facing connection
dialog.
### Not Implemented
- Graphical screen sharing.
- Screenshot forwarding.
- VNC/RDP integration.
- Master-side keyboard forwarding.
- Remote braille display output.
- Remote braille input routing.
- Tone, wave, and other non-speech remote audio events.
- Ping/keepalive handling.
- Relay error presentation beyond basic logging/messages.
- Preferences UI for host, port, key, mode, certificate trust, and invite
management.
## Intended User Model
The typical assistive remote-support flow is:
1. The person needing help runs Cthulhu and connects as `slave`.
2. The helper runs Cthulhu and connects as `master`.
3. The helper sends keyboard commands through the relay.
4. The controlled desktop receives those keys locally.
5. The controlled machine's Cthulhu speech is sent back to the helper.
Examples this is meant to support:
- Teaching someone how to use a program.
- Helping with a stuck dialog or inaccessible workflow.
- Typing into the remote user's editor or terminal.
- Navigating menus and controls with speech feedback.
Things it does not solve by itself:
- Visual captchas that require seeing an image.
- Visual inspection of a remote screen.
- Mouse-driven visual troubleshooting without another channel.
## Implementation Checklist
### Core Protocol
- [x] Define protocol constants and message types.
- [x] Serialize and parse newline-delimited JSON messages.
- [x] Connect to relay servers over TLS.
- [x] Send `protocol_version` and `join` messages after connecting.
- [x] Dispatch inbound messages on the GLib main loop.
- [ ] Add ping/keepalive handling.
- [ ] Present relay `error` and `nvda_not_connected` messages clearly.
- [ ] Verify exact payload compatibility with current NVDA Remote clients.
### Connection Management
- [x] Parse `cthulhuremote://` URLs.
- [x] Accept `nvdaremote://` URLs.
- [x] Support host, port, key, mode, and insecure TLS fields.
- [x] Track connection states.
- [x] Expose D-Bus commands for connect, disconnect, state, key generation, and
invite URL copying.
- [ ] Add an accessible GTK connection dialog.
- [ ] Add saved/recent relay configuration if desired.
- [ ] Add trust-on-first-use certificate handling or a clear certificate trust
workflow.
- [ ] Add better reconnect/backoff status reporting.
### Speech
- [x] Speak incoming remote `speak` messages locally.
- [x] Support muting incoming remote speech.
- [x] Prevent inbound remote speech from being echoed back to the relay.
- [x] Forward local speech to the relay in `slave` mode.
- [ ] Decide whether master mode should ever forward local speech.
- [ ] Preserve richer speech sequence details if needed instead of flattening to
plain text.
- [ ] Verify behavior with speech interruption and cancellation across two live
clients.
### Keyboard Control
- [x] Receive remote key messages in `slave` mode.
- [x] Map common Windows virtual-key codes to Linux keysyms.
- [x] Inject remote key presses/releases with AT-SPI.
- [ ] Capture local keyboard events in `master` mode.
- [ ] Forward master key events as remote `key` messages.
- [ ] Prevent forwarded keys from also acting on the master's local desktop,
unless intentionally passed through.
- [ ] Preserve modifier press/release ordering.
- [ ] Verify Xorg behavior.
- [ ] Verify Wayland behavior without weakening Xorg support.
- [ ] Add tests for key payload generation and modifier handling.
### Clipboard
- [x] Push local clipboard text to connected peers.
- [x] Receive remote clipboard text into the local clipboard.
- [ ] Add a command or dialog control for pull/request clipboard if protocol
support is available.
- [ ] Decide how to handle large clipboard payloads.
- [ ] Add tests for empty, plain text, and multiline clipboard text.
### Braille
- [ ] Route incoming remote `display` messages to Cthulhu braille output.
- [ ] Implement `set_display_size`.
- [ ] Implement `set_braille_info` if required for compatibility.
- [ ] Route local braille input as remote `braille_input` messages when acting
as master.
- [ ] Verify routing keys and cursor-routing behavior.
- [ ] Add tests around braille payload parsing and ignored/unsupported fields.
### Audio And Miscellaneous Messages
- [ ] Implement or intentionally ignore `tone`.
- [ ] Implement or intentionally ignore `wave`.
- [ ] Decide whether `index` is relevant to Cthulhu speech.
- [ ] Handle `send_SAS` with a clear Linux-specific message.
- [ ] Log unsupported message types at debug level without spamming users.
### User Interface
- [x] Register basic gestures for disconnect, mute, and clipboard push.
- [ ] Add connect/disconnect controls to plugin preferences.
- [ ] Add host, port, key, mode, and insecure TLS fields.
- [ ] Add generate-key and copy-invite controls.
- [ ] Add connection status text suitable for screen reader users.
- [ ] Ensure Tab and Shift+Tab navigate the entire dialog.
- [ ] Associate GTK labels with their controls.
### Tests And Verification
- [x] Test URL parsing.
- [x] Test serializer message type values.
- [x] Test common key mapping.
- [x] Test plugin loading through the plugin manager.
- [x] Test additive speech monitor callbacks.
- [x] Test remote speech echo suppression.
- [x] Test slave-mode local speech forwarding.
- [x] Test that master mode does not forward local speech.
- [ ] Add transport tests with a fake relay socket.
- [ ] Add connection-state transition tests.
- [ ] Add D-Bus introspection or command exposure tests for the plugin module.
- [ ] Test against a live NVDA Remote-compatible relay.
- [ ] Test Cthulhu-to-Cthulhu master/slave operation.
- [ ] Test NVDA master to Cthulhu slave.
- [ ] Test Cthulhu master to NVDA slave after master key forwarding exists.
## Suggested Next Steps
1. Add an accessible connection dialog so the plugin can be used without manual
D-Bus calls.
2. Implement ping/error handling to improve relay behavior and diagnostics.
3. Design master-side key forwarding carefully around Cthulhu's existing input
event manager.
4. Add fake-relay tests before broad live testing.
5. Perform live two-client testing and record exact compatibility gaps.
@@ -0,0 +1 @@
"""Cthulhu Remote plugin package."""
@@ -0,0 +1,78 @@
"""Connection metadata and URL parsing for Cthulhu Remote."""
from dataclasses import dataclass
from enum import Enum
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
from .protocol import SERVER_PORT, URL_PREFIX
from .socket_utils import host_port_to_address
class URLParsingError(Exception):
"""Raised when a remote connection URL is incomplete or invalid."""
class ConnectionMode(Enum):
"""Remote session role."""
MASTER = "master"
SLAVE = "slave"
class ConnectionState(Enum):
"""Remote session connection state."""
CONNECTED = "connected"
CONNECTING = "connecting"
DISCONNECTED = "disconnected"
DISCONNECTING = "disconnecting"
@dataclass
class ConnectionInfo:
"""Relay connection settings."""
hostname: str
mode: ConnectionMode
key: str
port: int = SERVER_PORT
insecure: bool = False
def __post_init__(self) -> None:
self.port = self.port or SERVER_PORT
self.mode = ConnectionMode(self.mode)
@classmethod
def from_url(cls, url: str) -> "ConnectionInfo":
parsedUrl = urlparse(url)
parsedQuery = parse_qs(parsedUrl.query)
hostname = parsedUrl.hostname
port = parsedUrl.port or SERVER_PORT
key = parsedQuery.get("key", [""])[0]
mode = parsedQuery.get("mode", [""])[0].lower()
insecure = parsedQuery.get("insecure", ["false"])[0].lower() == "true"
if not hostname:
raise URLParsingError("No hostname provided")
if not key:
raise URLParsingError("No key provided")
if not mode:
raise URLParsingError("No mode provided")
try:
ConnectionMode(mode)
except ValueError as error:
raise URLParsingError(f"Invalid mode provided: {mode!r}") from error
return cls(hostname=hostname, mode=ConnectionMode(mode), key=key, port=port, insecure=insecure)
def get_address(self) -> str:
return host_port_to_address((self.hostname, self.port))
def get_url(self, mode: ConnectionMode | None = None) -> str:
mode = mode or self.mode
query = {"key": self.key, "mode": mode.value}
if self.insecure:
query["insecure"] = "true"
return urlunparse((URL_PREFIX.split("://", 1)[0], self.get_address(), "", "", urlencode(query), ""))
def get_url_to_connect(self) -> str:
mode = ConnectionMode.SLAVE if self.mode == ConnectionMode.MASTER else ConnectionMode.MASTER
return self.get_url(mode)
@@ -0,0 +1,157 @@
"""Linux desktop integration for Cthulhu Remote commands."""
from __future__ import annotations
import logging
from typing import Any
import gi
gi.require_version("Atspi", "2.0")
gi.require_version("Gdk", "3.0")
gi.require_version("Gtk", "3.0")
from gi.repository import Atspi, Gdk, GLib, Gtk
from cthulhu import speech
logger = logging.getLogger(__name__)
WINDOWS_VK_TO_ATSPI_KEYSYM = {
0x08: "BackSpace",
0x09: "Tab",
0x0D: "Return",
0x10: "Shift_L",
0x11: "Control_L",
0x12: "Alt_L",
0x13: "Pause",
0x14: "Caps_Lock",
0x1B: "Escape",
0x20: "space",
0x21: "Page_Up",
0x22: "Page_Down",
0x23: "End",
0x24: "Home",
0x25: "Left",
0x26: "Up",
0x27: "Right",
0x28: "Down",
0x2D: "Insert",
0x2E: "Delete",
0x5B: "Super_L",
0x5C: "Super_R",
0x60: "KP_0",
0x61: "KP_1",
0x62: "KP_2",
0x63: "KP_3",
0x64: "KP_4",
0x65: "KP_5",
0x66: "KP_6",
0x67: "KP_7",
0x68: "KP_8",
0x69: "KP_9",
0x6A: "KP_Multiply",
0x6B: "KP_Add",
0x6D: "KP_Subtract",
0x6E: "KP_Decimal",
0x6F: "KP_Divide",
}
for _digit in range(0x30, 0x3A):
WINDOWS_VK_TO_ATSPI_KEYSYM[_digit] = chr(_digit)
for _letter in range(0x41, 0x5B):
WINDOWS_VK_TO_ATSPI_KEYSYM[_letter] = chr(_letter).lower()
for _functionKey in range(1, 25):
WINDOWS_VK_TO_ATSPI_KEYSYM[0x6F + _functionKey] = f"F{_functionKey}"
class LocalMachine:
"""Apply incoming remote commands to the local Linux desktop."""
def __init__(self, presenter) -> None:
self._presenter = presenter
self.isMuted = False
def speak(self, sequence: Any = None, **kwargs: Any) -> None:
if self.isMuted:
return
text = self._speech_sequence_to_text(sequence)
if text:
with speech.suppress_monitor_callbacks():
speech.speak(text, interrupt=False)
def cancel_speech(self, **kwargs: Any) -> None:
if not self.isMuted:
speech.stop()
def pause_speech(self, switch: bool = False, **kwargs: Any) -> None:
if switch:
speech.stop()
def set_clipboard_text(self, text: str = "", **kwargs: Any) -> None:
clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
clipboard.set_text(text, -1)
clipboard.store()
self._presenter("Remote clipboard received")
def send_key(
self,
vk_code: int | None = None,
keysym: str | None = None,
pressed: bool | None = True,
**kwargs: Any,
) -> None:
keyval = self._resolve_keyval(vk_code=vk_code, keysym=keysym)
if keyval is None:
logger.debug("Ignoring unmapped remote key: vk_code=%r keysym=%r", vk_code, keysym)
return
eventType = Atspi.KeySynthType.PRESS if pressed else Atspi.KeySynthType.RELEASE
GLib.idle_add(self._generate_key_event, keyval, eventType)
def display(self, cells: list[int] | None = None, **kwargs: Any) -> None:
logger.debug("Remote braille display update ignored until braille routing is implemented: %r", cells)
def braille_input(self, **kwargs: Any) -> None:
logger.debug("Remote braille input ignored until braille routing is implemented: %r", kwargs)
def set_braille_display_size(self, **kwargs: Any) -> None:
logger.debug("Remote braille size negotiation ignored until braille routing is implemented: %r", kwargs)
def send_secure_attention_sequence(self, **kwargs: Any) -> None:
self._presenter("Ctrl Alt Delete is not available on Linux desktops")
def _generate_key_event(self, keyval: int, eventType: Atspi.KeySynthType) -> bool:
try:
Atspi.generate_keyboard_event(keyval, None, eventType)
except Exception:
logger.exception("Failed to generate remote key event")
return False
def _resolve_keyval(self, vk_code: int | None, keysym: str | None) -> int | None:
if keysym:
keyval = Gdk.keyval_from_name(keysym)
return keyval if keyval else None
if vk_code is None:
return None
mappedKeysym = WINDOWS_VK_TO_ATSPI_KEYSYM.get(int(vk_code))
if not mappedKeysym:
return None
keyval = Gdk.keyval_from_name(mappedKeysym)
return keyval if keyval else None
def _speech_sequence_to_text(self, sequence: Any) -> str:
if sequence is None:
return ""
if isinstance(sequence, str):
return sequence
if not isinstance(sequence, list):
return str(sequence)
parts = []
for item in sequence:
if isinstance(item, str):
parts.append(item)
elif isinstance(item, list):
nested = self._speech_sequence_to_text(item)
if nested:
parts.append(nested)
return " ".join(parts)
@@ -0,0 +1,20 @@
cthulhu_remote_python_sources = files([
'__init__.py',
'connection_info.py',
'local_machine.py',
'plugin.py',
'protocol.py',
'serializer.py',
'socket_utils.py',
'transport.py'
])
python3.install_sources(
cthulhu_remote_python_sources,
subdir: 'cthulhu/plugins/CthulhuRemote'
)
install_data(
'plugin.info',
install_dir: python3.get_install_dir() / 'cthulhu' / 'plugins' / 'CthulhuRemote'
)
@@ -0,0 +1,7 @@
[Plugin]
Name = Cthulhu Remote
Module = CthulhuRemote
Description = Remote access plugin compatible with NVDA Remote relay servers
Authors = Storm Dragon <storm_dragon@stormux.org>
Version = 0.1
Category = Utilities
+327
View File
@@ -0,0 +1,327 @@
#!/usr/bin/env python3
#
# Copyright (c) 2026 Stormux
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
"""Cthulhu Remote plugin."""
from __future__ import annotations
import logging
import secrets
from typing import Any
import gi
gi.require_version("Gdk", "3.0")
gi.require_version("Gtk", "3.0")
from gi.repository import Gdk, Gtk
from cthulhu import dbus_service
from cthulhu import speech
from cthulhu.plugin import Plugin, cthulhu_hookimpl
from cthulhu.plugins.CthulhuRemote.connection_info import (
ConnectionInfo,
ConnectionMode,
ConnectionState,
URLParsingError,
)
from cthulhu.plugins.CthulhuRemote.local_machine import LocalMachine
from cthulhu.plugins.CthulhuRemote.protocol import RemoteMessageType, SERVER_PORT
from cthulhu.plugins.CthulhuRemote.serializer import JSONSerializer
from cthulhu.plugins.CthulhuRemote.socket_utils import address_to_host_port
from cthulhu.plugins.CthulhuRemote.transport import RelayTransport
logger = logging.getLogger(__name__)
class CthulhuRemote(Plugin):
"""Remote access plugin compatible with NVDA Remote relay servers."""
def __init__(self) -> None:
super().__init__()
self._transport: RelayTransport | None = None
self._connectionInfo: ConnectionInfo | None = None
self._connectionState = ConnectionState.DISCONNECTED
self._localMachine = LocalMachine(self._present_message)
self._muted = False
self._speechMonitorRegistered = False
@cthulhu_hookimpl
def activate(self, plugin=None):
if plugin is not None and plugin is not self:
return
self.registerGestureByString(
self.disconnect,
"Disconnect Cthulhu Remote",
"kb:cthulhu+alt+page_down",
)
self.registerGestureByString(
self.toggle_mute,
"Toggle Cthulhu Remote mute",
"kb:cthulhu+alt+delete",
)
self.registerGestureByString(
self.push_clipboard,
"Push clipboard to Cthulhu Remote peers",
"kb:cthulhu+control+shift+c",
)
logger.info("Cthulhu Remote activated")
@cthulhu_hookimpl
def deactivate(self, plugin=None):
if plugin is not None and plugin is not self:
return
self.disconnect(notify_user=False)
logger.info("Cthulhu Remote deactivated")
@dbus_service.parameterized_command
def connect(
self,
host: str,
key: str,
mode: str = "master",
port: int = SERVER_PORT,
insecure: bool = False,
notify_user: bool = True,
) -> bool:
"""Connect to a Cthulhu Remote relay server."""
if not host or not key:
if notify_user:
self._present_message("Cthulhu Remote requires host and key")
return False
try:
connectionInfo = ConnectionInfo(
hostname=host,
port=port,
key=key,
mode=ConnectionMode(mode.lower()),
insecure=insecure,
)
except ValueError:
if notify_user:
self._present_message("Cthulhu Remote mode must be master or slave")
return False
return self._connect(connectionInfo, notify_user=notify_user)
@dbus_service.parameterized_command
def connect_to_address(
self,
address: str,
key: str,
mode: str = "master",
insecure: bool = False,
notify_user: bool = True,
) -> bool:
"""Connect using host[:port] address text."""
host, port = address_to_host_port(address)
return self.connect(host, key, mode=mode, port=port, insecure=insecure, notify_user=notify_user)
@dbus_service.parameterized_command
def connect_to_url(self, url: str, notify_user: bool = True) -> bool:
"""Connect using a cthulhuremote:// or nvdaremote:// URL."""
if url.startswith("nvdaremote://"):
url = "cthulhuremote://" + url.split("://", 1)[1]
try:
connectionInfo = ConnectionInfo.from_url(url)
except URLParsingError as error:
logger.warning("Invalid Cthulhu Remote URL: %s", error)
if notify_user:
self._present_message("Invalid Cthulhu Remote URL")
return False
return self._connect(connectionInfo, notify_user=notify_user)
@dbus_service.command
def disconnect(self, script=None, inputEvent=None, notify_user: bool = True) -> bool:
"""Disconnect the current Cthulhu Remote session."""
if self._transport is not None:
self._connectionState = ConnectionState.DISCONNECTING
self._transport.close()
self._transport = None
self._deregister_speech_monitor()
self._connectionState = ConnectionState.DISCONNECTED
if notify_user:
self._present_message("Cthulhu Remote disconnected")
return True
@dbus_service.command
def toggle_mute(self, script=None, inputEvent=None) -> bool:
"""Mute or unmute incoming remote output."""
self._muted = not self._muted
self._localMachine.isMuted = self._muted
self._present_message("Cthulhu Remote muted" if self._muted else "Cthulhu Remote unmuted")
return True
@dbus_service.command
def push_clipboard(self, script=None, inputEvent=None) -> bool:
"""Push local clipboard text to connected remote peers."""
if not self._transport or not self._transport.connected:
self._present_message("Cthulhu Remote is not connected")
return True
clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
text = clipboard.wait_for_text()
if not text:
self._present_message("Clipboard does not contain text")
return True
self._transport.send(RemoteMessageType.set_clipboard_text, text=text)
self._present_message("Clipboard pushed")
return True
@dbus_service.command
def copy_invite_url(self, script=None, inputEvent=None) -> bool:
"""Copy a remote invite URL for the opposite connection role."""
if not self._connectionInfo:
self._present_message("Cthulhu Remote is not connected")
return True
clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
clipboard.set_text(self._connectionInfo.get_url_to_connect(), -1)
clipboard.store()
self._present_message("Cthulhu Remote invite copied")
return True
@dbus_service.command
def generate_key(self) -> str:
"""Generate a random remote connection key."""
return secrets.token_urlsafe(18)
@dbus_service.getter
def get_state(self) -> str:
"""Return the Cthulhu Remote connection state."""
return self._connectionState.value
@dbus_service.getter
def get_connection_url(self) -> str:
"""Return the current connection URL, if connected."""
if not self._connectionInfo:
return ""
return self._connectionInfo.get_url()
def _connect(self, connectionInfo: ConnectionInfo, notify_user: bool = True) -> bool:
self.disconnect(notify_user=False)
serializer = JSONSerializer()
transport = RelayTransport.create(connectionInfo, serializer)
self._register_transport_handlers(transport, connectionInfo.mode)
transport.transportConnected.register(self._handle_transport_connected)
transport.transportDisconnected.register(self._handle_transport_disconnected)
transport.transportConnectionFailed.register(self._handle_transport_failed)
transport.transportCertificateAuthenticationFailed.register(self._handle_certificate_failed)
self._transport = transport
self._connectionInfo = connectionInfo
self._connectionState = ConnectionState.CONNECTING
self._register_speech_monitor(connectionInfo.mode)
transport.start()
if notify_user:
self._present_message("Cthulhu Remote connecting")
return True
def _register_speech_monitor(self, mode: ConnectionMode) -> None:
if mode != ConnectionMode.SLAVE or self._speechMonitorRegistered:
return
speech.add_monitor_callback(self._send_local_speech)
self._speechMonitorRegistered = True
def _deregister_speech_monitor(self) -> None:
if not self._speechMonitorRegistered:
return
speech.remove_monitor_callback(self._send_local_speech)
self._speechMonitorRegistered = False
def _send_local_speech(self, text: str) -> None:
if not text or not text.strip():
return
if not self._connectionInfo or self._connectionInfo.mode != ConnectionMode.SLAVE:
return
if not self._transport or not self._transport.connected:
return
self._transport.send(RemoteMessageType.speak, sequence=[text])
def _register_transport_handlers(self, transport: RelayTransport, mode: ConnectionMode) -> None:
transport.register_inbound(RemoteMessageType.speak, self._localMachine.speak)
transport.register_inbound(RemoteMessageType.cancel, self._localMachine.cancel_speech)
transport.register_inbound(RemoteMessageType.pause_speech, self._localMachine.pause_speech)
transport.register_inbound(RemoteMessageType.set_clipboard_text, self._localMachine.set_clipboard_text)
transport.register_inbound(RemoteMessageType.motd, self._handle_motd)
transport.register_inbound(RemoteMessageType.version_mismatch, self._handle_version_mismatch)
transport.register_inbound(RemoteMessageType.channel_joined, self._handle_channel_joined)
transport.register_inbound(RemoteMessageType.client_joined, self._handle_client_joined)
transport.register_inbound(RemoteMessageType.client_left, self._handle_client_left)
if mode == ConnectionMode.SLAVE:
transport.register_inbound(RemoteMessageType.key, self._localMachine.send_key)
transport.register_inbound(RemoteMessageType.braille_input, self._localMachine.braille_input)
transport.register_inbound(
RemoteMessageType.send_SAS,
self._localMachine.send_secure_attention_sequence,
)
else:
transport.register_inbound(RemoteMessageType.display, self._localMachine.display)
transport.register_inbound(
RemoteMessageType.set_display_size,
self._localMachine.set_braille_display_size,
)
def _handle_transport_connected(self, **kwargs: Any) -> None:
self._connectionState = ConnectionState.CONNECTED
self._present_message("Cthulhu Remote connected")
def _handle_transport_disconnected(self, **kwargs: Any) -> None:
if self._connectionState != ConnectionState.DISCONNECTING:
self._connectionState = ConnectionState.DISCONNECTED
self._present_message("Cthulhu Remote disconnected")
def _handle_transport_failed(self, **kwargs: Any) -> None:
self._connectionState = ConnectionState.DISCONNECTED
self._present_message("Cthulhu Remote connection failed")
def _handle_certificate_failed(self, **kwargs: Any) -> None:
self._connectionState = ConnectionState.DISCONNECTED
fingerprint = self._transport.lastFailFingerprint if self._transport else None
if fingerprint:
logger.warning("Cthulhu Remote certificate fingerprint: %s", fingerprint)
self._present_message("Cthulhu Remote certificate verification failed")
def _handle_motd(self, motd: str = "", **kwargs: Any) -> None:
if motd:
self._present_message(motd)
def _handle_version_mismatch(self, **kwargs: Any) -> None:
self._present_message("Cthulhu Remote relay protocol mismatch")
self.disconnect(notify_user=False)
def _handle_channel_joined(self, **kwargs: Any) -> None:
logger.info("Cthulhu Remote channel joined: %r", kwargs)
def _handle_client_joined(self, client: dict[str, Any] | None = None, **kwargs: Any) -> None:
logger.info("Cthulhu Remote client joined: %r", client or kwargs)
self._present_message("Cthulhu Remote client joined")
def _handle_client_left(self, client: dict[str, Any] | None = None, **kwargs: Any) -> None:
logger.info("Cthulhu Remote client left: %r", client or kwargs)
self._present_message("Cthulhu Remote client left")
def _present_message(self, message: str) -> None:
if not self.app:
return
try:
state = self.app.getDynamicApiManager().getAPI("CthulhuState")
if state and state.activeScript:
state.activeScript.presentMessage(message, resetStyles=False)
except Exception:
logger.exception("Failed to present Cthulhu Remote message")
@@ -0,0 +1,41 @@
"""Protocol constants for Cthulhu Remote.
The wire protocol is intentionally compatible with NVDA Remote protocol
version 2 so relay servers can be shared.
"""
from enum import Enum
PROTOCOL_VERSION = 2
SERVER_PORT = 6837
URL_PREFIX = "cthulhuremote://"
class RemoteMessageType(Enum):
"""Message types used by the remote relay protocol."""
protocol_version = "protocol_version"
join = "join"
channel_joined = "channel_joined"
client_joined = "client_joined"
client_left = "client_left"
generate_key = "generate_key"
key = "key"
speak = "speak"
cancel = "cancel"
pause_speech = "pause_speech"
tone = "tone"
wave = "wave"
send_SAS = "send_SAS"
index = "index"
display = "display"
braille_input = "braille_input"
set_braille_info = "set_braille_info"
set_display_size = "set_display_size"
set_clipboard_text = "set_clipboard_text"
motd = "motd"
version_mismatch = "version_mismatch"
ping = "ping"
error = "error"
nvda_not_connected = "nvda_not_connected"
@@ -0,0 +1,20 @@
"""JSON message serialization for Cthulhu Remote."""
import json
from enum import Enum
from typing import Any
class JSONSerializer:
"""Serialize newline-delimited JSON remote messages."""
separator = b"\n"
def serialize(self, messageType=None, **payload: Any) -> bytes:
if isinstance(messageType, Enum):
messageType = messageType.value
payload["type"] = messageType
return json.dumps(payload).encode("utf-8") + self.separator
def deserialize(self, data: bytes) -> dict[str, Any]:
return json.loads(data.decode("utf-8"))
@@ -0,0 +1,23 @@
"""Socket address helpers for Cthulhu Remote."""
import urllib.parse
from .protocol import SERVER_PORT
def address_to_host_port(address: str) -> tuple[str, int]:
"""Convert host[:port] text to a host, port tuple."""
parsedAddress = urllib.parse.urlparse("//" + address)
return parsedAddress.hostname or "", parsedAddress.port or SERVER_PORT
def host_port_to_address(hostPort: tuple[str, int]) -> str:
"""Convert a host, port tuple to compact host[:port] text."""
host, port = hostPort
if ":" in host:
host = f"[{host}]"
if port != SERVER_PORT:
return f"{host}:{port}"
return host
@@ -0,0 +1,269 @@
"""Threaded TLS transport for Cthulhu Remote relay connections."""
from __future__ import annotations
import hashlib
import logging
import queue
import select
import socket
import ssl
import threading
import time
from collections.abc import Callable
from enum import Enum
from typing import Any
from gi.repository import GLib
from .connection_info import ConnectionInfo
from .protocol import PROTOCOL_VERSION, RemoteMessageType
from .serializer import JSONSerializer
from .socket_utils import host_port_to_address
logger = logging.getLogger(__name__)
class Action:
"""Small callback registrar used by the transport layer."""
def __init__(self) -> None:
self._handlers: list[Callable[..., Any]] = []
self._lock = threading.Lock()
def register(self, handler: Callable[..., Any]) -> None:
with self._lock:
if handler not in self._handlers:
self._handlers.append(handler)
def unregister(self, handler: Callable[..., Any]) -> None:
with self._lock:
if handler in self._handlers:
self._handlers.remove(handler)
def notify(self, **kwargs: Any) -> None:
with self._lock:
handlers = list(self._handlers)
for handler in handlers:
handler(**kwargs)
class RelayTransport:
"""Connect to an NVDA Remote compatible relay server over TLS."""
def __init__(
self,
serializer: JSONSerializer,
address: tuple[str, int],
channel: str,
connectionType: str,
insecure: bool = False,
timeout: int = 0,
protocolVersion: int = PROTOCOL_VERSION,
) -> None:
self.serializer = serializer
self.address = address
self.channel = channel
self.connectionType = connectionType
self.insecure = insecure
self.timeout = timeout
self.protocolVersion = protocolVersion
self.connected = False
self.closed = True
self.successfulConnects = 0
self.lastFailFingerprint: str | None = None
self._buffer = b""
self._socket: ssl.SSLSocket | None = None
self._socketLock = threading.Lock()
self._sendQueue: queue.Queue[bytes | None] = queue.Queue()
self._sendThread: threading.Thread | None = None
self._connectorThread: threading.Thread | None = None
self._running = threading.Event()
self._handlers: dict[RemoteMessageType, Action] = {}
self.transportConnected = Action()
self.transportDisconnected = Action()
self.transportCertificateAuthenticationFailed = Action()
self.transportConnectionFailed = Action()
self.transportClosing = Action()
self.transportConnected.register(self._send_join_messages)
@classmethod
def create(cls, connectionInfo: ConnectionInfo, serializer: JSONSerializer) -> "RelayTransport":
return cls(
serializer=serializer,
address=(connectionInfo.hostname, connectionInfo.port),
channel=connectionInfo.key,
connectionType=connectionInfo.mode.value,
insecure=connectionInfo.insecure,
)
def start(self) -> None:
if self._connectorThread and self._connectorThread.is_alive():
return
self.closed = False
self._running.set()
self._connectorThread = threading.Thread(
target=self._connector_loop,
name="CthulhuRemoteConnector",
daemon=True,
)
self._connectorThread.start()
def close(self) -> None:
self.transportClosing.notify()
self.closed = True
self._running.clear()
self._disconnect()
def register_inbound(self, messageType: RemoteMessageType, handler: Callable[..., Any]) -> None:
self._handlers.setdefault(messageType, Action()).register(handler)
def unregister_inbound(self, messageType: RemoteMessageType, handler: Callable[..., Any]) -> None:
if messageType in self._handlers:
self._handlers[messageType].unregister(handler)
def send(self, messageType: str | Enum, **payload: Any) -> None:
if not self.connected:
logger.debug("Dropping remote message while disconnected: %r", messageType)
return
self._sendQueue.put(self.serializer.serialize(messageType=messageType, **payload))
def _connector_loop(self) -> None:
while self._running.is_set():
try:
self._run_once()
except ssl.SSLCertVerificationError:
logger.warning("Cthulhu Remote certificate verification failed")
except OSError as error:
logger.info("Cthulhu Remote connection failed: %s", error)
except Exception:
logger.exception("Cthulhu Remote connector failed")
if self._running.is_set():
time.sleep(5)
def _run_once(self) -> None:
try:
self._socket = self._create_socket()
self._socket.connect(self.address)
except ssl.SSLCertVerificationError:
self._capture_failed_fingerprint()
self.transportCertificateAuthenticationFailed.notify()
raise
except Exception:
self.transportConnectionFailed.notify()
raise
self.connected = True
self.successfulConnects += 1
self.transportConnected.notify()
self._sendThread = threading.Thread(target=self._send_loop, name="CthulhuRemoteSend", daemon=True)
self._sendThread.start()
try:
while self._running.is_set() and self._socket is not None:
readers, _, errors = select.select([self._socket], [], [self._socket], 1.0)
if errors:
break
if readers:
self._read_available()
finally:
self.connected = False
self.transportDisconnected.notify()
self._disconnect()
def _create_socket(self, insecure: bool | None = None) -> ssl.SSLSocket:
insecure = self.insecure if insecure is None else insecure
host, port = self.address
family, socktype, proto, _, _ = socket.getaddrinfo(host, port)[0]
plainSocket = socket.socket(family, socktype, proto)
if self.timeout:
plainSocket.settimeout(self.timeout)
plainSocket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
plainSocket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
context = ssl._create_unverified_context() if insecure else ssl.create_default_context()
return context.wrap_socket(plainSocket, server_hostname=host)
def _capture_failed_fingerprint(self) -> None:
try:
socketForCert = self._create_socket(insecure=True)
socketForCert.connect(self.address)
certificate = socketForCert.getpeercert(binary_form=True)
socketForCert.close()
if certificate:
self.lastFailFingerprint = hashlib.sha256(certificate).hexdigest().lower()
except Exception:
self.lastFailFingerprint = None
def _send_join_messages(self) -> None:
self.send(RemoteMessageType.protocol_version, version=self.protocolVersion)
self.send(RemoteMessageType.join, channel=self.channel, connection_type=self.connectionType)
def _read_available(self) -> None:
if self._socket is None:
return
with self._socketLock:
self._socket.setblocking(False)
try:
data = self._buffer + self._socket.recv(16384)
except ssl.SSLWantReadError:
return
finally:
self._socket.setblocking(True)
self._buffer = b""
if not data:
self._disconnect()
return
while b"\n" in data:
line, _, data = data.partition(b"\n")
self._parse_line(line)
self._buffer = data
def _parse_line(self, line: bytes) -> None:
try:
message = self.serializer.deserialize(line)
messageType = RemoteMessageType(message.pop("type"))
except Exception:
logger.exception("Ignoring invalid remote message: %r", line)
return
action = self._handlers.get(messageType)
if action is None:
logger.debug("No handler registered for remote message type: %s", messageType.value)
return
GLib.idle_add(self._notify_action, action, message)
def _notify_action(self, action: Action, payload: dict[str, Any]) -> bool:
action.notify(**payload)
return False
def _send_loop(self) -> None:
while True:
item = self._sendQueue.get()
if item is None:
return
try:
with self._socketLock:
if self._socket is not None:
self._socket.sendall(item)
except OSError:
return
def _disconnect(self) -> None:
if self._sendThread is not None:
self._sendQueue.put(None)
self._sendThread.join(timeout=2)
self._sendThread = None
self._clear_send_queue()
if self._socket is not None:
try:
self._socket.close()
except OSError:
pass
self._socket = None
self._buffer = b""
def _clear_send_queue(self) -> None:
try:
while True:
self._sendQueue.get_nowait()
except queue.Empty:
pass
+1
View File
@@ -2,6 +2,7 @@
subdir('AIAssistant') subdir('AIAssistant')
subdir('ByeCthulhu') subdir('ByeCthulhu')
subdir('Clipboard') subdir('Clipboard')
subdir('CthulhuRemote')
subdir('DisplayVersion') subdir('DisplayVersion')
subdir('HelloCthulhu') subdir('HelloCthulhu')
subdir('GameMode') subdir('GameMode')
+5 -3
View File
@@ -3470,7 +3470,7 @@ class Script(script.Script):
return True return True
def presentMessage(self, fullMessage, briefMessage=None, voice=None, resetStyles=True, def presentMessage(self, fullMessage, briefMessage=None, voice=None, resetStyles=True,
force=False, interrupt=True): force=False, interrupt=False):
"""Convenience method to speak a message and 'flash' it in braille. """Convenience method to speak a message and 'flash' it in braille.
Arguments: Arguments:
@@ -3484,6 +3484,8 @@ class Script(script.Script):
the briefMessage should set briefMessage to an empty string. the briefMessage should set briefMessage to an empty string.
- voice: The voice to use when speaking this message. By default, the - voice: The voice to use when speaking this message. By default, the
"system" voice will be used. "system" voice will be used.
- interrupt: If True, any current speech should be interrupted
prior to speaking the new text. The default queues the message.
""" """
if not fullMessage: if not fullMessage:
@@ -3959,7 +3961,7 @@ class Script(script.Script):
voice = self.speechGenerator.voice(string=character) voice = self.speechGenerator.voice(string=character)
speech.speakCharacter(character, voice) speech.speakCharacter(character, voice)
def speakMessage(self, string, voice=None, interrupt=True, resetStyles=True, force=False): def speakMessage(self, string, voice=None, interrupt=False, resetStyles=True, force=False):
"""Method to speak a single string. Scripts should use this """Method to speak a single string. Scripts should use this
method rather than calling speech.speak directly. method rather than calling speech.speak directly.
@@ -3967,7 +3969,7 @@ class Script(script.Script):
- voice: The voice to use. By default, the "system" voice will - voice: The voice to use. By default, the "system" voice will
be used. be used.
- interrupt: If True, any current speech should be interrupted - interrupt: If True, any current speech should be interrupted
prior to speaking the new text. prior to speaking the new text. The default queues the message.
""" """
if not cthulhu.cthulhuApp.settingsManager.getSetting('enableSpeech') \ if not cthulhu.cthulhuApp.settingsManager.getSetting('enableSpeech') \
+70 -8
View File
@@ -215,7 +215,7 @@ class Script(default.Script):
self._clearSyntheticWebSelection() self._clearSyntheticWebSelection()
if oldContents: if oldContents:
self.speakContents(oldContents) self.speakContents(oldContents)
self.speakMessage(messages.TEXT_UNSELECTED, interrupt=False) self.speakMessage(messages.TEXT_UNSELECTED)
return True return True
self.pointOfReference["syntheticWebSelection"] = { self.pointOfReference["syntheticWebSelection"] = {
@@ -247,7 +247,7 @@ class Script(default.Script):
if deltaContents: if deltaContents:
self.speakContents(deltaContents) self.speakContents(deltaContents)
self.speakMessage(message, interrupt=False) self.speakMessage(message)
return True return True
@@ -1083,7 +1083,7 @@ class Script(default.Script):
"""Speaks the specified contents.""" """Speaks the specified contents."""
utterances = self.speechGenerator.generateContents(contents, **args) utterances = self.speechGenerator.generateContents(contents, **args)
speech.speak(utterances) speech.speak(utterances, interrupt=args.get("interrupt", False))
def sayCharacter(self, obj): def sayCharacter(self, obj):
"""Speaks the character at the current caret position.""" """Speaks the character at the current caret position."""
@@ -1511,7 +1511,10 @@ class Script(default.Script):
elif AXUtilities.is_menu(parent): elif AXUtilities.is_menu(parent):
self.utilities.setCaretContext(AXObject.get_parent(parent), -1) self.utilities.setCaretContext(AXObject.get_parent(parent), -1)
if not self._loadingDocumentContent: if not self._loadingDocumentContent:
self.presentMessage(messages.MODE_BROWSE) if inputEvent is not None:
self.presentMessage(messages.MODE_BROWSE, interrupt=True)
else:
self.presentMessage(messages.MODE_BROWSE)
if not self._shouldSuppressBrowseModeSound(obj, inputEvent): if not self._shouldSuppressBrowseModeSound(obj, inputEvent):
sound_theme_manager.getManager().playBrowseModeSound() sound_theme_manager.getManager().playBrowseModeSound()
else: else:
@@ -1521,7 +1524,10 @@ class Script(default.Script):
or inputEvent): or inputEvent):
self.utilities.grabFocus(obj) self.utilities.grabFocus(obj)
self.presentMessage(messages.MODE_FOCUS) if inputEvent is not None:
self.presentMessage(messages.MODE_FOCUS, interrupt=True)
else:
self.presentMessage(messages.MODE_FOCUS)
sound_theme_manager.getManager().playFocusModeSound() sound_theme_manager.getManager().playFocusModeSound()
self._inFocusMode = not self._inFocusMode self._inFocusMode = not self._inFocusMode
self._focusModeIsSticky = False self._focusModeIsSticky = False
@@ -1873,11 +1879,15 @@ class Script(default.Script):
debug.printMessage(debug.LEVEL_INFO, msg, True) debug.printMessage(debug.LEVEL_INFO, msg, True)
return True return True
if self.utilities.shouldInterruptForLocusOfFocusChange(oldFocus, newFocus, event):
self.presentationInterrupt()
args["interrupt"] = False
if contents: if contents:
self.speakContents(contents, **args) self.speakContents(contents, **args)
else: else:
utterances = self.speechGenerator.generateSpeech(newFocus, **args) utterances = self.speechGenerator.generateSpeech(newFocus, **args)
speech.speak(utterances) speech.speak(utterances, interrupt=False)
self._saveFocusedObjectInfo(newFocus) self._saveFocusedObjectInfo(newFocus)
@@ -2266,7 +2276,10 @@ class Script(default.Script):
tokens = ["WEB: Dumping cache and context: source is focus", tokens = ["WEB: Dumping cache and context: source is focus",
cthulhu_state.locusOfFocus] cthulhu_state.locusOfFocus]
debug.printTokens(debug.LEVEL_INFO, tokens, True) debug.printTokens(debug.LEVEL_INFO, tokens, True)
self.utilities.dumpCache(document, preserveContext=False) self.utilities.dumpCache(
document,
preserveContext=not AXObject.is_dead(event.source),
)
elif AXObject.is_dead(cthulhu_state.locusOfFocus): elif AXObject.is_dead(cthulhu_state.locusOfFocus):
msg = "WEB: Dumping cache: dead focus" msg = "WEB: Dumping cache: dead focus"
debug.printMessage(debug.LEVEL_INFO, msg, True) debug.printMessage(debug.LEVEL_INFO, msg, True)
@@ -2379,7 +2392,10 @@ class Script(default.Script):
tokens = ["WEB: Dumping cache and context: source is focus", tokens = ["WEB: Dumping cache and context: source is focus",
cthulhu_state.locusOfFocus] cthulhu_state.locusOfFocus]
debug.printTokens(debug.LEVEL_INFO, tokens, True) debug.printTokens(debug.LEVEL_INFO, tokens, True)
self.utilities.dumpCache(document, preserveContext=False) self.utilities.dumpCache(
document,
preserveContext=not AXObject.is_dead(event.source),
)
elif AXObject.is_dead(cthulhu_state.locusOfFocus): elif AXObject.is_dead(cthulhu_state.locusOfFocus):
msg = "WEB: Dumping cache: dead focus" msg = "WEB: Dumping cache: dead focus"
debug.printMessage(debug.LEVEL_INFO, msg, True) debug.printMessage(debug.LEVEL_INFO, msg, True)
@@ -2584,6 +2600,14 @@ class Script(default.Script):
else: else:
msg = "WEB: Search for caret context failed" msg = "WEB: Search for caret context failed"
debug.printMessage(debug.LEVEL_INFO, msg, True) debug.printMessage(debug.LEVEL_INFO, msg, True)
if AXUtilities.is_focusable(event.source) \
and AXUtilities.is_focused(event.source) \
and self.utilities.inDocumentContent(event.source):
msg = "WEB: Falling back to focused event source as caret context"
debug.printMessage(debug.LEVEL_INFO, msg, True)
cthulhu.setLocusOfFocus(event, event.source, False)
self.utilities.setCaretContext(event.source, 0)
obj, offset = event.source, 0
if self._lastCommandWasCaretNav: if self._lastCommandWasCaretNav:
msg = "WEB: Event ignored: Last command was caret nav" msg = "WEB: Event ignored: Last command was caret nav"
@@ -2649,6 +2673,44 @@ class Script(default.Script):
self._lastCommandWasMouseButton = True self._lastCommandWasMouseButton = True
return False return False
def onDescriptionChanged(self, event):
"""Callback for object:property-change:accessible-description events."""
if self.utilities.eventIsBrowserUINoise(event):
msg = "WEB: Ignoring event believed to be browser UI noise"
debug.printMessage(debug.LEVEL_INFO, msg, True)
return True
obj = event.source
if not self.utilities.inDocumentContent(obj):
return False
descriptions = self.pointOfReference.get('descriptions', {})
oldDescription = descriptions.get(hash(obj))
if oldDescription == event.any_data:
tokens = ["WEB: Old description (", oldDescription, ") is the same as new one"]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
return True
descriptions[hash(obj)] = event.any_data
self.pointOfReference['descriptions'] = descriptions
if obj != cthulhu_state.locusOfFocus:
msg = "WEB: Description change is for object other than locusOfFocus"
debug.printMessage(debug.LEVEL_INFO, msg, True)
return True
if not event.any_data:
return True
name = AXObject.get_name(obj)
if self.utilities.stringsAreRedundant(name, event.any_data):
tokens = ["WEB: Description change is redundant with name for", obj]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
return True
self.presentMessage(event.any_data)
return True
def onNameChanged(self, event): def onNameChanged(self, event):
"""Callback for object:property-change:accessible-name events.""" """Callback for object:property-change:accessible-name events."""
+18 -1
View File
@@ -276,6 +276,14 @@ class Utilities(script_utilities.Utilities):
def sanityCheckActiveWindow(self): def sanityCheckActiveWindow(self):
app = self._script.app app = self._script.app
if app is None:
app = AXObject.get_application(cthulhu_state.activeWindow) \
or AXObject.get_application(cthulhu_state.locusOfFocus)
if app is not None:
tokens = ["WEB: recovered script app for active window check:", app]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
self._script.app = app
if AXObject.get_parent(cthulhu_state.activeWindow) == app: if AXObject.get_parent(cthulhu_state.activeWindow) == app:
return True return True
@@ -299,6 +307,14 @@ class Utilities(script_utilities.Utilities):
setattr(self._script, attr, value) setattr(self._script, attr, value)
window = self.activeWindow(app) window = self.activeWindow(app)
if window is None:
tokens = [
"WARNING: WEB could not confirm a replacement active window; preserving",
cthulhu_state.activeWindow,
]
debug.printTokens(debug.LEVEL_INFO, tokens, True)
return cthulhu_state.activeWindow is not None
self._script.app = AXObject.get_application(window) self._script.app = AXObject.get_application(window)
tokens = ["WEB: updating script's app to", self._script.app] tokens = ["WEB: updating script's app to", self._script.app]
debug.printTokens(debug.LEVEL_INFO, tokens, True) debug.printTokens(debug.LEVEL_INFO, tokens, True)
@@ -5034,7 +5050,8 @@ class Utilities(script_utilities.Utilities):
debug.printMessage(debug.LEVEL_INFO, msg, True) debug.printMessage(debug.LEVEL_INFO, msg, True)
return False return False
if not AXObject.is_dead(cthulhu_state.locusOfFocus): if not AXObject.is_dead(cthulhu_state.locusOfFocus) \
and not self.isSameObject(replicant, cthulhu_state.locusOfFocus, True, True):
tokens = ["WEB: Not event from context replicant. locusOfFocus", tokens = ["WEB: Not event from context replicant. locusOfFocus",
cthulhu_state.locusOfFocus, "is not dead."] cthulhu_state.locusOfFocus, "is not dead."]
debug.printTokens(debug.LEVEL_INFO, tokens, True) debug.printTokens(debug.LEVEL_INFO, tokens, True)
+40 -7
View File
@@ -36,6 +36,7 @@ __license__ = "LGPL"
import importlib import importlib
import time import time
from contextlib import contextmanager
from typing import TYPE_CHECKING, Optional, List, Dict, Any, Union, Callable from typing import TYPE_CHECKING, Optional, List, Dict, Any, Union, Callable
from . import debug from . import debug
@@ -76,6 +77,8 @@ _timestamp: float = 0.0
# Optional callback for live monitoring of spoken text. # Optional callback for live monitoring of spoken text.
_monitorWriteTextCallback: Optional[Callable[[str], None]] = None _monitorWriteTextCallback: Optional[Callable[[str], None]] = None
_monitorWriteTextListeners: List[Callable[[str], None]] = []
_monitorSuppressionDepth = 0
def _isSpeechDispatcherFactory(moduleName: Optional[str]) -> bool: def _isSpeechDispatcherFactory(moduleName: Optional[str]) -> bool:
if not moduleName: if not moduleName:
@@ -325,15 +328,44 @@ def set_monitor_callbacks(writeText: Optional[Callable[[str], None]] = None) ->
global _monitorWriteTextCallback global _monitorWriteTextCallback
_monitorWriteTextCallback = writeText _monitorWriteTextCallback = writeText
def add_monitor_callback(writeText: Callable[[str], None]) -> None:
"""Adds a runtime callback for live speech monitoring."""
if writeText not in _monitorWriteTextListeners:
_monitorWriteTextListeners.append(writeText)
def remove_monitor_callback(writeText: Callable[[str], None]) -> None:
"""Removes a runtime callback for live speech monitoring."""
if writeText in _monitorWriteTextListeners:
_monitorWriteTextListeners.remove(writeText)
@contextmanager
def suppress_monitor_callbacks():
"""Temporarily suppresses live speech monitoring callbacks."""
global _monitorSuppressionDepth
_monitorSuppressionDepth += 1
try:
yield
finally:
_monitorSuppressionDepth = max(0, _monitorSuppressionDepth - 1)
def _write_to_monitor(text: str) -> None: def _write_to_monitor(text: str) -> None:
"""Writes text to the active speech monitor callback if set.""" """Writes text to the active speech monitor callback if set."""
if _monitorWriteTextCallback is None: if _monitorSuppressionDepth:
return return
try: callbacks = []
_monitorWriteTextCallback(text) if _monitorWriteTextCallback is not None:
except Exception: callbacks.append(_monitorWriteTextCallback)
debug.printException(debug.LEVEL_INFO) callbacks.extend(
callback for callback in _monitorWriteTextListeners
if callback != _monitorWriteTextCallback
)
for callback in callbacks:
try:
callback(text)
except Exception:
debug.printException(debug.LEVEL_INFO)
def __resolveACSS(acss: Optional[Any] = None) -> ACSS: def __resolveACSS(acss: Optional[Any] = None) -> ACSS:
if isinstance(acss, ACSS): if isinstance(acss, ACSS):
@@ -425,10 +457,11 @@ def _speak(text: str, acss: Optional[Any], interrupt: bool) -> None:
debug.printMessage(debug.LEVEL_INFO, msg, True) debug.printMessage(debug.LEVEL_INFO, msg, True)
_speechserver.speak(text, resolvedVoice, interrupt) # type: ignore _speechserver.speak(text, resolvedVoice, interrupt) # type: ignore
def speak(content: Union[str, List[Any]], acss: Optional[Any] = None, interrupt: bool = True) -> None: def speak(content: Union[str, List[Any]], acss: Optional[Any] = None, interrupt: bool = False) -> None:
"""Speaks the given content. The content can be either a simple """Speaks the given content. The content can be either a simple
string or an array of arrays of objects returned by a speech string or an array of arrays of objects returned by a speech
generator.""" generator. Speech queues by default; callers that need to cancel
current output should pass interrupt=True or call presentationInterrupt()."""
if settings.silenceSpeech: if settings.silenceSpeech:
return return
+1 -1
View File
@@ -632,7 +632,7 @@ class SpeechServer(speechserver.SpeechServer):
return families return families
def speak(self, text=None, acss=None, interrupt=True): def speak(self, text=None, acss=None, interrupt=False):
if not text: if not text:
return return
+5 -5
View File
@@ -935,9 +935,9 @@ class StructuralNavigation:
return return
if not isNext: if not isNext:
self._script.presentMessage(messages.WRAPPING_TO_BOTTOM) wrapMessage = messages.WRAPPING_TO_BOTTOM
else: else:
self._script.presentMessage(messages.WRAPPING_TO_TOP) wrapMessage = messages.WRAPPING_TO_TOP
matches = self._getAll(structuralNavigationObject, arg) matches = self._getAll(structuralNavigationObject, arg)
if not isNext: if not isNext:
@@ -946,6 +946,7 @@ class StructuralNavigation:
for match in matches: for match in matches:
if _isValidMatch(match): if _isValidMatch(match):
structuralNavigationObject.present(match, arg) structuralNavigationObject.present(match, arg)
self._script.presentMessage(wrapMessage)
return return
structuralNavigationObject.present(None, arg) structuralNavigationObject.present(None, arg)
@@ -2218,14 +2219,13 @@ class StructuralNavigation:
if settings.speakCellCoordinates: if settings.speakCellCoordinates:
[row, col] = self.getCellCoordinates(cell) [row, col] = self.getCellCoordinates(cell)
self._script.presentMessage( self._script.presentMessage(
messages.TABLE_CELL_COORDINATES % {"row": row + 1, "column": col + 1}, messages.TABLE_CELL_COORDINATES % {"row": row + 1, "column": col + 1}
interrupt=False,
) )
rowspan, colspan = self._script.utilities.rowAndColumnSpan(cell) rowspan, colspan = self._script.utilities.rowAndColumnSpan(cell)
spanString = messages.cellSpan(rowspan, colspan) spanString = messages.cellSpan(rowspan, colspan)
if spanString and settings.speakCellSpan: if spanString and settings.speakCellSpan:
self._script.presentMessage(spanString, interrupt=False) self._script.presentMessage(spanString)
######################## ########################
# # # #
@@ -0,0 +1,68 @@
import unittest
from unittest import mock
import gi
gi.require_version("Atspi", "2.0")
from gi.repository import Atspi
from cthulhu import ax_object
from cthulhu import cthulhu_state
from cthulhu import focus_manager
class ChromiumOmniboxRegressionTests(unittest.TestCase):
def tearDown(self):
cthulhu_state.activeWindow = None
cthulhu_state.locusOfFocus = None
def test_detects_chromium_popup_item_with_broken_ancestry(self):
popupItem = object()
with (
mock.patch.object(ax_object.AXObject, "is_dead", return_value=False),
mock.patch.object(ax_object.AXObject, "get_toolkit_name", return_value="chromium"),
mock.patch.object(ax_object.AXObject, "get_role", return_value=Atspi.Role.LIST_ITEM),
mock.patch.object(ax_object.AXObject, "get_parent", return_value=None),
mock.patch.object(ax_object.debug, "print_tokens"),
):
self.assertTrue(ax_object.AXObject.has_broken_popup_ancestry(popupItem))
def test_ignores_chromium_popup_item_with_valid_ancestry(self):
popupItem = object()
parent = object()
def get_role(obj):
if obj is popupItem:
return Atspi.Role.LIST_ITEM
return Atspi.Role.APPLICATION
with (
mock.patch.object(ax_object.AXObject, "is_dead", return_value=False),
mock.patch.object(ax_object.AXObject, "get_toolkit_name", return_value="chromium"),
mock.patch.object(ax_object.AXObject, "get_role", side_effect=get_role),
mock.patch.object(ax_object.AXObject, "get_parent", side_effect=[parent, None]),
):
self.assertFalse(ax_object.AXObject.has_broken_popup_ancestry(popupItem))
def test_preserves_active_window_when_chromium_popup_ancestry_is_broken(self):
activeWindow = object()
popupItem = object()
cthulhu_state.activeWindow = activeWindow
cthulhu_state.locusOfFocus = popupItem
controller = mock.Mock()
app = mock.Mock()
manager = None
with (
mock.patch.object(focus_manager.dbus_service, "get_remote_controller", return_value=controller),
mock.patch.object(focus_manager.AXObject, "has_broken_popup_ancestry", return_value=True),
):
manager = focus_manager.FocusManager(app)
manager.set_active_window(None)
self.assertIs(manager.get_active_window(), activeWindow)
self.assertIs(cthulhu_state.activeWindow, activeWindow)
if __name__ == "__main__":
unittest.main()
+129
View File
@@ -0,0 +1,129 @@
import sys
import types
import unittest
from pathlib import Path
from unittest import mock
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
input_event_manager_stub = types.ModuleType("cthulhu.input_event_manager")
input_event_manager_stub.get_manager = mock.Mock(return_value=mock.Mock())
sys.modules["cthulhu.input_event_manager"] = input_event_manager_stub
from cthulhu.plugin_system_manager import PluginSystemManager
from cthulhu import cthulhu_state
from cthulhu import speech
from cthulhu.plugins.CthulhuRemote.connection_info import ConnectionInfo, ConnectionMode
from cthulhu.plugins.CthulhuRemote.local_machine import LocalMachine
from cthulhu.plugins.CthulhuRemote.plugin import CthulhuRemote
from cthulhu.plugins.CthulhuRemote.protocol import RemoteMessageType
from cthulhu.plugins.CthulhuRemote.serializer import JSONSerializer
class CthulhuRemotePluginTests(unittest.TestCase):
def test_connection_info_accepts_nvda_remote_style_fields(self):
info = ConnectionInfo.from_url("cthulhuremote://example.com:1234?key=abc&mode=slave")
self.assertEqual(info.hostname, "example.com")
self.assertEqual(info.port, 1234)
self.assertEqual(info.key, "abc")
self.assertEqual(info.mode, ConnectionMode.SLAVE)
self.assertEqual(
info.get_url_to_connect(),
"cthulhuremote://example.com:1234?key=abc&mode=master",
)
def test_json_serializer_uses_remote_message_type_values(self):
serializer = JSONSerializer()
payload = serializer.deserialize(serializer.serialize(RemoteMessageType.join, channel="abc"))
self.assertEqual(payload["type"], "join")
self.assertEqual(payload["channel"], "abc")
def test_local_machine_maps_common_windows_vk_codes_to_keyvals(self):
machine = LocalMachine(lambda message: None)
self.assertEqual(machine._resolve_keyval(0x41, None), machine._resolve_keyval(None, "a"))
self.assertEqual(machine._resolve_keyval(0x70, None), machine._resolve_keyval(None, "F1"))
self.assertIsNone(machine._resolve_keyval(0xFF, None))
def test_speech_monitor_callbacks_are_additive(self):
primary = mock.Mock()
listener = mock.Mock()
speech.set_monitor_callbacks(writeText=primary)
speech.add_monitor_callback(listener)
self.addCleanup(speech.set_monitor_callbacks, None)
self.addCleanup(speech.remove_monitor_callback, listener)
speech._write_to_monitor("status")
primary.assert_called_once_with("status")
listener.assert_called_once_with("status")
def test_remote_speech_does_not_echo_to_monitor_callbacks(self):
listener = mock.Mock()
speech.add_monitor_callback(listener)
self.addCleanup(speech.remove_monitor_callback, listener)
with (
mock.patch.object(speech, "_speechserver", None),
mock.patch.object(speech.speech_history, "add"),
mock.patch.object(cthulhu_state, "activeScript", None),
):
LocalMachine(lambda message: None).speak(["remote", "status"])
listener.assert_not_called()
def test_slave_mode_forwards_local_speech_to_relay(self):
plugin = CthulhuRemote()
plugin._connectionInfo = ConnectionInfo(
hostname="example.com",
port=1234,
key="abc",
mode=ConnectionMode.SLAVE,
)
plugin._transport = mock.Mock()
plugin._transport.connected = True
plugin._send_local_speech("focused button")
plugin._transport.send.assert_called_once_with(
RemoteMessageType.speak,
sequence=["focused button"],
)
def test_master_mode_does_not_forward_local_speech_to_relay(self):
plugin = CthulhuRemote()
plugin._connectionInfo = ConnectionInfo(
hostname="example.com",
port=1234,
key="abc",
mode=ConnectionMode.MASTER,
)
plugin._transport = mock.Mock()
plugin._transport.connected = True
plugin._send_local_speech("local status")
plugin._transport.send.assert_not_called()
@mock.patch("cthulhu.plugin_system_manager.dbus_service.get_remote_controller")
def test_plugin_manager_can_load_cthulhu_remote(self, remote_controller):
remote_controller.return_value = mock.Mock()
app = mock.Mock()
app.getSignalManager.return_value = mock.Mock()
app.getAPIHelper.return_value = None
manager = PluginSystemManager(app)
manager.rescanPlugins()
plugin_info = manager._resolve_plugin_info("CthulhuRemote")
self.assertIsNotNone(plugin_info)
self.assertTrue(manager.loadPlugin(plugin_info))
self.assertEqual(plugin_info.instance.__class__.__name__, "CthulhuRemote")
self.assertTrue(manager.unloadPlugin(plugin_info))
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,91 @@
import sys
import unittest
from pathlib import Path
from unittest import mock
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
from cthulhu import settings
from cthulhu import speech
from cthulhu import cthulhu_state
from cthulhu.scripts import default
class SpeechDefaultPolicyRegressionTests(unittest.TestCase):
def _make_settings_manager(self):
values = {
"enableSpeech": True,
"onlySpeakDisplayedText": False,
"messagesAreDetailed": True,
"enableBraille": False,
"enableBrailleMonitor": False,
"enableFlashMessages": False,
"voices": {settings.SYSTEM_VOICE: "system"},
"capitalizationStyle": settings.CAPITALIZATION_STYLE_NONE,
"verbalizePunctuationStyle": settings.PUNCTUATION_STYLE_NONE,
}
manager = mock.Mock()
manager.getSetting.side_effect = values.get
return manager
def _make_script(self):
testScript = default.Script.__new__(default.Script)
testScript.speechAndVerbosityManager = mock.Mock()
return testScript
def test_speech_speak_queues_by_default(self):
server = mock.Mock()
with (
mock.patch.object(speech, "_speechserver", server),
mock.patch.object(speech, "_write_to_monitor"),
mock.patch.object(speech.speech_history, "add"),
mock.patch.object(cthulhu_state, "activeScript", None),
):
speech.speak("status")
server.speak.assert_called_once()
self.assertFalse(server.speak.call_args.args[2])
def test_speech_speak_explicit_interrupt_is_preserved(self):
server = mock.Mock()
with (
mock.patch.object(speech, "_speechserver", server),
mock.patch.object(speech, "_write_to_monitor"),
mock.patch.object(speech.speech_history, "add"),
mock.patch.object(cthulhu_state, "activeScript", None),
):
speech.speak("status", interrupt=True)
server.speak.assert_called_once()
self.assertTrue(server.speak.call_args.args[2])
def test_present_message_queues_by_default(self):
testScript = self._make_script()
manager = self._make_settings_manager()
with (
mock.patch.object(default.cthulhu.cthulhuApp, "settingsManager", manager),
mock.patch.object(default.speech, "speak") as speak,
):
default.Script.presentMessage(testScript, "Status")
speak.assert_called_once_with("Status", "system", False)
def test_present_message_explicit_interrupt_is_preserved(self):
testScript = self._make_script()
manager = self._make_settings_manager()
with (
mock.patch.object(default.cthulhu.cthulhuApp, "settingsManager", manager),
mock.patch.object(default.speech, "speak") as speak,
):
default.Script.presentMessage(testScript, "Status", interrupt=True)
speak.assert_called_once_with("Status", "system", True)
if __name__ == "__main__":
unittest.main()
@@ -30,6 +30,14 @@ class SpeechDispatcherInterruptRegressionTests(unittest.TestCase):
server._cancel.assert_called_once_with() server._cancel.assert_called_once_with()
server._speak.assert_called_once_with("long utterance", None) server._speak.assert_called_once_with("long utterance", None)
def test_string_speech_queues_by_default(self):
server = self._make_server()
server.speak("next")
server._cancel.assert_not_called()
server._speak.assert_called_once_with("next", None)
def test_recent_key_echo_suppresses_backend_cancel(self): def test_recent_key_echo_suppresses_backend_cancel(self):
server = self._make_server() server = self._make_server()
server._lastKeyEchoTime = time.time() server._lastKeyEchoTime = time.time()
@@ -38,8 +38,47 @@ class StructuralNavigationTableRegressionTests(unittest.TestCase):
navigator._presentObject.assert_called_once_with("cell", 0) navigator._presentObject.assert_called_once_with("cell", 0)
navigator._script.presentMessage.assert_called_once_with( navigator._script.presentMessage.assert_called_once_with(
messages.TABLE_CELL_COORDINATES % {"row": 2, "column": 3}, messages.TABLE_CELL_COORDINATES % {"row": 2, "column": 3}
interrupt=False, )
def test_wrapping_announcement_does_not_interrupt_wrapped_object(self):
navigator = structural_navigation.StructuralNavigation.__new__(
structural_navigation.StructuralNavigation
)
script = mock.Mock()
script.utilities.isZombie.return_value = False
script.utilities.isHidden.return_value = False
script.utilities.isEmpty.return_value = False
script.utilities.pathComparison.return_value = 0
navigator._script = script
first = object()
current = object()
structuralNavigationObject = mock.Mock()
structuralNavigationObject.predicate = None
events = []
structuralNavigationObject.present.side_effect = lambda obj, arg=None: events.append(
("present", obj, arg)
)
script.presentMessage.side_effect = lambda message, **kwargs: events.append(
("message", message, kwargs)
)
navigator._getAll = mock.Mock(return_value=[first, current])
with (
mock.patch.object(structural_navigation.settings, "wrappedStructuralNavigation", True),
mock.patch.object(structural_navigation.AXObject, "is_dead", return_value=False),
mock.patch.object(structural_navigation.AXObject, "get_parent", return_value=None),
mock.patch.object(structural_navigation.AXObject, "get_path", side_effect=lambda obj: [id(obj)]),
):
navigator.goObject(structuralNavigationObject, True, current, "arg")
self.assertEqual(
events,
[
("present", first, "arg"),
("message", messages.WRAPPING_TO_TOP, {}),
],
) )
+334 -2
View File
@@ -100,6 +100,338 @@ class WebKeyGrabRegressionTests(unittest.TestCase):
testScript.refreshKeyGrabs.assert_called_once_with() testScript.refreshKeyGrabs.assert_called_once_with()
class WebActiveWindowRegressionTests(unittest.TestCase):
def test_sanity_check_recovers_missing_script_app_from_active_window(self):
testScript = mock.Mock(app=None)
utilities = web_script_utilities.Utilities.__new__(web_script_utilities.Utilities)
utilities._script = testScript
activeWindow = object()
app = object()
with (
mock.patch.object(web_script_utilities.cthulhu_state, "activeWindow", activeWindow),
mock.patch.object(web_script_utilities.cthulhu_state, "locusOfFocus", None),
mock.patch.object(
web_script_utilities.AXObject,
"get_application",
side_effect=lambda obj: app if obj is activeWindow else None,
),
mock.patch.object(
web_script_utilities.AXObject,
"get_parent",
side_effect=lambda obj: app if obj is activeWindow else None,
),
mock.patch.object(web_script_utilities.cthulhu, "setActiveWindow") as setActiveWindow,
):
result = web_script_utilities.Utilities.sanityCheckActiveWindow(utilities)
self.assertTrue(result)
self.assertIs(testScript.app, app)
setActiveWindow.assert_not_called()
def test_sanity_check_preserves_current_window_when_recovery_is_inconclusive(self):
oldApp = object()
testScript = mock.Mock(app=oldApp)
utilities = web_script_utilities.Utilities.__new__(web_script_utilities.Utilities)
utilities._script = testScript
activeWindow = object()
with (
mock.patch.object(web_script_utilities.cthulhu_state, "activeWindow", activeWindow),
mock.patch.object(
web_script_utilities.AXObject,
"get_parent",
return_value=object(),
),
mock.patch.object(utilities, "activeWindow", return_value=None),
mock.patch.object(web_script_utilities.cthulhu, "setActiveWindow") as setActiveWindow,
):
result = web_script_utilities.Utilities.sanityCheckActiveWindow(utilities)
self.assertTrue(result)
self.assertIs(testScript.app, oldApp)
setActiveWindow.assert_not_called()
class WebPresentationModeSpeechRegressionTests(unittest.TestCase):
def _make_script(self, inFocusMode):
testScript = web_script.Script.__new__(web_script.Script)
testScript._inFocusMode = inFocusMode
testScript._focusModeIsSticky = True
testScript._browseModeIsSticky = True
testScript._loadingDocumentContent = False
testScript._lastCommandWasCaretNav = False
testScript._lastCommandWasStructNav = False
testScript.utilities = mock.Mock()
testScript.utilities.getCaretContext.return_value = ("button", 0)
testScript.utilities.grabFocusWhenSettingCaret.return_value = True
testScript.presentMessage = mock.Mock()
testScript.refreshKeyGrabs = mock.Mock()
testScript._shouldSuppressBrowseModeSound = mock.Mock(return_value=False)
return testScript
def test_automatic_browse_mode_announcement_does_not_interrupt_focus_presentation(self):
testScript = self._make_script(inFocusMode=True)
soundManager = mock.Mock()
with (
mock.patch.object(web_script.AXObject, "get_parent", return_value=None),
mock.patch.object(web_script.AXUtilities, "is_list_box", return_value=False),
mock.patch.object(web_script.AXUtilities, "is_menu", return_value=False),
mock.patch.object(web_script.sound_theme_manager, "getManager", return_value=soundManager),
):
web_script.Script.togglePresentationMode(testScript, None, "document")
testScript.presentMessage.assert_called_once_with(messages.MODE_BROWSE)
soundManager.playBrowseModeSound.assert_called_once_with()
self.assertFalse(testScript._inFocusMode)
self.assertFalse(testScript._focusModeIsSticky)
self.assertFalse(testScript._browseModeIsSticky)
testScript.refreshKeyGrabs.assert_called_once_with()
def test_automatic_focus_mode_announcement_does_not_interrupt_focus_presentation(self):
testScript = self._make_script(inFocusMode=False)
soundManager = mock.Mock()
with mock.patch.object(
web_script.sound_theme_manager,
"getManager",
return_value=soundManager,
):
web_script.Script.togglePresentationMode(testScript, None, "document")
testScript.presentMessage.assert_called_once_with(messages.MODE_FOCUS)
soundManager.playFocusModeSound.assert_called_once_with()
self.assertTrue(testScript._inFocusMode)
self.assertFalse(testScript._focusModeIsSticky)
self.assertFalse(testScript._browseModeIsSticky)
testScript.refreshKeyGrabs.assert_called_once_with()
def test_manual_presentation_mode_toggle_still_interrupts(self):
testScript = self._make_script(inFocusMode=False)
with mock.patch.object(web_script.sound_theme_manager, "getManager", return_value=mock.Mock()):
web_script.Script.togglePresentationMode(testScript, object(), "document")
testScript.presentMessage.assert_called_once_with(messages.MODE_FOCUS, interrupt=True)
class WebFocusSpeechInterruptionRegressionTests(unittest.TestCase):
def _make_script(self):
testScript = web_script.Script.__new__(web_script.Script)
testScript._navSuspended = False
testScript._lastCommandWasCaretNav = False
testScript._lastCommandWasStructNav = False
testScript._lastCommandWasMouseButton = False
testScript._inFocusMode = True
testScript._focusModeIsSticky = True
testScript._browseModeIsSticky = False
testScript.flatReviewPresenter = mock.Mock()
testScript.flatReviewPresenter.is_active.return_value = False
testScript.utilities = mock.Mock()
testScript.utilities.isZombie.return_value = False
testScript.utilities.isDocument.return_value = False
testScript.utilities.getTopLevelDocumentForObject.return_value = "document"
testScript.utilities.inFindContainer.return_value = False
testScript.utilities.queryNonEmptyText.return_value = None
testScript.utilities.isContentEditableWithEmbeddedObjects.return_value = False
testScript.utilities.isAnchor.return_value = False
testScript.utilities.lastInputEventWasPageNav.return_value = False
testScript.utilities.isFocusedWithMathChild.return_value = False
testScript.utilities.caretMovedToSamePageFragment.return_value = False
testScript.utilities.lastInputEventWasLineNav.return_value = False
testScript.utilities.inDocumentContent.return_value = True
testScript.utilities.shouldInterruptForLocusOfFocusChange.return_value = True
testScript.speechGenerator = mock.Mock()
testScript.speechGenerator.generateSpeech.return_value = ["focus speech"]
testScript.updateBraille = mock.Mock()
testScript.presentationInterrupt = mock.Mock()
testScript._saveFocusedObjectInfo = mock.Mock()
testScript.refreshKeyGrabs = mock.Mock()
return testScript
def test_focus_generated_speech_uses_explicit_interrupt_then_appends(self):
testScript = self._make_script()
oldFocus = object()
newFocus = object()
event = mock.Mock(type="object:state-changed:focused", source=newFocus)
with (
mock.patch.object(web_script.AXObject, "is_dead", return_value=False),
mock.patch.object(web_script.AXUtilities, "is_unknown_or_redundant", return_value=False),
mock.patch.object(web_script.AXUtilities, "is_heading", return_value=False),
mock.patch.object(web_script.speech, "speak") as speak,
mock.patch.object(web_script.cthulhu, "emitRegionChanged"),
):
result = web_script.Script.locus_of_focus_changed(
testScript, event, oldFocus, newFocus
)
self.assertTrue(result)
testScript.presentationInterrupt.assert_called_once_with()
speak.assert_called_once_with(["focus speech"], interrupt=False)
class WebDescriptionChangeRegressionTests(unittest.TestCase):
def _make_script(self):
testScript = web_script.Script.__new__(web_script.Script)
testScript.pointOfReference = {}
testScript.utilities = mock.Mock()
testScript.utilities.eventIsBrowserUINoise.return_value = False
testScript.utilities.inDocumentContent.return_value = True
testScript.utilities.stringsAreRedundant.side_effect = (
lambda first, second: first == second
)
testScript.presentMessage = mock.Mock()
return testScript
def test_document_description_change_matching_name_is_ignored(self):
testScript = self._make_script()
source = object()
event = mock.Mock(source=source, any_data="More")
with (
mock.patch.object(web_script.cthulhu_state, "locusOfFocus", source),
mock.patch.object(web_script.AXObject, "get_name", return_value="More"),
):
result = web_script.Script.onDescriptionChanged(testScript, event)
self.assertTrue(result)
testScript.presentMessage.assert_not_called()
def test_document_description_change_appends_to_focus_presentation(self):
testScript = self._make_script()
source = object()
event = mock.Mock(source=source, any_data="Helpful tooltip")
with (
mock.patch.object(web_script.cthulhu_state, "locusOfFocus", source),
mock.patch.object(web_script.AXObject, "get_name", return_value="Button"),
):
result = web_script.Script.onDescriptionChanged(testScript, event)
self.assertTrue(result)
testScript.presentMessage.assert_called_once_with("Helpful tooltip")
class WebDynamicContentRecoveryRegressionTests(unittest.TestCase):
def _make_dynamic_script(self):
testScript = web_script.Script.__new__(web_script.Script)
testScript._loadingDocumentContent = False
testScript._lastCommandWasCaretNav = False
testScript._lastCommandWasStructNav = False
testScript._lastMouseButtonContext = (None, -1)
testScript.lastMouseRoutingTime = 0
testScript.utilities = mock.Mock()
testScript.utilities.eventIsBrowserUINoise.return_value = False
testScript.utilities.isLiveRegion.return_value = False
testScript.utilities.getTopLevelDocumentForObject.return_value = "document"
testScript.utilities.isZombie.return_value = False
testScript.utilities.handleEventFromContextReplicant.return_value = False
testScript.utilities.handleEventForRemovedChild.return_value = False
testScript.utilities.inDocumentContent.return_value = True
return testScript
def test_children_added_to_live_focus_preserves_caret_context(self):
testScript = self._make_dynamic_script()
focus = object()
event = mock.Mock(source=focus, any_data=object())
with (
mock.patch.object(web_script.cthulhu_state, "locusOfFocus", focus),
mock.patch.object(web_script.AXObject, "is_dead", return_value=False),
mock.patch.object(web_script.AXUtilities, "is_busy", return_value=False),
mock.patch.object(web_script.AXUtilities, "is_alert", return_value=False),
):
result = web_script.Script.onChildrenAdded(testScript, event)
self.assertFalse(result)
testScript.utilities.dumpCache.assert_called_once_with(
"document",
preserveContext=True,
)
def test_children_removed_from_live_focus_preserves_caret_context(self):
testScript = self._make_dynamic_script()
focus = object()
event = mock.Mock(source=focus, any_data=object())
with (
mock.patch.object(web_script.cthulhu_state, "locusOfFocus", focus),
mock.patch.object(web_script.AXObject, "is_dead", return_value=False),
):
result = web_script.Script.onChildrenRemoved(testScript, event)
self.assertFalse(result)
testScript.utilities.dumpCache.assert_called_once_with(
"document",
preserveContext=True,
)
def test_focused_event_source_becomes_context_when_search_fails(self):
testScript = self._make_dynamic_script()
source = object()
oldFocus = object()
event = mock.Mock(detail1=1, source=source)
testScript._lastCommandWasCaretNav = True
testScript.utilities.getDocumentForObject.return_value = "document"
testScript.utilities.isWebAppDescendant.return_value = False
testScript.utilities.handleEventFromContextReplicant.return_value = False
testScript.utilities.getCaretContext.return_value = (None, -1)
testScript.utilities.searchForCaretContext.return_value = (None, -1)
testScript.utilities.inFindContainer.return_value = False
with (
mock.patch.object(web_script.cthulhu_state, "locusOfFocus", oldFocus),
mock.patch.object(web_script.AXUtilities, "is_editable", return_value=False),
mock.patch.object(web_script.AXUtilities, "is_dialog_or_alert", return_value=False),
mock.patch.object(web_script.AXUtilities, "is_focusable", return_value=True),
mock.patch.object(web_script.AXUtilities, "is_focused", return_value=True),
mock.patch.object(web_script.cthulhu, "setLocusOfFocus") as setLocusOfFocus,
):
result = web_script.Script.onFocusedChanged(testScript, event)
self.assertTrue(result)
setLocusOfFocus.assert_called_once_with(event, source, False)
testScript.utilities.setCaretContext.assert_called_once_with(source, 0)
class WebContextReplicantRegressionTests(unittest.TestCase):
def test_same_object_replicant_can_recover_before_old_focus_is_dead(self):
utilities = web_script_utilities.Utilities.__new__(web_script_utilities.Utilities)
document = object()
parent = object()
oldFocus = object()
replicant = object()
event = mock.Mock()
utilities._caretContexts = {hash(parent): (oldFocus, 0)}
utilities.getCaretContextPathRoleAndName = mock.Mock(
return_value=([1, 2, 3], "button", "storm, microphone on")
)
utilities.documentFrame = mock.Mock(return_value=document)
utilities.isSameObject = mock.Mock(return_value=True)
utilities.setCaretContext = mock.Mock()
with (
mock.patch.object(web_script_utilities.cthulhu_state, "locusOfFocus", oldFocus),
mock.patch.object(web_script_utilities.AXObject, "is_dead", return_value=False),
mock.patch.object(web_script_utilities.AXObject, "get_path", return_value=[1, 2, 3]),
mock.patch.object(web_script_utilities.AXObject, "get_role", return_value="button"),
mock.patch.object(web_script_utilities.AXObject, "get_name", return_value="storm, microphone on"),
mock.patch.object(web_script_utilities.AXObject, "get_parent", return_value=parent),
mock.patch.object(web_script_utilities.cthulhu, "setLocusOfFocus") as setLocusOfFocus,
):
result = web_script_utilities.Utilities.handleEventFromContextReplicant(
utilities,
event,
replicant,
)
self.assertTrue(result)
setLocusOfFocus.assert_called_once_with(event, replicant, False)
utilities.setCaretContext.assert_called_once_with(replicant, 0, document)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
@@ -164,7 +496,7 @@ class WebSelectionRegressionTests(unittest.TestCase):
testScript.utilities.setCaretContext.assert_called_once_with(section, 1, document) testScript.utilities.setCaretContext.assert_called_once_with(section, 1, document)
setLocusOfFocus.assert_called_once_with(event, section, False, True) setLocusOfFocus.assert_called_once_with(event, section, False, True)
testScript.speakContents.assert_called_once_with([[section, 0, 1, "D"]]) testScript.speakContents.assert_called_once_with([[section, 0, 1, "D"]])
testScript.speakMessage.assert_called_once_with(messages.TEXT_SELECTED, interrupt=False) testScript.speakMessage.assert_called_once_with(messages.TEXT_SELECTED)
self.assertEqual( self.assertEqual(
testScript.pointOfReference["syntheticWebSelection"]["string"], testScript.pointOfReference["syntheticWebSelection"]["string"],
"D", "D",
@@ -200,7 +532,7 @@ class WebSelectionRegressionTests(unittest.TestCase):
testScript.utilities.setCaretContext.assert_called_once_with(link, 0, document) testScript.utilities.setCaretContext.assert_called_once_with(link, 0, document)
setLocusOfFocus.assert_called_once_with(event, link, False, True) setLocusOfFocus.assert_called_once_with(event, link, False, True)
testScript.speakContents.assert_called_once_with([[link, 0, 1, "S"]]) testScript.speakContents.assert_called_once_with([[link, 0, 1, "S"]])
testScript.speakMessage.assert_called_once_with(messages.TEXT_SELECTED, interrupt=False) testScript.speakMessage.assert_called_once_with(messages.TEXT_SELECTED)
self.assertEqual( self.assertEqual(
testScript.pointOfReference["syntheticWebSelection"]["string"], testScript.pointOfReference["syntheticWebSelection"]["string"],
"S", "S",