|
import torch |
|
import torch.nn as nn |
|
|
|
from .utils import extract_patches, combine_patches, efficient_cdist, get_NNs_Dists |
|
|
|
def make_criteria(conf): |
|
if conf['type'] == 'PatchCoherentLoss': |
|
return PatchCoherentLoss(conf['patch_size'], stride=conf['stride'], loop=conf['loop'], coherent_alpha=conf['coherent_alpha']) |
|
elif conf['type'] == 'SWDLoss': |
|
raise NotImplementedError('SWDLoss is not implemented') |
|
else: |
|
raise ValueError('Invalid criteria: {}'.format(conf['criteria'])) |
|
|
|
class PatchCoherentLoss(torch.nn.Module): |
|
def __init__(self, patch_size=7, stride=1, loop=False, coherent_alpha=None, cache=False): |
|
super(PatchCoherentLoss, self).__init__() |
|
self.patch_size = patch_size |
|
self.stride = stride |
|
self.loop = loop |
|
self.coherent_alpha = coherent_alpha |
|
assert self.stride == 1, "Only support stride of 1" |
|
|
|
self.cache = cache |
|
if cache: |
|
self.cached_data = None |
|
|
|
def forward(self, X, Ys, dist_wrapper=None, ext=None, return_blended_results=False): |
|
"""For each patch in input X find its NN in target Y and sum the their distances""" |
|
assert X.shape[0] == 1, "Only support batch size of 1" |
|
dist_fn = lambda X, Y: dist_wrapper(efficient_cdist, X, Y) if dist_wrapper is not None else efficient_cdist(X, Y) |
|
|
|
x_patches = extract_patches(X, self.patch_size, self.stride, loop=self.loop) |
|
|
|
if not self.cache or self.cached_data is None: |
|
y_patches = [] |
|
for y in Ys: |
|
y_patches += [extract_patches(y, self.patch_size, self.stride, loop=False)] |
|
y_patches = torch.cat(y_patches, dim=1) |
|
self.cached_data = y_patches |
|
else: |
|
y_patches = self.cached_data |
|
|
|
nnf, dist = get_NNs_Dists(dist_fn, x_patches.squeeze(0), y_patches.squeeze(0), self.coherent_alpha) |
|
|
|
if return_blended_results: |
|
return combine_patches(X.shape, y_patches[:, nnf, :], self.patch_size, self.stride, loop=self.loop), dist.mean() |
|
else: |
|
return dist.mean() |
|
|
|
def clean_cache(self): |
|
self.cached_data = None |