# Unsupervised-ML / Scenario3 / Test-MLP-AE / test_dataset_3_mlp_ae_anomalies_detection.py
# (Header lines left over from the web view of the repository, kept here as a comment.)

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf

from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, losses
from tensorflow.keras.models import Model

"""# I. Import .CSV data"""

# Colab-only step: open the interactive upload prompt. The return value is
# stored in `setdate` but the CSV is read back by file name below.
# NOTE(review): presumably "nofaults_AE.csv" is the file to upload here — confirm.
from google.colab import files
setdate = files.upload()

# Read the fault-free recording from the working directory.

# Keep both the DataFrame (plotted by column name later) and its raw values
# (sliced into features and labels later).
no_faults_dataframe = pd.read_csv("nofaults_AE.csv")
raw_nofaults_data = no_faults_dataframe.values
# Notebook preview; the result is discarded when this runs as a plain script.
no_faults_dataframe.head()

# Second upload prompt; `setdate` is overwritten with the new return value.
# NOTE(review): presumably "faults_AE.csv" is the file to upload here — confirm.
from google.colab import files
setdate = files.upload()

# Read the recording that contains faults, mirroring the steps above.
faults_dataframe = pd.read_csv("faults_AE.csv")
raw_faults_data = faults_dataframe.values
# Notebook preview; the result is discarded when this runs as a plain script.
faults_dataframe.head()

"""# Visualize data

1 - normal; 0 - fault
"""

# Plot the label signal (1 = normal, 0 = fault) together with the DO sensor
# trace, first for the recording with faults and then for the fault-free one.
#
# Every plot gets its own explicitly sized figure. Re-selecting figure 1 after
# creating a sized figure would draw the second recording on top of the first
# and leave the sized figure empty. A legend is requested so the series labels
# are rendered, and each figure is shown before the next one is started.
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure

# Recording with faults.
# NOTE(review): 58365 is presumably the number of samples per recording — confirm.
figure(figsize=(8, 6), dpi=80)
plt.plot(faults_dataframe["label"], 'r', label='label historical data')
plt.plot(faults_dataframe["DO_sensor"], label='DO2 historical data')
plt.xlim(0, 58365)
plt.xlabel('time')
plt.ylabel('DO')
plt.legend()
plt.show()

# Fault-free recording.
figure(figsize=(8, 6), dpi=80)
plt.plot(no_faults_dataframe["label"], 'red', label='label historical data')
plt.plot(no_faults_dataframe["DO_sensor"], label='DO2 historical data')
plt.xlim(0, 58365)
plt.xlabel('time')
plt.ylabel('DO')
plt.legend()
plt.show()

"""# Normalize the data to [0,1]."""

# Training set (fault-free recording): the last column holds the labels, the
# first column is skipped, and the columns in between are the sensor features.
labels_train = raw_nofaults_data[:, -1]
data_train = raw_nofaults_data[:, 1:-1]

# Test set (recording with faults), split the same way.
labels_test = raw_faults_data[:, -1]
data_test = raw_faults_data[:, 1:-1]

# Min-max statistics are taken from the training features only and are then
# applied unchanged to the test features.
min_val = tf.reduce_min(data_train)
max_val = tf.reduce_max(data_train)
value_range = max_val - min_val

# Scale and convert to float32 in one step per data set.
data_train = tf.cast((data_train - min_val) / value_range, tf.float32)
data_test = tf.cast((data_test - min_val) / value_range, tf.float32)

"""Train the AE using only the normal data, which are labeled in this dataset as 1. Separate the normal data from the abnormal data."""

# Show the scaled training data and the scaled test data, one figure each.
for scaled_series, plot_title in (
    (data_train, "A Normal DO sensor data"),
    (data_test, "An Anomalous DO sensor data"),
):
    plt.grid()
    plt.plot(scaled_series)
    plt.title(plot_title)
    plt.xlim(0, 58365)
    plt.show()

"""# Load the model"""

# Colab-only step: mount Google Drive at /content/drive so the saved model
# files referenced below can be opened.
from google.colab import drive
drive.mount('/content/drive')

# Load the two separately saved halves of the autoencoder (.h5 files).
# NOTE(review): load_model is imported from the standalone `keras` package while
# the rest of the script uses `tf.keras` — confirm both resolve to compatible versions.
from keras.models import load_model
encoder = load_model('/content/drive/MyDrive/Autoencoder/models/encoder_model_mlp-ae.h5')
decoder = load_model('/content/drive/MyDrive/Autoencoder/models/decoder_model_mlp-ae.h5')

"""# III. Detecting anomalies"""

# Run the test data through the encoder and then the decoder, and overlay the
# reconstruction (blue) on the input (red).
encoded_data = encoder(data_test).numpy()
decoded_data = decoder(encoded_data).numpy()
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure
figure(figsize=(8, 6), dpi=80)
input_lines = plt.plot(data_test, 'r')
reconstruction_lines = plt.plot(decoded_data, 'b')
# Bind each legend entry to an explicit line handle. Only two series are drawn
# (there is no separate "Error" series), and passing labels alone would pair
# them with lines by position, which mislabels multi-column data.
plt.legend([input_lines[0], reconstruction_lines[0]], ["Input", "Reconstruction"])
plt.show()

# Chain the two loaded halves into a single model for prediction.
autoencoder = tf.keras.Sequential([encoder, decoder])

# Per-sample mean absolute reconstruction error on the fault-free training data.
reconstructions = autoencoder.predict(data_train)
train_loss = tf.keras.losses.mae(reconstructions, data_train)
figure(figsize=(8, 6), dpi=80)

# NOTE(review): the leading axis added by [None, :] presumably makes matplotlib
# treat the tensor as a single data set — confirm before simplifying.
plt.hist(train_loss[None, :], bins=50)
plt.xlabel("Train loss")
plt.ylabel("No of examples")
plt.show()

# Decision threshold: one standard deviation above the mean training error.
threshold = np.mean(train_loss) + np.std(train_loss)
print("Threshold: ", threshold)

# Per-sample mean absolute reconstruction error on the test data.
reconstructions = autoencoder.predict(data_test)
test_loss = tf.keras.losses.mae(reconstructions, data_test)

plt.hist(test_loss[None, :], bins=50)
plt.xlabel("Test loss")
plt.ylabel("No of examples")
plt.show()

# Get test MAE loss as a flat array.
# The test reconstructions computed just above are reused; a second predict()
# call over the same data would only repeat the forward pass.
x_test_pred = reconstructions
test_mae_loss = np.mean(np.abs(x_test_pred - data_test), axis=1)
test_mae_loss = test_mae_loss.reshape((-1))

plt.hist(test_mae_loss, bins=50)
plt.xlabel("test MAE loss")
plt.ylabel("No of samples")
plt.show()

def predict(model, data, threshold):
    """Flag the samples that the model reconstructs with a small error.

    Args:
        model: Callable model; invoked as ``model(data)`` to obtain reconstructions.
        data: Input samples, passed to the model and compared against its output.
        threshold: Upper bound on the reconstruction error.

    Returns:
        Boolean tensor that is True where the mean absolute error between the
        reconstruction and the input is strictly below ``threshold``.
    """
    reconstruction_error = tf.keras.losses.mae(model(data), data)
    return tf.math.less(reconstruction_error, threshold)

def print_stats(predictions, labels):
    """Print accuracy, precision and recall of predictions against labels.

    Args:
        predictions: Predicted class per sample.
        labels: Ground-truth class per sample.
    """
    # Each metric is computed right before it is printed, one line per metric.
    accuracy = accuracy_score(labels, predictions)
    print("Accuracy = {}".format(accuracy))

    precision = precision_score(labels, predictions)
    print("Precision = {}".format(precision))

    recall = recall_score(labels, predictions)
    print("Recall = {}".format(recall))

# Classify every test sample (True = reconstruction error below the threshold)
# and print accuracy, precision and recall against the ground-truth labels.
preds = predict(autoencoder, data_test, threshold)
print_stats(preds, labels_test)

from sklearn.metrics import classification_report

# Classes are reported under their numeric names ('0' = Anomalous, '1' = Normal).
target_names = ['0', '1']
report_text = classification_report(
    labels_test,
    preds,
    target_names=target_names,
    digits=4,
)
print(report_text)

from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# Confusion matrix of the true labels against the thresholded predictions.
matrix_ae = confusion_matrix(labels_test, preds)

plt.figure(figsize=(10, 8))
sns.set(font_scale=1.5)

# Tick labels for both axes, in label order (0 = anomaly, 1 = normal).
class_labels = ['Anomaly', 'Normal']

# Heatmap without a color bar. The cells hold sample counts, so they are
# annotated as integers ("d"); a float format would render them as "123.0".
sns.heatmap(matrix_ae, annot=True, cmap='Blues', fmt="d", xticklabels=class_labels, yticklabels=class_labels,
            annot_kws={"color": "black", "fontsize": 14}, cbar=False)

# Set axis labels and title
plt.xlabel('Predicted Label', fontsize=14)
plt.ylabel('True Label', fontsize=14)
plt.title('Autoencoder Confusion Matrix', fontsize=16)

plt.show()

# A test sample counts as an anomaly when its reconstruction error is above
# the threshold; report how many there are and where they occur.
anomalies = test_loss > threshold

anomaly_count = np.sum(anomalies)
print("Number of anomaly samples: ", anomaly_count)

anomaly_indices = np.where(anomalies)
print("Indices of anomaly samples: ", anomaly_indices)

import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import accuracy_score, confusion_matrix, roc_auc_score, roc_curve

def plot_roc_curve(true_y, y_prob):
    """Draw the ROC curve for the given labels and scores.

    Args:
        true_y: Ground-truth labels, forwarded to ``roc_curve``.
        y_prob: Scores or probabilities, forwarded to ``roc_curve``.
    """
    # The thresholds returned by roc_curve are not needed for the plot.
    false_pos_rate, true_pos_rate, _ = roc_curve(true_y, y_prob)

    plt.plot(false_pos_rate, true_pos_rate)
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')

# Build the ROC curve and AUC from a continuous score instead of the
# thresholded predictions: hard True/False decisions give only a single
# operating point. Label 1 means "normal" and normal samples have a low
# reconstruction error, so the negated per-sample test MAE is used as the
# score of the positive class.
normal_scores = -test_mae_loss
plot_roc_curve(labels_test, normal_scores)
plt.show()
# The evaluated model is the MLP autoencoder, so the message names it as such.
print(f'model MLP-AE AUC score: {roc_auc_score(labels_test, normal_scores)}')