|
from dataclasses import dataclass
|
|
from typing import Optional, Tuple
|
|
from copy import deepcopy
|
|
|
|
import torch
|
|
import torch.nn as nn
|
|
from transformers import CLIPTextModel, CLIPTokenizer, AutoTokenizer, AutoModel
|
|
from transformers.utils import ModelOutput
|
|
|
|
from ..constants import TEXT_ENCODER_PATH, TOKENIZER_PATH
|
|
from ..constants import PRECISION_TO_TYPE
|
|
|
|
|
|
def use_default(value, default):
    """Return ``default`` when ``value`` is ``None``; otherwise return ``value``.

    Only ``None`` triggers the fallback: falsy values such as ``0``, ``False``
    or an empty string are returned unchanged.
    """
    if value is None:
        return default
    return value
|
|
|
|
|
|
def load_text_encoder(
    text_encoder_type,
    text_encoder_precision=None,
    text_encoder_path=None,
    logger=None,
    device=None,
):
    """Load a frozen pretrained text encoder.

    Args:
        text_encoder_type (str): Either ``"clipL"`` (loaded with
            ``CLIPTextModel``) or ``"llm"`` (loaded with ``AutoModel``).
        text_encoder_precision (str, optional): Key into ``PRECISION_TO_TYPE``;
            when given, the model is cast to the mapped dtype.
        text_encoder_path (str, optional): Location of the pretrained weights.
            When ``None`` it is looked up in ``TEXT_ENCODER_PATH`` using
            ``text_encoder_type``.
        logger (optional): Object with an ``info`` method used for progress
            messages. Nothing is logged when ``None``.
        device (optional): When given, the model is moved to this device.

    Returns:
        tuple: ``(text_encoder, text_encoder_path)`` where the path is the one
        that was actually used for loading.

    Raises:
        ValueError: If ``text_encoder_type`` is neither ``"clipL"`` nor ``"llm"``.
    """
    resolved_path = text_encoder_path
    if resolved_path is None:
        resolved_path = TEXT_ENCODER_PATH[text_encoder_type]

    log = logger.info if logger is not None else None
    if log is not None:
        log(f"Loading text encoder model ({text_encoder_type}) from: {resolved_path}")

    # Reject unknown types up front so the loading code below only has to
    # distinguish between the two supported families.
    if text_encoder_type not in ("clipL", "llm"):
        raise ValueError(f"Unsupported text encoder type: {text_encoder_type}")

    if text_encoder_type == "clipL":
        model = CLIPTextModel.from_pretrained(resolved_path)
        final_norm = model.text_model.final_layer_norm
    else:
        model = AutoModel.from_pretrained(resolved_path, low_cpu_mem_usage=True)
        final_norm = model.norm
    # Expose the final normalisation layer under one attribute name for both
    # model families.
    model.final_layer_norm = final_norm

    if text_encoder_precision is not None:
        target_dtype = PRECISION_TO_TYPE[text_encoder_precision]
        model = model.to(dtype=target_dtype)

    # The encoder is used frozen: disable gradients on all parameters.
    model.requires_grad_(False)

    if log is not None:
        log(f"Text encoder to dtype: {model.dtype}")

    if device is not None:
        model = model.to(device)

    return model, resolved_path
|
|
|
|
|
|
def load_tokenizer(
    tokenizer_type, tokenizer_path=None, padding_side="right", logger=None
):
    """Load the tokenizer matching a text encoder type.

    Args:
        tokenizer_type (str): Either ``"clipL"`` (loaded with ``CLIPTokenizer``
            and ``max_length=77``) or ``"llm"`` (loaded with ``AutoTokenizer``).
        tokenizer_path (str, optional): Location of the pretrained tokenizer.
            When ``None`` it is looked up in ``TOKENIZER_PATH`` using
            ``tokenizer_type``.
        padding_side (str): Forwarded to ``AutoTokenizer.from_pretrained``;
            only used for the ``"llm"`` type. Defaults to ``"right"``.
        logger (optional): Object with an ``info`` method used for progress
            messages. Nothing is logged when ``None``.

    Returns:
        tuple: ``(tokenizer, tokenizer_path)`` where the path is the one that
        was actually used for loading.

    Raises:
        ValueError: If ``tokenizer_type`` is neither ``"clipL"`` nor ``"llm"``.
    """
    resolved_path = tokenizer_path
    if resolved_path is None:
        resolved_path = TOKENIZER_PATH[tokenizer_type]

    if logger is not None:
        logger.info(f"Loading tokenizer ({tokenizer_type}) from: {resolved_path}")

    if tokenizer_type not in ("clipL", "llm"):
        raise ValueError(f"Unsupported tokenizer type: {tokenizer_type}")

    if tokenizer_type == "clipL":
        loaded = CLIPTokenizer.from_pretrained(resolved_path, max_length=77)
    else:
        loaded = AutoTokenizer.from_pretrained(
            resolved_path, padding_side=padding_side
        )
    return loaded, resolved_path
|
|
|
|
|
|
@dataclass
class TextEncoderModelOutput(ModelOutput):
    """
    Result container for a text-encoder pass.

    Args:
        hidden_state (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`):
            Hidden states selected as the encoder output.
        attention_mask (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
            Mask marking padding token positions, with values in ``[0, 1]``.
        hidden_states_list (`tuple(torch.FloatTensor)`, *optional*, returned when `output_hidden_states=True` is passed):
            One `torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)` per
            layer output, plus one for the embedding output if the model has an embedding layer.
        text_outputs (`list`, *optional*, returned when `return_texts=True` is passed):
            Decoded texts.
    """

    # Field order matters: instances may be built with positional arguments.
    hidden_state: torch.FloatTensor = None
    attention_mask: Optional[torch.LongTensor] = None
    hidden_states_list: Optional[Tuple[torch.FloatTensor, ...]] = None
    text_outputs: Optional[list] = None
|
|
|
|
|
|
class TextEncoder(nn.Module):
    """Pretrained text encoder bundled with its tokenizer.

    The module loads a model through ``load_text_encoder`` and a tokenizer
    through ``load_tokenizer``, optionally wraps prompts in a template before
    tokenizing, and returns the selected hidden states as a
    ``TextEncoderModelOutput``.
    """

    def __init__(
        self,
        text_encoder_type: str,
        max_length: int,
        text_encoder_precision: Optional[str] = None,
        text_encoder_path: Optional[str] = None,
        tokenizer_type: Optional[str] = None,
        tokenizer_path: Optional[str] = None,
        output_key: Optional[str] = None,
        use_attention_mask: bool = True,
        input_max_length: Optional[int] = None,
        prompt_template: Optional[dict] = None,
        prompt_template_video: Optional[dict] = None,
        hidden_state_skip_layer: Optional[int] = None,
        apply_final_norm: bool = False,
        reproduce: bool = False,
        logger=None,
        device=None,
    ):
        """Load the model and tokenizer and store the encoding options.

        Args:
            text_encoder_type: Encoder family, forwarded to ``load_text_encoder``.
                It also picks the default ``output_key``: ``"pooler_output"``
                for names containing ``"clip"``, ``"last_hidden_state"`` for
                names containing ``"t5"``, ``"llm"`` or ``"glm"``.
            max_length: Length every prompt is padded/truncated to by the
                tokenizer.
            text_encoder_precision: Precision key forwarded to
                ``load_text_encoder``.
            text_encoder_path: Model location forwarded to ``load_text_encoder``.
            tokenizer_type: Tokenizer family; defaults to ``text_encoder_type``.
            tokenizer_path: Tokenizer location; defaults to ``text_encoder_path``.
            output_key: Key read from the model output when no
                ``hidden_state_skip_layer`` is in effect. Overrides the
                per-type default.
            use_attention_mask: Default for whether ``encode`` passes the
                attention mask to the model.
            input_max_length: Stored on the instance; defaults to ``max_length``.
            prompt_template: Dict with a ``"template"`` string containing ``{}``
                and an optional ``"crop_start"``; used for ``data_type="image"``.
            prompt_template_video: Same layout as ``prompt_template``; used for
                ``data_type="video"``. Requires ``use_attention_mask=True``.
            hidden_state_skip_layer: Default number of layers, counted from the
                end of the hidden-state tuple, to skip in ``encode``.
            apply_final_norm: Whether ``encode`` applies the model's
                ``final_layer_norm`` to a skipped (non-final) hidden state.
            reproduce: Stored on the instance.
            logger: Optional logger forwarded to the loading helpers.
            device: Optional device forwarded to ``load_text_encoder``.

        Raises:
            AssertionError: If a template is malformed, or a video template is
                given while ``use_attention_mask`` is not ``True``.
            ValueError: If ``text_encoder_type`` is not recognised.
        """
        super().__init__()
        self.text_encoder_type = text_encoder_type
        self.max_length = max_length
        self.precision = text_encoder_precision
        self.model_path = text_encoder_path
        self.tokenizer_type = (
            tokenizer_type if tokenizer_type is not None else text_encoder_type
        )
        self.tokenizer_path = (
            tokenizer_path if tokenizer_path is not None else text_encoder_path
        )
        self.use_attention_mask = use_attention_mask
        if prompt_template_video is not None:
            assert (
                use_attention_mask is True
            ), "Attention mask is True required when training videos."
        self.input_max_length = (
            input_max_length if input_max_length is not None else max_length
        )
        self.prompt_template = prompt_template
        self.prompt_template_video = prompt_template_video
        self.hidden_state_skip_layer = hidden_state_skip_layer
        self.apply_final_norm = apply_final_norm
        self.reproduce = reproduce
        self.logger = logger

        self.use_template = self.prompt_template is not None
        if self.use_template:
            self._validate_prompt_template(self.prompt_template, "prompt_template")

        self.use_video_template = self.prompt_template_video is not None
        if self.use_video_template:
            self._validate_prompt_template(
                self.prompt_template_video, "prompt_template_video"
            )

        if "t5" in text_encoder_type:
            self.output_key = output_key or "last_hidden_state"
        elif "clip" in text_encoder_type:
            self.output_key = output_key or "pooler_output"
        elif "llm" in text_encoder_type or "glm" in text_encoder_type:
            self.output_key = output_key or "last_hidden_state"
        else:
            raise ValueError(f"Unsupported text encoder type: {text_encoder_type}")

        self.model, self.model_path = load_text_encoder(
            text_encoder_type=self.text_encoder_type,
            text_encoder_precision=self.precision,
            text_encoder_path=self.model_path,
            logger=self.logger,
            device=device,
        )
        # Snapshot of the model's dtype/device taken right after loading.
        self.dtype = self.model.dtype
        self.device = self.model.device

        self.tokenizer, self.tokenizer_path = load_tokenizer(
            tokenizer_type=self.tokenizer_type,
            tokenizer_path=self.tokenizer_path,
            padding_side="right",
            logger=self.logger,
        )

    def __repr__(self):
        return f"{self.text_encoder_type} ({self.precision} - {self.model_path})"

    @staticmethod
    def _validate_prompt_template(template, name):
        """Assert that ``template`` is a dict whose ``"template"`` entry contains ``{}``.

        ``name`` is the argument name used in the assertion messages.
        """
        assert (
            isinstance(template, dict) and "template" in template
        ), f"`{name}` must be a dictionary with a key 'template', got {template}"
        assert "{}" in str(template["template"]), (
            f"`{name}['template']` must contain a placeholder `{{}}` for the input text, "
            f"got {template['template']}"
        )

    def _select_prompt_template(self, data_type):
        """Return the prompt-template dict configured for ``data_type``.

        Raises:
            ValueError: If ``data_type`` is not ``"image"`` or ``"video"``, or
                if no template was configured for it.
        """
        if data_type == "image":
            template = self.prompt_template
        elif data_type == "video":
            template = self.prompt_template_video
        else:
            raise ValueError(f"Unsupported data type: {data_type}")
        if template is None:
            # Fail with a readable message instead of an opaque
            # "'NoneType' object is not subscriptable".
            raise ValueError(
                f"No prompt template configured for data type: {data_type}"
            )
        return template

    @staticmethod
    def apply_text_to_template(text, template, prevent_empty_text=True):
        """
        Apply text to template.

        Args:
            text (str): Input text.
            template (str): Template string; ``text`` is inserted with
                ``str.format``.
            prevent_empty_text (bool): Currently unused by this method.
                Defaults to True.

        Returns:
            str: The formatted template.

        Raises:
            TypeError: If ``template`` is not a string.
        """
        if isinstance(template, str):
            return template.format(text)
        else:
            raise TypeError(f"Unsupported template type: {type(template)}")

    def text2tokens(self, text, data_type="image"):
        """
        Tokenize the input text.

        When a prompt template is configured, every prompt is first inserted
        into the template selected by ``data_type``.

        Args:
            text (str or list): Input text, or a list/tuple of texts.
            data_type (str): ``"image"`` or ``"video"``; selects the prompt
                template. Defaults to ``"image"``.

        Returns:
            The tokenizer output, padded/truncated to ``self.max_length`` and
            returned as PyTorch tensors.

        Raises:
            ValueError: If ``data_type`` is unsupported or has no template
                configured while templates are in use.
            TypeError: If templates are in use and ``text`` is neither a
                string nor a list/tuple.
        """
        tokenize_input_type = "str"
        if self.use_template:
            prompt_template = self._select_prompt_template(data_type)["template"]
            if isinstance(text, (list, tuple)):
                text = [
                    self.apply_text_to_template(one_text, prompt_template)
                    for one_text in text
                ]
                # `text` may be empty; only inspect the first item if present.
                if text and isinstance(text[0], list):
                    tokenize_input_type = "list"
            elif isinstance(text, str):
                text = self.apply_text_to_template(text, prompt_template)
                if isinstance(text, list):
                    tokenize_input_type = "list"
            else:
                raise TypeError(f"Unsupported text type: {type(text)}")

        kwargs = dict(
            truncation=True,
            max_length=self.max_length,
            padding="max_length",
            return_tensors="pt",
        )
        if tokenize_input_type == "str":
            return self.tokenizer(
                text,
                return_length=False,
                return_overflowing_tokens=False,
                return_attention_mask=True,
                **kwargs,
            )
        elif tokenize_input_type == "list":
            return self.tokenizer.apply_chat_template(
                text,
                add_generation_prompt=True,
                tokenize=True,
                return_dict=True,
                **kwargs,
            )
        else:
            raise ValueError(f"Unsupported tokenize_input_type: {tokenize_input_type}")

    def encode(
        self,
        batch_encoding,
        use_attention_mask=None,
        output_hidden_states=False,
        do_sample=None,
        hidden_state_skip_layer=None,
        return_texts=False,
        data_type="image",
        device=None,
    ):
        """
        Run the text encoder on a tokenized batch.

        Args:
            batch_encoding (dict): Batch encoding from tokenizer; must provide
                ``"input_ids"`` and, when the attention mask is used,
                ``"attention_mask"``.
            use_attention_mask (bool): Whether to use attention mask. If None,
                use self.use_attention_mask. Defaults to None.
            output_hidden_states (bool): Whether to also return the model's
                full tuple of hidden states. The model is always asked for
                hidden states when a skip layer is in effect.
                Defaults to False.
            do_sample (bool): Currently unused by this method. Defaults to None.
            hidden_state_skip_layer (int): Number of hidden states to skip,
                counted from the end. 0 means the last one. If None,
                self.hidden_state_skip_layer is used; if that is also None,
                the output is read from self.output_key. Defaults to None.
            return_texts (bool): Currently unused by this method.
                Defaults to False.
            data_type (str): ``"image"`` or ``"video"``; selects which prompt
                template's ``crop_start`` is applied. Defaults to ``"image"``.
            device: Device the inputs are moved to. Defaults to the model's
                device.

        Returns:
            TextEncoderModelOutput: Selected hidden state and attention mask
            (both cropped by ``crop_start`` when it is positive), plus all
            hidden states when ``output_hidden_states`` is True.

        Raises:
            ValueError: If ``data_type`` is unsupported or has no template
                configured while templates are in use.
        """
        device = self.model.device if device is None else device
        use_attention_mask = use_default(use_attention_mask, self.use_attention_mask)
        hidden_state_skip_layer = use_default(
            hidden_state_skip_layer, self.hidden_state_skip_layer
        )
        attention_mask = (
            batch_encoding["attention_mask"].to(device) if use_attention_mask else None
        )
        outputs = self.model(
            input_ids=batch_encoding["input_ids"].to(device),
            attention_mask=attention_mask,
            output_hidden_states=output_hidden_states
            or hidden_state_skip_layer is not None,
        )
        if hidden_state_skip_layer is not None:
            # Index from the end: skip 0 picks the last entry, 1 the one before.
            last_hidden_state = outputs.hidden_states[-(hidden_state_skip_layer + 1)]
            # Only intermediate layers are normalised here; presumably the
            # final entry already went through the model's own final norm.
            if hidden_state_skip_layer > 0 and self.apply_final_norm:
                last_hidden_state = self.model.final_layer_norm(last_hidden_state)
        else:
            last_hidden_state = outputs[self.output_key]

        if self.use_template:
            crop_start = self._select_prompt_template(data_type).get("crop_start", -1)
            if crop_start > 0:
                # Drop the leading `crop_start` positions from both tensors.
                last_hidden_state = last_hidden_state[:, crop_start:]
                attention_mask = (
                    attention_mask[:, crop_start:] if use_attention_mask else None
                )

        if output_hidden_states:
            return TextEncoderModelOutput(
                last_hidden_state, attention_mask, outputs.hidden_states
            )
        return TextEncoderModelOutput(last_hidden_state, attention_mask)

    def forward(
        self,
        text,
        use_attention_mask=None,
        output_hidden_states=False,
        do_sample=False,
        hidden_state_skip_layer=None,
        return_texts=False,
        data_type="image",
    ):
        """
        Tokenize ``text`` and encode it in one call.

        Args:
            text (str or list): Input text, or a list/tuple of texts.
            use_attention_mask (bool): Forwarded to ``encode``.
            output_hidden_states (bool): Forwarded to ``encode``.
            do_sample (bool): Forwarded to ``encode``.
            hidden_state_skip_layer (int): Forwarded to ``encode``.
            return_texts (bool): Forwarded to ``encode``.
            data_type (str): ``"image"`` or ``"video"``; forwarded to both
                ``text2tokens`` and ``encode`` so the same prompt template is
                used for wrapping and cropping. Defaults to ``"image"``.

        Returns:
            TextEncoderModelOutput: The result of ``encode``.
        """
        batch_encoding = self.text2tokens(text, data_type=data_type)
        return self.encode(
            batch_encoding,
            use_attention_mask=use_attention_mask,
            output_hidden_states=output_hidden_states,
            do_sample=do_sample,
            hidden_state_skip_layer=hidden_state_skip_layer,
            return_texts=return_texts,
            data_type=data_type,
        )
|
|
|