import sys

import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from imblearn.over_sampling import RandomOverSampler, SMOTE, ADASYN
from imblearn.under_sampling import RandomUnderSampler
from imblearn.pipeline import Pipeline
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, AdaBoostClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import auc, precision_recall_curve, f1_score, make_scorer
from sklearn.model_selection import GridSearchCV, train_test_split, RandomizedSearchCV
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import MinMaxScaler

n_splits = 5


def custom_auc(y_test, y_proba):
    """Area under the precision-recall curve (PR-AUC) for the positive class."""
    precision, recall, _ = precision_recall_curve(y_test, y_proba, pos_label=1)
    return auc(recall, precision)


# Wrap the function as a standard sklearn scorer so it can be passed to the grid searches.
# (Newer scikit-learn versions replace needs_proba with response_method="predict_proba".)
my_auc = make_scorer(custom_auc, greater_is_better=True, needs_proba=True)
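
# The scorer can also be used on its own for a quick cross-validated sanity check of a model
# before running a full grid search (a minimal sketch; the LogisticRegression settings are
# illustrative and X, Y refer to the feature matrix and labels built in main()):
#
# from sklearn.model_selection import cross_val_score
# scores = cross_val_score(LogisticRegression(max_iter=1000), X, Y, cv=n_splits, scoring=my_auc)
# print('PR-AUC per fold: {}, mean: {:.3f}'.format(scores, scores.mean()))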


def get_cv_grid(X, Y, model, model_params, scoring_metric):
    """Run an exhaustive grid search for the given model and parameter grid."""
    grid = GridSearchCV(model, model_params, cv=n_splits, scoring=scoring_metric, n_jobs=-1, verbose=2)
    grid.fit(X, Y)
    return grid


def get_randomized_cv_grid(X, Y, model, model_params, scoring_metric):
    """Run a randomized search over the given parameter grid (cheaper than a full grid search)."""
    grid = RandomizedSearchCV(model, model_params, n_iter=100, cv=n_splits, verbose=2, n_jobs=-1,
                              scoring=scoring_metric)
    grid.fit(X, Y)
    return grid


def print_best_model_cv_results(grid, model_name, f1_value, auc_value):
    """Print the best model of a fitted grid search and its per-fold CV scores in a tabular format."""
    print()
    print(model_name)
    print('-' * 50)
    print('Best params: {}'.format(grid.best_params_))
    print('Best model: {}'.format(grid.best_estimator_))
    print('CV results:')
    print('{:<5} {:<10}'.format('Split', 'Test Score'))
    for i in range(grid.n_splits_):
        print('Fold{} {:<10}'.format(i + 1, grid.cv_results_['split{}_test_score'.format(i)][grid.best_index_]))
    print('Mean test score: {}'.format(grid.cv_results_['mean_test_score'][grid.best_index_]))
    print('Standard deviation: {}'.format(grid.cv_results_['std_test_score'][grid.best_index_]))
    print('f1=%.3f auc=%.3f' % (f1_value, auc_value))
    print('-' * 50)


def fit_logistic_reg(x_train, y_train, x_test, y_test):
    model = LogisticRegression(max_iter=1000)
    model.fit(x_train, y_train)
    # predict probabilities for the positive class
    probs = model.predict_proba(x_test)[:, 1]
    # predict class values
    y_pred = model.predict(x_test)
    precision, recall, _ = precision_recall_curve(y_test, probs)
    f1_value, auc_value = f1_score(y_test, y_pred), auc(recall, precision)
    # summarize scores
    print('Logistic: f1=%.3f auc=%.3f' % (f1_value, auc_value))
    # plot the precision-recall curve
    # plt.plot(recall, precision, marker='.', label='Logistic')


def fit_gradient_boosting(x_train, y_train, x_test, y_test):
    # Create the param grid
    # param_grid = {'n_estimators': [100, 200, 300],
    #               'max_depth': [10, 20, 30],
    #               'learning_rate': [0.1, 0.5, 1]}
    #
    # model = GradientBoostingClassifier()
    # grid = get_cv_grid(x_train, y_train, model, param_grid, my_auc)
    # model = grid.best_estimator_

    # Hard-coding the best model to reduce the time taken to run this script.
    # To find the best hyperparameters, uncomment the code above and rerun.
    model = GradientBoostingClassifier(learning_rate=0.5, max_depth=10)
    model.fit(x_train, y_train)
    # predict probabilities for the positive class
    probs = model.predict_proba(x_test)[:, 1]
    # predict class values
    y_pred = model.predict(x_test)
    precision, recall, _ = precision_recall_curve(y_test, probs)
    f1_value, auc_value = f1_score(y_test, y_pred), auc(recall, precision)
    # summarize scores
    # print_best_model_cv_results(grid, 'Gradient Boosting', f1_value, auc_value)
    plt.plot(recall, precision, marker='.', label='Gradient Boosting (auc={})'.format(round(auc_value, 3)))


def fit_random_forests(x_train, y_train, x_test, y_test):
    # Create the param grid
    # param_grid = {'n_estimators': [int(x) for x in np.linspace(start=100, stop=500, num=5)],
    #               'max_depth': [int(x) for x in np.linspace(10, 50, num=5)],
    #               'min_samples_split': [2, 4],
    #               'min_samples_leaf': [1, 2]}
    #
    # model = RandomForestClassifier()
    # grid = get_cv_grid(x_train, y_train, model, param_grid, my_auc)
    # model = grid.best_estimator_

    # Hard-coding the best model to reduce the time taken to run this script.
    # To find the best hyperparameters, uncomment the code above and rerun.
    model = RandomForestClassifier(max_depth=30, min_samples_split=4, n_estimators=300)
    model.fit(x_train, y_train)
    # predict probabilities for the positive class
    probs = model.predict_proba(x_test)[:, 1]
    # predict class values
    y_pred = model.predict(x_test)
    precision, recall, _ = precision_recall_curve(y_test, probs)
    f1_value, auc_value = f1_score(y_test, y_pred), auc(recall, precision)
    # summarize scores
    # print_best_model_cv_results(grid, 'Random Forest', f1_value, auc_value)
    plt.plot(recall, precision, marker='.', label='Random Forest (auc={})'.format(round(auc_value, 3)))


def fit_adaboost(x_train, y_train, x_test, y_test):
    # Create the param grid (the 'model__' prefix targets the classifier step of the pipeline)
    param_grid = {'model__n_estimators': [50, 100, 200, 300],
                  'model__learning_rate': [0.01, 0.1, 0.3, 0.6, 1]}

    # Define the resampling pipeline; the alternatives tried are left commented out.
    # steps = [('under', RandomUnderSampler()), ('model', AdaBoostClassifier())]
    # steps = [('over', RandomOverSampler(sampling_strategy=0.8)), ('model', AdaBoostClassifier())]
    # steps = [('smote', SMOTE()), ('model', AdaBoostClassifier())]
    # steps = [('smote', ADASYN()), ('model', AdaBoostClassifier())]
    steps = [('over', RandomOverSampler(sampling_strategy=0.1)),
             ('under', RandomUnderSampler(sampling_strategy=0.5)),
             ('model', AdaBoostClassifier())]
    model = Pipeline(steps=steps)

    grid = get_cv_grid(x_train, y_train, model, param_grid, my_auc)
    model = grid.best_estimator_
    model.fit(x_train, y_train)
    # predict probabilities for the positive class
    probs = model.predict_proba(x_test)[:, 1]
    # predict class values
    y_pred = model.predict(x_test)
    precision, recall, _ = precision_recall_curve(y_test, probs)
    f1_value, auc_value = f1_score(y_test, y_pred), auc(recall, precision)
    # summarize scores
    print_best_model_cv_results(grid, 'AdaBoost', f1_value, auc_value)
    plt.plot(recall, precision, marker='.', label='AdaBoost (auc={})'.format(round(auc_value, 3)))
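
# The resampling ratios can be tuned in the same grid search by addressing the pipeline steps
# by name (a sketch; the candidate values below are illustrative, not the ones used here):
#
# param_grid = {'over__sampling_strategy': [0.1, 0.2, 0.3],
#               'under__sampling_strategy': [0.5, 0.7, 1.0],
#               'model__n_estimators': [50, 100, 200, 300],
#               'model__learning_rate': [0.01, 0.1, 0.3, 0.6, 1]}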


def fit_svm(x_train, y_train, x_test, y_test):
    # param_grid = [{'model__C': [0.1, 1, 10, 100, 1000],
    #                'model__gamma': [1, 0.1, 0.01, 0.001, 0.0001],
    #                'model__kernel': ['rbf']},
    #               {'model__C': [0.1, 1, 10, 100, 1000],
    #                'model__gamma': [1, 0.1, 0.01, 0.001, 0.0001],
    #                'model__kernel': ['poly'],
    #                'model__degree': [2, 3, 4]}]
    #
    # # define pipeline
    # steps = [('over', RandomOverSampler()), ('model', SVC(probability=True))]
    # pipeline = Pipeline(steps=steps)
    #
    # grid = get_cv_grid(x_train, y_train, pipeline, param_grid, my_auc)
    # model = grid.best_estimator_

    # Hard-coding the best model to reduce the time taken to run this script.
    # To find the best hyperparameters, uncomment the code above and rerun.
    model = Pipeline(steps=[('over', RandomOverSampler(sampling_strategy=0.8)),
                            ('model', SVC(C=1000, gamma=0.01, probability=True))])
    model.fit(x_train, y_train)
    # predict probabilities for the positive class
    probs = model.predict_proba(x_test)[:, 1]
    # predict class values
    y_pred = model.predict(x_test)
    precision, recall, _ = precision_recall_curve(y_test, probs)
    f1_value, auc_value = f1_score(y_test, y_pred), auc(recall, precision)
    # summarize scores
    # print_best_model_cv_results(grid, 'SVM', f1_value, auc_value)
    # plot the precision-recall curve
    plt.plot(recall, precision, marker='.', label='SVM (auc={})'.format(round(auc_value, 3)))


def fit_MLP(x_train, y_train, x_test, y_test):
    # param_grid = {'hidden_layer_sizes': [(10, 30, 10), (20,)],
    #               'activation': ['tanh', 'relu'],
    #               'solver': ['sgd', 'adam'],
    #               'alpha': [0.0001, 0.05],
    #               'learning_rate': ['constant', 'adaptive']}
    #
    # model = MLPClassifier(max_iter=2000)
    # grid = get_cv_grid(x_train, y_train, model, param_grid, my_auc)
    # model = grid.best_estimator_

    # Hard-coding the best model to reduce the time taken to run this script.
    # To find the best hyperparameters, uncomment the code above and rerun.
    model = MLPClassifier(alpha=0.05, hidden_layer_sizes=(20,), max_iter=2000)
    model.fit(x_train, y_train)
    # predict probabilities for the positive class
    probs = model.predict_proba(x_test)[:, 1]
    # predict class values
    y_pred = model.predict(x_test)
    precision, recall, _ = precision_recall_curve(y_test, probs)
    f1_value, auc_value = f1_score(y_test, y_pred), auc(recall, precision)
    # summarize scores
    # print_best_model_cv_results(grid, 'Multi Layer Perceptron', f1_value, auc_value)
    # plot the precision-recall curve
    plt.plot(recall, precision, marker='.', label='Multi Layer Perceptron (auc={})'.format(round(auc_value, 3)))


def main(args):
    # Read the labelled input data and the unlabelled data to predict on
    input_data = pd.read_csv(args[0], sep='\t', header=None)
    output_X = pd.read_csv(args[1], sep='\t')
    Y = input_data[71]
    X = input_data.drop(71, axis=1)

    # Normalize columns 0, 1, 2, 3 and 5 (the 1st, 2nd, 3rd, 4th and 6th), whose scales differ from the rest
    scaler = MinMaxScaler()
    scaled = pd.DataFrame(scaler.fit_transform(X[[0, 1, 2, 3, 5]]))
    X[0], X[1], X[2], X[3], X[5] = scaled[0], scaled[1], scaled[2], scaled[3], scaled[4]

    x_train, x_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=1)

    # fit_logistic_reg(x_train, y_train, x_test, y_test)
    fit_random_forests(x_train, y_train, x_test, y_test)
    fit_svm(x_train, y_train, x_test, y_test)
    fit_gradient_boosting(x_train, y_train, x_test, y_test)
    fit_MLP(x_train, y_train, x_test, y_test)
    # fit_adaboost(x_train, y_train, x_test, y_test)

    # Refit the best model on the full data set and write out its predicted probabilities
    model = RandomForestClassifier(max_depth=30, min_samples_split=4, n_estimators=300)
    model.fit(X, Y)
    y_output_prob = model.predict_proba(output_X)
    np.savetxt("A5_predictions_group22.txt", y_output_prob[:, 1], delimiter=",")

    # plot the no-skill baseline of the precision-recall curves
    no_skill = len(y_test[y_test == 1]) / len(y_test)
    plt.plot([0, 1], [no_skill, no_skill], linestyle='--', label='No Skill')
    # axis labels
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision Recall Curve')
    # show the legend
    plt.legend()
    # show the plot
    plt.show()


if __name__ == '__main__':
    main(sys.argv[1:])
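
# Example invocation (a sketch; the file names are illustrative -- the first argument is the
# tab-separated labelled training file, the second the tab-separated file to predict on):
#
#   python classifier_comparison.py training_data.tsv unlabelled_data.tsv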