Files
cthulhu/tests/test_compositor_state_adapter_regressions.py

338 lines
15 KiB
Python

import importlib
import sys
import types
import unittest
from pathlib import Path
from unittest import mock
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
from cthulhu import cthulhu_state
from cthulhu import cthulhu
from cthulhu import compositor_state_adapter
from cthulhu import compositor_state_types
from cthulhu import compositor_state_wayland
from cthulhu.wayland_protocols import ext_workspace_v1
class FakeWorkspaceBackend:
def __init__(self, available: bool, name: str) -> None:
self.available = available
self.name = name
self.activate_calls = []
self.deactivate_calls = []
def is_available(self, session_type: str | None = None) -> bool:
return self.available
def activate(self, emit_signal=None) -> None:
self.activate_calls.append(emit_signal)
def deactivate(self, emit_signal=None) -> None:
self.deactivate_calls.append(emit_signal)
class CompositorStateAdapterRegressionTests(unittest.TestCase):
def setUp(self) -> None:
cthulhu_state.compositorSnapshot = None
cthulhu_state.pauseAtspiChurn = False
cthulhu_state.prioritizedDesktopContextToken = None
cthulhu_state.activeWindow = None
cthulhu_state.locusOfFocus = None
def test_activate_selects_first_available_backend(self) -> None:
unavailableBackend = FakeWorkspaceBackend(False, "unavailable")
selectedBackend = FakeWorkspaceBackend(True, "selected")
adapter = compositor_state_adapter.CompositorStateAdapter(
workspace_backends=[unavailableBackend, selectedBackend]
)
adapter.activate()
self.assertEqual(unavailableBackend.activate_calls, [])
self.assertEqual(len(selectedBackend.activate_calls), 1)
self.assertTrue(callable(selectedBackend.activate_calls[0]))
self.assertEqual(adapter.get_snapshot().backend_name, "selected")
def test_default_workspace_backends_include_wayland_then_null_backend(self) -> None:
adapter = compositor_state_adapter.CompositorStateAdapter()
backend_types = [type(backend) for backend in adapter._workspaceBackends]
self.assertEqual(
backend_types,
[
compositor_state_wayland.WaylandSharedProtocolsBackend,
compositor_state_wayland.NullWorkspaceBackend,
],
)
def test_wayland_backend_is_unavailable_without_wayland_session(self) -> None:
backend = compositor_state_wayland.WaylandSharedProtocolsBackend()
with mock.patch.dict(compositor_state_wayland.os.environ, {}, clear=True):
self.assertFalse(backend.is_available("x11"))
self.assertFalse(backend.is_available("wayland"))
def test_wayland_backend_installs_dispatch_watch_and_processes_runtime_events(self) -> None:
fakeDisplay = mock.Mock()
fakeRegistry = mock.Mock()
fakeDisplay.connect = mock.Mock()
fakeDisplay.get_registry = mock.Mock(return_value=fakeRegistry)
fakeDisplay.get_fd = mock.Mock(return_value=17)
fakeDisplay.roundtrip = mock.Mock()
fakeDisplay.dispatch = mock.Mock(side_effect=[1, 0])
fakeProtocols = mock.Mock()
fakeProtocols.INTERFACE_NAME = "ext_workspace_manager_v1"
fakeProtocols.INTERFACE_VERSION = 1
fakeProtocols.ACTIVE_STATE_VALUE = 1
fakeProtocols.has_runtime_support.return_value = True
fakeProtocols.get_display_class.return_value = mock.Mock(return_value=fakeDisplay)
emitSignal = mock.Mock()
backend = compositor_state_wayland.WaylandSharedProtocolsBackend(
environment={"WAYLAND_DISPLAY": "wayland-0"},
protocols=fakeProtocols,
)
with (
mock.patch.object(compositor_state_wayland, "get_session_type", return_value="wayland"),
mock.patch.object(compositor_state_wayland.GLib, "io_add_watch", return_value=41) as ioAddWatch,
mock.patch.object(compositor_state_wayland.GLib, "source_remove") as sourceRemove,
):
backend.activate(emitSignal)
ioAddWatch.assert_called_once()
watchCallback = ioAddWatch.call_args.args[3]
self.assertTrue(watchCallback(17, compositor_state_wayland.GLib.IO_IN))
self.assertEqual(fakeDisplay.dispatch.call_count, 2)
backend.deactivate()
sourceRemove.assert_called_once_with(41)
def test_wayland_backend_logs_activation_failure_reason(self) -> None:
fakeDisplay = mock.Mock()
fakeDisplay.connect.side_effect = ValueError("Unable to connect to display")
fakeProtocols = mock.Mock()
fakeProtocols.has_runtime_support.return_value = True
fakeProtocols.get_display_class.return_value = mock.Mock(return_value=fakeDisplay)
backend = compositor_state_wayland.WaylandSharedProtocolsBackend(
environment={"WAYLAND_DISPLAY": "wayland-0"},
protocols=fakeProtocols,
)
with (
mock.patch.object(compositor_state_wayland, "get_session_type", return_value="wayland"),
mock.patch.object(compositor_state_wayland.debug, "printMessage") as printMessage,
):
backend.activate(mock.Mock())
printMessage.assert_any_call(
compositor_state_wayland.debug.LEVEL_WARNING,
mock.ANY,
True,
)
self.assertIn("Unable to connect to display", printMessage.call_args_list[-1].args[1])
self.assertIsNone(backend._display)
def test_local_ext_workspace_wrapper_supports_base_pywayland_without_distro_protocol_module(self) -> None:
fakeClientModule = types.ModuleType("pywayland.client")
class FakeDisplay:
pass
fakeClientModule.Display = FakeDisplay
def fake_import(moduleName: str, package=None):
if moduleName in ("pywayland.client", "pywayland.client.display"):
return fakeClientModule
raise ImportError(moduleName)
try:
with mock.patch("importlib.import_module", side_effect=fake_import):
importlib.reload(ext_workspace_v1)
self.assertTrue(ext_workspace_v1.has_runtime_support())
self.assertIs(ext_workspace_v1.get_display_class(), FakeDisplay)
self.assertIsNotNone(ext_workspace_v1.ExtWorkspaceManagerV1)
self.assertIsNotNone(ext_workspace_v1.ExtWorkspaceHandleV1)
self.assertEqual(
ext_workspace_v1.ExtWorkspaceManagerV1.interface_name,
"ext_workspace_manager_v1",
)
self.assertEqual(
ext_workspace_v1.ExtWorkspaceHandleV1.interface_name,
"ext_workspace_handle_v1",
)
finally:
importlib.reload(ext_workspace_v1)
def test_wayland_backend_is_selected_when_available(self) -> None:
adapter = compositor_state_adapter.CompositorStateAdapter()
selected_backend = mock.Mock(name="selected-backend")
fallback_backend = mock.Mock(name="fallback-backend")
selected_backend.name = "wayland-shared-protocols"
selected_backend.is_available.return_value = True
fallback_backend.name = "null"
fallback_backend.is_available.return_value = True
adapter._workspaceBackends = [selected_backend, fallback_backend]
adapter.activate()
selected_backend.activate.assert_called_once()
fallback_backend.activate.assert_not_called()
self.assertTrue(callable(selected_backend.activate.call_args.args[0]))
self.assertEqual(adapter.get_snapshot().backend_name, "wayland-shared-protocols")
def test_activate_is_idempotent_and_deactivates_previous_backend(self) -> None:
backend = FakeWorkspaceBackend(True, "selected")
adapter = compositor_state_adapter.CompositorStateAdapter(workspace_backends=[backend])
adapter.activate()
adapter.activate()
self.assertEqual(len(backend.activate_calls), 2)
self.assertTrue(all(callable(call) for call in backend.activate_calls))
self.assertEqual(len(backend.deactivate_calls), 1)
self.assertEqual(adapter.get_snapshot().backend_name, "selected")
def test_sync_accessible_context_emits_focus_context_changed_when_active_window_token_changes(self) -> None:
adapter = compositor_state_adapter.CompositorStateAdapter()
events = []
adapter.add_listener(events.append)
firstWindow = object()
secondWindow = object()
def get_process_id(obj):
return 111 if obj is firstWindow else 222
def get_name(obj):
return "Terminal"
with (
mock.patch.object(compositor_state_adapter.AXObject, "get_process_id", side_effect=get_process_id),
mock.patch.object(compositor_state_adapter.AXObject, "get_name", side_effect=get_name),
):
cthulhu_state.activeWindow = firstWindow
cthulhu_state.locusOfFocus = firstWindow
adapter.sync_accessible_context("startup")
events.clear()
cthulhu_state.activeWindow = secondWindow
cthulhu_state.locusOfFocus = secondWindow
adapter.sync_accessible_context("workspace transition")
self.assertIn(
compositor_state_types.DESKTOP_FOCUS_CONTEXT_CHANGED,
[event.type for event in events],
)
self.assertIn(
compositor_state_types.PRIORITIZE_FOCUS,
[event.type for event in events],
)
def test_sync_accessible_context_builds_stable_active_window_tokens(self) -> None:
adapter = compositor_state_adapter.CompositorStateAdapter()
window = object()
with (
mock.patch.object(compositor_state_adapter.AXObject, "get_process_id", return_value=4242),
mock.patch.object(compositor_state_adapter.AXObject, "get_name", return_value="Terminal"),
):
cthulhu_state.activeWindow = window
cthulhu_state.locusOfFocus = window
snapshot = adapter.sync_accessible_context("startup")
self.assertEqual(snapshot.active_window_token, "4242:Terminal")
self.assertEqual(cthulhu_state.compositorSnapshot.active_window_token, "4242:Terminal")
def test_workspace_backend_normalizes_initial_done_and_handoff(self) -> None:
adapter = compositor_state_adapter.CompositorStateAdapter(workspace_backends=[])
events = []
adapter.add_listener(events.append)
workspace = compositor_state_wayland.WaylandSharedProtocolsBackend()
first_workspace = mock.Mock()
second_workspace = mock.Mock()
first_workspace_token = f"workspace:{id(first_workspace)}"
adapter._snapshot = compositor_state_types.DesktopContextSnapshot(
session_type="wayland",
backend_name="wayland-shared-protocols",
)
with mock.patch.object(workspace, "is_available", return_value=False):
workspace.activate(adapter._handle_workspace_signal)
workspace._handle_workspace_state(first_workspace, "active", True)
workspace._handle_workspace_manager_done()
self.assertEqual(
[event.type for event in events],
[
compositor_state_types.DESKTOP_TRANSITION_STARTED,
compositor_state_types.PAUSE_ATSPI_CHURN,
compositor_state_types.WORKSPACE_STATE_CHANGED,
compositor_state_types.DESKTOP_TRANSITION_FINISHED,
compositor_state_types.RESUME_ATSPI_CHURN,
compositor_state_types.FLUSH_STALE_ATSPI_EVENTS,
],
)
self.assertEqual(events[0].snapshot.active_workspace_ids, frozenset({first_workspace_token}))
self.assertTrue(events[0].snapshot.workspace_transition_pending)
self.assertEqual(events[2].snapshot.active_workspace_ids, frozenset({first_workspace_token}))
self.assertFalse(adapter.get_snapshot().workspace_transition_pending)
self.assertEqual(adapter.get_snapshot().active_workspace_ids, frozenset({first_workspace_token}))
self.assertEqual(cthulhu_state.compositorSnapshot.active_workspace_ids, frozenset({first_workspace_token}))
events.clear()
workspace._handle_workspace_id(second_workspace, "ws-2")
workspace._handle_workspace_state(first_workspace, "active", False)
workspace._handle_workspace_state(second_workspace, "active", True)
workspace._handle_workspace_manager_done()
event_types = [event.type for event in events]
self.assertEqual(
event_types,
[
compositor_state_types.DESKTOP_TRANSITION_STARTED,
compositor_state_types.PAUSE_ATSPI_CHURN,
compositor_state_types.WORKSPACE_STATE_CHANGED,
compositor_state_types.DESKTOP_TRANSITION_FINISHED,
compositor_state_types.RESUME_ATSPI_CHURN,
compositor_state_types.FLUSH_STALE_ATSPI_EVENTS,
],
)
self.assertTrue(events[0].snapshot.workspace_transition_pending)
self.assertEqual(events[0].snapshot.active_workspace_ids, frozenset())
self.assertEqual(events[2].snapshot.active_workspace_ids, frozenset({"ws-2"}))
self.assertEqual(adapter.get_snapshot().active_workspace_ids, frozenset({"ws-2"}))
self.assertFalse(adapter.get_snapshot().workspace_transition_pending)
self.assertEqual(cthulhu_state.compositorSnapshot.active_workspace_ids, frozenset({"ws-2"}))
self.assertFalse(cthulhu_state.pauseAtspiChurn)
def test_event_manager_startup_resyncs_adapter_after_focus_recovery(self) -> None:
adapter = mock.Mock()
adapter.sync_accessible_context = mock.Mock(return_value=None)
listener = mock.Mock()
window = object()
focusedObject = object()
with (
mock.patch.object(cthulhu.event_manager.Atspi.EventListener, "new", return_value=listener),
mock.patch.object(cthulhu.event_manager.AXUtilities, "can_be_active_window", return_value=False),
mock.patch.object(cthulhu.event_manager.AXUtilities, "find_active_window", return_value=window),
mock.patch.object(cthulhu.event_manager.AXUtilities, "get_focused_object", return_value=focusedObject),
mock.patch.object(cthulhu.event_manager.cthulhu, "setActiveWindow") as setActiveWindow,
mock.patch.object(cthulhu.event_manager.cthulhu, "setLocusOfFocus") as setLocusOfFocus,
):
manager = cthulhu.event_manager.EventManager(mock.Mock())
manager.set_compositor_state_adapter(adapter)
manager._sync_focus_on_startup()
setActiveWindow.assert_called_once_with(window, alsoSetLocusOfFocus=True, notifyScript=False)
setLocusOfFocus.assert_called_once_with(None, focusedObject, notifyScript=True, force=True)
adapter.sync_accessible_context.assert_called_once_with("event-manager-startup")
if __name__ == "__main__":
unittest.main()