# Exploring-Systems-For-Scalable-Analytics / dsc102pa0.py
# dsc102pa0.py
# Raw
"""
Brent Min  |  A15053745
Jonathan Zhang  |  A13646254

This script writes the results of our Dask DataFrame data exploration to "results_PA0.json".
"""
from dask.distributed import Client
import dask.dataframe as dd
import numpy as np
import pandas as pd
import json    
    
    
def num_helpful(row):
    """
    Return the helpful-vote count encoded in one value of the 'helpful' column.

    The value is parsed as text of the form "[helpful, total]" (for example
    "[2, 3]"): the count is whatever sits between the first character and
    the first comma.

    Parameters
    ----------
    row : str
        A single cell of the 'helpful' column.

    Returns
    -------
    int
        The helpful-vote count, or 0 when the value is not a string (e.g. a
        missing cell read as NaN), has no comma, or the text before the comma
        is not a valid integer.
    """
    # Non-string cells (NaN / None for missing data) have no .index/.partition;
    # count them as zero votes instead of letting an AttributeError escape.
    if not isinstance(row, str):
        return 0

    head, comma, _ = row.partition(',')
    if not comma:
        return 0

    try:
        # Drop the first character (the opening bracket) before converting.
        return int(head[1:])
    except ValueError:
        return 0


def total_helpful(row):
    """
    Return the total-vote count encoded in one value of the 'helpful' column.

    The value is parsed as text of the form "[helpful, total]" (for example
    "[2, 3]"): the count is whatever sits between the first comma and the
    final character. Splitting on the comma (the same delimiter num_helpful
    uses) means the result does not depend on a space following the comma.

    Parameters
    ----------
    row : str
        A single cell of the 'helpful' column.

    Returns
    -------
    int
        The total-vote count, or 0 when the value is not a string (e.g. a
        missing cell read as NaN), has no comma, or the text after the comma
        is not a valid integer.
    """
    # Non-string cells (NaN / None for missing data) have no .index/.partition;
    # count them as zero votes instead of letting an AttributeError escape.
    if not isinstance(row, str):
        return 0

    _, comma, tail = row.partition(',')
    if not comma:
        return 0

    try:
        # Drop the last character (the closing bracket); int() itself
        # tolerates the leading space in " 3".
        return int(tail[:-1])
    except ValueError:
        return 0
    
    
def PA0(user_reviews_csv):
    """
    PA0 will output the results of our completed DataFrame into "results_PA0.json"

    The reviews CSV is loaded with Dask, reduced to one row per reviewerID
    (number of products rated, mean rating, first review year, summed helpful
    and total votes), and the describe() table of those per-reviewer columns,
    rounded to two decimals, is written out as JSON.

    Parameters
    ----------
    user_reviews_csv : str
        Path of the reviews CSV, passed straight to dd.read_csv.

    Returns
    -------
    None
        The only output is the file "results_PA0.json" written to the
        current working directory.
    """
    # column data types
    # (builtin str/object replace the np.str/np.object aliases, which were
    # deprecated in NumPy 1.20 and removed in 1.24)
    dtypes = {
        'reviewerID': str,
        'asin': str,
        'reviewerName': str,
        'helpful': object,
        'reviewText': str,
        'overall': np.float64,
        'summary': str,
        'unixReviewTime': np.float64,
        'reviewTime': str,
    }

    # The context manager closes the client once the result has been computed,
    # so the function no longer leaves a distributed client open behind it.
    with Client() as client:
        client.restart()

        # defining our DataFrame with the columns listed below
        df = dd.read_csv(user_reviews_csv, dtype=dtypes)
        df = df[['reviewerID', 'asin', 'helpful', 'overall', 'reviewTime']]

        # creating the column 'year' -> must be an integer
        # (the last four characters of 'reviewTime' are taken as the year)
        df['year'] = df['reviewTime'].str[-4:].astype(int)

        # creating the column 'helpful_votes' -> must be an integer
        df['helpful_votes'] = df.helpful.apply(num_helpful, meta='int64').astype('int')

        # creating the column 'total_votes' -> must be an integer
        df['total_votes'] = df.helpful.apply(total_helpful, meta='int64').astype('int')

        # essentially drops old columns replaced with the columns we just created above
        df = df[['reviewerID', 'asin', 'overall', 'year', 'helpful_votes', 'total_votes']]

        # aggregate function to apply numpy functions across specified columns
        grouped = df.groupby('reviewerID').agg(
            {
                'asin': ['count'],
                'overall': ['mean'],
                'year': ['min'],
                'helpful_votes': ['sum'],
                'total_votes': ['sum'],
            },
            split_out=4,
        )

        # drops the extraneous column level created from the aggregate method (names)
        grouped.columns = grouped.columns.droplevel(1)

        # renames columns to fit PA0 writeup
        grouped = grouped.rename(columns={'asin': 'number_products_rated',
                                          'overall': 'avg_ratings',
                                          'year': 'reviewing_since'})

        # compute() materialises the summary as an in-memory result, so it
        # stays usable after the client is closed
        submit = grouped.describe().compute().round(2)

    # round-trip through json.loads so json.dump writes the table as a JSON
    # object rather than as one quoted string
    with open('results_PA0.json', 'w') as outfile:
        json.dump(json.loads(submit.to_json()), outfile)
    
# Script entry point: when run directly (not imported), process the default
# 'user_reviews.csv' file.
if __name__ == '__main__':
    PA0('user_reviews.csv')