|
import json |
|
import os |
|
import time |
|
|
|
import gradio as gr |
|
import torch |
|
from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer |
|
|
|
# Export runtime settings as environment variables. The names indicate they
# target the tokenizers library and PyTorch's CUDA allocator respectively.
# NOTE(review): these are set after `import torch`; confirm both libraries
# read them lazily in the versions you run.
os.environ.update(
    {
        "TOKENIZERS_PARALLELISM": "0",
        "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True",
    }
)
|
|
|
|
|
def get_gpu_memory():
    """Return the CUDA memory currently allocated through torch, in MiB.

    Calls ``torch.cuda.memory_allocated()`` without a device argument and
    scales the byte count down by 1024 twice.
    """
    allocated_bytes = torch.cuda.memory_allocated()
    allocated_kib = allocated_bytes / 1024
    return allocated_kib / 1024
|
|
|
|
|
class TorchTracemalloc:
    """Context manager that records peak CUDA memory used inside a ``with`` block.

    On entry the CUDA cache is emptied, the peak-memory counter is reset and
    the currently allocated memory is stored in ``begin``. On exit the peak
    allocated memory is stored in ``peak``. Both values are in MiB.

    Args:
        device: Optional CUDA device (anything the ``torch.cuda`` memory
            functions accept). It is forwarded to every per-device call so
            the measurement can target a GPU other than torch's current one.
            ``None`` (the default) keeps the previous behaviour of using the
            current device.
    """

    def __init__(self, device=None):
        self.device = device
        self.begin = 0  # MiB allocated when the block was entered
        self.peak = 0  # peak MiB allocated, filled in on exit

    def __enter__(self):
        torch.cuda.empty_cache()
        torch.cuda.reset_peak_memory_stats(self.device)
        # Let pending kernels finish so the baseline reflects settled state.
        torch.cuda.synchronize(self.device)
        self.begin = torch.cuda.memory_allocated(self.device) / 1024 / 1024
        return self

    def __exit__(self, *exc):
        # Wait for outstanding work before reading the peak counter.
        # Returns None, so exceptions raised inside the block propagate.
        torch.cuda.synchronize(self.device)
        self.peak = torch.cuda.max_memory_allocated(self.device) / 1024 / 1024

    def consumed(self):
        """Return ``peak - begin``: MiB allocated above the entry baseline."""
        return self.peak - self.begin
|
|
|
|
|
def load_model_and_tokenizer(
    model_name="NousResearch/Hermes-2-Theta-Llama-3-8B",
    device="cuda:2",
    quantizer_path="codebooks/Hermes-2-Theta-Llama-3-8B_1bit.xmad",
    window_length=32,
):
    """Load the causal LM and its tokenizer, adding a dedicated pad token.

    All parameters default to the values that used to be hard-coded, so a
    bare ``load_model_and_tokenizer()`` call behaves as before.

    Args:
        model_name: Checkpoint name or path given to every ``from_pretrained``
            call.
        device: Value passed as ``device_map`` when loading the model.
        quantizer_path: Stored on the config as ``quantizer_path``.
        window_length: Stored on the config as ``window_length``.
            NOTE(review): both config attributes are presumably read by a
            custom/patched model implementation - confirm against the
            model code in use.

    Returns:
        A ``(model, tokenizer)`` tuple.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    tokenizer.add_special_tokens({"pad_token": "<PAD>"})

    config = AutoConfig.from_pretrained(model_name)
    config.quantizer_path = quantizer_path
    config.window_length = window_length

    model = AutoModelForCausalLM.from_pretrained(
        model_name, config=config, torch_dtype=torch.float16, device_map=device
    )

    # Adding "<PAD>" may grow the tokenizer past the embedding table; extend
    # the table so the new token id has a row.
    embedding_rows = model.get_input_embeddings().weight.shape[0]
    if len(tokenizer) > embedding_rows:
        print(
            "WARNING: Resizing the embedding matrix to match the tokenizer vocab size."
        )
        model.resize_token_embeddings(len(tokenizer))

    model.config.pad_token_id = tokenizer.pad_token_id
    return model, tokenizer
|
|
|
|
|
def process_dialog(dialog, model, tokenizer):
    """Generate the assistant reply for a chat dialog.

    Args:
        dialog: List of ``{"role": ..., "content": ...}`` messages, rendered
            with the tokenizer's chat template.
        model: Causal LM used for generation. It is assumed to live on a CUDA
            device, because the CUDA memory tracking below is scoped to
            ``model.device``.
        tokenizer: Tokenizer matching ``model``.

    Returns:
        The decoded text of the newly generated tokens only (the prompt is
        stripped, special tokens are skipped).

    Side effects:
        Prints one line with the number of generated tokens, the wall-clock
        generation time and the peak CUDA memory above the pre-generation
        baseline.
    """
    prompt = tokenizer.apply_chat_template(
        dialog, tokenize=False, add_generation_prompt=True
    )
    # The rendered template is expected to contain its own special tokens,
    # so the tokenizer must not add them again (otherwise BOS is duplicated).
    encoded = tokenizer(
        prompt, return_tensors="pt", add_special_tokens=False
    ).to(model.device)
    prompt_length = encoded["input_ids"].shape[-1]

    # Make the model's GPU the current device so the tracker (which samples
    # the current device) and the synchronize call measure the right GPU.
    # The tracker itself empties the cache and resets the peak counter.
    with torch.cuda.device(model.device), TorchTracemalloc() as tt:
        started = time.perf_counter()
        with torch.no_grad():
            # **encoded forwards input_ids together with the attention mask.
            generated = model.generate(
                **encoded,
                max_new_tokens=512,
                temperature=0.7,
                do_sample=True,
                eos_token_id=tokenizer.eos_token_id,
                pad_token_id=tokenizer.pad_token_id,
            )
        # Wait for the GPU so the elapsed time covers the whole generation.
        torch.cuda.synchronize()
        elapsed = time.perf_counter() - started

    # Keep only the tokens produced after the prompt.
    new_token_ids = generated[0][prompt_length:]
    cleaned_response = tokenizer.decode(
        new_token_ids,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=True,
    )

    print(
        f"[generation] {new_token_ids.shape[-1]} new tokens in {elapsed:.2f}s; "
        f"peak CUDA memory above baseline: {tt.consumed():.1f} MiB"
    )
    return cleaned_response
|
|
|
|
|
# Module-level singletons: the model and tokenizer are loaded once, when this
# module is first executed, and shared by every chat request.
model, tokenizer = load_model_and_tokenizer()
|
|
|
|
|
def chatbot_interface(user_input, chat_history):
    """Answer one user message and record the exchange in the chat history.

    Only ``user_input`` is sent to the model; earlier turns stored in
    ``chat_history`` are not included in the prompt.

    Args:
        user_input: Text of the message just submitted.
        chat_history: List of ``(user_message, reply)`` pairs; it is extended
            in place with the new exchange.

    Returns:
        The updated ``chat_history`` twice, one value per output slot wired
        to this callback.
    """
    single_turn = [{"role": "user", "content": user_input}]
    reply = process_dialog(single_turn, model, tokenizer)
    chat_history.append((user_input, reply))
    return chat_history, chat_history
|
|
|
|
|
def main():
    """Build the Gradio chat UI and launch it."""
    with gr.Blocks() as demo:
        history_view = gr.Chatbot()
        message_box = gr.Textbox(placeholder="Type your message here...")
        clear_button = gr.Button("Clear")

        # Submitting the textbox runs the model and refreshes the chat view.
        message_box.submit(
            fn=chatbot_interface,
            inputs=[message_box, history_view],
            outputs=[history_view, history_view],
        )
        # Returning None to the Chatbot component clears the conversation.
        clear_button.click(fn=lambda: None, inputs=None, outputs=history_view)

    demo.launch()
|
|
|
|
|
# Start the UI only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()
|
|