"""Multi-head LSTM with an attention module for the UCI HAR dataset.

Trains a multi-head LSTM classifier with per-head self-attention on the UCI
Human Activity Recognition Using Smartphones dataset, then saves training
curves, a confusion matrix, attention visualizations, and detailed metrics.
"""
import os

# Suppress TensorFlow C++ INFO/WARNING logs; this must be set before
# TensorFlow is imported to take effect.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import (confusion_matrix, classification_report,
                             precision_score, recall_score, f1_score)
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (LSTM, Dense, Dropout, Input, Attention, LayerNormalization,
                                     Concatenate, GlobalAveragePooling1D)
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping

# Paths to the dataset files
dataset_dir_path = r'C:\Users\LENOVO LEGION\Downloads\human+activity+recognition+using+smartphones\UCI HAR Dataset\UCI HAR Dataset'
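# NOTE: machine-specific path; point this at the extracted 'UCI HAR Dataset' folder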

# Load and preprocess data
def load_data():
    # Load the feature names (561 per sample)
    features = pd.read_csv(os.path.join(dataset_dir_path, 'features.txt'),
                           sep=r'\s+',
                           header=None,
                           names=['index', 'feature'])

    # Load the activity labels (6 activity classes)
    activity_labels = pd.read_csv(os.path.join(dataset_dir_path, 'activity_labels.txt'),
                                  sep=r'\s+',
                                  header=None,
                                  names=['index', 'activity'])

    # Load training data
    X_train = pd.read_csv(os.path.join(dataset_dir_path, 'train', 'X_train.txt'),
                          sep=r'\s+',
                          header=None)
    y_train = pd.read_csv(os.path.join(dataset_dir_path, 'train', 'y_train.txt'),
                          sep=r'\s+',
                          header=None,
                          names=['activity'])

    # Load test data
    X_test = pd.read_csv(os.path.join(dataset_dir_path, 'test', 'X_test.txt'),
                         sep=r'\s+',
                         header=None)
    y_test = pd.read_csv(os.path.join(dataset_dir_path, 'test', 'y_test.txt'),
                         sep=r'\s+',
                         header=None,
                         names=['activity'])
    
    # Label the columns
    X_train.columns = features['feature']
    X_test.columns = features['feature']
    
    # Normalize the feature data
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    
    # Reshape for LSTM input (samples, time steps, features);
    # each 561-feature vector is treated as a single time step
    X_train_reshaped = X_train_scaled.reshape((X_train_scaled.shape[0], 1, X_train_scaled.shape[1]))
    X_test_reshaped = X_test_scaled.reshape((X_test_scaled.shape[0], 1, X_test_scaled.shape[1]))
    
    # Encode activity labels
    encoder = LabelEncoder()
    y_train_encoded = encoder.fit_transform(y_train['activity'])
    y_test_encoded = encoder.transform(y_test['activity'])
    
    # Convert to categorical
    y_train_categorical = to_categorical(y_train_encoded)
    y_test_categorical = to_categorical(y_test_encoded)
    
    # Also return the feature names so downstream plots can label features
    return (X_train_reshaped, y_train_categorical,
            X_test_reshaped, y_test_categorical,
            activity_labels, features['feature'])

# Visualization functions
def plot_training_history(history, title="Model Training History"):
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
    
    # Accuracy plot
    ax1.plot(history.history['accuracy'], label='Training Accuracy')
    ax1.plot(history.history['val_accuracy'], label='Validation Accuracy')
    ax1.set_title(f'{title} - Accuracy')
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Accuracy')
    ax1.legend()
    
    # Loss plot
    ax2.plot(history.history['loss'], label='Training Loss')
    ax2.plot(history.history['val_loss'], label='Validation Loss')
    ax2.set_title(f'{title} - Loss')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel('Loss')
    ax2.legend()
    
    plt.tight_layout()
    plt.savefig(f'training_history_{title.lower().replace(" ", "_")}.png')
    plt.close()

def plot_confusion_matrix(y_true, y_pred, title="Confusion Matrix"):
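    # Relies on the module-level activity_labels DataFrame populated by load_data()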
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(12, 10))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title(title)
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    
    # Set x and y axis labels using activity names
    tick_positions = np.arange(len(activity_labels)) + 0.5
    plt.xticks(tick_positions, activity_labels['activity'], rotation=45, ha='right')
    plt.yticks(tick_positions, activity_labels['activity'], rotation=0)
    
    plt.tight_layout()
    plt.savefig(f'confusion_matrix_{title.lower().replace(" ", "_")}.png')
    plt.close()

def plot_multi_head_attention(model, X_test, num_heads, title="Multi-head Attention"):
    # Collect the Attention layers (one per head)
    attention_layers = [layer for layer in model.layers if isinstance(layer, Attention)]

    # Build a sub-model per head that exposes that head's attention output
    # (the Keras Attention layer returns attended context vectors, not the
    # raw score matrix, unless called with return_attention_scores=True)
    attention_models = [
        Model(inputs=model.input, outputs=layer.output)
        for layer in attention_layers
    ]

    # Get each head's attention output for a single sample
    sample_idx = 0
    attention_outputs = [
        head_model.predict(X_test[sample_idx:sample_idx + 1], verbose=0)
        for head_model in attention_models
    ]

    # Plot each head's attention output
    fig, axes = plt.subplots(1, num_heads, figsize=(20, 5))
    for i in range(num_heads):
        im = axes[i].imshow(attention_outputs[i][0], aspect='auto', cmap='viridis')
        axes[i].set_title(f'Head {i+1}')
        axes[i].set_xlabel('LSTM Unit')
        axes[i].set_ylabel('Time Step')
        plt.colorbar(im, ax=axes[i])

    plt.suptitle(f'{title} - Attention Outputs per Head')
    plt.tight_layout()
    plt.savefig(f'multi_head_attention_{title.lower().replace(" ", "_")}.png')
    plt.close()

def plot_feature_importance_per_head(model, feature_names, num_heads, title="Feature Importance per Head"):
    # Get the LSTM layers (one per head)
    lstm_layers = [layer for layer in model.layers if isinstance(layer, LSTM)][:num_heads]

    # Proxy importance per input feature: the LSTM input kernel has shape
    # (input_dim, 4 * units), so averaging |weights| over axis 1 yields one
    # value per input feature
    head_weights = [np.abs(layer.get_weights()[0]).mean(axis=1) for layer in lstm_layers]

    # Plot the top 10 features for each head
    fig, axes = plt.subplots(num_heads, 1, figsize=(12, 4 * num_heads))
    for i, weights in enumerate(head_weights):
        # Select the 10 features with the largest mean absolute weight
        feature_importance = pd.DataFrame({
            'feature': np.asarray(feature_names),
            'importance': weights
        }).sort_values('importance', ascending=False).head(10)

        sns.barplot(data=feature_importance, x='importance', y='feature', ax=axes[i])
        axes[i].set_title(f'Head {i+1} - Top 10 Features')

    plt.suptitle(f'{title} - Feature Importance Analysis')
    plt.tight_layout()
    plt.savefig(f'feature_importance_per_head_{title.lower().replace(" ", "_")}.png')
    plt.close()

# Load data
print("Loading and preprocessing data...")
X_train, y_train, X_test, y_test, activity_labels, feature_names = load_data()
print("Data loading completed.")
print(f"Training data shape: {X_train.shape}")
print(f"Test data shape: {X_test.shape}")
print(f"Number of activities: {len(activity_labels)}")
# Multi-head LSTM with Attention Model definition
def multi_head_lstm_attention(input_shape, num_classes, num_heads=3):
    # Input layer
    inputs = Input(shape=input_shape)
    
    # Create multiple LSTM heads
    lstm_outputs = []
    attention_outputs = []
    
    for _ in range(num_heads):
        # LSTM layers for each head
        lstm = LSTM(50, return_sequences=True)(inputs)
        lstm = Dropout(0.5)(lstm)
        lstm_outputs.append(lstm)
        
        # Self-attention for each head: passing [lstm, lstm] as [query, value]
        # makes the Keras Attention layer compute dot-product self-attention
        attention = Attention()([lstm, lstm])
        attention = LayerNormalization()(attention)
        attention_outputs.append(attention)
    
    # Concatenate all LSTM outputs
    x = Concatenate()(lstm_outputs)
    
    # Apply dropout to combined LSTM outputs
    x = Dropout(0.5)(x)
    
    # Concatenate attention outputs
    attention_combined = Concatenate()(attention_outputs)
    
    # Combine LSTM and attention outputs
    x = Concatenate()([x, attention_combined])
    
    # Global pooling
    x = GlobalAveragePooling1D()(x)
    
    # Dense layers
    x = Dense(100, activation='relu')(x)
    x = Dropout(0.5)(x)
    
    # Output layer
    outputs = Dense(num_classes, activation='softmax')(x)
    
    # Create model
    model = Model(inputs=inputs, outputs=outputs)
    return model
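
# A minimal usage sketch (assuming the UCI HAR shapes used in this script:
# one time step of 561 features and 6 activity classes):
#   demo = multi_head_lstm_attention(input_shape=(1, 561), num_classes=6, num_heads=3)
#   demo.summary()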

# Function to train and evaluate model
def train_and_evaluate_model(model, X_train, y_train, X_test, y_test, model_name="Multi-head LSTM with Attention"):
    # Compile model
    model.compile(optimizer=Adam(learning_rate=0.001),
                 loss='categorical_crossentropy',
                 metrics=['accuracy'])
    
    # Define callbacks
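    # ReduceLROnPlateau cuts the learning rate to 20% after 5 epochs without
    # val_loss improvement; EarlyStopping halts after 10 and restores the best weights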
    reduce_lr = ReduceLROnPlateau(monitor='val_loss',
                                 factor=0.2,
                                 patience=5,
                                 min_lr=0.00001)
    
    early_stopping = EarlyStopping(monitor='val_loss',
                                 patience=10,
                                 restore_best_weights=True)
    
    # Train model
    print(f"\nTraining {model_name}...")
    history = model.fit(X_train, y_train,
                       epochs=20,
                       batch_size=64,
                       validation_split=0.2,
                       verbose=1,
                       callbacks=[reduce_lr, early_stopping])
    
    # Evaluate model
    print(f"\nEvaluating {model_name}...")
    loss, accuracy = model.evaluate(X_test, y_test, verbose=0)
    
    # Make predictions
    y_pred = model.predict(X_test)
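    # argmax converts softmax probability rows and one-hot targets back to class indices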
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true = np.argmax(y_test, axis=1)
    
    # Calculate metrics
    precision = precision_score(y_true, y_pred_classes, average='weighted')
    recall = recall_score(y_true, y_pred_classes, average='weighted')
    f1 = f1_score(y_true, y_pred_classes, average='weighted')
    
    # Print results
    print(f'\n{model_name} Results:')
    print(f'Test Accuracy: {accuracy*100:.2f}%')
    print(f'Test Loss: {loss:.4f}')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1 Score: {f1:.4f}')
    
    return history, accuracy, precision, recall, f1, y_true, y_pred_classes, y_pred

def plot_head_contribution_analysis(model, X_test, y_test, num_heads, title="Head Contribution Analysis"):
    # Get attention layers
    attention_layers = [layer for layer in model.layers if isinstance(layer, Attention)]
    
    # Create models for each head
    head_models = []
    for i in range(num_heads):
        head_output = attention_layers[i].output
        head_model = Model(inputs=model.input, outputs=head_output)
        head_models.append(head_model)
    
    # Mean attention-output magnitude per head for each activity
    y_true = np.argmax(y_test, axis=1)
    activity_head_contributions = np.zeros((len(activity_labels), num_heads))

    for i in range(len(activity_labels)):
        activity_samples = X_test[y_true == i]

        if len(activity_samples) > 0:
            for h in range(num_heads):
                head_output = head_models[h].predict(activity_samples, verbose=0)
                activity_head_contributions[i, h] = np.mean(np.abs(head_output))
    
    # Plot heatmap of contributions
    plt.figure(figsize=(12, 8))
    sns.heatmap(activity_head_contributions, 
                xticklabels=[f'Head {i+1}' for i in range(num_heads)],
                yticklabels=activity_labels['activity'],
                cmap='YlOrRd',
                annot=True,
                fmt='.2f')
    
    plt.title(f'{title} - Head Contributions per Activity')
    plt.tight_layout()
    plt.savefig(f'head_contributions_{title.lower().replace(" ", "_")}.png')
    plt.close()

def save_detailed_results(model_name, history, accuracy, precision, recall, f1, y_true, y_pred_classes, y_pred, num_heads):
    with open(f'{model_name.lower().replace(" ", "_")}_detailed_results.txt', 'w') as f:
        # Model architecture
        f.write(f"{model_name} Configuration\n")
        f.write("="*50 + "\n")
        f.write(f"Number of attention heads: {num_heads}\n\n")
        
        # Basic metrics
        f.write("Performance Metrics\n")
        f.write("-"*50 + "\n")
        f.write(f"Test Accuracy: {accuracy*100:.2f}%\n")
        f.write(f"Precision: {precision:.4f}\n")
        f.write(f"Recall: {recall:.4f}\n")
        f.write(f"F1 Score: {f1:.4f}\n\n")
        
        # Per-class metrics
        f.write("Per-class Performance\n")
        f.write("-"*50 + "\n")
        for i, activity in enumerate(activity_labels['activity']):
            true_class = (y_true == i)
            pred_class = (y_pred_classes == i)
            # zero_division=0 avoids warnings if a class is never predicted
            class_precision = precision_score(true_class, pred_class, average='binary', zero_division=0)
            class_recall = recall_score(true_class, pred_class, average='binary', zero_division=0)
            class_f1 = f1_score(true_class, pred_class, average='binary', zero_division=0)
            
            f.write(f"\n{activity}:\n")
            f.write(f"Precision: {class_precision:.4f}\n")
            f.write(f"Recall: {class_recall:.4f}\n")
            f.write(f"F1 Score: {class_f1:.4f}\n")
        
        # Classification report
        f.write("\nClassification Report\n")
        f.write("-"*50 + "\n")
        f.write(classification_report(y_true, y_pred_classes, 
                                   target_names=activity_labels['activity']))
        
        # Training history
        f.write("\nTraining History\n")
        f.write("-"*50 + "\n")
        f.write("Epoch\tLoss\tAccuracy\tVal_Loss\tVal_Accuracy\n")
        for i in range(len(history.history['loss'])):
            f.write(f"{i+1}\t{history.history['loss'][i]:.4f}\t")
            f.write(f"{history.history['accuracy'][i]:.4f}\t")
            f.write(f"{history.history['val_loss'][i]:.4f}\t")
            f.write(f"{history.history['val_accuracy'][i]:.4f}\n")

# Main execution
if __name__ == "__main__":
    print("UCI HAR Dataset - Multi-head LSTM with Attention Implementation")
    print("="*50)
    
    # Model parameters
    num_heads = 3
    
    # Create and train model
    input_shape = (X_train.shape[1], X_train.shape[2])
    num_classes = y_train.shape[1]
    model = multi_head_lstm_attention(input_shape, num_classes, num_heads)
    
    # Print model summary
    print("\nModel Architecture:")
    model.summary()
    
    # Train and evaluate
    results = train_and_evaluate_model(
        model, X_train, y_train, X_test, y_test, "Multi-head LSTM with Attention"
    )
    history, accuracy, precision, recall, f1, y_true, y_pred_classes, y_pred = results
    
    # Generate visualizations
    plot_training_history(history, "Multi-head LSTM with Attention")
    plot_confusion_matrix(y_true, y_pred_classes, "Multi-head LSTM with Attention")
    plot_multi_head_attention(model, X_test, num_heads, "Multi-head LSTM with Attention")
    plot_feature_importance_per_head(model, feature_names, num_heads, "Multi-head LSTM with Attention")
    plot_head_contribution_analysis(model, X_test, y_test, num_heads, "Multi-head LSTM with Attention")
    
    # Save detailed results
    save_detailed_results("Multi-head LSTM with Attention", history, accuracy, 
                         precision, recall, f1, y_true, y_pred_classes, y_pred, num_heads)
    
    # Save the trained model (legacy HDF5 format; recent Keras versions prefer
    # the native '.keras' format)
    model.save('multihead_lstm_attention_uci_har.h5')
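    # To reload the saved model later (a sketch; Attention and LayerNormalization
    # are built-in Keras layers, so no custom_objects should be needed):
    #   from tensorflow.keras.models import load_model
    #   reloaded = load_model('multihead_lstm_attention_uci_har.h5')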
    
    print("\nAnalysis complete. All results and visualizations have been saved.")
    print(f"\nModel saved as 'multihead_lstm_attention_uci_har.h5'")
    print(f"Detailed results saved as 'multihead_lstm_attention_detailed_results.txt'")