import io from media import common from PIL import Image import subprocess import platform import logging import os import subprocess try: from config import APP_NAME as ROOT_LOGGER_NAME except ImportError: ROOT_LOGGER_NAME = 'root' logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__) def get_pil_image(media_instance): try: media_instance = media_instance._im except AttributeError: pass # if type(media_instance) == str: ft = common.get_filetype(media_instance) if ft == common.FILETYPE_IMAGE: return Image.open(media_instance) elif ft == common.FILETYPE_VIDEO: if platform.system() == 'Linux': cmd = 'ffmpeg -ss 0.5 -i "' + media_instance + '" -vframes 1 -f image2pipe pipe:1 2> /dev/null' else: cmd = 'ffmpeg -ss 0.5 -i "' + media_instance + '" -vframes 1 -f image2pipe pipe:1 2> NULL' try: data = subprocess.check_output(cmd, shell=True) except subprocess.CalledProcessError: logger.warning('ffmpeg seems to be not installed') return None ffmpeg_handle = io.BytesIO(data) im = Image.open(ffmpeg_handle) return im.copy() logger.warning('Filetype is not supported (%s)', media_instance) elif type(media_instance) == Image.Image: return media_instance.copy() else: logger.warning('Instance type is not supported: %s' % type(media_instance)) def FilenameFilter(filename: str) -> str: # WHITELIST = [os.path.sep, os.path.extsep] WHITELIST = [chr(x) for x in range(ord('0'), ord('9') + 1)] WHITELIST += [chr(x) for x in range(ord('a'), ord('z') + 1)] WHITELIST += ["ä", "ö", "ü", "ß"] # rv = "" for c in filename.lower(): rv += c if c in WHITELIST else '_' return rv def track_to_targetpath(basepath: str, track: dict, ext: str): return os.path.join( basepath, FilenameFilter(track[common.KEY_ARTIST]), "%04d_" % track[common.KEY_YEAR] + FilenameFilter(track[common.KEY_ALBUM]), "%02d_" % track[common.KEY_TRACK] + FilenameFilter(track[common.KEY_TITLE]) + "." + ext ) def disc_track_rip(track_num: int, target_file: str, progress_callback): FAC_SEC_VAL = 1224 # cdp_cmd = subprocess.getoutput("which cdparanoia") if not cdp_cmd: logger.error("cdparanoia is required for ripping. You need to install it to your system.") else: cmd = [cdp_cmd, "-e", "-X", "%d" % track_num, target_file] cdp = subprocess.Popen(cmd, text=True, stderr=subprocess.PIPE) # rval = 0 min_sec = None max_sec = None min_read = None while (out := cdp.stderr.readline()) != "": out = out.strip() # identify minimum sector if ("Ripping from sector" in out): min_sec = int(list(filter(None, out.split(" ")))[3]) # identify maximum sector if ("to sector" in out): max_sec = int(list(filter(None, out.split(" ")))[2]) # identify progress if "[read]" in out: val = int(out.split(" ")[-1]) if not min_read: min_read = val rval = max(val, rval) try: dsec = max_sec - min_sec except TypeError: logger.exception("Error while parsing cdparanoia. Start and End sector could not be detrmined.") else: p = (rval - min_read) / FAC_SEC_VAL / dsec p = min(p, 1) progress_callback(p) progress_callback(1) return cdp.wait() def wav_to_mp3(infile: str, basepath: str, track_information, progress_callback, bitrate=256, vbr=0, quaulity=0): lame_parameter = { common.KEY_ARTIST: '--ta', common.KEY_ALBUM: '--tl', common.KEY_YEAR: '--ty', common.KEY_GENRE: '--tg', common.KEY_TRACK: '--tn', common.KEY_TITLE: '--tt' } lame_cmd = subprocess.getoutput("which lame") if not lame_cmd: logger.error("lame is required for encoding. You need to install it to your system.") else: outfile = track_to_targetpath(basepath, track_information, 'mp3') cmd = [lame_cmd, "-b", str(bitrate), "-V", str(vbr), "--vbr-old", "-q", str(quaulity), infile, outfile] cmd.extend(["--tc", "Encoded by lame"]) for key in track_information: cmd.extend([lame_parameter[key], str(track_information[key])]) lame = subprocess.Popen(cmd, text=True, stderr=subprocess.PIPE) while (out := lame.stderr.readline()) != "": out = out.strip() posb = out.find("(") posp = out.find("%") if posb >= 0 and posp >= 0: p = int(out[posb+1:posp]) / 100 progress_callback(p) progress_callback(1) return lame.wait()