import json from typing import Any from django.db.models.signals import m2m_changed, post_save, pre_delete from django.dispatch import receiver from django_celery_beat.models import (ClockedSchedule, IntervalSchedule, PeriodicTask, PeriodicTasks) from .models import CronSchedule, Notification def generate_celery_name(notification: Notification, cron_schedule_pk: str = '') -> str: cron_schedule_text = f'/{cron_schedule_pk}' if cron_schedule_pk else '' return f'Notification({notification.pk}{cron_schedule_text}) - {notification.name}' def get_celery_uniq_name(name: str) -> str: return name.split('-')[0] def get_celery_group_name(name: str) -> str: return get_celery_uniq_name(name).replace(') ', '/') @receiver(post_save, sender=CronSchedule) def update_celery(sender: CronSchedule, **kwargs: Any) -> None: instance = kwargs['instance'] queryset = instance.cron_schedules.all() for notification in queryset: name = generate_celery_name(notification, instance.pk) celery_uniq_name = get_celery_uniq_name(name) PeriodicTask.objects.update_or_create( name__contains=celery_uniq_name, defaults={'crontab': instance.crontab}) @receiver(pre_delete, sender=CronSchedule) def check_notifications(sender: CronSchedule, instance: CronSchedule, using: str, **kwargs: Any) -> None: queryset = instance.cron_schedules.all() # type: ignore queryset.update(is_active=False, one_off=True) for notification in queryset: name = generate_celery_name(notification, instance.pk) PeriodicTask.objects.filter( name__contains=get_celery_uniq_name(name)).delete() @receiver(pre_delete, sender=Notification) def del_celery(sender: Notification, **kwargs: Any) -> None: notification = kwargs['instance'] name = generate_celery_name(notification) queryset = PeriodicTask.objects.filter( name__contains=get_celery_uniq_name(name)) cron_schedules = notification.cron_schedules.all() for cron_schedule in cron_schedules: name = generate_celery_name(notification, cron_schedule.pk) queryset = queryset | PeriodicTask.objects.filter(name__contains=name) queryset.delete() @receiver(post_save, sender=Notification) def add_celery(sender: Notification, **kwargs: Any) -> None: clocked = None notification = kwargs['instance'] interval = getattr(notification, 'interval', None) one_off = getattr(notification, 'one_off', False) start_time = getattr(notification, 'start_datetime') enabled = getattr(notification, 'is_active') expires_time = getattr(notification, 'expires_datetime', None) if interval: interval, schedule_created = IntervalSchedule.objects.get_or_create( every=notification.interval, period=IntervalSchedule.MINUTES, ) if one_off: clocked, clocked_created = ClockedSchedule.objects.get_or_create( clocked_time=start_time) name = generate_celery_name(notification) values = { 'task': 'informing.tasks.broadcast_notification', 'enabled': enabled, 'interval': interval, 'one_off': one_off, 'clocked': clocked, 'start_time': start_time, 'expires': expires_time } if interval or one_off: celery_uniq_name = get_celery_uniq_name(name) values['name'] = name values['crontab'] = None values['kwargs'] = json.dumps({ 'notification_id': notification.pk, 'task_name': celery_uniq_name }) PeriodicTask.objects.update_or_create(name__contains=celery_uniq_name, defaults=values) else: # get and update crontab tasks, crontab itself change on m2m_changed queryset = PeriodicTask.objects.filter( name__contains=get_celery_group_name(name)) queryset.update(**values) PeriodicTasks.update_changed() @receiver(m2m_changed, sender=Notification.cron_schedules.through) def add_celery_cron(sender: Notification, instance: Notification, action: str, **kwargs: Any) -> None: pk_set = kwargs.get('pk_set', set()) start_time = getattr(instance, 'start_datetime') enabled = getattr(instance, 'is_active') expires_time = getattr(instance, 'expires_datetime', None) values = { 'task': 'informing.tasks.broadcast_notification', 'enabled': enabled, 'interval': None, 'one_off': False, 'clocked': None, 'start_time': start_time, 'expires': expires_time } if action == 'post_add': # remove non-cron task PeriodicTask.objects.filter(name__contains=get_celery_uniq_name( generate_celery_name(instance))).delete() for pk in pk_set: name = generate_celery_name(instance, pk) celery_uniq_name = get_celery_uniq_name(name) cron_schedule = CronSchedule.objects.filter(pk=pk).first() values['name'] = name values['crontab'] = getattr(cron_schedule, 'crontab', None) values['kwargs'] = json.dumps({ 'notification_id': instance.pk, 'task_name': celery_uniq_name }) if action == 'post_add': PeriodicTask.objects.update_or_create( name__contains=celery_uniq_name, defaults=values) if action == 'post_remove': PeriodicTask.objects.filter( name__contains=celery_uniq_name).delete()