|
from math import ceil
from typing import Any, Optional

import numpy as np
import torch
from diffusers import LCMScheduler

from backend.models.lcmdiffusion_setting import (
    DiffusionTask,
    LCMDiffusionSetting,
    LCMLora,
)
from constants import DEVICE
from utils_sd import (
    register_normal_pipeline,
    register_faster_forward,
    register_parallel_pipeline,
    seed_everything,
)
from backend.device import is_openvino_device
from backend.openvino.pipelines import (
    get_ov_text_to_image_pipeline,
    ov_load_taesd,
    get_ov_image_to_image_pipeline,
)
from backend.pipelines.lcm import (
    get_lcm_model_pipeline,
    load_taesd,
    get_image_to_image_pipeline,
)
from backend.pipelines.lcm_lora import get_lcm_lora_pipeline
from image_ops import resize_pil_image
|
|
|
|
|
class LCMTextToImage:
    """Build, cache and run LCM / LCM-LoRA diffusion pipelines.

    ``init`` (re)creates the pipeline objects whenever one of the tracked
    settings differs from the value used for the previous build, and
    ``generate`` runs the text-to-image or image-to-image task described by
    the settings object it is given.
    """

    def __init__(
        self,
        device: str = "cpu",
    ) -> None:
        """Create an empty wrapper; no pipeline is loaded until ``init`` runs.

        Args:
            device: Kept for interface compatibility. The value is not stored
                here; ``init`` is what assigns ``self.device``.
        """
        self.pipeline = None
        self.img_to_img_pipeline = None
        self.use_openvino = False
        self.device = ""
        self.is_openvino_init = False
        self.task_type = DiffusionTask.text_to_image

        # Snapshot of the settings used for the last pipeline build. ``init``
        # compares against these to decide whether a rebuild is needed, so
        # every attribute it reads must exist before the first comparison.
        self.previous_model_id = None
        self.previous_ov_model_id = ""
        self.previous_use_tae_sd = False
        self.previous_use_lcm_lora = False
        self.previous_lcm_lora_base_id = None
        self.previous_lcm_lora_id = None
        self.previous_safety_checker = False
        self.previous_use_openvino = False
        self.previous_task_type = None

        self.torch_data_type = (
            torch.float32 if is_openvino_device() or DEVICE == "mps" else torch.float16
        )
        print(f"Torch datatype : {self.torch_data_type}")

    def _pipeline_to_device(self):
        """Move ``self.pipeline`` to ``DEVICE`` using ``self.torch_data_type``."""
        print(f"Pipeline device : {DEVICE}")
        print(f"Pipeline dtype : {self.torch_data_type}")
        self.pipeline.to(
            torch_device=DEVICE,
            torch_dtype=self.torch_data_type,
        )

    def _add_freeu(self):
        """Enable FreeU on ``self.pipeline`` when it uses an ``LCMScheduler``.

        The scale factors depend on the pipeline class name; pipelines other
        than ``StableDiffusionPipeline`` / ``StableDiffusionXLPipeline`` are
        left untouched.
        """
        if not isinstance(self.pipeline.scheduler, LCMScheduler):
            return

        pipeline_class = self.pipeline.__class__.__name__
        if pipeline_class == "StableDiffusionPipeline":
            print("Add FreeU - SD")
            self.pipeline.enable_freeu(
                s1=0.9,
                s2=0.2,
                b1=1.2,
                b2=1.4,
            )
        elif pipeline_class == "StableDiffusionXLPipeline":
            print("Add FreeU - SDXL")
            self.pipeline.enable_freeu(
                s1=0.6,
                s2=0.4,
                b1=1.1,
                b2=1.2,
            )

    def _update_lcm_scheduler_params(self):
        """Rebuild an existing ``LCMScheduler`` with overridden beta bounds.

        Does nothing when the pipeline's scheduler is not an ``LCMScheduler``.
        """
        if isinstance(self.pipeline.scheduler, LCMScheduler):
            self.pipeline.scheduler = LCMScheduler.from_config(
                self.pipeline.scheduler.config,
                beta_start=0.001,
                beta_end=0.01,
            )

    def make_even(self, num):
        """Return ``num`` unchanged when it is even, otherwise ``num + 1``."""
        if num % 2 == 0:
            return num
        print("取整了")
        return num + 1

    def _fit_init_image(self, lcm_diffusion_setting):
        """Resize the init image to the requested width, keeping aspect ratio.

        Mutates ``lcm_diffusion_setting``: ``image_height`` is replaced by the
        aspect-preserving height (rounded up to an even number) and
        ``init_image`` by the resized image.
        """
        src_width, src_height = lcm_diffusion_setting.init_image.size
        new_width = lcm_diffusion_setting.image_width
        new_height = self.make_even(int(src_height * new_width / src_width))
        lcm_diffusion_setting.image_height = new_height
        resized = lcm_diffusion_setting.init_image.resize((new_width, new_height))
        print("新图", new_height, new_width, lcm_diffusion_setting.image_height)
        lcm_diffusion_setting.init_image = resize_pil_image(
            resized,
            lcm_diffusion_setting.image_width,
            lcm_diffusion_setting.image_height,
        )
        print("图片大小", lcm_diffusion_setting.init_image)

    def _settings_changed(self, lcm_diffusion_setting) -> bool:
        """Return True when the pipeline must be (re)built for these settings."""
        lcm_lora = lcm_diffusion_setting.lcm_lora
        return (
            self.pipeline is None
            or self.previous_model_id != lcm_diffusion_setting.lcm_model_id
            or self.previous_use_tae_sd != lcm_diffusion_setting.use_tiny_auto_encoder
            or self.previous_lcm_lora_base_id != lcm_lora.base_model_id
            or self.previous_lcm_lora_id != lcm_lora.lcm_lora_id
            or self.previous_use_lcm_lora != lcm_diffusion_setting.use_lcm_lora
            or self.previous_ov_model_id != lcm_diffusion_setting.openvino_lcm_model_id
            or self.previous_safety_checker != lcm_diffusion_setting.use_safety_checker
            or self.previous_use_openvino != lcm_diffusion_setting.use_openvino
            or self.previous_task_type != lcm_diffusion_setting.diffusion_task
        )

    def _load_tiny_auto_encoder(self, task, use_local_model, use_ov_pipeline):
        """Attach the tiny auto encoder to the pipeline that will run ``task``."""
        if use_ov_pipeline:
            print("Using Tiny Auto Encoder (OpenVINO)")
            ov_load_taesd(
                self.pipeline,
                use_local_model,
            )
            return

        print("Using Tiny Auto Encoder")
        if task == DiffusionTask.text_to_image.value:
            load_taesd(
                self.pipeline,
                use_local_model,
                self.torch_data_type,
            )
        elif task == DiffusionTask.image_to_image.value:
            load_taesd(
                self.img_to_img_pipeline,
                use_local_model,
                self.torch_data_type,
            )

    def _remember_settings(self, lcm_diffusion_setting):
        """Record the settings of the current build for the next rebuild check."""
        lcm_lora = lcm_diffusion_setting.lcm_lora
        self.previous_model_id = lcm_diffusion_setting.lcm_model_id
        self.previous_ov_model_id = lcm_diffusion_setting.openvino_lcm_model_id
        self.previous_use_tae_sd = lcm_diffusion_setting.use_tiny_auto_encoder
        self.previous_lcm_lora_base_id = lcm_lora.base_model_id
        self.previous_lcm_lora_id = lcm_lora.lcm_lora_id
        self.previous_use_lcm_lora = lcm_diffusion_setting.use_lcm_lora
        self.previous_safety_checker = lcm_diffusion_setting.use_safety_checker
        self.previous_use_openvino = lcm_diffusion_setting.use_openvino
        self.previous_task_type = lcm_diffusion_setting.diffusion_task

    def init(
        self,
        device: str = "cpu",
        lcm_diffusion_setting: Optional[LCMDiffusionSetting] = None,
    ) -> None:
        """Prepare the pipeline(s) for the given settings, rebuilding if needed.

        For image-to-image tasks the init image is first resized (this mutates
        ``lcm_diffusion_setting``). The pipeline is then rebuilt only when no
        pipeline exists yet or a tracked setting changed since the last build.

        Args:
            device: Stored on ``self.device``.
            lcm_diffusion_setting: Settings to build for. When omitted, a
                fresh ``LCMDiffusionSetting()`` is created for this call.
        """
        # A default instance in the signature would be created once and shared
        # (and mutated below) across calls, so build it per call instead.
        if lcm_diffusion_setting is None:
            lcm_diffusion_setting = LCMDiffusionSetting()

        self.device = device
        self.use_openvino = lcm_diffusion_setting.use_openvino
        model_id = lcm_diffusion_setting.lcm_model_id
        use_local_model = lcm_diffusion_setting.use_offline_model
        use_tiny_auto_encoder = lcm_diffusion_setting.use_tiny_auto_encoder
        use_lora = lcm_diffusion_setting.use_lcm_lora
        lcm_lora: LCMLora = lcm_diffusion_setting.lcm_lora
        ov_model_id = lcm_diffusion_setting.openvino_lcm_model_id
        task = lcm_diffusion_setting.diffusion_task
        is_text_to_image = task == DiffusionTask.text_to_image.value
        is_image_to_image = task == DiffusionTask.image_to_image.value

        if is_image_to_image:
            self._fit_init_image(lcm_diffusion_setting)

        if not self._settings_changed(lcm_diffusion_setting):
            return

        use_ov_pipeline = self.use_openvino and is_openvino_device()
        if use_ov_pipeline:
            # Drop the old pipeline before loading the new one.
            self.pipeline = None
            self.is_openvino_init = True
            if is_text_to_image:
                print(f"***** Init Text to image (OpenVINO) - {ov_model_id} *****")
                self.pipeline = get_ov_text_to_image_pipeline(
                    ov_model_id,
                    use_local_model,
                )
            elif is_image_to_image:
                print(f"***** Image to image (OpenVINO) - {ov_model_id} *****")
                self.pipeline = get_ov_image_to_image_pipeline(
                    ov_model_id,
                    use_local_model,
                )
        else:
            # Drop both old pipelines before loading the new ones.
            self.pipeline = None
            self.img_to_img_pipeline = None

            if use_lora:
                print(
                    f"***** Init LCM-LoRA pipeline - {lcm_lora.base_model_id} *****"
                )
                self.pipeline = get_lcm_lora_pipeline(
                    lcm_lora.base_model_id,
                    lcm_lora.lcm_lora_id,
                    use_local_model,
                    torch_data_type=self.torch_data_type,
                )
            else:
                print(f"***** Init LCM Model pipeline - {model_id} *****")
                self.pipeline = get_lcm_model_pipeline(
                    model_id,
                    use_local_model,
                )

            if is_image_to_image:
                self.img_to_img_pipeline = get_image_to_image_pipeline(
                    self.pipeline
                )
            self._pipeline_to_device()

        if use_tiny_auto_encoder:
            self._load_tiny_auto_encoder(task, use_local_model, use_ov_pipeline)

        if is_image_to_image and lcm_diffusion_setting.use_openvino:
            # OpenVINO image-to-image: rebuild the scheduler from its own
            # config without the beta overrides used in the other cases.
            self.pipeline.scheduler = LCMScheduler.from_config(
                self.pipeline.scheduler.config,
            )
        else:
            self._update_lcm_scheduler_params()

        if use_lora:
            self._add_freeu()

        self._remember_settings(lcm_diffusion_setting)

        if is_text_to_image:
            print(f"Pipeline : {self.pipeline}")
        elif is_image_to_image:
            if use_ov_pipeline:
                print(f"Pipeline : {self.pipeline}")
            else:
                print(f"Pipeline : {self.img_to_img_pipeline}")

    def generate(
        self,
        lcm_diffusion_setting: LCMDiffusionSetting,
        reshape: bool = False,
    ) -> Any:
        """Run the configured diffusion task and return the generated images.

        Args:
            lcm_diffusion_setting: Prompt, size, step count and task settings.
            reshape: When True and the OpenVINO pipeline is in use (and was
                not just created by ``init``), reshape and recompile it for
                the requested image size before running.

        Returns:
            The ``images`` attribute of the pipeline call result.

        Raises:
            ValueError: If ``diffusion_task`` is neither text-to-image nor
                image-to-image.
        """
        task = lcm_diffusion_setting.diffusion_task
        is_text_to_image = task == DiffusionTask.text_to_image.value
        is_image_to_image = task == DiffusionTask.image_to_image.value

        guidance_scale = lcm_diffusion_setting.guidance_scale
        img_to_img_inference_steps = lcm_diffusion_setting.inference_steps
        check_step_value = int(
            lcm_diffusion_setting.inference_steps * lcm_diffusion_setting.strength
        )
        if is_image_to_image and check_step_value < 1:
            # steps * strength would truncate to zero; raise the step count so
            # that at least one step remains.
            img_to_img_inference_steps = ceil(1 / lcm_diffusion_setting.strength)
            print(
                f"Strength: {lcm_diffusion_setting.strength},{img_to_img_inference_steps}"
            )

        if lcm_diffusion_setting.use_seed:
            cur_seed = lcm_diffusion_setting.seed
            if self.use_openvino:
                np.random.seed(cur_seed)
            else:
                torch.manual_seed(cur_seed)

        is_openvino_pipe = lcm_diffusion_setting.use_openvino and is_openvino_device()
        if is_openvino_pipe:
            print("Using OpenVINO")
            if reshape and not self.is_openvino_init:
                print("Reshape and compile")
                self.pipeline.reshape(
                    batch_size=-1,
                    height=lcm_diffusion_setting.image_height,
                    width=lcm_diffusion_setting.image_width,
                    num_images_per_prompt=lcm_diffusion_setting.number_of_images,
                )
                self.pipeline.compile()

            if self.is_openvino_init:
                self.is_openvino_init = False

        if not lcm_diffusion_setting.use_safety_checker:
            self.pipeline.safety_checker = None
            if is_image_to_image and not is_openvino_pipe:
                self.img_to_img_pipeline.safety_checker = None

        if (
            not lcm_diffusion_setting.use_lcm_lora
            and not lcm_diffusion_setting.use_openvino
            and lcm_diffusion_setting.guidance_scale != 1.0
        ):
            print("Not using LCM-LoRA so setting guidance_scale 1.0")
            guidance_scale = 1.0

        if is_text_to_image:
            # Same call whether or not OpenVINO is in use.
            result_images = self.pipeline(
                prompt=lcm_diffusion_setting.prompt,
                negative_prompt=lcm_diffusion_setting.negative_prompt,
                num_inference_steps=lcm_diffusion_setting.inference_steps,
                guidance_scale=guidance_scale,
                width=lcm_diffusion_setting.image_width,
                height=lcm_diffusion_setting.image_height,
                num_images_per_prompt=lcm_diffusion_setting.number_of_images,
            ).images
        elif is_image_to_image:
            if lcm_diffusion_setting.use_openvino:
                register_parallel_pipeline(self.pipeline)
                register_faster_forward(self.pipeline.unet)

                # NOTE(review): ``call`` is presumably attached by
                # register_parallel_pipeline above — confirm in utils_sd.
                result_images = self.pipeline.call(
                    image=lcm_diffusion_setting.init_image,
                    strength=lcm_diffusion_setting.strength,
                    prompt=lcm_diffusion_setting.prompt,
                    negative_prompt=lcm_diffusion_setting.negative_prompt,
                    num_inference_steps=img_to_img_inference_steps * 3,
                    guidance_scale=guidance_scale,
                    num_images_per_prompt=lcm_diffusion_setting.number_of_images,
                ).images
            else:
                result_images = self.img_to_img_pipeline(
                    image=lcm_diffusion_setting.init_image,
                    strength=lcm_diffusion_setting.strength,
                    prompt=lcm_diffusion_setting.prompt,
                    negative_prompt=lcm_diffusion_setting.negative_prompt,
                    num_inference_steps=img_to_img_inference_steps,
                    guidance_scale=guidance_scale,
                    width=lcm_diffusion_setting.image_width,
                    height=lcm_diffusion_setting.image_height,
                    num_images_per_prompt=lcm_diffusion_setting.number_of_images,
                ).images
        else:
            raise ValueError(f"Unsupported diffusion task: {task}")

        return result_images
|
|