Files
fenrir/src/fenrirscreenreader/commands/onScreenUpdate/65000-progress_detector.py
2025-08-31 14:54:07 -04:00

462 lines
19 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
import re
import subprocess
import time

from fenrirscreenreader.core import debug
from fenrirscreenreader.core.i18n import _
class command:
    """Screen-update command that turns textual progress output into tones.

    When ``env["commandBuffer"]["progressMonitoring"]`` is truthy, each call
    to :meth:`run` inspects ``env["screen"]["new_delta"]`` for progress
    patterns (percentages, fractions, bars, spinners, transfer statistics)
    and plays either a pitch-mapped tone (known percentage) or a fixed
    activity beep (progress without a known percentage).

    State shared through ``env["commandBuffer"]``:

    * ``lastProgressValue`` - last percentage announced (``-1`` after reset)
    * ``lastProgressTime`` - ``time.time()`` of the last announcement
      (``0`` after reset)
    """

    # Minimum seconds between activity beeps for transfer/"Loading..." output.
    _ACTIVITY_BEEP_INTERVAL = 2.0
    # Minimum seconds between activity beeps for spinner-style indicators.
    _SPINNER_BEEP_INTERVAL = 1.0
    # Flood protection: never start two tones closer together than this.
    _MIN_BEEP_GAP = 0.1
    # Deltas longer than this are treated as a screen redraw, not progress.
    _MAX_DELTA_LENGTH = 200
    # Cursor jumps larger than these are treated as navigation.
    _MAX_CURSOR_ROWS = 2
    _MAX_CURSOR_COLUMNS = 20

    # Pattern 1: percentage (50%, 25.5%, ...).
    _PERCENT_RE = re.compile(r"(\d+(?:\.\d+)?)\s*%")
    # Words marking a percentage as weather/system statistics.
    _NON_PROGRESS_PERCENT_RE = re.compile(
        r"\b(?:humidity|cpu|memory|disk|usage|temp|weather|forecast)\b",
        re.IGNORECASE,
    )
    # Pattern 1a2: curl's classic table row (percentage without a % sign).
    _CURL_CLASSIC_RE = re.compile(
        r"^\s*(\d+)\s+\d+[kMGT]?\s+(\d+)\s+\d+[kMGT]?"
        r"\s+\d+\s+\d+\s+\d+[kMGT]?\s+\d+"
        r"\s+\d+:\d+:\d+\s+\d+:\d+:\d+\s+\d+:\d+:\d+"
        r"\s+\d+[kMGT]?\s*$"
    )
    # Patterns 1b-1f: progress without a usable percentage.
    _TRANSFER_RES = (
        # 1b: remaining/elapsed seconds
        re.compile(
            r"(?:(?:remaining|elapsed|left|ETA|eta)[:;\s]*(\d+)s"
            r"|(\d+)s\s+(?:remaining|elapsed|left))",
            re.IGNORECASE,
        ),
        # 1b: token counters
        re.compile(
            r"(?:processing|generating|used|consumed)\s+(\d+)\s+tokens",
            re.IGNORECASE,
        ),
        # 1c: dd output (bytes copied with transfer rate)
        re.compile(r"\d+\s+bytes.*copied.*\d+\s+s.*[kMGT]?B/s"),
        # 1d: curl-style transfer data (legacy)
        re.compile(
            r"(\d+\s+\d+\s+\d+\s+\d+.*?(?:k|M|G)?.*?--:--:--|Speed)"
        ),
        # 1e: general transfer progress (size, rate, time)
        re.compile(
            r"\d+\s+\d+[kMGT]?\s+\d+\s+\d+[kMGT]?"
            r".*?\d+\.\d+[kMGT].*?\d+:\d+:\d+"
        ),
        # 1f: pacman-style transfer progress
        re.compile(
            r"\d+(?:\.\d+)?\s+[kKmMgGtT]iB"
            r"\s+\d+(?:\.\d+)?\s+[kKmMgGtT]iB/s\s+\d+:\d+"
        ),
    )
    # Pattern 2: fraction (15/100, 3 of 10, ...).
    _FRACTION_RE = re.compile(r"(\d+)\s*(?:of|/)\s*(\d+)")
    _FRACTION_NOISE_RE = re.compile(
        r"\b(?:page|chapter|section|line|row|column|year|month|day)\b",
        re.IGNORECASE,
    )
    _DATE_RE = re.compile(r"\d{1,2}/\d{1,2}/\d{2,4}")
    # Pattern 3: progress bars ([####  ], [====>  ], ...). The optional ">"
    # is the arrow head some tools draw at the end of the filled part; it is
    # counted as filled. "[#channel]" cannot match because letters are not
    # allowed between the brackets.
    _BAR_RE = re.compile(r"\[([#=\*]+>?)([\s\.\-]*)\]")
    # The unfilled part of a bar must be whitespace/dots only.
    _BAR_UNFILLED_RE = re.compile(r"[\s\.]*")
    # Pattern 4: generic activity indicators (Loading..., Working..., ...).
    _ACTIVITY_RE = re.compile(
        r"(loading|processing|working|installing|downloading|compiling"
        r"|building).*\.{2,}",
        re.IGNORECASE,
    )
    # Pattern 5: braille spinner characters.
    _BRAILLE_RE = re.compile(r"[⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏⡿⣟⣯⣷⣾⣽⣻⢿]")
    # Pattern 6: Claude Code status line.
    _CLAUDE_RE = re.compile(
        r"^[·✶✢✻*]\s+[^(]+[…\.]*\s*\(esc to interrupt[^)]*\)\s*$"
    )
    # Pattern 7: moon phase indicators and the percentage each one maps to.
    _MOON_RE = re.compile(r"[🌑🌒🌓🌔🌕🌖🌗🌘]")
    _MOON_PHASES = {
        "🌑": 0,
        "🌒": 12.5,
        "🌓": 25,
        "🌔": 37.5,
        "🌕": 50,
        "🌖": 62.5,
        "🌗": 75,
        "🌘": 87.5,
    }

    # A line carrying one of these is a shell prompt, possibly followed by
    # a typed command.
    _PROMPT_MARKER_RES = (
        re.compile(r".*@.*[\\\$#>]"),
        re.compile(r"^\[.*\]\s*[\\\$#>]"),
    )
    # Standalone prompt patterns (no commands mixed in).
    _STANDALONE_PROMPT_RES = tuple(
        re.compile(pattern)
        for pattern in (
            r"^\s*\$\s*$",  # Just $ (with whitespace)
            r"^\s*#\s*$",  # Just # (with whitespace)
            r"^\s*>\s*$",  # Just > (with whitespace)
            r"^\[.*\]\s*[\\\$#>]\s*$",  # [path]$ without commands
            r"^[a-zA-Z0-9._-]+[\\\$#>]\s*$",  # bash-5.1$ without commands
            # Interactive prompts
            r".*\?\s*\[[YyNn]/[YyNn]\]\s*$",  # ? [Y/n] or ? [y/N] style
            r".*\?\s*\[[Yy]es/[Nn]o\]\s*$",  # ? [Yes/No] style
            r".*continue\?\s*\[[YyNn]/[YyNn]\].*$",  # continue? [Y/n]
            r"^::.*\?\s*\[[YyNn]/[YyNn]\].*$",  # pacman style prompts
            # Authentication prompts
            r"^\[[Ss]udo\]\s*[Pp]assword\s*for\s+.*:\s*$",  # [sudo] password
            r"^[Pp]assword\s*:\s*$",  # Password:
            r".*[Pp]assword\s*:\s*$",  # general password prompts
            # Continuation prompts
            r"^[Pp]ress\s+any\s+key\s+to\s+continue.*$",  # Press any key
            r"^[Aa]re\s+you\s+sure\?\s*.*$",  # Are you sure?
        )
    )

    # Only match actual URLs, not filenames.
    _URL_RES = (
        # Any protocol:// with domain.ext
        re.compile(r"\S+://\S+\.\S{2,}", re.IGNORECASE),
        # www.domain.ext patterns
        re.compile(r"www\.[^\s]+\.[a-zA-Z]{2,}", re.IGNORECASE),
    )

    def __init__(self):
        pass

    def initialize(self, environment):
        """Store the environment mapping that every other method reads."""
        self.env = environment

    def shutdown(self):
        pass

    def get_description(self):
        """Return the fixed description string of this command."""
        return "Detects progress patterns for progress bar monitoring"

    def run(self):
        """Inspect the latest screen delta if progress monitoring is on.

        A standalone prompt on screen resets the progress state; otherwise
        a non-empty delta that passes :meth:`is_real_progress_update` is
        handed to :meth:`detect_progress`. This method never raises:
        failures are reported to the debug log instead.
        """
        try:
            buffer = self.env["commandBuffer"]
            if not (
                "progressMonitoring" in buffer
                and buffer["progressMonitoring"]
            ):
                return
            # A prompt means the previous operation ended: start fresh.
            if self.is_current_line_prompt():
                self.reset_progress_state()
                return
            # Only check new incoming text, filtering out screen changes.
            delta = self.env["screen"]["new_delta"]
            if delta and self.is_real_progress_update():
                self.detect_progress(delta)
        except Exception as error:
            # Best effort: a detector failure must not disrupt normal
            # operation, but it should at least be visible when debugging.
            try:
                self._log(
                    "Progress detector error: " + str(error),
                    debug.DebugLevel.ERROR,
                )
            except Exception:
                # Logging itself is unavailable; nothing more can be done.
                pass

    def is_real_progress_update(self):
        """Check if this is a real progress update vs screen change.

        Returns:
            False when the screen changed, the cursor jumped far during a
            vertical move, the delta is longer than ``_MAX_DELTA_LENGTH``
            characters, or a standalone prompt is showing; True otherwise.
        """
        runtime = self.env["runtime"]
        screen = self.env["screen"]
        # If the screen/application changed, it's not a progress update.
        if runtime["ScreenManager"].is_screen_change():
            return False
        # Large cursor movements suggest navigation, not progress output.
        if runtime["CursorManager"].is_cursor_vertical_move():
            x_move = abs(screen["new_cursor"]["x"] - screen["old_cursor"]["x"])
            y_move = abs(screen["new_cursor"]["y"] - screen["old_cursor"]["y"])
            if (
                y_move > self._MAX_CURSOR_ROWS
                or x_move > self._MAX_CURSOR_COLUMNS
            ):
                return False
        # Big deltas are screen changes; the limit still allows longer
        # progress lines like Claude Code's status.
        if len(screen["new_delta"]) > self._MAX_DELTA_LENGTH:
            return False
        # Progress is unlikely while a prompt is showing.
        return not self.is_current_line_prompt()

    def reset_progress_state(self):
        """Reset progress state so the next operation starts fresh."""
        self._log("Resetting progress state due to prompt detection")
        self.env["commandBuffer"]["lastProgressValue"] = -1
        self.env["commandBuffer"]["lastProgressTime"] = 0

    def detect_progress(self, text):
        """Find a progress indicator in *text* and play the matching cue.

        Patterns are tried in a fixed order and the first one that applies
        wins. Percentage-like patterns play a pitch-mapped tone whenever
        the value changes; activity-like patterns play a fixed beep,
        throttled by a per-pattern interval. Text containing a URL is
        ignored entirely to prevent false positives.

        Args:
            text: newly arrived screen text.
        """
        now = time.time()
        self._log("Progress detector checking: '" + text + "'")

        if self.contains_url(text):
            self._log("Skipping progress detection - text contains URL")
            return

        # Note: there is no auto-disable on 100% completion, to respect
        # user settings.

        # Pattern 1: explicit percentage.
        percentage = self._match_percentage(text)
        if percentage is not None:
            self._log("found percentage: " + str(percentage))
            self._report_percentage(percentage, now, "Playing tone for: ")
            return

        # Pattern 1a2: curl classic table row.
        percentage = self._match_curl_classic(text)
        if percentage is not None:
            self._log("found curl classic percentage: " + str(percentage))
            self._report_percentage(
                percentage, now, "Playing tone for curl: "
            )
            return

        # Patterns 1b-1f: transfer statistics without a percentage.
        if self._is_transfer_activity(text):
            self._report_activity(
                now,
                self._ACTIVITY_BEEP_INTERVAL,
                "Playing activity beep for transfer progress",
            )
            return

        # Pattern 2: fraction.
        percentage = self._match_fraction(text)
        if percentage is not None:
            self._report_percentage(percentage, now)
            return

        # Pattern 3: progress bar.
        percentage = self._match_progress_bar(text)
        if percentage is not None:
            self._report_percentage(percentage, now)
            return

        # Pattern 4: generic activity indicators.
        if self._ACTIVITY_RE.search(text):
            self._report_activity(now, self._ACTIVITY_BEEP_INTERVAL)
            return

        # Pattern 5: braille spinner / Pattern 6: Claude Code status line.
        if self._BRAILLE_RE.search(text) or self._CLAUDE_RE.search(text):
            self._report_activity(now, self._SPINNER_BEEP_INTERVAL)
            return

        # Pattern 7: moon phase indicators.
        percentage = self._match_moon_phase(text)
        if percentage is not None:
            self._report_percentage(percentage, now)

    @classmethod
    def _match_percentage(cls, text):
        """Return the first ``N%`` in *text* as a float, or None.

        Values outside 0-100 and lines mentioning weather/system-statistics
        words are rejected.
        """
        match = cls._PERCENT_RE.search(text)
        if not match:
            return None
        value = float(match.group(1))
        if not 0 <= value <= 100:
            return None
        if cls._NON_PROGRESS_PERCENT_RE.search(text):
            return None
        return value

    @classmethod
    def _match_curl_classic(cls, text):
        """Return the total percentage of a curl classic row, or None."""
        match = cls._CURL_CLASSIC_RE.search(text)
        if not match:
            return None
        # The first column is the total progress.
        value = float(match.group(1))
        return value if 0 <= value <= 100 else None

    @classmethod
    def _is_transfer_activity(cls, text):
        """Return True if *text* matches any time/token/transfer pattern."""
        return any(pattern.search(text) for pattern in cls._TRANSFER_RES)

    @classmethod
    def _match_fraction(cls, text):
        """Return ``current/total`` in *text* as a percentage, or None.

        Rejected: totals outside 1-1000, ``current > total``, lines that
        mention page/chapter/date-like words, and ``d/m/y`` style dates.
        """
        match = cls._FRACTION_RE.search(text)
        if not match:
            return None
        current = int(match.group(1))
        total = int(match.group(2))
        if not 0 < total <= 1000 or current > total:
            return None
        if cls._FRACTION_NOISE_RE.search(text) or cls._DATE_RE.search(text):
            return None
        return (current / total) * 100

    @classmethod
    def _match_progress_bar(cls, text):
        """Return the fill level of a ``[###   ]`` style bar, or None.

        The bar needs at least two characters between the brackets and its
        unfilled part may only consist of whitespace and dots.
        """
        match = cls._BAR_RE.search(text)
        if not match:
            return None
        filled = len(match.group(1))
        total = filled + len(match.group(2))
        if total < 2 or not cls._BAR_UNFILLED_RE.fullmatch(match.group(2)):
            return None
        return (filled / total) * 100

    @classmethod
    def _match_moon_phase(cls, text):
        """Return the percentage mapped to the first moon emoji, or None."""
        match = cls._MOON_RE.search(text)
        return cls._MOON_PHASES[match.group(0)] if match else None

    @staticmethod
    def _tone_frequency(percentage):
        """Map 0-100% linearly onto 400-1200 Hz, clamped to that range."""
        return max(400, min(1200, 400 + (percentage * 8)))

    def _progress_state(self, key, default):
        """Read *key* from the command buffer, or *default* if unset."""
        buffer = self.env["commandBuffer"]
        return buffer[key] if key in buffer else default

    def _report_percentage(self, percentage, now, message=None):
        """Play a tone for *percentage* unless it was already announced.

        On announcement both ``lastProgressValue`` and ``lastProgressTime``
        are updated. *message*, when given, is written to the debug log
        with the percentage appended.
        """
        if percentage == self._progress_state("lastProgressValue", -1):
            return
        if message is not None:
            self._log(message + str(percentage))
        self.play_progress_tone(percentage)
        self.env["commandBuffer"]["lastProgressValue"] = percentage
        self.env["commandBuffer"]["lastProgressTime"] = now

    def _report_activity(self, now, min_interval, message=None):
        """Play the activity beep at most once per *min_interval* seconds."""
        if now - self._progress_state("lastProgressTime", 0) < min_interval:
            return
        if message is not None:
            self._log(message)
        self.play_activity_beep()
        self.env["commandBuffer"]["lastProgressTime"] = now

    def _log(self, message, level=None):
        """Write *message* to the debug manager (INFO level by default)."""
        # The default is resolved here rather than in the signature so the
        # class body itself does not depend on the debug module.
        if level is None:
            level = debug.DebugLevel.INFO
        self.env["runtime"]["DebugManager"].write_debug_out(message, level)

    def play_progress_tone(self, percentage):
        """Play a short tone whose pitch rises with *percentage*."""
        self.play_quiet_tone(self._tone_frequency(percentage), 0.1)

    def play_activity_beep(self):
        """Play the single fixed tone used for generic activity."""
        self.play_quiet_tone(800, 0.08)

    def play_quiet_tone(self, frequency, duration):
        """Play a quiet tone using Sox directly with flood protection.

        Runs ``play -qn synth <duration> tri <frequency> gain -8`` without
        waiting for it to finish, and only when the ``sound``/``enabled``
        setting is true. Requests arriving less than ``_MIN_BEEP_GAP``
        seconds after the previous one are dropped to prevent audio
        crackling on low-resource systems. Launch failures are logged,
        not raised.
        """
        now = time.time()
        if now - getattr(self, "_last_beep_time", 0) < self._MIN_BEEP_GAP:
            return
        self._last_beep_time = now

        argv = [
            "play", "-qn", "synth", str(duration),
            "tri", str(frequency), "gain", "-8",
        ]
        try:
            settings = self.env["runtime"]["SettingsManager"]
            if settings.get_setting_as_bool("sound", "enabled"):
                subprocess.Popen(
                    argv,
                    stdin=None,
                    stdout=None,
                    stderr=None,
                    shell=False,
                )
        except Exception as error:
            self._log("Sox tone error: " + str(error), debug.DebugLevel.ERROR)

    def is_current_line_prompt(self):
        """Check if the screen shows a standalone prompt.

        The last screen line and, if different, the cursor line are tested.
        A prompt followed by a typed command does not count.

        Returns:
            True if one of those lines is a standalone prompt; False
            otherwise, including when the screen data cannot be read.
        """
        try:
            content = self.env["screen"]["new_content_text"]
            if not content:
                return False
            lines = content.split("\n")
            cursor_row = self.env["screen"]["new_cursor"]["y"]
            # The last line is the most common place for a prompt.
            candidates = [lines[-1]]
            if cursor_row < len(lines) and cursor_row != len(lines) - 1:
                candidates.append(lines[cursor_row])
            return any(
                self._is_standalone_prompt(line.strip())
                for line in candidates
            )
        except Exception:
            # If anything fails, assume it's not a prompt to be safe.
            return False

    @classmethod
    def _is_standalone_prompt(cls, line):
        """Return True if the stripped *line* is a prompt and nothing else."""
        if not line:
            return False
        if any(marker.search(line) for marker in cls._PROMPT_MARKER_RES):
            # A prompt marker followed by significant content is a command
            # line, not a standalone prompt.
            prompt_end = max(line.rfind(char) for char in "$#>\\")
            if 0 <= prompt_end < len(line) - 5:
                return False
        return any(
            pattern.search(line) for pattern in cls._STANDALONE_PROMPT_RES
        )

    def contains_url(self, text):
        """Check if text contains a URL that might look like progress."""
        return any(pattern.search(text) for pattern in self._URL_RES)

    def set_callback(self, callback):
        pass