import io
from os import environ
from typing import Any

from celery import shared_task
from django.conf import settings
from django.core import management
from django.core.mail import send_mail
from django_celery_beat.models import PeriodicTask, PeriodicTasks


def mail_admins(subject: str, message: Any) -> int:
    """Send a plain-text e-mail to every address listed in ``settings.ADMINS``.

    The subject is prefixed with ``[Django] `` and the message is converted
    with ``str()``, so exceptions and other objects can be passed directly.
    The sender is ``settings.SERVER_EMAIL``.

    Delivery errors are suppressed (``fail_silently=True``) so that a broken
    mail setup never makes the calling task fail.

    Returns:
        Whatever ``send_mail`` returns (the number of delivered messages).
    """
    return send_mail(
        subject=f'[Django] {subject}',
        message=str(message),
        from_email=settings.SERVER_EMAIL,
        # Each ADMINS entry is indexed at position 1 to obtain the address.
        recipient_list=[a[1] for a in settings.ADMINS],
        fail_silently=True,
    )


@shared_task
def cleanup() -> str:
    """Run the periodic housekeeping management commands.

    Executes, in order, ``clearsessions``, ``clean_duplicate_history --auto``
    and ``clean_old_history --auto``.

    Returns:
        ``"success"`` when all commands complete, ``"failure"`` when any of
        them raises. In the failure case the exception is mailed to the
        admins instead of being propagated, so the task itself does not
        raise.
    """
    try:
        management.call_command("clearsessions", verbosity=0)
        management.call_command("clean_duplicate_history", "--auto")
        management.call_command("clean_old_history", "--auto")
        return "success"
    except Exception as e:
        # Deliberate best effort: report the problem by mail rather than
        # letting the exception escape the task.
        mail_admins('cleanup exception', e)
        return "failure"


@shared_task
def test_mail_admins(*args: Any, **kwargs: Any) -> str:
    """Send a test e-mail to the admins, echoing the received arguments.

    Returns:
        Always ``"success"``; mail delivery errors are suppressed by
        ``mail_admins``.
    """
    mail_admins('test message', f'test celery task.\nArgs: {args}\nKwargs: {kwargs}')
    return "success"


@shared_task
def tasks_reset_last_run_time(*args: Any, **kwargs: Any) -> str:
    """Reset the "last run time" of every periodic task.

    See https://django-celery-beat.readthedocs.io/en/latest/

    If you change the Django TIME_ZONE setting, your periodic task schedule
    will still be based on the old timezone. To fix that you have to reset
    the "last run time" of each periodic task. Note that this resets the
    state as if the periodic tasks had never run before.

    The positional and keyword arguments are accepted but ignored.

    Returns:
        Always ``"success"``.
    """
    tasks = PeriodicTask.objects.all()
    tasks.update(last_run_at=None)
    # update() writes directly to the database, so each task is explicitly
    # reported as changed afterwards.
    for task in tasks:
        PeriodicTasks.changed(task)
    return "success"


@shared_task
def migrate() -> str:
    """Run the ``migrate`` management command non-interactively.

    Returns:
        The text the command wrote to its ``stdout`` stream.
    """
    with io.StringIO() as out:
        management.call_command('migrate', verbosity=1, interactive=False, stdout=out)
        # Read the buffer before the context manager closes it.
        return out.getvalue()