# Hugging Face Space -- status when this page was captured: Running
from __future__ import annotations | |
import asyncio | |
import copy | |
import logging | |
import os | |
import uuid | |
import ujson | |
import aiohttp | |
from datasets import load_dataset | |
import gradio as gr | |
import pandas as pd | |
from backend.logging import log_messages, log_feedback | |
from backend.messages_processing import add_details, history_to_langchain_format | |
from backend.models import get_chat_model_wrapper, LLMBackends | |
from backend.svg_processing import postprocess_svg | |
# Root logger emits everything from DEBUG upwards.
logging.basicConfig(level=logging.DEBUG)

# When True, only accounts listed in the access dataset may use the chatbot.
RESTRICT_ACCESS = False

# URL template of the Hugging Face Inference API; `model_id` is filled in per model.
INFERENCE_SERVER_URL = "https://api-inference.huggingface.co/models/{model_id}"

# Default client ("tourist") model shown in the "Client LLM Configuration" panel.
MODEL_ID = "HuggingFaceH4/zephyr-7b-beta"

# Default generation settings for the client model.
TOURIST_MODEL_KWARGS = dict(max_tokens=800, temperature=0.6)

_GUIDE_EXPERT_MODEL = "meta-llama/Meta-Llama-3-70B-Instruct"
_GUIDE_CLASSIFIER_MODEL = "MoritzLaurer/DeBERTa-v3-base-mnli-fever-anli"

# Configuration sent to the guided-reasoning server for the expert model and
# its classifier. Alternatives that were commented out in this config:
#   expert_model:         "accounts/fireworks/models/nous-hermes-2-mixtral-8x7b-dpo-fp8"
#                         "accounts/fireworks/models/llama-v3-8b-instruct-hf"
#   inference_server_url: "https://api.fireworks.ai/inference/v1"
#   classifier url:       "https://sa710i91bnjvbhir.us-east-1.aws.endpoints.huggingface.cloud"
GUIDE_KWARGS = {
    "expert_model": _GUIDE_EXPERT_MODEL,
    "inference_server_url": INFERENCE_SERVER_URL.format(model_id=_GUIDE_EXPERT_MODEL),
    "llm_backend": "HFChat",
    "classifier_kwargs": {
        "model_id": _GUIDE_CLASSIFIER_MODEL,
        "inference_server_url": INFERENCE_SERVER_URL.format(model_id=_GUIDE_CLASSIFIER_MODEL),
        "batch_size": 8,
    },
}
# Longer example prompts are named so the list below stays scannable.
_FAMILY_TRIP_EXAMPLE = (
    "We're a nature-loving family with three kids, have some money left, "
    "and no plans for next week-end. Should we visit Disneyland?"
)

# Three paragraphs (policy, incident, question) separated by blank lines.
_INSURANCE_EXAMPLE = "\n\n".join((
    'Gavin has an insurance policy that includes coverage for "General Damages," '
    'which includes losses from "missed employment due to injuries that occur '
    'under regular working conditions."',
    "Gavin works as an A/C repair technician in a small town. "
    "One day, Gavin is hired to repair an air conditioner located on the second story of a building. "
    "Because Gavin is an experienced repairman, he knows that the safest way to access the unit "
    "is with a sturdy ladder. "
    "While climbing the ladder, Gavin loses his balance and falls, causing significant injury. "
    "Because of this, he subsequently has to stop working for weeks. "
    "Gavin files a claim with his insurance company for lost income.",
    "Does Gavin's insurance policy cover his claim for lost income?",
))

# Example inputs offered below the chat box: four opening questions followed
# by four follow-up questions about the bot's reasoning.
EXAMPLES = [
    _FAMILY_TRIP_EXAMPLE,
    "Should I stop eating animals?",
    "Bob needs a reliable and cheap car. Should he buy a Mercedes?",
    _INSURANCE_EXAMPLE,
    "How many arguments did you consider in your internal reasoning? (Brief answer, please.)",
    "Did you consider any counterarguments in your internal reasoning?",
    "From all the arguments you considered and assessed, which one is the most important?",
    "Did you refute any arguments or reasons for lack of plausibility?",
]
# Page header rendered at the top of the app.
TITLE = "\n".join((
    "<div align=left>",
    "<h1>🪁 Benjamin Chatbot with Logikon <i>Guided Reasoning™️</i></h1>",
    "</div>",
))

# Opening paragraph of the terms of service (HTML).
_TOS_INTRO = (
    "<h2>Terms of Service</h2>\n"
    "<p>This app is provided by Logikon AI for educational and research purposes only.\n"
    "The app is powered by Logikon's <i>Guided Reasoning™️</i> technology, which is a novel approach to\n"
    "reasoning with language models. The app is a work in progress and may not always provide accurate or reliable information.\n"
    "By accepting these terms of service, you agree not to use the app:</p>"
)

# One entry per prohibited use; each becomes an <li> of the ordered list.
_TOS_PROHIBITED_USES = (
    "In any way that violates any applicable national, federal, state, local or international law or regulation;",
    "For the purpose of exploiting, harming or attempting to exploit or harm minors in any way;",
    "To generate and/or disseminate malware (e.g. ransomware) or any other content to be used for the purpose of harming electronic systems;",
    "To generate or disseminate verifiably false information and/or content with the purpose of harming others;",
    "To generate or disseminate personal identifiable information that can be used to harm an individual;",
    "To generate or disseminate information and/or content (e.g. images, code, posts, articles), and place the information and/or content in any public context (e.g. bot generating tweets) without expressly and intelligibly disclaiming that the information and/or content is machine generated;",
    "To defame, disparage or otherwise harass others;",
    "To impersonate or attempt to impersonate (e.g. deepfakes) others without their consent;",
    "For fully automated decision making that adversely impacts an individual’s legal rights or otherwise creates or modifies a binding, enforceable obligation;",
    "For any use intended to or which has the effect of discriminating against or harming individuals or groups based on online or offline social behavior or known or predicted personal or personality characteristics;",
    "To exploit any of the vulnerabilities of a specific group of persons based on their age, social, physical or mental characteristics, in order to materially distort the behavior of a person pertaining to that group in a manner that causes or is likely to cause that person or another person physical or psychological harm;",
    "For any use intended to or which has the effect of discriminating against individuals or groups based on legally protected characteristics or categories;",
    "To provide medical advice and medical results interpretation;",
    # The trailing space before the closing tag is part of the original text.
    "To generate or disseminate information for the purpose to be used for administration of justice, law enforcement, immigration or asylum processes, such as predicting an individual will commit fraud/crime commitment (e.g. by text profiling, drawing causal relationships between assertions made in documents, indiscriminate and arbitrarily-targeted use). ",
)

# TODO(review): this sentence is unfinished ("you agree that ..."); the
# intended wording must be supplied by the owners of the terms.
_TOS_FEEDBACK_NOTE = "<p>By using the feedback buttons, you agree that </p>"

# Full terms-of-service HTML shown in the "Terms of Service" tab.
TERMS_OF_SERVICE = "\n".join((
    _TOS_INTRO,
    "<ol>",
    *(f"<li>{use}</li>" for use in _TOS_PROHIBITED_USES),
    "</ol>",
    _TOS_FEEDBACK_NOTE,
    "",  # keeps the final newline of the original literal
))
# Placeholder text of the empty chat window. Each inner tuple is one
# paragraph; its entries are the lines of that paragraph.
_INSTRUCTION_PARAGRAPHS = (
    (
        "1️⃣ In the first turn, ask a question or present a decision problem.",
        "2️⃣ In the following turns, ask the chatbot to explain its reasoning.",
    ),
    (
        "💡 Note that this demo bot is hard-wired to deliberate with Guided Reasoning™️ in the first turn only.",
    ),
    (
        "🔐 Chat conversations and feedback are logged (anonymously).",
        "Please don't share sensitive or identity revealing information.",
    ),
    (
        "🙏 Benjamin is powered by the free API inference services of 🤗.",
        "In case you encounter issues due to rate limits... simply try again later.",
        "[We're searching sponsors to run Benjamin on 🚀 dedicated infrastructure.]",
    ),
    (
        "💬 We'd love to hear your feedback!",
        "Please use the 👋 Community tab above to reach out.",
    ),
)

# Paragraphs are separated by a blank line; the text ends with one newline.
CHATBOT_INSTRUCTIONS = "\n\n".join("\n".join(lines) for lines in _INSTRUCTION_PARAGRAPHS) + "\n"
# The access list is only needed (and only loaded) when access is restricted.
if RESTRICT_ACCESS:
    # The dataset is read with the token taken from HF_DATASETS_TOKEN.
    _access_split = load_dataset(
        "logikon/benjamin_access",
        token=os.environ["HF_DATASETS_TOKEN"],
    )["train"]
    df_users = pd.DataFrame(_access_split)
    logging.info(f"Loaded user database with {len(df_users)} entries.")

logging.info(f"Reasoning guide expert model is {GUIDE_KWARGS['expert_model']}.")
def new_conversation_id():
    """Create an identifier for a new conversation.

    Returns:
        A freshly generated UUID4 as a string. The ID is also printed to
        stdout.
    """
    fresh_id = f"{uuid.uuid4()}"
    print(f"New conversation with conversation ID: {fresh_id}")
    return fresh_id
def access_granted(profile: gr.OAuthProfile | None, oauth_token: gr.OAuthToken | None) -> bool:
    """Decide whether the signed-in user may use the chatbot.

    With ``RESTRICT_ACCESS`` enabled, the user's ``hf_account`` entry in
    ``df_users`` must exist and its first matching row must have
    ``status == "access"``. With it disabled, everybody is let in.

    Args:
        profile: OAuth profile of the current user, or None if not signed in.
        oauth_token: OAuth token of the current user, or None if not signed in.

    Returns:
        True if access is granted, False otherwise.

    Side effects:
        On success (and if a token is available) the token is exported as the
        ``HF_TOKEN`` environment variable.
    """
    # A missing profile must not crash the check (nor the log line below).
    username = profile.username if profile is not None else None

    if RESTRICT_ACCESS and username is not None:
        user_rows = df_users[df_users.hf_account.eq(username)]
        known = not user_rows.empty
        # bool(...) turns the numpy boolean into a plain Python bool.
        access = bool(user_rows.status.eq("access").iloc[0]) if known else False
    elif RESTRICT_ACCESS:
        # Restricted mode and nobody signed in: unknown user, no access.
        known = False
        access = False
    else:
        known = False
        access = True
    logging.info(f"User {username} known: {known}, access: {access}")

    if access:
        if oauth_token is not None:
            # NOTE(review): os.environ is process-wide, so concurrent sessions
            # overwrite each other's HF_TOKEN. Kept because code outside this
            # file may read it -- confirm and prefer passing the token explicitly.
            os.environ["HF_TOKEN"] = oauth_token.token
            print("set HF_TOKEN to oauth token")
        else:
            logging.warning("Access granted without an OAuth token; HF_TOKEN left unchanged.")
    return access
async def gr_server_health() -> bool:
    """Check whether the guided-reasoning server reports itself healthy.

    Sends a GET request to ``$GR_ENDPOINT/health`` with a bearer token taken
    from ``$GR_SESAM_OPEN`` and expects a JSON body with ``"status": "ok"``.

    Returns:
        True if the server answered with status "ok"; False on any other
        answer or on any error (missing environment variable, network
        failure, unparsable body). Errors are logged, never raised.
    """
    try:
        url = os.environ["GR_ENDPOINT"] + "/health"
        headers = {'Content-type': 'application/json', "Authorization": f"Bearer {os.environ['GR_SESAM_OPEN']}"}
        async with aiohttp.ClientSession(json_serialize=ujson.dumps) as session:
            async with session.get(url, headers=headers) as resp:
                http_status = resp.status
                content = await resp.text()
        if ujson.loads(content).get("status") == "ok":
            return True
        # Include the HTTP status: the body alone is often not enough to diagnose.
        logging.error(f"Server health check failed (HTTP {http_status}): {content}")
        return False
    except Exception as e:
        # Deliberate catch-all: a failed health check must not crash the
        # caller. repr + traceback are logged because str() of some errors
        # (e.g. timeouts) is empty.
        logging.exception(f"When checking server health: Error: {e!r}")
        return False
# Strong references to in-flight feedback-logging tasks. The event loop only
# keeps weak references to tasks, so an unreferenced task may be
# garbage-collected before it has finished.
_FEEDBACK_LOG_TASKS = set()


async def log_like_dislike(conversation_id: gr.State, x: gr.LikeData, profile: gr.OAuthProfile | None):
    """Record a like/dislike click on a chat message.

    The feedback is logged in the background via ``log_feedback``; this
    handler does not wait for the logging to complete. Nothing is logged when
    no user profile is available.

    Args:
        conversation_id: ID of the conversation the feedback belongs to
            (wired to the ``conversation_id`` state in the UI).
        x: Like event data; ``x.index`` and ``x.liked`` are forwarded.
        profile: OAuth profile of the current user, or None if not signed in.
    """
    if not profile:
        return

    print(conversation_id, profile.name, x.index, x.liked)
    task = asyncio.create_task(
        log_feedback(
            liked=x.liked,
            conversation_id=conversation_id,
            step=x.index,
            metadata={"timestamp": pd.Timestamp.now().timestamp()}
        )
    )
    # Hold the task until it is done, then drop the reference again.
    _FEEDBACK_LOG_TASKS.add(task)
    task.add_done_callback(_FEEDBACK_LOG_TASKS.discard)
def add_message(history, message, conversation_id):
    """Append the submitted user message to the chat history.

    Args:
        history: Current chat history; a new ``(text, None)`` turn is appended
            in place when the message carries text.
        message: Submitted textbox value; only its ``"text"`` entry is used.
        conversation_id: Current conversation ID.

    Returns:
        A tuple of the updated history, a cleared and non-interactive
        textbox, and the conversation ID (a new one if the history was empty).
    """
    starts_new_conversation = len(history) == 0
    if starts_new_conversation:
        # An empty history marks the start of a conversation: issue a new ID.
        conversation_id = new_conversation_id()

    print(f"add_message: {history} \n {message}")

    user_text = message["text"]
    if user_text is not None:
        # The answer slot stays None until bot() fills it in.
        history.append((user_text, None))

    # Lock the input while the bot is answering.
    locked_input = gr.MultimodalTextbox(value=None, interactive=False)
    return history, locked_input, conversation_id
# Strong references to in-flight message-logging tasks. The event loop only
# keeps weak references to tasks, so an unreferenced task may be
# garbage-collected before it has finished.
_MESSAGE_LOG_TASKS = set()


async def _collect_guide_artifacts(url, headers, payload, progress):
    """Post `payload` to the guide server and gather the streamed artifacts.

    The response is read line by line, each line holding one JSON object.
    Objects with an "error" key abort with gr.Error; "progress" objects are
    shown to the user; any other typed object is stored under its type.
    """
    artifacts = {}
    progress_step = 0
    async with aiohttp.ClientSession(json_serialize=ujson.dumps) as session:
        async with session.post(url, headers=headers, json=payload) as resp:
            # Iterating the stream yields one line at a time until EOF.
            async for line in resp.content:
                data = ujson.loads(line)
                if not data:
                    continue
                if "error" in data:
                    msg = data["error"]
                    if "token" in msg:
                        gr.Warning(
                            "↩️ Please sign out, reload the chatbot, and sign in again.",
                            duration=0
                        )
                    if "health checks" in msg:
                        gr.Warning(
                            "❌ LLMs are currently unavailable due to rate limits or cold start times. "
                            "↩️ Please reload and try again in a minute.",
                            duration=0
                        )
                    raise gr.Error(msg)
                kind = data.get("type")
                if kind == "progress":
                    print(data.get("value"))
                    gr.Info(data.get("value"), duration=12)
                    progress((progress_step, 4))
                    progress_step += 1
                elif kind is not None:
                    artifacts[kind] = data.get("value")
    return artifacts


async def bot(
    history,
    tourist_model_id,
    tourist_inference_url,
    tourist_inference_token,
    tourist_backend,
    tourist_temperature,
    conversation_id,
    profile: gr.OAuthProfile | None,
    oauth_token: gr.OAuthToken | None,
    progress=gr.Progress(),
):
    """Generate the assistant's answer to the latest user message.

    On the first turn the message is sent to the guided-reasoning server
    (``$GR_ENDPOINT/guide``); on later turns the client LLM is called
    directly. The answer is written into the last history entry and the turn
    is logged in the background.

    Args:
        history: Chat history; the last entry holds the pending user message.
        tourist_model_id: Model ID of the client LLM.
        tourist_inference_url: Inference server URL of the client LLM.
        tourist_inference_token: Token for the client LLM; if empty, the
            user's OAuth token is used instead.
        tourist_backend: LLM backend name of the client LLM.
        tourist_temperature: Sampling temperature; overrides the default in
            TOURIST_MODEL_KWARGS.
        conversation_id: ID of the current conversation (used for logging).
        profile: OAuth profile of the current user, or None.
        oauth_token: OAuth token of the current user, or None.
        progress: Gradio progress tracker.

    Returns:
        The history with the last entry's answer filled in.

    Raises:
        gr.Error: If the user is not signed in or lacks access, if the backend
            server is unhealthy, or if generating the answer fails.
    """
    if not oauth_token:
        raise gr.Error("Please sign in to use the chatbot.")
    if not access_granted(profile, oauth_token):
        raise gr.Error("You've not been granted access to use the chatbot. Please reach out to Logikon AI team.")
    if not await gr_server_health():
        raise gr.Error("The backend server is not healthy. Please try again later.")

    # The OAuth token is a secret and is deliberately not printed or logged.
    print(f"History (conversation: {conversation_id}): {history}")
    history_langchain_format = history_to_langchain_format(history)
    client_token = tourist_inference_token if tourist_inference_token else oauth_token.token

    # use guide always and exclusively at first turn
    if len(history_langchain_format) <= 1:
        url = os.environ["GR_ENDPOINT"] + "/guide"
        headers = {'Content-type': 'application/json', "Authorization": f"Bearer {os.environ['GR_SESAM_OPEN']}"}

        tourist_config = {
            "model_id": tourist_model_id,
            "inference_server_url": tourist_inference_url,
            "llm_backend": tourist_backend,
            "api_key": client_token,
            **TOURIST_MODEL_KWARGS,
            "temperature": tourist_temperature,
        }
        # Deep copy so the per-request keys never leak into the shared defaults.
        guide_config = copy.deepcopy(GUIDE_KWARGS)
        guide_config["api_key"] = oauth_token.token  # expert model api key
        guide_config["classifier_kwargs"]["api_key"] = oauth_token.token  # classifier api key

        input_data = {
            "message": history[-1][0],
            "tourist_config": tourist_config,
            "guide_config": guide_config
        }

        try:
            gr.Info("👀 Checking LLM availability... (may take a few minutes).")
            artifacts = await _collect_guide_artifacts(url, headers, input_data, progress)
        except asyncio.TimeoutError:
            msg = "Guided reasoning process took too long. Please try again."
            raise gr.Error(msg)
        except Exception as e:
            # Also wraps gr.Error raised for server-reported errors.
            msg = f"Error during guided reasoning: {e}"
            raise gr.Error(msg) from e

        svg = postprocess_svg(artifacts.get("svg_argmap"))
        protocol = artifacts.get("protocol", "I'm sorry, I failed to reason about the problem.")
        response = artifacts.pop("response", "")
        if not response:
            response = "I'm sorry, I failed to draft a response."
        response = add_details(response, protocol, svg)

    # otherwise, just chat
    else:
        chat_model_kwargs = {
            "model_id": tourist_model_id,
            "inference_server_url": tourist_inference_url,
            "token": client_token,
            "backend": tourist_backend,
            **TOURIST_MODEL_KWARGS,
            "temperature": tourist_temperature,
        }
        chat_model = get_chat_model_wrapper(**chat_model_kwargs)
        try:
            # NOTE(review): invoke() appears to be a synchronous call and would
            # block the event loop while the model answers -- confirm.
            response = chat_model.invoke(history_langchain_format).content
        except Exception as e:
            msg = f"Error during chatbot inference: {e}"
            # Raise gr.Error (as the guided branch does) so the message
            # reaches the user.
            raise gr.Error(msg) from e

    print(f"Response: {response}")
    history[-1][1] = response

    # Log the turn in the background, keeping a reference until it completes.
    log_task = asyncio.create_task(log_messages(
        history[-1],
        conversation_id,
        len(history),
        {
            "tourist_llm": tourist_model_id,
            "guide_llm": GUIDE_KWARGS["expert_model"],
            "timestamp": pd.Timestamp.now().timestamp(),
        }
    ))
    _MESSAGE_LOG_TASKS.add(log_task)
    log_task.add_done_callback(_MESSAGE_LOG_TASKS.discard)

    return history
# NOTE(review): the nesting below was reconstructed from the order of the
# components; confirm which textboxes belong to which column.
with gr.Blocks() as demo:
    # Header and Hugging Face sign-in.
    gr.Markdown(TITLE)
    login = gr.LoginButton()
    login.activate()

    # State values. The initial conversation ID is created once when the UI
    # is built; add_message() replaces it whenever the history is empty.
    conversation_id = gr.State(str(uuid.uuid4()))
    tos_approved = gr.State(False)

    # The chatbot tab stays hidden until the terms of service are accepted.
    with gr.Tab(label="Chatbot", visible=False) as chatbot_tab:

        # Chat window, input box, clear button and example prompts.
        chatbot = gr.Chatbot(
            [],
            elem_id="chatbot",
            bubble_full_width=False,
            placeholder=CHATBOT_INSTRUCTIONS,
        )
        chat_input = gr.MultimodalTextbox(
            interactive=True,
            file_types=["image"],
            placeholder="Enter message ...",
            show_label=False,
        )
        clear = gr.ClearButton([chat_input, chatbot])
        example_inputs = [{"text": e, "files":[]} for e in EXAMPLES]
        gr.Examples(example_inputs, chat_input)

        # Settings of the client LLM.
        with gr.Accordion("Client LLM Configuration", open=False):
            gr.Markdown("Configure your client LLM that underpins this chatbot and is guided through the reasoning process.")
            with gr.Row():
                with gr.Column(2):
                    tourist_backend = gr.Dropdown(
                        choices=[b.value for b in LLMBackends],
                        value=LLMBackends.HFChat.value,
                        label="LLM Inference Backend",
                    )
                    tourist_model_id = gr.Textbox(MODEL_ID, label="Model ID", max_lines=1)
                    tourist_inference_url = gr.Textbox(
                        INFERENCE_SERVER_URL.format(model_id=MODEL_ID),
                        label="Inference Server URL",
                        max_lines=1,
                    )
                    tourist_inference_token = gr.Textbox(
                        "",
                        label="Inference Token",
                        max_lines=1,
                        placeholder="Not required with HF Inference Api (default)",
                        type="password",
                    )
                with gr.Column(1):
                    tourist_temperature = gr.Slider(
                        0,
                        1.0,
                        value=TOURIST_MODEL_KWARGS["temperature"],
                        label="Temperature",
                    )

        # Event wiring: submit -> add_message -> bot -> unlock the input box.
        message_io = [chatbot, chat_input, conversation_id]
        chat_msg = chat_input.submit(add_message, message_io, message_io)

        # bot() receives profile, oauth_token and progress in addition to these.
        bot_inputs = [
            chatbot,
            tourist_model_id,
            tourist_inference_url,
            tourist_inference_token,
            tourist_backend,
            tourist_temperature,
            conversation_id,
        ]
        bot_msg = chat_msg.then(bot, bot_inputs, chatbot, api_name="bot_response")
        bot_msg.then(lambda: gr.MultimodalTextbox(interactive=True), None, [chat_input])

        chatbot.like(log_like_dislike, [conversation_id], None)

        # The conversation ID is reset in add_message() when the history is
        # empty, so the clear button needs no handler of its own:
        # clear.click(new_conversation_id, outputs = [conversation_id])

    with gr.Tab(label="Terms of Service") as tos_tab:
        gr.HTML(TERMS_OF_SERVICE)
        tos_checkbox = gr.Checkbox(label="I agree to the terms of service")
        # On input: store the checkbox value, freeze the checkbox and reveal
        # the chatbot tab.
        tos_checkbox.input(
            lambda x: (
                x,
                gr.Checkbox(label="I agree to the terms of service", interactive=False),
                gr.Tab("Chatbot", visible=True),
            ),
            tos_checkbox,
            [tos_approved, tos_checkbox, chatbot_tab],
        )
if __name__ == "__main__":
    # Enable the request queue with a default concurrency limit of 8, then
    # start the app with error messages shown in the UI.
    demo.queue(default_concurrency_limit=8).launch(show_error=True)