"""Onboarding handlers of the Telegram bot: start, login, logout, cancel."""
import hmac
from typing import Any

from django.utils.translation import gettext as _
from telegram import ForceReply, ReplyKeyboardRemove, Update
from telegram.constants import ParseMode
from telegram.ext import ContextTypes, ConversationHandler

from tgbot.handlers.onboarding.keyboards import (
    make_keyboard_for_login,
    make_keyboard_for_new_user,
    make_keyboard_for_start_command,
)
from tgbot.handlers.utils.django import (
    del_tg_user,
    get_or_create_tg_user,
    get_tg_user_username,
)
from tgbot.handlers.utils.django_info import get_users
from tgbot.handlers.utils.info import extract_user_data_from_update

# Conversation state returned by ``command_login``.
# NOTE(review): this binds the ``range(1)`` object itself (not the int 0);
# it is left untouched because other modules presumably use ``HASH`` as the
# state key of the ConversationHandler -- confirm before changing it.
HASH = range(1)


def _matches_login_code(candidate: str, password: Any) -> bool:
    """Return True when *candidate* equals the login code of *password*.

    The login code is every second character of the last twelve characters
    of the stored password string (``password[-12::2]``). A missing or empty
    password never matches.
    """
    if not password:
        return False
    expected = password[-12::2]
    # Constant-time comparison: the candidate is untrusted user input and the
    # expected value is a secret. Both sides are encoded because
    # ``hmac.compare_digest`` rejects non-ASCII ``str`` arguments.
    return hmac.compare_digest(
        candidate.encode('utf-8'), expected.encode('utf-8'))


async def command_start(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:  # NOSONAR: context is part of the handler signature
    """Greet the user and show the keyboard matching the login state.

    When ``get_tg_user_username`` returns a username the user is greeted by
    name with the start-command keyboard; otherwise the user is asked to log
    in and gets the new-user keyboard.
    """
    user_data = extract_user_data_from_update(update)
    username = await get_tg_user_username(user_data['user_id'])

    if username:
        text = _('Hello %(user)s.') % {'user': username}
        reply_markup = make_keyboard_for_start_command()
    else:
        text = _('Hello user. Login first')
        reply_markup = make_keyboard_for_new_user()

    # ``effective_message`` also covers edited messages, for which
    # ``update.message`` is None.
    await update.effective_message.reply_text(
        text=text, reply_markup=reply_markup)


async def show_login_guide(update: Update,
                           context: ContextTypes.DEFAULT_TYPE) -> None:
    """Replace the callback query's message with the login instructions.

    NOTE(review): the callback query itself is not answered here; confirm it
    is answered elsewhere.
    """
    user_id = extract_user_data_from_update(update)['user_id']
    message = update.callback_query.message
    text = _(
        'Login to sebn-taskbar-manager, in Features click on Telegram password, which will open page with your telegram password.'  # noqa: E501
    )

    # Skip the edit when the message already shows this text (presumably to
    # avoid Telegram rejecting an edit that changes nothing).
    if text != message.text:
        await context.bot.edit_message_text(
            text=text,
            chat_id=user_id,
            message_id=message.message_id,
            reply_markup=make_keyboard_for_login(),
            parse_mode=ParseMode.HTML,
        )


async def command_login(update: Update,
                        context: ContextTypes.DEFAULT_TYPE) -> Any:
    """Ask the user for the login password and enter the ``HASH`` state."""
    user_id = extract_user_data_from_update(update)['user_id']
    await context.bot.send_message(
        text=_('Enter your password to login'),
        chat_id=user_id,
        protect_content=True,
        reply_markup=ForceReply(),
    )
    return HASH


async def hash_handler(update: Update,
                       context: ContextTypes.DEFAULT_TYPE) -> Any:
    """Check the entered login code and link the Telegram user on success.

    The user's message is deleted first, then its text is matched against the
    login code of every user returned by ``get_users``. The first match is
    passed to ``get_or_create_tg_user``. A greeting or an access-denied
    message is sent and the conversation ends in both cases.
    """
    user_data = extract_user_data_from_update(update)
    # ``effective_message`` also covers edited messages, for which
    # ``update.message`` is None.
    message = update.effective_message
    # Non-text messages carry no text; treat them as an empty (never
    # matching) code instead of comparing against None.
    candidate = message.text or ''

    # Remove the message that contains the secret from the chat.
    await context.bot.delete_message(
        chat_id=user_data['user_id'],
        message_id=message.message_id,
    )

    tg_user = None
    for user in await get_users():
        if _matches_login_code(candidate, getattr(user, 'password', None)):
            tg_user = await get_or_create_tg_user(
                user=user,
                tg_user_id=user_data['user_id'],
                username=user_data.get(
                    'username', user_data.get('first_name', None)),
            )
            break

    if tg_user:
        greeting = _('Hello %(user)s.') % {'user': tg_user}
        await message.reply_text(f"✅ {greeting}")
    else:
        msg = _('Access denied.')
        await message.reply_text(f"⛔ {msg}")
    return ConversationHandler.END


async def command_logout(update: Update,
                         context: ContextTypes.DEFAULT_TYPE) -> None:
    """Delete the Telegram user record and say goodbye."""
    user_id = extract_user_data_from_update(update)['user_id']
    msg = _('Bye.')
    await del_tg_user(user_id)
    await context.bot.send_message(
        text=f"👋 {msg}",
        chat_id=user_id,
        protect_content=True,
    )


async def cancel(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> Any:  # NOSONAR: context is part of the handler signature
    """Confirm the cancellation, remove the reply keyboard, end the conversation."""
    await update.effective_message.reply_text(
        _('Cancel'), reply_markup=ReplyKeyboardRemove())
    return ConversationHandler.END


# async def text_handler(update: Update,
#                        context: ContextTypes.DEFAULT_TYPE) -> None:
#     print(f'update: {update}')
#     print(f'update.message: {update.message}')
#     print(f'update.callback_query: {update.callback_query}')
#     if update.effective_chat:
#         await context.bot.send_message(chat_id=update.effective_chat.id,
#                                        text=update.message.text)