"""Gradio app for the agent leaderboard.

Builds the leaderboard tables, the cost/accuracy scatter plots and the
per-agent trace viewers, and schedules hourly refresh jobs.

NOTE(review): this module was reconstructed from a whitespace-collapsed copy
in which HTML tags inside string literals had been stripped. The layout
nesting of the ``with gr.*`` blocks and the markup in ``format_call_info``
are best-effort reconstructions - confirm against the deployed version.
"""

import asyncio
import html
import json
import os
import re
from datetime import datetime
from pathlib import Path

import gradio as gr
import markdown
import pandas as pd
import weave
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from gradio_leaderboard import Leaderboard, SelectColumns, ColumnFilter
from huggingface_hub import snapshot_download

import config
from envs import RESULTS_REPO_ID, REPO_ID, API, HF_TOKEN
from utils.data import parse_json_files
from utils.processing import check_and_process_uploads
from utils.viz import create_scatter_plot, create_flow_chart

weave.init(f'leaderboard_testing_{datetime.now().strftime("%Y%m%d%H%M%S")}')

# Directory containing this file; all data directories are resolved against it.
abs_path = Path(__file__).parent

# Global variable to store preprocessed data:
# {benchmark_name: {agent_name: raw_logging_results dict, or None if unusable}}
preprocessed_traces = {}


def restart_space():
    """Restart the Space identified by ``REPO_ID`` through the hub API."""
    API.restart_space(repo_id=REPO_ID, token=HF_TOKEN)


def download_latest_results():
    """Download a snapshot of the results dataset into ``evals_upload``."""
    print("Downloading latest results...")
    snapshot_download(
        RESULTS_REPO_ID,
        local_dir=abs_path / "evals_upload",
        repo_type='dataset',
        tqdm_class=None,
        etag_timeout=30,
        max_workers=4,
    )
    print("Download complete.")


def preprocess_traces():
    """Load every ``evals_live/*.json`` file into ``preprocessed_traces``.

    For each file the ``raw_logging_results`` dict is stored under
    ``preprocessed_traces[benchmark_name][agent_name]``. If the file is
    readable enough to identify the agent and benchmark but its traces are
    missing or malformed, ``None`` is stored instead. Files that cannot be
    identified at all are reported and skipped.
    """
    processed_dir = abs_path / "evals_live"
    for file in processed_dir.glob('*.json'):
        # Reset per file so an error handler can never write into the entry
        # of a previously processed file (or hit an unbound name).
        agent_name = None
        benchmark_name = None
        try:
            with open(file, 'r') as f:
                data = json.load(f)
            agent_name = data['config']['agent_name']
            benchmark_name = data['config']['benchmark_name']
            raw_results = data['raw_logging_results']
        except Exception as e:  # best effort: one bad file must not stop the rest
            print(f"Error preprocessing {file}: {e}")
            raw_results = None

        if agent_name is None or benchmark_name is None:
            # Nothing to key the entry on; the error was already reported.
            continue

        if not isinstance(raw_results, dict):
            # Explicit check instead of ``assert`` (asserts vanish under -O).
            raw_results = None

        preprocessed_traces.setdefault(benchmark_name, {})[agent_name] = raw_results


def get_analyzed_traces(agent_name, benchmark_name):
    """Return the preprocessed traces for an agent, or ``None`` if unknown."""
    return preprocessed_traces.get(benchmark_name, {}).get(agent_name)


def update_agent_dropdown(benchmark_name, metric):
    """Return an agent dropdown for ``benchmark_name`` preselecting the best agent by ``metric``."""
    df = parse_json_files(os.path.join(abs_path, "evals_live"), benchmark_name)
    agents = df['Agent Name'].tolist()
    best_agent = get_best_agent(benchmark_name, metric)
    return gr.Dropdown(choices=agents, value=best_agent, label="Select Agent")


def get_best_agent(benchmark_name, metric):
    """Return the name of the agent with the highest ``metric``.

    Returns ``None`` when there are no results for the benchmark, because
    ``idxmax`` raises on an empty column.
    """
    df = parse_json_files(os.path.join(abs_path, "evals_live"), benchmark_name)
    if df.empty:
        return None
    return df.loc[df[metric].idxmax()]['Agent Name']


def update_task_analysis(benchmark_name, agent_name):
    """Return ``(overview, flow_chart, task_dropdown, "")`` for the agent's first task.

    When no agent is selected or no traces exist for it, a message is returned
    in place of the overview and the chart and dropdown slots are ``None``.
    """
    if not agent_name:
        return "Please select an agent.", None, None, ""

    analyzed_traces = get_analyzed_traces(agent_name, benchmark_name)
    if not analyzed_traces:
        return f"No analysis available for agent: {agent_name}", None, None, ""

    task_ids = list(analyzed_traces.keys())
    first_task = task_ids[0]
    overview, flow_chart, _ = update_task_details(benchmark_name, agent_name, first_task)
    task_dropdown = gr.Dropdown(choices=task_ids, value=first_task, label="Select Task")
    return overview, flow_chart, task_dropdown, ""


def update_task_details(benchmark_name, agent_name, task_id):
    """Return ``(overview_markdown, flow_chart, "")`` for a single task.

    When no task is selected or the task has no traces, a message is returned
    in place of the overview and the chart slot is ``None``.
    """
    if not task_id:
        return "Please select a task.", None, ""

    analyzed_traces = get_analyzed_traces(agent_name, benchmark_name)
    if not analyzed_traces or task_id not in analyzed_traces:
        return f"No analysis available for task: {task_id}", None, ""

    analysis = analyzed_traces[task_id]
    summary = analysis.get('task_analysis', {})

    overview = f"## Task Overview\n\n{summary.get('overview', 'No overview available.')}\n\n"
    overview += f"### Successes\n{summary.get('key_successes', 'No successes listed.')}\n\n"
    overview += f"### Challenges\n{summary.get('main_challenges', 'No challenges listed.')}\n\n"
    overview += f"### Overall Assessment\n{summary.get('overall_assessment', 'No assessment available.')}\n\n"

    flow_chart = create_flow_chart(analysis['steps'])
    return overview, flow_chart, ""


def format_call_info(step, step_index):
    """Render one trace step as an HTML fragment for the raw-predictions view.

    ``step`` must provide ``step['call_data']`` (with ``inputs``, ``outputs``
    and ``summary``) and ``step['analysis']``; ``step_index`` is zero-based
    and displayed one-based.

    NOTE(review): the original tags were lost, so the markup below is a
    reconstruction; only the visible text and the interpolated values are
    known to match.
    """
    call_data = step['call_data']
    analysis = step['analysis']

    def format_json(obj):
        """Return ``obj`` as indented JSON, escaped and laid out for HTML."""
        # if isinstance(obj, dict) and 'choices' in obj:
        #     # Special handling for message content
        #     formatted_content = format_message_content(obj['choices'][0])
        #     return f'<div>{formatted_content}</div>'
        # else:
        json_str = json.dumps(obj, indent=2)
        # Trace content is model/user generated: escape it before it is
        # placed into the HTML component.
        json_str = html.escape(json_str, quote=False)
        json_str = json_str.replace(' ', '&nbsp;')
        json_str = json_str.replace('\n', '<br>')
        return f'<div style="font-family: monospace; overflow-x: auto;">{json_str}</div>'

    # Currently not used but we can enable it to format message content
    def format_message_content(content):
        """Convert Markdown ``content`` to HTML with styled Python code blocks."""
        # Convert Markdown to HTML
        html_content = markdown.markdown(content)
        # Replace ``` code blocks with styled pre blocks
        html_content = re.sub(
            r'```python\n(.*?)```',
            lambda m: f'<pre><code>{m.group(1)}</code></pre>',
            html_content,
            flags=re.DOTALL,
        )
        return html_content

    headline = html.escape(str(analysis.get('headline', 'N/A')), quote=False)

    # NOTE(review): nothing after the "Analysis" heading survived in the
    # collapsed source; confirm whether analysis fields were rendered there.
    formatted_info = f"""
<div>
<h3>Step {step_index + 1}: {headline}</h3>
<h4>Call Metadata</h4>
<h5>Inputs</h5>
{format_json(call_data['inputs'])}
<h5>Outputs</h5>
{format_json(call_data['outputs'])}
<h5>Usage</h5>
{format_json(call_data['summary'])}
<h4>Analysis</h4>
</div>
"""
    return formatted_info


with gr.Blocks() as demo:
    gr.Markdown("""
    # 🥇 Agent Leaderboard
    """)

    with gr.Tabs():
        with gr.Tab("USACO"):
            # Hidden components that feed constant arguments to the callbacks
            # and absorb their unused trailing return value.
            usaco_benchmark = gr.Textbox(value="usaco", visible=False)
            usaco_metric = gr.Textbox(value="Accuracy", visible=False)
            usaco_sink = gr.Textbox(visible=False)

            with gr.Row():
                with gr.Column(scale=2):
                    Leaderboard(
                        value=parse_json_files(os.path.join(abs_path, "evals_live"), 'usaco'),
                        select_columns=SelectColumns(
                            default_selection=config.USACO_ON_LOAD_COLUMNS,
                            cant_deselect=["Agent Name"],
                            label="Select Columns to Display:",
                        ),
                        search_columns=config.USACO_SEARCH_COLUMNS,
                        column_widths={"Agent Name": 40, "Accuracy": 20, "Total Cost": 20},
                    )
            with gr.Row():
                scatter_plot = gr.Plot(
                    create_scatter_plot(
                        parse_json_files(os.path.join(abs_path, "evals_live"), 'usaco'),
                        "Total Cost",
                        "Accuracy",
                        "Total Cost (in USD)",
                        "Accuracy",
                        ["Agent Name"],
                    )
                )

            gr.Markdown("## Agent Monitor")
            with gr.Row():
                with gr.Column(scale=1):
                    agent_dropdown = gr.Dropdown(label="Select Agent")
                with gr.Column(scale=1):
                    task_dropdown = gr.Dropdown(label="Select USACO Task")
            with gr.Row():
                task_overview = gr.Markdown()
            with gr.Row():
                flow_chart = gr.Plot(label="Task Flow")

            # Initialize the agent dropdown with the best agent
            demo.load(
                update_agent_dropdown,
                inputs=[usaco_benchmark, usaco_metric],
                outputs=[agent_dropdown],
            )
            demo.load(
                update_task_analysis,
                inputs=[usaco_benchmark, agent_dropdown],
                outputs=[task_overview, flow_chart, task_dropdown, usaco_sink],
            )

            agent_dropdown.change(
                update_task_analysis,
                inputs=[usaco_benchmark, agent_dropdown],
                outputs=[task_overview, flow_chart, task_dropdown, usaco_sink],
            )
            task_dropdown.change(
                update_task_details,
                inputs=[usaco_benchmark, agent_dropdown, task_dropdown],
                outputs=[task_overview, flow_chart, usaco_sink],
            )

            gr.Markdown("## Raw Predictions")
            with gr.Row():
                with gr.Column(scale=1):
                    raw_agent_dropdown = gr.Dropdown(label="Select Agent")
                with gr.Column(scale=1):
                    raw_task_dropdown = gr.Dropdown(label="Select Task")
                with gr.Column(scale=1):
                    raw_step_dropdown = gr.Dropdown(label="Select Step")

            with gr.Row():
                raw_call_details = gr.HTML()

            def update_raw_task_dropdown(agent_name):
                """Return ``(task_dropdown, step_dropdown, call_details_html)`` for an agent."""
                analyzed_traces = get_analyzed_traces(agent_name, "usaco")
                if not analyzed_traces:
                    return (
                        gr.Dropdown(choices=[], label="Select Task"),
                        gr.Dropdown(choices=[], label="Select Step"),
                        f"No raw predictions data available for agent: {agent_name}.",
                    )
                task_ids = list(analyzed_traces.keys())
                first_task = task_ids[0]
                steps = analyzed_traces[first_task]['steps']
                step_choices = [(f"Step {i+1}", i) for i in range(len(steps))]
                return (
                    gr.Dropdown(choices=task_ids, label="Select Task", value=first_task),
                    gr.Dropdown(choices=step_choices, label="Select Step", value=0),
                    update_raw_call_details(agent_name, first_task, 0),
                )

            def update_raw_step_dropdown(agent_name, task_id):
                """Return the step dropdown for ``task_id`` (empty if there is no data)."""
                analyzed_traces = get_analyzed_traces(agent_name, "usaco")
                if not analyzed_traces or task_id not in analyzed_traces:
                    # No value: a placeholder string would not be one of the
                    # (empty) choices.
                    return gr.Dropdown(choices=[], label="Select Step", value=None)
                steps = analyzed_traces[task_id]['steps']
                step_choices = [(f"Step {i+1}", i) for i in range(len(steps))]
                return gr.Dropdown(choices=step_choices, label="Select Step", value=0)

            def update_raw_call_details(agent_name, task_id, step_index):
                """Return the HTML for one step, or a message if the selection is invalid."""
                analyzed_traces = get_analyzed_traces(agent_name, "usaco")
                if not analyzed_traces or task_id not in analyzed_traces:
                    return "No data available for this selection."
                steps = analyzed_traces[task_id]['steps']
                # The step value can be stale (e.g. left over from a task with
                # more steps) while the dropdowns are being refreshed.
                if not isinstance(step_index, int) or not 0 <= step_index < len(steps):
                    return "Invalid step selection."
                step = steps[step_index]
                return format_call_info(step, step_index)

            # Initialize the raw agent dropdown with all agents
            demo.load(
                update_agent_dropdown,
                inputs=[usaco_benchmark, usaco_metric],
                outputs=[raw_agent_dropdown],
            )
            # update_raw_task_dropdown returns three values, so all three
            # components must be listed as outputs.
            demo.load(
                update_raw_task_dropdown,
                inputs=[raw_agent_dropdown],
                outputs=[raw_task_dropdown, raw_step_dropdown, raw_call_details],
            )
            demo.load(
                update_raw_call_details,
                inputs=[raw_agent_dropdown, raw_task_dropdown, raw_step_dropdown],
                outputs=[raw_call_details],
            )

            raw_agent_dropdown.change(
                update_raw_task_dropdown,
                inputs=[raw_agent_dropdown],
                outputs=[raw_task_dropdown, raw_step_dropdown, raw_call_details],
            )
            raw_task_dropdown.change(
                update_raw_step_dropdown,
                inputs=[raw_agent_dropdown, raw_task_dropdown],
                outputs=[raw_step_dropdown],
            )
            raw_step_dropdown.change(
                update_raw_call_details,
                inputs=[raw_agent_dropdown, raw_task_dropdown, raw_step_dropdown],
                outputs=[raw_call_details],
            )

        with gr.Tab("SWE-Bench"):
            with gr.Row():
                with gr.Column(scale=1):
                    scatter_plot = gr.Plot(
                        create_scatter_plot(
                            parse_json_files(os.path.join(abs_path, "evals_live"), 'swebench_lite'),
                            "Total Cost",
                            "Accuracy",
                            "Total Cost (in USD)",
                            "Accuracy",
                            ["Agent Name"],
                        )
                    )
                with gr.Column(scale=1):
                    Leaderboard(
                        value=parse_json_files(os.path.join(abs_path, "evals_live"), 'swebench_lite'),
                        select_columns=SelectColumns(
                            default_selection=config.SWEBENCH_ON_LOAD_COLUMNS,
                            cant_deselect=["Agent Name"],
                            label="Select Columns to Display:",
                        ),
                        search_columns=config.SWEBENCH_SEARCH_COLUMNS,
                        column_widths={"Agent Name": 40, "Accuracy": 20, "Total Cost": 20},
                    )

        with gr.Tab("About"):
            gr.Markdown((Path(__file__).parent / "about.md").read_text())


async def main():
    """Prepare the data, start the hourly jobs and serve the app until stopped."""
    # Preprocess traces
    preprocess_traces()

    # Download the results from the Hugging Face Hub
    await asyncio.to_thread(download_latest_results)

    # Check for new uploads and process them
    await check_and_process_uploads()

    scheduler = AsyncIOScheduler()
    scheduler.add_job(restart_space, "interval", hours=1)
    scheduler.add_job(download_latest_results, "interval", hours=1)
    scheduler.add_job(check_and_process_uploads, "interval", hours=1)
    scheduler.start()

    # launch() is a regular (non-awaitable) call that blocks by default. The
    # scheduler runs its jobs on this event loop, so the server is started
    # without the thread lock and the coroutine is parked on an event that is
    # never set, keeping the loop free to run the jobs.
    demo.launch(prevent_thread_lock=True)
    try:
        await asyncio.Event().wait()
    finally:
        scheduler.shutdown(wait=False)
        demo.close()


if __name__ == "__main__":
    asyncio.run(main())