# ML-REFRANCE-CODES / PAMAP DATASET CODES / THE 5 ALGORITHMS COMPARISON.py
# THE 5 ALGORITHMS COMPARISON.py
# Raw
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score, roc_curve, auc
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Input, Attention, Concatenate, TimeDistributed, GlobalAveragePooling1D, Reshape, Multiply
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
import tensorflow as tf

# GPU configuration: ask TensorFlow to grow GPU memory on demand for every
# visible GPU, and report which device class will be used.
gpus = tf.config.experimental.list_physical_devices('GPU')
if not gpus:
    print("GPU is not available, using CPU")
else:
    try:
        for device in gpus:
            tf.config.experimental.set_memory_growth(device, True)
    except RuntimeError as err:
        # Report the failure and continue; the script does not abort here.
        print(err)
    else:
        print("GPU is available")

# Paths to the datasets.
# The defaults are the original machine-specific locations; set the
# PAMAP2_PROTOCOL_PATH / PAMAP2_OPTIONAL_PATH environment variables to point
# the script at the dataset on another machine without editing the source.
PROTOCOL_PATH = os.environ.get(
    "PAMAP2_PROTOCOL_PATH",
    r"C:\Users\LENOVO LEGION\Downloads\pamap2_extracted\PAMAP2_Dataset\Protocol",
)
OPTIONAL_PATH = os.environ.get(
    "PAMAP2_OPTIONAL_PATH",
    r"C:\Users\LENOVO LEGION\Downloads\pamap2_extracted\PAMAP2_Dataset\Optional",
)

# Column names based on the readme file.
# Every IMU (hand, chest, ankle) contributes the same 17 channels in the same
# order, so the per-IMU names are generated from one template.
_IMU_CHANNELS = (
    'temperature',
    'acc16_1', 'acc16_2', 'acc16_3',
    'acc6_1', 'acc6_2', 'acc6_3',
    'gyro_1', 'gyro_2', 'gyro_3',
    'magno_1', 'magno_2', 'magno_3',
    'ori_1', 'ori_2', 'ori_3', 'ori_4',
)
COLUMN_NAMES = ['timestamp', 'activity_id', 'heart_rate'] + [
    f'{location}_{channel}'
    for location in ('hand', 'chest', 'ankle')
    for channel in _IMU_CHANNELS
]

# Activity labels: numeric activity_id -> human-readable name.
# The IDs are not contiguous: 8, 14, 15 and 21-23 have no entry here.
ACTIVITY_LABELS = {
    1: 'lying', 2: 'sitting', 3: 'standing',
    4: 'walking', 5: 'running', 6: 'cycling', 7: 'Nordic walking',
    9: 'watching TV', 10: 'computer work', 11: 'car driving',
    12: 'ascending stairs', 13: 'descending stairs',
    16: 'vacuum cleaning', 17: 'ironing', 18: 'folding laundry',
    19: 'house cleaning', 20: 'playing soccer',
    24: 'rope jumping',
}

def load_data(file_path):
    """Read one space-separated, header-less data file into a DataFrame.

    Columns are named after COLUMN_NAMES, in order.
    """
    return pd.read_csv(file_path, names=COLUMN_NAMES, header=None, sep=' ')

def load_all_data(base_path):
    """Load every ``.dat`` file in *base_path* into one DataFrame.

    Files are read in sorted name order so the row order of the result (and
    therefore any seeded shuffle/split applied later) is reproducible;
    ``os.listdir`` alone returns entries in arbitrary order.

    Args:
        base_path: Directory containing the ``.dat`` files.

    Returns:
        The concatenation of all loaded files, re-indexed from 0.

    Raises:
        ValueError: If the directory contains no ``.dat`` files.
        OSError: If *base_path* cannot be listed (propagated from os.listdir).
    """
    dat_files = sorted(
        entry for entry in os.listdir(base_path) if entry.endswith(".dat")
    )
    if not dat_files:
        # pd.concat([]) would also raise ValueError, but without naming the
        # directory that was searched.
        raise ValueError(f"No .dat files found in {base_path!r}")

    frames = [load_data(os.path.join(base_path, entry)) for entry in dat_files]
    return pd.concat(frames, ignore_index=True)

def preprocess_data(data):
    """Filter, select and standardise the features used for training.

    Steps:
      1. Drop rows whose ``activity_id`` is 0 (transient activities).
      2. Drop rows with a missing value in any *selected* feature or in the
         label. Missing values in columns that are never used no longer
         cause a row to be discarded.
      3. Standardise the selected features with ``StandardScaler``.

    Args:
        data: DataFrame containing ``activity_id`` and the feature columns
            listed below.

    Returns:
        ``(X_scaled, y)`` where ``X_scaled`` is the output of
        ``StandardScaler.fit_transform`` on the selected features and ``y``
        is the ``activity_id`` column of the retained rows.

    Raises:
        ValueError: If no rows remain after filtering.
    """
    # Select relevant features (using 16g accelerometer data as recommended)
    features = ['hand_acc16_1', 'hand_acc16_2', 'hand_acc16_3',
                'hand_gyro_1', 'hand_gyro_2', 'hand_gyro_3',
                'chest_acc16_1', 'chest_acc16_2', 'chest_acc16_3',
                'chest_gyro_1', 'chest_gyro_2', 'chest_gyro_3',
                'ankle_acc16_1', 'ankle_acc16_2', 'ankle_acc16_3',
                'ankle_gyro_1', 'ankle_gyro_2', 'ankle_gyro_3',
                'heart_rate']

    # Remove rows with activity_id 0 (transient activities)
    data = data[data['activity_id'] != 0]

    # Only the columns that are actually consumed decide whether a row is
    # complete.
    # NOTE(review): heart_rate is a feature, so rows without a heart-rate
    # reading are still dropped; presumably that channel is sampled less often
    # than the IMU channels, which would discard most rows - confirm against
    # the dataset readme and consider interpolation.
    data = data.dropna(subset=features + ['activity_id'])

    if data.empty:
        raise ValueError(
            "No rows left after removing transient activities and rows "
            "with missing feature values"
        )

    X = data[features]
    y = data['activity_id']

    # Normalize features.
    # NOTE: the scaler is fit on every retained row; the caller splits into
    # train/test afterwards, so test-set statistics influence the scaling.
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    return X_scaled, y

# Load and preprocess data
# NOTE: everything in this section runs at import time; it is not inside the
# `if __name__ == "__main__":` guard further down the file.
print("Loading protocol data...")
protocol_data = load_all_data(PROTOCOL_PATH)
print("Loading optional data...")
optional_data = load_all_data(OPTIONAL_PATH)

# Combine protocol and optional data
all_data = pd.concat([protocol_data, optional_data], ignore_index=True)

print("Preprocessing data...")
# X: scaled feature matrix, y: activity_id labels (see preprocess_data).
X, y = preprocess_data(all_data)

print("Data loading and preprocessing complete.")
print(f"Features shape: {X.shape}")
print(f"Labels shape: {y.shape}")

# Reshape the data for LSTM input (samples, time steps, features)
# The time-step axis is fixed at length 1: each row becomes its own
# one-step sequence.
X = X.reshape((X.shape[0], 1, X.shape[1]))

# Convert labels to categorical
# NOTE(review): the raw activity IDs are passed straight in and they are
# non-contiguous (see ACTIVITY_LABELS); presumably the one-hot width becomes
# max ID + 1, leaving columns for IDs that never occur - confirm.
y = to_categorical(y)

# Split the data
# 80/20 train/test split with a fixed seed; the resulting names are read by
# the code below (y_test is also used inside plot_confusion_matrices).
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Model definitions

def build_simple_lstm(input_shape, num_classes):
    """Build an uncompiled single-layer LSTM classifier.

    Architecture: LSTM(100) -> Dropout(0.5) -> Dense(num_classes, softmax).

    Args:
        input_shape: Shape of one sample, passed to the LSTM as ``input_shape``.
        num_classes: Width of the softmax output layer.
    """
    network = Sequential()
    network.add(LSTM(100, input_shape=input_shape))
    network.add(Dropout(0.5))
    network.add(Dense(num_classes, activation='softmax'))
    return network

def build_deep_lstm(input_shape, num_classes):
    """Build an uncompiled two-layer stacked LSTM classifier.

    Architecture: LSTM(100, sequences) -> Dropout(0.3) -> LSTM(50)
    -> Dropout(0.3) -> Dense(num_classes, softmax).

    Args:
        input_shape: Shape of one sample, passed to the first LSTM.
        num_classes: Width of the softmax output layer.
    """
    network = Sequential()
    # The first LSTM returns the full sequence so the second one can consume it.
    network.add(LSTM(100, input_shape=input_shape, return_sequences=True))
    network.add(Dropout(0.3))
    network.add(LSTM(50))
    network.add(Dropout(0.3))
    network.add(Dense(num_classes, activation='softmax'))
    return network

def build_lstm_attention(input_shape, num_classes):
    """Build an uncompiled LSTM classifier with self-attention.

    The LSTM output sequence attends to itself; the attended sequence is
    concatenated with the LSTM output, projected per time step to
    ``num_classes`` units, averaged over time and fed to a softmax layer.

    Args:
        input_shape: Shape of one sample.
        num_classes: Width of the per-step projection and of the output layer.
    """
    sequence_in = Input(shape=input_shape)

    encoded = LSTM(100, return_sequences=True)(sequence_in)
    encoded = Dropout(0.3)(encoded)

    # Self-attention: the same tensor is used as query and value.
    attended = Attention()([encoded, encoded])
    merged = Concatenate()([encoded, attended])

    per_step_scores = TimeDistributed(Dense(num_classes))(merged)
    averaged = GlobalAveragePooling1D()(per_step_scores)
    class_probs = Dense(num_classes, activation='softmax')(averaged)

    return Model(inputs=sequence_in, outputs=class_probs)

def build_multi_head_lstm_attention(input_shape, num_classes, num_heads=3):
    """Build an uncompiled multi-branch LSTM classifier with self-attention.

    ``num_heads`` parallel LSTM(50) branches read the same input. Their
    dropout-regularised outputs are concatenated, self-attended, concatenated
    with the attention output, summarised by a further LSTM(50) and
    classified with a softmax layer.

    Args:
        input_shape: Shape of one sample.
        num_classes: Width of the softmax output layer.
        num_heads: Number of parallel LSTM branches.
    """
    sequence_in = Input(shape=input_shape)

    # All branches are created first, then dropout is applied to each in a
    # second pass (same construction order as before).
    branches = []
    for _ in range(num_heads):
        branches.append(LSTM(50, return_sequences=True)(sequence_in))
    regularised = []
    for branch in branches:
        regularised.append(Dropout(0.3)(branch))

    stacked = Concatenate()(regularised)
    attended = Attention()([stacked, stacked])
    merged = Concatenate()([stacked, attended])

    summary = LSTM(50)(merged)
    summary = Dropout(0.3)(summary)
    class_probs = Dense(num_classes, activation='softmax')(summary)

    return Model(inputs=sequence_in, outputs=class_probs)

def squeeze_excite_block(input_tensor, ratio=16):
    """Apply a squeeze-and-excitation style channel gate to *input_tensor*.

    The tensor is averaged over its middle axis, passed through a bottleneck
    Dense (relu) and an expanding Dense (sigmoid), and the resulting per-channel
    weights are multiplied back onto the input.

    Args:
        input_tensor: Keras tensor whose last dimension is the channel count.
            Assumed to be 3-D (batch, steps, channels), as required by
            GlobalAveragePooling1D.
        ratio: Reduction factor for the bottleneck layer.

    Returns:
        A tensor produced by multiplying *input_tensor* with the gate.
    """
    filters = input_tensor.shape[-1]
    # filters // ratio is 0 whenever filters < ratio, which would ask for a
    # Dense layer with no units; keep at least one bottleneck unit.
    bottleneck_units = max(1, filters // ratio)

    se = GlobalAveragePooling1D()(input_tensor)
    # Restore a length-1 middle axis so the gate broadcasts in Multiply.
    se = Reshape((1, filters))(se)
    se = Dense(bottleneck_units, activation='relu', kernel_initializer='he_normal', use_bias=False)(se)
    se = Dense(filters, activation='sigmoid', kernel_initializer='he_normal', use_bias=False)(se)
    se = Multiply()([input_tensor, se])
    return se

def build_multi_head_lstm_se(input_shape, num_classes, num_heads=3):
    """Build an uncompiled multi-branch LSTM classifier with SE gating.

    ``num_heads`` parallel LSTM(50) branches read the same input; each branch
    goes through dropout and ``squeeze_excite_block``. The gated branches are
    concatenated, projected per time step to ``num_classes`` units, averaged
    over time and classified with a softmax layer.

    Args:
        input_shape: Shape of one sample.
        num_classes: Width of the per-step projection and of the output layer.
        num_heads: Number of parallel LSTM branches.
    """
    sequence_in = Input(shape=input_shape)

    # Three separate passes (LSTM, dropout, SE) keep the layer construction
    # order unchanged.
    branches = []
    for _ in range(num_heads):
        branches.append(LSTM(50, return_sequences=True)(sequence_in))
    regularised = []
    for branch in branches:
        regularised.append(Dropout(0.3)(branch))
    gated = []
    for branch in regularised:
        gated.append(squeeze_excite_block(branch))

    stacked = Concatenate()(gated)
    per_step_scores = TimeDistributed(Dense(num_classes))(stacked)
    averaged = GlobalAveragePooling1D()(per_step_scores)
    class_probs = Dense(num_classes, activation='softmax')(averaged)

    return Model(inputs=sequence_in, outputs=class_probs)

# Function to train and evaluate a model
def train_and_evaluate_model(model, X_train, y_train, X_test, y_test, name, *,
                             epochs=50, batch_size=32, learning_rate=0.001,
                             validation_split=0.2, verbose=1):
    """Compile, train and evaluate *model*, printing test metrics.

    The model is compiled with Adam and categorical cross-entropy, trained on
    the training data, then evaluated on the test data. Weighted F1,
    precision and recall are computed from the arg-max predictions.

    Args:
        model: Uncompiled Keras model.
        X_train, y_train: Training inputs and one-hot labels.
        X_test, y_test: Test inputs and one-hot labels.
        name: Label used as a prefix in the printed metrics.
        epochs: Number of training epochs.
        batch_size: Mini-batch size for training.
        learning_rate: Adam learning rate.
        validation_split: Fraction of the training data held out by
            ``model.fit`` for validation.
        verbose: Verbosity passed to ``model.fit``.

    Returns:
        Tuple ``(history, test_accuracy, f1, precision, recall,
        y_pred_classes, y_pred)`` where ``history`` is the object returned by
        ``model.fit``, ``y_pred`` the raw ``model.predict`` output and
        ``y_pred_classes`` its arg-max over axis 1.
    """
    optimizer = Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

    history = model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size,
                        validation_split=validation_split, verbose=verbose)

    test_loss, test_accuracy = model.evaluate(X_test, y_test, verbose=0)
    print(f'{name} - Test accuracy: {test_accuracy*100:.2f}%')
    print(f'{name} - Test loss: {test_loss:.4f}')

    # Convert probabilities / one-hot rows back to class indices.
    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true = np.argmax(y_test, axis=1)

    f1 = f1_score(y_true, y_pred_classes, average='weighted')
    precision = precision_score(y_true, y_pred_classes, average='weighted')
    recall = recall_score(y_true, y_pred_classes, average='weighted')

    print(f'{name} - F1 Score: {f1:.4f}')
    print(f'{name} - Precision: {precision:.4f}')
    print(f'{name} - Recall: {recall:.4f}')

    return history, test_accuracy, f1, precision, recall, y_pred_classes, y_pred

# Visualization functions

def plot_accuracy_comparison(results):
    """Plot training and validation accuracy per epoch for every model.

    Args:
        results: Mapping of model name to a dict whose ``'history'`` entry
            exposes a ``.history`` dict with ``'accuracy'`` and
            ``'val_accuracy'`` series.
    """
    plt.figure(figsize=(12, 8))
    for model_name, entry in results.items():
        for key, phase in (('accuracy', 'Training'), ('val_accuracy', 'Validation')):
            plt.plot(entry['history'].history[key], label=f'{model_name} - {phase}')
    plt.title('Model Accuracy Comparison')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()

def plot_loss_comparison(results):
    """Plot training and validation loss per epoch for every model.

    Args:
        results: Mapping of model name to a dict whose ``'history'`` entry
            exposes a ``.history`` dict with ``'loss'`` and ``'val_loss'``
            series.
    """
    plt.figure(figsize=(12, 8))
    for model_name, entry in results.items():
        for key, phase in (('loss', 'Training'), ('val_loss', 'Validation')):
            plt.plot(entry['history'].history[key], label=f'{model_name} - {phase}')
    plt.title('Model Loss Comparison')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

def plot_correlation_matrix(X):
    """Show the correlation matrix of the flattened features as an image.

    Args:
        X: Array whose first axis indexes samples; all remaining axes are
            flattened into one feature axis before correlating.
    """
    samples_by_feature = X.reshape(X.shape[0], -1)
    # rowvar=False treats columns as the variables, which is the same as
    # correlating the rows of the transposed matrix.
    correlations = np.corrcoef(samples_by_feature, rowvar=False)

    plt.figure(figsize=(12, 10))
    plt.imshow(correlations, cmap='coolwarm', aspect='auto')
    plt.colorbar()
    plt.title('Correlation Matrix of Features')
    plt.show()

def plot_recognition_error(results):
    """Plot the error rate (1 - accuracy) per epoch for every model.

    Args:
        results: Mapping of model name to a dict whose ``'history'`` entry
            exposes a ``.history`` dict with ``'accuracy'`` and
            ``'val_accuracy'`` series.
    """
    plt.figure(figsize=(12, 8))
    for model_name, entry in results.items():
        for key, phase in (('accuracy', 'Training'), ('val_accuracy', 'Validation')):
            error_rate = 1 - np.array(entry['history'].history[key])
            plt.plot(error_rate, label=f'{model_name} - {phase} Error')
    plt.title('Recognition Error over Time')
    plt.xlabel('Epoch')
    plt.ylabel('Error Rate')
    plt.legend()
    plt.show()

def plot_f1_precision_recall(results):
    """Draw a grouped bar chart of F1, precision and recall for every model.

    Each metric forms one group on the x-axis and each model contributes one
    bar per group. The bar width is derived from the number of models so
    that groups never overlap, and the tick of each group is centred under
    its bars.

    Args:
        results: Mapping of model name to a dict with ``'f1'``,
            ``'precision'`` and ``'recall'`` entries.
    """
    metrics = ['F1 Score', 'Precision', 'Recall']
    metric_keys = ('f1', 'precision', 'recall')

    n_models = max(len(results), 1)  # avoid division by zero on empty input
    # Keep the original 0.25 width while it fits; shrink it once the bars of
    # one group would run into the next group (groups are 1.0 apart).
    bar_width = min(0.25, 0.8 / n_models)
    group_positions = np.arange(len(metrics))

    plt.figure(figsize=(12, 8))
    for i, (name, result) in enumerate(results.items()):
        values = [result[key] for key in metric_keys]
        plt.bar(group_positions + i * bar_width, values, width=bar_width, label=name)
    plt.title('F1 Score, Precision, and Recall')
    plt.xlabel('Metrics')
    plt.ylabel('Score')
    # Centre of a group = offset of its middle bar.
    plt.xticks(group_positions + bar_width * (n_models - 1) / 2, metrics)
    plt.legend()
    plt.show()

def plot_heatmap_metrics(results):
    """Show accuracy, precision, recall and F1 of every model as a heatmap.

    Rows are metrics, columns are models; each cell is annotated with its
    value rounded to two decimals.

    Args:
        results: Mapping of model name to a dict with ``'accuracy'``,
            ``'precision'``, ``'recall'`` and ``'f1'`` entries.
    """
    metrics = ['Accuracy', 'Precision', 'Recall', 'F1 Score']
    model_names = list(results.keys())

    data = np.array([[entry['accuracy'], entry['precision'], entry['recall'], entry['f1']]
                     for entry in results.values()])
    # Transposed so metrics run down the y-axis and models along the x-axis.
    grid = data.T

    fig, ax = plt.subplots(figsize=(12, 8))
    im = ax.imshow(grid, cmap='RdYlBu_r')

    # Add colorbar
    fig.colorbar(im, ax=ax)

    # Show all ticks and label them
    ax.set_xticks(np.arange(len(model_names)))
    ax.set_yticks(np.arange(len(metrics)))
    ax.set_xticklabels(model_names)
    ax.set_yticklabels(metrics)

    # Rotate the tick labels and set their alignment
    plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")

    # Annotate every cell; text x is the column index, y the row index.
    for (row, col), value in np.ndenumerate(grid):
        ax.text(col, row, f"{value:.2f}",
                ha="center", va="center", color="black")

    ax.set_title('Model Performance Metrics Heatmap')
    fig.tight_layout()
    plt.show()

def plot_box_plot(results):
    """Draw one box per model summarising its per-epoch training accuracy.

    Tick labels are set through ``plt.xticks`` instead of the ``labels=``
    keyword of ``boxplot``: that keyword is deprecated in recent Matplotlib
    releases in favour of ``tick_labels``, while ``xticks`` works on both
    old and new versions.

    Args:
        results: Mapping of model name to a dict whose ``'history'`` entry
            exposes a ``.history`` dict with an ``'accuracy'`` series.
    """
    model_names = list(results)
    data = [results[name]['history'].history['accuracy'] for name in model_names]

    plt.figure(figsize=(12, 8))
    plt.boxplot(data)
    # boxplot places the boxes at x = 1..N by default.
    plt.xticks(range(1, len(model_names) + 1), model_names)
    plt.title('Box Plot of Training Accuracies')
    plt.ylabel('Accuracy')
    plt.show()
    
def _draw_confusion_matrix(fig, ax, name, cm, labels):
    """Render one annotated confusion matrix *cm* onto *ax*."""
    im = ax.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    fig.colorbar(im, ax=ax)

    ax.set(xticks=np.arange(cm.shape[1]),
           yticks=np.arange(cm.shape[0]),
           xticklabels=labels,
           yticklabels=labels,
           title=f'{name} - Confusion Matrix',
           ylabel='True label',
           xlabel='Predicted label')

    # Rotate the tick labels and set their alignment
    plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")

    # Annotate each cell; switch to white text on dark cells for contrast.
    thresh = cm.max() / 2.
    for (row, col), count in np.ndenumerate(cm):
        ax.text(col, row, format(count, 'd'),
                ha="center", va="center",
                color="white" if count > thresh else "black")


def plot_confusion_matrices(results, y_true=None):
    """Plot a confusion matrix for every model in *results*.

    Models are shown two per figure; if the number of models is odd the last
    one gets a figure of its own. Any number of models is supported.

    The matrix rows/columns cover every class that occurs in either the true
    labels or the model's predictions, so tick labels always match the
    matrix size even when a model predicts a class absent from the test set.

    Args:
        results: Mapping of model name to a dict with a ``'y_pred_classes'``
            entry (predicted class indices).
        y_true: Optional 1-D array of true class indices. When omitted, the
            arg-max of the module-level ``y_test`` is used.
    """
    if not results:
        return
    if y_true is None:
        y_true = np.argmax(y_test, axis=1)

    names = list(results)
    for start in range(0, len(names), 2):
        group = names[start:start + 2]
        if len(group) == 2:
            fig, axes = plt.subplots(1, 2, figsize=(20, 10))
        else:
            fig, single_ax = plt.subplots(figsize=(10, 10))
            axes = [single_ax]

        for ax, name in zip(axes, group):
            y_pred = results[name]['y_pred_classes']
            # Sorted union of the classes seen in truth and prediction.
            class_ids = np.union1d(y_true, y_pred)
            cm = confusion_matrix(y_true, y_pred, labels=class_ids)
            labels = [ACTIVITY_LABELS.get(act, f'Unknown ({act})') for act in class_ids]
            _draw_confusion_matrix(fig, ax, name, cm, labels)

        plt.tight_layout()
        plt.show()

def plot_accuracy_vs_epoch(results):
    """Scatter training and validation accuracy against the epoch number.

    Epochs are numbered from 1.

    Args:
        results: Mapping of model name to a dict whose ``'history'`` entry
            exposes a ``.history`` dict with ``'accuracy'`` and
            ``'val_accuracy'`` series.
    """
    plt.figure(figsize=(12, 8))
    for model_name, entry in results.items():
        for key, phase in (('accuracy', 'Training'), ('val_accuracy', 'Validation')):
            series = entry['history'].history[key]
            epochs = range(1, len(series) + 1)
            plt.scatter(epochs, series, label=f'{model_name} - {phase}')
    plt.title('Accuracy vs Epoch')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()

# Main execution
if __name__ == "__main__":
    # Every builder receives the same per-sample shape and class count.
    input_shape = X_train.shape[1:]
    num_classes = y_train.shape[1]

    # All five models are built up front, in this order, before any training.
    builders = (
        ('Simple LSTM', build_simple_lstm),
        ('Deep LSTM', build_deep_lstm),
        ('LSTM with Attention', build_lstm_attention),
        ('Multi-head LSTM with Attention', build_multi_head_lstm_attention),
        ('Multi-head LSTM with SE', build_multi_head_lstm_se),
    )
    models = {label: build(input_shape, num_classes) for label, build in builders}

    # Keys under which the values returned by train_and_evaluate_model are
    # stored, in the order they are returned.
    result_keys = ('history', 'accuracy', 'f1', 'precision', 'recall',
                   'y_pred_classes', 'y_pred')
    results = {}

    # Train and evaluate models
    for name, model in models.items():
        print(f"\nTraining and evaluating {name}...")
        outcome = train_and_evaluate_model(model, X_train, y_train, X_test, y_test, name)
        results[name] = dict(zip(result_keys, outcome))

    # Generate visualizations
    plot_accuracy_comparison(results)
    plot_loss_comparison(results)
    plot_correlation_matrix(X)
    plot_recognition_error(results)
    plot_f1_precision_recall(results)
    plot_heatmap_metrics(results)
    plot_box_plot(results)
    plot_confusion_matrices(results)
    plot_accuracy_vs_epoch(results)

    print("Analysis complete. All plots have been displayed.")