Spaces:
Runtime error
Runtime error
#! /usr/bin/env python | |
# coding=utf-8 | |
# Copyright 2023 Bofeng Huang | |
import datetime | |
import logging | |
import os | |
import re | |
import warnings | |
import gradio as gr | |
import librosa | |
# import nltk | |
import pandas as pd | |
import psutil | |
import pytube as pt | |
import torch | |
# import torchaudio | |
from transformers import pipeline, Wav2Vec2ProcessorWithLM, AutoModelForCTC | |
from transformers.utils.logging import disable_progress_bar | |
# nltk.download("punkt") | |
# from nltk.tokenize import sent_tokenize | |
# Silence every Python warning and the transformers progress bars.
warnings.filterwarnings("ignore")
disable_progress_bar()

# Checkpoint loaded at start-up and used as the default for all transcription callbacks.
DEFAULT_MODEL_NAME = "bofenghuang/asr-wav2vec2-ctc-french"
# Sampling rate (Hz) that audio is resampled to before inference.
SAMPLE_RATE = 16_000
# Keyword arguments forwarded to every pipeline call in `infer`.
GEN_KWARGS = {
    "chunk_length_s": 30,
    "stride_length_s": 5,
}

# NOTE(review): the trailing "Z" in datefmt suggests UTC, but `logging` formats
# local time unless the formatter's converter is changed - confirm the host timezone.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%SZ",
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Use the first CUDA device when one is available, otherwise the CPU.
# device = 0 if torch.cuda.is_available() else "cpu"
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
logger.info(f"Model will be loaded on device `{device}`")

# Maps a model name to its already-built pipeline (see `maybe_load_cached_pipeline`).
cached_models = {}
def _return_yt_html_embed(yt_url): | |
video_id = yt_url.split("?v=")[-1] | |
HTML_str = ( | |
f'<center> <iframe width="500" height="320" src="https://www.youtube.com/embed/{video_id}"> </iframe>' | |
" </center>" | |
) | |
return HTML_str | |
def download_audio_from_youtube(yt_url, downloaded_filename="audio.wav"):
    """Download an audio-only stream of a YouTube video.

    Args:
        yt_url: URL of the YouTube video.
        downloaded_filename: File name passed to the stream's ``download`` call.

    Returns:
        ``downloaded_filename``, unchanged.
    """
    audio_only_streams = pt.YouTube(yt_url).streams.filter(only_audio=True)
    # Use the first audio-only stream in the listing.
    audio_only_streams[0].download(filename=downloaded_filename)
    return downloaded_filename
def download_video_from_youtube(yt_url, downloaded_filename="video.mp4"):
    """Download the highest-resolution progressive mp4 stream of a YouTube video.

    Args:
        yt_url: URL of the YouTube video.
        downloaded_filename: File name passed to the stream's ``download`` call.

    Returns:
        ``downloaded_filename``, unchanged.

    Raises:
        ValueError: If the video exposes no progressive mp4 stream.
    """
    yt = pt.YouTube(yt_url)
    stream = yt.streams.filter(progressive=True, file_extension="mp4").order_by("resolution").desc().first()
    # `first()` yields None when the query is empty; fail with a clear message
    # instead of an AttributeError on `None.download`.
    if stream is None:
        raise ValueError(f"No progressive mp4 stream is available for {yt_url}")
    stream.download(filename=downloaded_filename)
    logger.info(f"Download YouTube video from {yt_url}")
    return downloaded_filename
def _print_memory_info():
    """Log available, used (percent) and total system memory."""
    bytes_per_gb = 1024 ** 3
    stats = psutil.virtual_memory()
    free_gb = stats.available / bytes_per_gb
    total_gb = stats.total / bytes_per_gb
    logger.info(f"Memory info - Free: {free_gb:.2f} Gb, used: {stats.percent}%, total: {total_gb:.2f} Gb")
def _print_cuda_memory_info():
    """Log free, used and total memory of the current CUDA device."""
    bytes_per_gb = 1024 ** 3
    # `mem_get_info` reports (free, total) in bytes; used is derived from them.
    free_bytes, total_bytes = torch.cuda.mem_get_info()
    free_gb = free_bytes / bytes_per_gb
    used_gb = (total_bytes - free_bytes) / bytes_per_gb
    total_gb = total_bytes / bytes_per_gb
    logger.info(f"CUDA memory info - Free: {free_gb:.2f} Gb, used: {used_gb:.2f} Gb, total: {total_gb:.2f} Gb")
def print_memory_info():
    """Log system memory usage, plus CUDA memory usage when CUDA is available."""
    _print_memory_info()
    if not torch.cuda.is_available():
        return
    _print_cuda_memory_info()
def maybe_load_cached_pipeline(model_name):
    """Return the pipeline for ``model_name``, building and caching it on first use.

    Args:
        model_name: Model identifier passed to ``pipeline(model=...)``.

    Returns:
        The pipeline stored in ``cached_models`` for ``model_name``.
    """
    pipe = cached_models.get(model_name)
    if pipe is None:
        pipe = pipeline(model=model_name, device=device)
        logger.info(f"`{model_name}` has been loaded on device `{device}`")
        print_memory_info()
        cached_models[model_name] = pipe
    # Return the pipeline on a cache miss as well; previously the first call
    # for a model returned None.
    return pipe
def process_audio_file(audio_file):
    """Load an audio file as a mono waveform sampled at ``SAMPLE_RATE``.

    Args:
        audio_file: Path of the audio file to read.

    Returns:
        The waveform returned by ``librosa.load``.
    """
    # Ask librosa for the target rate directly. Without `sr`, librosa first
    # resamples to its own default rate, so the audio was resampled twice.
    waveform, _ = librosa.load(audio_file, sr=SAMPLE_RATE, mono=True)
    return waveform
def infer(model, filename, return_df=False):
    """Transcribe an audio file with ``model``.

    Args:
        model: Callable pipeline; called with the waveform and ``GEN_KWARGS``.
        filename: Path of the audio file to transcribe.
        return_df: When true, wrap the transcription in a one-row DataFrame
            with a single ``"text"`` column.

    Returns:
        The transcription as a string, or as a DataFrame when ``return_df`` is true.
    """
    waveform = process_audio_file(filename)
    transcription = model(waveform, **GEN_KWARGS)["text"]
    if not return_df:
        return transcription
    return pd.DataFrame({"text": [transcription]})
def transcribe(microphone, file_upload, model_name=DEFAULT_MODEL_NAME):
    """Transcribe a microphone recording or an uploaded audio file.

    When both inputs are given, the microphone recording is used.

    Args:
        microphone: Path of the recorded audio, or None.
        file_upload: Path of the uploaded audio, or None.
        model_name: Name of the model to transcribe with.

    Returns:
        A DataFrame with a single ``"text"`` column. It holds the transcription,
        or an error message when neither input was provided.
    """
    if (microphone is None) and (file_upload is None):
        # This callback feeds a DataFrame output, so report the error as a
        # one-row frame rather than a bare string.
        return pd.DataFrame({"text": ["ERROR: You have to either use the microphone or upload an audio file"]})

    if (microphone is not None) and (file_upload is not None):
        # The warning used to be built and then dropped; log it instead.
        logger.warning(
            "WARNING: You've uploaded an audio file and used the microphone. "
            "The recorded file from the microphone will be used and the uploaded audio will be discarded."
        )

    file = microphone if microphone is not None else file_upload

    model = maybe_load_cached_pipeline(model_name)
    text = infer(model, file, return_df=True)

    logger.info(f'Transcription by `{model_name}`:\n{text.to_json(orient="index", force_ascii=False, indent=2)}\n')

    return text
def yt_transcribe(yt_url, model_name=DEFAULT_MODEL_NAME):
    """Download the audio of a YouTube video and transcribe it.

    Args:
        yt_url: URL of the YouTube video.
        model_name: Name of the model to transcribe with.

    Returns:
        A DataFrame with a single ``"text"`` column holding the transcription.
    """
    # Download first, then load the model, as before.
    downloaded_audio = download_audio_from_youtube(yt_url)
    asr_pipeline = maybe_load_cached_pipeline(model_name)

    transcription_df = infer(asr_pipeline, downloaded_audio, return_df=True)

    transcription_json = transcription_df.to_json(orient="index", force_ascii=False, indent=2)
    logger.info(f'Transcription by `{model_name}` of "{yt_url}":\n{transcription_json}\n')

    return transcription_df
def video_transcribe(video_file_path, model_name=DEFAULT_MODEL_NAME):
    """Extract the audio track of a video with ffmpeg and transcribe it.

    The audio is written next to the video as mono 16-bit PCM WAV at
    ``SAMPLE_RATE``, overwriting any existing file of that name.

    Args:
        video_file_path: Path of the video file.
        model_name: Name of the model to transcribe with.

    Returns:
        A DataFrame with a single ``"text"`` column holding the transcription.

    Raises:
        ValueError: If ``video_file_path`` is None.
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    # Local import keeps this block self-contained.
    import subprocess

    if video_file_path is None:
        raise ValueError("Failed to transcribe video as no video_file_path has been defined")

    # Swap whatever extension the video has for ".wav". The former `\.mp4$`
    # substitution left non-mp4 paths unchanged, making output equal input.
    path_stem, _ = os.path.splitext(video_file_path)
    audio_file_path = f"{path_stem}.wav"
    if audio_file_path == video_file_path:
        audio_file_path = f"{path_stem}_audio.wav"

    # Argument list, no shell: the path is user-influenced and must not be
    # interpreted by a shell. `check=True` surfaces ffmpeg failures instead of
    # silently transcribing a missing or stale file.
    subprocess.run(
        [
            "ffmpeg",
            "-hide_banner",
            "-loglevel",
            "error",
            "-y",
            "-i",
            video_file_path,
            "-ar",
            str(SAMPLE_RATE),
            "-ac",
            "1",
            "-c:a",
            "pcm_s16le",
            audio_file_path,
        ],
        check=True,
    )

    model = maybe_load_cached_pipeline(model_name)
    text = infer(model, audio_file_path, return_df=True)

    logger.info(f'Transcription by `{model_name}`:\n{text.to_json(orient="index", force_ascii=False, indent=2)}\n')

    return text
# load default model
# Done once at start-up so the default pipeline is already in `cached_models`
# when the first transcription request arrives.
maybe_load_cached_pipeline(DEFAULT_MODEL_NAME)

# Empty single-column frame used as the initial value of the transcription tables.
# default_text_output_df = pd.DataFrame(columns=["start", "end", "text"])
default_text_output_df = pd.DataFrame(columns=["text"])
# Gradio UI: one tab for microphone/uploaded audio and one for YouTube/uploaded video.
# Components are created in display order, so statement order matters here.
with gr.Blocks() as demo:
    with gr.Tab("Transcribe Audio"):
        gr.Markdown(
            f"""
<div>
<h1 style='text-align: center'>Speech-to-Text in French: Transcribe Audio</h1>
</div>
Transcribe long-form microphone or audio inputs!
Demo uses the fine-tuned wav2vec2 model <a href='https://huggingface.co./{DEFAULT_MODEL_NAME}' target='_blank'><b>{DEFAULT_MODEL_NAME}</b></a> and 🤗 Transformers to transcribe audio files of arbitrary length.
To achieve improved accuracy and well-punctuated text, please use the [Whisper demo](https://huggingface.co./spaces/bofenghuang/whisper-demo-french).
"""
        )

        microphone_input = gr.inputs.Audio(source="microphone", type="filepath", label="Record", optional=True)
        upload_input = gr.inputs.Audio(source="upload", type="filepath", label="Upload File", optional=True)
        # with_timestamps_input = gr.Checkbox(label="With timestamps?")

        microphone_transcribe_btn = gr.Button("Transcribe Audio")

        # gr.Markdown('''
        # Here you will get generated transcrit.
        # ''')

        # microphone_text_output = gr.outputs.Textbox(label="Transcription")
        text_output_df2 = gr.DataFrame(
            value=default_text_output_df,
            label="Transcription",
            row_count=(0, "dynamic"),
            max_rows=10,
            wrap=True,
            overflow_row_behaviour="paginate",
        )

        # `transcribe` receives (microphone, file_upload) and fills the table.
        microphone_transcribe_btn.click(transcribe, inputs=[microphone_input, upload_input], outputs=text_output_df2)

    # with gr.Tab("Transcribe YouTube"):
    #     gr.Markdown(
    #         f"""
    # <div>
    # <h1 style='text-align: center'>Speech-to-Text in French: Transcribe YouTube</h1>
    # </div>
    # Transcribe long-form YouTube videos!
    # Demo uses the fine-tuned checkpoint: <a href='https://huggingface.co./{DEFAULT_MODEL_NAME}' target='_blank'><b>{DEFAULT_MODEL_NAME}</b></a> to transcribe video files of arbitrary length.
    # """
    #     )
    #     yt_link_input2 = gr.inputs.Textbox(lines=1, placeholder="Paste the URL to a YouTube video here", label="YouTube URL")
    #     with_timestamps_input2 = gr.Checkbox(label="With timestamps?", value=True)
    #     yt_transcribe_btn = gr.Button("Transcribe YouTube")
    #     # yt_text_output = gr.outputs.Textbox(label="Transcription")
    #     text_output_df3 = gr.DataFrame(
    #         value=default_text_output_df,
    #         label="Transcription",
    #         row_count=(0, "dynamic"),
    #         max_rows=10,
    #         wrap=True,
    #         overflow_row_behaviour="paginate",
    #     )
    #     # yt_html_output = gr.outputs.HTML(label="YouTube Page")
    #     yt_transcribe_btn.click(yt_transcribe, inputs=[yt_link_input2, with_timestamps_input2], outputs=[text_output_df3])

    with gr.Tab("Transcribe Video"):
        gr.Markdown(
            f"""
<div>
<h1 style='text-align: center'>Speech-to-Text in French: Transcribe Video</h1>
</div>
Transcribe long-form YouTube videos or uploaded video inputs!
Demo uses the fine-tuned wav2vec2 model <a href='https://huggingface.co./{DEFAULT_MODEL_NAME}' target='_blank'><b>{DEFAULT_MODEL_NAME}</b></a> and 🤗 Transformers to transcribe audio files of arbitrary length.
To achieve improved accuracy and well-punctuated text, please use the [Whisper demo](https://huggingface.co./spaces/bofenghuang/whisper-demo-french).
"""
        )

        yt_link_input = gr.inputs.Textbox(
            lines=1, placeholder="Paste the URL to a YouTube video here", label="YouTube URL"
        )
        download_youtube_btn = gr.Button("Download Youtube video")
        downloaded_video_output = gr.Video(label="Video file", mirror_webcam=False)
        # The downloaded file path populates the video component, which is
        # then the input of the transcription button below.
        download_youtube_btn.click(
            download_video_from_youtube, inputs=[yt_link_input], outputs=[downloaded_video_output]
        )

        # with_timestamps_input3 = gr.Checkbox(label="With timestamps?", value=True)
        video_transcribe_btn = gr.Button("Transcribe video")
        text_output_df = gr.DataFrame(
            value=default_text_output_df,
            label="Transcription",
            row_count=(0, "dynamic"),
            max_rows=10,
            wrap=True,
            overflow_row_behaviour="paginate",
        )

        video_transcribe_btn.click(video_transcribe, inputs=[downloaded_video_output], outputs=[text_output_df])
# Start the web server, listening on all network interfaces ("0.0.0.0").
# NOTE(review): debug=True presumably keeps the process attached and prints errors - confirm against the pinned gradio version.
demo.launch(server_name="0.0.0.0", debug=True)
# demo.launch(server_name="0.0.0.0", debug=True, share=True)
# demo.launch(enable_queue=True)