steam-review-scraper / steam_review_scraper / db_common.py
db_common.py
Raw
"""
Set of functions and classes that allow us to interact with our sqlite database.

Attributes:
    debug_mode (bool): Debug mode is activated for this run.
    testing_mode (bool): Testing mode is activated for this run.
    db_file (str): Steam database path.
"""

import os
import sys
import time
import logging
import sqlite3
import datetime
import json

import db_definition
import review_model


# When true, insert_or_update_reviews logs every upsert query with its values.
debug_mode = False
# When true, get_db_connection tolerates a missing database file and
# create_database runs the table-creation statements even if the file exists.
testing_mode = False
# Path of the sqlite database file opened by the connections in this module.
db_file = "output/steam.db"


class ReviewQuery:
    """Bundle of optional filters used to build a review 'where' component.

    Every filter is supplied as a keyword argument. A filter that is not
    supplied is stored as None; keyword arguments not listed below are ignored.

    Attributes:
        lang_key (str): Language key.
        from_date (datetime): First time to consider.
        until_date (datetime): Last time to consider.
        hide_never_updated (bool): Hide reviews that have no updates.
        only_resolved_issues (bool): All review issues are solved.
        issue_list (lst(int)): List of issue ids.
            Only include reviews with at least 1 matching id.
        only_update_after_response (bool): Only get reviews where last update
            happened after last response.
        can_be_turned (bool): Can be turned.
        has_response (bool): Only get reviews with responses.
        responded_by (int): Id of responder.
        recommended (bool): Was the game recommended or not.
    """
    def __init__(self, **kwargs):
        filter_names = (
            "lang_key",
            "from_date",
            "until_date",
            "hide_never_updated",
            "only_resolved_issues",
            "issue_list",
            "only_update_after_response",
            "can_be_turned",
            "has_response",
            "responded_by",
            "recommended",
        )
        for filter_name in filter_names:
            setattr(self, filter_name, kwargs.get(filter_name))


def set_debug(debug_on):
    """Switch the module-wide debug flag.

    Args:
        debug_on (bool): New value stored in the module-level debug_mode.
    """
    global debug_mode
    debug_mode = debug_on


def insert_or_update_app(appid, name):
    """
    Store an app/game row, replacing an existing row that conflicts with it.

    Args:
        appid (int): App/Game steam id.
        name (str): App/Game steam name.
    """
    column_names = ("steam_appid", "display_name")
    placeholders = ", ".join("?" for _ in column_names)
    statement = "INSERT OR REPLACE INTO {0} ({1}) VALUES ({2});".format(
        db_definition.TABLE_STEAM_GAMES,
        ", ".join(column_names),
        placeholders)
    run_db_query(statement, (appid, name))


def insert_or_update_languages(languages):
    """
    Insert each language, replacing an existing row that conflicts with it.

    An empty input is a no-op: no query is built and no connection is opened.

    Args:
        languages (lst(common.Language)): List of languages. Each item must
            expose the attributes lang_key, name and steam_key.
    """
    data_list = [
        (language.lang_key, language.name, language.steam_key)
        for language in languages
    ]
    if not data_list:
        # Without rows there is nothing to execute.
        return

    # The statement is the same for every row, so build it once.
    all_columns = [
        "lang_key",
        "name",
        "steam_key"
    ]
    insert_column_str = ", ".join(all_columns)
    insert_values_str = ", ".join(["?"] * len(all_columns))
    upsert_query = "INSERT OR REPLACE INTO {0} ({1}) VALUES ({2});".format(
        db_definition.TABLE_STEAM_LANGUAGES, insert_column_str, insert_values_str)

    conn = get_db_connection()
    try:
        conn.executemany(upsert_query, data_list)
        conn.commit()
    finally:
        # Every call opens its own connection, so release it here.
        conn.close()


def insert_or_update_reviews(reviews, update_time, include_user_input_columns=False):
    """ Inserts (or replaces if the ID already exists) the given reviews into the DB.

    Reviews without a date_posted value are logged and skipped.

    Args:
        reviews (lst(SteamReview)): List of reviews
        update_time (int): Time at which the inserts/updates are being made.
        include_user_input_columns (bool): If true, the issue_list and can_be_turned
            columns will also be set, else they are left out of the statement.
    """

    # Copy the column names into a list: on Python 3 dict.keys() returns a
    # view that has no remove() method.
    data_columns = list(db_definition.COLUMNS_STEAM_REVIEWS.keys())
    if not include_user_input_columns:
        data_columns.remove("can_be_turned")
        data_columns.remove("issue_list")

    insert_column_str = ", ".join(data_columns)
    insert_values_str = ", ".join(["?"] * len(data_columns))

    # NOTE(review): INSERT OR REPLACE replaces the whole conflicting row, so
    # columns left out of the statement presumably fall back to their column
    # defaults instead of keeping their previous values - confirm against the
    # table definition.
    upsert_query = """
        INSERT OR REPLACE INTO stats_steam_reviews
            ({0}) VALUES ({1});
        """.format(insert_column_str, insert_values_str)

    data_list = []

    for review in reviews:
        if not review.date_posted:
            logging.info("ReviewMissingDate,{0},{1}".format(review.review_url,review.id))
            continue

        # NOTE(review): the value order below assumes it matches the key order
        # of COLUMNS_STEAM_REVIEWS, with can_be_turned and issue_list as the
        # last two columns - verify against db_definition.
        data = (
            review.id,
            review.steam_appid,
            review.recommended,
            review.user_name,
            review.content,
            review.hours_played,
            review.review_url,
            review.date_posted,
            review.date_updated,
            review.helpful_amount,
            review.helpful_total,
            review.games_owned,
            review.responded_by,
            review.responded_date,
            review.lang_key,
            review.received_compensation,
            update_time
        )
        if include_user_input_columns:
            data = data + (
                review.can_be_turned,
                review.issue_list
            )

        if debug_mode:
            logging.info("Query: {0} ({1})".format(upsert_query, data))

        data_list.append(data)

    conn = get_db_connection()
    try:
        conn.executemany(upsert_query, data_list)
        conn.commit()
    finally:
        # Every call opens its own connection, so release it here.
        conn.close()


def delete_review(review_id):
    """Remove the review row whose id equals review_id."""
    statement = "DELETE FROM stats_steam_reviews WHERE id = ?;"
    run_db_query(statement, (review_id,))


def delete_all_unchanged_reviews(appid, languages_keys, compare_time):
    """Delete all reviews last updated before the given time.

        Only reviews of the given app whose lang_key is one of languages_keys
        and whose entry_updated is lower than compare_time are removed.

        Args:
            appid (int): App/Game id.
            languages_keys (lst(str)): Languages in format 'english'.
            compare_time (int): Epoch time to compare to.

        Returns:
            int: Count of deleted reviews (0 when no language is given).
    """
    if not languages_keys:
        # No language can match, so skip the query instead of relying on
        # SQLite accepting an empty "IN ()" list.
        return 0

    query = "DELETE FROM stats_steam_reviews "

    where_args = ["WHERE steam_appid = ?"]
    where_args.append("entry_updated < ?")
    where_args.append("lang_key IN ({});".format(
        ",".join(["?"]*len(languages_keys)))
    )

    data = [appid, compare_time]
    data.extend(languages_keys)

    query += " AND ".join(where_args)

    conn = get_db_connection()
    try:
        c = conn.cursor()
        c.execute(query, data)
        conn.commit()
        # rowcount is read before the connection is closed by the finally.
        return c.rowcount
    finally:
        conn.close()


def get_reviews(
        steam_appid,
        sort_by="date_posted",
        sort_order=db_definition.ORDER_MODES[0],
        page_number=0,
        reviews_per_page=20,
        review_query_object=None):
    """
    Fetch one page of reviews that match the given query settings.

    An unknown sort_by falls back to "date_posted" and an unknown sort_order
    falls back to the first entry of db_definition.ORDER_MODES. When the
    requested page starts at or beyond the number of matching reviews, no
    LIMIT/OFFSET is applied to the query.

    Args:
        steam_appid (str): The id of the app/game containing the reviews.
        sort_by (str): The named db field to sort using.
        sort_order (str): Ordering rule, such as 'desc' or 'asc'.
        page_number (int): The page number to find reviews at, as decided by reviews_per_page.
        reviews_per_page (int): The number of reviews per page.
        review_query_object (ReviewQuery): Object containing extra query settings.

    Returns:
        lst(SteamReview): Reviews for the given page.
    """
    review_columns = db_definition.COLUMNS_STEAM_REVIEWS.keys()
    select_clause = ", ".join(review_columns)

    # Sorting: only whitelisted column names / order modes reach the query.
    if sort_by and sort_order:
        if sort_by in review_columns:
            order_column = sort_by
        else:
            order_column = "date_posted"
        if sort_order in db_definition.ORDER_MODES:
            order_mode = sort_order
        else:
            order_mode = db_definition.ORDER_MODES[0]
        order_clause = "ORDER BY {col} {order}".format(col=order_column, order=order_mode)
    else:
        order_clause = ""

    # Paging
    matching_rows = get_review_count(steam_appid, review_query_object)
    first_row = page_number * reviews_per_page
    if first_row < matching_rows:
        page_clause, page_values = " LIMIT ? OFFSET ?", (reviews_per_page, first_row)
    else:
        page_clause, page_values = "", ()

    where_clause, where_values = construct_where_argument(steam_appid, review_query_object)

    rows = run_db_query(
        get_reviews_select_query(select_clause, where_clause, order_clause, page_clause),
        tuple(where_values) + page_values
    )

    return list(map(steam_review_from_result, rows))


def construct_where_argument(appid, review_query_object):
    """ Build the 'where' string and its bound values for a review query.

    The app id filter is always present; every other filter is added only
    when the matching attribute of the query object asks for it. All clauses
    are joined with AND.

    Args:
        appid (int): App/Game id.
        review_query_object (ReviewQuery): Query object, may be None.

    Returns:
        tup(str, lst()): Where string and variables.
    """
    clauses = ["steam_appid = ?"]
    params = [appid]

    def add_filter(clause, *values):
        # Keep each clause and the values bound to it in lockstep.
        clauses.append(clause)
        params.extend(values)

    query = review_query_object
    if query:
        if query.lang_key:
            add_filter("lang_key = ?", query.lang_key)

        if query.from_date:
            add_filter("date_posted >= ?", query.from_date)

        if query.until_date:
            add_filter("date_posted <= ?", query.until_date)

        if query.hide_never_updated:
            add_filter("date_updated IS NOT NULL")

        if query.only_resolved_issues:
            # A resolved issue has a status above 0, so the lowest status of
            # the review's issues has to be above 0 as well.
            add_filter("""
                0 < (select min(ri.resolved_status) from {0} 
                    left join {1} as ri ON IS_INT_IN_LST(ri.id, re.issue_list))
            """.format(
                db_definition.TABLE_STEAM_REVIEWS,
                db_definition.TABLE_STEAM_REVIEW_ISSUES))

        elif query.issue_list:
            # Match reviews sharing at least one issue id with the given list.
            add_filter("""COMPARE_LST(issue_list, ?)""", json.dumps(query.issue_list))

        if query.only_update_after_response:
            add_filter("date_updated > responded_date")

        if query.can_be_turned is not None:
            add_filter("can_be_turned = ?", query.can_be_turned)

        if query.has_response is not None:
            negation = "NOT" if query.has_response else ""
            add_filter("responded_by IS {} NULL".format(negation))

        if query.responded_by and query.responded_by != 0:
            add_filter("responded_by = ?", query.responded_by)

        if query.recommended is not None:
            add_filter("recommended = ?", query.recommended)

    # The app id clause is always there, so the string is never empty.
    return ("WHERE " + " AND ".join(clauses) + " ", params)


def steam_review_from_result(review):
    """
    Turn one db result row into a steam review object.

    The row values are paired by position with the column names of
    db_definition.COLUMNS_STEAM_REVIEWS and passed as keyword arguments.

    Args:
        review (tpl): DB result tuple.

    Returns:
        SteamReview: New steam review.
    """
    column_names = db_definition.COLUMNS_STEAM_REVIEWS.keys()
    # Empty columns come back from the DB as None and are passed on as such.
    field_values = {
        name: review[position] for position, name in enumerate(column_names)
    }
    return review_model.SteamReview(**field_values)


def get_review_count(steam_appid, review_query=None):
    """ Count the reviews of an app that match a query object.

    Args:
        steam_appid (int): App/Game id.
        review_query (ReviewQuery): Object used to construct query.

    Returns:
        int: Review count, 0 when the query yields no result.
    """
    where_clause, where_values = construct_where_argument(steam_appid, review_query)
    count_query = get_reviews_select_query("count(id)", where_clause, "", "")
    rows = run_db_query(count_query, where_values)
    return rows[0][0] if rows else 0


def get_reviews_select_query(select, where, order, pagination):
    """ Assemble a select statement on the reviews table (aliased as 're').

    Args:
        select (str): Column expression placed after SELECT.
        where (str): Where component, or an empty string.
        order (str): Order component, or an empty string.
        pagination (str): Limit/offset component, or an empty string.

    Returns:
        str: The complete query string.
    """
    template = "SELECT {select} FROM {table} as re {where}{order}{pagination};"
    return template.format(
        select=select,
        table=db_definition.TABLE_STEAM_REVIEWS,
        where=where,
        order=order,
        pagination=pagination)


def run_db_query(query, data=None):
    """ Run a given db query on a fresh connection and commit it.

    The connection is always closed before returning, also when the query
    itself raises.

    Args:
        query (str): Constructed query without values.
        data (lst()): Optional query values.

    Returns:
        list: fetchall query result, or None when the result could not be fetched.
    """

    conn = get_db_connection()
    try:
        c = conn.cursor()
        if data:
            c.execute(query, data)
        else:
            c.execute(query)

        conn.commit()

        try:
            return c.fetchall()
        except Exception:
            # Best effort: keep returning None, but leave a trace in the log
            # instead of hiding the failure completely.
            logging.exception("Could not fetch the result of query: %s", query)
            return None
    finally:
        conn.close()


def get_db_connection():
    """
    Open a connection to the previously created db.

    The connection parses declared column types, uses str as text factory and
    has this module's custom SQL functions registered.

    Returns:
        connection: New db connection.

    Raises:
        Exception: If no db file is set, or if the db file does not exist
            while testing mode is off.
    """
    if not db_file:
        raise Exception("Database file is not set.")

    # Outside of testing mode the database file has to exist already.
    database_missing = not os.path.isfile(db_file)
    if database_missing and not testing_mode:
        raise Exception(
            """Sqlite database file does not exists,
            did you run the import script? If this is a new docker spin up,
            set the environment variable import_database_on_startup=1"""
        )

    connection = sqlite3.connect(db_file, detect_types=sqlite3.PARSE_DECLTYPES)
    connection.text_factory = str
    _prepare_connection(connection)
    return connection


def create_database(_db_file=None):
    """
    Construct a new db and return a connection to it.

    The list adapter and the "bigintlst" converter are registered on every
    call, also when the database file already exists and creation is skipped.

    Args:
        _db_file (str): Override db file to be constructed. When given, it
            also replaces the module-level db_file.

    Returns:
        connection: Connection to new db, or None when the db file already
            exists and testing mode is off.
    """

    global db_file

    if _db_file:
        db_file = _db_file

    # Registration is process-wide in sqlite3, so it has to happen before the
    # early return below; otherwise a run against an existing database would
    # never register the list <-> JSON conversion.
    sqlite3.register_adapter(list, json.dumps)
    sqlite3.register_converter("bigintlst", json.loads)

    if os.path.isfile(db_file) and not testing_mode:
        logging.info("Database file already exists, skipping database creation.")
        return None

    conn = sqlite3.connect(
        db_file, detect_types=sqlite3.PARSE_DECLTYPES)
    try:
        conn.text_factory = str
        _prepare_connection(conn)
        cursor = conn.cursor()

        cursor.execute(db_definition.CREATE_TABLE_EVENTS)
        cursor.execute(db_definition.CREATE_TABLE_STEAM_GAMES)
        cursor.execute(db_definition.CREATE_TABLE_STEAM_LANGUAGES)
        cursor.execute(db_definition.CREATE_TABLE_STEAM_PLAYER_COUNT)
        cursor.execute(db_definition.CREATE_TABLE_STEAM_REVIEWS)
        cursor.execute(db_definition.CREATE_TABLE_STEAM_REVIEW_ISSUES)
        cursor.execute(db_definition.CREATE_TABLE_USERS)

        conn.commit()
    except Exception:
        # The caller never receives the connection on failure, so close it
        # here before passing the error on.
        conn.close()
        raise
    return conn


def _prepare_connection(conn):
    """Register this module's custom two-argument SQL functions on conn.

    Args:
        conn (connection): Connection that receives COMPARE_LST and
            IS_INT_IN_LST.
    """
    sql_functions = (
        ("COMPARE_LST", _compare_lists),
        ("IS_INT_IN_LST", _is_int_in_list),
    )
    for sql_name, python_callable in sql_functions:
        conn.create_function(sql_name, 2, python_callable)


def _compare_lists(json1, json2):
    """Tell whether two JSON encoded lists share at least one item.

    Args:
        json1 (str): First JSON encoded list, may be None.
        json2 (str): Second JSON encoded list, may be None.

    Returns:
        bool: True if the lists have a common item, False otherwise or when
            either argument is None.
    """
    if json1 is not None and json2 is not None:
        first, second = json.loads(json1), json.loads(json2)
        return len(set(first) & set(second)) > 0
    return False


def _is_int_in_list(value, json_lst):
    """Tell whether a value, converted to int, is an item of a JSON encoded list.

    Args:
        value: Value to look for, may be None.
        json_lst (str): JSON encoded list, may be None.

    Returns:
        bool: True if int(value) is in the list, False otherwise or when
            either argument is None.
    """
    if value is not None and json_lst is not None:
        candidates = json.loads(json_lst)
        return int(value) in candidates
    return False