pygal/management/commands/import_userdata.py

213 行
11 KiB
Python

from django.contrib.auth.models import User
from django.core.management.base import BaseCommand
import json
from pygal.models import Item, Tag, TagExist
from themes.models import GetSettings, SettingExist, BottomBar, BottomBarExist
import os
from ._private import KEY_USERDATA_FAVOURITE
from ._private import KEY_USERDATA_TAGS
from ._private import KEY_USER_PROFILE
from ._private import KEY_THEME_SETTINGS
from ._private import KEY_THEME_BOTTOMBAR
from users.models import UserProfile, UserprofilerExist
from pygal.management.commands._private import KEY_ACCESS_DATA
class Command(BaseCommand):
help = 'Import userdata from a JSON-File.'
def add_arguments(self, parser):
parser.add_argument('filename(s)', nargs='+', type=str)
def error_skipped_file(self, path, reason):
self.stderr.write(' Skipped file %s caused by an error (%s) !' % (path, reason))
def handle(self, *args, **options):
for fn in options['filename(s)']:
fn = os.path.abspath(fn)
self.stdout.write('Parsing %s:' % fn)
try:
with open(fn, 'r') as fh:
data = json.load(fh)
except PermissionError:
self.error_skipped_file(fn, 'file permission')
except json.decoder.JSONDecodeError:
self.error_skipped_file(fn, 'json decoding')
except FileNotFoundError:
self.error_skipped_file(fn, 'file not found')
else:
for section in data:
self.stdout.write(' Importing "%s":' % section)
cnt_all_datasets = 0
cnt_success = 0
cnt_already_exist = 0
cnt_user_missing = 0
missing_users = []
cnt_item_missing = 0
#
# Access
#
if section == KEY_ACCESS_DATA:
for username in data[section]:
if username == '':
# public_access
for rel_path in data[section][username]:
cnt_all_datasets += 1
try:
item = Item.objects.get(rel_path=rel_path)
except Item.DoesNotExist:
cnt_item_missing += 1
else:
if item.public_access:
cnt_already_exist += 1
else:
item.public_access = True
item.save()
cnt_success += 1
else:
try:
user = User.objects.get(username=username)
except User.DoesNotExist:
user = None
# read_access
for rel_path in data[section][username]['read_access']:
cnt_all_datasets += 1
if user is None:
cnt_user_missing += 1
if username not in missing_users:
missing_users.append(username)
else:
try:
item = Item.objects.get(rel_path=rel_path)
except Item.DoesNotExist:
cnt_item_missing += 1
else:
if user in item.read_access.all():
cnt_already_exist += 1
else:
item.read_access.add(user)
item.save()
cnt_success += 1
# modify_access
for rel_path in data[section][username]['modify_access']:
cnt_all_datasets += 1
if user is None:
cnt_user_missing += 1
if username not in missing_users:
missing_users.append(username)
else:
try:
item = Item.objects.get(rel_path=rel_path)
except Item.DoesNotExist:
cnt_item_missing += 1
else:
if user in item.modify_access.all():
cnt_already_exist += 1
else:
item.modify_access.add(user)
item.save()
cnt_success += 1
#
# Favourites
#
elif section == KEY_USERDATA_FAVOURITE:
for username in data[section]:
try:
user = User.objects.get(username=username)
except User.DoesNotExist:
user = None
for rel_path in data[section][username]:
cnt_all_datasets += 1
if user is None:
cnt_user_missing += 1
if username not in missing_users:
missing_users.append(username)
try:
i = Item.objects.get(rel_path=rel_path)
except Item.DoesNotExist:
cnt_item_missing += 1
else:
if user in i.favourite_of.all():
cnt_already_exist += 1
elif user is not None:
cnt_success += 1
i.favourite_of.add(user)
#
# Tags
#
elif section == KEY_USERDATA_TAGS:
for rel_path in data[section]:
cnt_all_datasets += len(data[section][rel_path])
try:
i = Item.objects.get(rel_path=rel_path)
except Item.DoesNotExist:
cnt_item_missing += len(data[section][rel_path])
else:
for tag_data in data[section][rel_path]:
if TagExist(i, tag_data):
cnt_already_exist += 1
else:
tag_data['item'] = i
Tag(**tag_data).save()
cnt_success += 1
#
# Settings
#
elif section == KEY_THEME_SETTINGS:
cnt_all_datasets = 1
if SettingExist(data[section]):
cnt_already_exist += 1
else:
s = GetSettings()
s.import_data(data[section])
cnt_success += 1
#
# Bottombar
#
elif section == KEY_THEME_BOTTOMBAR:
cnt_all_datasets = len(data[section])
for bb_entry_import in data[section]:
if BottomBarExist(bb_entry_import):
cnt_already_exist += 1
else:
bb_entry = BottomBar()
bb_entry.import_data(bb_entry_import)
cnt_success += 1
#
# Userprofile
#
elif section == KEY_USER_PROFILE:
cnt_all_datasets = len(data[section])
for username in data[section]:
if UserprofilerExist(username, data[section][username]):
cnt_already_exist += 1
else:
try:
user = User.objects.get(username=username)
except User.DoesNotExist:
cnt_user_missing += 1
missing_users.append(username)
else:
profile = UserProfile(user=user)
profile.import_data(data[section][username])
cnt_success += 1
else:
self.stdout.write(self.style.ERROR(' Section unknown!'))
#
# REPORT
#
if cnt_all_datasets > 0:
if (cnt_success + cnt_already_exist) < cnt_all_datasets:
style = self.style.WARNING
self.stdout.write(style(' NOT all data imported.'))
else:
style = self.style.SUCCESS
self.stdout.write(style(' All data imported.'))
self.stdout.write(style(' %5d %s(s) successfully added.' % (cnt_success, section)))
self.stdout.write(style(' %5d %s(s) already set correctly.' % (cnt_already_exist, section)))
if cnt_user_missing > 0:
self.stdout.write(self.style.ERROR(' %5d %s(s) not added, because the target user(s) %s do(es) not exist.' % (cnt_user_missing, section, repr(missing_users))))
if cnt_item_missing > 0:
self.stdout.write(self.style.ERROR(' %5d %s(s) not added, because the target item does not exist. Try python manage.py rebuild_cache to create items, if you import to a scratch database.' % (cnt_item_missing, section)))