Files
fenrir/src/fenrirscreenreader/screenDriver/vcsaDriver.py
2025-07-17 11:11:35 -04:00

1103 lines
47 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
# attrib:
# http://rampex.ihep.su/Linux/linux_howto/html/tutorials/mini/Colour-ls-6.html
# 0 = black, 1 = blue, 2 = green, 3 = cyan, 4 = red, 5 = purple, 6 = brown/yellow, 7 = white.
# https://github.com/jwilk/vcsapeek/blob/master/linuxvt.py
# blink = 5 if attr & 1 else 0
# bold = 1 if attr & 16 else 0
import fcntl
import glob
import os
import select
import subprocess
import termios
import time
from array import array
from fcntl import ioctl
from struct import pack
from struct import unpack
from struct import unpack_from
import dbus
from fenrirscreenreader.core import debug
from fenrirscreenreader.core.eventData import FenrirEventType
from fenrirscreenreader.core.screenDriver import ScreenDriver as screenDriver
from fenrirscreenreader.utils import screen_utils
class driver(screenDriver):
"""Linux VCSA (Virtual Console Screen Access) driver for Fenrir screen reader.
This driver provides access to Linux virtual consoles (TTYs) through the VCSA
interface, allowing real-time monitoring of screen content and cursor position.
It supports both text content extraction and color/attribute detection.
The driver monitors multiple virtual consoles simultaneously and can detect:
- Screen content changes (text updates)
- Cursor movement
- TTY switching
- Text attributes (colors, bold, etc.)
- Session information via D-Bus/logind
Attributes:
ListSessions: D-Bus method for listing login sessions
sysBus: D-Bus system bus connection
charmap: Character mapping for text decoding
bgColorValues: Background color value mappings
fgColorValues: Foreground color value mappings
hichar: High character mask for Unicode support
"""
def __init__(self):
screenDriver.__init__(self)
self.ListSessions = None
self.sysBus = None
self.charmap = {}
self.bgColorValues = {
0: "black",
1: "blue",
2: "green",
3: "cyan",
4: "red",
5: "magenta",
6: "brown/yellow",
7: "white",
}
self.fgColorValues = {
0: "black",
1: "blue",
2: "green",
3: "cyan",
4: "red",
5: "magenta",
6: "brown/yellow",
7: "light gray",
8: "dark gray",
9: "light blue",
10: "light green",
11: "light cyan",
12: "light red",
13: "light magenta",
14: "light yellow",
15: "white",
}
self.hichar = None
try:
# set workaround for paste clipboard -> inject_text_to_screen
subprocess.run(
["sysctl", "dev.tty.legacy_tiocsti=1"],
check=False,
capture_output=True,
timeout=5,
)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"vcsaDriver shutdown: Error running fgconsole: " + str(e),
debug.DebugLevel.ERROR,
)
def initialize(self, environment):
"""Initialize the VCSA driver with the given environment.
Sets up default attributes, starts the screen monitoring watchdog process,
and prepares the driver for screen content monitoring.
Args:
environment: The Fenrir environment dictionary containing runtime managers
and configuration settings.
"""
self.env = environment
self.env["runtime"]["AttributeManager"].append_default_attributes(
[
self.fgColorValues[7], # fg
self.bgColorValues[0], # bg
False, # bold
False, # italics
False, # underscore
False, # strikethrough
False, # reverse
False, # blink
"default", # fontsize
"default", # fontfamily
]
) # end attribute )
self.env["runtime"]["ProcessManager"].add_custom_event_thread(
self.update_watchdog, multiprocess=True
)
def get_curr_screen(self):
"""Get the currently active TTY number.
Reads from /sys/devices/virtual/tty/tty0/active to determine which
virtual console is currently active and updates the environment.
Updates:
env['screen']['oldTTY']: Previous TTY number
env['screen']['newTTY']: Current TTY number
"""
self.env["screen"]["oldTTY"] = self.env["screen"]["newTTY"]
try:
with open(
"/sys/devices/virtual/tty/tty0/active", "r"
) as currScreenFile:
self.env["screen"]["newTTY"] = str(currScreenFile.read()[3:-1])
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
str(e), debug.DebugLevel.ERROR
)
def inject_text_to_screen(self, text, screen=None):
"""Inject text into the specified screen as if typed by user.
Uses the TIOCSTI ioctl to simulate keystrokes on the target TTY.
This is primarily used for clipboard paste functionality.
Args:
text (str): Text to inject into the screen
screen (str, optional): Target screen device (e.g., '/dev/tty1').
If None, uses current TTY.
Note:
Requires appropriate permissions and may need legacy_tiocsti=1
kernel parameter on newer systems.
"""
use_screen = "/dev/tty" + self.env["screen"]["newTTY"]
if screen is not None:
use_screen = screen
with open(use_screen, "w") as fd:
text_bytes = text.encode('utf-8')
for byte in text_bytes:
fcntl.ioctl(fd, termios.TIOCSTI, bytes([byte]))
def get_session_information(self):
"""Retrieve session information via D-Bus logind interface.
Connects to systemd-logind to gather information about active sessions,
including session types and TTY assignments. This helps identify which
screens should be automatically ignored (e.g., X11 sessions).
Updates:
env['screen']['autoIgnoreScreens']: List of screens to ignore
env['general']['curr_user']: Current user for active session
env['general']['prev_user']: Previous user
Note:
Gracefully handles cases where logind is not available.
"""
self.env["screen"]["autoIgnoreScreens"] = []
try:
if not self.sysBus:
self.sysBus = dbus.SystemBus()
obj = self.sysBus.get_object(
"org.freedesktop.login1", "/org/freedesktop/login1"
)
inf = dbus.Interface(obj, "org.freedesktop.login1.Manager")
self.ListSessions = inf.get_dbus_method("ListSessions")
sessions = self.ListSessions()
for session in sessions:
obj = self.sysBus.get_object(
"org.freedesktop.login1", session[4]
)
inf = dbus.Interface(obj, "org.freedesktop.DBus.Properties")
session_type = inf.Get(
"org.freedesktop.login1.Session", "Type"
)
screen = str(inf.Get("org.freedesktop.login1.Session", "VTNr"))
if screen == "":
screen = str(
inf.Get("org.freedesktop.login1.Session", "TTY")
)
screen = screen[screen.upper().find("TTY") + 3 :]
if screen == "":
self.env["runtime"]["DebugManager"].write_debug_out(
"No TTY found for session:" + session[4],
debug.DebugLevel.ERROR,
)
return
if session_type.upper() != "TTY":
self.env["screen"]["autoIgnoreScreens"] += [screen]
if screen == self.env["screen"]["newTTY"]:
if self.env["general"]["curr_user"] != session[2]:
self.env["general"]["prev_user"] = self.env["general"][
"curr_user"
]
self.env["general"]["curr_user"] = session[2]
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"get_session_information: Maybe no LoginD:" + str(e),
debug.DebugLevel.ERROR,
)
# self.env['runtime']['DebugManager'].write_debug_out('get_session_information:' + str(self.env['screen']['autoIgnoreScreens']) + ' ' + str(self.env['general']) ,debug.DebugLevel.INFO)
def read_file(self, file):
"""Read content from a file handle with error recovery.
Attempts to read the entire file content, falling back to
line-by-line reading if the initial read fails. This is used
for reading VCSA/VCSU device files.
Args:
file: Open file handle to read from
Returns:
bytes: File content as bytes
"""
d = b""
file.seek(0)
try:
d = file.read()
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"vcsaDriver get_screen_text: Error reading file: " + str(e),
debug.DebugLevel.ERROR,
)
file.seek(0)
while True:
# Read from file
try:
d += file.readline(1)
if not d:
break
except Exception as e:
break
return d
def update_watchdog(self, active, event_queue):
"""Main watchdog loop for monitoring screen changes.
This is the core monitoring function that runs in a separate process.
It uses epoll to efficiently monitor multiple VCSA devices and the
active TTY file for changes. When changes are detected, it generates
appropriate events for the main Fenrir process.
The watchdog monitors:
- Screen content changes (text updates)
- TTY switches (screen changes)
- Cursor position changes
Args:
active: Shared boolean value controlling the watchdog loop
event_queue: Queue for sending events to the main process
Events Generated:
- FenrirEventType.screen_changed: When switching TTYs
- FenrirEventType.screen_update: When screen content changes
Note:
This method runs in a multiprocess context and includes comprehensive
cleanup of file handles in the finally block.
"""
vcsa = {}
vcsu = {}
tty = None
watchdog = None
try:
use_vcsu = os.access("/dev/vcsu", os.R_OK)
vcsa_devices = glob.glob("/dev/vcsa*")
vcsu_devices = None
last_screen_content = b""
# Open TTY file with proper cleanup
tty = open("/sys/devices/virtual/tty/tty0/active", "r")
curr_screen = str(tty.read()[3:-1])
old_screen = curr_screen
# Open VCSA devices with proper cleanup tracking
for vcsaDev in vcsa_devices:
index = str(vcsaDev[9:])
vcsa[index] = open(vcsaDev, "rb")
if index == curr_screen:
last_screen_content = self.read_file(vcsa[index])
# Open VCSU devices if available
if use_vcsu:
vcsu_devices = glob.glob("/dev/vcsu*")
for vcsuDev in vcsu_devices:
index = str(vcsuDev[9:])
vcsu[index] = open(vcsuDev, "rb")
self.update_char_map(curr_screen)
watchdog = select.epoll()
watchdog.register(
vcsa[curr_screen], select.POLLPRI | select.POLLERR
)
watchdog.register(tty, select.POLLPRI | select.POLLERR)
while active.value:
changes = watchdog.poll(1)
for change in changes:
fileno = change[0]
event = change[1]
if fileno == tty.fileno():
self.env["runtime"]["DebugManager"].write_debug_out(
"ScreenChange", debug.DebugLevel.INFO
)
tty.seek(0)
curr_screen = str(tty.read()[3:-1])
if curr_screen != old_screen:
try:
watchdog.unregister(vcsa[old_screen])
except Exception as e:
self.env["runtime"][
"DebugManager"
].write_debug_out(
"vcsaDriver update_watchdog: Error unregistering watchdog: "
+ str(e),
debug.DebugLevel.ERROR,
)
try:
watchdog.register(
vcsa[curr_screen],
select.POLLPRI | select.POLLERR,
)
except Exception as e:
self.env["runtime"][
"DebugManager"
].write_debug_out(
"vcsaDriver update_watchdog: Error registering watchdog: "
+ str(e),
debug.DebugLevel.ERROR,
)
self.update_char_map(curr_screen)
old_screen = curr_screen
try:
vcsa[curr_screen].seek(0)
last_screen_content = self.read_file(
vcsa[curr_screen]
)
except Exception as e:
self.env["runtime"][
"DebugManager"
].write_debug_out(
"vcsaDriver update_watchdog: Error reading screen content: "
+ str(e),
debug.DebugLevel.ERROR,
)
vcsu_content = None
if use_vcsu:
vcsu[curr_screen].seek(0)
vcsu_content = self.read_file(
vcsu[curr_screen]
)
event_queue.put(
{
"Type": FenrirEventType.screen_changed,
"data": self.create_screen_event_data(
curr_screen,
last_screen_content,
vcsu_content,
),
}
)
else:
self.env["runtime"]["DebugManager"].write_debug_out(
"screen_update", debug.DebugLevel.INFO
)
vcsa[curr_screen].seek(0)
time.sleep(0.01)
dirty_content = self.read_file(vcsa[curr_screen])
screen_content = dirty_content
vcsu_content = None
timeout = time.time()
# error case
if screen_content == b"":
continue
if last_screen_content == b"":
last_screen_content = screen_content
if (
abs(
int(screen_content[2])
- int(last_screen_content[2])
)
in [1, 2]
) and (
int(screen_content[3])
== int(last_screen_content[3])
):
# Skip X Movement
pass
elif (
abs(
int(screen_content[3])
- int(last_screen_content[3])
)
in [1]
) and (
int(screen_content[2])
== int(last_screen_content[2])
):
# Skip Y Movement
pass
else:
# anything else? wait for completion
while True:
screen_content = dirty_content
time.sleep(0.02)
# r,_,_ = select.select([vcsa[curr_screen]], [], [], 0.07)
# if not vcsa[curr_screen] in r:
# break
vcsa[curr_screen].seek(0)
dirty_content = self.read_file(
vcsa[curr_screen]
)
if screen_content == dirty_content:
break
if time.time() - timeout >= 0.1:
screen_content = dirty_content
break
if use_vcsu:
vcsu[curr_screen].seek(0)
vcsu_content = self.read_file(vcsu[curr_screen])
last_screen_content = screen_content
event_queue.put(
{
"Type": FenrirEventType.screen_update,
"data": self.create_screen_event_data(
curr_screen, screen_content, vcsu_content
),
}
)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"VCSA:update_watchdog:" + str(e), debug.DebugLevel.ERROR
)
time.sleep(0.2)
finally:
# Clean up all file handles
try:
if watchdog:
watchdog.close()
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"vcsaDriver update_watchdog: Error closing watchdog: "
+ str(e),
debug.DebugLevel.ERROR,
)
try:
if tty:
tty.close()
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"vcsaDriver shutdown: Error closing TTY: " + str(e),
debug.DebugLevel.ERROR,
)
for handle in vcsa.values():
try:
handle.close()
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"vcsaDriver shutdown: Error closing VCSA handle: "
+ str(e),
debug.DebugLevel.ERROR,
)
for handle in vcsu.values():
try:
handle.close()
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"vcsaDriver shutdown: Error closing VCSU handle: "
+ str(e),
debug.DebugLevel.ERROR,
)
def create_screen_event_data(self, screen, vcsaContent, vcsu_content=None):
"""Create standardized screen event data from VCSA content.
Processes raw VCSA bytes into a structured event data dictionary
containing screen dimensions, cursor position, text content, and
color attributes.
Args:
screen (str): TTY number (e.g., '1' for tty1)
vcsaContent (bytes): Raw VCSA device content
vcsu_content (bytes, optional): VCSU content for Unicode support
Returns:
dict: Event data with keys:
- bytes: Raw VCSA content
- lines: Screen height
- columns: Screen width
- textCursor: Cursor position {x, y}
- screen: TTY number
- screenUpdateTime: Timestamp
- text: Decoded text content
- attributes: Color/formatting attributes
"""
event_data = {
"bytes": vcsaContent,
"lines": int(vcsaContent[0]),
"columns": int(vcsaContent[1]),
"textCursor": {"x": int(vcsaContent[2]), "y": int(vcsaContent[3])},
"screen": screen,
"screenUpdateTime": time.time(),
"text": "",
"attributes": [],
}
try:
event_data["text"], event_data["attributes"] = (
self.auto_decode_vcsa(
vcsaContent[4:], event_data["lines"], event_data["columns"]
)
)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"vcsaDriver create_screen_event_data: Error decoding VCSA content: "
+ str(e),
debug.DebugLevel.ERROR,
)
# VCSU seems to give b' ' instead of b'\x00\x00\x00' (tsp),
# deactivated until its fixed
if vcsu_content is not None:
try:
vcsu_content_as_text = vcsu_content.decode("UTF-32")
event_data["text"] = screen_utils.insert_newlines(
vcsu_content_as_text, event_data["columns"]
)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"vcsaDriver create_screen_event_data: Error decoding VCSU content: "
+ str(e),
debug.DebugLevel.ERROR,
)
return event_data.copy()
def update_char_map(self, screen):
"""Update character mapping for the specified screen.
Reads the Unicode font mapping from the TTY to properly decode
character data from VCSA. This handles special characters and
Unicode properly.
Args:
screen (str): TTY number to update mapping for
Updates:
self.charmap: Dictionary mapping byte values to Unicode characters
self.hichar: High character mask for extended characters
"""
self.charmap = {}
try:
with open("/dev/tty" + screen, "rb") as tty:
GIO_UNIMAP = 0x4B66
VT_GETHIFONTMASK = 0x560D
himask = array("H", (0,))
ioctl(tty, VT_GETHIFONTMASK, himask)
(self.hichar,) = unpack_from("@H", himask)
sz = 512
line = ""
while True:
try:
unipairs = array("H", [0] * (2 * sz))
unimapdesc = array(
"B", pack("@HP", sz, unipairs.buffer_info()[0])
)
ioctl(tty.fileno(), GIO_UNIMAP, unimapdesc)
break
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"VCSA:update_char_map:scaling up sz="
+ str(sz)
+ " "
+ str(e),
debug.DebugLevel.WARNING,
)
sz *= 2
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"VCSA:update_char_map:" + str(e), debug.DebugLevel.ERROR
)
return
(ncodes,) = unpack_from("@H", unimapdesc)
utable = unpack_from("@%dH" % (2 * ncodes), unipairs)
for u, b in zip(utable[::2], utable[1::2]):
if self.charmap.get(b) is None:
self.charmap[b] = chr(u)
def auto_decode_vcsa(self, allData, rows, cols):
"""Decode raw VCSA data into text and attributes.
Processes the character and attribute data from VCSA devices,
extracting both the text content and formatting information
(colors, bold, blink, etc.).
Args:
allData (bytes): Raw character and attribute data from VCSA
rows (int): Number of screen rows
cols (int): Number of screen columns
Returns:
tuple: (text_content, attributes)
- text_content (str): Decoded text with newlines
- attributes (list): List of attribute arrays for each character
Note:
Each character in VCSA is stored as 2 bytes: character + attribute.
Attributes encode foreground/background colors, bold, blink, etc.
"""
all_text = ""
all_attrib = []
i = 0
for y in range(rows):
line_text = ""
line_attrib = []
blink = 0
bold = 0
ink = 7
paper = 0
for x in range(cols):
data = allData[i : i + 2]
i += 2
if data == b" \x07":
# attr = 7
# ink = 7
# paper = 0
# ch = ' '
char_attrib = [
self.fgColorValues[7], # fg
self.bgColorValues[0], # bg
False, # bold
False, # italics
False, # underscore
False, # strikethrough
False, # reverse
False, # blink
"default", # fontsize
"default",
] # fontfamily
line_attrib.append(char_attrib)
line_text += " "
continue
ch = None
try:
(sh,) = unpack("=H", data)
attr = (sh >> 8) & 0xFF
ch = sh & 0xFF
try:
if sh & self.hichar:
ch |= 0x100
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"vcsaDriver auto_decode_vcsa: Error processing character: "
+ str(e),
debug.DebugLevel.ERROR,
)
ch = None
if self.hichar == 0x100:
attr >>= 1
ink = attr & 0x0F
paper = (attr >> 4) & 0x0F
if attr & 1:
blink = 1
# blink seems to be set always, ignore for now
blink = 0
bold = 0
if attr & 16:
bold = 1
# if (ink != 7) or (paper != 0):
# print(ink,paper)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"vcsaDriver auto_decode_vcsa: Error processing attributes: "
+ str(e),
debug.DebugLevel.ERROR,
)
try:
line_text += self.charmap[ch]
except KeyError:
line_text += "?"
char_attrib = [
self.fgColorValues[ink],
self.bgColorValues[paper],
bold == 1, # bold
False, # italics
False, # underscore
False, # strikethrough
False, # reverse
blink == 1, # blink
"default", # fontsize
"default",
] # fontfamily
line_attrib.append(char_attrib)
all_text += line_text
if y + 1 < rows:
all_text += "\n"
all_attrib.append(line_attrib)
return str(all_text), all_attrib
def get_screen_process_for_tty(self, tty_num):
"""Find the screen process associated with specific TTY"""
try:
result = subprocess.run([
'ps', '-eo', 'pid,ppid,comm,tty,stat', '--no-headers'
], capture_output=True, text=True, timeout=2)
for line in result.stdout.strip().split('\n'):
if not line.strip():
continue
parts = line.split()
if len(parts) >= 4 and parts[2] == 'screen' and f'tty{tty_num}' in parts[3]:
return parts[0] # Return PID of screen process
except (subprocess.TimeoutExpired, subprocess.CalledProcessError):
pass
return None
def get_screen_session_process(self, screen_tty_pid):
"""Get the session manager process for a TTY screen process"""
try:
result = subprocess.run([
'ps', '-eo', 'pid,ppid,comm', '--no-headers'
], capture_output=True, text=True, timeout=2)
for line in result.stdout.strip().split('\n'):
if not line.strip():
continue
parts = line.split()
if len(parts) >= 3 and parts[1] == screen_tty_pid and parts[2] == 'screen':
return parts[0] # Return session manager PID
except (subprocess.TimeoutExpired, subprocess.CalledProcessError):
pass
return None
def parse_active_app_from_pstree(self, pstree_output):
"""Parse pstree output to find currently active application"""
try:
# Look for processes that indicate active applications
# Example: screen(1786)---bash(1787)---irssi(2016)
import re
# Find all application processes (non-bash, non-screen)
# Pattern excludes --- prefix from pstree connection lines
app_pattern = r'([a-zA-Z0-9_]+)\((\d+)\)'
matches = re.findall(app_pattern, pstree_output)
skip_processes = {'screen', 'bash', 'sh', 'grep', 'ps', 'sudo', 'sleep', 'clipboard_sync'}
applications = []
for app_name, pid in matches:
if app_name.lower() not in skip_processes:
# Check if this process is in foreground state
try:
ps_result = subprocess.run([
'ps', '-p', pid, '-o', 'stat', '--no-headers'
], capture_output=True, text=True, timeout=1)
if ps_result.returncode == 0:
stat = ps_result.stdout.strip()
# Prioritize active processes
priority = 0
if 'S' in stat or 'R' in stat: # Active processes
priority += 10
if '+' in stat: # Foreground processes
priority += 20
if 'l' in stat.lower(): # Locked processes
priority += 5
applications.append((app_name, pid, priority, stat))
except:
# If we can't check status, still consider it with low priority
applications.append((app_name, pid, 1, 'unknown'))
# Sort by priority and return the highest priority application
if applications:
applications.sort(key=lambda x: (x[2], int(x[1])), reverse=True)
best_app = applications[0][0].upper()
self.env["runtime"]["DebugManager"].write_debug_out(
f"parse_active_app_from_pstree found {len(applications)} apps, selected: {best_app}",
debug.DebugLevel.INFO
)
return best_app
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
f"Error parsing pstree output: {str(e)}", debug.DebugLevel.ERROR
)
return None
def get_app_from_screen_session(self, tty_num):
"""Get current application from active screen window on specific TTY"""
try:
# Find the screen session PID for this TTY
ps_result = subprocess.run([
'ps', '-eo', 'pid,ppid,comm,tty,args', '--no-headers'
], capture_output=True, text=True, timeout=2)
if ps_result.returncode == 0:
# Find the main screen process for our TTY
screen_pid = None
for line in ps_result.stdout.split('\n'):
if not line.strip():
continue
parts = line.split(None, 4)
if len(parts) >= 4 and parts[2] == 'screen' and f'tty{tty_num}' in parts[3]:
screen_pid = parts[0]
break
if not screen_pid:
return None
# Get the session manager screen process (child of TTY screen)
session_pid = None
for line in ps_result.stdout.split('\n'):
if not line.strip():
continue
parts = line.split(None, 4)
if len(parts) >= 3 and parts[1] == screen_pid and parts[2] == 'screen':
session_pid = parts[0]
break
if not session_pid:
return None
self.env["runtime"]["DebugManager"].write_debug_out(
f"Found screen session PID: {session_pid}",
debug.DebugLevel.INFO
)
# Get all bash processes under this screen session
bash_processes = []
for line in ps_result.stdout.split('\n'):
if not line.strip():
continue
parts = line.split(None, 4)
if len(parts) >= 3 and parts[1] == session_pid and parts[2] == 'bash':
bash_processes.append(parts[0])
# Check each bash for child applications, prioritizing active ones
best_app = None
best_priority = 0
for bash_pid in bash_processes:
child_result = subprocess.run([
'ps', '--ppid', bash_pid, '-o', 'comm,stat', '--no-headers'
], capture_output=True, text=True, timeout=1)
if child_result.returncode == 0:
for child_line in child_result.stdout.split('\n'):
if not child_line.strip():
continue
child_parts = child_line.split()
if len(child_parts) >= 2:
child_comm, child_stat = child_parts[0], child_parts[1]
if child_comm.lower() not in ['grep', 'ps', 'bash', 'sh', 'sudo', 'sleep']:
# Calculate priority based on process state
priority = 0
if 'R' in child_stat: # Running
priority += 30
elif 'S' in child_stat: # Sleeping
priority += 20
if '+' in child_stat: # Foreground
priority += 20
self.env["runtime"]["DebugManager"].write_debug_out(
f"Found child process: {child_comm} ({child_stat}) priority: {priority}",
debug.DebugLevel.INFO
)
if priority > best_priority:
best_app = child_comm.upper()
best_priority = priority
if best_app:
self.env["runtime"]["DebugManager"].write_debug_out(
f"Selected best app: {best_app} (priority: {best_priority})",
debug.DebugLevel.INFO
)
return best_app
except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError) as e:
self.env["runtime"]["DebugManager"].write_debug_out(
f"Error getting app from screen session: {str(e)}", debug.DebugLevel.ERROR
)
return None
def parse_latest_active_app_from_pstree(self, pstree_output):
"""Parse pstree output to find the most recently active application"""
try:
import re
# Find all application processes with PIDs
app_pattern = r'([a-zA-Z0-9_]+)\((\d+)\)'
matches = re.findall(app_pattern, pstree_output)
skip_processes = {'screen', 'bash', 'sh', 'grep', 'ps', 'sudo', 'sleep', 'clipboard_sync'}
applications = []
for app_name, pid in matches:
if app_name.lower() not in skip_processes:
# Check if this process is still active and get its start time
try:
ps_result = subprocess.run([
'ps', '-p', pid, '-o', 'stat,lstart', '--no-headers'
], capture_output=True, text=True, timeout=1)
if ps_result.returncode == 0:
stat_info = ps_result.stdout.strip().split(None, 1)
if len(stat_info) >= 2:
stat = stat_info[0]
start_time = stat_info[1]
# Prioritize processes that are active or have recent activity
priority = 0
if 'S' in stat or 'R' in stat: # Running or sleeping (active)
priority += 10
if '+' in stat: # Foreground process
priority += 20
applications.append((app_name, pid, priority, start_time))
except:
# If we can't get status, still consider it but with low priority
applications.append((app_name, pid, 1, 'unknown'))
# Sort by priority (highest first), then by PID (most recent)
if applications:
applications.sort(key=lambda x: (x[2], int(x[1])), reverse=True)
best_app = applications[0][0].upper()
self.env["runtime"]["DebugManager"].write_debug_out(
f"Found {len(applications)} applications, selected: {best_app}",
debug.DebugLevel.INFO
)
return best_app
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
f"Error parsing latest active app from pstree: {str(e)}",
debug.DebugLevel.ERROR
)
return None
def get_app_from_tmux_session(self, tty_num):
"""Get current application from tmux session on specific TTY"""
try:
# Try tmux list-panes to find active application
result = subprocess.run([
'tmux', 'list-panes', '-F', '#{pane_active} #{pane_current_command} #{pane_tty}'
], capture_output=True, text=True, timeout=2)
if result.returncode == 0:
for line in result.stdout.strip().split('\n'):
if not line.strip():
continue
parts = line.split()
if len(parts) >= 3 and parts[0] == '1': # Active pane
tty_part = parts[2]
if tty_num in tty_part:
app = parts[1].upper()
if app not in ['BASH', 'SH']:
self.env["runtime"]["DebugManager"].write_debug_out(
f"Found tmux application: {app}",
debug.DebugLevel.INFO
)
return app
except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError):
pass
return None
def get_curr_application(self):
"""Enhanced application detection supporting screen/tmux sessions.
Multi-method approach:
1. Try screen session detection via process tree analysis
2. Try tmux session detection via tmux commands
3. Fall back to standard ps-based detection
Updates:
env['screen']['new_application']: Name of current application
Features:
- Detects applications inside screen/tmux sessions
- Handles multiple screen sessions on different TTYs
- Provides detailed debug logging for troubleshooting
"""
curr_screen = self.env["screen"]["newTTY"]
detected_app = None
self.env["runtime"]["DebugManager"].write_debug_out(
f"Starting application detection for TTY{curr_screen}",
debug.DebugLevel.INFO
)
# Method 1: Try screen session detection
try:
detected_app = self.get_app_from_screen_session(curr_screen)
if detected_app:
self.env["runtime"]["DebugManager"].write_debug_out(
f"Screen session detection found: {detected_app}",
debug.DebugLevel.INFO
)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
f"Screen session detection failed: {str(e)}", debug.DebugLevel.ERROR
)
# Method 2: Try tmux session detection
if not detected_app:
try:
detected_app = self.get_app_from_tmux_session(curr_screen)
if detected_app:
self.env["runtime"]["DebugManager"].write_debug_out(
f"Tmux session detection found: {detected_app}",
debug.DebugLevel.INFO
)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
f"Tmux session detection failed: {str(e)}", debug.DebugLevel.ERROR
)
# Method 3: Fall back to standard ps-based detection
if not detected_app:
try:
detected_app = self.get_app_via_standard_ps(curr_screen)
if detected_app:
self.env["runtime"]["DebugManager"].write_debug_out(
f"Standard ps detection found: {detected_app}",
debug.DebugLevel.INFO
)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
f"Standard ps detection failed: {str(e)}", debug.DebugLevel.ERROR
)
# Update application if we found one and it's different
if detected_app and self.env["screen"]["new_application"] != detected_app:
self.env["screen"]["new_application"] = detected_app
self.env["runtime"]["DebugManager"].write_debug_out(
f"Application changed to: {detected_app}",
debug.DebugLevel.INFO
)
def get_app_via_standard_ps(self, curr_screen):
"""Fallback ps-based application detection for non-screen/tmux environments"""
try:
# Simple TTY-specific detection as fallback
result = subprocess.run([
'ps', '-t', f'tty{curr_screen}', '-o', 'comm,stat', '--no-headers'
], capture_output=True, text=True, timeout=2)
if result.returncode == 0:
for line in result.stdout.split('\n'):
if not line.strip():
continue
parts = line.split()
if len(parts) >= 2:
comm, stat = parts[0], parts[1]
if '+' in stat and comm.upper() not in ['BASH', 'SH', 'SCREEN', 'GREP', 'PS']:
self.env["runtime"]["DebugManager"].write_debug_out(
f"Fallback detection found: {comm}",
debug.DebugLevel.INFO
)
return comm.upper()
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
f"Standard ps detection error: {str(e)}", debug.DebugLevel.ERROR
)
return None