import cv2
import numpy as np
import queue
import threading
import os
import sys
import json
import timeit
from scipy.signal import savgol_filter, savgol_coeffs
# Import the compiled acceleration module (plastic_sorting) from the local "build" directory.
build_dir = os.path.relpath("build")
sys.path.append(build_dir)
import build.plastic_sorting as wrapper
class HyperSpectralCamera(cv2.VideoCapture):
    """Video capture that always hands out the most recently grabbed frame.

    On construction the configuration file at ``CONFIG_PATH`` is loaded onto the
    device with ``quark-ctl`` and is also parsed to set frame width, height,
    frame rate and exposure on the capture. A daemon thread then reads frames
    continuously and keeps only the newest one in ``self.queue``.

    NOTE: ``read`` intentionally differs from ``cv2.VideoCapture.read``: it
    returns the frame alone (no ``ret`` flag) and raises ``queue.Empty`` when
    no frame becomes available in time.
    """

    # Single source for the configuration file, used both by the quark-ctl
    # call and when reading the capture properties.
    CONFIG_PATH = '/home/root/newhc/livetests/config.json'

    def __init__(self, path='/dev/qtec/video0', API=cv2.CAP_V4L2, *args, **kwargs):
        """Configure the camera, open the capture and start the reader thread.

        Args:
            path: Device path handed to ``cv2.VideoCapture``.
            API: Capture backend handed to ``cv2.VideoCapture``.
            *args, **kwargs: Forwarded unchanged to ``cv2.VideoCapture``.
        """
        # The exit status of quark-ctl is not checked (best effort).
        os.system(f'quark-ctl config load --file "{self.CONFIG_PATH}"')
        super().__init__(path, API, *args, **kwargs)

        with open(self.CONFIG_PATH) as f:
            settings = json.load(f)

        # Elements 2 and 3 of the first crop entry are used as width and height.
        first_crop = settings['crops'][0]
        self.set(cv2.CAP_PROP_FRAME_WIDTH, first_crop[2])
        self.set(cv2.CAP_PROP_FRAME_HEIGHT, first_crop[3])
        self.set(cv2.CAP_PROP_FPS, settings['framerate'])
        self.set(cv2.CAP_PROP_EXPOSURE, settings['controls']['exposure_time_absolute']['value'])

        # Queue holding the latest frame.
        self.queue = queue.Queue()

        # Daemon thread, so it does not keep the process alive on exit.
        self.reader_thread = threading.Thread(target=self._reader, daemon=True)
        self.reader_thread.start()

    def _reader(self):
        """Read frames in a loop, keeping only the newest one in the queue.

        The loop ends as soon as the underlying capture reports a failed read.
        """
        while True:
            ret, frame = super().read()
            if not ret:
                break
            if not self.queue.empty():
                try:
                    # Drop the stale frame so the consumer only sees the newest.
                    self.queue.get_nowait()
                except queue.Empty:
                    # The consumer took it between the check and the get.
                    pass
            self.queue.put(frame)

    def read(self, timeout=5):
        """Return the latest frame as a 2-D array.

        Args:
            timeout: Seconds to wait for a frame from the reader thread.

        Returns:
            The first channel of the captured frame, transposed. Per the
            original author's note, OpenCV delivers (height, width, 3) with a
            redundant last axis, so the result is laid out (pixels, channels).

        Raises:
            queue.Empty: If no frame arrives within ``timeout`` seconds, for
                example after the reader thread stopped on a failed read.
        """
        try:
            frame = self.queue.get(timeout=timeout)
        except queue.Empty:
            raise queue.Empty(
                f"no frame received from the camera within {timeout} s"
            ) from None
        return frame[:, :, 0].transpose()
def convert_colors(frame, *, colorize=False):
    """Optionally map a 2-D array of class labels to a colour image.

    Args:
        frame: 2-D array of integer class labels.
        colorize: When False (the default) ``frame`` is returned unchanged,
            which is what the function has always done. When True, labels
            0-9 are painted with a fixed palette.

    Returns:
        ``frame`` itself when ``colorize`` is False; otherwise a new
        ``(rows, cols, 3)`` uint8 array. Labels outside 0-9 stay black.
    """
    if not colorize:
        return frame

    # Colour names follow BGR channel order, which is how cv2.imshow
    # interprets 3-channel images.
    colors = [
        (255, 0, 0),      # Class 0 - Blue
        (0, 255, 0),      # Class 1 - Green
        (0, 0, 255),      # Class 2 - Red
        (255, 255, 0),    # Class 3 - Cyan
        (255, 0, 255),    # Class 4 - Magenta
        (0, 255, 255),    # Class 5 - Yellow
        (128, 0, 0),      # Class 6 - Navy
        (0, 128, 0),      # Class 7 - Dark Green
        (0, 0, 128),      # Class 8 - Maroon
        (128, 128, 128),  # Class 9 - Gray
    ]
    color_frame = np.zeros((frame.shape[0], frame.shape[1], 3), dtype=np.uint8)
    for label, color in enumerate(colors):
        color_frame[frame == label] = color
    return color_frame
def _load_float32(path):
    """Read a ``.npy`` file and return its contents cast to float32."""
    return np.load(path).astype(np.float32)


# Non-calibrated SVM model (PCA components and SVM coefficients are kept transposed).
pca_components_noncalibrated = _load_float32("models/noncalibratedSVM/pca_model_components.npy").T
pca_mean_noncalibrated = _load_float32("models/noncalibratedSVM/pca_model_mean.npy")
svm_coef_noncalibrated = _load_float32("models/noncalibratedSVM/svm_model_coef.npy").T
svm_intercept_noncalibrated = _load_float32("models/noncalibratedSVM/svm_model_intercept.npy")

# Calibrated SVM model used by the pure NumPy implementation (also kept transposed).
pca_components_calibrated = _load_float32("models/calibratedSVM/pca_model_components.npy").T
pca_mean_calibrated = _load_float32("models/calibratedSVM/pca_model_mean.npy")
svm_coef_calibrated = _load_float32("models/calibratedSVM/svm_model_coef.npy").T
svm_intercept_calibrated = _load_float32("models/calibratedSVM/svm_model_intercept.npy")
white_matrix = _load_float32("models/calibratedSVM/whiteMatrix.npy")

# Separate copies of the calibrated model for the accelerated kernel: loaded
# again from disk, not transposed, with the two matrices forced contiguous.
pca_components_calibrated_GPU = np.ascontiguousarray(
    _load_float32("models/calibratedSVM/pca_model_components.npy")
)
pca_mean_calibrated_GPU = _load_float32("models/calibratedSVM/pca_model_mean.npy")
svm_coef_calibrated_GPU = _load_float32("models/calibratedSVM/svm_model_coef.npy")
svm_intercept_calibrated_GPU = _load_float32("models/calibratedSVM/svm_model_intercept.npy")
white_matrix_GPU = np.ascontiguousarray(
    _load_float32("models/calibratedSVM/whiteMatrix.npy")
)

# Kernel object from the compiled wrapper, configured with the calibrated model
# and Savitzky-Golay coefficients (window 13, polynomial order 3, derivative 0).
impl = wrapper.Kernel()
savgol_coef = savgol_coeffs(13, 3, 0)
impl.load_model(
    pca_components_calibrated_GPU,
    pca_mean_calibrated_GPU,
    SVM=svm_coef_calibrated_GPU,
    SVM_intercept=svm_intercept_calibrated_GPU,
    whitebalance=white_matrix_GPU,
    savgol_coef=savgol_coef,
    Wl_min=0,
    Wl_max=600,
    savgol_toggle=1,
    normalization_toggle=0,
    absorption_toggle=0,
)

# Shared uint8 result array handed to every kernel call; one entry per row of
# the white matrix.
output = np.zeros(white_matrix.shape[0], dtype=np.uint8)
def noncalibrated_svm(line, print_time=False, verbose=False):
    """Classify one line through the kernel's ``runCPU_minimal`` entry point.

    Args:
        line: Input passed straight to the kernel.
        print_time: Forwarded to the kernel.
        verbose: Forwarded to the kernel.

    Returns:
        The module-level ``output`` array that was handed to the kernel
        (presumably filled in place by it). The same object is returned on
        every call, so copy it if the values must survive the next call.
    """
    kernel_flags = {"print_time": print_time, "verbose": verbose}
    impl.runCPU_minimal(line, output, **kernel_flags)
    return output
def calibrated_svm(line, windowlenght=13, polyorder=3, deriv=0, print_time=False, verbose=False):
    """Classify one line with the calibrated model using NumPy and SciPy only.

    Pipeline: Savitzky-Golay filtering along axis 1, division by the
    module-level ``white_matrix`` scaled by 255, PCA (subtract
    ``pca_mean_calibrated``, project onto ``pca_components_calibrated``),
    then the linear SVM decision function followed by an arg-max over axis 1.

    Args:
        line: 2-D array; the filter runs along axis 1.
        windowlenght: Savitzky-Golay window length (spelling kept for
            backward compatibility with keyword callers).
        polyorder: Savitzky-Golay polynomial order.
        deriv: Savitzky-Golay derivative order.
        print_time: When True, print timing information.
        verbose: With ``print_time``, print per-stage timings instead of one
            total.

    Returns:
        1-D array with the index of the highest-scoring class for each row.
    """
    # Timestamps are taken unconditionally so the pipeline exists only once;
    # the cost of the timer calls is negligible next to the array operations.
    now = timeit.default_timer

    t_start = now()
    smoothed = savgol_filter(line, windowlenght, polyorder, deriv, axis=1, mode="nearest")
    t_savgol = now()

    line_white = smoothed / white_matrix * 255
    t_white = now()

    # PCA is centering plus projection. The projection used to be counted
    # under the SVM stage, which under-reported the PCA time.
    line_pca = np.dot(line_white - pca_mean_calibrated, pca_components_calibrated)
    t_pca = now()

    line_svm = np.dot(line_pca, svm_coef_calibrated) + svm_intercept_calibrated
    prediction = np.argmax(line_svm, axis=1)
    t_svm = now()

    if print_time and verbose:
        print(f"(Python) White balance time: {(t_white - t_savgol) * 1000 * 1000} mus")
        print(f"(Python) Savitzky-Golay time: {(t_savgol - t_start) * 1000 * 1000} mus")
        print(f"(Python) PCA time: {(t_pca - t_white) * 1000 * 1000} mus")
        print(f"(Python) SVM time: {(t_svm - t_pca) * 1000 * 1000} mus")
    elif print_time:
        print(f"(Python) Computation time: {(t_svm - t_start) * 1000 * 1000} mus")

    return prediction
def gpu_svm(line, print_time=False, verbose=False):
    """Classify one line through the kernel's ``run`` entry point.

    Presumably this is the GPU-accelerated path, judging by the function
    name; confirm against the wrapper.

    Args:
        line: Input passed straight to the kernel.
        print_time: Forwarded to the kernel.
        verbose: Forwarded to the kernel.

    Returns:
        The module-level ``output`` array that was handed to the kernel; the
        same object is returned on every call.
    """
    kernel_flags = {"print_time": print_time, "verbose": verbose}
    impl.run(line, output, **kernel_flags)
    return output
def cpu_svm(line, print_time=False, verbose=False):
    """Classify one line through the kernel's ``runCPU`` entry point.

    Args:
        line: Input passed straight to the kernel.
        print_time: Forwarded to the kernel.
        verbose: Forwarded to the kernel.

    Returns:
        The module-level ``output`` array that was handed to the kernel; the
        same object is returned on every call.
    """
    kernel_flags = {"print_time": print_time, "verbose": verbose}
    impl.runCPU(line, output, **kernel_flags)
    return output
# Benchmark driver: streams lines from the camera through every classifier
# implementation, shows the result as a scrolling image and reports timings.
cam = HyperSpectralCamera()
frame = cam.read()
pixel, channel = frame.shape
buffer_image = np.zeros((300, pixel), dtype=np.uint8)
default_display_channel = channel // 2
replicates = 3
FRAMES_PER_STAGE = 1000


def _raw_channel(frame):
    """Return the middle spectral channel of a frame (no classification)."""
    return frame[:, default_display_channel]


def _gray_to_bgr(image):
    """Convert the single-channel scroll buffer to BGR for display."""
    return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)


def _run_stage(buffer, classify, render=convert_colors, n_frames=FRAMES_PER_STAGE, **classify_kwargs):
    """Grab, classify, scroll and display ``n_frames`` lines.

    Args:
        buffer: Scroll buffer; row 0 receives the new line before the buffer
            is rolled up by one row.
        classify: Callable turning a camera frame into one buffer row.
        render: Callable turning the buffer into the image to display.
        n_frames: Number of frames to process.
        **classify_kwargs: Extra keyword arguments for ``classify``.

    Returns:
        ``(buffer, elapsed_seconds)``. ``np.roll`` allocates a new array on
        every frame, so the caller must keep the returned buffer.
    """
    started = timeit.default_timer()
    for _ in range(n_frames):
        frame = cam.read()
        buffer[0, :] = classify(frame, **classify_kwargs)
        buffer = np.roll(buffer, -1, axis=0)
        cv2.imshow('frame', render(buffer))
        cv2.waitKey(1)
    return buffer, timeit.default_timer() - started


try:
    # Total timings (steps 1-6): camera read, classification and display.
    # The first stage displays a raw channel without classifying anything.
    for _replicate in range(replicates + 1):
        buffer_image, warmup_time = _run_stage(buffer_image, _raw_channel, render=_gray_to_bgr)
        buffer_image, minimal_time = _run_stage(buffer_image, noncalibrated_svm)
        buffer_image, python_time = _run_stage(buffer_image, calibrated_svm)
        buffer_image, calibrated_svm_time = _run_stage(buffer_image, cpu_svm)
        buffer_image, gpu_svm_time = _run_stage(buffer_image, gpu_svm)

        print(f"(No processing) Warm up time: {warmup_time * 1000 * 1000} mus")
        print(f"(CPU minimal) Total time: {minimal_time * 1000 * 1000} mus")
        print(f"(Python) Total time: {python_time * 1000 * 1000} mus")
        print(f"(CPU) Total time: {calibrated_svm_time * 1000 * 1000} mus")
        print(f"(GPU) Total time: {gpu_svm_time * 1000 * 1000} mus")

    # Computation time only (steps 2-5): the timer wraps just the classifier
    # call. The minimal, Python and CPU variants of this section were disabled
    # in the original script; only the GPU variant runs.
    for _replicate in range(replicates + 1):
        GPU_time = 0
        for _ in range(FRAMES_PER_STAGE):
            frame = cam.read()
            gpu_svm_start = timeit.default_timer()
            buffer_image[0, :] = gpu_svm(frame, print_time=True)
            gpu_svm_end = timeit.default_timer()
            buffer_image = np.roll(buffer_image, -1, axis=0)
            cv2.imshow('frame', convert_colors(buffer_image))
            cv2.waitKey(1)
            GPU_time += gpu_svm_end - gpu_svm_start
        # The accumulated value used to be computed and then discarded.
        print(f"(GPU) Computation time: {GPU_time * 1000 * 1000} mus")

    # Individual timings: every implementation prints its own per-stage
    # timings through print_time/verbose.
    for _replicate in range(replicates + 1):
        # "Warm up" the camera: read frames without processing or display.
        for _ in range(FRAMES_PER_STAGE):
            frame = cam.read()
        for classify in (noncalibrated_svm, calibrated_svm, cpu_svm, gpu_svm):
            buffer_image, _elapsed = _run_stage(
                buffer_image, classify, print_time=True, verbose=True
            )
finally:
    # Close the display window even if a stage fails (e.g. a camera timeout).
    cv2.destroyAllWindows()