from json import loads from typing import Any from celery import current_app from django.conf import settings from django.contrib import admin, messages from django.core.cache import cache from django.db.models import QuerySet from django.http import HttpRequest from django.utils.translation import gettext_lazy as _ from django_celery_beat.models import PeriodicTask from import_export import resources from import_export.admin import ExportMixin from import_export.fields import Field from import_export.formats import base_formats from simple_history.admin import SimpleHistoryAdmin from config.utils.datetime import formatted_dt from informing.forms import (NotificationForm, OrganizationalUnitForm, OrganizationalUnitInlineForm) from informing.models import (CronSchedule, Notification, NotificationConfirmation, OrganizationalUnit, Shift, Weekday) from informing.signals import generate_celery_name, get_celery_group_name def generate_list(queryset: QuerySet[Any]) -> str: string = '' for e in queryset: string += f'{e}\n' return string @admin.register(Weekday) class WeekdayAdmin(admin.ModelAdmin): #type: ignore list_display = ('name', 'number') @admin.register(Shift) class ShiftAdmin(SimpleHistoryAdmin): list_display = ( 'name', 'location', 'start_time', 'end_time', 'is_active', 'get_weekdays', ) list_filter = ( 'location', 'is_active', ) search_fields = ['name'] def get_weekdays(self, instance: Shift) -> Any: if instance.pk: queryset = instance.weekdays.all() return generate_list(queryset) return '' def get_queryset(self, request): # type: ignore return super(ShiftAdmin, self).get_queryset(request).prefetch_related('weekdays') class OUInline(admin.TabularInline): #type: ignore verbose_name = _('Organizational Unit') verbose_name_plural = _('Organizational Unit children') model = OrganizationalUnit exclude = ('description', 'location') filter_horizontal = ('workplaces', ) readonly_fields = ('childs', ) extra = 0 form = OrganizationalUnitInlineForm def childs(self, instance: OrganizationalUnit) -> Any: if instance.pk: queryset = OrganizationalUnit.objects.filter(parent=instance) return generate_list(queryset) return '' @admin.register(OrganizationalUnit) class OrganizationalUnitAdmin(SimpleHistoryAdmin): form = OrganizationalUnitForm autocomplete_fields = ['parent'] list_display = ('name', 'description', 'location', 'parent', 'childs') list_filter = ('location', 'parent') search_fields = ['name'] filter_horizontal = ('workplaces', ) readonly_fields = ('childs', ) inlines = (OUInline, ) list_per_page = getattr(settings, 'LIST_PER_PAGE') def childs(self, instance: OrganizationalUnit) -> Any: if instance.pk: childs_str = '' for item in cache.get('ouparentlist'): if item.get('parent_id') == instance.pk: childs_str += f'{item.get("name")}, ' return childs_str return '' def save_formset(self, request, form, formset, change): # type: ignore for inline_form in formset.forms: inline_form.instance.location = form.cleaned_data['location'] super().save_formset(request, form, formset, change) def get_queryset(self, request): # type: ignore queryset = super(OrganizationalUnitAdmin, self).get_queryset(request).prefetch_related('parent') cache.set('ouparentlist', list(queryset.values('id', 'name', 'parent_id')), 30) return queryset @admin.register(Notification) class NotificationAdmin(SimpleHistoryAdmin): form = NotificationForm list_display = ('name', 'schedule', 'last_run_at', 'is_active', 'location', 'get_recipients') readonly_fields = ('last_run_at', 'get_confirmation_count') filter_horizontal = ('shifts', 'recipients', 'cron_schedules') list_filter = ('location', 'is_active') search_fields = ['name'] fieldsets = ( (None, { 'fields': ( 'name', 'location', 'recipients', 'description', ('is_active', 'need_confirmation', 'is_overlay'), ('last_run_at', 'get_confirmation_count'), ), 'classes': ('extrapretty', 'wide'), }), (_('NotificationAdmin content'), { 'fields': ('content', ), 'classes': ('extrapretty', 'wide'), }), (_('NotificationAdmin schedule'), { 'fields': ( ('interval', 'one_off'), 'cron_schedules', 'shifts', 'close_delay', 'show_on_shift_start', 'show_on_shift_end', 'start_datetime', 'expires_datetime', ), 'classes': ('extrapretty', 'wide'), }), ) actions = ( 'enable_notifications', 'disable_notifications', 'run_notifications', ) celery_app = current_app list_select_related = ('location', ) list_per_page = getattr(settings, 'LIST_PER_PAGE') def get_queryset(self, request): # type: ignore queryset = super(NotificationAdmin, self).get_queryset(request).prefetch_related( 'recipients', 'cron_schedules') schedules_list = list( queryset.values('id', 'cron_schedules', 'cron_schedules__name')) cache.set('notification_schedules', schedules_list, 30) return queryset @admin.display(description=f"{_('NotificationAdmin schedule')}") def schedule(self, obj: Notification) -> Any: cached_schedules = cache.get('notification_schedules') if cached_schedules: data = [] for e in cached_schedules: if obj.pk == e['id']: data.append(e['cron_schedules__name']) count = len(data) if count > 2: return _('%(count)s crontab schedules') % {'count': count} else: return data if obj.interval: return _('Every %(interval)s min.') % {'interval': obj.interval} if obj.one_off: return _('Notification one-off %(time)s') % { 'time': formatted_dt(obj.start_datetime) if obj.start_datetime else '' } return _('Not configured') @admin.display(description=str(_('NotificationAdmin recipients'))) def get_recipients(self, instance: Notification) -> Any: qs = instance.recipients.all() return ', '.join([f'{r.location_id}: {r.name}' for r in qs]) @admin.display(description=str(_('NotificationAdmin confirmations'))) def get_confirmation_count(self, instance: Notification) -> Any: return NotificationConfirmation.objects.filter( notification=instance).count() def _message_user_about_update(self, request: HttpRequest, rows_updated: int, verb: str) -> None: self.message_user(request, f'{rows_updated} {verb}') @admin.action(description=str(_('NotificationAdmin enable action'))) def enable_notifications(self, request: HttpRequest, queryset: QuerySet[Notification]) -> None: for notification in queryset: notification.is_active = True notification._change_reason = 'Enable' # type: ignore # save instead of update used to emit post_save signal for PeriodicTask model sync notification.save() self._message_user_about_update(request, queryset.count(), 'enabled.') @admin.action(description=str(_('NotificationAdmin disable action'))) def disable_notifications(self, request: HttpRequest, queryset: QuerySet[Notification]) -> None: for notification in queryset: notification.is_active = False notification._change_reason = 'Disable' # type: ignore # save instead of update used to emit post_save signal for PeriodicTask model sync notification.save() self._message_user_about_update(request, queryset.count(), 'disabled.') @admin.action(description=str(_('NotificationAdmin send action'))) def run_notifications(self, request: HttpRequest, queryset: QuerySet[Notification]) -> None: self.celery_app.loader.import_default_modules() tasks = [] for notification in queryset: name = generate_celery_name(notification) task = PeriodicTask.objects.filter(name=name).first() cron_tasks = PeriodicTask.objects.filter( name__contains=get_celery_group_name(name)) if task: tasks.append(task) if cron_tasks: # only first run to prevent multiple broadcast of the same notification tasks.append(cron_tasks.first()) if task or cron_tasks: notification._change_reason = 'Send by user' # type: ignore notification.save() else: cron_schedules = notification.cron_schedules.all() f = lambda i, n=notification: generate_celery_name(n, i.pk) msg = list(map( f, cron_schedules.all())) if cron_schedules else name self.message_user( request, _('Task %(task)s not found') % {'task': msg}, level=messages.ERROR, ) return tasks = [(self.celery_app.tasks.get(task.task), loads(task.args), loads(task.kwargs), task.queue) for task in tasks] if any(t[0] is None for t in tasks): for i, t in enumerate(tasks): if t[0] is None: break not_found_task_name = tasks[i].task self.message_user( request, _('Task %(task)s not found') % {'task': not_found_task_name}, level=messages.ERROR, ) return task_ids = [ task.apply_async(args=args, kwargs=kwargs, queue=queue) if queue and len(queue) else task.apply_async(args=args, kwargs=kwargs) for task, args, kwargs, queue in tasks ] tasks_run = len(task_ids) self.message_user( request, _('%(tasks_run)s successfully sended') % {'tasks_run': tasks_run}) class NotificationConfirmationResource(resources.ModelResource): notification = Field(column_name=_('Notification')) user_id = Field( attribute='user_id', column_name=_('NotificationConfirmation userId'), ) datetime = Field(column_name=_('NotificationConfirmation datetime')) class Meta: model = NotificationConfirmation exclude = ('id', ) def dehydrate_notification(self, obj: NotificationConfirmation) -> str: return f'{obj.notification.name} ({obj.notification.pk})' def dehydrate_datetime(self, obj: NotificationConfirmation) -> str: return f'{obj.datetime.astimezone().strftime("%d.%m.%Y %H:%M:%S")}' @admin.register(NotificationConfirmation) class NotificationConfirmationAdmin(ExportMixin, admin.ModelAdmin): # type: ignore resource_class = NotificationConfirmationResource list_display = ('user_id', 'notification', 'datetime') list_filter = ('notification__name', 'notification__location', 'datetime') search_fields = [ 'user_id', 'notification__name', ] readonly_fields = [ field.name for field in NotificationConfirmation._meta.fields if not settings.DEBUG or field.name == 'datetime' ] def get_export_formats(self) -> Any: return (base_formats.XLSX, base_formats.CSV) @admin.register(CronSchedule) class CronScheduleAdmin(SimpleHistoryAdmin): list_display = ( 'name', 'crontab', )