338 lines
15 KiB
Python
338 lines
15 KiB
Python
import importlib
|
|
import sys
|
|
import types
|
|
import unittest
|
|
from pathlib import Path
|
|
from unittest import mock
|
|
|
|
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
|
|
|
|
from cthulhu import cthulhu_state
|
|
from cthulhu import cthulhu
|
|
from cthulhu import compositor_state_adapter
|
|
from cthulhu import compositor_state_types
|
|
from cthulhu import compositor_state_wayland
|
|
from cthulhu.wayland_protocols import ext_workspace_v1
|
|
|
|
|
|
class FakeWorkspaceBackend:
|
|
def __init__(self, available: bool, name: str) -> None:
|
|
self.available = available
|
|
self.name = name
|
|
self.activate_calls = []
|
|
self.deactivate_calls = []
|
|
|
|
def is_available(self, session_type: str | None = None) -> bool:
|
|
return self.available
|
|
|
|
def activate(self, emit_signal=None) -> None:
|
|
self.activate_calls.append(emit_signal)
|
|
|
|
def deactivate(self, emit_signal=None) -> None:
|
|
self.deactivate_calls.append(emit_signal)
|
|
|
|
|
|
class CompositorStateAdapterRegressionTests(unittest.TestCase):
|
|
def setUp(self) -> None:
|
|
cthulhu_state.compositorSnapshot = None
|
|
cthulhu_state.pauseAtspiChurn = False
|
|
cthulhu_state.prioritizedDesktopContextToken = None
|
|
cthulhu_state.activeWindow = None
|
|
cthulhu_state.locusOfFocus = None
|
|
|
|
def test_activate_selects_first_available_backend(self) -> None:
|
|
unavailableBackend = FakeWorkspaceBackend(False, "unavailable")
|
|
selectedBackend = FakeWorkspaceBackend(True, "selected")
|
|
adapter = compositor_state_adapter.CompositorStateAdapter(
|
|
workspace_backends=[unavailableBackend, selectedBackend]
|
|
)
|
|
|
|
adapter.activate()
|
|
|
|
self.assertEqual(unavailableBackend.activate_calls, [])
|
|
self.assertEqual(len(selectedBackend.activate_calls), 1)
|
|
self.assertTrue(callable(selectedBackend.activate_calls[0]))
|
|
self.assertEqual(adapter.get_snapshot().backend_name, "selected")
|
|
|
|
def test_default_workspace_backends_include_wayland_then_null_backend(self) -> None:
|
|
adapter = compositor_state_adapter.CompositorStateAdapter()
|
|
|
|
backend_types = [type(backend) for backend in adapter._workspaceBackends]
|
|
|
|
self.assertEqual(
|
|
backend_types,
|
|
[
|
|
compositor_state_wayland.WaylandSharedProtocolsBackend,
|
|
compositor_state_wayland.NullWorkspaceBackend,
|
|
],
|
|
)
|
|
|
|
def test_wayland_backend_is_unavailable_without_wayland_session(self) -> None:
|
|
backend = compositor_state_wayland.WaylandSharedProtocolsBackend()
|
|
|
|
with mock.patch.dict(compositor_state_wayland.os.environ, {}, clear=True):
|
|
self.assertFalse(backend.is_available("x11"))
|
|
self.assertFalse(backend.is_available("wayland"))
|
|
|
|
def test_wayland_backend_installs_dispatch_watch_and_processes_runtime_events(self) -> None:
|
|
fakeDisplay = mock.Mock()
|
|
fakeRegistry = mock.Mock()
|
|
fakeDisplay.connect = mock.Mock()
|
|
fakeDisplay.get_registry = mock.Mock(return_value=fakeRegistry)
|
|
fakeDisplay.get_fd = mock.Mock(return_value=17)
|
|
fakeDisplay.roundtrip = mock.Mock()
|
|
fakeDisplay.dispatch = mock.Mock(side_effect=[1, 0])
|
|
fakeProtocols = mock.Mock()
|
|
fakeProtocols.INTERFACE_NAME = "ext_workspace_manager_v1"
|
|
fakeProtocols.INTERFACE_VERSION = 1
|
|
fakeProtocols.ACTIVE_STATE_VALUE = 1
|
|
fakeProtocols.has_runtime_support.return_value = True
|
|
fakeProtocols.get_display_class.return_value = mock.Mock(return_value=fakeDisplay)
|
|
emitSignal = mock.Mock()
|
|
backend = compositor_state_wayland.WaylandSharedProtocolsBackend(
|
|
environment={"WAYLAND_DISPLAY": "wayland-0"},
|
|
protocols=fakeProtocols,
|
|
)
|
|
|
|
with (
|
|
mock.patch.object(compositor_state_wayland, "get_session_type", return_value="wayland"),
|
|
mock.patch.object(compositor_state_wayland.GLib, "io_add_watch", return_value=41) as ioAddWatch,
|
|
mock.patch.object(compositor_state_wayland.GLib, "source_remove") as sourceRemove,
|
|
):
|
|
backend.activate(emitSignal)
|
|
|
|
ioAddWatch.assert_called_once()
|
|
watchCallback = ioAddWatch.call_args.args[3]
|
|
self.assertTrue(watchCallback(17, compositor_state_wayland.GLib.IO_IN))
|
|
self.assertEqual(fakeDisplay.dispatch.call_count, 2)
|
|
|
|
backend.deactivate()
|
|
|
|
sourceRemove.assert_called_once_with(41)
|
|
|
|
def test_wayland_backend_logs_activation_failure_reason(self) -> None:
|
|
fakeDisplay = mock.Mock()
|
|
fakeDisplay.connect.side_effect = ValueError("Unable to connect to display")
|
|
fakeProtocols = mock.Mock()
|
|
fakeProtocols.has_runtime_support.return_value = True
|
|
fakeProtocols.get_display_class.return_value = mock.Mock(return_value=fakeDisplay)
|
|
backend = compositor_state_wayland.WaylandSharedProtocolsBackend(
|
|
environment={"WAYLAND_DISPLAY": "wayland-0"},
|
|
protocols=fakeProtocols,
|
|
)
|
|
|
|
with (
|
|
mock.patch.object(compositor_state_wayland, "get_session_type", return_value="wayland"),
|
|
mock.patch.object(compositor_state_wayland.debug, "printMessage") as printMessage,
|
|
):
|
|
backend.activate(mock.Mock())
|
|
|
|
printMessage.assert_any_call(
|
|
compositor_state_wayland.debug.LEVEL_WARNING,
|
|
mock.ANY,
|
|
True,
|
|
)
|
|
self.assertIn("Unable to connect to display", printMessage.call_args_list[-1].args[1])
|
|
self.assertIsNone(backend._display)
|
|
|
|
def test_local_ext_workspace_wrapper_supports_base_pywayland_without_distro_protocol_module(self) -> None:
|
|
fakeClientModule = types.ModuleType("pywayland.client")
|
|
|
|
class FakeDisplay:
|
|
pass
|
|
|
|
fakeClientModule.Display = FakeDisplay
|
|
|
|
def fake_import(moduleName: str, package=None):
|
|
if moduleName in ("pywayland.client", "pywayland.client.display"):
|
|
return fakeClientModule
|
|
raise ImportError(moduleName)
|
|
|
|
try:
|
|
with mock.patch("importlib.import_module", side_effect=fake_import):
|
|
importlib.reload(ext_workspace_v1)
|
|
self.assertTrue(ext_workspace_v1.has_runtime_support())
|
|
self.assertIs(ext_workspace_v1.get_display_class(), FakeDisplay)
|
|
self.assertIsNotNone(ext_workspace_v1.ExtWorkspaceManagerV1)
|
|
self.assertIsNotNone(ext_workspace_v1.ExtWorkspaceHandleV1)
|
|
self.assertEqual(
|
|
ext_workspace_v1.ExtWorkspaceManagerV1.interface_name,
|
|
"ext_workspace_manager_v1",
|
|
)
|
|
self.assertEqual(
|
|
ext_workspace_v1.ExtWorkspaceHandleV1.interface_name,
|
|
"ext_workspace_handle_v1",
|
|
)
|
|
finally:
|
|
importlib.reload(ext_workspace_v1)
|
|
|
|
def test_wayland_backend_is_selected_when_available(self) -> None:
|
|
adapter = compositor_state_adapter.CompositorStateAdapter()
|
|
selected_backend = mock.Mock(name="selected-backend")
|
|
fallback_backend = mock.Mock(name="fallback-backend")
|
|
selected_backend.name = "wayland-shared-protocols"
|
|
selected_backend.is_available.return_value = True
|
|
fallback_backend.name = "null"
|
|
fallback_backend.is_available.return_value = True
|
|
adapter._workspaceBackends = [selected_backend, fallback_backend]
|
|
|
|
adapter.activate()
|
|
|
|
selected_backend.activate.assert_called_once()
|
|
fallback_backend.activate.assert_not_called()
|
|
self.assertTrue(callable(selected_backend.activate.call_args.args[0]))
|
|
self.assertEqual(adapter.get_snapshot().backend_name, "wayland-shared-protocols")
|
|
|
|
def test_activate_is_idempotent_and_deactivates_previous_backend(self) -> None:
|
|
backend = FakeWorkspaceBackend(True, "selected")
|
|
adapter = compositor_state_adapter.CompositorStateAdapter(workspace_backends=[backend])
|
|
|
|
adapter.activate()
|
|
adapter.activate()
|
|
|
|
self.assertEqual(len(backend.activate_calls), 2)
|
|
self.assertTrue(all(callable(call) for call in backend.activate_calls))
|
|
self.assertEqual(len(backend.deactivate_calls), 1)
|
|
self.assertEqual(adapter.get_snapshot().backend_name, "selected")
|
|
|
|
def test_sync_accessible_context_emits_focus_context_changed_when_active_window_token_changes(self) -> None:
|
|
adapter = compositor_state_adapter.CompositorStateAdapter()
|
|
events = []
|
|
adapter.add_listener(events.append)
|
|
firstWindow = object()
|
|
secondWindow = object()
|
|
|
|
def get_process_id(obj):
|
|
return 111 if obj is firstWindow else 222
|
|
|
|
def get_name(obj):
|
|
return "Terminal"
|
|
|
|
with (
|
|
mock.patch.object(compositor_state_adapter.AXObject, "get_process_id", side_effect=get_process_id),
|
|
mock.patch.object(compositor_state_adapter.AXObject, "get_name", side_effect=get_name),
|
|
):
|
|
cthulhu_state.activeWindow = firstWindow
|
|
cthulhu_state.locusOfFocus = firstWindow
|
|
adapter.sync_accessible_context("startup")
|
|
|
|
events.clear()
|
|
cthulhu_state.activeWindow = secondWindow
|
|
cthulhu_state.locusOfFocus = secondWindow
|
|
adapter.sync_accessible_context("workspace transition")
|
|
|
|
self.assertIn(
|
|
compositor_state_types.DESKTOP_FOCUS_CONTEXT_CHANGED,
|
|
[event.type for event in events],
|
|
)
|
|
self.assertIn(
|
|
compositor_state_types.PRIORITIZE_FOCUS,
|
|
[event.type for event in events],
|
|
)
|
|
|
|
def test_sync_accessible_context_builds_stable_active_window_tokens(self) -> None:
|
|
adapter = compositor_state_adapter.CompositorStateAdapter()
|
|
window = object()
|
|
|
|
with (
|
|
mock.patch.object(compositor_state_adapter.AXObject, "get_process_id", return_value=4242),
|
|
mock.patch.object(compositor_state_adapter.AXObject, "get_name", return_value="Terminal"),
|
|
):
|
|
cthulhu_state.activeWindow = window
|
|
cthulhu_state.locusOfFocus = window
|
|
snapshot = adapter.sync_accessible_context("startup")
|
|
|
|
self.assertEqual(snapshot.active_window_token, "4242:Terminal")
|
|
self.assertEqual(cthulhu_state.compositorSnapshot.active_window_token, "4242:Terminal")
|
|
|
|
def test_workspace_backend_normalizes_initial_done_and_handoff(self) -> None:
|
|
adapter = compositor_state_adapter.CompositorStateAdapter(workspace_backends=[])
|
|
events = []
|
|
adapter.add_listener(events.append)
|
|
workspace = compositor_state_wayland.WaylandSharedProtocolsBackend()
|
|
first_workspace = mock.Mock()
|
|
second_workspace = mock.Mock()
|
|
first_workspace_token = f"workspace:{id(first_workspace)}"
|
|
|
|
adapter._snapshot = compositor_state_types.DesktopContextSnapshot(
|
|
session_type="wayland",
|
|
backend_name="wayland-shared-protocols",
|
|
)
|
|
|
|
with mock.patch.object(workspace, "is_available", return_value=False):
|
|
workspace.activate(adapter._handle_workspace_signal)
|
|
workspace._handle_workspace_state(first_workspace, "active", True)
|
|
workspace._handle_workspace_manager_done()
|
|
|
|
self.assertEqual(
|
|
[event.type for event in events],
|
|
[
|
|
compositor_state_types.DESKTOP_TRANSITION_STARTED,
|
|
compositor_state_types.PAUSE_ATSPI_CHURN,
|
|
compositor_state_types.WORKSPACE_STATE_CHANGED,
|
|
compositor_state_types.DESKTOP_TRANSITION_FINISHED,
|
|
compositor_state_types.RESUME_ATSPI_CHURN,
|
|
compositor_state_types.FLUSH_STALE_ATSPI_EVENTS,
|
|
],
|
|
)
|
|
self.assertEqual(events[0].snapshot.active_workspace_ids, frozenset({first_workspace_token}))
|
|
self.assertTrue(events[0].snapshot.workspace_transition_pending)
|
|
self.assertEqual(events[2].snapshot.active_workspace_ids, frozenset({first_workspace_token}))
|
|
self.assertFalse(adapter.get_snapshot().workspace_transition_pending)
|
|
self.assertEqual(adapter.get_snapshot().active_workspace_ids, frozenset({first_workspace_token}))
|
|
self.assertEqual(cthulhu_state.compositorSnapshot.active_workspace_ids, frozenset({first_workspace_token}))
|
|
|
|
events.clear()
|
|
workspace._handle_workspace_id(second_workspace, "ws-2")
|
|
workspace._handle_workspace_state(first_workspace, "active", False)
|
|
workspace._handle_workspace_state(second_workspace, "active", True)
|
|
workspace._handle_workspace_manager_done()
|
|
|
|
event_types = [event.type for event in events]
|
|
|
|
self.assertEqual(
|
|
event_types,
|
|
[
|
|
compositor_state_types.DESKTOP_TRANSITION_STARTED,
|
|
compositor_state_types.PAUSE_ATSPI_CHURN,
|
|
compositor_state_types.WORKSPACE_STATE_CHANGED,
|
|
compositor_state_types.DESKTOP_TRANSITION_FINISHED,
|
|
compositor_state_types.RESUME_ATSPI_CHURN,
|
|
compositor_state_types.FLUSH_STALE_ATSPI_EVENTS,
|
|
],
|
|
)
|
|
self.assertTrue(events[0].snapshot.workspace_transition_pending)
|
|
self.assertEqual(events[0].snapshot.active_workspace_ids, frozenset())
|
|
self.assertEqual(events[2].snapshot.active_workspace_ids, frozenset({"ws-2"}))
|
|
self.assertEqual(adapter.get_snapshot().active_workspace_ids, frozenset({"ws-2"}))
|
|
self.assertFalse(adapter.get_snapshot().workspace_transition_pending)
|
|
self.assertEqual(cthulhu_state.compositorSnapshot.active_workspace_ids, frozenset({"ws-2"}))
|
|
self.assertFalse(cthulhu_state.pauseAtspiChurn)
|
|
|
|
def test_event_manager_startup_resyncs_adapter_after_focus_recovery(self) -> None:
|
|
adapter = mock.Mock()
|
|
adapter.sync_accessible_context = mock.Mock(return_value=None)
|
|
listener = mock.Mock()
|
|
window = object()
|
|
focusedObject = object()
|
|
|
|
with (
|
|
mock.patch.object(cthulhu.event_manager.Atspi.EventListener, "new", return_value=listener),
|
|
mock.patch.object(cthulhu.event_manager.AXUtilities, "can_be_active_window", return_value=False),
|
|
mock.patch.object(cthulhu.event_manager.AXUtilities, "find_active_window", return_value=window),
|
|
mock.patch.object(cthulhu.event_manager.AXUtilities, "get_focused_object", return_value=focusedObject),
|
|
mock.patch.object(cthulhu.event_manager.cthulhu, "setActiveWindow") as setActiveWindow,
|
|
mock.patch.object(cthulhu.event_manager.cthulhu, "setLocusOfFocus") as setLocusOfFocus,
|
|
):
|
|
manager = cthulhu.event_manager.EventManager(mock.Mock())
|
|
manager.set_compositor_state_adapter(adapter)
|
|
manager._sync_focus_on_startup()
|
|
|
|
setActiveWindow.assert_called_once_with(window, alsoSetLocusOfFocus=True, notifyScript=False)
|
|
setLocusOfFocus.assert_called_once_with(None, focusedObject, notifyScript=True, force=True)
|
|
adapter.sync_accessible_context.assert_called_once_with("event-manager-startup")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|