# Source: Unsupervised-ML / Scenario3 / Test-Conv-AE / test_dataset3_conv_ae_anomaly_detection.py

import numpy as np
import pandas as pd
from tensorflow import keras
from tensorflow.keras import layers
from matplotlib import pyplot as plt

"""# I. Import .CSV data"""

from google.colab import files
import pandas as pd

# First upload: provides "nofaults_AE.csv", which is then parsed into a frame.
setdate = files.upload()
dataset = pd.read_csv("nofaults_AE.csv")

# Second upload: provides "faults_AE.csv", parsed the same way.
setdate1 = files.upload()
dataset_faults = pd.read_csv("faults_AE.csv")

"""# Quick look at the data"""

# Column 2 holds the labels; column 1 is kept as a one-column DataFrame
# (slice 1:2) so that it stays two-dimensional.
labels_nofaults = dataset.iloc[:, 2]
new_dataset_nofaults = dataset.iloc[:, 1:2]
labels_nofaults

new_dataset_nofaults.head()

# Same column selection for the second file.
labels_faults = dataset_faults.iloc[:, 2]
new_dataset_faults = dataset_faults.iloc[:, 1:2]
labels_faults

new_dataset_faults.head()

"""# Visualize the data

1. Normal Data
"""

# Exploring the DO sensor output of the data loaded from "nofaults_AE.csv".
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure

# Each plot gets its own new figure. Creating a figure and then re-selecting
# figure number 1 would, when run as a plain script, draw the second curve on
# top of the first one and leave an empty figure behind.
plt.figure(figsize=(8, 6), dpi=80)
plt.plot(new_dataset_nofaults["DO_sensor"], label='DO2 historical data')
plt.xlim(0, len(new_dataset_nofaults))  # span the whole record instead of a fixed length
plt.xlabel('time')
plt.ylabel('DO')
plt.legend()  # the curve is labelled, so show the label
plt.show()

"""2. Data with anomalies"""

# Exploring the DO sensor output of the data loaded from "faults_AE.csv".
plt.figure(figsize=(8, 6), dpi=100)
plt.plot(new_dataset_faults["DO_sensor"], label='DO2 historical data')
plt.xlim(0, len(new_dataset_faults))
plt.xlabel('time')
plt.ylabel('DO')
plt.legend()
plt.show()

"""# II. Import the Conv-AE model"""

from google.colab import drive
from tensorflow.keras.models import load_model

# Mount Google Drive at /content/drive; the saved model is read from there.
drive.mount('/content/drive')

# Load the saved model from its .h5 file and print its layer summary.
model_path = '/content/drive/MyDrive/Autoencoder/models/model_conv_ae_1.h5'
loaded_model = load_model(model_path)
loaded_model.summary()

"""# III. Detecting anomalies"""

# Normalize the fault-free sensor values and keep the mean and std so the
# test data can be scaled with the same statistics further down.
# The statistics are taken from the values being normalized; using the label
# column of the faulty data set here would scale the signal by unrelated numbers.
# NOTE(review): these should equal the statistics used when the loaded model
# was trained — confirm against the training script.
training_mean = new_dataset_nofaults.mean()
training_std = new_dataset_nofaults.std()
df_training_value = (new_dataset_nofaults - training_mean) / training_std
print("Number of training samples:", len(df_training_value))

"""# Create sequences

Get data values from the training timeseries data file and normalize the value data. We have a value for every 15 mins for 609 days.

BSM2: Data is evaluated at each 15 minutes interval starting from 245th day

24*60/15 = 96 timesteps per day

96 * 609 days = 58464 data points in total
"""

TIME_STEPS = 96


def create_sequences(values, labels, time_steps=TIME_STEPS):
    """Cut a series into overlapping windows and pick one label per window.

    Window ``i`` covers ``values[i : i + time_steps]`` and is paired with the
    label at the window's last position, ``i + time_steps - 1``.

    Args:
        values: Array-like of samples; the first axis is the sample axis.
        labels: Array-like or pandas Series with at least ``len(values)``
            entries. It is read by position, so a Series with a non-default
            index yields the same result as a plain array.
        time_steps: Window length in samples.

    Returns:
        Tuple ``(x_output, y_output)`` of NumPy arrays, each with
        ``len(values) - time_steps + 1`` entries along the first axis.

    Raises:
        ValueError: If ``time_steps`` is not positive, if ``values`` is
            shorter than one window, or if ``labels`` is shorter than
            ``values``.
    """
    values = np.asarray(values)
    # Converting to an array makes the lookup positional; indexing a Series
    # with an integer is label-based and breaks on a non-default index.
    labels = np.asarray(labels)

    n_windows = len(values) - time_steps + 1
    if time_steps < 1 or n_windows < 1:
        raise ValueError(
            f"need at least time_steps={time_steps} samples (and time_steps >= 1), "
            f"got {len(values)}"
        )
    if len(labels) < len(values):
        raise ValueError(
            f"labels has {len(labels)} entries but values has {len(values)}"
        )

    x_output = np.stack([values[i : i + time_steps] for i in range(n_windows)])
    # Label of the last sample of each window; copy so callers may modify it
    # without touching the source data.
    y_output = labels[time_steps - 1 : len(values)].copy()
    return x_output, y_output


# Window the normalized training values; y_train holds one label per window.
x_train, y_train = create_sequences(df_training_value.values, labels_nofaults)
print("Training input shape: ", x_train.shape)
print("Training label shape: ", y_train.shape)

"""We will detect anomalies by determining how well our model can reconstruct the input data.

Find MAE loss on training samples.
Find max MAE loss value. This is the worst our model has performed trying to reconstruct a sample. We will make this the threshold for anomaly detection.
If the reconstruction loss for a sample is greater than this threshold value then we can infer that the model is seeing a pattern that it isn't familiar with. We will label this sample as an anomaly.
"""

# Get train MAE loss: mean absolute reconstruction error along axis 1 of each window.
x_train_pred = loaded_model.predict(x_train)
train_mae_loss = np.mean(np.abs(x_train_pred - x_train), axis=1)

plt.hist(train_mae_loss, bins=50)
plt.xlabel("Train MAE loss")
plt.ylabel("No of samples")
plt.show()

# Get reconstruction loss threshold: the largest error seen on the training windows.
threshold = np.max(train_mae_loss)
print("Reconstruction error threshold: ", threshold)

"""**1. Compare reconstruction:** check how our model has reconstructed the first sample."""

# Overlay the first input window and its reconstruction.
plt.plot(x_train[0])
plt.plot(x_train_pred[0])
plt.show()

"""# Prepare test data"""

# Scale the test values with the statistics computed for the training data.
df_test_value = (new_dataset_faults - training_mean) / training_std
fig, ax = plt.subplots()
df_test_value.plot(legend=False, ax=ax)
plt.show()

# Create sequences from test values; y_test holds one label per window.
x_test, y_test = create_sequences(df_test_value.values, labels_faults)
print("Test input shape: ", x_test.shape)
print("Test label shape: ", y_test.shape)

# Get test MAE loss, flattened to one value per window.
x_test_pred = loaded_model.predict(x_test)
test_mae_loss = np.mean(np.abs(x_test_pred - x_test), axis=1)
test_mae_loss = test_mae_loss.reshape((-1))

plt.hist(test_mae_loss, bins=50)
plt.xlabel("test MAE loss")
plt.ylabel("No of samples")
plt.show()

# Detect all the samples which are anomalies: windows whose reconstruction
# error exceeds the threshold.
anomalies = test_mae_loss > threshold
print("Number of anomaly samples: ", np.sum(anomalies))
print("Indices of anomaly samples: ", np.where(anomalies))

"""# Plot anomalies"""

# Data point i is flagged when windows (i - TIME_STEPS + 1) through i,
# inclusive, are all anomalous. A slice end is exclusive, hence "+ 1" below;
# without it window i itself would be left out of the check.
anomalous_data_indices = [
    data_idx
    for data_idx in range(TIME_STEPS - 1, len(df_test_value) - TIME_STEPS + 1)
    if np.all(anomalies[data_idx - TIME_STEPS + 1 : data_idx + 1])
]

"""overlay the anomalies on the original test data plot."""

# Subset the DataFrame to select the anomalous data
df_subset = new_dataset_faults.iloc[anomalous_data_indices]

# Plot the data with anomalies
fig, ax = plt.subplots()
new_dataset_faults.plot(legend=False, ax=ax)
# Red crosses: an explicit color is needed so the markers stand out from the line.
df_subset.plot(legend=False, ax=ax, style='rx')

plt.xlabel('Samples', fontsize=12)  # x-axis label
plt.ylabel('DO [mg/L]', fontsize=12)  # y-axis label
plt.xlim(0, len(new_dataset_faults))  # span the whole record instead of a fixed length
plt.show()

from sklearn.metrics import classification_report

# Threshold-based labelling of reconstruction errors.
def classify_samples(mae_loss, threshold):
    """Return 1 where `mae_loss` is at most `threshold`, else 0 (as integers)."""
    within_threshold = mae_loss <= threshold
    return within_threshold.astype(int)

# Predicted label per test window. classify_samples returns 1 when the loss is
# within the threshold and 0 otherwise, i.e. 1 = normal, 0 = anomalous, which
# is the order target_names below relies on.
predicted_labels = classify_samples(test_mae_loss, threshold)

# Ground truth is the per-window label taken from the data set; it must not
# be derived from the model's own reconstruction error.
true_labels = y_test

# Print the classification report
target_names = ['Anomalous', 'Normal']
print(classification_report(true_labels, predicted_labels, target_names=target_names, digits=4))

from sklearn.metrics import confusion_matrix
matrix_ae = confusion_matrix(true_labels, predicted_labels)

import seaborn as sns
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt

# Draws the confusion matrix held in 'matrix_ae' as an annotated heatmap.

plt.figure(figsize=(10, 8))
sns.set(font_scale=1.5)

# Define class labels for x and y axes (row/column 0 = Anomaly, 1 = Normal)
class_labels = ['Anomaly', 'Normal']

# Create a heatmap for the confusion matrix without the color bar.
# The cells are sample counts, so they are shown as integers ("d") rather
# than with a meaningless decimal place.
sns.heatmap(matrix_ae, annot=True, cmap='Blues', fmt="d", xticklabels=class_labels, yticklabels=class_labels,
            annot_kws={"color": "black", "fontsize": 14}, cbar=False)

# Set axis labels and title
plt.xlabel('Predicted Label', fontsize=14)
plt.ylabel('True Label', fontsize=14)
plt.title('Autoencoder Confusion Matrix', fontsize=16)

plt.show()

import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import accuracy_score, confusion_matrix, roc_auc_score, roc_curve

def plot_roc_curve(true_y, y_prob):
    """
    Draw the ROC curve for the labels `true_y` and scores `y_prob`:
    true positive rate plotted against false positive rate.
    """
    false_pos_rate, true_pos_rate, _ = roc_curve(true_y, y_prob)
    plt.plot(false_pos_rate, true_pos_rate)
    plt.ylabel('True Positive Rate')
    plt.xlabel('False Positive Rate')

# ROC/AUC need a continuous score; thresholded 0/1 predictions collapse the
# curve to a single operating point. The positive class in this script is
# 1 = normal and a lower reconstruction error means "more normal", so the
# negated MAE is used as the score.
normality_score = -test_mae_loss
plot_roc_curve(y_test, normality_score)
print(f'model CNN AUC score: {roc_auc_score(y_test, normality_score)}')

import matplotlib.pyplot as plt

# Ground-truth signal for plotting. A copy of y_test is used as-is: the former
# in-place assignments replaced 1 with 1 and 0 with 0 and had no effect.
binary_signal_true = y_test.copy()

# Plot the binary signal
plt.figure(figsize=(12, 6))
plt.step(range(len(binary_signal_true)), binary_signal_true, color='blue', label='Anomaly Signal')
plt.xlabel('Time Step')
# Matches the convention used by the report and alarm plots of this script:
# 0 = anomaly, 1 = normal.
plt.ylabel('Anomaly Status (0: Anomaly, 1: Normal)')
plt.title('Anomaly Detection Signal')
plt.legend()
plt.show()

import numpy as np
import time

# 7. Detect anomalies and measure detection time.
# The timed region contains the actual detection work (reconstructing the test
# windows, computing the per-window MAE and thresholding it); timing an empty
# region would always report roughly zero.
start_time_total = time.perf_counter()

x_test_pred = loaded_model.predict(x_test)
test_mae_loss = np.mean(np.abs(x_test_pred - x_test), axis=1).reshape((-1))
anomalies = test_mae_loss > threshold

end_time_total = time.perf_counter()
total_time_seconds = end_time_total - start_time_total
total_time_minutes = total_time_seconds / 60.0
# Report each value with its correct unit.
print(
    f"Total time taken for anomaly detection: {total_time_seconds:.6f} seconds "
    f"({total_time_minutes:.6f} minutes)."
)

# 8. Summary of the detected anomalies
print("Number of anomaly samples: ", np.sum(anomalies))
print("Indices of anomaly samples: ", np.where(anomalies))

import numpy as np
import matplotlib.pyplot as plt

# Alarm signal derived from the reconstruction error: 0 where the test MAE
# exceeds the threshold (anomaly), 1 otherwise (normal).
binary_signal = np.where(test_mae_loss > threshold, 0, 1)

# Set up the plotting style. 'seaborn-whitegrid' was renamed to
# 'seaborn-v0_8-whitegrid' in matplotlib 3.6 and the old name is missing from
# newer releases, so use whichever of the two is installed.
for style_name in ('seaborn-v0_8-whitegrid', 'seaborn-whitegrid'):
    if style_name in plt.style.available:
        plt.style.use(style_name)
        break

plt.figure(figsize=(12, 7))  # wide figure for better visibility

# Model alarm signal and data-set label signal on the same axes
plt.plot(binary_signal, linestyle='-', color='tab:orange', label='Autoencoder')
plt.plot(range(len(binary_signal_true)), binary_signal_true, color='blue', label='Dataset')

# Threshold line.
# NOTE(review): 'threshold' is a reconstruction-error value while this axis
# shows 0/1 signals — confirm the line is meant to appear on this plot.
plt.axhline(y=threshold, color='tab:red', linestyle='--', label='Threshold')

plt.xlabel('Time', fontsize=12)
plt.ylabel('AE Detection Alarm', fontsize=12)
plt.title('Autoencoder Anomaly Detection', fontsize=16, fontweight='bold')
plt.legend(loc='upper left', fontsize=10, frameon=True, fancybox=True, edgecolor='black')  # framed legend, upper left

# Add descriptive annotations
plt.text(26500, 0.5, '1 (Normal)', fontsize=10, color='tab:blue', fontweight='bold')
plt.text(51500, 0.5, '0 (Anomaly)', fontsize=10, color='tab:orange', fontweight='bold')

plt.grid(True)  # Add grid lines

# Restrict the view to the sample range of interest
plt.xlim(25000, 53000)
plt.ylim(-0.1, 1.1)  # Set y-axis limits from -0.1 to 1.1

# Adjust x-axis and y-axis tick positions and labels for better readability
x_tick_positions = [20000, 25000, 30000, 35000, 40000, 45000, 50000, 53000]
x_tick_labels = ['20K', '25K', '30K', '35K', '40K', '45K', '50K', '53K']
plt.xticks(x_tick_positions, x_tick_labels, fontsize=10)

plt.yticks([0, 1], fontsize=10)

plt.tight_layout()  # Improves spacing of plot elements
plt.show()

import numpy as np


def _falling_edges(signal):
    """Return the positions where `signal` is 0 directly after a 1."""
    was_one = signal[:-1] == 1
    is_zero = signal[1:] == 0
    return np.where(was_one & is_zero)[0] + 1


# 1 -> 0 transitions of the model's alarm signal
switch_points = _falling_edges(binary_signal)

print("Indices of switches from 1 to 0:", switch_points)

# 1 -> 0 transitions of the data-set label signal
switch_points_TRUE = _falling_edges(binary_signal_true)

print("Indices of switches from 1 to 0:", switch_points_TRUE)

len(switch_points_TRUE), len(switch_points)

import pandas as pd

# Write the switch indices of the data-set signal to Dataset.csv (no index column).
data = {'Dataset switching points': switch_points_TRUE}
df = pd.DataFrame(data)
df.to_csv('Dataset.csv', index=False)

# Write the switch indices of the model's alarm signal to AE.csv.
# NOTE(review): this file reuses the 'Dataset switching points' column header.
data = {'Dataset switching points': switch_points}
df = pd.DataFrame(data)
df.to_csv('AE.csv', index=False)

# Compare Dataset.csv with AE.csv and set `num_measurements` to the observed
# delay (in samples) between a true switch and the detected switch.

# Sampling interval in minutes. The data has 96 samples per day
# (24 * 60 / 15), i.e. one sample every 15 minutes; dividing by 100 would
# give 14.4 minutes and understate every delay.
SAMPLES_PER_DAY = 96
time_interval_minutes = 24 * 60 / SAMPLES_PER_DAY

# Detection delay expressed as a number of samples
num_measurements = 2#delay

# Convert the delay from samples to hours
time_passed_hours = (time_interval_minutes * num_measurements) / 60

print(f"Detection delay of {num_measurements} measurement(s):", time_passed_hours, "hours")