fix: normalize workspace state signals

This commit is contained in:
2026-04-09 09:13:44 -04:00
parent d0aef1331d
commit 3671b0d6b9
4 changed files with 149 additions and 211 deletions
+69 -89
View File
@@ -73,11 +73,18 @@ class CompositorStateAdapter:
self._backend_activate(backend)
break
self._snapshot = DesktopContextSnapshot(
session_type=self.get_session_type(),
backend_name=self._backend_name(),
)
self.sync_accessible_context("activate")
def deactivate(self) -> None:
self._deactivate_current_backend()
self._snapshot = DesktopContextSnapshot(session_type=self.get_session_type())
cthulhu_state.compositorSnapshot = None
cthulhu_state.pauseAtspiChurn = False
cthulhu_state.prioritizedDesktopContextToken = None
def sync_accessible_context(self, reason: str) -> DesktopContextSnapshot:
active_window = cthulhu_state.activeWindow
@@ -89,9 +96,7 @@ class CompositorStateAdapter:
snapshot = DesktopContextSnapshot(
session_type=self.get_session_type(),
backend_name=self._backend_name(),
active_workspace_token=workspaceSnapshot.active_workspace_token,
active_workspace_name=workspaceSnapshot.active_workspace_name,
active_workspace_coordinates=workspaceSnapshot.active_workspace_coordinates,
active_workspace_ids=workspaceSnapshot.active_workspace_ids,
workspace_transition_pending=workspaceSnapshot.workspace_transition_pending,
active_window_token=active_window_snapshot["token"],
locus_of_focus_token=locus_of_focus_snapshot["token"],
@@ -135,80 +140,75 @@ class CompositorStateAdapter:
return {"token": token, "pid": pid, "name": name}
def handle_workspace_event(
def _handle_workspace_signal(
self,
event_type: str,
*,
signal_name: str,
workspace_ids: Optional[Iterable[str]] = None,
reason: str = "",
payload: Optional[dict[str, Any]] = None,
) -> None:
eventPayload = dict(payload or {})
if event_type == DESKTOP_TRANSITION_STARTED:
snapshot = self._update_workspace_snapshot(
eventPayload,
transition_pending=True,
use_previous_workspace=True,
if workspace_ids is not None:
snapshot = replace(
self._snapshot,
active_workspace_ids=frozenset(str(workspaceId) for workspaceId in workspace_ids if workspaceId),
workspace_transition_pending=signal_name == DESKTOP_TRANSITION_STARTED,
)
cthulhu_state.pauseAtspiChurn = True
self._emit(event_type, reason, snapshot, eventPayload)
self._emit(PAUSE_ATSPI_CHURN, reason, snapshot, eventPayload)
self._snapshot = snapshot
cthulhu_state.compositorSnapshot = snapshot
if signal_name == DESKTOP_TRANSITION_STARTED:
cthulhu_state.pauseAtspiChurn = True
self._emit(signal_name, reason, snapshot)
self._emit(PAUSE_ATSPI_CHURN, reason, snapshot)
return
if signal_name == DESKTOP_TRANSITION_FINISHED:
cthulhu_state.pauseAtspiChurn = False
self._emit(signal_name, reason, snapshot)
self._emit(RESUME_ATSPI_CHURN, reason, snapshot)
self._emit(FLUSH_STALE_ATSPI_EVENTS, reason, snapshot)
return
if signal_name == PAUSE_ATSPI_CHURN:
cthulhu_state.pauseAtspiChurn = True
elif signal_name == RESUME_ATSPI_CHURN:
cthulhu_state.pauseAtspiChurn = False
self._emit(signal_name, reason, snapshot)
return
if event_type == WORKSPACE_STATE_CHANGED:
snapshot = self._update_workspace_snapshot(
eventPayload,
transition_pending=bool(eventPayload.get("is_transition_pending", False)),
)
self._emit(event_type, reason, snapshot, eventPayload)
normalized = (signal_name or "").strip().lower()
if not normalized:
return
if event_type == DESKTOP_TRANSITION_FINISHED:
snapshot = self._update_workspace_snapshot(
eventPayload,
transition_pending=False,
)
cthulhu_state.pauseAtspiChurn = False
self._emit(event_type, reason, snapshot, eventPayload)
self._emit(RESUME_ATSPI_CHURN, reason, snapshot, eventPayload)
self._emit(FLUSH_STALE_ATSPI_EVENTS, reason, snapshot, eventPayload)
if "transition" not in normalized and "workspace" not in normalized:
return
if event_type == PAUSE_ATSPI_CHURN:
cthulhu_state.pauseAtspiChurn = True
elif event_type == RESUME_ATSPI_CHURN:
cthulhu_state.pauseAtspiChurn = False
if any(marker in normalized for marker in ("start", "begin", "enter", "prepare", "opening")):
self._handle_workspace_signal(
DESKTOP_TRANSITION_STARTED,
self._snapshot.active_workspace_ids,
signal_name,
)
return
self._emit(event_type, reason, self._snapshot, eventPayload)
if any(marker in normalized for marker in ("end", "complete", "finished", "finish", "leave", "exit", "stop", "done")):
self._handle_workspace_signal(
DESKTOP_TRANSITION_FINISHED,
self._snapshot.active_workspace_ids,
signal_name,
)
return
def _update_workspace_snapshot(
self,
payload: dict[str, Any],
*,
transition_pending: Optional[bool] = None,
use_previous_workspace: bool = False,
) -> DesktopContextSnapshot:
tokenKey = "previous_workspace_token" if use_previous_workspace else "workspace_token"
nameKey = "previous_workspace_name" if use_previous_workspace else "workspace_name"
coordinatesKey = (
"previous_workspace_coordinates" if use_previous_workspace else "workspace_coordinates"
)
if "flush" in normalized or "stale" in normalized:
self._emit(FLUSH_STALE_ATSPI_EVENTS, signal_name, self._snapshot)
return
coordinates = payload.get(coordinatesKey, self._snapshot.active_workspace_coordinates) or ()
snapshot = replace(
self._snapshot,
active_workspace_token=str(payload.get(tokenKey, self._snapshot.active_workspace_token) or ""),
active_workspace_name=str(payload.get(nameKey, self._snapshot.active_workspace_name) or ""),
active_workspace_coordinates=tuple(coordinates),
workspace_transition_pending=(
self._snapshot.workspace_transition_pending
if transition_pending is None
else bool(transition_pending)
),
)
self._snapshot = snapshot
cthulhu_state.compositorSnapshot = snapshot
return snapshot
if "workspace" in normalized:
self._handle_workspace_signal(
WORKSPACE_STATE_CHANGED,
self._snapshot.active_workspace_ids,
signal_name,
)
def _emit(
self,
@@ -221,29 +221,6 @@ class CompositorStateAdapter:
for listener in list(self._listeners):
listener(event)
def _handle_workspace_signal(self, signal_name: str, *_args: Any) -> None:
normalized = (signal_name or "").strip().lower()
if not normalized:
return
if "transition" not in normalized and "workspace" not in normalized:
return
if any(marker in normalized for marker in ("start", "begin", "enter", "prepare", "opening")):
self.handle_workspace_event(DESKTOP_TRANSITION_STARTED, reason=signal_name)
return
if any(marker in normalized for marker in ("end", "complete", "finished", "finish", "leave", "exit", "stop", "done")):
self.handle_workspace_event(DESKTOP_TRANSITION_FINISHED, reason=signal_name)
return
if "flush" in normalized or "stale" in normalized:
self.handle_workspace_event(FLUSH_STALE_ATSPI_EVENTS, reason=signal_name)
return
if "workspace" in normalized:
self.handle_workspace_event(WORKSPACE_STATE_CHANGED, reason=signal_name)
def _backend_is_available(self, backend: Any) -> bool:
if hasattr(backend, "is_available"):
try:
@@ -257,9 +234,12 @@ class CompositorStateAdapter:
def _backend_activate(self, backend: Any) -> None:
if hasattr(backend, "activate"):
try:
backend.activate(self)
backend.activate(self._handle_workspace_signal)
except TypeError:
backend.activate()
try:
backend.activate(self)
except TypeError:
backend.activate()
def _backend_deactivate(self, backend: Any) -> None:
if hasattr(backend, "deactivate"):
+1 -3
View File
@@ -30,9 +30,7 @@ class DesktopContextSnapshot:
session_type: str
backend_name: str = ""
active_workspace_token: str = ""
active_workspace_name: str = ""
active_workspace_coordinates: tuple[Any, ...] = field(default_factory=tuple)
active_workspace_ids: frozenset[str] = field(default_factory=frozenset)
workspace_transition_pending: bool = False
active_window_token: str = ""
locus_of_focus_token: str = ""
+45 -90
View File
@@ -15,6 +15,11 @@ import os
from collections.abc import Mapping, Sequence
from typing import Any
from .compositor_state_types import (
DESKTOP_TRANSITION_FINISHED,
DESKTOP_TRANSITION_STARTED,
WORKSPACE_STATE_CHANGED,
)
from .wayland_protocols import ext_workspace_v1
from .wnck_support import get_session_type
@@ -25,16 +30,16 @@ class NullWorkspaceBackend:
name = "null"
def __init__(self) -> None:
self._adapter = None
self._emitSignal = None
def is_available(self, session_type: str | None = None) -> bool:
return True
def activate(self, adapter: Any = None) -> None:
self._adapter = adapter
def activate(self, emit_signal: Any = None) -> None:
self._emitSignal = emit_signal
def deactivate(self, adapter: Any = None) -> None:
self._adapter = None
def deactivate(self, emit_signal: Any = None) -> None:
self._emitSignal = None
class WaylandSharedProtocolsBackend:
@@ -50,13 +55,13 @@ class WaylandSharedProtocolsBackend:
) -> None:
self._environment = environment if environment is not None else os.environ
self._protocols = protocols or ext_workspace_v1
self._adapter = None
self._emitSignal = None
self._display = None
self._registry = None
self._workspaceManager = None
self._workspaceStates: dict[Any, dict[str, Any]] = {}
self._activeWorkspace = None
self._batchDirty = False
self._transitionPending = False
def is_available(self, session_type: str | None = None) -> bool:
effectiveSessionType = (session_type or get_session_type()).strip().lower()
@@ -66,15 +71,11 @@ class WaylandSharedProtocolsBackend:
and bool(self._protocols.has_runtime_support())
)
def activate(self, adapter: Any = None) -> None:
def activate(self, emit_signal: Any = None) -> None:
self.deactivate()
self._adapter = adapter
sessionType = None
getSessionType = getattr(adapter, "get_session_type", None)
if callable(getSessionType):
sessionType = getSessionType()
self._emitSignal = emit_signal
if adapter is None or not self.is_available(sessionType):
if emit_signal is None or not self.is_available():
return
displayClass = self._protocols.get_display_class()
@@ -96,10 +97,10 @@ class WaylandSharedProtocolsBackend:
except Exception:
self.deactivate()
def deactivate(self, adapter: Any = None) -> None:
self._activeWorkspace = None
def deactivate(self, emit_signal: Any = None) -> None:
self._workspaceStates = {}
self._batchDirty = False
self._transitionPending = False
self._safe_close_proxy(self._workspaceManager)
self._safe_close_proxy(self._registry)
self._workspaceManager = None
@@ -113,7 +114,7 @@ class WaylandSharedProtocolsBackend:
except Exception:
pass
self._display = None
self._adapter = None
self._emitSignal = None
def _bind_listener(self, target: Any, event_name: str, callback: Any) -> bool:
dispatcher = getattr(target, "dispatcher", None)
@@ -204,71 +205,39 @@ class WaylandSharedProtocolsBackend:
return
self._workspaceStates[workspaceHandle] = {"active": False}
self._bind_listener(workspaceHandle, "id", self._handle_workspace_id)
self._bind_listener(workspaceHandle, "state", self._handle_workspace_state)
self._bind_listener(workspaceHandle, "name", self._handle_workspace_name)
self._bind_listener(workspaceHandle, "coordinates", self._handle_workspace_coordinates)
self._bind_listener(workspaceHandle, "removed", self._handle_workspace_removed)
def _handle_workspace_name(self, workspaceHandle: Any, name: str, *_args: Any) -> None:
def _handle_workspace_id(self, workspaceHandle: Any, workspaceId: str, *_args: Any) -> None:
workspaceState = self._workspaceStates.setdefault(workspaceHandle, {"active": False})
workspaceState["name"] = (name or "").strip()
self._batchDirty = True
def _handle_workspace_coordinates(self, workspaceHandle: Any, coordinates: Sequence[Any], *_args: Any) -> None:
workspaceState = self._workspaceStates.setdefault(workspaceHandle, {"active": False})
workspaceState["coordinates"] = tuple(coordinates or ())
workspaceState["workspace_id"] = (workspaceId or "").strip()
self._batchDirty = True
def _handle_workspace_state(self, workspaceHandle: Any, stateValue: Any, value: Any = None, *_args: Any) -> None:
workspaceState = self._workspaceStates.setdefault(workspaceHandle, {"active": False})
workspaceState["active"] = self._workspace_is_active(stateValue, value)
self._batchDirty = True
self._emit_transition_started("workspace-state-update")
def _handle_workspace_removed(self, workspaceHandle: Any, *_args: Any) -> None:
self._workspaceStates.pop(workspaceHandle, None)
if workspaceHandle is self._activeWorkspace:
self._activeWorkspace = None
self._batchDirty = True
self._emit_transition_started("workspace-removed")
def _handle_workspace_manager_done(self, *_args: Any) -> None:
if not self._batchDirty:
return
newActiveWorkspace = self._find_active_workspace()
previousWorkspace = self._activeWorkspace
transitionReason = "wayland-workspace-batch-done"
transitionPayload = self._build_workspace_payload(
newActiveWorkspace,
previousWorkspace,
is_transition_pending=True,
)
finalPayload = self._build_workspace_payload(
newActiveWorkspace,
previousWorkspace,
is_transition_pending=False,
)
if previousWorkspace is not None and previousWorkspace is not newActiveWorkspace:
self._emit_adapter_event("DESKTOP_TRANSITION_STARTED", transitionReason, transitionPayload)
if previousWorkspace is not newActiveWorkspace or newActiveWorkspace is not None:
self._emit_adapter_event("WORKSPACE_STATE_CHANGED", transitionReason, finalPayload)
if previousWorkspace is not None and previousWorkspace is not newActiveWorkspace:
self._emit_adapter_event("DESKTOP_TRANSITION_FINISHED", transitionReason, finalPayload)
self._activeWorkspace = newActiveWorkspace
activeWorkspaceIds = self._active_workspace_ids()
self._emit_signal(WORKSPACE_STATE_CHANGED, activeWorkspaceIds, "workspace-batch-done")
self._emit_signal(DESKTOP_TRANSITION_FINISHED, activeWorkspaceIds, "workspace-batch-done")
self._batchDirty = False
self._transitionPending = False
def _handle_workspace_manager_finished(self, *_args: Any) -> None:
self._workspaceManager = None
def _find_active_workspace(self) -> Any:
for workspaceHandle, workspaceState in self._workspaceStates.items():
if workspaceState.get("active"):
return workspaceHandle
return None
def _workspace_is_active(self, stateValue: Any, value: Any = None) -> bool:
if isinstance(stateValue, str):
return stateValue.strip().lower() == "active" and bool(value)
@@ -281,45 +250,31 @@ class WaylandSharedProtocolsBackend:
return bool(stateValue)
def _build_workspace_payload(
self,
workspaceHandle: Any,
previousWorkspaceHandle: Any,
*,
is_transition_pending: bool,
) -> dict[str, Any]:
workspaceState = self._workspaceStates.get(workspaceHandle, {})
previousWorkspaceState = self._workspaceStates.get(previousWorkspaceHandle, {})
return {
"workspace_handle": workspaceHandle,
"workspace_token": self._workspace_token(workspaceHandle),
"workspace_name": str(workspaceState.get("name", "") or ""),
"workspace_coordinates": tuple(workspaceState.get("coordinates", ()) or ()),
"previous_workspace_handle": previousWorkspaceHandle,
"previous_workspace_token": self._workspace_token(previousWorkspaceHandle),
"previous_workspace_name": str(previousWorkspaceState.get("name", "") or ""),
"previous_workspace_coordinates": tuple(previousWorkspaceState.get("coordinates", ()) or ()),
"is_transition_pending": is_transition_pending,
}
def _active_workspace_ids(self) -> set[str]:
workspaceIds = set()
for workspaceHandle, workspaceState in self._workspaceStates.items():
if not workspaceState.get("active"):
continue
workspaceId = (workspaceState.get("workspace_id") or "").strip()
if not workspaceId:
workspaceId = self._workspace_id(workspaceHandle)
workspaceIds.add(workspaceId)
return workspaceIds
def _workspace_token(self, workspaceHandle: Any) -> str:
if workspaceHandle is None:
return ""
def _workspace_id(self, workspaceHandle: Any) -> str:
return f"workspace:{id(workspaceHandle)}"
def _emit_adapter_event(self, eventName: str, reason: str, payload: dict[str, Any]) -> None:
if self._adapter is None:
def _emit_transition_started(self, reason: str) -> None:
if self._transitionPending:
return
self._transitionPending = True
self._emit_signal(DESKTOP_TRANSITION_STARTED, self._active_workspace_ids(), reason)
handleWorkspaceEvent = getattr(self._adapter, "handle_workspace_event", None)
if callable(handleWorkspaceEvent):
handleWorkspaceEvent(eventName, reason=reason, payload=payload)
def _emit_signal(self, signalType: str, workspaceIds: set[str], reason: str) -> None:
if callable(self._emitSignal):
self._emitSignal(signalType, workspaceIds, reason)
return
legacyHandler = getattr(self._adapter, "_handle_workspace_signal", None)
if callable(legacyHandler):
legacyHandler(eventName, payload)
def _parse_registry_global_args(self, args: Sequence[Any]) -> tuple[int, str, int]:
if len(args) >= 4:
_, globalName, interfaceName, interfaceVersion = args[-4:]