import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score

# Suppress TensorFlow C++ info/warning logs; this must be set before TensorFlow is imported to take effect
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

from tensorflow.keras.models import Model
from tensorflow.keras.layers import (LSTM, Dense, Dropout, Input, Concatenate,
                                     GlobalAveragePooling1D, multiply, Layer, Reshape)
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
# Custom Squeeze-and-Excitation Layer
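# The block "squeezes" each head's features with global average pooling, passes the
# per-channel descriptor through a reduce-then-expand pair of Dense layers (bottleneck
# controlled by `ratio`), and rescales the input channel-wise with the sigmoid output.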
class SqueezeExciteBlock(Layer):
    def __init__(self, ratio=16, **kwargs):
        super(SqueezeExciteBlock, self).__init__(**kwargs)
        self.ratio = ratio

    def build(self, input_shape):
        self.global_avg_pool = GlobalAveragePooling1D()
        self.dense1 = Dense(input_shape[-1] // self.ratio,
                            activation='relu',
                            kernel_initializer='he_normal',
                            use_bias=False)
        self.dense2 = Dense(input_shape[-1],
                            activation='sigmoid',
                            kernel_initializer='he_normal',
                            use_bias=False)
        super(SqueezeExciteBlock, self).build(input_shape)

    def call(self, inputs):
        # Squeeze: collapse the time dimension to one descriptor per channel
        x = self.global_avg_pool(inputs)
        x = Reshape((1, inputs.shape[-1]))(x)
        # Excite: bottleneck Dense layers produce per-channel weights in (0, 1)
        x = self.dense1(x)
        x = self.dense2(x)
        # Scale: reweight the input feature map channel-wise
        x = multiply([inputs, x])
        return x

    def get_config(self):
        config = super(SqueezeExciteBlock, self).get_config()
        config.update({"ratio": self.ratio})
        return config
# Paths to the dataset files
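# Adjust this path to wherever the UCI HAR Dataset archive is extracted locally.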
dataset_dir_path = r'C:\Users\LENOVO LEGION\Downloads\human+activity+recognition+using+smartphones\UCI HAR Dataset\UCI HAR Dataset'
# Load and preprocess data
def load_data():
    # Load the feature names
    features = pd.read_csv(os.path.join(dataset_dir_path, 'features.txt'),
                           sep=r'\s+',
                           header=None,
                           names=['index', 'feature'])
    # Load the activity labels
    activity_labels = pd.read_csv(os.path.join(dataset_dir_path, 'activity_labels.txt'),
                                  sep=r'\s+',
                                  header=None,
                                  names=['index', 'activity'])
    # Load training data
    X_train = pd.read_csv(os.path.join(dataset_dir_path, 'train', 'X_train.txt'),
                          sep=r'\s+',
                          header=None)
    y_train = pd.read_csv(os.path.join(dataset_dir_path, 'train', 'y_train.txt'),
                          sep=r'\s+',
                          header=None,
                          names=['activity'])
    # Load test data
    X_test = pd.read_csv(os.path.join(dataset_dir_path, 'test', 'X_test.txt'),
                         sep=r'\s+',
                         header=None)
    y_test = pd.read_csv(os.path.join(dataset_dir_path, 'test', 'y_test.txt'),
                         sep=r'\s+',
                         header=None,
                         names=['activity'])
    # Label the columns
    X_train.columns = features['feature']
    X_test.columns = features['feature']
    # Normalize the feature data (fit the scaler on the training set only)
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    # Reshape for LSTM input (samples, time steps, features);
    # each 561-dimensional feature vector is treated as a single time step
    X_train_reshaped = X_train_scaled.reshape((X_train_scaled.shape[0], 1, X_train_scaled.shape[1]))
    X_test_reshaped = X_test_scaled.reshape((X_test_scaled.shape[0], 1, X_test_scaled.shape[1]))
    # Encode activity labels (1-6 in the raw files) to 0-5, then one-hot encode
    encoder = LabelEncoder()
    y_train_encoded = encoder.fit_transform(y_train['activity'])
    y_test_encoded = encoder.transform(y_test['activity'])
    y_train_categorical = to_categorical(y_train_encoded)
    y_test_categorical = to_categorical(y_test_encoded)
    return (X_train_reshaped, y_train_categorical,
            X_test_reshaped, y_test_categorical,
            activity_labels, X_train_scaled)
# Visualization functions
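# Each helper below saves its figure to a PNG named after the model title instead of displaying it.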
def plot_training_history(history, title="Model Training History"):
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
    # Accuracy plot
    ax1.plot(history.history['accuracy'], label='Training Accuracy')
    ax1.plot(history.history['val_accuracy'], label='Validation Accuracy')
    ax1.set_title(f'{title} - Accuracy')
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Accuracy')
    ax1.legend()
    # Loss plot
    ax2.plot(history.history['loss'], label='Training Loss')
    ax2.plot(history.history['val_loss'], label='Validation Loss')
    ax2.set_title(f'{title} - Loss')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel('Loss')
    ax2.legend()
    plt.tight_layout()
    plt.savefig(f'training_history_{title.lower().replace(" ", "_")}.png')
    plt.close()
def plot_confusion_matrix(y_true, y_pred, title="Confusion Matrix"):
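    # Tick labels come from the module-level activity_labels DataFrame loaded further down.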
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(12, 10))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title(title)
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    # Set x and y axis labels using activity names
    tick_positions = np.arange(len(activity_labels)) + 0.5
    plt.xticks(tick_positions, activity_labels['activity'], rotation=45, ha='right')
    plt.yticks(tick_positions, activity_labels['activity'], rotation=0)
    plt.tight_layout()
    plt.savefig(f'confusion_matrix_{title.lower().replace(" ", "_")}.png')
    plt.close()
def plot_se_activations(model, X_test, title="SE Activations"):
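    # Plots each SE block's recalibrated LSTM features for a single test sample;
    # with one time step per sample this is effectively one value per channel.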
    # Get SE layers
    se_layers = [layer for layer in model.layers if isinstance(layer, SqueezeExciteBlock)]
    # Create models that output each SE block's recalibrated feature map
    se_models = [
        Model(inputs=model.input, outputs=layer.output)
        for layer in se_layers
    ]
    # Get activations for a single test sample
    sample_idx = 0
    se_weights = [
        se_model.predict(X_test[sample_idx:sample_idx + 1])
        for se_model in se_models
    ]
    # Plot the SE-recalibrated activations for each head
    num_heads = len(se_layers)
    fig, axes = plt.subplots(1, num_heads, figsize=(20, 5))
    for i in range(num_heads):
        im = axes[i].imshow(se_weights[i][0].T, aspect='auto', cmap='viridis')
        axes[i].set_title(f'Head {i+1} SE Weights')
        axes[i].set_xlabel('Time Step')
        axes[i].set_ylabel('Channel')
        plt.colorbar(im, ax=axes[i])
    plt.suptitle(f'{title} - SE Weight Distribution')
    plt.tight_layout()
    plt.savefig(f'se_activations_{title.lower().replace(" ", "_")}.png')
    plt.close()
# Load data
print("Loading and preprocessing data...")
X_train, y_train, X_test, y_test, activity_labels, X_train_scaled = load_data()
print("Data loading completed.")
print(f"Training data shape: {X_train.shape}")
print(f"Test data shape: {X_test.shape}")
print(f"Number of activities: {len(activity_labels)}")
# Multi-head LSTM with SE Model definition
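# The model runs several parallel LSTM "heads" over the same input, recalibrates each
# head's channels with a Squeeze-and-Excitation block, concatenates the heads, pools
# over the (single) time step, and classifies with a softmax layer.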
def multi_head_lstm_se(input_shape, num_classes, num_heads=3):
    # Input layer
    inputs = Input(shape=input_shape)
    # Create multiple LSTM heads with SE blocks
    lstm_outputs = []
    for _ in range(num_heads):
        # LSTM layer for each head
        lstm = LSTM(100, return_sequences=True)(inputs)
        lstm = Dropout(0.5)(lstm)
        # Apply SE block to each head
        se = SqueezeExciteBlock()(lstm)
        lstm_outputs.append(se)
    # Concatenate all heads
    x = Concatenate()(lstm_outputs)
    # Global pooling
    x = GlobalAveragePooling1D()(x)
    # Dense layers
    x = Dense(100, activation='relu')(x)
    x = Dropout(0.5)(x)
    # Output layer
    outputs = Dense(num_classes, activation='softmax')(x)
    # Create model
    model = Model(inputs=inputs, outputs=outputs)
    return model
# Function to train and evaluate model
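# Compiles with Adam and categorical cross-entropy, holds out 20% of the training data for
# validation, reduces the learning rate when val_loss plateaus, and stops early (restoring
# the best weights) before reporting weighted test-set metrics.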
def train_and_evaluate_model(model, X_train, y_train, X_test, y_test, model_name="Multi-head LSTM with SE"):
    # Compile model
    model.compile(optimizer=Adam(learning_rate=0.001),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    # Define callbacks
    reduce_lr = ReduceLROnPlateau(monitor='val_loss',
                                  factor=0.2,
                                  patience=5,
                                  min_lr=0.00001)
    early_stopping = EarlyStopping(monitor='val_loss',
                                   patience=10,
                                   restore_best_weights=True)
    # Train model
    print(f"\nTraining {model_name}...")
    history = model.fit(X_train, y_train,
                        epochs=20,
                        batch_size=64,
                        validation_split=0.2,
                        verbose=1,
                        callbacks=[reduce_lr, early_stopping])
    # Evaluate model
    print(f"\nEvaluating {model_name}...")
    loss, accuracy = model.evaluate(X_test, y_test, verbose=0)
    # Make predictions
    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true = np.argmax(y_test, axis=1)
    # Calculate metrics
    precision = precision_score(y_true, y_pred_classes, average='weighted')
    recall = recall_score(y_true, y_pred_classes, average='weighted')
    f1 = f1_score(y_true, y_pred_classes, average='weighted')
    # Print results
    print(f'\n{model_name} Results:')
    print(f'Test Accuracy: {accuracy*100:.2f}%')
    print(f'Test Loss: {loss:.4f}')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1 Score: {f1:.4f}')
    return history, accuracy, precision, recall, f1, y_true, y_pred_classes, y_pred
def plot_channel_excitation(model, X_test, num_heads, title="Channel Excitation Analysis"):
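    # Relies on the module-level y_test and activity_labels; excitation weights are
    # recomputed from each SE block's own Dense sub-layers (see below).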
    # Get SE layers
    se_layers = [layer for layer in model.layers if isinstance(layer, SqueezeExciteBlock)]
    # The Dense sub-layers inside a custom layer are not part of the functional graph,
    # so expose the LSTM features feeding each SE block and recompute the excitation
    # weights from the block's dense1/dense2 sub-layers.
    se_input_models = []
    for i in range(num_heads):
        se_input_models.append(Model(inputs=model.input, outputs=se_layers[i].input))
    # Analyze channel excitation per activity
    y_true = np.argmax(y_test, axis=1)
    plt.figure(figsize=(15, 10))
    for i, activity in enumerate(activity_labels['activity']):
        activity_mask = (y_true == i)
        activity_samples = X_test[activity_mask]
        if len(activity_samples) > 0:
            # Get average excitation weights for each head
            head_excitations = []
            for h in range(num_heads):
                lstm_features = se_input_models[h].predict(activity_samples[:5])  # Use first 5 samples
                pooled = lstm_features.mean(axis=1, keepdims=True)
                excitation = se_layers[h].dense2(se_layers[h].dense1(pooled)).numpy()
                head_excitations.append(np.mean(excitation, axis=0))
            # Plot excitation pattern for this activity
            plt.subplot(len(activity_labels), 1, i + 1)
            for h in range(num_heads):
                plt.plot(head_excitations[h].flatten(),
                         label=f'Head {h+1}',
                         alpha=0.7)
            plt.title(f'Channel Excitation Pattern - {activity}')
            plt.xlabel('Channel')
            plt.ylabel('Excitation')
            if i == 0:  # Only show legend for first subplot
                plt.legend()
    plt.tight_layout()
    plt.savefig(f'channel_excitation_{title.lower().replace(" ", "_")}.png')
    plt.close()
def save_detailed_results(model_name, history, accuracy, precision, recall, f1, y_true, y_pred_classes, y_pred, num_heads):
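    # Writes model configuration, aggregate test metrics, one-vs-rest per-class metrics,
    # and the epoch-by-epoch training history to a plain-text report.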
    with open(f'{model_name.lower().replace(" ", "_")}_detailed_results.txt', 'w') as f:
        # Model architecture
        f.write(f"{model_name} Configuration\n")
        f.write("=" * 50 + "\n")
        f.write(f"Number of heads: {num_heads}\n")
        f.write("SE ratio: 16\n\n")
        # Performance metrics (test loss is re-evaluated from the module-level model and test data)
        f.write("Performance Metrics\n")
        f.write("-" * 50 + "\n")
        f.write(f"Test Accuracy: {accuracy*100:.2f}%\n")
        f.write(f"Test Loss: {model.evaluate(X_test, y_test, verbose=0)[0]:.4f}\n")
        f.write(f"Precision: {precision:.4f}\n")
        f.write(f"Recall: {recall:.4f}\n")
        f.write(f"F1 Score: {f1:.4f}\n\n")
        # Per-class metrics (one-vs-rest precision/recall/F1 for each activity)
        f.write("Per-class Performance\n")
        f.write("-" * 50 + "\n")
        for i, activity in enumerate(activity_labels['activity']):
            true_class = (y_true == i)
            pred_class = (y_pred_classes == i)
            class_precision = precision_score(true_class, pred_class, average='binary')
            class_recall = recall_score(true_class, pred_class, average='binary')
            class_f1 = f1_score(true_class, pred_class, average='binary')
            f.write(f"\n{activity}:\n")
            f.write(f"Precision: {class_precision:.4f}\n")
            f.write(f"Recall: {class_recall:.4f}\n")
            f.write(f"F1 Score: {class_f1:.4f}\n")
        # Training history
        f.write("\nTraining History\n")
        f.write("-" * 50 + "\n")
        f.write("Epoch\tLoss\tAccuracy\tVal_Loss\tVal_Accuracy\n")
        for i in range(len(history.history['loss'])):
            f.write(f"{i+1}\t{history.history['loss'][i]:.4f}\t")
            f.write(f"{history.history['accuracy'][i]:.4f}\t")
            f.write(f"{history.history['val_loss'][i]:.4f}\t")
            f.write(f"{history.history['val_accuracy'][i]:.4f}\n")
# Main execution
if __name__ == "__main__":
print("UCI HAR Dataset - Multi-head LSTM with SE Implementation")
print("="*50)
# Model parameters
num_heads = 3
# Create and train model
input_shape = (X_train.shape[1], X_train.shape[2])
num_classes = y_train.shape[1]
model = multi_head_lstm_se(input_shape, num_classes, num_heads)
# Print model summary
print("\nModel Architecture:")
model.summary()
# Train and evaluate
results = train_and_evaluate_model(
model, X_train, y_train, X_test, y_test, "Multi-head LSTM with SE"
)
history, accuracy, precision, recall, f1, y_true, y_pred_classes, y_pred = results
# Generate visualizations
plot_training_history(history, "Multi-head LSTM with SE")
plot_confusion_matrix(y_true, y_pred_classes, "Multi-head LSTM with SE")
plot_se_activations(model, X_test, "Multi-head LSTM with SE")
plot_channel_excitation(model, X_test, num_heads, "Multi-head LSTM with SE")
# Save detailed results
save_detailed_results("Multi-head LSTM with SE", history, accuracy,
precision, recall, f1, y_true, y_pred_classes, y_pred, num_heads)
# Save model
model.save('multihead_lstm_se_uci_har.h5')
print("\nAnalysis complete. All results and visualizations have been saved.")
print(f"\nModel saved as 'multihead_lstm_se_uci_har.h5'")
print(f"Detailed results saved as 'multihead_lstm_se_detailed_results.txt'")
# Print per-activity performance summary
print("\nPer-activity Performance Summary:")
for i, activity in enumerate(activity_labels['activity']):
true_class = (y_true == i)
pred_class = (y_pred_classes == i)
class_f1 = f1_score(true_class, pred_class, average='binary')
print(f"{activity:20} F1-Score: {class_f1:.4f}")