import re
from django.utils.translation import gettext as _
from telegram import (ForceReply, InlineKeyboardMarkup, InputMediaPhoto,
Message, MessageEntity, Update)
from telegram.ext import ContextTypes, ConversationHandler
from config.utils.pillow import image_data_to_data_url
from tgbot.handlers.helpdesk.keyboard import \
make_keyboard_for_opened_informing_issue
from tgbot.handlers.utils.django import (IssueStatus, attach_file_redmine,
get_tg_user_username, update_issue)
from tgbot.handlers.utils.info import extract_user_data_from_update
def parse_issue_id(text: str) -> int | None:
issue_pattern = '^\w+.*#(\d+)'
result = re.search(issue_pattern, text, re.IGNORECASE)
if result:
return int(result[1])
return None
def get_issue_id(update: Update, message_text: str = '') -> int | None:
try:
return int(update.callback_query.data.split()[1])
except Exception:
return parse_issue_id(message_text)
# Save text formatting for entities (bold text and href)
def reformat_message_text(message: Message) -> str:
text = ''
msg_entities = None
parsed_entities = None
offset = 0
types = [
MessageEntity.BOLD,
MessageEntity.TEXT_LINK,
MessageEntity.ITALIC,
]
if getattr(message, 'caption', False):
text = message.caption
msg_entities = message.caption_entities
parsed_entities = message.parse_caption_entities(types)
else:
text = message.text
msg_entities = message.entities
parsed_entities = message.parse_entities(types)
if msg_entities and parsed_entities:
for ent, entity_text in parsed_entities.items():
insert_before = ''
insert_after = ''
if ent.type == MessageEntity.TEXT_LINK:
insert_before = f''
insert_after = ''
if ent.type == MessageEntity.BOLD:
insert_before = ''
insert_after = ''
if ent.type == MessageEntity.ITALIC:
insert_before = ''
insert_after = ''
text = text[:ent.offset + offset] + insert_before + \
entity_text + insert_after + \
text[len(entity_text) + ent.offset + offset:]
offset += len(insert_before) + len(insert_after)
return text
async def update_issue_handler(
update: Update,
context: ContextTypes.DEFAULT_TYPE,
status_code: int,
reply_markup: InlineKeyboardMarkup | None = None) -> None:
is_updated = False
message_text = None
is_photo_message = getattr(update.callback_query.message, 'caption', False)
user_data = extract_user_data_from_update(update)
message_text = reformat_message_text(update.callback_query.message)
issue_id = get_issue_id(update, message_text)
is_updated, text = await update_issue(issue_id, user_data['user_id'],
status_code)
if is_updated:
tg_text = message_text if re.search(
text, message_text) else f'{message_text}\nš {text}'
##########
if status_code == IssueStatus.OPEN:
tg_text = f'{message_text}\n\nšØāš§ {text}'
if issue_id:
reply_markup = make_keyboard_for_opened_informing_issue(
issue_id)
if status_code == IssueStatus.CLOSED:
tg_text = f'{message_text}\nšš» {text}'
########
if is_photo_message:
if (tg_text != message_text):
await update.callback_query.message.edit_caption(
caption=tg_text,
reply_markup=reply_markup, # type: ignore
parse_mode='HTML',
)
# if status_code == IssueStatus.CLOSED: # replace photo, !unreliable - media_id is changing
# await update.callback_query.message.edit_media(media=InputMediaPhoto(
# "AgACAgIAAxkBAAEJiGhivB39CfFmaQgM-NlLOhAvLJcxbAACYLwxG4Q-4Um2Qjq1z5GGOAEAAwIAA3MAAykE", # file_id of uploaded empty stub image, uploaded in chat with bot
# caption=tg_text,
# parse_mode='HTML',
# ))
else:
if (tg_text != message_text):
await update.callback_query.message.edit_text(
text=tg_text,
reply_markup=reply_markup, # type: ignore
parse_mode='HTML',
)
else:
await context.bot.send_message(
text=f'ā ļø {text}',
chat_id=user_data['user_id'],
protect_content=True,
)
async def open_issue(update: Update,
context: ContextTypes.DEFAULT_TYPE) -> None:
issue_id = get_issue_id(update)
reply_markup = None
if issue_id:
reply_markup = make_keyboard_for_opened_informing_issue(issue_id)
await update_issue_handler(update, context, IssueStatus.OPEN, reply_markup)
async def close_issue(update: Update,
context: ContextTypes.DEFAULT_TYPE) -> None:
await update_issue_handler(update, context, IssueStatus.CLOSED)
async def attach_file(update: Update,
context: ContextTypes.DEFAULT_TYPE) -> int:
user_data = extract_user_data_from_update(update)
issue_id: int | None = None
text = _(
'Cant recognize issue_id. Press and hold command in menu, then input issue id after command, for example /attach 0000'
)
if context.args:
try:
issue_id = int(context.args[0])
except Exception:
pass
else:
issue_id = get_issue_id(update)
user = await get_tg_user_username(tg_user_id=user_data['user_id'])
if user:
if issue_id:
await context.bot.send_message(
text=_('Attach a photo to the issue #%(issue_id)s') %
{'issue_id': issue_id},
chat_id=user_data['user_id'],
protect_content=True,
reply_markup=ForceReply(),
)
return 0
else:
text = _('You are not logged in. Use /login')
await context.bot.send_message(
text=text,
chat_id=user_data['user_id'],
protect_content=True,
)
return ConversationHandler.END
async def upload_photo_redmine(update: Update,
context: ContextTypes.DEFAULT_TYPE) -> int:
photo = getattr(update.message, 'photo', None)
document = getattr(update.message, 'document', None)
user_data = extract_user_data_from_update(update)
caption = getattr(update.message, 'caption', '')
text = None
file_id = None
# del reply message to clear chat
await context.bot.delete_message(update.message.chat.id,
update.message.message_id)
if photo:
file_id = photo[-1]['file_id']
elif document:
file_id = document['file_id']
if file_id:
reply_to_message = getattr(update.message, 'reply_to_message',
{'text': ''})
issue_id = get_issue_id(update, reply_to_message['text'])
file = await context.bot.get_file(file_id)
file_data = await file.download_as_bytearray()
file_data_url = image_data_to_data_url(file_data)
if issue_id:
if file_data_url:
upload_result = await attach_file_redmine(
issue_id,
user_data['user_id'],
caption,
file_data_url,
)
text = upload_result[1]
else:
text = _('There must be a photo! Try again from the beginning')
else:
text = _('Unable to parse issue_id, please try to reply again')
else:
text = _('There must be a photo! Try again from the beginning')
if text:
await context.bot.send_message(
text=text,
chat_id=user_data['user_id'],
protect_content=True,
)
return ConversationHandler.END