import os
import copy
import json
import time
import torch
import argparse
from PIL import Image
import numpy as np
import soundfile as sf
from tqdm import tqdm
from diffusers import DDPMScheduler
from models import build_pretrained_models, AudioDiffusion
from transformers import AutoProcessor, ClapModel
import torchaudio
import tools.torch_tools as torch_tools
from datasets import load_dataset

# Check if CUDA is available and set the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)


class dotdict(dict):
    """Dictionary whose items can also be read and written as attributes.

    Attribute reads go through ``dict.get``, so a missing key yields ``None``
    instead of raising ``AttributeError``. ``main`` relies on this when it
    probes optional training flags such as ``train_args.ib``.
    """
    __getattr__ = dict.get
    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__


def chunks(lst, n):
    """Yield successive n-sized chunks from lst.

    The last chunk is shorter when ``len(lst)`` is not a multiple of ``n``.
    """
    for i in range(0, len(lst), n):
        yield lst[i:i + n]


def parse_args():
    """Parse the command-line options of the inference script.

    Returns:
        The ``argparse.Namespace`` with all options listed below.
    """
    parser = argparse.ArgumentParser(description="Inference for text to audio generation task.")
    parser.add_argument(
        "--original_args", type=str, default=None,
        help="Path for summary jsonl file saved during training."
    )
    parser.add_argument(
        "--model", type=str, default=None,
        help="Path for saved model bin file."
    )
    # NOTE: this option is parsed but never read; main() takes the VAE name
    # from the training summary (train_args.vae_model) instead.
    parser.add_argument(
        "--vae_model", type=str, default="audioldm-s-full",
        help="Path for saved model bin file."
    )
    parser.add_argument(
        "--num_steps", type=int, default=200,
        help="How many denoising steps for generation.",
    )
    parser.add_argument(
        "--guidance", type=float, default=3,
        help="Guidance scale for classifier free guidance."
    )
    parser.add_argument(
        "--batch_size", type=int, default=1,
        help="Batch size for generation.",
    )
    parser.add_argument(
        "--num_samples", type=int, default=1,
        help="How many samples per prompt.",
    )
    # NOTE: parsed but not used anywhere in this script.
    parser.add_argument(
        "--num_test_instances", type=int, default=-1,
        help="How many test instances to evaluate.",
    )
    parser.add_argument(
        "--sample_rate", type=int, default=16000,
        help="Sample rate for audio output.",
    )
    parser.add_argument(
        "--max_duration", type=int, default=10,
        help="Maximum length duration for generated audio."
    )
    parser.add_argument(
        "--save_dir", type=str, default="./outputs/tmp",
        help="output save dir"
    )
    parser.add_argument(
        "--data_path", type=str, default="data/video_processed/video_gt_augment",
        help="inference data path"
    )

    args = parser.parse_args()

    return args


def _validate_args(args):
    """Reject option combinations that would only fail later, after costly work."""
    if not args.original_args:
        raise ValueError("--original_args is required (training summary jsonl file).")
    if not args.model:
        raise ValueError("--model is required (path to the trained weights).")
    if args.batch_size < 1:
        raise ValueError("--batch_size must be at least 1.")
    if args.num_samples != 1:
        # Ranking several samples per input needs text prompts and a CLAP
        # model/processor; this script loads neither, so that path used to
        # crash with NameError only after all audio had been generated.
        raise NotImplementedError(
            "--num_samples must be 1: ranking multiple samples per input requires "
            "text prompts and a CLAP model, neither of which this script loads."
        )


def _load_train_args(path):
    """Read the first line of the training summary jsonl file as a dotdict."""
    with open(path) as f:
        first_line = f.readline()
    train_args = dotdict(json.loads(first_line))
    train_args.setdefault("hf_model", None)
    return train_args


def _select_model_class(train_args):
    """Pick the diffusion model class matching the encoder flag set at training."""
    if train_args.ib:
        print("*****USING MODEL IMAGEBIND*****")
        from models_imagebind import AudioDiffusion_IB
        return AudioDiffusion_IB
    if train_args.lb:
        print("*****USING MODEL LANGUAGEBIND*****")
        from models_languagebind import AudioDiffusion_LB
        return AudioDiffusion_LB
    if train_args.jepa:
        print("*****USING MODEL JEPA*****")
        from models_vjepa import AudioDiffusion_JEPA
        return AudioDiffusion_JEPA
    return AudioDiffusion


def _load_weights(model, path):
    """Load trained weights from ``path`` into ``model`` (non-strict).

    ``.pt`` / ``.bin`` files go straight through ``torch.load``. Anything else
    is tried as a safetensors file first, falling back to ``torch.load`` when
    that raises ``OSError``.
    """
    # NOTE: torch.load unpickles the checkpoint; only load files you trust.
    if path.endswith((".pt", ".bin")):
        model.load_state_dict(torch.load(path, map_location=device), strict=False)
        return
    try:
        from safetensors.torch import load_model
        load_model(model, path, strict=False)
    except OSError as e:
        print(f"Error loading model with safetensors: {e}")
        print("Falling back to torch.load")
        model.load_state_dict(torch.load(path, map_location=device), strict=False)


def main():
    """Generate one audio clip per video in ``--data_path`` and save it as wav.

    Steps: validate the options, rebuild the model from the training summary,
    load the trained weights, run diffusion inference batch by batch, truncate
    every waveform to ``--max_duration`` seconds and write the results under
    ``--save_dir``.

    Raises:
        ValueError: if a required option is missing or ``--batch_size`` < 1.
        NotImplementedError: if ``--num_samples`` is not 1.
    """
    args = parse_args()
    _validate_args(args)

    train_args = _load_train_args(args.original_args)

    # Load Models #
    name = train_args.vae_model
    vae, stft = build_pretrained_models(name)
    vae, stft = vae.to(device), stft.to(device)  # Ensure models are on the correct device

    model_class = _select_model_class(train_args)
    model = model_class(
        train_args.fea_encoder_name,
        train_args.scheduler_name,
        train_args.unet_model_name,
        train_args.unet_model_config,
        train_args.snr_gamma,
        train_args.freeze_text_encoder,
        train_args.uncondition,
        train_args.img_pretrained_model_path,
        train_args.task,
        train_args.embedding_dim,
        train_args.pe,
    )
    model.eval()

    # Load Trained Weight #
    _load_weights(model, args.model)
    model.to(device)

    scheduler = DDPMScheduler.from_pretrained(train_args.scheduler_name, subfolder="scheduler")

    sample_rate = args.sample_rate
    max_len_in_seconds = args.max_duration

    # Load Data #
    data_path = args.data_path
    # List the directory once, sorted, so that output names and loaded inputs
    # are guaranteed to line up and runs are reproducible.
    video_files = sorted(os.listdir(data_path))
    wavname = [f"{fname.split('.')[0]}.wav" for fname in video_files]

    video_features = []
    for video_file in video_files:
        video_path = os.path.join(data_path, video_file)
        video_feature = torch_tools.load_video(video_path, frame_rate=2, size=224)
        print(video_feature.shape)
        # Kept where load_video put it; moved to the device per batch below so
        # the whole dataset does not have to sit in GPU memory at once.
        video_features.append(video_feature)

    # Generate #
    num_steps, guidance, batch_size, num_samples = args.num_steps, args.guidance, args.batch_size, args.num_samples
    # Every waveform is cut to at most max_duration seconds of samples.
    max_wave_samples = sample_rate * max_len_in_seconds
    all_outputs = []

    for start in tqdm(range(0, len(video_features), batch_size)):
        prompt = [feature.to(device) for feature in video_features[start:start + batch_size]]
        with torch.no_grad():
            latents = model.inference(scheduler, None, prompt, None, num_steps, guidance, num_samples, disable_progress=True, device=device)
            mel = vae.decode_first_stage(latents)
            wave = vae.decode_to_waveform(mel)
        all_outputs.extend(wav[:max_wave_samples] for wav in wave)

    if len(all_outputs) != len(wavname):
        raise RuntimeError(
            "Generated {} waveforms for {} input files.".format(len(all_outputs), len(wavname))
        )

    # Save #
    exp_id = str(int(time.time()))
    os.makedirs("outputs", exist_ok=True)

    output_dir = "{}/{}_{}_steps_{}_guidance_{}_sampleRate_{}_augment".format(
        args.save_dir, exp_id, "_".join(args.model.split("/")[1:-1]), num_steps, guidance, sample_rate
    )
    os.makedirs(output_dir, exist_ok=True)
    for fname, wav in zip(wavname, all_outputs):
        sf.write("{}/{}".format(output_dir, fname), wav, samplerate=sample_rate)


if __name__ == "__main__":
    main()