# finance-watcher / database / finance_database.py
import psycopg2
from database.database_classes import (
    AccountType, Account, Business, BusinessType, Transaction, User, Budget
)
from finance_watcher_dataclasses.enums import AccountTypeEnum
from typing import Tuple, List, Optional, Dict
from database.sql_queries import *
from datetime import datetime
from helpers.config import Config

class DatabaseConfig:
    """
    Connection settings for the finance database.

    Every attribute is copied from a freshly constructed ``Config``
    object at instantiation time.
    """

    def __init__(self):
        settings = Config()
        # Address of the database server.
        self.host, self.port = settings.host, settings.port
        # Name of the database to connect to.
        self.database = settings.database
        # Credentials used when connecting.
        self.user, self.password = settings.user, settings.password


class FinanceDatabase:
    def __init__(self, config: Optional[DatabaseConfig] = None):
        if config is None:
            self.__config = DatabaseConfig()
        else:
            self.__config = config

    def _run_query(self, query: str) -> List[Tuple]:
        connection = psycopg2.connect(
            host=self.__config.host,
            port=self.__config.port,
            database=self.__config.database,
            user=self.__config.user,
            password=self.__config.password,
        )
        cursor = connection.cursor()
        
        cursor.execute(
            query
        )
        connection.commit()
        query_results= [
            tuple(query_row) for query_row in cursor
        ]
        cursor.close()
        return query_results
    
    
    def ensure_account_type(
        self, account_type: AccountTypeEnum
    ) -> Optional[AccountType]:
        """
        Inserts an Account Type, only if, it is not there already.

        :param account_type: account type
        """
        account_type_name = account_type.name
        if account_type_name is None:
            return None
        query = get_ensure_account_type_query(
            name=account_type_name,
            is_income=AccountTypeEnum.is_income(account_type),
            is_expense=AccountTypeEnum.is_expense(account_type),
        )
        results = self._run_query(
            query=query
        )

        if len(results) != 1:
            return None

        return AccountType(
            id=results[0][0],
            name=results[0][1],
            is_income=results[0][2],
            is_expense=results[0][3]
        )

    def insert_default_business_types(self):
        """
        Inserts all business types.
        """
        query = get_insert_default_business_types()
        self._run_query(query=query)

    def ensure_business(
        self,
        name: str,
        business_type_id: int
    ) -> Optional[Business]:
        """
        Inserts a Business, only if, it is not there already.

        :param name: name of the business
        :param business_type_id: id of the business type
        """
        query = get_ensure_business_query(
            name, business_type_id
        )
        results = self._run_query(query)
        if len(results) == 0:
            return None
        result_business = results[0]
        return Business(
            id=result_business[0],
            name=result_business[1],
            business_type_id=result_business[2]
        )
    
    def get_business_type(
        self, business_type: BusinessTypeEnum
    ) -> Optional[BusinessType]:
        """
        Gets the Business Type based on the enumerator equivalent.
        Returns None if failed to get the business type.

        :param business_type: business type enumerator
        """
        results = self._run_query(get_business_type_query(business_type.value))
        if len(results) == 0:
            return None
        return BusinessType(
            id=results[0][0],
            name=results[0][1]
        )
    
    def get_dashboard_user(self, user_id: int) -> Optional[User]:
        """
        Gets the Dashboard user by it's id.
        Returns None if failed to get the user.

        :param user_id: id of the user
        """
        results = self._run_query(
            get_dashboard_user_query(user_id)
        )
        if len(results) == 0:
            return None
        
        result_found = results[0]

        user = User(
            id=result_found[0],
            name=result_found[1],
            created_date=result_found[2],
        )
        if len(result_found) == 4:
            user.updated_date = result_found[3]

        return user
    
    def update_user_password(self, user_id: int, password: str) -> bool:
        """
        Updates the password of a User.

        :param user_id: id of the user
        :param password: new password
        """
        query = get_update_user_password_query(user_id, password)
        
        results = self._run_query(query)

        return len(results) == 1

    def get_transactions(
        self,
        user_id: int,
        page: int = 1,
        business_type_id: Optional[int] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None
    ) -> Optional[List[Transaction]]:
        """
        Gets a list of transactions from a User.
        Returns None if failed to get a list of transactions.

        :param user_id: id of the user
        :param page: page number of transactions (to not return ALL transactions at once if too many)
        :param business_type_id: filter transactions based on the type of transaction.
        """
        if (start_date and end_date) or (not start_date and not end_date):
            if (start_date and end_date) and start_date > end_date:
                print("Error: The start date is later than the end date.")
                return None
            
            try:
                results = self._run_query(
                    get_transactions_query(
                        user_id, page, business_type_id, start_date, end_date
                    )
                )
                
                return [
                    Transaction(
                        id=result[0],
                        dashboard_user_id=result[1],
                        amount=result[2],
                        business_id=result[3],
                        is_expense=result[4],
                        created_date=result[5],
                    )
                    for result in results
                ]
            except:
                print("Error: A date is included. Either both or none.")
                return None
    
    def get_account_type(
        self, 
        account_type: AccountTypeEnum
    ) -> Optional[AccountType]:
        """
        Gets the Account Type based on the enumerator equivalent.
        Returns None if failed to get the account type.

        :param account_type: account type enumerator
        """
        results = self._run_query(
            get_account_type_query(account_type.name)
        )
        if len(results) == 0:
            return None
        return AccountType(
            id=results[0][0],
            name=results[0][1],
            is_income=results[0][2],
            is_expense=results[0][3]
        )
    
    def ensure_account(
        self, name: str, account_type_id: int, dashboard_user_id: int
    ) -> Optional[Account]:
        """
        Inserts a Account, only if, it is not there already.

        :param dashboard_user_id: id of the user
        :param name: name of the account
        :param account_type_id: id of the account type
        """
        query = get_ensure_account_query(
            name=name,
            account_type_id=account_type_id,
            dashboard_user_id=dashboard_user_id
        )
        results = self._run_query(
            query=query
        )
        if len(results) == 0:
            return None
        result_account = results[0]
        updated_date = None
        if updated_date:
            updated_date=result_account[4]
        return Account(
            id=result_account[0],
            dashboard_user_id=result_account[5],
            name=result_account[1],
            account_type_id=result_account[2],
            created_date=result_account[3],
            updated_date=updated_date
        )
    
    def get_all_accounts_and_latest_totals(
        self, user_id: int
    ) -> Dict[int, Tuple[str, float]]:
        """
        Gets Accounts and Their latest totals of the user.
        Returns a dictionary with this format: 
        {
            <account_id: int>: (
                <account_name: str>, <account_total: int>
            )
        }

        :param user_id: id of the user
        """
        query = get_accounts_and_latest_total_queries(user_id)
        results = self._run_query(query)
        if len(results) == 0:
            return {}

        accounts_and_totals = {}
        for result in results:
            account_name, account_id, account_total = result
            accounts_and_totals[account_id] = (account_name, account_total)
        return accounts_and_totals

    def insert_dashboard_user(
        self, name: str, email: str, password: str
    ) -> Optional[User]:
        """
        Creates a user row into the database.

        :param name: name of the user
        :param email: email of the user
        :param password: password of the user (should be already encrypted)
        """
        results = self._run_query(
            get_insert_dashboard_user(name, email, password)
        )
        if len(results) == 0:
            return None
        return User(
            id=results[0][0],
            name=results[0][1],
            created_date=results[0][2],
            updated_date=None
        )
        
    def insert_account_totals(
        self, account_rows: List[AccountRow]
    ) -> bool:
        """
        Inserts new totals of accounts.

        :param account_rows: rows of account and their respective totals
        """
        if len(account_rows) == 0:
            return True
        query = get_insert_account_totals_query(
            account_rows=account_rows
        )
        return len(self._run_query(query)) != 0
    
    def get_latest_account_percentages(self, user_id: int) -> Dict[str, float]:
        """
        Gets the latest account percentages of a user.

        Returns a dictionary with this format:
        {
            <account_name: str>: <percentage: float>
        }

        :param user_id: id of the user
        """
        accounts_totals = self.get_all_accounts_and_latest_totals(user_id)
        account_percentages: Dict[str, float] = {}
        overall_total = 0.0
        for _, name_total in accounts_totals.items():
            name, total = name_total
            if total <0:
                continue
            account_percentages[name] = total
            overall_total += total

        for account, total in account_percentages.items():
            account_percentages[account] = total/overall_total

        return account_percentages
    
    def insert_account_types(
        self
    ) -> None:
        """
        Insert ALL default account types into the database.
        """
        query = get_insert_default_account_types_query()
        self._run_query(query=query)

    def insert_account(
        self,
        name: str,
        account_type_id: int,
        dashboard_user_id: int
    ) -> Optional[Account]:
        """
        Inserts an Account into the database, based on account type and user.

        :param name: name of the account
        :param account_type: type of the account
        :param dashboard_user_id: id of the user
        """
        query = get_insert_account_query(name, account_type_id, dashboard_user_id)
        results = self._run_query(query=query)
        if len(results) == 0:
            return None
        return Account(
            id=results[0][0],
            name=results[0][1],
            account_type_id=results[0][2],
            created_date=results[0][3],
            dashboard_user_id=results[0][5],
            updated_date=None
        )

    def insert_business(
        self,
        name: str,
        business_type_id: int
    ) -> Optional[Business]:
        """
        Insert a single business into the database.

        :param name: name of the business
        :param business_type_id: id of the business type
        """
        query = get_insert_business_query(name, business_type_id)
        results = self._run_query(query=query)
        if len(results) == 0:
            return None
        return Business(
            id=results[0][0],
            name=results[0][1],
            business_type_id=results[0][2],
        )
    
    def insert_businesses(
        self,
        names_business_type_id: dict
    ) -> Optional[List[Business]]:
        """
        Insert multiple businesses into the database.

        :param name_business_type_id: dictionary of name to business_type_id
        """
        query = get_insert_businesses_query(names_business_type_id)
        results = self._run_query(query=query)
        if len(results) == 0:
            return None
        businesses_inserted = []
        for result in results:
            businesses_inserted.append(
                Business(
                    id=result[0],
                    name=result[1],
                    business_type_id=result[2],
                )
            )
        return businesses_inserted

    def insert_transaction(
        self,
        amount: int,
        business_id: int,
        is_expense: bool,
        created_date: datetime = datetime.now()
    ) -> bool:
        """
        Inserts a transaction into the database.

        :param amount: amount in the transaction
        :param business_id: id of the business
        :param is_expense: bool if the transaction was an expense
        :param created_date: date of the transaction (default: current date)
        """
        query = get_insert_transaction(
            amount, business_id, is_expense, created_date
        )
        results = self._run_query(query)
        return len(results) == 1
    
    def insert_transactions(
        self,
        transactions: List[Transaction]
    ) -> bool:
        """
        Insert transactions into the database.

        :param transactions: transactions to insert
        """
        values_to_insert = [
            {
               'dashboard_user_id': transaction.dashboard_user_id,
               'amount': transaction.amount,
               'business_id': transaction.business_id,
               'created_date': transaction.created_date
            } for transaction in transactions
        ]
        query = get_insert_transactions_query(
            values_to_insert
        )
        results = self._run_query(query=query)

        return len(results) != 0
    

    def check_account_connection(
        self,
        dashboard_user_id: int,
        account_ids: List[int]
    ) -> bool:
        """
        Checks if the account is connected to the specified user.

        Return True if all accounts given are related to the user.
        Returns False if ONE account is not related to the user.

        :param dashboard_user_id: id of the user
        :param account_ids: ids of accounts
        """
        query = get_account_connection_query(dashboard_user_id, account_ids)
        results = self._run_query(query=query)
        return len(results) == len(account_ids)
    
    def get_budgets(
        self,
        dashboard_user_id: int,
        page: int = 1,
    ) -> Optional[List[Budget]]:
        """
        Gets a list of budgets from a User.

        :param dashboard_user_id: id of the user
        :param page: page number of transactions (to not return ALL transactions at once if too many)
        """
        query = get_budget_query(
            dashboard_user_id=dashboard_user_id,
            page=page
        )

        results = self._run_query(query)
        if results is None:
            return None

        budgets = []
        for result in results:

            budget_obj = Budget(
                id=result[0],
                name=result[1],
                amount=result[2],
                dashboard_user_id=result[3],
                business_type_id=result[4],
                created_date=result[5],
            )
            if len(results) == 7:
                budget_obj.updated_date = result[6]

            budgets.append(budget_obj)
        
        return budgets

    def create_budget(
        self,
        dashboard_user_id: int,
        name: str,
        amount: int,
        business_type_id: int
    ) -> Optional[Budget]:
        """
        Craetes a Budget that is related to a User.

        :param dashboard_user_id: id of the user
        :param name: name of the budget
        :param amount: amount of the budget
        :param business_type_id: id of the business type
        """
        query = get_create_budget_query(
            name=name,
            amount=amount,
            dashboard_user_id=dashboard_user_id,
            business_type_id=business_type_id
        )

        results = self._run_query(query)
        if len(results) == 0:
            return None
        inserted_budget = results[0]

        budget_obj = Budget(
            id=inserted_budget[0],
            name=inserted_budget[1],
            amount=inserted_budget[2],
            dashboard_user_id=inserted_budget[3],
            business_type_id=inserted_budget[4],
            created_date=inserted_budget[5],
        )
        if len(results) == 7:
            budget_obj.updated_date = inserted_budget[6]

        return budget_obj


    def update_budget(
        self,
        budget_id: int,
        name: Optional[str] = None,
        amount: Optional[int] = None,
        business_type_id: Optional[int] = None
    ) -> bool:
        """
        Updates properties of the Budget.
        If none of the parameters are given, then it would not be updated.

        :param budget_id: id of the budget
        :param name: new name to update to
        :param amount: new amount to update to
        :param business_type_id: new business type to update to
        """
        
        # if all of the variables are None, no action needed.
        if all(var is None for var in [name, amount, business_type_id]):
            return True
        
        query = get_update_budget_query(
            budget_id=budget_id,
            name=name,
            amount=amount,
            business_type_id=business_type_id,
        )
        try:
            results = self._run_query(query)
            if results is None:
                return False
            
            return True
        except Exception:
            return False
        
    def delete_budget(
        self,
        dashboard_user_id: int,
        name: str
    )-> bool:
        """
        Deletes a Budget of a User.

        :param dashboard_user_id: id of the user
        :param name: name of the budget
        """
        
        query = get_delete_budget_query(
            user_id=dashboard_user_id,
            name=name
        )

        try:
            results = self._run_query(query)
            if results is None:
                return False
            return True
        except Exception:
            return False

    def get_user_by_email(
        self,
        email: str
    ) -> Optional[User]:
        """
        Get user information by email.

        :param email: email of the user
        """
        query = get_user_by_email_query(email)

        try:
            results = self._run_query(query)
            if results is None:
                return None
            result_found = results[0]
            return User(
                id=result_found[0],
                name=result_found[1],
                created_date=result_found[2]
            )
        except Exception:
            return None
        
    def check_user(
        self, dashboard_user_id: int, password: str
    ) -> bool:
        """
        Check if the login provided is correct.

        :param dashboard_user_id: id of the desired user
        :param password: password to access user information
        """
        query = get_check_user_query(dashboard_user_id, password)

        results = self._run_query(query)

        return len(results) == 1