123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- import io
- from media import common
- from PIL import Image
- import subprocess
- import platform
- import logging
- import os
- import subprocess
-
- try:
- from config import APP_NAME as ROOT_LOGGER_NAME
- except ImportError:
- ROOT_LOGGER_NAME = 'root'
- logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)
-
-
- def get_pil_image(media_instance):
- try:
- media_instance = media_instance._im
- except AttributeError:
- pass
- #
- if type(media_instance) == str:
- ft = common.get_filetype(media_instance)
- if ft == common.FILETYPE_IMAGE:
- return Image.open(media_instance)
- elif ft == common.FILETYPE_VIDEO:
- if platform.system() == 'Linux':
- cmd = 'ffmpeg -ss 0.5 -i "' + media_instance + '" -vframes 1 -f image2pipe pipe:1 2> /dev/null'
- else:
- cmd = 'ffmpeg -ss 0.5 -i "' + media_instance + '" -vframes 1 -f image2pipe pipe:1 2> NULL'
- try:
- data = subprocess.check_output(cmd, shell=True)
- except subprocess.CalledProcessError:
- logger.warning('ffmpeg seems to be not installed')
- return None
- ffmpeg_handle = io.BytesIO(data)
- im = Image.open(ffmpeg_handle)
- return im.copy()
- logger.warning('Filetype is not supported (%s)', media_instance)
- elif type(media_instance) == Image.Image:
- return media_instance.copy()
- else:
- logger.warning('Instance type is not supported: %s' % type(media_instance))
-
-
- def FilenameFilter(filename: str) -> str:
- # WHITELIST = [os.path.sep, os.path.extsep]
- WHITELIST = [chr(x) for x in range(ord('0'), ord('9') + 1)]
- WHITELIST += [chr(x) for x in range(ord('a'), ord('z') + 1)]
- WHITELIST += ["ä", "ö", "ü", "ß"]
- #
- rv = ""
- for c in filename.lower():
- rv += c if c in WHITELIST else '_'
- return rv
-
-
- def track_to_targetpath(basepath: str, track: dict, ext: str):
- return os.path.join(
- basepath,
- FilenameFilter(track[common.KEY_ARTIST]),
- "%04d_" % track[common.KEY_YEAR] + FilenameFilter(track[common.KEY_ALBUM]),
- "%02d_" % track[common.KEY_TRACK] + FilenameFilter(track[common.KEY_TITLE]) + "." + ext
- )
-
-
- def disc_track_rip(track_num: int, target_file: str, progress_callback):
- FAC_SEC_VAL = 1224
- #
- cdp_cmd = subprocess.getoutput("which cdparanoia")
- if not cdp_cmd:
- logger.error("cdparanoia is required for ripping. You need to install it to your system.")
- else:
- cmd = [cdp_cmd, "-e", "-X", "%d" % track_num, target_file]
- cdp = subprocess.Popen(cmd, text=True, stderr=subprocess.PIPE)
- #
- rval = 0
- min_sec = None
- max_sec = None
- min_read = None
- while (out := cdp.stderr.readline()) != "":
- out = out.strip()
- # identify minimum sector
- if ("Ripping from sector" in out):
- min_sec = int(list(filter(None, out.split(" ")))[3])
- # identify maximum sector
- if ("to sector" in out):
- max_sec = int(list(filter(None, out.split(" ")))[2])
- # identify progress
- if "[read]" in out:
- val = int(out.split(" ")[-1])
- if not min_read:
- min_read = val
- rval = max(val, rval)
- try:
- dsec = max_sec - min_sec
- except TypeError:
- logger.exception("Error while parsing cdparanoia. Start and End sector could not be detrmined.")
- else:
- p = (rval - min_read) / FAC_SEC_VAL / dsec
- p = min(p, 1)
- progress_callback(p)
- progress_callback(1)
- return cdp.wait()
-
-
- def wav_to_mp3(infile: str, basepath: str, track_information, progress_callback, bitrate=256, vbr=0, quaulity=0):
- lame_parameter = {
- common.KEY_ARTIST: '--ta',
- common.KEY_ALBUM: '--tl',
- common.KEY_YEAR: '--ty',
- common.KEY_GENRE: '--tg',
- common.KEY_TRACK: '--tn',
- common.KEY_TITLE: '--tt'
- }
- lame_cmd = subprocess.getoutput("which lame")
- if not lame_cmd:
- logger.error("lame is required for encoding. You need to install it to your system.")
- else:
- outfile = track_to_targetpath(basepath, track_information, 'mp3')
- cmd = [lame_cmd, "-b", str(bitrate), "-V", str(vbr), "--vbr-old", "-q", str(quaulity), infile, outfile]
- cmd.extend(["--tc", "Encoded by lame"])
- for key in track_information:
- cmd.extend([lame_parameter[key], str(track_information[key])])
- lame = subprocess.Popen(cmd, text=True, stderr=subprocess.PIPE)
- while (out := lame.stderr.readline()) != "":
- out = out.strip()
- posb = out.find("(")
- posp = out.find("%")
- if posb >= 0 and posp >= 0:
- p = int(out[posb+1:posp]) / 100
- progress_callback(p)
- progress_callback(1)
- return lame.wait()
|