ML-Classification / Classification.py
Classification.py
Raw
import sys

import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from imblearn.over_sampling import RandomOverSampler, SMOTE, ADASYN, KMeansSMOTE
from imblearn.under_sampling import RandomUnderSampler
from imblearn.pipeline import Pipeline
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, AdaBoostClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import auc, precision_recall_curve, f1_score, make_scorer
from sklearn.model_selection import GridSearchCV, train_test_split, RandomizedSearchCV
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import MinMaxScaler

n_splits = 5


def custom_auc(y_test, y_proba):
    """define scoring function"""
    precision, recall, _ = precision_recall_curve(y_test, y_proba, pos_label=1)
    return auc(recall, precision)


# to be standard sklearn's scorer
my_auc = make_scorer(custom_auc, greater_is_better=True, needs_proba=True)


def get_cv_grid(X, Y, model, model_params, scoring_metric):
    """Generic method that computes grid for the given model and its params"""

    grid = GridSearchCV(model, model_params, cv=n_splits, scoring=scoring_metric, n_jobs=-1, verbose=2)
    grid.fit(X, Y)
    return grid


def get_randomized_cv_grid(X, Y, model, model_params, scoring_metric):
    grid = RandomizedSearchCV(model, model_params, n_iter=100, cv=n_splits, verbose=2, n_jobs=-1,
                              scoring=scoring_metric)
    grid.fit(X, Y)
    return grid


def print_best_model_cv_results(grid, model_name, f1_value, auc_value):
    """Formats the computed grid in a tabular format"""

    print()
    print(model_name)
    print('-' * 50)
    print('Best params: {}'.format(grid.best_params_))
    print('Best Model: {}'.format(grid.best_estimator_))
    print('CV Results :-')
    print("{:<5} {:<10}".format('Split', 'Test Score'))
    for i in range(grid.n_splits_):
        print('Fold{} {:<10}'.format(i + 1, grid.cv_results_['split{}_test_score'.format(i)][grid.best_index_]))
    print('Mean test score: {}'.format(grid.cv_results_['mean_test_score'][grid.best_index_]))
    print('Standard Deviation: {}'.format(grid.cv_results_['std_test_score'][grid.best_index_]))
    print('f1=%.3f auc=%.3f' % (f1_value, auc_value))
    print('-' * 50)


def fit_logistic_reg(x_train, y_train, x_test, y_test):
    model = LogisticRegression(max_iter=1000)
    model.fit(x_train, y_train)
    # predict probabilities
    probs = model.predict_proba(x_test)
    probs = probs[:, 1]
    # predict class values
    y_pred = model.predict(x_test)
    precision, recall, _ = precision_recall_curve(y_test, probs)
    f1_value, auc_value = f1_score(y_test, y_pred), auc(recall, precision)
    # summarize scores
    print('Logistic: f1=%.3f auc=%.3f' % (f1_value, auc_value))
    # plot the precision-recall curves
    # plt.plot(recall, precision, marker='.', label='Logistic')


def fit_gradient_boosting(x_train, y_train, x_test, y_test):
    # Create the param grid
    # param_grid = {'n_estimators': [100, 200, 300],
    #               'max_depth': [10, 20, 30],
    #               'learning_rate': [0.1, 0.5, 1]}
    #
    # model = GradientBoostingClassifier()
    # grid = get_cv_grid(x_train, y_train, model, param_grid, my_auc)
    # model = grid.best_estimator_

    # Hard coding the best model to reduce time taken to run this script
    # To find the best hyper parameters uncomment the above code and run this script
    model = GradientBoostingClassifier(learning_rate=0.5, max_depth=10)
    model.fit(x_train, y_train)
    # predict probabilities
    probs = model.predict_proba(x_test)
    probs = probs[:, 1]
    # predict class values
    y_pred = model.predict(x_test)
    precision, recall, _ = precision_recall_curve(y_test, probs)
    f1_value, auc_value = f1_score(y_test, y_pred), auc(recall, precision)
    # summarize scores
    # print_best_model_cv_results(grid, 'Gradient Boosting', f1_value, auc_value)
    plt.plot(recall, precision, marker='.', label='Gradient Boosting (auc={})'.format(round(auc_value, 3)))


def fit_random_forests(x_train, y_train, x_test, y_test):
    # # Create the param grid
    # param_grid = {'n_estimators': [int(x) for x in np.linspace(start=100, stop=500, num=5)],
    #               'max_depth': [int(x) for x in np.linspace(10, 50, num=5)],
    #               'model__min_samples_split': [2, 4],
    #               'model__min_samples_leaf': [1, 2]
    #               }
    #
    # model = RandomForestClassifier()
    # grid = get_cv_grid(x_train, y_train, model, param_grid, my_auc)
    # model = grid.best_estimator_

    # Hard coding the best model to reduce time taken to run this script
    # To find the best hyper parameters uncomment the above code and run this script
    model = RandomForestClassifier(max_depth=30, min_samples_split=4, n_estimators=300)

    model.fit(x_train, y_train)
    # predict probabilities
    probs = model.predict_proba(x_test)
    probs = probs[:, 1]
    # predict class values
    y_pred = model.predict(x_test)
    precision, recall, _ = precision_recall_curve(y_test, probs)
    f1_value, auc_value = f1_score(y_test, y_pred), auc(recall, precision)
    # summarize scores
    # print_best_model_cv_results(grid, 'Random Forest', f1_value, auc_value)
    plt.plot(recall, precision, marker='.', label='Random Forest (auc={})'.format(round(auc_value, 3)))


def fit_adaboost(x_train, y_train, x_test, y_test):
    # Create the param grid
    param_grid = {'model__n_estimators': [50, 100, 200, 300],
                  'model__learning_rate': [0.01, 0.1, 0.3, 0.6, 1]}

    model = AdaBoostClassifier()
    # define pipeline
    # steps = [('under', RandomUnderSampler()), ('model', AdaBoostClassifier())]
    # steps = [('over', RandomOverSampler(sampling_strategy=0.8)), ('model', AdaBoostClassifier())]
    steps = [('over', RandomOverSampler(sampling_strategy=0.1), ('under', RandomUnderSampler(sampling_strategy=0.5))),
             ('model', AdaBoostClassifier())]

    # steps = [('smote', SMOTE()), ('model', AdaBoostClassifier())]
    # steps = [('smote', ADASYN()), ('model', AdaBoostClassifier())]
    model = Pipeline(steps=steps)
    grid = get_cv_grid(x_train, y_train, model, param_grid, my_auc)
    model = grid.best_estimator_

    # Hard coding the best model to reduce time taken to run this script
    # To find the best hyper parameters uncomment the above code and run this script
    # model = GradientBoostingClassifier(learning_rate=0.5, max_depth=10)
    model.fit(x_train, y_train)
    # predict probabilities
    probs = model.predict_proba(x_test)
    probs = probs[:, 1]
    # predict class values
    y_pred = model.predict(x_test)
    precision, recall, _ = precision_recall_curve(y_test, probs)
    f1_value, auc_value = f1_score(y_test, y_pred), auc(recall, precision)
    # summarize scores
    print_best_model_cv_results(grid, 'AdaBoost', f1_value, auc_value)
    plt.plot(recall, precision, marker='.', label='AdaBoost (auc={})'.format(round(auc_value, 3)))


def fit_svm(x_train, y_train, x_test, y_test):
    # param_grid = [{'model__C': [0.1, 1, 10, 100, 1000],
    #                'model__gamma': [1, 0.1, 0.01, 0.001, 0.0001],
    #                'model__kernel': ['rbf']},
    #               {'model__C': [0.1, 1, 10, 100, 1000],
    #                'model__gamma': [1, 0.1, 0.01, 0.001, 0.0001],
    #                'model__kernel': ['poly'],
    #                'model__degree': [2, 3, 4]}]
    #
    # # define pipeline
    # steps = [('over', RandomOverSampler()), ('model', SVC(probability=True))]
    # pipeline = Pipeline(steps=steps)
    #
    # grid = get_cv_grid(x_train, y_train, pipeline, param_grid, my_auc)
    # model = grid.best_estimator_

    # Hard coding the best model to reduce time taken to run this script
    # To find the best hyper parameters uncomment the above code and run this script
    model = Pipeline(steps=[('over', RandomOverSampler(sampling_strategy=0.8)),
                            ('model', SVC(C=1000, gamma=0.01, probability=True))])

    model.fit(x_train, y_train)
    # predict probabilities
    probs = model.predict_proba(x_test)
    probs = probs[:, 1]
    # predict class values
    y_pred = model.predict(x_test)
    precision, recall, _ = precision_recall_curve(y_test, probs)
    f1_value, auc_value = f1_score(y_test, y_pred), auc(recall, precision)
    # summarize scores
    # print_best_model_cv_results(grid, 'SVM', f1_value, auc_value)
    # plot the precision-recall curves
    plt.plot(recall, precision, marker='.', label='SVM (auc={})'.format(round(auc_value, 3)))


def fit_MLP(x_train, y_train, x_test, y_test):
    # param_grid = {
    #     'hidden_layer_sizes': [(10,30,10),(20,)],
    #     'activation': ['tanh', 'relu'],
    #     'solver': ['sgd', 'adam'],
    #     'alpha': [0.0001, 0.05],
    #     'learning_rate': ['constant','adaptive'],
    # }
    #
    # model = MLPClassifier(max_iter=2000)
    # grid = get_cv_grid(x_train, y_train, model, param_grid, my_auc)
    # model = grid.best_estimator_

    # Hard coding the best model to reduce time taken to run this script
    # To find the best hyper parameters uncomment the above code and run this script
    model = MLPClassifier(alpha=0.05, hidden_layer_sizes=(20,), max_iter=2000)

    model.fit(x_train, y_train)
    # predict probabilities
    probs = model.predict_proba(x_test)
    probs = probs[:, 1]
    # predict class values
    y_pred = model.predict(x_test)
    precision, recall, _ = precision_recall_curve(y_test, probs)
    f1_value, auc_value = f1_score(y_test, y_pred), auc(recall, precision)
    # summarize scores
    # print_best_model_cv_results(grid, 'Multi Layer Perceptron', f1_value, auc_value)
    # plot the precision-recall curves
    plt.plot(recall, precision, marker='.', label='Multi Layer Perceptron (auc={})'.format(round(auc_value, 3)))


def main(args):
    # Read the input data
    input_data = pd.read_csv(args[0], sep='\t', header=None)
    output_X = pd.read_csv(args[1], sep='\t')

    Y = input_data[71]
    X = input_data.drop(71, axis=1)

    # Normalize the 1st, 2nd, 3rd, 4th, and 6th columns whose scales are different from the rest
    scaler = MinMaxScaler()
    scaled = pd.DataFrame(scaler.fit_transform(X[[0, 1, 2, 3, 5]]))
    X[0], X[1], X[2], X[3], X[5] = scaled[0], scaled[1], scaled[2], scaled[3], scaled[4]

    x_train, x_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=1)

    # fit_logistic_reg(x_train, y_train, x_test, y_test)
    fit_random_forests(x_train, y_train, x_test, y_test)
    fit_svm(x_train, y_train, x_test, y_test)
    fit_gradient_boosting(x_train, y_train, x_test, y_test)
    fit_MLP(x_train, y_train, x_test, y_test)
    # fit_adaboost(x_train, y_train, x_test, y_test)

    # Best Model
    model = RandomForestClassifier(max_depth=30, min_samples_split=4, n_estimators=300)
    model.fit(X, Y)
    y_output_prob = model.predict_proba(output_X)
    np.savetxt("A5_predictions_group22.txt", y_output_prob[:, 1], delimiter=",")

    # plot the precision-recall curves
    no_skill = len(y_test[y_test == 1]) / len(y_test)
    plt.plot([0, 1], [no_skill, no_skill], linestyle='--', label='No Skill')
    # axis labels
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision Recall Curve')
    # show the legend
    plt.legend()
    # show the plot
    plt.show()


if __name__ == '__main__':
    main(sys.argv[1:])