Finally got battery monitoring working reliably. This was a hell of a lot harder than it should have been.
This commit is contained in:
@@ -1,32 +1,27 @@
|
||||
[Unit]
|
||||
Description=Battery Monitor for Stormux Gaming Image
|
||||
Documentation=man:battery_monitor.py(1)
|
||||
After=multi-user.target
|
||||
After=multi-user.target sound.target
|
||||
ConditionPathExists=/sys/class/power_supply
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
ExecStart=/usr/local/bin/battery_monitor.py
|
||||
Restart=always
|
||||
RestartSec=10
|
||||
User=root
|
||||
Group=root
|
||||
RestartSec=30
|
||||
User=stormux
|
||||
Group=stormux
|
||||
|
||||
# Security settings
|
||||
NoNewPrivileges=yes
|
||||
ProtectSystem=strict
|
||||
ProtectHome=yes
|
||||
ProtectKernelTunables=yes
|
||||
ProtectKernelModules=yes
|
||||
ProtectControlGroups=yes
|
||||
ReadWritePaths=/var/log
|
||||
ReadOnlyPaths=/sys/class/power_supply
|
||||
# Allow access to system resources
|
||||
SupplementaryGroups=audio video
|
||||
|
||||
# Allow access to speech and audio
|
||||
SupplementaryGroups=audio
|
||||
|
||||
# Environment
|
||||
# Environment for audio and speech
|
||||
Environment=PYTHONUNBUFFERED=1
|
||||
Environment=XDG_RUNTIME_DIR=/run/user/1000
|
||||
Environment=PULSE_RUNTIME_PATH=/run/user/1000/pulse
|
||||
|
||||
# Allow sudo for shutdown without password
|
||||
# This requires sudoers configuration
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
WantedBy=default.target
|
||||
@@ -13,14 +13,31 @@ import sys
|
||||
import time
|
||||
import subprocess
|
||||
import logging
|
||||
import wave
|
||||
import numpy as np
|
||||
from pathlib import Path
|
||||
|
||||
# Set up logging
|
||||
try:
|
||||
import speechd
|
||||
import simpleaudio as sa
|
||||
except ImportError as e:
|
||||
print(f"Required module missing: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
# Set up logging - create log directory if it doesn't exist
|
||||
try:
|
||||
log_dir = Path.home() / '.config' / 'stormux'
|
||||
log_dir.mkdir(parents=True, exist_ok=True)
|
||||
log_file = log_dir / 'battery_monitor.log'
|
||||
except:
|
||||
# Fallback to /tmp if home directory not accessible
|
||||
log_file = '/tmp/battery_monitor.log'
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
handlers=[
|
||||
logging.FileHandler('/var/log/battery_monitor.log'),
|
||||
logging.FileHandler(log_file),
|
||||
logging.StreamHandler()
|
||||
]
|
||||
)
|
||||
@@ -28,7 +45,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
class BatteryMonitor:
|
||||
def __init__(self):
|
||||
# Hardcoded settings - no config file needed
|
||||
# Settings
|
||||
self.enabled = True
|
||||
self.warning_10_percent = True
|
||||
self.warning_5_percent = True
|
||||
@@ -36,8 +53,25 @@ class BatteryMonitor:
|
||||
self.check_interval = 30
|
||||
self.speech_enabled = True
|
||||
|
||||
# Warning state tracking
|
||||
self.warned_10 = False
|
||||
self.warned_5 = False
|
||||
|
||||
# Initialize speech client
|
||||
self.speech_client = None
|
||||
self.init_speech()
|
||||
|
||||
# Ensure config directory exists (handled in logging setup above)
|
||||
|
||||
def init_speech(self):
|
||||
"""Initialize speech-dispatcher client"""
|
||||
try:
|
||||
self.speech_client = speechd.SSIPClient("battery_monitor")
|
||||
self.speech_client.set_priority(speechd.Priority.IMPORTANT)
|
||||
logger.info("Speech client initialized")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize speech: {e}")
|
||||
self.speech_enabled = False
|
||||
|
||||
def has_battery(self):
|
||||
"""Check if system has a real battery"""
|
||||
@@ -96,36 +130,72 @@ class BatteryMonitor:
|
||||
return False
|
||||
|
||||
def speak(self, message):
|
||||
"""Use speech-dispatcher to speak a message"""
|
||||
if not self.speech_enabled:
|
||||
"""Speak message using speech-dispatcher"""
|
||||
if not self.speech_enabled or not self.speech_client:
|
||||
logger.warning("Speech not available")
|
||||
return
|
||||
|
||||
try:
|
||||
subprocess.run(['espeak-ng', '-v', 'en-us', '-a', '200', message], check=True)
|
||||
self.speech_client.cancel()
|
||||
self.speech_client.speak(message)
|
||||
logger.info(f"Speaking: {message}")
|
||||
except Exception as e:
|
||||
logger.error(f"Speech error: {e}")
|
||||
|
||||
def play_urgent_sound(self):
|
||||
"""Play urgent warning sound using SoX"""
|
||||
def generate_urgent_sound(self):
|
||||
"""Generate urgent beep sound in memory"""
|
||||
try:
|
||||
# Blocking sound command as specified
|
||||
cmd = [
|
||||
'play', '-n', 'synth', '2', 'sine', '700:1000', 'sine', '900:1200',
|
||||
'fade', '0', '2', '0', 'remix', '-', 'norm', '-12', 'repeat', '3',
|
||||
'overdrive', 'reverb', 'speed', '4'
|
||||
]
|
||||
subprocess.run(cmd, check=True)
|
||||
# Generate urgent beeping sound
|
||||
sample_rate = 44100
|
||||
duration = 0.5
|
||||
frequency = 800
|
||||
|
||||
# Create beep pattern: 3 short beeps
|
||||
beeps = []
|
||||
for _ in range(3):
|
||||
t = np.linspace(0, duration, int(sample_rate * duration))
|
||||
wave_data = np.sin(2 * np.pi * frequency * t)
|
||||
# Add fade in/out
|
||||
fade_samples = int(0.05 * sample_rate)
|
||||
wave_data[:fade_samples] *= np.linspace(0, 1, fade_samples)
|
||||
wave_data[-fade_samples:] *= np.linspace(1, 0, fade_samples)
|
||||
beeps.extend(wave_data)
|
||||
# Add silence between beeps
|
||||
beeps.extend([0] * int(0.2 * sample_rate))
|
||||
|
||||
# Convert to 16-bit integers
|
||||
audio_data = np.array(beeps) * 32767
|
||||
audio_data = audio_data.astype(np.int16)
|
||||
|
||||
return audio_data, sample_rate
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Sound error: {e}")
|
||||
logger.error(f"Sound generation error: {e}")
|
||||
return None, None
|
||||
|
||||
def play_urgent_sound(self):
|
||||
"""Play urgent warning sound"""
|
||||
try:
|
||||
audio_data, sample_rate = self.generate_urgent_sound()
|
||||
if audio_data is not None:
|
||||
play_obj = sa.play_buffer(audio_data, 1, 2, sample_rate)
|
||||
play_obj.wait_done()
|
||||
logger.info("Urgent sound played")
|
||||
except Exception as e:
|
||||
logger.error(f"Sound playback error: {e}")
|
||||
|
||||
def handle_low_battery(self, level):
|
||||
"""Handle low battery warnings and actions"""
|
||||
# Don't give warnings if we're plugged into AC power
|
||||
if self.is_on_ac_power():
|
||||
return
|
||||
|
||||
if level <= 3:
|
||||
if self.shutdown_3_percent:
|
||||
logger.critical(f"Battery at {level}% - initiating shutdown")
|
||||
self.speak("Critical battery level. System shutting down now.")
|
||||
time.sleep(2) # Give speech time to complete
|
||||
subprocess.run(['systemctl', 'poweroff'], check=True)
|
||||
time.sleep(3) # Give speech time to complete
|
||||
subprocess.run(['sudo', 'systemctl', 'poweroff'], check=True)
|
||||
return
|
||||
|
||||
elif level <= 5 and not self.warned_5:
|
||||
@@ -181,11 +251,21 @@ class BatteryMonitor:
|
||||
time.sleep(self.check_interval)
|
||||
|
||||
def main():
|
||||
if os.geteuid() != 0:
|
||||
print("Warning: Running as non-root user. Shutdown functionality will require sudo.")
|
||||
|
||||
monitor = BatteryMonitor()
|
||||
monitor.monitor()
|
||||
try:
|
||||
monitor = BatteryMonitor()
|
||||
monitor.monitor()
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Battery monitor stopped")
|
||||
except Exception as e:
|
||||
logger.error(f"Fatal error: {e}")
|
||||
sys.exit(1)
|
||||
finally:
|
||||
# Clean up speech client
|
||||
try:
|
||||
if 'monitor' in locals() and monitor.speech_client:
|
||||
monitor.speech_client.close()
|
||||
except:
|
||||
pass
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user