diff --git a/.gitignore b/.gitignore index b0209fd..3ff1f7e 100644 --- a/.gitignore +++ b/.gitignore @@ -3,5 +3,5 @@ image/* *.img *.img.xz -# Claude AI assistant files -CLAUDE.md +# AI assistant files +AGENTS.md diff --git a/etc/speech-dispatcher/speechd.conf.bak b/etc/speech-dispatcher/speechd.conf.bak index 540b338..d146afc 100644 --- a/etc/speech-dispatcher/speechd.conf.bak +++ b/etc/speech-dispatcher/speechd.conf.bak @@ -288,6 +288,7 @@ AddModule "piper-tts-generic" "sd_generic" "piper-tts-generic.conf" #AddModule "mary-generic" "sd_generic" "mary-generic.conf" #AddModule "baratinoo" "sd_baratinoo" "baratinoo.conf" AddModule "rhvoice" "sd_rhvoice" "rhvoice.conf" +AddModule "viavoice" "sd_viavoice" "viavoice.conf" AddModule "voxin" "sd_voxin" "voxin.conf" # The output module testing doesn't actually connect to anything. It diff --git a/usr/local/bin/audio_manager.py b/usr/local/bin/audio_manager.py new file mode 100755 index 0000000..88a9cf4 --- /dev/null +++ b/usr/local/bin/audio_manager.py @@ -0,0 +1,600 @@ +#!/usr/bin/env python3 + +import curses +import json +import os +import re +import shutil +import subprocess +import sys +import time +from dataclasses import dataclass + +try: + import simpleaudio as sa +except Exception: + sa = None + +try: + import speechd +except Exception: + speechd = None + + +FEEDBACK_SOUND = "/usr/share/sounds/stormux/menu_move.wav" +TEST_SOUND = "/usr/share/sounds/stormux/menu_select.wav" +HELP_TEXT = ( + "Use left and right to move between controls. " + "Use up and down to change the selected control. " + "Press space to hear the current value. " + "Press enter to confirm a pending device change or apply the selected device. " + "Press escape or q to exit." +) + + +def clamp_percent(value, minimum, maximum): + return max(minimum, min(maximum, value)) + + +def format_percent_text(value): + return f"{value} percent" + + +def cycle_index(current_index, delta, total_count): + if total_count <= 0: + return 0 + return (current_index + delta) % total_count + + +def pending_change_expired(change, now_value=None): + if change is None: + return False + if now_value is None: + now_value = time.time() + return (now_value - change.started_at) >= change.timeout_seconds + + +def build_default_command(device_kind, device_name): + if device_kind == "sink": + return ["pactl", "set-default-sink", str(device_name)] + if device_kind == "source": + return ["pactl", "set-default-source", str(device_name)] + raise ValueError(f"Unsupported device kind: {device_kind}") + + +def build_volume_command(device_kind, device_name, volume_percent): + bounded = clamp_percent(volume_percent, 0, 150) + if device_kind == "sink": + return ["pactl", "set-sink-volume", str(device_name), f"{bounded}%"] + if device_kind == "source": + return ["pactl", "set-source-volume", str(device_name), f"{bounded}%"] + raise ValueError(f"Unsupported device kind: {device_kind}") + + +def is_monitor_source(device_name, description): + lowered_name = (device_name or "").lower() + lowered_description = (description or "").lower() + return lowered_name.endswith(".monitor") or lowered_description.startswith("monitor of ") + + +def extract_devices(payload, exclude_monitors=False): + devices = [] + for item in payload: + device_id = item.get("index") + node_name = item.get("name", "") + properties = item.get("properties") or {} + description = ( + properties.get("device.description") + or item.get("description") + or properties.get("node.description") + or node_name + or f"Device {device_id}" + ) + + if device_id is None: + continue + + if exclude_monitors and is_monitor_source(node_name, description): + continue + + devices.append( + { + "id": device_id, + "name": description, + "node_name": node_name, + } + ) + return devices + + +def parse_percent_volume(output_text): + match = re.search(r"(\d+)%", output_text) + if not match: + raise RuntimeError(f"Unexpected percent volume output: {output_text}") + return int(match.group(1)) + + +def parse_fraction_volume(output_text): + match = re.search(r"([0-9]*\.?[0-9]+)", output_text) + if not match: + raise RuntimeError(f"Unexpected fractional volume output: {output_text}") + return round(float(match.group(1)) * 100) + + +def build_external_sound_command(file_path): + player_candidates = [ + ("pw-play", ["pw-play", file_path]), + ("paplay", ["paplay", file_path]), + ("aplay", ["aplay", file_path]), + ("play", ["play", "-q", file_path]), + ] + for executable, command in player_candidates: + if shutil.which(executable) is not None: + return command + return None + + +def build_feedback_tone_command(): + if shutil.which("sox") is None: + return None + return ["sox", "-nqdV0", "synth", ".1", "tri", "840", "fade", ".04", ".1", ".04"] + + +def find_device_index(devices, *, node_name=None, device_id=None): + for index, device in enumerate(devices): + if device_id is not None and device["id"] == device_id: + return index + if node_name is not None and device["node_name"] == node_name: + return index + return 0 + + +@dataclass +class PendingDeviceChange: + device_kind: str + previous_target: str + candidate_target: str + candidate_name: str + started_at: float = 0.0 + timeout_seconds: int = 15 + + +class AudioBackend: + def run_command(self, command): + return subprocess.run(command, capture_output=True, text=True, check=True) + + def list_sinks(self): + payload = json.loads(self.run_command(["pactl", "--format=json", "list", "sinks"]).stdout) + return extract_devices(payload) + + def list_sources(self): + payload = json.loads(self.run_command(["pactl", "--format=json", "list", "sources"]).stdout) + return extract_devices(payload, exclude_monitors=True) + + def get_default_sink_name(self): + return self.run_command(["pactl", "get-default-sink"]).stdout.strip() + + def get_default_source_name(self): + return self.run_command(["pactl", "get-default-source"]).stdout.strip() + + def set_default_device(self, device_kind, device_name): + self.run_command(build_default_command(device_kind, device_name)) + + def set_device_volume(self, device_kind, device_name, percent_value): + self.run_command(build_volume_command(device_kind, device_name, percent_value)) + + def get_device_volume(self, device_kind, device_name): + if device_kind == "sink": + output = self.run_command(["pactl", "get-sink-volume", str(device_name)]).stdout.strip() + elif device_kind == "source": + output = self.run_command(["pactl", "get-source-volume", str(device_name)]).stdout.strip() + else: + raise ValueError(f"Unsupported device kind: {device_kind}") + return parse_percent_volume(output) + + +class AudioFeedback: + def __init__(self): + self.current_sound = None + self.current_process = None + + def stop_sound(self): + if self.current_sound is not None and self.current_sound.is_playing(): + self.current_sound.stop() + self.current_sound = None + if self.current_process is not None and self.current_process.poll() is None: + self.current_process.terminate() + self.current_process = None + + def play_sound_file(self, file_path): + if sa is not None and os.path.exists(file_path): + try: + self.stop_sound() + self.current_sound = sa.WaveObject.from_wave_file(file_path).play() + return + except Exception: + pass + + external_command = build_external_sound_command(file_path) + if external_command is not None: + try: + self.stop_sound() + self.current_process = subprocess.Popen( + external_command, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + return + except Exception: + pass + + curses.beep() + + def play_feedback_beep(self): + tone_command = build_feedback_tone_command() + if tone_command is not None: + try: + self.stop_sound() + self.current_process = subprocess.Popen( + tone_command, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + return + except Exception: + pass + + self.play_sound_file(FEEDBACK_SOUND) + + def play_test_beep(self): + self.play_sound_file(TEST_SOUND) + + +class AudioManagerApp: + def __init__(self): + self.backend = AudioBackend() + self.feedback = AudioFeedback() + self.speech_client = None + self.stdscr = None + self.focus_names = [ + "Output Volume", + "Microphone Volume", + "Output Device", + "Microphone Device", + ] + self.focus_index = 0 + self.output_devices = [] + self.input_devices = [] + self.output_index = 0 + self.input_index = 0 + self.default_output_id = None + self.default_input_id = None + self.default_output_name = "" + self.default_input_name = "" + self.output_volume = 50 + self.input_volume = 50 + self.pending_change = None + self.status_message = "" + + def init_speech(self): + if speechd is None: + return + self.speech_client = speechd.SSIPClient("audio_manager") + self.speech_client.set_priority(speechd.Priority.IMPORTANT) + self.speech_client.set_punctuation(speechd.PunctuationMode.SOME) + + def speak(self, text, interrupt=True): + if not self.speech_client: + return + + try: + if interrupt: + self.speech_client.cancel() + self.speech_client.speak(text) + except Exception: + pass + + def set_status(self, message, speak_message=False): + self.status_message = message + if speak_message: + self.speak(message) + + def get_selected_output_device(self): + if not self.output_devices: + return None + return self.output_devices[self.output_index] + + def get_selected_input_device(self): + if not self.input_devices: + return None + return self.input_devices[self.input_index] + + def refresh_state(self, preserve_selection=False): + previous_output_id = None + previous_input_id = None + if preserve_selection: + selected_output = self.get_selected_output_device() + selected_input = self.get_selected_input_device() + previous_output_id = None if selected_output is None else selected_output["id"] + previous_input_id = None if selected_input is None else selected_input["id"] + + self.output_devices = self.backend.list_sinks() + self.input_devices = self.backend.list_sources() + + if not self.output_devices: + raise RuntimeError("No output devices found.") + if not self.input_devices: + raise RuntimeError("No microphone devices found.") + + default_output_name = self.backend.get_default_sink_name() + default_input_name = self.backend.get_default_source_name() + self.default_output_name = default_output_name + self.default_input_name = default_input_name + + self.output_index = find_device_index( + self.output_devices, + device_id=previous_output_id if preserve_selection else None, + node_name=None if preserve_selection else default_output_name, + ) + self.input_index = find_device_index( + self.input_devices, + device_id=previous_input_id if preserve_selection else None, + node_name=None if preserve_selection else default_input_name, + ) + + self.default_output_id = self.output_devices[ + find_device_index(self.output_devices, node_name=default_output_name) + ]["id"] + self.default_input_id = self.input_devices[ + find_device_index(self.input_devices, node_name=default_input_name) + ]["id"] + + self.output_volume = self.backend.get_device_volume("sink", self.default_output_name) + self.input_volume = self.backend.get_device_volume("source", self.default_input_name) + + def apply_volume_delta(self, delta): + try: + if self.focus_index == 0: + new_value = clamp_percent(self.output_volume + delta, 0, 150) + self.backend.set_device_volume("sink", self.default_output_name, new_value) + self.output_volume = new_value + self.feedback.play_feedback_beep() + elif self.focus_index == 1: + new_value = clamp_percent(self.input_volume + delta, 0, 150) + self.backend.set_device_volume("source", self.default_input_name, new_value) + self.input_volume = new_value + self.feedback.play_feedback_beep() + except Exception as error: + self.set_status(f"Volume change failed: {error}", speak_message=True) + + def announce_current_focus(self): + if self.focus_index == 0: + self.speak(format_percent_text(self.output_volume)) + elif self.focus_index == 1: + self.speak(format_percent_text(self.input_volume)) + elif self.focus_index == 2: + device = self.get_selected_output_device() + if device: + self.speak(device["name"]) + elif self.focus_index == 3: + device = self.get_selected_input_device() + if device: + self.speak(device["name"]) + + def move_focus(self, delta): + self.focus_index = cycle_index(self.focus_index, delta, len(self.focus_names)) + self.speak(self.focus_names[self.focus_index]) + + def handle_up(self): + if self.focus_index in (0, 1): + self.apply_volume_delta(5) + return + + if self.focus_index == 2 and self.output_devices: + self.output_index = cycle_index(self.output_index, -1, len(self.output_devices)) + self.speak(self.output_devices[self.output_index]["name"]) + elif self.focus_index == 3 and self.input_devices: + self.input_index = cycle_index(self.input_index, -1, len(self.input_devices)) + self.speak(self.input_devices[self.input_index]["name"]) + + def handle_down(self): + if self.focus_index in (0, 1): + self.apply_volume_delta(-5) + return + + if self.focus_index == 2 and self.output_devices: + self.output_index = cycle_index(self.output_index, 1, len(self.output_devices)) + self.speak(self.output_devices[self.output_index]["name"]) + elif self.focus_index == 3 and self.input_devices: + self.input_index = cycle_index(self.input_index, 1, len(self.input_devices)) + self.speak(self.input_devices[self.input_index]["name"]) + + def start_pending_device_change(self, device_kind, previous_target, candidate): + try: + self.backend.set_default_device(device_kind, candidate["node_name"]) + self.pending_change = PendingDeviceChange( + device_kind="output device" if device_kind == "sink" else "microphone device", + previous_target=previous_target, + candidate_target=candidate["node_name"], + candidate_name=candidate["name"], + started_at=time.time(), + ) + + if device_kind == "sink": + self.default_output_id = candidate["id"] + self.default_output_name = candidate["node_name"] + self.output_volume = self.backend.get_device_volume("sink", self.default_output_name) + self.feedback.play_test_beep() + else: + self.default_input_id = candidate["id"] + self.default_input_name = candidate["node_name"] + self.input_volume = self.backend.get_device_volume("source", self.default_input_name) + + self.set_status( + f"{self.pending_change.device_kind.capitalize()} changed to {candidate['name']}. Press enter within 15 seconds to keep it.", + speak_message=True, + ) + except Exception as error: + self.set_status(f"Device change failed: {error}", speak_message=True) + + def confirm_pending_change(self): + if self.pending_change is None: + return + + message = f"{self.pending_change.device_kind.capitalize()} confirmed." + self.pending_change = None + self.refresh_state() + self.set_status(message, speak_message=True) + + def rollback_pending_change(self): + if self.pending_change is None: + return + + try: + change = self.pending_change + rollback_kind = "sink" if change.device_kind == "output device" else "source" + self.backend.set_default_device(rollback_kind, change.previous_target) + self.pending_change = None + self.refresh_state() + self.set_status( + f"Restored previous {change.device_kind}.", + speak_message=True, + ) + except Exception as error: + self.pending_change = None + self.set_status(f"Failed to restore previous device: {error}", speak_message=True) + + def activate_selected_device(self): + if self.pending_change is not None: + self.confirm_pending_change() + return + + if self.focus_index == 2: + candidate = self.get_selected_output_device() + if candidate is None: + return + if candidate["id"] == self.default_output_id: + self.speak(f"Already using {candidate['name']}") + return + self.start_pending_device_change("sink", self.default_output_name, candidate) + elif self.focus_index == 3: + candidate = self.get_selected_input_device() + if candidate is None: + return + if candidate["id"] == self.default_input_id: + self.speak(f"Already using {candidate['name']}") + return + self.start_pending_device_change("source", self.default_input_name, candidate) + else: + self.announce_current_focus() + + def draw(self): + self.stdscr.clear() + self.stdscr.addstr(1, 2, "Stormux Audio Manager", curses.A_BOLD) + + output_device = self.get_selected_output_device() + input_device = self.get_selected_input_device() + rows = [ + f"Output Volume: {format_percent_text(self.output_volume)}", + f"Microphone Volume: {format_percent_text(self.input_volume)}", + f"Output Device: {'' if output_device is None else output_device['name']}", + f"Microphone Device: {'' if input_device is None else input_device['name']}", + ] + + for row_index, row_text in enumerate(rows): + attribute = curses.A_REVERSE if row_index == self.focus_index else curses.A_NORMAL + self.stdscr.addstr(4 + row_index, 2, row_text, attribute) + + help_text = "Left/Right: Move Up/Down: Change Space: Speak Enter: Apply or Confirm H: Help Q/Esc: Exit" + self.stdscr.addstr(10, 2, help_text) + + if self.pending_change is not None: + seconds_left = max( + 0, + self.pending_change.timeout_seconds - int(time.time() - self.pending_change.started_at), + ) + self.stdscr.addstr( + 12, + 2, + f"Pending {self.pending_change.device_kind}: confirm within {seconds_left} seconds.", + curses.A_BOLD, + ) + + if self.status_message: + self.stdscr.addstr(14, 2, self.status_message) + + self.stdscr.refresh() + + def cleanup(self): + self.feedback.stop_sound() + if self.speech_client is not None: + try: + self.speech_client.close() + except Exception: + pass + if self.stdscr is not None: + try: + curses.nocbreak() + self.stdscr.keypad(False) + curses.echo() + curses.endwin() + except Exception: + pass + + def run(self): + try: + self.init_speech() + self.refresh_state() + + self.stdscr = curses.initscr() + curses.noecho() + curses.cbreak() + self.stdscr.keypad(True) + self.stdscr.timeout(250) + try: + curses.curs_set(0) + except Exception: + pass + + self.speak("Stormux audio manager") + + while True: + if pending_change_expired(self.pending_change): + self.rollback_pending_change() + + self.draw() + key = self.stdscr.getch() + + if key == -1: + continue + if key == curses.KEY_LEFT: + self.move_focus(-1) + elif key == curses.KEY_RIGHT: + self.move_focus(1) + elif key == curses.KEY_UP: + self.handle_up() + elif key == curses.KEY_DOWN: + self.handle_down() + elif key in (10, 13, curses.KEY_ENTER): + self.activate_selected_device() + elif key == ord(" "): + self.announce_current_focus() + elif key in (ord("h"), ord("H")): + self.speak(HELP_TEXT, interrupt=False) + elif key in (ord("q"), ord("Q"), 27): + self.rollback_pending_change() + break + finally: + self.cleanup() + + +def main(): + try: + AudioManagerApp().run() + except Exception as error: + print(f"Audio manager could not start: {error}", file=sys.stderr) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/usr/local/bin/game_launcher.py b/usr/local/bin/game_launcher.py index f2e8171..a6e3e13 100755 --- a/usr/local/bin/game_launcher.py +++ b/usr/local/bin/game_launcher.py @@ -47,6 +47,7 @@ class VoicedMenu: # Default settings self.speechRate = 0 # Normal speech rate (0 is default in speechd) + self.speechPitch = 0 # Normal speech pitch self.volume = 50 # Default volume level # Load settings @@ -97,8 +98,9 @@ class VoicedMenu: self.speechClient.set_priority(speechd.Priority.IMPORTANT) self.speechClient.set_punctuation(speechd.PunctuationMode.SOME) - # Apply speech rate from settings + # Apply speech settings from saved values self.speechClient.set_rate(self.speechRate) + self.speechClient.set_pitch(self.speechPitch) except Exception as e: print(f"Could not initialize speech: {e}") # Fallback to None - the speak method will handle this @@ -116,6 +118,7 @@ class VoicedMenu: # Load speech settings if 'Speech' in self.config: self.speechRate = self.config.getint('Speech', 'rate', fallback=0) + self.speechPitch = self.config.getint('Speech', 'pitch', fallback=0) # Load volume settings if 'Volume' in self.config: @@ -134,6 +137,7 @@ class VoicedMenu: self.config['Speech'] = {} self.config['Speech']['rate'] = str(self.speechRate) + self.config['Speech']['pitch'] = str(self.speechPitch) # Save volume settings if 'Volume' not in self.config: @@ -174,6 +178,32 @@ class VoicedMenu: # Save the new setting self.save_settings() + def increase_speech_pitch(self): + """Increase speech pitch""" + self.speechPitch = min(100, self.speechPitch + 10) # Max is 100 + if self.speechClient: + try: + self.speechClient.set_pitch(self.speechPitch) + self.speak(f"Speech pitch: {self.speechPitch}") + except Exception as e: + print(f"Error adjusting speech pitch: {e}") + + # Save the new setting + self.save_settings() + + def decrease_speech_pitch(self): + """Decrease speech pitch""" + self.speechPitch = max(-100, self.speechPitch - 10) # Min is -100 + if self.speechClient: + try: + self.speechClient.set_pitch(self.speechPitch) + self.speak(f"Speech pitch: {self.speechPitch}") + except Exception as e: + print(f"Error adjusting speech pitch: {e}") + + # Save the new setting + self.save_settings() + def get_current_volume(self): """Get the current system volume percentage""" try: @@ -774,6 +804,8 @@ class VoicedMenu: B key: Report battery status. Left bracket: Decrease speech rate. Right bracket: Increase speech rate. + Left brace: Decrease speech pitch. + Right brace: Increase speech pitch. 9 key: Decrease volume. 0 key: Increase volume. Escape: Refresh the menu. @@ -793,7 +825,7 @@ class VoicedMenu: self.stdscr.addstr(1, x, title, curses.A_BOLD) # Draw help line - helpText = "Ãvigate | Enter: Select | H: Help | [ ] Rate | 9 0 Volume | Esc: Refresh" + helpText = "Navigate | Enter: Select | H: Help | [ ] Rate | { } Pitch | 9 0 Volume | Esc: Refresh" x = max(0, w // 2 - len(helpText) // 2) self.stdscr.addstr(3, x, helpText) @@ -826,6 +858,11 @@ class VoicedMenu: rateText = f"Speech Rate: {self.speechRate}" self.stdscr.addstr(h-2, 2, rateText) + # Draw speech pitch indicator + pitchText = f"Pitch: {self.speechPitch}" + pitchX = max(0, w // 2 - len(pitchText) // 2) + self.stdscr.addstr(h-2, pitchX, pitchText) + # Draw volume indicator volumeText = f"Volume: {self.get_current_volume()}%" self.stdscr.addstr(h-2, w-len(volumeText)-2, volumeText) @@ -965,6 +1002,14 @@ class VoicedMenu: self.increase_speech_rate() self.draw_menu() + elif key == ord('{'): # Decrease speech pitch + self.decrease_speech_pitch() + self.draw_menu() + + elif key == ord('}'): # Increase speech pitch + self.increase_speech_pitch() + self.draw_menu() + elif key == ord('9'): # Decrease volume self.decrease_volume() @@ -1147,6 +1192,7 @@ if __name__ == "__main__": menu.add_section("Accessories") menu.add_item("Accessories", "Local IP Address", "/usr/local/bin/ip_info.py local") menu.add_item("Accessories", "Remote IP Address", "/usr/local/bin/ip_info.py remote") + menu.add_item("Accessories", "Sound and Volume", "/usr/local/bin/audio_manager.py") menu.add_item("Accessories", "Web Browser", "GAME=Brave startx") menu.add_item("Accessories", "LibreOffice", lambda: menu.install_and_launch("libreoffice", "gui")) menu.add_item("Accessories", "Thunderbird", lambda: menu.install_and_launch("thunderbird", "gui"))