AiOS / mmcv /tests /test_video /test_optflow.py
ttxskk
update
d7e58f0
raw
history blame
11 kB
# Copyright (c) OpenMMLab. All rights reserved.
import os
import os.path as osp
import tempfile
import cv2
import numpy as np
import pytest
from numpy.testing import assert_array_almost_equal, assert_array_equal
import mmcv
def test_flowread():
data_dir = osp.join(osp.dirname(__file__), '../data')
flow_shape = (60, 80, 2)
# read .flo file
flow = mmcv.flowread(osp.join(data_dir, 'optflow.flo'))
assert flow.shape == flow_shape
# pseudo read
flow_same = mmcv.flowread(flow)
assert_array_equal(flow, flow_same)
# read quantized flow concatenated vertically
flow = mmcv.flowread(
osp.join(data_dir, 'optflow_concat0.jpg'), quantize=True, denorm=True)
assert flow.shape == flow_shape
# read quantized flow concatenated horizontally
flow = mmcv.flowread(
osp.join(data_dir, 'optflow_concat1.jpg'),
quantize=True,
concat_axis=1,
denorm=True)
assert flow.shape == flow_shape
# test exceptions
notflow_file = osp.join(data_dir, 'color.jpg')
with pytest.raises(TypeError):
mmcv.flowread(1)
with pytest.raises(IOError):
mmcv.flowread(notflow_file)
with pytest.raises(IOError):
mmcv.flowread(notflow_file, quantize=True)
with pytest.raises(ValueError):
mmcv.flowread(np.zeros((100, 100, 1)))
def test_flowwrite():
flow = np.random.rand(100, 100, 2).astype(np.float32)
# write to a .flo file
tmp_filehandler, filename = tempfile.mkstemp()
mmcv.flowwrite(flow, filename)
flow_from_file = mmcv.flowread(filename)
assert_array_equal(flow, flow_from_file)
os.close(tmp_filehandler)
os.remove(filename)
# write to two .jpg files
tmp_filename = osp.join(tempfile.gettempdir(), 'mmcv_test_flow.jpg')
for concat_axis in range(2):
mmcv.flowwrite(
flow, tmp_filename, quantize=True, concat_axis=concat_axis)
shape = (200, 100) if concat_axis == 0 else (100, 200)
assert osp.isfile(tmp_filename)
assert mmcv.imread(tmp_filename, flag='unchanged').shape == shape
os.remove(tmp_filename)
# test exceptions
with pytest.raises(AssertionError):
mmcv.flowwrite(flow, tmp_filename, quantize=True, concat_axis=2)
def test_quantize_flow():
flow = (np.random.rand(10, 8, 2).astype(np.float32) - 0.5) * 15
max_val = 5.0
dx, dy = mmcv.quantize_flow(flow, max_val=max_val, norm=False)
ref = np.zeros_like(flow, dtype=np.uint8)
for i in range(ref.shape[0]):
for j in range(ref.shape[1]):
for k in range(ref.shape[2]):
val = flow[i, j, k] + max_val
val = min(max(val, 0), 2 * max_val)
ref[i, j, k] = min(np.floor(255 * val / (2 * max_val)), 254)
assert_array_equal(dx, ref[..., 0])
assert_array_equal(dy, ref[..., 1])
max_val = 0.5
dx, dy = mmcv.quantize_flow(flow, max_val=max_val, norm=True)
ref = np.zeros_like(flow, dtype=np.uint8)
for i in range(ref.shape[0]):
for j in range(ref.shape[1]):
for k in range(ref.shape[2]):
scale = flow.shape[1] if k == 0 else flow.shape[0]
val = flow[i, j, k] / scale + max_val
val = min(max(val, 0), 2 * max_val)
ref[i, j, k] = min(np.floor(255 * val / (2 * max_val)), 254)
assert_array_equal(dx, ref[..., 0])
assert_array_equal(dy, ref[..., 1])
def test_dequantize_flow():
dx = np.random.randint(256, size=(10, 8), dtype=np.uint8)
dy = np.random.randint(256, size=(10, 8), dtype=np.uint8)
max_val = 5.0
flow = mmcv.dequantize_flow(dx, dy, max_val=max_val, denorm=False)
ref = np.zeros_like(flow, dtype=np.float32)
for i in range(ref.shape[0]):
for j in range(ref.shape[1]):
ref[i, j, 0] = float(dx[i, j] + 0.5) * 2 * max_val / 255 - max_val
ref[i, j, 1] = float(dy[i, j] + 0.5) * 2 * max_val / 255 - max_val
assert_array_almost_equal(flow, ref)
max_val = 0.5
flow = mmcv.dequantize_flow(dx, dy, max_val=max_val, denorm=True)
h, w = dx.shape
ref = np.zeros_like(flow, dtype=np.float32)
for i in range(ref.shape[0]):
for j in range(ref.shape[1]):
ref[i, j,
0] = (float(dx[i, j] + 0.5) * 2 * max_val / 255 - max_val) * w
ref[i, j,
1] = (float(dy[i, j] + 0.5) * 2 * max_val / 255 - max_val) * h
assert_array_almost_equal(flow, ref)
def test_flow2rgb():
flow = np.array([[[0, 0], [0.5, 0.5], [1, 1], [2, 1], [3, np.inf]]],
dtype=np.float32)
flow_img = mmcv.flow2rgb(flow)
# yapf: disable
assert_array_almost_equal(
flow_img,
np.array([[[1., 1., 1.],
[1., 0.826074731, 0.683772236],
[1., 0.652149462, 0.367544472],
[1., 0.265650552, 5.96046448e-08],
[0., 0., 0.]]],
dtype=np.float32))
# yapf: enable
def test_flow_warp():
img = np.zeros((5, 5, 3))
img[2, 2, 0] = 1
flow = np.ones((5, 5, 2))
res_nn = mmcv.flow_warp(img, flow, interpolate_mode='nearest')
res_bi = mmcv.flow_warp(img, flow, interpolate_mode='bilinear')
assert_array_almost_equal(res_nn, res_bi, decimal=5)
img = np.zeros((5, 5, 1))
img[2, 2, 0] = 1
img[2, 3, 0] = 0.75
flow = np.zeros((5, 5, 2))
flow[2, 2, :] = [0.5, 0.7]
res_ = np.copy(img)
res_[2, 2] = 0.5 * 0.3 + 0.75 * 0.5 * 0.3
res_bi = mmcv.flow_warp(img, flow, interpolate_mode='bilinear')
assert_array_almost_equal(res_, res_bi, decimal=5)
with pytest.raises(NotImplementedError):
_ = mmcv.flow_warp(img, flow, interpolate_mode='xxx')
with pytest.raises(AssertionError):
_ = mmcv.flow_warp(img, flow[:, :, 0], interpolate_mode='xxx')
def test_make_color_wheel():
default_color_wheel = mmcv.make_color_wheel()
color_wheel = mmcv.make_color_wheel([2, 2, 2, 2, 2, 2])
# yapf: disable
assert_array_equal(default_color_wheel, np.array(
[[1. , 0. , 0. ], # noqa
[1. , 0.06666667, 0. ], # noqa
[1. , 0.13333334, 0. ], # noqa
[1. , 0.2 , 0. ], # noqa
[1. , 0.26666668, 0. ], # noqa
[1. , 0.33333334, 0. ], # noqa
[1. , 0.4 , 0. ], # noqa
[1. , 0.46666667, 0. ], # noqa
[1. , 0.53333336, 0. ], # noqa
[1. , 0.6 , 0. ], # noqa
[1. , 0.6666667 , 0. ], # noqa
[1. , 0.73333335, 0. ], # noqa
[1. , 0.8 , 0. ], # noqa
[1. , 0.8666667 , 0. ], # noqa
[1. , 0.93333334, 0. ], # noqa
[1. , 1. , 0. ], # noqa
[0.8333333 , 1. , 0. ], # noqa
[0.6666667 , 1. , 0. ], # noqa
[0.5 , 1. , 0. ], # noqa
[0.33333334, 1. , 0. ], # noqa
[0.16666667, 1. , 0. ], # noqa
[0. , 1. , 0. ], # noqa
[0. , 1. , 0.25 ], # noqa
[0. , 1. , 0.5 ], # noqa
[0. , 1. , 0.75 ], # noqa
[0. , 1. , 1. ], # noqa
[0. , 0.90909094, 1. ], # noqa
[0. , 0.8181818 , 1. ], # noqa
[0. , 0.72727275, 1. ], # noqa
[0. , 0.6363636 , 1. ], # noqa
[0. , 0.54545456, 1. ], # noqa
[0. , 0.45454547, 1. ], # noqa
[0. , 0.36363637, 1. ], # noqa
[0. , 0.27272728, 1. ], # noqa
[0. , 0.18181819, 1. ], # noqa
[0. , 0.09090909, 1. ], # noqa
[0. , 0. , 1. ], # noqa
[0.07692308, 0. , 1. ], # noqa
[0.15384616, 0. , 1. ], # noqa
[0.23076923, 0. , 1. ], # noqa
[0.30769232, 0. , 1. ], # noqa
[0.3846154 , 0. , 1. ], # noqa
[0.46153846, 0. , 1. ], # noqa
[0.53846157, 0. , 1. ], # noqa
[0.61538464, 0. , 1. ], # noqa
[0.6923077 , 0. , 1. ], # noqa
[0.7692308 , 0. , 1. ], # noqa
[0.84615386, 0. , 1. ], # noqa
[0.9230769 , 0. , 1. ], # noqa
[1. , 0. , 1. ], # noqa
[1. , 0. , 0.8333333 ], # noqa
[1. , 0. , 0.6666667 ], # noqa
[1. , 0. , 0.5 ], # noqa
[1. , 0. , 0.33333334], # noqa
[1. , 0. , 0.16666667]], dtype=np.float32)) # noqa
assert_array_equal(
color_wheel,
np.array([[1., 0. , 0. ], # noqa
[1. , 0.5, 0. ], # noqa
[1. , 1. , 0. ], # noqa
[0.5, 1. , 0. ], # noqa
[0. , 1. , 0. ], # noqa
[0. , 1. , 0.5], # noqa
[0. , 1. , 1. ], # noqa
[0. , 0.5, 1. ], # noqa
[0. , 0. , 1. ], # noqa
[0.5, 0. , 1. ], # noqa
[1. , 0. , 1. ], # noqa
[1. , 0. , 0.5]], dtype=np.float32)) # noqa
# yapf: enable
def test_flow_from_bytes():
data_dir = osp.join(osp.dirname(__file__), '../data')
flow_shape = (60, 80, 2)
flow_file = osp.join(data_dir, 'optflow.flo')
# read .flo file
flow_fromfile = mmcv.flowread(flow_file)
with open(flow_file, 'rb') as f:
flow_bytes = f.read()
flow_frombytes = mmcv.flow_from_bytes(flow_bytes)
assert flow_frombytes.shape == flow_shape
assert np.all(flow_frombytes == flow_fromfile)
def test_sparse_flow_from_bytes():
data_dir = osp.join(osp.dirname(__file__), '../data')
flow_file = osp.join(data_dir, 'sparse_flow.png')
with open(flow_file, 'rb') as f:
flow_bytes = f.read()
# read flow from bytes
flow_frombytes, valid_frombytes = mmcv.sparse_flow_from_bytes(flow_bytes)
# test flow shape is [H, W, 2] and valid shape is [H, W]
assert flow_frombytes.shape[:2] == valid_frombytes.shape
assert flow_frombytes.shape[2] == 2
def read_sparse_flow_from_file():
flow = cv2.imread(flow_file, cv2.IMREAD_ANYDEPTH | cv2.IMREAD_COLOR)
flow = flow[:, :, ::-1].astype(np.float32)
flow, valid = flow[:, :, :2], flow[:, :, 2]
flow = (flow - 2**15) / 64.0
return flow, valid
# read flow from file
flow_flowfile, valid_fromfile = read_sparse_flow_from_file()
assert np.all(flow_frombytes == flow_flowfile)
assert np.all(valid_frombytes == valid_fromfile)