# https://github.com/sayakpaul/diffusers-torchao
# https://github.com/pytorch/ao/releases
# https://developer.nvidia.com/cuda-gpus
import gc
import os
import subprocess
import sys
import time
from typing import Any, Dict

from PIL import Image
from huggingface_hub import hf_hub_download
import torch
from torchao.quantization import (
    quantize_,
    autoquant,
    int8_dynamic_activation_int8_weight,
    int8_dynamic_activation_int4_weight,
    float8_dynamic_activation_float8_weight,
)
from torchao.quantization.quant_api import PerRow
from diffusers import FluxPipeline, FluxTransformer2DModel, AutoencoderKL, TorchAoConfig
from transformers import T5EncoderModel, BitsAndBytesConfig
from optimum.quanto import freeze, qfloat8, quantize
from para_attn.first_block_cache.diffusers_adapters import apply_cache_on_pipe
from huggingface_inference_toolkit.logging import logger

# Startup diagnostics: dump the installed packages and describe the GPU.
# An argument list is used instead of a shell string so that no shell is
# spawned; `-m pip` targets the interpreter that is running this module and
# does not raise when no `pip` executable is on PATH.
subprocess.run([sys.executable, "-m", "pip", "list"], check=False)

# Query the compute capability once and reuse it for the flags below.
_DEVICE_CAPABILITY = torch.cuda.get_device_capability()
print("device name:", torch.cuda.get_device_name())
print("device capability:", _DEVICE_CAPABILITY)

# Feature switches read by the loaders and by EndpointHandler.
IS_TURBO = False    # use the 8-step repository and default to 8 inference steps
IS_4BIT = False     # prefer int4 / NF4 quantization where a loader supports it
IS_PARA = True      # apply para_attn's first-block cache to the pipeline
IS_LVRAM = False    # use the low-VRAM loader (only honoured when IS_CC89 is true)
IS_COMPILE = True   # use the quantize + torch.compile loader
IS_AUTOQ = False    # use the torchao autoquant loader (takes priority over IS_COMPILE)
IS_CC90 = _DEVICE_CAPABILITY >= (9, 0)  # compute capability 9.0 or newer
IS_CC89 = _DEVICE_CAPABILITY >= (8, 9)  # compute capability 8.9 or newer

# Set high precision for float32 matrix multiplications.
# This setting optimizes performance on NVIDIA GPUs with Ampere architecture (e.g., A100, RTX 30 series) or newer.
torch.set_float32_matmul_precision("high")

if IS_COMPILE:
    import torch._dynamo

    # Let dynamo fall back to eager execution rather than raise when a graph
    # cannot be compiled.
    torch._dynamo.config.suppress_errors = True


def print_vram() -> None:
    """Print the used and total memory of the current CUDA device."""
    # A single query keeps `free` and `total` from the same snapshot.
    free_bytes, total_bytes = torch.cuda.mem_get_info()
    unit = 1024 ** 3
    free = free_bytes / unit
    total = total_bytes / unit
    print(f"VRAM: {total - free:.2f}/{total:.2f}GB")


def pil_to_base64(image: Image.Image, modelname: str, prompt: str, height: int, width: int,
                  steps: int, cfg: float, seed: int) -> str:
    """Encode *image* as a base64 PNG string with the generation settings embedded.

    The settings are stored as a JSON document in a PNG text chunk named
    ``metadata``. Only the last ``/``-separated component of *modelname* is
    recorded.

    Args:
        image: The generated image.
        modelname: Repository id of the model that produced the image.
        prompt: Prompt used for generation.
        height: Image height, recorded in the ``resolution`` field.
        width: Image width, recorded in the ``resolution`` field.
        steps: Number of inference steps.
        cfg: Guidance scale.
        seed: Seed used for generation.

    Returns:
        The PNG bytes, base64-encoded and decoded as ASCII text.
    """
    import base64
    from io import BytesIO
    import json
    from PIL import PngImagePlugin

    metadata = {
        "prompt": prompt,
        "num_inference_steps": steps,
        "guidance_scale": cfg,
        "seed": seed,
        "resolution": f"{width} x {height}",
        "Model": {"Model": modelname.split("/")[-1]},
    }
    info = PngImagePlugin.PngInfo()
    info.add_text("metadata", json.dumps(metadata))

    buffered = BytesIO()
    image.save(buffered, "PNG", pnginfo=info)
    return base64.b64encode(buffered.getvalue()).decode('ascii')


def _default_torchao_config() -> Any:
    """Return the TorchAoConfig selected by IS_4BIT and IS_CC90."""
    return TorchAoConfig("int4dq" if IS_4BIT else "float8dq" if IS_CC90 else "int8wo")


def _int8_activation_weight_config() -> Any:
    """Return the int8-activation torchao config (int4 weights when IS_4BIT, else int8)."""
    return int8_dynamic_activation_int4_weight() if IS_4BIT else int8_dynamic_activation_int8_weight()


def _compile_module(module: Any) -> Any:
    """Switch *module* to channels_last and return its torch.compile'd wrapper."""
    module.to(memory_format=torch.channels_last)
    return torch.compile(module, mode="max-autotune", fullgraph=True)


def _quantize_by_capability(module: Any, fallback_config: Any) -> None:
    """Quantize *module* in place on CUDA, choosing the scheme from IS_CC90 / IS_CC89.

    Float8 with per-row granularity is used when IS_CC90 is set, plain float8
    when only IS_CC89 is set, and *fallback_config* otherwise.
    """
    if IS_CC90:
        quantize_(module, float8_dynamic_activation_float8_weight(granularity=PerRow()), device="cuda")
    elif IS_CC89:
        quantize_(module, float8_dynamic_activation_float8_weight(), device="cuda")
    else:
        quantize_(module, fallback_config, device="cuda")


def load_te2(repo_id: str, dtype: torch.dtype) -> Any:
    """Load the ``text_encoder_2`` T5 encoder of *repo_id* in quantized form.

    With IS_4BIT the encoder is loaded through a bitsandbytes NF4 config;
    otherwise it is loaded in *dtype* and its weights are quantized to
    qfloat8 with optimum.quanto and frozen.
    """
    if IS_4BIT:
        nf4_config = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_quant_type="nf4",
                                        bnb_4bit_use_double_quant=True,
                                        bnb_4bit_compute_dtype=torch.bfloat16)
        text_encoder_2 = T5EncoderModel.from_pretrained(repo_id, subfolder="text_encoder_2",
                                                        torch_dtype=dtype, quantization_config=nf4_config)
    else:
        text_encoder_2 = T5EncoderModel.from_pretrained(repo_id, subfolder="text_encoder_2", torch_dtype=dtype)
        # NOTE(review): assumes the quanto pass belongs to this branch only (the
        # NF4 model is already quantized) -- confirm against the original layout.
        quantize(text_encoder_2, weights=qfloat8)
        freeze(text_encoder_2)
    return text_encoder_2


def load_pipeline_stable(repo_id: str, dtype: torch.dtype) -> Any:
    """Load the pipeline with a torchao quantization config and fused QKV projections, uncompiled."""
    quantization_config = _default_torchao_config()
    vae = AutoencoderKL.from_pretrained(repo_id, subfolder="vae", torch_dtype=dtype)
    pipe = FluxPipeline.from_pretrained(repo_id, vae=vae, text_encoder_2=load_te2(repo_id, dtype),
                                        torch_dtype=dtype, quantization_config=quantization_config)
    pipe.transformer.fuse_qkv_projections()
    pipe.vae.fuse_qkv_projections()
    return pipe


def load_pipeline_lowvram(repo_id: str, dtype: torch.dtype) -> Any:
    """Load the pipeline for low-VRAM use and move it to CUDA.

    The transformer is loaded separately with a ``float8dq`` config, the
    pipeline shell is loaded with an ``int4dq`` config, and the separately
    loaded transformer and VAE are attached afterwards.
    """
    int4_config = TorchAoConfig("int4dq")
    float8_config = TorchAoConfig("float8dq")
    vae = AutoencoderKL.from_pretrained(repo_id, subfolder="vae", torch_dtype=dtype)
    transformer = FluxTransformer2DModel.from_pretrained(repo_id, subfolder="transformer", torch_dtype=dtype,
                                                         quantization_config=float8_config)
    pipe = FluxPipeline.from_pretrained(repo_id, vae=None, transformer=None,
                                        text_encoder_2=load_te2(repo_id, dtype),
                                        torch_dtype=dtype, quantization_config=int4_config)
    pipe.transformer = transformer
    pipe.vae = vae
    # NOTE: QKV projection fusion is disabled in this loader.
    pipe.to("cuda")
    return pipe


def load_pipeline_compile(repo_id: str, dtype: torch.dtype) -> Any:
    """Load the pipeline as load_pipeline_stable does, then torch.compile the transformer and VAE."""
    pipe = load_pipeline_stable(repo_id, dtype)
    pipe.transformer = _compile_module(pipe.transformer)
    pipe.vae = _compile_module(pipe.vae)
    return pipe


def load_pipeline_autoquant(repo_id: str, dtype: torch.dtype) -> Any:
    """Load the unquantized pipeline, compile transformer and VAE, then wrap both with torchao autoquant."""
    pipe = FluxPipeline.from_pretrained(repo_id, torch_dtype=dtype)
    pipe.transformer.fuse_qkv_projections()
    pipe.vae.fuse_qkv_projections()
    pipe.transformer = _compile_module(pipe.transformer)
    pipe.vae = _compile_module(pipe.vae)
    pipe.transformer = autoquant(pipe.transformer, error_on_unseen=False)
    pipe.vae = autoquant(pipe.vae, error_on_unseen=False)
    return pipe


def load_pipeline_turbo(repo_id: str, dtype: torch.dtype) -> Any:
    """Load the pipeline with the Hyper-SD 8-step LoRA fused in, then quantize transformer and VAE.

    The LoRA is downloaded from ``ByteDance/Hyper-SD``, fused at weight 0.125
    and unloaded before QKV fusion and int8-activation quantization.
    """
    pipe = FluxPipeline.from_pretrained(repo_id, torch_dtype=dtype)
    pipe.load_lora_weights(hf_hub_download("ByteDance/Hyper-SD", "Hyper-FLUX.1-dev-8steps-lora.safetensors"),
                           adapter_name="hyper-sd")
    pipe.set_adapters(["hyper-sd"], adapter_weights=[0.125])
    pipe.fuse_lora()
    pipe.unload_lora_weights()
    pipe.transformer.fuse_qkv_projections()
    pipe.vae.fuse_qkv_projections()
    weight = _int8_activation_weight_config()
    quantize_(pipe.transformer, weight, device="cuda")
    quantize_(pipe.vae, weight, device="cuda")
    return pipe


def load_pipeline_turbo_compile(repo_id: str, dtype: torch.dtype) -> Any:
    """Load the pipeline as load_pipeline_turbo does, then torch.compile the transformer and VAE."""
    pipe = load_pipeline_turbo(repo_id, dtype)
    pipe.transformer = _compile_module(pipe.transformer)
    pipe.vae = _compile_module(pipe.vae)
    return pipe


def load_pipeline_opt(repo_id: str, dtype: torch.dtype) -> Any:
    """Load transformer and VAE separately, quantize and compile each, then assemble the pipeline.

    The quantization scheme for both modules is chosen from the device
    compute-capability flags; the remaining pipeline components are loaded
    with the default torchao config.
    """
    quantization_config = _default_torchao_config()
    weight = _int8_activation_weight_config()

    transformer = FluxTransformer2DModel.from_pretrained(repo_id, subfolder="transformer", torch_dtype=dtype)
    transformer.fuse_qkv_projections()
    _quantize_by_capability(transformer, weight)
    transformer = _compile_module(transformer)

    vae = AutoencoderKL.from_pretrained(repo_id, subfolder="vae", torch_dtype=dtype)
    vae.fuse_qkv_projections()
    _quantize_by_capability(vae, weight)
    vae = _compile_module(vae)

    pipe = FluxPipeline.from_pretrained(repo_id, transformer=None, vae=None, torch_dtype=dtype,
                                        quantization_config=quantization_config)
    pipe.transformer = transformer
    pipe.vae = vae
    return pipe


class EndpointHandler:
    """Inference-endpoint handler that turns a prompt into a base64-encoded PNG."""

    def __init__(self, path: str = ""):
        """Load the pipeline chosen by the module-level flags and move it to CUDA.

        Args:
            path: Not used by this handler; the repository id is selected
                from IS_TURBO instead.
        """
        repo_id = "NoMoreCopyrightOrg/flux-dev-8step" if IS_TURBO else "NoMoreCopyrightOrg/flux-dev"
        self.repo_id = repo_id
        dtype = torch.bfloat16
        #dtype = torch.float16 # for older nVidia GPUs
        print_vram()
        print("Loading pipeline...")
        # Loader priority: autoquant, then compile, then low-VRAM, then stable.
        if IS_AUTOQ:
            self.pipeline = load_pipeline_autoquant(repo_id, dtype)
        elif IS_COMPILE:
            self.pipeline = load_pipeline_opt(repo_id, dtype)
        elif IS_LVRAM and IS_CC89:
            self.pipeline = load_pipeline_lowvram(repo_id, dtype)
        else:
            self.pipeline = load_pipeline_stable(repo_id, dtype)
        if IS_PARA:
            apply_cache_on_pipe(self.pipeline, residual_diff_threshold=0.12)
        # Release loader temporaries before the pipeline is moved to the GPU.
        gc.collect()
        torch.cuda.empty_cache()
        self.pipeline.enable_vae_slicing()
        self.pipeline.enable_vae_tiling()
        self.pipeline.to("cuda")
        print_vram()

    def __call__(self, data: Dict[str, Any]) -> str:
        """Generate one image for the request and return it as a base64 PNG string.

        Args:
            data: Request body. The prompt is taken (and removed) from the
                ``inputs`` key, or from ``prompt`` when ``inputs`` is absent
                or not a non-empty string. Optional ``parameters`` may
                provide ``num_inference_steps``, ``width``, ``height``,
                ``guidance_scale`` and ``seed``.

        Returns:
            The generated image encoded by ``pil_to_base64``.

        Raises:
            ValueError: If neither ``inputs`` nor ``prompt`` holds a
                non-empty string.
        """
        logger.info(f"Received incoming request with {data=}")

        prompt = None
        for key in ("inputs", "prompt"):
            candidate = data.get(key)
            # Enforce the "non-empty string" contract stated in the error below.
            if isinstance(candidate, str) and candidate:
                prompt = data.pop(key)
                break
        if prompt is None:
            raise ValueError(
                "Provided input body must contain either the key `inputs` or `prompt` with the"
                " prompt to use for the image generation, and it needs to be a non-empty string."
            )

        # An explicit `"parameters": null` is treated like a missing key.
        parameters = data.pop("parameters", None) or {}
        num_inference_steps = parameters.get("num_inference_steps", 8 if IS_TURBO else 28)
        width = parameters.get("width", 1024)
        height = parameters.get("height", 1024)
        guidance_scale = parameters.get("guidance_scale", 3.5)

        # seed generator (seed cannot be provided as is but via a generator)
        seed = parameters.get("seed", 0)
        generator = torch.manual_seed(seed)

        start = time.time()
        image = self.pipeline(  # type: ignore
            prompt,
            height=height,
            width=width,
            guidance_scale=guidance_scale,
            num_inference_steps=num_inference_steps,
            generator=generator,
            output_type="pil",
        ).images[0]
        elapsed = time.time() - start
        print(f'Elapsed {elapsed:.3f} sec. / prompt:"{prompt}" / size:{width}x{height} / steps:{num_inference_steps} / guidance scale:{guidance_scale} / seed:{seed}')

        return pil_to_base64(image, self.repo_id, prompt, height, width, num_inference_steps, guidance_scale, seed)