#@title Utils

# Global flag: when True, downstream ffmpeg/whisper calls use GPU paths
# (CUDA hwaccel, NVENC, fp16).
gpu = False

import os
import re
import shutil
import uuid

# Set up base_path: every download and intermediate file lives under ./youtube
root_path = os.getcwd()
# root_path = "/content"   # (Colab default)
base_path = f"{root_path}/youtube"
if not os.path.exists(base_path):
    os.mkdir(base_path)


def make_folders():
    """Create the audio/video download folders under base_path (idempotent)."""
    global base_path
    folder_list = ["download_audio", "download_video"]
    for name in folder_list:
        folder = f"{base_path}/{name}"
        if not os.path.exists(folder):
            os.mkdir(folder)


make_folders()


def get_audio(video_path):
    """Extract the audio track of *video_path* to an mp3 via the ffmpeg CLI.

    Returns the path of the new mp3 on success, or None when ffmpeg fails
    (the failing command is printed for debugging).
    """
    global base_path
    random_str = str(uuid.uuid4())[:8]
    audio_path = f"{base_path}/download_audio/{random_str}.mp3"
    # Quote both paths so filenames with spaces/special chars survive the shell.
    command = f'ffmpeg -i "{video_path}" "{audio_path}" -y'
    var = os.system(command)
    if var == 0:
        return audio_path
    print(command)
    return None


def clean_file_name(file_path):
    """Split *file_path* and sanitise its stem to a safe ASCII identifier.

    Returns a 3-tuple:
      (directory with trailing '/' or '' for bare names,
       cleaned stem (max 30 chars, ASCII alnum + underscore),
       original extension including the dot).
    The file itself is NOT renamed.
    """
    base_folder = os.path.dirname(file_path)
    if len(base_folder) >= 1:
        base_folder += "/"
    file_name, file_extension = os.path.splitext(os.path.basename(file_path))
    file_name = file_name[:30]              # keep names short
    file_name = file_name.replace(' ', '_')
    # Replace special characters with underscores
    file_name = re.sub(r'[^\w\s-]', '_', file_name)
    # Collapse runs of underscores left by consecutive special characters
    file_name = re.sub(r'__+', '_', file_name)
    # Finally drop anything that is not plain ASCII alnum/underscore
    file_name = re.sub(r'[^a-zA-Z0-9_]', '', file_name)
    return base_folder, file_name, file_extension


choose_whisper_model = 'tiny'  #@param ['tiny.en', 'tiny', 'base.en', 'base', 'small.en', 'small', 'medium.en', 'medium', 'large']
import whisper
import json
import random
import string
import subprocess
import urllib.request   # BUGFIX: was `import urllib`, which does not
import urllib.error     # reliably expose urllib.request / urllib.error
import cv2
import pandas as pd
from tqdm import tqdm

# Load the user-selected whisper checkpoint once, at import time.
whisper_model = whisper.load_model(choose_whisper_model)

# Name fragment for the currently loaded video (set by gradio_whisper_config).
word_count_video_file_name = ""
# Seconds subtracted from each word's start time when trimming.
# BUGFIX: get_word_time declares `global previous_word_duration` but the
# global was never defined anywhere, so it raised NameError at runtime.
previous_word_duration = 0.0
# [video_file_path, json_file_path] of the last transcription.
old_data = []


def clean_word(word):
    """Normalise a whisper word token: lowercase, drop hyphens, and
    truncate at the first punctuation mark."""
    word_lower = word.lower()
    # BUGFIX: was `word.replace(...)`, which silently discarded the lowercasing
    word_lower = word_lower.replace("-", "")
    for ch in ["'", ".", "?", "!", ":", ","]:
        if ch in word_lower:
            word_lower = word_lower[:word_lower.index(ch)]
    return word_lower.strip()


def speech_to_text_with_timestamp(audio_file_path, json_file_name, lang):
    """Transcribe *audio_file_path* with word-level timestamps.

    Writes {cleaned_word: [{'word','start','end'}, ...]} as pretty JSON to
    download_audio/<json_file_name> and returns that path.
    *lang* is a whisper language name, or 'Auto detection' for None.
    """
    global base_path, whisper_model, gpu
    json_file_path = f"{base_path}/download_audio/{json_file_name}"
    English_translate = False  # kept for parity with the original notebook toggle
    Language = None if lang == 'Auto detection' else lang
    # fp16 only makes sense on GPU; 'translate' forces English output.
    task = 'translate' if English_translate else 'transcribe'
    result = whisper_model.transcribe(
        audio_file_path,
        word_timestamps=True,
        fp16=bool(gpu),
        language=Language,
        task=task,
    )
    word_count_dict = {}
    for segment in result['segments']:
        for word in segment['words']:
            word_text = word['word'].strip()
            word_lower = clean_word(word_text).lower()
            entry = {'word': word_text, 'start': word['start'], 'end': word['end']}
            word_count_dict.setdefault(word_lower, []).append(entry)
    with open(json_file_path, 'w') as json_file:
        json_file.write(json.dumps(word_count_dict, indent=4))
    return json_file_path


def get_rename_video():
    """Copy the downloaded clip out of single_video/ under a random 8-letter
    name (extension preserved). Returns the new path, or None if no video."""
    random_str = ''.join(random.choice(string.ascii_letters) for _ in range(8))
    folder = f"{base_path}/download_video/single_video/"
    video_files = [f for f in os.listdir(folder) if f.endswith(('.mp4', '.webm'))]
    if not video_files:
        return None
    video_path = folder + video_files[-1]
    _, _, f_ex = clean_file_name(video_files[-1])
    new_name = f"{base_path}/download_video/{random_str}{f_ex}"
    shutil.copy(video_path, new_name)
    return new_name


def count_yt_video():
    """Return how many .mp4/.webm files sit in the single_video folder."""
    directory_path = f"{base_path}/download_video/single_video"
    file_list = os.listdir(directory_path)
    return len([f for f in file_list if f.endswith(('.mp4', '.webm'))])


def download_youtube_video(yt_link):
    """Download *yt_link* with the yt-dlp CLI into a fresh single_video/
    folder, then copy it to a randomised name.

    Returns the renamed path, or None when the download fails.
    """
    current_download_path = f"{base_path}/download_video/single_video"
    if os.path.exists(current_download_path):
        shutil.rmtree(current_download_path)
    os.mkdir(current_download_path)
    os.chdir(current_download_path)
    # command = f"yt-dlp -f bestvideo+bestaudio {yt_link}"
    command = f'yt-dlp -f best "{yt_link}"'
    var = os.system(command)
    os.chdir(base_path)
    if var != 0:
        print(command)
        print("Failed to download")
        # BUGFIX: the original entered `while True` and spun forever when the
        # download failed or produced anything other than exactly one file.
        return None
    print("youtube video download successful")
    if count_yt_video() >= 1:
        return get_rename_video()
    return None


#@title video edit
def float_to_time(seconds):
    """Format a float second count as an ffmpeg-friendly 'HH:MM:SS.mmm'."""
    seconds_int = int(seconds)
    milliseconds = int((seconds - seconds_int) * 1000)
    hours = seconds_int // 3600
    minutes = (seconds_int % 3600) // 60
    remaining_seconds = seconds_int % 60
    return f"{hours:02}:{minutes:02}:{remaining_seconds:02}.{milliseconds:03}"


def seconds_to_milliseconds(seconds):
    """Convert seconds to milliseconds."""
    return seconds * 1000


def get_word_time(json_data, word):
    """Collect (start, end, duration) triples for every occurrence of *word*.

    Each start is pulled back by the global previous_word_duration seconds so
    the clip keeps a little lead-in context.
    """
    global previous_word_duration
    multiple_times = []
    for key in json_data:
        if key == word.lower():
            for occurrence in json_data[key]:
                s = occurrence['start'] - previous_word_duration
                e = occurrence['end']
                diff = round(e - s, 4)  # duration, 4 decimal places
                multiple_times.append((float_to_time(s), float_to_time(e), diff))
    return multiple_times


def get_video_dimensions(video_path):
    """Return (width, height) of *video_path* via OpenCV, or (None, None)."""
    video_capture = cv2.VideoCapture(video_path)
    if not video_capture.isOpened():
        print(f"Error: Could not open video file '{video_path}'")
        return None, None
    width = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
    video_capture.release()
    return width, height


def add_text_to_video(video_path, output_path, text, font_size=90,
                      font_color='white', font_file=f'{base_path}/roboto.ttf',
                      border_size=2, border_color='black'):
    """Burn *text* near the top-right corner of *video_path* with ffmpeg's
    drawtext filter; the audio stream is copied unchanged."""
    global gpu
    width, height = get_video_dimensions(video_path)
    x = width - 170   # NOTE(review): fixed offset; assumes width > 170 px
    y = 50
    drawtext = (f"drawtext=fontfile={font_file}:text='{text}':"
                f"fontcolor={font_color}:fontsize={font_size}:x={x}:y={y}:"
                f"borderw={border_size}:bordercolor={border_color}")
    if gpu:
        command = ['ffmpeg', '-hwaccel', 'cuda', '-i', video_path,
                   '-vf', drawtext, '-c:a', 'copy', '-c:v', 'h264_nvenc',
                   '-y', output_path]
    else:
        command = ['ffmpeg', '-i', video_path, '-vf', drawtext,
                   '-codec:a', 'copy', output_path, '-y']
    try:
        subprocess.run(command, check=True)
        print(f"Text added to video: {output_path}")
        print(command)
    except subprocess.CalledProcessError as e:
        print(f"Error adding text to video: {e}")


def join_video(dir_path, save_path):
    """Concatenate every clip in *dir_path* (sorted by numeric stem) with the
    ffmpeg concat demuxer. Returns save_path on success, else None."""
    global gpu
    files_sorted = sorted(os.listdir(dir_path),
                          key=lambda name: int(os.path.splitext(name)[0]))
    list_path = f"{base_path}/join.txt"
    with open(list_path, "w") as f:
        for filename in files_sorted:
            # BUGFIX: each concat entry must name the actual clip file;
            # the filename was missing from the f-string.
            f.write(f"file '{dir_path}/{filename}'\n")
    hw = '-hwaccel cuda ' if gpu else ''
    loop_command = (f'ffmpeg {hw}-f concat -safe 0 -i {base_path}/join.txt '
                    f'-c copy "{save_path}" -y')
    if os.system(loop_command) == 0:
        print(f"video save at {save_path}")
        return save_path
    print("Failed to merge video")
    print(loop_command)
    return None


def trim_video(input_video_path, timestamp_list, count):
    """Cut one clip per (start, end, duration) entry, optionally stamp each
    clip with its occurrence number, and join everything into one video.

    Returns the joined video's path under base_path.
    """
    out_dir = f"{base_path}/output"
    text_dir = f"{base_path}/text"
    for d in (out_dir, text_dir):
        if os.path.exists(d):
            shutil.rmtree(d)
        os.mkdir(d)
    _, file_extension = os.path.splitext(os.path.basename(input_video_path))
    new_str = str(uuid.uuid4())[:8]
    output_video_path = f"{base_path}/{new_str}_output{file_extension}"
    for i, (start_time, end_time, duration) in enumerate(timestamp_list):
        # BUGFIX: write clips to absolute paths; the original used the
        # cwd-relative ./output while joining from {base_path}/output.
        clip_path = f"{out_dir}/{i + 1}.mp4"
        if gpu:
            command = (f'ffmpeg -hwaccel cuda -ss {start_time} '
                       f'-i "{input_video_path}" -t {duration} '
                       f'-c:v h264_nvenc "{clip_path}" -y')
        else:
            command = (f'ffmpeg -ss {start_time} -i "{input_video_path}" '
                       f'-t {duration} "{clip_path}" -y')
        if os.system(command) == 0:
            print(command)
            print(f"video clip {i} save")
            add_text_to_video(clip_path, f"{text_dir}/{i + 1}.mp4", str(i + 1))
        else:
            print(f"video clip {i} failed")
            print(command)
    if count:
        join_video(text_dir, output_video_path)
    else:
        join_video(out_dir, output_video_path)
    return output_video_path


def clear_terminal():
    """Clear the terminal screen on both POSIX and Windows."""
    if os.name == 'posix':
        _ = os.system('clear')
    elif os.name == 'nt':
        _ = os.system('cls')


def process(json_file_path, video_file_path, find_word, count=True):
    """Look up *find_word* in the timestamp JSON and build the trimmed video.

    Returns the output video path (an empty timestamp list still runs the
    trim/join step, matching the original behaviour).
    """
    with open(json_file_path, 'r') as json_file:
        data = json.load(json_file)
    df = pd.DataFrame([(key, len(entries)) for key, entries in data.items()],
                      columns=['Word', 'Word Count'])
    df = df.sort_values('Word Count', ascending=False)
    word_list = list(df['Word'])
    print(word_list)
    timestamp_list = []
    find_word = find_word.lower().strip()
    if find_word in word_list:
        timestamp_list = get_word_time(data, find_word)
    vid1 = trim_video(video_file_path, timestamp_list, count)
    clear_terminal()
    print(f"File1 saved at {vid1}")
    return vid1


#@title Download font
def conditional_download(url, download_file_path):
    """Download *url* to *download_file_path* with a tqdm progress bar.

    Any pre-existing file at the destination is removed and re-fetched;
    network errors are reported and swallowed.
    """
    print(f"Downloading {os.path.basename(download_file_path)}")
    # Renamed from `base_path`: the original shadowed the module-level global.
    target_dir = os.path.dirname(download_file_path)
    if not os.path.exists(target_dir):
        os.makedirs(target_dir)
    if os.path.exists(download_file_path):
        os.remove(download_file_path)
    try:
        request = urllib.request.urlopen(url)  # type: ignore[attr-defined]
        total = int(request.headers.get('Content-Length', 0))
    except urllib.error.URLError as e:
        print(f"Error: Unable to open the URL - {url}")
        print(f"Reason: {e.reason}")
        return
    with tqdm(total=total, desc='Downloading', unit='B',
              unit_scale=True, unit_divisor=1024) as progress:
        try:
            urllib.request.urlretrieve(  # type: ignore[attr-defined]
                url, download_file_path,
                reporthook=lambda count, block_size, total_size:
                    progress.update(block_size))
        except urllib.error.URLError as e:
            print(f"Error: Failed to download the file from the URL - {url}")
            print(f"Reason: {e.reason}")
            return
    print(f"Download successful!")
    print(f"URL: {url}")
    print(f"Save at: {download_file_path}")


# Best effort: prefer a locally bundled font, otherwise fetch it.
try:
    shutil.copy("./Roboto-Black.ttf", f'{base_path}/roboto.ttf')
except OSError:
    pass
if not os.path.exists(f'{base_path}/roboto.ttf'):
    conditional_download(
        "https://github.com/neuralfalcon/Video-Keyword-Cutter/raw/main/Roboto-Black.ttf",
        f'{base_path}/roboto.ttf')


#@title gradio utils
def highlight_cols(x):
    """Pandas Styler helper: white text everywhere, green 'Word Count'."""
    df = x.copy()
    df.loc[:, :] = 'color: white'
    df[['Word Count']] = 'color: green'
    return df


def gradio_whisper_config(youtube_video_link, file_path, language):
    """Gradio entry point (tab 1): fetch/copy the source video, extract its
    audio, transcribe, and return (json_path, styled word-count dataframe)."""
    global base_path, word_count_video_file_name, old_data
    old_data = []
    random_str = str(uuid.uuid4())[:8]
    if youtube_video_link and len(youtube_video_link) >= 1:
        yt_video_file_path = download_youtube_video(youtube_video_link)
        _, original_name, exten = clean_file_name(yt_video_file_path)
        video_file_path = f"{base_path}/download_video/{random_str}{exten}"
        os.rename(yt_video_file_path, video_file_path)
    elif file_path is not None and len(file_path) >= 1:
        _, original_name, exten = clean_file_name(file_path)
        if file_path.lower().endswith((".mp4", ".webm")):
            video_file_path = f"{base_path}/download_video/{random_str}{exten}"
            shutil.copy(file_path, video_file_path)
    word_count_video_file_name = f"{original_name}_{random_str}"
    print(video_file_path)
    audio_file_path = get_audio(video_file_path)
    print(audio_file_path)
    json_file_name = f"{random_str}.json"
    json_file_path = speech_to_text_with_timestamp(
        audio_file_path, json_file_name, language)
    with open(json_file_path, 'r') as json_file:
        data = json.load(json_file)
    df = pd.DataFrame([(key, len(entries)) for key, entries in data.items()],
                      columns=['Word', 'Word Count'])
    df = df.sort_values('Word Count', ascending=False)
    # Only surface frequently used words in the UI table.
    df_filtered = df[df['Word Count'] >= 5]
    styled = df_filtered.style.apply(highlight_cols, axis=None)
    old_data.append(video_file_path)
    old_data.append(json_file_path)
    return json_file_path, styled


def video_edit_gradio(find_word, prev_duration, count):
    """Gradio entry point (tab 2): trim every occurrence of *find_word* from
    the last transcribed video and return the merged clip twice (file+player)."""
    global old_data, previous_word_duration
    video_file_path = old_data[0]
    json_file_path = old_data[-1]
    if len(prev_duration) == 0:
        prev_duration = "0.0"
    # BUGFIX: publish the threshold as the module global read by
    # get_word_time; it was previously never assigned at module scope.
    previous_word_duration = float(prev_duration)
    video_path = process(json_file_path, video_file_path, find_word, count)
    return video_path, video_path


#@title Run gradio webapp
import gradio as gr

whisper_examples = [
    ["https://www.youtube.com/watch?v=PE89NysJEUY&t=9s&ab_channel=CNET", None]]
whisper_inputs = [
    gr.Textbox(label="Enter YouTube Video Link"),
    gr.File(label="Upload Audio or Video File", type="filepath"),
    gr.Dropdown(
        ['Auto detection', 'English', 'Hindi', 'Bengali', 'Afrikaans', 'Albanian', 'Amharic', 'Arabic', 'Armenian', 'Assamese', 'Azerbaijani', 'Bashkir', 'Basque', 'Belarusian', 'Bengali', 'Bosnian', 'Breton', 'Bulgarian', 'Burmese', 'Castilian', 'Catalan', 'Chinese', 'Croatian', 'Czech', 'Danish', 'Dutch', 'English', 'Estonian', 'Faroese', 'Finnish', 'Flemish', 'French', 'Galician', 'Georgian', 'German', 'Greek', 'Gujarati', 'Haitian', 'Haitian Creole', 'Hausa', 'Hawaiian', 'Hebrew', 'Hindi', 'Hungarian', 'Icelandic', 'Indonesian', 'Italian', 'Japanese', 'Javanese', 'Kannada', 'Kazakh', 'Khmer', 'Korean', 'Lao', 'Latin', 'Latvian', 'Letzeburgesch', 'Lingala', 'Lithuanian', 'Luxembourgish', 'Macedonian', 'Malagasy', 'Malay', 'Malayalam', 'Maltese', 'Maori', 'Marathi', 'Moldavian', 'Moldovan', 'Mongolian', 'Myanmar', 'Nepali', 'Norwegian', 'Nynorsk', 'Occitan', 'Panjabi', 'Pashto', 'Persian', 'Polish', 'Portuguese', 'Punjabi', 'Pushto', 'Romanian', 'Russian', 'Sanskrit', 'Serbian', 'Shona', 'Sindhi', 'Sinhala', 'Sinhalese', 'Slovak', 'Slovenian', 'Somali', 'Spanish', 'Sundanese', 'Swahili', 'Swedish', 'Tagalog', 'Tajik', 'Tamil', 'Tatar', 'Telugu', 'Thai', 'Tibetan', 'Turkish', 'Turkmen', 'Ukrainian', 'Urdu', 'Uzbek', 'Valencian', 'Vietnamese', 'Welsh', 'Yiddish', 'Yoruba'],
        label="Select Language", value='Auto detection'),
]
whisper_outputs = [gr.File(label="Download Json File"),
                   gr.DataFrame(label="Count word")]
whisper_demo = gr.Interface(
    fn=gradio_whisper_config, inputs=whisper_inputs, outputs=whisper_outputs,
    title="Generate word level timestamps using Whisper",
    examples=whisper_examples)
# whisper_demo.launch(share=True,debug=True)

video_trim_examples = [["", "0.0", True]]
video_trim_inputs = [
    gr.Textbox(label="Which WORD you want to find?"),
    gr.Textbox(label="Previous word duration threshold", value="0.1"),
    gr.Checkbox(label="Display how many times the word is used", value=True),
]
video_trim_outputs = [gr.File(label="Download Video File"),
                      gr.Video(label="Display Video")]
video_trim_demo = gr.Interface(
    fn=video_edit_gradio, inputs=video_trim_inputs, outputs=video_trim_outputs,
    title="Trim and Marge Video Clips", examples=video_trim_examples)
# video_trim_demo.launch(share=True,debug=True)

demo = gr.TabbedInterface(
    [whisper_demo, video_trim_demo],
    ["Generate word level timestamps using Whisper",
     "Trim and Marge Video Clips"])
demo.launch(share=True, debug=False)