gpu=False
#@title Utils
#set up base_path
import os
root_path=os.getcwd()
# root_path="/content"
base_path=f"{root_path}/youtube"
if not os.path.exists(base_path):
    os.mkdir(base_path)
# base_path
def make_folders():
    global base_path
    folder_list=["download_audio","download_video"]
    for i in folder_list:
        if not os.path.exists(f"{base_path}/{i}"):
            os.mkdir(f"{base_path}/{i}")
make_folders()
import yt_dlp
import ffmpeg
import sys
import uuid
import re
import shutil
def get_audio(video_path):
    file_name, file_extension = os.path.splitext(os.path.basename(video_path))
    random_str=str(uuid.uuid4())[:8]
    audio_path=f"{base_path}/download_audio/{random_str}.mp3"
    # Quote the paths so ffmpeg also handles file names containing spaces
    command=f'ffmpeg -i "{video_path}" "{audio_path}" -y'
    var=os.system(command)
    if var==0:
        return audio_path
    else:
        print(command)
        return None
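# Example usage (illustrative; the input path below is hypothetical):
# audio_path = get_audio(f"{base_path}/download_video/abcd1234.mp4")
# print(audio_path)  # e.g. .../download_audio/<random>.mp3 on success, None if ffmpeg failed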
import os
import re
def clean_file_name(file_path):
    base_folder = os.path.dirname(file_path)  # Get the base folder
    if len(base_folder)>=1:
        base_folder+="/"
    # Extract the filename and extension from the file path
    file_name, file_extension = os.path.splitext(os.path.basename(file_path))
    file_name=file_name[:30]
    # Replace spaces with underscores
    file_name = file_name.replace(' ', '_')
    # Replace special characters with underscores using regex
    file_name = re.sub(r'[^\w\s-]', '_', file_name)
    # Remove any extra underscores that might result from consecutive special characters
    file_name = re.sub(r'__+', '_', file_name)
    file_name = re.sub(r'[^a-zA-Z0-9_]', '', file_name)
    # Concatenate the cleaned filename with the original extension
    cleaned_file_name = base_folder + file_name + file_extension
    # Rename the file with the cleaned filename
    # os.rename(file_path, cleaned_file_name)
    return base_folder, file_name, file_extension
# Example usage:
# file_path = "Google I⧸O 2024: Everything Revealed in 12 Minutes [PE89NysJEUY].f248.webm"
# clean_file_name(file_path)
choose_whisper_model = 'tiny' #@param ['tiny.en', 'tiny', 'base.en', 'base', 'small.en', 'small', 'medium.en', 'medium', 'large']
import whisper
whisper_model = whisper.load_model(choose_whisper_model)
import json
def clean_word(word):
    word_lower=word.lower()
    word_lower=word_lower.replace("-","")  # strip hyphens from the lowercased word
    remove_char=["'",".","?","!",":",","]
    for i in remove_char:
        if i in word_lower:
            x = word_lower.index(i)
            word_lower=word_lower[:x]
    word_lower=word_lower.strip()
    return word_lower
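# Example usage (illustrative):
# clean_word("Hello,")     # -> "hello"
# clean_word("co-worker")  # -> "coworker"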
def speech_to_text_with_timestamp(audio_file_path,json_file_name,lang):
    global base_path,whisper_model,gpu
    json_file_path=f"{base_path}/download_audio/{json_file_name}"
    # Language = "Hindi" #@param ['Auto detection', 'Afrikaans', 'Albanian', 'Amharic', 'Arabic', 'Armenian', 'Assamese', 'Azerbaijani', 'Bashkir', 'Basque', 'Belarusian', 'Bengali', 'Bosnian', 'Breton', 'Bulgarian', 'Burmese', 'Castilian', 'Catalan', 'Chinese', 'Croatian', 'Czech', 'Danish', 'Dutch', 'English', 'Estonian', 'Faroese', 'Finnish', 'Flemish', 'French', 'Galician', 'Georgian', 'German', 'Greek', 'Gujarati', 'Haitian', 'Haitian Creole', 'Hausa', 'Hawaiian', 'Hebrew', 'Hindi', 'Hungarian', 'Icelandic', 'Indonesian', 'Italian', 'Japanese', 'Javanese', 'Kannada', 'Kazakh', 'Khmer', 'Korean', 'Lao', 'Latin', 'Latvian', 'Letzeburgesch', 'Lingala', 'Lithuanian', 'Luxembourgish', 'Macedonian', 'Malagasy', 'Malay', 'Malayalam', 'Maltese', 'Maori', 'Marathi', 'Moldavian', 'Moldovan', 'Mongolian', 'Myanmar', 'Nepali', 'Norwegian', 'Nynorsk', 'Occitan', 'Panjabi', 'Pashto', 'Persian', 'Polish', 'Portuguese', 'Punjabi', 'Pushto', 'Romanian', 'Russian', 'Sanskrit', 'Serbian', 'Shona', 'Sindhi', 'Sinhala', 'Sinhalese', 'Slovak', 'Slovenian', 'Somali', 'Spanish', 'Sundanese', 'Swahili', 'Swedish', 'Tagalog', 'Tajik', 'Tamil', 'Tatar', 'Telugu', 'Thai', 'Tibetan', 'Turkish', 'Turkmen', 'Ukrainian', 'Urdu', 'Uzbek', 'Valencian', 'Vietnamese', 'Welsh', 'Yiddish', 'Yoruba']
    # English_translate = True #@param {type:"boolean"}
    English_translate=False
    if lang=='Auto detection':
        Language=None
    else:
        Language=lang
    # fp16 only helps when a GPU is available; task stays 'transcribe' unless translation is requested
    result = whisper_model.transcribe(
        audio_file_path,
        word_timestamps=True,
        fp16=gpu,
        language=Language,
        task='translate' if English_translate else 'transcribe',
    )
    word_count_dict = {}
    for segment in result['segments']:
        for word in segment['words']:
            word_text = word['word'].strip()
            word_lower = clean_word(word_text).lower()
            word_start = word['start']
            word_end = word['end']
            if word_lower not in word_count_dict:
                # If the word is not in the dictionary, create a new entry
                word_count_dict[word_lower] = [{'word': word_text, 'start': word_start, 'end': word_end}]
            else:
                # If the word is already in the dictionary, append to the existing list
                word_count_dict[word_lower].append({'word': word_text, 'start': word_start, 'end': word_end})
    pretty_json_string = json.dumps(word_count_dict, indent=4)
    # Write the JSON string to the file
    with open(json_file_path, 'w') as json_file:
        json_file.write(pretty_json_string)
    return json_file_path
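# Example usage (illustrative; the mp3 and JSON names below are hypothetical):
# json_path = speech_to_text_with_timestamp(f"{base_path}/download_audio/abcd1234.mp3", "abcd1234.json", "English")
# The JSON file maps each cleaned word to a list of {"word", "start", "end"} entries.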
import os
import uuid
import random
import string
word_count_video_file_name=""
def get_rename_video():
    # random_str=str(uuid.uuid4())[:8]
    random_str = ''.join(random.choice(string.ascii_letters) for _ in range(8))
    # List all files in the specified directory
    file_list = os.listdir(f"{base_path}/download_video/single_video/")
    # Filter files ending with '.mp4' or '.webm'
    video_files = [filename for filename in file_list if filename.endswith('.mp4') or filename.endswith('.webm')]
    if video_files:
        video_path=f"{base_path}/download_video/single_video/{video_files[-1]}"
        _,_,f_ex=clean_file_name(video_files[-1])
        new_name=f"{base_path}/download_video/{random_str}{f_ex}"
        shutil.copy(video_path,new_name)
        return new_name
    else:
        return None
def count_yt_video():
    directory_path=f"{base_path}/download_video/single_video"
    # List all files in the specified directory
    file_list = os.listdir(directory_path)
    # Filter files ending with '.mp4' or '.webm'
    video_files = [filename for filename in file_list if filename.endswith('.mp4') or filename.endswith('.webm')]
    return len(video_files)
def download_youtube_video(yt_link):
    current_download_path=f"{base_path}/download_video/single_video"
    if os.path.exists(current_download_path):
        shutil.rmtree(current_download_path)
    os.mkdir(current_download_path)
    os.chdir(current_download_path)
    # command=(f"yt-dlp -f bestvideo+bestaudio {yt_link}")
    # Quote the URL so query strings containing '&' are not split by the shell
    command=f'yt-dlp -f best "{yt_link}"'
    var=os.system(command)
    if var==0:
        print("YouTube video download successful")
    else:
        print(command)
        print("Failed to download")
    os.chdir(base_path)
    # os.system is blocking, so the download has already finished here; pick up the file if it exists
    video_path=None
    if count_yt_video()==1 and os.path.exists(current_download_path):
        video_path=get_rename_video()
    return video_path
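# Example usage (illustrative; any public video URL works the same way):
# video_path = download_youtube_video("https://www.youtube.com/watch?v=PE89NysJEUY")
# print(video_path)  # copied to download_video/<random>.<ext>, or None if the download failed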
#@title video edit
def float_to_time(seconds):
    # Extract the integer part (seconds) and the fractional part (milliseconds)
    # print(seconds)
    seconds_int = int(seconds)
    milliseconds = int((seconds - seconds_int) * 1000)
    # Calculate hours, minutes, and remaining seconds
    hours = seconds_int // 3600
    minutes = (seconds_int % 3600) // 60
    remaining_seconds = seconds_int % 60
    # Format the time as "HH:MM:SS.mmm"
    formatted_time = f"{hours:02}:{minutes:02}:{remaining_seconds:02}.{milliseconds:03}"
    return formatted_time
def seconds_to_milliseconds(seconds):
    milliseconds = seconds * 1000
    return milliseconds
# # Example usage:
# seconds = 6.54
# formatted_time = float_to_time(seconds)
# print(formatted_time)  # Output: 00:00:06.540
# Seconds to trim off before the found word; updated from the Gradio input in video_edit_gradio
previous_word_duration=0.0
def get_word_time(json_data,word):
    global previous_word_duration
    multiple_times=[]
    for i in json_data:
        if i==word.lower():
            # print(i,json_data[i])
            for j in json_data[i]:
                # print(j)
                s=max(j['start']-previous_word_duration, 0)  # never start before 0:00
                e=j['end']
                diff=e-s
                # take 4 decimal places
                diff=round(diff,4)
                start=float_to_time(s)
                end=float_to_time(e)
                # start=seconds_to_milliseconds(j['start'])
                # end=seconds_to_milliseconds(j['end'])
                # print(start,end)
                # print(diff)
                multiple_times.append((start,end,diff))
    return multiple_times
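# Example usage (illustrative; the JSON path and the word "google" are hypothetical):
# with open(f"{base_path}/download_audio/abcd1234.json") as f:
#     word_data = json.load(f)
# get_word_time(word_data, "google")  # -> e.g. [("00:00:05.120", "00:00:05.480", 0.36), ...]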
import cv2
import subprocess
def get_video_dimensions(video_path):
    # Open the video file
    video_capture = cv2.VideoCapture(video_path)
    # Check if the video file was successfully opened
    if not video_capture.isOpened():
        print(f"Error: Could not open video file '{video_path}'")
        return None, None
    # Get the width and height of the video
    width = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # Release the video capture object
    video_capture.release()
    return width, height
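# Example usage (illustrative; the path below is hypothetical):
# w, h = get_video_dimensions(f"{base_path}/download_video/abcd1234.mp4")
# print(w, h)  # e.g. 1920 1080, or (None, None) if the file could not be opened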
def add_text_to_video(video_path, output_path, text, font_size=90, font_color='white', font_file=f'{base_path}/roboto.ttf', border_size=2, border_color='black'):
    global gpu
    # Get video dimensions so the label can be drawn near the top-right corner
    width, height = get_video_dimensions(video_path)
    x=width-170
    y=50
    # command = [
    #     'ffmpeg',
    #     '-i', video_path,
    #     '-vf', f"drawtext=fontfile={font_file}:text='{text}':fontcolor={font_color}:fontsize={font_size}:x={x}:y={y}",
    #     '-codec:a', 'copy',
    #     output_path,
    #     '-y'  # Overwrite output file if it exists
    # ]
    if gpu:
        command = [
            'ffmpeg',
            '-hwaccel', 'cuda',  # Use CUDA for hardware acceleration
            '-i', video_path,
            '-vf', f"drawtext=fontfile={font_file}:text='{text}':fontcolor={font_color}:fontsize={font_size}:x={x}:y={y}:borderw={border_size}:bordercolor={border_color}",
            '-c:a', 'copy',
            '-c:v', 'h264_nvenc',  # NVIDIA NVENC encoder for video encoding
            '-y',  # Overwrite output file if it exists
            output_path
        ]
    else:
        command = [
            'ffmpeg',
            '-i', video_path,
            '-vf', f"drawtext=fontfile={font_file}:text='{text}':fontcolor={font_color}:fontsize={font_size}:x={x}:y={y}:borderw={border_size}:bordercolor={border_color}",
            '-codec:a', 'copy',
            output_path,
            '-y'  # Overwrite output file if it exists
        ]
    try:
        subprocess.run(command, check=True)
        print(f"Text added to video: {output_path}")
        print(command)
    except subprocess.CalledProcessError as e:
        print(f"Error adding text to video: {e}")
import os
import uuid
def join_video(dir_path,save_path):
    global gpu
    files = os.listdir(dir_path)
    files_sorted = sorted(files, key=lambda x: int(os.path.splitext(x)[0]))
    output_path = f"{base_path}/join.txt"
    with open(output_path, "w") as f:
        for filename in files_sorted:
            f.write(f"file '{dir_path}/{filename}'\n")
    if gpu:
        loop_command = f'ffmpeg -hwaccel cuda -f concat -safe 0 -i {base_path}/join.txt -c copy "{save_path}" -y'
    else:
        loop_command = f'ffmpeg -f concat -safe 0 -i {base_path}/join.txt -c copy "{save_path}" -y'
    loop_result = os.system(loop_command)
    if loop_result == 0:
        print(f"Video saved at {save_path}")
        return save_path
    else:
        print("Failed to merge video")
        print(loop_command)
        return None
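# Example usage (illustrative; expects numerically named clips such as 1.mp4, 2.mp4, ...; the save path is hypothetical):
# join_video(f"{base_path}/text", f"{base_path}/merged_output.mp4")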
import os
import shutil
def trim_video(input_video_path,timestamp_list,count):
    if os.path.exists(f"{base_path}/output"):
        shutil.rmtree(f"{base_path}/output")
    os.mkdir(f"{base_path}/output")
    if os.path.exists(f"{base_path}/text"):
        shutil.rmtree(f"{base_path}/text")
    os.mkdir(f"{base_path}/text")
    file_name, file_extension = os.path.splitext(os.path.basename(input_video_path))
    # output_video_path=f"{base_path}/"+file_name+"_output"+file_extension
    new_str=str(uuid.uuid4())[:8]
    output_video_path=f"{base_path}/"+new_str+"_output"+file_extension
    for i in range(len(timestamp_list)):
        start_time=timestamp_list[i][0]
        end_time=timestamp_list[i][1]
        duration=timestamp_list[i][2]
        # Write clips with absolute paths so this also works when the current directory is not base_path
        # command=f"ffmpeg -ss {start_time} -i {input_video_path} -t {duration} -codec copy {base_path}/output/{i+1}.mp4 -y"
        if gpu:
            command = f"ffmpeg -hwaccel cuda -ss {start_time} -i {input_video_path} -t {duration} -c:v h264_nvenc {base_path}/output/{i+1}.mp4 -y"
        else:
            command = f"ffmpeg -ss {start_time} -i {input_video_path} -t {duration} {base_path}/output/{i+1}.mp4 -y"
        var=os.system(command)
        if var==0:
            print(command)
            print(f"video clip {i+1} saved")
            add_text_to_video(f"{base_path}/output/{i+1}.mp4", f"{base_path}/text/{i+1}.mp4", str(i+1))
        else:
            print(f"video clip {i+1} failed")
            print(command)
    if count:
        join_video(f"{base_path}/text",output_video_path)
    else:
        join_video(f"{base_path}/output",output_video_path)
    return output_video_path
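# Example usage (illustrative; the timestamps normally come from get_word_time and the video path is hypothetical):
# clips = [("00:00:05.120", "00:00:05.480", 0.36), ("00:00:12.300", "00:00:12.700", 0.4)]
# trim_video(f"{base_path}/download_video/abcd1234.mp4", clips, count=True)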
import json
import pandas as pd
def clear_terminal():
    # Clear the terminal screen
    if os.name == 'posix':  # Unix/Linux/MacOS
        _ = os.system('clear')
    elif os.name == 'nt':  # Windows
        _ = os.system('cls')
def process(json_file_path,video_file_path,find_word,count=True):
    with open(json_file_path, 'r') as json_file:
        data = json.load(json_file)
    df = pd.DataFrame([(key, len(value)) for key, value in data.items()],
                      columns=['Word', 'Word Count'])
    df = df.sort_values('Word Count', ascending=False)
    word_list=list(df['Word'])
    print(word_list)
    timestamp_list=[]
    find_word=find_word.lower().strip()
    if find_word in word_list:
        timestamp_list=get_word_time(data,find_word)
    vid1=trim_video(video_file_path,timestamp_list,count)
    clear_terminal()
    print(f"File1 saved at {vid1}")
    return vid1
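# Example usage (illustrative; the file names and the word "google" are hypothetical):
# process(f"{base_path}/download_audio/abcd1234.json",
#         f"{base_path}/download_video/abcd1234.mp4",
#         "google", count=True)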
#@title Download font
import os
import shutil
from tqdm import tqdm
import urllib.request
import urllib.error
def conditional_download(url, download_file_path):
    print(f"Downloading {os.path.basename(download_file_path)}")
    download_folder = os.path.dirname(download_file_path)
    if not os.path.exists(download_folder):
        os.makedirs(download_folder)
    if os.path.exists(download_file_path):
        os.remove(download_file_path)
    try:
        request = urllib.request.urlopen(url)  # type: ignore[attr-defined]
        total = int(request.headers.get('Content-Length', 0))
    except urllib.error.URLError as e:
        print(f"Error: Unable to open the URL - {url}")
        print(f"Reason: {e.reason}")
        return
    with tqdm(total=total, desc='Downloading', unit='B', unit_scale=True, unit_divisor=1024) as progress:
        try:
            urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size))  # type: ignore[attr-defined]
        except urllib.error.URLError as e:
            print(f"Error: Failed to download the file from the URL - {url}")
            print(f"Reason: {e.reason}")
            return
    print("Download successful!")
    print(f"URL: {url}")
    print(f"Save at: {download_file_path}")
try:
    shutil.copy("./Roboto-Black.ttf", f'{base_path}/roboto.ttf')
except Exception:
    pass
if not os.path.exists(f'{base_path}/roboto.ttf'):
    conditional_download("https://github.com/neuralfalcon/Video-Keyword-Cutter/raw/main/Roboto-Black.ttf", f'{base_path}/roboto.ttf')
#@title gradio utils
def highlight_cols(x):
    df = x.copy()
    df.loc[:, :] = 'color: white'
    df[['Word Count']] = 'color: green'
    return df
old_data=[]
def gradio_whisper_config(youtube_video_link, file_path, language):
    global base_path,word_count_video_file_name,old_data
    old_data=[]
    random_str = str(uuid.uuid4())[:8]
    if youtube_video_link and len(youtube_video_link) >= 1:
        yt_video_file_path = download_youtube_video(youtube_video_link)
        _, original_name, exten = clean_file_name(yt_video_file_path)
        video_file_path = f"{base_path}/download_video/{random_str}{exten}"
        os.rename(yt_video_file_path, video_file_path)
    else:
        if file_path is not None:
            if len(file_path) >= 1:
                _, original_name, exten = clean_file_name(file_path)
                if file_path.lower().endswith(".mp4") or file_path.lower().endswith(".webm"):
                    video_file_path = f"{base_path}/download_video/{random_str}{exten}"
                    shutil.copy(file_path, video_file_path)
    word_count_video_file_name=f"{original_name}_{random_str}"
    print(video_file_path)
    audio_file_path = get_audio(video_file_path)
    print(audio_file_path)
    json_file_name = f"{random_str}.json"
    json_file_path = speech_to_text_with_timestamp(audio_file_path, json_file_name, language)
    with open(json_file_path, 'r') as json_file:
        data = json.load(json_file)
    df = pd.DataFrame([(key, len(value)) for key, value in data.items()], columns=['Word', 'Word Count'])
    df = df.sort_values('Word Count', ascending=False)
    df_filtered = df[df['Word Count'] >= 5]
    # Applying the style function
    s = df_filtered.style.apply(highlight_cols, axis=None)
    old_data.append(video_file_path)
    old_data.append(json_file_path)
    return json_file_path, s
def video_edit_gradio(find_word,previous_word_duration_text,count):
    global old_data,previous_word_duration
    video_file_path=old_data[0]
    json_file_path=old_data[-1]
    if len(previous_word_duration_text)==0:
        previous_word_duration_text="0.0"
    # Update the module-level value that get_word_time reads
    previous_word_duration=float(previous_word_duration_text)
    video_path=process(json_file_path,video_file_path,find_word,count)
    return video_path,video_path
#@title Run gradio webapp
import gradio as gr
whisper_examples = [["https://www.youtube.com/watch?v=PE89NysJEUY&t=9s&ab_channel=CNET", None, 'Auto detection']]
whisper_inputs=[gr.Textbox(label="Enter YouTube Video Link"),gr.File(label="Upload Audio or Video File",type="filepath"),gr.Dropdown(['Auto detection','English','Hindi','Bengali', 'Afrikaans', 'Albanian', 'Amharic', 'Arabic', 'Armenian', 'Assamese', 'Azerbaijani', 'Bashkir', 'Basque', 'Belarusian', 'Bengali', 'Bosnian', 'Breton', 'Bulgarian', 'Burmese', 'Castilian', 'Catalan', 'Chinese', 'Croatian', 'Czech', 'Danish', 'Dutch', 'English', 'Estonian', 'Faroese', 'Finnish', 'Flemish', 'French', 'Galician', 'Georgian', 'German', 'Greek', 'Gujarati', 'Haitian', 'Haitian Creole', 'Hausa', 'Hawaiian', 'Hebrew', 'Hindi', 'Hungarian', 'Icelandic', 'Indonesian', 'Italian', 'Japanese', 'Javanese', 'Kannada', 'Kazakh', 'Khmer', 'Korean', 'Lao', 'Latin', 'Latvian', 'Letzeburgesch', 'Lingala', 'Lithuanian', 'Luxembourgish', 'Macedonian', 'Malagasy', 'Malay', 'Malayalam', 'Maltese', 'Maori', 'Marathi', 'Moldavian', 'Moldovan', 'Mongolian', 'Myanmar', 'Nepali', 'Norwegian', 'Nynorsk', 'Occitan', 'Panjabi', 'Pashto', 'Persian', 'Polish', 'Portuguese', 'Punjabi', 'Pushto', 'Romanian', 'Russian', 'Sanskrit', 'Serbian', 'Shona', 'Sindhi', 'Sinhala', 'Sinhalese', 'Slovak', 'Slovenian', 'Somali', 'Spanish', 'Sundanese', 'Swahili', 'Swedish', 'Tagalog', 'Tajik', 'Tamil', 'Tatar', 'Telugu', 'Thai', 'Tibetan', 'Turkish', 'Turkmen', 'Ukrainian', 'Urdu', 'Uzbek', 'Valencian', 'Vietnamese', 'Welsh', 'Yiddish', 'Yoruba'], label="Select Language", value='Auto detection')]
whisper_outputs=[gr.File(label="Download Json File"),gr.DataFrame(label="Word count")]
whisper_demo = gr.Interface(fn=gradio_whisper_config, inputs=whisper_inputs, outputs=whisper_outputs, title="Generate word level timestamps using Whisper", examples=whisper_examples)
# whisper_demo.launch(share=True,debug=True)
video_trim_examples = [["","0.0",True]]
video_trim_inputs=[gr.Textbox(label="Which WORD do you want to find?"),gr.Textbox(label="Previous word duration threshold",value="0.1"),gr.Checkbox(label="Display how many times the word is used",value=True)]
video_trim_outputs=[gr.File(label="Download Video File"),gr.Video(label="Display Video")]
video_trim_demo = gr.Interface(fn=video_edit_gradio, inputs=video_trim_inputs, outputs=video_trim_outputs, title="Trim and Merge Video Clips", examples=video_trim_examples)
# video_trim_demo.launch(share=True,debug=True)
demo = gr.TabbedInterface([whisper_demo,video_trim_demo], ["Generate word level timestamps using Whisper","Trim and Merge Video Clips"])
demo.launch(share=True,debug=False)