Spaces:
Running
on
L40S
Running
on
L40S
# Copyright (c) OpenMMLab. All rights reserved. | |
import os | |
import os.path as osp | |
import tempfile | |
import cv2 | |
import numpy as np | |
import pytest | |
from numpy.testing import assert_array_almost_equal, assert_array_equal | |
import mmcv | |
def test_flowread(): | |
data_dir = osp.join(osp.dirname(__file__), '../data') | |
flow_shape = (60, 80, 2) | |
# read .flo file | |
flow = mmcv.flowread(osp.join(data_dir, 'optflow.flo')) | |
assert flow.shape == flow_shape | |
# pseudo read | |
flow_same = mmcv.flowread(flow) | |
assert_array_equal(flow, flow_same) | |
# read quantized flow concatenated vertically | |
flow = mmcv.flowread( | |
osp.join(data_dir, 'optflow_concat0.jpg'), quantize=True, denorm=True) | |
assert flow.shape == flow_shape | |
# read quantized flow concatenated horizontally | |
flow = mmcv.flowread( | |
osp.join(data_dir, 'optflow_concat1.jpg'), | |
quantize=True, | |
concat_axis=1, | |
denorm=True) | |
assert flow.shape == flow_shape | |
# test exceptions | |
notflow_file = osp.join(data_dir, 'color.jpg') | |
with pytest.raises(TypeError): | |
mmcv.flowread(1) | |
with pytest.raises(IOError): | |
mmcv.flowread(notflow_file) | |
with pytest.raises(IOError): | |
mmcv.flowread(notflow_file, quantize=True) | |
with pytest.raises(ValueError): | |
mmcv.flowread(np.zeros((100, 100, 1))) | |
def test_flowwrite(): | |
flow = np.random.rand(100, 100, 2).astype(np.float32) | |
# write to a .flo file | |
tmp_filehandler, filename = tempfile.mkstemp() | |
mmcv.flowwrite(flow, filename) | |
flow_from_file = mmcv.flowread(filename) | |
assert_array_equal(flow, flow_from_file) | |
os.close(tmp_filehandler) | |
os.remove(filename) | |
# write to two .jpg files | |
tmp_filename = osp.join(tempfile.gettempdir(), 'mmcv_test_flow.jpg') | |
for concat_axis in range(2): | |
mmcv.flowwrite( | |
flow, tmp_filename, quantize=True, concat_axis=concat_axis) | |
shape = (200, 100) if concat_axis == 0 else (100, 200) | |
assert osp.isfile(tmp_filename) | |
assert mmcv.imread(tmp_filename, flag='unchanged').shape == shape | |
os.remove(tmp_filename) | |
# test exceptions | |
with pytest.raises(AssertionError): | |
mmcv.flowwrite(flow, tmp_filename, quantize=True, concat_axis=2) | |
def test_quantize_flow(): | |
flow = (np.random.rand(10, 8, 2).astype(np.float32) - 0.5) * 15 | |
max_val = 5.0 | |
dx, dy = mmcv.quantize_flow(flow, max_val=max_val, norm=False) | |
ref = np.zeros_like(flow, dtype=np.uint8) | |
for i in range(ref.shape[0]): | |
for j in range(ref.shape[1]): | |
for k in range(ref.shape[2]): | |
val = flow[i, j, k] + max_val | |
val = min(max(val, 0), 2 * max_val) | |
ref[i, j, k] = min(np.floor(255 * val / (2 * max_val)), 254) | |
assert_array_equal(dx, ref[..., 0]) | |
assert_array_equal(dy, ref[..., 1]) | |
max_val = 0.5 | |
dx, dy = mmcv.quantize_flow(flow, max_val=max_val, norm=True) | |
ref = np.zeros_like(flow, dtype=np.uint8) | |
for i in range(ref.shape[0]): | |
for j in range(ref.shape[1]): | |
for k in range(ref.shape[2]): | |
scale = flow.shape[1] if k == 0 else flow.shape[0] | |
val = flow[i, j, k] / scale + max_val | |
val = min(max(val, 0), 2 * max_val) | |
ref[i, j, k] = min(np.floor(255 * val / (2 * max_val)), 254) | |
assert_array_equal(dx, ref[..., 0]) | |
assert_array_equal(dy, ref[..., 1]) | |
def test_dequantize_flow(): | |
dx = np.random.randint(256, size=(10, 8), dtype=np.uint8) | |
dy = np.random.randint(256, size=(10, 8), dtype=np.uint8) | |
max_val = 5.0 | |
flow = mmcv.dequantize_flow(dx, dy, max_val=max_val, denorm=False) | |
ref = np.zeros_like(flow, dtype=np.float32) | |
for i in range(ref.shape[0]): | |
for j in range(ref.shape[1]): | |
ref[i, j, 0] = float(dx[i, j] + 0.5) * 2 * max_val / 255 - max_val | |
ref[i, j, 1] = float(dy[i, j] + 0.5) * 2 * max_val / 255 - max_val | |
assert_array_almost_equal(flow, ref) | |
max_val = 0.5 | |
flow = mmcv.dequantize_flow(dx, dy, max_val=max_val, denorm=True) | |
h, w = dx.shape | |
ref = np.zeros_like(flow, dtype=np.float32) | |
for i in range(ref.shape[0]): | |
for j in range(ref.shape[1]): | |
ref[i, j, | |
0] = (float(dx[i, j] + 0.5) * 2 * max_val / 255 - max_val) * w | |
ref[i, j, | |
1] = (float(dy[i, j] + 0.5) * 2 * max_val / 255 - max_val) * h | |
assert_array_almost_equal(flow, ref) | |
def test_flow2rgb(): | |
flow = np.array([[[0, 0], [0.5, 0.5], [1, 1], [2, 1], [3, np.inf]]], | |
dtype=np.float32) | |
flow_img = mmcv.flow2rgb(flow) | |
# yapf: disable | |
assert_array_almost_equal( | |
flow_img, | |
np.array([[[1., 1., 1.], | |
[1., 0.826074731, 0.683772236], | |
[1., 0.652149462, 0.367544472], | |
[1., 0.265650552, 5.96046448e-08], | |
[0., 0., 0.]]], | |
dtype=np.float32)) | |
# yapf: enable | |
def test_flow_warp(): | |
img = np.zeros((5, 5, 3)) | |
img[2, 2, 0] = 1 | |
flow = np.ones((5, 5, 2)) | |
res_nn = mmcv.flow_warp(img, flow, interpolate_mode='nearest') | |
res_bi = mmcv.flow_warp(img, flow, interpolate_mode='bilinear') | |
assert_array_almost_equal(res_nn, res_bi, decimal=5) | |
img = np.zeros((5, 5, 1)) | |
img[2, 2, 0] = 1 | |
img[2, 3, 0] = 0.75 | |
flow = np.zeros((5, 5, 2)) | |
flow[2, 2, :] = [0.5, 0.7] | |
res_ = np.copy(img) | |
res_[2, 2] = 0.5 * 0.3 + 0.75 * 0.5 * 0.3 | |
res_bi = mmcv.flow_warp(img, flow, interpolate_mode='bilinear') | |
assert_array_almost_equal(res_, res_bi, decimal=5) | |
with pytest.raises(NotImplementedError): | |
_ = mmcv.flow_warp(img, flow, interpolate_mode='xxx') | |
with pytest.raises(AssertionError): | |
_ = mmcv.flow_warp(img, flow[:, :, 0], interpolate_mode='xxx') | |
def test_make_color_wheel(): | |
default_color_wheel = mmcv.make_color_wheel() | |
color_wheel = mmcv.make_color_wheel([2, 2, 2, 2, 2, 2]) | |
# yapf: disable | |
assert_array_equal(default_color_wheel, np.array( | |
[[1. , 0. , 0. ], # noqa | |
[1. , 0.06666667, 0. ], # noqa | |
[1. , 0.13333334, 0. ], # noqa | |
[1. , 0.2 , 0. ], # noqa | |
[1. , 0.26666668, 0. ], # noqa | |
[1. , 0.33333334, 0. ], # noqa | |
[1. , 0.4 , 0. ], # noqa | |
[1. , 0.46666667, 0. ], # noqa | |
[1. , 0.53333336, 0. ], # noqa | |
[1. , 0.6 , 0. ], # noqa | |
[1. , 0.6666667 , 0. ], # noqa | |
[1. , 0.73333335, 0. ], # noqa | |
[1. , 0.8 , 0. ], # noqa | |
[1. , 0.8666667 , 0. ], # noqa | |
[1. , 0.93333334, 0. ], # noqa | |
[1. , 1. , 0. ], # noqa | |
[0.8333333 , 1. , 0. ], # noqa | |
[0.6666667 , 1. , 0. ], # noqa | |
[0.5 , 1. , 0. ], # noqa | |
[0.33333334, 1. , 0. ], # noqa | |
[0.16666667, 1. , 0. ], # noqa | |
[0. , 1. , 0. ], # noqa | |
[0. , 1. , 0.25 ], # noqa | |
[0. , 1. , 0.5 ], # noqa | |
[0. , 1. , 0.75 ], # noqa | |
[0. , 1. , 1. ], # noqa | |
[0. , 0.90909094, 1. ], # noqa | |
[0. , 0.8181818 , 1. ], # noqa | |
[0. , 0.72727275, 1. ], # noqa | |
[0. , 0.6363636 , 1. ], # noqa | |
[0. , 0.54545456, 1. ], # noqa | |
[0. , 0.45454547, 1. ], # noqa | |
[0. , 0.36363637, 1. ], # noqa | |
[0. , 0.27272728, 1. ], # noqa | |
[0. , 0.18181819, 1. ], # noqa | |
[0. , 0.09090909, 1. ], # noqa | |
[0. , 0. , 1. ], # noqa | |
[0.07692308, 0. , 1. ], # noqa | |
[0.15384616, 0. , 1. ], # noqa | |
[0.23076923, 0. , 1. ], # noqa | |
[0.30769232, 0. , 1. ], # noqa | |
[0.3846154 , 0. , 1. ], # noqa | |
[0.46153846, 0. , 1. ], # noqa | |
[0.53846157, 0. , 1. ], # noqa | |
[0.61538464, 0. , 1. ], # noqa | |
[0.6923077 , 0. , 1. ], # noqa | |
[0.7692308 , 0. , 1. ], # noqa | |
[0.84615386, 0. , 1. ], # noqa | |
[0.9230769 , 0. , 1. ], # noqa | |
[1. , 0. , 1. ], # noqa | |
[1. , 0. , 0.8333333 ], # noqa | |
[1. , 0. , 0.6666667 ], # noqa | |
[1. , 0. , 0.5 ], # noqa | |
[1. , 0. , 0.33333334], # noqa | |
[1. , 0. , 0.16666667]], dtype=np.float32)) # noqa | |
assert_array_equal( | |
color_wheel, | |
np.array([[1., 0. , 0. ], # noqa | |
[1. , 0.5, 0. ], # noqa | |
[1. , 1. , 0. ], # noqa | |
[0.5, 1. , 0. ], # noqa | |
[0. , 1. , 0. ], # noqa | |
[0. , 1. , 0.5], # noqa | |
[0. , 1. , 1. ], # noqa | |
[0. , 0.5, 1. ], # noqa | |
[0. , 0. , 1. ], # noqa | |
[0.5, 0. , 1. ], # noqa | |
[1. , 0. , 1. ], # noqa | |
[1. , 0. , 0.5]], dtype=np.float32)) # noqa | |
# yapf: enable | |
def test_flow_from_bytes(): | |
data_dir = osp.join(osp.dirname(__file__), '../data') | |
flow_shape = (60, 80, 2) | |
flow_file = osp.join(data_dir, 'optflow.flo') | |
# read .flo file | |
flow_fromfile = mmcv.flowread(flow_file) | |
with open(flow_file, 'rb') as f: | |
flow_bytes = f.read() | |
flow_frombytes = mmcv.flow_from_bytes(flow_bytes) | |
assert flow_frombytes.shape == flow_shape | |
assert np.all(flow_frombytes == flow_fromfile) | |
def test_sparse_flow_from_bytes(): | |
data_dir = osp.join(osp.dirname(__file__), '../data') | |
flow_file = osp.join(data_dir, 'sparse_flow.png') | |
with open(flow_file, 'rb') as f: | |
flow_bytes = f.read() | |
# read flow from bytes | |
flow_frombytes, valid_frombytes = mmcv.sparse_flow_from_bytes(flow_bytes) | |
# test flow shape is [H, W, 2] and valid shape is [H, W] | |
assert flow_frombytes.shape[:2] == valid_frombytes.shape | |
assert flow_frombytes.shape[2] == 2 | |
def read_sparse_flow_from_file(): | |
flow = cv2.imread(flow_file, cv2.IMREAD_ANYDEPTH | cv2.IMREAD_COLOR) | |
flow = flow[:, :, ::-1].astype(np.float32) | |
flow, valid = flow[:, :, :2], flow[:, :, 2] | |
flow = (flow - 2**15) / 64.0 | |
return flow, valid | |
# read flow from file | |
flow_flowfile, valid_fromfile = read_sparse_flow_from_file() | |
assert np.all(flow_frombytes == flow_flowfile) | |
assert np.all(valid_frombytes == valid_fromfile) | |