# production-taskbar / backend / config / celery.py
import os
from typing import Any, Dict

from django_celery_beat.apps import BeatConfig

from celery import Celery, Task
from celery.signals import worker_ready
from celery.worker.consumer import Consumer

# Point Django at the production settings module unless the environment
# already defines DJANGO_SETTINGS_MODULE (setdefault never overrides it).
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.production')

# Project-wide Celery application instance.
app = Celery('sebn-taskbar-manager')
# Read configuration from the Django settings object; with namespace='CELERY'
# Celery picks up the settings whose names carry the CELERY_ prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Look for task modules in the installed Django apps.
app.autodiscover_tasks()

# Override the display name of the django_celery_beat app config
# (presumably the label shown in the Django admin -- TODO confirm).
BeatConfig.verbose_name = "Celery Periodic Tasks"


@app.task(bind=True)    # type: ignore
def debug_task(self: Task) -> None:
    """Print the repr of the current task request to stdout.

    The task is registered with ``bind=True``, so ``self`` is the task
    instance and ``self.request`` is the request being executed.
    """
    request_repr = repr(self.request)
    print('Request: ' + request_repr)


# # # Moved tgbot from the celery thread to a separate container due to some issues with locking the worker
# # Startup tasks
# @worker_ready.connect    # type: ignore
# def at_start(sender: Consumer, **k: Dict[str, Any]) -> None:
#     with sender.app.connection() as conn:
#         sender.app.send_task(
#             'tgbot.tasks.start_polling',
#             None,
#             connection=conn,
#         )