import queue
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import numpy as np
import streamlit as st

from models import chat_with_model, embed
from prompts import create_gen_prompt, create_judge_prompt

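# Benchmark flow implemented below: for each question, the target model is asked
# repeatedly for a new answer, a judge model scores the answer's coherence, and
# novelty is measured as 1 minus the maximum cosine similarity between the new
# answer's embedding and the embeddings of the previous answers. The loop for a
# question stops once an answer falls below the coherence or novelty threshold
# (or an answer cannot be generated).
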
def generate_answer(question, previous_answers, model_name, open_router_key, openai_api_key, temperature, top_p):
    """Generates an answer to a question using the specified language model."""
    gen_prompt = create_gen_prompt(question, previous_answers)
    try:
        new_answer = chat_with_model(prompt=gen_prompt, model=model_name, open_router_key=open_router_key,
                                     openai_api_key=openai_api_key, temperature=temperature, top_p=top_p)
        return new_answer
    except Exception as e:
        st.error(f"Error generating answer: {str(e)}")
        return None

def evaluate_answer(question, new_answer, open_router_key, openai_api_key, judge_model_name, temperature, top_p):
    """Evaluates the coherence of an answer using the judge model."""
    judge_prompt = create_judge_prompt(question, new_answer)
    try:
        judge_response = chat_with_model(prompt=judge_prompt, model=judge_model_name, open_router_key=open_router_key,
                                         openai_api_key=openai_api_key, temperature=temperature, top_p=top_p)
        # The judge returns its verdict wrapped in <coherence_score> tags
        coherence_score = int(judge_response.split("<coherence_score>")[1].split("</coherence_score>")[0])
        return coherence_score
    except Exception as e:
        st.error(f"Error getting judge response: {str(e)}")
        return None

def process_question(question, model_name, open_router_key, openai_api_key, result_queue,
                     judge_model_name, coherence_threshold, novelty_threshold, temperature, top_p):
    """Repeatedly generate, judge, and score answers for one question.

    Yields an "answer" result dict for every answer that passes the coherence and
    novelty thresholds. Summaries and errors are reported through result_queue
    when one is provided.
    """
    start_time = time.time()
    previous_answers = []
    question_novelty = 0
    coherence_score = None  # Last accepted coherence score; stays None if no answer is accepted

    try:
        while True:
            new_answer = generate_answer(question, previous_answers, model_name, open_router_key,
                                         openai_api_key, temperature, top_p)
            if new_answer is None:
                break

            coherence_score = evaluate_answer(question, new_answer, open_router_key, openai_api_key,
                                              judge_model_name, temperature, top_p)
            if coherence_score is None or coherence_score <= coherence_threshold:
                break

            novelty_score = get_novelty_score(new_answer, previous_answers, openai_api_key)
            if novelty_score < novelty_threshold:
                break

            result_dict = {
                "type": "answer",
                "question": question,
                "answer": new_answer,
                "coherence_score": coherence_score,
                "novelty_score": novelty_score,
                "results": [
                    {
                        "question": question,
                        "answers": previous_answers.copy() + [new_answer],
                        "coherence_score": coherence_score,
                        "novelty_score": question_novelty + novelty_score
                    }
                ]
            }
            if result_queue is not None:
                result_queue.put(result_dict)
            yield result_dict  # Yield each accepted answer as soon as it is available

            previous_answers.append(new_answer)
            question_novelty += novelty_score
    except Exception as e:
        if result_queue is not None:
            result_queue.put({"type": "error", "message": str(e)})

    time_taken = time.time() - start_time
    if result_queue is not None:
        result_queue.put({
            "type": "summary",
            "question": question,
            "total_novelty": question_novelty,
            "time_taken": time_taken
        })

    # The return value below travels on StopIteration and is not consumed by the
    # callers, which iterate over the yielded results instead.
    return question_novelty, [
        {
            "question": question,
            "answers": previous_answers,
            "coherence_score": coherence_score,
            "novelty_score": question_novelty
        }
    ]

def get_novelty_score(new_answer: str, previous_answers: list, openai_api_key):
    """Return 1 minus the maximum cosine similarity to any previous answer."""
    # With no previous answers there is nothing to compare against, so the
    # answer is maximally novel (and no embedding call is needed).
    if not previous_answers:
        return 1.0

    new_embedding = embed(new_answer, openai_api_key)
    previous_embeddings = [embed(answer, openai_api_key) for answer in previous_answers]

    similarities = [
        np.dot(new_embedding, prev_embedding) /
        (np.linalg.norm(new_embedding) * np.linalg.norm(prev_embedding))
        for prev_embedding in previous_embeddings
    ]

    novelty = 1 - max(similarities)
    return novelty

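# Worked example for the novelty score above (illustrative numbers): for
# unit-length embeddings new = [1, 0] and previous = [0.6, 0.8], the cosine
# similarity is 1 * 0.6 + 0 * 0.8 = 0.6, so the novelty score is 1 - 0.6 = 0.4.
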
def benchmark_model_multithreaded(model_name, questions, open_router_key, openai_api_key, max_threads=None,
                                  judge_model_name=None, coherence_threshold=None, novelty_threshold=None,
                                  temperature=0, top_p=0):
    novelty_score = 0
    results = []
    result_queue = queue.Queue()  # Workers report per-question summaries and errors through this queue

    # Use max_threads if provided, otherwise default to one worker per question
    max_workers = max_threads if max_threads is not None else len(questions)

    def run_question(question):
        # process_question is a generator, so it must be exhausted inside the
        # worker thread for the generation/judging loop to actually run in
        # parallel; the collected "answer" results are returned via the future.
        # Note: Streamlit calls made from worker threads (e.g. st.error inside
        # generate_answer) may not render without a script-run context attached.
        return list(process_question(question, model_name, open_router_key, openai_api_key,
                                     result_queue, judge_model_name, coherence_threshold,
                                     novelty_threshold, temperature, top_p))

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit one task per question to the thread pool
        future_to_question = {
            executor.submit(run_question, question): question
            for question in questions
        }

        # Display answer results as the futures complete
        for future in as_completed(future_to_question):
            for result in future.result():
                if result["type"] == "answer":
                    st.write(f"**Question:** {result['question']}")
                    st.write(f"**New Answer:**\n{result['answer']}")
                    st.success(f"Coherence Score: {result['coherence_score']}")
                    st.write(f"**Novelty Score:** {result['novelty_score']}")
                    results.extend(result["results"])
                    novelty_score += result["novelty_score"]
                    st.info(f"Total novelty score across all questions (so far): {novelty_score}")

    # Drain the queue for per-question summaries and errors. Answer results were
    # already handled above, so they are skipped here to avoid double-counting.
    while not result_queue.empty():
        result = result_queue.get()
        if result["type"] == "summary":
            st.info(f"Total novelty score for question '{result['question']}': {result['total_novelty']}")
            st.info(f"Time taken: {result['time_taken']:.2f} seconds")
        elif result["type"] == "error":
            st.error(f"Error in thread: {result['message']}")

    st.info(f"Final total novelty score across all questions: {novelty_score}")
    return results

def benchmark_model_sequential(model_name, questions, open_router_key, openai_api_key, judge_model_name,
                               coherence_threshold, novelty_threshold, temperature, top_p):
    novelty_score = 0
    results = []

    for question in questions:
        # No result queue is passed here: results are consumed directly as they are yielded
        for result in process_question(question, model_name, open_router_key, openai_api_key, None,
                                       judge_model_name, coherence_threshold, novelty_threshold,
                                       temperature, top_p):
            if result["type"] == "answer":
                st.write(f"**Question:** {result['question']}")
                st.write(f"**New Answer:**\n{result['answer']}")
                st.success(f"Coherence Score: {result['coherence_score']}")
                st.write(f"**Novelty Score:** {result['novelty_score']}")
                results.extend(result["results"])
                novelty_score += result["novelty_score"]
            elif result["type"] == "summary":
                st.info(f"Total novelty score for question '{result['question']}': {result['total_novelty']}")
                st.info(f"Time taken: {result['time_taken']:.2f} seconds")
            elif result["type"] == "error":
                st.error(f"Error processing question: {result['message']}")

    st.info(f"Final total novelty score across all questions: {novelty_score}")
    return results
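

# Illustrative usage sketch (not part of the original module): one way these
# benchmark helpers might be driven from a Streamlit script. The question,
# model names, and thresholds below are placeholder assumptions, not values
# taken from this file.
if __name__ == "__main__":
    sample_questions = ["What is one surprising use for a paperclip?"]  # example question
    open_router_key_input = st.text_input("OpenRouter API key", type="password")
    openai_api_key_input = st.text_input("OpenAI API key", type="password")

    if st.button("Run benchmark") and open_router_key_input and openai_api_key_input:
        benchmark_model_sequential(
            model_name="openai/gpt-4o-mini",   # hypothetical target model id
            questions=sample_questions,
            open_router_key=open_router_key_input,
            openai_api_key=openai_api_key_input,
            judge_model_name="openai/gpt-4o",  # hypothetical judge model id
            coherence_threshold=3,
            novelty_threshold=0.1,
            temperature=0.7,
            top_p=1.0,
        )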