# Unsupervised-ML / Unsupervised MLs / mlp_ae_anomalies_detection.py
# mlp_ae_anomalies_detection.py
# Raw
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf

from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, losses
from tensorflow.keras.models import Model

"""# I. Import .CSV data"""

from google.colab import files

# --- Fault-free recording ---------------------------------------------------
# Open the Colab upload widget, then read the CSV by its fixed file name.
setdate = files.upload()
no_faults_dataframe = pd.read_csv("nofaults_AE.csv")
raw_nofaults_data = no_faults_dataframe.values  # NumPy representation of the table
no_faults_dataframe.head()  # preview of the first rows (rendered only in a notebook cell)

# --- Recording that contains faults -----------------------------------------
# Same steps for the second file.
setdate = files.upload()
faults_dataframe = pd.read_csv("faults_AE.csv")
raw_faults_data = faults_dataframe.values
faults_dataframe.head()

"""# Visualize data

1 - normal; 0 - fault
"""

# Explore the DO sensor output together with the label column
# (the section header above states: 1 - normal, 0 - fault).
#
# Each recording is drawn on the sized figure that was just created. The
# earlier version switched back to figure 1 after creating the sized figure,
# which, when the file runs as a plain script, left the second sized figure
# empty and stacked both recordings on the first one.
from matplotlib.pyplot import figure

# Recording that contains faults
figure(figsize=(8, 6), dpi=80)
plt.plot(faults_dataframe["label"], 'r', label='label historical data')
plt.plot(faults_dataframe["DO_sensor"], label='DO2 historical data')
plt.xlim(0, 58365)  # hard-coded x-range; presumably the recording length -- confirm
plt.xlabel('time')
plt.ylabel('DO')
plt.legend()  # the curves carry labels, so display them
plt.show()

# Fault-free recording
figure(figsize=(8, 6), dpi=80)
plt.plot(no_faults_dataframe["label"], 'red', label='label historical data')
plt.plot(no_faults_dataframe["DO_sensor"], label='DO2 historical data')
plt.xlim(0, 58365)
plt.xlabel('time')
plt.ylabel('DO')
plt.legend()
plt.show()

"""# Normalize the data to [0,1]."""

# Split each recording into labels and model inputs.
# The last column holds the labels; columns 1 up to (not including) the last
# one are the inputs. Column 0 is skipped -- presumably an index/time column,
# confirm against the CSV files.
labels_train = raw_nofaults_data[:, -1]
data_train = raw_nofaults_data[:, 1:-1]

labels_test = raw_faults_data[:, -1]
data_test = raw_faults_data[:, 1:-1]

# Min-max scale both sets using the extrema of the training data only, then
# convert to float32 tensors.
min_val = tf.reduce_min(data_train)
max_val = tf.reduce_max(data_train)
value_span = max_val - min_val

data_train = tf.cast((data_train - min_val) / value_span, tf.float32)
data_test = tf.cast((data_test - min_val) / value_span, tf.float32)

"""Train the AE using only the normal data, which are labeled in this dataset as 1. Separate the normal data from the abnormal data."""

# Show the normalized training data, then the normalized test data.
for series, caption in (
    (data_train, "A Normal DO sensor data"),
    (data_test, "An Anomalous DO sensor data"),
):
  plt.grid()
  plt.plot(series)
  plt.title(caption)
  plt.xlim(0, 58365)
  plt.show()

"""# II. Create the model

# Build the model
"""

class AnomalyDetector(Model):
  """Fully connected autoencoder made of an encoder and a decoder stack.

  The encoder is three ReLU Dense layers of 32, 16 and 8 units. The decoder
  is two ReLU Dense layers of 16 and 32 units followed by a sigmoid Dense
  layer that produces the reconstruction.

  Args:
    n_features: Number of units in the final decoder layer, i.e. the width
      of the reconstruction. Defaults to 1, the value that used to be
      hard-coded, so ``AnomalyDetector()`` builds the same network as before.
  """

  def __init__(self, n_features=1):
    super().__init__()
    self.encoder = tf.keras.Sequential([
      layers.Dense(32, activation="relu"),
      layers.Dense(16, activation="relu"),
      layers.Dense(8, activation="relu")])

    # The sigmoid keeps every output value between 0 and 1.
    self.decoder = tf.keras.Sequential([
      layers.Dense(16, activation="relu"),
      layers.Dense(32, activation="relu"),
      layers.Dense(n_features, activation="sigmoid")])

  def call(self, x):
    """Return the decoder output for the encoded version of ``x``."""
    encoded = self.encoder(x)
    decoded = self.decoder(encoded)
    return decoded

# Build the autoencoder and configure it with the Adam optimizer and a
# mean-absolute-error loss.
autoencoder = AnomalyDetector()
autoencoder.compile(loss='mae', optimizer='adam')

# Input and target are the same tensor: the network learns to reproduce its
# input. The test set is passed as validation data.
history = autoencoder.fit(
    x=data_train,
    y=data_train,
    batch_size=512,
    epochs=50,
    shuffle=True,
    validation_data=(data_test, data_test),
)

# Learning curves recorded by fit().
for history_key, caption in (
    ("loss", "Training Loss"),
    ("val_loss", "Validation Loss"),
):
  plt.plot(history.history[history_key], label=caption)
plt.legend()

"""# Save the model"""

from google.colab import drive
drive.mount('/content/drive')

import os
import shutil

import tensorflow as tf
# Use the same Keras package (tensorflow.keras) that builds the model elsewhere
# in this file, rather than the standalone `keras` package.
from tensorflow.keras.utils import plot_model
from IPython.display import display
from PIL import Image

# Draw the full model, then write the encoder and decoder diagrams to PNG files.
plot_model(autoencoder, show_shapes=True)
plot_model(autoencoder.encoder, to_file='encoder_architecture.png', show_shapes=True, rankdir='TB')
plot_model(autoencoder.decoder, to_file='decoder_architecture.png', show_shapes=True, rankdir='TB')

# Save the encoder and the decoder as separate HDF5 files (relative paths,
# i.e. in the current working directory).
autoencoder.encoder.save('encoder_model.h5')
autoencoder.decoder.save('decoder_model.h5')

# Move both files to Google Drive.
# NOTE: the source paths assume the working directory is /content.
# shutil.move does not create missing parent folders, so the destination
# folder is created first; exist_ok=True makes this a no-op when it exists.
encoder_source_path = '/content/encoder_model.h5'
encoder_destination_path = '/content/drive/MyDrive/Autoencoder/models/encoder_model_mlp-ae.h5'
os.makedirs(os.path.dirname(encoder_destination_path), exist_ok=True)
shutil.move(encoder_source_path, encoder_destination_path)

decoder_source_path = '/content/decoder_model.h5'
decoder_destination_path = '/content/drive/MyDrive/Autoencoder/models/decoder_model_mlp-ae.h5'
os.makedirs(os.path.dirname(decoder_destination_path), exist_ok=True)
shutil.move(decoder_source_path, decoder_destination_path)

"""# Load the model"""

# Reload the encoder and decoder through tensorflow.keras, the same Keras
# package this file imports Model and layers from, so that saving and loading
# go through one implementation.
from tensorflow.keras.models import load_model
encoder = load_model('/content/drive/MyDrive/Autoencoder/models/encoder_model_mlp-ae.h5')
decoder = load_model('/content/drive/MyDrive/Autoencoder/models/decoder_model_mlp-ae.h5')

"""# III. Detecting anomalies"""

# Pass the test data through the reloaded encoder and decoder.
encoded_data = encoder(data_test).numpy()
decoded_data = decoder(encoded_data).numpy()

from matplotlib.pyplot import figure
figure(figsize=(8, 6), dpi=80)
plt.plot(data_test, 'r')     # model input
plt.plot(decoded_data, 'b')  # reconstruction
# Only two curves are drawn, so only two legend entries are listed; the former
# third entry ("Error") had no curve to describe.
plt.legend(labels=["Input", "Reconstruction"])
plt.show()

# Chain the reloaded encoder and decoder back into a single model.
autoencoder = tf.keras.Sequential([encoder, decoder])

# Mean absolute reconstruction error of every training sample.
reconstructions = autoencoder.predict(data_train)
train_loss = tf.keras.losses.mae(reconstructions, data_train)

from matplotlib.pyplot import figure
figure(figsize=(8, 6), dpi=80)

# Pass the losses as a flat 1-D array. plt.hist reads a 2-D array as one
# dataset per column, so the former (1, N)-shaped input is N one-sample
# datasets once matplotlib converts it to a NumPy array.
plt.hist(np.ravel(train_loss), bins=50)
plt.xlabel("Train loss")
plt.ylabel("No of examples")
plt.show()

# Decision threshold: one standard deviation above the mean training loss.
threshold = np.mean(train_loss) + np.std(train_loss)
print("Threshold: ", threshold)

# Mean absolute reconstruction error of every test sample.
reconstructions = autoencoder.predict(data_test)
test_loss = tf.keras.losses.mae(reconstructions, data_test)

plt.hist(np.ravel(test_loss), bins=50)
plt.xlabel("Test loss")
plt.ylabel("No of examples")
plt.show()

# Get test MAE loss.
# `reconstructions` already holds the model output for data_test, so it is
# reused here instead of running predict() on the same data a second time.
x_test_pred = reconstructions
test_mae_loss = np.mean(np.abs(x_test_pred - data_test), axis=1)
test_mae_loss = test_mae_loss.reshape((-1))

plt.hist(test_mae_loss, bins=50)
plt.xlabel("test MAE loss")
plt.ylabel("No of samples")
plt.show()

def predict(model, data, threshold):
  """Tell, per sample, whether the reconstruction error is below a threshold.

  Args:
    model: Callable applied to ``data`` to obtain its reconstruction.
    data: Samples to reconstruct.
    threshold: Error value the per-sample loss is compared against.

  Returns:
    Boolean tensor that is True where the mean absolute error between the
    reconstruction and the input is strictly less than ``threshold``.
  """
  rebuilt = model(data)
  per_sample_error = tf.keras.losses.mae(rebuilt, data)
  is_below_threshold = tf.math.less(per_sample_error, threshold)
  return is_below_threshold

def print_stats(predictions, labels):
  """Print accuracy, precision and recall of ``predictions`` against ``labels``.

  Args:
    predictions: Predicted class per sample.
    labels: True class per sample.
  """
  metrics = (
      ("Accuracy", accuracy_score),
      ("Precision", precision_score),
      ("Recall", recall_score),
  )
  # Each metric is computed and printed in turn, in the order listed above.
  for title, metric in metrics:
    print(f"{title} = {metric(labels, predictions)}")

from sklearn.metrics import classification_report, confusion_matrix

# Classify every test sample with the threshold and print the summary scores.
preds = predict(autoencoder, data_test, threshold)
print_stats(preds, labels_test)

# Per-class report. The display names are the raw label values
# ('Anomalous' / 'Normal' would be the descriptive alternative).
target_names = ['0', '1']
report_text = classification_report(
    labels_test,
    preds,
    target_names=target_names,
    digits=4,
)
print(report_text)

# Confusion matrix of true labels against predictions.
matrix_ae = confusion_matrix(labels_test, preds)

import seaborn as sns

# Render the confusion matrix `matrix_ae` as an annotated heatmap.
plt.figure(figsize=(10, 8))
sns.set(font_scale=1.5)

# Tick labels shared by the x axis (predicted) and the y axis (true).
class_labels = ['Anomaly', 'Normal']

# Cell values are written in black with one decimal; the color bar is hidden.
sns.heatmap(
    matrix_ae,
    annot=True,
    annot_kws={"color": "black", "fontsize": 14},
    fmt=".1f",
    cmap='Blues',
    cbar=False,
    xticklabels=class_labels,
    yticklabels=class_labels,
)

plt.title('Autoencoder Confusion Matrix', fontsize=16)
plt.xlabel('Predicted Label', fontsize=14)
plt.ylabel('True Label', fontsize=14)

plt.show()

# First pass: samples whose test reconstruction error exceeds the threshold.
anomalies = test_loss > threshold
print("Number of anomaly samples: ", np.sum(anomalies))
print("Indices of anomaly samples: ", np.where(anomalies))

# Second pass: additionally flag samples whose error is exactly zero
# (NOTE(review): the reason for treating a zero error as an anomaly is not
# stated in this file -- confirm the intent).
anomalies = anomalies | (test_loss == 0)
print("Number of anomaly samples: ", np.sum(anomalies))
print("Indices of anomaly samples: ", np.where(anomalies))