# Install necessary libraries
#!pip install -q transformers accelerate gguf datasets gradio sympy matplotlib pandas

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from llama_cpp import Llama
from huggingface_hub import hf_hub_download
import matplotlib.pyplot as plt
import pandas as pd

# Define model paths
MODEL_NAME = "meta-llama/Llama-3.2-1B-Instruct"
QUANTIZED_PRM_PATH = hf_hub_download(
    repo_id="mradermacher/Llama3.1-8B-PRM-Mistral-Data-GGUF",
    filename="Llama3.1-8B-PRM-Mistral-Data.Q4_K_S.gguf"
)

device = "cuda" if torch.cuda.is_available() else "cpu"


def load_model(model_name, quantized=False, quantized_model_path=None):
    """Load either a Hugging Face causal LM or a quantized GGUF model via llama.cpp."""
    if quantized:
        n_gpu_layers = -1 if torch.cuda.is_available() else 0
        model = Llama(
            model_path=quantized_model_path,
            n_ctx=2048,
            n_batch=512,
            n_gpu_layers=n_gpu_layers,
            logits_all=True,  # needed so llama.cpp can return per-token logprobs for PRM scoring
            verbose=False
        )
        return model, None
    else:
        tokenizer = AutoTokenizer.from_pretrained(model_name, padding_side='left')
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
        model = AutoModelForCausalLM.from_pretrained(model_name, device_map="auto")
        return model, tokenizer


# Load models
llama_model, llama_tokenizer = load_model(MODEL_NAME)
prm_model, _ = load_model(None, quantized=True, quantized_model_path=QUANTIZED_PRM_PATH)


def score_with_prm(text):
    """Return a scalar reward for `text` from the quantized PRM.

    The PRM is loaded through llama.cpp, so there are no raw logits to average
    directly; as an approximation, the text is echoed back and its mean token
    log-probability is used as the score.
    """
    result = prm_model(text, max_tokens=1, echo=True, logprobs=1)
    token_logprobs = result["choices"][0]["logprobs"]["token_logprobs"]
    valid = [lp for lp in token_logprobs if lp is not None]
    return sum(valid) / max(len(valid), 1)


# Strategies
def majority_voting(model, tokenizer, prompt, num_samples=5):
    """Sample several completions and return the most frequent one."""
    outputs = []
    for _ in range(num_samples):
        input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(device)
        # Sample (rather than decode greedily) so repeated draws can differ.
        output = model.generate(input_ids, max_new_tokens=50, do_sample=True)
        outputs.append(tokenizer.decode(output[0], skip_special_tokens=True))
    final_result = max(set(outputs), key=outputs.count)
    return {"outputs": outputs, "final_result": final_result}


def best_of_n(model, tokenizer, prm_model, prompt, num_samples=5):
    """Sample several completions and return the one the PRM scores highest."""
    scored_outputs = []
    for _ in range(num_samples):
        input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(device)
        output = model.generate(input_ids, max_new_tokens=50, do_sample=True)
        response = tokenizer.decode(output[0], skip_special_tokens=True)
        score = score_with_prm(response)
        scored_outputs.append((response, score))
    final_result = max(scored_outputs, key=lambda x: x[1])[0]
    return {"outputs": [response for response, _ in scored_outputs], "final_result": final_result}


def beam_search(model, tokenizer, prompt, num_beams=5):
    """Run beam search and return all beams, with the top beam as the final result."""
    input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(device)
    outputs = model.generate(
        input_ids,
        max_new_tokens=50,
        num_beams=num_beams,
        num_return_sequences=num_beams
    )
    decoded = [tokenizer.decode(output, skip_special_tokens=True) for output in outputs]
    return {"outputs": decoded, "final_result": decoded[0]}


def dvts(prompt, depth=3, breadth=2):
    """Diverse verifier tree search: keep expanding the best-scored candidates for `depth` rounds."""
    results = []
    for _ in range(breadth):
        input_ids = llama_tokenizer(prompt, return_tensors="pt").input_ids.to(device)
        output = llama_model.generate(input_ids, max_new_tokens=50, do_sample=True)
        response = llama_tokenizer.decode(output[0], skip_special_tokens=True)
        results.append((response, score_with_prm(response)))
    for _ in range(depth - 1):
        best_responses = sorted(results, key=lambda x: x[1], reverse=True)[:breadth]
        for response, _ in best_responses:
            input_ids = llama_tokenizer(response, return_tensors="pt").input_ids.to(device)
            output = llama_model.generate(input_ids, max_new_tokens=50, do_sample=True)
            extended_response = llama_tokenizer.decode(output[0], skip_special_tokens=True)
            results.append((extended_response, score_with_prm(extended_response)))
    return max(results, key=lambda x: x[1])[0]
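
# --- Optional quick check (not part of the original pipeline) ---
# A minimal sketch of how the core strategies above can be invoked directly.
# `RUN_QUICK_DEMOS` and the demo prompt are illustrative additions; flip the flag
# to True to actually run the (slow) generations.
RUN_QUICK_DEMOS = False

if RUN_QUICK_DEMOS:
    demo_prompt = "What is 12 * 7? Answer with a single number."
    print(majority_voting(llama_model, llama_tokenizer, demo_prompt, num_samples=3)["final_result"])
    print(best_of_n(llama_model, llama_tokenizer, prm_model, demo_prompt, num_samples=3)["final_result"])
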
def temperature_sampling(model, tokenizer, prompt, temperature=0.7, num_samples=5):
    """Sample several completions at a fixed temperature."""
    outputs = []
    for _ in range(num_samples):
        input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(device)
        # do_sample=True is required for temperature to have any effect.
        output = model.generate(input_ids, max_new_tokens=50, do_sample=True, temperature=temperature)
        outputs.append(tokenizer.decode(output[0], skip_special_tokens=True))
    return {"outputs": outputs, "final_result": outputs[0]}


def top_p_sampling(model, tokenizer, prompt, top_p=0.9, num_samples=5):
    """Sample several completions with nucleus (top-p) sampling."""
    outputs = []
    for _ in range(num_samples):
        input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(device)
        # do_sample=True is required for top_p to have any effect.
        output = model.generate(input_ids, max_new_tokens=50, do_sample=True, top_p=top_p)
        outputs.append(tokenizer.decode(output[0], skip_special_tokens=True))
    return {"outputs": outputs, "final_result": outputs[0]}


def custom_strategy(prompt, flow):
    """Chain strategies: each step's final result becomes the next step's prompt."""
    intermediate_results = []
    for step in flow:
        strategy = step.get("strategy")
        params = step.get("params", {})
        if strategy == "majority_voting":
            result = majority_voting(llama_model, llama_tokenizer, prompt, **params)
        elif strategy == "best_of_n":
            result = best_of_n(llama_model, llama_tokenizer, prm_model, prompt, **params)
        elif strategy == "beam_search":
            result = beam_search(llama_model, llama_tokenizer, prompt, **params)
        elif strategy == "top_p_sampling":
            result = top_p_sampling(llama_model, llama_tokenizer, prompt, **params)
        else:
            continue
        intermediate_results.append({"strategy": strategy, "result": result})
        prompt = result["final_result"]
    return intermediate_results


def compare_strategies(model, tokenizer, prm_model, prompt, num_samples=5):
    """Run several strategies on the same prompt and summarise them in a DataFrame."""
    print("Running comparison...")
    strategies = {
        "Majority Voting": majority_voting(model, tokenizer, prompt, num_samples),
        "Best-of-N": best_of_n(model, tokenizer, prm_model, prompt, num_samples),
        "Beam Search": beam_search(model, tokenizer, prompt, num_beams=5),
        # ... further strategies can be added here
    }

    plt.figure(figsize=(10, 6))
    plt.bar(strategies.keys(), [len(s["outputs"]) for s in strategies.values()])
    plt.title("Strategy Comparison")
    plt.ylabel("Number of Outputs")
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

    df = pd.DataFrame.from_dict({
        strategy: {
            "Final Result": data["final_result"],
            "Outputs": data["outputs"]
        }
        for strategy, data in strategies.items()
    }, orient="index")

    return strategies, df
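
# --- Optional: example flow for custom_strategy (illustrative) ---
# custom_strategy expects `flow` to be a list of steps, each naming a strategy and
# its keyword arguments; the final result of one step becomes the prompt of the next.
# The step parameters below are arbitrary example values, and the snippet reuses the
# RUN_QUICK_DEMOS flag defined earlier.
if RUN_QUICK_DEMOS:
    example_flow = [
        {"strategy": "best_of_n", "params": {"num_samples": 3}},
        {"strategy": "beam_search", "params": {"num_beams": 3}},
    ]
    for step in custom_strategy("Explain overfitting in one sentence.", example_flow):
        print(step["strategy"], "->", step["result"]["final_result"])
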
print("Starting generation test...") strategies_results, results_df = compare_strategies(llama_model, llama_tokenizer, prm_model, sample_prompt, 1) print("\nResults DataFrame:") print(results_df) return strategies_results, results_df test_generation() ##### ###### ##### ##### ### import torch from transformers import AutoModelForCausalLM, AutoTokenizer from llama_cpp import Llama from huggingface_hub import hf_hub_download import matplotlib.pyplot as plt import pandas as pd import gradio as gr import time import json import numpy as np from datetime import datetime def calculate_metrics(text): return { 'token_count': len(text.split()), 'char_count': len(text), 'sentence_count': len([s for s in text.split('.') if s.strip()]), } def create_performance_plot(times, strategies): plt.figure(figsize=(10, 5)) plt.bar(strategies, times) plt.title('Generation Time by Strategy') plt.ylabel('Time (seconds)') plt.xticks(rotation=45) plt.tight_layout() return plt def create_token_plot(tokens, strategies): plt.figure(figsize=(10, 5)) plt.bar(strategies, tokens) plt.title('Output Token Count by Strategy') plt.ylabel('Number of Tokens') plt.xticks(rotation=45) plt.tight_layout() return plt def format_metrics(metrics): print(type(metrics)) # Check if it's a list or dictionary print(metrics) # Inspect its contents return f""" ### Metrics - Token Count: {metrics[0]['token_count']} - Character Count: {metrics[0]['char_count']} - Sentence Count: {metrics[0]['sentence_count']} - Generation Time: {metrics[0]['generation_time']:.2f}s """ def run_single_strategy(prompt, strategy, num_samples): if not prompt: return "Please enter a prompt.", None, None, None start_time = time.time() strategies = { "Majority Voting": lambda: majority_voting(llama_model, llama_tokenizer, prompt, num_samples), "Best-of-N": lambda: best_of_n(llama_model, llama_tokenizer, prm_model, prompt, num_samples), "Beam Search": lambda: beam_search(llama_model, llama_tokenizer, prompt, num_beams=num_samples) } if strategy not in strategies: return "Invalid strategy selected.", None, None, None result = strategies[strategy]() generation_time = time.time() - start_time # Calculate metrics metrics = calculate_metrics(result['final_result']) metrics['generation_time'] = generation_time # Create visualizations performance_fig = create_performance_plot([generation_time], [strategy]) token_fig = create_token_plot([metrics['token_count']], [strategy]) formatted_output = f""" # Results for {strategy} ## Final Result {result['final_result']} {format_metrics(metrics)} ## All Outputs {format_metrics(result['outputs'])} ## Generation Details - Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - Number of samples: {num_samples} - Model: {MODEL_NAME} - Device: {device} """ return formatted_output, performance_fig, token_fig, metrics def run_all_strategies(prompt, num_samples): if not prompt: return "Please enter a prompt.", None, None, None all_metrics = {} all_times = [] all_tokens = [] strategies = ["Majority Voting", "Best-of-N", "Beam Search"] output_text = "# Results from All Strategies\n\n" for strategy in strategies: start_time = time.time() result = run_single_strategy(prompt, strategy, num_samples)[0] generation_time = time.time() - start_time metrics = calculate_metrics(result['final_result']) metrics['generation_time'] = generation_time all_metrics[strategy] = metrics all_times.append(generation_time) all_tokens.append(metrics['token_count']) output_text += f""" ## {strategy} {result} --- """ # Create comparison visualizations performance_fig 
# Create the enhanced Gradio interface
with gr.Blocks(title="Advanced Text Generation Strategies") as demo:
    gr.Markdown("# Advanced Text Generation Strategies Demo")

    with gr.Row():
        with gr.Column(scale=2):
            prompt_input = gr.Textbox(
                label="Enter your prompt",
                placeholder="Type your prompt here...",
                lines=3
            )
            with gr.Row():
                num_samples = gr.Slider(
                    minimum=1,
                    maximum=10,
                    value=5,
                    step=1,
                    label="Number of samples/beams"
                )
                strategy_dropdown = gr.Dropdown(
                    choices=["Majority Voting", "Best-of-N", "Beam Search"],
                    label="Select Strategy",
                    value="Majority Voting"
                )
            with gr.Row():
                single_strategy_btn = gr.Button("Run Selected Strategy")
                all_strategies_btn = gr.Button("Run All Strategies")

        with gr.Column(scale=3):
            output_display = gr.Markdown(label="Results")
            with gr.Row():
                performance_plot = gr.Plot(label="Performance Comparison")
                token_plot = gr.Plot(label="Token Count Comparison")
            metrics_display = gr.JSON(label="Detailed Metrics")

    # Set up event handlers
    single_strategy_btn.click(
        fn=run_single_strategy,
        inputs=[prompt_input, strategy_dropdown, num_samples],
        outputs=[output_display, performance_plot, token_plot, metrics_display]
    )
    all_strategies_btn.click(
        fn=run_all_strategies,
        inputs=[prompt_input, num_samples],
        outputs=[output_display, performance_plot, token_plot, metrics_display]
    )

if __name__ == "__main__":
    demo.launch(debug=True)
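
# --- Optional: launch configuration note (illustrative) ---
# When this script runs in Colab or on a remote host, Gradio can expose a temporary
# public URL via share=True. Kept commented out so the launch behaviour above is
# unchanged; use it as an alternative to the launch call in the __main__ block.
# demo.launch(debug=True, share=True)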