# production-taskbar/backend/tgbot/handlers/helpdesk/handlers.py
import html
import re

from django.utils.translation import gettext as _
from telegram import (ForceReply, InlineKeyboardMarkup, InputMediaPhoto,
                      Message, MessageEntity, Update)
from telegram.ext import ContextTypes, ConversationHandler

from config.utils.pillow import image_data_to_data_url
from tgbot.handlers.helpdesk.keyboard import \
    make_keyboard_for_opened_informing_issue
from tgbot.handlers.utils.django import (IssueStatus, attach_file_redmine,
                                         get_tg_user_username, update_issue)
from tgbot.handlers.utils.info import extract_user_data_from_update


def parse_issue_id(text: str) -> int | None:
    issue_pattern = '^\w+.*#(\d+)'
    result = re.search(issue_pattern, text, re.IGNORECASE)
    if result:
        return int(result[1])

    return None


def get_issue_id(update: Update, message_text: str = '') -> int | None:
    try:
        return int(update.callback_query.data.split()[1])
    except Exception:
        return parse_issue_id(message_text)


# Save text formatting for entities (bold text and href)
def reformat_message_text(message: Message) -> str:
    text = ''
    msg_entities = None
    parsed_entities = None
    offset = 0
    types = [
        MessageEntity.BOLD,
        MessageEntity.TEXT_LINK,
        MessageEntity.ITALIC,
    ]

    if getattr(message, 'caption', False):
        text = message.caption
        msg_entities = message.caption_entities
        parsed_entities = message.parse_caption_entities(types)
    else:
        text = message.text
        msg_entities = message.entities
        parsed_entities = message.parse_entities(types)

    if msg_entities and parsed_entities:
        for ent, entity_text in parsed_entities.items():
            insert_before = ''
            insert_after = ''

            if ent.type == MessageEntity.TEXT_LINK:
                insert_before = f'<a href="{ent.url}">'
                insert_after = '</a>'

            if ent.type == MessageEntity.BOLD:
                insert_before = '<b>'
                insert_after = '</b>'

            if ent.type == MessageEntity.ITALIC:
                insert_before = '<i>'
                insert_after = '</i>'

            text = text[:ent.offset + offset] + insert_before + \
                       entity_text + insert_after + \
                       text[len(entity_text) + ent.offset + offset:]
            offset += len(insert_before) + len(insert_after)

    return text


async def update_issue_handler(
        update: Update,
        context: ContextTypes.DEFAULT_TYPE,
        status_code: int,
        reply_markup: InlineKeyboardMarkup | None = None) -> None:

    is_updated = False
    message_text = None
    is_photo_message = getattr(update.callback_query.message, 'caption', False)
    user_data = extract_user_data_from_update(update)
    message_text = reformat_message_text(update.callback_query.message)
    issue_id = get_issue_id(update, message_text)
    is_updated, text = await update_issue(issue_id, user_data['user_id'],
                                          status_code)

    if is_updated:
        tg_text = message_text if re.search(
            text, message_text) else f'{message_text}\n๐Ÿ”„ {text}'
        ##########
        if status_code == IssueStatus.OPEN:
            tg_text = f'{message_text}\n\n๐Ÿ‘จโ€๐Ÿ”ง <ins>{text}</ins>'
            if issue_id:
                reply_markup = make_keyboard_for_opened_informing_issue(
                    issue_id)
        if status_code == IssueStatus.CLOSED:
            tg_text = f'{message_text}\n๐Ÿ‘Œ๐Ÿป <ins>{text}</ins>'

        ########
        if is_photo_message:
            if (tg_text != message_text):
                await update.callback_query.message.edit_caption(
                    caption=tg_text,
                    reply_markup=reply_markup,    # type: ignore
                    parse_mode='HTML',
                )
            # if status_code == IssueStatus.CLOSED:    # replace photo, !unreliable - media_id is changing
            #     await update.callback_query.message.edit_media(media=InputMediaPhoto(
            #         "AgACAgIAAxkBAAEJiGhivB39CfFmaQgM-NlLOhAvLJcxbAACYLwxG4Q-4Um2Qjq1z5GGOAEAAwIAA3MAAykE",    # file_id of uploaded empty stub image, uploaded in chat with bot
            #         caption=tg_text,
            #         parse_mode='HTML',
            #     ))

        else:
            if (tg_text != message_text):
                await update.callback_query.message.edit_text(
                    text=tg_text,
                    reply_markup=reply_markup,    # type: ignore
                    parse_mode='HTML',
                )
    else:
        await context.bot.send_message(
            text=f'โš ๏ธ {text}',
            chat_id=user_data['user_id'],
            protect_content=True,
        )


async def open_issue(update: Update,
                     context: ContextTypes.DEFAULT_TYPE) -> None:
    issue_id = get_issue_id(update)
    reply_markup = None
    if issue_id:
        reply_markup = make_keyboard_for_opened_informing_issue(issue_id)
    await update_issue_handler(update, context, IssueStatus.OPEN, reply_markup)


async def close_issue(update: Update,
                      context: ContextTypes.DEFAULT_TYPE) -> None:
    await update_issue_handler(update, context, IssueStatus.CLOSED)


async def attach_file(update: Update,
                      context: ContextTypes.DEFAULT_TYPE) -> int:
    user_data = extract_user_data_from_update(update)
    issue_id: int | None = None
    text = _(
        'Cant recognize issue_id. Press and hold command in menu, then input issue id after command, for example /attach 0000'
    )

    if context.args:
        try:
            issue_id = int(context.args[0])
        except Exception:
            pass
    else:
        issue_id = get_issue_id(update)

    user = await get_tg_user_username(tg_user_id=user_data['user_id'])
    if user:
        if issue_id:
            await context.bot.send_message(
                text=_('Attach a photo to the issue #%(issue_id)s') %
                {'issue_id': issue_id},
                chat_id=user_data['user_id'],
                protect_content=True,
                reply_markup=ForceReply(),
            )
            return 0
    else:
        text = _('You are not logged in. Use /login')
    await context.bot.send_message(
        text=text,
        chat_id=user_data['user_id'],
        protect_content=True,
    )
    return ConversationHandler.END


async def upload_photo_redmine(update: Update,
                               context: ContextTypes.DEFAULT_TYPE) -> int:
    photo = getattr(update.message, 'photo', None)
    document = getattr(update.message, 'document', None)
    user_data = extract_user_data_from_update(update)
    caption = getattr(update.message, 'caption', '')
    text = None
    file_id = None

    # del reply message to clear chat
    await context.bot.delete_message(update.message.chat.id,
                                     update.message.message_id)
    if photo:
        file_id = photo[-1]['file_id']
    elif document:
        file_id = document['file_id']

    if file_id:
        reply_to_message = getattr(update.message, 'reply_to_message',
                                   {'text': ''})
        issue_id = get_issue_id(update, reply_to_message['text'])
        file = await context.bot.get_file(file_id)
        file_data = await file.download_as_bytearray()
        file_data_url = image_data_to_data_url(file_data)

        if issue_id:
            if file_data_url:
                upload_result = await attach_file_redmine(
                    issue_id,
                    user_data['user_id'],
                    caption,
                    file_data_url,
                )
                text = upload_result[1]
            else:
                text = _('There must be a photo! Try again from the beginning')
        else:
            text = _('Unable to parse issue_id, please try to reply again')
    else:
        text = _('There must be a photo! Try again from the beginning')
    if text:
        await context.bot.send_message(
            text=text,
            chat_id=user_data['user_id'],
            protect_content=True,
        )
    return ConversationHandler.END