#!/usr/bin/python3
# coding=utf-8
"""Miscellaneous helpers for the bot: file listing, zipping, bans, updates,
non-blocking pipes, an in-memory directory tree and YouTube lookups."""

import hashlib
import magic
import os
import sys
import variables as var
import zipfile
import requests
import re
import subprocess as sp
import logging
import youtube_dl
from importlib import reload
from sys import platform
import traceback
import urllib.request
from packaging import version

log = logging.getLogger("bot")

# NOTE(review): in the reviewed text every message separator appears as a raw
# line break inside the string literal, so it is reproduced here as "\n".
# It may originally have been an HTML line break lost during extraction --
# confirm against the upstream file and change this single constant if so.
_LINE_SEPARATOR = "\n"


def solve_filepath(path):
    """Return *path* as an absolute path.

    An empty/falsy path yields ``''``.  A path starting with ``/`` is returned
    unchanged.  Anything else is resolved relative to the directory that
    contains this module.
    """
    if not path:
        return ''

    if path[0] == '/':
        return path

    mydir = os.path.dirname(os.path.realpath(__file__))
    return mydir + '/' + path


def get_recursive_file_list_sorted(path):
    """Return the sorted relative paths of all audio/video files below *path*.

    Folders and files named in the ``bot`` config options ``ignored_folders``
    and ``ignored_files`` are skipped, as are unreadable files.  A file is kept
    when libmagic reports an ``audio`` or ``video`` MIME type, or when its
    textual description mentions "audio".
    """
    # Loop-invariant configuration lookups.
    ignored_folders = var.config.get('bot', 'ignored_folders')
    ignored_files = var.config.get('bot', 'ignored_files')

    filelist = []
    for root, _dirs, files in os.walk(path, topdown=True, onerror=None, followlinks=True):
        relroot = root.replace(path, '', 1)
        if relroot != '' and relroot in ignored_folders:
            continue
        if relroot:
            relroot += '/'

        for file in files:
            if file in ignored_files:
                continue

            fullpath = os.path.join(path, relroot, file)
            if not os.access(fullpath, os.R_OK):
                continue

            try:
                mime = magic.from_file(fullpath, mime=True)
                if 'audio' in mime or 'audio' in magic.from_file(fullpath).lower() or 'video' in mime:
                    filelist.append(relroot + file)
            except Exception:
                # Best effort: a file libmagic cannot inspect is simply not
                # listed.  Exception (not a bare except) so that Ctrl-C and
                # interpreter shutdown are not swallowed.
                log.debug("util: could not determine type of %s", fullpath, exc_info=True)

    filelist.sort()
    return filelist


def zipdir(zippath, zipname_prefix=None):
    """Zip all listed files of the directory *zippath* and return the zip path.

    - the zip file is created under ``var.tmp_folder``
    - the file name has the form ``prefix_hash.zip``
    - the prefix can be controlled by the caller; it is ignored when it
      contains ``../``
    - the hash is a sha1 of the string representation of the directory's
      file list (the files that get zipped)

    If a zip with the computed name already exists it is returned as is.
    """
    zipname = var.tmp_folder
    if zipname_prefix and '../' not in zipname_prefix:
        zipname += zipname_prefix.strip().replace('/', '_') + '_'

    files = get_recursive_file_list_sorted(zippath)
    digest = hashlib.sha1(str(files).encode()).hexdigest()
    zipname += digest + '.zip'

    if os.path.exists(zipname):
        return zipname

    ignored_files = var.config.get('bot', 'ignored_files')
    parent_dir = os.path.join(zippath, '..')

    # The context manager closes the archive even if a write fails.
    with zipfile.ZipFile(zipname, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for file in files:
            file_to_add = os.path.join(zippath, file)
            if not os.access(file_to_add, os.R_OK):
                continue
            if file in ignored_files:
                continue

            # Store entries relative to the parent so the archive contains the
            # zipped directory itself as its top-level folder.
            add_file_as = os.path.relpath(file_to_add, parent_dir)
            zipf.write(file_to_add, add_file_as)

    return zipname


def get_user_ban():
    """Return a text listing every key stored in the ``user_ban`` db section."""
    entries = [_LINE_SEPARATOR + item[0] for item in var.db.items("user_ban")]
    return "List of ban hash" + "".join(entries)


def new_release_version():
    """Fetch and return the latest published version string."""
    request = urllib.request.Request("https://packages.azlux.fr/botamusique/version")
    # Close the HTTP response deterministically instead of leaking it.
    with urllib.request.urlopen(request) as response:
        v = response.read()
    return v.rstrip().decode()


def update(current_version):
    """Update the bot, or only youtube-dl, and return a status message.

    When the published version is newer than *current_version*, or the
    configured ``target_version`` is ``"testing"``, ``update.sh`` is run and
    the pip requirements are upgraded.  Otherwise only youtube-dl is upgraded
    through pip.  Raises ``subprocess.CalledProcessError`` if a command fails.
    """
    new_version = new_release_version()
    target = var.config.get('bot', 'target_version')

    if version.parse(new_version) > version.parse(current_version) or target == "testing":
        log.info('update: new version, start updating...')
        tp = sp.check_output(['/usr/bin/env', 'bash', 'update.sh', target]).decode()
        log.debug(tp)
        log.info('update: update pip libraries dependencies')
        sp.check_output([var.config.get('bot', 'pip3_path'), 'install', '--upgrade', '-r', 'requirements.txt']).decode()
        msg = "New version installed, please restart the bot."
        if target == "testing":
            msg += tp.replace('\n', _LINE_SEPARATOR)
    else:
        log.info('update: starting update youtube-dl via pip3')
        tp = sp.check_output([var.config.get('bot', 'pip3_path'), 'install', '--upgrade', 'youtube-dl']).decode()
        msg = ""
        if "Requirement already up-to-date" in tp:
            msg += "Youtube-dl is up-to-date"
        elif "Successfully installed" in tp:
            msg += "Update done: " + tp.split('Successfully installed')[1]
        else:
            # pip exited successfully but installed nothing (its "already
            # up-to-date" wording differs between pip versions); the old code
            # raised IndexError here.
            msg += "Youtube-dl is up-to-date"

    # NOTE(review): the nesting of the next two statements was reconstructed
    # from a collapsed source line; they are assumed to run for both branches.
    reload(youtube_dl)
    msg += _LINE_SEPARATOR + "Youtube-dl reloaded"
    return msg


def user_ban(user):
    """Store *user* in the ``user_ban`` db section and return a confirmation."""
    var.db.set("user_ban", user, None)
    return "User " + user + " banned"


def user_unban(user):
    """Remove *user* from the ``user_ban`` db section and return ``"Done"``."""
    var.db.remove_option("user_ban", user)
    return "Done"


def get_url_ban():
    """Return a text listing every key stored in the ``url_ban`` db section."""
    entries = [_LINE_SEPARATOR + item[0] for item in var.db.items("url_ban")]
    return "List of ban:" + "".join(entries)


def url_ban(url):
    """Store *url* in the ``url_ban`` db section and return a confirmation."""
    var.db.set("url_ban", url, None)
    return "url " + url + " banned"


def url_unban(url):
    """Remove *url* from the ``url_ban`` db section and return ``"Done"``."""
    var.db.remove_option("url_ban", url)
    return "Done"


def pipe_no_wait(pipefd):
    """Switch the pipe *pipefd* to non-blocking mode.

    Used to fetch the STDERR of ffmpeg. *pipefd* is the file descriptor
    returned from ``os.pipe()``.

    Returns ``True`` on success and ``False`` on failure or on a platform that
    is not handled here.
    """
    if platform in ("linux", "linux2", "darwin"):
        import fcntl  # POSIX only, hence the local import

        try:
            fl = fcntl.fcntl(pipefd, fcntl.F_GETFL)
            fcntl.fcntl(pipefd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        except Exception:
            print(sys.exc_info()[1])
            return False
        else:
            return True

    elif platform == "win32":
        # https://stackoverflow.com/questions/34504970/non-blocking-read-on-os-pipe-on-windows
        import msvcrt
        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)
        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
        SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
        SetNamedPipeHandleState.restype = BOOL

        h = msvcrt.get_osfhandle(pipefd)

        res = SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
        if res == 0:
            print(WinError())
            return False
        return True

    # Unhandled platform: previously fell through with an implicit None.
    return False


class Dir(object):
    """In-memory tree of a directory, built from '/'-separated relative paths."""

    def __init__(self, path):
        self.name = os.path.basename(path.strip('/'))
        self.fullpath = path
        self.subdirs = {}  # sub-directory name -> Dir
        self.files = []    # names of the files directly in this directory

    def add_file(self, file):
        """Insert the relative path *file* into the tree; always returns True."""
        # Drop a leading "<own name>/" so the path is relative to this node.
        if file.startswith(self.name + '/'):
            file = file.replace(self.name + '/', '', 1)

        if '/' in file:
            # This file is in a subdir
            subdir = file.split('/')[0]
            if subdir not in self.subdirs:
                self.subdirs[subdir] = Dir(os.path.join(self.fullpath, subdir))
            self.subdirs[subdir].add_file(file)
        else:
            self.files.append(file)
        return True

    def get_subdirs(self, path=None):
        """Return the direct sub-directories of *path* (of this node if empty).

        NOTE(review): without a path this returns the internal name -> Dir
        dict itself, while with a path it returns a list of joined names (an
        empty list if the first component is unknown).  Left unchanged because
        callers may rely on either shape -- confirm before unifying.
        """
        subdirs = []
        if path and path != '' and path != './':
            subdir = path.split('/')[0]
            if subdir in self.subdirs:
                searchpath = '/'.join(path.split('/')[1::])
                subdirs = self.subdirs[subdir].get_subdirs(searchpath)
                subdirs = [os.path.join(subdir, subsubdir) for subsubdir in subdirs]
        else:
            subdirs = self.subdirs
        return subdirs

    def get_subdirs_recursively(self, path=None):
        """Return a sorted list of all sub-directory paths below *path*."""
        subdirs = []
        if path and path != '' and path != './':
            subdir = path.split('/')[0]
            if subdir in self.subdirs:
                searchpath = '/'.join(path.split('/')[1::])
                subdirs = self.subdirs[subdir].get_subdirs_recursively(searchpath)
        else:
            subdirs = list(self.subdirs.keys())
            for key, val in self.subdirs.items():
                subdirs.extend(key + '/' + subdir for subdir in val.get_subdirs_recursively())

        subdirs.sort()
        return subdirs

    def get_files(self, path=None):
        """Return the files directly inside *path* (of this node if empty)."""
        files = []
        if path and path != '' and path != './':
            subdir = path.split('/')[0]
            if subdir in self.subdirs:
                searchpath = '/'.join(path.split('/')[1::])
                files = self.subdirs[subdir].get_files(searchpath)
        else:
            files = self.files
        return files

    def get_files_recursively(self, path=None):
        """Return all files below *path*, as paths relative to that directory."""
        files = []
        if path and path != '' and path != './':
            subdir = path.split('/')[0]
            if subdir in self.subdirs:
                searchpath = '/'.join(path.split('/')[1::])
                files = self.subdirs[subdir].get_files_recursively(searchpath)
        else:
            # Work on a copy: extending self.files in place would add the
            # sub-directory entries to this node's own file list on every call.
            files = list(self.files)
            for key, val in self.subdirs.items():
                files.extend(key + '/' + file for file in val.get_files_recursively())
        return files

    def render_text(self, ident=0):
        """Print the tree to stdout, indenting four spaces per level."""
        print('{}{}/'.format(' ' * (ident * 4), self.name))
        for key, val in self.subdirs.items():
            val.render_text(ident + 1)
        for file in self.files:
            print('{}{}'.format(' ' * (ident + 1) * 4, file))


# Parse the html from the message to get the URL
def get_url_from_input(string):
    """Return the URL contained in *string*, or ``False`` if there is none.

    A string starting with ``http`` is returned unchanged; otherwise the value
    of the first ``href="..."`` attribute is returned.
    """
    if string.startswith('http'):
        return string

    p = re.compile('href="(.+?)"', re.IGNORECASE)
    res = re.search(p, string)
    if res:
        return res.group(1)
    return False


def youtube_search(query):
    """Search YouTube for *query*.

    Returns a non-empty list of ``(id, title, uploader)`` tuples scraped from
    the results page, or ``False`` when nothing matched or the request failed.
    """
    try:
        r = requests.get("https://www.youtube.com/results", params={'search_query': query}, timeout=5)
        results = re.findall(r"watch\?v=(.*?)\".*?title=\"(.*?)\".*?"
                             "(?:user|channel).*?>(.*?)<", r.text)  # (id, title, uploader)

        if len(results) > 0:
            return results

    except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError, requests.exceptions.Timeout):
        error_traceback = traceback.format_exc().split("During")[0]
        log.error("util: youtube query failed with error:\n %s", error_traceback)

    # NOTE(review): nesting reconstructed from a collapsed source line; this is
    # assumed to be the common fall-through for "no results" and "failed".
    return False