More work on the pty driver.

This commit is contained in:
Storm Dragon
2025-07-16 19:43:07 -04:00
parent d1bad818cd
commit 579bf0f0f0

View File

@ -25,6 +25,26 @@ from fenrirscreenreader.core.eventData import FenrirEventType
from fenrirscreenreader.core.screenDriver import ScreenDriver as screenDriver
from fenrirscreenreader.utils import screen_utils
# PTY Driver Constants
class PTYConstants:
# Timeouts (in seconds)
DEFAULT_READ_TIMEOUT = 0.3
INPUT_READ_TIMEOUT = 0.01
SELECT_TIMEOUT = 0.05
PROCESS_TERMINATION_TIMEOUT = 3.0
PROCESS_KILL_DELAY = 0.5
# Polling intervals (in seconds)
MIN_POLL_INTERVAL = 0.001
# Limits
MAX_TERMINAL_LINES = 10000
DEFAULT_READ_BUFFER_SIZE = 65536
INPUT_BUFFER_SIZE = 4096
# Error codes
IO_ERROR_ERRNO = 5
class FenrirScreen(pyte.Screen):
def set_margins(self, *args, **kwargs):
@ -91,10 +111,9 @@ class Terminal:
self.attributes = [[] for _ in range(self.screen.lines)]
for y in lines:
# Validate y is within reasonable bounds (prevent memory exhaustion)
max_lines = 10000 # Reasonable maximum for terminal applications
if y >= max_lines:
if y >= PTYConstants.MAX_TERMINAL_LINES:
self._log_error(
f"Line index {y} exceeds maximum {max_lines}, "
f"Line index {y} exceeds maximum {PTYConstants.MAX_TERMINAL_LINES}, "
f"skipping attribute update",
debug.DebugLevel.WARNING
)
@ -104,8 +123,10 @@ class Terminal:
if y not in buffer:
# Only log this occasionally to prevent spam
if y % 10 == 0: # Log every 10th missing line
# Pre-format string to avoid repeated f-string operations
line_range = f"{y}-{y+9}"
self._log_error(
f"Lines {y}-{y+9} not found in buffer, skipping attribute updates",
f"Lines {line_range} not found in buffer, skipping attribute updates",
debug.DebugLevel.WARNING
)
continue
@ -147,7 +168,9 @@ class Terminal:
def get_screen_content(self):
cursor = self.screen.cursor
self.text = "\n".join(self.screen.display)
# Only regenerate text if screen is dirty or text doesn't exist
if not hasattr(self, 'text') or self.screen.dirty:
self.text = "\n".join(self.screen.display)
self.update_attributes(self.attributes is None)
self.screen.dirty.clear()
@ -174,12 +197,58 @@ class driver(screenDriver):
self.p_pid = -1
self.terminal_lock = threading.Lock() # Synchronize terminal operations
signal.signal(signal.SIGWINCH, self.handle_sigwinch)
# Runtime configuration storage
self.pty_config = {}
def _load_pty_settings(self):
"""Load PTY-specific settings from configuration with fallbacks to defaults."""
try:
settings_manager = self.env["runtime"]["SettingsManager"]
# Load timeout settings with defaults
self.pty_config = {
'input_timeout': float(settings_manager.get_setting(
'screen', 'ptyInputTimeout', PTYConstants.INPUT_READ_TIMEOUT
)),
'select_timeout': float(settings_manager.get_setting(
'screen', 'ptySelectTimeout', PTYConstants.SELECT_TIMEOUT
)),
'process_termination_timeout': float(settings_manager.get_setting(
'screen', 'ptyProcessTimeout', PTYConstants.PROCESS_TERMINATION_TIMEOUT
)),
'poll_interval': float(settings_manager.get_setting(
'screen', 'ptyPollInterval', PTYConstants.MIN_POLL_INTERVAL
))
}
self.env["runtime"]["DebugManager"].write_debug_out(
f"PTY screenDriver: Loaded configuration: {self.pty_config}",
debug.DebugLevel.INFO
)
except Exception as e:
# Fallback to constants if settings fail
self.env["runtime"]["DebugManager"].write_debug_out(
f"PTY screenDriver: Failed to load settings, using defaults: {e}",
debug.DebugLevel.WARNING
)
self.pty_config = {
'input_timeout': PTYConstants.INPUT_READ_TIMEOUT,
'select_timeout': PTYConstants.SELECT_TIMEOUT,
'process_termination_timeout': PTYConstants.PROCESS_TERMINATION_TIMEOUT,
'poll_interval': PTYConstants.MIN_POLL_INTERVAL
}
def initialize(self, environment):
self.env = environment
self.command = self.env["runtime"]["SettingsManager"].get_setting(
"general", "shell"
)
# Load configurable timeouts from settings
self._load_pty_settings()
self.shortcutType = self.env["runtime"][
"InputManager"
].get_shortcut_type()
@ -203,7 +272,7 @@ class driver(screenDriver):
self.env["general"]["prev_user"] = getpass.getuser()
self.env["general"]["curr_user"] = getpass.getuser()
def read_all(self, fd, timeout=0.3, interruptFd=None, len=65536):
def read_all(self, fd, timeout=PTYConstants.DEFAULT_READ_TIMEOUT, interruptFd=None, len=PTYConstants.DEFAULT_READ_BUFFER_SIZE):
"""Read all available data from file descriptor with efficient polling.
Uses progressively longer wait times to balance responsiveness with CPU usage.
@ -214,7 +283,7 @@ class driver(screenDriver):
fd_list.append(interruptFd)
starttime = time.time()
poll_timeout = 0.001 # Start with 1ms, stay responsive
poll_timeout = self.pty_config.get('poll_interval', PTYConstants.MIN_POLL_INTERVAL) # Use configured interval
while True:
# Use consistent short polling for responsiveness
@ -238,7 +307,7 @@ class driver(screenDriver):
debug.DebugLevel.ERROR
)
# For I/O errors, exit immediately to prevent endless retry loops
if e.errno == 5: # Input/output error
if e.errno == PTYConstants.IO_ERROR_ERRNO: # Input/output error
self.env["runtime"]["DebugManager"].write_debug_out(
"PTY screenDriver: Terminal connection lost, stopping read loop",
debug.DebugLevel.ERROR
@ -308,7 +377,7 @@ class driver(screenDriver):
self.terminal.resize(lines, columns)
fd_list = [sys.stdin, self.p_out, self.signalPipe[0]]
while active.value:
r, _, _ = select(fd_list, [], [], 0.05) # 50ms timeout for responsiveness
r, _, _ = select(fd_list, [], [], self.pty_config.get('select_timeout', PTYConstants.SELECT_TIMEOUT)) # Configurable timeout
# none
if r == []:
continue
@ -320,7 +389,7 @@ class driver(screenDriver):
# input
if sys.stdin in r:
try:
msg_bytes = self.read_all(sys.stdin.fileno(), timeout=0.01, len=4096)
msg_bytes = self.read_all(sys.stdin.fileno(), timeout=self.pty_config.get('input_timeout', PTYConstants.INPUT_READ_TIMEOUT), len=PTYConstants.INPUT_BUFFER_SIZE)
except (EOFError, OSError):
event_queue.put(
{
@ -424,8 +493,8 @@ class driver(screenDriver):
)
os.kill(self.p_pid, signal.SIGTERM)
# Wait up to 3 seconds for graceful termination
timeout = 3.0
# Wait for graceful termination
timeout = self.pty_config.get('process_termination_timeout', PTYConstants.PROCESS_TERMINATION_TIMEOUT)
start_time = time.time()
while time.time() - start_time < timeout:
try:
@ -442,7 +511,7 @@ class driver(screenDriver):
debug.DebugLevel.WARNING
)
os.kill(self.p_pid, signal.SIGKILL)
time.sleep(0.5) # Give it a moment
time.sleep(PTYConstants.PROCESS_KILL_DELAY) # Give it a moment
except OSError as e:
self.env["runtime"]["DebugManager"].write_debug_out(