# coding=utf-8
# Copyright (c) 2024, MeetKai Inc. All rights reserved.
"""PyTorch LLaMA model."""

import json
from typing import TYPE_CHECKING, Callable, List, Optional, Tuple, Union

import torch
import torch.utils.checkpoint
from transformers.generation.configuration_utils import GenerationConfig
from transformers.generation.logits_process import LogitsProcessorList
from transformers.generation.stopping_criteria import StoppingCriteriaList
from transformers.generation.utils import (
    GenerateBeamDecoderOnlyOutput,
    GenerateBeamEncoderDecoderOutput,
    GenerateDecoderOnlyOutput,
    GenerateEncoderDecoderOutput,
)
from transformers.models.llama.modeling_llama import LlamaForCausalLM
from transformers.utils import logging

if TYPE_CHECKING:
    from transformers.modeling_utils import PreTrainedModel
    from transformers.generation.streamers import BaseStreamer

logger = logging.get_logger(__name__)

GenerateNonBeamOutput = Union[GenerateDecoderOnlyOutput, GenerateEncoderDecoderOutput]
GenerateBeamOutput = Union[GenerateBeamDecoderOnlyOutput, GenerateBeamEncoderDecoderOutput]
GenerateOutput = Union[GenerateNonBeamOutput, GenerateBeamOutput]


class FunctionaryForCausalLM(LlamaForCausalLM):

    def generate_tool_use(
        self,
        inputs: Optional[torch.Tensor] = None,
        generation_config: Optional[GenerationConfig] = None,
        logits_processor: Optional[LogitsProcessorList] = None,
        stopping_criteria: Optional[StoppingCriteriaList] = None,
        prefix_allowed_tokens_fn: Optional[Callable[[int, torch.Tensor], List[int]]] = None,
        synced_gpus: Optional[bool] = None,
        assistant_model: Optional["PreTrainedModel"] = None,
        streamer: Optional["BaseStreamer"] = None,
        negative_prompt_ids: Optional[torch.Tensor] = None,
        negative_prompt_attention_mask: Optional[torch.Tensor] = None,
        **kwargs,
    ) -> Union[GenerateOutput, torch.LongTensor]:
        tokenizer = kwargs.pop("tokenizer", None)  # Pull this out first; we use it to parse the raw output

        results = self.generate(
            inputs=inputs,
            generation_config=generation_config,
            logits_processor=logits_processor,
            stopping_criteria=stopping_criteria,
            prefix_allowed_tokens_fn=prefix_allowed_tokens_fn,
            synced_gpus=synced_gpus,
            assistant_model=assistant_model,
            streamer=streamer,
            negative_prompt_ids=negative_prompt_ids,
            negative_prompt_attention_mask=negative_prompt_attention_mask,
            **kwargs,
        )

        input_ids = kwargs.pop("input_ids")
        function_call_token = ">>>"

        correct_results = []
        for input_id, result in zip(input_ids, results):
            final_output_json = {"role": "assistant", "content": None, "tool_calls": None}
            tool_calls = []
            # Decode only the newly generated tokens (everything after the prompt).
            raw_output_str = tokenizer.decode(result[len(input_id):].cpu())
            chunks = raw_output_str.split(function_call_token)
            for i, chunk in enumerate(chunks):
                if len(chunk) == 0:
                    continue

                chunk = chunk.replace(tokenizer.pad_token, "")
                # A chunk starting with "all" carries free-form text; other chunks are tool calls.
                has_text = chunk.startswith("all")
                if i == 0 and has_text:
                    # Strip the trailing end-of-turn token, then the leading "all\n" marker.
                    final_output_json["content"] = (
                        chunk[: -len("<|eot_id|>")] if chunk.endswith("<|eot_id|>") else chunk
                    )
                    final_output_json["content"] = final_output_json["content"][len("all\n"):]
                else:
                    # Tool-call chunks look like "<function name>\n{<json arguments>}".
                    tool_calls.append(
                        {
                            "name": chunk[: chunk.index("\n{")],
                            "arguments": (
                                chunk[chunk.index("\n{") + 1 : -len("<|eot_id|>")]
                                if chunk.endswith("<|eot_id|>")
                                else chunk[chunk.index("\n{") + 1 :]
                            ),
                        }
                    )
            if len(tool_calls) > 0:
                final_output_json["tool_calls"] = tool_calls

            # Re-encode the parsed assistant message as JSON and append it to the original prompt tokens.
            final_output_str = json.dumps(final_output_json, indent=4)
            final_output_ids = tokenizer(final_output_str, add_special_tokens=False)["input_ids"]
            correct_results.append(
                torch.cat((result[: len(input_id)].cpu(), torch.tensor(final_output_ids)))
            )

        # Right-pad every sequence to the longest one so they can be stacked into a batch.
        max_len = max([tensor.shape[0] for tensor in correct_results])
        correct_results = [
            torch.nn.functional.pad(
                correct_result, (0, max_len - correct_result.shape[0]), value=tokenizer.eos_token_id
            )
            for correct_result in correct_results
        ]
        correct_results = torch.stack(correct_results)

        return correct_results
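

# A minimal, hypothetical usage sketch: it assumes a Functionary checkpoint name such as
# "meetkai/functionary-small-v2.5" and that the prompt string has already been rendered with
# the tokenizer's tool-use chat template. The tokens after the prompt then decode to the
# JSON-serialized assistant message built by generate_tool_use above.
if __name__ == "__main__":
    from transformers import AutoTokenizer

    model_name = "meetkai/functionary-small-v2.5"  # assumed checkpoint name
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = FunctionaryForCausalLM.from_pretrained(model_name)

    prompt = "..."  # a chat-template-rendered prompt containing the tool definitions
    input_ids = tokenizer(prompt, return_tensors="pt")["input_ids"]

    # input_ids and tokenizer must be passed as keyword arguments: the method pops both from kwargs.
    outputs = model.generate_tool_use(
        input_ids=input_ids,
        tokenizer=tokenizer,
        max_new_tokens=256,
    )
    # Everything after the prompt is the parsed assistant message (plus any EOS padding).
    print(tokenizer.decode(outputs[0][input_ids.shape[1]:]))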