import cv2 import numpy as np import queue import threading import os import sys import json import timeit from scipy.signal import savgol_filter, savgol_coeffs #import the accelerated module build_dir = os.path.relpath("build") sys.path.append(build_dir) import build.plastic_sorting as wrapper class HyperSpectralCamera(cv2.VideoCapture): def __init__(self, path='/dev/qtec/video0', API=cv2.CAP_V4L2, *args, **kwargs): os.system(r'quark-ctl config load --file "/home/root/newhc/livetests/config.json"') super().__init__(path, API, *args, **kwargs) with open('/home/root/newhc/livetests/config.json') as f: settings = json.load(f) self.set(cv2.CAP_PROP_FRAME_WIDTH, settings['crops'][0][2]) self.set(cv2.CAP_PROP_FRAME_HEIGHT, settings['crops'][0][3]) self.set(cv2.CAP_PROP_FPS, settings['framerate']) self.set(cv2.CAP_PROP_EXPOSURE, settings['controls']['exposure_time_absolute']['value']) # queue for holding latest frame self.queue = queue.Queue() # start reader thread self.reader_thread = threading.Thread(target=self._reader, daemon=True) self.reader_thread.start() def _reader(self): while True: ret, frame = super().read() if not ret: break if not self.queue.empty(): try: self.queue.get_nowait() except queue.Empty: pass self.queue.put(frame) def read(self): # ret, frame = super().read() frame = self.queue.get(timeout=5) frame = frame[:, :, 0] # opencv always reads in (height, width, channels) thus the output is (channels, pixels, 3) where the 3 axis is redundant frame = frame.transpose() # transpose to (pixels, channels) return frame def convert_colors(frame): return frame colors = [ (255, 0, 0), # Class 0 - Red (0, 255, 0), # Class 1 - Green (0, 0, 255), # Class 2 - Blue (255, 255, 0), # Class 3 - Cyan (255, 0, 255), # Class 4 - Magenta (0, 255, 255), # Class 5 - Yellow (128, 0, 0), # Class 6 - Maroon (0, 128, 0), # Class 7 - Dark Green (0, 0, 128), # Class 8 - Navy (128, 128, 128) # Class 9 - Gray ] color_frame = np.zeros((frame.shape[0], frame.shape[1], 3), dtype=np.uint8) for i in range(10): color_frame[frame == i] = colors[i] return color_frame #load the noncalibrated SVM model pca_components_noncalibrated = np.load("models/noncalibratedSVM/pca_model_components.npy").astype(np.float32).T pca_mean_noncalibrated = np.load("models/noncalibratedSVM/pca_model_mean.npy").astype(np.float32) svm_coef_noncalibrated = np.load("models/noncalibratedSVM/svm_model_coef.npy").astype(np.float32).T svm_intercept_noncalibrated = np.load("models/noncalibratedSVM/svm_model_intercept.npy").astype(np.float32) #load the calibrated SVM model pca_components_calibrated = np.load("models/calibratedSVM/pca_model_components.npy").astype(np.float32).T pca_mean_calibrated = np.load("models/calibratedSVM/pca_model_mean.npy").astype(np.float32) svm_coef_calibrated = np.load("models/calibratedSVM/svm_model_coef.npy").astype(np.float32).T svm_intercept_calibrated = np.load("models/calibratedSVM/svm_model_intercept.npy").astype(np.float32) white_matrix = np.load("models/calibratedSVM/whiteMatrix.npy").astype(np.float32) #load the GPU accelerated SVM model pca_components_calibrated_GPU = np.ascontiguousarray(np.load("models/calibratedSVM/pca_model_components.npy").astype(np.float32)) pca_mean_calibrated_GPU = np.load("models/calibratedSVM/pca_model_mean.npy").astype(np.float32) svm_coef_calibrated_GPU = np.load("models/calibratedSVM/svm_model_coef.npy").astype(np.float32) svm_intercept_calibrated_GPU = np.load("models/calibratedSVM/svm_model_intercept.npy").astype(np.float32) white_matrix_GPU = np.ascontiguousarray(np.load("models/calibratedSVM/whiteMatrix.npy").astype(np.float32)) impl = wrapper.Kernel() savgol_coef = savgol_coeffs(13, 3, 0) impl.load_model(pca_components_calibrated_GPU, pca_mean_calibrated_GPU, SVM=svm_coef_calibrated_GPU, SVM_intercept=svm_intercept_calibrated_GPU, whitebalance=white_matrix_GPU, savgol_coef = savgol_coef, Wl_min=0, Wl_max=600, savgol_toggle=1, normalization_toggle=0, absorption_toggle=0) output = np.full((white_matrix.shape[0]), 0, dtype=np.uint8) def noncalibrated_svm(line, print_time=False, verbose=False): impl.runCPU_minimal(line, output, print_time=print_time, verbose=verbose) return output def calibrated_svm(line, windowlenght=13, polyorder=3, deriv=0, print_time=False, verbose=False): if print_time and verbose: savgol_time = 0 white_time = 0 PCA_time = 0 SVM_time = 0 start = timeit.default_timer() line = savgol_filter(line, windowlenght, polyorder, deriv, axis=1, mode="nearest") stop = timeit.default_timer() savgol_time = stop - start start = timeit.default_timer() line_white = line / white_matrix * 255 stop = timeit.default_timer() white_time = stop - start start = timeit.default_timer() line_centered = line_white - pca_mean_calibrated stop = timeit.default_timer() PCA_time = stop - start start = timeit.default_timer() line_pca = np.dot(line_centered, pca_components_calibrated) line_svm = np.dot(line_pca, svm_coef_calibrated) + svm_intercept_calibrated prediction = np.argmax(line_svm, axis=1) stop = timeit.default_timer() SVM_time = stop - start print(f"(Python) White balance time: {white_time * 1000 * 1000} mus") print(f"(Python) Savitzky-Golay time: {savgol_time * 1000 * 1000} mus") print(f"(Python) PCA time: {PCA_time * 1000 * 1000} mus") print(f"(Python) SVM time: {SVM_time * 1000 * 1000} mus") return prediction if print_time: start = timeit.default_timer() line = savgol_filter(line, windowlenght, polyorder, deriv, axis=1, mode="nearest") line_white = line / white_matrix * 255 line_centered = line_white - pca_mean_calibrated line_pca = np.dot(line_centered, pca_components_calibrated) line_svm = np.dot(line_pca, svm_coef_calibrated) + svm_intercept_calibrated prediction = np.argmax(line_svm, axis=1) stop = timeit.default_timer() print(f"(Python) Computation time: {(stop - start) * 1000 * 1000} mus") return prediction if not print_time: line = savgol_filter(line, windowlenght, polyorder, deriv, axis=1, mode="nearest") line_white = line / white_matrix * 255 line_centered = line_white - pca_mean_calibrated line_pca = np.dot(line_centered, pca_components_calibrated) line_svm = np.dot(line_pca, svm_coef_calibrated) + svm_intercept_calibrated prediction = np.argmax(line_svm, axis=1) return prediction def gpu_svm(line, print_time=False, verbose=False): impl.run(line, output, print_time=print_time, verbose=verbose) return output def cpu_svm(line, print_time=False, verbose=False): impl.runCPU(line, output, print_time=print_time, verbose=verbose) return output # test the performance using the 3 to 25 principal components cam = HyperSpectralCamera() frame = cam.read() pixel, channel = frame.shape buffer_image = np.zeros((300, pixel), dtype=np.uint8) default_display_channel = channel//2 replicates = 3 # test total timings step 1-6 for i in range(replicates + 1): # "warm up" the camera warmup_time = 0 warmup_start = timeit.default_timer() for _ in range(1000): frame = cam.read() buffer_image[0, :] = frame[:, default_display_channel] buffer_image = np.roll(buffer_image, -1, axis=0) cv2.imshow('frame', cv2.cvtColor(buffer_image, cv2.COLOR_GRAY2BGR)) cv2.waitKey(1) warmup_end = timeit.default_timer() warmup_time = warmup_end - warmup_start # test the minimal model minimal_time = 0 minimal_start = timeit.default_timer() for _ in range(1000): frame = cam.read() buffer_image[0, :] = noncalibrated_svm(frame) buffer_image = np.roll(buffer_image, -1, axis=0) cv2.imshow('frame', convert_colors(buffer_image)) cv2.waitKey(1) minimal_end = timeit.default_timer() minimal_time = minimal_end - minimal_start #test python implementation python_time = 0 python_start = timeit.default_timer() for _ in range(1000): frame = cam.read() buffer_image[0, :] = calibrated_svm(frame) buffer_image = np.roll(buffer_image, -1, axis=0) cv2.imshow('frame', convert_colors(buffer_image)) cv2.waitKey(1) python_end = timeit.default_timer() python_time = python_end - python_start # test model on cpu calibrated_svm_time = 0 calibrated_svm_start = timeit.default_timer() for _ in range(1000): frame = cam.read() buffer_image[0, :] = cpu_svm(frame) buffer_image = np.roll(buffer_image, -1, axis=0) cv2.imshow('frame', convert_colors(buffer_image)) cv2.waitKey(1) calibrated_svm_end = timeit.default_timer() calibrated_svm_time = calibrated_svm_end - calibrated_svm_start # test GPU accelerated svm model gpu_svm_time = 0 gpu_svm_start = timeit.default_timer() for _ in range(1000): frame = cam.read() buffer_image[0, :] = gpu_svm(frame) buffer_image = np.roll(buffer_image, -1, axis=0) cv2.imshow('frame', convert_colors(buffer_image)) cv2.waitKey(1) gpu_svm_end = timeit.default_timer() gpu_svm_time = gpu_svm_end - gpu_svm_start print(f"(No processing) Warm up time: {warmup_time * 1000 * 1000} mus") print(f"(CPU minimal) Total time: {minimal_time * 1000 * 1000} mus") print(f"(Python) Total time: {python_time * 1000 * 1000} mus") print(f"(CPU) Total time: {calibrated_svm_time * 1000 * 1000} mus") print(f"(GPU) Total time: {gpu_svm_time * 1000 * 1000} mus") # test only computation time (step 2-5) # for i in range(replicates + 1): # # "warm up" the camera # for _ in range(1000): # frame = cam.read() # # test the minimal model # for _ in range(1000): # frame = cam.read() # buffer_image[0, :] = noncalibrated_svm(frame, print_time=True) # buffer_image = np.roll(buffer_image, -1, axis=0) # cv2.imshow('frame', convert_colors(buffer_image)) # cv2.waitKey(1) # #test python implementation # python_time = 0 # for _ in range(1000): # frame = cam.read() # python_start = timeit.default_timer() # buffer_image[0, :] = calibrated_svm(frame, print_time=True) # python_end = timeit.default_timer() # buffer_image = np.roll(buffer_image, -1, axis=0) # cv2.imshow('frame', convert_colors(buffer_image)) # cv2.waitKey(1) # python_time += python_end - python_start # # test model on cpu # CPU_time = 0 # for _ in range(1000): # frame = cam.read() # calibrated_svm_start = timeit.default_timer() # buffer_image[0, :] = cpu_svm(frame, print_time=True) # calibrated_svm_end = timeit.default_timer() # buffer_image = np.roll(buffer_image, -1, axis=0) # cv2.imshow('frame', convert_colors(buffer_image)) # cv2.waitKey(1) # CPU_time += calibrated_svm_end - calibrated_svm_start # test GPU accelerated svm model GPU_time = 0 for _ in range(1000): frame = cam.read() gpu_svm_start = timeit.default_timer() buffer_image[0, :] = gpu_svm(frame, print_time=True) gpu_svm_end = timeit.default_timer() buffer_image = np.roll(buffer_image, -1, axis=0) cv2.imshow('frame', convert_colors(buffer_image)) cv2.waitKey(1) GPU_time += gpu_svm_end - gpu_svm_start # test individual timings for _ in range(replicates + 1): # "warm up" the camera for _ in range(1000): frame = cam.read() # test the minimal model for _ in range(1000): frame = cam.read() buffer_image[0, :] = noncalibrated_svm(frame, print_time=True, verbose=True) buffer_image = np.roll(buffer_image, -1, axis=0) cv2.imshow('frame', convert_colors(buffer_image)) cv2.waitKey(1) #test python implementation for _ in range(1000): frame = cam.read() buffer_image[0, :] = calibrated_svm(frame, print_time=True, verbose=True) buffer_image = np.roll(buffer_image, -1, axis=0) cv2.imshow('frame', convert_colors(buffer_image)) cv2.waitKey(1) # test model on cpu for _ in range(1000): frame = cam.read() buffer_image[0, :] = cpu_svm(frame, print_time=True, verbose=True) buffer_image = np.roll(buffer_image, -1, axis=0) cv2.imshow('frame', convert_colors(buffer_image)) cv2.waitKey(1) # test GPU accelerated svm model for _ in range(1000): frame = cam.read() buffer_image[0, :] = gpu_svm(frame, print_time=True, verbose=True) buffer_image = np.roll(buffer_image, -1, axis=0) cv2.imshow('frame', convert_colors(buffer_image)) cv2.waitKey(1) cv2.destroyAllWindows()