# SimBEV-Preview / simbev / simbev.py
# Academic Software License: Copyright © 2024.

import os
import yaml
import time
import json
import copy
import argparse
import traceback

import numpy as np

from carla_core import CarlaCore, kill_all_servers

# Camera poses relative to the ego vehicle. Row i of each table belongs to
# CAM_NAME[i]. Translations are three-element offsets and rotations are
# four-element quaternions (presumably [w, x, y, z], given that the lidar
# rotation below is [1, 0, 0, 0] -- TODO confirm).
CAM2EGO_T = [
    [0.4, 0.4, 1.6],
    [0.6, 0.0, 1.6],
    [0.4, -0.4, 1.6],
    [0.0, 0.4, 1.6],
    [-1.0, 0.0, 1.6],
    [0.0, -0.4, 1.6],
]
CAM2EGO_R = [
    [0.6743797, -0.6743797, 0.2126311, -0.2126311],
    [0.5, -0.5, 0.5, -0.5],
    [0.2126311, -0.2126311, 0.6743797, -0.6743797],
    [0.6963642, -0.6963642, -0.1227878, 0.1227878],
    [0.5, -0.5, -0.5, 0.5],
    [0.1227878, -0.1227878, -0.6963642, 0.6963642],
]

# Lidar pose relative to the ego vehicle.
LI2EGO_T = [0.0, 0.0, 1.8]
LI2EGO_R = [1.0, 0.0, 0.0, 0.0]

# Radar poses relative to the ego vehicle. Row i belongs to RAD_NAME[i].
RAD2EGO_T = [
    [0.0, 1.0, 0.6],
    [2.4, 0.0, 0.6],
    [0.0, -1.0, 0.6],
    [-2.4, 0.0, 0.6],
]
RAD2EGO_R = [
    [0.7071067, 0.0, 0.0, 0.7071067],
    [1.0, 0.0, 0.0, 0.0],
    [0.7071067, 0.0, 0.0, -0.7071067],
    [0.0, 0.0, 0.0, 1.0],
]

# Sensor poses relative to the lidar. The lidar offset is subtracted from
# every sensor offset (broadcast over the rows), which yields NumPy arrays of
# shape (6, 3) and (4, 3). The rotations are reused unchanged.
CAM2LI_T = np.asarray(CAM2EGO_T) - np.asarray(LI2EGO_T)
CAM2LI_R = CAM2EGO_R

RAD2LI_T = np.asarray(RAD2EGO_T) - np.asarray(LI2EGO_T)
RAD2LI_R = RAD2EGO_R

# 3x3 camera intrinsics matrix shared by all cameras (presumably the usual
# [[fx, 0, cx], [0, fy, cy], [0, 0, 1]] layout -- TODO confirm).
CAM_I = [
    [953.4029, 0.0, 800.0],
    [0.0, 953.4029, 450.0],
    [0.0, 0.0, 1.0],
]

# Sensor names, in the same order as the pose tables above.
CAM_NAME = [
    'CAM_FRONT_LEFT',
    'CAM_FRONT',
    'CAM_FRONT_RIGHT',
    'CAM_BACK_LEFT',
    'CAM_BACK',
    'CAM_BACK_RIGHT',
]

RAD_NAME = [
    'RAD_LEFT',
    'RAD_FRONT',
    'RAD_RIGHT',
    'RAD_BACK',
]


# Command line interface. The arguments are parsed at import time and the
# result is kept in the module-level `args`, which main() reads.
argparser = argparse.ArgumentParser(
    description='SimBEV is a CARLA-based driving data generation tool.'
)

argparser.add_argument('config', help='configuration file')
argparser.add_argument('--path', default='/dataset', help='path for saving the dataset (default: /dataset)')
argparser.add_argument('--render', action='store_true', help='render sensor data')

# --save and --no-save write to the same destination; saving is on unless
# --no-save is given.
argparser.add_argument('--save', action='store_true', help='save sensor data (used by default)')
argparser.add_argument('--no-save', dest='save', action='store_false', help='do not save sensor data')
argparser.set_defaults(save=True)

args = argparser.parse_args()

def parse_config(args):
    '''
    Load the YAML configuration file named on the command line.

    Args:
        args: command line arguments; args.config is the path of the
            configuration file.

    Returns:
        config: contents of the configuration file as loaded by PyYAML.
    '''
    # NOTE(review): yaml.FullLoader accepts more than plain YAML. If the
    # configuration files only use standard tags, yaml.safe_load would be the
    # more conservative choice -- confirm before switching.
    with open(args.config) as config_file:
        return yaml.load(config_file, Loader=yaml.FullLoader)

def generate_metadata():
    '''
    Build the dataset metadata from the module-level sensor tables.

    The result holds the camera intrinsics under 'camera_intrinsics' plus one
    entry per sensor (the lidar, each camera in CAM_NAME and each radar in
    RAD_NAME) with its translation and rotation relative to the lidar and to
    the ego vehicle.

    Returns:
        metadata: dataset metadata.
    '''
    def sensor_entry(lidar_t, lidar_r, ego_t, ego_r):
        # All sensors share the same four-key layout.
        return {
            'sensor2lidar_translation': lidar_t,
            'sensor2lidar_rotation': lidar_r,
            'sensor2ego_translation': ego_t,
            'sensor2ego_rotation': ego_r
        }

    metadata = {'camera_intrinsics': CAM_I}

    # The lidar is the reference frame, so its pose relative to itself is
    # the zero translation and the [1, 0, 0, 0] rotation.
    metadata['LIDAR'] = sensor_entry([0.0, 0.0, 0.0], [1.0, 0.0, 0.0, 0.0], LI2EGO_T, LI2EGO_R)

    # The sensor-to-lidar translations are NumPy rows, so they are converted
    # to plain lists; the other tables already hold lists.
    cameras = zip(CAM_NAME, CAM2LI_T, CAM2LI_R, CAM2EGO_T, CAM2EGO_R)

    for name, lidar_t, lidar_r, ego_t, ego_r in cameras:
        metadata[name] = sensor_entry(lidar_t.tolist(), lidar_r, ego_t, ego_r)

    radars = zip(RAD_NAME, RAD2LI_T, RAD2LI_R, RAD2EGO_T, RAD2EGO_R)

    for name, lidar_t, lidar_r, ego_t, ego_r in radars:
        metadata[name] = sensor_entry(lidar_t.tolist(), lidar_r, ego_t, ego_r)

    return metadata

# Dataset splits, in the order they are processed.
_SPLITS = ('train', 'val', 'test')


def _infos_path(split):
    '''
    Return the path of the infos file of a split.

    Args:
        split: dataset split name ('train', 'val' or 'test').

    Returns:
        path: path of the JSON file that stores the split's scenes.
    '''
    return f'{args.path}/simbev/infos/simbev_infos_{split}.json'


def _create_output_dirs(path, config):
    '''
    Create the dataset directory tree.

    Sensor folders are only created for the sensors enabled in the
    configuration. The ground truth, infos, logs and configs folders are
    always created.

    Args:
        path: root path for saving the dataset.
        config: configuration dictionary holding the use_* sensor flags.
    '''
    root = f'{path}/simbev'
    sweeps = f'{root}/sweeps'

    # Configuration flag and folder prefix of each camera type.
    camera_types = (
        ('use_rgb_camera', 'RGB'),
        ('use_semantic_camera', 'SEG'),
        ('use_instance_camera', 'IST'),
        ('use_depth_camera', 'DPT'),
        ('use_flow_camera', 'FLW'),
    )

    for name in CAM_NAME:
        for flag, prefix in camera_types:
            if config[flag]:
                os.makedirs(f'{sweeps}/{prefix}-{name}', exist_ok=True)

    if config['use_lidar']:
        os.makedirs(f'{sweeps}/LIDAR', exist_ok=True)

    if config['use_semantic_lidar']:
        os.makedirs(f'{sweeps}/SEG-LIDAR', exist_ok=True)

    if config['use_radar']:
        for name in RAD_NAME:
            os.makedirs(f'{sweeps}/{name}', exist_ok=True)

    if config['use_gnss']:
        os.makedirs(f'{sweeps}/GNSS', exist_ok=True)

    if config['use_imu']:
        os.makedirs(f'{sweeps}/IMU', exist_ok=True)

    for folder in (
        'ground-truth/seg',
        'ground-truth/det',
        'ground-truth/seg_viz',
        'infos',
        'logs',
        'configs',
    ):
        os.makedirs(f'{root}/{folder}', exist_ok=True)


def _prime_navigation(core):
    '''
    Run a single throwaway tick in Town01.

    Town01 is loaded once to get around a bug in CARLA where the pedestrian
    navigation information for the wrong town is loaded.

    Args:
        core: CarlaCore instance.
    '''
    core.load_map('Town01')

    core.spawn_vehicle()

    core.start_scene()

    core.tick()

    core.stop_scene()

    core.destroy_vehicle()


def _record_scene(core, split, scene_counter, data, metadata):
    '''
    Run one scene and, when saving is enabled, store its data.

    The scene is started, ticked through the warm-up period without saving
    anything, ticked through the scene duration, and stopped. When args.save
    is set, the scene is also logged with the CARLA recorder, its packaged
    data is added to `data` under 'scene_<number>', the split's infos file
    is rewritten, and the current configuration is dumped next to it.

    The vehicle must already be spawned; it is not destroyed here.

    Args:
        core: CarlaCore instance.
        split: dataset split the scene belongs to.
        scene_counter: scene number.
        data: dictionary of the split's scenes; updated in place.
        metadata: dataset metadata written to the infos file.
    '''
    config = args.config

    log_path = f'{args.path}/simbev/logs/SimBEV-scene-{scene_counter:04d}.log'
    config_path = f'{args.path}/simbev/configs/SimBEV-scene-{scene_counter:04d}.yaml'

    core.start_scene()

    for _ in range(round(config['warmup_duration'] / config['timestep'])):
        core.tick()

    # Start logging the scene.
    if args.save:
        core.client.start_recorder(log_path, True)

    for j in range(round(config['scene_duration'] / config['timestep'])):
        core.tick(args.path, scene_counter, j, args.render, args.save)

    if args.save:
        # Stop logging the scene.
        core.client.stop_recorder()

        scene_data = core.package_data()

        scene_data['scene_info']['log'] = log_path
        scene_data['scene_info']['config'] = config_path

        data[f'scene_{scene_counter:04d}'] = copy.deepcopy(scene_data)

        # The infos file is rewritten after every scene, so the scenes
        # finished so far are on disk if a later scene fails.
        with open(_infos_path(split), 'w') as f:
            json.dump({'metadata': metadata, 'data': data}, f, indent=4)

        with open(config_path, 'w') as f:
            yaml.dump(config, f)

    core.stop_scene()


def _create_scenes(metadata):
    '''
    Create the scenes requested by the per-split scene configurations.

    Scenes already listed in the infos files (only consulted when args.save
    is set) are counted first: they set the number of the next scene and
    reduce the number of scenes still to be created in each town.

    Args:
        metadata: dataset metadata written to the infos files.
    '''
    scene_counter = 0

    core = CarlaCore(args.config)

    _prime_navigation(core)

    # Check to see how many scenes have already been created. Then, create
    # the remaining scenes.
    existing = {}

    for split in _SPLITS:
        path = _infos_path(split)

        if args.save and os.path.exists(path):
            with open(path, 'r') as f:
                existing[split] = json.load(f)['data']

            scene_counter += len(existing[split])

    for split in _SPLITS:
        data = existing.get(split, {})

        scene_config = args.config[f'{split}_scene_config']

        # A split without a scene configuration has nothing to create, even
        # if an infos file for it already exists.
        if scene_config is None:
            continue

        # For each town, subtract the scenes that have already been created.
        # This updates args.config in place.
        for scene in data.values():
            town = scene['scene_info']['map']

            if town in scene_config:
                scene_config[town] -= 1

        # Create the scenes for each town.
        for town in scene_config:
            if scene_config[town] <= 0:
                continue

            core.connect_client()

            core.load_map(town)

            core.spawn_vehicle()

            for i in range(scene_config[town]):
                print(f'Creating scene {scene_counter:04d} in {town} for the {split} set...')

                # The vehicle is freshly spawned for the first scene and
                # moved for every scene after it.
                if i > 0:
                    core.move_vehicle()

                _record_scene(core, split, scene_counter, data, metadata)

                scene_counter += 1

            core.destroy_vehicle()


def _replace_scenes(metadata):
    '''
    Re-create the scenes listed in the 'scene_config' configuration entry.

    Each listed scene that is found in a split's infos file is generated
    again in the town recorded for it, and its data, log and configuration
    files are overwritten.

    Args:
        metadata: dataset metadata written to the infos files.
    '''
    core = CarlaCore(args.config)

    _prime_navigation(core)

    for split in _SPLITS:
        path = _infos_path(split)

        if not os.path.exists(path):
            continue

        with open(path, 'r') as f:
            data = json.load(f)['data']

        # Replace the specified scenes.
        for scene_counter in args.config['scene_config']:
            key = f'scene_{scene_counter:04d}'

            if key not in data:
                continue

            town = data[key]['scene_info']['map']

            # Only reload the map when the scene is in a different town.
            if town != core.map_name:
                core.connect_client()

                core.load_map(town)

            print(f'Replacing scene {scene_counter:04d} in {town} for the {split} set...')

            core.spawn_vehicle()

            _record_scene(core, split, scene_counter, data, metadata)

            core.destroy_vehicle()


def main():
    '''
    Generate or replace SimBEV scenes according to the configuration file.

    The configuration is loaded into args.config, the output directories are
    created when saving is enabled, and the configured mode is run:
    'create' creates the scenes that are still missing, 'replace' re-creates
    the listed scenes (only when saving is enabled). Any other mode does
    nothing. All servers are killed at the end, whether or not the run
    succeeded.

    Exceptions raised during the run are printed, not propagated.
    '''
    args.config = parse_config(args)

    metadata = generate_metadata()

    try:
        if args.save:
            _create_output_dirs(args.path, args.config)

        if args.config['mode'] == 'create':
            _create_scenes(metadata)
        elif args.config['mode'] == 'replace' and args.save:
            _replace_scenes(metadata)

        print('Killing all servers...')

        kill_all_servers()

    except Exception:
        print(traceback.format_exc())

        print('Killing all servers...')

        kill_all_servers()

        time.sleep(3.0)

if __name__ == '__main__':
    # main() catches Exception subclasses itself, so a keyboard interrupt is
    # the case that has to be handled here.
    try:
        main()
    except KeyboardInterrupt:
        print('Killing all servers...')
        kill_all_servers()
        # Presumably gives the servers time to shut down -- TODO confirm.
        time.sleep(3.0)
    finally:
        print('Done.')