pygal/management/commands/import_userdata.py

99 righe
4.8 KiB
Python

from django.contrib.auth.models import User
from django.core.management.base import BaseCommand
import json
from pygal.models import Item, Tag
import os
from ._private import KEY_USERDATA_VERSION
from ._private import KEY_USERDATA_FAVOURITE
from ._private import KEY_USERDATA_TAGS
class Command(BaseCommand):
    """Import favourites and tags from one or more JSON userdata files.

    Each file must contain a JSON object. The optional version entry
    (``KEY_USERDATA_VERSION``) is removed before processing; every remaining
    top-level key is treated as a section:

    * ``KEY_USERDATA_FAVOURITE`` maps a username to the ``rel_path`` values
      of the items that user marked as favourite.
    * ``KEY_USERDATA_TAGS`` maps an item ``rel_path`` to a list of dicts that
      are passed as keyword arguments to ``Tag`` (each needs a ``'text'``
      key, which is used to detect already existing tags).

    Any other section is reported as unknown and skipped.
    """

    help = 'Import userdata from a JSON-File.'

    def add_arguments(self, parser):
        """Register the positional argument holding one or more file names."""
        parser.add_argument('filename(s)', nargs='+', type=str)

    def error_skipped_file(self, path, reason):
        """Write a message to stderr that ``path`` was skipped due to ``reason``."""
        self.stderr.write(' Skipped file %s caused by an error (%s) !' % (path, reason))

    def handle(self, *args, **options):
        """Parse every given file and import all known sections it contains.

        Files that cannot be read or decoded are reported via
        ``error_skipped_file`` and do not abort the remaining files.
        """
        for fn in options['filename(s)']:
            fn = os.path.abspath(fn)
            self.stdout.write('Parsing %s:' % fn)
            # Keep the try body limited to the operations that can raise the
            # handled exceptions.
            try:
                with open(fn, 'r') as fh:
                    data = json.load(fh)
            except PermissionError:
                self.error_skipped_file(fn, 'file permission')
                continue
            except json.decoder.JSONDecodeError:
                self.error_skipped_file(fn, 'json decoding')
                continue
            except FileNotFoundError:
                self.error_skipped_file(fn, 'file not found')
                continue
            if not isinstance(data, dict):
                # Sections can only be looked up in a JSON object.
                self.error_skipped_file(fn, 'unexpected json structure')
                continue
            # The version entry is not evaluated here; it is removed so that
            # it is not mistaken for a section below.
            data.pop(KEY_USERDATA_VERSION, None)
            for section, content in data.items():
                self.stdout.write(' Importing "%s":' % section)
                if section == KEY_USERDATA_FAVOURITE:
                    counts = self._import_favourites(content)
                elif section == KEY_USERDATA_TAGS:
                    counts = self._import_tags(content)
                else:
                    self.stdout.write(self.style.ERROR(' Section unknown!'))
                    continue
                self._report(section, counts)

    @staticmethod
    def _new_counts():
        """Return a fresh set of zeroed import counters."""
        return {
            'all': 0,
            'success': 0,
            'already_exist': 0,
            'user_missing': 0,
            'item_missing': 0,
        }

    def _import_favourites(self, favourites):
        """Add the favourites of each user and return the import counters.

        ``favourites`` maps a username to the ``rel_path`` values of that
        user's favourite items.
        """
        counts = self._new_counts()
        for username, rel_paths in favourites.items():
            counts['all'] += len(rel_paths)
            try:
                user = User.objects.get(username=username)
            except User.DoesNotExist:
                # Without a user nothing can be added: count every dataset of
                # this user exactly once as failed and skip the item lookups.
                counts['user_missing'] += len(rel_paths)
                continue
            for rel_path in rel_paths:
                try:
                    item = Item.objects.get(rel_path=rel_path)
                except Item.DoesNotExist:
                    counts['item_missing'] += 1
                    continue
                if item.favourite_of.filter(pk=user.pk).exists():
                    counts['already_exist'] += 1
                else:
                    item.favourite_of.add(user)
                    counts['success'] += 1
        return counts

    def _import_tags(self, tags):
        """Create the missing tags of each item and return the import counters.

        ``tags`` maps an item ``rel_path`` to a list of dicts holding the
        keyword arguments of one ``Tag`` each.
        """
        counts = self._new_counts()
        for rel_path, tag_list in tags.items():
            counts['all'] += len(tag_list)
            try:
                item = Item.objects.get(rel_path=rel_path)
            except Item.DoesNotExist:
                counts['item_missing'] += len(tag_list)
                continue
            for tag_data in tag_list:
                if Tag.objects.filter(item=item, text=tag_data['text']).exists():
                    counts['already_exist'] += 1
                else:
                    # Build the kwargs without modifying the parsed input;
                    # the resolved item always wins over an 'item' entry.
                    Tag(**{**tag_data, 'item': item}).save()
                    counts['success'] += 1
        return counts

    def _report(self, section, counts):
        """Print the import summary of ``section``; silent if it had no datasets."""
        if counts['all'] <= 0:
            return
        if counts['success'] + counts['already_exist'] < counts['all']:
            style = self.style.WARNING
            self.stdout.write(style(' NOT all data imported.'))
        else:
            style = self.style.SUCCESS
            self.stdout.write(style(' All data imported.'))
        self.stdout.write(style(' %5d %s(s) successfully added.' % (counts['success'], section)))
        self.stdout.write(style(' %5d %s(s) already set correctly.' % (counts['already_exist'], section)))
        if counts['user_missing'] > 0:
            self.stdout.write(self.style.ERROR(' %5d %s(s) not added, because the target user does not exist.' % (counts['user_missing'], section)))
        if counts['item_missing'] > 0:
            self.stdout.write(self.style.ERROR(' %5d %s(s) not added, because the target item does not exist.' % (counts['item_missing'], section)))