import os

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.impute import SimpleImputer
from sklearn.feature_selection import SelectKBest, f_classif

# GPU setup: enable memory growth so TensorFlow allocates GPU memory on demand
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print("GPU is available")
    except RuntimeError as e:
        print(e)
else:
    print("GPU is not available, using CPU")


# Custom Layers
class MultiHeadLSTM(tf.keras.layers.Layer):
    """Splits `units` across `num_heads` parallel LSTMs and concatenates their outputs."""

    def __init__(self, units, num_heads, **kwargs):
        super(MultiHeadLSTM, self).__init__(**kwargs)
        self.units = units
        self.num_heads = num_heads
        self.heads = [tf.keras.layers.LSTM(units // num_heads, return_sequences=True)
                      for _ in range(num_heads)]

    def call(self, inputs):
        head_outputs = [head(inputs) for head in self.heads]
        return tf.keras.layers.Concatenate()(head_outputs)

    def get_config(self):
        # Serialize constructor arguments so the model can be saved and reloaded
        config = super(MultiHeadLSTM, self).get_config()
        config.update({'units': self.units, 'num_heads': self.num_heads})
        return config


class SEModule(tf.keras.layers.Layer):
    """Squeeze-and-Excitation block: rescales channels via a global-pooling bottleneck."""

    def __init__(self, channels, ratio=16, **kwargs):
        super(SEModule, self).__init__(**kwargs)
        self.channels = channels
        self.ratio = ratio
        self.avg_pool = tf.keras.layers.GlobalAveragePooling1D()
        self.fc1 = tf.keras.layers.Dense(channels // ratio, activation='relu')
        self.fc2 = tf.keras.layers.Dense(channels, activation='sigmoid')

    def call(self, inputs):
        channels = inputs.shape[-1]
        x = self.avg_pool(inputs)            # squeeze: (batch, channels)
        x = self.fc1(x)                      # bottleneck
        x = self.fc2(x)                      # excite: per-channel weights in (0, 1)
        x = tf.reshape(x, (-1, 1, channels))
        return inputs * x                    # channel-wise rescaling of the input

    def get_config(self):
        # Serialize constructor arguments so the model can be saved and reloaded
        config = super(SEModule, self).get_config()
        config.update({'channels': self.channels, 'ratio': self.ratio})
        return config


# Data loading function
def load_dataset(path):
    data_files = [f for f in os.listdir(path) if f.endswith('.dat')]
    all_data = []
    for file in data_files:
        df = pd.read_csv(os.path.join(path, file), header=None, sep=' ')
        all_data.append(df)
    return pd.concat(all_data, ignore_index=True)


# Load data
path = r"C:\Users\LENOVO LEGION\Downloads\opportunity_dataset\OpportunityUCIDataset\dataset"
data = load_dataset(path)

print(f"Original dataset shape: {data.shape}")
print(f"Number of NaN values: {data.isna().sum().sum()}")

# Remove rows with NaN values
data = data.dropna()
print(f"Dataset shape after removing NaN: {data.shape}")

# Check for infinite values
print(f"Number of infinite values: {np.isinf(data.select_dtypes(include=np.number)).sum().sum()}")

# Replace infinite values with NaN and then drop those rows
data = data.replace([np.inf, -np.inf], np.nan).dropna()
print(f"Dataset shape after removing infinite values: {data.shape}")
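
# Note: on the Opportunity recordings, dropping every row that contains a NaN
# can discard a large share of the data, since some sensor channels have long
# gaps. A common alternative (sketched below but NOT used here; the `limit=50`
# cap is an illustrative choice, not a value from this script) is to
# interpolate short gaps along the time axis before dropping what remains:
#
#     data = data.interpolate(method='linear', limit=50, limit_direction='both')
#     data = data.dropna()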

# Define the mapping from numerical labels to activity names
label_mapping = {
    0: 'NULL',  # NULL class for label 0
    # Locomotion
    1: 'Stand', 2: 'Walk', 4: 'Sit', 5: 'Lie',
    # High-level activities
    101: 'Relaxing', 102: 'Coffee time', 103: 'Early morning',
    104: 'Cleanup', 105: 'Sandwich time',
    # Arm gestures and manipulated objects; each kind spans two code ranges
    # with identical names, so the label encoder below merges them into one class
    201: 'unlock', 202: 'stir', 203: 'lock', 204: 'close', 205: 'reach',
    206: 'open', 207: 'sip', 208: 'clean', 209: 'bite', 210: 'cut',
    211: 'spread', 212: 'release', 213: 'move',
    301: 'Bottle', 302: 'Salami', 303: 'Bread', 304: 'Sugar', 305: 'Dishwasher',
    306: 'Switch', 307: 'Milk', 308: 'Drawer3 (lower)', 309: 'Spoon',
    310: 'Knife cheese', 311: 'Drawer2 (middle)', 312: 'Table', 313: 'Glass',
    314: 'Cheese', 315: 'Chair', 316: 'Door1', 317: 'Door2', 318: 'Plate',
    319: 'Drawer1 (top)', 320: 'Fridge', 321: 'Cup', 322: 'Knife salami',
    323: 'Lazychair',
    401: 'unlock', 402: 'stir', 403: 'lock', 404: 'close', 405: 'reach',
    406: 'open', 407: 'sip', 408: 'clean', 409: 'bite', 410: 'cut',
    411: 'spread', 412: 'release', 413: 'move',
    501: 'Bottle', 502: 'Salami', 503: 'Bread', 504: 'Sugar', 505: 'Dishwasher',
    506: 'Switch', 507: 'Milk', 508: 'Drawer3 (lower)', 509: 'Spoon',
    510: 'Knife cheese', 511: 'Drawer2 (middle)', 512: 'Table', 513: 'Glass',
    514: 'Cheese', 515: 'Chair', 516: 'Door1', 517: 'Door2', 518: 'Plate',
    519: 'Drawer1 (top)', 520: 'Fridge', 521: 'Cup', 522: 'Knife salami',
    523: 'Lazychair',
    # Mid-level (both arms) composite labels
    406516: 'Open Door 1', 406517: 'Open Door 2',
    404516: 'Close Door 1', 404517: 'Close Door 2',
    406520: 'Open Fridge', 404520: 'Close Fridge',
    406505: 'Open Dishwasher', 404505: 'Close Dishwasher',
    406519: 'Open Drawer 1', 404519: 'Close Drawer 1',
    406511: 'Open Drawer 2', 404511: 'Close Drawer 2',
    406508: 'Open Drawer 3', 404508: 'Close Drawer 3',
    408512: 'Clean Table', 407521: 'Drink from Cup', 405506: 'Toggle Switch'
}


# Feature engineering
def engineer_features(X, y):
    # Rolling statistics over a 10-sample window
    X_rolled_mean = pd.DataFrame(X).rolling(window=10).mean().values
    X_rolled_std = pd.DataFrame(X).rolling(window=10).std().values

    # Rate of change (first difference, zero-padded to keep the sample count)
    X_diff = np.diff(X, axis=0)
    X_diff = np.vstack([np.zeros((1, X.shape[1])), X_diff])

    # Combine features
    X_new = np.hstack([X, X_rolled_mean, X_rolled_std, X_diff])

    # Handle NaN values introduced by the rolling windows
    imputer = SimpleImputer(strategy='mean')
    X_imputed = imputer.fit_transform(X_new)

    # Standardize features (note: the scaler and selector are fitted on the
    # full dataset, before the train/test split, so they also see test data)
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X_imputed)

    # Select the top 5000 features (or all, if fewer are available)
    selector = SelectKBest(f_classif, k=min(5000, X_scaled.shape[1]))
    X_selected = selector.fit_transform(X_scaled, y)

    print(f"Number of features after selection: {X_selected.shape[1]}")
    return X_selected, y


# Model definition
def create_multihead_lstm_se(input_shape, num_classes):
    model = tf.keras.Sequential([
        MultiHeadLSTM(100, num_heads=4, input_shape=input_shape),
        tf.keras.layers.LayerNormalization(),
        tf.keras.layers.Dropout(0.1),
        SEModule(100),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(50, activation='relu',
                              kernel_regularizer=tf.keras.regularizers.l2(0.01)),
        tf.keras.layers.Dropout(0.1),
        tf.keras.layers.Dense(num_classes, activation='softmax')
    ])
    return model


# Assuming the last column is the label and the rest are features
X = data.iloc[:, :-1].values  # Convert to numpy array
y = data.iloc[:, -1].values

print(f"X shape: {X.shape}")
print(f"y shape: {y.shape}")

# Ensure X and y have the same number of samples
min_samples = min(X.shape[0], y.shape[0])
X = X[:min_samples]
y = y[:min_samples]

print(f"X shape after alignment: {X.shape}")
print(f"y shape after alignment: {y.shape}")

# Get unique labels in the dataset
unique_labels = np.unique(y)
print(f"Number of unique labels in the dataset: {len(unique_labels)}")
print(f"Unique labels in the dataset: {unique_labels}")

# Check if all labels in the dataset are in our mapping
if set(unique_labels).issubset(set(label_mapping.keys())):
    print("All labels in the dataset are recognized.")
else:
    unrecognized_labels = set(unique_labels) - set(label_mapping.keys())
    print(f"Warning: Unrecognized labels found in the dataset: {unrecognized_labels}")
    for label in unrecognized_labels:
        label_mapping[label] = f"Unknown_{label}"

# Map numerical labels to activity names
activity_labels = [label_mapping[label] for label in unique_labels]
print(f"\nMapped activity labels: {activity_labels}")

# Create a new label encoder with these activity labels
label_encoder = LabelEncoder()
label_encoder.fit(activity_labels)

# Transform the original numerical labels to encoded class indices
y_transformed = np.array([label_mapping[label] for label in y])
y_encoded = label_encoder.transform(y_transformed)
print(f"\nNumber of classes after encoding: {len(np.unique(y_encoded))}")
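
# The Opportunity labels are heavily dominated by the NULL class, so it is
# worth inspecting the class distribution before training. A minimal sketch
# (an addition to the pipeline, using only names defined above):
class_counts = np.bincount(y_encoded, minlength=len(label_encoder.classes_))
for name, count in sorted(zip(label_encoder.classes_, class_counts),
                          key=lambda pair: -pair[1]):
    print(f"{name}: {count} samples")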

# Replace the original y with the encoded version
y = y_encoded

# Apply feature engineering
X, y = engineer_features(X, y)
print(f"X shape after engineering: {X.shape}")
print(f"y shape after engineering: {y.shape}")

# Reshape input to be 3D [samples, time steps, features]
X = X.reshape((X.shape[0], 1, X.shape[1]))

# Split the data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
print(f"\nTraining set shape: {X_train.shape}")
print(f"Test set shape: {X_test.shape}")

# Create and compile model
input_shape = (X_train.shape[1], X_train.shape[2])
num_classes = len(np.unique(y))
model = create_multihead_lstm_se(input_shape, num_classes)

# Compile model with same settings as Multi-head LSTM
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001, clipnorm=1.0)
model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Callbacks
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10,
                                                  restore_best_weights=True)
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2,
                                                 patience=5, min_lr=0.00001)

# Train model
history = model.fit(X_train, y_train,
                    epochs=100,
                    batch_size=32,
                    validation_split=0.2,
                    callbacks=[early_stopping, reduce_lr],
                    verbose=1)

# Evaluate model
test_loss, test_accuracy = model.evaluate(X_test, y_test, verbose=0)
y_pred = model.predict(X_test).argmax(axis=1)

# Calculate metrics (weighted averages; zero_division=0 suppresses warnings
# for classes that never appear in the predictions)
f1 = f1_score(y_test, y_pred, average='weighted', zero_division=0)
precision = precision_score(y_test, y_pred, average='weighted', zero_division=0)
recall = recall_score(y_test, y_pred, average='weighted', zero_division=0)

# Fix the confusion-matrix size to the full class set so the axis labels in
# the plot below line up even if some classes are missing from the test split
class_names = label_encoder.classes_
cm = confusion_matrix(y_test, y_pred, labels=np.arange(num_classes))
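
# Weighted averages can hide poor performance on rare classes; a per-class
# breakdown makes this visible. A minimal sketch using scikit-learn's
# classification_report (an addition, not part of the original pipeline):
from sklearn.metrics import classification_report
print(classification_report(y_test, y_pred,
                            labels=np.arange(num_classes),
                            target_names=class_names,
                            zero_division=0))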

# Plotting functions
def plot_confusion_matrices():
    fig, ax = plt.subplots(figsize=(12, 12))
    im = ax.imshow(cm, interpolation='nearest', cmap='Blues', aspect='auto')
    ax.set_title("Multi-head LSTM with SE - Confusion Matrix", fontsize=12)
    tick_marks = np.arange(len(class_names))
    ax.set_xticks(tick_marks)
    ax.set_yticks(tick_marks)
    ax.set_xticklabels(class_names, rotation=90, ha='right', fontsize=8)
    ax.set_yticklabels(class_names, fontsize=8)
    thresh = cm.max() / 2.
    for i, j in np.ndindex(cm.shape):
        ax.text(j, i, format(cm[i, j], 'd'),
                ha="center", va="center",
                color="white" if cm[i, j] > thresh else "black",
                fontsize=6)
    ax.set_ylabel('True label')
    ax.set_xlabel('Predicted label')
    fig.colorbar(im, ax=ax, label='Number of samples', orientation='vertical', pad=0.01)
    plt.tight_layout()
    plt.savefig('confusion_matrix_MultiHead_LSTM_SE.png', dpi=300, bbox_inches='tight')
    plt.close()


def plot_accuracy_epochs():
    plt.figure(figsize=(12, 8))
    plt.plot(history.history['accuracy'], label='Training')
    plt.plot(history.history['val_accuracy'], label='Validation')
    plt.title('Model Accuracy - Multi-head LSTM with SE')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.savefig('accuracy_epochs_MultiHead_LSTM_SE.png')
    plt.close()


def plot_loss():
    plt.figure(figsize=(12, 8))
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Model Loss - Multi-head LSTM with SE')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.savefig('loss_MultiHead_LSTM_SE.png')
    plt.close()


def plot_f1_precision_recall():
    metrics = ['F1 Score', 'Precision', 'Recall']
    values = [f1, precision, recall]
    plt.figure(figsize=(10, 6))
    plt.bar(metrics, values)
    plt.title('Model Metrics - Multi-head LSTM with SE')
    plt.ylim(0, 1)
    for i, v in enumerate(values):
        plt.text(i, v + 0.01, f'{v:.4f}', ha='center')
    plt.tight_layout()
    plt.savefig('metrics_MultiHead_LSTM_SE.png')
    plt.close()


# Generate all plots
plot_confusion_matrices()
plot_accuracy_epochs()
plot_loss()
plot_f1_precision_recall()

# Print results
print("\nMulti-head LSTM with SE Results:")
print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")

# Save the model
model.save('multihead_lstm_se_model.h5')
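
# When reloading the saved model, Keras must be told about the custom layers
# (this relies on the get_config methods defined above). A minimal sketch,
# assuming the class definitions are importable wherever the model is loaded:
reloaded = tf.keras.models.load_model(
    'multihead_lstm_se_model.h5',
    custom_objects={'MultiHeadLSTM': MultiHeadLSTM, 'SEModule': SEModule})
reloaded.summary()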