import json
import os
import re

import anthropic
import cohere
import requests
from openai import OpenAI
from together import Together
from transformers import AutoTokenizer

from prompts import (
    JUDGE_SYSTEM_PROMPT,
    PROMETHEUS_PROMPT,
    PROMETHEUS_PROMPT_WITH_REFERENCE,
    ATLA_PROMPT,
    ATLA_PROMPT_WITH_REFERENCE,
    FLOW_JUDGE_PROMPT,
)

# API clients and keys. The SDK clients pick up their keys from the standard
# environment variables (OPENAI_API_KEY, ANTHROPIC_API_KEY, TOGETHER_API_KEY);
# the remaining keys are read explicitly from the environment.
anthropic_client = anthropic.Anthropic()
openai_client = OpenAI()
together_client = Together()
hf_api_key = os.getenv("HF_API_KEY")
flow_judge_api_key = os.getenv("FLOW_JUDGE_API_KEY")
cohere_client = cohere.ClientV2(os.getenv("CO_API_KEY"))
salesforce_api_key = os.getenv("SALESFORCE_API_KEY")

def get_openai_response(model_name, prompt, system_prompt=JUDGE_SYSTEM_PROMPT, max_tokens=500, temperature=0):
    """Get a response from the OpenAI API."""
    try:
        response = openai_client.chat.completions.create(
            model=model_name,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt},
            ],
            max_completion_tokens=max_tokens,
            temperature=temperature,
        )
        return response.choices[0].message.content
    except Exception as e:
        return f"Error with OpenAI model {model_name}: {str(e)}"

def get_anthropic_response(model_name, prompt, system_prompt=JUDGE_SYSTEM_PROMPT, max_tokens=500, temperature=0):
    """Get a response from the Anthropic API."""
    try:
        response = anthropic_client.messages.create(
            model=model_name,
            max_tokens=max_tokens,
            temperature=temperature,
            system=system_prompt,
            messages=[{"role": "user", "content": [{"type": "text", "text": prompt}]}],
        )
        return response.content[0].text
    except Exception as e:
        return f"Error with Anthropic model {model_name}: {str(e)}"


def get_together_response(model_name, prompt, system_prompt=JUDGE_SYSTEM_PROMPT, max_tokens=500, temperature=0):
    """Get a response from the Together API."""
    try:
        response = together_client.chat.completions.create(
            model=model_name,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt},
            ],
            max_tokens=max_tokens,
            temperature=temperature,
            stream=False,
        )
        return response.choices[0].message.content
    except Exception as e:
        return f"Error with Together model {model_name}: {str(e)}"

def get_prometheus_response(model_name, prompt, system_prompt=None, max_tokens=500, temperature=0.01):
    """Get a response from the Prometheus model on a Hugging Face endpoint."""
    try:
        headers = {
            "Accept": "application/json",
            "Authorization": f"Bearer {hf_api_key}",
            "Content-Type": "application/json",
        }

        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": prompt})

        # The endpoint takes a single raw "inputs" string, so apply the
        # model's chat template client-side.
        model_id = "prometheus-eval/prometheus-7b-v2.0"
        tokenizer = AutoTokenizer.from_pretrained(model_id, token=hf_api_key)
        formatted_prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)

        payload = {
            "inputs": formatted_prompt,
            "parameters": {
                "max_new_tokens": max_tokens,
                "return_full_text": False,
                "temperature": temperature,  # must be strictly positive, hence the 0.01 default
            },
        }

        response = requests.post(
            "https://otb7jglxy6r37af6.us-east-1.aws.endpoints.huggingface.cloud",
            headers=headers,
            json=payload,
        )
        response.raise_for_status()
        return response.json()[0]["generated_text"]
    except Exception as e:
        return f"Error with Hugging Face model {model_name}: {str(e)}"

def get_atla_response(model_name, prompt, system_prompt=None, max_tokens=500, temperature=0.01):
    """Get a response from the Atla Selene model on a Hugging Face endpoint."""
    try:
        headers = {
            "Accept": "application/json",
            "Authorization": f"Bearer {hf_api_key}",
            "Content-Type": "application/json",
        }

        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": prompt})

        # As above: the endpoint takes raw text, so apply the chat template
        # client-side.
        model_id = "AtlaAI/Selene-1-Mini-Llama-3.1-8B"
        tokenizer = AutoTokenizer.from_pretrained(model_id, token=hf_api_key)
        formatted_prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)

        payload = {
            "inputs": formatted_prompt,
            "parameters": {
                "max_new_tokens": max_tokens,
                "return_full_text": False,
                "temperature": temperature,
                "seed": 42,
                "add_generation_prompt": True,
            },
        }

        response = requests.post(
            "https://bkp9p28gri93egqh.us-east-1.aws.endpoints.huggingface.cloud",
            headers=headers,
            json=payload,
        )
        response.raise_for_status()
        return response.json()[0]["generated_text"]
    except Exception as e:
        return f"Error with Atla model {model_name}: {str(e)}"

def get_flow_judge_response(model_name, prompt, max_tokens=2048, temperature=0.1, top_p=0.95) -> str:
    """Get a response from the Flow Judge API."""
    try:
        response = requests.post(
            "https://arena.flow-ai.io/v1/chat/completions",
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {flow_judge_api_key}",
            },
            json={
                "model": model_name,
                "messages": [
                    {"role": "user", "content": prompt}
                ],
                "max_tokens": max_tokens,
                "temperature": temperature,
                "top_p": top_p,
                "stop": None,
            },
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]
    except Exception as e:
        return f"Error with Flow Judge completions model {model_name}: {str(e)}"

def get_cohere_response(model_name, prompt, system_prompt=JUDGE_SYSTEM_PROMPT, max_tokens=500, temperature=0):
    """Get a response from the Cohere API."""
    try:
        response = cohere_client.chat(
            model=model_name,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt},
            ],
            max_tokens=max_tokens,
            temperature=temperature,
        )
        # The V2 chat API returns a list of content items; take the text of
        # the first one.
        content_items = response.message.content
        if isinstance(content_items, list):
            return content_items[0].text
        return str(content_items)
    except Exception as e:
        return f"Error with Cohere model {model_name}: {str(e)}"

def get_salesforce_response(model_name, prompt, system_prompt=None, max_tokens=2048, temperature=0):
    """Get a response from the Salesforce Research API."""
    try:
        headers = {
            "accept": "application/json",
            "content-type": "application/json",
            "X-Api-Key": salesforce_api_key,
        }

        # system_prompt is accepted for signature parity with the other
        # helpers but is not sent; the judge instructions live in the prompt.
        messages = [{"role": "user", "content": prompt}]

        json_data = {
            "prompts": messages,
            "temperature": temperature,
            "top_p": 1,
            "max_tokens": max_tokens,
        }

        response = requests.post(
            "https://gateway.salesforceresearch.ai/sfr-judge/process",
            headers=headers,
            json=json_data,
        )
        response.raise_for_status()
        return response.json()["result"][0]
    except Exception as e:
        return f"Error with Salesforce model {model_name}: {str(e)}"

def get_model_response(
    model_name,
    model_info,
    prompt_data,
    use_reference=False,
    max_tokens=500,
    temperature=0,
):
    """Get a response from the appropriate API based on the model's organization."""
    if not model_info:
        return "Model not found or unsupported."

    api_model = model_info["api_model"]
    organization = model_info["organization"]

    is_prometheus = organization == "Prometheus"
    is_atla = organization == "Atla"
    is_flow_judge = organization == "Flow AI"
    is_salesforce = organization == "Salesforce"

    # Dedicated judge models bake their instructions into the prompt itself,
    # so they get no separate system prompt.
    system_prompt = None if (is_prometheus or is_atla or is_flow_judge or is_salesforce) else JUDGE_SYSTEM_PROMPT

    # Pick the prompt template for the model family.
    if is_atla or is_salesforce:
        base_prompt = ATLA_PROMPT_WITH_REFERENCE if use_reference else ATLA_PROMPT
    elif is_flow_judge:
        base_prompt = FLOW_JUDGE_PROMPT
    else:
        base_prompt = PROMETHEUS_PROMPT_WITH_REFERENCE if use_reference else PROMETHEUS_PROMPT

    # General-purpose models are asked for JSON output instead of the
    # "[RESULT]" format the Prometheus-style template specifies.
    if not (is_prometheus or is_atla or is_flow_judge or is_salesforce):
        base_prompt = base_prompt.replace(
            '3. The output format should look as follows: "Feedback: (write a feedback for criteria) [RESULT] (an integer number between 1 and 5)"',
            '3. Your output format should strictly adhere to JSON as follows: {{"feedback": "<write feedback>", "result": <numerical score>}}. Ensure the output is valid JSON, without additional formatting or explanations.'
        )

    try:
        if not is_flow_judge:
            final_prompt = base_prompt.format(
                human_input=prompt_data['human_input'],
                ai_response=prompt_data['ai_response'],
                ground_truth_input=prompt_data.get('ground_truth_input', ''),
                eval_criteria=prompt_data['eval_criteria'],
            )
        else:
            # Flow Judge expects XML-tagged inputs and a bulleted rubric.
            human_input = f"<user_input>\n{prompt_data['human_input']}\n</user_input>"
            ai_response = f"<response>\n{prompt_data['ai_response']}\n</response>"
            ground_truth = prompt_data.get('ground_truth_input', '')
            if ground_truth:
                response_reference = f"<response_reference>\n{ground_truth}\n</response_reference>"
            else:
                response_reference = ""

            # Pull the "Score N: ..." lines out of the criteria to build the rubric.
            eval_criteria_lines = prompt_data['eval_criteria'].split('\n')
            rubric_lines = [line for line in eval_criteria_lines if line.startswith('Score ')]
            rubric = '\n'.join(f"- {line}" for line in rubric_lines)

            if response_reference:
                inputs = human_input + "\n" + response_reference
            else:
                inputs = human_input

            final_prompt = base_prompt.format(
                INPUTS=inputs,
                OUTPUT=ai_response,
                EVALUATION_CRITERIA=prompt_data['eval_criteria'],
                RUBRIC=rubric,
            )
    except KeyError as e:
        return f"Error formatting prompt: Missing required field {str(e)}"

    try:
        if organization == "OpenAI":
            return get_openai_response(
                api_model, final_prompt, system_prompt, max_tokens, temperature
            )
        elif organization == "Anthropic":
            return get_anthropic_response(
                api_model, final_prompt, system_prompt, max_tokens, temperature
            )
        elif organization == "Prometheus":
            return get_prometheus_response(
                api_model, final_prompt, system_prompt, max_tokens, temperature=0.01
            )
        elif organization == "Atla":
            return get_atla_response(
                api_model, final_prompt, system_prompt, max_tokens, temperature=0.01
            )
        elif organization == "Cohere":
            return get_cohere_response(
                api_model, final_prompt, system_prompt, max_tokens, temperature
            )
        elif organization == "Flow AI":
            return get_flow_judge_response(api_model, final_prompt)
        elif organization == "Salesforce":
            return get_salesforce_response(
                api_model, final_prompt, system_prompt, max_tokens, temperature
            )
        else:
            # Default: route everything else through Together.
            return get_together_response(
                api_model, final_prompt, system_prompt, max_tokens, temperature
            )
    except Exception as e:
        return f"Error with {organization} model {model_name}: {str(e)}"

def parse_model_response(response):
    """Parse a judge response into a (score, feedback) tuple.

    Handles already-parsed dicts, plain JSON strings, Atla-style
    "**Reasoning:**/**Result:**" output, and JSON embedded in surrounding
    text, in that order.
    """
    try:
        print(f"Raw model response: {response}")

        # Some code paths hand us an already-parsed dict.
        if isinstance(response, dict):
            return str(response.get("result", "N/A")), response.get("feedback", "N/A")

        try:
            data = json.loads(response)
            return str(data.get("result", "N/A")), data.get("feedback", "N/A")
        except json.JSONDecodeError:
            # Atla models use a markdown-style format rather than JSON.
            if "**Reasoning:**" in response or "**Result:**" in response:
                return atla_parse_model_response(response)

            # Fall back to the first JSON object embedded in the text.
            json_match = re.search(r"{.*}", response, re.DOTALL)
            if json_match:
                data = json.loads(json_match.group(0))
                return str(data.get("result", "N/A")), data.get("feedback", "N/A")
            else:
                return "Error", f"Invalid response format returned - here is the raw model response: {response}"

    except Exception as e:
        print(f"Failed to parse response: {str(e)}")

        # As a last resort, look for a JSON object inside the error message.
        try:
            error_json_match = re.search(r"{.*}", str(e), re.DOTALL)
            if error_json_match:
                data = json.loads(error_json_match.group(0))
                return str(data.get("result", "N/A")), data.get("feedback", "N/A")
        except Exception:
            pass

        return "Error", f"Failed to parse response: {response}"

def prometheus_parse_model_response(output):
    """Parse a Prometheus-style "[RESULT]" response into (score, feedback)."""
    try:
        print(f"Raw model response: {output}")
        output = output.strip()

        # Drop a leading "Feedback:" label if present.
        output = re.sub(r'^feedback:\s*', '', output, flags=re.IGNORECASE)

        # Pattern 1: "[RESULT] N" at the start, feedback after it.
        begin_result_pattern = r'^\[RESULT\]\s*(\d+)\s*\n*(.*?)$'
        begin_match = re.search(begin_result_pattern, output, re.DOTALL | re.IGNORECASE)
        if begin_match:
            score = int(begin_match.group(1))
            feedback = begin_match.group(2).strip()
            return str(score), feedback

        # Pattern 2: feedback followed by "[RESULT] N".
        pattern = r"(.*?)\s*\[RESULT\]\s*[\(\[]?(\d+)[\)\]]?"
        match = re.search(pattern, output, re.DOTALL | re.IGNORECASE)
        if match:
            feedback = match.group(1).strip()
            score = int(match.group(2))
            return str(score), feedback

        # Pattern 3: feedback followed by "Score:" or "Result:".
        pattern = r"(.*?)\s*(?:Score|Result)\s*:\s*[\(\[]?(\d+)[\)\]]?"
        match = re.search(pattern, output, re.DOTALL | re.IGNORECASE)
        if match:
            feedback = match.group(1).strip()
            score = int(match.group(2))
            return str(score), feedback

        # Pattern 4: feedback ending with a bracketed "[Score N]" / "[Result N]".
        pattern = r"(.*?)\s*\[(?:Score|Result)\s*[\(\[]?(\d+)[\)\]]?\]$"
        match = re.search(pattern, output, re.DOTALL)
        if match:
            feedback = match.group(1).strip()
            score = int(match.group(2))
            return str(score), feedback

        # Pattern 5: a bare number at the very end; everything before it is feedback.
        pattern = r"[\(\[]?(\d+)[\)\]]?\s*\]?$"
        match = re.search(pattern, output)
        if match:
            score = int(match.group(1))
            feedback = output[:match.start()].rstrip()
            # Drop any dangling opening bracket left at the end of the feedback.
            feedback = re.sub(r'\s*\[[^\]]*$', '', feedback).strip()
            return str(score), feedback

        return "Error", f"Failed to parse response: {output}"

    except Exception as e:
        print(f"Failed to parse response: {str(e)}")
        return "Error", f"Exception during parsing: {str(e)}"

def atla_parse_model_response(output):
    """Parse a response from an Atla model into (score, feedback)."""
    try:
        print(f"Raw Atla model response: {output}")
        output = output.strip()

        reasoning_match = re.search(r'\*\*Reasoning:\*\*(.*?)(?=\*\*Result:|$)', output, re.DOTALL)
        result_match = re.search(r'\*\*Result:\*\*\s*(\d+)', output)

        if reasoning_match and result_match:
            feedback = reasoning_match.group(1).strip()
            score = result_match.group(1)
            return str(score), feedback

        return "Error", f"Failed to parse ATLA response format: {output}"

    except Exception as e:
        print(f"Failed to parse ATLA response: {str(e)}")
        return "Error", f"Exception during parsing: {str(e)}"

def flow_judge_parse_model_response(output):
    """Parse a Flow Judge response's <feedback>/<score> tags into (score, feedback)."""
    try:
        print(f"Raw model response: {output}")
        output = re.sub(r'\n{2,}', '\n', output.strip())

        feedback_pattern = re.compile(r"<feedback>\s*(.*?)\s*</feedback>", re.DOTALL)
        score_pattern = re.compile(r"<score>\s*(\d+)\s*</score>", re.DOTALL)

        feedback_match = feedback_pattern.search(output)
        score_match = score_pattern.search(output)

        # Both tags must be present before extracting.
        if feedback_match and score_match:
            feedback = feedback_match.group(1).strip()
            score = int(score_match.group(1).strip())
            return str(score), feedback

        return "Error", f"Failed to parse response: {output}"

    except Exception as e:
        print(f"Failed to parse response: {str(e)}")
        return "Error", f"Exception during parsing: {str(e)}"