make initial PTY running WIP

This commit is contained in:
chrys 2018-03-23 23:09:24 +01:00
parent c884033928
commit 7f6c0ce592

View File

@ -3,27 +3,33 @@
# Fenrir TTY screen reader # Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributers. # By Chrys, Storm Dragon, and contributers.
#attrib:
#http://rampex.ihep.su/Linux/linux_howto/html/tutorials/mini/Colour-ls-6.html
#0 = black, 1 = blue, 2 = green, 3 = cyan, 4 = red, 5 = purple, 6 = brown/yellow, 7 = white.
#https://github.com/jwilk/vcsapeek/blob/master/linuxvt.py
#blink = 5 if attr & 1 else 0
#bold = 1 if attr & 16 else 0
import subprocess import os, struct, sys, pty, tty, termios, shlex, signal, select, pyte, time
import glob, os
import termios
import time
import select
import dbus
import fcntl
from array import array
from fcntl import ioctl
from struct import unpack_from, unpack, pack
from fenrirscreenreader.core import debug from fenrirscreenreader.core import debug
from fenrirscreenreader.core.eventData import fenrirEventType from fenrirscreenreader.core.eventData import fenrirEventType
from fenrirscreenreader.core.screenDriver import screenDriver from fenrirscreenreader.core.screenDriver import screenDriver
class Terminal:
def __init__(self, columns, lines, p_in):
self.screen = pyte.HistoryScreen(columns, lines)
self.screen.set_mode(pyte.modes.LNM)
self.screen.write_process_input = \
lambda data: p_in.write(data.encode())
self.stream = pyte.ByteStream()
self.stream.attach(self.screen)
def feed(self, data):
self.stream.feed(data)
def dump(self):
cursor = self.screen.cursor
lines = []
for y in self.screen.dirty:
line = self.screen.buffer[y]
data = [(char.data, char.reverse, char.fg, char.bg)
for char in (line[x] for x in range(self.screen.columns))]
lines.append((data))
self.screen.dirty.clear()
return {"cursor": (cursor.x, cursor.y), "lines": lines}
class driver(screenDriver): class driver(screenDriver):
def __init__(self): def __init__(self):
screenDriver.__init__(self) screenDriver.__init__(self)
@ -34,9 +40,9 @@ class driver(screenDriver):
self.hichar = None self.hichar = None
def initialize(self, environment): def initialize(self, environment):
self.env = environment self.env = environment
self.env['runtime']['processManager'].addCustomEventThread(self.updateWatchdog) self.env['runtime']['processManager'].addCustomEventThread(self.terminalEmulation)
def getCurrScreen(self): def getCurrScreen(self):
self.env['screen']['oldTTY'] = self.env['screen']['newTTY'] self.env['screen']['oldTTY'] = str(1)
self.env['screen']['newTTY'] = str(1) self.env['screen']['newTTY'] = str(1)
def injectTextToScreen(self, text, screen = None): def injectTextToScreen(self, text, screen = None):
@ -46,28 +52,110 @@ class driver(screenDriver):
self.env['screen']['autoIgnoreScreens'] = [] self.env['screen']['autoIgnoreScreens'] = []
self.env['general']['prevUser'] = 'chrys' self.env['general']['prevUser'] = 'chrys'
self.env['general']['currUser'] = 'chrys' self.env['general']['currUser'] = 'chrys'
def read_all(self,fd):
bytes = os.read(fd, 65536)
if bytes == b'':
raise EOFError
while self.has_more(fd):
data = os.read(fd, 65536)
if data == b'':
raise EOFError
bytes += data
return bytes
def has_more(self,fd):
r, w, e = select.select([fd], [], [], 0)
return (fd in r)
def open_terminal(self,command="bash", columns=80, lines=24):
p_pid, master_fd = pty.fork()
if p_pid == 0: # Child.
argv = shlex.split(command)
env = os.environ.copy()
env["TERM"] = 'vt100'
os.execvpe(argv[0], argv, env)
# File-like object for I/O with the child process aka command.
p_out = os.fdopen(master_fd, "w+b", 0)
return Terminal(columns, lines, p_out), p_pid, p_out
def terminalEmulation(self,active , eventQueue):
debug = False
running = True
try:
old_attr = termios.tcgetattr(sys.stdin)
tty.setraw(0)
terminal, p_pid, p_out = self.open_terminal()
std_out = os.fdopen(sys.stdout.fileno(), "w+b", 0)
while active:
r, w, x = select.select([sys.stdin, p_out],[],[],1)
# none
if r == []:
continue
# output
if p_out in r:
if debug:
print('pre p_out')
try:
msgBytes = self.read_all(p_out.fileno())
except (EOFError, OSError):
running = False
break
if debug:
print('after p_out read')
terminal.feed(msgBytes)
os.write(sys.stdout.fileno(), msgBytes)
if debug:
print('after p_out write')
eventQueue.put({"Type":fenrirEventType.ScreenUpdate,
"Data":self.createScreenEventData(terminal.dump())
})
#print(terminal.dump())
#self.createScreenEventData(terminal.dump())
if debug:
print('after p_out')
# input
if sys.stdin in r:
if debug:
print('pre stdin')
try:
msgBytes = self.read_all(sys.stdin.fileno())
except (EOFError, OSError):
running = False
break
terminal.feed(msgBytes)
os.write(p_out.fileno(), msgBytes)
if debug:
print('after stdin')
except Exception as e: # Process died?
print(e)
running = False
finally:
os.kill(p_pid, signal.SIGTERM)
p_out.close()
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_attr)
eventQueue.put({"Type":fenrirEventType.StopMainLoop,"Data":None})
sys.exit(0)
def updateWatchdog(self,active , eventQueue): def createScreenEventData(self, content):
pass
def createScreenEventData(self, screen, content, attribute):
self.updateCharMap(screen)
eventData = { eventData = {
'bytes': content, 'bytes': content,
'lines': int( content[0]), 'lines': int( 24),
'columns': int( content[1]), 'columns': int( 80),
'textCursor': 'textCursor':
{ {
'x': int( content[2]), 'x': int( content['cursor'][0]),
'y': int( content[3]) 'y': int( content['cursor'][1])
}, },
'screen': screen, 'screen': 1,
'text': '',
'attributes': None,
'screenUpdateTime': time.time(), 'screenUpdateTime': time.time(),
} }
#encText, encAttr =\ #encText, encAttr =\
# self.autoDecodeVCSA(content[4:], eventData['lines'], eventData['columns']) # self.autoDecodeVCSA(content[4:], eventData['lines'], eventData['columns'])
eventData['text'] = content #print(content['lines'][0])
eventData['attributes'] = attribute for line in content['lines']:
for e in line:
eventData['text'] += ''.join(e[0])
#print(eventData['text'])
#eventData['text'] = ''
return eventData.copy() return eventData.copy()
def getFenrirBGColor(self, attribute): def getFenrirBGColor(self, attribute):