#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.

import re
import subprocess
import time

from fenrirscreenreader.core import debug
from fenrirscreenreader.core.i18n import _

# Defaults for the shared progress state. They are the same values that
# reset_progress_state() writes, so "missing" and "freshly reset" behave
# identically.
_NO_PROGRESS_VALUE = -1
_NO_PROGRESS_TIME = 0

# Pattern 1: percentage (50%, 25.5%, etc.).
_PERCENT_RE = re.compile(r"(\d+(?:\.\d+)?)\s*%")
# Words that mark a percentage as a weather/system statistic rather than
# progress.
_PERCENT_NOISE_RE = re.compile(
    r"\b(?:humidity|cpu|memory|disk|usage|temp|weather|forecast)\b",
    re.IGNORECASE,
)

# Pattern 1a2: curl classic progress format (percentage without % symbol).
_CURL_CLASSIC_RE = re.compile(
    r"^\s*(\d+)\s+\d+[kMGT]?\s+(\d+)\s+\d+[kMGT]?\s+\d+\s+\d+\s+\d+[kMGT]?"
    r"\s+\d+\s+\d+:\d+:\d+\s+\d+:\d+:\d+\s+\d+:\d+:\d+\s+\d+[kMGT]?\s*$"
)

# Patterns 1b-1f: progress output that carries no usable percentage.
_TRANSFER_ACTIVITY_RES = (
    # 1b: time remaining / elapsed
    re.compile(
        r"(?:(?:remaining|elapsed|left|ETA|eta)[:;\s]*(\d+)s"
        r"|(\d+)s\s+(?:remaining|elapsed|left))",
        re.IGNORECASE,
    ),
    # 1b: token counters
    re.compile(
        r"(?:processing|generating|used|consumed)\s+(\d+)\s+tokens",
        re.IGNORECASE,
    ),
    # 1c: dd command output (bytes copied with transfer rate)
    re.compile(r"\d+\s+bytes.*copied.*\d+\s+s.*[kMGT]?B/s"),
    # 1d: curl-style transfer data (bytes, speed indicators - legacy)
    re.compile(r"(\d+\s+\d+\s+\d+\s+\d+.*?(?:k|M|G)?.*?--:--:--|Speed)"),
    # 1e: general transfer progress (size, rate, time patterns)
    re.compile(
        r"\d+\s+\d+[kMGT]?\s+\d+\s+\d+[kMGT]?.*?\d+\.\d+[kMGT].*?\d+:\d+:\d+"
    ),
    # 1f: pacman-style transfer progress (flexible size/speed/time)
    re.compile(
        r"\d+(?:\.\d+)?\s+[kKmMgGtT]iB\s+\d+(?:\.\d+)?\s+[kKmMgGtT]iB/s"
        r"\s+\d+:\d+"
    ),
)

# Pattern 2: fraction (15/100, 3 of 10, etc.).
_FRACTION_RE = re.compile(r"(\d+)\s*(?:of|/)\s*(\d+)")
_FRACTION_NOISE_RE = re.compile(
    r"\b(?:page|chapter|section|line|row|column|year|month|day)\b",
    re.IGNORECASE,
)
_DATE_RE = re.compile(r"\d{1,2}/\d{1,2}/\d{2,4}")

# Pattern 3: progress bars ([####  ], [====>  ], etc.). The filled run may
# end in a single ">" arrow head; requiring "]" right after the bar
# characters keeps IRC channels like [#channel] from matching.
_BAR_RE = re.compile(r"\[([#=\*]+>?)([\s\.\-]*)\]")
_BAR_UNFILLED_RE = re.compile(r"^[\s\.]*$")

# Pattern 4: generic activity indicators (Loading..., Working..., etc.).
_ACTIVITY_RE = re.compile(
    r"(loading|processing|working|installing|downloading|compiling|building)"
    r".*\.{2,}",
    re.IGNORECASE,
)

# Pattern 5: braille spinner characters.
_BRAILLE_RE = re.compile(r'[⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏⡿⣟⣯⣷⣾⣽⣻⢿]')

# Pattern 6: Claude Code progress indicators.
_CLAUDE_RE = re.compile(r'^[·✶✢*]\s+\w+[…\.]*\s*\(esc to interrupt\)\s*$')

# Pattern 7: moon phase progress indicators and the percentage each maps to.
_MOON_RE = re.compile(r'[🌑🌒🌓🌔🌕🌖🌗🌘]')
_MOON_PHASES = {
    '🌑': 0,
    '🌒': 12.5,
    '🌓': 25,
    '🌔': 37.5,
    '🌕': 50,
    '🌖': 62.5,
    '🌗': 75,
    '🌘': 87.5,
}

# A line that carries a prompt marker; used to tell "prompt followed by a
# typed command" apart from a bare prompt.
_PROMPT_MARKER_RES = (
    re.compile(r".*@.*[\\\$#>]"),
    re.compile(r"^\[.*\]\s*[\\\$#>]"),
)

# Standalone prompt patterns (no commands mixed in).
_STANDALONE_PROMPT_RES = tuple(
    re.compile(pattern)
    for pattern in (
        r"^\s*\$\s*$",  # Just $ (with whitespace)
        r"^\s*#\s*$",  # Just # (with whitespace)
        r"^\s*>\s*$",  # Just > (with whitespace)
        r"^\[.*\]\s*[\\\$#>]\s*$",  # [path]$ without commands
        r"^[a-zA-Z0-9._-]+[\\\$#>]\s*$",  # bash-5.1$ without commands
        # Interactive prompt patterns (these ARE standalone)
        r".*\?\s*\[[YyNn]/[YyNn]\]\s*$",  # ? [Y/n] or ? [y/N] style
        r".*\?\s*\[[Yy]es/[Nn]o\]\s*$",  # ? [Yes/No] style
        r".*continue\?\s*\[[YyNn]/[YyNn]\].*$",  # "continue? [Y/n]" style
        r"^::.*\?\s*\[[YyNn]/[YyNn]\].*$",  # pacman style prompts
        # Authentication prompts (these ARE standalone)
        r"^\[[Ss]udo\]\s*[Pp]assword\s*for\s+.*:\s*$",  # [sudo] password
        r"^[Pp]assword\s*:\s*$",  # Password:
        r".*[Pp]assword\s*:\s*$",  # general password prompts
        # Continuation prompts (these ARE standalone)
        r"^[Pp]ress\s+any\s+key\s+to\s+continue.*$",  # Press any key
        r"^[Aa]re\s+you\s+sure\?\s*.*$",  # Are you sure?
    )
)

# URL patterns - only match actual URLs, not filenames.
_URL_RES = (
    re.compile(r"\S+://\S+\.\S{2,}", re.IGNORECASE),  # protocol://domain.ext
    re.compile(r"www\.[^\s]+\.[a-zA-Z]{2,}", re.IGNORECASE),  # www.domain.ext
)


class command:
    """Detect progress patterns in new screen text and signal them by tone.

    Percentage-like output (percentages, fractions, bars, moon phases) is
    mapped to a tone whose pitch rises with the percentage. Output that only
    shows activity (transfer statistics, spinners, "Loading...") produces a
    throttled fixed-pitch beep. State shared between updates lives in
    ``env["commandBuffer"]`` under ``lastProgressValue`` and
    ``lastProgressTime``.
    """

    def __init__(self):
        pass

    def initialize(self, environment):
        """Store the environment dictionary used by every other method."""
        self.env = environment

    def shutdown(self):
        pass

    def get_description(self):
        return "Detects progress patterns for progress bar monitoring"

    def run(self):
        """Inspect the latest screen update if progress monitoring is on.

        A detected prompt resets the progress state; otherwise new delta
        text that passes is_real_progress_update() is handed to
        detect_progress(). Errors never propagate to the caller.
        """
        try:
            # Only run if progress monitoring is enabled
            if not self.env["commandBuffer"].get("progressMonitoring"):
                return

            # A prompt means the previous operation finished: start fresh.
            if self.is_current_line_prompt():
                self.reset_progress_state()
                return

            # Only check new incoming text (new_delta), but filter out
            # screen changes
            new_delta = self.env["screen"]["new_delta"]
            if new_delta and self.is_real_progress_update():
                self.detect_progress(new_delta)
        except Exception as e:
            # Best effort: a detection failure must not disrupt normal
            # operation, but it should still leave a trace in the debug log.
            try:
                self._debug(
                    "progress_detector run: " + str(e),
                    debug.DebugLevel.ERROR,
                )
            except Exception:
                # Logging itself failed; there is nothing left to report to.
                pass

    def is_real_progress_update(self):
        """Check if this is a real progress update vs screen change/window switch"""
        # If the screen/application changed, it's not a progress update
        if self.env["runtime"]["ScreenManager"].is_screen_change():
            return False

        # If there was a large cursor movement, it's likely navigation, not
        # progress
        if self.env["runtime"]["CursorManager"].is_cursor_vertical_move():
            new_cursor = self.env["screen"]["new_cursor"]
            old_cursor = self.env["screen"]["old_cursor"]
            x_move = abs(new_cursor["x"] - old_cursor["x"])
            y_move = abs(new_cursor["y"] - old_cursor["y"])
            # Large movements suggest navigation, not progress output
            if y_move > 2 or x_move > 20:
                return False

        # Check if delta is too large (screen change) vs small incremental
        # updates. 200 allows longer progress lines like Claude Code's status.
        if len(self.env["screen"]["new_delta"]) > 200:
            return False

        # Progress is unlikely while the current line looks like a prompt
        return not self.is_current_line_prompt()

    def reset_progress_state(self):
        """Reset progress state when a prompt is detected, allowing new progress operations to start fresh"""
        self._debug("Resetting progress state due to prompt detection")
        self.env["commandBuffer"]["lastProgressValue"] = _NO_PROGRESS_VALUE
        self.env["commandBuffer"]["lastProgressTime"] = _NO_PROGRESS_TIME

    def detect_progress(self, text):
        """Match ``text`` against the known progress formats and signal it.

        Patterns are tried in a fixed order and the first one that accepts
        the text wins. Text containing a URL is ignored entirely.

        Note: there is deliberately no auto-disable on 100% completion, to
        respect user settings.
        """
        current_time = time.time()

        # Debug: Print what we're checking
        self._debug("Progress detector checking: '" + text + "'")

        # Filter out URLs to prevent false positives
        if self.contains_url(text):
            self._debug("Skipping progress detection - text contains URL")
            return

        # Pattern 1: Percentage (50%, 25.5%, etc.)
        percent_match = _PERCENT_RE.search(text)
        if percent_match:
            percentage = float(percent_match.group(1))
            # Only realistic progress percentages (0-100%) that are not
            # weather/system stats; anything else falls through to the
            # remaining patterns.
            if 0 <= percentage <= 100 and not _PERCENT_NOISE_RE.search(text):
                self._debug("found percentage: " + str(percentage))
                self._announce_percentage(
                    percentage, current_time, "Playing tone for: "
                )
                return

        # Pattern 1a2: Curl classic progress format (percentage without %
        # symbol). The first column is the total progress.
        curl_classic_match = _CURL_CLASSIC_RE.search(text)
        if curl_classic_match:
            percentage = float(curl_classic_match.group(1))
            if 0 <= percentage <= 100:
                self._debug(
                    "found curl classic percentage: " + str(percentage)
                )
                self._announce_percentage(
                    percentage, current_time, "Playing tone for curl: "
                )
                return

        # Patterns 1b-1f: not percentage-based, so use a single activity
        # beep at most every 2 seconds
        if any(regex.search(text) for regex in _TRANSFER_ACTIVITY_RES):
            self._activity_beep_if_due(
                current_time,
                2.0,
                "Playing activity beep for transfer progress",
            )
            return

        # Pattern 2: Fraction (15/100, 3 of 10, etc.)
        fraction_match = _FRACTION_RE.search(text)
        if fraction_match:
            current = int(fraction_match.group(1))
            total = int(fraction_match.group(2))
            # Filter out dates, page numbers, and other non-progress
            # fractions
            if (
                0 < total <= 1000
                and current <= total
                and not _FRACTION_NOISE_RE.search(text)
                and not _DATE_RE.search(text)
            ):
                self._announce_percentage(
                    (current / total) * 100, current_time
                )
                return

        # Pattern 3: Progress bars ([####  ], [====>  ], etc.)
        bar_match = _BAR_RE.search(text)
        if bar_match:
            filled = len(bar_match.group(1))
            total = filled + len(bar_match.group(2))
            # Require at least 2 progress chars total and unfilled portion
            # must be spaces/dots
            if total >= 2 and _BAR_UNFILLED_RE.match(bar_match.group(2)):
                self._announce_percentage(
                    (filled / total) * 100, current_time
                )
                return

        # Pattern 4: Generic activity indicators (Loading..., Working...,
        # etc.) - steady beep every 2 seconds for ongoing activity
        if _ACTIVITY_RE.search(text):
            self._activity_beep_if_due(current_time, 2.0)
            return

        # Pattern 5: Braille progress indicators
        if _BRAILLE_RE.search(text):
            self._activity_beep_if_due(current_time, 1.0)
            return

        # Pattern 6: Claude Code progress indicators
        if _CLAUDE_RE.search(text):
            self._activity_beep_if_due(current_time, 1.0)
            return

        # Pattern 7: Moon phase progress indicators
        moon_match = _MOON_RE.search(text)
        if moon_match:
            self._announce_percentage(
                _MOON_PHASES[moon_match.group(0)], current_time
            )

    def play_progress_tone(self, percentage):
        """Play a tone whose pitch maps 0-100% onto 400-1200 Hz."""
        frequency = 400 + (percentage * 8)
        frequency = max(400, min(1200, frequency))  # Clamp to safe range
        self.play_quiet_tone(frequency, 0.1)

    def play_activity_beep(self):
        """Play the single fixed-pitch tone used for generic activity."""
        self.play_quiet_tone(800, 0.08)

    def play_quiet_tone(self, frequency, duration):
        """Play a quiet tone using Sox directly with flood protection.

        Runs ``play -qn synth <duration> tri <frequency> gain -8`` without
        waiting for it to finish. Nothing is played when the "sound"
        "enabled" setting is false; failures are written to the debug log.
        """
        # Flood protection: prevent beeps closer than 0.1 seconds apart, to
        # avoid audio crackling on low-resource systems
        current_time = time.time()
        if current_time - getattr(self, "_last_beep_time", 0) < 0.1:
            return
        self._last_beep_time = current_time

        try:
            # Only play if sound is enabled
            if not self.env["runtime"]["SettingsManager"].get_setting_as_bool(
                "sound", "enabled"
            ):
                return
            subprocess.Popen(
                [
                    "play",
                    "-qn",
                    "synth",
                    str(duration),
                    "tri",
                    str(frequency),
                    "gain",
                    "-8",
                ],
                stdin=None,
                stdout=None,
                stderr=None,
                shell=False,
            )
        except Exception as e:
            self._debug("Sox tone error: " + str(e), debug.DebugLevel.ERROR)

    def is_current_line_prompt(self):
        """Check if the current line looks like a standalone prompt (not command with progress)"""
        try:
            content = self.env["screen"]["new_content_text"]
            if not content:
                return False

            lines = content.split("\n")

            # Check the last line (most common for prompts) and, if it is a
            # different line, the one the cursor is on
            lines_to_check = [lines[-1]]
            cursor_y = self.env["screen"]["new_cursor"]["y"]
            if 0 <= cursor_y < len(lines) - 1:
                lines_to_check.append(lines[cursor_y])

            for line in lines_to_check:
                line = line.strip()
                if not line:
                    continue

                # A line with a prompt marker AND significant content after
                # it is a command line, not a standalone prompt
                if any(regex.search(line) for regex in _PROMPT_MARKER_RES):
                    prompt_end = max(
                        line.rfind("$"),
                        line.rfind("#"),
                        line.rfind(">"),
                        line.rfind("\\"),
                    )
                    # More than just whitespace after prompt
                    if 0 <= prompt_end < len(line) - 5:
                        continue

                if any(
                    regex.search(line) for regex in _STANDALONE_PROMPT_RES
                ):
                    return True

            return False
        except Exception:
            # If anything fails, assume it's not a prompt to be safe
            return False

    def contains_url(self, text):
        """Check if text contains URLs that might cause false progress detection"""
        return any(regex.search(text) for regex in _URL_RES)

    def set_callback(self, callback):
        pass

    def _debug(self, message, level=None):
        """Write ``message`` to the debug log (INFO level unless given)."""
        if level is None:
            level = debug.DebugLevel.INFO
        self.env["runtime"]["DebugManager"].write_debug_out(message, level)

    def _announce_percentage(self, percentage, current_time, log_prefix=None):
        """Play a progress tone if ``percentage`` differs from the last one.

        On a change, both ``lastProgressValue`` and ``lastProgressTime`` are
        updated. ``log_prefix``, when given, is logged with the percentage
        appended before the tone is played.
        """
        state = self.env["commandBuffer"]
        if percentage == state.get("lastProgressValue", _NO_PROGRESS_VALUE):
            return
        if log_prefix is not None:
            self._debug(log_prefix + str(percentage))
        self.play_progress_tone(percentage)
        state["lastProgressValue"] = percentage
        state["lastProgressTime"] = current_time

    def _activity_beep_if_due(self, current_time, min_interval, message=None):
        """Play the activity beep unless one was signalled too recently.

        "Too recently" means fewer than ``min_interval`` seconds since
        ``lastProgressTime``. ``message``, when given, is logged before the
        beep.
        """
        state = self.env["commandBuffer"]
        last_time = state.get("lastProgressTime", _NO_PROGRESS_TIME)
        if current_time - last_time < min_interval:
            return
        if message is not None:
            self._debug(message)
        self.play_activity_beep()
        state["lastProgressTime"] = current_time