# Unsupervised-ML / Unsupervised MLs / mlp_ae_anomalies_detection.py
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf

from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, losses
from tensorflow.keras.models import Model

"""# I. Import .CSV data"""

from google.colab import files

# Each recording is uploaded through Colab's upload dialog and then read from
# a CSV with a fixed file name; the raw NumPy view of every table is kept for
# the slicing done further down.

# Recording without faults.
setdate = files.upload()
no_faults_dataframe = pd.read_csv("nofaults_AE.csv")
raw_nofaults_data = no_faults_dataframe.values

# Recording with faults.
setdate = files.upload()
faults_dataframe = pd.read_csv("faults_AE.csv")
raw_faults_data = faults_dataframe.values

"""# Visualize data

1 - normal; 0 - fault

# Explore the DO2 sensor output: one figure per recording (faults first, then
# fault-free), each showing the label column together with the sensor signal.
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure

for history_frame, label_colour in ((faults_dataframe, 'r'),
                                    (no_faults_dataframe, 'red')):
    figure(figsize=(8, 6), dpi=80)
    plt.plot(history_frame["label"], label_colour, label='label historical data')
    plt.plot(history_frame["DO_sensor"], label='DO2 historical data')
    plt.xlim(0, 58365)

"""# Normalize the data to [0,1]."""

# Split every raw table into features and labels: the last column is the
# label, the first column is skipped, and the columns in between are the
# sensor data fed to the model.
data_train, labels_train = raw_nofaults_data[:, 1:-1], raw_nofaults_data[:, -1]
data_test, labels_test = raw_faults_data[:, 1:-1], raw_faults_data[:, -1]

# Scaling statistics come from the training (fault-free) features only and
# are then applied unchanged to the test features.
min_val = tf.reduce_min(data_train)
max_val = tf.reduce_max(data_train)
value_range = max_val - min_val

# Min-max scale both sets with the training statistics and store as float32.
data_train = tf.cast((data_train - min_val) / value_range, tf.float32)
data_test = tf.cast((data_test - min_val) / value_range, tf.float32)

"""Train the AE using only the normal data, which are labeled in this dataset as 1. Separate the normal data from the abnormal data."""

# Titles and x-limits alone draw nothing, and setting two titles on the same
# axes leaves only the second one visible. Each signal therefore gets its own
# figure and an explicit plot call.
# NOTE(review): plotting the full normalised train/test series is an
# assumption about what these two plots were meant to show — confirm.
plt.figure()
plt.plot(data_train)
plt.title("A Normal DO sensor data")
plt.xlim(0, 58365)

plt.figure()
plt.plot(data_test)
plt.title("An Anomalous DO sensor data")
plt.xlim(0, 58365)

"""# II. Create the model

# Build the model

class AnomalyDetector(Model):
  def __init__(self):
    super(AnomalyDetector, self).__init__()
    self.encoder = tf.keras.Sequential([
      layers.Dense(32, activation="relu"),
      layers.Dense(16, activation="relu"),
      layers.Dense(8, activation="relu")])

    self.decoder = tf.keras.Sequential([
      layers.Dense(16, activation="relu"),
      layers.Dense(32, activation="relu"),
      layers.Dense(1, activation="sigmoid")])

  def call(self, x):
    encoded = self.encoder(x)
    decoded = self.decoder(encoded)
    return decoded

autoencoder = AnomalyDetector()

autoencoder.compile(optimizer='adam', loss='mae')

history = autoencoder.fit(data_train, data_train,
          validation_data=(data_test, data_test),

plt.plot(history.history["loss"], label="Training Loss")
plt.plot(history.history["val_loss"], label="Validation Loss")

"""# Save the model"""

from google.colab import drive

import tensorflow as tf
from keras.utils import plot_model
from IPython.display import display
from PIL import Image

# Diagram of the complete autoencoder (no explicit output file is given here).
plot_model(autoencoder, show_shapes=True)

# Plot the encoder and the decoder architecture, one image file each.
for sub_model, diagram_path in ((autoencoder.encoder, 'encoder_architecture.png'),
                                (autoencoder.decoder, 'decoder_architecture.png')):
    plot_model(sub_model, to_file=diagram_path, show_shapes=True, rankdir='TB')

import os
import shutil

from google.colab import drive

# The destination paths below live on Google Drive, so it must be mounted
# before anything can be moved there.
drive.mount('/content/drive')

encoder_source_path = '/content/encoder_model.h5'
encoder_destination_path = '/content/drive/MyDrive/Autoencoder/models/encoder_model_mlp-ae.h5'

decoder_source_path = '/content/decoder_model.h5'
decoder_destination_path = '/content/drive/MyDrive/Autoencoder/models/decoder_model_mlp-ae.h5'

# Save the encoder model
# (the source files have to exist before they can be moved)
autoencoder.encoder.save(encoder_source_path)

# Save the decoder model
autoencoder.decoder.save(decoder_source_path)

# Create the target folder up front so the move cannot fail on a missing one.
os.makedirs(os.path.dirname(encoder_destination_path), exist_ok=True)
os.makedirs(os.path.dirname(decoder_destination_path), exist_ok=True)

shutil.move(encoder_source_path, encoder_destination_path)
shutil.move(decoder_source_path, decoder_destination_path)

"""# Load the model"""

from keras.models import load_model

# Restore the encoder and the decoder (in that order) from their .h5 files.
encoder, decoder = (
    load_model(model_path)
    for model_path in (
        '/content/drive/MyDrive/Autoencoder/models/encoder_model_mlp-ae.h5',
        '/content/drive/MyDrive/Autoencoder/models/decoder_model_mlp-ae.h5',
    )
)

"""# III. Detecting anomalies"""

import matplotlib.pyplot as plt
from matplotlib.pyplot import figure

# Push the test set through the reloaded encoder, then the decoder.
encoded_data = encoder(data_test).numpy()
decoded_data = decoder(encoded_data).numpy()

figure(figsize=(8, 6), dpi=80)
plt.plot(data_test, 'r', label="Input")
plt.plot(decoded_data, 'b', label="Reconstruction")

# Shade the gap between input and reconstruction so that the "Error" legend
# entry refers to something that is actually drawn.
# assumes a single feature column, so both arrays flatten to the same
# length — TODO confirm; the shading is skipped when they do not.
input_signal = np.ravel(data_test)
reconstructed_signal = np.ravel(decoded_data)
if input_signal.size == reconstructed_signal.size:
    plt.fill_between(np.arange(input_signal.size), reconstructed_signal,
                     input_signal, color='lightcoral', label="Error")
plt.legend()

# Chain the two reloaded halves back into one end-to-end model.
autoencoder = tf.keras.Sequential([encoder, decoder])

import matplotlib.pyplot as plt
from matplotlib.pyplot import figure

# Mean absolute reconstruction error of every training sample.
reconstructions = autoencoder.predict(data_train)
train_loss = tf.keras.losses.mae(reconstructions, data_train)

figure(figsize=(8, 6), dpi=80)

# hist treats the columns of a 2-D array as separate datasets, so the losses
# are passed as one flat 1-D array to get a single histogram.
plt.hist(np.ravel(train_loss), bins=50)
plt.xlabel("Train loss")
plt.ylabel("No of examples")

# Decision threshold: one standard deviation above the mean training error.
threshold = np.mean(train_loss) + np.std(train_loss)
print("Threshold: ", threshold)

# Mean absolute reconstruction error of every test sample.
reconstructions = autoencoder.predict(data_test)
test_loss = tf.keras.losses.mae(reconstructions, data_test)

# A fresh figure keeps this histogram from being drawn over the current one.
# The losses are flattened because hist treats the columns of a 2-D array as
# separate datasets.
plt.figure(figsize=(8, 6), dpi=80)
plt.hist(np.ravel(test_loss), bins=50)
plt.xlabel("Test loss")
plt.ylabel("No of examples")

# Get test MAE loss.
# The predictions computed above are reused instead of running a second,
# identical predict pass over the test set.
x_test_pred = reconstructions
test_mae_loss = np.mean(np.abs(x_test_pred - data_test), axis=1)
test_mae_loss = test_mae_loss.reshape((-1))

plt.figure(figsize=(8, 6), dpi=80)
plt.hist(test_mae_loss, bins=50)
plt.xlabel("test MAE loss")
plt.ylabel("No of samples")

def predict(model, data, threshold):
  """Flag the samples whose reconstruction error is below ``threshold``.

  ``data`` is passed through ``model``, the mean absolute error between each
  reconstruction and its input is computed, and the element-wise result of
  ``error < threshold`` is returned (True where the error is strictly
  smaller than the threshold).
  """
  reconstructed = model(data)
  per_sample_error = tf.keras.losses.mae(reconstructed, data)
  is_below_threshold = tf.math.less(per_sample_error, threshold)
  return is_below_threshold

def print_stats(predictions, labels):
  """Print accuracy, precision and recall of ``predictions`` against ``labels``.

  ``labels`` is passed to each metric as the ground truth and ``predictions``
  as the estimate; one line is printed per metric.
  """
  metrics = (
    ("Accuracy", accuracy_score),
    ("Precision", precision_score),
    ("Recall", recall_score),
  )
  for metric_name, metric_fn in metrics:
    print(f"{metric_name} = {metric_fn(labels, predictions)}")

from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix

# Threshold the test-set reconstruction error and print the summary metrics.
preds = predict(autoencoder, data_test, threshold)
print_stats(preds, labels_test)

# Per-class report; the two classes are shown under the names '0' and '1'.
target_names = ['0', '1']
report_text = classification_report(labels_test, preds, target_names=target_names, digits=4)
print(report_text)

# Confusion matrix of true labels against predictions, kept for plotting.
matrix_ae = confusion_matrix(labels_test, preds)

import seaborn as sns
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt

# Draw the confusion matrix stored in matrix_ae as an annotated heatmap.
plt.figure(figsize=(10, 8))

# Define class labels for x and y axes
# (presumably row/column 0 is the fault class and 1 the normal class —
# verify against the label encoding of the data)
class_labels = ['Anomaly', 'Normal']

# Create a heatmap for the confusion matrix without the color bar.
# The cells hold integer sample counts, so they are formatted as integers
# rather than with one decimal place.
sns.heatmap(matrix_ae, annot=True, cmap='Blues', fmt="d", xticklabels=class_labels, yticklabels=class_labels,
            annot_kws={"color": "black", "fontsize": 14}, cbar=False)

# Set axis labels and title
plt.xlabel('Predicted Label', fontsize=14)
plt.ylabel('True Label', fontsize=14)
plt.title('Autoencoder Confusion Matrix', fontsize=16)


def _report_anomalies(anomaly_mask):
  # Print how many samples the mask flags and at which positions.
  print("Number of anomaly samples: ", np.sum(anomaly_mask))
  print("Indices of anomaly samples: ", np.where(anomaly_mask))


# Samples whose test reconstruction error is above the threshold.
anomalies = (test_loss > threshold)
_report_anomalies(anomalies)

# Same criterion, additionally flagging samples whose error is exactly zero.
anomalies = (test_loss > threshold) | (test_loss ==0)
_report_anomalies(anomalies)