# Files
# fenrir/src/fenrirscreenreader/core/fenrirManager.py
# 2025-07-07 00:42:23 -04:00
#
# 455 lines
# 17 KiB
# Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
import os
import signal
import sys
import time
from fenrirscreenreader.core import debug
from fenrirscreenreader.core import settingsManager
from fenrirscreenreader.core.eventData import FenrirEventType
from fenrirscreenreader.core.i18n import _
class FenrirManager:
    """Coordinate start-up, event handling and shutdown of Fenrir.

    The instance owns ``self.environment``, the nested dictionary returned
    by ``SettingsManager.init_fenrir_config``.  Its ``"runtime"`` entry maps
    manager names (``"InputManager"``, ``"CommandManager"``, ...) to the
    objects the ``handle_*`` methods delegate to.  Each ``handle_*`` method
    takes an event dictionary and reads its ``"data"`` entry; they are
    presumably invoked by the event manager's main loop (the caller is not
    visible in this module).
    """

    def __init__(self, cliArgs):
        """Initialize Fenrir and install the SIGINT/SIGTERM handlers.

        Args:
            cliArgs: Command line arguments, passed through unchanged to
                ``SettingsManager.init_fenrir_config``.

        Raises:
            RuntimeError: If ``init_fenrir_config`` returns a falsy value.
            Exception: Any other initialization error is re-raised after
                ``cleanup_on_error`` has run.
        """
        self.is_initialized = False
        self.environment = None
        self.signal_handlers_set = False
        # Shortcut-detection state.  Set up front so the attributes exist
        # even when initialization fails part-way through.
        self.modifierInput = False
        self.singleKeyCommand = False
        self.command = ""
        try:
            self.environment = (
                settingsManager.SettingsManager().init_fenrir_config(
                    cliArgs, self
                )
            )
            if not self.environment:
                raise RuntimeError(
                    "Cannot Initialize. Maybe the configfile is not available or not parseable"
                )
            self.environment["runtime"]["OutputManager"].present_text(
                _("Start Fenrir"), sound_icon="ScreenReaderOn", interrupt=True
            )
            # Set signal handlers after successful initialization
            signal.signal(signal.SIGINT, self.capture_signal)
            signal.signal(signal.SIGTERM, self.capture_signal)
            self.signal_handlers_set = True
            self.is_initialized = True
            self.set_process_name()
        except Exception:
            # Clean up any partial initialization, then let the caller see
            # the original error.
            self.cleanup_on_error()
            raise

    def _log_error(self, message):
        """Write *message* to the debug manager at ERROR level, best effort.

        Failures are swallowed on purpose: this is used from error paths
        (including the signal handler) where the environment may already be
        ``None`` or partly torn down, and logging must not raise there.
        """
        try:
            self.environment["runtime"]["DebugManager"].write_debug_out(
                message, debug.DebugLevel.ERROR
            )
        except Exception:
            pass  # Nowhere left to report the problem to

    def _runtime_manager(self, name):
        """Return the truthy runtime entry stored under *name*, else None."""
        try:
            return self.environment["runtime"][name] or None
        except (KeyError, TypeError):
            # Missing entry, or the environment is None / has no runtime.
            return None

    def proceed(self):
        """Run the main event loop, then shut down once it returns.

        Does nothing if initialization did not complete.
        """
        if not self.is_initialized:
            return
        self.environment["runtime"]["EventManager"].start_main_event_loop()
        self.shutdown()

    def handle_input(self, event):
        """Process one key input event and fire the ``onKeyInput`` trigger.

        Args:
            event: Event dictionary.  If its ``"data"`` entry is falsy it is
                filled from ``InputManager.get_input_event()``; if it is
                still falsy afterwards the method returns without doing
                anything else.
        """
        runtime = self.environment["runtime"]
        runtime["DebugManager"].write_debug_out(
            "DEBUG INPUT fenrirMan:" + str(event), debug.DebugLevel.INFO
        )
        input_manager = runtime["InputManager"]

        if not event["data"]:
            event["data"] = input_manager.get_input_event()
        if not event["data"]:
            return
        event["data"]["EventName"] = input_manager.convert_event_name(
            event["data"]["EventName"]
        )
        input_manager.handle_input_event(event["data"])

        if input_manager.no_key_pressed():
            input_manager.clear_last_deep_input()

        if runtime["ScreenManager"].is_ignored_screen():
            # On an ignored screen no shortcut detection is done; the event
            # buffer is written out directly.
            input_manager.write_event_buffer()
        else:
            if runtime["HelpManager"].is_tutorial_mode():
                input_manager.clear_event_buffer()
                input_manager.key_echo(event["data"])
            if runtime["VmenuManager"].get_active():
                input_manager.clear_event_buffer()

            # Updates self.modifierInput / self.singleKeyCommand and may
            # queue a command.
            self.detect_shortcut_command()

            if self.modifierInput:
                input_manager.clear_event_buffer()
            if self.singleKeyCommand:
                if input_manager.no_key_pressed():
                    input_manager.clear_event_buffer()
            else:
                input_manager.write_event_buffer()

        if input_manager.no_key_pressed():
            # All keys released: reset the shortcut state for the next
            # key sequence.
            self.modifierInput = False
            self.singleKeyCommand = False
            input_manager.write_event_buffer()
            input_manager.handle_device_grab()

        # While keyForeward is positive, detect_shortcut_command() returns
        # early; count one input event off here.
        if self.environment["input"]["keyForeward"] > 0:
            self.environment["input"]["keyForeward"] -= 1
        runtime["CommandManager"].execute_default_trigger("onKeyInput")

    def handle_byte_input(self, event):
        """Forward byte input to the byte manager and fire ``onByteInput``.

        Args:
            event: Event dictionary; ignored when ``event["data"]`` is falsy
                (which includes ``b""``).
        """
        if not event["data"]:
            return
        runtime = self.environment["runtime"]
        runtime["ByteManager"].handle_byte_input(event["data"])
        runtime["CommandManager"].execute_default_trigger("onByteInput")

    def handle_execute_command(self, event):
        """Execute the command named in ``event["data"]``.

        In tutorial mode the ``"help"`` section is tried first, otherwise,
        with the vmenu active, the ``"vmenu-navigation"`` section.  When
        the command does not exist in that section (or no special mode is
        active) it is executed from the ``"commands"`` section.

        Args:
            event: Event dictionary; ignored when ``event["data"]`` is falsy
                (which includes ``""``).
        """
        if not event["data"]:
            return
        current_command = event["data"]
        runtime = self.environment["runtime"]
        command_manager = runtime["CommandManager"]

        # special modes
        if runtime["HelpManager"].is_tutorial_mode():
            special_section = "help"
        elif runtime["VmenuManager"].get_active():
            special_section = "vmenu-navigation"
        else:
            special_section = None
        if special_section is not None and command_manager.command_exists(
            current_command, special_section
        ):
            command_manager.execute_command(current_command, special_section)
            return

        # default
        command_manager.execute_command(current_command, "commands")

    def handle_remote_incomming(self, event):
        """Pass incoming remote data on to the remote manager.

        Args:
            event: Event dictionary; ignored when ``event["data"]`` is falsy.
        """
        if not event["data"]:
            return
        self.environment["runtime"]["RemoteManager"].handle_remote_incomming(
            event["data"]
        )

    def handle_screen_change(self, event):
        """Handle a screen change and fire ``onScreenChanged``.

        The trigger and the ``ScreenDriver.get_curr_screen()`` call are
        skipped while the vmenu is active.

        Args:
            event: Event dictionary whose ``"data"`` entry is handed to
                ``ScreenManager.handle_screen_change``.
        """
        runtime = self.environment["runtime"]
        runtime["ScreenManager"].handle_screen_change(event["data"])
        if runtime["VmenuManager"].get_active():
            return
        runtime["CommandManager"].execute_default_trigger("onScreenChanged")
        runtime["ScreenDriver"].get_curr_screen()

    def handle_screen_update(self, event):
        """Handle a screen update and fire the related triggers.

        ``onCursorChange`` fires only when the cursor manager reports a
        vertical or horizontal move; ``onScreenUpdate`` always fires.

        Args:
            event: Event dictionary whose ``"data"`` entry is handed to
                ``ScreenManager.handle_screen_update``.
        """
        runtime = self.environment["runtime"]
        input_manager = runtime["InputManager"]
        cursor_manager = runtime["CursorManager"]
        command_manager = runtime["CommandManager"]

        runtime["ScreenManager"].handle_screen_update(event["data"])
        # Drop the last deep input once at least 0.3 seconds have passed
        # since the last input.
        if time.time() - input_manager.get_last_input_time() >= 0.3:
            input_manager.clear_last_deep_input()
        if (
            cursor_manager.is_cursor_vertical_move()
            or cursor_manager.is_cursor_horizontal_move()
        ):
            command_manager.execute_default_trigger("onCursorChange")
        command_manager.execute_default_trigger("onScreenUpdate")
        input_manager.clear_last_deep_input()

    def handle_plug_input_device(self, event):
        """Handle a newly plugged input device and fire ``onPlugInputDevice``.

        The detected devices are stored on the input manager for the
        duration of the handling and reset to ``None`` afterwards, also when
        the handling raises.

        Args:
            event: Event dictionary whose ``"data"`` entry describes the
                detected devices.
        """
        runtime = self.environment["runtime"]
        try:
            runtime["InputManager"].set_last_detected_devices(event["data"])
        except Exception as e:
            runtime["DebugManager"].write_debug_out(
                "handle_plug_input_device: Error setting last detected devices: "
                + str(e),
                debug.DebugLevel.ERROR,
            )
        try:
            runtime["InputManager"].handle_plug_input_device(event["data"])
            runtime["CommandManager"].execute_default_trigger(
                "onPlugInputDevice", force=True
            )
        finally:
            # Always clear the stored devices so a failure above cannot
            # leave stale data behind.
            runtime["InputManager"].set_last_detected_devices(None)

    def handle_heart_beat(self, event):
        """Fire the ``onHeartBeat`` trigger; *event* is not inspected."""
        self.environment["runtime"]["CommandManager"].execute_default_trigger(
            "onHeartBeat", force=True
        )

    def detect_shortcut_command(self):
        """Update the shortcut state and queue the matching command, if any.

        Reads ``self.environment["input"]`` and the input manager, updates
        ``self.modifierInput``, ``self.singleKeyCommand`` and
        ``self.command``, and puts an ``execute_command`` event on the event
        queue when a non-empty command was found for a modifier or
        single-key shortcut.
        """
        input_state = self.environment["input"]
        if input_state["keyForeward"] > 0:
            return
        # Fewer keys held than before: nothing new to detect.
        if len(input_state["prevInput"]) > len(input_state["currInput"]):
            return

        input_manager = self.environment["runtime"]["InputManager"]
        if input_manager.is_key_press():
            self.modifierInput = input_manager.curr_key_is_modifier()
        elif not input_manager.no_key_pressed() and self.singleKeyCommand:
            # Still a single-key command only while exactly one key is held.
            self.singleKeyCommand = len(input_state["currInput"]) == 1

        if not (self.singleKeyCommand and input_manager.no_key_pressed()):
            current_shortcut = input_manager.get_curr_shortcut()
            self.command = input_manager.get_command_for_shortcut(
                current_shortcut
            )

        if (
            not self.modifierInput
            and input_manager.is_key_press()
            and self.command != ""
        ):
            self.singleKeyCommand = True

        if not (self.singleKeyCommand or self.modifierInput):
            return

        # fire event
        # At this point modifierInput or singleKeyCommand is set, so a
        # non-empty command is always queued and then consumed.
        if self.command != "":
            self.environment["runtime"]["EventManager"].put_to_event_queue(
                FenrirEventType.execute_command, self.command
            )
            self.command = ""

    def set_process_name(self, name="fenrir"):
        """Attempt to set the process name.

        Tries the optional ``setproctitle`` package first and falls back to
        calling ``prctl`` from ``libc.so.6`` through ``ctypes``.

        Args:
            name: Process name to set; defaults to ``"fenrir"``.

        Returns:
            True if one of the two mechanisms was invoked without raising
            in the fallback path, False if the fallback failed (the error is
            logged).
        """
        try:
            from setproctitle import setproctitle
        except ImportError:
            pass
        else:
            setproctitle(name)
            return True
        try:
            from ctypes import byref
            from ctypes import cdll
            from ctypes import create_string_buffer

            libc = cdll.LoadLibrary("libc.so.6")
            # Hand the encoded bytes to ctypes so the buffer is sized by
            # byte length plus the terminating NUL; sizing it by len(name)
            # is too small for names with non-ASCII characters.
            string_buffer = create_string_buffer(bytes(name, "UTF-8"))
            # 15 is PR_SET_NAME (see prctl(2)).
            libc.prctl(15, byref(string_buffer), 0, 0, 0)
            return True
        except Exception as e:
            self._log_error(
                "setProcName: Error setting process name: " + str(e)
            )
        return False

    def shutdown_request(self):
        """Ask the event manager to stop the main event loop.

        Never raises: errors are logged when a debug manager is still
        reachable and dropped otherwise, because this runs from the signal
        handler, possibly after the environment was torn down.
        """
        try:
            self.environment["runtime"]["EventManager"].stop_main_event_loop()
        except Exception as e:
            self._log_error(
                "shutdown_request: Error stopping main event loop: " + str(e)
            )

    def capture_signal(self, sigInit, frame):
        """Signal handler for SIGINT/SIGTERM: request a shutdown.

        Args:
            sigInit: Signal number (unused).
            frame: Current stack frame (unused).
        """
        self.shutdown_request()

    def shutdown(self):
        """Announce the exit, shut the managers down, drop the environment.

        Returns immediately when the environment is already ``None``.
        Managers listed in ``managerList`` but missing (or falsy) in the
        runtime dictionary are skipped.
        """
        if self.environment is None:
            return
        runtime = self.environment["runtime"]
        runtime["InputManager"].ungrab_all_devices()
        runtime["EventManager"].stop_main_event_loop()
        runtime["OutputManager"].present_text(
            _("Quit Fenrir"), sound_icon="ScreenReaderOff", interrupt=True
        )
        runtime["EventManager"].clean_event_queue()
        # NOTE(review): presumably this pause gives the "Quit Fenrir"
        # announcement time to be output before the managers go away -
        # confirm.
        time.sleep(0.6)
        for manager_name in self.environment["general"]["managerList"]:
            manager = self._runtime_manager(manager_name)
            if manager:
                manager.shutdown()
                del self.environment["runtime"][manager_name]
        self.environment = None

    def cleanup_on_error(self):
        """Clean up partially initialized state when initialization fails.

        Every step is best effort and no exception leaves this method.  If
        an environment exists, devices are ungrabbed, the event loop is
        stopped, the listed managers are shut down, leftover socket files
        are removed and ``self.environment`` is set to ``None``.
        """
        try:
            # Reset signal handlers to default if they were set
            if self.signal_handlers_set:
                signal.signal(signal.SIGINT, signal.SIG_DFL)
                signal.signal(signal.SIGTERM, signal.SIG_DFL)
                self.signal_handlers_set = False

            # Clean up any initialized managers
            if not self.environment:
                return

            # Read the configured socket path first: the manager loop below
            # removes every manager it shuts down from the runtime
            # dictionary, which may include SettingsManager.
            socket_file = self._configured_socket_file()

            # Try to ungrab devices if input manager exists
            input_manager = self._runtime_manager("InputManager")
            if input_manager:
                try:
                    input_manager.ungrab_all_devices()
                except Exception:
                    pass  # Ignore errors during cleanup

            # Try to stop event manager if it exists
            event_manager = self._runtime_manager("EventManager")
            if event_manager:
                try:
                    event_manager.stop_main_event_loop()
                except Exception:
                    pass  # Ignore errors during cleanup

            # Try to clean up all managers
            try:
                self._shutdown_managers_quietly()
            except Exception:
                pass  # Ignore errors during cleanup

            # Clean up socket files that might not be removed by the driver
            self._remove_socket_files(socket_file)

            self.environment = None
        except Exception:
            pass  # Ignore all errors during error cleanup

    def _configured_socket_file(self):
        """Return the ``remote``/``socket_file`` setting, or None."""
        settings_manager = self._runtime_manager("SettingsManager")
        if not settings_manager:
            return None
        try:
            return settings_manager.get_setting("remote", "socket_file")
        except Exception:
            return None  # Use default socket file path

    def _shutdown_managers_quietly(self):
        """Shut down and unregister each listed manager, ignoring errors."""
        general = self.environment.get("general") or {}
        for manager_name in list(general.get("managerList") or ()):
            manager = self._runtime_manager(manager_name)
            if not manager:
                continue
            try:
                manager.shutdown()
                del self.environment["runtime"][manager_name]
            except Exception:
                pass  # Ignore errors during cleanup

    def _remove_socket_files(self, socket_file):
        """Delete leftover socket files, ignoring errors.

        Args:
            socket_file: Configured socket path.  When falsy, the two
                default paths are tried instead.
        """
        if socket_file:
            candidates = [socket_file]
        else:
            # Use default socket file paths.  The "deamon" spelling is kept
            # byte for byte; presumably it must match the path the remote
            # driver creates - verify before changing.
            candidates = [
                "/tmp/fenrirscreenreader-deamon.sock",
                # Also try PID-based socket file
                "/tmp/fenrirscreenreader-" + str(os.getpid()) + ".sock",
            ]
        for path in candidates:
            try:
                if os.path.exists(path):
                    os.unlink(path)
            except Exception:
                pass  # Ignore errors during socket cleanup