import os from typing import Any, Dict from django_celery_beat.apps import BeatConfig from celery import Celery, Task from celery.signals import worker_ready from celery.worker.consumer import Consumer os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.production') app = Celery('sebn-taskbar-manager') app.config_from_object('django.conf:settings', namespace='CELERY') app.autodiscover_tasks() BeatConfig.verbose_name = "Celery Periodic Tasks" @app.task(bind=True) # type: ignore def debug_task(self: Task) -> None: print(f'Request: {self.request!r}') # # # Move tgbot from celery thread to separate container due some issues with locking worker # # Startup tasks # @worker_ready.connect # type: ignore # def at_start(sender: Consumer, **k: Dict[str, Any]) -> None: # with sender.app.connection() as conn: # sender.app.send_task( # 'tgbot.tasks.start_polling', # None, # connection=conn, # )