import gradio as gr
import numpy as np
from threading import Thread
from time import perf_counter
from typing import List

from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer

# Model configuration and loading
model_name = "susnato/phi-2"  # Replace this with your Hugging Face model ID if necessary
model_configuration = {
    "prompt_template": "{instruction}",
    "tokenizer_kwargs": {},
    "response_key": "### Response",
    "end_key": "### End",
}

tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)

tokenizer_kwargs = model_configuration.get("tokenizer_kwargs", {})
response_key = model_configuration.get("response_key")
tokenizer_response_key = None


def get_special_token_id(tokenizer: AutoTokenizer, key: str) -> int:
    """Return the token id for `key`, which must encode to exactly one token."""
    token_ids = tokenizer.encode(key)
    if len(token_ids) > 1:
        raise ValueError(f"Expected only a single token for '{key}' but found {token_ids}")
    return token_ids[0]


# If the model was trained with a response marker (e.g. "### Response"),
# look for it among the tokenizer's additional special tokens.
if response_key is not None:
    tokenizer_response_key = next(
        (token for token in tokenizer.additional_special_tokens if token.startswith(response_key)),
        None,
    )

# Resolve the id of the end-of-response token, if one is defined.
end_key_token_id = None
if tokenizer_response_key:
    try:
        end_key = model_configuration.get("end_key")
        if end_key:
            end_key_token_id = get_special_token_id(tokenizer, end_key)
    except ValueError:
        pass

prompt_template = model_configuration.get("prompt_template", "{instruction}")
# Fall back to the tokenizer's EOS token when no dedicated end token exists,
# and reuse the same id for padding.
end_key_token_id = end_key_token_id or tokenizer.eos_token_id
pad_token_id = end_key_token_id or tokenizer.pad_token_id


def estimate_latency(
    current_time: float,
    current_perf_text: str,
    new_gen_text: str,
    per_token_time: List[float],
    num_tokens: int,
):
    """Update the running tokens/s estimate from the newest generated chunk."""
    num_current_toks = len(tokenizer.encode(new_gen_text))
    num_tokens += num_current_toks
    per_token_time.append(num_current_toks / current_time)
    # Refresh the report every 4 chunks once more than 10 measurements exist,
    # averaging over the 10 most recent ones.
    if len(per_token_time) > 10 and len(per_token_time) % 4 == 0:
        current_bucket = per_token_time[-10:]
        return (
            f"Average generation speed: {np.mean(current_bucket):.2f} tokens/s. Total generated tokens: {num_tokens}",
            num_tokens,
        )
    return current_perf_text, num_tokens


def run_generation(
    user_text: str,
    top_p: float,
    temperature: float,
    top_k: int,
    max_new_tokens: int,
    perf_text: str,
):
    """Stream the model's response for `user_text`, yielding partial output and perf stats."""
    prompt_text = prompt_template.format(instruction=user_text)
    model_inputs = tokenizer(prompt_text, return_tensors="pt", **tokenizer_kwargs)

    # TextIteratorStreamer yields decoded text chunks as they are produced;
    # generation runs on a background thread so this function can stream them.
    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
    generate_kwargs = dict(
        model_inputs,
        streamer=streamer,
        max_new_tokens=max_new_tokens,
        do_sample=True,
        top_p=top_p,
        temperature=float(temperature),
        top_k=top_k,
        eos_token_id=end_key_token_id,
        pad_token_id=pad_token_id,
    )
    t = Thread(target=model.generate, kwargs=generate_kwargs)
    t.start()

    model_output = ""
    per_token_time = []
    num_tokens = 0
    start = perf_counter()
    for new_text in streamer:
        current_time = perf_counter() - start
        model_output += new_text
        perf_text, num_tokens = estimate_latency(current_time, perf_text, new_text, per_token_time, num_tokens)
        yield model_output, perf_text
        start = perf_counter()
    return model_output, perf_text


def reset_textbox(instruction: str, response: str, perf: str):
    """Clear all three textboxes (the input values are ignored)."""
    return "", "", ""


examples = [
    "Give me a recipe for pizza with pineapple",
    "Write me a tweet about the new OpenVINO release",
    "Explain the difference between CPU and GPU",
    "Give five ideas for a great weekend with family",
    "Do Androids dream of Electric sheep?",
    "Who is Dolly?",
    "Please give me advice on how to write a resume",
    "Name 3 advantages to being a cat",
    "Write instructions on how to become a good AI engineer",
    "Write a love letter to my best friend",
]


def main():
    with gr.Blocks() as demo:
        gr.Markdown(
            "# Question Answering with a Language Model\n"
            "Provide an instruction that describes a task below, or select one of the "
            "predefined examples, and the model will write a response that performs "
            "the requested task."
        )

        with gr.Row():
            with gr.Column(scale=4):
                user_text = gr.Textbox(
                    placeholder="Write an email about an alpaca that likes flan",
                    label="User instruction",
                )
                model_output = gr.Textbox(label="Model response", interactive=False)
                performance = gr.Textbox(label="Performance", lines=1, interactive=False)
            with gr.Column(scale=1):
                button_clear = gr.Button(value="Clear")
                button_submit = gr.Button(value="Submit")
                gr.Examples(examples, user_text)
            with gr.Column(scale=1):
                max_new_tokens = gr.Slider(
                    minimum=1,
                    maximum=1000,
                    value=256,
                    step=1,
                    interactive=True,
                    label="Max New Tokens",
                )
                top_p = gr.Slider(
                    minimum=0.05,
                    maximum=1.0,
                    value=0.92,
                    step=0.05,
                    interactive=True,
                    label="Top-p (nucleus sampling)",
                )
                top_k = gr.Slider(
                    minimum=0,
                    maximum=50,
                    value=0,
                    step=1,
                    interactive=True,
                    label="Top-k",
                )
                temperature = gr.Slider(
                    minimum=0.1,
                    maximum=5.0,
                    value=0.8,
                    step=0.1,
                    interactive=True,
                    label="Temperature",
                )

        # Wire both Enter-to-submit and the Submit button to the streaming generator.
        user_text.submit(
            run_generation,
            [user_text, top_p, temperature, top_k, max_new_tokens, performance],
            [model_output, performance],
        )
        button_submit.click(
            run_generation,
            [user_text, top_p, temperature, top_k, max_new_tokens, performance],
            [model_output, performance],
        )
        button_clear.click(
            reset_textbox,
            [user_text, model_output, performance],
            [user_text, model_output, performance],
        )

    # Queueing is required for streaming (generator) outputs.
    demo.queue()
    try:
        demo.launch(height=800)
    except Exception:
        # Fall back to a public share link when a local server cannot be used.
        demo.launch(share=True, height=800)


if __name__ == "__main__":
    main()
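

# What follows is an optional, illustrative sketch, not part of the original demo:
# `run_generation` is a plain Python generator, so the model can also be exercised
# without launching the Gradio UI (e.g. from a Python shell after importing this
# module). The prompt and sampling values below are arbitrary choices for
# demonstration; `smoke_test` is a hypothetical helper name.
def smoke_test(prompt: str = "Explain the difference between CPU and GPU") -> str:
    """Consume the generation stream and return the final decoded text."""
    final_output = ""
    for partial_output, _perf in run_generation(
        prompt,
        top_p=0.92,
        temperature=0.8,
        top_k=0,
        max_new_tokens=32,
        perf_text="",
    ):
        # Each yield carries the full text generated so far; keep the latest.
        final_output = partial_output
    return final_output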