# NOTE(review): removed build-log residue ("Spaces:" / "Build error" lines) that
# was pasted into the top of this file and broke parsing.
import asyncio
import json
import os
from datetime import datetime

import chainlit as cl
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI

from classes import SessionState
from utils_data import get_company_data, get_customer_background, get_opportunities, get_questions
from utils_output import display_evaluation_results, display_llm_responses
from utils_prompt import get_chat_prompt, get_system_template, get_user_template
llm_model = "gpt-4o-mini" | |
# llm_model = "gpt-4o" | |
# llm_model = "gpt-3.5-turbo" | |
# llm_model = "gpt-4o-2024-08-06" | |
load_dotenv() | |
openai_api_key = os.getenv("OPENAI_API_KEY") | |
user_template = get_user_template() | |
system_template = get_system_template() | |
############################################# | |
# Action callbacks | |
############################################# | |
async def on_action(action): | |
scenarios = get_opportunities() | |
cl.user_session.set("scenarios", scenarios) | |
scenarios = cl.user_session.get("scenarios", None) | |
if scenarios is None: | |
await cl.Message(content="No scenarios found.").send() | |
return | |
scenario_actions = [] | |
for idx, row in scenarios.iterrows(): | |
if row['Opportunity Description'] != "": | |
scenario_action = cl.Action( | |
name="Scenario", | |
value=f"{idx}", # Send the row index as value | |
description=f"{row['Customer Name']}: {row['Opportunity Name']} ({row['Opportunity Stage']}) " | |
f"Value: {row['Opportunity Value']}. Meeting with {row['Customer Contact']} " | |
f"({row['Customer Contact Role']})" | |
) | |
scenario_actions.append(scenario_action) | |
await cl.Message(content="Select a scenario (hover for details):", actions=scenario_actions).send() | |
async def on_action(action): | |
await cl.Message(content="Please wait, I am gathering information...").send() | |
index = int(action.value) | |
scenarios = cl.user_session.get("scenarios", None) | |
if scenarios is None: | |
await cl.Message(content="No scenarios found.").send() | |
return | |
await cl.Message(content="...gathering scenario information").send() | |
await asyncio.sleep(1) | |
await cl.Message(content="...creating questions").send() | |
await asyncio.sleep(1) | |
selected_scenario = scenarios.iloc[index] | |
this_session = cl.user_session.get("session", None) | |
this_session.add_scenario_info(selected_scenario) | |
get_customer_background(this_session, selected_scenario['Customer Name']) | |
this_session.questions = get_questions(this_session.opportunity.stage, this_session.num_questions) | |
opening_message = this_session.get_opening() | |
await cl.Message(content=opening_message).send() | |
start_actions = [ | |
cl.Action(name="Start Scenario", value="start_scenario", description="Start Scenario"), | |
] | |
await cl.Message(content="Click to start scenario", actions=start_actions).send() | |
async def on_action(action): | |
this_session = cl.user_session.get("session", None) | |
start_time = datetime.now() | |
this_session.start_time = start_time | |
output = f"{this_session.customer.contact_name} joins the zoom call" | |
print(output) | |
await cl.Message(content=output).send() | |
async def on_action(action): | |
this_session = cl.user_session.get("session", None) | |
if this_session is None: | |
await cl.Message(content="No session found.").send() | |
return | |
await display_evaluation_results(cl, this_session) | |
async def on_action(action): | |
this_session = cl.user_session.get("session", None) | |
if this_session is None: | |
await cl.Message(content="No session found.").send() | |
return | |
await display_llm_responses(cl, this_session) | |
############################################# | |
### On Chat Start (Session Start) Section ### | |
############################################# | |
async def on_chat_start(): | |
this_session = SessionState() | |
get_company_data(this_session) | |
cl.user_session.set("session", this_session) | |
cl.user_session.set("message_count", 0) | |
cl.user_session.set("follow_up_questions", 0) | |
chat_prompt = get_chat_prompt() | |
chat_model = ChatOpenAI(model=llm_model) | |
simple_chain = chat_prompt | chat_model | |
cl.user_session.set("chain", simple_chain) | |
welcome_message = f"**Welcome to {this_session.company.name} SalesBuddy**\n*Home of {this_session.company.product}*" | |
await cl.Message(content=welcome_message).send() | |
await cl.Message(content=this_session.company.product_summary).send() | |
scenarios = get_opportunities() | |
cl.user_session.set("scenarios", scenarios) | |
scenarios = cl.user_session.get("scenarios", None) | |
if scenarios is None: | |
await cl.Message(content="No scenarios found.").send() | |
return | |
scenario_actions = [] | |
for idx, row in scenarios.iterrows(): | |
if row['Opportunity Description'] != "": | |
scenario_action = cl.Action( | |
name="Scenario", | |
value=f"{idx}", # Send the row index as value | |
description=f"{row['Customer Name']}: {row['Opportunity Name']} ({row['Opportunity Stage']}) " | |
f"Value: {row['Opportunity Value']}. Meeting with {row['Customer Contact']} " | |
f"({row['Customer Contact Role']})" | |
) | |
scenario_actions.append(scenario_action) | |
await cl.Message(content="Select a scenario (hover for details):", actions=scenario_actions).send() | |
######################################################### | |
### On Message Section - called for each User Message ### | |
######################################################### | |
async def main(message: cl.Message): | |
content = message.content.strip() | |
this_session = cl.user_session.get("session", None) | |
if content.startswith('!'): | |
# Tis is a control message | |
print("Received control message:", content[1:]) | |
await handle_control_message(content[1:]) | |
else: | |
if this_session.status == "active": | |
chain = cl.user_session.get("chain") | |
history = cl.user_session.get("history", []) | |
history.append({"role": "user", "content": message}) | |
this_session.previous_answer = message.content | |
prompt_parm = prepare_chain_parameters(this_session, message, history) | |
this_session.queries.append(prompt_parm) | |
response_content = chain.invoke(prompt_parm) | |
json_str = response_content.content.strip('```json\n').strip('```') | |
try: | |
this_response = json.loads(json_str) | |
except json.JSONDecodeError as e: | |
print(f"JSON Decode Error: {e}") | |
print(response_content.content) | |
print(f"Error at position {e.pos}: {json_str[max(0, e.pos-10):e.pos+10]}") | |
this_response = {"Response": "Error receiving response from LLM"} | |
llm_response = this_response.get("Response", "No response from LLM") | |
print("LLM Response:") | |
print(llm_response) | |
this_session.llm_responses.append(this_response) | |
print("Next question:") | |
print(this_response.get("Question", "No question")) | |
if this_session.question != "": | |
this_session.responses.append({ | |
"question_number": this_session.current_question_index, | |
"question": this_session.question, | |
"response": this_session.rep_answer, | |
"ground_truth": this_session.ground_truth, | |
"response_score": this_response.get("Score", "No score"), | |
"response_evaluation": this_response.get("Evaluation", "No evaluation"), | |
"mood_score": this_response.get("Mood Score", "No mood score"), | |
"overall_score": this_response.get("Overall Score", "No overall score"), | |
"overall_evaluation": this_response.get("Overall Evaluation", "No overall evaluation"), | |
}) | |
message_to_rep = llm_response + "\n\n" + this_response.get("Question", "No question") | |
if this_session.do_voice: | |
print(f"Voice Response: {message_to_rep}") | |
else: | |
await cl.Message(message_to_rep).send() | |
# await cl.Message(this_response).send() | |
history.append({"role": "assistant", "content": response_content}) | |
cl.user_session.set("history", history) | |
this_session.current_question_index += 1 | |
if this_session.current_question_index > len(this_session.questions): | |
this_session.status = "complete" | |
end_time = datetime.now() | |
duration = end_time - this_session.start_time | |
duration_minutes = round(duration.total_seconds() / 60) | |
this_session.end_time = end_time | |
this_session.duration_minutes = duration_minutes | |
if this_session.do_evaluation: | |
await display_evaluation_results(cl, this_session) | |
else: | |
evaluate_actions = [ | |
cl.Action(name="Evaluate Performance", value="evaluate", description="Evaluate Performance"), | |
cl.Action(name="Display Queries and Responses", value="display_llm_responses", description="Display LLM Responses") | |
] | |
await cl.Message(content="Click to evaluate", actions=evaluate_actions).send() | |
#############################################
### Support Functions
#############################################
# def display_responses(responses):
#     table_data = []
#     for resp in responses:
#         table_data.append([
#             resp["question_number"],
#             resp["question"],
#             resp["response"],
#             resp["ground_truth"],
#             resp["response_score"],
#             resp["response_evaluation"],
#             resp["mood_score"],
#             resp["overall_evaluation"]
#         ])
#     return table_data
def prepare_chain_parameters(this_session, message, history): | |
message = message.content | |
previous_question = "" | |
rep_answer = "" | |
next_question = "" | |
ground_truth = "" | |
command = "" | |
if this_session.current_question_index == 0: | |
previous_question = "" | |
rep_answer = "" | |
ground_truth = "" | |
next_question = this_session.questions[this_session.current_question_index]["question"] | |
command = "You should greet the rep" | |
elif this_session.current_question_index >= len(this_session.questions): | |
next_question = "" | |
previous_question = this_session.questions[this_session.current_question_index - 1]["question"] | |
rep_answer = this_session.previous_answer | |
ground_truth = this_session.questions[this_session.current_question_index - 1]["ground_truth"] | |
command = """Thank the customer, offer a comment on the answer and overall performance. | |
Conclude the conversation with a summary and give a farewell. | |
If the answers were good, give a positive farewell and offer a follow up meeting. | |
If the answers were poor, give a poor farewell. | |
You can add additional comments as needed. | |
""" | |
else: | |
previous_question = this_session.questions[this_session.current_question_index - 1]["question"] | |
rep_answer = this_session.previous_answer | |
next_question = this_session.questions[this_session.current_question_index]["question"] | |
ground_truth = this_session.questions[this_session.current_question_index]["ground_truth"] | |
command = "You should respond to the answer based on how well the rep answered the previous question." | |
this_session.ground_truth = ground_truth | |
this_session.question = previous_question | |
this_session.rep_answer = rep_answer | |
print("--------------------------------") | |
print(f"Message: {message}") | |
print("Sending the following:") | |
print(f"Command: {command}") | |
print(f"Previous question: {previous_question}") | |
print(f"Rep answer: {rep_answer}") | |
print(f"Next question: {next_question}") | |
rep_company_details = f""" | |
Name: {this_session.company.name} | |
Description: {this_session.company.description} | |
Product: {this_session.company.product} | |
Product Summary: {this_session.company.product_summary} | |
Product Description: {this_session.company.product_description} | |
""" | |
company_details = f""" | |
Name: {this_session.customer.name} | |
Description: {this_session.customer.background} | |
""" | |
scenario = f""" | |
Opportunity Name: {this_session.opportunity.name} | |
Opportunity Description: {this_session.opportunity.description} | |
Opportunity Stage: {this_session.opportunity.stage} | |
Opportunity Value: {this_session.opportunity.value} | |
Opportunity Close Date: {this_session.opportunity.close_date} | |
""" | |
parm = {"conversation_mode": this_session.qa_mode, | |
"message": message, | |
"name": this_session.customer.contact_name, | |
"company": company_details, | |
"role": this_session.customer.contact_role, | |
"sales_rep": "Tony Snell", | |
"rep_company": rep_company_details, | |
"attitude": this_session.attitude, | |
"mood_score": this_session.mood_score, | |
"scenario": scenario, | |
"stage": this_session.opportunity.stage, | |
"previous_question": previous_question, | |
"next_question": next_question, | |
"rep_answer": rep_answer, | |
"conversation_history": history, | |
"command": command, | |
} | |
return parm | |
async def handle_control_message(command: str): | |
command_parts = command.split() | |
main_command = command_parts[0].lower() | |
if main_command == 'start': | |
await cl.Message(content="Starting new session...").send() | |
elif main_command == 'stop': | |
this_session = cl.user_session.get("session") | |
end_time = datetime.now() | |
duration = end_time - this_session.start_time | |
duration_minutes = round(duration.total_seconds() / 60) | |
this_session.end_time = end_time | |
this_session.duration_minutes = duration_minutes | |
await cl.Message(content=f"Ending current session after {this_session.duration_minutes} minutes").send() | |
elif main_command == 'pause': | |
await cl.Message(content="Ending current session...").send() | |
elif main_command == 'time': | |
this_session = cl.user_session.get("session") | |
duration = this_session.get_session_duration() | |
await cl.Message(content=f"Current session duration: {duration}").send() | |
else: | |
await cl.Message(content=f"Unknown command: {main_command}").send() | |