# framework-chl-temp/trainer.py
import pandas as pd
import numpy as np
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score, mean_squared_error
import os
import torch.nn.functional as F
from sklearn.model_selection import ParameterGrid, ParameterSampler

# Load the dataset
file_path = 'C:/Users/Personal/Documents/Datasets/'
file_path2 = 'C:/Users/Personal/Documents/Datasets/DeepModels/'
file_dataset = 'DataSetDef4.csv'
nombre_sin_extension = os.path.splitext(file_dataset)[0]  # dataset name without extension
file_model = nombre_sin_extension + '_deepnn_model.pth'
file_path2 = file_path2 + file_model
file_path = file_path + file_dataset
data = pd.read_csv(file_path)

# Separate the features (X) and the target variable (y)
X = data.drop(columns=['clor_val'])
y = data['clor_val']

# Normalize the features
scaler = StandardScaler()
X = scaler.fit_transform(X)
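# Note: fitting the scaler on the full dataset before the train/test split lets
# test-set statistics leak into training. A stricter variant (sketch, not applied
# here) would split first and fit the scaler on the training portion only:
#   X_tr, X_te, y_tr, y_te = train_test_split(data.drop(columns=['clor_val']), y, test_size=0.3, random_state=42)
#   scaler = StandardScaler().fit(X_tr)
#   X_tr, X_te = scaler.transform(X_tr), scaler.transform(X_te)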

# Convert to PyTorch tensors
X_tensor = torch.tensor(X, dtype=torch.float32)
y_tensor = torch.tensor(y.values, dtype=torch.float32).view(-1, 1)

# Split the dataset into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X_tensor, y_tensor, test_size=0.3, random_state=42)

# Create PyTorch DataLoaders
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Define the improved deep neural network model
class DeepNN(nn.Module):
    def __init__(self, input_dim):
        super(DeepNN, self).__init__()
        self.layer1 = nn.Linear(input_dim, 1024)
        self.bn1 = nn.BatchNorm1d(1024)
        self.layer2 = nn.Linear(1024, 512)
        self.bn2 = nn.BatchNorm1d(512)
        self.output = nn.Linear(512, 1)
        self.dropout = nn.Dropout(0.3)
        
    def forward(self, x):
        x = F.leaky_relu(self.bn1(self.layer1(x)))
        x = self.dropout(x)
        x = F.leaky_relu(self.bn2(self.layer2(x)))
        x = self.dropout(x)
        x = self.output(x)
        return x



# Initialize the model, the loss function, and the optimizer
input_dim = X.shape[1]
model = DeepNN(input_dim)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0005, weight_decay=1e-4)  # L2 regularization (weight decay) and tuned learning rate

# Scheduler to reduce the learning rate when the loss stops improving
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5)
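# Optional sanity check (illustrative): a dummy batch of 4 samples should yield a
# (4, 1) prediction. eval() avoids updating the BatchNorm running statistics with
# random data; train_model() switches back to training mode anyway.
model.eval()
with torch.no_grad():
    assert model(torch.randn(4, input_dim)).shape == (4, 1)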

# Early stopping implementation
class EarlyStopping:
    def __init__(self, patience=10, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.best_loss = None
        self.counter = 0
        self.early_stop = False

    def __call__(self, loss):
        if self.best_loss is None:
            self.best_loss = loss
        elif loss > self.best_loss - self.min_delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = loss
            self.counter = 0
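# Usage: call the instance once per epoch with the average loss; early_stop flips
# to True after `patience` consecutive epochs without an improvement of at least
# `min_delta` over the best loss seen so far.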

# Training function
def train_model(model, train_loader, criterion, optimizer, scheduler, epochs, early_stopping):
    model.train()
    for epoch in range(epochs):
        epoch_loss = 0
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        
        # Step the scheduler and early stopping on the average epoch loss
        avg_loss = epoch_loss / len(train_loader)
        scheduler.step(avg_loss)
        early_stopping(avg_loss)
        print(f'Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.6f}')

        if early_stopping.early_stop:
            print("Early stopping triggered")
            break
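# Note: the scheduler and early stopping above are driven by the average training
# loss. A common variant (not implemented here) monitors a held-out validation
# loss instead, computed from a separate val_loader each epoch, e.g.:
#   scheduler.step(val_loss)
#   early_stopping(val_loss)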

# Evaluation function
def evaluate_model(model, test_loader, criterion):
    model.eval()
    test_loss = 0
    all_outputs = []
    all_targets = []
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            test_loss += loss.item()
            all_outputs.append(outputs)
            all_targets.append(y_batch)

    all_outputs = torch.cat(all_outputs).cpu().numpy()
    all_targets = torch.cat(all_targets).cpu().numpy()

    r2 = r2_score(all_targets, all_outputs)
    rmse = np.sqrt(mean_squared_error(all_targets, all_outputs))

    print(f'Test Loss: {test_loss/len(test_loader)}')
    print(f'R2 Score: {r2}')
    print(f'RMSE: {rmse}')

    return r2, rmse
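# Note: the target 'clor_val' is never scaled, so the R2 and RMSE reported by
# evaluate_model() are in the target's original units.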

# Function to save the model
def save_model(model, file_path):
    torch.save(model.state_dict(), file_path)
    print(f'Model saved to {file_path}')
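# Reloading later requires re-instantiating the same architecture with the same
# constructor arguments before restoring the weights (sketch):
#   model = DeepNN(...)                      # same architecture as when saved
#   model.load_state_dict(torch.load(file_path2))
#   model.eval()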

# Initialize early stopping
early_stopping = EarlyStopping(patience=10, min_delta=0.001)

# Train the model
train_model(model, train_loader, criterion, optimizer, scheduler, epochs=100, early_stopping=early_stopping)

# Evaluate the model
r2, rmse = evaluate_model(model, test_loader, criterion)

# Define the hyperparameter search space
param_grid = {
    'learning_rate': [0.00005, 0.0001, 0.0005],
    'dropout_rate': [0.2, 0.3, 0.4],
    'num_neurons': [128, 256, 512],
    'batch_size': [16, 32]
}

# Grid search (all combinations)
grid_search = list(ParameterGrid(param_grid))

# Random search: 10 random combinations
random_search = list(ParameterSampler(param_grid, n_iter=10, random_state=42))
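# The grid above contains 3*3*3*2 = 54 combinations; ParameterSampler draws 10 of
# them (without replacement, since every parameter is given as a finite list).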

best_r2 = -float('inf')
best_rmse = None
best_params = None
best_model_state = None

for params in random_search:  # swap in grid_search here for an exhaustive grid search
    print(f"Trying parameter combination: {params}")

    # Build the model with the current hyperparameters
    # (this class definition shadows the DeepNN defined above)
    class DeepNN(nn.Module):
        def __init__(self, input_dim, num_neurons, dropout_rate):
            super(DeepNN, self).__init__()
            self.layer1 = nn.Linear(input_dim, num_neurons)
            self.layer2 = nn.Linear(num_neurons, num_neurons // 2)
            self.layer3 = nn.Linear(num_neurons // 2, num_neurons // 4)
            self.output = nn.Linear(num_neurons // 4, 1)
            self.dropout = nn.Dropout(dropout_rate)

        def forward(self, x):
            x = F.leaky_relu(self.layer1(x))
            x = self.dropout(x)
            x = F.leaky_relu(self.layer2(x))
            x = self.dropout(x)
            x = F.leaky_relu(self.layer3(x))
            x = self.output(x)
            return x

    input_dim = X.shape[1]
    model = DeepNN(input_dim, params['num_neurons'], params['dropout_rate'])

    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=params['learning_rate'])
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5)
    early_stopping = EarlyStopping(patience=10, min_delta=0.001)

    # Rebuild the DataLoaders so the sampled batch_size is actually used
    train_loader = DataLoader(train_dataset, batch_size=params['batch_size'], shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=params['batch_size'], shuffle=False)

    # Train and evaluate the model with the current parameters
    train_model(model, train_loader, criterion, optimizer, scheduler, epochs=100, early_stopping=early_stopping)
    r2, rmse = evaluate_model(model, test_loader, criterion)

    # Keep the best hyperparameters and model weights found so far
    if r2 > best_r2:
        best_r2 = r2
        best_rmse = rmse
        best_params = params
        best_model_state = model.state_dict()

print(f"Mejores hiperparámetros: {best_params}")
print(f"Mejor R2 Score: {best_r2}")

# Print the final metrics of the best configuration found
print(f'Final R2 Score: {best_r2}')
print(f'Final RMSE: {best_rmse}')

# Rebuild the best model and save its trained weights
best_model = DeepNN(input_dim, best_params['num_neurons'], best_params['dropout_rate'])
best_model.load_state_dict(best_model_state)
save_model(best_model, file_path2)