production-taskbar / backend / informing / signals.py
signals.py
Raw
import json
from typing import Any

from django.db.models.signals import m2m_changed, post_save, pre_delete
from django.dispatch import receiver
from django_celery_beat.models import (ClockedSchedule, IntervalSchedule,
                                       PeriodicTask, PeriodicTasks)

from .models import CronSchedule, Notification


def generate_celery_name(notification: Notification,
                         cron_schedule_pk: str = '') -> str:
    cron_schedule_text = f'/{cron_schedule_pk}' if cron_schedule_pk else ''
    return f'Notification({notification.pk}{cron_schedule_text}) - {notification.name}'


def get_celery_uniq_name(name: str) -> str:
    return name.split('-')[0]


def get_celery_group_name(name: str) -> str:
    return get_celery_uniq_name(name).replace(') ', '/')


@receiver(post_save, sender=CronSchedule)
def update_celery(sender: CronSchedule, **kwargs: Any) -> None:
    instance = kwargs['instance']
    queryset = instance.cron_schedules.all()
    for notification in queryset:
        name = generate_celery_name(notification, instance.pk)
        celery_uniq_name = get_celery_uniq_name(name)
        PeriodicTask.objects.update_or_create(
            name__contains=celery_uniq_name,
            defaults={'crontab': instance.crontab})


@receiver(pre_delete, sender=CronSchedule)
def check_notifications(sender: CronSchedule, instance: CronSchedule,
                        using: str, **kwargs: Any) -> None:
    queryset = instance.cron_schedules.all()    # type: ignore
    queryset.update(is_active=False, one_off=True)
    for notification in queryset:
        name = generate_celery_name(notification, instance.pk)
        PeriodicTask.objects.filter(
            name__contains=get_celery_uniq_name(name)).delete()


@receiver(pre_delete, sender=Notification)
def del_celery(sender: Notification, **kwargs: Any) -> None:
    notification = kwargs['instance']
    name = generate_celery_name(notification)
    queryset = PeriodicTask.objects.filter(
        name__contains=get_celery_uniq_name(name))

    cron_schedules = notification.cron_schedules.all()
    for cron_schedule in cron_schedules:
        name = generate_celery_name(notification, cron_schedule.pk)
        queryset = queryset | PeriodicTask.objects.filter(name__contains=name)

    queryset.delete()


@receiver(post_save, sender=Notification)
def add_celery(sender: Notification, **kwargs: Any) -> None:
    clocked = None
    notification = kwargs['instance']
    interval = getattr(notification, 'interval', None)
    one_off = getattr(notification, 'one_off', False)
    start_time = getattr(notification, 'start_datetime')
    enabled = getattr(notification, 'is_active')
    expires_time = getattr(notification, 'expires_datetime', None)

    if interval:
        interval, schedule_created = IntervalSchedule.objects.get_or_create(
            every=notification.interval,
            period=IntervalSchedule.MINUTES,
        )

    if one_off:
        clocked, clocked_created = ClockedSchedule.objects.get_or_create(
            clocked_time=start_time)

    name = generate_celery_name(notification)

    values = {
        'task': 'informing.tasks.broadcast_notification',
        'enabled': enabled,
        'interval': interval,
        'one_off': one_off,
        'clocked': clocked,
        'start_time': start_time,
        'expires': expires_time
    }

    if interval or one_off:
        celery_uniq_name = get_celery_uniq_name(name)
        values['name'] = name
        values['crontab'] = None
        values['kwargs'] = json.dumps({
            'notification_id': notification.pk,
            'task_name': celery_uniq_name
        })
        PeriodicTask.objects.update_or_create(name__contains=celery_uniq_name,
                                              defaults=values)
    else:
        # get and update crontab tasks, crontab itself change on m2m_changed
        queryset = PeriodicTask.objects.filter(
            name__contains=get_celery_group_name(name))
        queryset.update(**values)
        PeriodicTasks.update_changed()


@receiver(m2m_changed, sender=Notification.cron_schedules.through)
def add_celery_cron(sender: Notification, instance: Notification, action: str,
                    **kwargs: Any) -> None:
    pk_set = kwargs.get('pk_set', set())
    start_time = getattr(instance, 'start_datetime')
    enabled = getattr(instance, 'is_active')
    expires_time = getattr(instance, 'expires_datetime', None)

    values = {
        'task': 'informing.tasks.broadcast_notification',
        'enabled': enabled,
        'interval': None,
        'one_off': False,
        'clocked': None,
        'start_time': start_time,
        'expires': expires_time
    }

    if action == 'post_add':
        # remove non-cron task
        PeriodicTask.objects.filter(name__contains=get_celery_uniq_name(
            generate_celery_name(instance))).delete()

    for pk in pk_set:
        name = generate_celery_name(instance, pk)
        celery_uniq_name = get_celery_uniq_name(name)
        cron_schedule = CronSchedule.objects.filter(pk=pk).first()
        values['name'] = name
        values['crontab'] = getattr(cron_schedule, 'crontab', None)
        values['kwargs'] = json.dumps({
            'notification_id': instance.pk,
            'task_name': celery_uniq_name
        })

        if action == 'post_add':
            PeriodicTask.objects.update_or_create(
                name__contains=celery_uniq_name, defaults=values)

        if action == 'post_remove':
            PeriodicTask.objects.filter(
                name__contains=celery_uniq_name).delete()