English
Inference Endpoints
flux-test4 / handler.py
John6666's picture
Upload 2 files
4bdfc27 verified
# https://github.com/sayakpaul/diffusers-torchao
# https://github.com/pytorch/ao/releases
# https://developer.nvidia.com/cuda-gpus
import os
from typing import Any, Dict
import gc
import time
from PIL import Image
from huggingface_hub import hf_hub_download
import torch
from torchao.quantization import quantize_, autoquant, int8_dynamic_activation_int8_weight, int8_dynamic_activation_int4_weight, float8_dynamic_activation_float8_weight
from torchao.quantization.quant_api import PerRow
from diffusers import FluxPipeline, FluxTransformer2DModel, AutoencoderKL, TorchAoConfig
from transformers import T5EncoderModel, BitsAndBytesConfig
from optimum.quanto import freeze, qfloat8, quantize
from para_attn.first_block_cache.diffusers_adapters import apply_cache_on_pipe
from huggingface_inference_toolkit.logging import logger
import subprocess
subprocess.run("pip list", shell=True)
print("device name:", torch.cuda.get_device_name())
print("device capability:", torch.cuda.get_device_capability())
IS_TURBO = False
IS_4BIT = False
IS_PARA = True
IS_LVRAM = False
IS_COMPILE = True
IS_AUTOQ = False
IS_CC90 = True if torch.cuda.get_device_capability() >= (9, 0) else False
IS_CC89 = True if torch.cuda.get_device_capability() >= (8, 9) else False
# Set high precision for float32 matrix multiplications.
# This setting optimizes performance on NVIDIA GPUs with Ampere architecture (e.g., A100, RTX 30 series) or newer.
torch.set_float32_matmul_precision("high")
if IS_COMPILE:
import torch._dynamo
torch._dynamo.config.suppress_errors = True
def print_vram():
free = torch.cuda.mem_get_info()[0] / (1024 ** 3)
total = torch.cuda.mem_get_info()[1] / (1024 ** 3)
print(f"VRAM: {total - free:.2f}/{total:.2f}GB")
def pil_to_base64(image: Image.Image, modelname: str, prompt: str, height: int, width: int, steps: int, cfg: float, seed: int) -> str:
import base64
from io import BytesIO
import json
from PIL import PngImagePlugin
metadata = {"prompt": prompt, "num_inference_steps": steps, "guidance_scale": cfg, "seed": seed, "resolution": f"{width} x {height}",
"Model": {"Model": modelname.split("/")[-1]}}
info = PngImagePlugin.PngInfo()
info.add_text("metadata", json.dumps(metadata))
buffered = BytesIO()
image.save(buffered, "PNG", pnginfo=info)
return base64.b64encode(buffered.getvalue()).decode('ascii')
def load_te2(repo_id: str, dtype: torch.dtype) -> Any:
if IS_4BIT:
nf4_config = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_quant_type="nf4", bnb_4bit_use_double_quant=True, bnb_4bit_compute_dtype=torch.bfloat16)
text_encoder_2 = T5EncoderModel.from_pretrained(repo_id, subfolder="text_encoder_2", torch_dtype=dtype, quantization_config=nf4_config)
else:
text_encoder_2 = T5EncoderModel.from_pretrained(repo_id, subfolder="text_encoder_2", torch_dtype=dtype)
quantize(text_encoder_2, weights=qfloat8)
freeze(text_encoder_2)
return text_encoder_2
def load_pipeline_stable(repo_id: str, dtype: torch.dtype) -> Any:
quantization_config = TorchAoConfig("int4dq" if IS_4BIT else "float8dq" if IS_CC90 else "int8wo")
vae = AutoencoderKL.from_pretrained(repo_id, subfolder="vae", torch_dtype=dtype)
pipe = FluxPipeline.from_pretrained(repo_id, vae=vae, text_encoder_2=load_te2(repo_id, dtype), torch_dtype=dtype, quantization_config=quantization_config)
pipe.transformer.fuse_qkv_projections()
pipe.vae.fuse_qkv_projections()
return pipe
def load_pipeline_lowvram(repo_id: str, dtype: torch.dtype) -> Any:
int4_config = TorchAoConfig("int4dq")
float8_config = TorchAoConfig("float8dq")
vae = AutoencoderKL.from_pretrained(repo_id, subfolder="vae", torch_dtype=dtype)
transformer = FluxTransformer2DModel.from_pretrained(repo_id, subfolder="transformer", torch_dtype=dtype, quantization_config=float8_config)
pipe = FluxPipeline.from_pretrained(repo_id, vae=None, transformer=None, text_encoder_2=load_te2(repo_id, dtype), torch_dtype=dtype, quantization_config=int4_config)
pipe.transformer = transformer
pipe.vae = vae
#pipe.transformer.fuse_qkv_projections()
#pipe.vae.fuse_qkv_projections()
pipe.to("cuda")
return pipe
def load_pipeline_compile(repo_id: str, dtype: torch.dtype) -> Any:
quantization_config = TorchAoConfig("int4dq" if IS_4BIT else "float8dq" if IS_CC90 else "int8wo")
vae = AutoencoderKL.from_pretrained(repo_id, subfolder="vae", torch_dtype=dtype)
pipe = FluxPipeline.from_pretrained(repo_id, vae=vae, text_encoder_2=load_te2(repo_id, dtype), torch_dtype=dtype, quantization_config=quantization_config)
pipe.transformer.fuse_qkv_projections()
pipe.vae.fuse_qkv_projections()
pipe.transformer.to(memory_format=torch.channels_last)
pipe.transformer = torch.compile(pipe.transformer, mode="max-autotune", fullgraph=True)
pipe.vae.to(memory_format=torch.channels_last)
pipe.vae = torch.compile(pipe.vae, mode="max-autotune", fullgraph=True)
return pipe
def load_pipeline_autoquant(repo_id: str, dtype: torch.dtype) -> Any:
pipe = FluxPipeline.from_pretrained(repo_id, torch_dtype=dtype)
pipe.transformer.fuse_qkv_projections()
pipe.vae.fuse_qkv_projections()
pipe.transformer.to(memory_format=torch.channels_last)
pipe.transformer = torch.compile(pipe.transformer, mode="max-autotune", fullgraph=True)
pipe.vae.to(memory_format=torch.channels_last)
pipe.vae = torch.compile(pipe.vae, mode="max-autotune", fullgraph=True)
pipe.transformer = autoquant(pipe.transformer, error_on_unseen=False)
pipe.vae = autoquant(pipe.vae, error_on_unseen=False)
return pipe
def load_pipeline_turbo(repo_id: str, dtype: torch.dtype) -> Any:
pipe = FluxPipeline.from_pretrained(repo_id, torch_dtype=dtype)
pipe.load_lora_weights(hf_hub_download("ByteDance/Hyper-SD", "Hyper-FLUX.1-dev-8steps-lora.safetensors"), adapter_name="hyper-sd")
pipe.set_adapters(["hyper-sd"], adapter_weights=[0.125])
pipe.fuse_lora()
pipe.unload_lora_weights()
pipe.transformer.fuse_qkv_projections()
pipe.vae.fuse_qkv_projections()
weight = int8_dynamic_activation_int4_weight() if IS_4BIT else int8_dynamic_activation_int8_weight()
quantize_(pipe.transformer, weight, device="cuda")
quantize_(pipe.vae, weight, device="cuda")
return pipe
def load_pipeline_turbo_compile(repo_id: str, dtype: torch.dtype) -> Any:
pipe = FluxPipeline.from_pretrained(repo_id, torch_dtype=dtype)
pipe.load_lora_weights(hf_hub_download("ByteDance/Hyper-SD", "Hyper-FLUX.1-dev-8steps-lora.safetensors"), adapter_name="hyper-sd")
pipe.set_adapters(["hyper-sd"], adapter_weights=[0.125])
pipe.fuse_lora()
pipe.unload_lora_weights()
pipe.transformer.fuse_qkv_projections()
pipe.vae.fuse_qkv_projections()
weight = int8_dynamic_activation_int4_weight() if IS_4BIT else int8_dynamic_activation_int8_weight()
quantize_(pipe.transformer, weight, device="cuda")
quantize_(pipe.vae, weight, device="cuda")
pipe.transformer.to(memory_format=torch.channels_last)
pipe.transformer = torch.compile(pipe.transformer, mode="max-autotune", fullgraph=True)
pipe.vae.to(memory_format=torch.channels_last)
pipe.vae = torch.compile(pipe.vae, mode="max-autotune", fullgraph=True)
return pipe
def load_pipeline_opt(repo_id: str, dtype: torch.dtype) -> Any:
quantization_config = TorchAoConfig("int4dq" if IS_4BIT else "float8dq" if IS_CC90 else "int8wo")
weight = int8_dynamic_activation_int4_weight() if IS_4BIT else int8_dynamic_activation_int8_weight()
transformer = FluxTransformer2DModel.from_pretrained(repo_id, subfolder="transformer", torch_dtype=dtype)
transformer.fuse_qkv_projections()
if IS_CC90: quantize_(transformer, float8_dynamic_activation_float8_weight(granularity=PerRow()), device="cuda")
elif IS_CC89: quantize_(transformer, float8_dynamic_activation_float8_weight(), device="cuda")
else: quantize_(transformer, weight, device="cuda")
transformer.to(memory_format=torch.channels_last)
transformer = torch.compile(transformer, mode="max-autotune", fullgraph=True)
vae = AutoencoderKL.from_pretrained(repo_id, subfolder="vae", torch_dtype=dtype)
vae.fuse_qkv_projections()
if IS_CC90: quantize_(vae, float8_dynamic_activation_float8_weight(granularity=PerRow()), device="cuda")
elif IS_CC89: quantize_(vae, float8_dynamic_activation_float8_weight(), device="cuda")
else: quantize_(vae, weight, device="cuda")
vae.to(memory_format=torch.channels_last)
vae = torch.compile(vae, mode="max-autotune", fullgraph=True)
pipe = FluxPipeline.from_pretrained(repo_id, transformer=None, vae=None, torch_dtype=dtype, quantization_config=quantization_config)
pipe.transformer = transformer
pipe.vae = vae
return pipe
class EndpointHandler:
def __init__(self, path=""):
repo_id = "NoMoreCopyrightOrg/flux-dev-8step" if IS_TURBO else "NoMoreCopyrightOrg/flux-dev"
self.repo_id = repo_id
dtype = torch.bfloat16
#dtype = torch.float16 # for older nVidia GPUs
print_vram()
print("Loading pipeline...")
if IS_AUTOQ: self.pipeline = load_pipeline_autoquant(repo_id, dtype)
elif IS_COMPILE: self.pipeline = load_pipeline_opt(repo_id, dtype)
elif IS_LVRAM and IS_CC89: self.pipeline = load_pipeline_lowvram(repo_id, dtype)
else: self.pipeline = load_pipeline_stable(repo_id, dtype)
if IS_PARA: apply_cache_on_pipe(self.pipeline, residual_diff_threshold=0.12)
gc.collect()
torch.cuda.empty_cache()
self.pipeline.enable_vae_slicing()
self.pipeline.enable_vae_tiling()
self.pipeline.to("cuda")
print_vram()
def __call__(self, data: Dict[str, Any]) -> Image.Image:
logger.info(f"Received incoming request with {data=}")
if "inputs" in data and isinstance(data["inputs"], str):
prompt = data.pop("inputs")
elif "prompt" in data and isinstance(data["prompt"], str):
prompt = data.pop("prompt")
else:
raise ValueError(
"Provided input body must contain either the key `inputs` or `prompt` with the"
" prompt to use for the image generation, and it needs to be a non-empty string."
)
parameters = data.pop("parameters", {})
num_inference_steps = parameters.get("num_inference_steps", 8 if IS_TURBO else 28)
width = parameters.get("width", 1024)
height = parameters.get("height", 1024)
guidance_scale = parameters.get("guidance_scale", 3.5)
# seed generator (seed cannot be provided as is but via a generator)
seed = parameters.get("seed", 0)
generator = torch.manual_seed(seed)
start = time.time()
image = self.pipeline( # type: ignore
prompt,
height=height,
width=width,
guidance_scale=guidance_scale,
num_inference_steps=num_inference_steps,
generator=generator,
output_type="pil",
).images[0]
end = time.time()
print(f'Elapsed {end - start:.3f} sec. / prompt:"{prompt}" / size:{width}x{height} / steps:{num_inference_steps} / guidance scale:{guidance_scale} / seed:{seed}')
return pil_to_base64(image, self.repo_id, prompt, height, width, num_inference_steps, guidance_scale, seed)