"""Run a command inside a pseudo-terminal and mirror it on the real terminal.

The child's output is written to stdout and also fed to a ``pyte`` screen so
the emulated screen contents can be inspected (see ``Terminal.dump``).
"""

import fcntl
import os
import pty
import select
import shlex
import signal
import struct
import sys
import termios
import time
import tty

import pyte


class Terminal:
    """A pyte ``HistoryScreen`` together with the ``ByteStream`` that drives it."""

    def __init__(self, columns, lines, p_in):
        """Create a screen of ``columns`` x ``lines`` cells.

        ``p_in`` is a binary file-like object; text that the screen wants to
        send back to the process is encoded and written to it.
        """
        self.screen = pyte.HistoryScreen(columns, lines)
        self.screen.set_mode(pyte.modes.LNM)
        self.screen.write_process_input = (
            lambda data: p_in.write(data.encode())
        )
        self.stream = pyte.ByteStream()
        self.stream.attach(self.screen)

    def feed(self, data):
        """Feed raw bytes to the emulator."""
        self.stream.feed(data)

    def dump(self):
        """Return the cursor position and the lines marked dirty, then clear the dirty set.

        The result has the form
        ``{"c": (x, y), "lines": [(y, [(data, reverse, fg, bg), ...]), ...]}``.
        Lines are reported in ascending row order.
        """
        cursor = self.screen.cursor
        columns = self.screen.columns
        lines = []
        # ``dirty`` is a set; sort it so the output order is deterministic.
        for y in sorted(self.screen.dirty):
            row = self.screen.buffer[y]
            cells = [
                (char.data, char.reverse, char.fg, char.bg)
                for char in (row[x] for x in range(columns))
            ]
            lines.append((y, cells))

        self.screen.dirty.clear()
        return {"c": (cursor.x, cursor.y), "lines": lines}


def open_terminal(command="bash", columns=80, lines=24):
    """Fork ``command`` on a new pty.

    Returns ``(terminal, pid, p_out)`` where ``terminal`` is a ``Terminal``
    of the requested size, ``pid`` is the child's process id and ``p_out`` is
    an unbuffered binary file object wrapping the pty master.
    """
    p_pid, master_fd = pty.fork()
    if p_pid == 0:  # Child.
        try:
            argv = shlex.split(command)
            env = dict(TERM="linux", LC_ALL="de_DE.UTF-8",
                       COLUMNS=str(columns), LINES=str(lines))
            os.execvpe(argv[0], argv, env)
        finally:
            # Only reached when exec failed (or ``command`` was empty).
            # The child must never return into the parent's code path.
            os._exit(127)

    # File-like object for I/O with the child process aka command.
    p_out = os.fdopen(master_fd, "w+b", 0)
    return Terminal(columns, lines, p_out), p_pid, p_out


def convert_to_text(dump):
    """Flatten a ``Terminal.dump()`` result into one string.

    For each dumped line the row number is emitted followed by the
    characters of its cells; no separators are inserted.
    """
    return "".join(
        str(row) + "".join(cell[0] for cell in cells)
        for row, cells in dump['lines']
    )


def _write_all(fd, data):
    """Write all of ``data`` to ``fd``, retrying after short writes."""
    view = memoryview(data)
    while view:
        written = os.write(fd, view)
        view = view[written:]


def HandleTerminal():
    """Run the default command on a pty and relay I/O until either side closes.

    stdin is switched to raw mode for the duration of the session and its
    previous attributes are restored on exit. Errors are printed rather than
    propagated.
    """
    debug = False
    saved_attr = None
    stdin_fd = None
    p_pid = None
    p_out = None
    try:
        stdin_fd = sys.stdin.fileno()
        stdout_fd = sys.stdout.fileno()
        saved_attr = termios.tcgetattr(stdin_fd)
        tty.setraw(stdin_fd)
        terminal, p_pid, p_out = open_terminal()
        master_fd = p_out.fileno()
        while True:
            # Block until there is something to do instead of spinning.
            readable, _, _ = select.select([stdin_fd, master_fd], [], [])
            if master_fd in readable:
                if debug:
                    print('pre p_out')
                try:
                    chunk = os.read(master_fd, 4096)
                except OSError:
                    # Reading the master fails once the child side is gone.
                    break
                if not chunk:
                    break
                terminal.feed(chunk)
                _write_all(stdout_fd, chunk)
                if debug:
                    print('after p_out')
            if stdin_fd in readable:
                if debug:
                    print('pre stdin')
                chunk = os.read(stdin_fd, 4096)
                if not chunk:  # EOF on stdin.
                    break
                # NOTE(review): keystrokes are fed to the emulator as well as
                # to the child, as before; the child's echo also arrives via
                # the pty, so confirm that this is intended.
                terminal.feed(chunk)
                _write_all(master_fd, chunk)
                if debug:
                    print('after stdin')
    except Exception as e:  # Process died?
        print(e)
    finally:
        try:
            if saved_attr is not None:
                termios.tcsetattr(stdin_fd, termios.TCSADRAIN, saved_attr)
        finally:
            if p_out is not None:
                p_out.close()
            if p_pid is not None:
                try:
                    os.kill(p_pid, signal.SIGTERM)
                except ProcessLookupError:
                    pass  # Child is already gone.


def get_terminal_size(fd):
    """Return ``(rows, cols)`` of the terminal attached to ``fd``."""
    s = struct.pack('HHHH', 0, 0, 0, 0)
    rows, cols, _, _ = struct.unpack(
        'HHHH', fcntl.ioctl(fd, termios.TIOCGWINSZ, s))
    return rows, cols


def resize_terminal(fd):
    """Copy the window size of fd 0 onto ``fd`` and return it as ``(rows, cols)``."""
    winsize = fcntl.ioctl(
        0, termios.TIOCGWINSZ, struct.pack('HHHH', 0, 0, 0, 0))
    fcntl.ioctl(fd, termios.TIOCSWINSZ, winsize)
    # The buffer was packed as unsigned shorts; unpack it the same way.
    rows, cols, _, _ = struct.unpack('HHHH', winsize)
    return rows, cols


if __name__ == "__main__":
    HandleTerminal()