# Chatbot_Law / app.py
# Hugging Face file-viewer header kept for provenance:
#   author: minhdang14902 | commit 9718bbe (verified) "Update app.py" | size: 17.8 kB
import base64
import json
# import re

import nltk
import pandas as pd
import pytorch_lightning as pl
import streamlit as st
import torch
from nltk import word_tokenize
from torch import nn
from torch.nn import CrossEntropyLoss
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline, T5Tokenizer, T5ForConditionalGeneration
from transformers import RobertaForQuestionAnswering
from transformers import RobertaConfig, RobertaModel, RobertaPreTrainedModel
from transformers.modeling_outputs import QuestionAnsweringModelOutput
from transformers.models.roberta.modeling_roberta import *
# Set the background image
# background_image = """
# <style>
# [data-testid="stAppViewContainer"] > .main {
# background-image: url("https://images.unsplash.com/photo-1542281286-9e0a16bb7366");
# background-size: 100vw 100vh; # This sets the size to cover 100% of the viewport width and height
# background-position: center;
# background-repeat: no-repeat;
# }
# </style>
# """
# st.markdown(background_image, unsafe_allow_html=True)
# def set_bg_hack(main_bg):
# '''
# A function to unpack an image from root folder and set as bg.
# Returns
# -------
# The background.
# '''
# # set bg name
# main_bg_ext = "png"
# st.markdown(
# f"""
# <style>
# .stApp {{
# background: url(data:image/{main_bg_ext};base64,{base64.b64encode(open(main_bg, "rb").read()).decode()});
# background-size: cover
# }}
# </style>
# """,
# unsafe_allow_html=True
# )
# set_bg_hack("Background.png")
# image_url = "logo1.png"
# # Hiển thị hình ảnh mà không có caption và điều chỉnh kích thước nhỏ lại
# st.image(image_url, width=100)
# Download punkt for nltk
# Print a visual separator line to stdout.
print("===================================================================")
@st.cache_data
def download_nltk_punkt():
    """Download the NLTK ``punkt_tab`` resource.

    Wrapped in ``st.cache_data`` so the download call is not repeated while
    the cached entry is alive. Returns ``None``.
    """
    resource_name = 'punkt_tab'
    nltk.download(resource_name)
# Cache loading PhoBert model and tokenizer
@st.cache_resource
def load_phoBert():
    """Load the PhoBERT sequence-classification model and its tokenizer.

    ``st.cache_resource`` is used rather than ``st.cache_data``: the latter is
    meant for serializable data and hands back a fresh copy of the cached
    value on every call, which is wasteful for a model. ``cache_resource``
    keeps a single shared instance instead.

    Returns:
        tuple: ``(model, tokenizer)`` loaded from ``minhdang14902/Phobert_Law``.
    """
    checkpoint = 'minhdang14902/Phobert_Law'
    model = AutoModelForSequenceClassification.from_pretrained(checkpoint)
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    return model, tokenizer
# Run the cached loaders: fetch the NLTK resource, then the PhoBERT model/tokenizer.
download_nltk_punkt()
phoBert_model, phoBert_tokenizer = load_phoBert()
# Build the classification pipeline from the loaded PhoBERT model and tokenizer.
# chatRoberta() reads the 'label' of the first prediction and looks it up in label2id.
chatbot_pipeline = pipeline("sentiment-analysis", model=phoBert_model, tokenizer=phoBert_tokenizer)
# Load spaCy Vietnamese model
# nlp = spacy.load('vi_core_news_lg')
# Load intents from json file
def load_json_file(filename):
    """Read a JSON file and return the decoded Python object.

    The file is opened as UTF-8 explicitly. Without it, ``open`` falls back
    to the platform's locale encoding, which can fail or mis-decode
    non-ASCII (e.g. Vietnamese) text on systems whose default is not UTF-8.

    Args:
        filename: Path of the JSON file to read.

    Returns:
        The object produced by ``json.load``.
    """
    with open(filename, encoding='utf-8') as f:
        return json.load(f)
# Intent definitions. Other code in this file reads intents['intents'] and, per
# intent, the keys 'tag', 'patterns' and 'responses'.
filename = './Law_2907.json'
intents = load_json_file(filename)
def create_df():
    """Return a new, empty DataFrame with the columns ``Pattern`` and ``Tag``."""
    empty_columns = {column: [] for column in ('Pattern', 'Tag')}
    return pd.DataFrame(empty_columns)
# Empty Pattern/Tag table; filled from the intents file by extract_json_info below.
df = create_df()
def extract_json_info(json_file, df):
    """Append one ``[pattern, tag]`` row to *df* for every pattern of every intent.

    Args:
        json_file: Mapping with an ``'intents'`` list; each intent provides
            ``'patterns'`` (iterable) and ``'tag'``.
        df: DataFrame that is extended in place, one row per pattern.

    Returns:
        The same *df* object, after the rows have been appended.
    """
    pattern_tag_pairs = (
        (pattern, intent['tag'])
        for intent in json_file['intents']
        for pattern in intent['patterns']
    )
    for pattern, tag in pattern_tag_pairs:
        # Setting a row at the next free integer label enlarges the frame.
        df.loc[len(df.index)] = [pattern, tag]
    return df
# Fill the Pattern/Tag table from the intents file and keep a working copy.
df = extract_json_info(intents, df)
df2 = df.copy()
# Distinct tags with surrounding whitespace stripped.
labels = df2['Tag'].unique().tolist()
labels = [s.strip() for s in labels]
num_labels = len(labels)
# Bidirectional tag <-> integer maps. chatRoberta() uses label2id[...] as an index
# into intents['intents'].
# NOTE(review): that only lines up if every intent has a distinct tag and at least
# one pattern - confirm against the JSON file.
# NOTE(review): the comprehension variable `id` shadows the builtin inside the comprehension.
id2label = {id: label for id, label in enumerate(labels)}
label2id = {label: id for id, label in enumerate(labels)}
# def tokenize_with_spacy(text):
# doc = nlp(text)
# tokens = [token.text for token in doc]
# tokenized_text = ' '.join(tokens)
# tokenized_text = re.sub(r'(?<!\s)([.,?])', r' \1', tokenized_text)
# tokenized_text = re.sub(r'([.,?])(?!\s)', r'\1 ', tokenized_text)
# return tokenized_text
# Load Roberta model and tokenizer
# Documentation placeholders; not referenced anywhere else in the visible part of this file.
_CHECKPOINT_FOR_DOC = "roberta-base"
_CONFIG_FOR_DOC = "RobertaConfig"
_TOKENIZER_FOR_DOC = "RobertaTokenizer"
class MRCQuestionAnswering(RobertaPreTrainedModel):
    """RoBERTa encoder with a word-level span-extraction head.

    The encoder works on sub-word tokens. Before the linear head is applied,
    the sub-word hidden states belonging to one word are summed (using
    ``words_lengths``), so the start/end logits are indexed by word rather
    than by sub-word token.
    """

    config_class = RobertaConfig
    _keys_to_ignore_on_load_unexpected = [r"pooler"]
    _keys_to_ignore_on_load_missing = [r"position_ids"]

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.roberta = RobertaModel(config, add_pooling_layer=False)
        self.qa_outputs = nn.Linear(config.hidden_size, config.num_labels)
        self.init_weights()

    def _reorder_cache(self, past, beam_idx):
        # Intentionally a no-op.
        pass

    @staticmethod
    def _build_align_matrix(words_lengths, num_sub_words, like):
        """Return a (batch, words, sub-words) 0/1 matrix marking each word's sub-word span.

        Word ``j`` of a sample covers the sub-word positions
        ``[sum(lengths[:j]), sum(lengths[:j + 1]))``. Zero-length (padding)
        words get an all-zero row, and spans are cut off at ``num_sub_words``.
        The result uses the dtype and device of *like*.
        """
        span_ends = torch.cumsum(words_lengths, dim=1)
        span_starts = span_ends - words_lengths
        positions = torch.arange(num_sub_words, device=words_lengths.device)
        inside_span = (positions >= span_starts.unsqueeze(-1)) & (positions < span_ends.unsqueeze(-1))
        return inside_span.to(device=like.device, dtype=like.dtype)

    def forward(
        self,
        input_ids=None,
        words_lengths=None,
        start_idx=None,
        end_idx=None,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        head_mask=None,
        inputs_embeds=None,
        start_positions=None,
        end_positions=None,
        span_answer_ids=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
    ):
        """Compute word-level start/end logits and, if targets are given, the loss.

        Args:
            input_ids: Sub-word ids, one row per sample.
            words_lengths: Integer tensor, one row per sample, giving the
                number of sub-words of each word (0 for padding words).
            attention_mask, position_ids, head_mask, inputs_embeds,
            output_attentions, output_hidden_states: Forwarded to the encoder.
            start_positions, end_positions: Optional target word indices;
                when both are given the averaged cross-entropy loss is returned.
            start_idx, end_idx, token_type_ids, span_answer_ids: Accepted for
                interface compatibility; not used by this method.
            return_dict: Return a ``QuestionAnsweringModelOutput`` instead of
                a tuple; defaults to ``config.use_return_dict``.

        Returns:
            ``QuestionAnsweringModelOutput`` or a tuple
            ``([loss,] start_logits, end_logits, *extra_encoder_outputs)``.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.roberta(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=None,  # Roberta doesn't use token_type_ids
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        context_embedding = outputs[0]

        # Sum the sub-word vectors of every word: (B, W, S) @ (B, S, H) -> (B, W, H).
        # Built in one vectorised step rather than a per-word Python loop, and
        # created in the encoder's dtype/device so ``bmm`` operands always match.
        align_matrix = self._build_align_matrix(
            words_lengths, context_embedding.shape[1], context_embedding
        )
        context_embedding_align = torch.bmm(align_matrix, context_embedding)

        logits = self.qa_outputs(context_embedding_align)
        start_logits, end_logits = logits.split(1, dim=-1)
        start_logits = start_logits.squeeze(-1).contiguous()
        end_logits = end_logits.squeeze(-1).contiguous()

        total_loss = None
        if start_positions is not None and end_positions is not None:
            if len(start_positions.size()) > 1:
                start_positions = start_positions.squeeze(-1)
            if len(end_positions.size()) > 1:
                end_positions = end_positions.squeeze(-1)
            # Targets outside the word range are clamped to ``ignored_index``
            # so the loss skips them.
            ignored_index = start_logits.size(1)
            start_positions = start_positions.clamp(0, ignored_index)
            end_positions = end_positions.clamp(0, ignored_index)
            loss_fct = CrossEntropyLoss(ignore_index=ignored_index)
            start_loss = loss_fct(start_logits, start_positions)
            end_loss = loss_fct(end_logits, end_positions)
            total_loss = (start_loss + end_loss) / 2

        if not return_dict:
            output = (start_logits, end_logits) + outputs[2:]
            return ((total_loss,) + output) if total_loss is not None else output

        return QuestionAnsweringModelOutput(
            loss=total_loss,
            start_logits=start_logits,
            end_logits=end_logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
# roberta_model_checkpoint = "minhdang14902/Roberta_edu"
# roberta_tokenizer = AutoTokenizer.from_pretrained(roberta_model_checkpoint)
# roberta_model = MRCQuestionAnswering.from_pretrained(roberta_model_checkpoint)
# Cache loading Roberta model and tokenizer
@st.cache_resource
def load_roberta_model():
    """Load the extractive question-answering model and its tokenizer.

    ``st.cache_resource`` is used rather than ``st.cache_data``: the latter is
    meant for serializable data and returns a fresh copy of the cached value
    on every call, which is wasteful for a model. ``cache_resource`` keeps one
    shared instance.

    Returns:
        tuple: ``(model, tokenizer)`` loaded from ``minhdang14902/Roberta_Law``.
    """
    checkpoint = 'minhdang14902/Roberta_Law'
    model = MRCQuestionAnswering.from_pretrained(checkpoint)
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    return model, tokenizer
# Module-level QA model and tokenizer used by chatRoberta().
roberta_model, roberta_tokenizer = load_roberta_model()
def chatRoberta(text):
    """Answer *text* by extracting a span from the predicted intent's response.

    The classifier pipeline predicts a label for *text*; ``label2id`` turns it
    into an index into ``intents['intents']``, and the first response of that
    intent becomes the context for the extractive QA model.

    Args:
        text: The user's question.

    Returns:
        tuple: ``(result, context)``. ``result`` is a one-element list of dicts
        with the keys ``"answer"``, ``"score_start"`` and ``"score_end"``.
        ``context`` is the response text the answer was extracted from.
    """
    label = label2id[chatbot_pipeline(text)[0]['label']]
    response = intents['intents'][label]['responses']
    context = response[0]
    print(context)

    # Tokenize input
    encoded_input = tokenize_function({'question': text, 'context': context}, roberta_tokenizer)
    if not encoded_input['valid']:
        # The question plus context exceeds the tokenizer's model_max_length.
        # Report "no answer" in the same shape extract_answer produces, so the
        # caller can use its fallback instead of feeding an over-long input.
        return [{"answer": "", "score_start": 0.0, "score_end": 0.0}], context

    # Prepare batch samples
    batch_samples = data_collator([encoded_input], roberta_tokenizer)

    # Model prediction
    roberta_model.eval()
    with torch.no_grad():
        outputs = roberta_model(
            input_ids=batch_samples['input_ids'],
            attention_mask=batch_samples['attention_mask'],
            words_lengths=batch_samples['words_lengths'],
        )

    # Extract answer
    result = extract_answer([encoded_input], outputs, roberta_tokenizer)
    return result, context
def tokenize_function(example, tokenizer):
    """Encode a question/context pair as sub-word ids grouped by word.

    Layout: ``[bos] question-words [eos] context-words [eos]``. Each special
    token counts as a word of its own.

    Args:
        example: Mapping with the keys ``"question"`` and ``"context"``.
        tokenizer: Tokenizer providing ``tokenize``, ``convert_tokens_to_ids``,
            ``bos_token_id``, ``eos_token_id`` and ``model_max_length``.

    Returns:
        dict: ``"input_ids"`` (flat list of ids), ``"words_lengths"`` (number
        of sub-words per word, aligned with ``input_ids``) and ``"valid"``
        (``False`` when the sequence does not fit ``model_max_length``).
    """
    question_words = word_tokenize(example["question"])
    context_words = word_tokenize(example["context"])

    def ids_per_word(words):
        return [tokenizer.convert_tokens_to_ids(tokenizer.tokenize(word)) for word in words]

    question_ids = ids_per_word(question_words)
    context_ids = ids_per_word(context_words)

    limit = tokenizer.model_max_length
    plain_token_count = sum(len(word_ids) for word_ids in question_ids + context_ids)

    grouped_ids = (
        [[tokenizer.bos_token_id]]
        + question_ids
        + [[tokenizer.eos_token_id]]
        + context_ids
        + [[tokenizer.eos_token_id]]
    )
    input_ids = [token_id for word_ids in grouped_ids for token_id in word_ids]

    # Invalid if the sequence is too long before or after adding special tokens.
    too_long = plain_token_count > limit - 1 or len(input_ids) > limit

    return {
        "input_ids": input_ids,
        "words_lengths": [len(word_ids) for word_ids in grouped_ids],
        "valid": not too_long
    }
def data_collator(samples, tokenizer):
    """Right-pad a list of encoded samples into batch tensors.

    Args:
        samples: List of dicts with the keys ``'input_ids'`` and
            ``'words_lengths'`` (lists of ints).
        tokenizer: Object whose ``pad_token_id`` is used to pad ``input_ids``.

    Returns:
        dict: ``'input_ids'`` (padded with ``pad_token_id``),
        ``'attention_mask'`` (1 on real tokens, 0 on padding) and
        ``'words_lengths'`` (padded with 0). An empty dict for an empty
        *samples* list.
    """
    if len(samples) == 0:
        return {}

    def pad_rows(rows, fill_value):
        # Stack 1-D tensors into a 2-D tensor, padding on the right.
        width = max(row.size(0) for row in rows)
        padded = rows[0].new_full((len(rows), width), fill_value)
        for row_idx, row in enumerate(rows):
            padded[row_idx, :len(row)] = row
        return padded

    id_rows = [torch.tensor(sample['input_ids']) for sample in samples]
    input_ids = pad_rows(id_rows, tokenizer.pad_token_id)

    attention_mask = torch.zeros_like(input_ids)
    for row_idx, sample in enumerate(samples):
        attention_mask[row_idx, :len(sample['input_ids'])] = 1

    length_rows = [torch.tensor(sample['words_lengths']) for sample in samples]
    words_lengths = pad_rows(length_rows, 0)

    return {
        'input_ids': input_ids,
        'attention_mask': attention_mask,
        'words_lengths': words_lengths,
    }
def extract_answer(inputs, outputs, tokenizer):
    """Decode the most likely answer span for every sample.

    The start/end logits are indexed by word; ``words_lengths`` converts the
    chosen word indices into sub-word offsets in ``input_ids``.

    Args:
        inputs: List of dicts with ``'words_lengths'`` and ``'input_ids'``.
        outputs: Object with ``start_logits`` and ``end_logits``, one row per
            sample.
        tokenizer: Provides ``convert_ids_to_tokens``,
            ``convert_tokens_to_string`` and ``bos_token``.

    Returns:
        list[dict]: One dict per sample with ``"answer"`` (empty string when
        the span is inverted or is only the BOS token), ``"score_start"`` and
        ``"score_end"`` (the highest softmax probabilities).
    """
    plain_result = []
    per_sample = zip(inputs, outputs.start_logits, outputs.end_logits)
    for sample, start_logit, end_logit in per_sample:
        lengths = sample['words_lengths']
        token_ids = sample['input_ids']

        first_word = int(torch.argmax(start_logit))
        last_word = int(torch.argmax(end_logit))
        answer_start = sum(lengths[:first_word])
        answer_end = sum(lengths[:last_word + 1])

        answer = ''
        if answer_start <= answer_end:
            span_tokens = tokenizer.convert_ids_to_tokens(token_ids[answer_start:answer_end])
            answer = tokenizer.convert_tokens_to_string(span_tokens)
            if answer == tokenizer.bos_token:
                answer = ''

        plain_result.append({
            "answer": answer,
            "score_start": torch.softmax(start_logit, dim=-1).max().item(),
            "score_end": torch.softmax(end_logit, dim=-1).max().item()
        })
    return plain_result
#T555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555
# Use the GPU when CUDA is available, otherwise the CPU.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
INPUT_MAX_LEN = 128 # Adjusted input length
# NOTE(review): OUTPUT_MAX_LEN is not referenced in the visible part of this file;
# generate_question() passes INPUT_MAX_LEN as the generation max_length - confirm intent.
OUTPUT_MAX_LEN = 512 # Adjusted output length
@st.cache_data
def download_model_name():
    """Return the model id of the T5 checkpoint (a constant string; nothing is downloaded here)."""
    return "VietAI/vit5-base"
MODEL_NAME = download_model_name()
# Module-level T5 tokenizer used by generate_question(). Several functions in this
# file take a parameter that is also called `tokenizer`; inside them the parameter
# takes precedence over this global.
tokenizer = T5Tokenizer.from_pretrained(MODEL_NAME, model_max_length=INPUT_MAX_LEN)
class T5Model(pl.LightningModule):
    """Lightning wrapper around ``T5ForConditionalGeneration`` loaded from ``MODEL_NAME``."""

    def __init__(self):
        super().__init__()
        self.model = T5ForConditionalGeneration.from_pretrained(MODEL_NAME, return_dict=True)

    def forward(self, input_ids, attention_mask, labels=None):
        """Run the wrapped model and return ``(loss, logits)``."""
        output = self.model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            labels=labels
        )
        return output.loss, output.logits

    def _step_loss(self, batch):
        """Move the batch tensors to ``DEVICE`` and return the loss for them."""
        input_ids = batch["input_ids"].to(DEVICE)
        attention_mask = batch["attention_mask"].to(DEVICE)
        labels = batch["target"].to(DEVICE)
        loss, _ = self(input_ids, attention_mask, labels)
        return loss

    def training_step(self, batch, batch_idx):
        """Compute and log ``train_loss`` for one batch."""
        loss = self._step_loss(batch)
        self.log("train_loss", loss, prog_bar=True, logger=True)
        return {'loss': loss}

    def validation_step(self, batch, batch_idx):
        """Compute and log ``val_loss`` for one batch."""
        loss = self._step_loss(batch)
        self.log("val_loss", loss, prog_bar=True, logger=True)
        return {'val_loss': loss}

    def configure_optimizers(self):
        """Return the AdamW optimizer used for training.

        The bare name ``AdamW`` was not provided by any import visible in this
        file, so calling this method would raise ``NameError``. The optimizer
        is now taken from ``torch.optim``.
        NOTE(review): ``torch.optim.AdamW`` defaults to ``weight_decay=0.01``.
        If the checkpoint was trained with a different AdamW implementation,
        confirm the intended weight decay before retraining.
        """
        return torch.optim.AdamW(self.parameters(), lr=0.0001)
@st.cache_resource
def load_t5():
    """Load the fine-tuned T5 Lightning checkpoint and freeze it for inference.

    ``st.cache_resource`` is used rather than ``st.cache_data``: the latter is
    meant for serializable data and returns a fresh copy of the cached value
    on every call, which is wasteful for a model. ``cache_resource`` keeps one
    shared instance.

    Returns:
        T5Model: The frozen model restored from ``./data-law/law-model-v1.ckpt``.
    """
    train_model = T5Model.load_from_checkpoint('./data-law/law-model-v1.ckpt')
    train_model.freeze()
    return train_model
# Module-level frozen T5 model used by generate_question().
train_model = load_t5()
def generate_question(question):
    """Generate a text response for *question* with the T5 model.

    The input is padded/truncated to ``INPUT_MAX_LEN`` tokens and decoded with
    beam search (4 beams, one returned sequence). Whitespace in the decoded
    text is collapsed to single spaces.

    NOTE(review): the inputs are moved to ``DEVICE``; this assumes
    ``train_model`` lives on the same device - confirm.

    Args:
        question: The user's input text.

    Returns:
        str: The generated response.
    """
    encoding = tokenizer(
        question,
        add_special_tokens=True,
        max_length=INPUT_MAX_LEN,
        padding='max_length',
        truncation='only_first',
        return_attention_mask=True,
        return_tensors="pt"
    )
    encoding = encoding.to(DEVICE)

    generation_options = {
        "max_length": INPUT_MAX_LEN,
        "num_beams": 4,
        "num_return_sequences": 1,
        "no_repeat_ngram_size": 2,
        "early_stopping": True,
    }
    generated_sequences = train_model.model.generate(
        input_ids=encoding["input_ids"],
        attention_mask=encoding["attention_mask"],
        **generation_options,
    )

    decoded = []
    for sequence in generated_sequences:
        decoded.append(
            tokenizer.decode(sequence, skip_special_tokens=True, clean_up_tokenization_spaces=True)
        )

    response = " ".join(decoded[0].split())
    print('T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5')
    return response
# st.title("Chatbot Roberta")
# st.write("Hi! Tôi là trợ lý của bạn trong việc trả lời các câu hỏi.")
# text = st.text_input("User: ", key="input")
# if 'chat_history' not in st.session_state:
# st.session_state['chat_history'] = []
# def get_response(text):
# st.subheader("The Answer is:")
# st.write(text)
# answer, context = chatRoberta(text)
# result = answer[0]['answer']
# if result == "":
# return "Xin lỗi, tôi không thể tìm được đáp án phù hợp cho câu hỏi này ... Hãy thử trả lời bằng câu hỏi khác!"
# return result
# if st.button("Chat!"):
# st.session_state['chat_history'].append(("User", text))
# response = get_response(text)
# st.subheader("The Response is:")
# message = st.empty()
# result = ""
# for chunk in response:
# result += chunk
# message.markdown(result + "❚ ")
# message.markdown(result)
# st.session_state['chat_history'].append(("Bot", result))
# for i, (sender, message) in enumerate(st.session_state['chat_history']):
# if sender == "User":
# st.text_area(f"User:", value=message, height=100, max_chars=None, key=f"user_{i}")
# else:
# st.text_area(f"Bot:", value=message, height=100, max_chars=None, key=f"bot_{i}")
def get_response(text):
    """Return the bot's reply for *text*.

    The extractive answer from ``chatRoberta`` is used when it is non-empty.
    Otherwise the reply is generated by ``generate_question``.
    """
    answer, context = chatRoberta(text)
    result = answer[0]['answer']
    if result != "":
        return result
    # No span was extracted: fall back to the generative model.
    print("Khởi chạy T5")
    return generate_question(text)
st.title("General Law Chatbot")

# Initialise the message history the first time it is needed.
if "messages" not in st.session_state:
    st.session_state.messages = []

# Render every message stored in the history.
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])

# Read the user's input.
prompt = st.chat_input("What is up?")
if prompt:
    # Store the user's message in the history.
    st.session_state.messages.append({"role": "user", "content": prompt})

    # Show the user's message in the UI.
    with st.chat_message("user"):
        st.markdown(prompt)

    # Get the bot's answer.
    response = get_response(prompt)

    # Show the bot's answer in the UI.
    with st.chat_message("assistant"):
        st.markdown(response)

    # Store the bot's answer in the history.
    st.session_state.messages.append({"role": "assistant", "content": response})