# production-taskbar / backend / taskbar / tasks.py
from os import environ
from typing import Any

import requests
from celery import shared_task

from taskbar.models import Workplace

# Base URL of the data-acquisition service, read from the environment
# (None when DATA_ACQUISITION_URL is not set).
data_acq_url = environ.get('DATA_ACQUISITION_URL')
# Workplaces endpoint: the base URL is prepended verbatim, with no separator
# added between it and the path.
workplaces_url = (
    str(data_acq_url)
    + 'sebn-data-acquisition/api/sebn-taskbar-manager/workplaces/'
)


@shared_task    # type: ignore
def populate_workplaces(*args: Any, **kwargs: Any) -> str:
    """Update ``Workplace`` rows from the data-acquisition workplaces endpoint.

    Sends a POST request to ``workplaces_url`` and, for each item of the JSON
    reply, updates ``workplace_name`` and ``workplace_system`` on the
    ``Workplace`` rows whose ``hostname`` matches the item's ``'hostname'``
    case-insensitively.

    Args:
        *args: Accepted but not used.
        **kwargs: Accepted but not used.

    Returns:
        ``'success <updated>/<total>'`` when the service answers 200 with a
        non-empty JSON body, where ``<updated>`` counts the items for which
        the update reported a non-zero result. Otherwise
        ``'response-error: <status_code>'`` (this includes a 200 reply whose
        JSON body is empty).
    """
    # The context manager closes the session (and its pooled connections)
    # once the request is done instead of leaving it open.
    with requests.Session() as session:
        session.trust_env = False    # ignore proxies env
        # NOTE(review): no timeout is passed, so a stalled server blocks the
        # worker until the connection drops; consider adding one.
        response = session.post(workplaces_url)

    if response.status_code != 200:
        return f'response-error: {response.status_code}'

    # Decode the body once and reuse it for both the emptiness check and the
    # iteration below.
    data = response.json()
    if not data:
        return f'response-error: {response.status_code}'

    updated_count = 0
    for item in data:
        # Each item is indexed by 'hostname', 'workplace' and 'system'.
        result = Workplace.objects.filter(
            hostname__iexact=item['hostname'],
        ).update(
            workplace_name=item['workplace'],
            workplace_system=item['system'],
        )
        if result:
            updated_count += 1
    return f'success {updated_count}/{len(data)}'