"""Telegram handlers for changing an issue's status and attaching photos.

The handlers react to inline-keyboard callbacks (open / close an issue) and
to the attach-file conversation (ask for a photo, then upload it).
"""
import html
import re

from django.utils.translation import gettext as _
from telegram import (ForceReply, InlineKeyboardMarkup, InputMediaPhoto,
                      Message, MessageEntity, Update)
from telegram.ext import ContextTypes, ConversationHandler

from config.utils.pillow import image_data_to_data_url
from tgbot.handlers.helpdesk.keyboard import \
    make_keyboard_for_opened_informing_issue
from tgbot.handlers.utils.django import (IssueStatus, attach_file_redmine,
                                         get_tg_user_username, update_issue)
from tgbot.handlers.utils.info import extract_user_data_from_update

# First line must start with a word character and contain "#<digits>".
_ISSUE_ID_RE = re.compile(r'^\w+.*#(\d+)', re.IGNORECASE)

# Entity types whose formatting is preserved when a message is re-sent.
_FORMATTED_ENTITY_TYPES = (
    MessageEntity.BOLD,
    MessageEntity.TEXT_LINK,
    MessageEntity.ITALIC,
)

# Status markers, written as escapes so the file stays encoding-proof.
_MARK_UPDATED = '\U0001F504'  # arrows in a circle
_MARK_IN_PROGRESS = '\U0001F468\u200D\U0001F527'  # man mechanic (ZWJ sequence)
_MARK_CLOSED = '\U0001F44C\U0001F3FB'  # OK hand, light skin tone
_MARK_WARNING = '\u26A0\uFE0F'  # warning sign, emoji presentation


def parse_issue_id(text: str) -> int | None:
    """Extract the issue number from text such as ``Issue #123 ...``.

    Args:
        text: Message text to scan; ``None`` or empty text is accepted.

    Returns:
        The digits following ``#`` as an int, or ``None`` when the text does
        not match the expected pattern.
    """
    if not text:
        return None
    match = _ISSUE_ID_RE.search(text)
    return int(match[1]) if match else None


def get_issue_id(update: Update, message_text: str = '') -> int | None:
    """Resolve the issue id from callback data, falling back to message text.

    The callback data is expected to look like ``"<action> <issue_id>"``; the
    second whitespace-separated token is parsed as the id.

    Args:
        update: Incoming update; it may have no callback query at all.
        message_text: Text used as a fallback source for the id.

    Returns:
        The issue id, or ``None`` when neither source yields one.
    """
    try:
        return int(update.callback_query.data.split()[1])
    except (AttributeError, IndexError, TypeError, ValueError):
        # No callback query / no data / no second token / not a number.
        return parse_issue_id(message_text)


def _entity_html_tags(entity: MessageEntity) -> tuple[str, str]:
    """Return the (opening, closing) HTML tags for a supported entity."""
    if entity.type == MessageEntity.TEXT_LINK:
        url = html.escape(entity.url or '', quote=True)
        return f'<a href="{url}">', '</a>'
    if entity.type == MessageEntity.BOLD:
        return '<b>', '</b>'
    if entity.type == MessageEntity.ITALIC:
        return '<i>', '</i>'
    return '', ''


def reformat_message_text(message: Message) -> str:
    """Rebuild a message's text (or caption) as Telegram-flavoured HTML.

    Bold, italic and text-link entities are turned back into ``<b>``, ``<i>``
    and ``<a href=...>`` tags so the formatting survives when the text is
    sent again with ``parse_mode='HTML'``. Literal ``<``, ``>`` and ``&`` in
    the text are escaped.

    Entity offsets and lengths are counted in UTF-16 code units, so slicing
    is done on the UTF-16 encoding of the text rather than on the ``str``.

    An entity that starts inside an already emitted entity (nested or
    overlapping formatting) is skipped: only the outermost one is kept.

    Args:
        message: The message whose caption (preferred) or text is converted.

    Returns:
        The HTML string, or ``''`` when the message has no caption and no
        text.
    """
    if getattr(message, 'caption', None):
        text = message.caption
        entities = message.caption_entities
    else:
        text = getattr(message, 'text', None)
        entities = getattr(message, 'entities', None)

    if not text:
        return ''

    raw = text.encode('utf-16-le')
    total_units = len(raw) // 2

    def escaped(start: int, end: int) -> str:
        # start/end are UTF-16 code-unit positions; two bytes per unit.
        return html.escape(
            raw[start * 2:end * 2].decode('utf-16-le'), quote=False)

    # Outer (longer) entities first when several start at the same offset.
    supported = sorted(
        (ent for ent in entities or ()
         if ent.type in _FORMATTED_ENTITY_TYPES),
        key=lambda ent: (ent.offset, -ent.length),
    )

    pieces = []
    cursor = 0
    for ent in supported:
        if ent.offset < cursor:
            continue  # nested in / overlapping the previous entity
        end = ent.offset + ent.length
        opening, closing = _entity_html_tags(ent)
        pieces.append(escaped(cursor, ent.offset))
        pieces.append(opening + escaped(ent.offset, end) + closing)
        cursor = end
    pieces.append(escaped(cursor, total_units))
    return ''.join(pieces)


async def _send_protected(context: ContextTypes.DEFAULT_TYPE,
                          chat_id: int, text: str) -> None:
    """Send a plain notification with content protection enabled."""
    await context.bot.send_message(
        text=text,
        chat_id=chat_id,
        protect_content=True,
    )


async def update_issue_handler(
        update: Update,
        context: ContextTypes.DEFAULT_TYPE,
        status_code: int,
        reply_markup: InlineKeyboardMarkup | None = None) -> None:
    """Change an issue's status and reflect the result in the chat.

    On success the message that carried the pressed button is edited: the
    backend's result text is appended with a marker that depends on the new
    status. On failure the user receives a separate warning message instead.

    Args:
        update: Callback-query update produced by the inline keyboard.
        context: Handler context; its bot is used to send the warning.
        status_code: Target status, compared against ``IssueStatus.OPEN``
            and ``IssueStatus.CLOSED``.
        reply_markup: Keyboard to attach to the edited message. It is
            replaced by the "opened issue" keyboard when the new status is
            OPEN and the issue id is known.
    """
    message = update.callback_query.message
    # A message with a caption must be edited via edit_caption, not edit_text.
    has_caption = bool(getattr(message, 'caption', False))
    user_data = extract_user_data_from_update(update)
    message_text = reformat_message_text(message)
    issue_id = get_issue_id(update, message_text)

    is_updated, text = await update_issue(
        issue_id, user_data['user_id'], status_code)

    if not is_updated:
        await _send_protected(
            context, user_data['user_id'], f'{_MARK_WARNING} {text}')
        return

    if status_code == IssueStatus.OPEN:
        tg_text = f'{message_text}\n\n{_MARK_IN_PROGRESS} {text}'
        if issue_id:
            reply_markup = make_keyboard_for_opened_informing_issue(issue_id)
    elif status_code == IssueStatus.CLOSED:
        tg_text = f'{message_text}\n{_MARK_CLOSED} {text}'
    elif text in message_text:
        # Plain substring test: the result text is data, not a regex.
        tg_text = message_text
    else:
        tg_text = f'{message_text}\n{_MARK_UPDATED} {text}'

    if tg_text == message_text:
        return  # nothing new to show; skip the edit entirely

    # NOTE: replacing the photo with a stub image via edit_media on close was
    # tried and dropped as unreliable (the media id changes).
    if has_caption:
        await message.edit_caption(
            caption=tg_text,
            reply_markup=reply_markup,  # type: ignore
            parse_mode='HTML',
        )
    else:
        await message.edit_text(
            text=tg_text,
            reply_markup=reply_markup,  # type: ignore
            parse_mode='HTML',
        )


async def open_issue(update: Update,
                     context: ContextTypes.DEFAULT_TYPE) -> None:
    """Callback handler: move the issue to the OPEN status."""
    issue_id = get_issue_id(update)
    reply_markup = None
    if issue_id:
        reply_markup = make_keyboard_for_opened_informing_issue(issue_id)
    await update_issue_handler(update, context, IssueStatus.OPEN, reply_markup)


async def close_issue(update: Update,
                      context: ContextTypes.DEFAULT_TYPE) -> None:
    """Callback handler: move the issue to the CLOSED status."""
    await update_issue_handler(update, context, IssueStatus.CLOSED)


async def attach_file(update: Update,
                      context: ContextTypes.DEFAULT_TYPE) -> int:
    """Start the attach-photo conversation for an issue.

    The issue id is taken from the first command argument when arguments are
    given, otherwise from the update's callback data.

    Returns:
        ``0`` after prompting a known user for a photo (presumably the
        conversation state that awaits the upload - confirm against the
        ConversationHandler registration), otherwise
        ``ConversationHandler.END`` after explaining what went wrong.
    """
    user_data = extract_user_data_from_update(update)
    issue_id: int | None = None

    if context.args:
        try:
            issue_id = int(context.args[0])
        except ValueError:
            issue_id = None  # not a number; reported to the user below
    else:
        issue_id = get_issue_id(update)

    user = await get_tg_user_username(tg_user_id=user_data['user_id'])

    if user and issue_id:
        await context.bot.send_message(
            text=_('Attach a photo to the issue #%(issue_id)s') %
            {'issue_id': issue_id},
            chat_id=user_data['user_id'],
            protect_content=True,
            reply_markup=ForceReply(),
        )
        return 0

    if user:
        text = _(
            'Cant recognize issue_id. Press and hold command in menu, then input issue id after command, for example /attach 0000'
        )
    else:
        text = _('You are not logged in. Use /login')

    await _send_protected(context, user_data['user_id'], text)
    return ConversationHandler.END


async def upload_photo_redmine(update: Update,
                               context: ContextTypes.DEFAULT_TYPE) -> int:
    """Upload the photo or document from the user's reply to the issue.

    The issue id is parsed from the text of the message being replied to.
    The user's own message is deleted first to keep the chat clean, and the
    outcome, if any, is reported back in a separate message.

    Returns:
        Always ``ConversationHandler.END``.
    """
    message = update.message
    photo = getattr(message, 'photo', None)
    document = getattr(message, 'document', None)
    user_data = extract_user_data_from_update(update)
    caption = getattr(message, 'caption', '')
    # reply_to_message is None when the user did not reply to anything, and
    # its text is None for non-text messages; fall back to an empty string.
    reply_to_message = getattr(message, 'reply_to_message', None)
    reply_text = getattr(reply_to_message, 'text', None) or ''

    # del reply message to clear chat
    await context.bot.delete_message(message.chat.id, message.message_id)

    file_id = None
    if photo:
        file_id = photo[-1].file_id  # last size in the list
    elif document:
        file_id = document.file_id

    text = None
    if not file_id:
        text = _('There must be a photo! Try again from the beginning')
    else:
        issue_id = get_issue_id(update, reply_text)
        if not issue_id:
            # Checked before downloading so no file is fetched needlessly.
            text = _('Unable to parse issue_id, please try to reply again')
        else:
            file = await context.bot.get_file(file_id)
            file_data = await file.download_as_bytearray()
            file_data_url = image_data_to_data_url(file_data)
            if file_data_url:
                upload_result = await attach_file_redmine(
                    issue_id,
                    user_data['user_id'],
                    caption,
                    file_data_url,
                )
                text = upload_result[1]
            else:
                text = _('There must be a photo! Try again from the beginning')

    if text:
        await _send_protected(context, user_data['user_id'], text)
    return ConversationHandler.END