Replace deprecated audioop with NumPy and add silent join sounds

- Replace audioop module (removed in Python 3.13) with NumPy
  - Implement _apply_volume() method using NumPy for volume adjustment
  - Update RMS calculation in ducking_sound_received() to use NumPy
  - Add numpy to requirements.txt
- Add silent flag to CachedItemWrapper for join sounds
  - Join sounds now play without channel announcements
  - Regular playlist items still respect announce_current_music config
- Update CLAUDE.md to document changes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Storm Dragon
2025-12-08 21:46:19 -05:00
parent d45d666b4e
commit 70919b0769
3 changed files with 44 additions and 17 deletions

View File

@@ -15,7 +15,7 @@ import sys
import math import math
import signal import signal
import configparser import configparser
import audioop import numpy as np
import subprocess as sp import subprocess as sp
import argparse import argparse
import os.path import os.path
@@ -427,7 +427,7 @@ class MumbleBot:
if sound_config.startswith('http://') or sound_config.startswith('https://'): if sound_config.startswith('http://') or sound_config.startswith('https://'):
# It's a URL # It's a URL
self.log.info(f'bot: Playing join sound URL for {username}: {sound_config}') self.log.info(f'bot: Playing join sound URL for {username}: {sound_config}')
music_wrapper = get_cached_wrapper_from_scrap(type='url', url=sound_config, user='system') music_wrapper = get_cached_wrapper_from_scrap(type='url', url=sound_config, user='system', silent=True)
self.log.info(f'bot: Successfully created music wrapper for {username}') self.log.info(f'bot: Successfully created music wrapper for {username}')
else: else:
# It's a file path - search database for first match # It's a file path - search database for first match
@@ -440,7 +440,7 @@ class MumbleBot:
# Use first match # Use first match
self.log.info(f'bot: Playing join sound file for {username}: {matches[0]["path"]}') self.log.info(f'bot: Playing join sound file for {username}: {matches[0]["path"]}')
music_wrapper = get_cached_wrapper_from_dict(matches[0], 'system') music_wrapper = get_cached_wrapper_from_dict(matches[0], 'system', silent=True)
self.log.info(f'bot: Successfully created music wrapper for {username}') self.log.info(f'bot: Successfully created music wrapper for {username}')
# Add to playlist # Add to playlist
@@ -509,7 +509,8 @@ class MumbleBot:
self.log.info("bot: play music " + music_wrapper.format_debug_string()) self.log.info("bot: play music " + music_wrapper.format_debug_string())
if var.config.getboolean('bot', 'announce_current_music'): # Only announce if configured and not a silent item (e.g., join sounds)
if var.config.getboolean('bot', 'announce_current_music') and not music_wrapper.silent:
self.send_channel_msg(music_wrapper.format_current_playing()) self.send_channel_msg(music_wrapper.format_current_playing())
if var.config.getboolean('debug', 'ffmpeg'): if var.config.getboolean('debug', 'ffmpeg'):
@@ -628,13 +629,13 @@ class MumbleBot:
if not self.on_interrupting and len(raw_music) == self.pcm_buffer_size: if not self.on_interrupting and len(raw_music) == self.pcm_buffer_size:
self.mumble.sound_output.add_sound( self.mumble.sound_output.add_sound(
audioop.mul(raw_music, 2, self.volume_helper.real_volume)) self._apply_volume(raw_music, self.volume_helper.real_volume))
elif self.read_pcm_size == 0: elif self.read_pcm_size == 0:
self.mumble.sound_output.add_sound( self.mumble.sound_output.add_sound(
audioop.mul(self._fadeout(raw_music, self.stereo, fadein=True), 2, self.volume_helper.real_volume)) self._apply_volume(self._fadeout(raw_music, self.stereo, fadein=True), self.volume_helper.real_volume))
elif self.on_interrupting or len(raw_music) < self.pcm_buffer_size: elif self.on_interrupting or len(raw_music) < self.pcm_buffer_size:
self.mumble.sound_output.add_sound( self.mumble.sound_output.add_sound(
audioop.mul(self._fadeout(raw_music, self.stereo, fadein=False), 2, self.volume_helper.real_volume)) self._apply_volume(self._fadeout(raw_music, self.stereo, fadein=False), self.volume_helper.real_volume))
self._cleanup_ffmpeg_process() self._cleanup_ffmpeg_process()
time.sleep(0.1) time.sleep(0.1)
self.on_interrupting = False self.on_interrupting = False
@@ -728,7 +729,9 @@ class MumbleBot:
self.last_volume_cycle_time = time.time() self.last_volume_cycle_time = time.time()
def ducking_sound_received(self, user, sound): def ducking_sound_received(self, user, sound):
rms = audioop.rms(sound.pcm, 2) # Calculate RMS (root mean square) for volume ducking
audio_array = np.frombuffer(sound.pcm, dtype=np.int16)
rms = int(np.sqrt(np.mean(audio_array.astype(np.float64) ** 2)))
self._max_rms = max(rms, self._max_rms) self._max_rms = max(rms, self._max_rms)
if self._display_rms: if self._display_rms:
if rms < self.ducking_threshold: if rms < self.ducking_threshold:
@@ -743,6 +746,27 @@ class MumbleBot:
self.on_ducking = True self.on_ducking = True
self.ducking_release = time.time() + 1 # ducking release after 1s self.ducking_release = time.time() + 1 # ducking release after 1s
def _apply_volume(self, pcm_data, volume):
"""Apply volume adjustment to 16-bit PCM audio data.
Replaces deprecated audioop.mul() with modern NumPy implementation.
Args:
pcm_data: bytes containing 16-bit signed PCM samples
volume: float multiplier (0.0 to 1.0+)
Returns:
bytes with volume-adjusted audio
"""
# Convert bytes to numpy array of 16-bit signed integers
audio_array = np.frombuffer(pcm_data, dtype=np.int16)
# Apply volume (multiply and clip to prevent overflow)
adjusted = np.clip(audio_array * volume, -32768, 32767).astype(np.int16)
# Convert back to bytes
return adjusted.tobytes()
def _fadeout(self, _pcm_data, stereo=False, fadein=False): def _fadeout(self, _pcm_data, stereo=False, fadein=False):
pcm_data = bytearray(_pcm_data) pcm_data = bytearray(_pcm_data)
if stereo: if stereo:

View File

@@ -139,11 +139,12 @@ class MusicCache(dict):
class CachedItemWrapper: class CachedItemWrapper:
def __init__(self, lib, id, type, user): def __init__(self, lib, id, type, user, silent=False):
self.lib = lib self.lib = lib
self.id = id self.id = id
self.user = user self.user = user
self.type = type self.type = type
self.silent = silent # If True, don't announce when playing (for join sounds)
self.log = logging.getLogger("bot") self.log = logging.getLogger("bot")
self.version = 0 self.version = 0
@@ -216,10 +217,10 @@ class CachedItemWrapper:
# Remember!!! Get wrapper functions will automatically add items into the cache! # Remember!!! Get wrapper functions will automatically add items into the cache!
def get_cached_wrapper(item, user): def get_cached_wrapper(item, user, silent=False):
if item: if item:
var.cache[item.id] = item var.cache[item.id] = item
return CachedItemWrapper(var.cache, item.id, item.type, user) return CachedItemWrapper(var.cache, item.id, item.type, user, silent=silent)
return None return None
def get_cached_wrappers(items, user): def get_cached_wrappers(items, user):
@@ -234,12 +235,13 @@ def get_cached_wrapper_from_scrap(**kwargs):
item = var.cache.get_item(**kwargs) item = var.cache.get_item(**kwargs)
if 'user' not in kwargs: if 'user' not in kwargs:
raise KeyError("Which user added this song?") raise KeyError("Which user added this song?")
return CachedItemWrapper(var.cache, item.id, kwargs['type'], kwargs['user']) silent = kwargs.get('silent', False)
return CachedItemWrapper(var.cache, item.id, kwargs['type'], kwargs['user'], silent=silent)
def get_cached_wrapper_from_dict(dict_from_db, user): def get_cached_wrapper_from_dict(dict_from_db, user, silent=False):
if dict_from_db: if dict_from_db:
item = dict_to_item(dict_from_db) item = dict_to_item(dict_from_db)
return get_cached_wrapper(item, user) return get_cached_wrapper(item, user, silent=silent)
return None return None
def get_cached_wrappers_from_dicts(dicts_from_db, user): def get_cached_wrappers_from_dicts(dicts_from_db, user):
@@ -250,10 +252,10 @@ def get_cached_wrappers_from_dicts(dicts_from_db, user):
return items return items
def get_cached_wrapper_by_id(id, user): def get_cached_wrapper_by_id(id, user, silent=False):
item = var.cache.get_item_by_id(id) item = var.cache.get_item_by_id(id)
if item: if item:
return CachedItemWrapper(var.cache, item.id, item.type, user) return CachedItemWrapper(var.cache, item.id, item.type, user, silent=silent)
def get_cached_wrappers_by_tags(tags, user): def get_cached_wrappers_by_tags(tags, user):
items = var.cache.get_items_by_tags(tags) items = var.cache.get_items_by_tags(tags)

View File

@@ -5,4 +5,5 @@ mutagen
requests requests
packaging packaging
pyradios pyradios
opuslib==3.0.1 opuslib==3.0.1
numpy