#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Fenrir TTY screen reader # By Chrys, Storm Dragon, and contributors. import fcntl import getpass import os import pty import shlex import signal import struct import sys import termios import threading import time import tty from select import select import pyte from fenrirscreenreader.core import debug from fenrirscreenreader.core.eventData import FenrirEventType from fenrirscreenreader.core.screenDriver import ScreenDriver as screenDriver from fenrirscreenreader.utils import screen_utils # PTY Driver Constants class PTYConstants: # Timeouts (in seconds) DEFAULT_READ_TIMEOUT = 0.3 INPUT_READ_TIMEOUT = 0.01 OUTPUT_READ_TIMEOUT = 0.05 # Faster than default but allows for network lag SELECT_TIMEOUT = 0.05 PROCESS_TERMINATION_TIMEOUT = 3.0 PROCESS_KILL_DELAY = 0.5 # Polling intervals (in seconds) MIN_POLL_INTERVAL = 0.001 # Limits MAX_TERMINAL_LINES = 10000 DEFAULT_READ_BUFFER_SIZE = 65536 INPUT_BUFFER_SIZE = 4096 # Error codes IO_ERROR_ERRNO = 5 class FenrirScreen(pyte.Screen): def set_margins(self, *args, **kwargs): kwargs.pop("private", None) super(FenrirScreen, self).set_margins(*args, **kwargs) class Terminal: def __init__(self, columns, lines, p_in, env=None): self.text = "" self.attributes = None self.screen = FenrirScreen(columns, lines) self.env = env # Environment for proper logging # Pre-create default attribute template to avoid repeated allocation self._default_attribute = [ "default", "default", False, False, False, False, False, False, "default", "default" ] self.screen.write_process_input = lambda data: p_in.write( data.encode() ) self.stream = pyte.ByteStream() self.stream.attach(self.screen) def _log_error(self, message, level=None): """Log error message using proper debug manager if available.""" if self.env and "runtime" in self.env and "DebugManager" in self.env["runtime"]: try: log_level = level if level else debug.DebugLevel.ERROR self.env["runtime"]["DebugManager"].write_debug_out( f"PTY Terminal: {message}", log_level ) return except Exception: pass # Fallback to print if debug manager fails # Fallback logging when debug manager unavailable print(f"PTY Terminal: {message}") def feed(self, data): self.stream.feed(data) def update_attributes(self, initialize=False): buffer = self.screen.buffer lines = None if not initialize: lines = self.screen.dirty else: lines = range(self.screen.lines) try: self.attributes = [ [ list(attribute[1:]) + [False, "default", "default"] if len(attribute) > 1 else [False, "default", "default"] for attribute in line.values() ] for line in buffer.values() ] except Exception as e: self._log_error(f"Error initializing attributes: {e}") # Fallback to empty attributes self.attributes = [[] for _ in range(self.screen.lines)] for y in lines: # Validate y is within reasonable bounds (prevent memory exhaustion) if y >= PTYConstants.MAX_TERMINAL_LINES: self._log_error( f"Line index {y} exceeds maximum {PTYConstants.MAX_TERMINAL_LINES}, " f"skipping attribute update", debug.DebugLevel.WARNING ) continue # Check if line y exists in buffer before accessing it if y not in buffer: # Only log this occasionally to prevent spam if y % 10 == 0: # Log every 10th missing line # Pre-format string to avoid repeated f-string operations line_range = f"{y}-{y+9}" self._log_error( f"Lines {line_range} not found in buffer, skipping attribute updates", debug.DebugLevel.WARNING ) continue # Ensure attributes array is large enough for line y while len(self.attributes) <= y: self.attributes.append([]) try: self.attributes[y] = [ list(attribute[1:]) + [False, "default", "default"] for attribute in (buffer[y].values()) ] except Exception as e: self._log_error(f"Error updating attributes for line {y}: {e}") # Initialize with empty attributes if update fails self.attributes[y] = [] if len(self.attributes[y]) < self.screen.columns: diff = self.screen.columns - len(self.attributes[y]) # Use pre-created template for efficiency self.attributes[y] += [self._default_attribute[:] for _ in range(diff)] def resize(self, lines, columns): self.screen.resize(lines, columns) self.set_cursor() self.update_attributes(True) def set_cursor(self, x=-1, y=-1): # Determine target cursor position x_pos = x if x != -1 else self.screen.cursor.x y_pos = y if y != -1 else self.screen.cursor.y # Validate and clamp cursor position to screen bounds max_x = max(0, self.screen.columns - 1) max_y = max(0, self.screen.lines - 1) self.screen.cursor.x = max(0, min(x_pos, max_x)) self.screen.cursor.y = max(0, min(y_pos, max_y)) def get_screen_content(self): cursor = self.screen.cursor # Only regenerate text if screen is dirty or text doesn't exist if not hasattr(self, 'text') or self.screen.dirty: self.text = "\n".join(self.screen.display) self.update_attributes(self.attributes is None) self.screen.dirty.clear() # Return screen content without unnecessary copying # Only copy attributes if they exist and need protection screen_data = { "cursor": (cursor.x, cursor.y), "lines": self.screen.lines, "columns": self.screen.columns, "text": self.text, "attributes": self.attributes[:] if self.attributes else [], # Shallow copy only if needed "screen": "pty", "screenUpdateTime": time.time(), } return screen_data class driver(screenDriver): def __init__(self): screenDriver.__init__(self) self.signalPipe = os.pipe() self.p_out = None self.terminal = None self.p_pid = -1 self.terminal_lock = threading.Lock() # Synchronize terminal operations signal.signal(signal.SIGWINCH, self.handle_sigwinch) # Runtime configuration storage self.pty_config = {} def _load_pty_settings(self): """Load PTY-specific settings from configuration with fallbacks to defaults.""" try: settings_manager = self.env["runtime"]["SettingsManager"] # Load timeout settings with defaults self.pty_config = { 'input_timeout': float(settings_manager.get_setting( 'screen', 'ptyInputTimeout', PTYConstants.INPUT_READ_TIMEOUT )), 'output_timeout': float(settings_manager.get_setting( 'screen', 'ptyOutputTimeout', PTYConstants.OUTPUT_READ_TIMEOUT )), 'select_timeout': float(settings_manager.get_setting( 'screen', 'ptySelectTimeout', PTYConstants.SELECT_TIMEOUT )), 'process_termination_timeout': float(settings_manager.get_setting( 'screen', 'ptyProcessTimeout', PTYConstants.PROCESS_TERMINATION_TIMEOUT )), 'poll_interval': float(settings_manager.get_setting( 'screen', 'ptyPollInterval', PTYConstants.MIN_POLL_INTERVAL )) } self.env["runtime"]["DebugManager"].write_debug_out( f"PTY screenDriver: Loaded configuration: {self.pty_config}", debug.DebugLevel.INFO ) except Exception as e: # Fallback to constants if settings fail self.env["runtime"]["DebugManager"].write_debug_out( f"PTY screenDriver: Failed to load settings, using defaults: {e}", debug.DebugLevel.WARNING ) self.pty_config = { 'input_timeout': PTYConstants.INPUT_READ_TIMEOUT, 'output_timeout': PTYConstants.OUTPUT_READ_TIMEOUT, 'select_timeout': PTYConstants.SELECT_TIMEOUT, 'process_termination_timeout': PTYConstants.PROCESS_TERMINATION_TIMEOUT, 'poll_interval': PTYConstants.MIN_POLL_INTERVAL } def initialize(self, environment): self.env = environment self.command = self.env["runtime"]["SettingsManager"].get_setting( "general", "shell" ) # Load configurable timeouts from settings self._load_pty_settings() self.shortcutType = self.env["runtime"][ "InputManager" ].get_shortcut_type() self.env["runtime"]["ProcessManager"].add_custom_event_thread( self.terminal_emulation ) def get_curr_screen(self): self.env["screen"]["oldTTY"] = "pty" self.env["screen"]["newTTY"] = "pty" def inject_text_to_screen(self, msg_bytes, screen=None): if not screen: screen = self.p_out.fileno() if isinstance(msg_bytes, str): msg_bytes = bytes(msg_bytes, "UTF-8") os.write(screen, msg_bytes) def get_session_information(self): self.env["screen"]["autoIgnoreScreens"] = [] self.env["general"]["prev_user"] = getpass.getuser() self.env["general"]["curr_user"] = getpass.getuser() def read_all(self, fd, timeout=PTYConstants.DEFAULT_READ_TIMEOUT, interruptFd=None, len=PTYConstants.DEFAULT_READ_BUFFER_SIZE): """Read all available data from file descriptor with efficient polling. Uses progressively longer wait times to balance responsiveness with CPU usage. """ msg_bytes = b"" fd_list = [fd] if interruptFd: fd_list.append(interruptFd) starttime = time.time() poll_timeout = self.pty_config.get('poll_interval', PTYConstants.MIN_POLL_INTERVAL) # Use configured interval while True: # Use consistent short polling for responsiveness r = screen_utils.has_more_what(fd_list, poll_timeout) # Nothing more to read if fd not in r: # Check overall timeout if (time.time() - starttime) >= timeout: break continue try: data = os.read(fd, len) if data == b"": raise EOFError msg_bytes += data except OSError as e: self.env["runtime"]["DebugManager"].write_debug_out( f"PTY screenDriver read_all: OS error reading from fd {fd}: {e}", debug.DebugLevel.ERROR ) # For I/O errors, exit immediately to prevent endless retry loops if e.errno == PTYConstants.IO_ERROR_ERRNO: # Input/output error self.env["runtime"]["DebugManager"].write_debug_out( "PTY screenDriver: Terminal connection lost, stopping read loop", debug.DebugLevel.ERROR ) raise EOFError("Terminal connection lost") break # Exit on interrupt available if interruptFd and interruptFd in r: break # Check overall timeout if (time.time() - starttime) >= timeout: break return msg_bytes def open_terminal(self, columns, lines, command): p_pid, master_fd = pty.fork() if p_pid == 0: # Child. argv = shlex.split(command) env = os.environ.copy() # values are VT100,xterm-256color,linux try: if env["TERM"] == "": env["TERM"] = "linux" except Exception as e: # Child process doesn't have access to debug manager # Use fallback logging with more context print( f"ptyDriver open_terminal (child): TERM environment error: {e}" ) env["TERM"] = "linux" os.execvpe(argv[0], argv, env) # File-like object for I/O with the child process aka command. p_out = os.fdopen(master_fd, "w+b", 0) return Terminal(columns, lines, p_out, self.env), p_pid, p_out def resize_terminal(self, fd): s = struct.pack("HHHH", 0, 0, 0, 0) s = fcntl.ioctl(0, termios.TIOCGWINSZ, s) fcntl.ioctl(fd, termios.TIOCSWINSZ, s) lines, columns, _, _ = struct.unpack("hhhh", s) return lines, columns def get_terminal_size(self, fd): s = struct.pack("HHHH", 0, 0, 0, 0) lines, columns, _, _ = struct.unpack( "HHHH", fcntl.ioctl(fd, termios.TIOCGWINSZ, s) ) return lines, columns def handle_sigwinch(self, *args): os.write(self.signalPipe[1], b"w") def terminal_emulation(self, active, event_queue): try: old_attr = termios.tcgetattr(sys.stdin) tty.setraw(0) lines, columns = self.get_terminal_size(0) if self.command == "": self.command = screen_utils.get_shell() self.terminal, self.p_pid, self.p_out = self.open_terminal( columns, lines, self.command ) lines, columns = self.resize_terminal(self.p_out) self.terminal.resize(lines, columns) fd_list = [sys.stdin, self.p_out, self.signalPipe[0]] while active.value: r, _, _ = select(fd_list, [], [], self.pty_config.get('select_timeout', PTYConstants.SELECT_TIMEOUT)) # Configurable timeout # none if r == []: continue # signals if self.signalPipe[0] in r: os.read(self.signalPipe[0], 1) lines, columns = self.resize_terminal(self.p_out) self.terminal.resize(lines, columns) # input if sys.stdin in r: try: msg_bytes = self.read_all(sys.stdin.fileno(), timeout=self.pty_config.get('input_timeout', PTYConstants.INPUT_READ_TIMEOUT), len=PTYConstants.INPUT_BUFFER_SIZE) except (EOFError, OSError): event_queue.put( { "Type": FenrirEventType.stop_main_loop, "data": None, } ) break if self.shortcutType == "KEY": try: self.inject_text_to_screen(msg_bytes) except Exception as e: self.env["runtime"][ "DebugManager" ].write_debug_out( "ptyDriver getInputData: Error injecting text to screen: " + str(e), debug.DebugLevel.ERROR, ) event_queue.put( { "Type": FenrirEventType.stop_main_loop, "data": None, } ) break else: event_queue.put( { "Type": FenrirEventType.byte_input, "data": msg_bytes, } ) # output if self.p_out in r: try: msg_bytes = self.read_all( self.p_out.fileno(), timeout=self.pty_config.get('output_timeout', PTYConstants.OUTPUT_READ_TIMEOUT), interruptFd=sys.stdin.fileno() ) except (EOFError, OSError): event_queue.put( { "Type": FenrirEventType.stop_main_loop, "data": None, } ) break # Synchronize terminal operations to prevent race conditions with self.terminal_lock: # Feed data to terminal and get consistent screen state self.terminal.feed(msg_bytes) screen_content = self.terminal.get_screen_content() # Send screen update event with consistent state event_queue.put( { "Type": FenrirEventType.screen_update, "data": screen_utils.create_screen_event_data( screen_content ), } ) # Inject to actual screen (outside lock to avoid blocking) self.inject_text_to_screen( msg_bytes, screen=sys.stdout.fileno() ) except Exception as e: # Process died? self.env["runtime"]["DebugManager"].write_debug_out( f"PTY screenDriver terminal_emulation: Exception occurred: {e}", debug.DebugLevel.ERROR ) event_queue.put( {"Type": FenrirEventType.stop_main_loop, "data": None} ) finally: self._safe_cleanup_process() self._safe_cleanup_resources(old_attr) event_queue.put( {"Type": FenrirEventType.stop_main_loop, "data": None} ) def _safe_cleanup_process(self): """Safely terminate the child process with timeout and fallback to SIGKILL.""" if not hasattr(self, 'p_pid') or self.p_pid is None: return try: # Check if process is still alive os.kill(self.p_pid, 0) # Signal 0 checks if process exists except OSError: # Process already dead self.p_pid = None return try: # Try graceful termination first self.env["runtime"]["DebugManager"].write_debug_out( f"PTY screenDriver: Terminating process {self.p_pid} gracefully", debug.DebugLevel.INFO ) os.kill(self.p_pid, signal.SIGTERM) # Wait for graceful termination timeout = self.pty_config.get('process_termination_timeout', PTYConstants.PROCESS_TERMINATION_TIMEOUT) start_time = time.time() while time.time() - start_time < timeout: try: os.kill(self.p_pid, 0) # Check if still alive time.sleep(0.1) except OSError: # Process terminated gracefully self.p_pid = None return # Process didn't terminate gracefully, use SIGKILL self.env["runtime"]["DebugManager"].write_debug_out( f"PTY screenDriver: Process {self.p_pid} didn't terminate gracefully, using SIGKILL", debug.DebugLevel.WARNING ) os.kill(self.p_pid, signal.SIGKILL) time.sleep(PTYConstants.PROCESS_KILL_DELAY) # Give it a moment except OSError as e: self.env["runtime"]["DebugManager"].write_debug_out( f"PTY screenDriver: Error terminating process {self.p_pid}: {e}", debug.DebugLevel.ERROR ) finally: self.p_pid = None def _safe_cleanup_resources(self, old_attr=None): """Safely clean up file descriptors and terminal attributes.""" # Close output pipe safely if hasattr(self, 'p_out') and self.p_out is not None: try: self.p_out.close() self.env["runtime"]["DebugManager"].write_debug_out( "PTY screenDriver: Closed output pipe", debug.DebugLevel.INFO ) except Exception as e: self.env["runtime"]["DebugManager"].write_debug_out( f"PTY screenDriver: Error closing output pipe: {e}", debug.DebugLevel.ERROR ) finally: self.p_out = None # Restore terminal attributes safely if old_attr is not None: try: termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_attr) self.env["runtime"]["DebugManager"].write_debug_out( "PTY screenDriver: Restored terminal attributes", debug.DebugLevel.INFO ) except Exception as e: self.env["runtime"]["DebugManager"].write_debug_out( f"PTY screenDriver: Error restoring terminal attributes: {e}", debug.DebugLevel.ERROR ) def get_curr_application(self): pass