# data sourced from https://www.kaggle.com/code/jesucristo/1-house-prices-solution-top-1
import numpy as np
import pandas as pd
import datetime
import torch
import torch.nn as nn
from sklearn.preprocessing import LabelEncoder

# read only some pertinent columns for predicting sale price
# dropna() drops rows that contain null values
df=pd.read_csv('houseprice.csv',usecols=["SalePrice", "MSSubClass", "MSZoning", "LotFrontage", "LotArea",
                                         "Street", "YearBuilt", "LotShape", "1stFlrSF", "2ndFlrSF"]).dropna()
# 1200 viable properties after dropping, 10 features/attributes
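# (optional sanity check of the counts above)
# print(df.shape)  # expected (1200, 10)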

# replace YearBuilt with Total Years to use house age instead
df['Total Years']=datetime.datetime.now().year-df['YearBuilt']
df.drop("YearBuilt",axis=1,inplace=True)

# all categorical features in df
cat_features=["MSSubClass", "MSZoning", "Street", "LotShape"]
# desired output prediction
out_feature="SalePrice"

# use LabelEncoder to encode each categorical feature as integer indices from 0 to len(unique(feature)) - 1, which the embedding layers need later
lbl_encoders={}
for feature in cat_features:
    lbl_encoders[feature]=LabelEncoder()
    df[feature]=lbl_encoders[feature].fit_transform(df[feature])
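# optional check: each fitted encoder keeps the original labels in index order,
# so the integer codes can be mapped back to the category names later, e.g.:
# print(dict(enumerate(lbl_encoders["MSZoning"].classes_)))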

# use numpy to stack the categorical columns side by side (axis=1), one column per feature, so they can be converted to a tensor
cat_features=np.stack([df['MSSubClass'],df['MSZoning'],df['Street'],df['LotShape']],1)
# convert to tensors
cat_features=torch.tensor(cat_features,dtype=torch.int64)
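# cat_features is now one row per house and one column per categorical feature
# print(cat_features.shape)  # expected torch.Size([1200, 4])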

# make list of all continuous features, excluding SalePrice which is the desired output
cont_features=[col for col in df.columns
               if col not in ["MSSubClass", "MSZoning", "Street", "LotShape", "SalePrice"]]

# place all continuous features into an array, create tensor
cont_values=np.stack([df[i].values for i in cont_features],axis=1)
cont_values=torch.tensor(cont_values,dtype=torch.float)

# convert desired SalePrice output to a tensor, use reshape to make it 2D
y=torch.tensor(df['SalePrice'].values,dtype=torch.float).reshape(-1,1)

# embedding layers, necessary only for the categorical features
# need dimensions of the cat features, the # of unique categories in each feature
cat_dims=[len(df[col].unique()) for col in ["MSSubClass", "MSZoning", "Street", "LotShape"]]

# embedding size rule of thumb: a feature with x unique categories gets the tuple (x, min(50, (x + 1) // 2))
embedding_dim=[(x, min(50, (x + 1) // 2)) for x in cat_dims]
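# per the load example at the bottom of this file, the tuples for this dataset
# come out to [(15, 8), (5, 3), (2, 1), (4, 2)]; the first entry, for MSSubClass,
# embeds 15 category indices into 8-dimensional vectors
# print(embedding_dim)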

# neural network setup
# use ModuleList to store one embedding layer per categorical feature; Embedding converts each category index into a dense vector for the nn
embed_representation=nn.ModuleList([nn.Embedding(inp,out) for inp,out in embedding_dim])

# widen pandas' display limit so long DataFrames print in full later
pd.set_option('display.max_rows', 500)
# embedding_val holds the result of applying each embedding layer to its categorical column (4 of them), so it is a list of 4 tensors
embedding_val=[]
for i,e in enumerate(embed_representation):
    embedding_val.append(e(cat_features[:,i]))

# concatenate the list of tensors column-wise (dim=1) into a single matrix
z = torch.cat(embedding_val, 1)

# dropout randomly deactivates 40% of the embedding values during each forward pass so the model doesn't rely too heavily on the same neurons; this helps prevent overfitting
dropout=nn.Dropout(.4)
final_embed=dropout(z)
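# final_embed has one row per house and one column per embedding dimension,
# i.e. (rows, sum of the embedding output sizes); with this dataset's sizes
# that is expected to be (1200, 8 + 3 + 1 + 2) = (1200, 14)
# print(final_embed.shape)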
# done embedding layers


# implement the nn
# feed forward nn, so the flow of data is one-way
class FeedForwardNN(nn.Module):
    # args: embedding dimension tuples, # of continuous features, output size, list with the # of neurons in each hidden layer, dropout probability
    def __init__(self, embedding_dim, n_cont, out_sz, layers, p=0.5):
        super().__init__()
        self.embeds = nn.ModuleList([nn.Embedding(inp,out) for inp,out in embedding_dim])
        self.emb_drop = nn.Dropout(p)
        # batch-normalize the continuous feature values
        self.bn_cont = nn.BatchNorm1d(n_cont)

        layerlist = []
        # total width of all embedding outputs, i.e. the sum of min(50, (x + 1) // 2) over the features
        n_emb = sum((out for inp,out in embedding_dim))
        # input size is the embedding width plus the # of continuous features
        n_in = n_emb + n_cont

        # i is the # of neurons in each hidden layer
        for i in layers:
            # add linear transformation layer
            layerlist.append(nn.Linear(n_in,i))
            # add ReLU activation layer for non-linearity, zeroing out negative values
            layerlist.append(nn.ReLU(inplace=True))
            layerlist.append(nn.BatchNorm1d(i))
            layerlist.append(nn.Dropout(p))
            n_in = i

        # final layer with 1 output, SalePrice
        layerlist.append(nn.Linear(layers[-1],out_sz))
        # model flows through layers in the order they were added
        self.layers = nn.Sequential(*layerlist)

    def forward(self, x_cat, x_cont):
        embeddings = []
        # apply each embedding layer to its categorical column, then dropout
        for i,e in enumerate(self.embeds):
            embeddings.append(e(x_cat[:,i]))
        x = torch.cat(embeddings, 1)
        x = self.emb_drop(x)
        # batch-normalize the continuous features, then concatenate them with the embeddings
        x_cont = self.bn_cont(x_cont)
        x = torch.cat([x, x_cont], 1)
        x = self.layers(x)
        return x


# set random seed
torch.manual_seed(100)
# create model
model=FeedForwardNN(embedding_dim,len(cont_features),1,[100,50],p=0.4)
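# printing the model (a quick way to inspect the architecture) shows the
# embedding layers, the continuous-feature batch norm, and the Sequential
# stack of Linear -> ReLU -> BatchNorm1d -> Dropout blocks
# print(model)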

# training the nn, use mean-squared error for loss function
loss_function=nn.MSELoss()
# define optimizer, parameters from model and learning rate
optimizer=torch.optim.Adam(model.parameters(),lr=0.01)

# train/test split; all 1200 houses are passed through the network as one full batch
batch_size=1200
# hold out 15% of the data as the test set
test_size=int(batch_size*0.15)
# divide training, testing of cat features/cont features/sale price based on test size
train_categorical=cat_features[:batch_size-test_size]
test_categorical=cat_features[batch_size-test_size:batch_size]
train_cont=cont_values[:batch_size-test_size]
test_cont=cont_values[batch_size-test_size:batch_size]
y_train=y[:batch_size-test_size]
y_test=y[batch_size-test_size:batch_size]
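# note: the slices above split the data in file order; an optional shuffle placed
# before the slicing (a minimal sketch) would randomize which houses are held out:
# idx=torch.randperm(batch_size)
# cat_features,cont_values,y=cat_features[idx],cont_values[idx],y[idx]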

# actual training
# using 5000 iterations through the nn
epochs=5000
final_losses=[]
for i in range(epochs):
    i=i+1
    # training categorical, continuous features
    y_pred=model(train_categorical,train_cont)
    # using RMSE
    loss=torch.sqrt(loss_function(y_pred,y_train))
    # track computed loss as a plain float; item() detaches it from the graph
    final_losses.append(loss.item())
    # print the epoch number and loss every 10 epochs
    if i % 10==1:
        print("Epoch number: {} and the loss : {}".format(i,loss.item()))
    # zero out the accumulated gradients
    optimizer.zero_grad()
    # back propagation
    loss.backward()
    # adjust weights based on calculated loss
    optimizer.step()
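# optional: with final_losses holding plain floats, the training curve can be
# plotted (a minimal sketch, assuming matplotlib is installed):
# import matplotlib.pyplot as plt
# plt.plot(range(1, epochs + 1), final_losses)
# plt.xlabel('Epoch'); plt.ylabel('RMSE loss'); plt.show()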

# validating on the held-out test data
# eval() turns off dropout and uses the batch norm running statistics
model.eval()
# no_grad() disables gradient calculations temporarily for validating
with torch.no_grad():
    y_pred=model(test_categorical,test_cont)
    loss=torch.sqrt(loss_function(y_pred,y_test))
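# optional: report the hold-out RMSE computed above
# print("Test RMSE: {}".format(loss.item()))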

# test targets - 180 rows, i.e. the 15% hold-out split
data_verify=pd.DataFrame(y_test.tolist(),columns=["Test"])
# prediction results
data_predicted=pd.DataFrame(y_pred.tolist(),columns=["Prediction"])

# concatenate to one table for view
final_output=pd.concat([data_verify,data_predicted],axis=1)
final_output['Difference']=final_output['Test']-final_output['Prediction']

# final output displaying the actual house price vs. the model's predicted house price based on the given categorical and continuous features
print(final_output)

# saving the model
torch.save(model,'HousePrice.pt')
torch.save(model.state_dict(),'HouseWeights.pt')

# to load the model
# embs_size=[(15, 8), (5, 3), (2, 1), (4, 2)]
# model1=FeedForwardNN(embs_size,5,1,[100,50],p=0.4)
# model1.load_state_dict(torch.load('HouseWeights.pt'))
# model1.eval()
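# a minimal scoring sketch for one new house (hypothetical example values; assumes
# the lbl_encoders fitted above are still available so the categorical entries can
# be encoded the same way as the training data):
# new_cat=torch.tensor([[lbl_encoders["MSSubClass"].transform([60])[0],
#                        lbl_encoders["MSZoning"].transform(["RL"])[0],
#                        lbl_encoders["Street"].transform(["Pave"])[0],
#                        lbl_encoders["LotShape"].transform(["Reg"])[0]]],dtype=torch.int64)
# # continuous values in the order of cont_features (here assumed to be
# # LotFrontage, LotArea, 1stFlrSF, 2ndFlrSF, Total Years)
# new_cont=torch.tensor([[65.0,8450.0,856.0,854.0,20.0]],dtype=torch.float)
# with torch.no_grad():
#     print(model1(new_cat,new_cont))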