"""SIMPLE LSTM WITH ONE LAYER.py: a single-layer LSTM classifier
for the PAMAP2 activity-recognition dataset."""

import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
import tensorflow as tf

# GPU Configuration
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print("GPU is available")
    except RuntimeError as e:
        print(e)
else:
    print("GPU is not available, using CPU")

# Paths to the datasets
PROTOCOL_PATH = r"C:\Users\LENOVO LEGION\Downloads\pamap2_extracted\PAMAP2_Dataset\Protocol"
OPTIONAL_PATH = r"C:\Users\LENOVO LEGION\Downloads\pamap2_extracted\PAMAP2_Dataset\Optional"
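# NOTE: adjust these paths to wherever the PAMAP2 archive is extracted locally.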

# Column names based on the readme file
COLUMN_NAMES = [
    'timestamp', 'activity_id', 'heart_rate',
    'hand_temperature', 'hand_acc16_1', 'hand_acc16_2', 'hand_acc16_3',
    'hand_acc6_1', 'hand_acc6_2', 'hand_acc6_3',
    'hand_gyro_1', 'hand_gyro_2', 'hand_gyro_3',
    'hand_magno_1', 'hand_magno_2', 'hand_magno_3',
    'hand_ori_1', 'hand_ori_2', 'hand_ori_3', 'hand_ori_4',
    'chest_temperature', 'chest_acc16_1', 'chest_acc16_2', 'chest_acc16_3',
    'chest_acc6_1', 'chest_acc6_2', 'chest_acc6_3',
    'chest_gyro_1', 'chest_gyro_2', 'chest_gyro_3',
    'chest_magno_1', 'chest_magno_2', 'chest_magno_3',
    'chest_ori_1', 'chest_ori_2', 'chest_ori_3', 'chest_ori_4',
    'ankle_temperature', 'ankle_acc16_1', 'ankle_acc16_2', 'ankle_acc16_3',
    'ankle_acc6_1', 'ankle_acc6_2', 'ankle_acc6_3',
    'ankle_gyro_1', 'ankle_gyro_2', 'ankle_gyro_3',
    'ankle_magno_1', 'ankle_magno_2', 'ankle_magno_3',
    'ankle_ori_1', 'ankle_ori_2', 'ankle_ori_3', 'ankle_ori_4'
]

# Activity labels
ACTIVITY_LABELS = {
    1: 'lying',
    2: 'sitting',
    3: 'standing',
    4: 'walking',
    5: 'running',
    6: 'cycling',
    7: 'Nordic walking',
    9: 'watching TV',
    10: 'computer work',
    11: 'car driving',
    12: 'ascending stairs',
    13: 'descending stairs',
    16: 'vacuum cleaning',
    17: 'ironing',
    18: 'folding laundry',
    19: 'house cleaning',
    20: 'playing soccer',
    24: 'rope jumping'
}

def load_data(file_path):
    """Load one subject's space-separated .dat file into a DataFrame."""
    return pd.read_csv(file_path, sep=' ', header=None, names=COLUMN_NAMES)

def load_all_data(base_path):
    """Load all data files from the given path."""
    all_data = []
    for file in sorted(os.listdir(base_path)):  # sorted for deterministic order
        if file.endswith(".dat"):
            file_path = os.path.join(base_path, file)
            data = load_data(file_path)
            all_data.append(data)
    return pd.concat(all_data, ignore_index=True)

def preprocess_data(data):
    """Preprocess the loaded data."""
    # Remove rows with activity_id 0 (transient activities)
    data = data[data['activity_id'] != 0]
    
    # Drop rows with missing values. Note: heart_rate is recorded at a much
    # lower rate (~9 Hz) than the IMU channels (100 Hz), so this discards most
    # rows; see the interpolation sketch after this function for a gentler option.
    data = data.dropna()
    
    # Select relevant features (using 16g accelerometer data as recommended)
    features = ['hand_acc16_1', 'hand_acc16_2', 'hand_acc16_3',
                'hand_gyro_1', 'hand_gyro_2', 'hand_gyro_3',
                'chest_acc16_1', 'chest_acc16_2', 'chest_acc16_3',
                'chest_gyro_1', 'chest_gyro_2', 'chest_gyro_3',
                'ankle_acc16_1', 'ankle_acc16_2', 'ankle_acc16_3',
                'ankle_gyro_1', 'ankle_gyro_2', 'ankle_gyro_3',
                'heart_rate']
    
    X = data[features]
    y = data['activity_id']
    
    # Normalize features
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    
    return X_scaled, y
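
# A gentler alternative to dropping every row with a missing heart_rate
# reading (a sketch, not used above; hypothetical helper): linearly
# interpolate the sparse heart-rate channel before dropping remaining NaNs.
def interpolate_heart_rate(data):
    """Return a copy of the data with heart_rate linearly interpolated."""
    data = data.copy()
    data['heart_rate'] = data['heart_rate'].interpolate(method='linear',
                                                        limit_direction='both')
    return data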

# Load and preprocess data
print("Loading protocol data...")
protocol_data = load_all_data(PROTOCOL_PATH)
print("Loading optional data...")
optional_data = load_all_data(OPTIONAL_PATH)

# Combine protocol and optional data
all_data = pd.concat([protocol_data, optional_data], ignore_index=True)

print("Preprocessing data...")
X, y = preprocess_data(all_data)

print("Data loading and preprocessing complete.")
print(f"Features shape: {X.shape}")
print(f"Labels shape: {y.shape}")

# Reshape the data for LSTM input (samples, time steps, features)
X = X.reshape((X.shape[0], 1, X.shape[1]))

# Convert labels to categorical
y = to_categorical(y)
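# NOTE: activity IDs are sparse (they run up to 24 with gaps), so
# to_categorical produces columns for IDs that never occur. This is harmless
# for training but inflates the softmax; remapping IDs with sklearn's
# LabelEncoder first is a common alternative.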

# Split the data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
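# (stratify=np.argmax(y, axis=1) is a common addition to the split above to
# keep class proportions identical across train and test)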

# Simple LSTM model definition
def build_simple_lstm(input_shape, num_classes):
    """One LSTM layer (final hidden state only), dropout, and a softmax head."""
    model = Sequential([
        LSTM(100, input_shape=input_shape),
        Dropout(0.5),
        Dense(num_classes, activation='softmax')
    ])
    return model

# Function to train and evaluate the model
def train_and_evaluate_model(model, X_train, y_train, X_test, y_test, name="Simple LSTM"):
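    """Compile and fit the model, then report test accuracy, loss, F1,
    precision, and recall."""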
    optimizer = Adam(learning_rate=0.001)
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
    
    history = model.fit(X_train, y_train, epochs=50, batch_size=32, validation_split=0.2, verbose=1)
    
    test_loss, test_accuracy = model.evaluate(X_test, y_test, verbose=0)
    print(f'{name} - Test accuracy: {test_accuracy*100:.2f}%')
    print(f'{name} - Test loss: {test_loss:.4f}')
    
    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true = np.argmax(y_test, axis=1)
    
    f1 = f1_score(y_true, y_pred_classes, average='weighted')
    precision = precision_score(y_true, y_pred_classes, average='weighted')
    recall = recall_score(y_true, y_pred_classes, average='weighted')
    
    print(f'{name} - F1 Score: {f1:.4f}')
    print(f'{name} - Precision: {precision:.4f}')
    print(f'{name} - Recall: {recall:.4f}')
    
    return history, test_accuracy, f1, precision, recall, y_pred_classes, y_pred
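
# NOTE: training runs a fixed 50 epochs. If overfitting is a concern, an
# EarlyStopping callback is a common addition (a sketch, not wired in above):
#     from tensorflow.keras.callbacks import EarlyStopping
#     early_stop = EarlyStopping(monitor='val_loss', patience=5,
#                                restore_best_weights=True)
#     model.fit(..., callbacks=[early_stop])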

# Visualization functions
def plot_accuracy(history, name="Simple LSTM"):
    plt.figure(figsize=(12, 8))
    plt.plot(history.history['accuracy'], label='Training Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.title(f'{name} - Model Accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend()
    plt.savefig(f'accuracy_{name.lower().replace(" ", "_")}.png')
    plt.close()

def plot_loss(history, name="Simple LSTM"):
    plt.figure(figsize=(12, 8))
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title(f'{name} - Model Loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend()
    plt.savefig(f'loss_{name.lower().replace(" ", "_")}.png')
    plt.close()

def plot_confusion_matrix(y_true, y_pred, name="Simple LSTM"):
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(15, 15))
    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title(f'{name} - Confusion Matrix')
    plt.colorbar()
    
    # Use the union of activity IDs seen in either the true or predicted
    # labels, matching the axis ordering produced by confusion_matrix
    unique_activities = np.unique(np.concatenate([y_true, y_pred]))
    
    # Create labels for the confusion matrix
    labels = [ACTIVITY_LABELS.get(act, f'Unknown ({act})') for act in unique_activities]
    
    plt.xticks(np.arange(len(labels)), labels, rotation=45, ha='right')
    plt.yticks(np.arange(len(labels)), labels)
    
    # Add text annotations
    thresh = cm.max() / 2.
    for i, j in np.ndindex(cm.shape):
        plt.text(j, i, format(cm[i, j], 'd'),
                ha="center", va="center",
                color="white" if cm[i, j] > thresh else "black")
    
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()
    plt.savefig(f'confusion_matrix_{name.lower().replace(" ", "_")}.png')
    plt.close()

def plot_metrics_bar(accuracy, f1, precision, recall, name="Simple LSTM"):
    metrics = ['Accuracy', 'F1 Score', 'Precision', 'Recall']
    values = [accuracy, f1, precision, recall]
    
    plt.figure(figsize=(10, 6))
    plt.bar(metrics, values)
    plt.title(f'{name} - Performance Metrics')
    plt.ylim(0, 1)
    
    # Add value labels on top of each bar
    for i, v in enumerate(values):
        plt.text(i, v + 0.01, f'{v:.4f}', ha='center')
    
    plt.tight_layout()
    plt.savefig(f'metrics_{name.lower().replace(" ", "_")}.png')
    plt.close()

# Main execution
if __name__ == "__main__":
    # Create and compile model
    input_shape = (X_train.shape[1], X_train.shape[2])
    num_classes = y_train.shape[1]
    model = build_simple_lstm(input_shape, num_classes)
    
    # Train and evaluate model
    print("\nTraining and evaluating Simple LSTM...")
    history, accuracy, f1, precision, recall, y_pred_classes, y_pred = train_and_evaluate_model(
        model, X_train, y_train, X_test, y_test, "Simple LSTM"
    )
    
    # Generate visualizations
    plot_accuracy(history)
    plot_loss(history)
    plot_confusion_matrix(np.argmax(y_test, axis=1), y_pred_classes)
    plot_metrics_bar(accuracy, f1, precision, recall)
    
    # Save the model
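    # (HDF5 format; recent Keras versions also support the native .keras format)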
    model.save('simple_lstm_pamap2.h5')
    
    print("\nAnalysis complete. All plots have been saved.")
    print(f"Final test accuracy: {accuracy*100:.2f}%")
    print(f"F1 Score: {f1:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")