Point-Feature-Histogram / src / hw4_icp.py
hw4_icp.py
Raw
#!/usr/bin/env python

"""
Author: Gillian Minnehan
Date: 11/19/2020

"""

import sys
import math
import utils
import numpy as np
import matplotlib.pyplot as plt
from scipy.spatial.distance import cdist
from scipy.spatial import distance_matrix
import random

def GetTransform(Cp, Cq, CpTot, CqTot):
    """

    Performs point set registration given a set of correspondances, Cp and Cq.

    Returns rotation matrix R and translation t to apply to source point cloud
    to best align the point sets

    Cp - list of numpy 3 x 1 matricies of source point cloud points
    Cq - list of numpy 3 x 1 matricies of target point cloud points in order
    of correpondance with the matching index in Cp
    CpTot - 3 x 1 matrix equal to the summation of all points in Cp
    CqTot - 3 x 1 matrix equal to the summation of all points in Cq

    """
    n = len(Cp) # number of correspondances
    p_centroid = CpTot/n # compute centroids and centered vectors
    q_centroid = CqTot/n
    X = utils.convert_pc_to_matrix(Cp) - p_centroid
    Y = utils.convert_pc_to_matrix(Cq) - q_centroid
    S = (X*Y.T)
    u, s, vh = np.linalg.svd(S) # compute SVD of covariance matrix of centered vectors
    
    # compute R and t
    mat = [[1, 0, 0], [0, 1, 0], [0, 0, np.linalg.det(vh.T*u.T)]]
    #mat = np.identity(16)
    #mat[15,15] = np.linalg.det(vh.T*u.T)
    R = vh.T*mat*u.T
    t = q_centroid - R*p_centroid
    return R, t

def updateP(P, R, t):
    """

    Updates point cloud P with point set registration values

    P - a list of numpy 3 x 1 matrices that represent the source point cloud
    R - rotation matrix computed from point set registration
    t - translation computed from point set registration

    """
    for i in range(0, len(P)):
        P[i] = R*P[i] + t
    return P

def totErr(R, t, Cp, Cq):
    """

    Computes total error for fit using point set registration variables
        R and t

    Cp - list of numpy 3 x 1 matricies of source point cloud points
    Cq - list of numpy 3 x 1 matricies of target point cloud points in order
    of correpondance with the matching index in Cp
    R - rotation matrix computed from point set registration
    t - translation computed from point set registration

    """
    total = 0
    for i in range(0, len(Cp)):
        total += pow(np.linalg.norm((R*Cp[i] + t) - Cq[i]), 2)
    #print(total)
    return total

def similarSig(pSig, qSig, t):
    """

    Returns whether pSig and qSig are similar.

    pSig - 8 x 1 matrix signature for point in P
    qSig - 8 x 1 matrix signature for point in Q
    t - accepted threshold of similarity

    """
    # for i in range(len(pSig)):
    #     if abs(qSig[i] - pSig[i]) >= t:
    #         return False
    if np.linalg.norm(pSig-qSig) >= t:
        return False
    return True

def sacError(e, t):
    """

    Compute error using Huber penalty measure.

    """
    if abs(e) <= t:
        return 0.5 * (e ** 2)
    else:
        return 0.5 * t * (2 * abs(e) - t)

def icp(P, Q, pSig, qSig, eta, sacIA):
    """

    Returns input P point cloud aligned to target Q point cloud

    P - a list of numpy 3 x 1 matrices that represent the source point cloud
    Q - a list of numpy 3 x 1 matrices that represent the target point cloud
    eta - accepted threshold of error for the computed fit

    """
    errors = []
    prevErr = float("inf")
    firstRun = True
    while True:
        CpTot = np.asmatrix([0.0,0.0,0.0]).T
        CqTot = np.asmatrix([0.0,0.0,0.0]).T
        # add pfh signature computation
        all_dists = []
        if sacIA and firstRun:
            bestErr = float("inf")
            bestR = 0
            bestT = 0
            numSample = 10
            pairDist = distance_matrix(np.squeeze(P, axis=2), np.squeeze(P, axis=2))
            distanceThreshold = np.median(pairDist)
            ids = np.argwhere(pairDist > distanceThreshold)
            samplePoints = []
            wrong = False
            for i in range(10):
                random.seed(0)
                samplePoints.append(ids[random.randint(0, len(ids))][0]) # random point
                for point1 in range(len(P)):
                    if point1 in samplePoints:
                        continue
                    for point2 in samplePoints:
                        if point1 > point2:
                            if [point2, point1] not in ids:
                                wrong = True
                                break
                        else:
                            if [point1, point2] not in ids:
                                wrong = True
                                break
                        # if distance_matrix(P[point1], P[point2]) < distanceThreshold:
                        #     wrong = True
                        #     break
                    if wrong:
                        wrong = False
                        continue
                    samplePoints.append(point1)
                    if len(samplePoints) >= numSample:
                        break

                correspondances = {}
                for pIndex in samplePoints: # find a list of points in Q with similar FPFH
                    similarSignatures = []
                    sig1 = pSig[pIndex]
                    for qIndex in range(len(qSig)):
                        sig2 = qSig[qIndex]
                        if similarSig(sig1, sig2, 0.1): # if signatures similar
                            similarSignatures.append(qIndex)
                        if len(similarSignatures) == 5: # stop when we find 5 similar sig
                            break
                    correspondances[pIndex] = similarSignatures 
                # print(correspondances)
                
                for key in correspondances:
                    val = correspondances[key]
                    correspondances[key] = val[random.randint(0, len(val) - 1)] # random select one sig
                Cp = []
                Cq = []
                for point1 in correspondances:
                    point2 = correspondances[point1]
                    CpTot += P[point1]
                    CqTot += Q[point2]
                    Cp.append(P[point1])
                    Cq.append(Q[point2])
                R, t = GetTransform(Cp, Cq, CpTot, CqTot) # compute the transformation
                error = totErr(R, t, Cp, Cq)
                error = sacError(error, 0.5)
                if error < bestErr:
                    bestErr = error
                    bestR = R
                    bestT = t
                print(error)
            firstRun = False
            P = updateP(P, bestR, bestT)   
            print("Transition to normal ICP:")         
        else:
            Cq = []
            if firstRun:
                all_dists = cdist(np.squeeze(pSig, axis=2), np.squeeze(qSig, axis=2), 'euclidean')
                firstRun = False
            else:
                all_dists = cdist(np.squeeze(P, axis=2), np.squeeze(Q, axis=2), 'euclidean')
            indices = all_dists.argmin(axis=1)
            distances = all_dists[np.arange(all_dists.shape[0]), indices]
            for i in range(len(P)):
                CpTot += P[i]
                CqTot += Q[indices[i]]
                Cq.append(Q[indices[i]])
            R, t = GetTransform(P, Cq, CpTot, CqTot) # compute the transformation
            error = totErr(R, t, P, Cq)
            errors.append(error)
            print(error)
            if(error < eta): # fit within in threshold
                return P, errors
            prevErr = error
            P = updateP(P, R, t) # update all p
        # print(error)

def icp2(P, Q, eta):
    """

    Returns input P point cloud aligned to target Q point cloud

    P - a list of numpy 3 x 1 matrices that represent the source point cloud
    Q - a list of numpy 3 x 1 matrices that represent the target point cloud
    eta - accepted threshold of error for the computed fit

    """
    errors = []
    prevErr = float("inf")
    while True:
        Cq = []
        CpTot = np.asmatrix([0.0,0.0,0.0]).T
        CqTot = np.asmatrix([0.0,0.0,0.0]).T
        # add pfh signature computation
        all_dists = cdist(np.squeeze(P, axis=2), np.squeeze(Q, axis=2), 'euclidean')
        indices = all_dists.argmin(axis=1)
        distances = all_dists[np.arange(all_dists.shape[0]), indices]
        for i in range(len(P)):
            CpTot += P[i]
            CqTot += Q[indices[i]]
            Cq.append(Q[indices[i]])
        R, t = GetTransform(P, Cq, CpTot, CqTot) # compute the transformation
        error = totErr(R, t, P, Cq)
        errors.append(error)
        if(error < eta): # fit within in threshold
            return P, errors
        prevErr = error
        P = updateP(P, R, t) # update all p
        print(error)

def main():

    # Import the clouds
    pc_source = utils.load_pc('/home/parallels/Documents/eecs498/final_project/data/hw4/cloud_icp_source.csv')
    pc_target = utils.load_pc('/home/parallels/Documents/eecs498/final_project/data/hw4/cloud_icp_target0.csv') # Change file for new target
    
    # Compute the aligned point cloud and errors per iterations
    pc_source, errors = icp(pc_source, pc_target, 0.05)

    # Plot aligned point cloud and target point cloud
    utils.view_pc([pc_source, pc_target], None, ['b', 'r'], ['o', '^'])
    plt.axis([-0.15, 0.15, -0.15, 0.15])

    # Plot error vs. num iterations
    fig = plt.figure()
    ax = fig.add_subplot(1,1,1)
    xdata = np.arange(0,len(errors))
    ax.plot(xdata, errors, color='tab:blue')
    ax.set_xlabel('num iterations')
    ax.set_ylabel('error')
    ax.set_title('Error v. Iterations for ICP')

    raw_input("Press enter to end:")

if __name__ == '__main__':
    main()