139 lines
4.7 KiB
Python
139 lines
4.7 KiB
Python
import urllib
|
|
import socket
|
|
import os
|
|
import urllib.parse
|
|
import urllib.request
|
|
import subprocess
|
|
import logging
|
|
from .common import KEY_ALBUM, KEY_ARTIST, KEY_GENRE, KEY_TITLE, KEY_TRACK, KEY_YEAR
|
|
|
|
# The logger hierarchy is rooted at the application name when a top-level
# ``config`` module providing APP_NAME can be imported; otherwise the
# literal name 'root' is used.
try:
    from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:
    ROOT_LOGGER_NAME = 'root'

# Module logger: a child of the application logger, named after this module.
logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)
|
|
|
|
|
|
# Client version reported to the server in the "hello" parameter.
version = 2.0


def _default_identity():
    """Return the default ``(user, host)`` pair for the "hello" parameter.

    A well-formed ``EMAIL`` environment variable (``user@host``) wins.
    Otherwise the user comes from ``USER``, then the effective uid, then the
    literal 'user'; the host comes from the local host name, then 'host'.
    The user is always returned as a string.
    """
    # rpartition never raises: a missing '@' simply leaves `at` empty, and an
    # address with several '@' is split at the last one.
    user, at, host = os.environ.get('EMAIL', '').rpartition('@')
    if at and user and host:
        return user, host

    user = os.environ.get('USER')
    if not user:
        # os.geteuid is not available on every platform.
        geteuid = getattr(os, 'geteuid', None)
        euid = geteuid() if geteuid is not None else 0
        # An effective uid of 0 falls through to the generic name.
        user = str(euid) if euid else 'user'
    return user, socket.gethostname() or 'host'


default_user, hostname = _default_identity()

# Value sent as the "proto" parameter of every request.
proto = 6
# CGI endpoint used when the caller does not pass ``server_url``.
default_server = 'http://gnudb.gnudb.org/~cddb/cddb.cgi'
|
|
|
|
|
|
def my_disc_metadata(**kwargs):
    """Build the disc metadata structure from flat keyword arguments.

    kwargs needs to include the following data:
        * KEY_ARTIST (str)
        * KEY_ALBUM (str)
        * KEY_YEAR (int) - will be converted here
        * KEY_GENRE (str)
        * "track_xx" (str) - where xx is the track number which will be
          converted to int here; the value is the track title

    Returns:
        A dict holding the four disc-level entries plus a ``"tracks"`` list.
        Every track is a dict with a copy of the disc-level entries,
        KEY_TRACK (int) and KEY_TITLE. Tracks keep the order in which the
        ``track_xx`` arguments were passed.
        ``None`` if a disc-level entry is missing or the year cannot be
        converted to an integer (an error is logged in both cases).
    """
    main_dict = {}
    for key in (KEY_ARTIST, KEY_ALBUM, KEY_YEAR, KEY_GENRE):
        try:
            value = kwargs.pop(key)
        except KeyError:
            logger.error("Information is missing in kwargs - key=%s", key)
            return None
        if key == KEY_YEAR:
            try:
                value = int(value)
            except (TypeError, ValueError):
                # TypeError covers values such as None that int() rejects.
                logger.error("Can't convert %s (key=%s) to integer value", value, key)
                return None
        main_dict[key] = value

    rv = dict(main_dict)
    rv["tracks"] = []
    # Only the track entries (and anything unexpected) are left in kwargs.
    for key, value in kwargs.items():
        if not key.startswith("track_"):
            logger.warning("Useless information kwargs - key=%s", key)
            continue
        try:
            track_number = int(key[6:])
        except ValueError:
            # Without a usable track number the entry is skipped instead of
            # being stored as a track that lacks KEY_TRACK.
            logger.warning("Useless information kwargs - kwargs[%s] = %s", key, repr(value))
            continue
        track = dict(main_dict)
        track[KEY_TRACK] = track_number
        track[KEY_TITLE] = value
        rv["tracks"].append(track)
    return rv
|
|
|
|
|
|
def query(data_str, server_url=default_server, user=default_user, host=hostname, client_name=ROOT_LOGGER_NAME, client_version=version):
    """Send a ``cddb query`` request and return the matching entries.

    Args:
        data_str: Query arguments, inserted verbatim after
            ``cmd=cddb+query+`` (presumably the '+'-joined string returned
            by ``discid()`` - verify against the caller).
        server_url: URL of the CDDB CGI endpoint.
        user, host, client_name, client_version: Values sent in the
            ``hello`` parameter of the request.

    Returns:
        A dict mapping the second field of every match line to the rest of
        that line, or ``None`` if the server answers with a status other
        than 210 or with an unparsable status line (an error is logged).

    Raises:
        Whatever ``urllib.request.urlopen`` raises for connection problems.
    """
    url = f"{server_url}?cmd=cddb+query+{data_str}&hello={user}+{host}+{client_name}+{client_version}&proto={proto}"
    # The context manager closes the connection on every exit path.
    with urllib.request.urlopen(url) as response:
        status_line = response.readline().decode("utf-8").rstrip()
        # Split off only the code so that short status lines cannot fail and
        # the complete server message ends up in the log.
        code_text, _, message = status_line.partition(" ")
        try:
            code = int(code_text)
        except ValueError:
            logger.error("Unexpected response while querying cddb entry: \"%s\"", status_line)
            return None

        if code != 210:
            logger.error("Error while querying cddb entry: \"%d - %s\"", code, message)
            return None

        rv = {}
        for raw_line in response:
            line = raw_line.decode("utf-8").rstrip()
            if line == '.':  # end of matches
                break
            fields = line.split(" ", 2)
            if len(fields) != 3:
                logger.debug("cddb line ignored: \"%s\"", line)
                continue
            # The first field of a match line is not used.
            _unused, did, txt = fields
            rv[did] = txt
    return rv
|
|
|
|
|
|
def cddb(disc_id, server_url=default_server, user=default_user, host=hostname, client_name=ROOT_LOGGER_NAME, client_version=version):
    """Read one CDDB entry and convert it with ``my_disc_metadata``.

    Args:
        disc_id: Disc id, inserted verbatim after ``cmd=cddb+read+data+``
            (the field before it is fixed to ``data``).
        server_url: URL of the CDDB CGI endpoint.
        user, host, client_name, client_version: Values sent in the
            ``hello`` parameter of the request.

    Returns:
        The result of ``my_disc_metadata`` for the parsed entry, or ``None``
        if the server answers with a status other than 210 or with an
        unparsable status line (an error is logged).

    Raises:
        Whatever ``urllib.request.urlopen`` raises for connection problems.
    """
    KEY_TRANSLATOR = {
        "DGENRE": KEY_GENRE,
        "DYEAR": KEY_YEAR,
    }
    # The hello parameter carries the caller-supplied user and host.
    url = f"{server_url}?cmd=cddb+read+data+{disc_id}&hello={user}+{host}+{client_name}+{client_version}&proto={proto}"
    # The context manager closes the connection on every exit path.
    with urllib.request.urlopen(url) as response:
        status_line = response.readline().decode("utf-8").rstrip()
        code_text, _, message = status_line.partition(" ")
        try:
            code = int(code_text)
        except ValueError:
            logger.error("Unexpected response while reading cddb entry: \"%s\"", status_line)
            return None

        if code != 210:
            logger.error("Error while reading cddb entry: \"%d - %s\"", code, message)
            return None

        data = {}
        # NOTE(review): a repeated key overwrites the earlier value; confirm
        # whether entries that continue a long value on a second line with
        # the same key need to be concatenated.
        for raw_line in response:
            line = raw_line.decode("utf-8").rstrip()
            if line == '.':  # end of entry
                break
            # Split at the first '=' only, so values may contain '='.
            name, separator, value = line.partition('=')
            if line.startswith("#") or not separator:
                logger.debug("cddb line ignored: \"%s\"", line)
                continue
            value = value.strip()
            key = KEY_TRANSLATOR.get(name)
            if key:
                # The year stays a string here; my_disc_metadata converts it
                # and reports values that are not integers.
                data[key] = value
            elif name == "DTITLE":
                # Split at the first " / " only, so the album may contain it.
                artist, slash, album = value.partition(" / ")
                data[KEY_ARTIST] = artist.strip()
                # Without a separator the same text is used for both fields.
                data[KEY_ALBUM] = album.strip() if slash else artist.strip()
            elif name.startswith("TTITLE"):
                try:
                    index = int(name[6:])
                except ValueError:
                    logger.debug("cddb line ignored: \"%s\"", line)
                    continue
                # TTITLE numbering is zero-based, track keys are one-based.
                data["track_%02d" % (index + 1)] = value
            else:
                logger.debug("cddb line ignored: \"%s\"", line)
    return my_disc_metadata(**data)
|
|
|
|
|
|
def discid():
    """Run the external ``cd-discid`` tool and return its output.

    Returns:
        The tool's output, stripped and with every space replaced by '+',
        or ``None`` if ``cd-discid`` is not found on PATH or exits with a
        non-zero status (both cases are logged).
    """
    # Imported here so the lookup does not depend on an external ``which``
    # program, whose "not found" message could be mistaken for a path.
    import shutil

    discid_cmd = shutil.which("cd-discid")
    if not discid_cmd:
        logger.error("cd-discid is required for encoding. You need to install it to your system.")
        return None
    try:
        output = subprocess.check_output(discid_cmd)
    except subprocess.CalledProcessError as e:
        logger.warning("cd-discid failed with exit status %s", e.returncode)
        return None
    return output.decode("utf-8").strip().replace(" ", "+")
|