Most of the pep8 changes finished. Be careful, things may be horribly broken.

This commit is contained in:
Storm Dragon
2025-07-03 13:22:00 -04:00
parent 7408951152
commit 21bb9c6083
344 changed files with 6374 additions and 6083 deletions

View File

@ -5,7 +5,7 @@
# By Chrys, Storm Dragon, and contributers.
from fenrirscreenreader.core import debug
from fenrirscreenreader.core.screenDriver import screenDriver
from fenrirscreenreader.core.screenDriver import ScreenDriver as screenDriver
class driver(screenDriver):

View File

@ -18,22 +18,22 @@ import fcntl
import getpass
from select import select
from fenrirscreenreader.core import debug
from fenrirscreenreader.core.eventData import fenrirEventType
from fenrirscreenreader.core.screenDriver import screenDriver
from fenrirscreenreader.core.eventData import FenrirEventType
from fenrirscreenreader.core.screenDriver import ScreenDriver as screenDriver
from fenrirscreenreader.utils import screen_utils
class fenrirScreen(pyte.Screen):
class FenrirScreen(pyte.Screen):
def set_margins(self, *args, **kwargs):
kwargs.pop("private", None)
super(fenrirScreen, self).set_margins(*args, **kwargs)
super(FenrirScreen, self).set_margins(*args, **kwargs)
class Terminal:
def __init__(self, columns, lines, p_in):
self.text = ''
self.attributes = None
self.screen = fenrirScreen(columns, lines)
self.screen = FenrirScreen(columns, lines)
self.screen.write_process_input = \
lambda data: p_in.write(data.encode())
self.stream = pyte.ByteStream()
@ -42,7 +42,7 @@ class Terminal:
def feed(self, data):
self.stream.feed(data)
def updateAttributes(self, initialize=False):
def update_attributes(self, initialize=False):
buffer = self.screen.buffer
lines = None
if not initialize:
@ -58,7 +58,7 @@ class Terminal:
# Terminal class doesn't have access to env, use fallback
# logging
print(
f'ptyDriver Terminal updateAttributes: Error accessing attributes: {e}')
f'ptyDriver Terminal update_attributes: Error accessing attributes: {e}')
self.attributes.append([])
self.attributes[y] = [list(
@ -78,25 +78,25 @@ class Terminal:
def resize(self, lines, columns):
self.screen.resize(lines, columns)
self.setCursor()
self.updateAttributes(True)
self.set_cursor()
self.update_attributes(True)
def setCursor(self, x=-1, y=-1):
xPos = x
yPos = y
if xPos == -1:
xPos = self.screen.cursor.x
if yPos == -1:
yPos = self.screen.cursor.y
def set_cursor(self, x=-1, y=-1):
x_pos = x
y_pos = y
if x_pos == -1:
x_pos = self.screen.cursor.x
if y_pos == -1:
y_pos = self.screen.cursor.y
self.screen.cursor.x = min(
self.screen.cursor.x,
self.screen.columns - 1)
self.screen.cursor.y = min(self.screen.cursor.y, self.screen.lines - 1)
def GetScreenContent(self):
def get_screen_content(self):
cursor = self.screen.cursor
self.text = '\n'.join(self.screen.display)
self.updateAttributes(self.attributes is None)
self.update_attributes(self.attributes is None)
self.screen.dirty.clear()
return {"cursor": (cursor.x, cursor.y),
'lines': self.screen.lines,
@ -115,49 +115,49 @@ class driver(screenDriver):
self.p_out = None
self.terminal = None
self.p_pid = -1
signal.signal(signal.SIGWINCH, self.handleSigwinch)
signal.signal(signal.SIGWINCH, self.handle_sigwinch)
def initialize(self, environment):
self.env = environment
self.command = self.env['runtime']['settingsManager'].getSetting(
self.command = self.env['runtime']['SettingsManager'].get_setting(
'general', 'shell')
self.shortcutType = self.env['runtime']['inputManager'].getShortcutType(
self.shortcutType = self.env['runtime']['InputManager'].get_shortcut_type(
)
self.env['runtime']['processManager'].addCustomEventThread(
self.terminalEmulation)
self.env['runtime']['ProcessManager'].add_custom_event_thread(
self.terminal_emulation)
def getCurrScreen(self):
def get_curr_screen(self):
self.env['screen']['oldTTY'] = 'pty'
self.env['screen']['newTTY'] = 'pty'
def injectTextToScreen(self, msgBytes, screen=None):
def inject_text_to_screen(self, msg_bytes, screen=None):
if not screen:
screen = self.p_out.fileno()
if isinstance(msgBytes, str):
msgBytes = bytes(msgBytes, 'UTF-8')
os.write(screen, msgBytes)
if isinstance(msg_bytes, str):
msg_bytes = bytes(msg_bytes, 'UTF-8')
os.write(screen, msg_bytes)
def getSessionInformation(self):
def get_session_information(self):
self.env['screen']['autoIgnoreScreens'] = []
self.env['general']['prevUser'] = getpass.getuser()
self.env['general']['currUser'] = getpass.getuser()
self.env['general']['prev_user'] = getpass.getuser()
self.env['general']['curr_user'] = getpass.getuser()
def readAll(self, fd, timeout=0.3, interruptFd=None, len=65536):
msgBytes = b''
fdList = []
fdList += [fd]
def read_all(self, fd, timeout=0.3, interruptFd=None, len=65536):
msg_bytes = b''
fd_list = []
fd_list += [fd]
if interruptFd:
fdList += [interruptFd]
fd_list += [interruptFd]
starttime = time.time()
while True:
r = screen_utils.hasMoreWhat(fdList, 0.0001)
r = screen_utils.has_more_what(fd_list, 0.0001)
# nothing more to read
if fd not in r:
break
data = os.read(fd, len)
if data == b'':
raise EOFError
msgBytes += data
msg_bytes += data
# exit on interrupt available
if interruptFd in r:
break
@ -165,9 +165,9 @@ class driver(screenDriver):
# more is here
if (time.time() - starttime) >= timeout:
break
return msgBytes
return msg_bytes
def openTerminal(self, columns, lines, command):
def open_terminal(self, columns, lines, command):
p_pid, master_fd = pty.fork()
if p_pid == 0: # Child.
argv = shlex.split(command)
@ -187,96 +187,96 @@ class driver(screenDriver):
p_out = os.fdopen(master_fd, "w+b", 0)
return Terminal(columns, lines, p_out), p_pid, p_out
def resizeTerminal(self, fd):
def resize_terminal(self, fd):
s = struct.pack('HHHH', 0, 0, 0, 0)
s = fcntl.ioctl(0, termios.TIOCGWINSZ, s)
fcntl.ioctl(fd, termios.TIOCSWINSZ, s)
lines, columns, _, _ = struct.unpack('hhhh', s)
return lines, columns
def getTerminalSize(self, fd):
def get_terminal_size(self, fd):
s = struct.pack('HHHH', 0, 0, 0, 0)
lines, columns, _, _ = struct.unpack(
'HHHH', fcntl.ioctl(fd, termios.TIOCGWINSZ, s))
return lines, columns
def handleSigwinch(self, *args):
def handle_sigwinch(self, *args):
os.write(self.signalPipe[1], b'w')
def terminalEmulation(self, active, eventQueue):
def terminal_emulation(self, active, event_queue):
try:
old_attr = termios.tcgetattr(sys.stdin)
tty.setraw(0)
lines, columns = self.getTerminalSize(0)
lines, columns = self.get_terminal_size(0)
if self.command == '':
self.command = screen_utils.getShell()
self.terminal, self.p_pid, self.p_out = self.openTerminal(
self.command = screen_utils.get_shell()
self.terminal, self.p_pid, self.p_out = self.open_terminal(
columns, lines, self.command)
lines, columns = self.resizeTerminal(self.p_out)
lines, columns = self.resize_terminal(self.p_out)
self.terminal.resize(lines, columns)
fdList = [sys.stdin, self.p_out, self.signalPipe[0]]
fd_list = [sys.stdin, self.p_out, self.signalPipe[0]]
while active.value:
r, _, _ = select(fdList, [], [], 1)
r, _, _ = select(fd_list, [], [], 1)
# none
if r == []:
continue
# signals
if self.signalPipe[0] in r:
os.read(self.signalPipe[0], 1)
lines, columns = self.resizeTerminal(self.p_out)
lines, columns = self.resize_terminal(self.p_out)
self.terminal.resize(lines, columns)
# input
if sys.stdin in r:
try:
msgBytes = self.readAll(sys.stdin.fileno(), len=4096)
msg_bytes = self.read_all(sys.stdin.fileno(), len=4096)
except (EOFError, OSError):
eventQueue.put(
{"Type": fenrirEventType.StopMainLoop, "Data": None})
event_queue.put(
{"Type": FenrirEventType.stop_main_loop, "data": None})
break
if self.shortcutType == 'KEY':
try:
self.injectTextToScreen(msgBytes)
self.inject_text_to_screen(msg_bytes)
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(
self.env['runtime']['DebugManager'].write_debug_out(
'ptyDriver getInputData: Error injecting text to screen: ' + str(e),
debug.debugLevel.ERROR)
eventQueue.put(
{"Type": fenrirEventType.StopMainLoop, "Data": None})
debug.DebugLevel.ERROR)
event_queue.put(
{"Type": FenrirEventType.stop_main_loop, "data": None})
break
else:
eventQueue.put({"Type": fenrirEventType.ByteInput,
"Data": msgBytes})
event_queue.put({"Type": FenrirEventType.byte_input,
"data": msg_bytes})
# output
if self.p_out in r:
try:
msgBytes = self.readAll(
msg_bytes = self.read_all(
self.p_out.fileno(), interruptFd=sys.stdin.fileno())
except (EOFError, OSError):
eventQueue.put(
{"Type": fenrirEventType.StopMainLoop, "Data": None})
event_queue.put(
{"Type": FenrirEventType.stop_main_loop, "data": None})
break
# feed and send event bevore write, the pyte already has the right state
# so fenrir already can progress bevore os.write what
# should give some better reaction time
self.terminal.feed(msgBytes)
eventQueue.put(
self.terminal.feed(msg_bytes)
event_queue.put(
{
"Type": fenrirEventType.ScreenUpdate,
"Data": screen_utils.createScreenEventData(
self.terminal.GetScreenContent())})
self.injectTextToScreen(
msgBytes, screen=sys.stdout.fileno())
"Type": FenrirEventType.screen_update,
"data": screen_utils.create_screen_event_data(
self.terminal.get_screen_content())})
self.inject_text_to_screen(
msg_bytes, screen=sys.stdout.fileno())
except Exception as e: # Process died?
print(e)
eventQueue.put(
{"Type": fenrirEventType.StopMainLoop, "Data": None})
event_queue.put(
{"Type": FenrirEventType.stop_main_loop, "data": None})
finally:
os.kill(self.p_pid, signal.SIGTERM)
self.p_out.close()
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_attr)
eventQueue.put(
{"Type": fenrirEventType.StopMainLoop, "Data": None})
event_queue.put(
{"Type": FenrirEventType.stop_main_loop, "data": None})
sys.exit(0)
def getCurrApplication(self):
def get_curr_application(self):
pass

View File

@ -22,8 +22,8 @@ from array import array
from fcntl import ioctl
from struct import unpack_from, unpack, pack
from fenrirscreenreader.core import debug
from fenrirscreenreader.core.eventData import fenrirEventType
from fenrirscreenreader.core.screenDriver import screenDriver
from fenrirscreenreader.core.eventData import FenrirEventType
from fenrirscreenreader.core.screenDriver import ScreenDriver as screenDriver
from fenrirscreenreader.utils import screen_utils
@ -61,17 +61,17 @@ class driver(screenDriver):
15: 'white'}
self.hichar = None
try:
# set workaround for paste clipboard -> injectTextToScreen
# set workaround for paste clipboard -> inject_text_to_screen
subprocess.run(['sysctl', 'dev.tty.legacy_tiocsti=1'],
check=False, capture_output=True, timeout=5)
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver shutdown: Error running fgconsole: ' + str(e),
debug.debugLevel.ERROR)
debug.DebugLevel.ERROR)
def initialize(self, environment):
self.env = environment
self.env['runtime']['attributeManager'].appendDefaultAttributes([
self.env['runtime']['AttributeManager'].append_default_attributes([
self.fgColorValues[7], # fg
self.bgColorValues[0], # bg
False, # bold
@ -83,27 +83,27 @@ class driver(screenDriver):
'default', # fontsize
'default' # fontfamily
]) # end attribute )
self.env['runtime']['processManager'].addCustomEventThread(
self.updateWatchdog, multiprocess=True)
self.env['runtime']['ProcessManager'].add_custom_event_thread(
self.update_watchdog, multiprocess=True)
def getCurrScreen(self):
def get_curr_screen(self):
self.env['screen']['oldTTY'] = self.env['screen']['newTTY']
try:
with open('/sys/devices/virtual/tty/tty0/active', 'r') as currScreenFile:
self.env['screen']['newTTY'] = str(currScreenFile.read()[3:-1])
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(
str(e), debug.debugLevel.ERROR)
self.env['runtime']['DebugManager'].write_debug_out(
str(e), debug.DebugLevel.ERROR)
def injectTextToScreen(self, text, screen=None):
useScreen = "/dev/tty" + self.env['screen']['newTTY']
def inject_text_to_screen(self, text, screen=None):
use_screen = "/dev/tty" + self.env['screen']['newTTY']
if screen is not None:
useScreen = screen
with open(useScreen, 'w') as fd:
use_screen = screen
with open(use_screen, 'w') as fd:
for c in text:
fcntl.ioctl(fd, termios.TIOCSTI, c)
def getSessionInformation(self):
def get_session_information(self):
self.env['screen']['autoIgnoreScreens'] = []
try:
if not self.sysBus:
@ -118,7 +118,7 @@ class driver(screenDriver):
obj = self.sysBus.get_object(
'org.freedesktop.login1', session[4])
inf = dbus.Interface(obj, 'org.freedesktop.DBus.Properties')
sessionType = inf.Get('org.freedesktop.login1.Session', 'Type')
session_type = inf.Get('org.freedesktop.login1.Session', 'Type')
screen = str(inf.Get('org.freedesktop.login1.Session', 'VTNr'))
if screen == '':
screen = str(
@ -127,29 +127,29 @@ class driver(screenDriver):
'TTY'))
screen = screen[screen.upper().find('TTY') + 3:]
if screen == '':
self.env['runtime']['debug'].writeDebugOut(
'No TTY found for session:' + session[4], debug.debugLevel.ERROR)
self.env['runtime']['DebugManager'].write_debug_out(
'No TTY found for session:' + session[4], debug.DebugLevel.ERROR)
return
if sessionType.upper() != 'TTY':
if session_type.upper() != 'TTY':
self.env['screen']['autoIgnoreScreens'] += [screen]
if screen == self.env['screen']['newTTY']:
if self.env['general']['currUser'] != session[2]:
self.env['general']['prevUser'] = self.env['general']['currUser']
self.env['general']['currUser'] = session[2]
if self.env['general']['curr_user'] != session[2]:
self.env['general']['prev_user'] = self.env['general']['curr_user']
self.env['general']['curr_user'] = session[2]
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(
'getSessionInformation: Maybe no LoginD:' + str(e), debug.debugLevel.ERROR)
# self.env['runtime']['debug'].writeDebugOut('getSessionInformation:' + str(self.env['screen']['autoIgnoreScreens']) + ' ' + str(self.env['general']) ,debug.debugLevel.INFO)
self.env['runtime']['DebugManager'].write_debug_out(
'get_session_information: Maybe no LoginD:' + str(e), debug.DebugLevel.ERROR)
# self.env['runtime']['DebugManager'].write_debug_out('get_session_information:' + str(self.env['screen']['autoIgnoreScreens']) + ' ' + str(self.env['general']) ,debug.DebugLevel.INFO)
def readFile(self, file):
def read_file(self, file):
d = b''
file.seek(0)
try:
d = file.read()
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(
'vcsaDriver getScreenText: Error reading file: ' + str(e),
debug.debugLevel.ERROR)
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver get_screen_text: Error reading file: ' + str(e),
debug.DebugLevel.ERROR)
file.seek(0)
while True:
# Read from file
@ -161,40 +161,40 @@ class driver(screenDriver):
break
return d
def updateWatchdog(self, active, eventQueue):
def update_watchdog(self, active, event_queue):
vcsa = {}
vcsu = {}
tty = None
watchdog = None
try:
useVCSU = os.access('/dev/vcsu', os.R_OK)
vcsaDevices = glob.glob('/dev/vcsa*')
vcsuDevices = None
lastScreenContent = b''
use_vcsu = os.access('/dev/vcsu', os.R_OK)
vcsa_devices = glob.glob('/dev/vcsa*')
vcsu_devices = None
last_screen_content = b''
# Open TTY file with proper cleanup
tty = open('/sys/devices/virtual/tty/tty0/active', 'r')
currScreen = str(tty.read()[3:-1])
oldScreen = currScreen
curr_screen = str(tty.read()[3:-1])
old_screen = curr_screen
# Open VCSA devices with proper cleanup tracking
for vcsaDev in vcsaDevices:
for vcsaDev in vcsa_devices:
index = str(vcsaDev[9:])
vcsa[index] = open(vcsaDev, 'rb')
if index == currScreen:
lastScreenContent = self.readFile(vcsa[index])
if index == curr_screen:
last_screen_content = self.read_file(vcsa[index])
# Open VCSU devices if available
if useVCSU:
vcsuDevices = glob.glob('/dev/vcsu*')
for vcsuDev in vcsuDevices:
if use_vcsu:
vcsu_devices = glob.glob('/dev/vcsu*')
for vcsuDev in vcsu_devices:
index = str(vcsuDev[9:])
vcsu[index] = open(vcsuDev, 'rb')
self.updateCharMap(currScreen)
self.update_char_map(curr_screen)
watchdog = select.epoll()
watchdog.register(
vcsa[currScreen],
vcsa[curr_screen],
select.POLLPRI | select.POLLERR)
watchdog.register(tty, select.POLLPRI | select.POLLERR)
while active.value:
@ -203,86 +203,86 @@ class driver(screenDriver):
fileno = change[0]
event = change[1]
if fileno == tty.fileno():
self.env['runtime']['debug'].writeDebugOut(
'ScreenChange', debug.debugLevel.INFO)
self.env['runtime']['DebugManager'].write_debug_out(
'ScreenChange', debug.DebugLevel.INFO)
tty.seek(0)
currScreen = str(tty.read()[3:-1])
if currScreen != oldScreen:
curr_screen = str(tty.read()[3:-1])
if curr_screen != old_screen:
try:
watchdog.unregister(vcsa[oldScreen])
watchdog.unregister(vcsa[old_screen])
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(
'vcsaDriver updateWatchdog: Error unregistering watchdog: ' + str(e),
debug.debugLevel.ERROR)
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver update_watchdog: Error unregistering watchdog: ' + str(e),
debug.DebugLevel.ERROR)
try:
watchdog.register(
vcsa[currScreen], select.POLLPRI | select.POLLERR)
vcsa[curr_screen], select.POLLPRI | select.POLLERR)
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(
'vcsaDriver updateWatchdog: Error registering watchdog: ' + str(e),
debug.debugLevel.ERROR)
self.updateCharMap(currScreen)
oldScreen = currScreen
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver update_watchdog: Error registering watchdog: ' + str(e),
debug.DebugLevel.ERROR)
self.update_char_map(curr_screen)
old_screen = curr_screen
try:
vcsa[currScreen].seek(0)
lastScreenContent = self.readFile(
vcsa[currScreen])
vcsa[curr_screen].seek(0)
last_screen_content = self.read_file(
vcsa[curr_screen])
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(
'vcsaDriver updateWatchdog: Error reading screen content: ' + str(e),
debug.debugLevel.ERROR)
vcsuContent = None
if useVCSU:
vcsu[currScreen].seek(0)
vcsuContent = self.readFile(vcsu[currScreen])
eventQueue.put({"Type": fenrirEventType.ScreenChanged, "Data": self.createScreenEventData(
currScreen, lastScreenContent, vcsuContent)})
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver update_watchdog: Error reading screen content: ' + str(e),
debug.DebugLevel.ERROR)
vcsu_content = None
if use_vcsu:
vcsu[curr_screen].seek(0)
vcsu_content = self.read_file(vcsu[curr_screen])
event_queue.put({"Type": FenrirEventType.screen_changed, "data": self.create_screen_event_data(
curr_screen, last_screen_content, vcsu_content)})
else:
self.env['runtime']['debug'].writeDebugOut(
'ScreenUpdate', debug.debugLevel.INFO)
vcsa[currScreen].seek(0)
self.env['runtime']['DebugManager'].write_debug_out(
'screen_update', debug.DebugLevel.INFO)
vcsa[curr_screen].seek(0)
time.sleep(0.01)
dirtyContent = self.readFile(vcsa[currScreen])
screenContent = dirtyContent
vcsuContent = None
dirty_content = self.read_file(vcsa[curr_screen])
screen_content = dirty_content
vcsu_content = None
timeout = time.time()
# error case
if screenContent == b'':
if screen_content == b'':
continue
if lastScreenContent == b'':
lastScreenContent = screenContent
if (abs(int(screenContent[2]) - int(lastScreenContent[2])) in [1, 2]) and (
int(screenContent[3]) == int(lastScreenContent[3])):
if last_screen_content == b'':
last_screen_content = screen_content
if (abs(int(screen_content[2]) - int(last_screen_content[2])) in [1, 2]) and (
int(screen_content[3]) == int(last_screen_content[3])):
# Skip X Movement
pass
elif (abs(int(screenContent[3]) - int(lastScreenContent[3])) in [1]) and \
(int(screenContent[2]) == int(lastScreenContent[2])):
elif (abs(int(screen_content[3]) - int(last_screen_content[3])) in [1]) and \
(int(screen_content[2]) == int(last_screen_content[2])):
# Skip Y Movement
pass
else:
# anything else? wait for completion
while True:
screenContent = dirtyContent
screen_content = dirty_content
time.sleep(0.02)
# r,_,_ = select.select([vcsa[currScreen]], [], [], 0.07)
# if not vcsa[currScreen] in r:
# r,_,_ = select.select([vcsa[curr_screen]], [], [], 0.07)
# if not vcsa[curr_screen] in r:
# break
vcsa[currScreen].seek(0)
dirtyContent = self.readFile(vcsa[currScreen])
if screenContent == dirtyContent:
vcsa[curr_screen].seek(0)
dirty_content = self.read_file(vcsa[curr_screen])
if screen_content == dirty_content:
break
if time.time() - timeout >= 0.1:
screenContent = dirtyContent
screen_content = dirty_content
break
if useVCSU:
vcsu[currScreen].seek(0)
vcsuContent = self.readFile(vcsu[currScreen])
lastScreenContent = screenContent
eventQueue.put({"Type": fenrirEventType.ScreenUpdate, "Data": self.createScreenEventData(
currScreen, screenContent, vcsuContent)})
if use_vcsu:
vcsu[curr_screen].seek(0)
vcsu_content = self.read_file(vcsu[curr_screen])
last_screen_content = screen_content
event_queue.put({"Type": FenrirEventType.screen_update, "data": self.create_screen_event_data(
curr_screen, screen_content, vcsu_content)})
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(
'VCSA:updateWatchdog:' + str(e), debug.debugLevel.ERROR)
self.env['runtime']['DebugManager'].write_debug_out(
'VCSA:update_watchdog:' + str(e), debug.DebugLevel.ERROR)
time.sleep(0.2)
finally:
# Clean up all file handles
@ -290,32 +290,32 @@ class driver(screenDriver):
if watchdog:
watchdog.close()
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(
'vcsaDriver updateWatchdog: Error closing watchdog: ' + str(e),
debug.debugLevel.ERROR)
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver update_watchdog: Error closing watchdog: ' + str(e),
debug.DebugLevel.ERROR)
try:
if tty:
tty.close()
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(
'vcsaDriver shutdown: Error closing TTY: ' + str(e), debug.debugLevel.ERROR)
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver shutdown: Error closing TTY: ' + str(e), debug.DebugLevel.ERROR)
for handle in vcsa.values():
try:
handle.close()
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver shutdown: Error closing VCSA handle: ' + str(e),
debug.debugLevel.ERROR)
debug.DebugLevel.ERROR)
for handle in vcsu.values():
try:
handle.close()
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver shutdown: Error closing VCSU handle: ' + str(e),
debug.debugLevel.ERROR)
debug.DebugLevel.ERROR)
def createScreenEventData(self, screen, vcsaContent, vcsuContent=None):
eventData = {
def create_screen_event_data(self, screen, vcsaContent, vcsu_content=None):
event_data = {
'bytes': vcsaContent,
'lines': int(vcsaContent[0]),
'columns': int(vcsaContent[1]),
@ -330,26 +330,26 @@ class driver(screenDriver):
'attributes': [],
}
try:
eventData['text'], eventData['attributes'] = self.autoDecodeVCSA(
vcsaContent[4:], eventData['lines'], eventData['columns'])
event_data['text'], event_data['attributes'] = self.auto_decode_vcsa(
vcsaContent[4:], event_data['lines'], event_data['columns'])
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(
'vcsaDriver createScreenEventData: Error decoding VCSA content: ' + str(e),
debug.debugLevel.ERROR)
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver create_screen_event_data: Error decoding VCSA content: ' + str(e),
debug.DebugLevel.ERROR)
# VCSU seems to give b' ' instead of b'\x00\x00\x00' (tsp),
# deactivated until its fixed
if vcsuContent is not None:
if vcsu_content is not None:
try:
vcsuContentAsText = vcsuContent.decode('UTF-32')
eventData['text'] = screen_utils.insertNewlines(
vcsuContentAsText, eventData['columns'])
vcsu_content_as_text = vcsu_content.decode('UTF-32')
event_data['text'] = screen_utils.insert_newlines(
vcsu_content_as_text, event_data['columns'])
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(
'vcsaDriver createScreenEventData: Error decoding VCSU content: ' + str(e),
debug.debugLevel.ERROR)
return eventData.copy()
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver create_screen_event_data: Error decoding VCSU content: ' + str(e),
debug.DebugLevel.ERROR)
return event_data.copy()
def updateCharMap(self, screen):
def update_char_map(self, screen):
self.charmap = {}
try:
with open('/dev/tty' + screen, 'rb') as tty:
@ -369,13 +369,13 @@ class driver(screenDriver):
ioctl(tty.fileno(), GIO_UNIMAP, unimapdesc)
break
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(
'VCSA:updateCharMap:scaling up sz=' + str(sz) + ' ' + str(e),
debug.debugLevel.WARNING)
self.env['runtime']['DebugManager'].write_debug_out(
'VCSA:update_char_map:scaling up sz=' + str(sz) + ' ' + str(e),
debug.DebugLevel.WARNING)
sz *= 2
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(
'VCSA:updateCharMap:' + str(e), debug.debugLevel.ERROR)
self.env['runtime']['DebugManager'].write_debug_out(
'VCSA:update_char_map:' + str(e), debug.DebugLevel.ERROR)
return
ncodes, = unpack_from("@H", unimapdesc)
utable = unpack_from("@%dH" % (2 * ncodes), unipairs)
@ -383,13 +383,13 @@ class driver(screenDriver):
if self.charmap.get(b) is None:
self.charmap[b] = chr(u)
def autoDecodeVCSA(self, allData, rows, cols):
allText = ''
allAttrib = []
def auto_decode_vcsa(self, allData, rows, cols):
all_text = ''
all_attrib = []
i = 0
for y in range(rows):
lineText = ''
lineAttrib = []
line_text = ''
line_attrib = []
blink = 0
bold = 0
ink = 7
@ -402,7 +402,7 @@ class driver(screenDriver):
# ink = 7
# paper = 0
# ch = ' '
charAttrib = [
char_attrib = [
self.fgColorValues[7], # fg
self.bgColorValues[0], # bg
False, # bold
@ -413,8 +413,8 @@ class driver(screenDriver):
False, # blink
'default', # fontsize
'default'] # fontfamily
lineAttrib.append(charAttrib)
lineText += ' '
line_attrib.append(char_attrib)
line_text += ' '
continue
ch = None
try:
@ -425,9 +425,9 @@ class driver(screenDriver):
if sh & self.hichar:
ch |= 0x100
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(
'vcsaDriver autoDecodeVCSA: Error processing character: ' + str(e),
debug.debugLevel.ERROR)
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver auto_decode_vcsa: Error processing character: ' + str(e),
debug.DebugLevel.ERROR)
ch = None
if self.hichar == 0x100:
attr >>= 1
@ -443,15 +443,15 @@ class driver(screenDriver):
# if (ink != 7) or (paper != 0):
# print(ink,paper)
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(
'vcsaDriver autoDecodeVCSA: Error processing attributes: ' + str(e),
debug.debugLevel.ERROR)
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver auto_decode_vcsa: Error processing attributes: ' + str(e),
debug.DebugLevel.ERROR)
try:
lineText += self.charmap[ch]
line_text += self.charmap[ch]
except KeyError:
lineText += '?'
line_text += '?'
charAttrib = [
char_attrib = [
self.fgColorValues[ink],
self.bgColorValues[paper],
bold == 1, # bold
@ -462,28 +462,28 @@ class driver(screenDriver):
blink == 1, # blink
'default', # fontsize
'default'] # fontfamily
lineAttrib.append(charAttrib)
allText += lineText
line_attrib.append(char_attrib)
all_text += line_text
if y + 1 < rows:
allText += '\n'
allAttrib.append(lineAttrib)
return str(allText), allAttrib
all_text += '\n'
all_attrib.append(line_attrib)
return str(all_text), all_attrib
def getCurrApplication(self):
def get_curr_application(self):
apps = []
try:
currScreen = self.env['screen']['newTTY']
curr_screen = self.env['screen']['newTTY']
apps = subprocess.Popen(
'ps -t tty' +
currScreen +
curr_screen +
' -o comm,tty,stat',
shell=True,
stdout=subprocess.PIPE).stdout.read().decode()[
:-
1].split('\n')
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(
str(e), debug.debugLevel.ERROR)
self.env['runtime']['DebugManager'].write_debug_out(
str(e), debug.DebugLevel.ERROR)
return
try:
for i in apps:
@ -496,10 +496,10 @@ class driver(screenDriver):
if not "GREP" == i[0] and \
not "SH" == i[0] and \
not "PS" == i[0]:
if "TTY" + currScreen in i[1]:
if self.env['screen']['newApplication'] != i[0]:
self.env['screen']['newApplication'] = i[0]
if "TTY" + curr_screen in i[1]:
if self.env['screen']['new_application'] != i[0]:
self.env['screen']['new_application'] = i[0]
return
except Exception as e:
self.env['runtime']['debug'].writeDebugOut(
str(e), debug.debugLevel.ERROR)
self.env['runtime']['DebugManager'].write_debug_out(
str(e), debug.DebugLevel.ERROR)