# Academic Software License: Copyright © 2024. import os import yaml import time import json import copy import argparse import traceback import numpy as np from carla_core import CarlaCore, kill_all_servers CAM2EGO_T = [ [0.4, 0.4, 1.6], [0.6, 0.0, 1.6], [0.4, -0.4, 1.6], [0.0, 0.4, 1.6], [-1.0, 0.0, 1.6], [0.0, -0.4, 1.6] ] CAM2EGO_R = [ [0.6743797, -0.6743797, 0.2126311, -0.2126311], [0.5, -0.5, 0.5, -0.5], [0.2126311, -0.2126311, 0.6743797, -0.6743797], [0.6963642, -0.6963642, -0.1227878, 0.1227878], [0.5, -0.5, -0.5, 0.5], [0.1227878, -0.1227878, -0.6963642, 0.6963642] ] LI2EGO_T = [0.0, 0.0, 1.8] LI2EGO_R = [1.0, 0.0, 0.0, 0.0] RAD2EGO_T = [ [0.0, 1.0, 0.6], [2.4, 0.0, 0.6], [0.0, -1.0, 0.6], [-2.4, 0.0, 0.6] ] RAD2EGO_R = [ [0.7071067, 0.0, 0.0, 0.7071067], [1.0, 0.0, 0.0, 0.0], [0.7071067, 0.0, 0.0, -0.7071067], [0.0, 0.0, 0.0, 1.0] ] CAM2LI_T = CAM2EGO_T - LI2EGO_T * np.ones((6, 3)) CAM2LI_R = CAM2EGO_R RAD2LI_T = RAD2EGO_T - LI2EGO_T * np.ones((4, 3)) RAD2LI_R = RAD2EGO_R CAM_I = [ [953.4029, 0.0, 800.0], [0.0, 953.4029, 450.0], [0.0, 0.0, 1.0] ] CAM_NAME = [ 'CAM_FRONT_LEFT', 'CAM_FRONT', 'CAM_FRONT_RIGHT', 'CAM_BACK_LEFT', 'CAM_BACK', 'CAM_BACK_RIGHT' ] RAD_NAME = ['RAD_LEFT', 'RAD_FRONT', 'RAD_RIGHT', 'RAD_BACK'] argparser = argparse.ArgumentParser(description='SimBEV is a CARLA-based driving data generation tool.') argparser.add_argument('config', help='configuration file') argparser.add_argument( '--path', default='/dataset', help='path for saving the dataset (default: /dataset)') argparser.add_argument( '--render', action='store_true', help='render sensor data') argparser.add_argument( '--save', action='store_true', help='save sensor data (used by default)') argparser.add_argument( '--no-save', dest='save', action='store_false', help='do not save sensor data') argparser.set_defaults(save=True) args = argparser.parse_args() def parse_config(args): ''' Parse the configuration file. Args: args: command line arguments. Returns: config: configuration dictionary. ''' with open(args.config) as f: config = yaml.load(f, Loader=yaml.FullLoader) return config def generate_metadata(): ''' Generate dataset metadata from sensor transformations. Returns: metadata: dataset metadata. ''' metadata = {} metadata['camera_intrinsics'] = CAM_I metadata['LIDAR'] = { 'sensor2lidar_translation': [0.0, 0.0, 0.0], 'sensor2lidar_rotation': [1.0, 0.0, 0.0, 0.0], 'sensor2ego_translation': LI2EGO_T, 'sensor2ego_rotation': LI2EGO_R } for i in range(6): metadata[CAM_NAME[i]] = { 'sensor2lidar_translation': CAM2LI_T[i].tolist(), 'sensor2lidar_rotation': CAM2LI_R[i], 'sensor2ego_translation': CAM2EGO_T[i], 'sensor2ego_rotation': CAM2EGO_R[i] } for i in range(4): metadata[RAD_NAME[i]] = { 'sensor2lidar_translation': RAD2LI_T[i].tolist(), 'sensor2lidar_rotation': RAD2LI_R[i], 'sensor2ego_translation': RAD2EGO_T[i], 'sensor2ego_rotation': RAD2EGO_R[i] } return metadata def main(): args.config = parse_config(args) metadata = generate_metadata() try: if args.save: for name in CAM_NAME: if args.config['use_rgb_camera']: os.makedirs(f'{args.path}/simbev/sweeps/RGB-{name}', exist_ok=True) if args.config['use_semantic_camera']: os.makedirs(f'{args.path}/simbev/sweeps/SEG-{name}', exist_ok=True) if args.config['use_instance_camera']: os.makedirs(f'{args.path}/simbev/sweeps/IST-{name}', exist_ok=True) if args.config['use_depth_camera']: os.makedirs(f'{args.path}/simbev/sweeps/DPT-{name}', exist_ok=True) if args.config['use_flow_camera']: os.makedirs(f'{args.path}/simbev/sweeps/FLW-{name}', exist_ok=True) if args.config['use_lidar']: os.makedirs(f'{args.path}/simbev/sweeps/LIDAR', exist_ok=True) if args.config['use_semantic_lidar']: os.makedirs(f'{args.path}/simbev/sweeps/SEG-LIDAR', exist_ok=True) if args.config['use_radar']: for name in RAD_NAME: os.makedirs(f'{args.path}/simbev/sweeps/{name}', exist_ok=True) if args.config['use_gnss']: os.makedirs(f'{args.path}/simbev/sweeps/GNSS', exist_ok=True) if args.config['use_imu']: os.makedirs(f'{args.path}/simbev/sweeps/IMU', exist_ok=True) os.makedirs(f'{args.path}/simbev/ground-truth/seg', exist_ok=True) os.makedirs(f'{args.path}/simbev/ground-truth/det', exist_ok=True) os.makedirs(f'{args.path}/simbev/ground-truth/seg_viz', exist_ok=True) os.makedirs(f'{args.path}/simbev/infos', exist_ok=True) os.makedirs(f'{args.path}/simbev/logs', exist_ok=True) os.makedirs(f'{args.path}/simbev/configs', exist_ok=True) if args.config['mode'] == 'create': scene_counter = 0 core = CarlaCore(args.config) # Load Town01 once to get around a bug in CARLA where the # pedestrian navigation information for the wrong town is loaded. core.load_map('Town01') core.spawn_vehicle() core.start_scene() core.tick() core.stop_scene() core.destroy_vehicle() # Check to see how many scenes have already been created. Then, # create the remaining scenes. for split in ['train', 'val', 'test']: if args.save and os.path.exists(f'{args.path}/simbev/infos/simbev_infos_{split}.json'): with open(f'{args.path}/simbev/infos/simbev_infos_{split}.json', 'r') as f: infos = json.load(f) scene_counter += len(infos['data']) for split in ['train', 'val', 'test']: data = {} if args.save and os.path.exists(f'{args.path}/simbev/infos/simbev_infos_{split}.json'): with open(f'{args.path}/simbev/infos/simbev_infos_{split}.json', 'r') as f: infos = json.load(f) data = infos['data'] # For each split and each town, check how many scenes have # already been created. for key in data.keys(): town = data[key]['scene_info']['map'] if town in args.config[f'{split}_scene_config']: args.config[f'{split}_scene_config'][town] -= 1 # Create the scenes for each town. if args.config[f'{split}_scene_config'] is not None: for town in args.config[f'{split}_scene_config']: if args.config[f'{split}_scene_config'][town] > 0: core.connect_client() core.load_map(town) core.spawn_vehicle() vehicle_moved = False for _ in range(args.config[f'{split}_scene_config'][town]): print(f'Creating scene {scene_counter:04d} in {town} for the {split} set...') if vehicle_moved: core.move_vehicle() vehicle_moved = True core.start_scene() for _ in range(round(args.config['warmup_duration'] / args.config['timestep'])): core.tick() # Start logging the scene. if args.save: core.client.start_recorder( f'{args.path}/simbev/logs/SimBEV-scene-{scene_counter:04d}.log', True ) for j in range(round(args.config['scene_duration'] / args.config['timestep'])): core.tick(args.path, scene_counter, j, args.render, args.save) if args.save: # Stop logging the scene. core.client.stop_recorder() scene_data = core.package_data() scene_data['scene_info']['log'] = f'{args.path}/simbev/logs/' \ f'SimBEV-scene-{scene_counter:04d}.log' scene_data['scene_info']['config'] = f'{args.path}/simbev/configs/' \ f'SimBEV-scene-{scene_counter:04d}.yaml' data[f'scene_{scene_counter:04d}'] = copy.deepcopy(scene_data) info = {'metadata': metadata, 'data': data} with open(f'{args.path}/simbev/infos/simbev_infos_{split}.json', 'w') as f: json.dump(info, f, indent=4) with open( f'{args.path}/simbev/configs/SimBEV-scene-{scene_counter:04d}.yaml', 'w' ) as f: yaml.dump(args.config, f) core.stop_scene() scene_counter += 1 core.destroy_vehicle() elif args.config['mode'] == 'replace' and args.save: core = CarlaCore(args.config) # Load Town01 once to get around a bug in CARLA where the # pedestrian navigation information for the wrong town is loaded. core.load_map('Town01') core.spawn_vehicle() core.start_scene() core.tick() core.stop_scene() core.destroy_vehicle() for split in ['train', 'val', 'test']: if os.path.exists(f'{args.path}/simbev/infos/simbev_infos_{split}.json'): with open(f'{args.path}/simbev/infos/simbev_infos_{split}.json', 'r') as f: infos = json.load(f) data = infos['data'] # Replace the specified scenes. for scene_counter in args.config['scene_config']: if f'scene_{scene_counter:04d}' in data.keys(): town = data[f'scene_{scene_counter:04d}']['scene_info']['map'] if town != core.map_name: core.connect_client() core.load_map(town) print(f'Replacing scene {scene_counter:04d} in {town} for the {split} set...') core.spawn_vehicle() core.start_scene() for _ in range(round(args.config['warmup_duration'] / args.config['timestep'])): core.tick() core.client.start_recorder( f'{args.path}/simbev/logs/SimBEV-scene-{scene_counter:04d}.log', True ) for j in range(round(args.config['scene_duration'] / args.config['timestep'])): core.tick(args.path, scene_counter, j, args.render, args.save) core.client.stop_recorder() scene_data = core.package_data() scene_data['scene_info']['log'] = f'{args.path}/simbev/logs/' \ f'SimBEV-scene-{scene_counter:04d}.log' scene_data['scene_info']['config'] = f'{args.path}/simbev/configs/' \ f'SimBEV-scene-{scene_counter:04d}.yaml' data[f'scene_{scene_counter:04d}'] = copy.deepcopy(scene_data) info = {'metadata': metadata, 'data': data} with open(f'{args.path}/simbev/infos/simbev_infos_{split}.json', 'w') as f: json.dump(info, f, indent=4) with open(f'{args.path}/simbev/configs/SimBEV-scene-{scene_counter:04d}.yaml', 'w') as f: yaml.dump(args.config, f) core.stop_scene() core.destroy_vehicle() print('Killing all servers...') kill_all_servers() except Exception: print(traceback.format_exc()) print('Killing all servers...') kill_all_servers() time.sleep(3.0) if __name__ == '__main__': try: main() except KeyboardInterrupt: print('Killing all servers...') kill_all_servers() time.sleep(3.0) finally: print('Done.')