"""Gradio app that runs Demucs on an uploaded track and mixes the chosen stems."""

import html
import os
import subprocess
import sys
from typing import Dict, Iterator, List, Optional, Tuple

import gradio as gr
import spaces
from pydub import AudioSegment

# Stem files this app looks for in the Demucs output, in checkbox order.
_STEM_NAMES: Tuple[str, ...] = ("vocals", "drums", "bass", "other")


def _render_log(lines: List[str]) -> gr.HTML:
    """Render the accumulated log lines as one HTML component.

    Lines are HTML-escaped because they contain the uploaded file name and
    raw subprocess output.
    """
    # NOTE(review): the original markup around the log was lost when this file
    # was extracted; <pre>/<br> is a reconstruction -- confirm against upstream.
    return gr.HTML("<pre>" + "<br>".join(html.escape(line) for line in lines) + "</pre>")


@spaces.GPU
def inference(
    audio_file: str,
    model_name: str,
    vocals: bool,
    drums: bool,
    bass: bool,
    other: bool,
    mp3: bool,
    mp3_bitrate: int,
) -> Iterator[Tuple[Optional[str], gr.HTML]]:
    """Separate ``audio_file`` with Demucs and mix the selected stems.

    This is a generator: it yields ``(None, log_html)`` while working so the
    UI can show progress, and finally yields ``(output_path, status_html)``.

    Args:
        audio_file: Path of the uploaded audio file (``None`` if nothing was
            uploaded).
        model_name: Demucs model name passed to ``-n``.
        vocals: Include the vocals stem in the mix.
        drums: Include the drums stem in the mix.
        bass: Include the bass stem in the mix.
        other: Include the "other" stem in the mix.
        mp3: Also export the mix as MP3 and return that file instead of WAV.
        mp3_bitrate: MP3 bitrate in kbit/s, used only when ``mp3`` is true.

    Yields:
        Tuples of ``(output file path or None, gr.HTML log/status)``.

    Raises:
        gr.Error: If no file was uploaded, Demucs could not be started or
            exited with a non-zero status, or none of the selected stems
            were found.
    """
    log_messages: List[str] = []

    def stream_log(message: str) -> gr.HTML:
        log_messages.append(f"[{model_name}] {message}")
        return _render_log(log_messages)

    yield None, stream_log("Starting separation process...")

    # Validate before touching the path; os.path helpers below reject None.
    if audio_file is None:
        yield None, stream_log("Error: No audio file provided")
        raise gr.Error("Please upload an audio file")

    yield None, stream_log(f"Loading audio file: {audio_file}")

    # Use absolute paths
    base_output_dir = os.path.abspath("separated")
    track_name = os.path.splitext(os.path.basename(audio_file))[0]
    output_dir = os.path.join(base_output_dir, model_name, track_name)
    os.makedirs(output_dir, exist_ok=True)

    # sys.executable guarantees Demucs runs under the interpreter (and
    # virtualenv) this app is running in, rather than whatever "python"
    # resolves to on PATH.
    cmd = [
        sys.executable, "-m", "demucs",
        "--out", base_output_dir,
        "-n", model_name,
        audio_file,
    ]

    yield None, stream_log(f"Running Demucs command: {' '.join(cmd)}")

    try:
        # stderr is merged into stdout: reading only one of two separate pipes
        # can deadlock once the other pipe's buffer fills up.
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
        ) as process:
            for line in process.stdout:
                yield None, stream_log(line.strip())
            returncode = process.wait()
    except (OSError, subprocess.SubprocessError) as e:
        yield None, stream_log(f"Unexpected error: {str(e)}")
        raise gr.Error(f"An unexpected error occurred: {str(e)}") from e

    # Checked outside the try so this error is not re-wrapped as "unexpected".
    # The process's error output is already in the log via the merged stream.
    if returncode != 0:
        yield None, stream_log(
            f"Error: Demucs command failed with return code {returncode}"
        )
        raise gr.Error("Demucs separation failed. Check the logs for details.")

    yield None, stream_log("Separation completed. Processing stems...")

    # Look in the per-track directory first, then fall back to the model
    # directory itself if nothing was found there.
    candidate_dirs = [output_dir, os.path.join(base_output_dir, model_name)]
    stems: Dict[str, str] = {}
    for attempt, stem_search_dir in enumerate(candidate_dirs):
        if attempt == 0:
            yield None, stream_log(f"Searching for stems in: {stem_search_dir}")
        else:
            yield None, stream_log("Error: No stems found. Checking alternative directory...")
        for stem in _STEM_NAMES:
            stem_path = os.path.join(stem_search_dir, f"{stem}.wav")
            yield None, stream_log(f"Checking for {stem} stem at: {stem_path}")
            if os.path.exists(stem_path):
                stems[stem] = stem_path
                yield None, stream_log(f"Found {stem} stem")
            else:
                yield None, stream_log(f"Warning: {stem} stem not found")
        if stems:
            break

    yield None, stream_log(f"All found stems: {list(stems.keys())}")

    selected_stems: List[str] = []
    for stem, selected in zip(_STEM_NAMES, (vocals, drums, bass, other)):
        if not selected:
            continue
        yield None, stream_log(f"{stem} is selected by user")
        if stem in stems:
            selected_stems.append(stems[stem])
            yield None, stream_log(f"Selected {stem} stem for mixing")
        else:
            yield None, stream_log(f"Warning: {stem} was selected but not found")

    yield None, stream_log(f"Final selected stems: {selected_stems}")

    if not selected_stems:
        yield None, stream_log("Error: No stems selected for mixing")
        raise gr.Error("Please select at least one stem to mix and ensure it was successfully separated.")

    output_file: str = os.path.join(output_dir, "mixed.wav")
    yield None, stream_log("Mixing selected stems...")

    # Always load through pydub (even for a single stem) so `mixed_audio`
    # exists for the optional MP3 export and the stem file is left in place.
    # overlay() plays the stems simultaneously; `+` on AudioSegment would
    # append them one after another instead of mixing.
    mixed_audio: AudioSegment = AudioSegment.from_wav(selected_stems[0])
    for stem_path in selected_stems[1:]:
        mixed_audio = mixed_audio.overlay(AudioSegment.from_wav(stem_path))
    mixed_audio.export(output_file, format="wav")

    if mp3:
        # The slider may deliver a float; the bitrate string needs e.g. "192k".
        bitrate_kbps = int(mp3_bitrate)
        yield None, stream_log(f"Converting to MP3 (bitrate: {bitrate_kbps}k)...")
        mp3_output_file: str = os.path.splitext(output_file)[0] + ".mp3"
        mixed_audio.export(mp3_output_file, format="mp3", bitrate=f"{bitrate_kbps}k")
        output_file = mp3_output_file

    yield None, stream_log("Process completed successfully!")
    # NOTE(review): wrapper tag reconstructed, see _render_log.
    yield output_file, gr.HTML("<pre>Separation and mixing completed successfully!</pre>")


# Define the Gradio interface
with gr.Blocks() as iface:
    gr.Markdown("# Demucs Music Source Separation and Mixing")
    gr.Markdown("Separate vocals, drums, bass, and other instruments from your music using Demucs and mix the selected stems.")

    with gr.Row():
        with gr.Column(scale=1):
            audio_input = gr.Audio(type="filepath", label="Upload Audio File")
            model_dropdown = gr.Dropdown(
                ["htdemucs", "htdemucs_ft", "htdemucs_6s", "hdemucs_mmi", "mdx", "mdx_extra", "mdx_q", "mdx_extra_q"],
                label="Model Name",
                value="htdemucs_ft",
            )
            with gr.Row():
                vocals_checkbox = gr.Checkbox(label="Vocals", value=True)
                drums_checkbox = gr.Checkbox(label="Drums", value=True)
            with gr.Row():
                bass_checkbox = gr.Checkbox(label="Bass", value=True)
                other_checkbox = gr.Checkbox(label="Other", value=True)
            mp3_checkbox = gr.Checkbox(label="Save as MP3", value=False)
            # Hidden until "Save as MP3" is ticked (see mp3_checkbox.change below).
            mp3_bitrate = gr.Slider(128, 320, step=32, label="MP3 Bitrate", visible=False)
            submit_btn = gr.Button("Process", variant="primary")

        with gr.Column(scale=1):
            output_audio = gr.Audio(type="filepath", label="Processed Audio")
            separation_log = gr.HTML()

    submit_btn.click(
        fn=inference,
        inputs=[
            audio_input,
            model_dropdown,
            vocals_checkbox,
            drums_checkbox,
            bass_checkbox,
            other_checkbox,
            mp3_checkbox,
            mp3_bitrate,
        ],
        outputs=[output_audio, separation_log],
    )

    mp3_checkbox.change(
        fn=lambda mp3: gr.update(visible=mp3),
        inputs=mp3_checkbox,
        outputs=mp3_bitrate,
    )

# Launch the Gradio interface
iface.launch()