vta-ldm / app.py
fffiloni's picture
Update app.py
4b9083b verified
raw
history blame
7.64 kB
import gradio as gr
import huggingface_hub
import os
import subprocess
import threading
import shutil
import numpy as np
import matplotlib.pyplot as plt
from scipy.io import wavfile
from moviepy.editor import VideoFileClip, AudioFileClip
# Fetch the pretrained checkpoint repository from the Hugging Face Hub into
# ./ckpt at import time, so the files are on disk before any inference runs.
huggingface_hub.snapshot_download(
    local_dir='./ckpt/vta-ldm-clip4clip-v-large',
    repo_id='ariesssxu/vta-ldm-clip4clip-v-large',
)
def stream_output(pipe):
    """Echo *pipe* to stdout line by line until it is exhausted.

    Args:
        pipe: A text-mode file-like object exposing ``readline()``; an empty
            string returned by ``readline()`` marks the end of the stream.
    """
    while True:
        chunk = pipe.readline()
        if chunk == '':
            # readline() yields '' only at end-of-stream.
            break
        # Lines already carry their own newline, so do not add another.
        print(chunk, end='')
def print_directory_contents(path):
    """Print an indented tree of the directories and files found under *path*.

    Each directory is printed as ``name/`` and its files are printed one
    level deeper; every nesting level adds four spaces of indentation.

    Args:
        path: Root directory to walk. Nothing is printed if it does not exist.
    """
    for current_dir, _subdirs, filenames in os.walk(path):
        # Depth is the number of path separators left once the root prefix
        # has been removed from the directory currently being visited.
        depth = current_dir.replace(path, '').count(os.sep)
        dir_pad = ' ' * (4 * depth)
        file_pad = ' ' * (4 * (depth + 1))
        print(f"{dir_pad}{os.path.basename(current_dir)}/")
        for name in filenames:
            print(f"{file_pad}{name}")
# Log the tree of the ./ckpt directory at startup so the files present after
# the download above are visible in the application logs.
print_directory_contents('./ckpt')
def get_wav_files(path):
    """Walk *path*, print its tree, and collect every ``.wav`` file found.

    The extension match is case-insensitive. WAV files are printed with their
    full joined path; all other files are printed by bare name.

    Args:
        path: Root directory to search.

    Returns:
        A list of paths (root joined with file name) of the ``.wav`` files,
        in the order ``os.walk`` yields them.
    """
    found = []
    for current_dir, _subdirs, filenames in os.walk(path):
        depth = current_dir.replace(path, '').count(os.sep)
        dir_pad = ' ' * (4 * depth)
        file_pad = ' ' * (4 * (depth + 1))
        print(f"{dir_pad}{os.path.basename(current_dir)}/")
        for name in filenames:
            full_path = os.path.join(current_dir, name)
            if not name.lower().endswith('.wav'):
                print(f"{file_pad}{name}")
                continue
            found.append(full_path)
            print(f"{file_pad}{full_path}")
    return found
def check_outputs_folder(folder_path):
    """Empty *folder_path* if it is an existing directory.

    Files and symlinks are unlinked, sub-directories are removed recursively.
    The folder itself is kept. Deletion is best-effort: a failure on one
    entry is printed and the remaining entries are still processed.

    Args:
        folder_path: Directory whose contents should be deleted.
    """
    if not (os.path.exists(folder_path) and os.path.isdir(folder_path)):
        print(f'The folder {folder_path} does not exist.')
        return

    for entry in os.listdir(folder_path):
        target = os.path.join(folder_path, entry)
        try:
            # Links are tested before directories so that a symlink pointing
            # at a directory is unlinked rather than followed.
            is_file_or_link = os.path.isfile(target) or os.path.islink(target)
            if is_file_or_link:
                os.unlink(target)
            elif os.path.isdir(target):
                shutil.rmtree(target)
        except Exception as e:
            print(f'Failed to delete {target}. Reason: {e}')
def plot_spectrogram(wav_file, output_image):
    """Render a grayscale spectrogram of a WAV file and save it as an image.

    Args:
        wav_file: Path of the WAV file to read.
        output_image: Path of the image file to write.
    """
    # Read the WAV file
    sample_rate, audio_data = wavfile.read(wav_file)

    # Multi-channel audio comes back as a 2-D array; average the channels
    # down to a single mono signal before computing the spectrogram.
    if audio_data.ndim == 2:
        audio_data = audio_data.mean(axis=1)

    fig = plt.figure(figsize=(10, 2))
    try:
        plt.specgram(audio_data, Fs=sample_rate, NFFT=1024, noverlap=512, cmap='gray', aspect='auto')

        # Remove gridlines and ticks for a cleaner look
        plt.grid(False)
        plt.xticks([])
        plt.yticks([])

        # Save the plot as an image file
        plt.savefig(output_image, bbox_inches='tight', pad_inches=0, dpi=300)
    finally:
        # The original referenced `plt.close` without calling it, so the
        # figure was never released and figures accumulated with every call.
        # Close this specific figure, even if plotting or saving failed.
        plt.close(fig)
def merge_audio_to_video(input_vid, input_aud):
    """Replace the audio track of a video and write the result to disk.

    Args:
        input_vid: Path of the source video file.
        input_aud: Path of the audio file to use as the new soundtrack.

    Returns:
        The path of the written file, always ``"output_video.mp4"`` in the
        current working directory.
    """
    # Load the video file
    video = VideoFileClip(input_vid)
    try:
        # Load the new audio file
        new_audio = AudioFileClip(input_aud)
        try:
            # Set the new audio to the video
            video_with_new_audio = video.set_audio(new_audio)
            # Save the result to a new file
            video_with_new_audio.write_videofile("output_video.mp4", codec='libx264', audio_codec='aac')
        finally:
            # Release the audio reader even if encoding fails.
            new_audio.close()
    finally:
        # Release the video reader; without this every call leaves the
        # underlying reader and its file handle open.
        video.close()
    return "output_video.mp4"
def infer(video_in):
    """Generate a soundtrack for a video and return the resulting artifacts.

    The function:

    1. empties ``./outputs/tmp``;
    2. trims the input to its first 10 seconds when its duration (truncated
       to whole seconds) exceeds 10, saving the cut next to the original as
       ``<name>_10sec_cut.mp4`` and deleting the original;
    3. runs ``inference_from_video.py`` as a subprocess on the folder that
       contains the video, echoing its stdout/stderr;
    4. takes the first ``.wav`` found under ``./outputs/tmp`` (the script is
       expected to write there -- this is its assumed default), plots its
       spectrogram and merges it with the video.

    Args:
        video_in: Filesystem path of the input video.

    Returns:
        A tuple ``(wav_path, 'spectrogram.png', merged_video_path)``.

    Raises:
        gr.Error: If no video was provided or if no ``.wav`` file was
            produced by the inference script.
    """
    if not video_in:
        raise gr.Error("Please provide an input video.")

    # check if 'outputs' dir exists and empty it if necessary
    check_outputs_folder('./outputs/tmp')

    print(f"VIDEO IN PATH: {video_in}")

    # The inference script receives the folder holding the video, not the file.
    folder_path = os.path.dirname(video_in)
    input_video_path = video_in

    was_cut = False
    video = VideoFileClip(input_video_path)
    try:
        # Get the length of the video in (whole) seconds
        video_duration = int(video.duration)
        print(f"Video duration: {video_duration} seconds")

        if video_duration > 10:
            # Cut the video to the first 10 seconds
            cut_video = video.subclip(0, 10)
            video_duration = 10

            # Build the new name from the stem rather than replacing ".mp4":
            # for any other extension the replace was a no-op, so the cut was
            # written over the input and then deleted by the os.remove below.
            dir_name = os.path.dirname(input_video_path)
            stem, _ext = os.path.splitext(os.path.basename(input_video_path))
            output_video_path = os.path.join(dir_name, f"{stem}_10sec_cut.mp4")

            # Save the cut video
            cut_video.write_videofile(output_video_path, codec='libx264', audio_codec='aac')
            print(f"Cut video saved as: {output_video_path}")

            video_in = output_video_path
            was_cut = True
        else:
            print("Video is 10 seconds or shorter; no cutting needed.")
    finally:
        # Release the reader before the source file is deleted.
        video.close()

    if was_cut:
        # Delete the original so the data folder only holds the cut video.
        os.remove(input_video_path)
        print(f"Original video file {input_video_path} deleted.")

    # Execute the inference command
    command = ['python', 'inference_from_video.py',
        '--original_args', 'ckpt/vta-ldm-clip4clip-v-large/summary.jsonl',
        '--model', 'ckpt/vta-ldm-clip4clip-v-large/pytorch_model_2.bin',
        '--data_path', folder_path,
        '--max_duration', f"{video_duration}"
    ]
    process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1)

    # Drain stdout and stderr on separate threads so neither pipe can fill up
    # and block the child process while the other is being read.
    stdout_thread = threading.Thread(target=stream_output, args=(process.stdout,))
    stderr_thread = threading.Thread(target=stream_output, args=(process.stderr,))
    stdout_thread.start()
    stderr_thread.start()

    # Wait for the process to complete and the threads to finish
    process.wait()
    stdout_thread.join()
    stderr_thread.join()

    print("Inference script finished with return code:", process.returncode)

    # Print the outputs directory contents
    print_directory_contents('./outputs/tmp')
    wave_files = get_wav_files('./outputs/tmp')
    print(wave_files)

    if not wave_files:
        # Surface a readable error instead of an IndexError on wave_files[0].
        raise gr.Error(
            "No audio was generated "
            f"(inference script exited with return code {process.returncode})."
        )

    generated_wav = wave_files[0]
    plot_spectrogram(generated_wav, 'spectrogram.png')
    final_merged_out = merge_audio_to_video(video_in, generated_wav)
    return generated_wav, 'spectrogram.png', final_merged_out
# Custom CSS injected into the page: constrains the element with id
# "col-container" to 800px and centers it horizontally.
css = """
#col-container{
max-width: 800px;
margin: 0 auto;
}
"""
# Build the UI: a single column holding the title, project/paper badges,
# the video input, the submit button and the three output components.
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        gr.Markdown("# Video-to-Audio Generation with Hidden Alignment")
        # Badge links to the project page and the paper page.
        gr.HTML("""
<div style="display:flex;column-gap:4px;">
<a href='https://sites.google.com/view/vta-ldm'>
<img src='https://img.shields.io/badge/Project-Page-Green'>
</a>
<a href='https://huggingface.co./papers/2407.07464'>
<img src='https://img.shields.io/badge/HF-Paper-red'>
</a>
</div>
""")
        with gr.Column():
            video_in = gr.Video(label='Video IN')
            submit_btn = gr.Button("Submit")
            output_sound = gr.Audio(label="Audio OUT")
            output_spectrogram = gr.Image(label='Spectrogram')
            merged_out = gr.Video(label="Merged video + generated audio")
    # Clicking the button runs infer() on the uploaded video; its three
    # return values map, in order, onto the audio, image and video outputs.
    submit_btn.click(
        fn = infer,
        inputs = [video_in],
        outputs = [output_sound, output_spectrogram, merged_out],
        show_api = False
    )
# Start the app at import time with show_api=False and show_error=True.
demo.launch(show_api=False, show_error=True)