DalTag / backend / api / views.py
views.py
Raw
from celery.result import AsyncResult
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView


class TaskStatus(APIView):
    """Report the state of a Celery task identified by ``task_id``.

    Access is gated by the ``IsAuthenticated`` permission class.
    """

    permission_classes = (IsAuthenticated,)

    def get(self, request, *args, **kwargs):
        """Return a JSON summary of the task's progress.

        The task id is read from ``kwargs["task_id"]`` (presumably supplied
        by the URL route -- confirm against the URLconf).

        The response body always carries three keys:

        * ``ready``  -- value of ``AsyncResult.ready()``.
        * ``result`` -- the task result when the task is ready and
          successful, otherwise ``None``.
        * ``error``  -- ``{"text": str(result)}`` when the task is ready but
          not successful, otherwise ``None``.
        """
        async_result = AsyncResult(kwargs["task_id"])
        is_ready = async_result.ready()

        # Start from the "still pending" shape and fill in the outcome only
        # once the task has actually finished.
        payload = {"ready": is_ready, "result": None, "error": None}

        if is_ready:
            if async_result.successful():
                payload["result"] = async_result.result
            else:
                # For an unsuccessful task the stored result is stringified
                # and exposed as the error text.
                payload["error"] = {"text": str(async_result.result)}

        return Response(payload)