import os
import copy

import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split, ParameterGrid, ParameterSampler
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score, mean_squared_error

# Load the dataset
file_path = 'C:/Users/Personal/Documents/Datasets/'
file_path2 = 'C:/Users/Personal/Documents/Datasets/DeepModels/'
file_dataset = 'DataSetDef4.csv'
nombre_sin_extension = os.path.splitext(file_dataset)[0]
file_model = nombre_sin_extension + '_deepnn_model.pth'
file_path2 = file_path2 + file_model
file_path = file_path + file_dataset

data = pd.read_csv(file_path)

# Separate the features (X) and the target variable (y)
X = data.drop(columns=['clor_val'])
y = data['clor_val']

# Split the dataset into training and test sets *before* scaling so the
# scaler only sees training data (avoids test-set leakage)
X_train_raw, X_test_raw, y_train_raw, y_test_raw = train_test_split(
    X, y.values, test_size=0.3, random_state=42)

# Normalize the data (fit on the training split, transform both splits)
# and convert to PyTorch tensors
scaler = StandardScaler()
X_train = torch.tensor(scaler.fit_transform(X_train_raw), dtype=torch.float32)
X_test = torch.tensor(scaler.transform(X_test_raw), dtype=torch.float32)
y_train = torch.tensor(y_train_raw, dtype=torch.float32).view(-1, 1)
y_test = torch.tensor(y_test_raw, dtype=torch.float32).view(-1, 1)

# Create PyTorch DataLoaders
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Define the improved deep neural network model
class DeepNN(nn.Module):
    def __init__(self, input_dim):
        super(DeepNN, self).__init__()
        self.layer1 = nn.Linear(input_dim, 1024)
        self.bn1 = nn.BatchNorm1d(1024)
        self.layer2 = nn.Linear(1024, 512)
        self.bn2 = nn.BatchNorm1d(512)
        self.output = nn.Linear(512, 1)
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        x = F.leaky_relu(self.bn1(self.layer1(x)))
        x = self.dropout(x)
        x = F.leaky_relu(self.bn2(self.layer2(x)))
        x = self.dropout(x)
        x = self.output(x)
        return x

# Initialize the model, the loss function, and the optimizer
input_dim = X_train.shape[1]
model = DeepNN(input_dim)
criterion = nn.MSELoss()
# L2 regularization (weight_decay) and an adjusted learning rate
optimizer = torch.optim.Adam(model.parameters(), lr=0.0005, weight_decay=1e-4)

# Scheduler to reduce the learning rate if the loss does not improve
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5)

# Early stopping implementation
class EarlyStopping:
    def __init__(self, patience=10, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.best_loss = None
        self.counter = 0
        self.early_stop = False

    def __call__(self, loss):
        if self.best_loss is None:
            self.best_loss = loss
        elif loss > self.best_loss - self.min_delta:
            # No improvement beyond min_delta: count toward patience
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = loss
            self.counter = 0

# Training function
def train_model(model, train_loader, criterion, optimizer, scheduler, epochs, early_stopping):
    model.train()
    for epoch in range(epochs):
        epoch_loss = 0
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()

        # Scheduler and early stopping both track the mean epoch loss
        mean_loss = epoch_loss / len(train_loader)
        scheduler.step(mean_loss)
        early_stopping(mean_loss)
        print(f'Epoch {epoch+1}/{epochs}, Loss: {mean_loss}')
        if early_stopping.early_stop:
            print("Early stopping triggered")
            break

# Evaluation function
def evaluate_model(model, test_loader, criterion):
    model.eval()
    test_loss = 0
    all_outputs = []
    all_targets = []
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            test_loss += loss.item()
            all_outputs.append(outputs)
            all_targets.append(y_batch)
    all_outputs = torch.cat(all_outputs).cpu().numpy()
    all_targets = torch.cat(all_targets).cpu().numpy()
    r2 = r2_score(all_targets, all_outputs)
    rmse = np.sqrt(mean_squared_error(all_targets, all_outputs))
    print(f'Test Loss: {test_loss/len(test_loader)}')
    print(f'R2 Score: {r2}')
    print(f'RMSE: {rmse}')
    return r2, rmse

# Function to save the model
def save_model(model, file_path):
    torch.save(model.state_dict(), file_path)
    print(f'Model saved to {file_path}')

# Initialize early stopping
early_stopping = EarlyStopping(patience=10, min_delta=0.001)

# Train the model
train_model(model, train_loader, criterion, optimizer, scheduler, epochs=100, early_stopping=early_stopping)

# Evaluate the model
r2, rmse = evaluate_model(model, test_loader, criterion)

# Define the hyperparameter space for the search
param_grid = {
    'learning_rate': [0.00005, 0.0001, 0.0005],
    'dropout_rate': [0.2, 0.3, 0.4],
    'num_neurons': [128, 256, 512],
    'batch_size': [16, 32]
}

# Grid search (exhaustive)
grid_search = list(ParameterGrid(param_grid))

# Random search - 10 random combinations
random_search = list(ParameterSampler(param_grid, n_iter=10, random_state=42))

# Tunable variant of the model: width and dropout rate are hyperparameters.
# Defined once, outside the search loop (and under its own name so it does
# not shadow DeepNN), instead of being redefined on every iteration.
class TunableDeepNN(nn.Module):
    def __init__(self, input_dim, num_neurons, dropout_rate):
        super(TunableDeepNN, self).__init__()
        self.layer1 = nn.Linear(input_dim, num_neurons)
        self.layer2 = nn.Linear(num_neurons, num_neurons // 2)
        self.layer3 = nn.Linear(num_neurons // 2, num_neurons // 4)
        self.output = nn.Linear(num_neurons // 4, 1)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x):
        x = F.leaky_relu(self.layer1(x))
        x = self.dropout(x)
        x = F.leaky_relu(self.layer2(x))
        x = self.dropout(x)
        x = F.leaky_relu(self.layer3(x))
        x = self.output(x)
        return x

best_r2 = -float('inf')
best_rmse = None
best_params = None
best_model = None

for params in random_search:  # swap in grid_search here for an exhaustive search
    print(f"Trying parameter combination: {params}")

    # Rebuild the DataLoaders so the batch_size hyperparameter is actually used
    train_loader = DataLoader(train_dataset, batch_size=params['batch_size'], shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=params['batch_size'], shuffle=False)

    # Create the model with the current hyperparameters
    model = TunableDeepNN(input_dim, params['num_neurons'], params['dropout_rate'])
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=params['learning_rate'])
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5)
    early_stopping = EarlyStopping(patience=10, min_delta=0.001)

    # Train the model with the current parameters
    train_model(model, train_loader, criterion, optimizer, scheduler, epochs=100, early_stopping=early_stopping)
    r2, rmse = evaluate_model(model, test_loader, criterion)

    # Keep the best hyperparameters and a copy of the best model
    if r2 > best_r2:
        best_r2 = r2
        best_rmse = rmse
        best_params = params
        best_model = copy.deepcopy(model)

print(f"Best hyperparameters: {best_params}")
print(f"Best R2 Score: {best_r2}")

# Print the final metrics (from the best trial, not just the last one)
print(f'Final R2 Score: {best_r2}')
print(f'Final RMSE: {best_rmse}')

# Save the best trained model (previously this saved whichever model the
# last search iteration produced)
save_model(best_model, file_path2)