radames's picture
Update app.py
7716f1f
import gradio as gr
import json
import ffmpeg
import os
from pathlib import Path
import time
from transformers import pipeline
import torch
# checkpoint = "openai/whisper-tiny"
# checkpoint = "openai/whisper-base"
checkpoint = "openai/whisper-small"
if torch.cuda.is_available() and torch.cuda.device_count() > 0:
from transformers import (
AutomaticSpeechRecognitionPipeline,
WhisperForConditionalGeneration,
WhisperProcessor,
)
model = WhisperForConditionalGeneration.from_pretrained(
checkpoint).to("cuda").half()
processor = WhisperProcessor.from_pretrained(checkpoint)
pipe = AutomaticSpeechRecognitionPipeline(
model=model,
tokenizer=processor.tokenizer,
feature_extractor=processor.feature_extractor,
batch_size=8,
torch_dtype=torch.float16,
device="cuda:0"
)
else:
pipe = pipeline(model=checkpoint)
# TODO: no longer need to set these manually once the models have been updated on the Hub
# whisper-tiny
# pipe.model.generation_config.alignment_heads = [[2, 2], [3, 0], [3, 2], [3, 3], [3, 4], [3, 5]]
# whisper-base
# pipe.model.generation_config.alignment_heads = [[3, 1], [4, 2], [4, 3], [4, 7], [5, 1], [5, 2], [5, 4], [5, 6]]
# whisper-small
pipe.model.generation_config.alignment_heads = [[5, 3], [5, 9], [
8, 0], [8, 4], [8, 7], [8, 8], [9, 0], [9, 7], [9, 9], [10, 5]]
videos_out_path = Path("./videos_out")
videos_out_path.mkdir(parents=True, exist_ok=True)
samples_data = sorted(Path('examples').glob('*.json'))
SAMPLES = []
for file in samples_data:
with open(file) as f:
sample = json.load(f)
SAMPLES.append(sample)
VIDEOS = list(map(lambda x: [x['video']], SAMPLES))
async def speech_to_text(video_in):
"""
Takes a video path to convert to audio, transcribe audio channel to text and char timestamps
Using https://huggingface.co./tasks/automatic-speech-recognition pipeline
"""
video_in = video_in[0] if isinstance(video_in, list) else video_in
if (video_in == None):
raise ValueError("Video input undefined")
video_path = Path(video_in.name)
try:
# convert video to audio 16k using PIPE to audio_memory
audio_memory, _ = ffmpeg.input(video_path).output(
'-', format="wav", ac=1, ar=pipe.feature_extractor.sampling_rate).overwrite_output().global_args('-loglevel', 'quiet').run(capture_stdout=True)
except Exception as e:
raise RuntimeError("Error converting video to audio")
try:
print(f'Transcribing via local model')
output = pipe(audio_memory, chunk_length_s=10,
stride_length_s=[4, 2], return_timestamps="word")
transcription = output["text"]
chunks = output["chunks"]
timestamps_var = [{"word": chunk["text"], "timestamp":(
chunk["timestamp"][0], chunk["timestamp"][1]), "state": True} for chunk in chunks]
words = [(word['word'], '+' if word['state'] else '-')
for word in timestamps_var]
return (words, timestamps_var, video_in.name)
except Exception as e:
raise RuntimeError("Error Running inference with local model", e)
async def cut_timestamps_to_video(video_in, timestamps_var):
video_in = video_in[0] if isinstance(video_in, list) else video_in
if (video_in == None or timestamps_var == None):
raise ValueError("Inputs undefined")
video_path = Path(video_in.name)
video_file_name = video_path.stem
timestamps_to_cut = [
(timestamps_var[i]['timestamp'][0], timestamps_var[i]['timestamp'][1])
for i in range(len(timestamps_var)) if timestamps_var[i]['state']]
between_str = '+'.join(
map(lambda t: f'between(t,{t[0]},{t[1]})', timestamps_to_cut))
if timestamps_to_cut:
video_file = ffmpeg.input(video_path)
video = video_file.video.filter(
"select", f'({between_str})').filter("setpts", "N/FRAME_RATE/TB")
audio = video_file.audio.filter(
"aselect", f'({between_str})').filter("asetpts", "N/SR/TB")
output_video = f'./videos_out/{video_file_name}.mp4'
ffmpeg.concat(video, audio, v=1, a=1).output(
output_video).overwrite_output().global_args('-loglevel', 'quiet').run()
else:
output_video = video_path
return output_video
css = """
#words-container {
max-height: 400px;
overflow-y: scroll !important;
}
"""
with gr.Blocks(css=css) as demo:
timestamps_var = gr.JSON(visible=False)
with gr.Row():
with gr.Column():
gr.Markdown("""
# Whisper: Word-Level Video Trimming
Quick edit a video by trimming out words.
Using the [Huggingface Automatic Speech Recognition Pipeline](https://huggingface.co./tasks/automatic-speech-recognition)
with [Whisper](https://huggingface.co./docs/transformers/model_doc/whisper)
""")
with gr.Row():
with gr.Column():
file_upload = gr.File(
label="Upload Video File", file_count=1, scale=1)
video_preview = gr.Video(
label="Video Preview", scale=3, interactive=False)
# with gr.Row():
# transcribe_btn = gr.Button(
# "Transcribe Audio")
with gr.Column():
text_in = gr.HighlightedText(
label="Transcription", combine_adjacent=False, show_legend=True, color_map={"+": "green", "-": "red"}, elem_id="words-container")
with gr.Row():
cut_btn = gr.Button("Cut Video")
select_all_words = gr.Button("Select All Words")
reset_words = gr.Button("Reset Words")
video_out = gr.Video(label="Video Out")
with gr.Row():
gr.Examples(
fn=speech_to_text,
examples=["./examples/ShiaLaBeouf.mp4",
"./examples/zuckyuval.mp4",
"./examples/cooking.mp4"],
inputs=[file_upload],
outputs=[text_in, timestamps_var, video_preview],
cache_examples=True)
with gr.Row():
gr.Markdown("""
#### Video Credits
1. [Cooking](https://vimeo.com/573792389)
1. [Shia LaBeouf "Just Do It"](https://www.youtube.com/watch?v=n2lTxIk_Dr0)
1. [Mark Zuckerberg & Yuval Noah Harari in Conversation](https://www.youtube.com/watch?v=Boj9eD0Wug8)
""")
def select_text(evt: gr.SelectData, timestamps_var):
index = evt.index
timestamps_var[index]['state'] = not timestamps_var[index]['state']
words = [(word['word'], '+' if word['state'] else '-')
for word in timestamps_var]
return timestamps_var, words
def words_selection(timestamps_var, reset=False):
if reset:
for word in timestamps_var:
word['state'] = True
else:
# reverse the state of all words
for word in timestamps_var:
word['state'] = False
words = [(word['word'], '+' if word['state'] else '-')
for word in timestamps_var]
return timestamps_var, words
file_upload.upload(speech_to_text, inputs=[file_upload], outputs=[
text_in, timestamps_var, video_preview])
select_all_words.click(words_selection, inputs=[timestamps_var], outputs=[
timestamps_var, text_in], queue=False, show_progress=False)
reset_words.click(lambda x: words_selection(x, True), inputs=[timestamps_var], outputs=[
timestamps_var, text_in], queue=False, show_progress=False)
text_in.select(select_text, inputs=timestamps_var,
outputs=[timestamps_var, text_in], queue=False, show_progress=False)
# transcribe_btn.click(speech_to_text, inputs=[file_upload], outputs=[
# text_in, transcription_var, timestamps_var, video_preview])
cut_btn.click(cut_timestamps_to_video, [
file_upload, timestamps_var], [video_out])
demo.queue()
if __name__ == "__main__":
demo.launch(debug=True)