# Multi-head LSTM model with Squeeze-and-Excitation (SE) blocks
# UCI HAR dataset reference code
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (LSTM, Dense, Dropout, Input, Concatenate,
                                     GlobalAveragePooling1D, Layer, Reshape)
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping

# Suppress TensorFlow GPU warnings
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

# Custom Squeeze-and-Excitation Layer
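# A squeeze-and-excitation block (Hu et al., 2018) learns per-channel gates:
# it squeezes each channel to one statistic via global average pooling, passes
# that through a bottleneck MLP (reduce by `ratio`, then restore), and scales
# the input channels by the resulting sigmoid weights.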
class SqueezeExciteBlock(Layer):
    def __init__(self, ratio=16, **kwargs):
        super(SqueezeExciteBlock, self).__init__(**kwargs)
        self.ratio = ratio

    def build(self, input_shape):
        channels = input_shape[-1]
        self.global_avg_pool = GlobalAveragePooling1D()
        # Build the reshape here (rather than inside call) so a new layer is
        # not created on every forward pass
        self.reshape = Reshape((1, channels))
        self.dense1 = Dense(channels // self.ratio,
                            activation='relu',
                            kernel_initializer='he_normal',
                            use_bias=False)
        self.dense2 = Dense(channels,
                            activation='sigmoid',
                            kernel_initializer='he_normal',
                            use_bias=False)
        super(SqueezeExciteBlock, self).build(input_shape)

    def call(self, inputs):
        # Squeeze: average over the time dimension -> (batch, channels)
        x = self.global_avg_pool(inputs)
        x = self.reshape(x)  # (batch, 1, channels) for broadcasting
        # Excitation: per-channel gates in (0, 1)
        x = self.dense1(x)
        x = self.dense2(x)
        # Scale: re-weight the input channels by the learned gates
        return inputs * x

    def get_config(self):
        config = super(SqueezeExciteBlock, self).get_config()
        config.update({"ratio": self.ratio})
        return config
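
# Shape sanity check (a minimal sketch; requires `import tensorflow as tf`):
# the block preserves its input shape and only re-weights channels:
#   block = SqueezeExciteBlock(ratio=4)
#   out = block(tf.zeros((2, 1, 32)))  # -> shape (2, 1, 32)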

# Paths to the dataset files
dataset_dir_path = r'C:\Users\LENOVO LEGION\Downloads\human+activity+recognition+using+smartphones\UCI HAR Dataset\UCI HAR Dataset'

# Load and preprocess data
def load_data():
    # Load the feature labels
    # sep=r'\s+' replaces the deprecated delim_whitespace=True
    features = pd.read_csv(os.path.join(dataset_dir_path, 'features.txt'),
                           sep=r'\s+',
                           header=None,
                           names=['index', 'feature'])
    
    # Load the activity labels
    activity_labels = pd.read_csv(os.path.join(dataset_dir_path, 'activity_labels.txt'),
                                  sep=r'\s+',
                                  header=None,
                                  names=['index', 'activity'])
    
    # Load training data
    X_train = pd.read_csv(os.path.join(dataset_dir_path, 'train', 'X_train.txt'),
                          sep=r'\s+',
                          header=None)
    y_train = pd.read_csv(os.path.join(dataset_dir_path, 'train', 'y_train.txt'),
                          sep=r'\s+',
                          header=None,
                          names=['activity'])
    
    # Load test data
    X_test = pd.read_csv(os.path.join(dataset_dir_path, 'test', 'X_test.txt'),
                         sep=r'\s+',
                         header=None)
    y_test = pd.read_csv(os.path.join(dataset_dir_path, 'test', 'y_test.txt'),
                         sep=r'\s+',
                         header=None,
                         names=['activity'])
    
    # Label the columns
    X_train.columns = features['feature']
    X_test.columns = features['feature']
    
    # Normalize the feature data
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    
    # Reshape for the LSTM as (samples, time steps, features); each 561-feature
    # vector is treated as a single time step, so the sequence length is 1
    X_train_reshaped = X_train_scaled.reshape((X_train_scaled.shape[0], 1, X_train_scaled.shape[1]))
    X_test_reshaped = X_test_scaled.reshape((X_test_scaled.shape[0], 1, X_test_scaled.shape[1]))
    
    # Encode activity labels
    encoder = LabelEncoder()
    y_train_encoded = encoder.fit_transform(y_train['activity'])
    y_test_encoded = encoder.transform(y_test['activity'])
    
    # Convert to categorical
    y_train_categorical = to_categorical(y_train_encoded)
    y_test_categorical = to_categorical(y_test_encoded)
    
    return (X_train_reshaped, y_train_categorical, 
            X_test_reshaped, y_test_categorical, 
            activity_labels, X_train_scaled)
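
# For the standard UCI HAR split this yields X_train of shape (7352, 1, 561),
# X_test of shape (2947, 1, 561), and 6 activity classes.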

# Visualization functions
def plot_training_history(history, title="Model Training History"):
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
    
    # Accuracy plot
    ax1.plot(history.history['accuracy'], label='Training Accuracy')
    ax1.plot(history.history['val_accuracy'], label='Validation Accuracy')
    ax1.set_title(f'{title} - Accuracy')
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Accuracy')
    ax1.legend()
    
    # Loss plot
    ax2.plot(history.history['loss'], label='Training Loss')
    ax2.plot(history.history['val_loss'], label='Validation Loss')
    ax2.set_title(f'{title} - Loss')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel('Loss')
    ax2.legend()
    
    plt.tight_layout()
    plt.savefig(f'training_history_{title.lower().replace(" ", "_")}.png')
    plt.close()

def plot_confusion_matrix(y_true, y_pred, title="Confusion Matrix"):
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(12, 10))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title(title)
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    
    # Set x and y axis labels using activity names
    tick_positions = np.arange(len(activity_labels)) + 0.5
    plt.xticks(tick_positions, activity_labels['activity'], rotation=45, ha='right')
    plt.yticks(tick_positions, activity_labels['activity'], rotation=0)
    
    plt.tight_layout()
    plt.savefig(f'confusion_matrix_{title.lower().replace(" ", "_")}.png')
    plt.close()

def plot_se_activations(model, X_test, title="SE Activations"):
    # Get SE layers
    se_layers = [layer for layer in model.layers if isinstance(layer, SqueezeExciteBlock)]
    
    # Build sub-models that expose each SE block's output. Note: a block's
    # functional output is the SE-scaled feature map, not the raw gate values.
    se_models = [
        Model(inputs=model.input, outputs=layer.output)
        for layer in se_layers
    ]

    # Get activations for a single sample
    sample_idx = 0
    se_weights = [
        se_model.predict(X_test[sample_idx:sample_idx + 1], verbose=0)
        for se_model in se_models
    ]
    
    # Plot the SE-scaled output for each head
    num_heads = len(se_layers)
    fig, axes = plt.subplots(1, num_heads, figsize=(20, 5))

    for i in range(num_heads):
        im = axes[i].imshow(se_weights[i][0].T, aspect='auto', cmap='viridis')
        axes[i].set_title(f'Head {i+1} SE-scaled Output')
        axes[i].set_xlabel('Time Step')
        axes[i].set_ylabel('Channel')
        plt.colorbar(im, ax=axes[i])
    
    plt.suptitle(f'{title} - SE Weight Distribution')
    plt.tight_layout()
    plt.savefig(f'se_activations_{title.lower().replace(" ", "_")}.png')
    plt.close()

# Load data at module level; several helper functions above and below rely on
# these names (e.g. activity_labels, X_test, y_test) as globals
print("Loading and preprocessing data...")
X_train, y_train, X_test, y_test, activity_labels, X_train_scaled = load_data()
print("Data loading completed.")
print(f"Training data shape: {X_train.shape}")
print(f"Test data shape: {X_test.shape}")
print(f"Number of activities: {len(activity_labels)}")
# Multi-head LSTM with SE Model definition
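# Each head is an independent LSTM branch whose channels are re-weighted by its
# own SE block; the gated branches are concatenated along the channel axis and
# pooled before classification.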
def multi_head_lstm_se(input_shape, num_classes, num_heads=3):
    # Input layer
    inputs = Input(shape=input_shape)
    
    # Create multiple LSTM heads with SE blocks
    lstm_outputs = []
    for _ in range(num_heads):
        # LSTM layer for each head
        lstm = LSTM(100, return_sequences=True)(inputs)
        lstm = Dropout(0.5)(lstm)
        
        # Apply SE block to each head
        se = SqueezeExciteBlock()(lstm)
        lstm_outputs.append(se)
    
    # Concatenate all heads
    x = Concatenate()(lstm_outputs)
    
    # Global pooling
    x = GlobalAveragePooling1D()(x)
    
    # Dense layers
    x = Dense(100, activation='relu')(x)
    x = Dropout(0.5)(x)
    
    # Output layer
    outputs = Dense(num_classes, activation='softmax')(x)
    
    # Create model
    model = Model(inputs=inputs, outputs=outputs)
    return model
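
# Example (sketch): for the reshaped UCI HAR data above, the call would be
#   model = multi_head_lstm_se(input_shape=(1, 561), num_classes=6, num_heads=3)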

# Function to train and evaluate model
def train_and_evaluate_model(model, X_train, y_train, X_test, y_test, model_name="Multi-head LSTM with SE"):
    # Compile model
    model.compile(optimizer=Adam(learning_rate=0.001),
                 loss='categorical_crossentropy',
                 metrics=['accuracy'])
    
    # Define callbacks
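    # ReduceLROnPlateau scales the learning rate by 0.2 after 5 epochs without
    # val_loss improvement; EarlyStopping halts after 10 flat epochs and
    # restores the best weights seen so far.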
    reduce_lr = ReduceLROnPlateau(monitor='val_loss',
                                 factor=0.2,
                                 patience=5,
                                 min_lr=0.00001)
    
    early_stopping = EarlyStopping(monitor='val_loss',
                                 patience=10,
                                 restore_best_weights=True)
    
    # Train model
    print(f"\nTraining {model_name}...")
    history = model.fit(X_train, y_train,
                       epochs=20,
                       batch_size=64,
                       validation_split=0.2,
                       verbose=1,
                       callbacks=[reduce_lr, early_stopping])
    
    # Evaluate model
    print(f"\nEvaluating {model_name}...")
    loss, accuracy = model.evaluate(X_test, y_test, verbose=0)
    
    # Make predictions
    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true = np.argmax(y_test, axis=1)
    
    # Calculate metrics (weighted averages account for per-class support)
    precision = precision_score(y_true, y_pred_classes, average='weighted')
    recall = recall_score(y_true, y_pred_classes, average='weighted')
    f1 = f1_score(y_true, y_pred_classes, average='weighted')
    
    # Print results
    print(f'\n{model_name} Results:')
    print(f'Test Accuracy: {accuracy*100:.2f}%')
    print(f'Test Loss: {loss:.4f}')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1 Score: {f1:.4f}')
    
    return history, accuracy, precision, recall, f1, y_true, y_pred_classes, y_pred

def plot_channel_excitation(model, X_test, y_test, num_heads, title="Channel Excitation Analysis"):
    # Get SE layers
    se_layers = [layer for layer in model.layers if isinstance(layer, SqueezeExciteBlock)]

    # Sublayer outputs created inside a custom layer's call() (e.g. dense2) are
    # not tracked by the functional graph, so they cannot serve as model
    # outputs. Instead, expose each SE block's *input* and re-apply its
    # sublayers manually to recover the gate values.
    se_input_models = [
        Model(inputs=model.input, outputs=se_layers[i].input)
        for i in range(num_heads)
    ]
    
    # Analyze channel excitation per activity
    y_true = np.argmax(y_test, axis=1)
    
    plt.figure(figsize=(15, 10))
    
    for i, activity in enumerate(activity_labels['activity']):
        activity_mask = (y_true == i)
        activity_samples = X_test[activity_mask]
        
        if len(activity_samples) > 0:
            # Recover the average per-channel gate values for each head
            head_excitations = []
            for h in range(num_heads):
                feats = se_input_models[h].predict(activity_samples[:5], verbose=0)  # first 5 samples
                pooled = feats.mean(axis=1)  # squeeze step -> (samples, channels)
                gates = se_layers[h].dense2(se_layers[h].dense1(pooled)).numpy()
                head_excitations.append(np.mean(gates, axis=0))
            
            # Plot excitation pattern for this activity
            plt.subplot(len(activity_labels), 1, i+1)
            for h in range(num_heads):
                plt.plot(head_excitations[h].flatten(), 
                        label=f'Head {h+1}', 
                        alpha=0.7)
            
            plt.title(f'Channel Excitation Pattern - {activity}')
            plt.xlabel('Channel')
            plt.ylabel('Excitation')
            if i == 0:  # Only show legend for first subplot
                plt.legend()
    
    plt.tight_layout()
    plt.savefig(f'channel_excitation_{title.lower().replace(" ", "_")}.png')
    plt.close()

def save_detailed_results(model_name, history, accuracy, precision, recall, f1, y_true, y_pred_classes, y_pred, num_heads):
    with open(f'{model_name.lower().replace(" ", "_")}_detailed_results.txt', 'w') as f:
        # Model architecture
        f.write(f"{model_name} Configuration\n")
        f.write("="*50 + "\n")
        f.write(f"Number of heads: {num_heads}\n")
        f.write(f"SE ratio: 16\n\n")
        
        # Performance metrics
        f.write("Performance Metrics\n")
        f.write("-"*50 + "\n")
        f.write(f"Test Accuracy: {accuracy*100:.2f}%\n")
        # Note: relies on the globals `model`, `X_test`, `y_test` from the main script
        f.write(f"Test Loss: {model.evaluate(X_test, y_test, verbose=0)[0]:.4f}\n")
        f.write(f"Precision: {precision:.4f}\n")
        f.write(f"Recall: {recall:.4f}\n")
        f.write(f"F1 Score: {f1:.4f}\n\n")
        
        # Per-class metrics
        f.write("Per-class Performance\n")
        f.write("-"*50 + "\n")
        for i, activity in enumerate(activity_labels['activity']):
            true_class = (y_true == i)
            pred_class = (y_pred_classes == i)
            class_precision = precision_score(true_class, pred_class, average='binary', zero_division=0)
            class_recall = recall_score(true_class, pred_class, average='binary', zero_division=0)
            class_f1 = f1_score(true_class, pred_class, average='binary', zero_division=0)
            
            f.write(f"\n{activity}:\n")
            f.write(f"Precision: {class_precision:.4f}\n")
            f.write(f"Recall: {class_recall:.4f}\n")
            f.write(f"F1 Score: {class_f1:.4f}\n")
        
        # Training history
        f.write("\nTraining History\n")
        f.write("-"*50 + "\n")
        f.write("Epoch\tLoss\tAccuracy\tVal_Loss\tVal_Accuracy\n")
        for i in range(len(history.history['loss'])):
            f.write(f"{i+1}\t{history.history['loss'][i]:.4f}\t")
            f.write(f"{history.history['accuracy'][i]:.4f}\t")
            f.write(f"{history.history['val_loss'][i]:.4f}\t")
            f.write(f"{history.history['val_accuracy'][i]:.4f}\n")

# Main execution
if __name__ == "__main__":
    print("UCI HAR Dataset - Multi-head LSTM with SE Implementation")
    print("="*50)
    
    # Model parameters
    num_heads = 3
    
    # Create and train model
    input_shape = (X_train.shape[1], X_train.shape[2])
    num_classes = y_train.shape[1]
    model = multi_head_lstm_se(input_shape, num_classes, num_heads)
    
    # Print model summary
    print("\nModel Architecture:")
    model.summary()
    
    # Train and evaluate
    results = train_and_evaluate_model(
        model, X_train, y_train, X_test, y_test, "Multi-head LSTM with SE"
    )
    history, accuracy, precision, recall, f1, y_true, y_pred_classes, y_pred = results
    
    # Generate visualizations
    plot_training_history(history, "Multi-head LSTM with SE")
    plot_confusion_matrix(y_true, y_pred_classes, "Multi-head LSTM with SE")
    plot_se_activations(model, X_test, "Multi-head LSTM with SE")
    plot_channel_excitation(model, X_test, y_test, num_heads, "Multi-head LSTM with SE")
    
    # Save detailed results
    save_detailed_results("Multi-head LSTM with SE", history, accuracy, 
                         precision, recall, f1, y_true, y_pred_classes, y_pred, num_heads)
    
    # Save model
    model.save('multihead_lstm_se_uci_har.h5')
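    # To reload later, the custom layer must be registered:
    #   from tensorflow.keras.models import load_model
    #   model = load_model('multihead_lstm_se_uci_har.h5',
    #                      custom_objects={'SqueezeExciteBlock': SqueezeExciteBlock})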
    
    print("\nAnalysis complete. All results and visualizations have been saved.")
    print(f"\nModel saved as 'multihead_lstm_se_uci_har.h5'")
    print(f"Detailed results saved as 'multihead_lstm_se_detailed_results.txt'")

    # Print per-activity performance summary
    print("\nPer-activity Performance Summary:")
    for i, activity in enumerate(activity_labels['activity']):
        true_class = (y_true == i)
        pred_class = (y_pred_classes == i)
        class_f1 = f1_score(true_class, pred_class, average='binary', zero_division=0)
        print(f"{activity:20} F1-Score: {class_f1:.4f}")