Spaces:
Running
on
Zero
Running
on
Zero
# Copyright 2024 The HuggingFace Team and The MeissonFlow Team. All rights reserved. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
from typing import Any, Callable, Dict, List, Optional, Tuple, Union | |
import torch | |
from transformers import CLIPTextModelWithProjection, CLIPTokenizer | |
from diffusers.image_processor import PipelineImageInput, VaeImageProcessor | |
from diffusers.models import UVit2DModel, VQModel | |
# from diffusers.schedulers import AmusedScheduler | |
from training.scheduling import Scheduler | |
from diffusers.utils import replace_example_docstring | |
from diffusers.pipelines.pipeline_utils import DiffusionPipeline, ImagePipelineOutput | |
from training.transformer import Transformer2DModel | |
EXAMPLE_DOC_STRING = """ | |
Examples: | |
```py | |
>>> image = pipe(prompt, input_image).images[0] | |
``` | |
""" | |
def _prepare_latent_image_ids(batch_size, height, width, device, dtype): | |
latent_image_ids = torch.zeros(height // 2, width // 2, 3) | |
latent_image_ids[..., 1] = latent_image_ids[..., 1] + torch.arange(height // 2)[:, None] | |
latent_image_ids[..., 2] = latent_image_ids[..., 2] + torch.arange(width // 2)[None, :] | |
latent_image_id_height, latent_image_id_width, latent_image_id_channels = latent_image_ids.shape | |
latent_image_ids = latent_image_ids.reshape( | |
latent_image_id_height * latent_image_id_width, latent_image_id_channels | |
) | |
# latent_image_ids = latent_image_ids.unsqueeze(0).repeat(batch_size, 1, 1) | |
return latent_image_ids.to(device=device, dtype=dtype) | |
class Img2ImgPipeline(DiffusionPipeline): | |
image_processor: VaeImageProcessor | |
vqvae: VQModel | |
tokenizer: CLIPTokenizer | |
text_encoder: CLIPTextModelWithProjection | |
transformer: Transformer2DModel #UVit2DModel | |
scheduler: Scheduler | |
model_cpu_offload_seq = "text_encoder->transformer->vqvae" | |
# TODO - when calling self.vqvae.quantize, it uses self.vqvae.quantize.embedding.weight before | |
# the forward method of self.vqvae.quantize, so the hook doesn't get called to move the parameter | |
# off the meta device. There should be a way to fix this instead of just not offloading it | |
_exclude_from_cpu_offload = ["vqvae"] | |
def __init__( | |
self, | |
vqvae: VQModel, | |
tokenizer: CLIPTokenizer, | |
text_encoder: CLIPTextModelWithProjection, | |
transformer: Transformer2DModel, #UVit2DModel, | |
scheduler: Scheduler, | |
): | |
super().__init__() | |
self.register_modules( | |
vqvae=vqvae, | |
tokenizer=tokenizer, | |
text_encoder=text_encoder, | |
transformer=transformer, | |
scheduler=scheduler, | |
) | |
self.vae_scale_factor = 2 ** (len(self.vqvae.config.block_out_channels) - 1) | |
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor, do_normalize=False) | |
def __call__( | |
self, | |
prompt: Optional[Union[List[str], str]] = None, | |
image: PipelineImageInput = None, | |
strength: float = 0.5, | |
num_inference_steps: int = 12, | |
guidance_scale: float = 10.0, | |
negative_prompt: Optional[Union[str, List[str]]] = None, | |
num_images_per_prompt: Optional[int] = 1, | |
generator: Optional[torch.Generator] = None, | |
prompt_embeds: Optional[torch.Tensor] = None, | |
encoder_hidden_states: Optional[torch.Tensor] = None, | |
negative_prompt_embeds: Optional[torch.Tensor] = None, | |
negative_encoder_hidden_states: Optional[torch.Tensor] = None, | |
output_type="pil", | |
return_dict: bool = True, | |
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None, | |
callback_steps: int = 1, | |
cross_attention_kwargs: Optional[Dict[str, Any]] = None, | |
micro_conditioning_aesthetic_score: int = 6, | |
micro_conditioning_crop_coord: Tuple[int, int] = (0, 0), | |
temperature: Union[int, Tuple[int, int], List[int]] = (2, 0), | |
): | |
""" | |
The call function to the pipeline for generation. | |
Args: | |
prompt (`str` or `List[str]`, *optional*): | |
The prompt or prompts to guide image generation. If not defined, you need to pass `prompt_embeds`. | |
image (`torch.Tensor`, `PIL.Image.Image`, `np.ndarray`, `List[torch.Tensor]`, `List[PIL.Image.Image]`, or `List[np.ndarray]`): | |
`Image`, numpy array or tensor representing an image batch to be used as the starting point. For both | |
numpy array and pytorch tensor, the expected value range is between `[0, 1]` If it's a tensor or a list | |
or tensors, the expected shape should be `(B, C, H, W)` or `(C, H, W)`. If it is a numpy array or a | |
list of arrays, the expected shape should be `(B, H, W, C)` or `(H, W, C)` It can also accept image | |
latents as `image`, but if passing latents directly it is not encoded again. | |
strength (`float`, *optional*, defaults to 0.5): | |
Indicates extent to transform the reference `image`. Must be between 0 and 1. `image` is used as a | |
starting point and more noise is added the higher the `strength`. The number of denoising steps depends | |
on the amount of noise initially added. When `strength` is 1, added noise is maximum and the denoising | |
process runs for the full number of iterations specified in `num_inference_steps`. A value of 1 | |
essentially ignores `image`. | |
num_inference_steps (`int`, *optional*, defaults to 12): | |
The number of denoising steps. More denoising steps usually lead to a higher quality image at the | |
expense of slower inference. | |
guidance_scale (`float`, *optional*, defaults to 10.0): | |
A higher guidance scale value encourages the model to generate images closely linked to the text | |
`prompt` at the expense of lower image quality. Guidance scale is enabled when `guidance_scale > 1`. | |
negative_prompt (`str` or `List[str]`, *optional*): | |
The prompt or prompts to guide what to not include in image generation. If not defined, you need to | |
pass `negative_prompt_embeds` instead. Ignored when not using guidance (`guidance_scale < 1`). | |
num_images_per_prompt (`int`, *optional*, defaults to 1): | |
The number of images to generate per prompt. | |
generator (`torch.Generator`, *optional*): | |
A [`torch.Generator`](https://pytorch.org/docs/stable/generated/torch.Generator.html) to make | |
generation deterministic. | |
prompt_embeds (`torch.Tensor`, *optional*): | |
Pre-generated text embeddings. Can be used to easily tweak text inputs (prompt weighting). If not | |
provided, text embeddings are generated from the `prompt` input argument. A single vector from the | |
pooled and projected final hidden states. | |
encoder_hidden_states (`torch.Tensor`, *optional*): | |
Pre-generated penultimate hidden states from the text encoder providing additional text conditioning. | |
negative_prompt_embeds (`torch.Tensor`, *optional*): | |
Pre-generated negative text embeddings. Can be used to easily tweak text inputs (prompt weighting). If | |
not provided, `negative_prompt_embeds` are generated from the `negative_prompt` input argument. | |
negative_encoder_hidden_states (`torch.Tensor`, *optional*): | |
Analogous to `encoder_hidden_states` for the positive prompt. | |
output_type (`str`, *optional*, defaults to `"pil"`): | |
The output format of the generated image. Choose between `PIL.Image` or `np.array`. | |
return_dict (`bool`, *optional*, defaults to `True`): | |
Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of a | |
plain tuple. | |
callback (`Callable`, *optional*): | |
A function that calls every `callback_steps` steps during inference. The function is called with the | |
following arguments: `callback(step: int, timestep: int, latents: torch.Tensor)`. | |
callback_steps (`int`, *optional*, defaults to 1): | |
The frequency at which the `callback` function is called. If not specified, the callback is called at | |
every step. | |
cross_attention_kwargs (`dict`, *optional*): | |
A kwargs dictionary that if specified is passed along to the [`AttentionProcessor`] as defined in | |
[`self.processor`](https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/attention_processor.py). | |
micro_conditioning_aesthetic_score (`int`, *optional*, defaults to 6): | |
The targeted aesthetic score according to the laion aesthetic classifier. See | |
https://laion.ai/blog/laion-aesthetics/ and the micro-conditioning section of | |
https://arxiv.org/abs/2307.01952. | |
micro_conditioning_crop_coord (`Tuple[int]`, *optional*, defaults to (0, 0)): | |
The targeted height, width crop coordinates. See the micro-conditioning section of | |
https://arxiv.org/abs/2307.01952. | |
temperature (`Union[int, Tuple[int, int], List[int]]`, *optional*, defaults to (2, 0)): | |
Configures the temperature scheduler on `self.scheduler` see `AmusedScheduler#set_timesteps`. | |
Examples: | |
Returns: | |
[`~pipelines.pipeline_utils.ImagePipelineOutput`] or `tuple`: | |
If `return_dict` is `True`, [`~pipelines.pipeline_utils.ImagePipelineOutput`] is returned, otherwise a | |
`tuple` is returned where the first element is a list with the generated images. | |
""" | |
if (prompt_embeds is not None and encoder_hidden_states is None) or ( | |
prompt_embeds is None and encoder_hidden_states is not None | |
): | |
raise ValueError("pass either both `prompt_embeds` and `encoder_hidden_states` or neither") | |
if (negative_prompt_embeds is not None and negative_encoder_hidden_states is None) or ( | |
negative_prompt_embeds is None and negative_encoder_hidden_states is not None | |
): | |
raise ValueError( | |
"pass either both `negative_prompt_embeds` and `negative_encoder_hidden_states` or neither" | |
) | |
if (prompt is None and prompt_embeds is None) or (prompt is not None and prompt_embeds is not None): | |
raise ValueError("pass only one of `prompt` or `prompt_embeds`") | |
if isinstance(prompt, str): | |
prompt = [prompt] | |
if prompt is not None: | |
batch_size = len(prompt) | |
else: | |
batch_size = prompt_embeds.shape[0] | |
batch_size = batch_size * num_images_per_prompt | |
if prompt_embeds is None: | |
input_ids = self.tokenizer( | |
prompt, | |
return_tensors="pt", | |
padding="max_length", | |
truncation=True, | |
max_length=77, #self.tokenizer.model_max_length, | |
).input_ids.to(self._execution_device) | |
outputs = self.text_encoder(input_ids, return_dict=True, output_hidden_states=True) | |
prompt_embeds = outputs.text_embeds | |
encoder_hidden_states = outputs.hidden_states[-2] | |
prompt_embeds = prompt_embeds.repeat(num_images_per_prompt, 1) | |
encoder_hidden_states = encoder_hidden_states.repeat(num_images_per_prompt, 1, 1) | |
if guidance_scale > 1.0: | |
if negative_prompt_embeds is None: | |
if negative_prompt is None: | |
negative_prompt = [""] * len(prompt) | |
if isinstance(negative_prompt, str): | |
negative_prompt = [negative_prompt] | |
input_ids = self.tokenizer( | |
negative_prompt, | |
return_tensors="pt", | |
padding="max_length", | |
truncation=True, | |
max_length=77, #self.tokenizer.model_max_length, | |
).input_ids.to(self._execution_device) | |
outputs = self.text_encoder(input_ids, return_dict=True, output_hidden_states=True) | |
negative_prompt_embeds = outputs.text_embeds | |
negative_encoder_hidden_states = outputs.hidden_states[-2] | |
negative_prompt_embeds = negative_prompt_embeds.repeat(num_images_per_prompt, 1) | |
negative_encoder_hidden_states = negative_encoder_hidden_states.repeat(num_images_per_prompt, 1, 1) | |
prompt_embeds = torch.concat([negative_prompt_embeds, prompt_embeds]) | |
encoder_hidden_states = torch.concat([negative_encoder_hidden_states, encoder_hidden_states]) | |
image = self.image_processor.preprocess(image) | |
height, width = image.shape[-2:] | |
# Note that the micro conditionings _do_ flip the order of width, height for the original size | |
# and the crop coordinates. This is how it was done in the original code base | |
micro_conds = torch.tensor( | |
[ | |
width, | |
height, | |
micro_conditioning_crop_coord[0], | |
micro_conditioning_crop_coord[1], | |
micro_conditioning_aesthetic_score, | |
], | |
device=self._execution_device, | |
dtype=encoder_hidden_states.dtype, | |
) | |
micro_conds = micro_conds.unsqueeze(0) | |
micro_conds = micro_conds.expand(2 * batch_size if guidance_scale > 1.0 else batch_size, -1) | |
self.scheduler.set_timesteps(num_inference_steps, temperature, self._execution_device) | |
num_inference_steps = int(len(self.scheduler.timesteps) * strength) | |
start_timestep_idx = len(self.scheduler.timesteps) - num_inference_steps | |
needs_upcasting = False # = self.vqvae.dtype == torch.float16 and self.vqvae.config.force_upcast | |
if needs_upcasting: | |
self.vqvae.float() | |
latents = self.vqvae.encode(image.to(dtype=self.vqvae.dtype, device=self._execution_device)).latents | |
latents_bsz, channels, latents_height, latents_width = latents.shape | |
latents = self.vqvae.quantize(latents)[2][2].reshape(latents_bsz, latents_height, latents_width) | |
latents = self.scheduler.add_noise( | |
latents, self.scheduler.timesteps[start_timestep_idx - 1], generator=generator | |
) | |
latents = latents.repeat(num_images_per_prompt, 1, 1) | |
with self.progress_bar(total=num_inference_steps) as progress_bar: | |
for i in range(start_timestep_idx, len(self.scheduler.timesteps)): | |
timestep = self.scheduler.timesteps[i] | |
if guidance_scale > 1.0: | |
model_input = torch.cat([latents] * 2) | |
else: | |
model_input = latents | |
if height == 1024: #args.resolution == 1024: | |
img_ids = _prepare_latent_image_ids(model_input.shape[0], model_input.shape[-2],model_input.shape[-1],model_input.device,model_input.dtype) | |
else: | |
img_ids = _prepare_latent_image_ids(model_input.shape[0],2*model_input.shape[-2],2*model_input.shape[-1],model_input.device,model_input.dtype) | |
txt_ids = torch.zeros(encoder_hidden_states.shape[1],3).to(device = encoder_hidden_states.device, dtype = encoder_hidden_states.dtype) | |
model_output = self.transformer( | |
model_input, | |
micro_conds=micro_conds, | |
pooled_projections=prompt_embeds, | |
encoder_hidden_states=encoder_hidden_states, | |
# cross_attention_kwargs=cross_attention_kwargs, | |
img_ids = img_ids, | |
txt_ids = txt_ids, | |
timestep = torch.tensor([timestep], device=model_input.device, dtype=torch.long), | |
) | |
if guidance_scale > 1.0: | |
uncond_logits, cond_logits = model_output.chunk(2) | |
model_output = uncond_logits + guidance_scale * (cond_logits - uncond_logits) | |
latents = self.scheduler.step( | |
model_output=model_output, | |
timestep=timestep, | |
sample=latents, | |
generator=generator, | |
).prev_sample | |
if i == len(self.scheduler.timesteps) - 1 or ((i + 1) % self.scheduler.order == 0): | |
progress_bar.update() | |
if callback is not None and i % callback_steps == 0: | |
step_idx = i // getattr(self.scheduler, "order", 1) | |
callback(step_idx, timestep, latents) | |
if output_type == "latent": | |
output = latents | |
else: | |
output = self.vqvae.decode( | |
latents, | |
force_not_quantize=True, | |
shape=( | |
batch_size, | |
height // self.vae_scale_factor, | |
width // self.vae_scale_factor, | |
self.vqvae.config.latent_channels, | |
), | |
).sample.clip(0, 1) | |
output = self.image_processor.postprocess(output, output_type) | |
if needs_upcasting: | |
self.vqvae.half() | |
self.maybe_free_model_hooks() | |
if not return_dict: | |
return (output,) | |
return ImagePipelineOutput(output) | |