"""Regression tests for the compositor state adapter and its workspace backends."""

import importlib
import sys
import types
import unittest
from pathlib import Path
from unittest import mock

# Make the in-tree ``src`` directory importable before the project imports below.
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))

from cthulhu import cthulhu_state
from cthulhu import cthulhu
from cthulhu import compositor_state_adapter
from cthulhu import compositor_state_types
from cthulhu import compositor_state_wayland
from cthulhu.wayland_protocols import ext_workspace_v1


class FakeWorkspaceBackend:
    """Minimal workspace backend double that records activate/deactivate calls.

    ``available`` is the fixed value returned by :meth:`is_available`; ``name``
    is exposed as the backend name. Each call to :meth:`activate` or
    :meth:`deactivate` appends the ``emit_signal`` argument it received to
    ``activate_calls`` / ``deactivate_calls`` respectively.
    """

    def __init__(self, available: bool, name: str) -> None:
        self.available = available
        self.name = name
        self.activate_calls = []
        self.deactivate_calls = []

    def is_available(self, session_type: str | None = None) -> bool:
        """Return the configured availability, ignoring ``session_type``."""
        return self.available

    def activate(self, emit_signal=None) -> None:
        """Record the ``emit_signal`` passed on activation."""
        self.activate_calls.append(emit_signal)

    def deactivate(self, emit_signal=None) -> None:
        """Record the ``emit_signal`` passed on deactivation."""
        self.deactivate_calls.append(emit_signal)


class CompositorStateAdapterRegressionTests(unittest.TestCase):
    """Regression coverage for backend selection, Wayland wiring and event ordering."""

    @staticmethod
    def _reset_shared_state() -> None:
        """Put the ``cthulhu_state`` globals these tests touch back to their baseline."""
        cthulhu_state.compositorSnapshot = None
        cthulhu_state.pauseAtspiChurn = False
        cthulhu_state.prioritizedDesktopContextToken = None
        cthulhu_state.activeWindow = None
        cthulhu_state.locusOfFocus = None

    def setUp(self) -> None:
        """Reset shared state before the test and schedule the same reset afterwards."""
        self._reset_shared_state()
        # Several tests store sentinels/mocks in module-level state; without a
        # cleanup the last test's values would leak into whatever runs next in
        # the same process.
        self.addCleanup(self._reset_shared_state)

    def test_activate_selects_first_available_backend(self) -> None:
        """An unavailable backend is skipped; the first available one is activated."""
        unavailableBackend = FakeWorkspaceBackend(False, "unavailable")
        selectedBackend = FakeWorkspaceBackend(True, "selected")
        adapter = compositor_state_adapter.CompositorStateAdapter(
            workspace_backends=[unavailableBackend, selectedBackend]
        )

        adapter.activate()

        self.assertEqual(unavailableBackend.activate_calls, [])
        self.assertEqual(len(selectedBackend.activate_calls), 1)
        # The adapter is expected to hand the backend a callable to emit signals.
        self.assertTrue(callable(selectedBackend.activate_calls[0]))
        self.assertEqual(adapter.get_snapshot().backend_name, "selected")

    def test_default_workspace_backends_include_wayland_then_null_backend(self) -> None:
        """The default backend list is the Wayland backend followed by the null backend."""
        adapter = compositor_state_adapter.CompositorStateAdapter()

        backend_types = [type(backend) for backend in adapter._workspaceBackends]

        self.assertEqual(
            backend_types,
            [
                compositor_state_wayland.WaylandSharedProtocolsBackend,
                compositor_state_wayland.NullWorkspaceBackend,
            ],
        )

    def test_wayland_backend_is_unavailable_without_wayland_session(self) -> None:
        """With an empty process environment the backend reports unavailable."""
        backend = compositor_state_wayland.WaylandSharedProtocolsBackend()

        # Clear the environment as seen through compositor_state_wayland.os.
        with mock.patch.dict(compositor_state_wayland.os.environ, {}, clear=True):
            self.assertFalse(backend.is_available("x11"))
            self.assertFalse(backend.is_available("wayland"))

    def test_wayland_backend_installs_dispatch_watch_and_processes_runtime_events(self) -> None:
        """Activation registers a GLib IO watch; deactivation removes that source."""
        fakeDisplay = mock.Mock()
        fakeRegistry = mock.Mock()
        fakeDisplay.connect = mock.Mock()
        fakeDisplay.get_registry = mock.Mock(return_value=fakeRegistry)
        fakeDisplay.get_fd = mock.Mock(return_value=17)
        fakeDisplay.roundtrip = mock.Mock()
        # Two dispatch results: 1 then 0. The test asserts dispatch is called
        # exactly twice (presumably the callback drains until 0 -- confirm
        # against the backend implementation).
        fakeDisplay.dispatch = mock.Mock(side_effect=[1, 0])

        fakeProtocols = mock.Mock()
        fakeProtocols.INTERFACE_NAME = "ext_workspace_manager_v1"
        fakeProtocols.INTERFACE_VERSION = 1
        fakeProtocols.ACTIVE_STATE_VALUE = 1
        fakeProtocols.has_runtime_support.return_value = True
        fakeProtocols.get_display_class.return_value = mock.Mock(return_value=fakeDisplay)

        emitSignal = mock.Mock()
        backend = compositor_state_wayland.WaylandSharedProtocolsBackend(
            environment={"WAYLAND_DISPLAY": "wayland-0"},
            protocols=fakeProtocols,
        )

        with (
            mock.patch.object(compositor_state_wayland, "get_session_type", return_value="wayland"),
            mock.patch.object(compositor_state_wayland.GLib, "io_add_watch", return_value=41) as ioAddWatch,
            mock.patch.object(compositor_state_wayland.GLib, "source_remove") as sourceRemove,
        ):
            backend.activate(emitSignal)

            ioAddWatch.assert_called_once()
            # The watch callback is the fourth positional argument to io_add_watch.
            watchCallback = ioAddWatch.call_args.args[3]
            self.assertTrue(watchCallback(17, compositor_state_wayland.GLib.IO_IN))
            self.assertEqual(fakeDisplay.dispatch.call_count, 2)

            # Must run while source_remove is still patched so the mock (not
            # the real GLib) receives the id returned by io_add_watch.
            backend.deactivate()
            sourceRemove.assert_called_once_with(41)

    def test_wayland_backend_logs_activation_failure_reason(self) -> None:
        """A failing display connect is logged at warning level and leaves no display."""
        fakeDisplay = mock.Mock()
        fakeDisplay.connect.side_effect = ValueError("Unable to connect to display")
        fakeProtocols = mock.Mock()
        fakeProtocols.has_runtime_support.return_value = True
        fakeProtocols.get_display_class.return_value = mock.Mock(return_value=fakeDisplay)
        backend = compositor_state_wayland.WaylandSharedProtocolsBackend(
            environment={"WAYLAND_DISPLAY": "wayland-0"},
            protocols=fakeProtocols,
        )

        with (
            mock.patch.object(compositor_state_wayland, "get_session_type", return_value="wayland"),
            mock.patch.object(compositor_state_wayland.debug, "printMessage") as printMessage,
        ):
            backend.activate(mock.Mock())

        printMessage.assert_any_call(
            compositor_state_wayland.debug.LEVEL_WARNING,
            mock.ANY,
            True,
        )
        # The last logged message must carry the original exception text.
        self.assertIn("Unable to connect to display", printMessage.call_args_list[-1].args[1])
        self.assertIsNone(backend._display)

    def test_local_ext_workspace_wrapper_supports_base_pywayland_without_distro_protocol_module(self) -> None:
        """The local wrapper works when only the base ``pywayland.client`` modules import."""
        fakeClientModule = types.ModuleType("pywayland.client")

        class FakeDisplay:
            pass

        fakeClientModule.Display = FakeDisplay

        def fake_import(moduleName: str, package=None):
            # Only the base client modules resolve; everything else fails to import.
            if moduleName in ("pywayland.client", "pywayland.client.display"):
                return fakeClientModule
            raise ImportError(moduleName)

        try:
            with mock.patch("importlib.import_module", side_effect=fake_import):
                importlib.reload(ext_workspace_v1)

                self.assertTrue(ext_workspace_v1.has_runtime_support())
                self.assertIs(ext_workspace_v1.get_display_class(), FakeDisplay)
                self.assertIsNotNone(ext_workspace_v1.ExtWorkspaceManagerV1)
                self.assertIsNotNone(ext_workspace_v1.ExtWorkspaceHandleV1)
                self.assertEqual(
                    ext_workspace_v1.ExtWorkspaceManagerV1.interface_name,
                    "ext_workspace_manager_v1",
                )
                self.assertEqual(
                    ext_workspace_v1.ExtWorkspaceHandleV1.interface_name,
                    "ext_workspace_handle_v1",
                )
        finally:
            # Reload again without the patch so later tests see the real module state.
            importlib.reload(ext_workspace_v1)

    def test_wayland_backend_is_selected_when_available(self) -> None:
        """With two available backends, only the first in the list is activated."""
        adapter = compositor_state_adapter.CompositorStateAdapter()
        selected_backend = mock.Mock(name="selected-backend")
        fallback_backend = mock.Mock(name="fallback-backend")
        # ``name=`` in the Mock constructor only names the mock itself; the
        # backend's ``name`` attribute has to be assigned separately.
        selected_backend.name = "wayland-shared-protocols"
        selected_backend.is_available.return_value = True
        fallback_backend.name = "null"
        fallback_backend.is_available.return_value = True
        adapter._workspaceBackends = [selected_backend, fallback_backend]

        adapter.activate()

        selected_backend.activate.assert_called_once()
        fallback_backend.activate.assert_not_called()
        self.assertTrue(callable(selected_backend.activate.call_args.args[0]))
        self.assertEqual(adapter.get_snapshot().backend_name, "wayland-shared-protocols")

    def test_activate_is_idempotent_and_deactivates_previous_backend(self) -> None:
        """A second ``activate()`` deactivates the backend once and activates it again."""
        backend = FakeWorkspaceBackend(True, "selected")
        adapter = compositor_state_adapter.CompositorStateAdapter(workspace_backends=[backend])

        adapter.activate()
        adapter.activate()

        self.assertEqual(len(backend.activate_calls), 2)
        self.assertTrue(all(callable(call) for call in backend.activate_calls))
        self.assertEqual(len(backend.deactivate_calls), 1)
        self.assertEqual(adapter.get_snapshot().backend_name, "selected")

    def test_sync_accessible_context_emits_focus_context_changed_when_active_window_token_changes(self) -> None:
        """Syncing after the active window changes emits focus-context and prioritize events."""
        adapter = compositor_state_adapter.CompositorStateAdapter()
        events = []
        adapter.add_listener(events.append)
        firstWindow = object()
        secondWindow = object()

        def get_process_id(obj):
            # Different pids for the two windows; the name stays the same.
            return 111 if obj is firstWindow else 222

        def get_name(obj):
            return "Terminal"

        with (
            mock.patch.object(compositor_state_adapter.AXObject, "get_process_id", side_effect=get_process_id),
            mock.patch.object(compositor_state_adapter.AXObject, "get_name", side_effect=get_name),
        ):
            cthulhu_state.activeWindow = firstWindow
            cthulhu_state.locusOfFocus = firstWindow
            adapter.sync_accessible_context("startup")
            # Discard whatever the initial sync emitted; only the transition matters.
            events.clear()

            cthulhu_state.activeWindow = secondWindow
            cthulhu_state.locusOfFocus = secondWindow
            adapter.sync_accessible_context("workspace transition")

        self.assertIn(
            compositor_state_types.DESKTOP_FOCUS_CONTEXT_CHANGED,
            [event.type for event in events],
        )
        self.assertIn(
            compositor_state_types.PRIORITIZE_FOCUS,
            [event.type for event in events],
        )

    def test_sync_accessible_context_builds_stable_active_window_tokens(self) -> None:
        """The active window token is ``"<pid>:<name>"`` and is mirrored into ``cthulhu_state``."""
        adapter = compositor_state_adapter.CompositorStateAdapter()
        window = object()

        with (
            mock.patch.object(compositor_state_adapter.AXObject, "get_process_id", return_value=4242),
            mock.patch.object(compositor_state_adapter.AXObject, "get_name", return_value="Terminal"),
        ):
            cthulhu_state.activeWindow = window
            cthulhu_state.locusOfFocus = window
            snapshot = adapter.sync_accessible_context("startup")

        self.assertEqual(snapshot.active_window_token, "4242:Terminal")
        self.assertEqual(cthulhu_state.compositorSnapshot.active_window_token, "4242:Terminal")

    def test_workspace_backend_normalizes_initial_done_and_handoff(self) -> None:
        """Each workspace ``done`` yields the same six-event sequence with updated ids."""
        adapter = compositor_state_adapter.CompositorStateAdapter(workspace_backends=[])
        events = []
        adapter.add_listener(events.append)
        workspace = compositor_state_wayland.WaylandSharedProtocolsBackend()
        first_workspace = mock.Mock()
        second_workspace = mock.Mock()
        # The first workspace never receives an id, and the test expects it to
        # be identified by this ``id()``-based token instead.
        first_workspace_token = f"workspace:{id(first_workspace)}"

        adapter._snapshot = compositor_state_types.DesktopContextSnapshot(
            session_type="wayland",
            backend_name="wayland-shared-protocols",
        )

        # Activate with availability forced off; protocol events are then fed
        # in by calling the backend's handlers directly.
        with mock.patch.object(workspace, "is_available", return_value=False):
            workspace.activate(adapter._handle_workspace_signal)

        workspace._handle_workspace_state(first_workspace, "active", True)
        workspace._handle_workspace_manager_done()

        self.assertEqual(
            [event.type for event in events],
            [
                compositor_state_types.DESKTOP_TRANSITION_STARTED,
                compositor_state_types.PAUSE_ATSPI_CHURN,
                compositor_state_types.WORKSPACE_STATE_CHANGED,
                compositor_state_types.DESKTOP_TRANSITION_FINISHED,
                compositor_state_types.RESUME_ATSPI_CHURN,
                compositor_state_types.FLUSH_STALE_ATSPI_EVENTS,
            ],
        )
        self.assertEqual(events[0].snapshot.active_workspace_ids, frozenset({first_workspace_token}))
        self.assertTrue(events[0].snapshot.workspace_transition_pending)
        self.assertEqual(events[2].snapshot.active_workspace_ids, frozenset({first_workspace_token}))
        self.assertFalse(adapter.get_snapshot().workspace_transition_pending)
        self.assertEqual(adapter.get_snapshot().active_workspace_ids, frozenset({first_workspace_token}))
        self.assertEqual(cthulhu_state.compositorSnapshot.active_workspace_ids, frozenset({first_workspace_token}))

        events.clear()

        # Hand-off: the second workspace gets an explicit id, the first goes
        # inactive and the second becomes active before the next ``done``.
        workspace._handle_workspace_id(second_workspace, "ws-2")
        workspace._handle_workspace_state(first_workspace, "active", False)
        workspace._handle_workspace_state(second_workspace, "active", True)
        workspace._handle_workspace_manager_done()

        event_types = [event.type for event in events]
        self.assertEqual(
            event_types,
            [
                compositor_state_types.DESKTOP_TRANSITION_STARTED,
                compositor_state_types.PAUSE_ATSPI_CHURN,
                compositor_state_types.WORKSPACE_STATE_CHANGED,
                compositor_state_types.DESKTOP_TRANSITION_FINISHED,
                compositor_state_types.RESUME_ATSPI_CHURN,
                compositor_state_types.FLUSH_STALE_ATSPI_EVENTS,
            ],
        )
        self.assertTrue(events[0].snapshot.workspace_transition_pending)
        # At transition start the snapshot carries no active workspace ids.
        self.assertEqual(events[0].snapshot.active_workspace_ids, frozenset())
        self.assertEqual(events[2].snapshot.active_workspace_ids, frozenset({"ws-2"}))
        self.assertEqual(adapter.get_snapshot().active_workspace_ids, frozenset({"ws-2"}))
        self.assertFalse(adapter.get_snapshot().workspace_transition_pending)
        self.assertEqual(cthulhu_state.compositorSnapshot.active_workspace_ids, frozenset({"ws-2"}))
        self.assertFalse(cthulhu_state.pauseAtspiChurn)

    def test_event_manager_startup_resyncs_adapter_after_focus_recovery(self) -> None:
        """Startup focus sync sets window and focus, then resyncs the adapter once."""
        adapter = mock.Mock()
        adapter.sync_accessible_context = mock.Mock(return_value=None)
        listener = mock.Mock()
        window = object()
        focusedObject = object()

        with (
            mock.patch.object(cthulhu.event_manager.Atspi.EventListener, "new", return_value=listener),
            mock.patch.object(cthulhu.event_manager.AXUtilities, "can_be_active_window", return_value=False),
            mock.patch.object(cthulhu.event_manager.AXUtilities, "find_active_window", return_value=window),
            mock.patch.object(cthulhu.event_manager.AXUtilities, "get_focused_object", return_value=focusedObject),
            mock.patch.object(cthulhu.event_manager.cthulhu, "setActiveWindow") as setActiveWindow,
            mock.patch.object(cthulhu.event_manager.cthulhu, "setLocusOfFocus") as setLocusOfFocus,
        ):
            manager = cthulhu.event_manager.EventManager(mock.Mock())
            manager.set_compositor_state_adapter(adapter)
            manager._sync_focus_on_startup()

        setActiveWindow.assert_called_once_with(window, alsoSetLocusOfFocus=True, notifyScript=False)
        setLocusOfFocus.assert_called_once_with(None, focusedObject, notifyScript=True, force=True)
        adapter.sync_accessible_context.assert_called_once_with("event-manager-startup")


if __name__ == "__main__":
    unittest.main()