refactor: replace urllib into requests in radio.py

This commit is contained in:
Terry Geng 2020-03-02 19:02:56 +08:00
parent 26eb650e9b
commit 13753afa83

View File

@ -1,13 +1,16 @@
import re import re
import urllib.request
import urllib.error
import logging import logging
import json import json
import http.client import http.client
import struct import struct
import requests
import traceback
log = logging.getLogger("bot")
def get_radio_server_description(url): def get_radio_server_description(url):
global log
p = re.compile('(https?\:\/\/[^\/]*)', re.IGNORECASE) p = re.compile('(https?\:\/\/[^\/]*)', re.IGNORECASE)
res = re.search(p, url) res = re.search(p, url)
base_url = res.group(1) base_url = res.group(1)
@ -15,59 +18,52 @@ def get_radio_server_description(url):
url_shoutcast = base_url + '/stats?json=1' url_shoutcast = base_url + '/stats?json=1'
title_server = None title_server = None
try: try:
request = urllib.request.Request(url_shoutcast) r = requests.get(url_shoutcast, timeout=5)
response = urllib.request.urlopen(request) data = r.json()
data = json.loads(response.read().decode("utf-8"))
title_server = data['servertitle'] title_server = data['servertitle']
return title_server
# logging.info("TITLE FOUND SHOUTCAST: " + title_server) # logging.info("TITLE FOUND SHOUTCAST: " + title_server)
except urllib.error.HTTPError: except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError, requests.exceptions.Timeout) as e:
pass error_traceback = traceback.format_exc()
except http.client.BadStatusLine: error = error_traceback.rstrip().split("\n")[-1]
pass log.debug("radio: unsuccessful attempts on fetching radio description (shoutcast): " + error)
except ValueError: except ValueError:
return False return False # ?
if not title_server: try:
try: r = requests.get(url_icecast, timeout=5)
request = urllib.request.Request(url_icecast) data = r.json()
response = urllib.request.urlopen(request) source = data['icestats']['source']
response_data = response.read().decode('utf-8', errors='ignore') if type(source) is list:
if response_data: source = source[0]
data = json.loads(response_data, strict=False) title_server = source['server_name']
source = data['icestats']['source'] if 'server_description' in source:
if type(source) is list: title_server += ' - ' + source['server_description']
source = source[0] # logging.info("TITLE FOUND ICECAST: " + title_server)
title_server = source['server_name'] return title_server
if 'server_description' in source: except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError, requests.exceptions.Timeout) as e:
title_server += ' - ' + source['server_description'] error_traceback = traceback.format_exc()
# logging.info("TITLE FOUND ICECAST: " + title_server) error = error_traceback.rstrip().split("\n")[-1]
if not title_server: log.debug("radio: unsuccessful attempts on fetching radio description (icecast): " + error)
title_server = url
except urllib.error.URLError: return url
title_server = url
except http.client.BadStatusLine:
pass
return title_server
def get_radio_title(url): def get_radio_title(url):
request = urllib.request.Request(url, headers={'Icy-MetaData': 1})
try: try:
r = requests.get(url, headers={'Icy-MetaData': '1'}, stream=True, timeout=5)
icy_metaint_header = int(r.headers['icy-metaint'])
r.raw.read(icy_metaint_header)
response = urllib.request.urlopen(request) metadata_length = struct.unpack('B', r.raw.read(1))[0] * 16 # length byte
icy_metaint_header = int(response.headers['icy-metaint']) metadata = r.raw.read(metadata_length).rstrip(b'\0')
if icy_metaint_header is not None: logging.info(metadata)
response.read(icy_metaint_header) # extract title from the metadata
m = re.search(br"StreamTitle='([^']*)';", metadata)
metadata_length = struct.unpack('B', response.read(1))[0] * 16 # length byte if m:
metadata = response.read(metadata_length).rstrip(b'\0') title = m.group(1)
logging.info(metadata) if title:
# extract title from the metadata return title.decode()
m = re.search(br"StreamTitle='([^']*)';", metadata) except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError) as e:
if m:
title = m.group(1)
if title:
return title.decode()
except (urllib.error.URLError, urllib.error.HTTPError, http.client.BadStatusLine):
pass pass
return 'Unknown title' return url