fix validity check in genericspeech driver

This commit is contained in:
chrys 2018-03-12 19:48:24 +01:00
parent d76a613cfd
commit 927da494f8
2 changed files with 413 additions and 9 deletions

View File

@ -0,0 +1,379 @@
#!/bin/python
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributers.
#attrib:
#http://rampex.ihep.su/Linux/linux_howto/html/tutorials/mini/Colour-ls-6.html
#0 = black, 1 = blue, 2 = green, 3 = cyan, 4 = red, 5 = purple, 6 = brown/yellow, 7 = white.
#https://github.com/jwilk/vcsapeek/blob/master/linuxvt.py
#blink = 5 if attr & 1 else 0
#bold = 1 if attr & 16 else 0
import difflib
import re
import subprocess
import glob, os
import termios
import time
import select
import dbus
import fcntl
from array import array
import errno
import sys
from utils import screen_utils
from fcntl import ioctl
from struct import unpack_from, unpack, pack
from core import debug
from core.eventData import fenrirEventType
from core.screenDriver import screenDriver
class driver(screenDriver):
def __init__(self):
screenDriver.__init__(self)
self.vcsaDevicePath = '/dev/vcsa'
self.ListSessions = None
self.charmap = {}
self.bgColorNames = {0: _('black'), 1: _('blue'), 2: _('green'), 3: _('cyan'), 4: _('red'), 5: _('Magenta'), 6: _('brown/yellow'), 7: _('white')}
self.fgColorNames = {0: _('Black'), 1: _('Blue'), 2: _('Green'), 3: _('Cyan'), 4: _('Red'), 5: _('Magenta'), 6: _('brown/yellow'), 7: _('Light gray'), 8: _('Dark gray'), 9: _('Light blue'), 10: ('Light green'), 11: _('Light cyan'), 12: _('Light red'), 13: _('Light magenta'), 14: _('Light yellow'), 15: _('White')}
self.hichar = None
def initialize(self, environment):
self.env = environment
self.env['runtime']['processManager'].addCustomEventThread(self.updateWatchdog)
def getCurrScreen(self):
self.env['screen']['oldTTY'] = self.env['screen']['newTTY']
try:
currScreenFile = open('/sys/devices/virtual/tty/tty0/active','r')
self.env['screen']['newTTY'] = str(currScreenFile.read()[3:-1])
currScreenFile.close()
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(str(e),debug.debugLevel.ERROR)
def injectTextToScreen(self, text, screen = None):
useScreen = "/dev/tty" + self.env['screen']['newTTY']
if screen != None:
useScreen = screen
with open(useScreen, 'w') as fd:
for c in text:
fcntl.ioctl(fd, termios.TIOCSTI, c)
def getCurrApplication(self):
apps = []
try:
currScreen = self.env['screen']['newTTY']
apps = subprocess.Popen('ps -t tty' + currScreen + ' -o comm,tty,stat', shell=True, stdout=subprocess.PIPE).stdout.read().decode()[:-1].split('\n')
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(str(e),debug.debugLevel.ERROR)
return
try:
for i in apps:
i = i.upper()
i = i.split()
i[0] = i[0]
i[1] = i[1]
if '+' in i[2]:
if i[0] != '':
if not "GREP" == i[0] and \
not "SH" == i[0] and \
not "PS" == i[0]:
if "TTY"+currScreen in i[1]:
if self.env['screen']['newApplication'] != i[0]:
self.env['screen']['newApplication'] = i[0]
return
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(str(e),debug.debugLevel.ERROR)
def getSessionInformation(self):
try:
bus = dbus.SystemBus()
if not self.ListSessions:
obj = bus.get_object('org.freedesktop.login1', '/org/freedesktop/login1')
inf = dbus.Interface(obj, 'org.freedesktop.login1.Manager')
self.ListSessions = inf.get_dbus_method('ListSessions')
sessions = self.ListSessions()
self.env['screen']['autoIgnoreScreens'] = []
for session in sessions:
obj = bus.get_object('org.freedesktop.login1', session[4])
inf = dbus.Interface(obj, 'org.freedesktop.DBus.Properties')
sessionType = inf.Get('org.freedesktop.login1.Session', 'Type')
screen = str(inf.Get('org.freedesktop.login1.Session', 'VTNr'))
if screen == '':
screen = str(inf.Get('org.freedesktop.login1.Session', 'TTY'))
screen = screen[screen.upper().find('TTY') + 3:]
if screen == '':
self.env['runtime']['debug'].writeDebugOut('No TTY found for session:' + session[4],debug.debugLevel.ERROR)
return
if sessionType.upper() == 'X11':
self.env['screen']['autoIgnoreScreens'].append(screen)
if screen == self.env['screen']['newTTY'] :
if self.env['general']['currUser'] != session[2]:
self.env['general']['prevUser'] = self.env['general']['currUser']
self.env['general']['currUser'] = session[2]
except Exception as e:
self.env['runtime']['debug'].writeDebugOut('getSessionInformation: Maybe no LoginD:' + str(e),debug.debugLevel.ERROR)
self.env['screen']['autoIgnoreScreens'] = []
#self.env['runtime']['debug'].writeDebugOut('getSessionInformation:' + str(self.env['screen']['autoIgnoreScreens']) + ' ' + str(self.env['general']) ,debug.debugLevel.INFO)
def updateWatchdog(self,active , eventQueue):
try:
vcsa = {}
vcsaDevices = glob.glob('/dev/vcsa*')
for vcsaDev in vcsaDevices:
index = vcsaDev[9:]
vcsa[str(index)] = open(vcsaDev,'rb')
tty = open('/sys/devices/virtual/tty/tty0/active','r')
currScreen = str(tty.read()[3:-1])
oldScreen = currScreen
watchdog = select.epoll()
watchdog.register(vcsa[currScreen], select.POLLPRI | select.POLLERR)
watchdog.register(tty, select.POLLPRI | select.POLLERR)
while active.value == 1:
changes = watchdog.poll(2)
for change in changes:
fileno = change[0]
event = change[1]
if fileno == tty.fileno():
self.env['runtime']['debug'].writeDebugOut('ScreenChange',debug.debugLevel.INFO)
tty.seek(0)
currScreen = str(tty.read()[3:-1])
if currScreen != oldScreen:
try:
watchdog.unregister(vcsa[ oldScreen ])
except:
pass
try:
watchdog.register(vcsa[ currScreen ], select.POLLPRI | select.POLLERR)
except:
pass
oldScreen = currScreen
eventQueue.put({"Type":fenrirEventType.ScreenChanged,"Data":''})
try:
vcsa[currScreen].seek(0)
lastScreenContent = vcsa[currScreen].read()
except:
pass
else:
self.env['runtime']['debug'].writeDebugOut('ScreenUpdate',debug.debugLevel.INFO)
vcsa[currScreen].seek(0)
dirtyContent = vcsa[currScreen].read()
screenContent = b''
timeout = time.time()
while screenContent != dirtyContent:
screenContent = dirtyContent
if time.time() - timeout >= 0.4:
break
time.sleep(0.008)
vcsa[currScreen].seek(0)
dirtyContent = vcsa[currScreen].read()
eventQueue.put({"Type":fenrirEventType.ScreenUpdate,"Data":None})
except Exception as e:
self.env['runtime']['debug'].writeDebugOut('VCSA:updateWatchdog:' + str(e),debug.debugLevel.ERROR)
def updateCharMap(self, screen):
self.charmap = {}
try:
tty = open('/dev/tty' + screen, 'rb')
except Exception as e:
self.env['runtime']['debug'].writeDebugOut('VCSA:updateCharMap:' + str(e),debug.debugLevel.ERROR)
return
GIO_UNIMAP = 0x4B66
VT_GETHIFONTMASK = 0x560D
himask = array("H", (0,))
ioctl(tty, VT_GETHIFONTMASK, himask)
self.hichar, = unpack_from("@H", himask)
sz = 512
line = ''
while True:
try:
unipairs = array("H", [0]*(2*sz))
unimapdesc = array("B", pack("@HP", sz, unipairs.buffer_info()[0]))
ioctl(tty.fileno(), GIO_UNIMAP, unimapdesc)
break
except Exception as e:
self.env['runtime']['debug'].writeDebugOut('VCSA:updateCharMap:scaling up sz=' + str(sz) + ' ' + str(e),debug.debugLevel.WARNING)
sz *= 2
tty.close()
ncodes, = unpack_from("@H", unimapdesc)
utable = unpack_from("@%dH" % (2*ncodes), unipairs)
for u, b in zip(utable[::2], utable[1::2]):
if self.charmap.get(b) is None:
self.charmap[b] = chr(u)
def autoDecodeVCSA(self, allData, rows, cols):
allText = ''
allAttrib = []
i = 0
for y in range(rows):
lineText = ''
lineAttrib = []
for x in range(cols):
data = allData[i: i + 2]
i += 2
if data == b' \x07':
#attr = 7
#ink = 7
#paper = 0
#ch = ' '
lineAttrib.append((7,15,0,0,0,0)) # attribute, ink, paper, blink, bold, underline
lineText += ' '
continue
(sh,) = unpack("=H", data)
attr = (sh >> 8) & 0xFF
ch = sh & 0xFF
if self.hichar == 0x100:
attr >>= 1
ink = attr & 0x0F
paper = (attr>>4) & 0x0F
blink = 0
#if attr & 1:
# blink = 1
bold = 0
#if attr & 16:
# bold = 1
#if (ink != 7) or (paper != 0):
# print(ink,paper)
if sh & self.hichar:
ch |= 0x100
try:
lineText += self.charmap[ch]
except KeyError:
lineText += '?'
lineAttrib.append((attr,ink, paper,blink,bold,0)) # attribute, ink, paper, blink, bold, underline
allText += lineText + '\n'
allAttrib += lineAttrib
return str(allText), allAttrib
def getFenrirBGColor(self, attribute):
try:
return self.bgColorNames[attribute[2]]
except Exception as e:
print(e)
return ''
def getFenrirFGColor(self, attribute):
try:
return self.fgColorNames[attribute[1]]
except Exception as e:
print(e)
return ''
def getFenrirUnderline(self, attribute):
if attribute[5] == 1:
return _('underlined')
return ''
def getFenrirBold(self, attribute):
if attribute[4] == 1:
return _('bold')
return ''
def getFenrirBlink(self, attribute):
if attribute[3] == 1:
return _('blink')
return ''
def getFenrirFont(self, attribute):
return _('Default')
def getFenrirFontSize(self, attribute):
return _('Default')
def update(self, trigger='onUpdate'):
if trigger == 'onInput': # no need for an update on input for VCSA
return
newContentBytes = b''
try:
# read screen
vcsa = open(self.vcsaDevicePath + self.env['screen']['newTTY'],'rb',0)
newContentBytes = vcsa.read()
vcsa.close()
if len(newContentBytes) < 5:
return
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(str(e),debug.debugLevel.ERROR)
return
# set new "old" values
self.env['screen']['oldContentBytes'] = self.env['screen']['newContentBytes']
self.env['screen']['oldContentText'] = self.env['screen']['newContentText']
self.env['screen']['oldContentAttrib'] = self.env['screen']['newContentAttrib']
self.env['screen']['oldCursor'] = self.env['screen']['newCursor'].copy()
if self.env['screen']['newCursorAttrib']:
self.env['screen']['oldCursorAttrib'] = self.env['screen']['newCursorAttrib'].copy()
self.env['screen']['oldDelta'] = self.env['screen']['newDelta']
self.env['screen']['oldAttribDelta'] = self.env['screen']['newAttribDelta']
self.env['screen']['oldNegativeDelta'] = self.env['screen']['newNegativeDelta']
self.env['screen']['newContentBytes'] = newContentBytes
# get metadata like cursor or screensize
self.env['screen']['lines'] = int( self.env['screen']['newContentBytes'][0])
self.env['screen']['columns'] = int( self.env['screen']['newContentBytes'][1])
self.env['screen']['newCursor']['x'] = int( self.env['screen']['newContentBytes'][2])
self.env['screen']['newCursor']['y'] = int( self.env['screen']['newContentBytes'][3])
# analyze content
self.updateCharMap(str(self.env['screen']['newTTY']))
self.env['screen']['newContentText'], \
self.env['screen']['newContentAttrib'] =\
self.autoDecodeVCSA(self.env['screen']['newContentBytes'][4:], self.env['screen']['lines'], self.env['screen']['columns'])
if self.env['screen']['newTTY'] != self.env['screen']['oldTTY']:
self.env['screen']['oldContentBytes'] = b''
self.env['screen']['oldContentAttrib'] = None
self.env['screen']['oldContentText'] = ''
self.env['screen']['oldCursor']['x'] = 0
self.env['screen']['oldCursor']['y'] = 0
self.env['screen']['oldDelta'] = ''
self.env['screen']['oldAttribDelta'] = ''
self.env['screen']['oldCursorAttrib'] = None
self.env['screen']['newCursorAttrib'] = None
self.env['screen']['oldNegativeDelta'] = ''
# initialize current deltas
self.env['screen']['newNegativeDelta'] = ''
self.env['screen']['newDelta'] = ''
self.env['screen']['newAttribDelta'] = ''
# changes on the screen
oldScreenText = re.sub(' +',' ',self.env['runtime']['screenManager'].getWindowAreaInText(self.env['screen']['oldContentText']))
newScreenText = re.sub(' +',' ',self.env['runtime']['screenManager'].getWindowAreaInText(self.env['screen']['newContentText']))
typing = False
diffList = []
if (self.env['screen']['oldContentText'] != self.env['screen']['newContentText']):
if self.env['screen']['newContentText'] != '' and self.env['screen']['oldContentText'] == '':
if oldScreenText == '' and\
newScreenText != '':
self.env['screen']['newDelta'] = newScreenText
else:
cursorLineStart = self.env['screen']['newCursor']['y'] * self.env['screen']['columns'] + self.env['screen']['newCursor']['y']
cursorLineEnd = cursorLineStart + self.env['screen']['columns']
if abs(self.env['screen']['oldCursor']['x'] - self.env['screen']['newCursor']['x']) >= 1 and \
self.env['screen']['oldCursor']['y'] == self.env['screen']['newCursor']['y'] and \
self.env['screen']['newContentText'][:cursorLineStart] == self.env['screen']['oldContentText'][:cursorLineStart] and \
self.env['screen']['newContentText'][cursorLineEnd:] == self.env['screen']['oldContentText'][cursorLineEnd:]:
cursorLineStartOffset = cursorLineStart
cursorLineEndOffset = cursorLineEnd
#if cursorLineStart < cursorLineStart + self.env['screen']['newCursor']['x'] - 4:
# cursorLineStartOffset = cursorLineStart + self.env['screen']['newCursor']['x'] - 4
if cursorLineEnd > cursorLineStart + self.env['screen']['newCursor']['x'] + 3:
cursorLineEndOffset = cursorLineStart + self.env['screen']['newCursor']['x'] + 3
oldScreenText = self.env['screen']['oldContentText'][cursorLineStartOffset:cursorLineEndOffset]
# oldScreenText = re.sub(' +',' ',oldScreenText)
newScreenText = self.env['screen']['newContentText'][cursorLineStartOffset:cursorLineEndOffset]
#newScreenText = re.sub(' +',' ',newScreenText)
diff = difflib.ndiff(oldScreenText, newScreenText)
diffList = list(diff)
tempNewDelta = ''.join(x[2:] for x in diffList if x[0] == '+')
if tempNewDelta.strip() != '':
if tempNewDelta != ''.join(newScreenText[self.env['screen']['oldCursor']['x']:self.env['screen']['newCursor']['x']].rstrip()):
diffList = ['+ ' + self.env['screen']['newContentText'].split('\n')[self.env['screen']['newCursor']['y']]]
typing = True
else:
diff = difflib.ndiff( oldScreenText.split('\n'),\
newScreenText.split('\n'))
diffList = list(diff)
if self.env['runtime']['settingsManager'].getSetting('general', 'newLinePause') and not typing:
self.env['screen']['newDelta'] = '\n'.join(x[2:] for x in diffList if x[0] == '+')
else:
self.env['screen']['newDelta'] = ''.join(x[2:] for x in diffList if x[0] == '+')
self.env['screen']['newNegativeDelta'] = ''.join(x[2:] for x in diffList if x[0] == '-')
# track highlighted
if self.env['screen']['oldContentAttrib'] != self.env['screen']['newContentAttrib']:
if self.env['runtime']['settingsManager'].getSettingAsBool('focus', 'highlight'):
self.env['screen']['newAttribDelta'], self.env['screen']['newCursorAttrib'] = screen_utils.trackHighlights(self.env['screen']['oldContentAttrib'], self.env['screen']['newContentAttrib'], self.env['screen']['newContentText'], self.env['screen']['columns'])

View File

@ -131,15 +131,40 @@ class driver(speechDriver):
return
elif not isinstance(utterance, dict):
continue
for key in ['volume','module','language','voice','pitch','rate','text']:
if not key in utterance:
utterance[key] = ''
if not isinstance(utterance[key],str):
utterance[key] = ''
if key == 'text':
if utterance[key] == '':
continue
# no text means nothing to speak
if not 'text' in utterance:
continue
if not isinstance(utterance['text'],str):
continue
if utterance['text'] == '':
continue
# check for valid data fields
if not isinstance(utterance['volume'],str):
utterance['volume'] = ''
if not 'volume' in utterance:
utterance['volume'] = ''
if not isinstance(utterance['volume'],str):
utterance['volume'] = ''
if not 'module' in utterance:
utterance['module'] = ''
if not isinstance(utterance['module'],str):
utterance['module'] = ''
if not 'language' in utterance:
utterance['language'] = ''
if not isinstance(utterance['language'],str):
utterance['language'] = ''
if not 'voice' in utterance:
utterance['voice'] = ''
if not isinstance(utterance['voice'],str):
utterance['voice'] = ''
if not 'pitch' in utterance:
utterance['pitch'] = ''
if not isinstance(utterance['pitch'],str):
utterance['pitch'] = ''
if not 'rate' in utterance:
utterance['rate'] = ''
if not isinstance(utterance['rate'],str):
utterance['rate'] = ''
popenSpeechCommand = shlex.split(self.speechCommand)
for idx, word in enumerate(popenSpeechCommand):