import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score, roc_curve, auc
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Input, Attention, Concatenate
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
import tensorflow as tf
# GPU configuration: ask TensorFlow to grow GPU memory on demand for every
# visible GPU, and report which device type will be used.
gpus = tf.config.experimental.list_physical_devices('GPU')
if not gpus:
    print("GPU is not available, using CPU")
else:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print("GPU is available")
    except RuntimeError as e:
        # Report the problem and continue instead of aborting the script.
        print(e)
# Paths to the datasets.
# The defaults are the locations used on the original development machine.
# Set the PAMAP2_PROTOCOL_PATH / PAMAP2_OPTIONAL_PATH environment variables to
# point the script at a dataset stored elsewhere without editing this file.
PROTOCOL_PATH = os.environ.get(
    "PAMAP2_PROTOCOL_PATH",
    r"C:\Users\LENOVO LEGION\Downloads\pamap2_extracted\PAMAP2_Dataset\Protocol",
)
OPTIONAL_PATH = os.environ.get(
    "PAMAP2_OPTIONAL_PATH",
    r"C:\Users\LENOVO LEGION\Downloads\pamap2_extracted\PAMAP2_Dataset\Optional",
)
# Column names based on the readme file: three leading columns followed by the
# same 17 channels repeated for each sensor location (hand, chest, ankle).
_SENSOR_CHANNELS = (
    'temperature',
    'acc16_1', 'acc16_2', 'acc16_3',
    'acc6_1', 'acc6_2', 'acc6_3',
    'gyro_1', 'gyro_2', 'gyro_3',
    'magno_1', 'magno_2', 'magno_3',
    'ori_1', 'ori_2', 'ori_3', 'ori_4',
)
COLUMN_NAMES = ['timestamp', 'activity_id', 'heart_rate'] + [
    f'{location}_{channel}'
    for location in ('hand', 'chest', 'ankle')
    for channel in _SENSOR_CHANNELS
]
# Activity labels: numeric activity_id -> human-readable activity name.
# The IDs are not contiguous; IDs absent from this table have no label here.
ACTIVITY_LABELS = dict((
    (1, 'lying'),
    (2, 'sitting'),
    (3, 'standing'),
    (4, 'walking'),
    (5, 'running'),
    (6, 'cycling'),
    (7, 'Nordic walking'),
    (9, 'watching TV'),
    (10, 'computer work'),
    (11, 'car driving'),
    (12, 'ascending stairs'),
    (13, 'descending stairs'),
    (16, 'vacuum cleaning'),
    (17, 'ironing'),
    (18, 'folding laundry'),
    (19, 'house cleaning'),
    (20, 'playing soccer'),
    (24, 'rope jumping'),
))
def load_data(file_path):
    """Read one data file into a DataFrame.

    The file is parsed as single-space-separated text without a header row,
    and the columns are named after COLUMN_NAMES.
    """
    return pd.read_csv(file_path, sep=' ', header=None, names=COLUMN_NAMES)
def load_all_data(base_path):
    """Load every ``.dat`` file in ``base_path`` into one DataFrame.

    Args:
        base_path: Directory that directly contains the ``.dat`` files.

    Returns:
        A single DataFrame with the rows of all files, re-indexed from 0.

    Raises:
        ValueError: If ``base_path`` contains no ``.dat`` files.
    """
    # os.listdir() returns entries in arbitrary order; sorting keeps the row
    # order of the combined frame identical from run to run and across
    # platforms.
    dat_files = sorted(
        entry for entry in os.listdir(base_path) if entry.endswith(".dat")
    )
    if not dat_files:
        # pd.concat([]) would also raise ValueError, but without saying why.
        raise ValueError(f"No .dat files found in {base_path!r}")

    frames = [load_data(os.path.join(base_path, entry)) for entry in dat_files]
    return pd.concat(frames, ignore_index=True)
def preprocess_data(data):
    """Filter, select and standardise the raw sensor table.

    Args:
        data: DataFrame containing at least ``activity_id`` and the feature
            columns listed below.

    Returns:
        A tuple ``(X_scaled, y)``: ``X_scaled`` is the array returned by
        ``StandardScaler.fit_transform`` on the selected feature columns, and
        ``y`` is the matching ``activity_id`` Series.
    """
    # Select relevant features (using 16g accelerometer data as recommended)
    features = ['hand_acc16_1', 'hand_acc16_2', 'hand_acc16_3',
                'hand_gyro_1', 'hand_gyro_2', 'hand_gyro_3',
                'chest_acc16_1', 'chest_acc16_2', 'chest_acc16_3',
                'chest_gyro_1', 'chest_gyro_2', 'chest_gyro_3',
                'ankle_acc16_1', 'ankle_acc16_2', 'ankle_acc16_3',
                'ankle_gyro_1', 'ankle_gyro_2', 'ankle_gyro_3',
                'heart_rate']

    # Remove rows with activity_id 0 (transient activities)
    data = data[data['activity_id'] != 0]

    # Drop a row only when a value that is actually used (a selected feature
    # or the label) is missing. A NaN in an unused column must not cost a
    # training sample.
    data = data.dropna(subset=features + ['activity_id'])

    X = data[features]
    y = data['activity_id']

    # Normalize features.
    # NOTE(review): the scaler is fitted on every row passed in; if the caller
    # splits into train/test afterwards, test statistics influence the scaling.
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    return X_scaled, y
# Load both parts of the dataset
print("Loading protocol data...")
protocol_data = load_all_data(PROTOCOL_PATH)
print("Loading optional data...")
optional_data = load_all_data(OPTIONAL_PATH)

# Stack protocol and optional recordings into one table
all_data = pd.concat((protocol_data, optional_data), ignore_index=True)

print("Preprocessing data...")
X, y = preprocess_data(all_data)
print("Data loading and preprocessing complete.")
print(f"Features shape: {X.shape}")
print(f"Labels shape: {y.shape}")

# Add a length-1 time axis for the LSTM: (samples, features) ->
# (samples, 1, features)
X = np.expand_dims(X, axis=1)

# One-hot encode the labels (the raw activity_id values are used as indices)
y = to_categorical(y)

# Hold out 20% of the samples for testing, with a fixed seed
X_train, X_test, y_train, y_test = train_test_split(
    X, y, random_state=42, test_size=0.2
)
# Multi-head LSTM with Attention Model definition
def build_multi_head_lstm_attention(input_shape, num_classes, num_heads=3):
    """Build (but do not compile) the multi-head LSTM + attention classifier.

    Args:
        input_shape: Shape of one input sample, passed to ``Input(shape=...)``.
        num_classes: Number of units in the softmax output layer.
        num_heads: Number of parallel LSTM branches fed by the same input.

    Returns:
        A Keras ``Model`` mapping the input to class probabilities.
    """
    model_input = Input(shape=input_shape)

    # Parallel LSTM branches over the same input, each followed by dropout.
    # All LSTM layers are created before any Dropout layer.
    branches = []
    for _ in range(num_heads):
        branches.append(LSTM(50, return_sequences=True)(model_input))
    branches = [Dropout(0.3)(branch) for branch in branches]

    # Merge the branches along the feature axis
    merged = Concatenate()(branches)

    # Self-attention: the merged sequence is both query and value
    context = Attention()([merged, merged])

    # Feed the merged features together with their attention context into a
    # final LSTM that returns only its last output
    combined = Concatenate()([merged, context])
    hidden = Dropout(0.3)(LSTM(50)(combined))

    # Class probabilities
    probabilities = Dense(num_classes, activation='softmax')(hidden)

    return Model(inputs=model_input, outputs=probabilities)
# Function to train and evaluate the model
def train_and_evaluate_model(model, X_train, y_train, X_test, y_test,
                             name="Multi-head LSTM with Attention",
                             epochs=50, batch_size=32, learning_rate=0.001,
                             validation_split=0.2):
    """Compile, train and evaluate ``model``, printing the test metrics.

    Args:
        model: Uncompiled Keras model with a softmax output.
        X_train, y_train: Training inputs and one-hot training labels.
        X_test, y_test: Test inputs and one-hot test labels.
        name: Label used as a prefix in the printed metric lines.
        epochs: Number of training epochs.
        batch_size: Mini-batch size used by ``model.fit``.
        learning_rate: Learning rate of the Adam optimizer.
        validation_split: Fraction of the training data that ``model.fit``
            holds out for validation.

    Returns:
        Tuple ``(history, test_accuracy, f1, precision, recall,
        y_pred_classes, y_pred)`` where ``y_pred`` holds the predicted class
        probabilities and ``y_pred_classes`` their argmax per sample.
    """
    optimizer = Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
    history = model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size,
                        validation_split=validation_split, verbose=1)

    test_loss, test_accuracy = model.evaluate(X_test, y_test, verbose=0)
    print(f'{name} - Test accuracy: {test_accuracy*100:.2f}%')
    print(f'{name} - Test loss: {test_loss:.4f}')

    # Turn probabilities and one-hot labels back into class indices
    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true = np.argmax(y_test, axis=1)

    # zero_division=0 yields the same scores as the default but without an
    # undefined-metric warning for classes that are never predicted/present.
    f1 = f1_score(y_true, y_pred_classes, average='weighted', zero_division=0)
    precision = precision_score(y_true, y_pred_classes, average='weighted', zero_division=0)
    recall = recall_score(y_true, y_pred_classes, average='weighted', zero_division=0)
    print(f'{name} - F1 Score: {f1:.4f}')
    print(f'{name} - Precision: {precision:.4f}')
    print(f'{name} - Recall: {recall:.4f}')

    return history, test_accuracy, f1, precision, recall, y_pred_classes, y_pred
def _plot_training_curve(history, metric, label, name):
    """Plot the training and validation curve of one metric and save it as PNG.

    ``metric`` is the key in ``history.history`` (its validation counterpart is
    ``val_<metric>``) and also the file-name prefix; ``label`` is the
    capitalised word used in the legend, title and y-axis.
    """
    plt.figure(figsize=(12, 8))
    plt.plot(history.history[metric], label=f'Training {label}')
    plt.plot(history.history[f'val_{metric}'], label=f'Validation {label}')
    plt.title(f'{name} - Model {label}')
    plt.ylabel(label)
    plt.xlabel('Epoch')
    plt.legend()
    # File name: metric prefix + lower-cased model name with spaces as "_"
    plt.savefig(f'{metric}_{name.lower().replace(" ", "_")}.png')
    plt.close()


def plot_accuracy(history, name="Multi-head LSTM with Attention"):
    """Save the per-epoch training/validation accuracy plot.

    Args:
        history: Object whose ``history`` dict has ``accuracy`` and
            ``val_accuracy`` entries (as returned by ``model.fit``).
        name: Model name used in the title and the output file name.
    """
    _plot_training_curve(history, 'accuracy', 'Accuracy', name)


def plot_loss(history, name="Multi-head LSTM with Attention"):
    """Save the per-epoch training/validation loss plot.

    Args:
        history: Object whose ``history`` dict has ``loss`` and ``val_loss``
            entries (as returned by ``model.fit``).
        name: Model name used in the title and the output file name.
    """
    _plot_training_curve(history, 'loss', 'Loss', name)
def plot_confusion_matrix(y_true, y_pred, name="Multi-head LSTM with Attention"):
    """Save an annotated confusion-matrix heat map as PNG.

    Args:
        y_true: 1-D array of true class indices (activity IDs).
        y_pred: 1-D array of predicted class indices.
        name: Model name used in the title and the output file name.
    """
    # Use one explicit class list for both the matrix and the tick labels.
    # Deriving the ticks from y_true alone misaligns them whenever the model
    # predicts a class that never occurs in y_true, because the matrix then
    # has more rows/columns than there are labels.
    classes = np.unique(np.concatenate([np.ravel(y_true), np.ravel(y_pred)]))
    cm = confusion_matrix(y_true, y_pred, labels=classes)

    plt.figure(figsize=(15, 15))
    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title(f'{name} - Confusion Matrix')
    plt.colorbar()

    # Human-readable tick labels; IDs without a known name are marked as such
    labels = [ACTIVITY_LABELS.get(act, f'Unknown ({act})') for act in classes]
    plt.xticks(np.arange(len(labels)), labels, rotation=45, ha='right')
    plt.yticks(np.arange(len(labels)), labels)

    # Add text annotations; switch to white text on dark (high-count) cells
    thresh = cm.max() / 2.
    for i, j in np.ndindex(cm.shape):
        plt.text(j, i, format(cm[i, j], 'd'),
                 ha="center", va="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()
    plt.savefig(f'confusion_matrix_{name.lower().replace(" ", "_")}.png')
    plt.close()
def plot_metrics_bar(accuracy, f1, precision, recall, name="Multi-head LSTM with Attention"):
    """Save a bar chart of the four summary metrics as PNG.

    Args:
        accuracy, f1, precision, recall: Metric values to plot; the y-axis is
            fixed to the range 0..1.
        name: Model name used in the title and the output file name.
    """
    scores = {
        'Accuracy': accuracy,
        'F1 Score': f1,
        'Precision': precision,
        'Recall': recall,
    }

    plt.figure(figsize=(10, 6))
    plt.bar(list(scores.keys()), list(scores.values()))
    plt.title(f'{name} - Performance Metrics')
    plt.ylim(0, 1)

    # Write each value just above its bar
    for position, score in enumerate(scores.values()):
        plt.text(position, score + 0.01, f'{score:.4f}', ha='center')

    plt.tight_layout()
    file_stem = name.lower().replace(" ", "_")
    plt.savefig(f'metrics_{file_stem}.png')
    plt.close()
def plot_multi_head_attention(model, X_test, name="Multi-head LSTM with Attention",
                              num_heads=3):
    """Save a per-head heat map of the attention layer's output as PNG.

    The output of the model's first ``Attention`` layer is computed for up to
    five test samples, split into ``num_heads`` equal slices along the last
    axis, and each slice is averaged over that axis before plotting.

    NOTE(review): the plotted values are the Attention layer's *output*
    tensor, not a score matrix, although the figure title says "Weights".

    Args:
        model: Functional Keras model containing at least one ``Attention``
            layer.
        X_test: Test inputs; only the first (at most) five samples are used.
        name: Model name used in the title and the output file name.
        num_heads: Number of equal-width slices to split the last axis into;
            should match the number of LSTM heads the model was built with.

    Raises:
        IndexError: If ``model`` has no ``Attention`` layer.
    """
    # Get attention layer
    attention_layer = [layer for layer in model.layers if isinstance(layer, Attention)][0]

    # A sub-model ending at the attention layer exposes its output. This
    # replaces tf.keras.backend.function, which is not available in Keras 3.
    attention_model = Model(inputs=model.input, outputs=attention_layer.output)

    # Get attention outputs for a few test samples
    sample_size = min(5, len(X_test))
    attention_weights = attention_model.predict(X_test[:sample_size], verbose=0)

    # Derive the slice width from the tensor instead of hard-coding it
    head_width = attention_weights.shape[-1] // num_heads

    # squeeze=False keeps `axes` two-dimensional even when num_heads == 1
    fig, axes = plt.subplots(1, num_heads, figsize=(20, 6), squeeze=False)
    axes = axes[0]
    fig.suptitle(f'{name} - Multi-head Attention Weights')
    for i in range(num_heads):
        head_slice = attention_weights[:, :, i * head_width:(i + 1) * head_width]
        head_weights = head_slice.mean(axis=2)
        im = axes[i].imshow(head_weights.T, aspect='auto', cmap='viridis')
        axes[i].set_title(f'Head {i+1}')
        axes[i].set_xlabel('Sample')
        axes[i].set_ylabel('Time Step')
        fig.colorbar(im, ax=axes[i])

    plt.tight_layout()
    plt.savefig(f'attention_weights_{name.lower().replace(" ", "_")}.png')
    plt.close()
def plot_roc_curves(y_test, y_pred, name="Multi-head LSTM with Attention"):
    """Save one-vs-rest ROC curves for the recognised activities as PNG.

    Args:
        y_test: One-hot true labels, shape ``(samples, classes)``.
        y_pred: Predicted class probabilities, same shape as ``y_test``.
        name: Model name used in the title and the output file name.
    """
    n_classes = y_test.shape[1]
    n_samples = y_test.shape[0]

    plt.figure(figsize=(15, 10))
    for i in range(n_classes):
        # Only plot for recognized activities
        if i not in ACTIVITY_LABELS:
            continue

        # A ROC curve needs both positive and negative samples. Without them
        # roc_curve emits a warning and the curve/AUC are NaN, so skip the
        # class instead of adding an "AUC = nan" legend entry.
        positives = np.count_nonzero(y_test[:, i])
        if positives == 0 or positives == n_samples:
            continue

        fpr, tpr, _ = roc_curve(y_test[:, i], y_pred[:, i])
        roc_auc = auc(fpr, tpr)
        plt.plot(fpr, tpr, label=f'{ACTIVITY_LABELS[i]} (AUC = {roc_auc:.2f})')

    # Diagonal reference line (a classifier no better than chance)
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(f'{name} - ROC Curves per Activity')
    # Legend sits outside the axes on the right
    plt.legend(loc="center left", bbox_to_anchor=(1, 0.5))
    plt.tight_layout()
    plt.savefig(f'roc_curves_{name.lower().replace(" ", "_")}.png')
    plt.close()
# Main execution
if __name__ == "__main__":
    # Derive the model dimensions from the prepared training arrays:
    # (time steps, features) per sample and the one-hot label width
    input_shape = tuple(X_train.shape[1:3])
    num_classes = y_train.shape[1]
    model = build_multi_head_lstm_attention(input_shape, num_classes)

    # Train and evaluate model
    print("\nTraining and evaluating Multi-head LSTM with Attention...")
    (history, accuracy, f1, precision, recall,
     y_pred_classes, y_pred) = train_and_evaluate_model(
        model, X_train, y_train, X_test, y_test,
        "Multi-head LSTM with Attention",
    )

    # Generate visualizations (each call writes one PNG file)
    plot_accuracy(history)
    plot_loss(history)
    true_classes = np.argmax(y_test, axis=1)
    plot_confusion_matrix(true_classes, y_pred_classes)
    plot_metrics_bar(accuracy, f1, precision, recall)
    plot_multi_head_attention(model, X_test)
    plot_roc_curves(y_test, y_pred)

    # Save the model
    model.save('multihead_lstm_attention_pamap2.h5')

    # Final summary
    print("\nAnalysis complete. All plots have been saved.")
    print(f"Final test accuracy: {accuracy*100:.2f}%")
    print(f"F1 Score: {f1:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")