import gradio as gr import torch import itertools import pandas as pd import spaces import random from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AutoModel from sklearn.metrics import pairwise_distances from collections import Counter from itertools import chain from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction import math model_name = 'philipp-zettl/t5-small-long-qa' qa_model = AutoModelForSeq2SeqLM.from_pretrained(model_name) model_name = 'philipp-zettl/t5-small-qg' qg_model = AutoModelForSeq2SeqLM.from_pretrained(model_name) tokenizer = AutoTokenizer.from_pretrained('google/flan-t5-small') embedding_model = AutoModel.from_pretrained('sentence-transformers/paraphrase-MiniLM-L6-v2') embedding_tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/paraphrase-MiniLM-L6-v2') # Move only the student model to GPU if available device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') qa_model = qa_model.to(device) qg_model = qg_model.to(device) embedding_model = embedding_model.to(device) max_questions = 1 max_answers = 1 max_elem_value = 100 def ngrams(sequence, n): return [tuple(sequence[i:i+n]) for i in range(len(sequence)-n+1)] def count_ngrams(sequence, max_n): counts = Counter() for n in range(1, max_n + 1): counts.update(ngrams(sequence, n)) return counts def self_bleu(outputs): smoothing_function = SmoothingFunction().method1 scores = [] for i in range(len(outputs)): references = outputs[:i] + outputs[i+1:] # Avoid calculating BLEU score for empty references if references: scores.append(sentence_bleu(references, outputs[i], smoothing_function=smoothing_function)) # If all references are empty, return a default value if not scores: return 0 return sum(scores) / len(scores) def dist_n(outputs, n): all_ngrams = list(chain(*[ngrams(output, n) for output in outputs])) unique_ngrams = set(all_ngrams) return len(unique_ngrams) / len(all_ngrams) if all_ngrams else 0 def perplexity(model, tokenizer, texts): encodings = tokenizer(texts, return_tensors='pt', padding=True, truncation=True) max_length = model.config.n_positions stride = 512 lls = [] for i in range(0, encodings.input_ids.size(1), stride): begin_loc = max(i + stride - max_length, 0) end_loc = i + stride trg_len = end_loc - i input_ids = encodings.input_ids[:, begin_loc:end_loc].to(model.device) target_ids = input_ids.clone() target_ids[:, :-trg_len] = -100 with torch.no_grad(): outputs = model(input_ids, labels=target_ids) log_likelihood = outputs.loss * trg_len lls.append(log_likelihood) ppl = torch.exp(torch.stack(lls).sum() / end_loc) return ppl.item() def embedding_similarity(inputs, outputs): global embedding_model, embedding_tokenizer, device def embed(texts): inputs = embedding_tokenizer(texts, return_tensors='pt', padding=True, truncation=True).to(device) with torch.no_grad(): outputs = embedding_model(**inputs) return outputs.last_hidden_state.mean(dim=1).cpu().numpy() input_embeddings = embed(inputs) output_embeddings = embed(outputs) similarities = pairwise_distances(input_embeddings, output_embeddings, metric='cosine') return sum(similarities) / len(similarities) def js_divergence(p, q): def kl_divergence(p, q): return sum(p[i] * math.log(p[i] / q[i]) for i in range(len(p)) if p[i] != 0 and q[i] != 0) p_norm = [float(i)/sum(p) for i in p] q_norm = [float(i)/sum(q) for i in q] m = [(p_norm[i] + q_norm[i]) / 2 for i in range(len(p_norm))] return (kl_divergence(p_norm, m) + kl_divergence(q_norm, m)) / 2 def evaluate_model(num_beams, num_beam_groups, model, tokenizer, eval_data, max_length=85): generated_outputs = [] for input_text in eval_data: input_ids = tokenizer.encode(input_text, return_tensors="pt").to(device) outputs = model.generate( input_ids, num_beams=num_beams, num_beam_groups=num_beam_groups, diversity_penalty=1.0, max_new_tokens=max_length, ) decoded_text = tokenizer.decode(outputs[0], skip_special_tokens=True) generated_outputs.append(decoded_text.split()) # Self-BLEU for diversity diversity_score = self_bleu(generated_outputs) # Dist-1 and Dist-2 for diversity dist1 = dist_n(generated_outputs, 1) dist2 = dist_n(generated_outputs, 2) # Perplexity for fluency and relevance fluency_score = perplexity(model, tokenizer, [" ".join(output) for output in generated_outputs]) # Embedding similarity for contextual relevance contextual_score = embedding_similarity(eval_data, [" ".join(output) for output in generated_outputs]) # Jensen-Shannon Divergence for distribution similarity generated_ngrams = count_ngrams(list(chain(*generated_outputs)), 4) reference_ngrams = count_ngrams(list(chain(*[tokenizer.tokenize(text) for text in eval_data])), 4) all_ngrams = set(generated_ngrams.keys()).union(set(reference_ngrams.keys())) p = [generated_ngrams[ngram] for ngram in all_ngrams] q = [reference_ngrams[ngram] for ngram in all_ngrams] jsd_score = js_divergence(p, q) return { "diversity_score": diversity_score, "dist1": dist1, "dist2": dist2, "fluency_score": fluency_score, "contextual_score": contextual_score, "jsd_score": jsd_score } def find_best_parameters(eval_data, model, tokenizer, max_length=85): # Parameter ranges parameter_map = { 2: [2], 4: [2], 6: [2], # 6x3 == 4x2 8: [2], # 8x4 == 6x3 == 4x2 9: [3], 10: [2], # 10x5 == 8x4 == 6x3 == 4x2 } # Find the best parameters best_score = -float('inf') best_params = None for num_beams in parameter_map.keys(): for num_beam_groups in parameter_map[num_beams]: if num_beam_groups > num_beams: continue # num_beam_groups should not be greater than num_beams scores = evaluate_model(num_beams, num_beam_groups, model, tokenizer, eval_data, max_length=max_length) # Combine scores to determine the best parameters combined_score = (scores['dist1'] + scores['dist2'] - scores['fluency_score'] + scores['contextual_score'] - scores['jsd_score']).mean() print(f"num_beams={num_beams}, num_beam_groups={num_beam_groups}, avg combined score={combined_score}") if combined_score > best_score: best_score = combined_score best_params = (num_beams, num_beam_groups) print(f"Best parameters: num_beams={best_params[0]}, num_beam_groups={best_params[1]} with combined score={best_score}") return best_params def run_model(inputs, tokenizer, model, num_beams=2, num_beam_groups=2, temperature=0.5, num_return_sequences=1, max_length=85, seed=42069): all_outputs = [] torch.manual_seed(seed) for input_text in inputs: model_inputs = tokenizer([input_text], max_length=512, padding=True, truncation=True) input_ids = torch.tensor(model_inputs['input_ids']).to(device) for sample in input_ids: sample_outputs = [] with torch.no_grad(): sample_output = model.generate( input_ids[:1], max_length=max_length, #temperature=temperature, #do_sample=True, num_return_sequences=num_return_sequences, low_memory=True, #top_p=temperature, #num_beams=max(2, num_return_sequences), use_cache=True, # Contrastive search #penalty_alpha=0.6, #top_k=4, # Multi-nomial sampling #do_sample=True, #num_beams=1, # Beam search #num_beams=5, # Beam search multinomial sampling #num_beams=5, #do_sample=True, # Diverse Beam search decoding num_beams=max(2, num_return_sequences), num_beam_groups=max(2, num_return_sequences), diversity_penalty=temperature, #do_sample=True, ) for i, sample_output in enumerate(sample_output): sample_output = sample_output.unsqueeze(0) sample_output = tokenizer.decode(sample_output[0], skip_special_tokens=True) sample_outputs.append(sample_output) all_outputs.append(sample_outputs) return all_outputs @spaces.GPU def gen(content, temperature_qg=0.5, temperature_qa=0.75, num_return_sequences_qg=1, num_return_sequences_qa=1, max_length=85, seed=42069, optimize_questions=False): inputs = [ f'context: {content}' ] question = run_model( inputs, tokenizer, qg_model, num_beams=num_return_sequences_qg, num_beam_groups=num_return_sequences_qg, temperature=temperature_qg, num_return_sequences=num_return_sequences_qg, max_length=max_length, seed=seed ) if optimize_questions: q_params = find_best_parameters( list(chain.from_iterable(question)), qg_model, tokenizer, max_length=max_length ) question = run_model( inputs, tokenizer, qg_model, num_beams=q_params[0], num_beam_groups=q_params[1], temperature=temperature_qg, num_return_sequences=num_return_sequences_qg, max_length=max_length, seed=seed ) inputs = list(chain.from_iterable([ [f'question: {q} context: {content}' for q in q_set] for q_set in question ])) answer = run_model( inputs, tokenizer, qa_model, num_beams=num_return_sequences_qa, num_beam_groups=num_return_sequences_qa, temperature=temperature_qa, num_return_sequences=num_return_sequences_qa, max_length=max_length, seed=seed ) questions = list(chain.from_iterable(question)) answers = list(chain.from_iterable(answer)) results = [] for idx, ans in enumerate(answers): results.append({'question': questions[idx % num_return_sequences_qg], 'answer': ans}) return results def variable_outputs(k, max_elems=10): global max_elem_value k = int(k) return [gr.Text(visible=True)] * k + [gr.Text(visible=False)] * (max(max_elems, max_elem_value)- k) def set_outputs(content, max_elems=10): c = eval(content) print('received content: ', c) return [gr.Text(value=t, visible=True) for t in c] + [gr.Text(visible=False)] * (max(max_elems, 10) - len(c)) def create_file_download(qnas): with open('qnas.tsv', 'w') as f: for idx, qna in qnas.iterrows(): f.write(qna['Question'] + '\t' + qna['Answer']) if idx < len(qnas) - 1: f.write('\n') return 'qnas.tsv' with gr.Blocks() as demo: with gr.Tab(label='Description'): with gr.Row(equal_height=True): with gr.Column(): gr.Markdown( """ # QA-Generator A combination of fine-tuned flan-T5(-small) models chained into sequence to generate: a) a versatile set of questions b) an accurate set of matching answers according to a given piece of text content.""") with gr.Column(): gr.Markdown( """ The idea is simple: 1. Add your content 2. Select the amount of questions you want to generate 3. (optional) Select the amount of answers you want to generate per goven question 4. Press generate 5. ??? 6. Profit """) with gr.Row(equal_height=True): gr.Markdown(""" If you're satisfied with the generated data set, you can export it as TSV to edit or import it into your favourite tool. """) with gr.Row(equal_height=True): with gr.Accordion(label='Optimization', open=False): gr.Markdown(""" For optimization of the question generation we apply the following combined score: $$\\text{combined} = \\text{dist1} + \\text{dist2} - \\text{fluency} + \\text{contextual} - \\text{jsd}$$ Here's a brief explanation of each component: 1. **dist1 and dist2**: These represent the diversity of the generated outputs. dist1 measures the ratio of unique unigrams to total unigrams, and dist2 measures the ratio of unique bigrams to total bigrams. **Higher values indicate more diverse outputs.** 2. **fluency**: This is the perplexity of the generated outputs, which measures how well the outputs match the language model's expectations. **Lower values indicate better fluency.** 3. **contextual**: This measures the similarity between the input and generated outputs using embedding similarity. **Higher values indicate better contextual relevance.** 4. **jsd**: This is the Jensen-Shannon Divergence between the n-gram distributions of the generated outputs and the reference data. **Lower values indicate greater similarity between distributions.** """, latex_delimiters=[{'display': False, 'left': '$$', 'right': '$$'}]) with gr.Tab(label='QA Generator'): with gr.Row(equal_height=True): with gr.Group("Content"): content = gr.Textbox(label='Content', lines=15, placeholder='Enter text here', max_lines=10_000) with gr.Group("Settings"): temperature_qg = gr.Slider(label='Diversity Penalty QG', value=0.2, minimum=0, maximum=1, step=0.01) temperature_qa = gr.Slider(label='Diversity Penalty QA', value=0.5, minimum=0, maximum=1, step=0.01) max_length = gr.Number(label='Max Length', value=85, minimum=1, step=1, maximum=512) num_return_sequences_qg = gr.Number(label='Number Questions', value=max_questions, minimum=1, step=1, maximum=max(max_questions, max_elem_value)) num_return_sequences_qa = gr.Number(label="Number Answers", value=max_answers, minimum=1, step=1, maximum=max(max_questions, max_elem_value)) seed = gr.Number(label="seed", value=42069) optimize_questions = gr.Checkbox(label="Optimize questions?", value=False) with gr.Row(): gen_btn = gr.Button("Generate") @gr.render( inputs=[ content, temperature_qg, temperature_qa, num_return_sequences_qg, num_return_sequences_qa, max_length, seed, optimize_questions ], triggers=[gen_btn.click] ) def render_results(content, temperature_qg, temperature_qa, num_return_sequences_qg, num_return_sequences_qa, max_length, seed, optimize_questions): if not content.strip(): raise gr.Error('Please enter some content to generate questions and answers.') qnas = gen( content, temperature_qg, temperature_qa, num_return_sequences_qg, num_return_sequences_qa, max_length, seed, optimize_questions ) df = gr.Dataframe( value=[u.values() for u in qnas], headers=['Question', 'Answer'], col_count=2, wrap=True ) pd_df = pd.DataFrame([u.values() for u in qnas], columns=['Question', 'Answer']) download = gr.DownloadButton(label='Download (without headers)', value=create_file_download(pd_df)) content.change(lambda x: x.strip(), content) demo.queue() demo.launch()