diff --git a/mumbleBot.py b/mumbleBot.py index 8ec3d5a..f228be1 100644 --- a/mumbleBot.py +++ b/mumbleBot.py @@ -27,7 +27,7 @@ import media.file import media.playlist import media.radio import media.system -from radiobrowser import getstations_byname, geturl_byid +from lib.radiobrowser import getstations_byname, geturl_byid """ FORMAT OF A MUSIC INTO THE PLAYLIST @@ -364,15 +364,15 @@ class MumbleBot: # query http://www.radio-browser.info API for a radio station elif command == var.config.get('command', 'rb_query'): logging.info('Querying radio stations') - msg = var.config.get( - 'strings', 'rbqueryresult') + " :" if not parameter: logging.info('rbquery without parameter') msg += 'You have to add a query text to search for a specific radio station.' self.send_msg(msg, text) else: logging.info('Found query parameter: ' + parameter) + self.send_msg('Searching for stations - this may take some seconds...', text) rb_stations = getstations_byname(parameter) + msg = var.config.get('strings', 'rbqueryresult') + " :" msg += '\n' for s in rb_stations: msg += '' diff --git a/mumbleBot.py.bak b/mumbleBot.py.bak deleted file mode 100644 index 2326f53..0000000 --- a/mumbleBot.py.bak +++ /dev/null @@ -1,732 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -import threading -import time -import sys -import signal -import configparser -import audioop -import subprocess as sp -import argparse -import os.path -import pymumble.pymumble_py3 as pymumble -import interface -import variables as var -import hashlib -import youtube_dl -import logging -import util -import base64 -from PIL import Image -from io import BytesIO -from mutagen.easyid3 import EasyID3 -import re -import media.url -import media.file -import media.playlist -import media.radio -import media.system -from radiobrowser import getstations_byname - -""" -FORMAT OF A MUSIC INTO THE PLAYLIST -type : url - url - title - path - duration - thundnail - user - ready (validation, no, downloading, yes) - from_playlist (yes,no) - playlist_title - playlist_url - -type : radio - url - name - current_title - user - -type : file - path - title - duration - user -""" - -version = 2 - - -class MumbleBot: - def __init__(self, args): - signal.signal(signal.SIGINT, self.ctrl_caught) - self.volume = var.config.getfloat('bot', 'volume') - if db.has_option('bot', 'volume'): - self.volume = var.db.getfloat('bot', 'volume') - - self.channel = args.channel - - # Set specific format for the log - FORMAT = '%(asctime)s: %(message)s' - if args.verbose: - logging.basicConfig(format=FORMAT, level=logging.DEBUG, datefmt='%Y-%m-%d %H:%M:%S') - logging.debug("Starting in DEBUG loglevel") - elif args.quiet: - logging.basicConfig(format=FORMAT, level=logging.ERROR, datefmt='%Y-%m-%d %H:%M:%S') - logging.error("Starting in ERROR loglevel") - else: - logging.basicConfig(format=FORMAT, level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S') - logging.info("Starting in INFO loglevel") - - # the playlist is... a list (Surprise !!) - var.playlist = [] - - var.user = args.user - var.music_folder = var.config.get('bot', 'music_folder') - var.is_proxified = var.config.getboolean("webinterface", "is_web_proxified") - self.exit = False - self.nb_exit = 0 - self.thread = None - self.is_playing = False - - if var.config.getboolean("webinterface", "enabled"): - wi_addr = var.config.get("webinterface", "listening_addr") - wi_port = var.config.getint("webinterface", "listening_port") - interface.init_proxy() - tt = threading.Thread(target=start_web_interface, args=(wi_addr, wi_port)) - tt.daemon = True - tt.start() - - if args.host: - host = args.host - else: - host = var.config.get("server", "host") - - if args.port: - port = args.port - else: - port = var.config.getint("server", "port") - - if args.password: - password = args.password - else: - password = var.config.get("server", "password") - - if args.certificate: - certificate = args.certificate - else: - certificate = var.config.get("server", "certificate") - - if args.tokens: - tokens = args.tokens - else: - tokens = var.config.get("server", "tokens") - tokens = tokens.split(',') - - if args.user: - self.username = args.user - else: - self.username = var.config.get("bot", "username") - - self.mumble = pymumble.Mumble(host, user=self.username, port=port, password=password, tokens=tokens, - debug=var.config.getboolean('debug', 'mumbleConnection'), certfile=certificate) - self.mumble.callbacks.set_callback("text_received", self.message_received) - - self.mumble.set_codec_profile("audio") - self.mumble.start() # start the mumble thread - self.mumble.is_ready() # wait for the connection - self.set_comment() - self.mumble.users.myself.unmute() # by sure the user is not muted - if self.channel: - self.mumble.channels.find_by_name(self.channel).move_in() - self.mumble.set_bandwidth(200000) - - self.loop() - - # Set the CTRL+C shortcut - def ctrl_caught(self, signal, frame): - logging.info("\nSIGINT caught, quitting, {} more to kill".format(2 - self.nb_exit)) - self.exit = True - self.stop() - if self.nb_exit > 1: - logging.info("Forced Quit") - sys.exit(0) - self.nb_exit += 1 - - # All text send to the chat is analysed by this function - def message_received(self, text): - - message = text.message.strip() - user = self.mumble.users[text.actor]['name'] - - if var.config.getboolean('command', 'split_username_at_space'): - # in can you use https://github.com/Natenom/mumblemoderator-module-collection/tree/master/os-suffixes , you want to split the username - user = user.split()[0] - - if message[0] == var.config.get('command', 'command_symbol'): - # remove the symbol from the message - message = message[1:].split(' ', 1) - - # use the first word as a command, the others one as parameters - if len(message) > 0: - command = message[0] - parameter = '' - if len(message) > 1: - parameter = message[1] - - else: - return - - logging.info(command + ' - ' + parameter + ' by ' + user) - - if command == var.config.get('command', 'joinme'): - self.mumble.users.myself.move_in(self.mumble.users[text.actor]['channel_id'], token=parameter) - return - - # Anti stupid guy function - if not self.is_admin(user) and not var.config.getboolean('bot', 'allow_other_channel_message') and self.mumble.users[text.actor]['channel_id'] != self.mumble.users.myself['channel_id']: - self.mumble.users[text.actor].send_text_message(var.config.get('strings', 'not_in_my_channel')) - return - - if not self.is_admin(user) and not var.config.getboolean('bot', 'allow_private_message') and text.session: - self.mumble.users[text.actor].send_text_message(var.config.get('strings', 'pm_not_allowed')) - return - - ### - # Admin command - ### - for i in var.db.items("user_ban"): - if user.lower() == i[0]: - self.mumble.users[text.actor].send_text_message(var.config.get('strings', 'user_ban')) - return - - if command == var.config.get('command', 'user_ban'): - if self.is_admin(user): - if parameter: - self.mumble.users[text.actor].send_text_message(util.user_ban(parameter)) - else: - self.mumble.users[text.actor].send_text_message(util.get_user_ban()) - else: - self.mumble.users[text.actor].send_text_message(var.config.get('strings', 'not_admin')) - return - - elif command == var.config.get('command', 'user_unban'): - if self.is_admin(user): - if parameter: - self.mumble.users[text.actor].send_text_message(util.user_unban(parameter)) - else: - self.mumble.users[text.actor].send_text_message(var.config.get('strings', 'not_admin')) - return - - elif command == var.config.get('command', 'url_ban'): - if self.is_admin(user): - if parameter: - self.mumble.users[text.actor].send_text_message(util.url_ban(self.get_url_from_input(parameter))) - else: - self.mumble.users[text.actor].send_text_message(util.get_url_ban()) - else: - self.mumble.users[text.actor].send_text_message(var.config.get('strings', 'not_admin')) - return - - elif command == var.config.get('command', 'url_unban'): - if self.is_admin(user): - if parameter: - self.mumble.users[text.actor].send_text_message(util.url_unban(self.get_url_from_input(parameter))) - else: - self.mumble.users[text.actor].send_text_message(var.config.get('strings', 'not_admin')) - return - - if parameter: - for i in var.db.items("url_ban"): - if self.get_url_from_input(parameter.lower()) == i[0]: - self.mumble.users[text.actor].send_text_message(var.config.get('strings', 'url_ban')) - return - - ### - # everyday commands - ### - if command == var.config.get('command', 'play_file') and parameter: - music_folder = var.config.get('bot', 'music_folder') - # sanitize "../" and so on - path = os.path.abspath(os.path.join(music_folder, parameter)) - if path.startswith(music_folder): - if os.path.isfile(path): - filename = path.replace(music_folder, '') - music = {'type': 'file', - 'path': filename, - 'user': user} - var.playlist.append(music) - else: - # try to do a partial match - matches = [file for file in util.get_recursive_filelist_sorted(music_folder) if parameter.lower() in file.lower()] - if len(matches) == 0: - self.send_msg(var.config.get('strings', 'no_file'), text) - elif len(matches) == 1: - music = {'type': 'file', - 'path': matches[0], - 'user': user} - var.playlist.append(music) - else: - msg = var.config.get('strings', 'multiple_matches') + '
' - msg += '
'.join(matches) - self.send_msg(msg, text) - else: - self.send_msg(var.config.get('strings', 'bad_file'), text) - self.async_download_next() - - elif command == var.config.get('command', 'play_url') and parameter: - music = {'type': 'url', - 'url': self.get_url_from_input(parameter), # grab the real URL - 'user': user, - 'ready': 'validation'} - var.playlist.append(music) - - if media.url.get_url_info(): - if var.playlist[-1]['duration'] > var.config.getint('bot', 'max_track_duration'): - var.playlist.pop() - self.send_msg(var.config.get('strings', 'too_long'), text) - else: - for i in var.db.options("url_ban"): - if var.playlist[-1]['url'] == i: - self.mumble.users[text.actor].send_text_message(var.config.get('strings', 'url_ban')) - var.playlist.pop() - return - var.playlist[-1]['ready'] = "no" - self.async_download_next() - else: - var.playlist.pop() - self.send_msg(var.config.get('strings', 'unable_download'), text) - - elif command == var.config.get('command', 'play_playlist') and parameter: - offset = 1 # if you want to start the playlist at a specific index - try: - offset = int(parameter.split(" ")[-1]) - except ValueError: - pass - if media.playlist.get_playlist_info(url=self.get_url_from_input(parameter), start_index=offset, user=user): - self.async_download_next() - - elif command == var.config.get('command', 'play_radio'): - if not parameter: - all_radio = var.config.items('radio') - msg = var.config.get('strings', 'preconfigurated_radio') + " :" - for i in all_radio: - comment = "" - if len(i[1].split(maxsplit=1)) == 2: - comment = " - " + i[1].split(maxsplit=1)[1] - msg += "
" + i[0] + comment - self.send_msg(msg, text) - else: - if var.config.has_option('radio', parameter): - parameter = var.config.get('radio', parameter) - parameter = parameter.split()[0] - url = self.get_url_from_input(parameter) - if url: - music = {'type': 'radio', - 'url': url, - 'user': user} - var.playlist.append(music) - self.async_download_next() - else: - self.send_msg(var.config.get('strings', 'bad_url')) - elif command == var.config.get('command', 'rb_query'): - logging.info('Querying radio stations') - if not parameter: - logging.info('rbquery without parameter') - else: - logging.info('Found query parameter: ' + parameter) - stations = getstations_byname(parameter) - - - - elif command == var.config.get('command', 'help'): - self.send_msg(var.config.get('strings', 'help'), text) - if self.is_admin(user): - self.send_msg(var.config.get('strings', 'admin_help'), text) - - elif command == var.config.get('command', 'stop'): - self.stop() - - elif command == var.config.get('command', 'kill'): - if self.is_admin(user): - self.stop() - self.exit = True - else: - self.mumble.users[text.actor].send_text_message(var.config.get('strings', 'not_admin')) - - elif command == var.config.get('command', 'update'): - if self.is_admin(user): - self.mumble.users[text.actor].send_text_message("Starting the update") - # Need to be improved - msg = util.update(version) - self.mumble.users[text.actor].send_text_message(msg) - else: - self.mumble.users[text.actor].send_text_message(var.config.get('strings', 'not_admin')) - - elif command == var.config.get('command', 'stop_and_getout'): - self.stop() - if self.channel: - self.mumble.channels.find_by_name(self.channel).move_in() - - elif command == var.config.get('command', 'volume'): - # The volume is a percentage - if parameter is not None and parameter.isdigit() and 0 <= int(parameter) <= 100: - self.volume = float(float(parameter) / 100) - self.send_msg(var.config.get('strings', 'change_volume') % ( - int(self.volume * 100), self.mumble.users[text.actor]['name']), text) - var.db.set('bot', 'volume', str(self.volume)) - else: - self.send_msg(var.config.get('strings', 'current_volume') % int(self.volume * 100), text) - - elif command == var.config.get('command', 'current_music'): - if len(var.playlist) > 0: - source = var.playlist[0]["type"] - if source == "radio": - reply = "[radio] {title} on {url} by {user}".format( - title=media.radio.get_radio_title(var.playlist[0]["url"]), - url=var.playlist[0]["title"], - user=var.playlist[0]["user"] - ) - elif source == "url" and 'from_playlist' in var.playlist[0]: - reply = "[playlist] {title} (from the playlist {playlist} by {user}".format( - title=var.playlist[0]["title"], - url=var.playlist[0]["playlist_url"], - playlist=var.playlist[0]["playlist_title"], - user=var.playlist[0]["user"] - ) - elif source == "url": - reply = "[url] {title} ({url}) by {user}".format( - title=var.playlist[0]["title"], - url=var.playlist[0]["url"], - user=var.playlist[0]["user"] - ) - elif source == "file": - reply = "[file] {title} by {user}".format( - title=var.playlist[0]["title"], - user=var.playlist[0]["user"]) - else: - reply = "ERROR" - logging.error(var.playlist) - else: - reply = var.config.get('strings', 'not_playing') - - self.send_msg(reply, text) - - elif command == var.config.get('command', 'skip'): - if parameter is not None and parameter.isdigit() and int(parameter) > 0: # Allow to remove specific music into the queue with a number - if int(parameter) < len(var.playlist): - removed = var.playlist.pop(int(parameter)) - - # the Title isn't here if the music wasn't downloaded - self.send_msg(var.config.get('strings', 'removing_item') % (removed['title'] if 'title' in removed else removed['url']), text) - else: - self.send_msg(var.config.get('strings', 'no_possible'), text) - elif self.next(): # Is no number send, just skip the current music - self.launch_music() - self.async_download_next() - else: - self.send_msg(var.config.get('strings', 'queue_empty'), text) - self.stop() - - elif command == var.config.get('command', 'list'): - folder_path = var.config.get('bot', 'music_folder') - - files = util.get_recursive_filelist_sorted(folder_path) - if files: - self.send_msg('
'.join(files), text) - else: - self.send_msg(var.config.get('strings', 'no_file'), text) - - elif command == var.config.get('command', 'queue'): - if len(var.playlist) <= 1: - msg = var.config.get('strings', 'queue_empty') - else: - msg = var.config.get('strings', 'queue_contents') + '
' - i = 1 - for value in var.playlist[1:]: - msg += '[{}] ({}) {}
'.format(i, value['type'], value['title'] if 'title' in value else value['url']) - i += 1 - - self.send_msg(msg, text) - - elif command == var.config.get('command', 'repeat'): - var.playlist.append(var.playlist[0]) - - else: - self.mumble.users[text.actor].send_text_message(var.config.get('strings', 'bad_command')) - - @staticmethod - def is_admin(user): - list_admin = var.config.get('bot', 'admin').split(';') - if user in list_admin: - return True - else: - return False - - @staticmethod - def next(): - logging.debug("Next into the queue") - if len(var.playlist) > 1: - var.playlist.pop(0) - return True - elif len(var.playlist) == 1: - var.playlist.pop(0) - return False - else: - return False - - def launch_music(self): - uri = "" - logging.debug("launch_music asked" + str(var.playlist[0])) - if var.playlist[0]["type"] == "url": - # Delete older music is the tmp folder is too big - media.system.clear_tmp_folder(var.config.get('bot', 'tmp_folder'), var.config.getint('bot', 'tmp_folder_max_size')) - - # Check if the music is ready to be played - if var.playlist[0]["ready"] == "downloading": - return - elif var.playlist[0]["ready"] != "yes": - logging.info("Current music wasn't ready, Downloading...") - self.download_music(index=0) - - # get the Path - uri = var.playlist[0]['path'] - if os.path.isfile(uri): - audio = EasyID3(uri) - title = "" - if audio["title"]: - title = audio["title"][0] # take the title from the file tag - - path_thumbnail = var.playlist[0]['path'][:-4] + '.jpg' # Remove .mp3 and add .jpg - thumbnail_html = "" - if os.path.isfile(path_thumbnail): - # Create the image message - im = Image.open(path_thumbnail) - im.thumbnail((100, 100), Image.ANTIALIAS) - buffer = BytesIO() - im.save(buffer, format="JPEG") - thumbnail_base64 = base64.b64encode(buffer.getvalue()) - thumbnail_html = '' - - logging.debug("Thunbail data " + thumbnail_html) - if var.config.getboolean('bot', 'announce_current_music'): - self.send_msg(var.config.get('strings', 'now_playing') % (title, thumbnail_html)) - else: - logging.error("Error with the path during launch_music") - pass - - elif var.playlist[0]["type"] == "file": - uri = var.config.get('bot', 'music_folder') + var.playlist[0]["path"] - - elif var.playlist[0]["type"] == "radio": - uri = var.playlist[0]["url"] - title = media.radio.get_radio_server_description(uri) - var.playlist[0]["title"] = title - if var.config.getboolean('bot', 'announce_current_music'): - self.send_msg(var.config.get('strings', 'now_playing') % (title, "URL : " + uri)) - - if var.config.getboolean('debug', 'ffmpeg'): - ffmpeg_debug = "debug" - else: - ffmpeg_debug = "warning" - - command = ("ffmpeg", '-v', ffmpeg_debug, '-nostdin', '-i', uri, '-ac', '1', '-f', 's16le', '-ar', '48000', '-') - logging.info("FFmpeg command : " + " ".join(command)) - self.thread = sp.Popen(command, stdout=sp.PIPE, bufsize=480) # The ffmpeg process is a thread - self.is_playing = True - - def download_music(self, index): - if var.playlist[index]['type'] == 'url' and var.playlist[index]['ready'] == "validation": - if media.url.get_url_info(index=index): - if var.playlist[index]['duration'] > var.config.getint('bot', 'max_track_duration'): - # Check the length, useful in case of playlist, it wasn't checked before) - var.playlist.pop() - logging.info("the music " + var.playlist[index]["url"] + " has a duration of " + var.playlist[index]['duration'] + "s -- too long") - self.send_msg(var.config.get('strings', 'too_long')) - return - else: - var.playlist[index]['ready'] = "no" - else: - var.playlist.pop(index) - logging.error("Error while fetching info from the URL") - self.send_msg(var.config.get('strings', 'unable_download')) - - if var.playlist[index]['type'] == 'url' and var.playlist[index]['ready'] == "no": - # download the music - var.playlist[index]['ready'] = "downloading" - - logging.debug("Download index:" + str(index)) - logging.debug(var.playlist[index]) - - url = var.playlist[index]['url'] - url_hash = hashlib.md5(url.encode()).hexdigest() - - path = var.config.get('bot', 'tmp_folder') + url_hash + ".%(ext)s" - mp3 = path.replace(".%(ext)s", ".mp3") - var.playlist[index]['path'] = mp3 - - # if os.path.isfile(mp3): - # audio = EasyID3(mp3) - # var.playlist[index]['title'] = audio["title"][0] - ydl_opts = "" - - ydl_opts = { - 'format': 'bestaudio/best', - 'outtmpl': path, - 'noplaylist': True, - 'writethumbnail': True, - 'updatetime': False, - 'postprocessors': [{ - 'key': 'FFmpegExtractAudio', - 'preferredcodec': 'mp3', - 'preferredquality': '192'}, - {'key': 'FFmpegMetadata'}] - } - self.send_msg(var.config.get('strings', "download_in_progress") % var.playlist[index]['title']) - - logging.info("Information before start downloading :" + str(var.playlist[index])) - with youtube_dl.YoutubeDL(ydl_opts) as ydl: - for i in range(2): # Always try 2 times - try: - ydl.extract_info(url) - if 'ready' in var.playlist[index] and var.playlist[index]['ready'] == "downloading": - var.playlist[index]['ready'] = "yes" - except youtube_dl.utils.DownloadError: - pass - else: - break - return - - def async_download_next(self): - # Function start if the next music isn't ready - # Do nothing in case the next music is already downloaded - logging.info("Async download next asked") - if len(var.playlist) > 1 and var.playlist[1]['type'] == 'url' and var.playlist[1]['ready'] in ["no", "validation"]: - th = threading.Thread(target=self.download_music, kwargs={'index': 1}) - else: - return - logging.info("Start downloading next in thread") - th.daemon = True - th.start() - - @staticmethod - # Parse the html from the message to get the URL - def get_url_from_input(string): - if string.startswith('http'): - return string - p = re.compile('href="(.+?)"', re.IGNORECASE) - res = re.search(p, string) - if res: - return res.group(1) - else: - return False - - # Main loop of the Bot - def loop(self): - raw_music = "" - while not self.exit and self.mumble.isAlive(): - - while self.mumble.sound_output.get_buffer_size() > 0.5 and not self.exit: - # If the buffer isn't empty, I cannot send new music part, so I wait - time.sleep(0.01) - if self.thread: - # I get raw from ffmpeg thread - raw_music = self.thread.stdout.read(480) - if raw_music: - # Adjust the volume and send it to mumble - self.mumble.sound_output.add_sound(audioop.mul(raw_music, 2, self.volume)) - else: - time.sleep(0.1) - else: - time.sleep(0.1) - - if self.thread is None or not raw_music: - # Not music into the buffet - if self.is_playing: - # get next music - self.is_playing = False - self.next() - if len(var.playlist) > 0: - if var.playlist[0]['type'] in ['radio', 'file'] \ - or (var.playlist[0]['type'] == 'url' and var.playlist[0]['ready'] not in ['validation', 'downloading']): - # Check if the music can be start before launch the music - self.launch_music() - self.async_download_next() - - while self.mumble.sound_output.get_buffer_size() > 0: - # Empty the buffer before exit - time.sleep(0.01) - time.sleep(0.5) - - if self.exit: - # The db is not fixed config like url/user ban and volume - util.write_db() - - def stop(self): - # Kill the ffmpeg thread and empty the playlist - if self.thread: - self.thread.kill() - self.thread = None - var.playlist = [] - self.is_playing = False - - def set_comment(self): - self.mumble.users.myself.comment(var.config.get('bot', 'comment')) - - def send_msg(self, msg, text=None): - # text if the object message, contain information if direct message or channel message - if not text or not text.session: - own_channel = self.mumble.channels[self.mumble.users.myself['channel_id']] - own_channel.send_text_message(msg) - else: - self.mumble.users[text.actor].send_text_message(msg) - - -def start_web_interface(addr, port): - logging.info('Starting web interface on {}:{}'.format(addr, port)) - interface.web.run(port=port, host=addr) - - -if __name__ == '__main__': - parser = argparse.ArgumentParser(description='Bot for playing music on Mumble') - - # General arguments - parser.add_argument("--config", dest='config', type=str, default='configuration.ini', help='Load configuration from this file. Default: configuration.ini') - parser.add_argument("--db", dest='db', type=str, default='db.ini', help='database file. Default db.ini') - - parser.add_argument("-q", "--quiet", dest="quiet", action="store_true", help="Only Error logs") - parser.add_argument("-v", "--verbose", dest="verbose", action="store_true", help="Show debug log") - - # Mumble arguments - parser.add_argument("-s", "--server", dest="host", type=str, help="Hostname of the Mumble server") - parser.add_argument("-u", "--user", dest="user", type=str, help="Username for the bot") - parser.add_argument("-P", "--password", dest="password", type=str, help="Server password, if required") - parser.add_argument("-T", "--tokens", dest="tokens", type=str, help="Server tokens, if required") - parser.add_argument("-p", "--port", dest="port", type=int, help="Port for the Mumble server") - parser.add_argument("-c", "--channel", dest="channel", type=str, help="Default channel for the bot") - parser.add_argument("-C", "--cert", dest="certificate", type=str, default=None, help="Certificate file") - - args = parser.parse_args() - var.dbfile = args.db - config = configparser.ConfigParser(interpolation=None, allow_no_value=True) - parsed_configs = config.read(['configuration.default.ini', args.config], encoding='latin-1') - - db = configparser.ConfigParser(interpolation=None, allow_no_value=True, delimiters='²') - db.read(var.dbfile, encoding='latin-1') - - if 'url_ban' not in db.sections(): - db.add_section('url_ban') - if 'bot' not in db.sections(): - db.add_section('bot') - if 'user_ban' not in db.sections(): - db.add_section('user_ban') - - if len(parsed_configs) == 0: - logging.error('Could not read configuration from file \"{}\"'.format(args.config), file=sys.stderr) - sys.exit() - - var.config = config - var.db = db - botamusique = MumbleBot(args) diff --git a/radiobrowser.py b/radiobrowser.py deleted file mode 100644 index 2318cbe..0000000 --- a/radiobrowser.py +++ /dev/null @@ -1,27 +0,0 @@ -from rbRadios import RadioBrowser - -rb = RadioBrowser() - -def getstations_byname(query): - results = rb.stations_byname(query) - stations = [] - for st in results: - try: - # url = rb.playable_station(st['id'])['url'] - station = {'stationname': st['name'], 'id':st['id']} - stations.append(station) - except: - pass - return stations - -def geturl_byid(id): - url = rb.playable_station(id)['url'] - if url != None: - return url - else: - return "-1" - - -if __name__ == "__main__": - r = getstations_byname('r.sh') - pass \ No newline at end of file diff --git a/rbConstants.py b/rbConstants.py deleted file mode 100644 index bd0f121..0000000 --- a/rbConstants.py +++ /dev/null @@ -1,16 +0,0 @@ -BASE_URL = "http://www.radio-browser.info/webservice/" - -endpoints = { - "countries": {1: "{fmt}/countries", 2: "{fmt}/countries/{filter}"}, - "codecs": {1: "{fmt}/codecs", 2: "{fmt}/codecs/{filter}"}, - "states": { - 1: "{fmt}/states", - 2: "{fmt}/states/{filter}", - 3: "{fmt}/states/{country}/{filter}", - }, - "languages": {1: "{fmt}/languages", 2: "{fmt}/languages/{filter}"}, - "tags": {1: "{fmt}/tags", 2: "{fmt}/tags/{filter}"}, - "stations": {1: "{fmt}/stations", 3: "{fmt}/stations/{by}/{search_term}"}, - "playable_station": {3: "{ver}/{fmt}/url/{station_id}"}, - "station_search": {1: "{fmt}/stations/search"}, -} diff --git a/rbRadios.py b/rbRadios.py deleted file mode 100644 index 7fc5bd4..0000000 --- a/rbRadios.py +++ /dev/null @@ -1,183 +0,0 @@ -import requests - -from xml.etree import ElementTree -from urllib.parse import urljoin - -from rbConstants import endpoints, BASE_URL - - -def request(endpoint, **kwargs): - - fmt = kwargs.get("format", "json") - - if fmt == "xml": - content_type = f"application/{fmt}" - else: - content_type = f"application/{fmt}" - - headers = {"content-type": content_type, "User-Agent": "pyradios/dev"} - - params = kwargs.get("params", {}) - - url = BASE_URL + endpoint - - resp = requests.get(url, headers=headers, params=params) - - if resp.status_code == 200: - if fmt == "xml": - return resp.text - return resp.json() - - return resp.raise_for_status() - - -class EndPointBuilder: - def __init__(self, fmt="json"): - self.fmt = fmt - self._option = None - self._endpoint = None - - @property - def endpoint(self): - return endpoints[self._endpoint][self._option] - - def produce_endpoint(self, **parts): - self._option = len(parts) - self._endpoint = parts["endpoint"] - parts.update({"fmt": self.fmt}) - return self.endpoint.format(**parts) - - -class RadioBrowser: - def __init__(self, fmt="json"): - self.fmt = fmt - self.builder = EndPointBuilder(fmt=self.fmt) - - def countries(self, filter=""): - endpoint = self.builder.produce_endpoint(endpoint="countries") - return request(endpoint) - - def codecs(self, filter=""): - endpoint = self.builder.produce_endpoint(endpoint="codecs") - return request(endpoint) - - def states(self, country="", filter=""): - endpoint = self.builder.produce_endpoint( - endpoint="states", country=country, filter=filter - ) - return request(endpoint) - - def languages(self, filter=""): - endpoint = self.builder.produce_endpoint(endpoint="languages", filter=filter) - return request(endpoint) - - def tags(self, filter=""): - endpoint = self.builder.produce_endpoint(endpoint="tags", filter=filter) - return request(endpoint) - - def stations(self, **params): - endpoint = self.builder.produce_endpoint(endpoint="stations") - kwargs = {} - if params: - kwargs.update({"params": params}) - return request(endpoint, **kwargs) - - def stations_byid(self, id): - endpoint = self.builder.produce_endpoint( - endpoint="stations", by="byid", search_term=id - ) - return request(endpoint) - - def stations_byuuid(self, uuid): - endpoint = self.builder.produce_endpoint( - endpoint="stations", by="byuuid", search_term=uuid - ) - return request(endpoint) - - def stations_byname(self, name): - endpoint = self.builder.produce_endpoint( - endpoint="stations", by="byname", search_term=name - ) - return request(endpoint) - - def stations_bynameexact(self, nameexact): - endpoint = self.builder.produce_endpoint( - endpoint="stations", by="bynameexact", search_term=nameexact - ) - return request(endpoint) - - def stations_bycodec(self, codec): - endpoint = self.builder.produce_endpoint( - endpoint="stations", by="bycodec", search_term=codec - ) - return request(endpoint) - - def stations_bycodecexact(self, codecexact): - endpoint = self.builder.produce_endpoint( - endpoint="stations", by="bycodecexact", search_term=codecexact - ) - return request(endpoint) - - def stations_bycountry(self, country): - endpoint = self.builder.produce_endpoint( - endpoint="stations", by="bycountry", search_term=country - ) - return request(endpoint) - - def stations_bycountryexact(self, countryexact): - endpoint = self.builder.produce_endpoint( - endpoint="stations", by="bycountryexact", search_term=countryexact - ) - return request(endpoint) - - def stations_bystate(self, state): - endpoint = self.builder.produce_endpoint( - endpoint="stations", by="bystate", search_term=state - ) - return request(endpoint) - - def stations_bystateexact(self, stateexact): - endpoint = self.builder.produce_endpoint( - endpoint="stations", by="bystateexact", search_term=stateexact - ) - return request(endpoint) - - # - def stations_bylanguage(self, language): - endpoint = self.builder.produce_endpoint( - endpoint="stations", by="bylanguage", search_term=language - ) - return request(endpoint) - - def stations_bylanguageexact(self, languageexact): - endpoint = self.builder.produce_endpoint( - endpoint="stations", by="bylanguageexact", search_term=languageexact - ) - return request(endpoint) - - def stations_bytag(self, tag): - endpoint = self.builder.produce_endpoint( - endpoint="stations", by="bytag", search_term=tag - ) - return request(endpoint) - - def stations_bytagexact(self, tagexact): - endpoint = self.builder.produce_endpoint( - endpoint="stations", by="bytagexact", search_term=tagexact - ) - return request(endpoint) - - def playable_station(self, station_id): - endpoint = self.builder.produce_endpoint( - endpoint="playable_station", station_id=station_id, ver="v2" - ) - - return request(endpoint) - - def station_search(self, params, **kwargs): - # http://www.radio-browser.info/webservice#Advanced_station_search - assert isinstance(params, dict), "params is not a dict" - kwargs["params"] = params - endpoint = self.builder.produce_endpoint(endpoint="station_search") - return request(endpoint, **kwargs) - \ No newline at end of file
IDStation Name
' + s['id'] + '' + s['stationname'] + '