import torch from .vision_encoder import VisionEncoder from .configuration_moondream import MoondreamConfig from transformers import PreTrainedModel, TextIteratorStreamer from .modeling_phi import PhiForCausalLM from .configuration_moondream import PhiConfig from threading import Thread class Moondream(PreTrainedModel): config_class = MoondreamConfig _supports_flash_attn_2 = True def __init__(self, config): super().__init__(config) self.vision_encoder = VisionEncoder( use_flash_attn=config._attn_implementation == "flash_attention_2" ) if type(config.text_config) == dict: phi_config = PhiConfig( **config.text_config, attn_implementation=config._attn_implementation ) else: phi_config = config.text_config self.text_model = PhiForCausalLM(phi_config) @property def device(self): return self.text_model.device def encode_image(self, image): with torch.no_grad(): return self.vision_encoder(image) def input_embeds(self, prompt, image_embeds, tokenizer): def _tokenize(txt): return tokenizer( txt, return_tensors="pt", add_special_tokens=False ).input_ids.to(self.device) text_emb = self.text_model.get_input_embeddings() # Add BOS token embeds = [] embeds.append( text_emb((torch.tensor([[tokenizer.bos_token_id]], device=self.device))) ) if "" not in prompt: embeds.append(text_emb(_tokenize(prompt))) else: assert prompt.count("") == 1 before, after = prompt.split("") if len(before) > 0: embeds.append(text_emb(_tokenize(before))) embeds.append(image_embeds.to(self.device)) if len(after) > 0: embeds.append(text_emb(_tokenize(after))) return torch.cat(embeds, dim=1) def get_input_embeddings(self): return self.text_model.get_input_embeddings() async def generate( self, image_embeds, prompt, tokenizer, max_new_tokens=128, **kwargs, ): generate_config = { "eos_token_id": tokenizer.eos_token_id, "bos_token_id": tokenizer.bos_token_id, "pad_token_id": tokenizer.bos_token_id, "max_new_tokens": max_new_tokens, **kwargs, } with torch.no_grad(): inputs_embeds = self.input_embeds(prompt, image_embeds, tokenizer) streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True) # Start generation in a separate thread thread = Thread(target=self.text_model.generate, kwargs={ "inputs_embeds": inputs_embeds, "streamer": streamer, **generate_config }) thread.start() # Yield generated text as it becomes available for new_text in streamer: print("NEW TEXT" + new_text) yield new_text thread.join() print("FINISHED") #return tokenizer.batch_decode(output_ids, skip_special_tokens=True) """ def answer_question( self, image_embeds, question, tokenizer, chat_history="", result_queue=None, **kwargs, ): prompt = f"\n\n{chat_history}Question: {question}\n\nAnswer:" answer = self.generate( image_embeds, prompt, tokenizer=tokenizer, max_new_tokens=512, **kwargs, )[0] cleaned_answer = answer.strip() # Use the result_queue to pass the result if it is provided if result_queue: result_queue.put(cleaned_answer) else: return cleaned_answer """ async def answer_question( self, image_embeds, question, tokenizer, chat_history="", **kwargs, ): prompt = f"\n\n{chat_history}Question: {question}\n\nAnswer:" generator = self.generate( image_embeds, prompt, tokenizer=tokenizer, max_new_tokens=512, **kwargs, ) async for token in generator: print("ONE") yield token def batch_answer( self, images, prompts, tokenizer, **kwargs, ): image_embeds = self.encode_image(images) templated_prompts = [ f"\n\nQuestion: {prompt}\n\nAnswer:" for prompt in prompts ] prompt_embs = [ self.input_embeds(prompt, image_embed.unsqueeze(0), tokenizer)[0] for prompt, image_embed in zip(templated_prompts, image_embeds) ] bos_emb = prompt_embs[0][0] max_len = max([p.shape[0] for p in prompt_embs]) inputs_embeds = torch.cat( [ torch.cat([bos_emb.repeat(max_len - p.shape[0], 1), p]).unsqueeze(0) for p in prompt_embs ], dim=0, ) attention_mask = torch.cat( [ torch.cat( [ torch.zeros( 1, max_len - p.shape[0], device=self.device, dtype=torch.long, ), torch.ones(1, p.shape[0], device=self.device, dtype=torch.long), ], dim=1, ) for p in prompt_embs ], dim=0, ) generate_config = { "eos_token_id": tokenizer.eos_token_id, "bos_token_id": tokenizer.bos_token_id, "pad_token_id": tokenizer.bos_token_id, "max_new_tokens": 512, **kwargs, } with torch.no_grad(): output_ids = self.text_model.generate( inputs_embeds=inputs_embeds, attention_mask=attention_mask, **generate_config, ) return [ x.strip() for x in tokenizer.batch_decode(output_ids, skip_special_tokens=True) ]