"""Relay a command running in a pseudo-terminal to the controlling terminal.

The child command is started with ``pty.fork``; bytes read from the pty are
copied to stdout and bytes typed on stdin are copied to the pty.  A pyte
screen (``Terminal``) is kept alongside and can be dumped as a list of
changed lines.
"""
import fcntl
import os
import pty
import select
import shlex
import signal
import struct
import sys
import termios
import time
import tty

import pyte

# Layout used with TIOCGWINSZ / TIOCSWINSZ: four unsigned shorts, of which
# this module uses the first two as (rows, cols).
_WINSIZE = struct.Struct('HHHH')


class Terminal:
    """A pyte ``HistoryScreen`` driven by a ``ByteStream``."""

    def __init__(self, columns, lines, p_in):
        """Create a ``columns`` x ``lines`` screen.

        ``p_in`` is a binary file-like object; text that the screen wants to
        send back to the process (``write_process_input``) is encoded and
        written to it.
        """
        self.screen = pyte.HistoryScreen(columns, lines)
        self.screen.set_mode(pyte.modes.LNM)
        self.screen.write_process_input = \
            lambda data: p_in.write(data.encode())
        self.stream = pyte.ByteStream()
        self.stream.attach(self.screen)

    def feed(self, data):
        """Feed raw bytes to the stream attached to the screen."""
        self.stream.feed(data)

    def dump(self):
        """Return the cursor position and every dirty line, then mark clean.

        The result is ``{"c": (x, y), "lines": [(y, cells), ...]}`` where each
        cell is a ``(data, reverse, fg, bg)`` tuple, one per screen column.
        """
        cursor = self.screen.cursor
        lines = []
        for y in self.screen.dirty:
            line = self.screen.buffer[y]
            data = [(char.data, char.reverse, char.fg, char.bg)
                    for char in (line[x] for x in range(self.screen.columns))]
            lines.append((y, data))

        # Cleared only after the loop: the set must not change while iterated.
        self.screen.dirty.clear()
        return {"c": (cursor.x, cursor.y), "lines": lines}


def open_terminal(command="bash", columns=80, lines=25):
    """Start ``command`` in a new pseudo-terminal.

    Returns ``(terminal, pid, p_out)``: a ``Terminal`` of the requested size,
    the child's process id, and an unbuffered binary file object wrapping the
    pty master, used for both reading from and writing to the child.
    """
    p_pid, master_fd = pty.fork()
    if p_pid == 0:  # Child.
        try:
            argv = shlex.split(command)
            env = dict(TERM="linux", LC_ALL=sys.stdout.encoding,
                       COLUMNS=str(columns), LINES=str(lines))
            os.execvpe(argv[0], argv, env)
        except Exception as exc:
            # Report through the pty so the parent relays the message.
            os.write(2, ("%s: %s\n" % (command, exc)).encode())
        finally:
            # Only reached when exec did not happen.  The child must never
            # return into the parent's code path.
            os._exit(127)

    # Parent only: printing before the fork branch also wrote "PID 0" from
    # the child into the pty.
    print('PID', p_pid)

    # File-like object for I/O with the child process aka command.
    p_out = os.fdopen(master_fd, "w+b", 0)
    return Terminal(columns, lines, p_out), p_pid, p_out


def _write_all(fd, data):
    """Write every byte of ``data`` to ``fd``, retrying after short writes."""
    view = memoryview(data)
    while len(view):
        written = os.write(fd, view)
        view = view[written:]


def _stop_child(pid, master):
    """Terminate the child ``pid`` and close the pty ``master`` (best effort)."""
    if pid:
        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            pass  # Already gone and reaped.
    if master is not None:
        master.close()
    if pid:
        try:
            # Non-blocking: reap the child if it has already exited, but never
            # hang here if it has not.
            os.waitpid(pid, os.WNOHANG)
        except ChildProcessError:
            pass


def HandleTerminal():
    """Run the default command in a pty and relay it to stdin/stdout.

    Puts stdin into raw mode, copies child output to stdout and typed input
    to the child until either side reaches EOF or fails, then terminates the
    child and restores the original terminal attributes.  Errors are printed
    rather than raised.
    """
    debug = False
    # Pre-bind everything the cleanup touches so a failure during setup
    # cannot turn into an UnboundLocalError inside ``finally``.
    old_attr = None
    p_pid = None
    p_out = None
    stdin_fd = None
    try:
        stdin_fd = sys.stdin.fileno()
        stdout_fd = sys.stdout.fileno()
        old_attr = termios.tcgetattr(stdin_fd)
        tty.setraw(stdin_fd)
        terminal, p_pid, p_out = open_terminal()
        while True:
            time.sleep(0.04)
            r, _, _ = select.select([sys.stdin, p_out], [], [])
            if not r:
                continue
            if p_out in r:
                if debug:
                    print('pre p_out')
                try:
                    msgBytes = read_all(p_out.fileno())
                except (EOFError, OSError):
                    break
                _write_all(stdout_fd, msgBytes)
                sys.stdout.flush()
                if debug:
                    print('after p_out')
            if sys.stdin in r:
                if debug:
                    print('pre stdin')
                try:
                    msgBytes = read_all(stdin_fd)
                except (EOFError, OSError):
                    break
                # NOTE(review): the screen is fed the typed input, not the
                # child's output; kept as in the original - confirm intent.
                terminal.feed(msgBytes)
                _write_all(p_out.fileno(), msgBytes)
                if debug:
                    print('after stdin')
    except Exception as e:
        # Process died?
        print(e)
    finally:
        try:
            _stop_child(p_pid, p_out)
        finally:
            # Restore the tty even if stopping the child failed.
            if old_attr is not None:
                termios.tcsetattr(stdin_fd, termios.TCSADRAIN, old_attr)


def get_terminal_size(fd):
    """Return ``(rows, cols)`` reported by TIOCGWINSZ for ``fd``."""
    s = _WINSIZE.pack(0, 0, 0, 0)
    rows, cols, _, _ = _WINSIZE.unpack(fcntl.ioctl(fd, termios.TIOCGWINSZ, s))
    return rows, cols


def resize_terminal(fd):
    """Copy the window size of fd 0 onto ``fd`` and return ``(rows, cols)``."""
    s = _WINSIZE.pack(0, 0, 0, 0)
    s = fcntl.ioctl(0, termios.TIOCGWINSZ, s)
    fcntl.ioctl(fd, termios.TIOCSWINSZ, s)
    # Unsigned, consistent with get_terminal_size().
    rows, cols, _, _ = _WINSIZE.unpack(s)
    return rows, cols


def read_all(fd, chunk_size=4096, max_bytes=1 << 20):
    """Read everything currently available on ``fd``.

    Blocks for the first chunk, then keeps reading while ``has_more`` reports
    pending data, up to roughly ``max_bytes`` (``None`` for no limit) so a
    producer that never pauses cannot keep this call from returning.

    Raises ``EOFError`` if the very first read hits end of file.  If EOF or an
    ``OSError`` occurs after some data was read, that data is returned; the
    condition is reported by the next call instead.
    """
    first = os.read(fd, chunk_size)
    if not first:
        raise EOFError
    chunks = [first]
    total = len(first)
    while (max_bytes is None or total < max_bytes) and has_more(fd):
        try:
            data = os.read(fd, chunk_size)
        except OSError:
            break
        if not data:
            break
        chunks.append(data)
        total += len(data)
    return b"".join(chunks)


def has_more(fd):
    """Return True if ``fd`` is readable right now (zero-timeout select)."""
    r, _, _ = select.select([fd], [], [], 0)
    return fd in r


if __name__ == "__main__":
    HandleTerminal()