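# Spaces: Runtime error
# NOTE(review): the line above appears to be a status banner captured from the
# page hosting this file; it is not part of the program.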
from collections import deque | |
from typing import Dict, List, Optional, Any | |
import re | |
from langchain import LLMChain, OpenAI, PromptTemplate, SerpAPIWrapper | |
from langchain.embeddings import OpenAIEmbeddings | |
from langchain.llms import BaseLLM | |
from langchain.vectorstores.base import VectorStore | |
from pydantic import BaseModel, Field | |
from langchain.chains.base import Chain | |
from langchain.vectorstores import FAISS | |
import faiss | |
from langchain.docstore import InMemoryDocstore | |
from langchain.agents import ZeroShotAgent, Tool, AgentExecutor | |
from swarms.agent.executor import Executor, AgentExecutorWithTranslation | |
class ContextAwareAgent(ZeroShotAgent):
    """Zero-shot agent whose scratchpad is built from only the latest steps."""

    def get_full_inputs(self, intermediate_steps, **kwargs: Any) -> Dict[str, Any]:
        """Assemble the complete LLMChain inputs from the intermediate steps.

        The caller's keyword arguments are kept, and the ``agent_scratchpad``
        and ``stop`` entries are set (overriding same-named kwargs).
        """
        scratchpad = self._construct_scratchpad(intermediate_steps)
        return {**kwargs, "agent_scratchpad": scratchpad, "stop": self._stop}

    def _construct_scratchpad(self, intermediate_steps):
        """Render the most recent (action, observation) pairs as scratchpad text.

        Only the last two steps are included. After an observation that
        reports an invalid tool, a hint to pick a different tool is appended.
        """
        # only modify the following line, [-2: ]
        recent_steps = intermediate_steps[-2:]
        pieces = []
        for action, observation in recent_steps:
            pieces.append(action.log)
            pieces.append(
                f"\n{self.observation_prefix}{observation}\n{self.llm_prefix}"
            )
            if "is not a valid tool, try another one" in observation:
                pieces.append(
                    "You should select another tool rather than the invalid one.\n"
                )
        return "".join(pieces)
class TaskCreationChain(LLMChain):
    """Chain that generates follow-up tasks from the last task's result."""

    # Alternate constructor: it is invoked on the class itself
    # (``TaskCreationChain.from_llm(llm, ...)``), so it must be a classmethod.
    @classmethod
    def from_llm(cls, llm: BaseLLM, verbose: bool = True) -> LLMChain:
        """Build the chain around the task-creation prompt.

        Args:
            llm: Language model the chain runs the prompt against.
            verbose: Passed through to the chain constructor.

        Returns:
            An instance of ``cls`` whose prompt expects the variables
            ``result``, ``task_description``, ``incomplete_tasks`` and
            ``objective``.
        """
        task_creation_template = (
            "You are an task creation AI that uses the result of an execution agent"
            " to create new tasks with the following objective: {objective},"
            " The last completed task has the result: {result}."
            " This result was based on this task description: {task_description}."
            " These are incomplete tasks: {incomplete_tasks}."
            " Based on the result, create new tasks to be completed"
            " by the AI system that do not overlap with incomplete tasks."
            " For a simple objective, do not generate complex todo lists."
            " Do not generate repetitive tasks (e.g., tasks that have already been completed)."
            " If there is not futher task needed to complete the objective, return NO TASK."
            " Now return the tasks as an array."
        )
        prompt = PromptTemplate(
            template=task_creation_template,
            input_variables=[
                "result",
                "task_description",
                "incomplete_tasks",
                "objective",
            ],
        )
        return cls(prompt=prompt, llm=llm, verbose=verbose)
class InitialTaskCreationChain(LLMChain):
    """Chain that generates the single first task for an objective."""

    # Alternate constructor: it is invoked on the class itself
    # (``InitialTaskCreationChain.from_llm(llm, ...)``), so it must be a
    # classmethod.
    @classmethod
    def from_llm(cls, llm: BaseLLM, verbose: bool = True) -> LLMChain:
        """Build the chain around the initial-task prompt.

        Args:
            llm: Language model the chain runs the prompt against.
            verbose: Passed through to the chain constructor.

        Returns:
            An instance of ``cls`` whose prompt expects the single variable
            ``objective``.
        """
        task_creation_template = "You are a planner who is an expert at coming up with a todo list for a given objective. For a simple objective, do not generate a complex todo list. Generate the first (only one) task needed to do for this objective: {objective}"
        prompt = PromptTemplate(
            template=task_creation_template,
            input_variables=["objective"],
        )
        return cls(prompt=prompt, llm=llm, verbose=verbose)
class TaskPrioritizationChain(LLMChain):
    """Chain to prioritize tasks."""

    # Alternate constructor: it is invoked on the class itself
    # (``TaskPrioritizationChain.from_llm(llm, ...)``), so it must be a
    # classmethod.
    @classmethod
    def from_llm(cls, llm: BaseLLM, verbose: bool = True) -> LLMChain:
        """Build the chain around the task-prioritization prompt.

        Args:
            llm: Language model the chain runs the prompt against.
            verbose: Passed through to the chain constructor.

        Returns:
            An instance of ``cls`` whose prompt expects the variables
            ``task_names``, ``next_task_id`` and ``objective``.
        """
        task_prioritization_template = (
            "You are an task prioritization AI tasked with cleaning the formatting of and reprioritizing"
            " the following tasks: {task_names}."
            " Consider the ultimate objective of your team: {objective}."
            " Do not make up any tasks, just reorganize the existing tasks."
            " Do not remove any tasks. Return the result as a numbered list, like:"
            " #. First task"
            " #. Second task"
            " Start the task list with number {next_task_id}. (e.g., 2. ***, 3. ***, etc.)"
        )
        prompt = PromptTemplate(
            template=task_prioritization_template,
            input_variables=["task_names", "next_task_id", "objective"],
        )
        return cls(prompt=prompt, llm=llm, verbose=verbose)
def get_next_task(
    task_creation_chain: LLMChain,
    result: Dict,
    task_description: str,
    task_list: List[str],
    objective: str,
) -> List[Dict]:
    """Ask the task-creation chain for follow-up tasks and parse its reply.

    Args:
        task_creation_chain: Chain whose ``run`` is called with ``result``,
            ``task_description``, ``incomplete_tasks`` and ``objective``.
        result: Result of the task that was just executed.
        task_description: Description of the task that produced ``result``.
        task_list: Names of the tasks that are still pending; they are joined
            with ", " before being handed to the chain.
        objective: Overall objective the tasks work towards.

    Returns:
        One ``{"task_name": ...}`` dict per numbered item ("<n>. <text>")
        found in the response, in order of appearance. A response without
        numbered items (for example "NO TASK") yields an empty list.
    """
    incomplete_tasks = ", ".join(task_list)
    response = task_creation_chain.run(
        result=result,
        task_description=task_description,
        incomplete_tasks=incomplete_tasks,
        objective=objective,
    )
    # A numbered item ends at a newline OR at the end of the response, so the
    # final item is kept even when the model omits the trailing newline.
    task_pattern = re.compile(r"\d+\. (.+?)(?:\n|$)")
    candidate_names = (match.strip() for match in task_pattern.findall(response))
    return [{"task_name": task_name} for task_name in candidate_names if task_name]
def prioritize_tasks(
    task_prioritization_chain: LLMChain,
    this_task_id: int,
    task_list: List[Dict],
    objective: str,
) -> List[Dict]:
    """Ask the prioritization chain to reorder the tasks and parse its reply.

    Args:
        task_prioritization_chain: Chain whose ``run`` is called with
            ``task_names``, ``next_task_id`` and ``objective``.
        this_task_id: Id of the task that was just executed; the chain is
            told to start numbering at ``this_task_id + 1``.
        task_list: Pending tasks; each dict must contain ``"task_name"``.
        objective: Overall objective the tasks work towards.

    Returns:
        One ``{"task_id": <str>, "task_name": <str>}`` dict per response line
        of the form "<number>. <name>", in response order. Lines that do not
        have that shape (blank lines, prose, a non-numeric prefix, an empty
        name) are skipped.
    """
    task_names = [t["task_name"] for t in task_list]
    next_task_id = int(this_task_id) + 1
    response = task_prioritization_chain.run(
        task_names=task_names, next_task_id=next_task_id, objective=objective
    )
    prioritized_task_list = []
    for task_string in response.split("\n"):
        prefix, dot, remainder = task_string.strip().partition(".")
        task_id = prefix.strip()
        task_name = remainder.strip()
        # The id is later converted with int(), so a line whose prefix is not
        # a plain number (e.g. "Sure. Here is the list") must not become a task.
        if not dot or not task_id.isdecimal() or not task_name:
            continue
        prioritized_task_list.append({"task_id": task_id, "task_name": task_name})
    return prioritized_task_list
def _get_top_tasks(vectorstore, query: str, k: int) -> List[str]:
    """Return the task names of up to ``k`` stored results matching ``query``.

    The ``(document, score)`` pairs returned by the vector store are ordered
    by descending score, and each document's ``metadata["task"]`` is returned
    as a string. An empty search result yields an empty list.
    """
    hits = vectorstore.similarity_search_with_score(query, k=k)
    if not hits:
        return []
    # NOTE(review): whether a higher score means "more similar" depends on the
    # vector store in use -- confirm that descending order is intended.
    ranked = sorted(hits, key=lambda pair: pair[1], reverse=True)
    return [str(document.metadata["task"]) for document, _score in ranked]
def execute_task(
    vectorstore, execution_chain: LLMChain, objective: str, task: str, k: int = 5
) -> str:
    """Run one task through the execution chain and return its output.

    Up to ``k`` previously stored task names, looked up with the objective as
    the query, are passed to the chain as ``context`` alongside the objective
    and the task itself.
    """
    related_tasks = _get_top_tasks(vectorstore, query=objective, k=k)
    chain_inputs = {
        "objective": objective,
        "context": related_tasks,
        "task": task,
    }
    return execution_chain.run(**chain_inputs)
class BabyAGI(Chain, BaseModel):
    """Controller model for the BabyAGI agent.

    Repeats a loop of: take the first pending task, execute it, store the
    result in the vector store, create follow-up tasks and reprioritize the
    pending tasks.
    """

    # Pending tasks; each entry is a dict with "task_id" and "task_name".
    task_list: deque = Field(default_factory=deque)
    task_creation_chain: TaskCreationChain = Field(...)
    task_prioritization_chain: TaskPrioritizationChain = Field(...)
    initial_task_creation_chain: InitialTaskCreationChain = Field(...)
    execution_chain: AgentExecutor = Field(...)
    # Id given to the most recently created task; incremented per new task.
    task_id_counter: int = Field(1)
    vectorstore: VectorStore = Field(init=False)
    # Stop after this many loop iterations; None means no limit.
    max_iterations: Optional[int] = None

    class Config:
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True

    def add_task(self, task: Dict):
        """Append ``task`` to the end of the pending-task queue."""
        self.task_list.append(task)

    def print_task_list(self):
        """Print every pending task as "<id>: <name>" under a banner."""
        print("\033[95m\033[1m" + "\n*****TASK LIST*****\n" + "\033[0m\033[0m")
        for t in self.task_list:
            print(str(t["task_id"]) + ": " + t["task_name"])

    def print_next_task(self, task: Dict):
        """Print the task that is about to be executed under a banner."""
        print("\033[92m\033[1m" + "\n*****NEXT TASK*****\n" + "\033[0m\033[0m")
        print(str(task["task_id"]) + ": " + task["task_name"])

    def print_task_result(self, result: str):
        """Print the result of the executed task under a banner."""
        print("\033[93m\033[1m" + "\n*****TASK RESULT*****\n" + "\033[0m\033[0m")
        print(result)

    # The Chain base class reads ``input_keys`` / ``output_keys`` as
    # attributes, so both must be properties rather than plain methods.
    @property
    def input_keys(self) -> List[str]:
        """Names of the inputs this chain requires."""
        return ["objective"]

    @property
    def output_keys(self) -> List[str]:
        """Names of the outputs this chain produces (none)."""
        return []

    def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Run the agent loop for ``inputs["objective"]``.

        An optional ``inputs["first_task"]`` is used as the first task;
        otherwise the initial-task chain is asked for one. The loop ends when
        no pending task is left or when ``max_iterations`` iterations have
        completed. Always returns an empty dict.
        """
        objective = inputs["objective"]
        # Only spend an LLM call on the first task when the caller did not
        # supply one (dict.get() would evaluate its default eagerly).
        if "first_task" in inputs:
            first_task = inputs["first_task"]
        else:
            first_task = self.initial_task_creation_chain.run(objective=objective)
        self.add_task({"task_id": 1, "task_name": first_task})
        num_iters = 0
        # Looping on the queue itself guarantees termination when the
        # prioritization step returns no tasks, even with no iteration limit.
        while self.task_list:
            self.print_task_list()

            # Step 1: Pull the first task
            task = self.task_list.popleft()
            self.print_next_task(task)

            # Step 2: Execute the task
            result = execute_task(
                self.vectorstore, self.execution_chain, objective, task["task_name"]
            )
            this_task_id = int(task["task_id"])
            self.print_task_result(result)

            # Step 3: Store the result in the vector store
            result_id = f"result_{task['task_id']}"
            self.vectorstore.add_texts(
                texts=[result],
                metadatas=[{"task": task["task_name"]}],
                ids=[result_id],
            )

            # Step 4: Create new tasks and reprioritize task list
            new_tasks = get_next_task(
                self.task_creation_chain,
                result,
                task["task_name"],
                [t["task_name"] for t in self.task_list],
                objective,
            )
            for new_task in new_tasks:
                self.task_id_counter += 1
                new_task.update({"task_id": self.task_id_counter})
                self.add_task(new_task)

            if not self.task_list:
                # Nothing pending and nothing new: fall through to the abort
                # message below.
                break

            self.task_list = deque(
                prioritize_tasks(
                    self.task_prioritization_chain,
                    this_task_id,
                    list(self.task_list),
                    objective,
                )
            )

            num_iters += 1
            if self.max_iterations is not None and num_iters == self.max_iterations:
                print(
                    "\033[91m\033[1m" + "\n*****TASK ENDING*****\n" + "\033[0m\033[0m"
                )
                return {}

        # Reached only when the queue ran empty.
        print(
            "\033[91m\033[1m"
            + "\n*****NO TASK, ABORTING*****\n"
            + "\033[0m\033[0m"
        )
        return {}

    # Alternate constructor: it is invoked on the class itself
    # (``BabyAGI.from_llm(llm=...)``), so it must be a classmethod.
    @classmethod
    def from_llm(
        cls,
        llm: BaseLLM,
        prompt=None,
        verbose: bool = False,
        tools=None,
        stream_output=None,
        **kwargs,
    ) -> "BabyAGI":
        """Build a controller with its chains, agent executor and vector store.

        Args:
            llm: Language model shared by all chains and the agent.
            prompt: Prompt for the execution agent's LLMChain.
            verbose: Passed to the three task chains.
            tools: Tools the execution agent may use; None is treated as no
                tools.
            stream_output: When truthy an ``Executor`` runs the agent,
                otherwise an ``AgentExecutorWithTranslation``.
            **kwargs: Extra fields forwarded to the constructor (for example
                ``max_iterations``).

        Returns:
            A configured ``BabyAGI`` instance backed by a fresh in-memory
            FAISS index.
        """
        embeddings_model = OpenAIEmbeddings()
        # Dimension of the FAISS index; presumably the size of the vectors
        # produced by the embeddings model -- confirm if that model changes.
        embedding_size = 1536
        index = faiss.IndexFlatL2(embedding_size)
        vectorstore = FAISS(
            embeddings_model.embed_query, index, InMemoryDocstore({}), {}
        )
        task_creation_chain = TaskCreationChain.from_llm(llm, verbose=verbose)
        initial_task_creation_chain = InitialTaskCreationChain.from_llm(
            llm, verbose=verbose
        )
        task_prioritization_chain = TaskPrioritizationChain.from_llm(
            llm, verbose=verbose
        )
        llm_chain = LLMChain(llm=llm, prompt=prompt)
        # The default tools=None must not crash the comprehension below.
        tools = [] if tools is None else list(tools)
        tool_names = [tool.name for tool in tools]
        agent = ContextAwareAgent(llm_chain=llm_chain, allowed_tools=tool_names)
        executor_cls = Executor if stream_output else AgentExecutorWithTranslation
        # The executor is always verbose, independent of ``verbose``.
        agent_executor = executor_cls.from_agent_and_tools(
            agent=agent, tools=tools, verbose=True
        )
        return cls(
            task_creation_chain=task_creation_chain,
            task_prioritization_chain=task_prioritization_chain,
            initial_task_creation_chain=initial_task_creation_chain,
            execution_chain=agent_executor,
            vectorstore=vectorstore,
            **kwargs,
        )
# Demo entry point: builds a Search tool and a TODO tool, wires them into a
# BabyAGI controller and runs it on a sample objective.
if __name__ == "__main__":
    todo_prompt = PromptTemplate.from_template(
        "You are a planner who is an expert at coming up with a todo list for a given objective. For a simple objective, do not generate a complex todo list. Come up with a todo list for this objective: {objective}"
    )
    todo_chain = LLMChain(llm=OpenAI(temperature=0), prompt=todo_prompt)
    search = SerpAPIWrapper()
    tools = [
        Tool(
            name="Search",
            func=search.run,
            description="useful for when you need to answer questions about current events",
        ),
        Tool(
            name="TODO",
            func=todo_chain.run,
            description="useful for when you need to come up with todo lists. Input: an objective to create a todo list for. Output: a todo list for that objective. Please be very clear what the objective is!",
        ),
    ]

    prefix = """You are an AI who performs one task based on the following objective: {objective}. Take into account these previously completed tasks: {context}."""
    suffix = """Question: {task}
{agent_scratchpad}"""
    prompt = ZeroShotAgent.create_prompt(
        tools,
        prefix=prefix,
        suffix=suffix,
        input_variables=["objective", "task", "context", "agent_scratchpad"],
    )

    OBJECTIVE = "Write a weather report for SF today"
    llm = OpenAI(temperature=0)
    # Logging of LLMChains
    verbose = False
    # If None, will keep on going forever
    max_iterations: Optional[int] = 10
    # The agent prompt and tools built above must be handed to the factory;
    # without them it falls back to prompt=None and tools=None.
    baby_agi = BabyAGI.from_llm(
        llm=llm,
        prompt=prompt,
        tools=tools,
        verbose=verbose,
        max_iterations=max_iterations,
    )
    baby_agi({"objective": OBJECTIVE})