Spaces:
Running
Running
import gradio as gr | |
from gradio_leaderboard import Leaderboard, SelectColumns, ColumnFilter | |
import config | |
from envs import RESULTS_REPO_ID, REPO_ID, API, HF_TOKEN | |
from pathlib import Path | |
import pandas as pd | |
import os | |
import json | |
from utils import parse_json_files, create_scatter_plot, create_flow_chart | |
from huggingface_hub import snapshot_download | |
from apscheduler.schedulers.background import BackgroundScheduler | |
from datetime import datetime | |
import json | |
import re | |
import markdown | |
def restart_space():
    """Ask the Hub API client to restart the Space identified by ``REPO_ID``.

    Authenticates with ``HF_TOKEN``. Used as a scheduled job, so it takes no
    arguments and returns nothing.
    """
    API.restart_space(token=HF_TOKEN, repo_id=REPO_ID)
# Fetches the evaluation results that the leaderboard reads from disk.
def download_latest_results():
    """Download a snapshot of the results dataset into ``<app dir>/evals``.

    ``abs_path`` is resolved when the function is called, not when it is
    defined, so it may be assigned further down in the module.
    """
    print("Downloading latest results...")
    download_options = {
        "local_dir": abs_path / "evals",
        "repo_type": 'dataset',
        "tqdm_class": None,
        "etag_timeout": 30,
        "max_workers": 4,
    }
    snapshot_download(RESULTS_REPO_ID, **download_options)
    print("Download complete.")
# Directory containing this file; every data path below is relative to it.
abs_path = Path(__file__).parent

# Load task_analyses.json from the evals/usaco_traces folder.
# This runs at import time, i.e. before download_latest_results() is invoked
# from the __main__ block, so the file may not exist yet on a fresh checkout.
# Falling back to an empty mapping lets the app start (and the download run)
# instead of crashing before any results can be fetched.
_task_analyses_path = abs_path / "evals" / "usaco_traces" / "task_analyses.json"
try:
    with open(_task_analyses_path, "r", encoding="utf-8") as f:
        analyzed_traces = json.load(f)
except FileNotFoundError:
    print(f"Warning: {_task_analyses_path} not found; no task analyses loaded.")
    analyzed_traces = {}
def update_task_analysis(task_id):
    """Build the "Agent Monitor" outputs for the selected task.

    Args:
        task_id: Key into ``analyzed_traces`` chosen in the task dropdown.

    Returns:
        A 4-tuple matching the outputs wired to this callback:
        ``(overview_markdown, flow_chart, steps_dropdown, step_details)``.
        On any error the first element carries the message, the chart is
        ``None``, the dropdown value is ``[]`` and the details are ``""``.
    """
    if task_id not in analyzed_traces:
        return "No analysis available for this task.", None, [], ""

    analysis = analyzed_traces[task_id]
    summary = analysis['summary']

    # The summary may be stored either as a dict or as a JSON-encoded string.
    if isinstance(summary, str):
        try:
            summary = json.loads(summary)
        except json.JSONDecodeError:
            return "Error: Unable to parse summary data.", None, [], ""

    # Checked after decoding too: valid JSON such as "[]" or "42" decodes to
    # a non-dict, and the .get() calls below would raise AttributeError.
    if not isinstance(summary, dict):
        return "Error: Summary data is in an unexpected format.", None, [], ""

    overview = f"# Task Overview\n\n{summary.get('overview', 'No overview available.')}\n\n"
    overview += f"## Successes\n{summary.get('successes', 'No successes listed.')}\n\n"
    overview += f"## Challenges\n{summary.get('challenges', 'No challenges listed.')}\n\n"

    # Dropdown choices are (label, value) pairs: 1-based label, 0-based index.
    steps = [(f"Step {i + 1}", i) for i, _ in enumerate(analysis['steps'])]

    flow_chart = create_flow_chart(analysis['steps'])

    # The trailing "" clears any step details left over from the previous task.
    return overview, flow_chart, gr.Dropdown(choices=steps, label="Agent Steps"), ""
def update_step_details(task_id, step_index):
    """Return Markdown describing one analysed step of a task.

    Args:
        task_id: Key into ``analyzed_traces``.
        step_index: The selected step. Accepts a 0-based int, a
            ``(label, index)`` tuple, or a label string whose last
            whitespace-separated token is the 1-based step number
            (e.g. ``"Step 3"``). ``None`` means nothing is selected.

    Returns:
        A Markdown string with the step's description and assessment, or a
        short message explaining why no details can be shown.
    """
    if task_id not in analyzed_traces:
        return "No analysis available for this task."
    if step_index is None:
        return "Please select a step to view details."

    steps = analyzed_traces[task_id]['steps']

    if isinstance(step_index, tuple):
        step_index = step_index[1]
    elif isinstance(step_index, str):
        try:
            step_index = int(step_index.split()[-1]) - 1
        except (ValueError, IndexError):
            # Empty label or a non-numeric last token: report it, don't crash.
            return f"Invalid step index: {step_index}"

    if step_index < 0 or step_index >= len(steps):
        return f"Invalid step index: {step_index}"

    step = steps[step_index]
    analysis = step['analysis']

    # The analysis may be stored either as a dict or as a JSON-encoded string.
    if isinstance(analysis, str):
        try:
            analysis = json.loads(analysis)
        except json.JSONDecodeError:
            return "Error: Unable to parse step analysis data."

    # Checked after decoding too: a JSON string can decode to a list or
    # scalar, on which the .get() calls below would raise AttributeError.
    if not isinstance(analysis, dict):
        return "Error: Step analysis data is in an unexpected format."

    details = f"# Step {step_index + 1} Details\n\n"
    details += f"## Description\n{analysis.get('description', 'No description available.')}\n\n"
    details += f"## Assessment\n{analysis.get('assessment', 'No assessment available.')}\n\n"

    return details
def format_call_info(call, call_index):
    """Render one analysed call as an HTML fragment.

    Every value taken from ``call`` is HTML-escaped before interpolation:
    prompts, model outputs and analysis text can contain ``<``, ``>`` or
    ``&`` (for example source code), which would otherwise be parsed as
    markup and corrupt the rendered page.

    Args:
        call: Dict with a ``'call_data'`` dict (keys read here:
            ``weave_task_id``, ``trace_id``, ``project_id``,
            ``created_timestamp``, ``inputs`` (including ``model``),
            ``outputs``, ``summary``) and an ``'analysis'`` dict (keys
            ``description``, ``assessment``, ``success``, ``action_type``
            and optionally ``step_outline``).
        call_index: 0-based position of the call; displayed 1-based.

    Returns:
        An HTML string containing a ``<style>`` block, the call metadata,
        the JSON payloads and the analysis.

    Raises:
        KeyError: If a required key listed above is missing.
    """
    # Imported locally so this function brings its own dependency into scope.
    import html

    call_data = call['call_data']
    analysis = call['analysis']

    def esc(value):
        # Stringify the value, then neutralise HTML metacharacters.
        return html.escape(str(value))

    def format_json(obj):
        # Pretty-print, escape, then turn newlines into <br>. Indentation
        # survives because .json-wrapper below uses `white-space: pre-wrap`.
        json_str = html.escape(json.dumps(obj, indent=2))
        json_str = json_str.replace('\n', '<br>')
        return f'<div class="json-wrapper">{json_str}</div>'

    # Currently not used but we can enable it to format message content
    # (the .message-content and .code-block styles below exist for it).
    def format_message_content(content):
        # Convert Markdown to HTML
        html_content = markdown.markdown(content)
        # Replace ``` code blocks with styled pre blocks
        html_content = re.sub(r'```python\n(.*?)```', lambda m: f'<pre class="code-block">{m.group(1)}</pre>', html_content, flags=re.DOTALL)
        return html_content

    formatted_info = f"""
    <style>
        .json-wrapper {{
            white-space: pre-wrap;
            word-wrap: break-word;
            font-family: monospace;
            max-height: 300px;
            overflow-y: auto;
            background-color: #f5f5f5;
            padding: 10px;
            border-radius: 5px;
        }}
        .message-content {{
            white-space: normal;
            word-wrap: break-word;
            font-family: Arial, sans-serif;
            max-height: 500px;
            overflow-y: auto;
            background-color: #ffffff;
            padding: 10px;
            border-radius: 5px;
            border: 1px solid #e0e0e0;
        }}
        .code-block {{
            background-color: #f0f0f0;
            padding: 10px;
            border-radius: 5px;
            font-family: monospace;
            white-space: pre-wrap;
            word-wrap: break-word;
        }}
    </style>
    <h2>Step {call_index+1}: {esc(analysis.get('step_outline', 'N/A'))}</h2>
    <h3>Call Metadata</h3>
    <ul>
        <li><strong>Weave Task ID:</strong> {esc(call_data['weave_task_id'])}</li>
        <li><strong>Trace ID:</strong> {esc(call_data['trace_id'])}</li>
        <li><strong>Project ID:</strong> {esc(call_data['project_id'])}</li>
        <li><strong>Created Timestamp:</strong> {esc(datetime.fromtimestamp(call_data['created_timestamp']))}</li>
        <li><strong>Model:</strong> {esc(call_data['inputs']['model'])}</li>
    </ul>
    <h3>Inputs</h3>
    {format_json(call_data['inputs'])}
    <h3>Outputs</h3>
    {format_json(call_data['outputs'])}
    <h3>Usage</h3>
    {format_json(call_data['summary'])}
    <h3>Analysis</h3>
    <ul>
        <li><strong>Description:</strong> {esc(analysis['description'])}</li>
        <li><strong>Assessment:</strong> {esc(analysis['assessment'])}</li>
        <li><strong>Success:</strong> {esc(analysis['success'])}</li>
        <li><strong>Action Type:</strong> {esc(analysis['action_type'])}</li>
    </ul>
    """
    return formatted_info
def update_call_details(task_id, call_index):
    """Return the HTML details for one call of a task.

    ``call_index`` may be a plain 0-based index or a ``(label, index)``
    tuple. A prompt message is returned when the task is unknown or nothing
    is selected, and an error message when the index is out of range.
    """
    task_unknown = task_id not in analyzed_traces
    if task_unknown or call_index is None:
        return "Please select a task and step to view details."

    recorded_steps = analyzed_traces[task_id]['steps']

    # Unwrap a (label, index) pair down to its index.
    if isinstance(call_index, tuple):
        call_index = call_index[1]

    if call_index < 0 or call_index >= len(recorded_steps):
        return f"Invalid call index: {call_index}"

    return format_call_info(recorded_steps[call_index], call_index)
# Build the whole UI: one tab per benchmark plus an "About" tab.
# NOTE(review): the "Agent Monitor" and "Raw Predictions" sections are placed
# inside the USACO tab because their widgets are labelled for USACO tasks and
# they precede the "About" tab — confirm this nesting against the intended layout.
with gr.Blocks() as demo:
    gr.Markdown("\n# 🥇 Agent Leaderboard\n")

    with gr.Tabs():
        with gr.Tab("SWE-Bench"):
            with gr.Row():
                with gr.Column(scale=1):
                    scatter_plot = gr.Plot(
                        create_scatter_plot(
                            parse_json_files(os.path.join(abs_path, "evals"), 'swebench_lite'),
                            "results_total_cost",
                            "results_accuracy",
                            "Cost (in USD)",
                            "Accuracy",
                            ["agent_name"],
                        )
                    )
                with gr.Column(scale=1):
                    Leaderboard(
                        value=parse_json_files(os.path.join(abs_path, "evals"), 'swebench_lite'),
                        select_columns=SelectColumns(
                            default_selection=config.SWEBENCH_ON_LOAD_COLUMNS,
                            cant_deselect=["agent_name"],
                            label="Select Columns to Display:",
                        ),
                        search_columns=config.SWEBENCH_SEARCH_COLUMNS,
                        column_widths={
                            "agent_name": 40,
                            "results_accuracy": 20,
                            "results_total_cost": 20,
                        },
                    )

        with gr.Tab("USACO"):
            with gr.Row():
                with gr.Column(scale=1):
                    scatter_plot = gr.Plot(
                        create_scatter_plot(
                            parse_json_files(os.path.join(abs_path, "evals"), 'usaco'),
                            "results_total_cost",
                            "results_accuracy",
                            "Cost",
                            "Accuracy",
                            ["agent_name"],
                        )
                    )
                with gr.Column(scale=1):
                    Leaderboard(
                        value=parse_json_files(os.path.join(abs_path, "evals"), 'usaco'),
                        select_columns=SelectColumns(
                            default_selection=config.USACO_ON_LOAD_COLUMNS,
                            cant_deselect=["agent_name"],
                            label="Select Columns to Display:",
                        ),
                        search_columns=config.USACO_SEARCH_COLUMNS,
                        column_widths={
                            "agent_name": 40,
                            "results_accuracy": 20,
                            "results_total_cost": 20,
                        },
                    )

            gr.Markdown("## Agent Monitor")
            with gr.Row():
                with gr.Column(scale=1):
                    task_dropdown = gr.Dropdown(choices=list(analyzed_traces.keys()), label="Select USACO Task")
                    task_overview = gr.Markdown()
                with gr.Column(scale=1):
                    steps_dropdown = gr.Dropdown(label="Agent Steps")
                    step_details = gr.Markdown()
            with gr.Row():
                flow_chart = gr.Plot(label="Task Flow")

            # Selecting a task refreshes the overview, the chart and the step
            # list, and clears the step details; selecting a step fills them.
            task_dropdown.change(update_task_analysis,
                                 inputs=[task_dropdown],
                                 outputs=[task_overview, flow_chart, steps_dropdown, step_details])
            steps_dropdown.change(update_step_details,
                                  inputs=[task_dropdown, steps_dropdown],
                                  outputs=[step_details])

            gr.Markdown("## Raw Predictions")
            with gr.Row():
                with gr.Column(scale=1):
                    # Rebinds `task_dropdown` to a second, independent widget;
                    # the handlers registered above keep their own reference.
                    task_dropdown = gr.Dropdown(choices=list(analyzed_traces.keys()), label="Select USACO Task")
                with gr.Column(scale=1):
                    call_dropdown = gr.Dropdown(label="Select Call")
            with gr.Row():
                call_details = gr.HTML()

            def update_call_dropdown(task_id):
                """Offer one "Call N" choice per recorded step of the task.

                The choices are sized from the task's ``'steps'`` list, the
                same list ``update_call_details`` indexes into. Taking the
                length of the per-task dict itself would count its keys and
                yield the wrong number of entries. Unknown or unset tasks
                produce an empty choice list.
                """
                calls = analyzed_traces.get(task_id, {}).get('steps', [])
                return gr.Dropdown(choices=[(f"Call {i+1}", i) for i in range(len(calls))])

            task_dropdown.change(update_call_dropdown,
                                 inputs=[task_dropdown],
                                 outputs=[call_dropdown])
            call_dropdown.change(update_call_details,
                                 inputs=[task_dropdown, call_dropdown],
                                 outputs=[call_details])

        with gr.Tab("About"):
            gr.Markdown((Path(__file__).parent / "about.md").read_text())
if __name__ == "__main__":
    # Fetch the results from the Hugging Face Hub once before serving.
    download_latest_results()

    # Both maintenance jobs run on the same one-hour interval: first the
    # Space restart, then the results refresh.
    scheduler = BackgroundScheduler()
    for hourly_job in (restart_space, download_latest_results):
        scheduler.add_job(hourly_job, "interval", hours=1)
    scheduler.start()

    demo.launch()