Sound manager added. Pitch controls for menu voice bound to { and }.

This commit is contained in:
Storm Dragon
2026-04-15 15:14:57 -04:00
parent 2f96d403d1
commit a33e90dc70
4 changed files with 651 additions and 4 deletions

4
.gitignore vendored
View File

@@ -3,5 +3,5 @@ image/*
*.img
*.img.xz
# Claude AI assistant files
CLAUDE.md
# AI assistant files
AGENTS.md

View File

@@ -288,6 +288,7 @@ AddModule "piper-tts-generic" "sd_generic" "piper-tts-generic.conf"
#AddModule "mary-generic" "sd_generic" "mary-generic.conf"
#AddModule "baratinoo" "sd_baratinoo" "baratinoo.conf"
AddModule "rhvoice" "sd_rhvoice" "rhvoice.conf"
AddModule "viavoice" "sd_viavoice" "viavoice.conf"
AddModule "voxin" "sd_voxin" "voxin.conf"
# The output module testing doesn't actually connect to anything. It

600
usr/local/bin/audio_manager.py Executable file
View File

@@ -0,0 +1,600 @@
#!/usr/bin/env python3
import curses
import json
import os
import re
import shutil
import subprocess
import sys
import time
from dataclasses import dataclass
try:
import simpleaudio as sa
except Exception:
sa = None
try:
import speechd
except Exception:
speechd = None
FEEDBACK_SOUND = "/usr/share/sounds/stormux/menu_move.wav"
TEST_SOUND = "/usr/share/sounds/stormux/menu_select.wav"
HELP_TEXT = (
"Use left and right to move between controls. "
"Use up and down to change the selected control. "
"Press space to hear the current value. "
"Press enter to confirm a pending device change or apply the selected device. "
"Press escape or q to exit."
)
def clamp_percent(value, minimum, maximum):
return max(minimum, min(maximum, value))
def format_percent_text(value):
return f"{value} percent"
def cycle_index(current_index, delta, total_count):
if total_count <= 0:
return 0
return (current_index + delta) % total_count
def pending_change_expired(change, now_value=None):
if change is None:
return False
if now_value is None:
now_value = time.time()
return (now_value - change.started_at) >= change.timeout_seconds
def build_default_command(device_kind, device_name):
if device_kind == "sink":
return ["pactl", "set-default-sink", str(device_name)]
if device_kind == "source":
return ["pactl", "set-default-source", str(device_name)]
raise ValueError(f"Unsupported device kind: {device_kind}")
def build_volume_command(device_kind, device_name, volume_percent):
bounded = clamp_percent(volume_percent, 0, 150)
if device_kind == "sink":
return ["pactl", "set-sink-volume", str(device_name), f"{bounded}%"]
if device_kind == "source":
return ["pactl", "set-source-volume", str(device_name), f"{bounded}%"]
raise ValueError(f"Unsupported device kind: {device_kind}")
def is_monitor_source(device_name, description):
lowered_name = (device_name or "").lower()
lowered_description = (description or "").lower()
return lowered_name.endswith(".monitor") or lowered_description.startswith("monitor of ")
def extract_devices(payload, exclude_monitors=False):
devices = []
for item in payload:
device_id = item.get("index")
node_name = item.get("name", "")
properties = item.get("properties") or {}
description = (
properties.get("device.description")
or item.get("description")
or properties.get("node.description")
or node_name
or f"Device {device_id}"
)
if device_id is None:
continue
if exclude_monitors and is_monitor_source(node_name, description):
continue
devices.append(
{
"id": device_id,
"name": description,
"node_name": node_name,
}
)
return devices
def parse_percent_volume(output_text):
match = re.search(r"(\d+)%", output_text)
if not match:
raise RuntimeError(f"Unexpected percent volume output: {output_text}")
return int(match.group(1))
def parse_fraction_volume(output_text):
match = re.search(r"([0-9]*\.?[0-9]+)", output_text)
if not match:
raise RuntimeError(f"Unexpected fractional volume output: {output_text}")
return round(float(match.group(1)) * 100)
def build_external_sound_command(file_path):
player_candidates = [
("pw-play", ["pw-play", file_path]),
("paplay", ["paplay", file_path]),
("aplay", ["aplay", file_path]),
("play", ["play", "-q", file_path]),
]
for executable, command in player_candidates:
if shutil.which(executable) is not None:
return command
return None
def build_feedback_tone_command():
if shutil.which("sox") is None:
return None
return ["sox", "-nqdV0", "synth", ".1", "tri", "840", "fade", ".04", ".1", ".04"]
def find_device_index(devices, *, node_name=None, device_id=None):
for index, device in enumerate(devices):
if device_id is not None and device["id"] == device_id:
return index
if node_name is not None and device["node_name"] == node_name:
return index
return 0
@dataclass
class PendingDeviceChange:
device_kind: str
previous_target: str
candidate_target: str
candidate_name: str
started_at: float = 0.0
timeout_seconds: int = 15
class AudioBackend:
def run_command(self, command):
return subprocess.run(command, capture_output=True, text=True, check=True)
def list_sinks(self):
payload = json.loads(self.run_command(["pactl", "--format=json", "list", "sinks"]).stdout)
return extract_devices(payload)
def list_sources(self):
payload = json.loads(self.run_command(["pactl", "--format=json", "list", "sources"]).stdout)
return extract_devices(payload, exclude_monitors=True)
def get_default_sink_name(self):
return self.run_command(["pactl", "get-default-sink"]).stdout.strip()
def get_default_source_name(self):
return self.run_command(["pactl", "get-default-source"]).stdout.strip()
def set_default_device(self, device_kind, device_name):
self.run_command(build_default_command(device_kind, device_name))
def set_device_volume(self, device_kind, device_name, percent_value):
self.run_command(build_volume_command(device_kind, device_name, percent_value))
def get_device_volume(self, device_kind, device_name):
if device_kind == "sink":
output = self.run_command(["pactl", "get-sink-volume", str(device_name)]).stdout.strip()
elif device_kind == "source":
output = self.run_command(["pactl", "get-source-volume", str(device_name)]).stdout.strip()
else:
raise ValueError(f"Unsupported device kind: {device_kind}")
return parse_percent_volume(output)
class AudioFeedback:
def __init__(self):
self.current_sound = None
self.current_process = None
def stop_sound(self):
if self.current_sound is not None and self.current_sound.is_playing():
self.current_sound.stop()
self.current_sound = None
if self.current_process is not None and self.current_process.poll() is None:
self.current_process.terminate()
self.current_process = None
def play_sound_file(self, file_path):
if sa is not None and os.path.exists(file_path):
try:
self.stop_sound()
self.current_sound = sa.WaveObject.from_wave_file(file_path).play()
return
except Exception:
pass
external_command = build_external_sound_command(file_path)
if external_command is not None:
try:
self.stop_sound()
self.current_process = subprocess.Popen(
external_command,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
return
except Exception:
pass
curses.beep()
def play_feedback_beep(self):
tone_command = build_feedback_tone_command()
if tone_command is not None:
try:
self.stop_sound()
self.current_process = subprocess.Popen(
tone_command,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
return
except Exception:
pass
self.play_sound_file(FEEDBACK_SOUND)
def play_test_beep(self):
self.play_sound_file(TEST_SOUND)
class AudioManagerApp:
def __init__(self):
self.backend = AudioBackend()
self.feedback = AudioFeedback()
self.speech_client = None
self.stdscr = None
self.focus_names = [
"Output Volume",
"Microphone Volume",
"Output Device",
"Microphone Device",
]
self.focus_index = 0
self.output_devices = []
self.input_devices = []
self.output_index = 0
self.input_index = 0
self.default_output_id = None
self.default_input_id = None
self.default_output_name = ""
self.default_input_name = ""
self.output_volume = 50
self.input_volume = 50
self.pending_change = None
self.status_message = ""
def init_speech(self):
if speechd is None:
return
self.speech_client = speechd.SSIPClient("audio_manager")
self.speech_client.set_priority(speechd.Priority.IMPORTANT)
self.speech_client.set_punctuation(speechd.PunctuationMode.SOME)
def speak(self, text, interrupt=True):
if not self.speech_client:
return
try:
if interrupt:
self.speech_client.cancel()
self.speech_client.speak(text)
except Exception:
pass
def set_status(self, message, speak_message=False):
self.status_message = message
if speak_message:
self.speak(message)
def get_selected_output_device(self):
if not self.output_devices:
return None
return self.output_devices[self.output_index]
def get_selected_input_device(self):
if not self.input_devices:
return None
return self.input_devices[self.input_index]
def refresh_state(self, preserve_selection=False):
previous_output_id = None
previous_input_id = None
if preserve_selection:
selected_output = self.get_selected_output_device()
selected_input = self.get_selected_input_device()
previous_output_id = None if selected_output is None else selected_output["id"]
previous_input_id = None if selected_input is None else selected_input["id"]
self.output_devices = self.backend.list_sinks()
self.input_devices = self.backend.list_sources()
if not self.output_devices:
raise RuntimeError("No output devices found.")
if not self.input_devices:
raise RuntimeError("No microphone devices found.")
default_output_name = self.backend.get_default_sink_name()
default_input_name = self.backend.get_default_source_name()
self.default_output_name = default_output_name
self.default_input_name = default_input_name
self.output_index = find_device_index(
self.output_devices,
device_id=previous_output_id if preserve_selection else None,
node_name=None if preserve_selection else default_output_name,
)
self.input_index = find_device_index(
self.input_devices,
device_id=previous_input_id if preserve_selection else None,
node_name=None if preserve_selection else default_input_name,
)
self.default_output_id = self.output_devices[
find_device_index(self.output_devices, node_name=default_output_name)
]["id"]
self.default_input_id = self.input_devices[
find_device_index(self.input_devices, node_name=default_input_name)
]["id"]
self.output_volume = self.backend.get_device_volume("sink", self.default_output_name)
self.input_volume = self.backend.get_device_volume("source", self.default_input_name)
def apply_volume_delta(self, delta):
try:
if self.focus_index == 0:
new_value = clamp_percent(self.output_volume + delta, 0, 150)
self.backend.set_device_volume("sink", self.default_output_name, new_value)
self.output_volume = new_value
self.feedback.play_feedback_beep()
elif self.focus_index == 1:
new_value = clamp_percent(self.input_volume + delta, 0, 150)
self.backend.set_device_volume("source", self.default_input_name, new_value)
self.input_volume = new_value
self.feedback.play_feedback_beep()
except Exception as error:
self.set_status(f"Volume change failed: {error}", speak_message=True)
def announce_current_focus(self):
if self.focus_index == 0:
self.speak(format_percent_text(self.output_volume))
elif self.focus_index == 1:
self.speak(format_percent_text(self.input_volume))
elif self.focus_index == 2:
device = self.get_selected_output_device()
if device:
self.speak(device["name"])
elif self.focus_index == 3:
device = self.get_selected_input_device()
if device:
self.speak(device["name"])
def move_focus(self, delta):
self.focus_index = cycle_index(self.focus_index, delta, len(self.focus_names))
self.speak(self.focus_names[self.focus_index])
def handle_up(self):
if self.focus_index in (0, 1):
self.apply_volume_delta(5)
return
if self.focus_index == 2 and self.output_devices:
self.output_index = cycle_index(self.output_index, -1, len(self.output_devices))
self.speak(self.output_devices[self.output_index]["name"])
elif self.focus_index == 3 and self.input_devices:
self.input_index = cycle_index(self.input_index, -1, len(self.input_devices))
self.speak(self.input_devices[self.input_index]["name"])
def handle_down(self):
if self.focus_index in (0, 1):
self.apply_volume_delta(-5)
return
if self.focus_index == 2 and self.output_devices:
self.output_index = cycle_index(self.output_index, 1, len(self.output_devices))
self.speak(self.output_devices[self.output_index]["name"])
elif self.focus_index == 3 and self.input_devices:
self.input_index = cycle_index(self.input_index, 1, len(self.input_devices))
self.speak(self.input_devices[self.input_index]["name"])
def start_pending_device_change(self, device_kind, previous_target, candidate):
try:
self.backend.set_default_device(device_kind, candidate["node_name"])
self.pending_change = PendingDeviceChange(
device_kind="output device" if device_kind == "sink" else "microphone device",
previous_target=previous_target,
candidate_target=candidate["node_name"],
candidate_name=candidate["name"],
started_at=time.time(),
)
if device_kind == "sink":
self.default_output_id = candidate["id"]
self.default_output_name = candidate["node_name"]
self.output_volume = self.backend.get_device_volume("sink", self.default_output_name)
self.feedback.play_test_beep()
else:
self.default_input_id = candidate["id"]
self.default_input_name = candidate["node_name"]
self.input_volume = self.backend.get_device_volume("source", self.default_input_name)
self.set_status(
f"{self.pending_change.device_kind.capitalize()} changed to {candidate['name']}. Press enter within 15 seconds to keep it.",
speak_message=True,
)
except Exception as error:
self.set_status(f"Device change failed: {error}", speak_message=True)
def confirm_pending_change(self):
if self.pending_change is None:
return
message = f"{self.pending_change.device_kind.capitalize()} confirmed."
self.pending_change = None
self.refresh_state()
self.set_status(message, speak_message=True)
def rollback_pending_change(self):
if self.pending_change is None:
return
try:
change = self.pending_change
rollback_kind = "sink" if change.device_kind == "output device" else "source"
self.backend.set_default_device(rollback_kind, change.previous_target)
self.pending_change = None
self.refresh_state()
self.set_status(
f"Restored previous {change.device_kind}.",
speak_message=True,
)
except Exception as error:
self.pending_change = None
self.set_status(f"Failed to restore previous device: {error}", speak_message=True)
def activate_selected_device(self):
if self.pending_change is not None:
self.confirm_pending_change()
return
if self.focus_index == 2:
candidate = self.get_selected_output_device()
if candidate is None:
return
if candidate["id"] == self.default_output_id:
self.speak(f"Already using {candidate['name']}")
return
self.start_pending_device_change("sink", self.default_output_name, candidate)
elif self.focus_index == 3:
candidate = self.get_selected_input_device()
if candidate is None:
return
if candidate["id"] == self.default_input_id:
self.speak(f"Already using {candidate['name']}")
return
self.start_pending_device_change("source", self.default_input_name, candidate)
else:
self.announce_current_focus()
def draw(self):
self.stdscr.clear()
self.stdscr.addstr(1, 2, "Stormux Audio Manager", curses.A_BOLD)
output_device = self.get_selected_output_device()
input_device = self.get_selected_input_device()
rows = [
f"Output Volume: {format_percent_text(self.output_volume)}",
f"Microphone Volume: {format_percent_text(self.input_volume)}",
f"Output Device: {'' if output_device is None else output_device['name']}",
f"Microphone Device: {'' if input_device is None else input_device['name']}",
]
for row_index, row_text in enumerate(rows):
attribute = curses.A_REVERSE if row_index == self.focus_index else curses.A_NORMAL
self.stdscr.addstr(4 + row_index, 2, row_text, attribute)
help_text = "Left/Right: Move Up/Down: Change Space: Speak Enter: Apply or Confirm H: Help Q/Esc: Exit"
self.stdscr.addstr(10, 2, help_text)
if self.pending_change is not None:
seconds_left = max(
0,
self.pending_change.timeout_seconds - int(time.time() - self.pending_change.started_at),
)
self.stdscr.addstr(
12,
2,
f"Pending {self.pending_change.device_kind}: confirm within {seconds_left} seconds.",
curses.A_BOLD,
)
if self.status_message:
self.stdscr.addstr(14, 2, self.status_message)
self.stdscr.refresh()
def cleanup(self):
self.feedback.stop_sound()
if self.speech_client is not None:
try:
self.speech_client.close()
except Exception:
pass
if self.stdscr is not None:
try:
curses.nocbreak()
self.stdscr.keypad(False)
curses.echo()
curses.endwin()
except Exception:
pass
def run(self):
try:
self.init_speech()
self.refresh_state()
self.stdscr = curses.initscr()
curses.noecho()
curses.cbreak()
self.stdscr.keypad(True)
self.stdscr.timeout(250)
try:
curses.curs_set(0)
except Exception:
pass
self.speak("Stormux audio manager")
while True:
if pending_change_expired(self.pending_change):
self.rollback_pending_change()
self.draw()
key = self.stdscr.getch()
if key == -1:
continue
if key == curses.KEY_LEFT:
self.move_focus(-1)
elif key == curses.KEY_RIGHT:
self.move_focus(1)
elif key == curses.KEY_UP:
self.handle_up()
elif key == curses.KEY_DOWN:
self.handle_down()
elif key in (10, 13, curses.KEY_ENTER):
self.activate_selected_device()
elif key == ord(" "):
self.announce_current_focus()
elif key in (ord("h"), ord("H")):
self.speak(HELP_TEXT, interrupt=False)
elif key in (ord("q"), ord("Q"), 27):
self.rollback_pending_change()
break
finally:
self.cleanup()
def main():
try:
AudioManagerApp().run()
except Exception as error:
print(f"Audio manager could not start: {error}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -47,6 +47,7 @@ class VoicedMenu:
# Default settings
self.speechRate = 0 # Normal speech rate (0 is default in speechd)
self.speechPitch = 0 # Normal speech pitch
self.volume = 50 # Default volume level
# Load settings
@@ -97,8 +98,9 @@ class VoicedMenu:
self.speechClient.set_priority(speechd.Priority.IMPORTANT)
self.speechClient.set_punctuation(speechd.PunctuationMode.SOME)
# Apply speech rate from settings
# Apply speech settings from saved values
self.speechClient.set_rate(self.speechRate)
self.speechClient.set_pitch(self.speechPitch)
except Exception as e:
print(f"Could not initialize speech: {e}")
# Fallback to None - the speak method will handle this
@@ -116,6 +118,7 @@ class VoicedMenu:
# Load speech settings
if 'Speech' in self.config:
self.speechRate = self.config.getint('Speech', 'rate', fallback=0)
self.speechPitch = self.config.getint('Speech', 'pitch', fallback=0)
# Load volume settings
if 'Volume' in self.config:
@@ -134,6 +137,7 @@ class VoicedMenu:
self.config['Speech'] = {}
self.config['Speech']['rate'] = str(self.speechRate)
self.config['Speech']['pitch'] = str(self.speechPitch)
# Save volume settings
if 'Volume' not in self.config:
@@ -174,6 +178,32 @@ class VoicedMenu:
# Save the new setting
self.save_settings()
def increase_speech_pitch(self):
"""Increase speech pitch"""
self.speechPitch = min(100, self.speechPitch + 10) # Max is 100
if self.speechClient:
try:
self.speechClient.set_pitch(self.speechPitch)
self.speak(f"Speech pitch: {self.speechPitch}")
except Exception as e:
print(f"Error adjusting speech pitch: {e}")
# Save the new setting
self.save_settings()
def decrease_speech_pitch(self):
"""Decrease speech pitch"""
self.speechPitch = max(-100, self.speechPitch - 10) # Min is -100
if self.speechClient:
try:
self.speechClient.set_pitch(self.speechPitch)
self.speak(f"Speech pitch: {self.speechPitch}")
except Exception as e:
print(f"Error adjusting speech pitch: {e}")
# Save the new setting
self.save_settings()
def get_current_volume(self):
"""Get the current system volume percentage"""
try:
@@ -774,6 +804,8 @@ class VoicedMenu:
B key: Report battery status.
Left bracket: Decrease speech rate.
Right bracket: Increase speech rate.
Left brace: Decrease speech pitch.
Right brace: Increase speech pitch.
9 key: Decrease volume.
0 key: Increase volume.
Escape: Refresh the menu.
@@ -793,7 +825,7 @@ class VoicedMenu:
self.stdscr.addstr(1, x, title, curses.A_BOLD)
# Draw help line
helpText = "Ãvigate | Enter: Select | H: Help | [ ] Rate | 9 0 Volume | Esc: Refresh"
helpText = "Navigate | Enter: Select | H: Help | [ ] Rate | { } Pitch | 9 0 Volume | Esc: Refresh"
x = max(0, w // 2 - len(helpText) // 2)
self.stdscr.addstr(3, x, helpText)
@@ -826,6 +858,11 @@ class VoicedMenu:
rateText = f"Speech Rate: {self.speechRate}"
self.stdscr.addstr(h-2, 2, rateText)
# Draw speech pitch indicator
pitchText = f"Pitch: {self.speechPitch}"
pitchX = max(0, w // 2 - len(pitchText) // 2)
self.stdscr.addstr(h-2, pitchX, pitchText)
# Draw volume indicator
volumeText = f"Volume: {self.get_current_volume()}%"
self.stdscr.addstr(h-2, w-len(volumeText)-2, volumeText)
@@ -965,6 +1002,14 @@ class VoicedMenu:
self.increase_speech_rate()
self.draw_menu()
elif key == ord('{'): # Decrease speech pitch
self.decrease_speech_pitch()
self.draw_menu()
elif key == ord('}'): # Increase speech pitch
self.increase_speech_pitch()
self.draw_menu()
elif key == ord('9'): # Decrease volume
self.decrease_volume()
@@ -1147,6 +1192,7 @@ if __name__ == "__main__":
menu.add_section("Accessories")
menu.add_item("Accessories", "Local IP Address", "/usr/local/bin/ip_info.py local")
menu.add_item("Accessories", "Remote IP Address", "/usr/local/bin/ip_info.py remote")
menu.add_item("Accessories", "Sound and Volume", "/usr/local/bin/audio_manager.py")
menu.add_item("Accessories", "Web Browser", "GAME=Brave startx")
menu.add_item("Accessories", "LibreOffice", lambda: menu.install_and_launch("libreoffice", "gui"))
menu.add_item("Accessories", "Thunderbird", lambda: menu.install_and_launch("thunderbird", "gui"))