uvr5 / inst.py
lorneluo's picture
optimise import
723de54
raw
history blame
5.71 kB
import os
import shutil
import sys
from datetime import datetime
from pathlib import Path
from time import sleep
import requests
from tqdm import tqdm
from args import mdx23c_8kfft_instvoc_hq_process_data, htdemucs_ft_process_data, uvr_mdx_net_voc_ft_process_data
from download import download_model
from gui_data.constants import VR_ARCH_TYPE, MDX_ARCH_TYPE, DEMUCS_ARCH_TYPE, WAV
from lib_v5 import spec_utils
from separate import SeperateDemucs, SeperateMDX, SeperateMDXC, SeperateVR, save_format # Model-related
def run_ensemble_models(audio_path, export_path, format=WAV, clean=True):
    """Separate ``audio_path`` with three models and merge their stems.

    Each model in the fixed list below is downloaded (via ``download_model``),
    run on the input file, and its instrumental / vocal outputs are collected
    from a timestamped temporary folder inside ``export_path``. The per-model
    outputs are then merged with ``ensemble`` into two final files.

    Args:
        audio_path: Path of the audio file to separate.
        export_path: Directory that receives the two final files (created if
            missing) and the temporary per-model folder.
        format: Output format name; its lower-cased form is used as the file
            extension of the expected outputs.
        clean: When true, the temporary per-model folder is removed at the end.

    Returns:
        ``(instrumental_final_path, vocals_final_path)`` as ``Path`` objects.
        If both files already exist, they are returned without reprocessing.

    Raises:
        Exception: If a model's ``process_method`` is not one of the known
            architecture types.
    """
    stem = Path(audio_path).stem
    extension = format.lower()
    vocals_final_path = Path(export_path) / f"{stem}.vocal.{extension}"
    instrumental_final_path = Path(export_path) / f"{stem}.instrumental.{extension}"

    # Cached result: both merged files are already there, nothing to do.
    if os.path.isfile(instrumental_final_path) and os.path.isfile(vocals_final_path):
        return instrumental_final_path, vocals_final_path

    start = datetime.now()
    process_datas = [
        mdx23c_8kfft_instvoc_hq_process_data,
        uvr_mdx_net_voc_ft_process_data,
        htdemucs_ft_process_data,
    ]

    # download models
    for process_data in process_datas:
        download_model(process_data['model_name'])

    # create folder
    os.makedirs(export_path, exist_ok=True)
    temp_export_path = os.path.join(export_path, 'uvr5_' + datetime.now().strftime("%Y-%m-%d_%H%M%S"))
    os.makedirs(temp_export_path, exist_ok=True)
    print('temp_export_path', temp_export_path)

    instrumental_export_paths = []
    vocals_export_paths = []
    for process_data in process_datas:
        model_name = process_data["model_name"]
        current_model = process_data['model_data']
        audio_file_base = stem + '_' + current_model.model_basename

        # The context manager closes the bar even when separation raises.
        with tqdm(total=100, desc=model_name, unit="%") as progress_bar:

            def set_progress_bar(step, inference_iterations=0):
                # inference_iterations is treated as a 0..1 fraction; advance
                # the bar by the difference to its current position.
                if inference_iterations > 0:
                    progress_bar.update(round(inference_iterations * 100, 2) - progress_bar.n)

            def write_to_console(progress_text, base_text=''):
                # Returns the message tagged with the model name, or None when
                # there is no visible text.
                text = f"{progress_text} {base_text}"
                if text.strip():
                    return f'{text} @ {model_name}'
                return None

            process_data['export_path'] = temp_export_path
            process_data['audio_file_base'] = audio_file_base
            process_data['audio_file'] = audio_path
            process_data['set_progress_bar'] = set_progress_bar
            process_data['write_to_console'] = write_to_console

            if current_model.process_method == VR_ARCH_TYPE:
                seperator = SeperateVR(current_model, process_data)
            elif current_model.process_method == MDX_ARCH_TYPE:
                if current_model.is_mdx_c:
                    seperator = SeperateMDXC(current_model, process_data)
                else:
                    seperator = SeperateMDX(current_model, process_data)
            elif current_model.process_method == DEMUCS_ARCH_TYPE:
                seperator = SeperateDemucs(current_model, process_data,
                                           vocal_stem_path=(audio_path, audio_file_base))
            else:
                raise Exception('model not found')

            seperator.seperate()

        # Paths where this model's outputs are expected in the temp folder.
        instrumental_path = Path(temp_export_path) / f"{audio_file_base}_(Instrumental).{extension}"
        vocals_path = Path(temp_export_path) / f"{audio_file_base}_(Vocals).{extension}"
        instrumental_export_paths.append(str(instrumental_path))
        vocals_export_paths.append(str(vocals_path))

    # merge each model outputs
    # NOTE(review): `format` is not forwarded to ensemble(), which therefore
    # always uses its own default — confirm this is intended for non-WAV formats.
    ensemble(vocals_export_paths, vocals_final_path)
    ensemble(instrumental_export_paths, instrumental_final_path)

    print('instrumental_final_path', instrumental_final_path)
    print('vocals_final_path', vocals_final_path)
    print(f'Finished in {datetime.now() - start}')

    if clean:
        # NOTE(review): the 10 s pause presumably lets pending file handles
        # be released before deletion — confirm it is still needed.
        sleep(10)
        shutil.rmtree(temp_export_path, ignore_errors=True)

    return instrumental_final_path, vocals_final_path
def ensemble(stem_outputs, stem_save_path, format=WAV):
    """Merge several stem files into a single file at ``stem_save_path``.

    The inputs are combined by ``spec_utils.ensemble_inputs`` using the
    'Average' algorithm with normalization enabled, then the result is passed
    to ``save_format`` together with ``format``.

    Args:
        stem_outputs: Iterable of paths (str or path-like) of files to merge.
        stem_save_path: Destination path of the merged file.
        format: Format name handed to ``save_format``.
    """
    target = str(stem_save_path)
    sources = list(map(str, stem_outputs))

    merge_algorithm = 'Average'
    normalize = True
    # 'PCM_16' is presumably the wav sample subtype — verify against spec_utils.
    spec_utils.ensemble_inputs(sources, merge_algorithm, normalize, 'PCM_16', target, is_wave=True)

    # '320k' is presumably the bitrate used for lossy formats — verify against save_format.
    save_format(target, format, '320k')
def uvr_job(song_id, platform='netease'):
    """Fetch a song if it is not cached locally and return its instrumental stem.

    The audio file is looked up in the current working directory as
    ``<song_id>.m4a`` for the 'youtube' platform and ``<song_id>.mp3``
    otherwise. When missing, it is downloaded from the remote API first.
    The file is then separated with ``run_ensemble_models``.

    Args:
        song_id: Identifier of the song on the given platform.
        platform: Source platform name sent to the download API.

    Returns:
        Path of the merged instrumental file.

    Raises:
        requests.HTTPError: If the download endpoint answers with an error
            status; nothing is written to disk in that case.
    """
    audio_dir = os.getcwd()
    extension = 'm4a' if platform == 'youtube' else 'mp3'
    audio_file = f'{song_id}.{extension}'
    audio_path = os.path.join(audio_dir, audio_file)

    if not os.path.isfile(audio_path):
        # Let requests encode the query string instead of interpolating raw
        # caller-supplied values into the URL.
        response = requests.get(
            "http://or.luotao.net/api/download_song",
            params={"song_id": song_id, "platform": platform},
            allow_redirects=True,
        )
        # Fail loudly rather than saving an error page as the audio file,
        # which would then be treated as a valid cached download.
        response.raise_for_status()
        with open(audio_path, 'wb') as audio_fh:
            audio_fh.write(response.content)

    # audio_file is relative; it resolves against the cwd, which is audio_dir.
    instrumental_path, _ = run_ensemble_models(audio_file, audio_dir)
    return instrumental_path
# Example of a per-model output path (same naming as in run_ensemble_models):
# /Users/taoluo/Downloads/test/kimk_audio_MDX23C-8KFFT-InstVoc_HQ_(Instrumental).WAV
if __name__ == '__main__':
    # Usage: <script> <audio_file | song_id> [platform]
    # The first argument is treated as a local audio file when it exists on
    # disk; otherwise it is treated as a song id to download from `platform`.
    if len(sys.argv) < 2:
        print(f'usage: {sys.argv[0]} <audio_file|song_id> [platform]', file=sys.stderr)
        sys.exit(1)

    audio_file = sys.argv[1]
    platform = sys.argv[2] if len(sys.argv) > 2 else 'netease'

    if os.path.isfile(audio_file):
        # exist file
        # abspath keeps output_dir non-empty for a bare file name; an empty
        # directory string would make os.makedirs fail downstream.
        output_dir = os.path.dirname(os.path.abspath(audio_file))
        instrumental_path, vocals_path = run_ensemble_models(audio_file, output_dir)
    else:
        # download from platform
        song_id = audio_file
        instrumental_path = uvr_job(song_id, platform)

    print('instrumental_path: ', instrumental_path)