import psycopg2
from database.database_classes import (
    AccountType,
    Account,
    Business,
    BusinessType,
    Transaction,
    User,
    Budget
)
from finance_watcher_dataclasses.enums import AccountTypeEnum
from typing import Tuple, List, Optional, Dict
from database.sql_queries import *
from datetime import datetime
from helpers.config import Config


class DatabaseConfig:
    """
    Connection settings for the database, copied from ``Config``.
    """

    def __init__(self):
        config = Config()
        self.host = config.host
        self.port = config.port
        self.database = config.database
        self.user = config.user
        self.password = config.password


class FinanceDatabase:
    """
    Data-access layer: every public method builds a SQL string through a
    query helper, runs it with ``_run_query`` and maps the returned rows
    onto the project's data classes.
    """

    def __init__(self, config: Optional[DatabaseConfig] = None):
        """
        :param config: connection settings; a default ``DatabaseConfig``
            is created when omitted
        """
        self.__config = DatabaseConfig() if config is None else config

    def _run_query(self, query: str) -> List[Tuple]:
        """
        Opens a connection, executes the query, commits and returns
        every row of the result as a tuple.
        The cursor and the connection are always closed, even when the
        query fails.

        :param query: complete SQL statement to execute
        :return: rows produced by the statement
        """
        # NOTE(review): ``query`` arrives as a fully-built SQL string. If the
        # query helpers interpolate caller-supplied values, this is open to
        # SQL injection -- consider passing parameters to cursor.execute().
        connection = psycopg2.connect(
            host=self.__config.host,
            port=self.__config.port,
            database=self.__config.database,
            user=self.__config.user,
            password=self.__config.password,
        )
        try:
            cursor = connection.cursor()
            try:
                cursor.execute(query)
                connection.commit()
                return [tuple(query_row) for query_row in cursor]
            finally:
                cursor.close()
        finally:
            # Closing the connection was missing before: one connection
            # leaked per query.
            connection.close()

    def ensure_account_type(
        self,
        account_type: AccountTypeEnum
    ) -> Optional[AccountType]:
        """
        Inserts an Account Type, only if, it is not there already.
        Returns None if the enumerator has no name or the query does not
        return exactly one row.

        :param account_type: account type
        """
        account_type_name = account_type.name
        if account_type_name is None:
            return None

        query = get_ensure_account_type_query(
            name=account_type_name,
            is_income=AccountTypeEnum.is_income(account_type),
            is_expense=AccountTypeEnum.is_expense(account_type),
        )
        results = self._run_query(query=query)
        if len(results) != 1:
            return None

        row = results[0]
        return AccountType(
            id=row[0],
            name=row[1],
            is_income=row[2],
            is_expense=row[3]
        )

    def insert_default_business_types(self):
        """
        Inserts all business types.
        """
        query = get_insert_default_business_types()
        self._run_query(query=query)

    def ensure_business(
        self,
        name: str,
        business_type_id: int
    ) -> Optional[Business]:
        """
        Inserts a Business, only if, it is not there already.
        Returns None if the query returns no row.

        :param name: name of the business
        :param business_type_id: id of the business type
        """
        query = get_ensure_business_query(name, business_type_id)
        results = self._run_query(query)
        if not results:
            return None

        result_business = results[0]
        return Business(
            id=result_business[0],
            name=result_business[1],
            business_type_id=result_business[2]
        )

    def get_business_type(
        self,
        business_type: BusinessTypeEnum
    ) -> Optional[BusinessType]:
        """
        Gets the Business Type based on the enumerator equivalent.
        Returns None if failed to get the business type.

        :param business_type: business type enumerator
        """
        results = self._run_query(
            get_business_type_query(business_type.value)
        )
        if not results:
            return None

        return BusinessType(
            id=results[0][0],
            name=results[0][1]
        )

    def get_dashboard_user(self, user_id: int) -> Optional[User]:
        """
        Gets the Dashboard user by its id.
        Returns None if failed to get the user.

        :param user_id: id of the user
        """
        results = self._run_query(
            get_dashboard_user_query(user_id)
        )
        if not results:
            return None

        result_found = results[0]
        user = User(
            id=result_found[0],
            name=result_found[1],
            created_date=result_found[2],
        )
        # The updated date is only set when the row carries a fourth column.
        if len(result_found) == 4:
            user.updated_date = result_found[3]
        return user

    def update_user_password(self, user_id: int, password: str) -> bool:
        """
        Updates the password of a User.
        Returns True when the query returns exactly one row.

        :param user_id: id of the user
        :param password: new password
        """
        query = get_update_user_password_query(user_id, password)
        results = self._run_query(query)
        return len(results) == 1

    def get_transactions(
        self,
        user_id: int,
        page: int = 1,
        business_type_id: Optional[int] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None
    ) -> Optional[List[Transaction]]:
        """
        Gets a list of transactions from a User.
        Returns None if failed to get a list of transactions: only one of
        the two dates was given, the start date is later than the end
        date, or the query failed.

        :param user_id: id of the user
        :param page: page number of transactions (to not return ALL
            transactions at once if too many)
        :param business_type_id: filter transactions based on the type of
            transaction.
        :param start_date: start of the date range (requires end_date)
        :param end_date: end of the date range (requires start_date)
        """
        # The date filter is all-or-nothing: reject a half-specified range
        # up front instead of relying on the query failing.
        if (start_date is None) != (end_date is None):
            print("Error: A date is included. Either both or none.")
            return None

        if start_date is not None and start_date > end_date:
            print("Error: The start date is later than the end date.")
            return None

        try:
            results = self._run_query(
                get_transactions_query(
                    user_id,
                    page,
                    business_type_id,
                    start_date,
                    end_date
                )
            )
        except Exception as error:
            # Previously a bare ``except`` reported every failure as a date
            # problem; keep the "None on failure" contract but say what
            # actually went wrong.
            print(f"Error: Failed to get the transactions. {error}")
            return None

        return [
            Transaction(
                id=result[0],
                dashboard_user_id=result[1],
                amount=result[2],
                business_id=result[3],
                is_expense=result[4],
                created_date=result[5],
            )
            for result in results
        ]

    def get_account_type(
        self,
        account_type: AccountTypeEnum
    ) -> Optional[AccountType]:
        """
        Gets the Account Type based on the enumerator equivalent.
        Returns None if failed to get the account type.

        :param account_type: account type enumerator
        """
        results = self._run_query(
            get_account_type_query(account_type.name)
        )
        if not results:
            return None

        row = results[0]
        return AccountType(
            id=row[0],
            name=row[1],
            is_income=row[2],
            is_expense=row[3]
        )

    def ensure_account(
        self,
        name: str,
        account_type_id: int,
        dashboard_user_id: int
    ) -> Optional[Account]:
        """
        Inserts an Account, only if, it is not there already.
        Returns None if the query returns no row.

        :param dashboard_user_id: id of the user
        :param name: name of the account
        :param account_type_id: id of the account type
        """
        query = get_ensure_account_query(
            name=name,
            account_type_id=account_type_id,
            dashboard_user_id=dashboard_user_id
        )
        results = self._run_query(query=query)
        if not results:
            return None

        result_account = results[0]
        return Account(
            id=result_account[0],
            dashboard_user_id=result_account[5],
            name=result_account[1],
            account_type_id=result_account[2],
            created_date=result_account[3],
            # The previous guard tested a variable that had just been set to
            # None, so the updated date was never read from the row.
            updated_date=result_account[4]
        )

    def get_all_accounts_and_latest_totals(
        self,
        user_id: int
    ) -> Dict[int, Tuple[str, float]]:
        """
        Gets Accounts and Their latest totals of the user.
        Returns a dictionary with this format:
        {account_id: (account_name, account_total)}
        The dictionary is empty when the query returns no row.

        :param user_id: id of the user
        """
        query = get_accounts_and_latest_total_queries(user_id)
        results = self._run_query(query)

        # Each row is unpacked as (account_name, account_id, account_total).
        return {
            account_id: (account_name, account_total)
            for account_name, account_id, account_total in results
        }

    def insert_dashboard_user(
        self,
        name: str,
        email: str,
        password: str
    ) -> Optional[User]:
        """
        Creates a user row into the database.
        Returns None if the query returns no row.

        :param name: name of the user
        :param email: email of the user
        :param password: password of the user (should be already encrypted)
        """
        results = self._run_query(
            get_insert_dashboard_user(name, email, password)
        )
        if not results:
            return None

        return User(
            id=results[0][0],
            name=results[0][1],
            created_date=results[0][2],
            updated_date=None
        )

    def insert_account_totals(
        self,
        account_rows: List[AccountRow]
    ) -> bool:
        """
        Inserts new totals of accounts.
        Returns True when there is nothing to insert or when the query
        returns at least one row.

        :param account_rows: rows of account and their respective totals
        """
        if not account_rows:
            return True

        query = get_insert_account_totals_query(
            account_rows=account_rows
        )
        return len(self._run_query(query)) != 0

    def get_latest_account_percentages(self, user_id: int) -> Dict[str, float]:
        """
        Gets the latest account percentages of a user.
        Returns a dictionary with this format:
        {account_name: share_of_overall_total}
        The share is a fraction (total / overall total), not multiplied by
        100. Accounts with a negative total are left out. When the overall
        total is zero every remaining account gets a share of 0.0.

        :param user_id: id of the user
        """
        accounts_totals = self.get_all_accounts_and_latest_totals(user_id)

        account_percentages: Dict[str, float] = {}
        overall_total = 0.0
        for name, total in accounts_totals.values():
            if total < 0:
                continue
            account_percentages[name] = total
            overall_total += total

        # Avoid dividing by zero when every kept account has a total of 0.
        if overall_total == 0:
            return {account: 0.0 for account in account_percentages}

        return {
            account: total / overall_total
            for account, total in account_percentages.items()
        }

    def insert_account_types(
        self
    ) -> None:
        """
        Insert ALL default account types into the database.
        """
        query = get_insert_default_account_types_query()
        self._run_query(query=query)

    def insert_account(
        self,
        name: str,
        account_type_id: int,
        dashboard_user_id: int
    ) -> Optional[Account]:
        """
        Inserts an Account into the database, based on account type and user.
        Returns None if the query returns no row.

        :param name: name of the account
        :param account_type_id: id of the account type
        :param dashboard_user_id: id of the user
        """
        query = get_insert_account_query(
            name,
            account_type_id,
            dashboard_user_id
        )
        results = self._run_query(query=query)
        if not results:
            return None

        row = results[0]
        return Account(
            id=row[0],
            name=row[1],
            account_type_id=row[2],
            created_date=row[3],
            dashboard_user_id=row[5],
            updated_date=None
        )

    def insert_business(
        self,
        name: str,
        business_type_id: int
    ) -> Optional[Business]:
        """
        Insert a single business into the database.
        Returns None if the query returns no row.

        :param name: name of the business
        :param business_type_id: id of the business type
        """
        query = get_insert_business_query(name, business_type_id)
        results = self._run_query(query=query)
        if not results:
            return None

        return Business(
            id=results[0][0],
            name=results[0][1],
            business_type_id=results[0][2],
        )

    def insert_businesses(
        self,
        names_business_type_id: dict
    ) -> Optional[List[Business]]:
        """
        Insert multiple businesses into the database.
        Returns None if the query returns no row.

        :param names_business_type_id: dictionary of name to business_type_id
        """
        query = get_insert_businesses_query(names_business_type_id)
        results = self._run_query(query=query)
        if not results:
            return None

        return [
            Business(
                id=result[0],
                name=result[1],
                business_type_id=result[2],
            )
            for result in results
        ]

    def insert_transaction(
        self,
        amount: int,
        business_id: int,
        is_expense: bool,
        created_date: Optional[datetime] = None
    ) -> bool:
        """
        Inserts a transaction into the database.
        Returns True when the query returns exactly one row.

        :param amount: amount in the transaction
        :param business_id: id of the business
        :param is_expense: bool if the transaction was an expense
        :param created_date: date of the transaction (default: current date)
        """
        # A ``datetime.now()`` default in the signature is evaluated only
        # once, at import time; resolve "now" on every call instead.
        if created_date is None:
            created_date = datetime.now()

        query = get_insert_transaction(
            amount,
            business_id,
            is_expense,
            created_date
        )
        results = self._run_query(query)
        return len(results) == 1

    def insert_transactions(
        self,
        transactions: List[Transaction]
    ) -> bool:
        """
        Insert transactions into the database.
        Returns True when the query returns at least one row.

        :param transactions: transactions to insert
        """
        values_to_insert = [
            {
                'dashboard_user_id': transaction.dashboard_user_id,
                'amount': transaction.amount,
                'business_id': transaction.business_id,
                'created_date': transaction.created_date
            }
            for transaction in transactions
        ]

        query = get_insert_transactions_query(values_to_insert)
        results = self._run_query(query=query)
        return len(results) != 0

    def check_account_connection(
        self,
        dashboard_user_id: int,
        account_ids: List[int]
    ) -> bool:
        """
        Checks if the account is connected to the specified user.
        Return True if all accounts given are related to the user.
        Returns False if ONE account is not related to the user.
        The check compares the number of rows returned with the number of
        ids given.

        :param dashboard_user_id: id of the user
        :param account_ids: ids of accounts
        """
        query = get_account_connection_query(dashboard_user_id, account_ids)
        results = self._run_query(query=query)
        return len(results) == len(account_ids)

    def get_budgets(
        self,
        dashboard_user_id: int,
        page: int = 1,
    ) -> Optional[List[Budget]]:
        """
        Gets a list of budgets from a User.
        The list is empty when the query returns no row.

        :param dashboard_user_id: id of the user
        :param page: page number of budgets (to not return ALL budgets at
            once if too many)
        """
        query = get_budget_query(
            dashboard_user_id=dashboard_user_id,
            page=page
        )
        results = self._run_query(query)

        budgets = []
        for result in results:
            budget_obj = Budget(
                id=result[0],
                name=result[1],
                amount=result[2],
                dashboard_user_id=result[3],
                business_type_id=result[4],
                created_date=result[5],
            )
            # Check the column count of this row (the old code checked the
            # number of rows, so the updated date was set almost never).
            if len(result) == 7:
                budget_obj.updated_date = result[6]
            budgets.append(budget_obj)
        return budgets

    def create_budget(
        self,
        dashboard_user_id: int,
        name: str,
        amount: int,
        business_type_id: int
    ) -> Optional[Budget]:
        """
        Creates a Budget that is related to a User.
        Returns None if the query returns no row.

        :param dashboard_user_id: id of the user
        :param name: name of the budget
        :param amount: amount of the budget
        :param business_type_id: id of the business type
        """
        query = get_create_budget_query(
            name=name,
            amount=amount,
            dashboard_user_id=dashboard_user_id,
            business_type_id=business_type_id
        )
        results = self._run_query(query)
        if not results:
            return None

        inserted_budget = results[0]
        budget_obj = Budget(
            id=inserted_budget[0],
            name=inserted_budget[1],
            amount=inserted_budget[2],
            dashboard_user_id=inserted_budget[3],
            business_type_id=inserted_budget[4],
            created_date=inserted_budget[5],
        )
        # Check the column count of the row, not the number of rows.
        if len(inserted_budget) == 7:
            budget_obj.updated_date = inserted_budget[6]
        return budget_obj

    def update_budget(
        self,
        budget_id: int,
        name: Optional[str] = None,
        amount: Optional[int] = None,
        business_type_id: Optional[int] = None
    ) -> bool:
        """
        Updates properties of the Budget.
        If none of the parameters are given, then it would not be updated.
        Returns True when nothing had to be updated or the query ran
        without raising, False when running the query raised.

        :param budget_id: id of the budget
        :param name: new name to update to
        :param amount: new amount to update to
        :param business_type_id: new business type to update to
        """
        # if all of the variables are None, no action needed.
        if name is None and amount is None and business_type_id is None:
            return True

        query = get_update_budget_query(
            budget_id=budget_id,
            name=name,
            amount=amount,
            business_type_id=business_type_id,
        )
        try:
            self._run_query(query)
        except Exception:
            return False
        return True

    def delete_budget(
        self,
        dashboard_user_id: int,
        name: str
    ) -> bool:
        """
        Deletes a Budget of a User.
        Returns True when the query ran without raising, False otherwise.

        :param dashboard_user_id: id of the user
        :param name: name of the budget
        """
        query = get_delete_budget_query(
            user_id=dashboard_user_id,
            name=name
        )
        try:
            self._run_query(query)
        except Exception:
            return False
        return True

    def get_user_by_email(
        self,
        email: str
    ) -> Optional[User]:
        """
        Get user information by email.
        Returns None if the query fails or returns no row.

        :param email: email of the user
        """
        query = get_user_by_email_query(email)
        try:
            results = self._run_query(query)
        except Exception:
            return None

        # "No such user" is an expected outcome; handle it explicitly rather
        # than through a swallowed IndexError.
        if not results:
            return None

        result_found = results[0]
        return User(
            id=result_found[0],
            name=result_found[1],
            created_date=result_found[2]
        )

    def check_user(
        self,
        dashboard_user_id: int,
        password: str
    ) -> bool:
        """
        Check if the login provided is correct.
        Returns True when the query returns exactly one row.

        :param dashboard_user_id: id of the desired user
        :param password: password to access user information
        """
        query = get_check_user_query(dashboard_user_id, password)
        results = self._run_query(query)
        return len(results) == 1