media/CDDB.py

139 lines
4.7 KiB
Python

import logging
import os
import shutil
import socket
import subprocess
import urllib
import urllib.parse
import urllib.request

from .common import KEY_ALBUM, KEY_ARTIST, KEY_GENRE, KEY_TITLE, KEY_TRACK, KEY_YEAR
try:
    from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:
    # No importable config module: fall back to a generic logger name.
    ROOT_LOGGER_NAME = 'root'

# Module logger, created as a child of the application's logger.
logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)

# Default client version used in the "hello" part of request URLs.
version = 2.0

# Default user/host identity used in the "hello" part of request URLs.
# Prefer the EMAIL environment variable ("user@host"); split on the LAST "@"
# and require both parts to be non-empty, so that a missing or malformed
# EMAIL falls back to USER / the local host name instead of raising at
# import time.
_email_parts = os.environ.get('EMAIL', '').rsplit('@', 1)
if len(_email_parts) == 2 and all(_email_parts):
    default_user, hostname = _email_parts
else:
    # .get() so that an unset USER reaches the fallbacks instead of raising
    # KeyError.
    default_user = os.environ.get('USER') or os.geteuid() or 'user'
    hostname = socket.gethostname() or 'host'

# Value sent as the "proto" parameter of request URLs.
proto = 6

# Default CDDB CGI endpoint.
default_server = 'http://gnudb.gnudb.org/~cddb/cddb.cgi'
def my_disc_metadata(**kwargs):
    """Generate my disc metadata from flat keyword arguments.

    kwargs needs to include the following data:
      * KEY_ARTIST (str)
      * KEY_ALBUM (str)
      * KEY_YEAR (int) - will be converted here
      * KEY_GENRE (str)
      * "track_xx" (str) - where xx is the track number which will be
        converted to int here

    Returns:
        A dict with the four disc-level keys plus "tracks": a list holding
        one dict per accepted "track_xx" entry (a copy of the disc-level
        keys extended by KEY_TRACK and KEY_TITLE), in the order the entries
        were passed. Returns None if a required key is missing or the year
        cannot be converted to an integer. Any other keyword, and any
        "track_xx" whose suffix is not an integer, is logged and ignored.
    """
    main_dict = {}
    for key in (KEY_ARTIST, KEY_ALBUM, KEY_YEAR, KEY_GENRE):
        try:
            value = kwargs.pop(key)
        except KeyError:
            logger.error("Information is missing in kwargs - key=%s", key)
            return None
        if key == KEY_YEAR:
            try:
                value = int(value)
            except (TypeError, ValueError):
                # TypeError covers values such as None that int() rejects.
                logger.error("Can't convert %s (key=%s) to integer value", value, key)
                return None
        main_dict[key] = value

    rv = dict(main_dict)
    rv["tracks"] = []
    # Only the non-required keywords are left in kwargs at this point.
    for key, value in kwargs.items():
        if not key.startswith("track_"):
            logger.warning("Useless information kwargs - key=%s", key)
            continue
        try:
            track_number = int(key[6:])
        except ValueError:
            # Without a usable track number the entry cannot be placed on
            # the disc, so skip it rather than appending a half-filled track.
            logger.warning("Useless information kwargs - kwargs[%s] = %s", key, repr(value))
            continue
        track = dict(main_dict)
        track[KEY_TRACK] = track_number
        track[KEY_TITLE] = value
        rv["tracks"].append(track)
    return rv
def query(data_str, server_url=default_server, user=default_user, host=hostname, client_name=ROOT_LOGGER_NAME, client_version=version):
    """Send a "cddb query" request and return the matches listed in the reply.

    Args:
        data_str: Query payload inserted verbatim after "cddb+query+" in the
            URL (presumably the "+"-joined output of discid() - verify
            against the caller).
        server_url: URL of the CDDB CGI endpoint.
        user: User name for the "hello" part of the request.
        host: Host name for the "hello" part of the request.
        client_name: Client name for the "hello" part of the request.
        client_version: Client version for the "hello" part of the request.

    Returns:
        A dict mapping the second space-separated field of every match line
        to the remainder of that line, or None if the status line does not
        start with code 210 (the only status treated as success here).
    """
    url = f"{server_url}?cmd=cddb+query+{data_str}&hello={user}+{host}+{client_name}+{client_version}&proto={proto}"
    # Read everything inside the context manager so the connection is always
    # released, including on the early-return paths below.
    with urllib.request.urlopen(url) as response:
        status_line = response.readline().decode("utf-8").rstrip()
        body = [raw.decode("utf-8").rstrip() for raw in response.readlines()]

    # Split off only the numeric code; the rest of the line is the message.
    code_txt, _, message = status_line.partition(" ")
    try:
        code = int(code_txt)
    except ValueError:
        logger.error("Error while querying cddb entry: unexpected response \"%s\"", status_line)
        return None
    if code != 210:
        logger.error("Error while querying cddb entry: \"%d - %s\"", code, message)
        return None

    rv = {}
    for line in body:
        if line == '.':  # end of matches
            break
        parts = line.split(" ", 2)
        if len(parts) != 3:
            # Not a "<field> <id> <text>" line; skip it instead of crashing.
            logger.debug("cddb match line ignored: \"%s\"", line)
            continue
        rv[parts[1]] = parts[2]
    return rv
def cddb(disc_id, server_url=default_server, user=default_user, host=hostname, client_name=ROOT_LOGGER_NAME, client_version=version):
    """Send a "cddb read" request for disc_id and convert the entry to disc metadata.

    Args:
        disc_id: Disc id inserted after "cddb+read+data+" in the URL.
        server_url: URL of the CDDB CGI endpoint.
        user: User name for the "hello" part of the request.
        host: Host name for the "hello" part of the request.
        client_name: Client name for the "hello" part of the request.
        client_version: Client version for the "hello" part of the request.

    Returns:
        The result of my_disc_metadata() called with the parsed fields, or
        None if the status line does not start with code 210 or the DYEAR
        value is not an integer.
    """
    # Entry keywords that map one-to-one onto metadata keys.
    KEY_TRANSLATOR = {
        "DGENRE": KEY_GENRE,
        "DYEAR": KEY_YEAR
    }
    # The hello handshake carries the caller-supplied user and host.
    url = f"{server_url}?cmd=cddb+read+data+{disc_id}&hello={user}+{host}+{client_name}+{client_version}&proto={proto}"
    # Read everything inside the context manager so the connection is always
    # released, including on the early-return paths below.
    with urllib.request.urlopen(url) as response:
        status_line = response.readline().decode("utf-8").rstrip()
        body = [raw.decode("utf-8").rstrip() for raw in response.readlines()]

    # Split off only the numeric code; the rest of the line is the message.
    code_txt, _, message = status_line.partition(" ")
    try:
        code = int(code_txt)
    except ValueError:
        logger.error("Error while reading cddb entry: unexpected response \"%s\"", status_line)
        return None
    if code != 210:
        logger.error("Error while reading cddb entry: \"%d - %s\"", code, message)
        return None

    data = {}
    for line in body:
        if line == '.':  # end of entry
            break
        if line.startswith("#"):
            continue
        # Split on the first "=" only, so values containing "=" stay intact.
        field, sep, value = line.partition("=")
        if not sep:
            logger.debug("cddb line ignored: \"%s\"", line)
            continue
        value = value.strip()
        key = KEY_TRANSLATOR.get(field)
        if key:
            if key == KEY_YEAR:
                try:
                    value = int(value)
                except ValueError:
                    # E.g. an empty "DYEAR=" line.
                    logger.error("Can't convert %s (key=%s) to integer value", value, key)
                    return None
            data[key] = value
        elif field == "DTITLE":
            # "artist / album"; split on the first separator only so an
            # album containing " / " is kept whole. Without a separator the
            # single value is used for both artist and album.
            artist, has_sep, album = value.partition(" / ")
            data[KEY_ARTIST] = artist.strip()
            data[KEY_ALBUM] = album.strip() if has_sep else artist.strip()
        elif field.startswith("TTITLE"):
            try:
                index = int(field[6:])
            except ValueError:
                logger.debug("cddb line ignored: \"%s\"", line)
                continue
            # TTITLE indices are shifted by one to get the track number.
            data["track_%02d" % (index + 1)] = value
        else:
            logger.debug("cddb line ignored: \"%s\"", line)
    return my_disc_metadata(**data)
def discid():
    """Run cd-discid and return its output with spaces replaced by "+".

    Returns:
        The stripped, UTF-8 decoded output of cd-discid with every space
        replaced by "+", or None if cd-discid is not found on PATH or
        running it fails.
    """
    # shutil.which returns None when the tool is missing; shelling out to
    # "which" could instead hand back an error message as if it were a path.
    discid_cmd = shutil.which("cd-discid")
    if not discid_cmd:
        logger.error("cd-discid is required for encoding. You need to install it to your system.")
        return None
    try:
        output = subprocess.check_output(discid_cmd)
    except (subprocess.CalledProcessError, OSError) as e:
        # Non-zero exit status or failure to start the process.
        logger.error("cd-discid failed: %s", e)
        return None
    return output.decode("utf-8").strip().replace(" ", "+")