From d0aef1331d949f2e13fc4e5da84ed579f0f33347 Mon Sep 17 00:00:00 2001 From: Hunter Jozwiak Date: Thu, 9 Apr 2026 08:07:20 -0400 Subject: [PATCH] feat: add shared Wayland workspace backend --- meson.build | 1 + pyproject.toml | 5 + src/cthulhu/compositor_state_adapter.py | 115 +++++- src/cthulhu/compositor_state_types.py | 7 + src/cthulhu/compositor_state_wayland.py | 341 ++++++++++++++++++ src/cthulhu/meson.build | 2 + src/cthulhu/wayland_protocols/__init__.py | 14 + .../wayland_protocols/ext_workspace_v1.py | 130 +++++++ src/cthulhu/wayland_protocols/meson.build | 9 + ...st_compositor_state_adapter_regressions.py | 134 +++++++ 10 files changed, 749 insertions(+), 9 deletions(-) create mode 100644 src/cthulhu/compositor_state_wayland.py create mode 100644 src/cthulhu/wayland_protocols/__init__.py create mode 100644 src/cthulhu/wayland_protocols/ext_workspace_v1.py create mode 100644 src/cthulhu/wayland_protocols/meson.build diff --git a/meson.build b/meson.build index de1c219..23f79e9 100644 --- a/meson.build +++ b/meson.build @@ -57,6 +57,7 @@ optional_modules = { 'speechd': 'speech output', 'dasbus': 'D-Bus remote controller', 'psutil': 'system information commands', + 'pywayland': 'Wayland shared workspace backend', 'gi.repository.Wnck': 'mouse review', 'pdf2image': 'PDF processing for OCR', 'scipy': 'Scientific computing for OCR analysis', diff --git a/pyproject.toml b/pyproject.toml index 9527f5b..2e31090 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,11 @@ dependencies = [ "louis; extra == 'braille'" ] +[project.optional-dependencies] +wayland = [ + "pywayland" +] + [project.scripts] cthulhu = "cthulhu.cthulhu:main" diff --git a/src/cthulhu/compositor_state_adapter.py b/src/cthulhu/compositor_state_adapter.py index fbc4def..5854c81 100644 --- a/src/cthulhu/compositor_state_adapter.py +++ b/src/cthulhu/compositor_state_adapter.py @@ -11,6 +11,7 @@ from __future__ import annotations +from dataclasses import replace from collections.abc import Callable, Iterable from typing import Any, Optional @@ -19,12 +20,16 @@ from .ax_object import AXObject from .compositor_state_types import ( CompositorStateEvent, DesktopContextSnapshot, + DESKTOP_TRANSITION_FINISHED, + DESKTOP_TRANSITION_STARTED, DESKTOP_FOCUS_CONTEXT_CHANGED, FLUSH_STALE_ATSPI_EVENTS, PAUSE_ATSPI_CHURN, PRIORITIZE_FOCUS, RESUME_ATSPI_CHURN, + WORKSPACE_STATE_CHANGED, ) +from .compositor_state_wayland import NullWorkspaceBackend, WaylandSharedProtocolsBackend from .wnck_support import get_session_type @@ -35,7 +40,12 @@ class CompositorStateAdapter: self, workspace_backends: Optional[Iterable[Any]] = None, ) -> None: - self._workspaceBackends = list(workspace_backends or []) + if workspace_backends is None: + workspace_backends = [ + WaylandSharedProtocolsBackend(), + NullWorkspaceBackend(), + ] + self._workspaceBackends = list(workspace_backends) self._workspaceBackend: Optional[Any] = None self._listeners: list[Callable[[CompositorStateEvent], None]] = [] self._snapshot = DesktopContextSnapshot(session_type=self.get_session_type()) @@ -75,9 +85,14 @@ class CompositorStateAdapter: active_window_snapshot = self._build_object_snapshot(active_window) locus_of_focus_snapshot = self._build_object_snapshot(locus_of_focus) + workspaceSnapshot = self._snapshot snapshot = DesktopContextSnapshot( session_type=self.get_session_type(), backend_name=self._backend_name(), + active_workspace_token=workspaceSnapshot.active_workspace_token, + active_workspace_name=workspaceSnapshot.active_workspace_name, + active_workspace_coordinates=workspaceSnapshot.active_workspace_coordinates, + workspace_transition_pending=workspaceSnapshot.workspace_transition_pending, active_window_token=active_window_snapshot["token"], locus_of_focus_token=locus_of_focus_snapshot["token"], active_window_pid=active_window_snapshot["pid"], @@ -120,8 +135,89 @@ class CompositorStateAdapter: return {"token": token, "pid": pid, "name": name} - def _emit(self, event_type: str, reason: str, snapshot: Optional[DesktopContextSnapshot]) -> None: - event = CompositorStateEvent(event_type, reason=reason, snapshot=snapshot) + def handle_workspace_event( + self, + event_type: str, + *, + reason: str = "", + payload: Optional[dict[str, Any]] = None, + ) -> None: + eventPayload = dict(payload or {}) + + if event_type == DESKTOP_TRANSITION_STARTED: + snapshot = self._update_workspace_snapshot( + eventPayload, + transition_pending=True, + use_previous_workspace=True, + ) + cthulhu_state.pauseAtspiChurn = True + self._emit(event_type, reason, snapshot, eventPayload) + self._emit(PAUSE_ATSPI_CHURN, reason, snapshot, eventPayload) + return + + if event_type == WORKSPACE_STATE_CHANGED: + snapshot = self._update_workspace_snapshot( + eventPayload, + transition_pending=bool(eventPayload.get("is_transition_pending", False)), + ) + self._emit(event_type, reason, snapshot, eventPayload) + return + + if event_type == DESKTOP_TRANSITION_FINISHED: + snapshot = self._update_workspace_snapshot( + eventPayload, + transition_pending=False, + ) + cthulhu_state.pauseAtspiChurn = False + self._emit(event_type, reason, snapshot, eventPayload) + self._emit(RESUME_ATSPI_CHURN, reason, snapshot, eventPayload) + self._emit(FLUSH_STALE_ATSPI_EVENTS, reason, snapshot, eventPayload) + return + + if event_type == PAUSE_ATSPI_CHURN: + cthulhu_state.pauseAtspiChurn = True + elif event_type == RESUME_ATSPI_CHURN: + cthulhu_state.pauseAtspiChurn = False + + self._emit(event_type, reason, self._snapshot, eventPayload) + + def _update_workspace_snapshot( + self, + payload: dict[str, Any], + *, + transition_pending: Optional[bool] = None, + use_previous_workspace: bool = False, + ) -> DesktopContextSnapshot: + tokenKey = "previous_workspace_token" if use_previous_workspace else "workspace_token" + nameKey = "previous_workspace_name" if use_previous_workspace else "workspace_name" + coordinatesKey = ( + "previous_workspace_coordinates" if use_previous_workspace else "workspace_coordinates" + ) + + coordinates = payload.get(coordinatesKey, self._snapshot.active_workspace_coordinates) or () + snapshot = replace( + self._snapshot, + active_workspace_token=str(payload.get(tokenKey, self._snapshot.active_workspace_token) or ""), + active_workspace_name=str(payload.get(nameKey, self._snapshot.active_workspace_name) or ""), + active_workspace_coordinates=tuple(coordinates), + workspace_transition_pending=( + self._snapshot.workspace_transition_pending + if transition_pending is None + else bool(transition_pending) + ), + ) + self._snapshot = snapshot + cthulhu_state.compositorSnapshot = snapshot + return snapshot + + def _emit( + self, + event_type: str, + reason: str, + snapshot: Optional[DesktopContextSnapshot], + payload: Optional[dict[str, Any]] = None, + ) -> None: + event = CompositorStateEvent(event_type, reason=reason, snapshot=snapshot, payload=payload or {}) for listener in list(self._listeners): listener(event) @@ -134,18 +230,19 @@ class CompositorStateAdapter: return if any(marker in normalized for marker in ("start", "begin", "enter", "prepare", "opening")): - cthulhu_state.pauseAtspiChurn = True - self._emit(PAUSE_ATSPI_CHURN, signal_name, self._snapshot) + self.handle_workspace_event(DESKTOP_TRANSITION_STARTED, reason=signal_name) return if any(marker in normalized for marker in ("end", "complete", "finished", "finish", "leave", "exit", "stop", "done")): - cthulhu_state.pauseAtspiChurn = False - self._emit(RESUME_ATSPI_CHURN, signal_name, self._snapshot) - self._emit(FLUSH_STALE_ATSPI_EVENTS, signal_name, self._snapshot) + self.handle_workspace_event(DESKTOP_TRANSITION_FINISHED, reason=signal_name) return if "flush" in normalized or "stale" in normalized: - self._emit(FLUSH_STALE_ATSPI_EVENTS, signal_name, self._snapshot) + self.handle_workspace_event(FLUSH_STALE_ATSPI_EVENTS, reason=signal_name) + return + + if "workspace" in normalized: + self.handle_workspace_event(WORKSPACE_STATE_CHANGED, reason=signal_name) def _backend_is_available(self, backend: Any) -> bool: if hasattr(backend, "is_available"): diff --git a/src/cthulhu/compositor_state_types.py b/src/cthulhu/compositor_state_types.py index 1ee16e1..39b24a6 100644 --- a/src/cthulhu/compositor_state_types.py +++ b/src/cthulhu/compositor_state_types.py @@ -16,6 +16,9 @@ from typing import Any, Mapping, Optional DESKTOP_FOCUS_CONTEXT_CHANGED = "DESKTOP_FOCUS_CONTEXT_CHANGED" PRIORITIZE_FOCUS = "PRIORITIZE_FOCUS" +WORKSPACE_STATE_CHANGED = "WORKSPACE_STATE_CHANGED" +DESKTOP_TRANSITION_STARTED = "DESKTOP_TRANSITION_STARTED" +DESKTOP_TRANSITION_FINISHED = "DESKTOP_TRANSITION_FINISHED" PAUSE_ATSPI_CHURN = "PAUSE_ATSPI_CHURN" RESUME_ATSPI_CHURN = "RESUME_ATSPI_CHURN" FLUSH_STALE_ATSPI_EVENTS = "FLUSH_STALE_ATSPI_EVENTS" @@ -27,6 +30,10 @@ class DesktopContextSnapshot: session_type: str backend_name: str = "" + active_workspace_token: str = "" + active_workspace_name: str = "" + active_workspace_coordinates: tuple[Any, ...] = field(default_factory=tuple) + workspace_transition_pending: bool = False active_window_token: str = "" locus_of_focus_token: str = "" active_window_pid: int = -1 diff --git a/src/cthulhu/compositor_state_wayland.py b/src/cthulhu/compositor_state_wayland.py new file mode 100644 index 0000000..73320b2 --- /dev/null +++ b/src/cthulhu/compositor_state_wayland.py @@ -0,0 +1,341 @@ +#!/usr/bin/env python3 +# +# Copyright (c) 2026 Stormux +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. + +"""Wayland workspace backends for normalized compositor state tracking.""" + +from __future__ import annotations + +import os +from collections.abc import Mapping, Sequence +from typing import Any + +from .wayland_protocols import ext_workspace_v1 +from .wnck_support import get_session_type + + +class NullWorkspaceBackend: + """No-op backend used when no real workspace backend is available.""" + + name = "null" + + def __init__(self) -> None: + self._adapter = None + + def is_available(self, session_type: str | None = None) -> bool: + return True + + def activate(self, adapter: Any = None) -> None: + self._adapter = adapter + + def deactivate(self, adapter: Any = None) -> None: + self._adapter = None + + +class WaylandSharedProtocolsBackend: + """Runtime-optional backend for the ext-workspace shared Wayland protocol.""" + + name = "wayland-shared-protocols" + + def __init__( + self, + *, + environment: Mapping[str, str] | None = None, + protocols: Any = None, + ) -> None: + self._environment = environment if environment is not None else os.environ + self._protocols = protocols or ext_workspace_v1 + self._adapter = None + self._display = None + self._registry = None + self._workspaceManager = None + self._workspaceStates: dict[Any, dict[str, Any]] = {} + self._activeWorkspace = None + self._batchDirty = False + + def is_available(self, session_type: str | None = None) -> bool: + effectiveSessionType = (session_type or get_session_type()).strip().lower() + return ( + effectiveSessionType == "wayland" + and bool((self._environment.get("WAYLAND_DISPLAY") or "").strip()) + and bool(self._protocols.has_runtime_support()) + ) + + def activate(self, adapter: Any = None) -> None: + self.deactivate() + self._adapter = adapter + sessionType = None + getSessionType = getattr(adapter, "get_session_type", None) + if callable(getSessionType): + sessionType = getSessionType() + + if adapter is None or not self.is_available(sessionType): + return + + displayClass = self._protocols.get_display_class() + if displayClass is None: + return + + try: + self._display = displayClass() + connect = getattr(self._display, "connect", None) + if callable(connect): + connect() + get_registry = getattr(self._display, "get_registry", None) + if not callable(get_registry): + raise RuntimeError("Wayland display missing registry access") + self._registry = get_registry() + self._bind_listener(self._registry, "global", self._handle_registry_global) + self._bind_listener(self._registry, "global_remove", self._handle_registry_global_remove) + self._roundtrip() + except Exception: + self.deactivate() + + def deactivate(self, adapter: Any = None) -> None: + self._activeWorkspace = None + self._workspaceStates = {} + self._batchDirty = False + self._safe_close_proxy(self._workspaceManager) + self._safe_close_proxy(self._registry) + self._workspaceManager = None + self._registry = None + + if self._display is not None: + disconnect = getattr(self._display, "disconnect", None) + if callable(disconnect): + try: + disconnect() + except Exception: + pass + self._display = None + self._adapter = None + + def _bind_listener(self, target: Any, event_name: str, callback: Any) -> bool: + dispatcher = getattr(target, "dispatcher", None) + if dispatcher is not None: + try: + dispatcher[event_name] = callback + return True + except Exception: + pass + + add_listener = getattr(target, "add_listener", None) + if callable(add_listener): + try: + add_listener(**{event_name: callback}) + return True + except TypeError: + try: + add_listener(event_name, callback) + return True + except TypeError: + pass + + attributeName = f"on_{event_name}" + if hasattr(target, attributeName): + setattr(target, attributeName, callback) + return True + + return False + + def _roundtrip(self) -> None: + if self._display is None: + return + + for methodName in ("roundtrip", "dispatch", "dispatch_pending"): + method = getattr(self._display, methodName, None) + if callable(method): + method() + break + + def _safe_close_proxy(self, proxy: Any) -> None: + if proxy is None: + return + + for methodName in ("destroy", "release"): + method = getattr(proxy, methodName, None) + if callable(method): + try: + method() + except Exception: + pass + return + + def _handle_registry_global(self, *args: Any) -> None: + if self._workspaceManager is not None: + return + + globalName, interfaceName, interfaceVersion = self._parse_registry_global_args(args) + if interfaceName != self._protocols.INTERFACE_NAME: + return + + registry = self._registry + if registry is None: + return + + bindVersion = min(interfaceVersion, self._protocols.INTERFACE_VERSION) + try: + self._workspaceManager = self._protocols.bind_workspace_manager( + registry, + globalName, + bindVersion, + ) + except Exception: + self._workspaceManager = None + + if self._workspaceManager is None: + return + + self._bind_listener(self._workspaceManager, "workspace", self._handle_workspace_manager_workspace) + self._bind_listener(self._workspaceManager, "done", self._handle_workspace_manager_done) + self._bind_listener(self._workspaceManager, "finished", self._handle_workspace_manager_finished) + + def _handle_registry_global_remove(self, *args: Any) -> None: + return + + def _handle_workspace_manager_workspace(self, *args: Any) -> None: + workspaceHandle = args[-1] if args else None + if workspaceHandle is None or workspaceHandle in self._workspaceStates: + return + + self._workspaceStates[workspaceHandle] = {"active": False} + self._bind_listener(workspaceHandle, "state", self._handle_workspace_state) + self._bind_listener(workspaceHandle, "name", self._handle_workspace_name) + self._bind_listener(workspaceHandle, "coordinates", self._handle_workspace_coordinates) + self._bind_listener(workspaceHandle, "removed", self._handle_workspace_removed) + + def _handle_workspace_name(self, workspaceHandle: Any, name: str, *_args: Any) -> None: + workspaceState = self._workspaceStates.setdefault(workspaceHandle, {"active": False}) + workspaceState["name"] = (name or "").strip() + self._batchDirty = True + + def _handle_workspace_coordinates(self, workspaceHandle: Any, coordinates: Sequence[Any], *_args: Any) -> None: + workspaceState = self._workspaceStates.setdefault(workspaceHandle, {"active": False}) + workspaceState["coordinates"] = tuple(coordinates or ()) + self._batchDirty = True + + def _handle_workspace_state(self, workspaceHandle: Any, stateValue: Any, value: Any = None, *_args: Any) -> None: + workspaceState = self._workspaceStates.setdefault(workspaceHandle, {"active": False}) + workspaceState["active"] = self._workspace_is_active(stateValue, value) + self._batchDirty = True + + def _handle_workspace_removed(self, workspaceHandle: Any, *_args: Any) -> None: + self._workspaceStates.pop(workspaceHandle, None) + if workspaceHandle is self._activeWorkspace: + self._activeWorkspace = None + self._batchDirty = True + + def _handle_workspace_manager_done(self, *_args: Any) -> None: + if not self._batchDirty: + return + + newActiveWorkspace = self._find_active_workspace() + previousWorkspace = self._activeWorkspace + transitionReason = "wayland-workspace-batch-done" + transitionPayload = self._build_workspace_payload( + newActiveWorkspace, + previousWorkspace, + is_transition_pending=True, + ) + finalPayload = self._build_workspace_payload( + newActiveWorkspace, + previousWorkspace, + is_transition_pending=False, + ) + + if previousWorkspace is not None and previousWorkspace is not newActiveWorkspace: + self._emit_adapter_event("DESKTOP_TRANSITION_STARTED", transitionReason, transitionPayload) + + if previousWorkspace is not newActiveWorkspace or newActiveWorkspace is not None: + self._emit_adapter_event("WORKSPACE_STATE_CHANGED", transitionReason, finalPayload) + + if previousWorkspace is not None and previousWorkspace is not newActiveWorkspace: + self._emit_adapter_event("DESKTOP_TRANSITION_FINISHED", transitionReason, finalPayload) + + self._activeWorkspace = newActiveWorkspace + self._batchDirty = False + + def _handle_workspace_manager_finished(self, *_args: Any) -> None: + self._workspaceManager = None + + def _find_active_workspace(self) -> Any: + for workspaceHandle, workspaceState in self._workspaceStates.items(): + if workspaceState.get("active"): + return workspaceHandle + return None + + def _workspace_is_active(self, stateValue: Any, value: Any = None) -> bool: + if isinstance(stateValue, str): + return stateValue.strip().lower() == "active" and bool(value) + + if isinstance(stateValue, Mapping): + return bool(stateValue.get("active")) + + if isinstance(stateValue, Sequence) and not isinstance(stateValue, (str, bytes, bytearray)): + return self._protocols.ACTIVE_STATE_VALUE in stateValue + + return bool(stateValue) + + def _build_workspace_payload( + self, + workspaceHandle: Any, + previousWorkspaceHandle: Any, + *, + is_transition_pending: bool, + ) -> dict[str, Any]: + workspaceState = self._workspaceStates.get(workspaceHandle, {}) + previousWorkspaceState = self._workspaceStates.get(previousWorkspaceHandle, {}) + return { + "workspace_handle": workspaceHandle, + "workspace_token": self._workspace_token(workspaceHandle), + "workspace_name": str(workspaceState.get("name", "") or ""), + "workspace_coordinates": tuple(workspaceState.get("coordinates", ()) or ()), + "previous_workspace_handle": previousWorkspaceHandle, + "previous_workspace_token": self._workspace_token(previousWorkspaceHandle), + "previous_workspace_name": str(previousWorkspaceState.get("name", "") or ""), + "previous_workspace_coordinates": tuple(previousWorkspaceState.get("coordinates", ()) or ()), + "is_transition_pending": is_transition_pending, + } + + def _workspace_token(self, workspaceHandle: Any) -> str: + if workspaceHandle is None: + return "" + return f"workspace:{id(workspaceHandle)}" + + def _emit_adapter_event(self, eventName: str, reason: str, payload: dict[str, Any]) -> None: + if self._adapter is None: + return + + handleWorkspaceEvent = getattr(self._adapter, "handle_workspace_event", None) + if callable(handleWorkspaceEvent): + handleWorkspaceEvent(eventName, reason=reason, payload=payload) + return + + legacyHandler = getattr(self._adapter, "_handle_workspace_signal", None) + if callable(legacyHandler): + legacyHandler(eventName, payload) + + def _parse_registry_global_args(self, args: Sequence[Any]) -> tuple[int, str, int]: + if len(args) >= 4: + _, globalName, interfaceName, interfaceVersion = args[-4:] + elif len(args) >= 3: + globalName, interfaceName, interfaceVersion = args[-3:] + else: + return (-1, "", 0) + + try: + parsedName = int(globalName) + except (TypeError, ValueError): + parsedName = -1 + + try: + parsedVersion = int(interfaceVersion) + except (TypeError, ValueError): + parsedVersion = self._protocols.INTERFACE_VERSION + + return parsedName, str(interfaceName or ""), parsedVersion diff --git a/src/cthulhu/meson.build b/src/cthulhu/meson.build index 8e9d9eb..d025922 100644 --- a/src/cthulhu/meson.build +++ b/src/cthulhu/meson.build @@ -35,6 +35,7 @@ cthulhu_python_sources = files([ 'colornames.py', 'compositor_state_adapter.py', 'compositor_state_types.py', + 'compositor_state_wayland.py', 'common_keyboardmap.py', 'cthulhuVersion.py', 'cthulhu_modifier_manager.py', @@ -180,4 +181,5 @@ install_data( # Subdirectories subdir('backends') subdir('scripts') +subdir('wayland_protocols') subdir('plugins') diff --git a/src/cthulhu/wayland_protocols/__init__.py b/src/cthulhu/wayland_protocols/__init__.py new file mode 100644 index 0000000..5083127 --- /dev/null +++ b/src/cthulhu/wayland_protocols/__init__.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python3 +# +# Copyright (c) 2026 Stormux +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. + +"""Runtime-optional Wayland protocol compatibility wrappers.""" + +from . import ext_workspace_v1 + +__all__ = ["ext_workspace_v1"] diff --git a/src/cthulhu/wayland_protocols/ext_workspace_v1.py b/src/cthulhu/wayland_protocols/ext_workspace_v1.py new file mode 100644 index 0000000..c3d2a46 --- /dev/null +++ b/src/cthulhu/wayland_protocols/ext_workspace_v1.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python3 +# +# Copyright (c) 2026 Stormux +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. + +"""Local ext-workspace protocol wrapper built on top of base pywayland.""" + +from __future__ import annotations + +import importlib +from types import ModuleType +from typing import Any + +INTERFACE_NAME = "ext_workspace_manager_v1" +INTERFACE_VERSION = 1 +ACTIVE_STATE_VALUE = 1 +WORKSPACE_HANDLE_INTERFACE_NAME = "ext_workspace_handle_v1" + + +def _load_module(module_names: list[str]) -> ModuleType | None: + for moduleName in module_names: + try: + return importlib.import_module(moduleName) + except ImportError: + continue + return None + + +def _first_attribute(module: ModuleType | None, attribute_names: list[str]) -> Any: + if module is None: + return None + + for attributeName in attribute_names: + if hasattr(module, attributeName): + return getattr(module, attributeName) + return None + + +_pywaylandClientModule = _load_module( + [ + "pywayland.client", + "pywayland.client.display", + ] +) +Display = _first_attribute(_pywaylandClientModule, ["Display"]) + + +class _LocalProtocolProxy: + """Minimal local proxy descriptor with a dispatcher-compatible shape.""" + + interface_name = "" + version = 1 + event_names: tuple[str, ...] = () + + def __init__(self, *args: Any, **kwargs: Any) -> None: + try: + super().__init__(*args, **kwargs) + except Exception: + pass + self.dispatcher = getattr(self, "dispatcher", {}) or {} + + +class ExtWorkspaceManagerV1(_LocalProtocolProxy): + interface_name = INTERFACE_NAME + version = INTERFACE_VERSION + event_names = ("workspace", "done", "finished") + + +class ExtWorkspaceHandleV1(_LocalProtocolProxy): + interface_name = WORKSPACE_HANDLE_INTERFACE_NAME + version = INTERFACE_VERSION + event_names = ("name", "coordinates", "state", "removed") + + +def get_display_class() -> Any: + return Display + + +def has_runtime_support() -> bool: + return Display is not None + + +def bind_workspace_manager(registry: Any, global_name: int, version: int) -> Any: + bind = getattr(registry, "bind", None) + if not callable(bind): + return None + + for bindArgs in ( + (global_name, ExtWorkspaceManagerV1, version), + (global_name, version, ExtWorkspaceManagerV1), + (global_name, INTERFACE_NAME, version), + (global_name, version, INTERFACE_NAME), + ): + try: + proxy = bind(*bindArgs) + return _ensure_dispatcher(proxy) + except TypeError: + continue + + return None + + +def _ensure_dispatcher(proxy: Any) -> Any: + if proxy is None: + return None + + if getattr(proxy, "dispatcher", None) is None: + try: + proxy.dispatcher = {} + except Exception: + pass + return proxy + + +__all__ = [ + "ACTIVE_STATE_VALUE", + "Display", + "ExtWorkspaceHandleV1", + "ExtWorkspaceManagerV1", + "INTERFACE_NAME", + "INTERFACE_VERSION", + "WORKSPACE_HANDLE_INTERFACE_NAME", + "bind_workspace_manager", + "get_display_class", + "has_runtime_support", +] diff --git a/src/cthulhu/wayland_protocols/meson.build b/src/cthulhu/wayland_protocols/meson.build new file mode 100644 index 0000000..71f53e6 --- /dev/null +++ b/src/cthulhu/wayland_protocols/meson.build @@ -0,0 +1,9 @@ +wayland_protocol_python_sources = files([ + '__init__.py', + 'ext_workspace_v1.py', +]) + +python3.install_sources( + wayland_protocol_python_sources, + subdir: 'cthulhu/wayland_protocols' +) diff --git a/tests/test_compositor_state_adapter_regressions.py b/tests/test_compositor_state_adapter_regressions.py index 7b560f0..27395bf 100644 --- a/tests/test_compositor_state_adapter_regressions.py +++ b/tests/test_compositor_state_adapter_regressions.py @@ -1,4 +1,6 @@ +import importlib import sys +import types import unittest from pathlib import Path from unittest import mock @@ -9,6 +11,8 @@ from cthulhu import cthulhu_state from cthulhu import cthulhu from cthulhu import compositor_state_adapter from cthulhu import compositor_state_types +from cthulhu import compositor_state_wayland +from cthulhu.wayland_protocols import ext_workspace_v1 class FakeWorkspaceBackend: @@ -33,6 +37,8 @@ class CompositorStateAdapterRegressionTests(unittest.TestCase): cthulhu_state.compositorSnapshot = None cthulhu_state.pauseAtspiChurn = False cthulhu_state.prioritizedDesktopContextToken = None + cthulhu_state.activeWindow = None + cthulhu_state.locusOfFocus = None def test_activate_selects_first_available_backend(self) -> None: unavailableBackend = FakeWorkspaceBackend(False, "unavailable") @@ -47,6 +53,73 @@ class CompositorStateAdapterRegressionTests(unittest.TestCase): self.assertEqual(selectedBackend.activate_calls, [adapter]) self.assertEqual(adapter.get_snapshot().backend_name, "selected") + def test_default_workspace_backends_include_wayland_then_null_backend(self) -> None: + adapter = compositor_state_adapter.CompositorStateAdapter() + + backend_types = [type(backend) for backend in adapter._workspaceBackends] + + self.assertEqual( + backend_types, + [ + compositor_state_wayland.WaylandSharedProtocolsBackend, + compositor_state_wayland.NullWorkspaceBackend, + ], + ) + + def test_wayland_backend_is_unavailable_without_wayland_session(self) -> None: + backend = compositor_state_wayland.WaylandSharedProtocolsBackend() + + with mock.patch.dict(compositor_state_wayland.os.environ, {}, clear=True): + self.assertFalse(backend.is_available("x11")) + self.assertFalse(backend.is_available("wayland")) + + def test_local_ext_workspace_wrapper_supports_base_pywayland_without_distro_protocol_module(self) -> None: + fakeClientModule = types.ModuleType("pywayland.client") + + class FakeDisplay: + pass + + fakeClientModule.Display = FakeDisplay + + def fake_import(moduleName: str, package=None): + if moduleName in ("pywayland.client", "pywayland.client.display"): + return fakeClientModule + raise ImportError(moduleName) + + try: + with mock.patch("importlib.import_module", side_effect=fake_import): + importlib.reload(ext_workspace_v1) + self.assertTrue(ext_workspace_v1.has_runtime_support()) + self.assertIs(ext_workspace_v1.get_display_class(), FakeDisplay) + self.assertIsNotNone(ext_workspace_v1.ExtWorkspaceManagerV1) + self.assertIsNotNone(ext_workspace_v1.ExtWorkspaceHandleV1) + self.assertEqual( + ext_workspace_v1.ExtWorkspaceManagerV1.interface_name, + "ext_workspace_manager_v1", + ) + self.assertEqual( + ext_workspace_v1.ExtWorkspaceHandleV1.interface_name, + "ext_workspace_handle_v1", + ) + finally: + importlib.reload(ext_workspace_v1) + + def test_wayland_backend_is_selected_when_available(self) -> None: + adapter = compositor_state_adapter.CompositorStateAdapter() + selected_backend = mock.Mock(name="selected-backend") + fallback_backend = mock.Mock(name="fallback-backend") + selected_backend.name = "wayland-shared-protocols" + selected_backend.is_available.return_value = True + fallback_backend.name = "null" + fallback_backend.is_available.return_value = True + adapter._workspaceBackends = [selected_backend, fallback_backend] + + adapter.activate() + + selected_backend.activate.assert_called_once_with(adapter) + fallback_backend.activate.assert_not_called() + self.assertEqual(adapter.get_snapshot().backend_name, "wayland-shared-protocols") + def test_activate_is_idempotent_and_deactivates_previous_backend(self) -> None: backend = FakeWorkspaceBackend(True, "selected") adapter = compositor_state_adapter.CompositorStateAdapter(workspace_backends=[backend]) @@ -108,6 +181,67 @@ class CompositorStateAdapterRegressionTests(unittest.TestCase): self.assertEqual(snapshot.active_window_token, "4242:Terminal") self.assertEqual(cthulhu_state.compositorSnapshot.active_window_token, "4242:Terminal") + def test_workspace_backend_normalizes_transition_completion(self) -> None: + adapter = compositor_state_adapter.CompositorStateAdapter(workspace_backends=[]) + events = [] + adapter.add_listener(events.append) + workspace = compositor_state_wayland.WaylandSharedProtocolsBackend() + first_workspace = mock.Mock() + second_workspace = mock.Mock() + first_workspace_token = f"workspace:{id(first_workspace)}" + second_workspace_token = f"workspace:{id(second_workspace)}" + + adapter._snapshot = compositor_state_types.DesktopContextSnapshot( + session_type="wayland", + backend_name="wayland-shared-protocols", + ) + + workspace.activate(adapter) + workspace._handle_workspace_name(first_workspace, "Workspace 1") + workspace._handle_workspace_coordinates(first_workspace, (0, 0)) + workspace._handle_workspace_state(first_workspace, "active", True) + workspace._handle_workspace_manager_done() + + self.assertEqual(adapter.get_snapshot().active_workspace_token, first_workspace_token) + self.assertEqual(adapter.get_snapshot().active_workspace_name, "Workspace 1") + self.assertEqual(adapter.get_snapshot().active_workspace_coordinates, (0, 0)) + self.assertFalse(adapter.get_snapshot().workspace_transition_pending) + self.assertEqual(cthulhu_state.compositorSnapshot.active_workspace_token, first_workspace_token) + + events.clear() + workspace._handle_workspace_name(second_workspace, "Workspace 2") + workspace._handle_workspace_coordinates(second_workspace, (1, 0)) + workspace._handle_workspace_state(first_workspace, "active", False) + workspace._handle_workspace_state(second_workspace, "active", True) + workspace._handle_workspace_manager_done() + + event_types = [event.type for event in events] + + self.assertEqual( + event_types, + [ + compositor_state_types.DESKTOP_TRANSITION_STARTED, + compositor_state_types.PAUSE_ATSPI_CHURN, + compositor_state_types.WORKSPACE_STATE_CHANGED, + compositor_state_types.DESKTOP_TRANSITION_FINISHED, + compositor_state_types.RESUME_ATSPI_CHURN, + compositor_state_types.FLUSH_STALE_ATSPI_EVENTS, + ], + ) + self.assertTrue(events[0].snapshot.workspace_transition_pending) + self.assertEqual(events[0].snapshot.active_workspace_token, first_workspace_token) + self.assertFalse(events[2].payload["is_transition_pending"]) + self.assertEqual(events[2].snapshot.active_workspace_token, second_workspace_token) + self.assertEqual(events[2].snapshot.active_workspace_name, "Workspace 2") + self.assertEqual(events[2].snapshot.active_workspace_coordinates, (1, 0)) + self.assertIs(events[2].payload["workspace_handle"], second_workspace) + self.assertEqual(adapter.get_snapshot().active_workspace_token, second_workspace_token) + self.assertEqual(adapter.get_snapshot().active_workspace_name, "Workspace 2") + self.assertEqual(adapter.get_snapshot().active_workspace_coordinates, (1, 0)) + self.assertFalse(adapter.get_snapshot().workspace_transition_pending) + self.assertEqual(cthulhu_state.compositorSnapshot.active_workspace_token, second_workspace_token) + self.assertFalse(cthulhu_state.pauseAtspiChurn) + def test_event_manager_startup_resyncs_adapter_after_focus_recovery(self) -> None: adapter = mock.Mock() adapter.sync_accessible_context = mock.Mock(return_value=None)