#!/usr/bin/env python3 """ AI Assistant Interface Provides accessibility-focused AI interaction with multiple providers """ import gi gi.require_version('Gtk', '3.0') from gi.repository import Gtk, GLib, Gdk import json import os import subprocess import tempfile import configparser from pathlib import Path import i3ipc import threading import requests import time import pyaudio import wave class VoiceRecognition: """Voice recognition system for AI assistant""" def __init__(self, config): self.config = config self.is_recording = False self.audio = None self.stream = None # Audio settings self.sample_rate = 16000 self.chunk_size = 1024 self.audio_format = pyaudio.paInt16 self.channels = 1 try: import speech_recognition as sr self.recognizer = sr.Recognizer() self.microphone = sr.Microphone() self.sr_available = True # Adjust for ambient noise with self.microphone as source: self.recognizer.adjust_for_ambient_noise(source) except ImportError: self.sr_available = False self.recognizer = None self.microphone = None def is_available(self): """Check if voice recognition is available""" return self.sr_available def start_recording(self): """Start recording audio""" if not self.sr_available: return False try: self.audio = pyaudio.PyAudio() self.is_recording = True return True except Exception as e: print(f"Error starting recording: {e}") return False def stop_recording(self): """Stop recording audio""" self.is_recording = False if self.stream: self.stream.stop_stream() self.stream.close() self.stream = None if self.audio: self.audio.terminate() self.audio = None def recognize_speech(self, timeout=5, phrase_timeout=1): """Recognize speech from microphone""" if not self.sr_available: return "Error: Speech recognition not available. Install python-speech-recognition." 
        try:
            import speech_recognition as sr
            with self.microphone as source:
                # Listen for audio with timeout
                audio = self.recognizer.listen(source, timeout=timeout, phrase_time_limit=phrase_timeout)
                # Try to recognize speech using Google Speech Recognition
                try:
                    text = self.recognizer.recognize_google(audio)
                    return text
                except sr.RequestError:
                    # Try offline recognition as fallback
                    # NOTE(review): recognize_sphinx requires pocketsphinx to be
                    # installed — confirm it is a declared optional dependency.
                    try:
                        text = self.recognizer.recognize_sphinx(audio)
                        return text
                    except sr.RequestError:
                        return "Error: Speech recognition service unavailable"
                except sr.UnknownValueError:
                    return "Sorry, I couldn't understand that. Please try again."
        except sr.WaitTimeoutError:
            # listen() timed out before any speech started.
            return "No speech detected. Please try again."
        except Exception as e:
            return f"Error during speech recognition: {str(e)}"

    def recognize_speech_continuous(self, callback, stop_event):
        """Continuous speech recognition for wake word detection.

        Loops until `stop_event` is set, invoking `callback` with each
        recognized utterance lower-cased. Recognition failures are ignored;
        unexpected errors stop the loop.
        """
        if not self.sr_available:
            return
        try:
            import speech_recognition as sr
            with self.microphone as source:
                while not stop_event.is_set():
                    try:
                        # Listen for 1 second chunks
                        audio = self.recognizer.listen(source, timeout=1, phrase_time_limit=1)
                        try:
                            text = self.recognizer.recognize_google(audio)
                            callback(text.lower())
                        except (sr.UnknownValueError, sr.RequestError):
                            # Ignore recognition errors in continuous mode
                            pass
                    except sr.WaitTimeoutError:
                        # Normal timeout, continue listening
                        continue
                    except Exception as e:
                        print(f"Error in continuous recognition: {e}")
                        break
        except ImportError:
            print("Speech recognition not available")
            return


class AiConfig:
    """Configuration manager for AI settings with XDG directory support.

    Persists settings under $XDG_CONFIG_HOME/stormux/I38/ai.conf (defaulting
    to ~/.config) in INI format, all values under the single [ai] section.
    """

    def __init__(self):
        self.configDir = Path(os.environ.get('XDG_CONFIG_HOME', os.path.expanduser('~/.config'))) / 'stormux' / 'I38'
        self.configFile = self.configDir / 'ai.conf'
        self.configDir.mkdir(parents=True, exist_ok=True)
        self.load_config()

    def load_config(self):
        """Load configuration from file, filling in defaults for missing keys."""
        self.config = configparser.ConfigParser()
        # configparser.read() silently skips a missing file, so a first run
        # starts from an empty config and the defaults below apply.
        self.config.read(self.configFile)
        # Set defaults if sections don't exist
        if 'ai' not in self.config:
            self.config.add_section('ai')
        # Default values
        defaults = {
            'provider': 'claude-code',
            'ollama_model': 'llama2',
            'ollama_vision_model': 'llava',
            'ollama_host': 'http://localhost:11434',
            'confirm_actions': 'true',
            'voice_enabled': 'false',
            'voice_output': 'true',
            'wake_word': 'hey assistant',
            'voice_timeout': '5',
            'continuous_listening': 'false'
        }
        for key, value in defaults.items():
            if key not in self.config['ai']:
                self.config.set('ai', key, value)

    def save_config(self):
        """Save configuration to file"""
        with open(self.configFile, 'w') as f:
            self.config.write(f)

    def get(self, key, fallback=None):
        """Get configuration value from the [ai] section."""
        return self.config.get('ai', key, fallback=fallback)

    def set(self, key, value):
        """Set configuration value and persist it immediately."""
        self.config.set('ai', key, str(value))
        self.save_config()


class OllamaInterface:
    """Interface for Ollama AI provider (HTTP API on a local/remote host)."""

    def __init__(self, host='http://localhost:11434'):
        self.host = host

    def get_models(self):
        """Get list of available Ollama model names; [] on any failure."""
        try:
            response = requests.get(f'{self.host}/api/tags', timeout=5)
            if response.status_code == 200:
                models = response.json().get('models', [])
                return [model['name'] for model in models]
        except Exception as e:
            print(f"Error getting Ollama models: {e}")
        return []

    def get_vision_models(self):
        """Get list of models that can handle images (matched by name pattern)."""
        all_models = self.get_models()
        # Common vision model patterns
        vision_patterns = ['llava', 'llama3.2-vision', 'minicpm-v', 'bakllava', 'moondream']
        vision_models = []
        for model in all_models:
            model_lower = model.lower()
            if any(pattern in model_lower for pattern in vision_patterns):
                vision_models.append(model)
        return vision_models

    def is_vision_model(self, model_name):
        """Check if a model can handle images.

        Heuristic: matches known vision-model name substrings — a model not
        in this list is treated as text-only even if it supports images.
        """
        if not model_name:
            return False
        model_lower = model_name.lower()
        vision_patterns = ['llava', 'llama3.2-vision', 'minicpm-v', 'bakllava', 'moondream']
        return any(pattern in model_lower for pattern in vision_patterns)

    def is_available(self):
        """Check if Ollama is running and available"""
        try:
            response = requests.get(f'{self.host}/api/tags', timeout=3)
            return response.status_code == 200
        except:
            # NOTE(review): bare except also swallows KeyboardInterrupt here;
            # consider narrowing to requests.RequestException.
            return False

    def send_message(self, message, model, context=None, image_path=None):
        """Send message to Ollama.

        Returns the model's response text, or a human-readable error string.
        An image is only attached when the selected model matches a known
        vision-model pattern.
        """
        try:
            data = {
                'model': model,
                'prompt': message,
                'stream': False
            }
            # The generic default system prompt is deliberately not forwarded.
            if context and not context.startswith("You are a helpful AI assistant"):
                data['system'] = context
            # Handle image if provided
            if image_path and os.path.exists(image_path):
                import base64
                # Check if the model can handle images
                if not self.is_vision_model(model):
                    return f"Error: Model '{model}' cannot process images. Please select a vision model like llava or llama3.2-vision in settings."
                # Encode image to base64
                try:
                    with open(image_path, 'rb') as image_file:
                        image_data = base64.b64encode(image_file.read()).decode('utf-8')
                        data['images'] = [image_data]
                except Exception as e:
                    return f"Error reading image: {str(e)}"
            response = requests.post(f'{self.host}/api/generate', json=data, timeout=60)  # Longer timeout for image processing
            if response.status_code == 200:
                return response.json().get('response', 'No response received')
            else:
                return f"Error: HTTP {response.status_code}"
        except Exception as e:
            return f"Error communicating with Ollama: {str(e)}"


class ClaudeCodeInterface:
    """Interface for Claude Code AI provider (via the `claude` CLI)."""

    def is_available(self):
        """Check if Claude Code is available"""
        try:
            result = subprocess.run(['claude', '--version'], capture_output=True, text=True, timeout=5)
            return result.returncode == 0
        except:
            # Covers FileNotFoundError (CLI not installed) and timeouts.
            return False

    def send_message(self, message, context=None, image_path=None):
        """Send message to Claude Code.

        Returns stdout on success, otherwise a human-readable error string.
        """
        try:
            cmd = ['claude']
            # Add context if provided
            if context and not context.startswith("You are a helpful AI assistant"):
                message = f"Context: {context}\n\n{message}"
            # Add image if provided
            if image_path and os.path.exists(image_path):
                cmd.extend(['--image', image_path])
            # Send the message
            cmd.append(message)
            # Run from home directory to avoid picking up project context
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=60, cwd=os.path.expanduser('~'))
            if result.returncode == 0:
                return result.stdout.strip()
            else:
                return f"Error: {result.stderr.strip()}"
        except subprocess.TimeoutExpired:
            return "Error: Request timed out"
        except Exception as e:
            return f"Error communicating with Claude Code: {str(e)}"


class WindowContext:
    """Get context information from focused window via i3 IPC."""

    def __init__(self):
        try:
            self.i3 = i3ipc.Connection()
        except:
            # No i3/sway session: degrade to an informative message later.
            self.i3 = None

    def get_focused_window_info(self):
        """Get information about the currently focused window.

        Returns a short human-readable description (also used as AI context),
        never raises.
        """
        if not self.i3:
            return "Unable to connect to i3"
        try:
            tree = self.i3.get_tree()
            focused = tree.find_focused()
            if not focused:
                return "No focused window found"
            info = {
                'name': focused.name or 'Unknown',
                'class': getattr(focused, 'window_class', 'Unknown'),
                'title': getattr(focused, 'window_title', 'Unknown'),
                'workspace': focused.workspace().name if focused.workspace() else 'Unknown'
            }
            # Only name and class are reported; title/workspace are collected
            # but unused in the returned string.
            return f"Current application: {info['name']}\nWindow type: {info['class']}"
        except Exception as e:
            return f"Error getting window info: {str(e)}"


class AiAssistant(Gtk.Window):
    """Main AI Assistant window with accessibility features"""

    def __init__(self):
        super().__init__(title="AI Assistant")
        # Initialize components
        self.config = AiConfig()
        self.claudeInterface = ClaudeCodeInterface()
        self.ollamaInterface = OllamaInterface(self.config.get('ollama_host'))
        self.windowContext = WindowContext()
        self.voiceRecognition = VoiceRecognition(self.config)
        # Voice mode state
        self.continuousListening = False
        self.listeningThread = None
        self.stopListening = threading.Event()
        # Window setup
        self.set_default_size(600, 500)
        self.set_position(Gtk.WindowPosition.CENTER)
        self.connect("destroy", Gtk.main_quit)
        self.connect("key-press-event", self.on_key_press)
        # Enable accessibility
        self.set_can_focus(True)
        self.set_focus_on_map(True)
# Create notebook for tabs self.notebook = Gtk.Notebook() self.notebook.set_tab_pos(Gtk.PositionType.TOP) self.notebook.set_can_focus(True) self.notebook.set_scrollable(True) self.notebook.connect("switch-page", self.on_tab_switched) self.add(self.notebook) # Create tabs self.create_interaction_tab() self.create_settings_tab() # Set focus to interaction tab and initial focus self.notebook.set_current_page(0) # Set initial focus to question text after window is shown GLib.idle_add(self.set_initial_focus) # Update button labels with current AI provider GLib.idle_add(self.update_button_labels) def create_interaction_tab(self): """Create the main interaction tab""" # Create main container vbox = Gtk.VBox(spacing=10) vbox.set_border_width(10) # Add question section questionLabel = Gtk.Label("_Ask AI a question:") questionLabel.set_use_underline(True) questionLabel.set_alignment(0, 0.5) vbox.pack_start(questionLabel, False, False, 0) # Question text view with scrolling scrollWindow = Gtk.ScrolledWindow() scrollWindow.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.AUTOMATIC) scrollWindow.set_size_request(-1, 100) scrollWindow.set_can_focus(True) self.questionText = Gtk.TextView() self.questionText.set_wrap_mode(Gtk.WrapMode.WORD) self.questionText.set_can_focus(True) self.questionText.set_accepts_tab(False) # Allow Tab to move focus instead of inserting tab # Set accessibility properties atk_obj = self.questionText.get_accessible() atk_obj.set_name("Question input") atk_obj.set_description("Enter your question for the AI assistant here") # Link label to text view for screen readers questionLabel.set_mnemonic_widget(self.questionText) # Connect key press event for additional navigation self.questionText.connect("key-press-event", self.on_textview_key_press) scrollWindow.add(self.questionText) vbox.pack_start(scrollWindow, False, False, 0) # Action buttons row buttonBox = Gtk.HBox(spacing=10) self.askButton = Gtk.Button("Ask _Question") 
        # Labels here are placeholders; update_button_labels() rewrites them
        # with the active provider's name once the main loop starts.
        self.askButton.set_use_underline(True)
        self.askButton.connect("clicked", self.on_ask_question)
        self.askButton.set_can_focus(True)
        self.askButton.get_accessible().set_description("Send your question to the AI assistant")
        buttonBox.pack_start(self.askButton, True, True, 0)
        self.contextButton = Gtk.Button("Ask About _Window")
        self.contextButton.set_use_underline(True)
        self.contextButton.connect("clicked", self.on_ask_with_context)
        self.contextButton.set_can_focus(True)
        self.contextButton.get_accessible().set_description("Ask about the currently focused window")
        buttonBox.pack_start(self.contextButton, True, True, 0)
        self.actionButton = Gtk.Button("Request _Action")
        self.actionButton.set_use_underline(True)
        self.actionButton.connect("clicked", self.on_request_action)
        self.actionButton.set_can_focus(True)
        self.actionButton.get_accessible().set_description("Request step-by-step instructions from AI")
        buttonBox.pack_start(self.actionButton, True, True, 0)
        vbox.pack_start(buttonBox, False, False, 0)
        # Voice input section
        voiceFrame = Gtk.Frame(label="Voice Input")
        voiceBox = Gtk.HBox(spacing=10)
        voiceBox.set_border_width(10)
        self.voiceButton = Gtk.Button("🎤 _Voice Question")
        self.voiceButton.set_use_underline(True)
        self.voiceButton.connect("clicked", self.on_voice_question)
        self.voiceButton.set_can_focus(True)
        # Disabled when the speech_recognition package is missing.
        self.voiceButton.set_sensitive(self.voiceRecognition.is_available())
        self.voiceButton.get_accessible().set_description("Record your question using voice input")
        voiceBox.pack_start(self.voiceButton, True, True, 0)
        self.listenToggle = Gtk.ToggleButton("👂 _Continuous Listen")
        self.listenToggle.set_use_underline(True)
        self.listenToggle.connect("toggled", self.on_toggle_continuous_listening)
        self.listenToggle.set_can_focus(True)
        self.listenToggle.set_sensitive(self.voiceRecognition.is_available())
        self.listenToggle.get_accessible().set_description("Toggle continuous listening for wake word")
        voiceBox.pack_start(self.listenToggle, True, True, 0)
        # Voice status label
        self.voiceStatus = Gtk.Label("")
        voiceBox.pack_start(self.voiceStatus, False, False, 0)
        voiceFrame.add(voiceBox)
        vbox.pack_start(voiceFrame, False, False, 0)
        # File sharing section
        fileLabel = Gtk.Label("Share _file with AI:")
        fileLabel.set_use_underline(True)
        fileLabel.set_alignment(0, 0.5)
        vbox.pack_start(fileLabel, False, False, 0)
        fileBox = Gtk.HBox(spacing=10)
        self.fileEntry = Gtk.Entry()
        self.fileEntry.set_placeholder_text("Select a file to share...")
        self.fileEntry.set_can_focus(True)
        self.fileEntry.get_accessible().set_name("File path")
        self.fileEntry.get_accessible().set_description("Path to file to share with AI")
        fileLabel.set_mnemonic_widget(self.fileEntry)
        fileBox.pack_start(self.fileEntry, True, True, 0)
        self.browseButton = Gtk.Button("_Browse")
        self.browseButton.set_use_underline(True)
        self.browseButton.connect("clicked", self.on_browse_file)
        self.browseButton.set_can_focus(True)
        self.browseButton.get_accessible().set_description("Browse for file to share")
        fileBox.pack_start(self.browseButton, False, False, 0)
        self.shareButton = Gtk.Button("Ask About _File")
        self.shareButton.set_use_underline(True)
        self.shareButton.connect("clicked", self.on_ask_about_file)
        self.shareButton.set_can_focus(True)
        self.shareButton.get_accessible().set_description("Ask AI about the selected file")
        fileBox.pack_start(self.shareButton, False, False, 0)
        vbox.pack_start(fileBox, False, False, 0)
        # Image description section
        self.imageButton = Gtk.Button("Describe _Screenshot")
        self.imageButton.set_use_underline(True)
        self.imageButton.connect("clicked", self.on_describe_image)
        self.imageButton.set_can_focus(True)
        self.imageButton.get_accessible().set_description("Take screenshot and get AI description")
        vbox.pack_start(self.imageButton, False, False, 0)
        # Selected text section
        self.selectedButton = Gtk.Button("Analyze _Selected Content")
        self.selectedButton.set_use_underline(True)
        self.selectedButton.connect("clicked", self.on_analyze_selected)
        self.selectedButton.set_can_focus(True)
        self.selectedButton.get_accessible().set_description("Analyze selected text or screen content using OCR")
        vbox.pack_start(self.selectedButton, False, False, 0)
        # Response section
        self.responseLabel = Gtk.Label("AI _Response:")
        self.responseLabel.set_use_underline(True)
        self.responseLabel.set_alignment(0, 0.5)
        vbox.pack_start(self.responseLabel, False, False, 0)
        # Response text view with scrolling
        responseScrollWindow = Gtk.ScrolledWindow()
        responseScrollWindow.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.AUTOMATIC)
        responseScrollWindow.set_can_focus(True)
        self.responseText = Gtk.TextView()
        self.responseText.set_wrap_mode(Gtk.WrapMode.WORD)
        # Read-only but still focusable so screen readers can review it.
        self.responseText.set_editable(False)
        self.responseText.set_can_focus(True)
        self.responseText.set_accepts_tab(False)
        # Set accessibility properties for response
        response_atk = self.responseText.get_accessible()
        response_atk.set_name("AI Response")
        response_atk.set_description("AI assistant's response to your question")
        # Link response label to text view
        self.responseLabel.set_mnemonic_widget(self.responseText)
        responseScrollWindow.add(self.responseText)
        vbox.pack_start(responseScrollWindow, True, True, 0)
        # Add tab to notebook
        tabLabel = Gtk.Label("Interaction")
        self.notebook.append_page(vbox, tabLabel)
        # Set initial focus
        self.questionText.grab_focus()

    def create_settings_tab(self):
        """Create the settings tab.

        Builds provider selection (Claude Code vs. Ollama), Ollama model
        pickers, voice settings, and general options, plus a save button.
        """
        # Create main container
        vbox = Gtk.VBox(spacing=15)
        vbox.set_border_width(15)
        vbox.set_can_focus(False)  # Container shouldn't steal focus
        # AI Provider section
        providerFrame = Gtk.Frame(label="AI Provider")
        providerBox = Gtk.VBox(spacing=10)
        providerBox.set_border_width(10)
        # Make sure the box itself doesn't interfere with focus
        providerBox.set_can_focus(False)
        # Claude Code option
        self.claudeRadio = Gtk.RadioButton.new_with_mnemonic(None, "_Claude Code")
        self.claudeRadio.connect("toggled", self.on_provider_changed)
        self.claudeRadio.set_can_focus(True)
        self.claudeRadio.get_accessible().set_description("Use Claude Code CLI as AI provider")
        providerBox.pack_start(self.claudeRadio, False, False, 0)
        # Ollama option (same radio group as claudeRadio)
        self.ollamaRadio = Gtk.RadioButton.new_with_mnemonic_from_widget(self.claudeRadio, "_Ollama")
        self.ollamaRadio.connect("toggled", self.on_provider_changed)
        self.ollamaRadio.set_can_focus(True)
        self.ollamaRadio.get_accessible().set_description("Use local Ollama service as AI provider")
        providerBox.pack_start(self.ollamaRadio, False, False, 0)
        providerFrame.add(providerBox)
        vbox.pack_start(providerFrame, False, False, 0)
        # Ollama settings
        self.ollamaFrame = Gtk.Frame(label="Ollama Settings")
        ollamaBox = Gtk.VBox(spacing=10)
        ollamaBox.set_border_width(10)
        # Text Models section
        modelLabel = Gtk.Label("Text Models:")
        modelLabel.set_alignment(0, 0.5)
        ollamaBox.pack_start(modelLabel, False, False, 0)
        # Container for text model radio buttons
        self.textModelBox = Gtk.VBox(spacing=5)
        self.textModelBox.set_border_width(10)
        ollamaBox.pack_start(self.textModelBox, False, False, 0)
        # Will be populated with radio buttons in refresh_ollama_models()
        self.textModelRadios = []
        self.textModelGroup = None
        # Vision Models section
        visionModelLabel = Gtk.Label("Vision Models:")
        visionModelLabel.set_alignment(0, 0.5)
        ollamaBox.pack_start(visionModelLabel, False, False, 0)
        # Container for vision model radio buttons
        self.visionModelBox = Gtk.VBox(spacing=5)
        self.visionModelBox.set_border_width(10)
        ollamaBox.pack_start(self.visionModelBox, False, False, 0)
        # Will be populated with radio buttons in refresh_ollama_models()
        self.visionModelRadios = []
        self.visionModelGroup = None
        # Refresh models button
        self.refreshButton = Gtk.Button("_Refresh Models")
        self.refreshButton.set_use_underline(True)
        self.refreshButton.connect("clicked", self.on_refresh_models)
        self.refreshButton.set_can_focus(True)
        self.refreshButton.get_accessible().set_description("Refresh the list of available Ollama models")
        ollamaBox.pack_start(self.refreshButton, False, False, 0)
        # Host entry
        hostLabel = Gtk.Label("Ollama _Host:")
        hostLabel.set_use_underline(True)
        hostLabel.set_alignment(0, 0.5)
        ollamaBox.pack_start(hostLabel, False, False, 0)
        self.hostEntry = Gtk.Entry()
        self.hostEntry.set_text(self.config.get('ollama_host'))
        self.hostEntry.set_can_focus(True)
        self.hostEntry.get_accessible().set_name("Ollama host URL")
        self.hostEntry.get_accessible().set_description("URL of the Ollama service")
        hostLabel.set_mnemonic_widget(self.hostEntry)
        ollamaBox.pack_start(self.hostEntry, False, False, 0)
        self.ollamaFrame.add(ollamaBox)
        vbox.pack_start(self.ollamaFrame, False, False, 0)
        # Voice settings
        self.voiceFrame = Gtk.Frame(label="Voice Settings")
        voiceSettingsBox = Gtk.VBox(spacing=10)
        voiceSettingsBox.set_border_width(10)
        self.voiceEnabledCheck = Gtk.CheckButton("Enable _voice input")
        self.voiceEnabledCheck.set_use_underline(True)
        # Config stores booleans as the strings 'true'/'false'.
        self.voiceEnabledCheck.set_active(self.config.get('voice_enabled') == 'true')
        self.voiceEnabledCheck.set_sensitive(self.voiceRecognition.is_available())
        self.voiceEnabledCheck.set_can_focus(True)
        self.voiceEnabledCheck.get_accessible().set_description("Enable voice input for asking questions")
        voiceSettingsBox.pack_start(self.voiceEnabledCheck, False, False, 0)
        self.voiceOutputCheck = Gtk.CheckButton("Enable voice _output (speak responses)")
        self.voiceOutputCheck.set_use_underline(True)
        self.voiceOutputCheck.set_active(self.config.get('voice_output') == 'true')
        self.voiceOutputCheck.set_can_focus(True)
        self.voiceOutputCheck.get_accessible().set_description("Speak AI responses aloud using text-to-speech")
        voiceSettingsBox.pack_start(self.voiceOutputCheck, False, False, 0)
        # Wake word entry
        wakeWordLabel = Gtk.Label("_Wake word phrase:")
        wakeWordLabel.set_use_underline(True)
        wakeWordLabel.set_alignment(0, 0.5)
        voiceSettingsBox.pack_start(wakeWordLabel, False, False, 0)
        self.wakeWordEntry = Gtk.Entry()
        self.wakeWordEntry.set_text(self.config.get('wake_word'))
        self.wakeWordEntry.set_placeholder_text("e.g., 'hey assistant'")
        self.wakeWordEntry.set_can_focus(True)
        self.wakeWordEntry.get_accessible().set_name("Wake word phrase")
        self.wakeWordEntry.get_accessible().set_description("Phrase to activate voice listening")
        wakeWordLabel.set_mnemonic_widget(self.wakeWordEntry)
        voiceSettingsBox.pack_start(self.wakeWordEntry, False, False, 0)
        # Voice timeout
        timeoutLabel = Gtk.Label("Voice recognition _timeout (seconds):")
        timeoutLabel.set_use_underline(True)
        timeoutLabel.set_alignment(0, 0.5)
        voiceSettingsBox.pack_start(timeoutLabel, False, False, 0)
        # Spin range 1-30 seconds, step 1; value stored as string in config.
        self.timeoutSpin = Gtk.SpinButton.new_with_range(1, 30, 1)
        self.timeoutSpin.set_value(int(self.config.get('voice_timeout', '5')))
        self.timeoutSpin.set_can_focus(True)
        self.timeoutSpin.get_accessible().set_name("Voice timeout")
        self.timeoutSpin.get_accessible().set_description("How long to listen for speech in seconds")
        timeoutLabel.set_mnemonic_widget(self.timeoutSpin)
        voiceSettingsBox.pack_start(self.timeoutSpin, False, False, 0)
        # Voice status (informational only; tells the user which optional
        # packages are missing when voice support is unavailable)
        voiceStatusLabel = Gtk.Label("")
        if not self.voiceRecognition.is_available():
            voiceStatusLabel.set_text("Voice recognition unavailable - install python-speech-recognition and python-pyaudio")
            voiceStatusLabel.set_line_wrap(True)
        else:
            voiceStatusLabel.set_text("Voice recognition available")
        voiceSettingsBox.pack_start(voiceStatusLabel, False, False, 0)
        self.voiceFrame.add(voiceSettingsBox)
        vbox.pack_start(self.voiceFrame, False, False, 0)
        # General settings
        generalFrame = Gtk.Frame(label="General Settings")
        generalBox = Gtk.VBox(spacing=10)
        generalBox.set_border_width(10)
        self.confirmCheck = Gtk.CheckButton("_Confirm AI actions before execution")
        self.confirmCheck.set_use_underline(True)
        self.confirmCheck.set_active(self.config.get('confirm_actions') == 'true')
        self.confirmCheck.set_can_focus(True)
        self.confirmCheck.get_accessible().set_description("Show confirmation dialog before executing AI suggested actions")
        generalBox.pack_start(self.confirmCheck, False, False, 0)
        generalFrame.add(generalBox)
        vbox.pack_start(generalFrame, False, False, 0)
        # Save button
        self.saveButton = Gtk.Button("_Save Settings")
        self.saveButton.set_use_underline(True)
        self.saveButton.connect("clicked", self.on_save_settings)
        self.saveButton.set_can_focus(True)
        self.saveButton.get_accessible().set_description("Save all configuration changes")
        vbox.pack_start(self.saveButton, False, False, 0)
        # Status label
        self.statusLabel = Gtk.Label("")
        vbox.pack_start(self.statusLabel, False, False, 0)
        # Add tab to notebook
        tabLabel = Gtk.Label("Settings")
        self.notebook.append_page(vbox, tabLabel)
        # Don't set focus chain - let GTK handle it naturally
        # Load current settings
        self.load_current_settings()

    def on_radio_key_press(self, widget, event):
        """Handle key press events for radio buttons.

        Any arrow key or space flips the provider selection to the *other*
        radio button and moves focus there. Returns True when handled.
        """
        keyval = event.keyval
        # Arrow keys and space to change radio button selection
        if keyval in [Gdk.KEY_Up, Gdk.KEY_Down, Gdk.KEY_Left, Gdk.KEY_Right, Gdk.KEY_space]:
            if widget == self.claudeRadio:
                self.ollamaRadio.set_active(True)
                self.ollamaRadio.grab_focus()
            else:
                self.claudeRadio.set_active(True)
                self.claudeRadio.grab_focus()
            return True
        return False

    def on_combo_key_press(self, widget, event):
        """Handle key press events for combo boxes to allow Tab navigation"""
        keyval = event.keyval
        # NOTE(review): 'state' is computed but never used below — the Shift
        # modifier is re-read directly from event.state.
        state = event.state & Gdk.ModifierType.CONTROL_MASK
        # Allow Tab and Shift+Tab to move focus away from combo box
        if keyval == Gdk.KEY_Tab:
            # Close combo box popup if open
            widget.popdown()
            # Let the normal tab handling take over
            if event.state & Gdk.ModifierType.SHIFT_MASK:
                # Shift+Tab - move to previous widget
                widget.get_toplevel().child_focus(Gtk.DirectionType.TAB_BACKWARD)
            else:
                # Tab - move to next widget
                widget.get_toplevel().child_focus(Gtk.DirectionType.TAB_FORWARD)
            return True
        return False

    def setup_settings_focus_chain(self, container):
        """Set up explicit focus chain for settings tab - disabled for now"""
        # Commenting out focus chain to let GTK handle it naturally
        # GTK accessibility with explicit focus chains is problematic
        pass

    def load_current_settings(self):
        """Load current settings into UI"""
        provider = self.config.get('provider')
        if provider == 'claude-code':
            self.claudeRadio.set_active(True)
        else:
            self.ollamaRadio.set_active(True)
        self.on_provider_changed(None)
        self.refresh_ollama_models()
        # Set saved models after radio buttons are created
        self.set_saved_model_selections()

    def set_saved_model_selections(self):
        """Set the saved model selections on radio buttons.

        Radio labels are the model names, so matching is done by label.
        If a saved model is no longer listed, the default selection made
        by refresh_ollama_models() stays in effect.
        """
        saved_model = self.config.get('ollama_model')
        saved_vision_model = self.config.get('ollama_vision_model')
        # Set text model selection
        for radio in self.textModelRadios:
            if radio.get_label() == saved_model:
                radio.set_active(True)
                break
        # Set vision model selection
        for radio in self.visionModelRadios:
            if radio.get_label() == saved_vision_model:
                radio.set_active(True)
                break

    def on_provider_changed(self, widget):
        """Handle provider radio button change (also called directly with None)."""
        if self.claudeRadio.get_active():
            self.ollamaFrame.set_sensitive(False)
            self.update_status("Claude Code selected")
        else:
            self.ollamaFrame.set_sensitive(True)
            self.update_status("Ollama selected")

    def refresh_ollama_models(self):
        """Refresh the list of available Ollama models using radio buttons.

        Rebuilds both the text-model group (every model) and the vision-model
        group (pattern-matched subset plus a "(No vision model)" option),
        selecting the first entry of each group by default.
        """
        # Clear existing radio buttons
        for radio in self.textModelRadios:
            self.textModelBox.remove(radio)
        for radio in self.visionModelRadios:
            self.visionModelBox.remove(radio)
        self.textModelRadios = []
        self.visionModelRadios = []
        self.textModelGroup = None
        self.visionModelGroup = None
        if self.ollamaInterface.is_available():
            all_models = self.ollamaInterface.get_models()
            vision_models = self.ollamaInterface.get_vision_models()
            # Create radio buttons for text models (all models)
            for i, model in enumerate(all_models):
                if i == 0:
                    # First radio button in group
                    radio = Gtk.RadioButton.new_with_label(None, model)
                    self.textModelGroup = radio
                else:
                    # Additional radio buttons in group
                    radio = Gtk.RadioButton.new_with_label_from_widget(self.textModelGroup, model)
                radio.set_can_focus(True)
                radio.get_accessible().set_description(f"Use {model} for text questions")
                radio.connect("toggled", self.on_text_model_changed)
                self.textModelRadios.append(radio)
                self.textModelBox.pack_start(radio, False, False, 0)
            # Create radio buttons for vision models
            for i, model in enumerate(vision_models):
                if i == 0:
                    # First radio button in vision group
                    radio = Gtk.RadioButton.new_with_label(None, model)
                    self.visionModelGroup = radio
                else:
                    # Additional radio buttons in vision group
                    radio = Gtk.RadioButton.new_with_label_from_widget(self.visionModelGroup, model)
                radio.set_can_focus(True)
                radio.get_accessible().set_description(f"Use {model} for image analysis")
                radio.connect("toggled", self.on_vision_model_changed)
                self.visionModelRadios.append(radio)
                self.visionModelBox.pack_start(radio, False, False, 0)
            # Add "None" option for vision models
            if vision_models:
                radio = Gtk.RadioButton.new_with_label_from_widget(self.visionModelGroup, "(No vision model)")
                radio.set_can_focus(True)
                radio.get_accessible().set_description("Don't use vision models")
                radio.connect("toggled", self.on_vision_model_changed)
                self.visionModelRadios.append(radio)
                self.visionModelBox.pack_start(radio, False, False, 0)
            # Show the new radio buttons
            self.textModelBox.show_all()
            self.visionModelBox.show_all()
            if all_models:
                # Select first text model by default
                if self.textModelRadios:
                    self.textModelRadios[0].set_active(True)
                # Select first vision model by default
                if self.visionModelRadios:
                    self.visionModelRadios[0].set_active(True)
                status = f"Found {len(all_models)} total models"
                if vision_models:
                    status += f", {len(vision_models)} vision models"
                self.update_status(status)
            else:
                self.update_status("Ollama running but no models found")
        else:
            self.update_status("Ollama not available")

    def on_text_model_changed(self, widget):
        """Handle text model radio button change (persists immediately)."""
        if widget.get_active():
            model = widget.get_label()
            self.config.set('ollama_model', model)

    def on_vision_model_changed(self, widget):
        """Handle vision model radio button change (persists immediately).

        The "(No vision model)" placeholder is intentionally not saved, so
        the last real vision model stays in the config.
        """
        if widget.get_active():
            model = widget.get_label()
            if model != "(No vision model)":
                self.config.set('ollama_vision_model', model)

    def on_refresh_models(self, widget):
        """Handle refresh models button click"""
        # Update host if changed — rebuild the interface against the entry's
        # current text before re-querying the model list.
        new_host = self.hostEntry.get_text()
        self.ollamaInterface = OllamaInterface(new_host)
        self.refresh_ollama_models()

    def on_save_settings(self, widget):
        """Save settings to configuration.

        Each AiConfig.set() call writes the file immediately, so this method
        persists every field as it goes.
        """
        if self.claudeRadio.get_active():
            self.config.set('provider', 'claude-code')
        else:
            self.config.set('provider', 'ollama')
        self.config.set('ollama_host', self.hostEntry.get_text())
        # Save selected text model
        for radio in self.textModelRadios:
            if radio.get_active():
                self.config.set('ollama_model', radio.get_label())
                break
        # Save selected vision model
        for radio in self.visionModelRadios:
            if radio.get_active():
                model = radio.get_label()
                if model != "(No vision model)":
                    self.config.set('ollama_vision_model', model)
                break
        self.config.set('confirm_actions', 'true' if self.confirmCheck.get_active() else 'false')
        # Save voice settings
        self.config.set('voice_enabled', 'true' if self.voiceEnabledCheck.get_active() else 'false')
        self.config.set('voice_output', 'true' if self.voiceOutputCheck.get_active() else 'false')
        self.config.set('wake_word', self.wakeWordEntry.get_text())
        self.config.set('voice_timeout', str(int(self.timeoutSpin.get_value())))
        self.update_status("Settings saved successfully!")
        # Update button labels with new AI provider
        self.update_button_labels()

    def update_status(self, message):
        """Update status label; the text auto-clears after 5 seconds.

        The lambda returns None (falsy), so the GLib timeout fires once.
        """
        self.statusLabel.set_text(message)
        GLib.timeout_add_seconds(5, lambda: self.statusLabel.set_text(""))

    def get_current_ai_name(self):
        """Get the name of the currently selected AI provider.

        For Ollama, the model name is appended unless it is the default
        'llama2'.
        """
        provider = self.config.get('provider')
        if provider == 'claude-code':
            return "Claude"
        elif provider == 'ollama':
            model = self.config.get('ollama_model', 'llama2')
            return f"Ollama ({model})" if model != 'llama2' else "Ollama"
        else:
            return "AI"

    def update_button_labels(self):
        """Update button labels with current AI provider name.

        hasattr() guards on shareButton allow this to run from an idle
        callback even if the interaction tab has not created it yet.
        """
        ai_name = self.get_current_ai_name()
        self.askButton.set_label(f"Ask _{ai_name}")
        self.contextButton.set_label(f"Ask {ai_name} About _Window")
        self.actionButton.set_label(f"Request {ai_name} _Action")
        if hasattr(self, 'shareButton'):
            self.shareButton.set_label(f"Ask {ai_name} About _File")
        # Update response label
        self.responseLabel.set_label(f"{ai_name} _Response:")
        # Update accessible descriptions
        response_atk = self.responseText.get_accessible()
        response_atk.set_name(f"{ai_name} Response")
        response_atk.set_description(f"{ai_name}'s response to your question")
        # Update button descriptions
        self.askButton.get_accessible().set_description(f"Send your question to {ai_name}")
        self.contextButton.get_accessible().set_description(f"Ask {ai_name} about the currently focused window")
        self.actionButton.get_accessible().set_description(f"Request step-by-step instructions from {ai_name}")
        if hasattr(self, 'shareButton'):
            self.shareButton.get_accessible().set_description(f"Ask {ai_name} about the selected file")

    def get_question_text(self):
        """Get text from question text view"""
        buffer = self.questionText.get_buffer()
        start_iter = buffer.get_start_iter()
        end_iter = buffer.get_end_iter()
        return buffer.get_text(start_iter, end_iter, False)

    def set_response_text(self, text):
        """Set text in response text view (replaces existing content)."""
        buffer = self.responseText.get_buffer()
        buffer.set_text(text)

    def append_response_text(self, text):
        """Append text to response text view, separated by a blank line."""
        buffer = self.responseText.get_buffer()
        end_iter = buffer.get_end_iter()
        buffer.insert(end_iter, "\n\n" + text)

    def show_processing(self, provider_type):
        """Show processing message and disable the ask buttons."""
        # Show specific model name if available
        if hasattr(self, 'current_processing_model') and self.current_processing_model:
            if provider_type == 'ollama':
                ai_name = f"Ollama ({self.current_processing_model})"
            else:
                ai_name = self.get_current_ai_name()
        else:
            ai_name = self.get_current_ai_name()
        self.set_response_text(f"{ai_name} is processing your request...")
        self.askButton.set_sensitive(False)
        self.contextButton.set_sensitive(False)
        self.actionButton.set_sensitive(False)
        # Play processing sound (audio cue for screen-reader users; requires
        # the SoX 'play' binary — a missing binary is silently ignored by
        # capture_output swallowing nothing, but the run itself would raise:
        # NOTE(review) confirm SoX is a documented dependency)
        subprocess.run(['play', '-qnG', 'synth', '0.1', 'sin', '800'], capture_output=True)

    def hide_processing(self):
        """Hide processing message and re-enable buttons"""
        self.askButton.set_sensitive(True)
        self.contextButton.set_sensitive(True)
        self.actionButton.set_sensitive(True)
        # Play completion sound
        subprocess.run(['play', '-qnG', 'synth', '0.05', 'sin', '1200'], capture_output=True)

    def send_ai_request(self, message, context=None, image_path=None):
        """Send request to selected AI provider.

        Blocking call (run from worker threads by the button handlers);
        returns the provider's response text or an error string.
        """
        provider = self.config.get('provider')
        # Add neutral system context to avoid AI making assumptions
        if context is None and not image_path:
            system_context = "You are a helpful AI assistant. Please provide a direct and helpful response to the user's question without making assumptions about their specific use case or technical setup."
        else:
            system_context = context
        # Store which model is being used for status display
        self.current_processing_model = None
        if provider == 'claude-code':
            if not self.claudeInterface.is_available():
                return "Error: Claude Code is not available. Please install or configure Claude Code."
            self.show_processing("claude-code")
            try:
                response = self.claudeInterface.send_message(message, system_context, image_path)
                return response
            finally:
                # Always re-enable the UI, even if send_message raises.
                self.hide_processing()
        elif provider == 'ollama':
            if not self.ollamaInterface.is_available():
                return "Error: Ollama is not available. Please start Ollama service."
# Choose model based on whether we have an image if image_path: model = self.config.get('ollama_vision_model', 'llava') # Verify the vision model is available available_models = self.ollamaInterface.get_models() if model not in available_models: vision_models = self.ollamaInterface.get_vision_models() if vision_models: model = vision_models[0] # Use first available vision model else: return "Error: No vision models available for image processing. Please install a vision model like llava." else: model = self.config.get('ollama_model') # Store the actual model being used for status display self.current_processing_model = model self.show_processing("ollama") try: response = self.ollamaInterface.send_message(message, model, system_context, image_path) return response finally: self.hide_processing() return "Error: No AI provider configured" def on_ask_question(self, widget): """Handle ask question button click""" question = self.get_question_text().strip() if not question: self.set_response_text("Please enter a question first.") return def ask_in_thread(): response = self.send_ai_request(question) GLib.idle_add(self.set_response_text, response) threading.Thread(target=ask_in_thread, daemon=True).start() def on_ask_with_context(self, widget): """Handle ask with context button click""" question = self.get_question_text().strip() if not question: self.set_response_text("Please enter a question first.") return def ask_with_context_in_thread(): context = self.windowContext.get_focused_window_info() response = self.send_ai_request(question, context) GLib.idle_add(self.set_response_text, response) threading.Thread(target=ask_with_context_in_thread, daemon=True).start() def on_request_action(self, widget): """Handle request action button click""" question = self.get_question_text().strip() if not question: self.set_response_text("Please enter an action request first.") return # Add action context to the request action_prompt = f"Please provide step-by-step instructions for: 
{question}\n\nFormat your response as a numbered list of specific actions I should take." def request_action_in_thread(): response = self.send_ai_request(action_prompt) # Show confirmation dialog if enabled if self.config.get('confirm_actions') == 'true': GLib.idle_add(self.show_action_confirmation, response) else: GLib.idle_add(self.set_response_text, response) threading.Thread(target=request_action_in_thread, daemon=True).start() def show_action_confirmation(self, response): """Show confirmation dialog for AI actions""" dialog = Gtk.MessageDialog( transient_for=self, flags=0, message_type=Gtk.MessageType.QUESTION, buttons=Gtk.ButtonsType.YES_NO, text="AI Action Confirmation" ) dialog.format_secondary_text( f"The AI has provided the following action plan:\n\n{response}\n\n" "Do you want to proceed with these actions?" ) response_id = dialog.run() dialog.destroy() if response_id == Gtk.ResponseType.YES: self.set_response_text(f"Action plan approved:\n\n{response}") else: self.set_response_text("Action cancelled by user.") def on_browse_file(self, widget): """Handle browse file button click""" dialog = Gtk.FileChooserDialog( title="Select file to share", parent=self, action=Gtk.FileChooserAction.OPEN ) dialog.add_buttons( Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL, Gtk.STOCK_OPEN, Gtk.ResponseType.OK ) response = dialog.run() if response == Gtk.ResponseType.OK: filename = dialog.get_filename() self.fileEntry.set_text(filename) dialog.destroy() def on_ask_about_file(self, widget): """Handle ask about file button click""" file_path = self.fileEntry.get_text().strip() if not file_path or not os.path.exists(file_path): self.set_response_text("Please select a valid file first.") return question = self.get_question_text().strip() if not question: question = "Please analyze this file and tell me what it does." 
def ask_about_file_in_thread(): # For text files, read content and add to message if file_path.lower().endswith(('.txt', '.py', '.sh', '.conf', '.md', '.json', '.xml', '.html')): try: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: file_content = f.read() file_question = f"{question}\n\nFile: {file_path}\nContent:\n{file_content}" response = self.send_ai_request(file_question) except Exception as e: response = f"Error reading file: {str(e)}" else: # For other files (including images), use Claude Code's file handling response = self.send_ai_request(question, image_path=file_path) GLib.idle_add(self.set_response_text, response) threading.Thread(target=ask_about_file_in_thread, daemon=True).start() def on_describe_image(self, widget): """Handle describe screenshot button click""" def describe_image_in_thread(): # Take screenshot temp_dir = tempfile.mkdtemp() screenshot_path = os.path.join(temp_dir, 'screenshot.png') try: # Use scrot to take screenshot result = subprocess.run(['scrot', screenshot_path], capture_output=True, text=True, timeout=10) if result.returncode != 0: GLib.idle_add(self.set_response_text, "Error: Could not take screenshot") return # Send to AI for description response = self.send_ai_request( "Please describe what you see in this screenshot in detail. 
" "Focus on any text, interface elements, and visual content.", image_path=screenshot_path ) GLib.idle_add(self.set_response_text, response) except Exception as e: GLib.idle_add(self.set_response_text, f"Error taking screenshot: {str(e)}") finally: # Clean up temp file try: os.unlink(screenshot_path) os.rmdir(temp_dir) except: pass threading.Thread(target=describe_image_in_thread, daemon=True).start() def on_analyze_selected(self, widget): """Handle analyze selected text/screen content button click""" def analyze_selected_in_thread(): try: # First, try to get clipboard content (selected text) # Use wl-paste on Wayland, xclip on X11 if os.environ.get('WAYLAND_DISPLAY'): clipboard_result = subprocess.run(['wl-paste', '-p'], capture_output=True, text=True, timeout=5) else: clipboard_result = subprocess.run(['xclip', '-o', '-selection', 'primary'], capture_output=True, text=True, timeout=5) selected_text = clipboard_result.stdout.strip() if clipboard_result.returncode == 0 else "" if selected_text: # We have selected text, analyze it question = self.get_question_text().strip() if not question: question = "Please analyze this selected text and tell me what it means or what I should know about it." 
full_question = f"{question}\n\nSelected text: {selected_text}" response = self.send_ai_request(full_question) else: # No selected text, fallback to OCR of current screen # Take screenshot first temp_dir = tempfile.mkdtemp() screenshot_path = os.path.join(temp_dir, 'screen_analysis.png') try: # Take screenshot scrot_result = subprocess.run(['scrot', screenshot_path], capture_output=True, text=True, timeout=10) if scrot_result.returncode != 0: GLib.idle_add(self.set_response_text, "Error: Could not capture screen content") return # Try OCR first to get text content try: from PIL import Image import pytesseract image = Image.open(screenshot_path) ocr_text = pytesseract.image_to_string(image).strip() if ocr_text: # We found text via OCR question = self.get_question_text().strip() if not question: question = "Please analyze this text content and tell me what's important or what I should know about it." full_question = f"{question}\n\nScreen text content: {ocr_text}" response = self.send_ai_request(full_question) else: # No text found, do visual analysis question = self.get_question_text().strip() if not question: question = "Please analyze this screen content and tell me what you see." response = self.send_ai_request(question, image_path=screenshot_path) except ImportError: # Fallback to AI image analysis without OCR question = self.get_question_text().strip() if not question: question = "Please analyze this screen content and tell me what you see." 
response = self.send_ai_request(question, image_path=screenshot_path) finally: # Clean up temp file try: os.unlink(screenshot_path) os.rmdir(temp_dir) except: pass GLib.idle_add(self.set_response_text, response) except Exception as e: GLib.idle_add(self.set_response_text, f"Error analyzing content: {str(e)}") threading.Thread(target=analyze_selected_in_thread, daemon=True).start() def speak_text(self, text): """Use spd-say to speak text if voice output is enabled""" if self.config.get('voice_output') == 'true': try: subprocess.run(['spd-say', '-P', 'important', text], capture_output=True, timeout=30) except Exception as e: print(f"Error speaking text: {e}") def update_voice_status(self, message): """Update voice status label""" GLib.idle_add(self.voiceStatus.set_text, message) def on_voice_question(self, widget): """Handle voice question button click""" if not self.voiceRecognition.is_available(): self.set_response_text("Voice recognition not available. Please install python-speech-recognition and python-pyaudio.") return def voice_question_thread(): try: self.update_voice_status("🎤 Listening...") # Play recording start sound subprocess.run(['play', '-qnG', 'synth', '0.1', 'sin', '1000', 'vol', '0.3'], capture_output=True) timeout = int(self.config.get('voice_timeout', '5')) recognized_text = self.voiceRecognition.recognize_speech(timeout=timeout) # Play recording end sound subprocess.run(['play', '-qnG', 'synth', '0.05', 'sin', '1200', 'vol', '0.3'], capture_output=True) if recognized_text.startswith("Error:") or recognized_text.startswith("Sorry,"): self.update_voice_status(recognized_text) self.speak_text(recognized_text) return # Set the recognized text in the question field GLib.idle_add(self.set_question_text, recognized_text) self.update_voice_status(f"Recognized: {recognized_text}") # Automatically send the question to AI response = self.send_ai_request(recognized_text) GLib.idle_add(self.set_response_text, response) # Speak the response if enabled 
self.speak_text(response) except Exception as e: error_msg = f"Voice recognition error: {str(e)}" self.update_voice_status(error_msg) GLib.idle_add(self.set_response_text, error_msg) finally: self.update_voice_status("") threading.Thread(target=voice_question_thread, daemon=True).start() def on_toggle_continuous_listening(self, widget): """Handle continuous listening toggle""" if not self.voiceRecognition.is_available(): widget.set_active(False) self.set_response_text("Voice recognition not available.") return if widget.get_active(): self.start_continuous_listening() else: self.stop_continuous_listening() def start_continuous_listening(self): """Start continuous listening for wake word""" if self.continuousListening: return self.continuousListening = True self.stopListening.clear() self.update_voice_status("👂 Listening for wake word...") wake_word = self.config.get('wake_word', 'hey assistant').lower() def wake_word_callback(text): if wake_word in text: GLib.idle_add(self.on_wake_word_detected) def continuous_listening_thread(): self.voiceRecognition.recognize_speech_continuous(wake_word_callback, self.stopListening) self.listeningThread = threading.Thread(target=continuous_listening_thread, daemon=True) self.listeningThread.start() def stop_continuous_listening(self): """Stop continuous listening""" if not self.continuousListening: return self.continuousListening = False self.stopListening.set() self.update_voice_status("") if self.listeningThread: self.listeningThread.join(timeout=2) def on_wake_word_detected(self): """Handle wake word detection""" ai_name = self.get_current_ai_name() self.speak_text("Yes, what can I help you with?") self.update_voice_status(f"🎤 Wake word detected, listening for {ai_name}...") # Play wake word detection sound subprocess.run(['play', '-qnG', 'synth', '0.1', 'sin', '800', 'vol', '0.4'], capture_output=True) def wake_response_thread(): try: timeout = int(self.config.get('voice_timeout', '5')) recognized_text = 
self.voiceRecognition.recognize_speech(timeout=timeout) if recognized_text.startswith("Error:") or recognized_text.startswith("Sorry,"): self.update_voice_status("") self.speak_text("I didn't catch that. Please try again.") return # Process the question GLib.idle_add(self.set_question_text, recognized_text) response = self.send_ai_request(recognized_text) GLib.idle_add(self.set_response_text, response) # Speak the response self.speak_text(response) except Exception as e: self.speak_text("Sorry, there was an error processing your question.") finally: self.update_voice_status("👂 Listening for wake word...") threading.Thread(target=wake_response_thread, daemon=True).start() def set_question_text(self, text): """Set text in question text view""" buffer = self.questionText.get_buffer() buffer.set_text(text) def on_key_press(self, widget, event): """Handle keyboard shortcuts""" keyval = event.keyval state = event.state & (Gdk.ModifierType.CONTROL_MASK | Gdk.ModifierType.MOD1_MASK) # Ctrl+Tab to switch tabs if (state & Gdk.ModifierType.CONTROL_MASK) and keyval == Gdk.KEY_Tab: current = self.notebook.get_current_page() next_page = (current + 1) % self.notebook.get_n_pages() self.notebook.set_current_page(next_page) return True # Ctrl+Shift+Tab to switch tabs backwards if (state & Gdk.ModifierType.CONTROL_MASK) and keyval == Gdk.KEY_ISO_Left_Tab: current = self.notebook.get_current_page() prev_page = (current - 1) % self.notebook.get_n_pages() self.notebook.set_current_page(prev_page) return True # F4 for voice input (accessibility shortcut) if keyval == Gdk.KEY_F4: if self.voiceRecognition.is_available(): self.on_voice_question(None) return True # F5 to toggle continuous listening if keyval == Gdk.KEY_F5: if self.voiceRecognition.is_available(): self.listenToggle.set_active(not self.listenToggle.get_active()) return True # Ctrl+S to save settings (when on settings tab) if (state & Gdk.ModifierType.CONTROL_MASK) and keyval == Gdk.KEY_s: if self.notebook.get_current_page() 
== 1: # Settings tab self.on_save_settings(None) return True # Escape to close program if keyval == Gdk.KEY_Escape: self.cleanup() Gtk.main_quit() return True return False def on_textview_key_press(self, widget, event): """Handle key press events in text views for better navigation""" keyval = event.keyval state = event.state & Gdk.ModifierType.CONTROL_MASK # Ctrl+Enter to submit question if state and keyval == Gdk.KEY_Return: self.on_ask_question(None) return True return False def set_initial_focus(self): """Set initial focus to the question text input""" self.questionText.grab_focus() return False # Don't repeat this idle callback def on_tab_switched(self, notebook, page, page_num): """Handle tab switching to set proper focus""" if page_num == 0: # Interaction tab GLib.idle_add(lambda: self.questionText.grab_focus()) elif page_num == 1: # Settings tab # Focus the first radio button in settings GLib.idle_add(lambda: self.claudeRadio.grab_focus()) def cleanup(self): """Cleanup voice resources on exit""" self.stop_continuous_listening() if self.voiceRecognition: self.voiceRecognition.stop_recording() def main(): """Main entry point""" app = AiAssistant() app.show_all() # Play startup sound subprocess.run(['play', '-qnG', 'synth', '0.1', 'sin', '1000'], capture_output=True) # Connect cleanup on destroy app.connect("destroy", lambda w: app.cleanup()) try: Gtk.main() except KeyboardInterrupt: app.cleanup() if __name__ == '__main__': main()