From 0e9bc8ae09b123fe49a9f0a56995cc07805136e2 Mon Sep 17 00:00:00 2001 From: Storm Dragon Date: Mon, 1 Dec 2025 02:36:07 -0500 Subject: [PATCH] A few fixes to ai.py. --- scripts/ai.py | 173 ++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 125 insertions(+), 48 deletions(-) diff --git a/scripts/ai.py b/scripts/ai.py index 941c590..6c4cb09 100755 --- a/scripts/ai.py +++ b/scripts/ai.py @@ -20,6 +20,47 @@ import time import pyaudio import wave +class SystemCommands: + """Check availability of required system commands""" + + @staticmethod + def is_command_available(command): + """Check if a command is available in PATH""" + try: + result = subprocess.run(['which', command], + capture_output=True, text=True, timeout=2) + return result.returncode == 0 + except (subprocess.SubprocessError, FileNotFoundError, OSError) as e: + return False + + @staticmethod + def check_dependencies(): + """Check for required system commands and return missing ones""" + required_commands = { + 'scrot': 'Required for screenshots', + 'play': 'Required for audio feedback (sox package)', + 'spd-say': 'Required for text-to-speech output', + } + + optional_commands = { + 'xclip': 'Required for clipboard on X11', + 'wl-paste': 'Required for clipboard on Wayland', + 'tesseract': 'Required for OCR functionality', + } + + missing_required = {} + missing_optional = {} + + for cmd, desc in required_commands.items(): + if not SystemCommands.is_command_available(cmd): + missing_required[cmd] = desc + + for cmd, desc in optional_commands.items(): + if not SystemCommands.is_command_available(cmd): + missing_optional[cmd] = desc + + return missing_required, missing_optional + class VoiceRecognition: """Voice recognition system for AI assistant""" @@ -233,7 +274,7 @@ class OllamaInterface: try: response = requests.get(f'{self.host}/api/tags', timeout=3) return response.status_code == 200 - except: + except (requests.RequestException, ConnectionError, OSError) as e: return False def send_message(self, message, model, context=None, image_path=None): @@ -279,10 +320,10 @@ class ClaudeCodeInterface: def is_available(self): """Check if Claude Code is available""" try: - result = subprocess.run(['claude', '--version'], + result = subprocess.run(['claude', '--version'], capture_output=True, text=True, timeout=5) return result.returncode == 0 - except: + except (subprocess.SubprocessError, FileNotFoundError, OSError) as e: return False def send_message(self, message, context=None, image_path=None): @@ -319,7 +360,7 @@ class WindowContext: def __init__(self): try: self.i3 = i3ipc.Connection() - except: + except (ConnectionError, FileNotFoundError, Exception) as e: self.i3 = None def get_focused_window_info(self): @@ -1041,25 +1082,27 @@ class AiAssistant(Gtk.Window): ai_name = self.get_current_ai_name() else: ai_name = self.get_current_ai_name() - + self.set_response_text(f"{ai_name} is processing your request...") self.askButton.set_sensitive(False) self.contextButton.set_sensitive(False) self.actionButton.set_sensitive(False) - - # Play processing sound - subprocess.run(['play', '-qnG', 'synth', '0.1', 'sin', '800'], - capture_output=True) + + # Play processing sound if available + if SystemCommands.is_command_available('play'): + subprocess.run(['play', '-qnG', 'synth', '0.1', 'sin', '800'], + capture_output=True) def hide_processing(self): """Hide processing message and re-enable buttons""" self.askButton.set_sensitive(True) self.contextButton.set_sensitive(True) self.actionButton.set_sensitive(True) - - # Play completion sound - subprocess.run(['play', '-qnG', 'synth', '0.05', 'sin', '1200'], - capture_output=True) + + # Play completion sound if available + if SystemCommands.is_command_available('play'): + subprocess.run(['play', '-qnG', 'synth', '0.05', 'sin', '1200'], + capture_output=True) def send_ai_request(self, message, context=None, image_path=None): """Send request to selected AI provider""" @@ -1238,15 +1281,20 @@ class AiAssistant(Gtk.Window): def on_describe_image(self, widget): """Handle describe screenshot button click""" def describe_image_in_thread(): + # Check if scrot is available + if not SystemCommands.is_command_available('scrot'): + GLib.idle_add(self.set_response_text, "Error: scrot not available. Please install scrot for screenshots.") + return + # Take screenshot temp_dir = tempfile.mkdtemp() screenshot_path = os.path.join(temp_dir, 'screenshot.png') - + try: # Use scrot to take screenshot - result = subprocess.run(['scrot', screenshot_path], + result = subprocess.run(['scrot', screenshot_path], capture_output=True, text=True, timeout=10) - + if result.returncode != 0: GLib.idle_add(self.set_response_text, "Error: Could not take screenshot") return @@ -1267,7 +1315,7 @@ class AiAssistant(Gtk.Window): try: os.unlink(screenshot_path) os.rmdir(temp_dir) - except: + except (FileNotFoundError, OSError) as e: pass threading.Thread(target=describe_image_in_thread, daemon=True).start() @@ -1278,15 +1326,18 @@ class AiAssistant(Gtk.Window): try: # First, try to get clipboard content (selected text) # Use wl-paste on Wayland, xclip on X11 + selected_text = "" if os.environ.get('WAYLAND_DISPLAY'): - clipboard_result = subprocess.run(['wl-paste', '-p'], - capture_output=True, text=True, timeout=5) + if SystemCommands.is_command_available('wl-paste'): + clipboard_result = subprocess.run(['wl-paste', '-p'], + capture_output=True, text=True, timeout=5) + selected_text = clipboard_result.stdout.strip() if clipboard_result.returncode == 0 else "" else: - clipboard_result = subprocess.run(['xclip', '-o', '-selection', 'primary'], - capture_output=True, text=True, timeout=5) + if SystemCommands.is_command_available('xclip'): + clipboard_result = subprocess.run(['xclip', '-o', '-selection', 'primary'], + capture_output=True, text=True, timeout=5) + selected_text = clipboard_result.stdout.strip() if clipboard_result.returncode == 0 else "" - selected_text = clipboard_result.stdout.strip() if clipboard_result.returncode == 0 else "" - if selected_text: # We have selected text, analyze it question = self.get_question_text().strip() @@ -1298,15 +1349,20 @@ class AiAssistant(Gtk.Window): else: # No selected text, fallback to OCR of current screen + # Check if scrot is available + if not SystemCommands.is_command_available('scrot'): + GLib.idle_add(self.set_response_text, "Error: No selected text found and scrot not available for screen capture.") + return + # Take screenshot first temp_dir = tempfile.mkdtemp() screenshot_path = os.path.join(temp_dir, 'screen_analysis.png') - + try: # Take screenshot - scrot_result = subprocess.run(['scrot', screenshot_path], + scrot_result = subprocess.run(['scrot', screenshot_path], capture_output=True, text=True, timeout=10) - + if scrot_result.returncode != 0: GLib.idle_add(self.set_response_text, "Error: Could not capture screen content") return @@ -1349,7 +1405,7 @@ class AiAssistant(Gtk.Window): try: os.unlink(screenshot_path) os.rmdir(temp_dir) - except: + except (FileNotFoundError, OSError) as e: pass GLib.idle_add(self.set_response_text, response) @@ -1362,8 +1418,11 @@ class AiAssistant(Gtk.Window): def speak_text(self, text): """Use spd-say to speak text if voice output is enabled""" if self.config.get('voice_output') == 'true': + if not SystemCommands.is_command_available('spd-say'): + print("Warning: spd-say not available for text-to-speech") + return try: - subprocess.run(['spd-say', '-P', 'important', text], + subprocess.run(['spd-say', '-P', 'important', text], capture_output=True, timeout=30) except Exception as e: print(f"Error speaking text: {e}") @@ -1381,17 +1440,19 @@ class AiAssistant(Gtk.Window): def voice_question_thread(): try: self.update_voice_status("🎤 Listening...") - - # Play recording start sound - subprocess.run(['play', '-qnG', 'synth', '0.1', 'sin', '1000', 'vol', '0.3'], - capture_output=True) - + + # Play recording start sound if available + if SystemCommands.is_command_available('play'): + subprocess.run(['play', '-qnG', 'synth', '0.1', 'sin', '1000', 'vol', '0.3'], + capture_output=True) + timeout = int(self.config.get('voice_timeout', '5')) recognized_text = self.voiceRecognition.recognize_speech(timeout=timeout) - - # Play recording end sound - subprocess.run(['play', '-qnG', 'synth', '0.05', 'sin', '1200', 'vol', '0.3'], - capture_output=True) + + # Play recording end sound if available + if SystemCommands.is_command_available('play'): + subprocess.run(['play', '-qnG', 'synth', '0.05', 'sin', '1200', 'vol', '0.3'], + capture_output=True) if recognized_text.startswith("Error:") or recognized_text.startswith("Sorry,"): self.update_voice_status(recognized_text) @@ -1468,10 +1529,11 @@ class AiAssistant(Gtk.Window): ai_name = self.get_current_ai_name() self.speak_text("Yes, what can I help you with?") self.update_voice_status(f"🎤 Wake word detected, listening for {ai_name}...") - - # Play wake word detection sound - subprocess.run(['play', '-qnG', 'synth', '0.1', 'sin', '800', 'vol', '0.4'], - capture_output=True) + + # Play wake word detection sound if available + if SystemCommands.is_command_available('play'): + subprocess.run(['play', '-qnG', 'synth', '0.1', 'sin', '800', 'vol', '0.4'], + capture_output=True) def wake_response_thread(): try: @@ -1581,20 +1643,35 @@ class AiAssistant(Gtk.Window): def main(): """Main entry point""" + # Check system dependencies + missing_required, missing_optional = SystemCommands.check_dependencies() + + if missing_required: + print("WARNING: Missing required commands:") + for cmd, desc in missing_required.items(): + print(f" - {cmd}: {desc}") + print("\nSome features may not work properly.") + + if missing_optional: + print("INFO: Missing optional commands:") + for cmd, desc in missing_optional.items(): + print(f" - {cmd}: {desc}") + app = AiAssistant() app.show_all() - - # Play startup sound - subprocess.run(['play', '-qnG', 'synth', '0.1', 'sin', '1000'], - capture_output=True) - + + # Play startup sound if available + if SystemCommands.is_command_available('play'): + subprocess.run(['play', '-qnG', 'synth', '0.1', 'sin', '1000'], + capture_output=True) + # Connect cleanup on destroy app.connect("destroy", lambda w: app.cleanup()) - + try: Gtk.main() except KeyboardInterrupt: app.cleanup() if __name__ == '__main__': - main() \ No newline at end of file + main()