import json
import os
import shutil
import threading
import time
from datetime import datetime
from pathlib import Path

import numpy as np
import yaml

from swarmai.utils.CustomLogger import CustomLogger
from swarmai.utils.memory import VectorMemory
from swarmai.utils.task_queue.PandasQueue import PandasQueue
from swarmai.utils.task_queue.Task import Task
from swarmai.agents import ManagerAgent, GeneralPurposeAgent, GooglerAgent, CrunchbaseSearcher


class Swarm:
    """This class is responsible for managing the swarm of agents.

    The logic:
        1. User submits a problem to the swarm
        2. The swarm consists of agents, shared memory and a task queue.
        3. Agents have different roles.
        4. Manager agents are responsible for creating tasks and assigning them to the task queue.
        5. The swarm has a shared memory that the agents can query.

    The tasks of the swarm class are:
        1. Create and store the agents
        2. Start the swarm
        3. Provide the agents with the access to the shared memory and the task queue
        4. Maintain stuck agents
        5. Logging

    Swarm tips (to be extended as we gather more experience):
        1. To avoid the swarm being stuck in a local maximum, the swarm should include agents
           with high and low exploration rates (models temperature).
        2. High reward solutions need to be reinforced by the swarm, and the low reward solutions
           need to be punished, so that the swarm algorithm converges.
        3. The swarm architecture should have enough flexibility to allow for an emerging
           behaviour of the swarm (greater than the sum of its parts).

    TODO:
        - adaptation algorithm (dynamically change the number of agents and their roles)
        - vector database for the shared memory
    """

    # role name (as used in the config file) -> agent class instantiated for that role
    WORKER_ROLES = {
        "manager": ManagerAgent,
        "googler": GooglerAgent,
        "analyst": GeneralPurposeAgent,
        "crunchbase_searcher": CrunchbaseSearcher
    }

    # every task type the queue is created with
    TASK_TYPES = [
        Task.TaskTypes.breakdown_to_subtasks,
        Task.TaskTypes.google_search,
        Task.TaskTypes.analysis,
        Task.TaskTypes.report_preparation,
        Task.TaskTypes.crunchbase_search
    ]

    # role name -> task types associated with that role
    TASK_ASSOCIATIONS = {
        "manager": [Task.TaskTypes.breakdown_to_subtasks, Task.TaskTypes.report_preparation],
        "googler": [Task.TaskTypes.google_search],
        "analyst": [Task.TaskTypes.analysis],
        "crunchbase_searcher": [Task.TaskTypes.crunchbase_search]
    }

    # textual level accepted by ``log`` -> numeric level handed to the logger;
    # anything not listed here is logged with level 0
    _LOG_LEVELS = {
        "debug": 10,
        "info": 20,
        "warning": 30,
        "error": 40,
        "critical": 50,
    }

    def __init__(self, swarm_config_loc):
        """Initializes the swarm.

        Args:
            swarm_config_loc: Path to the yaml configuration file of the swarm
                (see ``_parse_swarm_config`` for the expected structure).
        """
        self.swarm_config_loc = swarm_config_loc
        self._parse_swarm_config()

        # creating shared memory
        self.shared_memory_file = self.data_dir / 'shared_memory'
        self.shared_memory = VectorMemory(self.shared_memory_file)

        # start from an empty report; also blank the derived json/pretty
        # files if they survived the clean-up of the run directory
        self.output_file = str((self.data_dir / 'output.txt').resolve())
        with open(self.output_file, 'w') as f:
            f.write("")
        for stale_file in (self._output_variant(".json"), self._output_variant("_pretty.txt")):
            if stale_file.exists():
                stale_file.write_text("")

        # creating task queue
        self.task_queue = PandasQueue(self.TASK_TYPES, self.WORKER_ROLES.keys(), self.TASK_ASSOCIATIONS)

        # creating the logger
        self.logger = CustomLogger(self.data_dir)

        # the lock guards the report files; it is created before the agents
        # because every agent receives a reference to this swarm
        self.lock = threading.Lock()

        # creating agents
        self.agents_ids = []
        self.agents = self._create_agents()  # returns just a list of agents

    def _output_variant(self, tail):
        """Returns the path of a file derived from the report file.

        Args:
            tail: What replaces the ``.txt`` suffix of the report file name,
                e.g. ``".json"`` or ``"_pretty.txt"``.

        Returns:
            ``pathlib.Path`` located next to ``self.output_file``.
        """
        report = Path(self.output_file)
        # only the file name is rewritten, so a ".txt" occurring in one of
        # the parent directory names is left untouched
        return report.with_name(report.stem + tail)

    def _create_agents(self):
        """Creates the agents according to the agent role distribution.

        Agent ids are consecutive integers starting at 0; every created id is
        also appended to ``self.agents_ids``.

        Returns:
            Numpy array holding the created agents.
        """
        agents = []
        next_id = 0
        for configured_role, n_agents in self.agent_role_distribution.items():
            agent_role = self._check_keys_and_agents(configured_role)
            agent_class = self.WORKER_ROLES[agent_role]
            for _ in range(n_agents):
                agent_id = next_id
                next_id += 1
                # need each agent to have its own challenge instance, because sometimes the agents submit the answers with infinite loops
                # also included a timeout for the agent's computation in the AgentBase class
                agents.append(agent_class(agent_id, agent_role, self, self.logger))
                self.agents_ids.append(agent_id)

        self.log(f"Created {len(agents)} agents with roles: {[agent.agent_type for agent in agents]}")

        return np.array(agents)

    def _check_keys_and_agents(self, agent_role):
        """Returns the role that will actually be used for ``agent_role``.

        If GOOGLE_API_KEY or GOOGLE_CSE_ID is not in os.environ, then the
        googler agent is treated as a general purpose agent ("analyst").
        """
        google_keys_present = "GOOGLE_API_KEY" in os.environ and "GOOGLE_CSE_ID" in os.environ
        if agent_role == "googler" and not google_keys_present:
            return "analyst"
        return agent_role

    def run_swarm(self):
        """Runs the swarm until the timeout expires or, without a timeout, until all agents finish.

        Returns:
            True once the agents have been stopped and joined.
        """
        # add the main tasks to the task queue, one per configured goal
        for goal in self.goals:
            task_i = Task(
                priority=100,
                task_type=Task.TaskTypes.breakdown_to_subtasks,
                task_description=f"Act as:\n{self.role}Gloabl goal:\n{self.global_goal}\nYour specific task is:\n{goal}"
            )
            self.task_queue.add_task(task_i)
        self.create_report_qa_task()

        # start the agents
        for agent in self.agents:
            agent.max_cycles = 50
            agent.name = f"Agent {agent.agent_id}"  # inherited from threading.Thread => thread name
            self.log(f"Starting agent {agent.agent_id} with type {agent.agent_type}")
            agent.start()

        try:
            if self.timeout is not None:
                self.log(f"Swarm will run for {self.timeout} seconds")
                time.sleep(self.timeout)
            else:
                # No time limit: wait for the agents to finish on their own.
                # (Sleeping for an astronomically large number of seconds is
                # not an option, time.sleep raises OverflowError for it.)
                for agent in self.agents:
                    agent.join()
        finally:
            # also reached on KeyboardInterrupt, so the agents are asked to stop
            self.stop()

        self.log("All agents have finished their work")
        return True

    def create_report_qa_task(self):
        """Creates a task that will be used to evaluate the report quality.

        Make it as a method, because it will be called by the manager agent too.
        """
        task_i = Task(
            priority=50,
            task_type=Task.TaskTypes.report_preparation,
            task_description="Prepare a final report about a global goal."
        )
        self.task_queue.add_task(task_i)

    def stop(self):
        """Signals every agent to stop and then waits for all of them."""
        # first flag all of the agents, only then start waiting for them
        for agent in self.agents:
            agent.ifRun = False
        for agent in self.agents:
            agent.join()

    def _parse_swarm_config(self):
        """Parses the swarm configuration file and prepares the run directory.

        Sets ``agent_role_distribution``, ``timeout`` (in seconds, or None when
        ``timeout_min`` is not given), ``data_dir``, ``role``, ``global_goal``
        and ``goals``. The content of the run directory is deleted.

        It's a yaml file with the following structure:

        swarm:
            agents: # supported: manager, analyst, googler, crunchbase_searcher
                - type: manager
                  n: 5
                - type: analyst
                  n: 10
            timeout_min: 10
            run_dir: /tmp/swarm
        task:
            role: |
                professional venture capital agency, who has a proven track record of consistently funding successful startups
            global_goal: |
                A new startup just sent us their pitch. Find if the startup is worth investing in. The startup is in the space of brain computer interfaces.
                Their value proposition is to provide objective user experience research for new games based directly on the brain activity of the user.
            goals:
                - Generate a comprehensive description of the startup. Find any mentions of the startup in the news, social media, etc.
                - Find top companies and startups in this field. Find out their locations, raised funding, value proposition, differentiation, etc.
        """
        with open(self.swarm_config_loc, "r") as f:
            config = yaml.safe_load(f)

        swarm_config = config["swarm"]
        self.agent_role_distribution = {
            agent["type"]: agent["n"] for agent in swarm_config["agents"]
        }

        # the timeout is optional; without it run_swarm waits for the agents
        timeout_min = swarm_config.get("timeout_min")
        self.timeout = None if timeout_min is None else timeout_min * 60

        self.data_dir = Path(".", swarm_config["run_dir"]).resolve()
        self._clear_data_dir()
        self.data_dir.mkdir(parents=True, exist_ok=True)

        # getting the tasks
        task_config = config["task"]
        self.role = task_config["role"]
        self.global_goal = task_config["global_goal"]
        self.goals = task_config["goals"]

    def _clear_data_dir(self):
        """Deletes all files and subdirectories of the run directory (best effort).

        Nothing is logged here: this runs before the logger is created.
        """
        if not self.data_dir.is_dir():
            return
        for entry in self.data_dir.iterdir():
            try:
                if entry.is_dir() and not entry.is_symlink():
                    shutil.rmtree(entry)
                else:
                    entry.unlink()
            except OSError:
                # an entry that cannot be removed must not prevent the
                # removal of the remaining ones
                continue

    def interact_with_output(self, message, method="write"):
        """Writes/reads the report file.

        Needed to do it as one method due to multithreading: all accesses are
        serialised with ``self.lock``.

        Args:
            message: For "write", the new report as a json string of the form
                ``{section: {key: value, ...}, ...}``. Ignored for "read".
            method: Either "write" or "read".

        Returns:
            For "write" the message itself, for "read" the content of the
            report file.

        Raises:
            ValueError: If ``method`` is unknown, or (as json.JSONDecodeError)
                if the message to write is not valid json. In the latter case
                the text report has already been overwritten.
        """
        with self.lock:
            if method == "write":
                # completely overwriting the file
                with open(self.output_file, "w") as f:
                    f.write(message)

                # the message can sometimes be malformed; then this raises
                message_dict = json.loads(message)

                # the json copy of the report is best effort only
                try:
                    with open(self._output_variant(".json"), "w") as f:
                        json.dump(message_dict, f, indent=4)
                except (OSError, TypeError, ValueError):
                    pass

                # pretty output: take the json and output it as a text but with sections
                with open(self._output_variant("_pretty.txt"), "w") as f:
                    for section in message_dict.values():
                        f.write("========================================\n")
                        f.write("========================================\n")
                        for key, value in section.items():
                            f.write(f"**{key}**:\n{value}\n\n")
                        f.write("\n")

                return message

            if method == "read":
                # reading the report file
                with open(self.output_file, "r") as f:
                    return f.read()

            raise ValueError(f"Unknown method {method}")

    def log(self, message, level="info"):
        """Logs a message with the swarm logger.

        Args:
            message: The message; it is passed to the logger as ``{'message': message}``.
            level: One of "debug", "info", "warning", "error", "critical"
                (case insensitive). Any other value is logged with level 0.
        """
        numeric_level = self._LOG_LEVELS.get(level.lower(), 0)
        self.logger.log(level=numeric_level, msg={'message': message})