# Spaces: Sleeping (hosting-page status banner captured with the file; not program code)
import gradio as gr | |
from gradio.components import Textbox, Checkbox | |
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, T5ForConditionalGeneration | |
from peft import PeftModel | |
import torch | |
import datasets | |
from sentence_transformers import CrossEncoder | |
import math | |
import re | |
from nltk import sent_tokenize, word_tokenize | |
import nltk | |
# Tokenizer data needed by nltk.sent_tokenize / nltk.word_tokenize.
# Newer NLTK releases load the "punkt_tab" resource instead of the pickled
# "punkt" models, so both are fetched to keep the app working across versions.
nltk.download('punkt')
nltk.download('punkt_tab')

# Load the cross-encoder that scores (question, chunk) pairs for re-ranking
# top_k = 10
cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

# Load the base model, the fine-tuned (PEFT adapter) model and the tokenizer
model_name = "google/flan-t5-large"
peft_name = "legacy107/flan-t5-large-ia3-newsqa-100"
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Untouched copy of the base model, used for the optional
# "pretrained model" comparison in generate_answer().
pretrained_model = T5ForConditionalGeneration.from_pretrained(model_name)
# Second copy of the base model, wrapped with the fine-tuned adapter weights.
model = T5ForConditionalGeneration.from_pretrained(model_name)
model = PeftModel.from_pretrained(model, peft_name)

# Token budget for the encoder input and for the generated answer
max_length = 512
max_target_length = 200

# Load the dataset and keep a fixed set of test rows as demo examples
dataset = datasets.load_dataset("legacy107/newsqa", split="test")
# dataset = dataset.shuffle()
dataset = dataset.select([3, 9, 14, 24, 405, 51, 426, 471, 73, 34, 94, 0])
# Context chunking | |
def chunk_splitter(context, chunk_size=100, overlap=0.10): | |
overlap_size = chunk_size * overlap | |
sentences = nltk.sent_tokenize(context) | |
chunks = [] | |
text = sentences[0] | |
if len(sentences) == 1: | |
chunks.append(text) | |
i = 1 | |
while i < len(sentences): | |
text += " " + sentences[i] | |
i += 1 | |
while i < len(sentences) and len(nltk.word_tokenize(f"{text} {sentences[i]}")) <= chunk_size: | |
text += " " + sentences[i] | |
i += 1 | |
text = text.replace('\"','"').replace("\'","'").replace('\n\n\n'," ").replace('\n\n'," ").replace('\n'," ") | |
chunks.append(text) | |
if (i >= len(sentences)): | |
break | |
j = i - 1 | |
text = sentences[j] | |
while j >= 0 and len(nltk.word_tokenize(f"{sentences[j]} {text}")) <= overlap_size: | |
text = sentences[j] + " " + text | |
j -= 1 | |
return chunks | |
def retrieve_context(query, contexts):
    """Re-rank *contexts* against *query* and join them into one string.

    Each chunk is scored together with the query by the module-level
    ``cross_encoder``; the chunks are then concatenated with single spaces,
    highest score first. Chunks with equal scores keep their input order.

    Args:
        query: The question text.
        contexts: List of context chunk strings.

    Returns:
        All chunks joined in descending score order, with newlines replaced
        by spaces. An empty ``contexts`` yields an empty string.
    """
    # Nothing to score: skip the cross-encoder call entirely.
    if not contexts:
        return ""

    scores = cross_encoder.predict(
        [[query, passage] for passage in contexts],
        show_progress_bar=False,
    )
    # sorted() is stable, so ties preserve the original chunk order.
    ranked = sorted(zip(scores, contexts), key=lambda pair: pair[0], reverse=True)
    return " ".join(passage for _, passage in ranked).replace("\n", " ")
# Define your function to generate answers | |
def generate_answer(question, context, ground, do_pretrained):
    """Answer *question* from *context* with the fine-tuned model.

    The context is chunked, re-ranked against the question, and packed into
    a ``question: ... context: ...`` prompt that is truncated/padded to
    ``max_length`` tokens.

    Args:
        question: The question text.
        context: The full context text.
        ground: Ground-truth answer; accepted for the UI but not used here.
        do_pretrained: When truthy, also generate with ``pretrained_model``.

    Returns:
        A tuple ``(generated_answer, decoded_context, pretrained_answer)``.
        ``decoded_context`` is the prompt decoded back from its token ids
        with the prompt prefix length sliced off; ``pretrained_answer`` is
        ``""`` when ``do_pretrained`` is falsy.
    """
    ranked_context = retrieve_context(question, chunk_splitter(context))

    # Prompt = fixed prefix (question) + re-ranked context
    prompt_prefix = f"question: {question} context: "
    token_ids = tokenizer(
        prompt_prefix + ranked_context,
        return_tensors="pt",
        padding="max_length",
        truncation=True,
        max_length=max_length,
    ).input_ids

    def _decode(ids):
        # Turn one sequence of token ids back into plain text
        return tokenizer.decode(ids, skip_special_tokens=True)

    def _answer_with(seq2seq):
        # Run generation without gradient tracking and decode the first result
        with torch.no_grad():
            output_ids = seq2seq.generate(
                input_ids=token_ids, max_new_tokens=max_target_length
            )
        return _decode(output_ids[0])

    # What survived truncation, minus the prompt prefix
    decoded_context = _decode(token_ids[0])[len(prompt_prefix):]

    generated_answer = _answer_with(model)
    pretrained_answer = _answer_with(pretrained_model) if do_pretrained else ""

    return generated_answer, decoded_context, pretrained_answer
# Define a function to list examples from the dataset | |
def list_examples():
    """Build the example rows from the module-level ``dataset``.

    Returns:
        A list with one ``[question, context, answers, True]`` row per
        dataset entry, where ``answers`` is the entry's answers joined
        with ``" | "``.
    """
    return [
        [row["question"], row["context"], " | ".join(row["answers"]), True]
        for row in dataset
    ]
# Create a Gradio interface | |
# Input widgets, in the positional order generate_answer() receives them
demo_inputs = [
    Textbox(label="Question"),
    Textbox(label="Context"),
    Textbox(label="Ground truth"),
    Checkbox(label="Include pretrained model's result"),
]

# Output widgets, in the order generate_answer() returns its values
demo_outputs = [
    Textbox(label="Generated Answer"),
    Textbox(label="Retrieved Context"),
    Textbox(label="Pretrained Model's Answer"),
]

# Assemble the Gradio interface with the dataset-backed examples
iface = gr.Interface(
    fn=generate_answer,
    inputs=demo_inputs,
    outputs=demo_outputs,
    examples=list_examples(),
    examples_per_page=4,
)

# Start serving the interface
iface.launch()