import streamlit as st
import torch
from torch import nn
from torch.nn import CrossEntropyLoss
from torch.optim import AdamW  # used by T5Model.configure_optimizers below
import pytorch_lightning as pl
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline, T5Tokenizer, T5ForConditionalGeneration
import nltk
from transformers.models.roberta.modeling_roberta import *
# Explicit imports for the names MRCQuestionAnswering actually relies on, so the
# code does not depend on what the wildcard import above happens to expose.
from transformers.models.roberta.modeling_roberta import RobertaConfig, RobertaModel, RobertaPreTrainedModel
from transformers.modeling_outputs import QuestionAnsweringModelOutput
from transformers import RobertaForQuestionAnswering
from nltk import word_tokenize
import json
import pandas as pd
# import re
import base64
# Set the background image
# background_image = """
# <style>
# [data-testid="stAppViewContainer"] > .main {
#     background-image: url("https://images.unsplash.com/photo-1542281286-9e0a16bb7366");
#     background-size: 100vw 100vh;  # This sets the size to cover 100% of the viewport width and height
#     background-position: center;
#     background-repeat: no-repeat;
# }
# </style>
# """
# st.markdown(background_image, unsafe_allow_html=True)
# def set_bg_hack(main_bg):
#     '''
#     A function to unpack an image from the root folder and set it as the background.
#     Returns
#     -------
#     The background.
#     '''
#     # set bg name
#     main_bg_ext = "png"
#     st.markdown(
#         f"""
#         <style>
#         .stApp {{
#             background: url(data:image/{main_bg_ext};base64,{base64.b64encode(open(main_bg, "rb").read()).decode()});
#             background-size: cover
#         }}
#         </style>
#         """,
#         unsafe_allow_html=True
#     )
# set_bg_hack("Background.png")
# image_url = "logo1.png"
# # Display the image without a caption and scale it down
# st.image(image_url, width=100)
# Download punkt for nltk
print("===================================================================")

@st.cache_resource
def download_nltk_punkt():
    nltk.download('punkt_tab')

# Cache loading PhoBert model and tokenizer (st.cache_resource keeps them loaded across Streamlit reruns)
@st.cache_resource
def load_phoBert():
    model = AutoModelForSequenceClassification.from_pretrained('minhdang14902/Phobert_Law')
    tokenizer = AutoTokenizer.from_pretrained('minhdang14902/Phobert_Law')
    return model, tokenizer

# Call the cached functions
download_nltk_punkt()
phoBert_model, phoBert_tokenizer = load_phoBert()

# Initialize the pipeline with the loaded PhoBert model and tokenizer
chatbot_pipeline = pipeline("sentiment-analysis", model=phoBert_model, tokenizer=phoBert_tokenizer)
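# Note: "sentiment-analysis" is just an alias for the generic text-classification
# pipeline; here it predicts the intent tag of a user question, which is mapped
# to an answer context via the label2id / intents lookup below.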
# Load spaCy Vietnamese model
# nlp = spacy.load('vi_core_news_lg')

# Load intents from json file
def load_json_file(filename):
    # utf-8 so Vietnamese text in the intents file is read correctly
    with open(filename, encoding='utf-8') as f:
        file = json.load(f)
    return file

filename = './Law_2907.json'
intents = load_json_file(filename)
def create_df():
    df = pd.DataFrame({
        'Pattern': [],
        'Tag': []
    })
    return df

df = create_df()

def extract_json_info(json_file, df):
    for intent in json_file['intents']:
        for pattern in intent['patterns']:
            sentence_tag = [pattern, intent['tag']]
            df.loc[len(df.index)] = sentence_tag
    return df

df = extract_json_info(intents, df)
df2 = df.copy()

labels = df2['Tag'].unique().tolist()
labels = [s.strip() for s in labels]
num_labels = len(labels)
id2label = {id: label for id, label in enumerate(labels)}
label2id = {label: id for id, label in enumerate(labels)}
# def tokenize_with_spacy(text):
#     doc = nlp(text)
#     tokens = [token.text for token in doc]
#     tokenized_text = ' '.join(tokens)
#     tokenized_text = re.sub(r'(?<!\s)([.,?])', r' \1', tokenized_text)
#     tokenized_text = re.sub(r'([.,?])(?!\s)', r'\1 ', tokenized_text)
#     return tokenized_text

# Load Roberta model and tokenizer
_CHECKPOINT_FOR_DOC = "roberta-base"
_CONFIG_FOR_DOC = "RobertaConfig"
_TOKENIZER_FOR_DOC = "RobertaTokenizer"
class MRCQuestionAnswering(RobertaPreTrainedModel):
    config_class = RobertaConfig
    _keys_to_ignore_on_load_unexpected = [r"pooler"]
    _keys_to_ignore_on_load_missing = [r"position_ids"]

    def _reorder_cache(self, past, beam_idx):
        pass

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.roberta = RobertaModel(config, add_pooling_layer=False)
        self.qa_outputs = nn.Linear(config.hidden_size, config.num_labels)
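        # One linear head over the word-aligned hidden states; its outputs are
        # split into answer-start and answer-end logits in forward().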
        self.init_weights()

    def forward(
            self,
            input_ids=None,
            words_lengths=None,
            start_idx=None,
            end_idx=None,
            attention_mask=None,
            token_type_ids=None,
            position_ids=None,
            head_mask=None,
            inputs_embeds=None,
            start_positions=None,
            end_positions=None,
            span_answer_ids=None,
            output_attentions=None,
            output_hidden_states=None,
            return_dict=None,
    ):
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.roberta(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=None,  # Roberta doesn't use token_type_ids
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        sequence_output = outputs[0]
        context_embedding = sequence_output

        batch_size = input_ids.shape[0]
        max_sub_word = input_ids.shape[1]
        max_word = words_lengths.shape[1]
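        # Word/sub-word alignment: words_lengths[i][j] is how many sub-word tokens
        # word j of sample i was split into. The (max_word x max_sub_word) matrix
        # below has 1s over each word's sub-word span, so the bmm sums sub-word
        # embeddings into one vector per word and QA logits are predicted per word.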
        align_matrix = torch.zeros((batch_size, max_word, max_sub_word))
        for i, sample_length in enumerate(words_lengths):
            for j in range(len(sample_length)):
                start_idx = torch.sum(sample_length[:j])
                align_matrix[i][j][start_idx: start_idx + sample_length[j]] = 1 if sample_length[j] > 0 else 0

        align_matrix = align_matrix.to(context_embedding.device)
        context_embedding_align = torch.bmm(align_matrix, context_embedding)

        logits = self.qa_outputs(context_embedding_align)
        start_logits, end_logits = logits.split(1, dim=-1)
        start_logits = start_logits.squeeze(-1).contiguous()
        end_logits = end_logits.squeeze(-1).contiguous()

        total_loss = None
        if start_positions is not None and end_positions is not None:
            if len(start_positions.size()) > 1:
                start_positions = start_positions.squeeze(-1)
            if len(end_positions.size()) > 1:
                end_positions = end_positions.squeeze(-1)
            ignored_index = start_logits.size(1)
            start_positions = start_positions.clamp(0, ignored_index)
            end_positions = end_positions.clamp(0, ignored_index)

            loss_fct = CrossEntropyLoss(ignore_index=ignored_index)
            start_loss = loss_fct(start_logits, start_positions)
            end_loss = loss_fct(end_logits, end_positions)
            total_loss = (start_loss + end_loss) / 2

        if not return_dict:
            output = (start_logits, end_logits) + outputs[2:]
            return ((total_loss,) + output) if total_loss is not None else output

        return QuestionAnsweringModelOutput(
            loss=total_loss,
            start_logits=start_logits,
            end_logits=end_logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
# roberta_model_checkpoint = "minhdang14902/Roberta_edu"
# roberta_tokenizer = AutoTokenizer.from_pretrained(roberta_model_checkpoint)
# roberta_model = MRCQuestionAnswering.from_pretrained(roberta_model_checkpoint)

# Cache loading Roberta model and tokenizer
@st.cache_resource
def load_roberta_model():
    model = MRCQuestionAnswering.from_pretrained('minhdang14902/Roberta_Law')
    tokenizer = AutoTokenizer.from_pretrained('minhdang14902/Roberta_Law')
    return model, tokenizer

roberta_model, roberta_tokenizer = load_roberta_model()

def chatRoberta(text):
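    # Pipeline: classify the question's intent with PhoBERT, take that intent's
    # stored response as the reading-comprehension context, then let the Roberta
    # MRC model extract the answer span from it.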
    label = label2id[chatbot_pipeline(text)[0]['label']]
    response = intents['intents'][label]['responses']
    print(response[0])

    QA_input = {
        'question': text,
        'context': response[0]
    }

    # Tokenize input
    encoded_input = tokenize_function(QA_input, roberta_tokenizer)

    # Prepare batch samples
    batch_samples = data_collator([encoded_input], roberta_tokenizer)

    # Model prediction
    roberta_model.eval()
    with torch.no_grad():
        inputs = {
            'input_ids': batch_samples['input_ids'],
            'attention_mask': batch_samples['attention_mask'],
            'words_lengths': batch_samples['words_lengths'],
        }
        outputs = roberta_model(**inputs)

    # Extract answer
    result = extract_answer([encoded_input], outputs, roberta_tokenizer)
    context = response[0]
    return result, context

def tokenize_function(example, tokenizer):
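    # Tokenize question and context word by word so that words_lengths records how
    # many sub-word tokens each word produced; samples longer than the model's
    # maximum input length are flagged as invalid rather than truncated.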
    question_word = word_tokenize(example["question"])
    context_word = word_tokenize(example["context"])

    question_sub_words_ids = [tokenizer.convert_tokens_to_ids(tokenizer.tokenize(w)) for w in question_word]
    context_sub_words_ids = [tokenizer.convert_tokens_to_ids(tokenizer.tokenize(w)) for w in context_word]
    valid = True
    if len([j for i in question_sub_words_ids + context_sub_words_ids for j in i]) > tokenizer.model_max_length - 1:
        valid = False

    question_sub_words_ids = [[tokenizer.bos_token_id]] + question_sub_words_ids + [[tokenizer.eos_token_id]]
    context_sub_words_ids = context_sub_words_ids + [[tokenizer.eos_token_id]]

    input_ids = [j for i in question_sub_words_ids + context_sub_words_ids for j in i]
    if len(input_ids) > tokenizer.model_max_length:
        valid = False

    words_lengths = [len(item) for item in question_sub_words_ids + context_sub_words_ids]

    return {
        "input_ids": input_ids,
        "words_lengths": words_lengths,
        "valid": valid
    }

def data_collator(samples, tokenizer):
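    # Pad the per-sample input_ids and words_lengths to a common length so they can
    # be stacked into batch tensors, and build the matching attention mask
    # (1 for real tokens, 0 for padding).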
    if len(samples) == 0:
        return {}

    def collate_tokens(values, pad_idx, eos_idx=None, left_pad=False, move_eos_to_beginning=False):
        size = max(v.size(0) for v in values)
        res = values[0].new(len(values), size).fill_(pad_idx)

        def copy_tensor(src, dst):
            assert dst.numel() == src.numel()
            if move_eos_to_beginning:
                assert src[-1] == eos_idx
                dst[0] = eos_idx
                dst[1:] = src[:-1]
            else:
                dst.copy_(src)

        for i, v in enumerate(values):
            copy_tensor(v, res[i][size - len(v):] if left_pad else res[i][:len(v)])
        return res

    input_ids = collate_tokens([torch.tensor(item['input_ids']) for item in samples], pad_idx=tokenizer.pad_token_id)
    attention_mask = torch.zeros_like(input_ids)
    for i in range(len(samples)):
        attention_mask[i][:len(samples[i]['input_ids'])] = 1
    words_lengths = collate_tokens([torch.tensor(item['words_lengths']) for item in samples], pad_idx=0)

    batch_samples = {
        'input_ids': input_ids,
        'attention_mask': attention_mask,
        'words_lengths': words_lengths,
    }
    return batch_samples

def extract_answer(inputs, outputs, tokenizer):
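    # The model scores positions at word level; words_lengths converts the argmax
    # word indices back to sub-word offsets so the answer can be decoded from
    # input_ids. The softmax maxima are reported as start/end confidence scores.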
    plain_result = []
    for sample_input, start_logit, end_logit in zip(inputs, outputs.start_logits, outputs.end_logits):
        sample_words_length = sample_input['words_lengths']
        input_ids = sample_input['input_ids']
        answer_start = sum(sample_words_length[:torch.argmax(start_logit)])
        answer_end = sum(sample_words_length[:torch.argmax(end_logit) + 1])

        if answer_start <= answer_end:
            answer = tokenizer.convert_tokens_to_string(
                tokenizer.convert_ids_to_tokens(input_ids[answer_start:answer_end]))
            if answer == tokenizer.bos_token:
                answer = ''
        else:
            answer = ''

        score_start = torch.max(torch.softmax(start_logit, dim=-1)).cpu().detach().numpy().tolist()
        score_end = torch.max(torch.softmax(end_logit, dim=-1)).cpu().detach().numpy().tolist()
        plain_result.append({
            "answer": answer,
            "score_start": score_start,
            "score_end": score_end
        })
    return plain_result

# ===================================== ViT5 (T5) section =====================================
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
INPUT_MAX_LEN = 128   # Adjusted input length
OUTPUT_MAX_LEN = 512  # Adjusted output length
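# The ViT5 model below is only used as a fallback: get_response() calls
# generate_question() when the extractive Roberta answer comes back empty.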
def download_model_name():
    MODEL_NAME = "VietAI/vit5-base"
    return MODEL_NAME

MODEL_NAME = download_model_name()
tokenizer = T5Tokenizer.from_pretrained(MODEL_NAME, model_max_length=INPUT_MAX_LEN)

class T5Model(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.model = T5ForConditionalGeneration.from_pretrained(MODEL_NAME, return_dict=True)

    def forward(self, input_ids, attention_mask, labels=None):
        output = self.model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            labels=labels
        )
        return output.loss, output.logits

    def training_step(self, batch, batch_idx):
        input_ids = batch["input_ids"].to(DEVICE)
        attention_mask = batch["attention_mask"].to(DEVICE)
        labels = batch["target"].to(DEVICE)
        loss, logits = self(input_ids, attention_mask, labels)
        self.log("train_loss", loss, prog_bar=True, logger=True)
        return {'loss': loss}

    def validation_step(self, batch, batch_idx):
        input_ids = batch["input_ids"].to(DEVICE)
        attention_mask = batch["attention_mask"].to(DEVICE)
        labels = batch["target"].to(DEVICE)
        loss, logits = self(input_ids, attention_mask, labels)
        self.log("val_loss", loss, prog_bar=True, logger=True)
        return {'val_loss': loss}

    def configure_optimizers(self):
        return AdamW(self.parameters(), lr=0.0001)

def load_t5():
    train_model = T5Model.load_from_checkpoint('./data-law/law-model-v1.ckpt')
    train_model.freeze()
    # Keep the fallback model on the same device as the inputs built in generate_question()
    train_model.to(DEVICE)
    return train_model

train_model = load_t5()

def generate_question(question):
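    # Despite its name, this generates an answer for the given question with the
    # fine-tuned ViT5 model, using beam search (4 beams, bigram repetition blocked).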
    inputs_encoding = tokenizer(
        question,
        add_special_tokens=True,
        max_length=INPUT_MAX_LEN,
        padding='max_length',
        truncation='only_first',
        return_attention_mask=True,
        return_tensors="pt"
    ).to(DEVICE)

    generate_ids = train_model.model.generate(
        input_ids=inputs_encoding["input_ids"],
        attention_mask=inputs_encoding["attention_mask"],
        max_length=OUTPUT_MAX_LEN,  # cap generation at the output length, not the input length
        num_beams=4,
        num_return_sequences=1,
        no_repeat_ngram_size=2,
        early_stopping=True,
    )

    preds = [
        tokenizer.decode(gen_id, skip_special_tokens=True, clean_up_tokenization_spaces=True)
        for gen_id in generate_ids
    ]
    response = " ".join(preds[0].split())
    print('T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5T5')
    return response
# st.title("Chatbot Roberta")
# st.write("Hi! Tôi là trợ lý của bạn trong việc trả lời các câu hỏi.")
# text = st.text_input("User: ", key="input")

# if 'chat_history' not in st.session_state:
#     st.session_state['chat_history'] = []

# def get_response(text):
#     st.subheader("The Answer is:")
#     st.write(text)
#     answer, context = chatRoberta(text)
#     result = answer[0]['answer']
#     if result == "":
#         return "Xin lỗi, tôi không thể tìm được đáp án phù hợp cho câu hỏi này ... Hãy thử trả lời bằng câu hỏi khác!"
#     return result

# if st.button("Chat!"):
#     st.session_state['chat_history'].append(("User", text))
#     response = get_response(text)
#     st.subheader("The Response is:")
#     message = st.empty()
#     result = ""
#     for chunk in response:
#         result += chunk
#         message.markdown(result + "❚ ")
#     message.markdown(result)
#     st.session_state['chat_history'].append(("Bot", result))

# for i, (sender, message) in enumerate(st.session_state['chat_history']):
#     if sender == "User":
#         st.text_area(f"User:", value=message, height=100, max_chars=None, key=f"user_{i}")
#     else:
#         st.text_area(f"Bot:", value=message, height=100, max_chars=None, key=f"bot_{i}")
def get_response(text):
    # Replace this function with your own model to get the bot's answer
    # st.subheader("The Answer is:")
    # st.write(text)
    answer, context = chatRoberta(text)
    result = answer[0]['answer']
    if result == "":
        print("Launching T5")
        return generate_question(text)
    return result
st.title("General Law Chatbot") | |
# Khởi tạo lịch sử tin nhắn | |
if "messages" not in st.session_state: | |
st.session_state.messages = [] | |
# Hiển thị các tin nhắn từ lịch sử | |
for message in st.session_state.messages: | |
with st.chat_message(message["role"]): | |
st.markdown(message["content"]) | |
# Nhận input từ người dùng | |
if prompt := st.chat_input("What is up?"): | |
# Thêm tin nhắn của người dùng vào lịch sử | |
st.session_state.messages.append({"role": "user", "content": prompt}) | |
# Hiển thị tin nhắn của người dùng trong giao diện | |
with st.chat_message("user"): | |
st.markdown(prompt) | |
# Lấy câu trả lời từ bot | |
response = get_response(prompt) | |
# Hiển thị câu trả lời của bot trong giao diện | |
with st.chat_message("assistant"): | |
st.markdown(response) | |
# Thêm câu trả lời của bot vào lịch sử | |
st.session_state.messages.append({"role": "assistant", "content": response}) |