# Version: Corrected After Test 3 (V2.3.0 - Structured, Commented, and ZIP Generation Restored)
# Description: This version structures the script according to the order of the processing steps.
# ZIP file generation has been reinstated after being omitted in the previous version.
# Each section is commented to keep the code readable and the logic clear.
import os
import shutil
import zipfile
import torch
from pathlib import Path
import gradio as gr
from pydub import AudioSegment
from transformers import pipeline
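
# Note: pydub decodes non-WAV inputs through ffmpeg (or libav), so ffmpeg must be
# available on the host for formats such as MP3 or M4A.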

# -------------------------------------------------
# 1. Configuration and initialization
# -------------------------------------------------
MODEL_NAME = "openai/whisper-large-v3"
device = "cuda" if torch.cuda.is_available() else "cpu"

# Initialize the Whisper ASR pipeline
pipe = pipeline(
    task="automatic-speech-recognition",
    model=MODEL_NAME,
    device=device,
    model_kwargs={"low_cpu_mem_usage": True},
)
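
# Optional tweak (a sketch, not part of the original flow): for recordings longer
# than ~30 s, the transformers ASR pipeline supports chunked inference at call time,
# e.g. pipe(audio_path, chunk_length_s=30, return_timestamps="word").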

# Create the temporary directory used to store the audio excerpts
TEMP_DIR = "./temp_audio"
os.makedirs(TEMP_DIR, exist_ok=True)


def init_metadata_state():
    return []

# -------------------------------------------------
# 2. Audio transcription with Whisper
# -------------------------------------------------
def transcribe_audio(audio_path):
    if not audio_path:
        print("[LOG] No audio file provided.")
        return "No audio file provided", [], None, [], ""
    print(f"[LOG] Starting transcription of {audio_path}...")
    result = pipe(audio_path, return_timestamps="word")
    words = result.get("chunks", [])
    if not words:
        print("[LOG ERROR] Error: no timestamps detected.")
        return "Error: no timestamps detected.", [], None, [], ""
    # Whisper word chunks carry a leading space; strip it when rebuilding the text
    raw_transcription = " ".join(w["text"].strip() for w in words)
    word_timestamps = []
    for i, w in enumerate(words):
        word = w["text"].strip()
        start = w["timestamp"][0]
        # End = start of the next word minus 10 ms; pad the last word by 500 ms
        end = words[i + 1]["timestamp"][0] - 0.01 if i + 1 < len(words) else start + 0.5
        word_timestamps.append((word, (start, end)))
    transcription_with_timestamps = " ".join(
        f"{w[0]}[{w[1][0]:.2f}-{w[1][1]:.2f}]" for w in word_timestamps
    )
    print(f"[LOG] Raw transcription: {raw_transcription}")
    print(f"[LOG DETAIL] Associated timestamps: {word_timestamps}")
    return raw_transcription, [], audio_path, word_timestamps, transcription_with_timestamps
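
# For reference, pipe(..., return_timestamps="word") returns a dict shaped roughly like:
#   {"text": " hello world",
#    "chunks": [{"text": " hello", "timestamp": (0.0, 0.42)},
#               {"text": " world", "timestamp": (0.48, 0.91)}]}
# (values are illustrative), which is why transcribe_audio reads w["text"] and
# w["timestamp"][0] above.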

# -------------------------------------------------
# 3. Segment preprocessing: map timestamps onto the selected sentences
# -------------------------------------------------
def preprocess_segments(table_data, word_timestamps):
    print("[LOG] Starting segment preprocessing...")
    formatted_data = []
    for i, row in enumerate(table_data):
        if not row or len(row) < 1 or not str(row[0]).strip():
            print(f"[LOG WARNING] Skipped: empty row at index {i}.")
            continue
        text = str(row[0]).strip()
        segment_id = f"seg_{i+1:02d}"
        start_time, end_time = None, None
        words_in_segment = text.split()
        segment_indices = []
        # Approximate matching: collect every transcript word that appears in the segment text
        for j, (word, (start, end)) in enumerate(word_timestamps):
            if word in words_in_segment:
                segment_indices.append((j, start, end))
        if segment_indices:
            start_time = segment_indices[0][1]
            end_time = segment_indices[-1][2]
        formatted_data.append([text, start_time, end_time, segment_id])
        print(f"[LOG] Segment added: {text} | Start: {start_time}, End: {end_time}, ID: {segment_id}")
    return formatted_data
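
# Usage sketch with hypothetical values, to illustrate the row format produced above:
#   preprocess_segments([["hello world", None, None, ""]],
#                       [("hello", (0.0, 0.42)), ("world", (0.48, 0.91))])
#   -> [["hello world", 0.0, 0.91, "seg_01"]]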

# -------------------------------------------------
# 4. Validation and extraction of the audio excerpts
# -------------------------------------------------
def validate_segments(audio_path, table_data, metadata_state, word_timestamps):
    print("[LOG] Starting segment validation...")
    if not audio_path or not word_timestamps:
        print("[LOG ERROR] Error: no valid timestamps found!")
        return [], metadata_state
    # Reset the temporary directory before cutting new excerpts
    if os.path.exists(TEMP_DIR):
        shutil.rmtree(TEMP_DIR)
    os.makedirs(TEMP_DIR, exist_ok=True)
    original_audio = AudioSegment.from_file(audio_path)
    segment_paths = []
    updated_metadata = []
    # Each row is expected to hold [text, start_time, end_time, segment_id]
    for text, start_time, end_time, segment_id in table_data:
        if start_time is None or end_time is None:
            print(f"[LOG ERROR] Missing timestamp for: {text}")
            continue
        start_ms, end_ms = int(float(start_time) * 1000), int(float(end_time) * 1000)
        if start_ms < 0 or end_ms <= start_ms:
            print(f"[LOG ERROR] Invalid cut boundaries: {text} | {start_time}s - {end_time}s")
            continue
        segment_filename = f"{Path(audio_path).stem}_{segment_id}.wav"
        segment_path = os.path.join(TEMP_DIR, segment_filename)
        extract = original_audio[start_ms:end_ms]  # pydub slices in milliseconds
        extract.export(segment_path, format="wav")
        segment_paths.append(segment_path)
        updated_metadata.append({
            "audio_file": segment_filename,
            "text": text,
            "start_time": start_time,
            "end_time": end_time,
            "id": segment_id,
        })
        print(f"[LOG] Excerpt generated: {segment_filename}")
    return segment_paths, updated_metadata
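
# Standalone sketch with hypothetical arguments: given a 4-column row produced by
# preprocess_segments, validate_segments cuts speech.wav into speech_seg_01.wav:
#   validate_segments("speech.wav", [["hello world", 0.0, 0.91, "seg_01"]],
#                     [], [("hello", (0.0, 0.42))])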

# -------------------------------------------------
# 5. ZIP file generation
# -------------------------------------------------
def generate_zip(metadata_state):
    if not metadata_state:
        print("[LOG ERROR] No valid segment found for ZIP generation.")
        return None
    zip_path = os.path.join(TEMP_DIR, "dataset.zip")
    if os.path.exists(zip_path):
        os.remove(zip_path)
    # Write the pipe-separated metadata file describing every excerpt
    metadata_csv_path = os.path.join(TEMP_DIR, "metadata.csv")
    with open(metadata_csv_path, "w", encoding="utf-8") as f:
        f.write("audio_file|text|speaker_name|API\n")
        for seg in metadata_state:
            f.write(f"{seg['audio_file']}|{seg['text']}|projectname|/API_PHONETIC/\n")
    # Bundle the metadata file and every audio excerpt into a single archive
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.write(metadata_csv_path, "metadata.csv")
        for seg in metadata_state:
            file_path = os.path.join(TEMP_DIR, seg["audio_file"])
            if os.path.exists(file_path):
                zf.write(file_path, seg["audio_file"])
    print("[LOG] ZIP file generated successfully.")
    return zip_path
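
# The generated metadata.csv is pipe-separated, one line per excerpt, e.g.:
#   audio_file|text|speaker_name|API
#   speech_seg_01.wav|hello world|projectname|/API_PHONETIC/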

# -------------------------------------------------
# 6. Gradio user interface
# -------------------------------------------------
with gr.Blocks() as demo:
    gr.Markdown("# Audio Clipping Application")
    metadata_state = gr.State(init_metadata_state())
    extracted_segments = gr.State([])
    audio_input = gr.Audio(type="filepath", label="Audio file")
    raw_transcription = gr.Textbox(label="Transcription", interactive=False)
    transcription_timestamps = gr.Textbox(label="Transcription with timestamps", interactive=False)
    # Four columns so preprocess_segments/validate_segments can round-trip their rows;
    # type="array" makes Gradio pass the table as a list of lists instead of a DataFrame
    table = gr.Dataframe(
        headers=["Text", "Start", "End", "ID"],
        datatype=["str", "number", "number", "str"],
        row_count=(1, "dynamic"),
        col_count=(4, "fixed"),
        type="array",
    )
    generate_timestamps_button = gr.Button("Generate timestamps")
    validate_button = gr.Button("Validate")
    generate_button = gr.Button("Generate ZIP")
    zip_file = gr.File(label="Download the ZIP")
    word_timestamps = gr.State()

    audio_input.change(
        transcribe_audio,
        inputs=audio_input,
        outputs=[raw_transcription, table, audio_input, word_timestamps, transcription_timestamps],
    )
    generate_timestamps_button.click(preprocess_segments, inputs=[table, word_timestamps], outputs=table)
    validate_button.click(
        validate_segments,
        inputs=[audio_input, table, metadata_state, word_timestamps],
        outputs=[extracted_segments, metadata_state],
    )
    generate_button.click(generate_zip, inputs=metadata_state, outputs=zip_file)

demo.queue().launch()