Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
@@ -2,18 +2,21 @@ import os
|
|
2 |
import shutil
|
3 |
import zipfile
|
4 |
from pathlib import Path
|
|
|
5 |
|
6 |
import gradio as gr
|
7 |
import torch
|
8 |
from pydub import AudioSegment
|
9 |
from transformers import pipeline
|
10 |
|
11 |
-
#
|
12 |
-
# Configuration
|
13 |
-
#
|
|
|
14 |
MODEL_NAME = "openai/whisper-large-v3"
|
15 |
device = 0 if torch.cuda.is_available() else "cpu"
|
16 |
|
|
|
17 |
pipe = pipeline(
|
18 |
task="automatic-speech-recognition",
|
19 |
model=MODEL_NAME,
|
@@ -21,206 +24,242 @@ pipe = pipeline(
|
|
21 |
model_kwargs={"low_cpu_mem_usage": True},
|
22 |
)
|
23 |
|
|
|
24 |
TEMP_DIR = "./temp_audio"
|
25 |
os.makedirs(TEMP_DIR, exist_ok=True)
|
26 |
|
27 |
-
#
|
28 |
-
#
|
29 |
-
#
|
30 |
def init_metadata_state():
|
31 |
-
"""Initialise l'état pour stocker les informations des segments validés."""
|
32 |
return []
|
33 |
|
34 |
-
#
|
35 |
-
# Étape 2 : Transcription
|
36 |
-
#
|
37 |
def transcribe_audio(audio_path):
|
38 |
-
"""
|
|
|
|
|
|
|
|
|
39 |
if not audio_path:
|
40 |
-
return "Aucun fichier audio fourni
|
41 |
|
42 |
-
# Transcription avec
|
43 |
result = pipe(audio_path, return_timestamps="word")
|
44 |
text = result["text"]
|
|
|
45 |
|
46 |
-
#
|
47 |
-
|
48 |
-
return text, table_init, audio_path
|
49 |
-
|
50 |
-
# -------------------------------------------------
|
51 |
-
# Étape intermédiaire : Génération des timestamps
|
52 |
-
# -------------------------------------------------
|
53 |
-
def generate_timestamps(audio_path, table_data):
|
54 |
-
"""Génère les timestamps des segments en fonction des extraits textuels fournis."""
|
55 |
-
if not audio_path or not table_data:
|
56 |
-
return table_data
|
57 |
-
|
58 |
-
# Transcription avec Whisper pour obtenir les timestamps
|
59 |
-
result = pipe(audio_path, return_timestamps="word")
|
60 |
-
chunks = result["chunks"]
|
61 |
-
|
62 |
-
updated_table = []
|
63 |
-
for row in table_data:
|
64 |
-
if not row[0]: # Ignorer les lignes sans texte
|
65 |
-
updated_table.append(row)
|
66 |
-
continue
|
67 |
-
|
68 |
-
text = row[0]
|
69 |
-
start_time, end_time = None, None
|
70 |
-
|
71 |
-
# Chercher les timestamps correspondant au texte
|
72 |
-
for chunk in chunks:
|
73 |
-
if text in chunk["text"]:
|
74 |
-
start_time, end_time = chunk["timestamp"]
|
75 |
-
break
|
76 |
|
77 |
-
|
|
|
78 |
|
79 |
-
return updated_table
|
80 |
|
81 |
-
#
|
82 |
-
# Étape 5 : Validation + découpe
|
83 |
-
#
|
84 |
def validate_segments(audio_path, table_data, metadata_state):
|
85 |
-
"""
|
86 |
-
|
87 |
-
|
|
|
|
|
|
|
|
|
88 |
|
89 |
-
#
|
90 |
if os.path.exists(TEMP_DIR):
|
91 |
shutil.rmtree(TEMP_DIR)
|
92 |
os.makedirs(TEMP_DIR, exist_ok=True)
|
93 |
|
|
|
94 |
original_audio = AudioSegment.from_file(audio_path)
|
95 |
|
96 |
-
|
97 |
updated_metadata = []
|
98 |
|
99 |
for i, row in enumerate(table_data):
|
100 |
-
|
101 |
-
|
102 |
-
|
103 |
-
if not text or start_time is None or end_time is None:
|
104 |
continue
|
105 |
-
if not segment_id:
|
106 |
-
segment_id = f"seg_{i + 1:02d}"
|
107 |
-
|
108 |
-
try:
|
109 |
-
start_ms = int(float(start_time) * 1000)
|
110 |
-
end_ms = int(float(end_time) * 1000)
|
111 |
-
except (ValueError, TypeError):
|
112 |
-
continue # Ignorer les lignes avec des timestamps invalides
|
113 |
|
114 |
-
|
|
|
|
|
115 |
continue
|
116 |
|
117 |
-
|
118 |
-
|
|
|
119 |
|
120 |
-
# Découpe
|
|
|
|
|
121 |
extract = original_audio[start_ms:end_ms]
|
122 |
-
extract.export(segment_path, format="wav")
|
123 |
|
124 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
125 |
updated_metadata.append({
|
126 |
"audio_file": segment_filename,
|
127 |
-
"text":
|
128 |
"start_time": start_time,
|
129 |
"end_time": end_time,
|
130 |
-
"id":
|
131 |
})
|
132 |
|
133 |
-
|
|
|
|
|
|
|
134 |
|
135 |
-
#
|
136 |
-
# Étape
|
137 |
-
#
|
138 |
def generate_zip(metadata_state):
|
139 |
-
"""
|
|
|
|
|
|
|
|
|
140 |
if not metadata_state:
|
141 |
return None
|
142 |
|
143 |
zip_path = os.path.join(TEMP_DIR, "dataset.zip")
|
|
|
144 |
if os.path.exists(zip_path):
|
145 |
os.remove(zip_path)
|
146 |
|
147 |
-
# Créer le
|
148 |
metadata_csv_path = os.path.join(TEMP_DIR, "metadata.csv")
|
149 |
with open(metadata_csv_path, "w", encoding="utf-8") as f:
|
150 |
f.write("audio_file|text|speaker_name|API\n")
|
151 |
for seg in metadata_state:
|
|
|
|
|
152 |
line = f"{seg['audio_file']}|{seg['text']}|projectname|/API_PHONETIC/\n"
|
153 |
f.write(line)
|
154 |
|
155 |
-
#
|
156 |
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
|
157 |
-
|
158 |
for seg in metadata_state:
|
159 |
-
|
160 |
-
if os.path.exists(
|
161 |
-
zf.write(
|
|
|
|
|
162 |
|
163 |
return zip_path
|
164 |
|
165 |
-
|
166 |
-
#
|
167 |
-
#
|
|
|
168 |
with gr.Blocks(css="style.css") as demo:
|
169 |
-
gr.Markdown("# Application de
|
170 |
|
171 |
-
#
|
172 |
metadata_state = gr.State(init_metadata_state())
|
173 |
|
174 |
-
# Étape 1 :
|
175 |
-
|
176 |
-
|
177 |
-
|
178 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
179 |
|
180 |
-
# Étape
|
|
|
|
|
|
|
|
|
|
|
181 |
table = gr.Dataframe(
|
182 |
headers=["Texte", "Début (s)", "Fin (s)", "ID"],
|
183 |
datatype=["str", "number", "number", "str"],
|
184 |
-
row_count=
|
185 |
-
col_count=4
|
186 |
)
|
187 |
-
|
188 |
-
# Étape intermédiaire : Générer les timestamps
|
189 |
-
generate_timestamps_button = gr.Button("Générer les timestamps")
|
190 |
-
|
191 |
-
# Bouton pour valider et générer les segments
|
192 |
validate_button = gr.Button("Valider et générer les extraits")
|
193 |
-
audio_outputs = [gr.Audio(label=f"Extrait {i+1}", interactive=False) for i in range(20)]
|
194 |
|
195 |
-
#
|
196 |
-
|
197 |
-
|
198 |
-
|
199 |
-
|
200 |
-
|
201 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
202 |
audio_input.change(
|
203 |
fn=transcribe_audio,
|
204 |
inputs=audio_input,
|
205 |
-
outputs=[raw_transcription, table, audio_input],
|
206 |
-
)
|
207 |
-
|
208 |
-
generate_timestamps_button.click(
|
209 |
-
fn=generate_timestamps,
|
210 |
-
inputs=[audio_input, table],
|
211 |
-
outputs=table,
|
212 |
)
|
213 |
|
|
|
214 |
validate_button.click(
|
215 |
fn=validate_segments,
|
216 |
inputs=[audio_input, table, metadata_state],
|
217 |
-
outputs=
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
218 |
)
|
219 |
|
220 |
-
|
|
|
221 |
fn=generate_zip,
|
222 |
inputs=metadata_state,
|
223 |
-
outputs=
|
224 |
)
|
225 |
|
226 |
demo.queue().launch()
|
|
|
2 |
import shutil
|
3 |
import zipfile
|
4 |
from pathlib import Path
|
5 |
+
from datetime import datetime
|
6 |
|
7 |
import gradio as gr
|
8 |
import torch
|
9 |
from pydub import AudioSegment
|
10 |
from transformers import pipeline
|
11 |
|
12 |
+
# ------------------------
|
13 |
+
# Configuration générale
|
14 |
+
# ------------------------
|
15 |
+
|
16 |
MODEL_NAME = "openai/whisper-large-v3"
|
17 |
device = 0 if torch.cuda.is_available() else "cpu"
|
18 |
|
19 |
+
# Pipeline Whisper pour la transcription
|
20 |
pipe = pipeline(
|
21 |
task="automatic-speech-recognition",
|
22 |
model=MODEL_NAME,
|
|
|
24 |
model_kwargs={"low_cpu_mem_usage": True},
|
25 |
)
|
26 |
|
27 |
+
# Dossier temporaire pour stocker extraits & ZIP
|
28 |
TEMP_DIR = "./temp_audio"
|
29 |
os.makedirs(TEMP_DIR, exist_ok=True)
|
30 |
|
31 |
+
# On stocke la métadonnée globale dans un State
|
32 |
+
# pour pouvoir y accéder lors de la génération du ZIP
|
33 |
+
# (table de correspondance entre segments et infos)
|
34 |
def init_metadata_state():
|
|
|
35 |
return []
|
36 |
|
37 |
+
# ------------------------
|
38 |
+
# Étape 2 : Transcription
|
39 |
+
# ------------------------
|
40 |
def transcribe_audio(audio_path):
|
41 |
+
"""
|
42 |
+
1) Transcrit l'audio avec Whisper large-v3
|
43 |
+
2) Retourne la transcription brute et une table pré-remplie
|
44 |
+
(l'utilisateur pourra remplir manuellement la partie 'Texte' s'il le souhaite)
|
45 |
+
"""
|
46 |
if not audio_path:
|
47 |
+
return "Aucun fichier audio fourni", [], None
|
48 |
|
49 |
+
# Transcription Whisper avec timestamps de chaque mot
|
50 |
result = pipe(audio_path, return_timestamps="word")
|
51 |
text = result["text"]
|
52 |
+
chunks = result["chunks"] # liste de { 'timestamp': (start, end), 'text': ... }
|
53 |
|
54 |
+
# Transcription brute (sans timestamps, par exemple)
|
55 |
+
raw_transcription = " ".join([w["text"] for w in chunks])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
56 |
|
57 |
+
# On conserve le chemin de l'audio dans un State (pour la découpe ultérieure)
|
58 |
+
return raw_transcription, [], audio_path
|
59 |
|
|
|
60 |
|
61 |
+
# ------------------------
|
62 |
+
# Étape 5 : Validation des segments + découpe
|
63 |
+
# ------------------------
|
64 |
def validate_segments(audio_path, table_data, metadata_state):
|
65 |
+
"""
|
66 |
+
1) Pour chaque ligne du tableau, on découpe l'audio
|
67 |
+
2) On stocke les chemins des extraits dans 'metadata_state'
|
68 |
+
3) On renvoie une liste de chemins pour écoute sur l'interface
|
69 |
+
"""
|
70 |
+
if not audio_path:
|
71 |
+
return ["Aucun fichier audio..."], metadata_state
|
72 |
|
73 |
+
# Nettoyage du dossier temporaire avant de recréer les extraits
|
74 |
if os.path.exists(TEMP_DIR):
|
75 |
shutil.rmtree(TEMP_DIR)
|
76 |
os.makedirs(TEMP_DIR, exist_ok=True)
|
77 |
|
78 |
+
# Charger l'audio complet pour la découpe
|
79 |
original_audio = AudioSegment.from_file(audio_path)
|
80 |
|
81 |
+
segment_paths = []
|
82 |
updated_metadata = []
|
83 |
|
84 |
for i, row in enumerate(table_data):
|
85 |
+
# row = [ Texte, Start, End, ID ] (4 colonnes)
|
86 |
+
if len(row) < 4:
|
87 |
+
# S'il n'y a pas toutes les colonnes, on ignore
|
|
|
88 |
continue
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
89 |
|
90 |
+
segment_text, start_time, end_time, seg_id = row
|
91 |
+
if not segment_text or start_time is None or end_time is None:
|
92 |
+
# Ignore ligne vide ou incomplète
|
93 |
continue
|
94 |
|
95 |
+
# Générer un ID si l'utilisateur ne l'a pas renseigné
|
96 |
+
if not seg_id:
|
97 |
+
seg_id = f"seg_{i+1:02d}"
|
98 |
|
99 |
+
# Découpe réelle de l'audio
|
100 |
+
start_ms = int(float(start_time) * 1000)
|
101 |
+
end_ms = int(float(end_time) * 1000)
|
102 |
extract = original_audio[start_ms:end_ms]
|
|
|
103 |
|
104 |
+
segment_filename = f"{Path(audio_path).stem}_{seg_id}.wav"
|
105 |
+
segment_filepath = os.path.join(TEMP_DIR, segment_filename)
|
106 |
+
extract.export(segment_filepath, format="wav")
|
107 |
+
|
108 |
+
segment_paths.append(segment_filepath)
|
109 |
+
|
110 |
+
# Stocker la méta (pour le futur CSV)
|
111 |
updated_metadata.append({
|
112 |
"audio_file": segment_filename,
|
113 |
+
"text": segment_text,
|
114 |
"start_time": start_time,
|
115 |
"end_time": end_time,
|
116 |
+
"id": seg_id
|
117 |
})
|
118 |
|
119 |
+
# Mettre à jour le State
|
120 |
+
# (Dans Gradio, on renvoie la nouvelle valeur)
|
121 |
+
return segment_paths, updated_metadata
|
122 |
+
|
123 |
|
124 |
+
# ------------------------
|
125 |
+
# Étape 7 : Génération du ZIP (avec metadata.csv)
|
126 |
+
# ------------------------
|
127 |
def generate_zip(metadata_state):
|
128 |
+
"""
|
129 |
+
1) Crée le fichier 'metadata.csv'
|
130 |
+
2) Zip tous les extraits + metadata.csv
|
131 |
+
3) Renvoie le chemin du ZIP pour téléchargement
|
132 |
+
"""
|
133 |
if not metadata_state:
|
134 |
return None
|
135 |
|
136 |
zip_path = os.path.join(TEMP_DIR, "dataset.zip")
|
137 |
+
# Supprimer le zip précédent si existe
|
138 |
if os.path.exists(zip_path):
|
139 |
os.remove(zip_path)
|
140 |
|
141 |
+
# Créer le CSV
|
142 |
metadata_csv_path = os.path.join(TEMP_DIR, "metadata.csv")
|
143 |
with open(metadata_csv_path, "w", encoding="utf-8") as f:
|
144 |
f.write("audio_file|text|speaker_name|API\n")
|
145 |
for seg in metadata_state:
|
146 |
+
# Exemple de speaker_name et API
|
147 |
+
# A adapter selon tes besoins
|
148 |
line = f"{seg['audio_file']}|{seg['text']}|projectname|/API_PHONETIC/\n"
|
149 |
f.write(line)
|
150 |
|
151 |
+
# Créer le ZIP final
|
152 |
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
|
153 |
+
# Ajouter les segments
|
154 |
for seg in metadata_state:
|
155 |
+
segment_file = os.path.join(TEMP_DIR, seg["audio_file"])
|
156 |
+
if os.path.exists(segment_file):
|
157 |
+
zf.write(segment_file, seg["audio_file"])
|
158 |
+
# Ajouter le CSV
|
159 |
+
zf.write(metadata_csv_path, "metadata.csv")
|
160 |
|
161 |
return zip_path
|
162 |
|
163 |
+
|
164 |
+
# ------------------------
|
165 |
+
# Construction de l'interface Gradio
|
166 |
+
# ------------------------
|
167 |
with gr.Blocks(css="style.css") as demo:
|
168 |
+
gr.Markdown("# Application de découpe audio + Transcription Whisper")
|
169 |
|
170 |
+
# State pour conserver l'audio path + la liste de metadata
|
171 |
metadata_state = gr.State(init_metadata_state())
|
172 |
|
173 |
+
# Étape 1 : Choix de l'audio
|
174 |
+
with gr.Column():
|
175 |
+
gr.Markdown("### 1. Téléversez votre fichier audio (MP3/WAV)")
|
176 |
+
audio_input = gr.Audio(
|
177 |
+
type="filepath", label="Fichier audio"
|
178 |
+
)
|
179 |
+
|
180 |
+
# Étape 3 : Affichage transcription brute
|
181 |
+
raw_transcription = gr.Textbox(
|
182 |
+
label="Transcription brute (générée par Whisper)",
|
183 |
+
placeholder="La transcription apparaîtra ici",
|
184 |
+
interactive=False
|
185 |
+
)
|
186 |
|
187 |
+
# Étape 4 : Tableau pour choisir les extraits
|
188 |
+
gr.Markdown("### 2. Définissez vos segments")
|
189 |
+
gr.Markdown("**Colonne 1** : Le texte (copiez-coller depuis la transcription si besoin)
|
190 |
+
**Colonne 2** : Temps de début (en secondes)
|
191 |
+
**Colonne 3** : Temps de fin (en secondes)
|
192 |
+
**Colonne 4** : ID de votre segment (optionnel).")
|
193 |
table = gr.Dataframe(
|
194 |
headers=["Texte", "Début (s)", "Fin (s)", "ID"],
|
195 |
datatype=["str", "number", "number", "str"],
|
196 |
+
row_count=5,
|
197 |
+
col_count=4
|
198 |
)
|
|
|
|
|
|
|
|
|
|
|
199 |
validate_button = gr.Button("Valider et générer les extraits")
|
|
|
200 |
|
201 |
+
# Étape 6 : Écoute des extraits
|
202 |
+
# On prévoit jusqu'à 5 extraits (correspondant à row_count=5)
|
203 |
+
# Si tu prévois plus, augmente simplement ce bloc
|
204 |
+
audio_player_1 = gr.Audio(label="Extrait 1", interactive=False)
|
205 |
+
audio_player_2 = gr.Audio(label="Extrait 2", interactive=False)
|
206 |
+
audio_player_3 = gr.Audio(label="Extrait 3", interactive=False)
|
207 |
+
audio_player_4 = gr.Audio(label="Extrait 4", interactive=False)
|
208 |
+
audio_player_5 = gr.Audio(label="Extrait 5", interactive=False)
|
209 |
+
|
210 |
+
# Pour la sortie 'Valider segments', on veut 2 retours :
|
211 |
+
# - la liste des chemins (jusqu'à 5)
|
212 |
+
# - la metadata mise à jour
|
213 |
+
# => On va mapper ces chemins sur les 5 players
|
214 |
+
def update_audio_players(segments):
|
215 |
+
"""
|
216 |
+
Prend la liste des chemins d'extraits audio (taille variable)
|
217 |
+
et renvoie une liste de 5 valeurs (None si pas d'extrait)
|
218 |
+
"""
|
219 |
+
max_slots = 5
|
220 |
+
audio_values = [None]*max_slots
|
221 |
+
for i, seg in enumerate(segments):
|
222 |
+
if i < max_slots:
|
223 |
+
audio_values[i] = seg
|
224 |
+
return tuple(audio_values)
|
225 |
+
|
226 |
+
# Étape 8 : Génération ZIP
|
227 |
+
generate_button = gr.Button("Générer le fichier ZIP")
|
228 |
+
zip_file = gr.File(label="Télécharger le ZIP")
|
229 |
+
|
230 |
+
# ------------------------
|
231 |
+
# Logique de callbacks
|
232 |
+
# ------------------------
|
233 |
+
|
234 |
+
# 1) Callback quand on charge l'audio
|
235 |
audio_input.change(
|
236 |
fn=transcribe_audio,
|
237 |
inputs=audio_input,
|
238 |
+
outputs=[raw_transcription, table, audio_input], # On renvoie le path en 3e
|
|
|
|
|
|
|
|
|
|
|
|
|
239 |
)
|
240 |
|
241 |
+
# 2) Callback quand on valide les segments => on découpe
|
242 |
validate_button.click(
|
243 |
fn=validate_segments,
|
244 |
inputs=[audio_input, table, metadata_state],
|
245 |
+
outputs=[ # On reçoit segment_paths, updated_metadata
|
246 |
+
[audio_player_1, audio_player_2, audio_player_3, audio_player_4, audio_player_5],
|
247 |
+
metadata_state
|
248 |
+
],
|
249 |
+
_js="(p) => { /* rien en JS */ }",
|
250 |
+
# On va quand même mapper les segments sur 5 players python-side
|
251 |
+
post_process=True
|
252 |
+
).then(
|
253 |
+
fn=update_audio_players,
|
254 |
+
inputs=None, # Pas besoin : la sortie segments est déjà captée par les players
|
255 |
+
outputs=[audio_player_1, audio_player_2, audio_player_3, audio_player_4, audio_player_5],
|
256 |
)
|
257 |
|
258 |
+
# 3) Callback pour générer le ZIP
|
259 |
+
generate_button.click(
|
260 |
fn=generate_zip,
|
261 |
inputs=metadata_state,
|
262 |
+
outputs=zip_file
|
263 |
)
|
264 |
|
265 |
demo.queue().launch()
|