"""
Set of functions and classes that allows us to interact with our sqlite database.
Attributes:
debug_mode (bool): Debug mode is activated for this run.
testing_mode (bool): Testing mode is activated for this run.
db_file (str): Steam database path.
"""
import os
import sys
import time
import logging
import sqlite3
import datetime
import json
import db_definition
import review_model
# Module-wide debug flag, changed through set_debug(). When true,
# insert_or_update_reviews logs every upsert query together with its values.
debug_mode = False
# When true, the "database file exists" checks in get_db_connection and
# create_database are bypassed.
testing_mode = False
# Path of the sqlite database file; create_database can override it.
db_file = "output/steam.db"
class ReviewQuery:
    """Holds the optional filters used to build a review query's 'where' part.

    Each attribute is taken from the keyword argument of the same name and is
    None when that keyword is not supplied.

    Attributes:
        lang_key (str): Language key.
        from_date (datetime): First time to consider.
        until_date (datetime): Last time to consider.
        hide_never_updated (bool): Hide reviews that have no updates.
        only_resolved_issues (bool): All review issues are solved.
        issue_list (lst(int)): List of issue ids.
            Only include reviews with at least 1 matching id.
        only_update_after_response (bool): Only get reviews where last update
            happened after last response.
        can_be_turned (bool): Can be turned.
        has_response (bool): Only get reviews with responses.
        responded_by (int): Id of responder.
        recommended (bool): Was the game recommended or not.
    """

    def __init__(self, **kwargs):
        filter_names = (
            "lang_key",
            "from_date",
            "until_date",
            "hide_never_updated",
            "only_resolved_issues",
            "issue_list",
            "only_update_after_response",
            "can_be_turned",
            "has_response",
            "responded_by",
            "recommended",
        )
        # Keywords outside this list are ignored; absent ones become None.
        for filter_name in filter_names:
            setattr(self, filter_name, kwargs.get(filter_name))
def set_debug(debug_on):
    """Set the module-level debug_mode flag.

    Args:
        debug_on (bool): New value for debug_mode.
    """
    # Rebind the module global through the module namespace.
    globals()["debug_mode"] = debug_on
def insert_or_update_app(appid, name):
    """
    Store an app row, replacing any existing row that conflicts with it.

    Args:
        appid (int): App/Game steam id.
        name (str): App/Game steam name.
    """
    columns = ("steam_appid", "display_name")
    # One "?" placeholder per column.
    placeholders = ["?" for _ in columns]
    query_template = "INSERT OR REPLACE INTO {0} ({1}) VALUES ({2});"
    upsert_query = query_template.format(
        db_definition.TABLE_STEAM_GAMES,
        ", ".join(columns),
        ", ".join(placeholders)
    )
    run_db_query(upsert_query, (appid, name))
def insert_or_update_languages(languages):
    """
    Store the given languages, replacing existing rows that conflict.

    Does nothing when no languages are given.

    Args:
        languages (lst(common.Language)): List of languages. Each item must
            expose lang_key, name and steam_key attributes.
    """
    data_list = [
        (language.lang_key, language.name, language.steam_key)
        for language in languages
    ]
    if not data_list:
        # Nothing to write. Previously the query was only built inside the
        # loop, so an empty input failed on an unbound variable.
        return

    # The query is identical for every row, so build it once.
    all_columns = [
        "lang_key",
        "name",
        "steam_key"
    ]
    insert_column_str = ", ".join(all_columns)
    insert_values_str = ", ".join(["?"] * len(all_columns))
    upsert_query = "INSERT OR REPLACE INTO {0} ({1}) VALUES ({2});".format(
        db_definition.TABLE_STEAM_LANGUAGES, insert_column_str, insert_values_str)

    conn = get_db_connection()
    try:
        conn.executemany(upsert_query, data_list)
        conn.commit()
    finally:
        # get_db_connection opens a fresh connection on every call, so it
        # has to be released here.
        conn.close()
def insert_or_update_reviews(reviews, update_time, include_user_input_columns=False):
    """ Inserts (or replaces if the ID already exists) the given reviews into the DB.

    Reviews without a date_posted value are logged and skipped.

    Args:
        reviews (lst(SteamReview)): List of reviews
        update_time (int): Time at which the inserts/updates are being made.
        include_user_input_columns (bool): If true, the issue_list and can_be_turned
            columns will also be set, else those columns are left out of the query.
    """
    # Copy the keys into a list: a Python 3 dict view has no remove(), and
    # working on a copy never mutates the shared column definition.
    data_columns = list(db_definition.COLUMNS_STEAM_REVIEWS.keys())
    if not include_user_input_columns:
        data_columns.remove("can_be_turned")
        data_columns.remove("issue_list")
    insert_column_str = ", ".join(data_columns)
    insert_values_str = ", ".join(["?"] * len(data_columns))
    # NOTE(review): INSERT OR REPLACE rewrites the whole row, so columns left
    # out of the query presumably fall back to their defaults - confirm that
    # this is intended for can_be_turned / issue_list.
    upsert_query = (
        "\n"
        "INSERT OR REPLACE INTO stats_steam_reviews\n"
        "({0}) VALUES ({1});\n"
    ).format(insert_column_str, insert_values_str)

    data_list = []
    for review in reviews:
        if not review.date_posted:
            logging.info("ReviewMissingDate,{0},{1}".format(review.review_url,review.id))
            continue
        # NOTE(review): the value order assumes COLUMNS_STEAM_REVIEWS lists
        # its columns in exactly this order, with can_be_turned and
        # issue_list last - confirm against db_definition.
        data = (
            review.id,
            review.steam_appid,
            review.recommended,
            review.user_name,
            review.content,
            review.hours_played,
            review.review_url,
            review.date_posted,
            review.date_updated,
            review.helpful_amount,
            review.helpful_total,
            review.games_owned,
            review.responded_by,
            review.responded_date,
            review.lang_key,
            review.received_compensation,
            update_time
        )
        if include_user_input_columns:
            data = data + (
                review.can_be_turned,
                review.issue_list
            )
        if debug_mode:
            logging.info("Query: {0} ({1})".format(upsert_query, data))
        data_list.append(data)

    conn = get_db_connection()
    try:
        conn.executemany(upsert_query, data_list)
        conn.commit()
    finally:
        # get_db_connection opens a fresh connection on every call, so it
        # has to be released here.
        conn.close()
def delete_review(review_id):
    """Remove the review with the given id from the reviews table."""
    delete_query = "DELETE FROM stats_steam_reviews WHERE id = ?;"
    query_values = (review_id,)
    run_db_query(delete_query, query_values)
def delete_all_unchanged_reviews(appid, languages_keys, compare_time):
    """Delete all reviews last updated before the given time.

    Only reviews of the given app whose lang_key is one of languages_keys
    are considered.

    Args:
        appid (int): App/Game id.
        languages_keys (lst(str)): Languages in format 'english'.
        compare_time (int): Epoch time to compare to.
    Returns:
        int: Count of deleted reviews.
    """
    # One "?" placeholder per language key for the IN (...) list.
    lang_placeholders = ",".join(["?"] * len(languages_keys))
    where_args = [
        "WHERE steam_appid = ?",
        "entry_updated < ?",
        "lang_key IN ({});".format(lang_placeholders),
    ]
    query = "DELETE FROM stats_steam_reviews " + " AND ".join(where_args)

    data = [appid, compare_time]
    data.extend(languages_keys)

    conn = get_db_connection()
    try:
        c = conn.cursor()
        c.execute(query, data)
        conn.commit()
        # Read the count before the connection is closed.
        return c.rowcount
    finally:
        # get_db_connection opens a fresh connection on every call, so it
        # has to be released here.
        conn.close()
def get_reviews(
        steam_appid,
        sort_by="date_posted",
        sort_order=db_definition.ORDER_MODES[0],
        page_number=0,
        reviews_per_page=20,
        review_query_object=None):
    """
    Fetch one page of reviews for an app, filtered and sorted as requested.

    An unknown sort column falls back to "date_posted" and an unknown sort
    order falls back to the first entry of db_definition.ORDER_MODES. When the
    requested page starts at or beyond the number of matching reviews, no
    LIMIT/OFFSET is applied to the query.

    Args:
        steam_appid (str): The id of the app/game containing the reviews.
        sort_by (str): The named db field to sort using.
        sort_order (str): Ordering rule, such as 'desc' or 'asc'.
        page_number (int): The page number to find reviews at, as decided by reviews_per_page.
        reviews_per_page (int): The number of reviews per page.
        review_query_object (ReviewQuery): Object containing extra query settings.
    Returns:
        lst(SteamReview): Reviews for the given page.
    """
    review_columns = db_definition.COLUMNS_STEAM_REVIEWS.keys()
    column_str = ", ".join(review_columns)

    # Sorting is only applied when both a column and an order are given.
    order_by_str = ""
    if sort_by and sort_order:
        if sort_by in review_columns:
            sort_by_col = sort_by
        else:
            sort_by_col = "date_posted"
        if sort_order in db_definition.ORDER_MODES:
            sort_by_order = sort_order
        else:
            sort_by_order = db_definition.ORDER_MODES[0]
        order_by_str = "ORDER BY {col} {order}".format(col=sort_by_col, order=sort_by_order)

    # Paging
    total_row_count = get_review_count(steam_appid, review_query_object)
    start_count = page_number * reviews_per_page
    page_is_in_range = start_count < total_row_count
    pagination_str = " LIMIT ? OFFSET ?" if page_is_in_range else ""
    pagination_values = (reviews_per_page, start_count) if page_is_in_range else ()

    where_str, where_values = construct_where_argument(steam_appid, review_query_object)
    select_reviews_query = get_reviews_select_query(
        column_str, where_str, order_by_str, pagination_str
    )

    # Where-clause values come first, then the LIMIT/OFFSET values.
    rows = run_db_query(select_reviews_query, tuple(where_values) + pagination_values)
    return [steam_review_from_result(row) for row in rows]
def construct_where_argument(appid, review_query_object):
    """ Build the 'WHERE ...' string and its bound values for a review query.

    The app id filter is always present; every other filter is added only
    when the matching ReviewQuery attribute is set.

    Args:
        appid (int): App/Game id.
        review_query_object (ReviewQuery): Query object, may be None.
    Returns:
        tup(str, lst()): Where string and variables.
    """
    where_clauses = []
    variables = []

    def add_filter(clause, *values):
        # Record a clause together with the values bound to its placeholders.
        where_clauses.append(clause)
        variables.extend(values)

    add_filter("steam_appid = ?", appid)

    rq = review_query_object
    if rq:
        if rq.lang_key:
            add_filter("lang_key = ?", rq.lang_key)
        if rq.from_date:
            add_filter("date_posted >= ?", rq.from_date)
        if rq.until_date:
            add_filter("date_posted <= ?", rq.until_date)
        if rq.hide_never_updated:
            add_filter("date_updated IS NOT NULL")

        if rq.only_resolved_issues:
            # Resolved issues have a status > 0, so we check if 0 < min issue.
            resolved_template = (
                "\n"
                "0 < (select min(ri.resolved_status) from {0}\n"
                "left join {1} as ri ON IS_INT_IN_LST(ri.id, re.issue_list))\n"
            )
            add_filter(resolved_template.format(
                db_definition.TABLE_STEAM_REVIEWS,
                db_definition.TABLE_STEAM_REVIEW_ISSUES))
        elif rq.issue_list:
            # We check if any provided issues intersect with review issues.
            add_filter("""COMPARE_LST(issue_list, ?)""", json.dumps(rq.issue_list))

        if rq.only_update_after_response:
            add_filter("date_updated > responded_date")
        if rq.can_be_turned is not None:
            add_filter("can_be_turned = ?", rq.can_be_turned)
        if rq.has_response is not None:
            # has_response=True -> "IS NOT NULL", False -> "IS  NULL".
            negation = "NOT" if rq.has_response else ""
            add_filter("responded_by IS {} NULL".format(negation))
        if rq.responded_by and rq.responded_by != 0:
            add_filter("responded_by = ?", rq.responded_by)
        if rq.recommended is not None:
            add_filter("recommended = ?", rq.recommended)

    # The app id clause is always present, so the joined string is never empty.
    where_str = "WHERE " + " AND ".join(where_clauses) + " "
    return (where_str, variables)
def steam_review_from_result(review):
    """
    Turn a reviews-table result row into a SteamReview.

    The row values are matched by position to the keys of
    db_definition.COLUMNS_STEAM_REVIEWS.

    Args:
        review (tpl): DB result tuple.
    Returns:
        SteamReview: New steam review.
    """
    column_names = db_definition.COLUMNS_STEAM_REVIEWS.keys()
    # DB will have returned None for any empty column.
    review_fields = {
        column: review[position]
        for position, column in enumerate(column_names)
    }
    return review_model.SteamReview(**review_fields)
def get_review_count(steam_appid, review_query=None):
    """ Count the reviews of an app that match the given query object.

    Args:
        steam_appid (int): App/Game id.
        review_query (ReviewQuery): Object used to construct query.
    Returns:
        int: Review count, 0 when the query yields no result.
    """
    where_str, variables = construct_where_argument(steam_appid, review_query)
    # Same select as the review listing, without ordering or paging.
    count_query = get_reviews_select_query("count(id)", where_str, "", "")
    rows = run_db_query(count_query, variables)
    return rows[0][0] if rows else 0
def get_reviews_select_query(select, where, order, pagination):
    """ Assemble a SELECT statement on the reviews table (aliased as 're').

    Args:
        select (str): Column list or expression to select.
        where (str): Where part, or an empty string.
        order (str): Order-by part, or an empty string.
        pagination (str): Limit/offset part, or an empty string.
    Returns:
        str: The complete query string.
    """
    template = "SELECT {select} FROM {table} as re {where}{order}{pagination};"
    parts = dict(
        select=select,
        table=db_definition.TABLE_STEAM_REVIEWS,
        where=where,
        order=order,
        pagination=pagination
    )
    return template.format(**parts)
def run_db_query(query, data=None):
    """ Run a given db query on a fresh connection and commit it.

    Args:
        query (str): Constructed query without values.
        data (lst()): Optional query values.
    Returns:
        list: fetchall query result, or None if fetching the result failed.
    """
    conn = get_db_connection()
    try:
        c = conn.cursor()
        if data:
            c.execute(query, data)
        else:
            c.execute(query)
        conn.commit()
        try:
            return c.fetchall()
        except Exception:
            # Best effort: callers expect None here, but leave a trace
            # instead of swallowing the failure silently.
            logging.exception("Could not fetch results for query: %s", query)
            return None
    finally:
        # get_db_connection opens a fresh connection on every call, so it
        # has to be released here.
        conn.close()
def get_db_connection():
    """
    Open a connection to the previously created db at db_file.

    The connection parses declared column types, uses str as text factory
    and has the custom SQL functions registered.

    Returns:
        connection: New db connection.
    Raises:
        Exception: If db_file is not set, or the file does not exist while
            testing_mode is off.
    """
    if not db_file:
        raise Exception("Database file is not set.")

    # Check if database file exists.
    db_missing = not os.path.isfile(db_file)
    if db_missing and not testing_mode:
        missing_message = (
            "Sqlite database file does not exists,\n"
            "did you run the import script? If this is a new docker spin up,\n"
            "set the environment variable import_database_on_startup=1"
        )
        raise Exception(missing_message)

    connection = sqlite3.connect(db_file, detect_types=sqlite3.PARSE_DECLTYPES)
    connection.text_factory = str
    _prepare_connection(connection)
    return connection
def create_database(_db_file=None):
    """
    Construct a new db and return a connection to it.

    The list adapter and the "bigintlst" converter are registered on every
    call, including when the database file already exists, so connections
    opened later in the same process handle list columns the same way.

    Args:
        _db_file (str): Override db file to be constructed. The override
            replaces the module-level db_file.
    Returns:
        connection: Connection to new db, or None when the database file
            already exists (and testing_mode is off) and creation is skipped.
    """
    global db_file
    if _db_file:
        db_file = _db_file

    # Register before the early return below; these registrations are
    # process-wide and would otherwise be skipped for an existing database.
    sqlite3.register_adapter(list, json.dumps)
    sqlite3.register_converter("bigintlst", json.loads)

    if os.path.isfile(db_file) and not testing_mode:
        logging.info("Database file already exists, skipping database creation.")
        return

    conn = sqlite3.connect(
        db_file, detect_types=sqlite3.PARSE_DECLTYPES)
    conn.text_factory = str
    _prepare_connection(conn)

    cursor = conn.cursor()
    cursor.execute(db_definition.CREATE_TABLE_EVENTS)
    cursor.execute(db_definition.CREATE_TABLE_STEAM_GAMES)
    cursor.execute(db_definition.CREATE_TABLE_STEAM_LANGUAGES)
    cursor.execute(db_definition.CREATE_TABLE_STEAM_PLAYER_COUNT)
    cursor.execute(db_definition.CREATE_TABLE_STEAM_REVIEWS)
    cursor.execute(db_definition.CREATE_TABLE_STEAM_REVIEW_ISSUES)
    cursor.execute(db_definition.CREATE_TABLE_USERS)
    conn.commit()
    return conn
def _prepare_connection(conn):
    """Register the custom two-argument SQL functions on the connection."""
    custom_functions = (
        ("COMPARE_LST", 2, _compare_lists),
        ("IS_INT_IN_LST", 2, _is_int_in_list),
    )
    for sql_name, arg_count, handler in custom_functions:
        conn.create_function(sql_name, arg_count, handler)
def _compare_lists(json1, json2):
if(json1 is None or json2 is None):
return False
lst1 = json.loads(json1)
lst2 = json.loads(json2)
return bool(set(lst1).intersection(set(lst2)))
def _is_int_in_list(value, json_lst):
if value is None or json_lst is None:
return False
lst = json.loads(json_lst)
return bool(int(value) in lst)