Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| dfef847fb5 | |||
| b08f7095cb | |||
| 85d30d331a | |||
| bbef7473ee | |||
| f1ed2ce085 | |||
| 43996e7b8c | |||
| 6cd745dc57 | |||
| 6ecc775c6d |
@@ -24,6 +24,54 @@ This repository is a screen reader. Prioritize accessibility, correctness, and s
|
||||
- If repo and installed behavior differ, prefer rebuilding with `./build-local.sh` over patching the installed package directly.
|
||||
- Treat direct edits under `~/.local/.../cthulhu/` as an exception path that requires explicit user approval.
|
||||
|
||||
## Bug intake
|
||||
- Assume reporters may have little or no development experience. Ask for user-visible steps and guide them through logs or diagnostic commands without expecting them to identify the subsystem or use technical terminology.
|
||||
- Capture the application or browser, page URL when relevant, desktop/session type, exact steps, expected result, actual result, and whether the problem is reproducible elsewhere.
|
||||
- Separate keyboard focus from speech output: ask whether focus fails to move, moves but is not announced, or is reachable through another navigation method.
|
||||
- For web issues, determine whether the behavior is site-specific, browser-specific, or reproducible across similar controls before changing code.
|
||||
- When a log is needed, guide the reporter through launching the locally installed build with `~/.local/bin/cthulhu --debug-file /tmp/cthulhu.log --debug`, reproducing the problem once, exiting Cthulhu, and providing `/tmp/cthulhu.log` for review.
|
||||
- If a reporter still sees the same problem after a fix, verify that the refreshed build is actually installed and running before assuming the patch failed. Guide the reporter through the check rather than assuming they know how the local install works.
|
||||
|
||||
## Contribution workflow (including generated patches)
|
||||
- Assume contributors may use code-generation tools without understanding every changed line. Review the resulting code, not the contributor's confidence in it.
|
||||
- Start from a reproducible user-visible problem. Complete the relevant bug-intake steps before changing code.
|
||||
- Investigate the full confirmed behavior class before implementing a fix. For example, a stale-focus bug seen in one browser may also affect other applications or empty workspaces.
|
||||
- Prefer the smallest root-cause fix that covers the full confirmed behavior class. Do not add app-specific exceptions, desktop-specific branches, compatibility fallbacks, or broad refactors unless the evidence requires them.
|
||||
- Add or update automated regression tests for the general behavior and the originally reported workflow whenever practical.
|
||||
- Read every generated diff before committing. Remove unrelated rewrites, speculative cleanup, dead code, debug leftovers, and generated files that are not required by the fix.
|
||||
- Never commit a behavior change solely because it compiles or because an automated tool says it works. Verify the real user-facing workflow after rebuilding the installed copy.
|
||||
|
||||
## Verification checklist before commit
|
||||
Run the narrowest relevant checks first, then broaden testing based on the affected behavior:
|
||||
- For documentation-only changes, diff inspection is sufficient unless the edited documentation includes commands or generated content that need validation.
|
||||
|
||||
1. Inspect the diff:
|
||||
- `git diff --check`
|
||||
- `git status --short`
|
||||
- `git diff --stat`
|
||||
- `git diff -- <changed files>`
|
||||
2. Run syntax checks for each changed Python file:
|
||||
- `python -m py_compile <changed .py files>`
|
||||
3. Run focused automated regression tests:
|
||||
- `python -m unittest <relevant test modules>`
|
||||
- For shared input, focus, script lifecycle, settings, plugin loading, or installation changes, run `./test-local.sh` after the focused tests pass.
|
||||
4. For code or behavior changes, rebuild the local installed copy:
|
||||
- `./build-local.sh`
|
||||
5. For code or behavior changes, confirm the runtime import resolves to the refreshed local install:
|
||||
- `python - <<'PY'`
|
||||
- `import importlib.util`
|
||||
- `print(importlib.util.find_spec("cthulhu").origin)`
|
||||
- `PY`
|
||||
6. For code or behavior changes, reproduce the original user-visible workflow against the rebuilt copy.
|
||||
7. For focus, keyboard, or window-tracking changes, manually test closely related regressions:
|
||||
- Xorg and the user's active window manager or desktop.
|
||||
- Switching among browser content, terminal windows, GTK applications, dialogs, and empty workspaces.
|
||||
- Returning to the original application after each switch.
|
||||
- Cthulhu shortcuts, structural navigation, flat review, and any delegated key handling such as Fenrir in XTerm.
|
||||
- Both key press and key release behavior for modifiers, NumLock, and keypad keys when relevant.
|
||||
- Clean shutdown without crashing the browser or leaving grabs behind.
|
||||
8. Record what was tested, what could not be tested locally, and any remaining uncertainty in the commit message or review notes.
|
||||
|
||||
## Platform support stance
|
||||
- **critical** Robust Xorg support is required and is a merge gate for Cthulhu.
|
||||
- Wayland support is desirable, but it is secondary to keeping Xorg stable and usable.
|
||||
@@ -45,7 +93,7 @@ This repository is a screen reader. Prioritize accessibility, correctness, and s
|
||||
- Screen-reader-first UX: assume non-visual navigation.
|
||||
- No keyboard traps; Tab/Shift+Tab must move through all controls.
|
||||
- Use clear, complete labels (e.g., “Confirm Password”, not “Confirm”).
|
||||
- **No Speech-Dispatcher usage in GUI apps** (no `spd-say`); rely on the accessibility API.
|
||||
- Do not invoke `spd-say` or add direct speech output to GUI dialogs. Expose accessible UI state and let the active screen reader announce it.
|
||||
|
||||
### Python GTK (Gtk 3)
|
||||
- Associate labels with controls (mnemonics + buddy widget):
|
||||
@@ -54,11 +102,6 @@ This repository is a screen reader. Prioritize accessibility, correctness, and s
|
||||
- For `Gtk.TextView`, call `set_accepts_tab(False)` so Tab moves focus.
|
||||
- For custom widgets, set accessible name/role via ATK where applicable.
|
||||
|
||||
### PySide6 / Qt
|
||||
- Set `setAccessibleName()` on all widgets.
|
||||
- Associate labels via `setBuddy()`.
|
||||
- Use `setAccessibleDescription()` when extra context is needed.
|
||||
|
||||
## Shell script rules
|
||||
- **Bash variables must be `camelCase`** (except system env vars like `ACCESSIBILITY_ENABLED`).
|
||||
- If you edit a `#!/usr/bin/env bash`, `#!/bin/bash`, or POSIX `sh` script, run `shellcheck` and fix issues.
|
||||
@@ -67,13 +110,20 @@ This repository is a screen reader. Prioritize accessibility, correctness, and s
|
||||
## Plugins (Cthulhu)
|
||||
- System plugins live in `src/cthulhu/plugins/`.
|
||||
- Follow existing plugin patterns (keybinding registration, GTK dialog/window patterns, debug logging).
|
||||
- Ensure plugin install integration is wired via Meson (`src/cthulhu/plugins/meson.build` + plugin subdir `meson.build`).
|
||||
|
||||
## Meson install reminder (important)
|
||||
- If you add new Python modules under `src/cthulhu/`, update `src/cthulhu/meson.build` so they get installed (otherwise imports can fail after install).
|
||||
- If you add a new plugin directory, update `src/cthulhu/plugins/meson.build` and add a `meson.build` in the plugin directory.
|
||||
|
||||
## Settings policy (important)
|
||||
- Keep persistent settings in Cthulhu TOML files through the existing settings manager and TOML backend.
|
||||
- Do not add `gsettings`, `dconf`, or `Gio.Settings` dependencies or fallback paths.
|
||||
|
||||
## Dependency synchronization (important)
|
||||
- This repository does not use `requirements.txt`. When adding, removing, or changing a runtime, optional, or build dependency, keep the relevant dependency declarations and documentation synchronized.
|
||||
- Check `pyproject.toml`, `meson.build`, the dependency sections in `README.md` and `README-DEVELOPMENT.md`, and each applicable distro package recipe under `distro-packages/`.
|
||||
- In particular, update both Arch Linux `PKGBUILD` files and, when relevant, Slint's `cthulhu-info` and `README`. Review the Slackware and Slint build files for any corresponding build-dependency changes.
|
||||
- Preserve the distinction between required, optional, and build-only dependencies so package builds install what Cthulhu actually needs without forcing optional features on every user.
|
||||
|
||||
## Common Cthulhu agent mistakes
|
||||
- Checking the import origin, seeing `~/.local/...`, and then editing the installed package instead of the repo.
|
||||
- Forgetting that `./build-local.sh` is the normal way to apply repo changes into the installed copy for testing.
|
||||
- Making repo fixes and then diagnosing the old installed copy without rebuilding.
|
||||
- Do not edit or diagnose a stale installed copy under `~/.local/...`; update the repo and rebuild with `./build-local.sh`.
|
||||
|
||||
@@ -57,9 +57,9 @@ toolkit, OpenOffice/LibreOffice, Gecko, WebKitGtk, and KDE Qt toolkit.
|
||||
- **Preserves exit key**: Only sleep toggle remains active
|
||||
|
||||
### Self-Voicing
|
||||
- **Unix socket interface**: Direct speech output via `/tmp/cthulhu.sock`
|
||||
- **Unix socket interface**: Direct speech output via `${XDG_RUNTIME_DIR}/cthulhu.sock`
|
||||
- **External integration**: Other applications can speak through Cthulhu
|
||||
- **Simple protocol**: `echo "text" | socat - UNIX-CLIENT:/tmp/cthulhu.sock`
|
||||
- **Simple protocol**: `echo "text" | socat - UNIX-CLIENT:"${XDG_RUNTIME_DIR}/cthulhu.sock"`
|
||||
|
||||
## D-Bus Remote Controller
|
||||
|
||||
@@ -275,9 +275,9 @@ Cthulhu offers a mechanism through which messages may be spoken directly by the
|
||||
|
||||
```bash
|
||||
# Speak hello world.
|
||||
echo "Hello world." | socat - UNIX-CLIENT:/tmp/cthulhu.sock
|
||||
echo "Hello world." | socat - UNIX-CLIENT:"${XDG_RUNTIME_DIR}/cthulhu.sock"
|
||||
# Speak Hello world without interrupting the previous speech.
|
||||
echo "<!#APPEND#!>Hello world." | socat - UNIX-CLIENT:/tmp/cthulhu.sock
|
||||
# Make hello world persistant in Braille.
|
||||
echo "Hello world.<#APPEND#>" | socat - UNIX-CLIENT:/tmp/cthulhu.sock
|
||||
echo "<#APPEND#>Hello world." | socat - UNIX-CLIENT:"${XDG_RUNTIME_DIR}/cthulhu.sock"
|
||||
# Make hello world persistent in Braille.
|
||||
echo "Hello world.<#PERSISTENT#>" | socat - UNIX-CLIENT:"${XDG_RUNTIME_DIR}/cthulhu.sock"
|
||||
```
|
||||
|
||||
@@ -1787,6 +1787,35 @@
|
||||
<property name="top_attach">5</property>
|
||||
</packing>
|
||||
</child>
|
||||
<child>
|
||||
<object class="GtkLabel" id="hardwareDeviceLabel">
|
||||
<property name="visible">False</property>
|
||||
<property name="can_focus">False</property>
|
||||
<property name="xalign">1</property>
|
||||
<property name="label" translatable="yes">Serial _device:</property>
|
||||
<property name="use_underline">True</property>
|
||||
<property name="justify">right</property>
|
||||
<property name="mnemonic_widget">hardwareDeviceCombo</property>
|
||||
<accessibility>
|
||||
<relation type="label-for" target="hardwareDeviceCombo"/>
|
||||
</accessibility>
|
||||
</object>
|
||||
<packing>
|
||||
<property name="left_attach">0</property>
|
||||
<property name="top_attach">9</property>
|
||||
</packing>
|
||||
</child>
|
||||
<child>
|
||||
<object class="GtkComboBox" id="hardwareDeviceCombo">
|
||||
<property name="visible">False</property>
|
||||
<property name="can_focus">False</property>
|
||||
<signal name="changed" handler="hardwareDeviceChanged" swapped="no"/>
|
||||
</object>
|
||||
<packing>
|
||||
<property name="left_attach">1</property>
|
||||
<property name="top_attach">9</property>
|
||||
</packing>
|
||||
</child>
|
||||
</object>
|
||||
</child>
|
||||
</object>
|
||||
|
||||
@@ -168,6 +168,9 @@ class CthulhuSetupGUI(cthulhu_gtkbuilder.GtkBuilderWrapper):
|
||||
self.speechFamiliesChoice = None
|
||||
self.speechFamiliesChoices = None
|
||||
self.speechFamiliesModel = None
|
||||
self.hardwareDeviceChoice = None
|
||||
self.hardwareDeviceChoices = None
|
||||
self.hardwareDeviceModel = None
|
||||
self.speechLanguagesChoice = None
|
||||
self.speechLanguagesChoices = None
|
||||
self.speechLanguagesModel = None
|
||||
@@ -405,6 +408,11 @@ class CthulhuSetupGUI(cthulhu_gtkbuilder.GtkBuilderWrapper):
|
||||
self._initComboBox(self.get_widget("speechLanguages"))
|
||||
self.speechFamiliesModel = \
|
||||
self._initComboBox(self.get_widget("speechFamilies"))
|
||||
try:
|
||||
self.hardwareDeviceModel = \
|
||||
self._initComboBox(self.get_widget("hardwareDeviceCombo"))
|
||||
except AttributeError:
|
||||
self.hardwareDeviceModel = None
|
||||
self.echoSpeechServersModel = \
|
||||
self._initComboBox(self.get_widget("echoSpeechServers"))
|
||||
self.echoSpeechFamiliesModel = \
|
||||
@@ -611,7 +619,7 @@ class CthulhuSetupGUI(cthulhu_gtkbuilder.GtkBuilderWrapper):
|
||||
for plugin_info in sorted(plugin_infos, key=lambda item: (item.get_name() or item.get_module_name()).lower()):
|
||||
plugin_name = plugin_info.get_module_name()
|
||||
canonical_name = plugin_info.get_canonical_name()
|
||||
if plugin_info.hidden or canonical_name == "PluginManager":
|
||||
if plugin_info.hidden:
|
||||
continue
|
||||
|
||||
self._plugin_canonical_map[plugin_name] = canonical_name
|
||||
@@ -1703,6 +1711,8 @@ class CthulhuSetupGUI(cthulhu_gtkbuilder.GtkBuilderWrapper):
|
||||
#
|
||||
self.initializingSpeech = True
|
||||
self._setupSpeechSystems(factories)
|
||||
self._setupHardwareDevice()
|
||||
self._updateHardwareDeviceVisibility()
|
||||
self.initializingSpeech = False
|
||||
|
||||
def _getSpeechDispatcherFactory(self):
|
||||
@@ -3847,6 +3857,121 @@ print(json.dumps(result))
|
||||
self.prefsDict["onlySpeakDisplayedText"] = enable
|
||||
self.get_widget("contextOptionsGrid").set_sensitive(not enable)
|
||||
|
||||
|
||||
def _scanSerialDevices(self):
|
||||
"""Scan for available serial devices and return a list of paths."""
|
||||
import glob
|
||||
devices = []
|
||||
patterns = [
|
||||
"/dev/ttyUSB*",
|
||||
"/dev/ttyACM*",
|
||||
"/dev/ttyS*",
|
||||
"/dev/ttyAMA*",
|
||||
"/dev/rfcomm*",
|
||||
"/dev/serial/by-id/*",
|
||||
]
|
||||
for pattern in patterns:
|
||||
devices.extend(glob.glob(pattern))
|
||||
devices = sorted(set(devices))
|
||||
return devices
|
||||
|
||||
def _setupHardwareDevice(self):
|
||||
"""Sets up the hardware device combo box with available serial ports.
|
||||
|
||||
Populates the combo with scanned serial devices and restores the
|
||||
previously saved selection if still available.
|
||||
"""
|
||||
if self.hardwareDeviceModel is None:
|
||||
return
|
||||
combobox = self.get_widget("hardwareDeviceCombo")
|
||||
combobox.set_model(None)
|
||||
self.hardwareDeviceModel.clear()
|
||||
self.hardwareDeviceChoices = []
|
||||
|
||||
devices = self._scanSerialDevices()
|
||||
saved_device = self.prefsDict.get("hardwareSpeechDevice",
|
||||
settings.hardwareSpeechDevice)
|
||||
|
||||
# Always include a "(none)" option so the user can clear the device
|
||||
self.hardwareDeviceChoices.append("")
|
||||
self.hardwareDeviceModel.append((0, "(none)"))
|
||||
i = 1
|
||||
for device in devices:
|
||||
self.hardwareDeviceChoices.append(device)
|
||||
self.hardwareDeviceModel.append((i, device))
|
||||
i += 1
|
||||
|
||||
# If the saved device is not in the scanned list but is non-empty,
|
||||
# append it so the user still sees their configured device.
|
||||
if saved_device and saved_device not in devices:
|
||||
self.hardwareDeviceChoices.append(saved_device)
|
||||
self.hardwareDeviceModel.append((i, saved_device))
|
||||
i += 1
|
||||
|
||||
combobox.set_model(self.hardwareDeviceModel)
|
||||
self._setHardwareDeviceChoice(saved_device)
|
||||
|
||||
@staticmethod
|
||||
def _isHardwareSpeechFactory(factory):
|
||||
moduleName = getattr(factory, "__name__", "")
|
||||
return moduleName.split(".")[-1] == "hardwarefactory"
|
||||
|
||||
def _setHardwareDeviceChoice(self, device_name):
|
||||
"""Set the active item in the hardware device combo box.
|
||||
|
||||
Arguments:
|
||||
- device_name: the device path to select.
|
||||
"""
|
||||
if not self.hardwareDeviceChoices:
|
||||
self.hardwareDeviceChoice = None
|
||||
return
|
||||
|
||||
for i, choice in enumerate(self.hardwareDeviceChoices):
|
||||
if choice == device_name:
|
||||
self.get_widget("hardwareDeviceCombo").set_active(i)
|
||||
self.hardwareDeviceChoice = choice
|
||||
return
|
||||
|
||||
self.get_widget("hardwareDeviceCombo").set_active(0)
|
||||
self.hardwareDeviceChoice = self.hardwareDeviceChoices[0]
|
||||
|
||||
def _updateHardwareDeviceVisibility(self):
|
||||
"""Show or hide the hardware device combo based on speech system.
|
||||
|
||||
The hardware device selector is only visible when the hardware
|
||||
speech synthesizer factory is active.
|
||||
"""
|
||||
if self.hardwareDeviceModel is None:
|
||||
return
|
||||
is_hardware = False
|
||||
if self.speechSystemsChoice:
|
||||
try:
|
||||
is_hardware = self._isHardwareSpeechFactory(self.speechSystemsChoice)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
self.get_widget("hardwareDeviceLabel").set_visible(is_hardware)
|
||||
self.get_widget("hardwareDeviceCombo").set_visible(is_hardware)
|
||||
|
||||
def hardwareDeviceChanged(self, widget):
|
||||
"""Signal handler for the hardware device combo box changed signal.
|
||||
|
||||
Arguments:
|
||||
- widget: the component that generated the signal.
|
||||
"""
|
||||
if self.initializingSpeech:
|
||||
return
|
||||
|
||||
selected_index = widget.get_active()
|
||||
if selected_index >= 0 and selected_index < len(self.hardwareDeviceChoices):
|
||||
self.hardwareDeviceChoice = self.hardwareDeviceChoices[selected_index]
|
||||
else:
|
||||
self.hardwareDeviceChoice = None
|
||||
|
||||
# Update runtime settings so the factory sees the new device
|
||||
if self.hardwareDeviceChoice is not None:
|
||||
settings.hardwareSpeechDevice = self.hardwareDeviceChoice
|
||||
|
||||
def speechSystemsChanged(self, widget):
|
||||
"""Signal handler for the "changed" signal for the speechSystems
|
||||
GtkComboBox widget. The user has selected a different speech
|
||||
@@ -3866,6 +3991,7 @@ print(json.dumps(result))
|
||||
self._setupSpeechServers()
|
||||
self._setupEchoSpeechServers()
|
||||
self._setEchoVoiceItems()
|
||||
self._updateHardwareDeviceVisibility()
|
||||
|
||||
def speechServersChanged(self, widget):
|
||||
"""Signal handler for the "changed" signal for the speechServers
|
||||
@@ -4927,6 +5053,16 @@ print(json.dumps(result))
|
||||
self.prefsDict["speechServerFactory"] = \
|
||||
self.speechSystemsChoice.__name__
|
||||
|
||||
# Save hardware speech device setting when hardware factory is active
|
||||
if self.speechSystemsChoice and \
|
||||
self._isHardwareSpeechFactory(self.speechSystemsChoice):
|
||||
if self.hardwareDeviceChoice is not None:
|
||||
self.prefsDict["hardwareSpeechDevice"] = self.hardwareDeviceChoice
|
||||
else:
|
||||
self.prefsDict["hardwareSpeechDevice"] = ""
|
||||
else:
|
||||
self.prefsDict["hardwareSpeechDevice"] = settings.hardwareSpeechDevice
|
||||
|
||||
speechServerChoice = self._getSpeechServerChoiceForSave()
|
||||
if speechServerChoice:
|
||||
self.prefsDict["speechServerInfo"] = \
|
||||
|
||||
@@ -870,6 +870,19 @@ SPEECH_DISPATCHER = _("Speech Dispatcher")
|
||||
# Translators: This label refers to the Piper neural text-to-speech system.
|
||||
# (https://github.com/rhasspy/piper)
|
||||
PIPER_TTS = _("Piper Neural TTS")
|
||||
# Translators: This label refers to external hardware serial speech synthesizers.
|
||||
HARDWARE_SPEECH = _("Hardware Speech Synthesizer")
|
||||
# Translators: This label refers to the LiteTalk hardware speech synthesizer.
|
||||
HARDWARE_LITETALK = _("LiteTalk")
|
||||
# Translators: This label refers to the DoubleTalk LT hardware speech synthesizer.
|
||||
HARDWARE_DOUBLETALK = _("DoubleTalk LT")
|
||||
# Translators: This label refers to the TripleTalk hardware speech synthesizer.
|
||||
HARDWARE_TRIPLETALK = _("TripleTalk")
|
||||
# Translators: This label refers to the Dectalk hardware synthesizer.
|
||||
HARDWARE_DECTALK = _("Dectalk")
|
||||
# Translators: This is the label for the combo box that lets the user choose
|
||||
# the serial device used by a hardware speech synthesizer.
|
||||
HARDWARE_SERIAL_DEVICE = _("Serial _device:")
|
||||
|
||||
# Translators: This is a label for a group of options related to Cthulhu's behavior
|
||||
# when presenting an application's spell check dialog.
|
||||
|
||||
@@ -0,0 +1,571 @@
|
||||
#!/usr/bin/env python3
|
||||
#
|
||||
# Copyright (c) 2024 Stormux
|
||||
#
|
||||
# This library is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU Lesser General Public
|
||||
# License as published by the Free Software Foundation; either
|
||||
# version 2.1 of the License, or (at your option) any later version.
|
||||
#
|
||||
# This library is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
# Lesser General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Lesser General Public
|
||||
# License along with this library; if not, write to the
|
||||
# Free Software Foundation, Inc., Franklin Street, Fifth Floor,
|
||||
# Boston MA 02110-1301 USA.
|
||||
#
|
||||
# Cthulhu project: https://git.stormux.org/storm/cthulhu
|
||||
|
||||
"""Provides a Cthulhu speech server for hardware serial synthesizers.
|
||||
|
||||
Ports Fenrir's hardware serial drivers (LiteTalk/DoubleTalk/TripleTalk,
|
||||
Dectalk) to Cthulhu's SpeechServer interface.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import termios
|
||||
import threading
|
||||
import tty
|
||||
from queue import Empty, Queue
|
||||
|
||||
from . import debug
|
||||
from . import guilabels
|
||||
from . import messages
|
||||
from . import settings
|
||||
from . import speechserver
|
||||
from .acss import ACSS
|
||||
|
||||
|
||||
class _SpeakQueue(Queue):
|
||||
"""Queue with a clear() method."""
|
||||
|
||||
def clear(self):
|
||||
try:
|
||||
while True:
|
||||
self.get_nowait()
|
||||
except Empty:
|
||||
pass
|
||||
|
||||
|
||||
class _HardwareSerialDriver:
|
||||
"""Base class for hardware serial speech synthesizers.
|
||||
|
||||
Ported from Fenrir's hardwareSerialDriver.py.
|
||||
"""
|
||||
|
||||
cancel_command = b""
|
||||
default_baud_rate = 9600
|
||||
|
||||
def __init__(self, device: str, baud_rate: int):
|
||||
self.device = device
|
||||
self.baud_rate = baud_rate
|
||||
self.serial_port: int | None = None
|
||||
self.text_queue = _SpeakQueue()
|
||||
self.lock = threading.Lock()
|
||||
self.worker_thread: threading.Thread | None = None
|
||||
self._stop_worker = False
|
||||
self._is_initialized = False
|
||||
|
||||
def initialize(self) -> bool:
|
||||
self._open_serial_port()
|
||||
self._is_initialized = self.serial_port is not None
|
||||
if self._is_initialized:
|
||||
self._stop_worker = False
|
||||
self.worker_thread = threading.Thread(target=self._worker, daemon=True)
|
||||
self.worker_thread.start()
|
||||
return self._is_initialized
|
||||
|
||||
def shutdown(self) -> None:
|
||||
if not self._is_initialized:
|
||||
return
|
||||
self._stop_worker = True
|
||||
self.clear_buffer()
|
||||
self.text_queue.put(None)
|
||||
if self.worker_thread:
|
||||
self.worker_thread.join(timeout=0.5)
|
||||
self._close_serial_port()
|
||||
self._is_initialized = False
|
||||
|
||||
def speak(self, text: str, interrupt: bool = True) -> None:
|
||||
if not self._is_initialized:
|
||||
return
|
||||
if interrupt:
|
||||
self.stop()
|
||||
if not isinstance(text, str) or text == "":
|
||||
return
|
||||
self.text_queue.put(text)
|
||||
|
||||
def stop(self) -> None:
|
||||
if not self._is_initialized:
|
||||
return
|
||||
self.clear_buffer()
|
||||
if self.cancel_command:
|
||||
self._write_bytes(self.cancel_command, "cancel")
|
||||
|
||||
def clear_buffer(self) -> None:
|
||||
if not self._is_initialized:
|
||||
return
|
||||
self.text_queue.clear()
|
||||
|
||||
def set_rate(self, rate: float) -> None:
|
||||
if not self._is_initialized:
|
||||
return
|
||||
self._write_bytes(self._rate_command(rate), "rate")
|
||||
|
||||
def set_pitch(self, pitch: float) -> None:
|
||||
if not self._is_initialized:
|
||||
return
|
||||
self._write_bytes(self._pitch_command(pitch), "pitch")
|
||||
|
||||
def set_volume(self, volume: float) -> None:
|
||||
if not self._is_initialized:
|
||||
return
|
||||
self._write_bytes(self._volume_command(volume), "volume")
|
||||
|
||||
def _worker(self) -> None:
|
||||
while not self._stop_worker:
|
||||
text = self.text_queue.get()
|
||||
if text is None:
|
||||
return
|
||||
try:
|
||||
data = self._speak_bytes(text)
|
||||
self._write_bytes(data, "speech")
|
||||
except Exception as error:
|
||||
msg = f"HARDWARE SPEECH: worker failed: {error}"
|
||||
debug.printMessage(debug.LEVEL_ERROR, msg, True)
|
||||
|
||||
def _open_serial_port(self) -> None:
|
||||
if not self.device or self.device == "auto":
|
||||
msg = "HARDWARE SPEECH: requires an explicit serial device"
|
||||
debug.printMessage(debug.LEVEL_WARNING, msg, True)
|
||||
return
|
||||
|
||||
port = self._open_configured_serial_port(self.device)
|
||||
if port is not None:
|
||||
self._activate_serial_port(self.device, port)
|
||||
|
||||
def _open_configured_serial_port(self, device: str) -> int | None:
|
||||
port = None
|
||||
try:
|
||||
port = os.open(device, os.O_RDWR | os.O_NOCTTY)
|
||||
tty.setraw(port)
|
||||
attrs = termios.tcgetattr(port)
|
||||
attrs[2] |= termios.CLOCAL | termios.CREAD
|
||||
baud_rate = self._termios_baud_rate(self.baud_rate)
|
||||
attrs[4] = baud_rate
|
||||
attrs[5] = baud_rate
|
||||
attrs[6][termios.VMIN] = 0
|
||||
attrs[6][termios.VTIME] = 0
|
||||
attrs[0] &= ~(termios.IXON | termios.IXOFF | termios.IXANY)
|
||||
termios.tcsetattr(port, termios.TCSANOW, attrs)
|
||||
return port
|
||||
except (OSError, termios.error) as error:
|
||||
self._close_port(port)
|
||||
msg = f"HARDWARE SPEECH: device open failed: {device}: {error}"
|
||||
debug.printMessage(debug.LEVEL_WARNING, msg, True)
|
||||
return None
|
||||
|
||||
def _activate_serial_port(self, device: str, port: int) -> None:
|
||||
self.serial_port = port
|
||||
self.device = device
|
||||
msg = f"HARDWARE SPEECH: device opened: {device}, baud_rate={self.baud_rate}"
|
||||
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||
|
||||
def _close_serial_port(self) -> None:
|
||||
with self.lock:
|
||||
if self.serial_port is None:
|
||||
return
|
||||
self._close_port(self.serial_port)
|
||||
self.serial_port = None
|
||||
|
||||
def _close_port(self, port: int | None) -> None:
|
||||
if port is None:
|
||||
return
|
||||
try:
|
||||
os.close(port)
|
||||
except OSError as error:
|
||||
msg = f"HARDWARE SPEECH: device close failed: {error}"
|
||||
debug.printMessage(debug.LEVEL_WARNING, msg, True)
|
||||
|
||||
def _write_bytes(self, data: bytes, description: str = "data") -> None:
|
||||
if not data:
|
||||
return
|
||||
with self.lock:
|
||||
if self.serial_port is None:
|
||||
return
|
||||
try:
|
||||
total_written = 0
|
||||
while total_written < len(data):
|
||||
bytes_written = os.write(self.serial_port, data[total_written:])
|
||||
if bytes_written == 0:
|
||||
raise OSError("serial write returned 0 bytes")
|
||||
total_written += bytes_written
|
||||
preview = self._format_bytes_preview(data)
|
||||
msg = f"HARDWARE SPEECH: wrote {total_written} {description} bytes: {preview}"
|
||||
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||
except OSError as error:
|
||||
msg = f"HARDWARE SPEECH: write failed: {error}"
|
||||
debug.printMessage(debug.LEVEL_ERROR, msg, True)
|
||||
|
||||
def _termios_baud_rate(self, baud_rate: int) -> int:
|
||||
baud_name = f"B{baud_rate}"
|
||||
if hasattr(termios, baud_name):
|
||||
return getattr(termios, baud_name)
|
||||
msg = f"HARDWARE SPEECH: unsupported baud rate {baud_rate}; using 9600"
|
||||
debug.printMessage(debug.LEVEL_WARNING, msg, True)
|
||||
return termios.B9600
|
||||
|
||||
@staticmethod
|
||||
def _clean_text(text: str) -> str:
|
||||
text = text.replace("\r", " ").replace("\n", " ")
|
||||
return "".join(char if 0x20 <= ord(char) <= 0x7E else " " for char in text)
|
||||
|
||||
@staticmethod
|
||||
def _scale(value: float, minimum: int, maximum: int) -> int:
|
||||
value = max(0.0, min(1.0, value))
|
||||
return int(round(minimum + value * (maximum - minimum)))
|
||||
|
||||
@staticmethod
|
||||
def _format_bytes_preview(data: bytes, limit: int = 32) -> str:
|
||||
preview = data[:limit]
|
||||
hex_preview = " ".join(f"{byte:02x}" for byte in preview)
|
||||
ascii_preview = "".join(
|
||||
chr(byte) if 0x20 <= byte <= 0x7E else "." for byte in preview
|
||||
)
|
||||
suffix = "" if len(data) <= limit else " ..."
|
||||
return f"hex=[{hex_preview}{suffix}] ascii=[{ascii_preview}{suffix}]"
|
||||
|
||||
def _speak_bytes(self, text: str) -> bytes:
|
||||
raise NotImplementedError
|
||||
|
||||
def _rate_command(self, rate: float) -> bytes:
|
||||
return b""
|
||||
|
||||
def _pitch_command(self, pitch: float) -> bytes:
|
||||
return b""
|
||||
|
||||
def _volume_command(self, volume: float) -> bytes:
|
||||
return b""
|
||||
|
||||
|
||||
class _LiteTalkDriver(_HardwareSerialDriver):
|
||||
"""LiteTalk-compatible serial driver."""
|
||||
|
||||
cancel_command = b"\x18"
|
||||
|
||||
def _speak_bytes(self, text: str) -> bytes:
|
||||
return self._clean_text(text).encode("ascii", errors="replace") + b"\r"
|
||||
|
||||
def _rate_command(self, rate: float) -> bytes:
|
||||
return self._setting_command(self._scale(rate, 0, 9), b"S")
|
||||
|
||||
def _pitch_command(self, pitch: float) -> bytes:
|
||||
return self._setting_command(self._scale(pitch, 0, 99), b"P")
|
||||
|
||||
def _volume_command(self, volume: float) -> bytes:
|
||||
return self._setting_command(self._scale(volume, 0, 9), b"V")
|
||||
|
||||
@staticmethod
|
||||
def _setting_command(value: int, command: bytes) -> bytes:
|
||||
return b"\x01" + str(value).encode("ascii") + command
|
||||
|
||||
|
||||
class _DectalkDriver(_HardwareSerialDriver):
|
||||
"""Dectalk serial driver."""
|
||||
|
||||
cancel_command = b"\x18"
|
||||
|
||||
def _speak_bytes(self, text: str) -> bytes:
|
||||
return self._clean_text(text).encode("ascii", errors="replace") + b"\x01"
|
||||
|
||||
def _rate_command(self, rate: float) -> bytes:
|
||||
return self._setting_command("ra", self._scale(rate, 75, 650))
|
||||
|
||||
def _pitch_command(self, pitch: float) -> bytes:
|
||||
return self._setting_command("dv ap", self._scale(pitch, 50, 180))
|
||||
|
||||
def _volume_command(self, volume: float) -> bytes:
|
||||
return self._setting_command("vo", self._scale(volume, 0, 100))
|
||||
|
||||
@staticmethod
|
||||
def _setting_command(command: str, value: int) -> bytes:
|
||||
return f"[:{command} {value}]".encode("ascii")
|
||||
|
||||
|
||||
_DRIVER_MAP: dict[str, type[_HardwareSerialDriver]] = {
|
||||
"litetalk": _LiteTalkDriver,
|
||||
"doubletalk": _LiteTalkDriver,
|
||||
"tripletalk": _LiteTalkDriver,
|
||||
"dectalk": _DectalkDriver,
|
||||
}
|
||||
|
||||
_SYNTH_DISPLAY_NAMES = {
|
||||
"litetalk": guilabels.HARDWARE_LITETALK,
|
||||
"doubletalk": guilabels.HARDWARE_DOUBLETALK,
|
||||
"tripletalk": guilabels.HARDWARE_TRIPLETALK,
|
||||
"dectalk": guilabels.HARDWARE_DECTALK,
|
||||
}
|
||||
|
||||
|
||||
class SpeechServer(speechserver.SpeechServer):
|
||||
"""Hardware serial speech server implementation for Cthulhu."""
|
||||
|
||||
_active_servers: dict[str, SpeechServer] = {}
|
||||
|
||||
@staticmethod
|
||||
def getFactoryName() -> str:
|
||||
"""Returns a localized name describing this factory."""
|
||||
return guilabels.HARDWARE_SPEECH
|
||||
|
||||
@staticmethod
|
||||
def getSpeechServers() -> list[SpeechServer]:
|
||||
"""Gets available speech servers as a list."""
|
||||
return [
|
||||
SpeechServer(server_id, initialize=False, register=False)
|
||||
for server_id in _DRIVER_MAP
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def _getSpeechServer(cls, server_id: str) -> SpeechServer | None:
|
||||
"""Return an active server for the given id."""
|
||||
active_server = cls._active_servers.get(server_id)
|
||||
if active_server is not None:
|
||||
if active_server._matches_current_settings():
|
||||
return active_server
|
||||
active_server.shutdown()
|
||||
|
||||
cls(server_id)
|
||||
return cls._active_servers.get(server_id)
|
||||
|
||||
@staticmethod
|
||||
def getSpeechServer(info: list[str] | None = None) -> SpeechServer | None:
|
||||
"""Gets a given SpeechServer based upon the info."""
|
||||
if info and len(info) >= 2:
|
||||
server_id = info[1]
|
||||
else:
|
||||
server_id = "litetalk"
|
||||
return SpeechServer._getSpeechServer(server_id)
|
||||
|
||||
@staticmethod
|
||||
def shutdownActiveServers() -> None:
|
||||
"""Cleans up and shuts down this factory."""
|
||||
servers = list(SpeechServer._active_servers.values())
|
||||
for server in servers:
|
||||
server.shutdown()
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
server_id: str,
|
||||
initialize: bool = True,
|
||||
register: bool = True,
|
||||
):
|
||||
super().__init__()
|
||||
self._id = server_id
|
||||
self._driver: _HardwareSerialDriver | None = None
|
||||
self._info: list[str] = []
|
||||
self._device = ""
|
||||
self._baud_rate = settings.hardwareSpeechBaudRate
|
||||
|
||||
driver_class = _DRIVER_MAP.get(server_id)
|
||||
if driver_class is None:
|
||||
msg = f"HARDWARE SPEECH: unknown synth type: {server_id}"
|
||||
debug.printMessage(debug.LEVEL_WARNING, msg, True)
|
||||
return
|
||||
|
||||
display_name = _SYNTH_DISPLAY_NAMES.get(server_id, server_id)
|
||||
self._info = [display_name, server_id]
|
||||
|
||||
if not initialize:
|
||||
return
|
||||
|
||||
self._device = settings.hardwareSpeechDevice
|
||||
self._baud_rate = settings.hardwareSpeechBaudRate
|
||||
self._driver = driver_class(self._device, self._baud_rate)
|
||||
if self._driver.initialize():
|
||||
if register:
|
||||
SpeechServer._active_servers[server_id] = self
|
||||
msg = f"HARDWARE SPEECH: server initialized: {server_id} on {self._device}"
|
||||
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||
else:
|
||||
msg = f"HARDWARE SPEECH: server initialization failed: {server_id}"
|
||||
debug.printMessage(debug.LEVEL_WARNING, msg, True)
|
||||
self._driver = None
|
||||
|
||||
def _matches_current_settings(self) -> bool:
|
||||
return (
|
||||
self._driver is not None
|
||||
and self._device == settings.hardwareSpeechDevice
|
||||
and self._baud_rate == settings.hardwareSpeechBaudRate
|
||||
)
|
||||
|
||||
def getInfo(self) -> list[str]:
|
||||
"""Returns [name, id]."""
|
||||
return self._info
|
||||
|
||||
def getVoiceFamilies(self) -> list[dict[str, str]]:
|
||||
"""Returns a list of VoiceFamily instances."""
|
||||
return []
|
||||
|
||||
def speakCharacter(self, character: str, acss: dict | None = None) -> None:
|
||||
"""Speaks a single character immediately."""
|
||||
if self._driver:
|
||||
self._apply_acss(acss)
|
||||
self._driver.speak(character, interrupt=True)
|
||||
|
||||
def speakKeyEvent(self, event, acss: dict | None = None) -> None:
|
||||
"""Speaks a key event immediately."""
|
||||
event_string = event.getKeyName()
|
||||
locking_state_string = event.getLockingStateString()
|
||||
text = f"{event_string} {locking_state_string}".strip()
|
||||
self.speak(text, acss=acss)
|
||||
|
||||
def speak(
|
||||
self,
|
||||
text: str | None = None,
|
||||
acss: dict | None = None,
|
||||
interrupt: bool = True,
|
||||
) -> None:
|
||||
"""Speaks all queued text immediately."""
|
||||
if not self._driver or text is None:
|
||||
return
|
||||
self._apply_acss(acss)
|
||||
self._driver.speak(text, interrupt=interrupt)
|
||||
|
||||
def sayAll(self, utteranceIterator, progressCallback) -> None:
|
||||
"""Iterates through the given utteranceIterator, speaking each utterance."""
|
||||
for context, acss in utteranceIterator:
|
||||
self.speak(context.utterance, acss=acss, interrupt=False)
|
||||
|
||||
def increaseSpeechRate(self, step: int = 5) -> None:
|
||||
self._change_default_speech_rate(step)
|
||||
|
||||
def decreaseSpeechRate(self, step: int = 5) -> None:
|
||||
self._change_default_speech_rate(step, decrease=True)
|
||||
|
||||
def increaseSpeechPitch(self, step: float = 0.5) -> None:
|
||||
self._change_default_speech_pitch(step)
|
||||
|
||||
def decreaseSpeechPitch(self, step: float = 0.5) -> None:
|
||||
self._change_default_speech_pitch(step, decrease=True)
|
||||
|
||||
def increaseSpeechVolume(self, step: float = 0.5) -> None:
|
||||
self._change_default_speech_volume(step)
|
||||
|
||||
def decreaseSpeechVolume(self, step: float = 0.5) -> None:
|
||||
self._change_default_speech_volume(step, decrease=True)
|
||||
|
||||
def updateCapitalizationStyle(self) -> None:
|
||||
pass
|
||||
|
||||
def updatePunctuationLevel(self) -> None:
|
||||
pass
|
||||
|
||||
def stop(self) -> None:
|
||||
if self._driver:
|
||||
self._driver.stop()
|
||||
|
||||
def shutdown(self) -> None:
|
||||
if self._driver:
|
||||
self._driver.shutdown()
|
||||
self._driver = None
|
||||
if self._id in SpeechServer._active_servers:
|
||||
del SpeechServer._active_servers[self._id]
|
||||
|
||||
def reset(self, text: str | None = None, acss: dict | None = None) -> None:
|
||||
if self._driver:
|
||||
self._driver.shutdown()
|
||||
self._driver = None
|
||||
|
||||
driver_class = _DRIVER_MAP.get(self._id)
|
||||
if driver_class is None:
|
||||
return
|
||||
|
||||
self._device = settings.hardwareSpeechDevice
|
||||
self._baud_rate = settings.hardwareSpeechBaudRate
|
||||
self._driver = driver_class(self._device, self._baud_rate)
|
||||
if not self._driver.initialize():
|
||||
self._driver = None
|
||||
|
||||
def _apply_acss(self, acss: dict | None) -> None:
|
||||
if not self._driver or not acss:
|
||||
return
|
||||
try:
|
||||
rate = acss.get(ACSS.RATE)
|
||||
if rate is not None:
|
||||
normalized = max(0.0, min(99.0, float(rate))) / 99.0
|
||||
self._driver.set_rate(normalized)
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
pitch = acss.get(ACSS.AVERAGE_PITCH)
|
||||
if pitch is not None:
|
||||
normalized = max(0.0, min(9.0, float(pitch))) / 9.0
|
||||
self._driver.set_pitch(normalized)
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
volume = acss.get(ACSS.GAIN)
|
||||
if volume is not None:
|
||||
normalized = max(0.0, min(9.0, float(volume))) / 9.0
|
||||
self._driver.set_volume(normalized)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _change_default_speech_rate(self, step: float, decrease: bool = False) -> None:
|
||||
acss = settings.voices[settings.DEFAULT_VOICE]
|
||||
delta = step * (-1 if decrease else 1)
|
||||
try:
|
||||
rate = acss[ACSS.RATE]
|
||||
except KeyError:
|
||||
rate = 50.0
|
||||
acss[ACSS.RATE] = max(0, min(99, rate + delta))
|
||||
msg = f"HARDWARE SPEECH: rate set to {acss[ACSS.RATE]}"
|
||||
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||
if self._driver:
|
||||
normalized = acss[ACSS.RATE] / 99.0
|
||||
self._driver.set_rate(normalized)
|
||||
self.speak(
|
||||
messages.SPEECH_SLOWER if decrease else messages.SPEECH_FASTER,
|
||||
acss=acss
|
||||
)
|
||||
|
||||
def _change_default_speech_pitch(self, step: float, decrease: bool = False) -> None:
|
||||
acss = settings.voices[settings.DEFAULT_VOICE]
|
||||
delta = step * (-1 if decrease else 1)
|
||||
try:
|
||||
pitch = acss[ACSS.AVERAGE_PITCH]
|
||||
except KeyError:
|
||||
pitch = 5.0
|
||||
acss[ACSS.AVERAGE_PITCH] = max(0, min(9, pitch + delta))
|
||||
msg = f"HARDWARE SPEECH: pitch set to {acss[ACSS.AVERAGE_PITCH]}"
|
||||
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||
if self._driver:
|
||||
normalized = acss[ACSS.AVERAGE_PITCH] / 9.0
|
||||
self._driver.set_pitch(normalized)
|
||||
self.speak(
|
||||
messages.SPEECH_LOWER if decrease else messages.SPEECH_HIGHER,
|
||||
acss=acss
|
||||
)
|
||||
|
||||
def _change_default_speech_volume(self, step: float, decrease: bool = False) -> None:
|
||||
acss = settings.voices[settings.DEFAULT_VOICE]
|
||||
delta = step * (-1 if decrease else 1)
|
||||
try:
|
||||
volume = acss[ACSS.GAIN]
|
||||
except KeyError:
|
||||
volume = 10.0
|
||||
acss[ACSS.GAIN] = max(0, min(9, volume + delta))
|
||||
msg = f"HARDWARE SPEECH: volume set to {acss[ACSS.GAIN]}"
|
||||
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||
if self._driver:
|
||||
normalized = acss[ACSS.GAIN] / 9.0
|
||||
self._driver.set_volume(normalized)
|
||||
self.speak(
|
||||
messages.SPEECH_SOFTER if decrease else messages.SPEECH_LOUDER,
|
||||
acss=acss
|
||||
)
|
||||
@@ -477,28 +477,49 @@ class InputEventManager:
|
||||
return None
|
||||
|
||||
self._log_active_x11_window_for_xterm_check(window)
|
||||
sawIdentifier = False
|
||||
for attrName in ("get_class_group_name", "get_class_instance_name", "get_name"):
|
||||
if self._identifier_is_xterm(self._safe_call(window, attrName)):
|
||||
value = self._safe_call(window, attrName)
|
||||
if self._identifier_is_xterm(value):
|
||||
return True
|
||||
if isinstance(value, str) and value.strip():
|
||||
sawIdentifier = True
|
||||
|
||||
classGroup = self._safe_call(window, "get_class_group")
|
||||
if classGroup is not None:
|
||||
|
||||
for attrName in ("get_name", "get_res_class"):
|
||||
if self._identifier_is_xterm(self._safe_call(classGroup, attrName)):
|
||||
value = self._safe_call(classGroup, attrName)
|
||||
if self._identifier_is_xterm(value):
|
||||
return True
|
||||
if isinstance(value, str) and value.strip():
|
||||
sawIdentifier = True
|
||||
|
||||
try:
|
||||
pid = int(window.get_pid())
|
||||
except Exception:
|
||||
pid = -1
|
||||
if pid < 1:
|
||||
if not sawIdentifier:
|
||||
msg = (
|
||||
"INPUT EVENT MANAGER: XTerm matcher cannot identify active X11 window; "
|
||||
"no usable identifiers or pid."
|
||||
)
|
||||
debug.print_message(debug.LEVEL_INFO, msg, True)
|
||||
return None
|
||||
return False
|
||||
|
||||
try:
|
||||
with open(f"/proc/{pid}/cmdline", "rb") as cmdlineFile:
|
||||
executable = cmdlineFile.read().split(b"\0", 1)[0].decode(errors="ignore")
|
||||
except OSError:
|
||||
if not sawIdentifier:
|
||||
msg = (
|
||||
"INPUT EVENT MANAGER: XTerm matcher cannot identify active X11 window; "
|
||||
"pid disappeared before cmdline lookup."
|
||||
)
|
||||
debug.print_message(debug.LEVEL_INFO, msg, True)
|
||||
return None
|
||||
return False
|
||||
|
||||
return self._identifier_is_xterm(executable)
|
||||
@@ -590,6 +611,20 @@ class InputEventManager:
|
||||
|
||||
return result
|
||||
|
||||
def _clear_stale_atspi_context(self, manager: Any) -> None:
|
||||
"""Clears cached input context after X11 positively contradicts AT-SPI."""
|
||||
|
||||
reason = "active X11 window not found in AT-SPI"
|
||||
msg = (
|
||||
"INPUT EVENT MANAGER: Clearing stale AT-SPI focus context; "
|
||||
"X11 focus moved to an untracked window."
|
||||
)
|
||||
debug.print_message(debug.LEVEL_INFO, msg, True)
|
||||
script_manager.get_manager().set_active_script(None, reason)
|
||||
manager.clear_state(reason)
|
||||
self._last_input_event = None
|
||||
self._last_non_modifier_key_event = None
|
||||
|
||||
@staticmethod
|
||||
def _get_active_script_app() -> Optional[Atspi.Accessible]:
|
||||
"""Returns the app for the active script, if one is available."""
|
||||
@@ -621,15 +656,15 @@ class InputEventManager:
|
||||
) -> bool:
|
||||
"""Returns True when XTerm is active and Cthulhu lacks matching AT-SPI context."""
|
||||
|
||||
if pendingFocus is not None:
|
||||
msg = "INPUT EVENT MANAGER: XTerm matcher false; pending focus exists."
|
||||
debug.print_message(debug.LEVEL_INFO, msg, True)
|
||||
return False
|
||||
match = self._active_x11_window_xterm_match()
|
||||
tokens = ["INPUT EVENT MANAGER: XTerm matcher returned", match]
|
||||
debug.print_tokens(debug.LEVEL_INFO, tokens, True)
|
||||
if match is True:
|
||||
return True
|
||||
if pendingFocus is not None:
|
||||
msg = "INPUT EVENT MANAGER: XTerm matcher false; pending focus exists."
|
||||
debug.print_message(debug.LEVEL_INFO, msg, True)
|
||||
return False
|
||||
if match is None and self._scriptWithSuspendedGrabsForXterm is not None:
|
||||
msg = (
|
||||
"INPUT EVENT MANAGER: Keeping XTerm key-grab suspension; "
|
||||
@@ -743,6 +778,11 @@ class InputEventManager:
|
||||
manager.set_active_window(window)
|
||||
else:
|
||||
focus_window = self._get_top_level_window(pendingFocus or manager.get_locus_of_focus())
|
||||
staleContext = focus_window or window or self._get_active_script_app()
|
||||
if self._active_x11_window_differs_from(staleContext):
|
||||
self._clear_stale_atspi_context(manager)
|
||||
return False
|
||||
|
||||
if focus_window is not None:
|
||||
window = focus_window
|
||||
tokens = [
|
||||
@@ -755,15 +795,6 @@ class InputEventManager:
|
||||
# One example: Brave's popup menus live in frames which lack the active
|
||||
# state. Failing to revalidate the window on a key press is inconclusive;
|
||||
# do not wipe out the last known window and focus state.
|
||||
focus = pendingFocus or manager.get_locus_of_focus()
|
||||
staleContext = window or self._get_active_script_app()
|
||||
if self._active_x11_window_differs_from(staleContext):
|
||||
msg = (
|
||||
"INPUT EVENT MANAGER: X11 focus moved to an untracked window; "
|
||||
"preserving Cthulhu key handling."
|
||||
)
|
||||
debug.print_message(debug.LEVEL_INFO, msg, True)
|
||||
|
||||
tokens = [
|
||||
"WARNING:",
|
||||
window,
|
||||
|
||||
@@ -101,6 +101,7 @@ cthulhu_python_sources = files([
|
||||
'speech.py',
|
||||
'spellcheck.py',
|
||||
'speechdispatcherfactory.py',
|
||||
'hardwarefactory.py',
|
||||
'speech_generator.py',
|
||||
'speechserver.py',
|
||||
'piperfactory.py',
|
||||
|
||||
@@ -300,19 +300,21 @@ class SpeechServer(speechserver.SpeechServer):
|
||||
return voiceInfo.sampleRate if voiceInfo else None
|
||||
|
||||
def _mapRate(self, acssRate):
|
||||
"""Map ACSS rate (0-99) to Piper length_scale.
|
||||
"""Map ACSS rate (0-100) to Piper length_scale.
|
||||
|
||||
ACSS rate 50 (default) = length_scale 1.0
|
||||
Higher ACSS rate = lower length_scale (faster)
|
||||
Lower ACSS rate = higher length_scale (slower)
|
||||
|
||||
Arguments:
|
||||
- acssRate: Rate value from 0-99
|
||||
- acssRate: Rate value from 0-100
|
||||
"""
|
||||
rate = acssRate if acssRate is not None else 50
|
||||
rate = max(0, min(99, rate))
|
||||
lengthScale = 2.0 - (rate / 99.0) * 1.5
|
||||
return max(0.5, min(2.0, lengthScale))
|
||||
rate = max(0.0, min(100.0, float(rate)))
|
||||
if rate <= 50.0:
|
||||
return 2.0 - (rate / 50.0)
|
||||
|
||||
return 1.0 - ((rate - 50.0) / 50.0) * 0.75
|
||||
|
||||
def _mapPitch(self, acssPitch):
|
||||
"""Map ACSS pitch (0-9) to pitch adjustment factor.
|
||||
@@ -614,7 +616,7 @@ class SpeechServer(speechserver.SpeechServer):
|
||||
rate = acss[ACSS.RATE]
|
||||
except KeyError:
|
||||
rate = 50
|
||||
acss[ACSS.RATE] = max(0, min(99, rate + delta))
|
||||
acss[ACSS.RATE] = max(0, min(100, rate + delta))
|
||||
msg = f"PIPER: Rate set to {acss[ACSS.RATE]}"
|
||||
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||
self.speak(
|
||||
|
||||
@@ -36,6 +36,10 @@ LEGACY_PLUGIN_NAME_ALIASES: Dict[str, str] = {
|
||||
"ocrdesktop": "OCR",
|
||||
}
|
||||
|
||||
OBSOLETE_PLUGIN_NAMES = {
|
||||
"pluginmanager",
|
||||
}
|
||||
|
||||
LEGACY_PLUGIN_DIR_ALIASES: Dict[str, str] = {
|
||||
"OCRDesktop": "OCR",
|
||||
}
|
||||
@@ -721,6 +725,9 @@ class PluginSystemManager:
|
||||
if not name:
|
||||
continue
|
||||
name_lower = name.lower()
|
||||
if name_lower in OBSOLETE_PLUGIN_NAMES:
|
||||
logger.info(f"Dropping obsolete plugin name {name}")
|
||||
continue
|
||||
alias = LEGACY_PLUGIN_NAME_ALIASES.get(name_lower)
|
||||
if alias:
|
||||
logger.info(f"Mapping legacy plugin name {name} to {alias}")
|
||||
|
||||
@@ -1,14 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
#
|
||||
# Copyright (c) 2025 Stormux
|
||||
#
|
||||
# This library is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU Lesser General Public
|
||||
# License as published by the Free Software Foundation; either
|
||||
# version 2.1 of the License, or (at your option) any later version.
|
||||
|
||||
"""PluginManager plugin package."""
|
||||
|
||||
from .plugin import PluginManager
|
||||
|
||||
__all__ = ['PluginManager']
|
||||
@@ -1,14 +0,0 @@
|
||||
pluginmanager_python_sources = files([
|
||||
'__init__.py',
|
||||
'plugin.py'
|
||||
])
|
||||
|
||||
python3.install_sources(
|
||||
pluginmanager_python_sources,
|
||||
subdir: 'cthulhu/plugins/PluginManager'
|
||||
)
|
||||
|
||||
install_data(
|
||||
'plugin.info',
|
||||
install_dir: python3.get_install_dir() / 'cthulhu' / 'plugins' / 'PluginManager'
|
||||
)
|
||||
@@ -1,8 +0,0 @@
|
||||
name = PluginManager
|
||||
version = 1.0.0
|
||||
description = GUI interface for managing Cthulhu plugins - enable/disable plugins with checkboxes
|
||||
authors = Stormux <storm_dragon@stormux.org>
|
||||
website = https://git.stormux.org/storm/cthulhu
|
||||
copyright = Copyright 2025
|
||||
builtin = false
|
||||
hidden = false
|
||||
@@ -1,505 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
#
|
||||
# Copyright (c) 2025 Stormux
|
||||
#
|
||||
# This library is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU Lesser General Public
|
||||
# License as published by the Free Software Foundation; either
|
||||
# version 2.1 of the License, or (at your option) any later version.
|
||||
|
||||
"""PluginManager plugin for Cthulhu - GUI interface for managing plugins."""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import configparser
|
||||
from pathlib import Path
|
||||
|
||||
import gi
|
||||
gi.require_version('Gtk', '3.0')
|
||||
from gi.repository import Gtk, Gdk, Pango
|
||||
|
||||
from cthulhu.plugin import Plugin, cthulhu_hookimpl
|
||||
from cthulhu import cthulhu
|
||||
from cthulhu import debug
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PluginManager(Plugin):
|
||||
"""Plugin that provides a GUI interface for managing other plugins."""
|
||||
|
||||
PLUGIN_COL_ENABLED = 0
|
||||
PLUGIN_COL_DISPLAY = 1
|
||||
PLUGIN_COL_CAN_TOGGLE = 2
|
||||
PLUGIN_COL_NAME = 3
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
"""Initialize the PluginManager plugin."""
|
||||
super().__init__(*args, **kwargs)
|
||||
self._kb_binding = None
|
||||
self._dialog = None
|
||||
self._plugin_treeview = None
|
||||
self._plugin_model = None
|
||||
self._activated = False
|
||||
|
||||
debug.printMessage(debug.LEVEL_INFO, "PluginManager: Plugin initialized", True)
|
||||
|
||||
@cthulhu_hookimpl
|
||||
def activate(self, plugin=None):
|
||||
"""Activate the PluginManager plugin."""
|
||||
if plugin is not None and plugin is not self:
|
||||
return
|
||||
|
||||
# Prevent duplicate activation
|
||||
if self._activated:
|
||||
debug.printMessage(debug.LEVEL_INFO, "PluginManager: Already activated, skipping", True)
|
||||
return
|
||||
|
||||
try:
|
||||
debug.printMessage(debug.LEVEL_INFO, "PluginManager: Plugin activation starting", True)
|
||||
|
||||
self._activated = True
|
||||
debug.printMessage(debug.LEVEL_INFO, "PluginManager: Plugin activated successfully", True)
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
debug.printMessage(debug.LEVEL_INFO, f"PluginManager: ERROR activating plugin: {e}", True)
|
||||
return False
|
||||
|
||||
@cthulhu_hookimpl
|
||||
def deactivate(self, plugin=None):
|
||||
"""Deactivate the PluginManager plugin."""
|
||||
if plugin is not None and plugin is not self:
|
||||
return
|
||||
|
||||
try:
|
||||
debug.printMessage(debug.LEVEL_INFO, "PluginManager: Plugin deactivation starting", True)
|
||||
|
||||
# Close dialog if open
|
||||
if self._dialog:
|
||||
self._dialog.destroy()
|
||||
self._dialog = None
|
||||
|
||||
self._activated = False
|
||||
debug.printMessage(debug.LEVEL_INFO, "PluginManager: Plugin deactivated successfully", True)
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
debug.printMessage(debug.LEVEL_INFO, f"PluginManager: ERROR deactivating plugin: {e}", True)
|
||||
return False
|
||||
|
||||
def _register_keybinding(self):
|
||||
"""Register the Cthulhu+Shift+P keybinding for opening plugin manager."""
|
||||
try:
|
||||
if not self.app:
|
||||
debug.printMessage(debug.LEVEL_INFO, "PluginManager: ERROR - No app reference available for keybinding", True)
|
||||
return
|
||||
|
||||
# Register Cthulhu+Shift+P keybinding
|
||||
gesture_string = "kb:cthulhu+shift+p"
|
||||
description = "Open plugin manager"
|
||||
|
||||
self._kb_binding = self.registerGestureByString(
|
||||
self._open_plugin_manager,
|
||||
description,
|
||||
gesture_string
|
||||
)
|
||||
|
||||
if self._kb_binding:
|
||||
debug.printMessage(debug.LEVEL_INFO, f"PluginManager: Registered keybinding {gesture_string}", True)
|
||||
else:
|
||||
debug.printMessage(debug.LEVEL_INFO, f"PluginManager: ERROR - Failed to register keybinding {gesture_string}", True)
|
||||
|
||||
except Exception as e:
|
||||
debug.printMessage(debug.LEVEL_INFO, f"PluginManager: ERROR registering keybinding: {e}", True)
|
||||
|
||||
def _open_plugin_manager(self, script, inputEvent=None):
|
||||
"""Open the plugin manager dialog."""
|
||||
try:
|
||||
debug.printMessage(debug.LEVEL_INFO, "PluginManager: Opening plugin manager dialog", True)
|
||||
|
||||
# Close existing dialog if open
|
||||
if self._dialog:
|
||||
self._dialog.destroy()
|
||||
|
||||
# Create new dialog
|
||||
self._create_dialog()
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"PluginManager: Error opening plugin manager: {e}")
|
||||
debug.printMessage(debug.LEVEL_INFO, f"PluginManager: Error opening dialog: {e}", True)
|
||||
return False
|
||||
|
||||
def _create_dialog(self):
|
||||
"""Create and show the plugin manager dialog."""
|
||||
try:
|
||||
# Create dialog window
|
||||
self._dialog = Gtk.Dialog(
|
||||
title="Cthulhu Plugin Manager",
|
||||
parent=None,
|
||||
flags=Gtk.DialogFlags.MODAL | Gtk.DialogFlags.DESTROY_WITH_PARENT
|
||||
)
|
||||
|
||||
# Set dialog properties
|
||||
self._dialog.set_default_size(400, 300)
|
||||
self._dialog.set_border_width(10)
|
||||
|
||||
# Add buttons
|
||||
self._dialog.add_button("Close", Gtk.ResponseType.CLOSE)
|
||||
|
||||
# Create main content area
|
||||
content_area = self._dialog.get_content_area()
|
||||
|
||||
# Add title label
|
||||
title_label = Gtk.Label()
|
||||
title_label.set_markup("<b>Available Plugins</b>")
|
||||
title_label.set_halign(Gtk.Align.CENTER)
|
||||
content_area.pack_start(title_label, False, False, 10)
|
||||
|
||||
# Add info label about PluginManager
|
||||
info_label = Gtk.Label()
|
||||
info_label.set_markup("<i>Note:To apply changes, restart Cthulhu</i>")
|
||||
info_label.set_halign(Gtk.Align.CENTER)
|
||||
content_area.pack_start(info_label, False, False, 5)
|
||||
|
||||
# Create scrolled window for plugin list
|
||||
scrolled = Gtk.ScrolledWindow()
|
||||
scrolled.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.AUTOMATIC)
|
||||
scrolled.set_size_request(-1, 200)
|
||||
|
||||
self._plugin_model = Gtk.TreeStore(bool, str, bool, str)
|
||||
self._plugin_treeview = Gtk.TreeView(model=self._plugin_model)
|
||||
self._plugin_treeview.set_headers_visible(True)
|
||||
self._plugin_treeview.set_enable_search(False)
|
||||
self._plugin_treeview.connect("key-press-event", self._on_plugin_list_key_press)
|
||||
self._plugin_treeview.connect("row-activated", self._on_plugin_row_activated)
|
||||
|
||||
toggle_renderer = Gtk.CellRendererToggle()
|
||||
toggle_renderer.set_activatable(True)
|
||||
toggle_renderer.connect("toggled", self._on_plugin_toggled)
|
||||
toggle_column = Gtk.TreeViewColumn(
|
||||
"Enabled",
|
||||
toggle_renderer,
|
||||
active=self.PLUGIN_COL_ENABLED,
|
||||
activatable=self.PLUGIN_COL_CAN_TOGGLE
|
||||
)
|
||||
toggle_column.add_attribute(toggle_renderer, "visible", self.PLUGIN_COL_CAN_TOGGLE)
|
||||
self._plugin_treeview.append_column(toggle_column)
|
||||
|
||||
text_renderer = Gtk.CellRendererText()
|
||||
text_renderer.set_property("ellipsize", Pango.EllipsizeMode.END)
|
||||
text_column = Gtk.TreeViewColumn(
|
||||
"Plugin",
|
||||
text_renderer,
|
||||
text=self.PLUGIN_COL_DISPLAY
|
||||
)
|
||||
text_column.set_expand(True)
|
||||
self._plugin_treeview.append_column(text_column)
|
||||
|
||||
scrolled.add(self._plugin_treeview)
|
||||
|
||||
content_area.pack_start(scrolled, True, True, 0)
|
||||
|
||||
# Populate plugin list
|
||||
self._populate_plugin_list()
|
||||
|
||||
# Connect signals
|
||||
self._dialog.connect("response", self._on_dialog_response)
|
||||
|
||||
# Show dialog
|
||||
self._dialog.show_all()
|
||||
|
||||
debug.printMessage(debug.LEVEL_INFO, "PluginManager: Dialog created and shown", True)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"PluginManager: Error creating dialog: {e}")
|
||||
debug.printMessage(debug.LEVEL_INFO, f"PluginManager: Error creating dialog: {e}", True)
|
||||
|
||||
def _populate_plugin_list(self):
|
||||
"""Populate the plugin list with available plugins."""
|
||||
try:
|
||||
# Get available plugins
|
||||
available_plugins = self._discover_plugins()
|
||||
|
||||
# Get currently active plugins
|
||||
active_plugins = cthulhu.cthulhuApp.settingsManager.getSetting('activePlugins') or []
|
||||
|
||||
debug.printMessage(debug.LEVEL_INFO, f"PluginManager: Found {len(available_plugins)} plugins", True)
|
||||
|
||||
# Clear existing model rows
|
||||
self._plugin_model.clear()
|
||||
|
||||
# Add each plugin as a checkbox (except PluginManager itself)
|
||||
enabled_iter = self._plugin_model.append(
|
||||
None,
|
||||
[False, "Enabled plugins", False, ""]
|
||||
)
|
||||
disabled_iter = self._plugin_model.append(
|
||||
None,
|
||||
[False, "Disabled plugins", False, ""]
|
||||
)
|
||||
|
||||
for plugin_name, plugin_info in sorted(available_plugins.items()):
|
||||
# Skip PluginManager to prevent users from disabling plugin management
|
||||
if plugin_name == "PluginManager":
|
||||
continue
|
||||
|
||||
display_name = plugin_info.get('name', plugin_name)
|
||||
display_text = display_name
|
||||
description = plugin_info.get('description')
|
||||
if description:
|
||||
display_text += f" - {description}"
|
||||
version = plugin_info.get('version')
|
||||
if version:
|
||||
display_text += f" (v{version})"
|
||||
|
||||
is_active = plugin_name in active_plugins
|
||||
parent_iter = enabled_iter if is_active else disabled_iter
|
||||
self._plugin_model.append(
|
||||
parent_iter,
|
||||
[is_active, display_text, True, plugin_name]
|
||||
)
|
||||
|
||||
self._plugin_treeview.collapse_all()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"PluginManager: Error populating plugin list: {e}")
|
||||
debug.printMessage(debug.LEVEL_INFO, f"PluginManager: Error populating list: {e}", True)
|
||||
|
||||
def _discover_plugins(self):
|
||||
"""Discover all available plugins."""
|
||||
plugins = {}
|
||||
|
||||
try:
|
||||
# Get plugin system manager
|
||||
from cthulhu import plugin_system_manager
|
||||
|
||||
# Use existing plugin manager to get plugins
|
||||
manager = plugin_system_manager.getManager()
|
||||
if manager:
|
||||
manager.rescanPlugins()
|
||||
|
||||
for plugin_info in manager.plugins:
|
||||
plugin_name = plugin_info.get_module_name()
|
||||
plugins[plugin_name] = {
|
||||
'name': plugin_info.get_name(),
|
||||
'description': plugin_info.get_description(),
|
||||
'version': plugin_info.get_version(),
|
||||
'path': plugin_info.get_module_dir()
|
||||
}
|
||||
else:
|
||||
# Fallback: manually scan plugin directories
|
||||
plugins = self._manual_plugin_discovery()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"PluginManager: Error in plugin discovery: {e}")
|
||||
debug.printMessage(debug.LEVEL_INFO, f"PluginManager: Plugin discovery error: {e}", True)
|
||||
# Fallback to manual discovery
|
||||
plugins = self._manual_plugin_discovery()
|
||||
|
||||
return plugins
|
||||
|
||||
def _manual_plugin_discovery(self):
|
||||
"""Manually discover plugins by scanning directories."""
|
||||
plugins = {}
|
||||
|
||||
try:
|
||||
# Get plugin directories
|
||||
from cthulhu.plugin_system_manager import PluginType
|
||||
|
||||
system_dir = PluginType.SYSTEM.get_root_dir()
|
||||
user_dir = PluginType.USER.get_root_dir()
|
||||
|
||||
for plugin_dir in [system_dir, user_dir]:
|
||||
if os.path.exists(plugin_dir):
|
||||
self._scan_plugin_directory(plugin_dir, plugins)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"PluginManager: Error in manual discovery: {e}")
|
||||
debug.printMessage(debug.LEVEL_INFO, f"PluginManager: Manual discovery error: {e}", True)
|
||||
|
||||
return plugins
|
||||
|
||||
def _scan_plugin_directory(self, directory, plugins):
|
||||
"""Scan a directory for plugins."""
|
||||
try:
|
||||
for item in os.listdir(directory):
|
||||
plugin_path = os.path.join(directory, item)
|
||||
if os.path.isdir(plugin_path):
|
||||
# Check for plugin.py and plugin.info
|
||||
plugin_py = os.path.join(plugin_path, "plugin.py")
|
||||
plugin_info_file = os.path.join(plugin_path, "plugin.info")
|
||||
|
||||
if os.path.exists(plugin_py):
|
||||
plugin_info = {'name': item, 'description': '', 'version': ''}
|
||||
|
||||
# Try to read plugin.info
|
||||
if os.path.exists(plugin_info_file):
|
||||
try:
|
||||
config = configparser.ConfigParser()
|
||||
config.read(plugin_info_file)
|
||||
|
||||
# Handle both INI-style and simple key=value format
|
||||
if config.sections():
|
||||
# INI-style format
|
||||
for section in config.sections():
|
||||
for key, value in config[section].items():
|
||||
if key.lower() in ['description', 'version']:
|
||||
plugin_info[key.lower()] = value
|
||||
else:
|
||||
# Simple key=value format
|
||||
with open(plugin_info_file, 'r') as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if '=' in line and not line.startswith('#'):
|
||||
key, value = line.split('=', 1)
|
||||
key = key.strip().lower()
|
||||
value = value.strip()
|
||||
if key in ['description', 'version']:
|
||||
plugin_info[key] = value
|
||||
except Exception as info_e:
|
||||
debug.printMessage(debug.LEVEL_INFO, f"PluginManager: Error reading {plugin_info_file}: {info_e}", True)
|
||||
|
||||
plugins[item] = plugin_info
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"PluginManager: Error scanning directory {directory}: {e}")
|
||||
|
||||
def _on_plugin_toggled(self, renderer, path):
|
||||
"""Handle plugin toggle via TreeView."""
|
||||
try:
|
||||
tree_iter = self._plugin_model.get_iter(path)
|
||||
if not tree_iter:
|
||||
return
|
||||
|
||||
can_toggle = self._plugin_model.get_value(tree_iter, self.PLUGIN_COL_CAN_TOGGLE)
|
||||
if not can_toggle:
|
||||
return
|
||||
|
||||
is_active = self._plugin_model.get_value(tree_iter, self.PLUGIN_COL_ENABLED)
|
||||
plugin_name = self._plugin_model.get_value(tree_iter, self.PLUGIN_COL_NAME)
|
||||
new_active = not is_active
|
||||
self._plugin_model.set_value(tree_iter, self.PLUGIN_COL_ENABLED, new_active)
|
||||
self._set_plugin_active(plugin_name, new_active)
|
||||
self._rebuild_plugin_groups()
|
||||
except Exception as error:
|
||||
logger.error(f"PluginManager: Error toggling plugin at path {path}: {error}")
|
||||
debug.printMessage(debug.LEVEL_INFO, f"PluginManager: Error toggling plugin at path {path}: {error}", True)
|
||||
|
||||
def _on_plugin_list_key_press(self, widget, event):
|
||||
"""Toggle plugin on Space while keeping Tab navigation outside the list."""
|
||||
if event.keyval != Gdk.KEY_space:
|
||||
return False
|
||||
|
||||
selection = self._plugin_treeview.get_selection()
|
||||
model, tree_iter = selection.get_selected()
|
||||
if not tree_iter:
|
||||
return False
|
||||
|
||||
path = model.get_path(tree_iter)
|
||||
self._on_plugin_toggled(None, path)
|
||||
return True
|
||||
|
||||
def _on_plugin_row_activated(self, treeview, path, column):
|
||||
"""Toggle plugin when activating a row."""
|
||||
self._on_plugin_toggled(None, path)
|
||||
|
||||
def _rebuild_plugin_groups(self):
|
||||
"""Rebuild the tree so enabled/disabled groups stay accurate."""
|
||||
if not self._plugin_model:
|
||||
return
|
||||
|
||||
selection = self._plugin_treeview.get_selection()
|
||||
model, tree_iter = selection.get_selected()
|
||||
selected_name = None
|
||||
if tree_iter:
|
||||
selected_name = model.get_value(tree_iter, self.PLUGIN_COL_NAME)
|
||||
|
||||
self._populate_plugin_list()
|
||||
|
||||
if selected_name:
|
||||
self._select_plugin_row(selected_name)
|
||||
|
||||
def _select_plugin_row(self, plugin_name):
|
||||
"""Select the row for the given plugin name."""
|
||||
def _walk(model, tree_iter):
|
||||
while tree_iter:
|
||||
name = model.get_value(tree_iter, self.PLUGIN_COL_NAME)
|
||||
if name == plugin_name:
|
||||
path = model.get_path(tree_iter)
|
||||
self._plugin_treeview.expand_to_path(path)
|
||||
self._plugin_treeview.get_selection().select_path(path)
|
||||
return True
|
||||
if model.iter_has_child(tree_iter):
|
||||
child = model.iter_children(tree_iter)
|
||||
if _walk(model, child):
|
||||
return True
|
||||
tree_iter = model.iter_next(tree_iter)
|
||||
return False
|
||||
|
||||
root = self._plugin_model.get_iter_first()
|
||||
_walk(self._plugin_model, root)
|
||||
|
||||
def _set_plugin_active(self, plugin_name, is_active):
|
||||
"""Update settings to enable or disable a plugin."""
|
||||
try:
|
||||
debug.printMessage(debug.LEVEL_INFO, f"PluginManager: Plugin {plugin_name} toggled to {'active' if is_active else 'inactive'}", True)
|
||||
|
||||
# Get current active plugins
|
||||
active_plugins = cthulhu.cthulhuApp.settingsManager.getSetting('activePlugins') or []
|
||||
active_plugins = list(active_plugins)
|
||||
|
||||
if is_active and plugin_name not in active_plugins:
|
||||
active_plugins.append(plugin_name)
|
||||
elif not is_active and plugin_name in active_plugins:
|
||||
active_plugins.remove(plugin_name)
|
||||
|
||||
cthulhu.cthulhuApp.settingsManager.setSetting('activePlugins', active_plugins)
|
||||
if hasattr(cthulhu.cthulhuApp.settingsManager, "general") and isinstance(cthulhu.cthulhuApp.settingsManager.general, dict):
|
||||
cthulhu.cthulhuApp.settingsManager.general['activePlugins'] = active_plugins
|
||||
|
||||
try:
|
||||
active_profile = cthulhu.cthulhuApp.settingsManager.getSetting('activeProfile')
|
||||
if isinstance(active_profile, (list, tuple)) and len(active_profile) > 1:
|
||||
profile_name = active_profile[1]
|
||||
else:
|
||||
profile_name = cthulhu.cthulhuApp.settingsManager.profile or 'default'
|
||||
|
||||
current_general = cthulhu.cthulhuApp.settingsManager.getGeneralSettings(profile_name) or {}
|
||||
current_general['activePlugins'] = active_plugins
|
||||
|
||||
cthulhu.cthulhuApp.settingsManager.profile = profile_name
|
||||
cthulhu.cthulhuApp.settingsManager.saveProfileSettings(current_general)
|
||||
debug.printMessage(debug.LEVEL_INFO, f"PluginManager: Settings saved via settings manager (profile {profile_name})", True)
|
||||
|
||||
except Exception as save_error:
|
||||
debug.printMessage(debug.LEVEL_INFO, f"PluginManager: Error saving plugin state: {save_error}", True)
|
||||
|
||||
debug.printMessage(debug.LEVEL_INFO, f"PluginManager: Updated active plugins: {active_plugins}", True)
|
||||
|
||||
try:
|
||||
from cthulhu import plugin_system_manager
|
||||
manager = plugin_system_manager.getManager()
|
||||
if manager:
|
||||
manager.setActivePlugins(active_plugins)
|
||||
debug.printMessage(debug.LEVEL_INFO, "PluginManager: Applied active plugin changes", True)
|
||||
except Exception as apply_error:
|
||||
debug.printMessage(debug.LEVEL_INFO, f"PluginManager: Failed to apply plugin changes: {apply_error}", True)
|
||||
|
||||
except Exception as error:
|
||||
logger.error(f"PluginManager: Error toggling plugin {plugin_name}: {error}")
|
||||
debug.printMessage(debug.LEVEL_INFO, f"PluginManager: Error toggling {plugin_name}: {error}", True)
|
||||
|
||||
|
||||
def _on_dialog_response(self, dialog, response_id):
|
||||
"""Handle dialog response (close button clicked)."""
|
||||
try:
|
||||
if response_id == Gtk.ResponseType.CLOSE:
|
||||
dialog.destroy()
|
||||
self._dialog = None
|
||||
debug.printMessage(debug.LEVEL_INFO, "PluginManager: Dialog closed", True)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"PluginManager: Error handling dialog response: {e}")
|
||||
@@ -24,6 +24,7 @@ import logging
|
||||
|
||||
from gi.repository import GLib
|
||||
|
||||
from cthulhu.ax_object import AXObject
|
||||
from cthulhu import debug
|
||||
from cthulhu import dbus_service
|
||||
from cthulhu.plugin import Plugin, cthulhu_hookimpl
|
||||
@@ -53,6 +54,9 @@ class WindowTitleReader(Plugin):
|
||||
self._enabled = False
|
||||
self._pollSourceId = None
|
||||
self._pollIntervalMs = 100
|
||||
self._fallbackDelayMs = 250
|
||||
self._pendingFallbackSourceId = None
|
||||
self._lastActiveWindowId = None
|
||||
self._lastTitle = None
|
||||
self._display = None
|
||||
self._root = None
|
||||
@@ -71,6 +75,13 @@ class WindowTitleReader(Plugin):
|
||||
return True
|
||||
|
||||
self._register_keybinding()
|
||||
self.app.getDynamicApiManager().registerAPI(
|
||||
"WindowTitleReader",
|
||||
self,
|
||||
overwrite=True,
|
||||
)
|
||||
if xlibAvailable:
|
||||
self._start_tracking()
|
||||
self._activated = True
|
||||
debug.printMessage(debug.LEVEL_INFO, "WindowTitleReader: Activated", True)
|
||||
return True
|
||||
@@ -81,6 +92,8 @@ class WindowTitleReader(Plugin):
|
||||
return
|
||||
|
||||
self._stop_tracking()
|
||||
if self.app:
|
||||
self.app.getDynamicApiManager().unregisterAPI("WindowTitleReader")
|
||||
self._activated = False
|
||||
debug.printMessage(debug.LEVEL_INFO, "WindowTitleReader: Deactivated", True)
|
||||
return True
|
||||
@@ -118,7 +131,8 @@ class WindowTitleReader(Plugin):
|
||||
|
||||
def _toggle_tracking(self, script=None, inputEvent=None):
|
||||
if self._enabled:
|
||||
self._stop_tracking()
|
||||
self._enabled = False
|
||||
self._lastTitle = None
|
||||
self._present_message("Window title reader off")
|
||||
return True
|
||||
|
||||
@@ -132,6 +146,8 @@ class WindowTitleReader(Plugin):
|
||||
return True
|
||||
|
||||
if self._start_tracking():
|
||||
self._enabled = True
|
||||
self._poll_window_title()
|
||||
self._present_message("Window title reader on")
|
||||
else:
|
||||
self._present_message("Window title reader unavailable")
|
||||
@@ -162,6 +178,8 @@ class WindowTitleReader(Plugin):
|
||||
return False
|
||||
|
||||
if self._start_tracking():
|
||||
self._enabled = True
|
||||
self._poll_window_title()
|
||||
if notify_user:
|
||||
self._present_message("Window title reader on")
|
||||
return True
|
||||
@@ -171,7 +189,8 @@ class WindowTitleReader(Plugin):
|
||||
return False
|
||||
|
||||
if self._enabled:
|
||||
self._stop_tracking()
|
||||
self._enabled = False
|
||||
self._lastTitle = None
|
||||
|
||||
if notify_user:
|
||||
self._present_message("Window title reader off")
|
||||
@@ -195,7 +214,7 @@ class WindowTitleReader(Plugin):
|
||||
pluginLogger.exception("WindowTitleReader: Failed to present message")
|
||||
|
||||
def _start_tracking(self):
|
||||
if self._enabled:
|
||||
if self._pollSourceId is not None:
|
||||
return True
|
||||
|
||||
try:
|
||||
@@ -203,7 +222,6 @@ class WindowTitleReader(Plugin):
|
||||
self._root = self._display.screen().root
|
||||
self._init_atoms()
|
||||
self._pollSourceId = GLib.timeout_add(self._pollIntervalMs, self._poll_window_title)
|
||||
self._enabled = True
|
||||
self._poll_window_title()
|
||||
return True
|
||||
except Exception as error:
|
||||
@@ -217,11 +235,16 @@ class WindowTitleReader(Plugin):
|
||||
return False
|
||||
|
||||
def _stop_tracking(self):
|
||||
if self._pendingFallbackSourceId is not None:
|
||||
GLib.source_remove(self._pendingFallbackSourceId)
|
||||
self._pendingFallbackSourceId = None
|
||||
|
||||
if self._pollSourceId is not None:
|
||||
GLib.source_remove(self._pollSourceId)
|
||||
self._pollSourceId = None
|
||||
|
||||
self._enabled = False
|
||||
self._lastActiveWindowId = None
|
||||
self._lastTitle = None
|
||||
self._cleanup_display()
|
||||
|
||||
@@ -246,7 +269,7 @@ class WindowTitleReader(Plugin):
|
||||
}
|
||||
|
||||
def _poll_window_title(self):
|
||||
if not self._enabled:
|
||||
if self._pollSourceId is None:
|
||||
return False
|
||||
|
||||
try:
|
||||
@@ -260,7 +283,14 @@ class WindowTitleReader(Plugin):
|
||||
self._lastTitle = None
|
||||
return True
|
||||
|
||||
if windowTitle != self._lastTitle:
|
||||
activeWindowId = activeWindow.id
|
||||
if self._lastActiveWindowId is None:
|
||||
self._lastActiveWindowId = activeWindowId
|
||||
elif activeWindowId != self._lastActiveWindowId:
|
||||
self._lastActiveWindowId = activeWindowId
|
||||
self._schedule_fallback_title()
|
||||
|
||||
if self._enabled and windowTitle != self._lastTitle:
|
||||
self._present_title(windowTitle)
|
||||
self._lastTitle = windowTitle
|
||||
except Exception as error:
|
||||
@@ -269,6 +299,70 @@ class WindowTitleReader(Plugin):
|
||||
|
||||
return True
|
||||
|
||||
def _schedule_fallback_title(self):
|
||||
if self._enabled:
|
||||
return
|
||||
|
||||
if self._pendingFallbackSourceId is not None:
|
||||
GLib.source_remove(self._pendingFallbackSourceId)
|
||||
|
||||
self._pendingFallbackSourceId = GLib.timeout_add(
|
||||
self._fallbackDelayMs,
|
||||
self._present_pending_fallback_title,
|
||||
)
|
||||
|
||||
def _present_pending_fallback_title(self):
|
||||
self._pendingFallbackSourceId = None
|
||||
titleText = self.get_fallback_title()
|
||||
if titleText:
|
||||
self._present_title(titleText)
|
||||
return False
|
||||
|
||||
def get_fallback_title(self, atspiTitle=None):
|
||||
"""Returns the X11 title when AT-SPI has not exposed an equivalent title."""
|
||||
|
||||
if not xlibAvailable and self._display is None:
|
||||
return ""
|
||||
|
||||
startedTracking = self._pollSourceId is not None
|
||||
if not startedTracking and not self._start_tracking():
|
||||
return ""
|
||||
|
||||
activeWindow = self._get_active_window()
|
||||
titleText = self._get_current_title(activeWindow) if activeWindow else ""
|
||||
if not titleText:
|
||||
return ""
|
||||
|
||||
if atspiTitle is None:
|
||||
atspiTitle = self._get_atspi_title()
|
||||
|
||||
if self._titles_match(atspiTitle, titleText):
|
||||
return ""
|
||||
|
||||
return titleText
|
||||
|
||||
def _get_atspi_title(self):
|
||||
if not self.app:
|
||||
return ""
|
||||
|
||||
appState = self.app.getDynamicApiManager().getAPI("CthulhuState")
|
||||
if not appState:
|
||||
return ""
|
||||
|
||||
return AXObject.get_name(appState.activeWindow) or ""
|
||||
|
||||
def _titles_match(self, atspiTitle, fallbackTitle):
|
||||
if not atspiTitle or not fallbackTitle:
|
||||
return False
|
||||
|
||||
if self._is_wine_desktop_title(atspiTitle):
|
||||
return False
|
||||
|
||||
normalizedAtspiTitle = " ".join(atspiTitle.casefold().split())
|
||||
normalizedFallbackTitle = " ".join(fallbackTitle.casefold().split())
|
||||
return normalizedAtspiTitle in normalizedFallbackTitle \
|
||||
or normalizedFallbackTitle in normalizedAtspiTitle
|
||||
|
||||
def _present_title(self, titleText):
|
||||
if not self.app:
|
||||
return
|
||||
|
||||
@@ -8,7 +8,6 @@ subdir('HelloCthulhu')
|
||||
subdir('GameMode')
|
||||
subdir('nvda2cthulhu')
|
||||
subdir('OCR')
|
||||
subdir('PluginManager')
|
||||
subdir('SpeechHistory')
|
||||
subdir('SimplePluginSystem')
|
||||
subdir('hello_world')
|
||||
|
||||
@@ -26,6 +26,7 @@ import select
|
||||
import logging
|
||||
import threading
|
||||
from threading import Thread, Lock
|
||||
from typing import Optional
|
||||
from cthulhu.plugin import Plugin, cthulhu_hookimpl
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -34,6 +35,13 @@ logger = logging.getLogger(__name__)
|
||||
APPEND_CODE = '<#APPEND#>'
|
||||
PERSISTENT_CODE = '<#PERSISTENT#>'
|
||||
|
||||
def _get_socket_file() -> Optional[str]:
|
||||
runtimeDir = os.environ.get('XDG_RUNTIME_DIR')
|
||||
if not runtimeDir:
|
||||
return None
|
||||
|
||||
return os.path.join(runtimeDir, 'cthulhu.sock')
|
||||
|
||||
class SelfVoice(Plugin):
|
||||
"""Plugin that provides a socket interface for external applications to send text to Cthulhu."""
|
||||
|
||||
@@ -115,9 +123,10 @@ class SelfVoice(Plugin):
|
||||
|
||||
def voiceWorker(self):
|
||||
"""Worker thread that listens on a socket for messages to speak."""
|
||||
socketFile = '/tmp/cthulhu.sock'
|
||||
# For testing purposes
|
||||
# socketFile = '/tmp/cthulhu-plugin.sock'
|
||||
socketFile = _get_socket_file()
|
||||
if not socketFile:
|
||||
logger.error("XDG_RUNTIME_DIR is not set; Self Voice plugin cannot create its socket")
|
||||
return
|
||||
|
||||
# Clean up any existing socket file
|
||||
if os.path.exists(socketFile):
|
||||
|
||||
@@ -164,6 +164,16 @@ class Script(Chromium.Script):
|
||||
|
||||
super().onTextInserted(event)
|
||||
|
||||
def onCaretMoved(self, event):
|
||||
"""Callback for object:text-caret-moved accessibility events."""
|
||||
|
||||
if self._isSteamLoadingSpinnerCaretEvent(event):
|
||||
msg = "STEAM: Ignoring caret event from transient loading spinner"
|
||||
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||
return True
|
||||
|
||||
return super().onCaretMoved(event)
|
||||
|
||||
def onSelectionChanged(self, event):
|
||||
"""Callback for object:selection-changed accessibility events."""
|
||||
|
||||
@@ -193,6 +203,34 @@ class Script(Chromium.Script):
|
||||
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||
return True
|
||||
|
||||
def _isSteamLoadingSpinnerCaretEvent(self, event) -> bool:
|
||||
if not (event and event.type.startswith("object:text-caret-moved")):
|
||||
return False
|
||||
|
||||
source = getattr(event, "source", None)
|
||||
if not source or not self.utilities.inDocumentContent(source):
|
||||
return False
|
||||
|
||||
if AXObject.get_name(source):
|
||||
return False
|
||||
|
||||
try:
|
||||
children = list(AXObject.iter_children(source))
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
if not children:
|
||||
return False
|
||||
|
||||
for child in children:
|
||||
if not AXUtilities.is_image_or_canvas(child):
|
||||
continue
|
||||
name = AXObject.get_name(child) or ""
|
||||
if name.strip().casefold() == "steam spinner":
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _trySteamButtonActivation(self, keyboardEvent) -> bool:
|
||||
if keyboardEvent.event_string not in ["Return", "KP_Enter"]:
|
||||
return False
|
||||
|
||||
@@ -393,8 +393,6 @@ class Script(script.Script):
|
||||
return keyBindings
|
||||
|
||||
def getExtensionBindings(self):
|
||||
with open('/tmp/extension_bindings_debug.log', 'a') as f:
|
||||
f.write(f"=== getExtensionBindings() called ===\n")
|
||||
keyBindings = keybindings.KeyBindings()
|
||||
|
||||
bindings = self.notificationPresenter.get_bindings()
|
||||
@@ -443,35 +441,13 @@ class Script(script.Script):
|
||||
try:
|
||||
if hasattr(cthulhu, 'cthulhuApp') and cthulhu.cthulhuApp:
|
||||
api_helper = cthulhu.cthulhuApp.getAPIHelper()
|
||||
with open('/tmp/extension_bindings_debug.log', 'a') as f:
|
||||
f.write(f"=== Checking for plugin bindings ===\n")
|
||||
f.write(f"api_helper exists: {api_helper is not None}\n")
|
||||
if api_helper:
|
||||
f.write(f"api_helper has _gestureBindings: {hasattr(api_helper, '_gestureBindings')}\n")
|
||||
if hasattr(api_helper, '_gestureBindings'):
|
||||
f.write(f"_gestureBindings content: {api_helper._gestureBindings}\n")
|
||||
f.write(f"Available contexts: {list(api_helper._gestureBindings.keys())}\n")
|
||||
|
||||
if api_helper and hasattr(api_helper, '_gestureBindings'):
|
||||
with open('/tmp/extension_bindings_debug.log', 'a') as f:
|
||||
f.write(f"=== Adding plugin bindings in getExtensionBindings() ===\n")
|
||||
|
||||
for context_name, context_bindings in api_helper._gestureBindings.items():
|
||||
for binding in context_bindings:
|
||||
keyBindings.add(binding)
|
||||
with open('/tmp/extension_bindings_debug.log', 'a') as f:
|
||||
f.write(f"Added plugin binding: {binding.keysymstring} modifiers={binding.modifiers} desc={binding.handler.description}\n")
|
||||
else:
|
||||
with open('/tmp/extension_bindings_debug.log', 'a') as f:
|
||||
f.write(f"=== No plugin bindings available ===\n")
|
||||
else:
|
||||
with open('/tmp/extension_bindings_debug.log', 'a') as f:
|
||||
f.write(f"=== cthulhuApp not available ===\n")
|
||||
except Exception as e:
|
||||
import cthulhu.debug as debug
|
||||
debug.printMessage(debug.LEVEL_WARNING, f"Failed to add plugin bindings: {e}", True)
|
||||
with open('/tmp/extension_bindings_debug.log', 'a') as f:
|
||||
f.write(f"Exception in plugin binding addition: {e}\n")
|
||||
|
||||
return keyBindings
|
||||
|
||||
|
||||
@@ -1758,6 +1758,7 @@ class Script(default.Script):
|
||||
if not document:
|
||||
msg = "WEB: Locus of focus changed to non-document obj"
|
||||
self._madeFindAnnouncement = False
|
||||
if not self._focusModeIsSticky:
|
||||
self._inFocusMode = False
|
||||
self._setNavigationSuspended(True, "focus left document content")
|
||||
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||
@@ -1779,6 +1780,19 @@ class Script(default.Script):
|
||||
if self._navSuspended:
|
||||
self._setNavigationSuspended(False, "focus entered document content")
|
||||
|
||||
if self._focusModeIsSticky and not self._inFocusMode:
|
||||
msg = "WEB: Restoring sticky focus mode after returning to document content"
|
||||
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||
self._inFocusMode = True
|
||||
self.presentMessage(messages.MODE_FOCUS_IS_STICKY)
|
||||
self.refreshKeyGrabs()
|
||||
elif self._browseModeIsSticky and self._inFocusMode:
|
||||
msg = "WEB: Restoring sticky browse mode after returning to document content"
|
||||
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||
self._inFocusMode = False
|
||||
self.presentMessage(messages.MODE_BROWSE_IS_STICKY)
|
||||
self.refreshKeyGrabs()
|
||||
|
||||
if self.flatReviewPresenter.is_active():
|
||||
self.flatReviewPresenter.quit()
|
||||
|
||||
@@ -2551,6 +2565,7 @@ class Script(default.Script):
|
||||
if self._browseModeIsSticky:
|
||||
msg = "WEB: Web app descendant claimed focus, but browse mode is sticky"
|
||||
debug.printMessage(debug.LEVEL_INFO, msg, True)
|
||||
return True
|
||||
elif AXUtilities.is_tool_tip(event.source) \
|
||||
and AXObject.find_ancestor(cthulhu_state.locusOfFocus, lambda x: x == event.source):
|
||||
msg = "WEB: Event believed to be side effect of tooltip navigation."
|
||||
|
||||
@@ -42,6 +42,8 @@ userCustomizableSettings = [
|
||||
"onlySpeakDisplayedText",
|
||||
"speechServerFactory",
|
||||
"speechServerInfo",
|
||||
"hardwareSpeechDevice",
|
||||
"hardwareSpeechBaudRate",
|
||||
"voices",
|
||||
"speechVerbosityLevel",
|
||||
"readFullRowInGUITable",
|
||||
@@ -265,9 +267,11 @@ activeProfile = ['Default', 'default']
|
||||
profile = ['Default', 'default']
|
||||
|
||||
# Speech
|
||||
speechFactoryModules = ["speechdispatcherfactory", "piperfactory"]
|
||||
speechFactoryModules = ["speechdispatcherfactory", "piperfactory", "hardwarefactory"]
|
||||
speechServerFactory = "speechdispatcherfactory"
|
||||
speechServerInfo = None # None means let the factory decide.
|
||||
hardwareSpeechDevice = ""
|
||||
hardwareSpeechBaudRate = 9600
|
||||
enableSpeech = True
|
||||
silenceSpeech = False
|
||||
enableTutorialMessages = False
|
||||
@@ -493,7 +497,6 @@ presentLiveRegionFromInactiveTab = False
|
||||
|
||||
# Plugins
|
||||
activePlugins = [
|
||||
'PluginManager',
|
||||
'ByeCthulhu',
|
||||
'Clipboard',
|
||||
'HelloCthulhu',
|
||||
|
||||
@@ -33,6 +33,7 @@ __copyright__ = "Copyright (c) 2005-2008 Sun Microsystems Inc." \
|
||||
__license__ = "LGPL"
|
||||
|
||||
from . import cmdnames
|
||||
from . import cthulhu
|
||||
from . import debug
|
||||
from . import input_event
|
||||
from . import keybindings
|
||||
@@ -319,10 +320,33 @@ class WhereAmIPresenter:
|
||||
return True
|
||||
|
||||
title = script.speechGenerator.generateTitle(obj)
|
||||
fallbackTitle = self._get_fallback_title(title)
|
||||
if fallbackTitle:
|
||||
script.presentMessage(fallbackTitle)
|
||||
return True
|
||||
|
||||
for (string, voice) in title:
|
||||
script.presentMessage(string, voice=voice)
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def _get_fallback_title(generatedTitle):
|
||||
"""Returns a non-AT-SPI title when the active title reader can provide one."""
|
||||
|
||||
try:
|
||||
reader = cthulhu.getManager().getDynamicApiManager().getAPI("WindowTitleReader")
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
if reader is None:
|
||||
return ""
|
||||
|
||||
atspiTitle = " ".join(
|
||||
str(item[0]) for item in generatedTitle
|
||||
if isinstance(item, (list, tuple)) and item
|
||||
)
|
||||
return reader.get_fallback_title(atspiTitle)
|
||||
|
||||
def _present_default_button(self, script, event=None, dialog=None, error_messages=True):
|
||||
"""Presents the default button of the current dialog."""
|
||||
|
||||
|
||||
@@ -0,0 +1,100 @@
|
||||
import os
|
||||
import select
|
||||
import sys
|
||||
import time
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
|
||||
|
||||
from cthulhu import hardwarefactory
|
||||
from cthulhu import settings
|
||||
|
||||
|
||||
def read_available(fd, expectedLength, timeout=1.0):
|
||||
deadline = time.monotonic() + timeout
|
||||
data = b""
|
||||
while len(data) < expectedLength and time.monotonic() < deadline:
|
||||
readable, _, _ = select.select([fd], [], [], 0.05)
|
||||
if readable:
|
||||
data += os.read(fd, 1024)
|
||||
return data
|
||||
|
||||
|
||||
class HardwareFactoryRegressionTests(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self._oldDevice = settings.hardwareSpeechDevice
|
||||
self._oldBaudRate = settings.hardwareSpeechBaudRate
|
||||
hardwarefactory.SpeechServer.shutdownActiveServers()
|
||||
|
||||
def tearDown(self):
|
||||
hardwarefactory.SpeechServer.shutdownActiveServers()
|
||||
settings.hardwareSpeechDevice = self._oldDevice
|
||||
settings.hardwareSpeechBaudRate = self._oldBaudRate
|
||||
|
||||
def test_lists_explicit_synth_choices_without_opening_serial_device(self):
|
||||
settings.hardwareSpeechDevice = ""
|
||||
|
||||
servers = hardwarefactory.SpeechServer.getSpeechServers()
|
||||
|
||||
self.assertEqual(
|
||||
["litetalk", "doubletalk", "tripletalk", "dectalk"],
|
||||
[server.getInfo()[1] for server in servers],
|
||||
)
|
||||
self.assertEqual({}, hardwarefactory.SpeechServer._active_servers)
|
||||
self.assertTrue(all(server._driver is None for server in servers))
|
||||
|
||||
def test_failed_initialization_is_not_cached(self):
|
||||
settings.hardwareSpeechDevice = ""
|
||||
|
||||
self.assertIsNone(
|
||||
hardwarefactory.SpeechServer.getSpeechServer(["LiteTalk", "litetalk"])
|
||||
)
|
||||
self.assertEqual({}, hardwarefactory.SpeechServer._active_servers)
|
||||
|
||||
masterFd, slaveFd = os.openpty()
|
||||
try:
|
||||
settings.hardwareSpeechDevice = os.ttyname(slaveFd)
|
||||
server = hardwarefactory.SpeechServer.getSpeechServer(
|
||||
["LiteTalk", "litetalk"]
|
||||
)
|
||||
|
||||
self.assertIsNotNone(server)
|
||||
self.assertIsNotNone(server._driver)
|
||||
self.assertIs(
|
||||
server,
|
||||
hardwarefactory.SpeechServer._active_servers.get("litetalk"),
|
||||
)
|
||||
finally:
|
||||
os.close(masterFd)
|
||||
os.close(slaveFd)
|
||||
|
||||
def test_explicit_synth_choices_write_expected_serial_bytes(self):
|
||||
expectedBytes = {
|
||||
"litetalk": b"Alias\r",
|
||||
"doubletalk": b"Alias\r",
|
||||
"tripletalk": b"Alias\r",
|
||||
"dectalk": b"Alias\x01",
|
||||
}
|
||||
|
||||
for synthId, expected in expectedBytes.items():
|
||||
with self.subTest(synthId=synthId):
|
||||
hardwarefactory.SpeechServer.shutdownActiveServers()
|
||||
masterFd, slaveFd = os.openpty()
|
||||
try:
|
||||
settings.hardwareSpeechDevice = os.ttyname(slaveFd)
|
||||
server = hardwarefactory.SpeechServer.getSpeechServer(
|
||||
["", synthId]
|
||||
)
|
||||
|
||||
self.assertIsNotNone(server)
|
||||
server.speak("Alias", interrupt=False)
|
||||
self.assertEqual(expected, read_available(masterFd, len(expected)))
|
||||
finally:
|
||||
hardwarefactory.SpeechServer.shutdownActiveServers()
|
||||
os.close(masterFd)
|
||||
os.close(slaveFd)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -20,7 +20,7 @@ class InputEventManagerX11FocusRegressionTests(unittest.TestCase):
|
||||
):
|
||||
self.assertTrue(manager._active_x11_window_differs_from(cachedWindow))
|
||||
|
||||
def test_keyboard_event_preserves_key_handling_when_unknown_x11_window_is_not_xterm(self):
|
||||
def test_keyboard_event_clears_stale_context_before_recovering_old_focus_window(self):
|
||||
manager = input_event_manager.InputEventManager()
|
||||
staleWindow = object()
|
||||
staleFocus = object()
|
||||
@@ -30,6 +30,8 @@ class InputEventManagerX11FocusRegressionTests(unittest.TestCase):
|
||||
scriptManager = mock.Mock()
|
||||
keyboardEvent = mock.Mock()
|
||||
keyboardEvent.is_modifier_key.return_value = False
|
||||
manager._last_input_event = object()
|
||||
manager._last_non_modifier_key_event = object()
|
||||
|
||||
with (
|
||||
mock.patch.object(input_event_manager.cthulhu_state, "capturingKeys", False),
|
||||
@@ -38,7 +40,8 @@ class InputEventManagerX11FocusRegressionTests(unittest.TestCase):
|
||||
mock.patch.object(input_event_manager.input_event, "KeyboardEvent", return_value=keyboardEvent),
|
||||
mock.patch.object(input_event_manager.AXUtilities, "can_be_active_window", return_value=False),
|
||||
mock.patch.object(input_event_manager.AXUtilities, "find_active_window", return_value=None),
|
||||
mock.patch.object(manager, "_get_top_level_window", return_value=None),
|
||||
mock.patch.object(manager, "_find_active_x11_atspi_window", return_value=None),
|
||||
mock.patch.object(manager, "_get_top_level_window", return_value=staleWindow),
|
||||
mock.patch.object(manager, "_should_pass_through_for_active_xterm", return_value=False),
|
||||
mock.patch.object(manager, "_active_x11_window_differs_from", return_value=True),
|
||||
mock.patch.object(manager, "last_event_was_keyboard", return_value=False),
|
||||
@@ -53,12 +56,18 @@ class InputEventManagerX11FocusRegressionTests(unittest.TestCase):
|
||||
"Return",
|
||||
)
|
||||
|
||||
self.assertTrue(result)
|
||||
scriptManager.set_active_script.assert_not_called()
|
||||
focusManager.clear_state.assert_not_called()
|
||||
keyboardEvent.process.assert_called_once_with()
|
||||
self.assertFalse(result)
|
||||
scriptManager.set_active_script.assert_called_once_with(
|
||||
None,
|
||||
"active X11 window not found in AT-SPI",
|
||||
)
|
||||
focusManager.clear_state.assert_called_once_with("active X11 window not found in AT-SPI")
|
||||
focusManager.set_active_window.assert_not_called()
|
||||
keyboardEvent.process.assert_not_called()
|
||||
self.assertIsNone(manager._last_input_event)
|
||||
self.assertIsNone(manager._last_non_modifier_key_event)
|
||||
|
||||
def test_keyboard_event_uses_active_script_app_when_cached_window_is_missing(self):
|
||||
def test_keyboard_event_clears_stale_script_when_cached_context_is_missing(self):
|
||||
manager = input_event_manager.InputEventManager()
|
||||
staleApp = object()
|
||||
staleScript = mock.Mock(app=staleApp)
|
||||
@@ -77,6 +86,7 @@ class InputEventManagerX11FocusRegressionTests(unittest.TestCase):
|
||||
mock.patch.object(input_event_manager.input_event, "KeyboardEvent", return_value=keyboardEvent),
|
||||
mock.patch.object(input_event_manager.AXUtilities, "can_be_active_window", return_value=False),
|
||||
mock.patch.object(input_event_manager.AXUtilities, "find_active_window", return_value=None),
|
||||
mock.patch.object(manager, "_find_active_x11_atspi_window", return_value=None),
|
||||
mock.patch.object(manager, "_get_top_level_window", return_value=None),
|
||||
mock.patch.object(manager, "_should_pass_through_for_active_xterm", return_value=False),
|
||||
mock.patch.object(manager, "_active_x11_window_differs_from", return_value=True) as differs,
|
||||
@@ -92,8 +102,51 @@ class InputEventManagerX11FocusRegressionTests(unittest.TestCase):
|
||||
"KP_Page_Up",
|
||||
)
|
||||
|
||||
self.assertTrue(result)
|
||||
self.assertFalse(result)
|
||||
differs.assert_called_once_with(staleApp)
|
||||
scriptManager.set_active_script.assert_called_once_with(
|
||||
None,
|
||||
"active X11 window not found in AT-SPI",
|
||||
)
|
||||
focusManager.clear_state.assert_called_once_with("active X11 window not found in AT-SPI")
|
||||
keyboardEvent.process.assert_not_called()
|
||||
|
||||
def test_keyboard_event_recovers_focus_window_when_x11_does_not_contradict_it(self):
|
||||
manager = input_event_manager.InputEventManager()
|
||||
cachedWindow = object()
|
||||
staleFocus = object()
|
||||
focusManager = mock.Mock()
|
||||
focusManager.get_active_window.return_value = cachedWindow
|
||||
focusManager.get_locus_of_focus.return_value = staleFocus
|
||||
scriptManager = mock.Mock()
|
||||
keyboardEvent = mock.Mock()
|
||||
keyboardEvent.is_modifier_key.return_value = False
|
||||
|
||||
with (
|
||||
mock.patch.object(input_event_manager.cthulhu_state, "capturingKeys", False),
|
||||
mock.patch.object(input_event_manager.focus_manager, "get_manager", return_value=focusManager),
|
||||
mock.patch.object(input_event_manager.script_manager, "get_manager", return_value=scriptManager),
|
||||
mock.patch.object(input_event_manager.input_event, "KeyboardEvent", return_value=keyboardEvent),
|
||||
mock.patch.object(input_event_manager.AXUtilities, "can_be_active_window", return_value=False),
|
||||
mock.patch.object(input_event_manager.AXUtilities, "find_active_window", return_value=None),
|
||||
mock.patch.object(manager, "_find_active_x11_atspi_window", return_value=None),
|
||||
mock.patch.object(manager, "_get_top_level_window", return_value=cachedWindow),
|
||||
mock.patch.object(manager, "_should_pass_through_for_active_xterm", return_value=False),
|
||||
mock.patch.object(manager, "_active_x11_window_differs_from", return_value=False),
|
||||
mock.patch.object(manager, "last_event_was_keyboard", return_value=False),
|
||||
mock.patch.object(input_event_manager.debug, "print_message"),
|
||||
):
|
||||
result = manager.process_keyboard_event(
|
||||
mock.Mock(),
|
||||
True,
|
||||
36,
|
||||
65293,
|
||||
0,
|
||||
"Return",
|
||||
)
|
||||
|
||||
self.assertTrue(result)
|
||||
focusManager.set_active_window.assert_called_once_with(cachedWindow)
|
||||
scriptManager.set_active_script.assert_not_called()
|
||||
focusManager.clear_state.assert_not_called()
|
||||
keyboardEvent.process.assert_called_once_with()
|
||||
@@ -226,6 +279,52 @@ class InputEventManagerX11FocusRegressionTests(unittest.TestCase):
|
||||
activeScript.addKeyGrabs.assert_not_called()
|
||||
keyboardEvent.process.assert_not_called()
|
||||
|
||||
def test_xterm_pass_through_ignores_stale_pending_self_hosted_focus(self):
|
||||
manager = input_event_manager.InputEventManager()
|
||||
pendingFocus = object()
|
||||
focusManager = mock.Mock()
|
||||
focusManager.get_active_window.return_value = None
|
||||
focusManager.get_locus_of_focus.return_value = None
|
||||
scriptManager = mock.Mock()
|
||||
activeScript = mock.Mock(app=None)
|
||||
scriptManager.get_active_script.return_value = activeScript
|
||||
keyboardEvent = mock.Mock()
|
||||
|
||||
with (
|
||||
mock.patch.object(input_event_manager.cthulhu_state, "capturingKeys", False),
|
||||
mock.patch.object(input_event_manager.cthulhu_state, "pendingSelfHostedFocus", pendingFocus),
|
||||
mock.patch.object(input_event_manager.focus_manager, "get_manager", return_value=focusManager),
|
||||
mock.patch.object(input_event_manager.script_manager, "get_manager", return_value=scriptManager),
|
||||
mock.patch.object(input_event_manager.input_event, "KeyboardEvent", return_value=keyboardEvent),
|
||||
mock.patch.object(manager, "_active_x11_window_xterm_match", return_value=True),
|
||||
mock.patch.object(input_event_manager.debug, "print_message"),
|
||||
mock.patch.object(input_event_manager.debug, "print_tokens"),
|
||||
):
|
||||
result = manager.process_keyboard_event(
|
||||
mock.Mock(),
|
||||
True,
|
||||
90,
|
||||
65438,
|
||||
0,
|
||||
"KP_Insert",
|
||||
)
|
||||
|
||||
self.assertFalse(result)
|
||||
activeScript.removeKeyGrabs.assert_called_once_with()
|
||||
keyboardEvent.process.assert_not_called()
|
||||
|
||||
def test_pending_self_hosted_focus_blocks_unknown_xterm_match(self):
|
||||
manager = input_event_manager.InputEventManager()
|
||||
|
||||
with (
|
||||
mock.patch.object(manager, "_active_x11_window_xterm_match", return_value=None),
|
||||
mock.patch.object(input_event_manager.debug, "print_message"),
|
||||
mock.patch.object(input_event_manager.debug, "print_tokens"),
|
||||
):
|
||||
result = manager._should_pass_through_for_active_xterm(None, None, object())
|
||||
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_xterm_grabs_restore_when_active_window_is_positively_not_xterm(self):
|
||||
manager = input_event_manager.InputEventManager()
|
||||
focusManager = mock.Mock()
|
||||
@@ -264,6 +363,24 @@ class InputEventManagerX11FocusRegressionTests(unittest.TestCase):
|
||||
activeScript.addKeyGrabs.assert_called_once_with()
|
||||
keyboardEvent.process.assert_called_once_with()
|
||||
|
||||
def test_unidentified_active_x11_window_is_unknown_not_non_xterm(self):
|
||||
manager = input_event_manager.InputEventManager()
|
||||
window = mock.Mock()
|
||||
window.get_class_group_name.return_value = None
|
||||
window.get_class_instance_name.return_value = None
|
||||
window.get_name.return_value = None
|
||||
window.get_class_group.return_value = None
|
||||
window.get_pid.return_value = -1
|
||||
|
||||
with (
|
||||
mock.patch.object(manager, "_get_active_x11_window", return_value=window),
|
||||
mock.patch.object(input_event_manager.debug, "print_message"),
|
||||
mock.patch.object(input_event_manager.debug, "print_tokens"),
|
||||
):
|
||||
result = manager._active_x11_window_xterm_match()
|
||||
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_finds_focused_atspi_window_for_active_x11_pid(self):
|
||||
manager = input_event_manager.InputEventManager()
|
||||
app = object()
|
||||
|
||||
@@ -0,0 +1,30 @@
|
||||
import sys
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
|
||||
|
||||
from cthulhu import piperfactory
|
||||
|
||||
|
||||
class PiperFactoryRateMappingTests(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.server = piperfactory.SpeechServer.__new__(piperfactory.SpeechServer)
|
||||
|
||||
def test_default_rate_maps_to_native_piper_speed(self):
|
||||
self.assertEqual(1.0, self.server._mapRate(50))
|
||||
|
||||
def test_rate_scale_uses_full_cthulhu_range(self):
|
||||
self.assertEqual(2.0, self.server._mapRate(0))
|
||||
self.assertEqual(0.25, self.server._mapRate(100))
|
||||
|
||||
def test_high_screen_reader_rate_is_substantially_faster(self):
|
||||
self.assertAlmostEqual(0.415, self.server._mapRate(89), places=3)
|
||||
|
||||
def test_rate_values_are_clamped(self):
|
||||
self.assertEqual(2.0, self.server._mapRate(-1))
|
||||
self.assertEqual(0.25, self.server._mapRate(101))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -137,6 +137,21 @@ class PluginSystemManagerRegressionTests(unittest.TestCase):
|
||||
self.assertEqual(instance.deactivation_count, 1)
|
||||
self.assertEqual(plugin_info.instance, None)
|
||||
|
||||
@mock.patch("cthulhu.plugin_system_manager.dbus_service.get_remote_controller")
|
||||
def test_obsolete_plugin_manager_active_entry_is_dropped(self, remote_controller):
|
||||
remote_controller.return_value = mock.Mock()
|
||||
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
manager = self._create_manager()
|
||||
plugin_info = self._create_plugin_info(temp_dir, "OCR")
|
||||
manager._plugins["OCR"] = plugin_info
|
||||
manager._plugin_name_index["OCR"] = ["OCR"]
|
||||
|
||||
with mock.patch.object(manager, "syncAllPluginsActive"):
|
||||
manager.setActivePlugins(["PluginManager", "OCR"])
|
||||
|
||||
self.assertEqual(manager.getActivePlugins(), ["OCR"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
import os
|
||||
import unittest
|
||||
from unittest import mock
|
||||
|
||||
from cthulhu.plugins.self_voice import plugin
|
||||
|
||||
|
||||
class SelfVoicePluginRegressionTests(unittest.TestCase):
|
||||
def test_socket_file_is_scoped_to_user_runtime_directory(self) -> None:
|
||||
with mock.patch.dict(os.environ, {"XDG_RUNTIME_DIR": "/run/user/1000"}, clear=True):
|
||||
self.assertEqual(plugin._get_socket_file(), "/run/user/1000/cthulhu.sock")
|
||||
|
||||
def test_socket_file_is_unavailable_without_user_runtime_directory(self) -> None:
|
||||
with mock.patch.dict(os.environ, {}, clear=True):
|
||||
self.assertIsNone(plugin._get_socket_file())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -96,6 +96,45 @@ class SteamReturnActivationTests(unittest.TestCase):
|
||||
|
||||
|
||||
class SteamVirtualizedListMutationTests(unittest.TestCase):
|
||||
def test_caret_moved_ignores_transient_loading_spinner(self):
|
||||
testScript = steam_script.Script.__new__(steam_script.Script)
|
||||
source = object()
|
||||
spinner = object()
|
||||
event = mock.Mock(type="object:text-caret-moved", source=source)
|
||||
|
||||
testScript.utilities = mock.Mock()
|
||||
testScript.utilities.inDocumentContent.return_value = True
|
||||
|
||||
with (
|
||||
mock.patch.object(steam_script.AXObject, "get_name", side_effect=["", "Steam Spinner"]),
|
||||
mock.patch.object(steam_script.AXObject, "iter_children", return_value=iter([spinner])),
|
||||
mock.patch.object(steam_script.AXUtilities, "is_image_or_canvas", return_value=True),
|
||||
mock.patch.object(steam_script.debug, "printMessage"),
|
||||
mock.patch.object(steam_script.Chromium.Script, "onCaretMoved") as chromiumCaretMoved,
|
||||
):
|
||||
self.assertTrue(testScript.onCaretMoved(event))
|
||||
|
||||
chromiumCaretMoved.assert_not_called()
|
||||
|
||||
def test_caret_moved_defers_to_chromium_without_loading_spinner(self):
|
||||
testScript = steam_script.Script.__new__(steam_script.Script)
|
||||
source = object()
|
||||
child = object()
|
||||
event = mock.Mock(type="object:text-caret-moved", source=source)
|
||||
|
||||
testScript.utilities = mock.Mock()
|
||||
testScript.utilities.inDocumentContent.return_value = True
|
||||
|
||||
with (
|
||||
mock.patch.object(steam_script.AXObject, "get_name", side_effect=["", "Not A Spinner"]),
|
||||
mock.patch.object(steam_script.AXObject, "iter_children", return_value=iter([child])),
|
||||
mock.patch.object(steam_script.AXUtilities, "is_image_or_canvas", return_value=True),
|
||||
mock.patch.object(steam_script.Chromium.Script, "onCaretMoved", return_value="handled") as chromiumCaretMoved,
|
||||
):
|
||||
self.assertEqual(testScript.onCaretMoved(event), "handled")
|
||||
|
||||
chromiumCaretMoved.assert_called_once_with(event)
|
||||
|
||||
def test_children_added_skips_generic_web_cache_dump_for_virtualized_list_churn(self):
|
||||
testScript = steam_script.Script.__new__(steam_script.Script)
|
||||
source = object()
|
||||
|
||||
@@ -39,7 +39,7 @@ color-calculation-max = 3
|
||||
copy-to-clipboard = false
|
||||
|
||||
[profiles.default.plugins]
|
||||
active-plugins = ["PluginManager", "OCR"]
|
||||
active-plugins = ["Clipboard", "OCR"]
|
||||
plugin-sources = []
|
||||
"""
|
||||
|
||||
@@ -64,7 +64,7 @@ class LegacyTomlSchemaMigrationTests(unittest.TestCase):
|
||||
settings.GENERAL_KEYBOARD_LAYOUT_DESKTOP,
|
||||
)
|
||||
self.assertEqual(general["cthulhuModifierKeys"], settings.DESKTOP_MODIFIER_KEYS)
|
||||
self.assertEqual(general["activePlugins"], ["PluginManager", "OCR"])
|
||||
self.assertEqual(general["activePlugins"], ["Clipboard", "OCR"])
|
||||
self.assertEqual(general["aiProvider"], settings.AI_PROVIDER_OLLAMA)
|
||||
self.assertFalse(general["aiAssistantEnabled"])
|
||||
self.assertEqual(general["ocrLanguageCode"], "eng")
|
||||
@@ -82,7 +82,7 @@ class LegacyTomlSchemaMigrationTests(unittest.TestCase):
|
||||
savedSettings = settingsPath.read_text(encoding="utf-8")
|
||||
|
||||
self.assertIn('profile = ["Default", "default"]', savedSettings)
|
||||
self.assertIn('activePlugins = ["PluginManager", "OCR"]', savedSettings)
|
||||
self.assertIn('activePlugins = ["Clipboard", "OCR"]', savedSettings)
|
||||
self.assertNotIn("format-version = 2", savedSettings)
|
||||
self.assertNotIn("[profiles.default.metadata]", savedSettings)
|
||||
|
||||
|
||||
@@ -34,14 +34,33 @@ class WebKeyGrabRegressionTests(unittest.TestCase):
|
||||
testScript._lastMouseButtonContext = ("old", 7)
|
||||
testScript._madeFindAnnouncement = True
|
||||
testScript._inFocusMode = True
|
||||
testScript._focusModeIsSticky = False
|
||||
testScript._browseModeIsSticky = False
|
||||
testScript._navSuspended = False
|
||||
testScript.removeKeyGrabs = mock.Mock()
|
||||
testScript.refreshKeyGrabs = mock.Mock()
|
||||
testScript._setNavigationSuspended = mock.Mock()
|
||||
testScript.presentMessage = mock.Mock()
|
||||
testScript.utilities = mock.Mock()
|
||||
testScript.utilities.isZombie.return_value = False
|
||||
testScript.utilities.isDocument.return_value = False
|
||||
testScript.utilities.getTopLevelDocumentForObject.return_value = None
|
||||
testScript.utilities.inFindContainer.return_value = False
|
||||
testScript.utilities.getCaretContext.return_value = (None, -1)
|
||||
testScript.utilities.queryNonEmptyText.return_value = None
|
||||
testScript.utilities.isContentEditableWithEmbeddedObjects.return_value = False
|
||||
testScript.utilities.isAnchor.return_value = False
|
||||
testScript.utilities.lastInputEventWasPageNav.return_value = False
|
||||
testScript.utilities.isFocusedWithMathChild.return_value = False
|
||||
testScript.utilities.caretMovedToSamePageFragment.return_value = False
|
||||
testScript.utilities.lastInputEventWasLineNav.return_value = False
|
||||
testScript.utilities.shouldInterruptForLocusOfFocusChange.return_value = False
|
||||
testScript.flatReviewPresenter = mock.Mock()
|
||||
testScript.flatReviewPresenter.is_active.return_value = False
|
||||
testScript.updateBraille = mock.Mock()
|
||||
testScript.speechGenerator = mock.Mock()
|
||||
testScript.speechGenerator.generateSpeech.return_value = []
|
||||
testScript._saveFocusedObjectInfo = mock.Mock()
|
||||
return testScript
|
||||
|
||||
def test_window_deactivate_does_not_drop_key_grabs(self):
|
||||
@@ -99,6 +118,44 @@ class WebKeyGrabRegressionTests(unittest.TestCase):
|
||||
self.assertFalse(result)
|
||||
testScript.refreshKeyGrabs.assert_called_once_with()
|
||||
|
||||
def test_non_document_focus_preserves_sticky_focus_mode(self):
|
||||
testScript = self._make_partial_script()
|
||||
testScript._focusModeIsSticky = True
|
||||
oldFocus = object()
|
||||
newFocus = object()
|
||||
|
||||
with mock.patch("cthulhu.scripts.web.script.AXObject.is_dead", return_value=False):
|
||||
result = web_script.Script.locus_of_focus_changed(testScript, None, oldFocus, newFocus)
|
||||
|
||||
self.assertFalse(result)
|
||||
self.assertTrue(testScript._inFocusMode)
|
||||
|
||||
def test_document_focus_restores_sticky_focus_after_suspension(self):
|
||||
testScript = self._make_partial_script()
|
||||
testScript._inFocusMode = False
|
||||
testScript._focusModeIsSticky = True
|
||||
testScript._navSuspended = True
|
||||
oldFocus = object()
|
||||
newFocus = object()
|
||||
document = object()
|
||||
testScript.utilities.getTopLevelDocumentForObject.side_effect = (
|
||||
lambda obj: document if obj is newFocus else None
|
||||
)
|
||||
|
||||
with (
|
||||
mock.patch("cthulhu.scripts.web.script.AXObject.is_dead", return_value=False),
|
||||
mock.patch("cthulhu.scripts.web.script.AXUtilities.is_unknown_or_redundant", return_value=False),
|
||||
mock.patch("cthulhu.scripts.web.script.AXUtilities.is_heading", return_value=False),
|
||||
mock.patch("cthulhu.scripts.web.script.speech.speak"),
|
||||
mock.patch("cthulhu.scripts.web.script.cthulhu.emitRegionChanged"),
|
||||
):
|
||||
result = web_script.Script.locus_of_focus_changed(testScript, None, oldFocus, newFocus)
|
||||
|
||||
self.assertTrue(result)
|
||||
self.assertTrue(testScript._inFocusMode)
|
||||
testScript.presentMessage.assert_called_with(messages.MODE_FOCUS_IS_STICKY)
|
||||
testScript.refreshKeyGrabs.assert_called_once_with()
|
||||
|
||||
|
||||
class WebActiveWindowRegressionTests(unittest.TestCase):
|
||||
def test_sanity_check_recovers_missing_script_app_from_active_window(self):
|
||||
@@ -395,6 +452,23 @@ class WebDynamicContentRecoveryRegressionTests(unittest.TestCase):
|
||||
setLocusOfFocus.assert_called_once_with(event, source, False)
|
||||
testScript.utilities.setCaretContext.assert_called_once_with(source, 0)
|
||||
|
||||
def test_browse_mode_sticky_blocks_web_app_descendant_focus_claim(self):
|
||||
testScript = self._make_dynamic_script()
|
||||
source = object()
|
||||
event = mock.Mock(detail1=1, source=source)
|
||||
testScript._browseModeIsSticky = True
|
||||
testScript.utilities.getDocumentForObject.return_value = "document"
|
||||
testScript.utilities.isWebAppDescendant.return_value = True
|
||||
|
||||
with (
|
||||
mock.patch.object(web_script.cthulhu_state, "locusOfFocus", object()),
|
||||
mock.patch.object(web_script.cthulhu, "setLocusOfFocus") as setLocusOfFocus,
|
||||
):
|
||||
result = web_script.Script.onFocusedChanged(testScript, event)
|
||||
|
||||
self.assertTrue(result)
|
||||
setLocusOfFocus.assert_not_called()
|
||||
|
||||
|
||||
class WebContextReplicantRegressionTests(unittest.TestCase):
|
||||
def test_same_object_replicant_can_recover_before_old_focus_is_dead(self):
|
||||
|
||||
@@ -0,0 +1,97 @@
|
||||
import sys
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from unittest import mock
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
|
||||
|
||||
from cthulhu import where_am_i_presenter
|
||||
from cthulhu.plugins.WindowTitleReader.plugin import WindowTitleReader
|
||||
|
||||
|
||||
class WindowTitleFallbackRegressionTests(unittest.TestCase):
|
||||
def test_activate_registers_reader_api_and_starts_fallback_tracking(self):
|
||||
plugin = WindowTitleReader()
|
||||
plugin.app = mock.Mock()
|
||||
|
||||
with (
|
||||
mock.patch.object(plugin, "_register_keybinding"),
|
||||
mock.patch.object(plugin, "_start_tracking") as startTracking,
|
||||
):
|
||||
plugin.activate(plugin)
|
||||
|
||||
plugin.app.getDynamicApiManager.return_value.registerAPI.assert_called_once_with(
|
||||
"WindowTitleReader",
|
||||
plugin,
|
||||
overwrite=True,
|
||||
)
|
||||
startTracking.assert_called_once_with()
|
||||
|
||||
def test_poll_schedules_fallback_after_active_window_changes(self):
|
||||
plugin = WindowTitleReader()
|
||||
plugin._pollSourceId = 1
|
||||
plugin._lastActiveWindowId = 100
|
||||
activeWindow = mock.Mock(id=200)
|
||||
|
||||
with (
|
||||
mock.patch.object(plugin, "_get_active_window", return_value=activeWindow),
|
||||
mock.patch.object(plugin, "_get_current_title", return_value="XTerm"),
|
||||
mock.patch.object(plugin, "_schedule_fallback_title") as scheduleFallback,
|
||||
):
|
||||
self.assertTrue(plugin._poll_window_title())
|
||||
|
||||
scheduleFallback.assert_called_once_with()
|
||||
|
||||
def test_poll_keeps_previous_window_across_transient_missing_active_window(self):
|
||||
plugin = WindowTitleReader()
|
||||
plugin._pollSourceId = 1
|
||||
plugin._lastActiveWindowId = 100
|
||||
|
||||
with mock.patch.object(plugin, "_get_active_window", return_value=None):
|
||||
self.assertTrue(plugin._poll_window_title())
|
||||
|
||||
self.assertEqual(plugin._lastActiveWindowId, 100)
|
||||
|
||||
def test_fallback_is_empty_when_atspi_exposes_same_title(self):
|
||||
plugin = WindowTitleReader()
|
||||
plugin._pollSourceId = 1
|
||||
activeWindow = mock.Mock()
|
||||
|
||||
with (
|
||||
mock.patch.object(plugin, "_get_active_window", return_value=activeWindow),
|
||||
mock.patch.object(plugin, "_get_current_title", return_value="Terminal"),
|
||||
):
|
||||
self.assertEqual(plugin.get_fallback_title("Terminal"), "")
|
||||
|
||||
def test_fallback_replaces_wine_desktop_title(self):
|
||||
plugin = WindowTitleReader()
|
||||
plugin._pollSourceId = 1
|
||||
activeWindow = mock.Mock()
|
||||
|
||||
with (
|
||||
mock.patch.object(plugin, "_get_active_window", return_value=activeWindow),
|
||||
mock.patch.object(plugin, "_get_current_title", return_value="Game Window"),
|
||||
):
|
||||
self.assertEqual(plugin.get_fallback_title("Wine Desktop"), "Game Window")
|
||||
|
||||
def test_present_title_uses_fallback_instead_of_atspi_title(self):
|
||||
presenter = where_am_i_presenter.WhereAmIPresenter()
|
||||
script = mock.Mock()
|
||||
script.speechGenerator.generateTitle.return_value = [("Wine Desktop", None)]
|
||||
|
||||
with (
|
||||
mock.patch.object(where_am_i_presenter.cthulhu_state, "locusOfFocus", object()),
|
||||
mock.patch.object(where_am_i_presenter.AXObject, "is_dead", return_value=False),
|
||||
mock.patch.object(
|
||||
presenter,
|
||||
"_get_fallback_title",
|
||||
return_value="Game Window",
|
||||
),
|
||||
):
|
||||
self.assertTrue(presenter.present_title(script))
|
||||
|
||||
script.presentMessage.assert_called_once_with("Game Window")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user