benediktstroebl's picture
Upload 2 files
b69a733 verified
raw
history blame
15 kB
import json
from pathlib import Path
import sqlite3
import pickle
from functools import lru_cache
import threading
import pandas as pd
import ast
from scipy import stats
import yaml
import numpy as np
class TracePreprocessor:
def __init__(self, db_path='preprocessed_traces.db'):
self.db_path = db_path
self.local = threading.local()
def get_conn(self):
if not hasattr(self.local, 'conn'):
self.local.conn = sqlite3.connect(self.db_path)
return self.local.conn
def create_tables(self):
with self.get_conn() as conn:
conn.execute('''
CREATE TABLE IF NOT EXISTS preprocessed_traces (
benchmark_name TEXT,
agent_name TEXT,
date TEXT,
run_id TEXT,
raw_logging_results BLOB,
PRIMARY KEY (benchmark_name, agent_name, run_id)
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS failure_reports (
benchmark_name TEXT,
agent_name TEXT,
date TEXT,
run_id TEXT,
failure_report BLOB,
PRIMARY KEY (benchmark_name, agent_name, run_id)
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS parsed_results (
benchmark_name TEXT,
agent_name TEXT,
date TEXT,
run_id TEXT,
successful_tasks TEXT,
failed_tasks TEXT,
total_cost REAL,
accuracy REAL,
precision REAL,
recall REAL,
f1_score REAL,
auc REAL,
overall_score REAL,
vectorization_score REAL,
fathomnet_score REAL,
feedback_score REAL,
house_price_score REAL,
spaceship_titanic_score REAL,
amp_parkinsons_disease_progression_prediction_score REAL,
cifar10_score REAL,
imdb_score REAL,
PRIMARY KEY (benchmark_name, agent_name, run_id)
)
''')
def preprocess_traces(self, processed_dir="evals_live"):
self.create_tables()
processed_dir = Path(processed_dir)
for file in processed_dir.glob('*.json'):
with open(file, 'r') as f:
data = json.load(f)
agent_name = data['config']['agent_name']
benchmark_name = data['config']['benchmark_name']
date = data['config']['date']
config = data['config']
try:
raw_logging_results = pickle.dumps(data['raw_logging_results'])
with self.get_conn() as conn:
conn.execute('''
INSERT OR REPLACE INTO preprocessed_traces
(benchmark_name, agent_name, date, run_id, raw_logging_results)
VALUES (?, ?, ?, ?, ?)
''', (benchmark_name, agent_name, date, config['run_id'], raw_logging_results))
except Exception as e:
print(f"Error preprocessing raw_logging_results in {file}: {e}")
try:
failure_report = pickle.dumps(data['failure_report'])
with self.get_conn() as conn:
conn.execute('''
INSERT INTO failure_reports
(benchmark_name, agent_name, date, run_id, failure_report)
VALUES (?, ?, ?, ? ,?)
''', (benchmark_name, agent_name, date, config['run_id'], failure_report))
except Exception as e:
print(f"Error preprocessing failure_report in {file}: {e}")
try:
config = data['config']
results = data['results']
with self.get_conn() as conn:
conn.execute('''
INSERT INTO parsed_results
(benchmark_name, agent_name, date, run_id, successful_tasks, failed_tasks, total_cost, accuracy, precision, recall, f1_score, auc, overall_score, vectorization_score, fathomnet_score, feedback_score, house_price_score, spaceship_titanic_score, amp_parkinsons_disease_progression_prediction_score, cifar10_score, imdb_score)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
benchmark_name,
agent_name,
config['date'],
config['run_id'],
str(results.get('successful_tasks')),
str(results.get('failed_tasks')),
results.get('total_cost'),
results.get('accuracy'),
results.get('precision'),
results.get('recall'),
results.get('f1_score'),
results.get('auc'),
results.get('overall_score'),
results.get('vectorization_score'),
results.get('fathomnet_score'),
results.get('feedback_score'),
results.get('house-price_score'),
results.get('spaceship-titanic_score'),
results.get('amp-parkinsons-disease-progression-prediction_score'),
results.get('cifar10_score'),
results.get('imdb_score')
))
except Exception as e:
print(f"Error preprocessing parsed results in {file}: {e}")
@lru_cache(maxsize=100)
def get_analyzed_traces(self, agent_name, benchmark_name):
with self.get_conn() as conn:
query = '''
SELECT agent_name, raw_logging_results, date FROM preprocessed_traces
WHERE benchmark_name = ? AND agent_name = ?
'''
df = pd.read_sql_query(query, conn, params=(benchmark_name, agent_name))
# check for each row if raw_logging_results is not None with pickle.loads because it is stored as a byte string
df = df[df['raw_logging_results'].apply(lambda x: pickle.loads(x) is not None and x != 'None')]
if len(df) == 0:
return None
# select latest run
df = df.sort_values('date', ascending=False).groupby('agent_name').first().reset_index()
return pickle.loads(df['raw_logging_results'][0])
@lru_cache(maxsize=100)
def get_failure_report(self, agent_name, benchmark_name):
with self.get_conn() as conn:
query = '''
SELECT agent_name, date, failure_report FROM failure_reports
WHERE benchmark_name = ? AND agent_name = ?
'''
df = pd.read_sql_query(query, conn, params=(benchmark_name, agent_name))
# Select only rows for which failure report is not None and None is a string
df = df[df['failure_report'].apply(lambda x: pickle.loads(x) is not None and x != 'None')]
if len(df) == 0:
return None
# if there is multiple failure reports, take the last one
df = df.sort_values('date', ascending=False).groupby('agent_name').first().reset_index()
# if there is a failure report, return the first one
return pickle.loads(df['failure_report'][0])
def _calculate_ci(self, data, confidence=0.95, type='minmax'):
data = data[np.isfinite(data)]
if len(data) < 2:
return '', '', '' # No CI for less than 2 samples
n = len(data)
mean = np.mean(data)
if type == 't':
sem = stats.sem(data)
ci = stats.t.interval(confidence, n-1, loc=mean, scale=sem)
elif type == 'minmax':
min = np.min(data)
max = np.max(data)
ci = (min, max)
return mean, ci[0], ci[1]
def get_parsed_results(self, benchmark_name, aggregate=True):
with self.get_conn() as conn:
query = '''
SELECT * FROM parsed_results
WHERE benchmark_name = ?
ORDER BY accuracy DESC
'''
df = pd.read_sql_query(query, conn, params=(benchmark_name,))
# Load verified agents
verified_agents = self.load_verified_agents()
# Add 'Verified' column
df['Verified'] = df.apply(lambda row: '✓' if (benchmark_name, row['agent_name']) in verified_agents else '', axis=1)
# Add column for how many times an agent_name appears in the DataFrame
df['Runs'] = df.groupby('agent_name')['agent_name'].transform('count')
# Compute the 95% confidence interval for accuracy and cost for agents that have been run more than once
df['acc_ci'] = None
df['cost_ci'] = None
for agent_name in df['agent_name'].unique():
agent_df = df[df['agent_name'] == agent_name]
if len(agent_df) > 1:
accuracy_mean, accuracy_lower, accuracy_upper = self._calculate_ci(agent_df['accuracy'], type='minmax')
cost_mean, cost_lower, cost_upper = self._calculate_ci(agent_df['total_cost'], type='minmax')
# format the confidence interval with +/- sign
# accuracy_ci = f"± {abs(accuracy_mean - accuracy_lower):.3f}"
# cost_ci = f"± {abs(cost_mean - cost_lower):.3f}"
accuracy_ci = f"-{abs(accuracy_mean - accuracy_lower):.3f}/+{abs(accuracy_mean - accuracy_upper):.3f}"
cost_ci = f"-{abs(cost_mean - cost_lower):.3f}/+{abs(cost_mean - cost_upper):.3f}"
df.loc[df['agent_name'] == agent_name, 'acc_ci'] = accuracy_ci
df.loc[df['agent_name'] == agent_name, 'cost_ci'] = cost_ci
df = df.drop(columns=['successful_tasks', 'failed_tasks', 'run_id'], axis=1)
if aggregate:
# For agents that have been run more than once, compute the average accuracy and cost and use that as the value in the DataFrame
df = df.groupby('agent_name').agg({
'date': 'first',
'total_cost': 'mean',
'accuracy': 'mean',
'precision': 'mean',
'recall': 'mean',
'f1_score': 'mean',
'auc': 'mean',
'overall_score': 'mean',
'vectorization_score': 'mean',
'fathomnet_score': 'mean',
'feedback_score': 'mean',
'house_price_score': 'mean',
'spaceship_titanic_score': 'mean',
'amp_parkinsons_disease_progression_prediction_score': 'mean',
'cifar10_score': 'mean',
'imdb_score': 'mean',
'Verified': 'first',
'Runs': 'first',
'acc_ci': 'first',
'cost_ci': 'first'
}).reset_index()
# Round float columns to 3 decimal places
float_columns = ['total_cost', 'accuracy', 'precision', 'recall', 'f1_score', 'auc', 'overall_score', 'vectorization_score', 'fathomnet_score', 'feedback_score', 'house-price_score', 'spaceship-titanic_score', 'amp-parkinsons-disease-progression-prediction_score', 'cifar10_score', 'imdb_score']
for column in float_columns:
if column in df.columns:
df[column] = df[column].round(3)
# sort by accuracy
df = df.sort_values('accuracy', ascending=False)
# Rename columns
df = df.rename(columns={
'agent_name': 'Agent Name',
'date': 'Date',
'total_cost': 'Total Cost',
'accuracy': 'Accuracy',
'precision': 'Precision',
'recall': 'Recall',
'f1_score': 'F1 Score',
'auc': 'AUC',
'overall_score': 'Overall Score',
'vectorization_score': 'Vectorization Score',
'fathomnet_score': 'Fathomnet Score',
'feedback_score': 'Feedback Score',
'house_price_score': 'House Price Score',
'spaceship_titanic_score': 'Spaceship Titanic Score',
'amp_parkinsons_disease_progression_prediction_score': 'AMP Parkinsons Disease Progression Prediction Score',
'cifar10_score': 'CIFAR10 Score',
'imdb_score': 'IMDB Score',
'acc_ci': 'Accuracy CI',
'cost_ci': 'Total Cost CI'
})
return df
def get_task_success_data(self, benchmark_name):
with self.get_conn() as conn:
query = '''
SELECT agent_name, accuracy, successful_tasks, failed_tasks
FROM parsed_results
WHERE benchmark_name = ?
'''
df = pd.read_sql_query(query, conn, params=(benchmark_name,))
# for agent_names that have been run more than once, take the run with the highest accuracy
df = df.sort_values('accuracy', ascending=False).groupby('agent_name').first().reset_index()
# Get all unique task IDs
task_ids = set()
for tasks in df['successful_tasks']:
if ast.literal_eval(tasks) is not None:
task_ids.update(ast.literal_eval(tasks))
for tasks in df['failed_tasks']:
if ast.literal_eval(tasks) is not None:
task_ids.update(ast.literal_eval(tasks))
# Create a DataFrame with agent_name, task_ids, and success columns
data_list = []
for _, row in df.iterrows():
agent_name = row['agent_name']
for task_id in task_ids:
success = 1 if task_id in row['successful_tasks'] else 0
data_list.append({
'agent_name': agent_name,
'task_id': task_id,
'success': success
})
df = pd.DataFrame(data_list)
df = df.rename(columns={
'agent_name': 'Agent Name',
'task_id': 'Task ID',
'success': 'Success'
})
return df
def load_verified_agents(self, file_path='verified_agents.yaml'):
with open(file_path, 'r') as f:
verified_data = yaml.safe_load(f)
verified_agents = set()
for benchmark, agents in verified_data.items():
for agent in agents:
verified_agents.add((benchmark, agent['agent_name']))
return verified_agents
if __name__ == '__main__':
preprocessor = TracePreprocessor()
preprocessor.preprocess_traces()