"""Swarm orchestration: manages a swarm of agents that collaborate through a shared memory and a task queue."""
import numpy as np
from datetime import datetime
import time
import yaml
import threading
import os
import json
import shutil
from pathlib import Path
from swarmai.utils.CustomLogger import CustomLogger
from swarmai.utils.memory import VectorMemory
from swarmai.utils.task_queue.PandasQueue import PandasQueue
from swarmai.utils.task_queue.Task import Task
from swarmai.agents import ManagerAgent, GeneralPurposeAgent, GooglerAgent, CrunchbaseSearcher
class Swarm:
    """This class is responsible for managing the swarm of agents.

    The logic:
        1. User submits a problem to the swarm.
        2. The swarm consists of agents, shared memory and a task queue.
        3. Agents have different roles.
        4. Manager agents are responsible for creating tasks and assigning them to the task queue.
        5. The swarm has a shared memory that the agents can query.

    The tasks of the swarm class are:
        1. Create and store the agents.
        2. Start the swarm.
        3. Provide the agents with the access to the shared memory and the task queue.
        4. Maintain stuck agents.
        5. Logging.

    Swarm tips (to be extended as we gather more experience):
        1. To avoid the swarm being stuck in a local maximum, the swarm should include agents
           with high and low exploration rates (models temperature).
        2. High reward solutions need to be reinforced by the swarm, and the low reward solutions
           need to be punished, so that the swarm algorithm converges.
        3. The swarm architecture should have enough flexibility to allow for an emerging
           behaviour of the swarm (greater than the sum of its parts).

    TODO:
        - adaptation algorithm (dynamically change the number of agents and their roles)
        - vector database for the shared memory
    """

    # Maps a role name (as it appears in the yaml config) to the agent class implementing it.
    WORKER_ROLES = {
        "manager": ManagerAgent,
        "googler": GooglerAgent,
        "analyst": GeneralPurposeAgent,
        "crunchbase_searcher": CrunchbaseSearcher
    }

    # Every task type the queue accepts.
    TASK_TYPES = [
        Task.TaskTypes.breakdown_to_subtasks,
        Task.TaskTypes.google_search,
        Task.TaskTypes.analysis,
        Task.TaskTypes.report_preparation,
        Task.TaskTypes.crunchbase_search
    ]

    # Which task types each role is allowed to pick up from the queue.
    TASK_ASSOCIATIONS = {
        "manager": [Task.TaskTypes.breakdown_to_subtasks, Task.TaskTypes.report_preparation],
        "googler": [Task.TaskTypes.google_search],
        "analyst": [Task.TaskTypes.analysis],
        "crunchbase_searcher": [Task.TaskTypes.crunchbase_search]
    }

    def __init__(self, swarm_config_loc):
        """Initializes the swarm.

        Args:
            swarm_config_loc (str | Path): path to the yaml swarm configuration file;
                see `_parse_swarm_config` for the expected structure.
        """
        self.swarm_config_loc = swarm_config_loc
        self._parse_swarm_config()

        # creating shared memory
        self.shared_memory_file = self.data_dir / 'shared_memory'
        self.shared_memory = VectorMemory(self.shared_memory_file)

        # (re)create the three report artefacts: output.txt, output.json, output_pretty.txt
        self.output_file = str((self.data_dir / 'output.txt').resolve())
        with open(self.output_file, 'w') as f:
            f.write("")
        # BUGFIX: the json/pretty companions used to be truncated via self.output_file,
        # so stale output.json / output_pretty.txt from a previous run survived on disk.
        out_json = Path(str(self.output_file).replace(".txt", ".json"))
        if out_json.exists():
            with open(out_json, 'w') as f:
                f.write("")
        out_pretty = Path(str(self.output_file).replace(".txt", "_pretty.txt"))
        if out_pretty.exists():
            with open(out_pretty, 'w') as f:
                f.write("")

        # creating task queue
        self.task_queue = PandasQueue(self.TASK_TYPES, self.WORKER_ROLES.keys(), self.TASK_ASSOCIATIONS)

        # creating the logger
        self.logger = CustomLogger(self.data_dir)

        # creating agents
        self.agents_ids = []
        self.agents = self._create_agents()  # returns just a list of agents

        # lock guarding multi-threaded access to the report files (see interact_with_output)
        self.lock = threading.Lock()

    def _create_agents(self):
        """Creates the tensor of agents according to the agent role distribution.

        For now just sequentially allocating them in the swarm.

        Returns:
            np.ndarray: 1-D array of instantiated agent objects.
        """
        agents = []
        counter = 0
        for configured_role, n_agents in self.agent_role_distribution.items():
            # 'googler' is silently downgraded to 'analyst' when Google API keys are missing
            agent_role = self._check_keys_and_agents(configured_role)
            for _ in range(n_agents):
                agent_id = counter
                counter += 1
                # need each agent to have its own challenge instance, because sometimes the
                # agents submit the answers with infinite loops; a timeout for the agent's
                # computation is also included in the AgentBase class
                agents.append(self.WORKER_ROLES[agent_role](agent_id, agent_role, self, self.logger))
                self.agents_ids.append(agent_id)

        self.log(f"Created {len(agents)} agents with roles: {[agent.agent_type for agent in agents]}")
        return np.array(agents)

    def _check_keys_and_agents(self, agent_role):
        """Returns a role that is actually runnable in the current environment.

        If GOOGLE_API_KEY and GOOGLE_CSE_ID are not in os.environ, a googler agent
        is treated as a general purpose agent ('analyst') instead.
        """
        if agent_role == "googler" and ("GOOGLE_API_KEY" not in os.environ or "GOOGLE_CSE_ID" not in os.environ):
            agent_role = "analyst"
        return agent_role

    def run_swarm(self):
        """Runs the swarm until the configured timeout expires, or indefinitely if none is set.

        Returns:
            bool: True after all agents have been stopped and joined.
        """
        # bootstrap: one breakdown task per configured goal
        # BUGFIX: prompt typo "Gloabl goal" corrected to "Global goal"
        for goal in self.goals:
            task_i = Task(
                priority=100,
                task_type=Task.TaskTypes.breakdown_to_subtasks,
                task_description=f"Act as:\n{self.role}Global goal:\n{self.global_goal}\nYour specific task is:\n{goal}"
            )
            self.task_queue.add_task(task_i)
        self.create_report_qa_task()

        # start the agents
        for agent in self.agents:
            agent.max_cycles = 50
            agent.name = f"Agent {agent.agent_id}"  # inherited from threading.Thread => thread name
            self.log(f"Starting agent {agent.agent_id} with type {agent.agent_type}")
            agent.start()

        if self.timeout is not None:
            self.log(f"Swarm will run for {self.timeout} seconds")
            time.sleep(self.timeout)
        else:
            # BUGFIX: time.sleep() with an astronomically large value raises OverflowError
            # on most platforms; sleep in bounded chunks instead to run "forever".
            while True:
                time.sleep(3600)
        self.stop()
        self.log("All agents have finished their work")
        return True

    def create_report_qa_task(self):
        """Creates a task that will be used to evaluate the report quality.

        Made a method because it will be called by the manager agent too.
        """
        task_i = Task(
            priority=50,
            task_type=Task.TaskTypes.report_preparation,
            task_description="Prepare a final report about a global goal."
        )
        self.task_queue.add_task(task_i)

    def stop(self):
        """Signals every agent to stop, then waits for all agent threads to finish."""
        for agent in self.agents:
            agent.ifRun = False
        for agent in self.agents:
            agent.join()

    def _parse_swarm_config(self):
        """Parses the swarm configuration file and sets the run parameters.

        Sets `agent_role_distribution`, `timeout` (seconds or None), `data_dir`,
        `role`, `global_goal` and `goals`. It's a yaml file with the following structure:

        swarm:
            agents: # supported: manager, analyst, googler, crunchbase_searcher
                - type: manager
                  n: 5
                - type: analyst
                  n: 10
            timeout_min: 10
            run_dir: /tmp/swarm
        task:
            role: |
                professional venture capital agency, who has a proven track record of
                consistently funding successful startups
            global_goal: |
                A new startup just sent us their pitch. Find if the startup is worth
                investing in. The startup is in the space of brain computer interfaces.
            goals:
                - Generate a comprehensive description of the startup.
                - Find top companies and startups in this field.
        """
        with open(self.swarm_config_loc, "r") as f:
            config = yaml.safe_load(f)

        self.agent_role_distribution = {
            agent["type"]: agent["n"] for agent in config["swarm"]["agents"]
        }

        # timeout in seconds; None means "run until stopped" (see run_swarm)
        # BUGFIX: a missing 'timeout_min' key used to raise KeyError even though
        # run_swarm explicitly supports self.timeout being None.
        timeout_min = config["swarm"].get("timeout_min")
        self.timeout = timeout_min * 60 if timeout_min is not None else None

        self.data_dir = Path(".", config["swarm"]["run_dir"]).resolve()

        # best-effort cleanup of all files and subdirectories from a previous run.
        # BUGFIX: the old code called self.log here, but the logger is only created
        # AFTER this method runs; the resulting AttributeError was swallowed by a
        # blanket except, silently aborting cleanup after the first entry.
        if self.data_dir.exists():
            for entry in self.data_dir.iterdir():
                try:
                    if entry.is_dir():
                        shutil.rmtree(entry)
                    else:
                        entry.unlink()
                except OSError:
                    pass  # leave entries we cannot remove in place
        self.data_dir.mkdir(parents=True, exist_ok=True)

        # getting the tasks
        self.role = config["task"]["role"]
        self.global_goal = config["task"]["global_goal"]
        self.goals = config["task"]["goals"]

    def interact_with_output(self, message, method="write"):
        """Writes/reads the report file under a lock.

        Needed to do it as one method due to multithreading.

        Args:
            message (str): report content to write (ignored when method == "read").
            method (str): "write" to overwrite the report files, "read" to return
                the current report text.

        Returns:
            str: the message that was written, or the file content when reading.

        Raises:
            ValueError: if `method` is neither "write" nor "read".
        """
        with self.lock:
            if method == "write":
                # completely overwriting the plain-text report
                with open(self.output_file, "w") as f:
                    f.write(message)

                # try to mirror it to json; the message can sometimes be malformatted.
                # BUGFIX: json.loads used to run OUTSIDE the try/except, so a malformed
                # message crashed the whole write instead of being tolerated.
                try:
                    message_dict = json.loads(message)
                except (json.JSONDecodeError, TypeError):
                    return message

                out_json = str(self.output_file).replace(".txt", ".json")
                try:
                    with open(out_json, "w") as f:
                        json.dump(message_dict, f, indent=4)
                except (TypeError, ValueError):
                    pass  # non-serializable content: keep the plain-text report only

                # pretty output: take json and output it as text but with sections.
                # NOTE(review): assumes each top-level value is itself a dict of
                # section fields — confirm against the report format.
                out_pretty = str(self.output_file).replace(".txt", "_pretty.txt")
                with open(out_pretty, "w") as f:
                    for section in message_dict.values():
                        f.write("========================================\n")
                        f.write("========================================\n")
                        for key, field_value in section.items():
                            f.write(f"**{key}**:\n{field_value}\n\n")
                        f.write("\n")
                return message
            elif method == "read":
                # reading the report file
                with open(self.output_file, "r") as f:
                    return f.read()
            else:
                raise ValueError(f"Unknown method {method}")

    def log(self, message, level="info"):
        """Logs a message through the swarm-wide logger.

        Args:
            message (str): text to log.
            level (str): one of info/debug/warning/error/critical (case-insensitive);
                any other value is logged at level NOTSET (0).
        """
        level_map = {"info": 20, "debug": 10, "warning": 30, "error": 40, "critical": 50}
        numeric_level = level_map.get(level.lower(), 0)
        self.logger.log(level=numeric_level, msg={'message': message})