from math import ceil
from typing import Any, Optional

import numpy as np
import torch
from diffusers import LCMScheduler

from backend.device import is_openvino_device
from backend.models.lcmdiffusion_setting import (
    DiffusionTask,
    LCMDiffusionSetting,
    LCMLora,
)
from backend.openvino.pipelines import (
    get_ov_image_to_image_pipeline,
    get_ov_text_to_image_pipeline,
    ov_load_taesd,
)
from backend.pipelines.lcm import (
    get_image_to_image_pipeline,
    get_lcm_model_pipeline,
    load_taesd,
)
from backend.pipelines.lcm_lora import get_lcm_lora_pipeline
from constants import DEVICE
from image_ops import resize_pil_image
from utils_sd import (  # 1.import package
    register_faster_forward,
    register_normal_pipeline,
    register_parallel_pipeline,
    seed_everything,
)


class LCMTextToImage:
    """Build and run LCM text-to-image / image-to-image pipelines.

    The pipeline is (re)created by :meth:`init` only when one of the settings
    tracked in the ``previous_*`` attributes changes; :meth:`generate` then
    runs the cached pipeline.
    """

    def __init__(
        self,
        device: str = "cpu",
    ) -> None:
        """Create an empty wrapper; no pipeline is loaded until ``init``.

        Args:
            device: Accepted for interface compatibility; it is not read
                here (``self.device`` is assigned later by ``init``).
        """
        self.pipeline = None
        self.use_openvino = False
        self.device = ""
        # Snapshot of the settings used to build the current pipeline;
        # ``init`` compares against these to decide whether to rebuild.
        self.previous_model_id = None
        self.previous_use_tae_sd = False
        self.previous_use_lcm_lora = False
        self.previous_ov_model_id = ""
        self.previous_safety_checker = False
        self.previous_use_openvino = False
        # These three are read by ``init`` as well, so define them up front
        # instead of relying on the ``self.pipeline is None`` short-circuit.
        self.previous_lcm_lora_base_id = None
        self.previous_lcm_lora_id = None
        self.previous_task_type = None
        self.img_to_img_pipeline = None
        self.is_openvino_init = False
        self.task_type = DiffusionTask.text_to_image
        # float32 on OpenVINO devices and on "mps", float16 everywhere else.
        self.torch_data_type = (
            torch.float32 if is_openvino_device() or DEVICE == "mps" else torch.float16
        )
        print(f"Torch datatype : {self.torch_data_type}")

    def _pipeline_to_device(self):
        """Move ``self.pipeline`` to ``DEVICE`` using ``self.torch_data_type``."""
        print(f"Pipeline device : {DEVICE}")
        print(f"Pipeline dtype : {self.torch_data_type}")
        self.pipeline.to(
            torch_device=DEVICE,
            torch_dtype=self.torch_data_type,
        )

    def _add_freeu(self):
        """Enable FreeU when the pipeline's scheduler is an ``LCMScheduler``.

        The FreeU parameters depend on the pipeline class name; pipelines of
        any other class are left untouched.
        """
        pipeline_class = self.pipeline.__class__.__name__
        if isinstance(self.pipeline.scheduler, LCMScheduler):
            if pipeline_class == "StableDiffusionPipeline":
                print("Add FreeU - SD")
                self.pipeline.enable_freeu(
                    s1=0.9,
                    s2=0.2,
                    b1=1.2,
                    b2=1.4,
                )
            elif pipeline_class == "StableDiffusionXLPipeline":
                print("Add FreeU - SDXL")
                self.pipeline.enable_freeu(
                    s1=0.6,
                    s2=0.4,
                    b1=1.1,
                    b2=1.2,
                )

    def _update_lcm_scheduler_params(self):
        """Rebuild an ``LCMScheduler`` from its config with custom beta range.

        Does nothing when the current scheduler is not an ``LCMScheduler``.
        """
        if isinstance(self.pipeline.scheduler, LCMScheduler):
            self.pipeline.scheduler = LCMScheduler.from_config(
                self.pipeline.scheduler.config,
                beta_start=0.001,
                beta_end=0.01,
            )

    def make_even(self, num):
        """Return ``num`` if it is even, otherwise ``num + 1``.

        A notice is printed whenever the value had to be bumped.
        """
        if num % 2 == 0:
            return num
        else:
            print("取整了")
            return num + 1

    def init(
        self,
        device: str = "cpu",
        lcm_diffusion_setting: Optional[LCMDiffusionSetting] = None,
    ) -> None:
        """Prepare the pipeline(s) for the given settings.

        For image-to-image tasks the init image is first resized to the
        configured width, with the height derived from the image's aspect
        ratio and rounded up to an even number; ``image_height`` and
        ``init_image`` on the settings object are updated in place.

        The pipeline is rebuilt only if none exists yet or one of the tracked
        settings differs from the previous call.

        Args:
            device: Stored on ``self.device``.
            lcm_diffusion_setting: Settings to apply. When omitted, a fresh
                ``LCMDiffusionSetting()`` is created for this call.
        """
        # A default instance in the signature would be created once and
        # shared by every call; since this method mutates the settings, a
        # new instance is built per call instead.
        if lcm_diffusion_setting is None:
            lcm_diffusion_setting = LCMDiffusionSetting()

        self.device = device
        self.use_openvino = lcm_diffusion_setting.use_openvino
        model_id = lcm_diffusion_setting.lcm_model_id
        use_local_model = lcm_diffusion_setting.use_offline_model
        use_tiny_auto_encoder = lcm_diffusion_setting.use_tiny_auto_encoder
        use_lora = lcm_diffusion_setting.use_lcm_lora
        lcm_lora: LCMLora = lcm_diffusion_setting.lcm_lora
        ov_model_id = lcm_diffusion_setting.openvino_lcm_model_id

        task = lcm_diffusion_setting.diffusion_task
        is_text_to_image = task == DiffusionTask.text_to_image.value
        is_image_to_image = task == DiffusionTask.image_to_image.value

        if is_image_to_image:
            # Keep the configured width and scale the height with the
            # source image's aspect ratio (rounded up to an even value).
            src_width, src_height = lcm_diffusion_setting.init_image.size
            new_width = lcm_diffusion_setting.image_width
            new_height = self.make_even(int(src_height * new_width / src_width))
            lcm_diffusion_setting.image_height = new_height
            img = lcm_diffusion_setting.init_image.resize((new_width, new_height))
            print("新图", new_height, new_width, lcm_diffusion_setting.image_height)
            lcm_diffusion_setting.init_image = resize_pil_image(
                img,
                lcm_diffusion_setting.image_width,
                lcm_diffusion_setting.image_height,
            )
            print("图片大小", lcm_diffusion_setting.init_image)

        if (
            self.pipeline is None
            or self.previous_model_id != model_id
            or self.previous_use_tae_sd != use_tiny_auto_encoder
            or self.previous_lcm_lora_base_id != lcm_lora.base_model_id
            or self.previous_lcm_lora_id != lcm_lora.lcm_lora_id
            or self.previous_use_lcm_lora != use_lora
            or self.previous_ov_model_id != ov_model_id
            or self.previous_safety_checker != lcm_diffusion_setting.use_safety_checker
            or self.previous_use_openvino != lcm_diffusion_setting.use_openvino
            or self.previous_task_type != lcm_diffusion_setting.diffusion_task
        ):
            if self.use_openvino and is_openvino_device():
                if self.pipeline:
                    del self.pipeline
                    self.pipeline = None
                # Consumed by ``generate`` to skip reshape/compile on the
                # first run after a rebuild.
                self.is_openvino_init = True
                if is_text_to_image:
                    print(f"***** Init Text to image (OpenVINO) - {ov_model_id} *****")
                    self.pipeline = get_ov_text_to_image_pipeline(
                        ov_model_id,
                        use_local_model,
                    )
                elif is_image_to_image:
                    print(f"***** Image to image (OpenVINO) - {ov_model_id} *****")
                    self.pipeline = get_ov_image_to_image_pipeline(
                        ov_model_id,
                        use_local_model,
                    )
            else:
                if self.pipeline:
                    del self.pipeline
                    self.pipeline = None
                if self.img_to_img_pipeline:
                    del self.img_to_img_pipeline
                    self.img_to_img_pipeline = None

                if use_lora:
                    print(
                        f"***** Init LCM-LoRA pipeline - {lcm_lora.base_model_id} *****"
                    )
                    self.pipeline = get_lcm_lora_pipeline(
                        lcm_lora.base_model_id,
                        lcm_lora.lcm_lora_id,
                        use_local_model,
                        torch_data_type=self.torch_data_type,
                    )
                else:
                    print(f"***** Init LCM Model pipeline - {model_id} *****")
                    self.pipeline = get_lcm_model_pipeline(
                        model_id,
                        use_local_model,
                    )

                if is_image_to_image:
                    # The image-to-image pipeline is derived from the base
                    # pipeline that was just created.
                    self.img_to_img_pipeline = get_image_to_image_pipeline(
                        self.pipeline
                    )
                self._pipeline_to_device()

            if use_tiny_auto_encoder:
                if self.use_openvino and is_openvino_device():
                    print("Using Tiny Auto Encoder (OpenVINO)")
                    ov_load_taesd(
                        self.pipeline,
                        use_local_model,
                    )
                else:
                    print("Using Tiny Auto Encoder")
                    if is_text_to_image:
                        load_taesd(
                            self.pipeline,
                            use_local_model,
                            self.torch_data_type,
                        )
                    elif is_image_to_image:
                        load_taesd(
                            self.img_to_img_pipeline,
                            use_local_model,
                            self.torch_data_type,
                        )

            if is_image_to_image and lcm_diffusion_setting.use_openvino:
                # OpenVINO image-to-image keeps the scheduler's own config
                # (no custom beta range).
                self.pipeline.scheduler = LCMScheduler.from_config(
                    self.pipeline.scheduler.config,
                )
            else:
                self._update_lcm_scheduler_params()

            if use_lora:
                self._add_freeu()

            # Remember what this pipeline was built from.
            self.previous_model_id = model_id
            self.previous_ov_model_id = ov_model_id
            self.previous_use_tae_sd = use_tiny_auto_encoder
            self.previous_lcm_lora_base_id = lcm_lora.base_model_id
            self.previous_lcm_lora_id = lcm_lora.lcm_lora_id
            self.previous_use_lcm_lora = use_lora
            self.previous_safety_checker = lcm_diffusion_setting.use_safety_checker
            self.previous_use_openvino = lcm_diffusion_setting.use_openvino
            self.previous_task_type = lcm_diffusion_setting.diffusion_task
            if is_text_to_image:
                print(f"Pipeline : {self.pipeline}")
            elif is_image_to_image:
                if self.use_openvino and is_openvino_device():
                    print(f"Pipeline : {self.pipeline}")
                else:
                    print(f"Pipeline : {self.img_to_img_pipeline}")

    def generate(
        self,
        lcm_diffusion_setting: LCMDiffusionSetting,
        reshape: bool = False,
    ) -> Any:
        """Run the prepared pipeline and return its ``.images`` result.

        Args:
            lcm_diffusion_setting: Prompt, size, step and task settings.
            reshape: When running on OpenVINO, reshape and recompile the
                pipeline for the requested size before generating (skipped
                on the first run after ``init`` rebuilt the pipeline).

        Returns:
            The ``images`` attribute of the pipeline output.

        Raises:
            ZeroDivisionError: For an image-to-image task with
                ``strength == 0``.
            UnboundLocalError: When ``diffusion_task`` is neither
                text-to-image nor image-to-image, since no pipeline is run.
        """
        guidance_scale = lcm_diffusion_setting.guidance_scale
        img_to_img_inference_steps = lcm_diffusion_setting.inference_steps
        is_text_to_image = (
            lcm_diffusion_setting.diffusion_task == DiffusionTask.text_to_image.value
        )
        is_image_to_image = (
            lcm_diffusion_setting.diffusion_task == DiffusionTask.image_to_image.value
        )

        # If steps * strength truncates to 0, raise the step count so that
        # steps * strength is at least 1.
        check_step_value = int(
            lcm_diffusion_setting.inference_steps * lcm_diffusion_setting.strength
        )
        if is_image_to_image and check_step_value < 1:
            img_to_img_inference_steps = ceil(1 / lcm_diffusion_setting.strength)
            print(
                f"Strength: {lcm_diffusion_setting.strength},{img_to_img_inference_steps}"
            )

        if lcm_diffusion_setting.use_seed:
            cur_seed = lcm_diffusion_setting.seed
            # The OpenVINO path is seeded via NumPy, the torch path via torch.
            if self.use_openvino:
                np.random.seed(cur_seed)
            else:
                torch.manual_seed(cur_seed)

        is_openvino_pipe = lcm_diffusion_setting.use_openvino and is_openvino_device()
        if is_openvino_pipe:
            print("Using OpenVINO")
            if reshape and not self.is_openvino_init:
                print("Reshape and compile")
                self.pipeline.reshape(
                    batch_size=-1,
                    height=lcm_diffusion_setting.image_height,
                    width=lcm_diffusion_setting.image_width,
                    num_images_per_prompt=lcm_diffusion_setting.number_of_images,
                )
                self.pipeline.compile()

            # The "freshly initialised" flag only applies to the first run.
            self.is_openvino_init = False

        if not lcm_diffusion_setting.use_safety_checker:
            self.pipeline.safety_checker = None
            if is_image_to_image and not is_openvino_pipe:
                self.img_to_img_pipeline.safety_checker = None

        if (
            not lcm_diffusion_setting.use_lcm_lora
            and not lcm_diffusion_setting.use_openvino
            and lcm_diffusion_setting.guidance_scale != 1.0
        ):
            print("Not using LCM-LoRA so setting guidance_scale 1.0")
            guidance_scale = 1.0

        if lcm_diffusion_setting.use_openvino:
            if is_text_to_image:
                result_images = self.pipeline(
                    prompt=lcm_diffusion_setting.prompt,
                    negative_prompt=lcm_diffusion_setting.negative_prompt,
                    num_inference_steps=lcm_diffusion_setting.inference_steps,
                    guidance_scale=guidance_scale,
                    width=lcm_diffusion_setting.image_width,
                    height=lcm_diffusion_setting.image_height,
                    num_images_per_prompt=lcm_diffusion_setting.number_of_images,
                ).images
            elif is_image_to_image:
                register_parallel_pipeline(self.pipeline)  # 2. enable parallel. If memory is limited, replace it with `register_normal_pipeline(pipe)`
                register_faster_forward(self.pipeline.unet)  # 3. encoder propagation
                # NOTE(review): ``.call`` is presumably installed by
                # register_parallel_pipeline - verify against utils_sd. The
                # step count is tripled on this path; the reason is not
                # visible in this file.
                result_images = self.pipeline.call(
                    image=lcm_diffusion_setting.init_image,
                    strength=lcm_diffusion_setting.strength,
                    prompt=lcm_diffusion_setting.prompt,
                    negative_prompt=lcm_diffusion_setting.negative_prompt,
                    num_inference_steps=img_to_img_inference_steps * 3,
                    guidance_scale=guidance_scale,
                    num_images_per_prompt=lcm_diffusion_setting.number_of_images,
                ).images
        else:
            if is_text_to_image:
                result_images = self.pipeline(
                    prompt=lcm_diffusion_setting.prompt,
                    negative_prompt=lcm_diffusion_setting.negative_prompt,
                    num_inference_steps=lcm_diffusion_setting.inference_steps,
                    guidance_scale=guidance_scale,
                    width=lcm_diffusion_setting.image_width,
                    height=lcm_diffusion_setting.image_height,
                    num_images_per_prompt=lcm_diffusion_setting.number_of_images,
                ).images
            elif is_image_to_image:
                result_images = self.img_to_img_pipeline(
                    image=lcm_diffusion_setting.init_image,
                    strength=lcm_diffusion_setting.strength,
                    prompt=lcm_diffusion_setting.prompt,
                    negative_prompt=lcm_diffusion_setting.negative_prompt,
                    num_inference_steps=img_to_img_inference_steps,
                    guidance_scale=guidance_scale,
                    width=lcm_diffusion_setting.image_width,
                    height=lcm_diffusion_setting.image_height,
                    num_images_per_prompt=lcm_diffusion_setting.number_of_images,
                ).images
        return result_images