File size: 6,489 Bytes
f378009
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import os, random, time
import torch
from diffusers import AutoencoderKL, FlowMatchEulerDiscreteScheduler
from tqdm import tqdm

from memo.models.audio_proj import AudioProjModel
from memo.models.image_proj import ImageProjModel
from memo.models.unet_2d_condition import UNet2DConditionModel
from memo.models.unet_3d import UNet3DConditionModel
from memo.pipelines.video_pipeline import VideoPipeline
from memo.utils.audio_utils import extract_audio_emotion_labels, preprocess_audio, resample_audio
from memo.utils.vision_utils import preprocess_image, tensor_to_video

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
weight_dtype = torch.bfloat16

with torch.inference_mode():
    vae = AutoencoderKL.from_pretrained("/content/memo/checkpoints/vae").to(device=device, dtype=weight_dtype)
    reference_net = UNet2DConditionModel.from_pretrained("/content/memo/checkpoints", subfolder="reference_net", use_safetensors=True)
    diffusion_net = UNet3DConditionModel.from_pretrained("/content/memo/checkpoints", subfolder="diffusion_net", use_safetensors=True)
    image_proj = ImageProjModel.from_pretrained("/content/memo/checkpoints", subfolder="image_proj", use_safetensors=True)
    audio_proj = AudioProjModel.from_pretrained("/content/memo/checkpoints", subfolder="audio_proj", use_safetensors=True)

    vae.requires_grad_(False).eval()
    reference_net.requires_grad_(False).eval()
    diffusion_net.requires_grad_(False).eval()
    image_proj.requires_grad_(False).eval()
    audio_proj.requires_grad_(False).eval()
    reference_net.enable_xformers_memory_efficient_attention()
    diffusion_net.enable_xformers_memory_efficient_attention()

    noise_scheduler = FlowMatchEulerDiscreteScheduler()
    pipeline = VideoPipeline(vae=vae, reference_net=reference_net, diffusion_net=diffusion_net, scheduler=noise_scheduler, image_proj=image_proj)
    pipeline.to(device=device, dtype=weight_dtype)

@torch.inference_mode()
def generate(input_video, input_audio, seed):
    resolution = 512
    num_generated_frames_per_clip = 16
    fps = 30
    num_init_past_frames = 2
    num_past_frames = 16
    inference_steps = 20
    cfg_scale = 3.5

    if seed == 0:
        random.seed(int(time.time()))
        seed = random.randint(0, 18446744073709551615)

    generator = torch.manual_seed(seed)
    img_size = (resolution, resolution)
    pixel_values, face_emb = preprocess_image(face_analysis_model="/content/memo/checkpoints/misc/face_analysis", image_path=input_video, image_size=resolution)

    output_dir = "/content/memo/outputs"
    os.makedirs(output_dir, exist_ok=True)
    cache_dir = os.path.join(output_dir, "audio_preprocess")
    os.makedirs(cache_dir, exist_ok=True)
    input_audio = resample_audio(input_audio, os.path.join(cache_dir, f"{os.path.basename(input_audio).split('.')[0]}-16k.wav"))

    audio_emb, audio_length = preprocess_audio(
        wav_path=input_audio,
        num_generated_frames_per_clip=num_generated_frames_per_clip,
        fps=fps,
        wav2vec_model="/content/memo/checkpoints/wav2vec2",
        vocal_separator_model="/content/memo/checkpoints/misc/vocal_separator/Kim_Vocal_2.onnx",
        cache_dir=cache_dir,
        device=device,
    )
    audio_emotion, num_emotion_classes = extract_audio_emotion_labels(
        model="/content/memo/checkpoints",
        wav_path=input_audio,
        emotion2vec_model="/content/memo/checkpoints/emotion2vec_plus_large",
        audio_length=audio_length,
        device=device,
    )

    video_frames = []
    num_clips = audio_emb.shape[0] // num_generated_frames_per_clip
    for t in tqdm(range(num_clips), desc="Generating video clips"):
        if len(video_frames) == 0:
            past_frames = pixel_values.repeat(num_init_past_frames, 1, 1, 1)
            past_frames = past_frames.to(dtype=pixel_values.dtype, device=pixel_values.device)
            pixel_values_ref_img = torch.cat([pixel_values, past_frames], dim=0)
        else:
            past_frames = video_frames[-1][0]
            past_frames = past_frames.permute(1, 0, 2, 3)
            past_frames = past_frames[0 - num_past_frames :]
            past_frames = past_frames * 2.0 - 1.0
            past_frames = past_frames.to(dtype=pixel_values.dtype, device=pixel_values.device)
            pixel_values_ref_img = torch.cat([pixel_values, past_frames], dim=0)

        pixel_values_ref_img = pixel_values_ref_img.unsqueeze(0)
        audio_tensor = (audio_emb[t * num_generated_frames_per_clip : min((t + 1) * num_generated_frames_per_clip, audio_emb.shape[0])].unsqueeze(0).to(device=audio_proj.device, dtype=audio_proj.dtype))
        audio_tensor = audio_proj(audio_tensor)
        audio_emotion_tensor = audio_emotion[t * num_generated_frames_per_clip : min((t + 1) * num_generated_frames_per_clip, audio_emb.shape[0])]

        pipeline_output = pipeline(
            ref_image=pixel_values_ref_img,
            audio_tensor=audio_tensor,
            audio_emotion=audio_emotion_tensor,
            emotion_class_num=num_emotion_classes,
            face_emb=face_emb,
            width=img_size[0],
            height=img_size[1],
            video_length=num_generated_frames_per_clip,
            num_inference_steps=inference_steps,
            guidance_scale=cfg_scale,
            generator=generator,
        )
        video_frames.append(pipeline_output.videos)

    video_frames = torch.cat(video_frames, dim=2)
    video_frames = video_frames.squeeze(0)
    video_frames = video_frames[:, :audio_length]

    video_path = f"/content/memo-{seed}-tost.mp4"
    tensor_to_video(video_frames, video_path, input_audio, fps=fps)

    return video_path

import gradio as gr

with gr.Blocks(css=".gradio-container {max-width: 1080px !important}", analytics_enabled=False) as demo:
    with gr.Row():
        with gr.Column():
            input_video = gr.Image(label="Upload Input Image", type="filepath")
            input_audio = gr.Audio(label="Upload Input Audio", type="filepath")
            seed = gr.Number(label="Seed (0 for Random)", value=0, precision=0)
        with gr.Column():
            video_output = gr.Video(label="Generated Video")
            generate_button = gr.Button("Generate")

    generate_button.click(
        fn=generate,
        inputs=[input_video, input_audio, seed],
        outputs=[video_output],
    )

demo.queue().launch(inline=False, share=False, debug=True, server_name='0.0.0.0', server_port=7860, allowed_paths=["/content"])