sidekick_poc / app.py
rchrdgwr's picture
MVP
dacc583
raw
history blame
15.2 kB
import asyncio
import chainlit as cl
import json
import os
from classes import SessionState
from datetime import datetime
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from utils_data import get_company_data, get_opportunities, get_questions, get_customer_background
from utils_prompt import get_user_template, get_system_template
from utils_prompt import get_chat_prompt
from utils_output import display_evaluation_results, display_llm_responses
# Name of the OpenAI chat model used when the conversation chain is built.
# The commented-out alternatives below can be swapped in for experiments.
llm_model = "gpt-4o-mini"
# llm_model = "gpt-4o"
# llm_model = "gpt-3.5-turbo"
# llm_model = "gpt-4o-2024-08-06"
# Load variables from a local .env file into the process environment before
# the API key is read on the next line.
load_dotenv()
# None when OPENAI_API_KEY is not set. Not referenced again in the visible
# code -- presumably the OpenAI client reads the key from the environment.
openai_api_key = os.getenv("OPENAI_API_KEY")
# Prompt templates provided by utils_prompt; not referenced again in the
# visible code.
user_template = get_user_template()
system_template = get_system_template()
#############################################
# Action callbacks
#############################################
@cl.action_callback("Run a Scenario")
async def on_action(action):
    """Offer every available sales scenario as a clickable action.

    Loads the opportunities table, caches it in the user session under
    ``"scenarios"`` and sends one ``"Scenario"`` action for each row whose
    ``'Opportunity Description'`` is not empty.

    Args:
        action: The Chainlit action that triggered this callback (unused).
    """
    scenarios = get_opportunities()
    cl.user_session.set("scenarios", scenarios)
    if scenarios is None:
        await cl.Message(content="No scenarios found.").send()
        return

    scenario_actions = []
    # The "Scenario" callback resolves the value with ``iloc``, which is
    # positional, so send the row position rather than the index label.
    for position, (_, row) in enumerate(scenarios.iterrows()):
        if row['Opportunity Description'] == "":
            continue
        scenario_actions.append(
            cl.Action(
                name="Scenario",
                value=f"{position}",
                description=f"{row['Customer Name']}: {row['Opportunity Name']} ({row['Opportunity Stage']}) "
                            f"Value: {row['Opportunity Value']}. Meeting with {row['Customer Contact']} "
                            f"({row['Customer Contact Role']})"
            )
        )
    await cl.Message(content="Select a scenario (hover for details):", actions=scenario_actions).send()
@cl.action_callback("Scenario")
async def on_action(action):
    """Prepare the session for the scenario the user selected.

    Reads the cached scenarios table and the session state from the user
    session, copies the chosen row into the session, loads the customer
    background and the question list, then sends the opening message and a
    "Start Scenario" action.

    Args:
        action: The Chainlit action; ``action.value`` holds the row position
            of the chosen scenario as a string.
    """
    await cl.Message(content="Please wait, I am gathering information...").send()
    scenarios = cl.user_session.get("scenarios", None)
    if scenarios is None:
        await cl.Message(content="No scenarios found.").send()
        return

    this_session = cl.user_session.get("session", None)
    if this_session is None:
        await cl.Message(content="No session found.").send()
        return

    # Reject values that are not a valid row position instead of letting
    # ``int()`` or ``iloc`` raise inside the callback.
    try:
        index = int(action.value)
    except (TypeError, ValueError):
        index = -1
    if not 0 <= index < len(scenarios):
        await cl.Message(content="Selected scenario is not available.").send()
        return

    # Progress messages with short pauses between them.
    await cl.Message(content="...gathering scenario information").send()
    await asyncio.sleep(1)
    await cl.Message(content="...creating questions").send()
    await asyncio.sleep(1)

    selected_scenario = scenarios.iloc[index]
    this_session.add_scenario_info(selected_scenario)
    get_customer_background(this_session, selected_scenario['Customer Name'])
    this_session.questions = get_questions(this_session.opportunity.stage, this_session.num_questions)

    opening_message = this_session.get_opening()
    await cl.Message(content=opening_message).send()
    start_actions = [
        cl.Action(name="Start Scenario", value="start_scenario", description="Start Scenario"),
    ]
    await cl.Message(content="Click to start scenario", actions=start_actions).send()
@cl.action_callback("Start Scenario")
async def on_action(action):
    """Record the scenario start time and announce the customer contact.

    Args:
        action: The Chainlit action that triggered this callback (unused).
    """
    this_session = cl.user_session.get("session", None)
    # Same guard as the evaluation callbacks: without a session there is
    # nothing to start.
    if this_session is None:
        await cl.Message(content="No session found.").send()
        return

    this_session.start_time = datetime.now()
    output = f"{this_session.customer.contact_name} joins the zoom call"
    print(output)
    await cl.Message(content=output).send()
@cl.action_callback("Evaluate Performance")
async def on_action(action):
    """Show the evaluation results for the current session, if there is one.

    Args:
        action: The Chainlit action that triggered this callback (unused).
    """
    session_state = cl.user_session.get("session", None)
    if session_state is not None:
        await display_evaluation_results(cl, session_state)
        return
    # No session stored for this user: report it instead of evaluating.
    await cl.Message(content="No session found.").send()
@cl.action_callback("Display Queries and Responses")
async def on_action(action):
    """Show the recorded LLM queries and responses for the current session.

    Args:
        action: The Chainlit action that triggered this callback (unused).
    """
    session_state = cl.user_session.get("session", None)
    if session_state is not None:
        await display_llm_responses(cl, session_state)
        return
    # No session stored for this user: report it instead of displaying.
    await cl.Message(content="No session found.").send()
#############################################
### On Chat Start (Session Start) Section ###
#############################################
@cl.on_chat_start
async def on_chat_start():
    """Initialise a new chat session and offer the available scenarios.

    Creates the session state, loads the company data into it, builds the
    prompt-plus-model chain, stores all of these in the user session, sends
    the welcome messages and finally sends one ``"Scenario"`` action for
    each opportunity row whose ``'Opportunity Description'`` is not empty.
    """
    this_session = SessionState()
    get_company_data(this_session)
    cl.user_session.set("session", this_session)
    cl.user_session.set("message_count", 0)
    cl.user_session.set("follow_up_questions", 0)

    # Prompt piped into the chat model; reused for every user message.
    chat_prompt = get_chat_prompt()
    chat_model = ChatOpenAI(model=llm_model)
    simple_chain = chat_prompt | chat_model
    cl.user_session.set("chain", simple_chain)

    welcome_message = f"**Welcome to {this_session.company.name} SalesBuddy**\n*Home of {this_session.company.product}*"
    await cl.Message(content=welcome_message).send()
    await cl.Message(content=this_session.company.product_summary).send()

    scenarios = get_opportunities()
    cl.user_session.set("scenarios", scenarios)
    if scenarios is None:
        await cl.Message(content="No scenarios found.").send()
        return

    scenario_actions = []
    # The "Scenario" callback resolves the value with ``iloc``, which is
    # positional, so send the row position rather than the index label.
    for position, (_, row) in enumerate(scenarios.iterrows()):
        if row['Opportunity Description'] == "":
            continue
        scenario_actions.append(
            cl.Action(
                name="Scenario",
                value=f"{position}",
                description=f"{row['Customer Name']}: {row['Opportunity Name']} ({row['Opportunity Stage']}) "
                            f"Value: {row['Opportunity Value']}. Meeting with {row['Customer Contact']} "
                            f"({row['Customer Contact Role']})"
            )
        )
    await cl.Message(content="Select a scenario (hover for details):", actions=scenario_actions).send()
#########################################################
### On Message Section - called for each User Message ###
#########################################################
def _extract_json_payload(raw_text):
    """Return *raw_text* without a surrounding Markdown code fence.

    ``str.strip`` with a multi-character argument removes a *set of
    characters*, not a prefix or suffix, so the fence is removed explicitly
    here. Surrounding whitespace is dropped as well.
    """
    payload = raw_text.strip()
    if payload.startswith("```"):
        payload = payload[3:]
        # Optional language tag directly after the opening fence.
        if payload[:4].lower() == "json":
            payload = payload[4:]
    if payload.endswith("```"):
        payload = payload[:-3]
    return payload.strip()


@cl.on_message
async def main(message: cl.Message):
    """Handle one user message.

    Messages starting with ``!`` are forwarded to the control-message
    handler. Any other message is processed only while the session status
    is ``"active"``: the prompt parameters are built, the chain is invoked,
    the JSON reply is parsed and recorded on the session, and the reply
    plus the next question is shown to the rep. Once the question index has
    moved past the last question the session is marked complete and the
    evaluation is shown or offered.

    Args:
        message: The incoming Chainlit message.
    """
    content = message.content.strip()
    this_session = cl.user_session.get("session", None)

    if content.startswith('!'):
        # This is a control message
        print("Received control message:", content[1:])
        await handle_control_message(content[1:])
        return

    if this_session is None:
        await cl.Message(content="No session found.").send()
        return
    if this_session.status != "active":
        # Messages outside an active scenario are ignored.
        return

    chain = cl.user_session.get("chain")
    history = cl.user_session.get("history", [])
    # Keep plain text in the history so the prompt receives readable turns
    # rather than object representations.
    history.append({"role": "user", "content": message.content})
    this_session.previous_answer = message.content

    prompt_parm = prepare_chain_parameters(this_session, message, history)
    this_session.queries.append(prompt_parm)
    # Use the async entry point so the event loop is not blocked while the
    # model call is in flight.
    response_content = await chain.ainvoke(prompt_parm)

    json_str = _extract_json_payload(response_content.content)
    try:
        this_response = json.loads(json_str)
    except json.JSONDecodeError as e:
        print(f"JSON Decode Error: {e}")
        print(response_content.content)
        print(f"Error at position {e.pos}: {json_str[max(0, e.pos-10):e.pos+10]}")
        this_response = {"Response": "Error receiving response from LLM"}
    if not isinstance(this_response, dict):
        # Valid JSON that is not an object cannot be queried with ``.get``.
        print(f"Unexpected JSON payload: {this_response!r}")
        this_response = {"Response": "Error receiving response from LLM"}

    llm_response = this_response.get("Response", "No response from LLM")
    next_question_text = this_response.get("Question", "No question")
    print("LLM Response:")
    print(llm_response)
    this_session.llm_responses.append(this_response)
    print("Next question:")
    print(next_question_text)

    # On the greeting turn there is no previous question to score.
    if this_session.question != "":
        this_session.responses.append({
            "question_number": this_session.current_question_index,
            "question": this_session.question,
            "response": this_session.rep_answer,
            "ground_truth": this_session.ground_truth,
            "response_score": this_response.get("Score", "No score"),
            "response_evaluation": this_response.get("Evaluation", "No evaluation"),
            "mood_score": this_response.get("Mood Score", "No mood score"),
            "overall_score": this_response.get("Overall Score", "No overall score"),
            "overall_evaluation": this_response.get("Overall Evaluation", "No overall evaluation"),
        })

    message_to_rep = f"{llm_response}\n\n{next_question_text}"
    if this_session.do_voice:
        print(f"Voice Response: {message_to_rep}")
    else:
        await cl.Message(message_to_rep).send()

    history.append({"role": "assistant", "content": response_content.content})
    cl.user_session.set("history", history)

    this_session.current_question_index += 1
    if this_session.current_question_index <= len(this_session.questions):
        return

    # All questions asked and the closing turn delivered: finish up.
    this_session.status = "complete"
    end_time = datetime.now()
    duration = end_time - this_session.start_time
    this_session.end_time = end_time
    this_session.duration_minutes = round(duration.total_seconds() / 60)
    if this_session.do_evaluation:
        await display_evaluation_results(cl, this_session)
    else:
        evaluate_actions = [
            cl.Action(name="Evaluate Performance", value="evaluate", description="Evaluate Performance"),
            cl.Action(name="Display Queries and Responses", value="display_llm_responses", description="Display LLM Responses")
        ]
        await cl.Message(content="Click to evaluate", actions=evaluate_actions).send()
#############################################
### Support Functions
#############################################
# def display_responses(responses):
# table_data = []
# for resp in responses:
# table_data.append([
# resp["question_number"],
# resp["question"],
# resp["response"],
# resp["ground_truth"],
# resp["response_score"],
# resp["response_evaluation"],
# resp["mood_score"],
# resp["overall_evaluation"]
# ])
# return table_data
def prepare_chain_parameters(this_session, message, history):
    """Build the prompt variables for the next LLM turn.

    Chooses the previous question, the rep's answer, the next question and
    the instruction for the model from ``this_session.current_question_index``:

    * index 0: greeting turn, only the first question is supplied;
    * index at or past the last question: closing turn, no next question;
    * otherwise: respond to the previous answer and ask the next question.

    As a side effect the previous question, its ground truth and the rep's
    answer are stored on ``this_session`` (``question``, ``ground_truth``,
    ``rep_answer``), and a summary is printed.

    Args:
        this_session: Session state; the attributes read are ``questions``,
            ``current_question_index``, ``previous_answer``, ``company``,
            ``customer``, ``opportunity``, ``qa_mode``, ``attitude`` and
            ``mood_score``.
        message: Incoming message object; only ``message.content`` is used.
        history: Conversation history, passed through unchanged.

    Returns:
        dict: The parameters handed to the chain.
    """
    message = message.content
    questions = this_session.questions
    index = this_session.current_question_index

    previous_question = ""
    rep_answer = ""
    next_question = ""
    ground_truth = ""
    if index == 0:
        # Nothing has been answered yet; tolerate an empty question list.
        if questions:
            next_question = questions[0]["question"]
        command = "You should greet the rep"
    elif index >= len(questions):
        previous_question = questions[index - 1]["question"]
        rep_answer = this_session.previous_answer
        ground_truth = questions[index - 1]["ground_truth"]
        command = (
            "Thank the customer, offer a comment on the answer and overall performance.\n"
            "Conclude the conversation with a summary and give a farewell.\n"
            "If the answers were good, give a positive farewell and offer a follow up meeting.\n"
            "If the answers were poor, give a poor farewell.\n"
            "You can add additional comments as needed.\n"
        )
    else:
        previous_question = questions[index - 1]["question"]
        rep_answer = this_session.previous_answer
        next_question = questions[index]["question"]
        # The ground truth is stored next to the previous question and the
        # rep's answer to it, so it must come from that same question.
        ground_truth = questions[index - 1]["ground_truth"]
        command = "You should respond to the answer based on how well the rep answered the previous question."

    this_session.ground_truth = ground_truth
    this_session.question = previous_question
    this_session.rep_answer = rep_answer

    print("--------------------------------")
    print(f"Message: {message}")
    print("Sending the following:")
    print(f"Command: {command}")
    print(f"Previous question: {previous_question}")
    print(f"Rep answer: {rep_answer}")
    print(f"Next question: {next_question}")

    rep_company_details = (
        "\n"
        f"Name: {this_session.company.name}\n"
        f"Description: {this_session.company.description}\n"
        f"Product: {this_session.company.product}\n"
        f"Product Summary: {this_session.company.product_summary}\n"
        f"Product Description: {this_session.company.product_description}\n"
    )
    company_details = (
        "\n"
        f"Name: {this_session.customer.name}\n"
        f"Description: {this_session.customer.background}\n"
    )
    scenario = (
        "\n"
        f"Opportunity Name: {this_session.opportunity.name}\n"
        f"Opportunity Description: {this_session.opportunity.description}\n"
        f"Opportunity Stage: {this_session.opportunity.stage}\n"
        f"Opportunity Value: {this_session.opportunity.value}\n"
        f"Opportunity Close Date: {this_session.opportunity.close_date}\n"
    )

    parm = {
        "conversation_mode": this_session.qa_mode,
        "message": message,
        "name": this_session.customer.contact_name,
        "company": company_details,
        "role": this_session.customer.contact_role,
        "sales_rep": "Tony Snell",
        "rep_company": rep_company_details,
        "attitude": this_session.attitude,
        "mood_score": this_session.mood_score,
        "scenario": scenario,
        "stage": this_session.opportunity.stage,
        "previous_question": previous_question,
        "next_question": next_question,
        "rep_answer": rep_answer,
        "conversation_history": history,
        "command": command,
    }
    return parm
async def handle_control_message(command: str):
    """Execute a control command typed by the user (the text after ``!``).

    Recognised commands, matched case-insensitively on the first word:
    ``start``, ``stop``, ``pause`` and ``time``. Anything else is reported
    back as unknown.

    Args:
        command: The control text without the leading ``!``.
    """
    command_parts = command.split()
    if not command_parts:
        # A bare "!" carries no command word; avoid indexing an empty list.
        await cl.Message(content="No command given.").send()
        return

    main_command = command_parts[0].lower()
    if main_command == 'start':
        await cl.Message(content="Starting new session...").send()
    elif main_command == 'stop':
        this_session = cl.user_session.get("session")
        start_time = getattr(this_session, "start_time", None)
        if start_time is None:
            # No session, or the scenario was never started: no duration.
            await cl.Message(content="No session in progress.").send()
            return
        end_time = datetime.now()
        duration = end_time - start_time
        this_session.end_time = end_time
        this_session.duration_minutes = round(duration.total_seconds() / 60)
        await cl.Message(content=f"Ending current session after {this_session.duration_minutes} minutes").send()
    elif main_command == 'pause':
        # Previously reported "Ending current session...", which looked like
        # a copy of the 'stop' wording.
        await cl.Message(content="Pausing current session...").send()
    elif main_command == 'time':
        this_session = cl.user_session.get("session")
        if this_session is None:
            await cl.Message(content="No session found.").send()
            return
        duration = this_session.get_session_duration()
        await cl.Message(content=f"Current session duration: {duration}").send()
    else:
        await cl.Message(content=f"Unknown command: {main_command}").send()