# Source: NeuralFalcon's Hugging Face Space — "Create app.py" (commit 06bf28b, verified)
gpu = False  # set True to enable the CUDA/NVENC + fp16 code paths below
#@title Utils
# Set up the base working directory for all downloads/outputs.
import os

root_path = os.getcwd()
# root_path = "/content"
base_path = f"{root_path}/youtube"
# makedirs(exist_ok=True) is idempotent and race-free, unlike exists()+mkdir.
os.makedirs(base_path, exist_ok=True)
def make_folders():
    """Create the download_audio/ and download_video/ subfolders under base_path (idempotent)."""
    global base_path
    for name in ("download_audio", "download_video"):
        # exist_ok avoids the check-then-create race of exists()+mkdir.
        os.makedirs(f"{base_path}/{name}", exist_ok=True)
make_folders()
import yt_dlp
import ffmpeg
import sys
import uuid
import re
import shutil
import uuid
def get_audio(video_path):
    """Extract the audio track of *video_path* into a randomly named .mp3 under
    {base_path}/download_audio and return its path, or None when ffmpeg fails."""
    import subprocess  # local import keeps this function self-contained
    random_str = str(uuid.uuid4())[:8]
    audio_path = f"{base_path}/download_audio/{random_str}.mp3"
    # Argument-list form (no shell) is safe for paths containing spaces or
    # shell metacharacters, unlike the previous unquoted os.system() string.
    command = ["ffmpeg", "-i", video_path, audio_path, "-y"]
    result = subprocess.run(command)
    if result.returncode == 0:
        return audio_path
    print(command)
    return None
import os
import re
def clean_file_name(file_path):
    """Split *file_path* and sanitize its stem.

    Returns (base_folder, cleaned_stem, extension): base_folder keeps a
    trailing "/" when non-empty; the stem is truncated to 30 chars, spaces
    become underscores, special characters collapse to single underscores,
    and anything outside [a-zA-Z0-9_] is dropped.
    NOTE: the file on disk is NOT renamed.
    """
    base_folder = os.path.dirname(file_path)
    if len(base_folder) >= 1:
        base_folder += "/"
    file_name, file_extension = os.path.splitext(os.path.basename(file_path))
    file_name = file_name[:30]
    file_name = file_name.replace(' ', '_')
    # Replace special characters with underscores, then squeeze repeats.
    file_name = re.sub(r'[^\w\s-]', '_', file_name)
    file_name = re.sub(r'__+', '_', file_name)
    # Drop anything non-ASCII-alphanumeric that survived (e.g. '-', unicode word chars).
    file_name = re.sub(r'[^a-zA-Z0-9_]', '', file_name)
    return base_folder, file_name, file_extension
# Example usage:
# clean_file_name("Google I/O 2024 [PE89NysJEUY].f248.webm")
# Whisper checkpoint to use; larger models are slower but more accurate.
choose_whisper_model = 'tiny' #@param ['tiny.en', 'tiny', 'base.en', 'base', 'small.en', 'small', 'medium.en', 'medium', 'large']
import whisper
# Loaded once at startup and reused by speech_to_text_with_timestamp().
whisper_model = whisper.load_model(choose_whisper_model)
import json
def clean_word(word):
    """Normalize *word* into a lookup key: lowercase, hyphens removed, then
    truncated at the first apostrophe/./?/!/:/, (so "don't" -> "don")."""
    word_lower = word.lower()
    # Bug fix: this previously called word.replace(...), discarding the
    # lowercasing done on the line above.
    word_lower = word_lower.replace("-", "")
    remove_char = ["'", ".", "?", "!", ":", ","]
    for ch in remove_char:
        if ch in word_lower:
            # Keep only the part before the first occurrence of this character.
            word_lower = word_lower[:word_lower.index(ch)]
    return word_lower.strip()
def speech_to_text_with_timestamp(audio_file_path, json_file_name, lang):
    """Transcribe *audio_file_path* with word-level timestamps and write a
    {cleaned_word: [{'word','start','end'}, ...]} index as pretty JSON to
    {base_path}/download_audio/{json_file_name}.

    Returns the JSON file path.
    """
    global base_path, whisper_model, gpu
    json_file_path = f"{base_path}/download_audio/{json_file_name}"
    English_translate = False  # translation to English is currently disabled
    Language = None if lang == 'Auto detection' else lang
    # fp16 is only meaningful on GPU; this collapses the previous four
    # duplicated transcribe() calls into one.
    transcribe_kwargs = dict(word_timestamps=True, fp16=bool(gpu), language=Language)
    if English_translate:
        transcribe_kwargs['task'] = 'translate'
    result = whisper_model.transcribe(audio_file_path, **transcribe_kwargs)
    word_count_dict = {}
    for segment in result['segments']:
        for word in segment['words']:
            word_text = word['word'].strip()
            key = clean_word(word_text).lower()
            occurrence = {'word': word_text, 'start': word['start'], 'end': word['end']}
            # Group every occurrence of the same cleaned word together.
            word_count_dict.setdefault(key, []).append(occurrence)
    with open(json_file_path, 'w') as json_file:
        json.dump(word_count_dict, json_file, indent=4)
    return json_file_path
import os
import uuid
import random
import string
word_count_video_file_name = ""  # "<original-name>_<random>" label, set by gradio_whisper_config
def get_rename_video():
    """Copy the most recent video from download_video/single_video/ to
    download_video/<8 random letters><original ext> and return the new path.
    Returns None when no .mp4/.webm file is present."""
    # Generate exactly 8 letters (previously 10 were generated then sliced).
    random_str = ''.join(random.choice(string.ascii_letters) for _ in range(8))
    single_dir = f"{base_path}/download_video/single_video/"
    video_files = [name for name in os.listdir(single_dir)
                   if name.endswith(('.mp4', '.webm'))]
    if not video_files:
        return None
    # NOTE(review): picks the last entry of os.listdir(), whose order is
    # platform-dependent; with a single downloaded video this is unambiguous.
    video_path = single_dir + video_files[-1]
    _, _, f_ex = clean_file_name(video_files[-1])
    new_name = f"{base_path}/download_video/{random_str}{f_ex}"
    shutil.copy(video_path, new_name)
    return new_name
def count_yt_video():
    """Return how many .mp4/.webm files sit in download_video/single_video."""
    directory_path = f"{base_path}/download_video/single_video"
    # str.endswith accepts a tuple — one pass, no or-chain.
    return sum(1 for name in os.listdir(directory_path)
               if name.endswith(('.mp4', '.webm')))
def download_youtube_video(yt_link):
    """Download *yt_link* with yt-dlp into a fresh single_video folder, then
    copy the file to a random name via get_rename_video().

    Returns the renamed video path, or None when the download failed or did
    not produce exactly one video file.
    """
    current_download_path = f"{base_path}/download_video/single_video"
    if os.path.exists(current_download_path):
        shutil.rmtree(current_download_path)
    os.mkdir(current_download_path)
    os.chdir(current_download_path)
    # command = f'yt-dlp -f bestvideo+bestaudio "{yt_link}"'
    # Quote the URL: '&' in YouTube links is otherwise a shell operator.
    command = f'yt-dlp -f best "{yt_link}"'
    var = os.system(command)
    if var == 0:
        print("youtube video download successful")
    else:
        print(command)
        print("Failed to download")
    os.chdir(base_path)
    # os.system() is synchronous, so the file is already on disk here.
    # The previous while/continue loop spun forever when the download failed;
    # check once and report failure as None instead.
    if var == 0 and count_yt_video() == 1:
        return get_rename_video()
    return None
#@title video edit
def float_to_time(seconds):
    """Format a duration in seconds as an ffmpeg-style "HH:MM:SS.mmm" string."""
    total = int(seconds)                       # whole seconds (truncated)
    millis = int((seconds - total) * 1000)     # fractional part as milliseconds
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours:02}:{minutes:02}:{secs:02}.{millis:03}"
def seconds_to_milliseconds(seconds):
    """Convert a duration in seconds to milliseconds."""
    return seconds * 1000
# Example:
#   float_to_time(6.54) -> "00:00:06.540"
def get_word_time(json_data, word):
    """Collect timestamps for every occurrence of *word* (case-insensitive) in
    the whisper index produced by speech_to_text_with_timestamp().

    Returns a list of (start "HH:MM:SS.mmm", end "HH:MM:SS.mmm",
    duration_seconds) tuples. Each clip start is pulled back by the
    module-level `previous_word_duration` threshold so a bit of lead-in
    audio is kept in the cut.
    """
    # The threshold is published by video_edit_gradio(); fall back to 0.0 so a
    # direct call no longer dies with NameError when it was never set (the old
    # `global previous_word_duration` had no module-level definition at all).
    lead_in = globals().get('previous_word_duration', 0.0)
    multiple_times = []
    target = word.lower()
    for key in json_data:
        if key == target:
            for occurrence in json_data[key]:
                # Clamp at 0: a negative start would make an invalid ffmpeg -ss value.
                start_s = max(0.0, occurrence['start'] - lead_in)
                end_s = occurrence['end']
                # Duration in seconds, rounded to 4 decimal places.
                diff = round(end_s - start_s, 4)
                multiple_times.append((float_to_time(start_s), float_to_time(end_s), diff))
    return multiple_times
import cv2
import subprocess
def get_video_dimensions(video_path):
    """Return (width, height) of the video at *video_path* in pixels, or
    (None, None) when OpenCV cannot open the file."""
    capture = cv2.VideoCapture(video_path)
    if not capture.isOpened():
        print(f"Error: Could not open video file '{video_path}'")
        return None, None
    dimensions = (int(capture.get(cv2.CAP_PROP_FRAME_WIDTH)),
                  int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT)))
    capture.release()
    return dimensions
def add_text_to_video(video_path, output_path, text, font_size=90, font_color='white', font_file=f'{base_path}/roboto.ttf', border_size=2, border_color='black'):
    """Burn *text* (with a colored border) near the top-right corner of
    *video_path* and write the result to *output_path*.

    Uses CUDA decode + NVENC encode when the module-level `gpu` flag is set;
    audio is stream-copied in both paths. Errors are printed, not raised.
    """
    global gpu
    width, height = get_video_dimensions(video_path)
    if width is None:
        # Unreadable input: previously `width - 170` raised TypeError here.
        print(f"Error: cannot determine dimensions of '{video_path}'")
        return
    x = width - 170  # right-aligned with a fixed margin
    y = 50
    # Shared drawtext filter; only the encoder flags differ per branch.
    drawtext = (f"drawtext=fontfile={font_file}:text='{text}':fontcolor={font_color}"
                f":fontsize={font_size}:x={x}:y={y}:borderw={border_size}:bordercolor={border_color}")
    if gpu:
        command = [
            'ffmpeg',
            '-hwaccel', 'cuda',          # CUDA hardware-accelerated decode
            '-i', video_path,
            '-vf', drawtext,
            '-c:a', 'copy',
            '-c:v', 'h264_nvenc',        # NVIDIA NVENC encoder
            '-y',
            output_path,
        ]
    else:
        command = [
            'ffmpeg',
            '-i', video_path,
            '-vf', drawtext,
            '-codec:a', 'copy',
            output_path,
            '-y',
        ]
    try:
        subprocess.run(command, check=True)
        print(f"Text added to video: {output_path}")
        print(command)
    except subprocess.CalledProcessError as e:
        print(f"Error adding text to video: {e}")
import os
import uuid
def join_video(dir_path, save_path):
    """Concatenate every clip in *dir_path* (files named "<int>.<ext>", joined
    in numeric order) into *save_path* via ffmpeg's concat demuxer.

    Returns save_path on success, None on failure.
    """
    global gpu
    # Numeric sort: "10.mp4" must come after "2.mp4".
    files_sorted = sorted(os.listdir(dir_path), key=lambda x: int(os.path.splitext(x)[0]))
    list_file = f"{base_path}/join.txt"
    with open(list_file, "w") as f:
        for filename in files_sorted:
            # Bug fix: each entry must reference the actual clip file — this
            # previously wrote a literal "(unknown)" placeholder, so the
            # concat list never pointed at any real clip.
            f.write(f"file '{dir_path}/{filename}'\n")
    hw = "-hwaccel cuda " if gpu else ""
    loop_command = f'ffmpeg {hw}-f concat -safe 0 -i "{list_file}" -c copy "{save_path}" -y'
    if os.system(loop_command) == 0:
        print(f"video save at {save_path}")
        return save_path
    print("Failed to merge video")
    print(loop_command)
    return None
import os
import shutil
def trim_video(input_video_path, timestamp_list, count):
    """Cut one clip per (start, end, duration) entry of *timestamp_list* from
    *input_video_path*; when *count* is true, overlay the occurrence number on
    each clip. The clips are then joined into a single output video.

    Returns the joined video's path (join failures are printed by join_video).
    """
    # Fresh work directories for the raw and text-overlaid clips.
    for sub in ("output", "text"):
        if os.path.exists(f"{base_path}/{sub}"):
            shutil.rmtree(f"{base_path}/{sub}")
        os.mkdir(f"{base_path}/{sub}")
    file_name, file_extension = os.path.splitext(os.path.basename(input_video_path))
    new_str = str(uuid.uuid4())[:8]
    output_video_path = f"{base_path}/" + new_str + "_output" + file_extension
    for i, (start_time, _end_time, duration) in enumerate(timestamp_list):
        # Absolute clip paths: the old relative ./output/ only worked when the
        # process happened to be chdir'd into base_path.
        clip_path = f"{base_path}/output/{i+1}.mp4"
        if gpu:
            command = f'ffmpeg -hwaccel cuda -ss {start_time} -i "{input_video_path}" -t {duration} -c:v h264_nvenc "{clip_path}" -y'
        else:
            command = f'ffmpeg -ss {start_time} -i "{input_video_path}" -t {duration} "{clip_path}" -y'
        if os.system(command) == 0:
            print(command)
            print(f"video clip {i} save")
            if count:
                # Only encode the numbered overlay when it will actually be used.
                add_text_to_video(clip_path, f"{base_path}/text/{i+1}.mp4", str(i + 1))
        else:
            print(f"video clip {i} failed")
            print(command)
    source_dir = f"{base_path}/text" if count else f"{base_path}/output"
    join_video(source_dir, output_video_path)
    return output_video_path
import json
import pandas as pd
def clear_terminal():
    """Clear the console using the platform-appropriate shell command."""
    commands = {'posix': 'clear', 'nt': 'cls'}  # Unix-likes vs Windows
    cmd = commands.get(os.name)
    if cmd is not None:
        os.system(cmd)
def process(json_file_path, video_file_path, find_word, count=True):
    """Look up *find_word* in the whisper word index at *json_file_path* and
    cut/join every occurrence out of *video_file_path*.

    Returns the edited video's path, or None when the word never occurs.
    """
    with open(json_file_path, 'r') as json_file:
        index = json.load(json_file)
    df = pd.DataFrame([(key, len(entries)) for key, entries in index.items()],
                      columns=['Word', 'Word Count'])
    df = df.sort_values('Word Count', ascending=False)
    word_list = list(df['Word'])
    print(word_list)
    find_word = find_word.lower().strip()
    if find_word not in word_list:
        # Previously an empty timestamp list was still passed to trim_video,
        # producing a broken/empty output file.
        print(f"'{find_word}' not found in transcript")
        return None
    timestamp_list = get_word_time(index, find_word)
    vid1 = trim_video(video_file_path, timestamp_list, count)
    clear_terminal()
    print(f"File1 saved at {vid1}")
    return vid1
#@title Download font
import os
import shutil
from tqdm import tqdm
import urllib
def conditional_download(url, download_file_path):
    """Download *url* to *download_file_path* with a tqdm progress bar,
    creating the parent folder and replacing any existing file. Network
    errors are printed and the function returns early."""
    # Bug fix: a bare `import urllib` does NOT load these submodules, so
    # urllib.request could be an AttributeError at call time.
    import urllib.request
    import urllib.error
    print(f"Downloading {os.path.basename(download_file_path)}")
    # Renamed from `base_path` — the old local shadowed the module global.
    parent_dir = os.path.dirname(download_file_path)
    if not os.path.exists(parent_dir):
        os.makedirs(parent_dir)
    if os.path.exists(download_file_path):
        os.remove(download_file_path)
    try:
        request = urllib.request.urlopen(url)
        total = int(request.headers.get('Content-Length', 0))
    except urllib.error.URLError as e:
        print(f"Error: Unable to open the URL - {url}")
        print(f"Reason: {e.reason}")
        return
    with tqdm(total=total, desc='Downloading', unit='B', unit_scale=True, unit_divisor=1024) as progress:
        try:
            urllib.request.urlretrieve(
                url, download_file_path,
                reporthook=lambda count, block_size, total_size: progress.update(block_size))
        except urllib.error.URLError as e:
            print(f"Error: Failed to download the file from the URL - {url}")
            print(f"Reason: {e.reason}")
            return
    print(f"Download successful!")
    print(f"URL: {url}")
    print(f"Save at: {download_file_path}")
# Prefer the font bundled next to the script; fall back to downloading it.
try:
    shutil.copy("./Roboto-Black.ttf", f'{base_path}/roboto.ttf')
except OSError:
    # Bundled font missing/unreadable — best effort only. Narrowed from a
    # bare except so real programming errors are no longer swallowed.
    pass
if not os.path.exists(f'{base_path}/roboto.ttf'):
    conditional_download("https://github.com/neuralfalcon/Video-Keyword-Cutter/raw/main/Roboto-Black.ttf", f'{base_path}/roboto.ttf')
#@title gradio utils
def highlight_cols(x):
    """Pandas Styler helper: white text everywhere, green for 'Word Count'."""
    styles = x.copy()
    styles.loc[:, :] = 'color: white'
    styles['Word Count'] = 'color: green'
    return styles
old_data = []  # [video_file_path, json_file_path] of the most recent transcription
def gradio_whisper_config(youtube_video_link, file_path, language):
    """Gradio handler: fetch a video (YouTube link takes precedence over an
    uploaded file), transcribe it with word timestamps, and return
    (json_path, styled word-count DataFrame). Records the video/JSON paths
    in `old_data` for the trim tab."""
    global base_path, word_count_video_file_name, old_data
    old_data = []
    random_str = str(uuid.uuid4())[:8]
    video_file_path = None
    if youtube_video_link and len(youtube_video_link) >= 1:
        yt_video_file_path = download_youtube_video(youtube_video_link)
        if yt_video_file_path is None:
            raise ValueError("YouTube download failed")
        _, original_name, exten = clean_file_name(yt_video_file_path)
        video_file_path = f"{base_path}/download_video/{random_str}{exten}"
        os.rename(yt_video_file_path, video_file_path)
    elif file_path is not None and len(file_path) >= 1:
        _, original_name, exten = clean_file_name(file_path)
        if file_path.lower().endswith((".mp4", ".webm")):
            video_file_path = f"{base_path}/download_video/{random_str}{exten}"
            shutil.copy(file_path, video_file_path)
    if video_file_path is None:
        # Previously this fell through to a NameError on video_file_path.
        raise ValueError("Provide a YouTube link or upload a .mp4/.webm file")
    word_count_video_file_name = f"{original_name}_{random_str}"
    print(video_file_path)
    audio_file_path = get_audio(video_file_path)
    print(audio_file_path)
    json_file_name = f"{random_str}.json"
    json_file_path = speech_to_text_with_timestamp(audio_file_path, json_file_name, language)
    with open(json_file_path, 'r') as json_file:
        data = json.load(json_file)
    df = pd.DataFrame([(key, len(entries)) for key, entries in data.items()],
                      columns=['Word', 'Word Count'])
    df = df.sort_values('Word Count', ascending=False)
    # Only surface frequently repeated words (>= 5 occurrences) in the UI.
    df_filtered = df[df['Word Count'] >= 5]
    styled = df_filtered.style.apply(highlight_cols, axis=None)
    old_data.append(video_file_path)
    old_data.append(json_file_path)
    return json_file_path, styled
def video_edit_gradio(find_word, previous_word_duration, count):
    """Gradio handler for the trim tab: cut every occurrence of *find_word*
    from the last transcribed video. Returns the output path twice (for the
    File and Video components)."""
    global old_data
    video_file_path = old_data[0]
    json_file_path = old_data[-1]
    if len(previous_word_duration) == 0:
        previous_word_duration = 0.0
    # Bug fix: get_word_time() reads this threshold from module scope via a
    # `global` declaration, but it was only ever a local here — publish it as
    # a module-level global so get_word_time no longer hits a NameError.
    globals()['previous_word_duration'] = float(previous_word_duration)
    video_path = process(json_file_path, video_file_path, find_word, count)
    return video_path, video_path
#@title Run gradio webapp
import gradio as gr
# Tab 1: transcribe a video (YouTube link or uploaded file) into a
# word-level timestamp JSON plus a word-frequency table.
whisper_examples = [["https://www.youtube.com/watch?v=PE89NysJEUY&t=9s&ab_channel=CNET",None]]
whisper_inputs=[gr.Textbox(label="Enter YouTube Video Link"),gr.File(label="Upload Audio or Video File",type="filepath"),gr.Dropdown(['Auto detection','English','Hindi','Bengali', 'Afrikaans', 'Albanian', 'Amharic', 'Arabic', 'Armenian', 'Assamese', 'Azerbaijani', 'Bashkir', 'Basque', 'Belarusian', 'Bengali', 'Bosnian', 'Breton', 'Bulgarian', 'Burmese', 'Castilian', 'Catalan', 'Chinese', 'Croatian', 'Czech', 'Danish', 'Dutch', 'English', 'Estonian', 'Faroese', 'Finnish', 'Flemish', 'French', 'Galician', 'Georgian', 'German', 'Greek', 'Gujarati', 'Haitian', 'Haitian Creole', 'Hausa', 'Hawaiian', 'Hebrew', 'Hindi', 'Hungarian', 'Icelandic', 'Indonesian', 'Italian', 'Japanese', 'Javanese', 'Kannada', 'Kazakh', 'Khmer', 'Korean', 'Lao', 'Latin', 'Latvian', 'Letzeburgesch', 'Lingala', 'Lithuanian', 'Luxembourgish', 'Macedonian', 'Malagasy', 'Malay', 'Malayalam', 'Maltese', 'Maori', 'Marathi', 'Moldavian', 'Moldovan', 'Mongolian', 'Myanmar', 'Nepali', 'Norwegian', 'Nynorsk', 'Occitan', 'Panjabi', 'Pashto', 'Persian', 'Polish', 'Portuguese', 'Punjabi', 'Pushto', 'Romanian', 'Russian', 'Sanskrit', 'Serbian', 'Shona', 'Sindhi', 'Sinhala', 'Sinhalese', 'Slovak', 'Slovenian', 'Somali', 'Spanish', 'Sundanese', 'Swahili', 'Swedish', 'Tagalog', 'Tajik', 'Tamil', 'Tatar', 'Telugu', 'Thai', 'Tibetan', 'Turkish', 'Turkmen', 'Ukrainian', 'Urdu', 'Uzbek', 'Valencian', 'Vietnamese', 'Welsh', 'Yiddish', 'Yoruba'], label="Select Language", value='Auto detection')]
whisper_outputs=[gr.File(label="Download Json File"),gr.DataFrame(label="Count word")]
whisper_demo = gr.Interface(fn=gradio_whisper_config, inputs=whisper_inputs,outputs=whisper_outputs , title="Generate word level timestamps using Whisper",examples=whisper_examples)
# whisper_demo.launch(share=True,debug=True)
# Tab 2: trim/merge every occurrence of a chosen word from the last
# transcribed video (optionally numbering each occurrence on screen).
video_trim_examples = [["","0.0",True]]
video_trim_inputs=[gr.Textbox(label="Which WORD you want to find?"),gr.Textbox(label="Previous word duration threshold",value="0.1"),gr.Checkbox(label="Display how many times the word is used",value=True)]
video_trim_outputs=[gr.File(label="Download Video File"),gr.Video(label="Display Video")]
video_trim_demo = gr.Interface(fn=video_edit_gradio, inputs=video_trim_inputs,outputs=video_trim_outputs , title="Trim and Marge Video Clips",examples=video_trim_examples)
# video_trim_demo.launch(share=True,debug=True)
demo = gr.TabbedInterface([whisper_demo,video_trim_demo], ["Generate word level timestamps using Whisper","Trim and Marge Video Clips"])
# share=True exposes a public gradio.live URL; debug off for deployment.
demo.launch(share=True,debug=False)