import gradio as gr
from gradio_leaderboard import Leaderboard, SelectColumns, ColumnFilter
import config
from envs import RESULTS_REPO_ID, REPO_ID, API, HF_TOKEN
from pathlib import Path
import pandas as pd
import os
import json
from utils import parse_json_files, create_scatter_plot, create_flow_chart
from huggingface_hub import snapshot_download
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
import re
import markdown


def restart_space():
    API.restart_space(repo_id=REPO_ID, token=HF_TOKEN)


# New function to download results
def download_latest_results():
    print("Downloading latest results...")
    snapshot_download(RESULTS_REPO_ID,
                      local_dir=abs_path / "evals",
                      repo_type='dataset',
                      tqdm_class=None,
                      etag_timeout=30,
                      max_workers=4,
                      )
    print("Download complete.")


abs_path = Path(__file__).parent

# Load task_analyses.json from the evals/usaco_traces folder
with open(os.path.join(abs_path, "evals", "usaco_traces", "task_analyses.json"), "r") as f:
    analyzed_traces = json.load(f)


def update_task_analysis(task_id):
    if task_id not in analyzed_traces:
        return "No analysis available for this task.", None, [], ""

    analysis = analyzed_traces[task_id]
    summary = analysis['summary']

    if isinstance(summary, str):
        try:
            summary = json.loads(summary)
        except json.JSONDecodeError:
            return "Error: Unable to parse summary data.", None, [], ""
    elif not isinstance(summary, dict):
        return "Error: Summary data is in an unexpected format.", None, [], ""

    overview = f"# Task Overview\n\n{summary.get('overview', 'No overview available.')}\n\n"
    overview += f"## Successes\n{summary.get('successes', 'No successes listed.')}\n\n"
    overview += f"## Challenges\n{summary.get('challenges', 'No challenges listed.')}\n\n"

    steps = [(f"Step {i+1}", i) for i in range(len(analysis['steps']))]
    flow_chart = create_flow_chart(analysis['steps'])

    return overview, flow_chart, gr.Dropdown(choices=steps, label="Agent Steps"), ""


def update_step_details(task_id, step_index):
    if task_id not in analyzed_traces:
        return "No analysis available for this task."
    if step_index is None:
        return "Please select a step to view details."

    steps = analyzed_traces[task_id]['steps']

    if isinstance(step_index, tuple):
        step_index = step_index[1]
    elif isinstance(step_index, str):
        step_index = int(step_index.split()[-1]) - 1

    if step_index < 0 or step_index >= len(steps):
        return f"Invalid step index: {step_index}"

    step = steps[step_index]
    analysis = step['analysis']

    if isinstance(analysis, str):
        try:
            analysis = json.loads(analysis)
        except json.JSONDecodeError:
            return "Error: Unable to parse step analysis data."
    elif not isinstance(analysis, dict):
        return "Error: Step analysis data is in an unexpected format."

    details = f"# Step {step_index + 1} Details\n\n"
    details += f"## Description\n{analysis.get('description', 'No description available.')}\n\n"
    details += f"## Assessment\n{analysis.get('assessment', 'No assessment available.')}\n\n"

    return details


def format_call_info(call, call_index):
    call_data = call['call_data']
    analysis = call['analysis']

    def format_json(obj):
        # if isinstance(obj, dict) and 'choices' in obj:
        #     # Special handling for message content
        #     formatted_content = format_message_content(obj['choices'][0])
        #     return f'<div>{formatted_content}</div>'
        # else:
        # Render the JSON in a monospace block, preserving indentation and line breaks in HTML
        json_str = json.dumps(obj, indent=2)
        json_str = json_str.replace(' ', '&nbsp;')
        json_str = json_str.replace('\n', '<br>')
        return f'<div style="font-family: monospace; background-color: #f0f0f0; padding: 10px; border-radius: 5px;">{json_str}</div>'

    # Currently not used but we can enable it to format message content
    def format_message_content(content):
        # Convert Markdown to HTML
        html_content = markdown.markdown(content)
        # Replace ``` code blocks with styled pre blocks
        html_content = re.sub(r'```python\n(.*?)```',
                              lambda m: f'<pre style="background-color: #f0f0f0; padding: 10px; border-radius: 5px;">{m.group(1)}</pre>',
                              html_content, flags=re.DOTALL)
        return html_content

    formatted_info = f"""
    <h2>Step {call_index+1}: {analysis.get('step_outline', 'N/A')}</h2>

    <h3>Call Metadata</h3>

    <h3>Inputs</h3>
    {format_json(call_data['inputs'])}

    <h3>Outputs</h3>
    {format_json(call_data['outputs'])}

    <h3>Usage</h3>
    {format_json(call_data['summary'])}
    <h3>Analysis</h3>
    """
    return formatted_info


def update_call_details(task_id, call_index):
    if task_id not in analyzed_traces or call_index is None:
        return "Please select a task and step to view details."

    calls = analyzed_traces[task_id]['steps']

    if isinstance(call_index, tuple):
        call_index = call_index[1]

    if call_index < 0 or call_index >= len(calls):
        return f"Invalid call index: {call_index}"

    call = calls[call_index]
    return format_call_info(call, call_index)


with gr.Blocks() as demo:
    gr.Markdown("""
    # 🥇 Agent Leaderboard
    """)

    with gr.Tabs():
        with gr.Tab("SWE-Bench"):
            with gr.Row():
                with gr.Column(scale=1):
                    scatter_plot = gr.Plot(create_scatter_plot(
                        parse_json_files(os.path.join(abs_path, "evals"), 'swebench_lite'),
                        "results_total_cost", "results_accuracy",
                        "Cost (in USD)", "Accuracy",
                        ["agent_name"]))
                with gr.Column(scale=1):
                    Leaderboard(
                        value=parse_json_files(os.path.join(abs_path, "evals"), 'swebench_lite'),
                        select_columns=SelectColumns(
                            default_selection=config.SWEBENCH_ON_LOAD_COLUMNS,
                            cant_deselect=["agent_name"],
                            label="Select Columns to Display:",
                        ),
                        search_columns=config.SWEBENCH_SEARCH_COLUMNS,
                        column_widths={"agent_name": 40, "results_accuracy": 20, "results_total_cost": 20},
                    )

        with gr.Tab("USACO"):
            with gr.Row():
                with gr.Column(scale=1):
                    scatter_plot = gr.Plot(create_scatter_plot(
                        parse_json_files(os.path.join(abs_path, "evals"), 'usaco'),
                        "results_total_cost", "results_accuracy",
                        "Cost", "Accuracy",
                        ["agent_name"]))
                with gr.Column(scale=1):
                    Leaderboard(
                        value=parse_json_files(os.path.join(abs_path, "evals"), 'usaco'),
                        select_columns=SelectColumns(
                            default_selection=config.USACO_ON_LOAD_COLUMNS,
                            cant_deselect=["agent_name"],
                            label="Select Columns to Display:",
                        ),
                        search_columns=config.USACO_SEARCH_COLUMNS,
                        column_widths={"agent_name": 40, "results_accuracy": 20, "results_total_cost": 20},
                    )

            gr.Markdown("## Agent Monitor")
            with gr.Row():
                with gr.Column(scale=1):
                    task_dropdown = gr.Dropdown(choices=list(analyzed_traces.keys()), label="Select USACO Task")
                    task_overview = gr.Markdown()
                with gr.Column(scale=1):
                    steps_dropdown = gr.Dropdown(label="Agent Steps")
                    step_details = gr.Markdown()
            with gr.Row():
                flow_chart = gr.Plot(label="Task Flow")

            task_dropdown.change(update_task_analysis,
                                 inputs=[task_dropdown],
                                 outputs=[task_overview, flow_chart, steps_dropdown, step_details])
            steps_dropdown.change(update_step_details,
                                  inputs=[task_dropdown, steps_dropdown],
                                  outputs=[step_details])

            gr.Markdown("## Raw Predictions")
            with gr.Row():
                with gr.Column(scale=1):
                    task_dropdown = gr.Dropdown(choices=list(analyzed_traces.keys()), label="Select USACO Task")
                with gr.Column(scale=1):
                    call_dropdown = gr.Dropdown(label="Select Call")
            with gr.Row():
                call_details = gr.HTML()

            def update_call_dropdown(task_id):
                # The selectable "calls" are the analyzed steps of the chosen task
                calls = analyzed_traces.get(task_id, {}).get('steps', [])
                return gr.Dropdown(choices=[(f"Call {i+1}", i) for i in range(len(calls))])

            task_dropdown.change(update_call_dropdown,
                                 inputs=[task_dropdown],
                                 outputs=[call_dropdown])
            call_dropdown.change(update_call_details,
                                 inputs=[task_dropdown, call_dropdown],
                                 outputs=[call_details])

        with gr.Tab("About"):
            gr.Markdown((Path(__file__).parent / "about.md").read_text())

if __name__ == "__main__":
    # Download the results from the Hugging Face Hub
    download_latest_results()

    scheduler = BackgroundScheduler()
    scheduler.add_job(restart_space, "interval", hours=1)            # restart the space every 1h
    scheduler.add_job(download_latest_results, "interval", hours=1)  # download latest results every 1h
    scheduler.start()

    demo.launch()