""" Jon Zhang Brent Min """ import os import pyspark.sql.functions as F import pyspark.sql.types as T from utilities import SEED # import any other dependencies you want, but make sure only to use the ones # availiable on AWS EMR # ---------------- choose input format, dataframe or rdd ---------------------- INPUT_FORMAT = 'dataframe' # change to 'rdd' if you wish to use rdd inputs # ----------------------------------------------------------------------------- if INPUT_FORMAT == 'dataframe': import pyspark.ml as M import pyspark.sql.functions as F import pyspark.sql.types as T from pyspark.ml.regression import DecisionTreeRegressor from pyspark.ml.evaluation import RegressionEvaluator if INPUT_FORMAT == 'koalas': import databricks.koalas as ks elif INPUT_FORMAT == 'rdd': import pyspark.mllib as M from pyspark.mllib.feature import Word2Vec from pyspark.mllib.linalg import Vectors from pyspark.mllib.linalg.distributed import RowMatrix from pyspark.mllib.tree import DecisionTree from pyspark.mllib.regression import LabeledPoint from pyspark.mllib.linalg import DenseVector from pyspark.mllib.evaluation import RegressionMetrics # %load -s task_1 assignment2.py def task_1(data_io, review_data, product_data): # -----------------------------Column names-------------------------------- # Inputs: asin_column = 'asin' overall_column = 'overall' # Outputs: mean_rating_column = 'meanRating' count_rating_column = 'countRating' # ------------------------------------------------------------------------- # ---------------------- Your implementation begins------------------------ data = review_data.groupBy(F.col(asin_column)).agg(F.avg(F.col(overall_column)).alias(mean_rating_column), F.count("*").alias(count_rating_column)) merged = product_data.join(data, on=asin_column, how= 'left') aggregate_func = merged.agg(F.count("*"), F.avg(F.col(mean_rating_column)), F.variance(F.col(mean_rating_column)), F.sum(F.isnull(F.col(mean_rating_column)).astype("int")), F.avg(F.col(count_rating_column)), F.variance(F.col(count_rating_column)), F.sum(F.isnull(F.col(count_rating_column)).astype("int"))).collect()[0] # ------------------------------------------------------------------------- # ---------------------- Put results in res dict -------------------------- # Calculate the values programmaticly. Do not change the keys and do not # hard-code values in the dict. Your submission will be evaluated with # different inputs. # Modify the values of the following dictionary accordingly. 
def task_2(data_io, product_data):
    # -----------------------------Column names--------------------------------
    # Inputs:
    salesRank_column = 'salesRank'
    categories_column = 'categories'
    asin_column = 'asin'
    # Outputs:
    category_column = 'category'
    bestSalesCategory_column = 'bestSalesCategory'
    bestSalesRank_column = 'bestSalesRank'
    # -------------------------------------------------------------------------

    # ---------------------- Your implementation begins------------------------
    ### category
    data = product_data.withColumn(
        category_column,
        F.when(F.col(categories_column)[0][0] == '', None)
         .otherwise(F.col(categories_column)[0][0])
    )
    category_nulls = data.filter(F.col(category_column).isNull()).count()
    category_distinct = data.agg(F.countDistinct(F.col(category_column))).head()[0]

    ### salesRank and salesCategory
    key_and_values = data.select(
        asin_column,
        category_column,
        F.map_keys(salesRank_column)[0].alias(bestSalesCategory_column),
        F.map_values(salesRank_column)[0].alias(bestSalesRank_column)
    )
    mean_of_salesRank = key_and_values.select(F.avg(F.col(bestSalesRank_column))).head()[0]
    variance_of_salesRank = key_and_values.select(F.variance(F.col(bestSalesRank_column))).head()[0]
    salesCategory_nulls = key_and_values.filter(F.col(bestSalesCategory_column).isNull()).count()
    salesCategory_distinct = key_and_values.agg(F.countDistinct(F.col(bestSalesCategory_column))).head()[0]
    # -------------------------------------------------------------------------

    # ---------------------- Put results in res dict --------------------------
    res = {
        'count_total': None,
        'mean_bestSalesRank': None,
        'variance_bestSalesRank': None,
        'numNulls_category': None,
        'countDistinct_category': None,
        'numNulls_bestSalesCategory': None,
        'countDistinct_bestSalesCategory': None
    }
    # Modify res:
    res['count_total'] = data.count()
    res['mean_bestSalesRank'] = mean_of_salesRank
    res['variance_bestSalesRank'] = variance_of_salesRank
    res['numNulls_category'] = category_nulls
    res['countDistinct_category'] = category_distinct
    res['numNulls_bestSalesCategory'] = salesCategory_nulls
    res['countDistinct_bestSalesCategory'] = salesCategory_distinct
    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_2')
    return res
    # -------------------------------------------------------------------------

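# Note: task_3 averages the prices of each product's 'also_viewed' neighbors.
# Exploding the `related` map yields one (key, value) row per relation type;
# a second explode flattens the also-viewed asin lists, and a self-join
# against the price table attaches each neighbor's price.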
# %load -s task_3 assignment2.py
def task_3(data_io, product_data):
    # -----------------------------Column names--------------------------------
    # Inputs:
    asin_column = 'asin'
    price_column = 'price'
    attribute = 'also_viewed'
    related_column = 'related'
    # Outputs:
    meanPriceAlsoViewed_column = 'meanPriceAlsoViewed'
    countAlsoViewed_column = 'countAlsoViewed'
    # -------------------------------------------------------------------------

    # ---------------------- Your implementation begins------------------------
    # Keep only the 'also_viewed' entries of the exploded `related` map.
    data = product_data.select(
        F.col(asin_column),
        F.explode(F.col(related_column))
    ).filter(F.col('key') == attribute)
    # explode_outer keeps products whose also-viewed list is empty or null;
    # the exploded asins land in a column named 'col'.
    data = data.select(F.col(asin_column), F.explode_outer(F.col('value')))
    # Self-join to look up each also-viewed product's price.
    first_join = product_data.select(
        F.col(asin_column).alias("col"),
        F.col(price_column).alias("prices")
    )
    merged = data.join(first_join, on="col", how='left')
    # Count only non-null neighbors so that products with an empty
    # also-viewed list get a count of 0, which is converted to null below.
    merged = merged.groupBy(F.col(asin_column)).agg(
        F.avg('prices').alias(meanPriceAlsoViewed_column),
        F.count(F.col('col')).alias(countAlsoViewed_column)
    )
    merged = merged.withColumn(
        countAlsoViewed_column,
        F.when(F.col(countAlsoViewed_column) == 0, None)
         .otherwise(F.col(countAlsoViewed_column))
    )
    count_total = product_data.count()
    numNulls_meanPriceAlsoViewed = count_total - merged.where(
        merged[meanPriceAlsoViewed_column].isNotNull()).count()
    numNulls_countAlsoViewed = count_total - merged.where(
        merged[countAlsoViewed_column].isNotNull()).count()
    # -------------------------------------------------------------------------

    # ---------------------- Put results in res dict --------------------------
    res = {
        'count_total': None,
        'mean_meanPriceAlsoViewed': None,
        'variance_meanPriceAlsoViewed': None,
        'numNulls_meanPriceAlsoViewed': None,
        'mean_countAlsoViewed': None,
        'variance_countAlsoViewed': None,
        'numNulls_countAlsoViewed': None
    }
    # Modify res:
    res['count_total'] = count_total
    res['mean_meanPriceAlsoViewed'] = merged.select(F.avg(F.col(meanPriceAlsoViewed_column))).head()[0]
    res['variance_meanPriceAlsoViewed'] = merged.select(F.variance(F.col(meanPriceAlsoViewed_column))).head()[0]
    res['numNulls_meanPriceAlsoViewed'] = numNulls_meanPriceAlsoViewed
    res['mean_countAlsoViewed'] = merged.select(F.avg(F.col(countAlsoViewed_column))).head()[0]
    res['variance_countAlsoViewed'] = merged.select(F.variance(F.col(countAlsoViewed_column))).head()[0]
    res['numNulls_countAlsoViewed'] = numNulls_countAlsoViewed
    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_3')
    return res
    # -------------------------------------------------------------------------

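# Note: task_4 fills missing prices two ways with pyspark.ml.feature.Imputer
# (strategy='mean' and strategy='median'), and replaces null or empty titles
# with the literal string 'unknown'.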
def task_4(data_io, product_data):
    # -----------------------------Column names--------------------------------
    # Inputs:
    price_column = 'price'
    title_column = 'title'
    # Outputs:
    meanImputedPrice_column = 'meanImputedPrice'
    medianImputedPrice_column = 'medianImputedPrice'
    unknownImputedTitle_column = 'unknownImputedTitle'
    # -------------------------------------------------------------------------

    # ---------------------- Your implementation begins------------------------
    # Imputer requires a numeric column, so cast price to float first.
    casted_data = product_data.withColumn(price_column,
                                          F.col(price_column).cast(T.FloatType()))
    mean_imputer = M.feature.Imputer(strategy='mean',
                                     inputCols=[price_column],
                                     outputCols=[meanImputedPrice_column])
    mean_model = mean_imputer.fit(casted_data)
    meanImputedPrice = mean_model.transform(casted_data.select('asin', price_column))
    mean_meanImputedPrice = meanImputedPrice.select(F.avg(F.col(meanImputedPrice_column))).head()[0]
    variance_meanImputedPrice = meanImputedPrice.select(F.variance(F.col(meanImputedPrice_column))).head()[0]
    numNulls_meanImputedPrice = meanImputedPrice.filter(F.col(meanImputedPrice_column).isNull()).count()

    median_imputer = M.feature.Imputer(strategy='median',
                                       inputCols=[price_column],
                                       outputCols=[medianImputedPrice_column])
    median_model = median_imputer.fit(casted_data)
    medianImputedPrice = median_model.transform(meanImputedPrice)
    mean_medianImputedPrice = medianImputedPrice.select(F.avg(F.col(medianImputedPrice_column))).head()[0]
    variance_medianImputedPrice = medianImputedPrice.select(F.variance(F.col(medianImputedPrice_column))).head()[0]
    numNulls_medianImputedPrice = medianImputedPrice.filter(F.col(medianImputedPrice_column).isNull()).count()

    # Titles that are null or the empty string become 'unknown'.
    unknownImputedTitle = product_data.select(
        F.col(title_column).alias(unknownImputedTitle_column)
    ).fillna('unknown').replace('', 'unknown')
    numUnknowns_unknownImputedTitle = unknownImputedTitle.filter(
        F.col(unknownImputedTitle_column) == 'unknown').count()
    # -------------------------------------------------------------------------

    # ---------------------- Put results in res dict --------------------------
    res = {
        'count_total': None,
        'mean_meanImputedPrice': None,
        'variance_meanImputedPrice': None,
        'numNulls_meanImputedPrice': None,
        'mean_medianImputedPrice': None,
        'variance_medianImputedPrice': None,
        'numNulls_medianImputedPrice': None,
        'numUnknowns_unknownImputedTitle': None
    }
    # Modify res:
    res['count_total'] = medianImputedPrice.count()
    res['mean_meanImputedPrice'] = mean_meanImputedPrice
    res['variance_meanImputedPrice'] = variance_meanImputedPrice
    res['numNulls_meanImputedPrice'] = numNulls_meanImputedPrice
    res['mean_medianImputedPrice'] = mean_medianImputedPrice
    res['variance_medianImputedPrice'] = variance_medianImputedPrice
    res['numNulls_medianImputedPrice'] = numNulls_medianImputedPrice
    res['numUnknowns_unknownImputedTitle'] = numUnknowns_unknownImputedTitle
    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_4')
    return res
    # -------------------------------------------------------------------------


def task_5(data_io, product_processed_data, word_0, word_1, word_2):
    # -----------------------------Column names--------------------------------
    # Inputs:
    title_column = 'title'
    # Outputs:
    titleArray_column = 'titleArray'
    titleVector_column = 'titleVector'
    # -------------------------------------------------------------------------

    # ---------------------- Your implementation begins------------------------
    # Lowercase, then split on single spaces; F.split already returns
    # array<string>, so no further cast is needed.
    step = product_processed_data.select(title_column,
                                         F.lower(F.col(title_column)).alias('temp'))
    split = step.withColumn(titleArray_column, F.split(F.col('temp'), ' '))
    word2vec = M.feature.Word2Vec(minCount=100, vectorSize=16, seed=SEED,
                                  numPartitions=4,
                                  inputCol=titleArray_column,
                                  outputCol=titleVector_column)
    model = word2vec.fit(split)
    # -------------------------------------------------------------------------

    # ---------------------- Put results in res dict --------------------------
    res = {
        'count_total': None,
        'size_vocabulary': None,
        'word_0_synonyms': [(None, None), ],
        'word_1_synonyms': [(None, None), ],
        'word_2_synonyms': [(None, None), ]
    }
    # Modify res:
    res['count_total'] = split.count()
    res['size_vocabulary'] = model.getVectors().count()
    for name, word in zip(
        ['word_0_synonyms', 'word_1_synonyms', 'word_2_synonyms'],
        [word_0, word_1, word_2]
    ):
        res[name] = model.findSynonymsArray(word, 10)
    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_5')
    return res
    # -------------------------------------------------------------------------

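# Note: task_6 index-encodes `category`, one-hot encodes the index, and
# projects the one-hot vectors to 15 dimensions with PCA. For example, a
# category index of 2 over five distinct categories would one-hot encode
# (with dropLast=False) to the sparse vector (5, [2], [1.0]).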
def task_6(data_io, product_processed_data):
    # -----------------------------Column names--------------------------------
    # Inputs:
    category_column = 'category'
    # Outputs:
    categoryIndex_column = 'categoryIndex'
    categoryOneHot_column = 'categoryOneHot'
    categoryPCA_column = 'categoryPCA'
    # -------------------------------------------------------------------------

    # ---------------------- Your implementation begins------------------------
    count_total = product_processed_data.count()
    indexer = M.feature.StringIndexer(
        inputCol=category_column,
        outputCol=categoryIndex_column,
        handleInvalid="error",
        stringOrderType="frequencyDesc"
    ).fit(product_processed_data).transform(product_processed_data)
    # On Spark 3+ this estimator is named OneHotEncoder.
    ohe = M.feature.OneHotEncoderEstimator(
        inputCols=[categoryIndex_column],
        outputCols=[categoryOneHot_column],
        dropLast=False
    ).fit(indexer).transform(indexer)
    pca = M.feature.PCA(
        k=15,
        inputCol=categoryOneHot_column,
        outputCol=categoryPCA_column
    ).fit(ohe).transform(ohe)
    meanVector_categoryOneHot = ohe.agg(
        M.stat.Summarizer.mean(F.col(categoryOneHot_column)).alias('ohe'))
    meanVector_categoryPCA = pca.agg(
        M.stat.Summarizer.mean(F.col(categoryPCA_column)).alias('pca'))
    # -------------------------------------------------------------------------

    # ---------------------- Put results in res dict --------------------------
    res = {
        'count_total': None,
        'meanVector_categoryOneHot': [None, ],
        'meanVector_categoryPCA': [None, ]
    }
    # Modify res:
    res['count_total'] = count_total
    res['meanVector_categoryOneHot'] = meanVector_categoryOneHot.head().asDict()["ohe"]
    res['meanVector_categoryPCA'] = meanVector_categoryPCA.head().asDict()["pca"]
    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_6')
    return res
    # -------------------------------------------------------------------------


def task_7(data_io, train_data, test_data):
    # ---------------------- Your implementation begins------------------------
    tree_regression = M.regression.DecisionTreeRegressor(maxDepth=5,
                                                         featuresCol='features',
                                                         labelCol='overall')
    model = tree_regression.fit(train_data)
    predicted = model.transform(test_data)
    evaluator = M.evaluation.RegressionEvaluator(predictionCol='prediction',
                                                 labelCol='overall',
                                                 metricName='rmse')
    RMSE = evaluator.evaluate(predicted)
    # -------------------------------------------------------------------------

    # ---------------------- Put results in res dict --------------------------
    res = {
        'test_rmse': None
    }
    # Modify res:
    res['test_rmse'] = RMSE
    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_7')
    return res
    # -------------------------------------------------------------------------

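# Note: task_8 tunes maxDepth with TrainValidationSplit. The entries of
# model.validationMetrics come back in the same order as the param grid, so
# the zip over depths [5, 7, 9, 12] below lines up each validation RMSE with
# its depth.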
def task_8(data_io, train_data, test_data):
    # ---------------------- Your implementation begins------------------------
    tree_regression = M.regression.DecisionTreeRegressor(maxDepth=5,
                                                         featuresCol='features',
                                                         labelCol='overall')
    evaluator = M.evaluation.RegressionEvaluator(predictionCol='prediction',
                                                 labelCol='overall',
                                                 metricName='rmse')
    tuning_grid = M.tuning.ParamGridBuilder().addGrid(
        tree_regression.maxDepth, [5, 7, 9, 12]).build()
    tvs = M.tuning.TrainValidationSplit(estimator=tree_regression,
                                        estimatorParamMaps=tuning_grid,
                                        evaluator=evaluator,
                                        trainRatio=0.75)
    model = tvs.fit(train_data)
    best_predictions = model.bestModel.transform(test_data)
    test_RMSE = evaluator.evaluate(best_predictions)
    # -------------------------------------------------------------------------

    # ---------------------- Put results in res dict --------------------------
    res = {
        'test_rmse': None,
        'valid_rmse_depth_5': None,
        'valid_rmse_depth_7': None,
        'valid_rmse_depth_9': None,
        'valid_rmse_depth_12': None,
    }
    # Modify res:
    res['test_rmse'] = test_RMSE
    for name, rmse in zip(
        ['valid_rmse_depth_5', 'valid_rmse_depth_7',
         'valid_rmse_depth_9', 'valid_rmse_depth_12'],
        model.validationMetrics
    ):
        res[name] = rmse
    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_8')
    return res
    # -------------------------------------------------------------------------
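
# --------------------------- usage sketch ------------------------------------
# A minimal sketch of how one task might be exercised locally, assuming a
# SparkSession plus the course-provided data_io helper (whose save() method is
# used above); the file paths here are hypothetical placeholders.
#
#   from pyspark.sql import SparkSession
#   spark = SparkSession.builder.master('local[*]').getOrCreate()
#   review_data = spark.read.json('reviews.json')
#   product_data = spark.read.json('products.json')
#   res = task_1(data_io, review_data, product_data)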