# Chris4K's picture
# Update app.py
# 01b4eab verified
# Install necessary libraries
#!pip install -q transformers accelerate gguf datasets gradio sympy matplotlib pandas
from collections import Counter

import matplotlib.pyplot as plt
import pandas as pd
import torch
from huggingface_hub import hf_hub_download
from llama_cpp import Llama
from transformers import AutoModelForCausalLM, AutoTokenizer
# Define model paths
# MODEL_NAME is the Hugging Face model id handed to load_model() for the
# generator model.
MODEL_NAME = "meta-llama/Llama-3.2-1B-Instruct"
# Download the quantized GGUF file for the scoring model. This call runs at
# import time; its return value is later passed to Llama(model_path=...).
QUANTIZED_PRM_PATH = hf_hub_download(
    repo_id="mradermacher/Llama3.1-8B-PRM-Mistral-Data-GGUF",
    filename="Llama3.1-8B-PRM-Mistral-Data.Q4_K_S.gguf"
)
# Device that input tensors are moved to: CUDA when available, otherwise CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
def load_model(model_name, quantized=False, quantized_model_path=None):
    """Load a GGUF model through llama_cpp or a Hugging Face causal LM.

    Args:
        model_name: Model id or path handed to ``from_pretrained``. Ignored
            when ``quantized`` is true.
        quantized: When true, load ``quantized_model_path`` with ``Llama``
            instead of using transformers.
        quantized_model_path: Path of the GGUF file. Required when
            ``quantized`` is true.

    Returns:
        A ``(model, tokenizer)`` tuple. The tokenizer slot is ``None`` for
        the quantized branch.

    Raises:
        ValueError: If the argument required by the selected branch is
            missing.
    """
    if quantized:
        # Fail early with a clear message instead of passing None to Llama.
        if not quantized_model_path:
            raise ValueError(
                "quantized_model_path is required when quantized=True"
            )
        # -1 when CUDA is available, 0 otherwise (presumably "offload all
        # layers" vs. "CPU only" — see the llama-cpp-python documentation).
        n_gpu_layers = -1 if torch.cuda.is_available() else 0
        model = Llama(
            model_path=quantized_model_path,
            n_ctx=2048,
            n_batch=512,
            n_gpu_layers=n_gpu_layers,
            verbose=False,
        )
        return model, None

    if not model_name:
        raise ValueError("model_name is required when quantized=False")

    tokenizer = AutoTokenizer.from_pretrained(model_name, padding_side='left')
    # Fall back to the EOS token when the tokenizer ships without a pad token.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    model = AutoModelForCausalLM.from_pretrained(model_name, device_map="auto")
    return model, tokenizer
# Load models
# Both calls run at import time.
# Generator: Hugging Face causal LM together with its tokenizer.
llama_model, llama_tokenizer = load_model(MODEL_NAME)
# Scorer: GGUF model loaded through llama_cpp. load_model() returns None in
# the tokenizer slot for the quantized branch, so that value is discarded.
prm_model, _ = load_model(None, quantized=True, quantized_model_path=QUANTIZED_PRM_PATH)
# Strategies
def majority_voting(prompt, num_samples=5):
    """Generate several completions and return the most frequent one.

    Uses the module-level ``llama_model`` / ``llama_tokenizer``.

    Args:
        prompt: Text to complete.
        num_samples: Number of completions to draw; must be at least 1.

    Returns:
        The decoded completion that occurred most often. Ties go to the
        completion that was generated first.

    Raises:
        ValueError: If ``num_samples`` is smaller than 1.
    """
    if num_samples < 1:
        raise ValueError("num_samples must be at least 1")

    # The prompt does not change between draws, so tokenize it only once.
    inputs = llama_tokenizer(prompt, return_tensors="pt").to(device)

    outputs = []
    for _ in range(num_samples):
        # Request sampling explicitly so repeated draws can differ no matter
        # what the checkpoint's default generation config says.
        generated = llama_model.generate(
            **inputs, max_new_tokens=50, do_sample=True
        )
        outputs.append(
            llama_tokenizer.decode(generated[0], skip_special_tokens=True)
        )

    # Counter gives a linear-time vote with deterministic tie-breaking,
    # unlike max() over a set, whose iteration order is not stable.
    return Counter(outputs).most_common(1)[0][0]
def best_of_n(prompt, num_samples=5):
    """Generate several completions and return the highest-scoring one.

    Uses the module-level ``llama_model`` / ``llama_tokenizer`` for
    generation and ``prm_model`` for scoring.

    Args:
        prompt: Text to complete.
        num_samples: Number of completions to draw; must be at least 1.

    Returns:
        The decoded completion with the highest score.

    Raises:
        ValueError: If ``num_samples`` is smaller than 1.
    """
    if num_samples < 1:
        raise ValueError("num_samples must be at least 1")

    # The prompt does not change between draws, so tokenize it only once.
    inputs = llama_tokenizer(prompt, return_tensors="pt").to(device)

    scored_outputs = []
    for _ in range(num_samples):
        # Request sampling explicitly so the candidates can differ no matter
        # what the checkpoint's default generation config says.
        generated = llama_model.generate(
            **inputs, max_new_tokens=50, do_sample=True
        )
        response = llama_tokenizer.decode(generated[0], skip_special_tokens=True)
        # NOTE(review): `prm_tokenizer` is not defined anywhere in the visible
        # file, and `prm_model` is created by the quantized branch of
        # load_model() (a llama_cpp `Llama`, returned without a tokenizer).
        # As written this line raises NameError; the scoring path needs a
        # scorer that exposes logits before this function can run.
        score = prm_model(**prm_tokenizer(response, return_tensors="pt").to(device)).logits.mean().item()
        scored_outputs.append((response, score))

    return max(scored_outputs, key=lambda pair: pair[1])[0]
def beam_search(prompt, num_beams=5):
    """Run beam search on ``prompt`` and return every returned sequence.

    Uses the module-level ``llama_model`` / ``llama_tokenizer``.

    Args:
        prompt: Text to complete.
        num_beams: Beam width; the same value is used for the number of
            returned sequences.

    Returns:
        A list with one decoded string per returned sequence.
    """
    encoded = llama_tokenizer(prompt, return_tensors="pt")
    input_ids = encoded.input_ids.to(device)
    sequences = llama_model.generate(
        input_ids,
        max_new_tokens=50,
        num_beams=num_beams,
        num_return_sequences=num_beams,
    )
    decoded = []
    for sequence in sequences:
        decoded.append(llama_tokenizer.decode(sequence, skip_special_tokens=True))
    return decoded
def dvts(prompt, depth=3, breadth=2):
    """Iteratively extend the best-scoring completions and return the winner.

    The first round draws ``breadth`` completions of ``prompt``. Each of the
    following ``depth - 1`` rounds takes the ``breadth`` highest-scoring
    texts collected so far and extends each of them once. Every text ever
    produced stays in the candidate pool.

    Uses the module-level ``llama_model`` / ``llama_tokenizer`` for
    generation and ``prm_model`` for scoring.

    Args:
        prompt: Text to start from.
        depth: Total number of rounds, including the initial one.
        breadth: Candidates drawn initially and extended per round; must be
            at least 1.

    Returns:
        The decoded text with the highest score across all rounds.

    Raises:
        ValueError: If ``breadth`` is smaller than 1.
    """
    if breadth < 1:
        raise ValueError("breadth must be at least 1")

    def _expand(text):
        """Extend ``text`` by up to 50 new tokens; return ``(text, score)``."""
        inputs = llama_tokenizer(text, return_tensors="pt").to(device)
        # Request sampling explicitly so sibling candidates can differ no
        # matter what the checkpoint's default generation config says.
        generated = llama_model.generate(
            **inputs, max_new_tokens=50, do_sample=True
        )
        response = llama_tokenizer.decode(generated[0], skip_special_tokens=True)
        # NOTE(review): `prm_tokenizer` is not defined anywhere in the visible
        # file, and `prm_model` is created by the quantized branch of
        # load_model() (a llama_cpp `Llama`, returned without a tokenizer).
        # As written this line raises NameError; the scoring path needs a
        # scorer that exposes logits before this function can run.
        score = prm_model(**prm_tokenizer(response, return_tensors="pt").to(device)).logits.mean().item()
        return response, score

    results = [_expand(prompt) for _ in range(breadth)]

    for _ in range(depth - 1):
        # Snapshot the current leaders before appending their extensions.
        leaders = sorted(results, key=lambda pair: pair[1], reverse=True)[:breadth]
        results.extend(_expand(text) for text, _score in leaders)

    return max(results, key=lambda pair: pair[1])[0]
def temperature_sampling(model, tokenizer, prompt, temperature=0.7, num_samples=5):
    """Draw several completions using temperature sampling.

    Args:
        model: Object exposing ``generate`` (used as a transformers model).
        tokenizer: Tokenizer matching ``model``.
        prompt: Text to complete.
        temperature: Sampling temperature. A value of 0 or less (or ``None``)
            disables sampling and decodes greedily.
        num_samples: Number of completions to draw; must be at least 1.

    Returns:
        A dict with ``"outputs"`` (all decoded completions, in generation
        order) and ``"final_result"`` (the first completion).

    Raises:
        ValueError: If ``num_samples`` is smaller than 1.
    """
    if num_samples < 1:
        raise ValueError("num_samples must be at least 1")

    generate_kwargs = {"max_new_tokens": 50}
    if temperature is not None and temperature > 0:
        # Without do_sample=True the temperature has no effect on decoding.
        generate_kwargs.update(do_sample=True, temperature=temperature)
    else:
        generate_kwargs["do_sample"] = False

    # The prompt does not change between draws, so tokenize it only once.
    inputs = tokenizer(prompt, return_tensors="pt").to(device)

    outputs = []
    for _ in range(num_samples):
        generated = model.generate(**inputs, **generate_kwargs)
        outputs.append(tokenizer.decode(generated[0], skip_special_tokens=True))

    return {
        "outputs": outputs,
        "final_result": outputs[0],
    }
def top_p_sampling(model, tokenizer, prompt, top_p=0.9, num_samples=5):
    """Draw several completions using nucleus (top-p) sampling.

    Args:
        model: Object exposing ``generate`` (used as a transformers model).
        tokenizer: Tokenizer matching ``model``.
        prompt: Text to complete.
        top_p: Value forwarded to ``generate`` as ``top_p``.
        num_samples: Number of completions to draw; must be at least 1.

    Returns:
        A dict with ``"outputs"`` (all decoded completions, in generation
        order) and ``"final_result"`` (the first completion).

    Raises:
        ValueError: If ``num_samples`` is smaller than 1.
    """
    if num_samples < 1:
        raise ValueError("num_samples must be at least 1")

    # The prompt does not change between draws, so tokenize it only once.
    inputs = tokenizer(prompt, return_tensors="pt").to(device)

    outputs = []
    for _ in range(num_samples):
        # Without do_sample=True the top_p filter has no effect on decoding.
        generated = model.generate(
            **inputs, max_new_tokens=50, do_sample=True, top_p=top_p
        )
        outputs.append(tokenizer.decode(generated[0], skip_special_tokens=True))

    return {
        "outputs": outputs,
        "final_result": outputs[0],
    }
def custom_strategy(prompt, flow):
    """Chain several generation strategies, feeding each result into the next.

    Args:
        prompt: Initial prompt.
        flow: Iterable of step dicts. Each step may contain ``"strategy"``
            (one of ``majority_voting``, ``best_of_n``, ``beam_search``,
            ``dvts``, ``temperature_sampling``, ``top_p_sampling``) and
            ``"params"`` (keyword arguments for that strategy). Steps with
            an unknown or missing strategy name are skipped.

    Returns:
        A list of ``{"strategy": name, "result": raw_result}`` dicts, one per
        executed step, in execution order.
    """
    # Lambdas defer name lookup until a step actually runs. The sampling
    # helpers need the model and tokenizer passed explicitly; the other
    # strategies read the module-level models themselves.
    runners = {
        "majority_voting": lambda text, kw: majority_voting(text, **kw),
        "best_of_n": lambda text, kw: best_of_n(text, **kw),
        "beam_search": lambda text, kw: beam_search(text, **kw),
        "dvts": lambda text, kw: dvts(text, **kw),
        "temperature_sampling": lambda text, kw: temperature_sampling(
            llama_model, llama_tokenizer, text, **kw
        ),
        "top_p_sampling": lambda text, kw: top_p_sampling(
            llama_model, llama_tokenizer, text, **kw
        ),
    }

    def _next_prompt(result, current):
        """Pick the text that the following step should continue from."""
        if isinstance(result, dict):
            return result["final_result"]
        if isinstance(result, (list, tuple)):
            # beam_search returns a list of sequences; continue from the first.
            return result[0] if result else current
        return result

    intermediate_results = []
    for step in flow:
        strategy = step.get("strategy")
        runner = runners.get(strategy)
        if runner is None:
            continue
        result = runner(prompt, step.get("params") or {})
        intermediate_results.append({"strategy": strategy, "result": result})
        prompt = _next_prompt(result, prompt)
    return intermediate_results
def compare_strategies(model, tokenizer, prm_model, prompt, num_samples=5):
    """Run the three main strategies on one prompt, plot and tabulate them.

    Args:
        model: Accepted for interface compatibility; not forwarded.
        tokenizer: Accepted for interface compatibility; not forwarded.
        prm_model: Accepted for interface compatibility; not forwarded.
        prompt: Text to complete.
        num_samples: Number of samples for majority voting and best-of-N.
            Beam search always uses 5 beams.

    Returns:
        ``(strategies, df)``: ``strategies`` maps a display name to a dict
        with ``"outputs"`` and ``"final_result"``; ``df`` is a DataFrame
        with one row per strategy and the columns ``"Final Result"`` and
        ``"Outputs"``.
    """

    def _as_result(raw):
        """Wrap a strategy's str/list return value in the common dict shape."""
        if isinstance(raw, dict):
            return raw
        outputs = list(raw) if isinstance(raw, (list, tuple)) else [raw]
        return {
            "outputs": outputs,
            "final_result": outputs[0] if outputs else "",
        }

    print("Running comparison...")
    # The strategy functions take (prompt, n) and read the module-level
    # models, so the model arguments of this function are not passed on.
    strategies = {
        "Majority Voting": _as_result(majority_voting(prompt, num_samples)),
        "Best-of-N": _as_result(best_of_n(prompt, num_samples)),
        "Beam Search": _as_result(beam_search(prompt, 5)),  # num_beams
    }

    plt.figure(figsize=(10, 6))
    plt.bar(list(strategies), [len(s["outputs"]) for s in strategies.values()])
    plt.title("Strategy Comparison")
    plt.ylabel("Number of Outputs")
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

    df = pd.DataFrame.from_dict(
        {
            strategy: {
                "Final Result": data["final_result"],
                "Outputs": data["outputs"],
            }
            for strategy, data in strategies.items()
        },
        orient="index",
    )
    return strategies, df
def test_generation():
    """Run compare_strategies() once on a fixed prompt and print the table.

    Returns:
        The ``(strategies_results, results_df)`` pair produced by
        ``compare_strategies``.
    """
    print("Starting generation test...")
    sample_prompt = "Explain the concept of neural networks in simple terms."
    comparison = compare_strategies(
        llama_model, llama_tokenizer, prm_model, sample_prompt, 1
    )
    strategies_results, results_df = comparison
    print("\nResults DataFrame:")
    print(results_df)
    return strategies_results, results_df
# Module-level call: the generation smoke test runs whenever this file is
# executed or imported, before any of the code further down in the file.
test_generation()
#####
######
#####
#####
###
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from llama_cpp import Llama
from huggingface_hub import hf_hub_download
import matplotlib.pyplot as plt
import pandas as pd
import gradio as gr
import time
import json
import numpy as np
from datetime import datetime
def calculate_metrics(text):
    """Compute simple size statistics for ``text``.

    Returns:
        A dict with ``token_count`` (whitespace-separated chunks),
        ``char_count`` (length of the string) and ``sentence_count``
        (non-blank pieces obtained by splitting on ``'.'``).
    """
    words = text.split()
    sentences = 0
    for fragment in text.split('.'):
        if fragment.strip():
            sentences += 1
    return {
        'token_count': len(words),
        'char_count': len(text),
        'sentence_count': sentences,
    }
def create_performance_plot(times, strategies):
    """Build a bar chart of generation time per strategy.

    Args:
        times: Generation times in seconds, aligned with ``strategies``.
        strategies: Strategy names used as bar labels.

    Returns:
        The matplotlib ``Figure`` holding the chart. Returning the figure
        rather than the ``pyplot`` module keeps this chart distinct from any
        figure created afterwards, so two plots returned from one handler do
        not both resolve to whichever figure is current.
    """
    fig = plt.figure(figsize=(10, 5))
    plt.bar(strategies, times)
    plt.title('Generation Time by Strategy')
    plt.ylabel('Time (seconds)')
    plt.xticks(rotation=45)
    plt.tight_layout()
    return fig
def create_token_plot(tokens, strategies):
    """Build a bar chart of output token count per strategy.

    Args:
        tokens: Token counts, aligned with ``strategies``.
        strategies: Strategy names used as bar labels.

    Returns:
        The matplotlib ``Figure`` holding the chart. Returning the figure
        rather than the ``pyplot`` module keeps this chart distinct from any
        figure created before or after it.
    """
    fig = plt.figure(figsize=(10, 5))
    plt.bar(strategies, tokens)
    plt.title('Output Token Count by Strategy')
    plt.ylabel('Number of Tokens')
    plt.xticks(rotation=45)
    plt.tight_layout()
    return fig
def format_metrics(metrics):
    """Render a metrics dict as a markdown bullet list.

    Args:
        metrics: Either a metrics dict or a list/tuple whose first element
            is one. The dict must contain ``token_count``, ``char_count``
            and ``sentence_count``; ``generation_time`` is rendered only
            when present.

    Returns:
        A markdown snippet that starts and ends with a newline.
    """
    # Callers in this file pass a plain dict; the older list form is still
    # accepted so existing call sites keep working.
    if isinstance(metrics, (list, tuple)):
        metrics = metrics[0]

    lines = [
        "",
        "### Metrics",
        f"- Token Count: {metrics['token_count']}",
        f"- Character Count: {metrics['char_count']}",
        f"- Sentence Count: {metrics['sentence_count']}",
    ]
    if 'generation_time' in metrics:
        lines.append(f"- Generation Time: {metrics['generation_time']:.2f}s")
    lines.append("")
    return "\n".join(lines)
def run_single_strategy(prompt, strategy, num_samples):
    """Run one strategy and build the report, plots and metrics for the UI.

    Args:
        prompt: Text to complete. An empty prompt yields an error message.
        strategy: Display name: ``"Majority Voting"``, ``"Best-of-N"`` or
            ``"Beam Search"``.
        num_samples: Number of samples (or beams); coerced to ``int``.

    Returns:
        ``(markdown_report, performance_fig, token_fig, metrics)``. On
        invalid input the first item is an error message and the remaining
        three are ``None``.
    """
    if not prompt:
        return "Please enter a prompt.", None, None, None

    # UI sliders may deliver a float; range() and num_beams need an int.
    num_samples = int(num_samples)

    # The strategy functions take (prompt, n) and read the module-level
    # models themselves, so no model or tokenizer is passed here.
    strategies = {
        "Majority Voting": lambda: majority_voting(prompt, num_samples),
        "Best-of-N": lambda: best_of_n(prompt, num_samples),
        "Beam Search": lambda: beam_search(prompt, num_beams=num_samples),
    }
    if strategy not in strategies:
        return "Invalid strategy selected.", None, None, None

    start_time = time.time()
    raw = strategies[strategy]()
    generation_time = time.time() - start_time

    # Strategies return a str (voting, best-of-N) or a list (beam search);
    # bring both into one shape before reporting.
    if isinstance(raw, dict):
        outputs = list(raw.get("outputs", []))
        final_result = raw["final_result"]
    else:
        outputs = list(raw) if isinstance(raw, (list, tuple)) else [raw]
        final_result = outputs[0] if outputs else ""

    # Calculate metrics
    metrics = calculate_metrics(final_result)
    metrics['generation_time'] = generation_time

    # Create visualizations
    performance_fig = create_performance_plot([generation_time], [strategy])
    token_fig = create_token_plot([metrics['token_count']], [strategy])

    all_outputs = "\n".join(
        f"{index}. {text}" for index, text in enumerate(outputs, start=1)
    )
    formatted_output = "\n".join([
        "",
        f"# Results for {strategy}",
        "## Final Result",
        f"{final_result}",
        # Passed as a one-element list, a form format_metrics accepts.
        format_metrics([metrics]),
        "## All Outputs",
        all_outputs,
        "## Generation Details",
        f"- Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        f"- Number of samples: {num_samples}",
        f"- Model: {MODEL_NAME}",
        f"- Device: {device}",
        "",
    ])
    return formatted_output, performance_fig, token_fig, metrics
def run_all_strategies(prompt, num_samples):
    """Run every strategy on ``prompt`` and build a combined report.

    Args:
        prompt: Text to complete. An empty prompt yields an error message.
        num_samples: Number of samples (or beams) for each strategy.

    Returns:
        ``(markdown_report, performance_fig, token_fig, all_metrics)`` where
        ``all_metrics`` maps a strategy name to its metrics dict. For an
        empty prompt the first item is an error message and the remaining
        three are ``None``.
    """
    if not prompt:
        return "Please enter a prompt.", None, None, None

    strategies = ["Majority Voting", "Best-of-N", "Beam Search"]
    all_metrics = {}
    sections = ["# Results from All Strategies\n\n"]

    for strategy in strategies:
        # run_single_strategy returns (report, fig, fig, metrics). The report
        # is markdown text and the metrics already include generation_time,
        # so nothing needs to be recomputed here.
        report, _perf_fig, _token_fig, metrics = run_single_strategy(
            prompt, strategy, num_samples
        )
        sections.append(f"\n## {strategy}\n{report}\n---\n")
        if metrics is not None:
            all_metrics[strategy] = metrics

    # Create comparison visualizations from the strategies that produced
    # metrics, keeping names and values aligned.
    completed = list(all_metrics)
    all_times = [all_metrics[name]['generation_time'] for name in completed]
    all_tokens = [all_metrics[name]['token_count'] for name in completed]
    performance_fig = create_performance_plot(all_times, completed)
    token_fig = create_token_plot(all_tokens, completed)

    # Add comparison summary
    sections.append("\n# Strategy Comparison Summary\n")
    for strategy, metrics in all_metrics.items():
        # Passed as a one-element list, a form format_metrics accepts.
        sections.append(f"\n## {strategy}\n{format_metrics([metrics])}\n")

    return "".join(sections), performance_fig, token_fig, all_metrics
# Create the enhanced Gradio interface
# NOTE(review): the nesting of the layout blocks below was reconstructed from
# a copy of the file whose indentation had been lost — confirm the intended
# row/column structure against the running app.
with gr.Blocks(title="Advanced Text Generation Strategies") as demo:
    gr.Markdown("# Advanced Text Generation Strategies Demo")
    with gr.Row():
        # Input column: prompt, sampling controls and the two run buttons.
        with gr.Column(scale=2):
            prompt_input = gr.Textbox(
                label="Enter your prompt",
                placeholder="Type your prompt here...",
                lines=3
            )
            with gr.Row():
                # Used as the sample count, and as the beam count for
                # "Beam Search" (see run_single_strategy).
                num_samples = gr.Slider(
                    minimum=1,
                    maximum=10,
                    value=5,
                    step=1,
                    label="Number of samples/beams"
                )
                # Choices must match the strategy names that
                # run_single_strategy looks up.
                strategy_dropdown = gr.Dropdown(
                    choices=["Majority Voting", "Best-of-N", "Beam Search"],
                    label="Select Strategy",
                    value="Majority Voting"
                )
            with gr.Row():
                single_strategy_btn = gr.Button("Run Selected Strategy")
                all_strategies_btn = gr.Button("Run All Strategies")
        # Output column: markdown report, two plots and the raw metrics.
        with gr.Column(scale=3):
            output_display = gr.Markdown(label="Results")
            with gr.Row():
                performance_plot = gr.Plot(label="Performance Comparison")
                token_plot = gr.Plot(label="Token Count Comparison")
            metrics_display = gr.JSON(label="Detailed Metrics")
    # Set up event handlers
    # Both handlers return four values, mapped in order onto the report,
    # the two plots and the metrics view.
    single_strategy_btn.click(
        fn=run_single_strategy,
        inputs=[prompt_input, strategy_dropdown, num_samples],
        outputs=[output_display, performance_plot, token_plot, metrics_display]
    )
    all_strategies_btn.click(
        fn=run_all_strategies,
        inputs=[prompt_input, num_samples],
        outputs=[output_display, performance_plot, token_plot, metrics_display]
    )
# Start the app only when the file is run as a script.
if __name__ == "__main__":
    demo.launch(debug=True)