Spaces:
Running
Running
from gtts import gTTS | |
import edge_tts, asyncio, json, glob # noqa | |
from tqdm import tqdm | |
import librosa, os, re, torch, gc, subprocess # noqa | |
from .language_configuration import ( | |
fix_code_language, | |
BARK_VOICES_LIST, | |
VITS_VOICES_LIST, | |
) | |
from .utils import ( | |
download_manager, | |
create_directories, | |
copy_files, | |
rename_file, | |
remove_directory_contents, | |
remove_files, | |
run_command, | |
) | |
import numpy as np | |
from typing import Any, Dict | |
from pathlib import Path | |
import soundfile as sf | |
import platform | |
import logging | |
import traceback | |
from .logging_setup import logger | |
class TTS_OperationError(Exception):
    """Error raised when a text-to-speech operation does not succeed.

    The description is stored on ``message`` and also handed to
    ``Exception`` so that ``str(error)`` returns it.
    """

    def __init__(self, message="The operation did not complete successfully."):
        super().__init__(message)
        self.message = message
def verify_saved_file_and_size(filename):
    """Check that ``filename`` exists on disk and is not empty.

    Args:
        filename: path of the file that should have been written.

    Raises:
        TTS_OperationError: if the path does not exist or its size is 0 bytes.
    """
    file_is_missing = not os.path.exists(filename)
    if file_is_missing:
        raise TTS_OperationError(f"File '{filename}' was not saved.")

    size_in_bytes = os.path.getsize(filename)
    if size_in_bytes == 0:
        raise TTS_OperationError(
            f"File '{filename}' has a zero size. "
            "Related to incorrect TTS for the target language"
        )
def error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename):
    """Fallback used when a TTS engine fails for one segment.

    Logs ``error``, then tries to synthesize ``segment["text"]`` with gTTS and
    store it in ``filename`` as OGG/Vorbis. If that also fails, ``filename``
    is filled with silence lasting ``segment["end"] - segment["start"]``
    seconds at 22050 Hz.

    Args:
        error: exception raised by the primary TTS engine.
        segment: mapping read for "text", "tts_name", "start" and "end".
        TRANSLATE_AUDIO_TO: target language; passed through
            ``fix_code_language`` before being given to gTTS.
        filename: output path of the audio file.

    Raises:
        TTS_OperationError: from ``verify_saved_file_and_size`` when even the
            silent replacement is missing or empty.
    """
    traceback.print_exc()
    logger.error(f"Error: {str(error)}")
    try:
        from tempfile import TemporaryFile

        tts = gTTS(segment["text"], lang=fix_code_language(TRANSLATE_AUDIO_TO))
        # tts.save(filename)
        # The context manager closes the temporary file even when gTTS or
        # soundfile raises, so the handle is never leaked.
        with TemporaryFile() as temp_fp:
            tts.write_to_fp(temp_fp)

            # Reset the file pointer to the beginning of the file
            temp_fp.seek(0)

            # Read audio data from the TemporaryFile using soundfile
            audio_data, samplerate = sf.read(temp_fp)

        sf.write(
            filename, audio_data, samplerate, format="ogg", subtype="vorbis"
        )

        logger.warning(
            'TTS auxiliary will be utilized '
            f'rather than TTS: {segment["tts_name"]}'
        )
        verify_saved_file_and_size(filename)
    except Exception as aux_error:
        # Last resort: replace the segment with silence of the same duration
        logger.critical(f"Error: {str(aux_error)}")
        sample_rate_aux = 22050
        duration = float(segment["end"]) - float(segment["start"])
        data = np.zeros(int(sample_rate_aux * duration)).astype(np.float32)
        sf.write(
            filename, data, sample_rate_aux, format="ogg", subtype="vorbis"
        )
        logger.error("Audio will be replaced -> [silent audio].")
        verify_saved_file_and_size(filename)
def pad_array(array, sr):
    """Trim leading and trailing near-silence from an audio array.

    Samples whose absolute value is above 0.001 count as signal. The result
    keeps the span from the first to the last such sample, plus up to
    ``int(0.1 * sr)`` samples of margin on each side.

    Args:
        array: audio samples, a list or an object exposing ``shape``.
        sr: sampling rate, used to size the margin.

    Returns:
        The trimmed slice, or the (array-converted) input unchanged when no
        sample exceeds the threshold or the trimming step raises.

    Raises:
        ValueError: if the first dimension of ``array`` is empty.
    """
    samples = np.array(array) if isinstance(array, list) else array

    if not samples.shape[0]:
        raise ValueError("The generated audio does not contain any data")

    # Indices (along the first axis) of the samples above the threshold
    loud_positions = np.where(np.abs(samples) > 0.001)[0]

    if len(loud_positions) == 0:
        logger.debug(f"No valid indices: {samples}")
        return samples

    try:
        margin = int(0.1 * sr)
        first = max(0, loud_positions[0] - margin)
        last = min(len(samples), loud_positions[-1] + 1 + margin)
        return samples[first:last]
    except Exception as error:
        # Best effort: fall back to the untrimmed audio
        logger.error(str(error))
        return samples
# ===================================== | |
# EDGE TTS | |
# ===================================== | |
def edge_tts_voices_list():
    """Return the Edge TTS voices formatted as ``"<name>-<gender>"``.

    The ``edge-tts --list-voices`` command is tried first and its ``Name:`` /
    ``Gender:`` lines are parsed. When that yields nothing, the list is
    requested through ``edge_tts.list_voices()`` and sorted.

    Returns:
        list[str]: voice identifiers; empty if neither method returned any.
    """
    try:
        completed_process = subprocess.run(
            ["edge-tts", "--list-voices"], capture_output=True, text=True
        )
        lines = completed_process.stdout.strip().split("\n")
    except Exception as error:
        logger.debug(str(error))
        lines = []

    voices = []
    voice_entry = None
    for line in lines:
        if line.startswith("Name: "):
            voice_entry = {"Name": line.split(": ")[1]}
        elif line.startswith("Gender: ") and voice_entry is not None:
            # A "Gender:" line is only meaningful after a "Name:" line;
            # a stray one is skipped instead of raising on an unbound entry.
            voice_entry["Gender"] = line.split(": ")[1]
            voices.append(voice_entry)

    formatted_voices = [
        f"{entry['Name']}-{entry['Gender']}" for entry in voices
    ]

    if not formatted_voices:
        logger.warning(
            "The list of Edge TTS voices could not be obtained, "
            "switching to an alternative method"
        )
        loop = asyncio.new_event_loop()
        try:
            tts_voice_list = loop.run_until_complete(edge_tts.list_voices())
        finally:
            # Close the private loop so it is not leaked
            loop.close()
        formatted_voices = sorted(
            [f"{v['ShortName']}-{v['Gender']}" for v in tts_voice_list]
        )

    if not formatted_voices:
        logger.error("Can't get EDGE TTS - list voices")

    return formatted_voices
def segments_egde_tts(filtered_edge_segments, TRANSLATE_AUDIO_TO, is_gui):
    """Synthesize each segment with Edge TTS into ``audio/<start>.ogg``.

    Edge TTS first writes ``audio/<start>.mp3``; that file is read back,
    trimmed with ``pad_array`` and saved as OGG/Vorbis. Any exception for a
    segment is delegated to ``error_handling_in_tts``.

    Args:
        filtered_edge_segments: mapping whose "segments" entries provide
            "speaker", "text", "start" and "tts_name".
        TRANSLATE_AUDIO_TO: target language, only used by the fallback.
        is_gui: truthy to call the ``edge_tts`` Python API, falsy to run the
            ``edge-tts`` command through ``run_command``.
    """
    for segment in tqdm(filtered_edge_segments["segments"]):
        speaker = segment["speaker"]  # noqa
        sentence = segment["text"]
        start_time = segment["start"]
        voice = segment["tts_name"]

        # Final OGG path and the intermediate MP3 produced by Edge TTS
        filename = f"audio/{start_time}.ogg"
        mp3_path = filename[:-3] + "mp3"

        logger.info(f"{sentence} >> {filename}")
        try:
            if not is_gui:
                # nest_asyncio.apply() if not is_gui else None
                # NOTE(review): the text is interpolated unescaped; a double
                # quote inside it would alter the command line - confirm how
                # run_command parses its argument.
                command = f'edge-tts -t "{sentence}" -v "{voice.replace("-Male", "").replace("-Female", "")}" --write-media "{mp3_path}"'
                run_command(command)
            else:
                # Voice id without its trailing "-<Gender>" part
                voice_without_gender = "-".join(voice.split("-")[:-1])
                communicate = edge_tts.Communicate(
                    sentence, voice_without_gender
                )
                asyncio.run(communicate.save(mp3_path))
            verify_saved_file_and_size(mp3_path)

            samples, sample_rate = sf.read(mp3_path)
            samples = pad_array(samples, sample_rate)
            # os.remove(temp_file)

            # Save file
            sf.write(
                file=filename,
                samplerate=sample_rate,
                data=samples,
                format="ogg",
                subtype="vorbis",
            )
            verify_saved_file_and_size(filename)

        except Exception as error:
            error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename)
# ===================================== | |
# BARK TTS | |
# ===================================== | |
def segments_bark_tts(
    filtered_bark_segments, TRANSLATE_AUDIO_TO, model_id_bark="suno/bark-small"
):
    """Synthesize each segment with a Bark model into ``audio/<start>.ogg``.

    The model is loaded once on the device named by the ``SONITR_DEVICE``
    environment variable (float16 on "cuda", float32 otherwise). A failure
    while generating or saving a segment is delegated to
    ``error_handling_in_tts``.

    Args:
        filtered_bark_segments: mapping whose "segments" entries provide
            "speaker", "text", "start" and "tts_name" (a key of
            ``BARK_VOICES_LIST``).
        TRANSLATE_AUDIO_TO: target language, only used by the fallback.
        model_id_bark: model identifier given to ``from_pretrained``.
    """
    from transformers import AutoProcessor, BarkModel
    from optimum.bettertransformer import BetterTransformer

    device = os.environ.get("SONITR_DEVICE")
    torch_dtype_env = torch.float16 if device == "cuda" else torch.float32

    # load model bark (moved to the device once; the repeated .to(device)
    # that followed was a no-op and has been dropped)
    model = BarkModel.from_pretrained(
        model_id_bark, torch_dtype=torch_dtype_env
    ).to(device)
    processor = AutoProcessor.from_pretrained(
        model_id_bark, return_tensors="pt"
    )  # , padding=True
    if device == "cuda":
        # convert to bettertransformer
        model = BetterTransformer.transform(model, keep_original_model=False)
        # enable CPU offload
        # model.enable_cpu_offload()
    sampling_rate = model.generation_config.sample_rate

    # filtered_segments = filtered_bark_segments['segments']
    # Sorting the segments by 'tts_name'
    # sorted_segments = sorted(filtered_segments, key=lambda x: x['tts_name'])
    # logger.debug(sorted_segments)

    for segment in tqdm(filtered_bark_segments["segments"]):
        speaker = segment["speaker"]  # noqa
        text = segment["text"]
        start = segment["start"]
        tts_name = segment["tts_name"]

        inputs = processor(text, voice_preset=BARK_VOICES_LIST[tts_name]).to(
            device
        )

        # make the tts audio
        filename = f"audio/{start}.ogg"
        logger.info(f"{text} >> {filename}")
        try:
            # Infer
            with torch.inference_mode():
                speech_output = model.generate(
                    **inputs,
                    do_sample=True,
                    fine_temperature=0.4,
                    coarse_temperature=0.8,
                    pad_token_id=processor.tokenizer.pad_token_id,
                )
            # Save file
            data_tts = pad_array(
                speech_output.cpu().numpy().squeeze().astype(np.float32),
                sampling_rate,
            )
            sf.write(
                file=filename,
                samplerate=sampling_rate,
                data=data_tts,
                format="ogg",
                subtype="vorbis",
            )
            verify_saved_file_and_size(filename)
        except Exception as error:
            error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename)
        # Release memory between segments
        gc.collect()
        torch.cuda.empty_cache()

    # Drop the model and processor references before the final cleanup
    try:
        del processor
        del model
        gc.collect()
        torch.cuda.empty_cache()
    except Exception as error:
        logger.error(str(error))
        gc.collect()
        torch.cuda.empty_cache()
# ===================================== | |
# VITS TTS | |
# ===================================== | |
def uromanize(input_string):
    """Convert non-Roman strings to Roman using the `uroman` perl package.

    When ``./uroman`` does not exist, the uroman repository is cloned with
    git first. ``bin/uroman.pl`` is then run with ``input_string`` on stdin.

    Args:
        input_string: text to romanize.

    Returns:
        str: output of the script without its trailing new-line.

    Raises:
        ValueError: if the perl script exits with a non-zero return code.
    """
    # script_path = os.path.join(uroman_path, "bin", "uroman.pl")

    if not os.path.exists("./uroman"):
        logger.info(
            "Clonning repository uroman https://github.com/isi-nlp/uroman.git"
            " for romanize the text"
        )
        clone = subprocess.run(
            ["git", "clone", "https://github.com/isi-nlp/uroman.git"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        if clone.returncode != 0:
            # Report why the clone failed; the perl call below will then
            # fail on the missing script and raise ValueError.
            clone_stderr = clone.stderr.decode(errors="replace")
            logger.error(
                f"git clone failed with code {clone.returncode}: {clone_stderr}"
            )

    script_path = os.path.join("./uroman", "bin", "uroman.pl")

    # Execute the perl command
    process = subprocess.run(
        ["perl", script_path],
        input=input_string.encode(),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    if process.returncode != 0:
        raise ValueError(
            f"Error {process.returncode}: {process.stderr.decode()}"
        )

    # Skip the new-line character at the end, but only when there is one, so
    # that a real character is never cut off.
    romanized = process.stdout.decode()
    if romanized.endswith("\n"):
        romanized = romanized[:-1]
    return romanized
def segments_vits_tts(filtered_vits_segments, TRANSLATE_AUDIO_TO):
    """Synthesize each segment with a VITS model into ``audio/<start>.ogg``.

    Segments are processed sorted by "tts_name" so that a model and its
    tokenizer are loaded only when the voice changes. Text is romanized with
    ``uromanize`` when the tokenizer reports ``is_uroman``. A failure during
    inference or saving is delegated to ``error_handling_in_tts``.

    Args:
        filtered_vits_segments: mapping whose "segments" entries provide
            "speaker", "text", "start" and "tts_name" (a key of
            ``VITS_VOICES_LIST``).
        TRANSLATE_AUDIO_TO: target language, only used by the fallback.
    """
    from transformers import VitsModel, AutoTokenizer

    # Group segments of the same voice together
    ordered_segments = sorted(
        filtered_vits_segments["segments"],
        key=lambda item: item["tts_name"],
    )
    logger.debug(ordered_segments)

    loaded_voice = None
    for segment in tqdm(ordered_segments):
        speaker = segment["speaker"]  # noqa
        sentence = segment["text"]
        start_time = segment["start"]
        voice = segment["tts_name"]

        if voice != loaded_voice:
            # New voice: swap in its model and tokenizer
            loaded_voice = voice
            model = VitsModel.from_pretrained(VITS_VOICES_LIST[voice])
            tokenizer = AutoTokenizer.from_pretrained(
                VITS_VOICES_LIST[voice]
            )
            sampling_rate = model.config.sampling_rate

        model_text = sentence
        if tokenizer.is_uroman:
            model_text = uromanize(sentence)
            logger.debug(f"Romanize text: {model_text}")
        inputs = tokenizer(model_text, return_tensors="pt")

        # make the tts audio
        filename = f"audio/{start_time}.ogg"
        logger.info(f"{sentence} >> {filename}")
        try:
            # Infer
            with torch.no_grad():
                waveform = model(**inputs).waveform

            trimmed_audio = pad_array(
                waveform.cpu().numpy().squeeze().astype(np.float32),
                sampling_rate,
            )
            # Save file
            sf.write(
                file=filename,
                samplerate=sampling_rate,
                data=trimmed_audio,
                format="ogg",
                subtype="vorbis",
            )
            verify_saved_file_and_size(filename)
        except Exception as error:
            error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename)
        # Release memory between segments
        gc.collect()
        torch.cuda.empty_cache()

    # Drop the last model/tokenizer; a failure here (e.g. nothing was loaded)
    # is only logged.
    try:
        del tokenizer
        del model
        gc.collect()
        torch.cuda.empty_cache()
    except Exception as error:
        logger.error(str(error))
        gc.collect()
        torch.cuda.empty_cache()
# ===================================== | |
# Coqui XTTS | |
# ===================================== | |
def coqui_xtts_voices_list():
    """List the reference voices available to Coqui XTTS.

    Regular files in ``_XTTS_`` ending in .wav/.mp3/.ogg/.m4a are returned,
    except the generated ``AUTOMATIC_SPEAKER_<n>.wav`` samples.

    Returns:
        list[str]: ``"_XTTS_/AUTOMATIC.wav"`` followed by the files found,
        sorted by name, each as ``"_XTTS_/<file>"``. Only the first entry is
        returned when the folder does not exist.
    """
    main_folder = "_XTTS_"
    pattern_coqui = re.compile(r".+\.(wav|mp3|ogg|m4a)$")
    pattern_automatic_speaker = re.compile(r"AUTOMATIC_SPEAKER_\d+\.wav$")

    try:
        # Sorted so the list does not depend on the file-system order
        entries = sorted(os.listdir(main_folder))
    except FileNotFoundError:
        # The folder may not have been created yet: no custom voices
        entries = []

    # List only files in the directory matching the pattern but not matching
    # AUTOMATIC_SPEAKER_00.wav, AUTOMATIC_SPEAKER_01.wav, etc.
    wav_voices = [
        "_XTTS_/" + f
        for f in entries
        if os.path.isfile(os.path.join(main_folder, f))
        and pattern_coqui.match(f)
        and not pattern_automatic_speaker.match(f)
    ]

    return ["_XTTS_/AUTOMATIC.wav"] + wav_voices
def seconds_to_hhmmss_ms(seconds):
    """Format a duration in seconds as ``HH:MM:SS.mmm``.

    The value is rounded to the nearest millisecond before being split, so
    binary floating-point error cannot drop a millisecond (1.001 gives
    ``.001`` rather than ``.000``) and a carry propagates correctly
    (59.9996 gives ``00:01:00.000``).

    Args:
        seconds: non-negative duration in seconds (int or float).

    Returns:
        str: the formatted timestamp.
    """
    total_ms = int(round(seconds * 1000))
    total_seconds, milliseconds = divmod(total_ms, 1000)
    total_minutes, secs = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return "%02d:%02d:%02d.%03d" % (hours, minutes, secs, milliseconds)
def audio_trimming(audio_path, destination, start, end):
    """Cut ``audio_path`` between ``start`` and ``end`` with ffmpeg.

    Args:
        audio_path: source audio file.
        destination: output directory; when falsy, the directory of
            ``audio_path`` is used.
        start: trim start, either seconds (int/float, converted with
            ``seconds_to_hhmmss_ms``) or a value interpolated as-is into the
            command; ``None`` omits ``-ss`` (trim from the beginning).
        end: trim end, same formats; ``None`` omits ``-to`` (trim to the end
            of the file).

    Returns:
        str: path of the ``<name>_trim.wav`` file the command writes to.
    """
    if isinstance(start, (int, float)):
        start = seconds_to_hhmmss_ms(start)
    if isinstance(end, (int, float)):
        end = seconds_to_hhmmss_ms(end)

    file_directory = destination if destination else os.path.dirname(audio_path)

    file_name = os.path.splitext(os.path.basename(audio_path))[0]
    file_ = f"{file_name}_trim.wav"
    # file_ = f'{os.path.splitext(audio_path)[0]}_trim.wav'
    output_path = os.path.join(file_directory, file_)

    # Only pass the bounds that were given: interpolating None would hand
    # ffmpeg the literal text "None" as a timestamp.
    trim_options = ""
    if start is not None:
        trim_options += f" -ss {start}"
    if end is not None:
        trim_options += f" -to {end}"

    # -t (duration from -ss) | -to (time stop) | -af silenceremove=1:0:-50dB (remove silence)
    command = f'ffmpeg -y -loglevel error -i "{audio_path}"{trim_options} -acodec pcm_s16le -f wav "{output_path}"'
    run_command(command)

    return output_path
def convert_to_xtts_good_sample(audio_path: str = "", destination: str = ""):
    """Convert ``audio_path`` to a mono, 22050 Hz, 16-bit WAV with ffmpeg.

    Args:
        audio_path: source audio file.
        destination: output directory; when empty, the directory of
            ``audio_path`` is used.

    Returns:
        str: path of the ``<name>_good_sample.wav`` file the command writes.
    """
    target_dir = destination or os.path.dirname(audio_path)

    stem, _extension = os.path.splitext(os.path.basename(audio_path))
    # file_ = f'{os.path.splitext(audio_path)[0]}_good_sample.wav'
    mono_path = os.path.join(target_dir, f"{stem}_good_sample.wav")  # get root

    # -ac 1: one channel | -ar 22050: sample rate | -sample_fmt s16: 16-bit
    command = f'ffmpeg -y -loglevel error -i "{audio_path}" -ac 1 -ar 22050 -sample_fmt s16 -f wav "{mono_path}"'
    run_command(command)

    return mono_path
def sanitize_file_name(file_name):
    """Return ``file_name`` with unsafe characters replaced by underscores.

    The text is NFKD-normalized, then every character that is not a word
    character, whitespace, a dot or a hyphen becomes ``_``.
    """
    import unicodedata

    # NFKD separates combined characters into base characters and diacritics
    decomposed = unicodedata.normalize("NFKD", file_name)

    # Keep word characters (the pattern is Unicode-aware), whitespace, "."
    # and "-"; everything else is replaced
    unsafe_characters = re.compile(r"[^\w\s.-]")
    return unsafe_characters.sub("_", decomposed)
def create_wav_file_vc(
    sample_name="",  # name final file
    audio_wav="",  # path
    start=None,  # trim start
    end=None,  # trim end
    output_final_path="_XTTS_",
    get_vocals_dereverb=True,
):
    """Create a voice sample WAV and copy it into ``output_final_path``.

    Steps: optional trim (``audio_trimming``), vocal processing
    (``process_uvr_task``; if it raises, the error is logged and the
    unprocessed audio is used), conversion (``convert_to_xtts_good_sample``),
    rename to ``<sample_name>.wav`` and copy.

    Args:
        sample_name: name of the final file without extension; sanitized,
            "default_name" when empty.
        audio_wav: path string, or an object whose ``name`` is the path.
        start: trim start; trimming happens when ``start`` or ``end`` is
            truthy.
        end: trim end.
        output_final_path: folder receiving the final sample.
        get_vocals_dereverb: forwarded as ``dereverb`` to
            ``process_uvr_task``.

    Returns:
        str: path of the sample inside ``output_final_path``.

    Raises:
        Exception: if the final sample does not exist after the copy.
    """
    sample_name = sanitize_file_name(sample_name or "default_name")
    if not isinstance(audio_wav, str):
        audio_wav = audio_wav.name

    BASE_DIR = (
        "."  # os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    )

    output_dir = os.path.join(BASE_DIR, "clean_song_output")  # remove content
    # remove_directory_contents(output_dir)

    # Cut file when a bound is given, otherwise use the complete file
    audio_segment = (
        audio_trimming(audio_wav, output_dir, start, end)
        if (start or end)
        else audio_wav
    )

    from .mdx_net import process_uvr_task

    try:
        # Only the last returned item is used as the new audio path
        _, _, _, _, audio_segment = process_uvr_task(
            orig_song_path=audio_segment,
            main_vocals=True,
            dereverb=get_vocals_dereverb,
        )
    except Exception as error:
        logger.error(str(error))

    converted_sample = convert_to_xtts_good_sample(audio_segment)

    sample_name = f"{sample_name}.wav"
    renamed_sample = rename_file(converted_sample, sample_name)

    copy_files(renamed_sample, output_final_path)

    final_sample = os.path.join(output_final_path, sample_name)
    if not os.path.exists(final_sample):
        raise Exception(f"Error wav: {final_sample}")

    logger.info(final_sample)
    return final_sample
def create_new_files_for_vc(
    speakers_coqui,
    segments_base,
    dereverb_automatic=True
):
    """Create the ``_XTTS_/AUTOMATIC_<speaker>.wav`` samples that are missing.

    For each speaker whose first considered segment uses the
    ``"_XTTS_/AUTOMATIC.wav"`` voice and whose sample file does not exist
    yet, a sample is cut from ``audio.wav``: preferably from a segment longer
    than 7 s and shorter than 12 s (with 1 s removed at both ends), otherwise
    from the start of the first segment, for 2 to 9 seconds.

    Args:
        speakers_coqui: iterable of speaker names to process.
        segments_base: segments providing "speaker", "tts_name", "start",
            "end" and "text".
        dereverb_automatic: forwarded to ``create_wav_file_vc`` as
            ``get_vocals_dereverb``.
    """
    # before function delete automatic delete_previous_automatic
    output_dir = os.path.join(".", "clean_song_output")  # remove content
    remove_directory_contents(output_dir)

    for speaker in speakers_coqui:
        filtered_speaker = [
            segment
            for segment in segments_base
            if segment["speaker"] == speaker
        ]
        if not filtered_speaker:
            # No segment for this speaker: nothing to sample from (indexing
            # the empty list below would raise IndexError)
            continue
        if len(filtered_speaker) > 4:
            # With enough material, leave out the speaker's first segment
            filtered_speaker = filtered_speaker[1:]
        if filtered_speaker[0]["tts_name"] != "_XTTS_/AUTOMATIC.wav":
            continue

        name_automatic_wav = f"AUTOMATIC_{speaker}"
        if os.path.exists(f"_XTTS_/{name_automatic_wav}.wav"):
            logger.info(f"WAV automatic {speaker} exists")
            # path_wav = path_automatic_wav
            continue

        # create wav
        wav_ok = False
        for seg in filtered_speaker:
            duration = float(seg["end"]) - float(seg["start"])
            if 7.0 < duration < 12.0:
                logger.info(
                    f'Processing segment: {seg["start"]}, {seg["end"]}, {seg["speaker"]}, {duration}, {seg["text"]}'
                )
                create_wav_file_vc(
                    sample_name=name_automatic_wav,
                    audio_wav="audio.wav",
                    start=(float(seg["start"]) + 1.0),
                    end=(float(seg["end"]) - 1.0),
                    get_vocals_dereverb=dereverb_automatic,
                )
                wav_ok = True
                break

        if not wav_ok:
            logger.info("Taking the first segment")
            seg = filtered_speaker[0]
            logger.info(
                f'Processing segment: {seg["start"]}, {seg["end"]}, {seg["speaker"]}, {seg["text"]}'
            )
            # Clamp the sample length to the range [2.0, 9.0] seconds
            max_duration = float(seg["end"]) - float(seg["start"])
            max_duration = max(2.0, min(max_duration, 9.0))

            create_wav_file_vc(
                sample_name=name_automatic_wav,
                audio_wav="audio.wav",
                start=(float(seg["start"])),
                end=(float(seg["start"]) + max_duration),
                get_vocals_dereverb=dereverb_automatic,
            )
def segments_coqui_tts(
    filtered_coqui_segments,
    TRANSLATE_AUDIO_TO,
    model_id_coqui="tts_models/multilingual/multi-dataset/xtts_v2",
    speakers_coqui=None,
    delete_previous_automatic=True,
    dereverb_automatic=True,
    emotion=None,
):
    """XTTS

    Synthesize each segment with Coqui XTTS into ``audio/<start>.ogg``, using
    the segment's ``tts_name`` file as the reference voice. A failure while
    generating or saving a segment is delegated to ``error_handling_in_tts``.

    Install:
    pip install -q TTS==0.21.1
    pip install -q numpy==1.23.5

    Notes:
    - tts_name is the wav|mp3|ogg|m4a file for VC

    Args:
        filtered_coqui_segments: mapping whose "segments" entries provide
            "speaker", "text", "start" and "tts_name".
        TRANSLATE_AUDIO_TO: target language; converted with
            ``fix_code_language(..., syntax="coqui")``.
        model_id_coqui: model identifier given to ``TTS``.
        speakers_coqui: speakers handled by XTTS; ``None`` means every
            speaker present in the segments.
        delete_previous_automatic: remove existing
            ``_XTTS_/AUTOMATIC_<speaker>.wav`` files before recreating them.
        dereverb_automatic: forwarded to ``create_new_files_for_vc``.
        emotion: not used by this function.

    Raises:
        TTS_OperationError: if the language is not in the supported list.
    """
    from TTS.api import TTS

    TRANSLATE_AUDIO_TO = fix_code_language(TRANSLATE_AUDIO_TO, syntax="coqui")
    supported_lang_coqui = [
        "zh-cn",
        "en",
        "fr",
        "de",
        "it",
        "pt",
        "pl",
        "tr",
        "ru",
        "nl",
        "cs",
        "ar",
        "es",
        "hu",
        "ko",
        "ja",
    ]
    if TRANSLATE_AUDIO_TO not in supported_lang_coqui:
        raise TTS_OperationError(
            f"'{TRANSLATE_AUDIO_TO}' is not a supported language for Coqui XTTS"
        )
        # Emotion and speed can only be used with Coqui Studio models. discontinued
        # emotions = ["Neutral", "Happy", "Sad", "Angry", "Dull"]

    segments = filtered_coqui_segments["segments"]
    if speakers_coqui is None:
        # The default used to be iterated directly and raised TypeError;
        # fall back to the speakers of the segments, in first-seen order.
        speakers_coqui = list(
            dict.fromkeys(segment["speaker"] for segment in segments)
        )

    if delete_previous_automatic:
        for spk in speakers_coqui:
            remove_files(f"_XTTS_/AUTOMATIC_{spk}.wav")

    directory_audios_vc = "_XTTS_"
    create_directories(directory_audios_vc)
    create_new_files_for_vc(
        speakers_coqui,
        segments,
        dereverb_automatic,
    )

    # Init TTS
    device = os.environ.get("SONITR_DEVICE")
    model = TTS(model_id_coqui).to(device)
    sampling_rate = 24000

    # filtered_segments = filtered_coqui_segments['segments']
    # Sorting the segments by 'tts_name'
    # sorted_segments = sorted(filtered_segments, key=lambda x: x['tts_name'])
    # logger.debug(sorted_segments)

    for segment in tqdm(segments):
        speaker = segment["speaker"]
        text = segment["text"]
        start = segment["start"]
        tts_name = segment["tts_name"]
        if tts_name == "_XTTS_/AUTOMATIC.wav":
            # Use the sample generated for this specific speaker
            tts_name = f"_XTTS_/AUTOMATIC_{speaker}.wav"

        # make the tts audio
        filename = f"audio/{start}.ogg"
        logger.info(f"{text} >> {filename}")
        try:
            # Infer
            wav = model.tts(
                text=text, speaker_wav=tts_name, language=TRANSLATE_AUDIO_TO
            )
            data_tts = pad_array(
                wav,
                sampling_rate,
            )
            # Save file
            sf.write(
                file=filename,
                samplerate=sampling_rate,
                data=data_tts,
                format="ogg",
                subtype="vorbis",
            )
            verify_saved_file_and_size(filename)
        except Exception as error:
            error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename)
        # Release memory between segments
        gc.collect()
        torch.cuda.empty_cache()

    try:
        del model
        gc.collect()
        torch.cuda.empty_cache()
    except Exception as error:
        logger.error(str(error))
        gc.collect()
        torch.cuda.empty_cache()
# ===================================== | |
# PIPER TTS | |
# ===================================== | |
def piper_tts_voices_list():
    """Return the Piper voice ids, each suffixed with ``" VITS-onnx"``.

    ``voices.json`` is obtained through ``download_manager`` (target folder
    ``./PIPER_MODELS``) and its top-level keys are used as voice ids.

    Returns:
        list[str]: one ``"<voice id> VITS-onnx"`` entry per key.
    """
    voices_json_path = download_manager(
        url="https://huggingface.co./rhasspy/piper-voices/resolve/main/voices.json",
        path="./PIPER_MODELS",
    )

    with open(voices_json_path, "r", encoding="utf8") as voices_file:
        voices = json.load(voices_file)

    piper_id_models = []
    for voice_id in voices.keys():
        piper_id_models.append(voice_id + " VITS-onnx")

    return piper_id_models
def replace_text_in_json(file_path, key_to_replace, new_text, condition=None):
    """Replace the value of a top-level key in a JSON file, in place.

    The file is always rewritten with an indentation of 2, even when no
    value was replaced.

    Args:
        file_path: JSON file to read and rewrite.
        key_to_replace: top-level key whose value may be replaced.
        new_text: replacement value.
        condition: when truthy, the value is replaced only if it currently
            equals ``condition``; otherwise it is replaced unconditionally.
    """
    # Read the JSON file
    with open(file_path, "r", encoding="utf-8") as file:
        data = json.load(file)

    # Modify the specified key's value with the new text
    if key_to_replace in data:
        if not condition or data[key_to_replace] == condition:
            data[key_to_replace] = new_text

    # Write the modified content back with the same encoding used to read it
    with open(file_path, "w", encoding="utf-8") as file:
        json.dump(data, file, indent=2)
def load_piper_model(
    model: str,
    data_dir: list,
    download_dir: str = "",
    update_voices: bool = False,
):
    """Load a Piper voice, downloading it first when ``model`` is not a file.

    CUDA is used only when onnxruntime reports a GPU device, the
    ``SONITR_DEVICE`` environment variable is "cuda" and the platform is not
    Windows.

    Args:
        model: voice name, or path to an existing model file.
        data_dir: list of directories; only the first entry is used here.
        download_dir: sub-folder of ``data_dir[0]`` used for downloads and
            lookup; when empty, ``data_dir[0]`` itself is used.
        update_voices: forwarded to ``get_voices``.

    Returns:
        The object returned by ``PiperVoice.load``.

    Raises:
        TTS_OperationError: if onnxruntime cannot be imported or queried.
    """
    from piper import PiperVoice
    from piper.download import ensure_voice_exists, find_voice, get_voices

    try:
        import onnxruntime as rt

        if rt.get_device() == "GPU" and os.environ.get("SONITR_DEVICE") == "cuda":
            logger.debug("onnxruntime device > GPU")
            cuda = True
        else:
            logger.info(
                "onnxruntime device > CPU"
            )  # try pip install onnxruntime-gpu
            cuda = False
    except Exception as error:
        raise TTS_OperationError(f"onnxruntime error: {str(error)}") from error

    # Disable CUDA in Windows
    if platform.system() == "Windows":
        logger.info("Employing CPU exclusivity with Piper TTS")
        cuda = False

    if not download_dir:
        # Download to first data directory by default
        download_dir = data_dir[0]
    else:
        data_dir = [os.path.join(data_dir[0], download_dir)]

    # When ``model`` is already a file, no config is resolved here; passing
    # None avoids the unbound name the original hit on this path.
    # NOTE(review): PiperVoice.load is expected to fall back to
    # "<model>.json" for a None config_path - confirm for the installed piper.
    config = None

    # Download voice if file doesn't exist
    model_path = Path(model)
    if not model_path.exists():
        # Load voice info
        voices_info = get_voices(download_dir, update_voices=update_voices)

        # Resolve aliases for backwards compatibility with old voice names
        aliases_info: Dict[str, Any] = {}
        for voice_info in voices_info.values():
            for voice_alias in voice_info.get("aliases", []):
                aliases_info[voice_alias] = {"_is_alias": True, **voice_info}

        voices_info.update(aliases_info)
        ensure_voice_exists(model, data_dir, download_dir, voices_info)
        model, config = find_voice(model, data_dir)

        # Rewrite "phoneme_type" to "espeak" when the config stores the
        # "PhonemeType.ESPEAK" form
        replace_text_in_json(
            config, "phoneme_type", "espeak", "PhonemeType.ESPEAK"
        )

    # Load voice
    voice = PiperVoice.load(model, config_path=config, use_cuda=cuda)

    return voice
def synthesize_text_to_audio_np_array(voice, text, synthesize_args):
    """Synthesize ``text`` and return the audio as an int16 NumPy array.

    Args:
        voice: object exposing ``synthesize_stream_raw(text, **kwargs)``,
            which yields chunks of raw audio bytes.
        text: text to synthesize.
        synthesize_args: keyword arguments forwarded to
            ``synthesize_stream_raw``.

    Returns:
        numpy.ndarray: all chunks concatenated and viewed as ``int16``.
    """
    audio_stream = voice.synthesize_stream_raw(text, **synthesize_args)

    # Join the chunks in one pass instead of growing a bytes object with +=
    audio_data = b"".join(audio_stream)

    # Interpret the raw bytes as 16-bit samples
    audio_np = np.frombuffer(audio_data, dtype=np.int16)
    return audio_np
def segments_vits_onnx_tts(filtered_onnx_vits_segments, TRANSLATE_AUDIO_TO):
    """Synthesize each segment with a Piper voice into ``audio/<start>.ogg``.

    Install:
    pip install -q piper-tts==1.2.0 onnxruntime-gpu # for cuda118

    Segments are processed sorted by "tts_name" so that a voice is loaded
    only when it changes. A failure while synthesizing or saving a segment is
    delegated to ``error_handling_in_tts``.

    Args:
        filtered_onnx_vits_segments: mapping whose "segments" entries provide
            "speaker", "text", "start" and "tts_name" (voice id followed by
            " VITS-onnx").
        TRANSLATE_AUDIO_TO: target language, only used by the fallback.
    """
    # "Data directory to check for downloaded models (default: current directory)"
    data_dir = [str(Path.cwd())]
    download_dir = "PIPER_MODELS"
    # model_name = "en_US-lessac-medium" tts_name in a dict like VITS
    update_voices = True  # "Download latest voices.json during startup",

    synthesize_args = {
        "speaker_id": None,
        "length_scale": 1.0,
        "noise_scale": 0.667,
        "noise_w": 0.8,
        "sentence_silence": 0.0,
    }

    # Group segments of the same voice together
    ordered_segments = sorted(
        filtered_onnx_vits_segments["segments"],
        key=lambda item: item["tts_name"],
    )
    logger.debug(ordered_segments)

    loaded_voice = None
    for segment in tqdm(ordered_segments):
        speaker = segment["speaker"]  # noqa
        sentence = segment["text"]
        start_time = segment["start"]
        voice_id = segment["tts_name"].replace(" VITS-onnx", "")

        if voice_id != loaded_voice:
            # New voice: load its model
            loaded_voice = voice_id
            model = load_piper_model(
                voice_id, data_dir, download_dir, update_voices
            )
            sampling_rate = model.config.sample_rate

        # make the tts audio
        filename = f"audio/{start_time}.ogg"
        logger.info(f"{sentence} >> {filename}")
        try:
            # Infer
            raw_samples = synthesize_text_to_audio_np_array(
                model, sentence, synthesize_args
            )
            trimmed_audio = pad_array(
                raw_samples,  # .cpu().numpy().squeeze().astype(np.float32),
                sampling_rate,
            )
            # Save file
            sf.write(
                file=filename,
                samplerate=sampling_rate,
                data=trimmed_audio,
                format="ogg",
                subtype="vorbis",
            )
            verify_saved_file_and_size(filename)
        except Exception as error:
            error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename)
        # Release memory between segments
        gc.collect()
        torch.cuda.empty_cache()

    # Drop the last model; a failure here (e.g. nothing was loaded) is only
    # logged.
    try:
        del model
        gc.collect()
        torch.cuda.empty_cache()
    except Exception as error:
        logger.error(str(error))
        gc.collect()
        torch.cuda.empty_cache()
# ===================================== | |
# OPENAI TTS
# ===================================== | |
def segments_openai_tts(
    filtered_openai_tts_segments, TRANSLATE_AUDIO_TO
):
    """Synthesize each segment with the OpenAI speech API.

    Each result is saved to ``audio/<start>.ogg``. The model is "tts-1-hd"
    when "HD" appears in the segment's ``tts_name`` and "tts-1" otherwise;
    the voice is the first word of ``tts_name`` without its first character.
    A failure for a segment is delegated to ``error_handling_in_tts``.

    Args:
        filtered_openai_tts_segments: mapping whose "segments" entries
            provide "speaker", "text", "start" and "tts_name".
        TRANSLATE_AUDIO_TO: target language, only used by the fallback.
    """
    from openai import OpenAI

    client = OpenAI()
    sampling_rate = 24000

    # filtered_segments = filtered_openai_tts_segments['segments']
    # Sorting the segments by 'tts_name'
    # sorted_segments = sorted(filtered_segments, key=lambda x: x['tts_name'])

    for segment in tqdm(filtered_openai_tts_segments["segments"]):
        speaker = segment["speaker"]  # noqa
        sentence = segment["text"].strip()
        start_time = segment["start"]
        voice_label = segment["tts_name"]

        # make the tts audio
        filename = f"audio/{start_time}.ogg"
        logger.info(f"{sentence} >> {filename}")

        try:
            # Request
            response = client.audio.speech.create(
                model="tts-1-hd" if "HD" in voice_label else "tts-1",
                voice=voice_label.split()[0][1:],
                response_format="wav",
                input=sentence
            )
            # Gather the streamed chunks into a single bytes object
            audio_bytes = b"".join(response.iter_bytes(chunk_size=4096))
            samples = np.frombuffer(audio_bytes, dtype=np.int16)

            # Save file
            # The first 240 values are skipped - presumably the WAV header
            # and leading samples; TODO confirm
            trimmed_audio = pad_array(
                samples[240:],
                sampling_rate,
            )

            sf.write(
                file=filename,
                samplerate=sampling_rate,
                data=trimmed_audio,
                format="ogg",
                subtype="vorbis",
            )
            verify_saved_file_and_size(filename)

        except Exception as error:
            error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename)
# ===================================== | |
# Select task TTS | |
# ===================================== | |
def find_spkr(pattern, speaker_to_voice, segments):
    """Return the speakers that use a matching voice and appear in segments.

    Args:
        pattern: compiled regular expression applied with ``match`` to each
            voice name.
        speaker_to_voice: mapping of speaker name to voice name.
        segments: iterable of mappings with a "speaker" key.

    Returns:
        list: speakers, in the order of ``speaker_to_voice``, whose voice
        matches ``pattern`` and who own at least one segment.
    """

    def has_segment(speaker):
        # True as soon as one segment belongs to the speaker
        for segment in segments:
            if segment["speaker"] == speaker:
                return True
        return False

    matching_speakers = []
    for speaker, voice in speaker_to_voice.items():
        if not pattern.match(voice):
            continue
        if has_segment(speaker):
            matching_speakers.append(speaker)
    return matching_speakers
def filter_by_speaker(speakers, segments):
    """Keep only the segments whose "speaker" is in ``speakers``.

    Args:
        speakers: container of speaker names to keep.
        segments: iterable of mappings with a "speaker" key.

    Returns:
        dict: ``{"segments": [...]}`` with the kept segments in their
        original order (the same objects, not copies).
    """
    kept_segments = []
    for segment in segments:
        if segment["speaker"] in speakers:
            kept_segments.append(segment)
    return {"segments": kept_segments}
def audio_segmentation_to_voice(
    result_diarize,
    TRANSLATE_AUDIO_TO,
    is_gui,
    tts_voice00,
    tts_voice01="",
    tts_voice02="",
    tts_voice03="",
    tts_voice04="",
    tts_voice05="",
    tts_voice06="",
    tts_voice07="",
    tts_voice08="",
    tts_voice09="",
    tts_voice10="",
    tts_voice11="",
    dereverb_automatic=True,
    model_id_bark="suno/bark-small",
    model_id_coqui="tts_models/multilingual/multi-dataset/xtts_v2",
    delete_previous_automatic=True,
):
    """Generate the TTS audio of every segment with its speaker's engine.

    ``tts_voiceNN`` is the voice of ``SPEAKER_NN``. The engine is chosen from
    the shape of the voice name (``-Male``/``-Female`` suffix, `` BARK``,
    `` VITS``, an audio file extension, `` VITS-onnx`` or `` OpenAI-TTS``),
    and each engine processes only the segments of its speakers. Segments
    without a "speaker" key are assigned to ``SPEAKER_00``.

    Args:
        result_diarize: mapping with a "segments" list; each segment is
            given a temporary "tts_name" key that is removed before
            returning.
        TRANSLATE_AUDIO_TO: target language forwarded to the engines.
        is_gui: forwarded to ``segments_egde_tts``.
        tts_voice00 .. tts_voice11: voice names for SPEAKER_00..SPEAKER_11.
        dereverb_automatic: forwarded to ``segments_coqui_tts``.
        model_id_bark: forwarded to ``segments_bark_tts``.
        model_id_coqui: forwarded to ``segments_coqui_tts``.
        delete_previous_automatic: forwarded to ``segments_coqui_tts``.

    Returns:
        list: six lists of speakers, in the order Edge, Bark, VITS, Coqui,
        VITS-onnx (Piper), OpenAI.
    """
    remove_directory_contents("audio")

    # Mapping speakers to voice variables: SPEAKER_00 .. SPEAKER_11
    voices_in_order = (
        tts_voice00,
        tts_voice01,
        tts_voice02,
        tts_voice03,
        tts_voice04,
        tts_voice05,
        tts_voice06,
        tts_voice07,
        tts_voice08,
        tts_voice09,
        tts_voice10,
        tts_voice11,
    )
    speaker_to_voice = {
        f"SPEAKER_{index:02d}": voice
        for index, voice in enumerate(voices_in_order)
    }

    all_segments = result_diarize["segments"]

    # Assign 'SPEAKER_00' to segments without a 'speaker' key
    for segment in all_segments:
        if "speaker" not in segment:
            segment["speaker"] = "SPEAKER_00"
            logger.warning(
                "NO SPEAKER DETECT IN SEGMENT: First TTS will be used in the"
                f" segment time {segment['start'], segment['text']}"
            )
        # Assign the TTS name
        segment["tts_name"] = speaker_to_voice[segment["speaker"]]

    # Find TTS method: one pattern per engine, in the order of the result
    engine_patterns = (
        re.compile(r".*-(Male|Female)$"),  # Edge
        re.compile(r".* BARK$"),  # Bark
        re.compile(r".* VITS$"),  # VITS
        re.compile(r".+\.(wav|mp3|ogg|m4a)$"),  # Coqui
        re.compile(r".* VITS-onnx$"),  # Piper
        re.compile(r".* OpenAI-TTS$"),  # OpenAI
    )
    (
        speakers_edge,
        speakers_bark,
        speakers_vits,
        speakers_coqui,
        speakers_vits_onnx,
        speakers_openai_tts,
    ) = [
        find_spkr(pattern, speaker_to_voice, all_segments)
        for pattern in engine_patterns
    ]

    # Filter method in segments
    filtered_edge = filter_by_speaker(speakers_edge, all_segments)
    filtered_bark = filter_by_speaker(speakers_bark, all_segments)
    filtered_vits = filter_by_speaker(speakers_vits, all_segments)
    filtered_coqui = filter_by_speaker(speakers_coqui, all_segments)
    filtered_vits_onnx = filter_by_speaker(speakers_vits_onnx, all_segments)
    filtered_openai_tts = filter_by_speaker(speakers_openai_tts, all_segments)

    # Infer
    if filtered_edge["segments"]:
        logger.info(f"EDGE TTS: {speakers_edge}")
        segments_egde_tts(filtered_edge, TRANSLATE_AUDIO_TO, is_gui)  # mp3
    if filtered_bark["segments"]:
        logger.info(f"BARK TTS: {speakers_bark}")
        segments_bark_tts(
            filtered_bark, TRANSLATE_AUDIO_TO, model_id_bark
        )  # wav
    if filtered_vits["segments"]:
        logger.info(f"VITS TTS: {speakers_vits}")
        segments_vits_tts(filtered_vits, TRANSLATE_AUDIO_TO)  # wav
    if filtered_coqui["segments"]:
        logger.info(f"Coqui TTS: {speakers_coqui}")
        segments_coqui_tts(
            filtered_coqui,
            TRANSLATE_AUDIO_TO,
            model_id_coqui,
            speakers_coqui,
            delete_previous_automatic,
            dereverb_automatic,
        )  # wav
    if filtered_vits_onnx["segments"]:
        logger.info(f"PIPER TTS: {speakers_vits_onnx}")
        segments_vits_onnx_tts(filtered_vits_onnx, TRANSLATE_AUDIO_TO)  # wav
    if filtered_openai_tts["segments"]:
        logger.info(f"OpenAI TTS: {speakers_openai_tts}")
        segments_openai_tts(filtered_openai_tts, TRANSLATE_AUDIO_TO)  # wav

    # Remove the temporary key added above
    for result in result_diarize["segments"]:
        result.pop("tts_name", None)

    return [
        speakers_edge,
        speakers_bark,
        speakers_vits,
        speakers_coqui,
        speakers_vits_onnx,
        speakers_openai_tts
    ]
def accelerate_segments(
    result_diarize,
    max_accelerate_audio,
    valid_speakers,
    acceleration_rate_regulation=False,
    folder_output="audio2",
):
    """Stretch or compress every synthesized clip to fit its segment slot.

    For each segment the clip ``audio/{start}.ogg`` is measured and compared
    with the segment length (``end - start``). The resulting ratio becomes an
    ffmpeg ``atempo`` factor and the adjusted clip is written to
    ``{folder_output}/audio/{start}.ogg``.

    Args:
        result_diarize: Mapping with a ``"segments"`` list; every segment
            provides ``"start"``, ``"end"`` and ``"speaker"``.
        max_accelerate_audio: Upper bound for the tempo factor.
        valid_speakers: Iterable whose first item lists the speakers rendered
            with Edge TTS. The remaining items are not used here.
        acceleration_rate_regulation: When true, factors of 1.3 or more are
            softened by borrowing part of the gap before the next segment.
        folder_output: Root folder that receives the adjusted clips.

    Returns:
        Tuple ``(audio_files, speakers_list)``: the output path of every
        clip and a ``"TTS Speaker NN"`` label per segment, where ``NN`` is
        the number in the last two characters of the speaker id plus one.
    """
    logger.info("Apply acceleration")

    # Only the Edge TTS speakers influence the processing below.
    speakers_edge, *_ = valid_speakers

    create_directories(f"{folder_output}/audio/")
    remove_directory_contents(f"{folder_output}/audio/")

    audio_files = []
    speakers_list = []
    segments = result_diarize["segments"]
    max_count_segments_idx = len(segments) - 1

    # enumerate() has no length, so tqdm needs the total explicitly.
    for i, segment in tqdm(enumerate(segments), total=len(segments)):
        start = segment["start"]
        end = segment["end"]
        speaker = segment["speaker"]

        # Every TTS backend stores its clip under this name.
        filename = f"audio/{start}.ogg"
        output_file = f"{folder_output}/{filename}"

        duration_true = end - start
        duration_tts = librosa.get_duration(filename=filename)

        # Ratio between the spoken length and the available slot.
        try:
            acc_percentage = duration_tts / duration_true
        except ZeroDivisionError:
            # A zero-length slot needs as much compression as allowed; the
            # clamp below reduces this to max_accelerate_audio.
            acc_percentage = float("inf")

        # Smooth strong accelerations using the pause before the next segment.
        if acceleration_rate_regulation and acc_percentage >= 1.3:
            try:
                next_segment = segments[min(max_count_segments_idx, i + 1)]
                next_start = next_segment["start"]
                next_speaker = next_segment["speaker"]
                duration_with_next_start = next_start - start

                if duration_with_next_start > duration_true:
                    extra_time = duration_with_next_start - duration_true

                    if speaker == next_speaker:
                        # Same speaker continues: take half of the gap.
                        smoth_duration = duration_true + (extra_time * 0.5)
                    else:
                        # Speaker changes: take 7/10 of the gap.
                        smoth_duration = duration_true + (extra_time * 0.7)
                    logger.debug(
                        f"Base acc: {acc_percentage}, "
                        f"smoth acc: {duration_tts / smoth_duration}"
                    )
                    acc_percentage = max(1.2, (duration_tts / smoth_duration))

            except Exception as error:
                # Smoothing is best effort; keep the unsmoothed factor.
                logger.error(str(error))

        # Clamp: cap at the maximum, leave near-1.0 ratios untouched and
        # never slow down below 0.8.
        if acc_percentage > max_accelerate_audio:
            acc_percentage = max_accelerate_audio
        elif acc_percentage <= 1.15 and acc_percentage >= 0.8:
            acc_percentage = 1.0
        elif acc_percentage <= 0.79:
            acc_percentage = 0.8

        acc_percentage = round(acc_percentage + 0.0, 1)

        # Edge clips are probed for their real container format; other
        # backends are taken to be OGG already.
        if speaker in speakers_edge:
            info_enc = sf.info(filename).format
        else:
            info_enc = "OGG"

        if acc_percentage == 1.0 and info_enc == "OGG":
            # Nothing to change: a plain copy is enough.
            copy_files(filename, f"{folder_output}{os.sep}audio")
        else:
            # Argument list instead of a shell string: paths containing
            # spaces or shell metacharacters are passed through verbatim.
            command = [
                "ffmpeg",
                "-y",
                "-loglevel",
                "panic",
                "-i",
                filename,
                "-filter:a",
                f"atempo={acc_percentage}",
                output_file,
            ]
            try:
                completed = subprocess.run(command, check=False)
            except OSError as error:
                logger.error(f"Could not run ffmpeg for {filename}: {error}")
            else:
                if completed.returncode != 0:
                    logger.error(
                        f"ffmpeg exited with code {completed.returncode} "
                        f"for {filename}"
                    )

        if logger.isEnabledFor(logging.DEBUG):
            duration_create = librosa.get_duration(filename=output_file)
            logger.debug(
                f"acc_percen is {acc_percentage}, tts duration "
                f"is {duration_tts}, new duration is {duration_create}"
                f", for {filename}"
            )

        audio_files.append(output_file)
        speakers_list.append(
            "TTS Speaker {:02d}".format(int(speaker[-2:]) + 1)
        )

    return audio_files, speakers_list
# ===================================== | |
# Tone color converter | |
# ===================================== | |
def se_process_audio_segments(
    source_seg, tone_color_converter, device, remove_previous_processed=True
):
    """Return the tone-color embedding for a folder of WAV segments.

    A cached ``se.pth`` inside ``source_seg`` is reused when present;
    otherwise the embedding is extracted from the WAV files through
    ``tone_color_converter.extract_se``, which also receives the cache path.

    Args:
        source_seg: Directory that holds the ``*.wav`` segments.
        tone_color_converter: Object exposing ``extract_se(files, se_path)``.
        device: Device the cached embedding is loaded onto.
        remove_previous_processed: Currently unused; kept so existing
            callers keep working.

    Returns:
        The embedding, either loaded from the cache or freshly extracted.

    Raises:
        ValueError: If ``source_seg`` contains no ``.wav`` file.
    """
    source_audio_segs = glob.glob(f"{source_seg}/*.wav")
    if not source_audio_segs:
        # Report the directory that was searched, not the empty result list.
        raise ValueError(f"No audio segments found in {source_seg}")

    source_se_path = os.path.join(source_seg, "se.pth")

    if not os.path.isfile(source_se_path):
        return tone_color_converter.extract_se(
            source_audio_segs, source_se_path
        )

    # map_location lets a cache written on another device (e.g. CUDA) load
    # on the current one.
    se = torch.load(source_se_path, map_location=device).to(device)
    logger.debug(f"Previous created {source_se_path}")
    return se
def create_wav_vc(
    valid_speakers,
    segments_base,
    audio_name,
    max_segments=10,
    target_dir="processed",
    get_vocals_dereverb=False,
):
    """Prepare per-speaker reference samples for voice conversion.

    For every speaker two folders are created under ``./{target_dir}``: one
    receives cuts of the original recording (``audio.wav``) and the other
    receives the matching synthesized clips from ``audio2/audio``. Segments
    longer than 3 s and shorter than 18 s are used, up to ``max_segments``
    per speaker; when none qualifies, the first segment is used instead.

    Args:
        valid_speakers: Speaker ids to prepare samples for.
        segments_base: Segment dicts with ``"start"``, ``"end"``,
            ``"speaker"`` and ``"text"``.
        audio_name: Suffix appended to the per-speaker folder names.
        max_segments: Number of qualifying segments after which sampling
            for a speaker stops.
        target_dir: Folder that holds the per-speaker directories.
        get_vocals_dereverb: Forwarded to ``create_wav_file_vc``.

    Returns:
        Tuple ``(path_source_segments, path_target_segments)``: the folders
        with the synthesized samples and the folders with the original
        samples, both in ``valid_speakers`` order.
    """
    base_dir = os.path.join(".", target_dir)

    tts_sample_dirs = []
    original_sample_dirs = []

    for speaker in valid_speakers:
        speaker_segments = [
            item for item in segments_base if item["speaker"] == speaker
        ]
        # With more than four segments available the first one is dropped.
        if len(speaker_segments) > 4:
            speaker_segments = speaker_segments[1:]

        original_dir = os.path.join(base_dir, speaker + audio_name)
        tts_dir = os.path.join(base_dir, "tts" + speaker + audio_name)
        create_directories([original_dir, tts_dir])

        original_sample_dirs.append(original_dir)
        tts_sample_dirs.append(tts_dir)

        used_segments = 0
        for seg in speaker_segments:
            duration = float(seg["end"]) - float(seg["start"])
            if not (duration > 3.0 and duration < 18.0):
                continue

            logger.info(
                f'Processing segment: {seg["start"]}, {seg["end"]}, {seg["speaker"]}, {duration}, {seg["text"]}'
            )
            sample_id = str(seg["start"])
            existing_sample = os.path.join(original_dir, f"{sample_id}.wav")

            if os.path.exists(existing_sample):
                logger.debug(
                    "Segment vc source exists: "
                    f"{existing_sample}"
                )
            else:
                # Trim one second on each side of the original cut.
                create_wav_file_vc(
                    sample_name=sample_id,
                    audio_wav="audio.wav",
                    start=(float(seg["start"]) + 1.0),
                    end=(float(seg["end"]) - 1.0),
                    output_final_path=original_dir,
                    get_vocals_dereverb=get_vocals_dereverb,
                )
                convert_to_xtts_good_sample(
                    f"audio2/audio/{sample_id}.ogg", tts_dir
                )

            used_segments += 1
            if used_segments == max_segments:
                break

        if used_segments == 0:
            # No segment had a suitable length: fall back to the first one,
            # limited to between 1 and 18 seconds.
            logger.info("Taking the first segment")
            seg = speaker_segments[0]
            logger.info(
                f'Processing segment: {seg["start"]}, {seg["end"]}, {seg["speaker"]}, {seg["text"]}'
            )
            span = float(seg["end"]) - float(seg["start"])
            span = max(1.0, min(span, 18.0))
            sample_id = str(seg["start"])

            create_wav_file_vc(
                sample_name=sample_id,
                audio_wav="audio.wav",
                start=(float(seg["start"])),
                end=(float(seg["start"]) + span),
                output_final_path=original_dir,
                get_vocals_dereverb=get_vocals_dereverb,
            )
            convert_to_xtts_good_sample(
                f"audio2/audio/{sample_id}.ogg", tts_dir
            )

    logger.debug(f"Base: {str(tts_sample_dirs)}")
    logger.debug(f"Target: {str(original_sample_dirs)}")

    return tts_sample_dirs, original_sample_dirs
def toneconverter_openvoice(
    result_diarize,
    preprocessor_max_segments,
    remove_previous_process=True,
    get_vocals_dereverb=False,
    model="openvoice",
):
    """Convert the dubbed clips to the original speakers' tone with OpenVoice.

    Reference samples are prepared per speaker with ``create_wav_vc``, the
    converter checkpoint is downloaded into ``./OPENVOICE_MODELS`` and every
    clip ``audio2/audio/{start}.ogg`` is converted and overwritten in place.

    Args:
        result_diarize: Mapping with a ``"segments"`` list; every segment
            provides at least ``"start"`` and ``"speaker"``.
        preprocessor_max_segments: Maximum number of reference segments per
            speaker, forwarded to ``create_wav_vc``.
        remove_previous_process: When true, the ``processed`` folder is
            emptied before new samples are created.
        get_vocals_dereverb: Forwarded to ``create_wav_vc``.
        model: Model selector; a value containing ``"v2"`` picks the
            OpenVoice V2 converter, anything else the V1 converter.
    """
    audio_path = "audio.wav"
    target_dir = "processed"
    create_directories(target_dir)

    from openvoice import se_extractor
    from openvoice.api import ToneColorConverter

    audio_name = f"{os.path.basename(audio_path).rsplit('.', 1)[0]}_{se_extractor.hash_numpy_array(audio_path)}"

    segments = result_diarize["segments"]
    # The list order is kept aligned with the folders create_wav_vc returns.
    valid_speakers = list({item["speaker"] for item in segments})

    logger.info("Openvoice preprocessor...")

    if remove_previous_process:
        remove_directory_contents(target_dir)

    path_source_segments, path_target_segments = create_wav_vc(
        valid_speakers,
        segments,
        audio_name,
        max_segments=preprocessor_max_segments,
        get_vocals_dereverb=get_vocals_dereverb,
    )

    logger.info("Openvoice loading model...")
    model_path_openvoice = "./OPENVOICE_MODELS"
    # NOTE(review): this host looks like a Hugging Face mirror - confirm.
    url_model_openvoice = "https://huggingface.co./myshell-ai/OpenVoice/resolve/main/checkpoints/converter"

    if "v2" in model:
        model_path = os.path.join(model_path_openvoice, "v2")
        url_model_openvoice = url_model_openvoice.replace(
            "OpenVoice", "OpenVoiceV2"
        ).replace("checkpoints/", "")
    else:
        model_path = os.path.join(model_path_openvoice, "v1")
    create_directories(model_path)

    config_url = f"{url_model_openvoice}/config.json"
    checkpoint_url = f"{url_model_openvoice}/checkpoint.pth"

    config_path = download_manager(url=config_url, path=model_path)
    checkpoint_path = download_manager(
        url=checkpoint_url, path=model_path
    )

    device = os.environ.get("SONITR_DEVICE")
    tone_color_converter = ToneColorConverter(config_path, device=device)

    # Marker string handed to every convert() call; constant for the run.
    encode_message = "@MyShell"

    try:
        tone_color_converter.load_ckpt(checkpoint_path)

        logger.info("Openvoice tone color converter:")
        # The context manager closes the bar even when a conversion fails.
        with tqdm(
            total=len(segments), desc="Progress"
        ) as global_progress_bar:
            for source_seg, target_seg, speaker in zip(
                path_source_segments, path_target_segments, valid_speakers
            ):
                source_se = se_process_audio_segments(
                    source_seg, tone_color_converter, device
                )
                target_se = se_process_audio_segments(
                    target_seg, tone_color_converter, device
                )

                filtered_speaker = [
                    segment
                    for segment in segments
                    if segment["speaker"] == speaker
                ]
                for seg in filtered_speaker:
                    # Input and output are the same file: overwrite in place.
                    src_path = save_path = (
                        f"audio2/audio/{str(seg['start'])}.ogg"
                    )
                    logger.debug(f"{src_path}")

                    tone_color_converter.convert(
                        audio_src_path=src_path,
                        src_se=source_se,
                        tgt_se=target_se,
                        output_path=save_path,
                        message=encode_message,
                    )

                    global_progress_bar.update(1)
    finally:
        # Release the model and cached GPU memory on success and on failure.
        del tone_color_converter
        try:
            gc.collect()
            torch.cuda.empty_cache()
        except Exception as error:
            logger.error(str(error))
def toneconverter_freevc(
    result_diarize,
    remove_previous_process=True,
    get_vocals_dereverb=False,
):
    """Convert the dubbed clips to the original speakers' voice with FreeVC.

    One reference sample of the original recording is prepared per speaker
    with ``create_wav_vc``; every clip ``audio2/audio/{start}.ogg`` of that
    speaker is then converted against it and overwritten in place.

    If the FreeVC model cannot be loaded the error is logged and the function
    returns without converting anything. A speaker whose reference folder
    holds no ``.wav`` file is logged and skipped.

    Args:
        result_diarize: Mapping with a ``"segments"`` list; every segment
            provides at least ``"start"`` and ``"speaker"``.
        remove_previous_process: When true, the ``processed`` folder is
            emptied before new samples are created.
        get_vocals_dereverb: Forwarded to ``create_wav_vc``.
    """
    audio_path = "audio.wav"
    target_dir = "processed"
    create_directories(target_dir)

    from openvoice import se_extractor

    audio_name = f"{os.path.basename(audio_path).rsplit('.', 1)[0]}_{se_extractor.hash_numpy_array(audio_path)}"

    segments = result_diarize["segments"]
    # Original recording is the target, the dubbing is the source. The list
    # order is kept aligned with the folders create_wav_vc returns.
    valid_speakers = list({item["speaker"] for item in segments})

    logger.info("FreeVC preprocessor...")

    if remove_previous_process:
        remove_directory_contents(target_dir)

    _, path_target_segments = create_wav_vc(
        valid_speakers,
        segments,
        audio_name,
        max_segments=1,
        get_vocals_dereverb=get_vocals_dereverb,
    )

    logger.info("FreeVC loading model...")
    device_id = os.environ.get("SONITR_DEVICE")
    device = None if device_id == "cpu" else device_id
    try:
        from TTS.api import TTS

        tts = TTS(
            model_name="voice_conversion_models/multilingual/vctk/freevc24",
            progress_bar=False
        ).to(device)
    except Exception as error:
        logger.error(str(error))
        logger.error("Error loading the FreeVC model.")
        return

    logger.info("FreeVC process:")
    try:
        # The context manager closes the bar even when a conversion fails.
        with tqdm(
            total=len(segments), desc="Progress"
        ) as global_progress_bar:
            for target_seg, speaker in zip(
                path_target_segments, valid_speakers
            ):
                filtered_speaker = [
                    segment
                    for segment in segments
                    if segment["speaker"] == speaker
                ]

                # Sorted so the chosen reference does not depend on the
                # arbitrary order os.listdir returns.
                wav_files = sorted(
                    file
                    for file in os.listdir(target_seg)
                    if file.endswith(".wav")
                )
                if not wav_files:
                    # Without a reference the dubbed clips stay unconverted.
                    logger.error(
                        f"No reference wav found in {target_seg}; "
                        f"skipping voice conversion for {speaker}"
                    )
                    global_progress_bar.update(len(filtered_speaker))
                    continue
                original_wav_audio_segment = os.path.join(
                    target_seg, wav_files[0]
                )

                for seg in filtered_speaker:
                    # Input and output are the same file: overwrite in place.
                    src_path = save_path = (
                        f"audio2/audio/{str(seg['start'])}.ogg"
                    )
                    logger.debug(
                        f"{src_path} - {original_wav_audio_segment}"
                    )

                    wav = tts.voice_conversion(
                        source_wav=src_path,
                        target_wav=original_wav_audio_segment,
                    )

                    sf.write(
                        file=save_path,
                        samplerate=tts.voice_converter.vc_config.audio.output_sample_rate,
                        data=wav,
                        format="ogg",
                        subtype="vorbis",
                    )

                    global_progress_bar.update(1)
    finally:
        # Release the model and cached GPU memory on success and on failure.
        del tts
        try:
            gc.collect()
            torch.cuda.empty_cache()
        except Exception as error:
            logger.error(str(error))
def toneconverter(
    result_diarize,
    preprocessor_max_segments,
    remove_previous_process=True,
    get_vocals_dereverb=False,
    method_vc="freevc"
):
    """Dispatch voice conversion to the backend named by ``method_vc``.

    ``"freevc"`` selects FreeVC; any value containing ``"openvoice"``
    selects OpenVoice and is passed on as the model name. Any other value
    does nothing and returns ``None``.

    Args:
        result_diarize: Diarization result forwarded to the backend.
        preprocessor_max_segments: Reference segments per speaker; only
            OpenVoice uses it.
        remove_previous_process: Forwarded to the backend.
        get_vocals_dereverb: Forwarded to the backend.
        method_vc: Name of the conversion method.

    Returns:
        Whatever the selected backend returns, or ``None``.
    """
    shared_options = {
        "remove_previous_process": remove_previous_process,
        "get_vocals_dereverb": get_vocals_dereverb,
    }

    if method_vc != "freevc":
        if "openvoice" not in method_vc:
            return None
        return toneconverter_openvoice(
            result_diarize,
            preprocessor_max_segments,
            model=method_vc,
            **shared_options,
        )

    if preprocessor_max_segments > 1:
        logger.info("FreeVC only uses one segment.")
    return toneconverter_freevc(result_diarize, **shared_options)
if __name__ == "__main__":
    # Manual smoke run: synthesize the sample diarization in English with
    # one voice per speaker slot.
    from segments import result_diarize

    demo_voices = {
        "tts_voice00": "en-facebook-mms VITS",
        "tts_voice01": "en-CA-ClaraNeural-Female",
        "tts_voice02": "en-GB-ThomasNeural-Male",
        "tts_voice03": "en-GB-SoniaNeural-Female",
        "tts_voice04": "en-NZ-MitchellNeural-Male",
        "tts_voice05": "en-GB-MaisieNeural-Female",
    }

    audio_segmentation_to_voice(
        result_diarize,
        TRANSLATE_AUDIO_TO="en",
        max_accelerate_audio=2.1,
        is_gui=True,
        **demo_voices,
    )