# (Hugging Face Spaces page header captured with the source: "Spaces: Running")
# copyright Guy Giesbrecht and Andy Walters January, 2024
#
import os

# NOTE(review): installing dependencies at import time is unusual but kept as-is
# (typical of quick Spaces demos).
os.system('pip install -U openai-whisper')
os.system('pip install -U git+https://github.com/linto-ai/whisper-timestamped')
os.system('pip install gradio moviepy whisper-timestamped')

import datetime as dt
import json
import random
import shutil

import gradio as gr
import whisper_timestamped as whisper
from moviepy.video.io.VideoFileClip import VideoFileClip
# Helper functions and global variables

# Output directory named by the current minute; if two runs collide within the
# same minute, append random digits to get a fresh directory name.
outdir = dt.datetime.now().strftime("%Y%m%d%H%M")
if os.path.exists(outdir):
    # Bug fixed: the original called random.randint without importing random
    # (NameError on this branch).  There is still a tiny chance the suffixed
    # name also exists — acceptable for this demo.
    outdir = outdir + str(random.randint(1000, 9999))
    os.mkdir(outdir)
    print("Created new output directory:", outdir)
else:
    # os.mkdir instead of shelling out via `mkdir` — portable, no subshell.
    os.mkdir(outdir)
    print("date time now:" + outdir)

# Shared Whisper model used by generate_timestamps().
model = whisper.load_model("base")
def generate_timestamps(vidname):
    """Transcribe the audio track of *vidname* with the module-level Whisper model.

    Returns the whisper-timestamped result dict (contains a "segments" list
    with per-segment timing and text).
    """
    waveform = whisper.load_audio(vidname)
    return whisper.transcribe(model, waveform, language="en")
def get_segment_info(data):
    """Project a transcription result down to its essential segment fields.

    data: result dict from whisper-timestamped; segments missing any of the
    required keys are skipped.  Returns a list of dicts with exactly the keys
    "id", "start", "end" and "text".
    """
    required = ("id", "start", "end", "text")
    return [
        {key: segment[key] for key in required}
        for segment in data.get("segments", [])
        if all(key in segment for key in required)
    ]
def combine_entries(entries):
    """Merge consecutive transcript entries into chunks of at most ~30 seconds.

    entries: list of dicts with "start"/"end" (seconds) and "text".
    Returns a list of dicts with the same keys; texts of merged entries are
    joined with single spaces.  A single entry longer than 30 s still becomes
    its own chunk.

    Bug fixed: the original extended the just-closed chunk's "end" to the next
    entry's end before appending it, so adjacent output chunks overlapped in
    time (the boundary entry was covered twice).
    """
    combined_entries = []
    current_entry = None
    total_duration = 0
    for entry in entries:
        entry_duration = entry["end"] - entry["start"]
        if current_entry is None:
            # First entry (or first after a flush) starts a fresh chunk.
            current_entry = {
                "start": entry["start"],
                "end": entry["end"],
                "text": entry["text"],
            }
            total_duration = entry_duration
        elif total_duration + entry_duration > 30:
            # Adding this entry would exceed the 30 s budget: close the
            # current chunk as-is and start a new one at this entry.
            combined_entries.append(current_entry)
            current_entry = {
                "start": entry["start"],
                "end": entry["end"],
                "text": entry["text"],
            }
            total_duration = entry_duration
        else:
            # Still under budget: extend the current chunk.
            current_entry["end"] = entry["end"]
            current_entry["text"] += " " + entry["text"]
            total_duration += entry_duration
    if current_entry:
        combined_entries.append(current_entry)
    return combined_entries
def extract_video_segment(input_video, output_video, start_time, end_time):
    """Write the [start_time, end_time] slice of input_video to output_video.

    Re-encodes with H.264 video and AAC audio so the output plays anywhere.
    Times are in seconds.
    """
    clip = VideoFileClip(input_video).subclip(start_time, end_time)
    try:
        clip.write_videofile(output_video, codec="libx264", audio_codec="aac")
    finally:
        clip.close()
def save_segments(outdir, name, combined_entries):
    """Cut the source video *name* into one mp4 per combined entry, under *outdir*.

    Output files are named output_segment_1.mp4, output_segment_2.mp4, ...
    """
    for index, segment in enumerate(combined_entries, start=1):
        target = f'{outdir}/output_segment_{index}.mp4'
        extract_video_segment(name, target, segment['start'], segment['end'])
def split_up_video(video_path, output_dir):
    """Transcribe *video_path*, write a transcript and segment clips into
    *output_dir*, and return the path of a zip archive of that directory.

    Bugs fixed: the transcript file handle was never closed (now a `with`
    block); the zip name was built from a garbled literal while the
    `os.path.splitext` result sat unused — the archive is now named after the
    video path without its extension, as that unused variable indicates was
    intended.  `shutil.make_archive` replaces `os.system("zip ...")`: no shell
    interpolation of the path, and no dependency on a `zip` binary.
    """
    result = generate_timestamps(video_path)
    combined_entries = combine_entries(get_segment_info(result))
    with open(f"{output_dir}/transcript.txt", "w") as scribeout:
        scribeout.write(json.dumps(combined_entries, indent=2, ensure_ascii=False))
    save_segments(output_dir, video_path, combined_entries)
    filename, _extension = os.path.splitext(video_path)
    # Returns "<filename>.zip" (the full archive path).
    return shutil.make_archive(filename, "zip", output_dir)
# Gradio interface
def process_video(video):
    """Gradio callback: split the uploaded video and return a zip of the pieces.

    video: either a plain filesystem path (gr.File with type="filepath") or an
    older-style tempfile wrapper exposing `.name` — both are accepted.
    Returns the path of the zip archive for the download component.
    """
    output_dir = dt.datetime.now().strftime("%Y%m%d%H%M")
    # Two uploads within the same minute share this name; exist_ok avoids a
    # FileExistsError crash from the original bare os.mkdir.
    os.makedirs(output_dir, exist_ok=True)
    video_path = video if isinstance(video, str) else video.name
    return split_up_video(video_path, output_dir)
# Wire the callback to a simple upload-in / download-out interface.
video_input = gr.File(file_count="single", type="filepath", label="Upload a Video")
zip_output = gr.File(label="Download Zipped Segments and Transcript")
iface = gr.Interface(
    fn=process_video,
    inputs=video_input,
    outputs=zip_output,
    title="Video Splitter",
    description="Upload a video and get a zipped file with segmented videos and a transcript.",
)
iface.launch()