import gradio as gr from gradio.components import Textbox, Checkbox from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, T5ForConditionalGeneration from peft import PeftModel import torch import datasets from sentence_transformers import CrossEncoder import math import re from nltk import sent_tokenize, word_tokenize import nltk nltk.download('punkt') # Load bi encoder # top_k = 10 cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2') # Load your fine-tuned model and tokenizer model_name = "google/flan-t5-large" peft_name = "legacy107/flan-t5-large-ia3-newsqa-100" tokenizer = AutoTokenizer.from_pretrained(model_name) pretrained_model = T5ForConditionalGeneration.from_pretrained(model_name) model = T5ForConditionalGeneration.from_pretrained(model_name) model = PeftModel.from_pretrained(model, peft_name) max_length = 512 max_target_length = 200 # Load your dataset dataset = datasets.load_dataset("legacy107/newsqa", split="test") # dataset = dataset.shuffle() dataset = dataset.select([3, 9, 14, 24, 405, 51, 426, 471, 73, 34, 94, 0]) # Context chunking def chunk_splitter(context, chunk_size=100, overlap=0.10): overlap_size = chunk_size * overlap sentences = nltk.sent_tokenize(context) chunks = [] text = sentences[0] if len(sentences) == 1: chunks.append(text) i = 1 while i < len(sentences): text += " " + sentences[i] i += 1 while i < len(sentences) and len(nltk.word_tokenize(f"{text} {sentences[i]}")) <= chunk_size: text += " " + sentences[i] i += 1 text = text.replace('\"','"').replace("\'","'").replace('\n\n\n'," ").replace('\n\n'," ").replace('\n'," ") chunks.append(text) if (i >= len(sentences)): break j = i - 1 text = sentences[j] while j >= 0 and len(nltk.word_tokenize(f"{sentences[j]} {text}")) <= overlap_size: text = sentences[j] + " " + text j -= 1 return chunks def retrieve_context(query, contexts): hits = [{"corpus_id": i} for i in range(len(contexts))] cross_inp = [[query, contexts[hit["corpus_id"]]] for hit in hits] cross_scores = cross_encoder.predict(cross_inp, show_progress_bar=False) for idx in range(len(cross_scores)): hits[idx]["cross-score"] = cross_scores[idx] hits = sorted(hits, key=lambda x: x["cross-score"], reverse=True) return " ".join( [contexts[hit["corpus_id"]] for hit in hits] ).replace("\n", " ") # Define your function to generate answers def generate_answer(question, context, ground, do_pretrained): contexts = chunk_splitter(context) context = retrieve_context(question, contexts) # Combine question and context input_text = f"question: {question} context: {context}" # Tokenize the input text input_ids = tokenizer( input_text, return_tensors="pt", padding="max_length", truncation=True, max_length=max_length, ).input_ids # Decode the context back decoded_context = tokenizer.decode(input_ids[0], skip_special_tokens=True)[len(f"question: {question} context: "):] # Generate the answer with torch.no_grad(): generated_ids = model.generate(input_ids=input_ids, max_new_tokens=max_target_length) # Decode and return the generated answer generated_answer = tokenizer.decode(generated_ids[0], skip_special_tokens=True) # Get pretrained model's answer pretrained_answer = "" if do_pretrained: with torch.no_grad(): pretrained_generated_ids = pretrained_model.generate(input_ids=input_ids, max_new_tokens=max_target_length) pretrained_answer = tokenizer.decode(pretrained_generated_ids[0], skip_special_tokens=True) return generated_answer, decoded_context, pretrained_answer # Define a function to list examples from the dataset def list_examples(): examples = [] for example in dataset: context = example["context"] question = example["question"] answer = " | ".join(example["answers"]) examples.append([question, context, answer, True]) return examples # Create a Gradio interface iface = gr.Interface( fn=generate_answer, inputs=[ Textbox(label="Question"), Textbox(label="Context"), Textbox(label="Ground truth"), Checkbox(label="Include pretrained model's result") ], outputs=[ Textbox(label="Generated Answer"), Textbox(label="Retrieved Context"), Textbox(label="Pretrained Model's Answer") ], examples=list_examples(), examples_per_page=4 ) # Launch the Gradio interface iface.launch()