import logging from os import environ from typing import Any, Dict, Union from warnings import filterwarnings from django.utils.translation import gettext_lazy as _ from telegram import (Bot, BotCommand, ForceReply, InlineKeyboardMarkup, ReplyKeyboardMarkup, ReplyKeyboardRemove) from telegram.ext import (Application, ApplicationBuilder, CallbackQueryHandler, CommandHandler, ConversationHandler, MessageHandler, filters) from telegram.warnings import PTBUserWarning from tgbot.handlers.helpdesk import handlers as helpdesk_handlers from tgbot.handlers.onboarding.handlers import (cancel, command_login, command_logout, command_start, hash_handler, show_login_guide, HASH) from tgbot.handlers.utils.util_handlers import (command_unknown, handle_stacktrace) filterwarnings(action="ignore", message=r".*CallbackQueryHandler", category=PTBUserWarning) TOKEN = environ.get('TELEGRAM_BOT_TOKEN', None) def get_bot() -> Bot | None: if TOKEN: bot = Bot(TOKEN) return bot return None async def set_up_commands(bot_instance: Bot) -> None: langs_with_commands: Dict[str, Dict[str, str]] = { 'en': { 'start': '🚀 Start', 'login': '🔑 Login', 'attach': '📷 Add photo to redmine issue', 'logout': '👋 Logout', 'cancel': '❌ Cancel operation', }, 'ru': { 'start': '🚀 Старт', 'login': '🔑 Войти', 'attach': '📷 Добавить фото к redmine issue', 'logout': '👋 Выйти', 'cancel': '❌ Отменить операцию', }, 'uk': { 'start': '🚀 Старт', 'login': '🔑 Залогуватись', 'attach': '📷 Додати фото до redmine issue', 'logout': '👋 Вилогуватись', 'cancel': '❌ Скасувати операцію', } } await bot_instance.delete_my_commands() for language_code in langs_with_commands: await bot_instance.set_my_commands( language_code=language_code, commands=[ BotCommand(command, description) for command, description in langs_with_commands[language_code].items() ]) async def setup_application( ) -> Application[Any, Any, Any, Any, Any, Any] | None: if TOKEN: app = ApplicationBuilder().token(TOKEN).build() # onboarding start_handler = CommandHandler('start', command_start) login_handler = CommandHandler('login', command_login) cancel_handler = CommandHandler('cancel', cancel) login_callback = CallbackQueryHandler(command_login, pattern='command_login') app.add_handler(start_handler) app.add_handler(cancel_handler) app.add_handler( CallbackQueryHandler(show_login_guide, pattern='show-login-guide')) # Login conversation login_conversation_handler = ConversationHandler( entry_points=[login_handler, login_callback], states={ HASH: [MessageHandler(filters.TEXT, hash_handler)], }, fallbacks=[cancel_handler], allow_reentry=True, per_user=True, per_message=False, ) app.add_handler(login_conversation_handler) # logout logout_handler = CommandHandler('logout', command_logout) logout_callback_handler = CallbackQueryHandler( command_logout, pattern='command_logout', ) app.add_handler(logout_callback_handler) app.add_handler(logout_handler) # helpdesk app.add_handler( CallbackQueryHandler( helpdesk_handlers.open_issue, pattern='open_issue', )) app.add_handler( CallbackQueryHandler( helpdesk_handlers.close_issue, pattern='close_issue', )) # Attach file conversation attach_file_callback = CallbackQueryHandler( helpdesk_handlers.attach_file, pattern='^attach_file \\d+') attach_file_handler = CommandHandler('attach', helpdesk_handlers.attach_file) file_conversation_handler = ConversationHandler( entry_points=[attach_file_callback, attach_file_handler], states={ 0: [ MessageHandler(filters.ALL, helpdesk_handlers.upload_photo_redmine) ], }, fallbacks=[cancel_handler], allow_reentry=True, per_user=True, per_message=False, per_chat=False, ) app.add_handler(file_conversation_handler) app.add_handler(attach_file_handler) # utility unknown_handler = MessageHandler(filters.COMMAND, command_unknown) app.add_handler(unknown_handler) app.add_error_handler(handle_stacktrace) await app.bot.initialize() await set_up_commands(app.bot) await app.bot.shutdown() return app return None async def send_message(chat_id: int, text: str, reply_markup: Union[InlineKeyboardMarkup, ReplyKeyboardMarkup, ReplyKeyboardRemove, ForceReply, None] = None, image: Any = None, api_kwargs: Dict[str, Any] = {}) -> Any: try: bot = get_bot() if bot: if image: return await bot.send_photo( chat_id=chat_id, photo=image, caption=text, parse_mode='HTML', reply_markup=reply_markup, protect_content=True, api_kwargs=api_kwargs, ) else: return await bot.send_message( chat_id=chat_id, text=text, parse_mode='HTML', reply_markup=reply_markup, protect_content=True, api_kwargs=api_kwargs, ) return {'ok': False, 'result': 'telegram api not configured'} except Exception as e: logging.error(f"Telegram send_message error: {e}")