HSI-PROCESSING / livetests.py
livetests.py
Raw
import cv2
import numpy as np
import queue
import threading
import os
import sys
import json
import timeit
from scipy.signal import savgol_filter, savgol_coeffs

#import the accelerated module
build_dir = os.path.relpath("build")
sys.path.append(build_dir)
import build.plastic_sorting as wrapper

class HyperSpectralCamera(cv2.VideoCapture):
    def __init__(self, path='/dev/qtec/video0', API=cv2.CAP_V4L2, *args, **kwargs):
        os.system(r'quark-ctl config load --file "/home/root/newhc/livetests/config.json"')
        super().__init__(path, API, *args, **kwargs)

        with open('/home/root/newhc/livetests/config.json') as f:
            settings = json.load(f)

        self.set(cv2.CAP_PROP_FRAME_WIDTH, settings['crops'][0][2])
        self.set(cv2.CAP_PROP_FRAME_HEIGHT, settings['crops'][0][3])
        self.set(cv2.CAP_PROP_FPS, settings['framerate'])
        self.set(cv2.CAP_PROP_EXPOSURE, settings['controls']['exposure_time_absolute']['value'])
        
        # queue for holding latest frame
        self.queue = queue.Queue()

        # start reader thread
        self.reader_thread = threading.Thread(target=self._reader, daemon=True)
        self.reader_thread.start()

    def _reader(self):
        while True:
            ret, frame = super().read()
            if not ret:
                break
            if not self.queue.empty():
                try:
                    self.queue.get_nowait()
                except queue.Empty:
                    pass
                
            self.queue.put(frame)
    def read(self):
        # ret, frame = super().read()
        frame = self.queue.get(timeout=5)
        frame = frame[:, :, 0] # opencv always reads in (height, width, channels) thus the output is (channels, pixels, 3) where the 3 axis is redundant
        frame = frame.transpose() # transpose to (pixels, channels)
        return frame

def convert_colors(frame):
    return frame
    colors = [
        (255, 0, 0),    # Class 0 - Red
        (0, 255, 0),    # Class 1 - Green
        (0, 0, 255),    # Class 2 - Blue
        (255, 255, 0),  # Class 3 - Cyan
        (255, 0, 255),  # Class 4 - Magenta
        (0, 255, 255),  # Class 5 - Yellow
        (128, 0, 0),    # Class 6 - Maroon
        (0, 128, 0),    # Class 7 - Dark Green
        (0, 0, 128),    # Class 8 - Navy
        (128, 128, 128) # Class 9 - Gray
    ]

    color_frame = np.zeros((frame.shape[0], frame.shape[1], 3), dtype=np.uint8)
    for i in range(10):
        color_frame[frame == i] = colors[i]
    
    return color_frame

#load the noncalibrated SVM model
pca_components_noncalibrated = np.load("models/noncalibratedSVM/pca_model_components.npy").astype(np.float32).T
pca_mean_noncalibrated = np.load("models/noncalibratedSVM/pca_model_mean.npy").astype(np.float32)
svm_coef_noncalibrated = np.load("models/noncalibratedSVM/svm_model_coef.npy").astype(np.float32).T
svm_intercept_noncalibrated = np.load("models/noncalibratedSVM/svm_model_intercept.npy").astype(np.float32)

#load the calibrated SVM model
pca_components_calibrated = np.load("models/calibratedSVM/pca_model_components.npy").astype(np.float32).T
pca_mean_calibrated = np.load("models/calibratedSVM/pca_model_mean.npy").astype(np.float32)
svm_coef_calibrated = np.load("models/calibratedSVM/svm_model_coef.npy").astype(np.float32).T
svm_intercept_calibrated = np.load("models/calibratedSVM/svm_model_intercept.npy").astype(np.float32)
white_matrix = np.load("models/calibratedSVM/whiteMatrix.npy").astype(np.float32)

#load the GPU accelerated SVM model
pca_components_calibrated_GPU = np.ascontiguousarray(np.load("models/calibratedSVM/pca_model_components.npy").astype(np.float32))
pca_mean_calibrated_GPU = np.load("models/calibratedSVM/pca_model_mean.npy").astype(np.float32)
svm_coef_calibrated_GPU = np.load("models/calibratedSVM/svm_model_coef.npy").astype(np.float32)
svm_intercept_calibrated_GPU = np.load("models/calibratedSVM/svm_model_intercept.npy").astype(np.float32)
white_matrix_GPU = np.ascontiguousarray(np.load("models/calibratedSVM/whiteMatrix.npy").astype(np.float32))

impl = wrapper.Kernel()
savgol_coef = savgol_coeffs(13, 3, 0)


impl.load_model(pca_components_calibrated_GPU, pca_mean_calibrated_GPU,
                SVM=svm_coef_calibrated_GPU,
                SVM_intercept=svm_intercept_calibrated_GPU,
                whitebalance=white_matrix_GPU,
                savgol_coef = savgol_coef,
                Wl_min=0,
                Wl_max=600,
                savgol_toggle=1,
                normalization_toggle=0,
                absorption_toggle=0)


output = np.full((white_matrix.shape[0]), 0, dtype=np.uint8)



def noncalibrated_svm(line, print_time=False, verbose=False):
    impl.runCPU_minimal(line, output, print_time=print_time, verbose=verbose)
    return output

def calibrated_svm(line, windowlenght=13, polyorder=3, deriv=0, print_time=False, verbose=False):
    if print_time and verbose:
        savgol_time = 0
        white_time = 0
        PCA_time = 0
        SVM_time = 0
        start = timeit.default_timer()
        line = savgol_filter(line, windowlenght, polyorder, deriv, axis=1, mode="nearest")
        stop = timeit.default_timer()
        savgol_time = stop - start

        start = timeit.default_timer()
        line_white = line / white_matrix * 255
        stop = timeit.default_timer()
        white_time = stop - start

        start = timeit.default_timer()
        line_centered = line_white - pca_mean_calibrated
        stop = timeit.default_timer()
        PCA_time = stop - start

        start = timeit.default_timer()
        line_pca = np.dot(line_centered, pca_components_calibrated)
        line_svm = np.dot(line_pca, svm_coef_calibrated) + svm_intercept_calibrated
        prediction = np.argmax(line_svm, axis=1)
        stop = timeit.default_timer()
        SVM_time = stop - start

        print(f"(Python) White balance time: {white_time * 1000 * 1000} mus")
        print(f"(Python) Savitzky-Golay time: {savgol_time * 1000 * 1000} mus")
        print(f"(Python) PCA time: {PCA_time * 1000 * 1000} mus")
        print(f"(Python) SVM time: {SVM_time * 1000 * 1000} mus")
        return prediction
    
    if print_time:
        start = timeit.default_timer()
        line = savgol_filter(line, windowlenght, polyorder, deriv, axis=1, mode="nearest")
        line_white = line / white_matrix * 255
        line_centered = line_white - pca_mean_calibrated
        line_pca = np.dot(line_centered, pca_components_calibrated)
        line_svm = np.dot(line_pca, svm_coef_calibrated) + svm_intercept_calibrated
        prediction = np.argmax(line_svm, axis=1)
        stop = timeit.default_timer()
        print(f"(Python) Computation time: {(stop - start) * 1000 * 1000} mus")
        return prediction
    
    if not print_time:
        line = savgol_filter(line, windowlenght, polyorder, deriv, axis=1, mode="nearest")
        line_white = line / white_matrix * 255
        line_centered = line_white - pca_mean_calibrated
        line_pca = np.dot(line_centered, pca_components_calibrated)
        line_svm = np.dot(line_pca, svm_coef_calibrated) + svm_intercept_calibrated
        prediction = np.argmax(line_svm, axis=1)
        return prediction
    

def gpu_svm(line, print_time=False, verbose=False):
    impl.run(line, output, print_time=print_time, verbose=verbose)
    return output

def cpu_svm(line, print_time=False, verbose=False):
    impl.runCPU(line, output, print_time=print_time, verbose=verbose)
    return output


# test the performance using the 3 to 25 principal components
cam = HyperSpectralCamera()
frame = cam.read()

pixel, channel = frame.shape

buffer_image = np.zeros((300, pixel), dtype=np.uint8)
default_display_channel = channel//2

replicates = 3


# test total timings step 1-6
for i in range(replicates + 1):
    # "warm up" the camera
    warmup_time = 0
    warmup_start = timeit.default_timer()
    for _ in range(1000):
        frame = cam.read()

        buffer_image[0, :] = frame[:, default_display_channel]

        buffer_image = np.roll(buffer_image, -1, axis=0)

        cv2.imshow('frame', cv2.cvtColor(buffer_image, cv2.COLOR_GRAY2BGR))
        cv2.waitKey(1)
    warmup_end = timeit.default_timer()
    warmup_time = warmup_end - warmup_start
    

    # test the minimal model
    minimal_time = 0
    minimal_start = timeit.default_timer()
    for _ in range(1000):
        frame = cam.read()

        buffer_image[0, :] = noncalibrated_svm(frame)

        buffer_image = np.roll(buffer_image, -1, axis=0)

        cv2.imshow('frame', convert_colors(buffer_image))
        cv2.waitKey(1)
    minimal_end = timeit.default_timer()
    minimal_time = minimal_end - minimal_start


    #test python implementation
    python_time = 0
    python_start = timeit.default_timer()
    for _ in range(1000):
        frame = cam.read()

        buffer_image[0, :] = calibrated_svm(frame)

        buffer_image = np.roll(buffer_image, -1, axis=0)

        cv2.imshow('frame', convert_colors(buffer_image))
        cv2.waitKey(1)
    python_end = timeit.default_timer()
    python_time = python_end - python_start


    # test model on cpu
    calibrated_svm_time = 0
    calibrated_svm_start = timeit.default_timer()
    for _ in range(1000):
        frame = cam.read()

        buffer_image[0, :] = cpu_svm(frame)

        buffer_image = np.roll(buffer_image, -1, axis=0)

        cv2.imshow('frame', convert_colors(buffer_image))
        cv2.waitKey(1)
    calibrated_svm_end = timeit.default_timer()
    calibrated_svm_time = calibrated_svm_end - calibrated_svm_start


    # test GPU accelerated svm model
    gpu_svm_time = 0
    gpu_svm_start = timeit.default_timer()
    for _ in range(1000):
        frame = cam.read()

        buffer_image[0, :] = gpu_svm(frame)

        buffer_image = np.roll(buffer_image, -1, axis=0)

        cv2.imshow('frame', convert_colors(buffer_image))
        cv2.waitKey(1)
    gpu_svm_end = timeit.default_timer()
    gpu_svm_time = gpu_svm_end - gpu_svm_start




    print(f"(No processing) Warm up time: {warmup_time * 1000 * 1000} mus")
    print(f"(CPU minimal) Total time: {minimal_time * 1000 * 1000} mus")
    print(f"(Python) Total time: {python_time * 1000 * 1000} mus")
    print(f"(CPU) Total time: {calibrated_svm_time * 1000 * 1000} mus")
    print(f"(GPU) Total time: {gpu_svm_time * 1000 * 1000} mus")


# test only computation time (step 2-5)
# for i in range(replicates + 1):
#     # "warm up" the camera
#     for _ in range(1000):
#         frame = cam.read()

#     # test the minimal model
#     for _ in range(1000):
#         frame = cam.read()

#         buffer_image[0, :] = noncalibrated_svm(frame, print_time=True)
#         buffer_image = np.roll(buffer_image, -1, axis=0)

#         cv2.imshow('frame', convert_colors(buffer_image))
#         cv2.waitKey(1)
    
#     #test python implementation
#     python_time = 0
#     for _ in range(1000):
#         frame = cam.read()

#         python_start = timeit.default_timer()
#         buffer_image[0, :] = calibrated_svm(frame, print_time=True)
#         python_end = timeit.default_timer()
#         buffer_image = np.roll(buffer_image, -1, axis=0)

#         cv2.imshow('frame', convert_colors(buffer_image))
#         cv2.waitKey(1)
#         python_time += python_end - python_start
    
#     # test model on cpu
#     CPU_time = 0
#     for _ in range(1000):
#         frame = cam.read()

#         calibrated_svm_start = timeit.default_timer()
#         buffer_image[0, :] = cpu_svm(frame, print_time=True)
#         calibrated_svm_end = timeit.default_timer()
#         buffer_image = np.roll(buffer_image, -1, axis=0)

#         cv2.imshow('frame', convert_colors(buffer_image))
#         cv2.waitKey(1)
#         CPU_time += calibrated_svm_end - calibrated_svm_start
    
    # test GPU accelerated svm model
    GPU_time = 0
    for _ in range(1000):
        frame = cam.read()

        gpu_svm_start = timeit.default_timer()
        buffer_image[0, :] = gpu_svm(frame, print_time=True)
        gpu_svm_end = timeit.default_timer()
        buffer_image = np.roll(buffer_image, -1, axis=0)

        cv2.imshow('frame', convert_colors(buffer_image))
        cv2.waitKey(1)
        GPU_time += gpu_svm_end - gpu_svm_start


# test individual timings
for _ in range(replicates + 1):
    # "warm up" the camera
    for _ in range(1000):
        frame = cam.read()
    
    # test the minimal model
    for _ in range(1000):
        frame = cam.read()

        buffer_image[0, :] = noncalibrated_svm(frame, print_time=True, verbose=True)
        buffer_image = np.roll(buffer_image, -1, axis=0)

        cv2.imshow('frame', convert_colors(buffer_image))
        cv2.waitKey(1)
    
    #test python implementation
    for _ in range(1000):
        frame = cam.read()

        buffer_image[0, :] = calibrated_svm(frame, print_time=True, verbose=True)
        buffer_image = np.roll(buffer_image, -1, axis=0)

        cv2.imshow('frame', convert_colors(buffer_image))
        cv2.waitKey(1)

    # test model on cpu
    for _ in range(1000):
        frame = cam.read()

        buffer_image[0, :] = cpu_svm(frame, print_time=True, verbose=True)
        buffer_image = np.roll(buffer_image, -1, axis=0)

        cv2.imshow('frame', convert_colors(buffer_image))
        cv2.waitKey(1)

    # test GPU accelerated svm model
    for _ in range(1000):
        frame = cam.read()

        buffer_image[0, :] = gpu_svm(frame, print_time=True, verbose=True)
        buffer_image = np.roll(buffer_image, -1, axis=0)

        cv2.imshow('frame', convert_colors(buffer_image))
        cv2.waitKey(1)




cv2.destroyAllWindows()