import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.impute import SimpleImputer
from sklearn.feature_selection import SelectKBest, f_classif
import tensorflow as tf
# GPU setup
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print("GPU is available")
    except RuntimeError as e:
        print(e)
else:
    print("GPU is not available, using CPU")
# Data loading function
def load_dataset(path):
    data_files = [f for f in os.listdir(path) if f.endswith('.dat')]
    all_data = []
    for file in data_files:
        df = pd.read_csv(os.path.join(path, file), header=None, sep=' ')
        all_data.append(df)
    return pd.concat(all_data, ignore_index=True)
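# Each .dat file presumably holds one recording run (subject/session); concatenating them
# stacks all runs into a single frame, with row order preserved within each file.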
# Load data
path = r"C:\Users\LENOVO LEGION\Downloads\opportunity_dataset\OpportunityUCIDataset\dataset"
data = load_dataset(path)
print(f"Original dataset shape: {data.shape}")
print(f"Number of NaN values: {data.isna().sum().sum()}")
# Remove rows with NaN values
data = data.dropna()
print(f"Dataset shape after removing NaN: {data.shape}")
# Check for infinite values
print(f"Number of infinite values: {np.isinf(data.select_dtypes(include=np.number)).sum().sum()}")
# Replace infinite values with NaN and then drop those rows
data = data.replace([np.inf, -np.inf], np.nan).dropna()
print(f"Dataset shape after removing infinite values: {data.shape}")
# Define the mapping from numerical labels to activity names
label_mapping = {
    0: 'NULL',  # Adding NULL class for label 0
    1: 'Stand', 2: 'Walk', 4: 'Sit', 5: 'Lie',
    101: 'Relaxing', 102: 'Coffee time', 103: 'Early morning', 104: 'Cleanup', 105: 'Sandwich time',
    201: 'unlock', 202: 'stir', 203: 'lock', 204: 'close', 205: 'reach', 206: 'open', 207: 'sip',
    208: 'clean', 209: 'bite', 210: 'cut', 211: 'spread', 212: 'release', 213: 'move',
    301: 'Bottle', 302: 'Salami', 303: 'Bread', 304: 'Sugar', 305: 'Dishwasher', 306: 'Switch',
    307: 'Milk', 308: 'Drawer3 (lower)', 309: 'Spoon', 310: 'Knife cheese', 311: 'Drawer2 (middle)',
    312: 'Table', 313: 'Glass', 314: 'Cheese', 315: 'Chair', 316: 'Door1', 317: 'Door2', 318: 'Plate',
    319: 'Drawer1 (top)', 320: 'Fridge', 321: 'Cup', 322: 'Knife salami', 323: 'Lazychair',
    401: 'unlock', 402: 'stir', 403: 'lock', 404: 'close', 405: 'reach', 406: 'open', 407: 'sip',
    408: 'clean', 409: 'bite', 410: 'cut', 411: 'spread', 412: 'release', 413: 'move',
    501: 'Bottle', 502: 'Salami', 503: 'Bread', 504: 'Sugar', 505: 'Dishwasher', 506: 'Switch',
    507: 'Milk', 508: 'Drawer3 (lower)', 509: 'Spoon', 510: 'Knife cheese', 511: 'Drawer2 (middle)',
    512: 'Table', 513: 'Glass', 514: 'Cheese', 515: 'Chair', 516: 'Door1', 517: 'Door2', 518: 'Plate',
    519: 'Drawer1 (top)', 520: 'Fridge', 521: 'Cup', 522: 'Knife salami', 523: 'Lazychair',
    406516: 'Open Door 1', 406517: 'Open Door 2', 404516: 'Close Door 1', 404517: 'Close Door 2',
    406520: 'Open Fridge', 404520: 'Close Fridge', 406505: 'Open Dishwasher', 404505: 'Close Dishwasher',
    406519: 'Open Drawer 1', 404519: 'Close Drawer 1', 406511: 'Open Drawer 2', 404511: 'Close Drawer 2',
    406508: 'Open Drawer 3', 404508: 'Close Drawer 3', 408512: 'Clean Table', 407521: 'Drink from Cup',
    405506: 'Toggle Switch'
}
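# The mapping combines several OPPORTUNITY label tracks: locomotion (1-5), high-level activities
# (101-105), two sets of arm-action codes (2xx/4xx) and object codes (3xx/5xx), plus combined
# mid-level gesture codes built from an action and an object (e.g. 406516 = 'Open Door 1').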
# Assuming the last column is the label and the rest are features
X = data.iloc[:, :-1].values # Convert to numpy array
y = data.iloc[:, -1].values
print(f"X shape: {X.shape}")
print(f"y shape: {y.shape}")
# Ensure X and y have the same number of samples
min_samples = min(X.shape[0], y.shape[0])
X = X[:min_samples]
y = y[:min_samples]
print(f"X shape after alignment: {X.shape}")
print(f"y shape after alignment: {y.shape}")
# Get unique labels in the dataset
unique_labels = np.unique(y)
print(f"Number of unique labels in the dataset: {len(unique_labels)}")
print(f"Unique labels in the dataset: {unique_labels}")
# Check if all labels in the dataset are in our mapping
if set(unique_labels).issubset(set(label_mapping.keys())):
    print("All labels in the dataset are recognized.")
else:
    unrecognized_labels = set(unique_labels) - set(label_mapping.keys())
    print(f"Warning: Unrecognized labels found in the dataset: {unrecognized_labels}")
    for label in unrecognized_labels:
        label_mapping[label] = f"Unknown_{label}"
# Map numerical labels to activity names
activity_labels = [label_mapping[label] for label in unique_labels]
print(f"\nMapped activity labels: {activity_labels}")
# Create a new label encoder with these activity labels
label_encoder = LabelEncoder()
label_encoder.fit(activity_labels)
# Transform the original numerical labels
y_transformed = np.array([label_mapping[label] for label in y])
y_encoded = label_encoder.transform(y_transformed)
print(f"\nNumber of classes after encoding: {len(np.unique(y_encoded))}")
# Replace the original y with the encoded version
y = y_encoded
# Feature engineering
def engineer_features(X, y):
    # Rolling statistics over a 10-sample window
    X_rolled_mean = pd.DataFrame(X).rolling(window=10).mean().values
    X_rolled_std = pd.DataFrame(X).rolling(window=10).std().values
    # Rate of change (first row padded with zeros so the shape matches X)
    X_diff = np.diff(X, axis=0)
    X_diff = np.vstack([np.zeros((1, X.shape[1])), X_diff])
    # Combine raw signals, rolling statistics, and differences
    X_new = np.hstack([X, X_rolled_mean, X_rolled_std, X_diff])
    # Handle NaN values (the first window-1 rows of the rolling statistics are NaN)
    imputer = SimpleImputer(strategy='mean')
    X_imputed = imputer.fit_transform(X_new)
    # Standardize features
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X_imputed)
    # Select up to the top 5000 features by ANOVA F-score
    selector = SelectKBest(f_classif, k=min(5000, X_scaled.shape[1]))
    X_selected = selector.fit_transform(X_scaled, y)
    print(f"Number of features after selection: {X_selected.shape[1]}")
    return X_selected, y
def create_simple_lstm(input_shape, num_classes):
    model = tf.keras.Sequential([
        tf.keras.layers.LSTM(100, input_shape=input_shape,
                             kernel_regularizer=tf.keras.regularizers.l2(0.01),
                             recurrent_regularizer=tf.keras.regularizers.l2(0.01)),
        tf.keras.layers.Dense(num_classes, activation='softmax')
    ])
    return model
# Apply feature engineering
X, y = engineer_features(X, y)
print(f"X shape after engineering: {X.shape}")
print(f"y shape after engineering: {y.shape}")
# Reshape input to be 3D [samples, time steps, features]
X = X.reshape((X.shape[0], 1, X.shape[1]))
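# Each sample is treated as a sequence of length 1, so the LSTM sees one timestep per row
# rather than a sliding window of consecutive readings.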
# Split the data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
print(f"\nTraining set shape: {X_train.shape}")
print(f"Test set shape: {X_test.shape}")
# Create and compile model
input_shape = (X_train.shape[1], X_train.shape[2])
num_classes = len(np.unique(y))
model = create_simple_lstm(input_shape, num_classes)
# Define optimizer and compile model
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.0001, clipvalue=1.0)
model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
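# sparse_categorical_crossentropy expects integer class indices, which matches the LabelEncoder output above.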
# Callbacks
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.00001)
# Train model
history = model.fit(X_train, y_train, epochs=100, batch_size=16, validation_split=0.2,
callbacks=[early_stopping, reduce_lr], verbose=1)
# Evaluate model
test_loss, test_accuracy = model.evaluate(X_test, y_test, verbose=0)
y_pred = model.predict(X_test).argmax(axis=1)
# Calculate metrics
f1 = f1_score(y_test, y_pred, average='weighted')
precision = precision_score(y_test, y_pred, average='weighted')
recall = recall_score(y_test, y_pred, average='weighted')
cm = confusion_matrix(y_test, y_pred)
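# Weighted averages account for class imbalance; in this dataset the NULL class typically dominates.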
# Plotting functions
def plot_confusion_matrices():
    fig, ax = plt.subplots(figsize=(12, 12))
    # Tick labels must follow the encoder's (alphabetical) class order, not the original numeric-label order
    class_names = label_encoder.classes_
    cm = confusion_matrix(y_test, y_pred, labels=np.arange(len(class_names)))
    im = ax.imshow(cm, interpolation='nearest', cmap='Blues', aspect='auto')
    ax.set_title("Simple LSTM - Confusion Matrix", fontsize=12)
    tick_marks = np.arange(len(class_names))
    ax.set_xticks(tick_marks)
    ax.set_yticks(tick_marks)
    ax.set_xticklabels(class_names, rotation=90, ha='right', fontsize=8)
    ax.set_yticklabels(class_names, fontsize=8)
    thresh = cm.max() / 2.
    for i, j in np.ndindex(cm.shape):
        ax.text(j, i, format(cm[i, j], 'd'),
                ha="center", va="center",
                color="white" if cm[i, j] > thresh else "black",
                fontsize=6)
    ax.set_ylabel('True label')
    ax.set_xlabel('Predicted label')
    fig.colorbar(im, ax=ax, label='Number of samples', orientation='vertical', pad=0.01)
    plt.tight_layout()
    plt.savefig('confusion_matrix_Simple_LSTM.png', dpi=300, bbox_inches='tight')
    plt.close()
def plot_accuracy_epochs():
    plt.figure(figsize=(12, 8))
    plt.plot(history.history['accuracy'], label='Training')
    plt.plot(history.history['val_accuracy'], label='Validation')
    plt.title('Accuracy over Epochs - Simple LSTM')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.savefig('accuracy_epochs_Simple_LSTM.png')
    plt.close()
def plot_loss():
    plt.figure(figsize=(12, 8))
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Model Loss - Simple LSTM')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.savefig('loss_Simple_LSTM.png')
    plt.close()
def plot_f1_precision_recall():
    metrics = ['F1 Score', 'Precision', 'Recall']
    values = [f1, precision, recall]
    plt.figure(figsize=(10, 6))
    plt.bar(metrics, values)
    plt.title('Model Metrics - Simple LSTM')
    plt.ylim(0, 1)
    for i, v in enumerate(values):
        plt.text(i, v + 0.01, f'{v:.4f}', ha='center')
    plt.tight_layout()
    plt.savefig('metrics_Simple_LSTM.png')
    plt.close()
# Generate all plots
plot_confusion_matrices()
plot_accuracy_epochs()
plot_loss()
plot_f1_precision_recall()
# Print results
print("\nSimple LSTM Results:")
print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
# Save the model
model.save('simple_lstm_model.h5')
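# Reloading the saved model later (a minimal sketch; only the filename saved above is assumed):
# loaded_model = tf.keras.models.load_model('simple_lstm_model.h5')
# loaded_model.predict(X_test[:5])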