# fake-news-mlt / evaluation_testing.py
# evaluation_testing.py
# Raw
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer, TfidfVectorizer
from sklearn import metrics
from sklearn.feature_selection import SelectFromModel
from sklearn.linear_model import PassiveAggressiveClassifier, Perceptron, RidgeClassifier, SGDClassifier
from sklearn.naive_bayes import BernoulliNB, ComplementNB, MultinomialNB
from sklearn.neighbors import KNeighborsClassifier, NearestCentroid
from sklearn.pipeline import Pipeline
from sklearn.svm import LinearSVC
from itertools import product

from main import pd, test_set
from ensemble_voter import EnsembleVoter

# test many classifiers on a given set of data
# modified from https://scikit-learn.org/stable/auto_examples/text/plot_document_classification_20newsgroups.html
def test_classifiers(X_train, y_train, X_test, y_test):
    """Benchmark a fixed roster of scikit-learn classifiers on TF-IDF features.

    A ``TfidfVectorizer`` is fitted on ``X_train`` and then applied to
    ``X_test``. Each classifier in the roster is fitted on the training
    matrix and scored on the test matrix.

    Args:
        X_train: Training documents passed to ``TfidfVectorizer.fit_transform``.
        y_train: Labels for the training documents.
        X_test: Test documents passed to ``TfidfVectorizer.transform``.
        y_test: Labels for the test documents; micro-averaged F1 is computed
            over the labels "real" and "fake".

    Returns:
        A list of ``(name, accuracy, micro_f1)`` tuples in benchmark order,
        keeping only the runs whose micro-F1 is strictly greater than 0.75.
        ``name`` is the text of ``str(clf)`` up to the first "(".
    """
    tfidf = TfidfVectorizer()
    train_matrix = tfidf.fit_transform(X_train)
    test_matrix = tfidf.transform(X_test)

    def evaluate(model):
        # Fit on the training matrix, then score the predictions for the test matrix.
        model.fit(train_matrix, y_train)
        predicted = model.predict(test_matrix)
        accuracy = metrics.accuracy_score(y_test, predicted)
        micro_f1 = metrics.f1_score(y_test, predicted, average="micro", labels=["real", "fake"])
        label, _, _ = str(model).partition("(")
        return label, accuracy, micro_f1

    def roster():
        # Yield the classifiers one at a time, in the order they are benchmarked.
        yield RidgeClassifier(tol=1e-2, solver="sag")           # Ridge Classifier
        yield Perceptron(max_iter=50)                           # Perceptron
        yield PassiveAggressiveClassifier(max_iter=50)          # Passive-Aggressive
        yield KNeighborsClassifier(n_neighbors=10)              # kNN
        yield RandomForestClassifier()                          # Random forest

        for penalty in ("l2", "l1"):
            # Liblinear model, then SGD model, for each penalty.
            yield LinearSVC(penalty=penalty, dual=False, tol=1e-3)
            yield SGDClassifier(alpha=0.0001, max_iter=50, penalty=penalty)

        # SGD with Elastic Net penalty.
        yield SGDClassifier(alpha=0.0001, max_iter=50, penalty="elasticnet")

        # NearestCentroid without threshold.
        yield NearestCentroid()

        # Sparse Naive Bayes classifiers.
        yield MultinomialNB()
        yield BernoulliNB()
        yield ComplementNB()

        # L1-penalised LinearSVC used as a feature selector in front of an
        # L2-penalised LinearSVC. The smaller C, the stronger the
        # regularization; the more regularization, the more sparsity.
        selector = SelectFromModel(LinearSVC(penalty="l1", dual=False, tol=1e-3))
        yield Pipeline(
            [
                ("feature_selection", selector),
                ("classification", LinearSVC(penalty="l2")),
            ]
        )

    outcomes = [evaluate(model) for model in roster()]
    return [outcome for outcome in outcomes if outcome[2] > 0.75]


# test different combinations of preprocessing options with all classifiers to find best
def test_config_combinations():
    """Try every on/off combination of the preprocessing switches.

    For each combination of the "url", "punctuation", "emoji", "contract" and
    "stops" flags, the module-level training frame (``pd``) and ``test_set``
    are run through ``EnsembleVoter.process_dataframe`` and handed to
    ``test_classifiers``. The best micro-F1 seen for each classifier name,
    with the configuration that produced it, is printed in descending
    score order.
    """
    switches = ("url", "punctuation", "emoji", "contract", "stops")
    configs = [
        dict(zip(switches, flags))
        for flags in product((True, False), repeat=len(switches))
    ]

    # One (scores, config) pair per preprocessing configuration.
    runs = []
    for index, config in enumerate(configs):
        print(f"Trying combo {index}")
        training = EnsembleVoter.process_dataframe(pd, **config)
        test = EnsembleVoter.process_dataframe(test_set, **config)
        scores = test_classifiers(
            training["processedTweets"],
            training["label"],
            test["processedTweets"],
            test["label"],
        )
        runs.append((scores, config))

    # Keep the highest micro-F1 (and its configuration) per classifier name.
    best_per_classifier = {}
    for scores, config in runs:
        for name, _, f1_micro in scores:
            current_best = best_per_classifier.get(name, (0, {}))[0]
            if current_best < f1_micro:
                best_per_classifier[name] = (f1_micro, config)

    ranking = sorted(best_per_classifier.items(), key=lambda item: item[1][0], reverse=True)
    for name, (f1_micro, config) in ranking:
        print(f"{name}: {round(f1_micro, 3)} - {config}")

def tune_multinomial(training, testing):
    """Grid-search a CountVectorizer -> TfidfTransformer -> MultinomialNB pipeline.

    Both frames are first preprocessed with
    ``EnsembleVoter.process_dataframe(..., url=False, punctuation=False,
    emoji=False)``. Every combination of the smoothing, TF-IDF and count
    vectorizer settings is then fitted on ``training`` and scored on
    ``testing`` with micro-averaged F1 over the labels "real" and "fake".
    Each combination is printed as it is tried; the best score and the
    settings that produced it are printed at the end.

    Args:
        training: Data handed to ``EnsembleVoter.process_dataframe`` for fitting.
        testing: Data handed to ``EnsembleVoter.process_dataframe`` for scoring.
    """
    def expand(grid):
        # Turn {name: candidates} into one dict per combination of candidates.
        return [dict(zip(grid, choice)) for choice in product(*grid.values())]

    preprocessing = {"url": False, "punctuation": False, "emoji": False}
    training = EnsembleVoter.process_dataframe(training, **preprocessing)
    testing = EnsembleVoter.process_dataframe(testing, **preprocessing)

    count_grid = expand({
        "ngram_range": [(1, 1), (1, 2), (1, 3)],
        "max_df": (0.5, 0.75, 1.0),
    })
    tfidf_grid = expand({
        'use_idf': (True, False),
        'norm': ('l1', 'l2'),
    })
    bayes_grid = expand({"alpha": (1, 0.001, 0.0001, 0.00001, 0.000001)})

    best_f1, best_combos = 0, ()
    for bayes_kwargs in bayes_grid:
        print(bayes_kwargs)
        for tfidf_kwargs in tfidf_grid:
            print(tfidf_kwargs)
            for count_kwargs in count_grid:
                print(count_kwargs)
                steps = [
                    ('vect', CountVectorizer(**count_kwargs)),
                    ('tfidf', TfidfTransformer(**tfidf_kwargs)),
                    ('clf', MultinomialNB(**bayes_kwargs)),
                ]
                estimator = Pipeline(steps)
                estimator.fit(training["processedTweets"], training["label"])
                predictions = estimator.predict(testing["processedTweets"])
                f1 = metrics.f1_score(
                    testing["label"], predictions, average="micro", labels=["real", "fake"]
                )
                # Strict comparison: the first combination to reach a score wins ties.
                if f1 > best_f1:
                    best_f1, best_combos = f1, (bayes_kwargs, tfidf_kwargs, count_kwargs)
    print(best_f1)
    print(best_combos)


def tune_clf(clf, clf_params, tf_params, training, testing, dataset_kwargs):
    """Grid-search a TfidfVectorizer -> classifier pipeline.

    Both frames are preprocessed with
    ``EnsembleVoter.process_dataframe(..., **dataset_kwargs)``. Every
    combination of classifier and vectorizer settings is fitted on
    ``training`` and scored on ``testing`` with micro-averaged F1 over the
    labels "real" and "fake". Each combination is printed as it is tried; the
    best score and the settings that produced it are printed at the end.

    Args:
        clf: Classifier class (or factory) called as ``clf(**settings)``.
        clf_params: Mapping of classifier keyword name to candidate values.
        tf_params: Mapping of ``TfidfVectorizer`` keyword name to candidate values.
        training: Data handed to ``EnsembleVoter.process_dataframe`` for fitting.
        testing: Data handed to ``EnsembleVoter.process_dataframe`` for scoring.
        dataset_kwargs: Keyword arguments forwarded to ``process_dataframe``.
    """
    def expand(grid):
        # Turn {name: candidates} into one dict per combination of candidates.
        return [dict(zip(grid, choice)) for choice in product(*grid.values())]

    training = EnsembleVoter.process_dataframe(training, **dataset_kwargs)
    testing = EnsembleVoter.process_dataframe(testing, **dataset_kwargs)

    vectorizer_grid = expand(tf_params)
    classifier_grid = expand(clf_params)

    best_f1, best_combos = 0, ()
    for classifier_kwargs in classifier_grid:
        print(classifier_kwargs)
        for vectorizer_kwargs in vectorizer_grid:
            print(vectorizer_kwargs)
            steps = [
                ('tfidf', TfidfVectorizer(**vectorizer_kwargs)),
                ('clf', clf(**classifier_kwargs)),
            ]
            estimator = Pipeline(steps)
            estimator.fit(training["processedTweets"], training["label"])
            predictions = estimator.predict(testing["processedTweets"])
            f1 = metrics.f1_score(
                testing["label"], predictions, average="micro", labels=["real", "fake"]
            )
            # Strict comparison: the first combination to reach a score wins ties.
            if f1 > best_f1:
                best_f1, best_combos = f1, (classifier_kwargs, vectorizer_kwargs)
    print(best_f1)
    print(best_combos)

if __name__ == "__main__":
    # Script entry point: runs one tuning experiment at a time. The active
    # call is the kNN search; the other searches are kept as commented-out
    # calls next to their parameter grids and the result recorded for them.

    # test knn
    # Recorded result:
    # 0.8687795010114634
    # ({'weights': 'uniform', 'n_neighbors': 8, 'n_jobs': -1}, {'use_idf': True, 'norm': 'l2', 'ngram_range': (1, 2), 'max_df': 0.75})
    knn_params = {"weights": ("uniform",),
                  "n_neighbors": (7, 8, 9),
                  "n_jobs": (-1,),
                  "p": (1, 2)}
    # TfidfVectorizer grid shared by every tuning call below.
    # max_df must be given as floats: TfidfVectorizer treats a float as a
    # proportion of documents but an int as an absolute document count, so the
    # integer 1 would drop every term occurring in more than one document
    # instead of meaning "no upper limit" (1.0).
    knn_tf_params = {'use_idf': (True, False),
                     'norm': ('l2', 'l1'),
                     "max_df": (0.7, 0.75, 0.8, 1.0)
                     }
    tune_clf(KNeighborsClassifier, knn_params, knn_tf_params, pd, test_set, {"emoji": False})

    # tune multinomial
    multinomial_params = {"alpha": (1, 0.001, 0.0001, 0.00001, 0.000001)}
    # tune_clf(MultinomialNB, multinomial_params, knn_tf_params, pd, test_set, {'url': False, 'punctuation': False, 'emoji': False})

    # tune perceptron
    # Recorded result:
    # 0.8755259942989004
    # ({'penalty': 'l2', 'alpha': 0.0001, 'n_jobs': -1, 'max_iter': 50}, {'use_idf': True, 'norm': 'l1', 'ngram_range': (1, 1), 'max_df': 0.5})
    perceptron_params = {"penalty": ("l2", "l1", "elasticnet"),
                         "alpha": (0.0001, 0.00001, 0.001, 0.1),
                         "n_jobs": (-1,),
                         "max_iter": (50, 100, 200)}
    # tune_clf(Perceptron, perceptron_params, knn_tf_params, pd, test_set, {})

    # tune passive aggressive classifier
    # Recorded result:
    # 0.8310415248468347
    # ({'C': 1, 'max_iter': 50, 'n_jobs': -1, 'fit_intercept': True}, {'use_idf': True, 'norm': 'l2', 'ngram_range': (1, 1), 'max_df': 1.0})
    passive_params = {"C": (0.1, 0.5, 1, 2),
                      "max_iter": (50, 100, 200),
                      "n_jobs": (-1,),
                      "fit_intercept": (True, False)}
    # tune_clf(PassiveAggressiveClassifier, passive_params, knn_tf_params, pd, test_set, {})

    # tune linearsvc
    # Recorded result:
    # 0.8267022696929239
    # ({'C': 2, 'penalty': 'l1', 'fit_intercept': True, 'max_iter': 50, 'dual': False}, {'use_idf': True, 'norm': 'l1', 'ngram_range': (1, 1), 'max_df': 1.0})
    svc_params = {"C": (0.5, 1, 2),
                  "penalty": ("l1", "l2"),
                  "fit_intercept": (True, False),
                  "max_iter": (50, 100, 200),
                  "dual": (False,)}
    # tune_clf(LinearSVC, svc_params, knn_tf_params, pd, test_set, {"punctuation": False})