from typing import Any, List, Union

from django import forms
from django.core.cache import cache
from django.utils.translation import gettext_lazy as _

from .models import Notification, OrganizationalUnit


class OrganizationalUnitInlineForm(forms.ModelForm):  # type: ignore
    """Inline model form for ``OrganizationalUnit`` with a duplicate-name check."""

    class Meta:
        model = OrganizationalUnit
        fields = '__all__'

    def clean(self):  # type: ignore
        """Reject a new unit whose name already exists in the parent's location.

        Rows that already carry an ``id`` are returned unchanged. For new
        rows the names of the units sharing ``parent.location_id`` are looked
        up (via the ``'ous'`` cache entry, filled on demand) and compared
        with the submitted name.

        Raises:
            forms.ValidationError: the name is already present in that list.
        """
        super().clean()

        if self.cleaned_data.get('id'):
            # Only newly added rows are checked; previously an existing row
            # combined with an empty cache could end up testing
            # ``name in None``.
            return self.cleaned_data

        name = self.cleaned_data.get('name')
        parent = self.cleaned_data.get('parent')

        # NOTE(review): the 'ous' cache key is shared by every form and is
        # not scoped to a location -- confirm this is intended.
        known_names = cache.get('ous')
        # ``parent`` is missing from cleaned_data when its own validation
        # failed, so it cannot be dereferenced unconditionally.
        if not known_names and parent is not None:
            known_names = list(
                OrganizationalUnit.objects.filter(
                    location_id=parent.location_id,
                ).values_list('name', flat=True)
            )
            cache.set('ous', known_names)

        if known_names and name in known_names:
            msg = _('OU with name %(name)s already exists') % {
                'name': name,
            }
            # As before, the cache entry is left in place on failure.
            raise forms.ValidationError(msg)

        cache.delete('ous')
        return self.cleaned_data


class OrganizationalUnitForm(forms.ModelForm):  # type: ignore
    """Model form for ``OrganizationalUnit`` with location consistency checks."""

    class Meta:
        model = OrganizationalUnit
        fields = '__all__'

    def clean(self):  # type: ignore
        """Ensure every child of a saved unit shares the submitted location.

        Raises:
            forms.ValidationError: naming the first child whose location
                differs from the submitted one.
        """
        super().clean()
        location = self.cleaned_data.get('location')

        # Only a unit that already has a primary key can have children.
        if self.instance.pk:
            children = OrganizationalUnit.objects.filter(parent=self.instance)
            for child in children:
                if child.location != location:
                    msg = _('OU %(ou)s location is not %(location)s') % {
                        'ou': child,
                        'location': location
                    }
                    raise forms.ValidationError(msg)

        return self.cleaned_data

    def clean_workplaces(self) -> Any:
        """Validate the selected workplaces.

        Two problems are collected across all workplaces:

        * the workplace type's location differs from the submitted location;
        * the workplace is already attached to another organizational unit
          (the unit being edited is excluded).

        Returns:
            The cleaned ``workplaces`` value, unchanged.

        Raises:
            forms.ValidationError: carrying one message per kind of problem
                found. Both messages are reported when both apply.
        """
        workplaces = self.cleaned_data.get('workplaces')
        location = self.cleaned_data.get('location')
        wrong_location = []
        already_assigned = []

        # ``or ()`` keeps the loop safe if no value is present.
        for workplace in workplaces or ():
            if workplace.workplace_type.location != location:
                wrong_location.append(workplace.hostname)

            # validate workplaces existing in another ou
            other_units = workplace.organizationalunit_set.all()
            if self.instance.pk:
                other_units = other_units.exclude(id=self.instance.pk)
            # Fetch the names once; a non-empty list means a conflict.
            unit_names = list(other_units.values_list("name", flat=True))
            if unit_names:
                already_assigned.append(
                    f'{workplace.hostname} => {unit_names}'
                )

        errors = []
        if already_assigned:
            errors.append(
                _('Workplaces already in: %(err)s') % {
                    'err': already_assigned
                }
            )
        if wrong_location:
            errors.append(
                _(
                    'Workplaces %(err)s from another location. Select from %(location)s'
                ) % {
                    'err': wrong_location,
                    'location': location
                }
            )
        if errors:
            # Previously the second message silently replaced the first.
            raise forms.ValidationError(errors)

        return workplaces

    def clean_parent(self) -> Any:
        """Validate the chosen parent unit.

        Returns:
            The cleaned ``parent`` value (possibly empty).

        Raises:
            forms.ValidationError: if the parent is the unit itself or its
                location differs from the submitted location.
        """
        parent = self.cleaned_data['parent']
        if not parent:
            return parent

        location = self.cleaned_data.get('location')
        if parent == self.instance:
            raise forms.ValidationError(_('Parent cant be self'))
        if location != parent.location:
            raise forms.ValidationError(_('Select ou from your location'))
        return parent


def recursive_parent_validate(
        ou: OrganizationalUnit,
        ous: List[OrganizationalUnit],
        child: Union[OrganizationalUnit, None] = None) -> OrganizationalUnit:
    """Check that no ancestor of ``ou`` is itself contained in ``ous``.

    The parent chain is walked up to the root first, so the ancestor closest
    to the root is tested first.

    Args:
        ou: unit whose ancestors are inspected.
        ous: collection the ancestors are tested against.
        child: unit from which the recursion reached ``ou``; used in the
            error message in preference to ``ou`` when given.

    Returns:
        ``ou`` itself when no ancestor is in ``ous``.

    Raises:
        forms.ValidationError: when an ancestor is found in ``ous``.
    """
    if not ou.parent:
        return ou

    # The recursive call returns ``ou.parent`` after validating its ancestors.
    parent = recursive_parent_validate(ou.parent, ous, ou)
    if parent in ous:
        msg = _(
            'Remove %(unit)s because it is already in [%(parent)s]') % {
                'unit': child or ou,
                'parent': parent
            }
        raise forms.ValidationError(msg)
    return ou


class NotificationForm(forms.ModelForm):  # type: ignore
    """Model form for ``Notification`` with cross-field validation."""

    class Meta:
        model = Notification
        fields = '__all__'

    def clean(self):  # type: ignore
        """Cross-field validation for a notification.

        Checks, in order: overlay and confirmation are not both set; every
        shift and recipient shares the notification location; no recipient
        has an ancestor among the recipients; exactly one of ``interval``,
        ``cron_schedules`` and ``one_off`` is set.

        Raises:
            forms.ValidationError: on the first failed check.
        """
        super().clean()
        data = self.cleaned_data
        location = data.get('location')

        # A field that failed its own validation is absent from
        # cleaned_data; fall back to an empty tuple instead of iterating
        # over None.
        shifts = data.get('shifts')
        if shifts is None:
            shifts = ()
        recipients = data.get('recipients')
        if recipients is None:
            recipients = ()

        if data.get('need_confirmation') and data.get('is_overlay'):
            msg = _('Overlay is not compatible with confirmation')
            raise forms.ValidationError(msg)

        # validate shifts location
        for shift in shifts:
            if shift.location != location:
                msg = _('Shift location does not match notification location')
                raise forms.ValidationError(msg)

        # validate recipients location
        for recipient in recipients:
            if recipient.location != location:
                msg = _(
                    'Reciever location does not match notification location')
                raise forms.ValidationError(msg)
            recursive_parent_validate(recipient, recipients)

        schedule_types = ['interval', 'cron_schedules', 'one_off']
        selected_schedule_types = [
            s for s in schedule_types if data.get(s)
        ]

        if not selected_schedule_types:
            err_msg = _('Select interval, crontab, or one-off schedule')
            raise forms.ValidationError(err_msg)

        if len(selected_schedule_types) > 1:
            err_msg = _(
                'Only one of interval, crontab, or one-off must be set')
            # Attach the same message to every schedule field that was set.
            raise forms.ValidationError({
                field_name: [err_msg]
                for field_name in selected_schedule_types
            })

        return self.cleaned_data

    def clean_close_delay(self) -> Any:
        """Limit ``close_delay`` to ``max_delay`` when the overlay flag is set.

        Returns:
            The cleaned ``close_delay`` value, unchanged.

        Raises:
            forms.ValidationError: if ``is_overlay`` is set and
                ``close_delay`` exceeds the limit.
        """
        max_delay = 10
        close_delay = self.cleaned_data['close_delay']
        # NOTE(review): ``is_overlay`` is only available here if that field
        # is cleaned before ``close_delay`` -- confirm the field order.
        is_overlay = self.cleaned_data.get('is_overlay')

        # An empty close_delay (None) cannot be compared with an int.
        if is_overlay and close_delay is not None and close_delay > max_delay:
            msg = _(
                'Max time in overlay cant be greater than %(time)s min') % {
                    'time': max_delay
                }
            raise forms.ValidationError(msg)
        return close_delay