import json from typing import Any, Dict from asgiref.sync import async_to_sync from channels.generic.websocket import WebsocketConsumer from .models import Notification, NotificationConfirmation GROUP_NAME = 'informing' class InformingConsumer(WebsocketConsumer): groups = [GROUP_NAME] def connect(self) -> None: self.organizational_unit = self.scope['url_route']['kwargs'][ 'organizational_unit'] self.informing_group_name = f'informing_{self.organizational_unit}' async_to_sync(self.channel_layer.group_add)( # type: ignore self.informing_group_name, self.channel_name) self.accept() def disconnect(self, close_code: int) -> None: async_to_sync(self.channel_layer.group_discard)( # type: ignore self.informing_group_name, self.channel_name) def create_confirmation(self, data: Dict[str, str]) -> bool: notification = Notification.objects.filter(id=data['nId']).first() if notification: NotificationConfirmation.objects.create(user_id=data['uId'], notification=notification) return True return False def receive(self, text_data: str = '', bytes_data: bytes = b'') -> None: if bytes_data == b'ping': self.send(bytes_data=b'pong') return if text_data: data = json.loads(text_data) if data: msg_type = data.get('type', None) if msg_type == 'notification.confirm': n_id = data.get('notification_id', None) u_id = data.get('user_id', None) values = {'nId': n_id, 'uId': u_id} if n_id and u_id: result = self.create_confirmation(values) message = { 'type': 'notification.confirm.response', 'result': result } self.send(text_data=json.dumps(message)) def informing_notify(self, event: Dict[str, Any]) -> None: message = {'type': 'notification.broadcast', 'text': event['text']} self.send(text_data=json.dumps(message))