Woziii committed on
Commit
3d786ab
·
verified ·
1 Parent(s): ec37ecb

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +148 -106
app.py CHANGED
@@ -7,9 +7,7 @@ import gradio as gr
7
  from pydub import AudioSegment
8
  from transformers import pipeline
9
 
10
- # -------------------------------------------------
11
  # Configuration
12
- # -------------------------------------------------
13
  MODEL_NAME = "openai/whisper-large-v3"
14
  device = "cuda" if torch.cuda.is_available() else "cpu"
15
 
@@ -28,137 +26,181 @@ def init_metadata_state():
28
 
29
  def transcribe_audio(audio_path):
30
  if not audio_path:
31
- return "Aucun fichier audio fourni", [], None, []
32
 
33
- result = pipe(audio_path, return_timestamps="word", generate_kwargs={"language": "french"})
34
- words = result.get("chunks", [])
35
 
36
- if not words:
37
- return "Erreur lors de la récupération des timestamps", [], None, []
38
 
39
- raw_transcription = " ".join([w["text"] for w in words])
40
- word_timestamps = [(w["text"], w["timestamp"]) for w in words]
 
41
 
42
- return raw_transcription, [], audio_path, word_timestamps
43
-
44
- def preprocess_segments(table_data, word_timestamps):
45
- formatted_data = []
46
 
47
- for i, row in enumerate(table_data):
48
- if not row or len(row) < 1 or not row[0].strip():
49
- continue
50
-
51
- text = row[0].strip()
52
- segment_id = f"seg_{i+1:02d}"
53
-
54
- matching_timestamps = [
55
- (start, end) for word, (start, end) in word_timestamps if word in text
56
- ]
57
-
58
- if matching_timestamps:
59
- start_time, end_time = matching_timestamps[0]
60
- else:
61
- start_time, end_time = None, None
62
-
63
- formatted_data.append([text, start_time, end_time, segment_id])
64
-
65
- return formatted_data
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
66
 
67
- def validate_segments(audio_path, table_data, metadata_state, word_timestamps):
68
- if not audio_path or not word_timestamps:
69
  return [], metadata_state
70
 
71
  if os.path.exists(TEMP_DIR):
72
  shutil.rmtree(TEMP_DIR)
73
  os.makedirs(TEMP_DIR, exist_ok=True)
74
 
75
- original_audio = AudioSegment.from_file(audio_path)
76
- segment_paths = []
77
- updated_metadata = []
78
-
79
- for text, start_time, end_time, segment_id in table_data:
80
- if start_time is None or end_time is None:
81
- continue
82
-
83
- start_ms, end_ms = int(float(start_time) * 1000), int(float(end_time) * 1000)
84
- if start_ms < 0 or end_ms <= start_ms:
85
- continue
86
-
87
- segment_filename = f"{Path(audio_path).stem}_{segment_id}.wav"
88
- segment_path = os.path.join(TEMP_DIR, segment_filename)
89
-
90
- extract = original_audio[start_ms:end_ms]
91
- extract.export(segment_path, format="wav")
92
-
93
- segment_paths.append(segment_path)
94
- updated_metadata.append({
95
- "audio_file": segment_filename,
96
- "text": text,
97
- "start_time": start_time,
98
- "end_time": end_time,
99
- "id": segment_id,
100
- })
101
-
102
- return segment_paths, updated_metadata
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
103
 
104
  def generate_zip(metadata_state):
105
  if not metadata_state:
106
  return None
107
 
108
- zip_path = os.path.join(TEMP_DIR, "dataset.zip")
109
- if os.path.exists(zip_path):
110
- os.remove(zip_path)
111
-
112
- metadata_csv_path = os.path.join(TEMP_DIR, "metadata.csv")
113
- with open(metadata_csv_path, "w", encoding="utf-8") as f:
114
- f.write("audio_file|text|speaker_name|API\n")
115
- for seg in metadata_state:
116
- f.write(f"{seg['audio_file']}|{seg['text']}|projectname|/API_PHONETIC/\n")
117
-
118
- with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
119
- zf.write(metadata_csv_path, "metadata.csv")
120
- for seg in metadata_state:
121
- file_path = os.path.join(TEMP_DIR, seg["audio_file"])
122
- if os.path.exists(file_path):
123
- zf.write(file_path, seg["audio_file"])
124
-
125
- return zip_path
 
 
 
 
126
 
127
- # -------------------------------------------------
128
  # Interface Gradio
129
- # -------------------------------------------------
130
  with gr.Blocks() as demo:
131
  gr.Markdown("# Application de Découpe Audio")
132
  metadata_state = gr.State(init_metadata_state())
133
- extracted_segments = gr.State([])
134
 
135
- audio_input = gr.Audio(type="filepath", label="Fichier audio")
136
- raw_transcription = gr.Textbox(label="Transcription", interactive=False)
137
- table = gr.Dataframe(headers=["Texte"], datatype=["str"], row_count=(1, "dynamic"), col_count=1)
138
- validate_button = gr.Button("Valider")
139
- generate_button = gr.Button("Générer ZIP")
 
 
 
 
 
 
 
 
 
 
 
 
140
  zip_file = gr.File(label="Télécharger le ZIP")
141
 
142
- word_timestamps = gr.State()
143
-
144
- audio_input.change(transcribe_audio, inputs=audio_input, outputs=[raw_transcription, table, audio_input, word_timestamps])
 
 
 
 
145
 
 
146
  validate_button.click(
147
- fn=lambda table_data, word_timestamps, audio_path, metadata_state: validate_segments(
148
- audio_path, preprocess_segments(table_data, word_timestamps), metadata_state, word_timestamps
149
- ),
150
- inputs=[table, word_timestamps, audio_input, metadata_state],
151
- outputs=[extracted_segments, metadata_state],
152
  )
153
-
154
- @gr.render(inputs=extracted_segments)
155
- def show_audio_excerpts(segments):
156
- if not segments:
157
- gr.Markdown("Aucun extrait généré.")
158
- else:
159
- for i, seg in enumerate(segments):
160
- gr.Audio(label=f"Extrait {i+1}", value=seg)
161
-
162
  generate_button.click(generate_zip, inputs=metadata_state, outputs=zip_file)
163
 
164
- demo.queue().launch()
 
7
  from pydub import AudioSegment
8
  from transformers import pipeline
9
 
 
10
  # Configuration
 
11
  MODEL_NAME = "openai/whisper-large-v3"
12
  device = "cuda" if torch.cuda.is_available() else "cpu"
13
 
 
26
 
27
def transcribe_audio(audio_path):
    """Transcribe an audio file with Whisper and pre-segment it into phrases.

    Parameters
    ----------
    audio_path : str or None
        Path of the audio file selected in the Gradio UI.

    Returns
    -------
    tuple (text, table_data, audio_path)
        text : full transcription (or a French error message when no file given)
        table_data : list of ``[text, start_s, end_s, segment_id]`` rows,
            padded with empty rows up to 5 for the Dataframe widget
        audio_path : echoed back so the Audio component keeps its value
    """
    if not audio_path:
        return "Aucun fichier audio fourni", [], None

    # Word-level timestamps so the segments can later be cut precisely.
    result = pipe(audio_path, return_timestamps="word")
    text = result["text"]
    chunks = result["chunks"]

    table_data = []
    current_segment = []
    current_start = None

    for chunk in chunks:
        start, end = chunk["timestamp"]

        # BUGFIX: compare with `is None` — a legitimate 0.0 start time is
        # falsy, so the old `if not current_start:` kept overwriting the
        # segment's true start with each word's start.
        if current_start is None:
            current_start = start

        current_segment.append(chunk["text"])

        # Close the segment on end-of-sentence punctuation or once it
        # grows past ~100 characters.
        if any(punct in chunk["text"] for punct in ".!?") or len(" ".join(current_segment)) > 100:
            segment_text = " ".join(current_segment).strip()
            if segment_text:
                table_data.append([
                    segment_text,
                    round(current_start, 2),
                    # BUGFIX: Whisper may emit a None end timestamp on the
                    # last word; fall back to the segment start instead of
                    # crashing in round().
                    round(end if end is not None else current_start, 2),
                    f"seg_{len(table_data)+1:02d}",
                ])
            current_segment = []
            current_start = None

    # Flush the trailing segment, if any.
    if current_segment:
        last_end = chunks[-1]["timestamp"][1]
        segment_text = " ".join(current_segment).strip()
        if segment_text:
            table_data.append([
                segment_text,
                round(current_start, 2),
                round(last_end if last_end is not None else current_start, 2),
                f"seg_{len(table_data)+1:02d}",
            ])

    # The Dataframe widget is laid out for at least 5 rows.
    while len(table_data) < 5:
        table_data.append(["", None, None, ""])

    return text, table_data, audio_path
81
 
82
def validate_segments(audio_path, table_data, metadata_state):
    """Cut the source audio into the segments described by the table.

    Parameters
    ----------
    audio_path : str or None
        Path of the original audio file.
    table_data : list
        Rows of ``[text, start_s, end_s, segment_id]`` from the Dataframe.
    metadata_state : list
        Current metadata; returned unchanged when validation fails.

    Returns
    -------
    tuple (segment_paths, updated_metadata)
        WAV paths written under TEMP_DIR and the matching metadata dicts;
        ``([], metadata_state)`` when there is no audio or on a fatal error.
    """
    if not audio_path:
        return [], metadata_state

    # Start from a clean temp directory on every validation pass.
    if os.path.exists(TEMP_DIR):
        shutil.rmtree(TEMP_DIR)
    os.makedirs(TEMP_DIR, exist_ok=True)

    try:
        original_audio = AudioSegment.from_file(audio_path)
        segment_paths = []
        updated_metadata = []

        for i, row in enumerate(table_data):
            # Skip padding rows and malformed entries.
            if not row or len(row) < 4:
                continue

            text, start_time, end_time, segment_id = row

            if not text or start_time is None or end_time is None:
                continue

            if not segment_id:
                segment_id = f"seg_{i+1:02d}"

            # BUGFIX: a non-numeric cell (the Dataframe is user-editable)
            # used to raise inside the outer try and abort the whole batch;
            # now only the offending row is skipped.
            try:
                start_ms = int(float(start_time) * 1000)
                end_ms = int(float(end_time) * 1000)
            except (TypeError, ValueError):
                print(f"Invalid timestamps for segment {segment_id}: {start_time}-{end_time}")
                continue

            if start_ms < 0 or end_ms <= start_ms or end_ms > len(original_audio):
                print(f"Invalid timestamps for segment {segment_id}: {start_time}-{end_time}")
                continue

            segment_filename = f"{Path(audio_path).stem}_{segment_id}.wav"
            segment_path = os.path.join(TEMP_DIR, segment_filename)

            try:
                segment_audio = original_audio[start_ms:end_ms]
                segment_audio.export(segment_path, format="wav")

                segment_paths.append(segment_path)
                updated_metadata.append({
                    "audio_file": segment_filename,
                    "text": text,
                    "start_time": start_time,
                    "end_time": end_time,
                    "id": segment_id,
                })
            except Exception as e:
                # Best-effort: one bad export must not lose the other segments.
                print(f"Error processing segment {segment_id}: {str(e)}")
                continue

        return segment_paths, updated_metadata

    except Exception as e:
        print(f"Error in validate_segments: {str(e)}")
        return [], metadata_state
138
 
139
def generate_zip(metadata_state):
    """Bundle metadata.csv plus every segment WAV into TEMP_DIR/dataset.zip.

    Parameters
    ----------
    metadata_state : list of dict
        Segment descriptors with at least ``audio_file`` and ``text`` keys.

    Returns
    -------
    str or None
        Path of the created archive, or None when there is no metadata
        or the archive could not be written.
    """
    if not metadata_state:
        return None

    try:
        zip_path = os.path.join(TEMP_DIR, "dataset.zip")
        if os.path.exists(zip_path):
            os.remove(zip_path)

        # Write the pipe-separated metadata file in one batched call.
        csv_lines = ["audio_file|text|speaker_name|API\n"]
        csv_lines.extend(
            f"{seg['audio_file']}|{seg['text']}|projectname|/API_PHONETIC/\n"
            for seg in metadata_state
        )
        metadata_csv_path = os.path.join(TEMP_DIR, "metadata.csv")
        with open(metadata_csv_path, "w", encoding="utf-8") as csv_file:
            csv_file.writelines(csv_lines)

        # Archive the CSV plus every segment file that actually exists.
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
            archive.write(metadata_csv_path, "metadata.csv")
            for seg in metadata_state:
                wav_path = os.path.join(TEMP_DIR, seg["audio_file"])
                if os.path.exists(wav_path):
                    archive.write(wav_path, seg["audio_file"])

        return zip_path
    except Exception as e:
        print(f"Error generating ZIP: {str(e)}")
        return None
165
 
 
166
  # Interface Gradio
 
167
# Top-level Gradio application: audio upload -> transcription table ->
# segment validation -> downloadable ZIP dataset.
with gr.Blocks() as demo:
    gr.Markdown("# Application de Découpe Audio")
    # Holds the list of validated segment metadata dicts across events.
    metadata_state = gr.State(init_metadata_state())

    with gr.Row():
        # `type="filepath"` so callbacks receive a path string, not raw audio.
        audio_input = gr.Audio(type="filepath", label="Fichier audio")
        raw_transcription = gr.Textbox(label="Transcription", interactive=False)

    # Editable table: one row per segment — text, start/end in seconds, id.
    table = gr.Dataframe(
        headers=["Texte", "Début (s)", "Fin (s)", "ID"],
        datatype=["str", "number", "number", "str"],
        row_count=5,
        col_count=4,
        interactive=True
    )

    with gr.Row():
        validate_button = gr.Button("Valider")
        generate_button = gr.Button("Générer ZIP")

    status_message = gr.Textbox(label="Status", interactive=False)
    zip_file = gr.File(label="Télécharger le ZIP")

    def process_validation(audio_path, table_data, metadata_state):
        # Wrapper around validate_segments that also produces a status line.
        # NOTE(review): on missing audio this returns an empty table, which
        # wipes the user's edits — confirm this is intended.
        if not audio_path:
            return [], [], "Aucun fichier audio fourni"

        segments, metadata = validate_segments(audio_path, table_data, metadata_state)
        status = f"{len(segments)} segments générés" if segments else "Aucun segment généré"
        # Table is echoed back unchanged; metadata_state is replaced.
        return table_data, metadata, status

    # New upload: transcribe and pre-fill the segment table.
    audio_input.change(transcribe_audio, inputs=audio_input, outputs=[raw_transcription, table, audio_input])
    # Validate: cut the audio into WAV segments and refresh state/status.
    validate_button.click(
        process_validation,
        inputs=[audio_input, table, metadata_state],
        outputs=[table, metadata_state, status_message]
    )
    # Package metadata.csv + segment WAVs into a downloadable archive.
    generate_button.click(generate_zip, inputs=metadata_state, outputs=zip_file)

demo.queue().launch()