import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from google.colab import drive, files
from IPython.display import display
from keras.utils import plot_model
from matplotlib.pyplot import figure
from PIL import Image
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, losses
from tensorflow.keras.models import Model

# Upper x-axis limit shared by the time-series plots below.
# NOTE(review): presumably the number of rows in the CSV files -- confirm.
TIME_AXIS_MAX = 58365

# ---------------------------------------------------------------------------
# I. Import .CSV data
# ---------------------------------------------------------------------------

# Upload the fault-free recording through Colab.
setdate = files.upload()

# reading the input states and parameters from .csv files
no_faults_dataframe = pd.read_csv("nofaults_AE.csv")
raw_nofaults_data = no_faults_dataframe.values
# Printed explicitly: a bare `.head()` expression shows nothing in a script.
print(no_faults_dataframe.head())

# Upload the recording that contains faults.
setdate = files.upload()

# reading the input states and parameters from .csv files
faults_dataframe = pd.read_csv("faults_AE.csv")
raw_faults_data = faults_dataframe.values
print(faults_dataframe.head())

# ---------------------------------------------------------------------------
# Visualize data
# 1 - normal; 0 - fault
# ---------------------------------------------------------------------------


def _plot_label_and_sensor(dataframe):
    """Plot the "label" and "DO_sensor" columns of *dataframe* over time.

    A new figure is opened on every call so that consecutive overviews do not
    draw on top of each other.
    """
    plt.figure(figsize=(8, 6), dpi=80)
    plt.plot(dataframe["label"], 'r', label='label historical data')
    plt.plot(dataframe["DO_sensor"], label='DO2 historical data')
    plt.xlim(0, TIME_AXIS_MAX)
    plt.xlabel('time')
    plt.ylabel('DO')
    plt.legend()
    plt.show()


# exploring DO2 sensor output (recording with faults)
_plot_label_and_sensor(faults_dataframe)

# exploring DO2 sensor output (fault-free recording)
_plot_label_and_sensor(no_faults_dataframe)

# ---------------------------------------------------------------------------
# Normalize the data to [0,1].
# ---------------------------------------------------------------------------

# The last column contains the labels.
labels_train = raw_nofaults_data[:, -1]
# The columns between the first and the last one are the model inputs.
# NOTE(review): the first column is skipped -- presumably an index/time
# column; confirm against the CSV layout.
data_train = raw_nofaults_data[:, 1:-1]

# Same split for the recording that contains faults.
labels_test = raw_faults_data[:, -1]
data_test = raw_faults_data[:, 1:-1]

# Scaling constants come from the training data only and are reused for the
# test data, so both sets share the same scale.
min_val = tf.reduce_min(data_train)
max_val = tf.reduce_max(data_train)

data_train = (data_train - min_val) / (max_val - min_val)
data_test = (data_test - min_val) / (max_val - min_val)

data_train = tf.cast(data_train, tf.float32)
data_test = tf.cast(data_test, tf.float32)

# Train the AE using only the normal data, which are labeled in this dataset
# as 1. The fault-free file provides the training set; the file with faults
# is used for testing.
plt.grid()
plt.plot(data_train)
plt.title("A Normal DO sensor data")
plt.xlim(0, TIME_AXIS_MAX)
plt.show()

plt.grid()
plt.plot(data_test)
plt.title("An Anomalous DO sensor data")
plt.xlim(0, TIME_AXIS_MAX)
plt.show()

# ---------------------------------------------------------------------------
# II. Create the model
# Build the model
# ---------------------------------------------------------------------------


class AnomalyDetector(Model):
    """Dense autoencoder: 32-16-8 encoder mirrored by a 16-32-out decoder.

    Args:
        n_features: Number of values reconstructed per sample, i.e. the width
            of the decoder's sigmoid output layer. Defaults to 1, the width
            that was previously hard-coded.
    """

    def __init__(self, n_features=1):
        super().__init__()
        self.encoder = tf.keras.Sequential([
            layers.Dense(32, activation="relu"),
            layers.Dense(16, activation="relu"),
            layers.Dense(8, activation="relu"),
        ])
        self.decoder = tf.keras.Sequential([
            layers.Dense(16, activation="relu"),
            layers.Dense(32, activation="relu"),
            # Sigmoid keeps the reconstruction inside (0, 1), the range the
            # inputs are scaled to.
            layers.Dense(n_features, activation="sigmoid"),
        ])

    def call(self, x):
        """Return the reconstruction of *x* (encode, then decode)."""
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded


# The output width follows the data so the reconstruction always has the same
# shape as the input.
autoencoder = AnomalyDetector(n_features=data_train.shape[-1])

autoencoder.compile(optimizer='adam', loss='mae')

# NOTE(review): the validation set is the recording that contains faults, so
# the validation loss includes anomalous samples.
history = autoencoder.fit(
    data_train,
    data_train,
    epochs=50,
    batch_size=512,
    validation_data=(data_test, data_test),
    shuffle=True,
)

plt.plot(history.history["loss"], label="Training Loss")
plt.plot(history.history["val_loss"], label="Validation Loss")
plt.legend()
plt.show()

# ---------------------------------------------------------------------------
# Save the model
# ---------------------------------------------------------------------------

drive.mount('/content/drive')

plot_model(autoencoder, show_shapes=True)

# Plot and display the encoder architecture
plot_model(autoencoder.encoder, to_file='encoder_architecture.png', show_shapes=True, rankdir='TB')

# Plot and display the decoder architecture
plot_model(autoencoder.decoder, to_file='decoder_architecture.png', show_shapes=True, rankdir='TB')

# Save the encoder model
# The file is written to the same absolute path it is moved from below, so the
# move does not depend on the current working directory.
encoder_source_path = '/content/encoder_model.h5'
autoencoder.encoder.save(encoder_source_path)

# Save the decoder model
decoder_source_path = '/content/decoder_model.h5'
autoencoder.decoder.save(decoder_source_path)

import os
import shutil

import matplotlib.pyplot as plt
from keras.models import load_model

encoder_destination_path = '/content/drive/MyDrive/Autoencoder/models/encoder_model_mlp-ae.h5'
decoder_destination_path = '/content/drive/MyDrive/Autoencoder/models/decoder_model_mlp-ae.h5'

# shutil.move does not create missing parent folders, so make sure the target
# folder exists before moving the files.
for _destination in (encoder_destination_path, decoder_destination_path):
    os.makedirs(os.path.dirname(_destination), exist_ok=True)

shutil.move(encoder_source_path, encoder_destination_path)
shutil.move(decoder_source_path, decoder_destination_path)

# ---------------------------------------------------------------------------
# Load the model
# ---------------------------------------------------------------------------

encoder = load_model(encoder_destination_path)
decoder = load_model(decoder_destination_path)

# ---------------------------------------------------------------------------
# III. Detecting anomalies
# ---------------------------------------------------------------------------

encoded_data = encoder(data_test).numpy()
decoded_data = decoder(encoded_data).numpy()

# Input versus reconstruction on the test data.
plt.figure(figsize=(8, 6), dpi=80)
input_lines = plt.plot(data_test, 'r')
reconstruction_lines = plt.plot(decoded_data, 'b')
# One legend entry per plotted series (only two series are drawn).
plt.legend([input_lines[0], reconstruction_lines[0]], ["Input", "Reconstruction"])
plt.show()

# Re-assemble the full autoencoder from the reloaded halves.
autoencoder = tf.keras.Sequential([encoder, decoder])

# Per-sample reconstruction error on the training data.
reconstructions = autoencoder.predict(data_train)
train_loss = tf.keras.losses.mae(data_train, reconstructions)

plt.figure(figsize=(8, 6), dpi=80)
# Flattened to 1-D: a (1, N) input can be read by hist as N separate datasets.
plt.hist(np.asarray(train_loss).ravel(), bins=50)
plt.xlabel("Train loss")
plt.ylabel("No of examples")
plt.show()

# Samples whose error exceeds mean + one standard deviation of the training
# error are later treated as anomalies.
threshold = np.mean(train_loss) + np.std(train_loss)
print("Threshold: ", threshold)

# Per-sample reconstruction error on the test data.
reconstructions = autoencoder.predict(data_test)
test_loss = tf.keras.losses.mae(data_test, reconstructions)

plt.hist(np.asarray(test_loss).ravel(), bins=50)
plt.xlabel("Test loss")
plt.ylabel("No of examples")
plt.show()

# Get test MAE loss.
x_test_pred = autoencoder.predict(data_test)
# Mean absolute error per sample, averaged over the feature axis.
test_mae_loss = np.mean(np.abs(x_test_pred - np.asarray(data_test)), axis=1)
test_mae_loss = test_mae_loss.reshape((-1))

plt.hist(test_mae_loss, bins=50)
plt.xlabel("test MAE loss")
plt.ylabel("No of samples")
plt.show()

import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix


def predict(model, data, threshold):
    """Flag each sample as normal (True) or anomalous (False).

    Args:
        model: Callable that returns a reconstruction of *data*.
        data: Samples to classify.
        threshold: Reconstruction-error limit; samples strictly below it are
            considered normal.

    Returns:
        Boolean tensor with one entry per sample, True where the mean
        absolute reconstruction error is below *threshold*.
    """
    reconstructions = model(data)
    loss = tf.keras.losses.mae(data, reconstructions)
    return tf.math.less(loss, threshold)


def print_stats(predictions, labels):
    """Print accuracy, precision and recall of *predictions* against *labels*.

    Precision and recall use scikit-learn's default positive label (1), which
    in this dataset is the normal class.
    """
    print(f"Accuracy = {accuracy_score(labels, predictions)}")
    print(f"Precision = {precision_score(labels, predictions)}")
    print(f"Recall = {recall_score(labels, predictions)}")


preds = predict(autoencoder, data_test, threshold)
print_stats(preds, labels_test)

# Class names in label order: 0 - fault, 1 - normal.
target_names = ['0', '1']
print(classification_report(labels_test, preds, target_names=target_names, digits=4))

matrix_ae = confusion_matrix(labels_test, preds)

plt.figure(figsize=(10, 8))
sns.set(font_scale=1.5)

# Define class labels for x and y axes
class_labels = ['Anomaly', 'Normal']

# Create a heatmap for the confusion matrix without the color bar.
# The cells are integer counts, hence the integer format.
sns.heatmap(
    matrix_ae,
    annot=True,
    cmap='Blues',
    fmt="d",
    xticklabels=class_labels,
    yticklabels=class_labels,
    annot_kws={"color": "black", "fontsize": 14},
    cbar=False,
)

# Set axis labels and title
plt.xlabel('Predicted Label', fontsize=14)
plt.ylabel('True Label', fontsize=14)
plt.title('Autoencoder Confusion Matrix', fontsize=16)

plt.show()

# Samples whose reconstruction error exceeds the threshold.
anomalies = (test_loss > threshold)
print("Number of anomaly samples: ", np.sum(anomalies))
print("Indices of anomaly samples: ", np.where(anomalies))

# Determine anomalies based on the threshold and condition: samples with a
# reconstruction error of exactly zero are flagged as well.
anomalies = (test_loss > threshold) | (test_loss == 0)
print("Number of anomaly samples: ", np.sum(anomalies))
print("Indices of anomaly samples: ", np.where(anomalies))