#!/usr/bin/env python3 import curses import configparser import json import os import re import shutil import subprocess import sys import time from dataclasses import dataclass try: import simpleaudio as sa except Exception: sa = None try: import speechd except Exception: speechd = None FEEDBACK_SOUND = "/usr/share/sounds/stormux/menu_move.wav" TEST_SOUND = "/usr/share/sounds/stormux/menu_select.wav" HELP_TEXT = ( "Use left and right arrows to select a control. " "Use up and down arrows to adjust it. " "Press space to read the current value. " "Press h to hear this help again. " "Press enter to confirm a pending device change or apply the selected device. " "Press escape or q to exit." ) def clamp_percent(value, minimum, maximum): return max(minimum, min(maximum, value)) def format_percent_text(value): return f"{value} percent" def cycle_index(current_index, delta, total_count): if total_count <= 0: return 0 return (current_index + delta) % total_count def pending_change_expired(change, now_value=None): if change is None: return False if now_value is None: now_value = time.time() return (now_value - change.started_at) >= change.timeout_seconds def build_default_command(device_kind, device_name): if device_kind == "sink": return ["pactl", "set-default-sink", str(device_name)] if device_kind == "source": return ["pactl", "set-default-source", str(device_name)] raise ValueError(f"Unsupported device kind: {device_kind}") def build_volume_command(device_kind, device_name, volume_percent): bounded = clamp_percent(volume_percent, 0, 150) if device_kind == "sink": return ["pactl", "set-sink-volume", str(device_name), f"{bounded}%"] if device_kind == "source": return ["pactl", "set-source-volume", str(device_name), f"{bounded}%"] raise ValueError(f"Unsupported device kind: {device_kind}") def is_monitor_source(device_name, description): lowered_name = (device_name or "").lower() lowered_description = (description or "").lower() return lowered_name.endswith(".monitor") or lowered_description.startswith("monitor of ") def extract_devices(payload, exclude_monitors=False): devices = [] for item in payload: device_id = item.get("index") node_name = item.get("name", "") properties = item.get("properties") or {} description = ( properties.get("device.description") or item.get("description") or properties.get("node.description") or node_name or f"Device {device_id}" ) if device_id is None: continue if exclude_monitors and is_monitor_source(node_name, description): continue devices.append( { "id": device_id, "name": description, "node_name": node_name, } ) return devices def parse_percent_volume(output_text): match = re.search(r"(\d+)%", output_text) if not match: raise RuntimeError(f"Unexpected percent volume output: {output_text}") return int(match.group(1)) def parse_fraction_volume(output_text): match = re.search(r"([0-9]*\.?[0-9]+)", output_text) if not match: raise RuntimeError(f"Unexpected fractional volume output: {output_text}") return round(float(match.group(1)) * 100) def build_external_sound_command(file_path): player_candidates = [ ("pw-play", ["pw-play", file_path]), ("paplay", ["paplay", file_path]), ("aplay", ["aplay", file_path]), ("play", ["play", "-q", file_path]), ] for executable, command in player_candidates: if shutil.which(executable) is not None: return command return None def build_feedback_tone_command(): if shutil.which("sox") is None: return None return ["sox", "-nqdV0", "synth", ".1", "tri", "840", "fade", ".04", ".1", ".04"] def find_device_index(devices, *, node_name=None, device_id=None): for index, device in enumerate(devices): if device_id is not None and device["id"] == device_id: return index if node_name is not None and device["node_name"] == node_name: return index return 0 @dataclass class PendingDeviceChange: device_kind: str previous_target: str candidate_target: str candidate_name: str started_at: float = 0.0 timeout_seconds: int = 15 class AudioBackend: def run_command(self, command): return subprocess.run(command, capture_output=True, text=True, check=True) def list_sinks(self): payload = json.loads(self.run_command(["pactl", "--format=json", "list", "sinks"]).stdout) return extract_devices(payload) def list_sources(self): payload = json.loads(self.run_command(["pactl", "--format=json", "list", "sources"]).stdout) return extract_devices(payload, exclude_monitors=True) def get_default_sink_name(self): return self.run_command(["pactl", "get-default-sink"]).stdout.strip() def get_default_source_name(self): return self.run_command(["pactl", "get-default-source"]).stdout.strip() def set_default_device(self, device_kind, device_name): self.run_command(build_default_command(device_kind, device_name)) def set_device_volume(self, device_kind, device_name, percent_value): self.run_command(build_volume_command(device_kind, device_name, percent_value)) def get_device_volume(self, device_kind, device_name): if device_kind == "sink": output = self.run_command(["pactl", "get-sink-volume", str(device_name)]).stdout.strip() elif device_kind == "source": output = self.run_command(["pactl", "get-source-volume", str(device_name)]).stdout.strip() else: raise ValueError(f"Unsupported device kind: {device_kind}") return parse_percent_volume(output) class AudioFeedback: def __init__(self): self.current_sound = None self.current_process = None def stop_sound(self): if self.current_sound is not None and self.current_sound.is_playing(): self.current_sound.stop() self.current_sound = None if self.current_process is not None and self.current_process.poll() is None: self.current_process.terminate() self.current_process = None def play_sound_file(self, file_path): if sa is not None and os.path.exists(file_path): try: self.stop_sound() self.current_sound = sa.WaveObject.from_wave_file(file_path).play() return except Exception: pass external_command = build_external_sound_command(file_path) if external_command is not None: try: self.stop_sound() self.current_process = subprocess.Popen( external_command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) return except Exception: pass curses.beep() def play_feedback_beep(self): tone_command = build_feedback_tone_command() if tone_command is not None: try: self.stop_sound() self.current_process = subprocess.Popen( tone_command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) return except Exception: pass self.play_sound_file(FEEDBACK_SOUND) def play_test_beep(self): self.play_sound_file(TEST_SOUND) class AudioManagerApp: def __init__(self): self.backend = AudioBackend() self.feedback = AudioFeedback() self.speech_client = None self.config_dir = os.path.expanduser("~/.config/stormux") self.config_file = os.path.join(self.config_dir, "game_launcher.conf") self.config = configparser.ConfigParser() self.speech_rate = 0 self.speech_pitch = 0 self.stdscr = None self.focus_names = [ "Output Volume", "Microphone Volume", "Output Device", "Microphone Device", ] self.focus_index = 0 self.output_devices = [] self.input_devices = [] self.output_index = 0 self.input_index = 0 self.default_output_id = None self.default_input_id = None self.default_output_name = "" self.default_input_name = "" self.output_volume = 50 self.input_volume = 50 self.pending_change = None self.status_message = "" self.load_speech_settings() def load_speech_settings(self): if not os.path.exists(self.config_file): return try: self.config.read(self.config_file) if "Speech" in self.config: self.speech_rate = self.config.getint("Speech", "rate", fallback=0) self.speech_pitch = self.config.getint("Speech", "pitch", fallback=0) except Exception: pass def init_speech(self): if speechd is None: return self.speech_client = speechd.SSIPClient("audio_manager") self.speech_client.set_priority(speechd.Priority.IMPORTANT) self.speech_client.set_punctuation(speechd.PunctuationMode.SOME) self.speech_client.set_rate(self.speech_rate) self.speech_client.set_pitch(self.speech_pitch) def speak(self, text, interrupt=True): if not self.speech_client: return try: if interrupt: self.speech_client.cancel() self.speech_client.speak(text) except Exception: pass def set_status(self, message, speak_message=False): self.status_message = message if speak_message: self.speak(message) def get_selected_output_device(self): if not self.output_devices: return None return self.output_devices[self.output_index] def get_selected_input_device(self): if not self.input_devices: return None return self.input_devices[self.input_index] def refresh_state(self, preserve_selection=False): previous_output_id = None previous_input_id = None if preserve_selection: selected_output = self.get_selected_output_device() selected_input = self.get_selected_input_device() previous_output_id = None if selected_output is None else selected_output["id"] previous_input_id = None if selected_input is None else selected_input["id"] self.output_devices = self.backend.list_sinks() self.input_devices = self.backend.list_sources() if not self.output_devices: raise RuntimeError("No output devices found.") if not self.input_devices: raise RuntimeError("No microphone devices found.") default_output_name = self.backend.get_default_sink_name() default_input_name = self.backend.get_default_source_name() self.default_output_name = default_output_name self.default_input_name = default_input_name self.output_index = find_device_index( self.output_devices, device_id=previous_output_id if preserve_selection else None, node_name=None if preserve_selection else default_output_name, ) self.input_index = find_device_index( self.input_devices, device_id=previous_input_id if preserve_selection else None, node_name=None if preserve_selection else default_input_name, ) self.default_output_id = self.output_devices[ find_device_index(self.output_devices, node_name=default_output_name) ]["id"] self.default_input_id = self.input_devices[ find_device_index(self.input_devices, node_name=default_input_name) ]["id"] self.output_volume = self.backend.get_device_volume("sink", self.default_output_name) self.input_volume = self.backend.get_device_volume("source", self.default_input_name) def apply_volume_delta(self, delta): try: if self.focus_index == 0: new_value = clamp_percent(self.output_volume + delta, 0, 150) self.backend.set_device_volume("sink", self.default_output_name, new_value) self.output_volume = new_value self.feedback.play_feedback_beep() elif self.focus_index == 1: new_value = clamp_percent(self.input_volume + delta, 0, 150) self.backend.set_device_volume("source", self.default_input_name, new_value) self.input_volume = new_value self.feedback.play_feedback_beep() except Exception as error: self.set_status(f"Volume change failed: {error}", speak_message=True) def announce_current_focus(self): if self.focus_index == 0: self.speak(format_percent_text(self.output_volume)) elif self.focus_index == 1: self.speak(format_percent_text(self.input_volume)) elif self.focus_index == 2: device = self.get_selected_output_device() if device: self.speak(device["name"]) elif self.focus_index == 3: device = self.get_selected_input_device() if device: self.speak(device["name"]) def move_focus(self, delta): self.focus_index = cycle_index(self.focus_index, delta, len(self.focus_names)) self.speak(self.focus_names[self.focus_index]) def handle_up(self): if self.focus_index in (0, 1): self.apply_volume_delta(5) return if self.focus_index == 2 and self.output_devices: self.output_index = cycle_index(self.output_index, -1, len(self.output_devices)) self.speak(self.output_devices[self.output_index]["name"]) elif self.focus_index == 3 and self.input_devices: self.input_index = cycle_index(self.input_index, -1, len(self.input_devices)) self.speak(self.input_devices[self.input_index]["name"]) def handle_down(self): if self.focus_index in (0, 1): self.apply_volume_delta(-5) return if self.focus_index == 2 and self.output_devices: self.output_index = cycle_index(self.output_index, 1, len(self.output_devices)) self.speak(self.output_devices[self.output_index]["name"]) elif self.focus_index == 3 and self.input_devices: self.input_index = cycle_index(self.input_index, 1, len(self.input_devices)) self.speak(self.input_devices[self.input_index]["name"]) def start_pending_device_change(self, device_kind, previous_target, candidate): try: self.backend.set_default_device(device_kind, candidate["node_name"]) self.pending_change = PendingDeviceChange( device_kind="output device" if device_kind == "sink" else "microphone device", previous_target=previous_target, candidate_target=candidate["node_name"], candidate_name=candidate["name"], started_at=time.time(), ) if device_kind == "sink": self.default_output_id = candidate["id"] self.default_output_name = candidate["node_name"] self.output_volume = self.backend.get_device_volume("sink", self.default_output_name) self.feedback.play_test_beep() else: self.default_input_id = candidate["id"] self.default_input_name = candidate["node_name"] self.input_volume = self.backend.get_device_volume("source", self.default_input_name) self.set_status( f"{self.pending_change.device_kind.capitalize()} changed to {candidate['name']}. Press enter within 15 seconds to keep it.", speak_message=True, ) except Exception as error: self.set_status(f"Device change failed: {error}", speak_message=True) def confirm_pending_change(self): if self.pending_change is None: return message = f"{self.pending_change.device_kind.capitalize()} confirmed." self.pending_change = None self.refresh_state() self.set_status(message, speak_message=True) def rollback_pending_change(self): if self.pending_change is None: return try: change = self.pending_change rollback_kind = "sink" if change.device_kind == "output device" else "source" self.backend.set_default_device(rollback_kind, change.previous_target) self.pending_change = None self.refresh_state() self.set_status( f"Restored previous {change.device_kind}.", speak_message=True, ) except Exception as error: self.pending_change = None self.set_status(f"Failed to restore previous device: {error}", speak_message=True) def activate_selected_device(self): if self.pending_change is not None: self.confirm_pending_change() return if self.focus_index == 2: candidate = self.get_selected_output_device() if candidate is None: return if candidate["id"] == self.default_output_id: self.speak(f"Already using {candidate['name']}") return self.start_pending_device_change("sink", self.default_output_name, candidate) elif self.focus_index == 3: candidate = self.get_selected_input_device() if candidate is None: return if candidate["id"] == self.default_input_id: self.speak(f"Already using {candidate['name']}") return self.start_pending_device_change("source", self.default_input_name, candidate) else: self.announce_current_focus() def draw(self): self.stdscr.clear() self.stdscr.addstr(1, 2, "Stormux Audio Manager", curses.A_BOLD) output_device = self.get_selected_output_device() input_device = self.get_selected_input_device() rows = [ f"Output Volume: {format_percent_text(self.output_volume)}", f"Microphone Volume: {format_percent_text(self.input_volume)}", f"Output Device: {'' if output_device is None else output_device['name']}", f"Microphone Device: {'' if input_device is None else input_device['name']}", ] for row_index, row_text in enumerate(rows): attribute = curses.A_REVERSE if row_index == self.focus_index else curses.A_NORMAL self.stdscr.addstr(4 + row_index, 2, row_text, attribute) help_text = "Left/Right: Move Up/Down: Change Space: Speak Enter: Apply or Confirm H: Help Q/Esc: Exit" self.stdscr.addstr(10, 2, help_text) if self.pending_change is not None: seconds_left = max( 0, self.pending_change.timeout_seconds - int(time.time() - self.pending_change.started_at), ) self.stdscr.addstr( 12, 2, f"Pending {self.pending_change.device_kind}: confirm within {seconds_left} seconds.", curses.A_BOLD, ) if self.status_message: self.stdscr.addstr(14, 2, self.status_message) self.stdscr.refresh() def cleanup(self): self.feedback.stop_sound() if self.speech_client is not None: try: self.speech_client.close() except Exception: pass if self.stdscr is not None: try: curses.nocbreak() self.stdscr.keypad(False) curses.echo() curses.endwin() except Exception: pass def run(self): try: self.init_speech() self.refresh_state() self.stdscr = curses.initscr() curses.noecho() curses.cbreak() self.stdscr.keypad(True) self.stdscr.timeout(250) try: curses.curs_set(0) except Exception: pass self.speak("Stormux audio manager, press h for help.") while True: if pending_change_expired(self.pending_change): self.rollback_pending_change() self.draw() key = self.stdscr.getch() if key == -1: continue if key == curses.KEY_LEFT: self.move_focus(-1) elif key == curses.KEY_RIGHT: self.move_focus(1) elif key == curses.KEY_UP: self.handle_up() elif key == curses.KEY_DOWN: self.handle_down() elif key in (10, 13, curses.KEY_ENTER): self.activate_selected_device() elif key == ord(" "): self.announce_current_focus() elif key in (ord("h"), ord("H")): self.speak(HELP_TEXT, interrupt=False) elif key in (ord("q"), ord("Q"), 27): self.rollback_pending_change() break finally: self.cleanup() def main(): try: AudioManagerApp().run() except Exception as error: print(f"Audio manager could not start: {error}", file=sys.stderr) sys.exit(1) if __name__ == "__main__": main()