Spaces:
Runtime error
Runtime error
import gradio as gr | |
import zipfile | |
import os | |
import shutil | |
from pyannote.audio import Pipeline | |
import torch | |
# Set up the directory for processing | |
TEMP_DIR = "temp_audio" | |
os.makedirs(TEMP_DIR, exist_ok=True) | |
# Initialize the pyannote.audio pipeline | |
pipeline = Pipeline.from_pretrained( | |
"pyannote/speaker-diarization-3.1", | |
use_auth_token=os.getenv("HF_TOKEN") | |
) | |
# Move pipeline to GPU if available | |
if torch.cuda.is_available(): | |
pipeline.to(torch.device("cuda")) | |
def process_audio_zip(file_info): | |
# Unzip the uploaded file | |
with zipfile.ZipFile(file_info, 'r') as zip_ref: | |
zip_ref.extractall(TEMP_DIR) | |
speaker1_dir = os.path.join(TEMP_DIR, "speaker1") | |
speaker2_dir = os.path.join(TEMP_DIR, "speaker2") | |
os.makedirs(speaker1_dir, exist_ok=True) | |
os.makedirs(speaker2_dir, exist_ok=True) | |
# Process each audio file in the temporary directory | |
for filename in os.listdir(TEMP_DIR): | |
if filename.endswith(".wav"): | |
file_path = os.path.join(TEMP_DIR, filename) | |
# Run the diarization pipeline | |
diarization = pipeline(file_path) | |
# Determine if the audio is mostly from speaker1 or speaker2 | |
total_duration = {1: 0.0, 2: 0.0} | |
for turn, _, speaker in diarization.itertracks(yield_label=True): | |
total_duration[speaker] += turn.duration | |
# Move file to the corresponding speaker directory | |
dominant_speaker = 1 if total_duration[1] >= total_duration[2] else 2 | |
if dominant_speaker == 1: | |
shutil.move(file_path, os.path.join(speaker1_dir, filename)) | |
else: | |
shutil.move(file_path, os.path.join(speaker2_dir, filename)) | |
# Zip the results | |
speaker1_zip = "speaker1.zip" | |
speaker2_zip = "speaker2.zip" | |
def zipdir(path, ziph): | |
# Zip the directories | |
for root, dirs, files in os.walk(path): | |
for file in files: | |
ziph.write(os.path.join(root, file), | |
os.path.relpath(os.path.join(root, file), | |
os.path.join(path, '..'))) | |
with zipfile.ZipFile(speaker1_zip, 'w', zipfile.ZIP_DEFLATED) as zipf: | |
zipdir(speaker1_dir, zipf) | |
with zipfile.ZipFile(speaker2_zip, 'w', zipfile.ZIP_DEFLATED) as zipf: | |
zipdir(speaker2_dir, zipf) | |
# Clean up the temporary directory | |
shutil.rmtree(TEMP_DIR) | |
return speaker1_zip, speaker2_zip | |
# Gradio interface | |
iface = gr.Interface( | |
fn=process_audio_zip, | |
inputs=gr.File(type="filepath"), | |
outputs=[ | |
gr.File(label="Speaker 1 Audio"), | |
gr.File(label="Speaker 2 Audio") | |
], | |
title="Speaker Diarization", | |
description="Upload a ZIP file containing audio files, and this will return two ZIP files containing diarized audio for each speaker." | |
) | |
iface.launch() |