#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.

import os
import signal
import sys
import time

from fenrirscreenreader.core import debug
from fenrirscreenreader.core import settingsManager
from fenrirscreenreader.core.eventData import FenrirEventType
from fenrirscreenreader.core.i18n import _


class FenrirManager:
    """Top-level controller that wires events to the runtime managers.

    The instance owns ``self.environment`` (the dictionary returned by
    ``SettingsManager.init_fenrir_config``) and exposes one ``handle_*``
    method per event kind. Each handler forwards the event payload
    (``event["data"]``) to the responsible manager stored under
    ``self.environment["runtime"]`` and then fires the matching default
    trigger on the command manager.
    """

    def __init__(self, cliArgs):
        """Build the environment, announce start-up and hook signals.

        Args:
            cliArgs: Parsed command line arguments; passed through
                unchanged to ``init_fenrir_config``.

        Raises:
            RuntimeError: If no environment could be created.
            Exception: Any other initialization error is re-raised after
                ``cleanup_on_error`` has run.
        """
        self.is_initialized = False
        self.environment = None
        self.signal_handlers_set = False

        try:
            self.environment = (
                settingsManager.SettingsManager().init_fenrir_config(
                    cliArgs, self
                )
            )
            if not self.environment:
                raise RuntimeError(
                    "Cannot Initialize. Maybe the configfile is not "
                    "available or not parseable"
                )

            self.environment["runtime"]["OutputManager"].present_text(
                _("Start Fenrir"), sound_icon="ScreenReaderOn", interrupt=True
            )

            # Set signal handlers after successful initialization
            signal.signal(signal.SIGINT, self.capture_signal)
            signal.signal(signal.SIGTERM, self.capture_signal)
            self.signal_handlers_set = True

            self.is_initialized = True
            self.modifierInput = False
            self.singleKeyCommand = False
            self.command = ""
            self.set_process_name()
        except Exception:
            # Clean up any partial initialization, then let the caller
            # see the original error.
            self.cleanup_on_error()
            raise

    def proceed(self):
        """Run the main event loop, then shut down once it returns.

        Does nothing if initialization did not complete.
        """
        if not self.is_initialized:
            return
        self.environment["runtime"]["EventManager"].start_main_event_loop()
        self.shutdown()

    def handle_input(self, event):
        """Process a key input event and decide what to do with the buffer.

        If the event carries no data, the input manager is asked for the
        pending input event; if there is still none, the method returns.
        Afterwards the buffered events are either written through or
        cleared, depending on the ignored-screen, tutorial, vmenu and
        shortcut state, and the ``onKeyInput`` trigger is executed.

        Args:
            event: Mapping with a ``"data"`` entry; it is filled in place
                when empty.
        """
        runtime = self.environment["runtime"]
        runtime["DebugManager"].write_debug_out(
            "DEBUG INPUT fenrirMan:" + str(event), debug.DebugLevel.INFO
        )
        input_manager = runtime["InputManager"]

        if not event["data"]:
            event["data"] = input_manager.get_input_event()

        if not event["data"]:
            return
        event["data"]["EventName"] = input_manager.convert_event_name(
            event["data"]["EventName"]
        )
        input_manager.handle_input_event(event["data"])

        if input_manager.no_key_pressed():
            input_manager.clear_last_deep_input()

        if runtime["ScreenManager"].is_ignored_screen():
            input_manager.write_event_buffer()
        else:
            if runtime["HelpManager"].is_tutorial_mode():
                input_manager.clear_event_buffer()
                input_manager.key_echo(event["data"])

            if runtime["VmenuManager"].get_active():
                input_manager.clear_event_buffer()

            self.detect_shortcut_command()

            if self.modifierInput:
                input_manager.clear_event_buffer()
            if self.singleKeyCommand:
                if input_manager.no_key_pressed():
                    input_manager.clear_event_buffer()
            else:
                input_manager.write_event_buffer()

        if input_manager.no_key_pressed():
            # All keys released: reset the shortcut state machine.
            self.modifierInput = False
            self.singleKeyCommand = False
            input_manager.write_event_buffer()
            input_manager.handle_device_grab()

        if self.environment["input"]["keyForeward"] > 0:
            self.environment["input"]["keyForeward"] -= 1
        runtime["CommandManager"].execute_default_trigger("onKeyInput")

    def handle_byte_input(self, event):
        """Forward raw byte input and fire the ``onByteInput`` trigger.

        Events with empty data are ignored.
        """
        if not event["data"]:
            return
        self.environment["runtime"]["ByteManager"].handle_byte_input(
            event["data"]
        )
        self.environment["runtime"]["CommandManager"].execute_default_trigger(
            "onByteInput"
        )

    def handle_execute_command(self, event):
        """Execute the command named by ``event["data"]``.

        In tutorial mode the command is run from the ``"help"`` section
        when it exists there; otherwise, with an active vmenu, from
        ``"vmenu-navigation"`` when it exists there. In every other case
        it is run from the ``"commands"`` section.
        """
        if not event["data"]:
            return

        current_command = event["data"]
        runtime = self.environment["runtime"]
        command_manager = runtime["CommandManager"]

        # special modes
        if runtime["HelpManager"].is_tutorial_mode():
            if command_manager.command_exists(current_command, "help"):
                command_manager.execute_command(current_command, "help")
                return
        elif runtime["VmenuManager"].get_active():
            if command_manager.command_exists(
                current_command, "vmenu-navigation"
            ):
                command_manager.execute_command(
                    current_command, "vmenu-navigation"
                )
                return

        # default
        command_manager.execute_command(current_command, "commands")

    def handle_remote_incomming(self, event):
        """Pass non-empty remote data on to the remote manager."""
        if not event["data"]:
            return
        self.environment["runtime"]["RemoteManager"].handle_remote_incomming(
            event["data"]
        )

    def handle_screen_change(self, event):
        """Handle a screen change and fire ``onScreenChanged``.

        The trigger and the screen driver refresh are skipped while the
        vmenu is active.
        """
        self.environment["runtime"]["ScreenManager"].handle_screen_change(
            event["data"]
        )
        if self.environment["runtime"]["VmenuManager"].get_active():
            return
        self.environment["runtime"]["CommandManager"].execute_default_trigger(
            "onScreenChanged"
        )
        self.environment["runtime"]["ScreenDriver"].get_curr_screen()

    def handle_screen_update(self, event):
        """Handle a screen update and fire the cursor/update triggers.

        The last deep input is cleared up front when the last input is at
        least 0.3 seconds old, and unconditionally once the triggers ran.
        """
        runtime = self.environment["runtime"]
        runtime["ScreenManager"].handle_screen_update(event["data"])

        if (
            time.time() - runtime["InputManager"].get_last_input_time()
            >= 0.3
        ):
            runtime["InputManager"].clear_last_deep_input()

        if (
            runtime["CursorManager"].is_cursor_vertical_move()
            or runtime["CursorManager"].is_cursor_horizontal_move()
        ):
            runtime["CommandManager"].execute_default_trigger(
                "onCursorChange"
            )
        runtime["CommandManager"].execute_default_trigger("onScreenUpdate")
        runtime["InputManager"].clear_last_deep_input()

    def handle_plug_input_device(self, event):
        """React to a newly plugged input device.

        The detected devices are published to the input manager for the
        duration of the ``onPlugInputDevice`` trigger and reset to
        ``None`` afterwards. A failure while publishing them is logged
        and does not stop the remaining handling.
        """
        try:
            self.environment["runtime"][
                "InputManager"
            ].set_last_detected_devices(event["data"])
        except Exception as e:
            self.environment["runtime"]["DebugManager"].write_debug_out(
                "handle_plug_input_device: Error setting last detected devices: "
                + str(e),
                debug.DebugLevel.ERROR,
            )
        self.environment["runtime"]["InputManager"].handle_plug_input_device(
            event["data"]
        )
        self.environment["runtime"]["CommandManager"].execute_default_trigger(
            "onPlugInputDevice", force=True
        )
        self.environment["runtime"]["InputManager"].set_last_detected_devices(
            None
        )

    def handle_heart_beat(self, event):
        """Fire the ``onHeartBeat`` trigger (forced)."""
        self.environment["runtime"]["CommandManager"].execute_default_trigger(
            "onHeartBeat", force=True
        )

    def detect_shortcut_command(self):
        """Update the shortcut state and queue a command when one matches.

        Maintains ``self.modifierInput``, ``self.singleKeyCommand`` and
        ``self.command``. When a command is pending and either flag is
        set, an ``execute_command`` event is put on the event queue and
        ``self.command`` is reset to the empty string.
        """
        input_state = self.environment["input"]
        if input_state["keyForeward"] > 0:
            return
        # Fewer keys than before: nothing new to evaluate.
        if len(input_state["prevInput"]) > len(input_state["currInput"]):
            return

        input_manager = self.environment["runtime"]["InputManager"]

        if input_manager.is_key_press():
            self.modifierInput = input_manager.curr_key_is_modifier()
        else:
            if not input_manager.no_key_pressed():
                if self.singleKeyCommand:
                    self.singleKeyCommand = (
                        len(input_state["currInput"]) == 1
                    )

        if not (self.singleKeyCommand and input_manager.no_key_pressed()):
            current_shortcut = input_manager.get_curr_shortcut()
            self.command = input_manager.get_command_for_shortcut(
                current_shortcut
            )

        if not self.modifierInput:
            if input_manager.is_key_press():
                if self.command != "":
                    self.singleKeyCommand = True

        if not (self.singleKeyCommand or self.modifierInput):
            return

        # fire event
        if self.command != "":
            if self.modifierInput:
                self.environment["runtime"]["EventManager"].put_to_event_queue(
                    FenrirEventType.execute_command, self.command
                )
                self.command = ""
            else:
                if self.singleKeyCommand:
                    self.environment["runtime"][
                        "EventManager"
                    ].put_to_event_queue(
                        FenrirEventType.execute_command, self.command
                    )
                    self.command = ""

    def set_process_name(self, name="fenrir"):
        """Attempt to set the process name.

        ``setproctitle`` is used when it can be imported; otherwise the
        name is set through ``prctl`` from ``libc.so.6``.

        Args:
            name: Process name to set.

        Returns:
            True when one of the two mechanisms ran without raising,
            False when the ``prctl`` fallback raised (the error is
            logged).
        """
        try:
            from setproctitle import setproctitle
        except ImportError:
            pass
        else:
            setproctitle(name)
            return True

        try:
            from ctypes import byref
            from ctypes import cdll
            from ctypes import create_string_buffer

            PR_SET_NAME = 15  # prctl option number used for the name

            libc = cdll.LoadLibrary("libc.so.6")
            # Size the buffer from the encoded bytes (plus the NUL that
            # create_string_buffer appends); sizing by len(name) is too
            # small as soon as the name contains non-ASCII characters.
            string_buffer = create_string_buffer(bytes(name, "UTF-8"))
            libc.prctl(PR_SET_NAME, byref(string_buffer), 0, 0, 0)
            return True
        except Exception as e:
            self.environment["runtime"]["DebugManager"].write_debug_out(
                "setProcName: Error setting process name: " + str(e),
                debug.DebugLevel.ERROR,
            )

        return False

    def shutdown_request(self):
        """Ask the event manager to stop the main event loop.

        Never raises: this runs from the signal handler, possibly while
        ``shutdown`` is already removing managers or after the
        environment was released, so both the stop request and the error
        logging are best-effort.
        """
        try:
            self.environment["runtime"]["EventManager"].stop_main_event_loop()
        except Exception as e:
            try:
                self.environment["runtime"]["DebugManager"].write_debug_out(
                    "shutdown_request: Error stopping main event loop: "
                    + str(e),
                    debug.DebugLevel.ERROR,
                )
            except Exception:
                # The debug manager is unavailable as well (environment
                # gone or manager already shut down); nothing to log to.
                pass

    def capture_signal(self, sigInit, frame):
        """Signal handler for SIGINT/SIGTERM: request a shutdown."""
        self.shutdown_request()

    def shutdown(self):
        """Announce the exit, shut every manager down and drop the state.

        Calling it again after the environment was released is a no-op.
        """
        if not self.environment:
            return

        runtime = self.environment["runtime"]
        runtime["InputManager"].ungrab_all_devices()
        runtime["EventManager"].stop_main_event_loop()
        runtime["OutputManager"].present_text(
            _("Quit Fenrir"), sound_icon="ScreenReaderOff", interrupt=True
        )
        runtime["EventManager"].clean_event_queue()
        # NOTE(review): presumably gives the quit announcement time to be
        # presented before the managers go away -- confirm.
        time.sleep(0.6)
        for currentManager in self.environment["general"]["managerList"]:
            if runtime[currentManager]:
                runtime[currentManager].shutdown()
                del runtime[currentManager]

        self.environment = None

    def cleanup_on_error(self):
        """Clean up partially initialized state when initialization fails.

        Every step is best-effort: errors are swallowed so that the
        original initialization error is the one that propagates.
        """
        try:
            # Reset signal handlers to default if they were set
            if self.signal_handlers_set:
                signal.signal(signal.SIGINT, signal.SIG_DFL)
                signal.signal(signal.SIGTERM, signal.SIG_DFL)
                self.signal_handlers_set = False

            # Clean up any initialized managers
            if self.environment:
                try:
                    # Try to ungrab devices if input manager exists
                    if (
                        "runtime" in self.environment
                        and "InputManager" in self.environment["runtime"]
                    ):
                        if self.environment["runtime"]["InputManager"]:
                            self.environment["runtime"][
                                "InputManager"
                            ].ungrab_all_devices()
                except Exception:
                    pass  # Ignore errors during cleanup

                try:
                    # Try to stop event manager if it exists
                    if (
                        "runtime" in self.environment
                        and "EventManager" in self.environment["runtime"]
                    ):
                        if self.environment["runtime"]["EventManager"]:
                            self.environment["runtime"][
                                "EventManager"
                            ].stop_main_event_loop()
                except Exception:
                    pass  # Ignore errors during cleanup

                try:
                    # Try to clean up all managers
                    if (
                        "general" in self.environment
                        and "managerList" in self.environment["general"]
                    ):
                        for currentManager in self.environment["general"][
                            "managerList"
                        ]:
                            if (
                                "runtime" in self.environment
                                and currentManager
                                in self.environment["runtime"]
                                and self.environment["runtime"][
                                    currentManager
                                ]
                            ):
                                try:
                                    self.environment["runtime"][
                                        currentManager
                                    ].shutdown()
                                    del self.environment["runtime"][
                                        currentManager
                                    ]
                                except Exception:
                                    pass  # Ignore errors during cleanup
                except Exception:
                    pass  # Ignore errors during cleanup

                # Clean up socket files that might not be removed by the
                # driver
                try:
                    socket_file = None
                    if (
                        "runtime" in self.environment
                        and "SettingsManager" in self.environment["runtime"]
                    ):
                        try:
                            socket_file = self.environment["runtime"][
                                "SettingsManager"
                            ].get_setting("remote", "socket_file")
                        except Exception:
                            pass  # Use default socket file path

                    if not socket_file:
                        # Use default socket file paths
                        socket_file = "/tmp/fenrirscreenreader-deamon.sock"
                        if os.path.exists(socket_file):
                            os.unlink(socket_file)

                        # Also try PID-based socket file
                        pid_socket_file = (
                            "/tmp/fenrirscreenreader-"
                            + str(os.getpid())
                            + ".sock"
                        )
                        if os.path.exists(pid_socket_file):
                            os.unlink(pid_socket_file)
                    elif os.path.exists(socket_file):
                        os.unlink(socket_file)
                except Exception:
                    pass  # Ignore errors during socket cleanup

            self.environment = None
        except Exception:
            pass  # Ignore all errors during error cleanup