chore: organize mumbleBot.py

This commit is contained in:
Terry Geng 2020-02-26 18:30:19 +08:00
parent 3e5a48027c
commit cad5dba31f

View File

@ -63,10 +63,12 @@ type : file
user user
""" """
class MumbleBot: class MumbleBot:
version = 5 version = 5
def __init__(self, args): def __init__(self, args):
logging.info("bot: botamusique version %d, starting..." % self.version) logging.info("bot: botamusique version %d, starting..." % self.version)
signal.signal(signal.SIGINT, self.ctrl_caught) signal.signal(signal.SIGINT, self.ctrl_caught)
self.cmd_handle = {} self.cmd_handle = {}
@ -170,7 +172,6 @@ class MumbleBot:
self.on_ducking = False self.on_ducking = False
self.ducking_release = time.time() self.ducking_release = time.time()
if not var.db.has_option("bot", "ducking") and var.config.getboolean("bot", "ducking", fallback=False)\ if not var.db.has_option("bot", "ducking") and var.config.getboolean("bot", "ducking", fallback=False)\
or var.config.getboolean("bot", "ducking"): or var.config.getboolean("bot", "ducking"):
self.is_ducking = True self.is_ducking = True
@ -178,11 +179,13 @@ class MumbleBot:
self.ducking_volume = var.db.getfloat("bot", "ducking_volume", fallback=self.ducking_volume) self.ducking_volume = var.db.getfloat("bot", "ducking_volume", fallback=self.ducking_volume)
self.ducking_threshold = var.config.getfloat("bot", "ducking_threshold", fallback=5000) self.ducking_threshold = var.config.getfloat("bot", "ducking_threshold", fallback=5000)
self.ducking_threshold = var.db.getfloat("bot", "ducking_threshold", fallback=self.ducking_threshold) self.ducking_threshold = var.db.getfloat("bot", "ducking_threshold", fallback=self.ducking_threshold)
self.mumble.callbacks.set_callback(pymumble.constants.PYMUMBLE_CLBK_SOUNDRECEIVED, self.ducking_sound_received) self.mumble.callbacks.set_callback(pymumble.constants.PYMUMBLE_CLBK_SOUNDRECEIVED, \
self.ducking_sound_received)
self.mumble.set_receive_sound(True) self.mumble.set_receive_sound(True)
# Set the CTRL+C shortcut # Set the CTRL+C shortcut
def ctrl_caught(self, signal, frame): def ctrl_caught(self, signal, frame):
logging.info( logging.info(
"\nSIGINT caught, quitting, {} more to kill".format(2 - self.nb_exit)) "\nSIGINT caught, quitting, {} more to kill".format(2 - self.nb_exit))
self.exit = True self.exit = True
@ -209,13 +212,21 @@ class MumbleBot:
self.cmd_handle[command] = handle self.cmd_handle[command] = handle
logging.debug("bot: command added: " + command) logging.debug("bot: command added: " + command)
def set_comment(self):
self.mumble.users.myself.comment(var.config.get('bot', 'comment'))
# =======================
# Message
# =======================
# All text send to the chat is analysed by this function # All text send to the chat is analysed by this function
def message_received(self, text): def message_received(self, text):
message = text.message.strip() message = text.message.strip()
user = self.mumble.users[text.actor]['name'] user = self.mumble.users[text.actor]['name']
if var.config.getboolean('commands', 'split_username_at_space'): if var.config.getboolean('commands', 'split_username_at_space'):
# in can you use https://github.com/Natenom/mumblemoderator-module-collection/tree/master/os-suffixes , you want to split the username # in can you use https://github.com/Natenom/mumblemoderator-module-collection/tree/master/os-suffixes ,
# you want to split the username
user = user.split()[0] user = user.split()[0]
if message[0] in var.config.get('commands', 'command_symbol'): if message[0] in var.config.get('commands', 'command_symbol'):
@ -287,19 +298,25 @@ class MumbleBot:
logging.error("bot: command %s failed with error %s:\n" % (command_exc, error_traceback)) logging.error("bot: command %s failed with error %s:\n" % (command_exc, error_traceback))
self.send_msg(constants.strings('error_executing_command', command=command_exc, error=error), text) self.send_msg(constants.strings('error_executing_command', command=command_exc, error=error), text)
def send_msg(self, msg, text=None):
msg = msg.encode('utf-8', 'ignore').decode('utf-8')
# text if the object message, contain information if direct message or channel message
if not text or not text.session:
own_channel = self.mumble.channels[self.mumble.users.myself['channel_id']]
own_channel.send_text_message(msg)
else:
self.mumble.users[text.actor].send_text_message(msg)
@staticmethod def is_admin(self, user):
def is_admin(user):
list_admin = var.config.get('bot', 'admin').rstrip().split(';') list_admin = var.config.get('bot', 'admin').rstrip().split(';')
if user in list_admin: if user in list_admin:
return True return True
else: else:
return False return False
@staticmethod # =======================
def next(): # Launch and Download
logging.debug("bot: Next into the queue") # =======================
return var.playlist.next()
def launch_music(self, index=-1): def launch_music(self, index=-1):
uri = "" uri = ""
@ -461,48 +478,6 @@ class MumbleBot:
var.playlist.update(music, index) var.playlist.update(music, index)
return music return music
def resume(self):
self.is_playing = True
self.is_pause = False
music = var.playlist.current_item()
if music['type'] == 'radio' or self.playhead == 0 or not self.check_item_path_or_remove():
self.launch_music()
return
if var.config.getboolean('debug', 'ffmpeg'):
ffmpeg_debug = "debug"
else:
ffmpeg_debug = "warning"
logging.info("bot: resume music at %.2f seconds" % self.playhead)
uri = ""
if music["type"] == "url":
uri = music['path']
elif music["type"] == "file":
uri = var.config.get('bot', 'music_folder') + \
var.playlist.current_item()["path"]
command = ("ffmpeg", '-v', ffmpeg_debug, '-nostdin', '-ss', "%f" % self.playhead, '-i',
uri, '-ac', '1', '-f', 's16le', '-ar', '48000', '-')
if var.config.getboolean('bot', 'announce_current_music'):
self.send_msg(util.format_current_playing())
logging.info("bot: execute ffmpeg command: " + " ".join(command))
# The ffmpeg process is a thread
# prepare pipe for catching stderr of ffmpeg
pipe_rd, pipe_wd = os.pipe()
util.pipe_no_wait(pipe_rd) # Let the pipe work in non-blocking mode
self.thread_stderr = os.fdopen(pipe_rd)
self.thread = sp.Popen(command, stdout=sp.PIPE, stderr=pipe_wd, bufsize=480)
self.last_volume_cycle_time = time.time()
def async_download_next(self): def async_download_next(self):
# Function start if the next music isn't ready # Function start if the next music isn't ready
# Do nothing in case the next music is already downloaded # Do nothing in case the next music is already downloaded
@ -545,28 +520,9 @@ class MumbleBot:
return True return True
def volume_cycle(self): # =======================
delta = time.time() - self.last_volume_cycle_time # Loop
# =======================
if self.on_ducking and self.ducking_release < time.time():
self._clear_pymumble_soundqueue()
self.on_ducking = False
if delta > 0.001:
if self.is_ducking and self.on_ducking:
self.volume = (self.volume - self.ducking_volume) * math.exp(- delta / 0.2) + self.ducking_volume
else:
self.volume = self.volume_set - (self.volume_set - self.volume) * math.exp(- delta / 0.5)
self.last_volume_cycle_time = time.time()
def ducking_sound_received(self, user, sound):
if audioop.rms(sound.pcm, 2) > self.ducking_threshold:
if self.on_ducking is False:
logging.debug("bot: ducking triggered")
self.on_ducking = True
self.ducking_release = time.time() + 1 # ducking release after 1s
# Main loop of the Bot # Main loop of the Bot
def loop(self): def loop(self):
@ -622,6 +578,36 @@ class MumbleBot:
logging.info("bot: save playlist into database") logging.info("bot: save playlist into database")
var.playlist.save() var.playlist.save()
def volume_cycle(self):
delta = time.time() - self.last_volume_cycle_time
if self.on_ducking and self.ducking_release < time.time():
self._clear_pymumble_soundqueue()
self.on_ducking = False
if delta > 0.001:
if self.is_ducking and self.on_ducking:
self.volume = (self.volume - self.ducking_volume) * math.exp(- delta / 0.2) + self.ducking_volume
else:
self.volume = self.volume_set - (self.volume_set - self.volume) * math.exp(- delta / 0.5)
self.last_volume_cycle_time = time.time()
def ducking_sound_received(self, user, sound):
if audioop.rms(sound.pcm, 2) > self.ducking_threshold:
if self.on_ducking is False:
logging.debug("bot: ducking triggered")
self.on_ducking = True
self.ducking_release = time.time() + 1 # ducking release after 1s
# =======================
# Play Control
# =======================
def next(self):
logging.debug("bot: Next into the queue")
return var.playlist.next()
def clear(self): def clear(self):
# Kill the ffmpeg thread and empty the playlist # Kill the ffmpeg thread and empty the playlist
if self.thread: if self.thread:
@ -653,17 +639,47 @@ class MumbleBot:
self.song_start_at = -1 self.song_start_at = -1
logging.info("bot: music paused at %.2f seconds." % self.playhead) logging.info("bot: music paused at %.2f seconds." % self.playhead)
def set_comment(self): def resume(self):
self.mumble.users.myself.comment(var.config.get('bot', 'comment')) self.is_playing = True
self.is_pause = False
def send_msg(self, msg, text=None): music = var.playlist.current_item()
msg = msg.encode('utf-8', 'ignore').decode('utf-8')
# text if the object message, contain information if direct message or channel message if music['type'] == 'radio' or self.playhead == 0 or not self.check_item_path_or_remove():
if not text or not text.session: self.launch_music()
own_channel = self.mumble.channels[self.mumble.users.myself['channel_id']] return
own_channel.send_text_message(msg)
if var.config.getboolean('debug', 'ffmpeg'):
ffmpeg_debug = "debug"
else: else:
self.mumble.users[text.actor].send_text_message(msg) ffmpeg_debug = "warning"
logging.info("bot: resume music at %.2f seconds" % self.playhead)
uri = ""
if music["type"] == "url":
uri = music['path']
elif music["type"] == "file":
uri = var.config.get('bot', 'music_folder') + \
var.playlist.current_item()["path"]
command = ("ffmpeg", '-v', ffmpeg_debug, '-nostdin', '-ss', "%f" % self.playhead, '-i',
uri, '-ac', '1', '-f', 's16le', '-ar', '48000', '-')
if var.config.getboolean('bot', 'announce_current_music'):
self.send_msg(util.format_current_playing())
logging.info("bot: execute ffmpeg command: " + " ".join(command))
# The ffmpeg process is a thread
# prepare pipe for catching stderr of ffmpeg
pipe_rd, pipe_wd = os.pipe()
util.pipe_no_wait(pipe_rd) # Let the pipe work in non-blocking mode
self.thread_stderr = os.fdopen(pipe_rd)
self.thread = sp.Popen(command, stdout=sp.PIPE, stderr=pipe_wd, bufsize=480)
self.last_volume_cycle_time = time.time()
# TODO: this is a temporary workaround for issue #44 of pymumble. # TODO: this is a temporary workaround for issue #44 of pymumble.
def _clear_pymumble_soundqueue(self): def _clear_pymumble_soundqueue(self):