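"""Analyze Weave-logged agent runs with an LLM.

Pulls logged calls from a Weave project, groups them by task, asks an
OpenAI model for a structured per-step analysis (StepAnalysis) and a
per-task summary (TaskSummary), and writes the results to JSON.
"""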
import asyncio
import json
from abc import ABC, abstractmethod
from collections import defaultdict

import weave
from openai import AsyncOpenAI
from pydantic import BaseModel


class StepAnalysis(BaseModel):
    """Structured analysis of a single agent step."""
    description: str
    action_type: str
    assessment: str
    success: bool
    headline: str


class TaskSummary(BaseModel):
    """Structured summary of a full agent task."""
    overview: str
    key_successes: str
    main_challenges: str
    overall_assessment: str


def get_weave_calls(client):
    """Fetch every logged call from a Weave client and flatten it to a dict."""
    calls = client.calls()
    processed_calls = []
    for call in calls:
        # Resolve the logged ChatCompletion output from its Weave ref.
        completion = weave.ref(call.output).get()
        choices = [choice.message.content for choice in completion.choices]
        output = {
            'weave_task_id': call.attributes['weave_task_id'],
            'trace_id': call.trace_id,
            'project_id': call.project_id,
            'created_timestamp': completion.created,
            'inputs': dict(call.inputs),
            'id': call.id,
            'outputs': {'choices': choices},
            'exception': call.exception,
            'summary': call.summary,
            'display_name': call.display_name,
            'attributes': dict(call.attributes),
            '_children': call._children,
            '_feedback': call._feedback,
        }
        processed_calls.append(output)
    return processed_calls
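
# For reference, each processed call is a plain dict shaped roughly like
# (values illustrative, not real data):
# {
#     'weave_task_id': '...', 'trace_id': '...', 'project_id': '...',
#     'created_timestamp': 1700000000, 'inputs': {...}, 'id': '...',
#     'outputs': {'choices': ['...']}, 'exception': None, 'summary': {...},
#     'display_name': None, 'attributes': {...}, '_children': [...],
#     '_feedback': ...,
# }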


class AsyncLLMClient(ABC):
    """Minimal async interface for structured-output LLM clients."""

    @abstractmethod
    async def generate_text(self, prompt, system_message=None, response_format=None):
        pass


class AsyncOpenAIClient(AsyncLLMClient):
    """AsyncLLMClient backed by the OpenAI structured-outputs API."""

    def __init__(self, model="gpt-4o-mini"):
        self.model = model
        self.client = AsyncOpenAI()

    async def generate_text(self, prompt, system_message=None, response_format=None):
        messages = [
            {"role": "system", "content": system_message or "You are a helpful AI assistant."},
            {"role": "user", "content": prompt},
        ]
        # parse() validates the completion against the pydantic response_format;
        # .content is the raw JSON string, which callers decode with json.loads.
        response = await self.client.beta.chat.completions.parse(
            model=self.model,
            messages=messages,
            response_format=response_format,
        )
        return response.choices[0].message.content
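
# Standalone usage sketch (hypothetical; assumes OPENAI_API_KEY is set):
# client = AsyncOpenAIClient()
# raw = asyncio.run(client.generate_text("Describe step 1", response_format=StepAnalysis))
# print(json.loads(raw)["headline"])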


async def analyze_agent_steps(processed_calls, llm_client):
    """Group calls by task, order them chronologically, and analyze tasks concurrently."""
    task_calls = defaultdict(list)
    for call in processed_calls:
        task_calls[call['weave_task_id']].append(call)
    for task_id in task_calls:
        task_calls[task_id].sort(key=lambda x: x['created_timestamp'])
    tasks = [analyze_task(calls, llm_client) for calls in task_calls.values()]
    task_analyses = await asyncio.gather(*tasks)
    return dict(zip(task_calls.keys(), task_analyses))


async def analyze_task(calls, llm_client):
    """Analyze every step of a task, then summarize the task as a whole."""
    step_tasks = [analyze_step(call, i + 1, len(calls), llm_client) for i, call in enumerate(calls)]
    steps = await asyncio.gather(*step_tasks)
    try:
        task_analysis = await summarize_task(steps, llm_client)
    except Exception as e:
        print(f"Error in task summarization: {str(e)}")
        # Fall back to a placeholder summary; fields are strings to match TaskSummary.
        task_analysis = TaskSummary(
            overview="Summary generation failed",
            key_successes="",
            main_challenges="",
            overall_assessment="Unable to assess due to error",
        ).model_dump()
    return {
        'steps': steps,
        'task_analysis': task_analysis,
    }


async def analyze_step(call, step_number, total_steps, llm_client):
    """Ask the LLM for a structured StepAnalysis of a single agent step."""
    prompt = f"""
    Analyze Step {step_number}/{total_steps} of AI agent task:
    Input: {call['inputs']}
    Output: {call['outputs']}
    Exception: {call['exception']}
    Summary: {call['summary']}

    Provide an analysis with:
    1. A brief description of the agent's action.
    2. Classify the action as one of: 'plan', 'tool', 'retrieve', or 'other'.
    3. Give a brief evaluation of progress, obstacles, or errors.
    4. Indicate whether the agent successfully completed its intended action.
    5. Write a concise headline summarizing the agent's action, ideally under 7 words.

    Ensure accuracy and conciseness. Be specific and avoid overly high-level descriptions.
    """
    system_message = "You are an expert AI system analyst, skilled in categorizing and evaluating AI agent actions."
    analysis = await llm_client.generate_text(prompt, system_message, response_format=StepAnalysis)
    try:
        analysis = json.loads(analysis)
    except json.JSONDecodeError:
        print(f"Error parsing analysis for step {step_number} of {total_steps} in task {call['weave_task_id']}. Using default values.")
        # Fall back to a placeholder analysis; field names match StepAnalysis.
        analysis = StepAnalysis(
            description="Analysis failed",
            action_type='other',
            assessment="Unable to assess due to error",
            success=False,
            headline="Analysis failed",
        ).model_dump()
    return {
        'call_data': call,
        'analysis': analysis,
    }


async def summarize_task(steps, llm_client):
    """Ask the LLM for a structured TaskSummary over all analyzed steps."""
    steps_summary = "\n".join([f"Step {i + 1}: {step['analysis']}" for i, step in enumerate(steps)])
    prompt = f"""
    Analyze the following AI agent task steps:
    {steps_summary}

    Provide a summary with:
    1. A concise overview of the agent's approach.
    2. Main achievements or breakthroughs.
    3. Primary obstacles or errors encountered.
    4. A brief evaluation of the agent's overall performance.

    Focus on patterns in the agent's approach and effectiveness. Be concise and insightful.
    """
    system_message = "You are an expert AI performance analyst, skilled in evaluating and summarizing AI agent task execution."
    analysis = await llm_client.generate_text(prompt, system_message, response_format=TaskSummary)
    return json.loads(analysis)


# async def main():
#     client = weave.init("citp_agent_eval/usaco_1723148990")
#     processed_calls = get_weave_calls(client)
#     weave.finish()
#
#     openai_client = AsyncOpenAIClient(model="gpt-4o-mini")
#     task_analyses_openai = await analyze_agent_steps(processed_calls, openai_client)
#
#     with open("task_analyses.json", "w") as f:
#         json.dump(task_analyses_openai, f, indent=4)
#
# if __name__ == "__main__":
#     asyncio.run(main())
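
# To run the pipeline, uncomment main() above and point weave.init() at your
# own Weave project. A sketch for inspecting the saved output (hypothetical,
# assumes main() has produced task_analyses.json):
#
# with open("task_analyses.json") as f:
#     analyses = json.load(f)
# for task_id, result in analyses.items():
#     for i, step in enumerate(result['steps'], 1):
#         print(task_id, i, step['analysis'].get('headline'))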