Hopefully fixed lockup bug.

This commit is contained in:
Storm Dragon
2025-08-16 19:57:19 -04:00
parent 022b9ba048
commit c86921ea65
5 changed files with 228 additions and 111 deletions
+33 -3
View File
@@ -547,8 +547,7 @@ class MumbleBot:
elif self.on_interrupting or len(raw_music) < self.pcm_buffer_size: elif self.on_interrupting or len(raw_music) < self.pcm_buffer_size:
self.mumble.sound_output.add_sound( self.mumble.sound_output.add_sound(
audioop.mul(self._fadeout(raw_music, self.stereo, fadein=False), 2, self.volume_helper.real_volume)) audioop.mul(self._fadeout(raw_music, self.stereo, fadein=False), 2, self.volume_helper.real_volume))
self.thread.kill() self._cleanup_ffmpeg_process()
self.thread = None
time.sleep(0.1) time.sleep(0.1)
self.on_interrupting = False self.on_interrupting = False
else: else:
@@ -557,7 +556,7 @@ class MumbleBot:
time.sleep(0.1) time.sleep(0.1)
if not self.is_pause and not raw_music: if not self.is_pause and not raw_music:
self.thread = None self._cleanup_ffmpeg_process()
# bot is not paused, but ffmpeg thread has gone. # bot is not paused, but ffmpeg thread has gone.
# indicate that last song has finished, or the bot just resumed from pause, or something is wrong. # indicate that last song has finished, or the bot just resumed from pause, or something is wrong.
if self.read_pcm_size < self.pcm_buffer_size \ if self.read_pcm_size < self.pcm_buffer_size \
@@ -703,12 +702,14 @@ class MumbleBot:
def clear(self): def clear(self):
# Kill the ffmpeg thread and empty the playlist # Kill the ffmpeg thread and empty the playlist
self.interrupt() self.interrupt()
self._cleanup_ffmpeg_process() # Ensure proper cleanup
var.playlist.clear() var.playlist.clear()
self.wait_for_ready = False self.wait_for_ready = False
self.log.info("bot: music stopped. playlist trashed.") self.log.info("bot: music stopped. playlist trashed.")
def stop(self): def stop(self):
self.interrupt() self.interrupt()
self._cleanup_ffmpeg_process() # Ensure proper cleanup
self.is_pause = True self.is_pause = True
if len(var.playlist) > 0: if len(var.playlist) > 0:
self.wait_for_ready = True self.wait_for_ready = True
@@ -716,6 +717,34 @@ class MumbleBot:
self.wait_for_ready = False self.wait_for_ready = False
self.log.info("bot: music stopped.") self.log.info("bot: music stopped.")
def _cleanup_ffmpeg_process(self):
"""Properly cleanup FFmpeg process and associated resources"""
if self.thread:
try:
# Check if process is already terminated to avoid double cleanup
if self.thread.poll() is None: # Process is still running
self.thread.terminate() # Send SIGTERM first
try:
self.thread.wait(timeout=2) # Wait up to 2 seconds for graceful exit
except sp.TimeoutExpired:
self.log.warning("bot: FFmpeg process didn't terminate gracefully, killing forcefully")
self.thread.kill() # Force kill if it doesn't terminate
self.thread.wait() # Wait for process to be reaped
else:
# Process already terminated, just wait to reap it
self.thread.wait()
except Exception as e:
self.log.error(f"bot: Error cleaning up FFmpeg process: {e}")
finally:
# Close stderr pipe if it exists
if self.thread_stderr:
try:
self.thread_stderr.close()
self.thread_stderr = None
except Exception as e:
self.log.error(f"bot: Error closing stderr pipe: {e}")
self.thread = None
def interrupt(self): def interrupt(self):
# Kill the ffmpeg thread # Kill the ffmpeg thread
if self.thread: if self.thread:
@@ -728,6 +757,7 @@ class MumbleBot:
def pause(self): def pause(self):
# Kill the ffmpeg thread # Kill the ffmpeg thread
self.interrupt() self.interrupt()
self._cleanup_ffmpeg_process() # Ensure proper cleanup
self.is_pause = True self.is_pause = True
self.song_start_at = -1 self.song_start_at = -1
if len(var.playlist) > 0: if len(var.playlist) > 0:
+151 -92
View File
@@ -208,10 +208,14 @@ class SettingsDatabase:
def get(self, section, option, **kwargs): def get(self, section, option, **kwargs):
conn = sqlite3.connect(self.db_path) conn = sqlite3.connect(self.db_path)
cursor = conn.cursor() try:
result = cursor.execute("SELECT value FROM botamusique WHERE section=? AND option=?", cursor = conn.cursor()
(section, option)).fetchall() result = cursor.execute("SELECT value FROM botamusique WHERE section=? AND option=?",
conn.close() (section, option)).fetchall()
finally:
if 'cursor' in locals():
cursor.close()
conn.close()
if len(result) > 0: if len(result) > 0:
return result[0][0] return result[0][0]
@@ -232,18 +236,26 @@ class SettingsDatabase:
def set(self, section, option, value): def set(self, section, option, value):
conn = sqlite3.connect(self.db_path) conn = sqlite3.connect(self.db_path)
cursor = conn.cursor() try:
cursor.execute("INSERT OR REPLACE INTO botamusique (section, option, value) " cursor = conn.cursor()
"VALUES (?, ?, ?)", (section, option, value)) cursor.execute("INSERT OR REPLACE INTO botamusique (section, option, value) "
conn.commit() "VALUES (?, ?, ?)", (section, option, value))
conn.close() conn.commit()
finally:
if 'cursor' in locals():
cursor.close()
conn.close()
def has_option(self, section, option): def has_option(self, section, option):
conn = sqlite3.connect(self.db_path) conn = sqlite3.connect(self.db_path)
cursor = conn.cursor() try:
result = cursor.execute("SELECT value FROM botamusique WHERE section=? AND option=?", cursor = conn.cursor()
(section, option)).fetchall() result = cursor.execute("SELECT value FROM botamusique WHERE section=? AND option=?",
conn.close() (section, option)).fetchall()
finally:
if 'cursor' in locals():
cursor.close()
conn.close()
if len(result) > 0: if len(result) > 0:
return True return True
else: else:
@@ -251,23 +263,35 @@ class SettingsDatabase:
def remove_option(self, section, option): def remove_option(self, section, option):
conn = sqlite3.connect(self.db_path) conn = sqlite3.connect(self.db_path)
cursor = conn.cursor() try:
cursor.execute("DELETE FROM botamusique WHERE section=? AND option=?", (section, option)) cursor = conn.cursor()
conn.commit() cursor.execute("DELETE FROM botamusique WHERE section=? AND option=?", (section, option))
conn.close() conn.commit()
finally:
if 'cursor' in locals():
cursor.close()
conn.close()
def remove_section(self, section): def remove_section(self, section):
conn = sqlite3.connect(self.db_path) conn = sqlite3.connect(self.db_path)
cursor = conn.cursor() try:
cursor.execute("DELETE FROM botamusique WHERE section=?", (section,)) cursor = conn.cursor()
conn.commit() cursor.execute("DELETE FROM botamusique WHERE section=?", (section,))
conn.close() conn.commit()
finally:
if 'cursor' in locals():
cursor.close()
conn.close()
def items(self, section): def items(self, section):
conn = sqlite3.connect(self.db_path) conn = sqlite3.connect(self.db_path)
cursor = conn.cursor() try:
results = cursor.execute("SELECT option, value FROM botamusique WHERE section=?", (section,)).fetchall() cursor = conn.cursor()
conn.close() results = cursor.execute("SELECT option, value FROM botamusique WHERE section=?", (section,)).fetchall()
finally:
if 'cursor' in locals():
cursor.close()
conn.close()
if len(results) > 0: if len(results) > 0:
return list(map(lambda v: (v[0], v[1]), results)) return list(map(lambda v: (v[0], v[1]), results))
@@ -276,9 +300,13 @@ class SettingsDatabase:
def drop_table(self): def drop_table(self):
conn = sqlite3.connect(self.db_path) conn = sqlite3.connect(self.db_path)
cursor = conn.cursor() try:
cursor.execute("DROP TABLE botamusique") cursor = conn.cursor()
conn.close() cursor.execute("DROP TABLE botamusique")
finally:
if 'cursor' in locals():
cursor.close()
conn.close()
class MusicDatabase: class MusicDatabase:
@@ -287,66 +315,79 @@ class MusicDatabase:
def insert_music(self, music_dict, _conn=None): def insert_music(self, music_dict, _conn=None):
conn = sqlite3.connect(self.db_path) if _conn is None else _conn conn = sqlite3.connect(self.db_path) if _conn is None else _conn
cursor = conn.cursor() cursor = None
try:
cursor = conn.cursor()
id = music_dict['id'] id = music_dict['id']
title = music_dict['title'] title = music_dict['title']
type = music_dict['type'] type = music_dict['type']
path = music_dict['path'] if 'path' in music_dict else '' path = music_dict['path'] if 'path' in music_dict else ''
keywords = music_dict['keywords'] keywords = music_dict['keywords']
tags_list = list(dict.fromkeys(music_dict['tags'])) tags_list = list(dict.fromkeys(music_dict['tags']))
tags = '' tags = ''
if tags_list: if tags_list:
tags = ",".join(tags_list) + "," tags = ",".join(tags_list) + ","
del music_dict['id'] del music_dict['id']
del music_dict['title'] del music_dict['title']
del music_dict['type'] del music_dict['type']
del music_dict['tags'] del music_dict['tags']
if 'path' in music_dict: if 'path' in music_dict:
del music_dict['path'] del music_dict['path']
del music_dict['keywords'] del music_dict['keywords']
existed = cursor.execute("SELECT 1 FROM music WHERE id=?", (id,)).fetchall() existed = cursor.execute("SELECT 1 FROM music WHERE id=?", (id,)).fetchall()
if len(existed) == 0: if len(existed) == 0:
cursor.execute( cursor.execute(
"INSERT INTO music (id, type, title, metadata, tags, path, keywords) VALUES (?, ?, ?, ?, ?, ?, ?)", "INSERT INTO music (id, type, title, metadata, tags, path, keywords) VALUES (?, ?, ?, ?, ?, ?, ?)",
(id, (id,
type, type,
title, title,
json.dumps(music_dict), json.dumps(music_dict),
tags, tags,
path, path,
keywords)) keywords))
else: else:
cursor.execute("UPDATE music SET type=:type, title=:title, metadata=:metadata, tags=:tags, " cursor.execute("UPDATE music SET type=:type, title=:title, metadata=:metadata, tags=:tags, "
"path=:path, keywords=:keywords WHERE id=:id", "path=:path, keywords=:keywords WHERE id=:id",
{'id': id, {'id': id,
'type': type, 'type': type,
'title': title, 'title': title,
'metadata': json.dumps(music_dict), 'metadata': json.dumps(music_dict),
'tags': tags, 'tags': tags,
'path': path, 'path': path,
'keywords': keywords}) 'keywords': keywords})
finally:
if cursor:
cursor.close()
if not _conn: if not _conn:
conn.commit() conn.commit()
conn.close() conn.close()
def query_music_ids(self, condition: Condition): def query_music_ids(self, condition: Condition):
conn = sqlite3.connect(self.db_path) conn = sqlite3.connect(self.db_path)
cursor = conn.cursor() try:
results = cursor.execute("SELECT id FROM music WHERE id != 'info' AND %s" % cursor = conn.cursor()
condition.sql(conn), condition.filler).fetchall() results = cursor.execute("SELECT id FROM music WHERE id != 'info' AND %s" %
conn.close() condition.sql(conn), condition.filler).fetchall()
finally:
if 'cursor' in locals():
cursor.close()
conn.close()
return list(map(lambda i: i[0], results)) return list(map(lambda i: i[0], results))
def query_all_paths(self): def query_all_paths(self):
conn = sqlite3.connect(self.db_path) conn = sqlite3.connect(self.db_path)
cursor = conn.cursor() try:
results = cursor.execute("SELECT path FROM music WHERE id != 'info' AND type = 'file'").fetchall() cursor = conn.cursor()
conn.close() results = cursor.execute("SELECT path FROM music WHERE id != 'info' AND type = 'file'").fetchall()
finally:
if 'cursor' in locals():
cursor.close()
conn.close()
paths = [] paths = []
for result in results: for result in results:
if result and result[0]: if result and result[0]:
@@ -356,25 +397,33 @@ class MusicDatabase:
def query_all_tags(self): def query_all_tags(self):
conn = sqlite3.connect(self.db_path) conn = sqlite3.connect(self.db_path)
cursor = conn.cursor() try:
results = cursor.execute("SELECT tags FROM music WHERE id != 'info'").fetchall() cursor = conn.cursor()
tags = [] results = cursor.execute("SELECT tags FROM music WHERE id != 'info'").fetchall()
for result in results: tags = []
for tag in result[0].strip(",").split(","): for result in results:
if tag and tag not in tags: for tag in result[0].strip(",").split(","):
tags.append(tag) if tag and tag not in tags:
conn.close() tags.append(tag)
finally:
if 'cursor' in locals():
cursor.close()
conn.close()
return tags return tags
def query_music_count(self, condition: Condition): def query_music_count(self, condition: Condition):
filler = condition.filler filler = condition.filler
conn = sqlite3.connect(self.db_path) conn = sqlite3.connect(self.db_path)
condition_str = condition.sql(conn) try:
cursor = conn.cursor() condition_str = condition.sql(conn)
results = cursor.execute("SELECT COUNT(*) FROM music " cursor = conn.cursor()
"WHERE id != 'info' AND %s" % condition_str, filler).fetchall() results = cursor.execute("SELECT COUNT(*) FROM music "
conn.close() "WHERE id != 'info' AND %s" % condition_str, filler).fetchall()
finally:
if 'cursor' in locals():
cursor.close()
conn.close()
return results[0][0] return results[0][0]
@@ -382,10 +431,15 @@ class MusicDatabase:
filler = condition.filler filler = condition.filler
conn = sqlite3.connect(self.db_path) if _conn is None else _conn conn = sqlite3.connect(self.db_path) if _conn is None else _conn
condition_str = condition.sql(conn) cursor = None
cursor = conn.cursor() try:
results = cursor.execute("SELECT id, type, title, metadata, tags, path, keywords FROM music " condition_str = condition.sql(conn)
"WHERE id != 'info' AND %s" % condition_str, filler).fetchall() cursor = conn.cursor()
results = cursor.execute("SELECT id, type, title, metadata, tags, path, keywords FROM music "
"WHERE id != 'info' AND %s" % condition_str, filler).fetchall()
finally:
if cursor:
cursor.close()
if not _conn: if not _conn:
conn.close() conn.close()
@@ -393,9 +447,14 @@ class MusicDatabase:
def _query_music_by_plain_sql_cond(self, sql_cond, _conn=None): def _query_music_by_plain_sql_cond(self, sql_cond, _conn=None):
conn = sqlite3.connect(self.db_path) if _conn is None else _conn conn = sqlite3.connect(self.db_path) if _conn is None else _conn
cursor = conn.cursor() cursor = None
results = cursor.execute("SELECT id, type, title, metadata, tags, path, keywords FROM music " try:
"WHERE id != 'info' AND %s" % sql_cond).fetchall() cursor = conn.cursor()
results = cursor.execute("SELECT id, type, title, metadata, tags, path, keywords FROM music "
"WHERE id != 'info' AND %s" % sql_cond).fetchall()
finally:
if cursor:
cursor.close()
if not _conn: if not _conn:
conn.close() conn.close()
+1
View File
@@ -81,6 +81,7 @@ class CallBacks(dict):
for func in self[callback]: for func in self[callback]:
if callback is PYMUMBLE_CLBK_TEXTMESSAGERECEIVED: if callback is PYMUMBLE_CLBK_TEXTMESSAGERECEIVED:
thr = threading.Thread(target=func, args=pos_parameters) thr = threading.Thread(target=func, args=pos_parameters)
thr.daemon = True # Fix critical memory leak - ensure thread cleanup
thr.start() thr.start()
else: else:
func(*pos_parameters) func(*pos_parameters)
+21 -8
View File
@@ -161,14 +161,27 @@ class SoundOutput:
samples = int(self.encoder_framesize * PYMUMBLE_SAMPLERATE * 2 * self.channels) # number of samples in an encoder frame samples = int(self.encoder_framesize * PYMUMBLE_SAMPLERATE * 2 * self.channels) # number of samples in an encoder frame
self.lock.acquire() self.lock.acquire()
if len(self.pcm) and len(self.pcm[-1]) < samples: try:
initial_offset = samples - len(self.pcm[-1]) # Prevent unbounded buffer growth - limit to 10 seconds of audio
self.pcm[-1] += pcm[:initial_offset] max_buffer_seconds = 10.0
else: max_buffer_frames = int(max_buffer_seconds / self.encoder_framesize)
initial_offset = 0
for i in range(initial_offset, len(pcm), samples): # If buffer is too full, drop oldest frames to prevent memory leak
self.pcm.append(pcm[i:i + samples]) if len(self.pcm) > max_buffer_frames:
self.lock.release() frames_to_drop = len(self.pcm) - max_buffer_frames
self.pcm = self.pcm[frames_to_drop:]
if hasattr(self, 'Log'):
self.Log.warning(f"Audio buffer overflow, dropped {frames_to_drop} frames")
if len(self.pcm) and len(self.pcm[-1]) < samples:
initial_offset = samples - len(self.pcm[-1])
self.pcm[-1] += pcm[:initial_offset]
else:
initial_offset = 0
for i in range(initial_offset, len(pcm), samples):
self.pcm.append(pcm[i:i + samples])
finally:
self.lock.release()
def clear_buffer(self): def clear_buffer(self):
self.lock.acquire() self.lock.acquire()
+22 -8
View File
@@ -312,15 +312,29 @@ def get_media_duration(path):
command = ("ffprobe", "-v", "quiet", "-show_entries", "format=duration", command = ("ffprobe", "-v", "quiet", "-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1", path) "-of", "default=noprint_wrappers=1:nokey=1", path)
process = sp.Popen(command, stdout=sp.PIPE, stderr=sp.PIPE) process = sp.Popen(command, stdout=sp.PIPE, stderr=sp.PIPE)
stdout, stderr = process.communicate()
try: try:
if not stderr: stdout, stderr = process.communicate(timeout=10) # Add timeout to prevent hanging
return float(stdout)
else: try:
if not stderr:
return float(stdout)
else:
return 0
except ValueError:
return 0 return 0
except ValueError: except sp.TimeoutExpired:
process.kill()
process.wait()
return 0 return 0
finally:
# Ensure process is properly cleaned up
if process.poll() is None:
process.terminate()
try:
process.wait(timeout=2)
except sp.TimeoutExpired:
process.kill()
process.wait()
def parse_time(human): def parse_time(human):
@@ -405,9 +419,9 @@ def get_snapshot_version():
ver = "unknown" ver = "unknown"
if os.path.exists(os.path.join(root_dir, ".git")): if os.path.exists(os.path.join(root_dir, ".git")):
try: try:
ret = subprocess.check_output(["git", "describe", "--tags"]).strip() ret = subprocess.check_output(["git", "describe", "--tags"], timeout=5).strip()
ver = ret.decode("utf-8") ver = ret.decode("utf-8")
except (FileNotFoundError, subprocess.CalledProcessError): except (FileNotFoundError, subprocess.CalledProcessError, subprocess.TimeoutExpired):
try: try:
with open(os.path.join(root_dir, ".git/refs/heads/master")) as f: with open(os.path.join(root_dir, ".git/refs/heads/master")) as f:
ver = "g" + f.read()[:7] ver = "g" + f.read()[:7]