|
|
|
from collections import OrderedDict |
|
from concurrent import futures as futures |
|
from os import path as osp |
|
from pathlib import Path |
|
|
|
import mmengine |
|
import numpy as np |
|
from PIL import Image |
|
from skimage import io |
|
|
|
|
|
def get_image_index_str(img_idx, use_prefix_id=False):
    """Format a sample index as a zero-padded decimal string.

    Args:
        img_idx (int): Sample index.
        use_prefix_id (bool): When true the index is padded to 7 digits,
            otherwise to 6 digits.

    Returns:
        str: The zero-padded index.
    """
    return f'{img_idx:07d}' if use_prefix_id else f'{img_idx:06d}'
|
|
|
|
|
def get_kitti_info_path(idx,
                        prefix,
                        info_type='image_2',
                        file_tail='.png',
                        training=True,
                        relative_path=True,
                        exist_check=True,
                        use_prefix_id=False):
    """Build the path ``<split>/<info_type>/<index><file_tail>`` for a sample.

    Args:
        idx (int): Sample index, formatted by ``get_image_index_str``.
        prefix (str | Path): Dataset root directory.
        info_type (str): Sub-directory name under the split directory.
        file_tail (str): File extension appended to the formatted index.
        training (bool): Use the ``training`` split directory when true,
            ``testing`` otherwise.
        relative_path (bool): Return the path relative to ``prefix`` when
            true, otherwise the path joined with ``prefix``.
        exist_check (bool): Raise if the file is missing under ``prefix``.
        use_prefix_id (bool): Forwarded to ``get_image_index_str``.

    Returns:
        str: The requested path.

    Raises:
        ValueError: If ``exist_check`` is set and the file does not exist.
    """
    file_name = get_image_index_str(idx, use_prefix_id) + file_tail
    split_dir = 'training' if training else 'testing'
    rel_file = Path(split_dir) / info_type / file_name
    abs_file = Path(prefix) / rel_file
    if exist_check and not abs_file.exists():
        # The message intentionally reports the path relative to the root.
        raise ValueError('file not exist: {}'.format(rel_file))
    return str(rel_file if relative_path else abs_file)
|
|
|
|
|
def get_image_path(idx,
                   prefix,
                   training=True,
                   relative_path=True,
                   exist_check=True,
                   info_type='image_2',
                   file_tail='.png',
                   use_prefix_id=False):
    """Return the image file path of a sample.

    Thin wrapper around ``get_kitti_info_path``; every argument is forwarded
    unchanged (see that function for their meaning).

    Returns:
        str: Path of ``<split>/<info_type>/<index><file_tail>``.
    """
    return get_kitti_info_path(
        idx,
        prefix,
        info_type=info_type,
        file_tail=file_tail,
        training=training,
        relative_path=relative_path,
        exist_check=exist_check,
        use_prefix_id=use_prefix_id)
|
|
|
|
|
def get_label_path(idx,
                   prefix,
                   training=True,
                   relative_path=True,
                   exist_check=True,
                   info_type='label_2',
                   use_prefix_id=False):
    """Return the ``.txt`` label file path of a sample.

    Thin wrapper around ``get_kitti_info_path`` with the file extension
    fixed to ``.txt``; the remaining arguments are forwarded unchanged.

    Returns:
        str: Path of ``<split>/<info_type>/<index>.txt``.
    """
    return get_kitti_info_path(
        idx,
        prefix,
        info_type=info_type,
        file_tail='.txt',
        training=training,
        relative_path=relative_path,
        exist_check=exist_check,
        use_prefix_id=use_prefix_id)
|
|
|
|
|
def get_plane_path(idx,
                   prefix,
                   training=True,
                   relative_path=True,
                   exist_check=True,
                   info_type='planes',
                   use_prefix_id=False):
    """Return the ``.txt`` plane file path of a sample.

    Thin wrapper around ``get_kitti_info_path`` with the file extension
    fixed to ``.txt``; the remaining arguments are forwarded unchanged.

    Returns:
        str: Path of ``<split>/<info_type>/<index>.txt``.
    """
    return get_kitti_info_path(
        idx,
        prefix,
        info_type=info_type,
        file_tail='.txt',
        training=training,
        relative_path=relative_path,
        exist_check=exist_check,
        use_prefix_id=use_prefix_id)
|
|
|
|
|
def get_velodyne_path(idx,
                      prefix,
                      training=True,
                      relative_path=True,
                      exist_check=True,
                      use_prefix_id=False):
    """Return the ``.bin`` point cloud file path of a sample.

    Thin wrapper around ``get_kitti_info_path`` with the sub-directory fixed
    to ``velodyne`` and the extension fixed to ``.bin``.

    Returns:
        str: Path of ``<split>/velodyne/<index>.bin``.
    """
    return get_kitti_info_path(
        idx,
        prefix,
        info_type='velodyne',
        file_tail='.bin',
        training=training,
        relative_path=relative_path,
        exist_check=exist_check,
        use_prefix_id=use_prefix_id)
|
|
|
|
|
def get_calib_path(idx,
                   prefix,
                   training=True,
                   relative_path=True,
                   exist_check=True,
                   use_prefix_id=False):
    """Return the ``.txt`` calibration file path of a sample.

    Thin wrapper around ``get_kitti_info_path`` with the sub-directory fixed
    to ``calib`` and the extension fixed to ``.txt``.

    Returns:
        str: Path of ``<split>/calib/<index>.txt``.
    """
    return get_kitti_info_path(
        idx,
        prefix,
        info_type='calib',
        file_tail='.txt',
        training=training,
        relative_path=relative_path,
        exist_check=exist_check,
        use_prefix_id=use_prefix_id)
|
|
|
|
|
def get_pose_path(idx,
                  prefix,
                  training=True,
                  relative_path=True,
                  exist_check=True,
                  use_prefix_id=False):
    """Return the ``.txt`` pose file path of a sample.

    Thin wrapper around ``get_kitti_info_path`` with the sub-directory fixed
    to ``pose`` and the extension fixed to ``.txt``.

    Returns:
        str: Path of ``<split>/pose/<index>.txt``.
    """
    return get_kitti_info_path(
        idx,
        prefix,
        info_type='pose',
        file_tail='.txt',
        training=training,
        relative_path=relative_path,
        exist_check=exist_check,
        use_prefix_id=use_prefix_id)
|
|
|
|
|
def get_timestamp_path(idx,
                       prefix,
                       training=True,
                       relative_path=True,
                       exist_check=True,
                       use_prefix_id=False):
    """Return the ``.txt`` timestamp file path of a sample.

    Thin wrapper around ``get_kitti_info_path`` with the sub-directory fixed
    to ``timestamp`` and the extension fixed to ``.txt``.

    Returns:
        str: Path of ``<split>/timestamp/<index>.txt``.
    """
    return get_kitti_info_path(
        idx,
        prefix,
        info_type='timestamp',
        file_tail='.txt',
        training=training,
        relative_path=relative_path,
        exist_check=exist_check,
        use_prefix_id=use_prefix_id)
|
|
|
|
|
def get_label_anno(label_path):
    """Parse a space-separated label file into a dict of numpy arrays.

    Each non-blank line describes one object; columns are read as
    ``name, truncated, occluded, alpha, bbox(4), dimensions(3),
    location(3), rotation_y[, score]``. Blank lines (for example a trailing
    empty line) are skipped instead of raising ``IndexError``.

    Args:
        label_path (str): Path of the label file to read.

    Returns:
        dict: Keys ``name``, ``truncated``, ``occluded``, ``alpha``,
        ``bbox`` (N, 4), ``dimensions`` (N, 3), ``location`` (N, 3),
        ``rotation_y`` (N,), ``score`` (N,), ``index`` (N,) and
        ``group_ids`` (N,).
    """
    with open(label_path, 'r') as f:
        # A blank line would split into [''] and break column indexing below.
        content = [line.strip().split(' ') for line in f if line.strip()]

    names = [x[0] for x in content]
    num_gt = len(names)
    num_objects = sum(1 for name in names if name != 'DontCare')

    annotations = {
        'name': np.array(names),
        'truncated': np.array([float(x[1]) for x in content]),
        'occluded': np.array([int(x[2]) for x in content]),
        'alpha': np.array([float(x[3]) for x in content]),
        'bbox': np.array([[float(v) for v in x[4:8]]
                          for x in content]).reshape(-1, 4),
        # Columns 8:11 are reordered with [2, 0, 1]; presumably this turns
        # KITTI's (h, w, l) into (l, h, w) -- TODO confirm against consumers.
        'dimensions': np.array([[float(v) for v in x[8:11]]
                                for x in content]).reshape(-1,
                                                           3)[:, [2, 0, 1]],
        'location': np.array([[float(v) for v in x[11:14]]
                              for x in content]).reshape(-1, 3),
        'rotation_y': np.array([float(x[14]) for x in content]).reshape(-1),
    }

    # A 16th column is treated as a score; its presence is decided from the
    # first line only. Without it the scores default to zero.
    if content and len(content[0]) == 16:
        annotations['score'] = np.array([float(x[15]) for x in content])
    else:
        annotations['score'] = np.zeros((annotations['bbox'].shape[0], ))

    # The first `num_objects` entries get a running index and the rest get
    # -1. NOTE(review): this matches "DontCare -> -1" only if DontCare rows
    # come last in the file.
    index = list(range(num_objects)) + [-1] * (num_gt - num_objects)
    annotations['index'] = np.array(index, dtype=np.int32)
    annotations['group_ids'] = np.arange(num_gt, dtype=np.int32)
    return annotations
|
|
|
|
|
def _extend_matrix(mat):
    """Append the row ``[0, 0, 0, 1]`` to the bottom of a 4-column matrix.

    Args:
        mat (np.ndarray): Matrix with four columns (a 3x4 matrix in the
            callers visible in this file).

    Returns:
        np.ndarray: New matrix with one extra row.
    """
    bottom_row = np.array([[0., 0., 0., 1.]])
    return np.vstack((mat, bottom_row))
|
|
|
|
|
def get_kitti_image_info(path,
                         training=True,
                         label_info=True,
                         velodyne=False,
                         calib=False,
                         with_plane=False,
                         image_ids=7481,
                         extend_matrix=True,
                         num_worker=8,
                         relative_path=True,
                         with_imageshape=True):
    """Gather one info dict per sample id using a thread pool.

    Layout of each returned dict (KITTI annotation format version 2):
    {
        image: {
            image_idx: ...
            image_path: ...
            [optional]image_shape: ...
        }
        point_cloud: {
            num_features: 4
            [optional]velodyne_path: ...
        }
        [optional]calib: {
            P0, P1, P2, P3: ...
            R0_rect: ...
            Tr_velo_to_cam: ...
            Tr_imu_to_velo: ...
        }
        [optional]plane: values parsed from the 4th line of the plane file
        [optional]annos: {
            location: [num_gt, 3] array
            dimensions: [num_gt, 3] array
            rotation_y: [num_gt] angle array
            name: [num_gt] ground truth name array
            difficulty: kitti difficulty
            group_ids: used for multi-part object
            ...
        }
    }

    Args:
        path (str): Dataset root directory.
        training (bool): Read from the training split when true.
        label_info (bool): Parse the label file into ``annos``.
        velodyne (bool): Record the point cloud path.
        calib (bool): Parse the calibration file into ``calib``.
        with_plane (bool): Parse the plane file into ``plane``.
        image_ids (int | list[int]): Sample ids; an int ``n`` means
            ``range(n)``.
        extend_matrix (bool): Extend calibration matrices to 4x4.
        num_worker (int): Number of worker threads.
        relative_path (bool): Store paths relative to ``path``.
        with_imageshape (bool): Read each image to record its (h, w).

    Returns:
        list[dict]: One info dict per id, in the order of ``image_ids``.
    """
    root_path = Path(path)
    if not isinstance(image_ids, list):
        image_ids = list(range(image_ids))

    def _on_disk(file_path):
        # Paths obtained with relative_path=True still need the root in
        # front before they can be opened.
        return str(root_path / file_path) if relative_path else file_path

    def _read_matrix(line, stop, shape):
        # The first token of a calibration line is its label; skip it.
        values = [float(tok) for tok in line.split(' ')[1:stop]]
        return np.array(values).reshape(shape)

    def _load_calib(idx):
        calib_file = get_calib_path(idx, path, training, relative_path=False)
        with open(calib_file, 'r') as f:
            rows = f.readlines()

        projections = [_read_matrix(rows[k], 13, [3, 4]) for k in range(4)]
        if extend_matrix:
            projections = [_extend_matrix(mat) for mat in projections]

        rect = _read_matrix(rows[4], 10, [3, 3])
        if extend_matrix:
            # Identity with the rotation in the top-left 3x3 corner.
            rect_4x4 = np.eye(4, dtype=rect.dtype)
            rect_4x4[:3, :3] = rect
        else:
            rect_4x4 = rect

        velo_to_cam = _read_matrix(rows[5], 13, [3, 4])
        imu_to_velo = _read_matrix(rows[6], 13, [3, 4])
        if extend_matrix:
            velo_to_cam = _extend_matrix(velo_to_cam)
            imu_to_velo = _extend_matrix(imu_to_velo)

        return {
            'P0': projections[0],
            'P1': projections[1],
            'P2': projections[2],
            'P3': projections[3],
            'R0_rect': rect_4x4,
            'Tr_velo_to_cam': velo_to_cam,
            'Tr_imu_to_velo': imu_to_velo,
        }

    def _gather_one(idx):
        point_cloud = {'num_features': 4}
        image = {'image_idx': idx}
        annotations = None

        if velodyne:
            point_cloud['velodyne_path'] = get_velodyne_path(
                idx, path, training, relative_path)

        image['image_path'] = get_image_path(idx, path, training,
                                             relative_path)
        if with_imageshape:
            pixels = io.imread(_on_disk(image['image_path']))
            image['image_shape'] = np.array(pixels.shape[:2], dtype=np.int32)

        if label_info:
            label_file = get_label_path(idx, path, training, relative_path)
            annotations = get_label_anno(_on_disk(label_file))

        info = {'image': image, 'point_cloud': point_cloud}

        if calib:
            info['calib'] = _load_calib(idx)

        if with_plane:
            plane_file = get_plane_path(idx, path, training, relative_path)
            plane_rows = mmengine.list_from_file(_on_disk(plane_file))
            info['plane'] = np.array(
                [float(tok) for tok in plane_rows[3].split()])

        if annotations is not None:
            info['annos'] = annotations
            add_difficulty_to_annos(info)
        return info

    with futures.ThreadPoolExecutor(num_worker) as executor:
        results = executor.map(_gather_one, image_ids)

    return list(results)
|
|
|
|
|
class WaymoInfoGatherer:
    """Parallel version of waymo dataset information gathering.

    Waymo annotation format version like KITTI:
    {
        [optional]timestamp: int64, only read when ``velodyne`` is set
        image: {
            image_idx: ...
            image_path: ...
            [optional]image_shape: ...
        }
        point_cloud: {
            num_features: 6
            [optional]velodyne_path: ...
        }
        [optional]calib: {
            P0 ... P4: ...
            R0_rect: ...
            Tr_velo_to_cam, Tr_velo_to_cam1 ... Tr_velo_to_cam4: ...
        }
        [optional]pose: array loaded from the pose file
        [optional]annos / cam_sync_annos: {
            location: [num_gt, 3] array
            dimensions: [num_gt, 3] array
            rotation_y: [num_gt] angle array
            name: [num_gt] ground truth name array
            camera_id: the ``score`` column of the label file, renamed
            [annos only]difficulty: kitti difficulty
            group_ids: used for multi-part object
        }
        sweeps: list of {velodyne_path, timestamp, image_path, pose}
    }

    Args:
        path (str): Dataset root directory.
        training (bool): Read from the training split when true.
        label_info (bool): Parse ``label_all`` and ``cam_sync_label_all``.
        velodyne (bool): Record the point cloud path and the timestamp.
        calib (bool): Parse the calibration file.
        pose (bool): Load the pose file.
        extend_matrix (bool): Extend calibration matrices to 4x4.
        num_worker (int): Number of parallel workers used by ``gather``.
        relative_path (bool): Store paths relative to ``path``.
        with_imageshape (bool): Open each image to record its (h, w).
        max_sweeps (int): Maximum number of previous frames collected.
    """

    def __init__(self,
                 path,
                 training=True,
                 label_info=True,
                 velodyne=False,
                 calib=False,
                 pose=False,
                 extend_matrix=True,
                 num_worker=8,
                 relative_path=True,
                 with_imageshape=True,
                 max_sweeps=5) -> None:
        self.path = path
        self.training = training
        self.label_info = label_info
        self.velodyne = velodyne
        self.calib = calib
        self.pose = pose
        self.extend_matrix = extend_matrix
        self.num_worker = num_worker
        self.relative_path = relative_path
        self.with_imageshape = with_imageshape
        self.max_sweeps = max_sweeps

    @staticmethod
    def _read_matrix(line, stop, shape):
        """Parse tokens ``[1:stop]`` of a calibration line into ``shape``."""
        # The first token of a calibration line is its label; skip it.
        values = [float(tok) for tok in line.split(' ')[1:stop]]
        return np.array(values).reshape(shape)

    def _read_timestamp(self, idx):
        """Read the timestamp file of frame ``idx`` as ``np.int64``."""
        timestamp_path = get_timestamp_path(
            idx,
            self.path,
            self.training,
            relative_path=False,
            use_prefix_id=True)
        with open(timestamp_path) as f:
            return np.int64(f.read())

    def _load_calib(self, idx):
        """Parse the calibration file of frame ``idx`` into a dict."""
        calib_path = get_calib_path(
            idx,
            self.path,
            self.training,
            relative_path=False,
            use_prefix_id=True)
        with open(calib_path, 'r') as f:
            lines = f.readlines()

        # Lines 0-4: projection matrices, line 5: rectification rotation,
        # lines 6-10: lidar-to-camera transforms.
        projections = [
            self._read_matrix(lines[k], 13, [3, 4]) for k in range(5)
        ]
        rect = self._read_matrix(lines[5], 10, [3, 3])
        velo_to_cams = [
            self._read_matrix(lines[k], 13, [3, 4]) for k in range(6, 11)
        ]

        if self.extend_matrix:
            projections = [_extend_matrix(mat) for mat in projections]
            velo_to_cams = [_extend_matrix(mat) for mat in velo_to_cams]
            rect_4x4 = np.zeros([4, 4], dtype=rect.dtype)
            rect_4x4[3, 3] = 1.
            rect_4x4[:3, :3] = rect
        else:
            rect_4x4 = rect

        calib_info = {}
        for cam, mat in enumerate(projections):
            calib_info[f'P{cam}'] = mat
        calib_info['R0_rect'] = rect_4x4
        for cam, mat in enumerate(velo_to_cams):
            # The first transform has no numeric suffix.
            key = 'Tr_velo_to_cam' if cam == 0 else f'Tr_velo_to_cam{cam}'
            calib_info[key] = mat
        return calib_info

    def _gather_sweeps(self, idx):
        """Collect up to ``max_sweeps`` consecutive previous frames."""
        sweeps = []
        prev_idx = idx
        while len(sweeps) < self.max_sweeps:
            prev_idx -= 1
            # Check existence on the absolute path. Joining `self.path` with
            # an already-prefixed path would apply the prefix twice when
            # `relative_path` is False and `self.path` is relative.
            prev_velodyne_abs = get_velodyne_path(
                prev_idx,
                self.path,
                self.training,
                relative_path=False,
                exist_check=False,
                use_prefix_id=True)
            if not osp.exists(prev_velodyne_abs):
                # Stop at the first missing frame.
                break

            prev_info = {}
            prev_info['velodyne_path'] = get_velodyne_path(
                prev_idx,
                self.path,
                self.training,
                self.relative_path,
                exist_check=False,
                use_prefix_id=True)
            prev_info['timestamp'] = self._read_timestamp(prev_idx)
            prev_info['image_path'] = get_image_path(
                prev_idx,
                self.path,
                self.training,
                self.relative_path,
                info_type='image_0',
                file_tail='.jpg',
                use_prefix_id=True)
            prev_pose_path = get_pose_path(
                prev_idx,
                self.path,
                self.training,
                relative_path=False,
                use_prefix_id=True)
            prev_info['pose'] = np.loadtxt(prev_pose_path)
            sweeps.append(prev_info)
        return sweeps

    def gather_single(self, idx):
        """Build the info dict of a single frame.

        Args:
            idx (int): Frame index (formatted with 7 digits in file names).

        Returns:
            dict: Info dict laid out as described in the class docstring.
        """
        root_path = Path(self.path)
        info = {}
        pc_info = {'num_features': 6}

        image_info = {'image_idx': idx}
        annotations = None
        cam_sync_annotations = None
        if self.velodyne:
            pc_info['velodyne_path'] = get_velodyne_path(
                idx,
                self.path,
                self.training,
                self.relative_path,
                use_prefix_id=True)
            info['timestamp'] = self._read_timestamp(idx)
        image_info['image_path'] = get_image_path(
            idx,
            self.path,
            self.training,
            self.relative_path,
            info_type='image_0',
            file_tail='.jpg',
            use_prefix_id=True)
        if self.with_imageshape:
            img_path = image_info['image_path']
            if self.relative_path:
                img_path = str(root_path / img_path)
            # Close the file right after reading the header so the handle
            # does not stay open for every processed frame.
            with Image.open(img_path) as img:
                w, h = img.size
            image_info['image_shape'] = np.array((h, w), dtype=np.int32)
        if self.label_info:
            label_path = get_label_path(
                idx,
                self.path,
                self.training,
                self.relative_path,
                info_type='label_all',
                use_prefix_id=True)
            cam_sync_label_path = get_label_path(
                idx,
                self.path,
                self.training,
                self.relative_path,
                info_type='cam_sync_label_all',
                use_prefix_id=True)
            if self.relative_path:
                label_path = str(root_path / label_path)
                cam_sync_label_path = str(root_path / cam_sync_label_path)
            annotations = get_label_anno(label_path)
            cam_sync_annotations = get_label_anno(cam_sync_label_path)
        info['image'] = image_info
        info['point_cloud'] = pc_info
        if self.calib:
            info['calib'] = self._load_calib(idx)

        if self.pose:
            pose_path = get_pose_path(
                idx,
                self.path,
                self.training,
                relative_path=False,
                use_prefix_id=True)
            info['pose'] = np.loadtxt(pose_path)

        if annotations is not None:
            info['annos'] = annotations
            # The label files' last column is stored as the camera id here,
            # not as a detection score.
            info['annos']['camera_id'] = info['annos'].pop('score')
            add_difficulty_to_annos(info)
            info['cam_sync_annos'] = cam_sync_annotations
            info['cam_sync_annos']['camera_id'] = info['cam_sync_annos'].pop(
                'score')

        info['sweeps'] = self._gather_sweeps(idx)

        return info

    def gather(self, image_ids):
        """Gather the info dicts of many frames in parallel.

        Args:
            image_ids (int | list[int]): Frame ids; an int ``n`` means
                ``range(n)``.

        Returns:
            list[dict]: Results of ``gather_single`` for the given ids.
        """
        if not isinstance(image_ids, list):
            image_ids = list(range(image_ids))
        image_infos = mmengine.track_parallel_progress(self.gather_single,
                                                       image_ids,
                                                       self.num_worker)
        return list(image_infos)
|
|
|
|
|
def kitti_anno_to_label_file(annos, folder):
    """Write one KITTI-style result file per annotation dict.

    Args:
        annos (list[dict]): Each dict holds per-box arrays under ``name``,
            ``alpha``, ``bbox``, ``location``, ``dimensions``,
            ``rotation_y`` and ``score``, plus ``metadata['image_idx']``.
        folder (str | Path): Output directory; files are named
            ``<6-digit index>.txt``.
    """
    out_dir = Path(folder)
    fields = ('name', 'alpha', 'bbox', 'location', 'dimensions',
              'rotation_y', 'score')
    for anno in annos:
        image_idx = anno['metadata']['image_idx']
        num_boxes = anno['bbox'].shape[0]
        rows = [
            kitti_result_line({key: anno[key][box] for key in fields})
            for box in range(num_boxes)
        ]
        target = out_dir / f'{get_image_index_str(image_idx)}.txt'
        with open(target, 'w') as f:
            f.write('\n'.join(rows))
|
|
|
|
|
def add_difficulty_to_annos(info):
    """Assign a difficulty level to every box in ``info['annos']``.

    A box passes a level when its 2D box height is above the level's minimum
    and its occlusion and truncation do not exceed the level's maxima. The
    stored difficulty is the strictest level passed: 0 (easy), 1 (moderate),
    2 (hard), or -1 when no level is passed.

    Args:
        info (dict): Must contain ``annos`` with ``dimensions``, ``bbox``,
            ``occluded`` and ``truncated``. ``annos['difficulty']`` is
            written in place as an int32 array.

    Returns:
        list[int]: The difficulty of each box.
    """
    # Per-level thresholds, ordered easy / moderate / hard.
    min_height = (40, 25, 25)
    max_occlusion = (0, 1, 2)
    max_trunc = (0.15, 0.3, 0.5)

    annos = info['annos']
    num_boxes = len(annos['dimensions'])
    boxes = annos['bbox']
    box_heights = boxes[:, 3] - boxes[:, 1]

    level_masks = [np.ones((num_boxes, ), dtype=bool) for _ in range(3)]
    samples = zip(box_heights, annos['occluded'], annos['truncated'])
    for pos, (h, occ, trunc) in enumerate(samples):
        for level, mask in enumerate(level_masks):
            if (occ > max_occlusion[level] or h <= min_height[level]
                    or trunc > max_trunc[level]):
                mask[pos] = False

    easy, moderate, hard = level_masks
    moderate_only = np.logical_xor(easy, moderate)
    hard_only = np.logical_xor(hard, moderate)

    # Assign from lowest to highest priority so that easy wins over
    # moderate, and moderate over hard.
    levels = np.full((num_boxes, ), -1, dtype=np.int32)
    levels[hard_only] = 2
    levels[moderate_only] = 1
    levels[easy] = 0

    annos['difficulty'] = levels
    return levels.tolist()
|
|
|
|
|
def kitti_result_line(result_dict, precision=4):
    """Serialize one detection into a KITTI-style result line.

    Fields are emitted in the fixed order ``name, truncated, occluded,
    alpha, bbox, dimensions, location, rotation_y, score``. Fields absent
    from ``result_dict`` (or given as ``None``) fall back to their defaults.

    Args:
        result_dict (dict): Field name to value. ``name`` and ``bbox`` have
            no default.
        precision (int): Number of decimals used for float fields.

    Returns:
        str: The space-joined result line.

    Raises:
        ValueError: If ``name`` or ``bbox`` is explicitly given as ``None``.
        KeyError: If ``result_dict`` contains an unsupported field name.
    """
    float_fmt = f'{{:.{precision}f}}'
    defaults = OrderedDict([
        ('name', None),
        ('truncated', -1),
        ('occluded', -1),
        ('alpha', -10),
        ('bbox', None),
        ('dimensions', [-1, -1, -1]),
        ('location', [-1000, -1000, -1000]),
        ('rotation_y', -10),
        ('score', 0.0),
    ])
    scalar_fields = ('truncated', 'alpha', 'rotation_y', 'score')
    vector_fields = ('bbox', 'dimensions', 'location')

    values = OrderedDict((field, None) for field in defaults)
    for field, given in result_dict.items():
        if defaults[field] is None and given is None:
            raise ValueError('you must specify a value for {}'.format(field))
        values[field] = given

    tokens = []
    for field, given in values.items():
        missing = given is None
        if field == 'name':
            tokens.append(given)
        elif field in scalar_fields:
            tokens.append(
                str(defaults[field]) if missing else float_fmt.format(given))
        elif field == 'occluded':
            tokens.append(
                str(defaults[field]) if missing else '{}'.format(given))
        elif field in vector_fields:
            if missing:
                tokens.extend(str(v) for v in defaults[field])
            else:
                tokens.extend(float_fmt.format(v) for v in given)
        else:
            raise ValueError('unknown key. supported key:{}'.format(
                values.keys()))
    return ' '.join(tokens)
|
|