# CodeExamples / Face Recognition / lightning.py
import time

import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import transforms

import datasets
import models

# Number of samples per mini-batch used when building the data loaders.
batch_size = 24

# Choose the compute backend in order of preference: CUDA, then MPS, then CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")


# Build the train and test splits from the same directory, in that order.
# NOTE(review): the original carried a trailing "subjects_0-1999_72_imgs"
# remark next to the path, presumably naming a larger dataset directory.
# The transform= (transforms.RandomRotation) and download=True options were
# commented out and are still not passed.
training_data, testing_data = (
    datasets.DigiFaceDataset(path="datasets/DigiFace/minitest", split=split_name)
    for split_name in ("train", "test")
)

# Create data loaders.
# The training loader reshuffles every epoch so that SGD does not see the
# samples in the dataset's storage order on every pass. The test loader
# keeps the dataset order, which makes evaluation runs comparable.
train_dataloader = DataLoader(training_data, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(testing_data, batch_size=batch_size)

# Sanity check: print the shape and dtype of the first test batch only.
for X, y in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

# Instantiate the backbone network, then move it to the selected device.
model = models.Backbone()
model = model.to(device)
def count_parameters(model):
    """Return the total element count of the model's trainable parameters.

    Only parameters whose ``requires_grad`` flag is set are counted.
    """
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total
# Show the network structure followed by its trainable-parameter count.
print(model)
trainable_total = count_parameters(model)
print(trainable_total)

# Disabled single-sample forward pass, kept from the original:
#model.forward(training_data[0][0])

# Cross-entropy objective optimised with Nesterov-momentum SGD.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(
    model.parameters(),
    lr=1e-3,
    momentum=0.9,
    nesterov=True,
)



def train(dataloader, model, loss_fn, optimizer, isLastEpoch=False):
    """Run one optimisation pass over ``dataloader`` and print progress.

    Args:
        dataloader: Iterable of ``(inputs, targets)`` batches. Its ``dataset``
            attribute must support ``len()``; that length is the progress total.
        model: Network to optimise. It is switched to training mode.
        loss_fn: Callable ``loss_fn(predictions, targets)`` returning a scalar
            tensor on which ``backward()`` can be called.
        optimizer: Optimizer that updates the parameters of ``model``.
        isLastEpoch: Accepted for call-site compatibility; currently unused.

    Returns:
        None. The loss (every 50th batch) and the epoch time go to stdout.
    """
    size = len(dataloader.dataset)
    model.train()

    # Move batches to wherever the model's parameters live, so the function
    # does not silently depend on the module-level ``device``. Only a model
    # without parameters falls back to that global.
    first_param = next(model.parameters(), None)
    target = first_param.device if first_param is not None else device

    seen = 0  # Samples processed so far; stays exact when the last batch is short.
    start = time.perf_counter()  # Monotonic clock, suited to measuring durations.
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(target), y.to(target)
        seen += len(X)

        # Clear gradients first so nothing accumulated before this call
        # leaks into the first update.
        optimizer.zero_grad()

        # Compute prediction error
        loss = loss_fn(model(X), y)

        # Backpropagation
        loss.backward()
        optimizer.step()

        if batch % 50 == 0:
            print(f"loss: {loss.item():>7f} [{seen:>5d}/{size:>5d}]")
    print("Epoch time: " + str(time.perf_counter() - start))

def test(dataloader, model, loss_fn):
    """Evaluate ``model`` on ``dataloader`` and print accuracy and mean loss.

    Args:
        dataloader: Iterable of ``(inputs, targets)`` batches supporting
            ``len()``. Its ``dataset`` attribute must also support ``len()``.
        model: Network to evaluate. It is switched to evaluation mode.
        loss_fn: Callable ``loss_fn(predictions, targets)`` returning a scalar
            tensor.

    Returns:
        None. Accuracy (percentage of samples whose arg-max over dimension 1
        equals the target) and the mean of the per-batch losses go to stdout.
    """
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    if size == 0 or num_batches == 0:
        # Nothing to average over; the divisions below would otherwise
        # raise ZeroDivisionError.
        print("Test Error: \n No samples to evaluate \n")
        return

    # Move batches to wherever the model's parameters live. Only a model
    # without parameters falls back to the module-level ``device``.
    first_param = next(model.parameters(), None)
    target = first_param.device if first_param is not None else device

    test_loss, correct = 0.0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(target), y.to(target)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            # Summing the boolean match mask counts the correct predictions.
            correct += (pred.argmax(1) == y).sum().item()
    test_loss /= num_batches
    accuracy = correct / size
    print(f"Test Error: \n Accuracy: {(100*accuracy):>0.1f}%, Avg loss: {test_loss:>8f} \n")

# Alternate one training pass and one evaluation pass for every epoch.
epochs = 100
for epoch_number in range(1, epochs + 1):
    print(f"Epoch {epoch_number}\n-------------------------------")
    # The fifth argument flags the final epoch.
    train(train_dataloader, model, loss_fn, optimizer, epoch_number == epochs)
    test(test_dataloader, model, loss_fn)
print("Done!")

print("Printing layer representations")
# Inference only: make evaluation mode explicit rather than relying on the
# last test() call, and skip gradient tracking.
model.eval()
with torch.no_grad():
    # Only the first test batch is pushed through the network. The original
    # counter check (`i >= 0`) was always true, so the loop already stopped
    # after one batch.
    # NOTE(review): the visualize=True variant of the call is commented out,
    # so whether anything is actually printed depends on the model - confirm.
    for X, y in test_dataloader:
        X, y = X.to(device), y.to(device)
        pred = model(X)#, isNorm=False, isPrint=False, visualize=True)
        break
print("done.")

# Persist only the learned weights (the state dict), not the whole module.
trained_weights = model.state_dict()
torch.save(trained_weights, "model.pth")
print("Saved PyTorch Model State to model.pth")