# production-taskbar / backend / informing / admin.py
from json import loads
from typing import Any

from celery import current_app
from django.conf import settings
from django.contrib import admin, messages
from django.core.cache import cache
from django.db.models import QuerySet
from django.http import HttpRequest
from django.utils.translation import gettext_lazy as _
from django_celery_beat.models import PeriodicTask
from import_export import resources
from import_export.admin import ExportMixin
from import_export.fields import Field
from import_export.formats import base_formats
from simple_history.admin import SimpleHistoryAdmin

from config.utils.datetime import formatted_dt
from informing.forms import (NotificationForm, OrganizationalUnitForm,
                             OrganizationalUnitInlineForm)
from informing.models import (CronSchedule, Notification,
                              NotificationConfirmation, OrganizationalUnit,
                              Shift, Weekday)
from informing.signals import generate_celery_name, get_celery_group_name


def generate_list(queryset: QuerySet[Any]) -> str:
    """Render each item of ``queryset`` on its own line.

    Every element is formatted as text and followed by a newline character,
    so a non-empty result always ends with a newline. An empty iterable
    produces an empty string.
    """
    return ''.join(f'{item}\n' for item in queryset)


@admin.register(Weekday)
class WeekdayAdmin(admin.ModelAdmin):    # type: ignore
    """Admin registration for ``Weekday``."""

    # Columns rendered in the change list.
    list_display = (
        'name',
        'number',
    )


@admin.register(Shift)
class ShiftAdmin(SimpleHistoryAdmin):
    """Admin registration for ``Shift`` built on ``SimpleHistoryAdmin``."""

    search_fields = ['name']
    list_filter = ('location', 'is_active')
    # 'get_weekdays' is a computed column, see the method below.
    list_display = (
        'name',
        'location',
        'start_time',
        'end_time',
        'is_active',
        'get_weekdays',
    )

    def get_queryset(self, request):    # type: ignore
        """Return the parent queryset with ``weekdays`` prefetched."""
        base_queryset = super().get_queryset(request)
        return base_queryset.prefetch_related('weekdays')

    def get_weekdays(self, instance: Shift) -> Any:
        """Return the shift's weekdays, one per line.

        Returns an empty string for an instance without a primary key.
        """
        if not instance.pk:
            return ''
        return generate_list(instance.weekdays.all())


class OUInline(admin.TabularInline):    # type: ignore
    """Tabular inline for ``OrganizationalUnit`` rows.

    Uses ``OrganizationalUnitInlineForm``, hides ``description`` and
    ``location``, and shows a read-only ``childs`` column.
    """

    model = OrganizationalUnit
    form = OrganizationalUnitInlineForm
    verbose_name = _('Organizational Unit')
    verbose_name_plural = _('Organizational Unit children')
    extra = 0
    exclude = ('description', 'location')
    filter_horizontal = ('workplaces', )
    readonly_fields = ('childs', )

    def childs(self, instance: OrganizationalUnit) -> Any:
        """Return the units whose ``parent`` is ``instance``, one per line.

        Returns an empty string for an instance without a primary key.
        """
        if not instance.pk:
            return ''
        children = OrganizationalUnit.objects.filter(parent=instance)
        return generate_list(children)


@admin.register(OrganizationalUnit)
class OrganizationalUnitAdmin(SimpleHistoryAdmin):
    """Admin registration for ``OrganizationalUnit``.

    ``get_queryset`` stores a snapshot of ``id`` / ``name`` / ``parent_id``
    rows in the cache under ``'ouparentlist'`` for 30 seconds, and ``childs``
    reads that snapshot to build the list of child names.
    """

    form = OrganizationalUnitForm
    autocomplete_fields = ['parent']
    list_display = ('name', 'description', 'location', 'parent', 'childs')
    list_filter = ('location', 'parent')
    search_fields = ['name']
    filter_horizontal = ('workplaces', )
    readonly_fields = ('childs', )
    inlines = (OUInline, )
    list_per_page = getattr(settings, 'LIST_PER_PAGE')

    def childs(self, instance: OrganizationalUnit) -> Any:
        """Return the names of ``instance``'s direct children, comma separated.

        Returns an empty string for an instance without a primary key or
        without children.
        """
        if not instance.pk:
            return ''

        rows = cache.get('ouparentlist')
        if rows is None:
            # Cache miss: ``cache.get`` returns None when the entry expired,
            # was evicted, or the backend does not persist values. Query the
            # children directly instead of failing on iteration over None.
            names = OrganizationalUnit.objects.filter(
                parent=instance).values_list('name', flat=True)
        else:
            names = (row.get('name') for row in rows
                     if row.get('parent_id') == instance.pk)
        # join() avoids the dangling ', ' a manual concatenation leaves behind.
        return ', '.join(str(name) for name in names)

    def save_formset(self, request, form, formset, change):    # type: ignore
        """Copy the parent form's ``location`` onto every inline instance, then save."""
        for inline_form in formset.forms:
            inline_form.instance.location = form.cleaned_data['location']
        super().save_formset(request, form, formset, change)

    def get_queryset(self, request):    # type: ignore
        """Return the parent queryset and refresh the ``'ouparentlist'`` cache entry."""
        queryset = super().get_queryset(request).prefetch_related('parent')
        cache.set('ouparentlist',
                  list(queryset.values('id', 'name', 'parent_id')), 30)
        return queryset


@admin.register(Notification)
class NotificationAdmin(SimpleHistoryAdmin):
    """Admin registration for ``Notification``.

    Provides computed columns for the schedule, recipients and confirmation
    count, plus bulk actions to enable, disable and immediately send the
    selected notifications through their ``PeriodicTask`` entries.
    """

    form = NotificationForm
    list_display = ('name', 'schedule', 'last_run_at', 'is_active', 'location',
                    'get_recipients')
    readonly_fields = ('last_run_at', 'get_confirmation_count')
    filter_horizontal = ('shifts', 'recipients', 'cron_schedules')
    list_filter = ('location', 'is_active')
    search_fields = ['name']
    fieldsets = (
        (None, {
            'fields': (
                'name',
                'location',
                'recipients',
                'description',
                ('is_active', 'need_confirmation', 'is_overlay'),
                ('last_run_at', 'get_confirmation_count'),
            ),
            'classes': ('extrapretty', 'wide'),
        }),
        (_('NotificationAdmin content'), {
            'fields': ('content', ),
            'classes': ('extrapretty', 'wide'),
        }),
        (_('NotificationAdmin schedule'), {
            'fields': (
                ('interval', 'one_off'),
                'cron_schedules',
                'shifts',
                'close_delay',
                'show_on_shift_start',
                'show_on_shift_end',
                'start_datetime',
                'expires_datetime',
            ),
            'classes': ('extrapretty', 'wide'),
        }),
    )
    actions = (
        'enable_notifications',
        'disable_notifications',
        'run_notifications',
    )
    celery_app = current_app
    list_select_related = ('location', )
    list_per_page = getattr(settings, 'LIST_PER_PAGE')

    def get_queryset(self, request):    # type: ignore
        """Return the parent queryset with relations prefetched.

        Also stores the ``id`` / ``cron_schedules`` / ``cron_schedules__name``
        rows of that queryset in the cache under ``'notification_schedules'``
        for 30 seconds; ``schedule`` reads them.
        """
        queryset = super().get_queryset(request).prefetch_related(
            'recipients', 'cron_schedules')
        schedules_list = list(
            queryset.values('id', 'cron_schedules', 'cron_schedules__name'))
        cache.set('notification_schedules', schedules_list, 30)
        return queryset

    @admin.display(description=f"{_('NotificationAdmin schedule')}")
    def schedule(self, obj: Notification) -> Any:
        """Describe how ``obj`` is scheduled.

        Cron schedules take precedence: more than two are summarised as a
        count, one or two are returned as a list of names. Without cron
        schedules the interval, then the one-off start time, then a
        "not configured" label is returned.
        """
        cached_schedules = cache.get('notification_schedules')
        if cached_schedules is None:
            # Cache miss: read the relation on the object instead.
            names = [cron.name for cron in obj.cron_schedules.all()]
        else:
            # A notification without cron schedules still yields one row whose
            # name is None; skip it so the interval / one-off branches below
            # stay reachable.
            names = [
                row['cron_schedules__name'] for row in cached_schedules
                if row['id'] == obj.pk
                and row['cron_schedules__name'] is not None
            ]

        count = len(names)
        if count > 2:
            return _('%(count)s crontab schedules') % {'count': count}
        if names:
            return names

        if obj.interval:
            return _('Every %(interval)s min.') % {'interval': obj.interval}

        if obj.one_off:
            return _('Notification one-off %(time)s') % {
                'time': formatted_dt(obj.start_datetime)
                if obj.start_datetime else ''
            }

        return _('Not configured')

    @admin.display(description=str(_('NotificationAdmin recipients')))
    def get_recipients(self, instance: Notification) -> Any:
        """Return the recipients as a comma separated ``location_id: name`` list."""
        qs = instance.recipients.all()
        return ', '.join(f'{r.location_id}: {r.name}' for r in qs)

    @admin.display(description=str(_('NotificationAdmin confirmations')))
    def get_confirmation_count(self, instance: Notification) -> Any:
        """Return the number of confirmations stored for ``instance``.

        Returns 0 for an instance without a primary key: this is a read-only
        field of the form, so it is also rendered on the add page, and
        filtering a relation by an unsaved instance is not supported by
        recent Django versions.
        """
        if not instance.pk:
            return 0
        return NotificationConfirmation.objects.filter(
            notification=instance).count()

    def _message_user_about_update(self, request: HttpRequest,
                                   rows_updated: int, verb: str) -> None:
        """Report ``'<rows_updated> <verb>'`` to the user."""
        self.message_user(request, f'{rows_updated} {verb}')

    def _set_is_active(self, queryset: QuerySet[Notification],
                       is_active: bool, change_reason: str) -> int:
        """Set ``is_active`` on every notification and return how many were saved."""
        saved = 0
        for notification in queryset:
            notification.is_active = is_active
            notification._change_reason = change_reason    # type: ignore
            # save instead of update used to emit post_save signal for PeriodicTask model sync
            notification.save()
            saved += 1
        return saved

    @admin.action(description=str(_('NotificationAdmin enable action')))
    def enable_notifications(self, request: HttpRequest,
                             queryset: QuerySet[Notification]) -> None:
        """Admin action: mark the selected notifications as active."""
        saved = self._set_is_active(queryset, True, 'Enable')
        self._message_user_about_update(request, saved, 'enabled.')

    @admin.action(description=str(_('NotificationAdmin disable action')))
    def disable_notifications(self, request: HttpRequest,
                              queryset: QuerySet[Notification]) -> None:
        """Admin action: mark the selected notifications as inactive."""
        saved = self._set_is_active(queryset, False, 'Disable')
        self._message_user_about_update(request, saved, 'disabled.')

    @admin.action(description=str(_('NotificationAdmin send action')))
    def run_notifications(self, request: HttpRequest,
                          queryset: QuerySet[Notification]) -> None:
        """Admin action: send the selected notifications right away.

        For every notification the matching ``PeriodicTask`` rows are looked
        up, resolved to registered Celery tasks and dispatched with
        ``apply_async``. If a periodic task or its Celery task cannot be
        found, an error message is shown and nothing is dispatched.
        """
        self.celery_app.loader.import_default_modules()
        periodic_tasks = []
        for notification in queryset:
            name = generate_celery_name(notification)
            task = PeriodicTask.objects.filter(name=name).first()
            # only first run to prevent multiple broadcast of the same notification
            cron_task = PeriodicTask.objects.filter(
                name__contains=get_celery_group_name(name)).first()

            if task:
                periodic_tasks.append(task)
            if cron_task:
                periodic_tasks.append(cron_task)

            if task or cron_task:
                notification._change_reason = 'Send by user'    # type: ignore
                notification.save()
            else:
                # NOTE(review): notifications handled earlier in this loop
                # have already been saved with the 'Send by user' reason even
                # though nothing is dispatched after this early return.
                cron_schedules = list(notification.cron_schedules.all())
                if cron_schedules:
                    msg = [
                        generate_celery_name(notification, cron.pk)
                        for cron in cron_schedules
                    ]
                else:
                    msg = name
                self.message_user(
                    request,
                    _('Task %(task)s not found') % {'task': msg},
                    level=messages.ERROR,
                )
                return

        # Resolve each PeriodicTask to its registered Celery task. The
        # PeriodicTask is still at hand here, so a missing registration can be
        # reported by its task name.
        resolved = []
        for periodic_task in periodic_tasks:
            celery_task = self.celery_app.tasks.get(periodic_task.task)
            if celery_task is None:
                self.message_user(
                    request,
                    _('Task %(task)s not found') % {
                        'task': periodic_task.task
                    },
                    level=messages.ERROR,
                )
                return
            resolved.append((celery_task, loads(periodic_task.args),
                             loads(periodic_task.kwargs),
                             periodic_task.queue))

        for celery_task, args, kwargs, queue in resolved:
            if queue:
                celery_task.apply_async(args=args, kwargs=kwargs, queue=queue)
            else:
                # No queue configured: let Celery route the task by default.
                celery_task.apply_async(args=args, kwargs=kwargs)

        tasks_run = len(resolved)
        self.message_user(
            request,
            _('%(tasks_run)s successfully sended') % {'tasks_run': tasks_run})


class NotificationConfirmationResource(resources.ModelResource):
    """Export resource for ``NotificationConfirmation``.

    Declares translated column names and custom rendering for the
    ``notification`` and ``datetime`` columns; the ``id`` field is excluded.
    """

    notification = Field(column_name=_('Notification'))
    user_id = Field(attribute='user_id',
                    column_name=_('NotificationConfirmation userId'))
    datetime = Field(column_name=_('NotificationConfirmation datetime'))

    class Meta:
        model = NotificationConfirmation
        exclude = ('id', )

    def dehydrate_notification(self, obj: NotificationConfirmation) -> str:
        """Render the related notification as ``'<name> (<pk>)'``."""
        notification = obj.notification
        return f'{notification.name} ({notification.pk})'

    def dehydrate_datetime(self, obj: NotificationConfirmation) -> str:
        """Render the confirmation time as ``DD.MM.YYYY HH:MM:SS``.

        The value is converted with ``astimezone()`` (no argument) first.
        """
        converted = obj.datetime.astimezone()
        return converted.strftime('%d.%m.%Y %H:%M:%S')


@admin.register(NotificationConfirmation)
class NotificationConfirmationAdmin(ExportMixin,
                                    admin.ModelAdmin):    # type: ignore
    """Admin registration for ``NotificationConfirmation`` with export support."""

    resource_class = NotificationConfirmationResource
    search_fields = ['user_id', 'notification__name']
    list_filter = ('notification__name', 'notification__location', 'datetime')
    list_display = ('user_id', 'notification', 'datetime')
    # With DEBUG off every model field is read-only; with DEBUG on only
    # 'datetime' is.
    readonly_fields = [
        model_field.name
        for model_field in NotificationConfirmation._meta.fields
        if model_field.name == 'datetime' or not settings.DEBUG
    ]

    def get_export_formats(self) -> Any:
        """Offer XLSX and CSV as the export formats."""
        export_formats = (base_formats.XLSX, base_formats.CSV)
        return export_formats


@admin.register(CronSchedule)
class CronScheduleAdmin(SimpleHistoryAdmin):
    """Admin registration for ``CronSchedule`` built on ``SimpleHistoryAdmin``."""

    # Columns rendered in the change list.
    list_display = ('name', 'crontab')