1678 lines
70 KiB
Python
Executable File
1678 lines
70 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
AI Assistant Interface
|
|
Provides accessibility-focused AI interaction with multiple providers
|
|
"""
|
|
|
|
import configparser
import json
import os
import shutil
import subprocess
import tempfile
import threading
import time
import wave
from pathlib import Path

import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk, GLib, Gdk

import i3ipc
import pyaudio
import requests
|
|
|
|
class SystemCommands:
    """Check availability of required system commands."""

    @staticmethod
    def is_command_available(command):
        """Return True if *command* resolves to an executable on PATH.

        Uses shutil.which instead of spawning a `which` subprocess per
        lookup — same result, no process overhead, no dependency on a
        `which` binary being installed.
        """
        return shutil.which(command) is not None

    @staticmethod
    def check_dependencies():
        """Check for required system commands and return missing ones.

        Returns:
            tuple[dict, dict]: (missing_required, missing_optional), each
            mapping a missing command name to a human-readable reason.
        """
        required_commands = {
            'scrot': 'Required for screenshots',
            'play': 'Required for audio feedback (sox package)',
            'spd-say': 'Required for text-to-speech output',
        }

        optional_commands = {
            'xclip': 'Required for clipboard on X11',
            'wl-paste': 'Required for clipboard on Wayland',
            'tesseract': 'Required for OCR functionality',
        }

        missing_required = {
            cmd: desc for cmd, desc in required_commands.items()
            if not SystemCommands.is_command_available(cmd)
        }
        missing_optional = {
            cmd: desc for cmd, desc in optional_commands.items()
            if not SystemCommands.is_command_available(cmd)
        }

        return missing_required, missing_optional
|
|
|
|
class VoiceRecognition:
    """Voice recognition system for AI assistant.

    Wraps the optional ``speech_recognition`` package.  If that package is
    not installed, the object still constructs successfully but reports
    itself unavailable via :meth:`is_available`, and all recognition
    methods degrade to returning error strings / no-ops.
    """

    def __init__(self, config):
        # config: AiConfig instance (kept for settings access by callers).
        self.config = config
        self.is_recording = False
        self.audio = None     # pyaudio.PyAudio instance while recording
        self.stream = None    # pyaudio stream handle, if one is opened

        # Audio settings
        # NOTE(review): these raw-audio parameters are not consumed by the
        # speech_recognition-based methods below, which manage their own
        # capture via sr.Microphone — presumably kept for a lower-level
        # PyAudio recording path; confirm before removing.
        self.sample_rate = 16000
        self.chunk_size = 1024
        self.audio_format = pyaudio.paInt16
        self.channels = 1

        try:
            # Imported lazily so the app still runs without the package.
            import speech_recognition as sr
            self.recognizer = sr.Recognizer()
            self.microphone = sr.Microphone()
            self.sr_available = True

            # Adjust for ambient noise (one-time calibration; briefly
            # opens the microphone during construction).
            with self.microphone as source:
                self.recognizer.adjust_for_ambient_noise(source)
        except ImportError:
            self.sr_available = False
            self.recognizer = None
            self.microphone = None

    def is_available(self):
        """Check if voice recognition is available"""
        return self.sr_available

    def start_recording(self):
        """Start recording audio.

        Returns True on success, False if speech recognition is missing or
        PyAudio could not be initialized.  NOTE(review): only creates the
        PyAudio instance and sets the flag — no stream is opened here;
        confirm whether a capture stream was intended.
        """
        if not self.sr_available:
            return False

        try:
            self.audio = pyaudio.PyAudio()
            self.is_recording = True
            return True
        except Exception as e:
            print(f"Error starting recording: {e}")
            return False

    def stop_recording(self):
        """Stop recording audio and release the stream/PyAudio handles."""
        self.is_recording = False
        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
            self.stream = None
        if self.audio:
            self.audio.terminate()
            self.audio = None

    def recognize_speech(self, timeout=5, phrase_timeout=1):
        """Recognize a single utterance from the microphone.

        Args:
            timeout: seconds to wait for speech to start.
            phrase_timeout: max seconds of speech to capture.

        Returns:
            The recognized text, or a human-readable error string (this
            method never raises — errors are reported as strings so they
            can be shown/spoken directly to the user).
        """
        if not self.sr_available:
            return "Error: Speech recognition not available. Install python-speech-recognition."

        try:
            # sr_available guarantees this import succeeds, so the
            # `except sr...` clauses below can safely reference `sr`.
            import speech_recognition as sr

            with self.microphone as source:
                # Listen for audio with timeout
                audio = self.recognizer.listen(source, timeout=timeout, phrase_time_limit=phrase_timeout)

            # Try to recognize speech using Google Speech Recognition
            try:
                text = self.recognizer.recognize_google(audio)
                return text
            except sr.RequestError:
                # Online service unreachable — try offline recognition
                # (PocketSphinx) as fallback.
                try:
                    text = self.recognizer.recognize_sphinx(audio)
                    return text
                except sr.RequestError:
                    return "Error: Speech recognition service unavailable"
            except sr.UnknownValueError:
                return "Sorry, I couldn't understand that. Please try again."

        except sr.WaitTimeoutError:
            return "No speech detected. Please try again."
        except Exception as e:
            return f"Error during speech recognition: {str(e)}"

    def recognize_speech_continuous(self, callback, stop_event):
        """Continuous speech recognition for wake word detection.

        Listens in short (1s) chunks until *stop_event* is set, invoking
        *callback* with each lower-cased recognized phrase.  Intended to
        run on a background thread; recognition failures are ignored so
        listening keeps going.
        """
        if not self.sr_available:
            return

        try:
            import speech_recognition as sr

            with self.microphone as source:
                while not stop_event.is_set():
                    try:
                        # Listen for 1 second chunks
                        audio = self.recognizer.listen(source, timeout=1, phrase_time_limit=1)

                        try:
                            text = self.recognizer.recognize_google(audio)
                            callback(text.lower())
                        except (sr.UnknownValueError, sr.RequestError):
                            # Ignore recognition errors in continuous mode
                            pass
                    except sr.WaitTimeoutError:
                        # Normal timeout, continue listening
                        continue
                    except Exception as e:
                        print(f"Error in continuous recognition: {e}")
                        break
        except ImportError:
            print("Speech recognition not available")
            return
|
|
|
|
class AiConfig:
    """Configuration manager for AI settings with XDG directory support.

    Settings live in an INI file at
    ``$XDG_CONFIG_HOME/stormux/I38/ai.conf`` (falling back to
    ``~/.config/stormux/I38/ai.conf``) under a single ``[ai]`` section.
    All values are stored as strings.
    """

    # Defaults applied for any key missing from the config file.
    # Hoisted to a class constant so they are visible/testable without
    # instantiating the class.
    DEFAULTS = {
        'provider': 'claude-code',
        'ollama_model': 'llama2',
        'ollama_vision_model': 'llava',
        'ollama_host': 'http://localhost:11434',
        'confirm_actions': 'true',
        'voice_enabled': 'false',
        'voice_output': 'true',
        'wake_word': 'hey assistant',
        'voice_timeout': '5',
        'continuous_listening': 'false',
    }

    def __init__(self):
        # Honor XDG_CONFIG_HOME, falling back to ~/.config.
        self.configDir = Path(os.environ.get('XDG_CONFIG_HOME',
                              os.path.expanduser('~/.config'))) / 'stormux' / 'I38'
        self.configFile = self.configDir / 'ai.conf'
        self.configDir.mkdir(parents=True, exist_ok=True)
        self.load_config()

    def load_config(self):
        """Load configuration from file, filling in defaults for missing keys."""
        self.config = configparser.ConfigParser()
        # configparser.read() silently ignores a missing file, so a fresh
        # install just starts from the defaults below.  Explicit encoding
        # avoids locale-dependent decoding of the config file.
        self.config.read(self.configFile, encoding='utf-8')

        # Set defaults if sections don't exist
        if 'ai' not in self.config:
            self.config.add_section('ai')

        for key, value in self.DEFAULTS.items():
            if key not in self.config['ai']:
                self.config.set('ai', key, value)

    def save_config(self):
        """Persist the current configuration to disk (UTF-8)."""
        with open(self.configFile, 'w', encoding='utf-8') as f:
            self.config.write(f)

    def get(self, key, fallback=None):
        """Return the value for *key* from the [ai] section (string)."""
        return self.config.get('ai', key, fallback=fallback)

    def set(self, key, value):
        """Set *key* to *value* (stringified) and immediately save to disk."""
        self.config.set('ai', key, str(value))
        self.save_config()
|
|
|
|
class OllamaInterface:
    """Interface for the Ollama AI provider (local REST API)."""

    # Substrings identifying models that accept image input.  Shared by
    # get_vision_models() and is_vision_model() — previously each method
    # kept its own copy of this list, which could silently drift apart.
    VISION_PATTERNS = ('llava', 'llama3.2-vision', 'minicpm-v', 'bakllava', 'moondream')

    def __init__(self, host='http://localhost:11434'):
        # Base URL of the Ollama HTTP API.
        self.host = host

    def get_models(self):
        """Return names of all installed Ollama models ([] on any failure)."""
        try:
            response = requests.get(f'{self.host}/api/tags', timeout=5)
            if response.status_code == 200:
                models = response.json().get('models', [])
                return [model['name'] for model in models]
        except Exception as e:
            print(f"Error getting Ollama models: {e}")
        # Reached on non-200 status or on any exception.
        return []

    def get_vision_models(self):
        """Return the subset of installed models that can handle images."""
        return [model for model in self.get_models() if self.is_vision_model(model)]

    def is_vision_model(self, model_name):
        """Return True if *model_name* matches a known vision-model pattern."""
        if not model_name:
            return False
        model_lower = model_name.lower()
        return any(pattern in model_lower for pattern in self.VISION_PATTERNS)

    def is_available(self):
        """Check if Ollama is running and reachable at self.host."""
        try:
            response = requests.get(f'{self.host}/api/tags', timeout=3)
            return response.status_code == 200
        except (requests.RequestException, ConnectionError, OSError):
            return False

    def send_message(self, message, model, context=None, image_path=None):
        """Send *message* to Ollama and return the response text.

        Args:
            message: user prompt.
            model: Ollama model name.
            context: optional system prompt; the generic default prompt
                ("You are a helpful AI assistant...") is deliberately not
                forwarded since it adds no information.
            image_path: optional path to an image to attach (requires a
                vision-capable model).

        Returns:
            The model's response text, or a human-readable error string
            (this method never raises).
        """
        try:
            data = {
                'model': model,
                'prompt': message,
                'stream': False
            }

            if context and not context.startswith("You are a helpful AI assistant"):
                data['system'] = context

            # Handle image if provided
            if image_path and os.path.exists(image_path):
                import base64

                # Refuse early if the selected model cannot process images.
                if not self.is_vision_model(model):
                    return f"Error: Model '{model}' cannot process images. Please select a vision model like llava or llama3.2-vision in settings."

                # The Ollama API expects images as base64-encoded strings.
                try:
                    with open(image_path, 'rb') as image_file:
                        image_data = base64.b64encode(image_file.read()).decode('utf-8')
                    data['images'] = [image_data]
                except Exception as e:
                    return f"Error reading image: {str(e)}"

            response = requests.post(f'{self.host}/api/generate',
                                     json=data, timeout=60)  # Longer timeout for image processing
            if response.status_code == 200:
                return response.json().get('response', 'No response received')
            else:
                return f"Error: HTTP {response.status_code}"
        except Exception as e:
            return f"Error communicating with Ollama: {str(e)}"
|
|
|
|
class ClaudeCodeInterface:
    """Interface for the Claude Code CLI AI provider."""

    def is_available(self):
        """Return True when the `claude` CLI is installed and responds to --version."""
        try:
            probe = subprocess.run(['claude', '--version'],
                                   capture_output=True, text=True, timeout=5)
        except (subprocess.SubprocessError, FileNotFoundError, OSError):
            return False
        return probe.returncode == 0

    def send_message(self, message, context=None, image_path=None):
        """Send *message* to the Claude Code CLI.

        Returns the CLI's stdout on success, otherwise a human-readable
        error string (never raises).
        """
        try:
            # Prepend meaningful context; the generic default prompt
            # ("You are a helpful AI assistant...") carries no information
            # and is deliberately dropped.
            if context and not context.startswith("You are a helpful AI assistant"):
                message = f"Context: {context}\n\n{message}"

            command = ['claude']
            if image_path and os.path.exists(image_path):
                command += ['--image', image_path]
            command.append(message)

            # Run from $HOME so the CLI doesn't pick up project context
            # from the current working directory.
            proc = subprocess.run(command, capture_output=True, text=True,
                                  timeout=60, cwd=os.path.expanduser('~'))

            if proc.returncode != 0:
                return f"Error: {proc.stderr.strip()}"
            return proc.stdout.strip()
        except subprocess.TimeoutExpired:
            return "Error: Request timed out"
        except Exception as e:
            return f"Error communicating with Claude Code: {str(e)}"
|
|
|
|
class WindowContext:
    """Get context information from the focused window via i3 IPC."""

    def __init__(self):
        # Connecting may fail when no i3 socket is present (e.g. running
        # under a different window manager); degrade to self.i3 = None so
        # get_focused_window_info() can report it gracefully.
        try:
            self.i3 = i3ipc.Connection()
        except Exception:
            # Fix: the previous tuple (ConnectionError, FileNotFoundError,
            # Exception) was redundant — Exception already covers both.
            self.i3 = None

    def get_focused_window_info(self):
        """Return a human-readable summary of the currently focused window.

        Never raises: returns an explanatory string when i3 is unavailable,
        no window is focused, or the query fails.
        """
        if not self.i3:
            return "Unable to connect to i3"

        try:
            tree = self.i3.get_tree()
            focused = tree.find_focused()

            if not focused:
                return "No focused window found"

            info = {
                'name': focused.name or 'Unknown',
                'class': getattr(focused, 'window_class', 'Unknown'),
                'title': getattr(focused, 'window_title', 'Unknown'),
                'workspace': focused.workspace().name if focused.workspace() else 'Unknown'
            }

            # Only name and class are surfaced in the summary; title and
            # workspace are collected but currently unused.
            return f"Current application: {info['name']}\nWindow type: {info['class']}"
        except Exception as e:
            return f"Error getting window info: {str(e)}"
|
|
|
|
class AiAssistant(Gtk.Window):
|
|
"""Main AI Assistant window with accessibility features"""
|
|
|
|
    def __init__(self):
        """Build the assistant window: backends, voice state, and tab UI."""
        super().__init__(title="AI Assistant")

        # Initialize components (config first — the interfaces read it).
        self.config = AiConfig()
        self.claudeInterface = ClaudeCodeInterface()
        self.ollamaInterface = OllamaInterface(self.config.get('ollama_host'))
        self.windowContext = WindowContext()
        self.voiceRecognition = VoiceRecognition(self.config)

        # Voice mode state (background wake-word listener thread control)
        self.continuousListening = False
        self.listeningThread = None
        self.stopListening = threading.Event()

        # Window setup
        self.set_default_size(600, 500)
        self.set_position(Gtk.WindowPosition.CENTER)
        self.connect("destroy", Gtk.main_quit)
        self.connect("key-press-event", self.on_key_press)

        # Enable accessibility
        self.set_can_focus(True)
        self.set_focus_on_map(True)

        # Create notebook for tabs
        self.notebook = Gtk.Notebook()
        self.notebook.set_tab_pos(Gtk.PositionType.TOP)
        self.notebook.set_can_focus(True)
        self.notebook.set_scrollable(True)
        self.notebook.connect("switch-page", self.on_tab_switched)
        self.add(self.notebook)

        # Create tabs (interaction first so it gets index 0 below)
        self.create_interaction_tab()
        self.create_settings_tab()

        # Set focus to interaction tab and initial focus
        self.notebook.set_current_page(0)

        # Set initial focus to question text after window is shown
        # (deferred via idle_add so it runs once GTK has mapped the window).
        GLib.idle_add(self.set_initial_focus)

        # Update button labels with current AI provider
        GLib.idle_add(self.update_button_labels)
|
|
|
|
|
|
    def create_interaction_tab(self):
        """Create the main interaction tab.

        Builds (top to bottom): question input, action buttons, voice input
        controls, file sharing row, screenshot/OCR buttons, and the
        read-only response view.  Every interactive widget gets ATK
        name/description metadata and mnemonic links for screen readers.
        """
        # Create main container
        vbox = Gtk.VBox(spacing=10)
        vbox.set_border_width(10)

        # Add question section
        questionLabel = Gtk.Label("_Ask AI a question:")
        questionLabel.set_use_underline(True)
        questionLabel.set_alignment(0, 0.5)
        vbox.pack_start(questionLabel, False, False, 0)

        # Question text view with scrolling
        scrollWindow = Gtk.ScrolledWindow()
        scrollWindow.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.AUTOMATIC)
        scrollWindow.set_size_request(-1, 100)
        scrollWindow.set_can_focus(True)

        self.questionText = Gtk.TextView()
        self.questionText.set_wrap_mode(Gtk.WrapMode.WORD)
        self.questionText.set_can_focus(True)
        self.questionText.set_accepts_tab(False)  # Allow Tab to move focus instead of inserting tab

        # Set accessibility properties
        atk_obj = self.questionText.get_accessible()
        atk_obj.set_name("Question input")
        atk_obj.set_description("Enter your question for the AI assistant here")

        # Link label to text view for screen readers
        questionLabel.set_mnemonic_widget(self.questionText)

        # Connect key press event for additional navigation
        self.questionText.connect("key-press-event", self.on_textview_key_press)

        scrollWindow.add(self.questionText)
        vbox.pack_start(scrollWindow, False, False, 0)

        # Action buttons row
        buttonBox = Gtk.HBox(spacing=10)

        self.askButton = Gtk.Button("Ask _Question")
        self.askButton.set_use_underline(True)
        self.askButton.connect("clicked", self.on_ask_question)
        self.askButton.set_can_focus(True)
        self.askButton.get_accessible().set_description("Send your question to the AI assistant")
        buttonBox.pack_start(self.askButton, True, True, 0)

        self.contextButton = Gtk.Button("Ask About _Window")
        self.contextButton.set_use_underline(True)
        self.contextButton.connect("clicked", self.on_ask_with_context)
        self.contextButton.set_can_focus(True)
        self.contextButton.get_accessible().set_description("Ask about the currently focused window")
        buttonBox.pack_start(self.contextButton, True, True, 0)

        self.actionButton = Gtk.Button("Request _Action")
        self.actionButton.set_use_underline(True)
        self.actionButton.connect("clicked", self.on_request_action)
        self.actionButton.set_can_focus(True)
        self.actionButton.get_accessible().set_description("Request step-by-step instructions from AI")
        buttonBox.pack_start(self.actionButton, True, True, 0)

        vbox.pack_start(buttonBox, False, False, 0)

        # Voice input section — both controls are disabled up-front when
        # speech recognition is not installed.
        voiceFrame = Gtk.Frame(label="Voice Input")
        voiceBox = Gtk.HBox(spacing=10)
        voiceBox.set_border_width(10)

        self.voiceButton = Gtk.Button("🎤 _Voice Question")
        self.voiceButton.set_use_underline(True)
        self.voiceButton.connect("clicked", self.on_voice_question)
        self.voiceButton.set_can_focus(True)
        self.voiceButton.set_sensitive(self.voiceRecognition.is_available())
        self.voiceButton.get_accessible().set_description("Record your question using voice input")
        voiceBox.pack_start(self.voiceButton, True, True, 0)

        self.listenToggle = Gtk.ToggleButton("👂 _Continuous Listen")
        self.listenToggle.set_use_underline(True)
        self.listenToggle.connect("toggled", self.on_toggle_continuous_listening)
        self.listenToggle.set_can_focus(True)
        self.listenToggle.set_sensitive(self.voiceRecognition.is_available())
        self.listenToggle.get_accessible().set_description("Toggle continuous listening for wake word")
        voiceBox.pack_start(self.listenToggle, True, True, 0)

        # Voice status label
        self.voiceStatus = Gtk.Label("")
        voiceBox.pack_start(self.voiceStatus, False, False, 0)

        voiceFrame.add(voiceBox)
        vbox.pack_start(voiceFrame, False, False, 0)

        # File sharing section
        fileLabel = Gtk.Label("Share _file with AI:")
        fileLabel.set_use_underline(True)
        fileLabel.set_alignment(0, 0.5)
        vbox.pack_start(fileLabel, False, False, 0)

        fileBox = Gtk.HBox(spacing=10)

        self.fileEntry = Gtk.Entry()
        self.fileEntry.set_placeholder_text("Select a file to share...")
        self.fileEntry.set_can_focus(True)
        self.fileEntry.get_accessible().set_name("File path")
        self.fileEntry.get_accessible().set_description("Path to file to share with AI")
        fileLabel.set_mnemonic_widget(self.fileEntry)
        fileBox.pack_start(self.fileEntry, True, True, 0)

        self.browseButton = Gtk.Button("_Browse")
        self.browseButton.set_use_underline(True)
        self.browseButton.connect("clicked", self.on_browse_file)
        self.browseButton.set_can_focus(True)
        self.browseButton.get_accessible().set_description("Browse for file to share")
        fileBox.pack_start(self.browseButton, False, False, 0)

        self.shareButton = Gtk.Button("Ask About _File")
        self.shareButton.set_use_underline(True)
        self.shareButton.connect("clicked", self.on_ask_about_file)
        self.shareButton.set_can_focus(True)
        self.shareButton.get_accessible().set_description("Ask AI about the selected file")
        fileBox.pack_start(self.shareButton, False, False, 0)

        vbox.pack_start(fileBox, False, False, 0)

        # Image description section
        self.imageButton = Gtk.Button("Describe _Screenshot")
        self.imageButton.set_use_underline(True)
        self.imageButton.connect("clicked", self.on_describe_image)
        self.imageButton.set_can_focus(True)
        self.imageButton.get_accessible().set_description("Take screenshot and get AI description")
        vbox.pack_start(self.imageButton, False, False, 0)

        # Selected text section
        self.selectedButton = Gtk.Button("Analyze _Selected Content")
        self.selectedButton.set_use_underline(True)
        self.selectedButton.connect("clicked", self.on_analyze_selected)
        self.selectedButton.set_can_focus(True)
        self.selectedButton.get_accessible().set_description("Analyze selected text or screen content using OCR")
        vbox.pack_start(self.selectedButton, False, False, 0)

        # Response section
        self.responseLabel = Gtk.Label("AI _Response:")
        self.responseLabel.set_use_underline(True)
        self.responseLabel.set_alignment(0, 0.5)
        vbox.pack_start(self.responseLabel, False, False, 0)

        # Response text view with scrolling (read-only; expands to fill
        # the remaining vertical space)
        responseScrollWindow = Gtk.ScrolledWindow()
        responseScrollWindow.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.AUTOMATIC)
        responseScrollWindow.set_can_focus(True)

        self.responseText = Gtk.TextView()
        self.responseText.set_wrap_mode(Gtk.WrapMode.WORD)
        self.responseText.set_editable(False)
        self.responseText.set_can_focus(True)
        self.responseText.set_accepts_tab(False)

        # Set accessibility properties for response
        response_atk = self.responseText.get_accessible()
        response_atk.set_name("AI Response")
        response_atk.set_description("AI assistant's response to your question")

        # Link response label to text view
        self.responseLabel.set_mnemonic_widget(self.responseText)

        responseScrollWindow.add(self.responseText)
        vbox.pack_start(responseScrollWindow, True, True, 0)

        # Add tab to notebook
        tabLabel = Gtk.Label("Interaction")
        self.notebook.append_page(vbox, tabLabel)

        # Set initial focus
        self.questionText.grab_focus()
|
|
|
|
    def create_settings_tab(self):
        """Create the settings tab.

        Builds frames for: AI provider selection (Claude Code vs Ollama),
        Ollama model/host settings, voice settings, general settings, and
        a save button with a status label.  Ends by loading current config
        values into the widgets.
        """
        # Create main container
        vbox = Gtk.VBox(spacing=15)
        vbox.set_border_width(15)
        vbox.set_can_focus(False)  # Container shouldn't steal focus

        # AI Provider section
        providerFrame = Gtk.Frame(label="AI Provider")
        providerBox = Gtk.VBox(spacing=10)
        providerBox.set_border_width(10)
        # Make sure the box itself doesn't interfere with focus
        providerBox.set_can_focus(False)

        # Claude Code option
        self.claudeRadio = Gtk.RadioButton.new_with_mnemonic(None, "_Claude Code")
        self.claudeRadio.connect("toggled", self.on_provider_changed)
        self.claudeRadio.set_can_focus(True)
        self.claudeRadio.get_accessible().set_description("Use Claude Code CLI as AI provider")
        providerBox.pack_start(self.claudeRadio, False, False, 0)

        # Ollama option (same radio group as claudeRadio)
        self.ollamaRadio = Gtk.RadioButton.new_with_mnemonic_from_widget(self.claudeRadio, "_Ollama")
        self.ollamaRadio.connect("toggled", self.on_provider_changed)
        self.ollamaRadio.set_can_focus(True)
        self.ollamaRadio.get_accessible().set_description("Use local Ollama service as AI provider")
        providerBox.pack_start(self.ollamaRadio, False, False, 0)

        providerFrame.add(providerBox)
        vbox.pack_start(providerFrame, False, False, 0)

        # Ollama settings (kept as an attribute so on_provider_changed can
        # toggle its sensitivity)
        self.ollamaFrame = Gtk.Frame(label="Ollama Settings")
        ollamaBox = Gtk.VBox(spacing=10)
        ollamaBox.set_border_width(10)

        # Text Models section
        modelLabel = Gtk.Label("Text Models:")
        modelLabel.set_alignment(0, 0.5)
        ollamaBox.pack_start(modelLabel, False, False, 0)

        # Container for text model radio buttons
        self.textModelBox = Gtk.VBox(spacing=5)
        self.textModelBox.set_border_width(10)
        ollamaBox.pack_start(self.textModelBox, False, False, 0)

        # Will be populated with radio buttons in refresh_ollama_models()
        self.textModelRadios = []
        self.textModelGroup = None

        # Vision Models section
        visionModelLabel = Gtk.Label("Vision Models:")
        visionModelLabel.set_alignment(0, 0.5)
        ollamaBox.pack_start(visionModelLabel, False, False, 0)

        # Container for vision model radio buttons
        self.visionModelBox = Gtk.VBox(spacing=5)
        self.visionModelBox.set_border_width(10)
        ollamaBox.pack_start(self.visionModelBox, False, False, 0)

        # Will be populated with radio buttons in refresh_ollama_models()
        self.visionModelRadios = []
        self.visionModelGroup = None

        # Refresh models button
        self.refreshButton = Gtk.Button("_Refresh Models")
        self.refreshButton.set_use_underline(True)
        self.refreshButton.connect("clicked", self.on_refresh_models)
        self.refreshButton.set_can_focus(True)
        self.refreshButton.get_accessible().set_description("Refresh the list of available Ollama models")
        ollamaBox.pack_start(self.refreshButton, False, False, 0)

        # Host entry
        hostLabel = Gtk.Label("Ollama _Host:")
        hostLabel.set_use_underline(True)
        hostLabel.set_alignment(0, 0.5)
        ollamaBox.pack_start(hostLabel, False, False, 0)

        self.hostEntry = Gtk.Entry()
        self.hostEntry.set_text(self.config.get('ollama_host'))
        self.hostEntry.set_can_focus(True)
        self.hostEntry.get_accessible().set_name("Ollama host URL")
        self.hostEntry.get_accessible().set_description("URL of the Ollama service")
        hostLabel.set_mnemonic_widget(self.hostEntry)
        ollamaBox.pack_start(self.hostEntry, False, False, 0)

        self.ollamaFrame.add(ollamaBox)
        vbox.pack_start(self.ollamaFrame, False, False, 0)

        # Voice settings
        self.voiceFrame = Gtk.Frame(label="Voice Settings")
        voiceSettingsBox = Gtk.VBox(spacing=10)
        voiceSettingsBox.set_border_width(10)

        self.voiceEnabledCheck = Gtk.CheckButton("Enable _voice input")
        self.voiceEnabledCheck.set_use_underline(True)
        self.voiceEnabledCheck.set_active(self.config.get('voice_enabled') == 'true')
        self.voiceEnabledCheck.set_sensitive(self.voiceRecognition.is_available())
        self.voiceEnabledCheck.set_can_focus(True)
        self.voiceEnabledCheck.get_accessible().set_description("Enable voice input for asking questions")
        voiceSettingsBox.pack_start(self.voiceEnabledCheck, False, False, 0)

        self.voiceOutputCheck = Gtk.CheckButton("Enable voice _output (speak responses)")
        self.voiceOutputCheck.set_use_underline(True)
        self.voiceOutputCheck.set_active(self.config.get('voice_output') == 'true')
        self.voiceOutputCheck.set_can_focus(True)
        self.voiceOutputCheck.get_accessible().set_description("Speak AI responses aloud using text-to-speech")
        voiceSettingsBox.pack_start(self.voiceOutputCheck, False, False, 0)

        # Wake word entry
        wakeWordLabel = Gtk.Label("_Wake word phrase:")
        wakeWordLabel.set_use_underline(True)
        wakeWordLabel.set_alignment(0, 0.5)
        voiceSettingsBox.pack_start(wakeWordLabel, False, False, 0)

        self.wakeWordEntry = Gtk.Entry()
        self.wakeWordEntry.set_text(self.config.get('wake_word'))
        self.wakeWordEntry.set_placeholder_text("e.g., 'hey assistant'")
        self.wakeWordEntry.set_can_focus(True)
        self.wakeWordEntry.get_accessible().set_name("Wake word phrase")
        self.wakeWordEntry.get_accessible().set_description("Phrase to activate voice listening")
        wakeWordLabel.set_mnemonic_widget(self.wakeWordEntry)
        voiceSettingsBox.pack_start(self.wakeWordEntry, False, False, 0)

        # Voice timeout
        timeoutLabel = Gtk.Label("Voice recognition _timeout (seconds):")
        timeoutLabel.set_use_underline(True)
        timeoutLabel.set_alignment(0, 0.5)
        voiceSettingsBox.pack_start(timeoutLabel, False, False, 0)

        self.timeoutSpin = Gtk.SpinButton.new_with_range(1, 30, 1)
        self.timeoutSpin.set_value(int(self.config.get('voice_timeout', '5')))
        self.timeoutSpin.set_can_focus(True)
        self.timeoutSpin.get_accessible().set_name("Voice timeout")
        self.timeoutSpin.get_accessible().set_description("How long to listen for speech in seconds")
        timeoutLabel.set_mnemonic_widget(self.timeoutSpin)
        voiceSettingsBox.pack_start(self.timeoutSpin, False, False, 0)

        # Voice status (informational only, not focusable)
        voiceStatusLabel = Gtk.Label("")
        if not self.voiceRecognition.is_available():
            voiceStatusLabel.set_text("Voice recognition unavailable - install python-speech-recognition and python-pyaudio")
            voiceStatusLabel.set_line_wrap(True)
        else:
            voiceStatusLabel.set_text("Voice recognition available")
        voiceSettingsBox.pack_start(voiceStatusLabel, False, False, 0)

        self.voiceFrame.add(voiceSettingsBox)
        vbox.pack_start(self.voiceFrame, False, False, 0)

        # General settings
        generalFrame = Gtk.Frame(label="General Settings")
        generalBox = Gtk.VBox(spacing=10)
        generalBox.set_border_width(10)

        self.confirmCheck = Gtk.CheckButton("_Confirm AI actions before execution")
        self.confirmCheck.set_use_underline(True)
        self.confirmCheck.set_active(self.config.get('confirm_actions') == 'true')
        self.confirmCheck.set_can_focus(True)
        self.confirmCheck.get_accessible().set_description("Show confirmation dialog before executing AI suggested actions")
        generalBox.pack_start(self.confirmCheck, False, False, 0)

        generalFrame.add(generalBox)
        vbox.pack_start(generalFrame, False, False, 0)

        # Save button
        self.saveButton = Gtk.Button("_Save Settings")
        self.saveButton.set_use_underline(True)
        self.saveButton.connect("clicked", self.on_save_settings)
        self.saveButton.set_can_focus(True)
        self.saveButton.get_accessible().set_description("Save all configuration changes")
        vbox.pack_start(self.saveButton, False, False, 0)

        # Status label
        self.statusLabel = Gtk.Label("")
        vbox.pack_start(self.statusLabel, False, False, 0)

        # Add tab to notebook
        tabLabel = Gtk.Label("Settings")
        self.notebook.append_page(vbox, tabLabel)

        # Don't set focus chain - let GTK handle it naturally

        # Load current settings
        self.load_current_settings()
|
|
|
|
def on_radio_key_press(self, widget, event):
|
|
"""Handle key press events for radio buttons"""
|
|
keyval = event.keyval
|
|
|
|
# Arrow keys and space to change radio button selection
|
|
if keyval in [Gdk.KEY_Up, Gdk.KEY_Down, Gdk.KEY_Left, Gdk.KEY_Right, Gdk.KEY_space]:
|
|
if widget == self.claudeRadio:
|
|
self.ollamaRadio.set_active(True)
|
|
self.ollamaRadio.grab_focus()
|
|
else:
|
|
self.claudeRadio.set_active(True)
|
|
self.claudeRadio.grab_focus()
|
|
return True
|
|
|
|
return False
|
|
|
|
def on_combo_key_press(self, widget, event):
|
|
"""Handle key press events for combo boxes to allow Tab navigation"""
|
|
keyval = event.keyval
|
|
state = event.state & Gdk.ModifierType.CONTROL_MASK
|
|
|
|
# Allow Tab and Shift+Tab to move focus away from combo box
|
|
if keyval == Gdk.KEY_Tab:
|
|
# Close combo box popup if open
|
|
widget.popdown()
|
|
|
|
# Let the normal tab handling take over
|
|
if event.state & Gdk.ModifierType.SHIFT_MASK:
|
|
# Shift+Tab - move to previous widget
|
|
widget.get_toplevel().child_focus(Gtk.DirectionType.TAB_BACKWARD)
|
|
else:
|
|
# Tab - move to next widget
|
|
widget.get_toplevel().child_focus(Gtk.DirectionType.TAB_FORWARD)
|
|
return True
|
|
|
|
return False
|
|
|
|
def setup_settings_focus_chain(self, container):
|
|
"""Set up explicit focus chain for settings tab - disabled for now"""
|
|
# Commenting out focus chain to let GTK handle it naturally
|
|
# GTK accessibility with explicit focus chains is problematic
|
|
pass
|
|
|
|
def load_current_settings(self):
|
|
"""Load current settings into UI"""
|
|
provider = self.config.get('provider')
|
|
if provider == 'claude-code':
|
|
self.claudeRadio.set_active(True)
|
|
else:
|
|
self.ollamaRadio.set_active(True)
|
|
|
|
self.on_provider_changed(None)
|
|
self.refresh_ollama_models()
|
|
|
|
# Set saved models after radio buttons are created
|
|
self.set_saved_model_selections()
|
|
|
|
def set_saved_model_selections(self):
|
|
"""Set the saved model selections on radio buttons"""
|
|
saved_model = self.config.get('ollama_model')
|
|
saved_vision_model = self.config.get('ollama_vision_model')
|
|
|
|
# Set text model selection
|
|
for radio in self.textModelRadios:
|
|
if radio.get_label() == saved_model:
|
|
radio.set_active(True)
|
|
break
|
|
|
|
# Set vision model selection
|
|
for radio in self.visionModelRadios:
|
|
if radio.get_label() == saved_vision_model:
|
|
radio.set_active(True)
|
|
break
|
|
|
|
def on_provider_changed(self, widget):
|
|
"""Handle provider radio button change"""
|
|
if self.claudeRadio.get_active():
|
|
self.ollamaFrame.set_sensitive(False)
|
|
self.update_status("Claude Code selected")
|
|
else:
|
|
self.ollamaFrame.set_sensitive(True)
|
|
self.update_status("Ollama selected")
|
|
|
|
def refresh_ollama_models(self):
    """Rebuild the text- and vision-model radio button lists from Ollama.

    Queries the Ollama server (when reachable) for its installed models,
    recreates one radio group for text models and one for vision models,
    and reports the outcome through the status label.
    """
    # Remove the previously built radio buttons from both containers.
    for radio in self.textModelRadios:
        self.textModelBox.remove(radio)
    for radio in self.visionModelRadios:
        self.visionModelBox.remove(radio)

    # Reset bookkeeping; the *Group attributes anchor each GtkRadioButton group.
    self.textModelRadios = []
    self.visionModelRadios = []
    self.textModelGroup = None
    self.visionModelGroup = None

    if self.ollamaInterface.is_available():
        all_models = self.ollamaInterface.get_models()
        vision_models = self.ollamaInterface.get_vision_models()

        # Create radio buttons for text models (every model qualifies).
        for i, model in enumerate(all_models):
            if i == 0:
                # First radio button starts a new group.
                radio = Gtk.RadioButton.new_with_label(None, model)
                self.textModelGroup = radio
            else:
                # Subsequent buttons join the first button's group.
                radio = Gtk.RadioButton.new_with_label_from_widget(self.textModelGroup, model)

            radio.set_can_focus(True)
            radio.get_accessible().set_description(f"Use {model} for text questions")
            radio.connect("toggled", self.on_text_model_changed)
            self.textModelRadios.append(radio)
            self.textModelBox.pack_start(radio, False, False, 0)

        # Create radio buttons for vision-capable models.
        for i, model in enumerate(vision_models):
            if i == 0:
                # First radio button in the vision group.
                radio = Gtk.RadioButton.new_with_label(None, model)
                self.visionModelGroup = radio
            else:
                # Additional radio buttons join the vision group.
                radio = Gtk.RadioButton.new_with_label_from_widget(self.visionModelGroup, model)

            radio.set_can_focus(True)
            radio.get_accessible().set_description(f"Use {model} for image analysis")
            radio.connect("toggled", self.on_vision_model_changed)
            self.visionModelRadios.append(radio)
            self.visionModelBox.pack_start(radio, False, False, 0)

        # Add a "(No vision model)" opt-out entry to the vision group.
        if vision_models:
            radio = Gtk.RadioButton.new_with_label_from_widget(self.visionModelGroup, "(No vision model)")
            radio.set_can_focus(True)
            radio.get_accessible().set_description("Don't use vision models")
            radio.connect("toggled", self.on_vision_model_changed)
            self.visionModelRadios.append(radio)
            self.visionModelBox.pack_start(radio, False, False, 0)

        # Newly packed widgets stay hidden until shown explicitly.
        self.textModelBox.show_all()
        self.visionModelBox.show_all()

        if all_models:
            # Default to the first entry in each group; callers (e.g.
            # load_current_settings) may override with saved selections.
            if self.textModelRadios:
                self.textModelRadios[0].set_active(True)
            # Select first vision model by default
            if self.visionModelRadios:
                self.visionModelRadios[0].set_active(True)

            status = f"Found {len(all_models)} total models"
            if vision_models:
                status += f", {len(vision_models)} vision models"
            self.update_status(status)
        else:
            self.update_status("Ollama running but no models found")
    else:
        self.update_status("Ollama not available")
def on_text_model_changed(self, widget):
    """Persist the newly selected text model.

    The "toggled" signal fires for both activation and deactivation;
    only the radio that just became active writes to the config.
    """
    if not widget.get_active():
        return
    self.config.set('ollama_model', widget.get_label())
def on_vision_model_changed(self, widget):
    """Persist the newly selected vision model.

    Ignores deactivation events and the "(No vision model)" placeholder,
    which is a UI entry rather than a real model name.
    """
    if not widget.get_active():
        return
    chosen = widget.get_label()
    if chosen != "(No vision model)":
        self.config.set('ollama_vision_model', chosen)
def on_refresh_models(self, widget):
    """Rebuild the Ollama connection for the current host, then reload models."""
    host = self.hostEntry.get_text()
    # Recreate the interface so an edited host takes effect immediately.
    self.ollamaInterface = OllamaInterface(host)
    self.refresh_ollama_models()
def on_save_settings(self, widget):
    """Persist every setting shown in the UI into the configuration."""
    provider = 'claude-code' if self.claudeRadio.get_active() else 'ollama'
    self.config.set('provider', provider)

    self.config.set('ollama_host', self.hostEntry.get_text())

    # Persist whichever text-model radio is currently active.
    for radio in self.textModelRadios:
        if radio.get_active():
            self.config.set('ollama_model', radio.get_label())
            break

    # Persist the active vision model; the "(No vision model)"
    # placeholder is deliberately not written to the config.
    for radio in self.visionModelRadios:
        if radio.get_active():
            label = radio.get_label()
            if label != "(No vision model)":
                self.config.set('ollama_vision_model', label)
            break

    self.config.set('confirm_actions', 'true' if self.confirmCheck.get_active() else 'false')

    # Voice-related options.
    self.config.set('voice_enabled', 'true' if self.voiceEnabledCheck.get_active() else 'false')
    self.config.set('voice_output', 'true' if self.voiceOutputCheck.get_active() else 'false')
    self.config.set('wake_word', self.wakeWordEntry.get_text())
    self.config.set('voice_timeout', str(int(self.timeoutSpin.get_value())))

    self.update_status("Settings saved successfully!")

    # Button captions embed the provider name, so refresh them.
    self.update_button_labels()
def update_status(self, message):
    """Show *message* in the status label and clear it after 5 seconds.

    Bug fix: previously every call scheduled an independent 5-second
    clear, so an older timeout could wipe a newer message early. The
    pending clear is now cancelled before a new one is scheduled.
    """
    self.statusLabel.set_text(message)

    # Cancel the clear scheduled by a previous call, if still pending.
    pending = getattr(self, '_status_clear_id', None)
    if pending:
        GLib.source_remove(pending)

    def _clear():
        self.statusLabel.set_text("")
        self._status_clear_id = None
        return False  # one-shot: remove the timeout source

    self._status_clear_id = GLib.timeout_add_seconds(5, _clear)
def get_current_ai_name(self):
    """Return a human-readable name for the active AI provider."""
    provider = self.config.get('provider')
    if provider == 'claude-code':
        return "Claude"
    if provider == 'ollama':
        model = self.config.get('ollama_model', 'llama2')
        # The default model name is omitted to keep the label short.
        return "Ollama" if model == 'llama2' else f"Ollama ({model})"
    # Unknown/unset provider: fall back to a generic label.
    return "AI"
def update_button_labels(self):
    """Refresh button captions, the response label, and accessible
    descriptions so they all name the currently selected AI provider.

    Underscores inside set_label() strings are GTK mnemonics (Alt+key).
    """
    ai_name = self.get_current_ai_name()
    self.askButton.set_label(f"Ask _{ai_name}")
    self.contextButton.set_label(f"Ask {ai_name} About _Window")
    self.actionButton.set_label(f"Request {ai_name} _Action")
    # shareButton is optional — only present when the file UI was built.
    if hasattr(self, 'shareButton'):
        self.shareButton.set_label(f"Ask {ai_name} About _File")

    # Update response label
    self.responseLabel.set_label(f"{ai_name} _Response:")

    # Update accessible (screen-reader) metadata for the response view.
    response_atk = self.responseText.get_accessible()
    response_atk.set_name(f"{ai_name} Response")
    response_atk.set_description(f"{ai_name}'s response to your question")

    # Update button descriptions
    self.askButton.get_accessible().set_description(f"Send your question to {ai_name}")
    self.contextButton.get_accessible().set_description(f"Ask {ai_name} about the currently focused window")
    self.actionButton.get_accessible().set_description(f"Request step-by-step instructions from {ai_name}")
    if hasattr(self, 'shareButton'):
        self.shareButton.get_accessible().set_description(f"Ask {ai_name} about the selected file")
def get_question_text(self):
    """Return the full contents of the question text view."""
    buf = self.questionText.get_buffer()
    # Final False: exclude invisible/hidden characters from the result.
    return buf.get_text(buf.get_start_iter(), buf.get_end_iter(), False)
def set_response_text(self, text):
    """Replace the response view's entire contents with *text*."""
    self.responseText.get_buffer().set_text(text)
def append_response_text(self, text):
    """Append *text* to the response view, separated by a blank line."""
    buf = self.responseText.get_buffer()
    buf.insert(buf.get_end_iter(), "\n\n" + text)
def show_processing(self, provider_type):
    """Announce that a request is in flight and disable the ask buttons.

    Bug fix: this method is invoked from worker threads (send_ai_request
    runs off the main thread), but GTK widgets must only be touched from
    the main loop. Widget updates are now marshalled via GLib.idle_add;
    the sound playback stays inline since subprocess is thread-safe.
    """
    # Show the specific model name when one has been recorded.
    if hasattr(self, 'current_processing_model') and self.current_processing_model:
        if provider_type == 'ollama':
            ai_name = f"Ollama ({self.current_processing_model})"
        else:
            ai_name = self.get_current_ai_name()
    else:
        ai_name = self.get_current_ai_name()

    def _update_ui():
        self.set_response_text(f"{ai_name} is processing your request...")
        self.askButton.set_sensitive(False)
        self.contextButton.set_sensitive(False)
        self.actionButton.set_sensitive(False)
        return False  # one-shot idle callback

    GLib.idle_add(_update_ui)

    # Play processing sound if available
    if SystemCommands.is_command_available('play'):
        subprocess.run(['play', '-qnG', 'synth', '0.1', 'sin', '800'],
                       capture_output=True)
def hide_processing(self):
    """Re-enable the ask buttons after a request completes.

    Bug fix: like show_processing, this runs on worker threads, so the
    widget sensitivity changes are marshalled onto the GTK main loop
    with GLib.idle_add instead of being applied directly.
    """
    def _update_ui():
        self.askButton.set_sensitive(True)
        self.contextButton.set_sensitive(True)
        self.actionButton.set_sensitive(True)
        return False  # one-shot idle callback

    GLib.idle_add(_update_ui)

    # Play completion sound if available
    if SystemCommands.is_command_available('play'):
        subprocess.run(['play', '-qnG', 'synth', '0.05', 'sin', '1200'],
                       capture_output=True)
def send_ai_request(self, message, context=None, image_path=None):
    """Send *message* to the configured AI provider and return its reply.

    Args:
        message: The user's question or prompt text.
        context: Optional system context; when None (and no image is
            given) a neutral default context is substituted.
        image_path: Optional path to an image for vision-capable models.

    Returns:
        The provider's response text, or an "Error: ..." string when the
        provider is unavailable or misconfigured.

    Note: runs synchronously; callers invoke it from worker threads.
    """
    provider = self.config.get('provider')

    # Add neutral system context to avoid AI making assumptions
    if context is None and not image_path:
        system_context = "You are a helpful AI assistant. Please provide a direct and helpful response to the user's question without making assumptions about their specific use case or technical setup."
    else:
        system_context = context

    # Store which model is being used for status display
    self.current_processing_model = None

    if provider == 'claude-code':
        if not self.claudeInterface.is_available():
            return "Error: Claude Code is not available. Please install or configure Claude Code."

        self.show_processing("claude-code")
        try:
            response = self.claudeInterface.send_message(message, system_context, image_path)
            return response
        finally:
            # Always re-enable the UI, even if send_message raises.
            self.hide_processing()

    elif provider == 'ollama':
        if not self.ollamaInterface.is_available():
            return "Error: Ollama is not available. Please start Ollama service."

        # Choose model based on whether we have an image
        if image_path:
            model = self.config.get('ollama_vision_model', 'llava')
            # Verify the vision model is available
            available_models = self.ollamaInterface.get_models()
            if model not in available_models:
                vision_models = self.ollamaInterface.get_vision_models()
                if vision_models:
                    model = vision_models[0]  # Use first available vision model
                else:
                    return "Error: No vision models available for image processing. Please install a vision model like llava."
        else:
            model = self.config.get('ollama_model')

        # Store the actual model being used for status display
        self.current_processing_model = model
        self.show_processing("ollama")
        try:
            response = self.ollamaInterface.send_message(message, model, system_context, image_path)
            return response
        finally:
            self.hide_processing()

    # Fall-through: provider is neither recognized option.
    return "Error: No AI provider configured"
def on_ask_question(self, widget):
    """Send the typed question to the AI on a background thread."""
    question = self.get_question_text().strip()
    if not question:
        self.set_response_text("Please enter a question first.")
        return

    def worker():
        answer = self.send_ai_request(question)
        # Widget updates must happen on the GTK main loop.
        GLib.idle_add(self.set_response_text, answer)

    threading.Thread(target=worker, daemon=True).start()
def on_ask_with_context(self, widget):
    """Send the question plus focused-window info to the AI in a thread."""
    question = self.get_question_text().strip()
    if not question:
        self.set_response_text("Please enter a question first.")
        return

    def worker():
        # Gather the focused window's info as extra context for the AI.
        window_info = self.windowContext.get_focused_window_info()
        answer = self.send_ai_request(question, window_info)
        GLib.idle_add(self.set_response_text, answer)

    threading.Thread(target=worker, daemon=True).start()
def on_request_action(self, widget):
    """Request step-by-step instructions, optionally confirming them first."""
    request = self.get_question_text().strip()
    if not request:
        self.set_response_text("Please enter an action request first.")
        return

    # Frame the request so the AI answers with a numbered action list.
    prompt = (f"Please provide step-by-step instructions for: {request}\n\n"
              "Format your response as a numbered list of specific actions I should take.")

    def worker():
        answer = self.send_ai_request(prompt)
        # Route through the confirmation dialog when the user enabled it.
        if self.config.get('confirm_actions') == 'true':
            GLib.idle_add(self.show_action_confirmation, answer)
        else:
            GLib.idle_add(self.set_response_text, answer)

    threading.Thread(target=worker, daemon=True).start()
def show_action_confirmation(self, response):
    """Ask the user to approve an AI-generated action plan via a dialog."""
    dialog = Gtk.MessageDialog(
        transient_for=self,
        flags=0,
        message_type=Gtk.MessageType.QUESTION,
        buttons=Gtk.ButtonsType.YES_NO,
        text="AI Action Confirmation",
    )
    dialog.format_secondary_text(
        f"The AI has provided the following action plan:\n\n{response}\n\n"
        "Do you want to proceed with these actions?"
    )

    # run() blocks until the user answers; tear the dialog down before
    # acting on the choice.
    answer = dialog.run()
    dialog.destroy()

    if answer == Gtk.ResponseType.YES:
        self.set_response_text(f"Action plan approved:\n\n{response}")
    else:
        self.set_response_text("Action cancelled by user.")
def on_browse_file(self, widget):
    """Open a file chooser and copy the picked path into the file entry."""
    chooser = Gtk.FileChooserDialog(
        title="Select file to share",
        parent=self,
        action=Gtk.FileChooserAction.OPEN,
    )
    chooser.add_buttons(
        Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL,
        Gtk.STOCK_OPEN, Gtk.ResponseType.OK,
    )

    # Only update the entry when the user actually confirmed a file.
    if chooser.run() == Gtk.ResponseType.OK:
        self.fileEntry.set_text(chooser.get_filename())

    chooser.destroy()
def on_ask_about_file(self, widget):
    """Analyze the selected file with the AI on a background thread.

    Text-like files are inlined into the prompt; everything else (e.g.
    images) is handed to the provider by path.
    """
    file_path = self.fileEntry.get_text().strip()
    if not file_path or not os.path.exists(file_path):
        self.set_response_text("Please select a valid file first.")
        return

    question = self.get_question_text().strip()
    if not question:
        question = "Please analyze this file and tell me what it does."

    text_suffixes = ('.txt', '.py', '.sh', '.conf', '.md', '.json', '.xml', '.html')

    def worker():
        if file_path.lower().endswith(text_suffixes):
            # Text files: read the content and embed it in the message.
            try:
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as fh:
                    content = fh.read()
                answer = self.send_ai_request(
                    f"{question}\n\nFile: {file_path}\nContent:\n{content}")
            except Exception as exc:
                answer = f"Error reading file: {str(exc)}"
        else:
            # Other files (including images): pass the path to the provider.
            answer = self.send_ai_request(question, image_path=file_path)

        GLib.idle_add(self.set_response_text, answer)

    threading.Thread(target=worker, daemon=True).start()
def on_describe_image(self, widget):
    """Capture the screen with scrot and ask the AI to describe it."""
    def worker():
        if not SystemCommands.is_command_available('scrot'):
            GLib.idle_add(self.set_response_text,
                          "Error: scrot not available. Please install scrot for screenshots.")
            return

        # The screenshot lives in a throwaway directory, removed below.
        temp_dir = tempfile.mkdtemp()
        screenshot_path = os.path.join(temp_dir, 'screenshot.png')

        try:
            capture = subprocess.run(['scrot', screenshot_path],
                                     capture_output=True, text=True, timeout=10)
            if capture.returncode != 0:
                GLib.idle_add(self.set_response_text, "Error: Could not take screenshot")
                return

            # Hand the image to a vision-capable provider for description.
            answer = self.send_ai_request(
                "Please describe what you see in this screenshot in detail. "
                "Focus on any text, interface elements, and visual content.",
                image_path=screenshot_path
            )
            GLib.idle_add(self.set_response_text, answer)

        except Exception as exc:
            GLib.idle_add(self.set_response_text, f"Error taking screenshot: {str(exc)}")
        finally:
            # Best-effort cleanup of the temporary screenshot.
            try:
                os.unlink(screenshot_path)
                os.rmdir(temp_dir)
            except (FileNotFoundError, OSError):
                pass

    threading.Thread(target=worker, daemon=True).start()
def on_analyze_selected(self, widget):
    """Analyze the user's current selection, or the screen as a fallback.

    Strategy, in order: (1) read the primary selection from the
    clipboard tool for the running display server; (2) if empty, take a
    screenshot and OCR it with pytesseract; (3) if OCR is unavailable or
    finds nothing, send the screenshot itself to a vision model.
    """
    def analyze_selected_in_thread():
        try:
            # First, try to get clipboard content (selected text)
            # Use wl-paste on Wayland, xclip on X11
            selected_text = ""
            if os.environ.get('WAYLAND_DISPLAY'):
                if SystemCommands.is_command_available('wl-paste'):
                    # -p reads the primary selection (highlighted text).
                    clipboard_result = subprocess.run(['wl-paste', '-p'],
                                                      capture_output=True, text=True, timeout=5)
                    selected_text = clipboard_result.stdout.strip() if clipboard_result.returncode == 0 else ""
            else:
                if SystemCommands.is_command_available('xclip'):
                    clipboard_result = subprocess.run(['xclip', '-o', '-selection', 'primary'],
                                                      capture_output=True, text=True, timeout=5)
                    selected_text = clipboard_result.stdout.strip() if clipboard_result.returncode == 0 else ""

            if selected_text:
                # We have selected text, analyze it
                question = self.get_question_text().strip()
                if not question:
                    question = "Please analyze this selected text and tell me what it means or what I should know about it."

                full_question = f"{question}\n\nSelected text: {selected_text}"
                response = self.send_ai_request(full_question)

            else:
                # No selected text, fallback to OCR of current screen
                # Check if scrot is available
                if not SystemCommands.is_command_available('scrot'):
                    GLib.idle_add(self.set_response_text, "Error: No selected text found and scrot not available for screen capture.")
                    return

                # Take screenshot first
                temp_dir = tempfile.mkdtemp()
                screenshot_path = os.path.join(temp_dir, 'screen_analysis.png')

                try:
                    # Take screenshot
                    scrot_result = subprocess.run(['scrot', screenshot_path],
                                                  capture_output=True, text=True, timeout=10)

                    if scrot_result.returncode != 0:
                        GLib.idle_add(self.set_response_text, "Error: Could not capture screen content")
                        return

                    # Try OCR first to get text content
                    try:
                        # Optional deps: imported lazily so the feature
                        # degrades gracefully when PIL/pytesseract are absent.
                        from PIL import Image
                        import pytesseract

                        image = Image.open(screenshot_path)
                        ocr_text = pytesseract.image_to_string(image).strip()

                        if ocr_text:
                            # We found text via OCR
                            question = self.get_question_text().strip()
                            if not question:
                                question = "Please analyze this text content and tell me what's important or what I should know about it."

                            full_question = f"{question}\n\nScreen text content: {ocr_text}"
                            response = self.send_ai_request(full_question)

                        else:
                            # No text found, do visual analysis
                            question = self.get_question_text().strip()
                            if not question:
                                question = "Please analyze this screen content and tell me what you see."

                            response = self.send_ai_request(question, image_path=screenshot_path)

                    except ImportError:
                        # Fallback to AI image analysis without OCR
                        question = self.get_question_text().strip()
                        if not question:
                            question = "Please analyze this screen content and tell me what you see."

                        response = self.send_ai_request(question, image_path=screenshot_path)

                finally:
                    # Clean up temp file (best effort).
                    try:
                        os.unlink(screenshot_path)
                        os.rmdir(temp_dir)
                    except (FileNotFoundError, OSError) as e:
                        pass

            # Single exit point for both the selection and screenshot paths.
            GLib.idle_add(self.set_response_text, response)

        except Exception as e:
            GLib.idle_add(self.set_response_text, f"Error analyzing content: {str(e)}")

    threading.Thread(target=analyze_selected_in_thread, daemon=True).start()
def speak_text(self, text):
    """Speak *text* via spd-say when voice output is enabled."""
    if self.config.get('voice_output') != 'true':
        return
    if not SystemCommands.is_command_available('spd-say'):
        print("Warning: spd-say not available for text-to-speech")
        return
    try:
        # -P important: raise speech-dispatcher priority over routine output.
        subprocess.run(['spd-say', '-P', 'important', text],
                       capture_output=True, timeout=30)
    except Exception as exc:
        print(f"Error speaking text: {exc}")
def update_voice_status(self, message):
    """Update the voice status label; safe to call from any thread."""
    # idle_add marshals the widget update onto the GTK main loop.
    GLib.idle_add(self.voiceStatus.set_text, message)
def on_voice_question(self, widget):
    """Record one spoken question, transcribe it, and send it to the AI.

    Runs on a background thread; UI updates are marshalled back to the
    GTK main loop via GLib.idle_add (update_voice_status does the same
    internally).
    """
    if not self.voiceRecognition.is_available():
        self.set_response_text("Voice recognition not available. Please install python-speech-recognition and python-pyaudio.")
        return

    def voice_question_thread():
        try:
            self.update_voice_status("🎤 Listening...")

            # Play recording start sound if available
            if SystemCommands.is_command_available('play'):
                subprocess.run(['play', '-qnG', 'synth', '0.1', 'sin', '1000', 'vol', '0.3'],
                               capture_output=True)

            timeout = int(self.config.get('voice_timeout', '5'))
            recognized_text = self.voiceRecognition.recognize_speech(timeout=timeout)

            # Play recording end sound if available
            if SystemCommands.is_command_available('play'):
                subprocess.run(['play', '-qnG', 'synth', '0.05', 'sin', '1200', 'vol', '0.3'],
                               capture_output=True)

            # The recognizer reports failures as "Error:"/"Sorry," prefixed
            # strings rather than raising, so check the prefix.
            if recognized_text.startswith("Error:") or recognized_text.startswith("Sorry,"):
                self.update_voice_status(recognized_text)
                self.speak_text(recognized_text)
                return

            # Set the recognized text in the question field
            GLib.idle_add(self.set_question_text, recognized_text)
            self.update_voice_status(f"Recognized: {recognized_text}")

            # Automatically send the question to AI
            response = self.send_ai_request(recognized_text)
            GLib.idle_add(self.set_response_text, response)

            # Speak the response if enabled
            self.speak_text(response)

        except Exception as e:
            error_msg = f"Voice recognition error: {str(e)}"
            self.update_voice_status(error_msg)
            GLib.idle_add(self.set_response_text, error_msg)
        finally:
            # Clear the transient voice status once the interaction ends.
            self.update_voice_status("")

    threading.Thread(target=voice_question_thread, daemon=True).start()
def on_toggle_continuous_listening(self, widget):
    """Start or stop wake-word listening when the toggle flips."""
    if not self.voiceRecognition.is_available():
        # Snap the toggle back off; listening is impossible without deps.
        widget.set_active(False)
        self.set_response_text("Voice recognition not available.")
        return

    if widget.get_active():
        self.start_continuous_listening()
    else:
        self.stop_continuous_listening()
def start_continuous_listening(self):
    """Begin background listening for the configured wake word."""
    if self.continuousListening:
        return  # already running

    self.continuousListening = True
    self.stopListening.clear()
    self.update_voice_status("👂 Listening for wake word...")

    wake_word = self.config.get('wake_word', 'hey assistant').lower()

    def on_phrase(text):
        # Match the wake word anywhere in the recognized phrase; hop to
        # the main loop before touching any UI.
        if wake_word in text:
            GLib.idle_add(self.on_wake_word_detected)

    def listen_loop():
        self.voiceRecognition.recognize_speech_continuous(on_phrase, self.stopListening)

    self.listeningThread = threading.Thread(target=listen_loop, daemon=True)
    self.listeningThread.start()
def stop_continuous_listening(self):
    """Signal the wake-word listener to stop and wait briefly for it."""
    if not self.continuousListening:
        return  # nothing to stop

    self.continuousListening = False
    self.stopListening.set()
    self.update_voice_status("")

    if self.listeningThread:
        # Bounded join so shutdown never hangs on a stuck recognizer.
        self.listeningThread.join(timeout=2)
def on_wake_word_detected(self):
    """React to the wake word: prompt the user, listen, and answer.

    Invoked on the GTK main loop via GLib.idle_add from the continuous-
    listening thread; the follow-up recognition and AI request run on a
    fresh worker thread.
    """
    ai_name = self.get_current_ai_name()
    self.speak_text("Yes, what can I help you with?")
    self.update_voice_status(f"🎤 Wake word detected, listening for {ai_name}...")

    # Play wake word detection sound if available
    if SystemCommands.is_command_available('play'):
        subprocess.run(['play', '-qnG', 'synth', '0.1', 'sin', '800', 'vol', '0.4'],
                       capture_output=True)

    def wake_response_thread():
        try:
            timeout = int(self.config.get('voice_timeout', '5'))
            recognized_text = self.voiceRecognition.recognize_speech(timeout=timeout)

            # The recognizer signals failure via "Error:"/"Sorry," prefixes.
            if recognized_text.startswith("Error:") or recognized_text.startswith("Sorry,"):
                self.update_voice_status("")
                self.speak_text("I didn't catch that. Please try again.")
                return

            # Process the question
            GLib.idle_add(self.set_question_text, recognized_text)
            response = self.send_ai_request(recognized_text)
            GLib.idle_add(self.set_response_text, response)

            # Speak the response
            self.speak_text(response)

        except Exception as e:
            self.speak_text("Sorry, there was an error processing your question.")
        finally:
            # Resume the wake-word prompt regardless of outcome.
            self.update_voice_status("👂 Listening for wake word...")

    threading.Thread(target=wake_response_thread, daemon=True).start()
def set_question_text(self, text):
    """Replace the question view's contents with *text*."""
    self.questionText.get_buffer().set_text(text)
def on_key_press(self, widget, event):
    """Global keyboard shortcuts for the assistant window.

    Returns True when the key was handled (stops further propagation).
    NOTE(review): source formatting was lost; the `return True` for the
    F4/F5/Ctrl+S branches is reconstructed at the shortcut level — verify
    against the original indentation.
    """
    keyval = event.keyval
    state = event.state & (Gdk.ModifierType.CONTROL_MASK | Gdk.ModifierType.MOD1_MASK)
    ctrl = bool(state & Gdk.ModifierType.CONTROL_MASK)

    # Ctrl+Tab / Ctrl+Shift+Tab cycle through the notebook tabs.
    if ctrl and keyval in (Gdk.KEY_Tab, Gdk.KEY_ISO_Left_Tab):
        step = 1 if keyval == Gdk.KEY_Tab else -1
        pages = self.notebook.get_n_pages()
        self.notebook.set_current_page((self.notebook.get_current_page() + step) % pages)
        return True

    # F4: one-shot voice question (accessibility shortcut).
    if keyval == Gdk.KEY_F4:
        if self.voiceRecognition.is_available():
            self.on_voice_question(None)
        return True

    # F5: toggle continuous wake-word listening.
    if keyval == Gdk.KEY_F5:
        if self.voiceRecognition.is_available():
            self.listenToggle.set_active(not self.listenToggle.get_active())
        return True

    # Ctrl+S saves settings, but only while the Settings tab is shown.
    if ctrl and keyval == Gdk.KEY_s:
        if self.notebook.get_current_page() == 1:  # Settings tab
            self.on_save_settings(None)
        return True

    # Escape quits the application.
    if keyval == Gdk.KEY_Escape:
        self.cleanup()
        Gtk.main_quit()
        return True

    return False
def on_textview_key_press(self, widget, event):
    """Ctrl+Enter inside a text view submits the current question."""
    ctrl_held = event.state & Gdk.ModifierType.CONTROL_MASK
    if ctrl_held and event.keyval == Gdk.KEY_Return:
        self.on_ask_question(None)
        return True  # swallow the keystroke
    return False
def set_initial_focus(self):
    """Give keyboard focus to the question input (GLib idle callback)."""
    self.questionText.grab_focus()
    # Returning False removes this idle handler after a single run.
    return False
def on_tab_switched(self, notebook, page, page_num):
    """Move focus to a sensible widget whenever the active tab changes."""
    if page_num == 0:
        # Interaction tab: focus the question input.
        GLib.idle_add(lambda: self.questionText.grab_focus())
    elif page_num == 1:
        # Settings tab: focus the first provider radio button.
        GLib.idle_add(lambda: self.claudeRadio.grab_focus())
def cleanup(self):
    """Release voice resources before the application exits."""
    # Stop the wake-word listener first so no thread races the teardown.
    self.stop_continuous_listening()
    recognizer = self.voiceRecognition
    if recognizer:
        recognizer.stop_recording()
def main():
    """Main entry point: report missing system deps, build the UI, run GTK.

    Bug fix: the window "destroy" handler previously only ran cleanup()
    and never called Gtk.main_quit(), so closing the window left the GTK
    main loop running and the process never exited.
    """
    # Check system dependencies
    missing_required, missing_optional = SystemCommands.check_dependencies()

    if missing_required:
        print("WARNING: Missing required commands:")
        for cmd, desc in missing_required.items():
            print(f" - {cmd}: {desc}")
        print("\nSome features may not work properly.")

    if missing_optional:
        print("INFO: Missing optional commands:")
        for cmd, desc in missing_optional.items():
            print(f" - {cmd}: {desc}")

    app = AiAssistant()
    app.show_all()

    # Play startup sound if available
    if SystemCommands.is_command_available('play'):
        subprocess.run(['play', '-qnG', 'synth', '0.1', 'sin', '1000'],
                       capture_output=True)

    # On window close: release voice resources AND quit the main loop.
    def _on_destroy(_widget):
        app.cleanup()
        Gtk.main_quit()

    app.connect("destroy", _on_destroy)

    try:
        Gtk.main()
    except KeyboardInterrupt:
        app.cleanup()
# Run the app only when executed directly, not when imported as a module.
if __name__ == '__main__':
    main()