functionary-small-v2.5 / modeling_functionary.py
jeffreymeetkai's picture
add response parsing remove code, update README
2e54059
raw
history blame
5.57 kB
# coding=utf-8
# Copyright 2022 EleutherAI and the HuggingFace Inc. team. All rights reserved.
#
# This code is based on EleutherAI's GPT-NeoX library and the GPT-NeoX
# and OPT implementations in this library. It has been modified from its
# original forms to accommodate minor architectural differences compared
# to GPT-NeoX and OPT used by the Meta AI team that trained the model.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""PyTorch LLaMA model."""
import json
import re
from typing import TYPE_CHECKING, Callable, List, Optional, Tuple, Union
import torch
import torch.utils.checkpoint
from transformers.generation.configuration_utils import GenerationConfig
from transformers.generation.logits_process import LogitsProcessorList
from transformers.generation.stopping_criteria import StoppingCriteriaList
from transformers.generation.utils import (
GenerateBeamDecoderOnlyOutput,
GenerateBeamEncoderDecoderOutput,
GenerateDecoderOnlyOutput,
GenerateEncoderDecoderOutput
)
from transformers.models.llama.modeling_llama import LlamaForCausalLM
from transformers.utils import logging
if TYPE_CHECKING:
from transformers.modeling_utils import PreTrainedModel
from transformers.generation.streamers import BaseStreamer
# Module-level logger, named after this module.
logger = logging.get_logger(__name__)
# Union aliases over the generation output classes imported from
# transformers.generation.utils: the two non-beam classes, the two beam
# classes, and finally the union of both groups.
GenerateNonBeamOutput = Union[GenerateDecoderOnlyOutput, GenerateEncoderDecoderOutput]
GenerateBeamOutput = Union[GenerateBeamDecoderOnlyOutput, GenerateBeamEncoderDecoderOutput]
GenerateOutput = Union[GenerateNonBeamOutput, GenerateBeamOutput]
class FunctionaryForCausalLM(LlamaForCausalLM):
    """LLaMA causal LM with a helper that rewrites generations as JSON messages.

    ``generate_tool_use`` runs the regular ``generate`` call, decodes the newly
    generated tokens, splits them into plain text and tool calls, and returns
    token ids in which each completion is replaced by a JSON-serialized
    assistant message.
    """

    # Special token that the raw output places immediately before a tool call.
    _FUNCTION_CALL_TOKEN = "<|reserved_special_token_249|>"
    # Marker stripped from the end of the text content / last tool call.
    _EOT_TOKEN = "<|eot_id|>"
    # Boundary between a tool name and the JSON object holding its arguments.
    _ARGUMENTS_SEPARATOR = "\n{"

    @classmethod
    def _parse_raw_output(cls, raw_output: str, pad_token: Optional[str]) -> dict:
        """Split decoded model output into an assistant message dict.

        Args:
            raw_output: Decoded completion, special tokens included.
            pad_token: The tokenizer's pad token string, removed from every
                chunk before parsing. ``None``/empty means nothing is removed.

        Returns:
            ``{"role": "assistant", "content": ..., "tool_calls": ...}`` where
            ``content`` is the leading text (or ``None``) and ``tool_calls`` is
            a list of ``{"name", "arguments"}`` dicts (or ``None`` if empty).
        """
        eot = cls._EOT_TOKEN
        message = {"role": "assistant", "content": None, "tool_calls": None}
        tool_calls = []
        # Output that does not open with the function-call token starts with text.
        starts_with_text = not raw_output.startswith(cls._FUNCTION_CALL_TOKEN)

        for index, chunk in enumerate(raw_output.split(cls._FUNCTION_CALL_TOKEN)):
            if not chunk:
                continue
            if pad_token:
                # Guarded: str.replace(None, "") would raise TypeError.
                chunk = chunk.replace(pad_token, "")
            ends_turn = chunk.endswith(eot)

            if index == 0 and starts_with_text:
                # `strip` must be *called* before slicing off the marker.
                message["content"] = chunk.strip()[: -len(eot)] if ends_turn else chunk
                continue

            if ends_turn:
                chunk = chunk[: -len(eot)]
            if not chunk:
                continue
            name, separator, remainder = chunk.partition(cls._ARGUMENTS_SEPARATOR)
            if not separator:
                # A truncated or malformed call must not abort the whole batch.
                logger.warning(
                    "Tool call %r has no JSON arguments; storing empty arguments.", name
                )
            tool_calls.append(
                {
                    "name": name,
                    # Re-attach the "{" consumed as part of the separator.
                    "arguments": "{" + remainder if separator else "",
                }
            )

        if tool_calls:
            message["tool_calls"] = tool_calls
        return message

    def generate_tool_use(
        self,
        inputs: Optional[torch.Tensor] = None,
        generation_config: Optional[GenerationConfig] = None,
        logits_processor: Optional[LogitsProcessorList] = None,
        stopping_criteria: Optional[StoppingCriteriaList] = None,
        prefix_allowed_tokens_fn: Optional[Callable[[int, torch.Tensor], List[int]]] = None,
        synced_gpus: Optional[bool] = None,
        assistant_model: Optional["PreTrainedModel"] = None,
        streamer: Optional["BaseStreamer"] = None,
        negative_prompt_ids: Optional[torch.Tensor] = None,
        negative_prompt_attention_mask: Optional[torch.Tensor] = None,
        **kwargs,
    ) -> Union[GenerateOutput, torch.LongTensor]:
        """Generate completions and re-encode each one as a JSON assistant message.

        All named parameters are forwarded unchanged to ``self.generate``.

        Keyword Args:
            tokenizer: Required. Used to decode the raw completion and to encode
                the JSON message; it is not forwarded to ``generate``.
            input_ids: Prompt token ids, one row per sample. Forwarded to
                ``generate``. When absent, ``inputs`` is used as the prompt.

        Returns:
            A stacked tensor with one row per prompt: the prompt ids followed by
            the token ids of the JSON message, right-padded with
            ``tokenizer.eos_token_id`` to a common length.

        Raises:
            ValueError: If no tokenizer or no prompt ids are supplied.
        """
        # Pull this out first: it is only for parsing, not a generate() argument.
        tokenizer = kwargs.pop("tokenizer", None)
        if tokenizer is None:
            raise ValueError(
                "`generate_tool_use` requires a `tokenizer` keyword argument to parse the output."
            )

        # The prompt rows are needed to locate where each completion starts.
        prompt_ids = kwargs.get("input_ids")
        if prompt_ids is None:
            prompt_ids = inputs
        if prompt_ids is None:
            raise ValueError(
                "`generate_tool_use` requires the prompt via `input_ids` or `inputs`."
            )

        results = self.generate(
            inputs=inputs,
            generation_config=generation_config,
            logits_processor=logits_processor,
            stopping_criteria=stopping_criteria,
            prefix_allowed_tokens_fn=prefix_allowed_tokens_fn,
            synced_gpus=synced_gpus,
            assistant_model=assistant_model,
            streamer=streamer,
            negative_prompt_ids=negative_prompt_ids,
            negative_prompt_attention_mask=negative_prompt_attention_mask,
            **kwargs,
        )
        # With return_dict_in_generate=True the ids live under `.sequences`.
        sequences = getattr(results, "sequences", results)

        rebuilt = []
        for prompt, sequence in zip(prompt_ids, sequences):
            prompt_len = len(prompt)
            raw_output = tokenizer.decode(sequence[prompt_len:].cpu())
            message = self._parse_raw_output(raw_output, tokenizer.pad_token)
            message_ids = tokenizer(
                json.dumps(message, indent=4), add_special_tokens=False
            )["input_ids"]
            rebuilt.append(
                torch.cat(
                    (
                        sequence[:prompt_len].cpu(),
                        torch.tensor(message_ids, dtype=sequence.dtype),
                    )
                )
            )

        # Right-pad every row to the longest one so the rows can be stacked.
        max_len = max(row.shape[0] for row in rebuilt)
        padded = [
            torch.nn.functional.pad(
                row, (0, max_len - row.shape[0]), value=tokenizer.eos_token_id
            )
            for row in rebuilt
        ]
        return torch.stack(padded)