#!/usr/bin/env python """ Author: Gillian Minnehan Date: 11/19/2020 """ import sys import math import utils import numpy as np import matplotlib.pyplot as plt from scipy.spatial.distance import cdist from scipy.spatial import distance_matrix import random def GetTransform(Cp, Cq, CpTot, CqTot): """ Performs point set registration given a set of correspondances, Cp and Cq. Returns rotation matrix R and translation t to apply to source point cloud to best align the point sets Cp - list of numpy 3 x 1 matricies of source point cloud points Cq - list of numpy 3 x 1 matricies of target point cloud points in order of correpondance with the matching index in Cp CpTot - 3 x 1 matrix equal to the summation of all points in Cp CqTot - 3 x 1 matrix equal to the summation of all points in Cq """ n = len(Cp) # number of correspondances p_centroid = CpTot/n # compute centroids and centered vectors q_centroid = CqTot/n X = utils.convert_pc_to_matrix(Cp) - p_centroid Y = utils.convert_pc_to_matrix(Cq) - q_centroid S = (X*Y.T) u, s, vh = np.linalg.svd(S) # compute SVD of covariance matrix of centered vectors # compute R and t mat = [[1, 0, 0], [0, 1, 0], [0, 0, np.linalg.det(vh.T*u.T)]] #mat = np.identity(16) #mat[15,15] = np.linalg.det(vh.T*u.T) R = vh.T*mat*u.T t = q_centroid - R*p_centroid return R, t def updateP(P, R, t): """ Updates point cloud P with point set registration values P - a list of numpy 3 x 1 matrices that represent the source point cloud R - rotation matrix computed from point set registration t - translation computed from point set registration """ for i in range(0, len(P)): P[i] = R*P[i] + t return P def totErr(R, t, Cp, Cq): """ Computes total error for fit using point set registration variables R and t Cp - list of numpy 3 x 1 matricies of source point cloud points Cq - list of numpy 3 x 1 matricies of target point cloud points in order of correpondance with the matching index in Cp R - rotation matrix computed from point set registration t - translation computed from point set registration """ total = 0 for i in range(0, len(Cp)): total += pow(np.linalg.norm((R*Cp[i] + t) - Cq[i]), 2) #print(total) return total def similarSig(pSig, qSig, t): """ Returns whether pSig and qSig are similar. pSig - 8 x 1 matrix signature for point in P qSig - 8 x 1 matrix signature for point in Q t - accepted threshold of similarity """ # for i in range(len(pSig)): # if abs(qSig[i] - pSig[i]) >= t: # return False if np.linalg.norm(pSig-qSig) >= t: return False return True def sacError(e, t): """ Compute error using Huber penalty measure. """ if abs(e) <= t: return 0.5 * (e ** 2) else: return 0.5 * t * (2 * abs(e) - t) def icp(P, Q, pSig, qSig, eta, sacIA): """ Returns input P point cloud aligned to target Q point cloud P - a list of numpy 3 x 1 matrices that represent the source point cloud Q - a list of numpy 3 x 1 matrices that represent the target point cloud eta - accepted threshold of error for the computed fit """ errors = [] prevErr = float("inf") firstRun = True while True: CpTot = np.asmatrix([0.0,0.0,0.0]).T CqTot = np.asmatrix([0.0,0.0,0.0]).T # add pfh signature computation all_dists = [] if sacIA and firstRun: bestErr = float("inf") bestR = 0 bestT = 0 numSample = 10 pairDist = distance_matrix(np.squeeze(P, axis=2), np.squeeze(P, axis=2)) distanceThreshold = np.median(pairDist) ids = np.argwhere(pairDist > distanceThreshold) samplePoints = [] wrong = False for i in range(10): random.seed(0) samplePoints.append(ids[random.randint(0, len(ids))][0]) # random point for point1 in range(len(P)): if point1 in samplePoints: continue for point2 in samplePoints: if point1 > point2: if [point2, point1] not in ids: wrong = True break else: if [point1, point2] not in ids: wrong = True break # if distance_matrix(P[point1], P[point2]) < distanceThreshold: # wrong = True # break if wrong: wrong = False continue samplePoints.append(point1) if len(samplePoints) >= numSample: break correspondances = {} for pIndex in samplePoints: # find a list of points in Q with similar FPFH similarSignatures = [] sig1 = pSig[pIndex] for qIndex in range(len(qSig)): sig2 = qSig[qIndex] if similarSig(sig1, sig2, 0.1): # if signatures similar similarSignatures.append(qIndex) if len(similarSignatures) == 5: # stop when we find 5 similar sig break correspondances[pIndex] = similarSignatures # print(correspondances) for key in correspondances: val = correspondances[key] correspondances[key] = val[random.randint(0, len(val) - 1)] # random select one sig Cp = [] Cq = [] for point1 in correspondances: point2 = correspondances[point1] CpTot += P[point1] CqTot += Q[point2] Cp.append(P[point1]) Cq.append(Q[point2]) R, t = GetTransform(Cp, Cq, CpTot, CqTot) # compute the transformation error = totErr(R, t, Cp, Cq) error = sacError(error, 0.5) if error < bestErr: bestErr = error bestR = R bestT = t print(error) firstRun = False P = updateP(P, bestR, bestT) print("Transition to normal ICP:") else: Cq = [] if firstRun: all_dists = cdist(np.squeeze(pSig, axis=2), np.squeeze(qSig, axis=2), 'euclidean') firstRun = False else: all_dists = cdist(np.squeeze(P, axis=2), np.squeeze(Q, axis=2), 'euclidean') indices = all_dists.argmin(axis=1) distances = all_dists[np.arange(all_dists.shape[0]), indices] for i in range(len(P)): CpTot += P[i] CqTot += Q[indices[i]] Cq.append(Q[indices[i]]) R, t = GetTransform(P, Cq, CpTot, CqTot) # compute the transformation error = totErr(R, t, P, Cq) errors.append(error) print(error) if(error < eta): # fit within in threshold return P, errors prevErr = error P = updateP(P, R, t) # update all p # print(error) def icp2(P, Q, eta): """ Returns input P point cloud aligned to target Q point cloud P - a list of numpy 3 x 1 matrices that represent the source point cloud Q - a list of numpy 3 x 1 matrices that represent the target point cloud eta - accepted threshold of error for the computed fit """ errors = [] prevErr = float("inf") while True: Cq = [] CpTot = np.asmatrix([0.0,0.0,0.0]).T CqTot = np.asmatrix([0.0,0.0,0.0]).T # add pfh signature computation all_dists = cdist(np.squeeze(P, axis=2), np.squeeze(Q, axis=2), 'euclidean') indices = all_dists.argmin(axis=1) distances = all_dists[np.arange(all_dists.shape[0]), indices] for i in range(len(P)): CpTot += P[i] CqTot += Q[indices[i]] Cq.append(Q[indices[i]]) R, t = GetTransform(P, Cq, CpTot, CqTot) # compute the transformation error = totErr(R, t, P, Cq) errors.append(error) if(error < eta): # fit within in threshold return P, errors prevErr = error P = updateP(P, R, t) # update all p print(error) def main(): # Import the clouds pc_source = utils.load_pc('/home/parallels/Documents/eecs498/final_project/data/hw4/cloud_icp_source.csv') pc_target = utils.load_pc('/home/parallels/Documents/eecs498/final_project/data/hw4/cloud_icp_target0.csv') # Change file for new target # Compute the aligned point cloud and errors per iterations pc_source, errors = icp(pc_source, pc_target, 0.05) # Plot aligned point cloud and target point cloud utils.view_pc([pc_source, pc_target], None, ['b', 'r'], ['o', '^']) plt.axis([-0.15, 0.15, -0.15, 0.15]) # Plot error vs. num iterations fig = plt.figure() ax = fig.add_subplot(1,1,1) xdata = np.arange(0,len(errors)) ax.plot(xdata, errors, color='tab:blue') ax.set_xlabel('num iterations') ax.set_ylabel('error') ax.set_title('Error v. Iterations for ICP') raw_input("Press enter to end:") if __name__ == '__main__': main()