import os
import logging
import tarfile

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf

from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (confusion_matrix, roc_curve, auc,
                             precision_recall_curve, classification_report)
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import (LSTM, Dense, Dropout, Input, Attention,
                                     LayerNormalization, Concatenate,
                                     GlobalAveragePooling1D, Layer)
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger()

# Suppress TensorFlow GPU warnings
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

# Enable memory growth for the GPU
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        logger.error(f"GPU error: {e}")

logger.info("Starting data preprocessing...")

# Paths to the dataset archive and the extraction directory
dataset_path = r'C:\Users\LENOVO LEGION\Desktop\ml codes\ML CODES WISDIM\WISDM_ar_latest.tar.gz'
extract_path = r'C:\Users\LENOVO LEGION\Desktop\ml codes\ML CODES WISDIM\WISDM_ar_latest'

# Extract the dataset
if not os.path.exists(extract_path):
    logger.info("Extracting dataset...")
    with tarfile.open(dataset_path, 'r:gz') as tar:
        tar.extractall(path=extract_path)

# Define the path to the main dataset file
data_file = os.path.join(extract_path, 'WISDM_ar_v1.1', 'WISDM_ar_v1.1_raw.txt')

# Load the dataset, skipping bad lines
logger.info("Loading dataset...")
column_names = ['user', 'activity', 'timestamp', 'x', 'y', 'z']
wisdm_data = pd.read_csv(data_file, header=None, names=column_names, on_bad_lines='skip')

logger.info(f"Initial dataset shape: {wisdm_data.shape}")

# Convert the accelerometer columns to strings so they can be cleaned
wisdm_data['x'] = wisdm_data['x'].astype(str)
wisdm_data['y'] = wisdm_data['y'].astype(str)
wisdm_data['z'] = wisdm_data['z'].astype(str)

# Strip the trailing ';' record terminator from the 'x', 'y', 'z' columns
wisdm_data['x'] = wisdm_data['x'].str.replace(';', '', regex=False)
wisdm_data['y'] = wisdm_data['y'].str.replace(';', '', regex=False)
wisdm_data['z'] = wisdm_data['z'].str.replace(';', '', regex=False)

# Remove rows with non-numeric values. A leading '-' is stripped before the digit
# check so that negative accelerometer readings are kept rather than dropped
# (a plain isdigit() test would discard every negative sample).
def is_numeric(value):
    return value.lstrip('-').replace('.', '', 1).isdigit()

wisdm_data = wisdm_data[wisdm_data['x'].apply(is_numeric)]
wisdm_data = wisdm_data[wisdm_data['y'].apply(is_numeric)]
wisdm_data = wisdm_data[wisdm_data['z'].apply(is_numeric)]

# Convert columns back to numeric
wisdm_data['x'] = pd.to_numeric(wisdm_data['x'])
wisdm_data['y'] = pd.to_numeric(wisdm_data['y'])
wisdm_data['z'] = pd.to_numeric(wisdm_data['z'])

# Handle missing values
wisdm_data = wisdm_data.dropna()

logger.info(f"Dataset shape after cleaning: {wisdm_data.shape}")

# Feature Engineering
logger.info("Performing feature engineering...")

# Calculate the magnitude of the acceleration vector
wisdm_data['magnitude'] = np.sqrt(wisdm_data['x']**2 + wisdm_data['y']**2 + wisdm_data['z']**2)

# Calculate jerk (the time derivative of acceleration). np.divide with a `where`
# mask writes 0 for repeated timestamps instead of emitting divide-by-zero warnings.
for axis in ['x', 'y', 'z']:
    diff = np.diff(wisdm_data[axis].to_numpy())
    time_diff = np.diff(wisdm_data['timestamp'].to_numpy())
    jerk = np.zeros(len(wisdm_data))
    np.divide(diff, time_diff, out=jerk[1:], where=time_diff != 0)
    wisdm_data[f'{axis}_jerk'] = jerk
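# Caveat (hedged): the np.diff loop above runs over the whole frame, so the first
# sample of each user inherits a difference taken against the previous user's last
# sample. A minimal per-user variant using groupby.diff() would look like the
# commented sketch below (same formula, boundaries reset per user); it is left
# inactive so the behaviour of the pipeline above is unchanged.
#
# for axis in ['x', 'y', 'z']:
#     d = wisdm_data.groupby('user')[axis].diff()
#     dt = wisdm_data.groupby('user')['timestamp'].diff()
#     wisdm_data[f'{axis}_jerk'] = (d / dt.replace(0, np.nan)).fillna(0.0)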
# Calculate per-user rolling mean and standard deviation
window_size = 20  # Adjust as needed
for axis in ['x', 'y', 'z']:
    wisdm_data[f'{axis}_rolling_mean'] = (
        wisdm_data.groupby('user')[axis]
        .rolling(window=window_size).mean()
        .reset_index(level=0, drop=True)
    )
    wisdm_data[f'{axis}_rolling_std'] = (
        wisdm_data.groupby('user')[axis]
        .rolling(window=window_size).std()
        .reset_index(level=0, drop=True)
    )

# Handle NaN and infinite values (.ffill()/.bfill() replace the deprecated
# fillna(method=...) form)
wisdm_data = wisdm_data.replace([np.inf, -np.inf], np.nan).ffill().bfill()

# Map activity labels to integers
activity_mapping = {label: idx for idx, label in enumerate(wisdm_data['activity'].unique())}
wisdm_data['activity'] = wisdm_data['activity'].map(activity_mapping)

# Reverse mapping for later use in the confusion matrices
reverse_activity_mapping = {v: k for k, v in activity_mapping.items()}

# Check if the dataset is empty
if wisdm_data.empty:
    raise ValueError("Dataset is empty after preprocessing")

# Normalize the feature data
logger.info("Normalizing features...")
scaler = StandardScaler()
features = ['x', 'y', 'z', 'magnitude',
            'x_jerk', 'y_jerk', 'z_jerk',
            'x_rolling_mean', 'y_rolling_mean', 'z_rolling_mean',
            'x_rolling_std', 'y_rolling_std', 'z_rolling_std']
wisdm_data[features] = scaler.fit_transform(wisdm_data[features])

# Additional check for any remaining infinite values
if np.isinf(wisdm_data[features]).any().any():
    logger.warning("Infinite values detected after normalization. Replacing with large finite values.")
    wisdm_data[features] = wisdm_data[features].replace([np.inf, -np.inf], np.finfo(np.float64).max)

# Reshape the data into fixed-length windows
sequence_length = 200  # Adjust based on your preference

def create_sequences(data, seq_length, step=1):
    # step controls window overlap; step=1 produces maximally overlapping windows
    # (one per sample), which is memory-hungry on the full WISDM recording.
    sequences = []
    labels = []
    for start in range(0, len(data) - seq_length, step):
        sequences.append(data.iloc[start:start + seq_length][features].values)
        labels.append(data.iloc[start + seq_length - 1]['activity'])
    return np.array(sequences), np.array(labels)

logger.info("Creating sequences...")
X, y = create_sequences(wisdm_data, sequence_length)

logger.info(f"Shape of X after sequence creation: {X.shape}")
logger.info(f"Shape of y after sequence creation: {y.shape}")

# Final check for any NaN or infinite values
if np.isnan(X).any() or np.isinf(X).any():
    logger.error("NaN or infinite values detected in the final dataset. "
                 "Please check the data preprocessing steps.")
    raise ValueError("Dataset contains NaN or infinite values after preprocessing.")
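# Note (hedged): with step=1 the windows above overlap heavily, so the random
# train_test_split below can place near-duplicate windows from the same user in
# both train and test, which may inflate the reported test accuracy. A minimal
# sketch of a user-level split, assuming create_sequences were extended to also
# return the 'user' of each window as a hypothetical third value `groups`:
#
# from sklearn.model_selection import GroupShuffleSplit
# gss = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
# train_idx, test_idx = next(gss.split(X, y, groups=groups))
# X_train, X_test = X[train_idx], X[test_idx]
# y_train, y_test = y_categorical[train_idx], y_categorical[test_idx]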
# Encode labels to categorical (one-hot)
y_categorical = to_categorical(y)

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(
    X, y_categorical, test_size=0.2, random_state=42)

logger.info(f"Training set shape: {X_train.shape}")
logger.info(f"Testing set shape: {X_test.shape}")

# Define model building functions
def build_simple_lstm(input_shape, num_classes):
    model = Sequential()
    model.add(LSTM(100, input_shape=input_shape, return_sequences=False))
    model.add(Dropout(0.5))
    model.add(Dense(num_classes, activation='softmax'))
    model.compile(optimizer=Adam(learning_rate=0.001),
                  loss='categorical_crossentropy', metrics=['accuracy'])
    return model

def build_deep_lstm(input_shape, num_classes):
    model = Sequential()
    model.add(LSTM(100, input_shape=input_shape, return_sequences=True))
    model.add(Dropout(0.5))
    model.add(LSTM(100, return_sequences=False))
    model.add(Dropout(0.5))
    model.add(Dense(num_classes, activation='softmax'))
    model.compile(optimizer=Adam(learning_rate=0.001),
                  loss='categorical_crossentropy', metrics=['accuracy'])
    return model

def build_lstm_attention(input_shape, num_classes):
    inputs = Input(shape=input_shape)
    x = LSTM(100, return_sequences=True)(inputs)
    x = Dropout(0.5)(x)
    x = LSTM(100, return_sequences=True)(x)
    x = Dropout(0.5)(x)
    attention = Attention()([x, x])  # self-attention: query = value = x
    attention = LayerNormalization()(attention)
    x = Concatenate()([x, attention])
    x = GlobalAveragePooling1D()(x)
    x = Dense(100, activation='relu')(x)
    x = Dropout(0.5)(x)
    outputs = Dense(num_classes, activation='softmax')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(learning_rate=0.001),
                  loss='categorical_crossentropy', metrics=['accuracy'])
    return model

def build_multi_head_lstm_attention(input_shape, num_classes):
    def multi_head_lstm(input_layer, num_heads, units):
        lstm_heads = [LSTM(units, return_sequences=True)(input_layer)
                      for _ in range(num_heads)]
        return Concatenate()(lstm_heads)

    inputs = Input(shape=input_shape)
    x = multi_head_lstm(inputs, num_heads=3, units=50)
    x = Dropout(0.5)(x)
    attention = Attention()([x, x])
    attention = LayerNormalization()(attention)
    x = Concatenate()([x, attention])
    x = GlobalAveragePooling1D()(x)
    x = Dense(100, activation='relu')(x)
    x = Dropout(0.5)(x)
    outputs = Dense(num_classes, activation='softmax')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(learning_rate=0.001),
                  loss='categorical_crossentropy', metrics=['accuracy'])
    return model

class SqueezeExciteBlock(Layer):
    """Channel-wise squeeze-and-excitation: global pooling followed by a
    bottleneck MLP that produces per-channel gating weights."""

    def __init__(self, ratio=16, **kwargs):
        super(SqueezeExciteBlock, self).__init__(**kwargs)
        self.ratio = ratio

    def build(self, input_shape):
        self.global_avg_pool = GlobalAveragePooling1D()
        self.dense1 = Dense(input_shape[-1] // self.ratio, activation='relu',
                            kernel_initializer='he_normal', use_bias=False)
        self.dense2 = Dense(input_shape[-1], activation='sigmoid',
                            kernel_initializer='he_normal', use_bias=False)
        super(SqueezeExciteBlock, self).build(input_shape)

    def call(self, inputs):
        # Squeeze: (batch, steps, channels) -> (batch, channels)
        x = self.global_avg_pool(inputs)
        # Excite: bottleneck MLP, then broadcast the gates over the time axis
        x = tf.expand_dims(x, axis=1)  # (batch, 1, channels)
        x = self.dense1(x)
        x = self.dense2(x)
        return inputs * x

    def get_config(self):
        config = super(SqueezeExciteBlock, self).get_config()
        config.update({"ratio": self.ratio})
        return config
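# Example (hedged): because SqueezeExciteBlock implements get_config, a model that
# contains it can be saved and reloaded if the class is supplied via
# custom_objects. The file name 'se_model.keras' is illustrative only.
#
# model.save('se_model.keras')
# restored = tf.keras.models.load_model(
#     'se_model.keras',
#     custom_objects={'SqueezeExciteBlock': SqueezeExciteBlock})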
def build_multi_head_lstm_se(input_shape, num_classes):
    def multi_head_lstm(input_layer, num_heads, units):
        lstm_heads = [LSTM(units, return_sequences=True)(input_layer)
                      for _ in range(num_heads)]
        return Concatenate()(lstm_heads)

    inputs = Input(shape=input_shape)
    x = multi_head_lstm(inputs, num_heads=3, units=50)
    x = Dropout(0.5)(x)
    # SE block
    se = SqueezeExciteBlock()(x)
    # Combine LSTM output and SE output
    x = Concatenate()([x, se])
    x = GlobalAveragePooling1D()(x)
    x = Dense(100, activation='relu')(x)
    x = Dropout(0.5)(x)
    outputs = Dense(num_classes, activation='softmax')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(learning_rate=0.001),
                  loss='categorical_crossentropy', metrics=['accuracy'])
    return model

def train_and_evaluate(model, X_train, y_train, X_test, y_test, epochs=20, batch_size=64):
    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2,
                                  patience=5, min_lr=0.00001)
    early_stopping = EarlyStopping(monitor='val_loss', patience=10,
                                   restore_best_weights=True)
    history = model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size,
                        validation_split=0.2, verbose=1,
                        callbacks=[reduce_lr, early_stopping])
    loss, accuracy = model.evaluate(X_test, y_test, verbose=0)
    return model, history, accuracy

input_shape = (X_train.shape[1], X_train.shape[2])
num_classes = y_train.shape[1]

models = {
    'Simple LSTM': build_simple_lstm(input_shape, num_classes),
    'Deep LSTM': build_deep_lstm(input_shape, num_classes),
    'LSTM with Attention': build_lstm_attention(input_shape, num_classes),
    'Multi-head LSTM with Attention': build_multi_head_lstm_attention(input_shape, num_classes),
    'Multi-head LSTM with SE': build_multi_head_lstm_se(input_shape, num_classes),
}

histories = {}
accuracies = {}
f1_scores = {}
precisions = {}
recalls = {}
confusion_matrices = {}

for name, model in models.items():
    logger.info(f'Training {name}...')
    model, history, accuracy = train_and_evaluate(model, X_train, y_train, X_test, y_test)
    histories[name] = history
    accuracies[name] = accuracy

    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true = np.argmax(y_test, axis=1)

    report = classification_report(y_true, y_pred_classes, output_dict=True)
    f1_scores[name] = report['weighted avg']['f1-score']
    precisions[name] = report['weighted avg']['precision']
    recalls[name] = report['weighted avg']['recall']
    confusion_matrices[name] = confusion_matrix(y_true, y_pred_classes)

    logger.info(f'{name} Test Accuracy: {accuracy*100:.2f}%')
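# Diagnostic (hedged addition): weighted averages can hide weak minority classes,
# so also log per-class recall from each confusion matrix. Rows of the matrix are
# true labels, so the diagonal over the row sum gives recall per activity.
for name, cm in confusion_matrices.items():
    row_sums = cm.sum(axis=1)
    per_class_recall = np.divide(cm.diagonal().astype(float), row_sums,
                                 out=np.zeros(len(row_sums)),
                                 where=row_sums > 0)
    for idx, rec in enumerate(per_class_recall):
        logger.info(f'{name} - recall for {reverse_activity_mapping[idx]}: {rec:.3f}')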
logger.info("Starting visualization...")

# Activity names in label-index order, reused for tick labels below
activity_names = [reverse_activity_mapping[i] for i in range(len(reverse_activity_mapping))]

# Plot confusion matrices for the first four models
plt.figure(figsize=(20, 15))
for i, (name, cm) in enumerate(list(confusion_matrices.items())[:4], 1):
    plt.subplot(2, 2, i)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title(f'{name} - Confusion Matrix')
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    tick_marks = np.arange(len(activity_names))
    plt.xticks(tick_marks + 0.5, activity_names, rotation=45, ha='right')
    plt.yticks(tick_marks + 0.5, activity_names, rotation=0)
plt.tight_layout()
plt.show()

# Plot the last confusion matrix separately
plt.figure(figsize=(10, 8))
name, cm = list(confusion_matrices.items())[-1]
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.title(f'{name} - Confusion Matrix')
plt.ylabel('True label')
plt.xlabel('Predicted label')
tick_marks = np.arange(len(activity_names))
plt.xticks(tick_marks + 0.5, activity_names, rotation=45, ha='right')
plt.yticks(tick_marks + 0.5, activity_names, rotation=0)
plt.tight_layout()
plt.show()

# Plot training & validation accuracy values
plt.figure(figsize=(15, 10))
for name, history in histories.items():
    plt.plot(history.history['accuracy'], label=f'{name} Training Accuracy')
    plt.plot(history.history['val_accuracy'], label=f'{name} Validation Accuracy',
             linestyle='dashed')
plt.title('Model Training and Validation Accuracy Comparison')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(loc='lower right')
plt.show()

# Plot training & validation loss values
plt.figure(figsize=(15, 10))
for name, history in histories.items():
    plt.plot(history.history['loss'], label=f'{name} Training Loss')
    plt.plot(history.history['val_loss'], label=f'{name} Validation Loss',
             linestyle='dashed')
plt.title('Model Training and Validation Loss Comparison')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(loc='upper right')
plt.show()

# Validation accuracy comparison over epochs
plt.figure(figsize=(15, 10))
for name, history in histories.items():
    plt.plot(history.history['val_accuracy'], label=name)
plt.title('Validation Accuracy Comparison Over Epochs')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(loc='lower right')
plt.show()

# Box plot for test accuracies (each model contributes a single value)
plt.figure(figsize=(12, 8))
accuracy_data = [[acc] for acc in accuracies.values()]
plt.boxplot(accuracy_data, labels=list(accuracies.keys()))
plt.title('Model Accuracy Comparison')
plt.ylabel('Accuracy')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# Box plot for per-epoch training accuracies
plt.figure(figsize=(12, 8))
training_accuracies = [history.history['accuracy'] for history in histories.values()]
plt.boxplot(training_accuracies, labels=list(models.keys()))
plt.title('Box Plot of Training Accuracies')
plt.ylabel('Accuracy')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()

# Recognition error (1 - training accuracy) over time
plt.figure(figsize=(15, 10))
for name, history in histories.items():
    recognition_error = 1 - np.array(history.history['accuracy'])
    plt.plot(recognition_error, label=f'{name} Recognition Error')
plt.title('Recognition Error Over Time')
plt.ylabel('Recognition Error')
plt.xlabel('Epoch')
plt.legend(loc='upper right')
plt.show()

# F1 Score, Precision, and Recall comparison
plt.figure(figsize=(15, 10))
bar_width = 0.2
index = np.arange(len(models))
plt.bar(index, list(f1_scores.values()), bar_width, label='F1 Score')
plt.bar(index + bar_width, list(precisions.values()), bar_width, label='Precision')
plt.bar(index + 2 * bar_width, list(recalls.values()), bar_width, label='Recall')
plt.xlabel('Model')
plt.ylabel('Score')
plt.title('F1 Score, Precision, and Recall Comparison')
plt.xticks(index + bar_width, list(models.keys()), rotation=45)
plt.legend()
plt.tight_layout()
plt.show()

# Per-class ROC and Precision-Recall curves (one-vs-rest)
fpr = {}
tpr = {}
roc_auc = {}
precision_pr = {}
recall_pr = {}
pr_auc = {}

for name, model in models.items():
    y_pred = model.predict(X_test)
    for i in range(y_test.shape[1]):
        key = f'{name}_class_{i}'
        fpr[key], tpr[key], _ = roc_curve(y_test[:, i], y_pred[:, i])
        roc_auc[key] = auc(fpr[key], tpr[key])
        precision_pr[key], recall_pr[key], _ = precision_recall_curve(y_test[:, i], y_pred[:, i])
        pr_auc[key] = auc(recall_pr[key], precision_pr[key])
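# Summary metric (hedged addition): sklearn's roc_auc_score can macro-average the
# one-vs-rest AUC directly from the one-hot y_test and the predicted
# probabilities, giving one comparable number per model alongside the per-class
# curves plotted below.
from sklearn.metrics import roc_auc_score
for name, model in models.items():
    y_pred = model.predict(X_test, verbose=0)
    logger.info(f'{name} macro one-vs-rest ROC AUC: '
                f'{roc_auc_score(y_test, y_pred, average="macro"):.4f}')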
# Plot ROC curves
plt.figure(figsize=(20, 15))
for name in models.keys():
    for i in range(y_test.shape[1]):
        plt.plot(fpr[f'{name}_class_{i}'], tpr[f'{name}_class_{i}'], lw=2,
                 label='{} (class {} ROC area = {:.2f})'.format(
                     name, reverse_activity_mapping[i], roc_auc[f'{name}_class_{i}']))
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curves')
plt.legend(loc='lower right', fontsize='small')
plt.show()

# Plot Precision-Recall curves
plt.figure(figsize=(20, 15))
for name in models.keys():
    for i in range(y_test.shape[1]):
        plt.plot(recall_pr[f'{name}_class_{i}'], precision_pr[f'{name}_class_{i}'], lw=2,
                 label='{} (class {} PR area = {:.2f})'.format(
                     name, reverse_activity_mapping[i], pr_auc[f'{name}_class_{i}']))
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curves')
plt.legend(loc='lower left', fontsize='small')
plt.show()

# Correlation matrix of the flattened input features
correlation_matrix = pd.DataFrame(X_train.reshape(X_train.shape[0], -1)).corr()
plt.figure(figsize=(12, 10))
sns.heatmap(correlation_matrix, cmap='coolwarm', annot=False)
plt.title('Correlation Matrix of Input Features')
plt.show()

# If the correlation matrix is too large, visualize only a subset
if correlation_matrix.shape[0] > 50:
    plt.figure(figsize=(12, 10))
    sns.heatmap(correlation_matrix.iloc[:50, :50], cmap='coolwarm', annot=False)
    plt.title('Correlation Matrix of First 50 Input Features')
    plt.show()

# Heatmap comparison of the headline metrics using seaborn
def plot_heatmap_metrics_comparison(accuracies, f1_scores, precisions, recalls):
    plt.figure(figsize=(12, 8))
    # Prepare data for the heatmap: one row per model, one column per metric
    data = pd.DataFrame({
        'Accuracy': list(accuracies.values()),
        'Precision': list(precisions.values()),
        'Recall': list(recalls.values()),
        'F1 Score': list(f1_scores.values()),
    }, index=list(accuracies.keys()))
    sns.heatmap(data, annot=True, cmap='YlOrRd', fmt='.4f')
    plt.title('Model Performance Metrics Heatmap')
    plt.tight_layout()
    plt.show()

plot_heatmap_metrics_comparison(accuracies, f1_scores, precisions, recalls)

# Print a summary of the best-performing model
best_model = max(accuracies, key=accuracies.get)
logger.info(f"Best performing model: {best_model}")
logger.info(f"Best accuracy: {accuracies[best_model]:.4f}")
logger.info(f"Best F1 score: {f1_scores[best_model]:.4f}")
logger.info(f"Best precision: {precisions[best_model]:.4f}")
logger.info(f"Best recall: {recalls[best_model]:.4f}")

logger.info("Analysis completed.")
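# Optional (hedged): persist the trained best model for later reuse, assuming a
# TF/Keras version that supports the native .keras format. The file name is
# illustrative only; the format round-trips the custom SqueezeExciteBlock because
# the layer implements get_config.
#
# models[best_model].save('best_har_model.keras')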