3dtest / tools /dataset_converters /lyft_converter.py
giantmonkeyTC
mm2
c2ca15f
# Copyright (c) OpenMMLab. All rights reserved.
import os
from logging import warning
from os import path as osp
import mmcv
import mmengine
import numpy as np
from lyft_dataset_sdk.lyftdataset import LyftDataset as Lyft
from pyquaternion import Quaternion
from mmdet3d.datasets.convert_utils import LyftNameMapping
from .nuscenes_converter import (get_2d_boxes, get_available_scenes,
obtain_sensor2top)
lyft_categories = ('car', 'truck', 'bus', 'emergency_vehicle', 'other_vehicle',
'motorcycle', 'bicycle', 'pedestrian', 'animal')
def create_lyft_infos(root_path,
info_prefix,
version='v1.01-train',
max_sweeps=10):
"""Create info file of lyft dataset.
Given the raw data, generate its related info file in pkl format.
Args:
root_path (str): Path of the data root.
info_prefix (str): Prefix of the info file to be generated.
version (str, optional): Version of the data.
Default: 'v1.01-train'.
max_sweeps (int, optional): Max number of sweeps.
Default: 10.
"""
lyft = Lyft(
data_path=osp.join(root_path, version),
json_path=osp.join(root_path, version, version),
verbose=True)
available_vers = ['v1.01-train', 'v1.01-test']
assert version in available_vers
if version == 'v1.01-train':
train_scenes = mmengine.list_from_file('data/lyft/train.txt')
val_scenes = mmengine.list_from_file('data/lyft/val.txt')
elif version == 'v1.01-test':
train_scenes = mmengine.list_from_file('data/lyft/test.txt')
val_scenes = []
else:
raise ValueError('unknown')
# filter existing scenes.
available_scenes = get_available_scenes(lyft)
available_scene_names = [s['name'] for s in available_scenes]
train_scenes = list(
filter(lambda x: x in available_scene_names, train_scenes))
val_scenes = list(filter(lambda x: x in available_scene_names, val_scenes))
train_scenes = set([
available_scenes[available_scene_names.index(s)]['token']
for s in train_scenes
])
val_scenes = set([
available_scenes[available_scene_names.index(s)]['token']
for s in val_scenes
])
test = 'test' in version
if test:
print(f'test scene: {len(train_scenes)}')
else:
print(f'train scene: {len(train_scenes)}, \
val scene: {len(val_scenes)}')
train_lyft_infos, val_lyft_infos = _fill_trainval_infos(
lyft, train_scenes, val_scenes, test, max_sweeps=max_sweeps)
metadata = dict(version=version)
if test:
print(f'test sample: {len(train_lyft_infos)}')
data = dict(infos=train_lyft_infos, metadata=metadata)
info_name = f'{info_prefix}_infos_test'
info_path = osp.join(root_path, f'{info_name}.pkl')
mmengine.dump(data, info_path)
else:
print(f'train sample: {len(train_lyft_infos)}, \
val sample: {len(val_lyft_infos)}')
data = dict(infos=train_lyft_infos, metadata=metadata)
train_info_name = f'{info_prefix}_infos_train'
info_path = osp.join(root_path, f'{train_info_name}.pkl')
mmengine.dump(data, info_path)
data['infos'] = val_lyft_infos
val_info_name = f'{info_prefix}_infos_val'
info_val_path = osp.join(root_path, f'{val_info_name}.pkl')
mmengine.dump(data, info_val_path)
def _fill_trainval_infos(lyft,
train_scenes,
val_scenes,
test=False,
max_sweeps=10):
"""Generate the train/val infos from the raw data.
Args:
lyft (:obj:`LyftDataset`): Dataset class in the Lyft dataset.
train_scenes (list[str]): Basic information of training scenes.
val_scenes (list[str]): Basic information of validation scenes.
test (bool, optional): Whether use the test mode. In the test mode, no
annotations can be accessed. Default: False.
max_sweeps (int, optional): Max number of sweeps. Default: 10.
Returns:
tuple[list[dict]]: Information of training set and
validation set that will be saved to the info file.
"""
train_lyft_infos = []
val_lyft_infos = []
for sample in mmengine.track_iter_progress(lyft.sample):
lidar_token = sample['data']['LIDAR_TOP']
sd_rec = lyft.get('sample_data', sample['data']['LIDAR_TOP'])
cs_record = lyft.get('calibrated_sensor',
sd_rec['calibrated_sensor_token'])
pose_record = lyft.get('ego_pose', sd_rec['ego_pose_token'])
abs_lidar_path, boxes, _ = lyft.get_sample_data(lidar_token)
# nuScenes devkit returns more convenient relative paths while
# lyft devkit returns absolute paths
abs_lidar_path = str(abs_lidar_path) # absolute path
lidar_path = abs_lidar_path.split(f'{os.getcwd()}/')[-1]
# relative path
mmengine.check_file_exist(lidar_path)
info = {
'lidar_path': lidar_path,
'num_features': 5,
'token': sample['token'],
'sweeps': [],
'cams': dict(),
'lidar2ego_translation': cs_record['translation'],
'lidar2ego_rotation': cs_record['rotation'],
'ego2global_translation': pose_record['translation'],
'ego2global_rotation': pose_record['rotation'],
'timestamp': sample['timestamp'],
}
l2e_r = info['lidar2ego_rotation']
l2e_t = info['lidar2ego_translation']
e2g_r = info['ego2global_rotation']
e2g_t = info['ego2global_translation']
l2e_r_mat = Quaternion(l2e_r).rotation_matrix
e2g_r_mat = Quaternion(e2g_r).rotation_matrix
# obtain 6 image's information per frame
camera_types = [
'CAM_FRONT',
'CAM_FRONT_RIGHT',
'CAM_FRONT_LEFT',
'CAM_BACK',
'CAM_BACK_LEFT',
'CAM_BACK_RIGHT',
]
for cam in camera_types:
cam_token = sample['data'][cam]
cam_path, _, cam_intrinsic = lyft.get_sample_data(cam_token)
cam_info = obtain_sensor2top(lyft, cam_token, l2e_t, l2e_r_mat,
e2g_t, e2g_r_mat, cam)
cam_info.update(cam_intrinsic=cam_intrinsic)
info['cams'].update({cam: cam_info})
# obtain sweeps for a single key-frame
sd_rec = lyft.get('sample_data', sample['data']['LIDAR_TOP'])
sweeps = []
while len(sweeps) < max_sweeps:
if not sd_rec['prev'] == '':
sweep = obtain_sensor2top(lyft, sd_rec['prev'], l2e_t,
l2e_r_mat, e2g_t, e2g_r_mat, 'lidar')
sweeps.append(sweep)
sd_rec = lyft.get('sample_data', sd_rec['prev'])
else:
break
info['sweeps'] = sweeps
# obtain annotation
if not test:
annotations = [
lyft.get('sample_annotation', token)
for token in sample['anns']
]
locs = np.array([b.center for b in boxes]).reshape(-1, 3)
dims = np.array([b.wlh for b in boxes]).reshape(-1, 3)
rots = np.array([b.orientation.yaw_pitch_roll[0]
for b in boxes]).reshape(-1, 1)
names = [b.name for b in boxes]
for i in range(len(names)):
if names[i] in LyftNameMapping:
names[i] = LyftNameMapping[names[i]]
names = np.array(names)
# we need to convert box size to
# the format of our lidar coordinate system
# which is x_size, y_size, z_size (corresponding to l, w, h)
gt_boxes = np.concatenate([locs, dims[:, [1, 0, 2]], rots], axis=1)
assert len(gt_boxes) == len(
annotations), f'{len(gt_boxes)}, {len(annotations)}'
info['gt_boxes'] = gt_boxes
info['gt_names'] = names
info['num_lidar_pts'] = np.array(
[a['num_lidar_pts'] for a in annotations])
info['num_radar_pts'] = np.array(
[a['num_radar_pts'] for a in annotations])
if sample['scene_token'] in train_scenes:
train_lyft_infos.append(info)
else:
val_lyft_infos.append(info)
return train_lyft_infos, val_lyft_infos
def export_2d_annotation(root_path, info_path, version):
"""Export 2d annotation from the info file and raw data.
Args:
root_path (str): Root path of the raw data.
info_path (str): Path of the info file.
version (str): Dataset version.
"""
warning.warn('DeprecationWarning: 2D annotations are not used on the '
'Lyft dataset. The function export_2d_annotation will be '
'deprecated.')
# get bbox annotations for camera
camera_types = [
'CAM_FRONT',
'CAM_FRONT_RIGHT',
'CAM_FRONT_LEFT',
'CAM_BACK',
'CAM_BACK_LEFT',
'CAM_BACK_RIGHT',
]
lyft_infos = mmengine.load(info_path)['infos']
lyft = Lyft(
data_path=osp.join(root_path, version),
json_path=osp.join(root_path, version, version),
verbose=True)
# info_2d_list = []
cat2Ids = [
dict(id=lyft_categories.index(cat_name), name=cat_name)
for cat_name in lyft_categories
]
coco_ann_id = 0
coco_2d_dict = dict(annotations=[], images=[], categories=cat2Ids)
for info in mmengine.track_iter_progress(lyft_infos):
for cam in camera_types:
cam_info = info['cams'][cam]
coco_infos = get_2d_boxes(
lyft,
cam_info['sample_data_token'],
visibilities=['', '1', '2', '3', '4'])
(height, width, _) = mmcv.imread(cam_info['data_path']).shape
coco_2d_dict['images'].append(
dict(
file_name=cam_info['data_path'],
id=cam_info['sample_data_token'],
width=width,
height=height))
for coco_info in coco_infos:
if coco_info is None:
continue
# add an empty key for coco format
coco_info['segmentation'] = []
coco_info['id'] = coco_ann_id
coco_2d_dict['annotations'].append(coco_info)
coco_ann_id += 1
mmengine.dump(coco_2d_dict, f'{info_path[:-4]}.coco.json')