# Academic Software License: Copyright © 2024.
'''
Sensor manager module that collects data from all sensors on a vehicle and
renders or saves them.
'''
import cv2
import numpy as np
from utils import CustomTimer
from scipy.spatial.transform import Rotation as R
class SensorManager:
'''
Sensor Manager class that manages data collection.
Args:
core: CarlaCore instance that manages the simulation.
vehicle: vehicle the sensor manager belongs to.
'''
def __init__(self, core, vehicle):
self.core = core
self.vehicle = vehicle
self.camera_list = []
self.semantic_camera_list = []
self.instance_camera_list = []
self.depth_camera_list = []
self.flow_camera_list = []
self.lidar_list = []
self.semantic_lidar_list = []
self.radar_list = []
self.gnss_list = []
self.imu_list = []
self.semantic_bev_camera_list = []
self.timer = CustomTimer()
self.camera_name_list = ['CAM_FRONT_LEFT', 'CAM_FRONT', 'CAM_FRONT_RIGHT',
'CAM_BACK_LEFT', 'CAM_BACK', 'CAM_BACK_RIGHT']
self.radar_name_list = ['RAD_LEFT', 'RAD_FRONT', 'RAD_RIGHT', 'RAD_BACK']
self.bev_camera_name_list = ['TOP_VIEW', 'BOTTOM_VIEW']
self.data = []
def add_camera(self, camera):
'''
Add camera to the list of cameras.
Args:
camera: camera to add to the list of cameras.
'''
self.camera_list.append(camera)
def add_semantic_camera(self, semantic_camera):
'''
Add semantic segmentation camera to the list of cameras.
Args:
semantic_camera: semantic segmentation camera to add to the list
of cameras.
'''
self.semantic_camera_list.append(semantic_camera)
def add_instance_camera(self, instance_camera):
'''
Add instance segmentation camera to the list of cameras.
Args:
instance_camera: instance segmentation camera to add to the list
of cameras.
'''
self.instance_camera_list.append(instance_camera)
def add_depth_camera(self, depth_camera):
'''
Add depth camera to the list of cameras.
Args:
depth_camera: depth camera to add to the list of cameras.
'''
self.depth_camera_list.append(depth_camera)
def add_flow_camera(self, flow_camera):
'''
Add optical flow camera to the list of cameras.
Args:
flow_camera: optical flow camera to add to the list of cameras.
'''
self.flow_camera_list.append(flow_camera)
def add_lidar(self, lidar):
'''
Add lidar to the list of lidars.
Args:
lidar: lidar to add to the list of lidars.
'''
self.lidar_list.append(lidar)
def add_semantic_lidar(self, semantic_lidar):
'''
Add semantic lidar to the list of semantic lidars.
Args:
semantic_lidar: semantic lidar to add to the list of semantic
lidars.
'''
self.semantic_lidar_list.append(semantic_lidar)
def add_radar(self, radar):
'''
Add radar to the list of radars.
Args:
radar: radar to add to the list of radars.
'''
self.radar_list.append(radar)
def add_gnss(self, gnss):
'''
Add GNSS sensor to the list of GNSS sensors.
Args:
gnss: GNSS sensor to add to the list of GNSS sensors.
'''
self.gnss_list.append(gnss)
def add_imu(self, imu):
'''
Add IMU sensor to the list of IMU sensors.
Args:
imu: IMU sensor to add to the list of IMU sensors.
'''
self.imu_list.append(imu)
def add_semantic_bev_camera(self, semantic_bev_camera):
'''
Add semantic segmentation camera to the list of BEV cameras.
Args:
semantic_bev_camera: semantic segmentation camera to add to the
list of BEV cameras.
'''
self.semantic_bev_camera_list.append(semantic_bev_camera)
def clear_queues(self):
'''
Clear sensor queues.
'''
for camera in self.camera_list:
camera.clear_queues()
for semantic_camera in self.semantic_camera_list:
semantic_camera.clear_queues()
for instance_camera in self.instance_camera_list:
instance_camera.clear_queues()
for depth_camera in self.depth_camera_list:
depth_camera.clear_queues()
for flow_camera in self.flow_camera_list:
flow_camera.clear_queues()
for lidar in self.lidar_list:
lidar.clear_queues()
for semantic_lidar in self.semantic_lidar_list:
semantic_lidar.clear_queues()
for radar in self.radar_list:
radar.clear_queues()
for gnss in self.gnss_list:
gnss.clear_queue()
for imu in self.imu_list:
imu.clear_queue()
for semantic_bev_camera in self.semantic_bev_camera_list:
semantic_bev_camera.clear_queues()
def render(self):
'''
Render sensor data.
'''
for camera, window_name in zip(self.camera_list, self.camera_name_list):
camera.render(window_name)
for semantic_camera, window_name in zip(self.semantic_camera_list, self.camera_name_list):
semantic_camera.render('SEG ' + window_name)
for instance_camera, window_name in zip(self.instance_camera_list, self.camera_name_list):
instance_camera.render('IST ' + window_name)
for depth_camera, window_name in zip(self.depth_camera_list, self.camera_name_list):
depth_camera.render('DPT ' + window_name)
for flow_camera, window_name in zip(self.flow_camera_list, self.camera_name_list):
flow_camera.render('FLW ' + window_name)
for lidar in self.lidar_list:
lidar.render()
for semantic_lidar in self.semantic_lidar_list:
semantic_lidar.render()
for radar, window_name in zip(self.radar_list, self.radar_name_list):
radar.render(window_name)
for semantic_bev_camera, window_name in zip(self.semantic_bev_camera_list, self.bev_camera_name_list):
semantic_bev_camera.render(window_name)
def save(self, path, scene, frame):
'''
Save sensor data.
Args:
path: root directory of the dataset.
scene: scene number.
frame: frame number.
'''
for camera, camera_name in zip(self.camera_list, self.camera_name_list):
camera.save(camera_name, path, scene, frame)
for semantic_camera, camera_name in zip(self.semantic_camera_list, self.camera_name_list):
semantic_camera.save(camera_name, path, scene, frame)
for instance_camera, camera_name in zip(self.instance_camera_list, self.camera_name_list):
instance_camera.save(camera_name, path, scene, frame)
for depth_camera, camera_name in zip(self.depth_camera_list, self.camera_name_list):
depth_camera.save(camera_name, path, scene, frame)
for flow_camera, camera_name in zip(self.flow_camera_list, self.camera_name_list):
flow_camera.save(camera_name, path, scene, frame)
for lidar in self.lidar_list:
lidar.save(path, scene, frame)
for semantic_lidar in self.semantic_lidar_list:
semantic_lidar.save(path, scene, frame)
for radar, radar_name in zip(self.radar_list, self.radar_name_list):
radar.save(radar_name, path, scene, frame)
for gnss in self.gnss_list:
gnss.save(path, scene, frame)
for imu in self.imu_list:
imu.save(path, scene, frame)
scene_data = {}
ego_transform = self.vehicle.get_transform()
scene_data['ego2global_translation'] = [ego_transform.location.x,
-ego_transform.location.y,
ego_transform.location.z]
scene_data['ego2global_rotation'] = np.roll(
R.from_euler(
'xyz',
[ego_transform.rotation.roll, -ego_transform.rotation.pitch, -ego_transform.rotation.yaw],
degrees=True).as_quat(), 1
).tolist()
scene_data['timestamp'] = round(self.timer.time() * 10e6)
for camera_name in self.camera_name_list:
if self.core.config['use_rgb_camera']:
scene_data[f'RGB-{camera_name}'] = f'{path}/simbev/sweeps/RGB-{camera_name}' \
f'/SimBEV-scene-{scene:04d}-frame-{frame:04d}-RGB-{camera_name}.jpg'
if self.core.config['use_semantic_camera']:
scene_data[f'SEG-{camera_name}'] = f'{path}/simbev/sweeps/SEG-{camera_name}' \
f'/SimBEV-scene-{scene:04d}-frame-{frame:04d}-SEG-{camera_name}.png'
if self.core.config['use_instance_camera']:
scene_data[f'IST-{camera_name}'] = f'{path}/simbev/sweeps/IST-{camera_name}' \
f'/SimBEV-scene-{scene:04d}-frame-{frame:04d}-IST-{camera_name}.png'
if self.core.config['use_depth_camera']:
scene_data[f'DPT-{camera_name}'] = f'{path}/simbev/sweeps/DPT-{camera_name}' \
f'/SimBEV-scene-{scene:04d}-frame-{frame:04d}-DPT-{camera_name}.png'
if self.core.config['use_flow_camera']:
scene_data[f'FLW-{camera_name}'] = f'{path}/simbev/sweeps/FLW-{camera_name}' \
f'/SimBEV-scene-{scene:04d}-frame-{frame:04d}-FLW-{camera_name}.npz'
if self.core.config['use_lidar']:
scene_data['LIDAR'] = f'{path}/simbev/sweeps/LIDAR/SimBEV-scene-{scene:04d}-frame-{frame:04d}-LIDAR.npz'
if self.core.config['use_semantic_lidar']:
scene_data['SEG-LIDAR'] = f'{path}/simbev/sweeps/SEG-LIDAR' \
f'/SimBEV-scene-{scene:04d}-frame-{frame:04d}-SEG-LIDAR.npz'
if self.core.config['use_radar']:
for radar_name in self.radar_name_list:
scene_data[f'{radar_name}'] = f'{path}/simbev/sweeps/{radar_name}' \
f'/SimBEV-scene-{scene:04d}-frame-{frame:04d}-{radar_name}.npz'
if self.core.config['use_gnss']:
scene_data['GNSS'] = f'{path}/simbev/sweeps/GNSS/SimBEV-scene-{scene:04d}-frame-{frame:04d}-GNSS.bin'
if self.core.config['use_imu']:
scene_data['IMU'] = f'{path}/simbev/sweeps/IMU/SimBEV-scene-{scene:04d}-frame-{frame:04d}-IMU.bin'
scene_data['GT_SEG'] = f'{path}/simbev/ground-truth/seg/SimBEV-scene-{scene:04d}-frame-{frame:04d}-GT_SEG.npz'
scene_data['GT_SEG_VIZ'] = f'{path}/simbev/ground-truth/seg_viz' \
f'/SimBEV-scene-{scene:04d}-frame-{frame:04d}-GT_SEG_VIZ.jpg'
scene_data['GT_DET'] = f'{path}/simbev/ground-truth/det/SimBEV-scene-{scene:04d}-frame-{frame:04d}-GT_DET.bin'
scene_data['scene'] = scene
scene_data['frame'] = frame
self.data.append(scene_data)
def reset(self):
'''
Reset scenario data.
'''
self.data = []
def destroy(self):
'''
Destroy sensors.
'''
for camera in self.camera_list:
camera.destroy()
for semantic_camera in self.semantic_camera_list:
semantic_camera.destroy()
for instance_camera in self.instance_camera_list:
instance_camera.destroy()
for depth_camera in self.depth_camera_list:
depth_camera.destroy()
for flow_camera in self.flow_camera_list:
flow_camera.destroy()
for lidar in self.lidar_list:
lidar.destroy()
for semantic_lidar in self.semantic_lidar_list:
semantic_lidar.destroy()
for radar in self.radar_list:
radar.destroy()
for gnss in self.gnss_list:
gnss.destroy()
for imu in self.imu_list:
imu.destroy()
for semantic_bev_camera in self.semantic_bev_camera_list:
semantic_bev_camera.destroy()
cv2.destroyAllWindows()