import os import pandas as pd import numpy as np import matplotlib.pyplot as plt import seaborn as sns from sklearn.preprocessing import StandardScaler, LabelEncoder from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, precision_score, recall_score, f1_score from tensorflow.keras.models import Model, Sequential from tensorflow.keras.layers import (LSTM, Dense, Dropout, Input, LayerNormalization, Concatenate, GlobalAveragePooling1D, multiply, Layer, Reshape) from tensorflow.keras.utils import to_categorical from tensorflow.keras.optimizers import Adam from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping # Suppress TensorFlow GPU warnings os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # Custom Squeeze-and-Excitation Layer class SqueezeExciteBlock(Layer): def __init__(self, ratio=16, **kwargs): super(SqueezeExciteBlock, self).__init__(**kwargs) self.ratio = ratio def build(self, input_shape): self.global_avg_pool = GlobalAveragePooling1D() self.dense1 = Dense(input_shape[-1] // self.ratio, activation='relu', kernel_initializer='he_normal', use_bias=False) self.dense2 = Dense(input_shape[-1], activation='sigmoid', kernel_initializer='he_normal', use_bias=False) super(SqueezeExciteBlock, self).build(input_shape) def call(self, inputs): x = self.global_avg_pool(inputs) x = Reshape((1, inputs.shape[-1]))(x) x = self.dense1(x) x = self.dense2(x) x = multiply([inputs, x]) return x def get_config(self): config = super(SqueezeExciteBlock, self).get_config() config.update({"ratio": self.ratio}) return config # Paths to the dataset files dataset_dir_path = r'C:\Users\LENOVO LEGION\Downloads\human+activity+recognition+using+smartphones\UCI HAR Dataset\UCI HAR Dataset' # Load and preprocess data def load_data(): # Load the feature labels features = pd.read_csv(os.path.join(dataset_dir_path, 'features.txt'), delim_whitespace=True, header=None, names=['index', 'feature']) # Load the activity labels activity_labels = pd.read_csv(os.path.join(dataset_dir_path, 'activity_labels.txt'), delim_whitespace=True, header=None, names=['index', 'activity']) # Load training data X_train = pd.read_csv(os.path.join(dataset_dir_path, 'train', 'X_train.txt'), delim_whitespace=True, header=None) y_train = pd.read_csv(os.path.join(dataset_dir_path, 'train', 'y_train.txt'), delim_whitespace=True, header=None, names=['activity']) # Load test data X_test = pd.read_csv(os.path.join(dataset_dir_path, 'test', 'X_test.txt'), delim_whitespace=True, header=None) y_test = pd.read_csv(os.path.join(dataset_dir_path, 'test', 'y_test.txt'), delim_whitespace=True, header=None, names=['activity']) # Label the columns X_train.columns = features['feature'] X_test.columns = features['feature'] # Normalize the feature data scaler = StandardScaler() X_train_scaled = scaler.fit_transform(X_train) X_test_scaled = scaler.transform(X_test) # Reshape the data for LSTM (samples, time steps, features) X_train_reshaped = X_train_scaled.reshape((X_train_scaled.shape[0], 1, X_train_scaled.shape[1])) X_test_reshaped = X_test_scaled.reshape((X_test_scaled.shape[0], 1, X_test_scaled.shape[1])) # Encode activity labels encoder = LabelEncoder() y_train_encoded = encoder.fit_transform(y_train['activity']) y_test_encoded = encoder.transform(y_test['activity']) # Convert to categorical y_train_categorical = to_categorical(y_train_encoded) y_test_categorical = to_categorical(y_test_encoded) return (X_train_reshaped, y_train_categorical, X_test_reshaped, y_test_categorical, activity_labels, X_train_scaled) # Visualization functions def plot_training_history(history, title="Model Training History"): fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5)) # Accuracy plot ax1.plot(history.history['accuracy'], label='Training Accuracy') ax1.plot(history.history['val_accuracy'], label='Validation Accuracy') ax1.set_title(f'{title} - Accuracy') ax1.set_xlabel('Epoch') ax1.set_ylabel('Accuracy') ax1.legend() # Loss plot ax2.plot(history.history['loss'], label='Training Loss') ax2.plot(history.history['val_loss'], label='Validation Loss') ax2.set_title(f'{title} - Loss') ax2.set_xlabel('Epoch') ax2.set_ylabel('Loss') ax2.legend() plt.tight_layout() plt.savefig(f'training_history_{title.lower().replace(" ", "_")}.png') plt.close() def plot_confusion_matrix(y_true, y_pred, title="Confusion Matrix"): cm = confusion_matrix(y_true, y_pred) plt.figure(figsize=(12, 10)) sns.heatmap(cm, annot=True, fmt='d', cmap='Blues') plt.title(title) plt.ylabel('True label') plt.xlabel('Predicted label') # Set x and y axis labels using activity names tick_positions = np.arange(len(activity_labels)) + 0.5 plt.xticks(tick_positions, activity_labels['activity'], rotation=45, ha='right') plt.yticks(tick_positions, activity_labels['activity'], rotation=0) plt.tight_layout() plt.savefig(f'confusion_matrix_{title.lower().replace(" ", "_")}.png') plt.close() def plot_se_activations(model, X_test, title="SE Activations"): # Get SE layers se_layers = [layer for layer in model.layers if isinstance(layer, SqueezeExciteBlock)] # Create models to get SE weights se_models = [ Model(inputs=model.input, outputs=layer.output) for layer in se_layers ] # Get activations for a sample sample_idx = 0 se_weights = [ model.predict(X_test[sample_idx:sample_idx+1]) for model in se_models ] # Plot SE weights for each head num_heads = len(se_layers) fig, axes = plt.subplots(1, num_heads, figsize=(20, 5)) for i in range(num_heads): im = axes[i].imshow(se_weights[i][0].T, aspect='auto', cmap='viridis') axes[i].set_title(f'Head {i+1} SE Weights') axes[i].set_xlabel('Time Step') axes[i].set_ylabel('Channel') plt.colorbar(im, ax=axes[i]) plt.suptitle(f'{title} - SE Weight Distribution') plt.tight_layout() plt.savefig(f'se_activations_{title.lower().replace(" ", "_")}.png') plt.close() # Load data print("Loading and preprocessing data...") X_train, y_train, X_test, y_test, activity_labels, X_train_scaled = load_data() print("Data loading completed.") print(f"Training data shape: {X_train.shape}") print(f"Test data shape: {X_test.shape}") print(f"Number of activities: {len(activity_labels)}") # Multi-head LSTM with SE Model definition def multi_head_lstm_se(input_shape, num_classes, num_heads=3): # Input layer inputs = Input(shape=input_shape) # Create multiple LSTM heads with SE blocks lstm_outputs = [] for _ in range(num_heads): # LSTM layer for each head lstm = LSTM(100, return_sequences=True)(inputs) lstm = Dropout(0.5)(lstm) # Apply SE block to each head se = SqueezeExciteBlock()(lstm) lstm_outputs.append(se) # Concatenate all heads x = Concatenate()(lstm_outputs) # Global pooling x = GlobalAveragePooling1D()(x) # Dense layers x = Dense(100, activation='relu')(x) x = Dropout(0.5)(x) # Output layer outputs = Dense(num_classes, activation='softmax')(x) # Create model model = Model(inputs=inputs, outputs=outputs) return model # Function to train and evaluate model def train_and_evaluate_model(model, X_train, y_train, X_test, y_test, model_name="Multi-head LSTM with SE"): # Compile model model.compile(optimizer=Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy']) # Define callbacks reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.00001) early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True) # Train model print(f"\nTraining {model_name}...") history = model.fit(X_train, y_train, epochs=20, batch_size=64, validation_split=0.2, verbose=1, callbacks=[reduce_lr, early_stopping]) # Evaluate model print(f"\nEvaluating {model_name}...") loss, accuracy = model.evaluate(X_test, y_test, verbose=0) # Make predictions y_pred = model.predict(X_test) y_pred_classes = np.argmax(y_pred, axis=1) y_true = np.argmax(y_test, axis=1) # Calculate metrics precision = precision_score(y_true, y_pred_classes, average='weighted') recall = recall_score(y_true, y_pred_classes, average='weighted') f1 = f1_score(y_true, y_pred_classes, average='weighted') # Print results print(f'\n{model_name} Results:') print(f'Test Accuracy: {accuracy*100:.2f}%') print(f'Test Loss: {loss:.4f}') print(f'Precision: {precision:.4f}') print(f'Recall: {recall:.4f}') print(f'F1 Score: {f1:.4f}') return history, accuracy, precision, recall, f1, y_true, y_pred_classes, y_pred def plot_channel_excitation(model, X_test, num_heads, title="Channel Excitation Analysis"): # Get SE layers se_layers = [layer for layer in model.layers if isinstance(layer, SqueezeExciteBlock)] # Create models for each SE block se_models = [] for i in range(num_heads): se_output = se_layers[i].dense2.output se_model = Model(inputs=model.input, outputs=se_output) se_models.append(se_model) # Analyze channel excitation per activity y_true = np.argmax(y_test, axis=1) activity_channel_excitation = [] plt.figure(figsize=(15, 10)) for i, activity in enumerate(activity_labels['activity']): activity_mask = (y_true == i) activity_samples = X_test[activity_mask] if len(activity_samples) > 0: # Get average excitation for each head head_excitations = [] for h in range(num_heads): excitation = se_models[h].predict(activity_samples[:5]) # Use first 5 samples head_excitations.append(np.mean(excitation, axis=0)) # Plot excitation pattern for this activity plt.subplot(len(activity_labels), 1, i+1) for h in range(num_heads): plt.plot(head_excitations[h].flatten(), label=f'Head {h+1}', alpha=0.7) plt.title(f'Channel Excitation Pattern - {activity}') plt.xlabel('Channel') plt.ylabel('Excitation') if i == 0: # Only show legend for first subplot plt.legend() plt.tight_layout() plt.savefig(f'channel_excitation_{title.lower().replace(" ", "_")}.png') plt.close() def save_detailed_results(model_name, history, accuracy, precision, recall, f1, y_true, y_pred_classes, y_pred, num_heads): with open(f'{model_name.lower().replace(" ", "_")}_detailed_results.txt', 'w') as f: # Model architecture f.write(f"{model_name} Configuration\n") f.write("="*50 + "\n") f.write(f"Number of heads: {num_heads}\n") f.write(f"SE ratio: 16\n\n") # Performance metrics f.write("Performance Metrics\n") f.write("-"*50 + "\n") f.write(f"Test Accuracy: {accuracy*100:.2f}%\n") f.write(f"Test Loss: {model.evaluate(X_test, y_test, verbose=0)[0]:.4f}\n") f.write(f"Precision: {precision:.4f}\n") f.write(f"Recall: {recall:.4f}\n") f.write(f"F1 Score: {f1:.4f}\n\n") # Per-class metrics f.write("Per-class Performance\n") f.write("-"*50 + "\n") for i, activity in enumerate(activity_labels['activity']): true_class = (y_true == i) pred_class = (y_pred_classes == i) class_precision = precision_score(true_class, pred_class, average='binary') class_recall = recall_score(true_class, pred_class, average='binary') class_f1 = f1_score(true_class, pred_class, average='binary') f.write(f"\n{activity}:\n") f.write(f"Precision: {class_precision:.4f}\n") f.write(f"Recall: {class_recall:.4f}\n") f.write(f"F1 Score: {class_f1:.4f}\n") # Training history f.write("\nTraining History\n") f.write("-"*50 + "\n") f.write("Epoch\tLoss\tAccuracy\tVal_Loss\tVal_Accuracy\n") for i in range(len(history.history['loss'])): f.write(f"{i+1}\t{history.history['loss'][i]:.4f}\t") f.write(f"{history.history['accuracy'][i]:.4f}\t") f.write(f"{history.history['val_loss'][i]:.4f}\t") f.write(f"{history.history['val_accuracy'][i]:.4f}\n") # Main execution if __name__ == "__main__": print("UCI HAR Dataset - Multi-head LSTM with SE Implementation") print("="*50) # Model parameters num_heads = 3 # Create and train model input_shape = (X_train.shape[1], X_train.shape[2]) num_classes = y_train.shape[1] model = multi_head_lstm_se(input_shape, num_classes, num_heads) # Print model summary print("\nModel Architecture:") model.summary() # Train and evaluate results = train_and_evaluate_model( model, X_train, y_train, X_test, y_test, "Multi-head LSTM with SE" ) history, accuracy, precision, recall, f1, y_true, y_pred_classes, y_pred = results # Generate visualizations plot_training_history(history, "Multi-head LSTM with SE") plot_confusion_matrix(y_true, y_pred_classes, "Multi-head LSTM with SE") plot_se_activations(model, X_test, "Multi-head LSTM with SE") plot_channel_excitation(model, X_test, num_heads, "Multi-head LSTM with SE") # Save detailed results save_detailed_results("Multi-head LSTM with SE", history, accuracy, precision, recall, f1, y_true, y_pred_classes, y_pred, num_heads) # Save model model.save('multihead_lstm_se_uci_har.h5') print("\nAnalysis complete. All results and visualizations have been saved.") print(f"\nModel saved as 'multihead_lstm_se_uci_har.h5'") print(f"Detailed results saved as 'multihead_lstm_se_detailed_results.txt'") # Print per-activity performance summary print("\nPer-activity Performance Summary:") for i, activity in enumerate(activity_labels['activity']): true_class = (y_true == i) pred_class = (y_pred_classes == i) class_f1 = f1_score(true_class, pred_class, average='binary') print(f"{activity:20} F1-Score: {class_f1:.4f}")