"""Gradio front end for the "Benjamin" guided-reasoning chatbot.

The first turn of every conversation is sent to a remote guided-reasoning
service (``GR_ENDPOINT``); later turns are answered by the user-configured
client LLM directly. Conversations and like/dislike feedback are logged via
``backend.logging``.

Required environment variables: ``GR_ENDPOINT``, ``GR_SESAM_OPEN`` and, when
``RESTRICT_ACCESS`` is enabled, ``HF_DATASETS_TOKEN``.
"""

from __future__ import annotations

import asyncio
import copy
import logging
import os
import uuid

import aiohttp
import gradio as gr
import pandas as pd
import ujson
from datasets import load_dataset

from backend.logging import log_messages, log_feedback
from backend.messages_processing import add_details, history_to_langchain_format
from backend.models import get_chat_model_wrapper, LLMBackends
from backend.svg_processing import postprocess_svg

logging.basicConfig(level=logging.DEBUG)

# When True, only accounts listed in the "logikon/benjamin_access" dataset
# with status "access" may use the chatbot.
RESTRICT_ACCESS = False

INFERENCE_SERVER_URL = "https://api-inference.huggingface.co/models/{model_id}"
MODEL_ID = "HuggingFaceH4/zephyr-7b-beta"

# Defaults for the client ("tourist") LLM; the temperature can be overridden
# from the UI slider.
TOURIST_MODEL_KWARGS = {
    "max_tokens": 800,
    "temperature": 0.6,
}

# Configuration of the reasoning guide that is forwarded to the
# guided-reasoning service. API keys are injected per request in bot().
GUIDE_KWARGS = {
    "expert_model": "meta-llama/Meta-Llama-3-70B-Instruct",
    # alternatives:
    # "accounts/fireworks/models/nous-hermes-2-mixtral-8x7b-dpo-fp8",
    # "accounts/fireworks/models/llama-v3-8b-instruct-hf",
    # "accounts/fireworks/models/nous-hermes-2-mixtral-8x7b-dpo-fp8",
    "inference_server_url": "https://api-inference.huggingface.co/models/meta-llama/Meta-Llama-3-70B-Instruct",
    # alternative: "https://api.fireworks.ai/inference/v1",
    "llm_backend": "HFChat",
    "classifier_kwargs": {
        "model_id": "MoritzLaurer/DeBERTa-v3-base-mnli-fever-anli",
        # alternative:
        # "inference_server_url": "https://sa710i91bnjvbhir.us-east-1.aws.endpoints.huggingface.cloud",
        "inference_server_url": "https://api-inference.huggingface.co/models/MoritzLaurer/DeBERTa-v3-base-mnli-fever-anli",
        "batch_size": 8,
    },
}

# Example prompts offered below the chat input.
EXAMPLES = [
    ("We're a nature-loving family with three kids, have some money left, and no plans "
     "for next week-end. Should we visit Disneyland?"),
    "Should I stop eating animals?",
    "Bob needs a reliable and cheap car. Should he buy a Mercedes?",
    ('Gavin has an insurance policy that includes coverage for "General Damages," '
     'which includes losses from "missed employment due to injuries that occur '
     'under regular working conditions."\n\n'
     'Gavin works as an A/C repair technician in a small town. One day, Gavin is '
     'hired to repair an air conditioner located on the second story of a building. '
     'Because Gavin is an experienced repairman, he knows that the safest way to '
     'access the unit is with a sturdy ladder. While climbing the ladder, Gavin '
     'loses his balance and falls, causing significant injury. Because of this, he '
     'subsequently has to stop working for weeks. Gavin files a claim with his '
     'insurance company for lost income.\n\n'
     'Does Gavin\'s insurance policy cover his claim for lost income?'),
    "How many arguments did you consider in your internal reasoning? (Brief answer, please.)",
    "Did you consider any counterarguments in your internal reasoning?",
    "From all the arguments you considered and assessed, which one is the most important?",
    "Did you refute any arguments or reasons for lack of plausibility?"
]

# NOTE(review): the text below reads like terms of service, and
# TERMS_OF_SERVICE is referenced in the UI section further down but is not
# defined anywhere in this file as reviewed. It looks as if the TITLE markup
# and the `TERMS_OF_SERVICE = ...` assignment were lost or fused into this
# literal -- confirm against the original and restore both definitions.
TITLE = """
This app is provided by Logikon AI for educational and research purposes only. The app is powered by Logikon's Guided Reasoning™️ technology, which is a novel approach to reasoning with language models. The app is a work in progress and may not always provide accurate or reliable information. By accepting these terms of service, you agree not to use the app:
By using the feedback buttons, you agree that
"""

# Placeholder text shown inside the empty chatbot widget.
CHATBOT_INSTRUCTIONS = (
    "1️⃣ In the first turn, ask a question or present a decision problem.\n"
    "2️⃣ In the following turns, ask the chatbot to explain its reasoning.\n\n"
    "💡 Note that this demo bot is hard-wired to deliberate with Guided Reasoning™️ "
    "in the first turn only.\n\n"
    "🔐 Chat conversations and feedback are logged (anonymously).\n"
    "Please don't share sensitive or identity revealing information.\n\n"
    "🙏 Benjamin is powered by the free API inference services of 🤗.\n"
    "In case you encounter issues due to rate limits... simply try again later.\n"
    "[We're searching sponsors to run Benjamin on 🚀 dedicated infrastructure.]\n\n"
    "💬 We'd love to hear your feedback!\n"
    "Please use the 👋 Community tab above to reach out.\n"
)

if RESTRICT_ACCESS:
    df_users = pd.DataFrame(
        load_dataset("logikon/benjamin_access", token=os.environ["HF_DATASETS_TOKEN"])["train"]
    )
    logging.info(f"Loaded user database with {len(df_users)} entries.")

logging.info(f"Reasoning guide expert model is {GUIDE_KWARGS['expert_model']}.")

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so an otherwise unreferenced task may be garbage-collected
# before it has finished.
_BACKGROUND_TASKS: set[asyncio.Task] = set()


def _on_background_task_done(task: asyncio.Task) -> None:
    """Release a finished background task and report its failure, if any."""
    _BACKGROUND_TASKS.discard(task)
    if not task.cancelled() and task.exception() is not None:
        logging.error(f"Background task failed: {task.exception()!r}")


def _spawn_background(coro) -> asyncio.Task:
    """Schedule *coro* as a task that is kept alive until it completes."""
    task = asyncio.create_task(coro)
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_on_background_task_done)
    return task


def new_conversation_id():
    """Return a fresh random (UUID4) conversation id as a string."""
    conversation_id = str(uuid.uuid4())
    logging.info(f"New conversation with conversation ID: {conversation_id}")
    return conversation_id


def access_granted(profile: gr.OAuthProfile | None, oauth_token: gr.OAuthToken | None) -> bool:
    """Decide whether the signed-in user may use the chatbot.

    With ``RESTRICT_ACCESS`` disabled everybody is admitted. Otherwise the
    user's ``hf_account`` entry in ``df_users`` must have status ``"access"``;
    a missing profile is treated as an unknown user.

    On success the user's OAuth token is exported as ``HF_TOKEN``.
    """
    username = profile.username if profile is not None else None

    if RESTRICT_ACCESS:
        user_rows = df_users[df_users.hf_account.eq(username)] if username is not None else None
        known = user_rows is not None and not user_rows.empty
        # Same rule as before: the first matching row decides.
        access = bool(user_rows.status.eq("access").iloc[0]) if known else False
    else:
        known = False
        access = True

    logging.info(f"User {username} known: {known}, access: {access}")

    if access and oauth_token is not None:
        # NOTE(review): os.environ is process-wide, so with concurrent sessions
        # the most recently admitted user's token is visible to every request.
        # Kept because code outside this file may read HF_TOKEN -- confirm and
        # remove if the explicit api_key/token arguments are sufficient.
        os.environ["HF_TOKEN"] = oauth_token.token
        logging.debug("set HF_TOKEN to oauth token")
    return access


async def gr_server_health() -> bool:
    """Return True iff ``GR_ENDPOINT/health`` answers with ``{"status": "ok"}``.

    Any failure (missing environment variable, network error, malformed
    payload) is logged and reported as ``False`` rather than raised.
    """
    try:
        url = os.environ["GR_ENDPOINT"] + "/health"
        headers = {
            'Content-type': 'application/json',
            "Authorization": f"Bearer {os.environ['GR_SESAM_OPEN']}",
        }
        async with aiohttp.ClientSession(json_serialize=ujson.dumps) as session:
            async with session.get(url, headers=headers) as resp:
                content = await resp.text()
        if ujson.loads(content).get("status") == "ok":
            return True
        logging.error(f"Server health check failed: {content}")
        return False
    except Exception as e:  # boundary: a failed probe must never crash the UI
        logging.error(f"When checking server health: Error: {e}")
        return False


async def log_like_dislike(conversation_id: gr.State, x: gr.LikeData, profile: gr.OAuthProfile | None):
    """Record like/dislike feedback for one chat message in the background."""
    if profile:
        logging.debug(f"{conversation_id} {profile.name} {x.index} {x.liked}")
    _spawn_background(
        log_feedback(
            liked=x.liked,
            conversation_id=conversation_id,
            step=x.index,
            metadata={"timestamp": pd.Timestamp.now().timestamp()}
        )
    )


def add_message(history, message, conversation_id):
    """Append the submitted user message to the chat history.

    A new conversation id is issued whenever the history is empty, i.e. at
    the start of a conversation or after the chat has been cleared.

    Returns the updated history, a cleared and locked input box (re-enabled
    once the bot has answered), and the conversation id.
    """
    if not history:
        conversation_id = new_conversation_id()
    logging.debug(f"add_message: {history} \n {message}")
    if message["text"] is not None:
        # A mutable row: bot() fills in the answer via history[-1][1].
        history.append([message["text"], None])
    return history, gr.MultimodalTextbox(value=None, interactive=False), conversation_id


async def _collect_guide_artifacts(url, headers, payload, progress) -> dict:
    """POST *payload* to the guide endpoint and consume its line-delimited JSON stream.

    Events with ``type == "progress"`` are surfaced to the user; any other
    typed event is stored under its type. An event carrying an ``"error"``
    key aborts the stream with ``gr.Error``.
    """
    artifacts = {}
    progress_step = 0
    gr.Info("👀 Checking LLM availability... (may take a few minutes).")
    async with aiohttp.ClientSession(json_serialize=ujson.dumps) as session:
        async with session.post(url, headers=headers, json=payload) as resp:
            async for raw_line in resp.content:
                if not raw_line.strip():
                    continue  # tolerate blank keep-alive lines
                data = ujson.loads(raw_line)
                if not data:
                    continue
                if "error" in data:
                    msg = str(data["error"])
                    if "token" in msg:
                        gr.Warning(
                            "↩️ Please sign out, reload the chatbot, and sign in again.",
                            duration=0
                        )
                    if "health checks" in msg:
                        gr.Warning(
                            "❌ LLMs are currently unavailable due to rate limits or cold start times. "
                            "↩️ Please reload and try again in a minute.",
                            duration=0
                        )
                    raise gr.Error(msg)
                event_type = data.get("type")
                if event_type == "progress":
                    logging.info(data.get("value"))
                    gr.Info(data.get("value"), duration=12)
                    progress((progress_step, 4))
                    progress_step += 1
                elif event_type is not None:
                    artifacts[event_type] = data.get("value")
    return artifacts


async def _guided_reasoning_response(message, tourist_config, hf_token, progress) -> str:
    """Run the guided-reasoning service on *message* and format its answer."""
    url = os.environ["GR_ENDPOINT"] + "/guide"
    headers = {
        'Content-type': 'application/json',
        "Authorization": f"Bearer {os.environ['GR_SESAM_OPEN']}",
    }
    # Deep copy: the nested classifier_kwargs dict must not be shared between requests.
    guide_config = copy.deepcopy(GUIDE_KWARGS)
    guide_config["api_key"] = hf_token  # expert model api key
    guide_config["classifier_kwargs"]["api_key"] = hf_token  # classifier api key
    input_data = {
        "message": message,
        "tourist_config": tourist_config,
        "guide_config": guide_config
    }

    try:
        artifacts = await _collect_guide_artifacts(url, headers, input_data, progress)
    except gr.Error:
        # Already user-facing (server-reported error); do not wrap it again.
        raise
    except asyncio.TimeoutError as e:
        raise gr.Error("Guided reasoning process took too long. Please try again.") from e
    except Exception as e:
        raise gr.Error(f"Error during guided reasoning: {e}") from e

    svg = postprocess_svg(artifacts.get("svg_argmap"))
    protocol = artifacts.get("protocol", "I'm sorry, I failed to reason about the problem.")
    response = artifacts.pop("response", "")
    if not response:
        response = "I'm sorry, I failed to draft a response."
    return add_details(response, protocol, svg)


async def _plain_chat_response(chat_model_kwargs, history_langchain_format) -> str:
    """Answer a follow-up turn with the client LLM, without guided reasoning."""
    chat_model = get_chat_model_wrapper(**chat_model_kwargs)
    try:
        # invoke() is synchronous; run it in a worker thread so the event
        # loop keeps serving other sessions in the meantime.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, chat_model.invoke, history_langchain_format)
        return result.content
    except Exception as e:
        raise gr.Error(f"Error during chatbot inference: {e}") from e


async def bot(
    history,
    tourist_model_id,
    tourist_inference_url,
    tourist_inference_token,
    tourist_backend,
    tourist_temperature,
    conversation_id,
    profile: gr.OAuthProfile | None,
    oauth_token: gr.OAuthToken | None,
    progress=gr.Progress(),
):
    """Produce the assistant's reply to the last user message in *history*.

    The first turn is always answered through the guided-reasoning service;
    later turns go straight to the client LLM. The reply is written into
    ``history[-1][1]`` and the exchange is logged in the background.

    Raises:
        gr.Error: if the user is not signed in or not admitted, the backend
            is unhealthy, or inference fails.
    """
    if not oauth_token:
        raise gr.Error("Please sign in to use the chatbot.")
    if not access_granted(profile, oauth_token):
        raise gr.Error("You've not been granted access to use the chatbot. Please reach out to Logikon AI team.")
    if not await gr_server_health():
        raise gr.Error("The backend server is not healthy. Please try again later.")

    # The OAuth token is a credential and is deliberately not logged.
    logging.debug(f"History (conversation: {conversation_id}): {history}")
    history_langchain_format = history_to_langchain_format(history)

    # A user-supplied inference token takes precedence over the OAuth token.
    client_api_key = tourist_inference_token if tourist_inference_token else oauth_token.token

    if len(history_langchain_format) <= 1:
        # use guide always and exclusively at first turn
        tourist_config = {
            "model_id": tourist_model_id,
            "inference_server_url": tourist_inference_url,
            "llm_backend": tourist_backend,
            "api_key": client_api_key,
            **TOURIST_MODEL_KWARGS,
            "temperature": tourist_temperature,
        }
        response = await _guided_reasoning_response(
            history[-1][0], tourist_config, oauth_token.token, progress
        )
    else:
        # otherwise, just chat
        chat_model_kwargs = {
            "model_id": tourist_model_id,
            "inference_server_url": tourist_inference_url,
            "token": client_api_key,
            "backend": tourist_backend,
            **TOURIST_MODEL_KWARGS,
            "temperature": tourist_temperature,
        }
        response = await _plain_chat_response(chat_model_kwargs, history_langchain_format)

    logging.debug(f"Response: {response}")
    history[-1][1] = response

    _spawn_background(log_messages(
        history[-1],
        conversation_id,
        len(history),
        {
            "tourist_llm": tourist_model_id,
            "guide_llm": GUIDE_KWARGS["expert_model"],
            "timestamp": pd.Timestamp.now().timestamp(),
        }
    ))

    return history


with gr.Blocks() as demo:

    # preamble
    gr.Markdown(TITLE)
    login = gr.LoginButton()
    login.activate()
    conversation_id = gr.State(str(uuid.uuid4()))
    tos_approved = gr.State(False)

    # The chatbot tab stays hidden until the terms of service are accepted.
    with gr.Tab(label="Chatbot", visible=False) as chatbot_tab:

        # chatbot
        chatbot = gr.Chatbot(
            [],
            elem_id="chatbot",
            bubble_full_width=False,
            placeholder=CHATBOT_INSTRUCTIONS,
        )

        chat_input = gr.MultimodalTextbox(interactive=True, file_types=["image"], placeholder="Enter message ...", show_label=False)
        clear = gr.ClearButton([chat_input, chatbot])
        gr.Examples([{"text": e, "files":[]} for e in EXAMPLES], chat_input)

        # configs
        with gr.Accordion("Client LLM Configuration", open=False):
            gr.Markdown("Configure your client LLM that underpins this chatbot and is guided through the reasoning process.")
            with gr.Row():
                with gr.Column(2):
                    tourist_backend = gr.Dropdown(choices=[b.value for b in LLMBackends], value=LLMBackends.HFChat.value, label="LLM Inference Backend")
                    tourist_model_id = gr.Textbox(MODEL_ID, label="Model ID", max_lines=1)
                    tourist_inference_url = gr.Textbox(INFERENCE_SERVER_URL.format(model_id=MODEL_ID), label="Inference Server URL", max_lines=1)
                    tourist_inference_token = gr.Textbox("", label="Inference Token", max_lines=1, placeholder="Not required with HF Inference Api (default)", type="password")
                with gr.Column(1):
                    tourist_temperature = gr.Slider(0, 1.0, value = TOURIST_MODEL_KWARGS["temperature"], label="Temperature")

        # logic: record the user message, generate the reply, then unlock the input
        chat_msg = chat_input.submit(add_message, [chatbot, chat_input, conversation_id], [chatbot, chat_input, conversation_id])
        bot_msg = chat_msg.then(
            bot,
            [
                chatbot,
                tourist_model_id,
                tourist_inference_url,
                tourist_inference_token,
                tourist_backend,
                tourist_temperature,
                conversation_id
            ],
            chatbot,
            api_name="bot_response"
        )
        bot_msg.then(lambda: gr.MultimodalTextbox(interactive=True), None, [chat_input])

        chatbot.like(log_like_dislike, [conversation_id], None)

        # The conversation id is reset in add_message() whenever the history is
        # empty, so the clear button needs no handler of its own:
        # clear.click(new_conversation_id, outputs = [conversation_id])

    with gr.Tab(label="Terms of Service") as tos_tab:

        gr.HTML(TERMS_OF_SERVICE)
        tos_checkbox = gr.Checkbox(label="I agree to the terms of service")
        # Accepting is one-way: the checkbox is locked and the chatbot tab revealed.
        tos_checkbox.input(
            lambda x: (x, gr.Checkbox(label="I agree to the terms of service", interactive=False), gr.Tab("Chatbot", visible=True)),
            tos_checkbox,
            [tos_approved, tos_checkbox, chatbot_tab]
        )

if __name__ == "__main__":
    demo.queue(default_concurrency_limit=8)
    demo.launch(show_error=True)