import torch
import torch.nn.functional as F
from module import *
import numpy as np
import math


def sequence_mask(length, max_length=None):
    """Build a boolean mask of shape [b, max_length] from a 1-D length tensor."""
    if max_length is None:
        max_length = max(length)
    x = torch.arange(max_length, dtype=length.dtype, device=length.device)
    return x.unsqueeze(0) < length.unsqueeze(1)
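
# Hedged usage sketch (example values assumed, not from the original file):
#   lengths = torch.tensor([2, 3])
#   sequence_mask(lengths)
#   # -> tensor([[True, True, False],
#   #            [True, True, True]])
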
def maximum_path(value, mask, max_neg_val=-np.inf):
    """Numpy-friendly version. It's about 4 times faster than the torch version.
    value: [b, t_x, t_y]
    mask: [b, t_x, t_y]
    """
    value = value * mask
    device = value.device
    dtype = value.dtype
    value = value.cpu().detach().numpy()
    # np.bool was removed in NumPy >= 1.24; use the builtin bool instead.
    mask = mask.cpu().detach().numpy().astype(bool)

    b, t_x, t_y = value.shape
    direction = np.zeros(value.shape, dtype=np.int64)
    v = np.zeros((b, t_x), dtype=np.float32)
    x_range = np.arange(t_x, dtype=np.float32).reshape(1, -1)
    # Forward pass: accumulate the best cumulative score for each text position.
    for j in range(t_y):
        v0 = np.pad(v, [[0, 0], [1, 0]], mode="constant", constant_values=max_neg_val)[:, :-1]
        v1 = v
        max_mask = (v1 >= v0)
        v_max = np.where(max_mask, v1, v0)
        direction[:, :, j] = max_mask

        index_mask = (x_range <= j)
        v = np.where(index_mask, v_max + value[:, :, j], max_neg_val)
    direction = np.where(mask, direction, 1)

    # Backward pass: trace the chosen directions into a hard 0/1 path.
    path = np.zeros(value.shape, dtype=np.float32)
    index = mask[:, :, 0].sum(1).astype(np.int64) - 1
    index_range = np.arange(b)
    for j in reversed(range(t_y)):
        path[index_range, index, j] = 1
        index = index + direction[index_range, index, j] - 1
    path = path * mask.astype(np.float32)
    path = torch.from_numpy(path).to(device=device, dtype=dtype)
    return path
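
# Hedged usage sketch (tensor values and shapes assumed, not from the original file):
#   value = torch.randn(2, 10, 50)      # [b, t_x, t_y] alignment log-likelihoods
#   mask = torch.ones(2, 10, 50)        # [b, t_x, t_y] attention mask (1 = valid position)
#   attn = maximum_path(value, mask)    # hard monotonic 0/1 alignment path, same shape
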
def generate_path(duration, mask):
    """
    duration: [b, t_x]
    mask: [b, t_x, t_y]
    """
    device = duration.device

    b, t_x, t_y = mask.shape
    cum_duration = torch.cumsum(duration, 1)
    path = torch.zeros(b, t_x, t_y, dtype=mask.dtype).to(device=device)

    cum_duration_flat = cum_duration.view(b * t_x)
    path = sequence_mask(cum_duration_flat, t_y).to(mask.dtype)
    path = path.view(b, t_x, t_y)
    path = path - F.pad(path, (0, 0, 1, 0, 0, 0))[:, :-1]
    path = path * mask
    return path
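
# Hedged worked example (values assumed, not from the original file):
#   duration = torch.tensor([[2, 1]])   # t_x = 2 tokens
#   mask = torch.ones(1, 2, 3)          # t_y = 3 frames
#   generate_path(duration, mask)
#   # -> tensor([[[1., 1., 0.],
#   #             [0., 0., 1.]]])  (each token covers its own contiguous span of frames)
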
def mle_loss(z, m, logs, logdet, mask):
    # neg normal likelihood w/o the constant term
    l = torch.sum(logs) + 0.5 * torch.sum(torch.exp(-2 * logs) * ((z - m)**2))
    l = l - torch.sum(logdet)  # log jacobian determinant
    # averaging across batch, channel and time axes
    l = l / torch.sum(torch.ones_like(z) * mask)
    l = l + 0.5 * math.log(2 * math.pi)  # add the remaining constant term
    return l
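
# Sketch of the reasoning behind mle_loss (my reading, not stated in the original file):
# per masked element, the negative Gaussian log-likelihood of z under N(m, exp(logs)^2) is
#   logs + 0.5 * exp(-2*logs) * (z - m)^2 + 0.5 * log(2*pi),
# and subtracting logdet accounts for the change of variables of the normalizing flow.
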
def duration_loss(logw, logw_, lengths):
    l = torch.sum((logw - logw_)**2) / torch.sum(lengths)
    return l


class AttrDict(dict):
    def __init__(self, *args, **kwargs):
        super(AttrDict, self).__init__(*args, **kwargs)
        self.__dict__ = self

def GAN_Loss_Generator(gen_outputs):
    """
    gen_outputs: (B, ?) list  # outputs of MPD (len=5) or MSD (len=3)
    """
    loss = 0
    for DG in gen_outputs:
        loss += torch.mean((DG - 1)**2)
    return loss


def GAN_Loss_Discriminator(real_outputs, gen_outputs):
    """
    real_outputs: (B, ?) list  # outputs of MPD (len=5) or MSD (len=3)
    gen_outputs: (B, ?) list  # outputs of MPD (len=5) or MSD (len=3)
    """
    loss = 0
    for D, DG in zip(real_outputs, gen_outputs):
        loss += torch.mean((D - 1)**2 + DG**2)
    return loss


def Mel_Spectrogram_Loss(real_mel, gen_mel):
    """
    real_mel: (B, F, 80)  # mel-spectrogram taken from the Dataloader
    gen_mel: (B, F, 80)  # mel-spectrogram of the waveform generated by the Generator
    """
    loss = F.l1_loss(real_mel, gen_mel)
    return 45 * loss


def Feature_Matching_Loss(real_features, gen_features):
    """
    real_features: (?, ..., ?) list of list  # outputs of MPD (len=[5, 6]) or MSD (len=[3, 7])
    gen_features: (?, ..., ?) list of list  # outputs of MPD (len=[5, 6]) or MSD (len=[3, 7])
    """
    loss = 0
    for Ds, DGs in zip(real_features, gen_features):
        for D, DG in zip(Ds, DGs):
            loss += torch.mean(torch.abs(D - DG))
    return 2 * loss
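
# Note (my reading, not stated in the original file): the factors 45 and 2 above
# correspond to the mel-spectrogram and feature-matching loss weights
# (lambda_mel = 45, lambda_fm = 2) used in the HiFi-GAN objective.
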
def Final_Loss_Generator(mpd_gen_outputs, mpd_real_features, mpd_gen_features,
                         msd_gen_outputs, msd_real_features, msd_gen_features,
                         real_mel, gen_mel):
    """
    =====inputs=====
    [:3]: the last 3 of the MPD outputs
    [3:6]: the last 3 of the MSD outputs
    [6:8]: real_mel and gen_mel
    =====outputs=====
    Generator_Loss
    Mel_Loss
    """
    Gen_Adv1 = GAN_Loss_Generator(mpd_gen_outputs)
    Gen_Adv2 = GAN_Loss_Generator(msd_gen_outputs)
    Adv = Gen_Adv1 + Gen_Adv2

    FM1 = Feature_Matching_Loss(mpd_real_features, mpd_gen_features)
    FM2 = Feature_Matching_Loss(msd_real_features, msd_gen_features)
    FM = FM1 + FM2

    Mel_Loss = Mel_Spectrogram_Loss(real_mel, gen_mel)

    Generator_Loss = Adv + FM + Mel_Loss
    return Generator_Loss, Mel_Loss, Adv, FM
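
# Hedged usage sketch for a generator update step (discriminator interface assumed,
# not from the original file):
#   mpd_real_out, mpd_gen_out, mpd_real_feat, mpd_gen_feat = mpd(real_wav, gen_wav)
#   msd_real_out, msd_gen_out, msd_real_feat, msd_gen_feat = msd(real_wav, gen_wav)
#   g_loss, mel_loss, adv, fm = Final_Loss_Generator(
#       mpd_gen_out, mpd_real_feat, mpd_gen_feat,
#       msd_gen_out, msd_real_feat, msd_gen_feat,
#       real_mel, gen_mel)
#   g_loss.backward()
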
def Final_Loss_Discriminator(mpd_real_outputs, mpd_gen_outputs,
                             msd_real_outputs, msd_gen_outputs):
    """
    =====inputs=====
    [:2]: the first 2 of the MPD outputs
    [2:4]: the first 2 of the MSD outputs
    =====outputs=====
    Discriminator_Loss
    """
    Disc_Adv1 = GAN_Loss_Discriminator(mpd_real_outputs, mpd_gen_outputs)
    Disc_Adv2 = GAN_Loss_Discriminator(msd_real_outputs, msd_gen_outputs)
    Discriminator_Loss = Disc_Adv1 + Disc_Adv2
    return Discriminator_Loss
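
# Hedged usage sketch for a discriminator update step (discriminator interface assumed,
# not from the original file; detach() keeps gradients out of the generator):
#   mpd_real_out, mpd_gen_out, _, _ = mpd(real_wav, gen_wav.detach())
#   msd_real_out, msd_gen_out, _, _ = msd(real_wav, gen_wav.detach())
#   d_loss = Final_Loss_Discriminator(mpd_real_out, mpd_gen_out, msd_real_out, msd_gen_out)
#   d_loss.backward()
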
class Adam():
    """Adam wrapper that applies the Noam (Transformer) warmup learning-rate schedule."""
    def __init__(self, params, scheduler, dim_model, warmup_steps=4000, lr=1e0, betas=(0.9, 0.98), eps=1e-9):
        self.params = params
        self.scheduler = scheduler
        self.dim_model = dim_model
        self.warmup_steps = warmup_steps
        self.lr = lr
        self.betas = betas
        self.eps = eps

        self.step_num = 1
        self.cur_lr = lr * self._get_lr_scale()

        self._optim = torch.optim.Adam(params, lr=self.cur_lr, betas=betas, eps=eps)
        self.param_groups = self._optim.param_groups

    def _get_lr_scale(self):
        if self.scheduler == "noam":
            return np.power(self.dim_model, -0.5) * np.min([np.power(self.step_num, -0.5), self.step_num * np.power(self.warmup_steps, -1.5)])
        else:
            return 1

    def _update_learning_rate(self):
        self.step_num += 1
        if self.scheduler == "noam":
            self.cur_lr = self.lr * self._get_lr_scale()
            for param_group in self._optim.param_groups:
                param_group['lr'] = self.cur_lr

    def get_lr(self):
        return self.cur_lr

    def step(self):
        self._optim.step()
        self._update_learning_rate()

    def zero_grad(self):
        self._optim.zero_grad()

    def load_state_dict(self, d):
        self._optim.load_state_dict(d)

    def state_dict(self):
        return self._optim.state_dict()
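
# Hedged usage sketch (hyperparameter values and helper names assumed, not from the original file):
#   optim = Adam(model.parameters(), scheduler="noam", dim_model=192, warmup_steps=4000)
#   for batch in loader:
#       optim.zero_grad()
#       loss = compute_loss(batch)   # hypothetical helper
#       loss.backward()
#       optim.step()                 # also advances the Noam learning-rate schedule
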