# ML-REFRANCE-CODES / UCI DATASET CODES / SINGLE LAYER SIMPLE LSTM.py
# SINGLE LAYER SIMPLE LSTM.py
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, precision_score, recall_score, f1_score
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Input
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping

# Filter TensorFlow's native INFO and WARNING log messages.
# NOTE(review): this variable is commonly documented as needing to be set
# before tensorflow is imported; the tensorflow imports above run first, so
# confirm the setting still takes effect from here.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

# Root directory of the extracted "UCI HAR Dataset" archive.
# Set the UCI_HAR_DATASET_DIR environment variable to point at a local copy;
# when it is unset the original author's location is used unchanged.
dataset_dir_path = os.environ.get(
    'UCI_HAR_DATASET_DIR',
    r'C:\Users\LENOVO LEGION\Downloads\human+activity+recognition+using+smartphones\UCI HAR Dataset\UCI HAR Dataset',
)

# Load and preprocess data
def load_data(data_dir=None):
    """Load the UCI HAR feature files and prepare them for an LSTM.

    Args:
        data_dir: Directory containing ``features.txt``,
            ``activity_labels.txt`` and the ``train``/``test`` sub-directories.
            Defaults to the module-level ``dataset_dir_path``.

    Returns:
        A 6-tuple ``(X_train, y_train, X_test, y_test, activity_labels,
        X_train_scaled)`` where

        * ``X_train`` / ``X_test`` are the standardized features reshaped to
          ``(samples, 1, features)``,
        * ``y_train`` / ``y_test`` are one-hot encoded activity labels,
        * ``activity_labels`` is the DataFrame read from
          ``activity_labels.txt`` (columns ``index`` and ``activity``),
        * ``X_train_scaled`` is the 2-D standardized training matrix before
          reshaping.
    """
    if data_dir is None:
        data_dir = dataset_dir_path

    def read_table(*path_parts, **read_kwargs):
        # sep=r'\s+' is the supported replacement for the deprecated
        # delim_whitespace=True and parses the files identically.
        return pd.read_csv(os.path.join(data_dir, *path_parts),
                           sep=r'\s+',
                           header=None,
                           **read_kwargs)

    # Lookup tables: feature names and activity names
    features = read_table('features.txt', names=['index', 'feature'])
    activity_labels = read_table('activity_labels.txt', names=['index', 'activity'])

    # Training and test splits
    X_train = read_table('train', 'X_train.txt')
    y_train = read_table('train', 'y_train.txt', names=['activity'])
    X_test = read_table('test', 'X_test.txt')
    y_test = read_table('test', 'y_test.txt', names=['activity'])

    # Label the columns
    X_train.columns = features['feature']
    X_test.columns = features['feature']

    # Standardize using statistics of the training split only, so no
    # information from the test split leaks into preprocessing.
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # Reshape for the LSTM: (samples, time steps, features) with a single
    # time step per sample.
    X_train_reshaped = X_train_scaled.reshape((X_train_scaled.shape[0], 1, X_train_scaled.shape[1]))
    X_test_reshaped = X_test_scaled.reshape((X_test_scaled.shape[0], 1, X_test_scaled.shape[1]))

    # Map the activity ids onto consecutive integers starting at 0
    encoder = LabelEncoder()
    y_train_encoded = encoder.fit_transform(y_train['activity'])
    y_test_encoded = encoder.transform(y_test['activity'])

    # One-hot encode. The width is pinned to the number of classes seen in
    # training so both splits share the same shape even if the highest class
    # id does not occur in the test split.
    num_classes = len(encoder.classes_)
    y_train_categorical = to_categorical(y_train_encoded, num_classes=num_classes)
    y_test_categorical = to_categorical(y_test_encoded, num_classes=num_classes)

    return (X_train_reshaped, y_train_categorical,
            X_test_reshaped, y_test_categorical,
            activity_labels, X_train_scaled)

# Load data
# This runs at import time (it sits outside the ``__main__`` guard), so the
# names bound here are module-level globals; ``activity_labels`` in particular
# is read directly by the plotting and reporting helpers defined below.
print("Loading and preprocessing data...")
X_train, y_train, X_test, y_test, activity_labels, X_train_scaled = load_data()
print("Data loading completed.")
# Report the shapes of the reshaped model inputs and the number of rows in
# the activity label table.
print(f"Training data shape: {X_train.shape}")
print(f"Test data shape: {X_test.shape}")
print(f"Number of activities: {len(activity_labels)}")

# Visualization functions
def plot_training_history(history, title="Model Training History"):
    """Save side-by-side accuracy and loss curves for a training run.

    Args:
        history: Object whose ``history`` mapping holds the per-epoch series
            ``accuracy``, ``val_accuracy``, ``loss`` and ``val_loss``.
        title: Prefix for both subplot titles; lower-cased with spaces turned
            into underscores it also forms the output file name
            ``training_history_<title>.png``.
    """
    _, panels = plt.subplots(1, 2, figsize=(15, 5))

    # (training key, validation key, training legend, validation legend, name)
    layout = (
        ('accuracy', 'val_accuracy', 'Training Accuracy', 'Validation Accuracy', 'Accuracy'),
        ('loss', 'val_loss', 'Training Loss', 'Validation Loss', 'Loss'),
    )

    for panel, (train_key, val_key, train_legend, val_legend, quantity) in zip(panels, layout):
        panel.plot(history.history[train_key], label=train_legend)
        panel.plot(history.history[val_key], label=val_legend)
        panel.set_title(f'{title} - {quantity}')
        panel.set_xlabel('Epoch')
        panel.set_ylabel(quantity)
        panel.legend()

    plt.tight_layout()
    file_stem = title.lower().replace(" ", "_")
    plt.savefig(f'training_history_{file_stem}.png')
    plt.close()

def plot_confusion_matrix(y_true, y_pred, title="Confusion Matrix"):
    """Save a heatmap of the confusion matrix labelled with activity names.

    Args:
        y_true: Integer class ids of the ground truth. Ids are expected to be
            positions into the module-level ``activity_labels`` table
            (0 .. number of activities - 1).
        y_pred: Integer class ids predicted by the model, same encoding.
        title: Plot title; lower-cased with spaces turned into underscores it
            also forms the output file name ``confusion_matrix_<title>.png``.
    """
    class_names = activity_labels['activity']

    # Pin the matrix to one row/column per known activity. Without an explicit
    # label list the matrix only covers classes that actually occur, so a
    # missing class would leave the tick labels below misaligned.
    class_ids = np.arange(len(class_names))
    cm = confusion_matrix(y_true, y_pred, labels=class_ids)

    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title(title)
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

    # Heatmap cells are one unit wide, so +0.5 centres each tick on its cell.
    tick_positions = class_ids + 0.5
    plt.xticks(tick_positions, class_names, rotation=45, ha='right')
    plt.yticks(tick_positions, class_names, rotation=0)

    plt.tight_layout()
    plt.savefig(f'confusion_matrix_{title.lower().replace(" ", "_")}.png')
    plt.close()

def plot_feature_importance(model, title="Feature Importance", feature_names=None):
    """Save a bar chart of the 20 inputs with the largest first-layer weights.

    Importance of an input feature is the mean absolute value of the weights
    connecting it to the model's first layer.

    Args:
        model: Model whose first layer exposes its input kernel as the first
            array returned by ``get_weights()``.
        title: Prefix of the plot title; lower-cased with spaces turned into
            underscores it also forms the output file name
            ``feature_importance_<title>.png``.
        feature_names: Optional names, one per input feature. When omitted,
            names are read from ``features.txt`` under ``dataset_dir_path``;
            if that file is missing or its length does not match the kernel,
            generic ``feature_<i>`` names are used.

    Raises:
        ValueError: If ``feature_names`` is given but its length differs from
            the number of input features of the first layer.
    """
    # The input kernel is laid out as (input features, outputs), so the mean
    # is taken across axis 1 to get exactly one score per input feature.
    kernel = np.abs(model.layers[0].get_weights()[0])
    weights = kernel.mean(axis=1)
    n_features = len(weights)

    if feature_names is not None:
        names = [str(name) for name in feature_names]
        if len(names) != n_features:
            raise ValueError(
                f"feature_names has {len(names)} entries but the first layer "
                f"has {n_features} input features"
            )
    else:
        names = None
        features_file = os.path.join(dataset_dir_path, 'features.txt')
        if os.path.isfile(features_file):
            listed = pd.read_csv(features_file, sep=r'\s+', header=None,
                                 names=['index', 'feature'])
            if len(listed) == n_features:
                names = listed['feature'].astype(str).tolist()
        if names is None:
            names = [f'feature_{i}' for i in range(n_features)]

    # Create a dataframe of features and their importance
    feature_importance = pd.DataFrame({
        'feature': names,
        'importance': weights
    }).sort_values('importance', ascending=False)

    # Plot top 20 features
    # NOTE(review): identical feature names share one bar in a seaborn
    # barplot - verify whether the name list contains duplicates.
    plt.figure(figsize=(12, 6))
    sns.barplot(data=feature_importance.head(20), x='importance', y='feature')
    plt.title(f'{title} - Top 20 Features')
    plt.tight_layout()
    plt.savefig(f'feature_importance_{title.lower().replace(" ", "_")}.png')
    plt.close()


# Simple LSTM Model definition
def simple_lstm(input_shape, num_classes, units=100, dropout_rate=0.5):
    """Build an uncompiled single-layer LSTM classifier.

    Args:
        input_shape: ``(time steps, features)`` of one sample.
        num_classes: Number of output classes (width of the softmax layer).
        units: Number of LSTM units.
        dropout_rate: Fraction of LSTM outputs dropped before the classifier.

    Returns:
        A ``Sequential`` model: LSTM -> Dropout -> softmax Dense.
    """
    return Sequential([
        # Declaring the input explicitly replaces the deprecated
        # ``input_shape=`` keyword on the first layer.
        Input(shape=input_shape),
        # return_sequences=False: only the final time step feeds the classifier
        LSTM(units, return_sequences=False),
        Dropout(dropout_rate),
        Dense(num_classes, activation='softmax'),
    ])

# Function to train and evaluate model
def train_and_evaluate_model(model, X_train, y_train, X_test, y_test, model_name="Simple LSTM",
                             *, epochs=20, batch_size=64, validation_split=0.2,
                             learning_rate=0.001):
    """Compile, train and evaluate ``model``, printing a metric summary.

    Args:
        model: Uncompiled Keras model with a softmax output.
        X_train: Training inputs.
        y_train: One-hot encoded training labels.
        X_test: Test inputs.
        y_test: One-hot encoded test labels.
        model_name: Name used in the progress and result messages.
        epochs: Maximum number of training epochs.
        batch_size: Mini-batch size used for training.
        validation_split: Fraction of the training data held out for
            validation.
        learning_rate: Initial learning rate of the Adam optimizer.

    Returns:
        ``(history, accuracy, precision, recall, f1, y_true, y_pred_classes)``
        where ``history`` is the object returned by ``model.fit``, ``accuracy``
        is the test accuracy reported by ``model.evaluate``, the next three
        are class-weighted scores on the test set, and the last two are the
        integer class ids of the test labels and predictions.
    """
    # Compile model
    model.compile(optimizer=Adam(learning_rate=learning_rate),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    # Shrink the learning rate when validation loss stalls for 5 epochs ...
    reduce_lr = ReduceLROnPlateau(monitor='val_loss',
                                  factor=0.2,
                                  patience=5,
                                  min_lr=0.00001)

    # ... and stop after 10 stalled epochs, rolling back to the best weights.
    early_stopping = EarlyStopping(monitor='val_loss',
                                   patience=10,
                                   restore_best_weights=True)

    # Train model
    print(f"\nTraining {model_name}...")
    history = model.fit(X_train, y_train,
                        epochs=epochs,
                        batch_size=batch_size,
                        validation_split=validation_split,
                        verbose=1,
                        callbacks=[reduce_lr, early_stopping])

    # Evaluate model
    print(f"\nEvaluating {model_name}...")
    loss, accuracy = model.evaluate(X_test, y_test, verbose=0)

    # Turn class probabilities and one-hot labels back into class ids
    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true = np.argmax(y_test, axis=1)

    # Class-weighted averages so larger classes count proportionally more
    precision = precision_score(y_true, y_pred_classes, average='weighted')
    recall = recall_score(y_true, y_pred_classes, average='weighted')
    f1 = f1_score(y_true, y_pred_classes, average='weighted')

    # Print results
    print(f'\n{model_name} Results:')
    print(f'Test Accuracy: {accuracy*100:.2f}%')
    print(f'Test Loss: {loss:.4f}')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1 Score: {f1:.4f}')

    return history, accuracy, precision, recall, f1, y_true, y_pred_classes

def save_model_results(model_name, history, accuracy, precision, recall, f1, y_true, y_pred_classes):
    """Write metrics, a classification report and the training log to a file.

    The file is named ``<model_name>_results.txt`` with the model name
    lower-cased and spaces turned into underscores.

    Args:
        model_name: Display name of the model; also determines the file name.
        history: Object whose ``history`` mapping holds the per-epoch series
            ``loss``, ``accuracy``, ``val_loss`` and ``val_accuracy``.
        accuracy: Test accuracy as a fraction (written as a percentage).
        precision: Precision score.
        recall: Recall score.
        f1: F1 score.
        y_true: Integer class ids of the test labels, expected to be positions
            into the module-level ``activity_labels`` table.
        y_pred_classes: Integer class ids predicted by the model.
    """
    class_names = [str(name) for name in activity_labels['activity']]

    # Build the report before touching the file so a failure here cannot
    # leave a truncated results file behind. The explicit label list keeps
    # target_names aligned even when a class is missing from the test data.
    report = classification_report(y_true, y_pred_classes,
                                   labels=np.arange(len(class_names)),
                                   target_names=class_names)

    curves = history.history
    epoch_rows = zip(curves['loss'], curves['accuracy'],
                     curves['val_loss'], curves['val_accuracy'])

    lines = [
        f"{model_name} Results\n",
        "="*50 + "\n\n",
        "Model Performance Metrics:\n",
        f"Test Accuracy: {accuracy*100:.2f}%\n",
        f"Precision: {precision:.4f}\n",
        f"Recall: {recall:.4f}\n",
        f"F1 Score: {f1:.4f}\n\n",
        "Classification Report:\n",
        report,
        "\nTraining History:\n",
        "Epoch\tLoss\tAccuracy\tVal_Loss\tVal_Accuracy\n",
    ]
    lines.extend(
        f"{epoch}\t{loss:.4f}\t{acc:.4f}\t{val_loss:.4f}\t{val_acc:.4f}\n"
        for epoch, (loss, acc, val_loss, val_acc) in enumerate(epoch_rows, start=1)
    )

    # Save results to file
    with open(f'{model_name.lower().replace(" ", "_")}_results.txt', 'w', encoding='utf-8') as f:
        f.writelines(lines)

def plot_model_metrics(history, accuracy, precision, recall, f1, model_name="Simple LSTM"):
    """Save the training-history figure and a bar chart of the four scores.

    Args:
        history: Training history forwarded to ``plot_training_history``.
        accuracy: Accuracy score to plot.
        precision: Precision score to plot.
        recall: Recall score to plot.
        f1: F1 score to plot.
        model_name: Used in the chart title and, lower-cased with spaces
            turned into underscores, in the output file name
            ``metrics_comparison_<model_name>.png``.
    """
    # Training curves first
    plot_training_history(history, model_name)

    # Bar chart comparing the four scores
    plt.figure(figsize=(10, 6))
    scoreboard = [
        ('Accuracy', accuracy),
        ('Precision', precision),
        ('Recall', recall),
        ('F1 Score', f1),
    ]
    bar_labels = [label for label, _ in scoreboard]
    bar_heights = [score for _, score in scoreboard]

    plt.bar(bar_labels, bar_heights)
    plt.title(f'{model_name} - Performance Metrics')
    plt.ylim([0, 1])

    # Print each score just above its bar
    for position, score in enumerate(bar_heights):
        plt.text(position, score + 0.01, f'{score:.4f}', ha='center')

    plt.tight_layout()
    file_stem = model_name.lower().replace(" ", "_")
    plt.savefig(f'metrics_comparison_{file_stem}.png')
    plt.close()

def plot_loss_convergence(history, model_name="Simple LSTM"):
    """Save a single plot of training versus validation loss per epoch.

    Args:
        history: Object whose ``history`` mapping holds the per-epoch series
            ``loss`` and ``val_loss``.
        model_name: Used in the plot title and, lower-cased with spaces turned
            into underscores, in the output file name
            ``loss_convergence_<model_name>.png``.
    """
    plt.figure(figsize=(10, 6))

    series = (('loss', 'Training Loss'), ('val_loss', 'Validation Loss'))
    for key, legend_text in series:
        plt.plot(history.history[key], label=legend_text)

    plt.title(f'{model_name} - Loss Convergence')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()

    file_stem = model_name.lower().replace(" ", "_")
    plt.savefig(f'loss_convergence_{file_stem}.png')
    plt.close()

# Main execution
if __name__ == "__main__":
    print("UCI HAR Dataset - Simple LSTM Implementation")
    print("="*50)

    # Build the network: one (time steps, features) window per sample in,
    # one probability per one-hot label column out.
    model = simple_lstm(
        input_shape=(X_train.shape[1], X_train.shape[2]),
        num_classes=y_train.shape[1],
    )

    print("\nModel Architecture:")
    model.summary()

    # Fit on the training split and score on the held-out test split
    outcome = train_and_evaluate_model(
        model, X_train, y_train, X_test, y_test, "Simple LSTM"
    )
    history, accuracy, precision, recall, f1, y_true, y_pred_classes = outcome
    scores = (accuracy, precision, recall, f1)

    # Figures
    plot_model_metrics(history, *scores)
    plot_confusion_matrix(y_true, y_pred_classes, "Simple LSTM")
    plot_feature_importance(model, "Simple LSTM")
    plot_loss_convergence(history)

    # Text report and the trained model itself
    save_model_results("Simple LSTM", history, *scores, y_true, y_pred_classes)
    model.save('simple_lstm_uci_har.h5')

    print("\nAnalysis complete. All results and visualizations have been saved.")