VITA-1.5 / vita /model /language_model /vita_fo_qwen2.py
lxysl's picture
upload vita-1.5 app.py
bc752b1
from typing import List, Optional, Tuple, Union
import torch
import torch.nn as nn
from PIL import Image
from torch.nn import CrossEntropyLoss
from transformers import (
AutoConfig,
AutoModelForCausalLM,
Qwen2Config,
Qwen2ForCausalLM,
Qwen2Model,
)
from transformers.cache_utils import Cache, DynamicCache
from transformers.modeling_outputs import CausalLMOutputWithPast, MoeCausalLMOutputWithPast
from transformers.generation.utils import GenerateOutput
from ..vita_arch import VITAMetaForCausalLM, VITAMetaModel
from ...constants import IGNORE_INDEX
from .vita_qwen2 import custom_forward
Qwen2ForCausalLM.forward = custom_forward
class VITAFOQwen2Config(Qwen2Config):
model_type = "vita-fo-Qwen2"
class VITAFOQwen2Model(VITAMetaModel, Qwen2Model):
config_class = VITAFOQwen2Config
def __init__(self, config: Qwen2Config):
super(VITAFOQwen2Model, self).__init__(config)
class VITAFOQwen2ForCausalLM(Qwen2ForCausalLM, VITAMetaForCausalLM):
config_class = VITAFOQwen2Config
def __init__(self, config):
super(Qwen2ForCausalLM, self).__init__(config)
self.model = VITAFOQwen2Model(config)
self.vocab_size = config.vocab_size
self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)
self.predict_usr_state = 0#2
if self.predict_usr_state:
self.predictor_head = torch.nn.Linear(config.hidden_size, self.predict_usr_state + 1) # +1 for the dummy class
else:
self.predictor_head = None
# Initialize weights and apply final processing
self.post_init()
def get_model(self):
return self.model
def forward(
self,
input_ids: torch.LongTensor = None,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_values: Optional[List[torch.FloatTensor]] = None,
inputs_embeds: Optional[torch.FloatTensor] = None,
labels: Optional[torch.LongTensor] = None,
use_cache: Optional[bool] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
images: Optional[torch.FloatTensor] = None,
audios: Optional[dict] = None,
sf_masks: Optional[torch.Tensor] = None,
return_dict: Optional[bool] = None,
cache_position: Optional[torch.LongTensor] = None,
) -> Union[Tuple, CausalLMOutputWithPast]:
if inputs_embeds is None:
(
input_ids,
position_ids,
attention_mask,
past_key_values,
inputs_embeds,
labels,
) = self.prepare_inputs_labels_for_multimodal(
input_ids, position_ids, attention_mask, past_key_values, labels, images, audios, sf_masks
)
if labels is not None:
state_labels = labels
labels = torch.where(labels>=0, labels, IGNORE_INDEX)
output_hidden_states = True
outputs = super().forward(
input_ids=input_ids,
attention_mask=attention_mask,
position_ids=position_ids,
past_key_values=past_key_values,
inputs_embeds=inputs_embeds,
labels=labels,
use_cache=use_cache,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict,
cache_position=cache_position,
)
# state loss
if self.predictor_head is not None:
state_logits = self.predictor_head(outputs[2][-1]).view(-1, self.predict_usr_state+1) # +1 for the dummy class
if labels is not None:
loss = outputs[0]
weight = torch.Tensor([1, 5, 1]).to(torch.bfloat16).to(inputs_embeds.device)
loss_fct = torch.nn.CrossEntropyLoss(weight=weight)
s_labels= torch.where(
state_labels < IGNORE_INDEX,
IGNORE_INDEX-state_labels-1,
IGNORE_INDEX).view(-1)
#assert all(label in [0, 1, IGNORE_INDEX] for label in s_labels), "s_labels must contain only 0, 1, or -100"
state_loss = loss_fct(state_logits, s_labels)
loss = loss + state_loss
outputs['loss'] = loss
return outputs
@torch.no_grad()
def generate(
self,
inputs: Optional[torch.Tensor] = None,
images: Optional[torch.Tensor] = None,
audios: Optional[torch.Tensor] = None,
sf_masks: Optional[torch.Tensor] = None,
**kwargs,
) -> Union[GenerateOutput, torch.LongTensor]:
position_ids = kwargs.pop("position_ids", None)
attention_mask = kwargs.pop("attention_mask", None)
if "inputs_embeds" in kwargs:
raise NotImplementedError("`inputs_embeds` is not supported")
if images is not None or audios is not None:
(
inputs,
position_ids,
attention_mask,
_,
inputs_embeds,
_
) = self.prepare_inputs_labels_for_multimodal(
inputs,
position_ids,
attention_mask,
None,
None,
images,
audios,
sf_masks,
)
else:
inputs_embeds = self.get_model().embed_tokens(inputs)
return super().generate(
position_ids=position_ids,
attention_mask=attention_mask,
inputs_embeds=inputs_embeds,
**kwargs
)
def prepare_inputs_for_generation(
self,
input_ids,
past_key_values=None,
inputs_embeds=None,
attention_mask=None,
**kwargs,
):
images = kwargs.pop("images", None)
audios = kwargs.pop("audios", None)
sf_masks = kwargs.pop("sf_masks", None)
_inputs = super().prepare_inputs_for_generation(
input_ids,
past_key_values=past_key_values,
inputs_embeds=inputs_embeds,
attention_mask=attention_mask,
**kwargs,
)
if images is not None:
_inputs["images"] = images
if audios is not None:
_inputs["audios"] = audios
if sf_masks is not None:
_inputs["sf_masks"] = sf_masks
return _inputs
def expand2square(self, pil_img, background_color):
width, height = pil_img.size
if width == height:
return pil_img
elif width > height:
result = Image.new(pil_img.mode, (width, width), background_color)
result.paste(pil_img, (0, (width - height) // 2))
return result
else:
result = Image.new(pil_img.mode, (height, height), background_color)
result.paste(pil_img, ((height - width) // 2, 0))
return result
def process_images(self, images, model_cfg):
vision_tower = self.get_vision_tower()
if not vision_tower.is_loaded:
vision_tower.load_model()
image_processor = vision_tower.image_processor
image_aspect_ratio = getattr(model_cfg, "image_aspect_ratio", None)
new_images = []
if image_aspect_ratio == "pad":
for image in images:
image = self.expand2square(
image, tuple(int(x * 255) for x in image_processor.image_mean)
)
image = image_processor.preprocess(image, return_tensors="pt")["pixel_values"][0]
new_images.append(image)
else:
return image_processor(images, return_tensors="pt")["pixel_values"]
if all(x.shape == new_images[0].shape for x in new_images):
new_images = torch.stack(new_images, dim=0)
return new_images
AutoConfig.register("vita-fo-Qwen2", VITAFOQwen2Config)
AutoModelForCausalLM.register(VITAFOQwen2Config, VITAFOQwen2ForCausalLM)