import ast
import json
import pickle
import sqlite3
import threading
from functools import lru_cache
from pathlib import Path

import numpy as np
import pandas as pd
import yaml
from scipy import stats


class TracePreprocessor:
    def __init__(self, db_path='preprocessed_traces.db'):
        self.db_path = db_path
        # One SQLite connection per thread: sqlite3 connections must not be
        # shared across threads, so each thread lazily opens its own.
        self.local = threading.local()

    def get_conn(self):
        if not hasattr(self.local, 'conn'):
            self.local.conn = sqlite3.connect(self.db_path)
        return self.local.conn

    def create_tables(self):
        with self.get_conn() as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS preprocessed_traces (
                    benchmark_name TEXT,
                    agent_name TEXT,
                    date TEXT,
                    run_id TEXT,
                    raw_logging_results BLOB,
                    PRIMARY KEY (benchmark_name, agent_name, run_id)
                )
            ''')
            conn.execute('''
                CREATE TABLE IF NOT EXISTS failure_reports (
                    benchmark_name TEXT,
                    agent_name TEXT,
                    date TEXT,
                    run_id TEXT,
                    failure_report BLOB,
                    PRIMARY KEY (benchmark_name, agent_name, run_id)
                )
            ''')
            conn.execute('''
                CREATE TABLE IF NOT EXISTS parsed_results (
                    benchmark_name TEXT,
                    agent_name TEXT,
                    date TEXT,
                    run_id TEXT,
                    successful_tasks TEXT,
                    failed_tasks TEXT,
                    total_cost REAL,
                    accuracy REAL,
                    precision REAL,
                    recall REAL,
                    f1_score REAL,
                    auc REAL,
                    overall_score REAL,
                    vectorization_score REAL,
                    fathomnet_score REAL,
                    feedback_score REAL,
                    house_price_score REAL,
                    spaceship_titanic_score REAL,
                    amp_parkinsons_disease_progression_prediction_score REAL,
                    cifar10_score REAL,
                    imdb_score REAL,
                    PRIMARY KEY (benchmark_name, agent_name, run_id)
                )
            ''')

    def preprocess_traces(self, processed_dir="evals_live"):
        self.create_tables()
        processed_dir = Path(processed_dir)
        for file in processed_dir.glob('*.json'):
            with open(file, 'r') as f:
                data = json.load(f)

            agent_name = data['config']['agent_name']
            benchmark_name = data['config']['benchmark_name']
            date = data['config']['date']
            config = data['config']

            try:
                raw_logging_results = pickle.dumps(data['raw_logging_results'])
                with self.get_conn() as conn:
                    conn.execute('''
                        INSERT OR REPLACE INTO preprocessed_traces
                        (benchmark_name, agent_name, date, run_id, raw_logging_results)
                        VALUES (?, ?, ?, ?, ?)
                    ''', (benchmark_name, agent_name, date, config['run_id'], raw_logging_results))
            except Exception as e:
                print(f"Error preprocessing raw_logging_results in {file}: {e}")

            try:
                failure_report = pickle.dumps(data['failure_report'])
                with self.get_conn() as conn:
                    # INSERT OR REPLACE so reprocessing the same file does not
                    # violate the primary key on (benchmark_name, agent_name, run_id).
                    conn.execute('''
                        INSERT OR REPLACE INTO failure_reports
                        (benchmark_name, agent_name, date, run_id, failure_report)
                        VALUES (?, ?, ?, ?, ?)
                    ''', (benchmark_name, agent_name, date, config['run_id'], failure_report))
            except Exception as e:
                print(f"Error preprocessing failure_report in {file}: {e}")

            try:
                config = data['config']
                results = data['results']
                with self.get_conn() as conn:
                    conn.execute('''
                        INSERT OR REPLACE INTO parsed_results
                        (benchmark_name, agent_name, date, run_id, successful_tasks,
                         failed_tasks, total_cost, accuracy, precision, recall,
                         f1_score, auc, overall_score, vectorization_score,
                         fathomnet_score, feedback_score, house_price_score,
                         spaceship_titanic_score,
                         amp_parkinsons_disease_progression_prediction_score,
                         cifar10_score, imdb_score)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ''', (
                        benchmark_name,
                        agent_name,
                        config['date'],
                        config['run_id'],
                        str(results.get('successful_tasks')),
                        str(results.get('failed_tasks')),
                        results.get('total_cost'),
                        results.get('accuracy'),
                        results.get('precision'),
                        results.get('recall'),
                        results.get('f1_score'),
                        results.get('auc'),
                        results.get('overall_score'),
                        results.get('vectorization_score'),
                        results.get('fathomnet_score'),
                        results.get('feedback_score'),
                        # The source JSON uses hyphenated keys for these scores,
                        # while the database columns use underscores.
                        results.get('house-price_score'),
                        results.get('spaceship-titanic_score'),
                        results.get('amp-parkinsons-disease-progression-prediction_score'),
                        results.get('cifar10_score'),
                        results.get('imdb_score')
                    ))
            except Exception as e:
                print(f"Error preprocessing parsed results in {file}: {e}")

    @lru_cache(maxsize=100)
    def get_analyzed_traces(self, agent_name, benchmark_name):
        with self.get_conn() as conn:
            query = '''
                SELECT agent_name, raw_logging_results, date FROM preprocessed_traces
                WHERE benchmark_name = ? AND agent_name = ?
            '''
            df = pd.read_sql_query(query, conn, params=(benchmark_name, agent_name))

        # Unpickle each stored blob and keep only rows with a real payload:
        # some runs store None (or the literal string 'None') instead of results.
        def _has_payload(blob):
            value = pickle.loads(blob)
            return value is not None and value != 'None'

        df = df[df['raw_logging_results'].apply(_has_payload)]

        if len(df) == 0:
            return None

        # Keep only the most recent run per agent.
        df = df.sort_values('date', ascending=False).groupby('agent_name').first().reset_index()

        return pickle.loads(df['raw_logging_results'][0])

    @lru_cache(maxsize=100)
    def get_failure_report(self, agent_name, benchmark_name):
        with self.get_conn() as conn:
            query = '''
                SELECT agent_name, date, failure_report FROM failure_reports
                WHERE benchmark_name = ? AND agent_name = ?
            '''
            df = pd.read_sql_query(query, conn, params=(benchmark_name, agent_name))

        # Keep only rows whose unpickled report is a real value (not None and
        # not the literal string 'None').
        def _has_report(blob):
            value = pickle.loads(blob)
            return value is not None and value != 'None'

        df = df[df['failure_report'].apply(_has_report)]

        if len(df) == 0:
            return None

        # If there are multiple failure reports, keep the most recent one.
        df = df.sort_values('date', ascending=False).groupby('agent_name').first().reset_index()

        return pickle.loads(df['failure_report'][0])

    def _calculate_ci(self, data, confidence=0.95):
        data = data[np.isfinite(data)]

        if len(data) < 2:
            return '', '', ''  # No CI for fewer than 2 samples

        n = len(data)
        mean = np.mean(data)
        sem = stats.sem(data)
        # Two-sided t-interval around the sample mean.
        ci = stats.t.interval(confidence, n - 1, loc=mean, scale=sem)
        return mean, ci[0], ci[1]

    def get_parsed_results(self, benchmark_name, aggregate=True):
        with self.get_conn() as conn:
            query = '''
                SELECT * FROM parsed_results
                WHERE benchmark_name = ?
                ORDER BY accuracy DESC
            '''
            df = pd.read_sql_query(query, conn, params=(benchmark_name,))

        # Load verified agents and flag verified (benchmark, agent) pairs
        verified_agents = self.load_verified_agents()
        df['Verified'] = df.apply(
            lambda row: '✓' if (benchmark_name, row['agent_name']) in verified_agents else '',
            axis=1)

        # Number of runs recorded for each agent
        df['Runs'] = df.groupby('agent_name')['agent_name'].transform('count')

        # Compute a 95% confidence interval for accuracy and cost for agents
        # that have been run more than once.
        df['acc_ci'] = None
        df['cost_ci'] = None
        for agent_name in df['agent_name'].unique():
            agent_df = df[df['agent_name'] == agent_name]
            if len(agent_df) > 1:
                accuracy_mean, accuracy_lower, accuracy_upper = self._calculate_ci(agent_df['accuracy'])
                cost_mean, cost_lower, cost_upper = self._calculate_ci(agent_df['total_cost'])

                # Format the symmetric interval half-width with a +/- sign.
                accuracy_ci = f"± {abs(accuracy_mean - accuracy_lower):.3f}"
                cost_ci = f"± {abs(cost_mean - cost_lower):.3f}"

                df.loc[df['agent_name'] == agent_name, 'acc_ci'] = accuracy_ci
                df.loc[df['agent_name'] == agent_name, 'cost_ci'] = cost_ci

        df = df.drop(columns=['successful_tasks', 'failed_tasks', 'run_id'])

        if aggregate:
            # For agents that have been run more than once, report the mean of
            # each numeric metric.
            df = df.groupby('agent_name').agg({
                'date': 'first',
                'total_cost': 'mean',
                'accuracy': 'mean',
                'precision': 'mean',
                'recall': 'mean',
                'f1_score': 'mean',
                'auc': 'mean',
                'overall_score': 'mean',
                'vectorization_score': 'mean',
                'fathomnet_score': 'mean',
                'feedback_score': 'mean',
                'house_price_score': 'mean',
                'spaceship_titanic_score': 'mean',
                'amp_parkinsons_disease_progression_prediction_score': 'mean',
                'cifar10_score': 'mean',
                'imdb_score': 'mean',
                'Verified': 'first',
                'Runs': 'first',
                'acc_ci': 'first',
                'cost_ci': 'first'
            }).reset_index()

        # Round float columns to 3 decimal places. These names must match the
        # underscored database columns, not the hyphenated JSON keys.
        float_columns = ['total_cost', 'accuracy', 'precision', 'recall', 'f1_score',
                         'auc', 'overall_score', 'vectorization_score', 'fathomnet_score',
                         'feedback_score', 'house_price_score', 'spaceship_titanic_score',
                         'amp_parkinsons_disease_progression_prediction_score',
                         'cifar10_score', 'imdb_score']
        for column in float_columns:
            if column in df.columns:
                df[column] = df[column].round(3)

        # Sort by accuracy
        df = df.sort_values('accuracy', ascending=False)

        # Rename columns for display
        df = df.rename(columns={
            'agent_name': 'Agent Name',
            'date': 'Date',
            'total_cost': 'Total Cost',
            'accuracy': 'Accuracy',
            'precision': 'Precision',
            'recall': 'Recall',
            'f1_score': 'F1 Score',
            'auc': 'AUC',
            'overall_score': 'Overall Score',
            'vectorization_score': 'Vectorization Score',
            'fathomnet_score': 'Fathomnet Score',
            'feedback_score': 'Feedback Score',
            'house_price_score': 'House Price Score',
            'spaceship_titanic_score': 'Spaceship Titanic Score',
            'amp_parkinsons_disease_progression_prediction_score': 'AMP Parkinsons Disease Progression Prediction Score',
            'cifar10_score': 'CIFAR10 Score',
            'imdb_score': 'IMDB Score',
            'acc_ci': 'Accuracy CI',
            'cost_ci': 'Total Cost CI'
        })

        return df

    def get_task_success_data(self, benchmark_name):
        with self.get_conn() as conn:
            query = '''
                SELECT agent_name, accuracy, successful_tasks, failed_tasks
                FROM parsed_results
                WHERE benchmark_name = ?
            '''
            df = pd.read_sql_query(query, conn, params=(benchmark_name,))

        # For agent_names that have been run more than once, keep the run with
        # the highest accuracy.
        df = df.sort_values('accuracy', ascending=False).groupby('agent_name').first().reset_index()

        # Collect all unique task IDs. The task lists are stored as string
        # representations of Python lists, so parse each one exactly once.
        task_ids = set()
        for tasks in df['successful_tasks']:
            parsed = ast.literal_eval(tasks)
            if parsed is not None:
                task_ids.update(parsed)
        for tasks in df['failed_tasks']:
            parsed = ast.literal_eval(tasks)
            if parsed is not None:
                task_ids.update(parsed)

        # Build a long-format DataFrame with agent_name, task_id, and success columns.
        data_list = []
        for _, row in df.iterrows():
            agent_name = row['agent_name']
            # Membership must be tested against the parsed list, not the raw
            # string; substring matching would spuriously mark task IDs that
            # are prefixes of other IDs as successful.
            successful = ast.literal_eval(row['successful_tasks']) or []
            for task_id in task_ids:
                data_list.append({
                    'agent_name': agent_name,
                    'task_id': task_id,
                    'success': 1 if task_id in successful else 0
                })
        df = pd.DataFrame(data_list)

        df = df.rename(columns={
            'agent_name': 'Agent Name',
            'task_id': 'Task ID',
            'success': 'Success'
        })

        return df

    def load_verified_agents(self, file_path='verified_agents.yaml'):
        with open(file_path, 'r') as f:
            verified_data = yaml.safe_load(f)

        verified_agents = set()
        for benchmark, agents in verified_data.items():
            for agent in agents:
                verified_agents.add((benchmark, agent['agent_name']))

        return verified_agents


if __name__ == '__main__':
    preprocessor = TracePreprocessor()
    preprocessor.preprocess_traces()
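    # Hypothetical usage sketch: 'example_benchmark' below is an assumed
    # benchmark name, not one defined by this module. After preprocessing,
    # the parsed results can be pulled as a leaderboard-style DataFrame:
    #
    #     leaderboard = preprocessor.get_parsed_results('example_benchmark')
    #     print(leaderboard[['Agent Name', 'Accuracy', 'Total Cost', 'Runs']])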