# production-taskbar / backend / informing / forms.py
from typing import Any, List, Union

from django import forms
from django.core.cache import cache
from django.utils.translation import gettext_lazy as _

from .models import Notification, OrganizationalUnit


class OrganizationalUnitInlineForm(forms.ModelForm):    #type: ignore
    """Model form over all ``OrganizationalUnit`` fields.

    On top of the regular model validation, a unit that is being created
    (no ``id`` in the cleaned data) is rejected when a unit with the same
    name already exists in the location of its parent.
    """

    class Meta:
        model = OrganizationalUnit
        fields = '__all__'

    def clean(self):    # type: ignore
        """Reject a new unit whose name is already used in the parent's location.

        Returns:
            The form's ``cleaned_data``.

        Raises:
            forms.ValidationError: a unit with the same name already exists
                among the units sharing the parent's ``location_id``.
        """
        super().clean()

        ou_id = self.cleaned_data.get('id')
        name = self.cleaned_data.get('name')
        parent = self.cleaned_data.get('parent')

        # Units that already exist are not checked and leave the cache alone.
        if ou_id:
            return self.cleaned_data

        msg = None
        # ``parent`` is None when the field is empty or failed its own
        # validation; without it there is no location to compare against.
        # NOTE(review): a truthy 'ous' cache entry still suppresses the check,
        # as before - confirm whether other code populates that key.
        if parent is not None and not cache.get('ous'):
            ous = OrganizationalUnit.objects.filter(
                location_id=parent.location_id).values_list('name', flat=True)
            cache.set('ous', ous)
            if name in ous:
                msg = _('OU with name %(name)s already exists') % {
                    'name': name,
                }

        # Drop the entry on the failure path as well: a leftover entry would
        # make the next validation skip the duplicate-name check entirely.
        cache.delete('ous')

        if msg:
            raise forms.ValidationError(msg)
        return self.cleaned_data


class OrganizationalUnitForm(forms.ModelForm):    #type: ignore
    """Model form over all ``OrganizationalUnit`` fields with location checks.

    Validates that child units, the selected workplaces and the selected
    parent all belong to the unit's location, and that the selected
    workplaces are not attached to another organizational unit.
    """

    class Meta:
        model = OrganizationalUnit
        fields = '__all__'

    def clean(self):    # type: ignore
        """Check that every child of an existing unit shares its location.

        Returns:
            The form's ``cleaned_data``.

        Raises:
            forms.ValidationError: for the first child unit whose location
                differs from the submitted ``location``.
        """
        super().clean()
        location = self.cleaned_data.get('location')

        # validate childs location for existed ou
        if self.instance.pk:
            children = OrganizationalUnit.objects.filter(parent=self.instance)
            for child in children:
                if child.location != location:
                    msg = _('OU %(ou)s location is not %(location)s') % {
                        'ou': child,
                        'location': location
                    }
                    raise forms.ValidationError(msg)
        return self.cleaned_data

    def clean_workplaces(self) -> Any:
        """Validate the location and exclusivity of the selected workplaces.

        Returns:
            The cleaned ``workplaces`` value, unchanged.

        Raises:
            forms.ValidationError: listing workplaces that already belong to
                another organizational unit and/or workplaces whose
                ``workplace_type.location`` differs from the form's
                ``location``. Both problems are reported together.
        """
        workplaces = self.cleaned_data.get('workplaces', None)
        # NOTE(review): 'location' is only present here if that field is
        # cleaned before 'workplaces' - confirm the model field order.
        location = self.cleaned_data.get('location', None)
        wrong_location = []
        already_assigned = []
        for workplace in workplaces:
            if workplace.workplace_type.location != location:
                wrong_location.append(workplace.hostname)
            # validate workplaces existing in another ou
            ous = workplace.organizationalunit_set.all()
            if self.instance.pk:
                # The unit being edited may keep its own workplaces.
                ous = ous.exclude(id=self.instance.pk)
            # Checked for new units too: previously this test was nested
            # under the ``pk`` branch and never ran when creating a unit.
            other_names = list(ous.values_list('name', flat=True))
            if other_names:
                already_assigned.append(
                    f'{workplace.hostname} => {other_names}'
                )

        # Collect every problem instead of letting one message overwrite
        # the other.
        errors = []
        if already_assigned:
            errors.append(
                _('Workplaces already in: %(err)s') % {
                    'err': already_assigned
                })
        if wrong_location:
            errors.append(
                _(
                    'Workplaces %(err)s from another location. Select from %(location)s'
                ) % {
                    'err': wrong_location,
                    'location': location
                })
        if errors:
            raise forms.ValidationError(errors)

        return workplaces

    def clean_parent(self) -> Any:
        """Validate the selected parent unit.

        Returns:
            The cleaned ``parent`` value (possibly empty).

        Raises:
            forms.ValidationError: when the parent is the unit itself or
                belongs to a different location.
        """
        parent = self.cleaned_data['parent']
        if not parent:
            return parent

        # NOTE(review): 'location' is only present here if that field is
        # cleaned before 'parent' - confirm the model field order.
        location = self.cleaned_data.get('location', None)
        if parent == self.instance:
            raise forms.ValidationError(_('Parent cant be self'))
        if location != parent.location:
            raise forms.ValidationError(_('Select ou from your location'))
        return parent


def recursive_parent_validate(
        ou: OrganizationalUnit,
        ous: List[OrganizationalUnit],
        child: Union[OrganizationalUnit, None] = None) -> OrganizationalUnit:
    """Ensure that no ancestor of ``ou`` is itself contained in ``ous``.

    Args:
        ou: unit whose parent chain is inspected.
        ous: collection of selected units the ancestors are tested against.
        child: optional unit to name in the error message instead of ``ou``;
            kept for backward compatibility with the previous signature.

    Returns:
        ``ou`` itself when none of its ancestors is in ``ous``.

    Raises:
        forms.ValidationError: when an ancestor of ``ou`` is in ``ous``. The
            topmost matching ancestor is reported, and the message names
            ``ou`` (or ``child`` when given) as the unit to remove.
    """
    # Walk up the parent chain, remembering visited primary keys so that a
    # cyclic hierarchy ends the walk instead of recursing without bound.
    seen = {ou.pk}
    ancestors = []
    ancestor = ou.parent
    while ancestor and ancestor.pk not in seen:
        seen.add(ancestor.pk)
        ancestors.append(ancestor)
        ancestor = ancestor.parent

    # Test the topmost ancestor first, matching the order in which the
    # former recursive implementation unwound.
    for ancestor in reversed(ancestors):
        if ancestor in ous:
            msg = _(
                'Remove %(unit)s because it is already in [%(parent)s]') % {
                    # Always name the unit the caller asked about, not an
                    # intermediate ancestor that was never selected.
                    'unit': child or ou,
                    'parent': ancestor
                }
            raise forms.ValidationError(msg)
    return ou


class NotificationForm(forms.ModelForm):    #type: ignore
    """Model form over all ``Notification`` fields with cross-field checks.

    Validates the overlay/confirmation combination, the location of the
    selected shifts and recipients, the recipient hierarchy and the choice
    of exactly one schedule type.
    """

    class Meta:
        model = Notification
        fields = '__all__'

    def clean(self):    # type: ignore
        """Run the cross-field validation of a notification.

        Returns:
            The form's ``cleaned_data``.

        Raises:
            forms.ValidationError: when overlay is combined with
                confirmation, a shift or recipient belongs to another
                location, a recipient's ancestor is also a recipient, or
                not exactly one schedule type is selected.
        """
        super().clean()
        # Fields that failed their own validation are missing from
        # ``cleaned_data``; fall back to empty collections so the loops below
        # do not raise TypeError and mask the real field error.
        shifts = self.cleaned_data.get('shifts') or []
        recipients = self.cleaned_data.get('recipients') or []
        location = self.cleaned_data.get('location')
        need_confirmation = self.cleaned_data.get('need_confirmation')
        is_overlay = self.cleaned_data.get('is_overlay')

        if need_confirmation and is_overlay:
            raise forms.ValidationError(
                _('Overlay is not compatible with confirmation'))

        # validate shifts location
        for shift in shifts:
            if shift.location != location:
                raise forms.ValidationError(
                    _('Shift location does not match notification location'))

        # validate recipients location
        for recipient in recipients:
            if recipient.location != location:
                raise forms.ValidationError(
                    _('Reciever location does not match notification location'))
            # A recipient must not be selected together with one of its
            # ancestors.
            recursive_parent_validate(recipient, recipients)

        schedule_types = ['interval', 'cron_schedules', 'one_off']
        selected_schedule_types = [
            s for s in schedule_types if self.cleaned_data.get(s)
        ]

        if not selected_schedule_types:
            raise forms.ValidationError(
                _('Select interval, crontab, or one-off schedule'))

        if len(selected_schedule_types) > 1:
            err_msg = _(
                'Only one of interval, crontab, or one-off must be set')
            # Attach the same message to every schedule field that was set.
            raise forms.ValidationError({
                selected: [err_msg] for selected in selected_schedule_types
            })

        return self.cleaned_data

    def clean_close_delay(self) -> Any:
        """Limit the close delay of overlay notifications.

        Returns:
            The cleaned ``close_delay`` value, unchanged.

        Raises:
            forms.ValidationError: when ``is_overlay`` is set and the delay
                exceeds the maximum of 10.
        """
        max_delay = 10
        close_delay = self.cleaned_data['close_delay']
        # NOTE(review): 'is_overlay' is only present here if that field is
        # cleaned before 'close_delay' - confirm the model field order.
        is_overlay = self.cleaned_data.get('is_overlay')
        # An empty delay cannot exceed the limit; comparing None with an int
        # would raise TypeError.
        if is_overlay and close_delay is not None and close_delay > max_delay:
            msg = _(
                'Max time in overlay cant be greater than %(time)s min') % {
                    'time': max_delay
                }
            raise forms.ValidationError(msg)
        return close_delay