Staticaliza commited on
Commit
956f73c
·
verified ·
1 Parent(s): 0f4a7cb

Upload 5 files

Browse files
dac/model/__init__.py ADDED
@@ -0,0 +1,4 @@
 
 
 
 
 
1
+ from .base import CodecMixin
2
+ from .base import DACFile
3
+ from .dac import DAC
4
+ from .discriminator import Discriminator
dac/model/base.py ADDED
@@ -0,0 +1,294 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import math
2
+ from dataclasses import dataclass
3
+ from pathlib import Path
4
+ from typing import Union
5
+
6
+ import numpy as np
7
+ import torch
8
+ import tqdm
9
+ from audiotools import AudioSignal
10
+ from torch import nn
11
+
12
+ SUPPORTED_VERSIONS = ["1.0.0"]
13
+
14
+
15
+ @dataclass
16
+ class DACFile:
17
+ codes: torch.Tensor
18
+
19
+ # Metadata
20
+ chunk_length: int
21
+ original_length: int
22
+ input_db: float
23
+ channels: int
24
+ sample_rate: int
25
+ padding: bool
26
+ dac_version: str
27
+
28
+ def save(self, path):
29
+ artifacts = {
30
+ "codes": self.codes.numpy().astype(np.uint16),
31
+ "metadata": {
32
+ "input_db": self.input_db.numpy().astype(np.float32),
33
+ "original_length": self.original_length,
34
+ "sample_rate": self.sample_rate,
35
+ "chunk_length": self.chunk_length,
36
+ "channels": self.channels,
37
+ "padding": self.padding,
38
+ "dac_version": SUPPORTED_VERSIONS[-1],
39
+ },
40
+ }
41
+ path = Path(path).with_suffix(".dac")
42
+ with open(path, "wb") as f:
43
+ np.save(f, artifacts)
44
+ return path
45
+
46
+ @classmethod
47
+ def load(cls, path):
48
+ artifacts = np.load(path, allow_pickle=True)[()]
49
+ codes = torch.from_numpy(artifacts["codes"].astype(int))
50
+ if artifacts["metadata"].get("dac_version", None) not in SUPPORTED_VERSIONS:
51
+ raise RuntimeError(
52
+ f"Given file {path} can't be loaded with this version of descript-audio-codec."
53
+ )
54
+ return cls(codes=codes, **artifacts["metadata"])
55
+
56
+
57
+ class CodecMixin:
58
+ @property
59
+ def padding(self):
60
+ if not hasattr(self, "_padding"):
61
+ self._padding = True
62
+ return self._padding
63
+
64
+ @padding.setter
65
+ def padding(self, value):
66
+ assert isinstance(value, bool)
67
+
68
+ layers = [
69
+ l for l in self.modules() if isinstance(l, (nn.Conv1d, nn.ConvTranspose1d))
70
+ ]
71
+
72
+ for layer in layers:
73
+ if value:
74
+ if hasattr(layer, "original_padding"):
75
+ layer.padding = layer.original_padding
76
+ else:
77
+ layer.original_padding = layer.padding
78
+ layer.padding = tuple(0 for _ in range(len(layer.padding)))
79
+
80
+ self._padding = value
81
+
82
+ def get_delay(self):
83
+ # Any number works here, delay is invariant to input length
84
+ l_out = self.get_output_length(0)
85
+ L = l_out
86
+
87
+ layers = []
88
+ for layer in self.modules():
89
+ if isinstance(layer, (nn.Conv1d, nn.ConvTranspose1d)):
90
+ layers.append(layer)
91
+
92
+ for layer in reversed(layers):
93
+ d = layer.dilation[0]
94
+ k = layer.kernel_size[0]
95
+ s = layer.stride[0]
96
+
97
+ if isinstance(layer, nn.ConvTranspose1d):
98
+ L = ((L - d * (k - 1) - 1) / s) + 1
99
+ elif isinstance(layer, nn.Conv1d):
100
+ L = (L - 1) * s + d * (k - 1) + 1
101
+
102
+ L = math.ceil(L)
103
+
104
+ l_in = L
105
+
106
+ return (l_in - l_out) // 2
107
+
108
+ def get_output_length(self, input_length):
109
+ L = input_length
110
+ # Calculate output length
111
+ for layer in self.modules():
112
+ if isinstance(layer, (nn.Conv1d, nn.ConvTranspose1d)):
113
+ d = layer.dilation[0]
114
+ k = layer.kernel_size[0]
115
+ s = layer.stride[0]
116
+
117
+ if isinstance(layer, nn.Conv1d):
118
+ L = ((L - d * (k - 1) - 1) / s) + 1
119
+ elif isinstance(layer, nn.ConvTranspose1d):
120
+ L = (L - 1) * s + d * (k - 1) + 1
121
+
122
+ L = math.floor(L)
123
+ return L
124
+
125
+ @torch.no_grad()
126
+ def compress(
127
+ self,
128
+ audio_path_or_signal: Union[str, Path, AudioSignal],
129
+ win_duration: float = 1.0,
130
+ verbose: bool = False,
131
+ normalize_db: float = -16,
132
+ n_quantizers: int = None,
133
+ ) -> DACFile:
134
+ """Processes an audio signal from a file or AudioSignal object into
135
+ discrete codes. This function processes the signal in short windows,
136
+ using constant GPU memory.
137
+
138
+ Parameters
139
+ ----------
140
+ audio_path_or_signal : Union[str, Path, AudioSignal]
141
+ audio signal to reconstruct
142
+ win_duration : float, optional
143
+ window duration in seconds, by default 5.0
144
+ verbose : bool, optional
145
+ by default False
146
+ normalize_db : float, optional
147
+ normalize db, by default -16
148
+
149
+ Returns
150
+ -------
151
+ DACFile
152
+ Object containing compressed codes and metadata
153
+ required for decompression
154
+ """
155
+ audio_signal = audio_path_or_signal
156
+ if isinstance(audio_signal, (str, Path)):
157
+ audio_signal = AudioSignal.load_from_file_with_ffmpeg(str(audio_signal))
158
+
159
+ self.eval()
160
+ original_padding = self.padding
161
+ original_device = audio_signal.device
162
+
163
+ audio_signal = audio_signal.clone()
164
+ original_sr = audio_signal.sample_rate
165
+
166
+ resample_fn = audio_signal.resample
167
+ loudness_fn = audio_signal.loudness
168
+
169
+ # If audio is > 10 minutes long, use the ffmpeg versions
170
+ if audio_signal.signal_duration >= 10 * 60 * 60:
171
+ resample_fn = audio_signal.ffmpeg_resample
172
+ loudness_fn = audio_signal.ffmpeg_loudness
173
+
174
+ original_length = audio_signal.signal_length
175
+ resample_fn(self.sample_rate)
176
+ input_db = loudness_fn()
177
+
178
+ if normalize_db is not None:
179
+ audio_signal.normalize(normalize_db)
180
+ audio_signal.ensure_max_of_audio()
181
+
182
+ nb, nac, nt = audio_signal.audio_data.shape
183
+ audio_signal.audio_data = audio_signal.audio_data.reshape(nb * nac, 1, nt)
184
+ win_duration = (
185
+ audio_signal.signal_duration if win_duration is None else win_duration
186
+ )
187
+
188
+ if audio_signal.signal_duration <= win_duration:
189
+ # Unchunked compression (used if signal length < win duration)
190
+ self.padding = True
191
+ n_samples = nt
192
+ hop = nt
193
+ else:
194
+ # Chunked inference
195
+ self.padding = False
196
+ # Zero-pad signal on either side by the delay
197
+ audio_signal.zero_pad(self.delay, self.delay)
198
+ n_samples = int(win_duration * self.sample_rate)
199
+ # Round n_samples to nearest hop length multiple
200
+ n_samples = int(math.ceil(n_samples / self.hop_length) * self.hop_length)
201
+ hop = self.get_output_length(n_samples)
202
+
203
+ codes = []
204
+ range_fn = range if not verbose else tqdm.trange
205
+
206
+ for i in range_fn(0, nt, hop):
207
+ x = audio_signal[..., i : i + n_samples]
208
+ x = x.zero_pad(0, max(0, n_samples - x.shape[-1]))
209
+
210
+ audio_data = x.audio_data.to(self.device)
211
+ audio_data = self.preprocess(audio_data, self.sample_rate)
212
+ _, c, _, _, _ = self.encode(audio_data, n_quantizers)
213
+ codes.append(c.to(original_device))
214
+ chunk_length = c.shape[-1]
215
+
216
+ codes = torch.cat(codes, dim=-1)
217
+
218
+ dac_file = DACFile(
219
+ codes=codes,
220
+ chunk_length=chunk_length,
221
+ original_length=original_length,
222
+ input_db=input_db,
223
+ channels=nac,
224
+ sample_rate=original_sr,
225
+ padding=self.padding,
226
+ dac_version=SUPPORTED_VERSIONS[-1],
227
+ )
228
+
229
+ if n_quantizers is not None:
230
+ codes = codes[:, :n_quantizers, :]
231
+
232
+ self.padding = original_padding
233
+ return dac_file
234
+
235
+ @torch.no_grad()
236
+ def decompress(
237
+ self,
238
+ obj: Union[str, Path, DACFile],
239
+ verbose: bool = False,
240
+ ) -> AudioSignal:
241
+ """Reconstruct audio from a given .dac file
242
+
243
+ Parameters
244
+ ----------
245
+ obj : Union[str, Path, DACFile]
246
+ .dac file location or corresponding DACFile object.
247
+ verbose : bool, optional
248
+ Prints progress if True, by default False
249
+
250
+ Returns
251
+ -------
252
+ AudioSignal
253
+ Object with the reconstructed audio
254
+ """
255
+ self.eval()
256
+ if isinstance(obj, (str, Path)):
257
+ obj = DACFile.load(obj)
258
+
259
+ original_padding = self.padding
260
+ self.padding = obj.padding
261
+
262
+ range_fn = range if not verbose else tqdm.trange
263
+ codes = obj.codes
264
+ original_device = codes.device
265
+ chunk_length = obj.chunk_length
266
+ recons = []
267
+
268
+ for i in range_fn(0, codes.shape[-1], chunk_length):
269
+ c = codes[..., i : i + chunk_length].to(self.device)
270
+ z = self.quantizer.from_codes(c)[0]
271
+ r = self.decode(z)
272
+ recons.append(r.to(original_device))
273
+
274
+ recons = torch.cat(recons, dim=-1)
275
+ recons = AudioSignal(recons, self.sample_rate)
276
+
277
+ resample_fn = recons.resample
278
+ loudness_fn = recons.loudness
279
+
280
+ # If audio is > 10 minutes long, use the ffmpeg versions
281
+ if recons.signal_duration >= 10 * 60 * 60:
282
+ resample_fn = recons.ffmpeg_resample
283
+ loudness_fn = recons.ffmpeg_loudness
284
+
285
+ recons.normalize(obj.input_db)
286
+ resample_fn(obj.sample_rate)
287
+ recons = recons[..., : obj.original_length]
288
+ loudness_fn()
289
+ recons.audio_data = recons.audio_data.reshape(
290
+ -1, obj.channels, obj.original_length
291
+ )
292
+
293
+ self.padding = original_padding
294
+ return recons
dac/model/dac.py ADDED
@@ -0,0 +1,400 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import math
2
+ from typing import List
3
+ from typing import Union
4
+
5
+ import numpy as np
6
+ import torch
7
+ from audiotools import AudioSignal
8
+ from audiotools.ml import BaseModel
9
+ from torch import nn
10
+
11
+ from .base import CodecMixin
12
+ from dac.nn.layers import Snake1d
13
+ from dac.nn.layers import WNConv1d
14
+ from dac.nn.layers import WNConvTranspose1d
15
+ from dac.nn.quantize import ResidualVectorQuantize
16
+ from .encodec import SConv1d, SConvTranspose1d, SLSTM
17
+
18
+
19
+ def init_weights(m):
20
+ if isinstance(m, nn.Conv1d):
21
+ nn.init.trunc_normal_(m.weight, std=0.02)
22
+ nn.init.constant_(m.bias, 0)
23
+
24
+
25
+ class ResidualUnit(nn.Module):
26
+ def __init__(self, dim: int = 16, dilation: int = 1, causal: bool = False):
27
+ super().__init__()
28
+ conv1d_type = SConv1d# if causal else WNConv1d
29
+ pad = ((7 - 1) * dilation) // 2
30
+ self.block = nn.Sequential(
31
+ Snake1d(dim),
32
+ conv1d_type(dim, dim, kernel_size=7, dilation=dilation, padding=pad, causal=causal, norm='weight_norm'),
33
+ Snake1d(dim),
34
+ conv1d_type(dim, dim, kernel_size=1, causal=causal, norm='weight_norm'),
35
+ )
36
+
37
+ def forward(self, x):
38
+ y = self.block(x)
39
+ pad = (x.shape[-1] - y.shape[-1]) // 2
40
+ if pad > 0:
41
+ x = x[..., pad:-pad]
42
+ return x + y
43
+
44
+
45
+ class EncoderBlock(nn.Module):
46
+ def __init__(self, dim: int = 16, stride: int = 1, causal: bool = False):
47
+ super().__init__()
48
+ conv1d_type = SConv1d# if causal else WNConv1d
49
+ self.block = nn.Sequential(
50
+ ResidualUnit(dim // 2, dilation=1, causal=causal),
51
+ ResidualUnit(dim // 2, dilation=3, causal=causal),
52
+ ResidualUnit(dim // 2, dilation=9, causal=causal),
53
+ Snake1d(dim // 2),
54
+ conv1d_type(
55
+ dim // 2,
56
+ dim,
57
+ kernel_size=2 * stride,
58
+ stride=stride,
59
+ padding=math.ceil(stride / 2),
60
+ causal=causal,
61
+ norm='weight_norm',
62
+ ),
63
+ )
64
+
65
+ def forward(self, x):
66
+ return self.block(x)
67
+
68
+
69
+ class Encoder(nn.Module):
70
+ def __init__(
71
+ self,
72
+ d_model: int = 64,
73
+ strides: list = [2, 4, 8, 8],
74
+ d_latent: int = 64,
75
+ causal: bool = False,
76
+ lstm: int = 2,
77
+ ):
78
+ super().__init__()
79
+ conv1d_type = SConv1d# if causal else WNConv1d
80
+ # Create first convolution
81
+ self.block = [conv1d_type(1, d_model, kernel_size=7, padding=3, causal=causal, norm='weight_norm')]
82
+
83
+ # Create EncoderBlocks that double channels as they downsample by `stride`
84
+ for stride in strides:
85
+ d_model *= 2
86
+ self.block += [EncoderBlock(d_model, stride=stride, causal=causal)]
87
+
88
+ # Add LSTM if needed
89
+ self.use_lstm = lstm
90
+ if lstm:
91
+ self.block += [SLSTM(d_model, lstm)]
92
+
93
+ # Create last convolution
94
+ self.block += [
95
+ Snake1d(d_model),
96
+ conv1d_type(d_model, d_latent, kernel_size=3, padding=1, causal=causal, norm='weight_norm'),
97
+ ]
98
+
99
+ # Wrap black into nn.Sequential
100
+ self.block = nn.Sequential(*self.block)
101
+ self.enc_dim = d_model
102
+
103
+ def forward(self, x):
104
+ return self.block(x)
105
+
106
+ def reset_cache(self):
107
+ # recursively find all submodules named SConv1d in self.block and use their reset_cache method
108
+ def reset_cache(m):
109
+ if isinstance(m, SConv1d) or isinstance(m, SLSTM):
110
+ m.reset_cache()
111
+ return
112
+ for child in m.children():
113
+ reset_cache(child)
114
+
115
+ reset_cache(self.block)
116
+
117
+
118
+ class DecoderBlock(nn.Module):
119
+ def __init__(self, input_dim: int = 16, output_dim: int = 8, stride: int = 1, causal: bool = False):
120
+ super().__init__()
121
+ conv1d_type = SConvTranspose1d #if causal else WNConvTranspose1d
122
+ self.block = nn.Sequential(
123
+ Snake1d(input_dim),
124
+ conv1d_type(
125
+ input_dim,
126
+ output_dim,
127
+ kernel_size=2 * stride,
128
+ stride=stride,
129
+ padding=math.ceil(stride / 2),
130
+ causal=causal,
131
+ norm='weight_norm'
132
+ ),
133
+ ResidualUnit(output_dim, dilation=1, causal=causal),
134
+ ResidualUnit(output_dim, dilation=3, causal=causal),
135
+ ResidualUnit(output_dim, dilation=9, causal=causal),
136
+ )
137
+
138
+ def forward(self, x):
139
+ return self.block(x)
140
+
141
+
142
+ class Decoder(nn.Module):
143
+ def __init__(
144
+ self,
145
+ input_channel,
146
+ channels,
147
+ rates,
148
+ d_out: int = 1,
149
+ causal: bool = False,
150
+ lstm: int = 2,
151
+ ):
152
+ super().__init__()
153
+ conv1d_type = SConv1d# if causal else WNConv1d
154
+ # Add first conv layer
155
+ layers = [conv1d_type(input_channel, channels, kernel_size=7, padding=3, causal=causal, norm='weight_norm')]
156
+
157
+ if lstm:
158
+ layers += [SLSTM(channels, num_layers=lstm)]
159
+
160
+ # Add upsampling + MRF blocks
161
+ for i, stride in enumerate(rates):
162
+ input_dim = channels // 2**i
163
+ output_dim = channels // 2 ** (i + 1)
164
+ layers += [DecoderBlock(input_dim, output_dim, stride, causal=causal)]
165
+
166
+ # Add final conv layer
167
+ layers += [
168
+ Snake1d(output_dim),
169
+ conv1d_type(output_dim, d_out, kernel_size=7, padding=3, causal=causal, norm='weight_norm'),
170
+ nn.Tanh(),
171
+ ]
172
+
173
+ self.model = nn.Sequential(*layers)
174
+
175
+ def forward(self, x):
176
+ return self.model(x)
177
+
178
+
179
+ class DAC(BaseModel, CodecMixin):
180
+ def __init__(
181
+ self,
182
+ encoder_dim: int = 64,
183
+ encoder_rates: List[int] = [2, 4, 8, 8],
184
+ latent_dim: int = None,
185
+ decoder_dim: int = 1536,
186
+ decoder_rates: List[int] = [8, 8, 4, 2],
187
+ n_codebooks: int = 9,
188
+ codebook_size: int = 1024,
189
+ codebook_dim: Union[int, list] = 8,
190
+ quantizer_dropout: bool = False,
191
+ sample_rate: int = 44100,
192
+ lstm: int = 2,
193
+ causal: bool = False,
194
+ ):
195
+ super().__init__()
196
+
197
+ self.encoder_dim = encoder_dim
198
+ self.encoder_rates = encoder_rates
199
+ self.decoder_dim = decoder_dim
200
+ self.decoder_rates = decoder_rates
201
+ self.sample_rate = sample_rate
202
+
203
+ if latent_dim is None:
204
+ latent_dim = encoder_dim * (2 ** len(encoder_rates))
205
+
206
+ self.latent_dim = latent_dim
207
+
208
+ self.hop_length = np.prod(encoder_rates)
209
+ self.encoder = Encoder(encoder_dim, encoder_rates, latent_dim, causal=causal, lstm=lstm)
210
+
211
+ self.n_codebooks = n_codebooks
212
+ self.codebook_size = codebook_size
213
+ self.codebook_dim = codebook_dim
214
+ self.quantizer = ResidualVectorQuantize(
215
+ input_dim=latent_dim,
216
+ n_codebooks=n_codebooks,
217
+ codebook_size=codebook_size,
218
+ codebook_dim=codebook_dim,
219
+ quantizer_dropout=quantizer_dropout,
220
+ )
221
+
222
+ self.decoder = Decoder(
223
+ latent_dim,
224
+ decoder_dim,
225
+ decoder_rates,
226
+ lstm=lstm,
227
+ causal=causal,
228
+ )
229
+ self.sample_rate = sample_rate
230
+ self.apply(init_weights)
231
+
232
+ self.delay = self.get_delay()
233
+
234
+ def preprocess(self, audio_data, sample_rate):
235
+ if sample_rate is None:
236
+ sample_rate = self.sample_rate
237
+ assert sample_rate == self.sample_rate
238
+
239
+ length = audio_data.shape[-1]
240
+ right_pad = math.ceil(length / self.hop_length) * self.hop_length - length
241
+ audio_data = nn.functional.pad(audio_data, (0, right_pad))
242
+
243
+ return audio_data
244
+
245
+ def encode(
246
+ self,
247
+ audio_data: torch.Tensor,
248
+ n_quantizers: int = None,
249
+ ):
250
+ """Encode given audio data and return quantized latent codes
251
+
252
+ Parameters
253
+ ----------
254
+ audio_data : Tensor[B x 1 x T]
255
+ Audio data to encode
256
+ n_quantizers : int, optional
257
+ Number of quantizers to use, by default None
258
+ If None, all quantizers are used.
259
+
260
+ Returns
261
+ -------
262
+ dict
263
+ A dictionary with the following keys:
264
+ "z" : Tensor[B x D x T]
265
+ Quantized continuous representation of input
266
+ "codes" : Tensor[B x N x T]
267
+ Codebook indices for each codebook
268
+ (quantized discrete representation of input)
269
+ "latents" : Tensor[B x N*D x T]
270
+ Projected latents (continuous representation of input before quantization)
271
+ "vq/commitment_loss" : Tensor[1]
272
+ Commitment loss to train encoder to predict vectors closer to codebook
273
+ entries
274
+ "vq/codebook_loss" : Tensor[1]
275
+ Codebook loss to update the codebook
276
+ "length" : int
277
+ Number of samples in input audio
278
+ """
279
+ z = self.encoder(audio_data)
280
+ z, codes, latents, commitment_loss, codebook_loss = self.quantizer(
281
+ z, n_quantizers
282
+ )
283
+ return z, codes, latents, commitment_loss, codebook_loss
284
+
285
+ def decode(self, z: torch.Tensor):
286
+ """Decode given latent codes and return audio data
287
+
288
+ Parameters
289
+ ----------
290
+ z : Tensor[B x D x T]
291
+ Quantized continuous representation of input
292
+ length : int, optional
293
+ Number of samples in output audio, by default None
294
+
295
+ Returns
296
+ -------
297
+ dict
298
+ A dictionary with the following keys:
299
+ "audio" : Tensor[B x 1 x length]
300
+ Decoded audio data.
301
+ """
302
+ return self.decoder(z)
303
+
304
+ def forward(
305
+ self,
306
+ audio_data: torch.Tensor,
307
+ sample_rate: int = None,
308
+ n_quantizers: int = None,
309
+ ):
310
+ """Model forward pass
311
+
312
+ Parameters
313
+ ----------
314
+ audio_data : Tensor[B x 1 x T]
315
+ Audio data to encode
316
+ sample_rate : int, optional
317
+ Sample rate of audio data in Hz, by default None
318
+ If None, defaults to `self.sample_rate`
319
+ n_quantizers : int, optional
320
+ Number of quantizers to use, by default None.
321
+ If None, all quantizers are used.
322
+
323
+ Returns
324
+ -------
325
+ dict
326
+ A dictionary with the following keys:
327
+ "z" : Tensor[B x D x T]
328
+ Quantized continuous representation of input
329
+ "codes" : Tensor[B x N x T]
330
+ Codebook indices for each codebook
331
+ (quantized discrete representation of input)
332
+ "latents" : Tensor[B x N*D x T]
333
+ Projected latents (continuous representation of input before quantization)
334
+ "vq/commitment_loss" : Tensor[1]
335
+ Commitment loss to train encoder to predict vectors closer to codebook
336
+ entries
337
+ "vq/codebook_loss" : Tensor[1]
338
+ Codebook loss to update the codebook
339
+ "length" : int
340
+ Number of samples in input audio
341
+ "audio" : Tensor[B x 1 x length]
342
+ Decoded audio data.
343
+ """
344
+ length = audio_data.shape[-1]
345
+ audio_data = self.preprocess(audio_data, sample_rate)
346
+ z, codes, latents, commitment_loss, codebook_loss = self.encode(
347
+ audio_data, n_quantizers
348
+ )
349
+
350
+ x = self.decode(z)
351
+ return {
352
+ "audio": x[..., :length],
353
+ "z": z,
354
+ "codes": codes,
355
+ "latents": latents,
356
+ "vq/commitment_loss": commitment_loss,
357
+ "vq/codebook_loss": codebook_loss,
358
+ }
359
+
360
+
361
+ if __name__ == "__main__":
362
+ import numpy as np
363
+ from functools import partial
364
+
365
+ model = DAC().to("cpu")
366
+
367
+ for n, m in model.named_modules():
368
+ o = m.extra_repr()
369
+ p = sum([np.prod(p.size()) for p in m.parameters()])
370
+ fn = lambda o, p: o + f" {p/1e6:<.3f}M params."
371
+ setattr(m, "extra_repr", partial(fn, o=o, p=p))
372
+ print(model)
373
+ print("Total # of params: ", sum([np.prod(p.size()) for p in model.parameters()]))
374
+
375
+ length = 88200 * 2
376
+ x = torch.randn(1, 1, length).to(model.device)
377
+ x.requires_grad_(True)
378
+ x.retain_grad()
379
+
380
+ # Make a forward pass
381
+ out = model(x)["audio"]
382
+ print("Input shape:", x.shape)
383
+ print("Output shape:", out.shape)
384
+
385
+ # Create gradient variable
386
+ grad = torch.zeros_like(out)
387
+ grad[:, :, grad.shape[-1] // 2] = 1
388
+
389
+ # Make a backward pass
390
+ out.backward(grad)
391
+
392
+ # Check non-zero values
393
+ gradmap = x.grad.squeeze(0)
394
+ gradmap = (gradmap != 0).sum(0) # sum across features
395
+ rf = (gradmap != 0).sum()
396
+
397
+ print(f"Receptive field: {rf.item()}")
398
+
399
+ x = AudioSignal(torch.randn(1, 1, 44100 * 60), 44100)
400
+ model.decompress(model.compress(x, verbose=True), verbose=True)
dac/model/discriminator.py ADDED
@@ -0,0 +1,228 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import torch
2
+ import torch.nn as nn
3
+ import torch.nn.functional as F
4
+ from audiotools import AudioSignal
5
+ from audiotools import ml
6
+ from audiotools import STFTParams
7
+ from einops import rearrange
8
+ from torch.nn.utils import weight_norm
9
+
10
+
11
+ def WNConv1d(*args, **kwargs):
12
+ act = kwargs.pop("act", True)
13
+ conv = weight_norm(nn.Conv1d(*args, **kwargs))
14
+ if not act:
15
+ return conv
16
+ return nn.Sequential(conv, nn.LeakyReLU(0.1))
17
+
18
+
19
+ def WNConv2d(*args, **kwargs):
20
+ act = kwargs.pop("act", True)
21
+ conv = weight_norm(nn.Conv2d(*args, **kwargs))
22
+ if not act:
23
+ return conv
24
+ return nn.Sequential(conv, nn.LeakyReLU(0.1))
25
+
26
+
27
+ class MPD(nn.Module):
28
+ def __init__(self, period):
29
+ super().__init__()
30
+ self.period = period
31
+ self.convs = nn.ModuleList(
32
+ [
33
+ WNConv2d(1, 32, (5, 1), (3, 1), padding=(2, 0)),
34
+ WNConv2d(32, 128, (5, 1), (3, 1), padding=(2, 0)),
35
+ WNConv2d(128, 512, (5, 1), (3, 1), padding=(2, 0)),
36
+ WNConv2d(512, 1024, (5, 1), (3, 1), padding=(2, 0)),
37
+ WNConv2d(1024, 1024, (5, 1), 1, padding=(2, 0)),
38
+ ]
39
+ )
40
+ self.conv_post = WNConv2d(
41
+ 1024, 1, kernel_size=(3, 1), padding=(1, 0), act=False
42
+ )
43
+
44
+ def pad_to_period(self, x):
45
+ t = x.shape[-1]
46
+ x = F.pad(x, (0, self.period - t % self.period), mode="reflect")
47
+ return x
48
+
49
+ def forward(self, x):
50
+ fmap = []
51
+
52
+ x = self.pad_to_period(x)
53
+ x = rearrange(x, "b c (l p) -> b c l p", p=self.period)
54
+
55
+ for layer in self.convs:
56
+ x = layer(x)
57
+ fmap.append(x)
58
+
59
+ x = self.conv_post(x)
60
+ fmap.append(x)
61
+
62
+ return fmap
63
+
64
+
65
+ class MSD(nn.Module):
66
+ def __init__(self, rate: int = 1, sample_rate: int = 44100):
67
+ super().__init__()
68
+ self.convs = nn.ModuleList(
69
+ [
70
+ WNConv1d(1, 16, 15, 1, padding=7),
71
+ WNConv1d(16, 64, 41, 4, groups=4, padding=20),
72
+ WNConv1d(64, 256, 41, 4, groups=16, padding=20),
73
+ WNConv1d(256, 1024, 41, 4, groups=64, padding=20),
74
+ WNConv1d(1024, 1024, 41, 4, groups=256, padding=20),
75
+ WNConv1d(1024, 1024, 5, 1, padding=2),
76
+ ]
77
+ )
78
+ self.conv_post = WNConv1d(1024, 1, 3, 1, padding=1, act=False)
79
+ self.sample_rate = sample_rate
80
+ self.rate = rate
81
+
82
+ def forward(self, x):
83
+ x = AudioSignal(x, self.sample_rate)
84
+ x.resample(self.sample_rate // self.rate)
85
+ x = x.audio_data
86
+
87
+ fmap = []
88
+
89
+ for l in self.convs:
90
+ x = l(x)
91
+ fmap.append(x)
92
+ x = self.conv_post(x)
93
+ fmap.append(x)
94
+
95
+ return fmap
96
+
97
+
98
+ BANDS = [(0.0, 0.1), (0.1, 0.25), (0.25, 0.5), (0.5, 0.75), (0.75, 1.0)]
99
+
100
+
101
+ class MRD(nn.Module):
102
+ def __init__(
103
+ self,
104
+ window_length: int,
105
+ hop_factor: float = 0.25,
106
+ sample_rate: int = 44100,
107
+ bands: list = BANDS,
108
+ ):
109
+ """Complex multi-band spectrogram discriminator.
110
+ Parameters
111
+ ----------
112
+ window_length : int
113
+ Window length of STFT.
114
+ hop_factor : float, optional
115
+ Hop factor of the STFT, defaults to ``0.25 * window_length``.
116
+ sample_rate : int, optional
117
+ Sampling rate of audio in Hz, by default 44100
118
+ bands : list, optional
119
+ Bands to run discriminator over.
120
+ """
121
+ super().__init__()
122
+
123
+ self.window_length = window_length
124
+ self.hop_factor = hop_factor
125
+ self.sample_rate = sample_rate
126
+ self.stft_params = STFTParams(
127
+ window_length=window_length,
128
+ hop_length=int(window_length * hop_factor),
129
+ match_stride=True,
130
+ )
131
+
132
+ n_fft = window_length // 2 + 1
133
+ bands = [(int(b[0] * n_fft), int(b[1] * n_fft)) for b in bands]
134
+ self.bands = bands
135
+
136
+ ch = 32
137
+ convs = lambda: nn.ModuleList(
138
+ [
139
+ WNConv2d(2, ch, (3, 9), (1, 1), padding=(1, 4)),
140
+ WNConv2d(ch, ch, (3, 9), (1, 2), padding=(1, 4)),
141
+ WNConv2d(ch, ch, (3, 9), (1, 2), padding=(1, 4)),
142
+ WNConv2d(ch, ch, (3, 9), (1, 2), padding=(1, 4)),
143
+ WNConv2d(ch, ch, (3, 3), (1, 1), padding=(1, 1)),
144
+ ]
145
+ )
146
+ self.band_convs = nn.ModuleList([convs() for _ in range(len(self.bands))])
147
+ self.conv_post = WNConv2d(ch, 1, (3, 3), (1, 1), padding=(1, 1), act=False)
148
+
149
+ def spectrogram(self, x):
150
+ x = AudioSignal(x, self.sample_rate, stft_params=self.stft_params)
151
+ x = torch.view_as_real(x.stft())
152
+ x = rearrange(x, "b 1 f t c -> (b 1) c t f")
153
+ # Split into bands
154
+ x_bands = [x[..., b[0] : b[1]] for b in self.bands]
155
+ return x_bands
156
+
157
+ def forward(self, x):
158
+ x_bands = self.spectrogram(x)
159
+ fmap = []
160
+
161
+ x = []
162
+ for band, stack in zip(x_bands, self.band_convs):
163
+ for layer in stack:
164
+ band = layer(band)
165
+ fmap.append(band)
166
+ x.append(band)
167
+
168
+ x = torch.cat(x, dim=-1)
169
+ x = self.conv_post(x)
170
+ fmap.append(x)
171
+
172
+ return fmap
173
+
174
+
175
+ class Discriminator(nn.Module):
176
+ def __init__(
177
+ self,
178
+ rates: list = [],
179
+ periods: list = [2, 3, 5, 7, 11],
180
+ fft_sizes: list = [2048, 1024, 512],
181
+ sample_rate: int = 44100,
182
+ bands: list = BANDS,
183
+ ):
184
+ """Discriminator that combines multiple discriminators.
185
+
186
+ Parameters
187
+ ----------
188
+ rates : list, optional
189
+ sampling rates (in Hz) to run MSD at, by default []
190
+ If empty, MSD is not used.
191
+ periods : list, optional
192
+ periods (of samples) to run MPD at, by default [2, 3, 5, 7, 11]
193
+ fft_sizes : list, optional
194
+ Window sizes of the FFT to run MRD at, by default [2048, 1024, 512]
195
+ sample_rate : int, optional
196
+ Sampling rate of audio in Hz, by default 44100
197
+ bands : list, optional
198
+ Bands to run MRD at, by default `BANDS`
199
+ """
200
+ super().__init__()
201
+ discs = []
202
+ discs += [MPD(p) for p in periods]
203
+ discs += [MSD(r, sample_rate=sample_rate) for r in rates]
204
+ discs += [MRD(f, sample_rate=sample_rate, bands=bands) for f in fft_sizes]
205
+ self.discriminators = nn.ModuleList(discs)
206
+
207
+ def preprocess(self, y):
208
+ # Remove DC offset
209
+ y = y - y.mean(dim=-1, keepdims=True)
210
+ # Peak normalize the volume of input audio
211
+ y = 0.8 * y / (y.abs().max(dim=-1, keepdim=True)[0] + 1e-9)
212
+ return y
213
+
214
+ def forward(self, x):
215
+ x = self.preprocess(x)
216
+ fmaps = [d(x) for d in self.discriminators]
217
+ return fmaps
218
+
219
+
220
+ if __name__ == "__main__":
221
+ disc = Discriminator()
222
+ x = torch.zeros(1, 1, 44100)
223
+ results = disc(x)
224
+ for i, result in enumerate(results):
225
+ print(f"disc{i}")
226
+ for i, r in enumerate(result):
227
+ print(r.shape, r.mean(), r.min(), r.max())
228
+ print()
dac/model/encodec.py ADDED
@@ -0,0 +1,320 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Copyright (c) Meta Platforms, Inc. and affiliates.
2
+ # All rights reserved.
3
+ #
4
+ # This source code is licensed under the license found in the
5
+ # LICENSE file in the root directory of this source tree.
6
+
7
+ """Convolutional layers wrappers and utilities."""
8
+
9
+ import math
10
+ import typing as tp
11
+ import warnings
12
+
13
+ import torch
14
+ from torch import nn
15
+ from torch.nn import functional as F
16
+ from torch.nn.utils import spectral_norm, weight_norm
17
+
18
+ import typing as tp
19
+
20
+ import einops
21
+
22
+
23
+ class ConvLayerNorm(nn.LayerNorm):
24
+ """
25
+ Convolution-friendly LayerNorm that moves channels to last dimensions
26
+ before running the normalization and moves them back to original position right after.
27
+ """
28
+ def __init__(self, normalized_shape: tp.Union[int, tp.List[int], torch.Size], **kwargs):
29
+ super().__init__(normalized_shape, **kwargs)
30
+
31
+ def forward(self, x):
32
+ x = einops.rearrange(x, 'b ... t -> b t ...')
33
+ x = super().forward(x)
34
+ x = einops.rearrange(x, 'b t ... -> b ... t')
35
+ return
36
+
37
+
38
+ CONV_NORMALIZATIONS = frozenset(['none', 'weight_norm', 'spectral_norm',
39
+ 'time_layer_norm', 'layer_norm', 'time_group_norm'])
40
+
41
+
42
+ def apply_parametrization_norm(module: nn.Module, norm: str = 'none') -> nn.Module:
43
+ assert norm in CONV_NORMALIZATIONS
44
+ if norm == 'weight_norm':
45
+ return weight_norm(module)
46
+ elif norm == 'spectral_norm':
47
+ return spectral_norm(module)
48
+ else:
49
+ # We already check was in CONV_NORMALIZATION, so any other choice
50
+ # doesn't need reparametrization.
51
+ return module
52
+
53
+
54
+ def get_norm_module(module: nn.Module, causal: bool = False, norm: str = 'none', **norm_kwargs) -> nn.Module:
55
+ """Return the proper normalization module. If causal is True, this will ensure the returned
56
+ module is causal, or return an error if the normalization doesn't support causal evaluation.
57
+ """
58
+ assert norm in CONV_NORMALIZATIONS
59
+ if norm == 'layer_norm':
60
+ assert isinstance(module, nn.modules.conv._ConvNd)
61
+ return ConvLayerNorm(module.out_channels, **norm_kwargs)
62
+ elif norm == 'time_group_norm':
63
+ if causal:
64
+ raise ValueError("GroupNorm doesn't support causal evaluation.")
65
+ assert isinstance(module, nn.modules.conv._ConvNd)
66
+ return nn.GroupNorm(1, module.out_channels, **norm_kwargs)
67
+ else:
68
+ return nn.Identity()
69
+
70
+
71
+ def get_extra_padding_for_conv1d(x: torch.Tensor, kernel_size: int, stride: int,
72
+ padding_total: int = 0) -> int:
73
+ """See `pad_for_conv1d`.
74
+ """
75
+ length = x.shape[-1]
76
+ n_frames = (length - kernel_size + padding_total) / stride + 1
77
+ ideal_length = (math.ceil(n_frames) - 1) * stride + (kernel_size - padding_total)
78
+ return ideal_length - length
79
+
80
+
81
+ def pad_for_conv1d(x: torch.Tensor, kernel_size: int, stride: int, padding_total: int = 0):
82
+ """Pad for a convolution to make sure that the last window is full.
83
+ Extra padding is added at the end. This is required to ensure that we can rebuild
84
+ an output of the same length, as otherwise, even with padding, some time steps
85
+ might get removed.
86
+ For instance, with total padding = 4, kernel size = 4, stride = 2:
87
+ 0 0 1 2 3 4 5 0 0 # (0s are padding)
88
+ 1 2 3 # (output frames of a convolution, last 0 is never used)
89
+ 0 0 1 2 3 4 5 0 # (output of tr. conv., but pos. 5 is going to get removed as padding)
90
+ 1 2 3 4 # once you removed padding, we are missing one time step !
91
+ """
92
+ extra_padding = get_extra_padding_for_conv1d(x, kernel_size, stride, padding_total)
93
+ return F.pad(x, (0, extra_padding))
94
+
95
+
96
+ def pad1d(x: torch.Tensor, paddings: tp.Tuple[int, int], mode: str = 'zero', value: float = 0.):
97
+ """Tiny wrapper around F.pad, just to allow for reflect padding on small input.
98
+ If this is the case, we insert extra 0 padding to the right before the reflection happen.
99
+ """
100
+ length = x.shape[-1]
101
+ padding_left, padding_right = paddings
102
+ assert padding_left >= 0 and padding_right >= 0, (padding_left, padding_right)
103
+ if mode == 'reflect':
104
+ max_pad = max(padding_left, padding_right)
105
+ extra_pad = 0
106
+ if length <= max_pad:
107
+ extra_pad = max_pad - length + 1
108
+ x = F.pad(x, (0, extra_pad))
109
+ padded = F.pad(x, paddings, mode, value)
110
+ end = padded.shape[-1] - extra_pad
111
+ return padded[..., :end]
112
+ else:
113
+ return F.pad(x, paddings, mode, value)
114
+
115
+
116
+ def unpad1d(x: torch.Tensor, paddings: tp.Tuple[int, int]):
117
+ """Remove padding from x, handling properly zero padding. Only for 1d!"""
118
+ padding_left, padding_right = paddings
119
+ assert padding_left >= 0 and padding_right >= 0, (padding_left, padding_right)
120
+ assert (padding_left + padding_right) <= x.shape[-1]
121
+ end = x.shape[-1] - padding_right
122
+ return x[..., padding_left: end]
123
+
124
+
125
+ class NormConv1d(nn.Module):
126
+ """Wrapper around Conv1d and normalization applied to this conv
127
+ to provide a uniform interface across normalization approaches.
128
+ """
129
+ def __init__(self, *args, causal: bool = False, norm: str = 'none',
130
+ norm_kwargs: tp.Dict[str, tp.Any] = {}, **kwargs):
131
+ super().__init__()
132
+ self.conv = apply_parametrization_norm(nn.Conv1d(*args, **kwargs), norm)
133
+ self.norm = get_norm_module(self.conv, causal, norm, **norm_kwargs)
134
+ self.norm_type = norm
135
+
136
+ def forward(self, x):
137
+ x = self.conv(x)
138
+ x = self.norm(x)
139
+ return x
140
+
141
+
142
+ class NormConv2d(nn.Module):
143
+ """Wrapper around Conv2d and normalization applied to this conv
144
+ to provide a uniform interface across normalization approaches.
145
+ """
146
+ def __init__(self, *args, norm: str = 'none',
147
+ norm_kwargs: tp.Dict[str, tp.Any] = {}, **kwargs):
148
+ super().__init__()
149
+ self.conv = apply_parametrization_norm(nn.Conv2d(*args, **kwargs), norm)
150
+ self.norm = get_norm_module(self.conv, causal=False, norm=norm, **norm_kwargs)
151
+ self.norm_type = norm
152
+
153
+ def forward(self, x):
154
+ x = self.conv(x)
155
+ x = self.norm(x)
156
+ return x
157
+
158
+
159
+ class NormConvTranspose1d(nn.Module):
160
+ """Wrapper around ConvTranspose1d and normalization applied to this conv
161
+ to provide a uniform interface across normalization approaches.
162
+ """
163
+ def __init__(self, *args, causal: bool = False, norm: str = 'none',
164
+ norm_kwargs: tp.Dict[str, tp.Any] = {}, **kwargs):
165
+ super().__init__()
166
+ self.convtr = apply_parametrization_norm(nn.ConvTranspose1d(*args, **kwargs), norm)
167
+ self.norm = get_norm_module(self.convtr, causal, norm, **norm_kwargs)
168
+ self.norm_type = norm
169
+
170
+ def forward(self, x):
171
+ x = self.convtr(x)
172
+ x = self.norm(x)
173
+ return x
174
+
175
+
176
+ class NormConvTranspose2d(nn.Module):
177
+ """Wrapper around ConvTranspose2d and normalization applied to this conv
178
+ to provide a uniform interface across normalization approaches.
179
+ """
180
+ def __init__(self, *args, norm: str = 'none',
181
+ norm_kwargs: tp.Dict[str, tp.Any] = {}, **kwargs):
182
+ super().__init__()
183
+ self.convtr = apply_parametrization_norm(nn.ConvTranspose2d(*args, **kwargs), norm)
184
+ self.norm = get_norm_module(self.convtr, causal=False, norm=norm, **norm_kwargs)
185
+
186
+ def forward(self, x):
187
+ x = self.convtr(x)
188
+ x = self.norm(x)
189
+ return x
190
+
191
+
192
+ class SConv1d(nn.Module):
193
+ """Conv1d with some builtin handling of asymmetric or causal padding
194
+ and normalization.
195
+ """
196
+ def __init__(self, in_channels: int, out_channels: int,
197
+ kernel_size: int, stride: int = 1, dilation: int = 1,
198
+ groups: int = 1, bias: bool = True, causal: bool = False,
199
+ norm: str = 'none', norm_kwargs: tp.Dict[str, tp.Any] = {},
200
+ pad_mode: str = 'reflect', **kwargs):
201
+ super().__init__()
202
+ # warn user on unusual setup between dilation and stride
203
+ if stride > 1 and dilation > 1:
204
+ warnings.warn('SConv1d has been initialized with stride > 1 and dilation > 1'
205
+ f' (kernel_size={kernel_size} stride={stride}, dilation={dilation}).')
206
+ self.conv = NormConv1d(in_channels, out_channels, kernel_size, stride,
207
+ dilation=dilation, groups=groups, bias=bias, causal=causal,
208
+ norm=norm, norm_kwargs=norm_kwargs)
209
+ self.causal = causal
210
+ self.pad_mode = pad_mode
211
+
212
+ self.cache_enabled = False
213
+
214
+ def reset_cache(self):
215
+ """Reset the cache when starting a new stream."""
216
+ self.cache = None
217
+ self.cache_enabled = True
218
+
219
+ def forward(self, x):
220
+ B, C, T = x.shape
221
+ kernel_size = self.conv.conv.kernel_size[0]
222
+ stride = self.conv.conv.stride[0]
223
+ dilation = self.conv.conv.dilation[0]
224
+ kernel_size = (kernel_size - 1) * dilation + 1 # effective kernel size with dilations
225
+ padding_total = kernel_size - stride
226
+ extra_padding = get_extra_padding_for_conv1d(x, kernel_size, stride, padding_total)
227
+
228
+ if self.causal:
229
+ # Left padding for causal
230
+ if self.cache_enabled and self.cache is not None:
231
+ # Concatenate the cache (previous inputs) with the new input for streaming
232
+ x = torch.cat([self.cache, x], dim=2)
233
+ else:
234
+ x = pad1d(x, (padding_total, extra_padding), mode=self.pad_mode)
235
+ else:
236
+ # Asymmetric padding required for odd strides
237
+ padding_right = padding_total // 2
238
+ padding_left = padding_total - padding_right
239
+ x = pad1d(x, (padding_left, padding_right + extra_padding), mode=self.pad_mode)
240
+
241
+ # Store the most recent input frames for future cache use
242
+ if self.cache_enabled:
243
+ if self.cache is None:
244
+ # Initialize cache with zeros (at the start of streaming)
245
+ self.cache = torch.zeros(B, C, kernel_size - 1, device=x.device)
246
+ # Update the cache by storing the latest input frames
247
+ if kernel_size > 1:
248
+ self.cache = x[:, :, -kernel_size + 1:].detach() # Only store the necessary frames
249
+
250
+ return self.conv(x)
251
+
252
+
253
+
254
+ class SConvTranspose1d(nn.Module):
255
+ """ConvTranspose1d with some builtin handling of asymmetric or causal padding
256
+ and normalization.
257
+ """
258
+ def __init__(self, in_channels: int, out_channels: int,
259
+ kernel_size: int, stride: int = 1, causal: bool = False,
260
+ norm: str = 'none', trim_right_ratio: float = 1.,
261
+ norm_kwargs: tp.Dict[str, tp.Any] = {}, **kwargs):
262
+ super().__init__()
263
+ self.convtr = NormConvTranspose1d(in_channels, out_channels, kernel_size, stride,
264
+ causal=causal, norm=norm, norm_kwargs=norm_kwargs)
265
+ self.causal = causal
266
+ self.trim_right_ratio = trim_right_ratio
267
+ assert self.causal or self.trim_right_ratio == 1., \
268
+ "`trim_right_ratio` != 1.0 only makes sense for causal convolutions"
269
+ assert self.trim_right_ratio >= 0. and self.trim_right_ratio <= 1.
270
+
271
+ def forward(self, x):
272
+ kernel_size = self.convtr.convtr.kernel_size[0]
273
+ stride = self.convtr.convtr.stride[0]
274
+ padding_total = kernel_size - stride
275
+
276
+ y = self.convtr(x)
277
+
278
+ # We will only trim fixed padding. Extra padding from `pad_for_conv1d` would be
279
+ # removed at the very end, when keeping only the right length for the output,
280
+ # as removing it here would require also passing the length at the matching layer
281
+ # in the encoder.
282
+ if self.causal:
283
+ # Trim the padding on the right according to the specified ratio
284
+ # if trim_right_ratio = 1.0, trim everything from right
285
+ padding_right = math.ceil(padding_total * self.trim_right_ratio)
286
+ padding_left = padding_total - padding_right
287
+ y = unpad1d(y, (padding_left, padding_right))
288
+ else:
289
+ # Asymmetric padding required for odd strides
290
+ padding_right = padding_total // 2
291
+ padding_left = padding_total - padding_right
292
+ y = unpad1d(y, (padding_left, padding_right))
293
+ return y
294
+
295
+ class SLSTM(nn.Module):
296
+ """
297
+ LSTM without worrying about the hidden state, nor the layout of the data.
298
+ Expects input as convolutional layout.
299
+ """
300
+ def __init__(self, dimension: int, num_layers: int = 2, skip: bool = True):
301
+ super().__init__()
302
+ self.skip = skip
303
+ self.lstm = nn.LSTM(dimension, dimension, num_layers)
304
+ self.hidden = None
305
+ self.cache_enabled = False
306
+
307
+ def forward(self, x):
308
+ x = x.permute(2, 0, 1)
309
+ if self.training or not self.cache_enabled:
310
+ y, _ = self.lstm(x)
311
+ else:
312
+ y, self.hidden = self.lstm(x, self.hidden)
313
+ if self.skip:
314
+ y = y + x
315
+ y = y.permute(1, 2, 0)
316
+ return y
317
+
318
+ def reset_cache(self):
319
+ self.hidden = None
320
+ self.cache_enabled = True