Spaces:
Runtime error
Runtime error
import os | |
import sys | |
import time | |
import librosa | |
import numpy as np | |
from scipy import signal | |
from scipy.io import wavfile | |
from multiprocessing import cpu_count, Pool | |
from pydub import AudioSegment | |
now_directory = os.getcwd() | |
sys.path.append(now_directory) | |
from rvc.lib.utils import load_audio | |
from rvc.train.slicer import Slicer | |
# Positional command-line arguments:
#   1: experiment directory   2: input directory   3: sample rate (Hz)
#   4: clip length in seconds 5: worker count (optional, defaults to all cores)
experiment_directory, input_root = str(sys.argv[1]), str(sys.argv[2])
sample_rate, percentage = int(sys.argv[3]), float(sys.argv[4])
num_processes = cpu_count() if len(sys.argv) <= 5 else int(sys.argv[5])

# Tunable constants
OVERLAP = 0.3  # seconds shared by two consecutive clips of one segment
MAX_AMPLITUDE = 0.9  # peak target used by the normalization formula
ALPHA = 0.75  # blend weight between the rescaled and the untouched signal
HIGH_PASS_CUTOFF = 48  # high-pass filter cutoff, in Hz
SAMPLE_RATE_16K = 16000
# A segment remainder longer than this many seconds gets split once more.
TAIL = percentage + OVERLAP

# Output directories, both located under the experiment directory
GT_WAVS_DIR, WAVS16K_DIR = (
    os.path.join(experiment_directory, subdir)
    for subdir in ("sliced_audios", "sliced_audios_16k")
)
class PreProcess:
    """Cut training audio into short normalized clips.

    Every input file is high-pass filtered, split into segments by ``Slicer``,
    cut into overlapping windows of ``per`` seconds, normalized, and written
    twice: once at the working sample rate and once resampled to 16 kHz.
    """

    def __init__(self, sr: int, exp_dir: str, per: float):
        """Build the slicer, the high-pass filter and the output locations.

        Args:
            sr: Working sample rate in Hz.
            exp_dir: Experiment directory; clips are written to its
                ``sliced_audios`` and ``sliced_audios_16k`` sub-directories.
            per: Length of one clip window, in seconds.

        Raises:
            ValueError: If ``per`` is not larger than ``OVERLAP``; the window
                start would then never advance and slicing would not end.
        """
        if per <= OVERLAP:
            raise ValueError(
                f"per must be greater than the overlap ({OVERLAP}s), got {per}"
            )
        self.slicer = Slicer(
            sr=sr,
            threshold=-42,
            min_length=1500,
            min_interval=400,
            hop_size=15,
            max_sil_kept=500,
        )
        self.sr = sr
        self.b_high, self.a_high = signal.butter(
            N=5, Wn=HIGH_PASS_CUTOFF, btype="high", fs=self.sr
        )
        self.per = per
        self.exp_dir = exp_dir
        # Derive the output directories from the constructor argument so the
        # instance honours ``exp_dir`` instead of depending on module globals.
        self.gt_wavs_dir = os.path.join(exp_dir, "sliced_audios")
        self.wavs16k_dir = os.path.join(exp_dir, "sliced_audios_16k")

    def _normalize_audio(self, audio: np.ndarray):
        """Normalize a clip, or return ``None`` when it must be discarded.

        A clip is discarded when it is empty, completely silent, contains
        non-finite samples, or peaks above 2.5.

        Args:
            audio: Samples of one clip.

        Returns:
            The normalized samples, or ``None`` if the clip is filtered out.
        """
        if np.size(audio) == 0:
            return None
        tmp_max = np.abs(audio).max()
        # A zero peak would divide by zero and fill the clip with NaN; a NaN
        # peak compares False against 2.5 and would slip through unnoticed.
        if tmp_max == 0 or not np.isfinite(tmp_max) or tmp_max > 2.5:
            return None
        return (audio / tmp_max * (MAX_AMPLITUDE * ALPHA)) + (1 - ALPHA) * audio

    def _write_audio(self, audio: np.ndarray, filename: str, sr: int):
        """Write ``audio`` to ``filename`` as a float32 WAV file at rate ``sr``."""
        wavfile.write(filename, sr, audio.astype(np.float32))

    def process_audio_segment(self, audio_segment: np.ndarray, idx0: int, idx1: int):
        """Normalize one clip and write it at both sample rates.

        Args:
            audio_segment: Samples of the clip at the working sample rate.
            idx0: Index of the source file (first part of the file name).
            idx1: Index of the clip within that file (second part).
        """
        normalized_audio = self._normalize_audio(audio_segment)
        if normalized_audio is None:
            print(f"{idx0}-{idx1}-filtered")
            return

        clip_name = f"{idx0}_{idx1}.wav"

        # Clip at the working sample rate
        self._write_audio(
            normalized_audio, os.path.join(self.gt_wavs_dir, clip_name), self.sr
        )

        # Same clip resampled to 16 kHz
        audio_16k = librosa.resample(
            normalized_audio, orig_sr=self.sr, target_sr=SAMPLE_RATE_16K
        )
        self._write_audio(
            audio_16k, os.path.join(self.wavs16k_dir, clip_name), SAMPLE_RATE_16K
        )

    def process_audio(self, path: str, idx0: int):
        """Filter, slice and export every clip of one audio file.

        Any failure is reported on stdout and the file is abandoned, so one
        bad input does not stop the rest of the run.

        Args:
            path: Audio file to load (``load_audio`` presumably decodes it at
                ``self.sr`` - verify against ``rvc.lib.utils``).
            idx0: Index of the source file, used in the output file names.
        """
        # Use the instance's own window length rather than the module-level
        # TAIL, which is derived from the command line.
        tail_samples = (self.per + OVERLAP) * self.sr
        window = int(self.per * self.sr)
        try:
            audio = load_audio(path, self.sr)
            audio = signal.lfilter(self.b_high, self.a_high, audio)
            idx1 = 0
            for audio_segment in self.slicer.slice(audio):
                i = 0
                while True:
                    # Consecutive windows overlap by OVERLAP seconds.
                    start = int(self.sr * (self.per - OVERLAP) * i)
                    i += 1
                    remainder = audio_segment[start:]
                    is_last = len(remainder) <= tail_samples
                    # The last window takes everything that is left.
                    clip = remainder if is_last else remainder[:window]
                    self.process_audio_segment(clip, idx0, idx1)
                    idx1 += 1
                    if is_last:
                        break
        except Exception as error:
            print(f"An error occurred on {path} path: {error}")

    def process_audio_file(self, file_path_idx):
        """Process one ``(file_path, index)`` pair, converting to WAV if needed.

        Files without a ``.wav`` extension are first converted with pydub into
        a private temporary file, which is removed once processing is done.

        Args:
            file_path_idx: Tuple ``(file_path, idx0)``.
        """
        import tempfile

        file_path, idx0 = file_path_idx
        ext = os.path.splitext(file_path)[1].lower()
        if ext == ".wav":
            self.process_audio(file_path, idx0)
            return

        # A unique file in the platform's temp directory avoids the hard-coded
        # "/tmp" and name clashes between concurrent runs.
        fd, tmp_path = tempfile.mkstemp(prefix=f"{idx0}_", suffix=".wav")
        os.close(fd)
        try:
            try:
                exported = AudioSegment.from_file(file_path).export(
                    tmp_path, format="wav"
                )
                # export() returns an open file handle; close it explicitly.
                exported.close()
            except Exception as error:
                # Same best-effort policy as process_audio: report and skip.
                print(f"An error occurred on {file_path} path: {error}")
                return
            self.process_audio(tmp_path, idx0)
        finally:
            try:
                os.remove(tmp_path)
            except OSError:
                pass

    def process_audio_multiprocessing_input_directory(
        self, input_root: str, num_processes: int
    ):
        """Process every supported audio file of ``input_root`` in parallel.

        Args:
            input_root: Directory holding the input audio files.
            num_processes: Number of worker processes in the pool.
        """
        # Sorted listing keeps the file -> index mapping stable between runs.
        files = [
            (os.path.join(input_root, f), idx)
            for idx, f in enumerate(sorted(os.listdir(input_root)))
            if f.lower().endswith((".wav", ".mp3", ".flac", ".ogg"))
        ]

        # Create the output directories if they don't exist
        os.makedirs(self.gt_wavs_dir, exist_ok=True)
        os.makedirs(self.wavs16k_dir, exist_ok=True)

        # Fan the files out over the worker pool
        with Pool(processes=num_processes) as pool:
            pool.map(self.process_audio_file, files)
def preprocess_training_set(
    input_root: str, sr: int, num_processes: int, exp_dir: str, per: float
):
    """Run the full preprocessing pass and report how long it took.

    Args:
        input_root: Directory holding the input audio files.
        sr: Working sample rate in Hz.
        num_processes: Number of worker processes to use.
        exp_dir: Experiment directory handed to ``PreProcess``.
        per: Clip length in seconds handed to ``PreProcess``.
    """
    began = time.time()
    preprocessor = PreProcess(sr, exp_dir, per)
    print(f"Starting preprocess with {num_processes} cores...")
    preprocessor.process_audio_multiprocessing_input_directory(
        input_root, num_processes
    )
    print(f"Preprocess completed in {time.time() - began:.2f} seconds.")
# Script entry point: forward the parsed command-line values.
if __name__ == "__main__":
    preprocess_training_set(
        input_root=input_root,
        sr=sample_rate,
        num_processes=num_processes,
        exp_dir=experiment_directory,
        per=percentage,
    )