import glob import json import logging import os import subprocess import sys import tempfile from collections import deque from pathlib import Path from typing import Annotated, Any, ClassVar, Iterator, Literal, Optional import pytest from agent_protocol_client import AgentApi, ApiClient from agent_protocol_client import Configuration as ClientConfig from agent_protocol_client import Step from colorama import Fore, Style from openai import _load_client as get_openai_client from pydantic import ( BaseModel, Field, StringConstraints, ValidationInfo, field_validator, ) from agbenchmark.agent_api_interface import download_agent_artifacts_into_folder from agbenchmark.agent_interface import copy_challenge_artifacts_into_workspace from agbenchmark.config import AgentBenchmarkConfig from agbenchmark.utils.data_types import Category, DifficultyLevel, EvalResult from agbenchmark.utils.prompts import ( END_PROMPT, FEW_SHOT_EXAMPLES, PROMPT_MAP, SCORING_MAP, ) from .base import BaseChallenge, ChallengeInfo logger = logging.getLogger(__name__) with open(Path(__file__).parent / "optional_categories.json") as f: OPTIONAL_CATEGORIES: list[str] = json.load(f)["optional_categories"] class BuiltinChallengeSpec(BaseModel): eval_id: str = "" name: str task: str category: list[Category] dependencies: list[str] cutoff: int class Info(BaseModel): difficulty: DifficultyLevel description: Annotated[ str, StringConstraints(pattern=r"^Tests if the agent can.*") ] side_effects: list[str] = Field(default_factory=list) info: Info class Ground(BaseModel): answer: str should_contain: Optional[list[str]] = None should_not_contain: Optional[list[str]] = None files: list[str] case_sensitive: Optional[bool] = True class Eval(BaseModel): type: str scoring: Optional[Literal["percentage", "scale", "binary"]] = None template: Optional[ Literal["rubric", "reference", "question", "custom"] ] = None examples: Optional[str] = None @field_validator("scoring", "template") def validate_eval_fields(cls, value, info: ValidationInfo): field_name = info.field_name if "type" in info.data and info.data["type"] == "llm": if value is None: raise ValueError( f"{field_name} must be provided when eval type is 'llm'" ) else: if value is not None: raise ValueError( f"{field_name} should only exist when eval type is 'llm'" ) return value eval: Eval ground: Ground metadata: Optional[dict[str, Any]] = None spec_file: Path | None = Field(None, exclude=True) class BuiltinChallenge(BaseChallenge): """ Base class for AGBenchmark's built-in challenges (challenges/**/*.json). All of the logic is present in this class. Individual challenges are created as subclasses of `BuiltinChallenge` with challenge-specific values assigned to the ClassVars `_spec` etc. Dynamically constructing subclasses rather than class instances for the individual challenges makes them suitable for collection by Pytest, which will run their `test_method` like any regular test item. """ _spec: ClassVar[BuiltinChallengeSpec] CHALLENGE_LOCATION: ClassVar[str] ARTIFACTS_LOCATION: ClassVar[str] SOURCE_URI_PREFIX = "__BUILTIN__" @classmethod def from_challenge_spec( cls, spec: BuiltinChallengeSpec ) -> type["BuiltinChallenge"]: if not spec.spec_file: raise ValueError("spec.spec_file not defined") challenge_info = ChallengeInfo( eval_id=spec.eval_id, name=spec.name, task=spec.task, task_artifacts_dir=spec.spec_file.parent, category=spec.category, difficulty=spec.info.difficulty, description=spec.info.description, dependencies=spec.dependencies, reference_answer=spec.ground.answer, source_uri=( f"__BUILTIN__/{spec.spec_file.relative_to(Path(__file__).parent)}" ), ) challenge_class_name = f"Test{challenge_info.name}" logger.debug(f"Creating {challenge_class_name} from spec: {spec.spec_file}") return type( challenge_class_name, (BuiltinChallenge,), { "info": challenge_info, "_spec": spec, "CHALLENGE_LOCATION": str(spec.spec_file), "ARTIFACTS_LOCATION": str(spec.spec_file.resolve().parent), }, ) @classmethod def from_challenge_spec_file(cls, spec_file: Path) -> type["BuiltinChallenge"]: challenge_spec = BuiltinChallengeSpec.model_validate_json(spec_file.read_text()) challenge_spec.spec_file = spec_file return cls.from_challenge_spec(challenge_spec) @classmethod def from_source_uri(cls, source_uri: str) -> type["BuiltinChallenge"]: if not source_uri.startswith(cls.SOURCE_URI_PREFIX): raise ValueError(f"Invalid source_uri for BuiltinChallenge: {source_uri}") path = source_uri.split("/", 1)[1] spec_file = Path(__file__).parent / path return cls.from_challenge_spec_file(spec_file) @pytest.mark.asyncio async def test_method( self, config: AgentBenchmarkConfig, request: pytest.FixtureRequest, i_attempt: int, ) -> None: # if os.environ.get("HELICONE_API_KEY"): # from helicone.lock import HeliconeLockManager # HeliconeLockManager.write_custom_property("challenge", self.info.name) timeout = self._spec.cutoff or 60 if request.config.getoption("--nc"): timeout = 100000 elif cutoff := request.config.getoption("--cutoff"): timeout = int(cutoff) # type: ignore task_id = "" n_steps = 0 timed_out = None agent_task_cost = None steps: list[Step] = [] try: async for step in self.run_challenge( config, timeout, mock=bool(request.config.getoption("--mock")) ): if not task_id: task_id = step.task_id n_steps += 1 steps.append(step.model_copy()) if step.additional_output: agent_task_cost = step.additional_output.get( "task_total_cost", step.additional_output.get("task_cumulative_cost"), ) timed_out = False except TimeoutError: timed_out = True assert isinstance(request.node, pytest.Item) request.node.user_properties.append(("steps", steps)) request.node.user_properties.append(("n_steps", n_steps)) request.node.user_properties.append(("timed_out", timed_out)) request.node.user_properties.append(("agent_task_cost", agent_task_cost)) agent_client_config = ClientConfig(host=config.host) async with ApiClient(agent_client_config) as api_client: api_instance = AgentApi(api_client) eval_results = await self.evaluate_task_state(api_instance, task_id) if not eval_results: if timed_out: raise TimeoutError("Timed out, no results to evaluate") else: raise ValueError("No results to evaluate") request.node.user_properties.append( ( "answers", [r.result for r in eval_results] if request.config.getoption("--keep-answers") else None, ) ) request.node.user_properties.append(("scores", [r.score for r in eval_results])) # FIXME: this allows partial failure assert any(r.passed for r in eval_results), ( f"No passed evals: {eval_results}" if not timed_out else f"Timed out; no passed evals: {eval_results}" ) @classmethod async def evaluate_task_state( cls, agent: AgentApi, task_id: str ) -> list[EvalResult]: with tempfile.TemporaryDirectory() as workspace: workspace = Path(workspace) await download_agent_artifacts_into_folder(agent, task_id, workspace) if cls.info.task_artifacts_dir: copy_challenge_artifacts_into_workspace( cls.info.task_artifacts_dir, "custom_python", workspace ) return list(cls.evaluate_workspace_content(workspace)) @classmethod def evaluate_workspace_content(cls, workspace: Path) -> Iterator[EvalResult]: result_ground = cls._spec.ground outputs_for_eval = cls.get_outputs_for_eval(workspace, result_ground) if result_ground.should_contain or result_ground.should_not_contain: for source, content in outputs_for_eval: score = cls.score_result(content, result_ground) if score is not None: print(f"{Fore.GREEN}Your score is:{Style.RESET_ALL}", score) yield EvalResult( result=content, result_source=str(source), score=score, passed=score > 0.9, # FIXME: arbitrary threshold ) if result_ground.eval.type in ("python", "pytest"): for py_file, output in outputs_for_eval: yield EvalResult( result=output, result_source=str(py_file), score=float(not output.startswith("Error:")), passed=not output.startswith("Error:"), ) if result_ground.eval.type == "llm": combined_results = "\n".join(output[1] for output in outputs_for_eval) llm_eval = cls.score_result_with_llm(combined_results, result_ground) print(f"{Fore.GREEN}Your score is:{Style.RESET_ALL}", llm_eval) if result_ground.eval.scoring == "percentage": score = llm_eval / 100 elif result_ground.eval.scoring == "scale": score = llm_eval / 10 else: score = llm_eval yield EvalResult( result=combined_results, result_source=", ".join(str(res[0]) for res in outputs_for_eval), score=score, passed=score > 0.9, # FIXME: arbitrary threshold ) @staticmethod def get_outputs_for_eval( workspace: str | Path | dict[str, str], ground: BuiltinChallengeSpec.Ground ) -> Iterator[tuple[str | Path, str]]: if isinstance(workspace, dict): workspace = workspace["output"] script_dir = workspace for file_pattern in ground.files: # Check if it is a file extension if file_pattern.startswith("."): # Find all files with the given extension in the workspace matching_files = glob.glob(os.path.join(script_dir, "*" + file_pattern)) else: # Otherwise, it is a specific file matching_files = [os.path.join(script_dir, file_pattern)] logger.debug( f"Files to evaluate for pattern `{file_pattern}`: {matching_files}" ) for file_path in matching_files: relative_file_path = Path(file_path).relative_to(workspace) logger.debug( f"Evaluating {relative_file_path} " f"(eval type: {ground.eval.type})..." ) if ground.eval.type == "python": result = subprocess.run( [sys.executable, file_path], cwd=os.path.abspath(workspace), capture_output=True, text=True, ) if "error" in result.stderr or result.returncode != 0: yield relative_file_path, f"Error: {result.stderr}\n" else: yield relative_file_path, f"Output: {result.stdout}\n" else: with open(file_path, "r") as f: yield relative_file_path, f.read() else: if ground.eval.type == "pytest": result = subprocess.run( [sys.executable, "-m", "pytest"], cwd=os.path.abspath(workspace), capture_output=True, text=True, ) logger.debug(f"EXIT CODE: {result.returncode}") logger.debug(f"STDOUT: {result.stdout}") logger.debug(f"STDERR: {result.stderr}") if "error" in result.stderr or result.returncode != 0: yield "pytest", f"Error: {result.stderr.strip() or result.stdout}\n" else: yield "pytest", f"Output: {result.stdout}\n" @staticmethod def score_result(content: str, ground: BuiltinChallengeSpec.Ground) -> float | None: print(f"{Fore.BLUE}Scoring content:{Style.RESET_ALL}", content) if ground.should_contain: for should_contain_word in ground.should_contain: if not ground.case_sensitive: should_contain_word = should_contain_word.lower() content = content.lower() print_content = ( f"{Fore.BLUE}Word that should exist{Style.RESET_ALL}" f" - {should_contain_word}:" ) if should_contain_word not in content: print(print_content, "False") return 0.0 else: print(print_content, "True") return 1.0 if ground.should_not_contain: for should_not_contain_word in ground.should_not_contain: if not ground.case_sensitive: should_not_contain_word = should_not_contain_word.lower() content = content.lower() print_content = ( f"{Fore.BLUE}Word that should not exist{Style.RESET_ALL}" f" - {should_not_contain_word}:" ) if should_not_contain_word in content: print(print_content, "False") return 0.0 else: print(print_content, "True") return 1.0 @classmethod def score_result_with_llm( cls, content: str, ground: BuiltinChallengeSpec.Ground, *, mock: bool = False ) -> float: if mock: return 1.0 # the validation for this is done in the Eval BaseModel scoring = SCORING_MAP[ground.eval.scoring] # type: ignore prompt = PROMPT_MAP[ground.eval.template].format( # type: ignore task=cls._spec.task, scoring=scoring, answer=ground.answer, response=content ) if ground.eval.examples: prompt += FEW_SHOT_EXAMPLES.format(examples=ground.eval.examples) prompt += END_PROMPT answer = get_openai_client().chat.completions.create( model="gpt-4", messages=[ {"role": "system", "content": prompt}, ], ) return float(answer.choices[0].message.content) # type: ignore def load_builtin_challenges() -> Iterator[type[BuiltinChallenge]]: logger.info("Loading built-in challenges...") challenges_path = Path(__file__).parent logger.debug(f"Looking for challenge spec files in {challenges_path}...") json_files = deque(challenges_path.rglob("data.json")) logger.debug(f"Found {len(json_files)} built-in challenges.") loaded, ignored = 0, 0 while json_files: # Take and remove the first element from json_files json_file = json_files.popleft() if _challenge_should_be_ignored(json_file): ignored += 1 continue challenge = BuiltinChallenge.from_challenge_spec_file(json_file) logger.debug(f"Generated test for {challenge.info.name}") yield challenge loaded += 1 logger.info( f"Loading built-in challenges complete: loaded {loaded}, ignored {ignored}." ) def _challenge_should_be_ignored(json_file_path: Path): return ( "challenges/deprecated" in json_file_path.as_posix() or "challenges/library" in json_file_path.as_posix() )