Team11-Drona-Task2 / vision / vision_pipeline.py
vision_pipeline.py
Raw
import pyrealsense2 as rs
import numpy as np
import cv2
from cv2 import aruco
import time
from scipy.spatial.transform import Rotation
import math
import yaml

class VisionPipeline():
    """
        This is a class for initializing Intel Pyrealsense2 D435i camera and also finding 
        depth and rgb frames along with detection of aruco marker.
       
        Attributes:
            depth_res: defines the depth resolution. Default is (720, 1080).
            rgb_res: defines the rgb resolution. Default is (720, 1080).
            align_to: defines frame to which alignment must be done. Default is "rgb".
            marker_size: defines the marker size of the aruco tag. Default is 3.75.
            marker_type: defines the aruco marker type. Default is DICT_4X4_50.
            marker_dict: gets the required aruco dictionary.
            require_marker_id: defines the marker id of the aruco tag placed on the drone.
            calib_file_path: defines the path for camera calibration.
            DEBUG: a flag to enable prints in code for debugging. Default is False.
        
    """
    def __init__(self,
                 depth_res=(720, 1280),
                 rgb_res=(720, 1280),
                 align_to="rgb",
                 marker_size=3.75,
                 marker_type=aruco.DICT_4X4_50,
                 required_marker_id=1,
                 debug=0,
                 padding=50,
                 config_file="./vision/config.yaml",
                 fps_moving_window_size=10) -> None:


        self.depth_res = depth_res
        self.rgb_res = rgb_res
        self.align_to = align_to
        self.marker_size = marker_size
        self.marker_type = marker_type
        self.marker_dict = aruco.getPredefinedDictionary(self.marker_type)

        self.required_marker_id = required_marker_id

        with open(config_file, 'r') as f:
            self.camera_config = yaml.load(f, Loader=yaml.FullLoader)
        
        self.DEBUG = debug
        self.padding = padding
        self.fps_moving_window_size = fps_moving_window_size
        self.fps_moving_window = []

        self.do_tracking = self.camera_config['tracking']['enable']
        self.last_detected_marker = None
        self.tracking_area_th = self.camera_config['tracking']['area_th']
        self.tracking_point_th = self.camera_config['tracking']['centroid_th']

        self.estimated_pose = None
        self.current_waypoint = None
        self.current_midpoint = None

        self.calib_yaw_at_start = self.camera_config['extrinsics']['calibrate_yaw']
        self.imu_calib_data = self.camera_config['extrinsics']['imu_correction']

        self.cam_init()

        self.last_time = None
        self.current_time = None
        self.counter = 0
        
        self.avg_fps = None
        self.cam_rvec = np.array([0.0, 0.0, 0.0])
        self.raw_calib_yaw = 0.0
        if self.calib_yaw_at_start:
            self.calibrate_yaw()
        else:
            self.cam_rvec = np.array([0.0, 0.0, 0.0])
        


    
    def init_realsense(self):
        """
        Initializes Realsense by enabling both depth and RGB stream and sets up parameters such as sharpness, contrast, exposure etc.
        
        Parameter:
            None         
        
        Returns:
            None
        """

        # Start realsense pipeline
        self.pipeline = rs.pipeline()

        config = rs.config()
        config.enable_stream(rs.stream.depth, self.depth_res[1], self.depth_res[0], rs.format.z16, 30)
        config.enable_stream(rs.stream.color, self.rgb_res[1], self.rgb_res[0], rs.format.bgr8, 30)
        
        profile = self.pipeline.start(config)

        colorSensor = profile.get_device().query_sensors()[1]

        # Set camera parameters

        # rs.option.enable_auto_exposure
        rs.option.enable_motion_correction

        colorSensor.set_option(rs.option.enable_auto_exposure, self.camera_config['camera_params']['enable_auto_exposure'])
        colorSensor.set_option(rs.option.enable_auto_white_balance, self.camera_config['camera_params']['enable_auto_white_balance'])

        colorSensor.set_option(rs.option.sharpness, self.camera_config['camera_params']['sharpness'])
        colorSensor.set_option(rs.option.contrast, self.camera_config['camera_params']['contrast'])
        # colorSensor.set_option(rs.option.gamma, 0)
        colorSensor.set_option(rs.option.brightness, self.camera_config['camera_params']['brightness'])

        colorSensor.set_option(rs.option.exposure, self.camera_config['camera_params']['exposure'])
        # colorSensor.set_option(rs.option.gain, 300)

        self.depth_sensor = profile.get_device().first_depth_sensor()
        self.depth_scale = self.depth_sensor.get_depth_scale()


        if self.align_to.lower() == "rgb":
            self.align_frames = rs.align(rs.stream.color)
        elif self.align_to.lower() == "depth":
            self.align_frames = rs.align(rs.stream.depth)
        else:
            raise NotImplementedError(f"Align to {self.align_to} not implemented!")


    
    def init_aruco_detector(self):
        """
        Initializes camera Intrinsic and Extrinsic Parameters and generates Aruco detector parameters.
        
        Parameters:
            None
        
        Returns:
            None
        """ 

        self.cam_matrix = np.array(self.camera_config['intrinsics']['cam_matrix']) #calib_data["camMatrix"]
        self.dist_coef = np.array(self.camera_config['intrinsics']['dist_coeff']) #np.zeros((5, 1))  #calib_data["distCoef"]
        self.cam_rvec = np.array([0.0, 0.0, 0.0])
        self.cam_tvec = np.array(self.camera_config['extrinsics']['global_origin'])
        self.cam_tf = np.linalg.pinv(self.make_tf_matrix(self.cam_rvec, self.cam_tvec))


        self.param_markers = aruco.DetectorParameters_create()
        

    def stop(self):
        """
        Stops the pipeline.
        
        Parameters:
            None
        
        Returns:
            None
        """
        self.pipeline.stop()


    def get_frames(self):
        """
        Returns the aligned depth and rgb frames for proper depth detection.
        
        Parameters:
            None
        
        Returns:
            aligned frames: Returns frame with depth and RGB frame aligned.
            
        """
        self.frames = self.pipeline.wait_for_frames()
        self.aligned_frames = self.align_frames.process(self.frames)

        return self.aligned_frames

    
    def extract_depth(self, frame):
        """
        Returns depth frame from Realsense Depth Camera.
        
        Parameters:
            frame
        
        Returns:
            depth frame
        """
        return frame.get_depth_frame()

    
    def extract_rgb(self, frame):
        """
        Extracts RGB frame from Realsense RGB Camera.
        
        Parameters:
            frame
        
        Returns:
            frame.get_color_frame(): colour frame
        """
        return frame.get_color_frame()


    def to_image(self, frame):
        """
        Converts pyrealsense2.frame to np.array.
        
        Parameters:
            frame: input frame of type pyrealsense2.frame
        
        Returns:
            np.asarray(frame.get_data())
        """
        return np.asarray(frame.get_data())
    
    
    def find_area(self, corners):
        """
        Utility function to find area of the quadrilateral using corner data.
        
        Parameters:
            corners: co-ordinate of the corners whose area needs to be calculated
        
        Returns:
            area:  returns area of the quadrilateral, specified by its corners
        """  
        corners = corners[0]
        area = (corners[0][0] * corners[1][1] + corners[1][0] * corners[2][1] + corners[2][0] * corners[3][1] + corners[3][0] * corners[0][1])
        area = area - (corners[1][0] * corners[0][1] + corners[2][0] * corners[1][1] + corners[3][0] * corners[2][1] + corners[0][0] * corners[3][1])
        return area

    
    def detect_marker(self, frame):
        """
        Detection of ArUco markers returning its corners.
        
        Parameters:
            frame: input frame of type numpy.array
        
        Returns:
            Case 1:
                None : Returns if no marker detected
            Case 2:
                marker_corners : Returns Corners of detected Aruco Marker
            Case 3:
                "None" : Returns a string, if goes out of region of interest
        """
        gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        marker_corners, marker_IDs, reject = aruco.detectMarkers(
            gray_frame, self.marker_dict, parameters=self.param_markers
        )

        if self.DEBUG:
            frame = self.plot_markers(frame, marker_corners, marker_IDs)
            rgb_frame = self.plot_markers(frame.copy(), marker_corners, marker_IDs)

        if marker_IDs is None:
            
            if (self.do_tracking == True):
                
                if (reject is None) or (self.last_detected_marker is None):
                    if self.DEBUG:
                        frame = self.plot_rej_markers(frame, reject)
                        rgb_frame = self.plot_rej_markers(rgb_frame, reject)
                        self.show_frame(frame, rgb_frame)
                        print("NO marker detected")
                    return None
                
                else:
                    last_area = self.find_area(self.last_detected_marker)
                    last_center = np.mean(self.last_detected_marker[0], 0) / 4.0
                    
                    for i, reject_corners in enumerate(reject):
                        new_center = np.mean(reject_corners[0], 0) / 4.0
                        new_area = self.find_area(reject_corners)
                        
                        if((abs(new_area - last_area) <= self.tracking_area_th) and (math.sqrt(np.sum((new_center - last_center) ** 2)) <= self.tracking_point_th)):
                            self.last_detected_marker = reject_corners.copy()
                            
                            if self.DEBUG:
                                frame = cv2.polylines(frame, [reject_corners.astype(np.int32)], True, (255, 0, 0), 4, cv2.LINE_AA)
                                rgb_frame = cv2.polylines(rgb_frame, [reject_corners.astype(np.int32)], True, (255, 0, 0), 4, cv2.LINE_AA)
                                self.show_frame(frame, rgb_frame)
                                
                            return reject_corners.astype(np.int32)
                        
                        else:
                            
                            if self.DEBUG:
                                print("NO TRACKING: ", new_area, new_center)
                                print("LAST: ", last_area, last_center)


                    if self.DEBUG:
                        frame = self.plot_rej_markers(frame, reject)
                        rgb_frame = self.plot_rej_markers(rgb_frame, reject)
                        print("NO marker detected!")
                        self.show_frame(frame, rgb_frame)
                    
                    self.last_detected_marker = None
                    return None
            
            else:
                if (reject is None):
                    if self.DEBUG:
                        frame = self.plot_rej_markers(frame, reject)
                        rgb_frame = self.plot_rej_markers(rgb_frame, reject)
                        self.show_frame(frame, rgb_frame)
                        print("NO marker detected")
                    return None
                    
        else:
            if self.DEBUG:
                frame = self.plot_rej_markers(frame, reject)
                rgb_frame = self.plot_rej_markers(rgb_frame, reject)
                self.show_frame(frame, rgb_frame)            
                        

            for i, id_ in enumerate(marker_IDs):
                
                if id_ == self.required_marker_id:
                    mid_point = np.sum(marker_corners[i][0], 0) / 4.0
                    if (mid_point[0] >= self.rgb_res[1] - self.padding) or (mid_point[0] <= self.padding) or (mid_point[1] >= self.rgb_res[0] - self.padding) or (mid_point[1] <= self.padding):
                        return "None"

                    self.last_detected_marker = marker_corners[i].copy()
                    return marker_corners[i].astype(np.int32)

        return None



    def plot_markers(self, frame, marker_corners, marker_ids):
        """
        Draws lines around the detected frames.
        
        Parameters:
            frame: RGB frame
            marker_corners: corners of the detected aruco tag
            marker_ids: ids of the detected markers
        
        Returns:
            frame with plotted markers
        """
        
        frame = frame.copy()
        for i, corners in enumerate(marker_corners):
            if marker_ids[i] == self.required_marker_id:
                frame = cv2.polylines(frame, [corners.astype(np.int32)], True, (0, 255, 0), 4, cv2.LINE_AA)

            else:
                frame = cv2.polylines(frame, [corners.astype(np.int32)], True, (0, 0, 255), 4, cv2.LINE_AA)

        return frame

    def plot_rej_markers(self, frame, marker_corners):
        """
        Draws lines around the detected frames.
        
        Parameters:
            frame: RGB frame
            marker_corners: corners of the detected aruco tag            
        
        Returns:
            frame: frame with plotted rejected markers
        """
        frame = frame.copy()
        for i, corners in enumerate(marker_corners):
            frame = cv2.polylines(frame, [corners.astype(np.int32)], True, (0, 255, 255), 4, cv2.LINE_AA)

        return frame


    def update_waypoint(self, waypoint):
        """
        Updates the waypoint that is plotted on the image frame to be displayed.
        
        Parameters:
            waypoint: this is the new waypoint to be updated         
        """
        self.current_waypoint = waypoint
        self.current_waypoint = np.array(self.current_waypoint) * 100.0


    def show_frame(self, frame, rgb_frame, window_name="Frame"):
        """
        Displays frame for debugging purpose.
        
        Parameters:
            frame: edited RGB Frame with texts
            rgb_frame: original RGB Frame 
            window_name: name of the window where the frame is displayed
        
        Returns:
            None
        """

        if (self.current_waypoint is None):
            pass
        else:
            cv2.putText(frame, f"Goal (m): [{round(self.current_waypoint[0]/100.0, 2)}, {round(self.current_waypoint[1]/100.0, 2)}, {round(self.current_waypoint[2]/100.0, 2)}]", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2, cv2.LINE_AA)
        
        if self.estimated_pose is None:
            pass
        else:
            cv2.putText(frame, f"Current Estimate (m): [{round(self.estimated_pose[0],2)}, {round(self.estimated_pose[1],2)}, {round(self.estimated_pose[2],2)}]", (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2, cv2.LINE_AA)
        camera_yaw = round(Rotation.from_rotvec(self.cam_rvec).as_euler('xyz', degrees=True)[2], 2)
        camera_roll = round(self.imu_calib_data[0] * 180.0 / np.pi, 2)
        camera_pitch = round(self.imu_calib_data[1] * 180.0 / np.pi, 2)
        cv2.putText(frame, f"Cam RPY (deg): [{camera_roll, camera_pitch, camera_yaw}]", (50, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2, cv2.LINE_AA)
        m = math.tan(self.raw_calib_yaw + np.pi/2.0)
        if self.current_midpoint is not None:
            c = self.current_midpoint[1] - m * (self.current_midpoint[0])

        if not(self.avg_fps is None):
            cv2.putText(frame, f"Average FPS: {round(self.avg_fps,2)}", (50, 200), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2, cv2.LINE_AA)

        cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
        cv2.imshow(window_name, frame)

        cv2.namedWindow("RGB Image", cv2.WINDOW_NORMAL)
        cv2.imshow("RGB Image", rgb_frame)
        key = cv2.waitKey(1)


    

    def estimate_uncalib_pose(self, marker_corners):
        """
        Returns attitude of Aruco to estimate initial yaw (used by calibrate_yaw() method).
        
        Parameters:
            marker_corners: corners of the detected aruco tag            
        
        Returns:
            uncalib_rot: returns attitude of Aruco
        """
        rVec, tVec, _ = aruco.estimatePoseSingleMarkers(
                [marker_corners.astype(np.float32)], self.marker_size, self.cam_matrix, self.dist_coef
            )


        tf = self.make_tf_matrix(rVec[0, 0, :], tVec[0, 0, :])
        
        correction_tf = self.make_tf_matrix(rvec=Rotation.from_euler('xyz', np.array(self.imu_calib_data)).as_rotvec(), tvec=np.array([0.0, 0.0, 0.0]))
        tf = correction_tf @ tf
        tf = np.linalg.pinv(tf)

        uncalib_rot = Rotation.from_matrix(tf[:3, :3]).as_euler('xyz')
        return uncalib_rot[2]



    def estimate_pose(self, marker_corners, frame_of_ref="camera"):  
        """
        Estimates the pose of the transform matrix using large ArUco tag.
        
        Parameters:
            marker_corners: list of marker corners
            frame_of_ref: frame of reference. Default value is "camera"
        
        Returns:
            6-DOF pose estimate of aruco marker in world frame    
        """  
        rVec, tVec, _ = aruco.estimatePoseSingleMarkers(
                [marker_corners.astype(np.float32)], self.marker_size, self.cam_matrix, self.dist_coef
            )
        tf = self.make_tf_matrix(np.array([0, 0, 0]), tVec[0, 0, :])
        if frame_of_ref == "camera":
            
            correction_tf = self.make_tf_matrix(rvec=Rotation.from_euler('xyz', np.array(self.imu_calib_data)).as_rotvec(), tvec=np.array([0.0, 0.0, 0.0]))
            tf = correction_tf @ tf
            
            translation_tf = self.make_tf_matrix(rvec=self.cam_rvec, tvec=self.cam_tvec)
            translation_tf = np.linalg.pinv(translation_tf)
            tf = translation_tf @ tf
            pass
        else:        
            tf = self.cam_tf @ tf
        return self.tf_to_outformat(tf) / 100.0


    def make_tf_matrix(self, rvec, tvec):
        """
        Creates the transormation matrix using the rotation and translational vectors.
        
        Parameters:
            rvec: contains rotational vectors
            tvec: contains trsnslational vectors
        
        Returns:
            tf: transformation matrix for the camera
        """
        rot_mat = Rotation.from_rotvec(rvec).as_matrix()
        tf = np.eye(4)
        tf[:3, :3] = rot_mat
        tf[:3, 3] = tvec
        return tf

    
    def tf_to_outformat(self, tf):
        """
        Converts the transformation matrix to a list.
        
        Parameters:
            tf: transformation matrix
        
        Returns:
            out_vec: conversion of the transformation matrix into list
        """
        out_vec = np.zeros((6, 1))
        out_vec[3:, 0] = Rotation.from_matrix(tf[:3, :3]).as_euler('xyz')
        out_vec[:3, 0] = tf[:3, 3]
        return out_vec


    def outformat_to_tf(self, input):
        """
        Converts the list to the transformation matrix. 
        
        Parameters:
            out_vec: conversion of the transformation matrix into list            
        
        Returns:            
            tf: transformation matrix
        """
        tf = np.eye(4)
        tf[:3, :3] = Rotation.from_euler('xyz', input[3:, 0]).as_matrix()
        tf[:3, 3] = input[:3, 0]
        return tf


    def depth_from_marker(self, depth_frame, marker_corners, kernel_size=1):
        """
        Finds depth from the estimated pose obtained from the ArUco tag.
        Parameters:
            depth_frame: depth image from realsense
            marker_corners: contains the list of aruco marker corners
            kernel_size: contains the size of the kernel
        Returns:
            depths: depth of the point filtered using median filter
        """
        mid_point = np.sum(marker_corners[0], 0) / 4.0
        mid_point = (mid_point + 0.5).astype(np.int32)

        depths = []
        for j in range(-int(kernel_size / 2.0), int(kernel_size / 2.0) + 1):
            for k in range(-int(kernel_size / 2.0), int(kernel_size / 2.0) + 1):
                try:
                    depth = depth_frame.get_distance(mid_point[0] + j, mid_point[1] + k)
                    depths.append(depth)
                except:
                    pass

        depths.sort()
        num_values = len(depths)
        if num_values % 2 == 0:
            return (depths[int(num_values / 2.0)] + depths[int(num_values / 2.0) - 1]) / 2.0
        else:
            return depths[int(num_values / 2.0) - 1]

    def cam_init(self):
        """
        Initializes camera and starts ArUco detection.
        Parameters:
            None
        Returns:
            None
        """
        self.init_realsense()
        self.init_aruco_detector()
        
    def calibrate_yaw(self):
            """
            Updates the estimated initial yaw in the rotation vector, by taking median of multiple yaw readings.
            
            Parameters:
                None            
            
            Returns:
                None
            """
            max_iters = 100
            num_calib_frames = 0
            rvec_uncalib = []
            while True:
                aligned_frames = self.get_frames()    
                color_frame = self.extract_rgb(aligned_frames)
                depth_frame = self.extract_depth(aligned_frames)
                if not depth_frame or not color_frame:
                    continue
                
                color_img = self.to_image(color_frame)
                marker_corners = self.detect_marker(color_img)
                
                if marker_corners is None:
                    pass
                elif type(marker_corners) is str:
                    pass
                else:
                    num_calib_frames += 1
                    rvec_uncalib.append(self.estimate_uncalib_pose(marker_corners))
                    if(num_calib_frames >= max_iters):
                        print("Yaw Calibrated ")
                        break   

            rvec_uncalib.sort()
            self.raw_calib_yaw = rvec_uncalib[int(len(rvec_uncalib) / 2.0)]
            temp_ = Rotation.from_euler('xyz', np.array([0.0, 0.0, self.raw_calib_yaw])).as_rotvec()[2]
            yawTemp_ = (np.pi+temp_+np.pi/2)%(2*np.pi)-np.pi
            self.cam_rvec = np.array([0.0, 0.0, yawTemp_])
            print(f"YAW Value from Calibration: {self.cam_rvec}")
        
    def cam_process(self, cam_queue):
        """
        Processes video frames to detect and locate Aruco markers, find depth value using the depth frame and then updating the camera data queue with the estimated readings.
        
        Parameters:
            cam_queue: a queue that is storing camera data where the latest reading is accessed by its last element            
        
        Returns:
            None
        """ 
        flag = 0
        aligned_frames = self.get_frames()    
        color_frame = self.extract_rgb(aligned_frames)
        depth_frame = self.extract_depth(aligned_frames)

        if not depth_frame or not color_frame:
            return

        self.current_time = time.time()
        if not(self.last_time is None):
            time_diff = self.current_time - self.last_time
            if(time_diff != 0.0):
                self.fps_moving_window.append(1/time_diff)
                self.fps_moving_window[-self.fps_moving_window_size:]
                self.avg_fps = np.mean(self.fps_moving_window)
            
        self.last_time = self.current_time

        if self.DEBUG:
            if not(self.avg_fps is None):
                print(f"-----------------------------------------------------Average FPS: ", self.avg_fps)
            

        color_img = self.to_image(color_frame)

        marker_corners = self.detect_marker(color_img)
            
        if marker_corners is None:
            self.counter += 1
            if self.counter >= 30:
                flag = 2
                cam_queue.append([flag])
                self.counter = 0
                pass
        elif type(marker_corners) == type("None"):
            flag = 1
            cam_queue.append([flag])
        else:
            self.counter = 0
            aruco_pose = self.estimate_pose(marker_corners)
            _intrisics = rs.intrinsics()
            _intrisics.width = self.rgb_res[1]
            _intrisics.height = self.rgb_res[0]
            _intrisics.ppx = self.cam_matrix[0][2]
            _intrisics.ppy = self.cam_matrix[1][2]
            _intrisics.fx = self.cam_matrix[0][0]
            _intrisics.fy = self.cam_matrix[1][1]

            z_from_realsense = self.depth_from_marker(depth_frame, marker_corners, kernel_size=3)
            mid_point = np.sum(marker_corners[0], 0) / 4.0
            mid_point = (mid_point + 0.5).astype(np.int32)

            self.current_midpoint = mid_point.copy()
            new_tf = self.make_tf_matrix(rvec=Rotation.from_euler('xyz', np.array(self.imu_calib_data)).as_rotvec(), tvec=np.array([0.0, 0.0, 0.0]))
            
            point_from_rs = rs.rs2_deproject_pixel_to_point(_intrisics, [mid_point[0], mid_point[1]], z_from_realsense)
            z_from_realsense = (new_tf @ np.array([point_from_rs[0] * 100.0, point_from_rs[1] * 100.0, point_from_rs[2] * 100.0, 1]))[2] / 100.0
            z_from_realsense = -z_from_realsense + 2.858
            
            aruco_pose[0] = -aruco_pose[0]
                
            cam_queue.append([self.current_time, aruco_pose, z_from_realsense])
            self.estimated_pose = [aruco_pose[0][0], aruco_pose[1][0], z_from_realsense]