import importlib import sys import types import unittest from pathlib import Path from unittest import mock sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src")) from cthulhu import cthulhu_state from cthulhu import cthulhu from cthulhu import compositor_state_adapter from cthulhu import compositor_state_types from cthulhu import compositor_state_wayland from cthulhu.wayland_protocols import ext_workspace_v1 class FakeWorkspaceBackend: def __init__(self, available: bool, name: str) -> None: self.available = available self.name = name self.activate_calls = [] self.deactivate_calls = [] def is_available(self, session_type: str | None = None) -> bool: return self.available def activate(self, adapter=None) -> None: self.activate_calls.append(adapter) def deactivate(self, adapter=None) -> None: self.deactivate_calls.append(adapter) class CompositorStateAdapterRegressionTests(unittest.TestCase): def setUp(self) -> None: cthulhu_state.compositorSnapshot = None cthulhu_state.pauseAtspiChurn = False cthulhu_state.prioritizedDesktopContextToken = None cthulhu_state.activeWindow = None cthulhu_state.locusOfFocus = None def test_activate_selects_first_available_backend(self) -> None: unavailableBackend = FakeWorkspaceBackend(False, "unavailable") selectedBackend = FakeWorkspaceBackend(True, "selected") adapter = compositor_state_adapter.CompositorStateAdapter( workspace_backends=[unavailableBackend, selectedBackend] ) adapter.activate() self.assertEqual(unavailableBackend.activate_calls, []) self.assertEqual(selectedBackend.activate_calls, [adapter]) self.assertEqual(adapter.get_snapshot().backend_name, "selected") def test_default_workspace_backends_include_wayland_then_null_backend(self) -> None: adapter = compositor_state_adapter.CompositorStateAdapter() backend_types = [type(backend) for backend in adapter._workspaceBackends] self.assertEqual( backend_types, [ compositor_state_wayland.WaylandSharedProtocolsBackend, compositor_state_wayland.NullWorkspaceBackend, ], ) def test_wayland_backend_is_unavailable_without_wayland_session(self) -> None: backend = compositor_state_wayland.WaylandSharedProtocolsBackend() with mock.patch.dict(compositor_state_wayland.os.environ, {}, clear=True): self.assertFalse(backend.is_available("x11")) self.assertFalse(backend.is_available("wayland")) def test_local_ext_workspace_wrapper_supports_base_pywayland_without_distro_protocol_module(self) -> None: fakeClientModule = types.ModuleType("pywayland.client") class FakeDisplay: pass fakeClientModule.Display = FakeDisplay def fake_import(moduleName: str, package=None): if moduleName in ("pywayland.client", "pywayland.client.display"): return fakeClientModule raise ImportError(moduleName) try: with mock.patch("importlib.import_module", side_effect=fake_import): importlib.reload(ext_workspace_v1) self.assertTrue(ext_workspace_v1.has_runtime_support()) self.assertIs(ext_workspace_v1.get_display_class(), FakeDisplay) self.assertIsNotNone(ext_workspace_v1.ExtWorkspaceManagerV1) self.assertIsNotNone(ext_workspace_v1.ExtWorkspaceHandleV1) self.assertEqual( ext_workspace_v1.ExtWorkspaceManagerV1.interface_name, "ext_workspace_manager_v1", ) self.assertEqual( ext_workspace_v1.ExtWorkspaceHandleV1.interface_name, "ext_workspace_handle_v1", ) finally: importlib.reload(ext_workspace_v1) def test_wayland_backend_is_selected_when_available(self) -> None: adapter = compositor_state_adapter.CompositorStateAdapter() selected_backend = mock.Mock(name="selected-backend") fallback_backend = mock.Mock(name="fallback-backend") selected_backend.name = "wayland-shared-protocols" selected_backend.is_available.return_value = True fallback_backend.name = "null" fallback_backend.is_available.return_value = True adapter._workspaceBackends = [selected_backend, fallback_backend] adapter.activate() selected_backend.activate.assert_called_once_with(adapter) fallback_backend.activate.assert_not_called() self.assertEqual(adapter.get_snapshot().backend_name, "wayland-shared-protocols") def test_activate_is_idempotent_and_deactivates_previous_backend(self) -> None: backend = FakeWorkspaceBackend(True, "selected") adapter = compositor_state_adapter.CompositorStateAdapter(workspace_backends=[backend]) adapter.activate() adapter.activate() self.assertEqual(backend.activate_calls, [adapter, adapter]) self.assertEqual(backend.deactivate_calls, [adapter]) self.assertEqual(adapter.get_snapshot().backend_name, "selected") def test_sync_accessible_context_emits_focus_context_changed_when_active_window_token_changes(self) -> None: adapter = compositor_state_adapter.CompositorStateAdapter() events = [] adapter.add_listener(events.append) firstWindow = object() secondWindow = object() def get_process_id(obj): return 111 if obj is firstWindow else 222 def get_name(obj): return "Terminal" with ( mock.patch.object(compositor_state_adapter.AXObject, "get_process_id", side_effect=get_process_id), mock.patch.object(compositor_state_adapter.AXObject, "get_name", side_effect=get_name), ): cthulhu_state.activeWindow = firstWindow cthulhu_state.locusOfFocus = firstWindow adapter.sync_accessible_context("startup") events.clear() cthulhu_state.activeWindow = secondWindow cthulhu_state.locusOfFocus = secondWindow adapter.sync_accessible_context("workspace transition") self.assertIn( compositor_state_types.DESKTOP_FOCUS_CONTEXT_CHANGED, [event.type for event in events], ) self.assertIn( compositor_state_types.PRIORITIZE_FOCUS, [event.type for event in events], ) def test_sync_accessible_context_builds_stable_active_window_tokens(self) -> None: adapter = compositor_state_adapter.CompositorStateAdapter() window = object() with ( mock.patch.object(compositor_state_adapter.AXObject, "get_process_id", return_value=4242), mock.patch.object(compositor_state_adapter.AXObject, "get_name", return_value="Terminal"), ): cthulhu_state.activeWindow = window cthulhu_state.locusOfFocus = window snapshot = adapter.sync_accessible_context("startup") self.assertEqual(snapshot.active_window_token, "4242:Terminal") self.assertEqual(cthulhu_state.compositorSnapshot.active_window_token, "4242:Terminal") def test_workspace_backend_normalizes_transition_completion(self) -> None: adapter = compositor_state_adapter.CompositorStateAdapter(workspace_backends=[]) events = [] adapter.add_listener(events.append) workspace = compositor_state_wayland.WaylandSharedProtocolsBackend() first_workspace = mock.Mock() second_workspace = mock.Mock() first_workspace_token = f"workspace:{id(first_workspace)}" second_workspace_token = f"workspace:{id(second_workspace)}" adapter._snapshot = compositor_state_types.DesktopContextSnapshot( session_type="wayland", backend_name="wayland-shared-protocols", ) workspace.activate(adapter) workspace._handle_workspace_name(first_workspace, "Workspace 1") workspace._handle_workspace_coordinates(first_workspace, (0, 0)) workspace._handle_workspace_state(first_workspace, "active", True) workspace._handle_workspace_manager_done() self.assertEqual(adapter.get_snapshot().active_workspace_token, first_workspace_token) self.assertEqual(adapter.get_snapshot().active_workspace_name, "Workspace 1") self.assertEqual(adapter.get_snapshot().active_workspace_coordinates, (0, 0)) self.assertFalse(adapter.get_snapshot().workspace_transition_pending) self.assertEqual(cthulhu_state.compositorSnapshot.active_workspace_token, first_workspace_token) events.clear() workspace._handle_workspace_name(second_workspace, "Workspace 2") workspace._handle_workspace_coordinates(second_workspace, (1, 0)) workspace._handle_workspace_state(first_workspace, "active", False) workspace._handle_workspace_state(second_workspace, "active", True) workspace._handle_workspace_manager_done() event_types = [event.type for event in events] self.assertEqual( event_types, [ compositor_state_types.DESKTOP_TRANSITION_STARTED, compositor_state_types.PAUSE_ATSPI_CHURN, compositor_state_types.WORKSPACE_STATE_CHANGED, compositor_state_types.DESKTOP_TRANSITION_FINISHED, compositor_state_types.RESUME_ATSPI_CHURN, compositor_state_types.FLUSH_STALE_ATSPI_EVENTS, ], ) self.assertTrue(events[0].snapshot.workspace_transition_pending) self.assertEqual(events[0].snapshot.active_workspace_token, first_workspace_token) self.assertFalse(events[2].payload["is_transition_pending"]) self.assertEqual(events[2].snapshot.active_workspace_token, second_workspace_token) self.assertEqual(events[2].snapshot.active_workspace_name, "Workspace 2") self.assertEqual(events[2].snapshot.active_workspace_coordinates, (1, 0)) self.assertIs(events[2].payload["workspace_handle"], second_workspace) self.assertEqual(adapter.get_snapshot().active_workspace_token, second_workspace_token) self.assertEqual(adapter.get_snapshot().active_workspace_name, "Workspace 2") self.assertEqual(adapter.get_snapshot().active_workspace_coordinates, (1, 0)) self.assertFalse(adapter.get_snapshot().workspace_transition_pending) self.assertEqual(cthulhu_state.compositorSnapshot.active_workspace_token, second_workspace_token) self.assertFalse(cthulhu_state.pauseAtspiChurn) def test_event_manager_startup_resyncs_adapter_after_focus_recovery(self) -> None: adapter = mock.Mock() adapter.sync_accessible_context = mock.Mock(return_value=None) listener = mock.Mock() window = object() focusedObject = object() with ( mock.patch.object(cthulhu.event_manager.Atspi.EventListener, "new", return_value=listener), mock.patch.object(cthulhu.event_manager.AXUtilities, "can_be_active_window", return_value=False), mock.patch.object(cthulhu.event_manager.AXUtilities, "find_active_window", return_value=window), mock.patch.object(cthulhu.event_manager.AXUtilities, "get_focused_object", return_value=focusedObject), mock.patch.object(cthulhu.event_manager.cthulhu, "setActiveWindow") as setActiveWindow, mock.patch.object(cthulhu.event_manager.cthulhu, "setLocusOfFocus") as setLocusOfFocus, ): manager = cthulhu.event_manager.EventManager(mock.Mock()) manager.set_compositor_state_adapter(adapter) manager._sync_focus_on_startup() setActiveWindow.assert_called_once_with(window, alsoSetLocusOfFocus=True, notifyScript=False) setLocusOfFocus.assert_called_once_with(None, focusedObject, notifyScript=True, force=True) adapter.sync_accessible_context.assert_called_once_with("event-manager-startup") if __name__ == "__main__": unittest.main()