"""Chainlit entry point for the SalesBuddy sales-call simulator.

Registers the Chainlit action callbacks, the chat-start handler and the
per-message handler, plus the helpers that build the LLM prompt parameters
and process ``!``-prefixed control messages.
"""
import asyncio
import json
import os
from datetime import datetime

import chainlit as cl
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI

from classes import SessionState
from utils_data import (
    get_company_data,
    get_customer_background,
    get_opportunities,
    get_questions,
)
from utils_output import display_evaluation_results, display_llm_responses
from utils_prompt import get_chat_prompt, get_system_template, get_user_template

# Model used for the conversation chain.
# Alternatives tried: "gpt-4o", "gpt-3.5-turbo", "gpt-4o-2024-08-06".
llm_model = "gpt-4o-mini"

# Name of the sales rep passed to the prompt as "sales_rep".
SALES_REP_NAME = "Tony Snell"

# Payload substituted when the LLM reply cannot be used as a JSON object.
_LLM_ERROR_RESPONSE = "Error receiving response from LLM"

load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")

user_template = get_user_template()
system_template = get_system_template()


#############################################
# Shared helpers
#############################################
async def _send_scenario_menu():
    """Load the opportunities, cache them in the user session and offer them.

    One "Scenario" action is created per row whose 'Opportunity Description'
    is not the empty string. The action value is the row's *position* in the
    frame, so the "Scenario" callback can resolve it with ``.iloc`` regardless
    of the frame's index labels.
    """
    scenarios = get_opportunities()
    cl.user_session.set("scenarios", scenarios)
    if scenarios is None:
        await cl.Message(content="No scenarios found.").send()
        return

    scenario_actions = []
    for position, (_, row) in enumerate(scenarios.iterrows()):
        if row["Opportunity Description"] == "":
            continue
        scenario_actions.append(
            cl.Action(
                name="Scenario",
                value=str(position),  # Send the row position as value
                description=(
                    f"{row['Customer Name']}: {row['Opportunity Name']} "
                    f"({row['Opportunity Stage']}) "
                    f"Value: {row['Opportunity Value']}. "
                    f"Meeting with {row['Customer Contact']} "
                    f"({row['Customer Contact Role']})"
                ),
            )
        )
    await cl.Message(
        content="Select a scenario (hover for details):",
        actions=scenario_actions,
    ).send()


def _record_session_end(this_session) -> bool:
    """Stamp ``end_time`` and ``duration_minutes`` on the session.

    Returns:
        True when the timing was recorded, False when the session has no
        ``start_time`` datetime (nothing is modified in that case).
    """
    start_time = getattr(this_session, "start_time", None)
    if not isinstance(start_time, datetime):
        return False
    end_time = datetime.now()
    this_session.end_time = end_time
    this_session.duration_minutes = round(
        (end_time - start_time).total_seconds() / 60
    )
    return True


def _strip_code_fence(text: str) -> str:
    """Remove a surrounding Markdown code fence (``` or ```json) from *text*.

    Only the fence markers themselves are removed. ``str.strip`` with a
    character set is deliberately avoided because it would also eat leading or
    trailing 'j', 's', 'o', 'n' characters of unfenced content.
    """
    cleaned = text.strip()
    if cleaned.startswith("```"):
        cleaned = cleaned[3:]
        if cleaned[:4].lower() == "json":
            cleaned = cleaned[4:]
    if cleaned.endswith("```"):
        cleaned = cleaned[:-3]
    return cleaned.strip()


def _parse_llm_json(raw_content: str) -> dict:
    """Decode the LLM reply into a dict, falling back to an error payload.

    The fallback ``{"Response": "Error receiving response from LLM"}`` is
    returned when the reply is not valid JSON or decodes to a non-object.
    """
    json_str = _strip_code_fence(raw_content)
    try:
        parsed = json.loads(json_str)
    except json.JSONDecodeError as e:
        print(f"JSON Decode Error: {e}")
        print(raw_content)
        print(f"Error at position {e.pos}: {json_str[max(0, e.pos-10):e.pos+10]}")
        return {"Response": _LLM_ERROR_RESPONSE}
    if not isinstance(parsed, dict):
        # The handler reads fields with .get(), so anything but an object is unusable.
        print(f"Unexpected JSON payload type from LLM: {type(parsed).__name__}")
        return {"Response": _LLM_ERROR_RESPONSE}
    return parsed


#############################################
# Action callbacks
#############################################
@cl.action_callback("Run a Scenario")
async def on_run_scenario(action):
    """Reload the opportunities and show the scenario selection menu."""
    await _send_scenario_menu()


@cl.action_callback("Scenario")
async def on_scenario_selected(action):
    """Load the chosen scenario into the session and offer to start it.

    ``action.value`` is expected to hold the row position emitted by the
    scenario menu.
    """
    await cl.Message(content="Please wait, I am gathering information...").send()

    scenarios = cl.user_session.get("scenarios", None)
    if scenarios is None:
        await cl.Message(content="No scenarios found.").send()
        return
    this_session = cl.user_session.get("session", None)
    if this_session is None:
        await cl.Message(content="No session found.").send()
        return
    try:
        selected_scenario = scenarios.iloc[int(action.value)]
    except (TypeError, ValueError, IndexError):
        await cl.Message(content="Unknown scenario selected.").send()
        return

    # Short pauses between the progress messages.
    await cl.Message(content="...gathering scenario information").send()
    await asyncio.sleep(1)
    await cl.Message(content="...creating questions").send()
    await asyncio.sleep(1)

    this_session.add_scenario_info(selected_scenario)
    get_customer_background(this_session, selected_scenario["Customer Name"])
    this_session.questions = get_questions(
        this_session.opportunity.stage, this_session.num_questions
    )

    await cl.Message(content=this_session.get_opening()).send()
    start_actions = [
        cl.Action(
            name="Start Scenario",
            value="start_scenario",
            description="Start Scenario",
        ),
    ]
    await cl.Message(content="Click to start scenario", actions=start_actions).send()


@cl.action_callback("Start Scenario")
async def on_start_scenario(action):
    """Record the start time and announce that the contact joined the call."""
    this_session = cl.user_session.get("session", None)
    if this_session is None:
        await cl.Message(content="No session found.").send()
        return
    this_session.start_time = datetime.now()
    output = f"{this_session.customer.contact_name} joins the zoom call"
    print(output)
    await cl.Message(content=output).send()


@cl.action_callback("Evaluate Performance")
async def on_evaluate_performance(action):
    """Display the evaluation results for the current session."""
    this_session = cl.user_session.get("session", None)
    if this_session is None:
        await cl.Message(content="No session found.").send()
        return
    await display_evaluation_results(cl, this_session)


@cl.action_callback("Display Queries and Responses")
async def on_display_llm_responses(action):
    """Display the recorded LLM queries and responses for the session."""
    this_session = cl.user_session.get("session", None)
    if this_session is None:
        await cl.Message(content="No session found.").send()
        return
    await display_llm_responses(cl, this_session)


#############################################
### On Chat Start (Session Start) Section ###
#############################################
@cl.on_chat_start
async def on_chat_start():
    """Create the session state and chain, greet the user, show scenarios."""
    this_session = SessionState()
    get_company_data(this_session)
    cl.user_session.set("session", this_session)
    cl.user_session.set("message_count", 0)
    cl.user_session.set("follow_up_questions", 0)

    chat_prompt = get_chat_prompt()
    chat_model = ChatOpenAI(model=llm_model)
    simple_chain = chat_prompt | chat_model
    cl.user_session.set("chain", simple_chain)

    welcome_message = (
        f"**Welcome to {this_session.company.name} SalesBuddy**\n"
        f"*Home of {this_session.company.product}*"
    )
    await cl.Message(content=welcome_message).send()
    await cl.Message(content=this_session.company.product_summary).send()

    await _send_scenario_menu()


#########################################################
### On Message Section - called for each User Message ###
#########################################################
@cl.on_message
async def main(message: cl.Message):
    """Handle one user message.

    Messages starting with ``!`` are routed to ``handle_control_message``.
    Otherwise, while the session status is "active", the message is sent
    through the chain, the JSON reply is parsed and recorded, the reply is
    shown to the rep, and the question index is advanced. Once the index
    exceeds the number of questions the session is marked "complete" and the
    evaluation is either displayed or offered through actions.
    """
    content = message.content.strip()
    if content.startswith("!"):
        # This is a control message
        print("Received control message:", content[1:])
        await handle_control_message(content[1:])
        return

    this_session = cl.user_session.get("session", None)
    if this_session is None:
        await cl.Message(content="No session found.").send()
        return
    if this_session.status != "active":
        return

    chain = cl.user_session.get("chain")
    history = cl.user_session.get("history", [])
    # Store the message text, not the Message object, so the history is
    # meaningful when rendered into the prompt.
    history.append({"role": "user", "content": message.content})
    this_session.previous_answer = message.content

    prompt_parm = prepare_chain_parameters(this_session, message, history)
    this_session.queries.append(prompt_parm)

    # Await the async variant so the LLM call does not block the event loop.
    response_content = await chain.ainvoke(prompt_parm)
    this_response = _parse_llm_json(response_content.content)

    llm_response = this_response.get("Response", "No response from LLM")
    next_question = this_response.get("Question", "No question")
    print("LLM Response:")
    print(llm_response)
    this_session.llm_responses.append(this_response)
    print("Next question:")
    print(next_question)

    # The first turn is the greeting and has no previous question to score.
    if this_session.question != "":
        this_session.responses.append({
            "question_number": this_session.current_question_index,
            "question": this_session.question,
            "response": this_session.rep_answer,
            "ground_truth": this_session.ground_truth,
            "response_score": this_response.get("Score", "No score"),
            "response_evaluation": this_response.get("Evaluation", "No evaluation"),
            "mood_score": this_response.get("Mood Score", "No mood score"),
            "overall_score": this_response.get("Overall Score", "No overall score"),
            "overall_evaluation": this_response.get(
                "Overall Evaluation", "No overall evaluation"
            ),
        })

    message_to_rep = f"{llm_response}\n\n{next_question}"
    if this_session.do_voice:
        print(f"Voice Response: {message_to_rep}")
    else:
        await cl.Message(content=message_to_rep).send()

    history.append({"role": "assistant", "content": response_content.content})
    cl.user_session.set("history", history)

    this_session.current_question_index += 1
    if this_session.current_question_index > len(this_session.questions):
        this_session.status = "complete"
        _record_session_end(this_session)
        if this_session.do_evaluation:
            await display_evaluation_results(cl, this_session)
        else:
            evaluate_actions = [
                cl.Action(
                    name="Evaluate Performance",
                    value="evaluate",
                    description="Evaluate Performance",
                ),
                cl.Action(
                    name="Display Queries and Responses",
                    value="display_llm_responses",
                    description="Display LLM Responses",
                ),
            ]
            await cl.Message(
                content="Click to evaluate", actions=evaluate_actions
            ).send()


#############################################
### Support Functions
#############################################
def prepare_chain_parameters(this_session, message, history):
    """Build the parameter dict passed to the chain for the current turn.

    The turn type is derived from ``this_session.current_question_index``:
    index 0 is the greeting, an index at or past the end of the question list
    is the closing turn, and anything else is a regular question turn.

    Side effects: sets ``ground_truth``, ``question`` and ``rep_answer`` on
    *this_session* so the caller can record them with the LLM's evaluation.

    Args:
        this_session: Session state holding questions, company, customer and
            opportunity details.
        message: Incoming message object; only its ``content`` is read.
        history: Conversation history so far; a snapshot of it is stored under
            "conversation_history".

    Returns:
        dict of prompt parameters.
    """
    message_text = message.content
    questions = this_session.questions
    index = this_session.current_question_index

    previous_question = ""
    rep_answer = ""
    next_question = ""
    ground_truth = ""

    if index == 0:
        next_question = questions[index]["question"]
        command = "You should greet the rep"
    elif index >= len(questions):
        answered = questions[index - 1]
        previous_question = answered["question"]
        rep_answer = this_session.previous_answer
        ground_truth = answered["ground_truth"]
        # NOTE(review): "Thank the customer" may have been meant as "Thank the
        # rep"; confirm against the prompt template before changing the text.
        command = (
            "Thank the customer, offer a comment on the answer and overall performance. "
            "Conclude the conversation with a summary and give a farewell. "
            "If the answers were good, give a positive farewell and offer a follow up meeting. "
            "If the answers were poor, give a poor farewell. "
            "You can add additional comments as needed. "
        )
    else:
        answered = questions[index - 1]
        previous_question = answered["question"]
        rep_answer = this_session.previous_answer
        next_question = questions[index]["question"]
        # The ground truth must belong to the question the rep just answered,
        # because it is recorded alongside previous_question and rep_answer.
        ground_truth = answered["ground_truth"]
        command = "You should respond to the answer based on how well the rep answered the previous question."

    this_session.ground_truth = ground_truth
    this_session.question = previous_question
    this_session.rep_answer = rep_answer

    print("--------------------------------")
    print(f"Message: {message_text}")
    print("Sending the following:")
    print(f"Command: {command}")
    print(f"Previous question: {previous_question}")
    print(f"Rep answer: {rep_answer}")
    print(f"Next question: {next_question}")

    company = this_session.company
    customer = this_session.customer
    opportunity = this_session.opportunity

    rep_company_details = (
        f"\nName: {company.name}\n"
        f"Description: {company.description}\n"
        f"Product: {company.product}\n"
        f"Product Summary: {company.product_summary}\n"
        f"Product Description: {company.product_description}\n"
    )
    company_details = (
        f"\nName: {customer.name}\n"
        f"Description: {customer.background}\n"
    )
    scenario = (
        f"\nOpportunity Name: {opportunity.name}\n"
        f"Opportunity Description: {opportunity.description}\n"
        f"Opportunity Stage: {opportunity.stage}\n"
        f"Opportunity Value: {opportunity.value}\n"
        f"Opportunity Close Date: {opportunity.close_date}\n"
    )

    return {
        "conversation_mode": this_session.qa_mode,
        "message": message_text,
        "name": customer.contact_name,
        "company": company_details,
        "role": customer.contact_role,
        "sales_rep": SALES_REP_NAME,
        "rep_company": rep_company_details,
        "attitude": this_session.attitude,
        "mood_score": this_session.mood_score,
        "scenario": scenario,
        "stage": opportunity.stage,
        "previous_question": previous_question,
        "next_question": next_question,
        "rep_answer": rep_answer,
        # Snapshot: the caller keeps appending to `history`, and this dict is
        # also stored in the session's query log.
        "conversation_history": list(history),
        "command": command,
    }


async def handle_control_message(command: str):
    """Process a control command (the message text after the leading ``!``).

    Recognised commands, matched case-insensitively on the first word:
    ``start``, ``stop``, ``pause`` and ``time``. Anything else, including an
    empty command, is reported back to the user.
    """
    command_parts = command.split()
    if not command_parts:
        await cl.Message(content="No command given.").send()
        return
    main_command = command_parts[0].lower()

    if main_command == "start":
        await cl.Message(content="Starting new session...").send()
    elif main_command == "stop":
        this_session = cl.user_session.get("session")
        if this_session is None:
            await cl.Message(content="No session found.").send()
        elif _record_session_end(this_session):
            await cl.Message(
                content=f"Ending current session after {this_session.duration_minutes} minutes"
            ).send()
        else:
            await cl.Message(content="No scenario is currently running.").send()
    elif main_command == "pause":
        await cl.Message(content="Pausing current session...").send()
    elif main_command == "time":
        this_session = cl.user_session.get("session")
        if this_session is None:
            await cl.Message(content="No session found.").send()
            return
        duration = this_session.get_session_duration()
        await cl.Message(content=f"Current session duration: {duration}").send()
    else:
        await cl.Message(content=f"Unknown command: {main_command}").send()