2020-01-30 22:07:50 +01:00
|
|
|
import logging
|
|
|
|
import os
|
|
|
|
from PIL import Image
|
|
|
|
import subprocess
|
|
|
|
import time
|
|
|
|
|
|
|
|
|
|
|
|
# Name under which this module registers its logger
logger_name = 'MEDIA'
# Module-wide logger instance used by all functions below
logger = logging.getLogger(logger_name)
|
|
|
|
|
|
|
|
# File type identifiers returned by get_filetype()
FILETYPE_AUDIO = 'audio'
FILETYPE_IMAGE = 'image'
FILETYPE_VIDEO = 'video'
|
|
|
|
|
|
|
|
# Lower-case file extensions (including the leading dot) belonging to each
# file type; get_filetype() compares the lower-cased extension against them.
EXTENTIONS_AUDIO = ['.mp3', ]
EXTENTIONS_IMAGE = ['.jpg', '.jpeg', '.jpe', '.png', '.tif', '.tiff', '.gif', ]
EXTENTIONS_VIDEO = ['.avi', '.mpg', '.mpeg', '.mpe', '.mov', '.qt', '.mp4', '.webm', '.ogv', '.flv', '.3gp', ]
|
|
|
|
|
|
|
|
# Keys of the metadata dictionaries returned by get_audio_data(),
# get_video_data() and get_image_data(). Which keys are present depends on
# the file type and on the metadata found in the file.
KEY_ALBUM = 'album'
KEY_APERTURE = 'aperture'
KEY_ARTIST = 'artist'
KEY_BITRATE = 'bitrate'
KEY_CAMERA = 'camera'  # '<vendor>: <model>'
KEY_DURATION = 'duration'
KEY_EXPOSURE_PROGRAM = 'exposure_program'
KEY_EXPOSURE_TIME = 'exposure_time'
KEY_FLASH = 'flash'
KEY_FOCAL_LENGTH = 'focal_length'
KEY_GENRE = 'genre'
KEY_GPS = 'gps'  # dictionary with the keys 'lon' and 'lat'
KEY_HEIGHT = 'height'
KEY_ISO = 'iso'
KEY_ORIENTATION = 'orientation'
KEY_RATIO = 'ratio'
KEY_SIZE = 'size'  # file size as reported by os.path.getsize()
KEY_TIME = 'time'  # Integer timestamp; use time.localtime(value) or datetime.fromtimestamp(value) to convert it
KEY_TIME_IS_SUBSTITUTION = 'tm_is_subst'  # True if KEY_TIME was substituted instead of read from the metadata
KEY_TITLE = 'title'
KEY_TRACK = 'track'
KEY_WIDTH = 'width'
KEY_YEAR = 'year'
|
|
|
|
|
|
|
|
# Intermediate keys: __adapt__data__() merges both values into KEY_CAMERA
# when vendor and model are available.
__KEY_CAMERA_VENDOR__ = 'camera_vendor'
__KEY_CAMERA_MODEL__ = 'camera_model'
|
|
|
|
|
|
|
|
|
|
|
|
def get_filetype(full_path):
    """Classify a file as audio, image or video by its file extension.

    The extension is compared case-insensitively.

    :param full_path: Path (or bare name) of the file to classify.
    :returns: ``FILETYPE_AUDIO``, ``FILETYPE_IMAGE`` or ``FILETYPE_VIDEO``;
              ``None`` if the extension is in none of the known lists.
    """
    _, suffix = os.path.splitext(full_path.lower())
    known_types = (
        (EXTENTIONS_AUDIO, FILETYPE_AUDIO),
        (EXTENTIONS_IMAGE, FILETYPE_IMAGE),
        (EXTENTIONS_VIDEO, FILETYPE_VIDEO),
    )
    for extensions, filetype in known_types:
        if suffix in extensions:
            return filetype
    return None
|
|
|
|
|
|
|
|
|
|
|
|
def get_audio_data(full_path):
    """Collect the metadata of an audio file.

    The tags are read with avprobe/ffprobe (see ``__get_xxprobe_data__``)
    and completed by ``__adapt__data__`` (file size, substituted time).

    :param full_path: Path of the audio file.
    :returns: Dictionary using the ``KEY_*`` constants as keys.
    """
    # probe output key -> (converter, result key)
    tag_map = {
        'album': (str, KEY_ALBUM),
        'TAG:album': (str, KEY_ALBUM),
        'TAG:artist': (str, KEY_ARTIST),
        'artist': (str, KEY_ARTIST),
        'bit_rate': (__int_conv__, KEY_BITRATE),
        'duration': (float, KEY_DURATION),
        'TAG:genre': (str, KEY_GENRE),
        'genre': (str, KEY_GENRE),
        'TAG:title': (str, KEY_TITLE),
        'title': (str, KEY_TITLE),
        'TAG:track': (__int_conv__, KEY_TRACK),
        'track': (__int_conv__, KEY_TRACK),
        'TAG:date': (__int_conv__, KEY_YEAR),
        'date': (__int_conv__, KEY_YEAR),
    }
    probed = __get_xxprobe_data__(full_path, tag_map)
    return __adapt__data__(probed, full_path)
|
|
|
|
|
|
|
|
|
|
|
|
def get_video_data(full_path):
    """Collect the metadata of a video file.

    The values are read with avprobe/ffprobe (see ``__get_xxprobe_data__``)
    and completed by ``__adapt__data__`` (file size, substituted time).

    :param full_path: Path of the video file.
    :returns: Dictionary using the ``KEY_*`` constants as keys.
    """
    # probe output key -> (converter, result key)
    tag_map = {
        'creation_time': (__vid_datetime_conv__, KEY_TIME),
        'TAG:creation_time': (__vid_datetime_conv__, KEY_TIME),
        'bit_rate': (__int_conv__, KEY_BITRATE),
        'duration': (float, KEY_DURATION),
        'height': (__int_conv__, KEY_HEIGHT),
        'width': (__int_conv__, KEY_WIDTH),
        'display_aspect_ratio': (__ratio_conv__, KEY_RATIO),
    }
    probed = __get_xxprobe_data__(full_path, tag_map)
    return __adapt__data__(probed, full_path)
|
|
|
|
|
|
|
|
|
|
|
|
def get_image_data(full_path):
    """Collect the metadata of an image file.

    The EXIF values are read by ``__get_exif_data__`` and completed by
    ``__adapt__data__`` (file size, camera name, substituted time).

    :param full_path: Path of the image file.
    :returns: Dictionary using the ``KEY_*`` constants as keys.
    """
    exif_data = __get_exif_data__(full_path)
    return __adapt__data__(exif_data, full_path)
|
|
|
|
|
|
|
|
|
|
|
|
def __adapt__data__(data, full_path):
    """Complete a raw metadata dictionary with file based information.

    * ``KEY_SIZE`` is always set to the size of the file.
    * Camera vendor and model (if both exist) are merged into ``KEY_CAMERA``.
    * If no ``KEY_TIME`` exists, a substitute is stored and
      ``KEY_TIME_IS_SUBSTITUTION`` is set to ``True``. For entries having a
      usable year and a track number the substitute lies within the first
      hour of that year; otherwise the modification time of the file is used.

    :param data: Metadata dictionary; it is modified in place.
    :param full_path: Path of the file the metadata belongs to.
    :returns: The (modified) dictionary ``data``.
    :raises OSError: If the file is not accessible.
    """
    data[KEY_SIZE] = os.path.getsize(full_path)
    # Join Camera Vendor and Camera Model
    if __KEY_CAMERA_MODEL__ in data and __KEY_CAMERA_VENDOR__ in data:
        model = data.pop(__KEY_CAMERA_MODEL__)
        vendor = data.pop(__KEY_CAMERA_VENDOR__)
        data[KEY_CAMERA] = '%s: %s' % (vendor, model)
    # Add time if not exists
    if KEY_TIME not in data:
        substitute = None
        # ignore year 0 - must be wrong
        if KEY_YEAR in data and KEY_TRACK in data and data[KEY_YEAR] != 0:
            # Use a date where track 1 is the newest in the given year
            minute = int(data[KEY_TRACK] / 60)
            second = (data[KEY_TRACK] - 60 * minute) % 60
            try:
                substitute = int(time.mktime((data[KEY_YEAR], 1, 1, 0, 59 - minute, 59 - second, 0, 0, 0)))
            except (OverflowError, ValueError):
                # The year can't be represented as a timestamp on this platform
                substitute = None
        if substitute is None:
            # Fallback for missing or unusable year/track information, so
            # that every entry gets a time.
            substitute = int(os.path.getmtime(full_path))
        data[KEY_TIME] = substitute
        data[KEY_TIME_IS_SUBSTITUTION] = True
    return data
|
|
|
|
|
|
|
|
|
|
|
|
def __get_xxprobe_data__(full_path, conv_key_dict):
    """Read metadata of a media file by calling avprobe or ffprobe.

    avprobe is tried first; ffprobe is used if avprobe is not installed.
    The ``key=value`` lines of the tool output are filtered and converted
    according to ``conv_key_dict``. If a key occurs more than once, the last
    convertible occurrence wins.

    :param full_path: Path of the media file.
    :param conv_key_dict: Mapping ``probe key -> (converter, result key)``.
                          A converter gets the stripped value string.
    :returns: Dictionary ``result key -> converted value``; empty if none of
              the tools is installed.
    :raises subprocess.CalledProcessError: If the tool exits with an error.
    """
    probe_options = ['-v', 'quiet', '-show_format', '-show_streams', full_path]
    for tool in ('avprobe', 'ffprobe'):
        try:
            xxprobe_text = subprocess.check_output([tool] + probe_options)
        except FileNotFoundError:
            continue  # tool is not installed - try the next one
        break
    else:
        logger.warning('ffprobe and avprobe seem to be not installed')
        return {}
    #
    rv = {}
    # Undecodable bytes (tags stored in a foreign encoding) are replaced
    # instead of aborting the whole extraction.
    for line in xxprobe_text.decode('utf-8', errors='replace').splitlines():
        # Split at the first '=' only, the value itself may contain '='
        key, separator, val = line.partition('=')
        if not separator:
            continue  # no key/value line
        key = key.strip()
        val = val.strip()
        if key not in conv_key_dict:
            continue
        tp, name = conv_key_dict[key]
        try:
            rv[name] = tp(val)
        except (ValueError, ZeroDivisionError, OverflowError):
            # An empty value is quite common and therefore reported as info only
            logger.log(logging.WARNING if val else logging.INFO, 'Can\'t convert %s (%s) for %s', repr(val), key, name)
    return rv
|
|
|
|
|
|
|
|
|
|
|
|
def __get_exif_data__(full_path):
    """Read the supported EXIF values of an image file.

    Values which can't be converted are logged and skipped; values for which
    the converter returns ``None`` are skipped silently.

    :param full_path: Path of the image file.
    :returns: Dictionary ``result key -> converted value``; empty if the
              image has no EXIF information.
    """
    rv = {}
    # The context manager releases the file handle after reading the EXIF data
    with Image.open(full_path) as im:
        try:
            exif = dict(im._getexif().items())
        except AttributeError:
            # Either the image type has no _getexif or it returned None
            logger.debug('%s does not have any exif information', full_path)
            return rv
    # EXIF tag id -> (converter, result key)
    conv_key_dict = {
        0x9003: (__datetime_conv__, KEY_TIME),                      # DateTimeOriginal
        0x8822: (__exposure_program_conv__, KEY_EXPOSURE_PROGRAM),  # ExposureProgram
        0x829A: (__num_denum_conv__, KEY_EXPOSURE_TIME),            # ExposureTime
        0x9209: (__flash_conv__, KEY_FLASH),                        # Flash
        0x829D: (__num_denum_conv__, KEY_APERTURE),                 # FNumber
        0x920A: (__num_denum_conv__, KEY_FOCAL_LENGTH),             # FocalLength
        0x8825: (__gps_conv__, KEY_GPS),                            # GPSInfo
        0xA003: (__int_conv__, KEY_HEIGHT),                         # PixelYDimension
        0x8827: (__int_conv__, KEY_ISO),                            # ISOSpeedRatings
        0x010F: (str, __KEY_CAMERA_VENDOR__),                       # Make
        0x0110: (str, __KEY_CAMERA_MODEL__),                        # Model
        0x0112: (__int_conv__, KEY_ORIENTATION),                    # Orientation
        0xA002: (__int_conv__, KEY_WIDTH),                          # PixelXDimension
    }
    for key, (tp, name) in conv_key_dict.items():
        if key not in exif:
            continue
        try:
            value = tp(exif[key])
        except (ValueError, TypeError, ZeroDivisionError, OverflowError):
            # One malformed tag must not discard all other values
            logger.warning('Can\'t convert %s (0x%04X) for %s', repr(exif[key]), key, name)
            continue
        if value is not None:
            rv[name] = value
    return rv
|
|
|
|
|
|
|
|
|
|
|
|
# TODO: Join the datetime converters __datetime_conv__ and __vid_datetime_conv__
|
|
|
|
def __datetime_conv__(dt):
    """Convert a date string formatted ``YYYY:MM:DD HH:MM:SS`` to a timestamp.

    The string is interpreted as local time.

    :param dt: Date and time string.
    :returns: Timestamp as integer.
    :raises ValueError: If ``dt`` does not match the expected format.
    """
    parsed = time.strptime(dt, "%Y:%m:%d %H:%M:%S")
    seconds = time.mktime(parsed)
    return int(seconds)
|
|
|
|
|
|
|
|
|
|
|
|
def __vid_datetime_conv__(dt):
    """Convert a creation time string of a video to a timestamp.

    Accepted are strings like ``2020-01-30 22:07:50``,
    ``2020-01-30T22:07:50.000000Z``, ``2020-01-30T22:07:50Z`` and
    ``2020-01-30 22:07`` (missing seconds are set to 0).

    NOTE: The value is interpreted as local time (``time.mktime``), even if
    the string carries the designator ``Z``.

    :param dt: Date and time string.
    :returns: Timestamp as integer.
    :raises ValueError: If the string can't be parsed.
    """
    # Drop fractional seconds and everything following them
    dt = dt.partition('.')[0]
    dt = dt.replace('T', ' ').replace('/', '').replace('\\', '')
    # A designator 'Z' without fractional seconds is still there - drop it
    if dt.endswith('Z'):
        dt = dt[:-1]
    if len(dt) == 16:
        dt += ':00'  # seconds are missing
    format_string = '%Y-%m-%d %H:%M:%S'
    return int(time.mktime(time.strptime(dt, format_string)))
|
|
|
|
|
|
|
|
|
|
|
|
def __exposure_program_conv__(n):
    """Translate the numeric exposure program to a readable name.

    :param n: Exposure program number.
    :returns: Name of the exposure program or ``None`` for unknown numbers.
    """
    # The position inside the tuple is the exposure program number
    program_names = (
        'Unidentified',
        'Manual',
        'Program Normal',
        'Aperture Priority',
        'Shutter Priority',
        'Program Creative',
        'Program Action',
        'Portrait Mode',
        'Landscape Mode',
    )
    return dict(enumerate(program_names)).get(n)
|
|
|
|
|
|
|
|
|
|
|
|
def __flash_conv__(n):
    """Translate the numeric flash value to a readable description.

    :param n: Flash value.
    :returns: Description of the flash state or ``None`` for unknown values.
    """
    flash_states = dict((
        (0, 'No'),
        (1, 'Fired'),
        (5, 'Fired (?)'),  # no return sensed
        (7, 'Fired (!)'),  # return sensed
        (9, 'Fill Fired'),
        (13, 'Fill Fired (?)'),
        (15, 'Fill Fired (!)'),
        (16, 'Off'),
        (24, 'Auto Off'),
        (25, 'Auto Fired'),
        (29, 'Auto Fired (?)'),
        (31, 'Auto Fired (!)'),
        (32, 'Not Available'),
    ))
    try:
        return flash_states[n]
    except KeyError:
        return None
|
|
|
|
|
|
|
|
|
|
|
|
def __int_conv__(value):
    """Convert a value to an integer, tolerating trailing information.

    If the direct conversion fails, the string is cut at the first ``.``,
    ``/`` and ``-`` (e.g. ``'3/12'`` -> 3, ``'2020-01-30'`` -> 2020,
    ``'128000.0'`` -> 128000) before converting again.

    :param value: Value to convert; a string if the fallback is needed.
    :returns: The integer value.
    :raises ValueError: If the remaining string is no integer either.
    """
    try:
        return int(value)
    except ValueError:
        for c in ['.', '/', '-']:
            # Start searching behind the first character, so that a leading
            # sign is not taken for a separator ('-5.5' -> -5).
            p = value.find(c, 1)
            if p >= 0:
                value = value[:p]
        return int(value)
|
|
|
|
|
|
|
|
|
|
|
|
def __num_denum_conv__(data):
    """Convert a rational number to its quotient.

    :param data: Pair ``(numerator, denominator)`` or a single number-like
                 object supporting ``float()`` (presumably what newer PIL
                 versions deliver for rational tags - TODO confirm).
    :returns: The quotient or ``None`` if the value is undefined (zero
              denominator or NaN).
    """
    try:
        num, denum = data
    except TypeError:
        # Not a pair - take it as a number
        value = float(data)
        # NaN is the only value which is not equal to itself
        return None if value != value else value
    if denum == 0:
        return None  # undefined value
    return num / denum
|
|
|
|
|
|
|
|
|
|
|
|
def __gps_conv__(data):
    """Convert the EXIF GPS information to decimal degrees.

    :param data: Dictionary of the GPS tags: 0x0002/0x0004 hold latitude and
                 longitude as (degrees, minutes, seconds) rationals,
                 0x0001/0x0003 the hemisphere references ('N'/'S', 'E'/'W').
    :returns: ``{'lon': <float>, 'lat': <float>}`` (south and west are
              negative) or ``None`` if the position is not usable.
    """
    def rational(value):
        # A rational is either a pair (numerator, denominator) or number-like
        try:
            num, denum = value
        except TypeError:
            return float(value)
        return float(num) / float(denum)

    def lat_lon_cal(lon_or_lat):
        # Sum up degrees, minutes and seconds; every part weighs 1/60 of the previous one
        lon_lat = 0.
        fac = 1.
        for part in lon_or_lat:
            lon_lat += rational(part) * fac
            fac *= 1. / 60.
        return lon_lat

    def hemisphere(ref):
        # Normalised hemisphere letter ('' if the reference is missing)
        if ref is None:
            return ''
        if isinstance(ref, bytes):
            ref = ref.decode('ascii', 'replace')
        return str(ref).strip('\x00 ').upper()

    try:
        lon = lat_lon_cal(data[0x0004])
        lat = lat_lon_cal(data[0x0002])
    except (KeyError, TypeError, ValueError, ZeroDivisionError):
        logger.warning('GPS data extraction failed for %s', repr(data))
        return None
    if lon == 0 and lat == 0:
        # do not use lon and lat equal 0, caused by motorola gps weakness
        return None
    # The coordinates are stored unsigned, the hemisphere gives the sign
    if hemisphere(data.get(0x0003)) == 'W':
        lon = -lon
    if hemisphere(data.get(0x0001)) == 'S':
        lat = -lat
    return {'lon': lon, 'lat': lat}
|
|
|
|
|
|
|
|
|
|
|
|
def __ratio_conv__(ratio):
    """Convert an aspect ratio string like ``16:9`` to a float.

    Backslashes (as in ``16\\:9``) are ignored.

    :param ratio: Aspect ratio string ``<numerator>:<denominator>``.
    :returns: The ratio as float.
    :raises ValueError: If the string is no valid ratio. This includes a
                        zero denominator, so that callers need to handle
                        a single exception type only.
    """
    num, denum = ratio.replace('\\', '').split(':')
    try:
        return float(num) / float(denum)
    except ZeroDivisionError:
        raise ValueError('ratio %r has a zero denominator' % ratio) from None
|