import os
import warnings
from os import path as osp

import mmcv
import mmengine
import numpy as np
from lyft_dataset_sdk.lyftdataset import LyftDataset as Lyft
from pyquaternion import Quaternion

from mmdet3d.datasets.convert_utils import LyftNameMapping
from .nuscenes_converter import (get_2d_boxes, get_available_scenes,
                                 obtain_sensor2top)

lyft_categories = ('car', 'truck', 'bus', 'emergency_vehicle', 'other_vehicle',
                   'motorcycle', 'bicycle', 'pedestrian', 'animal')
|
|
def create_lyft_infos(root_path,
                      info_prefix,
                      version='v1.01-train',
                      max_sweeps=10):
    """Create info file of lyft dataset.

    Given the raw data, generate its related info file in pkl format.

    Args:
        root_path (str): Path of the data root.
        info_prefix (str): Prefix of the info file to be generated.
        version (str, optional): Version of the data.
            Default: 'v1.01-train'.
        max_sweeps (int, optional): Max number of sweeps.
            Default: 10.
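
    Example:
        >>> # A minimal usage sketch; the data layout below (Lyft raw data
        >>> # extracted under ./data/lyft, with split files at
        >>> # data/lyft/train.txt and data/lyft/val.txt) is assumed, not
        >>> # checked here.
        >>> create_lyft_infos('./data/lyft', 'lyft', version='v1.01-train')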
|
""" |
|
    lyft = Lyft(
        data_path=osp.join(root_path, version),
        json_path=osp.join(root_path, version, version),
        verbose=True)
    available_vers = ['v1.01-train', 'v1.01-test']
    assert version in available_vers
    if version == 'v1.01-train':
        train_scenes = mmengine.list_from_file('data/lyft/train.txt')
        val_scenes = mmengine.list_from_file('data/lyft/val.txt')
    elif version == 'v1.01-test':
        train_scenes = mmengine.list_from_file('data/lyft/test.txt')
        val_scenes = []
    else:
        raise ValueError(f'unknown version: {version}')

    available_scenes = get_available_scenes(lyft)
    available_scene_names = [s['name'] for s in available_scenes]
    train_scenes = list(
        filter(lambda x: x in available_scene_names, train_scenes))
    val_scenes = list(filter(lambda x: x in available_scene_names, val_scenes))
    train_scenes = set([
        available_scenes[available_scene_names.index(s)]['token']
        for s in train_scenes
    ])
    val_scenes = set([
        available_scenes[available_scene_names.index(s)]['token']
        for s in val_scenes
    ])
|
    test = 'test' in version
    if test:
        print(f'test scene: {len(train_scenes)}')
    else:
        print(f'train scene: {len(train_scenes)}, '
              f'val scene: {len(val_scenes)}')
    train_lyft_infos, val_lyft_infos = _fill_trainval_infos(
        lyft, train_scenes, val_scenes, test, max_sweeps=max_sweeps)

    metadata = dict(version=version)
    if test:
        print(f'test sample: {len(train_lyft_infos)}')
        data = dict(infos=train_lyft_infos, metadata=metadata)
        info_name = f'{info_prefix}_infos_test'
        info_path = osp.join(root_path, f'{info_name}.pkl')
        mmengine.dump(data, info_path)
    else:
        print(f'train sample: {len(train_lyft_infos)}, '
              f'val sample: {len(val_lyft_infos)}')
        data = dict(infos=train_lyft_infos, metadata=metadata)
        train_info_name = f'{info_prefix}_infos_train'
        info_path = osp.join(root_path, f'{train_info_name}.pkl')
        mmengine.dump(data, info_path)
        data['infos'] = val_lyft_infos
        val_info_name = f'{info_prefix}_infos_val'
        info_val_path = osp.join(root_path, f'{val_info_name}.pkl')
        mmengine.dump(data, info_val_path)
|
|
def _fill_trainval_infos(lyft,
                         train_scenes,
                         val_scenes,
                         test=False,
                         max_sweeps=10):
    """Generate the train/val infos from the raw data.

    Args:
        lyft (:obj:`LyftDataset`): Dataset class in the Lyft dataset.
        train_scenes (set[str]): Tokens of the training scenes.
        val_scenes (set[str]): Tokens of the validation scenes.
        test (bool, optional): Whether to use test mode. In test mode, no
            annotations can be accessed. Default: False.
        max_sweeps (int, optional): Max number of sweeps. Default: 10.

    Returns:
        tuple[list[dict]]: Information of the training set and the
            validation set that will be saved to the info file.
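            Each info dict contains the keys ``lidar_path``,
            ``num_features``, ``token``, ``sweeps``, ``cams``,
            ``lidar2ego_translation``, ``lidar2ego_rotation``,
            ``ego2global_translation``, ``ego2global_rotation`` and
            ``timestamp``; outside test mode it additionally contains
            ``gt_boxes``, ``gt_names``, ``num_lidar_pts`` and
            ``num_radar_pts``.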
|
""" |
|
    train_lyft_infos = []
    val_lyft_infos = []

    for sample in mmengine.track_iter_progress(lyft.sample):
        lidar_token = sample['data']['LIDAR_TOP']
        sd_rec = lyft.get('sample_data', lidar_token)
        cs_record = lyft.get('calibrated_sensor',
                             sd_rec['calibrated_sensor_token'])
        pose_record = lyft.get('ego_pose', sd_rec['ego_pose_token'])
        abs_lidar_path, boxes, _ = lyft.get_sample_data(lidar_token)
        # the Lyft devkit returns absolute paths; strip the current working
        # directory to store a path relative to it
        abs_lidar_path = str(abs_lidar_path)
        lidar_path = abs_lidar_path.split(f'{os.getcwd()}/')[-1]

        mmengine.check_file_exist(lidar_path)
|
        info = {
            'lidar_path': lidar_path,
            'num_features': 5,
            'token': sample['token'],
            'sweeps': [],
            'cams': dict(),
            'lidar2ego_translation': cs_record['translation'],
            'lidar2ego_rotation': cs_record['rotation'],
            'ego2global_translation': pose_record['translation'],
            'ego2global_rotation': pose_record['rotation'],
            'timestamp': sample['timestamp'],
        }

        l2e_r = info['lidar2ego_rotation']
        l2e_t = info['lidar2ego_translation']
        e2g_r = info['ego2global_rotation']
        e2g_t = info['ego2global_translation']
        l2e_r_mat = Quaternion(l2e_r).rotation_matrix
        e2g_r_mat = Quaternion(e2g_r).rotation_matrix
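        # Together with the translations, these rotation matrices define
        # the chain lidar -> ego -> global:
        #   p_global = e2g_r_mat @ (l2e_r_mat @ p_lidar + l2e_t) + e2g_t
        # obtain_sensor2top composes this chain with each sensor's own ego
        # pose to express cameras and LiDAR sweeps in the frame of the top
        # LiDAR of the current sample.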
|
        camera_types = [
            'CAM_FRONT',
            'CAM_FRONT_RIGHT',
            'CAM_FRONT_LEFT',
            'CAM_BACK',
            'CAM_BACK_LEFT',
            'CAM_BACK_RIGHT',
        ]
        for cam in camera_types:
            cam_token = sample['data'][cam]
            cam_path, _, cam_intrinsic = lyft.get_sample_data(cam_token)
            cam_info = obtain_sensor2top(lyft, cam_token, l2e_t, l2e_r_mat,
                                         e2g_t, e2g_r_mat, cam)
            cam_info.update(cam_intrinsic=cam_intrinsic)
            info['cams'].update({cam: cam_info})
|
        # obtain sweeps for a single key-frame by walking back through the
        # previous LiDAR frames
        sd_rec = lyft.get('sample_data', sample['data']['LIDAR_TOP'])
        sweeps = []
        while len(sweeps) < max_sweeps:
            if not sd_rec['prev'] == '':
                sweep = obtain_sensor2top(lyft, sd_rec['prev'], l2e_t,
                                          l2e_r_mat, e2g_t, e2g_r_mat, 'lidar')
                sweeps.append(sweep)
                sd_rec = lyft.get('sample_data', sd_rec['prev'])
            else:
                break
        info['sweeps'] = sweeps
|
        if not test:
            annotations = [
                lyft.get('sample_annotation', token)
                for token in sample['anns']
            ]
            locs = np.array([b.center for b in boxes]).reshape(-1, 3)
            dims = np.array([b.wlh for b in boxes]).reshape(-1, 3)
            rots = np.array([b.orientation.yaw_pitch_roll[0]
                             for b in boxes]).reshape(-1, 1)

            names = [b.name for b in boxes]
            for i in range(len(names)):
                if names[i] in LyftNameMapping:
                    names[i] = LyftNameMapping[names[i]]
            names = np.array(names)

            # the boxes come as (w, l, h); reorder the size columns to
            # (l, w, h), i.e. (x_size, y_size, z_size) in the LiDAR frame
            gt_boxes = np.concatenate([locs, dims[:, [1, 0, 2]], rots], axis=1)
            assert len(gt_boxes) == len(
                annotations), f'{len(gt_boxes)}, {len(annotations)}'
            info['gt_boxes'] = gt_boxes
            info['gt_names'] = names
            info['num_lidar_pts'] = np.array(
                [a['num_lidar_pts'] for a in annotations])
            info['num_radar_pts'] = np.array(
                [a['num_radar_pts'] for a in annotations])
|
        if sample['scene_token'] in train_scenes:
            train_lyft_infos.append(info)
        else:
            val_lyft_infos.append(info)

    return train_lyft_infos, val_lyft_infos
|
|
def export_2d_annotation(root_path, info_path, version):
    """Export 2d annotation from the info file and raw data.

    Args:
        root_path (str): Root path of the raw data.
        info_path (str): Path of the info file.
        version (str): Dataset version.
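
    Example:
        >>> # A minimal sketch; the pkl below is assumed to be an info
        >>> # file produced by create_lyft_infos.
        >>> export_2d_annotation('./data/lyft',
        ...                      './data/lyft/lyft_infos_val.pkl',
        ...                      version='v1.01-train')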
|
""" |
|
    warnings.warn(
        '2D annotations are not used on the Lyft dataset. The function '
        'export_2d_annotation will be deprecated.', DeprecationWarning)
|
    camera_types = [
        'CAM_FRONT',
        'CAM_FRONT_RIGHT',
        'CAM_FRONT_LEFT',
        'CAM_BACK',
        'CAM_BACK_LEFT',
        'CAM_BACK_RIGHT',
    ]
    lyft_infos = mmengine.load(info_path)['infos']
    lyft = Lyft(
        data_path=osp.join(root_path, version),
        json_path=osp.join(root_path, version, version),
        verbose=True)

    cat2Ids = [
        dict(id=lyft_categories.index(cat_name), name=cat_name)
        for cat_name in lyft_categories
    ]
    coco_ann_id = 0
    coco_2d_dict = dict(annotations=[], images=[], categories=cat2Ids)
|
    for info in mmengine.track_iter_progress(lyft_infos):
        for cam in camera_types:
            cam_info = info['cams'][cam]
            coco_infos = get_2d_boxes(
                lyft,
                cam_info['sample_data_token'],
                visibilities=['', '1', '2', '3', '4'])
            (height, width, _) = mmcv.imread(cam_info['data_path']).shape
            coco_2d_dict['images'].append(
                dict(
                    file_name=cam_info['data_path'],
                    id=cam_info['sample_data_token'],
                    width=width,
                    height=height))
            for coco_info in coco_infos:
                if coco_info is None:
                    continue
                # keep the boxes in COCO format, with an empty segmentation
                # field and a running annotation id
                coco_info['segmentation'] = []
                coco_info['id'] = coco_ann_id
                coco_2d_dict['annotations'].append(coco_info)
                coco_ann_id += 1
    mmengine.dump(coco_2d_dict, f'{info_path[:-4]}.coco.json')
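

# A minimal usage sketch, not part of the original module. In mmdetection3d
# these converters are normally driven by tools/create_data.py; because of
# the relative import above, this module must be imported as part of its
# package rather than run as a script, e.g.
#
#   from tools.dataset_converters.lyft_converter import create_lyft_infos
#   create_lyft_infos('./data/lyft', 'lyft', version='v1.01-train')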
|
|