import sys import types import unittest from pathlib import Path from unittest import mock sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src")) from cthulhu import cthulhu_state from cthulhu import compositor_state_types stubCthulhu = types.ModuleType("cthulhu.cthulhu") stubCthulhu.cthulhuApp = mock.Mock() sys.modules.setdefault("cthulhu.cthulhu", stubCthulhu) from cthulhu import event_manager class FakeEvent: def __init__(self, event_type, source="same", detail1=0, detail2=0, any_data=None): self.type = event_type self.source = source self.detail1 = detail1 self.detail2 = detail2 self.any_data = any_data class EventManagerCompositorContextRegressionTests(unittest.TestCase): def setUp(self) -> None: self.originalPauseAtspiChurn = cthulhu_state.pauseAtspiChurn self.originalPrioritizedDesktopContextToken = cthulhu_state.prioritizedDesktopContextToken self.originalCompositorSnapshot = cthulhu_state.compositorSnapshot self.originalActiveScript = cthulhu_state.activeScript self.addCleanup(self._restore_cthulhu_state) self.listener = mock.Mock() self.listenerPatch = mock.patch.object( event_manager.Atspi.EventListener, "new", return_value=self.listener, ) self.listenerPatch.start() self.addCleanup(self.listenerPatch.stop) self.manager = event_manager.EventManager(mock.Mock(), asyncMode=False) self.manager._active = True self.manager._context_token_for_event = mock.Mock(side_effect=lambda event: event.source) cthulhu_state.pauseAtspiChurn = False cthulhu_state.prioritizedDesktopContextToken = None cthulhu_state.compositorSnapshot = None cthulhu_state.activeScript = None def _restore_cthulhu_state(self) -> None: cthulhu_state.pauseAtspiChurn = self.originalPauseAtspiChurn cthulhu_state.prioritizedDesktopContextToken = self.originalPrioritizedDesktopContextToken cthulhu_state.compositorSnapshot = self.originalCompositorSnapshot cthulhu_state.activeScript = self.originalActiveScript def test_set_compositor_state_adapter_registers_compositor_listener(self) -> None: adapter = mock.Mock() self.manager.set_compositor_state_adapter(adapter) adapter.add_listener.assert_called_once_with(self.manager._handle_compositor_signal) def test_pause_signal_updates_churn_state_and_resume_clears_it(self) -> None: snapshot = compositor_state_types.DesktopContextSnapshot(session_type="wayland") self.manager._handle_compositor_signal( compositor_state_types.CompositorStateEvent( compositor_state_types.PAUSE_ATSPI_CHURN, reason="workspace-transition", snapshot=snapshot, payload={"context_token": "current"}, ) ) self.assertTrue(cthulhu_state.pauseAtspiChurn) self.assertTrue(self.manager._churnSuppressed) self.manager._handle_compositor_signal( compositor_state_types.CompositorStateEvent( compositor_state_types.RESUME_ATSPI_CHURN, reason="workspace-transition", snapshot=snapshot, payload={"context_token": "current"}, ) ) self.assertFalse(cthulhu_state.pauseAtspiChurn) self.assertFalse(self.manager._churnSuppressed) def test_stale_context_event_is_obsolete_while_churn_is_paused(self) -> None: self.manager._churnSuppressed = True self.manager._prioritizedContextToken = "current" event = FakeEvent("object:children-changed:add", source="stale") self.assertTrue(self.manager._is_obsolete_by_context(event)) def test_flush_signal_removes_stale_events_from_queue(self) -> None: self.manager._churnSuppressed = False self.manager._prioritizedContextToken = "current" staleEvent = FakeEvent("object:children-changed:add", source="stale") currentEvent = FakeEvent("object:children-changed:add", source="current") self.manager._eventQueue.put(staleEvent) self.manager._eventQueue.put(currentEvent) self.manager._handle_compositor_signal( compositor_state_types.CompositorStateEvent( compositor_state_types.FLUSH_STALE_ATSPI_EVENTS, reason="resume", snapshot=compositor_state_types.DesktopContextSnapshot(session_type="wayland"), payload={"context_token": "current"}, ) ) self.assertEqual(list(self.manager._eventQueue.queue), [currentEvent]) def test_stale_background_event_does_not_activate_script_during_suppression(self) -> None: script = mock.Mock() script.isActivatableEvent.return_value = True script.forceScriptActivation.return_value = False self.manager._churnSuppressed = True self.manager._prioritizedContextToken = "current" event = FakeEvent("object:state-changed:showing", source="old") result, reason = self.manager._isActivatableEvent(event, script) self.assertFalse(result) self.assertIn("compositor-prioritized context", reason) def test_focus_event_syncs_accessible_context_back_into_adapter(self) -> None: adapter = mock.Mock() script = mock.Mock() source = object() event = FakeEvent("object:state-changed:focused", source=source, detail1=1) self.manager._compositorStateAdapter = adapter self.manager._get_scriptForEvent = mock.Mock(return_value=script) self.manager._isActivatableEvent = mock.Mock(return_value=(False, "already active")) self.manager._inFlood = mock.Mock(return_value=False) with ( mock.patch.object(event_manager.debug, "printObjectEvent"), mock.patch.object(event_manager.debug, "printDetails"), mock.patch.object(event_manager.debug, "printMessage"), mock.patch.object(event_manager.debug, "printTokens"), mock.patch.object(event_manager.AXUtilities, "get_desktop", return_value=object()), mock.patch.object(event_manager.AXUtilities, "is_defunct", return_value=False), mock.patch.object(event_manager.AXUtilities, "is_iconified", return_value=False), mock.patch.object(event_manager.AXUtilities, "is_frame", return_value=False), mock.patch.object(event_manager.AXObject, "is_dead", return_value=False), ): self.manager._processObjectEvent(event) adapter.sync_accessible_context.assert_called_once_with("object:state-changed:focused") def test_steam_children_changed_burst_is_suppressed_before_flood_threshold(self) -> None: app = object() cthulhu_state.activeScript = mock.Mock(app=app) firstEvent = FakeEvent("object:children-changed:add", source="steam-context", any_data=object()) secondEvent = FakeEvent("object:children-changed:add", source="steam-context", any_data=object()) with ( mock.patch.object(event_manager.time, "monotonic", side_effect=[100.0, 100.05]), mock.patch.object(event_manager.debug, "printMessage"), mock.patch.object(event_manager.debug, "printTokens"), mock.patch.object(event_manager.debug, "print_log"), mock.patch.object(event_manager.AXObject, "get_application", return_value=app), mock.patch.object(event_manager.AXObject, "get_name", return_value="steamwebhelper"), mock.patch.object(event_manager.AXObject, "get_role", return_value=mock.Mock()), mock.patch.object(event_manager.AXObject, "is_dead", return_value=False), mock.patch.object(event_manager.AXUtilities, "has_no_state", return_value=False), mock.patch.object(event_manager.AXUtilities, "is_defunct", return_value=False), mock.patch.object(event_manager.AXUtilities, "manages_descendants", return_value=False), mock.patch.object(event_manager.AXUtilities, "is_image", return_value=False), mock.patch.object(event_manager.AXUtilities, "is_menu_item", return_value=False), ): self.manager._isSteamApp = mock.Mock(return_value=True) self.manager._isSteamNotificationEvent = mock.Mock(return_value=False) self.assertFalse(self.manager._ignore(firstEvent)) self.assertTrue(self.manager._ignore(secondEvent)) def test_steam_focus_lost_burst_is_ignored_but_focus_gain_is_preserved(self) -> None: app = object() cthulhu_state.activeScript = mock.Mock(app=app) focusLost = FakeEvent("object:state-changed:focused", source="steam-context", detail1=0) focusGained = FakeEvent("object:state-changed:focused", source="steam-context", detail1=1) with ( mock.patch.object(event_manager.debug, "printMessage"), mock.patch.object(event_manager.debug, "printTokens"), mock.patch.object(event_manager.debug, "print_log"), mock.patch.object(event_manager.AXObject, "get_application", return_value=app), mock.patch.object(event_manager.AXObject, "get_name", return_value="steamwebhelper"), mock.patch.object(event_manager.AXObject, "get_role", return_value=mock.Mock()), mock.patch.object(event_manager.AXUtilities, "has_no_state", return_value=False), mock.patch.object(event_manager.AXUtilities, "is_defunct", return_value=False), ): self.manager._isSteamApp = mock.Mock(return_value=True) self.manager._isSteamNotificationEvent = mock.Mock(return_value=False) self.assertTrue(self.manager._ignore(focusLost)) self.assertFalse(self.manager._ignore(focusGained)) if __name__ == "__main__": unittest.main()