bragi/media/url.py
Gert 47687a5e06 Skip unnecessary MP3 conversion via FFmpeg after audio download
Currently, after downloading YouTube videos, FFmpeg is used to convert
the downloaded audio file into MP3 for historic reasons (there was no
database to keep the metadata around, so the ID3 tags in the MP3 file
were necessary to be able to read the metadata later on).

Now that a metadata database exists, this is no longer necessary. This
conversion is also fairly straining for slower CPUs or CPUs that do not
offer the appropriate processor extensions to be able to accelerate this
conversion, such as older ARM devices - in my case an ARMv7 32-bit
device, where the conversion could take over a minute of keeping a single
core maxed out for a fairly simple 3-minute audio file.

This should also result in less latency playing audio files on stronger
processors, though probably less noticeably.

Fixes #205
2020-09-25 20:58:18 +08:00

244 lines
7.9 KiB
Python

import threading
import logging
import os
import hashlib
import traceback
from PIL import Image
import youtube_dl
import glob
from io import BytesIO
import base64
from constants import tr_cli as tr
import media
import variables as var
from media.item import BaseItem, item_builders, item_loaders, item_id_generators, ValidationFailedError, \
PreparationFailedError
import media.system
# Module-level logger, looked up under the name "bot".
log = logging.getLogger("bot")
def url_item_builder(**kwargs):
    """Create a fresh URLItem from the ``url`` keyword argument.

    Raises KeyError when no ``url`` keyword is supplied.
    """
    target_url = kwargs['url']
    return URLItem(target_url)
def url_item_loader(_dict):
    """Rebuild a URLItem from a dictionary of stored fields.

    The URL argument is only a placeholder here: when a dictionary is given,
    URLItem takes its fields (including the URL) from that dictionary.
    """
    return URLItem(url="", from_dict=_dict)
def url_item_id_generator(**kwargs):
    """Return the hexadecimal MD5 digest of the ``url`` keyword argument.

    The URL is hashed exactly as given (encoded with the default codec).
    Raises KeyError when no ``url`` keyword is supplied.
    """
    encoded_url = kwargs['url'].encode()
    digest = hashlib.md5(encoded_url)
    return digest.hexdigest()
# Register the URL handlers under the 'url' key of the three registries
# imported from media.item (builder, loader and id generator).
item_builders['url'] = url_item_builder
item_loaders['url'] = url_item_loader
item_id_generators['url'] = url_item_id_generator
class URLItem(BaseItem):
    """Music item whose audio is fetched from a URL with youtube_dl.

    The downloaded file is stored in ``var.tmp_folder`` under the item id
    (the MD5 hex digest of the URL).  Within this class ``self.ready`` takes
    the values ``'validated'`` (metadata fetched), ``'preparing'`` (download
    in progress), ``'yes'`` (file available) and ``'failed'``.
    """

    def __init__(self, url, from_dict=None):
        """Create a new item for ``url`` or restore one from ``from_dict``.

        :param url: source URL; ignored when ``from_dict`` is given.
        :param from_dict: dictionary with the keys ``url``, ``duration``,
            ``path``, ``title`` and ``thumbnail`` (it is also handed to the
            base class constructor).
        """
        self.validating_lock = threading.Lock()
        if from_dict is None:
            super().__init__()
            # Drop one trailing slash; endswith() also copes with an empty
            # string, where indexing url[-1] would raise IndexError.
            self.url = url[:-1] if url.endswith("/") else url
            self.title = ""
            self.duration = 0
            # The id is hashed from the URL exactly as passed in (before the
            # trailing slash is dropped), like url_item_id_generator does.
            self.id = hashlib.md5(url.encode()).hexdigest()
            self.path = var.tmp_folder + self.id
            self.thumbnail = ""
            self.keywords = ""
        else:
            super().__init__(from_dict)
            self.url = from_dict['url']
            self.duration = from_dict['duration']
            self.path = from_dict['path']
            self.title = from_dict['title']
            self.thumbnail = from_dict['thumbnail']

        self.downloading = False
        self.type = "url"

    def uri(self):
        """Return the local path of the audio file."""
        return self.path

    def is_ready(self):
        """Return True when the audio file is downloaded and still on disk.

        If the item is marked ``'yes'`` but its file has disappeared, the
        state is reset to ``'validated'`` so the file can be fetched again.
        """
        if self.downloading or self.ready != 'yes':
            return False
        if not os.path.exists(self.path):
            self.log.info(
                "url: music file missed for %s" % self.format_debug_string())
            self.ready = 'validated'
            return False
        return True

    def validate(self):
        """Fetch the metadata of the URL and check the track length.

        :return: True when the item is validated (or already usable),
            False when no metadata could be obtained.
        :raises ValidationFailedError: when the metadata cannot be fetched,
            or when the track is longer than ``max_track_duration`` minutes
            (a limit of 0 disables the check).
        """
        # The ``with`` block guarantees the lock is released even when
        # _get_info_from_url() raises; releasing it by hand would leave it
        # held forever and block every later validate() call.
        with self.validating_lock:
            # Note: an item in state 'failed' is not short-circuited here,
            # so its metadata is fetched again.
            if self.ready in ('yes', 'validated'):
                return True

            if os.path.exists(self.path):
                self.ready = "yes"
                return True

            # Holding the lock avoids several callers fetching the
            # metadata at the same time.
            info = self._get_info_from_url()

        if not info:
            return False

        max_duration = var.config.getint('bot', 'max_track_duration') * 60
        if max_duration != 0 and self.duration > max_duration:
            # Check the length here as well; useful for playlist entries,
            # whose length was not checked before.
            log.info(
                "url: " + self.url + " has a duration of " + str(self.duration / 60) + " min -- too long")
            raise ValidationFailedError(tr('too_long', song=self.title))

        self.ready = "validated"
        self.version += 1  # notify wrapper to save me
        return True

    # Runs in another thread
    def prepare(self):
        """Download the audio file unless a download is already running.

        :return: True on success.
        :raises PreparationFailedError: when every download attempt fails.
        """
        if not self.downloading:
            assert self.ready == 'validated'
            return self._download()
        else:
            assert self.ready == 'yes'
            return True

    def _get_info_from_url(self):
        """Read duration and title of the URL without downloading it.

        Up to ``download_attempts`` (default 2) tries are made; a
        ``DownloadError`` triggers a retry, a missing metadata key aborts.

        :return: True once the metadata has been stored on the item.
        :raises ValidationFailedError: when no attempt succeeded; the item
            is marked ``'failed'`` first.
        """
        self.log.info("url: fetching metadata of url %s " % self.url)
        ydl_opts = {
            'noplaylist': True
        }
        with youtube_dl.YoutubeDL(ydl_opts) as ydl:
            attempts = var.config.getint('bot', 'download_attempts', fallback=2)
            for _ in range(attempts):
                try:
                    info = ydl.extract_info(self.url, download=False)
                    self.duration = info['duration']
                    self.title = info['title']
                    self.keywords = info['title']
                    return True
                except youtube_dl.utils.DownloadError:
                    continue  # try again
                except KeyError:  # info has no 'duration' (or 'title')
                    break

        # Only reached when every attempt failed.
        self.ready = 'failed'
        self.log.error("url: error while fetching info from the URL")
        raise ValidationFailedError(tr('unable_download', item=self.format_title()))

    def _download(self):
        """Download the best available audio stream into the temp folder.

        :return: True when the file has been saved.
        :raises PreparationFailedError: when every attempt failed; partial
            files are removed and the item is marked ``'failed'`` first.
        """
        media.system.clear_tmp_folder(var.tmp_folder, var.config.getint('bot', 'tmp_folder_max_size'))

        self.downloading = True
        try:
            base_path = var.tmp_folder + self.id
            self.ready = "preparing"

            self.log.info("bot: downloading url (%s) %s " % (self.title, self.url))
            ydl_opts = {
                'format': 'bestaudio/best',
                'outtmpl': base_path,
                'noplaylist': True,
                'writethumbnail': True,
                'updatetime': False
            }

            download_succeed = False
            with youtube_dl.YoutubeDL(ydl_opts) as ydl:
                attempts = var.config.getint('bot', 'download_attempts', fallback=2)
                for i in range(attempts):
                    self.log.info("bot: download attempts %d / %d" % (i+1, attempts))
                    try:
                        ydl.extract_info(self.url)
                        download_succeed = True
                        break
                    except Exception:
                        # Not a bare ``except``: KeyboardInterrupt and
                        # SystemExit must not be swallowed and retried.
                        # Log only the last line of the first traceback.
                        error_traceback = traceback.format_exc().split("During")[0]
                        error = error_traceback.rstrip().split("\n")[-1]
                        self.log.error("bot: download failed with error:\n %s" % error)

            if not download_succeed:
                # Remove whatever partial files the failed attempts left.
                for leftover in glob.glob(base_path + "*"):
                    os.remove(leftover)
                self.ready = "failed"
                raise PreparationFailedError(tr('unable_download', item=self.format_title()))

            self.path = base_path
            self.ready = "yes"
            self.log.info(
                "bot: finished downloading url (%s) %s, saved to %s." % (self.title, self.url, self.path))
            self._read_thumbnail_from_file(base_path + ".jpg")
            self.version += 1  # notify wrapper to save me
            return True
        finally:
            # Always clear the flag, even on an unexpected error, so the
            # item can never stay stuck in the "downloading" state.
            self.downloading = False

    def _read_thumbnail_from_file(self, path_thumbnail):
        """Load the thumbnail at ``path_thumbnail``, if that file exists."""
        if os.path.isfile(path_thumbnail):
            # The context manager closes the underlying file handle.
            with Image.open(path_thumbnail) as im:
                self.thumbnail = self._prepare_thumbnail(im)

    def _prepare_thumbnail(self, im):
        """Shrink ``im`` to fit 100x100 and return it as base64 JPEG text.

        Note that ``im`` itself is resized in place.
        """
        # Image.LANCZOS is the filter formerly also named Image.ANTIALIAS;
        # the old alias no longer exists in Pillow 10 and later.
        im.thumbnail((100, 100), Image.LANCZOS)
        buffer = BytesIO()
        im = im.convert('RGB')
        im.save(buffer, format="JPEG")
        return base64.b64encode(buffer.getvalue()).decode('utf-8')

    def to_dict(self):
        """Return the base-class dictionary extended with the URL fields."""
        data = super().to_dict()
        data['type'] = 'url'
        data['url'] = self.url
        data['duration'] = self.duration
        data['path'] = self.path
        data['title'] = self.title
        data['thumbnail'] = self.thumbnail
        return data

    def format_debug_string(self):
        """Return a short ``[url] title (url)`` description for log output."""
        return "[url] {title} ({url})".format(
            title=self.title,
            url=self.url
        )

    def format_song_string(self, user):
        """Return the translated item line, or the bare URL if not validated."""
        if self.ready in ('validated', 'yes'):
            return tr("url_item",
                      title=self.title if self.title else "??",
                      url=self.url,
                      user=user)
        return self.url

    def format_current_playing(self, user):
        """Return the "now playing" text, with the thumbnail when available."""
        display = tr("now_playing", item=self.format_song_string(user))

        if self.thumbnail:
            # The thumbnail is embedded as a base64 JPEG data URI.
            thumbnail_html = '<img width="80" src="data:image/jpeg;base64,' + \
                             self.thumbnail + '"/>'
            display += "<br />" + thumbnail_html

        return display

    def format_title(self):
        """Return the title, or the URL when the title is blank."""
        return self.title if self.title.strip() else self.url

    def display_type(self):
        """Return the translated name of this item type."""
        return tr("url")