# ML-Reference-Codes / WISDM dataset: comparison between the five LSTM
# architectures (original codes).
# NOTE: the original first three lines were a copy/paste artifact from the
# GitHub file view (filename repeated + "Raw") and were not valid Python;
# they are preserved here as a comment so the file parses.
import os
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix, roc_curve, auc, precision_recall_curve, classification_report
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Input, Attention, LayerNormalization, Concatenate, GlobalAveragePooling1D, multiply, Reshape, Layer
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
from sklearn.model_selection import train_test_split
import numpy as np
import tensorflow as tf
import tarfile
import logging

# Set up logging: timestamped INFO-level messages on the root logger.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger()

# Suppress TensorFlow GPU warnings
# NOTE(review): TF_CPP_MIN_LOG_LEVEL is read when TensorFlow is imported;
# since `import tensorflow` already ran above, setting it here likely has
# no effect — consider moving it before the import.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

# Enable memory growth for the GPU so TensorFlow allocates VRAM on demand
# instead of grabbing the whole device up front.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        # set_memory_growth raises RuntimeError if GPUs were already initialized.
        logger.error(f"GPU error: {e}")

logger.info("Starting data preprocessing...")

# Path to the dataset file.
# NOTE(review): hard-coded absolute Windows paths — consider making these
# configurable (CLI argument or environment variable) for portability.
dataset_path = r'C:\Users\LENOVO LEGION\Desktop\ml codes\ML CODES WISDIM\WISDM_ar_latest.tar.gz'
extract_path = r'C:\Users\LENOVO LEGION\Desktop\ml codes\ML CODES WISDIM\WISDM_ar_latest'

# Extract the dataset (only once; skipped when the target directory exists).
if not os.path.exists(extract_path):
    logger.info("Extracting dataset...")
    # NOTE(review): tarfile.extractall on an untrusted archive is vulnerable
    # to path traversal; on Python 3.12+ pass filter='data'. Low risk here
    # since the archive is a locally stored research dataset.
    with tarfile.open(dataset_path, 'r:gz') as tar:
        tar.extractall(path=extract_path)

# Define the path to the main dataset file
data_file = os.path.join(extract_path, 'WISDM_ar_v1.1', 'WISDM_ar_v1.1_raw.txt')

# Load the dataset; on_bad_lines='skip' silently drops malformed rows
# instead of raising (the raw WISDM file contains several).
logger.info("Loading dataset...")
column_names = ['user', 'activity', 'timestamp', 'x', 'y', 'z']
wisdm_data = pd.read_csv(data_file, header=None, names=column_names, on_bad_lines='skip')

logger.info(f"Initial dataset shape: {wisdm_data.shape}")

# Clean the accelerometer columns.
#
# The raw WISDM file appends ';' to each record's z value and contains
# occasional malformed fields. The original cleaning filtered rows with
# `value.replace('.', '', 1).isdigit()`, which silently DROPPED every
# negative reading (e.g. "-4.13" -> "-413", not a digit string) — a major
# data loss for accelerometer data, which is signed by nature.
# pd.to_numeric(errors='coerce') parses signed floats correctly and turns
# anything unparseable into NaN, which the dropna below removes.
for axis in ('x', 'y', 'z'):
    cleaned = wisdm_data[axis].astype(str).str.replace(';', '', regex=False)
    wisdm_data[axis] = pd.to_numeric(cleaned, errors='coerce')

# Drop rows where any field failed to parse or was missing.
wisdm_data = wisdm_data.dropna()

logger.info(f"Dataset shape after cleaning: {wisdm_data.shape}")

# Feature Engineering
logger.info("Performing feature engineering...")

# Euclidean magnitude of the 3-axis acceleration vector.
wisdm_data['magnitude'] = np.sqrt(wisdm_data['x']**2 + wisdm_data['y']**2 + wisdm_data['z']**2)

# Jerk (discrete derivative of acceleration w.r.t. the timestamp).
# NOTE(review): differences run over the whole frame, so the first sample
# of each user/session inherits a diff across the previous one's boundary —
# confirm whether per-user grouping is intended here.
for axis in ['x', 'y', 'z']:
    diff = np.diff(wisdm_data[axis])
    time_diff = np.diff(wisdm_data['timestamp'])
    jerk = np.zeros(len(wisdm_data))
    # np.divide with out=/where= skips the zero-delta entries entirely, so
    # no divide-by-zero RuntimeWarning is emitted (the original evaluated
    # diff / time_diff unconditionally and only then masked with np.where).
    jerk[1:] = np.divide(diff, time_diff,
                         out=np.zeros_like(diff, dtype=float),
                         where=time_diff != 0)
    wisdm_data[f'{axis}_jerk'] = jerk

# Rolling mean and standard deviation per user.
window_size = 20  # Adjust as needed
for axis in ['x', 'y', 'z']:
    grouped = wisdm_data.groupby('user')[axis]
    wisdm_data[f'{axis}_rolling_mean'] = grouped.rolling(window=window_size).mean().reset_index(0, drop=True)
    wisdm_data[f'{axis}_rolling_std'] = grouped.rolling(window=window_size).std().reset_index(0, drop=True)

# Replace infinities, then forward-/back-fill remaining NaNs (the first
# window_size-1 rows of each rolling statistic are NaN by construction).
# fillna(method='ffill') is deprecated since pandas 2.1 and removed in 3.0;
# use the dedicated ffill()/bfill() methods instead.
wisdm_data = wisdm_data.replace([np.inf, -np.inf], np.nan).ffill().bfill()

# Map activity labels to integers (in order of first appearance).
activity_mapping = {label: idx for idx, label in enumerate(wisdm_data['activity'].unique())}
wisdm_data['activity'] = wisdm_data['activity'].map(activity_mapping)

# Reverse mapping for later use in confusion-matrix / curve labels.
reverse_activity_mapping = {v: k for k, v in activity_mapping.items()}

# Guard: fail fast if cleaning removed every row.
if wisdm_data.empty:
    raise ValueError("Dataset is empty after preprocessing")

# Normalize the feature data (zero mean, unit variance per column).
# NOTE(review): the scaler is fit on the FULL dataset before the
# train/test split, which leaks test-set statistics into training —
# fitting on the training portion only would give an unbiased evaluation.
logger.info("Normalizing features...")
scaler = StandardScaler()
features = ['x', 'y', 'z', 'magnitude', 'x_jerk', 'y_jerk', 'z_jerk', 
            'x_rolling_mean', 'y_rolling_mean', 'z_rolling_mean', 
            'x_rolling_std', 'y_rolling_std', 'z_rolling_std']
wisdm_data[features] = scaler.fit_transform(wisdm_data[features])

# Additional check for any remaining infinite values
if np.isinf(wisdm_data[features]).any().any():
    logger.warning("Infinite values detected after normalization. Replacing with large finite values.")
    wisdm_data[features] = wisdm_data[features].replace([np.inf, -np.inf], np.finfo(np.float64).max)

# Reshape the data into fixed-length sliding windows.
sequence_length = 200  # samples per window — adjust as needed

def create_sequences(data, seq_length, step=1, feature_cols=None, label_col='activity'):
    """Slice *data* into overlapping fixed-length windows.

    Each window is labelled with the *label_col* value at its LAST row.
    NOTE(review): windows may span user and activity boundaries — confirm
    whether per-user segmentation is wanted.

    Parameters
    ----------
    data : pd.DataFrame
        Frame containing the feature columns and *label_col*.
    seq_length : int
        Number of consecutive rows per window.
    step : int, optional
        Stride between window starts (1 = maximal overlap).
    feature_cols : list[str], optional
        Feature columns to extract; defaults to the module-level
        ``features`` list (backward compatible with the original call).
    label_col : str, optional
        Column providing the window label.

    Returns
    -------
    tuple[np.ndarray, np.ndarray]
        X of shape (n_windows, seq_length, n_features) and y of shape
        (n_windows,).
    """
    cols = features if feature_cols is None else feature_cols
    # Hoist column extraction out of the loop: one NumPy array up front,
    # then cheap array slices, instead of a DataFrame .iloc slice (and a
    # fresh .values materialization) per window.
    values = data[cols].to_numpy()
    label_values = data[label_col].to_numpy()
    sequences = []
    labels = []
    for start in range(0, len(data) - seq_length, step):
        sequences.append(values[start:start + seq_length])
        labels.append(label_values[start + seq_length - 1])
    return np.array(sequences), np.array(labels)

logger.info("Creating sequences...")
X, y = create_sequences(wisdm_data, sequence_length)
logger.info(f"Shape of X after sequence creation: {X.shape}")
logger.info(f"Shape of y after sequence creation: {y.shape}")

# Final check for any NaN or infinite values before training.
if np.isnan(X).any() or np.isinf(X).any():
    logger.error("NaN or infinite values detected in the final dataset. Please check the data preprocessing steps.")
    raise ValueError("Dataset contains NaN or infinite values after preprocessing.")

# Encode integer labels as one-hot vectors for categorical_crossentropy.
y_categorical = to_categorical(y)

# Split the data into training and testing sets.
# NOTE(review): with step=1 adjacent windows overlap by seq_length-1
# samples, so a random split puts near-duplicate windows in both train and
# test — accuracy estimates will be optimistic. Splitting by user or by
# time would be a more honest evaluation.
X_train, X_test, y_train, y_test = train_test_split(X, y_categorical, test_size=0.2, random_state=42)
logger.info(f"Training set shape: {X_train.shape}")
logger.info(f"Testing set shape: {X_test.shape}")

# Define model building functions
def build_simple_lstm(input_shape, num_classes):
    """Baseline model: a single LSTM(100), dropout, and a softmax head."""
    model = Sequential([
        LSTM(100, input_shape=input_shape, return_sequences=False),
        Dropout(0.5),
        Dense(num_classes, activation='softmax'),
    ])
    model.compile(
        optimizer=Adam(learning_rate=0.001),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return model

def build_deep_lstm(input_shape, num_classes):
    """Two stacked LSTM(100) layers with dropout, then a softmax classifier."""
    model = Sequential([
        LSTM(100, input_shape=input_shape, return_sequences=True),
        Dropout(0.5),
        LSTM(100, return_sequences=False),
        Dropout(0.5),
        Dense(num_classes, activation='softmax'),
    ])
    model.compile(
        optimizer=Adam(learning_rate=0.001),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return model

def build_lstm_attention(input_shape, num_classes):
    """Two stacked LSTMs with self-attention over the sequence outputs."""
    seq_in = Input(shape=input_shape)
    hidden = Dropout(0.5)(LSTM(100, return_sequences=True)(seq_in))
    hidden = Dropout(0.5)(LSTM(100, return_sequences=True)(hidden))

    # Self-attention: query and value are both the LSTM outputs; normalize
    # the attended representation before merging it back.
    attn_out = LayerNormalization()(Attention()([hidden, hidden]))

    merged = Concatenate()([hidden, attn_out])
    pooled = GlobalAveragePooling1D()(merged)
    head = Dropout(0.5)(Dense(100, activation='relu')(pooled))
    probs = Dense(num_classes, activation='softmax')(head)

    model = Model(seq_in, probs)
    model.compile(optimizer=Adam(learning_rate=0.001),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    return model

def build_multi_head_lstm_attention(input_shape, num_classes):
    """Three parallel LSTM(50) 'heads' concatenated, followed by self-attention."""
    seq_in = Input(shape=input_shape)

    # Parallel LSTM heads over the same input, concatenated along features.
    heads = [LSTM(50, return_sequences=True)(seq_in) for _ in range(3)]
    x = Dropout(0.5)(Concatenate()(heads))

    # Self-attention over the multi-head representation, then normalize.
    attn_out = LayerNormalization()(Attention()([x, x]))

    merged = Concatenate()([x, attn_out])
    pooled = GlobalAveragePooling1D()(merged)
    head = Dropout(0.5)(Dense(100, activation='relu')(pooled))
    probs = Dense(num_classes, activation='softmax')(head)

    model = Model(seq_in, probs)
    model.compile(optimizer=Adam(learning_rate=0.001),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    return model

class SqueezeExciteBlock(Layer):
    """Squeeze-and-Excitation block for 1D sequence features.

    Learns a per-channel gating vector: global-average-pool over time
    ("squeeze"), a two-layer bottleneck with sigmoid output ("excitation"),
    then rescales the input channels by the resulting weights.

    Parameters
    ----------
    ratio : int
        Bottleneck reduction ratio for the first excitation layer.
    """

    def __init__(self, ratio=16, **kwargs):
        super(SqueezeExciteBlock, self).__init__(**kwargs)
        self.ratio = ratio

    def build(self, input_shape):
        channels = input_shape[-1]
        self.global_avg_pool = GlobalAveragePooling1D()
        # Fix: the original instantiated a Reshape layer inside call(),
        # creating a brand-new layer object on every invocation — a Keras
        # anti-pattern (sub-layers belong in __init__/build so they are
        # created once and tracked properly).
        self.reshape = Reshape((1, channels))
        self.dense1 = Dense(channels // self.ratio, activation='relu',
                            kernel_initializer='he_normal', use_bias=False)
        self.dense2 = Dense(channels, activation='sigmoid',
                            kernel_initializer='he_normal', use_bias=False)
        super(SqueezeExciteBlock, self).build(input_shape)

    def call(self, inputs):
        # Squeeze: (batch, time, ch) -> (batch, ch) -> (batch, 1, ch).
        x = self.global_avg_pool(inputs)
        x = self.reshape(x)
        # Excite: bottleneck + sigmoid produces per-channel gates in [0, 1].
        x = self.dense1(x)
        x = self.dense2(x)
        # Scale: broadcast the gates over the time dimension.
        return multiply([inputs, x])

    def get_config(self):
        """Include `ratio` so the layer survives model save/load."""
        config = super(SqueezeExciteBlock, self).get_config()
        config.update({"ratio": self.ratio})
        return config

def build_multi_head_lstm_se(input_shape, num_classes):
    """Three parallel LSTM(50) heads with a Squeeze-and-Excitation branch."""
    seq_in = Input(shape=input_shape)

    # Parallel LSTM heads over the same input, concatenated along features.
    heads = [LSTM(50, return_sequences=True)(seq_in) for _ in range(3)]
    x = Dropout(0.5)(Concatenate()(heads))

    # Channel-recalibrated copy of the LSTM features.
    se_branch = SqueezeExciteBlock()(x)

    # Merge raw and recalibrated features, pool over time, classify.
    merged = Concatenate()([x, se_branch])
    pooled = GlobalAveragePooling1D()(merged)
    head = Dropout(0.5)(Dense(100, activation='relu')(pooled))
    probs = Dense(num_classes, activation='softmax')(head)

    model = Model(seq_in, probs)
    model.compile(optimizer=Adam(learning_rate=0.001),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    return model

def train_and_evaluate(model, X_train, y_train, X_test, y_test, epochs=20, batch_size=64):
    """Fit *model* with LR reduction + early stopping, then score it on the test set.

    Returns (model, History, test accuracy). 20% of the training data is
    held out as the validation split monitored by both callbacks.
    """
    callbacks = [
        ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.00001),
        EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
    ]
    history = model.fit(
        X_train, y_train,
        epochs=epochs,
        batch_size=batch_size,
        validation_split=0.2,
        verbose=1,
        callbacks=callbacks,
    )
    loss, accuracy = model.evaluate(X_test, y_test, verbose=0)
    return model, history, accuracy

# Model inputs: (sequence_length, n_features); output width = one-hot size.
input_shape = (X_train.shape[1], X_train.shape[2])
num_classes = y_train.shape[1]

# One compiled-but-untrained model per architecture, keyed by display name.
models = {
    'Simple LSTM': build_simple_lstm(input_shape, num_classes),
    'Deep LSTM': build_deep_lstm(input_shape, num_classes),
    'LSTM with Attention': build_lstm_attention(input_shape, num_classes),
    'Multi-head LSTM with Attention': build_multi_head_lstm_attention(input_shape, num_classes),
    'Multi-head LSTM with SE': build_multi_head_lstm_se(input_shape, num_classes)
}

# Per-model result collectors, keyed by the display names above.
histories = {}
accuracies = {}
f1_scores = {}
precisions = {}
recalls = {}
confusion_matrices = {}

# Train and evaluate each architecture on the same split.
for name, model in models.items():
    logger.info(f'Training {name}...')
    model, history, accuracy = train_and_evaluate(model, X_train, y_train, X_test, y_test)
    histories[name] = history
    accuracies[name] = accuracy
    # Derive hard class predictions and weighted-average metrics on the test set.
    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true = np.argmax(y_test, axis=1)
    report = classification_report(y_true, y_pred_classes, output_dict=True)
    f1_scores[name] = report['weighted avg']['f1-score']
    precisions[name] = report['weighted avg']['precision']
    recalls[name] = report['weighted avg']['recall']
    confusion_matrices[name] = confusion_matrix(y_true, y_pred_classes)
    logger.info(f'{name} Test Accuracy: {accuracy*100:.2f}%')

logger.info("Starting visualization...")

# Plot confusion matrices for the first four models in a 2x2 grid.
plt.figure(figsize=(20, 15))
for i, (name, cm) in enumerate(list(confusion_matrices.items())[:4], 1):
    plt.subplot(2, 2, i)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title(f'{name} - Confusion Matrix')
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    
    # Tick labels are activity names, offset by 0.5 to sit at cell centers.
    # NOTE(review): the comprehension variable `i` shadows the subplot
    # index `i` of the outer loop — harmless here because `i` is not used
    # again after this point in the iteration, but worth renaming.
    tick_marks = np.arange(len(reverse_activity_mapping))
    plt.xticks(tick_marks + 0.5, [reverse_activity_mapping[i] for i in range(len(reverse_activity_mapping))], rotation=45, ha='right')
    plt.yticks(tick_marks + 0.5, [reverse_activity_mapping[i] for i in range(len(reverse_activity_mapping))], rotation=0)

plt.tight_layout()
plt.show()

# Plot the fifth (last) confusion matrix separately, since only four fit
# in the 2x2 grid above.
plt.figure(figsize=(10, 8))
name, cm = list(confusion_matrices.items())[-1]
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.title(f'{name} - Confusion Matrix')
plt.ylabel('True label')
plt.xlabel('Predicted label')

# Set x and y tick labels to activity names (cell-centered, as above).
tick_marks = np.arange(len(reverse_activity_mapping))
plt.xticks(tick_marks + 0.5, [reverse_activity_mapping[i] for i in range(len(reverse_activity_mapping))], rotation=45, ha='right')
plt.yticks(tick_marks + 0.5, [reverse_activity_mapping[i] for i in range(len(reverse_activity_mapping))], rotation=0)

plt.tight_layout()
plt.show()

# Plot training & validation accuracy curves for all models on one axis.
plt.figure(figsize=(15, 10))
for name, history in histories.items():
    plt.plot(history.history['accuracy'], label=f'{name} Training Accuracy')
    plt.plot(history.history['val_accuracy'], label=f'{name} Validation Accuracy', linestyle='dashed')
plt.title('Model Training and Validation Accuracy Comparison')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(loc='lower right')
plt.show()

# Plot training & validation loss curves for all models on one axis.
plt.figure(figsize=(15, 10))
for name, history in histories.items():
    plt.plot(history.history['loss'], label=f'{name} Training Loss')
    plt.plot(history.history['val_loss'], label=f'{name} Validation Loss', linestyle='dashed')
plt.title('Model Training and Validation Loss Comparison')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(loc='upper right')
plt.show()

# Validation accuracy only, one line per model, for direct comparison.
plt.figure(figsize=(15, 10))
for name, history in histories.items():
    plt.plot(history.history['val_accuracy'], label=name)
plt.title('Validation Accuracy Comparison Over Epochs')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(loc='lower right')
plt.show()

# Box plot of final test accuracies.
# NOTE(review): each model contributes a single accuracy value, so every
# "box" degenerates to one point — a bar chart would represent this better.
plt.figure(figsize=(12, 8))
accuracy_data = [[acc] for acc in accuracies.values()]  # Convert each accuracy to a list
plt.boxplot(accuracy_data, labels=list(accuracies.keys()))
plt.title('Model Accuracy Comparison')
plt.ylabel('Accuracy')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# Box plot of per-epoch training accuracies (one distribution per model).
plt.figure(figsize=(12, 8))
training_accuracies = [history.history['accuracy'] for history in histories.values()]
plt.boxplot(training_accuracies, labels=list(models.keys()))
plt.title('Box Plot of Training Accuracies')
plt.ylabel('Accuracy')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()

# Recognition error (1 - training accuracy) per epoch, per model.
plt.figure(figsize=(15, 10))
for name, history in histories.items():
    recognition_error = 1 - np.array(history.history['accuracy'])
    plt.plot(recognition_error, label=f'{name} Recognition Error')
plt.title('Recognition Error Over Time')
plt.ylabel('Recognition Error')
plt.xlabel('Epoch')
plt.legend(loc='upper right')
plt.show()

# Grouped bar chart: weighted F1 / precision / recall per model.
plt.figure(figsize=(15, 10))
bar_width = 0.2
index = np.arange(len(models))

plt.bar(index, list(f1_scores.values()), bar_width, label='F1 Score')
plt.bar(index + bar_width, list(precisions.values()), bar_width, label='Precision')
plt.bar(index + 2 * bar_width, list(recalls.values()), bar_width, label='Recall')

plt.xlabel('Model')
plt.ylabel('Score')
plt.title('F1 Score, Precision, and Recall Comparison')
# Center each model name under the middle bar of its group.
plt.xticks(index + bar_width, list(models.keys()), rotation=45)
plt.legend()
plt.tight_layout()
plt.show()

# ROC and Precision-Recall curves, one-vs-rest per class, per model.
fpr = {}
tpr = {}
roc_auc = {}
precision_pr = {}
recall_pr = {}
pr_auc = {}

for name, model in models.items():
    # NOTE(review): model.predict(X_test) was already computed once in the
    # training loop; caching those probabilities there would avoid this
    # second full inference pass per model.
    y_pred = model.predict(X_test)
    for i in range(y_test.shape[1]):
        # Treat class i as positive vs. the rest, scoring by its predicted
        # probability column.
        fpr[f'{name}_class_{i}'], tpr[f'{name}_class_{i}'], _ = roc_curve(y_test[:, i], y_pred[:, i])
        roc_auc[f'{name}_class_{i}'] = auc(fpr[f'{name}_class_{i}'], tpr[f'{name}_class_{i}'])
        precision_pr[f'{name}_class_{i}'], recall_pr[f'{name}_class_{i}'], _ = precision_recall_curve(y_test[:, i], y_pred[:, i])
        pr_auc[f'{name}_class_{i}'] = auc(recall_pr[f'{name}_class_{i}'], precision_pr[f'{name}_class_{i}'])

# Plot all per-class ROC curves on one axis, with a chance diagonal.
plt.figure(figsize=(20, 15))
for name in models.keys():
    for i in range(y_test.shape[1]):
        plt.plot(fpr[f'{name}_class_{i}'], tpr[f'{name}_class_{i}'], lw=2, 
                 label='{} (class {} ROC area = {:.2f})'.format(name, reverse_activity_mapping[i], roc_auc[f'{name}_class_{i}']))
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curves')
plt.legend(loc='lower right', fontsize='small')
plt.show()

# Plot all per-class Precision-Recall curves on one axis.
plt.figure(figsize=(20, 15))
for name in models.keys():
    for i in range(y_test.shape[1]):
        plt.plot(recall_pr[f'{name}_class_{i}'], precision_pr[f'{name}_class_{i}'], lw=2, 
                 label='{} (class {} PR area = {:.2f})'.format(name, reverse_activity_mapping[i], pr_auc[f'{name}_class_{i}']))
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curves')
plt.legend(loc='lower left', fontsize='small')
plt.show()

# Correlation matrix over the flattened windows: each column is one
# (time step, feature) pair, i.e. sequence_length * len(features) columns.
# NOTE(review): that is 200 * 13 = 2600 columns here, so this computes a
# 2600x2600 correlation matrix — expensive and unreadable as a heatmap;
# correlating just the 13 base features would be far more informative.
correlation_matrix = pd.DataFrame(X_train.reshape(X_train.shape[0], -1)).corr()

plt.figure(figsize=(12, 10))
sns.heatmap(correlation_matrix, cmap='coolwarm', annot=False)
plt.title('Correlation Matrix of Input Features')
plt.show()

# If the correlation matrix is too large, also show a readable 50x50 subset.
if correlation_matrix.shape[0] > 50:
    plt.figure(figsize=(12, 10))
    sns.heatmap(correlation_matrix.iloc[:50, :50], cmap='coolwarm', annot=False)
    plt.title('Correlation Matrix of First 50 Input Features')
    plt.show()

# Heatmap summary of all four metrics across models.
def plot_heatmap_metrics_comparison(accuracies, f1_scores, precisions, recalls):
    """Render one annotated heatmap of accuracy/precision/recall/F1 per model.

    All four dicts must share the same model-name keys; rows are models,
    columns are metrics, cells show the value to four decimal places.
    """
    # Assemble the metric table first (rows = model names).
    metric_table = pd.DataFrame(
        {
            'Accuracy': list(accuracies.values()),
            'Precision': list(precisions.values()),
            'Recall': list(recalls.values()),
            'F1 Score': list(f1_scores.values()),
        },
        index=list(accuracies.keys()),
    )

    plt.figure(figsize=(12, 8))
    sns.heatmap(metric_table, annot=True, cmap='YlOrRd', fmt='.4f')
    plt.title('Model Performance Metrics Heatmap')
    plt.tight_layout()
    plt.show()

# Render the combined metrics heatmap.
plot_heatmap_metrics_comparison(accuracies, f1_scores, precisions, recalls)

# Log a summary of the best model (highest test accuracy).
best_model, best_accuracy = max(accuracies.items(), key=lambda item: item[1])
logger.info(f"Best performing model: {best_model}")
logger.info(f"Best accuracy: {best_accuracy:.4f}")
logger.info(f"Best F1 score: {f1_scores[best_model]:.4f}")
logger.info(f"Best precision: {precisions[best_model]:.4f}")
logger.info(f"Best recall: {recalls[best_model]:.4f}")

logger.info("Analysis completed.")