kakumusic's picture
Upload folder using huggingface_hub
b225a21 verified
raw
history blame
17 kB
import glob
import json
import logging
import os
import subprocess
import sys
import tempfile
from collections import deque
from pathlib import Path
from typing import Annotated, Any, ClassVar, Iterator, Literal, Optional
import pytest
from agent_protocol_client import AgentApi, ApiClient
from agent_protocol_client import Configuration as ClientConfig
from agent_protocol_client import Step
from colorama import Fore, Style
from openai import _load_client as get_openai_client
from pydantic import (
BaseModel,
Field,
StringConstraints,
ValidationInfo,
field_validator,
)
from agbenchmark.agent_api_interface import download_agent_artifacts_into_folder
from agbenchmark.agent_interface import copy_challenge_artifacts_into_workspace
from agbenchmark.config import AgentBenchmarkConfig
from agbenchmark.utils.data_types import Category, DifficultyLevel, EvalResult
from agbenchmark.utils.prompts import (
END_PROMPT,
FEW_SHOT_EXAMPLES,
PROMPT_MAP,
SCORING_MAP,
)
from .base import BaseChallenge, ChallengeInfo
logger = logging.getLogger(__name__)
with open(Path(__file__).parent / "optional_categories.json") as f:
OPTIONAL_CATEGORIES: list[str] = json.load(f)["optional_categories"]
class BuiltinChallengeSpec(BaseModel):
eval_id: str = ""
name: str
task: str
category: list[Category]
dependencies: list[str]
cutoff: int
class Info(BaseModel):
difficulty: DifficultyLevel
description: Annotated[
str, StringConstraints(pattern=r"^Tests if the agent can.*")
]
side_effects: list[str] = Field(default_factory=list)
info: Info
class Ground(BaseModel):
answer: str
should_contain: Optional[list[str]] = None
should_not_contain: Optional[list[str]] = None
files: list[str]
case_sensitive: Optional[bool] = True
class Eval(BaseModel):
type: str
scoring: Optional[Literal["percentage", "scale", "binary"]] = None
template: Optional[
Literal["rubric", "reference", "question", "custom"]
] = None
examples: Optional[str] = None
@field_validator("scoring", "template")
def validate_eval_fields(cls, value, info: ValidationInfo):
field_name = info.field_name
if "type" in info.data and info.data["type"] == "llm":
if value is None:
raise ValueError(
f"{field_name} must be provided when eval type is 'llm'"
)
else:
if value is not None:
raise ValueError(
f"{field_name} should only exist when eval type is 'llm'"
)
return value
eval: Eval
ground: Ground
metadata: Optional[dict[str, Any]] = None
spec_file: Path | None = Field(None, exclude=True)
class BuiltinChallenge(BaseChallenge):
"""
Base class for AGBenchmark's built-in challenges (challenges/**/*.json).
All of the logic is present in this class. Individual challenges are created as
subclasses of `BuiltinChallenge` with challenge-specific values assigned to the
ClassVars `_spec` etc.
Dynamically constructing subclasses rather than class instances for the individual
challenges makes them suitable for collection by Pytest, which will run their
`test_method` like any regular test item.
"""
_spec: ClassVar[BuiltinChallengeSpec]
CHALLENGE_LOCATION: ClassVar[str]
ARTIFACTS_LOCATION: ClassVar[str]
SOURCE_URI_PREFIX = "__BUILTIN__"
@classmethod
def from_challenge_spec(
cls, spec: BuiltinChallengeSpec
) -> type["BuiltinChallenge"]:
if not spec.spec_file:
raise ValueError("spec.spec_file not defined")
challenge_info = ChallengeInfo(
eval_id=spec.eval_id,
name=spec.name,
task=spec.task,
task_artifacts_dir=spec.spec_file.parent,
category=spec.category,
difficulty=spec.info.difficulty,
description=spec.info.description,
dependencies=spec.dependencies,
reference_answer=spec.ground.answer,
source_uri=(
f"__BUILTIN__/{spec.spec_file.relative_to(Path(__file__).parent)}"
),
)
challenge_class_name = f"Test{challenge_info.name}"
logger.debug(f"Creating {challenge_class_name} from spec: {spec.spec_file}")
return type(
challenge_class_name,
(BuiltinChallenge,),
{
"info": challenge_info,
"_spec": spec,
"CHALLENGE_LOCATION": str(spec.spec_file),
"ARTIFACTS_LOCATION": str(spec.spec_file.resolve().parent),
},
)
@classmethod
def from_challenge_spec_file(cls, spec_file: Path) -> type["BuiltinChallenge"]:
challenge_spec = BuiltinChallengeSpec.model_validate_json(spec_file.read_text())
challenge_spec.spec_file = spec_file
return cls.from_challenge_spec(challenge_spec)
@classmethod
def from_source_uri(cls, source_uri: str) -> type["BuiltinChallenge"]:
if not source_uri.startswith(cls.SOURCE_URI_PREFIX):
raise ValueError(f"Invalid source_uri for BuiltinChallenge: {source_uri}")
path = source_uri.split("/", 1)[1]
spec_file = Path(__file__).parent / path
return cls.from_challenge_spec_file(spec_file)
@pytest.mark.asyncio
async def test_method(
self,
config: AgentBenchmarkConfig,
request: pytest.FixtureRequest,
i_attempt: int,
) -> None:
# if os.environ.get("HELICONE_API_KEY"):
# from helicone.lock import HeliconeLockManager
# HeliconeLockManager.write_custom_property("challenge", self.info.name)
timeout = self._spec.cutoff or 60
if request.config.getoption("--nc"):
timeout = 100000
elif cutoff := request.config.getoption("--cutoff"):
timeout = int(cutoff) # type: ignore
task_id = ""
n_steps = 0
timed_out = None
agent_task_cost = None
steps: list[Step] = []
try:
async for step in self.run_challenge(
config, timeout, mock=bool(request.config.getoption("--mock"))
):
if not task_id:
task_id = step.task_id
n_steps += 1
steps.append(step.model_copy())
if step.additional_output:
agent_task_cost = step.additional_output.get(
"task_total_cost",
step.additional_output.get("task_cumulative_cost"),
)
timed_out = False
except TimeoutError:
timed_out = True
assert isinstance(request.node, pytest.Item)
request.node.user_properties.append(("steps", steps))
request.node.user_properties.append(("n_steps", n_steps))
request.node.user_properties.append(("timed_out", timed_out))
request.node.user_properties.append(("agent_task_cost", agent_task_cost))
agent_client_config = ClientConfig(host=config.host)
async with ApiClient(agent_client_config) as api_client:
api_instance = AgentApi(api_client)
eval_results = await self.evaluate_task_state(api_instance, task_id)
if not eval_results:
if timed_out:
raise TimeoutError("Timed out, no results to evaluate")
else:
raise ValueError("No results to evaluate")
request.node.user_properties.append(
(
"answers",
[r.result for r in eval_results]
if request.config.getoption("--keep-answers")
else None,
)
)
request.node.user_properties.append(("scores", [r.score for r in eval_results]))
# FIXME: this allows partial failure
assert any(r.passed for r in eval_results), (
f"No passed evals: {eval_results}"
if not timed_out
else f"Timed out; no passed evals: {eval_results}"
)
@classmethod
async def evaluate_task_state(
cls, agent: AgentApi, task_id: str
) -> list[EvalResult]:
with tempfile.TemporaryDirectory() as workspace:
workspace = Path(workspace)
await download_agent_artifacts_into_folder(agent, task_id, workspace)
if cls.info.task_artifacts_dir:
copy_challenge_artifacts_into_workspace(
cls.info.task_artifacts_dir, "custom_python", workspace
)
return list(cls.evaluate_workspace_content(workspace))
@classmethod
def evaluate_workspace_content(cls, workspace: Path) -> Iterator[EvalResult]:
result_ground = cls._spec.ground
outputs_for_eval = cls.get_outputs_for_eval(workspace, result_ground)
if result_ground.should_contain or result_ground.should_not_contain:
for source, content in outputs_for_eval:
score = cls.score_result(content, result_ground)
if score is not None:
print(f"{Fore.GREEN}Your score is:{Style.RESET_ALL}", score)
yield EvalResult(
result=content,
result_source=str(source),
score=score,
passed=score > 0.9, # FIXME: arbitrary threshold
)
if result_ground.eval.type in ("python", "pytest"):
for py_file, output in outputs_for_eval:
yield EvalResult(
result=output,
result_source=str(py_file),
score=float(not output.startswith("Error:")),
passed=not output.startswith("Error:"),
)
if result_ground.eval.type == "llm":
combined_results = "\n".join(output[1] for output in outputs_for_eval)
llm_eval = cls.score_result_with_llm(combined_results, result_ground)
print(f"{Fore.GREEN}Your score is:{Style.RESET_ALL}", llm_eval)
if result_ground.eval.scoring == "percentage":
score = llm_eval / 100
elif result_ground.eval.scoring == "scale":
score = llm_eval / 10
else:
score = llm_eval
yield EvalResult(
result=combined_results,
result_source=", ".join(str(res[0]) for res in outputs_for_eval),
score=score,
passed=score > 0.9, # FIXME: arbitrary threshold
)
@staticmethod
def get_outputs_for_eval(
workspace: str | Path | dict[str, str], ground: BuiltinChallengeSpec.Ground
) -> Iterator[tuple[str | Path, str]]:
if isinstance(workspace, dict):
workspace = workspace["output"]
script_dir = workspace
for file_pattern in ground.files:
# Check if it is a file extension
if file_pattern.startswith("."):
# Find all files with the given extension in the workspace
matching_files = glob.glob(os.path.join(script_dir, "*" + file_pattern))
else:
# Otherwise, it is a specific file
matching_files = [os.path.join(script_dir, file_pattern)]
logger.debug(
f"Files to evaluate for pattern `{file_pattern}`: {matching_files}"
)
for file_path in matching_files:
relative_file_path = Path(file_path).relative_to(workspace)
logger.debug(
f"Evaluating {relative_file_path} "
f"(eval type: {ground.eval.type})..."
)
if ground.eval.type == "python":
result = subprocess.run(
[sys.executable, file_path],
cwd=os.path.abspath(workspace),
capture_output=True,
text=True,
)
if "error" in result.stderr or result.returncode != 0:
yield relative_file_path, f"Error: {result.stderr}\n"
else:
yield relative_file_path, f"Output: {result.stdout}\n"
else:
with open(file_path, "r") as f:
yield relative_file_path, f.read()
else:
if ground.eval.type == "pytest":
result = subprocess.run(
[sys.executable, "-m", "pytest"],
cwd=os.path.abspath(workspace),
capture_output=True,
text=True,
)
logger.debug(f"EXIT CODE: {result.returncode}")
logger.debug(f"STDOUT: {result.stdout}")
logger.debug(f"STDERR: {result.stderr}")
if "error" in result.stderr or result.returncode != 0:
yield "pytest", f"Error: {result.stderr.strip() or result.stdout}\n"
else:
yield "pytest", f"Output: {result.stdout}\n"
@staticmethod
def score_result(content: str, ground: BuiltinChallengeSpec.Ground) -> float | None:
print(f"{Fore.BLUE}Scoring content:{Style.RESET_ALL}", content)
if ground.should_contain:
for should_contain_word in ground.should_contain:
if not ground.case_sensitive:
should_contain_word = should_contain_word.lower()
content = content.lower()
print_content = (
f"{Fore.BLUE}Word that should exist{Style.RESET_ALL}"
f" - {should_contain_word}:"
)
if should_contain_word not in content:
print(print_content, "False")
return 0.0
else:
print(print_content, "True")
return 1.0
if ground.should_not_contain:
for should_not_contain_word in ground.should_not_contain:
if not ground.case_sensitive:
should_not_contain_word = should_not_contain_word.lower()
content = content.lower()
print_content = (
f"{Fore.BLUE}Word that should not exist{Style.RESET_ALL}"
f" - {should_not_contain_word}:"
)
if should_not_contain_word in content:
print(print_content, "False")
return 0.0
else:
print(print_content, "True")
return 1.0
@classmethod
def score_result_with_llm(
cls, content: str, ground: BuiltinChallengeSpec.Ground, *, mock: bool = False
) -> float:
if mock:
return 1.0
# the validation for this is done in the Eval BaseModel
scoring = SCORING_MAP[ground.eval.scoring] # type: ignore
prompt = PROMPT_MAP[ground.eval.template].format( # type: ignore
task=cls._spec.task, scoring=scoring, answer=ground.answer, response=content
)
if ground.eval.examples:
prompt += FEW_SHOT_EXAMPLES.format(examples=ground.eval.examples)
prompt += END_PROMPT
answer = get_openai_client().chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": prompt},
],
)
return float(answer.choices[0].message.content) # type: ignore
def load_builtin_challenges() -> Iterator[type[BuiltinChallenge]]:
logger.info("Loading built-in challenges...")
challenges_path = Path(__file__).parent
logger.debug(f"Looking for challenge spec files in {challenges_path}...")
json_files = deque(challenges_path.rglob("data.json"))
logger.debug(f"Found {len(json_files)} built-in challenges.")
loaded, ignored = 0, 0
while json_files:
# Take and remove the first element from json_files
json_file = json_files.popleft()
if _challenge_should_be_ignored(json_file):
ignored += 1
continue
challenge = BuiltinChallenge.from_challenge_spec_file(json_file)
logger.debug(f"Generated test for {challenge.info.name}")
yield challenge
loaded += 1
logger.info(
f"Loading built-in challenges complete: loaded {loaded}, ignored {ignored}."
)
def _challenge_should_be_ignored(json_file_path: Path):
return (
"challenges/deprecated" in json_file_path.as_posix()
or "challenges/library" in json_file_path.as_posix()
)