"""
Sound manager for audio notifications.

Defines ``SoundPack`` (a directory of sound files described by an optional
``pack.json`` manifest) and ``SoundManager`` (pack discovery, pack selection
and playback of event sounds via simpleaudio or a platform command-line
player).
"""

import json
import logging
import platform
import shutil
import subprocess
import wave
from pathlib import Path
from threading import Thread
from typing import Dict, List, Optional

import numpy as np

try:
    import simpleaudio as sa
    SIMPLEAUDIO_AVAILABLE = True
except ImportError:
    SIMPLEAUDIO_AVAILABLE = False
    sa = None

from config.settings import SettingsManager


class SoundPack:
    """Represents a sound pack with metadata.

    A pack is a directory that may contain a ``pack.json`` manifest. The
    manifest's ``"sounds"`` object maps event names to file names relative
    to the pack directory.
    """

    def __init__(self, pack_dir: Path):
        self.pack_dir = pack_dir
        self.pack_file = pack_dir / "pack.json"
        self.metadata = self._load_metadata()

    def _placeholder_metadata(self, description: str) -> Dict:
        """Build stand-in metadata (no sounds) for a pack without a usable manifest."""
        return {
            "name": self.pack_dir.name.title(),
            "description": description,
            "author": "Unknown",
            "version": "1.0",
            "sounds": {},
        }

    def _load_metadata(self) -> Dict:
        """Load pack metadata from pack.json.

        Returns:
            The parsed manifest, or placeholder metadata when the manifest is
            missing, unreadable, not valid JSON, or not a JSON object.
        """
        if not self.pack_file.exists():
            return self._placeholder_metadata(f"Sound pack: {self.pack_dir.name}")

        try:
            with open(self.pack_file, 'r', encoding='utf-8') as f:
                metadata = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError, OSError):
            return self._placeholder_metadata("Corrupted sound pack")

        # Valid JSON that is not an object (e.g. a list) would break every
        # later ``self.metadata.get(...)`` call, so treat it as corrupted.
        if not isinstance(metadata, dict):
            return self._placeholder_metadata("Corrupted sound pack")
        return metadata

    def get_sound_path(self, event_type: str) -> Optional[Path]:
        """Get the file path for a specific sound event.

        Args:
            event_type: Event name to look up in the manifest's "sounds" map.

        Returns:
            Path of the sound file inside the pack directory, or None when
            the event has no usable entry or the file does not exist.
        """
        sounds = self.metadata.get("sounds")
        if not isinstance(sounds, dict):
            return None

        sound_file = sounds.get(event_type)
        if not sound_file or not isinstance(sound_file, str):
            return None

        sound_path = self.pack_dir / sound_file
        # is_file() rather than exists(): a directory is not playable.
        return sound_path if sound_path.is_file() else None

    @property
    def name(self) -> str:
        """Display name of the pack (defaults to the title-cased directory name)."""
        return self.metadata.get("name", self.pack_dir.name.title())

    @property
    def description(self) -> str:
        """Pack description, or an empty string when the manifest has none."""
        return self.metadata.get("description", "")

    @property
    def author(self) -> str:
        """Pack author, or "Unknown" when the manifest has none."""
        return self.metadata.get("author", "Unknown")

    @property
    def version(self) -> str:
        """Pack version, or "1.0" when the manifest has none."""
        return self.metadata.get("version", "1.0")


class SoundManager:
    """Manages audio notifications for Bifrost"""

    # Standard sound events
    SOUND_EVENTS = [
        "private_message",
        "direct_message",
        "mention",
        "boost",
        "reply",
        "favorite",
        "follow",
        "unfollow",
        "post_sent",
        "post",
        "timeline_update",
        "notification",
        "startup",
        "shutdown",
        "success",
        "error",
        "expand",
        "collapse",
        "autocomplete",
        "autocomplete_end"
    ]

    def __init__(self, settings: SettingsManager):
        self.settings = settings
        self.system = platform.system()
        self.sound_packs = {}
        self.current_pack = None
        self.logger = logging.getLogger('bifrost.audio')
        self.discover_sound_packs()
        self.load_current_pack()

    def reload_settings(self):
        """Reload settings and sound pack"""
        self.discover_sound_packs()
        self.load_current_pack()

    def discover_sound_packs(self):
        """Discover available sound packs.

        Ensures the sounds directory and the default pack manifest exist,
        then rebuilds ``self.sound_packs`` with one ``SoundPack`` per
        sub-directory, keyed by directory name.
        """
        sounds_dir = self.settings.get_sounds_dir()
        sounds_dir.mkdir(parents=True, exist_ok=True)

        # Create default pack if it doesn't exist
        self.create_default_pack()

        # Scan for sound packs
        self.sound_packs = {}
        for pack_dir in sounds_dir.iterdir():
            if pack_dir.is_dir():
                self.sound_packs[pack_dir.name] = SoundPack(pack_dir)

    def create_default_pack(self):
        """Create default sound pack if it doesn't exist.

        Only the ``pack.json`` manifest is written; it maps every entry of
        ``SOUND_EVENTS`` to ``<event>.wav``. No audio files are created here.
        """
        default_dir = self.settings.get_sounds_dir() / "default"
        default_dir.mkdir(parents=True, exist_ok=True)

        pack_file = default_dir / "pack.json"
        if pack_file.exists():
            return

        pack_data = {
            "name": "Default",
            "description": "Default system sounds",
            "author": "Bifrost",
            "version": "1.0",
            # Derived from SOUND_EVENTS so the manifest cannot drift from it.
            "sounds": {event: f"{event}.wav" for event in self.SOUND_EVENTS},
        }
        with open(pack_file, 'w', encoding='utf-8') as f:
            json.dump(pack_data, f, indent=2)

    def load_current_pack(self):
        """Load the currently selected sound pack.

        A stored name of 'none' (any case) disables sounds. An unknown name
        falls back to the 'default' pack and, when that pack exists, the
        setting is rewritten to 'default' and saved.
        """
        stored_name = self.settings.get('audio', 'sound_pack', 'default')
        # Guard against a missing/None/non-string stored value.
        current_pack_name = str(stored_name) if stored_name else 'default'

        # Handle 'None' pack (no sounds)
        if current_pack_name.lower() == 'none':
            self.current_pack = None
            return

        if current_pack_name in self.sound_packs:
            self.current_pack = self.sound_packs[current_pack_name]
            return

        # Fall back to default pack
        self.current_pack = self.sound_packs.get('default')
        if self.current_pack:
            self.settings.set('audio', 'sound_pack', 'default')
            self.settings.save_settings()

    def set_current_pack(self, pack_name: str) -> bool:
        """Set the current sound pack.

        Args:
            pack_name: A discovered pack name, or 'none' (any case) to
                disable sounds.

        Returns:
            True when the selection was applied and saved, False when the
            pack name is unknown.
        """
        if pack_name.lower() == 'none':
            self.current_pack = None
            self.settings.set('audio', 'sound_pack', 'None')
            self.settings.save_settings()
            return True

        if pack_name in self.sound_packs:
            self.current_pack = self.sound_packs[pack_name]
            self.settings.set('audio', 'sound_pack', pack_name)
            self.settings.save_settings()
            return True

        return False

    def get_available_packs(self) -> List[str]:
        """Get list of available sound pack names ('None' is always first)."""
        return ['None', *self.sound_packs]

    def get_pack_info(self, pack_name: str) -> Optional[Dict]:
        """Get information about a sound pack.

        Returns:
            A dict with "name", "description", "author" and "version", or
            None when no pack with that name was discovered.
        """
        pack = self.sound_packs.get(pack_name)
        if not pack:
            return None
        return {
            "name": pack.name,
            "description": pack.description,
            "author": pack.author,
            "version": pack.version,
        }

    def is_sound_enabled(self, event_type: str) -> bool:
        """Check if sound is enabled for an event type"""
        # Sounds are enabled if we have a sound pack loaded (not None);
        # event_type is currently not consulted.
        return self.current_pack is not None

    def get_sound_volume(self, event_type: str) -> float:
        """Get volume setting for an event type.

        Returns:
            The 'audio'/'volume' setting (default 100) divided by 100. The
            same value is used for every event type.
        """
        return self.settings.get_float('audio', 'volume', 100) / 100.0

    def play_sound(self, file_path: Path, volume_multiplier: float = 1.0):
        """Play a sound file using simpleaudio or platform-specific fallback.

        Args:
            file_path: Sound file to play; a missing file is ignored.
            volume_multiplier: Final gain applied to the sound (1.0 leaves
                the audio unchanged).

        Playback errors are logged, never raised.
        """
        if not file_path.exists():
            return

        try:
            if SIMPLEAUDIO_AVAILABLE:
                # simpleaudio starts playback and returns immediately.
                self._play_with_simpleaudio(file_path, volume_multiplier)
            else:
                self._play_with_subprocess_async(file_path, volume_multiplier)
        except Exception as e:
            self.logger.error(f"Audio playback failed: {e}")

    @staticmethod
    def _scale_pcm(audio_data, bytes_per_sample: int, volume: float) -> Optional[bytes]:
        """Scale raw PCM samples by ``volume``.

        Returns:
            The scaled sample bytes for 8-bit and 16-bit audio, or None for
            any other sample width (caller should keep the original data).
        """
        gain = float(volume)

        if bytes_per_sample == 1:
            # 8-bit samples are unsigned around a midpoint of 128: centre,
            # scale, truncate toward zero, re-bias, then clip to range.
            centered = np.frombuffer(audio_data, dtype=np.uint8).astype(np.float64) - 128
            rebased = np.trunc(centered * gain) + 128
            return rebased.clip(0, 255).astype(np.uint8).tobytes()

        if bytes_per_sample == 2:
            samples = np.frombuffer(audio_data, dtype=np.int16).astype(np.float64)
            # Clip in float before the cast so loud gains saturate, not wrap.
            return (samples * gain).clip(-32768, 32767).astype(np.int16).tobytes()

        return None

    def _play_with_simpleaudio(self, file_path: Path, volume: float):
        """Play sound using simpleaudio library.

        Non-WAV files and any simpleaudio failure are handed to the
        subprocess player on a background thread so the caller never blocks
        for the duration of the sound.
        """
        if not str(file_path).lower().endswith('.wav'):
            # simpleaudio is only used for WAV files here.
            self._play_with_subprocess_async(file_path, volume)
            return

        try:
            wave_obj = sa.WaveObject.from_wave_file(str(file_path))

            # Apply volume by modifying the audio data
            if volume != 1.0:
                scaled = self._scale_pcm(
                    wave_obj.audio_data, wave_obj.bytes_per_sample, volume
                )
                # None means an unsupported sample width: play unscaled.
                if scaled is not None:
                    wave_obj = sa.WaveObject(
                        scaled,
                        wave_obj.num_channels,
                        wave_obj.bytes_per_sample,
                        wave_obj.sample_rate
                    )

            # Playback is started without waiting for it to finish.
            wave_obj.play()
        except Exception as e:
            self.logger.error(f"simpleaudio playback failed: {e}")
            # Fall back to subprocess method
            self._play_with_subprocess_async(file_path, volume)

    def _play_with_subprocess_async(self, file_path: Path, volume: float):
        """Run ``_play_with_subprocess`` on a daemon thread."""
        worker = Thread(
            target=self._play_with_subprocess,
            args=(file_path, volume),
            daemon=True,
        )
        worker.start()

    def _play_with_subprocess(self, file_path: Path, volume: float):
        """Play sound using platform-specific subprocess commands.

        Blocks until the external player exits. Linux uses ``play`` (with
        volume) when it is on PATH, otherwise ``aplay`` (no volume control);
        Windows uses PowerShell's ``Media.SoundPlayer`` (no volume control);
        macOS uses ``afplay`` with volume. Errors are logged, never raised.
        """
        quiet = {"stdout": subprocess.DEVNULL, "stderr": subprocess.DEVNULL}
        try:
            if self.system == "Linux":
                if shutil.which("play"):
                    # Use play with volume control
                    result = subprocess.run(
                        ["play", "-v", str(volume), str(file_path)],
                        check=False, **quiet
                    )
                    if result.returncode != 0:
                        self.logger.error("Play command failed")
                else:
                    # Fall back to aplay (no volume control)
                    subprocess.run(["aplay", str(file_path)], check=False, **quiet)
            elif self.system == "Windows":
                # Inside a single-quoted PowerShell string a quote is written
                # as two quotes; without this a "'" in the path breaks out of
                # the string literal.
                escaped_path = str(file_path).replace("'", "''")
                subprocess.run(
                    ["powershell", "-c",
                     f"(New-Object Media.SoundPlayer '{escaped_path}').PlaySync()"],
                    check=False, **quiet
                )
            elif self.system == "Darwin":  # macOS
                subprocess.run(
                    ["afplay", "-v", str(volume), str(file_path)],
                    check=False, **quiet
                )
        except Exception as e:
            self.logger.error(f"Subprocess audio playback failed: {e}")

    def play_event(self, event_type: str):
        """Play sound for a specific event type.

        Looks the event up in the current pack; when the current pack has no
        file for it, the 'default' pack is tried. Both paths use the volume
        from ``get_sound_volume``.
        """
        self.logger.debug(f"play_event() called for: {event_type}")

        if not self.is_sound_enabled(event_type):
            self.logger.debug(f"Sound disabled for event: {event_type}")
            return

        if not self.current_pack:
            self.logger.warning("No current sound pack loaded")
            return

        sound_path = self.current_pack.get_sound_path(event_type)
        if sound_path:
            notification_vol = self.get_sound_volume(event_type)
            self.logger.info(f"Playing sound: {event_type} from {sound_path} at volume {notification_vol}")
            self.play_sound(sound_path, notification_vol)
            return

        self.logger.warning(f"No sound file found for event: {event_type} in current pack")

        # Try fallback to default pack
        default_pack = self.sound_packs.get('default')
        if default_pack is None or default_pack is self.current_pack:
            self.logger.error(f"No sound available for event: {event_type}")
            return

        fallback_path = default_pack.get_sound_path(event_type)
        if not fallback_path:
            self.logger.error(f"No fallback sound available for event: {event_type}")
            return

        self.logger.info(f"Using fallback sound: {event_type} from default pack: {fallback_path}")
        # Fallback sounds must honour the configured volume as well.
        self.play_sound(fallback_path, self.get_sound_volume(event_type))

    def play_private_message(self):
        """Play private message sound"""
        self.play_event("private_message")

    def play_direct_message(self):
        """Play direct message sound"""
        self.play_event("direct_message")

    def play_mention(self):
        """Play mention sound"""
        self.play_event("mention")

    def play_boost(self):
        """Play boost sound"""
        self.play_event("boost")

    def play_reply(self):
        """Play reply sound"""
        self.play_event("reply")

    def play_favorite(self):
        """Play favorite sound"""
        self.play_event("favorite")

    def play_follow(self):
        """Play follow sound"""
        self.play_event("follow")

    def play_unfollow(self):
        """Play unfollow sound"""
        self.play_event("unfollow")

    def play_post_sent(self):
        """Play post sent sound"""
        self.play_event("post_sent")

    def play_post(self):
        """Play post sound"""
        self.play_event("post")

    def play_timeline_update(self):
        """Play timeline update sound"""
        self.play_event("timeline_update")

    def play_notification(self):
        """Play general notification sound"""
        self.play_event("notification")

    def play_startup(self):
        """Play application startup sound"""
        self.play_event("startup")

    def play_shutdown(self):
        """Play application shutdown sound"""
        self.play_event("shutdown")

    def play_success(self):
        """Play success feedback sound"""
        self.play_event("success")

    def play_error(self):
        """Play error feedback sound"""
        self.play_event("error")

    def play_expand(self):
        """Play thread expand sound"""
        self.play_event("expand")

    def play_collapse(self):
        """Play thread collapse sound"""
        self.play_event("collapse")

    def play_autocomplete(self):
        """Play autocomplete available sound"""
        self.play_event("autocomplete")

    def play_autocomplete_end(self):
        """Play autocomplete ended sound"""
        self.play_event("autocomplete_end")

    def test_sound(self, event_type: str):
        """Test play a specific sound type"""
        self.play_event(event_type)