From b224d699c0de1ba7d90adc9e2d20cf88f5932e15 Mon Sep 17 00:00:00 2001 From: Storm Dragon Date: Fri, 15 May 2026 01:54:17 -0400 Subject: [PATCH 1/8] Fixed status announcements interrupting page information for the web navigation. --- src/cthulhu/scripts/web/script.py | 5 +- src/cthulhu/structural_navigation.py | 5 +- ...structural_navigation_table_regressions.py | 40 ++++++++++++ tests/test_web_input_regressions.py | 63 +++++++++++++++++++ 4 files changed, 109 insertions(+), 4 deletions(-) diff --git a/src/cthulhu/scripts/web/script.py b/src/cthulhu/scripts/web/script.py index db71882..ccc5071 100644 --- a/src/cthulhu/scripts/web/script.py +++ b/src/cthulhu/scripts/web/script.py @@ -1504,6 +1504,7 @@ class Script(default.Script): def togglePresentationMode(self, inputEvent, documentFrame=None): [obj, characterOffset] = self.utilities.getCaretContext(documentFrame) + interrupt = inputEvent is not None if self._inFocusMode: parent = AXObject.get_parent(obj) if AXUtilities.is_list_box(parent): @@ -1511,7 +1512,7 @@ class Script(default.Script): elif AXUtilities.is_menu(parent): self.utilities.setCaretContext(AXObject.get_parent(parent), -1) if not self._loadingDocumentContent: - self.presentMessage(messages.MODE_BROWSE) + self.presentMessage(messages.MODE_BROWSE, interrupt=interrupt) if not self._shouldSuppressBrowseModeSound(obj, inputEvent): sound_theme_manager.getManager().playBrowseModeSound() else: @@ -1521,7 +1522,7 @@ class Script(default.Script): or inputEvent): self.utilities.grabFocus(obj) - self.presentMessage(messages.MODE_FOCUS) + self.presentMessage(messages.MODE_FOCUS, interrupt=interrupt) sound_theme_manager.getManager().playFocusModeSound() self._inFocusMode = not self._inFocusMode self._focusModeIsSticky = False diff --git a/src/cthulhu/structural_navigation.py b/src/cthulhu/structural_navigation.py index 602d095..67ce85b 100644 --- a/src/cthulhu/structural_navigation.py +++ b/src/cthulhu/structural_navigation.py @@ -935,9 +935,9 @@ class StructuralNavigation: return if not isNext: - self._script.presentMessage(messages.WRAPPING_TO_BOTTOM) + wrapMessage = messages.WRAPPING_TO_BOTTOM else: - self._script.presentMessage(messages.WRAPPING_TO_TOP) + wrapMessage = messages.WRAPPING_TO_TOP matches = self._getAll(structuralNavigationObject, arg) if not isNext: @@ -946,6 +946,7 @@ class StructuralNavigation: for match in matches: if _isValidMatch(match): structuralNavigationObject.present(match, arg) + self._script.presentMessage(wrapMessage, interrupt=False) return structuralNavigationObject.present(None, arg) diff --git a/tests/test_structural_navigation_table_regressions.py b/tests/test_structural_navigation_table_regressions.py index 7a3120f..2487378 100644 --- a/tests/test_structural_navigation_table_regressions.py +++ b/tests/test_structural_navigation_table_regressions.py @@ -42,6 +42,46 @@ class StructuralNavigationTableRegressionTests(unittest.TestCase): interrupt=False, ) + def test_wrapping_announcement_does_not_interrupt_wrapped_object(self): + navigator = structural_navigation.StructuralNavigation.__new__( + structural_navigation.StructuralNavigation + ) + script = mock.Mock() + script.utilities.isZombie.return_value = False + script.utilities.isHidden.return_value = False + script.utilities.isEmpty.return_value = False + script.utilities.pathComparison.return_value = 0 + navigator._script = script + + first = object() + current = object() + structuralNavigationObject = mock.Mock() + structuralNavigationObject.predicate = None + events = [] + structuralNavigationObject.present.side_effect = lambda obj, arg=None: events.append( + ("present", obj, arg) + ) + script.presentMessage.side_effect = lambda message, **kwargs: events.append( + ("message", message, kwargs) + ) + navigator._getAll = mock.Mock(return_value=[first, current]) + + with ( + mock.patch.object(structural_navigation.settings, "wrappedStructuralNavigation", True), + mock.patch.object(structural_navigation.AXObject, "is_dead", return_value=False), + mock.patch.object(structural_navigation.AXObject, "get_parent", return_value=None), + mock.patch.object(structural_navigation.AXObject, "get_path", side_effect=lambda obj: [id(obj)]), + ): + navigator.goObject(structuralNavigationObject, True, current, "arg") + + self.assertEqual( + events, + [ + ("present", first, "arg"), + ("message", messages.WRAPPING_TO_TOP, {"interrupt": False}), + ], + ) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_web_input_regressions.py b/tests/test_web_input_regressions.py index b17933b..8add719 100644 --- a/tests/test_web_input_regressions.py +++ b/tests/test_web_input_regressions.py @@ -100,6 +100,69 @@ class WebKeyGrabRegressionTests(unittest.TestCase): testScript.refreshKeyGrabs.assert_called_once_with() +class WebPresentationModeSpeechRegressionTests(unittest.TestCase): + def _make_script(self, inFocusMode): + testScript = web_script.Script.__new__(web_script.Script) + testScript._inFocusMode = inFocusMode + testScript._focusModeIsSticky = True + testScript._browseModeIsSticky = True + testScript._loadingDocumentContent = False + testScript._lastCommandWasCaretNav = False + testScript._lastCommandWasStructNav = False + testScript.utilities = mock.Mock() + testScript.utilities.getCaretContext.return_value = ("button", 0) + testScript.utilities.grabFocusWhenSettingCaret.return_value = True + testScript.presentMessage = mock.Mock() + testScript.refreshKeyGrabs = mock.Mock() + testScript._shouldSuppressBrowseModeSound = mock.Mock(return_value=False) + return testScript + + def test_automatic_browse_mode_announcement_does_not_interrupt_focus_presentation(self): + testScript = self._make_script(inFocusMode=True) + soundManager = mock.Mock() + + with ( + mock.patch.object(web_script.AXObject, "get_parent", return_value=None), + mock.patch.object(web_script.AXUtilities, "is_list_box", return_value=False), + mock.patch.object(web_script.AXUtilities, "is_menu", return_value=False), + mock.patch.object(web_script.sound_theme_manager, "getManager", return_value=soundManager), + ): + web_script.Script.togglePresentationMode(testScript, None, "document") + + testScript.presentMessage.assert_called_once_with(messages.MODE_BROWSE, interrupt=False) + soundManager.playBrowseModeSound.assert_called_once_with() + self.assertFalse(testScript._inFocusMode) + self.assertFalse(testScript._focusModeIsSticky) + self.assertFalse(testScript._browseModeIsSticky) + testScript.refreshKeyGrabs.assert_called_once_with() + + def test_automatic_focus_mode_announcement_does_not_interrupt_focus_presentation(self): + testScript = self._make_script(inFocusMode=False) + soundManager = mock.Mock() + + with mock.patch.object( + web_script.sound_theme_manager, + "getManager", + return_value=soundManager, + ): + web_script.Script.togglePresentationMode(testScript, None, "document") + + testScript.presentMessage.assert_called_once_with(messages.MODE_FOCUS, interrupt=False) + soundManager.playFocusModeSound.assert_called_once_with() + self.assertTrue(testScript._inFocusMode) + self.assertFalse(testScript._focusModeIsSticky) + self.assertFalse(testScript._browseModeIsSticky) + testScript.refreshKeyGrabs.assert_called_once_with() + + def test_manual_presentation_mode_toggle_still_interrupts(self): + testScript = self._make_script(inFocusMode=False) + + with mock.patch.object(web_script.sound_theme_manager, "getManager", return_value=mock.Mock()): + web_script.Script.togglePresentationMode(testScript, object(), "document") + + testScript.presentMessage.assert_called_once_with(messages.MODE_FOCUS, interrupt=True) + + if __name__ == "__main__": unittest.main() From 0acba6a733eb168d20530a0121c559d484fe4487 Mon Sep 17 00:00:00 2001 From: Storm Dragon Date: Fri, 15 May 2026 15:48:52 -0400 Subject: [PATCH 2/8] Backported Chrome omnibox fix from Orca. --- src/cthulhu/ax_object.py | 32 +++++++++- src/cthulhu/focus_manager.py | 7 +++ tests/test_chromium_omnibox_regressions.py | 68 ++++++++++++++++++++++ 3 files changed, 105 insertions(+), 2 deletions(-) create mode 100644 tests/test_chromium_omnibox_regressions.py diff --git a/src/cthulhu/ax_object.py b/src/cthulhu/ax_object.py index 00137a6..3614941 100644 --- a/src/cthulhu/ax_object.py +++ b/src/cthulhu/ax_object.py @@ -148,14 +148,42 @@ class AXObject: if not toolkit_name.startswith("qt"): return False + if not AXObject._can_reach_application(obj): + tokens = ["AXObject:", obj, "has broken ancestry. See qt bug 130116."] + debug.print_tokens(debug.LEVEL_INFO, tokens, True) + return True + + return False + + @staticmethod + def _can_reach_application(obj: Atspi.Accessible) -> bool: + """Returns True if obj's ancestry reaches the application.""" + reached_app = False parent = AXObject.get_parent(obj) while parent and not reached_app: reached_app = AXObject.get_role(parent) == Atspi.Role.APPLICATION parent = AXObject.get_parent(parent) - if not reached_app: - tokens = ["AXObject:", obj, "has broken ancestry. See qt bug 130116."] + return reached_app + + @staticmethod + def has_broken_popup_ancestry(obj: Atspi.Accessible) -> bool: + """Returns True if obj is a popup item whose ancestry is broken.""" + + if obj is None or AXObject.is_dead(obj): + return False + + # Chromium Omnibox popups can lose their path back to the frame after + # the popup is closed and reopened. + if not AXObject.get_toolkit_name(obj).startswith("chromium"): + return False + + if AXObject.get_role(obj) != Atspi.Role.LIST_ITEM: + return False + + if not AXObject._can_reach_application(obj): + tokens = ["AXObject:", obj, "has broken popup ancestry."] debug.print_tokens(debug.LEVEL_INFO, tokens, True) return True diff --git a/src/cthulhu/focus_manager.py b/src/cthulhu/focus_manager.py index 3625db2..8391150 100644 --- a/src/cthulhu/focus_manager.py +++ b/src/cthulhu/focus_manager.py @@ -348,6 +348,13 @@ class FocusManager: tokens.extend(["in", app]) _log_tokens(tokens) + if frame is None and AXObject.has_broken_popup_ancestry(self._focus): + _log_tokens( + ["Not clearing active window; focus", self._focus, "is in popup with broken ancestry"], + "broken-popup-ancestry", + ) + return + if frame == self._window: _log("Setting active window to existing active window", "no-change") elif frame is None: diff --git a/tests/test_chromium_omnibox_regressions.py b/tests/test_chromium_omnibox_regressions.py new file mode 100644 index 0000000..81d38f7 --- /dev/null +++ b/tests/test_chromium_omnibox_regressions.py @@ -0,0 +1,68 @@ +import unittest +from unittest import mock + +import gi +gi.require_version("Atspi", "2.0") +from gi.repository import Atspi + +from cthulhu import ax_object +from cthulhu import cthulhu_state +from cthulhu import focus_manager + + +class ChromiumOmniboxRegressionTests(unittest.TestCase): + def tearDown(self): + cthulhu_state.activeWindow = None + cthulhu_state.locusOfFocus = None + + def test_detects_chromium_popup_item_with_broken_ancestry(self): + popupItem = object() + + with ( + mock.patch.object(ax_object.AXObject, "is_dead", return_value=False), + mock.patch.object(ax_object.AXObject, "get_toolkit_name", return_value="chromium"), + mock.patch.object(ax_object.AXObject, "get_role", return_value=Atspi.Role.LIST_ITEM), + mock.patch.object(ax_object.AXObject, "get_parent", return_value=None), + mock.patch.object(ax_object.debug, "print_tokens"), + ): + self.assertTrue(ax_object.AXObject.has_broken_popup_ancestry(popupItem)) + + def test_ignores_chromium_popup_item_with_valid_ancestry(self): + popupItem = object() + parent = object() + + def get_role(obj): + if obj is popupItem: + return Atspi.Role.LIST_ITEM + return Atspi.Role.APPLICATION + + with ( + mock.patch.object(ax_object.AXObject, "is_dead", return_value=False), + mock.patch.object(ax_object.AXObject, "get_toolkit_name", return_value="chromium"), + mock.patch.object(ax_object.AXObject, "get_role", side_effect=get_role), + mock.patch.object(ax_object.AXObject, "get_parent", side_effect=[parent, None]), + ): + self.assertFalse(ax_object.AXObject.has_broken_popup_ancestry(popupItem)) + + def test_preserves_active_window_when_chromium_popup_ancestry_is_broken(self): + activeWindow = object() + popupItem = object() + cthulhu_state.activeWindow = activeWindow + cthulhu_state.locusOfFocus = popupItem + + controller = mock.Mock() + app = mock.Mock() + manager = None + with ( + mock.patch.object(focus_manager.dbus_service, "get_remote_controller", return_value=controller), + mock.patch.object(focus_manager.AXObject, "has_broken_popup_ancestry", return_value=True), + ): + manager = focus_manager.FocusManager(app) + manager.set_active_window(None) + + self.assertIs(manager.get_active_window(), activeWindow) + self.assertIs(cthulhu_state.activeWindow, activeWindow) + + +if __name__ == "__main__": + unittest.main() From 4145e9375baf8da01f5b6408c8faf344c2391536 Mon Sep 17 00:00:00 2001 From: Michael Taboada Date: Fri, 15 May 2026 13:46:34 -0700 Subject: [PATCH 3/8] Fix some interruption issues in discord. --- src/cthulhu/scripts/web/script.py | 46 +++++++++++++- tests/test_web_input_regressions.py | 98 +++++++++++++++++++++++++++++ 2 files changed, 142 insertions(+), 2 deletions(-) diff --git a/src/cthulhu/scripts/web/script.py b/src/cthulhu/scripts/web/script.py index ccc5071..caf3e1e 100644 --- a/src/cthulhu/scripts/web/script.py +++ b/src/cthulhu/scripts/web/script.py @@ -1083,7 +1083,7 @@ class Script(default.Script): """Speaks the specified contents.""" utterances = self.speechGenerator.generateContents(contents, **args) - speech.speak(utterances) + speech.speak(utterances, interrupt=args.get("interrupt", True)) def sayCharacter(self, obj): """Speaks the character at the current caret position.""" @@ -1874,11 +1874,15 @@ class Script(default.Script): debug.printMessage(debug.LEVEL_INFO, msg, True) return True + if self.utilities.shouldInterruptForLocusOfFocusChange(oldFocus, newFocus, event): + self.presentationInterrupt() + + args["interrupt"] = False if contents: self.speakContents(contents, **args) else: utterances = self.speechGenerator.generateSpeech(newFocus, **args) - speech.speak(utterances) + speech.speak(utterances, interrupt=False) self._saveFocusedObjectInfo(newFocus) @@ -2650,6 +2654,44 @@ class Script(default.Script): self._lastCommandWasMouseButton = True return False + def onDescriptionChanged(self, event): + """Callback for object:property-change:accessible-description events.""" + + if self.utilities.eventIsBrowserUINoise(event): + msg = "WEB: Ignoring event believed to be browser UI noise" + debug.printMessage(debug.LEVEL_INFO, msg, True) + return True + + obj = event.source + if not self.utilities.inDocumentContent(obj): + return False + + descriptions = self.pointOfReference.get('descriptions', {}) + oldDescription = descriptions.get(hash(obj)) + if oldDescription == event.any_data: + tokens = ["WEB: Old description (", oldDescription, ") is the same as new one"] + debug.printTokens(debug.LEVEL_INFO, tokens, True) + return True + + descriptions[hash(obj)] = event.any_data + self.pointOfReference['descriptions'] = descriptions + if obj != cthulhu_state.locusOfFocus: + msg = "WEB: Description change is for object other than locusOfFocus" + debug.printMessage(debug.LEVEL_INFO, msg, True) + return True + + if not event.any_data: + return True + + name = AXObject.get_name(obj) + if self.utilities.stringsAreRedundant(name, event.any_data): + tokens = ["WEB: Description change is redundant with name for", obj] + debug.printTokens(debug.LEVEL_INFO, tokens, True) + return True + + self.presentMessage(event.any_data, interrupt=False) + return True + def onNameChanged(self, event): """Callback for object:property-change:accessible-name events.""" diff --git a/tests/test_web_input_regressions.py b/tests/test_web_input_regressions.py index 8add719..a20e9c6 100644 --- a/tests/test_web_input_regressions.py +++ b/tests/test_web_input_regressions.py @@ -163,6 +163,104 @@ class WebPresentationModeSpeechRegressionTests(unittest.TestCase): testScript.presentMessage.assert_called_once_with(messages.MODE_FOCUS, interrupt=True) +class WebFocusSpeechInterruptionRegressionTests(unittest.TestCase): + def _make_script(self): + testScript = web_script.Script.__new__(web_script.Script) + testScript._navSuspended = False + testScript._lastCommandWasCaretNav = False + testScript._lastCommandWasStructNav = False + testScript._lastCommandWasMouseButton = False + testScript._inFocusMode = True + testScript._focusModeIsSticky = True + testScript._browseModeIsSticky = False + testScript.flatReviewPresenter = mock.Mock() + testScript.flatReviewPresenter.is_active.return_value = False + testScript.utilities = mock.Mock() + testScript.utilities.isZombie.return_value = False + testScript.utilities.isDocument.return_value = False + testScript.utilities.getTopLevelDocumentForObject.return_value = "document" + testScript.utilities.inFindContainer.return_value = False + testScript.utilities.queryNonEmptyText.return_value = None + testScript.utilities.isContentEditableWithEmbeddedObjects.return_value = False + testScript.utilities.isAnchor.return_value = False + testScript.utilities.lastInputEventWasPageNav.return_value = False + testScript.utilities.isFocusedWithMathChild.return_value = False + testScript.utilities.caretMovedToSamePageFragment.return_value = False + testScript.utilities.lastInputEventWasLineNav.return_value = False + testScript.utilities.inDocumentContent.return_value = True + testScript.utilities.shouldInterruptForLocusOfFocusChange.return_value = True + testScript.speechGenerator = mock.Mock() + testScript.speechGenerator.generateSpeech.return_value = ["focus speech"] + testScript.updateBraille = mock.Mock() + testScript.presentationInterrupt = mock.Mock() + testScript._saveFocusedObjectInfo = mock.Mock() + testScript.refreshKeyGrabs = mock.Mock() + return testScript + + def test_focus_generated_speech_uses_explicit_interrupt_then_appends(self): + testScript = self._make_script() + oldFocus = object() + newFocus = object() + event = mock.Mock(type="object:state-changed:focused", source=newFocus) + + with ( + mock.patch.object(web_script.AXObject, "is_dead", return_value=False), + mock.patch.object(web_script.AXUtilities, "is_unknown_or_redundant", return_value=False), + mock.patch.object(web_script.AXUtilities, "is_heading", return_value=False), + mock.patch.object(web_script.speech, "speak") as speak, + mock.patch.object(web_script.cthulhu, "emitRegionChanged"), + ): + result = web_script.Script.locus_of_focus_changed( + testScript, event, oldFocus, newFocus + ) + + self.assertTrue(result) + testScript.presentationInterrupt.assert_called_once_with() + speak.assert_called_once_with(["focus speech"], interrupt=False) + + +class WebDescriptionChangeRegressionTests(unittest.TestCase): + def _make_script(self): + testScript = web_script.Script.__new__(web_script.Script) + testScript.pointOfReference = {} + testScript.utilities = mock.Mock() + testScript.utilities.eventIsBrowserUINoise.return_value = False + testScript.utilities.inDocumentContent.return_value = True + testScript.utilities.stringsAreRedundant.side_effect = ( + lambda first, second: first == second + ) + testScript.presentMessage = mock.Mock() + return testScript + + def test_document_description_change_matching_name_is_ignored(self): + testScript = self._make_script() + source = object() + event = mock.Mock(source=source, any_data="More") + + with ( + mock.patch.object(web_script.cthulhu_state, "locusOfFocus", source), + mock.patch.object(web_script.AXObject, "get_name", return_value="More"), + ): + result = web_script.Script.onDescriptionChanged(testScript, event) + + self.assertTrue(result) + testScript.presentMessage.assert_not_called() + + def test_document_description_change_appends_to_focus_presentation(self): + testScript = self._make_script() + source = object() + event = mock.Mock(source=source, any_data="Helpful tooltip") + + with ( + mock.patch.object(web_script.cthulhu_state, "locusOfFocus", source), + mock.patch.object(web_script.AXObject, "get_name", return_value="Button"), + ): + result = web_script.Script.onDescriptionChanged(testScript, event) + + self.assertTrue(result) + testScript.presentMessage.assert_called_once_with("Helpful tooltip", interrupt=False) + + if __name__ == "__main__": unittest.main() From a84aec94a9f3b5e5dde65256decc9fd87c50e4ea Mon Sep 17 00:00:00 2001 From: Storm Dragon Date: Sat, 16 May 2026 23:29:51 -0400 Subject: [PATCH 4/8] Preserve web active window during transient Brave focus checks --- src/cthulhu/scripts/web/script_utilities.py | 16 +++++++ tests/test_web_input_regressions.py | 53 +++++++++++++++++++++ 2 files changed, 69 insertions(+) diff --git a/src/cthulhu/scripts/web/script_utilities.py b/src/cthulhu/scripts/web/script_utilities.py index 749ae68..f9949df 100644 --- a/src/cthulhu/scripts/web/script_utilities.py +++ b/src/cthulhu/scripts/web/script_utilities.py @@ -276,6 +276,14 @@ class Utilities(script_utilities.Utilities): def sanityCheckActiveWindow(self): app = self._script.app + if app is None: + app = AXObject.get_application(cthulhu_state.activeWindow) \ + or AXObject.get_application(cthulhu_state.locusOfFocus) + if app is not None: + tokens = ["WEB: recovered script app for active window check:", app] + debug.printTokens(debug.LEVEL_INFO, tokens, True) + self._script.app = app + if AXObject.get_parent(cthulhu_state.activeWindow) == app: return True @@ -299,6 +307,14 @@ class Utilities(script_utilities.Utilities): setattr(self._script, attr, value) window = self.activeWindow(app) + if window is None: + tokens = [ + "WARNING: WEB could not confirm a replacement active window; preserving", + cthulhu_state.activeWindow, + ] + debug.printTokens(debug.LEVEL_INFO, tokens, True) + return cthulhu_state.activeWindow is not None + self._script.app = AXObject.get_application(window) tokens = ["WEB: updating script's app to", self._script.app] debug.printTokens(debug.LEVEL_INFO, tokens, True) diff --git a/tests/test_web_input_regressions.py b/tests/test_web_input_regressions.py index a20e9c6..287f0cc 100644 --- a/tests/test_web_input_regressions.py +++ b/tests/test_web_input_regressions.py @@ -100,6 +100,59 @@ class WebKeyGrabRegressionTests(unittest.TestCase): testScript.refreshKeyGrabs.assert_called_once_with() +class WebActiveWindowRegressionTests(unittest.TestCase): + def test_sanity_check_recovers_missing_script_app_from_active_window(self): + testScript = mock.Mock(app=None) + utilities = web_script_utilities.Utilities.__new__(web_script_utilities.Utilities) + utilities._script = testScript + activeWindow = object() + app = object() + + with ( + mock.patch.object(web_script_utilities.cthulhu_state, "activeWindow", activeWindow), + mock.patch.object(web_script_utilities.cthulhu_state, "locusOfFocus", None), + mock.patch.object( + web_script_utilities.AXObject, + "get_application", + side_effect=lambda obj: app if obj is activeWindow else None, + ), + mock.patch.object( + web_script_utilities.AXObject, + "get_parent", + side_effect=lambda obj: app if obj is activeWindow else None, + ), + mock.patch.object(web_script_utilities.cthulhu, "setActiveWindow") as setActiveWindow, + ): + result = web_script_utilities.Utilities.sanityCheckActiveWindow(utilities) + + self.assertTrue(result) + self.assertIs(testScript.app, app) + setActiveWindow.assert_not_called() + + def test_sanity_check_preserves_current_window_when_recovery_is_inconclusive(self): + oldApp = object() + testScript = mock.Mock(app=oldApp) + utilities = web_script_utilities.Utilities.__new__(web_script_utilities.Utilities) + utilities._script = testScript + activeWindow = object() + + with ( + mock.patch.object(web_script_utilities.cthulhu_state, "activeWindow", activeWindow), + mock.patch.object( + web_script_utilities.AXObject, + "get_parent", + return_value=object(), + ), + mock.patch.object(utilities, "activeWindow", return_value=None), + mock.patch.object(web_script_utilities.cthulhu, "setActiveWindow") as setActiveWindow, + ): + result = web_script_utilities.Utilities.sanityCheckActiveWindow(utilities) + + self.assertTrue(result) + self.assertIs(testScript.app, oldApp) + setActiveWindow.assert_not_called() + + class WebPresentationModeSpeechRegressionTests(unittest.TestCase): def _make_script(self, inFocusMode): testScript = web_script.Script.__new__(web_script.Script) From 009938c49594050b2d41ea1ef3460c437b0108a1 Mon Sep 17 00:00:00 2001 From: Storm Dragon Date: Tue, 19 May 2026 15:26:44 -0400 Subject: [PATCH 5/8] Changed how speech interruptions are handled hopefully improved things being interrupted when they shouldn't be. --- src/cthulhu/scripts/default.py | 8 +- src/cthulhu/scripts/web/script.py | 19 ++-- src/cthulhu/speech.py | 5 +- src/cthulhu/speechdispatcherfactory.py | 2 +- src/cthulhu/structural_navigation.py | 7 +- .../test_speech_default_policy_regressions.py | 91 +++++++++++++++++++ ..._speechdispatcher_interrupt_regressions.py | 8 ++ ...structural_navigation_table_regressions.py | 5 +- tests/test_web_input_regressions.py | 10 +- 9 files changed, 130 insertions(+), 25 deletions(-) create mode 100644 tests/test_speech_default_policy_regressions.py diff --git a/src/cthulhu/scripts/default.py b/src/cthulhu/scripts/default.py index b665f4a..911d2e9 100644 --- a/src/cthulhu/scripts/default.py +++ b/src/cthulhu/scripts/default.py @@ -3470,7 +3470,7 @@ class Script(script.Script): return True def presentMessage(self, fullMessage, briefMessage=None, voice=None, resetStyles=True, - force=False, interrupt=True): + force=False, interrupt=False): """Convenience method to speak a message and 'flash' it in braille. Arguments: @@ -3484,6 +3484,8 @@ class Script(script.Script): the briefMessage should set briefMessage to an empty string. - voice: The voice to use when speaking this message. By default, the "system" voice will be used. + - interrupt: If True, any current speech should be interrupted + prior to speaking the new text. The default queues the message. """ if not fullMessage: @@ -3959,7 +3961,7 @@ class Script(script.Script): voice = self.speechGenerator.voice(string=character) speech.speakCharacter(character, voice) - def speakMessage(self, string, voice=None, interrupt=True, resetStyles=True, force=False): + def speakMessage(self, string, voice=None, interrupt=False, resetStyles=True, force=False): """Method to speak a single string. Scripts should use this method rather than calling speech.speak directly. @@ -3967,7 +3969,7 @@ class Script(script.Script): - voice: The voice to use. By default, the "system" voice will be used. - interrupt: If True, any current speech should be interrupted - prior to speaking the new text. + prior to speaking the new text. The default queues the message. """ if not cthulhu.cthulhuApp.settingsManager.getSetting('enableSpeech') \ diff --git a/src/cthulhu/scripts/web/script.py b/src/cthulhu/scripts/web/script.py index caf3e1e..1dde9f4 100644 --- a/src/cthulhu/scripts/web/script.py +++ b/src/cthulhu/scripts/web/script.py @@ -215,7 +215,7 @@ class Script(default.Script): self._clearSyntheticWebSelection() if oldContents: self.speakContents(oldContents) - self.speakMessage(messages.TEXT_UNSELECTED, interrupt=False) + self.speakMessage(messages.TEXT_UNSELECTED) return True self.pointOfReference["syntheticWebSelection"] = { @@ -247,7 +247,7 @@ class Script(default.Script): if deltaContents: self.speakContents(deltaContents) - self.speakMessage(message, interrupt=False) + self.speakMessage(message) return True @@ -1083,7 +1083,7 @@ class Script(default.Script): """Speaks the specified contents.""" utterances = self.speechGenerator.generateContents(contents, **args) - speech.speak(utterances, interrupt=args.get("interrupt", True)) + speech.speak(utterances, interrupt=args.get("interrupt", False)) def sayCharacter(self, obj): """Speaks the character at the current caret position.""" @@ -1504,7 +1504,6 @@ class Script(default.Script): def togglePresentationMode(self, inputEvent, documentFrame=None): [obj, characterOffset] = self.utilities.getCaretContext(documentFrame) - interrupt = inputEvent is not None if self._inFocusMode: parent = AXObject.get_parent(obj) if AXUtilities.is_list_box(parent): @@ -1512,7 +1511,10 @@ class Script(default.Script): elif AXUtilities.is_menu(parent): self.utilities.setCaretContext(AXObject.get_parent(parent), -1) if not self._loadingDocumentContent: - self.presentMessage(messages.MODE_BROWSE, interrupt=interrupt) + if inputEvent is not None: + self.presentMessage(messages.MODE_BROWSE, interrupt=True) + else: + self.presentMessage(messages.MODE_BROWSE) if not self._shouldSuppressBrowseModeSound(obj, inputEvent): sound_theme_manager.getManager().playBrowseModeSound() else: @@ -1522,7 +1524,10 @@ class Script(default.Script): or inputEvent): self.utilities.grabFocus(obj) - self.presentMessage(messages.MODE_FOCUS, interrupt=interrupt) + if inputEvent is not None: + self.presentMessage(messages.MODE_FOCUS, interrupt=True) + else: + self.presentMessage(messages.MODE_FOCUS) sound_theme_manager.getManager().playFocusModeSound() self._inFocusMode = not self._inFocusMode self._focusModeIsSticky = False @@ -2689,7 +2694,7 @@ class Script(default.Script): debug.printTokens(debug.LEVEL_INFO, tokens, True) return True - self.presentMessage(event.any_data, interrupt=False) + self.presentMessage(event.any_data) return True def onNameChanged(self, event): diff --git a/src/cthulhu/speech.py b/src/cthulhu/speech.py index 604aeeb..3c02a7b 100644 --- a/src/cthulhu/speech.py +++ b/src/cthulhu/speech.py @@ -425,10 +425,11 @@ def _speak(text: str, acss: Optional[Any], interrupt: bool) -> None: debug.printMessage(debug.LEVEL_INFO, msg, True) _speechserver.speak(text, resolvedVoice, interrupt) # type: ignore -def speak(content: Union[str, List[Any]], acss: Optional[Any] = None, interrupt: bool = True) -> None: +def speak(content: Union[str, List[Any]], acss: Optional[Any] = None, interrupt: bool = False) -> None: """Speaks the given content. The content can be either a simple string or an array of arrays of objects returned by a speech - generator.""" + generator. Speech queues by default; callers that need to cancel + current output should pass interrupt=True or call presentationInterrupt().""" if settings.silenceSpeech: return diff --git a/src/cthulhu/speechdispatcherfactory.py b/src/cthulhu/speechdispatcherfactory.py index 3896994..01a9e89 100644 --- a/src/cthulhu/speechdispatcherfactory.py +++ b/src/cthulhu/speechdispatcherfactory.py @@ -632,7 +632,7 @@ class SpeechServer(speechserver.SpeechServer): return families - def speak(self, text=None, acss=None, interrupt=True): + def speak(self, text=None, acss=None, interrupt=False): if not text: return diff --git a/src/cthulhu/structural_navigation.py b/src/cthulhu/structural_navigation.py index 67ce85b..b595da3 100644 --- a/src/cthulhu/structural_navigation.py +++ b/src/cthulhu/structural_navigation.py @@ -946,7 +946,7 @@ class StructuralNavigation: for match in matches: if _isValidMatch(match): structuralNavigationObject.present(match, arg) - self._script.presentMessage(wrapMessage, interrupt=False) + self._script.presentMessage(wrapMessage) return structuralNavigationObject.present(None, arg) @@ -2219,14 +2219,13 @@ class StructuralNavigation: if settings.speakCellCoordinates: [row, col] = self.getCellCoordinates(cell) self._script.presentMessage( - messages.TABLE_CELL_COORDINATES % {"row": row + 1, "column": col + 1}, - interrupt=False, + messages.TABLE_CELL_COORDINATES % {"row": row + 1, "column": col + 1} ) rowspan, colspan = self._script.utilities.rowAndColumnSpan(cell) spanString = messages.cellSpan(rowspan, colspan) if spanString and settings.speakCellSpan: - self._script.presentMessage(spanString, interrupt=False) + self._script.presentMessage(spanString) ######################## # # diff --git a/tests/test_speech_default_policy_regressions.py b/tests/test_speech_default_policy_regressions.py new file mode 100644 index 0000000..debb79b --- /dev/null +++ b/tests/test_speech_default_policy_regressions.py @@ -0,0 +1,91 @@ +import sys +import unittest +from pathlib import Path +from unittest import mock + +sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src")) + +from cthulhu import settings +from cthulhu import speech +from cthulhu import cthulhu_state +from cthulhu.scripts import default + + +class SpeechDefaultPolicyRegressionTests(unittest.TestCase): + def _make_settings_manager(self): + values = { + "enableSpeech": True, + "onlySpeakDisplayedText": False, + "messagesAreDetailed": True, + "enableBraille": False, + "enableBrailleMonitor": False, + "enableFlashMessages": False, + "voices": {settings.SYSTEM_VOICE: "system"}, + "capitalizationStyle": settings.CAPITALIZATION_STYLE_NONE, + "verbalizePunctuationStyle": settings.PUNCTUATION_STYLE_NONE, + } + + manager = mock.Mock() + manager.getSetting.side_effect = values.get + return manager + + def _make_script(self): + testScript = default.Script.__new__(default.Script) + testScript.speechAndVerbosityManager = mock.Mock() + return testScript + + def test_speech_speak_queues_by_default(self): + server = mock.Mock() + + with ( + mock.patch.object(speech, "_speechserver", server), + mock.patch.object(speech, "_write_to_monitor"), + mock.patch.object(speech.speech_history, "add"), + mock.patch.object(cthulhu_state, "activeScript", None), + ): + speech.speak("status") + + server.speak.assert_called_once() + self.assertFalse(server.speak.call_args.args[2]) + + def test_speech_speak_explicit_interrupt_is_preserved(self): + server = mock.Mock() + + with ( + mock.patch.object(speech, "_speechserver", server), + mock.patch.object(speech, "_write_to_monitor"), + mock.patch.object(speech.speech_history, "add"), + mock.patch.object(cthulhu_state, "activeScript", None), + ): + speech.speak("status", interrupt=True) + + server.speak.assert_called_once() + self.assertTrue(server.speak.call_args.args[2]) + + def test_present_message_queues_by_default(self): + testScript = self._make_script() + manager = self._make_settings_manager() + + with ( + mock.patch.object(default.cthulhu.cthulhuApp, "settingsManager", manager), + mock.patch.object(default.speech, "speak") as speak, + ): + default.Script.presentMessage(testScript, "Status") + + speak.assert_called_once_with("Status", "system", False) + + def test_present_message_explicit_interrupt_is_preserved(self): + testScript = self._make_script() + manager = self._make_settings_manager() + + with ( + mock.patch.object(default.cthulhu.cthulhuApp, "settingsManager", manager), + mock.patch.object(default.speech, "speak") as speak, + ): + default.Script.presentMessage(testScript, "Status", interrupt=True) + + speak.assert_called_once_with("Status", "system", True) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_speechdispatcher_interrupt_regressions.py b/tests/test_speechdispatcher_interrupt_regressions.py index 786a6a1..0f23a45 100644 --- a/tests/test_speechdispatcher_interrupt_regressions.py +++ b/tests/test_speechdispatcher_interrupt_regressions.py @@ -30,6 +30,14 @@ class SpeechDispatcherInterruptRegressionTests(unittest.TestCase): server._cancel.assert_called_once_with() server._speak.assert_called_once_with("long utterance", None) + def test_string_speech_queues_by_default(self): + server = self._make_server() + + server.speak("next") + + server._cancel.assert_not_called() + server._speak.assert_called_once_with("next", None) + def test_recent_key_echo_suppresses_backend_cancel(self): server = self._make_server() server._lastKeyEchoTime = time.time() diff --git a/tests/test_structural_navigation_table_regressions.py b/tests/test_structural_navigation_table_regressions.py index 2487378..de49dd3 100644 --- a/tests/test_structural_navigation_table_regressions.py +++ b/tests/test_structural_navigation_table_regressions.py @@ -38,8 +38,7 @@ class StructuralNavigationTableRegressionTests(unittest.TestCase): navigator._presentObject.assert_called_once_with("cell", 0) navigator._script.presentMessage.assert_called_once_with( - messages.TABLE_CELL_COORDINATES % {"row": 2, "column": 3}, - interrupt=False, + messages.TABLE_CELL_COORDINATES % {"row": 2, "column": 3} ) def test_wrapping_announcement_does_not_interrupt_wrapped_object(self): @@ -78,7 +77,7 @@ class StructuralNavigationTableRegressionTests(unittest.TestCase): events, [ ("present", first, "arg"), - ("message", messages.WRAPPING_TO_TOP, {"interrupt": False}), + ("message", messages.WRAPPING_TO_TOP, {}), ], ) diff --git a/tests/test_web_input_regressions.py b/tests/test_web_input_regressions.py index 287f0cc..00bbf84 100644 --- a/tests/test_web_input_regressions.py +++ b/tests/test_web_input_regressions.py @@ -182,7 +182,7 @@ class WebPresentationModeSpeechRegressionTests(unittest.TestCase): ): web_script.Script.togglePresentationMode(testScript, None, "document") - testScript.presentMessage.assert_called_once_with(messages.MODE_BROWSE, interrupt=False) + testScript.presentMessage.assert_called_once_with(messages.MODE_BROWSE) soundManager.playBrowseModeSound.assert_called_once_with() self.assertFalse(testScript._inFocusMode) self.assertFalse(testScript._focusModeIsSticky) @@ -200,7 +200,7 @@ class WebPresentationModeSpeechRegressionTests(unittest.TestCase): ): web_script.Script.togglePresentationMode(testScript, None, "document") - testScript.presentMessage.assert_called_once_with(messages.MODE_FOCUS, interrupt=False) + testScript.presentMessage.assert_called_once_with(messages.MODE_FOCUS) soundManager.playFocusModeSound.assert_called_once_with() self.assertTrue(testScript._inFocusMode) self.assertFalse(testScript._focusModeIsSticky) @@ -311,7 +311,7 @@ class WebDescriptionChangeRegressionTests(unittest.TestCase): result = web_script.Script.onDescriptionChanged(testScript, event) self.assertTrue(result) - testScript.presentMessage.assert_called_once_with("Helpful tooltip", interrupt=False) + testScript.presentMessage.assert_called_once_with("Helpful tooltip") if __name__ == "__main__": @@ -378,7 +378,7 @@ class WebSelectionRegressionTests(unittest.TestCase): testScript.utilities.setCaretContext.assert_called_once_with(section, 1, document) setLocusOfFocus.assert_called_once_with(event, section, False, True) testScript.speakContents.assert_called_once_with([[section, 0, 1, "D"]]) - testScript.speakMessage.assert_called_once_with(messages.TEXT_SELECTED, interrupt=False) + testScript.speakMessage.assert_called_once_with(messages.TEXT_SELECTED) self.assertEqual( testScript.pointOfReference["syntheticWebSelection"]["string"], "D", @@ -414,7 +414,7 @@ class WebSelectionRegressionTests(unittest.TestCase): testScript.utilities.setCaretContext.assert_called_once_with(link, 0, document) setLocusOfFocus.assert_called_once_with(event, link, False, True) testScript.speakContents.assert_called_once_with([[link, 0, 1, "S"]]) - testScript.speakMessage.assert_called_once_with(messages.TEXT_SELECTED, interrupt=False) + testScript.speakMessage.assert_called_once_with(messages.TEXT_SELECTED) self.assertEqual( testScript.pointOfReference["syntheticWebSelection"]["string"], "S", From b055933d6d502dbf382d8c2f4a871b8f6231140a Mon Sep 17 00:00:00 2001 From: ak47 Date: Wed, 20 May 2026 22:36:17 -0400 Subject: [PATCH 6/8] Add Cthulhu Remote plugin --- .../Arch-Linux/cthulhu-git/PKGBUILD | 2 +- src/cthulhu/plugins/CthulhuRemote/__init__.py | 1 + .../plugins/CthulhuRemote/connection_info.py | 78 +++++ .../plugins/CthulhuRemote/local_machine.py | 156 +++++++++ src/cthulhu/plugins/CthulhuRemote/meson.build | 20 ++ src/cthulhu/plugins/CthulhuRemote/plugin.info | 7 + src/cthulhu/plugins/CthulhuRemote/plugin.py | 299 ++++++++++++++++++ src/cthulhu/plugins/CthulhuRemote/protocol.py | 41 +++ .../plugins/CthulhuRemote/serializer.py | 20 ++ .../plugins/CthulhuRemote/socket_utils.py | 23 ++ .../plugins/CthulhuRemote/transport.py | 269 ++++++++++++++++ src/cthulhu/plugins/meson.build | 1 + tests/test_cthulhu_remote_plugin.py | 66 ++++ 13 files changed, 982 insertions(+), 1 deletion(-) create mode 100644 src/cthulhu/plugins/CthulhuRemote/__init__.py create mode 100644 src/cthulhu/plugins/CthulhuRemote/connection_info.py create mode 100644 src/cthulhu/plugins/CthulhuRemote/local_machine.py create mode 100644 src/cthulhu/plugins/CthulhuRemote/meson.build create mode 100644 src/cthulhu/plugins/CthulhuRemote/plugin.info create mode 100644 src/cthulhu/plugins/CthulhuRemote/plugin.py create mode 100644 src/cthulhu/plugins/CthulhuRemote/protocol.py create mode 100644 src/cthulhu/plugins/CthulhuRemote/serializer.py create mode 100644 src/cthulhu/plugins/CthulhuRemote/socket_utils.py create mode 100644 src/cthulhu/plugins/CthulhuRemote/transport.py create mode 100644 tests/test_cthulhu_remote_plugin.py diff --git a/distro-packages/Arch-Linux/cthulhu-git/PKGBUILD b/distro-packages/Arch-Linux/cthulhu-git/PKGBUILD index f83a7af..076c123 100644 --- a/distro-packages/Arch-Linux/cthulhu-git/PKGBUILD +++ b/distro-packages/Arch-Linux/cthulhu-git/PKGBUILD @@ -2,7 +2,7 @@ pkgname=cthulhu-git _pkgname=cthulhu -pkgver=2026.05.06.r394.ga5f7c9a +pkgver=2026.05.14.r396.ge2f9a7c pkgrel=1 pkgdesc="Desktop-agnostic screen reader with plugin system, forked from Orca" url="https://git.stormux.org/storm/cthulhu" diff --git a/src/cthulhu/plugins/CthulhuRemote/__init__.py b/src/cthulhu/plugins/CthulhuRemote/__init__.py new file mode 100644 index 0000000..d0212af --- /dev/null +++ b/src/cthulhu/plugins/CthulhuRemote/__init__.py @@ -0,0 +1 @@ +"""Cthulhu Remote plugin package.""" diff --git a/src/cthulhu/plugins/CthulhuRemote/connection_info.py b/src/cthulhu/plugins/CthulhuRemote/connection_info.py new file mode 100644 index 0000000..763b988 --- /dev/null +++ b/src/cthulhu/plugins/CthulhuRemote/connection_info.py @@ -0,0 +1,78 @@ +"""Connection metadata and URL parsing for Cthulhu Remote.""" + +from dataclasses import dataclass +from enum import Enum +from urllib.parse import parse_qs, urlencode, urlparse, urlunparse + +from .protocol import SERVER_PORT, URL_PREFIX +from .socket_utils import host_port_to_address + + +class URLParsingError(Exception): + """Raised when a remote connection URL is incomplete or invalid.""" + + +class ConnectionMode(Enum): + """Remote session role.""" + + MASTER = "master" + SLAVE = "slave" + + +class ConnectionState(Enum): + """Remote session connection state.""" + + CONNECTED = "connected" + CONNECTING = "connecting" + DISCONNECTED = "disconnected" + DISCONNECTING = "disconnecting" + + +@dataclass +class ConnectionInfo: + """Relay connection settings.""" + + hostname: str + mode: ConnectionMode + key: str + port: int = SERVER_PORT + insecure: bool = False + + def __post_init__(self) -> None: + self.port = self.port or SERVER_PORT + self.mode = ConnectionMode(self.mode) + + @classmethod + def from_url(cls, url: str) -> "ConnectionInfo": + parsedUrl = urlparse(url) + parsedQuery = parse_qs(parsedUrl.query) + hostname = parsedUrl.hostname + port = parsedUrl.port or SERVER_PORT + key = parsedQuery.get("key", [""])[0] + mode = parsedQuery.get("mode", [""])[0].lower() + insecure = parsedQuery.get("insecure", ["false"])[0].lower() == "true" + if not hostname: + raise URLParsingError("No hostname provided") + if not key: + raise URLParsingError("No key provided") + if not mode: + raise URLParsingError("No mode provided") + try: + ConnectionMode(mode) + except ValueError as error: + raise URLParsingError(f"Invalid mode provided: {mode!r}") from error + return cls(hostname=hostname, mode=ConnectionMode(mode), key=key, port=port, insecure=insecure) + + def get_address(self) -> str: + return host_port_to_address((self.hostname, self.port)) + + def get_url(self, mode: ConnectionMode | None = None) -> str: + mode = mode or self.mode + query = {"key": self.key, "mode": mode.value} + if self.insecure: + query["insecure"] = "true" + return urlunparse((URL_PREFIX.split("://", 1)[0], self.get_address(), "", "", urlencode(query), "")) + + def get_url_to_connect(self) -> str: + mode = ConnectionMode.SLAVE if self.mode == ConnectionMode.MASTER else ConnectionMode.MASTER + return self.get_url(mode) diff --git a/src/cthulhu/plugins/CthulhuRemote/local_machine.py b/src/cthulhu/plugins/CthulhuRemote/local_machine.py new file mode 100644 index 0000000..39a48d3 --- /dev/null +++ b/src/cthulhu/plugins/CthulhuRemote/local_machine.py @@ -0,0 +1,156 @@ +"""Linux desktop integration for Cthulhu Remote commands.""" + +from __future__ import annotations + +import logging +from typing import Any + +import gi + +gi.require_version("Atspi", "2.0") +gi.require_version("Gdk", "3.0") +gi.require_version("Gtk", "3.0") +from gi.repository import Atspi, Gdk, GLib, Gtk + +from cthulhu import speech + +logger = logging.getLogger(__name__) + + +WINDOWS_VK_TO_ATSPI_KEYSYM = { + 0x08: "BackSpace", + 0x09: "Tab", + 0x0D: "Return", + 0x10: "Shift_L", + 0x11: "Control_L", + 0x12: "Alt_L", + 0x13: "Pause", + 0x14: "Caps_Lock", + 0x1B: "Escape", + 0x20: "space", + 0x21: "Page_Up", + 0x22: "Page_Down", + 0x23: "End", + 0x24: "Home", + 0x25: "Left", + 0x26: "Up", + 0x27: "Right", + 0x28: "Down", + 0x2D: "Insert", + 0x2E: "Delete", + 0x5B: "Super_L", + 0x5C: "Super_R", + 0x60: "KP_0", + 0x61: "KP_1", + 0x62: "KP_2", + 0x63: "KP_3", + 0x64: "KP_4", + 0x65: "KP_5", + 0x66: "KP_6", + 0x67: "KP_7", + 0x68: "KP_8", + 0x69: "KP_9", + 0x6A: "KP_Multiply", + 0x6B: "KP_Add", + 0x6D: "KP_Subtract", + 0x6E: "KP_Decimal", + 0x6F: "KP_Divide", +} + +for _digit in range(0x30, 0x3A): + WINDOWS_VK_TO_ATSPI_KEYSYM[_digit] = chr(_digit) +for _letter in range(0x41, 0x5B): + WINDOWS_VK_TO_ATSPI_KEYSYM[_letter] = chr(_letter).lower() +for _functionKey in range(1, 25): + WINDOWS_VK_TO_ATSPI_KEYSYM[0x6F + _functionKey] = f"F{_functionKey}" + + +class LocalMachine: + """Apply incoming remote commands to the local Linux desktop.""" + + def __init__(self, presenter) -> None: + self._presenter = presenter + self.isMuted = False + + def speak(self, sequence: Any = None, **kwargs: Any) -> None: + if self.isMuted: + return + text = self._speech_sequence_to_text(sequence) + if text: + speech.speak(text, interrupt=False) + + def cancel_speech(self, **kwargs: Any) -> None: + if not self.isMuted: + speech.stop() + + def pause_speech(self, switch: bool = False, **kwargs: Any) -> None: + if switch: + speech.stop() + + def set_clipboard_text(self, text: str = "", **kwargs: Any) -> None: + clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD) + clipboard.set_text(text, -1) + clipboard.store() + self._presenter("Remote clipboard received") + + def send_key( + self, + vk_code: int | None = None, + keysym: str | None = None, + pressed: bool | None = True, + **kwargs: Any, + ) -> None: + keyval = self._resolve_keyval(vk_code=vk_code, keysym=keysym) + if keyval is None: + logger.debug("Ignoring unmapped remote key: vk_code=%r keysym=%r", vk_code, keysym) + return + eventType = Atspi.KeySynthType.PRESS if pressed else Atspi.KeySynthType.RELEASE + GLib.idle_add(self._generate_key_event, keyval, eventType) + + def display(self, cells: list[int] | None = None, **kwargs: Any) -> None: + logger.debug("Remote braille display update ignored until braille routing is implemented: %r", cells) + + def braille_input(self, **kwargs: Any) -> None: + logger.debug("Remote braille input ignored until braille routing is implemented: %r", kwargs) + + def set_braille_display_size(self, **kwargs: Any) -> None: + logger.debug("Remote braille size negotiation ignored until braille routing is implemented: %r", kwargs) + + def send_secure_attention_sequence(self, **kwargs: Any) -> None: + self._presenter("Ctrl Alt Delete is not available on Linux desktops") + + def _generate_key_event(self, keyval: int, eventType: Atspi.KeySynthType) -> bool: + try: + Atspi.generate_keyboard_event(keyval, None, eventType) + except Exception: + logger.exception("Failed to generate remote key event") + return False + + def _resolve_keyval(self, vk_code: int | None, keysym: str | None) -> int | None: + if keysym: + keyval = Gdk.keyval_from_name(keysym) + return keyval if keyval else None + if vk_code is None: + return None + mappedKeysym = WINDOWS_VK_TO_ATSPI_KEYSYM.get(int(vk_code)) + if not mappedKeysym: + return None + keyval = Gdk.keyval_from_name(mappedKeysym) + return keyval if keyval else None + + def _speech_sequence_to_text(self, sequence: Any) -> str: + if sequence is None: + return "" + if isinstance(sequence, str): + return sequence + if not isinstance(sequence, list): + return str(sequence) + parts = [] + for item in sequence: + if isinstance(item, str): + parts.append(item) + elif isinstance(item, list): + nested = self._speech_sequence_to_text(item) + if nested: + parts.append(nested) + return " ".join(parts) diff --git a/src/cthulhu/plugins/CthulhuRemote/meson.build b/src/cthulhu/plugins/CthulhuRemote/meson.build new file mode 100644 index 0000000..cf579c2 --- /dev/null +++ b/src/cthulhu/plugins/CthulhuRemote/meson.build @@ -0,0 +1,20 @@ +cthulhu_remote_python_sources = files([ + '__init__.py', + 'connection_info.py', + 'local_machine.py', + 'plugin.py', + 'protocol.py', + 'serializer.py', + 'socket_utils.py', + 'transport.py' +]) + +python3.install_sources( + cthulhu_remote_python_sources, + subdir: 'cthulhu/plugins/CthulhuRemote' +) + +install_data( + 'plugin.info', + install_dir: python3.get_install_dir() / 'cthulhu' / 'plugins' / 'CthulhuRemote' +) diff --git a/src/cthulhu/plugins/CthulhuRemote/plugin.info b/src/cthulhu/plugins/CthulhuRemote/plugin.info new file mode 100644 index 0000000..aae2024 --- /dev/null +++ b/src/cthulhu/plugins/CthulhuRemote/plugin.info @@ -0,0 +1,7 @@ +[Plugin] +Name = Cthulhu Remote +Module = CthulhuRemote +Description = Remote access plugin compatible with NVDA Remote relay servers +Authors = Storm Dragon +Version = 0.1 +Category = Utilities diff --git a/src/cthulhu/plugins/CthulhuRemote/plugin.py b/src/cthulhu/plugins/CthulhuRemote/plugin.py new file mode 100644 index 0000000..8061345 --- /dev/null +++ b/src/cthulhu/plugins/CthulhuRemote/plugin.py @@ -0,0 +1,299 @@ +#!/usr/bin/env python3 +# +# Copyright (c) 2026 Stormux +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. + +"""Cthulhu Remote plugin.""" + +from __future__ import annotations + +import logging +import secrets +from typing import Any + +import gi + +gi.require_version("Gdk", "3.0") +gi.require_version("Gtk", "3.0") +from gi.repository import Gdk, Gtk + +from cthulhu import dbus_service +from cthulhu.plugin import Plugin, cthulhu_hookimpl + +from cthulhu.plugins.CthulhuRemote.connection_info import ( + ConnectionInfo, + ConnectionMode, + ConnectionState, + URLParsingError, +) +from cthulhu.plugins.CthulhuRemote.local_machine import LocalMachine +from cthulhu.plugins.CthulhuRemote.protocol import RemoteMessageType, SERVER_PORT +from cthulhu.plugins.CthulhuRemote.serializer import JSONSerializer +from cthulhu.plugins.CthulhuRemote.socket_utils import address_to_host_port +from cthulhu.plugins.CthulhuRemote.transport import RelayTransport + +logger = logging.getLogger(__name__) + + +class CthulhuRemote(Plugin): + """Remote access plugin compatible with NVDA Remote relay servers.""" + + def __init__(self) -> None: + super().__init__() + self._transport: RelayTransport | None = None + self._connectionInfo: ConnectionInfo | None = None + self._connectionState = ConnectionState.DISCONNECTED + self._localMachine = LocalMachine(self._present_message) + self._muted = False + + @cthulhu_hookimpl + def activate(self, plugin=None): + if plugin is not None and plugin is not self: + return + self.registerGestureByString( + self.disconnect, + "Disconnect Cthulhu Remote", + "kb:cthulhu+alt+page_down", + ) + self.registerGestureByString( + self.toggle_mute, + "Toggle Cthulhu Remote mute", + "kb:cthulhu+alt+delete", + ) + self.registerGestureByString( + self.push_clipboard, + "Push clipboard to Cthulhu Remote peers", + "kb:cthulhu+control+shift+c", + ) + logger.info("Cthulhu Remote activated") + + @cthulhu_hookimpl + def deactivate(self, plugin=None): + if plugin is not None and plugin is not self: + return + self.disconnect(notify_user=False) + logger.info("Cthulhu Remote deactivated") + + @dbus_service.parameterized_command + def connect( + self, + host: str, + key: str, + mode: str = "master", + port: int = SERVER_PORT, + insecure: bool = False, + notify_user: bool = True, + ) -> bool: + """Connect to a Cthulhu Remote relay server.""" + + if not host or not key: + if notify_user: + self._present_message("Cthulhu Remote requires host and key") + return False + try: + connectionInfo = ConnectionInfo( + hostname=host, + port=port, + key=key, + mode=ConnectionMode(mode.lower()), + insecure=insecure, + ) + except ValueError: + if notify_user: + self._present_message("Cthulhu Remote mode must be master or slave") + return False + return self._connect(connectionInfo, notify_user=notify_user) + + @dbus_service.parameterized_command + def connect_to_address( + self, + address: str, + key: str, + mode: str = "master", + insecure: bool = False, + notify_user: bool = True, + ) -> bool: + """Connect using host[:port] address text.""" + + host, port = address_to_host_port(address) + return self.connect(host, key, mode=mode, port=port, insecure=insecure, notify_user=notify_user) + + @dbus_service.parameterized_command + def connect_to_url(self, url: str, notify_user: bool = True) -> bool: + """Connect using a cthulhuremote:// or nvdaremote:// URL.""" + + if url.startswith("nvdaremote://"): + url = "cthulhuremote://" + url.split("://", 1)[1] + try: + connectionInfo = ConnectionInfo.from_url(url) + except URLParsingError as error: + logger.warning("Invalid Cthulhu Remote URL: %s", error) + if notify_user: + self._present_message("Invalid Cthulhu Remote URL") + return False + return self._connect(connectionInfo, notify_user=notify_user) + + @dbus_service.command + def disconnect(self, script=None, inputEvent=None, notify_user: bool = True) -> bool: + """Disconnect the current Cthulhu Remote session.""" + + if self._transport is not None: + self._connectionState = ConnectionState.DISCONNECTING + self._transport.close() + self._transport = None + self._connectionState = ConnectionState.DISCONNECTED + if notify_user: + self._present_message("Cthulhu Remote disconnected") + return True + + @dbus_service.command + def toggle_mute(self, script=None, inputEvent=None) -> bool: + """Mute or unmute incoming remote output.""" + + self._muted = not self._muted + self._localMachine.isMuted = self._muted + self._present_message("Cthulhu Remote muted" if self._muted else "Cthulhu Remote unmuted") + return True + + @dbus_service.command + def push_clipboard(self, script=None, inputEvent=None) -> bool: + """Push local clipboard text to connected remote peers.""" + + if not self._transport or not self._transport.connected: + self._present_message("Cthulhu Remote is not connected") + return True + clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD) + text = clipboard.wait_for_text() + if not text: + self._present_message("Clipboard does not contain text") + return True + self._transport.send(RemoteMessageType.set_clipboard_text, text=text) + self._present_message("Clipboard pushed") + return True + + @dbus_service.command + def copy_invite_url(self, script=None, inputEvent=None) -> bool: + """Copy a remote invite URL for the opposite connection role.""" + + if not self._connectionInfo: + self._present_message("Cthulhu Remote is not connected") + return True + clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD) + clipboard.set_text(self._connectionInfo.get_url_to_connect(), -1) + clipboard.store() + self._present_message("Cthulhu Remote invite copied") + return True + + @dbus_service.command + def generate_key(self) -> str: + """Generate a random remote connection key.""" + + return secrets.token_urlsafe(18) + + @dbus_service.getter + def get_state(self) -> str: + """Return the Cthulhu Remote connection state.""" + + return self._connectionState.value + + @dbus_service.getter + def get_connection_url(self) -> str: + """Return the current connection URL, if connected.""" + + if not self._connectionInfo: + return "" + return self._connectionInfo.get_url() + + def _connect(self, connectionInfo: ConnectionInfo, notify_user: bool = True) -> bool: + self.disconnect(notify_user=False) + serializer = JSONSerializer() + transport = RelayTransport.create(connectionInfo, serializer) + self._register_transport_handlers(transport, connectionInfo.mode) + transport.transportConnected.register(self._handle_transport_connected) + transport.transportDisconnected.register(self._handle_transport_disconnected) + transport.transportConnectionFailed.register(self._handle_transport_failed) + transport.transportCertificateAuthenticationFailed.register(self._handle_certificate_failed) + self._transport = transport + self._connectionInfo = connectionInfo + self._connectionState = ConnectionState.CONNECTING + transport.start() + if notify_user: + self._present_message("Cthulhu Remote connecting") + return True + + def _register_transport_handlers(self, transport: RelayTransport, mode: ConnectionMode) -> None: + transport.register_inbound(RemoteMessageType.speak, self._localMachine.speak) + transport.register_inbound(RemoteMessageType.cancel, self._localMachine.cancel_speech) + transport.register_inbound(RemoteMessageType.pause_speech, self._localMachine.pause_speech) + transport.register_inbound(RemoteMessageType.set_clipboard_text, self._localMachine.set_clipboard_text) + transport.register_inbound(RemoteMessageType.motd, self._handle_motd) + transport.register_inbound(RemoteMessageType.version_mismatch, self._handle_version_mismatch) + transport.register_inbound(RemoteMessageType.channel_joined, self._handle_channel_joined) + transport.register_inbound(RemoteMessageType.client_joined, self._handle_client_joined) + transport.register_inbound(RemoteMessageType.client_left, self._handle_client_left) + if mode == ConnectionMode.SLAVE: + transport.register_inbound(RemoteMessageType.key, self._localMachine.send_key) + transport.register_inbound(RemoteMessageType.braille_input, self._localMachine.braille_input) + transport.register_inbound( + RemoteMessageType.send_SAS, + self._localMachine.send_secure_attention_sequence, + ) + else: + transport.register_inbound(RemoteMessageType.display, self._localMachine.display) + transport.register_inbound( + RemoteMessageType.set_display_size, + self._localMachine.set_braille_display_size, + ) + + def _handle_transport_connected(self, **kwargs: Any) -> None: + self._connectionState = ConnectionState.CONNECTED + self._present_message("Cthulhu Remote connected") + + def _handle_transport_disconnected(self, **kwargs: Any) -> None: + if self._connectionState != ConnectionState.DISCONNECTING: + self._connectionState = ConnectionState.DISCONNECTED + self._present_message("Cthulhu Remote disconnected") + + def _handle_transport_failed(self, **kwargs: Any) -> None: + self._connectionState = ConnectionState.DISCONNECTED + self._present_message("Cthulhu Remote connection failed") + + def _handle_certificate_failed(self, **kwargs: Any) -> None: + self._connectionState = ConnectionState.DISCONNECTED + fingerprint = self._transport.lastFailFingerprint if self._transport else None + if fingerprint: + logger.warning("Cthulhu Remote certificate fingerprint: %s", fingerprint) + self._present_message("Cthulhu Remote certificate verification failed") + + def _handle_motd(self, motd: str = "", **kwargs: Any) -> None: + if motd: + self._present_message(motd) + + def _handle_version_mismatch(self, **kwargs: Any) -> None: + self._present_message("Cthulhu Remote relay protocol mismatch") + self.disconnect(notify_user=False) + + def _handle_channel_joined(self, **kwargs: Any) -> None: + logger.info("Cthulhu Remote channel joined: %r", kwargs) + + def _handle_client_joined(self, client: dict[str, Any] | None = None, **kwargs: Any) -> None: + logger.info("Cthulhu Remote client joined: %r", client or kwargs) + self._present_message("Cthulhu Remote client joined") + + def _handle_client_left(self, client: dict[str, Any] | None = None, **kwargs: Any) -> None: + logger.info("Cthulhu Remote client left: %r", client or kwargs) + self._present_message("Cthulhu Remote client left") + + def _present_message(self, message: str) -> None: + if not self.app: + return + try: + state = self.app.getDynamicApiManager().getAPI("CthulhuState") + if state and state.activeScript: + state.activeScript.presentMessage(message, resetStyles=False) + except Exception: + logger.exception("Failed to present Cthulhu Remote message") diff --git a/src/cthulhu/plugins/CthulhuRemote/protocol.py b/src/cthulhu/plugins/CthulhuRemote/protocol.py new file mode 100644 index 0000000..428d53b --- /dev/null +++ b/src/cthulhu/plugins/CthulhuRemote/protocol.py @@ -0,0 +1,41 @@ +"""Protocol constants for Cthulhu Remote. + +The wire protocol is intentionally compatible with NVDA Remote protocol +version 2 so relay servers can be shared. +""" + +from enum import Enum + + +PROTOCOL_VERSION = 2 +SERVER_PORT = 6837 +URL_PREFIX = "cthulhuremote://" + + +class RemoteMessageType(Enum): + """Message types used by the remote relay protocol.""" + + protocol_version = "protocol_version" + join = "join" + channel_joined = "channel_joined" + client_joined = "client_joined" + client_left = "client_left" + generate_key = "generate_key" + key = "key" + speak = "speak" + cancel = "cancel" + pause_speech = "pause_speech" + tone = "tone" + wave = "wave" + send_SAS = "send_SAS" + index = "index" + display = "display" + braille_input = "braille_input" + set_braille_info = "set_braille_info" + set_display_size = "set_display_size" + set_clipboard_text = "set_clipboard_text" + motd = "motd" + version_mismatch = "version_mismatch" + ping = "ping" + error = "error" + nvda_not_connected = "nvda_not_connected" diff --git a/src/cthulhu/plugins/CthulhuRemote/serializer.py b/src/cthulhu/plugins/CthulhuRemote/serializer.py new file mode 100644 index 0000000..2f54e80 --- /dev/null +++ b/src/cthulhu/plugins/CthulhuRemote/serializer.py @@ -0,0 +1,20 @@ +"""JSON message serialization for Cthulhu Remote.""" + +import json +from enum import Enum +from typing import Any + + +class JSONSerializer: + """Serialize newline-delimited JSON remote messages.""" + + separator = b"\n" + + def serialize(self, messageType=None, **payload: Any) -> bytes: + if isinstance(messageType, Enum): + messageType = messageType.value + payload["type"] = messageType + return json.dumps(payload).encode("utf-8") + self.separator + + def deserialize(self, data: bytes) -> dict[str, Any]: + return json.loads(data.decode("utf-8")) diff --git a/src/cthulhu/plugins/CthulhuRemote/socket_utils.py b/src/cthulhu/plugins/CthulhuRemote/socket_utils.py new file mode 100644 index 0000000..c7e71c9 --- /dev/null +++ b/src/cthulhu/plugins/CthulhuRemote/socket_utils.py @@ -0,0 +1,23 @@ +"""Socket address helpers for Cthulhu Remote.""" + +import urllib.parse + +from .protocol import SERVER_PORT + + +def address_to_host_port(address: str) -> tuple[str, int]: + """Convert host[:port] text to a host, port tuple.""" + + parsedAddress = urllib.parse.urlparse("//" + address) + return parsedAddress.hostname or "", parsedAddress.port or SERVER_PORT + + +def host_port_to_address(hostPort: tuple[str, int]) -> str: + """Convert a host, port tuple to compact host[:port] text.""" + + host, port = hostPort + if ":" in host: + host = f"[{host}]" + if port != SERVER_PORT: + return f"{host}:{port}" + return host diff --git a/src/cthulhu/plugins/CthulhuRemote/transport.py b/src/cthulhu/plugins/CthulhuRemote/transport.py new file mode 100644 index 0000000..766c201 --- /dev/null +++ b/src/cthulhu/plugins/CthulhuRemote/transport.py @@ -0,0 +1,269 @@ +"""Threaded TLS transport for Cthulhu Remote relay connections.""" + +from __future__ import annotations + +import hashlib +import logging +import queue +import select +import socket +import ssl +import threading +import time +from collections.abc import Callable +from enum import Enum +from typing import Any + +from gi.repository import GLib + +from .connection_info import ConnectionInfo +from .protocol import PROTOCOL_VERSION, RemoteMessageType +from .serializer import JSONSerializer +from .socket_utils import host_port_to_address + +logger = logging.getLogger(__name__) + + +class Action: + """Small callback registrar used by the transport layer.""" + + def __init__(self) -> None: + self._handlers: list[Callable[..., Any]] = [] + self._lock = threading.Lock() + + def register(self, handler: Callable[..., Any]) -> None: + with self._lock: + if handler not in self._handlers: + self._handlers.append(handler) + + def unregister(self, handler: Callable[..., Any]) -> None: + with self._lock: + if handler in self._handlers: + self._handlers.remove(handler) + + def notify(self, **kwargs: Any) -> None: + with self._lock: + handlers = list(self._handlers) + for handler in handlers: + handler(**kwargs) + + +class RelayTransport: + """Connect to an NVDA Remote compatible relay server over TLS.""" + + def __init__( + self, + serializer: JSONSerializer, + address: tuple[str, int], + channel: str, + connectionType: str, + insecure: bool = False, + timeout: int = 0, + protocolVersion: int = PROTOCOL_VERSION, + ) -> None: + self.serializer = serializer + self.address = address + self.channel = channel + self.connectionType = connectionType + self.insecure = insecure + self.timeout = timeout + self.protocolVersion = protocolVersion + self.connected = False + self.closed = True + self.successfulConnects = 0 + self.lastFailFingerprint: str | None = None + self._buffer = b"" + self._socket: ssl.SSLSocket | None = None + self._socketLock = threading.Lock() + self._sendQueue: queue.Queue[bytes | None] = queue.Queue() + self._sendThread: threading.Thread | None = None + self._connectorThread: threading.Thread | None = None + self._running = threading.Event() + self._handlers: dict[RemoteMessageType, Action] = {} + self.transportConnected = Action() + self.transportDisconnected = Action() + self.transportCertificateAuthenticationFailed = Action() + self.transportConnectionFailed = Action() + self.transportClosing = Action() + self.transportConnected.register(self._send_join_messages) + + @classmethod + def create(cls, connectionInfo: ConnectionInfo, serializer: JSONSerializer) -> "RelayTransport": + return cls( + serializer=serializer, + address=(connectionInfo.hostname, connectionInfo.port), + channel=connectionInfo.key, + connectionType=connectionInfo.mode.value, + insecure=connectionInfo.insecure, + ) + + def start(self) -> None: + if self._connectorThread and self._connectorThread.is_alive(): + return + self.closed = False + self._running.set() + self._connectorThread = threading.Thread( + target=self._connector_loop, + name="CthulhuRemoteConnector", + daemon=True, + ) + self._connectorThread.start() + + def close(self) -> None: + self.transportClosing.notify() + self.closed = True + self._running.clear() + self._disconnect() + + def register_inbound(self, messageType: RemoteMessageType, handler: Callable[..., Any]) -> None: + self._handlers.setdefault(messageType, Action()).register(handler) + + def unregister_inbound(self, messageType: RemoteMessageType, handler: Callable[..., Any]) -> None: + if messageType in self._handlers: + self._handlers[messageType].unregister(handler) + + def send(self, messageType: str | Enum, **payload: Any) -> None: + if not self.connected: + logger.debug("Dropping remote message while disconnected: %r", messageType) + return + self._sendQueue.put(self.serializer.serialize(messageType=messageType, **payload)) + + def _connector_loop(self) -> None: + while self._running.is_set(): + try: + self._run_once() + except ssl.SSLCertVerificationError: + logger.warning("Cthulhu Remote certificate verification failed") + except OSError as error: + logger.info("Cthulhu Remote connection failed: %s", error) + except Exception: + logger.exception("Cthulhu Remote connector failed") + if self._running.is_set(): + time.sleep(5) + + def _run_once(self) -> None: + try: + self._socket = self._create_socket() + self._socket.connect(self.address) + except ssl.SSLCertVerificationError: + self._capture_failed_fingerprint() + self.transportCertificateAuthenticationFailed.notify() + raise + except Exception: + self.transportConnectionFailed.notify() + raise + + self.connected = True + self.successfulConnects += 1 + self.transportConnected.notify() + self._sendThread = threading.Thread(target=self._send_loop, name="CthulhuRemoteSend", daemon=True) + self._sendThread.start() + + try: + while self._running.is_set() and self._socket is not None: + readers, _, errors = select.select([self._socket], [], [self._socket], 1.0) + if errors: + break + if readers: + self._read_available() + finally: + self.connected = False + self.transportDisconnected.notify() + self._disconnect() + + def _create_socket(self, insecure: bool | None = None) -> ssl.SSLSocket: + insecure = self.insecure if insecure is None else insecure + host, port = self.address + family, socktype, proto, _, _ = socket.getaddrinfo(host, port)[0] + plainSocket = socket.socket(family, socktype, proto) + if self.timeout: + plainSocket.settimeout(self.timeout) + plainSocket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + plainSocket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + context = ssl._create_unverified_context() if insecure else ssl.create_default_context() + return context.wrap_socket(plainSocket, server_hostname=host) + + def _capture_failed_fingerprint(self) -> None: + try: + socketForCert = self._create_socket(insecure=True) + socketForCert.connect(self.address) + certificate = socketForCert.getpeercert(binary_form=True) + socketForCert.close() + if certificate: + self.lastFailFingerprint = hashlib.sha256(certificate).hexdigest().lower() + except Exception: + self.lastFailFingerprint = None + + def _send_join_messages(self) -> None: + self.send(RemoteMessageType.protocol_version, version=self.protocolVersion) + self.send(RemoteMessageType.join, channel=self.channel, connection_type=self.connectionType) + + def _read_available(self) -> None: + if self._socket is None: + return + with self._socketLock: + self._socket.setblocking(False) + try: + data = self._buffer + self._socket.recv(16384) + except ssl.SSLWantReadError: + return + finally: + self._socket.setblocking(True) + self._buffer = b"" + if not data: + self._disconnect() + return + while b"\n" in data: + line, _, data = data.partition(b"\n") + self._parse_line(line) + self._buffer = data + + def _parse_line(self, line: bytes) -> None: + try: + message = self.serializer.deserialize(line) + messageType = RemoteMessageType(message.pop("type")) + except Exception: + logger.exception("Ignoring invalid remote message: %r", line) + return + action = self._handlers.get(messageType) + if action is None: + logger.debug("No handler registered for remote message type: %s", messageType.value) + return + GLib.idle_add(self._notify_action, action, message) + + def _notify_action(self, action: Action, payload: dict[str, Any]) -> bool: + action.notify(**payload) + return False + + def _send_loop(self) -> None: + while True: + item = self._sendQueue.get() + if item is None: + return + try: + with self._socketLock: + if self._socket is not None: + self._socket.sendall(item) + except OSError: + return + + def _disconnect(self) -> None: + if self._sendThread is not None: + self._sendQueue.put(None) + self._sendThread.join(timeout=2) + self._sendThread = None + self._clear_send_queue() + if self._socket is not None: + try: + self._socket.close() + except OSError: + pass + self._socket = None + self._buffer = b"" + + def _clear_send_queue(self) -> None: + try: + while True: + self._sendQueue.get_nowait() + except queue.Empty: + pass diff --git a/src/cthulhu/plugins/meson.build b/src/cthulhu/plugins/meson.build index 4654d79..d4f7682 100644 --- a/src/cthulhu/plugins/meson.build +++ b/src/cthulhu/plugins/meson.build @@ -2,6 +2,7 @@ subdir('AIAssistant') subdir('ByeCthulhu') subdir('Clipboard') +subdir('CthulhuRemote') subdir('DisplayVersion') subdir('HelloCthulhu') subdir('GameMode') diff --git a/tests/test_cthulhu_remote_plugin.py b/tests/test_cthulhu_remote_plugin.py new file mode 100644 index 0000000..d037e7a --- /dev/null +++ b/tests/test_cthulhu_remote_plugin.py @@ -0,0 +1,66 @@ +import sys +import types +import unittest +from pathlib import Path +from unittest import mock + +sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src")) + +input_event_manager_stub = types.ModuleType("cthulhu.input_event_manager") +input_event_manager_stub.get_manager = mock.Mock(return_value=mock.Mock()) +sys.modules["cthulhu.input_event_manager"] = input_event_manager_stub + +from cthulhu.plugin_system_manager import PluginSystemManager +from cthulhu.plugins.CthulhuRemote.connection_info import ConnectionInfo, ConnectionMode +from cthulhu.plugins.CthulhuRemote.local_machine import LocalMachine +from cthulhu.plugins.CthulhuRemote.protocol import RemoteMessageType +from cthulhu.plugins.CthulhuRemote.serializer import JSONSerializer + + +class CthulhuRemotePluginTests(unittest.TestCase): + def test_connection_info_accepts_nvda_remote_style_fields(self): + info = ConnectionInfo.from_url("cthulhuremote://example.com:1234?key=abc&mode=slave") + + self.assertEqual(info.hostname, "example.com") + self.assertEqual(info.port, 1234) + self.assertEqual(info.key, "abc") + self.assertEqual(info.mode, ConnectionMode.SLAVE) + self.assertEqual( + info.get_url_to_connect(), + "cthulhuremote://example.com:1234?key=abc&mode=master", + ) + + def test_json_serializer_uses_remote_message_type_values(self): + serializer = JSONSerializer() + + payload = serializer.deserialize(serializer.serialize(RemoteMessageType.join, channel="abc")) + + self.assertEqual(payload["type"], "join") + self.assertEqual(payload["channel"], "abc") + + def test_local_machine_maps_common_windows_vk_codes_to_keyvals(self): + machine = LocalMachine(lambda message: None) + + self.assertEqual(machine._resolve_keyval(0x41, None), machine._resolve_keyval(None, "a")) + self.assertEqual(machine._resolve_keyval(0x70, None), machine._resolve_keyval(None, "F1")) + self.assertIsNone(machine._resolve_keyval(0xFF, None)) + + @mock.patch("cthulhu.plugin_system_manager.dbus_service.get_remote_controller") + def test_plugin_manager_can_load_cthulhu_remote(self, remote_controller): + remote_controller.return_value = mock.Mock() + app = mock.Mock() + app.getSignalManager.return_value = mock.Mock() + app.getAPIHelper.return_value = None + manager = PluginSystemManager(app) + manager.rescanPlugins() + plugin_info = manager._resolve_plugin_info("CthulhuRemote") + + self.assertIsNotNone(plugin_info) + self.assertTrue(manager.loadPlugin(plugin_info)) + + self.assertEqual(plugin_info.instance.__class__.__name__, "CthulhuRemote") + self.assertTrue(manager.unloadPlugin(plugin_info)) + + +if __name__ == "__main__": + unittest.main() From cb553d3031694f82a96b9b88b0b6a4065622a7ad Mon Sep 17 00:00:00 2001 From: Storm Dragon Date: Thu, 21 May 2026 00:32:00 -0400 Subject: [PATCH 7/8] Improve Cthulhu Remote speech relay --- src/cthulhu/plugins/CthulhuRemote/README.md | 206 ++++++++++++++++++ .../plugins/CthulhuRemote/local_machine.py | 3 +- src/cthulhu/plugins/CthulhuRemote/plugin.py | 28 +++ src/cthulhu/speech.py | 42 +++- tests/test_cthulhu_remote_plugin.py | 63 ++++++ 5 files changed, 336 insertions(+), 6 deletions(-) create mode 100644 src/cthulhu/plugins/CthulhuRemote/README.md diff --git a/src/cthulhu/plugins/CthulhuRemote/README.md b/src/cthulhu/plugins/CthulhuRemote/README.md new file mode 100644 index 0000000..994accb --- /dev/null +++ b/src/cthulhu/plugins/CthulhuRemote/README.md @@ -0,0 +1,206 @@ +# Cthulhu Remote + +Cthulhu Remote is an NVDA Remote-style assistive technology relay plugin. It is +intended for remote help between screen reader users: one user can control +another user's desktop through keyboard input and receive the remote screen +reader's speech feedback. + +This is not graphical screen sharing. It does not forward a framebuffer, window +image, VNC/RDP session, or screenshots. The useful channel is screen reader +output and remote input. + +## Current Status + +### Working + +- Plugin installs and loads as `CthulhuRemote`. +- The plugin is registered as a D-Bus module when loaded. +- The transport connects outbound to NVDA Remote-compatible relay servers over + TLS. +- The default relay port is `6837`. +- Protocol version `2` is used. +- `cthulhuremote://` URLs are parsed. +- `nvdaremote://` URLs are accepted and rewritten to `cthulhuremote://`. +- `master` and `slave` connection modes are represented. +- Random connection keys can be generated. +- Invite URLs can be copied for the opposite role. +- The local clipboard can be pushed to connected peers. +- Incoming remote speech is spoken through Cthulhu. +- Incoming remote speech is suppressed from speech monitor callbacks to avoid + echo loops. +- In `slave` mode, local Cthulhu speech is forwarded to the relay as `speak` + messages. +- In `slave` mode, incoming remote key messages are mapped from common Windows + virtual-key codes or keysyms to AT-SPI key events. +- Disconnect and mute gestures are registered: + - `cthulhu+alt+page_down`: disconnect + - `cthulhu+alt+delete`: mute or unmute incoming remote output + - `cthulhu+control+shift+c`: push clipboard text + +### Partially Implemented + +- NVDA Remote compatibility is intentional at the relay-message level, but + cross-client interoperability with NVDA has not been verified. +- Remote key injection exists on the controlled/slave side, but master-side key + capture and forwarding is not implemented yet. +- Braille message types exist, but braille routing is not implemented. +- TLS certificate verification is enabled by default, and an insecure mode + exists, but there is no trust-on-first-use certificate workflow. +- Connection state is exposed, but there is no complete user-facing connection + dialog. + +### Not Implemented + +- Graphical screen sharing. +- Screenshot forwarding. +- VNC/RDP integration. +- Master-side keyboard forwarding. +- Remote braille display output. +- Remote braille input routing. +- Tone, wave, and other non-speech remote audio events. +- Ping/keepalive handling. +- Relay error presentation beyond basic logging/messages. +- Preferences UI for host, port, key, mode, certificate trust, and invite + management. + +## Intended User Model + +The typical assistive remote-support flow is: + +1. The person needing help runs Cthulhu and connects as `slave`. +2. The helper runs Cthulhu and connects as `master`. +3. The helper sends keyboard commands through the relay. +4. The controlled desktop receives those keys locally. +5. The controlled machine's Cthulhu speech is sent back to the helper. + +Examples this is meant to support: + +- Teaching someone how to use a program. +- Helping with a stuck dialog or inaccessible workflow. +- Typing into the remote user's editor or terminal. +- Navigating menus and controls with speech feedback. + +Things it does not solve by itself: + +- Visual captchas that require seeing an image. +- Visual inspection of a remote screen. +- Mouse-driven visual troubleshooting without another channel. + +## Implementation Checklist + +### Core Protocol + +- [x] Define protocol constants and message types. +- [x] Serialize and parse newline-delimited JSON messages. +- [x] Connect to relay servers over TLS. +- [x] Send `protocol_version` and `join` messages after connecting. +- [x] Dispatch inbound messages on the GLib main loop. +- [ ] Add ping/keepalive handling. +- [ ] Present relay `error` and `nvda_not_connected` messages clearly. +- [ ] Verify exact payload compatibility with current NVDA Remote clients. + +### Connection Management + +- [x] Parse `cthulhuremote://` URLs. +- [x] Accept `nvdaremote://` URLs. +- [x] Support host, port, key, mode, and insecure TLS fields. +- [x] Track connection states. +- [x] Expose D-Bus commands for connect, disconnect, state, key generation, and + invite URL copying. +- [ ] Add an accessible GTK connection dialog. +- [ ] Add saved/recent relay configuration if desired. +- [ ] Add trust-on-first-use certificate handling or a clear certificate trust + workflow. +- [ ] Add better reconnect/backoff status reporting. + +### Speech + +- [x] Speak incoming remote `speak` messages locally. +- [x] Support muting incoming remote speech. +- [x] Prevent inbound remote speech from being echoed back to the relay. +- [x] Forward local speech to the relay in `slave` mode. +- [ ] Decide whether master mode should ever forward local speech. +- [ ] Preserve richer speech sequence details if needed instead of flattening to + plain text. +- [ ] Verify behavior with speech interruption and cancellation across two live + clients. + +### Keyboard Control + +- [x] Receive remote key messages in `slave` mode. +- [x] Map common Windows virtual-key codes to Linux keysyms. +- [x] Inject remote key presses/releases with AT-SPI. +- [ ] Capture local keyboard events in `master` mode. +- [ ] Forward master key events as remote `key` messages. +- [ ] Prevent forwarded keys from also acting on the master's local desktop, + unless intentionally passed through. +- [ ] Preserve modifier press/release ordering. +- [ ] Verify Xorg behavior. +- [ ] Verify Wayland behavior without weakening Xorg support. +- [ ] Add tests for key payload generation and modifier handling. + +### Clipboard + +- [x] Push local clipboard text to connected peers. +- [x] Receive remote clipboard text into the local clipboard. +- [ ] Add a command or dialog control for pull/request clipboard if protocol + support is available. +- [ ] Decide how to handle large clipboard payloads. +- [ ] Add tests for empty, plain text, and multiline clipboard text. + +### Braille + +- [ ] Route incoming remote `display` messages to Cthulhu braille output. +- [ ] Implement `set_display_size`. +- [ ] Implement `set_braille_info` if required for compatibility. +- [ ] Route local braille input as remote `braille_input` messages when acting + as master. +- [ ] Verify routing keys and cursor-routing behavior. +- [ ] Add tests around braille payload parsing and ignored/unsupported fields. + +### Audio And Miscellaneous Messages + +- [ ] Implement or intentionally ignore `tone`. +- [ ] Implement or intentionally ignore `wave`. +- [ ] Decide whether `index` is relevant to Cthulhu speech. +- [ ] Handle `send_SAS` with a clear Linux-specific message. +- [ ] Log unsupported message types at debug level without spamming users. + +### User Interface + +- [x] Register basic gestures for disconnect, mute, and clipboard push. +- [ ] Add connect/disconnect controls to plugin preferences. +- [ ] Add host, port, key, mode, and insecure TLS fields. +- [ ] Add generate-key and copy-invite controls. +- [ ] Add connection status text suitable for screen reader users. +- [ ] Ensure Tab and Shift+Tab navigate the entire dialog. +- [ ] Associate GTK labels with their controls. + +### Tests And Verification + +- [x] Test URL parsing. +- [x] Test serializer message type values. +- [x] Test common key mapping. +- [x] Test plugin loading through the plugin manager. +- [x] Test additive speech monitor callbacks. +- [x] Test remote speech echo suppression. +- [x] Test slave-mode local speech forwarding. +- [x] Test that master mode does not forward local speech. +- [ ] Add transport tests with a fake relay socket. +- [ ] Add connection-state transition tests. +- [ ] Add D-Bus introspection or command exposure tests for the plugin module. +- [ ] Test against a live NVDA Remote-compatible relay. +- [ ] Test Cthulhu-to-Cthulhu master/slave operation. +- [ ] Test NVDA master to Cthulhu slave. +- [ ] Test Cthulhu master to NVDA slave after master key forwarding exists. + +## Suggested Next Steps + +1. Add an accessible connection dialog so the plugin can be used without manual + D-Bus calls. +2. Implement ping/error handling to improve relay behavior and diagnostics. +3. Design master-side key forwarding carefully around Cthulhu's existing input + event manager. +4. Add fake-relay tests before broad live testing. +5. Perform live two-client testing and record exact compatibility gaps. + diff --git a/src/cthulhu/plugins/CthulhuRemote/local_machine.py b/src/cthulhu/plugins/CthulhuRemote/local_machine.py index 39a48d3..eeb5021 100644 --- a/src/cthulhu/plugins/CthulhuRemote/local_machine.py +++ b/src/cthulhu/plugins/CthulhuRemote/local_machine.py @@ -77,7 +77,8 @@ class LocalMachine: return text = self._speech_sequence_to_text(sequence) if text: - speech.speak(text, interrupt=False) + with speech.suppress_monitor_callbacks(): + speech.speak(text, interrupt=False) def cancel_speech(self, **kwargs: Any) -> None: if not self.isMuted: diff --git a/src/cthulhu/plugins/CthulhuRemote/plugin.py b/src/cthulhu/plugins/CthulhuRemote/plugin.py index 8061345..26fb7b8 100644 --- a/src/cthulhu/plugins/CthulhuRemote/plugin.py +++ b/src/cthulhu/plugins/CthulhuRemote/plugin.py @@ -22,6 +22,7 @@ gi.require_version("Gtk", "3.0") from gi.repository import Gdk, Gtk from cthulhu import dbus_service +from cthulhu import speech from cthulhu.plugin import Plugin, cthulhu_hookimpl from cthulhu.plugins.CthulhuRemote.connection_info import ( @@ -49,6 +50,7 @@ class CthulhuRemote(Plugin): self._connectionState = ConnectionState.DISCONNECTED self._localMachine = LocalMachine(self._present_message) self._muted = False + self._speechMonitorRegistered = False @cthulhu_hookimpl def activate(self, plugin=None): @@ -145,6 +147,7 @@ class CthulhuRemote(Plugin): self._connectionState = ConnectionState.DISCONNECTING self._transport.close() self._transport = None + self._deregister_speech_monitor() self._connectionState = ConnectionState.DISCONNECTED if notify_user: self._present_message("Cthulhu Remote disconnected") @@ -220,11 +223,36 @@ class CthulhuRemote(Plugin): self._transport = transport self._connectionInfo = connectionInfo self._connectionState = ConnectionState.CONNECTING + self._register_speech_monitor(connectionInfo.mode) transport.start() if notify_user: self._present_message("Cthulhu Remote connecting") return True + def _register_speech_monitor(self, mode: ConnectionMode) -> None: + if mode != ConnectionMode.SLAVE or self._speechMonitorRegistered: + return + + speech.add_monitor_callback(self._send_local_speech) + self._speechMonitorRegistered = True + + def _deregister_speech_monitor(self) -> None: + if not self._speechMonitorRegistered: + return + + speech.remove_monitor_callback(self._send_local_speech) + self._speechMonitorRegistered = False + + def _send_local_speech(self, text: str) -> None: + if not text or not text.strip(): + return + if not self._connectionInfo or self._connectionInfo.mode != ConnectionMode.SLAVE: + return + if not self._transport or not self._transport.connected: + return + + self._transport.send(RemoteMessageType.speak, sequence=[text]) + def _register_transport_handlers(self, transport: RelayTransport, mode: ConnectionMode) -> None: transport.register_inbound(RemoteMessageType.speak, self._localMachine.speak) transport.register_inbound(RemoteMessageType.cancel, self._localMachine.cancel_speech) diff --git a/src/cthulhu/speech.py b/src/cthulhu/speech.py index 3c02a7b..60c3e7c 100644 --- a/src/cthulhu/speech.py +++ b/src/cthulhu/speech.py @@ -36,6 +36,7 @@ __license__ = "LGPL" import importlib import time +from contextlib import contextmanager from typing import TYPE_CHECKING, Optional, List, Dict, Any, Union, Callable from . import debug @@ -76,6 +77,8 @@ _timestamp: float = 0.0 # Optional callback for live monitoring of spoken text. _monitorWriteTextCallback: Optional[Callable[[str], None]] = None +_monitorWriteTextListeners: List[Callable[[str], None]] = [] +_monitorSuppressionDepth = 0 def _isSpeechDispatcherFactory(moduleName: Optional[str]) -> bool: if not moduleName: @@ -325,15 +328,44 @@ def set_monitor_callbacks(writeText: Optional[Callable[[str], None]] = None) -> global _monitorWriteTextCallback _monitorWriteTextCallback = writeText +def add_monitor_callback(writeText: Callable[[str], None]) -> None: + """Adds a runtime callback for live speech monitoring.""" + if writeText not in _monitorWriteTextListeners: + _monitorWriteTextListeners.append(writeText) + +def remove_monitor_callback(writeText: Callable[[str], None]) -> None: + """Removes a runtime callback for live speech monitoring.""" + if writeText in _monitorWriteTextListeners: + _monitorWriteTextListeners.remove(writeText) + +@contextmanager +def suppress_monitor_callbacks(): + """Temporarily suppresses live speech monitoring callbacks.""" + global _monitorSuppressionDepth + _monitorSuppressionDepth += 1 + try: + yield + finally: + _monitorSuppressionDepth = max(0, _monitorSuppressionDepth - 1) + def _write_to_monitor(text: str) -> None: """Writes text to the active speech monitor callback if set.""" - if _monitorWriteTextCallback is None: + if _monitorSuppressionDepth: return - try: - _monitorWriteTextCallback(text) - except Exception: - debug.printException(debug.LEVEL_INFO) + callbacks = [] + if _monitorWriteTextCallback is not None: + callbacks.append(_monitorWriteTextCallback) + callbacks.extend( + callback for callback in _monitorWriteTextListeners + if callback != _monitorWriteTextCallback + ) + + for callback in callbacks: + try: + callback(text) + except Exception: + debug.printException(debug.LEVEL_INFO) def __resolveACSS(acss: Optional[Any] = None) -> ACSS: if isinstance(acss, ACSS): diff --git a/tests/test_cthulhu_remote_plugin.py b/tests/test_cthulhu_remote_plugin.py index d037e7a..6e01677 100644 --- a/tests/test_cthulhu_remote_plugin.py +++ b/tests/test_cthulhu_remote_plugin.py @@ -11,8 +11,11 @@ input_event_manager_stub.get_manager = mock.Mock(return_value=mock.Mock()) sys.modules["cthulhu.input_event_manager"] = input_event_manager_stub from cthulhu.plugin_system_manager import PluginSystemManager +from cthulhu import cthulhu_state +from cthulhu import speech from cthulhu.plugins.CthulhuRemote.connection_info import ConnectionInfo, ConnectionMode from cthulhu.plugins.CthulhuRemote.local_machine import LocalMachine +from cthulhu.plugins.CthulhuRemote.plugin import CthulhuRemote from cthulhu.plugins.CthulhuRemote.protocol import RemoteMessageType from cthulhu.plugins.CthulhuRemote.serializer import JSONSerializer @@ -45,6 +48,66 @@ class CthulhuRemotePluginTests(unittest.TestCase): self.assertEqual(machine._resolve_keyval(0x70, None), machine._resolve_keyval(None, "F1")) self.assertIsNone(machine._resolve_keyval(0xFF, None)) + def test_speech_monitor_callbacks_are_additive(self): + primary = mock.Mock() + listener = mock.Mock() + speech.set_monitor_callbacks(writeText=primary) + speech.add_monitor_callback(listener) + self.addCleanup(speech.set_monitor_callbacks, None) + self.addCleanup(speech.remove_monitor_callback, listener) + + speech._write_to_monitor("status") + + primary.assert_called_once_with("status") + listener.assert_called_once_with("status") + + def test_remote_speech_does_not_echo_to_monitor_callbacks(self): + listener = mock.Mock() + speech.add_monitor_callback(listener) + self.addCleanup(speech.remove_monitor_callback, listener) + + with ( + mock.patch.object(speech, "_speechserver", None), + mock.patch.object(speech.speech_history, "add"), + mock.patch.object(cthulhu_state, "activeScript", None), + ): + LocalMachine(lambda message: None).speak(["remote", "status"]) + + listener.assert_not_called() + + def test_slave_mode_forwards_local_speech_to_relay(self): + plugin = CthulhuRemote() + plugin._connectionInfo = ConnectionInfo( + hostname="example.com", + port=1234, + key="abc", + mode=ConnectionMode.SLAVE, + ) + plugin._transport = mock.Mock() + plugin._transport.connected = True + + plugin._send_local_speech("focused button") + + plugin._transport.send.assert_called_once_with( + RemoteMessageType.speak, + sequence=["focused button"], + ) + + def test_master_mode_does_not_forward_local_speech_to_relay(self): + plugin = CthulhuRemote() + plugin._connectionInfo = ConnectionInfo( + hostname="example.com", + port=1234, + key="abc", + mode=ConnectionMode.MASTER, + ) + plugin._transport = mock.Mock() + plugin._transport.connected = True + + plugin._send_local_speech("local status") + + plugin._transport.send.assert_not_called() + @mock.patch("cthulhu.plugin_system_manager.dbus_service.get_remote_controller") def test_plugin_manager_can_load_cthulhu_remote(self, remote_controller): remote_controller.return_value = mock.Mock() From edc195c46b6d6733df317aa951704941ee57e054 Mon Sep 17 00:00:00 2001 From: Storm Dragon Date: Sun, 24 May 2026 03:47:15 -0400 Subject: [PATCH 8/8] Some web updates. --- src/cthulhu/scripts/web/script.py | 18 ++- src/cthulhu/scripts/web/script_utilities.py | 3 +- tests/test_web_input_regressions.py | 118 ++++++++++++++++++++ 3 files changed, 136 insertions(+), 3 deletions(-) diff --git a/src/cthulhu/scripts/web/script.py b/src/cthulhu/scripts/web/script.py index 1dde9f4..5232aff 100644 --- a/src/cthulhu/scripts/web/script.py +++ b/src/cthulhu/scripts/web/script.py @@ -2276,7 +2276,10 @@ class Script(default.Script): tokens = ["WEB: Dumping cache and context: source is focus", cthulhu_state.locusOfFocus] debug.printTokens(debug.LEVEL_INFO, tokens, True) - self.utilities.dumpCache(document, preserveContext=False) + self.utilities.dumpCache( + document, + preserveContext=not AXObject.is_dead(event.source), + ) elif AXObject.is_dead(cthulhu_state.locusOfFocus): msg = "WEB: Dumping cache: dead focus" debug.printMessage(debug.LEVEL_INFO, msg, True) @@ -2389,7 +2392,10 @@ class Script(default.Script): tokens = ["WEB: Dumping cache and context: source is focus", cthulhu_state.locusOfFocus] debug.printTokens(debug.LEVEL_INFO, tokens, True) - self.utilities.dumpCache(document, preserveContext=False) + self.utilities.dumpCache( + document, + preserveContext=not AXObject.is_dead(event.source), + ) elif AXObject.is_dead(cthulhu_state.locusOfFocus): msg = "WEB: Dumping cache: dead focus" debug.printMessage(debug.LEVEL_INFO, msg, True) @@ -2594,6 +2600,14 @@ class Script(default.Script): else: msg = "WEB: Search for caret context failed" debug.printMessage(debug.LEVEL_INFO, msg, True) + if AXUtilities.is_focusable(event.source) \ + and AXUtilities.is_focused(event.source) \ + and self.utilities.inDocumentContent(event.source): + msg = "WEB: Falling back to focused event source as caret context" + debug.printMessage(debug.LEVEL_INFO, msg, True) + cthulhu.setLocusOfFocus(event, event.source, False) + self.utilities.setCaretContext(event.source, 0) + obj, offset = event.source, 0 if self._lastCommandWasCaretNav: msg = "WEB: Event ignored: Last command was caret nav" diff --git a/src/cthulhu/scripts/web/script_utilities.py b/src/cthulhu/scripts/web/script_utilities.py index f9949df..05012b6 100644 --- a/src/cthulhu/scripts/web/script_utilities.py +++ b/src/cthulhu/scripts/web/script_utilities.py @@ -5050,7 +5050,8 @@ class Utilities(script_utilities.Utilities): debug.printMessage(debug.LEVEL_INFO, msg, True) return False - if not AXObject.is_dead(cthulhu_state.locusOfFocus): + if not AXObject.is_dead(cthulhu_state.locusOfFocus) \ + and not self.isSameObject(replicant, cthulhu_state.locusOfFocus, True, True): tokens = ["WEB: Not event from context replicant. locusOfFocus", cthulhu_state.locusOfFocus, "is not dead."] debug.printTokens(debug.LEVEL_INFO, tokens, True) diff --git a/tests/test_web_input_regressions.py b/tests/test_web_input_regressions.py index 00bbf84..9fa762b 100644 --- a/tests/test_web_input_regressions.py +++ b/tests/test_web_input_regressions.py @@ -314,6 +314,124 @@ class WebDescriptionChangeRegressionTests(unittest.TestCase): testScript.presentMessage.assert_called_once_with("Helpful tooltip") +class WebDynamicContentRecoveryRegressionTests(unittest.TestCase): + def _make_dynamic_script(self): + testScript = web_script.Script.__new__(web_script.Script) + testScript._loadingDocumentContent = False + testScript._lastCommandWasCaretNav = False + testScript._lastCommandWasStructNav = False + testScript._lastMouseButtonContext = (None, -1) + testScript.lastMouseRoutingTime = 0 + testScript.utilities = mock.Mock() + testScript.utilities.eventIsBrowserUINoise.return_value = False + testScript.utilities.isLiveRegion.return_value = False + testScript.utilities.getTopLevelDocumentForObject.return_value = "document" + testScript.utilities.isZombie.return_value = False + testScript.utilities.handleEventFromContextReplicant.return_value = False + testScript.utilities.handleEventForRemovedChild.return_value = False + testScript.utilities.inDocumentContent.return_value = True + return testScript + + def test_children_added_to_live_focus_preserves_caret_context(self): + testScript = self._make_dynamic_script() + focus = object() + event = mock.Mock(source=focus, any_data=object()) + + with ( + mock.patch.object(web_script.cthulhu_state, "locusOfFocus", focus), + mock.patch.object(web_script.AXObject, "is_dead", return_value=False), + mock.patch.object(web_script.AXUtilities, "is_busy", return_value=False), + mock.patch.object(web_script.AXUtilities, "is_alert", return_value=False), + ): + result = web_script.Script.onChildrenAdded(testScript, event) + + self.assertFalse(result) + testScript.utilities.dumpCache.assert_called_once_with( + "document", + preserveContext=True, + ) + + def test_children_removed_from_live_focus_preserves_caret_context(self): + testScript = self._make_dynamic_script() + focus = object() + event = mock.Mock(source=focus, any_data=object()) + + with ( + mock.patch.object(web_script.cthulhu_state, "locusOfFocus", focus), + mock.patch.object(web_script.AXObject, "is_dead", return_value=False), + ): + result = web_script.Script.onChildrenRemoved(testScript, event) + + self.assertFalse(result) + testScript.utilities.dumpCache.assert_called_once_with( + "document", + preserveContext=True, + ) + + def test_focused_event_source_becomes_context_when_search_fails(self): + testScript = self._make_dynamic_script() + source = object() + oldFocus = object() + event = mock.Mock(detail1=1, source=source) + testScript._lastCommandWasCaretNav = True + testScript.utilities.getDocumentForObject.return_value = "document" + testScript.utilities.isWebAppDescendant.return_value = False + testScript.utilities.handleEventFromContextReplicant.return_value = False + testScript.utilities.getCaretContext.return_value = (None, -1) + testScript.utilities.searchForCaretContext.return_value = (None, -1) + testScript.utilities.inFindContainer.return_value = False + + with ( + mock.patch.object(web_script.cthulhu_state, "locusOfFocus", oldFocus), + mock.patch.object(web_script.AXUtilities, "is_editable", return_value=False), + mock.patch.object(web_script.AXUtilities, "is_dialog_or_alert", return_value=False), + mock.patch.object(web_script.AXUtilities, "is_focusable", return_value=True), + mock.patch.object(web_script.AXUtilities, "is_focused", return_value=True), + mock.patch.object(web_script.cthulhu, "setLocusOfFocus") as setLocusOfFocus, + ): + result = web_script.Script.onFocusedChanged(testScript, event) + + self.assertTrue(result) + setLocusOfFocus.assert_called_once_with(event, source, False) + testScript.utilities.setCaretContext.assert_called_once_with(source, 0) + + +class WebContextReplicantRegressionTests(unittest.TestCase): + def test_same_object_replicant_can_recover_before_old_focus_is_dead(self): + utilities = web_script_utilities.Utilities.__new__(web_script_utilities.Utilities) + document = object() + parent = object() + oldFocus = object() + replicant = object() + event = mock.Mock() + utilities._caretContexts = {hash(parent): (oldFocus, 0)} + utilities.getCaretContextPathRoleAndName = mock.Mock( + return_value=([1, 2, 3], "button", "storm, microphone on") + ) + utilities.documentFrame = mock.Mock(return_value=document) + utilities.isSameObject = mock.Mock(return_value=True) + utilities.setCaretContext = mock.Mock() + + with ( + mock.patch.object(web_script_utilities.cthulhu_state, "locusOfFocus", oldFocus), + mock.patch.object(web_script_utilities.AXObject, "is_dead", return_value=False), + mock.patch.object(web_script_utilities.AXObject, "get_path", return_value=[1, 2, 3]), + mock.patch.object(web_script_utilities.AXObject, "get_role", return_value="button"), + mock.patch.object(web_script_utilities.AXObject, "get_name", return_value="storm, microphone on"), + mock.patch.object(web_script_utilities.AXObject, "get_parent", return_value=parent), + mock.patch.object(web_script_utilities.cthulhu, "setLocusOfFocus") as setLocusOfFocus, + ): + result = web_script_utilities.Utilities.handleEventFromContextReplicant( + utilities, + event, + replicant, + ) + + self.assertTrue(result) + setLocusOfFocus.assert_called_once_with(event, replicant, False) + utilities.setCaretContext.assert_called_once_with(replicant, 0, document) + + if __name__ == "__main__": unittest.main()