"""Train and evaluate a multi-head LSTM + attention classifier on PAMAP2.

The script loads every ``.dat`` file from the Protocol and Optional folders,
keeps a fixed subset of sensor columns, standardizes them, trains the model
and writes accuracy / loss / confusion-matrix / metric / attention / ROC plots
as PNG files into the current working directory.
"""
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.metrics import (
    auc,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
    roc_curve,
)
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.layers import (
    LSTM,
    Attention,
    Concatenate,
    Dense,
    Dropout,
    Input,
)
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical

# GPU Configuration
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Memory growth must be configured before the GPUs are initialized;
        # TensorFlow raises RuntimeError otherwise, which is only reported.
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print("GPU is available")
    except RuntimeError as e:
        print(e)
else:
    print("GPU is not available, using CPU")

# Paths to the datasets. The defaults are the original hard-coded locations;
# the environment variables allow running on another machine without editing
# the file.
PROTOCOL_PATH = os.environ.get(
    "PAMAP2_PROTOCOL_PATH",
    r"C:\Users\LENOVO LEGION\Downloads\pamap2_extracted\PAMAP2_Dataset\Protocol",
)
OPTIONAL_PATH = os.environ.get(
    "PAMAP2_OPTIONAL_PATH",
    r"C:\Users\LENOVO LEGION\Downloads\pamap2_extracted\PAMAP2_Dataset\Optional",
)

# Column names based on the readme file
COLUMN_NAMES = [
    'timestamp', 'activity_id', 'heart_rate',
    'hand_temperature',
    'hand_acc16_1', 'hand_acc16_2', 'hand_acc16_3',
    'hand_acc6_1', 'hand_acc6_2', 'hand_acc6_3',
    'hand_gyro_1', 'hand_gyro_2', 'hand_gyro_3',
    'hand_magno_1', 'hand_magno_2', 'hand_magno_3',
    'hand_ori_1', 'hand_ori_2', 'hand_ori_3', 'hand_ori_4',
    'chest_temperature',
    'chest_acc16_1', 'chest_acc16_2', 'chest_acc16_3',
    'chest_acc6_1', 'chest_acc6_2', 'chest_acc6_3',
    'chest_gyro_1', 'chest_gyro_2', 'chest_gyro_3',
    'chest_magno_1', 'chest_magno_2', 'chest_magno_3',
    'chest_ori_1', 'chest_ori_2', 'chest_ori_3', 'chest_ori_4',
    'ankle_temperature',
    'ankle_acc16_1', 'ankle_acc16_2', 'ankle_acc16_3',
    'ankle_acc6_1', 'ankle_acc6_2', 'ankle_acc6_3',
    'ankle_gyro_1', 'ankle_gyro_2', 'ankle_gyro_3',
    'ankle_magno_1', 'ankle_magno_2', 'ankle_magno_3',
    'ankle_ori_1', 'ankle_ori_2', 'ankle_ori_3', 'ankle_ori_4'
]

# Activity labels
ACTIVITY_LABELS = {
    1: 'lying',
    2: 'sitting',
    3: 'standing',
    4: 'walking',
    5: 'running',
    6: 'cycling',
    7: 'Nordic walking',
    9: 'watching TV',
    10: 'computer work',
    11: 'car driving',
    12: 'ascending stairs',
    13: 'descending stairs',
    16: 'vacuum cleaning',
    17: 'ironing',
    18: 'folding laundry',
    19: 'house cleaning',
    20: 'playing soccer',
    24: 'rope jumping'
}

# Columns fed to the model (using 16g accelerometer data as recommended)
FEATURE_COLUMNS = [
    'hand_acc16_1', 'hand_acc16_2', 'hand_acc16_3',
    'hand_gyro_1', 'hand_gyro_2', 'hand_gyro_3',
    'chest_acc16_1', 'chest_acc16_2', 'chest_acc16_3',
    'chest_gyro_1', 'chest_gyro_2', 'chest_gyro_3',
    'ankle_acc16_1', 'ankle_acc16_2', 'ankle_acc16_3',
    'ankle_gyro_1', 'ankle_gyro_2', 'ankle_gyro_3',
    'heart_rate'
]


def _file_slug(name):
    """Return *name* lower-cased with spaces replaced by underscores."""
    return name.lower().replace(" ", "_")


def load_data(file_path):
    """Load data from a single space-separated file.

    Args:
        file_path: Path of the file to read.

    Returns:
        A DataFrame whose columns are named after ``COLUMN_NAMES``.
    """
    data = pd.read_csv(file_path, sep=' ', header=None, names=COLUMN_NAMES)
    return data


def load_all_data(base_path):
    """Load and concatenate all ``.dat`` files found directly in *base_path*.

    Files are read in sorted name order so the row order of the result (and
    therefore the seeded train/test split downstream) is reproducible;
    ``os.listdir`` alone returns entries in arbitrary order.

    Args:
        base_path: Directory containing the ``.dat`` files.

    Returns:
        A single DataFrame with a fresh integer index.

    Raises:
        ValueError: If the directory contains no ``.dat`` file.
    """
    frames = [
        load_data(os.path.join(base_path, file_name))
        for file_name in sorted(os.listdir(base_path))
        if file_name.endswith(".dat")
    ]
    if not frames:
        # pd.concat would raise a ValueError as well, but without the path.
        raise ValueError(f"No .dat files found in {base_path!r}")
    return pd.concat(frames, ignore_index=True)


def preprocess_data(data):
    """Filter, select and standardize the loaded data.

    Args:
        data: DataFrame with the columns listed in ``COLUMN_NAMES``.

    Returns:
        A tuple ``(X_scaled, y)``: the standardized ``FEATURE_COLUMNS`` values
        as a NumPy array and the matching ``activity_id`` Series.
    """
    # Remove rows with activity_id 0 (transient activities)
    data = data[data['activity_id'] != 0]

    # Handle missing values (NaN): a row is dropped when ANY column is NaN,
    # not only the selected features.
    # NOTE(review): if heart_rate is recorded less often than the IMU
    # channels this discards most rows -- confirm that is intended.
    data = data.dropna()

    X = data[FEATURE_COLUMNS]
    y = data['activity_id']

    # Normalize features
    # NOTE(review): the scaler is fitted on all rows, before the train/test
    # split performed by the caller.
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    return X_scaled, y


# Load and preprocess data
# NOTE: this runs at import time; the module-level names below (X, y,
# X_train, ...) are kept for backward compatibility.
print("Loading protocol data...")
protocol_data = load_all_data(PROTOCOL_PATH)
print("Loading optional data...")
optional_data = load_all_data(OPTIONAL_PATH)

# Combine protocol and optional data
all_data = pd.concat([protocol_data, optional_data], ignore_index=True)

print("Preprocessing data...")
X, y = preprocess_data(all_data)

print("Data loading and preprocessing complete.")
print(f"Features shape: {X.shape}")
print(f"Labels shape: {y.shape}")

# Reshape the data for LSTM input (samples, time steps, features);
# every sample is a single time step.
X = X.reshape((X.shape[0], 1, X.shape[1]))

# Convert labels to categorical. The raw activity IDs are used as class
# indices, so column i of the one-hot matrix corresponds to activity ID i.
y = to_categorical(y)

# Split the data
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)


# Multi-head LSTM with Attention Model definition
def build_multi_head_lstm_attention(input_shape, num_classes, num_heads=3,
                                    lstm_units=50, dropout_rate=0.3):
    """Build the (uncompiled) multi-head LSTM + attention classifier.

    Args:
        input_shape: Shape of one sample, ``(time_steps, features)``.
        num_classes: Size of the softmax output layer.
        num_heads: Number of parallel LSTM branches applied to the input.
        lstm_units: Units of every LSTM layer (branches and final layer).
        dropout_rate: Rate of every Dropout layer.

    Returns:
        A Keras ``Model`` mapping the input to class probabilities.
    """
    # Input layer
    inputs = Input(shape=input_shape)

    # Create multiple LSTM heads, each followed by dropout
    lstm_heads = [
        LSTM(lstm_units, return_sequences=True)(inputs)
        for _ in range(num_heads)
    ]
    lstm_heads = [Dropout(dropout_rate)(head) for head in lstm_heads]

    # Concatenate LSTM outputs along the feature axis
    concat_lstm = Concatenate()(lstm_heads)

    # Apply attention mechanism (query and value are the same tensor)
    attention = Attention()([concat_lstm, concat_lstm])

    # Combine LSTM outputs with attention context
    concat = Concatenate()([concat_lstm, attention])

    # Final LSTM layer to process combined features
    x = LSTM(lstm_units)(concat)
    x = Dropout(dropout_rate)(x)

    # Output layer
    outputs = Dense(num_classes, activation='softmax')(x)

    # Create model
    model = Model(inputs=inputs, outputs=outputs)
    return model


# Function to train and evaluate the model
def train_and_evaluate_model(model, X_train, y_train, X_test, y_test,
                             name="Multi-head LSTM with Attention",
                             epochs=50, batch_size=32, learning_rate=0.001):
    """Compile, fit and evaluate *model*, printing the test metrics.

    Args:
        model: Keras model to train; it is compiled here.
        X_train: Training inputs.
        y_train: One-hot training labels.
        X_test: Test inputs.
        y_test: One-hot test labels.
        name: Prefix used in the printed report.
        epochs: Number of training epochs.
        batch_size: Mini-batch size used by ``fit``.
        learning_rate: Learning rate of the Adam optimizer.

    Returns:
        ``(history, test_accuracy, f1, precision, recall, y_pred_classes,
        y_pred)`` where the three scores are weighted averages,
        ``y_pred_classes`` is the arg-max of the predictions and ``y_pred``
        the raw predicted probabilities.
    """
    optimizer = Adam(learning_rate=learning_rate)
    model.compile(
        optimizer=optimizer,
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )

    # 20% of the training data is held out by Keras for validation.
    history = model.fit(
        X_train, y_train,
        epochs=epochs,
        batch_size=batch_size,
        validation_split=0.2,
        verbose=1,
    )

    test_loss, test_accuracy = model.evaluate(X_test, y_test, verbose=0)
    print(f'{name} - Test accuracy: {test_accuracy*100:.2f}%')
    print(f'{name} - Test loss: {test_loss:.4f}')

    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true = np.argmax(y_test, axis=1)

    # zero_division=0 yields the same values as the default but without a
    # warning for classes that are never predicted / never present.
    f1 = f1_score(y_true, y_pred_classes, average='weighted',
                  zero_division=0)
    precision = precision_score(y_true, y_pred_classes, average='weighted',
                                zero_division=0)
    recall = recall_score(y_true, y_pred_classes, average='weighted',
                          zero_division=0)

    print(f'{name} - F1 Score: {f1:.4f}')
    print(f'{name} - Precision: {precision:.4f}')
    print(f'{name} - Recall: {recall:.4f}')

    return (history, test_accuracy, f1, precision, recall,
            y_pred_classes, y_pred)


def plot_accuracy(history, name="Multi-head LSTM with Attention"):
    """Save training vs. validation accuracy per epoch as a PNG.

    Args:
        history: Keras ``History`` holding 'accuracy' and 'val_accuracy'.
        name: Plot title prefix; also used to build the file name.
    """
    plt.figure(figsize=(12, 8))
    plt.plot(history.history['accuracy'], label='Training Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.title(f'{name} - Model Accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend()
    plt.savefig(f'accuracy_{_file_slug(name)}.png')
    plt.close()


def plot_loss(history, name="Multi-head LSTM with Attention"):
    """Save training vs. validation loss per epoch as a PNG.

    Args:
        history: Keras ``History`` holding 'loss' and 'val_loss'.
        name: Plot title prefix; also used to build the file name.
    """
    plt.figure(figsize=(12, 8))
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title(f'{name} - Model Loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend()
    plt.savefig(f'loss_{_file_slug(name)}.png')
    plt.close()


def plot_confusion_matrix(y_true, y_pred,
                          name="Multi-head LSTM with Attention"):
    """Save an annotated confusion matrix as a PNG.

    Args:
        y_true: True class indices (activity IDs).
        y_pred: Predicted class indices (activity IDs).
        name: Plot title prefix; also used to build the file name.
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    # The matrix axes cover every class seen in either vector. Deriving the
    # tick labels from y_true alone misaligns them whenever the model
    # predicts a class that does not occur in the ground truth.
    class_ids = np.unique(np.concatenate([y_true, y_pred]))
    cm = confusion_matrix(y_true, y_pred, labels=class_ids)

    plt.figure(figsize=(15, 15))
    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title(f'{name} - Confusion Matrix')
    plt.colorbar()

    # Create labels for the confusion matrix
    labels = [
        ACTIVITY_LABELS.get(int(act), f'Unknown ({act})')
        for act in class_ids
    ]
    tick_positions = np.arange(len(labels))
    plt.xticks(tick_positions, labels, rotation=45, ha='right')
    plt.yticks(tick_positions, labels)

    # Add text annotations; white text on dark cells, black on light ones
    thresh = cm.max() / 2.
    for i, j in np.ndindex(cm.shape):
        plt.text(j, i, format(cm[i, j], 'd'),
                 ha="center", va="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()
    plt.savefig(f'confusion_matrix_{_file_slug(name)}.png')
    plt.close()


def plot_metrics_bar(accuracy, f1, precision, recall,
                     name="Multi-head LSTM with Attention"):
    """Save a bar chart of the four summary metrics as a PNG.

    Args:
        accuracy: Accuracy value to plot.
        f1: F1 score to plot.
        precision: Precision value to plot.
        recall: Recall value to plot.
        name: Plot title prefix; also used to build the file name.
    """
    metrics = ['Accuracy', 'F1 Score', 'Precision', 'Recall']
    values = [accuracy, f1, precision, recall]

    plt.figure(figsize=(10, 6))
    plt.bar(metrics, values)
    plt.title(f'{name} - Performance Metrics')
    plt.ylim(0, 1)

    # Add value labels on top of each bar
    for i, v in enumerate(values):
        plt.text(i, v + 0.01, f'{v:.4f}', ha='center')

    plt.tight_layout()
    plt.savefig(f'metrics_{_file_slug(name)}.png')
    plt.close()


def plot_multi_head_attention(model, X_test,
                              name="Multi-head LSTM with Attention",
                              num_heads=3):
    """Save per-head heat maps of the attention layer output as a PNG.

    The output of the first ``Attention`` layer is computed for up to five
    test samples, split into *num_heads* equal slices along the last axis and
    each slice is averaged over its features.

    NOTE(review): what is plotted is the layer's output, not its internal
    score matrix, although the figure title says "Weights".

    Args:
        model: Model containing at least one ``Attention`` layer.
        X_test: Inputs; only the first ``min(5, len(X_test))`` are used.
        name: Plot title prefix; also used to build the file name.
        num_heads: Number of slices to split the layer output into; should
            match the number of LSTM heads the model was built with.

    Raises:
        IndexError: If the model has no ``Attention`` layer.
        ValueError: If *num_heads* is not between 1 and the output width.
    """
    # Get attention layer
    attention_layer = [
        layer for layer in model.layers if isinstance(layer, Attention)
    ][0]

    # A sub-model exposing the intermediate output; this replaces
    # tf.keras.backend.function, which is not available in Keras 3.
    attention_probe = Model(inputs=model.input,
                            outputs=attention_layer.output)

    # Get the attention output for a few test samples
    sample_size = min(5, len(X_test))
    attention_weights = attention_probe.predict(X_test[:sample_size],
                                                verbose=0)

    total_width = attention_weights.shape[-1]
    if not 1 <= num_heads <= total_width:
        raise ValueError(
            f"num_heads must be between 1 and {total_width}, got {num_heads}"
        )
    # Width of one head's slice (50 for the default 3 x 50-unit model)
    units_per_head = total_width // num_heads

    # squeeze=False keeps `axes` two-dimensional even for a single head
    fig, axes = plt.subplots(1, num_heads, figsize=(20, 6), squeeze=False)
    fig.suptitle(f'{name} - Multi-head Attention Weights')

    for i in range(num_heads):
        ax = axes[0, i]
        head_slice = attention_weights[
            :, :, i * units_per_head:(i + 1) * units_per_head
        ]
        head_weights = head_slice.mean(axis=2)
        # Transposed so samples run along x and time steps along y
        im = ax.imshow(head_weights.T, aspect='auto', cmap='viridis')
        ax.set_title(f'Head {i+1}')
        ax.set_xlabel('Sample')
        ax.set_ylabel('Time Step')
        fig.colorbar(im, ax=ax)

    plt.tight_layout()
    plt.savefig(f'attention_weights_{_file_slug(name)}.png')
    plt.close()


def plot_roc_curves(y_test, y_pred, name="Multi-head LSTM with Attention"):
    """Save one-vs-rest ROC curves per recognized activity as a PNG.

    Column ``i`` of both arrays is treated as activity ID ``i``. Columns
    whose ID is not in ``ACTIVITY_LABELS`` are not plotted, and columns that
    contain only one class in *y_test* are skipped because their ROC curve
    is undefined.

    Args:
        y_test: One-hot true labels, shape ``(samples, classes)``.
        y_pred: Predicted scores with the same shape as *y_test*.
        name: Plot title prefix; also used to build the file name.
    """
    n_classes = y_test.shape[1]
    n_samples = y_test.shape[0]

    plt.figure(figsize=(15, 10))
    for i in range(n_classes):
        if i not in ACTIVITY_LABELS:
            # Only plot for recognized activities
            continue

        truth = y_test[:, i]
        positives = int(np.count_nonzero(truth))
        if positives == 0 or positives == n_samples:
            # No positive (or no negative) samples: ROC is undefined
            continue

        # Compute ROC curve and ROC area for this class
        fpr, tpr, _ = roc_curve(truth, y_pred[:, i])
        roc_auc = auc(fpr, tpr)
        plt.plot(fpr, tpr,
                 label=f'{ACTIVITY_LABELS[i]} (AUC = {roc_auc:.2f})')

    # Chance-level diagonal
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(f'{name} - ROC Curves per Activity')
    plt.legend(loc="center left", bbox_to_anchor=(1, 0.5))
    plt.tight_layout()
    plt.savefig(f'roc_curves_{_file_slug(name)}.png')
    plt.close()


# Main execution
if __name__ == "__main__":
    # Create and compile model
    input_shape = (X_train.shape[1], X_train.shape[2])
    num_classes = y_train.shape[1]

    model = build_multi_head_lstm_attention(input_shape, num_classes)

    # Train and evaluate model
    print("\nTraining and evaluating Multi-head LSTM with Attention...")
    (history, accuracy, f1, precision, recall,
     y_pred_classes, y_pred) = train_and_evaluate_model(
        model, X_train, y_train, X_test, y_test,
        "Multi-head LSTM with Attention"
    )

    # Generate visualizations
    plot_accuracy(history)
    plot_loss(history)
    plot_confusion_matrix(np.argmax(y_test, axis=1), y_pred_classes)
    plot_metrics_bar(accuracy, f1, precision, recall)
    plot_multi_head_attention(model, X_test)
    plot_roc_curves(y_test, y_pred)

    # Save the model
    model.save('multihead_lstm_attention_pamap2.h5')

    print("\nAnalysis complete. All plots have been saved.")
    print(f"Final test accuracy: {accuracy*100:.2f}%")
    print(f"F1 Score: {f1:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")