import numpy as np
import pandas as pd
from google.colab import drive
from google.colab import files
from matplotlib import pyplot as plt
from matplotlib.pyplot import figure
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import load_model

# ---------------------------------------------------------------------------
# I. Import .CSV data
# ---------------------------------------------------------------------------
# Colab upload dialogs; the uploaded files are then read from the working
# directory under the fixed names below.
setdate = files.upload()
dataset = pd.read_csv("nofaults_AE.csv")

setdate1 = files.upload()
dataset_faults = pd.read_csv("faults_AE.csv")

# ---------------------------------------------------------------------------
# Quick look at the data
# ---------------------------------------------------------------------------
# Column 1 is kept as a one-column DataFrame (the signal, accessed below as
# "DO_sensor"); column 2 is used as the per-sample label.
new_dataset_nofaults = dataset.iloc[:, 1:2]
labels_nofaults = dataset.iloc[:, 2]
print(labels_nofaults)
print(new_dataset_nofaults.head())

new_dataset_faults = dataset_faults.iloc[:, 1:2]
labels_faults = dataset_faults.iloc[:, 2]
print(labels_faults)
print(new_dataset_faults.head())

# ---------------------------------------------------------------------------
# Visualize the data
# ---------------------------------------------------------------------------
# 1. Normal data: exploring the DO2 sensor output.
# Each plot gets its own figure. Re-selecting figure number 1 (as the
# notebook cells did) would draw both signals onto the same axes when this
# runs as a plain script.
figure(figsize=(8, 6), dpi=80)
plt.plot(new_dataset_nofaults["DO_sensor"], label='DO2 historical data')
plt.xlim(0, 58466)
plt.xlabel('time')
plt.ylabel('DO')
plt.legend()  # the label passed to plot() is only shown through a legend
plt.show()

# 2. Data with anomalies: exploring the DO2 sensor output.
figure(figsize=(8, 6), dpi=100)
plt.plot(new_dataset_faults["DO_sensor"], label='DO2 historical data')
plt.xlim(0, 58466)
plt.xlabel('time')
plt.ylabel('DO')
plt.legend()
plt.show()

# ---------------------------------------------------------------------------
# II. Import the Conv-AE model
# ---------------------------------------------------------------------------
drive.mount('/content/drive')

model_path = '/content/drive/MyDrive/Autoencoder/models/model_conv_ae_1.h5'
loaded_model = load_model(model_path)
loaded_model.summary()

# ---------------------------------------------------------------------------
# III. Detecting anomalies
# ---------------------------------------------------------------------------
# Normalize and save the mean and std we get,
# for normalizing test data.
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    confusion_matrix,
    roc_auc_score,
    roc_curve,
)

# Label convention used throughout this section: 1 = normal, 0 = anomalous.
# That is what classify_samples() emits and what `target_names` encodes; the
# label column of the CSV files is assumed to follow it -- TODO confirm.

# The scaling statistics come from the fault-free signal itself and are reused
# unchanged for the test data. (They were previously taken from the *label*
# column of the faulty file, which is unrelated to the sensor scale.)
# NOTE(review): these must match the statistics used when the loaded model was
# trained -- verify against the training notebook.
training_mean = new_dataset_nofaults.mean()
training_std = new_dataset_nofaults.std()
df_training_value = (new_dataset_nofaults - training_mean) / training_std
print("Number of training samples:", len(df_training_value))

# ---------------------------------------------------------------------------
# Create sequences
# ---------------------------------------------------------------------------
# Get data values from the training timeseries data file and normalize the
# value data. We have a value for every 15 mins for 609 days.
# BSM2: data is evaluated at each 15 minutes interval starting from the 245th
# day.
#   24 * 60 / 15 = 96 timesteps per day
#   96 * 609 days = 58464 data points in total
TIME_STEPS = 96


def create_sequences(values, labels, time_steps=TIME_STEPS):
    """Cut a series into overlapping windows, each tagged with its last label.

    Window ``i`` holds ``values[i : i + time_steps]`` and is labelled with
    ``labels[i + time_steps - 1]``.

    Args:
        values: Array-like with one row per sample.
        labels: Sequence with at least one entry per sample. It is indexed by
            position, so a pandas Series with a non-default index is fine.
        time_steps: Window length in samples.

    Returns:
        A ``(windows, window_labels)`` pair of NumPy arrays with
        ``len(values) - time_steps + 1`` entries each.

    Raises:
        ValueError: If ``time_steps`` is smaller than 1, if there are fewer
            than ``time_steps`` samples, or if ``labels`` is shorter than
            ``values``.
    """
    values = np.asarray(values)
    labels = np.asarray(labels)
    if time_steps < 1:
        raise ValueError(f"time_steps must be >= 1, got {time_steps}")
    n_windows = len(values) - time_steps + 1
    if n_windows < 1:
        raise ValueError(
            f"need at least {time_steps} samples to build one window, "
            f"got {len(values)}"
        )
    if len(labels) < len(values):
        raise ValueError(
            f"labels has {len(labels)} entries but values has {len(values)}"
        )
    x_output = np.stack([values[i:i + time_steps] for i in range(n_windows)])
    y_output = labels[time_steps - 1:time_steps - 1 + n_windows]
    return x_output, y_output


x_train, y_train = create_sequences(df_training_value.values, labels_nofaults)
print("Training input shape: ", x_train.shape)
print("Training labels shape: ", y_train.shape)

# We detect anomalies by determining how well the model can reconstruct the
# input data.
#   * Find the MAE loss on the training samples.
#   * Find the max MAE loss value. This is the worst the model has performed
#     trying to reconstruct a sample, and it becomes the anomaly threshold.
#   * If the reconstruction loss for a sample is greater than this threshold,
#     the model is seeing a pattern it isn't familiar with, and the sample is
#     labelled as an anomaly.

# Get train MAE loss (mean over the time axis of each window).
x_train_pred = loaded_model.predict(x_train)
train_mae_loss = np.mean(np.abs(x_train_pred - x_train), axis=1)

plt.hist(train_mae_loss, bins=50)
plt.xlabel("Train MAE loss")
plt.ylabel("No of samples")
plt.show()

# Get reconstruction loss threshold.
threshold = np.max(train_mae_loss)
print("Reconstruction error threshold: ", threshold)

# 1. Compare reconstruction: check how the model has reconstructed the first
# sample.
plt.plot(x_train[0])
plt.plot(x_train_pred[0])
plt.show()

# ---------------------------------------------------------------------------
# Prepare test data
# ---------------------------------------------------------------------------
# The test signal is scaled with the *training* statistics.
df_test_value = (new_dataset_faults - training_mean) / training_std
fig, ax = plt.subplots()
df_test_value.plot(legend=False, ax=ax)
plt.show()

# Create sequences from test values.
x_test, y_test = create_sequences(df_test_value.values, labels_faults)
print("Test input shape: ", x_test.shape)
print("Test labels shape: ", y_test.shape)

# Get test MAE loss, flattened to one value per window.
x_test_pred = loaded_model.predict(x_test)
test_mae_loss = np.mean(np.abs(x_test_pred - x_test), axis=1).reshape(-1)

plt.hist(test_mae_loss, bins=50)
plt.xlabel("test MAE loss")
plt.ylabel("No of samples")
plt.show()

# Detect all the samples which are anomalies.
anomalies = test_mae_loss > threshold
print("Number of anomaly samples: ", np.sum(anomalies))
print("Indices of anomaly samples: ", np.where(anomalies))

# ---------------------------------------------------------------------------
# Plot anomalies
# ---------------------------------------------------------------------------
# Data point i is an anomaly if windows (i - TIME_STEPS + 1) to (i), both
# inclusive, are all anomalous. The slice end is therefore data_idx + 1; a
# slice ending at data_idx would leave window i itself unchecked.
anomalous_data_indices = [
    data_idx
    for data_idx in range(TIME_STEPS - 1, len(df_test_value) - TIME_STEPS + 1)
    if np.all(anomalies[data_idx - TIME_STEPS + 1:data_idx + 1])
]

# Overlay the anomalies on the original test data plot.
df_subset = new_dataset_faults.iloc[anomalous_data_indices]

fig, ax = plt.subplots()
new_dataset_faults.plot(legend=False, ax=ax)
df_subset.plot(legend=False, ax=ax, style='rx')  # anomalies as red crosses
plt.xlabel('Samples', fontsize=12)  # font size of the x-axis label
plt.ylabel('DO [mg/L]', fontsize=12)  # font size of the y-axis label
plt.xlim(0, 58466)
plt.show()


def classify_samples(mae_loss, threshold):
    """Label each reconstruction loss as normal (1) or anomalous (0).

    Args:
        mae_loss: Array-like of reconstruction losses.
        threshold: Largest loss still considered normal.

    Returns:
        Integer NumPy array: 1 where ``mae_loss <= threshold``, 0 elsewhere.
    """
    return (np.asarray(mae_loss) <= threshold).astype(int)


# Predicted class of every test window: 1 = normal, 0 = anomalous.
predicted_labels = classify_samples(test_mae_loss, threshold)

# Ground truth is the label of each window's last sample. (This name used to
# be filled with the model's own predictions.)
true_labels = y_test

# Index in target_names == class value: 0 -> 'Anomalous', 1 -> 'Normal'.
# Passing labels=[0, 1] keeps the report and the matrix two-class even if one
# class is missing from the data.
target_names = ['Anomalous', 'Normal']
print(classification_report(y_test, predicted_labels, labels=[0, 1],
                            target_names=target_names, digits=4))

matrix_ae = confusion_matrix(y_test, predicted_labels, labels=[0, 1])

plt.figure(figsize=(10, 8))
sns.set(font_scale=1.5)

# Class labels for both axes, in class-value order (0, 1).
class_labels = ['Anomaly', 'Normal']

# Heatmap of the confusion matrix without the color bar.
sns.heatmap(matrix_ae, annot=True, cmap='Blues', fmt=".1f",
            xticklabels=class_labels, yticklabels=class_labels,
            annot_kws={"color": "black", "fontsize": 14}, cbar=False)

plt.xlabel('Predicted Label', fontsize=14)
plt.ylabel('True Label', fontsize=14)
plt.title('Autoencoder Confusion Matrix', fontsize=16)
plt.show()


def plot_roc_curve(true_y, y_prob):
    """Plot the ROC curve of the scores ``y_prob`` against ``true_y``.

    Args:
        true_y: Binary ground-truth labels.
        y_prob: Scores for the positive class (higher = more positive).
    """
    fpr, tpr, _ = roc_curve(true_y, y_prob)
    plt.plot(fpr, tpr)
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')


# ROC/AUC need a continuous score, not thresholded 0/1 predictions (those give
# a single operating point). The positive class is 1 = normal, and a lower
# reconstruction loss means "more normal", so the negated loss is the score.
normal_score = -test_mae_loss
plot_roc_curve(y_test, normal_score)
plt.show()
print(f'model CNN AUC score: {roc_auc_score(y_test, normal_score)}')

# Ground-truth signal used by the plots further down (copy, so y_test itself
# is never modified). Values are already binary: 1 = normal, 0 = anomalous.
binary_signal_true = y_test.copy()
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd


def _falling_edges(signal):
    """Return every index i where signal[i - 1] == 1 and signal[i] == 0."""
    signal = np.asarray(signal)
    return np.where((signal[:-1] == 1) & (signal[1:] == 0))[0] + 1


# Plot the ground-truth label signal. The rest of this file treats 1 as
# normal and 0 as anomalous, so the axis label states that convention.
plt.figure(figsize=(12, 6))
plt.step(range(len(binary_signal_true)), binary_signal_true, color='blue',
         label='Anomaly Signal')
plt.xlabel('Time Step')
plt.ylabel('Anomaly Status (1: Normal, 0: Anomaly)')
plt.title('Anomaly Detection Signal')
plt.legend()
plt.show()

# 7. Detect anomalies and measure detection time.
# The timer encloses the whole detection step (reconstruction, per-window MAE
# and thresholding); this re-runs the model on x_test once.
start_time_total = time.perf_counter()
x_test_pred = loaded_model.predict(x_test)
test_mae_loss = np.mean(np.abs(x_test_pred - x_test), axis=1).reshape(-1)
anomalies = test_mae_loss > threshold
end_time_total = time.perf_counter()

total_time_seconds = end_time_total - start_time_total
total_time_minutes = total_time_seconds / 60.0
print(f"Total time taken for anomaly detection: {total_time_seconds:.6f} "
      f"seconds ({total_time_minutes:.6f} minutes).")

# 8. Summary of the detected anomalies.
print("Number of anomaly samples: ", np.sum(anomalies))
print("Indices of anomaly samples: ", np.where(anomalies))

# Alarm signal of the autoencoder: 0 where the loss exceeds the threshold
# (anomaly), 1 otherwise (normal).
binary_signal = np.where(anomalies, 0, 1)

# The seaborn style sheets were renamed to "seaborn-v0_8-*" in newer
# matplotlib releases; use whichever name this installation provides.
for style_name in ("seaborn-v0_8-whitegrid", "seaborn-whitegrid"):
    if style_name in plt.style.available:
        plt.style.use(style_name)
        break

plt.figure(figsize=(12, 7))  # wider figure for better visibility

# Predicted alarm signal and ground-truth signal on the same axes.
plt.plot(binary_signal, linestyle='-', color='tab:orange', label='Autoencoder')
plt.plot(range(len(binary_signal_true)), binary_signal_true, color='blue',
         label='Dataset')

# NOTE(review): `threshold` is a reconstruction-error value, while this axis
# shows the 0/1 alarm signals -- confirm the line is meant to be drawn here.
plt.axhline(y=threshold, color='tab:red', linestyle='--', label='Threshold')

plt.xlabel('Time', fontsize=12)
plt.ylabel('AE Detection Alarm', fontsize=12)
plt.title('Autoencoder Anomaly Detection', fontsize=16, fontweight='bold')
plt.legend(loc='upper left', fontsize=10, frameon=True, fancybox=True,
           edgecolor='black')

# Descriptive annotations for the two signal levels.
plt.text(26500, 0.5, '1 (Normal)', fontsize=10, color='tab:blue',
         fontweight='bold')
plt.text(51500, 0.5, '0 (Anomaly)', fontsize=10, color='tab:orange',
         fontweight='bold')

plt.grid(True)

# Restrict the view to the samples of interest.
plt.xlim(25000, 53000)
plt.ylim(-0.1, 1.1)

# Tick positions and labels; note the first tick (20000) lies below the lower
# x-limit set above.
x_tick_positions = [20000, 25000, 30000, 35000, 40000, 45000, 50000, 53000]
x_tick_labels = ['20K', '25K', '30K', '35K', '40K', '45K', '50K', '53K']
plt.xticks(x_tick_positions, x_tick_labels, fontsize=10)
plt.yticks([0, 1], fontsize=10)

plt.tight_layout()
plt.show()

# Sample indices where the autoencoder alarm switches from 1 to 0.
switch_points = _falling_edges(binary_signal)
print("Indices of switches from 1 to 0:", switch_points)

# Sample indices where the dataset labels switch from 1 to 0.
switch_points_TRUE = _falling_edges(binary_signal_true)
print("Indices of switches from 1 to 0:", switch_points_TRUE)

print(len(switch_points_TRUE), len(switch_points))

# Export both lists of switching points for side-by-side comparison.
data = {'Dataset switching points': switch_points_TRUE}
df = pd.DataFrame(data)
print(df.head(5))
df.to_csv('Dataset.csv', index=False)

data = {'Dataset switching points': switch_points}
df = pd.DataFrame(data)
print(df.head(5))
df.to_csv('AE.csv', index=False)

# Compare Dataset.csv with AE.csv and modify the delay value below by hand.
# The data set holds 96 samples per day (one every 15 minutes), so the
# interval is 24 * 60 / 96 minutes.
time_interval_minutes = 24 * 60 / 96

# Detection delay, expressed as a number of samples.
num_measurements = 10

# Convert the delay from samples to hours.
time_passed_hours = (time_interval_minutes * num_measurements) / 60
print(f"Time passed after {num_measurements} measurements:",
      time_passed_hours, "hours")