Spaces:
Running
Running
"""
Chatbot Arena (side-by-side) tab.
Users chat with two chosen models.
"""
import json | |
import time | |
import gradio as gr | |
import numpy as np | |
from src.constants import ( | |
MODERATION_MSG, | |
CONVERSATION_LIMIT_MSG, | |
INPUT_CHAR_LEN_LIMIT, | |
CONVERSATION_TURN_LIMIT, | |
) | |
from src.model.model_adapter import get_conversation_template | |
from src.serve.gradio_web_server import ( | |
State, | |
bot_response, | |
get_conv_log_filename, | |
no_change_btn, | |
enable_btn, | |
disable_btn, | |
invisible_btn, | |
acknowledgment_md, | |
get_ip, | |
_prepare_text_with_image, | |
get_model_description_md, | |
) | |
from src.serve.remote_logger import get_remote_logger | |
from src.utils import ( | |
build_logger, | |
moderation_filter, | |
) | |
# Module-wide logger; both the logger name and the log file are shared with
# the other "multi" (side-by-side) web-server tabs.
logger = build_logger("gradio_web_server_multi", "gradio_web_server_multi.log")

# Number of models shown next to each other in this tab (left and right).
num_sides = 2

# Moderation switch; overwritten at startup through set_global_vars_named().
enable_moderation = False
def set_global_vars_named(enable_moderation_):
    """Store the moderation switch in this module's ``enable_moderation`` global.

    Args:
        enable_moderation_: New value for the module-level flag.
    """
    global enable_moderation
    enable_moderation = enable_moderation_
def load_demo_side_by_side_named(models, url_params):
    """Compute the initial states and dropdown selections for the tab.

    The left dropdown is pre-set to the first model. The right dropdown is
    drawn at random from the remaining models, with earlier entries in
    ``models`` weighted more heavily (8 for the first four candidates, 4 for
    the next eight, 1 for everything after that).

    Args:
        models: Sequence of model names offered in both dropdowns.
        url_params: Unused here; kept for signature compatibility.

    Returns:
        A tuple of ``num_sides`` ``None`` states followed by the two
        ``gr.Dropdown`` updates (left, right).
    """
    states = (None,) * num_sides
    model_count = len(models)
    model_left = models[0] if model_count > 0 else ""

    if model_count > 1:
        candidates = models[1:]
        # The fixed table only covers 44 candidates. Pad the tail with weight
        # 1 so that `p` always has the same length as `candidates`; a shorter
        # `p` makes np.random.choice raise ValueError.
        tiered = [8] * 4 + [4] * 8 + [1] * 32
        padding = [1] * max(0, len(candidates) - len(tiered))
        weights = np.asarray((tiered + padding)[: len(candidates)], dtype=float)
        weights = weights / weights.sum()
        # Draw an index rather than an element so the value handed to the
        # dropdown is the original list entry, not a NumPy scalar.
        picked = int(np.random.choice(len(candidates), p=weights))
        model_right = candidates[picked]
    else:
        model_right = model_left

    selector_updates = (
        gr.Dropdown(choices=models, value=model_left, visible=True),
        gr.Dropdown(choices=models, value=model_right, visible=True),
    )
    return states + selector_updates
def vote_last_response(states, vote_type, model_selectors, request: gr.Request):
    """Persist one vote (or share) record locally and to the remote logger.

    The record is appended as a single JSON line to the file named by
    ``get_conv_log_filename()`` and then passed to ``get_remote_logger().log``.

    Args:
        states: Conversation states, one per side; each must provide ``dict()``.
        vote_type: Label stored under ``"type"`` (e.g. ``"leftvote"``).
        model_selectors: Selected model names, one per side.
        request: Gradio request, used only to resolve the client IP.
    """
    with open(get_conv_log_filename(), "a") as log_file:
        record = {
            "tstamp": round(time.time(), 4),
            "type": vote_type,
            "models": list(model_selectors),
            "states": [state.dict() for state in states],
            "ip": get_ip(request),
        }
        log_file.write(json.dumps(record) + "\n")
    get_remote_logger().log(record)
def leftvote_last_response(
    state0, state1, model_selector0, model_selector1, request: gr.Request
):
    """Handle the "A is better" button: log a ``leftvote`` for both sides.

    Returns:
        A 5-tuple: an empty string (clears the textbox) followed by four
        ``disable_btn`` updates, one per vote button.
    """
    client_ip = get_ip(request)
    logger.info(f"leftvote (named). ip: {client_ip}")
    vote_last_response(
        states=[state0, state1],
        vote_type="leftvote",
        model_selectors=[model_selector0, model_selector1],
        request=request,
    )
    return ("", disable_btn, disable_btn, disable_btn, disable_btn)
def rightvote_last_response(
    state0, state1, model_selector0, model_selector1, request: gr.Request
):
    """Handle the "B is better" button: log a ``rightvote`` for both sides.

    Returns:
        A 5-tuple: an empty string (clears the textbox) followed by four
        ``disable_btn`` updates, one per vote button.
    """
    client_ip = get_ip(request)
    logger.info(f"rightvote (named). ip: {client_ip}")
    vote_last_response(
        states=[state0, state1],
        vote_type="rightvote",
        model_selectors=[model_selector0, model_selector1],
        request=request,
    )
    return ("", disable_btn, disable_btn, disable_btn, disable_btn)
def tievote_last_response(
    state0, state1, model_selector0, model_selector1, request: gr.Request
):
    """Handle the "Tie" button: log a ``tievote`` for both sides.

    Returns:
        A 5-tuple: an empty string (clears the textbox) followed by four
        ``disable_btn`` updates, one per vote button.
    """
    client_ip = get_ip(request)
    logger.info(f"tievote (named). ip: {client_ip}")
    vote_last_response(
        states=[state0, state1],
        vote_type="tievote",
        model_selectors=[model_selector0, model_selector1],
        request=request,
    )
    return ("", disable_btn, disable_btn, disable_btn, disable_btn)
def bothbad_vote_last_response(
    state0, state1, model_selector0, model_selector1, request: gr.Request
):
    """Handle the "Both are bad" button: log a ``bothbad_vote`` for both sides.

    Returns:
        A 5-tuple: an empty string (clears the textbox) followed by four
        ``disable_btn`` updates, one per vote button.
    """
    client_ip = get_ip(request)
    logger.info(f"bothbad_vote (named). ip: {client_ip}")
    vote_last_response(
        states=[state0, state1],
        vote_type="bothbad_vote",
        model_selectors=[model_selector0, model_selector1],
        request=request,
    )
    return ("", disable_btn, disable_btn, disable_btn, disable_btn)
def regenerate(state0, state1, request: gr.Request):
    """Prepare both conversations for re-generating the last reply.

    When both states report ``regen_support``, the last message of each
    conversation is reset to ``None`` so the follow-up ``bot_response_multi``
    step can fill it again. Otherwise both states are flagged ``skip_next``
    and the UI is left unchanged.

    Args:
        state0: Left conversation state.
        state1: Right conversation state.
        request: Gradio request, used only for logging the client IP.

    Returns:
        A list of the two states, their two chatbot histories, an empty
        string for the textbox, and six button updates.
    """
    logger.info(f"regenerate (named). ip: {get_ip(request)}")
    states = [state0, state1]

    if state0.regen_support and state1.regen_support:
        for state in states:
            state.conv.update_last_message(None)
            # An earlier empty submit or turn-limit hit may have left
            # skip_next set. Without clearing it, the generation step would
            # be skipped and the reply blanked above would never be refilled.
            state.skip_next = False
        return (
            states
            + [state.to_gradio_chatbot() for state in states]
            + [""]
            + [disable_btn] * 6
        )

    for state in states:
        state.skip_next = True
    return (
        states
        + [state.to_gradio_chatbot() for state in states]
        + [""]
        + [no_change_btn] * 6
    )
def clear_history(request: gr.Request):
    """Reset the tab to its empty state.

    Returns:
        A list with ``None`` for every state and every chatbot, an empty
        string for the textbox, four ``invisible_btn`` updates and two
        ``disable_btn`` updates.
    """
    logger.info(f"clear_history (named). ip: {get_ip(request)}")
    empty_states = [None] * num_sides
    empty_chatbots = [None] * num_sides
    button_updates = [invisible_btn] * 4 + [disable_btn] * 2
    return empty_states + empty_chatbots + [""] + button_updates
def share_click(state0, state1, model_selector0, model_selector1, request: gr.Request):
    """Log a ``share`` event, provided both conversations already exist."""
    logger.info(f"share (named). ip: {get_ip(request)}")
    # Nothing to record until both sides have a state.
    if state0 is None or state1 is None:
        return
    vote_last_response(
        [state0, state1], "share", [model_selector0, model_selector1], request
    )
def add_text(
    state0, state1, model_selector0, model_selector1, text, image, request: gr.Request
):
    """Append the user's message to both conversations.

    Steps, in order: create missing states, ignore empty input, run the
    moderation filter (a flagged message is replaced by ``MODERATION_MSG``),
    enforce ``CONVERSATION_TURN_LIMIT``, truncate to ``INPUT_CHAR_LEN_LIMIT``,
    then append the user turn plus an empty assistant turn to each side.

    Args:
        state0: Left conversation state, or ``None`` if not created yet.
        state1: Right conversation state, or ``None`` if not created yet.
        model_selector0: Model name chosen on the left.
        model_selector1: Model name chosen on the right.
        text: Raw textbox content.
        image: Forwarded unchanged to ``_prepare_text_with_image``.
        request: Gradio request, used to resolve the client IP for logging.

    Returns:
        A list of the two states, their two chatbot histories, the new
        textbox value, ``None``, and six button updates.
    """
    ip = get_ip(request)
    logger.info(f"add_text (named). ip: {ip}. len: {len(text)}")

    model_selectors = [model_selector0, model_selector1]
    # Lazily create a state for any side that does not have one yet.
    states = [
        State(selector) if state is None else state
        for state, selector in zip([state0, state1], model_selectors)
    ]

    def _outputs(textbox_value, btn_update):
        # Every exit path returns the same shape; only two slots vary.
        return (
            states
            + [state.to_gradio_chatbot() for state in states]
            + [textbox_value, None]
            + [btn_update] * 6
        )

    def _skip_generation():
        for state in states:
            state.skip_next = True

    if not text:
        _skip_generation()
        return _outputs("", no_change_btn)

    model_list = [state.model_name for state in states]
    left_prompt = states[0].conv.get_prompt()
    right_prompt = states[1].conv.get_prompt()
    # Only the tail of each prompt plus the new message is moderated.
    moderated_text = left_prompt[-1000:] + right_prompt[-1000:] + "\nuser: " + text
    if moderation_filter(moderated_text, model_list):
        logger.info(f"violate moderation (named). ip: {ip}. text: {text}")
        # overwrite the original text
        text = MODERATION_MSG

    # The left conversation is used as the reference for the turn count.
    conv = states[0].conv
    turns_so_far = (len(conv.messages) - conv.offset) // 2
    if turns_so_far >= CONVERSATION_TURN_LIMIT:
        logger.info(f"conversation turn limit. ip: {ip}. text: {text}")
        _skip_generation()
        return _outputs(CONVERSATION_LIMIT_MSG, no_change_btn)

    text = text[:INPUT_CHAR_LEN_LIMIT]  # Hard cut-off
    for state in states:
        user_message = _prepare_text_with_image(state, text, image, csam_flag=False)
        state.conv.append_message(state.conv.roles[0], user_message)
        # Placeholder assistant turn, set to None until a reply is generated.
        state.conv.append_message(state.conv.roles[1], None)
        state.skip_next = False

    return _outputs("", disable_btn)
def bot_response_multi(
    state0,
    state1,
    temperature,
    top_p,
    max_new_tokens,
    request: gr.Request,
):
    """Stream replies from both models in lock-step.

    One ``bot_response`` generator is created per side and the two are
    polled round-robin. After every round the current states and chatbot
    histories are yielded together with six ``disable_btn`` updates. If
    ``state0.skip_next`` is set, a single unchanged snapshot is yielded
    instead.

    Args:
        state0: Left conversation state.
        state1: Right conversation state.
        temperature: Forwarded to ``bot_response``.
        top_p: Forwarded to ``bot_response``.
        max_new_tokens: Forwarded to ``bot_response``.
        request: Gradio request, forwarded and used for logging.

    Yields:
        Two states, two chatbot histories and six button updates.
    """
    logger.info(f"bot_response_multi (named). ip: {get_ip(request)}")

    if state0.skip_next:
        # This generate call is skipped due to invalid inputs
        unchanged = (
            state0,
            state1,
            state0.to_gradio_chatbot(),
            state1.to_gradio_chatbot(),
        )
        yield unchanged + (no_change_btn,) * 6
        return

    states = [state0, state1]
    gen = [
        bot_response(state, temperature, top_p, max_new_tokens, request)
        for state in states
    ]

    # Models polled at a reduced rate (see the throttling note below).
    batch_stream_models = [
        "gemini-pro",
        "gemini-pro-dev-api",
        "gemma-1.1-2b-it",
        "gemma-1.1-7b-it",
    ]
    is_stream_batch = [state.model_name in batch_stream_models for state in states]

    chatbots = [None] * num_sides
    iters = 0
    while True:
        iters += 1
        stop = True
        # yield gemini fewer times as its chunk size is larger
        # otherwise, gemini will stream too fast
        poll_throttled = iters % 30 == 1 or iters < 3
        for i in range(num_sides):
            try:
                if poll_throttled or not is_stream_batch[i]:
                    ret = next(gen[i])
                    states[i], chatbots[i] = ret[0], ret[1]
                # Deliberately outside the `if`: a side that was merely
                # throttled this round still counts as "not finished".
                stop = False
            except StopIteration:
                pass
        yield states + chatbots + [disable_btn] * 6
        if stop:
            break
def flash_buttons():
    """Blink the action buttons after a generation round.

    Yields four button-update lists, alternating between "vote buttons
    disabled, last two enabled" and "all six enabled", pausing 0.3 seconds
    after each.
    """
    votes_disabled = [disable_btn] * 4 + [enable_btn] * 2
    all_enabled = [enable_btn] * 6
    for frame in (votes_disabled, all_enabled, votes_disabled, all_enabled):
        yield frame
        time.sleep(0.3)
def build_side_by_side_ui_named(models):
    """Create the side-by-side (named) arena tab and register its listeners.

    Builds two model dropdowns, two chatbots, the vote / send / clear /
    regenerate / share buttons and the sampling sliders, then connects them
    to the handler functions of this module.

    NOTE(review): presumably called inside an active ``gr.Blocks`` context;
    confirm against the caller.

    Args:
        models: Sequence of model names offered in both dropdowns.

    Returns:
        A list with the two ``gr.State`` components followed by the two
        model-selector dropdowns.
    """
    notice_markdown = """
# ⚔️ Chatbot Arena: Benchmarking LLMs in the Wild
- | [Blog](https://lmsys.org/blog/2023-05-03-arena/) | [GitHub](https://github.com/lm-sys/FastChat) | [Paper](https://arxiv.org/abs/2306.05685) | [Dataset](https://github.com/lm-sys/FastChat/blob/main/docs/dataset_release.md) | [Twitter](https://twitter.com/lmsysorg) | [Discord](https://discord.gg/HSWAKCrnFx) |
## 📜 Rules
- Chat with any two models side-by-side and vote!
- You can continue chatting for multiple rounds.
- Click "Clear history" to start a new round.
## 🤖 Choose two models to compare
"""

    states = [gr.State() for _ in range(num_sides)]
    model_selectors = [None] * num_sides
    chatbots = [None] * num_sides

    gr.Markdown(notice_markdown, elem_id="notice_markdown")

    # Everything inside this group is what the "Share" screenshot captures.
    with gr.Group(elem_id="share-region-named"):
        with gr.Row():
            for i in range(num_sides):
                with gr.Column():
                    model_selectors[i] = gr.Dropdown(
                        choices=models,
                        value=models[i] if len(models) > i else "",
                        interactive=True,
                        show_label=False,
                        container=False,
                    )
        with gr.Row():
            with gr.Accordion(
                f"🔍 Expand to see the descriptions of {len(models)} models",
                open=False,
            ):
                model_description_md = get_model_description_md(models)
                gr.Markdown(model_description_md, elem_id="model_description_markdown")

        with gr.Row():
            for i in range(num_sides):
                label = "Model A" if i == 0 else "Model B"
                with gr.Column():
                    chatbots[i] = gr.Chatbot(
                        label=label,
                        elem_id="chatbot",
                        height=550,
                        show_copy_button=True,
                    )

    with gr.Row():
        leftvote_btn = gr.Button(
            value="👈 A is better", visible=False, interactive=False
        )
        rightvote_btn = gr.Button(
            value="👉 B is better", visible=False, interactive=False
        )
        tie_btn = gr.Button(value="🤝 Tie", visible=False, interactive=False)
        bothbad_btn = gr.Button(
            value="👎 Both are bad", visible=False, interactive=False
        )

    with gr.Row():
        textbox = gr.Textbox(
            show_label=False,
            placeholder="👉 Enter your prompt and press ENTER",
            elem_id="input_box",
        )
        send_btn = gr.Button(value="Send", variant="primary", scale=0)

    with gr.Row():
        clear_btn = gr.Button(value="🗑️ Clear history", interactive=False)
        regenerate_btn = gr.Button(value="🔄 Regenerate", interactive=False)
        share_btn = gr.Button(value="📷 Share")

    with gr.Accordion("Parameters", open=False):
        temperature = gr.Slider(
            minimum=0.0,
            maximum=1.0,
            value=0.7,
            step=0.1,
            interactive=True,
            label="Temperature",
        )
        top_p = gr.Slider(
            minimum=0.0,
            maximum=1.0,
            value=1.0,
            step=0.1,
            interactive=True,
            label="Top P",
        )
        max_output_tokens = gr.Slider(
            minimum=16,
            maximum=2048,
            value=1024,
            step=64,
            interactive=True,
            label="Max output tokens",
        )

    gr.Markdown(acknowledgment_md, elem_id="ack_markdown")

    # Register listeners
    imagebox = gr.State(None)
    vote_btns = [leftvote_btn, rightvote_btn, tie_btn, bothbad_btn]
    # Order matters: handlers return their button updates positionally.
    btn_list = vote_btns + [regenerate_btn, clear_btn]

    def _reset_outputs():
        # Output list shared by clear_history() and regenerate().
        return states + chatbots + [textbox] + btn_list

    def _then_generate(event):
        # Common tail of every chain that triggers a generation:
        # stream both replies, then blink the buttons back on.
        return event.then(
            bot_response_multi,
            states + [temperature, top_p, max_output_tokens],
            states + chatbots + btn_list,
        ).then(flash_buttons, [], list(btn_list))

    vote_handlers = (
        (leftvote_btn, leftvote_last_response),
        (rightvote_btn, rightvote_last_response),
        (tie_btn, tievote_last_response),
        (bothbad_btn, bothbad_vote_last_response),
    )
    for button, handler in vote_handlers:
        button.click(handler, states + model_selectors, [textbox] + vote_btns)

    _then_generate(regenerate_btn.click(regenerate, list(states), _reset_outputs()))

    clear_btn.click(clear_history, None, _reset_outputs())

    # Client-side screenshot of the share region; the inputs are passed
    # through unchanged so share_click() still receives them.
    share_js = """
function (a, b, c, d) {
const captureElement = document.querySelector('#share-region-named');
html2canvas(captureElement)
.then(canvas => {
canvas.style.display = 'none'
document.body.appendChild(canvas)
return canvas
})
.then(canvas => {
const image = canvas.toDataURL('image/png')
const a = document.createElement('a')
a.setAttribute('download', 'chatbot-arena.png')
a.setAttribute('href', image)
a.click()
canvas.remove()
});
return [a, b, c, d];
}
"""
    share_btn.click(share_click, states + model_selectors, [], js=share_js)

    # Switching either model starts a fresh conversation.
    for selector in model_selectors:
        selector.change(clear_history, None, _reset_outputs())

    # ENTER in the textbox and the Send button trigger the same pipeline.
    for trigger in (textbox.submit, send_btn.click):
        _then_generate(
            trigger(
                add_text,
                states + model_selectors + [textbox, imagebox],
                states + chatbots + [textbox, imagebox] + btn_list,
            )
        )

    return states + model_selectors