Spaces:
Sleeping
Sleeping
# -*- coding: utf-8 -*- | |
"""text-to-image.ipynb | |
Automatically generated by Colaboratory. | |
Original file is located at | |
https://colab.research.google.com/drive/1OcehPd4sJRgAE0kaYV9y8oTf0G0VElbU | |
""" | |
# Commented out IPython magic to ensure Python compatibility. | |
'''pip install -q "openvino>=2023.1.0" | |
pip install -q --extra-index-url https://download.pytorch.org/whl/cpu "diffusers[torch]>=0.9.0" | |
pip install -q "huggingface-hub>=0.9.1" | |
pip install -q gradio | |
pip install -q transformers | |
pip install kaleido cohere openai tiktoken | |
pip install typing-extensions==3.10.0.2 | |
pip install diffusers transformers''' | |
from diffusers import StableDiffusionPipeline | |
import gc | |
pipe = StableDiffusionPipeline.from_pretrained("prompthero/openjourney").to("cpu") | |
text_encoder = pipe.text_encoder | |
text_encoder.eval() | |
unet = pipe.unet | |
unet.eval() | |
vae = pipe.vae | |
vae.eval() | |
del pipe | |
gc.collect() | |
from pathlib import Path | |
import torch | |
import openvino as ov | |
TEXT_ENCODER_OV_PATH = Path("text_encoder.xml") | |
def cleanup_torchscript_cache(): | |
""" | |
Helper for removing cached model representation | |
""" | |
torch._C._jit_clear_class_registry() | |
torch.jit._recursive.concrete_type_store = torch.jit._recursive.ConcreteTypeStore() | |
torch.jit._state._clear_class_state() | |
def convert_encoder(text_encoder: torch.nn.Module, ir_path:Path): | |
""" | |
Convert Text Encoder mode. | |
Function accepts text encoder model, and prepares example inputs for conversion, | |
Parameters: | |
text_encoder (torch.nn.Module): text_encoder model from Stable Diffusion pipeline | |
ir_path (Path): File for storing model | |
Returns: | |
None | |
""" | |
input_ids = torch.ones((1, 77), dtype=torch.long) | |
# switch model to inference mode | |
text_encoder.eval() | |
# disable gradients calculation for reducing memory consumption | |
with torch.no_grad(): | |
# Export model to IR format | |
ov_model = ov.convert_model(text_encoder, example_input=input_ids, input=[(1,77),]) | |
ov.save_model(ov_model, ir_path) | |
del ov_model | |
cleanup_torchscript_cache() | |
print(f'Text Encoder successfully converted to IR and saved to {ir_path}') | |
if not TEXT_ENCODER_OV_PATH.exists(): | |
convert_encoder(text_encoder, TEXT_ENCODER_OV_PATH) | |
else: | |
print(f"Text encoder will be loaded from {TEXT_ENCODER_OV_PATH}") | |
del text_encoder | |
gc.collect() | |
import numpy as np | |
UNET_OV_PATH = Path('unet.xml') | |
dtype_mapping = { | |
torch.float32: ov.Type.f32, | |
torch.float64: ov.Type.f64 | |
} | |
def convert_unet(unet:torch.nn.Module, ir_path:Path): | |
""" | |
Convert U-net model to IR format. | |
Function accepts unet model, prepares example inputs for conversion, | |
Parameters: | |
unet (StableDiffusionPipeline): unet from Stable Diffusion pipeline | |
ir_path (Path): File for storing model | |
Returns: | |
None | |
""" | |
# prepare inputs | |
encoder_hidden_state = torch.ones((2, 77, 768)) | |
latents_shape = (2, 4, 512 // 8, 512 // 8) | |
latents = torch.randn(latents_shape) | |
t = torch.from_numpy(np.array(1, dtype=float)) | |
dummy_inputs = (latents, t, encoder_hidden_state) | |
input_info = [] | |
for input_tensor in dummy_inputs: | |
shape = ov.PartialShape(tuple(input_tensor.shape)) | |
element_type = dtype_mapping[input_tensor.dtype] | |
input_info.append((shape, element_type)) | |
unet.eval() | |
with torch.no_grad(): | |
ov_model = ov.convert_model(unet, example_input=dummy_inputs, input=input_info) | |
ov.save_model(ov_model, ir_path) | |
del ov_model | |
cleanup_torchscript_cache() | |
print(f'Unet successfully converted to IR and saved to {ir_path}') | |
if not UNET_OV_PATH.exists(): | |
convert_unet(unet, UNET_OV_PATH) | |
gc.collect() | |
else: | |
print(f"Unet will be loaded from {UNET_OV_PATH}") | |
del unet | |
gc.collect() | |
VAE_ENCODER_OV_PATH = Path("vae_encoder.xml") | |
def convert_vae_encoder(vae: torch.nn.Module, ir_path: Path): | |
""" | |
Convert VAE model for encoding to IR format. | |
Function accepts vae model, creates wrapper class for export only necessary for inference part, | |
prepares example inputs for conversion, | |
Parameters: | |
vae (torch.nn.Module): VAE model from StableDiffusio pipeline | |
ir_path (Path): File for storing model | |
Returns: | |
None | |
""" | |
class VAEEncoderWrapper(torch.nn.Module): | |
def __init__(self, vae): | |
super().__init__() | |
self.vae = vae | |
def forward(self, image): | |
return self.vae.encode(x=image)["latent_dist"].sample() | |
vae_encoder = VAEEncoderWrapper(vae) | |
vae_encoder.eval() | |
image = torch.zeros((1, 3, 512, 512)) | |
with torch.no_grad(): | |
ov_model = ov.convert_model(vae_encoder, example_input=image, input=[((1,3,512,512),)]) | |
ov.save_model(ov_model, ir_path) | |
del ov_model | |
cleanup_torchscript_cache() | |
print(f'VAE encoder successfully converted to IR and saved to {ir_path}') | |
if not VAE_ENCODER_OV_PATH.exists(): | |
convert_vae_encoder(vae, VAE_ENCODER_OV_PATH) | |
else: | |
print(f"VAE encoder will be loaded from {VAE_ENCODER_OV_PATH}") | |
VAE_DECODER_OV_PATH = Path('vae_decoder.xml') | |
def convert_vae_decoder(vae: torch.nn.Module, ir_path: Path): | |
""" | |
Convert VAE model for decoding to IR format. | |
Function accepts vae model, creates wrapper class for export only necessary for inference part, | |
prepares example inputs for conversion, | |
Parameters: | |
vae (torch.nn.Module): VAE model frm StableDiffusion pipeline | |
ir_path (Path): File for storing model | |
Returns: | |
None | |
""" | |
class VAEDecoderWrapper(torch.nn.Module): | |
def __init__(self, vae): | |
super().__init__() | |
self.vae = vae | |
def forward(self, latents): | |
return self.vae.decode(latents) | |
vae_decoder = VAEDecoderWrapper(vae) | |
latents = torch.zeros((1, 4, 64, 64)) | |
vae_decoder.eval() | |
with torch.no_grad(): | |
ov_model = ov.convert_model(vae_decoder, example_input=latents, input=[((1,4,64,64),)]) | |
ov.save_model(ov_model, ir_path) | |
del ov_model | |
cleanup_torchscript_cache() | |
print(f'VAE decoder successfully converted to IR and saved to {ir_path}') | |
if not VAE_DECODER_OV_PATH.exists(): | |
convert_vae_decoder(vae, VAE_DECODER_OV_PATH) | |
else: | |
print(f"VAE decoder will be loaded from {VAE_DECODER_OV_PATH}") | |
del vae | |
gc.collect() | |
import inspect | |
from typing import List, Optional, Union, Dict | |
import PIL | |
import cv2 | |
from transformers import CLIPTokenizer | |
from diffusers.pipelines.pipeline_utils import DiffusionPipeline | |
from diffusers.schedulers import DDIMScheduler, LMSDiscreteScheduler, PNDMScheduler | |
from openvino.runtime import Model | |
def scale_fit_to_window(dst_width:int, dst_height:int, image_width:int, image_height:int): | |
""" | |
Preprocessing helper function for calculating image size for resize with peserving original aspect ratio | |
and fitting image to specific window size | |
Parameters: | |
dst_width (int): destination window width | |
dst_height (int): destination window height | |
image_width (int): source image width | |
image_height (int): source image height | |
Returns: | |
result_width (int): calculated width for resize | |
result_height (int): calculated height for resize | |
""" | |
im_scale = min(dst_height / image_height, dst_width / image_width) | |
return int(im_scale * image_width), int(im_scale * image_height) | |
def preprocess(image: PIL.Image.Image): | |
""" | |
Image preprocessing function. Takes image in PIL.Image format, resizes it to keep aspect ration and fits to model input window 512x512, | |
then converts it to np.ndarray and adds padding with zeros on right or bottom side of image (depends from aspect ratio), after that | |
converts data to float32 data type and change range of values from [0, 255] to [-1, 1], finally, converts data layout from planar NHWC to NCHW. | |
The function returns preprocessed input tensor and padding size, which can be used in postprocessing. | |
Parameters: | |
image (PIL.Image.Image): input image | |
Returns: | |
image (np.ndarray): preprocessed image tensor | |
meta (Dict): dictionary with preprocessing metadata info | |
""" | |
src_width, src_height = image.size | |
dst_width, dst_height = scale_fit_to_window( | |
512, 512, src_width, src_height) | |
image = np.array(image.resize((dst_width, dst_height), | |
resample=PIL.Image.Resampling.LANCZOS))[None, :] | |
pad_width = 512 - dst_width | |
pad_height = 512 - dst_height | |
pad = ((0, 0), (0, pad_height), (0, pad_width), (0, 0)) | |
image = np.pad(image, pad, mode="constant") | |
image = image.astype(np.float32) / 255.0 | |
image = 2.0 * image - 1.0 | |
image = image.transpose(0, 3, 1, 2) | |
return image, {"padding": pad, "src_width": src_width, "src_height": src_height} | |
class OVStableDiffusionPipeline(DiffusionPipeline): | |
def __init__( | |
self, | |
vae_decoder: Model, | |
text_encoder: Model, | |
tokenizer: CLIPTokenizer, | |
unet: Model, | |
scheduler: Union[DDIMScheduler, PNDMScheduler, LMSDiscreteScheduler], | |
vae_encoder: Model = None, | |
): | |
""" | |
Pipeline for text-to-image generation using Stable Diffusion. | |
Parameters: | |
vae (Model): | |
Variational Auto-Encoder (VAE) Model to decode images to and from latent representations. | |
text_encoder (Model): | |
Frozen text-encoder. Stable Diffusion uses the text portion of | |
[CLIP](https://huggingface.co./docs/transformers/model_doc/clip#transformers.CLIPTextModel), specifically | |
the clip-vit-large-patch14(https://huggingface.co./openai/clip-vit-large-patch14) variant. | |
tokenizer (CLIPTokenizer): | |
Tokenizer of class CLIPTokenizer(https://huggingface.co./docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer). | |
unet (Model): Conditional U-Net architecture to denoise the encoded image latents. | |
scheduler (SchedulerMixin): | |
A scheduler to be used in combination with unet to denoise the encoded image latents. Can be one of | |
DDIMScheduler, LMSDiscreteScheduler, or PNDMScheduler. | |
""" | |
super().__init__() | |
self.scheduler = scheduler | |
self.vae_decoder = vae_decoder | |
self.vae_encoder = vae_encoder | |
self.text_encoder = text_encoder | |
self.unet = unet | |
self._text_encoder_output = text_encoder.output(0) | |
self._unet_output = unet.output(0) | |
self._vae_d_output = vae_decoder.output(0) | |
self._vae_e_output = vae_encoder.output(0) if vae_encoder is not None else None | |
self.height = 512 | |
self.width = 512 | |
self.tokenizer = tokenizer | |
def __call__( | |
self, | |
prompt: Union[str, List[str]], | |
image: PIL.Image.Image = None, | |
num_inference_steps: Optional[int] = 50, | |
negative_prompt: Union[str, List[str]] = None, | |
guidance_scale: Optional[float] = 7.5, | |
eta: Optional[float] = 0.0, | |
output_type: Optional[str] = "pil", | |
seed: Optional[int] = None, | |
strength: float = 1.0, | |
gif: Optional[bool] = False, | |
**kwargs, | |
): | |
""" | |
Function invoked when calling the pipeline for generation. | |
Parameters: | |
prompt (str or List[str]): | |
The prompt or prompts to guide the image generation. | |
image (PIL.Image.Image, *optional*, None): | |
Intinal image for generation. | |
num_inference_steps (int, *optional*, defaults to 50): | |
The number of denoising steps. More denoising steps usually lead to a higher quality image at the | |
expense of slower inference. | |
negative_prompt (str or List[str]): | |
The negative prompt or prompts to guide the image generation. | |
guidance_scale (float, *optional*, defaults to 7.5): | |
Guidance scale as defined in Classifier-Free Diffusion Guidance(https://arxiv.org/abs/2207.12598). | |
guidance_scale is defined as `w` of equation 2. | |
Higher guidance scale encourages to generate images that are closely linked to the text prompt, | |
usually at the expense of lower image quality. | |
eta (float, *optional*, defaults to 0.0): | |
Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to | |
[DDIMScheduler], will be ignored for others. | |
output_type (`str`, *optional*, defaults to "pil"): | |
The output format of the generate image. Choose between | |
[PIL](https://pillow.readthedocs.io/en/stable/): PIL.Image.Image or np.array. | |
seed (int, *optional*, None): | |
Seed for random generator state initialization. | |
gif (bool, *optional*, False): | |
Flag for storing all steps results or not. | |
Returns: | |
Dictionary with keys: | |
sample - the last generated image PIL.Image.Image or np.array | |
iterations - *optional* (if gif=True) images for all diffusion steps, List of PIL.Image.Image or np.array. | |
""" | |
if seed is not None: | |
np.random.seed(seed) | |
img_buffer = [] | |
do_classifier_free_guidance = guidance_scale > 1.0 | |
# get prompt text embeddings | |
text_embeddings = self._encode_prompt(prompt, do_classifier_free_guidance=do_classifier_free_guidance, negative_prompt=negative_prompt) | |
# set timesteps | |
accepts_offset = "offset" in set(inspect.signature(self.scheduler.set_timesteps).parameters.keys()) | |
extra_set_kwargs = {} | |
if accepts_offset: | |
extra_set_kwargs["offset"] = 1 | |
self.scheduler.set_timesteps(num_inference_steps, **extra_set_kwargs) | |
timesteps, num_inference_steps = self.get_timesteps(num_inference_steps, strength) | |
latent_timestep = timesteps[:1] | |
# get the initial random noise unless the user supplied it | |
latents, meta = self.prepare_latents(image, latent_timestep) | |
# prepare extra kwargs for the scheduler step, since not all schedulers have the same signature | |
# eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers. | |
# eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502 | |
# and should be between [0, 1] | |
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys()) | |
extra_step_kwargs = {} | |
if accepts_eta: | |
extra_step_kwargs["eta"] = eta | |
for i, t in enumerate(self.progress_bar(timesteps)): | |
# expand the latents if you are doing classifier free guidance | |
latent_model_input = np.concatenate([latents] * 2) if do_classifier_free_guidance else latents | |
latent_model_input = self.scheduler.scale_model_input(latent_model_input, t) | |
# predict the noise residual | |
noise_pred = self.unet([latent_model_input, t, text_embeddings])[self._unet_output] | |
# perform guidance | |
if do_classifier_free_guidance: | |
noise_pred_uncond, noise_pred_text = noise_pred[0], noise_pred[1] | |
noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond) | |
# compute the previous noisy sample x_t -> x_t-1 | |
latents = self.scheduler.step(torch.from_numpy(noise_pred), t, torch.from_numpy(latents), **extra_step_kwargs)["prev_sample"].numpy() | |
if gif: | |
image = self.vae_decoder(latents * (1 / 0.18215))[self._vae_d_output] | |
image = self.postprocess_image(image, meta, output_type) | |
img_buffer.extend(image) | |
# scale and decode the image latents with vae | |
image = self.vae_decoder(latents * (1 / 0.18215))[self._vae_d_output] | |
image = self.postprocess_image(image, meta, output_type) | |
return {"sample": image, 'iterations': img_buffer} | |
def _encode_prompt(self, prompt:Union[str, List[str]], num_images_per_prompt:int = 1, do_classifier_free_guidance:bool = True, negative_prompt:Union[str, List[str]] = None): | |
""" | |
Encodes the prompt into text encoder hidden states. | |
Parameters: | |
prompt (str or list(str)): prompt to be encoded | |
num_images_per_prompt (int): number of images that should be generated per prompt | |
do_classifier_free_guidance (bool): whether to use classifier free guidance or not | |
negative_prompt (str or list(str)): negative prompt to be encoded | |
Returns: | |
text_embeddings (np.ndarray): text encoder hidden states | |
""" | |
batch_size = len(prompt) if isinstance(prompt, list) else 1 | |
# tokenize input prompts | |
text_inputs = self.tokenizer( | |
prompt, | |
padding="max_length", | |
max_length=self.tokenizer.model_max_length, | |
truncation=True, | |
return_tensors="np", | |
) | |
text_input_ids = text_inputs.input_ids | |
text_embeddings = self.text_encoder( | |
text_input_ids)[self._text_encoder_output] | |
# duplicate text embeddings for each generation per prompt | |
if num_images_per_prompt != 1: | |
bs_embed, seq_len, _ = text_embeddings.shape | |
text_embeddings = np.tile( | |
text_embeddings, (1, num_images_per_prompt, 1)) | |
text_embeddings = np.reshape( | |
text_embeddings, (bs_embed * num_images_per_prompt, seq_len, -1)) | |
# get unconditional embeddings for classifier free guidance | |
if do_classifier_free_guidance: | |
uncond_tokens: List[str] | |
max_length = text_input_ids.shape[-1] | |
if negative_prompt is None: | |
uncond_tokens = [""] * batch_size | |
elif isinstance(negative_prompt, str): | |
uncond_tokens = [negative_prompt] | |
else: | |
uncond_tokens = negative_prompt | |
uncond_input = self.tokenizer( | |
uncond_tokens, | |
padding="max_length", | |
max_length=max_length, | |
truncation=True, | |
return_tensors="np", | |
) | |
uncond_embeddings = self.text_encoder(uncond_input.input_ids)[self._text_encoder_output] | |
# duplicate unconditional embeddings for each generation per prompt, using mps friendly method | |
seq_len = uncond_embeddings.shape[1] | |
uncond_embeddings = np.tile(uncond_embeddings, (1, num_images_per_prompt, 1)) | |
uncond_embeddings = np.reshape(uncond_embeddings, (batch_size * num_images_per_prompt, seq_len, -1)) | |
# For classifier free guidance, we need to do two forward passes. | |
# Here we concatenate the unconditional and text embeddings into a single batch | |
# to avoid doing two forward passes | |
text_embeddings = np.concatenate([uncond_embeddings, text_embeddings]) | |
return text_embeddings | |
def prepare_latents(self, image:PIL.Image.Image = None, latent_timestep:torch.Tensor = None): | |
""" | |
Function for getting initial latents for starting generation | |
Parameters: | |
image (PIL.Image.Image, *optional*, None): | |
Input image for generation, if not provided randon noise will be used as starting point | |
latent_timestep (torch.Tensor, *optional*, None): | |
Predicted by scheduler initial step for image generation, required for latent image mixing with nosie | |
Returns: | |
latents (np.ndarray): | |
Image encoded in latent space | |
""" | |
latents_shape = (1, 4, self.height // 8, self.width // 8) | |
noise = np.random.randn(*latents_shape).astype(np.float32) | |
if image is None: | |
# if you use LMSDiscreteScheduler, let's make sure latents are multiplied by sigmas | |
if isinstance(self.scheduler, LMSDiscreteScheduler): | |
noise = noise * self.scheduler.sigmas[0].numpy() | |
return noise, {} | |
input_image, meta = preprocess(image) | |
latents = self.vae_encoder(input_image)[self._vae_e_output] * 0.18215 | |
latents = self.scheduler.add_noise(torch.from_numpy(latents), torch.from_numpy(noise), latent_timestep).numpy() | |
return latents, meta | |
def postprocess_image(self, image:np.ndarray, meta:Dict, output_type:str = "pil"): | |
""" | |
Postprocessing for decoded image. Takes generated image decoded by VAE decoder, unpad it to initila image size (if required), | |
normalize and convert to [0, 255] pixels range. Optionally, convertes it from np.ndarray to PIL.Image format | |
Parameters: | |
image (np.ndarray): | |
Generated image | |
meta (Dict): | |
Metadata obtained on latents preparing step, can be empty | |
output_type (str, *optional*, pil): | |
Output format for result, can be pil or numpy | |
Returns: | |
image (List of np.ndarray or PIL.Image.Image): | |
Postprocessed images | |
""" | |
if "padding" in meta: | |
pad = meta["padding"] | |
(_, end_h), (_, end_w) = pad[1:3] | |
h, w = image.shape[2:] | |
unpad_h = h - end_h | |
unpad_w = w - end_w | |
image = image[:, :, :unpad_h, :unpad_w] | |
image = np.clip(image / 2 + 0.5, 0, 1) | |
image = np.transpose(image, (0, 2, 3, 1)) | |
# 9. Convert to PIL | |
if output_type == "pil": | |
image = self.numpy_to_pil(image) | |
if "src_height" in meta: | |
orig_height, orig_width = meta["src_height"], meta["src_width"] | |
image = [img.resize((orig_width, orig_height), | |
PIL.Image.Resampling.LANCZOS) for img in image] | |
else: | |
if "src_height" in meta: | |
orig_height, orig_width = meta["src_height"], meta["src_width"] | |
image = [cv2.resize(img, (orig_width, orig_width)) | |
for img in image] | |
return image | |
def get_timesteps(self, num_inference_steps:int, strength:float): | |
""" | |
Helper function for getting scheduler timesteps for generation | |
In case of image-to-image generation, it updates number of steps according to strength | |
Parameters: | |
num_inference_steps (int): | |
number of inference steps for generation | |
strength (float): | |
value between 0.0 and 1.0, that controls the amount of noise that is added to the input image. | |
Values that approach 1.0 enable lots of variations but will also produce images that are not semantically consistent with the input. | |
""" | |
# get the original timestep using init_timestep | |
init_timestep = min(int(num_inference_steps * strength), num_inference_steps) | |
t_start = max(num_inference_steps - init_timestep, 0) | |
timesteps = self.scheduler.timesteps[t_start:] | |
return timesteps, num_inference_steps - t_start | |
core = ov.Core() | |
"""Select device from dropdown list for running inference using OpenVINO.""" | |
import ipywidgets as widgets | |
device = widgets.Dropdown( | |
options=core.available_devices + ["AUTO"], | |
value='CPU', | |
description='Device:', | |
disabled=False, | |
) | |
device | |
text_enc = core.compile_model(TEXT_ENCODER_OV_PATH, device.value) | |
unet_model = core.compile_model(UNET_OV_PATH, device.value) | |
ov_config = {"INFERENCE_PRECISION_HINT": "f32"} if device.value != "CPU" else {} | |
vae_decoder = core.compile_model(VAE_DECODER_OV_PATH, device.value, ov_config) | |
vae_encoder = core.compile_model(VAE_ENCODER_OV_PATH, device.value, ov_config) | |
"""Model tokenizer and scheduler are also important parts of the pipeline. Let us define them and put all components together""" | |
from transformers import CLIPTokenizer | |
from diffusers.schedulers import LMSDiscreteScheduler | |
lms = LMSDiscreteScheduler( | |
beta_start=0.00085, | |
beta_end=0.012, | |
beta_schedule="scaled_linear" | |
) | |
tokenizer = CLIPTokenizer.from_pretrained('openai/clip-vit-large-patch14') | |
ov_pipe = OVStableDiffusionPipeline( | |
tokenizer=tokenizer, | |
text_encoder=text_enc, | |
unet=unet_model, | |
vae_encoder=vae_encoder, | |
vae_decoder=vae_decoder, | |
scheduler=lms | |
) |