# production-taskbar/backend/tgbot/handlers/onboarding/handlers.py
from typing import Any

from django.utils.translation import gettext as _
from telegram import ForceReply, ReplyKeyboardRemove, Update
from telegram.constants import ParseMode
from telegram.ext import ContextTypes, ConversationHandler
from tgbot.handlers.onboarding.keyboards import (
    make_keyboard_for_login, make_keyboard_for_new_user,
    make_keyboard_for_start_command)
from tgbot.handlers.utils.django import (del_tg_user, get_or_create_tg_user,
                                         get_tg_user_username)
from tgbot.handlers.utils.django_info import get_users
from tgbot.handlers.utils.info import extract_user_data_from_update

# Conversation state that command_login returns after prompting for the
# password.
# NOTE(review): this binds the ``range(1)`` object itself, not the integer 0
# (``HASH, = range(1)`` would unpack it). A range object is hashable, so it
# can still serve as a state key as long as every user of the state refers to
# this same constant -- confirm before changing it.
HASH = range(1)


async def command_start(
        update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:    #NOSONAR, context must be
    """Greet the sender and show the keyboard matching their login state.

    When ``get_tg_user_username`` returns a truthy name for the sender, the
    reply is a personal greeting with the start-command keyboard. Otherwise
    the sender is asked to log in and gets the new-user keyboard.

    Args:
        update: Incoming Telegram update.
        context: Handler context; unused here but kept in the signature.
    """
    user_data = extract_user_data_from_update(update)
    username = await get_tg_user_username(user_data['user_id'])

    if username:
        text = _('Hello %(user)s.') % {'user': username}
        reply_markup = make_keyboard_for_start_command()
    else:
        text = _('Hello user. Login first')
        reply_markup = make_keyboard_for_new_user()
    # ``update.message`` is None when the update carries an edited message
    # (python-telegram-bot command handlers accept those by default);
    # ``effective_message`` resolves to the message in either case.
    await update.effective_message.reply_text(text=text,
                                              reply_markup=reply_markup)


async def show_login_guide(update: Update,
                           context: ContextTypes.DEFAULT_TYPE) -> None:
    """Edit the callback query's message so it shows the login instructions.

    The edit is sent only when the message's current text differs from the
    instructions.

    Args:
        update: Update carrying the callback query whose message is edited.
        context: Handler context providing the bot used for the edit.
    """
    chat_id = extract_user_data_from_update(update)['user_id']
    shown = update.callback_query.message
    guide = _(
        'Login to sebn-taskbar-manager, in Features click on Telegram password, which will open page with your telegram password.'
    )
    # NOTE(review): the callback query is not answered in this function --
    # confirm that happens elsewhere.
    if guide == shown.text:
        # The message already shows the guide; nothing to edit.
        return

    keyboard = make_keyboard_for_login()
    await context.bot.edit_message_text(
        chat_id=chat_id,
        message_id=shown.message_id,
        text=guide,
        parse_mode=ParseMode.HTML,
        reply_markup=keyboard,
    )


async def command_login(update: Update,
                        context: ContextTypes.DEFAULT_TYPE) -> Any:
    """Prompt the sender for their password and return the ``HASH`` state.

    The prompt is sent with a forced reply and with content protection on.

    Args:
        update: Incoming Telegram update identifying the sender.
        context: Handler context providing the bot used to send the prompt.

    Returns:
        The ``HASH`` conversation state.
    """
    recipient = extract_user_data_from_update(update)['user_id']
    prompt = _('Enter your password to login')
    await context.bot.send_message(
        chat_id=recipient,
        text=prompt,
        reply_markup=ForceReply(),
        protect_content=True,
    )
    return HASH


async def hash_handler(update: Update,
                       context: ContextTypes.DEFAULT_TYPE) -> Any:
    """Check the submitted login code and link the Telegram account.

    The incoming message is deleted from the chat first. Its text is then
    compared with a code derived from each user's ``password`` attribute
    (every second character of its last twelve). On the first match
    ``get_or_create_tg_user`` is called for that user. The sender is told
    whether the login succeeded.

    Args:
        update: Update carrying the message with the submitted code.
        context: Handler context providing the bot used to delete the message.

    Returns:
        ``ConversationHandler.END`` in every case.
    """
    tg_user = None
    user_data = extract_user_data_from_update(update)
    # ``update.message`` is None when the update carries an edited message;
    # ``effective_message`` resolves to the message in either case.
    message = update.effective_message
    # Remove the submitted code from the chat before checking it.
    await context.bot.delete_message(
        chat_id=user_data['user_id'],
        message_id=message.message_id,
    )
    # NOTE(review): the code is at most six characters taken from the stored
    # password value, and every user is scanned on each attempt -- worth a
    # security review.
    users = await get_users()
    for user in users:
        password = getattr(user, 'password', None)
        if not password:
            continue
        if message.text == password[-12::2]:
            tg_user = await get_or_create_tg_user(
                user=user,
                tg_user_id=user_data['user_id'],
                username=user_data.get('username',
                                       user_data.get('first_name', None)))
            break

    if tg_user:
        greeting = _('Hello %(user)s.') % {'user': tg_user}
        # U+2705 WHITE HEAVY CHECK MARK, written as an escape so the prefix
        # survives re-encoding of the source file.
        await message.reply_text(f"\u2705 {greeting}")
    else:
        msg = _('Access denied.')
        # U+26D4 NO ENTRY, escaped for the same reason.
        await message.reply_text(f"\u26d4 {msg}")

    return ConversationHandler.END


async def command_logout(update: Update,
                         context: ContextTypes.DEFAULT_TYPE) -> None:
    """Call ``del_tg_user`` for the sender and send a goodbye message.

    Args:
        update: Incoming Telegram update identifying the sender.
        context: Handler context providing the bot used to send the message.
    """
    user_id = extract_user_data_from_update(update)['user_id']
    msg = _('Bye.')
    await del_tg_user(user_id)
    await context.bot.send_message(
        # U+1F44B WAVING HAND SIGN, written as an escape so the prefix
        # survives re-encoding of the source file.
        text=f"\U0001F44B {msg}",
        chat_id=user_id,
        protect_content=True,
    )


async def cancel(
        update: Update, context: ContextTypes.DEFAULT_TYPE
) -> Any:    #NOSONAR, context must be
    """Confirm the cancellation, remove the reply keyboard and end the dialog.

    Args:
        update: Incoming Telegram update.
        context: Handler context; unused here but kept in the signature.

    Returns:
        ``ConversationHandler.END``.
    """
    # ``update.message`` is None when the update carries an edited message
    # (python-telegram-bot command handlers accept those by default);
    # ``effective_message`` resolves to the message in either case.
    await update.effective_message.reply_text(
        _('Cancel'), reply_markup=ReplyKeyboardRemove())
    return ConversationHandler.END


# async def text_handler(update: Update,
#                        context: ContextTypes.DEFAULT_TYPE) -> None:

#     print(f'update: {update}')
#     print(f'update.message: {update.message}')
#     print(f'update.callback_query: {update.callback_query}')

#     if update.effective_chat:
#         await context.bot.send_message(chat_id=update.effective_chat.id,
#                                        text=update.message.text)