Improve socket handling for -x spawned fenrir instances.

This commit is contained in:
Storm Dragon
2026-05-07 23:24:54 -04:00
parent 0273f9b956
commit 8638bca1d5
53 changed files with 794 additions and 1072 deletions
+3 -18
View File
@@ -60,16 +60,6 @@ def create_argument_parser():
action='store_true',
help='Print debug messages to screen'
)
argumentParser.add_argument(
'-e', '--emulated-pty',
action='store_true',
help='Use PTY emulation with escape sequences for input (enables desktop/X/Wayland usage)'
)
argumentParser.add_argument(
'-E', '--emulated-evdev',
action='store_true',
help='Use PTY emulation with evdev for input (single instance)'
)
argumentParser.add_argument(
'-x', '--x11',
action='store_true',
@@ -102,13 +92,8 @@ def validate_arguments(cliArgs):
if option and ('#' not in option or '=' not in option):
return False, f"Invalid option format: {option}\nExpected format: SECTION#SETTING=VALUE"
emulated_modes = [
cliArgs.emulated_pty,
cliArgs.emulated_evdev,
cliArgs.x11,
]
if sum(bool(mode) for mode in emulated_modes) > 1:
return False, "Cannot combine --emulated-pty, --emulated-evdev, and --x11"
if cliArgs.x11_window_id and not cliArgs.x11:
return False, "--x11-window-id requires --x11"
return True, None
@@ -141,7 +126,7 @@ def run_fenrir():
def should_run_foreground(cliArgs):
return cliArgs.foreground or cliArgs.emulated_pty or cliArgs.x11
return cliArgs.foreground or cliArgs.x11
def main():
@@ -1,55 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
from fenrirscreenreader.core.i18n import _
class command:
def __init__(self):
pass
def initialize(self, environment):
self.env = environment
def shutdown(self):
pass
def get_description(self):
return ""
def run(self):
if not self.env["runtime"]["SettingsManager"].get_setting_as_bool(
"keyboard", "interrupt_on_key_press"
):
return
if self.env["runtime"]["InputManager"].no_key_pressed():
return
if self.env["runtime"]["ScreenManager"].is_screen_change():
return
if len(self.env["input"]["curr_input"]) <= len(
self.env["input"]["prev_input"]
):
return
# if the filter is set
if (
self.env["runtime"]["SettingsManager"]
.get_setting("keyboard", "interrupt_on_key_press_filter")
.strip()
!= ""
):
filter_list = (
self.env["runtime"]["SettingsManager"]
.get_setting("keyboard", "interrupt_on_key_press_filter")
.split(",")
)
for curr_key in self.env["input"]["curr_input"]:
if curr_key not in filter_list:
return
self.env["runtime"]["OutputManager"].interrupt_output()
def set_callback(self, callback):
pass
@@ -1,41 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
from fenrirscreenreader.core.i18n import _
class command:
def __init__(self):
pass
def initialize(self, environment):
self.env = environment
def shutdown(self):
pass
def get_description(self):
return _("disables speech until next keypress")
def run(self):
if not self.env["commandBuffer"]["enableSpeechOnKeypress"]:
return
self.env["runtime"]["SettingsManager"].set_setting(
"speech",
"enabled",
str(self.env["commandBuffer"]["enableSpeechOnKeypress"]),
)
self.env["commandBuffer"]["enableSpeechOnKeypress"] = False
# Also disable prompt watching since speech was manually re-enabled
if "silenceUntilPrompt" in self.env["commandBuffer"]:
self.env["commandBuffer"]["silenceUntilPrompt"] = False
self.env["runtime"]["OutputManager"].present_text(
_("speech enabled"), sound_icon="SpeechOn", interrupt=True
)
def set_callback(self, callback):
pass
@@ -42,19 +42,10 @@ class command:
)
if x_move > 3:
return
if self.env["runtime"]["InputManager"].get_shortcut_type() in ["KEY"]:
if self.env["runtime"][
"InputManager"
].get_last_deepest_input() in [["KEY_TAB"]]:
return
elif self.env["runtime"]["InputManager"].get_shortcut_type() in [
"BYTE"
]:
if self.env["runtime"]["ByteManager"].get_last_byte_key() in [
b" ",
b"\t",
]:
return
if self.env["runtime"][
"InputManager"
].get_last_deepest_input() in [["KEY_TAB"]]:
return
# detect deletion or chilling
if (
self.env["screen"]["new_cursor"]["x"]
@@ -81,11 +81,10 @@ class command:
if curr_char.isspace():
# Only announce spaces during pure navigation (arrow keys)
# Check if this is really navigation by looking at input history
if self.env["runtime"]["InputManager"].get_shortcut_type() in [
"KEY"
] and self.env["runtime"]["InputManager"].get_last_deepest_input()[
0
] in [
last_input = self.env["runtime"][
"InputManager"
].get_last_deepest_input()
if last_input and last_input[0] in [
"KEY_LEFT",
"KEY_RIGHT",
"KEY_UP",
@@ -33,19 +33,10 @@ class command:
current_time = time.time()
tab_detected = False
# Check KEY mode
if self.env["runtime"]["InputManager"].get_shortcut_type() in ["KEY"]:
if (self.env["runtime"]["InputManager"].get_last_deepest_input()
in [["KEY_TAB"]]):
tab_detected = True
self.env["commandBuffer"]["tabCompletion"]["lastTabTime"] = current_time
# Check BYTE mode
elif self.env["runtime"]["InputManager"].get_shortcut_type() in ["BYTE"]:
for currByte in self.env["runtime"]["ByteManager"].get_last_byte_key():
if currByte == 9: # Tab character
tab_detected = True
self.env["commandBuffer"]["tabCompletion"]["lastTabTime"] = current_time
if (self.env["runtime"]["InputManager"].get_last_deepest_input()
in [["KEY_TAB"]]):
tab_detected = True
self.env["commandBuffer"]["tabCompletion"]["lastTabTime"] = current_time
# Check if tab was pressed recently (200ms window)
if not tab_detected:
@@ -5,11 +5,16 @@
# By Chrys, Storm Dragon, and contributors.
import datetime
import os
import tempfile
import time
from fenrirscreenreader.core.i18n import _
ANNOUNCEMENT_LOCK_TIMEOUT_SEC = 5.0
class command:
def __init__(self):
pass
@@ -26,6 +31,59 @@ class command:
def get_description(self):
return "No Description found"
def _get_announcement_lock_path(self):
return os.path.join(
tempfile.gettempdir(),
f"fenrirscreenreader-{os.getuid()}-time-announcement.lock",
)
def _try_create_announcement_lock(self, announcement_slot, now):
lock_path = self._get_announcement_lock_path()
try:
lock_fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600)
except FileExistsError:
return False
with os.fdopen(lock_fd, "w", encoding="utf-8") as lock_file:
lock_file.write(f"{os.getpid()} {announcement_slot} {now}\n")
return True
def _read_announcement_lock_slot(self, lock_path):
with open(lock_path, "r", encoding="utf-8") as lock_file:
lock_content = lock_file.readline().strip().split()
if len(lock_content) < 2:
return ""
return lock_content[1]
def _claim_announcement_lock(self, announcement_slot):
now = time.time()
if self._try_create_announcement_lock(announcement_slot, now):
return True
lock_path = self._get_announcement_lock_path()
try:
lock_slot = self._read_announcement_lock_slot(lock_path)
lock_stat = os.stat(lock_path)
except FileNotFoundError:
return self._try_create_announcement_lock(announcement_slot, now)
except OSError:
return False
if lock_slot == announcement_slot:
return False
if not lock_slot and now - lock_stat.st_mtime < ANNOUNCEMENT_LOCK_TIMEOUT_SEC:
return False
try:
os.unlink(lock_path)
except FileNotFoundError:
pass
except OSError:
return False
return self._try_create_announcement_lock(announcement_slot, now)
def run(self):
if not self.env["runtime"]["SettingsManager"].get_setting_as_bool(
"time", "enabled"
@@ -50,6 +108,7 @@ class command:
if delay_sec > 0:
if int((now - self.last_time).total_seconds()) < delay_sec:
return
announcement_slot = f"delay:{int(now.timestamp()) // delay_sec}"
else:
# should announce?
if not str(now.minute).zfill(2) in on_minutes:
@@ -58,6 +117,7 @@ class command:
if now.hour == self.last_time.hour:
if now.minute == self.last_time.minute:
return
announcement_slot = f"minute:{datetime.datetime.strftime(now, '%Y%m%d%H%M')}"
date_format = self.env["runtime"]["SettingsManager"].get_setting(
"general", "date_format"
@@ -78,6 +138,10 @@ class command:
if not (present_date or present_time):
return
if not self._claim_announcement_lock(announcement_slot):
self.last_time = now
return
time_format = self.env["runtime"]["SettingsManager"].get_setting(
"general", "time_format"
)
@@ -41,20 +41,11 @@ class command:
== self.env["runtime"]["ScreenManager"].get_rows() - 1
):
return
if self.env["runtime"]["InputManager"].get_shortcut_type() in ["KEY"]:
if not (
self.env["runtime"]["InputManager"].get_last_deepest_input()
in [["KEY_UP"], ["KEY_DOWN"]]
):
return
elif self.env["runtime"]["InputManager"].get_shortcut_type() in [
"BYTE"
]:
if not (
self.env["runtime"]["ByteManager"].get_last_byte_key()
in [b"^[[A", b"^[[B"]
):
return
if not (
self.env["runtime"]["InputManager"].get_last_deepest_input()
in [["KEY_UP"], ["KEY_DOWN"]]
):
return
# Get the current cursor's line from both old and new content
prev_line = self.env["screen"]["old_content_text"].split("\n")[
@@ -37,7 +37,7 @@ class command(config_command):
self.present_text(f"Current screen driver: {current_description}")
# Cycle through the available drivers
drivers = ["vcsaDriver", "ptyDriver", "dummyDriver", "debugDriver"]
drivers = ["vcsaDriver", "ptyDriver", "dummyDriver"]
try:
current_index = drivers.index(current_driver)
next_index = (current_index + 1) % len(drivers)
@@ -29,12 +29,7 @@ class command:
self.env["runtime"]["OutputManager"].present_text(
"Okay, loading the information about Nano.", interrupt=True
)
if self.env["runtime"]["InputManager"].get_shortcut_type() in ["KEY"]:
self.env["runtime"]["InputManager"].send_keys(self.key_macro)
elif self.env["runtime"]["InputManager"].get_shortcut_type() in [
"BYTE"
]:
self.env["runtime"]["ByteManager"].send_bytes(self.byteMakro)
self.env["runtime"]["InputManager"].send_keys(self.key_macro)
def set_callback(self, callback):
pass
@@ -29,12 +29,7 @@ class command:
self.env["runtime"]["OutputManager"].present_text(
"Okay, you will now be asked to save your work.", interrupt=True
)
if self.env["runtime"]["InputManager"].get_shortcut_type() in ["KEY"]:
self.env["runtime"]["InputManager"].send_keys(self.key_macro)
elif self.env["runtime"]["InputManager"].get_shortcut_type() in [
"BYTE"
]:
self.env["runtime"]["ByteManager"].send_bytes(self.byteMakro)
self.env["runtime"]["InputManager"].send_keys(self.key_macro)
def set_callback(self, callback):
pass
@@ -18,7 +18,6 @@ class command:
# self.key_macro = [[1,'KEY_LEFTCTRL'],[1,'KEY_O'],[0.05,'SLEEP'],[0,'KEY_O'],[0,'KEY_LEFTCTRL']]
# self.key_macro = [[1,'KEY_LEFTSHIFT'],[1,'KEY_LEFTCTRL'],[1,'KEY_N'],[0.05,'SLEEP'],[0,'KEY_N'],[0,'KEY_LEFTCTRL'],[0,'KEY_LEFTSHIFT']]
self.key_macro = []
self.byteMakro = []
def shutdown(self):
pass
@@ -27,12 +26,7 @@ class command:
return "No description found"
def run(self):
if self.env["runtime"]["InputManager"].get_shortcut_type() in ["KEY"]:
self.env["runtime"]["InputManager"].send_keys(self.key_macro)
elif self.env["runtime"]["InputManager"].get_shortcut_type() in [
"BYTE"
]:
self.env["runtime"]["ByteManager"].send_bytes(self.byteMakro)
self.env["runtime"]["InputManager"].send_keys(self.key_macro)
def set_callback(self, callback):
pass
-224
View File
@@ -1,224 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
import inspect
import os
import re
import time
from fenrirscreenreader.core import debug
from fenrirscreenreader.core.eventData import FenrirEventType
from fenrirscreenreader.core.i18n import _
currentdir = os.path.dirname(
os.path.realpath(os.path.abspath(inspect.getfile(inspect.currentframe())))
)
fenrir_path = os.path.dirname(currentdir)
class ByteManager:
def __init__(self):
self.switchCtrlModeOnce = 0
self.controlMode = True
self.repeat = 1
self.lastInputTime = time.time()
self.lastByteKey = b""
def initialize(self, environment):
self.env = environment
def shutdown(self):
pass
def unify_escape_seq(self, escapeSequence):
converted_escape_sequence = escapeSequence
if converted_escape_sequence[0] == 27:
converted_escape_sequence = b"^[" + converted_escape_sequence[1:]
if len(converted_escape_sequence) > 1:
if (
converted_escape_sequence[0] == 94
and converted_escape_sequence[1] == 91
):
converted_escape_sequence = (
b"^[" + converted_escape_sequence[2:]
)
return converted_escape_sequence
def handle_byte_stream(self, event_data, sep=b"\x1b"):
buffer = event_data
# handle prefix
end_index = buffer.find(sep)
if end_index > 0:
curr_sequence = buffer[:end_index]
buffer = buffer[end_index:]
self.handle_single_byte_sequence(curr_sequence)
# special handlig for none found (performance)
elif end_index == -1:
self.handle_single_byte_sequence(buffer)
return
# handle outstanding sequence
while buffer != b"":
end_index = buffer[len(sep) :].find(sep)
if end_index == -1:
curr_sequence = buffer
buffer = b""
else:
curr_sequence = buffer[: end_index + len(sep)]
buffer = buffer[end_index + len(sep) :]
self.handle_single_byte_sequence(curr_sequence)
def handle_byte_input(self, event_data):
if not event_data:
return
if event_data == b"":
return
try:
self.env["runtime"]["DebugManager"].write_debug_out(
"handle_byte_input " + event_data.decode("utf8"),
debug.DebugLevel.INFO,
)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"ByteManager handle_byte_input: Error decoding byte data: "
+ str(e),
debug.DebugLevel.ERROR,
)
self.handle_byte_stream(event_data)
def handle_single_byte_sequence(self, event_data):
converted_escape_sequence = self.unify_escape_seq(event_data)
if self.switchCtrlModeOnce > 0:
self.switchCtrlModeOnce -= 1
is_control_mode = False
if (
self.controlMode
and not self.switchCtrlModeOnce == 1
or not self.controlMode
):
is_control_mode = self.handle_control_mode(event_data)
is_command = False
if (
self.controlMode
and not self.switchCtrlModeOnce == 1
or not self.controlMode
and self.switchCtrlModeOnce == 1
):
if self.lastByteKey == converted_escape_sequence:
if time.time() - self.lastInputTime <= self.env["runtime"][
"SettingsManager"
].get_setting_as_float("keyboard", "double_tap_timeout"):
self.repeat += 1
shortcut_data = b""
for i in range(self.repeat):
shortcut_data = shortcut_data + converted_escape_sequence
is_command = self.detect_byte_command(shortcut_data)
# fall back to single stroke - do we want this?
if not is_command:
is_command = self.detect_byte_command(
converted_escape_sequence
)
self.repeat = 1
if not (is_command or is_control_mode):
self.env["runtime"]["ScreenManager"].inject_text_to_screen(
event_data
)
if not is_command:
self.repeat = 1
self.lastByteKey = converted_escape_sequence
self.lastInputTime = time.time()
def get_last_byte_key(self):
return self.lastByteKey
def handle_control_mode(self, escapeSequence):
converted_escape_sequence = self.unify_escape_seq(escapeSequence)
if converted_escape_sequence == b"^[R":
self.controlMode = not self.controlMode
self.switchCtrlModeOnce = 0
if self.controlMode:
self.env["runtime"]["OutputManager"].present_text(
_("Sticky Mode On"),
sound_icon="Accept",
interrupt=True,
flush=True,
)
else:
self.env["runtime"]["OutputManager"].present_text(
_("Sticky Mode On"),
sound_icon="Cancel",
interrupt=True,
flush=True,
)
return True
if converted_escape_sequence == b"^[:":
self.switchCtrlModeOnce = 2
self.env["runtime"]["OutputManager"].present_text(
_("bypass"), sound_icon="PTYBypass", interrupt=True, flush=True
)
return True
return False
def send_bytes(self, byteMacro):
pass
def detect_byte_command(self, escapeSequence):
converted_escape_sequence = self.unify_escape_seq(escapeSequence)
command = self.env["runtime"]["InputManager"].get_command_for_shortcut(
converted_escape_sequence
)
if command != "":
self.env["runtime"]["EventManager"].put_to_event_queue(
FenrirEventType.execute_command, command
)
command = ""
return True
return False
def load_byte_shortcuts(
self, kb_config_path=fenrir_path + "/../../config/keyboard/pty.conf"
):
kb_config = open(kb_config_path, "r")
while True:
line = kb_config.readline()
if not line:
break
line = line.replace("\n", "")
if line.replace(" ", "") == "":
continue
if line.replace(" ", "").startswith("#"):
continue
if line.count("=") != 1:
continue
values = line.split("=")
clean_shortcut = bytes(values[0], "UTF-8")
repeat = 1
if len(clean_shortcut) > 2:
if chr(clean_shortcut[1]) == ",":
try:
repeat = int(chr(clean_shortcut[0]))
clean_shortcut = clean_shortcut[2:]
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"ByteManager load_byte_shortcuts: Error parsing repeat count: "
+ str(e),
debug.DebugLevel.ERROR,
)
repeat = 1
clean_shortcut = clean_shortcut
shortcut = b""
for i in range(repeat):
shortcut += clean_shortcut
command_name = values[1].upper()
self.env["bindings"][shortcut] = command_name
self.env["runtime"]["DebugManager"].write_debug_out(
"Byte Shortcut: " + str(shortcut) + " command:" + command_name,
debug.DebugLevel.INFO,
on_any_level=True,
)
kb_config.close()
+1 -2
View File
@@ -18,8 +18,7 @@ class FenrirEventType(Enum):
screen_changed = 5
heart_beat = 6
execute_command = 7
byte_input = 8
remote_incomming = 9
remote_incomming = 8
def __int__(self):
return self.value
@@ -61,8 +61,6 @@ class EventManager:
self.env["runtime"]["FenrirManager"].handle_heart_beat(event)
elif event["Type"] == FenrirEventType.execute_command:
self.env["runtime"]["FenrirManager"].handle_execute_command(event)
elif event["Type"] == FenrirEventType.byte_input:
self.env["runtime"]["FenrirManager"].handle_byte_input(event)
elif event["Type"] == FenrirEventType.remote_incomming:
self.env["runtime"]["FenrirManager"].handle_remote_incomming(event)
+13 -15
View File
@@ -10,6 +10,7 @@ import sys
import time
from fenrirscreenreader.core import debug
from fenrirscreenreader.core import remoteInstanceRegistry
from fenrirscreenreader.core import settingsManager
from fenrirscreenreader.core.eventData import FenrirEventType
from fenrirscreenreader.core.i18n import _
@@ -132,16 +133,6 @@ class FenrirManager:
"onKeyInput"
)
def handle_byte_input(self, event):
if not event["data"] or event["data"] == b"":
return
self.environment["runtime"]["ByteManager"].handle_byte_input(
event["data"]
)
self.environment["runtime"]["CommandManager"].execute_default_trigger(
"onByteInput"
)
def handle_execute_command(self, event):
if not event["data"] or event["data"] == "":
return
@@ -456,6 +447,7 @@ class FenrirManager:
# Clean up socket files that might not be removed by the driver
try:
socket_file = None
screen_driver = None
if (
"runtime" in self.environment
and "SettingsManager" in self.environment["runtime"]
@@ -466,14 +458,19 @@ class FenrirManager:
].get_setting("remote", "socket_file")
except Exception:
pass # Use default socket file path
try:
screen_driver = self.environment["runtime"][
"SettingsManager"
].get_setting("screen", "driver")
except Exception:
pass
if not socket_file:
# Use default socket file paths
socket_file = "/tmp/fenrirscreenreader-deamon.sock"
if os.path.exists(socket_file):
os.unlink(socket_file)
if screen_driver == "vcsaDriver":
socket_file = "/tmp/fenrirscreenreader-deamon.sock"
if os.path.exists(socket_file):
os.unlink(socket_file)
# Also try PID-based socket file
pid_socket_file = (
"/tmp/fenrirscreenreader-"
+ str(os.getpid())
@@ -483,6 +480,7 @@ class FenrirManager:
os.unlink(pid_socket_file)
elif os.path.exists(socket_file):
os.unlink(socket_file)
remoteInstanceRegistry.remove_instance()
except Exception:
pass # Ignore errors during socket cleanup
@@ -14,7 +14,6 @@ general_data = {
"managerList": [
"AttributeManager",
"PunctuationManager",
"ByteManager",
"CursorManager",
"ApplicationManager",
"CommandManager",
@@ -40,7 +39,6 @@ general_data = {
"commandFolderList": [
"commands",
"onKeyInput",
"onByteInput",
"onCursorChange",
"onScreenUpdate",
"onScreenChanged",
@@ -15,7 +15,6 @@ class InputDriver:
def initialize(self, environment):
self.env = environment
self.env["runtime"]["InputManager"].set_shortcut_type("KEY")
self._is_initialized = True
def shutdown(self):
@@ -21,17 +21,9 @@ fenrir_path = os.path.dirname(currentdir)
class InputManager:
def __init__(self):
self.shortcutType = "KEY"
self.executeDeviceGrab = False
self.lastDetectedDevices = None
def set_shortcut_type(self, shortcutType="KEY"):
if shortcutType in ["KEY", "BYTE"]:
self.shortcutType = shortcutType
def get_shortcut_type(self):
return self.shortcutType
def initialize(self, environment):
self.env = environment
self.env["runtime"]["SettingsManager"].load_driver(
@@ -0,0 +1,82 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
import json
import os
import tempfile
import time
INSTANCE_TIMEOUT_SEC = 30.0
def get_registry_dir():
return os.path.join(
tempfile.gettempdir(),
f"fenrirscreenreader-instances-{os.getuid()}",
)
def get_instance_file(pid=None):
if pid is None:
pid = os.getpid()
return os.path.join(get_registry_dir(), f"{pid}.json")
def write_instance(instance_data):
registry_dir = get_registry_dir()
os.makedirs(registry_dir, mode=0o700, exist_ok=True)
os.chmod(registry_dir, 0o700)
instance_data["updated_at"] = time.time()
instance_path = get_instance_file(instance_data.get("pid"))
with open(instance_path, "w", encoding="utf-8") as instance_file:
json.dump(instance_data, instance_file, sort_keys=True)
instance_file.write("\n")
def remove_instance(pid=None):
try:
os.unlink(get_instance_file(pid))
except FileNotFoundError:
pass
def process_exists(pid):
try:
os.kill(pid, 0)
return True
except OSError:
return False
def list_instances():
registry_dir = get_registry_dir()
instances = []
try:
instance_files = os.listdir(registry_dir)
except FileNotFoundError:
return instances
now = time.time()
for instance_name in instance_files:
instance_path = os.path.join(registry_dir, instance_name)
try:
with open(instance_path, "r", encoding="utf-8") as instance_file:
instance_data = json.load(instance_file)
pid = int(instance_data.get("pid", 0))
updated_at = float(instance_data.get("updated_at", 0))
except (OSError, ValueError, TypeError, json.JSONDecodeError):
continue
if not pid or not process_exists(pid) or now - updated_at > INSTANCE_TIMEOUT_SEC:
try:
os.unlink(instance_path)
except OSError:
pass
continue
instances.append(instance_data)
return sorted(instances, key=lambda instance: int(instance.get("pid", 0)))
@@ -26,14 +26,21 @@ command interrupt
"""
import hashlib
import os
import tempfile
import time
from fenrirscreenreader.core import debug
from fenrirscreenreader.core import remoteInstanceRegistry
from fenrirscreenreader.core.eventData import FenrirEventType
from fenrirscreenreader.core.i18n import _
REMOTE_COMMAND_LOCK_TIMEOUT_SEC = 2.0
REMOTE_COMMAND_LOCK_PREFIX = f"fenrirscreenreader-{os.getuid()}-remote-"
class RemoteManager:
def __init__(self):
# command controll
@@ -48,6 +55,8 @@ class RemoteManager:
self.resetWindowConst = "RESETWINDOW"
self.setClipboardConst = "CLIPBOARD "
self.exportClipboardConst = "EXPORTCLIPBOARD"
self.listInstancesConst = "LS"
self.listInstancesLongConst = "LIST"
# setting controll
self.settingConst = "SETTING "
self.setSettingConst = "SET "
@@ -208,6 +217,14 @@ class RemoteManager:
"success": True,
"message": "Clipboard exported to file",
}
elif upper_command_text in (
self.listInstancesConst,
self.listInstancesLongConst,
):
return {
"success": True,
"message": self.list_instances(),
}
else:
return {
"success": False,
@@ -257,6 +274,102 @@ class RemoteManager:
self.set_clipboard(parameter_text)
elif upper_command_text.startswith(self.exportClipboardConst):
self.export_clipboard()
elif upper_command_text in (
self.listInstancesConst,
self.listInstancesLongConst,
):
return
def _get_remote_command_lock_path(self, event_data):
event_hash = hashlib.sha256(event_data.encode("utf-8")).hexdigest()
return os.path.join(
tempfile.gettempdir(),
f"{REMOTE_COMMAND_LOCK_PREFIX}{event_hash}.lock",
)
def _cleanup_stale_remote_command_locks(self, now):
try:
lock_files = os.listdir(tempfile.gettempdir())
except OSError:
return
for lock_file in lock_files:
if not lock_file.startswith(REMOTE_COMMAND_LOCK_PREFIX):
continue
lock_path = os.path.join(tempfile.gettempdir(), lock_file)
try:
lock_age = now - os.stat(lock_path).st_mtime
if lock_age > REMOTE_COMMAND_LOCK_TIMEOUT_SEC:
os.unlink(lock_path)
except OSError:
pass
def _try_create_remote_command_lock(self, lock_path, now):
try:
lock_fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600)
except FileExistsError:
return False
with os.fdopen(lock_fd, "w", encoding="utf-8") as lock_file:
lock_file.write(f"{os.getpid()} {now}\n")
return True
def _claim_remote_command(self, event_data):
lock_path = self._get_remote_command_lock_path(event_data)
now = time.time()
self._cleanup_stale_remote_command_locks(now)
if self._try_create_remote_command_lock(lock_path, now):
return True
try:
with open(lock_path, "r", encoding="utf-8") as lock_file:
lock_parts = lock_file.readline().strip().split()
lock_pid = int(lock_parts[0]) if lock_parts else 0
lock_stat = os.stat(lock_path)
except FileNotFoundError:
return self._try_create_remote_command_lock(lock_path, now)
except (OSError, ValueError):
return False
if lock_pid == os.getpid():
return True
if now - lock_stat.st_mtime < REMOTE_COMMAND_LOCK_TIMEOUT_SEC:
return False
try:
os.unlink(lock_path)
except FileNotFoundError:
pass
except OSError:
return False
return self._try_create_remote_command_lock(lock_path, now)
def list_instances(self):
instances = remoteInstanceRegistry.list_instances()
if not instances:
return "No Fenrir instances registered"
lines = []
for instance in instances:
socket_files = ", ".join(instance.get("socket_files", []))
x11_window_id = instance.get("x11_window_id") or "none"
main_socket = "yes" if instance.get("main_socket") else "no"
lines.append(
"pid={pid} ppid={ppid} screen={screen} keyboard={keyboard} "
"main_socket={main_socket} x11_window_id={x11_window_id} "
"sockets={sockets}".format(
pid=instance.get("pid", ""),
ppid=instance.get("ppid", ""),
screen=instance.get("screen_driver", ""),
keyboard=instance.get("keyboard_driver", ""),
main_socket=main_socket,
x11_window_id=x11_window_id,
sockets=socket_files,
)
)
return "\n".join(lines)
def temp_disable_speech(self):
self.env["runtime"]["OutputManager"].temp_disable_speech()
@@ -381,6 +494,14 @@ class RemoteManager:
)
try:
if upper_event_data in (
self.listInstancesConst,
self.listInstancesLongConst,
):
return {
"success": True,
"message": self.list_instances(),
}
if upper_event_data.startswith(self.settingConst):
settings_text = event_data[len(self.settingConst) :]
return self.handle_settings_change_with_response(settings_text)
@@ -406,6 +527,9 @@ class RemoteManager:
debug.DebugLevel.INFO,
)
if not self._claim_remote_command(event_data):
return
if upper_event_data.startswith(self.settingConst):
settings_text = event_data[len(self.settingConst) :]
self.handle_settings_change(settings_text)
+27 -89
View File
@@ -11,7 +11,6 @@ from configparser import ConfigParser
from fenrirscreenreader.core import applicationManager
from fenrirscreenreader.core import attributeManager
from fenrirscreenreader.core import barrierManager
from fenrirscreenreader.core import byteManager
from fenrirscreenreader.core import commandManager
from fenrirscreenreader.core import cursorManager
from fenrirscreenreader.core import debug
@@ -410,9 +409,7 @@ class SettingsManager:
if setting == "driver":
valid_drivers = [
"evdevDriver",
"ptyDriver",
"x11Driver",
"atspiDriver",
"dummyDriver",
]
if value not in valid_drivers:
@@ -496,18 +493,6 @@ class SettingsManager:
if cliArgs.print:
self.set_setting("general", "debug_level", 3)
self.set_setting("general", "debug_mode", "PRINT")
if cliArgs.emulated_pty:
# Set PTY driver settings
pty_settings = {
"screen": {"driver": "ptyDriver"},
"keyboard": {"driver": "ptyDriver", "keyboard_layout": "pty"}
}
for section, settings in pty_settings.items():
for key, value in settings.items():
self.set_setting(section, key, value)
if cliArgs.emulated_evdev:
self.set_setting("screen", "driver", "ptyDriver")
self.set_setting("keyboard", "driver", "evdevDriver")
if cliArgs.x11:
self.set_setting("screen", "driver", "ptyDriver")
self.set_setting("keyboard", "driver", "x11Driver")
@@ -631,9 +616,6 @@ class SettingsManager:
environment["runtime"]["OutputManager"] = outputManager.OutputManager()
environment["runtime"]["OutputManager"].initialize(environment)
environment["runtime"]["ByteManager"] = byteManager.ByteManager()
environment["runtime"]["ByteManager"].initialize(environment)
environment["runtime"]["InputManager"] = inputManager.InputManager()
environment["runtime"]["InputManager"].initialize(environment)
@@ -656,89 +638,45 @@ class SettingsManager:
] = diffReviewManager.DiffReviewManager()
environment["runtime"]["DiffReviewManager"].initialize(environment)
if environment["runtime"]["InputManager"].get_shortcut_type() == "KEY":
if not os.path.exists(
self.get_setting("keyboard", "keyboard_layout")
if not os.path.exists(
self.get_setting("keyboard", "keyboard_layout")
):
if os.path.exists(
settings_root
+ "keyboard/"
+ self.get_setting("keyboard", "keyboard_layout")
):
if os.path.exists(
self.set_setting(
"keyboard",
"keyboard_layout",
settings_root
+ "keyboard/"
+ self.get_setting("keyboard", "keyboard_layout")
):
self.set_setting(
"keyboard",
"keyboard_layout",
settings_root
+ "keyboard/"
+ self.get_setting("keyboard", "keyboard_layout"),
)
environment["runtime"]["InputManager"].load_shortcuts(
self.get_setting("keyboard", "keyboard_layout")
)
if os.path.exists(
settings_root
+ "keyboard/"
+ self.get_setting("keyboard", "keyboard_layout")
+ ".conf"
):
self.set_setting(
"keyboard",
"keyboard_layout",
settings_root
+ "keyboard/"
+ self.get_setting("keyboard", "keyboard_layout")
+ ".conf",
)
environment["runtime"]["InputManager"].load_shortcuts(
self.get_setting("keyboard", "keyboard_layout")
)
else:
+ self.get_setting("keyboard", "keyboard_layout"),
)
environment["runtime"]["InputManager"].load_shortcuts(
self.get_setting("keyboard", "keyboard_layout")
)
elif (
environment["runtime"]["InputManager"].get_shortcut_type()
== "BYTE"
):
if not os.path.exists(
self.get_setting("keyboard", "keyboard_layout")
if os.path.exists(
settings_root
+ "keyboard/"
+ self.get_setting("keyboard", "keyboard_layout")
+ ".conf"
):
if os.path.exists(
self.set_setting(
"keyboard",
"keyboard_layout",
settings_root
+ "keyboard/"
+ self.get_setting("keyboard", "keyboard_layout")
):
self.set_setting(
"keyboard",
"keyboard_layout",
settings_root
+ "keyboard/"
+ self.get_setting("keyboard", "keyboard_layout"),
)
environment["runtime"]["ByteManager"].load_byte_shortcuts(
self.get_setting("keyboard", "keyboard_layout")
)
if os.path.exists(
settings_root
+ "keyboard/"
+ self.get_setting("keyboard", "keyboard_layout")
+ ".conf"
):
self.set_setting(
"keyboard",
"keyboard_layout",
settings_root
+ "keyboard/"
+ self.get_setting("keyboard", "keyboard_layout")
+ ".conf",
)
environment["runtime"]["ByteManager"].load_byte_shortcuts(
self.get_setting("keyboard", "keyboard_layout")
)
else:
environment["runtime"]["ByteManager"].load_byte_shortcuts(
+ ".conf",
)
environment["runtime"]["InputManager"].load_shortcuts(
self.get_setting("keyboard", "keyboard_layout")
)
else:
environment["runtime"]["InputManager"].load_shortcuts(
self.get_setting("keyboard", "keyboard_layout")
)
environment["runtime"]["CursorManager"] = cursorManager.CursorManager()
environment["runtime"]["CursorManager"].initialize(environment)
+2 -6
View File
@@ -33,9 +33,7 @@ class VmenuManager:
self.env = environment
# use default path
self.defaultVMenuPath = (
fenrir_path
+ "/commands/vmenu-profiles/"
+ self.env["runtime"]["InputManager"].get_shortcut_type()
fenrir_path + "/commands/vmenu-profiles/KEY"
)
# if there is no user configuration
if (
@@ -49,9 +47,7 @@ class VmenuManager:
].get_setting("menu", "vmenu_path")
if not self.defaultVMenuPath.endswith("/"):
self.defaultVMenuPath += "/"
self.defaultVMenuPath += self.env["runtime"][
"InputManager"
].get_shortcut_type()
self.defaultVMenuPath += "KEY"
self.create_menu_tree()
self.closeAfterAction = False
@@ -16,7 +16,6 @@ class driver(inputDriver):
def initialize(self, environment):
self.env = environment
self.env["runtime"]["InputManager"].set_shortcut_type("KEY")
self._initialized = True
print("Input Debug Driver: Initialized")
@@ -87,7 +87,6 @@ class driver(inputDriver):
if libraries are not available.
"""
self.env = environment
self.env["runtime"]["InputManager"].set_shortcut_type("KEY")
global _evdevAvailable
global _udevAvailable
global _evdevAvailableError
@@ -1,215 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
import time
from fenrirscreenreader.core import debug
from fenrirscreenreader.core.inputDriver import InputDriver as inputDriver
class driver(inputDriver):
"""PTY (Pseudo-terminal) input driver for Fenrir screen reader.
This driver provides input handling for terminal emulation environments
where direct device access (evdev) is not available or appropriate.
It uses byte-based input processing instead of key event processing.
This is primarily used when running Fenrir in terminal emulators,
desktop environments, or other contexts where traditional TTY device
access is not available.
Features:
- Byte-based input processing
- Terminal emulation compatibility
- Simplified input handling for non-TTY environments
"""
def __init__(self):
self._is_initialized = False
inputDriver.__init__(self)
def initialize(self, environment):
"""Initialize the PTY input driver.
Sets the input manager to use byte-based shortcuts instead of
key-based shortcuts, enabling proper operation in terminal
emulation environments.
Args:
environment: Fenrir environment dictionary
Returns:
bool: True if initialization successful, False otherwise
"""
try:
if environment is None:
raise ValueError("Environment cannot be None")
self.env = environment
# Validate required managers are available
if "runtime" not in self.env:
raise ValueError("Runtime environment missing")
if "InputManager" not in self.env["runtime"]:
raise ValueError("InputManager not available")
self.env["runtime"]["InputManager"].set_shortcut_type("BYTE")
self._is_initialized = True
self.env["runtime"]["DebugManager"].write_debug_out(
"PTY inputDriver: Initialized with byte-based shortcuts",
debug.DebugLevel.INFO
)
return True
except Exception as e:
# Log error if possible, otherwise fallback to print
try:
if hasattr(self, 'env') and self.env and "runtime" in self.env:
self.env["runtime"]["DebugManager"].write_debug_out(
f"PTY inputDriver: Initialization failed: {e}",
debug.DebugLevel.ERROR
)
else:
print(f"PTY inputDriver initialization error: {e}")
except:
print(f"PTY inputDriver initialization error: {e}")
self._is_initialized = False
return False
def shutdown(self):
"""Shutdown the PTY input driver.
Performs cleanup operations when the driver is being stopped.
For PTY driver, this involves cleaning up any resources and
logging the shutdown.
"""
if not self._is_initialized:
return
try:
self.env["runtime"]["DebugManager"].write_debug_out(
"PTY inputDriver: Shutting down",
debug.DebugLevel.INFO
)
except Exception as e:
# Fallback logging if debug manager is unavailable
print(f"PTY inputDriver shutdown error: {e}")
finally:
self._is_initialized = False
def get_input_event(self):
"""Get input event from PTY.
For PTY driver, input events are handled through the byte-based
shortcut system rather than direct device events. This method
returns None as PTY input is processed through the screen driver
and InputManager's byte processing.
Returns:
None: PTY driver uses byte-based processing, not event-based
"""
return None
def is_device_connected(self):
"""Check if PTY input device is connected.
For PTY driver, the "device" is the terminal interface itself,
which is considered connected if the driver is initialized.
Returns:
bool: True if driver is initialized, False otherwise
"""
return self._is_initialized
def get_device_name(self):
"""Get the name of the PTY input device.
Returns:
str: Human-readable name of the PTY input device
"""
return "PTY (Pseudo-terminal) Input"
def grab_devices(self, grab=True):
"""Grab or release input devices.
For PTY driver, device grabbing is not applicable since input
is processed through terminal emulation rather than direct
device access.
Args:
grab (bool): Whether to grab (True) or release (False) devices
Returns:
bool: Always returns True for PTY driver (no-op success)
"""
if not self._is_initialized:
return False
action = "grab" if grab else "release"
self.env["runtime"]["DebugManager"].write_debug_out(
f"PTY inputDriver: {action} devices (no-op for PTY)",
debug.DebugLevel.INFO
)
return True
def has_device_detection(self):
"""Check if driver supports device detection.
PTY driver does not support dynamic device detection since
it operates on the terminal interface directly.
Returns:
bool: Always False for PTY driver
"""
return False
def get_device_list(self):
"""Get list of available input devices.
For PTY driver, there is only one logical device - the terminal
interface itself.
Returns:
list: Single-item list containing PTY device info
"""
if not self._is_initialized:
return []
return [{
'name': 'PTY Terminal',
'path': '/dev/pts/*',
'type': 'terminal',
'connected': True
}]
def get_led_state(self, led_mask=None):
"""Get LED state information.
PTY driver cannot access LED states since it operates through
terminal emulation rather than direct hardware access.
Args:
led_mask: LED mask parameter (ignored for PTY)
Returns:
dict: Empty dict (no LED access for PTY)
"""
return {}
def set_led_state(self, led_dict):
"""Set LED states.
PTY driver cannot control LEDs since it operates through
terminal emulation rather than direct hardware access.
Args:
led_dict (dict): LED state dictionary (ignored for PTY)
Returns:
bool: Always False (LED control not supported)
"""
return False
@@ -160,7 +160,6 @@ class driver(inputDriver):
def initialize(self, environment):
self.env = environment
self.env["runtime"]["InputManager"].set_shortcut_type("KEY")
if display is None:
self.fail_startup("python-xlib is not available: " + str(_x_error))
self.display = display.Display()
+148 -52
View File
@@ -8,15 +8,22 @@ import os
import os.path
import select
import socket
import time
from fenrirscreenreader.core import debug
from fenrirscreenreader.core import remoteInstanceRegistry
from fenrirscreenreader.core.eventData import FenrirEventType
from fenrirscreenreader.core.remoteDriver import RemoteDriver as remoteDriver
MAIN_SOCKET_FILE = "/tmp/fenrirscreenreader-deamon.sock"
class driver(remoteDriver):
def __init__(self):
remoteDriver.__init__(self)
self.fenrirSocks = []
self.socket_files = []
def initialize(self, environment):
self.env = environment
@@ -26,9 +33,7 @@ class driver(remoteDriver):
self.watch_dog, multiprocess=False
)
def watch_dog(self, active, event_queue):
# echo "command say this is a test" | socat -
# UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
def _get_configured_socket_file(self):
socket_file = ""
try:
socket_file = self.env["runtime"]["SettingsManager"].get_setting(
@@ -40,62 +45,153 @@ class driver(remoteDriver):
+ str(e),
debug.DebugLevel.ERROR,
)
if socket_file == "":
if (
self.env["runtime"]["SettingsManager"].get_setting(
"screen", "driver"
)
== "vcsaDriver"
):
socket_file = "/tmp/fenrirscreenreader-deamon.sock"
else:
socket_file = (
"/tmp/fenrirscreenreader-" + str(os.getppid()) + ".sock"
)
return socket_file
def _get_socket_candidates(self):
configured_socket_file = self._get_configured_socket_file()
if configured_socket_file:
return [(configured_socket_file, False)]
screen_driver = self.env["runtime"]["SettingsManager"].get_setting(
"screen", "driver"
)
if screen_driver == "vcsaDriver":
return [(MAIN_SOCKET_FILE, False)]
private_socket_file = (
"/tmp/fenrirscreenreader-" + str(os.getpid()) + ".sock"
)
return [(private_socket_file, False), (MAIN_SOCKET_FILE, True)]
def _is_socket_active(self, socket_file):
test_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
test_sock.settimeout(0.2)
test_sock.connect(socket_file)
return True
except OSError:
return False
finally:
test_sock.close()
def _bind_socket(self, socket_file, optional):
if os.path.exists(socket_file):
if optional and self._is_socket_active(socket_file):
return None
os.unlink(socket_file)
self.fenrirSock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.fenrirSock.bind(socket_file)
os.chmod(socket_file, 0o666) # Allow all users to read/write
self.fenrirSock.listen(1)
fenrir_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
fenrir_sock.bind(socket_file)
os.chmod(socket_file, 0o666) # Allow all users to read/write
fenrir_sock.listen(1)
except OSError:
fenrir_sock.close()
if optional:
return None
raise
return fenrir_sock
def _register_instance(self):
settings_manager = self.env["runtime"]["SettingsManager"]
instance_data = {
"pid": os.getpid(),
"ppid": os.getppid(),
"socket_files": self.socket_files,
"main_socket": MAIN_SOCKET_FILE in self.socket_files,
"screen_driver": settings_manager.get_setting("screen", "driver"),
"keyboard_driver": settings_manager.get_setting("keyboard", "driver"),
"x11_window_id": settings_manager.get_setting(
"keyboard", "x11_window_id"
),
"created_at": time.time(),
}
remoteInstanceRegistry.write_instance(instance_data)
def _cleanup(self):
for fenrir_sock in self.fenrirSocks:
try:
fenrir_sock.close()
except OSError:
pass
self.fenrirSocks = []
for socket_file in self.socket_files:
try:
if os.path.exists(socket_file):
os.unlink(socket_file)
except OSError:
pass
self.socket_files = []
remoteInstanceRegistry.remove_instance()
def _handle_client(self, client_sock, event_queue):
try:
rawdata = client_sock.recv(8129)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"unixDriver watch_dog: Error receiving data from client: "
+ str(e),
debug.DebugLevel.ERROR,
)
rawdata = b""
try:
data = rawdata.decode("utf-8").rstrip().lstrip()
upper_data = data.upper()
if upper_data in ("LS", "LIST", "COMMAND LS", "COMMAND LIST"):
response = self.env["runtime"][
"RemoteManager"
].handle_remote_incomming_with_response(data)
client_sock.sendall((response["message"] + "\n").encode("utf-8"))
return
event_queue.put(
{
"Type": FenrirEventType.remote_incomming,
"data": data,
}
)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"unixDriver watch_dog: Error decoding/queuing data: "
+ str(e),
debug.DebugLevel.ERROR,
)
def watch_dog(self, active, event_queue):
# echo "command say this is a test" | socat -
# UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
for socket_file, optional in self._get_socket_candidates():
fenrir_sock = self._bind_socket(socket_file, optional)
if fenrir_sock is None:
continue
self.fenrirSocks.append(fenrir_sock)
self.socket_files.append(socket_file)
if not self.fenrirSocks:
return
self._register_instance()
last_register = time.time()
while active.value:
if time.time() - last_register > 10.0:
self._register_instance()
last_register = time.time()
# Check if the client is still connected and if data is available:
try:
r, _, _ = select.select([self.fenrirSock], [], [], 0.8)
r, _, _ = select.select(self.fenrirSocks, [], [], 0.8)
except select.error:
break
if r == []:
continue
if self.fenrirSock in r:
client_sock, client_addr = self.fenrirSock.accept()
for fenrir_sock in r:
client_sock, client_addr = fenrir_sock.accept()
# Ensure client socket is always closed to prevent resource
# leaks
try:
try:
rawdata = client_sock.recv(8129)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"unixDriver watch_dog: Error receiving data from "
"client: "
+ str(e),
debug.DebugLevel.ERROR,
)
rawdata = b"" # Set default empty data if recv fails
try:
data = rawdata.decode("utf-8").rstrip().lstrip()
event_queue.put(
{
"Type": FenrirEventType.remote_incomming,
"data": data,
}
)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"unixDriver watch_dog: Error decoding/queuing data: "
+ str(e),
debug.DebugLevel.ERROR,
)
self._handle_client(client_sock, event_queue)
finally:
# Always close client socket, even if data processing fails
try:
@@ -106,8 +202,8 @@ class driver(remoteDriver):
+ str(e),
debug.DebugLevel.ERROR,
)
if self.fenrirSock:
self.fenrirSock.close()
self.fenrirSock = None
if os.path.exists(socket_file):
os.unlink(socket_file)
self._cleanup()
def shutdown(self):
self._cleanup()
remoteDriver.shutdown(self)
@@ -254,9 +254,6 @@ class driver(screenDriver):
# Load configurable timeouts from settings
self._load_pty_settings()
self.shortcutType = self.env["runtime"][
"InputManager"
].get_shortcut_type()
self.env["runtime"]["ProcessManager"].add_custom_event_thread(
self.terminal_emulation
)
@@ -403,31 +400,23 @@ class driver(screenDriver):
}
)
break
if self.shortcutType == "KEY":
try:
self.inject_text_to_screen(msg_bytes)
except Exception as e:
self.env["runtime"][
"DebugManager"
].write_debug_out(
"ptyDriver getInputData: Error injecting text to screen: "
+ str(e),
debug.DebugLevel.ERROR,
)
event_queue.put(
{
"Type": FenrirEventType.stop_main_loop,
"data": None,
}
)
break
else:
try:
self.inject_text_to_screen(msg_bytes)
except Exception as e:
self.env["runtime"][
"DebugManager"
].write_debug_out(
"ptyDriver getInputData: Error injecting text to screen: "
+ str(e),
debug.DebugLevel.ERROR,
)
event_queue.put(
{
"Type": FenrirEventType.byte_input,
"data": msg_bytes,
"Type": FenrirEventType.stop_main_loop,
"data": None,
}
)
break
# output
if self.p_out in r:
try: