owiedotch's picture
Update app.py
4a925ac verified
raw
history blame
No virus
7 kB
import gradio as gr
import os
import subprocess
import spaces
from typing import Tuple, List, Dict
from pydub import AudioSegment
@spaces.GPU
def inference(audio_file: str, model_name: str, vocals: bool, drums: bool, bass: bool, other: bool, mp3: bool, mp3_bitrate: int) -> Tuple[str, gr.HTML]:
log_messages = []
def stream_log(message):
log_messages.append(f"[{model_name}] {message}")
return gr.HTML("<pre style='margin-bottom: 0;'>" + "<br>".join(log_messages) + "</pre>")
yield None, stream_log("Starting separation process...")
yield None, stream_log(f"Loading audio file: {audio_file}")
if audio_file is None:
yield None, stream_log("Error: No audio file provided")
raise gr.Error("Please upload an audio file")
# Use absolute paths
base_output_dir = os.path.abspath("separated")
output_dir = os.path.join(base_output_dir, model_name, os.path.splitext(os.path.basename(audio_file))[0])
os.makedirs(output_dir, exist_ok=True)
# Construct the Demucs command with full paths
cmd = [
"python", "-m", "demucs",
"--out", base_output_dir,
"-n", model_name,
audio_file
]
yield None, stream_log(f"Running Demucs command: {' '.join(cmd)}")
try:
# Run the Demucs command
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
# Stream the output
for line in process.stdout:
yield None, stream_log(line.strip())
# Wait for the process to complete
process.wait()
if process.returncode != 0:
error_output = process.stderr.read()
yield None, stream_log(f"Error: Demucs command failed with return code {process.returncode}")
yield None, stream_log(f"Error output: {error_output}")
raise gr.Error(f"Demucs separation failed. Check the logs for details.")
except Exception as e:
yield None, stream_log(f"Unexpected error: {str(e)}")
raise gr.Error(f"An unexpected error occurred: {str(e)}")
yield None, stream_log("Separation completed. Processing stems...")
# Change the stem search directory using full path
stem_search_dir = os.path.join(base_output_dir, model_name, os.path.splitext(os.path.basename(audio_file))[0])
yield None, stream_log(f"Searching for stems in: {stem_search_dir}")
stems: Dict[str, str] = {}
for stem in ["vocals", "drums", "bass", "other"]:
stem_path = os.path.join(stem_search_dir, f"{stem}.wav")
yield None, stream_log(f"Checking for {stem} stem at: {stem_path}")
if os.path.exists(stem_path):
stems[stem] = stem_path
yield None, stream_log(f"Found {stem} stem")
else:
yield None, stream_log(f"Warning: {stem} stem not found")
if not stems:
yield None, stream_log("Error: No stems found. Checking alternative directory...")
stem_search_dir = os.path.join(base_output_dir, model_name)
for stem in ["vocals", "drums", "bass", "other"]:
stem_path = os.path.join(stem_search_dir, f"{stem}.wav")
yield None, stream_log(f"Checking for {stem} stem at: {stem_path}")
if os.path.exists(stem_path):
stems[stem] = stem_path
yield None, stream_log(f"Found {stem} stem")
else:
yield None, stream_log(f"Warning: {stem} stem not found")
yield None, stream_log(f"All found stems: {list(stems.keys())}")
selected_stems: List[str] = []
for stem, selected in zip(["vocals", "drums", "bass", "other"], [vocals, drums, bass, other]):
if selected:
yield None, stream_log(f"{stem} is selected by user")
if stem in stems:
selected_stems.append(stems[stem])
yield None, stream_log(f"Selected {stem} stem for mixing")
else:
yield None, stream_log(f"Warning: {stem} was selected but not found")
yield None, stream_log(f"Final selected stems: {selected_stems}")
if not selected_stems:
yield None, stream_log("Error: No stems selected for mixing")
raise gr.Error("Please select at least one stem to mix and ensure it was successfully separated.")
output_file: str = os.path.join(output_dir, "mixed.wav")
yield None, stream_log("Mixing selected stems...")
if len(selected_stems) == 1:
os.rename(selected_stems[0], output_file)
else:
mixed_audio: AudioSegment = AudioSegment.empty()
for stem_path in selected_stems:
mixed_audio += AudioSegment.from_wav(stem_path)
mixed_audio.export(output_file, format="wav")
if mp3:
yield None, stream_log(f"Converting to MP3 (bitrate: {mp3_bitrate}k)...")
mp3_output_file: str = os.path.splitext(output_file)[0] + ".mp3"
mixed_audio.export(mp3_output_file, format="mp3", bitrate=str(mp3_bitrate) + "k")
output_file = mp3_output_file
yield None, stream_log("Process completed successfully!")
yield output_file, gr.HTML("<pre style='color: green;'>Separation and mixing completed successfully!</pre>")
# Define the Gradio interface
with gr.Blocks() as iface:
gr.Markdown("# Demucs Music Source Separation and Mixing")
gr.Markdown("Separate vocals, drums, bass, and other instruments from your music using Demucs and mix the selected stems.")
with gr.Row():
with gr.Column(scale=1):
audio_input = gr.Audio(type="filepath", label="Upload Audio File")
model_dropdown = gr.Dropdown(
["htdemucs", "htdemucs_ft", "htdemucs_6s", "hdemucs_mmi", "mdx", "mdx_extra", "mdx_q", "mdx_extra_q"],
label="Model Name",
value="htdemucs_ft"
)
with gr.Row():
vocals_checkbox = gr.Checkbox(label="Vocals", value=True)
drums_checkbox = gr.Checkbox(label="Drums", value=True)
with gr.Row():
bass_checkbox = gr.Checkbox(label="Bass", value=True)
other_checkbox = gr.Checkbox(label="Other", value=True)
mp3_checkbox = gr.Checkbox(label="Save as MP3", value=False)
mp3_bitrate = gr.Slider(128, 320, step=32, label="MP3 Bitrate", visible=False)
submit_btn = gr.Button("Process", variant="primary")
with gr.Column(scale=1):
output_audio = gr.Audio(type="filepath", label="Processed Audio")
separation_log = gr.HTML()
submit_btn.click(
fn=inference,
inputs=[audio_input, model_dropdown, vocals_checkbox, drums_checkbox, bass_checkbox, other_checkbox, mp3_checkbox, mp3_bitrate],
outputs=[output_audio, separation_log]
)
mp3_checkbox.change(
fn=lambda mp3: gr.update(visible=mp3),
inputs=mp3_checkbox,
outputs=mp3_bitrate
)
# Launch the Gradio interface
iface.launch()