#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import time
import sys
import signal
import configparser
import audioop
import subprocess as sp
import argparse
import os.path
import pymumble.pymumble_py3 as pymumble
import interface
import variables as var
import hashlib
import youtube_dl
import logging
import util
import base64
from PIL import Image
from io import BytesIO
from mutagen.easyid3 import EasyID3
import re
import media.url
import media.file
import media.playlist
import media.radio
import media.system
from librb import radiobrowser
"""
FORMAT OF A MUSIC INTO THE PLAYLIST
type : url
url
title
path
duration
thundnail
user
ready (validation, no, downloading, yes)
from_playlist (yes,no)
playlist_title
playlist_url
type : radio
url
name
current_title
user
type : file
path
title
duration
user
"""
version = 2
class MumbleBot:
def __init__(self, args):
signal.signal(signal.SIGINT, self.ctrl_caught)
self.volume = var.config.getfloat('bot', 'volume')
if db.has_option('bot', 'volume'):
self.volume = var.db.getfloat('bot', 'volume')
self.channel = args.channel
# Set specific format for the log
FORMAT = '%(asctime)s: %(message)s'
if args.verbose:
logging.basicConfig(
format=FORMAT, level=logging.DEBUG, datefmt='%Y-%m-%d %H:%M:%S')
logging.debug("Starting in DEBUG loglevel")
elif args.quiet:
logging.basicConfig(
format=FORMAT, level=logging.ERROR, datefmt='%Y-%m-%d %H:%M:%S')
logging.error("Starting in ERROR loglevel")
else:
logging.basicConfig(
format=FORMAT, level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S')
logging.info("Starting in INFO loglevel")
# the playlist is... a list (Surprise !!)
var.playlist = []
var.user = args.user
var.music_folder = var.config.get('bot', 'music_folder')
var.is_proxified = var.config.getboolean(
"webinterface", "is_web_proxified")
self.exit = False
self.nb_exit = 0
self.thread = None
self.is_playing = False
if var.config.getboolean("webinterface", "enabled"):
wi_addr = var.config.get("webinterface", "listening_addr")
wi_port = var.config.getint("webinterface", "listening_port")
interface.init_proxy()
tt = threading.Thread(
target=start_web_interface, args=(wi_addr, wi_port))
tt.daemon = True
tt.start()
if args.host:
host = args.host
else:
host = var.config.get("server", "host")
if args.port:
port = args.port
else:
port = var.config.getint("server", "port")
if args.password:
password = args.password
else:
password = var.config.get("server", "password")
if args.certificate:
certificate = args.certificate
else:
certificate = var.config.get("server", "certificate")
if args.tokens:
tokens = args.tokens
else:
tokens = var.config.get("server", "tokens")
tokens = tokens.split(',')
if args.user:
self.username = args.user
else:
self.username = var.config.get("bot", "username")
self.mumble = pymumble.Mumble(host, user=self.username, port=port, password=password, tokens=tokens,
debug=var.config.getboolean('debug', 'mumbleConnection'), certfile=certificate)
self.mumble.callbacks.set_callback(
"text_received", self.message_received)
self.mumble.set_codec_profile("audio")
self.mumble.start() # start the mumble thread
self.mumble.is_ready() # wait for the connection
self.set_comment()
self.mumble.users.myself.unmute() # by sure the user is not muted
if self.channel:
self.mumble.channels.find_by_name(self.channel).move_in()
self.mumble.set_bandwidth(200000)
self.loop()
# Set the CTRL+C shortcut
def ctrl_caught(self, signal, frame):
logging.info(
"\nSIGINT caught, quitting, {} more to kill".format(2 - self.nb_exit))
self.exit = True
self.stop()
if self.nb_exit > 1:
logging.info("Forced Quit")
sys.exit(0)
self.nb_exit += 1
# All text send to the chat is analysed by this function
def message_received(self, text):
message = text.message.strip()
user = self.mumble.users[text.actor]['name']
if var.config.getboolean('command', 'split_username_at_space'):
# in can you use https://github.com/Natenom/mumblemoderator-module-collection/tree/master/os-suffixes , you want to split the username
user = user.split()[0]
if message[0] == var.config.get('command', 'command_symbol'):
# remove the symbol from the message
message = message[1:].split(' ', 1)
# use the first word as a command, the others one as parameters
if len(message) > 0:
command = message[0]
parameter = ''
if len(message) > 1:
parameter = message[1]
else:
return
logging.info(command + ' - ' + parameter + ' by ' + user)
if command == var.config.get('command', 'joinme'):
self.mumble.users.myself.move_in(
self.mumble.users[text.actor]['channel_id'], token=parameter)
return
# Anti stupid guy function
if not self.is_admin(user) and not var.config.getboolean('bot', 'allow_other_channel_message') and self.mumble.users[text.actor]['channel_id'] != self.mumble.users.myself['channel_id']:
self.mumble.users[text.actor].send_text_message(
var.config.get('strings', 'not_in_my_channel'))
return
if not self.is_admin(user) and not var.config.getboolean('bot', 'allow_private_message') and text.session:
self.mumble.users[text.actor].send_text_message(
var.config.get('strings', 'pm_not_allowed'))
return
###
# Admin command
###
for i in var.db.items("user_ban"):
if user.lower() == i[0]:
self.mumble.users[text.actor].send_text_message(
var.config.get('strings', 'user_ban'))
return
if command == var.config.get('command', 'user_ban'):
if self.is_admin(user):
if parameter:
self.mumble.users[text.actor].send_text_message(
util.user_ban(parameter))
else:
self.mumble.users[text.actor].send_text_message(
util.get_user_ban())
else:
self.mumble.users[text.actor].send_text_message(
var.config.get('strings', 'not_admin'))
return
elif command == var.config.get('command', 'user_unban'):
if self.is_admin(user):
if parameter:
self.mumble.users[text.actor].send_text_message(
util.user_unban(parameter))
else:
self.mumble.users[text.actor].send_text_message(
var.config.get('strings', 'not_admin'))
return
elif command == var.config.get('command', 'url_ban'):
if self.is_admin(user):
if parameter:
self.mumble.users[text.actor].send_text_message(
util.url_ban(self.get_url_from_input(parameter)))
else:
self.mumble.users[text.actor].send_text_message(
util.get_url_ban())
else:
self.mumble.users[text.actor].send_text_message(
var.config.get('strings', 'not_admin'))
return
elif command == var.config.get('command', 'url_unban'):
if self.is_admin(user):
if parameter:
self.mumble.users[text.actor].send_text_message(
util.url_unban(self.get_url_from_input(parameter)))
else:
self.mumble.users[text.actor].send_text_message(
var.config.get('strings', 'not_admin'))
return
if parameter:
for i in var.db.items("url_ban"):
if self.get_url_from_input(parameter.lower()) == i[0]:
self.mumble.users[text.actor].send_text_message(
var.config.get('strings', 'url_ban'))
return
###
# everyday commands
###
if command == var.config.get('command', 'play_file') and parameter:
music_folder = var.config.get('bot', 'music_folder')
# sanitize "../" and so on
path = os.path.abspath(os.path.join(music_folder, parameter))
if path.startswith(music_folder):
if os.path.isfile(path):
filename = path.replace(music_folder, '')
music = {'type': 'file',
'path': filename,
'user': user}
var.playlist.append(music)
else:
# try to do a partial match
matches = [file for file in util.get_recursive_filelist_sorted(
music_folder) if parameter.lower() in file.lower()]
if len(matches) == 0:
self.send_msg(var.config.get(
'strings', 'no_file'), text)
elif len(matches) == 1:
music = {'type': 'file',
'path': matches[0],
'user': user}
var.playlist.append(music)
else:
msg = var.config.get(
'strings', 'multiple_matches') + '
'
msg += '
'.join(matches)
self.send_msg(msg, text)
else:
self.send_msg(var.config.get('strings', 'bad_file'), text)
self.async_download_next()
elif command == var.config.get('command', 'play_url') and parameter:
music = {'type': 'url',
# grab the real URL
'url': self.get_url_from_input(parameter),
'user': user,
'ready': 'validation'}
var.playlist.append(music)
if media.url.get_url_info():
if var.playlist[-1]['duration'] > var.config.getint('bot', 'max_track_duration'):
var.playlist.pop()
self.send_msg(var.config.get(
'strings', 'too_long'), text)
else:
for i in var.db.options("url_ban"):
if var.playlist[-1]['url'] == i:
self.mumble.users[text.actor].send_text_message(
var.config.get('strings', 'url_ban'))
var.playlist.pop()
return
var.playlist[-1]['ready'] = "no"
self.async_download_next()
else:
var.playlist.pop()
self.send_msg(var.config.get(
'strings', 'unable_download'), text)
elif command == var.config.get('command', 'play_playlist') and parameter:
offset = 1 # if you want to start the playlist at a specific index
try:
offset = int(parameter.split(" ")[-1])
except ValueError:
pass
if media.playlist.get_playlist_info(url=self.get_url_from_input(parameter), start_index=offset, user=user):
self.async_download_next()
elif command == var.config.get('command', 'play_radio'):
if not parameter:
all_radio = var.config.items('radio')
msg = var.config.get(
'strings', 'preconfigurated_radio') + " :"
for i in all_radio:
comment = ""
if len(i[1].split(maxsplit=1)) == 2:
comment = " - " + i[1].split(maxsplit=1)[1]
msg += "
" + i[0] + comment
self.send_msg(msg, text)
else:
if var.config.has_option('radio', parameter):
parameter = var.config.get('radio', parameter)
parameter = parameter.split()[0]
url = self.get_url_from_input(parameter)
if url:
music = {'type': 'radio',
'url': url,
'user': user}
var.playlist.append(music)
self.async_download_next()
else:
self.send_msg(var.config.get('strings', 'bad_url'))
# query http://www.radio-browser.info API for a radio station
elif command == var.config.get('command', 'rb_query'):
logging.info('Querying radio stations')
if not parameter:
logging.debug('rbquery without parameter')
msg += 'You have to add a query text to search for a specific radio station.'
self.send_msg(msg, text)
else:
logging.debug('Found query parameter: ' + parameter)
self.send_msg('Searching for stations - this may take some seconds...', text)
rb_stations = radiobrowser.getstations_byname(parameter)
msg = var.config.get('strings', 'rbqueryresult') + " :"
msg += '\n
ID | Station Name |
---|---|
' + s['id'] + ' | ' + s['stationname'] + ' |