123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- import urllib
- import socket
- import os
- import urllib.parse
- import urllib.request
- import subprocess
- import logging
- from .common import KEY_ALBUM, KEY_ARTIST, KEY_GENRE, KEY_TITLE, KEY_TRACK, KEY_YEAR
-
- try:
- from config import APP_NAME as ROOT_LOGGER_NAME
- except ImportError:
- ROOT_LOGGER_NAME = 'root'
- logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)
-
-
- version = 2.0
- if 'EMAIL' in os.environ:
- (default_user, hostname) = os.environ['EMAIL'].split('@')
- else:
- default_user = os.environ.get('USER') or os.geteuid() or 'user'
- hostname = socket.gethostname() or 'host'
-
- proto = 6
- default_server = 'http://gnudb.gnudb.org/~cddb/cddb.cgi'
-
-
- def my_disc_metadata(**kwargs):
- """Generate my disc metadata
-
- kwargs needs to include the following data:
- * KEY_ARTIST (str)
- * KEY_ALBUM (str)
- * KEY_YEAR (int) - will be converted here
- * KEY_GENRE (str)
- * "track_xx" (str) - where xx is the track number which will be converted to int here
- """
- main_dict = {}
- for key in [KEY_ARTIST, KEY_ALBUM, KEY_YEAR, KEY_GENRE]:
- try:
- value = kwargs.pop(key)
- except KeyError:
- logger.error("Information is missing in kwargs - key=%s", key)
- return None
- if key in [KEY_YEAR]:
- try:
- main_dict[key] = int(value)
- except ValueError:
- logger.error("Can't convert %s (key=%s) to integer value", value, key)
- return None
- else:
- main_dict[key] = value
- rv = dict(main_dict)
- rv["tracks"] = []
- for key in list(kwargs):
- value = kwargs.pop(key)
- if key.startswith("track_"):
- track = dict(main_dict)
- try:
- track[KEY_TRACK] = int(key[6:])
- except ValueError:
- logger.warning("Useless information kwargs - kwargs[%s] = %s", key, repr(value))
- track[KEY_TITLE] = value
- rv["tracks"].append(track)
- else:
- logger.warning("Useless information kwargs - key=%s", key)
- return rv
-
-
- def query(data_str, server_url=default_server, user=default_user, host=hostname, client_name=ROOT_LOGGER_NAME, client_version=version):
- url = f"{server_url}?cmd=cddb+query+{data_str}&hello={user}+{host}+{client_name}+{client_version}&proto={proto}"
- response = urllib.request.urlopen(url)
- header = response.readline().decode("utf-8").rstrip().split(" ", 3)
- header[0] = int(header[0])
-
- if header[0] not in (210, ):
- logger.error("Error while querying cddb entry: \"%d - %s\"", header[0], header[3])
- return None
-
- rv = {}
- for line in response.readlines():
- line = line.decode("utf-8").rstrip()
- if line == '.': # end of matches
- break
- dummy, did, txt = line.split(" ", 2)
- rv[did] = txt
- return rv
-
-
- def cddb(disc_id, server_url=default_server, user=default_user, host=hostname, client_name=ROOT_LOGGER_NAME, client_version=version):
- KEY_TRANSLATOR = {
- "DGENRE": KEY_GENRE,
- "DYEAR": KEY_YEAR
- }
- #
- url = f"{server_url}?cmd=cddb+read+data+{disc_id}&hello={default_server}+{hostname}+{client_name}+{client_version}&proto={proto}"
- response = urllib.request.urlopen(url)
-
- header = response.readline().decode("utf-8").rstrip().split(" ", 3)
- header[0] = int(header[0])
-
- if header[0] not in (210, ):
- logger.error("Error while reading cddb entry: \"%d - %s\"", header[1], header[3])
- return None
- data = {}
- for line in response.readlines():
- line = line.decode("utf-8").rstrip()
- if line == '.': # end of matches
- break
- if not line.startswith("#"):
- match = line.split('=', 2)
- key = KEY_TRANSLATOR.get(match[0])
- value = match[1].strip()
- if key:
- if key == KEY_YEAR:
- value = int(value)
- data[key] = value
- elif match[0] == "DTITLE":
- art_tit = value.split("/", 2)
- data[KEY_ARTIST] = art_tit[0].strip()
- data[KEY_ALBUM] = art_tit[1].strip()
- elif match[0].startswith("TTITLE"):
- data["track_%02d" % (int(match[0][6:]) + 1)] = value
- else:
- logger.debug("cddb line ignored: \"%s\"", line)
- return my_disc_metadata(**data)
-
-
- def discid():
- discid_cmd = subprocess.getoutput("which cd-discid")
- if not discid_cmd:
- logger.error("cd-discid is required for encoding. You need to install it to your system.")
- return None
- else:
- try:
- return subprocess.check_output(discid_cmd).decode("utf-8").strip().replace(" ", "+")
- except subprocess.CalledProcessError as e:
- return None
|