import gradio as gr import torch import itertools import pandas as pd import spaces import random from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AutoModel from sklearn.metrics import pairwise_distances from collections import Counter from itertools import chain from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction import math model_name = 'philipp-zettl/t5-small-long-qa' qa_model = AutoModelForSeq2SeqLM.from_pretrained(model_name) model_name = 'philipp-zettl/t5-small-qg' qg_model = AutoModelForSeq2SeqLM.from_pretrained(model_name) tokenizer = AutoTokenizer.from_pretrained('google/flan-t5-small') embedding_model = AutoModel.from_pretrained('sentence-transformers/paraphrase-MiniLM-L6-v2') embedding_tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/paraphrase-MiniLM-L6-v2') # Move only the student model to GPU if available device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') qa_model = qa_model.to(device) qg_model = qg_model.to(device) embedding_model = embedding_model.to(device) max_questions = 1 max_answers = 1 max_elem_value = 100 def ngrams(sequence, n): return [tuple(sequence[i:i+n]) for i in range(len(sequence)-n+1)] def count_ngrams(sequence, max_n): counts = Counter() for n in range(1, max_n + 1): counts.update(ngrams(sequence, n)) return counts def self_bleu(outputs): smoothing_function = SmoothingFunction().method1 scores = [] for i in range(len(outputs)): references = outputs[:i] + outputs[i+1:] # Avoid calculating BLEU score for empty references if references: scores.append(sentence_bleu(references, outputs[i], smoothing_function=smoothing_function)) # If all references are empty, return a default value if not scores: return 0 return sum(scores) / len(scores) def dist_n(outputs, n): all_ngrams = list(chain(*[ngrams(output, n) for output in outputs])) unique_ngrams = set(all_ngrams) return len(unique_ngrams) / len(all_ngrams) if all_ngrams else 0 def perplexity(model, tokenizer, texts): encodings = tokenizer(texts, return_tensors='pt', padding=True, truncation=True) max_length = model.config.n_positions stride = 512 lls = [] for i in range(0, encodings.input_ids.size(1), stride): begin_loc = max(i + stride - max_length, 0) end_loc = i + stride trg_len = end_loc - i input_ids = encodings.input_ids[:, begin_loc:end_loc].to(model.device) target_ids = input_ids.clone() target_ids[:, :-trg_len] = -100 with torch.no_grad(): outputs = model(input_ids, labels=target_ids) log_likelihood = outputs.loss * trg_len lls.append(log_likelihood) ppl = torch.exp(torch.stack(lls).sum() / end_loc) return ppl.item() def embedding_similarity(inputs, outputs): global embedding_model, embedding_tokenizer, device def embed(texts): inputs = embedding_tokenizer(texts, return_tensors='pt', padding=True, truncation=True).to(device) with torch.no_grad(): outputs = embedding_model(**inputs) return outputs.last_hidden_state.mean(dim=1).cpu().numpy() input_embeddings = embed(inputs) output_embeddings = embed(outputs) similarities = pairwise_distances(input_embeddings, output_embeddings, metric='cosine') return sum(similarities) / len(similarities) def js_divergence(p, q): def kl_divergence(p, q): return sum(p[i] * math.log(p[i] / q[i]) for i in range(len(p)) if p[i] != 0 and q[i] != 0) p_norm = [float(i)/sum(p) for i in p] q_norm = [float(i)/sum(q) for i in q] m = [(p_norm[i] + q_norm[i]) / 2 for i in range(len(p_norm))] return (kl_divergence(p_norm, m) + kl_divergence(q_norm, m)) / 2 def evaluate_model(num_beams, num_beam_groups, model, tokenizer, eval_data, max_length=85): generated_outputs = [] for input_text in eval_data: input_ids = tokenizer.encode(input_text, return_tensors="pt").to(device) outputs = model.generate( input_ids, num_beams=num_beams, num_beam_groups=num_beam_groups, diversity_penalty=1.0, max_new_tokens=max_length, ) decoded_text = tokenizer.decode(outputs[0], skip_special_tokens=True) generated_outputs.append(decoded_text.split()) # Self-BLEU for diversity diversity_score = self_bleu(generated_outputs) # Dist-1 and Dist-2 for diversity dist1 = dist_n(generated_outputs, 1) dist2 = dist_n(generated_outputs, 2) # Perplexity for fluency and relevance fluency_score = perplexity(model, tokenizer, [" ".join(output) for output in generated_outputs]) # Embedding similarity for contextual relevance contextual_score = embedding_similarity(eval_data, [" ".join(output) for output in generated_outputs]) # Jensen-Shannon Divergence for distribution similarity generated_ngrams = count_ngrams(list(chain(*generated_outputs)), 4) reference_ngrams = count_ngrams(list(chain(*[tokenizer.tokenize(text) for text in eval_data])), 4) all_ngrams = set(generated_ngrams.keys()).union(set(reference_ngrams.keys())) p = [generated_ngrams[ngram] for ngram in all_ngrams] q = [reference_ngrams[ngram] for ngram in all_ngrams] jsd_score = js_divergence(p, q) return { "diversity_score": diversity_score, "dist1": dist1, "dist2": dist2, "fluency_score": fluency_score, "contextual_score": contextual_score, "jsd_score": jsd_score } def find_best_parameters(eval_data, model, tokenizer, max_length=85): # Parameter ranges parameter_map = { 2: [2], 4: [2], 6: [2], # 6x3 == 4x2 8: [2], # 8x4 == 6x3 == 4x2 10: [2], # 10x5 == 8x4 == 6x3 == 4x2 } # Find the best parameters best_score = -float('inf') best_params = None for num_beams in parameter_map.keys(): for num_beam_groups in parameter_map[num_beams]: if num_beam_groups > num_beams: continue # num_beam_groups should not be greater than num_beams scores = evaluate_model(num_beams, num_beam_groups, model, tokenizer, eval_data, max_length=max_length) # Combine scores to determine the best parameters combined_score = (scores['dist1'] + scores['dist2'] - scores['fluency_score'] + scores['contextual_score'] - scores['jsd_score']).mean() print(f"num_beams={num_beams}, num_beam_groups={num_beam_groups}, avg combined score={combined_score}") if combined_score > best_score: best_score = combined_score best_params = (num_beams, num_beam_groups) print(f"Best parameters: num_beams={best_params[0]}, num_beam_groups={best_params[1]} with combined score={best_score}") return best_params def run_model(inputs, tokenizer, model, num_beams=2, num_beam_groups=2, temperature=0.5, num_return_sequences=1, max_length=85, seed=42069): all_outputs = [] torch.manual_seed(seed) for input_text in inputs: model_inputs = tokenizer([input_text], max_length=512, padding=True, truncation=True) input_ids = torch.tensor(model_inputs['input_ids']).to(device) for sample in input_ids: sample_outputs = [] with torch.no_grad(): sample_output = model.generate( input_ids[:1], max_length=max_length, #temperature=temperature, #do_sample=True, num_return_sequences=num_return_sequences, low_memory=True, #top_p=temperature, #num_beams=max(2, num_return_sequences), use_cache=True, # Contrastive search #penalty_alpha=0.6, #top_k=4, # Multi-nomial sampling #do_sample=True, #num_beams=1, # Beam search #num_beams=5, # Beam search multinomial sampling #num_beams=5, #do_sample=True, # Diverse Beam search decoding num_beams=max(2, num_return_sequences), num_beam_groups=max(2, num_return_sequences), diversity_penalty=temperature, #do_sample=True, ) for i, sample_output in enumerate(sample_output): sample_output = sample_output.unsqueeze(0) sample_output = tokenizer.decode(sample_output[0], skip_special_tokens=True) sample_outputs.append(sample_output) all_outputs.append(sample_outputs) return all_outputs @spaces.GPU def gen(content, temperature_qg=0.5, temperature_qa=0.75, num_return_sequences_qg=1, num_return_sequences_qa=1, max_length=85, seed=42069, optimize_questions=False): inputs = [ f'context: {content}' ] question = run_model( inputs, tokenizer, qg_model, num_beams=num_return_sequences_qg, num_beam_groups=num_return_sequences_qg, temperature=temperature_qg, num_return_sequences=num_return_sequences_qg, max_length=max_length, seed=seed ) if optimize_questions: q_params = find_best_parameters(list(chain.from_iterable(question)), qg_model, tokenizer, max_length=max_length) question = run_model( inputs, tokenizer, qg_model, num_beams=q_params[0], num_beam_groups=q_params[1], temperature=temperature_qg, num_return_sequences=num_return_sequences_qg, max_length=max_length, seed=seed ) inputs = list(chain.from_iterable([ [f'question: {q} context: {content}' for q in q_set] for q_set in question ])) answer = run_model( inputs, tokenizer, qa_model, num_beams=num_return_sequences_qa, num_beam_groups=num_return_sequences_qa, temperature=temperature_qa, num_return_sequences=num_return_sequences_qa, max_length=max_length, seed=seed ) questions = list(chain.from_iterable(question)) answers = list(chain.from_iterable(answer)) results = [] for idx, ans in enumerate(answers): results.append({'question': questions[idx % num_return_sequences_qg], 'answer': ans}) return results def variable_outputs(k, max_elems=10): global max_elem_value k = int(k) return [gr.Text(visible=True)] * k + [gr.Text(visible=False)] * (max(max_elems, max_elem_value)- k) def set_outputs(content, max_elems=10): c = eval(content) print('received content: ', c) return [gr.Text(value=t, visible=True) for t in c] + [gr.Text(visible=False)] * (max(max_elems, 10) - len(c)) def create_file_download(qnas): with open('qnas.tsv', 'w') as f: for idx, qna in qnas.iterrows(): f.write(qna['Question'] + '\t' + qna['Answer']) if idx < len(qnas) - 1: f.write('\n') return 'qnas.tsv' with gr.Blocks(css='.hidden_input {display: none;}') as demo: with gr.Row(equal_height=True): gr.Markdown( """ # QA-Generator A combination of fine-tuned flan-T5(-small) models chained into sequence to generate: a) a versatile set of questions b) an accurate set of matching answers according to a given piece of text content. The idea is simple: 1. Add your content 2. Select the amount of questions you want to generate 2.2 (optional) Select the amount of answers you want to generate per goven question 3. Press generate 4. ??? 5. Profit If you're satisfied with the generated data set, you can export it as TSV to edit or import it into your favourite tool. """) with gr.Row(equal_height=True): with gr.Group("Content"): content = gr.Textbox(label='Content', lines=15, placeholder='Enter text here', max_lines=10_000) with gr.Group("Settings"): temperature_qg = gr.Slider(label='Temperature QG', value=0.2, minimum=0, maximum=1, step=0.01) temperature_qa = gr.Slider(label='Temperature QA', value=0.5, minimum=0, maximum=1, step=0.01) max_length = gr.Number(label='Max Length', value=85, minimum=1, step=1, maximum=512) num_return_sequences_qg = gr.Number(label='Number Questions', value=max_questions, minimum=1, step=1, maximum=max(max_questions, max_elem_value)) num_return_sequences_qa = gr.Number(label="Number Answers", value=max_answers, minimum=1, step=1, maximum=max(max_questions, max_elem_value)) seed = gr.Number(label="seed", value=42069) optimize_questions = gr.Checkbox(label="Optimize questions?", value=False) with gr.Row(): gen_btn = gr.Button("Generate") @gr.render( inputs=[ content, temperature_qg, temperature_qa, num_return_sequences_qg, num_return_sequences_qa, max_length, seed, optimize_questions ], triggers=[gen_btn.click] ) def render_results(content, temperature_qg, temperature_qa, num_return_sequences_qg, num_return_sequences_qa, max_length, seed, optimize_questions): qnas = gen( content, temperature_qg, temperature_qa, num_return_sequences_qg, num_return_sequences_qa, max_length, seed, optimize_questions ) df = gr.Dataframe( value=[u.values() for u in qnas], headers=['Question', 'Answer'], col_count=2, wrap=True ) pd_df = pd.DataFrame([u.values() for u in qnas], columns=['Question', 'Answer']) download = gr.DownloadButton(label='Download (without headers)', value=create_file_download(pd_df)) demo.queue() demo.launch()