Files
fenrir/src/fenrirscreenreader/inputDriver/x11Driver.py
T
2026-05-12 02:26:02 -04:00

659 lines
22 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
import os
import sys
import time
from fenrirscreenreader.core import debug
from fenrirscreenreader.core.eventData import FenrirEventType
from fenrirscreenreader.core.inputDriver import InputDriver as inputDriver
try:
from Xlib import X
from Xlib import XK
from Xlib import display
from Xlib.error import BadAccess
from Xlib.error import BadWindow
except Exception as x_error:
X = None
XK = None
display = None
BadAccess = Exception
BadWindow = Exception
_x_error = x_error
else:
_x_error = None
def build_keysym_name_map():
if XK is None:
return {}
names = {}
for attr_name in dir(XK):
if not attr_name.startswith("XK_"):
continue
value = getattr(XK, attr_name)
if not isinstance(value, int):
continue
names.setdefault(value, attr_name[3:])
return names
KEYSYM_NAME_MAP = build_keysym_name_map()
class X11DriverError(RuntimeError):
pass
class driver(inputDriver):
"""X11 terminal-scoped keyboard input driver."""
key_name_overrides = {
"BackSpace": "KEY_BACKSPACE",
"Return": "KEY_ENTER",
"ISO_Enter": "KEY_ENTER",
"Tab": "KEY_TAB",
"Escape": "KEY_ESC",
"space": "KEY_SPACE",
"minus": "KEY_MINUS",
"equal": "KEY_EQUAL",
"bracketleft": "KEY_LEFTBRACE",
"bracketright": "KEY_RIGHTBRACE",
"backslash": "KEY_BACKSLASH",
"semicolon": "KEY_SEMICOLON",
"apostrophe": "KEY_APOSTROPHE",
"grave": "KEY_GRAVE",
"comma": "KEY_COMMA",
"period": "KEY_DOT",
"slash": "KEY_SLASH",
"Shift_L": "KEY_LEFTSHIFT",
"Shift_R": "KEY_RIGHTSHIFT",
"Control_L": "KEY_LEFTCTRL",
"Control_R": "KEY_RIGHTCTRL",
"Alt_L": "KEY_LEFTALT",
"Alt_R": "KEY_RIGHTALT",
"Meta_L": "KEY_LEFTMETA",
"Meta_R": "KEY_RIGHTMETA",
"Super_L": "KEY_LEFTMETA",
"Super_R": "KEY_RIGHTMETA",
"Multi_key": "KEY_COMPOSE",
"Caps_Lock": "KEY_CAPSLOCK",
"Num_Lock": "KEY_NUMLOCK",
"Scroll_Lock": "KEY_SCROLLLOCK",
"Insert": "KEY_INSERT",
"Delete": "KEY_DELETE",
"Home": "KEY_HOME",
"End": "KEY_END",
"Prior": "KEY_PAGEUP",
"Next": "KEY_PAGEDOWN",
"Page_Up": "KEY_PAGEUP",
"Page_Down": "KEY_PAGEDOWN",
"Up": "KEY_UP",
"Down": "KEY_DOWN",
"Left": "KEY_LEFT",
"Right": "KEY_RIGHT",
"KP_0": "KEY_KP0",
"KP_Insert": "KEY_KP0",
"KP_1": "KEY_KP1",
"KP_End": "KEY_KP1",
"KP_2": "KEY_KP2",
"KP_Down": "KEY_KP2",
"KP_3": "KEY_KP3",
"KP_Page_Down": "KEY_KP3",
"KP_4": "KEY_KP4",
"KP_Left": "KEY_KP4",
"KP_5": "KEY_KP5",
"KP_Begin": "KEY_KP5",
"KP_6": "KEY_KP6",
"KP_Right": "KEY_KP6",
"KP_7": "KEY_KP7",
"KP_Home": "KEY_KP7",
"KP_8": "KEY_KP8",
"KP_Up": "KEY_KP8",
"KP_9": "KEY_KP9",
"KP_Page_Up": "KEY_KP9",
"KP_Decimal": "KEY_KPDOT",
"KP_Delete": "KEY_KPDOT",
"KP_Add": "KEY_KPPLUS",
"KP_Subtract": "KEY_KPMINUS",
"KP_Multiply": "KEY_KPASTERISK",
"KP_Divide": "KEY_KPSLASH",
"KP_Enter": "KEY_KPENTER",
"KP_Equal": "KEY_KPEQUAL",
}
modifier_masks = {
"KEY_SHIFT": X.ShiftMask if X else 1,
"KEY_LEFTSHIFT": X.ShiftMask if X else 1,
"KEY_RIGHTSHIFT": X.ShiftMask if X else 1,
"KEY_CTRL": X.ControlMask if X else 4,
"KEY_LEFTCTRL": X.ControlMask if X else 4,
"KEY_RIGHTCTRL": X.ControlMask if X else 4,
"KEY_ALT": X.Mod1Mask if X else 8,
"KEY_LEFTALT": X.Mod1Mask if X else 8,
"KEY_RIGHTALT": X.Mod1Mask if X else 8,
"KEY_META": X.Mod4Mask if X else 64,
"KEY_LEFTMETA": X.Mod4Mask if X else 64,
"KEY_RIGHTMETA": X.Mod4Mask if X else 64,
}
modifier_key_names = set(modifier_masks.keys())
def __init__(self):
inputDriver.__init__(self)
self.display = None
self.root = None
self.window = None
self.window_id = None
self.active = True
self.num_lock_mask = 0
self.grabbed = set()
self.grab_signature = None
self.interesting_keys = set()
self.fenrir_keys = set()
self.failed_grabs = 0
def initialize(self, environment):
self.env = environment
if display is None:
self.fail_startup("python-xlib is not available: " + str(_x_error))
self.display = display.Display()
self.root = self.display.screen().root
self.window_id = self.resolve_window_id()
if self.window_id is None:
self.fail_startup(
"No X11 target window found. Use --x11-window-id or launch from an X11 terminal."
)
self.write_debug(
"x11Driver target window "
+ self.format_window_id(self.window_id)
+ ", active window "
+ self.format_window_id(self.get_active_window_id())
+ ", WINDOWID="
+ os.environ.get("WINDOWID", ""),
debug.DebugLevel.INFO,
)
self.window = self.display.create_resource_object("window", self.window_id)
self.window.change_attributes(
event_mask=(
X.KeyPressMask
| X.KeyReleaseMask
| X.FocusChangeMask
| X.StructureNotifyMask
)
)
self.num_lock_mask = self.find_num_lock_mask()
self.refresh_interesting_keys()
self.refresh_grabs(force=True)
self.env["runtime"]["ProcessManager"].add_custom_event_thread(
self.input_watchdog
)
self._initialized = True
def shutdown(self):
self.ungrab_all_devices()
try:
if self.display:
self.display.close()
except Exception:
pass
self._initialized = False
def fail_startup(self, message):
print("Fenrir X11 driver error: " + message, file=sys.stderr)
raise SystemExit(1)
def resolve_window_id(self):
configured_window = self.env["runtime"]["SettingsManager"].get_setting(
"keyboard", "x11_window_id"
)
for candidate in [configured_window, os.environ.get("WINDOWID", "")]:
window_id = self.parse_window_id(candidate)
if window_id is not None:
return window_id
return self.get_active_window_id()
def parse_window_id(self, value):
if value is None:
return None
value = str(value).strip()
if value == "":
return None
try:
return int(value, 0)
except ValueError:
raise X11DriverError("Invalid X11 window id: " + value)
def get_active_window_id(self):
atom = self.display.intern_atom("_NET_ACTIVE_WINDOW")
prop = self.root.get_full_property(atom, X.AnyPropertyType)
if prop is None or len(prop.value) == 0:
return None
return int(prop.value[0])
def format_window_id(self, window_id):
if window_id is None:
return "None"
return hex(int(window_id))
def find_num_lock_mask(self):
num_lock_keysym = XK.string_to_keysym("Num_Lock")
if not num_lock_keysym:
return 0
num_lock_keycode = self.display.keysym_to_keycode(num_lock_keysym)
if not num_lock_keycode:
return 0
modifier_map = self.display.get_modifier_mapping()
masks = [
X.ShiftMask,
X.LockMask,
X.ControlMask,
X.Mod1Mask,
X.Mod2Mask,
X.Mod3Mask,
X.Mod4Mask,
X.Mod5Mask,
]
for index, keycodes in enumerate(modifier_map):
if num_lock_keycode in keycodes:
return masks[index]
return 0
def input_watchdog(self, active, event_queue):
while active.value:
try:
self.refresh_grabs()
if not self.display.pending_events():
time.sleep(0.01)
continue
event = self.display.next_event()
self.handle_x_event(event, event_queue)
except BadWindow:
self.env["runtime"]["DebugManager"].write_debug_out(
"x11Driver target window disappeared",
debug.DebugLevel.ERROR,
)
break
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"x11Driver input watchdog error: " + str(e),
debug.DebugLevel.ERROR,
)
time.sleep(0.1)
def handle_x_event(self, event, event_queue):
event_type = getattr(event, "type", None)
if event_type == X.FocusIn:
self.active = True
self.clear_event_buffer()
return
if event_type == X.FocusOut:
self.active = False
self.reset_input_state()
return
if event_type not in [X.KeyPress, X.KeyRelease]:
return
if not self.active and not self.event_is_in_target_tree(event):
return
input_event = self.map_event(event)
if input_event is None:
return
key_name = input_event["event_name"]
if not self.should_emit_key(key_name):
return
self.write_debug(
"x11Driver key event "
+ key_name
+ " state "
+ str(input_event["event_state"])
+ " raw state "
+ str(getattr(event, "state", 0)),
debug.DebugLevel.INFO,
)
self.env["input"]["event_buffer"].append(input_event.copy())
event_queue.put(
{
"Type": FenrirEventType.keyboard_input,
"data": input_event,
}
)
def event_is_in_target_tree(self, event):
event_window = getattr(event, "event", None)
if event_window is None:
return False
window_id = getattr(event_window, "id", None)
if window_id is None:
return False
return window_id == self.window_id or self.window_has_ancestor(
window_id, self.window_id
)
def window_has_ancestor(self, window_id, ancestor_id):
try:
window = self.display.create_resource_object("window", window_id)
while window.id != self.root.id:
if window.id == ancestor_id:
return True
parent = window.query_tree().parent
if parent is None or parent.id == window.id:
return False
window = parent
except Exception:
return False
return False
def map_event(self, event):
key_name = self.keycode_to_key_name(event.detail, event.state)
if key_name is None:
return None
event_time = time.time()
return {
"event_name": key_name,
"event_value": event.detail,
"event_sec": int(event_time),
"event_usec": int((event_time % 1) * 1000000),
"event_state": 1 if event.type == X.KeyPress else 0,
"event_type": event.type,
}
def keycode_to_key_name(self, keycode, state=0):
key_names = []
for keysym_name in self.keycode_to_keysym_names(keycode):
key_name = self.keysym_name_to_key_name(keysym_name)
if key_name is None or key_name in key_names:
continue
key_names.append(key_name)
for key_name in key_names:
if key_name.startswith("KEY_KP"):
return key_name
if key_names:
return key_names[0]
return None
def keycode_to_keysym_names(self, keycode):
names = []
for column in range(8):
keysym = self.display.keycode_to_keysym(keycode, column)
keysym_name = self.keysym_to_name(keysym)
if keysym_name and keysym_name not in names:
names.append(keysym_name)
return names
def keysym_to_name(self, keysym):
keysym_name = XK.keysym_to_string(keysym)
if keysym_name:
return keysym_name
return KEYSYM_NAME_MAP.get(keysym)
def keysym_name_to_key_name(self, keysym_name):
if not keysym_name:
return None
if keysym_name in self.key_name_overrides:
return self.key_name_overrides[keysym_name]
if len(keysym_name) == 1:
if keysym_name.isalpha():
return "KEY_" + keysym_name.upper()
if keysym_name.isdigit():
return "KEY_" + keysym_name
if keysym_name.startswith("F") and keysym_name[1:].isdigit():
return "KEY_" + keysym_name
return None
def should_emit_key(self, key_name):
return key_name is not None
def refresh_interesting_keys(self):
input_manager = self.env["runtime"]["InputManager"]
self.fenrir_keys = set(self.env["input"]["fenrir_key"])
interesting = set(self.fenrir_keys)
interesting.update(self.env["input"]["script_key"])
interesting.update(self.modifier_key_names)
for shortcut in self.env.get("rawBindings", {}).values():
for key_name in shortcut[1]:
if key_name == "KEY_FENRIR":
interesting.update(self.fenrir_keys)
elif key_name == "KEY_SCRIPT":
interesting.update(self.env["input"]["script_key"])
else:
interesting.add(input_manager.convert_event_name(key_name))
interesting.add(key_name)
self.interesting_keys = interesting
def refresh_grabs(self, force=False):
signature = (
tuple(sorted(self.env.get("rawBindings", {}).keys())),
tuple(sorted(self.env["input"]["fenrir_key"])),
tuple(sorted(self.env["input"]["script_key"])),
)
if not force and signature == self.grab_signature:
return
self.ungrab_all_devices()
self.refresh_interesting_keys()
passive_grabs = self.build_passive_grabs()
failed_before = self.failed_grabs
for key_name, modifier_mask, include_num_lock in passive_grabs:
self.grab_key_name(key_name, modifier_mask, include_num_lock)
self.display.flush()
self.grab_signature = signature
self.write_debug(
"x11Driver passive grabs planned "
+ str(len(passive_grabs))
+ ", installed "
+ str(len(self.grabbed))
+ ", failed "
+ str(self.failed_grabs - failed_before),
debug.DebugLevel.INFO,
)
def build_passive_grabs(self):
grabs = set()
for fenrir_key in self.env["input"]["fenrir_key"]:
grabs.add((fenrir_key, 0, True))
for script_key in self.env["input"]["script_key"]:
grabs.add((script_key, 0, True))
grabs.add(("KEY_NUMLOCK", 0, True))
for shortcut in self.env.get("rawBindings", {}).values():
keys = shortcut[1]
expanded_keys = self.expand_special_keys(keys)
modifier_mask = self.shortcut_modifier_mask(expanded_keys)
non_modifier_keys = [
key for key in expanded_keys
if key not in self.modifier_key_names
]
if not non_modifier_keys:
continue
final_key = non_modifier_keys[-1]
if "KEY_FENRIR" in keys:
for fenrir_key in self.env["input"]["fenrir_key"]:
grabs.add((fenrir_key, modifier_mask, True))
fenrir_modifier_mask = self.modifier_masks.get(
fenrir_key, 0
)
if fenrir_modifier_mask:
grabs.add(
(
final_key,
modifier_mask | fenrir_modifier_mask,
not final_key.startswith("KEY_KP"),
)
)
elif "KEY_SCRIPT" in keys:
for script_key in self.env["input"]["script_key"]:
grabs.add((script_key, modifier_mask, True))
script_modifier_mask = self.modifier_masks.get(
script_key, 0
)
if script_modifier_mask:
grabs.add(
(
final_key,
modifier_mask | script_modifier_mask,
not final_key.startswith("KEY_KP"),
)
)
else:
grabs.add(
(
final_key,
modifier_mask,
not final_key.startswith("KEY_KP"),
)
)
return grabs
def expand_special_keys(self, keys):
expanded = []
for key_name in keys:
if key_name == "KEY_FENRIR":
expanded.extend(self.env["input"]["fenrir_key"])
elif key_name == "KEY_SCRIPT":
expanded.extend(self.env["input"]["script_key"])
else:
expanded.append(key_name)
return expanded
def shortcut_modifier_mask(self, keys):
modifier_mask = 0
for key_name in keys:
modifier_mask |= self.modifier_masks.get(key_name, 0)
return modifier_mask
def grab_key_name(
self, key_name, modifier_mask=0, include_num_lock=True
):
keysym_names = self.key_name_to_keysym_names(key_name)
for keysym_name in keysym_names:
keysym = XK.string_to_keysym(keysym_name)
if not keysym:
continue
keycode = self.display.keysym_to_keycode(keysym)
if not keycode:
continue
for effective_mask in self.optional_modifier_masks(
modifier_mask, include_num_lock
):
try:
self.window.grab_key(
keycode,
effective_mask,
False,
X.GrabModeAsync,
X.GrabModeAsync,
)
self.grabbed.add((keycode, effective_mask))
except BadAccess:
self.failed_grabs += 1
self.env["runtime"]["DebugManager"].write_debug_out(
"x11Driver could not grab "
+ key_name
+ " with modifier mask "
+ str(effective_mask),
debug.DebugLevel.WARNING,
)
def optional_modifier_masks(self, modifier_mask, include_num_lock=True):
optional_masks = [0, X.LockMask]
if include_num_lock and self.num_lock_mask:
optional_masks += [self.num_lock_mask, self.num_lock_mask | X.LockMask]
return {modifier_mask | optional for optional in optional_masks}
def key_name_to_keysym_names(self, key_name):
reverse_map = {
value: key for key, value in self.key_name_overrides.items()
}
aliases = {
"KEY_META": ["Super_L", "Super_R", "Meta_L", "Meta_R"],
"KEY_LEFTMETA": ["Super_L", "Meta_L"],
"KEY_RIGHTMETA": ["Super_R", "Meta_R"],
"KEY_COMPOSE": ["Multi_key"],
"KEY_PAGEUP": ["Page_Up", "Prior"],
"KEY_PAGEDOWN": ["Page_Down", "Next"],
"KEY_KP0": ["KP_0", "KP_Insert"],
"KEY_KP1": ["KP_1", "KP_End"],
"KEY_KP2": ["KP_2", "KP_Down"],
"KEY_KP3": ["KP_3", "KP_Page_Down"],
"KEY_KP4": ["KP_4", "KP_Left"],
"KEY_KP5": ["KP_5", "KP_Begin"],
"KEY_KP6": ["KP_6", "KP_Right"],
"KEY_KP7": ["KP_7", "KP_Home"],
"KEY_KP8": ["KP_8", "KP_Up"],
"KEY_KP9": ["KP_9", "KP_Page_Up"],
"KEY_KPDOT": ["KP_Decimal", "KP_Delete"],
}
if key_name in aliases:
return aliases[key_name]
if key_name in reverse_map:
return [reverse_map[key_name]]
if key_name.startswith("KEY_F") and key_name[5:].isdigit():
return [key_name[4:]]
if key_name.startswith("KEY_") and len(key_name) == 5:
return [key_name[4:].lower()]
if key_name.startswith("KEY_") and key_name[4:].isdigit():
return [key_name[4:]]
return []
def reset_input_state(self):
try:
self.env["runtime"]["InputManager"].reset_input_state()
except Exception:
self.clear_event_buffer()
def write_event_buffer(self):
self.clear_event_buffer()
def clear_event_buffer(self):
if not self._initialized:
return
del self.env["input"]["event_buffer"][:]
def write_debug(self, message, level):
try:
self.env["runtime"]["DebugManager"].write_debug_out(message, level)
except Exception:
pass
def update_input_devices(self, new_devices=None, init=False):
return
def grab_all_devices(self):
return True
def ungrab_all_devices(self):
if not self.display or not self.window:
return True
for keycode, modifier_mask in list(self.grabbed):
try:
self.window.ungrab_key(keycode, modifier_mask)
except Exception:
pass
self.grabbed.clear()
try:
self.display.flush()
except Exception:
pass
return True
def remove_all_devices(self):
self.ungrab_all_devices()
def get_led_state(self, led=0):
try:
pointer = self.root.query_pointer()
mask = getattr(pointer, "mask", 0)
if led == 0:
return bool(self.num_lock_mask and mask & self.num_lock_mask)
if led == 1:
return bool(mask & X.LockMask)
except Exception:
pass
return False
def set_led_state(self, led_dict):
return False