import os import pandas as pd import numpy as np import matplotlib.pyplot as plt import seaborn as sns from sklearn.preprocessing import StandardScaler, LabelEncoder from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, precision_score, recall_score, f1_score from tensorflow.keras.models import Model, Sequential from tensorflow.keras.layers import (LSTM, Dense, Dropout, Input, Attention, LayerNormalization, Concatenate, GlobalAveragePooling1D) from tensorflow.keras.utils import to_categorical from tensorflow.keras.optimizers import Adam from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping # Suppress TensorFlow GPU warnings os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # Paths to the dataset files dataset_dir_path = r'C:\Users\LENOVO LEGION\Downloads\human+activity+recognition+using+smartphones\UCI HAR Dataset\UCI HAR Dataset' # Load and preprocess data def load_data(): # Load the feature labels features = pd.read_csv(os.path.join(dataset_dir_path, 'features.txt'), delim_whitespace=True, header=None, names=['index', 'feature']) # Load the activity labels activity_labels = pd.read_csv(os.path.join(dataset_dir_path, 'activity_labels.txt'), delim_whitespace=True, header=None, names=['index', 'activity']) # Load training data X_train = pd.read_csv(os.path.join(dataset_dir_path, 'train', 'X_train.txt'), delim_whitespace=True, header=None) y_train = pd.read_csv(os.path.join(dataset_dir_path, 'train', 'y_train.txt'), delim_whitespace=True, header=None, names=['activity']) # Load test data X_test = pd.read_csv(os.path.join(dataset_dir_path, 'test', 'X_test.txt'), delim_whitespace=True, header=None) y_test = pd.read_csv(os.path.join(dataset_dir_path, 'test', 'y_test.txt'), delim_whitespace=True, header=None, names=['activity']) # Label the columns X_train.columns = features['feature'] X_test.columns = features['feature'] # Normalize the feature data scaler = StandardScaler() X_train_scaled = scaler.fit_transform(X_train) X_test_scaled = scaler.transform(X_test) # Reshape the data for LSTM (samples, time steps, features) X_train_reshaped = X_train_scaled.reshape((X_train_scaled.shape[0], 1, X_train_scaled.shape[1])) X_test_reshaped = X_test_scaled.reshape((X_test_scaled.shape[0], 1, X_test_scaled.shape[1])) # Encode activity labels encoder = LabelEncoder() y_train_encoded = encoder.fit_transform(y_train['activity']) y_test_encoded = encoder.transform(y_test['activity']) # Convert to categorical y_train_categorical = to_categorical(y_train_encoded) y_test_categorical = to_categorical(y_test_encoded) return (X_train_reshaped, y_train_categorical, X_test_reshaped, y_test_categorical, activity_labels, X_train_scaled) # Visualization functions def plot_training_history(history, title="Model Training History"): fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5)) # Accuracy plot ax1.plot(history.history['accuracy'], label='Training Accuracy') ax1.plot(history.history['val_accuracy'], label='Validation Accuracy') ax1.set_title(f'{title} - Accuracy') ax1.set_xlabel('Epoch') ax1.set_ylabel('Accuracy') ax1.legend() # Loss plot ax2.plot(history.history['loss'], label='Training Loss') ax2.plot(history.history['val_loss'], label='Validation Loss') ax2.set_title(f'{title} - Loss') ax2.set_xlabel('Epoch') ax2.set_ylabel('Loss') ax2.legend() plt.tight_layout() plt.savefig(f'training_history_{title.lower().replace(" ", "_")}.png') plt.close() def plot_confusion_matrix(y_true, y_pred, title="Confusion Matrix"): cm = confusion_matrix(y_true, y_pred) plt.figure(figsize=(12, 10)) sns.heatmap(cm, annot=True, fmt='d', cmap='Blues') plt.title(title) plt.ylabel('True label') plt.xlabel('Predicted label') # Set x and y axis labels using activity names tick_positions = np.arange(len(activity_labels)) + 0.5 plt.xticks(tick_positions, activity_labels['activity'], rotation=45, ha='right') plt.yticks(tick_positions, activity_labels['activity'], rotation=0) plt.tight_layout() plt.savefig(f'confusion_matrix_{title.lower().replace(" ", "_")}.png') plt.close() def plot_multi_head_attention(model, X_test, num_heads, title="Multi-head Attention"): # Get attention layers attention_layers = [layer for layer in model.layers if isinstance(layer, Attention)] # Create models to get attention weights for each head attention_models = [ Model(inputs=model.input, outputs=layer.output) for layer in attention_layers ] # Get attention weights for a sample sample_idx = 0 attention_weights = [ model.predict(X_test[sample_idx:sample_idx+1]) for model in attention_models ] # Plot attention weights for each head fig, axes = plt.subplots(1, num_heads, figsize=(20, 5)) for i in range(num_heads): im = axes[i].imshow(attention_weights[i][0], aspect='auto', cmap='viridis') axes[i].set_title(f'Head {i+1}') axes[i].set_xlabel('Time Step') axes[i].set_ylabel('Feature Dimension') plt.colorbar(im, ax=axes[i]) plt.suptitle(f'{title} - Attention Weights per Head') plt.tight_layout() plt.savefig(f'multi_head_attention_{title.lower().replace(" ", "_")}.png') plt.close() def plot_feature_importance_per_head(model, feature_names, num_heads, title="Feature Importance per Head"): # Get LSTM layers (one per head) lstm_layers = [layer for layer in model.layers if isinstance(layer, LSTM)][:num_heads] # Get weights for each head head_weights = [np.abs(layer.get_weights()[0]).mean(axis=0) for layer in lstm_layers] # Plot feature importance for each head fig, axes = plt.subplots(num_heads, 1, figsize=(12, 4*num_heads)) for i, weights in enumerate(head_weights): # Get top 10 features feature_importance = pd.DataFrame({ 'feature': feature_names, 'importance': weights.mean(axis=1) }).sort_values('importance', ascending=False).head(10) sns.barplot(data=feature_importance, x='importance', y='feature', ax=axes[i]) axes[i].set_title(f'Head {i+1} - Top 10 Features') plt.suptitle(f'{title} - Feature Importance Analysis') plt.tight_layout() plt.savefig(f'feature_importance_per_head_{title.lower().replace(" ", "_")}.png') plt.close() # Load data print("Loading and preprocessing data...") X_train, y_train, X_test, y_test, activity_labels, X_train_scaled = load_data() print("Data loading completed.") print(f"Training data shape: {X_train.shape}") print(f"Test data shape: {X_test.shape}") print(f"Number of activities: {len(activity_labels)}") # Multi-head LSTM with Attention Model definition def multi_head_lstm_attention(input_shape, num_classes, num_heads=3): # Input layer inputs = Input(shape=input_shape) # Create multiple LSTM heads lstm_outputs = [] attention_outputs = [] for _ in range(num_heads): # LSTM layers for each head lstm = LSTM(50, return_sequences=True)(inputs) lstm = Dropout(0.5)(lstm) lstm_outputs.append(lstm) # Attention mechanism for each head attention = Attention()([lstm, lstm]) attention = LayerNormalization()(attention) attention_outputs.append(attention) # Concatenate all LSTM outputs x = Concatenate()(lstm_outputs) # Apply dropout to combined LSTM outputs x = Dropout(0.5)(x) # Concatenate attention outputs attention_combined = Concatenate()(attention_outputs) # Combine LSTM and attention outputs x = Concatenate()([x, attention_combined]) # Global pooling x = GlobalAveragePooling1D()(x) # Dense layers x = Dense(100, activation='relu')(x) x = Dropout(0.5)(x) # Output layer outputs = Dense(num_classes, activation='softmax')(x) # Create model model = Model(inputs=inputs, outputs=outputs) return model # Function to train and evaluate model def train_and_evaluate_model(model, X_train, y_train, X_test, y_test, model_name="Multi-head LSTM with Attention"): # Compile model model.compile(optimizer=Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy']) # Define callbacks reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.00001) early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True) # Train model print(f"\nTraining {model_name}...") history = model.fit(X_train, y_train, epochs=20, batch_size=64, validation_split=0.2, verbose=1, callbacks=[reduce_lr, early_stopping]) # Evaluate model print(f"\nEvaluating {model_name}...") loss, accuracy = model.evaluate(X_test, y_test, verbose=0) # Make predictions y_pred = model.predict(X_test) y_pred_classes = np.argmax(y_pred, axis=1) y_true = np.argmax(y_test, axis=1) # Calculate metrics precision = precision_score(y_true, y_pred_classes, average='weighted') recall = recall_score(y_true, y_pred_classes, average='weighted') f1 = f1_score(y_true, y_pred_classes, average='weighted') # Print results print(f'\n{model_name} Results:') print(f'Test Accuracy: {accuracy*100:.2f}%') print(f'Test Loss: {loss:.4f}') print(f'Precision: {precision:.4f}') print(f'Recall: {recall:.4f}') print(f'F1 Score: {f1:.4f}') return history, accuracy, precision, recall, f1, y_true, y_pred_classes, y_pred def plot_head_contribution_analysis(model, X_test, y_test, num_heads, title="Head Contribution Analysis"): # Get attention layers attention_layers = [layer for layer in model.layers if isinstance(layer, Attention)] # Create models for each head head_models = [] for i in range(num_heads): head_output = attention_layers[i].output head_model = Model(inputs=model.input, outputs=head_output) head_models.append(head_model) # Analyze contributions per activity y_true = np.argmax(y_test, axis=1) activity_head_contributions = np.zeros((len(activity_labels), num_heads)) for i, activity in enumerate(range(len(activity_labels))): activity_mask = (y_true == activity) activity_samples = X_test[activity_mask] if len(activity_samples) > 0: for h in range(num_heads): head_output = head_models[h].predict(activity_samples) activity_head_contributions[i, h] = np.mean(np.abs(head_output)) # Plot heatmap of contributions plt.figure(figsize=(12, 8)) sns.heatmap(activity_head_contributions, xticklabels=[f'Head {i+1}' for i in range(num_heads)], yticklabels=activity_labels['activity'], cmap='YlOrRd', annot=True, fmt='.2f') plt.title(f'{title} - Head Contributions per Activity') plt.tight_layout() plt.savefig(f'head_contributions_{title.lower().replace(" ", "_")}.png') plt.close() def save_detailed_results(model_name, history, accuracy, precision, recall, f1, y_true, y_pred_classes, y_pred, num_heads): with open(f'{model_name.lower().replace(" ", "_")}_detailed_results.txt', 'w') as f: # Model architecture f.write(f"{model_name} Configuration\n") f.write("="*50 + "\n") f.write(f"Number of attention heads: {num_heads}\n\n") # Basic metrics f.write("Performance Metrics\n") f.write("-"*50 + "\n") f.write(f"Test Accuracy: {accuracy*100:.2f}%\n") f.write(f"Precision: {precision:.4f}\n") f.write(f"Recall: {recall:.4f}\n") f.write(f"F1 Score: {f1:.4f}\n\n") # Per-class metrics f.write("Per-class Performance\n") f.write("-"*50 + "\n") for i, activity in enumerate(activity_labels['activity']): true_class = (y_true == i) pred_class = (y_pred_classes == i) class_precision = precision_score(true_class, pred_class, average='binary') class_recall = recall_score(true_class, pred_class, average='binary') class_f1 = f1_score(true_class, pred_class, average='binary') f.write(f"\n{activity}:\n") f.write(f"Precision: {class_precision:.4f}\n") f.write(f"Recall: {class_recall:.4f}\n") f.write(f"F1 Score: {class_f1:.4f}\n") # Classification report f.write("\nClassification Report\n") f.write("-"*50 + "\n") f.write(classification_report(y_true, y_pred_classes, target_names=activity_labels['activity'])) # Training history f.write("\nTraining History\n") f.write("-"*50 + "\n") f.write("Epoch\tLoss\tAccuracy\tVal_Loss\tVal_Accuracy\n") for i in range(len(history.history['loss'])): f.write(f"{i+1}\t{history.history['loss'][i]:.4f}\t") f.write(f"{history.history['accuracy'][i]:.4f}\t") f.write(f"{history.history['val_loss'][i]:.4f}\t") f.write(f"{history.history['val_accuracy'][i]:.4f}\n") # Main execution if __name__ == "__main__": print("UCI HAR Dataset - Multi-head LSTM with Attention Implementation") print("="*50) # Model parameters num_heads = 3 # Create and train model input_shape = (X_train.shape[1], X_train.shape[2]) num_classes = y_train.shape[1] model = multi_head_lstm_attention(input_shape, num_classes, num_heads) # Print model summary print("\nModel Architecture:") model.summary() # Train and evaluate results = train_and_evaluate_model( model, X_train, y_train, X_test, y_test, "Multi-head LSTM with Attention" ) history, accuracy, precision, recall, f1, y_true, y_pred_classes, y_pred = results # Generate visualizations plot_training_history(history, "Multi-head LSTM with Attention") plot_confusion_matrix(y_true, y_pred_classes, "Multi-head LSTM with Attention") plot_multi_head_attention(model, X_test, num_heads, "Multi-head LSTM with Attention") plot_feature_importance_per_head(model, X_train_scaled.columns, num_heads, "Multi-head LSTM with Attention") plot_head_contribution_analysis(model, X_test, y_test, num_heads, "Multi-head LSTM with Attention") # Save detailed results save_detailed_results("Multi-head LSTM with Attention", history, accuracy, precision, recall, f1, y_true, y_pred_classes, y_pred, num_heads) # Save model model.save('multihead_lstm_attention_uci_har.h5') print("\nAnalysis complete. All results and visualizations have been saved.") print(f"\nModel saved as 'multihead_lstm_attention_uci_har.h5'") print(f"Detailed results saved as 'multihead_lstm_attention_detailed_results.txt'")