# Academic Software License: Copyright © 2024. ''' SimBEV perception and navigation sensors. ''' import cv2 import time import carla import flow_vis import numpy as np import open3d as o3d from queue import Queue from matplotlib import colormaps as cm RANGE = np.linspace(0.0, 1.0, 256) RAINBOW = np.array(cm.get_cmap('rainbow')(RANGE))[:, :3] LABEL_COLORS = np.array([ (255, 255, 255), # None (128, 64, 128), # Road (244, 35, 232), # Sidewalk (70, 70, 70), # Building (102, 102, 156), # Wall (190, 153, 153), # Fence (153, 153, 153), # Pole (250, 170, 30), # TrafficLight (220, 220, 0), # TrafficSign (107, 142, 35), # Vegetation (152, 251, 152), # Terrain (70, 130, 180), # Sky (220, 20, 60), # Pedestrian (255, 0, 0), # Rider (0, 0, 142), # Car (0, 0, 70), # Truck (0, 60, 100), # Bus (0, 80, 100), # Train (0, 0, 230), # Motorcycle (119, 11, 32), # Bicycle (110, 190, 160), # Static (170, 120, 50), # Dynamic (55, 90, 80), # Other (45, 60, 150), # Water (227, 227, 227), # RoadLine (81, 0, 81), # Ground (150, 100, 100), # Bridge (230, 150, 140), # RailTrack (180, 165, 180), # GuardRail ]) / 255.0 class BaseSensor: ''' Base sensor class that manages the creation and data acquisition of a sensor. Args: world: CARLA simulation world. sensor_manager: SensorManager instance that the sensor belongs to. transform: sensor's transform relative to what it is attached to. attached: CARLA object the sensor is attached to. ''' def __init__(self, world, sensor_manager, transform, attached): self.world = world self.sensor_manager = sensor_manager def render(self): ''' Render sensor data. ''' raise NotImplementedError def save(self): ''' Save sensor data to file. ''' raise NotImplementedError def destroy(self): ''' Destroy the sensor. ''' raise NotImplementedError class BaseCamera(BaseSensor): ''' Base camera class that manages the creation and data acquisition of cameras. Args: world: CARLA simulation world. sensor_manager: SensorManager instance that the camera belongs to. transform: the camera's transform relative to what it is attached to. attached: CARLA object the camera is attached to. width: image width in pixels. height: image height in pixels. options: dictionary of camera options. ''' def __init__(self, world, sensor_manager, transform, attached, width, height, options): self.world = world self.sensor_manager = sensor_manager self.width = width self.height = height self.options = options self.image = None # Create queues for rendering and saving images. Since queues are # blocking, this ensures that at each time step images are fully # acquired before the code continues. self.render_queue = Queue() self.save_queue = Queue() self._get_camera() self.camera_bp.set_attribute('image_size_x', str(self.width)) self.camera_bp.set_attribute('image_size_y', str(self.height)) for key in options: self.camera_bp.set_attribute(key, options[key]) self.camera = self.world.spawn_actor(self.camera_bp, transform, attach_to=attached) self.camera.listen(self._process_image) def _get_camera(self): ''' Add camera to the SensorManager and get the camera blueprint. ''' raise NotImplementedError def _process_image(self, image): ''' Callback function for processing raw image data. Args: image: raw image data. ''' # Convert image colors if necessary and reshape into a # (height, width, 4) NumPy array. image.convert(carla.ColorConverter.Raw) array = np.frombuffer(image.raw_data, dtype=np.uint8) array = np.reshape(array, (image.height, image.width, 4)) # Remove the alpha channel. self.image = array[:, :, :3] # Put the image in both queues. self.render_queue.put(self.image) self.save_queue.put(self.image) def clear_queues(self): ''' Clear the queues to ensure only the latest image is accessible. ''' with self.render_queue.mutex: self.render_queue.queue.clear() self.render_queue.all_tasks_done.notify_all() self.render_queue.unfinished_tasks = 0 with self.save_queue.mutex: self.save_queue.queue.clear() self.save_queue.all_tasks_done.notify_all() self.save_queue.unfinished_tasks = 0 def destroy(self): ''' Destroy the camera. ''' self.camera.destroy() class RGBCamera(BaseCamera): ''' RGB camera class that manages the creation and data acquisition of RGB cameras. ''' def __init__(self, world, sensor_manager, transform, attached, width, height, options): super().__init__(world, sensor_manager, transform, attached, width, height, options) def _get_camera(self): ''' Add RGB camera to the SensorManager and get the camera blueprint. ''' self.sensor_manager.add_camera(self) self.camera_bp = self.world.get_blueprint_library().find('sensor.camera.rgb') def render(self, window_name='RGB Image'): ''' Render RGB image. Args: window_name: window name for the rendered image. ''' cv2.imshow(window_name, cv2.resize(self.render_queue.get(True, 10.0), (self.width // 4, self.height // 4))) cv2.waitKey(1) def save(self, camera_name, path, scene, frame): ''' Save RGB image to file. Args: camera_name: name of the camera. path: root directory of the dataset. scene: scene number. frame: frame number. ''' cv2.imwrite( f'{path}/simbev/sweeps/RGB-{camera_name}/SimBEV-scene-{scene:04d}-frame-{frame:04d}-RGB-{camera_name}.jpg', self.save_queue.get(True, 10.0) ) class SemanticCamera(BaseCamera): ''' Semantic segmentation camera class that manages the creation and data acquisition of semantic segmentation cameras. ''' def __init__(self, world, sensor_manager, transform, attached, width, height, options): super().__init__(world, sensor_manager, transform, attached, width, height, options) def _get_camera(self): ''' Add semantic segmentation camera to the SensorManager and get the camera blueprint. ''' self.sensor_manager.add_semantic_camera(self) self.camera_bp = self.world.get_blueprint_library().find('sensor.camera.semantic_segmentation') def _process_image(self, image): ''' Callback function for processing raw image data. Args: image: raw image data. ''' # Convert image colors using the CityScapes palette and reshape into a # (height, width, 4) NumPy array. image.convert(carla.ColorConverter.CityScapesPalette) array = np.frombuffer(image.raw_data, dtype=np.uint8) array = np.reshape(array, (image.height, image.width, 4)) # Remove the alpha channel. self.image = array[:, :, :3] # Put the image in both queues. self.render_queue.put(self.image) self.save_queue.put(self.image) def render(self, window_name='Segmented Image'): ''' Render semantic segmentation image. Args: window_name: window name for the rendered image. ''' cv2.imshow(window_name, cv2.resize(self.render_queue.get(True, 10.0), (self.width // 4, self.height // 4))) cv2.waitKey(1) def save(self, camera_name, path, scene, frame): ''' Save semantic segmentation image to file. Args: camera_name: name of the camera. path: root directory of the dataset. scene: scene number. frame: frame number. ''' cv2.imwrite( f'{path}/simbev/sweeps/SEG-{camera_name}/SimBEV-scene-{scene:04d}-frame-{frame:04d}-SEG-{camera_name}.png', self.save_queue.get(True, 10.0) ) class InstanceCamera(BaseCamera): ''' Instance segmentation camera class that manages the creation and data acquisition of instance segmentation cameras. ''' def __init__(self, world, sensor_manager, transform, attached, width, height, options): super().__init__(world, sensor_manager, transform, attached, width, height, options) def _get_camera(self): ''' Add instance segmentation camera to the SensorManager and get the camera blueprint. ''' self.sensor_manager.add_instance_camera(self) self.camera_bp = self.world.get_blueprint_library().find('sensor.camera.instance_segmentation') def render(self, window_name='Instance Image'): ''' Render instance segmentation image. Args: window_name: window name for the rendered image. ''' cv2.imshow(window_name, cv2.resize(self.render_queue.get(True, 10.0), (self.width // 4, self.height // 4))) cv2.waitKey(1) def save(self, camera_name, path, scene, frame): ''' Save instance segmentation image to file. Args: camera_name: name of the camera. path: root directory of the dataset. scene: scene number. frame: frame number. ''' cv2.imwrite( f'{path}/simbev/sweeps/IST-{camera_name}/SimBEV-scene-{scene:04d}-frame-{frame:04d}-IST-{camera_name}.png', self.save_queue.get(True, 10.0) ) class DepthCamera(BaseCamera): ''' Depth camera class that manages the creation and data acquisition of depth cameras. ''' def __init__(self, world, sensor_manager, transform, attached, width, height, options): super().__init__(world, sensor_manager, transform, attached, width, height, options) def _get_camera(self): ''' Add depth camera to the SensorManager and get the camera blueprint. ''' self.sensor_manager.add_depth_camera(self) self.camera_bp = self.world.get_blueprint_library().find('sensor.camera.depth') def render(self, window_name='Depth Image'): ''' Render depth image. Args: window_name: window name for the rendered image. ''' image = self.render_queue.get(True, 10.0) # 1000 * (R + G * 256 + B * 256 * 256) / (256 * 256 * 256 - 1) # provides the depth value in meters. This is then converted to a # logarithmic scale for better visualization. normalized_distance = (image[:, :, 2] + image[:, :, 1] * 256.0 + image[:, :, 0] * 256.0 * 256.0) \ / (256.0 * 256.0 * 256.0 - 1) log_distance = 255 * np.log(256.0 * normalized_distance + 1) / np.log(257.0) cv2.imshow(window_name, cv2.resize(log_distance.astype(np.uint8), (self.width // 4, self.height // 4))) cv2.waitKey(1) def save(self, camera_name, path, scene, frame): ''' Save depth image to file. Args: camera_name: name of the camera. path: root directory of the dataset. scene: scene number. frame: frame number. ''' cv2.imwrite( f'{path}/simbev/sweeps/DPT-{camera_name}/SimBEV-scene-{scene:04d}-frame-{frame:04d}-DPT-{camera_name}.png', self.save_queue.get(True, 10.0) ) class FlowCamera(BaseCamera): ''' Flow camera class that manages the creation and data acquisition of flow cameras. ''' def __init__(self, world, sensor_manager, transform, attached, width, height, options): super().__init__(world, sensor_manager, transform, attached, width, height, options) def _get_camera(self): ''' Add flow camera to the SensorManager and get the camera blueprint. ''' self.sensor_manager.add_flow_camera(self) self.camera_bp = self.world.get_blueprint_library().find('sensor.camera.optical_flow') def _process_image(self, image): ''' Callback function for processing raw image data. Args: image: raw image data. ''' # Reshape into a (height, width, 2) NumPy array. array = np.frombuffer(image.raw_data, dtype=np.float32) self.image = np.reshape(array, (image.height, image.width, 2)) # Put the image in both queues. self.render_queue.put(self.image) self.save_queue.put(self.image) def render(self, window_name='Flow Image'): ''' Render flow image. Args: window_name: window name for the rendered image. ''' image = flow_vis.flow_to_color(self.render_queue.get(True, 10.0), convert_to_bgr=True) cv2.imshow(window_name, cv2.resize(image, (self.width // 4, self.height // 4))) cv2.waitKey(1) def save(self, camera_name, path, scene, frame): ''' Save flow image to file. Args: camera_name: name of the camera. path: root directory of the dataset. scene: scene number. frame: frame number. ''' with open( f'{path}/simbev/sweeps/FLW-{camera_name}/SimBEV-scene-{scene:04d}-frame-{frame:04d}-FLW-{camera_name}.npz', 'wb' ) as f: np.savez_compressed(f, data=self.save_queue.get(True, 10.0)) class BaseLidar(BaseSensor): ''' Base lidar class that manages the creation and data acquisition of lidars. Args: world: CARLA simulation world. sensor_manager: SensorManager instance that the lidar belongs to. transform: the lidar's transform relative to what it is attached to. attached: CARLA object the lidar is attached to. channels: number of lidar channels (beams). range: maximum range of the lidar. options: dictionary of lidar options. ''' def __init__(self, world, sensor_manager, transform, attached, channels, range, options): self.world = world self.sensor_manager = sensor_manager self.channels = channels self.range = range self.options = options self.frame = 0 self.point_list = o3d.geometry.PointCloud() # Create queues for rendering and saving point clouds. Since queues # are blocking, this ensures that at each time step point clouds are # fully acquired before the code continues. self.render_queue = Queue() self.save_queue = Queue() self._get_lidar() self.lidar_bp.set_attribute('channels', str(self.channels)) self.lidar_bp.set_attribute('range', str(self.range)) for key in options: self.lidar_bp.set_attribute(key, options[key]) self.lidar = self.world.spawn_actor(self.lidar_bp, transform, attach_to=attached) self.lidar.listen(self._process_point_cloud) def _get_lidar(self): ''' Add lidar to the SensorManager and get the lidar blueprint. ''' raise NotImplementedError def _process_point_cloud(self, point_cloud): ''' Callback function for processing raw point cloud data. Args: point_cloud: raw point cloud data. ''' raise NotImplementedError def clear_queues(self): ''' Clear the queues to ensure only the latest point cloud is accessible. ''' with self.render_queue.mutex: self.render_queue.queue.clear() self.render_queue.all_tasks_done.notify_all() self.render_queue.unfinished_tasks = 0 with self.save_queue.mutex: self.save_queue.queue.clear() self.save_queue.all_tasks_done.notify_all() self.save_queue.unfinished_tasks = 0 def _create_visualizer(self, window_name, width=1024, height=1024): ''' Create Open3D visualizer. Args: window_name: window name for the point cloud visualizer. width: visualizer window width in pixels. height: visualizer window height in pixels. ''' self.visualizer = o3d.visualization.Visualizer() self.visualizer.create_window(window_name=window_name, width=width, height=height, left=0, top=0) self.visualizer.get_render_option().point_size = 1.0 self.visualizer.get_render_option().background_color = [0.04, 0.04, 0.04] self.visualizer.get_render_option().show_coordinate_frame = True self.visualizer.add_geometry(self.point_list) def _draw_points(self, color): ''' Visualize point cloud data in Open3D. Args: color: point cloud colors. ''' self.point_list.points = o3d.utility.Vector3dVector(self.render_queue.get(True, 10.0)) self.point_list.colors = o3d.utility.Vector3dVector(color) if self.frame == 2: self.visualizer.add_geometry(self.point_list) # Place the camera at a height where the entire point cloud is # visible. The camera's field of view is 60 degrees by default. camera_height = np.sqrt(3.0) * self.range camera = self.visualizer.get_view_control().convert_to_pinhole_camera_parameters() pose = np.eye(4) pose[1, 1] = -1 pose[2, 2] = -1 pose[:3, 3] = [0, 0, camera_height] camera.extrinsic = pose self.visualizer.get_view_control().convert_from_pinhole_camera_parameters(camera) self.visualizer.update_geometry(self.point_list) self.visualizer.poll_events() self.visualizer.update_renderer() self.frame += 1 time.sleep(0.005) def destroy(self): ''' Destroy the lidar. ''' self.lidar.destroy() try: self.visualizer.destroy_window() except AttributeError: pass class Lidar(BaseLidar): ''' Lidar class that manages the creation and data acquisition of lidars. ''' def __init__(self, world, sensor_manager, transform, attached, channels, range, options): super().__init__(world, sensor_manager, transform, attached, channels, range, options) def _get_lidar(self): ''' Add lidar to the SensorManager and get the lidar blueprint. ''' self.sensor_manager.add_lidar(self) self.lidar_bp = self.world.get_blueprint_library().find('sensor.lidar.ray_cast') def _process_point_cloud(self, point_cloud): ''' Callback function for processing raw point cloud data. Args: point_cloud: raw point cloud data. ''' # Point cloud data contains x, y, z, and intensity values. self.data = np.copy(np.frombuffer(point_cloud.raw_data, dtype=np.dtype('f4'))) self.data = np.reshape(self.data, (int(self.data.shape[0] / 4), 4)) # Select the x, y, and z values and flip y data because CARLA uses a # left-handed coordinate system. self.points = self.data[:, :-1] self.points[:, 1] *= -1 # Remove the points belonging to the ego vehicle. The limits are based # on the 2016 Mustang dimensions and should be adjusted for other # vehicles. mask = np.logical_or(abs(self.points[:, 0]) > 2.44, abs(self.points[:, 1]) > 1.04) self.data = self.data[mask] self.points = self.points[mask] # Put the point cloud in both queues. self.render_queue.put(self.points) self.save_queue.put(self.points) def render(self): ''' Render point cloud. ''' if self.frame == 0: self._create_visualizer(window_name='Lidar Point Cloud') # Generate point cloud colors based on intensity values. distance = np.linalg.norm(self.points, axis=1) distance_log = np.log(distance) distance_log_normalized = ( distance_log - distance_log.min() ) / ( distance_log.max() - distance_log.min() + 1e-6 ) intensity_color = np.c_[ np.interp(distance_log_normalized, RANGE, RAINBOW[:, 0]), np.interp(distance_log_normalized, RANGE, RAINBOW[:, 1]), np.interp(distance_log_normalized, RANGE, RAINBOW[:, 2]) ] self._draw_points(intensity_color) def save(self, path, scene, frame): ''' Save point cloud to file. Args: path: root directory of the dataset. scene: scene number. frame: frame number. ''' with open(f'{path}/simbev/sweeps/LIDAR/SimBEV-scene-{scene:04d}-frame-{frame:04d}-LIDAR.npz', 'wb') as f: np.savez_compressed(f, data=self.save_queue.get(True, 10.0)) class SemanticLidar(BaseLidar): ''' Semantic lidar class that manages the creation and data acquisition of semantic lidars. ''' def __init__(self, world, sensor_manager, transform, attached, channels, range, options): super().__init__(world, sensor_manager, transform, attached, channels, range, options) def _get_lidar(self): ''' Add semantic lidar to the SensorManager and get the semantic lidar blueprint. ''' self.sensor_manager.add_semantic_lidar(self) self.lidar_bp = self.world.get_blueprint_library().find('sensor.lidar.ray_cast_semantic') def _process_point_cloud(self, point_cloud): ''' Callback function for processing raw point cloud data. Args: point_cloud: raw point cloud data. ''' # Point cloud data contains x, y, z, cosine angle, object index, and # object tag values. self.data = np.copy( np.frombuffer( point_cloud.raw_data, dtype=np.dtype([ ('x', np.float32), ('y', np.float32), ('z', np.float32), ('CosAngle', np.float32), ('ObjIdx', np.uint32), ('ObjTag', np.uint32)] ) ) ) # Select the x, y, and z values and flip y data because CARLA uses a # left-handed coordinate system. self.data['y'] *= -1 self.points = np.array([self.data['x'], self.data['y'], self.data['z']]).T # Remove the points belonging to the ego vehicle. The limits are based # on the 2016 Mustang dimensions and should be adjusted for other # vehicles. mask = np.logical_or(abs(self.points[:, 0]) > 2.44, abs(self.points[:, 1]) > 1.04) labels = np.array(self.data['ObjTag']) # Set point cloud colors based on object labels. self.label_color = LABEL_COLORS[labels] self.data = self.data[mask] self.points = self.points[mask] self.label_color = self.label_color[mask] # Put the point cloud in the render queue and the entire data in the # save queue. self.render_queue.put(self.points) self.save_queue.put(self.data) def render(self): ''' Render point cloud. ''' if self.frame == 0: self._create_visualizer(window_name='Semantic Lidar Point Cloud') self._draw_points(self.label_color) def save(self, path, scene, frame): ''' Save point cloud to file. Args: path: root directory of the dataset. scene: scene number. frame: frame number. ''' with open( f'{path}/simbev/sweeps/SEG-LIDAR/SimBEV-scene-{scene:04d}-frame-{frame:04d}-SEG-LIDAR.npz', 'wb' ) as f: np.savez_compressed(f, data=self.save_queue.get(True, 10.0)) class Radar(BaseSensor): ''' Base radar class that manages the creation and data acquisition of radars. Args: world: CARLA simulation world. sensor_manager: SensorManager instance that the radar belongs to. transform: the radar's transform relative to what it is attached to. attached: CARLA object the radar is attached to. range: maximum range of the radar. hfov: horizontal field of view of the radar. vfov: vertical field of view of the radar. options: dictionary of radar options. ''' def __init__(self, world, sensor_manager, transform, attached, range, hfov, vfov, options): self.world = world self.sensor_manager = sensor_manager self.range = range self.hfov = hfov self.vfov = vfov self.options = options self.frame = 0 self.point_list = o3d.geometry.PointCloud() # Create queues for rendering and saving point clouds. Since queues # are blocking, this ensures that at each time step point clouds are # fully acquired before the code continues. self.render_queue = Queue() self.save_queue = Queue() self._get_radar() self.radar_bp.set_attribute('range', str(self.range)) self.radar_bp.set_attribute('horizontal_fov', str(self.hfov)) self.radar_bp.set_attribute('vertical_fov', str(self.vfov)) for key in options: self.radar_bp.set_attribute(key, options[key]) self.radar = self.world.spawn_actor(self.radar_bp, transform, attach_to=attached) self.radar.listen(self._process_point_cloud) def _get_radar(self): ''' Add radar to the SensorManager and get the radar blueprint. ''' self.sensor_manager.add_radar(self) self.radar_bp = self.world.get_blueprint_library().find('sensor.other.radar') def _process_point_cloud(self, point_cloud): ''' Callback function for processing raw point cloud data. Args: point_cloud: raw point cloud data. ''' # Point cloud data contains depth, altitude, azimuth, and velocity # values. self.data = np.copy(np.frombuffer(point_cloud.raw_data, dtype=np.dtype('f4'))) self.data = np.reshape(self.data, (int(self.data.shape[0] / 4), 4)) self.data = self.data[:, ::-1] # Flip azimuth values because CARLA uses a left-handed coordinate system. self.data[:, 2] *= -1 # Put the point cloud in both queues. self.render_queue.put(self.data) self.save_queue.put(self.data) def clear_queues(self): ''' Clear the queues to ensure only the latest point cloud is accessible. ''' with self.render_queue.mutex: self.render_queue.queue.clear() self.render_queue.all_tasks_done.notify_all() self.render_queue.unfinished_tasks = 0 with self.save_queue.mutex: self.save_queue.queue.clear() self.save_queue.all_tasks_done.notify_all() self.save_queue.unfinished_tasks = 0 def _create_visualizer(self, window_name, width=1024, height=1024): ''' Create Open3D visualizer. Args: window_name: window name for the point cloud visualizer. width: visualizer window width in pixels. height: visualizer window height in pixels. ''' self.visualizer = o3d.visualization.Visualizer() self.visualizer.create_window(window_name=window_name, width=width, height=height, left=0, top=0) self.visualizer.get_render_option().point_size = 4.0 self.visualizer.get_render_option().background_color = [0.04, 0.04, 0.04] self.visualizer.get_render_option().show_coordinate_frame = True self.visualizer.add_geometry(self.point_list) def _draw_points(self, color): ''' Visualize point cloud data in Open3D. Args: color: point cloud colors. ''' self.point_list.points = o3d.utility.Vector3dVector(self.points) self.point_list.colors = o3d.utility.Vector3dVector(color) if self.frame == 2: self.visualizer.add_geometry(self.point_list) # Place the camera at a height where the entire point cloud is # visible. The camera's field of view is 60 degrees by default. camera_height = np.sqrt(3.0) * self.range * max(0.5, np.sin(np.deg2rad(self.hfov / 2))) camera = self.visualizer.get_view_control().convert_to_pinhole_camera_parameters() pose = np.eye(4) pose[1, 1] = -1 pose[2, 2] = -1 pose[:3, 3] = [-camera_height / np.sqrt(3.0), 0, camera_height] camera.extrinsic = pose self.visualizer.get_view_control().convert_from_pinhole_camera_parameters(camera) self.visualizer.update_geometry(self.point_list) self.visualizer.poll_events() self.visualizer.update_renderer() self.frame += 1 time.sleep(0.005) def render(self, window_name='Radar Point Cloud'): ''' Render point cloud. Args: window_name: window name for the point cloud visualizer. ''' if self.frame == 0: self._create_visualizer(window_name=window_name) radar_data = self.render_queue.get(True, 10.0) points = radar_data[:, :-1] # Convert the radar values of depth, altitude angle, and azimuth angle # to x, y, and z coordinates. x = points[:, 0] * np.cos(points[:, 1]) * np.cos(points[:, 2]) y = points[:, 0] * np.cos(points[:, 1]) * np.sin(points[:, 2]) z = points[:, 0] * np.sin(points[:, 1]) self.points = np.array([x, y, z]).T # Generate point cloud colors based on velocity values. velocity = np.abs(radar_data[:, -1]) velocity_log = np.log(1.0 + velocity) velocity_log_normalized = ( velocity_log - velocity_log.min() ) / ( velocity_log.max() - velocity_log.min() + 1e-6 ) velocity_color = np.c_[ np.interp(velocity_log_normalized, RANGE, RAINBOW[:, 0]), np.interp(velocity_log_normalized, RANGE, RAINBOW[:, 1]), np.interp(velocity_log_normalized, RANGE, RAINBOW[:, 2]) ] self._draw_points(velocity_color) def save(self, radar_name, path, scene, frame): ''' Save point cloud to file. Args: radar_name: name of the radar. path: root directory of the dataset. scene: scene number. frame: frame number. ''' with open( f'{path}/simbev/sweeps/{radar_name}/SimBEV-scene-{scene:04d}-frame-{frame:04d}-{radar_name}.npz', 'wb' ) as f: np.savez_compressed(f, data=self.save_queue.get(True, 10.0)) def destroy(self): ''' Destroy the radar. ''' self.radar.destroy() try: self.visualizer.destroy_window() except AttributeError: pass class GNSS(BaseSensor): ''' Base GNSS class that manages the creation and data acquisition of GNSS sensors. Args: world: CARLA simulation world. sensor_manager: SensorManager instance that the GNSS belongs to. transform: the GNSS' transform relative to what it is attached to. attached: CARLA object the GNSS is attached to. options: dictionary of GNSS options. ''' def __init__(self, world, sensor_manager, transform, attached, options): self.world = world self.sensor_manager = sensor_manager self.options = options # Create a queue for saving GNSS data. Since queues are blocking, this # ensures that at each time step GNSS data is fully acquired before # the code continues. self.save_queue = Queue() self._get_gnss() for key in options: self.gnss_bp.set_attribute(key, options[key]) self.gnss = self.world.spawn_actor(self.gnss_bp, transform, attach_to=attached) self.gnss.listen(self._process_data) def _get_gnss(self): ''' Add GNSS to the SensorManager and get the GNSS blueprint. ''' self.sensor_manager.add_gnss(self) self.gnss_bp = self.world.get_blueprint_library().find('sensor.other.gnss') def _process_data(self, data): ''' Callback function for processing GNSS data. Args: data: GNSS data. ''' # GNSS data contains latitude, longitude, and altitude values. self.data = np.array([data.latitude, data.longitude, data.altitude]) # Put the GNSS data in the save queue. self.save_queue.put(self.data) def clear_queue(self): ''' Clear the queue to ensure only the latest GNSS data is accessible. ''' with self.save_queue.mutex: self.save_queue.queue.clear() self.save_queue.all_tasks_done.notify_all() self.save_queue.unfinished_tasks = 0 def save(self, path, scene, frame): ''' Save GNSS data to file. Args: path: root directory of the dataset. scene: scene number. frame: frame number. ''' with open(f'{path}/simbev/sweeps/GNSS/SimBEV-scene-{scene:04d}-frame-{frame:04d}-GNSS.bin', 'wb') as f: np.save(f, self.save_queue.get(True, 10.0)) def destroy(self): ''' Destroy the GNSS. ''' self.gnss.destroy() class IMU(BaseSensor): ''' Base IMU class that manages the creation and data acquisition of IMU sensors. Args: world: CARLA simulation world. sensor_manager: SensorManager instance that the IMU belongs to. transform: the IMU's transform relative to what it is attached to. attached: CARLA object the IMU is attached to. options: dictionary of IMU options. ''' def __init__(self, world, sensor_manager, transform, attached, options): self.world = world self.sensor_manager = sensor_manager self.options = options # Create a queue for saving IMU data. Since queues are blocking, this # ensures that at each time step IMU data is fully acquired before the # code continues. self.save_queue = Queue() self._get_imu() for key in options: self.imu_bp.set_attribute(key, options[key]) self.imu = self.world.spawn_actor(self.imu_bp, transform, attach_to=attached) self.imu.listen(self._process_data) def _get_imu(self): ''' Add IMU to the SensorManager and get the IMU blueprint. ''' self.sensor_manager.add_imu(self) self.imu_bp = self.world.get_blueprint_library().find('sensor.other.imu') def _process_data(self, data): ''' Callback function for processing IMU data. Args: data: IMU data. ''' # IMU data contains accelerometer, gyroscope, and compass values. Some # values are negated because CARLA uses a left-handed coordinate # system. self.data = np.array([ data.accelerometer.x, -data.accelerometer.y, data.accelerometer.z, data.gyroscope.x, -data.gyroscope.y, -data.gyroscope.z, data.compass ]) # Put the IMU data in the save queue. self.save_queue.put(self.data) def clear_queue(self): ''' Clear the queue to ensure only the latest IMU data is accessible. ''' with self.save_queue.mutex: self.save_queue.queue.clear() self.save_queue.all_tasks_done.notify_all() self.save_queue.unfinished_tasks = 0 def save(self, path, scene, frame): ''' Save IMU data to file. Args: path: root directory of the dataset. scene: scene number. frame: frame number. ''' with open(f'{path}/simbev/sweeps/IMU/SimBEV-scene-{scene:04d}-frame-{frame:04d}-IMU.bin', 'wb') as f: np.save(f, self.save_queue.get(True, 10.0)) def destroy(self): ''' Destroy the IMU. ''' self.imu.destroy() class SemanticBEVCamera(SemanticCamera): ''' BEV semantic segmentation camera class that manages the creation and data acquisition of BEV semantic segmentation cameras. ''' def __init__(self, world, sensor_manager, transform, attached, width, height, options): super().__init__(world, sensor_manager, transform, attached, width, height, options) def _get_camera(self): ''' Add BEV semantic segmentation camera to the SensorManager and get the camera blueprint. ''' self.sensor_manager.add_semantic_bev_camera(self) self.camera_bp = self.world.get_blueprint_library().find('sensor.camera.semantic_segmentation') def render(self, window_name='Segmented BEV Image'): ''' Render BEV semantic segmentation image. Args: window_name: window name for the rendered image. ''' cv2.imshow(window_name, self.render_queue.get(True, 10.0)) cv2.waitKey(1)