import spaces
import time
import os

import gradio as gr
import torch
from einops import rearrange
from PIL import Image
from transformers import pipeline

from flux.cli import SamplingOptions
from flux.sampling import denoise, get_noise, get_schedule, prepare, unpack
from flux.util import load_ae, load_clip, load_flow_model, load_t5
from pulid.pipeline_flux import PuLIDPipeline
from pulid.utils import resize_numpy_image_long

# load the Korean-to-English translation model
translator = pipeline("translation", model="Helsinki-NLP/opus-mt-ko-en")


def get_models(name: str, device: torch.device, offload: bool):
    t5 = load_t5(device, max_length=128)
    clip = load_clip(device)
    model = load_flow_model(name, device="cpu" if offload else device)
    model.eval()
    ae = load_ae(name, device="cpu" if offload else device)
    return model, ae, t5, clip


class FluxGenerator:
    def __init__(self):
        self.device = torch.device('cuda')
        self.offload = False
        self.model_name = 'flux-dev'
        self.model, self.ae, self.t5, self.clip = get_models(
            self.model_name,
            device=self.device,
            offload=self.offload,
        )
        self.pulid_model = PuLIDPipeline(self.model, 'cuda', weight_dtype=torch.bfloat16)
        self.pulid_model.load_pretrain()


flux_generator = FluxGenerator()


@spaces.GPU
@torch.inference_mode()
def generate_image(
    width,
    height,
    num_steps,
    start_step,
    guidance,
    seed,
    prompt,
    id_image=None,
    id_weight=1.0,
    neg_prompt="",
    true_cfg=1.0,
    timestep_to_start_cfg=1,
    max_sequence_length=128,
):
    # translate Korean prompts to English (the two ranges cover Hangul
    # Compatibility Jamo and Hangul Syllables)
    if any('\u3131' <= c <= '\u318E' or '\uAC00' <= c <= '\uD7A3' for c in prompt):
        prompt = translator(prompt)[0]['translation_text']

    flux_generator.t5.max_length = max_sequence_length

    seed = int(seed)
    if seed == -1:
        seed = None

    opts = SamplingOptions(
        prompt=prompt,
        width=width,
        height=height,
        num_steps=num_steps,
        guidance=guidance,
        seed=seed,
    )

    if opts.seed is None:
        opts.seed = torch.Generator(device="cpu").seed()

    t0 = time.perf_counter()

    use_true_cfg = abs(true_cfg - 1.0) > 1e-2

    if id_image is not None:
        id_image = resize_numpy_image_long(id_image, 1024)
        id_embeddings, uncond_id_embeddings = flux_generator.pulid_model.get_id_embedding(
            id_image, cal_uncond=use_true_cfg)
    else:
        id_embeddings = None
        uncond_id_embeddings = None

    # prepare input
    x = get_noise(
        1,
        opts.height,
        opts.width,
        device=flux_generator.device,
        dtype=torch.bfloat16,
        seed=opts.seed,
    )
    timesteps = get_schedule(
        opts.num_steps,
        x.shape[-1] * x.shape[-2] // 4,
        shift=True,
    )

    if flux_generator.offload:
        flux_generator.t5 = flux_generator.t5.to(flux_generator.device)
        flux_generator.clip = flux_generator.clip.to(flux_generator.device)
    inp = prepare(t5=flux_generator.t5, clip=flux_generator.clip, img=x, prompt=opts.prompt)
    inp_neg = prepare(t5=flux_generator.t5, clip=flux_generator.clip, img=x,
                      prompt=neg_prompt) if use_true_cfg else None

    # offload text encoders to CPU, load model to GPU
    if flux_generator.offload:
        flux_generator.t5, flux_generator.clip = flux_generator.t5.cpu(), flux_generator.clip.cpu()
        torch.cuda.empty_cache()
        flux_generator.model = flux_generator.model.to(flux_generator.device)

    # denoise initial noise
    x = denoise(
        flux_generator.model,
        **inp,
        timesteps=timesteps,
        guidance=opts.guidance,
        id=id_embeddings,
        id_weight=id_weight,
        start_step=start_step,
        uncond_id=uncond_id_embeddings,
        true_cfg=true_cfg,
        timestep_to_start_cfg=timestep_to_start_cfg,
        neg_txt=inp_neg["txt"] if use_true_cfg else None,
        neg_txt_ids=inp_neg["txt_ids"] if use_true_cfg else None,
        neg_vec=inp_neg["vec"] if use_true_cfg else None,
    )

    # offload model, load autoencoder to GPU
    if flux_generator.offload:
        flux_generator.model.cpu()
        torch.cuda.empty_cache()
        flux_generator.ae.decoder.to(x.device)

    # decode latents to pixel space
    x = unpack(x.float(), opts.height, opts.width)
    with torch.autocast(device_type=flux_generator.device.type, dtype=torch.bfloat16):
        x = flux_generator.ae.decode(x)

    if flux_generator.offload:
        flux_generator.ae.decoder.cpu()
        torch.cuda.empty_cache()

    t1 = time.perf_counter()
    print(f"Done in {t1 - t0:.1f}s.")

    # bring into PIL format: map [-1, 1] floats to [0, 255] uint8, channels last
    x = x.clamp(-1, 1)
    x = rearrange(x[0], "c h w -> h w c")
    img = Image.fromarray((127.5 * (x + 1.0)).cpu().byte().numpy())

    return img, str(opts.seed), flux_generator.pulid_model.debug_img_list


css = """
footer {
    visibility: hidden;
}
"""


def create_demo(args, model_name: str,
                device: str = "cuda" if torch.cuda.is_available() else "cpu",
                offload: bool = False):
    with gr.Blocks(theme="Nymbo/Nymbo_Theme", css=css) as demo:
        gr.Markdown("# Title")
        gr.Markdown("## Description")

        with gr.Row():
            with gr.Column():
                prompt = gr.Textbox(label="Prompt", value="portrait, color, cinematic")
                id_image = gr.Image(label="ID image", source="webcam", type="numpy")
                generate_btn = gr.Button("Generate")
            with gr.Column():
                output_image = gr.Image(label="Generated image")
                # generate_image returns three values (image, seed, debug
                # images), so all three need output components
                seed_output = gr.Textbox(label="Used seed")
                debug_gallery = gr.Gallery(label="Debug images", visible=False)

        with gr.Row():
            with gr.Column():
                gr.Markdown("## Examples")
                all_examples = [
                    ['a woman holding a glowing green sign that reads "PuLID for FLUX"', 'example_inputs/liuyifei.png'],
                    ['portrait, side view', 'example_inputs/liuyifei.png'],
                    ['white-haired woman with a VR technology atmosphere', 'example_inputs/liuyifei.png'],
                    ['a young child eating ice cream', 'example_inputs/liuyifei.png'],
                    ['a man holding a sign that reads "PuLID for FLUX", winter, snowing, mountain top', 'example_inputs/pengwei.jpg'],
                    ['portrait, candlelight', 'example_inputs/pengwei.jpg'],
                    ['dark profile photo of a 25-year-old man with smoke coming out of his mouth', 'example_inputs/pengwei.jpg'],
                    ['American comic style, one boy', 'example_inputs/pengwei.jpg'],
                    ['portrait, Pixar style', 'example_inputs/pengwei.jpg'],
                    ['portrait, ice sculpture', 'example_inputs/lecun.jpg'],
                ]

                example_images = [example[1] for example in all_examples]
                example_captions = [example[0] for example in all_examples]

                gallery = gr.Gallery(
                    value=list(zip(example_images, example_captions)),
                    label="Example gallery",
                    show_label=False,
                    elem_id="gallery",
                    columns=5,
                    rows=2,
                    object_fit="contain",
                    height="auto",
                )

                def fill_example(evt: gr.SelectData):
                    return [all_examples[evt.index][i] for i in [0, 1]]

                gallery.select(
                    fill_example,
                    None,
                    [prompt, id_image],
                )

        generate_btn.click(
            fn=generate_image,
            inputs=[
                gr.Slider(256, 1536, 896, step=16, visible=False),   # width
                gr.Slider(256, 1536, 1152, step=16, visible=False),  # height
                gr.Slider(1, 20, 20, step=1, visible=False),         # num_steps
                gr.Slider(0, 10, 0, step=1, visible=False),          # start_step
                gr.Slider(1.0, 10.0, 4, step=0.1, visible=False),    # guidance
                gr.Textbox("-1", visible=False),                     # seed
                prompt,
                id_image,
                gr.Slider(0.0, 3.0, 1, step=0.05, visible=False),    # id_weight
                gr.Textbox("bad quality, worst quality, text, signature, watermark, extra limbs",
                           visible=False),                           # neg_prompt
                gr.Slider(1.0, 10.0, 1, step=0.1, visible=False),    # true_cfg
                gr.Slider(0, 20, 1, step=1, visible=False),          # timestep_to_start_cfg
                gr.Slider(128, 512, 128, step=128, visible=False),   # max_sequence_length
            ],
            outputs=[output_image, seed_output, debug_gallery],
        )

    return demo


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="PuLID for FLUX.1-dev")
    # choices=list('flux-dev') would split the string into characters;
    # a one-element list is what was intended
    parser.add_argument("--name", type=str, default="flux-dev", choices=["flux-dev"],
                        help="currently only flux-dev is supported")
    parser.add_argument("--device", type=str,
                        default="cuda" if torch.cuda.is_available() else "cpu",
                        help="device to use")
    parser.add_argument("--offload", action="store_true",
                        help="offload models to CPU when not in use")
    parser.add_argument("--port", type=int, default=8080, help="port to use")
    parser.add_argument("--dev",
                        action="store_true", help="development mode")
    parser.add_argument("--pretrained_model", type=str, help="for development")

    args = parser.parse_args()

    import huggingface_hub
    huggingface_hub.login(os.getenv('HF_TOKEN'))

    demo = create_demo(args, args.name, args.device, args.offload)
    demo.launch(server_port=args.port)
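
# Usage sketch (assumptions: this file is saved as app.py, the `flux` and
# `pulid` packages from the PuLID-FLUX repo are importable, and HF_TOKEN holds
# a Hugging Face token with access to FLUX.1-dev):
#
#   HF_TOKEN=hf_xxx python app.py --name flux-dev --port 8080
#
# Caveat: the module-level FluxGenerator is constructed at import time with
# offload=False, so passing --offload changes nothing about model placement;
# the flag only reaches create_demo, which does not use it.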
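
# Direct-call sketch, bypassing the UI (assumes a CUDA device and already
# downloaded weights). The values mirror the hidden defaults wired into
# generate_btn.click above:
#
#   img, used_seed, _ = generate_image(
#       width=896, height=1152, num_steps=20, start_step=0, guidance=4,
#       seed=-1, prompt="portrait, color, cinematic", id_image=None,
#       id_weight=1.0, neg_prompt="", true_cfg=1.0,
#       timestep_to_start_cfg=1, max_sequence_length=128)
#   img.save("out.png")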
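
# Minimal check of the Hangul detection that gates the Korean-to-English
# translation in generate_image (has_hangul is a hypothetical helper; the two
# ranges are Hangul Compatibility Jamo U+3131-U+318E and Hangul Syllables
# U+AC00-U+D7A3):
#
#   >>> has_hangul = lambda s: any(
#   ...     '\u3131' <= c <= '\u318E' or '\uAC00' <= c <= '\uD7A3' for c in s)
#   >>> has_hangul('초상화, 픽사 스타일')   # "portrait, Pixar style" in Korean
#   True
#   >>> has_hangul('portrait, Pixar style')
#   False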