"""Convolutional VQ-VAE: an encoder/decoder pair around a vector-quantization bottleneck."""

from typing import Tuple

from torch import Tensor, nn

from .tools.quantize_cnn import (QuantizeEMA, QuantizeEMAReset, QuantizeReset,
                                 Quantizer)
from .tools.resnet import Resnet1D
|
|
class VQVae(nn.Module):
    """VQ-VAE over feature sequences shaped (batch, time, nfeats)."""

    def __init__(self,
                 nfeats: int,
                 quantizer: str = "ema_reset",
                 code_num: int = 512,
                 code_dim: int = 512,
                 output_emb_width: int = 512,
                 down_t: int = 3,
                 stride_t: int = 2,
                 width: int = 512,
                 depth: int = 3,
                 dilation_growth_rate: int = 3,
                 norm=None,
                 activation: str = "relu",
                 **kwargs) -> None:
        super().__init__()

        self.code_dim = code_dim

        # Encoder and decoder share the same temporal settings, so the
        # decoder exactly mirrors the encoder's downsampling.
        self.encoder = Encoder(nfeats,
                               output_emb_width,
                               down_t,
                               stride_t,
                               width,
                               depth,
                               dilation_growth_rate,
                               activation=activation,
                               norm=norm)

        self.decoder = Decoder(nfeats,
                               output_emb_width,
                               down_t,
                               stride_t,
                               width,
                               depth,
                               dilation_growth_rate,
                               activation=activation,
                               norm=norm)

        # Select the codebook update strategy.
        if quantizer == "ema_reset":
            self.quantizer = QuantizeEMAReset(code_num, code_dim, mu=0.99)
        elif quantizer == "orig":
            self.quantizer = Quantizer(code_num, code_dim, beta=1.0)
        elif quantizer == "ema":
            self.quantizer = QuantizeEMA(code_num, code_dim, mu=0.99)
        elif quantizer == "reset":
            self.quantizer = QuantizeReset(code_num, code_dim)
        else:
            raise ValueError(f"Unknown quantizer: {quantizer!r}")
|
    def preprocess(self, x: Tensor) -> Tensor:
        # (batch, time, channels) -> (batch, channels, time) for Conv1d.
        return x.permute(0, 2, 1)

    def postprocess(self, x: Tensor) -> Tensor:
        # (batch, channels, time) -> (batch, time, channels).
        return x.permute(0, 2, 1)

    def forward(self, features: Tensor):
        x_in = self.preprocess(features)

        # Encode, quantize against the codebook, then decode the
        # quantized latents back to the feature space.
        x_encoder = self.encoder(x_in)
        x_quantized, loss, perplexity = self.quantizer(x_encoder)

        x_decoder = self.decoder(x_quantized)
        x_out = self.postprocess(x_decoder)

        return x_out, loss, perplexity
|
    def encode(self, features: Tensor) -> Tuple[Tensor, None]:
        N, T, _ = features.shape
        x_in = self.preprocess(features)
        x_encoder = self.encoder(x_in)
        x_encoder = self.postprocess(x_encoder)
        # Flatten to (batch * downsampled_time, code_dim) so every timestep
        # is quantized independently against the codebook.
        x_encoder = x_encoder.contiguous().view(-1, x_encoder.shape[-1])
        code_idx = self.quantizer.quantize(x_encoder)
        code_idx = code_idx.view(N, -1)

        return code_idx, None
|
    def decode(self, z: Tensor) -> Tensor:
        # Map code indices back to codebook vectors. Note: assumes a single
        # sequence, reshaped to (1, time, code_dim) then channels-first.
        x_d = self.quantizer.dequantize(z)
        x_d = x_d.view(1, -1, self.code_dim).permute(0, 2, 1).contiguous()

        x_decoder = self.decoder(x_d)
        x_out = self.postprocess(x_decoder)
        return x_out
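
# Worked shape example with the defaults above (illustrative): features of
# shape (N, T, nfeats) are permuted to (N, nfeats, T); the encoder halves T
# at each of down_t=3 stages (stride_t=2), giving latents (N, 512, T/8);
# the quantizer snaps each of the N*T/8 latent vectors to one of
# code_num=512 codebook entries; the decoder upsamples x2 three times and
# restores (N, T, nfeats).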
|
|
class Encoder(nn.Module):
    """Temporal convolutional encoder; downsamples time by stride_t**down_t."""

    def __init__(self,
                 input_emb_width=3,
                 output_emb_width=512,
                 down_t=3,
                 stride_t=2,
                 width=512,
                 depth=3,
                 dilation_growth_rate=3,
                 activation='relu',
                 norm=None):
        super().__init__()

        blocks = []
        filter_t, pad_t = stride_t * 2, stride_t // 2
        blocks.append(nn.Conv1d(input_emb_width, width, 3, 1, 1))
        blocks.append(nn.ReLU())

        # Each stage downsamples time by stride_t with a strided conv, then
        # refines features with a stack of dilated residual blocks.
        for _ in range(down_t):
            input_dim = width
            block = nn.Sequential(
                nn.Conv1d(input_dim, width, filter_t, stride_t, pad_t),
                Resnet1D(width,
                         depth,
                         dilation_growth_rate,
                         activation=activation,
                         norm=norm),
            )
            blocks.append(block)
        blocks.append(nn.Conv1d(width, output_emb_width, 3, 1, 1))
        self.model = nn.Sequential(*blocks)

    def forward(self, x):
        return self.model(x)
|
|
class Decoder(nn.Module):
    """Mirror of Encoder; upsamples time by 2**down_t (assumes stride_t == 2)."""

    def __init__(self,
                 input_emb_width=3,
                 output_emb_width=512,
                 down_t=3,
                 stride_t=2,
                 width=512,
                 depth=3,
                 dilation_growth_rate=3,
                 activation='relu',
                 norm=None):
        super().__init__()

        blocks = []
        blocks.append(nn.Conv1d(output_emb_width, width, 3, 1, 1))
        blocks.append(nn.ReLU())

        # Each stage mirrors one encoder stage: residual refinement (with
        # reversed dilation order), then nearest-neighbor upsampling by 2.
        for _ in range(down_t):
            out_dim = width
            block = nn.Sequential(
                Resnet1D(width,
                         depth,
                         dilation_growth_rate,
                         reverse_dilation=True,
                         activation=activation,
                         norm=norm),
                nn.Upsample(scale_factor=2, mode='nearest'),
                nn.Conv1d(width, out_dim, 3, 1, 1),
            )
            blocks.append(block)
        blocks.append(nn.Conv1d(width, width, 3, 1, 1))
        blocks.append(nn.ReLU())
        blocks.append(nn.Conv1d(width, input_emb_width, 3, 1, 1))
        self.model = nn.Sequential(*blocks)

    def forward(self, x):
        return self.model(x)
|
|
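# ---------------------------------------------------------------------------
# Minimal smoke-test sketch (illustrative; not part of the original module).
# The feature size (256), batch (4), and sequence length (64) are arbitrary;
# with the defaults (down_t=3, stride_t=2) the time axis is compressed 8x,
# so the sequence length should be divisible by 8.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import torch

    model = VQVae(nfeats=256)
    features = torch.randn(4, 64, 256)  # (batch, time, nfeats)

    # Full pass: reconstruction plus quantization loss and codebook perplexity.
    x_out, loss, perplexity = model(features)
    print(x_out.shape)  # expected: (4, 64, 256)

    # Discrete bottleneck: 64 frames -> 8 code indices per sequence.
    code_idx, _ = model.encode(features)
    print(code_idx.shape)  # expected: (4, 8)

    # decode() assumes a single sequence of code indices (batch of 1).
    recon = model.decode(code_idx[0])
    print(recon.shape)  # expected: (1, 64, 256)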