pygal/models.py

643 lines
24 KiB
Python
Raw Normal View History

2020-01-26 23:38:32 +01:00
import datetime
from django.contrib.auth.models import User
from django.db import models
from django.utils import formats, timezone
from django.urls import reverse
import fstools
import json
import logging
import os
from PIL import Image
import pygal
import subprocess
import time
# When True, Item.__cache_update_needed__ always reports the cached data as stale.
DEBUG = False

# Item type identifiers, used as the choices of Item.type and returned by get_item_type().
TYPE_FOLDER = 'folder'
TYPE_IMAGE = 'image'
TYPE_VIDEO = 'video'
TYPE_AUDIO = 'audio'
TYPE_OTHER = 'other'

# File extensions (lower case, including the leading dot) that get_item_type() maps to a type.
EXTENTIONS_IMAGE = ['.jpg', '.jpeg', '.jpe', '.png', '.tif', '.tiff', '.gif', ]
EXTENTIONS_AUDIO = ['.mp3', ]
EXTENTIONS_VIDEO = ['.avi', '.mpg', '.mpeg', '.mpe', '.mov', '.qt', '.mp4', '.webm', '.ogv', '.flv', '.3gp', ]

# Get a logger instance
logger = logging.getLogger("CACHING")
def get_item_type(full_path):
    """Classify a filesystem path as one of the TYPE_* constants.

    Directories are reported as TYPE_FOLDER. Everything else is classified by
    its lower-cased file extension; unknown extensions yield TYPE_OTHER.
    """
    if os.path.isdir(full_path):
        return TYPE_FOLDER

    extension = os.path.splitext(full_path)[1].lower()
    # Checked in the same order as before: image, video, audio.
    extension_table = (
        (EXTENTIONS_IMAGE, TYPE_IMAGE),
        (EXTENTIONS_VIDEO, TYPE_VIDEO),
        (EXTENTIONS_AUDIO, TYPE_AUDIO),
    )
    for known_extensions, item_type in extension_table:
        if extension in known_extensions:
            return item_type
    return TYPE_OTHER
def supported_types():
    """Return the list of file item types that are currently switched on.

    The pygal.show_*() switches are queried in the order audio, image, other,
    video; the returned list keeps that order.
    """
    switches = (
        (pygal.show_audio, TYPE_AUDIO),
        (pygal.show_image, TYPE_IMAGE),
        (pygal.show_other, TYPE_OTHER),
        (pygal.show_video, TYPE_VIDEO),
    )
    return [item_type for is_shown, item_type in switches if is_shown()]
def get_item_by_rel_path(rel_path):
    """Return the Item for ``rel_path``, creating it on demand.

    An existing database entry is returned as is. Otherwise a new Item is
    created and saved if the path exists in the filesystem and is either a
    file or a folder for which fstools.filelist() reports at least one entry.
    In every other case None is returned.
    """
    try:
        # return the existing item
        return Item.objects.get(rel_path=rel_path)
    except Item.DoesNotExist:
        pass

    # create new item, if rel_path exists in filesystem (folders need to hold files)
    full_path = pygal.get_full_path(rel_path)
    if not os.path.exists(full_path):
        return None

    is_file = get_item_type(full_path) != TYPE_FOLDER
    # file exists or folder has files in substructure
    if is_file or len(fstools.filelist(full_path)) > 0:
        new_item = Item(rel_path=rel_path)
        new_item.save()
        logger.info('New Item created: %s', repr(rel_path))
        return new_item
    return None
def ffprobe_lines(full_path):
    """Return the text lines printed by ``avprobe``/``ffprobe`` for a media file.

    ``avprobe`` is tried first; if it cannot be started (OSError, e.g. not
    installed) ``ffprobe`` is used instead. Both are called with
    ``-v quiet -show_format -show_streams``.

    :param full_path: path of the media file to inspect
    :returns: list of str lines; an empty list if the probe tool exits with
              a non-zero status
    :raises OSError: if ``ffprobe`` cannot be started either
    """
    probe_options = ['-v', 'quiet', '-show_format', '-show_streams', full_path]

    def _decode(raw):
        # Always hand back text: callers split the lines with str separators.
        if isinstance(raw, str):
            return raw
        for encoding in ('utf-8', 'cp1252'):
            try:
                return raw.decode(encoding)
            except UnicodeDecodeError:
                continue
        # cp1252 leaves a few byte values undefined; never return raw bytes.
        return raw.decode('utf-8', errors='replace')

    try:
        try:
            raw_output = subprocess.check_output(['avprobe'] + probe_options)
        except OSError:
            raw_output = subprocess.check_output(['ffprobe'] + probe_options)
    except subprocess.CalledProcessError:
        # The tool ran but failed (e.g. unreadable file): treat as "no output".
        raw_output = b''
    return _decode(raw_output).splitlines()
def is_valid_area(x1, y1, x2, y2):
    """Tell whether two corner points describe a usable area.

    All four coordinates have to be plain ``int`` objects (subclasses such as
    ``bool`` are rejected) and the two points must not be identical.
    """
    coordinates = (x1, y1, x2, y2)
    if any(type(value) is not int for value in coordinates):
        return False
    return (x1, y1) != (x2, y2)
class ItemData(dict):
    """Read-only style view on the cached ``*_c`` values of an item.

    For every name in DATA_FIELDS the attribute ``<name>_c`` is read from the
    given item:

    * a value that is not None is stored as dictionary entry *and* as attribute,
    * a None value is only exposed as attribute, using DEFAULT_VALUES (if the
      name has a default). It is deliberately not added to the dictionary, so
      the dictionary holds real values only.
    """
    # Names of the cached values; each one is read from the item attribute '<name>_c'.
    DATA_FIELDS = [
        'size',
        'datetime',
        'exposure_program',
        'exposure_time',
        'flash',
        'f_number',
        'focal_length',
        'lon',
        'lat',
        'height',
        'iso',
        'camera_vendor',
        'camera_model',
        'orientation',
        'width',
        'duration',
        'ratio',
        'album',
        'artist',
        'bitrate',
        'genre',
        'title',
        'track',
        'year',
        'sil',
        'num_audio',
        'num_folders',
        'num_images',
        'num_other',
        'num_videos',
    ]
    # Attribute fallbacks for values which are None on the item.
    DEFAULT_VALUES = {
        'size': 0,
        'datetime': datetime.datetime(1900, 1, 1),
        'exposure_program': '-',
        'exposure_time': 0,
        'flash': '-',
        'f_number': 0,
        'focal_length': 0,
        'iso': 0,
        'camera_vendor': '-',
        'camera_model': '-',
        'orientation': 1,
        'duration': 3,
        'ratio': 1,
        'album': '-',
        'artist': '-',
        'bitrate': 0,
        'genre': '-',
        'title': '-',
        'track': 0,
        'year': 0,
        'num_audio': 0,
        'num_folders': 0,
        'num_images': 0,
        'num_other': 0,
        'num_videos': 0,
    }

    def __init__(self, item):
        """Collect the ``<name>_c`` attributes of ``item`` (see class docstring)."""
        super().__init__()
        for key in self.DATA_FIELDS:
            value = getattr(item, key + '_c')
            if value is not None:
                self[key] = value
                setattr(self, key, value)
            elif key in self.DEFAULT_VALUES:
                setattr(self, key, self.DEFAULT_VALUES[key])

    @property
    def formatted_datetime(self):
        """Localised ``<date> - <time>`` string or a placeholder text if no datetime is stored."""
        try:
            return formats.date_format(self.get('datetime'), format="SHORT_DATE_FORMAT", use_l10n=True) + ' - ' + formats.time_format(self.get('datetime'), use_l10n=True)
        except AttributeError:
            # self.get('datetime') is None, if the item has no datetime
            return 'No Datetime available'

    @property
    def gps(self):
        """Return ``{'lon': ..., 'lat': ...}`` or None if no usable position is stored.

        A position needs both values. The pair (0, 0) is treated as "no
        position", but a single zero coordinate (equator or prime meridian)
        is a valid position and is returned.
        """
        lon = self.get('lon')
        lat = self.get('lat')
        if lon is None or lat is None:
            return None
        if lon == 0 and lat == 0:
            return None
        return {'lon': lon, 'lat': lat}
class Item(models.Model):
    """Database entry for one gallery item (file or folder), identified by ``rel_path``.

    Besides the access relations, the model caches metadata read from the
    filesystem in the ``*_c`` columns. The cache is refreshed by
    ``__set_model_fields_from_file__`` whenever ``__cache_update_needed__``
    reports a change; this is checked on ``save()`` and on access to ``item_data``.
    """
    # Compared with data_version_c: a mismatch marks the cached '*_c' values as outdated.
    DATA_VERSION_NUMBER = 0
    #
    rel_path = models.TextField(unique=True)
    type = models.CharField(max_length=25, choices=((TYPE_AUDIO, 'Audio'), (TYPE_FOLDER, 'Folder'), (TYPE_IMAGE, 'Image'), (TYPE_OTHER, 'Other'), (TYPE_VIDEO, 'Video')))
    public_access = models.BooleanField(default=False)
    read_access = models.ManyToManyField(User, related_name="read_access", blank=True)
    modify_access = models.ManyToManyField(User, related_name="modify_access", blank=True)
    favourite_of = models.ManyToManyField(User, related_name="favourite_of", blank=True)
    # cache bookkeeping (see __cache_update_needed__)
    uid_c = models.CharField(max_length=50, null=True, blank=True)
    settings_c = models.IntegerField(null=True, blank=True)
    data_version_c = models.IntegerField(null=True, blank=True)
    # common
    size_c = models.IntegerField(null=True, blank=True)
    datetime_c = models.DateTimeField(null=True, blank=True)
    # image
    exposure_program_c = models.CharField(max_length=100, null=True, blank=True)
    exposure_time_c = models.FloatField(null=True, blank=True)
    flash_c = models.CharField(max_length=100, null=True, blank=True)
    f_number_c = models.FloatField(null=True, blank=True)
    focal_length_c = models.FloatField(null=True, blank=True)
    lon_c = models.FloatField(null=True, blank=True)
    lat_c = models.FloatField(null=True, blank=True)
    height_c = models.IntegerField(null=True, blank=True)
    iso_c = models.IntegerField(null=True, blank=True)
    camera_vendor_c = models.CharField(max_length=100, null=True, blank=True)
    camera_model_c = models.CharField(max_length=100, null=True, blank=True)
    orientation_c = models.IntegerField(null=True, blank=True)
    width_c = models.IntegerField(null=True, blank=True)
    # video
    duration_c = models.FloatField(null=True, blank=True)
    ratio_c = models.FloatField(null=True, blank=True)
    # audio
    album_c = models.CharField(max_length=100, null=True, blank=True)
    artist_c = models.CharField(max_length=100, null=True, blank=True)
    bitrate_c = models.IntegerField(null=True, blank=True)
    genre_c = models.CharField(max_length=100, null=True, blank=True)
    title_c = models.CharField(max_length=100, null=True, blank=True)
    track_c = models.IntegerField(null=True, blank=True)
    year_c = models.IntegerField(null=True, blank=True)
    # folder
    num_audio_c = models.IntegerField(null=True, blank=True)
    num_folders_c = models.IntegerField(null=True, blank=True)
    num_images_c = models.IntegerField(null=True, blank=True)
    num_other_c = models.IntegerField(null=True, blank=True)
    num_videos_c = models.IntegerField(null=True, blank=True)
    # JSON encoded list of the rel_paths of the folder content (sorted item list)
    sil_c = models.TextField(null=True, blank=True)

    def __init__(self, *args, **kwargs):
        # Per-instance memo for current_uid() / current_settings(). Set before
        # the model initialisation, as in the original implementation.
        self.__current_uid__ = None
        self.__current_settings__ = None
        models.Model.__init__(self, *args, **kwargs)

    @property
    def name(self):
        """File name of the item without directory and extension."""
        return os.path.splitext(os.path.basename(self.rel_path))[0]

    @property
    def item_data(self):
        """ItemData of this item; the cache is refreshed and saved first, if it is outdated."""
        if self.__cache_update_needed__():
            self.__set_model_fields_from_file__()
            self.save()
        return self.cached_item_data

    @property
    def cached_item_data(self):
        """ItemData built from the stored values, without checking whether they are outdated."""
        return ItemData(self)

    def get_admin_url(self):
        """the url to the Django admin interface for the model instance"""
        info = (self._meta.app_label, self._meta.model_name)
        return reverse('admin:%s_%s_change' % info, args=(self.pk,))

    def suspended(self, user):
        """Return True if public access is suspended and ``user`` is not logged in.

        ``user`` may be None, which is handled like an unauthenticated user.
        """
        if not pygal.suspend_public():
            return False
        return user is None or not user.is_authenticated

    def may_read(self, user):
        """Return True if ``user`` is allowed to read this item.

        Folders are readable for everybody who is not suspended. Files take
        their rights from the parent folder: public access, superuser status
        or membership in the parent's ``read_access``.
        """
        if self.suspended(user):
            return False
        elif self.type == TYPE_FOLDER:
            return True
        else:
            parent = get_item_by_rel_path(os.path.dirname(self.rel_path))
            if parent.public_access is True:
                return True
            if user is None:
                return False
            if not user.is_authenticated:
                return False
            if user.is_superuser:
                return True
            return user in parent.read_access.all()

    def may_modify(self, user):
        """Return True if ``user`` is allowed to modify this item.

        Folders check their own ``modify_access``. Files grant access to
        superusers and to members of the parent folder's ``modify_access``.
        """
        if self.suspended(user):
            return False
        elif self.type == TYPE_FOLDER:
            # NOTE(review): superusers get no implicit modify right on folders
            # (unlike on files) - confirm that this is intended.
            return user in self.modify_access.all()
        else:
            if user is None:
                return False
            if not user.is_authenticated:
                return False
            if user.is_superuser:
                return True
            parent = get_item_by_rel_path(os.path.dirname(self.rel_path))
            return user in parent.modify_access.all()

    def sort_string(self):
        """Return the sort key ``<seconds, 12 digits>_<file name>`` of this item."""
        try:
            # NOTE(review): '%s' is not one of the format codes documented by
            # Python; it is passed through to the platform's strftime.
            tm = int(self.item_data.datetime.strftime('%s'))
        except AttributeError:
            raise AttributeError('Unable to create a sortstring for %s. Used datetime was: %s' % (str(self), repr(self.item_data.datetime)))
        return '%012d_%s' % (tm, os.path.basename(self.rel_path))

    def sorted_itemlist(self):
        """Return the Items of a folder in the stored (sorted) order; None for non-folders.

        :raises Item.DoesNotExist: if a stored rel_path has no database entry
        """
        if self.type != TYPE_FOLDER:
            return None
        rv = []
        for rel_path in json.loads(self.item_data['sil']):
            try:
                rv.append(Item.objects.get(rel_path=rel_path))
            except Item.DoesNotExist:
                # The former message had three placeholders for two values and
                # therefore raised a TypeError instead of DoesNotExist.
                raise Item.DoesNotExist("%s: Item with rel_path=%s does not exist." % (str(self), rel_path))
        return rv

    def thumbnail_item(self):
        """Return the item representing this one as thumbnail.

        Files represent themselves; folders (recursively) use the first entry
        of their sorted item list.

        :raises Item.DoesNotExist: for an empty folder or a missing database entry
        """
        if self.type != TYPE_FOLDER:
            return self
        try:
            first_rel_path = json.loads(self.item_data['sil'])[0]
            return Item.objects.get(rel_path=first_rel_path).thumbnail_item()
        except IndexError:
            raise Item.DoesNotExist("%s: Tried to get the thumbnail_item for an empty list." % str(self))
        except Item.DoesNotExist:
            # Same placeholder fix as in sorted_itemlist().
            raise Item.DoesNotExist("%s: Item with rel_path=%s does not exist." % (str(self), first_rel_path))

    def all_itemslist(self):
        """Return all non-folder items below this folder (depth first); [] for non-folders."""
        rv = []
        # sorted_itemlist() returns None for non-folder items
        for i in self.sorted_itemlist() or []:
            if i.type != TYPE_FOLDER:
                rv.append(i)
            else:
                rv.extend(i.all_itemslist())
        return rv

    def save(self, force_insert=False, force_update=False, using=None, update_fields=None):
        """Refresh the cached file data, if it is outdated, and store the instance."""
        if self.__cache_update_needed__():
            self.__set_model_fields_from_file__()
        return models.Model.save(self, force_insert=force_insert, force_update=force_update, using=using, update_fields=update_fields)

    def current_uid(self):
        """Return fstools.uid() of the item's path (determined once per instance)."""
        if self.__current_uid__ is None:
            self.__current_uid__ = fstools.uid(pygal.get_full_path(self.rel_path), None)
        return self.__current_uid__

    def current_settings(self):
        """Return the pygal.show_*() switches encoded as integer (determined once per instance).

        Each switch has its own bit, so every combination gives a distinct
        value. The former weights 1/2/3/4 were ambiguous (e.g. audio + image
        gave the same value as other), so some changes of the settings did
        not invalidate the folder cache.
        """
        if self.__current_settings__ is None:
            self.__current_settings__ = (
                bool(pygal.show_audio()) * 1
                + bool(pygal.show_image()) * 2
                + bool(pygal.show_other()) * 4
                + bool(pygal.show_video()) * 8
            )
        return self.__current_settings__

    def __cache_update_needed__(self):
        """Return True if the cached ``*_c`` values have to be read again.

        This is the case if DEBUG is set, the uid of the path changed or the
        data version changed. Folders additionally depend on the show settings.
        """
        if self.type == TYPE_FOLDER:
            return DEBUG or self.settings_c != self.current_settings() or self.uid_c != self.current_uid() or self.data_version_c != self.DATA_VERSION_NUMBER
        else:
            return DEBUG or self.uid_c != self.current_uid() or self.data_version_c != self.DATA_VERSION_NUMBER

    def __set_model_fields_from_file__(self):
        """Determine the item type and read the type specific data into the ``*_c`` fields."""
        logger.info('Updating cached data for Item: %s', repr(self.rel_path))
        full_path = pygal.get_full_path(self.rel_path)
        self.type = get_item_type(full_path)
        self.uid_c = self.current_uid()
        self.settings_c = self.current_settings()
        self.data_version_c = self.DATA_VERSION_NUMBER
        if self.type == TYPE_AUDIO:
            self.__update_audio_file_data__(full_path)
        elif self.type == TYPE_FOLDER:
            self.__update_folder_file_data__(full_path)
        elif self.type == TYPE_IMAGE:
            self.__update_image_file_data__(full_path)
        elif self.type == TYPE_OTHER:
            self.__update_other_file_data__(full_path)
        elif self.type == TYPE_VIDEO:
            self.__update_video_file_data__(full_path)
        for key, value in self.cached_item_data.items():
            logger.debug(' - Adding %s=%s', key, repr(value))

    def __apply_probe_tags__(self, full_path, tag_type_target_dict):
        """Store the ``key=value`` lines of ffprobe_lines() in the mapped ``*_c`` fields.

        :param tag_type_target_dict: maps a probe key to ``(converter, field name)``

        Only the first '=' separates key and value, so values containing '='
        are kept. Values which can not be converted are logged and skipped.
        """
        for line in ffprobe_lines(full_path):
            key, separator, val = line.partition('=')
            if not separator:
                continue
            target = tag_type_target_dict.get(key.strip())
            if target is None:
                continue
            tp, name = target
            val = val.strip()
            try:
                setattr(self, name + '_c', tp(val))
            except (ValueError, ZeroDivisionError):
                # empty values are expected, so they are logged with a lower level
                logger.log(logging.WARNING if val else logging.INFO, 'Can\'t convert %s (%s) for %s', repr(val), name, self.name)

    def __update_audio_file_data__(self, full_path):
        """Read size and audio tags; derive a datetime from year and track for sorting."""
        self.size_c = os.path.getsize(full_path)
        #
        tag_type_target_dict = {
            'album': (str, 'album'),
            'TAG:album': (str, 'album'),
            'TAG:artist': (str, 'artist'),
            'artist': (str, 'artist'),
            'bit_rate': (self.__int_conv__, 'bitrate'),
            'duration': (float, 'duration'),
            'TAG:genre': (str, 'genre'),
            'genre': (str, 'genre'),
            'TAG:title': (str, 'title'),
            'title': (str, 'title'),
            'TAG:track': (self.__int_conv__, 'track'),
            'track': (self.__int_conv__, 'track'),
            'TAG:date': (self.__int_conv__, 'year'),
            'date': (self.__int_conv__, 'year'),
        }
        self.__apply_probe_tags__(full_path, tag_type_target_dict)
        #
        if self.year_c is not None and self.track_c is not None:
            # keep the year inside the range datetime accepts
            year = min(max(1971, self.year_c), datetime.MAXYEAR)
            self.datetime_c = datetime.datetime(year, 1, 1, 12, 0, (60 - self.track_c) % 60, tzinfo=datetime.timezone.utc)

    def __update_folder_file_data__(self, full_path):
        """Sum up size and item counters of the folder content and build the sorted item list."""
        sil = []
        self.size_c = 0
        self.num_audio_c = 0
        self.num_folders_c = 1
        self.num_images_c = 0
        self.num_other_c = 0
        self.num_videos_c = 0
        shown_types = supported_types()
        for fn in os.listdir(full_path):
            sub_rel_path = pygal.get_rel_path(os.path.join(full_path, fn))
            sub_item = get_item_by_rel_path(sub_rel_path)
            if sub_item is None:    # Item does not exist / has no sub-items
                continue
            # item_data is requested for items which are used only, because
            # it may trigger a refresh of the sub item's cache.
            if sub_item.type == TYPE_FOLDER:
                sub_data = sub_item.item_data
                if not json.loads(sub_data['sil']):
                    continue
            elif sub_item.type in shown_types:
                sub_data = sub_item.item_data
            else:
                continue
            # size, num_*
            self.size_c += sub_data.size
            if sub_item.type == TYPE_AUDIO:
                self.num_audio_c += 1
            elif sub_item.type == TYPE_FOLDER:
                self.num_audio_c += sub_data['num_audio']
                self.num_folders_c += sub_data['num_folders']
                self.num_images_c += sub_data['num_images']
                self.num_other_c += sub_data['num_other']
                self.num_videos_c += sub_data['num_videos']
            elif sub_item.type == TYPE_IMAGE:
                self.num_images_c += 1
            elif sub_item.type == TYPE_OTHER:
                self.num_other_c += 1
            elif sub_item.type == TYPE_VIDEO:
                self.num_videos_c += 1
            # sorted item list
            sil.append(sub_item)
        sil.sort(key=lambda entry: entry.sort_string(), reverse=True)
        self.sil_c = json.dumps([i.rel_path for i in sil], indent=4)
        # datetime
        if len(sil) > 0:
            self.datetime_c = sil[0].datetime_c

    def __update_image_file_data__(self, full_path):
        """Read size and exif data of an image; tags which can not be converted are skipped."""
        self.size_c = os.path.getsize(full_path)
        #
        tag_type_target_dict = {
            0x9003: (self.__datetime_conv__, 'datetime'),
            0x8822: (self.__exposure_program_conv__, 'exposure_program'),
            0x829A: (self.__num_denum_conv__, 'exposure_time'),
            0x9209: (self.__flash_conv__, 'flash'),
            0x829D: (self.__num_denum_conv__, 'f_number'),
            0x920A: (self.__num_denum_conv__, 'focal_length'),
            0x8825: (self.__gps_conv__, ('lon', 'lat')),
            0xA003: (self.__int_conv__, 'height'),
            0x8827: (self.__int_conv__, 'iso'),
            0x010F: (str, 'camera_vendor'),
            0x0110: (str, 'camera_model'),
            0x0112: (self.__int_conv__, 'orientation'),
            0xA002: (self.__int_conv__, 'width'),
        }
        # the context manager closes the image file again
        with Image.open(full_path) as im:
            try:
                exif = dict(im._getexif().items())
            except AttributeError:
                logger.debug('%s does not have any exif information', full_path)
                return
        for key, (tp, name) in tag_type_target_dict.items():
            if key not in exif:
                continue
            try:
                converted = tp(exif[key])
            except (TypeError, ValueError, ZeroDivisionError):
                # one broken tag must not abort the update of the whole item
                logger.warning('Can\'t convert exif tag 0x%04X (%s) for %s', key, repr(name), self.name)
                continue
            if type(name) is tuple:
                for sub_name, val in zip(name, converted or (None, None)):
                    setattr(self, sub_name + '_c', val)
            else:
                setattr(self, name + '_c', converted)

    def __update_other_file_data__(self, full_path):
        """Read size and the ctime of the file (stored as UTC datetime)."""
        self.size_c = os.path.getsize(full_path)
        #
        self.datetime_c = datetime.datetime.fromtimestamp(os.path.getctime(full_path), tz=datetime.timezone.utc)

    def __update_video_file_data__(self, full_path):
        """Read size and the video properties reported by ffprobe_lines()."""
        self.size_c = os.path.getsize(full_path)
        #
        tag_type_target_dict = {
            'creation_time': (self.__vid_datetime_conv__, 'datetime'),
            'TAG:creation_time': (self.__vid_datetime_conv__, 'datetime'),
            'duration': (float, 'duration'),
            'height': (self.__int_conv__, 'height'),
            'width': (self.__int_conv__, 'width'),
            'display_aspect_ratio': (self.__ratio_conv__, 'ratio'),
        }
        self.__apply_probe_tags__(full_path, tag_type_target_dict)

    def __datetime_conv__(self, dt):
        """Convert an exif datetime string ('YYYY:MM:DD HH:MM:SS') to an aware datetime (+0000)."""
        format_string = "%Y:%m:%d %H:%M:%S%z"
        return datetime.datetime.strptime(dt + '+0000', format_string)

    def __exposure_program_conv__(self, n):
        """Return the text for an exif exposure program number ('-' if unknown)."""
        return {
            0: 'Unidentified',
            1: 'Manual',
            2: 'Program Normal',
            3: 'Aperture Priority',
            4: 'Shutter Priority',
            5: 'Program Creative',
            6: 'Program Action',
            7: 'Portrait Mode',
            8: 'Landscape Mode'
        }.get(n, '-')

    def __flash_conv__(self, n):
        """Return the text for an exif flash value ('-' if unknown)."""
        return {
            0: 'No',
            1: 'Fired',
            5: 'Fired (?)',  # no return sensed
            7: 'Fired (!)',  # return sensed
            9: 'Fill Fired',
            13: 'Fill Fired (?)',
            15: 'Fill Fired (!)',
            16: 'Off',
            24: 'Auto Off',
            25: 'Auto Fired',
            29: 'Auto Fired (?)',
            31: 'Auto Fired (!)',
            32: 'Not Available'
        }.get(n, '-')

    def __int_conv__(self, value):
        """Convert ``value`` to int.

        Strings which are no plain integer are cut at the first '.', '/' or
        '-' (e.g. '3/12' -> 3, '2015-06-01' -> 2015); an empty remainder gives 0.

        :raises ValueError: if the remainder is still no integer
        """
        try:
            return int(value)
        except ValueError:
            for c in ['.', '/', '-']:
                p = value.find(c)
                if p >= 0:
                    value = value[:p]
            if value == '':
                return 0
            return int(value)

    def __num_denum_conv__(self, data):
        """Convert an exif rational to a number.

        Accepts a ``(numerator, denominator)`` pair as well as a single number
        like object supporting ``float()`` (e.g. Pillow's IFDRational).
        """
        try:
            num, denum = data
        except TypeError:
            return float(data)
        return num / denum

    def __gps_conv__(self, data):
        """Convert the exif GPS data to ``(lon, lat)`` in signed decimal degrees.

        :param data: exif GPS dictionary (tags 1/2: latitude ref/value, 3/4: longitude ref/value)
        :returns: ``(lon, lat)`` or None if the data is incomplete, invalid or (0, 0)
        """
        def as_float(rational):
            try:
                num, denum = rational
            except TypeError:
                return float(rational)
            return float(num) / float(denum)

        def lat_lon_cal(lon_or_lat):
            # degrees, minutes, seconds -> decimal degrees
            lon_lat = 0.
            fac = 1.
            for part in lon_or_lat:
                lon_lat += as_float(part) * fac
                fac *= 1. / 60.
            return lon_lat

        def hemisphere(ref, negative_ref):
            # 'S' and 'W' mark the negative half of the value range
            if isinstance(ref, bytes):
                ref = ref.decode('ascii', 'ignore')
            return -1. if str(ref).strip('\x00 ').upper() == negative_ref else 1.

        try:
            lon = lat_lon_cal(data[0x0004]) * hemisphere(data.get(0x0003), 'W')
            lat = lat_lon_cal(data[0x0002]) * hemisphere(data.get(0x0001), 'S')
        except (AttributeError, KeyError, TypeError, ValueError, ZeroDivisionError):
            logger.warning('GPS data extraction failed for %s: %s', self.name, repr(data))
            return None
        if lon != 0 or lat != 0:  # do not use lon and lat equal 0, caused by motorola gps weakness
            return lon, lat
        return None

    def __vid_datetime_conv__(self, dt):
        """Convert the creation time string of a video to an aware datetime (+0000).

        Fractional seconds and a trailing 'Z' are dropped; missing seconds are
        added as ':00'.

        :raises ValueError: if the string has an unknown format
        """
        try:
            dt = dt[:dt.index('.')]
        except ValueError:
            pass    # time string seems to have no '.'
        if dt.endswith('Z'):
            # UTC marker of a time string without fractional seconds
            dt = dt[:-1]
        dt = dt.replace('T', ' ').replace('/', '').replace('\\', '')
        if len(dt) == 16:
            dt += ':00'
        dt += '+0000'
        format_string = '%Y-%m-%d %H:%M:%S%z'
        return datetime.datetime.strptime(dt, format_string)

    def __ratio_conv__(self, ratio):
        """Convert a ratio string like '16:9' to a float."""
        ratio = ratio.replace('\\', '')
        num, denum = ratio.split(':')
        return float(num) / float(denum)

    def __str__(self):
        return 'Item: %s' % self.rel_path
class Tag(models.Model):
    """Text label attached to an Item, optionally with the corner points of an area."""
    item = models.ForeignKey(Item, on_delete=models.CASCADE)
    text = models.CharField(max_length=100)
    topleft_x = models.IntegerField(null=True, blank=True)
    topleft_y = models.IntegerField(null=True, blank=True)
    bottomright_x = models.IntegerField(null=True, blank=True)
    bottomright_y = models.IntegerField(null=True, blank=True)

    def __init__(self, *args, **kwargs):
        # The start time is kept on the instance to log the duration of the initialisation.
        self.__tm_start__ = time.time()
        super().__init__(*args, **kwargs)
        elapsed = time.time() - self.__tm_start__
        logger.log(5, 'Initialising Tag Model object in %.02fs: %s', elapsed, str(self))

    @property
    def has_valid_coordinates(self):
        """True if the four stored coordinates pass is_valid_area()."""
        corners = (self.topleft_x, self.topleft_y, self.bottomright_x, self.bottomright_y)
        return is_valid_area(*corners)

    def get_admin_url(self):
        """the url to the Django admin interface for the model instance"""
        meta = self._meta
        url_name = 'admin:%s_%s_change' % (meta.app_label, meta.model_name)
        return reverse(url_name, args=(self.pk,))
class Setting(models.Model):
    """Boolean switches of the gallery, stored in the database."""
    # NOTE(review): presumably the value behind pygal.suspend_public() - confirm.
    # The field name (including its spelling) is also the database column name.
    suspend_puplic = models.BooleanField(default=True)
    # NOTE(review): presumably the values behind the pygal.show_*() switches - confirm.
    show_image = models.BooleanField(default=True)
    show_video = models.BooleanField(default=True)
    show_audio = models.BooleanField(default=False)
    show_other = models.BooleanField(default=False)