# DEEP LSTM (2 LAYER) WITH ATTENTION MODULE
# UCI HAR Dataset reference code
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import confusion_matrix, classification_report, precision_score, recall_score, f1_score
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (LSTM, Dense, Dropout, Input, Attention, LayerNormalization, 
                                   Concatenate, GlobalAveragePooling1D)
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping

# Suppress TensorFlow GPU warnings
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
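
# Optional: seeds can be fixed here for reproducible runs, e.g.
#   import tensorflow as tf
#   np.random.seed(42)
#   tf.random.set_seed(42)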

# Path to the dataset files (adjust to the local copy of the UCI HAR Dataset)
dataset_dir_path = r'C:\Users\LENOVO LEGION\Downloads\human+activity+recognition+using+smartphones\UCI HAR Dataset\UCI HAR Dataset'

# Load and preprocess data
def load_data():
    # Load the feature names (sep=r'\s+' replaces the deprecated
    # delim_whitespace=True option)
    features = pd.read_csv(os.path.join(dataset_dir_path, 'features.txt'),
                           sep=r'\s+',
                           header=None,
                           names=['index', 'feature'])

    # Load the activity labels
    activity_labels = pd.read_csv(os.path.join(dataset_dir_path, 'activity_labels.txt'),
                                  sep=r'\s+',
                                  header=None,
                                  names=['index', 'activity'])

    # Load training data
    X_train = pd.read_csv(os.path.join(dataset_dir_path, 'train', 'X_train.txt'),
                          sep=r'\s+',
                          header=None)
    y_train = pd.read_csv(os.path.join(dataset_dir_path, 'train', 'y_train.txt'),
                          sep=r'\s+',
                          header=None,
                          names=['activity'])

    # Load test data
    X_test = pd.read_csv(os.path.join(dataset_dir_path, 'test', 'X_test.txt'),
                         sep=r'\s+',
                         header=None)
    y_test = pd.read_csv(os.path.join(dataset_dir_path, 'test', 'y_test.txt'),
                         sep=r'\s+',
                         header=None,
                         names=['activity'])
    
    # Label the columns (features.txt contains a few duplicate feature names;
    # this is harmless here because only the values are used downstream)
    X_train.columns = features['feature']
    X_test.columns = features['feature']
    
    # Normalize the feature data
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    
    # Reshape the data for LSTM (samples, time steps, features)
    X_train_reshaped = X_train_scaled.reshape((X_train_scaled.shape[0], 1, X_train_scaled.shape[1]))
    X_test_reshaped = X_test_scaled.reshape((X_test_scaled.shape[0], 1, X_test_scaled.shape[1]))
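
    # Note: each 561-dimensional feature vector is treated as a single time
    # step, so the LSTMs see sequences of length 1. A truly sequential setup
    # would instead use the raw inertial signals, which in the standard UCI
    # HAR layout have shape (samples, 128, 9).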
    
    # Encode activity labels
    encoder = LabelEncoder()
    y_train_encoded = encoder.fit_transform(y_train['activity'])
    y_test_encoded = encoder.transform(y_test['activity'])
    
    # Convert to categorical
    y_train_categorical = to_categorical(y_train_encoded)
    y_test_categorical = to_categorical(y_test_encoded)
    
    return (X_train_reshaped, y_train_categorical,
            X_test_reshaped, y_test_categorical,
            activity_labels, features['feature'])
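
# Expected shapes with the standard UCI HAR layout (assuming an unmodified
# download): X_train (7352, 1, 561), X_test (2947, 1, 561), and one-hot
# labels over the 6 activity classes.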

# Visualization functions
def plot_training_history(history, title="Model Training History"):
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
    
    # Accuracy plot
    ax1.plot(history.history['accuracy'], label='Training Accuracy')
    ax1.plot(history.history['val_accuracy'], label='Validation Accuracy')
    ax1.set_title(f'{title} - Accuracy')
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Accuracy')
    ax1.legend()
    
    # Loss plot
    ax2.plot(history.history['loss'], label='Training Loss')
    ax2.plot(history.history['val_loss'], label='Validation Loss')
    ax2.set_title(f'{title} - Loss')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel('Loss')
    ax2.legend()
    
    plt.tight_layout()
    plt.savefig(f'training_history_{title.lower().replace(" ", "_")}.png')
    plt.close()

def plot_confusion_matrix(y_true, y_pred, title="Confusion Matrix"):
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(12, 10))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title(title)
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    
    # Set x and y axis labels using activity names (activity_labels is taken
    # from module scope, populated when load_data() runs below)
    tick_positions = np.arange(len(activity_labels)) + 0.5
    plt.xticks(tick_positions, activity_labels['activity'], rotation=45, ha='right')
    plt.yticks(tick_positions, activity_labels['activity'], rotation=0)
    
    plt.tight_layout()
    plt.savefig(f'confusion_matrix_{title.lower().replace(" ", "_")}.png')
    plt.close()

def plot_feature_importance(model, feature_names, title="Feature Importance"):
    # Use the absolute input-kernel weights of the first LSTM layer as a rough
    # proxy for feature importance, averaged over the 4*units gate columns
    # (the kernel has shape (n_features, 4 * units))
    first_lstm = next(layer for layer in model.layers if isinstance(layer, LSTM))
    weights = np.abs(first_lstm.get_weights()[0]).mean(axis=1)

    # Create a dataframe of features and their importance
    feature_importance = pd.DataFrame({
        'feature': np.asarray(feature_names),
        'importance': weights
    }).sort_values('importance', ascending=False)
    
    # Plot top 20 features
    plt.figure(figsize=(12, 6))
    sns.barplot(data=feature_importance.head(20), x='importance', y='feature')
    plt.title(f'{title} - Top 20 Features')
    plt.tight_layout()
    plt.savefig(f'feature_importance_{title.lower().replace(" ", "_")}.png')
    plt.close()

def plot_attention_weights(model, X_sample, title="Attention Weights"):
    # Get the attention layer (its output is the attention *context* tensor;
    # Keras only exposes the raw score matrix when the layer is called with
    # return_attention_scores=True)
    attention_layer = [layer for layer in model.layers if isinstance(layer, Attention)][0]

    # Create a model that outputs the attention context
    attention_model = Model(
        inputs=model.input,
        outputs=attention_layer.output
    )

    # Get attention outputs for a few samples
    attention_weights = attention_model.predict(X_sample[:5])

    # Plot the attention context for the first sample
    # (rows = query time steps, columns = feature dimensions)
    plt.figure(figsize=(12, 6))
    plt.imshow(attention_weights[0], aspect='auto', cmap='viridis')
    plt.colorbar(label='Attention Output')
    plt.title(f'{title} - Attention Context Visualization')
    plt.xlabel('Feature Dimension')
    plt.ylabel('Time Step')
    plt.tight_layout()
    plt.savefig(f'attention_weights_{title.lower().replace(" ", "_")}.png')
    plt.close()

# Load data at module level so helpers that reference module globals
# (notably activity_labels) can see them
print("Loading and preprocessing data...")
X_train, y_train, X_test, y_test, activity_labels, feature_names = load_data()
print("Data loading completed.")
print(f"Training data shape: {X_train.shape}")
print(f"Test data shape: {X_test.shape}")
print(f"Number of activities: {len(activity_labels)}")
# LSTM with Attention Model definition
def lstm_attention(input_shape, num_classes):
    # Input layer
    inputs = Input(shape=input_shape)
    
    # First LSTM layer
    x = LSTM(100, return_sequences=True)(inputs)
    x = Dropout(0.5)(x)
    
    # Second LSTM layer
    x = LSTM(100, return_sequences=True)(x)
    x = Dropout(0.5)(x)
    
    # Attention mechanism: Keras's Attention layer computes dot-product
    # attention; passing [x, x] makes x serve as both query and key/value,
    # i.e. self-attention over the LSTM outputs
    attention = Attention()([x, x])
    attention = LayerNormalization()(attention)
    
    # Combine LSTM output with attention context
    x = Concatenate()([x, attention])
    
    # Global pooling
    x = GlobalAveragePooling1D()(x)
    
    # Dense layers
    x = Dense(100, activation='relu')(x)
    x = Dropout(0.5)(x)
    
    # Output layer
    outputs = Dense(num_classes, activation='softmax')(x)
    
    # Create model
    model = Model(inputs=inputs, outputs=outputs)
    return model
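
# A minimal usage sketch (the shapes are assumptions based on the standard
# UCI HAR layout, matching what load_data() produces):
#   model = lstm_attention(input_shape=(1, 561), num_classes=6)
#   model.summary()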

# Function to train and evaluate model
def train_and_evaluate_model(model, X_train, y_train, X_test, y_test, model_name="LSTM with Attention"):
    # Compile model
    model.compile(optimizer=Adam(learning_rate=0.001),
                 loss='categorical_crossentropy',
                 metrics=['accuracy'])
    
    # Define callbacks
    reduce_lr = ReduceLROnPlateau(monitor='val_loss',
                                 factor=0.2,
                                 patience=5,
                                 min_lr=0.00001)
    
    early_stopping = EarlyStopping(monitor='val_loss',
                                 patience=10,
                                 restore_best_weights=True)
    
    # Train model
    print(f"\nTraining {model_name}...")
    history = model.fit(X_train, y_train,
                       epochs=20,
                       batch_size=64,
                       validation_split=0.2,
                       verbose=1,
                       callbacks=[reduce_lr, early_stopping])
    
    # Evaluate model
    print(f"\nEvaluating {model_name}...")
    loss, accuracy = model.evaluate(X_test, y_test, verbose=0)
    
    # Make predictions
    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true = np.argmax(y_test, axis=1)
    
    # Calculate metrics
    precision = precision_score(y_true, y_pred_classes, average='weighted')
    recall = recall_score(y_true, y_pred_classes, average='weighted')
    f1 = f1_score(y_true, y_pred_classes, average='weighted')
    
    # Print results
    print(f'\n{model_name} Results:')
    print(f'Test Accuracy: {accuracy*100:.2f}%')
    print(f'Test Loss: {loss:.4f}')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1 Score: {f1:.4f}')
    
    return history, accuracy, precision, recall, f1, y_true, y_pred_classes, y_pred

def plot_attention_analysis(model, X_test, y_test, activity_labels, model_name="LSTM with Attention"):
    # Get the attention context for the test samples (as in plot_attention_weights,
    # this is the context tensor rather than the raw score matrix)
    attention_layer = [layer for layer in model.layers if isinstance(layer, Attention)][0]
    attention_model = Model(inputs=model.input, outputs=attention_layer.output)
    attention_weights = attention_model.predict(X_test)

    # Plot the average attention context per activity
    plt.figure(figsize=(15, 8))
    y_true = np.argmax(y_test, axis=1)
    
    for i, activity in enumerate(activity_labels['activity']):
        activity_mask = (y_true == i)
        if np.any(activity_mask):
            avg_attention = np.mean(attention_weights[activity_mask], axis=0)
            plt.plot(avg_attention.mean(axis=1), label=activity)
    
    plt.title(f'{model_name} - Average Attention Weights per Activity')
    plt.xlabel('Time Step')
    plt.ylabel('Average Attention Weight')
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.tight_layout()
    plt.savefig(f'attention_analysis_{model_name.lower().replace(" ", "_")}.png')
    plt.close()

def save_detailed_results(model_name, history, accuracy, precision, recall, f1, y_true, y_pred_classes, y_pred):
    with open(f'{model_name.lower().replace(" ", "_")}_detailed_results.txt', 'w') as f:
        # Basic metrics
        f.write(f"{model_name} Detailed Results\n")
        f.write("="*50 + "\n\n")
        f.write(f"Test Accuracy: {accuracy*100:.2f}%\n")
        f.write(f"Precision: {precision:.4f}\n")
        f.write(f"Recall: {recall:.4f}\n")
        f.write(f"F1 Score: {f1:.4f}\n\n")
        
        # Classification report
        f.write("Classification Report:\n")
        f.write(classification_report(y_true, y_pred_classes, 
                                   target_names=activity_labels['activity']))
        
        # Confusion matrix
        f.write("\nConfusion Matrix:\n")
        cm = confusion_matrix(y_true, y_pred_classes)
        f.write(np.array2string(cm, separator=', '))
        
        # Training history
        f.write("\n\nTraining History:\n")
        f.write("Epoch\tLoss\tAccuracy\tVal_Loss\tVal_Accuracy\n")
        for i in range(len(history.history['loss'])):
            f.write(f"{i+1}\t{history.history['loss'][i]:.4f}\t")
            f.write(f"{history.history['accuracy'][i]:.4f}\t")
            f.write(f"{history.history['val_loss'][i]:.4f}\t")
            f.write(f"{history.history['val_accuracy'][i]:.4f}\n")

# Main execution
if __name__ == "__main__":
    print("UCI HAR Dataset - LSTM with Attention Implementation")
    print("="*50)
    
    # Create and train model
    input_shape = (X_train.shape[1], X_train.shape[2])
    num_classes = y_train.shape[1]
    model = lstm_attention(input_shape, num_classes)
    
    # Print model summary
    print("\nModel Architecture:")
    model.summary()
    
    # Train and evaluate
    history, accuracy, precision, recall, f1, y_true, y_pred_classes, y_pred = train_and_evaluate_model(
        model, X_train, y_train, X_test, y_test, "LSTM with Attention"
    )
    
    # Generate visualizations
    plot_training_history(history, "LSTM with Attention")
    plot_confusion_matrix(y_true, y_pred_classes, "LSTM with Attention")
    plot_feature_importance(model, feature_names, "LSTM with Attention")
    plot_attention_weights(model, X_test, "LSTM with Attention")
    plot_attention_analysis(model, X_test, y_test, activity_labels, "LSTM with Attention")
    
    # Save detailed results
    save_detailed_results("LSTM with Attention", history, accuracy, precision, recall, f1,
                         y_true, y_pred_classes, y_pred)
    
    # Save model (HDF5 is Keras's legacy format; newer Keras versions also
    # support the native format, e.g. 'lstm_attention_uci_har.keras')
    model.save('lstm_attention_uci_har.h5')
    
    print("\nAnalysis complete. All results and visualizations have been saved.")
    print(f"\nModel saved as 'lstm_attention_uci_har.h5'")
    print(f"Detailed results saved as 'lstm_attention_detailed_results.txt'")

    # Print per-class performance
    print("\nPer-class Performance:")
    for i, activity in enumerate(activity_labels['activity']):
        true_class = (y_true == i)
        pred_class = (y_pred_classes == i)
        # zero_division=0 avoids warnings if a class is never predicted
        class_precision = precision_score(true_class, pred_class, average='binary', zero_division=0)
        class_recall = recall_score(true_class, pred_class, average='binary', zero_division=0)
        class_f1 = f1_score(true_class, pred_class, average='binary', zero_division=0)
        
        print(f"\n{activity}:")
        print(f"Precision: {class_precision:.4f}")
        print(f"Recall: {class_recall:.4f}")
        print(f"F1 Score: {class_f1:.4f}")