import spaces
import torch
import random
import numpy as np
from inspect import signature
from diffusers import (
    FluxPipeline,
    StableDiffusion3Pipeline,
    PixArtSigmaPipeline,
    SanaPipeline,
    AuraFlowPipeline,
    Kandinsky3Pipeline,
    HunyuanDiTPipeline,
    LuminaText2ImgPipeline,
    AutoPipelineForText2Image,
)
import gradio as gr
from diffusers.pipelines.pipeline_utils import DiffusionPipeline

MAX_SEED = np.iinfo(np.int32).max
MAX_IMAGE_SIZE = 1024
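

# Wrapper for pipelines whose __call__ does not accept `callback_on_step_end`:
# progress is reported by temporarily monkey-patching the scheduler's step().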
class ProgressPipeline(DiffusionPipeline):
    def __init__(self, original_pipeline):
        super().__init__()
        self.original_pipeline = original_pipeline

        # Mirror the wrapped pipeline's attributes (scheduler, models, ...)
        # so this wrapper can stand in for the original.
        for attr_name, attr_value in vars(original_pipeline).items():
            setattr(self, attr_name, attr_value)

    @torch.no_grad()
    def __call__(
        self,
        prompt,
        num_inference_steps=30,
        generator=None,
        guidance_scale=7.5,
        callback=None,
        callback_steps=1,
        **kwargs,
    ):
        self._num_inference_steps = num_inference_steps
        self._step = 0

        def progress_callback(step_index, timestep, callback_kwargs):
            if callback and step_index % callback_steps == 0:
                callback(self, step_index, timestep, callback_kwargs)
            return callback_kwargs

        # The wrapped pipeline exposes no per-step callback of its own, so
        # count denoising steps by intercepting calls to the scheduler's step().
        original_step = self.original_pipeline.scheduler.step

        def wrapped_step(*args, **kwargs):
            self._step += 1
            progress_callback(self._step, None, {})
            return original_step(*args, **kwargs)

        self.original_pipeline.scheduler.step = wrapped_step

        try:
            return self.original_pipeline(
                prompt=prompt,
                num_inference_steps=num_inference_steps,
                generator=generator,
                guidance_scale=guidance_scale,
                **kwargs,
            )
        finally:
            # Always restore the original scheduler step, even on failure.
            self.original_pipeline.scheduler.step = original_step


# Local cache for downloaded model weights (passed to from_pretrained below).
cache_dir = '/workspace/hf_cache'
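
# Display name -> Hugging Face Hub repo and diffusers pipeline class for each
# model offered in the UI.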
MODEL_CONFIGS = {
    "FLUX": {
        "repo_id": "black-forest-labs/FLUX.1-dev",
        "pipeline_class": FluxPipeline,
    },
    "Stable Diffusion 3.5": {
        "repo_id": "stabilityai/stable-diffusion-3.5-large",
        "pipeline_class": StableDiffusion3Pipeline,
    },
    "PixArt": {
        "repo_id": "PixArt-alpha/PixArt-Sigma-XL-2-1024-MS",
        "pipeline_class": PixArtSigmaPipeline,
    },
    "SANA": {
        "repo_id": "Efficient-Large-Model/Sana_1600M_1024px_BF16_diffusers",
        "pipeline_class": SanaPipeline,
    },
    "AuraFlow": {
        "repo_id": "fal/AuraFlow",
        "pipeline_class": AuraFlowPipeline,
    },
    "Kandinsky": {
        "repo_id": "kandinsky-community/kandinsky-3",
        "pipeline_class": Kandinsky3Pipeline,
    },
    "Hunyuan": {
        "repo_id": "Tencent-Hunyuan/HunyuanDiT-Diffusers",
        "pipeline_class": HunyuanDiTPipeline,
    },
    "Lumina": {
        "repo_id": "Alpha-VLLM/Lumina-Next-SFT-diffusers",
        "pipeline_class": LuminaText2ImgPipeline,
    },
}
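

# Newer diffusers pipelines report per-step progress via `callback_on_step_end`;
# pipelines without it are wrapped in ProgressPipeline, whose patched scheduler
# drives the same (pipe, step, timestep, kwargs) callback through the legacy
# `callback`/`callback_steps` arguments.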
def generate_image_with_progress(model_name, pipe, prompt, num_steps,
                                 guidance_scale=3.5, seed=None,
                                 negative_prompt=None, randomize_seed=None,
                                 width=1024, height=1024,
                                 progress=gr.Progress(track_tqdm=True)):
    if randomize_seed:
        seed = random.randint(0, MAX_SEED)
    if seed is not None:
        generator = torch.Generator("cuda").manual_seed(seed)
    else:
        generator = torch.Generator("cuda")

    def callback(pipe, step_index, timestep, callback_kwargs):
        print(f"  callback => {step_index}, {timestep}")
        if step_index is None:
            step_index = 0
        cur_prg = step_index / num_steps
        progress(cur_prg, desc=f"Step {step_index}/{num_steps}")
        return callback_kwargs

    print("starting generation")

    # Inspect the pipeline's call signature to see which arguments it accepts.
    pipe_signature = signature(pipe)
    has_guidance_scale = "guidance_scale" in pipe_signature.parameters
    has_callback_on_step_end = "callback_on_step_end" in pipe_signature.parameters

    common_args = {
        "prompt": prompt,
        "num_inference_steps": num_steps,
        "negative_prompt": negative_prompt,
        "width": width,
        "height": height,
        "generator": generator,
    }

    if has_guidance_scale:
        common_args["guidance_scale"] = guidance_scale

    if has_callback_on_step_end:
        print("has callback_on_step_end and",
              "has guidance_scale" if has_guidance_scale else "no guidance_scale")
        common_args["callback_on_step_end"] = callback
    else:
        print("no callback_on_step_end and",
              "has guidance_scale" if has_guidance_scale else "no guidance_scale")
        common_args["callback"] = callback
        common_args["callback_steps"] = 1

    image = pipe(**common_args).images[0]
    return seed, image
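

# On Hugging Face Spaces, @spaces.GPU requests ZeroGPU hardware for this call;
# the function may run for up to `duration` seconds.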
@spaces.GPU(duration=170)
def create_pipeline_logic(prompt_text, model_name, negative_prompt="", seed=42,
                          randomize_seed=False, width=1024, height=1024,
                          guidance_scale=4.5, num_inference_steps=40):
    print(f"starting {model_name}")
    progress = gr.Progress(track_tqdm=True)
    config = MODEL_CONFIGS[model_name]
    b_pipe = AutoPipelineForText2Image.from_pretrained(
        config["repo_id"],
        cache_dir=cache_dir,
        torch_dtype=torch.bfloat16,
    ).to("cuda")

    pipe_signature = signature(b_pipe)
    has_callback_on_step_end = "callback_on_step_end" in pipe_signature.parameters
    if has_callback_on_step_end:
        pipe = b_pipe
    else:
        # Older pipelines need the scheduler-patching wrapper for progress.
        pipe = ProgressPipeline(b_pipe)
        print("using ProgressPipeline wrapper")

    gen_seed, image = generate_image_with_progress(
        model_name, pipe, prompt_text, num_steps=num_inference_steps,
        guidance_scale=guidance_scale, seed=seed, negative_prompt=negative_prompt,
        randomize_seed=randomize_seed, width=width, height=height,
        progress=progress,
    )
    return f"Seed: {gen_seed}", image
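

# Gradio UI: a shared prompt and advanced settings, plus one tab per model.
# Each tab passes its model name to create_pipeline_logic via a hidden Textbox.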
def main():
    with gr.Blocks() as app:
        gr.Markdown("# Dynamic Multiple Model Image Generation")

        prompt_text = gr.Textbox(label="Enter prompt")

        with gr.Accordion("Advanced Settings", open=False):
            negative_prompt = gr.Text(
                label="Negative prompt",
                max_lines=1,
                placeholder="Enter a negative prompt",
            )

            seed = gr.Slider(
                label="Seed",
                minimum=0,
                maximum=MAX_SEED,
                step=1,
                value=0,
            )

            randomize_seed = gr.Checkbox(label="Randomize seed", value=True)

            with gr.Row():
                width = gr.Slider(
                    label="Width",
                    minimum=512,
                    maximum=MAX_IMAGE_SIZE,
                    step=32,
                    value=1024,
                )
                height = gr.Slider(
                    label="Height",
                    minimum=512,
                    maximum=MAX_IMAGE_SIZE,
                    step=32,
                    value=1024,
                )

            with gr.Row():
                guidance_scale = gr.Slider(
                    label="Guidance scale",
                    minimum=0.0,
                    maximum=7.5,
                    step=0.1,
                    value=4.5,
                )
                num_inference_steps = gr.Slider(
                    label="Number of inference steps",
                    minimum=1,
                    maximum=50,
                    step=1,
                    value=40,
                )

        for model_name, config in MODEL_CONFIGS.items():
            with gr.Tab(model_name):
                button = gr.Button(f"Run {model_name}")
                output = gr.Textbox(label="Status")
                img = gr.Image(label=model_name, height=300)

                # Hidden textbox carrying this tab's model name.
                model_name_box = gr.Text(value=model_name, visible=False)
                button.click(
                    fn=create_pipeline_logic,
                    inputs=[
                        prompt_text,
                        model_name_box,
                        negative_prompt,
                        seed,
                        randomize_seed,
                        width,
                        height,
                        guidance_scale,
                        num_inference_steps,
                    ],
                    outputs=[output, img],
                )

    app.launch()


if __name__ == "__main__":
    main()