# production-taskbar / backend / informing / consumers.py
import json
from typing import Any, Dict

from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer

from .models import Notification, NotificationConfirmation

# Name of the shared channel-layer group; listed in InformingConsumer.groups.
GROUP_NAME = 'informing'


class InformingConsumer(WebsocketConsumer):
    """WebSocket consumer that pushes notifications and records confirmations.

    Each connection joins the group named in ``groups`` plus a group derived
    from the ``organizational_unit`` URL kwarg. Incoming frames are either a
    binary ``b'ping'`` keep-alive or a JSON object of type
    ``notification.confirm``; anything else is ignored.
    """

    # Groups listed here are handled by the Channels base class.
    groups = [GROUP_NAME]

    def connect(self) -> None:
        """Join the per-organizational-unit group and accept the socket."""
        self.organizational_unit = self.scope['url_route']['kwargs'][
            'organizational_unit']
        self.informing_group_name = f'informing_{self.organizational_unit}'
        async_to_sync(self.channel_layer.group_add)(    # type: ignore
            self.informing_group_name, self.channel_name)
        self.accept()

    def disconnect(self, close_code: int) -> None:
        """Leave the per-organizational-unit group joined in ``connect``."""
        async_to_sync(self.channel_layer.group_discard)(    # type: ignore
            self.informing_group_name, self.channel_name)

    def create_confirmation(self, data: Dict[str, Any]) -> bool:
        """Store a confirmation for an existing notification.

        Args:
            data: Mapping with ``'nId'`` (notification id) and ``'uId'``
                (id of the confirming user).

        Returns:
            ``True`` if the notification exists and a confirmation row was
            created, ``False`` if no notification has that id.
        """
        notification = Notification.objects.filter(id=data['nId']).first()
        if notification is None:
            return False
        NotificationConfirmation.objects.create(user_id=data['uId'],
                                                notification=notification)
        return True

    def receive(self, text_data: str = '', bytes_data: bytes = b'') -> None:
        """Handle one incoming WebSocket frame.

        A binary ``b'ping'`` is answered with ``b'pong'``. A text frame
        holding a JSON object of type ``notification.confirm`` with truthy
        ``notification_id`` and ``user_id`` creates a confirmation and is
        answered with a ``notification.confirm.response`` message. Malformed
        JSON, non-object payloads and unknown message types are ignored so
        that a bad frame cannot crash the consumer.
        """
        if bytes_data == b'ping':
            self.send(bytes_data=b'pong')
            return

        if not text_data:
            return

        try:
            data = json.loads(text_data)
        except json.JSONDecodeError:
            # Client-supplied text that is not valid JSON: drop the frame.
            return

        # Valid JSON need not be an object (e.g. a list or a number); only
        # dicts carry the fields read below.
        if not isinstance(data, dict):
            return

        if data.get('type') != 'notification.confirm':
            return

        n_id = data.get('notification_id')
        # NOTE(review): the user id comes from the client payload, not from
        # the connection scope - confirm that this is intended.
        u_id = data.get('user_id')
        if not (n_id and u_id):
            return

        result = self.create_confirmation({'nId': n_id, 'uId': u_id})
        message = {
            'type': 'notification.confirm.response',
            'result': result,
        }
        self.send(text_data=json.dumps(message))

    def informing_notify(self, event: Dict[str, Any]) -> None:
        """Forward ``event['text']`` to the client as a broadcast message.

        Presumably invoked by Channels for channel-layer messages of type
        ``informing.notify`` - verify against the sender.
        """
        message = {'type': 'notification.broadcast', 'text': event['text']}
        self.send(text_data=json.dumps(message))