210 lines
9.7 KiB
Python
210 lines
9.7 KiB
Python
import sys
|
|
import types
|
|
import unittest
|
|
from pathlib import Path
|
|
from unittest import mock
|
|
|
|
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
|
|
|
|
from cthulhu import cthulhu_state
|
|
from cthulhu import compositor_state_types
|
|
|
|
stubCthulhu = types.ModuleType("cthulhu.cthulhu")
|
|
stubCthulhu.cthulhuApp = mock.Mock()
|
|
sys.modules.setdefault("cthulhu.cthulhu", stubCthulhu)
|
|
|
|
from cthulhu import event_manager
|
|
|
|
|
|
class FakeEvent:
|
|
def __init__(self, event_type, source="same", detail1=0, detail2=0, any_data=None):
|
|
self.type = event_type
|
|
self.source = source
|
|
self.detail1 = detail1
|
|
self.detail2 = detail2
|
|
self.any_data = any_data
|
|
|
|
|
|
class EventManagerCompositorContextRegressionTests(unittest.TestCase):
|
|
def setUp(self) -> None:
|
|
self.originalPauseAtspiChurn = cthulhu_state.pauseAtspiChurn
|
|
self.originalPrioritizedDesktopContextToken = cthulhu_state.prioritizedDesktopContextToken
|
|
self.originalCompositorSnapshot = cthulhu_state.compositorSnapshot
|
|
self.originalActiveScript = cthulhu_state.activeScript
|
|
self.addCleanup(self._restore_cthulhu_state)
|
|
|
|
self.listener = mock.Mock()
|
|
self.listenerPatch = mock.patch.object(
|
|
event_manager.Atspi.EventListener,
|
|
"new",
|
|
return_value=self.listener,
|
|
)
|
|
self.listenerPatch.start()
|
|
self.addCleanup(self.listenerPatch.stop)
|
|
|
|
self.manager = event_manager.EventManager(mock.Mock(), asyncMode=False)
|
|
self.manager._active = True
|
|
self.manager._context_token_for_event = mock.Mock(side_effect=lambda event: event.source)
|
|
cthulhu_state.pauseAtspiChurn = False
|
|
cthulhu_state.prioritizedDesktopContextToken = None
|
|
cthulhu_state.compositorSnapshot = None
|
|
cthulhu_state.activeScript = None
|
|
|
|
def _restore_cthulhu_state(self) -> None:
|
|
cthulhu_state.pauseAtspiChurn = self.originalPauseAtspiChurn
|
|
cthulhu_state.prioritizedDesktopContextToken = self.originalPrioritizedDesktopContextToken
|
|
cthulhu_state.compositorSnapshot = self.originalCompositorSnapshot
|
|
cthulhu_state.activeScript = self.originalActiveScript
|
|
|
|
def test_set_compositor_state_adapter_registers_compositor_listener(self) -> None:
|
|
adapter = mock.Mock()
|
|
|
|
self.manager.set_compositor_state_adapter(adapter)
|
|
|
|
adapter.add_listener.assert_called_once_with(self.manager._handle_compositor_signal)
|
|
|
|
def test_pause_signal_updates_churn_state_and_resume_clears_it(self) -> None:
|
|
snapshot = compositor_state_types.DesktopContextSnapshot(session_type="wayland")
|
|
|
|
self.manager._handle_compositor_signal(
|
|
compositor_state_types.CompositorStateEvent(
|
|
compositor_state_types.PAUSE_ATSPI_CHURN,
|
|
reason="workspace-transition",
|
|
snapshot=snapshot,
|
|
payload={"context_token": "current"},
|
|
)
|
|
)
|
|
|
|
self.assertTrue(cthulhu_state.pauseAtspiChurn)
|
|
self.assertTrue(self.manager._churnSuppressed)
|
|
|
|
self.manager._handle_compositor_signal(
|
|
compositor_state_types.CompositorStateEvent(
|
|
compositor_state_types.RESUME_ATSPI_CHURN,
|
|
reason="workspace-transition",
|
|
snapshot=snapshot,
|
|
payload={"context_token": "current"},
|
|
)
|
|
)
|
|
|
|
self.assertFalse(cthulhu_state.pauseAtspiChurn)
|
|
self.assertFalse(self.manager._churnSuppressed)
|
|
|
|
def test_stale_context_event_is_obsolete_while_churn_is_paused(self) -> None:
|
|
self.manager._churnSuppressed = True
|
|
self.manager._prioritizedContextToken = "current"
|
|
event = FakeEvent("object:children-changed:add", source="stale")
|
|
|
|
self.assertTrue(self.manager._is_obsolete_by_context(event))
|
|
|
|
def test_flush_signal_removes_stale_events_from_queue(self) -> None:
|
|
self.manager._churnSuppressed = False
|
|
self.manager._prioritizedContextToken = "current"
|
|
staleEvent = FakeEvent("object:children-changed:add", source="stale")
|
|
currentEvent = FakeEvent("object:children-changed:add", source="current")
|
|
self.manager._eventQueue.put(staleEvent)
|
|
self.manager._eventQueue.put(currentEvent)
|
|
|
|
self.manager._handle_compositor_signal(
|
|
compositor_state_types.CompositorStateEvent(
|
|
compositor_state_types.FLUSH_STALE_ATSPI_EVENTS,
|
|
reason="resume",
|
|
snapshot=compositor_state_types.DesktopContextSnapshot(session_type="wayland"),
|
|
payload={"context_token": "current"},
|
|
)
|
|
)
|
|
|
|
self.assertEqual(list(self.manager._eventQueue.queue), [currentEvent])
|
|
|
|
def test_stale_background_event_does_not_activate_script_during_suppression(self) -> None:
|
|
script = mock.Mock()
|
|
script.isActivatableEvent.return_value = True
|
|
script.forceScriptActivation.return_value = False
|
|
self.manager._churnSuppressed = True
|
|
self.manager._prioritizedContextToken = "current"
|
|
|
|
event = FakeEvent("object:state-changed:showing", source="old")
|
|
|
|
result, reason = self.manager._isActivatableEvent(event, script)
|
|
|
|
self.assertFalse(result)
|
|
self.assertIn("compositor-prioritized context", reason)
|
|
|
|
def test_focus_event_syncs_accessible_context_back_into_adapter(self) -> None:
|
|
adapter = mock.Mock()
|
|
script = mock.Mock()
|
|
source = object()
|
|
event = FakeEvent("object:state-changed:focused", source=source, detail1=1)
|
|
self.manager._compositorStateAdapter = adapter
|
|
self.manager._get_scriptForEvent = mock.Mock(return_value=script)
|
|
self.manager._isActivatableEvent = mock.Mock(return_value=(False, "already active"))
|
|
self.manager._inFlood = mock.Mock(return_value=False)
|
|
|
|
with (
|
|
mock.patch.object(event_manager.debug, "printObjectEvent"),
|
|
mock.patch.object(event_manager.debug, "printDetails"),
|
|
mock.patch.object(event_manager.debug, "printMessage"),
|
|
mock.patch.object(event_manager.debug, "printTokens"),
|
|
mock.patch.object(event_manager.AXUtilities, "get_desktop", return_value=object()),
|
|
mock.patch.object(event_manager.AXUtilities, "is_defunct", return_value=False),
|
|
mock.patch.object(event_manager.AXUtilities, "is_iconified", return_value=False),
|
|
mock.patch.object(event_manager.AXUtilities, "is_frame", return_value=False),
|
|
mock.patch.object(event_manager.AXObject, "is_dead", return_value=False),
|
|
):
|
|
self.manager._processObjectEvent(event)
|
|
|
|
adapter.sync_accessible_context.assert_called_once_with("object:state-changed:focused")
|
|
|
|
def test_steam_children_changed_burst_is_suppressed_before_flood_threshold(self) -> None:
|
|
app = object()
|
|
cthulhu_state.activeScript = mock.Mock(app=app)
|
|
firstEvent = FakeEvent("object:children-changed:add", source="steam-context", any_data=object())
|
|
secondEvent = FakeEvent("object:children-changed:add", source="steam-context", any_data=object())
|
|
|
|
with (
|
|
mock.patch.object(event_manager.time, "monotonic", side_effect=[100.0, 100.05]),
|
|
mock.patch.object(event_manager.debug, "printMessage"),
|
|
mock.patch.object(event_manager.debug, "printTokens"),
|
|
mock.patch.object(event_manager.debug, "print_log"),
|
|
mock.patch.object(event_manager.AXObject, "get_application", return_value=app),
|
|
mock.patch.object(event_manager.AXObject, "get_name", return_value="steamwebhelper"),
|
|
mock.patch.object(event_manager.AXObject, "get_role", return_value=mock.Mock()),
|
|
mock.patch.object(event_manager.AXObject, "is_dead", return_value=False),
|
|
mock.patch.object(event_manager.AXUtilities, "has_no_state", return_value=False),
|
|
mock.patch.object(event_manager.AXUtilities, "is_defunct", return_value=False),
|
|
mock.patch.object(event_manager.AXUtilities, "manages_descendants", return_value=False),
|
|
mock.patch.object(event_manager.AXUtilities, "is_image", return_value=False),
|
|
mock.patch.object(event_manager.AXUtilities, "is_menu_item", return_value=False),
|
|
):
|
|
self.manager._isSteamApp = mock.Mock(return_value=True)
|
|
self.manager._isSteamNotificationEvent = mock.Mock(return_value=False)
|
|
|
|
self.assertFalse(self.manager._ignore(firstEvent))
|
|
self.assertTrue(self.manager._ignore(secondEvent))
|
|
|
|
def test_steam_focus_lost_burst_is_ignored_but_focus_gain_is_preserved(self) -> None:
|
|
app = object()
|
|
cthulhu_state.activeScript = mock.Mock(app=app)
|
|
focusLost = FakeEvent("object:state-changed:focused", source="steam-context", detail1=0)
|
|
focusGained = FakeEvent("object:state-changed:focused", source="steam-context", detail1=1)
|
|
|
|
with (
|
|
mock.patch.object(event_manager.debug, "printMessage"),
|
|
mock.patch.object(event_manager.debug, "printTokens"),
|
|
mock.patch.object(event_manager.debug, "print_log"),
|
|
mock.patch.object(event_manager.AXObject, "get_application", return_value=app),
|
|
mock.patch.object(event_manager.AXObject, "get_name", return_value="steamwebhelper"),
|
|
mock.patch.object(event_manager.AXObject, "get_role", return_value=mock.Mock()),
|
|
mock.patch.object(event_manager.AXUtilities, "has_no_state", return_value=False),
|
|
mock.patch.object(event_manager.AXUtilities, "is_defunct", return_value=False),
|
|
):
|
|
self.manager._isSteamApp = mock.Mock(return_value=True)
|
|
self.manager._isSteamNotificationEvent = mock.Mock(return_value=False)
|
|
|
|
self.assertTrue(self.manager._ignore(focusLost))
|
|
self.assertFalse(self.manager._ignore(focusGained))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|