# production-taskbar / backend / config / tasks.py
import io
from os import environ
from typing import Any

from django.conf import settings
from django.core import management
from django.core.mail import send_mail
from django_celery_beat.models import PeriodicTask, PeriodicTasks

from celery import shared_task


def mail_admins(subject: str, message: Any) -> int:
    """Send a plain-text e-mail to every address listed in ``settings.ADMINS``.

    Args:
        subject: Subject text; it is sent prefixed with ``[Django] ``.
        message: Body of the mail. Any object is accepted and converted
            with ``str()``.

    Returns:
        Whatever ``send_mail`` returns. Sending errors are suppressed
        because ``fail_silently=True`` is passed.
    """
    prefixed_subject = f'[Django] {subject}'
    body = str(message)
    sender = settings.SERVER_EMAIL
    # Each ADMINS entry is indexed at position 1 for the address
    # (presumably ``(name, email)`` pairs — confirm against settings).
    recipients = [admin[1] for admin in settings.ADMINS]
    return send_mail(
        subject=prefixed_subject,
        message=body,
        from_email=sender,
        recipient_list=recipients,
        fail_silently=True,
    )


@shared_task
def cleanup() -> str:
    """Run the periodic housekeeping management commands.

    Calls, in order, ``clearsessions`` (removes expired sessions),
    ``clean_duplicate_history --auto`` and ``clean_old_history --auto``.
    The first command that raises aborts the remaining ones.

    Returns:
        ``"success"`` when all three commands complete, ``"failure"`` when
        any of them raises. The exception is not re-raised; it is logged
        and reported to the admins by e-mail instead.
    """
    import logging

    try:
        management.call_command("clearsessions", verbosity=0)
        management.call_command("clean_duplicate_history", "--auto")
        management.call_command("clean_old_history", "--auto")
    except Exception as e:
        # Log first: mail_admins sends with fail_silently=True, so without
        # this record a mail delivery problem would hide the failure.
        logging.getLogger(__name__).exception('cleanup exception')
        mail_admins('cleanup exception', e)
        return "failure"
    return "success"


@shared_task
def test_mail_admins(*args: Any, **kwargs: Any) -> str:
    """Send a test e-mail to the admins that echoes the task's arguments.

    Args:
        *args: Positional arguments; included verbatim in the mail body.
        **kwargs: Keyword arguments; included verbatim in the mail body.

    Returns:
        Always ``"success"``.
    """
    body = f'test celery task.\nArgs: {args}\nKwargs: {kwargs}'
    mail_admins('test message', body)
    return "success"


@shared_task
def tasks_reset_last_run_time(*args: Any, **kwargs: Any) -> str:
    """Reset the "last run time" of every periodic task.

    https://django-celery-beat.readthedocs.io/en/latest/

    If you change the Django TIME_ZONE setting your periodic task schedule
    will still be based on the old timezone. To fix that you would have to
    reset the "last run time" for each periodic task. Note that this will
    reset the state as if the periodic tasks have never run before.

    Args:
        *args: Accepted and ignored.
        **kwargs: Accepted and ignored.

    Returns:
        Always ``"success"``.
    """
    # One bulk UPDATE clears last_run_at on all rows.
    PeriodicTask.objects.all().update(last_run_at=None)
    # Signal the schedule change once, as the django-celery-beat docs
    # recommend, instead of loading every task and calling
    # PeriodicTasks.changed() per row. Unlike the per-row loop, this also
    # touches the change marker when there are no periodic tasks.
    PeriodicTasks.update_changed()
    return "success"


@shared_task
def migrate() -> str:
    """Run the ``migrate`` management command and return what it printed.

    The command is run non-interactively at verbosity 1, with its stdout
    redirected into an in-memory buffer.

    Returns:
        The text the command wrote to stdout.
    """
    options = {'verbosity': 1, 'interactive': False}
    with io.StringIO() as captured:
        management.call_command('migrate', stdout=captured, **options)
        output = captured.getvalue()
    return output