Spaces:
Runtime error
Runtime error
# Install necessary libraries | |
#!pip install -q transformers accelerate gguf datasets gradio sympy matplotlib pandas | |
import torch | |
from transformers import AutoModelForCausalLM, AutoTokenizer | |
from llama_cpp import Llama | |
from huggingface_hub import hf_hub_download | |
import matplotlib.pyplot as plt | |
import pandas as pd | |
# Define model paths | |
MODEL_NAME = "meta-llama/Llama-3.2-1B-Instruct" | |
QUANTIZED_PRM_PATH = hf_hub_download( | |
repo_id="mradermacher/Llama3.1-8B-PRM-Mistral-Data-GGUF", | |
filename="Llama3.1-8B-PRM-Mistral-Data.Q4_K_S.gguf" | |
) | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
def load_model(model_name, quantized=False, quantized_model_path=None): | |
if quantized: | |
n_gpu_layers = -1 if torch.cuda.is_available() else 0 | |
model = Llama( | |
model_path=quantized_model_path, | |
n_ctx=2048, | |
n_batch=512, | |
n_gpu_layers=n_gpu_layers, | |
verbose=False | |
) | |
return model, None | |
else: | |
tokenizer = AutoTokenizer.from_pretrained(model_name, padding_side='left') | |
if tokenizer.pad_token is None: | |
tokenizer.pad_token = tokenizer.eos_token | |
model = AutoModelForCausalLM.from_pretrained(model_name, device_map="auto") | |
return model, tokenizer | |
# Load models | |
llama_model, llama_tokenizer = load_model(MODEL_NAME) | |
prm_model, _ = load_model(None, quantized=True, quantized_model_path=QUANTIZED_PRM_PATH) | |
# Strategies | |
def majority_voting(prompt, num_samples=5): | |
outputs = [] | |
for _ in range(num_samples): | |
input_ids = llama_tokenizer(prompt, return_tensors="pt").input_ids.to(device) | |
output = llama_model.generate(input_ids, max_new_tokens=50) | |
outputs.append(llama_tokenizer.decode(output[0], skip_special_tokens=True)) | |
return max(set(outputs), key=outputs.count) | |
def best_of_n(prompt, num_samples=5): | |
scored_outputs = [] | |
for _ in range(num_samples): | |
input_ids = llama_tokenizer(prompt, return_tensors="pt").input_ids.to(device) | |
output = llama_model.generate(input_ids, max_new_tokens=50) | |
response = llama_tokenizer.decode(output[0], skip_special_tokens=True) | |
score = prm_model(**prm_tokenizer(response, return_tensors="pt").to(device)).logits.mean().item() | |
scored_outputs.append((response, score)) | |
return max(scored_outputs, key=lambda x: x[1])[0] | |
def beam_search(prompt, num_beams=5): | |
input_ids = llama_tokenizer(prompt, return_tensors="pt").input_ids.to(device) | |
outputs = llama_model.generate(input_ids, max_new_tokens=50, num_beams=num_beams, num_return_sequences=num_beams) | |
return [llama_tokenizer.decode(output, skip_special_tokens=True) for output in outputs] | |
def dvts(prompt, depth=3, breadth=2): | |
results = [] | |
for _ in range(breadth): | |
input_ids = llama_tokenizer(prompt, return_tensors="pt").input_ids.to(device) | |
output = llama_model.generate(input_ids, max_new_tokens=50) | |
response = llama_tokenizer.decode(output[0], skip_special_tokens=True) | |
score = prm_model(**prm_tokenizer(response, return_tensors="pt").to(device)).logits.mean().item() | |
results.append((response, score)) | |
for _ in range(depth - 1): | |
best_responses = sorted(results, key=lambda x: x[1], reverse=True)[:breadth] | |
for response, _ in best_responses: | |
input_ids = llama_tokenizer(response, return_tensors="pt").input_ids.to(device) | |
output = llama_model.generate(input_ids, max_new_tokens=50) | |
extended_response = llama_tokenizer.decode(output[0], skip_special_tokens=True) | |
score = prm_model(**prm_tokenizer(extended_response, return_tensors="pt").to(device)).logits.mean().item() | |
results.append((extended_response, score)) | |
return max(results, key=lambda x: x[1])[0] | |
def temperature_sampling(model, tokenizer, prompt, temperature=0.7, num_samples=5): | |
outputs = [] | |
for _ in range(num_samples): | |
input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(device) | |
output = model.generate(input_ids, max_new_tokens=50, temperature=temperature) | |
outputs.append(tokenizer.decode(output[0], skip_special_tokens=True)) | |
return { | |
"outputs": outputs, | |
"final_result": outputs[0] | |
} | |
def top_p_sampling(model, tokenizer, prompt, top_p=0.9, num_samples=5): | |
outputs = [] | |
for _ in range(num_samples): | |
input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(device) | |
output = model.generate(input_ids, max_new_tokens=50, top_p=top_p) | |
outputs.append(tokenizer.decode(output[0], skip_special_tokens=True)) | |
return { | |
"outputs": outputs, | |
"final_result": outputs[0] | |
} | |
def custom_strategy(prompt, flow): | |
intermediate_results = [] | |
for step in flow: | |
strategy = step.get("strategy") | |
params = step.get("params", {}) | |
if strategy == "majority_voting": | |
result = majority_voting(prompt, **params) | |
elif strategy == "best_of_n": | |
result = best_of_n(prompt, **params) | |
elif strategy == "beam_search": | |
result = beam_search(prompt, **params) | |
elif strategy == "top_p_sampling": | |
result = top_p_sampling(prompt, **params) | |
else: | |
continue | |
intermediate_results.append({"strategy": strategy, "result": result}) | |
prompt = result["final_result"] | |
return intermediate_results | |
def compare_strategies(model, tokenizer, prm_model, prompt, num_samples=5): | |
print("Running comparison...") | |
strategies = { | |
"Majority Voting": majority_voting(model, tokenizer, prompt, num_samples), | |
"Best-of-N": best_of_n(model, tokenizer, prm_model, prompt, num_samples), | |
"Beam Search": beam_search(model, tokenizer, prompt, 5) #num_beams | |
#... | |
} | |
plt.figure(figsize=(10, 6)) | |
plt.bar(strategies.keys(), [len(s["outputs"]) for s in strategies.values()]) | |
plt.title("Strategy Comparison") | |
plt.ylabel("Number of Outputs") | |
plt.xticks(rotation=45) | |
plt.tight_layout() | |
plt.show() | |
df = pd.DataFrame.from_dict({ | |
strategy: { | |
"Final Result": data["final_result"], | |
"Outputs": data["outputs"] | |
} for strategy, data in strategies.items() | |
}, orient="index") | |
return strategies, df | |
def test_generation(): | |
sample_prompt = "Explain the concept of neural networks in simple terms." | |
print("Starting generation test...") | |
strategies_results, results_df = compare_strategies(llama_model, llama_tokenizer, prm_model, sample_prompt, 1) | |
print("\nResults DataFrame:") | |
print(results_df) | |
return strategies_results, results_df | |
test_generation() | |
##### | |
###### | |
##### | |
##### | |
### | |
import torch | |
from transformers import AutoModelForCausalLM, AutoTokenizer | |
from llama_cpp import Llama | |
from huggingface_hub import hf_hub_download | |
import matplotlib.pyplot as plt | |
import pandas as pd | |
import gradio as gr | |
import time | |
import json | |
import numpy as np | |
from datetime import datetime | |
def calculate_metrics(text): | |
return { | |
'token_count': len(text.split()), | |
'char_count': len(text), | |
'sentence_count': len([s for s in text.split('.') if s.strip()]), | |
} | |
def create_performance_plot(times, strategies): | |
plt.figure(figsize=(10, 5)) | |
plt.bar(strategies, times) | |
plt.title('Generation Time by Strategy') | |
plt.ylabel('Time (seconds)') | |
plt.xticks(rotation=45) | |
plt.tight_layout() | |
return plt | |
def create_token_plot(tokens, strategies): | |
plt.figure(figsize=(10, 5)) | |
plt.bar(strategies, tokens) | |
plt.title('Output Token Count by Strategy') | |
plt.ylabel('Number of Tokens') | |
plt.xticks(rotation=45) | |
plt.tight_layout() | |
return plt | |
def format_metrics(metrics): | |
print(type(metrics)) # Check if it's a list or dictionary | |
print(metrics) # Inspect its contents | |
return f""" | |
### Metrics | |
- Token Count: {metrics[0]['token_count']} | |
- Character Count: {metrics[0]['char_count']} | |
- Sentence Count: {metrics[0]['sentence_count']} | |
- Generation Time: {metrics[0]['generation_time']:.2f}s | |
""" | |
def run_single_strategy(prompt, strategy, num_samples): | |
if not prompt: | |
return "Please enter a prompt.", None, None, None | |
start_time = time.time() | |
strategies = { | |
"Majority Voting": lambda: majority_voting(llama_model, llama_tokenizer, prompt, num_samples), | |
"Best-of-N": lambda: best_of_n(llama_model, llama_tokenizer, prm_model, prompt, num_samples), | |
"Beam Search": lambda: beam_search(llama_model, llama_tokenizer, prompt, num_beams=num_samples) | |
} | |
if strategy not in strategies: | |
return "Invalid strategy selected.", None, None, None | |
result = strategies[strategy]() | |
generation_time = time.time() - start_time | |
# Calculate metrics | |
metrics = calculate_metrics(result['final_result']) | |
metrics['generation_time'] = generation_time | |
# Create visualizations | |
performance_fig = create_performance_plot([generation_time], [strategy]) | |
token_fig = create_token_plot([metrics['token_count']], [strategy]) | |
formatted_output = f""" | |
# Results for {strategy} | |
## Final Result | |
{result['final_result']} | |
{format_metrics(metrics)} | |
## All Outputs | |
{format_metrics(result['outputs'])} | |
## Generation Details | |
- Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | |
- Number of samples: {num_samples} | |
- Model: {MODEL_NAME} | |
- Device: {device} | |
""" | |
return formatted_output, performance_fig, token_fig, metrics | |
def run_all_strategies(prompt, num_samples): | |
if not prompt: | |
return "Please enter a prompt.", None, None, None | |
all_metrics = {} | |
all_times = [] | |
all_tokens = [] | |
strategies = ["Majority Voting", "Best-of-N", "Beam Search"] | |
output_text = "# Results from All Strategies\n\n" | |
for strategy in strategies: | |
start_time = time.time() | |
result = run_single_strategy(prompt, strategy, num_samples)[0] | |
generation_time = time.time() - start_time | |
metrics = calculate_metrics(result['final_result']) | |
metrics['generation_time'] = generation_time | |
all_metrics[strategy] = metrics | |
all_times.append(generation_time) | |
all_tokens.append(metrics['token_count']) | |
output_text += f""" | |
## {strategy} | |
{result} | |
--- | |
""" | |
# Create comparison visualizations | |
performance_fig = create_performance_plot(all_times, strategies) | |
token_fig = create_token_plot(all_tokens, strategies) | |
# Add comparison summary | |
output_text += """ | |
# Strategy Comparison Summary | |
""" | |
for strategy, metrics in all_metrics.items(): | |
output_text += f""" | |
## {strategy} | |
{format_metrics(metrics)} | |
""" | |
return output_text, performance_fig, token_fig, all_metrics | |
# Create the enhanced Gradio interface | |
with gr.Blocks(title="Advanced Text Generation Strategies") as demo: | |
gr.Markdown("# Advanced Text Generation Strategies Demo") | |
with gr.Row(): | |
with gr.Column(scale=2): | |
prompt_input = gr.Textbox( | |
label="Enter your prompt", | |
placeholder="Type your prompt here...", | |
lines=3 | |
) | |
with gr.Row(): | |
num_samples = gr.Slider( | |
minimum=1, | |
maximum=10, | |
value=5, | |
step=1, | |
label="Number of samples/beams" | |
) | |
strategy_dropdown = gr.Dropdown( | |
choices=["Majority Voting", "Best-of-N", "Beam Search"], | |
label="Select Strategy", | |
value="Majority Voting" | |
) | |
with gr.Row(): | |
single_strategy_btn = gr.Button("Run Selected Strategy") | |
all_strategies_btn = gr.Button("Run All Strategies") | |
with gr.Column(scale=3): | |
output_display = gr.Markdown(label="Results") | |
with gr.Row(): | |
performance_plot = gr.Plot(label="Performance Comparison") | |
token_plot = gr.Plot(label="Token Count Comparison") | |
metrics_display = gr.JSON(label="Detailed Metrics") | |
# Set up event handlers | |
single_strategy_btn.click( | |
fn=run_single_strategy, | |
inputs=[prompt_input, strategy_dropdown, num_samples], | |
outputs=[output_display, performance_plot, token_plot, metrics_display] | |
) | |
all_strategies_btn.click( | |
fn=run_all_strategies, | |
inputs=[prompt_input, num_samples], | |
outputs=[output_display, performance_plot, token_plot, metrics_display] | |
) | |
if __name__ == "__main__": | |
demo.launch(debug=True) |