|
import os |
|
import uuid |
|
import gradio as gr |
|
import torch |
|
import PIL |
|
from PIL import Image |
|
from pyramid_dit import PyramidDiTForVideoGeneration |
|
from diffusers.utils import export_to_video |
|
from huggingface_hub import snapshot_download |
|
import threading |
|
import random |
|
|
|
|
|
# Loaded models keyed by variant name, and the lock that guards first-time loading.
model_cache = {}
model_cache_lock = threading.Lock()

# Which Pyramid Flow architecture to run; it also decides the Hub repository.
model_name = "pyramid_flux"
if model_name == "pyramid_mmdit":
    model_repo = "rain1011/pyramid-flow-sd3"
else:
    model_repo = "rain1011/pyramid-flow-miniflux"

model_dtype = "bf16"

# Sub-directory of the model folder that holds each resolution's transformer.
variants = {
    "high": "diffusion_transformer_768p",
    "low": "diffusion_transformer_384p",
}

# File whose presence in every variant directory marks the download as usable.
required_file = "config.json"

# Output frame sizes for the high- and low-resolution variants.
width_high, height_high = 1280, 768
width_low, height_low = 640, 384

cpu_offloading = True

# The model is stored in a folder next to wherever the script was started from.
current_directory = os.getcwd()
model_path = os.path.join(current_directory, "pyramid_flow_model")
|
|
|
|
|
def download_model_from_hf(model_repo, model_dir, variants, required_file):
    """Download the model snapshot unless a usable local copy already exists.

    A local copy counts as usable when ``model_dir`` exists and every variant
    sub-directory listed in ``variants`` contains ``required_file``.

    Args:
        model_repo: Hugging Face Hub repository id to download from.
        model_dir: Local directory the snapshot is stored in.
        variants: Mapping whose values are variant sub-directory names.
        required_file: File name that must exist inside each variant directory.

    Raises:
        Exception: Whatever ``snapshot_download`` raises; it is logged and
            re-raised unchanged.
    """
    if os.path.exists(model_dir):
        need_download = False
        # Only the directory names matter here; stop at the first incomplete one.
        for variant_dir in variants.values():
            variant_path = os.path.join(model_dir, variant_dir)
            if not os.path.exists(os.path.join(variant_path, required_file)):
                print(f"[WARNING] Required file '{required_file}' missing in '{variant_path}'.")
                need_download = True
                break
    else:
        print(f"[INFO] Model directory '{model_dir}' does not exist. Initiating download...")
        need_download = True

    if not need_download:
        print(f"[INFO] All required model files are present in '{model_dir}'. Skipping download.")
        return

    print(f"[INFO] Downloading model from '{model_repo}' to '{model_dir}'...")
    try:
        snapshot_download(
            repo_id=model_repo,
            local_dir=model_dir,
            local_dir_use_symlinks=False,
            repo_type='model',
        )
        print("[INFO] Model download complete.")
    except Exception as e:
        print(f"[ERROR] Failed to download the model: {e}")
        raise
|
|
|
|
|
# Make sure the model files are on disk before any model is constructed.
download_model_from_hf(
    model_repo=model_repo,
    model_dir=model_path,
    variants=variants,
    required_file=required_file,
)
|
|
|
|
|
def initialize_model(variant):
    """Construct the Pyramid Flow pipeline for one resolution variant.

    Args:
        variant: Resolution label. '768p' selects the high-resolution
            transformer directory; any other value selects the low-resolution one.

    Returns:
        A ``(model, torch_dtype)`` tuple: the ``PyramidDiTForVideoGeneration``
        instance with VAE tiling enabled, and ``torch.bfloat16`` when
        ``model_dtype`` is "bf16" or ``torch.float32`` otherwise.

    Raises:
        FileNotFoundError: If the variant directory has no ``config.json``.
        Exception: Any error raised while building or placing the model is
            logged and re-raised unchanged.
    """
    print(f"[INFO] Initializing model with variant='{variant}', using bf16 precision...")

    variant_dir = variants['high'] if variant == '768p' else variants['low']
    base_path = model_path
    print(f"[DEBUG] Model base path: {base_path}")

    # Fail early with a clear message if the variant was never downloaded.
    variant_root = os.path.join(model_path, variant_dir)
    if not os.path.exists(os.path.join(variant_root, 'config.json')):
        message = f"config.json not found in '{variant_root}'."
        print(f"[ERROR] {message}")
        raise FileNotFoundError(message)

    torch_dtype_selected = torch.bfloat16 if model_dtype == "bf16" else torch.float32

    try:
        model = PyramidDiTForVideoGeneration(
            base_path,
            model_name=model_name,
            model_dtype=model_dtype,
            model_variant=variant_dir,
            cpu_offloading=cpu_offloading,
        )
        model.vae.enable_tiling()

        if not torch.cuda.is_available():
            print("[WARNING] CUDA is not available. Proceeding without GPU.")
        else:
            torch.cuda.set_device(0)
            # With CPU offloading enabled the sub-modules are left where they are.
            if not cpu_offloading:
                for part in ("vae", "dit", "text_encoder"):
                    getattr(model, part).to("cuda")

        print("[INFO] Model initialized successfully.")
        return model, torch_dtype_selected
    except Exception as e:
        print(f"[ERROR] Error initializing model: {e}")
        raise
|
|
|
|
|
def initialize_model_cached(variant, seed):
    """Seed the torch RNGs, then return the cached model for ``variant``.

    Despite its name this function does two things: it seeds torch (and CUDA
    when available) for the upcoming generation, and it loads the requested
    variant at most once, keeping it in ``model_cache``.

    Args:
        variant: Resolution label used both as the cache key and as the
            argument passed to ``initialize_model``.
        seed: Requested RNG seed. ``0`` or ``None`` means "choose a random
            seed"; any other number is truncated to an integer.

    Returns:
        The ``(model, torch_dtype)`` tuple stored in the cache for ``variant``.

    Raises:
        Exception: Whatever ``initialize_model`` raises on a first load.
    """
    # The seed comes from a UI number field, so it may be None (presumably when
    # the field is left empty) or a float; handle both instead of crashing.
    if seed is None or seed == 0:
        # Draw from the full 32-bit range so "random" is not limited to 256 seeds.
        seed = random.randint(0, 2**32 - 1)
    seed = int(seed)

    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)

    key = variant
    # Cheap unlocked check first; re-check under the lock so that concurrent
    # first requests for the same variant only load the model once.
    if key not in model_cache:
        with model_cache_lock:
            if key not in model_cache:
                model, dtype = initialize_model(variant)
                model_cache[key] = (model, dtype)

    return model_cache[key]
|
|
|
def resize_crop_image(img: PIL.Image.Image, tgt_width, tgt_height):
    """Scale ``img`` to cover the target size, then crop it around the centre.

    The image is resized by the larger of the width and height ratios, so the
    resized image spans the target box in both directions, and the surplus is
    trimmed equally from opposite sides.

    Args:
        img: Source image.
        tgt_width: Desired output width in pixels.
        tgt_height: Desired output height in pixels.

    Returns:
        The resized and centre-cropped image.
    """
    src_w, src_h = img.width, img.height
    factor = max(tgt_width / src_w, tgt_height / src_h)
    new_w, new_h = round(src_w * factor), round(src_h * factor)
    img = img.resize((new_w, new_h), resample=PIL.Image.LANCZOS)

    # Box centred in the resized image; the coordinates can be half-integers.
    box = (
        (new_w - tgt_width) / 2,
        (new_h - tgt_height) / 2,
        (new_w + tgt_width) / 2,
        (new_h + tgt_height) / 2,
    )
    return img.crop(box)
|
|
|
|
|
def generate_text_to_video(prompt, temp, guidance_scale, video_guidance_scale, resolution, seed, progress=gr.Progress()):
    """Generate a video from a text prompt and return the path of the MP4 file.

    Args:
        prompt: Text description of the video to generate.
        temp: Value of the "Duration" slider, forwarded to the model as ``temp``.
        guidance_scale: Forwarded to the model as ``guidance_scale``.
        video_guidance_scale: Forwarded to the model as ``video_guidance_scale``.
        resolution: "768p" selects the high-resolution variant and frame size;
            any other value selects the low-resolution ones.
        seed: RNG seed passed to ``initialize_model_cached``.
        progress: Gradio progress tracker, updated from the generation callback.

    Returns:
        Path of the exported MP4 file, relative to the working directory.

    Raises:
        gr.Error: If model loading, generation or export fails. The output
            component is a video, so a returned message string would be handled
            as a video value; raising lets the UI show the message itself.
    """
    progress(0, desc="Loading model")
    print("[DEBUG] generate_text_to_video called.")
    variant = '768p' if resolution == "768p" else '384p'
    height = height_high if resolution == "768p" else height_low
    width = width_high if resolution == "768p" else width_low

    def progress_callback(i, m):
        # Report generation progress as a fraction of the total.
        progress(i/m)

    try:
        model, torch_dtype_selected = initialize_model_cached(variant, seed)
    except Exception as e:
        print(f"[ERROR] Model initialization failed: {e}")
        raise gr.Error(f"Model initialization failed: {e}") from e

    try:
        print("[INFO] Starting text-to-video generation...")
        with torch.no_grad(), torch.autocast('cuda', dtype=torch_dtype_selected):
            frames = model.generate(
                prompt=prompt,
                num_inference_steps=[20, 20, 20],
                video_num_inference_steps=[10, 10, 10],
                height=height,
                width=width,
                temp=temp,
                guidance_scale=guidance_scale,
                video_guidance_scale=video_guidance_scale,
                output_type="pil",
                cpu_offloading=cpu_offloading,
                save_memory=True,
                callback=progress_callback,
            )
        print("[INFO] Text-to-video generation completed.")
    except Exception as e:
        print(f"[ERROR] Error during text-to-video generation: {e}")
        raise gr.Error(f"Error during video generation: {e}") from e

    # A random file name keeps concurrent requests from overwriting each other.
    video_path = f"{uuid.uuid4()}_text_to_video_sample.mp4"
    try:
        export_to_video(frames, video_path, fps=24)
        print(f"[INFO] Video exported to {video_path}.")
    except Exception as e:
        print(f"[ERROR] Error exporting video: {e}")
        raise gr.Error(f"Error exporting video: {e}") from e
    return video_path
|
|
|
|
|
def generate_image_to_video(image, prompt, temp, video_guidance_scale, resolution, seed, progress=gr.Progress()):
    """Generate a video that starts from an input image and return the MP4 path.

    Args:
        image: Input image; it is resized and centre-cropped to the frame size
            of the chosen resolution before generation.
        prompt: Text description guiding the video.
        temp: Value of the "Duration" slider, forwarded to the model as ``temp``.
        video_guidance_scale: Forwarded to the model as ``video_guidance_scale``.
        resolution: "768p" selects the high-resolution variant and frame size;
            any other value selects the low-resolution ones.
        seed: RNG seed passed to ``initialize_model_cached``.
        progress: Gradio progress tracker, updated from the generation callback.

    Returns:
        Path of the exported MP4 file, relative to the working directory.

    Raises:
        gr.Error: If no image was given, or if image preparation, model
            loading, generation or export fails. The output component is a
            video, so a returned message string would be handled as a video
            value; raising lets the UI show the message itself.
    """
    progress(0, desc="Loading model")
    print("[DEBUG] generate_image_to_video called.")
    variant = '768p' if resolution == "768p" else '384p'
    height = height_high if resolution == "768p" else height_low
    width = width_high if resolution == "768p" else width_low

    # Presumably None when the image field was left empty; report that plainly
    # instead of surfacing an attribute error from the resize step.
    if image is None:
        print("[ERROR] Error processing image: no input image was provided")
        raise gr.Error("Error processing image: no input image was provided")

    try:
        image = resize_crop_image(image, width, height)
        print("[INFO] Image resized and cropped successfully.")
    except Exception as e:
        print(f"[ERROR] Error processing image: {e}")
        raise gr.Error(f"Error processing image: {e}") from e

    def progress_callback(i, m):
        # Report generation progress as a fraction of the total.
        progress(i/m)

    try:
        model, torch_dtype_selected = initialize_model_cached(variant, seed)
    except Exception as e:
        print(f"[ERROR] Model initialization failed: {e}")
        raise gr.Error(f"Model initialization failed: {e}") from e

    try:
        print("[INFO] Starting image-to-video generation...")
        with torch.no_grad(), torch.autocast('cuda', dtype=torch_dtype_selected):
            frames = model.generate_i2v(
                prompt=prompt,
                input_image=image,
                num_inference_steps=[10, 10, 10],
                temp=temp,
                video_guidance_scale=video_guidance_scale,
                output_type="pil",
                cpu_offloading=cpu_offloading,
                save_memory=True,
                callback=progress_callback,
            )
        print("[INFO] Image-to-video generation completed.")
    except Exception as e:
        print(f"[ERROR] Error during image-to-video generation: {e}")
        raise gr.Error(f"Error during video generation: {e}") from e

    # A random file name keeps concurrent requests from overwriting each other.
    video_path = f"{uuid.uuid4()}_image_to_video_sample.mp4"
    try:
        export_to_video(frames, video_path, fps=24)
        print(f"[INFO] Video exported to {video_path}.")
    except Exception as e:
        print(f"[ERROR] Error exporting video: {e}")
        raise gr.Error(f"Error exporting video: {e}") from e
    return video_path
|
|
|
def update_slider(resolution):
    """Return slider updates that set the maximum duration for ``resolution``.

    Args:
        resolution: Selected resolution label; "768p" allows up to 31, any
            other value allows up to 16.

    Returns:
        A list of two ``gr.update`` objects carrying the same ``maximum``,
        one per slider wired to this callback.
    """
    limit = 31 if resolution == "768p" else 16
    return [gr.update(maximum=limit), gr.update(maximum=limit)]
|
|
|
|
|
# Build the Gradio UI: a shared resolution selector plus one tab per mode.
with gr.Blocks() as demo:
    gr.Markdown(
        """
# Pyramid Flow Video Generation Demo

Pyramid Flow is a training-efficient **Autoregressive Video Generation** model based on **Flow Matching**. It is trained only on open-source datasets within 20.7k A100 GPU hours.

[[Paper]](https://arxiv.org/abs/2410.05954) [[Project Page]](https://pyramid-flow.github.io) [[Code]](https://github.com/jy0205/Pyramid-Flow) [[Model]](https://huggingface.co./rain1011/pyramid-flow-sd3)
"""
    )

    # The resolution choice is shared by both tabs.
    with gr.Row():
        resolution_dropdown = gr.Dropdown(
            choices=["768p", "384p"],
            value="384p",
            label="Model Resolution"
        )

    with gr.Tab("Text-to-Video"):
        with gr.Row():
            with gr.Column():
                text_prompt = gr.Textbox(label="Prompt (Less than 128 words)", placeholder="Enter a text prompt for the video", lines=2)
                temp_slider = gr.Slider(1, 16, value=16, step=1, label="Duration")
                guidance_scale_slider = gr.Slider(1.0, 15.0, value=9.0, step=0.1, label="Guidance Scale")
                video_guidance_scale_slider = gr.Slider(1.0, 10.0, value=5.0, step=0.1, label="Video Guidance Scale")
                text_seed = gr.Number(label="Inference Seed (Enter a positive number, 0 for random)", value=0)
                txt_generate = gr.Button("Generate Video")
            with gr.Column():
                txt_output = gr.Video(label="Generated Video")
        # Each example row supplies one value per input component, including
        # the seed (0 = random), because the handler requires all six arguments.
        gr.Examples(
            examples=[
                ["A movie trailer featuring the adventures of the 30 year old space man wearing a red wool knitted motorcycle helmet, blue sky, salt desert, cinematic style, shot on 35mm film, vivid colors", 16, 7.0, 5.0, "384p", 0],
                ["Beautiful, snowy Tokyo city is bustling. The camera moves through the bustling city street, following several people enjoying the beautiful snowy weather and shopping at nearby stalls. Gorgeous sakura petals are flying through the wind along with snowflakes", 16, 7.0, 5.0, "384p", 0],
            ],
            inputs=[text_prompt, temp_slider, guidance_scale_slider, video_guidance_scale_slider, resolution_dropdown, text_seed],
            outputs=[txt_output],
            fn=generate_text_to_video,
            cache_examples='lazy',
        )

    with gr.Tab("Image-to-Video"):
        with gr.Row():
            with gr.Column():
                image_input = gr.Image(type="pil", label="Input Image")
                image_prompt = gr.Textbox(label="Prompt (Less than 128 words)", placeholder="Enter a text prompt for the video", lines=2)
                image_temp_slider = gr.Slider(2, 16, value=16, step=1, label="Duration")
                image_video_guidance_scale_slider = gr.Slider(1.0, 7.0, value=4.0, step=0.1, label="Video Guidance Scale")
                image_seed = gr.Number(label="Inference Seed (Enter a positive number, 0 for random)", value=0)
                img_generate = gr.Button("Generate Video")
            with gr.Column():
                img_output = gr.Video(label="Generated Video")
        # As above: the trailing 0 is the seed value for the example.
        gr.Examples(
            examples=[
                ['assets/the_great_wall.jpg', 'FPV flying over the Great Wall', 16, 4.0, "384p", 0]
            ],
            inputs=[image_input, image_prompt, image_temp_slider, image_video_guidance_scale_slider, resolution_dropdown, image_seed],
            outputs=[img_output],
            fn=generate_image_to_video,
            cache_examples='lazy',
        )

    # Wire the buttons to their generation handlers.
    txt_generate.click(
        generate_text_to_video,
        inputs=[text_prompt, temp_slider, guidance_scale_slider, video_guidance_scale_slider, resolution_dropdown, text_seed],
        outputs=txt_output
    )

    img_generate.click(
        generate_image_to_video,
        inputs=[image_input, image_prompt, image_temp_slider, image_video_guidance_scale_slider, resolution_dropdown, image_seed],
        outputs=img_output
    )

    # Changing the resolution adjusts the maximum of both duration sliders.
    resolution_dropdown.change(
        fn=update_slider,
        inputs=resolution_dropdown,
        outputs=[temp_slider, image_temp_slider]
    )

demo.launch(share=False)
|
|