from scipy import integrate import torch from tqdm.auto import trange, tqdm import torch.nn as nn def append_zero(x): return torch.cat([x, x.new_zeros([1])]) def append_dims(x, target_dims): """Appends dimensions to the end of a tensor until it has target_dims dimensions.""" dims_to_append = target_dims - x.ndim if dims_to_append < 0: raise ValueError(f'input has {x.ndim} dims but target_dims is {target_dims}, which is less') return x[(...,) + (None,) * dims_to_append] def get_ancestral_step(sigma_from, sigma_to): """Calculates the noise level (sigma_down) to step down to and the amount of noise to add (sigma_up) when doing an ancestral sampling step.""" sigma_up = (sigma_to ** 2 * (sigma_from ** 2 - sigma_to ** 2) / sigma_from ** 2) ** 0.5 sigma_down = (sigma_to ** 2 - sigma_up ** 2) ** 0.5 return sigma_down, sigma_up class DiscreteSchedule(nn.Module): """A mapping between continuous noise levels (sigmas) and a list of discrete noise levels.""" def __init__(self, sigmas, quantize): super().__init__() self.register_buffer('sigmas', sigmas) self.quantize = quantize def get_sigmas(self, n=None): if n is None: return append_zero(self.sigmas.flip(0)) t_max = len(self.sigmas) - 1 t = torch.linspace(t_max, 0, n, device=self.sigmas.device) return append_zero(self.t_to_sigma(t)) def sigma_to_t(self, sigma, quantize=None): quantize = self.quantize if quantize is None else quantize dists = torch.abs(sigma - self.sigmas[:, None]) if quantize: return torch.argmin(dists, dim=0).view(sigma.shape) low_idx, high_idx = torch.sort(torch.topk(dists, dim=0, k=2, largest=False).indices, dim=0)[0] low, high = self.sigmas[low_idx], self.sigmas[high_idx] w = (low - sigma) / (low - high) w = w.clamp(0, 1) t = (1 - w) * low_idx + w * high_idx return t.view(sigma.shape) def t_to_sigma(self, t): t = t.float() low_idx, high_idx, w = t.floor().long(), t.ceil().long(), t.frac() # print(low_idx, high_idx, w ) return (1 - w) * self.sigmas[low_idx] + w * self.sigmas[high_idx] class DiscreteEpsDDPMDenoiser(DiscreteSchedule): """A wrapper for discrete schedule DDPM models that output eps (the predicted noise).""" def __init__(self, alphas_cumprod, quantize): super().__init__(((1 - alphas_cumprod) / alphas_cumprod) ** 0.5, quantize) self.sigma_data = 1. def get_scalings(self, sigma): c_out = -sigma c_in = 1 / (sigma ** 2 + self.sigma_data ** 2) ** 0.5 return c_out, c_in def get_eps(self, *args, **kwargs): return self.inner_model(*args, **kwargs) def forward(self, input, sigma, **kwargs): c_out, c_in = [append_dims(x, input.ndim) for x in self.get_scalings(sigma)] eps = self.get_eps(input * c_in, self.sigma_to_t(sigma), **kwargs) return input + eps * c_out class CompVisDenoiser(DiscreteEpsDDPMDenoiser): """A wrapper for CompVis diffusion models.""" def __init__(self, alphas_cumprod, quantize=False, device='cpu'): super().__init__(alphas_cumprod, quantize=quantize) def get_eps(self, *args, **kwargs): return self.inner_model.apply_model(*args, **kwargs) def to_d(x, sigma, denoised): """Converts a denoiser output to a Karras ODE derivative.""" return (x - denoised) / append_dims(sigma, x.ndim) def get_ancestral_step(sigma_from, sigma_to): """Calculates the noise level (sigma_down) to step down to and the amount of noise to add (sigma_up) when doing an ancestral sampling step.""" sigma_up = (sigma_to ** 2 * (sigma_from ** 2 - sigma_to ** 2) / sigma_from ** 2) ** 0.5 sigma_down = (sigma_to ** 2 - sigma_up ** 2) ** 0.5 return sigma_down, sigma_up @torch.no_grad() def sample_euler(model, x, sigmas, extra_args=None, callback=None, disable=None, s_churn=0., s_tmin=0., s_tmax=float('inf'), s_noise=1.): """Implements Algorithm 2 (Euler steps) from Karras et al. (2022).""" extra_args = {} if extra_args is None else extra_args s_in = x.new_ones([x.shape[0]]) for i in trange(len(sigmas) - 1, disable=disable): gamma = min(s_churn / (len(sigmas) - 1), 2 ** 0.5 - 1) if s_tmin <= sigmas[i] <= s_tmax else 0. eps = torch.randn_like(x) * s_noise sigma_hat = sigmas[i] * (gamma + 1) if gamma > 0: x = x + eps * (sigma_hat ** 2 - sigmas[i] ** 2) ** 0.5 denoised = model(x, sigma_hat * s_in, **extra_args) d = to_d(x, sigma_hat, denoised) if callback is not None: callback({'x': x, 'i': i, 'sigma': sigmas[i], 'sigma_hat': sigma_hat, 'denoised': denoised}) dt = sigmas[i + 1] - sigma_hat # Euler method x = x + d * dt return x @torch.no_grad() def sample_euler_ancestral(model, x, sigmas, extra_args=None, callback=None, disable=None): """Ancestral sampling with Euler method steps.""" extra_args = {} if extra_args is None else extra_args s_in = x.new_ones([x.shape[0]]) for i in trange(len(sigmas) - 1, disable=disable): denoised = model(x, sigmas[i] * s_in, **extra_args) sigma_down, sigma_up = get_ancestral_step(sigmas[i], sigmas[i + 1]) if callback is not None: callback({'x': x, 'i': i, 'sigma': sigmas[i], 'sigma_hat': sigmas[i], 'denoised': denoised}) d = to_d(x, sigmas[i], denoised) # Euler method dt = sigma_down - sigmas[i] x = x + d * dt x = x + torch.randn_like(x) * sigma_up return x @torch.no_grad() def sample_heun(model, x, sigmas, extra_args=None, callback=None, disable=None, s_churn=0., s_tmin=0., s_tmax=float('inf'), s_noise=1.): """Implements Algorithm 2 (Heun steps) from Karras et al. (2022).""" extra_args = {} if extra_args is None else extra_args s_in = x.new_ones([x.shape[0]]) for i in trange(len(sigmas) - 1, disable=disable): gamma = min(s_churn / (len(sigmas) - 1), 2 ** 0.5 - 1) if s_tmin <= sigmas[i] <= s_tmax else 0. eps = torch.randn_like(x) * s_noise sigma_hat = sigmas[i] * (gamma + 1) if gamma > 0: x = x + eps * (sigma_hat ** 2 - sigmas[i] ** 2) ** 0.5 denoised = model(x, sigma_hat * s_in, **extra_args) d = to_d(x, sigma_hat, denoised) if callback is not None: callback({'x': x, 'i': i, 'sigma': sigmas[i], 'sigma_hat': sigma_hat, 'denoised': denoised}) dt = sigmas[i + 1] - sigma_hat if sigmas[i + 1] == 0: # Euler method x = x + d * dt else: # Heun's method x_2 = x + d * dt denoised_2 = model(x_2, sigmas[i + 1] * s_in, **extra_args) d_2 = to_d(x_2, sigmas[i + 1], denoised_2) d_prime = (d + d_2) / 2 x = x + d_prime * dt return x @torch.no_grad() def sample_dpm_2(model, x, sigmas, extra_args=None, callback=None, disable=None, s_churn=0., s_tmin=0., s_tmax=float('inf'), s_noise=1.): """A sampler inspired by DPM-Solver-2 and Algorithm 2 from Karras et al. (2022).""" extra_args = {} if extra_args is None else extra_args s_in = x.new_ones([x.shape[0]]) for i in trange(len(sigmas) - 1, disable=disable): gamma = min(s_churn / (len(sigmas) - 1), 2 ** 0.5 - 1) if s_tmin <= sigmas[i] <= s_tmax else 0. eps = torch.randn_like(x) * s_noise sigma_hat = sigmas[i] * (gamma + 1) if gamma > 0: x = x + eps * (sigma_hat ** 2 - sigmas[i] ** 2) ** 0.5 denoised = model(x, sigma_hat * s_in, **extra_args) d = to_d(x, sigma_hat, denoised) if callback is not None: callback({'x': x, 'i': i, 'sigma': sigmas[i], 'sigma_hat': sigma_hat, 'denoised': denoised}) # Midpoint method, where the midpoint is chosen according to a rho=3 Karras schedule sigma_mid = ((sigma_hat ** (1 / 3) + sigmas[i + 1] ** (1 / 3)) / 2) ** 3 dt_1 = sigma_mid - sigma_hat dt_2 = sigmas[i + 1] - sigma_hat x_2 = x + d * dt_1 denoised_2 = model(x_2, sigma_mid * s_in, **extra_args) d_2 = to_d(x_2, sigma_mid, denoised_2) x = x + d_2 * dt_2 return x @torch.no_grad() def sample_dpm_2_ancestral(model, x, sigmas, extra_args=None, callback=None, disable=None): """Ancestral sampling with DPM-Solver inspired second-order steps.""" extra_args = {} if extra_args is None else extra_args s_in = x.new_ones([x.shape[0]]) for i in trange(len(sigmas) - 1, disable=disable): denoised = model(x, sigmas[i] * s_in, **extra_args) sigma_down, sigma_up = get_ancestral_step(sigmas[i], sigmas[i + 1]) if callback is not None: callback({'x': x, 'i': i, 'sigma': sigmas[i], 'sigma_hat': sigmas[i], 'denoised': denoised}) d = to_d(x, sigmas[i], denoised) # Midpoint method, where the midpoint is chosen according to a rho=3 Karras schedule sigma_mid = ((sigmas[i] ** (1 / 3) + sigma_down ** (1 / 3)) / 2) ** 3 dt_1 = sigma_mid - sigmas[i] dt_2 = sigma_down - sigmas[i] x_2 = x + d * dt_1 denoised_2 = model(x_2, sigma_mid * s_in, **extra_args) d_2 = to_d(x_2, sigma_mid, denoised_2) x = x + d_2 * dt_2 x = x + torch.randn_like(x) * sigma_up return x def linear_multistep_coeff(order, t, i, j): if order - 1 > i: raise ValueError(f'Order {order} too high for step {i}') def fn(tau): prod = 1. for k in range(order): if j == k: continue prod *= (tau - t[i - k]) / (t[i - j] - t[i - k]) return prod return integrate.quad(fn, t[i], t[i + 1], epsrel=1e-4)[0] @torch.no_grad() def sample_lms(model, x, sigmas, extra_args=None, callback=None, disable=None, order=4): extra_args = {} if extra_args is None else extra_args s_in = x.new_ones([x.shape[0]]) ds = [] for i in trange(len(sigmas) - 1, disable=disable): denoised = model(x, sigmas[i] * s_in, **extra_args) d = to_d(x, sigmas[i], denoised) ds.append(d) if len(ds) > order: ds.pop(0) if callback is not None: callback({'x': x, 'i': i, 'sigma': sigmas[i], 'sigma_hat': sigmas[i], 'denoised': denoised}) cur_order = min(i + 1, order) coeffs = [linear_multistep_coeff(cur_order, sigmas.cpu(), i, j) for j in range(cur_order)] x = x + sum(coeff * d for coeff, d in zip(coeffs, reversed(ds))) return x