|
import random |
|
import cv2 |
|
import numpy as np |
|
from PIL import Image |
|
|
|
import torch |
|
import torchvision.transforms as TF |
|
import dataloaders.image_transforms as IT |
|
|
|
cv2.setNumThreads(0) |
|
|
|
|
|
class Resize(object): |
|
"""Rescale the image in a sample to a given size. |
|
|
|
Args: |
|
output_size (tuple or int): Desired output size. If tuple, output is |
|
matched to output_size. If int, smaller of image edges is matched |
|
to output_size keeping aspect ratio the same. |
|
""" |
|
def __init__(self, output_size, use_padding=False): |
|
assert isinstance(output_size, (int, tuple)) |
|
if isinstance(output_size, int): |
|
self.output_size = (output_size, output_size) |
|
else: |
|
self.output_size = output_size |
|
self.use_padding = use_padding |
|
|
|
def __call__(self, sample): |
|
return self.padding(sample) if self.use_padding else self.rescale( |
|
sample) |
|
|
|
def rescale(self, sample): |
|
prev_img = sample['prev_img'] |
|
h, w = prev_img.shape[:2] |
|
if self.output_size == (h, w): |
|
return sample |
|
else: |
|
new_h, new_w = self.output_size |
|
|
|
for elem in sample.keys(): |
|
if 'meta' in elem: |
|
continue |
|
tmp = sample[elem] |
|
|
|
if elem == 'prev_img' or elem == 'curr_img' or elem == 'ref_img': |
|
flagval = cv2.INTER_CUBIC |
|
else: |
|
flagval = cv2.INTER_NEAREST |
|
|
|
if elem == 'curr_img' or elem == 'curr_label': |
|
new_tmp = [] |
|
all_tmp = tmp |
|
for tmp in all_tmp: |
|
tmp = cv2.resize(tmp, |
|
dsize=(new_w, new_h), |
|
interpolation=flagval) |
|
new_tmp.append(tmp) |
|
tmp = new_tmp |
|
else: |
|
tmp = cv2.resize(tmp, |
|
dsize=(new_w, new_h), |
|
interpolation=flagval) |
|
|
|
sample[elem] = tmp |
|
|
|
return sample |
|
|
|
def padding(self, sample): |
|
prev_img = sample['prev_img'] |
|
h, w = prev_img.shape[:2] |
|
if self.output_size == (h, w): |
|
return sample |
|
else: |
|
new_h, new_w = self.output_size |
|
|
|
def sep_pad(x): |
|
x0 = np.random.randint(0, x + 1) |
|
x1 = x - x0 |
|
return x0, x1 |
|
|
|
top_pad, bottom_pad = sep_pad(new_h - h) |
|
left_pad, right_pad = sep_pad(new_w - w) |
|
|
|
for elem in sample.keys(): |
|
if 'meta' in elem: |
|
continue |
|
tmp = sample[elem] |
|
|
|
if elem == 'prev_img' or elem == 'curr_img' or elem == 'ref_img': |
|
pad_value = (124, 116, 104) |
|
else: |
|
pad_value = (0) |
|
|
|
if elem == 'curr_img' or elem == 'curr_label': |
|
new_tmp = [] |
|
all_tmp = tmp |
|
for tmp in all_tmp: |
|
tmp = cv2.copyMakeBorder(tmp, |
|
top_pad, |
|
bottom_pad, |
|
left_pad, |
|
right_pad, |
|
cv2.BORDER_CONSTANT, |
|
value=pad_value) |
|
new_tmp.append(tmp) |
|
tmp = new_tmp |
|
else: |
|
tmp = cv2.copyMakeBorder(tmp, |
|
top_pad, |
|
bottom_pad, |
|
left_pad, |
|
right_pad, |
|
cv2.BORDER_CONSTANT, |
|
value=pad_value) |
|
|
|
sample[elem] = tmp |
|
|
|
return sample |
|
|
|
|
|
class BalancedRandomCrop(object): |
|
"""Crop randomly the image in a sample. |
|
|
|
Args: |
|
output_size (tuple or int): Desired output size. If int, square crop |
|
is made. |
|
""" |
|
def __init__(self, |
|
output_size, |
|
max_step=5, |
|
max_obj_num=5, |
|
min_obj_pixel_num=100): |
|
assert isinstance(output_size, (int, tuple)) |
|
if isinstance(output_size, int): |
|
self.output_size = (output_size, output_size) |
|
else: |
|
assert len(output_size) == 2 |
|
self.output_size = output_size |
|
self.max_step = max_step |
|
self.max_obj_num = max_obj_num |
|
self.min_obj_pixel_num = min_obj_pixel_num |
|
|
|
def __call__(self, sample): |
|
|
|
image = sample['prev_img'] |
|
h, w = image.shape[:2] |
|
new_h, new_w = self.output_size |
|
new_h = h if new_h >= h else new_h |
|
new_w = w if new_w >= w else new_w |
|
ref_label = sample["ref_label"] |
|
prev_label = sample["prev_label"] |
|
curr_label = sample["curr_label"] |
|
|
|
is_contain_obj = False |
|
step = 0 |
|
while (not is_contain_obj) and (step < self.max_step): |
|
step += 1 |
|
top = np.random.randint(0, h - new_h + 1) |
|
left = np.random.randint(0, w - new_w + 1) |
|
after_crop = [] |
|
contains = [] |
|
for elem in ([ref_label, prev_label] + curr_label): |
|
tmp = elem[top:top + new_h, left:left + new_w] |
|
contains.append(np.unique(tmp)) |
|
after_crop.append(tmp) |
|
|
|
all_obj = list(np.sort(contains[0])) |
|
|
|
if all_obj[-1] == 0: |
|
continue |
|
|
|
|
|
if all_obj[0] == 0: |
|
all_obj = all_obj[1:] |
|
|
|
|
|
new_all_obj = [] |
|
for obj_id in all_obj: |
|
after_crop_pixels = np.sum(after_crop[0] == obj_id) |
|
if after_crop_pixels > self.min_obj_pixel_num: |
|
new_all_obj.append(obj_id) |
|
|
|
if len(new_all_obj) == 0: |
|
is_contain_obj = False |
|
else: |
|
is_contain_obj = True |
|
|
|
if len(new_all_obj) > self.max_obj_num: |
|
random.shuffle(new_all_obj) |
|
new_all_obj = new_all_obj[:self.max_obj_num] |
|
|
|
all_obj = [0] + new_all_obj |
|
|
|
post_process = [] |
|
for elem in after_crop: |
|
new_elem = elem * 0 |
|
for idx in range(len(all_obj)): |
|
obj_id = all_obj[idx] |
|
if obj_id == 0: |
|
continue |
|
mask = elem == obj_id |
|
|
|
new_elem += (mask * idx).astype(np.uint8) |
|
post_process.append(new_elem.astype(np.uint8)) |
|
|
|
sample["ref_label"] = post_process[0] |
|
sample["prev_label"] = post_process[1] |
|
curr_len = len(sample["curr_img"]) |
|
sample["curr_label"] = [] |
|
for idx in range(curr_len): |
|
sample["curr_label"].append(post_process[idx + 2]) |
|
|
|
for elem in sample.keys(): |
|
if 'meta' in elem or 'label' in elem: |
|
continue |
|
if elem == 'curr_img': |
|
new_tmp = [] |
|
for tmp_ in sample[elem]: |
|
tmp_ = tmp_[top:top + new_h, left:left + new_w] |
|
new_tmp.append(tmp_) |
|
sample[elem] = new_tmp |
|
else: |
|
tmp = sample[elem] |
|
tmp = tmp[top:top + new_h, left:left + new_w] |
|
sample[elem] = tmp |
|
|
|
obj_num = len(all_obj) - 1 |
|
|
|
sample['meta']['obj_num'] = obj_num |
|
|
|
return sample |
|
|
|
|
|
class RandomScale(object): |
|
"""Randomly resize the image and the ground truth to specified scales. |
|
Args: |
|
scales (list): the list of scales |
|
""" |
|
def __init__(self, min_scale=1., max_scale=1.3, short_edge=None): |
|
self.min_scale = min_scale |
|
self.max_scale = max_scale |
|
self.short_edge = short_edge |
|
|
|
def __call__(self, sample): |
|
|
|
sc = np.random.uniform(self.min_scale, self.max_scale) |
|
|
|
if self.short_edge is not None: |
|
image = sample['prev_img'] |
|
h, w = image.shape[:2] |
|
if h > w: |
|
sc *= float(self.short_edge) / w |
|
else: |
|
sc *= float(self.short_edge) / h |
|
|
|
for elem in sample.keys(): |
|
if 'meta' in elem: |
|
continue |
|
tmp = sample[elem] |
|
|
|
if elem == 'prev_img' or elem == 'curr_img' or elem == 'ref_img': |
|
flagval = cv2.INTER_CUBIC |
|
else: |
|
flagval = cv2.INTER_NEAREST |
|
|
|
if elem == 'curr_img' or elem == 'curr_label': |
|
new_tmp = [] |
|
for tmp_ in tmp: |
|
tmp_ = cv2.resize(tmp_, |
|
None, |
|
fx=sc, |
|
fy=sc, |
|
interpolation=flagval) |
|
new_tmp.append(tmp_) |
|
tmp = new_tmp |
|
else: |
|
tmp = cv2.resize(tmp, |
|
None, |
|
fx=sc, |
|
fy=sc, |
|
interpolation=flagval) |
|
|
|
sample[elem] = tmp |
|
|
|
return sample |
|
|
|
|
|
class RandomScaleV2(object): |
|
"""Randomly resize the image and the ground truth to specified scales. |
|
Args: |
|
scales (list): the list of scales |
|
""" |
|
def __init__(self, |
|
min_scale=0.36, |
|
max_scale=1.0, |
|
short_edge=None, |
|
ratio=[3. / 4., 4. / 3.]): |
|
self.min_scale = min_scale |
|
self.max_scale = max_scale |
|
self.short_edge = short_edge |
|
self.ratio = ratio |
|
|
|
def __call__(self, sample): |
|
image = sample['prev_img'] |
|
h, w = image.shape[:2] |
|
|
|
new_h, new_w = self.get_params(h, w) |
|
|
|
sc_x = float(new_w) / w |
|
sc_y = float(new_h) / h |
|
|
|
|
|
if not (self.short_edge is None): |
|
if h > w: |
|
sc_x *= float(self.short_edge) / w |
|
sc_y *= float(self.short_edge) / w |
|
else: |
|
sc_x *= float(self.short_edge) / h |
|
sc_y *= float(self.short_edge) / h |
|
|
|
for elem in sample.keys(): |
|
if 'meta' in elem: |
|
continue |
|
tmp = sample[elem] |
|
|
|
if elem == 'prev_img' or elem == 'curr_img' or elem == 'ref_img': |
|
flagval = cv2.INTER_CUBIC |
|
else: |
|
flagval = cv2.INTER_NEAREST |
|
|
|
if elem == 'curr_img' or elem == 'curr_label': |
|
new_tmp = [] |
|
for tmp_ in tmp: |
|
tmp_ = cv2.resize(tmp_, |
|
None, |
|
fx=sc_x, |
|
fy=sc_y, |
|
interpolation=flagval) |
|
new_tmp.append(tmp_) |
|
tmp = new_tmp |
|
else: |
|
tmp = cv2.resize(tmp, |
|
None, |
|
fx=sc_x, |
|
fy=sc_y, |
|
interpolation=flagval) |
|
|
|
sample[elem] = tmp |
|
|
|
return sample |
|
|
|
def get_params(self, height, width): |
|
area = height * width |
|
|
|
log_ratio = [np.log(item) for item in self.ratio] |
|
for _ in range(10): |
|
target_area = area * np.random.uniform(self.min_scale**2, |
|
self.max_scale**2) |
|
aspect_ratio = np.exp(np.random.uniform(log_ratio[0], |
|
log_ratio[1])) |
|
|
|
w = int(round(np.sqrt(target_area * aspect_ratio))) |
|
h = int(round(np.sqrt(target_area / aspect_ratio))) |
|
|
|
if 0 < w <= width and 0 < h <= height: |
|
return h, w |
|
|
|
|
|
in_ratio = float(width) / float(height) |
|
if in_ratio < min(self.ratio): |
|
w = width |
|
h = int(round(w / min(self.ratio))) |
|
elif in_ratio > max(self.ratio): |
|
h = height |
|
w = int(round(h * max(self.ratio))) |
|
else: |
|
w = width |
|
h = height |
|
|
|
return h, w |
|
|
|
class RestrictSize(object): |
|
"""Randomly resize the image and the ground truth to specified scales. |
|
Args: |
|
scales (list): the list of scales |
|
""" |
|
def __init__(self, max_short_edge=None, max_long_edge=800 * 1.3): |
|
self.max_short_edge = max_short_edge |
|
self.max_long_edge = max_long_edge |
|
assert ((max_short_edge is None)) or ((max_long_edge is None)) |
|
|
|
def __call__(self, sample): |
|
|
|
|
|
sc = None |
|
image = sample['ref_img'] |
|
h, w = image.shape[:2] |
|
|
|
if not (self.max_short_edge is None): |
|
if h > w: |
|
short_edge = w |
|
else: |
|
short_edge = h |
|
if short_edge < self.max_short_edge: |
|
sc = float(self.max_short_edge) / short_edge |
|
else: |
|
if h > w: |
|
long_edge = h |
|
else: |
|
long_edge = w |
|
if long_edge > self.max_long_edge: |
|
sc = float(self.max_long_edge) / long_edge |
|
|
|
if sc is None: |
|
new_h = h |
|
new_w = w |
|
else: |
|
new_h = int(sc * h) |
|
new_w = int(sc * w) |
|
new_h = new_h - (new_h - 1) % 4 |
|
new_w = new_w - (new_w - 1) % 4 |
|
if new_h == h and new_w == w: |
|
return sample |
|
|
|
for elem in sample.keys(): |
|
if 'meta' in elem: |
|
continue |
|
tmp = sample[elem] |
|
|
|
if 'label' in elem: |
|
flagval = cv2.INTER_NEAREST |
|
else: |
|
flagval = cv2.INTER_CUBIC |
|
|
|
tmp = cv2.resize(tmp, dsize=(new_w, new_h), interpolation=flagval) |
|
|
|
sample[elem] = tmp |
|
|
|
return sample |
|
|
|
|
|
class RandomHorizontalFlip(object): |
|
"""Horizontally flip the given image and ground truth randomly with a probability of 0.5.""" |
|
def __init__(self, prob): |
|
self.p = prob |
|
|
|
def __call__(self, sample): |
|
|
|
if random.random() < self.p: |
|
for elem in sample.keys(): |
|
if 'meta' in elem: |
|
continue |
|
if elem == 'curr_img' or elem == 'curr_label': |
|
new_tmp = [] |
|
for tmp_ in sample[elem]: |
|
tmp_ = cv2.flip(tmp_, flipCode=1) |
|
new_tmp.append(tmp_) |
|
sample[elem] = new_tmp |
|
else: |
|
tmp = sample[elem] |
|
tmp = cv2.flip(tmp, flipCode=1) |
|
sample[elem] = tmp |
|
|
|
return sample |
|
|
|
|
|
class RandomVerticalFlip(object): |
|
"""Vertically flip the given image and ground truth randomly with a probability of 0.5.""" |
|
def __init__(self, prob=0.3): |
|
self.p = prob |
|
|
|
def __call__(self, sample): |
|
|
|
if random.random() < self.p: |
|
for elem in sample.keys(): |
|
if 'meta' in elem: |
|
continue |
|
if elem == 'curr_img' or elem == 'curr_label': |
|
new_tmp = [] |
|
for tmp_ in sample[elem]: |
|
tmp_ = cv2.flip(tmp_, flipCode=0) |
|
new_tmp.append(tmp_) |
|
sample[elem] = new_tmp |
|
else: |
|
tmp = sample[elem] |
|
tmp = cv2.flip(tmp, flipCode=0) |
|
sample[elem] = tmp |
|
|
|
return sample |
|
|
|
|
|
class RandomGaussianBlur(object): |
|
def __init__(self, prob=0.3, sigma=[0.1, 2.]): |
|
self.aug = TF.RandomApply([IT.GaussianBlur(sigma)], p=prob) |
|
|
|
def __call__(self, sample): |
|
for elem in sample.keys(): |
|
if 'meta' in elem or 'label' in elem: |
|
continue |
|
|
|
if elem == 'curr_img': |
|
new_tmp = [] |
|
for tmp_ in sample[elem]: |
|
tmp_ = self.apply_augmentation(tmp_) |
|
new_tmp.append(tmp_) |
|
sample[elem] = new_tmp |
|
else: |
|
tmp = sample[elem] |
|
tmp = self.apply_augmentation(tmp) |
|
sample[elem] = tmp |
|
return sample |
|
|
|
def apply_augmentation(self, x): |
|
x = Image.fromarray(np.uint8(x)) |
|
x = self.aug(x) |
|
x = np.array(x, dtype=np.float32) |
|
return x |
|
|
|
|
|
class RandomGrayScale(RandomGaussianBlur): |
|
def __init__(self, prob=0.2): |
|
self.aug = TF.RandomGrayscale(p=prob) |
|
|
|
|
|
class RandomColorJitter(RandomGaussianBlur): |
|
def __init__(self, |
|
prob=0.8, |
|
brightness=0.4, |
|
contrast=0.4, |
|
saturation=0.2, |
|
hue=0.1): |
|
self.aug = TF.RandomApply( |
|
[TF.ColorJitter(brightness, contrast, saturation, hue)], p=prob) |
|
|
|
|
|
class SubtractMeanImage(object): |
|
def __init__(self, mean, change_channels=False): |
|
self.mean = mean |
|
self.change_channels = change_channels |
|
|
|
def __call__(self, sample): |
|
for elem in sample.keys(): |
|
if 'image' in elem: |
|
if self.change_channels: |
|
sample[elem] = sample[elem][:, :, [2, 1, 0]] |
|
sample[elem] = np.subtract( |
|
sample[elem], np.array(self.mean, dtype=np.float32)) |
|
return sample |
|
|
|
def __str__(self): |
|
return 'SubtractMeanImage' + str(self.mean) |
|
|
|
|
|
class ToTensor(object): |
|
"""Convert ndarrays in sample to Tensors.""" |
|
def __call__(self, sample): |
|
|
|
for elem in sample.keys(): |
|
if 'meta' in elem: |
|
continue |
|
tmp = sample[elem] |
|
|
|
if elem == 'curr_img' or elem == 'curr_label': |
|
new_tmp = [] |
|
for tmp_ in tmp: |
|
if tmp_.ndim == 2: |
|
tmp_ = tmp_[:, :, np.newaxis] |
|
tmp_ = tmp_.transpose((2, 0, 1)) |
|
new_tmp.append(torch.from_numpy(tmp_).int()) |
|
else: |
|
tmp_ = tmp_ / 255. |
|
tmp_ -= (0.485, 0.456, 0.406) |
|
tmp_ /= (0.229, 0.224, 0.225) |
|
tmp_ = tmp_.transpose((2, 0, 1)) |
|
new_tmp.append(torch.from_numpy(tmp_)) |
|
tmp = new_tmp |
|
else: |
|
if tmp.ndim == 2: |
|
tmp = tmp[:, :, np.newaxis] |
|
tmp = tmp.transpose((2, 0, 1)) |
|
tmp = torch.from_numpy(tmp).int() |
|
else: |
|
tmp = tmp / 255. |
|
tmp -= (0.485, 0.456, 0.406) |
|
tmp /= (0.229, 0.224, 0.225) |
|
tmp = tmp.transpose((2, 0, 1)) |
|
tmp = torch.from_numpy(tmp) |
|
sample[elem] = tmp |
|
|
|
return sample |
|
|
|
|
|
class MultiRestrictSize(object): |
|
def __init__(self, |
|
max_short_edge=None, |
|
max_long_edge=800, |
|
flip=False, |
|
multi_scale=[1.3], |
|
align_corners=True, |
|
max_stride=16): |
|
self.max_short_edge = max_short_edge |
|
self.max_long_edge = max_long_edge |
|
self.multi_scale = multi_scale |
|
self.flip = flip |
|
self.align_corners = align_corners |
|
self.max_stride = max_stride |
|
|
|
def __call__(self, sample): |
|
samples = [] |
|
image = sample['current_img'] |
|
h, w = image.shape[:2] |
|
for scale in self.multi_scale: |
|
|
|
sc = 1. |
|
if self.max_short_edge is not None: |
|
if h > w: |
|
short_edge = w |
|
else: |
|
short_edge = h |
|
if short_edge > self.max_short_edge: |
|
sc *= float(self.max_short_edge) / short_edge |
|
new_h, new_w = sc * h, sc * w |
|
|
|
|
|
sc = 1. |
|
if self.max_long_edge is not None: |
|
if new_h > new_w: |
|
long_edge = new_h |
|
else: |
|
long_edge = new_w |
|
if long_edge > self.max_long_edge: |
|
sc *= float(self.max_long_edge) / long_edge |
|
|
|
new_h, new_w = sc * new_h, sc * new_w |
|
|
|
new_h = int(new_h * scale) |
|
new_w = int(new_w * scale) |
|
|
|
if self.align_corners: |
|
if (new_h - 1) % self.max_stride != 0: |
|
new_h = int( |
|
np.around((new_h - 1) / self.max_stride) * |
|
self.max_stride + 1) |
|
if (new_w - 1) % self.max_stride != 0: |
|
new_w = int( |
|
np.around((new_w - 1) / self.max_stride) * |
|
self.max_stride + 1) |
|
else: |
|
if new_h % self.max_stride != 0: |
|
new_h = int( |
|
np.around(new_h / self.max_stride) * self.max_stride) |
|
if new_w % self.max_stride != 0: |
|
new_w = int( |
|
np.around(new_w / self.max_stride) * self.max_stride) |
|
|
|
if new_h == h and new_w == w: |
|
samples.append(sample) |
|
else: |
|
new_sample = {} |
|
for elem in sample.keys(): |
|
if 'meta' in elem: |
|
new_sample[elem] = sample[elem] |
|
continue |
|
tmp = sample[elem] |
|
if 'label' in elem: |
|
new_sample[elem] = sample[elem] |
|
continue |
|
else: |
|
flagval = cv2.INTER_CUBIC |
|
tmp = cv2.resize(tmp, |
|
dsize=(new_w, new_h), |
|
interpolation=flagval) |
|
new_sample[elem] = tmp |
|
samples.append(new_sample) |
|
|
|
if self.flip: |
|
now_sample = samples[-1] |
|
new_sample = {} |
|
for elem in now_sample.keys(): |
|
if 'meta' in elem: |
|
new_sample[elem] = now_sample[elem].copy() |
|
new_sample[elem]['flip'] = True |
|
continue |
|
tmp = now_sample[elem] |
|
tmp = tmp[:, ::-1].copy() |
|
new_sample[elem] = tmp |
|
samples.append(new_sample) |
|
|
|
return samples |
|
|
|
|
|
class MultiToTensor(object): |
|
def __call__(self, samples): |
|
for idx in range(len(samples)): |
|
sample = samples[idx] |
|
for elem in sample.keys(): |
|
if 'meta' in elem: |
|
continue |
|
tmp = sample[elem] |
|
if tmp is None: |
|
continue |
|
|
|
if tmp.ndim == 2: |
|
tmp = tmp[:, :, np.newaxis] |
|
tmp = tmp.transpose((2, 0, 1)) |
|
samples[idx][elem] = torch.from_numpy(tmp).int() |
|
else: |
|
tmp = tmp / 255. |
|
tmp -= (0.485, 0.456, 0.406) |
|
tmp /= (0.229, 0.224, 0.225) |
|
tmp = tmp.transpose((2, 0, 1)) |
|
samples[idx][elem] = torch.from_numpy(tmp) |
|
|
|
return samples |
|
|