|
import diffusers |
|
import transformers |
|
import utils.log |
|
import torch |
|
import PIL |
|
from typing import Union, Dict, Any, Optional, List, Tuple, Callable |
|
import os |
|
import re |
|
|
|
class SimpleDiffusion(diffusers.DiffusionPipeline): |
|
""" |
|
    A unified interface for diffusion models. It allows us to run:
|
- txt2img |
|
- img2img |
|
- inpainting |
|
- unconditional image generation |
|
|
|
    This class is heavily inspired by the Stable-Diffusion-Mega community pipeline.
    The DiffusionPipeline base class allows us to easily load/download any model hosted on the Hugging Face Hub. Read more
    about the DiffusionPipeline class here: https://huggingface.co./docs/diffusers/api/diffusion_pipeline
|
|
|
    A :obj:`utils.log.Logger` can optionally be attached via :meth:`set_logger` to log information.

    Args:
|
vae ([`AutoencoderKL`]): |
|
Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations. |
|
text_encoder ([`CLIPTextModel`]): |
|
Frozen text-encoder. Stable Diffusion uses the text portion of |
|
[CLIP](https://huggingface.co./docs/transformers/model_doc/clip#transformers.CLIPTextModel), specifically |
|
the [clip-vit-large-patch14](https://huggingface.co./openai/clip-vit-large-patch14) variant. |
|
tokenizer (`CLIPTokenizer`): |
|
Tokenizer of class |
|
[CLIPTokenizer](https://huggingface.co./docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer). |
|
unet ([`UNet2DConditionModel`]): |
|
Conditional U-Net architecture to denoise the encoded image latents. |
|
scheduler ([`SchedulerMixin`]): |
|
A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of |
|
[`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`]. |
|
        safety_checker ([`StableDiffusionSafetyChecker`]):
|
Classification module that estimates whether generated images could be considered offensive or harmful. |
|
Please, refer to the [model card](https://huggingface.co./runwayml/stable-diffusion-v1-5) for details. |
|
feature_extractor ([`CLIPFeatureExtractor`]): |
|
Model that extracts features from generated images to be used as inputs for the `safety_checker`. |
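        prompt_generation (`str`, *optional*, defaults to `"succinctly/text2image-prompt-generator"`):
            Identifier of a Hugging Face text-generation model used to expand user prompts before image
            generation. Pass `None` to disable prompt expansion.

    Example (a minimal usage sketch, not an official recipe; it assumes the
    `runwayml/stable-diffusion-v1-5` checkpoint and a CUDA device are available):

        >>> pipe = SimpleDiffusion.from_pretrained("runwayml/stable-diffusion-v1-5")
        >>> pipe = pipe.to("cuda")
        >>> image = pipe.text2img("an astronaut riding a horse on mars").images[0]
        >>> image.save("astronaut.png")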
|
|
|
""" |
|
def __init__( |
|
self, |
|
vae: diffusers.AutoencoderKL, |
|
text_encoder: transformers.CLIPTextModel, |
|
tokenizer: transformers.CLIPTokenizer, |
|
unet: diffusers.UNet2DConditionModel, |
|
scheduler: Union[diffusers.DDIMScheduler, diffusers.PNDMScheduler, diffusers.LMSDiscreteScheduler], |
|
safety_checker: diffusers.pipelines.stable_diffusion.safety_checker.StableDiffusionSafetyChecker, |
|
feature_extractor: transformers.CLIPFeatureExtractor, |
|
        prompt_generation: Optional[str] = "succinctly/text2image-prompt-generator",
|
): |
|
super().__init__() |
|
self._logger = None |
|
self.register_modules( |
|
vae=vae, |
|
text_encoder=text_encoder, |
|
tokenizer=tokenizer, |
|
unet=unet, |
|
scheduler=scheduler, |
|
safety_checker=safety_checker, |
|
            feature_extractor=feature_extractor,
        )
|
self._generated_prompts = [] |
|
self._enable_prompt_generation = False |
|
        if prompt_generation:
            self._enable_prompt_generation = True
            self._prompt_generator = transformers.pipeline(
                "text-generation",
                model=prompt_generation,
                tokenizer="gpt2",
            )
|
|
|
    def _generate_prompt(self, prompt, **kwargs):
        """
        Expand a short text prompt into a richer prompt using the prompt-generator pipeline.

        Args:
            prompt (str): The text to generate a prompt from.
            **kwargs: Optional `max_length` and `num_return_sequences` values forwarded to the
                prompt generator pipeline; any other keyword arguments are ignored.
        """
        max_length = kwargs.pop("max_length", None)
        num_return_sequences = kwargs.pop("num_return_sequences", None)

        generator_kwargs = {}
        if max_length is not None:
            generator_kwargs["max_length"] = max_length
        if num_return_sequences is not None:
            generator_kwargs["num_return_sequences"] = num_return_sequences

        generated = self._prompt_generator(prompt, **generator_kwargs)
        return self._process_prompt(prompt, generated)
|
|
|
    def _process_prompt(self, original_prompt, prompt_list):
        """
        Filter and clean the candidate prompts returned by the prompt generator, keeping only the
        ones that meaningfully extend the original prompt.
        """
        response_list = []
        for candidate in prompt_list:
            resp = candidate["generated_text"].strip()
            # Keep candidates that extend the original prompt and do not end mid-sentence.
            if resp != original_prompt and len(resp) > (len(original_prompt) + 4) and not resp.endswith((":", "-", "—")):
                response_list.append(resp)

        response_end = "\n".join(response_list)
        # Drop tokens containing a period (e.g. URLs or file names) and stray angle brackets.
        response_end = re.sub(r"[^ ]+\.[^ ]+", "", response_end)
        response_end = response_end.replace("<", "").replace(">", "")

        # Fall back to the original prompt if no candidate survived the filtering.
        return response_end if response_end != "" else original_prompt
|
|
|
|
|
    def enable_attention_slicing(self, slice_size: Optional[Union[str, int]] = "auto"):
        r"""
        Enable sliced attention computation.

        Refer to the [Stable Diffusion Mega community pipeline](https://github.com/huggingface/diffusers/blob/main/examples/community/stable_diffusion_mega.py)
        for more information.

        When this option is enabled, the attention module will split the input tensor in slices, to compute attention
        in several steps. This is useful to save some memory in exchange for a small speed decrease.

        Args:
            slice_size (`str` or `int`, *optional*, defaults to `"auto"`):
                When `"auto"`, halves the input to the attention heads, so attention will be computed in two steps. If
                a number is provided, uses as many slices as `attention_head_dim // slice_size`. In this case,
                `attention_head_dim` must be a multiple of `slice_size`.
        """
        if slice_size == "auto":
            if self._logger is not None:
                self._logger.info("Attention slicing enabled!")
            # Halve the attention head dimension so attention is computed in two steps.
            slice_size = self.unet.config.attention_head_dim // 2
        self.unet.set_attention_slice(slice_size)
|
|
|
def disable_attention_slicing(self): |
|
r""" |
|
Disable sliced attention computation. If `enable_attention_slicing` was previously invoked, this method will go |
|
back to computing attention in one step. |
|
""" |
|
if self._logger is not None: |
|
self._logger.info("Attention slicing disabled!") |
|
self.enable_attention_slicing(None) |
|
|
|
def set_logger(self, logger): |
|
r""" |
|
Set logger. This is useful to log information about the model. |
|
""" |
|
self._logger = logger |
|
|
|
    @property
    def components(self) -> Dict[str, Any]:
        """
        Return the registered pipeline modules as a dictionary, so they can be reused to build the
        task-specific pipelines (text2img, img2img, inpaint, upscale).
        """
        return {k: getattr(self, k) for k in self.config.keys() if not k.startswith("_")}
|
|
|
@torch.no_grad() |
|
def inpaint( |
|
self, |
|
prompt: Union[str, List[str]], |
|
init_image: Union[torch.FloatTensor, PIL.Image.Image], |
|
mask_image: Union[torch.FloatTensor, PIL.Image.Image], |
|
strength: float = 0.8, |
|
num_inference_steps: Optional[int] = 50, |
|
guidance_scale: Optional[float] = 7.5, |
|
negative_prompt: Optional[Union[str, List[str]]] = None, |
|
num_images_per_prompt: Optional[int] = 1, |
|
eta: Optional[float] = 0.0, |
|
generator: Optional[torch.Generator] = None, |
|
output_type: Optional[str] = "pil", |
|
return_dict: bool = True, |
|
callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None, |
|
callback_steps: Optional[int] = 1, |
|
**kwargs, |
|
): |
|
        if self._enable_prompt_generation:
            prompt = self._generate_prompt(prompt, **kwargs)
            if self._logger is not None:
                self._logger.info(f"Generated prompt: {prompt}")
|
|
|
return diffusers.StableDiffusionInpaintPipelineLegacy(**self.components)( |
|
prompt=prompt, |
|
init_image=init_image, |
|
mask_image=mask_image, |
|
strength=strength, |
|
num_inference_steps=num_inference_steps, |
|
guidance_scale=guidance_scale, |
|
negative_prompt=negative_prompt, |
|
num_images_per_prompt=num_images_per_prompt, |
|
eta=eta, |
|
generator=generator, |
|
output_type=output_type, |
|
return_dict=return_dict, |
|
            callback=callback,
            callback_steps=callback_steps,
        )
|
|
|
@torch.no_grad() |
|
def img2img( |
|
self, |
|
prompt: Union[str, List[str]], |
|
init_image: Union[torch.FloatTensor, PIL.Image.Image], |
|
strength: float = 0.8, |
|
num_inference_steps: Optional[int] = 50, |
|
guidance_scale: Optional[float] = 7.5, |
|
negative_prompt: Optional[Union[str, List[str]]] = None, |
|
num_images_per_prompt: Optional[int] = 1, |
|
eta: Optional[float] = 0.0, |
|
generator: Optional[torch.Generator] = None, |
|
output_type: Optional[str] = "pil", |
|
return_dict: bool = True, |
|
callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None, |
|
callback_steps: Optional[int] = 1, |
|
**kwargs, |
|
): |
|
        if self._enable_prompt_generation:
            prompt = self._generate_prompt(prompt, **kwargs)
            if self._logger is not None:
                self._logger.info(f"Generated prompt: {prompt}")
|
|
|
return diffusers.StableDiffusionImg2ImgPipeline(**self.components)( |
|
prompt=prompt, |
|
init_image=init_image, |
|
strength=strength, |
|
num_inference_steps=num_inference_steps, |
|
guidance_scale=guidance_scale, |
|
negative_prompt=negative_prompt, |
|
num_images_per_prompt=num_images_per_prompt, |
|
eta=eta, |
|
generator=generator, |
|
output_type=output_type, |
|
return_dict=return_dict, |
|
callback=callback, |
|
callback_steps=callback_steps, |
|
) |
|
|
|
@torch.no_grad() |
|
def text2img( |
|
self, |
|
prompt: Union[str, List[str]], |
|
height: int = 512, |
|
width: int = 512, |
|
num_inference_steps: int = 50, |
|
guidance_scale: float = 7.5, |
|
negative_prompt: Optional[Union[str, List[str]]] = None, |
|
num_images_per_prompt: Optional[int] = 1, |
|
eta: float = 0.0, |
|
generator: Optional[torch.Generator] = None, |
|
latents: Optional[torch.FloatTensor] = None, |
|
output_type: Optional[str] = "pil", |
|
return_dict: bool = True, |
|
callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None, |
|
callback_steps: Optional[int] = 1, |
|
): |
|
        if self._enable_prompt_generation:
            prompt = self._generate_prompt(prompt)
            if self._logger is not None:
                self._logger.info(f"Generated prompt: {prompt}")
|
|
|
|
|
return diffusers.StableDiffusionPipeline(**self.components)( |
|
prompt=prompt, |
|
height=height, |
|
width=width, |
|
num_inference_steps=num_inference_steps, |
|
guidance_scale=guidance_scale, |
|
negative_prompt=negative_prompt, |
|
num_images_per_prompt=num_images_per_prompt, |
|
eta=eta, |
|
generator=generator, |
|
latents=latents, |
|
output_type=output_type, |
|
return_dict=return_dict, |
|
callback=callback, |
|
callback_steps=callback_steps, |
|
) |
|
|
|
@torch.no_grad() |
|
def upscale( |
|
self, |
|
prompt: Union[str, List[str]], |
|
init_image: Union[torch.FloatTensor, PIL.Image.Image], |
|
num_inference_steps: Optional[int] = 75, |
|
guidance_scale: Optional[float] = 9.0, |
|
negative_prompt: Optional[Union[str, List[str]]] = None, |
|
num_images_per_prompt: Optional[int] = 1, |
|
eta: Optional[float] = 0.0, |
|
generator: Optional[torch.Generator] = None, |
|
latents: Optional[torch.FloatTensor] = None, |
|
output_type: Optional[str] = "pil", |
|
return_dict: bool = True, |
|
callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None, |
|
callback_steps: Optional[int] = 1, |
|
): |
|
""" |
|
Upscale an image using the StableDiffusionUpscalePipeline. |
|
""" |
|
        if self._enable_prompt_generation:
            prompt = self._generate_prompt(prompt)
            if self._logger is not None:
                self._logger.info(f"Generated prompt: {prompt}")
|
|
|
return diffusers.StableDiffusionUpscalePipeline(**self.components)( |
|
prompt=prompt, |
|
image=init_image, |
|
num_inference_steps=num_inference_steps, |
|
guidance_scale=guidance_scale, |
|
negative_prompt=negative_prompt, |
|
num_images_per_prompt=num_images_per_prompt, |
|
eta=eta, |
|
generator=generator, |
|
latents=latents, |
|
            output_type=output_type,
            return_dict=return_dict,
            callback=callback,
            callback_steps=callback_steps,
        )
|
|
|
    def set_scheduler(self, scheduler: Union[diffusers.DDIMScheduler, diffusers.PNDMScheduler, diffusers.LMSDiscreteScheduler, diffusers.EulerDiscreteScheduler]):
        """
        Set the scheduler for the pipeline. This is useful for controlling the diffusion process.

        Args:
            scheduler: The scheduler to use, e.g. `DDIMScheduler`, `PNDMScheduler`, `LMSDiscreteScheduler`
                or `EulerDiscreteScheduler`.
        """
        # Re-register the scheduler so it updates `self.scheduler` and the pipeline config.
        # Assigning to the `components` property would have no effect, since it returns a new dict.
        self.register_modules(scheduler=scheduler)
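

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the pipeline itself).
# It assumes the `runwayml/stable-diffusion-v1-5` checkpoint and a CUDA device
# are available; adapt the model id, device and prompts to your own setup.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    pipe = SimpleDiffusion.from_pretrained("runwayml/stable-diffusion-v1-5")
    pipe = pipe.to("cuda")
    pipe.enable_attention_slicing()

    # Optionally swap in a different scheduler, e.g. Euler discrete.
    pipe.set_scheduler(diffusers.EulerDiscreteScheduler.from_config(pipe.scheduler.config))

    # Text-to-image generation.
    result = pipe.text2img("a photograph of an astronaut riding a horse", num_inference_steps=30)
    result.images[0].save("astronaut.png")

    # Image-to-image generation, reusing the generated image as the starting point.
    result = pipe.img2img(
        prompt="a painting of an astronaut riding a horse, in the style of Van Gogh",
        init_image=result.images[0],
        strength=0.75,
    )
    result.images[0].save("astronaut_img2img.png")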