# Academic Software License: Copyright © 2024.
'''
SimBEV perception and navigation sensors.
'''
import cv2
import time
import carla
import flow_vis
import numpy as np
import open3d as o3d
from queue import Queue
from matplotlib import colormaps as cm
RANGE = np.linspace(0.0, 1.0, 256)
RAINBOW = np.array(cm['rainbow'](RANGE))[:, :3]
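# LABEL_COLORS maps CARLA semantic tags (e.g. the ObjTag field of semantic
# lidar detections) to normalized RGB colors.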
LABEL_COLORS = np.array([
(255, 255, 255), # None
(128, 64, 128), # Road
(244, 35, 232), # Sidewalk
(70, 70, 70), # Building
(102, 102, 156), # Wall
(190, 153, 153), # Fence
(153, 153, 153), # Pole
(250, 170, 30), # TrafficLight
(220, 220, 0), # TrafficSign
(107, 142, 35), # Vegetation
(152, 251, 152), # Terrain
(70, 130, 180), # Sky
(220, 20, 60), # Pedestrian
(255, 0, 0), # Rider
(0, 0, 142), # Car
(0, 0, 70), # Truck
(0, 60, 100), # Bus
(0, 80, 100), # Train
(0, 0, 230), # Motorcycle
(119, 11, 32), # Bicycle
(110, 190, 160), # Static
(170, 120, 50), # Dynamic
(55, 90, 80), # Other
(45, 60, 150), # Water
(227, 227, 227), # RoadLine
(81, 0, 81), # Ground
(150, 100, 100), # Bridge
(230, 150, 140), # RailTrack
(180, 165, 180), # GuardRail
]) / 255.0
class BaseSensor:
'''
Base sensor class that manages the creation and data acquisition of a
sensor.
Args:
world: CARLA simulation world.
sensor_manager: SensorManager instance that the sensor belongs to.
transform: sensor's transform relative to what it is attached to.
attached: CARLA object the sensor is attached to.
'''
def __init__(self, world, sensor_manager, transform, attached):
self.world = world
self.sensor_manager = sensor_manager
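# Subclasses are responsible for spawning the underlying CARLA actor
# using transform and attached.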
def render(self):
'''
Render sensor data.
'''
raise NotImplementedError
def save(self):
'''
Save sensor data to file.
'''
raise NotImplementedError
def destroy(self):
'''
Destroy the sensor.
'''
raise NotImplementedError
class BaseCamera(BaseSensor):
'''
Base camera class that manages the creation and data acquisition of
cameras.
Args:
world: CARLA simulation world.
sensor_manager: SensorManager instance that the camera belongs to.
transform: the camera's transform relative to what it is attached to.
attached: CARLA object the camera is attached to.
width: image width in pixels.
height: image height in pixels.
options: dictionary of camera options.
'''
def __init__(self, world, sensor_manager, transform, attached, width, height, options):
self.world = world
self.sensor_manager = sensor_manager
self.width = width
self.height = height
self.options = options
self.image = None
# Create queues for rendering and saving images. Since queues are
# blocking, this ensures that at each time step images are fully
# acquired before the code continues.
self.render_queue = Queue()
self.save_queue = Queue()
self._get_camera()
self.camera_bp.set_attribute('image_size_x', str(self.width))
self.camera_bp.set_attribute('image_size_y', str(self.height))
for key in options:
self.camera_bp.set_attribute(key, options[key])
self.camera = self.world.spawn_actor(self.camera_bp, transform, attach_to=attached)
self.camera.listen(self._process_image)
def _get_camera(self):
'''
Add camera to the SensorManager and get the camera blueprint.
'''
raise NotImplementedError
def _process_image(self, image):
'''
Callback function for processing raw image data.
Args:
image: raw image data.
'''
# Convert image colors if necessary and reshape into a
# (height, width, 4) NumPy array.
image.convert(carla.ColorConverter.Raw)
array = np.frombuffer(image.raw_data, dtype=np.uint8)
array = np.reshape(array, (image.height, image.width, 4))
# Remove the alpha channel.
self.image = array[:, :, :3]
# Put the image in both queues.
self.render_queue.put(self.image)
self.save_queue.put(self.image)
def clear_queues(self):
'''
Clear the queues to ensure only the latest image is accessible.
'''
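# Manipulating the queue internals directly is safe here because the
# queue's mutex is held.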
with self.render_queue.mutex:
self.render_queue.queue.clear()
self.render_queue.all_tasks_done.notify_all()
self.render_queue.unfinished_tasks = 0
with self.save_queue.mutex:
self.save_queue.queue.clear()
self.save_queue.all_tasks_done.notify_all()
self.save_queue.unfinished_tasks = 0
def destroy(self):
'''
Destroy the camera.
'''
self.camera.destroy()
class RGBCamera(BaseCamera):
'''
RGB camera class that manages the creation and data acquisition of RGB
cameras.
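Example (a minimal sketch, not part of the capture pipeline; assumes a
running CARLA server, a connected world, a SensorManager instance
sensor_manager, and an ego vehicle actor; the transform, resolution,
and paths are illustrative):
    camera = RGBCamera(
        world, sensor_manager,
        carla.Transform(carla.Location(x=1.5, z=2.4)),
        vehicle, 1600, 900, {'fov': '70'}
    )
    camera.render()
    camera.save('FRONT', 'path/to/dataset', scene=0, frame=0)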
'''
def __init__(self, world, sensor_manager, transform, attached, width, height, options):
super().__init__(world, sensor_manager, transform, attached, width, height, options)
def _get_camera(self):
'''
Add RGB camera to the SensorManager and get the camera blueprint.
'''
self.sensor_manager.add_camera(self)
self.camera_bp = self.world.get_blueprint_library().find('sensor.camera.rgb')
def render(self, window_name='RGB Image'):
'''
Render RGB image.
Args:
window_name: window name for the rendered image.
'''
cv2.imshow(window_name, cv2.resize(self.render_queue.get(True, 10.0), (self.width // 4, self.height // 4)))
cv2.waitKey(1)
def save(self, camera_name, path, scene, frame):
'''
Save RGB image to file.
Args:
camera_name: name of the camera.
path: root directory of the dataset.
scene: scene number.
frame: frame number.
'''
cv2.imwrite(
f'{path}/simbev/sweeps/RGB-{camera_name}/SimBEV-scene-{scene:04d}-frame-{frame:04d}-RGB-{camera_name}.jpg',
self.save_queue.get(True, 10.0)
)
class SemanticCamera(BaseCamera):
'''
Semantic segmentation camera class that manages the creation and data
acquisition of semantic segmentation cameras.
'''
def __init__(self, world, sensor_manager, transform, attached, width, height, options):
super().__init__(world, sensor_manager, transform, attached, width, height, options)
def _get_camera(self):
'''
Add semantic segmentation camera to the SensorManager and get the
camera blueprint.
'''
self.sensor_manager.add_semantic_camera(self)
self.camera_bp = self.world.get_blueprint_library().find('sensor.camera.semantic_segmentation')
def _process_image(self, image):
'''
Callback function for processing raw image data.
Args:
image: raw image data.
'''
# Convert image colors using the CityScapes palette and reshape into a
# (height, width, 4) NumPy array.
image.convert(carla.ColorConverter.CityScapesPalette)
array = np.frombuffer(image.raw_data, dtype=np.uint8)
array = np.reshape(array, (image.height, image.width, 4))
# Remove the alpha channel.
self.image = array[:, :, :3]
# Put the image in both queues.
self.render_queue.put(self.image)
self.save_queue.put(self.image)
def render(self, window_name='Segmented Image'):
'''
Render semantic segmentation image.
Args:
window_name: window name for the rendered image.
'''
cv2.imshow(window_name, cv2.resize(self.render_queue.get(True, 10.0), (self.width // 4, self.height // 4)))
cv2.waitKey(1)
def save(self, camera_name, path, scene, frame):
'''
Save semantic segmentation image to file.
Args:
camera_name: name of the camera.
path: root directory of the dataset.
scene: scene number.
frame: frame number.
'''
cv2.imwrite(
f'{path}/simbev/sweeps/SEG-{camera_name}/SimBEV-scene-{scene:04d}-frame-{frame:04d}-SEG-{camera_name}.png',
self.save_queue.get(True, 10.0)
)
class InstanceCamera(BaseCamera):
'''
Instance segmentation camera class that manages the creation and data
acquisition of instance segmentation cameras.
'''
def __init__(self, world, sensor_manager, transform, attached, width, height, options):
super().__init__(world, sensor_manager, transform, attached, width, height, options)
def _get_camera(self):
'''
Add instance segmentation camera to the SensorManager and get the
camera blueprint.
'''
self.sensor_manager.add_instance_camera(self)
self.camera_bp = self.world.get_blueprint_library().find('sensor.camera.instance_segmentation')
def render(self, window_name='Instance Image'):
'''
Render instance segmentation image.
Args:
window_name: window name for the rendered image.
'''
cv2.imshow(window_name, cv2.resize(self.render_queue.get(True, 10.0), (self.width // 4, self.height // 4)))
cv2.waitKey(1)
def save(self, camera_name, path, scene, frame):
'''
Save instance segmentation image to file.
Args:
camera_name: name of the camera.
path: root directory of the dataset.
scene: scene number.
frame: frame number.
'''
cv2.imwrite(
f'{path}/simbev/sweeps/IST-{camera_name}/SimBEV-scene-{scene:04d}-frame-{frame:04d}-IST-{camera_name}.png',
self.save_queue.get(True, 10.0)
)
class DepthCamera(BaseCamera):
'''
Depth camera class that manages the creation and data acquisition of depth
cameras.
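Example (a sketch of decoding a saved depth image back to meters,
following CARLA's depth encoding; depth_path is a hypothetical path to
an image written by save()):
    bgr = cv2.imread(depth_path).astype(np.float64)
    normalized = (bgr[:, :, 2] + bgr[:, :, 1] * 256.0
                  + bgr[:, :, 0] * 256.0 ** 2) / (256.0 ** 3 - 1)
    depth_in_meters = 1000.0 * normalized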
'''
def __init__(self, world, sensor_manager, transform, attached, width, height, options):
super().__init__(world, sensor_manager, transform, attached, width, height, options)
def _get_camera(self):
'''
Add depth camera to the SensorManager and get the camera blueprint.
'''
self.sensor_manager.add_depth_camera(self)
self.camera_bp = self.world.get_blueprint_library().find('sensor.camera.depth')
def render(self, window_name='Depth Image'):
'''
Render depth image.
Args:
window_name: window name for the rendered image.
'''
image = self.render_queue.get(True, 10.0)
# CARLA encodes depth across the R, G, and B channels:
# (R + G * 256 + B * 256 * 256) / (256 * 256 * 256 - 1) gives the
# normalized depth, and multiplying by 1000 gives meters. The array is
# in OpenCV's BGR order, so channel 2 is R. Here the normalized depth
# is converted to a logarithmic scale for better visualization.
normalized_distance = (image[:, :, 2] + image[:, :, 1] * 256.0 + image[:, :, 0] * 256.0 * 256.0) \
/ (256.0 * 256.0 * 256.0 - 1)
log_distance = 255 * np.log(256.0 * normalized_distance + 1) / np.log(257.0)
cv2.imshow(window_name, cv2.resize(log_distance.astype(np.uint8), (self.width // 4, self.height // 4)))
cv2.waitKey(1)
def save(self, camera_name, path, scene, frame):
'''
Save depth image to file.
Args:
camera_name: name of the camera.
path: root directory of the dataset.
scene: scene number.
frame: frame number.
'''
cv2.imwrite(
f'{path}/simbev/sweeps/DPT-{camera_name}/SimBEV-scene-{scene:04d}-frame-{frame:04d}-DPT-{camera_name}.png',
self.save_queue.get(True, 10.0)
)
class FlowCamera(BaseCamera):
'''
Flow camera class that manages the creation and data acquisition of flow
cameras.
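Example (a sketch of loading a saved flow file; flow_path is a
hypothetical path to an .npz file written by save()):
    flow = np.load(flow_path)['data']  # (height, width, 2), float32
    color = flow_vis.flow_to_color(flow, convert_to_bgr=True)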
'''
def __init__(self, world, sensor_manager, transform, attached, width, height, options):
super().__init__(world, sensor_manager, transform, attached, width, height, options)
def _get_camera(self):
'''
Add flow camera to the SensorManager and get the camera blueprint.
'''
self.sensor_manager.add_flow_camera(self)
self.camera_bp = self.world.get_blueprint_library().find('sensor.camera.optical_flow')
def _process_image(self, image):
'''
Callback function for processing raw image data.
Args:
image: raw image data.
'''
# Reshape into a (height, width, 2) NumPy array.
array = np.frombuffer(image.raw_data, dtype=np.float32)
self.image = np.reshape(array, (image.height, image.width, 2))
# Put the image in both queues.
self.render_queue.put(self.image)
self.save_queue.put(self.image)
def render(self, window_name='Flow Image'):
'''
Render flow image.
Args:
window_name: window name for the rendered image.
'''
image = flow_vis.flow_to_color(self.render_queue.get(True, 10.0), convert_to_bgr=True)
cv2.imshow(window_name, cv2.resize(image, (self.width // 4, self.height // 4)))
cv2.waitKey(1)
def save(self, camera_name, path, scene, frame):
'''
Save flow image to file.
Args:
camera_name: name of the camera.
path: root directory of the dataset.
scene: scene number.
frame: frame number.
'''
with open(
f'{path}/simbev/sweeps/FLW-{camera_name}/SimBEV-scene-{scene:04d}-frame-{frame:04d}-FLW-{camera_name}.npz',
'wb'
) as f:
np.savez_compressed(f, data=self.save_queue.get(True, 10.0))
class BaseLidar(BaseSensor):
'''
Base lidar class that manages the creation and data acquisition of lidars.
Args:
world: CARLA simulation world.
sensor_manager: SensorManager instance that the lidar belongs to.
transform: the lidar's transform relative to what it is attached to.
attached: CARLA object the lidar is attached to.
channels: number of lidar channels (beams).
range: maximum range of the lidar.
options: dictionary of lidar options.
'''
def __init__(self, world, sensor_manager, transform, attached, channels, range, options):
self.world = world
self.sensor_manager = sensor_manager
self.channels = channels
self.range = range
self.options = options
self.frame = 0
self.point_list = o3d.geometry.PointCloud()
# Create queues for rendering and saving point clouds. Since queues
# are blocking, this ensures that at each time step point clouds are
# fully acquired before the code continues.
self.render_queue = Queue()
self.save_queue = Queue()
self._get_lidar()
self.lidar_bp.set_attribute('channels', str(self.channels))
self.lidar_bp.set_attribute('range', str(self.range))
for key in options:
self.lidar_bp.set_attribute(key, options[key])
self.lidar = self.world.spawn_actor(self.lidar_bp, transform, attach_to=attached)
self.lidar.listen(self._process_point_cloud)
def _get_lidar(self):
'''
Add lidar to the SensorManager and get the lidar blueprint.
'''
raise NotImplementedError
def _process_point_cloud(self, point_cloud):
'''
Callback function for processing raw point cloud data.
Args:
point_cloud: raw point cloud data.
'''
raise NotImplementedError
def clear_queues(self):
'''
Clear the queues to ensure only the latest point cloud is accessible.
'''
with self.render_queue.mutex:
self.render_queue.queue.clear()
self.render_queue.all_tasks_done.notify_all()
self.render_queue.unfinished_tasks = 0
with self.save_queue.mutex:
self.save_queue.queue.clear()
self.save_queue.all_tasks_done.notify_all()
self.save_queue.unfinished_tasks = 0
def _create_visualizer(self, window_name, width=1024, height=1024):
'''
Create Open3D visualizer.
Args:
window_name: window name for the point cloud visualizer.
width: visualizer window width in pixels.
height: visualizer window height in pixels.
'''
self.visualizer = o3d.visualization.Visualizer()
self.visualizer.create_window(window_name=window_name, width=width, height=height, left=0, top=0)
self.visualizer.get_render_option().point_size = 1.0
self.visualizer.get_render_option().background_color = [0.04, 0.04, 0.04]
self.visualizer.get_render_option().show_coordinate_frame = True
self.visualizer.add_geometry(self.point_list)
def _draw_points(self, points, color):
'''
Visualize point cloud data in Open3D.
Args:
points: point cloud coordinates.
color: point cloud colors.
'''
self.point_list.points = o3d.utility.Vector3dVector(points)
self.point_list.colors = o3d.utility.Vector3dVector(color)
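# On the third frame, re-add the now-populated geometry so Open3D
# resets the view to fit the point cloud.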
if self.frame == 2:
self.visualizer.add_geometry(self.point_list)
# Place the camera at a height where the entire point cloud is
# visible: with the default 60-degree field of view, a height of
# range / tan(30 deg) = sqrt(3) * range keeps a disc of radius range
# in view.
camera_height = np.sqrt(3.0) * self.range
camera = self.visualizer.get_view_control().convert_to_pinhole_camera_parameters()
pose = np.eye(4)
pose[1, 1] = -1
pose[2, 2] = -1
pose[:3, 3] = [0, 0, camera_height]
camera.extrinsic = pose
self.visualizer.get_view_control().convert_from_pinhole_camera_parameters(camera)
self.visualizer.update_geometry(self.point_list)
self.visualizer.poll_events()
self.visualizer.update_renderer()
self.frame += 1
time.sleep(0.005)
def destroy(self):
'''
Destroy the lidar.
'''
self.lidar.destroy()
try:
self.visualizer.destroy_window()
except AttributeError:
pass
class Lidar(BaseLidar):
'''
Lidar class that manages the creation and data acquisition of lidars.
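Example (a sketch of loading a saved point cloud; lidar_path is a
hypothetical path to an .npz file written by save()):
    # (N, 3) array of x, y, z in meters, with y already flipped to a
    # right-handed frame.
    points = np.load(lidar_path)['data']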
'''
def __init__(self, world, sensor_manager, transform, attached, channels, range, options):
super().__init__(world, sensor_manager, transform, attached, channels, range, options)
def _get_lidar(self):
'''
Add lidar to the SensorManager and get the lidar blueprint.
'''
self.sensor_manager.add_lidar(self)
self.lidar_bp = self.world.get_blueprint_library().find('sensor.lidar.ray_cast')
def _process_point_cloud(self, point_cloud):
'''
Callback function for processing raw point cloud data.
Args:
point_cloud: raw point cloud data.
'''
# Point cloud data contains x, y, z, and intensity values.
self.data = np.copy(np.frombuffer(point_cloud.raw_data, dtype=np.dtype('f4')))
self.data = np.reshape(self.data, (int(self.data.shape[0] / 4), 4))
# Select the x, y, and z columns (a view into self.data) and flip the
# y values because CARLA uses a left-handed coordinate system.
self.points = self.data[:, :-1]
self.points[:, 1] *= -1
# Remove the points belonging to the ego vehicle. The limits are based
# on the 2016 Mustang dimensions and should be adjusted for other
# vehicles.
mask = np.logical_or(abs(self.points[:, 0]) > 2.44, abs(self.points[:, 1]) > 1.04)
self.data = self.data[mask]
self.points = self.points[mask]
# Put the point cloud in both queues.
self.render_queue.put(self.points)
self.save_queue.put(self.points)
def render(self):
'''
Render point cloud.
'''
if self.frame == 0:
self._create_visualizer(window_name='Lidar Point Cloud')
# Pop the latest point cloud once so the geometry and its colors come
# from the same frame.
points = self.render_queue.get(True, 10.0)
# Generate point cloud colors from the log-scaled distance of each
# point.
distance = np.linalg.norm(points, axis=1)
distance_log = np.log(distance)
distance_log_normalized = (
distance_log - distance_log.min()
) / (
distance_log.max() - distance_log.min() + 1e-6
)
distance_color = np.c_[
np.interp(distance_log_normalized, RANGE, RAINBOW[:, 0]),
np.interp(distance_log_normalized, RANGE, RAINBOW[:, 1]),
np.interp(distance_log_normalized, RANGE, RAINBOW[:, 2])
]
self._draw_points(points, distance_color)
def save(self, path, scene, frame):
'''
Save point cloud to file.
Args:
path: root directory of the dataset.
scene: scene number.
frame: frame number.
'''
with open(f'{path}/simbev/sweeps/LIDAR/SimBEV-scene-{scene:04d}-frame-{frame:04d}-LIDAR.npz', 'wb') as f:
np.savez_compressed(f, data=self.save_queue.get(True, 10.0))
class SemanticLidar(BaseLidar):
'''
Semantic lidar class that manages the creation and data acquisition of
semantic lidars.
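Example (a sketch of reading a saved structured array; seg_lidar_path
is a hypothetical path to an .npz file written by save()):
    data = np.load(seg_lidar_path)['data']
    points = np.array([data['x'], data['y'], data['z']]).T
    tags = data['ObjTag']  # CARLA semantic tags; see LABEL_COLORS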
'''
def __init__(self, world, sensor_manager, transform, attached, channels, range, options):
super().__init__(world, sensor_manager, transform, attached, channels, range, options)
def _get_lidar(self):
'''
Add semantic lidar to the SensorManager and get the semantic lidar
blueprint.
'''
self.sensor_manager.add_semantic_lidar(self)
self.lidar_bp = self.world.get_blueprint_library().find('sensor.lidar.ray_cast_semantic')
def _process_point_cloud(self, point_cloud):
'''
Callback function for processing raw point cloud data.
Args:
point_cloud: raw point cloud data.
'''
# Point cloud data contains x, y, z, cosine angle, object index, and
# object tag values.
self.data = np.copy(
np.frombuffer(
point_cloud.raw_data,
dtype=np.dtype([
('x', np.float32),
('y', np.float32),
('z', np.float32),
('CosAngle', np.float32),
('ObjIdx', np.uint32),
('ObjTag', np.uint32)]
)
)
)
# Flip the y values because CARLA uses a left-handed coordinate
# system, then assemble the x, y, and z columns into a point array.
self.data['y'] *= -1
self.points = np.array([self.data['x'], self.data['y'], self.data['z']]).T
# Remove the points belonging to the ego vehicle. The limits are based
# on the 2016 Mustang dimensions and should be adjusted for other
# vehicles.
mask = np.logical_or(abs(self.points[:, 0]) > 2.44, abs(self.points[:, 1]) > 1.04)
labels = np.array(self.data['ObjTag'])
# Set point cloud colors based on object labels.
self.label_color = LABEL_COLORS[labels]
self.data = self.data[mask]
self.points = self.points[mask]
self.label_color = self.label_color[mask]
# Put the point cloud and its matching colors in the render queue and
# the entire data in the save queue.
self.render_queue.put((self.points, self.label_color))
self.save_queue.put(self.data)
def render(self):
'''
Render point cloud.
'''
if self.frame == 0:
self._create_visualizer(window_name='Semantic Lidar Point Cloud')
self._draw_points(*self.render_queue.get(True, 10.0))
def save(self, path, scene, frame):
'''
Save point cloud to file.
Args:
path: root directory of the dataset.
scene: scene number.
frame: frame number.
'''
with open(
f'{path}/simbev/sweeps/SEG-LIDAR/SimBEV-scene-{scene:04d}-frame-{frame:04d}-SEG-LIDAR.npz',
'wb'
) as f:
np.savez_compressed(f, data=self.save_queue.get(True, 10.0))
class Radar(BaseSensor):
'''
Radar class that manages the creation and data acquisition of radars.
Args:
world: CARLA simulation world.
sensor_manager: SensorManager instance that the radar belongs to.
transform: the radar's transform relative to what it is attached to.
attached: CARLA object the radar is attached to.
range: maximum range of the radar.
hfov: horizontal field of view of the radar.
vfov: vertical field of view of the radar.
options: dictionary of radar options.
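Example (a sketch of converting saved detections, stored as rows of
[depth, altitude, azimuth, velocity], to Cartesian points; radar_path
is a hypothetical path to an .npz file written by save()):
    data = np.load(radar_path)['data']
    depth, alt, az = data[:, 0], data[:, 1], data[:, 2]
    x = depth * np.cos(alt) * np.cos(az)
    y = depth * np.cos(alt) * np.sin(az)
    z = depth * np.sin(alt)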
'''
def __init__(self, world, sensor_manager, transform, attached, range, hfov, vfov, options):
self.world = world
self.sensor_manager = sensor_manager
self.range = range
self.hfov = hfov
self.vfov = vfov
self.options = options
self.frame = 0
self.point_list = o3d.geometry.PointCloud()
# Create queues for rendering and saving point clouds. Since queues
# are blocking, this ensures that at each time step point clouds are
# fully acquired before the code continues.
self.render_queue = Queue()
self.save_queue = Queue()
self._get_radar()
self.radar_bp.set_attribute('range', str(self.range))
self.radar_bp.set_attribute('horizontal_fov', str(self.hfov))
self.radar_bp.set_attribute('vertical_fov', str(self.vfov))
for key in options:
self.radar_bp.set_attribute(key, options[key])
self.radar = self.world.spawn_actor(self.radar_bp, transform, attach_to=attached)
self.radar.listen(self._process_point_cloud)
def _get_radar(self):
'''
Add radar to the SensorManager and get the radar blueprint.
'''
self.sensor_manager.add_radar(self)
self.radar_bp = self.world.get_blueprint_library().find('sensor.other.radar')
def _process_point_cloud(self, point_cloud):
'''
Callback function for processing raw point cloud data.
Args:
point_cloud: raw point cloud data.
'''
# Raw radar data contains velocity, azimuth, altitude, and depth
# values per detection; reverse the column order to
# [depth, altitude, azimuth, velocity].
self.data = np.copy(np.frombuffer(point_cloud.raw_data, dtype=np.dtype('f4')))
self.data = np.reshape(self.data, (int(self.data.shape[0] / 4), 4))
self.data = self.data[:, ::-1]
# Flip azimuth values because CARLA uses a left-handed coordinate system.
self.data[:, 2] *= -1
# Put the point cloud in both queues.
self.render_queue.put(self.data)
self.save_queue.put(self.data)
def clear_queues(self):
'''
Clear the queues to ensure only the latest point cloud is accessible.
'''
with self.render_queue.mutex:
self.render_queue.queue.clear()
self.render_queue.all_tasks_done.notify_all()
self.render_queue.unfinished_tasks = 0
with self.save_queue.mutex:
self.save_queue.queue.clear()
self.save_queue.all_tasks_done.notify_all()
self.save_queue.unfinished_tasks = 0
def _create_visualizer(self, window_name, width=1024, height=1024):
'''
Create Open3D visualizer.
Args:
window_name: window name for the point cloud visualizer.
width: visualizer window width in pixels.
height: visualizer window height in pixels.
'''
self.visualizer = o3d.visualization.Visualizer()
self.visualizer.create_window(window_name=window_name, width=width, height=height, left=0, top=0)
self.visualizer.get_render_option().point_size = 4.0
self.visualizer.get_render_option().background_color = [0.04, 0.04, 0.04]
self.visualizer.get_render_option().show_coordinate_frame = True
self.visualizer.add_geometry(self.point_list)
def _draw_points(self, points, color):
'''
Visualize point cloud data in Open3D.
Args:
points: point cloud coordinates.
color: point cloud colors.
'''
self.point_list.points = o3d.utility.Vector3dVector(points)
self.point_list.colors = o3d.utility.Vector3dVector(color)
if self.frame == 2:
self.visualizer.add_geometry(self.point_list)
# Place the camera at a height where the entire point cloud is
# visible, scaled by the horizontal extent of the radar's field of
# view. The camera's field of view is 60 degrees by default.
camera_height = np.sqrt(3.0) * self.range * max(0.5, np.sin(np.deg2rad(self.hfov / 2)))
camera = self.visualizer.get_view_control().convert_to_pinhole_camera_parameters()
pose = np.eye(4)
pose[1, 1] = -1
pose[2, 2] = -1
pose[:3, 3] = [-camera_height / np.sqrt(3.0), 0, camera_height]
camera.extrinsic = pose
self.visualizer.get_view_control().convert_from_pinhole_camera_parameters(camera)
self.visualizer.update_geometry(self.point_list)
self.visualizer.poll_events()
self.visualizer.update_renderer()
self.frame += 1
time.sleep(0.005)
def render(self, window_name='Radar Point Cloud'):
'''
Render point cloud.
Args:
window_name: window name for the point cloud visualizer.
'''
if self.frame == 0:
self._create_visualizer(window_name=window_name)
radar_data = self.render_queue.get(True, 10.0)
points = radar_data[:, :-1]
# Convert the radar values of depth, altitude angle, and azimuth angle
# to x, y, and z coordinates.
x = points[:, 0] * np.cos(points[:, 1]) * np.cos(points[:, 2])
y = points[:, 0] * np.cos(points[:, 1]) * np.sin(points[:, 2])
z = points[:, 0] * np.sin(points[:, 1])
xyz = np.array([x, y, z]).T
# Generate point cloud colors based on velocity values.
velocity = np.abs(radar_data[:, -1])
velocity_log = np.log(1.0 + velocity)
velocity_log_normalized = (
velocity_log - velocity_log.min()
) / (
velocity_log.max() - velocity_log.min() + 1e-6
)
velocity_color = np.c_[
np.interp(velocity_log_normalized, RANGE, RAINBOW[:, 0]),
np.interp(velocity_log_normalized, RANGE, RAINBOW[:, 1]),
np.interp(velocity_log_normalized, RANGE, RAINBOW[:, 2])
]
self._draw_points(xyz, velocity_color)
def save(self, radar_name, path, scene, frame):
'''
Save point cloud to file.
Args:
radar_name: name of the radar.
path: root directory of the dataset.
scene: scene number.
frame: frame number.
'''
with open(
f'{path}/simbev/sweeps/{radar_name}/SimBEV-scene-{scene:04d}-frame-{frame:04d}-{radar_name}.npz',
'wb'
) as f:
np.savez_compressed(f, data=self.save_queue.get(True, 10.0))
def destroy(self):
'''
Destroy the radar.
'''
self.radar.destroy()
try:
self.visualizer.destroy_window()
except AttributeError:
pass
class GNSS(BaseSensor):
'''
GNSS sensor class that manages the creation and data acquisition of
GNSS sensors.
Args:
world: CARLA simulation world.
sensor_manager: SensorManager instance that the GNSS belongs to.
transform: the GNSS sensor's transform relative to what it is attached to.
attached: CARLA object the GNSS is attached to.
options: dictionary of GNSS options.
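Example (a sketch; gnss_path is a hypothetical path to a .bin file
written by save()):
    # Latitude and longitude in degrees, altitude in meters.
    latitude, longitude, altitude = np.load(gnss_path)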
'''
def __init__(self, world, sensor_manager, transform, attached, options):
self.world = world
self.sensor_manager = sensor_manager
self.options = options
# Create a queue for saving GNSS data. Since queues are blocking, this
# ensures that at each time step GNSS data is fully acquired before
# the code continues.
self.save_queue = Queue()
self._get_gnss()
for key in options:
self.gnss_bp.set_attribute(key, options[key])
self.gnss = self.world.spawn_actor(self.gnss_bp, transform, attach_to=attached)
self.gnss.listen(self._process_data)
def _get_gnss(self):
'''
Add GNSS to the SensorManager and get the GNSS blueprint.
'''
self.sensor_manager.add_gnss(self)
self.gnss_bp = self.world.get_blueprint_library().find('sensor.other.gnss')
def _process_data(self, data):
'''
Callback function for processing GNSS data.
Args:
data: GNSS data.
'''
# GNSS data contains latitude, longitude, and altitude values.
self.data = np.array([data.latitude, data.longitude, data.altitude])
# Put the GNSS data in the save queue.
self.save_queue.put(self.data)
def clear_queue(self):
'''
Clear the queue to ensure only the latest GNSS data is accessible.
'''
with self.save_queue.mutex:
self.save_queue.queue.clear()
self.save_queue.all_tasks_done.notify_all()
self.save_queue.unfinished_tasks = 0
def save(self, path, scene, frame):
'''
Save GNSS data to file.
Args:
path: root directory of the dataset.
scene: scene number.
frame: frame number.
'''
with open(f'{path}/simbev/sweeps/GNSS/SimBEV-scene-{scene:04d}-frame-{frame:04d}-GNSS.bin', 'wb') as f:
np.save(f, self.save_queue.get(True, 10.0))
def destroy(self):
'''
Destroy the GNSS.
'''
self.gnss.destroy()
class IMU(BaseSensor):
'''
IMU sensor class that manages the creation and data acquisition of IMU
sensors.
Args:
world: CARLA simulation world.
sensor_manager: SensorManager instance that the IMU belongs to.
transform: the IMU's transform relative to what it is attached to.
attached: CARLA object the IMU is attached to.
options: dictionary of IMU options.
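Example (a sketch; imu_path is a hypothetical path to a .bin file
written by save()):
    imu = np.load(imu_path)
    accelerometer = imu[:3]  # m/s^2
    gyroscope = imu[3:6]  # rad/s
    compass = imu[6]  # radians, 0 pointing north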
'''
def __init__(self, world, sensor_manager, transform, attached, options):
self.world = world
self.sensor_manager = sensor_manager
self.options = options
# Create a queue for saving IMU data. Since queues are blocking, this
# ensures that at each time step IMU data is fully acquired before the
# code continues.
self.save_queue = Queue()
self._get_imu()
for key in options:
self.imu_bp.set_attribute(key, options[key])
self.imu = self.world.spawn_actor(self.imu_bp, transform, attach_to=attached)
self.imu.listen(self._process_data)
def _get_imu(self):
'''
Add IMU to the SensorManager and get the IMU blueprint.
'''
self.sensor_manager.add_imu(self)
self.imu_bp = self.world.get_blueprint_library().find('sensor.other.imu')
def _process_data(self, data):
'''
Callback function for processing IMU data.
Args:
data: IMU data.
'''
# IMU data contains accelerometer, gyroscope, and compass values. Some
# values are negated because CARLA uses a left-handed coordinate
# system.
self.data = np.array([
data.accelerometer.x,
-data.accelerometer.y,
data.accelerometer.z,
data.gyroscope.x,
-data.gyroscope.y,
-data.gyroscope.z,
data.compass
])
# Put the IMU data in the save queue.
self.save_queue.put(self.data)
def clear_queue(self):
'''
Clear the queue to ensure only the latest IMU data is accessible.
'''
with self.save_queue.mutex:
self.save_queue.queue.clear()
self.save_queue.all_tasks_done.notify_all()
self.save_queue.unfinished_tasks = 0
def save(self, path, scene, frame):
'''
Save IMU data to file.
Args:
path: root directory of the dataset.
scene: scene number.
frame: frame number.
'''
with open(f'{path}/simbev/sweeps/IMU/SimBEV-scene-{scene:04d}-frame-{frame:04d}-IMU.bin', 'wb') as f:
np.save(f, self.save_queue.get(True, 10.0))
def destroy(self):
'''
Destroy the IMU.
'''
self.imu.destroy()
class SemanticBEVCamera(SemanticCamera):
'''
BEV semantic segmentation camera class that manages the creation and data
acquisition of BEV semantic segmentation cameras.
'''
def __init__(self, world, sensor_manager, transform, attached, width, height, options):
super().__init__(world, sensor_manager, transform, attached, width, height, options)
def _get_camera(self):
'''
Add BEV semantic segmentation camera to the SensorManager and get the
camera blueprint.
'''
self.sensor_manager.add_semantic_bev_camera(self)
self.camera_bp = self.world.get_blueprint_library().find('sensor.camera.semantic_segmentation')
def render(self, window_name='Segmented BEV Image'):
'''
Render BEV semantic segmentation image.
Args:
window_name: window name for the rendered image.
'''
cv2.imshow(window_name, self.render_queue.get(True, 10.0))
cv2.waitKey(1)