import os |
from logging import warning |
from os import path as osp |
import mmcv |
import mmengine |
import numpy as np |
from lyft_dataset_sdk.lyftdataset import LyftDataset as Lyft |
from pyquaternion import Quaternion |
from mmdet3d.datasets.convert_utils import LyftNameMapping |
from .nuscenes_converter import (get_2d_boxes, get_available_scenes, |
obtain_sensor2top) |
# Object category names for the Lyft dataset. The position of a name in this
# tuple is what ``export_2d_annotation`` uses as its COCO category id, so the
# order must stay stable.
lyft_categories = (
    'car',
    'truck',
    'bus',
    'emergency_vehicle',
    'other_vehicle',
    'motorcycle',
    'bicycle',
    'pedestrian',
    'animal',
)
def create_lyft_infos(root_path,
                      info_prefix,
                      version='v1.01-train',
                      max_sweeps=10):
    """Create info file of lyft dataset.

    Given the raw data, generate its related info file in pkl format. For the
    train version two files are written (``{info_prefix}_infos_train.pkl``
    and ``{info_prefix}_infos_val.pkl``); for the test version a single
    ``{info_prefix}_infos_test.pkl`` is written. All files are placed
    directly under ``root_path``.

    Args:
        root_path (str): Path of the data root.
        info_prefix (str): Prefix of the info file to be generated.
        version (str, optional): Version of the data.
            Default: 'v1.01-train'.
        max_sweeps (int, optional): Max number of sweeps.
            Default: 10.

    Raises:
        ValueError: If ``version`` is not a supported dataset version.
    """
    available_vers = ('v1.01-train', 'v1.01-test')
    # Validate up front (and with a real exception rather than ``assert``,
    # which is stripped under ``-O``) so that an unsupported version fails
    # before the dataset object is constructed.
    if version not in available_vers:
        raise ValueError(
            f'unknown version {version!r}, expected one of {available_vers}')
    test = version == 'v1.01-test'

    lyft = Lyft(
        data_path=osp.join(root_path, version),
        json_path=osp.join(root_path, version, version),
        verbose=True)

    # The scene-name lists come from fixed ``data/lyft/*.txt`` paths; they
    # are not derived from ``root_path``.
    if test:
        train_scenes = mmengine.list_from_file('data/lyft/test.txt')
        val_scenes = []
    else:
        train_scenes = mmengine.list_from_file('data/lyft/train.txt')
        val_scenes = mmengine.list_from_file('data/lyft/val.txt')

    # Map scene name -> scene token once instead of calling ``list.index``
    # for every name. ``setdefault`` keeps the first scene for a repeated
    # name, which is what ``list.index`` would have picked.
    name_to_token = {}
    for scene in get_available_scenes(lyft):
        name_to_token.setdefault(scene['name'], scene['token'])

    # Names that are not among the available scenes are silently dropped.
    train_scenes = {
        name_to_token[name] for name in train_scenes if name in name_to_token
    }
    val_scenes = {
        name_to_token[name] for name in val_scenes if name in name_to_token
    }

    if test:
        print(f'test scene: {len(train_scenes)}')
    else:
        print(f'train scene: {len(train_scenes)}, '
              f'val scene: {len(val_scenes)}')

    train_lyft_infos, val_lyft_infos = _fill_trainval_infos(
        lyft, train_scenes, val_scenes, test, max_sweeps=max_sweeps)

    metadata = dict(version=version)
    if test:
        # In test mode the "train" list holds the test samples.
        print(f'test sample: {len(train_lyft_infos)}')
        data = dict(infos=train_lyft_infos, metadata=metadata)
        info_path = osp.join(root_path, f'{info_prefix}_infos_test.pkl')
        mmengine.dump(data, info_path)
    else:
        print(f'train sample: {len(train_lyft_infos)}, '
              f'val sample: {len(val_lyft_infos)}')
        data = dict(infos=train_lyft_infos, metadata=metadata)
        info_path = osp.join(root_path, f'{info_prefix}_infos_train.pkl')
        mmengine.dump(data, info_path)
        # Reuse the same container (and metadata) for the validation file.
        data['infos'] = val_lyft_infos
        info_val_path = osp.join(root_path, f'{info_prefix}_infos_val.pkl')
        mmengine.dump(data, info_val_path)
def _fill_trainval_infos(lyft,
                         train_scenes,
                         val_scenes,
                         test=False,
                         max_sweeps=10):
    """Build one info dict per sample and split them into train/val lists.

    Args:
        lyft (:obj:`LyftDataset`): Dataset class in the Lyft dataset.
        train_scenes (list[str] | set[str]): Scene tokens of the training
            split. A sample goes to the first returned list when its
            ``scene_token`` is contained in this collection.
        val_scenes (list[str] | set[str]): Scene tokens of the validation
            split. It is not consulted here: every sample whose scene is
            not in ``train_scenes`` goes to the second returned list.
        test (bool, optional): Whether use the test mode. In the test mode
            no annotation fields are added to the infos. Default: False.
        max_sweeps (int, optional): Max number of sweeps. Default: 10.

    Returns:
        tuple[list[dict]]: Infos of the samples in ``train_scenes`` and
            infos of all remaining samples, in that order.
    """
    train_infos, val_infos = [], []
    # No camera channels are listed, so ``info['cams']`` stays empty.
    camera_types = []

    for sample in mmengine.track_iter_progress(lyft.sample):
        lidar_token = sample['data']['LIDAR_TOP']
        lidar_sd = lyft.get('sample_data', lidar_token)
        calib = lyft.get('calibrated_sensor',
                         lidar_sd['calibrated_sensor_token'])
        ego_pose = lyft.get('ego_pose', lidar_sd['ego_pose_token'])
        raw_lidar_path, boxes, _ = lyft.get_sample_data(lidar_token)

        # Keep only the part of the path after the current working
        # directory prefix (if present), then make sure the file exists.
        lidar_path = str(raw_lidar_path).split(f'{os.getcwd()}/')[-1]
        mmengine.check_file_exist(lidar_path)

        info = dict(
            lidar_path=lidar_path,
            num_features=5,
            token=sample['token'],
            sweeps=[],
            cams=dict(),
            lidar2ego_translation=calib['translation'],
            lidar2ego_rotation=calib['rotation'],
            ego2global_translation=ego_pose['translation'],
            ego2global_rotation=ego_pose['rotation'],
            timestamp=sample['timestamp'],
        )

        l2e_t = info['lidar2ego_translation']
        e2g_t = info['ego2global_translation']
        l2e_r_mat = Quaternion(info['lidar2ego_rotation']).rotation_matrix
        e2g_r_mat = Quaternion(info['ego2global_rotation']).rotation_matrix

        for cam in camera_types:
            cam_token = sample['data'][cam]
            _, _, cam_intrinsic = lyft.get_sample_data(cam_token)
            cam_info = obtain_sensor2top(lyft, cam_token, l2e_t, l2e_r_mat,
                                         e2g_t, e2g_r_mat, cam)
            cam_info.update(cam_intrinsic=cam_intrinsic)
            info['cams'][cam] = cam_info

        # Follow the ``prev`` links from the key frame and collect up to
        # ``max_sweeps`` earlier lidar records; an empty ``prev`` ends the
        # chain.
        cursor = lyft.get('sample_data', lidar_token)
        sweeps = []
        while len(sweeps) < max_sweeps and cursor['prev'] != '':
            prev_token = cursor['prev']
            sweeps.append(
                obtain_sensor2top(lyft, prev_token, l2e_t, l2e_r_mat, e2g_t,
                                  e2g_r_mat, 'lidar'))
            cursor = lyft.get('sample_data', prev_token)
        info['sweeps'] = sweeps

        if not test:
            annotations = [
                lyft.get('sample_annotation', ann_token)
                for ann_token in sample['anns']
            ]
            locs = np.array([box.center for box in boxes]).reshape(-1, 3)
            dims = np.array([box.wlh for box in boxes]).reshape(-1, 3)
            rots = np.array([
                box.orientation.yaw_pitch_roll[0] for box in boxes
            ]).reshape(-1, 1)
            # Rename categories that have an entry in ``LyftNameMapping``;
            # all other names are kept as they are.
            names = np.array([
                LyftNameMapping[box.name]
                if box.name in LyftNameMapping else box.name
                for box in boxes
            ])

            # Box layout: center (3), ``wlh`` with its first two columns
            # swapped (3), yaw (1).
            gt_boxes = np.concatenate([locs, dims[:, [1, 0, 2]], rots],
                                      axis=1)
            assert len(gt_boxes) == len(
                annotations), f'{len(gt_boxes)}, {len(annotations)}'

            info['gt_boxes'] = gt_boxes
            info['gt_names'] = names
            info['num_lidar_pts'] = np.array(
                [ann['num_lidar_pts'] for ann in annotations])
            info['num_radar_pts'] = np.array(
                [ann['num_radar_pts'] for ann in annotations])

        in_train = sample['scene_token'] in train_scenes
        (train_infos if in_train else val_infos).append(info)

    return train_infos, val_infos
def export_2d_annotation(root_path, info_path, version):
    """Export 2d annotation from the info file and raw data.

    The result is written in COCO-style layout (``annotations``, ``images``,
    ``categories``) next to the info file, with the info file's extension
    replaced by ``.coco.json``.

    Args:
        root_path (str): Root path of the raw data.
        info_path (str): Path of the info file.
        version (str): Dataset version.
    """
    # Function-scope import: the module-level name ``warning`` is
    # ``logging.warning`` (a function), which has no ``warn`` attribute, so
    # calling ``warning.warn`` raised AttributeError on every call.
    import warnings

    warnings.warn(
        'DeprecationWarning: 2D annotations are not used on the '
        'Lyft dataset. The function export_2d_annotation will be '
        'deprecated.',
        stacklevel=2)

    # NOTE(review): the camera list is empty here, so the per-camera loop
    # below never runs and the exported ``images``/``annotations`` stay
    # empty -- confirm whether camera channel names are meant to be listed.
    camera_types = []

    lyft_infos = mmengine.load(info_path)['infos']
    lyft = Lyft(
        data_path=osp.join(root_path, version),
        json_path=osp.join(root_path, version, version),
        verbose=True)

    # Category id is the position of the name in ``lyft_categories``.
    cat2Ids = [
        dict(id=cat_id, name=cat_name)
        for cat_id, cat_name in enumerate(lyft_categories)
    ]
    coco_ann_id = 0
    coco_2d_dict = dict(annotations=[], images=[], categories=cat2Ids)
    for info in mmengine.track_iter_progress(lyft_infos):
        for cam in camera_types:
            cam_info = info['cams'][cam]
            coco_infos = get_2d_boxes(
                lyft,
                cam_info['sample_data_token'],
                visibilities=['', '1', '2', '3', '4'])
            # The image is read only to obtain its size.
            (height, width, _) = mmcv.imread(cam_info['data_path']).shape
            coco_2d_dict['images'].append(
                dict(
                    file_name=cam_info['data_path'],
                    id=cam_info['sample_data_token'],
                    width=width,
                    height=height))
            for coco_info in coco_infos:
                if coco_info is None:
                    continue
                coco_info['segmentation'] = []
                # Annotation ids are consecutive across all images.
                coco_info['id'] = coco_ann_id
                coco_2d_dict['annotations'].append(coco_info)
                coco_ann_id += 1

    # ``splitext`` instead of ``info_path[:-4]`` so that extensions that are
    # not exactly four characters long do not truncate the file name.
    mmengine.dump(coco_2d_dict, f'{osp.splitext(info_path)[0]}.coco.json')