import asyncio
import json
from abc import ABC, abstractmethod
from collections import defaultdict

import weave
from openai import AsyncOpenAI
from pydantic import BaseModel
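
# Structured-output schema the judge model fills in for each agent step.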
class StepAnalysis(BaseModel):
    description: str
    action_type: str
    assessment: str
    success: bool
    headline: str
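
# Structured-output schema for the whole-task summary.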
class TaskSummary(BaseModel):
    overview: str
    key_successes: str
    main_challenges: str
    overall_assessment: str
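
# Fetch all logged calls from a Weave client and flatten each into a plain dict.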
def get_weave_calls(client):
    calls = client.calls()
    processed_calls = []
    for call in calls:
        # Resolve the stored output reference back into the ChatCompletion object.
        chat_completion = weave.ref(call.output).get()
        choices = [choice.message.content for choice in chat_completion.choices]
        output = {
            'weave_task_id': call.attributes['weave_task_id'],
            'trace_id': call.trace_id,
            'project_id': call.project_id,
            'created_timestamp': chat_completion.created,
            'inputs': dict(call.inputs),
            'id': call.id,
            'outputs': {'choices': choices},
            'exception': call.exception,
            'summary': call.summary,
            'display_name': call.display_name,
            'attributes': dict(call.attributes),
            '_children': call._children,
            '_feedback': call._feedback,
        }
        processed_calls.append(output)
    return processed_calls
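
# Minimal async interface so alternative LLM backends can be swapped in as monitors.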
class AsyncLLMClient(ABC):
    @abstractmethod
    async def generate_text(self, prompt, system_message=None, response_format=None):
        pass
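
# OpenAI-backed client; uses structured outputs to enforce the given pydantic schema.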
class AsyncOpenAIClient(AsyncLLMClient):
    def __init__(self, model="gpt-4o-mini"):
        self.model = model
        self.client = AsyncOpenAI()

    async def generate_text(self, prompt, system_message=None, response_format=None):
        messages = [
            {"role": "system", "content": system_message or "You are a helpful AI assistant."},
            {"role": "user", "content": prompt},
        ]
        response = await self.client.beta.chat.completions.parse(
            model=self.model, messages=messages, response_format=response_format
        )
        return response.choices[0].message.content
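
# Group calls by task, order each task's calls chronologically, and analyze all tasks concurrently.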
async def analyze_agent_steps(processed_calls, llm_client):
    task_calls = defaultdict(list)
    for call in processed_calls:
        task_calls[call['weave_task_id']].append(call)
    for task_id in task_calls:
        task_calls[task_id].sort(key=lambda x: x['created_timestamp'])

    tasks = [analyze_task(calls, llm_client) for task_id, calls in task_calls.items()]
    task_analyses = await asyncio.gather(*tasks)
    return dict(zip(task_calls.keys(), task_analyses))
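
# Analyze every step of one task concurrently, then summarize the task as a whole.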
async def analyze_task(calls, llm_client):
    step_tasks = [analyze_step(call, i + 1, len(calls), llm_client) for i, call in enumerate(calls)]
    steps = await asyncio.gather(*step_tasks)

    try:
        task_analysis = await summarize_task(steps, llm_client)
    except Exception as e:
        print(f"Error in task summarization: {str(e)}")
        # Fall back to a placeholder summary; TaskSummary fields are strings, not lists.
        task_analysis = TaskSummary(
            overview="Summary generation failed",
            key_successes="",
            main_challenges="",
            overall_assessment="Unable to assess due to error",
        )
    return {
        'steps': steps,
        'task_analysis': task_analysis,
    }
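
# Ask the judge model to describe, classify, and assess a single agent step.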
async def analyze_step(call, step_number, total_steps, llm_client):
    prompt = f"""
    Analyze Step {step_number}/{total_steps} of AI agent task:
    Input: {call['inputs']}
    Output: {call['outputs']}
    Exception: {call['exception']}
    Summary: {call['summary']}

    Provide an analysis with:
    1. A brief description of the agent's action.
    2. A classification of the action as one of: 'plan', 'tool', 'retrieve', or 'other'.
    3. A brief evaluation of progress, obstacles, or errors.
    4. Whether the agent successfully completed its intended action.
    5. A concise headline summarizing the action, ideally under 7 words.

    Ensure accuracy and conciseness. Be specific and avoid overly high-level descriptions.
    """
    system_message = "You are an expert AI system analyst, skilled in categorizing and evaluating AI agent actions."
    analysis = await llm_client.generate_text(prompt, system_message, response_format=StepAnalysis)

    try:
        analysis = json.loads(analysis)
    except json.JSONDecodeError:
        print(f"Error parsing analysis for step {step_number} of {total_steps} in task {call['weave_task_id']}. Using default values.")
        # Fall back to a placeholder that matches the StepAnalysis schema.
        analysis = StepAnalysis(
            description="Analysis failed",
            action_type='other',
            assessment="Unable to assess due to error",
            success=False,
            headline="Analysis failed",
        )
    return {
        'call_data': call,
        'analysis': analysis,
    }
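
# Summarize a full task from its per-step analyses.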
async def summarize_task(steps, llm_client):
    steps_summary = "\n".join([f"Step {i + 1}: {step['analysis']}" for i, step in enumerate(steps)])
    prompt = f"""
    Analyze the following AI agent task steps:
    {steps_summary}

    Provide a summary with:
    1. A concise overview of the agent's approach.
    2. Main achievements or breakthroughs.
    3. Primary obstacles or errors encountered.
    4. A brief evaluation of the agent's overall performance.

    Focus on patterns in the agent's approach and effectiveness. Be concise and insightful.
    """
    system_message = "You are an expert AI performance analyst, skilled in evaluating and summarizing AI agent task execution."
    analysis = await llm_client.generate_text(prompt, system_message, response_format=TaskSummary)
    return json.loads(analysis)
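
# Example usage: pull traces from a Weave project and dump all task analyses to JSON.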
# async def main():
#     client = weave.init("citp_agent_eval/usaco_1723148990")
#     processed_calls = get_weave_calls(client)
#     weave.finish()
#
#     openai_client = AsyncOpenAIClient(model="gpt-4o-mini")
#     task_analyses_openai = await analyze_agent_steps(processed_calls, openai_client)
#
#     with open("task_analyses.json", "w") as f:
#         json.dump(task_analyses_openai, f, indent=4)
#
# if __name__ == "__main__":
#     asyncio.run(main())