# ML-REFRANCE-CODES / UCI DATASET CODES / DEEP LSTM WITH 2 LAYERS.py
# DEEP LSTM WITH 2 LAYERS.py
# Raw
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, precision_score, recall_score, f1_score
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Input
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping

# Suppress TensorFlow's C++ info/warning log output.
# NOTE(review): TensorFlow presumably reads this variable when it is first
# imported, and the tensorflow imports sit above this line — confirm the
# setting still takes effect here, or move it ahead of those imports.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

# Root directory of the extracted "UCI HAR Dataset" folder. The original
# author's local path remains the default; set the UCI_HAR_DATASET_DIR
# environment variable to point the script at another location without
# editing the source.
dataset_dir_path = os.environ.get(
    'UCI_HAR_DATASET_DIR',
    r'C:\Users\LENOVO LEGION\Downloads\human+activity+recognition+using+smartphones\UCI HAR Dataset\UCI HAR Dataset',
)

# Load and preprocess data
def _read_whitespace_table(*path_parts, names=None):
    """Read a headerless, whitespace-delimited text file below the dataset root."""
    # sep=r'\s+' is the supported spelling of the deprecated
    # delim_whitespace=True option.
    return pd.read_csv(os.path.join(dataset_dir_path, *path_parts),
                       sep=r'\s+',
                       header=None,
                       names=names)


def load_data():
    """Load the dataset files under ``dataset_dir_path`` and prepare them for an LSTM.

    Steps performed:
      * read ``features.txt``, ``activity_labels.txt`` and the train/test
        ``X_*.txt`` / ``y_*.txt`` files;
      * name the feature columns after the entries of ``features.txt``;
      * standardize features with a ``StandardScaler`` fitted on the
        training split only;
      * reshape each split to ``(samples, 1, features)``, i.e. one time step
        per sample;
      * label-encode the activities and one-hot them with ``to_categorical``.

    Returns:
        A 6-tuple ``(X_train, y_train, X_test, y_test, activity_labels,
        X_train_scaled)`` where the first four are the reshaped features and
        one-hot labels, ``activity_labels`` is the DataFrame read from
        ``activity_labels.txt`` (columns ``index`` and ``activity``), and
        ``X_train_scaled`` is the 2-D scaled training matrix as a DataFrame
        that keeps the feature names as its columns.
    """
    # Load the feature labels
    features = _read_whitespace_table('features.txt',
                                      names=['index', 'feature'])

    # Load the activity labels
    activity_labels = _read_whitespace_table('activity_labels.txt',
                                             names=['index', 'activity'])

    # Load training data
    X_train = _read_whitespace_table('train', 'X_train.txt')
    y_train = _read_whitespace_table('train', 'y_train.txt',
                                     names=['activity'])

    # Load test data
    X_test = _read_whitespace_table('test', 'X_test.txt')
    y_test = _read_whitespace_table('test', 'y_test.txt',
                                    names=['activity'])

    # Label the columns
    X_train.columns = features['feature']
    X_test.columns = features['feature']

    # Normalize the feature data (statistics come from the training split only)
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # Reshape the data for LSTM (samples, time steps, features)
    X_train_reshaped = X_train_scaled.reshape(
        (X_train_scaled.shape[0], 1, X_train_scaled.shape[1]))
    X_test_reshaped = X_test_scaled.reshape(
        (X_test_scaled.shape[0], 1, X_test_scaled.shape[1]))

    # Encode activity labels
    encoder = LabelEncoder()
    y_train_encoded = encoder.fit_transform(y_train['activity'])
    y_test_encoded = encoder.transform(y_test['activity'])

    # Convert to categorical
    y_train_categorical = to_categorical(y_train_encoded)
    y_test_categorical = to_categorical(y_test_encoded)

    # The scaler returns a bare ndarray; wrap it so the feature names survive
    # and consumers that read `.columns` from this value keep working.
    X_train_scaled_frame = pd.DataFrame(X_train_scaled,
                                        columns=X_train.columns)

    return (X_train_reshaped, y_train_categorical,
            X_test_reshaped, y_test_categorical,
            activity_labels, X_train_scaled_frame)

# Load data
# These statements sit at module level, so they run whenever the module is
# executed or imported; the names bound here are read as globals by the
# plotting/reporting helpers defined further down.
print("Loading and preprocessing data...")
(
    X_train, y_train,
    X_test, y_test,
    activity_labels, X_train_scaled,
) = load_data()
print("Data loading completed.")

# Quick summary of what was loaded
print(f"Training data shape: {X_train.shape}")
print(f"Test data shape: {X_test.shape}")
print(f"Number of activities: {len(activity_labels)}")

# Visualization functions
def plot_training_history(history, title="Model Training History"):
    """Save side-by-side accuracy and loss curves for a training run.

    Args:
        history: Object whose ``history`` attribute maps ``'accuracy'``,
            ``'val_accuracy'``, ``'loss'`` and ``'val_loss'`` to per-epoch
            values.
        title: Prefix for both panel titles; also used, lower-cased and with
            spaces replaced by underscores, in the output file name.

    The figure is written to ``training_history_<title>.png`` and then closed.
    """
    curves = history.history
    fig, (accuracy_axis, loss_axis) = plt.subplots(1, 2, figsize=(15, 5))

    # (axis, training key, validation key, quantity shown in labels)
    panels = (
        (accuracy_axis, 'accuracy', 'val_accuracy', 'Accuracy'),
        (loss_axis, 'loss', 'val_loss', 'Loss'),
    )
    for axis, train_key, val_key, quantity in panels:
        axis.plot(curves[train_key], label=f'Training {quantity}')
        axis.plot(curves[val_key], label=f'Validation {quantity}')
        axis.set_title(f'{title} - {quantity}')
        axis.set_xlabel('Epoch')
        axis.set_ylabel(quantity)
        axis.legend()

    plt.tight_layout()
    plt.savefig(f'training_history_{title.lower().replace(" ", "_")}.png')
    plt.close()

def plot_confusion_matrix(y_true, y_pred, title="Confusion Matrix"):
    """Render the confusion matrix as an annotated heatmap and save it.

    Args:
        y_true: True class indices.
        y_pred: Predicted class indices.
        title: Figure title; also used, lower-cased and with spaces replaced
            by underscores, in the output file name.

    Tick labels come from the module-level ``activity_labels`` table. The
    figure is written to ``confusion_matrix_<title>.png`` and then closed.
    """
    matrix = confusion_matrix(y_true, y_pred)

    plt.figure(figsize=(10, 8))
    sns.heatmap(matrix, annot=True, fmt='d', cmap='Blues')
    plt.title(title)
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

    # Activity names on both axes; the 0.5 offset presumably centers each
    # label on its heatmap cell.
    class_names = activity_labels['activity']
    cell_centers = 0.5 + np.arange(len(activity_labels))
    plt.xticks(cell_centers, class_names, rotation=45, ha='right')
    plt.yticks(cell_centers, class_names, rotation=0)

    plt.tight_layout()
    output_path = f'confusion_matrix_{title.lower().replace(" ", "_")}.png'
    plt.savefig(output_path)
    plt.close()

def plot_feature_importance(model, title="Feature Importance"):
    """Plot the 20 input features with the largest mean absolute first-layer weight.

    Args:
        model: Model whose first layer's first weight array is the input
            kernel (one row per input feature).
        title: Prefix for the plot title; also used, lower-cased and with
            spaces replaced by underscores, in the output file name.

    Feature names are taken from the ``columns`` of the module-level
    ``X_train_scaled`` when it has them; otherwise generic ``feature_<i>``
    names are used. The figure is written to
    ``feature_importance_<title>.png`` and then closed.
    """
    # Keras stores an LSTM's input kernel as (input_features, 4 * units).
    # Averaging |w| across the gate/unit axis gives one score per input
    # feature; averaging over axis 0 twice would collapse it to one scalar.
    kernel = np.abs(model.layers[0].get_weights()[0])
    weights = kernel.mean(axis=1)

    # A plain ndarray has no column labels, so fall back to positional names.
    feature_names = getattr(X_train_scaled, 'columns', None)
    if feature_names is None:
        feature_names = [f'feature_{i}' for i in range(len(weights))]

    # Create a dataframe of features and their importance
    feature_importance = pd.DataFrame({
        'feature': list(feature_names),
        'importance': weights
    }).sort_values('importance', ascending=False)

    # Plot top 20 features
    plt.figure(figsize=(12, 6))
    sns.barplot(data=feature_importance.head(20), x='importance', y='feature')
    plt.title(f'{title} - Top 20 Features')
    plt.tight_layout()
    plt.savefig(f'feature_importance_{title.lower().replace(" ", "_")}.png')
    plt.close()

def plot_layer_activations(model, X_sample, layer_names, title="Layer Activations"):
    """Save one activation heatmap per requested layer for the first sample.

    Args:
        model: Model to probe.
        X_sample: Input batch; only the first five rows are fed to the model
            and only the first of those is plotted.
        layer_names: Names of the layers whose outputs should be plotted.
            Names that are not present in ``model.layers`` are reported and
            skipped.
        title: Prefix for each plot title; also used in the output file names.

    Each figure is written to ``layer_activation_<layer>_<title>.png``.
    """
    layers_by_name = {layer.name: layer for layer in model.layers}
    found = [name for name in layer_names if name in layers_by_name]
    not_found = [name for name in layer_names if name not in layers_by_name]
    if not_found:
        print(f"Skipping layers not found in model: {not_found}")
    if not found:
        return

    # Build the outputs in the order of `found`, so activations[i] always
    # belongs to found[i] regardless of the layer order inside the model.
    layer_outputs = [layers_by_name[name].output for name in found]
    activation_model = Model(inputs=model.input, outputs=layer_outputs)

    # Get activations
    activations = activation_model.predict(X_sample[:5])  # Get first 5 samples
    # A single-output model may hand back a bare array instead of a list.
    if not isinstance(activations, (list, tuple)):
        activations = [activations]

    # Plot activations for each layer
    for layer_name, layer_activations in zip(found, activations):
        # A layer that returns only its last state yields a 1-D vector per
        # sample; promote it to (1, units) so it can be drawn as an image.
        first_sample = np.atleast_2d(layer_activations[0])

        plt.figure(figsize=(10, 4))
        plt.title(f'{title} - {layer_name}')
        plt.imshow(first_sample.T, aspect='auto', cmap='viridis')
        plt.colorbar(label='Activation')
        plt.xlabel('Time Step')
        plt.ylabel('Units')
        plt.tight_layout()
        plt.savefig(f'layer_activation_{layer_name.lower()}_{title.lower().replace(" ", "_")}.png')
        plt.close()


# Deep LSTM Model definition
def deep_lstm(input_shape, num_classes, units=100, dropout_rate=0.5):
    """Build an uncompiled two-layer LSTM classifier.

    Args:
        input_shape: Shape of one sample, ``(time_steps, features)``.
        num_classes: Size of the softmax output layer.
        units: Number of units in each LSTM layer.
        dropout_rate: Dropout rate applied after each LSTM layer.

    Returns:
        A ``Sequential`` model: LSTM (full sequence) -> Dropout ->
        LSTM (last state only) -> Dropout -> softmax Dense.
    """
    model = Sequential([
        # Declare the input explicitly instead of passing `input_shape=` to
        # the first layer, which newer Keras releases warn about.
        Input(shape=input_shape),
        LSTM(units, return_sequences=True),
        Dropout(dropout_rate),
        LSTM(units, return_sequences=False),
        Dropout(dropout_rate),
        Dense(num_classes, activation='softmax')
    ])
    return model

# Function to train and evaluate model
def train_and_evaluate_model(model, X_train, y_train, X_test, y_test, model_name="Deep LSTM",
                             *, epochs=20, batch_size=64, validation_split=0.2,
                             learning_rate=0.001):
    """Compile, fit and evaluate ``model``, printing a summary of test metrics.

    Args:
        model: Uncompiled Keras model.
        X_train, y_train: Training inputs and one-hot targets.
        X_test, y_test: Test inputs and one-hot targets.
        model_name: Label used in the progress and result messages.
        epochs: Maximum number of training epochs.
        batch_size: Mini-batch size passed to ``fit``.
        validation_split: Fraction of the training data passed to ``fit`` as
            ``validation_split``.
        learning_rate: Initial learning rate of the Adam optimizer.

    Returns:
        ``(history, accuracy, precision, recall, f1, y_true, y_pred_classes,
        y_pred)`` where precision/recall/F1 are support-weighted averages,
        ``y_true`` and ``y_pred_classes`` are class indices and ``y_pred``
        holds the raw model outputs for the test set.
    """
    # Compile model
    model.compile(optimizer=Adam(learning_rate=learning_rate),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    # Define callbacks: shrink the learning rate when validation loss stalls,
    # and stop early (restoring the best weights) if it keeps stalling.
    reduce_lr = ReduceLROnPlateau(monitor='val_loss',
                                  factor=0.2,
                                  patience=5,
                                  min_lr=0.00001)

    early_stopping = EarlyStopping(monitor='val_loss',
                                   patience=10,
                                   restore_best_weights=True)

    # Train model
    print(f"\nTraining {model_name}...")
    history = model.fit(X_train, y_train,
                        epochs=epochs,
                        batch_size=batch_size,
                        validation_split=validation_split,
                        verbose=1,
                        callbacks=[reduce_lr, early_stopping])

    # Evaluate model
    print(f"\nEvaluating {model_name}...")
    loss, accuracy = model.evaluate(X_test, y_test, verbose=0)

    # Make predictions and turn one-hot / probability rows into class indices
    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true = np.argmax(y_test, axis=1)

    # Calculate metrics
    precision = precision_score(y_true, y_pred_classes, average='weighted')
    recall = recall_score(y_true, y_pred_classes, average='weighted')
    f1 = f1_score(y_true, y_pred_classes, average='weighted')

    # Print results
    print(f'\n{model_name} Results:')
    print(f'Test Accuracy: {accuracy*100:.2f}%')
    print(f'Test Loss: {loss:.4f}')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1 Score: {f1:.4f}')

    return history, accuracy, precision, recall, f1, y_true, y_pred_classes, y_pred

def plot_learning_curves(history, model_name="Deep LSTM"):
    """Save a figure with the learning-rate schedule and the validation loss.

    Args:
        history: Object whose ``history`` attribute maps metric names to
            per-epoch values; ``'loss'`` and ``'val_loss'`` are required.
        model_name: Prefix for the panel titles; also used, lower-cased and
            with spaces replaced by underscores, in the output file name.

    The figure is written to ``learning_curves_<model_name>.png`` and closed.
    """
    logs = history.history

    # The learning rate is recorded under 'lr' by older Keras releases and
    # under 'learning_rate' by newer ones; accept either. If neither was
    # logged, fall back to a flat line at 0.001.
    for key in ('lr', 'learning_rate'):
        if key in logs:
            lr_values = logs[key]
            break
    else:
        lr_values = [0.001] * len(logs['loss'])

    plt.figure(figsize=(12, 4))

    # Plot learning rate
    plt.subplot(1, 2, 1)
    plt.plot(lr_values, marker='o')
    plt.title(f'{model_name} - Learning Rate Adjustment')
    plt.xlabel('Epoch')
    plt.ylabel('Learning Rate')
    plt.yscale('log')

    # Plot validation loss
    plt.subplot(1, 2, 2)
    plt.plot(logs['val_loss'], marker='o')
    plt.title(f'{model_name} - Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')

    plt.tight_layout()
    plt.savefig(f'learning_curves_{model_name.lower().replace(" ", "_")}.png')
    plt.close()

def save_detailed_results(model_name, history, accuracy, precision, recall, f1, y_true, y_pred_classes, y_pred):
    """Write a plain-text evaluation report for one model.

    The report contains the headline metrics, scikit-learn's classification
    report, the confusion matrix and one-vs-rest precision/recall/F1 for each
    activity listed in the module-level ``activity_labels`` table.

    Args:
        model_name: Report heading; also used, lower-cased and with spaces
            replaced by underscores, in the output file name.
        history: Not used by this function.
        accuracy, precision, recall, f1: Headline metrics to record.
        y_true: True class indices.
        y_pred_classes: Predicted class indices.
        y_pred: Not used by this function.
    """
    report_path = f'{model_name.lower().replace(" ", "_")}_detailed_results.txt'
    with open(report_path, 'w') as report:
        # Basic metrics
        report.writelines([
            f"{model_name} Detailed Results\n",
            "="*50 + "\n\n",
            f"Test Accuracy: {accuracy*100:.2f}%\n",
            f"Precision: {precision:.4f}\n",
            f"Recall: {recall:.4f}\n",
            f"F1 Score: {f1:.4f}\n\n",
        ])

        # Classification report
        class_names = activity_labels['activity']
        report.write("Classification Report:\n")
        report.write(classification_report(y_true, y_pred_classes,
                                           target_names=class_names))

        # Confusion matrix
        report.write("\nConfusion Matrix:\n")
        matrix = confusion_matrix(y_true, y_pred_classes)
        report.write(np.array2string(matrix, separator=', '))

        # Per-class metrics: treat each class in turn as the positive label
        report.write("\n\nPer-class Performance:\n")
        for class_index, activity in enumerate(class_names):
            is_actual = (y_true == class_index)
            is_predicted = (y_pred_classes == class_index)
            class_precision, class_recall, class_f1 = (
                precision_score(is_actual, is_predicted, average='binary'),
                recall_score(is_actual, is_predicted, average='binary'),
                f1_score(is_actual, is_predicted, average='binary'),
            )

            report.write(f"\n{activity}:\n")
            report.write(f"Precision: {class_precision:.4f}\n")
            report.write(f"Recall: {class_recall:.4f}\n")
            report.write(f"F1 Score: {class_f1:.4f}\n")
def plot_prediction_distribution(y_true, y_pred_classes, model_name="Deep LSTM"):
    """Save a grouped bar chart of true vs. predicted class counts.

    Args:
        y_true: True class indices.
        y_pred_classes: Predicted class indices.
        model_name: Prefix for the plot title; also used, lower-cased and
            with spaces replaced by underscores, in the output file name.

    Bars are laid out in the row order of the module-level
    ``activity_labels`` table. The figure is written to
    ``prediction_distribution_<model_name>.png`` and then closed.
    """
    plt.figure(figsize=(12, 6))

    x = np.arange(len(activity_labels))
    width = 0.35

    # value_counts() orders by frequency and omits classes that never occur.
    # Reindex by class index (filling gaps with 0) so bar i really is class i
    # and both series always have one entry per activity.
    true_dist = pd.Series(y_true).value_counts().reindex(x, fill_value=0)
    pred_dist = pd.Series(y_pred_classes).value_counts().reindex(x, fill_value=0)

    plt.bar(x - width/2, true_dist, width, label='True')
    plt.bar(x + width/2, pred_dist, width, label='Predicted')

    plt.xlabel('Activity')
    plt.ylabel('Count')
    plt.title(f'{model_name} - True vs Predicted Distribution')
    plt.xticks(x, activity_labels['activity'], rotation=45, ha='right')
    plt.legend()

    plt.tight_layout()
    plt.savefig(f'prediction_distribution_{model_name.lower().replace(" ", "_")}.png')
    plt.close()

# Main execution
if __name__ == "__main__":
    print("UCI HAR Dataset - Deep LSTM Implementation")
    print("="*50)

    model_name = "Deep LSTM"
    model_path = 'deep_lstm_uci_har.h5'

    # Create and train model
    input_shape = (X_train.shape[1], X_train.shape[2])  # (time steps, features)
    num_classes = y_train.shape[1]  # width of the one-hot targets
    model = deep_lstm(input_shape, num_classes)

    # Print model summary
    print("\nModel Architecture:")
    model.summary()

    # Train and evaluate
    results = train_and_evaluate_model(
        model, X_train, y_train, X_test, y_test, model_name
    )
    history, accuracy, precision, recall, f1, y_true, y_pred_classes, y_pred = results

    # Ask the model for its LSTM layer names instead of hard-coding the
    # auto-generated ones, which change if other layers were created earlier
    # in the same session.
    lstm_layer_names = [layer.name for layer in model.layers
                        if isinstance(layer, LSTM)]

    # Generate visualizations
    plot_training_history(history, model_name)
    plot_confusion_matrix(y_true, y_pred_classes, model_name)
    plot_feature_importance(model, model_name)
    plot_learning_curves(history, model_name)
    plot_layer_activations(model, X_test, lstm_layer_names, model_name)
    plot_prediction_distribution(y_true, y_pred_classes, model_name)

    # Save detailed results
    save_detailed_results(model_name, history, accuracy, precision, recall, f1,
                          y_true, y_pred_classes, y_pred)

    # Save model
    model.save(model_path)

    print("\nAnalysis complete. All results and visualizations have been saved.")
    print(f"\nModel saved as '{model_path}'")
    print("Detailed results saved as 'deep_lstm_detailed_results.txt'")