# AutoGen_Playground / autogen_utils.py
# thinkall's picture
# Add get human input
# e11ba6d
# raw
# history blame
# 10.2 kB
import asyncio
import sys
import threading
import time
from ast import literal_eval
import autogen
import chromadb
import panel as pn
from autogen import Agent, AssistantAgent, UserProxyAgent
from autogen.agentchat.contrib.compressible_agent import CompressibleAgent
from autogen.agentchat.contrib.gpt_assistant_agent import GPTAssistantAgent
from autogen.agentchat.contrib.llava_agent import LLaVAAgent
from autogen.agentchat.contrib.math_user_proxy_agent import MathUserProxyAgent
from autogen.agentchat.contrib.retrieve_assistant_agent import RetrieveAssistantAgent
from autogen.agentchat.contrib.retrieve_user_proxy_agent import RetrieveUserProxyAgent
from autogen.agentchat.contrib.teachable_agent import TeachableAgent
from autogen.code_utils import extract_code
from configs import Q1, Q2, Q3, TIMEOUT, TITLE
from panel.widgets import TextAreaInput
try:
    from termcolor import colored
except ImportError:

    def colored(x, *_style_args, **_style_kwargs):
        """Fallback for a missing ``termcolor``: return ``x`` without styling.

        Any positional or keyword styling arguments are accepted and ignored.
        """
        return x
def get_retrieve_config(docs_path, model_name, collection_name):
    """Build the ``retrieve_config`` dictionary for a retrieval agent.

    Args:
        docs_path: Description of the documents to use. It is parsed with
            ``ast.literal_eval``, so text such as ``"['a.md', 'b.md']"`` or
            ``"'a.md'"`` becomes the list/str it denotes. A value that is not
            a valid Python literal (for example a bare path or URL typed
            without quotes) is used as given, with surrounding whitespace
            removed when it is a string.
        model_name: Value stored under the ``"model"`` key.
        collection_name: Value stored under the ``"collection_name"`` key.

    Returns:
        A dict with the keys ``docs_path``, ``chunk_token_size``, ``model``,
        ``embedding_model``, ``get_or_create``, ``client`` and
        ``collection_name``. ``client`` is a new
        ``chromadb.PersistentClient`` created with ``path=".chromadb"``.
    """
    try:
        parsed_docs_path = literal_eval(docs_path)
    except (ValueError, SyntaxError):
        # Not a Python literal (e.g. ./docs or https://host/file.md without
        # quotes): fall back to the raw value instead of failing.
        parsed_docs_path = docs_path.strip() if isinstance(docs_path, str) else docs_path

    return {
        "docs_path": parsed_docs_path,
        "chunk_token_size": 1000,
        "model": model_name,
        "embedding_model": "all-mpnet-base-v2",
        "get_or_create": True,
        "client": chromadb.PersistentClient(path=".chromadb"),
        "collection_name": collection_name,
    }
# autogen.ChatCompletion.start_logging()
def termination_msg(x):
    """Check if a message is a termination message.

    Args:
        x: The message to inspect. Only ``dict`` messages can be termination
            messages; any other type yields ``False``.

    Returns:
        ``True`` when the message's ``"content"``, converted to ``str``,
        upper-cased and stripped of surrounding whitespace and dots, starts
        or ends with ``"TERMINATE"``; ``False`` otherwise.
    """
    # Check the type first: calling .get() on a non-dict would raise before
    # the isinstance test could ever reject it.
    if not isinstance(x, dict):
        return False
    normalized = str(x.get("content", "")).upper().strip().strip(".")
    return normalized.endswith("TERMINATE") or normalized.startswith("TERMINATE")
def _is_termination_msg(message):
    """Decide whether ``message`` ends the conversation.

    The message terminates the conversation when ``extract_code`` finds no
    block tagged ``"python"`` in it. A dict message is reduced to its
    ``"content"`` first; a dict without content is never a termination
    message.
    """
    if isinstance(message, dict):
        message = message.get("content")
        if message is None:
            return False
    # todo: support more languages
    return not any(block[0] == "python" for block in extract_code(message))
def new_generate_oai_reply(
    self,
    messages=None,
    sender=None,
    config=None,
):
    """Generate a reply using autogen.oai, sending strictly alternating roles.

    The request is the agent's system message followed by the conversation.
    Every entry after the first gets its role rewritten by position: odd
    positions become ``"user"`` (an existing ``"function"`` role is kept),
    even positions become ``"assistant"``. If the resulting list has an odd
    length, a final user message asking for ``TERMINATE`` is appended.

    The rewrite is done on copies, so the roles stored in ``messages`` /
    ``self._oai_messages`` are left untouched. The ``"context"`` entry of the
    last message is still removed from the stored message and passed to
    ``client.create`` separately.

    Args:
        self: The agent; ``client``, ``_oai_messages`` and
            ``_oai_system_message`` are read from it.
        messages: Conversation to reply to. Defaults to
            ``self._oai_messages[sender]``.
        sender: Key used to look up the stored conversation.
        config: Client to use instead of ``self.client`` when not ``None``.

    Returns:
        ``(False, None)`` when no client is available, otherwise
        ``(True, reply)`` where ``reply`` is the first item returned by
        ``client.extract_text_or_function_call``.
    """
    client = self.client if config is None else config
    if client is None:
        return False, None
    if messages is None:
        messages = self._oai_messages[sender]

    # handle 336006 https://cloud.baidu.com/doc/WENXINWORKSHOP/s/tlmyncueh
    _context = messages[-1].pop("context", None)

    # Shallow-copy every message so the role rewrite below cannot leak into
    # the stored conversation history, whose dicts would otherwise be shared.
    _messages = [dict(msg) for msg in self._oai_system_message + messages]
    for idx, msg in enumerate(_messages):
        if idx == 0:
            # Position 0 is left as is (assumed to be the system message).
            continue
        if idx % 2 == 1:
            if msg.get("role") != "function":
                msg["role"] = "user"
        else:
            msg["role"] = "assistant"

    if len(_messages) % 2 == 1:
        _messages.append({"content": "Please reply exactly `TERMINATE` to me if the task is done.", "role": "user"})

    response = client.create(context=_context, messages=_messages)
    return True, client.extract_text_or_function_call(response)[0]
def initialize_agents(
    llm_config, agent_name, system_msg, agent_type, retrieve_config=None, code_execution_config=False
):
    """Create the agent whose class is named by ``agent_type``.

    ``RetrieveUserProxyAgent``, ``GPTAssistantAgent`` and
    ``CompressibleAgent`` are handled explicitly. Any other name is looked up
    in this module's globals: names containing ``"UserProxy"`` are built as
    user proxies (human input mode ``"TERMINATE"``), everything else as an
    LLM-backed agent (human input mode ``"NEVER"``). Every agent receives
    ``agent_name`` and ``termination_msg`` as its termination check.

    Args:
        llm_config: LLM configuration; its first ``config_list`` entry's
            ``"model"`` is also inspected for the ERNIE hack below.
        agent_name: Name given to the agent.
        system_msg: System message (``instructions`` for GPTAssistantAgent).
        agent_type: Class name of the agent to create.
        retrieve_config: Only used for ``RetrieveUserProxyAgent``.
        code_execution_config: Only used for the user-proxy style agents.

    Returns:
        The constructed agent.
    """
    done_prompt = "Please reply exactly `TERMINATE` to me if the task is done."

    # Pick the class and the type-specific keyword arguments first; the
    # arguments shared by every agent type are supplied once, further down.
    if agent_type == "RetrieveUserProxyAgent":
        agent_cls = RetrieveUserProxyAgent
        specific_kwargs = {
            "human_input_mode": "TERMINATE",
            "max_consecutive_auto_reply": 5,
            "retrieve_config": retrieve_config,
            # False means the agent will not execute code
            "code_execution_config": code_execution_config,
            "default_auto_reply": done_prompt,
        }
    elif agent_type == "GPTAssistantAgent":
        agent_cls = GPTAssistantAgent
        specific_kwargs = {
            "instructions": system_msg,
            "llm_config": llm_config,
        }
    elif agent_type == "CompressibleAgent":
        agent_cls = CompressibleAgent
        specific_kwargs = {
            "system_message": system_msg,
            "llm_config": llm_config,
            "compress_config": {
                "mode": "COMPRESS",
                # a large value makes compression less frequent
                "trigger_count": 600,
                # print the context before and after compression
                "verbose": True,
                "leave_last_n": 2,
            },
        }
    elif "UserProxy" in agent_type:
        agent_cls = globals()[agent_type]
        specific_kwargs = {
            "human_input_mode": "TERMINATE",
            "system_message": system_msg,
            "default_auto_reply": done_prompt,
            "max_consecutive_auto_reply": 5,
            "code_execution_config": code_execution_config,
        }
    else:
        agent_cls = globals()[agent_type]
        specific_kwargs = {
            "human_input_mode": "NEVER",
            "system_message": system_msg,
            "llm_config": llm_config,
        }

    agent = agent_cls(name=agent_name, is_termination_msg=termination_msg, **specific_kwargs)

    # Hack for ERNIE Bot models (only the first config entry is checked):
    # drop the last registered reply function and register
    # new_generate_oai_reply in its place.
    first_model = llm_config["config_list"][0]["model"]
    if "ernie" in first_model.lower():
        agent._reply_func_list.pop(-1)
        agent.register_reply([Agent, None], new_generate_oai_reply, -1)
    return agent
async def get_human_input(name, prompt: str, instance=None) -> str:
    """Get human input.

    Without ``instance`` the prompt is shown with the built-in ``input``.
    With ``instance`` a text area and a "submit" checkbox are sent to it, and
    the coroutine polls them every 0.1 seconds until the user has typed
    something and ticked the checkbox, or until ``TIMEOUT`` seconds pass.

    Args:
        name: User name under which the widgets and the timeout notice are
            sent to ``instance``. Unused when ``instance`` is ``None``.
        prompt: Prompt text (shown as the text area's placeholder).
        instance: Object with a ``send`` method that displays the widgets;
            ``None`` selects console input.

    Returns:
        The text entered by the user, or ``""`` when the timeout expired.
    """
    if instance is None:
        return input(prompt)

    get_input_widget = TextAreaInput(placeholder=prompt, name="", sizing_mode="stretch_width")
    get_input_checkbox = pn.widgets.Checkbox(name="Check to Submit Feedback")
    instance.send(pn.Row(get_input_widget, get_input_checkbox), user=name, respond=False)

    # Use the monotonic clock: wall-clock time can jump (NTP, manual changes)
    # and would shorten or stretch the timeout.
    started = time.monotonic()
    while True:
        if time.monotonic() - started > TIMEOUT:
            instance.send(
                f"You didn't provide your feedback in {TIMEOUT} seconds, skip and use auto-reply.",
                user=name,
                respond=False,
            )
            reply = ""
            break
        if get_input_widget.value != "" and get_input_checkbox.value is True:
            reply = get_input_widget.value
            break
        await asyncio.sleep(0.1)

    # Nothing reads these widgets once the loop has ended, so lock them on
    # both exits rather than leaving a timed-out form that looks usable.
    get_input_widget.disabled = True
    get_input_checkbox.disabled = True
    return reply
async def check_termination_and_human_reply(
    self,
    messages=None,
    sender=None,
    config=None,
    instance=None,
):
    """Check if the conversation should be terminated, and if human reply is provided.

    Depending on ``self.human_input_mode`` the human is asked for feedback
    through ``get_human_input``:

    * ``"ALWAYS"``: asked on every call.
    * otherwise: asked only when the consecutive auto-reply limit for
      ``sender`` has been reached or the last message is a termination
      message; in ``"NEVER"`` mode those two situations end the conversation
      without asking.

    Args:
        self: The agent whose counters, mode and termination check are used.
        messages: Conversation to inspect; defaults to
            ``self._oai_messages[sender]``. Only the last message is examined.
        sender: The agent that sent the message; used as the key of the
            per-sender counters and named in the prompts.
        config: Defaults to ``self``; not otherwise used in this function.
        instance: Passed through to ``get_human_input``.

    Returns:
        ``(True, None)`` when the conversation stops (reply is ``"exit"``),
        ``(True, reply)`` when a human reply was given or the auto-reply
        limit for ``sender`` is 0, and ``(False, None)`` otherwise. The
        consecutive auto-reply counter for ``sender`` is reset to 0 in the
        first two cases and incremented in the last.
    """
    if config is None:
        config = self
    if messages is None:
        messages = self._oai_messages[sender]
    message = messages[-1]
    reply = ""
    no_human_input_msg = ""
    if self.human_input_mode == "ALWAYS":
        reply = await get_human_input(
            self.name,
            f"Provide feedback to {sender.name}. Press enter to skip and use auto-reply, or type 'exit' to end the conversation: ",
            instance,
        )
        no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else ""
        # if the human input is empty, and the message is a termination message, then we will terminate the conversation
        reply = reply if reply or not self._is_termination_msg(message) else "exit"
    else:
        # The auto-reply limit is checked before the termination check.
        if self._consecutive_auto_reply_counter[sender] >= self._max_consecutive_auto_reply_dict[sender]:
            if self.human_input_mode == "NEVER":
                reply = "exit"
            else:
                # self.human_input_mode == "TERMINATE":
                # Evaluate the termination check once; it selects the prompt
                # wording and decides what an empty answer means.
                terminate = self._is_termination_msg(message)
                reply = await get_human_input(
                    self.name,
                    f"Please give feedback to {sender.name}. Press enter or type 'exit' to stop the conversation: "
                    if terminate
                    else f"Please give feedback to {sender.name}. Press enter to skip and use auto-reply, or type 'exit' to stop the conversation: ",
                    instance,
                )
                no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else ""
                # if the human input is empty, and the message is a termination message, then we will terminate the conversation
                reply = reply if reply or not terminate else "exit"
        elif self._is_termination_msg(message):
            if self.human_input_mode == "NEVER":
                reply = "exit"
            else:
                # self.human_input_mode == "TERMINATE":
                reply = await get_human_input(
                    self.name,
                    f"Please give feedback to {sender.name}. Press enter or type 'exit' to stop the conversation: ",
                    instance,
                )
                no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else ""
                # if the human input is empty, and the message is a termination message, then we will terminate the conversation
                reply = reply or "exit"

    # print the no_human_input_msg
    if no_human_input_msg:
        print(colored(f"\n>>>>>>>> {no_human_input_msg}", "red"), flush=True)

    # stop the conversation
    if reply == "exit":
        # reset the consecutive_auto_reply_counter
        self._consecutive_auto_reply_counter[sender] = 0
        return True, None

    # send the human reply
    # (an empty reply is also returned when the auto-reply limit is 0)
    if reply or self._max_consecutive_auto_reply_dict[sender] == 0:
        # reset the consecutive_auto_reply_counter
        self._consecutive_auto_reply_counter[sender] = 0
        return True, reply

    # increment the consecutive_auto_reply_counter
    self._consecutive_auto_reply_counter[sender] += 1
    if self.human_input_mode != "NEVER":
        print(colored("\n>>>>>>>> USING AUTO REPLY...", "red"), flush=True)

    return False, None