diff --git a/src/fenrirscreenreader/screenDriver/ptyDriver.py b/src/fenrirscreenreader/screenDriver/ptyDriver.py index e70d78ad..27ce7852 100644 --- a/src/fenrirscreenreader/screenDriver/ptyDriver.py +++ b/src/fenrirscreenreader/screenDriver/ptyDriver.py @@ -25,6 +25,26 @@ from fenrirscreenreader.core.eventData import FenrirEventType from fenrirscreenreader.core.screenDriver import ScreenDriver as screenDriver from fenrirscreenreader.utils import screen_utils +# PTY Driver Constants +class PTYConstants: + # Timeouts (in seconds) + DEFAULT_READ_TIMEOUT = 0.3 + INPUT_READ_TIMEOUT = 0.01 + SELECT_TIMEOUT = 0.05 + PROCESS_TERMINATION_TIMEOUT = 3.0 + PROCESS_KILL_DELAY = 0.5 + + # Polling intervals (in seconds) + MIN_POLL_INTERVAL = 0.001 + + # Limits + MAX_TERMINAL_LINES = 10000 + DEFAULT_READ_BUFFER_SIZE = 65536 + INPUT_BUFFER_SIZE = 4096 + + # Error codes + IO_ERROR_ERRNO = 5 + class FenrirScreen(pyte.Screen): def set_margins(self, *args, **kwargs): @@ -91,10 +111,9 @@ class Terminal: self.attributes = [[] for _ in range(self.screen.lines)] for y in lines: # Validate y is within reasonable bounds (prevent memory exhaustion) - max_lines = 10000 # Reasonable maximum for terminal applications - if y >= max_lines: + if y >= PTYConstants.MAX_TERMINAL_LINES: self._log_error( - f"Line index {y} exceeds maximum {max_lines}, " + f"Line index {y} exceeds maximum {PTYConstants.MAX_TERMINAL_LINES}, " f"skipping attribute update", debug.DebugLevel.WARNING ) @@ -104,8 +123,10 @@ class Terminal: if y not in buffer: # Only log this occasionally to prevent spam if y % 10 == 0: # Log every 10th missing line + # Pre-format string to avoid repeated f-string operations + line_range = f"{y}-{y+9}" self._log_error( - f"Lines {y}-{y+9} not found in buffer, skipping attribute updates", + f"Lines {line_range} not found in buffer, skipping attribute updates", debug.DebugLevel.WARNING ) continue @@ -147,7 +168,9 @@ class Terminal: def get_screen_content(self): cursor = self.screen.cursor - self.text = "\n".join(self.screen.display) + # Only regenerate text if screen is dirty or text doesn't exist + if not hasattr(self, 'text') or self.screen.dirty: + self.text = "\n".join(self.screen.display) self.update_attributes(self.attributes is None) self.screen.dirty.clear() @@ -174,12 +197,58 @@ class driver(screenDriver): self.p_pid = -1 self.terminal_lock = threading.Lock() # Synchronize terminal operations signal.signal(signal.SIGWINCH, self.handle_sigwinch) + + # Runtime configuration storage + self.pty_config = {} + + def _load_pty_settings(self): + """Load PTY-specific settings from configuration with fallbacks to defaults.""" + try: + settings_manager = self.env["runtime"]["SettingsManager"] + + # Load timeout settings with defaults + self.pty_config = { + 'input_timeout': float(settings_manager.get_setting( + 'screen', 'ptyInputTimeout', PTYConstants.INPUT_READ_TIMEOUT + )), + 'select_timeout': float(settings_manager.get_setting( + 'screen', 'ptySelectTimeout', PTYConstants.SELECT_TIMEOUT + )), + 'process_termination_timeout': float(settings_manager.get_setting( + 'screen', 'ptyProcessTimeout', PTYConstants.PROCESS_TERMINATION_TIMEOUT + )), + 'poll_interval': float(settings_manager.get_setting( + 'screen', 'ptyPollInterval', PTYConstants.MIN_POLL_INTERVAL + )) + } + + self.env["runtime"]["DebugManager"].write_debug_out( + f"PTY screenDriver: Loaded configuration: {self.pty_config}", + debug.DebugLevel.INFO + ) + + except Exception as e: + # Fallback to constants if settings fail + self.env["runtime"]["DebugManager"].write_debug_out( + f"PTY screenDriver: Failed to load settings, using defaults: {e}", + debug.DebugLevel.WARNING + ) + self.pty_config = { + 'input_timeout': PTYConstants.INPUT_READ_TIMEOUT, + 'select_timeout': PTYConstants.SELECT_TIMEOUT, + 'process_termination_timeout': PTYConstants.PROCESS_TERMINATION_TIMEOUT, + 'poll_interval': PTYConstants.MIN_POLL_INTERVAL + } def initialize(self, environment): self.env = environment self.command = self.env["runtime"]["SettingsManager"].get_setting( "general", "shell" ) + + # Load configurable timeouts from settings + self._load_pty_settings() + self.shortcutType = self.env["runtime"][ "InputManager" ].get_shortcut_type() @@ -203,7 +272,7 @@ class driver(screenDriver): self.env["general"]["prev_user"] = getpass.getuser() self.env["general"]["curr_user"] = getpass.getuser() - def read_all(self, fd, timeout=0.3, interruptFd=None, len=65536): + def read_all(self, fd, timeout=PTYConstants.DEFAULT_READ_TIMEOUT, interruptFd=None, len=PTYConstants.DEFAULT_READ_BUFFER_SIZE): """Read all available data from file descriptor with efficient polling. Uses progressively longer wait times to balance responsiveness with CPU usage. @@ -214,7 +283,7 @@ class driver(screenDriver): fd_list.append(interruptFd) starttime = time.time() - poll_timeout = 0.001 # Start with 1ms, stay responsive + poll_timeout = self.pty_config.get('poll_interval', PTYConstants.MIN_POLL_INTERVAL) # Use configured interval while True: # Use consistent short polling for responsiveness @@ -238,7 +307,7 @@ class driver(screenDriver): debug.DebugLevel.ERROR ) # For I/O errors, exit immediately to prevent endless retry loops - if e.errno == 5: # Input/output error + if e.errno == PTYConstants.IO_ERROR_ERRNO: # Input/output error self.env["runtime"]["DebugManager"].write_debug_out( "PTY screenDriver: Terminal connection lost, stopping read loop", debug.DebugLevel.ERROR @@ -308,7 +377,7 @@ class driver(screenDriver): self.terminal.resize(lines, columns) fd_list = [sys.stdin, self.p_out, self.signalPipe[0]] while active.value: - r, _, _ = select(fd_list, [], [], 0.05) # 50ms timeout for responsiveness + r, _, _ = select(fd_list, [], [], self.pty_config.get('select_timeout', PTYConstants.SELECT_TIMEOUT)) # Configurable timeout # none if r == []: continue @@ -320,7 +389,7 @@ class driver(screenDriver): # input if sys.stdin in r: try: - msg_bytes = self.read_all(sys.stdin.fileno(), timeout=0.01, len=4096) + msg_bytes = self.read_all(sys.stdin.fileno(), timeout=self.pty_config.get('input_timeout', PTYConstants.INPUT_READ_TIMEOUT), len=PTYConstants.INPUT_BUFFER_SIZE) except (EOFError, OSError): event_queue.put( { @@ -424,8 +493,8 @@ class driver(screenDriver): ) os.kill(self.p_pid, signal.SIGTERM) - # Wait up to 3 seconds for graceful termination - timeout = 3.0 + # Wait for graceful termination + timeout = self.pty_config.get('process_termination_timeout', PTYConstants.PROCESS_TERMINATION_TIMEOUT) start_time = time.time() while time.time() - start_time < timeout: try: @@ -442,7 +511,7 @@ class driver(screenDriver): debug.DebugLevel.WARNING ) os.kill(self.p_pid, signal.SIGKILL) - time.sleep(0.5) # Give it a moment + time.sleep(PTYConstants.PROCESS_KILL_DELAY) # Give it a moment except OSError as e: self.env["runtime"]["DebugManager"].write_debug_out(