initial improvements to pty driver. Improved clipboard handling of multibyte characters. Added emoji menu to vmenu. It places the emoji to the clipboard to be used wherever.

This commit is contained in:
Storm Dragon
2025-07-16 19:34:56 -04:00
parent ae4c418323
commit d1bad818cd
60 changed files with 1503 additions and 89 deletions

View File

@ -13,6 +13,7 @@ import signal
import struct
import sys
import termios
import threading
import time
import tty
from select import select
@ -32,16 +33,39 @@ class FenrirScreen(pyte.Screen):
class Terminal:
def __init__(self, columns, lines, p_in):
def __init__(self, columns, lines, p_in, env=None):
self.text = ""
self.attributes = None
self.screen = FenrirScreen(columns, lines)
self.env = env # Environment for proper logging
# Pre-create default attribute template to avoid repeated allocation
self._default_attribute = [
"default", "default", False, False, False, False, False, False,
"default", "default"
]
self.screen.write_process_input = lambda data: p_in.write(
data.encode()
)
self.stream = pyte.ByteStream()
self.stream.attach(self.screen)
def _log_error(self, message, level=None):
"""Log error message using proper debug manager if available."""
if self.env and "runtime" in self.env and "DebugManager" in self.env["runtime"]:
try:
log_level = level if level else debug.DebugLevel.ERROR
self.env["runtime"]["DebugManager"].write_debug_out(
f"PTY Terminal: {message}",
log_level
)
return
except Exception:
pass # Fallback to print if debug manager fails
# Fallback logging when debug manager unavailable
print(f"PTY Terminal: {message}")
def feed(self, data):
self.stream.feed(data)
@ -52,45 +76,57 @@ class Terminal:
lines = self.screen.dirty
else:
lines = range(self.screen.lines)
self.attributes = [
[
list(attribute[1:]) + [False, "default", "default"]
for attribute in line.values()
]
for line in buffer.values()
]
for y in lines:
try:
t = self.attributes[y]
self.attributes = [
[
list(attribute[1:]) + [False, "default", "default"]
if len(attribute) > 1 else [False, "default", "default"]
for attribute in line.values()
]
for line in buffer.values()
]
except Exception as e:
# Terminal class doesn't have access to env, use fallback
# logging
print(
f"ptyDriver Terminal update_attributes: Error accessing "
f"attributes: {e}"
self._log_error(f"Error initializing attributes: {e}")
# Fallback to empty attributes
self.attributes = [[] for _ in range(self.screen.lines)]
for y in lines:
# Validate y is within reasonable bounds (prevent memory exhaustion)
max_lines = 10000 # Reasonable maximum for terminal applications
if y >= max_lines:
self._log_error(
f"Line index {y} exceeds maximum {max_lines}, "
f"skipping attribute update",
debug.DebugLevel.WARNING
)
continue
# Check if line y exists in buffer before accessing it
if y not in buffer:
# Only log this occasionally to prevent spam
if y % 10 == 0: # Log every 10th missing line
self._log_error(
f"Lines {y}-{y+9} not found in buffer, skipping attribute updates",
debug.DebugLevel.WARNING
)
continue
# Ensure attributes array is large enough for line y
while len(self.attributes) <= y:
self.attributes.append([])
self.attributes[y] = [
list(attribute[1:]) + [False, "default", "default"]
for attribute in (buffer[y].values())
]
try:
self.attributes[y] = [
list(attribute[1:]) + [False, "default", "default"]
for attribute in (buffer[y].values())
]
except Exception as e:
self._log_error(f"Error updating attributes for line {y}: {e}")
# Initialize with empty attributes if update fails
self.attributes[y] = []
if len(self.attributes[y]) < self.screen.columns:
diff = self.screen.columns - len(self.attributes[y])
self.attributes[y] += [
[
"default",
"default",
False,
False,
False,
False,
False,
False,
"default",
"default",
]
] * diff
# Use pre-created template for efficiency
self.attributes[y] += [self._default_attribute[:] for _ in range(diff)]
def resize(self, lines, columns):
self.screen.resize(lines, columns)
@ -98,31 +134,35 @@ class Terminal:
self.update_attributes(True)
def set_cursor(self, x=-1, y=-1):
x_pos = x
y_pos = y
if x_pos == -1:
x_pos = self.screen.cursor.x
if y_pos == -1:
y_pos = self.screen.cursor.y
self.screen.cursor.x = min(
self.screen.cursor.x, self.screen.columns - 1
)
self.screen.cursor.y = min(self.screen.cursor.y, self.screen.lines - 1)
# Determine target cursor position
x_pos = x if x != -1 else self.screen.cursor.x
y_pos = y if y != -1 else self.screen.cursor.y
# Validate and clamp cursor position to screen bounds
max_x = max(0, self.screen.columns - 1)
max_y = max(0, self.screen.lines - 1)
self.screen.cursor.x = max(0, min(x_pos, max_x))
self.screen.cursor.y = max(0, min(y_pos, max_y))
def get_screen_content(self):
cursor = self.screen.cursor
self.text = "\n".join(self.screen.display)
self.update_attributes(self.attributes is None)
self.screen.dirty.clear()
return {
# Return screen content without unnecessary copying
# Only copy attributes if they exist and need protection
screen_data = {
"cursor": (cursor.x, cursor.y),
"lines": self.screen.lines,
"columns": self.screen.columns,
"text": self.text,
"attributes": self.attributes.copy(),
"attributes": self.attributes[:] if self.attributes else [], # Shallow copy only if needed
"screen": "pty",
"screenUpdateTime": time.time(),
}.copy()
}
return screen_data
class driver(screenDriver):
@ -132,6 +172,7 @@ class driver(screenDriver):
self.p_out = None
self.terminal = None
self.p_pid = -1
self.terminal_lock = threading.Lock() # Synchronize terminal operations
signal.signal(signal.SIGWINCH, self.handle_sigwinch)
def initialize(self, environment):
@ -163,28 +204,56 @@ class driver(screenDriver):
self.env["general"]["curr_user"] = getpass.getuser()
def read_all(self, fd, timeout=0.3, interruptFd=None, len=65536):
"""Read all available data from file descriptor with efficient polling.
Uses progressively longer wait times to balance responsiveness with CPU usage.
"""
msg_bytes = b""
fd_list = []
fd_list += [fd]
fd_list = [fd]
if interruptFd:
fd_list += [interruptFd]
fd_list.append(interruptFd)
starttime = time.time()
poll_timeout = 0.001 # Start with 1ms, stay responsive
while True:
r = screen_utils.has_more_what(fd_list, 0.0001)
# nothing more to read
# Use consistent short polling for responsiveness
r = screen_utils.has_more_what(fd_list, poll_timeout)
# Nothing more to read
if fd not in r:
# Check overall timeout
if (time.time() - starttime) >= timeout:
break
continue
try:
data = os.read(fd, len)
if data == b"":
raise EOFError
msg_bytes += data
except OSError as e:
self.env["runtime"]["DebugManager"].write_debug_out(
f"PTY screenDriver read_all: OS error reading from fd {fd}: {e}",
debug.DebugLevel.ERROR
)
# For I/O errors, exit immediately to prevent endless retry loops
if e.errno == 5: # Input/output error
self.env["runtime"]["DebugManager"].write_debug_out(
"PTY screenDriver: Terminal connection lost, stopping read loop",
debug.DebugLevel.ERROR
)
raise EOFError("Terminal connection lost")
break
data = os.read(fd, len)
if data == b"":
raise EOFError
msg_bytes += data
# exit on interrupt available
if interruptFd in r:
# Exit on interrupt available
if interruptFd and interruptFd in r:
break
# respect timeout but wait a little bit of time to see if something
# more is here
# Check overall timeout
if (time.time() - starttime) >= timeout:
break
return msg_bytes
def open_terminal(self, columns, lines, command):
@ -197,16 +266,16 @@ class driver(screenDriver):
if env["TERM"] == "":
env["TERM"] = "linux"
except Exception as e:
# Child process doesn't have access to env, use fallback
# logging
# Child process doesn't have access to debug manager
# Use fallback logging with more context
print(
f"ptyDriver spawnTerminal: Error checking TERM environment: {e}"
f"ptyDriver open_terminal (child): TERM environment error: {e}"
)
env["TERM"] = "linux"
os.execvpe(argv[0], argv, env)
# File-like object for I/O with the child process aka command.
p_out = os.fdopen(master_fd, "w+b", 0)
return Terminal(columns, lines, p_out), p_pid, p_out
return Terminal(columns, lines, p_out, self.env), p_pid, p_out
def resize_terminal(self, fd):
s = struct.pack("HHHH", 0, 0, 0, 0)
@ -239,7 +308,7 @@ class driver(screenDriver):
self.terminal.resize(lines, columns)
fd_list = [sys.stdin, self.p_out, self.signalPipe[0]]
while active.value:
r, _, _ = select(fd_list, [], [], 1)
r, _, _ = select(fd_list, [], [], 0.05) # 50ms timeout for responsiveness
# none
if r == []:
continue
@ -251,7 +320,7 @@ class driver(screenDriver):
# input
if sys.stdin in r:
try:
msg_bytes = self.read_all(sys.stdin.fileno(), len=4096)
msg_bytes = self.read_all(sys.stdin.fileno(), timeout=0.01, len=4096)
except (EOFError, OSError):
event_queue.put(
{
@ -299,34 +368,121 @@ class driver(screenDriver):
}
)
break
# feed and send event bevore write, the pyte already has the right state
# so fenrir already can progress bevore os.write what
# should give some better reaction time
self.terminal.feed(msg_bytes)
event_queue.put(
{
"Type": FenrirEventType.screen_update,
"data": screen_utils.create_screen_event_data(
self.terminal.get_screen_content()
),
}
)
# Synchronize terminal operations to prevent race conditions
with self.terminal_lock:
# Feed data to terminal and get consistent screen state
self.terminal.feed(msg_bytes)
screen_content = self.terminal.get_screen_content()
# Send screen update event with consistent state
event_queue.put(
{
"Type": FenrirEventType.screen_update,
"data": screen_utils.create_screen_event_data(
screen_content
),
}
)
# Inject to actual screen (outside lock to avoid blocking)
self.inject_text_to_screen(
msg_bytes, screen=sys.stdout.fileno()
)
except Exception as e: # Process died?
print(e)
self.env["runtime"]["DebugManager"].write_debug_out(
f"PTY screenDriver terminal_emulation: Exception occurred: {e}",
debug.DebugLevel.ERROR
)
event_queue.put(
{"Type": FenrirEventType.stop_main_loop, "data": None}
)
finally:
os.kill(self.p_pid, signal.SIGTERM)
self.p_out.close()
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_attr)
self._safe_cleanup_process()
self._safe_cleanup_resources(old_attr)
event_queue.put(
{"Type": FenrirEventType.stop_main_loop, "data": None}
)
sys.exit(0)
def _safe_cleanup_process(self):
"""Safely terminate the child process with timeout and fallback to SIGKILL."""
if not hasattr(self, 'p_pid') or self.p_pid is None:
return
try:
# Check if process is still alive
os.kill(self.p_pid, 0) # Signal 0 checks if process exists
except OSError:
# Process already dead
self.p_pid = None
return
try:
# Try graceful termination first
self.env["runtime"]["DebugManager"].write_debug_out(
f"PTY screenDriver: Terminating process {self.p_pid} gracefully",
debug.DebugLevel.INFO
)
os.kill(self.p_pid, signal.SIGTERM)
# Wait up to 3 seconds for graceful termination
timeout = 3.0
start_time = time.time()
while time.time() - start_time < timeout:
try:
os.kill(self.p_pid, 0) # Check if still alive
time.sleep(0.1)
except OSError:
# Process terminated gracefully
self.p_pid = None
return
# Process didn't terminate gracefully, use SIGKILL
self.env["runtime"]["DebugManager"].write_debug_out(
f"PTY screenDriver: Process {self.p_pid} didn't terminate gracefully, using SIGKILL",
debug.DebugLevel.WARNING
)
os.kill(self.p_pid, signal.SIGKILL)
time.sleep(0.5) # Give it a moment
except OSError as e:
self.env["runtime"]["DebugManager"].write_debug_out(
f"PTY screenDriver: Error terminating process {self.p_pid}: {e}",
debug.DebugLevel.ERROR
)
finally:
self.p_pid = None
def _safe_cleanup_resources(self, old_attr=None):
"""Safely clean up file descriptors and terminal attributes."""
# Close output pipe safely
if hasattr(self, 'p_out') and self.p_out is not None:
try:
self.p_out.close()
self.env["runtime"]["DebugManager"].write_debug_out(
"PTY screenDriver: Closed output pipe",
debug.DebugLevel.INFO
)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
f"PTY screenDriver: Error closing output pipe: {e}",
debug.DebugLevel.ERROR
)
finally:
self.p_out = None
# Restore terminal attributes safely
if old_attr is not None:
try:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_attr)
self.env["runtime"]["DebugManager"].write_debug_out(
"PTY screenDriver: Restored terminal attributes",
debug.DebugLevel.INFO
)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
f"PTY screenDriver: Error restoring terminal attributes: {e}",
debug.DebugLevel.ERROR
)
def get_curr_application(self):
pass