|
from agent_build_sdk.builder import AgentBuilder |
|
from agent_build_sdk.model.model import AgentResp, AgentReq, STATUS_DISTRIBUTION, STATUS_ROUND, STATUS_VOTE, \ |
|
STATUS_START, STATUS_VOTE_RESULT, STATUS_RESULT |
|
from agent_build_sdk.sdk.agent import BasicAgent |
|
from agent_build_sdk.sdk.agent import format_prompt |
|
|
|
from prompts import DESC_PROMPT, VOTE_PROMPT |
|
from agent_build_sdk.utils.logger import logger |
|
|
|
from openai import OpenAI |
|
import os |
|
|
|
|
|
class SpyAgent(BasicAgent): |
|
|
|
def perceive(self, req=AgentReq): |
|
logger.info("spy perceive: {}".format(req)) |
|
if req.status == STATUS_START: |
|
self.memory.clear() |
|
self.memory.set_variable("name", req.message) |
|
self.memory.append_history( |
|
'主持人: 女士们先生们,欢迎来到《谁是卧底》游戏!我们有一个由6名玩家组成的小组,在其中有一名卧底。让我们开始吧!每个人都会收到一张纸。其中5人的纸上拥有相同的单词,而卧底则会收到含义上相似的单词。我们将大多数人拿到的单词称为"公共词",将卧底拿到的单词称为"卧底词"。一旦你拿到了你的单词,首先需要根据其他人的发言判断自己是否拿到了卧底词。如果判断自己拿到了卧底词,请猜测公共词是什么,然后描述公共词来混淆视听,避免被投票淘汰。如果判断自己拿到了公共词,请思考如何巧妙地描述它而不泄露它,不能让卧底察觉,也要给同伴暗示。每人每轮用一句话描述自己拿到的词语,每个人的描述禁止重复,话中不能出现所持词语。每轮描述完毕,所有在场的人投票选出怀疑是卧底的那个人,得票数最多的人出局。卧底出局则游戏结束,若卧底未出局,游戏继续。现在游戏开始。') |
|
elif req.status == STATUS_DISTRIBUTION: |
|
self.memory.set_variable("word", req.word) |
|
self.memory.append_history( |
|
'主持人: 你好,{},你分配到的单词是:{}'.format(self.memory.load_variable("name"), req.word)) |
|
elif req.status == STATUS_ROUND: |
|
if req.name: |
|
|
|
self.memory.append_history(req.name + ': ' + req.message) |
|
else: |
|
|
|
self.memory.append_history('主持人: 现在进入第{}轮。'.format(str(req.round))) |
|
self.memory.append_history('主持人: 每个玩家描述自己分配到的单词。') |
|
elif req.status == STATUS_VOTE: |
|
self.memory.append_history(req.name + ': ' + req.message) |
|
elif req.status == STATUS_VOTE_RESULT: |
|
if req.name: |
|
self.memory.append_history('主持人: 投票结果是:{}。'.format(req.name)) |
|
else: |
|
self.memory.append_history('主持人: 无人出局。') |
|
elif req.status == STATUS_RESULT: |
|
self.memory.append_history(req.message) |
|
else: |
|
raise NotImplementedError |
|
|
|
def interact(self, req=AgentReq) -> AgentResp: |
|
logger.info("spy interact: {}".format(req)) |
|
if req.status == STATUS_ROUND: |
|
prompt = format_prompt(DESC_PROMPT, |
|
{"name": self.memory.load_variable("name"), |
|
"word": self.memory.load_variable("word"), |
|
"history": "\n".join(self.memory.load_history()) |
|
}) |
|
logger.info("prompt:" + prompt) |
|
result = self.llm_caller(prompt) |
|
logger.info("spy interact result: {}".format(result)) |
|
return AgentResp(success=True, result=result, errMsg=None) |
|
|
|
elif req.status == STATUS_VOTE: |
|
self.memory.append_history('主持人: 到了投票的时候了。每个人,请指向你认为可能是卧底的人。') |
|
choices = [name for name in req.message.split(",") if name != self.memory.load_variable("name")] |
|
self.memory.set_variable("choices", choices) |
|
prompt = format_prompt(VOTE_PROMPT, {"name": self.memory.load_variable("name"), |
|
"choices": choices, |
|
"history": "\n".join(self.memory.load_history()) |
|
}) |
|
logger.info("prompt:" + prompt) |
|
result = self.llm_caller(prompt) |
|
logger.info("spy interact result: {}".format(result)) |
|
return AgentResp(success=True, result=result, errMsg=None) |
|
else: |
|
raise NotImplementedError |
|
|
|
def llm_caller(self, prompt): |
|
client = OpenAI( |
|
api_key=os.getenv('API_KEY'), |
|
base_url=os.getenv('BASE_URL') |
|
) |
|
completion = client.chat.completions.create( |
|
model=self.model_name, |
|
messages=[ |
|
{'role': 'system', 'content': 'You are a helpful assistant.'}, |
|
{'role': 'user', 'content': prompt} |
|
], |
|
temperature=0 |
|
) |
|
try: |
|
return completion.choices[0].message.content |
|
except Exception as e: |
|
print(e) |
|
return None |
|
|
|
|
|
if __name__ == '__main__': |
|
name = 'spy' |
|
agent_builder = AgentBuilder(name, agent=SpyAgent(name, model_name=os.getenv('MODEL_NAME'))) |
|
agent_builder.start() |