RoboSkate-RL / scripts / python / RoboSkate / Image_CNN.py
Image_CNN.py
Raw
"""
RoboSkate classify images to driving direction

The expert data with the steering angle is used to train a model that extracts a steering angle from the images.
No good results have been achieved yet.
"""

import torch
from torch import nn
import numpy as np
from torch.utils.data import DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
import torchvision.transforms as T


import os
from torchvision.io import read_image
import pickle


# --------------------------------------------------------------------------------
# ------------------ Dataloder -------- ------------------------------------------
# --------------------------------------------------------------------------------

class RoboSkateImageDataset():

    def loadImageList(self):
        with open("./expert_data/Steering_angle/image_data.pkl", "rb") as fp:  # Pickling
            img_labels = pickle.load(fp)
        self.img_labels = img_labels
        print("Bilder: " + str(len(img_labels)))

    def __init__(self, transform=None, target_transform=None):
        self.loadImageList()
        self.img_dir = "./expert_data/Steering_angle/images/"
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.img_labels)-1

    def __getitem__(self, idx):
        img_path = os.path.join(self.img_dir, self.img_labels[idx][0])
        image = read_image(img_path)
        label = self.img_labels[idx][1]

        autocontraster = T.RandomAutocontrast()
        image = autocontraster(image)
        cropper = T.RandomResizedCrop(size=(80, 80))
        image = cropper(image)

        return image, label






# --------------------------------------------------------------------------------
# ------------------ Model -------------------------------------------------------
# --------------------------------------------------------------------------------


# Define model
class CNNNetwork(nn.Module):
    def __init__(self):
        super(CNNNetwork, self).__init__()
        self.cnn_layers = nn.Sequential(
            # Defining a 2D convolution layer
            nn.Conv2d(3, 32, kernel_size=8, stride=4, padding=0),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            # Defining another 2D convolution layer
            nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=0),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            # Defining another 2D convolution layer
            nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=0),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
        )


        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(6*6*64, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 1)
        )
        


    # Defining the forward pass
    def forward(self, x):
        #print(x.size())
        x = self.cnn_layers(x.float())
        #print(x.size())
        x = x.view(x.size(0), -1)
        #print(x.size())
        x = self.flatten(x)
        #print(x.size())
        logits = self.linear_relu_stack(x)
        #print(logits.size())
        logits = torch.squeeze(logits)
        return logits.double()


# --------------------------------------------------------------------------------
# ------------------ Train loop --------------------------------------------------
# --------------------------------------------------------------------------------
def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device).double()

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"Training loss: {loss:>7f} ")




# --------------------------------------------------------------------------------
# ------------------ Test loop --------------------------------------------------
# --------------------------------------------------------------------------------
def test(dataloader, model):
    size = len(dataloader.dataset)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
    test_loss /= size
    print(f"Test Avg loss: {test_loss:>8f} \n")

# --------------------------------------------------------------------------------
# ------------------ Script --------------------------------------------------
# --------------------------------------------------------------------------------

# create dataset
print("load dataset")
dataset = RoboSkateImageDataset()

batch_size = 64
test_split = .2
shuffle_dataset = True
random_seed= 42

# Creating data indices for training and test splits:
dataset_size = len(dataset)
indices = list(range(dataset_size))
split = int(np.floor(test_split * dataset_size))
if shuffle_dataset :
    np.random.seed(random_seed)
    np.random.shuffle(indices)
train_indices, test_indices = indices[split:], indices[:split]

# Creating PT data samplers and loaders:
train_sampler = SubsetRandomSampler(train_indices)
test_sampler = SubsetRandomSampler(test_indices)

train_dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size,
                                           sampler=train_sampler)
test_dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size,
                                                sampler=test_sampler)






# Display one image and label.
'''
train_features, train_labels = next(iter(train_dataloader))
print(f"Feature batch shape: {train_features.size()}")
print(f"Labels batch shape: {train_labels.size()}")
img = train_features[0].squeeze().permute(1,2,0)
label = train_labels[0]
plt.imshow(img)
plt.title(f"Label: {label}")
plt.show()
print(f"Label: {label}")
'''



# Get cpu or gpu device for training.
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using {} device".format(device))

# create Model
model = CNNNetwork().to(device)
#print(model)

# define lossfunktion and optimizer
loss_fn = nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=5e-6)

if os.path.isfile("./scripts/python/RoboSkate/agent_models/CNN/model.pth"):
    # load previously created model trained model
    model.load_state_dict(torch.load("./scripts/python/RoboSkate/agent_models/CNN/model.pth"))
    print("Found previously model.")


# learning
epochs = 5000
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    test(test_dataloader, model)
print("Done!")



torch.save(model.state_dict(), "./scripts/python/RoboSkate/agent_models/CNN/model.pth")
print("Saved PyTorch Model State to ./scripts/python/RoboSkate/agent_models/CNN/model.pth")