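"""Preprocess agent evaluation traces from JSON files into a SQLite database.

TracePreprocessor stores raw logging results, failure reports, and parsed
benchmark metrics per (benchmark, agent, run), and exposes query helpers for
building leaderboard-style DataFrames.
"""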
import ast
import json
import pickle
import sqlite3
import threading
from pathlib import Path

import numpy as np
import pandas as pd
import yaml
from scipy import stats
class TracePreprocessor:
    def __init__(self, db_path='preprocessed_traces.db'):
        self.db_path = db_path
        # sqlite3 connections cannot be shared across threads by default,
        # so each thread gets its own connection via threading.local().
        self.local = threading.local()

    def get_conn(self):
        if not hasattr(self.local, 'conn'):
            self.local.conn = sqlite3.connect(self.db_path)
        return self.local.conn
    def create_tables(self):
        with self.get_conn() as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS preprocessed_traces (
                    benchmark_name TEXT,
                    agent_name TEXT,
                    date TEXT,
                    run_id TEXT,
                    raw_logging_results BLOB,
                    PRIMARY KEY (benchmark_name, agent_name, run_id)
                )
            ''')
            conn.execute('''
                CREATE TABLE IF NOT EXISTS failure_reports (
                    benchmark_name TEXT,
                    agent_name TEXT,
                    date TEXT,
                    run_id TEXT,
                    failure_report BLOB,
                    PRIMARY KEY (benchmark_name, agent_name, run_id)
                )
            ''')
            conn.execute('''
                CREATE TABLE IF NOT EXISTS parsed_results (
                    benchmark_name TEXT,
                    agent_name TEXT,
                    date TEXT,
                    run_id TEXT,
                    successful_tasks TEXT,
                    failed_tasks TEXT,
                    total_cost REAL,
                    accuracy REAL,
                    precision REAL,
                    recall REAL,
                    f1_score REAL,
                    auc REAL,
                    overall_score REAL,
                    vectorization_score REAL,
                    fathomnet_score REAL,
                    feedback_score REAL,
                    house_price_score REAL,
                    spaceship_titanic_score REAL,
                    amp_parkinsons_disease_progression_prediction_score REAL,
                    cifar10_score REAL,
                    imdb_score REAL,
                    PRIMARY KEY (benchmark_name, agent_name, run_id)
                )
            ''')
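    # Sketch of the JSON layout preprocess_traces() expects, inferred from the
    # lookups below (field values are illustrative, not from a real eval file):
    #
    # {
    #     "config": {"agent_name": "...", "benchmark_name": "...",
    #                "date": "...", "run_id": "..."},
    #     "raw_logging_results": <arbitrary object, pickled into a BLOB>,
    #     "failure_report": <arbitrary object, pickled into a BLOB>,
    #     "results": {"accuracy": 0.5, "total_cost": 1.23,
    #                 "successful_tasks": [...], "failed_tasks": [...], ...}
    # }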
    def preprocess_traces(self, processed_dir="evals_live"):
        self.create_tables()
        processed_dir = Path(processed_dir)
        for file in processed_dir.glob('*.json'):
            with open(file, 'r') as f:
                data = json.load(f)
            config = data['config']
            agent_name = config['agent_name']
            benchmark_name = config['benchmark_name']
            date = config['date']

            try:
                raw_logging_results = pickle.dumps(data['raw_logging_results'])
                with self.get_conn() as conn:
                    conn.execute('''
                        INSERT OR REPLACE INTO preprocessed_traces
                        (benchmark_name, agent_name, date, run_id, raw_logging_results)
                        VALUES (?, ?, ?, ?, ?)
                    ''', (benchmark_name, agent_name, date, config['run_id'], raw_logging_results))
            except Exception as e:
                print(f"Error preprocessing raw_logging_results in {file}: {e}")

            try:
                failure_report = pickle.dumps(data['failure_report'])
                with self.get_conn() as conn:
                    conn.execute('''
                        INSERT OR REPLACE INTO failure_reports
                        (benchmark_name, agent_name, date, run_id, failure_report)
                        VALUES (?, ?, ?, ?, ?)
                    ''', (benchmark_name, agent_name, date, config['run_id'], failure_report))
            except Exception as e:
                print(f"Error preprocessing failure_report in {file}: {e}")

            try:
                results = data['results']
                with self.get_conn() as conn:
                    conn.execute('''
                        INSERT OR REPLACE INTO parsed_results
                        (benchmark_name, agent_name, date, run_id, successful_tasks, failed_tasks, total_cost, accuracy, precision, recall, f1_score, auc, overall_score, vectorization_score, fathomnet_score, feedback_score, house_price_score, spaceship_titanic_score, amp_parkinsons_disease_progression_prediction_score, cifar10_score, imdb_score)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ''', (
                        benchmark_name,
                        agent_name,
                        config['date'],
                        config['run_id'],
                        str(results.get('successful_tasks')),
                        str(results.get('failed_tasks')),
                        results.get('total_cost'),
                        results.get('accuracy'),
                        results.get('precision'),
                        results.get('recall'),
                        results.get('f1_score'),
                        results.get('auc'),
                        results.get('overall_score'),
                        results.get('vectorization_score'),
                        results.get('fathomnet_score'),
                        results.get('feedback_score'),
                        results.get('house-price_score'),
                        results.get('spaceship-titanic_score'),
                        results.get('amp-parkinsons-disease-progression-prediction_score'),
                        results.get('cifar10_score'),
                        results.get('imdb_score')
                    ))
            except Exception as e:
                print(f"Error preprocessing parsed results in {file}: {e}")
    def get_analyzed_traces(self, agent_name, benchmark_name):
        with self.get_conn() as conn:
            query = '''
                SELECT agent_name, raw_logging_results, date FROM preprocessed_traces
                WHERE benchmark_name = ? AND agent_name = ?
            '''
            df = pd.read_sql_query(query, conn, params=(benchmark_name, agent_name))

        # raw_logging_results is stored as a pickled BLOB; drop rows whose
        # unpickled value is missing (None or the literal string 'None').
        df = df[df['raw_logging_results'].apply(
            lambda x: (v := pickle.loads(x)) is not None and v != 'None')]
        if len(df) == 0:
            return None

        # Keep only the latest run per agent.
        df = df.sort_values('date', ascending=False).groupby('agent_name').first().reset_index()
        return pickle.loads(df['raw_logging_results'].iloc[0])
    def get_failure_report(self, agent_name, benchmark_name):
        with self.get_conn() as conn:
            query = '''
                SELECT agent_name, date, failure_report FROM failure_reports
                WHERE benchmark_name = ? AND agent_name = ?
            '''
            df = pd.read_sql_query(query, conn, params=(benchmark_name, agent_name))

        # Drop rows whose unpickled failure report is missing (None or the
        # literal string 'None').
        df = df[df['failure_report'].apply(
            lambda x: (v := pickle.loads(x)) is not None and v != 'None')]
        if len(df) == 0:
            return None

        # If there are multiple failure reports, return the most recent one.
        df = df.sort_values('date', ascending=False).groupby('agent_name').first().reset_index()
        return pickle.loads(df['failure_report'].iloc[0])
    def _calculate_ci(self, data, confidence=0.95, ci_type='minmax'):
        data = data[np.isfinite(data)]
        if len(data) < 2:
            return '', '', ''  # No CI for fewer than 2 samples

        n = len(data)
        mean = np.mean(data)
        if ci_type == 't':
            # Student-t interval around the mean.
            sem = stats.sem(data)
            ci = stats.t.interval(confidence, n - 1, loc=mean, scale=sem)
        elif ci_type == 'minmax':
            # Report the observed range rather than a parametric interval.
            ci = (np.min(data), np.max(data))
        else:
            raise ValueError(f"Unknown ci_type: {ci_type}")
        return mean, ci[0], ci[1]
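    # For example, with ci_type='minmax', _calculate_ci(np.array([0.4, 0.5, 0.6]))
    # returns (0.5, 0.4, 0.6): the mean plus the observed minimum and maximum.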
    def get_parsed_results(self, benchmark_name, aggregate=True):
        with self.get_conn() as conn:
            query = '''
                SELECT * FROM parsed_results
                WHERE benchmark_name = ?
                ORDER BY accuracy DESC
            '''
            df = pd.read_sql_query(query, conn, params=(benchmark_name,))

        # Load verified agents and mark them with a check mark
        verified_agents = self.load_verified_agents()
        df['Verified'] = df.apply(
            lambda row: '✓' if (benchmark_name, row['agent_name']) in verified_agents else '',
            axis=1)

        # Add a column with the number of runs per agent
        df['Runs'] = df.groupby('agent_name')['agent_name'].transform('count')

        # Compute 95% confidence intervals for accuracy and cost for agents
        # that have been run more than once
        df['acc_ci'] = None
        df['cost_ci'] = None
        for agent_name in df['agent_name'].unique():
            agent_df = df[df['agent_name'] == agent_name]
            if len(agent_df) > 1:
                accuracy_mean, accuracy_lower, accuracy_upper = self._calculate_ci(agent_df['accuracy'], ci_type='minmax')
                cost_mean, cost_lower, cost_upper = self._calculate_ci(agent_df['total_cost'], ci_type='minmax')
                # Format the interval as -lower/+upper offsets around the mean
                accuracy_ci = f"-{abs(accuracy_mean - accuracy_lower):.3f}/+{abs(accuracy_mean - accuracy_upper):.3f}"
                cost_ci = f"-{abs(cost_mean - cost_lower):.3f}/+{abs(cost_mean - cost_upper):.3f}"
                df.loc[df['agent_name'] == agent_name, 'acc_ci'] = accuracy_ci
                df.loc[df['agent_name'] == agent_name, 'cost_ci'] = cost_ci

        df = df.drop(columns=['successful_tasks', 'failed_tasks', 'run_id'])

        if aggregate:
            # For agents that have been run more than once, average the
            # metrics across runs
            df = df.groupby('agent_name').agg({
                'date': 'first',
                'total_cost': 'mean',
                'accuracy': 'mean',
                'precision': 'mean',
                'recall': 'mean',
                'f1_score': 'mean',
                'auc': 'mean',
                'overall_score': 'mean',
                'vectorization_score': 'mean',
                'fathomnet_score': 'mean',
                'feedback_score': 'mean',
                'house_price_score': 'mean',
                'spaceship_titanic_score': 'mean',
                'amp_parkinsons_disease_progression_prediction_score': 'mean',
                'cifar10_score': 'mean',
                'imdb_score': 'mean',
                'Verified': 'first',
                'Runs': 'first',
                'acc_ci': 'first',
                'cost_ci': 'first'
            }).reset_index()

        # Round float columns to 3 decimal places
        float_columns = [
            'total_cost', 'accuracy', 'precision', 'recall', 'f1_score', 'auc',
            'overall_score', 'vectorization_score', 'fathomnet_score',
            'feedback_score', 'house_price_score', 'spaceship_titanic_score',
            'amp_parkinsons_disease_progression_prediction_score',
            'cifar10_score', 'imdb_score']
        for column in float_columns:
            if column in df.columns:
                df[column] = df[column].round(3)

        # Sort by accuracy and rename columns for display
        df = df.sort_values('accuracy', ascending=False)
        df = df.rename(columns={
            'agent_name': 'Agent Name',
            'date': 'Date',
            'total_cost': 'Total Cost',
            'accuracy': 'Accuracy',
            'precision': 'Precision',
            'recall': 'Recall',
            'f1_score': 'F1 Score',
            'auc': 'AUC',
            'overall_score': 'Overall Score',
            'vectorization_score': 'Vectorization Score',
            'fathomnet_score': 'Fathomnet Score',
            'feedback_score': 'Feedback Score',
            'house_price_score': 'House Price Score',
            'spaceship_titanic_score': 'Spaceship Titanic Score',
            'amp_parkinsons_disease_progression_prediction_score': 'AMP Parkinsons Disease Progression Prediction Score',
            'cifar10_score': 'CIFAR10 Score',
            'imdb_score': 'IMDB Score',
            'acc_ci': 'Accuracy CI',
            'cost_ci': 'Total Cost CI'
        })
        return df
    def get_task_success_data(self, benchmark_name):
        with self.get_conn() as conn:
            query = '''
                SELECT agent_name, accuracy, successful_tasks, failed_tasks
                FROM parsed_results
                WHERE benchmark_name = ?
            '''
            df = pd.read_sql_query(query, conn, params=(benchmark_name,))

        # For agents that have been run more than once, keep the run with the
        # highest accuracy
        df = df.sort_values('accuracy', ascending=False).groupby('agent_name').first().reset_index()

        # The task lists were stored with str(); parse them back into lists
        df['successful_tasks'] = df['successful_tasks'].apply(ast.literal_eval)
        df['failed_tasks'] = df['failed_tasks'].apply(ast.literal_eval)

        # Collect all unique task IDs
        task_ids = set()
        for tasks in df['successful_tasks']:
            if tasks is not None:
                task_ids.update(tasks)
        for tasks in df['failed_tasks']:
            if tasks is not None:
                task_ids.update(tasks)

        # Build a long-format DataFrame with one row per (agent, task) pair
        data_list = []
        for _, row in df.iterrows():
            successful = set(row['successful_tasks'] or [])
            for task_id in task_ids:
                data_list.append({
                    'agent_name': row['agent_name'],
                    'task_id': task_id,
                    'success': 1 if task_id in successful else 0
                })
        df = pd.DataFrame(data_list)

        df = df.rename(columns={
            'agent_name': 'Agent Name',
            'task_id': 'Task ID',
            'success': 'Success'
        })
        return df
    def load_verified_agents(self, file_path='verified_agents.yaml'):
        with open(file_path, 'r') as f:
            verified_data = yaml.safe_load(f)

        verified_agents = set()
        for benchmark, agents in verified_data.items():
            for agent in agents:
                verified_agents.add((benchmark, agent['agent_name']))
        return verified_agents
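    # Expected layout of verified_agents.yaml, inferred from the parsing above
    # (benchmark and agent names are illustrative):
    #
    # some_benchmark:
    #   - agent_name: "My Agent (gpt-4o)"
    #   - agent_name: "Another Agent"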
if __name__ == '__main__':
    preprocessor = TracePreprocessor()
    preprocessor.preprocess_traces()
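    # Once traces are preprocessed, the query helpers can be used, e.g.
    # (benchmark and agent names below are illustrative):
    #
    # results_df = preprocessor.get_parsed_results('some_benchmark')
    # traces = preprocessor.get_analyzed_traces('My Agent', 'some_benchmark')
    # task_df = preprocessor.get_task_success_data('some_benchmark')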