""" Set of functions and classes that allows us to interact with our sqlite database. Attributes: debug_mode (bool): Debug mode is activated for this run. testing_mode (bool): Testing mode is activated for this run. db_file (str): Steam database path. """ import os import sys import time import logging import sqlite3 import datetime import json import db_definition import review_model debug_mode = False testing_mode = False db_file = "output/steam.db" class ReviewQuery: """Class storing data representing an sqlite query 'where' component. Attributes: lang_key (str): Language key. from_date (datetime): First time to consider. until_date (datetime): Last time to consider. hide_never_updated (bool): Hide reviews that have no updates. only_resolved_issues (bool): All review issues are solved. issue_list (lst(int)): List of issue ids. Only include reviews with at least 1 matching id. only_update_after_response (bool): Only get reviews where last update happened after last response. can_be_turned (bool): Can be turned. has_response (bool): Only get reviews with responses. responded_by (int): Id of responder. recommended (bool): Was the game recommended or not. """ def __init__(self, **kwargs): self.lang_key = kwargs.get("lang_key", None) self.from_date = kwargs.get("from_date", None) self.until_date = kwargs.get("until_date", None) self.hide_never_updated = kwargs.get("hide_never_updated", None) self.only_resolved_issues = kwargs.get("only_resolved_issues", None) self.issue_list = kwargs.get("issue_list", None) self.only_update_after_response = kwargs.get("only_update_after_response", None) self.can_be_turned = kwargs.get("can_be_turned", None) self.has_response = kwargs.get("has_response", None) self.responded_by = kwargs.get("responded_by", None) self.recommended = kwargs.get("recommended", None) def set_debug(debug_on): global debug_mode debug_mode = debug_on def insert_or_update_app(appid, name): """ Inserts an app if it doesn't exists, otherwise updates. Args: appid (int): App/Game steam id. name (str): App/Game steam name. """ all_columns = [ "steam_appid", "display_name" ] insert_column_str = ", ".join(all_columns) insert_values_str = ", ".join(["?"] * len(all_columns)) upsert_query = "INSERT OR REPLACE INTO {0} ({1}) VALUES ({2});".format( db_definition.TABLE_STEAM_GAMES, insert_column_str, insert_values_str) data = (appid, name) run_db_query(upsert_query, data) def insert_or_update_languages(languages): """ Inserts a language if it doesn't exists, otherwise updates. Args: languages (lst(common.Language)): List of languages. """ data_list = [] for language in languages: all_columns = [ "lang_key", "name", "steam_key" ] insert_column_str = ", ".join(all_columns) insert_values_str = ", ".join(["?"] * len(all_columns)) upsert_query = "INSERT OR REPLACE INTO {0} ({1}) VALUES ({2});".format( db_definition.TABLE_STEAM_LANGUAGES, insert_column_str, insert_values_str) data = (language.lang_key, language.name, language.steam_key) data_list.append(data) conn = get_db_connection() conn.executemany(upsert_query, data_list) conn.commit() def insert_or_update_reviews(reviews, update_time, include_user_input_columns=False): """ Inserts (or updates if the ID already exists) the given reviews into the DB. Args: reviews (lst(SteamReview)): List of reviews update_time (int): Time at which the inserts/updates are being made. include_user_input_columns (bool): If true, the issue_list and can_be_turned columns will also be set, else we don't update those. """ data_columns = db_definition.COLUMNS_STEAM_REVIEWS.keys() if not include_user_input_columns: data_columns.remove("can_be_turned") data_columns.remove("issue_list") insert_column_str = ", ".join(data_columns) insert_values_str = ", ".join(["?"] * len(data_columns)) upsert_query = """ INSERT OR REPLACE INTO stats_steam_reviews ({0}) VALUES ({1}); """.format(insert_column_str, insert_values_str) data_list = [] for review in reviews: if not review.date_posted: logging.info("ReviewMissingDate,{0},{1}".format(review.review_url,review.id)) continue data = ( review.id, review.steam_appid, review.recommended, review.user_name, review.content, review.hours_played, review.review_url, review.date_posted, review.date_updated, review.helpful_amount, review.helpful_total, review.games_owned, review.responded_by, review.responded_date, review.lang_key, review.received_compensation, update_time ) if include_user_input_columns: data = data + ( review.can_be_turned, review.issue_list ) if debug_mode: logging.info("Query: {0} ({1})".format(upsert_query, data)) data_list.append(data) conn = get_db_connection() conn.executemany(upsert_query, data_list) conn.commit() def delete_review(review_id): """Delete a single review of a given id.""" run_db_query("DELETE FROM stats_steam_reviews WHERE id = ?;", (review_id,)) def delete_all_unchanged_reviews(appid, languages_keys, compare_time): """Delete all reviews last updated before the given time. Args: appid (int): App/Game id. languages_keys (lst(str)): Langugaes in format 'english'. compare_time (int): Epoch time to compare to. Returns: int: Count of deleted reviews. """ query = "DELETE FROM stats_steam_reviews " where_args = ["WHERE steam_appid = ?"] where_args.append("entry_updated < ?") where_args.append("lang_key IN ({});".format( ",".join(["?"]*len(languages_keys))) ) data = [appid, compare_time] data.extend(languages_keys) query += " AND ".join(where_args) conn = get_db_connection() c = conn.cursor() c.execute(query, data) conn.commit() return c.rowcount def get_reviews( steam_appid, sort_by="date_posted", sort_order=db_definition.ORDER_MODES[0], page_number=0, reviews_per_page=20, review_query_object=None): """ Get reviews matching a set of query settings. Args: steam_appid (str): The id of the app/game containing the reviews. sort_by (str): The named db field to sort using. sort_order (str): Ordering rule, such as 'desc' or 'asc'. page_number (int): The page number to find reviews at, as decided by reviews_per_page. reviews_per_page (int): The number of reviews per page. review_query_object (ReviewQuery): Object containing extra query settings. Returns: lst(SteamReview): Reviews for the given page. """ column_str = ", ".join(db_definition.COLUMNS_STEAM_REVIEWS.keys()) order_by_str = "" if sort_by and sort_order: sort_by_col = sort_by if sort_by in db_definition.COLUMNS_STEAM_REVIEWS.keys() else "date_posted" sort_by_order = sort_order if sort_order in db_definition.ORDER_MODES else db_definition.ORDER_MODES[0] order_by_str = "ORDER BY {col} {order}".format(col=sort_by_col, order=sort_by_order) # Paging total_row_count = get_review_count(steam_appid, review_query_object) start_count = page_number * reviews_per_page if start_count < total_row_count: pagination_str = " LIMIT ? OFFSET ?" pagination_variables = [reviews_per_page, start_count] else: pagination_variables = [] pagination_str = "" where_str, variables = construct_where_argument(steam_appid, review_query_object) final_variables = tuple(variables) final_pagination_variables = tuple(pagination_variables) select_reviews_query = get_reviews_select_query( column_str, where_str, order_by_str, pagination_str ) reviews = run_db_query( select_reviews_query, final_variables + final_pagination_variables ) return [steam_review_from_result(rev) for rev in reviews] def construct_where_argument(appid, review_query_object): """ Construct a where str and variables from a review query. Args: appid (int): App/Game id. review_query_object (ReviewQuery): Query object. Returns: tup(str, lst()): Where string and variables. """ variables = [appid] where_clauses = [] where_clauses.append("steam_appid = ?") if review_query_object: if review_query_object.lang_key: where_clauses.append("lang_key = ?") variables.append(review_query_object.lang_key) if review_query_object.from_date: where_clauses.append("date_posted >= ?") variables.append(review_query_object.from_date) if review_query_object.until_date: where_clauses.append("date_posted <= ?") variables.append(review_query_object.until_date) if review_query_object.hide_never_updated: where_clauses.append("date_updated IS NOT NULL") if review_query_object.only_resolved_issues: # Resolved issues have a status > 0, so we check if 0 < min issue. where_clauses.append(""" 0 < (select min(ri.resolved_status) from {0} left join {1} as ri ON IS_INT_IN_LST(ri.id, re.issue_list)) """.format( db_definition.TABLE_STEAM_REVIEWS, db_definition.TABLE_STEAM_REVIEW_ISSUES)) elif review_query_object.issue_list: # We check if any provided issues intersect with review issues. json_lst = json.dumps(review_query_object.issue_list) where_clauses.append("""COMPARE_LST(issue_list, ?)""") variables.append(json_lst) if review_query_object.only_update_after_response: where_clauses.append("date_updated > responded_date") if review_query_object.can_be_turned is not None: where_clauses.append("can_be_turned = ?") variables.append(review_query_object.can_be_turned) if review_query_object.has_response is not None: value = "NOT" if review_query_object.has_response else "" where_clauses.append("responded_by IS {} NULL".format(value)) if review_query_object.responded_by and review_query_object.responded_by != 0: where_clauses.append("responded_by = ?") variables.append(review_query_object.responded_by) if review_query_object.recommended is not None: where_clauses.append("recommended = ?") variables.append(review_query_object.recommended) where_str = " AND ".join(where_clauses) if where_str: where_str = "WHERE " + where_str + " " return (where_str, variables) def steam_review_from_result(review): """ Construct a steam review object given a db result. Args: review (tpl): DB result tuple. Returns: SteamReview: New steam review. """ data_dict = {} for index, key in enumerate(db_definition.COLUMNS_STEAM_REVIEWS.keys()): # DB will have returned None for any empty column. data_dict[key] = review[index] return review_model.SteamReview(**data_dict) def get_review_count(steam_appid, review_query=None): """ Count the total reviews for a given id and query object. Args: steam_appid (int): App/Game id. review_query (ReviewQuery): Object used to construct query. Returns: int: Review count. """ select = "count(id)" where_str, variables = construct_where_argument(steam_appid, review_query) query = get_reviews_select_query(select, where_str, "", "") q_review_count = run_db_query(query, variables) if q_review_count: return q_review_count[0][0] return 0 def get_reviews_select_query(select, where, order, pagination): """ Constructs and returns a new review query str. """ query = "SELECT {select} FROM {table} as re {where}{order}{pagination};".format( select=select, table=db_definition.TABLE_STEAM_REVIEWS, where=where, order=order, pagination=pagination) return query def run_db_query(query, data=None): """ Run a given db query. Args: query (str): Constructed query without values. data (lst()): Optional query values. Returns: tuple: fetchall query result or None. """ conn = get_db_connection() c = conn.cursor() if data: c.execute(query, data) else: c.execute(query) conn.commit() try: return c.fetchall() except Exception: return None def get_db_connection(): """ Create connection with a previoulsy created db. Returns: connection: New db connection. """ global db_file global testing_mode if not db_file: raise Exception("Database file is not set.") # Check if database file exists. if not os.path.isfile(db_file) and not testing_mode: raise Exception( """Sqlite database file does not exists, did you run the import script? If this is a new docker spin up, set the environment variable import_database_on_startup=1""" ) conn = sqlite3.connect( db_file, detect_types=sqlite3.PARSE_DECLTYPES) conn.text_factory = str _prepare_connection(conn) return conn def create_database(_db_file=None): """ Construct a new db and return a connection to it. Args: _db_file (str): Override db file to be constructed. Returns: connection: Connection to new db. """ global db_file global testing_mode if _db_file: db_file = _db_file if os.path.isfile(db_file) and not testing_mode: logging.info("Database file already exists, skipping database creation.") return sqlite3.register_adapter(list, json.dumps) sqlite3.register_converter("bigintlst", json.loads) conn = sqlite3.connect( db_file, detect_types=sqlite3.PARSE_DECLTYPES) conn.text_factory = str _prepare_connection(conn) cursor = conn.cursor() cursor.execute(db_definition.CREATE_TABLE_EVENTS) cursor.execute(db_definition.CREATE_TABLE_STEAM_GAMES) cursor.execute(db_definition.CREATE_TABLE_STEAM_LANGUAGES) cursor.execute(db_definition.CREATE_TABLE_STEAM_PLAYER_COUNT) cursor.execute(db_definition.CREATE_TABLE_STEAM_REVIEWS) cursor.execute(db_definition.CREATE_TABLE_STEAM_REVIEW_ISSUES) cursor.execute(db_definition.CREATE_TABLE_USERS) conn.commit() return conn def _prepare_connection(conn): conn.create_function("COMPARE_LST", 2, _compare_lists) conn.create_function("IS_INT_IN_LST", 2, _is_int_in_list) def _compare_lists(json1, json2): if(json1 is None or json2 is None): return False lst1 = json.loads(json1) lst2 = json.loads(json2) return bool(set(lst1).intersection(set(lst2))) def _is_int_in_list(value, json_lst): if value is None or json_lst is None: return False lst = json.loads(json_lst) return bool(int(value) in lst)