import json import logging import random import re import time from collections import OrderedDict from datetime import datetime from itertools import groupby from typing import Any, Dict, List from asgiref.sync import async_to_sync from config.utils.pillow import concat_images from django.conf import settings from django.core.cache import cache from django.db.models import Count, Q from django.utils.timezone import make_aware from rest_framework import serializers from rest_framework.exceptions import (APIException, PermissionDenied, ValidationError) from rest_framework.fields import SerializerMethodField from rest_framework.utils.serializer_helpers import ReturnDict from taskbar.models import Workplace from tgbot.application import send_message from tgbot.handlers.helpdesk.keyboard import \ make_keyboard_for_new_informing_issue from .models import (CategoriesSet, Category, Department, Issue, Problem, Reciever, Status) from .redmine import (RedmineStatus, create_redmine_issue, get_user, is_redmine, update_redmine_issue, upload_file) from .sysaid import create_sysaid_ticket, fetch_layout, is_sysaid def get_chat_id(data: Dict[str, Any]) -> int | None: chat_id = None c = data.get('category', None) d = data.get('department', None) try: if getattr(c, 'configuration', False): chat_id = c.configuration.get('telegram', {}).get('chat_id', None) if not chat_id and getattr(d, 'configuration', False): chat_id = d.configuration.get('telegram', {}).get('chat_id', None) except Exception: chat_id = None if chat_id: return int(chat_id) return None def create_description(form: Dict[str, Any], for_telegram: bool = False) -> str: text = '' ip = form.get('ip', None) employee = form.get('employee', None) wp_info = form.get('wp_info', None) description = form.get('description', None) phone = form.get('phone', None) winkhm_path = form.get('winkhm_path', None) mao_boards = form.get('mao_boards', None) layout_url = form.get('layout_url', None) if ip and not for_telegram: text = f'{text}{ip} ' if wp_info: text = f'{text}{wp_info.strip()}' # client send string with line-break if employee and not for_telegram: text = f'{text}\n{employee}' if phone and not for_telegram: text = f'{text}\n{phone}' if description: text = f'{text}\n\n{description}' if winkhm_path or mao_boards or layout_url: text = f'{text}\n' if winkhm_path and not for_telegram: text = f'{text}\n{winkhm_path}' if mao_boards and not for_telegram: text = f'{text}\n{mao_boards}' if layout_url and not for_telegram: text = f'{text}\nLayout: {layout_url}' if for_telegram: text = re.sub('^$\n', '', text, flags=re.MULTILINE) return text def create_sysaid_title(workplace: Workplace | Any, title: str) -> str: if workplace: return f'[{workplace.workplace_system or workplace.workplace_type.name}] {workplace.hostname}: {title}' return title def create_redmine_title(workplace: Workplace | Any, title: str) -> str: if workplace: return f'[{workplace.workplace_system or workplace.workplace_type.name}] {workplace.workplace_name or workplace.hostname}: {title}' return title class ProblemSerializer(serializers.ModelSerializer[Problem]): class Meta: model = Problem fields = ('id', 'name', 'label', 'require_scan', 'add_mao_boards', 'add_winkhmman_path', 'attach_screenshot') class CategorySerializer(serializers.ModelSerializer[Category]): problems = SerializerMethodField() def get_problems(self, obj: Category) -> ReturnDict: serializer = ProblemSerializer(obj.problems, many=True) return serializer.data class Meta: model = Category fields = ('id', 'name', 'label', 'problems') class CategoriesSetSerializer(serializers.ModelSerializer[CategoriesSet]): categories = SerializerMethodField() def get_categories(self, obj: CategoriesSet) -> ReturnDict: serializer = CategorySerializer(obj.categories, many=True) return serializer.data class Meta: model = CategoriesSet fields = ('id', 'name', 'categories') class RecieverSerializer(serializers.ModelSerializer[Reciever]): class Meta: model = Reciever fields = ('id', 'name', 'configuration') class DepartmentHistorySerializer(serializers.ModelSerializer[Department]): class Meta: model = Department fields = ('id', 'name', "configuration") class DepartmentFullInfoSerializer(serializers.ModelSerializer[Department]): categories_set = SerializerMethodField() reciever = SerializerMethodField() location_name = SerializerMethodField() def get_categories_set(self, obj: Department) -> ReturnDict: serializer = CategoriesSetSerializer(obj.categories_set, many=False) return serializer.data def get_reciever(self, obj: Department) -> ReturnDict: queryset = obj.reciever serializer = RecieverSerializer(queryset, many=False) return serializer.data def get_location_name(self, obj: Department) -> Any: return obj.location.name class Meta: model = Department fields = ( 'id', 'name', 'location_name', 'reciever', 'contacts', 'categories_set', ) class StatusSerializer(serializers.ModelSerializer[Status]): class Meta: model = Status fields = ('id', 'name', 'label', 'description', 'color', 'close_timeout') class IssueSerializer(serializers.Serializer[Issue]): department_id = serializers.IntegerField() category_id = serializers.IntegerField(allow_null=True, required=False) impact = serializers.IntegerField(allow_null=True, required=False) urgency = serializers.IntegerField(allow_null=True, required=False) priority = serializers.IntegerField(allow_null=True, required=False) user_id = serializers.IntegerField(required=True) description = serializers.CharField(required=False, allow_blank=True, allow_null=True) title = serializers.CharField(required=True) phone = serializers.CharField(required=False, allow_blank=True, allow_null=True) hostname = serializers.CharField(required=True) issue_id = serializers.IntegerField(required=False, allow_null=True) id = serializers.IntegerField(required=False, allow_null=True) comments = serializers.CharField(required=False, allow_null=True) responsibility = serializers.CharField(required=False, allow_null=True) status = serializers.SerializerMethodField() create_datetime = serializers.DateTimeField(required=False, allow_null=True) update_datetime = serializers.DateTimeField(required=False, allow_null=True) close_datetime = serializers.DateTimeField(required=False, allow_null=True) form = serializers.CharField(required=False, allow_blank=True, allow_null=True) def get_status(self, obj: Issue) -> ReturnDict: queryset = obj.status serializer = StatusSerializer(queryset, many=False) return serializer.data def validate(self, data: OrderedDict[str, Any]) -> Any | None: department = Department.objects.filter( pk=data['department_id']).first() if (department): data['department'] = department category_id = data.get('category_id', None) if category_id: data['category'] = Category.objects.get(pk=category_id) reciever = department.reciever issue_id = None reply_markup = None attachment = None image = None # get location layout hostname = data['hostname'] workplace = Workplace.objects.filter(hostname=hostname).first() layout_result = fetch_layout(hostname) # create description with backward compatibilty form = data.get('form', None) form_title = '' form_json = {} if form: form_json = json.loads(form) if layout_result: form_json['layout_url'] = layout_result[1] form_title = form_json.get('title', '') data['description'] = create_description(form_json) attachment = form_json.get('attachment', None) # image for telegram image = layout_result[0] if layout_result else None #? screenshots to telegram disabled due privacy reasons # # concat images due impossible send to telegram media_group with keyboard # if layout_result and attachment: # header, encoded = attachment.split(",", 1) # image = concat_images(image, base64.b64decode(encoded)) if settings.DEBUG: issue_id = random.randint(0, 9999999) #NOSONAR logging.info( f'Generate issue_id {issue_id}. Issue doesn`t send to reciever in debug mode' ) chat_id = get_chat_id(data) reply_markup = make_keyboard_for_new_informing_issue(7230) url = 'example.com/' result = async_to_sync(send_message)( # type: ignore chat_id=chat_id, text= f'<b>Redmine issue</b> <a href="http://{url}redmine/issues/{7230}">#{7230}</a> -> <i>пріорітет негайний</i>:\n{create_redmine_title(workplace, form_title)}\n{create_description(form_json, True)}', image=image, reply_markup=reply_markup, ) logging.info(f'Sending to telegram {chat_id} result: {result}') time.sleep(1) else: if (is_sysaid(reciever)): data['title'] = create_sysaid_title(workplace, form_title) result = create_sysaid_ticket( reciever.backend_configuration, data) elif (is_redmine(reciever)): # upload attachment if attachment: upload_result = upload_file( reciever.backend_configuration, attachment, 'screenshot', ) if upload_result[0]: upload = json.loads(upload_result[1])['upload'] data['uploads'] = [{'token': upload['token']}] data['title'] = create_redmine_title(workplace, form_title) result = create_redmine_issue(reciever, data) if result[0]: issue_id = result[1].get('issue_id', None) chat_id = get_chat_id(data) result_text = result[1].get('text', None) title = data.get('title', '[no title]') text = f'{result_text}\n{title}\n{create_description(form_json, True)}'[: 4096] if issue_id: reply_markup = make_keyboard_for_new_informing_issue( issue_id) if chat_id: async_to_sync(send_message)( #type: ignore chat_id, text, image=image, reply_markup=reply_markup) else: raise APIException(f'Cant create an issue: {result[1]}') if (not issue_id): raise ValidationError(f'Invalid issue_id: {issue_id}') data['issue_id'] = issue_id data['title'] = data['title'][:40] return data raise ValidationError(f'Invalid department: {department}') def create(self, validated_data: Dict[str, Any]) -> Any: return Issue.objects.create( department=validated_data['department'], issue_id=validated_data['issue_id'], title=validated_data['title'], description=validated_data['description'], phone=validated_data.get('phone'), hostname=validated_data['hostname'], user_id=validated_data['user_id'], ) class IssueUpdateSerializer(serializers.Serializer[Issue]): status_id = serializers.IntegerField() user_id = serializers.IntegerField() def update(self, instance: Issue, validated_data: Dict[str, Any]) -> Issue: reciever = instance.department.reciever done_ratio = 0 if (is_redmine(reciever)): project_identifier = instance.department.configuration.get( 'redmine').get('identifier') user = get_user(config=reciever.backend_configuration, user_id=validated_data['user_id'], project_identifier=project_identifier) if (user): status = Status.objects.filter( id=validated_data['status_id']).first() if (status): status_id = status.redmine_code if status_id == RedmineStatus.CLOSED: done_ratio = 100 data = { "issue": { "issue_id": instance.issue_id, "status_id": status_id, "assigned_to_id": user[0]['user']['id'], "done_ratio": done_ratio } } result = update_redmine_issue( reciever.backend_configuration, data) if result[0]: status = Status.objects.filter( id=validated_data['status_id']).first() instance.status = status instance.update_datetime = make_aware(datetime.now()) instance.save() return instance raise PermissionDenied raise ValidationError class IssueHistorySerializer(serializers.ModelSerializer[Any]): department = SerializerMethodField() reciever = SerializerMethodField() status = SerializerMethodField() def get_department(self, obj: Issue) -> ReturnDict: serializer = DepartmentHistorySerializer(obj.department, many=False) return serializer.data def get_status(self, obj: Issue) -> ReturnDict: serializer = StatusSerializer(obj.status, many=False) return serializer.data def get_reciever(self, obj: Issue) -> ReturnDict: serializer = RecieverSerializer(obj.department.reciever, many=False) return serializer.data class Meta: model = Issue fields = '__all__' class IssuePerWorkplaceSerializer(serializers.Serializer[Any]): datetime_from = serializers.DateTimeField(required=False) datetime_to = serializers.DateTimeField(required=False) hostnames = serializers.ListField( child=serializers.CharField(min_length=10, max_length=10)) def validate(self, data: OrderedDict[str, Any]) -> Any | None: key = json.dumps(data, sort_keys=True, default=str) cached = cache.get(key, None) if cached: return cached hostnames = data.get('hostnames') datetime_from = data.get('datetime_from', None) datetime_to = data.get('datetime_to', None) filter_query = Q(hostname__in=hostnames) validated_data: Dict[str, List[Dict[str, Any]]] = {} if datetime_to and not datetime_from or datetime_from and not datetime_to: raise ValidationError( "Both 'datetime_to' and 'datetime_to' must be set") else: filter_query = filter_query & Q(create_datetime__gte=datetime_from, create_datetime__lte=datetime_to) queryset = Issue.objects.filter(filter_query).values( 'hostname', 'department__name').annotate( amount=Count('hostname', distinct=False)) # regroup by hostname for hostname, group in groupby( queryset, lambda x: x['hostname']): # type: ignore for obj in group: validated_data.setdefault(hostname, []).append({ 'department': obj['department__name'], 'amount': obj['amount'] }) cache.set(key, validated_data, 30 * 60) return validated_data