# Source: Hugging Face Space file app.py, uploaded by paascorb.
# Commit message: "Update app.py"
# Commit: 8283ea8
import gradio as gr
from pathlib import Path
import os
from transformers import AutoTokenizer, AutoModel, AutoModelForQuestionAnswering, pipeline
from transformers import MarianMTModel, MarianTokenizer
from nltk.tokenize import sent_tokenize
from nltk.tokenize import LineTokenizer
import math
import torch
import nltk
import numpy as np
import time
import hashlib
from tqdm import tqdm
# Torch device string shared by every model below: the first CUDA GPU when
# PyTorch reports one is available, otherwise the CPU.
device = "cuda:0" if torch.cuda.is_available() else "cpu"
import textract
from scipy.special import softmax
import pandas as pd
from datetime import datetime
# Punkt sentence-tokenizer data (used by nltk's sent_tokenize).
nltk.download('punkt')
docs = None

# ---------------------------------------------------------------------------
# Model definitions
# ---------------------------------------------------------------------------

# Translation: Spanish -> English
mname = "Helsinki-NLP/opus-mt-es-en"
tokenizer_es_en = MarianTokenizer.from_pretrained(mname)
model_es_en = MarianMTModel.from_pretrained(mname)
model_es_en = model_es_en.to(device)

# Translation: English -> Spanish
mname = "Helsinki-NLP/opus-mt-en-es"
tokenizer_en_es = MarianTokenizer.from_pretrained(mname)
model_en_es = MarianMTModel.from_pretrained(mname)
model_en_es = model_en_es.to(device)

# Splits a text into its lines before translation.
lt = LineTokenizer()

# Question answering: a sentence encoder for passage retrieval ...
tokenizer = AutoTokenizer.from_pretrained(
    "sentence-transformers/multi-qa-mpnet-base-dot-v1")
model = AutoModel.from_pretrained(
    "sentence-transformers/multi-qa-mpnet-base-dot-v1").to(device).eval()

# ... and an extractive reader that picks the answer inside a passage.
tokenizer_ans = AutoTokenizer.from_pretrained("deepset/roberta-large-squad2")
model_ans = AutoModelForQuestionAnswering.from_pretrained(
    "deepset/roberta-large-squad2").to(device).eval()

# Only pass an explicit device index to the pipeline when running on the GPU;
# on CPU the pipeline is built with its default device argument.
_pipe_device_kwargs = {"device": 0} if device == 'cuda:0' else {}
pipe = pipeline("question-answering", model_ans, tokenizer=tokenizer_ans,
                **_pipe_device_kwargs)
def validate_dataset(dataset):
    """Return a status label for the document table and reset the doc cache.

    The module-level ``docs`` is reset to ``None`` on every call, since the
    table has just been modified.

    Args:
        dataset: A pandas DataFrame whose first column holds file paths, or
            ``None``.

    Returns:
        "✨Listo✨" when the first cell of the last row is not the empty
        string, otherwise "⚠️Esperando documentos...". A missing or empty
        table counts as "still waiting" instead of raising IndexError.
    """
    global docs
    docs = None  # clear it out if dataset is modified

    # An empty table has no last row to inspect; treat it as not ready.
    if dataset is None or dataset.empty:
        return "⚠️Esperando documentos..."

    docs_ready = dataset.iloc[-1, 0] != ""
    if docs_ready:
        return "✨Listo✨"
    return "⚠️Esperando documentos..."
def traducir_parrafos(parrafos, tokenizer, model, tam_bloque=8):
    """Translate a list of paragraphs, one output string per paragraph.

    Each paragraph is split into sentences with ``sent_tokenize``; the
    sentences are fed to ``model.generate`` in batches of ``tam_bloque``,
    decoded with ``tokenizer`` and joined back together with single spaces.

    Args:
        parrafos: Iterable of paragraph strings.
        tokenizer: Tokenizer used both to encode the batches and to decode
            the generated ids.
        model: Generation model matching ``tokenizer``.
        tam_bloque: Number of sentences per batch.

    Returns:
        A list with one translated string per input paragraph.
    """
    resultado = []
    for texto in parrafos:
        oraciones = sent_tokenize(texto)
        total_bloques = math.ceil(len(oraciones) / tam_bloque)

        generados = []
        for inicio in range(0, total_bloques * tam_bloque, tam_bloque):
            lote = oraciones[inicio:inicio + tam_bloque]
            entradas = tokenizer(
                lote,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=500,
            ).to(device)
            with torch.no_grad():
                salida = model.generate(**entradas)
            # Extending with the generated batch adds one id sequence per
            # sentence.
            generados.extend(salida)

        decodificados = [
            tokenizer.decode(ids, skip_special_tokens=True) for ids in generados
        ]
        resultado.append(" ".join(decodificados))
    return resultado
def traducir_es_en(texto):
    """Translate ``texto`` from Spanish to English.

    The text is split into lines with the module-level ``LineTokenizer``,
    each line is translated with the es->en Marian model, and the translated
    lines are joined with newline characters.
    """
    lineas = lt.tokenize(texto)
    lineas_en = traducir_parrafos(lineas, tokenizer_es_en, model_es_en)
    return "\n".join(lineas_en)
def traducir_en_es(texto):
    """Translate ``texto`` from English to Spanish.

    The text is split into lines with the module-level ``LineTokenizer``,
    each line is translated with the en->es Marian model, and the translated
    lines are joined with newline characters.
    """
    lineas = lt.tokenize(texto)
    lineas_es = traducir_parrafos(lineas, tokenizer_en_es, model_en_es)
    return "\n".join(lineas_es)
def request_pathname(files):
    """Build ``[path, file name]`` rows for a list of uploaded files.

    Args:
        files: Iterable of objects exposing a ``name`` attribute that holds
            a '/'-separated path, or ``None``.

    Returns:
        ``[[]]`` when ``files`` is ``None``; otherwise one two-item list per
        file: the full path and the text after its last '/'.
    """
    if files is None:
        return [[]]

    rows = []
    for uploaded in files:
        # Everything after the final '/' is the bare file name.
        rows.append([uploaded.name, uploaded.name.rsplit('/', 1)[-1]])
    return rows
def cls_pooling(model_output):
    """Return index 0 along the second axis of ``last_hidden_state``.

    Args:
        model_output: Object exposing a ``last_hidden_state`` attribute that
            supports ``[:, 0]`` indexing.
    """
    hidden_states = model_output.last_hidden_state
    first_position = hidden_states[:, 0]
    return first_position
def encode_query(query):
    """Embed a query with the module-level retrieval encoder.

    The query is tokenized (with truncation), run through ``model`` without
    gradient tracking, pooled with ``cls_pooling`` and moved to the CPU.

    Args:
        query: Text to embed.

    Returns:
        The pooled embedding tensor, on the CPU.
    """
    inputs = tokenizer(query, truncation=True, return_tensors='pt').to(device)
    with torch.no_grad():
        outputs = model(**inputs, return_dict=True)
        pooled = cls_pooling(outputs)
    return pooled.cpu()
def _split_into_spans(words, maxlen, stride):
    """Group a word list into overlapping passages of about ``maxlen`` words."""
    if len(words) < maxlen:
        # Short document: the whole text is a single passage.
        return [" ".join(words)]

    spans = []
    num_iters = int(len(words) / maxlen) + 1
    for i in range(num_iters):
        if i == 0:
            # The first window also looks ``stride`` words ahead.
            window = words[:maxlen + stride]
        else:
            previous = words[(i - 1) * maxlen:i * maxlen]
            # Prepend the last ``stride`` words of the previous window.
            # max(...) keeps stride=0 meaning "no overlap"; a plain
            # ``[-0:]`` slice would return the whole previous window.
            overlap = previous[max(len(previous) - stride, 0):]
            window = overlap + words[i * maxlen:(i + 1) * maxlen]
        spans.append(" ".join(window))
    return spans


def encode_docs(docs, maxlen=64, stride=32):
    """Split a document into passages, embed them and cache the result.

    The text is split on single spaces and grouped into overlapping windows
    (see ``_split_into_spans``). Each window is tokenized, encoded with the
    module-level ``model`` and pooled with ``cls_pooling``. The embeddings,
    passages and file names are saved in the working directory as
    ``emb_<name>.npy``, ``spans_<name>.npy`` and ``file_<name>.npy``.

    Args:
        docs: ``(name, text)`` pair; ``name`` is used for the cache file names.
        maxlen: Window size in words.
        stride: Number of overlapping words between neighbouring windows.

    Returns:
        ``(embeddings, spans, file_names)``: the float32 embedding array, the
        list of passage strings and a list repeating ``name`` once per passage.
    """
    name, text = docs
    # Texts shorter than ``maxlen`` used to hit an undefined ``temp_text``
    # (NameError); the helper returns them as one passage.
    spans = _split_into_spans(text.split(" "), maxlen, stride)
    file_names = [name] * len(spans)

    embeddings = []
    with torch.no_grad():
        for span in tqdm(spans):
            # Tokenize one passage at a time so that only a single encoded
            # input lives on the device at once.
            encoded = tokenizer(span, return_tensors='pt', truncation=True).to(device)
            model_output = model(**encoded, return_dict=True)
            embeddings.append(cls_pooling(model_output))
    embeddings = np.float32(torch.stack(embeddings).transpose(0, 1).cpu())

    # Saved as index -> value dicts, the layout the cache loader in
    # ``predict`` reads back with ``.item()``.
    np.save("emb_{}.npy".format(name), dict(enumerate(embeddings)))
    np.save("spans_{}.npy".format(name), dict(enumerate(spans)))
    np.save("file_{}.npy".format(name), dict(enumerate(file_names)))

    return embeddings, spans, file_names
def _translate_result_row(row):
    """Translate one result row to Spanish as (answer, context, probabilities)."""
    # NOTE(review): the probabilities string is also sent through the
    # translation model, as the original code did - confirm this is intended.
    return (traducir_en_es(row.Respuesta),
            traducir_en_es(row.Contexto),
            traducir_en_es(row.Probabilidades))


def _load_or_build_doc_index(data, name_to_save):
    """Return (embeddings, passages, file names), reusing on-disk caches."""
    if name_to_save + ".txt" in os.listdir():
        # The document was already processed: load its cached embeddings.
        # allow_pickle is required because the caches are pickled dicts;
        # they are files this app wrote itself via encode_docs.
        doc_emb = np.load('emb_{}.npy'.format(name_to_save), allow_pickle=True).item()
        doc_text = np.load('spans_{}.npy'.format(name_to_save), allow_pickle=True).item()
        file_names_dicto = np.load('file_{}.npy'.format(name_to_save), allow_pickle=True).item()
        doc_emb = np.array(list(doc_emb.values()))
        doc_emb = doc_emb.reshape(-1, doc_emb.shape[-1])
        doc_text = list(doc_text.values())
        file_names = list(file_names_dicto.values())
    else:
        text = textract.process("{}".format(data.name)).decode('utf8')
        text = text.replace("\r", " ")
        text = text.replace("\n", " ")
        text = text.replace(" . ", " ")
        text = traducir_es_en(text)
        doc_emb, doc_text, file_names = encode_docs(
            (name_to_save, text), maxlen=64, stride=32)
        doc_emb = doc_emb.reshape(-1, doc_emb.shape[-1])
        # The .txt is written last: its presence is what marks the
        # embedding caches as available on the next call.
        with open("{}.txt".format(name_to_save), "w", encoding="utf-8") as f:
            f.write(text)
    return doc_emb, doc_text, file_names


def predict(query, data):
    """Answer a Spanish question about an uploaded document.

    The question and the document are translated to English. The document is
    split into passages and embedded (or loaded from cache), the passages are
    ranked against the question by dot product, and the extractive QA
    pipeline is run on the top ``k`` passages. The best-ranked result is
    translated back to Spanish. Results are cached in a CSV named after the
    SHA-256 of the (question, document) pair, and each fresh computation is
    appended to ``HISTORY.txt``.

    Args:
        query: The question, in Spanish.
        data: Uploaded file object exposing the file path as ``data.name``.

    Returns:
        Tuple ``(answer, context, probabilities)`` of Spanish strings for
        the top-ranked passage.

    Raises:
        ValueError: If no document was provided.
    """
    if data is None:
        raise ValueError("No se ha adjuntado ningún documento.")

    query = traducir_es_en(query)
    # Drops the extension and the last 8 characters of the file stem -
    # presumably the random suffix added to uploaded temp files; TODO confirm.
    name_to_save = data.name.split("/")[-1].split(".")[0][:-8]
    k = 2
    st = str([query, name_to_save])
    st_hashed = hashlib.sha256(st.encode()).hexdigest()
    hist = st + " " + st_hashed
    current_time = datetime.now().strftime("%H:%M:%S")

    # The cache key must be stable across restarts. The built-in hash() of a
    # str is salted per process, so the SHA-256 digest is used instead.
    cache_path = "{}.csv".format(st_hashed)

    # If this question was already asked for this document, reuse the answer.
    try:
        # Read everything as text so numeric-looking or empty answers come
        # back as the strings that were written.
        cached = pd.read_csv(cache_path, dtype=str, keep_default_na=False)
    except (OSError, ValueError) as e:  # missing, unreadable or malformed cache
        print(e)
        print(st)
        cached = None
    if (cached is not None and not cached.empty
            and {"Respuesta", "Contexto", "Probabilidades"} <= set(cached.columns)):
        return _translate_result_row(cached.iloc[0])

    doc_emb, doc_text, file_names = _load_or_build_doc_index(data, name_to_save)

    # Once embeddings are available, run maximum inner product search.
    start = time.time()
    query_emb = encode_query(query)
    scores = np.matmul(query_emb, doc_emb.transpose(1, 0))[0].tolist()
    doc_score_pairs = sorted(zip(doc_text, scores, file_names),
                             key=lambda x: x[1], reverse=True)
    probs = softmax(sorted(scores, reverse=True)[:k])

    table = {"Contexto": [], "Respuesta": [], "Probabilidades": []}
    # Get an answer for each of the top passages.
    for i, (passage, _, _) in enumerate(doc_score_pairs[:k]):
        passage = passage.replace("\n", "")
        # Only run the reader on sufficiently likely passages.
        if probs[i] > 0.1 or (i < 3 and probs[i] > 0.05):
            QA = {'question': query, 'context': passage}
            ans = pipe(QA)
            probabilities = "P(a|p): {}, P(a|p,q): {}, P(p|q): {}".format(
                round(ans["score"], 5),
                round(ans["score"] * probs[i], 5),
                round(probs[i], 5))
            table["Contexto"].append(passage)
            table["Respuesta"].append(str(ans["answer"]).upper())
            table["Probabilidades"].append(probabilities)
        else:
            table["Contexto"].append(passage)
            table["Respuesta"].append("no_answer_calculated")
            table["Probabilidades"].append("P(p|q): {}".format(round(probs[i], 5)))

    # Save the answers so the same question on the same document is served
    # from cache next time.
    df = pd.DataFrame(table)
    print(df)
    print("time: " + str(time.time() - start))
    with open("HISTORY.txt", "a", encoding="utf-8") as f:
        f.write(hist)
        f.write(" " + str(current_time))
        f.write("\n")
    df.to_csv(cache_path, index=False)

    # Only the top-ranked row is returned, so only that row is translated.
    # This also works when the document produced fewer than k passages.
    return _translate_result_row(df.iloc[0])
# Introductory text shown at the top of the page (user-facing, in Spanish).
_INTRO_MARKDOWN = """
# Document Question and Answer adaptado al castellano por Pablo Ascorbe.
Este espacio ha sido clonado y adaptado de: https://huggingface.co./spaces/whitead/paper-qa
La idea es utilizar un modelo preentrenado de HuggingFace como "distilbert-base-cased-distilled-squad"
y responder las preguntas en inglés, para ello, será necesario hacer primero una traducción de los textos en castellano
a inglés y luego volver a traducir en sentido contrario.
## Instrucciones:
Adjunte su documento, ya sea en formato .txt o .pdf, y pregunte lo que desee.
"""

# Gradio UI: one file upload, one question box, and three Markdown outputs
# (answer, probabilities, and the supporting context inside an accordion).
# The multi-document Dataframe / status-box flow of the upstream space was
# commented out here and is not part of this layout.
with gr.Blocks() as demo:
    gr.Markdown(_INTRO_MARKDOWN)

    file = gr.File(label="Sus documentos subidos (PDF o txt)")
    query = gr.Textbox(
        placeholder="Introduzca su pregunta aquí...",
        label="Pregunta",
    )
    ask = gr.Button("Preguntar")

    gr.Markdown("## Respuesta")
    answer = gr.Markdown(label="Respuesta")
    prob = gr.Markdown(label="Probabilidades")

    with gr.Accordion("Contexto", open=False):
        gr.Markdown(
            "### Contexto\n\nEl siguiente contexto ha sido utilizado para generar la respuesta:")
        context = gr.Markdown(label="Contexto")

    # predict returns (answer, context, probabilities), in that order.
    ask.click(
        fn=predict,
        inputs=[query, file],
        outputs=[answer, context, prob],
    )

    examples = gr.Examples(
        examples=[["¿Cuándo suelen comenzar las adicciones?", "Entrevista Miguel Ruiz.txt"]],
        inputs=[query, file],
    )

demo.queue(concurrency_count=20)
demo.launch(show_error=True)
# iface = gr.Interface(fn =predict,
# inputs = [gr.inputs.Textbox(default="What is Open-domain question answering?"),
# gr.inputs.File(),
# ],
# outputs = [
# gr.outputs.Carousel(['text']),
# ],
# description=description,
# title = title,
# allow_flagging ="manual",flagging_options = ["correct","wrong"],
# allow_screenshot=False)
# iface.launch(enable_queue=True, show_error =True)
# Definimos los modelos:
# Traducción
# mname = "Helsinki-NLP/opus-mt-es-en"
# tokenizer_es_en = MarianTokenizer.from_pretrained(mname)
# model_es_en = MarianMTModel.from_pretrained(mname)
# model_es_en.to(device)
# mname = "Helsinki-NLP/opus-mt-en-es"
# tokenizer_en_es = MarianTokenizer.from_pretrained(mname)
# model_en_es = MarianMTModel.from_pretrained(mname)
# model_en_es.to(device)
# lt = LineTokenizer()
# Responder preguntas
# question_answerer = pipeline("question-answering", model='distilbert-base-cased-distilled-squad')
# def request_pathname(files):
# if files is None:
# return [[]]
# return [[file.name, file.name.split('/')[-1]] for file in files]
# def traducir_parrafos(parrafos, tokenizer, model, tam_bloque=8, ):
# parrafos_traducidos = []
# for parrafo in parrafos:
# frases = sent_tokenize(parrafo)
# batches = math.ceil(len(frases) / tam_bloque)
# traducido = []
# for i in range(batches):
# bloque_enviado = frases[i*tam_bloque:(i+1)*tam_bloque]
# model_inputs = tokenizer(bloque_enviado, return_tensors="pt",
# padding=True, truncation=True,
# max_length=500).to(device)
# with torch.no_grad():
# bloque_traducido = model.generate(**model_inputs)
# traducido += bloque_traducido
# traducido = [tokenizer.decode(t, skip_special_tokens=True) for t in traducido]
# parrafos_traducidos += [" ".join(traducido)]
# return parrafos_traducidos
# def traducir_es_en(texto):
# parrafos = lt.tokenize(texto)
# par_tra = traducir_parrafos(parrafos, tokenizer_es_en, model_es_en)
# return "\n".join(par_tra)
# def traducir_en_es(texto):
# parrafos = lt.tokenize(texto)
# par_tra = traducir_parrafos(parrafos, tokenizer_en_es, model_en_es)
# return "\n".join(par_tra)
# def validate_dataset(dataset):
# global docs
# docs = None # clear it out if dataset is modified
# docs_ready = dataset.iloc[-1, 0] != ""
# if docs_ready:
# return "✨Listo✨"
# else:
# return "⚠️Esperando documentos..."
# def do_ask(question, button, dataset):
# global docs
# docs_ready = dataset.iloc[-1, 0] != ""
# if button == "✨Listo✨" and docs_ready:
# for _, row in dataset.iterrows():
# path = row['filepath']
# text = Path(f'{path}').read_text()
# text_en = traducir_es_en(text)
# QA_input = {
# 'question': traducir_es_en(question),
# 'context': text_en
# }
# return traducir_en_es(question_answerer(QA_input)['answer'])
# else:
# return ""
# # def do_ask(question, button, dataset, progress=gr.Progress()):
# # global docs
# # docs_ready = dataset.iloc[-1, 0] != ""
# # if button == "✨Listo✨" and docs_ready:
# # if docs is None: # don't want to rebuild index if it's already built
# # import paperqa
# # docs = paperqa.Docs()
# # # dataset is pandas dataframe
# # for _, row in dataset.iterrows():
# # key = None
# # if ',' not in row['citation string']:
# # key = row['citation string']
# # docs.add(row['filepath'], row['citation string'], key=key)
# # else:
# # return ""
# # progress(0, "Construyendo índices...")
# # docs._build_faiss_index()
# # progress(0.25, "Encolando...")
# # result = docs.query(question)
# # progress(1.0, "¡Hecho!")
# # return result.formatted_answer, result.context
# with gr.Blocks() as demo:
# gr.Markdown("""
# # Document Question and Answer adaptado al castellano por Pablo Ascorbe.
# Este espacio ha sido clonado y adaptado de: https://huggingface.co./spaces/whitead/paper-qa
# La idea es utilizar un modelo preentrenado de HuggingFace como "distilbert-base-cased-distilled-squad"
# y responder las preguntas en inglés, para ello, será necesario hacer primero una traducción de los textos en castellano
# a inglés y luego volver a traducir en sentido contrario.
# ## Instrucciones:
# Adjunte su documento, ya sea en formato .txt o .pdf, y pregunte lo que desee.
# """)
# uploaded_files = gr.File(
# label="Sus documentos subidos (PDF o txt)", file_count="multiple", )
# dataset = gr.Dataframe(
# headers=["filepath", "citation string"],
# datatype=["str", "str"],
# col_count=(2, "fixed"),
# interactive=True,
# label="Documentos y citas"
# )
# buildb = gr.Textbox("⚠️Esperando documentos...",
# label="Estado", interactive=False, show_label=True)
# dataset.change(validate_dataset, inputs=[
# dataset], outputs=[buildb])
# uploaded_files.change(request_pathname, inputs=[
# uploaded_files], outputs=[dataset])
# query = gr.Textbox(
# placeholder="Introduzca su pregunta aquí...", label="Pregunta")
# ask = gr.Button("Preguntar")
# gr.Markdown("## Respuesta")
# answer = gr.Markdown(label="Respuesta")
# with gr.Accordion("Contexto", open=False):
# gr.Markdown(
# "### Contexto\n\nEl siguiente contexto ha sido utilizado para generar la respuesta:")
# context = gr.Markdown(label="Contexto")
# # ask.click(fn=do_ask, inputs=[query, buildb,
# # dataset], outputs=[answer, context])
# ask.click(fn=do_ask, inputs=[query, buildb,
# dataset], outputs=[answer])
# demo.queue(concurrency_count=20)
# demo.launch(show_error=True)