import os
import re
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import diffusers
import PIL
import torch
import transformers

import utils.log
class SimpleDiffusion(diffusers.DiffusionPipeline):
    """
    A unified interface for diffusion models. It supports:
        - txt2img
        - img2img
        - inpainting
        - unconditional image generation
    This class is heavily inspired by the Stable-Diffusion-Mega community pipeline.
    The DiffusionPipeline base class lets us load/download any model hosted on the HuggingFace Hub with ease.
    Read more about DiffusionPipeline here: https://huggingface.co./docs/diffusers/api/diffusion_pipeline

    Args:
        logger (:obj:`utils.log.Logger`):
            The logger used for logging any information; set it via `set_logger`.
        vae ([`AutoencoderKL`]):
            Variational Auto-Encoder (VAE) model to encode and decode images to and from latent representations.
        text_encoder ([`CLIPTextModel`]):
            Frozen text-encoder. Stable Diffusion uses the text portion of
            [CLIP](https://huggingface.co./docs/transformers/model_doc/clip#transformers.CLIPTextModel), specifically
            the [clip-vit-large-patch14](https://huggingface.co./openai/clip-vit-large-patch14) variant.
        tokenizer (`CLIPTokenizer`):
            Tokenizer of class
            [CLIPTokenizer](https://huggingface.co./docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
        unet ([`UNet2DConditionModel`]):
            Conditional U-Net architecture to denoise the encoded image latents.
        scheduler ([`SchedulerMixin`]):
            A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
            [`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].
        safety_checker ([`StableDiffusionSafetyChecker`]):
            Classification module that estimates whether generated images could be considered offensive or harmful.
            Please refer to the [model card](https://huggingface.co./runwayml/stable-diffusion-v1-5) for details.
        feature_extractor ([`CLIPFeatureExtractor`]):
            Model that extracts features from generated images to be used as inputs for the `safety_checker`.
        prompt_generation (`str`, *optional*, defaults to `"succinctly/text2image-prompt-generator"`):
            Name of a text-generation model used to expand user prompts before image generation. Pass a falsy
            value (e.g. `None`) to disable prompt expansion.
    """
    def __init__(
        self,
        vae: diffusers.AutoencoderKL,
        text_encoder: transformers.CLIPTextModel,
        tokenizer: transformers.CLIPTokenizer,
        unet: diffusers.UNet2DConditionModel,
        scheduler: Union[diffusers.DDIMScheduler, diffusers.PNDMScheduler, diffusers.LMSDiscreteScheduler],
        safety_checker: diffusers.pipelines.stable_diffusion.safety_checker.StableDiffusionSafetyChecker,
        feature_extractor: transformers.CLIPFeatureExtractor,
        prompt_generation: Optional[str] = "succinctly/text2image-prompt-generator",
    ):
        super().__init__()
        self._logger = None
        self.register_modules(  # defined in the DiffusionPipeline base class; from_pretrained loads these modules
            vae=vae,
            text_encoder=text_encoder,
            tokenizer=tokenizer,
            unet=unet,
            scheduler=scheduler,
            safety_checker=safety_checker,
            feature_extractor=feature_extractor,
        )
        self._generated_prompts = []
        self._enable_prompt_generation = False
        if prompt_generation:
            self._enable_prompt_generation = True
            # Load the prompt-expansion model named by `prompt_generation` (previously this argument
            # was ignored and a hard-coded model was loaded instead).
            self._prompt_generator = transformers.pipeline("text-generation", model=prompt_generation)
    def _generate_prompt(self, prompt, **kwargs):
        """
        Generate an expanded image prompt from a given text.

        Args:
            prompt (str): The text to generate a prompt from.
            **kwargs: Additional keyword arguments passed to the prompt generator pipeline.
        """
        max_length = kwargs.pop("max_length", None)
        num_return_sequences = kwargs.pop("num_return_sequences", None)
        candidates = self._prompt_generator(prompt, max_length=max_length, num_return_sequences=num_return_sequences)
        return self._process_prompt(prompt, candidates)

    def _process_prompt(self, original_prompt, prompt_list):
        # TODO: add documentation; add more prompt processing.
        # Keep only candidates that genuinely extend the original prompt and do not end
        # mid-clause, then strip URL-like tokens and angle brackets.
        response_list = []
        for candidate in prompt_list:
            resp = candidate["generated_text"].strip()
            if resp != original_prompt and len(resp) > (len(original_prompt) + 4) and not resp.endswith((":", "-", "—")):
                response_list.append(resp)
        response_end = "\n".join(response_list)
        response_end = re.sub(r"[^ ]+\.[^ ]+", "", response_end)  # drop tokens containing a dot (URLs, filenames)
        response_end = response_end.replace("<", "").replace(">", "")
        if response_end != "":
            return response_end
        # Fall back to the original prompt when every candidate was filtered out.
        return original_prompt
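    # A hedged illustration of the filtering above (example values are made up):
    #   _process_prompt("a cat", [{"generated_text": "a cat on a mat, trending on artstation.com"}])
    #   -> "a cat on a mat, trending on "   (the dotted token "artstation.com" is stripped)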
    # The following methods also exist on the standard Stable Diffusion pipelines; they are
    # re-implemented here so the unified pipeline exposes them directly.
    def enable_attention_slicing(self, slice_size: Optional[Union[str, int]] = "auto"):
        r"""
        Enable sliced attention computation.
        Refer to the [stable_diffusion_mega community pipeline](https://github.com/huggingface/diffusers/blob/main/examples/community/stable_diffusion_mega.py)
        for more information.

        When this option is enabled, the attention module splits the input tensor into slices and computes
        attention in several steps. This is useful to save some memory in exchange for a small speed decrease.

        Args:
            slice_size (`str` or `int`, *optional*, defaults to `"auto"`):
                When `"auto"`, halves the input to the attention heads, so attention will be computed in two steps. If
                a number is provided, uses as many slices as `attention_head_dim // slice_size`. In this case,
                `attention_head_dim` must be a multiple of `slice_size`.
        """
        if slice_size == "auto":
            # Half the attention head size is usually a good trade-off between speed and memory.
            if self._logger is not None:
                self._logger.info("Attention slicing enabled!")
            slice_size = self.unet.config.attention_head_dim // 2
        self.unet.set_attention_slice(slice_size)
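    # A hedged sketch of the memory/speed trade-off (method names as defined above):
    #   pipe.enable_attention_slicing()        # "auto": attention computed in two steps
    #   pipe.enable_attention_slicing(1)       # finest slicing: slowest, most memory-frugal
    #   pipe.disable_attention_slicing()       # back to single-step attention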
    def disable_attention_slicing(self):
        r"""
        Disable sliced attention computation. If `enable_attention_slicing` was previously invoked, this method will go
        back to computing attention in one step.
        """
        if self._logger is not None:
            self._logger.info("Attention slicing disabled!")
        self.enable_attention_slicing(None)
    def set_logger(self, logger):
        r"""
        Set the logger. This is useful to log information about the model.
        """
        self._logger = logger

    @property
    def components(self) -> Dict[str, Any]:
        # Return the non-private registered modules. The @property decorator is required:
        # the methods below unpack `self.components` as a dict rather than calling it.
        return {k: getattr(self, k) for k in self.config.keys() if not k.startswith("_")}
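    # For a v1.5-style checkpoint, `components` yields the registered modules, e.g.:
    #   {"vae": ..., "text_encoder": ..., "tokenizer": ..., "unet": ...,
    #    "scheduler": ..., "safety_checker": ..., "feature_extractor": ...}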
    def inpaint(
        self,
        prompt: Union[str, List[str]],
        init_image: Union[torch.FloatTensor, PIL.Image.Image],
        mask_image: Union[torch.FloatTensor, PIL.Image.Image],
        strength: float = 0.8,
        num_inference_steps: Optional[int] = 50,
        guidance_scale: Optional[float] = 7.5,
        negative_prompt: Optional[Union[str, List[str]]] = None,
        num_images_per_prompt: Optional[int] = 1,
        eta: Optional[float] = 0.0,
        generator: Optional[torch.Generator] = None,
        output_type: Optional[str] = "pil",
        return_dict: bool = True,
        callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None,
        callback_steps: Optional[int] = 1,
        **kwargs,
    ):
        if self._enable_prompt_generation:
            prompt = self._generate_prompt(prompt, **kwargs)
            if self._logger is not None:
                self._logger.info(f"Generated prompt: {prompt}")
        # For more information on how this function works, see the inpaint (legacy) pipeline docs:
        # https://huggingface.co./docs/diffusers/api/pipelines/stable_diffusion
        return diffusers.StableDiffusionInpaintPipelineLegacy(**self.components)(
            prompt=prompt,
            init_image=init_image,
            mask_image=mask_image,
            strength=strength,
            num_inference_steps=num_inference_steps,
            guidance_scale=guidance_scale,
            negative_prompt=negative_prompt,
            num_images_per_prompt=num_images_per_prompt,
            eta=eta,
            generator=generator,
            output_type=output_type,
            return_dict=return_dict,
            callback=callback,
            callback_steps=callback_steps,
        )
    def img2img(
        self,
        prompt: Union[str, List[str]],
        init_image: Union[torch.FloatTensor, PIL.Image.Image],
        strength: float = 0.8,
        num_inference_steps: Optional[int] = 50,
        guidance_scale: Optional[float] = 7.5,
        negative_prompt: Optional[Union[str, List[str]]] = None,
        num_images_per_prompt: Optional[int] = 1,
        eta: Optional[float] = 0.0,
        generator: Optional[torch.Generator] = None,
        output_type: Optional[str] = "pil",
        return_dict: bool = True,
        callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None,
        callback_steps: Optional[int] = 1,
        **kwargs,
    ):
        if self._enable_prompt_generation:
            prompt = self._generate_prompt(prompt, **kwargs)
            if self._logger is not None:
                self._logger.info(f"Generated prompt: {prompt}")
        # For more information on how this function works, please see:
        # https://huggingface.co./docs/diffusers/api/pipelines/stable_diffusion#diffusers.StableDiffusionImg2ImgPipeline
        return diffusers.StableDiffusionImg2ImgPipeline(**self.components)(
            prompt=prompt,
            init_image=init_image,
            strength=strength,
            num_inference_steps=num_inference_steps,
            guidance_scale=guidance_scale,
            negative_prompt=negative_prompt,
            num_images_per_prompt=num_images_per_prompt,
            eta=eta,
            generator=generator,
            output_type=output_type,
            return_dict=return_dict,
            callback=callback,
            callback_steps=callback_steps,
        )
    def text2img(
        self,
        prompt: Union[str, List[str]],
        height: int = 512,
        width: int = 512,
        num_inference_steps: int = 50,
        guidance_scale: float = 7.5,
        negative_prompt: Optional[Union[str, List[str]]] = None,
        num_images_per_prompt: Optional[int] = 1,
        eta: float = 0.0,
        generator: Optional[torch.Generator] = None,
        latents: Optional[torch.FloatTensor] = None,
        output_type: Optional[str] = "pil",
        return_dict: bool = True,
        callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None,
        callback_steps: Optional[int] = 1,
        **kwargs,
    ):
        if self._enable_prompt_generation:
            prompt = self._generate_prompt(prompt, **kwargs)
            if self._logger is not None:
                self._logger.info(f"Generated prompt: {prompt}")
        # For more information on how this function works, please see:
        # https://huggingface.co./docs/diffusers/api/pipelines/stable_diffusion#diffusers.StableDiffusionPipeline
        return diffusers.StableDiffusionPipeline(**self.components)(
            prompt=prompt,
            height=height,
            width=width,
            num_inference_steps=num_inference_steps,
            guidance_scale=guidance_scale,
            negative_prompt=negative_prompt,
            num_images_per_prompt=num_images_per_prompt,
            eta=eta,
            generator=generator,
            latents=latents,
            output_type=output_type,
            return_dict=return_dict,
            callback=callback,
            callback_steps=callback_steps,
        )
    def upscale(
        self,
        prompt: Union[str, List[str]],
        init_image: Union[torch.FloatTensor, PIL.Image.Image],
        num_inference_steps: Optional[int] = 75,
        guidance_scale: Optional[float] = 9.0,
        negative_prompt: Optional[Union[str, List[str]]] = None,
        num_images_per_prompt: Optional[int] = 1,
        eta: Optional[float] = 0.0,
        generator: Optional[torch.Generator] = None,
        latents: Optional[torch.FloatTensor] = None,
        output_type: Optional[str] = "pil",
        return_dict: bool = True,
        callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None,
        callback_steps: Optional[int] = 1,
        **kwargs,
    ):
        """
        Upscale an image using the StableDiffusionUpscalePipeline.

        Note: StableDiffusionUpscalePipeline expects the components of an upscaler checkpoint
        (e.g. `stabilityai/stable-diffusion-x4-upscaler`, which includes a `low_res_scheduler`),
        so this only works when the pipeline was loaded from such a checkpoint.
        """
        if self._enable_prompt_generation:
            prompt = self._generate_prompt(prompt, **kwargs)
            if self._logger is not None:
                self._logger.info(f"Generated prompt: {prompt}")
        return diffusers.StableDiffusionUpscalePipeline(**self.components)(
            prompt=prompt,
            image=init_image,
            num_inference_steps=num_inference_steps,
            guidance_scale=guidance_scale,
            negative_prompt=negative_prompt,
            num_images_per_prompt=num_images_per_prompt,
            eta=eta,
            generator=generator,
            latents=latents,
            output_type=output_type,
            return_dict=return_dict,
            callback=callback,
            callback_steps=callback_steps,
        )
    def set_scheduler(self, scheduler: Union[diffusers.DDIMScheduler, diffusers.PNDMScheduler, diffusers.LMSDiscreteScheduler, diffusers.EulerDiscreteScheduler]):
        """
        Set the scheduler for the pipeline. This is useful for controlling the diffusion process.

        Args:
            scheduler: The scheduler to use.
        """
        # `self.components` returns a fresh dict, so mutating it would have no effect;
        # re-register the module instead so `self.scheduler` is actually replaced.
        self.register_modules(scheduler=scheduler)
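# A minimal, hedged usage sketch. It assumes a standard Stable Diffusion v1.5 checkpoint
# ("runwayml/stable-diffusion-v1-5", referenced in the docstring above) and a CUDA device;
# adapt the model id and device to your setup. `prompt_generation=None` disables prompt
# expansion, assuming from_pretrained forwards optional __init__ kwargs.
if __name__ == "__main__":
    pipe = SimpleDiffusion.from_pretrained("runwayml/stable-diffusion-v1-5", prompt_generation=None)
    pipe = pipe.to("cuda")
    pipe.enable_attention_slicing()

    # Swap in a different scheduler before sampling, reusing the current scheduler's config.
    pipe.set_scheduler(diffusers.DDIMScheduler.from_config(pipe.scheduler.config))

    # txt2img returns a StableDiffusionPipelineOutput whose `.images` holds PIL images.
    result = pipe.text2img("a photograph of an astronaut riding a horse", num_inference_steps=30)
    result.images[0].save("astronaut.png")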