|
"""Console logger utilities. |
|
|
|
Copied from https://github.com/HazyResearch/transformers/blob/master/src/utils/utils.py |
|
Copied from https://docs.python.org/3/howto/logging-cookbook.html#using-a-context-manager-for-selective-logging |
|
""" |
|
|
|
import logging
import math

import fsspec
import lightning
import torch
from timm.scheduler import CosineLRScheduler
|
|
|
|
|
def fsspec_exists(filename):
  """Check if a file exists using fsspec."""
  fs, _ = fsspec.core.url_to_fs(filename)
  return fs.exists(filename)


def fsspec_listdir(dirname):
  """Listdir in manner compatible with fsspec."""
  fs, _ = fsspec.core.url_to_fs(dirname)
  return fs.ls(dirname)


def fsspec_mkdirs(dirname, exist_ok=True):
  """Mkdirs in manner compatible with fsspec."""
  fs, _ = fsspec.core.url_to_fs(dirname)
  fs.makedirs(dirname, exist_ok=exist_ok)
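
# Hedged usage sketch (not part of the original module): these helpers
# accept any fsspec-style URL, local paths included. The 's3://...' paths
# below are illustrative assumptions.
#
#   if not fsspec_exists('s3://bucket/checkpoints/last.ckpt'):
#     fsspec_mkdirs('s3://bucket/checkpoints')
#   print(fsspec_listdir('s3://bucket/checkpoints'))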
|
|
|
|
|
def print_nans(tensor, name):
  """Print a named tensor if it contains any NaN entries."""
  if torch.isnan(tensor).any():
    print(name, tensor)
|
|
|
|
|
class CosineDecayWarmupLRScheduler(
  CosineLRScheduler,
  torch.optim.lr_scheduler._LRScheduler):
  """Wraps timm.scheduler.CosineLRScheduler.

  Enables calling scheduler.step() without passing in epoch.
  Supports resuming as well.
  Adapted from:
    https://github.com/HazyResearch/hyena-dna/blob/main/src/utils/optim/schedulers.py
  """

  def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self._last_epoch = -1
    self.step(epoch=0)

  def step(self, epoch=None):
    if epoch is None:
      self._last_epoch += 1
    else:
      self._last_epoch = epoch
    # Dispatch to step or step_update depending on whether the
    # scheduler is updated per epoch or per optimization step.
    if self.t_in_epochs:
      super().step(epoch=self._last_epoch)
    else:
      super().step_update(num_updates=self._last_epoch)
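
# Hedged usage sketch (not part of the original module): the keyword
# arguments below (t_initial, warmup_t, warmup_lr_init, lr_min,
# t_in_epochs) come from timm's CosineLRScheduler; the optimizer and
# step counts are illustrative assumptions.
#
#   optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)
#   scheduler = CosineDecayWarmupLRScheduler(
#     optimizer, t_initial=10_000, warmup_t=500,
#     warmup_lr_init=1e-6, lr_min=1e-6, t_in_epochs=False)
#   for _ in range(10_000):
#     ...  # forward / backward / optimizer.step()
#     scheduler.step()  # no epoch argument needed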
|
|
|
|
|
class LoggingContext:
  """Context manager for selective logging."""
  def __init__(self, logger, level=None, handler=None, close=True):
    self.logger = logger
    self.level = level
    self.handler = handler
    self.close = close

  def __enter__(self):
    if self.level is not None:
      self.old_level = self.logger.level
      self.logger.setLevel(self.level)
    if self.handler:
      self.logger.addHandler(self.handler)

  def __exit__(self, et, ev, tb):
    if self.level is not None:
      self.logger.setLevel(self.old_level)
    if self.handler:
      self.logger.removeHandler(self.handler)
    if self.handler and self.close:
      self.handler.close()
    # Implicit return of None => don't swallow exceptions.
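
# Hedged usage sketch, mirroring the logging cookbook this class was
# copied from: temporarily lower the threshold to DEBUG inside the
# block, then restore the previous level on exit.
#
#   logger = logging.getLogger('foo')
#   logger.setLevel(logging.INFO)
#   with LoggingContext(logger, level=logging.DEBUG):
#     logger.debug('visible only inside this context')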
|
|
|
|
|
def get_logger(name=__name__, level=logging.INFO) -> logging.Logger:
  """Initializes multi-GPU-friendly python logger."""
  logger = logging.getLogger(name)
  logger.setLevel(level)

  # Mark all logging methods with the rank-zero-only decorator so that
  # logs are not duplicated across processes in a multi-GPU setup.
  for level_name in ('debug', 'info', 'warning', 'error',
                     'exception', 'fatal', 'critical'):
    setattr(logger,
            level_name,
            lightning.pytorch.utilities.rank_zero_only(
              getattr(logger, level_name)))

  return logger
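
# Hedged usage sketch: under multi-GPU Lightning training, only the
# rank-zero process actually emits the record.
#
#   log = get_logger(__name__)
#   log.info('Starting training...')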
|
|
|
|
|
class Sampler:
  """Base class for straight-through samplers over logits."""
  def __init__(self, shape):
    self.shape = shape

  def _sampling_noise(self):
    pass

  def _hard_sample(self, logits):
    pass

  def _soft_sample(self, logits):
    return 0

  def sample(self, logits):
    noise = self._sampling_noise()
    noise = noise[: logits.shape[0], :]
    logits = logits + noise.to(
      dtype=logits.dtype, device=logits.device)
    hard_sample = self._hard_sample(logits)
    soft_sample = self._soft_sample(logits)
    # Straight-through estimator: the forward pass returns the hard
    # sample; gradients flow through the soft relaxation.
    return soft_sample + (hard_sample - soft_sample).detach()
|
|
|
|
|
class TopKSampler(Sampler):
  def __init__(self, k, shape, gamma_tau=1.0):
    super().__init__(shape)
    self.k = k
    self.gamma_tau = gamma_tau
    self.num_betas = 10
    self.sampler = torch.distributions.gamma.Gamma(
      1 / k * torch.ones(self.num_betas, *self.shape), 1.0)

  def _sampling_noise(self):
    noise = self.sampler.sample()
    beta = self.k / torch.arange(1, self.num_betas + 1, 1,
                                 dtype=torch.float32)
    beta = beta[:, None, None]
    assert beta.ndim == noise.ndim
    s = noise / beta
    s = torch.sum(s, dim=0)
    s = s - math.log(10.0)  # log(self.num_betas)
    s = self.gamma_tau * (s / self.k)
    return s

  def _hard_sample(self, logits):
    assert logits.ndim == 2
    thresholds, _ = torch.sort(logits, dim=-1)
    # The k-th largest logit per row is the cutoff for the k-hot mask.
    thresholds = thresholds[:, -self.k][:, None]
    return (logits >= thresholds).type(logits.dtype)

  def _soft_sample(self, logits):
    soft_top_k = logits - torch.mean(logits, dim=-1,
                                     keepdim=True)
    return soft_top_k / torch.norm(soft_top_k, dim=-1,
                                   keepdim=True)
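
# Hedged usage sketch (shapes are illustrative assumptions): draw a
# relaxed k-hot mask over a batch of logits.
#
#   sampler = TopKSampler(k=5, shape=(32, 100))
#   logits = torch.randn(32, 100)
#   mask = sampler.sample(logits)  # (32, 100); k ones per row in forward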
|
|
|
|
|
class DeterministicTopK(TopKSampler):
  def __init__(self, k):
    super().__init__(k, shape=(1, 1))

  def _sampling_noise(self):
    return 0

  def discretize(self, x):
    # Same straight-through combination as Sampler.sample, but without
    # noise; use this instead of sample(), which expects tensor noise.
    hard_sample = self._hard_sample(x)
    soft_sample = self._soft_sample(x)
    return soft_sample + (hard_sample - soft_sample).detach()
|
|
|
class GumbelSampler(Sampler):

  def __init__(self, shape, temperature=1.0):
    super().__init__(shape)
    self.temperature = temperature

  def _sampling_noise(self):
    # Standard Gumbel(0, 1) noise via -log(-log(U)), with epsilons
    # guarding against log(0).
    return - (1e-10 - (
      torch.rand(*self.shape) + 1e-10).log()).log()

  def _hard_sample(self, logits):
    assert logits.ndim == 2
    indices = torch.argmax(logits, dim=-1)
    zeros = logits * 0
    # 2-D indexing to match the assert above (the original used 3-D
    # indices, which fails on a 2-D tensor).
    ones = torch.ones_like(logits[:, :1])
    return torch.scatter(zeros, -1, indices[:, None],
                         ones)

  def _soft_sample(self, logits):
    return torch.nn.functional.softmax(
      logits / self.temperature, dim=-1)
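
# Hedged usage sketch: hard one-hot draws with Gumbel-softmax gradients.
#
#   sampler = GumbelSampler(shape=(32, 100), temperature=0.5)
#   logits = torch.randn(32, 100)
#   y = sampler.sample(logits)  # one-hot rows; gradients from the softmax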
|
|
|
|
|
class BinarySampler(GumbelSampler):

  def sample(self, probs):
    # Relaxed Bernoulli sampling: the difference of two Gumbel noises
    # acts as a logistic threshold on the odds; hard and soft samples
    # are combined with the straight-through estimator.
    pos_noise = self._sampling_noise().to(
      dtype=probs.dtype, device=probs.device)
    neg_noise = self._sampling_noise().to(
      dtype=probs.dtype, device=probs.device)
    del_noise_exp = (neg_noise - pos_noise).exp()
    hard_sample = (probs * (1 + del_noise_exp)
                   > 1).to(probs.dtype)
    soft_sample = probs / (probs + (1 - probs) * del_noise_exp)
    return soft_sample + (hard_sample - soft_sample).detach()
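
# Hedged usage sketch: probs are Bernoulli probabilities matching the
# sampler's noise shape (an illustrative assumption here).
#
#   sampler = BinarySampler(shape=(32, 1))
#   probs = torch.sigmoid(torch.randn(32, 1))
#   b = sampler.sample(probs)  # hard {0, 1} values, relaxed gradients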
|
|
|
|
|
class GaussianSampler:
  def __init__(self):
    self.softplus = torch.nn.Softplus()

  def sample(self, x):
    # Reparameterization trick: split x into mean and (pre-softplus)
    # variance halves, then return mu + sigma * eps.
    assert x.ndim == 2
    n = x.shape[-1] // 2
    mu = x[:, :n]
    sigma = self.softplus(x[:, n:]).sqrt()
    return mu + sigma * torch.randn_like(mu)
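
# Hedged usage sketch: the input packs [mu | pre-softplus variance]
# along the last dim, so a (B, 2d) input yields (B, d) samples.
#
#   sampler = GaussianSampler()
#   x = torch.randn(16, 64)
#   z = sampler.sample(x)  # (16, 32)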