Update app.py
app.py (CHANGED)
@@ -2,21 +2,18 @@ import os
 import shutil
 import zipfile
 from pathlib import Path
-from datetime import datetime

 import gradio as gr
 import torch  # kept: still needed by torch.cuda.is_available() below
 from pydub import AudioSegment
 from transformers import pipeline

-#
-# Configuration
-#
+# -------------------------------------------------
+# Configuration
+# -------------------------------------------------

 MODEL_NAME = "openai/whisper-large-v3"
 device = 0 if torch.cuda.is_available() else "cpu"

-# Whisper pipeline for transcription
 pipe = pipeline(
     task="automatic-speech-recognition",
     model=MODEL_NAME,
@@ -24,237 +21,192 @@ pipe = pipeline(
     model_kwargs={"low_cpu_mem_usage": True},
 )

-# Temporary folder for storing extracts & the ZIP
 TEMP_DIR = "./temp_audio"
 os.makedirs(TEMP_DIR, exist_ok=True)

-#
-#
-#
+# -------------------------------------------------
+# State management
+# -------------------------------------------------
 def init_metadata_state():
+    """
+    Initialize the state that stores the validated segments' information.
+    """
     return []

-#
-# Step 2: Transcription
-#
+# -------------------------------------------------
+# Step 2: Transcription with Whisper
+# -------------------------------------------------
 def transcribe_audio(audio_path):
     """
-    …
-    2) Returns the raw transcription and a pre-filled table
-       (the user can fill in the 'Texte' column manually if they wish)
+    Return the raw transcription, a table of empty segment rows, and the audio path.
     """
     if not audio_path:
-        return "Aucun fichier audio fourni", [], None
+        return "Aucun fichier audio fourni", [["", None, None, ""] for _ in range(20)], None

+    # Transcribe
     result = pipe(audio_path, return_timestamps="word")
     text = result["text"]
-    chunks = result["chunks"]  # list of { 'timestamp': (start, end), 'text': ... }

     # Raw transcription
-    raw_transcription = " ".join([…
+    raw_transcription = " ".join([chunk["text"] for chunk in result["chunks"]])
+
+    # Table of 20 empty rows for editing
+    table_init = [["", None, None, ""] for _ in range(20)]
+
+    return raw_transcription, table_init, audio_path

-#
-# Step 5: Validation
-#
-def …
+# -------------------------------------------------
+# Step 5: Validation + cutting
+# -------------------------------------------------
+def validate_segments(audio_path, table_data, metadata_state):
     """
-    …
-    … the timestamps (`start_time` and `end_time`) for rows without values.
+    Cut the audio according to the validated segments and update the state.
     """
     if not audio_path:
         return [None] * 20 + [metadata_state]  # one value per output (20 players + state)

-    # Load the full audio
-    original_audio = AudioSegment.from_file(audio_path)
-    audio_duration = len(original_audio) / 1000  # total duration in seconds
-
     # Clean the temporary folder
     if os.path.exists(TEMP_DIR):
         shutil.rmtree(TEMP_DIR)
     os.makedirs(TEMP_DIR, exist_ok=True)

+    original_audio = AudioSegment.from_file(audio_path)
+
     segment_paths = []
     updated_metadata = []

-    # Last end time used (to compute the following ones)
-    last_end_time = 0.0
-
     for i, row in enumerate(table_data):
-        …
+        if len(row) < 4:
+            continue  # incomplete row
+        text, start_time, end_time, segment_id = row
+
+        if not text or start_time is None or end_time is None:
+            continue  # empty or incomplete row
+        if not segment_id:
+            segment_id = f"seg_{i+1:02d}"
+
+        start_ms = int(float(start_time) * 1000)
+        end_ms = int(float(end_time) * 1000)
+        if start_ms < 0 or end_ms <= start_ms:
             continue

-        start_time = row[1] if row[1] is not None else last_end_time
-        end_time = row[2] if row[2] is not None else min(audio_duration, start_time + 5)  # 5 s segment by default
-
-        # Adjust `end_time` if necessary
-        if end_time <= start_time:
-            end_time = min(audio_duration, start_time + 5)  # minimum 5 s
-
-        seg_id = row[3] if row[3] else f"seg_{i+1:02d}"  # generate an ID if missing
-
-        # Convert to milliseconds
-        start_ms = int(start_time * 1000)
-        end_ms = int(end_time * 1000)
-
-        # Cut the audio
-        segment_filename = f"{Path(audio_path).stem}_{seg_id}.wav"
+        segment_filename = f"{Path(audio_path).stem}_{segment_id}.wav"
         segment_path = os.path.join(TEMP_DIR, segment_filename)
+
+        # Cut and export
         extract = original_audio[start_ms:end_ms]
         extract.export(segment_path, format="wav")

-        # Store the metadata (for metadata_state)
+        # Store the segment info
+        segment_paths.append(segment_path)
         updated_metadata.append({
             "audio_file": segment_filename,
-            "text": …,
+            "text": text,
             "start_time": start_time,
             "end_time": end_time,
-            "id": …,
+            "id": segment_id,
         })

-    # Fill the 20 slots if fewer than 20 segments
+    # Fill the audio outputs (20 max)
     output_paths = segment_paths + [None] * (20 - len(segment_paths))

     # One value per output component (20 players + the state)
     return output_paths + [updated_metadata]

-#
-#
+# -------------------------------------------------
+# Step 8: ZIP generation
+# -------------------------------------------------
 def generate_zip(metadata_state):
     """
-    …
-    2) Zips all the extracts + metadata.csv
-    3) Returns the ZIP path for download
+    Generate a ZIP file containing the audio segments and a metadata.csv file.
     """
     if not metadata_state:
         return None

     zip_path = os.path.join(TEMP_DIR, "dataset.zip")
-    # Remove the previous zip if it exists
     if os.path.exists(zip_path):
         os.remove(zip_path)

-    # Create the …
+    # Create the metadata.csv file
     metadata_csv_path = os.path.join(TEMP_DIR, "metadata.csv")
     with open(metadata_csv_path, "w", encoding="utf-8") as f:
         f.write("audio_file|text|speaker_name|API\n")
         for seg in metadata_state:
-            # Example speaker_name and API values
-            # Adapt to your needs
             line = f"{seg['audio_file']}|{seg['text']}|projectname|/API_PHONETIC/\n"
             f.write(line)

+    # Add the files to the ZIP
     with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
-        # Add the segments
-        for seg in metadata_state:
-            segment_file = os.path.join(TEMP_DIR, seg["audio_file"])
-            if os.path.exists(segment_file):
-                zf.write(segment_file, seg["audio_file"])
-        # Add the CSV
         zf.write(metadata_csv_path, "metadata.csv")
+        for seg in metadata_state:
+            file_path = os.path.join(TEMP_DIR, seg["audio_file"])
+            if os.path.exists(file_path):
+                zf.write(file_path, seg["audio_file"])

     return zip_path

-# ------------------------
+# -------------------------------------------------
 # Building the Gradio interface
-#
+# -------------------------------------------------
 with gr.Blocks(css="style.css") as demo:
-    gr.Markdown("# Application de …
+    gr.Markdown("# Application de Découpe Audio (jusqu'à 20 segments)")

-    # State
+    # Global state
     metadata_state = gr.State(init_metadata_state())

-    # Step 1:
+    # Step 1: audio upload
     with gr.Column():
-        gr.Markdown("### 1. Téléversez …
-        audio_input = gr.Audio(
-            type="filepath", label="Fichier audio"
-        )
+        gr.Markdown("### 1. Téléversez un fichier audio (MP3/WAV)")
+        audio_input = gr.Audio(source="upload", type="filepath", label="Fichier audio")

-        # Step …
+        # Step 2: raw transcription
         raw_transcription = gr.Textbox(
-            label="Transcription …
-            placeholder="…
-            interactive=False
+            label="Transcription (Whisper)",
+            placeholder="Le texte apparaîtra ici après chargement.",
+            interactive=False,
         )

-        # Step …
+        # Step 3: segments table
         gr.Markdown("### 2. Définissez vos segments")
-        gr.Markdown("**Colonne 1** : Le texte (copiez-coller depuis la transcription si besoin) **Colonne 2** : Temps de début (en secondes) **Colonne 3** : Temps de fin (en secondes) **Colonne 4** : ID de votre segment (optionnel).")
         table = gr.Dataframe(
             headers=["Texte", "Début (s)", "Fin (s)", "ID"],
             datatype=["str", "number", "number", "str"],
-            row_count=…,
-            col_count=4
+            row_count=20,
+            col_count=4,
         )
+
+        # Validation button
         validate_button = gr.Button("Valider et générer les extraits")

-        audio_player_1 = gr.Audio(label="Extrait 1", interactive=False)
-        audio_player_2 = gr.Audio(label="Extrait 2", interactive=False)
-        audio_player_3 = gr.Audio(label="Extrait 3", interactive=False)
-        audio_player_4 = gr.Audio(label="Extrait 4", interactive=False)
-        audio_player_5 = gr.Audio(label="Extrait 5", interactive=False)
-
-        # For the 'Valider segments' output we want 2 return values:
-        # - the list of paths (up to 5)
-        # - the updated metadata
-        # => We map these paths onto the 5 players
-        def update_audio_players(segments):
-            """
-            Takes the list of audio extract paths (variable size)
-            and returns a list of 5 values (None if no extract)
-            """
-            max_slots = 5
-            audio_values = [None]*max_slots
-            for i, seg in enumerate(segments):
-                if i < max_slots:
-                    audio_values[i] = seg
-            return tuple(audio_values)
-
-        # Step 8: ZIP generation
+        # 20 audio players
+        audio_players = [gr.Audio(label=f"Extrait {i+1}", interactive=False) for i in range(20)]
+
+        # Button to generate the ZIP file
         generate_button = gr.Button("Générer le fichier ZIP")
         zip_file = gr.File(label="Télécharger le ZIP")

-        #
-        #
-        #
+        # ----------------
+        # Callbacks
+        # ----------------

-        # 1
+        # Step 1: audio transcription
         audio_input.change(
             fn=transcribe_audio,
             inputs=audio_input,
             outputs=[raw_transcription, table, audio_input],
         )

+        # Step 5: segment validation
         validate_button.click(
-            fn=…,
+            fn=validate_segments,
             inputs=[audio_input, table, metadata_state],
-            outputs=[
-                [audio_player_1, audio_player_2, audio_player_3, audio_player_4, audio_player_5],
-                metadata_state
-            ]
+            outputs=audio_players + [metadata_state],
         )

-        # Step …
+        # Step 8: ZIP file generation
         generate_button.click(
             fn=generate_zip,
             inputs=metadata_state,
-            outputs=zip_file
+            outputs=zip_file,
         )

     demo.queue().launch()
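
Note on the transcription step: with return_timestamps="word", the transformers ASR pipeline returns the full text plus per-word chunks, each carrying a (start, end) timestamp tuple, and the new transcribe_audio simply joins the chunk texts. A minimal sketch of that output shape and of how a segment row could be pre-filled from the timestamps; sample_result below is made-up illustrative data, not real pipeline output:

# Illustrative shape of the ASR pipeline result when return_timestamps="word"
# is passed (sample_result is fabricated for the example).
sample_result = {
    "text": " bonjour tout le monde",
    "chunks": [
        {"text": " bonjour", "timestamp": (0.0, 0.6)},
        {"text": " tout", "timestamp": (0.7, 0.9)},
        {"text": " le", "timestamp": (0.9, 1.0)},
        {"text": " monde", "timestamp": (1.0, 1.4)},
    ],
}

# The same join the app performs to build the raw transcription.
raw_transcription = " ".join(chunk["text"] for chunk in sample_result["chunks"])

# One way to pre-fill a (text, start, end, id) table row from the word
# timestamps, here as a single row spanning the whole utterance; the app
# currently leaves the table rows empty for the user to fill in.
start = sample_result["chunks"][0]["timestamp"][0]
end = sample_result["chunks"][-1]["timestamp"][1]
row = [raw_transcription.strip(), start, end, "seg_01"]
print(row)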
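
Downstream, the generated dataset.zip holds the WAV extracts alongside a pipe-delimited metadata.csv whose header is audio_file|text|speaker_name|API. A minimal sketch of reading the archive back, assuming it was downloaded locally as dataset.zip:

import csv
import io
import zipfile

# Open the archive produced by generate_zip (the local path is an assumption).
with zipfile.ZipFile("dataset.zip") as zf:
    with zf.open("metadata.csv") as raw:
        reader = csv.DictReader(io.TextIOWrapper(raw, encoding="utf-8"), delimiter="|")
        for entry in reader:
            # Each row points at one WAV extract stored in the same archive.
            print(entry["audio_file"], "->", entry["text"])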