"""Train and compare several LSTM-based classifiers on the PAMAP2 dataset.

The script loads the PAMAP2 "Protocol" and "Optional" recordings, keeps a
subset of IMU and heart-rate features, trains five Keras models and shows a
series of comparison plots.

Data loading and preprocessing run at module level, so importing this module
reads the dataset from disk.
"""
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.metrics import (
    auc,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
    roc_curve,
)
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.layers import (
    LSTM,
    Attention,
    Concatenate,
    Dense,
    Dropout,
    GlobalAveragePooling1D,
    Input,
    Multiply,
    Reshape,
    TimeDistributed,
)
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical

# GPU Configuration
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Memory growth must be set before the GPUs are initialized;
        # TensorFlow raises RuntimeError otherwise.
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print("GPU is available")
    except RuntimeError as e:
        print(e)
else:
    print("GPU is not available, using CPU")

# Paths to the datasets
PROTOCOL_PATH = r"C:\Users\LENOVO LEGION\Downloads\pamap2_extracted\PAMAP2_Dataset\Protocol"
OPTIONAL_PATH = r"C:\Users\LENOVO LEGION\Downloads\pamap2_extracted\PAMAP2_Dataset\Optional"

# Column names based on the readme file
COLUMN_NAMES = [
    'timestamp', 'activity_id', 'heart_rate',
    'hand_temperature',
    'hand_acc16_1', 'hand_acc16_2', 'hand_acc16_3',
    'hand_acc6_1', 'hand_acc6_2', 'hand_acc6_3',
    'hand_gyro_1', 'hand_gyro_2', 'hand_gyro_3',
    'hand_magno_1', 'hand_magno_2', 'hand_magno_3',
    'hand_ori_1', 'hand_ori_2', 'hand_ori_3', 'hand_ori_4',
    'chest_temperature',
    'chest_acc16_1', 'chest_acc16_2', 'chest_acc16_3',
    'chest_acc6_1', 'chest_acc6_2', 'chest_acc6_3',
    'chest_gyro_1', 'chest_gyro_2', 'chest_gyro_3',
    'chest_magno_1', 'chest_magno_2', 'chest_magno_3',
    'chest_ori_1', 'chest_ori_2', 'chest_ori_3', 'chest_ori_4',
    'ankle_temperature',
    'ankle_acc16_1', 'ankle_acc16_2', 'ankle_acc16_3',
    'ankle_acc6_1', 'ankle_acc6_2', 'ankle_acc6_3',
    'ankle_gyro_1', 'ankle_gyro_2', 'ankle_gyro_3',
    'ankle_magno_1', 'ankle_magno_2', 'ankle_magno_3',
    'ankle_ori_1', 'ankle_ori_2', 'ankle_ori_3', 'ankle_ori_4',
]

# Activity labels
ACTIVITY_LABELS = {
    1: 'lying',
    2: 'sitting',
    3: 'standing',
    4: 'walking',
    5: 'running',
    6: 'cycling',
    7: 'Nordic walking',
    9: 'watching TV',
    10: 'computer work',
    11: 'car driving',
    12: 'ascending stairs',
    13: 'descending stairs',
    16: 'vacuum cleaning',
    17: 'ironing',
    18: 'folding laundry',
    19: 'house cleaning',
    20: 'playing soccer',
    24: 'rope jumping',
}


def load_data(file_path):
    """Load one space-separated recording into a DataFrame named by COLUMN_NAMES."""
    data = pd.read_csv(file_path, sep=' ', header=None, names=COLUMN_NAMES)
    return data


def load_all_data(base_path):
    """Load and concatenate every ``.dat`` file found directly in *base_path*.

    Files are read in sorted name order so the row order of the result (and
    therefore the seeded train/test split) does not depend on the directory
    listing order of the operating system.

    Raises:
        ValueError: if *base_path* contains no ``.dat`` files.
    """
    file_names = sorted(
        name for name in os.listdir(base_path) if name.endswith(".dat")
    )
    if not file_names:
        raise ValueError(f"No .dat files found in {base_path!r}")
    all_data = [load_data(os.path.join(base_path, name)) for name in file_names]
    return pd.concat(all_data, ignore_index=True)


def preprocess_data(data):
    """Filter, select and standardize the raw recordings.

    Rows with ``activity_id == 0`` and rows containing any NaN (in any column)
    are dropped, then the selected feature columns are standardized.

    Returns:
        A tuple ``(X_scaled, y)``: the standardized feature array and the
        ``activity_id`` column of the remaining rows.
    """
    # Remove rows with activity_id 0 (transient activities)
    data = data[data['activity_id'] != 0]

    # Handle missing values (NaN)
    data = data.dropna()

    # Select relevant features (using 16g accelerometer data as recommended)
    features = [
        'hand_acc16_1', 'hand_acc16_2', 'hand_acc16_3',
        'hand_gyro_1', 'hand_gyro_2', 'hand_gyro_3',
        'chest_acc16_1', 'chest_acc16_2', 'chest_acc16_3',
        'chest_gyro_1', 'chest_gyro_2', 'chest_gyro_3',
        'ankle_acc16_1', 'ankle_acc16_2', 'ankle_acc16_3',
        'ankle_gyro_1', 'ankle_gyro_2', 'ankle_gyro_3',
        'heart_rate',
    ]
    X = data[features]
    y = data['activity_id']

    # Normalize features
    # NOTE(review): the scaler is fitted on all rows before the train/test
    # split below, so test-set statistics leak into training. Left as-is to
    # keep this function's return contract unchanged.
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    return X_scaled, y


# Load and preprocess data
print("Loading protocol data...")
protocol_data = load_all_data(PROTOCOL_PATH)
print("Loading optional data...")
optional_data = load_all_data(OPTIONAL_PATH)

# Combine protocol and optional data
all_data = pd.concat([protocol_data, optional_data], ignore_index=True)

print("Preprocessing data...")
X, y = preprocess_data(all_data)

print("Data loading and preprocessing complete.")
print(f"Features shape: {X.shape}")
print(f"Labels shape: {y.shape}")

# Reshape the data for LSTM input (samples, time steps, features)
# NOTE(review): every sample is a sequence of length 1, so the recurrent and
# attention layers below never see more than one time step.
X = X.reshape((X.shape[0], 1, X.shape[1]))

# Convert labels to categorical
# NOTE(review): raw activity ids are used as class indices, so the one-hot
# width is max(id) + 1 and includes ids that never occur.
y = to_categorical(y)

# Split the data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)


# Model definitions
def build_simple_lstm(input_shape, num_classes):
    """Build a single LSTM layer followed by dropout and a softmax classifier."""
    model = Sequential([
        LSTM(100, input_shape=input_shape),
        Dropout(0.5),
        Dense(num_classes, activation='softmax'),
    ])
    return model


def build_deep_lstm(input_shape, num_classes):
    """Build two stacked LSTM layers with dropout and a softmax classifier."""
    model = Sequential([
        LSTM(100, input_shape=input_shape, return_sequences=True),
        Dropout(0.3),
        LSTM(50),
        Dropout(0.3),
        Dense(num_classes, activation='softmax'),
    ])
    return model


def build_lstm_attention(input_shape, num_classes):
    """Build an LSTM whose output sequence attends to itself before pooling."""
    inputs = Input(shape=input_shape)
    lstm = LSTM(100, return_sequences=True)(inputs)
    lstm = Dropout(0.3)(lstm)
    # Query and value are the same tensor.
    attention = Attention()([lstm, lstm])
    concat = Concatenate()([lstm, attention])
    time_distributed = TimeDistributed(Dense(num_classes))(concat)
    pooled = GlobalAveragePooling1D()(time_distributed)
    outputs = Dense(num_classes, activation='softmax')(pooled)
    model = Model(inputs=inputs, outputs=outputs)
    return model


def build_multi_head_lstm_attention(input_shape, num_classes, num_heads=3):
    """Build *num_heads* parallel LSTMs, concatenate them, attend, and classify."""
    inputs = Input(shape=input_shape)
    lstm_heads = [LSTM(50, return_sequences=True)(inputs) for _ in range(num_heads)]
    lstm_heads = [Dropout(0.3)(head) for head in lstm_heads]
    concat_lstm = Concatenate()(lstm_heads)
    attention = Attention()([concat_lstm, concat_lstm])
    concat = Concatenate()([concat_lstm, attention])
    x = LSTM(50)(concat)
    x = Dropout(0.3)(x)
    outputs = Dense(num_classes, activation='softmax')(x)
    model = Model(inputs=inputs, outputs=outputs)
    return model


def squeeze_excite_block(input_tensor, ratio=16):
    """Rescale the channels of *input_tensor* with a squeeze-and-excite gate.

    The tensor is average-pooled over its time axis, passed through a
    bottleneck of ``filters // ratio`` units (at least 1) and a sigmoid layer
    of ``filters`` units, and the result multiplies the input.
    """
    filters = input_tensor.shape[-1]
    # A bottleneck of 0 units is invalid; this happens when filters < ratio.
    bottleneck_units = max(1, filters // ratio)
    se = GlobalAveragePooling1D()(input_tensor)
    se = Reshape((1, filters))(se)
    se = Dense(bottleneck_units, activation='relu', kernel_initializer='he_normal', use_bias=False)(se)
    se = Dense(filters, activation='sigmoid', kernel_initializer='he_normal', use_bias=False)(se)
    se = Multiply()([input_tensor, se])
    return se


def build_multi_head_lstm_se(input_shape, num_classes, num_heads=3):
    """Build *num_heads* parallel LSTMs, each gated by a squeeze-excite block."""
    inputs = Input(shape=input_shape)
    lstm_heads = [LSTM(50, return_sequences=True)(inputs) for _ in range(num_heads)]
    lstm_heads = [Dropout(0.3)(head) for head in lstm_heads]
    lstm_heads = [squeeze_excite_block(head) for head in lstm_heads]
    concat_lstm = Concatenate()(lstm_heads)
    time_distributed = TimeDistributed(Dense(num_classes))(concat_lstm)
    pooled = GlobalAveragePooling1D()(time_distributed)
    outputs = Dense(num_classes, activation='softmax')(pooled)
    model = Model(inputs=inputs, outputs=outputs)
    return model


# Function to train and evaluate a model
def train_and_evaluate_model(model, X_train, y_train, X_test, y_test, name):
    """Compile, fit and evaluate *model*, printing its test metrics.

    Returns:
        A tuple ``(history, test_accuracy, f1, precision, recall,
        y_pred_classes, y_pred)`` where the three scores are weighted
        averages, ``y_pred`` is the raw ``model.predict`` output and
        ``y_pred_classes`` is its argmax along axis 1.
    """
    optimizer = Adam(learning_rate=0.001)
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

    history = model.fit(X_train, y_train, epochs=50, batch_size=32, validation_split=0.2, verbose=1)

    test_loss, test_accuracy = model.evaluate(X_test, y_test, verbose=0)
    print(f'{name} - Test accuracy: {test_accuracy*100:.2f}%')
    print(f'{name} - Test loss: {test_loss:.4f}')

    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true = np.argmax(y_test, axis=1)

    # zero_division=0 yields the same values as the default but without a
    # warning for classes that are never predicted.
    f1 = f1_score(y_true, y_pred_classes, average='weighted', zero_division=0)
    precision = precision_score(y_true, y_pred_classes, average='weighted', zero_division=0)
    recall = recall_score(y_true, y_pred_classes, average='weighted', zero_division=0)

    print(f'{name} - F1 Score: {f1:.4f}')
    print(f'{name} - Precision: {precision:.4f}')
    print(f'{name} - Recall: {recall:.4f}')

    return history, test_accuracy, f1, precision, recall, y_pred_classes, y_pred


# Visualization functions
def plot_accuracy_comparison(results):
    """Plot training and validation accuracy per epoch for every model."""
    plt.figure(figsize=(12, 8))
    for name, result in results.items():
        plt.plot(result['history'].history['accuracy'], label=f'{name} - Training')
        plt.plot(result['history'].history['val_accuracy'], label=f'{name} - Validation')
    plt.title('Model Accuracy Comparison')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend()
    plt.show()


def plot_loss_comparison(results):
    """Plot training and validation loss per epoch for every model."""
    plt.figure(figsize=(12, 8))
    for name, result in results.items():
        plt.plot(result['history'].history['loss'], label=f'{name} - Training')
        plt.plot(result['history'].history['val_loss'], label=f'{name} - Validation')
    plt.title('Model Loss Comparison')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend()
    plt.show()


def plot_correlation_matrix(X):
    """Show the correlation matrix of the columns of *X* flattened per sample."""
    corr_matrix = np.corrcoef(X.reshape(X.shape[0], -1).T)
    plt.figure(figsize=(12, 10))
    plt.imshow(corr_matrix, cmap='coolwarm', aspect='auto')
    plt.colorbar()
    plt.title('Correlation Matrix of Features')
    plt.show()


def plot_recognition_error(results):
    """Plot ``1 - accuracy`` per epoch (training and validation) for every model."""
    plt.figure(figsize=(12, 8))
    for name, result in results.items():
        plt.plot(1 - np.array(result['history'].history['accuracy']), label=f'{name} - Training Error')
        plt.plot(1 - np.array(result['history'].history['val_accuracy']), label=f'{name} - Validation Error')
    plt.title('Recognition Error over Time')
    plt.xlabel('Epoch')
    plt.ylabel('Error Rate')
    plt.legend()
    plt.show()


def plot_f1_precision_recall(results):
    """Draw a grouped bar chart of F1, precision and recall for every model."""
    metrics = ['F1 Score', 'Precision', 'Recall']
    n_models = max(len(results), 1)
    # Size the bars so one group of n_models bars spans 0.8 of a unit and
    # never runs into the next metric's group.
    bar_width = 0.8 / n_models
    positions = np.arange(len(metrics))

    plt.figure(figsize=(12, 8))
    for i, (name, result) in enumerate(results.items()):
        values = [result['f1'], result['precision'], result['recall']]
        plt.bar(positions + i * bar_width, values, width=bar_width, label=name)
    plt.title('F1 Score, Precision, and Recall')
    plt.xlabel('Metrics')
    plt.ylabel('Score')
    # Center each tick under its group of bars.
    plt.xticks(positions + bar_width * (n_models - 1) / 2, metrics)
    plt.legend()
    plt.show()


def plot_heatmap_metrics(results):
    """Show an annotated heatmap of accuracy, precision, recall and F1 per model."""
    metrics = ['Accuracy', 'Precision', 'Recall', 'F1 Score']
    model_names = list(results.keys())
    data = np.array([
        [result['accuracy'], result['precision'], result['recall'], result['f1']]
        for result in results.values()
    ])
    # Rows are metrics, columns are models.
    grid = data.T

    fig, ax = plt.subplots(figsize=(12, 8))
    im = ax.imshow(grid, cmap='RdYlBu_r')

    # Add colorbar
    ax.figure.colorbar(im, ax=ax)

    # Show all ticks and label them
    ax.set_xticks(np.arange(len(model_names)))
    ax.set_yticks(np.arange(len(metrics)))
    ax.set_xticklabels(model_names)
    ax.set_yticklabels(metrics)

    # Rotate the tick labels and set their alignment
    plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")

    # Loop over data dimensions and create text annotations
    for i in range(len(metrics)):
        for j in range(len(model_names)):
            ax.text(j, i, f"{grid[i, j]:.2f}", ha="center", va="center", color="black")

    ax.set_title('Model Performance Metrics Heatmap')
    fig.tight_layout()
    plt.show()


def plot_box_plot(results):
    """Show a box plot of the per-epoch training accuracies of every model."""
    model_names = list(results.keys())
    data = [result['history'].history['accuracy'] for result in results.values()]

    plt.figure(figsize=(12, 8))
    plt.boxplot(data)
    # Boxes are drawn at positions 1..N by default. Labelling via xticks
    # avoids the boxplot `labels` keyword, which newer Matplotlib deprecates.
    plt.xticks(range(1, len(model_names) + 1), model_names)
    plt.title('Box Plot of Training Accuracies')
    plt.ylabel('Accuracy')
    plt.show()


def _draw_confusion_matrix(fig, ax, name, y_true, y_pred_classes):
    """Draw one annotated confusion matrix for model *name* on *ax*."""
    # Use the same explicit class list for the matrix and for the tick labels
    # so their sizes always agree, even when a class is predicted that never
    # occurs in y_true.
    class_ids = np.union1d(y_true, y_pred_classes)
    cm = confusion_matrix(y_true, y_pred_classes, labels=class_ids)

    im = ax.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    fig.colorbar(im, ax=ax)

    # Create labels for the confusion matrix
    labels = [ACTIVITY_LABELS.get(act, f'Unknown ({act})') for act in class_ids]

    ax.set(xticks=np.arange(cm.shape[1]),
           yticks=np.arange(cm.shape[0]),
           xticklabels=labels, yticklabels=labels,
           title=f'{name} - Confusion Matrix',
           ylabel='True label',
           xlabel='Predicted label')

    # Rotate the tick labels and set their alignment
    plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")

    # Annotate every cell; switch text color on dark cells for legibility.
    thresh = cm.max() / 2.
    for row in range(cm.shape[0]):
        for col in range(cm.shape[1]):
            ax.text(col, row, format(cm[row, col], 'd'),
                    ha="center", va="center",
                    color="white" if cm[row, col] > thresh else "black")


def plot_confusion_matrices(results, y_true=None):
    """Show a confusion matrix for every model, two per figure.

    Models are plotted in pairs; when the number of models is odd, the last
    one gets its own figure.

    Args:
        results: mapping of model name to a dict holding ``'y_pred_classes'``.
        y_true: true class indices. Defaults to the argmax of the module-level
            ``y_test``.
    """
    if y_true is None:
        y_true = np.argmax(y_test, axis=1)

    names = list(results.keys())
    for start in range(0, len(names), 2):
        pair = names[start:start + 2]
        if len(pair) == 2:
            fig, axes = plt.subplots(1, 2, figsize=(20, 10))
        else:
            fig, single_ax = plt.subplots(figsize=(10, 10))
            axes = [single_ax]

        for ax, name in zip(axes, pair):
            _draw_confusion_matrix(fig, ax, name, y_true, results[name]['y_pred_classes'])

        plt.tight_layout()
        plt.show()


def plot_accuracy_vs_epoch(results):
    """Scatter training and validation accuracy against the 1-based epoch number."""
    plt.figure(figsize=(12, 8))
    for name, result in results.items():
        train_acc = result['history'].history['accuracy']
        val_acc = result['history'].history['val_accuracy']
        plt.scatter(range(1, len(train_acc) + 1), train_acc, label=f'{name} - Training')
        plt.scatter(range(1, len(val_acc) + 1), val_acc, label=f'{name} - Validation')
    plt.title('Accuracy vs Epoch')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()


# Main execution
if __name__ == "__main__":
    input_shape = X_train.shape[1:]
    num_classes = y_train.shape[1]

    # Create and compile models
    models = {
        'Simple LSTM': build_simple_lstm(input_shape, num_classes),
        'Deep LSTM': build_deep_lstm(input_shape, num_classes),
        'LSTM with Attention': build_lstm_attention(input_shape, num_classes),
        'Multi-head LSTM with Attention': build_multi_head_lstm_attention(input_shape, num_classes),
        'Multi-head LSTM with SE': build_multi_head_lstm_se(input_shape, num_classes),
    }

    results = {}

    # Train and evaluate models
    for name, model in models.items():
        print(f"\nTraining and evaluating {name}...")
        history, accuracy, f1, precision, recall, y_pred_classes, y_pred = train_and_evaluate_model(
            model, X_train, y_train, X_test, y_test, name
        )
        results[name] = {
            'history': history,
            'accuracy': accuracy,
            'f1': f1,
            'precision': precision,
            'recall': recall,
            'y_pred_classes': y_pred_classes,
            'y_pred': y_pred,
        }

    # Generate visualizations
    plot_accuracy_comparison(results)
    plot_loss_comparison(results)
    plot_correlation_matrix(X)
    plot_recognition_error(results)
    plot_f1_precision_recall(results)
    plot_heatmap_metrics(results)
    plot_box_plot(results)
    plot_confusion_matrices(results)
    plot_accuracy_vs_epoch(results)

    print("Analysis complete. All plots have been displayed.")