#! fork: https://github.com/NVIDIA/TensorRT/blob/main/demo/Diffusion/models.py
#
# SPDX-FileCopyrightText: Copyright (c) 1993-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import onnx_graphsurgeon as gs
import torch
from onnx import shape_inference
from polygraphy.backend.onnx.loader import fold_constants


class Optimizer:
    def __init__(self, onnx_graph, verbose=False):
        self.graph = gs.import_onnx(onnx_graph)
        self.verbose = verbose

    def info(self, prefix):
        if self.verbose:
            print(
                f"{prefix} .. {len(self.graph.nodes)} nodes, {len(self.graph.tensors().keys())} tensors, {len(self.graph.inputs)} inputs, {len(self.graph.outputs)} outputs"
            )

    def cleanup(self, return_onnx=False):
        self.graph.cleanup().toposort()
        if return_onnx:
            return gs.export_onnx(self.graph)

    def select_outputs(self, keep, names=None):
        self.graph.outputs = [self.graph.outputs[o] for o in keep]
        if names:
            for i, name in enumerate(names):
                self.graph.outputs[i].name = name

    def fold_constants(self, return_onnx=False):
        onnx_graph = fold_constants(gs.export_onnx(self.graph), allow_onnxruntime_shape_inference=True)
        self.graph = gs.import_onnx(onnx_graph)
        if return_onnx:
            return onnx_graph

    def infer_shapes(self, return_onnx=False):
        onnx_graph = gs.export_onnx(self.graph)
        if onnx_graph.ByteSize() > 2147483648:
            raise TypeError("ERROR: model size exceeds supported 2GB limit")
        else:
            onnx_graph = shape_inference.infer_shapes(onnx_graph)
        self.graph = gs.import_onnx(onnx_graph)
        if return_onnx:
            return onnx_graph
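

# Usage sketch (not part of the upstream file): load an ONNX model from disk,
# run the standard Optimizer passes, and write the optimized graph back out.
# The helper name and file paths are illustrative assumptions.
def _optimize_onnx_file(onnx_path, opt_path, verbose=True):
    import onnx

    opt = Optimizer(onnx.load(onnx_path), verbose=verbose)
    opt.cleanup()
    opt.fold_constants()
    opt.infer_shapes()
    # cleanup(return_onnx=True) re-exports the graph after the passes above.
    onnx.save(opt.cleanup(return_onnx=True), opt_path)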


class BaseModel:
    def __init__(
        self,
        fp16=False,
        device="cuda",
        verbose=True,
        max_batch_size=16,
        min_batch_size=1,
        embedding_dim=768,
        text_maxlen=77,
    ):
        self.name = "SD Model"
        self.fp16 = fp16
        self.device = device
        self.verbose = verbose
        self.min_batch = min_batch_size
        self.max_batch = max_batch_size
        self.min_image_shape = 256  # min image resolution: 256x256
        self.max_image_shape = 1024  # max image resolution: 1024x1024
        self.min_latent_shape = self.min_image_shape // 8
        self.max_latent_shape = self.max_image_shape // 8
        self.embedding_dim = embedding_dim
        self.text_maxlen = text_maxlen

    def get_model(self):
        pass

    def get_input_names(self):
        pass

    def get_output_names(self):
        pass

    def get_dynamic_axes(self):
        return None

    def get_sample_input(self, batch_size, image_height, image_width):
        pass

    def get_input_profile(self, batch_size, image_height, image_width, static_batch, static_shape):
        return None

    def get_shape_dict(self, batch_size, image_height, image_width):
        return None

    def optimize(self, onnx_graph):
        opt = Optimizer(onnx_graph, verbose=self.verbose)
        opt.info(self.name + ": original")
        opt.cleanup()
        opt.info(self.name + ": cleanup")
        opt.fold_constants()
        opt.info(self.name + ": fold constants")
        opt.infer_shapes()
        opt.info(self.name + ": shape inference")
        onnx_opt_graph = opt.cleanup(return_onnx=True)
        opt.info(self.name + ": finished")
        return onnx_opt_graph

    def check_dims(self, batch_size, image_height, image_width):
        assert batch_size >= self.min_batch and batch_size <= self.max_batch
        # Both spatial dims must be multiples of 8, since the latent is image // 8.
        assert image_height % 8 == 0 and image_width % 8 == 0
        latent_height = image_height // 8
        latent_width = image_width // 8
        assert latent_height >= self.min_latent_shape and latent_height <= self.max_latent_shape
        assert latent_width >= self.min_latent_shape and latent_width <= self.max_latent_shape
        return (latent_height, latent_width)

    def get_minmax_dims(self, batch_size, image_height, image_width, static_batch, static_shape):
        min_batch = batch_size if static_batch else self.min_batch
        max_batch = batch_size if static_batch else self.max_batch
        latent_height = image_height // 8
        latent_width = image_width // 8
        min_image_height = image_height if static_shape else self.min_image_shape
        max_image_height = image_height if static_shape else self.max_image_shape
        min_image_width = image_width if static_shape else self.min_image_shape
        max_image_width = image_width if static_shape else self.max_image_shape
        min_latent_height = latent_height if static_shape else self.min_latent_shape
        max_latent_height = latent_height if static_shape else self.max_latent_shape
        min_latent_width = latent_width if static_shape else self.min_latent_shape
        max_latent_width = latent_width if static_shape else self.max_latent_shape
        return (
            min_batch,
            max_batch,
            min_image_height,
            max_image_height,
            min_image_width,
            max_image_width,
            min_latent_height,
            max_latent_height,
            min_latent_width,
            max_latent_width,
        )
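

# Sketch (not part of the upstream file): turn the {name: [min, opt, max]} dict
# returned by get_input_profile() into a TensorRT optimization profile. Assumes
# a tensorrt.Builder and tensorrt.IBuilderConfig have already been created by
# the caller; the helper name is illustrative.
def _add_optimization_profile(builder, config, input_profile):
    profile = builder.create_optimization_profile()
    for name, (min_shape, opt_shape, max_shape) in input_profile.items():
        profile.set_shape(name, min_shape, opt_shape, max_shape)
    config.add_optimization_profile(profile)
    return profile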


class CLIP(BaseModel):
    def __init__(self, device, max_batch_size, embedding_dim, min_batch_size=1):
        super(CLIP, self).__init__(
            device=device,
            max_batch_size=max_batch_size,
            min_batch_size=min_batch_size,
            embedding_dim=embedding_dim,
        )
        self.name = "CLIP"

    def get_input_names(self):
        return ["input_ids"]

    def get_output_names(self):
        return ["text_embeddings", "pooler_output"]

    def get_dynamic_axes(self):
        return {"input_ids": {0: "B"}, "text_embeddings": {0: "B"}}

    def get_input_profile(self, batch_size, image_height, image_width, static_batch, static_shape):
        self.check_dims(batch_size, image_height, image_width)
        min_batch, max_batch, _, _, _, _, _, _, _, _ = self.get_minmax_dims(
            batch_size, image_height, image_width, static_batch, static_shape
        )
        return {
            "input_ids": [
                (min_batch, self.text_maxlen),
                (batch_size, self.text_maxlen),
                (max_batch, self.text_maxlen),
            ]
        }

    def get_shape_dict(self, batch_size, image_height, image_width):
        self.check_dims(batch_size, image_height, image_width)
        return {
            "input_ids": (batch_size, self.text_maxlen),
            "text_embeddings": (batch_size, self.text_maxlen, self.embedding_dim),
        }

    def get_sample_input(self, batch_size, image_height, image_width):
        self.check_dims(batch_size, image_height, image_width)
        return torch.zeros(batch_size, self.text_maxlen, dtype=torch.int32, device=self.device)

    def optimize(self, onnx_graph):
        opt = Optimizer(onnx_graph)
        opt.info(self.name + ": original")
        opt.select_outputs([0])  # delete graph output#1
        opt.cleanup()
        opt.info(self.name + ": remove output[1]")
        opt.fold_constants()
        opt.info(self.name + ": fold constants")
        opt.infer_shapes()
        opt.info(self.name + ": shape inference")
        opt.select_outputs([0], names=["text_embeddings"])  # rename network output
        opt.info(self.name + ": rename output[0]")
        opt_onnx_graph = opt.cleanup(return_onnx=True)
        opt.info(self.name + ": finished")
        return opt_onnx_graph
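

# Export sketch (not part of the upstream file): dump a Hugging Face CLIP text
# encoder to ONNX using the metadata defined on the CLIP class above. The model
# name, opset version, and output path are illustrative assumptions.
def _export_clip_onnx(onnx_path="clip.onnx", device="cuda", max_batch_size=16):
    from transformers import CLIPTextModel

    text_encoder = CLIPTextModel.from_pretrained(
        "runwayml/stable-diffusion-v1-5", subfolder="text_encoder"
    ).to(device)
    clip = CLIP(device=device, max_batch_size=max_batch_size, embedding_dim=768)
    with torch.inference_mode():
        torch.onnx.export(
            text_encoder,
            clip.get_sample_input(1, 512, 512),
            onnx_path,
            export_params=True,
            opset_version=17,
            input_names=clip.get_input_names(),
            output_names=clip.get_output_names(),
            dynamic_axes=clip.get_dynamic_axes(),
        )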


class UNet(BaseModel):
    def __init__(
        self,
        fp16=False,
        device="cuda",
        max_batch_size=16,
        min_batch_size=1,
        embedding_dim=768,
        text_maxlen=77,
        unet_dim=4,
    ):
        super(UNet, self).__init__(
            fp16=fp16,
            device=device,
            max_batch_size=max_batch_size,
            min_batch_size=min_batch_size,
            embedding_dim=embedding_dim,
            text_maxlen=text_maxlen,
        )
        self.unet_dim = unet_dim
        self.name = "UNet"

    def get_input_names(self):
        return ["sample", "timestep", "encoder_hidden_states"]

    def get_output_names(self):
        return ["latent"]

    def get_dynamic_axes(self):
        return {
            "sample": {0: "2B", 2: "H", 3: "W"},
            "timestep": {0: "2B"},
            "encoder_hidden_states": {0: "2B"},
            "latent": {0: "2B", 2: "H", 3: "W"},
        }

    def get_input_profile(self, batch_size, image_height, image_width, static_batch, static_shape):
        latent_height, latent_width = self.check_dims(batch_size, image_height, image_width)
        (
            min_batch,
            max_batch,
            _,
            _,
            _,
            _,
            min_latent_height,
            max_latent_height,
            min_latent_width,
            max_latent_width,
        ) = self.get_minmax_dims(batch_size, image_height, image_width, static_batch, static_shape)
        return {
            "sample": [
                (min_batch, self.unet_dim, min_latent_height, min_latent_width),
                (batch_size, self.unet_dim, latent_height, latent_width),
                (max_batch, self.unet_dim, max_latent_height, max_latent_width),
            ],
            "timestep": [(min_batch,), (batch_size,), (max_batch,)],
            "encoder_hidden_states": [
                (min_batch, self.text_maxlen, self.embedding_dim),
                (batch_size, self.text_maxlen, self.embedding_dim),
                (max_batch, self.text_maxlen, self.embedding_dim),
            ],
        }

    def get_shape_dict(self, batch_size, image_height, image_width):
        latent_height, latent_width = self.check_dims(batch_size, image_height, image_width)
        # The batch axis is doubled for classifier-free guidance (unconditional
        # and conditional branches run in a single forward pass).
        return {
            "sample": (2 * batch_size, self.unet_dim, latent_height, latent_width),
            "timestep": (2 * batch_size,),
            "encoder_hidden_states": (2 * batch_size, self.text_maxlen, self.embedding_dim),
            "latent": (2 * batch_size, 4, latent_height, latent_width),
        }

    def get_sample_input(self, batch_size, image_height, image_width):
        latent_height, latent_width = self.check_dims(batch_size, image_height, image_width)
        dtype = torch.float16 if self.fp16 else torch.float32
        return (
            torch.randn(
                2 * batch_size, self.unet_dim, latent_height, latent_width, dtype=torch.float32, device=self.device
            ),
            torch.ones((2 * batch_size,), dtype=torch.float32, device=self.device),
            torch.randn(2 * batch_size, self.text_maxlen, self.embedding_dim, dtype=dtype, device=self.device),
        )
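

# Illustration sketch (not part of the upstream file): the dynamic-shape profile
# a 512x512 UNet engine with batch 1..16 would request. With the latent scale
# factor of 8, "sample" maps to [(1, 4, 64, 64), (1, 4, 64, 64), (16, 4, 64, 64)]
# as its (min, opt, max) shapes. The function name is illustrative.
def _example_unet_profile():
    unet = UNet(fp16=True, device="cuda", max_batch_size=16, embedding_dim=768)
    return unet.get_input_profile(
        batch_size=1, image_height=512, image_width=512, static_batch=False, static_shape=True
    )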


class VAE(BaseModel):
    def __init__(self, device, max_batch_size, min_batch_size=1):
        super(VAE, self).__init__(
            device=device,
            max_batch_size=max_batch_size,
            min_batch_size=min_batch_size,
            embedding_dim=None,
        )
        self.name = "VAE decoder"

    def get_input_names(self):
        return ["latent"]

    def get_output_names(self):
        return ["images"]

    def get_dynamic_axes(self):
        return {
            "latent": {0: "B", 2: "H", 3: "W"},
            "images": {0: "B", 2: "8H", 3: "8W"},
        }

    def get_input_profile(self, batch_size, image_height, image_width, static_batch, static_shape):
        latent_height, latent_width = self.check_dims(batch_size, image_height, image_width)
        (
            min_batch,
            max_batch,
            _,
            _,
            _,
            _,
            min_latent_height,
            max_latent_height,
            min_latent_width,
            max_latent_width,
        ) = self.get_minmax_dims(batch_size, image_height, image_width, static_batch, static_shape)
        return {
            "latent": [
                (min_batch, 4, min_latent_height, min_latent_width),
                (batch_size, 4, latent_height, latent_width),
                (max_batch, 4, max_latent_height, max_latent_width),
            ]
        }

    def get_shape_dict(self, batch_size, image_height, image_width):
        latent_height, latent_width = self.check_dims(batch_size, image_height, image_width)
        return {
            "latent": (batch_size, 4, latent_height, latent_width),
            "images": (batch_size, 3, image_height, image_width),
        }

    def get_sample_input(self, batch_size, image_height, image_width):
        latent_height, latent_width = self.check_dims(batch_size, image_height, image_width)
        return torch.randn(
            batch_size,
            4,
            latent_height,
            latent_width,
            dtype=torch.float32,
            device=self.device,
        )


class VAEEncoder(BaseModel):
    def __init__(self, device, max_batch_size, min_batch_size=1):
        super(VAEEncoder, self).__init__(
            device=device,
            max_batch_size=max_batch_size,
            min_batch_size=min_batch_size,
            embedding_dim=None,
        )
        self.name = "VAE encoder"

    def get_input_names(self):
        return ["images"]

    def get_output_names(self):
        return ["latent"]

    def get_dynamic_axes(self):
        return {
            "images": {0: "B", 2: "8H", 3: "8W"},
            "latent": {0: "B", 2: "H", 3: "W"},
        }

    def get_input_profile(self, batch_size, image_height, image_width, static_batch, static_shape):
        self.check_dims(batch_size, image_height, image_width)
        (
            min_batch,
            max_batch,
            min_image_height,
            max_image_height,
            min_image_width,
            max_image_width,
            _,
            _,
            _,
            _,
        ) = self.get_minmax_dims(batch_size, image_height, image_width, static_batch, static_shape)
        return {
            "images": [
                (min_batch, 3, min_image_height, min_image_width),
                (batch_size, 3, image_height, image_width),
                (max_batch, 3, max_image_height, max_image_width),
            ],
        }

    def get_shape_dict(self, batch_size, image_height, image_width):
        latent_height, latent_width = self.check_dims(batch_size, image_height, image_width)
        return {
            "images": (batch_size, 3, image_height, image_width),
            "latent": (batch_size, 4, latent_height, latent_width),
        }

    def get_sample_input(self, batch_size, image_height, image_width):
        self.check_dims(batch_size, image_height, image_width)
        return torch.randn(
            batch_size,
            3,
            image_height,
            image_width,
            dtype=torch.float32,
            device=self.device,
        )
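

if __name__ == "__main__":
    # End-to-end sketch (not part of the upstream file): export the diffusers
    # VAE decoder to ONNX with the metadata above, then run the Optimizer
    # passes on the result. The model name, opset version, and file paths are
    # illustrative assumptions.
    import onnx
    from diffusers import AutoencoderKL

    device = "cuda"
    vae_model = AutoencoderKL.from_pretrained(
        "runwayml/stable-diffusion-v1-5", subfolder="vae"
    ).to(device)
    # Export only the decoder path: latent in, image out.
    vae_model.forward = lambda latent: vae_model.decode(latent).sample
    vae = VAE(device=device, max_batch_size=16)
    with torch.inference_mode():
        torch.onnx.export(
            vae_model,
            vae.get_sample_input(1, 512, 512),
            "vae.onnx",
            export_params=True,
            opset_version=17,
            input_names=vae.get_input_names(),
            output_names=vae.get_output_names(),
            dynamic_axes=vae.get_dynamic_axes(),
        )
    onnx.save(vae.optimize(onnx.load("vae.onnx")), "vae.opt.onnx")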