import gradio as gr
from transformers import pipeline
import numpy as np
import os
from huggingface_hub import login
from scipy.io.wavfile import write
import uuid
import torch
import boto3
# from pyannote.audio import Model
# from pyannote.audio.pipelines import VoiceActivityDetection
import datetime
import time
import spaces

print("cuda", torch.cuda.is_available())

# Credentials are read from the environment (Hugging Face token and AWS keys).
access_token_read = os.environ.get('HF_TOKEN', None)
access_key = os.environ.get('access_key', None)
secret_access_key = os.environ.get('secret_access_key', None)
pannote = os.environ.get('pannote', None)

login(token=access_token_read)

# S3 session used to persist saved recordings and their transcripts.
session = boto3.Session(
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_access_key,
)
s3 = session.resource('s3')
BUCKET = "audio-text-938"

print("cur path", os.listdir(os.path.join("..", "..", "..")))

# Create the local cache and output directories if they do not exist yet.
if not os.path.isdir(os.path.join("..", "..", "..", "data", "hfcache")):
    os.mkdir(os.path.join("..", "..", "..", "data", "hfcache"))
if not os.path.isdir(os.path.join("..", "..", "..", "data", "audio")):
    os.mkdir(os.path.join("..", "..", "..", "data", "audio"))
if not os.path.isdir(os.path.join("..", "..", "..", "data", "audio_texts")):
    os.mkdir(os.path.join("..", "..", "..", "data", "audio_texts"))

os.environ["HF_HOME"] = os.path.join("..", "..", "..", "data", "hfcache")

# Whisper model fine-tuned for French medical dictation.
transcriber = pipeline(
    "automatic-speech-recognition",
    model='Simranjit/whisper-medical-french',
    chunk_length_s=30,
    device="cuda",
)

# Optional pyannote voice-activity-detection pipeline, needed by
# streaming_audio_transcribe_last_segment below.
# model = Model.from_pretrained("pyannote/segmentation",
#                               use_auth_token=pannote)
# vad_pipeline = VoiceActivityDetection(segmentation=model)
# HYPER_PARAMETERS = {
#     "onset": 0.5, "offset": 0.5,
#     "min_duration_on": 0.5,
#     "min_duration_off": 0.4
# }
# vad_pipeline.instantiate(HYPER_PARAMETERS)


def post_process(text):
    """Convert spoken French punctuation and layout commands into symbols."""
    text = text.replace("nouvelle ligne", "\n")
    text = text.replace("à la ligne", "\n")
    # Cover common spelling variants produced by the ASR model.
    text = text.replace("point d'intérogation", "?")
    text = text.replace("point d'intérrogation", "?")
    text = text.replace("point d'interrogation", "?")
    text = text.replace("point d'interogation", "?")
    text = text.replace(" virgule", ",")
    text = text.replace(" deux points", ":")
    text = text.replace(" point", ".")
    text = text.replace(" nouveau paragraphe ", "\n\n")
    text = text.replace(" paragraphe ", "\n\n")
    text = text.split("\n")
    text = [t.strip() for t in text]
    text = "\n".join(text)
    return text


def streaming_audio_transcribe_last_segment(stream, current_speaches, old_text, last_text, inp):
    """Transcribe only the most recent speech segment detected by the VAD.

    Requires the pyannote vad_pipeline above to be uncommented.
    """
    sr, y = inp
    y = y.astype(np.float32)
    peak = np.max(np.abs(y))
    if peak > 0:  # avoid division by zero on silent chunks
        y /= peak

    if stream is not None:
        stream = np.concatenate([stream, y])
    else:
        stream = y

    # Run voice activity detection on the accumulated waveform and parse the
    # last "[start --> end]" segment from its string representation.
    wave = torch.tensor(np.expand_dims(stream, axis=0))
    vad = vad_pipeline({"waveform": wave, "sample_rate": sr})
    vad_text = str(vad)
    start = vad_text.split("\n")
    start = [s for s in start if s != ""]
    if len(start) > 0:
        s = start[-1].split('-->')[0]
        s = s.replace("[", "")
        s = s.strip()
        s = time.strptime(s.split('.')[0], '%H:%M:%S')
        s = datetime.timedelta(hours=s.tm_hour, minutes=s.tm_min, seconds=s.tm_sec).total_seconds()
        s = s * 1000
        e = start[-1].split('-->')[1]
        e = e.split("]")[0]
        e = e.strip()
        ex = e
        e = time.strptime(e.split('.')[0], '%H:%M:%S')
        e = datetime.timedelta(hours=e.tm_hour, minutes=e.tm_min, seconds=e.tm_sec).total_seconds()
        # Add the fractional part plus 400 ms of trailing padding.
        e = (e * 1000) + float(ex.split('.')[1]) + 400
    else:
        s = 0
        e = 0

    # Only transcribe segments longer than 500 ms.
    if (e - s) > 500:
        start_seconds = s / 1000
        start_num = start_seconds * sr
        end_seconds = e / 1000
        end_num = end_seconds * sr
        segment = stream[int(start_num):int(end_num)]
        output = transcriber({"sampling_rate": sr, "raw": segment})
        text = output["text"]
    else:
        text = "start dictating"

    # When a new speech segment appears, freeze the previous segment's text.
    if len(start) > current_speaches:
        current_speaches = len(start)
        old_text = old_text + last_text
        last_text = text
    else:
        last_text = text

    all_text = old_text + text
    all_text = post_process(all_text)
    return stream, current_speaches, old_text, last_text, all_text


@spaces.GPU
def transcribe(state, audio):
    """Re-transcribe the full accumulated audio on every streamed chunk."""
    sr, y = audio
    y = y.astype(np.float32)
    peak = np.max(np.abs(y))
    if peak > 0:  # avoid division by zero on silent chunks
        y /= peak

    if state is not None:
        state = np.concatenate([state, y])
    else:
        state = y

    text = transcriber({"sampling_rate": sr, "raw": state})["text"]
    text = post_process(text)
    return state, text


def save_fn(audio, text):
    """Upload the recorded audio and its transcript to S3, then reset the UI."""
    sr, y = audio
    y = y.astype(np.float32)
    peak = np.max(np.abs(y))
    if peak > 0:  # avoid division by zero on silent recordings
        y /= peak

    uid = str(uuid.uuid4())
    with open(f"{uid}.txt", "w", encoding="utf-8") as f:
        f.write(text)
    s3.Bucket(BUCKET).upload_file(f"{uid}.txt", f"texts/{uid}.txt")  # local path, bucket path
    write(f"{uid}.wav", sr, y)
    s3.Bucket(BUCKET).upload_file(f"{uid}.wav", f"audios/{uid}.wav")  # local path, bucket path
    return [None, None, ""]


with gr.Blocks() as demo:
    state = gr.State(None)
    current_speaches = gr.State(1)
    old_text = gr.State("")
    last_text = gr.State("")
    audio = gr.Audio(streaming=True)
    text = gr.TextArea(show_copy_button=True)
    audio.stream(fn=transcribe, inputs=[state, audio], outputs=[state, text])
    # audio.stream(fn=streaming_audio_transcribe_last_segment,
    #              inputs=[state, current_speaches, old_text, last_text, audio],
    #              outputs=[state, current_speaches, old_text, last_text, text])
    save = gr.Button("save")
    save.click(fn=save_fn, inputs=[audio, text], outputs=[state, audio, text])

demo.launch(share=True)