import os
import numpy as np
import pandas as pd
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split, ParameterGrid, ParameterSampler
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score, mean_squared_error
# Load the dataset
file_path = 'C:/Users/Personal/Documents/Datasets/'
file_path2 = 'C:/Users/Personal/Documents/Datasets/DeepModels/'
file_dataset = 'DataSetDef4.csv'
nombre_sin_extension = os.path.splitext(file_dataset)[0]
file_model = nombre_sin_extension + '_deepnn_model.pth'
file_path2 = os.path.join(file_path2, file_model)
file_path = os.path.join(file_path, file_dataset)
data = pd.read_csv(file_path)
# Split features (X) from the target variable (y)
X = data.drop(columns=['clor_val'])
y = data['clor_val']
# Split into training and test sets *before* scaling, so the scaler is fitted
# on training data only (fitting on the full dataset leaks test statistics);
# any future inference data must also go through scaler.transform
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
# Convert to PyTorch tensors
X_train = torch.tensor(X_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_train = torch.tensor(y_train.values, dtype=torch.float32).view(-1, 1)
y_test = torch.tensor(y_test.values, dtype=torch.float32).view(-1, 1)
# Build PyTorch DataLoaders; drop_last avoids a size-1 final batch, which
# BatchNorm1d cannot normalize in training mode
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, drop_last=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
# Define the improved deep neural network model
class DeepNN(nn.Module):
    def __init__(self, input_dim):
        super(DeepNN, self).__init__()
        self.layer1 = nn.Linear(input_dim, 1024)
        self.bn1 = nn.BatchNorm1d(1024)
        self.layer2 = nn.Linear(1024, 512)
        self.bn2 = nn.BatchNorm1d(512)
        self.output = nn.Linear(512, 1)
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        x = F.leaky_relu(self.bn1(self.layer1(x)))
        x = self.dropout(x)
        x = F.leaky_relu(self.bn2(self.layer2(x)))
        x = self.dropout(x)
        x = self.output(x)
        return x
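# Quick illustrative sanity check (not part of training; the input width 8 is
# arbitrary): a freshly initialized DeepNN should map a (batch, input_dim)
# tensor to (batch, 1). eval() makes BatchNorm1d use its running statistics.
_probe = DeepNN(input_dim=8)
_probe.eval()
assert _probe(torch.randn(4, 8)).shape == (4, 1)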
# Initialize the model, loss function, and optimizer
input_dim = X.shape[1]
model = DeepNN(input_dim)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0005, weight_decay=1e-4)  # L2 regularization and tuned LR
# Scheduler that halves the learning rate when the loss stops improving
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5)
# Early stopping implementation
class EarlyStopping:
    def __init__(self, patience=10, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.best_loss = None
        self.counter = 0
        self.early_stop = False

    def __call__(self, loss):
        if self.best_loss is None:
            self.best_loss = loss
        elif loss > self.best_loss - self.min_delta:
            # No sufficient improvement this epoch
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = loss
            self.counter = 0
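# Minimal illustration of the stopping logic with a hypothetical loss
# sequence: after two consecutive non-improving epochs (patience=2) it fires.
_demo = EarlyStopping(patience=2, min_delta=0.0)
for _loss in (1.0, 0.9, 0.95, 0.96):
    _demo(_loss)
print(_demo.early_stop)  # True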
# Training loop
def train_model(model, train_loader, criterion, optimizer, scheduler, epochs, early_stopping):
    model.train()
    for epoch in range(epochs):
        epoch_loss = 0
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        # Step the scheduler and early stopping on the mean epoch loss
        avg_loss = epoch_loss / len(train_loader)
        scheduler.step(avg_loss)
        early_stopping(avg_loss)
        print(f'Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.6f}')
        if early_stopping.early_stop:
            print("Early stopping triggered")
            break
# Evaluation loop
def evaluate_model(model, test_loader, criterion):
    model.eval()
    test_loss = 0
    all_outputs = []
    all_targets = []
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            test_loss += loss.item()
            all_outputs.append(outputs)
            all_targets.append(y_batch)
    all_outputs = torch.cat(all_outputs).cpu().numpy()
    all_targets = torch.cat(all_targets).cpu().numpy()
    r2 = r2_score(all_targets, all_outputs)
    rmse = np.sqrt(mean_squared_error(all_targets, all_outputs))
    print(f'Test Loss: {test_loss/len(test_loader):.6f}')
    print(f'R2 Score: {r2:.4f}')
    print(f'RMSE: {rmse:.4f}')
    return r2, rmse
# Save the trained model weights
def save_model(model, file_path):
    torch.save(model.state_dict(), file_path)
    print(f'Model saved to {file_path}')
# Initialize early stopping
early_stopping = EarlyStopping(patience=10, min_delta=0.001)
# Train the model
train_model(model, train_loader, criterion, optimizer, scheduler, epochs=100, early_stopping=early_stopping)
# Evaluate the model
r2, rmse = evaluate_model(model, test_loader, criterion)
# Define the hyperparameter search space
param_grid = {
    'learning_rate': [0.00005, 0.0001, 0.0005],
    'dropout_rate': [0.2, 0.3, 0.4],
    'num_neurons': [128, 256, 512],
    'batch_size': [16, 32]
}
# Grid search: every combination of the options above
grid_search = list(ParameterGrid(param_grid))
# Random search: 10 randomly sampled combinations
random_search = list(ParameterSampler(param_grid, n_iter=10, random_state=42))
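# Illustrative size check: grid search cost is the product of the option
# counts (3 * 3 * 3 * 2 = 54 here), while random search is capped at n_iter.
print(f"Grid search: {len(grid_search)} combinations; random search: {len(random_search)}")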
best_r2 = -float('inf')
best_rmse = None
best_params = None
best_state = None
# Tunable model variant whose width and dropout come from the search space;
# defined once here rather than redefining a class on every loop iteration
class TunableDeepNN(nn.Module):
    def __init__(self, input_dim, num_neurons, dropout_rate):
        super(TunableDeepNN, self).__init__()
        self.layer1 = nn.Linear(input_dim, num_neurons)
        self.layer2 = nn.Linear(num_neurons, num_neurons // 2)
        self.layer3 = nn.Linear(num_neurons // 2, num_neurons // 4)
        self.output = nn.Linear(num_neurons // 4, 1)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x):
        x = F.leaky_relu(self.layer1(x))
        x = self.dropout(x)
        x = F.leaky_relu(self.layer2(x))
        x = self.dropout(x)
        x = F.leaky_relu(self.layer3(x))
        x = self.output(x)
        return x

for params in random_search:  # swap in grid_search to run the exhaustive search instead
    print(f"Trying parameter combination: {params}")
    # Rebuild the DataLoaders so the sampled batch_size actually takes effect
    train_loader = DataLoader(train_dataset, batch_size=params['batch_size'], shuffle=True, drop_last=True)
    test_loader = DataLoader(test_dataset, batch_size=params['batch_size'], shuffle=False)
    # Build the model with the current hyperparameters
    model = TunableDeepNN(input_dim, params['num_neurons'], params['dropout_rate'])
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=params['learning_rate'])
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5)
    early_stopping = EarlyStopping(patience=10, min_delta=0.001)
    # Train and evaluate with the current parameters
    train_model(model, train_loader, criterion, optimizer, scheduler, epochs=100, early_stopping=early_stopping)
    r2, rmse = evaluate_model(model, test_loader, criterion)
    # Keep the best hyperparameters and model weights seen so far
    if r2 > best_r2:
        best_r2 = r2
        best_rmse = rmse
        best_params = params
        best_state = {k: v.clone() for k, v in model.state_dict().items()}
print(f"Best hyperparameters: {best_params}")
print(f"Best R2 Score: {best_r2}")
# Rebuild the best model found during the search and report its metrics
# (previously the last-tried model was saved, not the best one)
model = TunableDeepNN(input_dim, best_params['num_neurons'], best_params['dropout_rate'])
model.load_state_dict(best_state)
print(f'Final R2 Score: {best_r2}')
print(f'Final RMSE: {best_rmse}')
# Save the best trained model
save_model(model, file_path2)
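# Sketch of reloading the saved weights later (assumes the same TunableDeepNN
# architecture and the best_params used when the state_dict was written):
loaded = TunableDeepNN(input_dim, best_params['num_neurons'], best_params['dropout_rate'])
loaded.load_state_dict(torch.load(file_path2))
loaded.eval()  # disable dropout for inference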