More pep8 fixes. A tiny bit of refactoring.

This commit is contained in:
Storm Dragon
2025-07-07 00:42:23 -04:00
parent d28c18faed
commit 3390c25dfe
343 changed files with 11092 additions and 7582 deletions

View File

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributers.
# By Chrys, Storm Dragon, and contributors.
from fenrirscreenreader.core import debug
from fenrirscreenreader.core.screenDriver import ScreenDriver as screenDriver

View File

@ -2,21 +2,23 @@
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributers.
# By Chrys, Storm Dragon, and contributors.
import os
import struct
import sys
import pty
import tty
import termios
import shlex
import signal
import pyte
import time
import fcntl
import getpass
import os
import pty
import shlex
import signal
import struct
import sys
import termios
import time
import tty
from select import select
import pyte
from fenrirscreenreader.core import debug
from fenrirscreenreader.core.eventData import FenrirEventType
from fenrirscreenreader.core.screenDriver import ScreenDriver as screenDriver
@ -31,11 +33,12 @@ class FenrirScreen(pyte.Screen):
class Terminal:
def __init__(self, columns, lines, p_in):
self.text = ''
self.text = ""
self.attributes = None
self.screen = FenrirScreen(columns, lines)
self.screen.write_process_input = \
lambda data: p_in.write(data.encode())
self.screen.write_process_input = lambda data: p_in.write(
data.encode()
)
self.stream = pyte.ByteStream()
self.stream.attach(self.screen)
@ -49,8 +52,13 @@ class Terminal:
lines = self.screen.dirty
else:
lines = range(self.screen.lines)
self.attributes = [[list(attribute[1:]) + [False, 'default', 'default']
for attribute in line.values()] for line in buffer.values()]
self.attributes = [
[
list(attribute[1:]) + [False, "default", "default"]
for attribute in line.values()
]
for line in buffer.values()
]
for y in lines:
try:
t = self.attributes[y]
@ -58,23 +66,30 @@ class Terminal:
# Terminal class doesn't have access to env, use fallback
# logging
print(
f'ptyDriver Terminal update_attributes: Error accessing attributes: {e}')
f"ptyDriver Terminal update_attributes: Error accessing attributes: {e}"
)
self.attributes.append([])
self.attributes[y] = [list(
attribute[1:]) + [False, 'default', 'default'] for attribute in (buffer[y].values())]
self.attributes[y] = [
list(attribute[1:]) + [False, "default", "default"]
for attribute in (buffer[y].values())
]
if len(self.attributes[y]) < self.screen.columns:
diff = self.screen.columns - len(self.attributes[y])
self.attributes[y] += [['default',
'default',
False,
False,
False,
False,
False,
False,
'default',
'default']] * diff
self.attributes[y] += [
[
"default",
"default",
False,
False,
False,
False,
False,
False,
"default",
"default",
]
] * diff
def resize(self, lines, columns):
self.screen.resize(lines, columns)
@ -89,23 +104,24 @@ class Terminal:
if y_pos == -1:
y_pos = self.screen.cursor.y
self.screen.cursor.x = min(
self.screen.cursor.x,
self.screen.columns - 1)
self.screen.cursor.x, self.screen.columns - 1
)
self.screen.cursor.y = min(self.screen.cursor.y, self.screen.lines - 1)
def get_screen_content(self):
cursor = self.screen.cursor
self.text = '\n'.join(self.screen.display)
self.text = "\n".join(self.screen.display)
self.update_attributes(self.attributes is None)
self.screen.dirty.clear()
return {"cursor": (cursor.x, cursor.y),
'lines': self.screen.lines,
'columns': self.screen.columns,
"text": self.text,
'attributes': self.attributes.copy(),
'screen': 'pty',
'screenUpdateTime': time.time(),
}.copy()
return {
"cursor": (cursor.x, cursor.y),
"lines": self.screen.lines,
"columns": self.screen.columns,
"text": self.text,
"attributes": self.attributes.copy(),
"screen": "pty",
"screenUpdateTime": time.time(),
}.copy()
class driver(screenDriver):
@ -119,31 +135,34 @@ class driver(screenDriver):
def initialize(self, environment):
self.env = environment
self.command = self.env['runtime']['SettingsManager'].get_setting(
'general', 'shell')
self.shortcutType = self.env['runtime']['InputManager'].get_shortcut_type(
self.command = self.env["runtime"]["SettingsManager"].get_setting(
"general", "shell"
)
self.shortcutType = self.env["runtime"][
"InputManager"
].get_shortcut_type()
self.env["runtime"]["ProcessManager"].add_custom_event_thread(
self.terminal_emulation
)
self.env['runtime']['ProcessManager'].add_custom_event_thread(
self.terminal_emulation)
def get_curr_screen(self):
self.env['screen']['oldTTY'] = 'pty'
self.env['screen']['newTTY'] = 'pty'
self.env["screen"]["oldTTY"] = "pty"
self.env["screen"]["newTTY"] = "pty"
def inject_text_to_screen(self, msg_bytes, screen=None):
if not screen:
screen = self.p_out.fileno()
if isinstance(msg_bytes, str):
msg_bytes = bytes(msg_bytes, 'UTF-8')
msg_bytes = bytes(msg_bytes, "UTF-8")
os.write(screen, msg_bytes)
def get_session_information(self):
self.env['screen']['autoIgnoreScreens'] = []
self.env['general']['prev_user'] = getpass.getuser()
self.env['general']['curr_user'] = getpass.getuser()
self.env["screen"]["autoIgnoreScreens"] = []
self.env["general"]["prev_user"] = getpass.getuser()
self.env["general"]["curr_user"] = getpass.getuser()
def read_all(self, fd, timeout=0.3, interruptFd=None, len=65536):
msg_bytes = b''
msg_bytes = b""
fd_list = []
fd_list += [fd]
if interruptFd:
@ -155,7 +174,7 @@ class driver(screenDriver):
if fd not in r:
break
data = os.read(fd, len)
if data == b'':
if data == b"":
raise EOFError
msg_bytes += data
# exit on interrupt available
@ -174,44 +193,47 @@ class driver(screenDriver):
env = os.environ.copy()
# values are VT100,xterm-256color,linux
try:
if env["TERM"] == '':
env["TERM"] = 'linux'
if env["TERM"] == "":
env["TERM"] = "linux"
except Exception as e:
# Child process doesn't have access to env, use fallback
# logging
print(
f'ptyDriver spawnTerminal: Error checking TERM environment: {e}')
env["TERM"] = 'linux'
f"ptyDriver spawnTerminal: Error checking TERM environment: {e}"
)
env["TERM"] = "linux"
os.execvpe(argv[0], argv, env)
# File-like object for I/O with the child process aka command.
p_out = os.fdopen(master_fd, "w+b", 0)
return Terminal(columns, lines, p_out), p_pid, p_out
def resize_terminal(self, fd):
s = struct.pack('HHHH', 0, 0, 0, 0)
s = struct.pack("HHHH", 0, 0, 0, 0)
s = fcntl.ioctl(0, termios.TIOCGWINSZ, s)
fcntl.ioctl(fd, termios.TIOCSWINSZ, s)
lines, columns, _, _ = struct.unpack('hhhh', s)
lines, columns, _, _ = struct.unpack("hhhh", s)
return lines, columns
def get_terminal_size(self, fd):
s = struct.pack('HHHH', 0, 0, 0, 0)
s = struct.pack("HHHH", 0, 0, 0, 0)
lines, columns, _, _ = struct.unpack(
'HHHH', fcntl.ioctl(fd, termios.TIOCGWINSZ, s))
"HHHH", fcntl.ioctl(fd, termios.TIOCGWINSZ, s)
)
return lines, columns
def handle_sigwinch(self, *args):
os.write(self.signalPipe[1], b'w')
os.write(self.signalPipe[1], b"w")
def terminal_emulation(self, active, event_queue):
try:
old_attr = termios.tcgetattr(sys.stdin)
tty.setraw(0)
lines, columns = self.get_terminal_size(0)
if self.command == '':
if self.command == "":
self.command = screen_utils.get_shell()
self.terminal, self.p_pid, self.p_out = self.open_terminal(
columns, lines, self.command)
columns, lines, self.command
)
lines, columns = self.resize_terminal(self.p_out)
self.terminal.resize(lines, columns)
fd_list = [sys.stdin, self.p_out, self.signalPipe[0]]
@ -231,29 +253,50 @@ class driver(screenDriver):
msg_bytes = self.read_all(sys.stdin.fileno(), len=4096)
except (EOFError, OSError):
event_queue.put(
{"Type": FenrirEventType.stop_main_loop, "data": None})
{
"Type": FenrirEventType.stop_main_loop,
"data": None,
}
)
break
if self.shortcutType == 'KEY':
if self.shortcutType == "KEY":
try:
self.inject_text_to_screen(msg_bytes)
except Exception as e:
self.env['runtime']['DebugManager'].write_debug_out(
'ptyDriver getInputData: Error injecting text to screen: ' + str(e),
debug.DebugLevel.ERROR)
self.env["runtime"][
"DebugManager"
].write_debug_out(
"ptyDriver getInputData: Error injecting text to screen: "
+ str(e),
debug.DebugLevel.ERROR,
)
event_queue.put(
{"Type": FenrirEventType.stop_main_loop, "data": None})
{
"Type": FenrirEventType.stop_main_loop,
"data": None,
}
)
break
else:
event_queue.put({"Type": FenrirEventType.byte_input,
"data": msg_bytes})
event_queue.put(
{
"Type": FenrirEventType.byte_input,
"data": msg_bytes,
}
)
# output
if self.p_out in r:
try:
msg_bytes = self.read_all(
self.p_out.fileno(), interruptFd=sys.stdin.fileno())
self.p_out.fileno(), interruptFd=sys.stdin.fileno()
)
except (EOFError, OSError):
event_queue.put(
{"Type": FenrirEventType.stop_main_loop, "data": None})
{
"Type": FenrirEventType.stop_main_loop,
"data": None,
}
)
break
# feed and send event bevore write, the pyte already has the right state
# so fenrir already can progress bevore os.write what
@ -263,19 +306,25 @@ class driver(screenDriver):
{
"Type": FenrirEventType.screen_update,
"data": screen_utils.create_screen_event_data(
self.terminal.get_screen_content())})
self.terminal.get_screen_content()
),
}
)
self.inject_text_to_screen(
msg_bytes, screen=sys.stdout.fileno())
msg_bytes, screen=sys.stdout.fileno()
)
except Exception as e: # Process died?
print(e)
event_queue.put(
{"Type": FenrirEventType.stop_main_loop, "data": None})
{"Type": FenrirEventType.stop_main_loop, "data": None}
)
finally:
os.kill(self.p_pid, signal.SIGTERM)
self.p_out.close()
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_attr)
event_queue.put(
{"Type": FenrirEventType.stop_main_loop, "data": None})
{"Type": FenrirEventType.stop_main_loop, "data": None}
)
sys.exit(0)
def get_curr_application(self):

View File

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributers.
# By Chrys, Storm Dragon, and contributors.
# attrib:
# http://rampex.ihep.su/Linux/linux_howto/html/tutorials/mini/Colour-ls-6.html
# 0 = black, 1 = blue, 2 = green, 3 = cyan, 4 = red, 5 = purple, 6 = brown/yellow, 7 = white.
@ -10,17 +10,21 @@
# blink = 5 if attr & 1 else 0
# bold = 1 if attr & 16 else 0
import subprocess
import fcntl
import glob
import os
import select
import subprocess
import termios
import time
import select
import dbus
import fcntl
from array import array
from fcntl import ioctl
from struct import unpack_from, unpack, pack
from struct import pack
from struct import unpack
from struct import unpack_from
import dbus
from fenrirscreenreader.core import debug
from fenrirscreenreader.core.eventData import FenrirEventType
from fenrirscreenreader.core.screenDriver import ScreenDriver as screenDriver
@ -28,128 +32,230 @@ from fenrirscreenreader.utils import screen_utils
class driver(screenDriver):
"""Linux VCSA (Virtual Console Screen Access) driver for Fenrir screen reader.
This driver provides access to Linux virtual consoles (TTYs) through the VCSA
interface, allowing real-time monitoring of screen content and cursor position.
It supports both text content extraction and color/attribute detection.
The driver monitors multiple virtual consoles simultaneously and can detect:
- Screen content changes (text updates)
- Cursor movement
- TTY switching
- Text attributes (colors, bold, etc.)
- Session information via D-Bus/logind
Attributes:
ListSessions: D-Bus method for listing login sessions
sysBus: D-Bus system bus connection
charmap: Character mapping for text decoding
bgColorValues: Background color value mappings
fgColorValues: Foreground color value mappings
hichar: High character mask for Unicode support
"""
def __init__(self):
screenDriver.__init__(self)
self.ListSessions = None
self.sysBus = None
self.charmap = {}
self.bgColorValues = {
0: 'black',
1: 'blue',
2: 'green',
3: 'cyan',
4: 'red',
5: 'magenta',
6: 'brown/yellow',
7: 'white'}
0: "black",
1: "blue",
2: "green",
3: "cyan",
4: "red",
5: "magenta",
6: "brown/yellow",
7: "white",
}
self.fgColorValues = {
0: 'black',
1: 'blue',
2: 'green',
3: 'cyan',
4: 'red',
5: 'magenta',
6: 'brown/yellow',
7: 'light gray',
8: 'dark gray',
9: 'light blue',
10: 'light green',
11: 'light cyan',
12: 'light red',
13: 'light magenta',
14: 'light yellow',
15: 'white'}
0: "black",
1: "blue",
2: "green",
3: "cyan",
4: "red",
5: "magenta",
6: "brown/yellow",
7: "light gray",
8: "dark gray",
9: "light blue",
10: "light green",
11: "light cyan",
12: "light red",
13: "light magenta",
14: "light yellow",
15: "white",
}
self.hichar = None
try:
# set workaround for paste clipboard -> inject_text_to_screen
subprocess.run(['sysctl', 'dev.tty.legacy_tiocsti=1'],
check=False, capture_output=True, timeout=5)
subprocess.run(
["sysctl", "dev.tty.legacy_tiocsti=1"],
check=False,
capture_output=True,
timeout=5,
)
except Exception as e:
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver shutdown: Error running fgconsole: ' + str(e),
debug.DebugLevel.ERROR)
self.env["runtime"]["DebugManager"].write_debug_out(
"vcsaDriver shutdown: Error running fgconsole: " + str(e),
debug.DebugLevel.ERROR,
)
def initialize(self, environment):
"""Initialize the VCSA driver with the given environment.
Sets up default attributes, starts the screen monitoring watchdog process,
and prepares the driver for screen content monitoring.
Args:
environment: The Fenrir environment dictionary containing runtime managers
and configuration settings.
"""
self.env = environment
self.env['runtime']['AttributeManager'].append_default_attributes([
self.fgColorValues[7], # fg
self.bgColorValues[0], # bg
False, # bold
False, # italics
False, # underscore
False, # strikethrough
False, # reverse
False, # blink
'default', # fontsize
'default' # fontfamily
]) # end attribute )
self.env['runtime']['ProcessManager'].add_custom_event_thread(
self.update_watchdog, multiprocess=True)
self.env["runtime"]["AttributeManager"].append_default_attributes(
[
self.fgColorValues[7], # fg
self.bgColorValues[0], # bg
False, # bold
False, # italics
False, # underscore
False, # strikethrough
False, # reverse
False, # blink
"default", # fontsize
"default", # fontfamily
]
) # end attribute )
self.env["runtime"]["ProcessManager"].add_custom_event_thread(
self.update_watchdog, multiprocess=True
)
def get_curr_screen(self):
self.env['screen']['oldTTY'] = self.env['screen']['newTTY']
"""Get the currently active TTY number.
Reads from /sys/devices/virtual/tty/tty0/active to determine which
virtual console is currently active and updates the environment.
Updates:
env['screen']['oldTTY']: Previous TTY number
env['screen']['newTTY']: Current TTY number
"""
self.env["screen"]["oldTTY"] = self.env["screen"]["newTTY"]
try:
with open('/sys/devices/virtual/tty/tty0/active', 'r') as currScreenFile:
self.env['screen']['newTTY'] = str(currScreenFile.read()[3:-1])
with open(
"/sys/devices/virtual/tty/tty0/active", "r"
) as currScreenFile:
self.env["screen"]["newTTY"] = str(currScreenFile.read()[3:-1])
except Exception as e:
self.env['runtime']['DebugManager'].write_debug_out(
str(e), debug.DebugLevel.ERROR)
self.env["runtime"]["DebugManager"].write_debug_out(
str(e), debug.DebugLevel.ERROR
)
def inject_text_to_screen(self, text, screen=None):
use_screen = "/dev/tty" + self.env['screen']['newTTY']
"""Inject text into the specified screen as if typed by user.
Uses the TIOCSTI ioctl to simulate keystrokes on the target TTY.
This is primarily used for clipboard paste functionality.
Args:
text (str): Text to inject into the screen
screen (str, optional): Target screen device (e.g., '/dev/tty1').
If None, uses current TTY.
Note:
Requires appropriate permissions and may need legacy_tiocsti=1
kernel parameter on newer systems.
"""
use_screen = "/dev/tty" + self.env["screen"]["newTTY"]
if screen is not None:
use_screen = screen
with open(use_screen, 'w') as fd:
with open(use_screen, "w") as fd:
for c in text:
fcntl.ioctl(fd, termios.TIOCSTI, c)
def get_session_information(self):
self.env['screen']['autoIgnoreScreens'] = []
"""Retrieve session information via D-Bus logind interface.
Connects to systemd-logind to gather information about active sessions,
including session types and TTY assignments. This helps identify which
screens should be automatically ignored (e.g., X11 sessions).
Updates:
env['screen']['autoIgnoreScreens']: List of screens to ignore
env['general']['curr_user']: Current user for active session
env['general']['prev_user']: Previous user
Note:
Gracefully handles cases where logind is not available.
"""
self.env["screen"]["autoIgnoreScreens"] = []
try:
if not self.sysBus:
self.sysBus = dbus.SystemBus()
obj = self.sysBus.get_object(
'org.freedesktop.login1', '/org/freedesktop/login1')
inf = dbus.Interface(obj, 'org.freedesktop.login1.Manager')
self.ListSessions = inf.get_dbus_method('ListSessions')
"org.freedesktop.login1", "/org/freedesktop/login1"
)
inf = dbus.Interface(obj, "org.freedesktop.login1.Manager")
self.ListSessions = inf.get_dbus_method("ListSessions")
sessions = self.ListSessions()
for session in sessions:
obj = self.sysBus.get_object(
'org.freedesktop.login1', session[4])
inf = dbus.Interface(obj, 'org.freedesktop.DBus.Properties')
session_type = inf.Get('org.freedesktop.login1.Session', 'Type')
screen = str(inf.Get('org.freedesktop.login1.Session', 'VTNr'))
if screen == '':
"org.freedesktop.login1", session[4]
)
inf = dbus.Interface(obj, "org.freedesktop.DBus.Properties")
session_type = inf.Get(
"org.freedesktop.login1.Session", "Type"
)
screen = str(inf.Get("org.freedesktop.login1.Session", "VTNr"))
if screen == "":
screen = str(
inf.Get(
'org.freedesktop.login1.Session',
'TTY'))
screen = screen[screen.upper().find('TTY') + 3:]
if screen == '':
self.env['runtime']['DebugManager'].write_debug_out(
'No TTY found for session:' + session[4], debug.DebugLevel.ERROR)
inf.Get("org.freedesktop.login1.Session", "TTY")
)
screen = screen[screen.upper().find("TTY") + 3 :]
if screen == "":
self.env["runtime"]["DebugManager"].write_debug_out(
"No TTY found for session:" + session[4],
debug.DebugLevel.ERROR,
)
return
if session_type.upper() != 'TTY':
self.env['screen']['autoIgnoreScreens'] += [screen]
if screen == self.env['screen']['newTTY']:
if self.env['general']['curr_user'] != session[2]:
self.env['general']['prev_user'] = self.env['general']['curr_user']
self.env['general']['curr_user'] = session[2]
if session_type.upper() != "TTY":
self.env["screen"]["autoIgnoreScreens"] += [screen]
if screen == self.env["screen"]["newTTY"]:
if self.env["general"]["curr_user"] != session[2]:
self.env["general"]["prev_user"] = self.env["general"][
"curr_user"
]
self.env["general"]["curr_user"] = session[2]
except Exception as e:
self.env['runtime']['DebugManager'].write_debug_out(
'get_session_information: Maybe no LoginD:' + str(e), debug.DebugLevel.ERROR)
self.env["runtime"]["DebugManager"].write_debug_out(
"get_session_information: Maybe no LoginD:" + str(e),
debug.DebugLevel.ERROR,
)
# self.env['runtime']['DebugManager'].write_debug_out('get_session_information:' + str(self.env['screen']['autoIgnoreScreens']) + ' ' + str(self.env['general']) ,debug.DebugLevel.INFO)
def read_file(self, file):
d = b''
"""Read content from a file handle with error recovery.
Attempts to read the entire file content, falling back to
line-by-line reading if the initial read fails. This is used
for reading VCSA/VCSU device files.
Args:
file: Open file handle to read from
Returns:
bytes: File content as bytes
"""
d = b""
file.seek(0)
try:
d = file.read()
except Exception as e:
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver get_screen_text: Error reading file: ' + str(e),
debug.DebugLevel.ERROR)
self.env["runtime"]["DebugManager"].write_debug_out(
"vcsaDriver get_screen_text: Error reading file: " + str(e),
debug.DebugLevel.ERROR,
)
file.seek(0)
while True:
# Read from file
@ -162,40 +268,64 @@ class driver(screenDriver):
return d
def update_watchdog(self, active, event_queue):
"""Main watchdog loop for monitoring screen changes.
This is the core monitoring function that runs in a separate process.
It uses epoll to efficiently monitor multiple VCSA devices and the
active TTY file for changes. When changes are detected, it generates
appropriate events for the main Fenrir process.
The watchdog monitors:
- Screen content changes (text updates)
- TTY switches (screen changes)
- Cursor position changes
Args:
active: Shared boolean value controlling the watchdog loop
event_queue: Queue for sending events to the main process
Events Generated:
- FenrirEventType.screen_changed: When switching TTYs
- FenrirEventType.screen_update: When screen content changes
Note:
This method runs in a multiprocess context and includes comprehensive
cleanup of file handles in the finally block.
"""
vcsa = {}
vcsu = {}
tty = None
watchdog = None
try:
use_vcsu = os.access('/dev/vcsu', os.R_OK)
vcsa_devices = glob.glob('/dev/vcsa*')
use_vcsu = os.access("/dev/vcsu", os.R_OK)
vcsa_devices = glob.glob("/dev/vcsa*")
vcsu_devices = None
last_screen_content = b''
last_screen_content = b""
# Open TTY file with proper cleanup
tty = open('/sys/devices/virtual/tty/tty0/active', 'r')
tty = open("/sys/devices/virtual/tty/tty0/active", "r")
curr_screen = str(tty.read()[3:-1])
old_screen = curr_screen
# Open VCSA devices with proper cleanup tracking
for vcsaDev in vcsa_devices:
index = str(vcsaDev[9:])
vcsa[index] = open(vcsaDev, 'rb')
vcsa[index] = open(vcsaDev, "rb")
if index == curr_screen:
last_screen_content = self.read_file(vcsa[index])
# Open VCSU devices if available
if use_vcsu:
vcsu_devices = glob.glob('/dev/vcsu*')
vcsu_devices = glob.glob("/dev/vcsu*")
for vcsuDev in vcsu_devices:
index = str(vcsuDev[9:])
vcsu[index] = open(vcsuDev, 'rb')
vcsu[index] = open(vcsuDev, "rb")
self.update_char_map(curr_screen)
watchdog = select.epoll()
watchdog.register(
vcsa[curr_screen],
select.POLLPRI | select.POLLERR)
vcsa[curr_screen], select.POLLPRI | select.POLLERR
)
watchdog.register(tty, select.POLLPRI | select.POLLERR)
while active.value:
changes = watchdog.poll(1)
@ -203,43 +333,70 @@ class driver(screenDriver):
fileno = change[0]
event = change[1]
if fileno == tty.fileno():
self.env['runtime']['DebugManager'].write_debug_out(
'ScreenChange', debug.DebugLevel.INFO)
self.env["runtime"]["DebugManager"].write_debug_out(
"ScreenChange", debug.DebugLevel.INFO
)
tty.seek(0)
curr_screen = str(tty.read()[3:-1])
if curr_screen != old_screen:
try:
watchdog.unregister(vcsa[old_screen])
except Exception as e:
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver update_watchdog: Error unregistering watchdog: ' + str(e),
debug.DebugLevel.ERROR)
self.env["runtime"][
"DebugManager"
].write_debug_out(
"vcsaDriver update_watchdog: Error unregistering watchdog: "
+ str(e),
debug.DebugLevel.ERROR,
)
try:
watchdog.register(
vcsa[curr_screen], select.POLLPRI | select.POLLERR)
vcsa[curr_screen],
select.POLLPRI | select.POLLERR,
)
except Exception as e:
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver update_watchdog: Error registering watchdog: ' + str(e),
debug.DebugLevel.ERROR)
self.env["runtime"][
"DebugManager"
].write_debug_out(
"vcsaDriver update_watchdog: Error registering watchdog: "
+ str(e),
debug.DebugLevel.ERROR,
)
self.update_char_map(curr_screen)
old_screen = curr_screen
try:
vcsa[curr_screen].seek(0)
last_screen_content = self.read_file(
vcsa[curr_screen])
vcsa[curr_screen]
)
except Exception as e:
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver update_watchdog: Error reading screen content: ' + str(e),
debug.DebugLevel.ERROR)
self.env["runtime"][
"DebugManager"
].write_debug_out(
"vcsaDriver update_watchdog: Error reading screen content: "
+ str(e),
debug.DebugLevel.ERROR,
)
vcsu_content = None
if use_vcsu:
vcsu[curr_screen].seek(0)
vcsu_content = self.read_file(vcsu[curr_screen])
event_queue.put({"Type": FenrirEventType.screen_changed, "data": self.create_screen_event_data(
curr_screen, last_screen_content, vcsu_content)})
vcsu_content = self.read_file(
vcsu[curr_screen]
)
event_queue.put(
{
"Type": FenrirEventType.screen_changed,
"data": self.create_screen_event_data(
curr_screen,
last_screen_content,
vcsu_content,
),
}
)
else:
self.env['runtime']['DebugManager'].write_debug_out(
'screen_update', debug.DebugLevel.INFO)
self.env["runtime"]["DebugManager"].write_debug_out(
"screen_update", debug.DebugLevel.INFO
)
vcsa[curr_screen].seek(0)
time.sleep(0.01)
dirty_content = self.read_file(vcsa[curr_screen])
@ -247,16 +404,32 @@ class driver(screenDriver):
vcsu_content = None
timeout = time.time()
# error case
if screen_content == b'':
if screen_content == b"":
continue
if last_screen_content == b'':
if last_screen_content == b"":
last_screen_content = screen_content
if (abs(int(screen_content[2]) - int(last_screen_content[2])) in [1, 2]) and (
int(screen_content[3]) == int(last_screen_content[3])):
if (
abs(
int(screen_content[2])
- int(last_screen_content[2])
)
in [1, 2]
) and (
int(screen_content[3])
== int(last_screen_content[3])
):
# Skip X Movement
pass
elif (abs(int(screen_content[3]) - int(last_screen_content[3])) in [1]) and \
(int(screen_content[2]) == int(last_screen_content[2])):
elif (
abs(
int(screen_content[3])
- int(last_screen_content[3])
)
in [1]
) and (
int(screen_content[2])
== int(last_screen_content[2])
):
# Skip Y Movement
pass
else:
@ -268,7 +441,9 @@ class driver(screenDriver):
# if not vcsa[curr_screen] in r:
# break
vcsa[curr_screen].seek(0)
dirty_content = self.read_file(vcsa[curr_screen])
dirty_content = self.read_file(
vcsa[curr_screen]
)
if screen_content == dirty_content:
break
if time.time() - timeout >= 0.1:
@ -278,11 +453,18 @@ class driver(screenDriver):
vcsu[curr_screen].seek(0)
vcsu_content = self.read_file(vcsu[curr_screen])
last_screen_content = screen_content
event_queue.put({"Type": FenrirEventType.screen_update, "data": self.create_screen_event_data(
curr_screen, screen_content, vcsu_content)})
event_queue.put(
{
"Type": FenrirEventType.screen_update,
"data": self.create_screen_event_data(
curr_screen, screen_content, vcsu_content
),
}
)
except Exception as e:
self.env['runtime']['DebugManager'].write_debug_out(
'VCSA:update_watchdog:' + str(e), debug.DebugLevel.ERROR)
self.env["runtime"]["DebugManager"].write_debug_out(
"VCSA:update_watchdog:" + str(e), debug.DebugLevel.ERROR
)
time.sleep(0.2)
finally:
# Clean up all file handles
@ -290,114 +472,186 @@ class driver(screenDriver):
if watchdog:
watchdog.close()
except Exception as e:
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver update_watchdog: Error closing watchdog: ' + str(e),
debug.DebugLevel.ERROR)
self.env["runtime"]["DebugManager"].write_debug_out(
"vcsaDriver update_watchdog: Error closing watchdog: "
+ str(e),
debug.DebugLevel.ERROR,
)
try:
if tty:
tty.close()
except Exception as e:
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver shutdown: Error closing TTY: ' + str(e), debug.DebugLevel.ERROR)
self.env["runtime"]["DebugManager"].write_debug_out(
"vcsaDriver shutdown: Error closing TTY: " + str(e),
debug.DebugLevel.ERROR,
)
for handle in vcsa.values():
try:
handle.close()
except Exception as e:
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver shutdown: Error closing VCSA handle: ' + str(e),
debug.DebugLevel.ERROR)
self.env["runtime"]["DebugManager"].write_debug_out(
"vcsaDriver shutdown: Error closing VCSA handle: "
+ str(e),
debug.DebugLevel.ERROR,
)
for handle in vcsu.values():
try:
handle.close()
except Exception as e:
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver shutdown: Error closing VCSU handle: ' + str(e),
debug.DebugLevel.ERROR)
self.env["runtime"]["DebugManager"].write_debug_out(
"vcsaDriver shutdown: Error closing VCSU handle: "
+ str(e),
debug.DebugLevel.ERROR,
)
def create_screen_event_data(self, screen, vcsaContent, vcsu_content=None):
"""Create standardized screen event data from VCSA content.
Processes raw VCSA bytes into a structured event data dictionary
containing screen dimensions, cursor position, text content, and
color attributes.
Args:
screen (str): TTY number (e.g., '1' for tty1)
vcsaContent (bytes): Raw VCSA device content
vcsu_content (bytes, optional): VCSU content for Unicode support
Returns:
dict: Event data with keys:
- bytes: Raw VCSA content
- lines: Screen height
- columns: Screen width
- textCursor: Cursor position {x, y}
- screen: TTY number
- screenUpdateTime: Timestamp
- text: Decoded text content
- attributes: Color/formatting attributes
"""
event_data = {
'bytes': vcsaContent,
'lines': int(vcsaContent[0]),
'columns': int(vcsaContent[1]),
'textCursor':
{
'x': int(vcsaContent[2]),
'y': int(vcsaContent[3])
},
'screen': screen,
'screenUpdateTime': time.time(),
'text': '',
'attributes': [],
"bytes": vcsaContent,
"lines": int(vcsaContent[0]),
"columns": int(vcsaContent[1]),
"textCursor": {"x": int(vcsaContent[2]), "y": int(vcsaContent[3])},
"screen": screen,
"screenUpdateTime": time.time(),
"text": "",
"attributes": [],
}
try:
event_data['text'], event_data['attributes'] = self.auto_decode_vcsa(
vcsaContent[4:], event_data['lines'], event_data['columns'])
event_data["text"], event_data["attributes"] = (
self.auto_decode_vcsa(
vcsaContent[4:], event_data["lines"], event_data["columns"]
)
)
except Exception as e:
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver create_screen_event_data: Error decoding VCSA content: ' + str(e),
debug.DebugLevel.ERROR)
self.env["runtime"]["DebugManager"].write_debug_out(
"vcsaDriver create_screen_event_data: Error decoding VCSA content: "
+ str(e),
debug.DebugLevel.ERROR,
)
# VCSU seems to give b' ' instead of b'\x00\x00\x00' (tsp),
# deactivated until its fixed
if vcsu_content is not None:
try:
vcsu_content_as_text = vcsu_content.decode('UTF-32')
event_data['text'] = screen_utils.insert_newlines(
vcsu_content_as_text, event_data['columns'])
vcsu_content_as_text = vcsu_content.decode("UTF-32")
event_data["text"] = screen_utils.insert_newlines(
vcsu_content_as_text, event_data["columns"]
)
except Exception as e:
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver create_screen_event_data: Error decoding VCSU content: ' + str(e),
debug.DebugLevel.ERROR)
self.env["runtime"]["DebugManager"].write_debug_out(
"vcsaDriver create_screen_event_data: Error decoding VCSU content: "
+ str(e),
debug.DebugLevel.ERROR,
)
return event_data.copy()
def update_char_map(self, screen):
"""Update character mapping for the specified screen.
Reads the Unicode font mapping from the TTY to properly decode
character data from VCSA. This handles special characters and
Unicode properly.
Args:
screen (str): TTY number to update mapping for
Updates:
self.charmap: Dictionary mapping byte values to Unicode characters
self.hichar: High character mask for extended characters
"""
self.charmap = {}
try:
with open('/dev/tty' + screen, 'rb') as tty:
with open("/dev/tty" + screen, "rb") as tty:
GIO_UNIMAP = 0x4B66
VT_GETHIFONTMASK = 0x560D
himask = array("H", (0,))
ioctl(tty, VT_GETHIFONTMASK, himask)
self.hichar, = unpack_from("@H", himask)
(self.hichar,) = unpack_from("@H", himask)
sz = 512
line = ''
line = ""
while True:
try:
unipairs = array("H", [0] * (2 * sz))
unimapdesc = array(
"B", pack(
"@HP", sz, unipairs.buffer_info()[0]))
"B", pack("@HP", sz, unipairs.buffer_info()[0])
)
ioctl(tty.fileno(), GIO_UNIMAP, unimapdesc)
break
except Exception as e:
self.env['runtime']['DebugManager'].write_debug_out(
'VCSA:update_char_map:scaling up sz=' + str(sz) + ' ' + str(e),
debug.DebugLevel.WARNING)
self.env["runtime"]["DebugManager"].write_debug_out(
"VCSA:update_char_map:scaling up sz="
+ str(sz)
+ " "
+ str(e),
debug.DebugLevel.WARNING,
)
sz *= 2
except Exception as e:
self.env['runtime']['DebugManager'].write_debug_out(
'VCSA:update_char_map:' + str(e), debug.DebugLevel.ERROR)
self.env["runtime"]["DebugManager"].write_debug_out(
"VCSA:update_char_map:" + str(e), debug.DebugLevel.ERROR
)
return
ncodes, = unpack_from("@H", unimapdesc)
(ncodes,) = unpack_from("@H", unimapdesc)
utable = unpack_from("@%dH" % (2 * ncodes), unipairs)
for u, b in zip(utable[::2], utable[1::2]):
if self.charmap.get(b) is None:
self.charmap[b] = chr(u)
def auto_decode_vcsa(self, allData, rows, cols):
all_text = ''
"""Decode raw VCSA data into text and attributes.
Processes the character and attribute data from VCSA devices,
extracting both the text content and formatting information
(colors, bold, blink, etc.).
Args:
allData (bytes): Raw character and attribute data from VCSA
rows (int): Number of screen rows
cols (int): Number of screen columns
Returns:
tuple: (text_content, attributes)
- text_content (str): Decoded text with newlines
- attributes (list): List of attribute arrays for each character
Note:
Each character in VCSA is stored as 2 bytes: character + attribute.
Attributes encode foreground/background colors, bold, blink, etc.
"""
all_text = ""
all_attrib = []
i = 0
for y in range(rows):
line_text = ''
line_text = ""
line_attrib = []
blink = 0
bold = 0
ink = 7
paper = 0
for x in range(cols):
data = allData[i: i + 2]
data = allData[i : i + 2]
i += 2
if data == b' \x07':
if data == b" \x07":
# attr = 7
# ink = 7
# paper = 0
@ -411,10 +665,11 @@ class driver(screenDriver):
False, # strikethrough
False, # reverse
False, # blink
'default', # fontsize
'default'] # fontfamily
"default", # fontsize
"default",
] # fontfamily
line_attrib.append(char_attrib)
line_text += ' '
line_text += " "
continue
ch = None
try:
@ -425,9 +680,11 @@ class driver(screenDriver):
if sh & self.hichar:
ch |= 0x100
except Exception as e:
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver auto_decode_vcsa: Error processing character: ' + str(e),
debug.DebugLevel.ERROR)
self.env["runtime"]["DebugManager"].write_debug_out(
"vcsaDriver auto_decode_vcsa: Error processing character: "
+ str(e),
debug.DebugLevel.ERROR,
)
ch = None
if self.hichar == 0x100:
attr >>= 1
@ -443,13 +700,15 @@ class driver(screenDriver):
# if (ink != 7) or (paper != 0):
# print(ink,paper)
except Exception as e:
self.env['runtime']['DebugManager'].write_debug_out(
'vcsaDriver auto_decode_vcsa: Error processing attributes: ' + str(e),
debug.DebugLevel.ERROR)
self.env["runtime"]["DebugManager"].write_debug_out(
"vcsaDriver auto_decode_vcsa: Error processing attributes: "
+ str(e),
debug.DebugLevel.ERROR,
)
try:
line_text += self.charmap[ch]
except KeyError:
line_text += '?'
line_text += "?"
char_attrib = [
self.fgColorValues[ink],
@ -460,30 +719,47 @@ class driver(screenDriver):
False, # strikethrough
False, # reverse
blink == 1, # blink
'default', # fontsize
'default'] # fontfamily
"default", # fontsize
"default",
] # fontfamily
line_attrib.append(char_attrib)
all_text += line_text
if y + 1 < rows:
all_text += '\n'
all_text += "\n"
all_attrib.append(line_attrib)
return str(all_text), all_attrib
def get_curr_application(self):
"""Detect the currently running application on the active TTY.
Uses 'ps' command to identify which process is currently in the
foreground on the active TTY, enabling application-specific features
like bookmarks and settings.
Updates:
env['screen']['new_application']: Name of current application
Note:
Filters out common shell processes (grep, sh, ps) to find the
actual user application.
"""
apps = []
try:
curr_screen = self.env['screen']['newTTY']
apps = subprocess.Popen(
'ps -t tty' +
curr_screen +
' -o comm,tty,stat',
shell=True,
stdout=subprocess.PIPE).stdout.read().decode()[
:-
1].split('\n')
curr_screen = self.env["screen"]["newTTY"]
apps = (
subprocess.Popen(
"ps -t tty" + curr_screen + " -o comm,tty,stat",
shell=True,
stdout=subprocess.PIPE,
)
.stdout.read()
.decode()[:-1]
.split("\n")
)
except Exception as e:
self.env['runtime']['DebugManager'].write_debug_out(
str(e), debug.DebugLevel.ERROR)
self.env["runtime"]["DebugManager"].write_debug_out(
str(e), debug.DebugLevel.ERROR
)
return
try:
for i in apps:
@ -491,15 +767,23 @@ class driver(screenDriver):
i = i.split()
i[0] = i[0]
i[1] = i[1]
if '+' in i[2]:
if i[0] != '':
if not "GREP" == i[0] and \
not "SH" == i[0] and \
not "PS" == i[0]:
if "+" in i[2]:
if i[0] != "":
if (
not "GREP" == i[0]
and not "SH" == i[0]
and not "PS" == i[0]
):
if "TTY" + curr_screen in i[1]:
if self.env['screen']['new_application'] != i[0]:
self.env['screen']['new_application'] = i[0]
if (
self.env["screen"]["new_application"]
!= i[0]
):
self.env["screen"]["new_application"] = i[
0
]
return
except Exception as e:
self.env['runtime']['DebugManager'].write_debug_out(
str(e), debug.DebugLevel.ERROR)
self.env["runtime"]["DebugManager"].write_debug_out(
str(e), debug.DebugLevel.ERROR
)