from typing import Dict, Any, Union, List
import torch
from diffusers import (
CogVideoXPipeline,
CogVideoXDPMScheduler,
CogVideoXVideoToVideoPipeline,
CogVideoXImageToVideoPipeline
)
from diffusers.utils import load_video, load_image
from PIL import Image
import base64
import io
import numpy as np
class EndpointHandler:
def __init__(self, path: str = ""):
"""Initialize the CogVideoX pipeline.
Args:
path (str): Path to the model weights
"""
# Initialize pipeline with bfloat16 for optimal performance as recommended in docs
self.pipe = CogVideoXPipeline.from_pretrained(
path or "jbilcke-hf/CogVideoX-Fun-V1.5-5b-for-InferenceEndpoints",
torch_dtype=torch.bfloat16
).to("cuda")
# Set up the scheduler with trailing timesteps as shown in example
self.pipe.scheduler = CogVideoXDPMScheduler.from_config(
self.pipe.scheduler.config,
timestep_spacing="trailing"
)
        # The two pipelines below (generated by Claude) are interesting, but loading them all at once takes too much memory, so they stay commented out for now.
# # Initialize video-to-video pipeline
# self.pipe_video = CogVideoXVideoToVideoPipeline.from_pretrained(
# path or "jbilcke-hf/CogVideoX-Fun-V1.5-5b-for-InferenceEndpoints",
# transformer=self.pipe.transformer,
# vae=self.pipe.vae,
# scheduler=self.pipe.scheduler,
# tokenizer=self.pipe.tokenizer,
# text_encoder=self.pipe.text_encoder,
# torch_dtype=torch.bfloat16
# ).to("cuda")
#
# # Initialize image-to-video pipeline
# self.pipe_image = CogVideoXImageToVideoPipeline.from_pretrained(
# path or "THUDM/CogVideoX1.5-5B-I2V",
# vae=self.pipe.vae,
# scheduler=self.pipe.scheduler,
# tokenizer=self.pipe.tokenizer,
# text_encoder=self.pipe.text_encoder,
# torch_dtype=torch.bfloat16
# ).to("cuda")
def _decode_base64_to_image(self, base64_string: str) -> Image.Image:
"""Convert base64 string to PIL Image."""
image_data = base64.b64decode(base64_string)
image = Image.open(io.BytesIO(image_data))
return image
    def _encode_video_to_base64(self, video_frames: List[np.ndarray]) -> str:
        """Convert video frames to an MP4 encoded as a base64 string."""
        # Local import keeps imageio optional until a video is actually encoded
        import imageio
        # With output_type="np" the pipeline returns float frames in [0, 1];
        # imageio expects uint8 pixels, so convert before encoding
        frames_uint8 = [(np.clip(frame, 0.0, 1.0) * 255).astype(np.uint8) for frame in video_frames]
        output_bytes = io.BytesIO()
        imageio.mimsave(output_bytes, frames_uint8, format="mp4", fps=8)
        return base64.b64encode(output_bytes.getvalue()).decode("utf-8")
def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Process the input data and generate video using CogVideoX.
Args:
data (Dict[str, Any]): Input data containing:
- prompt (str): Text prompt for generation
- image (str, optional): Base64 encoded image for image-to-video
- video (str, optional): Base64 encoded video for video-to-video
- num_inference_steps (int, optional): Number of inference steps
- guidance_scale (float, optional): Guidance scale for generation
Returns:
Dict[str, Any]: Generated video as base64 string
"""
# Extract parameters from input
prompt = data.get("prompt", "")
num_inference_steps = data.get("num_inference_steps", 50)
guidance_scale = data.get("guidance_scale", 7.0)
# Set up generation parameters
generation_kwargs = {
"prompt": prompt,
"num_inference_steps": num_inference_steps,
"guidance_scale": guidance_scale,
"num_videos_per_prompt": 1,
"use_dynamic_cfg": True,
"output_type": "np", # Get numpy array output
}
# Handle different input types
if "image" in data:
# Image to video generation
input_image = self._decode_base64_to_image(data["image"])
input_image = input_image.resize((720, 480)) # Resize as per example
image = load_image(input_image)
#raise ValueError("image to video isn't supported yet (takes up too much RAM right now)")
return {"error": "Image to video generation not yet supported"}
#video_frames = self.pipe_image(
# image=image,
# **generation_kwargs
#).frames[0]
elif "video" in data:
# Video to video generation
# TODO: Implement video loading from base64
# For now, returning error
return {"error": "Video to video generation not yet supported"}
else:
# Text to video generation
generation_kwargs["num_frames"] = 49 # As per example
video_frames = self.pipe(**generation_kwargs).frames[0]
# Convert output to base64
video_base64 = self._encode_video_to_base64(video_frames)
return {
"video": video_base64
}
def cleanup(self):
"""Cleanup the model and free GPU memory."""
# Move models to CPU to free GPU memory
self.pipe.to("cpu")
#self.pipe_video.to("cpu")
#self.pipe_image.to("cpu")
# Clear CUDA cache
        torch.cuda.empty_cache()
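

# Minimal local smoke test, kept as a hedged sketch: it assumes a CUDA GPU is
# available, that the default model weights can be downloaded, and that imageio
# with an ffmpeg backend is installed. The prompt and output file name below
# are illustrative, not part of the endpoint contract.
if __name__ == "__main__":
    handler = EndpointHandler()
    result = handler({
        "prompt": "A panda playing a guitar in a bamboo forest",
        "num_inference_steps": 50,
        "guidance_scale": 7.0,
    })
    if "video" in result:
        # Decode the base64 payload back into an .mp4 file for inspection
        with open("sample_output.mp4", "wb") as f:
            f.write(base64.b64decode(result["video"]))
    handler.cleanup()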