from os import environ
from typing import Any

import requests
from celery import shared_task

from taskbar.models import Workplace

# Base URL of the data-acquisition service, read once at import time.
# NOTE(review): the value is concatenated as-is, so it presumably has to end
# with '/'; when the variable is unset the URL starts with the text 'None'.
data_acq_url = environ.get('DATA_ACQUISITION_URL')
workplaces_url = f'{data_acq_url}sebn-data-acquisition/api/sebn-taskbar-manager/workplaces/'

# (connect, read) timeouts in seconds. Without a timeout, requests waits
# forever on an unresponsive server, which would block the worker.
_REQUEST_TIMEOUT = (5, 30)


@shared_task  # type: ignore
def populate_workplaces(*args: Any, **kwargs: Any) -> str:
    """Refresh workplace name/system on ``Workplace`` rows from the remote API.

    POSTs to ``workplaces_url`` and, for every item in the returned JSON
    payload, updates the ``Workplace`` rows whose hostname matches the item's
    ``hostname`` case-insensitively.

    Args:
        *args: Accepted for scheduler compatibility; ignored.
        **kwargs: Accepted for scheduler compatibility; ignored.

    Returns:
        ``'success <updated>/<total>'`` when the service answered 200 with a
        non-empty payload, where ``<updated>`` is the number of items for
        which the update call reported a truthy result; otherwise
        ``'response-error: <status_code>'``.

    Raises:
        requests.RequestException: On connection failure or timeout.
        ValueError: If a 200 response body is not valid JSON.
        KeyError: If an item lacks ``hostname``, ``workplace`` or ``system``.
    """
    # The context manager closes the session's pooled connections even when
    # the request raises.
    with requests.Session() as session:
        session.trust_env = False  # ignore proxies env
        response = session.post(workplaces_url, timeout=_REQUEST_TIMEOUT)

    if response.status_code != 200:
        return f'response-error: {response.status_code}'

    # Parse the body once; an empty payload is reported like an error
    # response, matching the previous behaviour.
    data = response.json()
    if not data:
        return f'response-error: {response.status_code}'

    updated_count = 0
    for item in data:
        result = Workplace.objects.filter(
            hostname__iexact=item['hostname'],
        ).update(
            workplace_name=item['workplace'],
            workplace_system=item['system'],
        )
        # An item counts once, regardless of how many rows it touched.
        if result:
            updated_count += 1
    return f'success {updated_count}/{len(data)}'