|
from abc import ABC, abstractmethod |
|
|
|
import random |
|
import numpy as np |
|
import torch as th |
|
import torch.distributed as dist |
|
|
|
|
|
def create_named_schedule_sampler(name, diffusion): |
|
""" |
|
Create a ScheduleSampler from a library of pre-defined samplers. |
|
|
|
:param name: the name of the sampler. |
|
:param diffusion: the diffusion object to sample for. |
|
""" |
|
if name == "uniform": |
|
return UniformSampler(diffusion) |
|
elif name == "loss-second-moment": |
|
return LossSecondMomentResampler(diffusion) |
|
else: |
|
raise NotImplementedError(f"unknown schedule sampler: {name}") |
|
|
|
|
|
class ScheduleSampler(ABC): |
|
""" |
|
A distribution over timesteps in the diffusion process, intended to reduce |
|
variance of the objective. |
|
|
|
By default, samplers perform unbiased importance sampling, in which the |
|
objective's mean is unchanged. |
|
However, subclasses may override sample() to change how the resampled |
|
terms are reweighted, allowing for actual changes in the objective. |
|
""" |
|
|
|
@abstractmethod |
|
def weights(self): |
|
""" |
|
Get a numpy array of weights, one per diffusion step. |
|
|
|
The weights needn't be normalized, but must be positive. |
|
""" |
|
|
|
def sample(self, batch_size, device): |
|
""" |
|
Importance-sample timesteps for a batch. |
|
|
|
:param batch_size: the number of timesteps. |
|
:param device: the torch device to save to. |
|
:return: a tuple (timesteps, weights): |
|
- timesteps: a tensor of timestep indices. |
|
- weights: a tensor of weights to scale the resulting losses. |
|
""" |
|
w = self.weights() |
|
p = w / th.sum(w) |
|
indices_np = random.choice(len(p), size=(batch_size,), p=p) |
|
indices = th.from_numpy(indices_np).long().to(device) |
|
weights_np = 1 / (len(p) * p[indices_np]) |
|
weights = th.from_numpy(weights_np).float().to(device) |
|
return indices, weights |
|
|
|
|
|
class UniformSampler: |
|
def __init__(self, num_timesteps): |
|
self.num_timesteps = num_timesteps |
|
self._weights = th.ones([num_timesteps]) |
|
|
|
def sample(self, batch_size, device, use_fp16=False): |
|
indices = th.randint(0, self.num_timesteps, (batch_size, ), device=device) |
|
if use_fp16: |
|
weights = th.ones_like(indices).half() |
|
else: |
|
weights = th.ones_like(indices).float() |
|
return indices, weights |
|
|
|
class LossAwareSampler(ScheduleSampler): |
|
def update_with_local_losses(self, local_ts, local_losses): |
|
""" |
|
Update the reweighting using losses from a model. |
|
|
|
Call this method from each rank with a batch of timesteps and the |
|
corresponding losses for each of those timesteps. |
|
This method will perform synchronization to make sure all of the ranks |
|
maintain the exact same reweighting. |
|
|
|
:param local_ts: an integer Tensor of timesteps. |
|
:param local_losses: a 1D Tensor of losses. |
|
""" |
|
batch_sizes = [ |
|
th.tensor([0], dtype=th.int32, device=local_ts.device) |
|
for _ in range(dist.get_world_size()) |
|
] |
|
dist.all_gather( |
|
batch_sizes, |
|
th.tensor([len(local_ts)], dtype=th.int32, device=local_ts.device), |
|
) |
|
|
|
|
|
batch_sizes = [x.item() for x in batch_sizes] |
|
max_bs = max(batch_sizes) |
|
|
|
timestep_batches = [th.zeros(max_bs).to(local_ts) for bs in batch_sizes] |
|
loss_batches = [th.zeros(max_bs).to(local_losses) for bs in batch_sizes] |
|
dist.all_gather(timestep_batches, local_ts) |
|
dist.all_gather(loss_batches, local_losses) |
|
timesteps = [ |
|
x.item() for y, bs in zip(timestep_batches, batch_sizes) for x in y[:bs] |
|
] |
|
losses = [x.item() for y, bs in zip(loss_batches, batch_sizes) for x in y[:bs]] |
|
self.update_with_all_losses(timesteps, losses) |
|
|
|
@abstractmethod |
|
def update_with_all_losses(self, ts, losses): |
|
""" |
|
Update the reweighting using losses from a model. |
|
|
|
Sub-classes should override this method to update the reweighting |
|
using losses from the model. |
|
|
|
This method directly updates the reweighting without synchronizing |
|
between workers. It is called by update_with_local_losses from all |
|
ranks with identical arguments. Thus, it should have deterministic |
|
behavior to maintain state across workers. |
|
|
|
:param ts: a list of int timesteps. |
|
:param losses: a list of float losses, one per timestep. |
|
""" |
|
|
|
|
|
class LossSecondMomentResampler(LossAwareSampler): |
|
def __init__(self, diffusion, history_per_term=10, uniform_prob=0.001): |
|
self.diffusion = diffusion |
|
self.history_per_term = history_per_term |
|
self.uniform_prob = uniform_prob |
|
self._loss_history = np.zeros( |
|
[diffusion.num_timesteps, history_per_term], dtype=np.float64 |
|
) |
|
self._loss_counts = np.zeros([diffusion.num_timesteps], dtype=np.int) |
|
|
|
def weights(self): |
|
if not self._warmed_up(): |
|
return np.ones([self.diffusion.num_timesteps], dtype=np.float64) |
|
weights = np.sqrt(np.mean(self._loss_history ** 2, axis=-1)) |
|
weights /= np.sum(weights) |
|
weights *= 1 - self.uniform_prob |
|
weights += self.uniform_prob / len(weights) |
|
return weights |
|
|
|
def update_with_all_losses(self, ts, losses): |
|
for t, loss in zip(ts, losses): |
|
if self._loss_counts[t] == self.history_per_term: |
|
|
|
self._loss_history[t, :-1] = self._loss_history[t, 1:] |
|
self._loss_history[t, -1] = loss |
|
else: |
|
self._loss_history[t, self._loss_counts[t]] = loss |
|
self._loss_counts[t] += 1 |
|
|
|
def _warmed_up(self): |
|
return (self._loss_counts == self.history_per_term).all() |
|
|