improve code

This commit is contained in:
Chrys 2019-08-13 09:41:16 +02:00
parent 52adc37434
commit 8ea6e37537

View File

@ -1,216 +1,215 @@
#!/bin/python
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributers.
import os, struct, sys, pty, tty, termios, shlex, signal, select, pyte, time, fcntl ,getpass
from fenrirscreenreader.core import debug
from fenrirscreenreader.core.eventData import fenrirEventType
from fenrirscreenreader.core.screenDriver import screenDriver
from fenrirscreenreader.utils import screen_utils
#!/bin/python
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributers.
import os, struct, sys, pty, tty, termios, shlex, signal, select, pyte, time, fcntl ,getpass
from fenrirscreenreader.core import debug
from fenrirscreenreader.core.eventData import fenrirEventType
from fenrirscreenreader.core.screenDriver import screenDriver
from fenrirscreenreader.utils import screen_utils
class fenrirScreen(pyte.HistoryScreen):
def set_margins(self, *args, **kwargs):
kwargs.pop("private", None)
return super(fenrirScreen, self).set_margins(*args, **kwargs)
class Terminal:
def __init__(self, columns, lines, p_in):
self.text = ''
self.attributes = None
self.screen = fenrirScreen(columns, lines)
self.screen.set_mode(pyte.modes.LNM)
self.screen.write_process_input = \
lambda data: p_in.write(data.encode())
self.stream = pyte.ByteStream()
self.stream.attach(self.screen)
def feed(self, data):
self.stream.feed(data)
def updateAttributes(self, initialize = False):
buffer = self.screen.buffer
lines = None
if not initialize:
lines = self.screen.dirty
else:
lines = range(self.screen.lines)
self.attributes = [[list(attribute[1:]) + [False, 'default', 'default'] for attribute in line.values()] for line in buffer.values()]
for y in lines:
try:
t = self.attributes[y]
except:
self.attributes.append([])
self.attributes[y] = [list(attribute[1:]) + [False, 'default', 'default'] for attribute in (buffer[y].values())]
if len(self.attributes[y]) < self.screen.columns:
diff = self.screen.columns - len(self.attributes[y])
self.attributes[y] += [['default', 'default', False, False, False, False, False, False, 'default', 'default']] * diff
def resize(self, lines, columns):
self.screen.resize(lines, columns)
self.setCursor()
self.updateAttributes(True)
def setCursor(self, x = -1, y = -1):
xPos = x
yPos = y
if xPos == -1:
xPos = self.screen.cursor.x
if yPos == -1:
yPos = self.screen.cursor.y
self.screen.cursor.x = min(self.screen.cursor.x, self.screen.columns - 1)
self.screen.cursor.y = min(self.screen.cursor.y, self.screen.lines - 1)
def GetScreenContent(self):
cursor = self.screen.cursor
self.text = '\n'.join(self.screen.display)
self.updateAttributes(self.attributes == None)
self.screen.dirty.clear()
return {"cursor": (cursor.x, cursor.y),
'lines': self.screen.lines,
'columns': self.screen.columns,
"text": self.text,
'attributes': self.attributes.copy(),
'screen': 'pty',
'screenUpdateTime': time.time(),
}.copy()
class driver(screenDriver):
def __init__(self):
screenDriver.__init__(self)
self.signalPipe = os.pipe()
self.p_out = None
signal.signal(signal.SIGWINCH, self.handleSigwinch)
def initialize(self, environment):
self.env = environment
self.command = self.env['runtime']['settingsManager'].getSetting('general','shell')
self.shortcutType = self.env['runtime']['inputManager'].getShortcutType()
self.env['runtime']['processManager'].addCustomEventThread(self.terminalEmulation)
def getCurrScreen(self):
self.env['screen']['oldTTY'] = 'pty'
self.env['screen']['newTTY'] = 'pty'
def injectTextToScreen(self, msgBytes, screen = None):
if not screen:
screen = self.p_out.fileno()
if isinstance(msgBytes, str):
msgBytes = bytes(msgBytes, 'UTF-8')
os.write(screen, msgBytes)
def getSessionInformation(self):
self.env['screen']['autoIgnoreScreens'] = []
self.env['general']['prevUser'] = getpass.getuser()
self.env['general']['currUser'] = getpass.getuser()
def readAll(self, fd, timeout = 9999999, interruptFd = None, len = 2048):
bytes = b''
fdList = []
fdList += [fd]
if interruptFd:
fdList += [interruptFd]
starttime = time.time()
while True:
# respect timeout but wait a little bit of time to see if something more is here
if (time.time() - starttime) >= timeout:
break
r = screen_utils.hasMoreWhat(fdList,0)
hasmore = fd in r
if not hasmore:
break
# exit on interrupt available
if interruptFd in r:
break
data = os.read(fd, len)
if data == b'':
raise EOFError
bytes += data
return bytes
def openTerminal(self, columns, lines, command):
p_pid, master_fd = pty.fork()
if p_pid == 0: # Child.
argv = shlex.split(command)
env = os.environ.copy()
#values are VT100,xterm-256color,linux
try:
eterm = env["TERM"]
if eterm == '':
env["TERM"] = 'linux'
except:
env["TERM"] = 'linux'
os.execvpe(argv[0], argv, env)
# File-like object for I/O with the child process aka command.
p_out = os.fdopen(master_fd, "w+b", 0)
return Terminal(columns, lines, p_out), p_pid, p_out
def resizeTerminal(self,fd):
s = struct.pack('HHHH', 0, 0, 0, 0)
s = fcntl.ioctl(0, termios.TIOCGWINSZ, s)
fcntl.ioctl(fd, termios.TIOCSWINSZ, s)
lines, columns, _, _ = struct.unpack('hhhh', s)
return lines, columns
def getTerminalSize(self, fd):
s = struct.pack('HHHH', 0, 0, 0, 0)
lines, columns, _, _ = struct.unpack('HHHH', fcntl.ioctl(fd, termios.TIOCGWINSZ, s))
return lines, columns
def handleSigwinch(self, *args):
os.write(self.signalPipe[1], b'w')
def terminalEmulation(self,active , eventQueue):
try:
old_attr = termios.tcgetattr(sys.stdin)
tty.setraw(0)
lines, columns = self.getTerminalSize(0)
if self.command == '':
self.command = screen_utils.getShell()
terminal, p_pid, self.p_out = self.openTerminal(columns, lines, self.command)
lines, columns = self.resizeTerminal(self.p_out)
terminal.resize(lines, columns)
fdList = [sys.stdin, self.p_out, self.signalPipe[0]]
while active.value:
r, _, _ = select.select(fdList, [], [], 1)
# none
if r == []:
continue
# signals
if self.signalPipe[0] in r:
os.read(self.signalPipe[0], 1)
lines, columns = self.resizeTerminal(self.p_out)
terminal.resize(lines, columns)
# input
if sys.stdin in r:
try:
msgBytes = self.readAll(sys.stdin.fileno())
except (EOFError, OSError):
active.value = False
break
if self.shortcutType == 'KEY':
try:
self.injectTextToScreen(msgBytes)
except:
active.value = False
break
else:
eventQueue.put({"Type":fenrirEventType.ByteInput,
"Data":msgBytes })
# output
if self.p_out in r:
try:
msgBytes = self.readAll(self.p_out.fileno(), timeout=0.001, interruptFd=sys.stdin)
except (EOFError, OSError):
active.value = False
break
terminal.feed(msgBytes)
os.write(sys.stdout.fileno(), msgBytes)
eventQueue.put({"Type":fenrirEventType.ScreenUpdate,
"Data":screen_utils.createScreenEventData(terminal.GetScreenContent())
})
except Exception as e: # Process died?
print(e)
active.value = False
finally:
os.kill(p_pid, signal.SIGTERM)
self.p_out.close()
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_attr)
eventQueue.put({"Type":fenrirEventType.StopMainLoop,"Data":None})
sys.exit(0)
def getCurrApplication(self):
pass
return super(fenrirScreen, self).set_margins(*args, **kwargs)
class Terminal:
def __init__(self, columns, lines, p_in):
self.text = ''
self.attributes = None
self.screen = fenrirScreen(columns, lines)
self.screen.set_mode(pyte.modes.LNM)
self.screen.write_process_input = \
lambda data: p_in.write(data.encode())
self.stream = pyte.ByteStream()
self.stream.attach(self.screen)
def feed(self, data):
self.stream.feed(data)
def updateAttributes(self, initialize = False):
buffer = self.screen.buffer
lines = None
if not initialize:
lines = self.screen.dirty
else:
lines = range(self.screen.lines)
self.attributes = [[list(attribute[1:]) + [False, 'default', 'default'] for attribute in line.values()] for line in buffer.values()]
for y in lines:
try:
t = self.attributes[y]
except:
self.attributes.append([])
self.attributes[y] = [list(attribute[1:]) + [False, 'default', 'default'] for attribute in (buffer[y].values())]
if len(self.attributes[y]) < self.screen.columns:
diff = self.screen.columns - len(self.attributes[y])
self.attributes[y] += [['default', 'default', False, False, False, False, False, False, 'default', 'default']] * diff
def resize(self, lines, columns):
self.screen.resize(lines, columns)
self.setCursor()
self.updateAttributes(True)
def setCursor(self, x = -1, y = -1):
xPos = x
yPos = y
if xPos == -1:
xPos = self.screen.cursor.x
if yPos == -1:
yPos = self.screen.cursor.y
self.screen.cursor.x = min(self.screen.cursor.x, self.screen.columns - 1)
self.screen.cursor.y = min(self.screen.cursor.y, self.screen.lines - 1)
def GetScreenContent(self):
cursor = self.screen.cursor
self.text = '\n'.join(self.screen.display)
self.updateAttributes(self.attributes == None)
self.screen.dirty.clear()
return {"cursor": (cursor.x, cursor.y),
'lines': self.screen.lines,
'columns': self.screen.columns,
"text": self.text,
'attributes': self.attributes.copy(),
'screen': 'pty',
'screenUpdateTime': time.time(),
}.copy()
class driver(screenDriver):
def __init__(self):
screenDriver.__init__(self)
self.signalPipe = os.pipe()
self.p_out = None
signal.signal(signal.SIGWINCH, self.handleSigwinch)
def initialize(self, environment):
self.env = environment
self.command = self.env['runtime']['settingsManager'].getSetting('general','shell')
self.shortcutType = self.env['runtime']['inputManager'].getShortcutType()
self.env['runtime']['processManager'].addCustomEventThread(self.terminalEmulation)
def getCurrScreen(self):
self.env['screen']['oldTTY'] = 'pty'
self.env['screen']['newTTY'] = 'pty'
def injectTextToScreen(self, msgBytes, screen = None):
if not screen:
screen = self.p_out.fileno()
if isinstance(msgBytes, str):
msgBytes = bytes(msgBytes, 'UTF-8')
os.write(screen, msgBytes)
def getSessionInformation(self):
self.env['screen']['autoIgnoreScreens'] = []
self.env['general']['prevUser'] = getpass.getuser()
self.env['general']['currUser'] = getpass.getuser()
def readAll(self, fd, timeout = 9999999, interruptFd = None, len = 2048):
bytes = b''
fdList = []
fdList += [fd]
if interruptFd:
fdList += [interruptFd]
starttime = time.time()
while True:
# respect timeout but wait a little bit of time to see if something more is here
if (time.time() - starttime) >= timeout:
break
r = screen_utils.hasMoreWhat(fdList,0)
hasmore = fd in r
if not hasmore:
break
# exit on interrupt available
if interruptFd in r:
break
data = os.read(fd, len)
if data == b'':
raise EOFError
bytes += data
return bytes
def openTerminal(self, columns, lines, command):
p_pid, master_fd = pty.fork()
if p_pid == 0: # Child.
argv = shlex.split(command)
env = os.environ.copy()
#values are VT100,xterm-256color,linux
try:
if env["TERM"] == '':
env["TERM"] = 'linux'
except:
env["TERM"] = 'linux'
os.execvpe(argv[0], argv, env)
# File-like object for I/O with the child process aka command.
p_out = os.fdopen(master_fd, "w+b", 0)
return Terminal(columns, lines, p_out), p_pid, p_out
def resizeTerminal(self,fd):
s = struct.pack('HHHH', 0, 0, 0, 0)
s = fcntl.ioctl(0, termios.TIOCGWINSZ, s)
fcntl.ioctl(fd, termios.TIOCSWINSZ, s)
lines, columns, _, _ = struct.unpack('hhhh', s)
return lines, columns
def getTerminalSize(self, fd):
s = struct.pack('HHHH', 0, 0, 0, 0)
lines, columns, _, _ = struct.unpack('HHHH', fcntl.ioctl(fd, termios.TIOCGWINSZ, s))
return lines, columns
def handleSigwinch(self, *args):
os.write(self.signalPipe[1], b'w')
def terminalEmulation(self,active , eventQueue):
try:
old_attr = termios.tcgetattr(sys.stdin)
tty.setraw(0)
lines, columns = self.getTerminalSize(0)
if self.command == '':
self.command = screen_utils.getShell()
terminal, p_pid, self.p_out = self.openTerminal(columns, lines, self.command)
lines, columns = self.resizeTerminal(self.p_out)
terminal.resize(lines, columns)
fdList = [sys.stdin, self.p_out, self.signalPipe[0]]
while active.value:
r, _, _ = select.select(fdList, [], [], 1)
# none
if r == []:
continue
# signals
if self.signalPipe[0] in r:
os.read(self.signalPipe[0], 1)
lines, columns = self.resizeTerminal(self.p_out)
terminal.resize(lines, columns)
# input
if sys.stdin in r:
try:
msgBytes = self.readAll(sys.stdin.fileno())
except (EOFError, OSError):
active.value = False
break
if self.shortcutType == 'KEY':
try:
self.injectTextToScreen(msgBytes)
except:
active.value = False
break
else:
eventQueue.put({"Type":fenrirEventType.ByteInput,
"Data":msgBytes })
# output
if self.p_out in r:
try:
msgBytes = self.readAll(self.p_out.fileno(), timeout=0.001, interruptFd=sys.stdin)
except (EOFError, OSError):
active.value = False
break
terminal.feed(msgBytes)
os.write(sys.stdout.fileno(), msgBytes)
eventQueue.put({"Type":fenrirEventType.ScreenUpdate,
"Data":screen_utils.createScreenEventData(terminal.GetScreenContent())
})
except Exception as e: # Process died?
print(e)
active.value = False
finally:
os.kill(p_pid, signal.SIGTERM)
self.p_out.close()
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_attr)
eventQueue.put({"Type":fenrirEventType.StopMainLoop,"Data":None})
sys.exit(0)
def getCurrApplication(self):
pass