120 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
			
		
		
	
	
			120 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
| import os
 | |
| import struct
 | |
| import sys
 | |
| import pty
 | |
| import tty
 | |
| import termios
 | |
| import shlex
 | |
| import signal
 | |
| import select
 | |
| import pyte
 | |
| import time
 | |
| 
 | |
| class Terminal:
 | |
|     def __init__(self, columns, lines, p_in):
 | |
|         self.screen = pyte.HistoryScreen(columns, lines)
 | |
|         self.screen.set_mode(pyte.modes.LNM)
 | |
|         self.screen.write_process_input = \
 | |
|             lambda data: p_in.write(data.encode())
 | |
|         self.stream = pyte.ByteStream()
 | |
|         self.stream.attach(self.screen)
 | |
|     def feed(self, data):
 | |
|         self.stream.feed(data)
 | |
|     def dump(self):
 | |
|         cursor = self.screen.cursor
 | |
|         lines = []
 | |
|         for y in self.screen.dirty:
 | |
|             line = self.screen.buffer[y]
 | |
|             data = [(char.data, char.reverse, char.fg, char.bg)
 | |
|                     for char in (line[x] for x in range(self.screen.columns))]
 | |
|             lines.append((y, data))
 | |
|         self.screen.dirty.clear()
 | |
|         return {"c": (cursor.x, cursor.y), "lines": lines}
 | |
| 
 | |
| 
 | |
| def open_terminal(command="bash", columns=80, lines=24):
 | |
|     p_pid, master_fd = pty.fork()
 | |
|     if p_pid == 0:  # Child.
 | |
|         argv = shlex.split(command)
 | |
|         env = os.environ.copy()
 | |
|         env["TERM"] = 'vt100'
 | |
|         os.execvpe(argv[0], argv, env)
 | |
|     # File-like object for I/O with the child process aka command.
 | |
|     p_out = os.fdopen(master_fd, "w+b", 0)
 | |
|     return Terminal(columns, lines, p_out), p_pid, p_out
 | |
| 
 | |
| def HandleTerminal():
 | |
|     debug = False
 | |
|     running = True 
 | |
|     try:
 | |
|         old_attr = termios.tcgetattr(sys.stdin)    
 | |
|         tty.setraw(0)
 | |
|         terminal, p_pid, p_out = open_terminal()
 | |
|         std_out = os.fdopen(sys.stdout.fileno(), "w+b", 0)
 | |
|         while running:
 | |
|             r, w, x = select.select([sys.stdin, p_out],[],[])
 | |
|             if r == []:
 | |
|                 continue
 | |
|             if p_out in r:
 | |
|                 if debug:
 | |
|                     print('pre p_out')                                            
 | |
|                 try:
 | |
|                     msgBytes = read_all(p_out.fileno())
 | |
|                 except (EOFError, OSError):
 | |
|                     running = False
 | |
|                     break    
 | |
|                 terminal.feed(msgBytes)                                
 | |
|                 os.write(sys.stdout.fileno(), msgBytes)
 | |
|                 if debug:
 | |
|                     print('after p_out')                    
 | |
|             if sys.stdin in r:
 | |
|                 if debug:
 | |
|                     print('pre stdin')       
 | |
|                 try:
 | |
|                     msgBytes = read_all(sys.stdin.fileno())
 | |
|                 except (EOFError, OSError):
 | |
|                     running = False                    
 | |
|                     break
 | |
|                 terminal.feed(msgBytes)                                
 | |
|                 os.write(p_out.fileno(), msgBytes)
 | |
|                 if debug:
 | |
|                     print('after stdin')                
 | |
|     except Exception as e:  # Process died?
 | |
|         print(e)
 | |
|         running = False
 | |
|     finally:
 | |
|         os.kill(p_pid, signal.SIGTERM)
 | |
|         p_out.close()    
 | |
|         termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_attr)
 | |
|         sys.exit(0)
 | |
| 
 | |
| def get_terminal_size(fd):
 | |
|     s = struct.pack('HHHH', 0, 0, 0, 0)
 | |
|     rows, cols, _, _ = struct.unpack('HHHH', fcntl.ioctl(fd, termios.TIOCGWINSZ, s))
 | |
|     return rows, cols
 | |
| 
 | |
| def resize_terminal(fd):
 | |
|     s = struct.pack('HHHH', 0, 0, 0, 0)
 | |
|     s = fcntl.ioctl(0, termios.TIOCGWINSZ, s)
 | |
|     fcntl.ioctl(fd, termios.TIOCSWINSZ, s)
 | |
|     rows, cols, _, _ = struct.unpack('hhhh', s)
 | |
|     return rows, cols
 | |
| 
 | |
| def read_all(fd):
 | |
|     bytes = os.read(fd, 65536)
 | |
|     if bytes == b'':
 | |
|         raise EOFError
 | |
|     while has_more(fd):
 | |
|         data = os.read(fd, 65536)
 | |
|         if data == b'':
 | |
|             raise EOFError
 | |
|         bytes += data
 | |
|     return bytes
 | |
| 
 | |
| def has_more(fd):
 | |
|     r, w, e = select.select([fd], [], [], 0)
 | |
|     return (fd in r)
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     HandleTerminal()
 |