import os import struct import sys import pty import tty import shlex import signal import select import pyte class Terminal: def __init__(self, columns, lines, p_in): self.screen = pyte.HistoryScreen(columns, lines) self.screen.set_mode(pyte.modes.LNM) self.screen.write_process_input = \ lambda data: p_in.write(data.encode()) self.stream = pyte.ByteStream() self.stream.attach(self.screen) def feed(self, data): self.stream.feed(data) def dump(self): cursor = self.screen.cursor lines = [] for y in self.screen.dirty: line = self.screen.buffer[y] data = [(char.data, char.reverse, char.fg, char.bg) for char in (line[x] for x in range(self.screen.columns))] lines.append((y, data)) self.screen.dirty.clear() return {"c": (cursor.x, cursor.y), "lines": lines} def open_terminal(command="bash -i", columns=80, lines=24): p_pid, master_fd = pty.fork() if p_pid == 0: # Child. argv = shlex.split(command) env = dict(TERM="linux", LC_ALL="de_DE.UTF-8", COLUMNS=str(columns), LINES=str(lines)) os.execvpe(argv[0], argv, env) # File-like object for I/O with the child process aka command. p_out = os.fdopen(master_fd, "w+b", 0) return Terminal(columns, lines, p_out), p_pid, p_out def convert_to_text(dump): lines = dump['lines'] strLines = '' for line in lines: strLines += str(line[0]) for c in line[1]: strLines += c[0] return strLines def HandleTerminal(): try: terminal, p_pid, p_out = open_terminal() signal_pipe = os.pipe() Running = True tty.setraw(0) while Running: try: r, w, x = select.select([sys.stdin, p_out,signal_pipe[0]],[],[],0) if r == []: continue for fd in r: if fd == signal_pipe[0]: msgBytes = os.read(signal_pipe[0], 1) if fd == sys.stdin: msgBytes = os.read(sys.stdin.fileno(), 65536) terminal.feed(msgBytes) p_out.write(msgBytes) if fd == p_out: try: msgBytes = p_out.read(65536) except (EOFError, OSError): sys.exit(0) terminal.feed(msgBytes) os.write(sys.stdout.fileno(), msgBytes) #print(terminal.screen.display) except Exception as e: # Process died? print(e) Running = False except Exception as e: # Process died? print(e) Running = False finally: p_out.close() os.kill(p_pid, signal.SIGTERM) def get_terminal_size(fd): s = struct.pack('HHHH', 0, 0, 0, 0) rows, cols, _, _ = struct.unpack('HHHH', fcntl.ioctl(fd, termios.TIOCGWINSZ, s)) return rows, cols def resize_terminal(fd): s = struct.pack('HHHH', 0, 0, 0, 0) s = fcntl.ioctl(0, termios.TIOCGWINSZ, s) fcntl.ioctl(fd, termios.TIOCSWINSZ, s) rows, cols, _, _ = struct.unpack('hhhh', s) return rows, cols if __name__ == "__main__": HandleTerminal()