606 lines
21 KiB
Python
Executable File
606 lines
21 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import curses
|
|
import json
|
|
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from dataclasses import dataclass
|
|
|
|
try:
|
|
import simpleaudio as sa
|
|
except Exception:
|
|
sa = None
|
|
|
|
try:
|
|
import speechd
|
|
except Exception:
|
|
speechd = None
|
|
|
|
|
|
# Sound played as the fallback "value changed" cue.
FEEDBACK_SOUND = "/usr/share/sounds/stormux/menu_move.wav"
# Sound played through a newly selected output device so the user can hear it.
TEST_SOUND = "/usr/share/sounds/stormux/menu_select.wav"
# Spoken when the user presses h; sentences are joined with single spaces.
HELP_TEXT = " ".join(
    (
        "Use left and right arrows to select a control.",
        "Use up and down arrows to adjust it.",
        "Press space to read the current value.",
        "Press h to hear this help again.",
        "Press enter to confirm a pending device change or apply the selected device.",
        "Press escape or q to exit.",
    )
)
|
|
|
|
|
|
def clamp_percent(value, minimum, maximum):
    """Return ``value`` limited to the range ``minimum``..``maximum``.

    The upper bound is applied first and the lower bound second, so the lower
    bound wins if the two bounds are ever inverted.
    """
    capped = value if value < maximum else maximum
    return capped if capped > minimum else minimum
|
|
|
|
|
|
def format_percent_text(value):
    """Return ``value`` followed by the word "percent", for display and speech."""
    return "{} percent".format(value)
|
|
|
|
|
|
def cycle_index(current_index, delta, total_count):
    """Step ``current_index`` by ``delta`` and wrap it into ``0..total_count - 1``.

    Returns 0 when ``total_count`` is zero or negative, i.e. when there is
    nothing to cycle through.
    """
    return 0 if total_count <= 0 else (current_index + delta) % total_count
|
|
|
|
|
|
def pending_change_expired(change, now_value=None):
    """Report whether a pending change has outlived its confirmation window.

    ``change`` must expose ``started_at`` and ``timeout_seconds``; ``None``
    means nothing is pending and yields ``False``. ``now_value`` replaces the
    wall clock when given (any value other than ``None``, including 0).
    """
    if change is None:
        return False
    reference_time = time.time() if now_value is None else now_value
    elapsed = reference_time - change.started_at
    return elapsed >= change.timeout_seconds
|
|
|
|
|
|
def build_default_command(device_kind, device_name):
    """Build the ``pactl`` argument list that makes a device the default.

    ``device_kind`` must be ``"sink"`` or ``"source"``; anything else raises
    ``ValueError``. ``device_name`` is passed through ``str()``.
    """
    subcommands = (
        ("sink", "set-default-sink"),
        ("source", "set-default-source"),
    )
    for kind, subcommand in subcommands:
        if device_kind == kind:
            return ["pactl", subcommand, str(device_name)]
    raise ValueError(f"Unsupported device kind: {device_kind}")
|
|
|
|
|
|
def build_volume_command(device_kind, device_name, volume_percent):
    """Build the ``pactl`` argument list that sets a device's volume.

    The volume is limited to 0..150 before being formatted as ``"<n>%"``.
    ``device_kind`` must be ``"sink"`` or ``"source"``; anything else raises
    ``ValueError``.
    """
    # Clamp first, as the caller may pass an already out-of-range request.
    bounded = clamp_percent(volume_percent, 0, 150)
    subcommands = (
        ("sink", "set-sink-volume"),
        ("source", "set-source-volume"),
    )
    for kind, subcommand in subcommands:
        if device_kind == kind:
            return ["pactl", subcommand, str(device_name), f"{bounded}%"]
    raise ValueError(f"Unsupported device kind: {device_kind}")
|
|
|
|
|
|
def is_monitor_source(device_name, description):
    """Return True when a source looks like a monitor of an output.

    A source counts as a monitor if its name ends with ``.monitor`` or its
    description starts with ``monitor of `` (both compared case-insensitively).
    ``None`` or empty arguments are treated as empty strings.
    """
    name_text = (device_name or "").lower()
    description_text = (description or "").lower()
    if name_text.endswith(".monitor"):
        return True
    return description_text.startswith("monitor of ")
|
|
|
|
|
|
def extract_devices(payload, exclude_monitors=False):
    """Convert parsed ``pactl --format=json list`` entries into device dicts.

    Each returned dict has ``id`` (the entry's ``index``), ``name`` (a
    human-readable label) and ``node_name`` (the entry's ``name``). Entries
    without an ``index`` are dropped; with ``exclude_monitors`` set, entries
    recognised by ``is_monitor_source`` are dropped too.
    """
    collected = []
    for entry in payload:
        device_id = entry.get("index")
        node_name = entry.get("name", "")
        properties = entry.get("properties") or {}

        # First non-empty label wins; the generated fallback is always non-empty.
        label_candidates = (
            properties.get("device.description"),
            entry.get("description"),
            properties.get("node.description"),
            node_name,
        )
        label = next(
            (candidate for candidate in label_candidates if candidate),
            f"Device {device_id}",
        )

        unusable = device_id is None or (
            exclude_monitors and is_monitor_source(node_name, label)
        )
        if unusable:
            continue

        collected.append({"id": device_id, "name": label, "node_name": node_name})
    return collected
|
|
|
|
|
|
def parse_percent_volume(output_text):
    """Return the first ``<digits>%`` value found in ``output_text`` as an int.

    Raises ``RuntimeError`` when the text contains no percentage.
    """
    found = re.search(r"(\d+)%", output_text)
    if found:
        return int(found.group(1))
    raise RuntimeError(f"Unexpected percent volume output: {output_text}")
|
|
|
|
|
|
def parse_fraction_volume(output_text):
    """Return the first decimal number in ``output_text`` scaled to a percentage.

    The number is multiplied by 100 and rounded to an int (``"0.45"`` -> 45).
    Raises ``RuntimeError`` when the text contains no number.
    """
    found = re.search(r"([0-9]*\.?[0-9]+)", output_text)
    if found:
        fraction = float(found.group(1))
        return round(fraction * 100)
    raise RuntimeError(f"Unexpected fractional volume output: {output_text}")
|
|
|
|
|
|
def build_external_sound_command(file_path):
    """Return an argument list that plays ``file_path`` with an installed player.

    Players are tried in order (pw-play, paplay, aplay, play) and the first
    one found on ``PATH`` is used. Returns ``None`` when none is installed.
    """
    # (executable, options placed before the file path)
    players = (
        ("pw-play", ()),
        ("paplay", ()),
        ("aplay", ()),
        ("play", ("-q",)),
    )
    for program, options in players:
        if shutil.which(program) is None:
            continue
        return [program, *options, file_path]
    return None
|
|
|
|
|
|
def build_feedback_tone_command():
    """Return the ``sox`` argument list for the short feedback tone.

    Returns ``None`` when ``sox`` is not found on ``PATH``.
    """
    if shutil.which("sox") is not None:
        return [
            "sox",
            "-nqdV0",
            "synth",
            ".1",
            "tri",
            "840",
            "fade",
            ".04",
            ".1",
            ".04",
        ]
    return None
|
|
|
|
|
|
def find_device_index(devices, *, node_name=None, device_id=None):
    """Return the position of the first device matching ``device_id`` or ``node_name``.

    A criterion left as ``None`` is ignored. When both are given, a device
    matching either one is accepted. Returns 0 when nothing matches (including
    for an empty list), so the result alone does not tell a miss from a hit at
    position 0.
    """

    def is_match(device):
        if device_id is not None and device["id"] == device_id:
            return True
        return node_name is not None and device["node_name"] == node_name

    return next(
        (position for position, device in enumerate(devices) if is_match(device)),
        0,
    )
|
|
|
|
|
|
@dataclass
class PendingDeviceChange:
    """A default-device switch that is awaiting confirmation.

    ``pending_change_expired`` compares ``started_at`` and ``timeout_seconds``
    against the clock to decide when the switch should be undone.
    """

    # Label of the affected device class (the app stores a human-readable
    # label such as "output device" here).
    device_kind: str
    # Name of the device that was the default before the switch.
    previous_target: str
    # Name of the device that was switched to.
    candidate_target: str
    # Human-readable description of the device that was switched to.
    candidate_name: str
    # time.time() value at which the switch was made.
    started_at: float = 0.0
    # How long the switch may stay unconfirmed.
    timeout_seconds: int = 15
|
|
|
|
|
|
class AudioBackend:
    """Thin wrapper that drives the audio server through the ``pactl`` CLI."""

    def run_command(self, command):
        """Run ``command`` and return the completed process with text output.

        Raises ``subprocess.CalledProcessError`` on a non-zero exit status.
        """
        return subprocess.run(
            command,
            check=True,
            text=True,
            capture_output=True,
        )

    def list_sinks(self):
        """Return the output devices reported by ``pactl list sinks``."""
        listing = self.run_command(["pactl", "--format=json", "list", "sinks"])
        return extract_devices(json.loads(listing.stdout))

    def list_sources(self):
        """Return the input devices from ``pactl list sources``, minus monitors."""
        listing = self.run_command(["pactl", "--format=json", "list", "sources"])
        return extract_devices(json.loads(listing.stdout), exclude_monitors=True)

    def get_default_sink_name(self):
        """Return the stripped output of ``pactl get-default-sink``."""
        result = self.run_command(["pactl", "get-default-sink"])
        return result.stdout.strip()

    def get_default_source_name(self):
        """Return the stripped output of ``pactl get-default-source``."""
        result = self.run_command(["pactl", "get-default-source"])
        return result.stdout.strip()

    def set_default_device(self, device_kind, device_name):
        """Make ``device_name`` the default sink or source."""
        command = build_default_command(device_kind, device_name)
        self.run_command(command)

    def set_device_volume(self, device_kind, device_name, percent_value):
        """Set the volume of ``device_name`` to ``percent_value`` percent."""
        command = build_volume_command(device_kind, device_name, percent_value)
        self.run_command(command)

    def get_device_volume(self, device_kind, device_name):
        """Return the volume of ``device_name`` as an integer percentage.

        Raises ``ValueError`` for a ``device_kind`` other than ``"sink"`` or
        ``"source"``; no command is run in that case.
        """
        subcommands = (
            ("sink", "get-sink-volume"),
            ("source", "get-source-volume"),
        )
        for kind, subcommand in subcommands:
            if device_kind == kind:
                result = self.run_command(["pactl", subcommand, str(device_name)])
                return parse_percent_volume(result.stdout.strip())
        raise ValueError(f"Unsupported device kind: {device_kind}")
|
|
|
|
|
|
class AudioFeedback:
    """Plays short audio cues, falling back step by step to the terminal bell.

    At most one cue is tracked at a time: starting a new one stops the
    previous simpleaudio playback or external player process.
    """

    def __init__(self):
        """Start with nothing playing."""
        self.current_sound = None  # simpleaudio play object, if any
        self.current_process = None  # subprocess.Popen of an external player, if any

    def stop_sound(self):
        """Stop whatever cue is currently playing and forget it."""
        if self.current_sound is not None and self.current_sound.is_playing():
            self.current_sound.stop()
        self.current_sound = None
        if self.current_process is not None and self.current_process.poll() is None:
            self.current_process.terminate()
        self.current_process = None

    def _play_with_simpleaudio(self, file_path):
        """Try to play ``file_path`` in-process; return True on success."""
        if sa is None:
            return False
        try:
            self.stop_sound()
            self.current_sound = sa.WaveObject.from_wave_file(file_path).play()
        except Exception:
            # Best effort: any decoding/device problem just moves on to the
            # next playback method.
            return False
        return True

    def _spawn_player(self, command):
        """Start ``command`` as the current player; return True on success.

        ``command`` may be ``None`` (no suitable program installed), which
        counts as a failure.
        """
        if command is None:
            return False
        try:
            self.stop_sound()
            self.current_process = subprocess.Popen(
                command,
                # Keep the child away from the terminal the curses UI reads
                # its keys from.
                stdin=subprocess.DEVNULL,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        except Exception:
            # Best effort: a player that cannot be started is not fatal.
            return False
        return True

    def _terminal_beep(self):
        """Ring the terminal bell, ignoring the case where curses is not set up."""
        try:
            curses.beep()
        except curses.error:
            # curses raises when initscr() has not been called (or was
            # already shut down); there is nothing left to fall back to.
            pass

    def play_sound_file(self, file_path):
        """Play ``file_path``, or ring the terminal bell if that is impossible.

        Order of attempts: simpleaudio, then the first installed external
        player, then the bell. A missing file goes straight to the bell: an
        external player given a missing file would start and then fail
        without any sound, and the bell would never be reached.
        """
        if os.path.isfile(file_path):
            if self._play_with_simpleaudio(file_path):
                return
            if self._spawn_player(build_external_sound_command(file_path)):
                return
        self._terminal_beep()

    def play_feedback_beep(self):
        """Play the short adjustment tone, or the feedback sound file without sox."""
        if self._spawn_player(build_feedback_tone_command()):
            return
        self.play_sound_file(FEEDBACK_SOUND)

    def play_test_beep(self):
        """Play the sound used to audition a newly selected output device."""
        self.play_sound_file(TEST_SOUND)
|
|
|
|
|
|
class AudioManagerApp:
    """Curses UI, with optional speech output, for volumes and default devices.

    Four controls (see ``focus_names``) are selected with the left/right
    arrows. The two volume controls act on the current default sink/source.
    The two device controls switch the default device through a timed
    "pending change" that is rolled back unless confirmed with Enter.
    """

    def __init__(self):
        """Create the helpers and placeholder state; nothing is queried yet."""
        self.backend = AudioBackend()
        self.feedback = AudioFeedback()
        self.speech_client = None  # speechd client, or None when speech is off
        self.stdscr = None  # curses window, created in run()
        # Order matters: focus_index 0..3 is interpreted positionally below.
        self.focus_names = [
            "Output Volume",
            "Microphone Volume",
            "Output Device",
            "Microphone Device",
        ]
        self.focus_index = 0
        self.output_devices = []
        self.input_devices = []
        self.output_index = 0
        self.input_index = 0
        # Ids of the default devices, or None when the default is not listed.
        self.default_output_id = None
        self.default_input_id = None
        self.default_output_name = ""
        self.default_input_name = ""
        self.output_volume = 50
        self.input_volume = 50
        self.pending_change = None
        self.status_message = ""

    def init_speech(self):
        """Connect to the speech service if possible; otherwise run without it.

        A missing ``speechd`` module already means "no speech", so a failure
        to connect is handled the same way instead of aborting startup. The
        reason is kept in the status line.
        """
        if speechd is None:
            return
        client = None
        try:
            client = speechd.SSIPClient("audio_manager")
            client.set_priority(speechd.Priority.IMPORTANT)
            client.set_punctuation(speechd.PunctuationMode.SOME)
        except Exception as error:
            if client is not None:
                try:
                    client.close()
                except Exception:
                    pass
            self.speech_client = None
            self.status_message = f"Speech is unavailable: {error}"
            return
        self.speech_client = client

    def speak(self, text, interrupt=True):
        """Speak ``text``, first cancelling current speech when ``interrupt`` is set.

        Does nothing without a speech client; speech errors are ignored so
        they never break the UI.
        """
        if not self.speech_client:
            return

        try:
            if interrupt:
                self.speech_client.cancel()
            self.speech_client.speak(text)
        except Exception:
            pass

    def set_status(self, message, speak_message=False):
        """Show ``message`` on the status line and optionally speak it."""
        self.status_message = message
        if speak_message:
            self.speak(message)

    def get_selected_output_device(self):
        """Return the highlighted output device dict, or None if there are none."""
        if not self.output_devices:
            return None
        return self.output_devices[self.output_index]

    def get_selected_input_device(self):
        """Return the highlighted input device dict, or None if there are none."""
        if not self.input_devices:
            return None
        return self.input_devices[self.input_index]

    @staticmethod
    def _lookup_device_id(devices, node_name):
        """Return the id of the device named ``node_name``, or None if absent."""
        for device in devices:
            if device["node_name"] == node_name:
                return device["id"]
        return None

    def refresh_state(self, preserve_selection=False):
        """Reload devices, defaults and volumes from the backend.

        With ``preserve_selection`` the highlighted devices are kept (matched
        by id); otherwise the highlight moves to the default devices.

        Everything is gathered first and stored only once all queries have
        succeeded, so a failure leaves the previous state intact.

        Raises ``RuntimeError`` when no output or no microphone device is
        found, and whatever the backend raises when a query fails.
        """
        previous_output_id = None
        previous_input_id = None
        if preserve_selection:
            selected_output = self.get_selected_output_device()
            selected_input = self.get_selected_input_device()
            previous_output_id = None if selected_output is None else selected_output["id"]
            previous_input_id = None if selected_input is None else selected_input["id"]

        output_devices = self.backend.list_sinks()
        input_devices = self.backend.list_sources()

        if not output_devices:
            raise RuntimeError("No output devices found.")
        if not input_devices:
            raise RuntimeError("No microphone devices found.")

        default_output_name = self.backend.get_default_sink_name()
        default_input_name = self.backend.get_default_source_name()

        if preserve_selection:
            output_index = find_device_index(output_devices, device_id=previous_output_id)
            input_index = find_device_index(input_devices, device_id=previous_input_id)
        else:
            output_index = find_device_index(output_devices, node_name=default_output_name)
            input_index = find_device_index(input_devices, node_name=default_input_name)

        # The default may be missing from the list (monitor sources are
        # filtered out). Its id must then be None rather than the first
        # device's id, or that first device would be reported as "already
        # in use" and could never be activated.
        default_output_id = self._lookup_device_id(output_devices, default_output_name)
        default_input_id = self._lookup_device_id(input_devices, default_input_name)

        output_volume = self.backend.get_device_volume("sink", default_output_name)
        input_volume = self.backend.get_device_volume("source", default_input_name)

        self.output_devices = output_devices
        self.input_devices = input_devices
        self.default_output_name = default_output_name
        self.default_input_name = default_input_name
        self.output_index = output_index
        self.input_index = input_index
        self.default_output_id = default_output_id
        self.default_input_id = default_input_id
        self.output_volume = output_volume
        self.input_volume = input_volume

    def apply_volume_delta(self, delta):
        """Change the focused volume control by ``delta`` percent (0..150).

        Does nothing when a device control has the focus. Failures are
        reported on the status line and spoken.
        """
        try:
            if self.focus_index == 0:
                new_value = clamp_percent(self.output_volume + delta, 0, 150)
                self.backend.set_device_volume("sink", self.default_output_name, new_value)
                self.output_volume = new_value
                self.feedback.play_feedback_beep()
            elif self.focus_index == 1:
                new_value = clamp_percent(self.input_volume + delta, 0, 150)
                self.backend.set_device_volume("source", self.default_input_name, new_value)
                self.input_volume = new_value
                self.feedback.play_feedback_beep()
        except Exception as error:
            self.set_status(f"Volume change failed: {error}", speak_message=True)

    def announce_current_focus(self):
        """Speak the value of the focused control (volume or device name)."""
        if self.focus_index == 0:
            self.speak(format_percent_text(self.output_volume))
        elif self.focus_index == 1:
            self.speak(format_percent_text(self.input_volume))
        elif self.focus_index == 2:
            device = self.get_selected_output_device()
            if device:
                self.speak(device["name"])
        elif self.focus_index == 3:
            device = self.get_selected_input_device()
            if device:
                self.speak(device["name"])

    def move_focus(self, delta):
        """Move the focus by ``delta`` controls, wrapping, and speak its name."""
        self.focus_index = cycle_index(self.focus_index, delta, len(self.focus_names))
        self.speak(self.focus_names[self.focus_index])

    def _cycle_selected_device(self, step):
        """Move the highlight of the focused device list by ``step`` and speak it."""
        if self.focus_index == 2 and self.output_devices:
            self.output_index = cycle_index(self.output_index, step, len(self.output_devices))
            self.speak(self.output_devices[self.output_index]["name"])
        elif self.focus_index == 3 and self.input_devices:
            self.input_index = cycle_index(self.input_index, step, len(self.input_devices))
            self.speak(self.input_devices[self.input_index]["name"])

    def handle_up(self):
        """Up arrow: raise the volume by 5, or highlight the previous device."""
        if self.focus_index in (0, 1):
            self.apply_volume_delta(5)
            return
        self._cycle_selected_device(-1)

    def handle_down(self):
        """Down arrow: lower the volume by 5, or highlight the next device."""
        if self.focus_index in (0, 1):
            self.apply_volume_delta(-5)
            return
        self._cycle_selected_device(1)

    def start_pending_device_change(self, device_kind, previous_target, candidate):
        """Switch the default device to ``candidate`` and start the confirm timer.

        ``device_kind`` is ``"sink"`` or ``"source"``; ``previous_target`` is
        the name to restore on rollback. For a sink the test sound is played.
        Failures are reported on the status line and spoken.
        """
        try:
            self.backend.set_default_device(device_kind, candidate["node_name"])
            self.pending_change = PendingDeviceChange(
                device_kind="output device" if device_kind == "sink" else "microphone device",
                previous_target=previous_target,
                candidate_target=candidate["node_name"],
                candidate_name=candidate["name"],
                started_at=time.time(),
            )

            if device_kind == "sink":
                self.default_output_id = candidate["id"]
                self.default_output_name = candidate["node_name"]
                self.output_volume = self.backend.get_device_volume("sink", self.default_output_name)
                self.feedback.play_test_beep()
            else:
                self.default_input_id = candidate["id"]
                self.default_input_name = candidate["node_name"]
                self.input_volume = self.backend.get_device_volume("source", self.default_input_name)

            # Quote the timeout actually in force rather than a fixed number.
            self.set_status(
                f"{self.pending_change.device_kind.capitalize()} changed to {candidate['name']}. "
                f"Press enter within {self.pending_change.timeout_seconds} seconds to keep it.",
                speak_message=True,
            )
        except Exception as error:
            self.set_status(f"Device change failed: {error}", speak_message=True)

    def confirm_pending_change(self):
        """Keep the pending device change and reload the state.

        The change is already in effect, so a failing reload is reported on
        the status line instead of being allowed to end the UI loop.
        """
        if self.pending_change is None:
            return

        label = self.pending_change.device_kind.capitalize()
        self.pending_change = None
        try:
            self.refresh_state()
        except Exception as error:
            self.set_status(
                f"{label} confirmed, but refreshing devices failed: {error}",
                speak_message=True,
            )
            return
        self.set_status(f"{label} confirmed.", speak_message=True)

    def rollback_pending_change(self):
        """Undo the pending device change by restoring the previous default.

        The pending change is cleared whether or not the restore succeeds.
        """
        if self.pending_change is None:
            return

        change = self.pending_change
        self.pending_change = None
        # device_kind holds the spoken label, so map it back to a pactl kind.
        rollback_kind = "sink" if change.device_kind == "output device" else "source"
        try:
            self.backend.set_default_device(rollback_kind, change.previous_target)
            self.refresh_state()
        except Exception as error:
            self.set_status(f"Failed to restore previous device: {error}", speak_message=True)
            return
        self.set_status(
            f"Restored previous {change.device_kind}.",
            speak_message=True,
        )

    def activate_selected_device(self):
        """Enter key: confirm a pending change, or apply the highlighted device.

        On a volume control with nothing pending, the current value is spoken.
        """
        if self.pending_change is not None:
            self.confirm_pending_change()
            return

        if self.focus_index == 2:
            candidate = self.get_selected_output_device()
            if candidate is None:
                return
            if candidate["id"] == self.default_output_id:
                self.speak(f"Already using {candidate['name']}")
                return
            self.start_pending_device_change("sink", self.default_output_name, candidate)
        elif self.focus_index == 3:
            candidate = self.get_selected_input_device()
            if candidate is None:
                return
            if candidate["id"] == self.default_input_id:
                self.speak(f"Already using {candidate['name']}")
                return
            self.start_pending_device_change("source", self.default_input_name, candidate)
        else:
            self.announce_current_focus()

    def _put_text(self, row, column, text, attribute=None):
        """Write ``text`` at (row, column), ignoring text that does not fit.

        curses raises ``curses.error`` when the position is off-screen or the
        text runs past the bottom-right corner; on a small terminal that
        would otherwise end the program.
        """
        try:
            if attribute is None:
                self.stdscr.addstr(row, column, text)
            else:
                self.stdscr.addstr(row, column, text, attribute)
        except curses.error:
            pass

    def draw(self):
        """Redraw the whole screen from the current state."""
        self.stdscr.clear()
        self._put_text(1, 2, "Stormux Audio Manager", curses.A_BOLD)

        output_device = self.get_selected_output_device()
        input_device = self.get_selected_input_device()
        rows = [
            f"Output Volume: {format_percent_text(self.output_volume)}",
            f"Microphone Volume: {format_percent_text(self.input_volume)}",
            f"Output Device: {'' if output_device is None else output_device['name']}",
            f"Microphone Device: {'' if input_device is None else input_device['name']}",
        ]

        for row_index, row_text in enumerate(rows):
            attribute = curses.A_REVERSE if row_index == self.focus_index else curses.A_NORMAL
            self._put_text(4 + row_index, 2, row_text, attribute)

        help_text = "Left/Right: Move Up/Down: Change Space: Speak Enter: Apply or Confirm H: Help Q/Esc: Exit"
        self._put_text(10, 2, help_text)

        if self.pending_change is not None:
            seconds_left = max(
                0,
                self.pending_change.timeout_seconds - int(time.time() - self.pending_change.started_at),
            )
            self._put_text(
                12,
                2,
                f"Pending {self.pending_change.device_kind}: confirm within {seconds_left} seconds.",
                curses.A_BOLD,
            )

        if self.status_message:
            self._put_text(14, 2, self.status_message)

        self.stdscr.refresh()

    def cleanup(self):
        """Stop sounds, close speech and restore the terminal; never raises
        for speech or curses teardown problems."""
        self.feedback.stop_sound()
        if self.speech_client is not None:
            try:
                self.speech_client.close()
            except Exception:
                pass
        if self.stdscr is not None:
            try:
                curses.nocbreak()
                self.stdscr.keypad(False)
                curses.echo()
                curses.endwin()
            except Exception:
                pass

    def run(self):
        """Initialise speech, state and curses, then run the key loop until exit.

        ``cleanup`` always runs, including when start-up fails.
        """
        try:
            self.init_speech()
            self.refresh_state()

            self.stdscr = curses.initscr()
            curses.noecho()
            curses.cbreak()
            self.stdscr.keypad(True)
            # getch() returns -1 after 250 ms so the pending-change timer and
            # countdown are serviced without a key press.
            self.stdscr.timeout(250)
            try:
                curses.curs_set(0)
            except Exception:
                # Not every terminal can hide the cursor.
                pass

            self.speak(
                "Sound and Volume. Use left and right arrows to select, "
                "up and down arrows to adjust, and space to read the current value. "
                "Press h for help."
            )

            while True:
                if pending_change_expired(self.pending_change):
                    self.rollback_pending_change()

                self.draw()
                key = self.stdscr.getch()

                if key == -1:
                    continue
                if key == curses.KEY_LEFT:
                    self.move_focus(-1)
                elif key == curses.KEY_RIGHT:
                    self.move_focus(1)
                elif key == curses.KEY_UP:
                    self.handle_up()
                elif key == curses.KEY_DOWN:
                    self.handle_down()
                elif key in (10, 13, curses.KEY_ENTER):
                    self.activate_selected_device()
                elif key == ord(" "):
                    self.announce_current_focus()
                elif key in (ord("h"), ord("H")):
                    self.speak(HELP_TEXT, interrupt=False)
                elif key in (ord("q"), ord("Q"), 27):
                    # Leaving with an unconfirmed change restores the old device.
                    self.rollback_pending_change()
                    break
        finally:
            self.cleanup()
|
|
|
|
|
|
def main():
    """Run the audio manager; on any error print it to stderr and exit with 1."""
    try:
        app = AudioManagerApp()
        app.run()
    except Exception as error:
        print(f"Audio manager could not start: {error}", file=sys.stderr)
        raise SystemExit(1)


if __name__ == "__main__":
    main()
|