Spaces:
Running
on
Zero
Running
on
Zero
# Copyright (c) 2023 Amphion.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import torch | |
import random | |
import numpy as np | |
import torchaudio | |
import librosa | |
from torch.nn import functional as F | |
from torch.nn.utils.rnn import pad_sequence | |
from utils.data_utils import * | |
from models.codec.codec_dataset import CodecDataset | |
class FAcodecDataset(torch.utils.data.Dataset):
    """Audio dataset yielding ``(waveform, log-mel)`` pairs.

    Every file under ``cfg.dataset`` whose name ends with one of
    ``AUDIO_EXTENSIONS`` becomes one item. An item is a randomly positioned
    segment of that file, peak-normalised, together with its normalised
    log-mel spectrogram.
    """

    # File-name suffixes treated as audio when scanning the dataset directory.
    AUDIO_EXTENSIONS = (".wav", ".mp3", ".opus", ".flac", ".m4a")

    def __init__(self, cfg, dataset, is_valid=False):
        """
        Args:
            cfg: config; reads ``cfg.dataset`` (root directory to scan) and
                ``cfg.preprocess_params`` (``sr``, ``duration_range`` and
                ``spect_params``).
            dataset: dataset name (currently unused by this class)
            is_valid: whether to use train or valid dataset (currently
                unused by this class)
        """
        # Imported locally so the class does not depend on ``os`` happening
        # to be re-exported by a star import elsewhere in the file.
        import os

        self.data_root_dir = cfg.dataset
        self.data_list = []
        # Walk the dataset directory recursively and keep every audio file.
        for root, _, files in os.walk(self.data_root_dir):
            for file in files:
                if file.endswith(self.AUDIO_EXTENSIONS):
                    self.data_list.append(os.path.join(root, file))
        # os.walk order is filesystem dependent; sorting keeps the
        # index -> file mapping reproducible across runs and machines.
        self.data_list.sort()

        self.sr = cfg.preprocess_params.sr
        self.duration_range = cfg.preprocess_params.duration_range

        spect_params = cfg.preprocess_params.spect_params
        # NOTE(review): sample_rate is not passed, so torchaudio's default
        # applies to the mel filterbank -- confirm it matches ``self.sr``.
        self.to_mel = torchaudio.transforms.MelSpectrogram(
            n_mels=spect_params.n_mels,
            n_fft=spect_params.n_fft,
            win_length=spect_params.win_length,
            hop_length=spect_params.hop_length,
        )
        # Fixed constants used to normalise the log-mel spectrogram.
        self.mean, self.std = -4, 4

    def preprocess(self, wave):
        """Return the normalised log-mel spectrogram of ``wave``.

        Args:
            wave: waveform as a numpy array (converted to a float tensor) or
                any other object, which is passed to the mel transform as is.

        Returns:
            ``(log(1e-5 + mel) - mean) / std`` with an extra leading
            dimension of size 1.
        """
        wave_tensor = (
            torch.from_numpy(wave).float() if isinstance(wave, np.ndarray) else wave
        )
        mel_tensor = self.to_mel(wave_tensor)
        # The 1e-5 floor keeps the log finite on silent frames.
        mel_tensor = (torch.log(1e-5 + mel_tensor.unsqueeze(0)) - self.mean) / self.std
        return mel_tensor

    def __len__(self):
        """Return the number of audio files found under the dataset root."""
        return len(self.data_list)

    def _random_segment(self, wave):
        """Crop ``wave`` to a random duration drawn from ``duration_range``."""
        target_len = self.sr * random.randint(*self.duration_range)
        if len(wave) > target_len:
            start = random.randint(0, len(wave) - target_len)
            wave = wave[start : start + target_len]
        # Shorter clips are returned whole; the collator pads them.
        return wave

    @staticmethod
    def _peak_normalize(wave):
        """Scale ``wave`` to a peak magnitude of 1; silent clips are unchanged."""
        peak = np.max(np.abs(wave))
        # Dividing an all-zero clip by its peak would produce NaNs.
        return wave / peak if peak > 0 else wave

    def __getitem__(self, index):
        """Load one file and return ``(wave, mel)``.

        Returns:
            wave: 1-D float tensor holding the cropped, peak-normalised audio.
            mel: normalised log-mel spectrogram of ``wave`` with the leading
                dimension added by ``preprocess`` removed.

        Raises:
            ValueError: if no samples remain after loading and cropping.
        """
        path = self.data_list[index]
        wave, _ = librosa.load(path, sr=self.sr)
        # Use the audio that was just loaded. The previous code replaced it
        # with Gaussian noise, so every item was pure noise.
        wave = self._random_segment(wave)
        if wave.size == 0:
            raise ValueError(f"Audio clip contains no samples: {path}")
        wave = self._peak_normalize(wave)
        mel = self.preprocess(wave).squeeze(0)
        wave = torch.from_numpy(wave).float()
        return wave, mel
class FAcodecCollator(object):
    """Batch ``(wave, mel)`` pairs into padded tensors, longest mel first.

    Waveforms are right-padded with zeros and mel spectrograms with -10.
    """

    def __init__(self, cfg):
        self.cfg = cfg

    def __call__(self, batch):
        """Collate a list of ``(wave, mel)`` samples.

        Args:
            batch: list of ``(wave, mel)`` pairs; ``wave`` is indexed along
                dimension 0 and ``mel`` is read as ``(n_mels, frames)``.

        Returns:
            waves: ``(batch, max_wave_length)`` float tensor.
            mels: ``(batch, n_mels, max_mel_length)`` float tensor.
            wave_lengths: ``(batch,)`` long tensor of unpadded wave lengths.
            mel_lengths: ``(batch,)`` long tensor of unpadded frame counts.
        """
        num_items = len(batch)

        # Reorder the samples by number of mel frames, descending.
        frame_counts = [sample[1].shape[1] for sample in batch]
        descending = np.argsort(frame_counts)[::-1]
        ordered = [batch[i] for i in descending]

        n_mels = ordered[0][1].size(0)
        longest_mel = max(sample[1].shape[1] for sample in ordered)
        longest_wave = max(sample[0].size(0) for sample in ordered)

        # Pre-fill the outputs with their padding values.
        mels = torch.full(
            (num_items, n_mels, longest_mel), -10.0, dtype=torch.float32
        )
        waves = torch.zeros(num_items, longest_wave, dtype=torch.float32)
        mel_lengths = torch.zeros(num_items, dtype=torch.long)
        wave_lengths = torch.zeros(num_items, dtype=torch.long)

        for row, sample in enumerate(ordered):
            wave, mel = sample
            n_frames = mel.size(1)
            n_samples = wave.size(0)
            mels[row, :, :n_frames] = mel
            waves[row, :n_samples] = wave
            mel_lengths[row] = n_frames
            wave_lengths[row] = n_samples

        return waves, mels, wave_lengths, mel_lengths