import asyncio
import sys
import threading
import time
from ast import literal_eval

import autogen
import chromadb
import panel as pn
from autogen import Agent, AssistantAgent, UserProxyAgent
from autogen.agentchat.contrib.compressible_agent import CompressibleAgent
from autogen.agentchat.contrib.gpt_assistant_agent import GPTAssistantAgent
from autogen.agentchat.contrib.llava_agent import LLaVAAgent
from autogen.agentchat.contrib.math_user_proxy_agent import MathUserProxyAgent
from autogen.agentchat.contrib.retrieve_assistant_agent import RetrieveAssistantAgent
from autogen.agentchat.contrib.retrieve_user_proxy_agent import RetrieveUserProxyAgent
from autogen.agentchat.contrib.teachable_agent import TeachableAgent
from autogen.code_utils import extract_code
from configs import Q1, Q2, Q3, TIMEOUT, TITLE
from panel.widgets import TextAreaInput

try:
    from termcolor import colored
except ImportError:

    def colored(x, *args, **kwargs):
        return x


def get_retrieve_config(docs_path, model_name, collection_name):
    """Build the retrieve_config dict used by RetrieveUserProxyAgent."""
    return {
        "docs_path": literal_eval(docs_path),
        "chunk_token_size": 1000,
        "model": model_name,
        "embedding_model": "all-mpnet-base-v2",
        "get_or_create": True,
        "client": chromadb.PersistentClient(path=".chromadb"),
        "collection_name": collection_name,
    }


# autogen.ChatCompletion.start_logging()


def termination_msg(x):
    """Check if a message is a termination message."""
    if not isinstance(x, dict):
        return False
    _msg = str(x.get("content", "")).upper().strip().strip(".")
    return _msg.endswith("TERMINATE") or _msg.startswith("TERMINATE")


def _is_termination_msg(message):
    """Check if a message is a termination message.

    Terminate when no code block is detected. Currently only detects python code blocks.
    """
    if isinstance(message, dict):
        message = message.get("content")
        if message is None:
            return False
    cb = extract_code(message)
    contain_code = False
    for c in cb:
        # todo: support more languages
        if c[0] == "python":
            contain_code = True
            break
    return not contain_code


def new_generate_oai_reply(
    self,
    messages=None,
    sender=None,
    config=None,
):
    """Generate a reply using autogen.oai."""
    client = self.client if config is None else config
    if client is None:
        return False, None
    if messages is None:
        messages = self._oai_messages[sender]

    # handle 336006 https://cloud.baidu.com/doc/WENXINWORKSHOP/s/tlmyncueh
    # ERNIE Bot expects strictly alternating user/assistant roles and a final user message,
    # so force the roles to alternate and pad with a user message if needed.
    _context = messages[-1].pop("context", None)
    _messages = self._oai_system_message + messages
    for idx, msg in enumerate(_messages):
        if idx == 0:
            continue
        if idx % 2 == 1:
            msg["role"] = "user" if msg.get("role") != "function" else "function"
        else:
            msg["role"] = "assistant"
    if len(_messages) % 2 == 1:
        _messages.append({"content": "Please reply exactly `TERMINATE` to me if the task is done.", "role": "user"})
    # print(f"messages: {_messages}")
    response = client.create(context=_context, messages=_messages)
    # print(f"{response=}")
    return True, client.extract_text_or_function_call(response)[0]


def initialize_agents(
    llm_config, agent_name, system_msg, agent_type, retrieve_config=None, code_execution_config=False
):
    if "RetrieveUserProxyAgent" == agent_type:
        agent = RetrieveUserProxyAgent(
            name=agent_name,
            is_termination_msg=termination_msg,
            human_input_mode="TERMINATE",
            max_consecutive_auto_reply=5,
            retrieve_config=retrieve_config,
            code_execution_config=code_execution_config,  # set to False if you don't want to execute the code
            default_auto_reply="Please reply exactly `TERMINATE` to me if the task is done.",
        )
    elif "GPTAssistantAgent" == agent_type:
        agent = GPTAssistantAgent(
            name=agent_name,
            instructions=system_msg,
            llm_config=llm_config,
            is_termination_msg=termination_msg,
        )
    elif "CompressibleAgent" == agent_type:
        compress_config = {
            "mode": "COMPRESS",
            "trigger_count": 600,  # set this to a large number for less frequent compression
            "verbose": True,  # to allow printing of compression information: context before and after compression
            "leave_last_n": 2,
        }
        agent = CompressibleAgent(
            name=agent_name,
            system_message=system_msg,
            llm_config=llm_config,
            compress_config=compress_config,
            is_termination_msg=termination_msg,
        )
    elif "UserProxy" in agent_type:
        agent = globals()[agent_type](
            name=agent_name,
            is_termination_msg=termination_msg,
            human_input_mode="TERMINATE",
            system_message=system_msg,
            default_auto_reply="Please reply exactly `TERMINATE` to me if the task is done.",
            max_consecutive_auto_reply=5,
            code_execution_config=code_execution_config,
        )
    else:
        agent = globals()[agent_type](
            name=agent_name,
            is_termination_msg=termination_msg,
            human_input_mode="NEVER",
            system_message=system_msg,
            llm_config=llm_config,
        )

    # if any(["ernie" in cfg["model"].lower() for cfg in llm_config["config_list"]]):
    if "ernie" in llm_config["config_list"][0]["model"].lower():
        # Hack for ERNIE Bot models: swap the default OAI reply function for the
        # ERNIE-compatible one defined above.
        # print("Hack for ERNIE Bot models.")
        agent._reply_func_list.pop(-1)
        agent.register_reply([Agent, None], new_generate_oai_reply, -1)
    return agent


async def get_human_input(name, prompt: str, instance=None) -> str:
    """Get human input from the Panel chat interface, or from stdin if no instance is given."""
    if instance is None:
        return input(prompt)
    get_input_widget = TextAreaInput(placeholder=prompt, name="", sizing_mode="stretch_width")
    get_input_checkbox = pn.widgets.Checkbox(name="Check to Submit Feedback")
    instance.send(pn.Row(get_input_widget, get_input_checkbox), user=name, respond=False)
    ts = time.time()
    while True:
        if time.time() - ts > TIMEOUT:
            instance.send(
                f"You didn't provide feedback within {TIMEOUT} seconds; skipping and using auto-reply.",
                user=name,
                respond=False,
            )
            reply = ""
            break
        if get_input_widget.value != "" and get_input_checkbox.value is True:
            get_input_widget.disabled = True
            reply = get_input_widget.value
            break
        await asyncio.sleep(0.1)
    return reply


async def check_termination_and_human_reply(
    self,
    messages=None,
    sender=None,
    config=None,
    instance=None,
):
    """Check if the conversation should be terminated, and whether a human reply is provided.

    Async variant of `ConversableAgent.check_termination_and_human_reply` that collects
    human feedback through the Panel UI via `get_human_input`.
    """
    if config is None:
        config = self
    if messages is None:
        messages = self._oai_messages[sender]
    message = messages[-1]
    reply = ""
    no_human_input_msg = ""
    if self.human_input_mode == "ALWAYS":
        reply = await get_human_input(
            self.name,
            f"Provide feedback to {sender.name}. Press enter to skip and use auto-reply, or type 'exit' to end the conversation: ",
            instance,
        )
        no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else ""
        # if the human input is empty, and the message is a termination message, then we will terminate the conversation
        reply = reply if reply or not self._is_termination_msg(message) else "exit"
    else:
        if self._consecutive_auto_reply_counter[sender] >= self._max_consecutive_auto_reply_dict[sender]:
            if self.human_input_mode == "NEVER":
                reply = "exit"
            else:
                # self.human_input_mode == "TERMINATE":
                terminate = self._is_termination_msg(message)
                reply = await get_human_input(
                    self.name,
                    f"Please give feedback to {sender.name}. Press enter or type 'exit' to stop the conversation: "
                    if terminate
                    else f"Please give feedback to {sender.name}. Press enter to skip and use auto-reply, or type 'exit' to stop the conversation: ",
                    instance,
                )
                no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else ""
                # if the human input is empty, and the message is a termination message, then we will terminate the conversation
                reply = reply if reply or not terminate else "exit"
        elif self._is_termination_msg(message):
            if self.human_input_mode == "NEVER":
                reply = "exit"
            else:
                # self.human_input_mode == "TERMINATE":
                reply = await get_human_input(
                    self.name,
                    f"Please give feedback to {sender.name}. Press enter or type 'exit' to stop the conversation: ",
                    instance,
                )
                no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else ""
                # if the human input is empty, and the message is a termination message, then we will terminate the conversation
                reply = reply or "exit"

    # print the no_human_input_msg
    if no_human_input_msg:
        print(colored(f"\n>>>>>>>> {no_human_input_msg}", "red"), flush=True)

    # stop the conversation
    if reply == "exit":
        # reset the consecutive_auto_reply_counter
        self._consecutive_auto_reply_counter[sender] = 0
        return True, None

    # send the human reply
    if reply or self._max_consecutive_auto_reply_dict[sender] == 0:
        # reset the consecutive_auto_reply_counter
        self._consecutive_auto_reply_counter[sender] = 0
        return True, reply

    # increment the consecutive_auto_reply_counter
    self._consecutive_auto_reply_counter[sender] += 1
    if self.human_input_mode != "NEVER":
        print(colored("\n>>>>>>>> USING AUTO REPLY...", "red"), flush=True)

    return False, None
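

# --- Example usage (illustrative sketch, not the app's entry point) ---
# The config file name "OAI_CONFIG_LIST", the agent names, the question string, and the
# code_execution_config below are illustrative assumptions; adapt them to your setup.
# Running this block requires valid API credentials for the configured models.
if __name__ == "__main__":
    example_config_list = autogen.config_list_from_json("OAI_CONFIG_LIST")
    example_llm_config = {"config_list": example_config_list}

    assistant = initialize_agents(
        llm_config=example_llm_config,
        agent_name="Assistant",
        system_msg="You are a helpful AI assistant. Reply `TERMINATE` when the task is done.",
        agent_type="AssistantAgent",
    )
    user_proxy = initialize_agents(
        llm_config=example_llm_config,
        agent_name="UserProxy",
        system_msg="",
        agent_type="UserProxyAgent",
        code_execution_config={"work_dir": "coding", "use_docker": False},
    )

    # The agents converse until a termination message is seen or the
    # max_consecutive_auto_reply limit (5) is reached.
    user_proxy.initiate_chat(assistant, message="Compute 2**10 with python and report the result.")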