import streamlit as st
import torch
import pytorch_lightning as pl
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    pipeline,
    T5Tokenizer,
    T5ForConditionalGeneration,
)
import nltk
from transformers.models.roberta.modeling_roberta import *
from transformers import RobertaForQuestionAnswering
from nltk import word_tokenize
# Explicit imports used by the MRC model and the T5 LightningModule below
# (the wildcard import above may not expose all of these names).
from torch import nn
from torch.nn import CrossEntropyLoss
from torch.optim import AdamW
from transformers import RobertaConfig, RobertaModel, RobertaPreTrainedModel
from transformers.modeling_outputs import QuestionAnsweringModelOutput
import json
import pandas as pd
# import re
# import base64

# Set the background image
# background_image = """
# """
# st.markdown(background_image, unsafe_allow_html=True)

# def set_bg_hack(main_bg):
#     '''
#     A function to unpack an image from root folder and set as bg.
#     Returns
#     -------
#     The background.
#     '''
#     # set bg name
#     main_bg_ext = "png"
#     st.markdown(
#         f"""
#         """,
#         unsafe_allow_html=True
#     )
# set_bg_hack("Background.png")

# image_url = "logo1.png"
# # Display the image without a caption and at a reduced width
# st.image(image_url, width=100)

# Download punkt for nltk
print("===================================================================")

@st.cache_data
def download_nltk_punkt():
    nltk.download('punkt_tab')

# Cache loading PhoBERT model and tokenizer
@st.cache_resource
def load_phoBert():
    model = AutoModelForSequenceClassification.from_pretrained('minhdang14902/Phobert_Law')
    tokenizer = AutoTokenizer.from_pretrained('minhdang14902/Phobert_Law')
    return model, tokenizer

# Call the cached functions
download_nltk_punkt()
phoBert_model, phoBert_tokenizer = load_phoBert()

# Initialize the pipeline with the loaded PhoBERT model and tokenizer
chatbot_pipeline = pipeline("sentiment-analysis", model=phoBert_model, tokenizer=phoBert_tokenizer)

# Load spaCy Vietnamese model
# nlp = spacy.load('vi_core_news_lg')

# Load intents from json file
def load_json_file(filename):
    with open(filename) as f:
        file = json.load(f)
    return file

filename = './Law_2907.json'
intents = load_json_file(filename)

@st.cache_data
def create_df():
    df = pd.DataFrame({
        'Pattern': [],
        'Tag': []
    })
    return df

df = create_df()

@st.cache_data
def extract_json_info(json_file, df):
    for intent in json_file['intents']:
        for pattern in intent['patterns']:
            sentence_tag = [pattern, intent['tag']]
            df.loc[len(df.index)] = sentence_tag
    return df

df = extract_json_info(intents, df)
df2 = df.copy()

labels = df2['Tag'].unique().tolist()
labels = [s.strip() for s in labels]
num_labels = len(labels)
id2label = {id: label for id, label in enumerate(labels)}
label2id = {label: id for id, label in enumerate(labels)}

# def tokenize_with_spacy(text):
#     doc = nlp(text)
#     tokens = [token.text for token in doc]
#     tokenized_text = ' '.join(tokens)
#     tokenized_text = re.sub(r'(?
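# Illustrative note (not executed): the "sentiment-analysis" pipeline above is used as a
# generic text classifier, so for a user question it returns a list shaped like
#   [{'label': '<intent tag>', 'score': 0.97}]
# and label2id[chatbot_pipeline(text)[0]['label']] (see chatRoberta below) recovers the
# intent index used to pick the matching entry in intents['intents']. The tag and score
# shown here are placeholders, not real model output.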
class MRCQuestionAnswering(RobertaPreTrainedModel):
    config_class = RobertaConfig

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.roberta = RobertaModel(config, add_pooling_layer=False)
        self.qa_outputs = nn.Linear(config.hidden_size, config.num_labels)
        self.init_weights()

    def forward(self,
                input_ids=None,
                words_lengths=None,
                attention_mask=None,
                token_type_ids=None,
                position_ids=None,
                head_mask=None,
                inputs_embeds=None,
                start_positions=None,
                end_positions=None,
                output_attentions=None,
                output_hidden_states=None,
                return_dict=None):
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.roberta(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        context_embedding = outputs[0]

        # Build an alignment matrix that pools sub-word embeddings back into
        # word-level embeddings (one row per word, one column per sub-word).
        batch_size = input_ids.shape[0]
        max_sub_word = input_ids.shape[1]
        max_word = words_lengths.shape[1]
        align_matrix = torch.zeros((batch_size, max_word, max_sub_word))

        for i, sample_length in enumerate(words_lengths):
            for j in range(len(sample_length)):
                start_idx = torch.sum(sample_length[:j])
                align_matrix[i][j][start_idx: start_idx + sample_length[j]] = 1 if sample_length[j] > 0 else 0

        align_matrix = align_matrix.to(context_embedding.device)
        context_embedding_align = torch.bmm(align_matrix, context_embedding)

        logits = self.qa_outputs(context_embedding_align)
        start_logits, end_logits = logits.split(1, dim=-1)
        start_logits = start_logits.squeeze(-1).contiguous()
        end_logits = end_logits.squeeze(-1).contiguous()

        total_loss = None
        if start_positions is not None and end_positions is not None:
            # If on multi-GPU, squeeze the extra dimension
            if len(start_positions.size()) > 1:
                start_positions = start_positions.squeeze(-1)
            if len(end_positions.size()) > 1:
                end_positions = end_positions.squeeze(-1)
            # Positions outside the model inputs are ignored in the loss
            ignored_index = start_logits.size(1)
            start_positions = start_positions.clamp(0, ignored_index)
            end_positions = end_positions.clamp(0, ignored_index)

            loss_fct = CrossEntropyLoss(ignore_index=ignored_index)
            start_loss = loss_fct(start_logits, start_positions)
            end_loss = loss_fct(end_logits, end_positions)
            total_loss = (start_loss + end_loss) / 2

        if not return_dict:
            output = (start_logits, end_logits) + outputs[2:]
            return ((total_loss,) + output) if total_loss is not None else output

        return QuestionAnsweringModelOutput(
            loss=total_loss,
            start_logits=start_logits,
            end_logits=end_logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )


# roberta_model_checkpoint = "minhdang14902/Roberta_edu"
# roberta_tokenizer = AutoTokenizer.from_pretrained(roberta_model_checkpoint)
# roberta_model = MRCQuestionAnswering.from_pretrained(roberta_model_checkpoint)

# Cache loading Roberta model and tokenizer
@st.cache_resource
def load_roberta_model():
    model = MRCQuestionAnswering.from_pretrained('minhdang14902/Roberta_Law')
    tokenizer = AutoTokenizer.from_pretrained('minhdang14902/Roberta_Law')
    return model, tokenizer

roberta_model, roberta_tokenizer = load_roberta_model()


def chatRoberta(text):
    # Classify the question into an intent, then extract an answer span from
    # that intent's canned response.
    label = label2id[chatbot_pipeline(text)[0]['label']]
    response = intents['intents'][label]['responses']
    print(response[0])

    QA_input = {
        'question': text,
        'context': response[0]
    }

    # Tokenize input
    encoded_input = tokenize_function(QA_input, roberta_tokenizer)

    # Prepare batch samples
    batch_samples = data_collator([encoded_input], roberta_tokenizer)

    # Model prediction
    roberta_model.eval()
    with torch.no_grad():
        inputs = {
            'input_ids': batch_samples['input_ids'],
            'attention_mask': batch_samples['attention_mask'],
            'words_lengths': batch_samples['words_lengths'],
        }
        outputs = roberta_model(**inputs)

    # Extract answer
    result = extract_answer([encoded_input], outputs, roberta_tokenizer)
    context = response[0]
    return result, context


def tokenize_function(example, tokenizer):
    question_word = word_tokenize(example["question"])
    context_word = word_tokenize(example["context"])

    question_sub_words_ids = [tokenizer.convert_tokens_to_ids(tokenizer.tokenize(w)) for w in question_word]
    context_sub_words_ids = [tokenizer.convert_tokens_to_ids(tokenizer.tokenize(w)) for w in context_word]
    valid = True
    if len([j for i in question_sub_words_ids + context_sub_words_ids for j in i]) > tokenizer.model_max_length - 1:
        valid = False

    question_sub_words_ids = [[tokenizer.bos_token_id]] + question_sub_words_ids + [[tokenizer.eos_token_id]]
    context_sub_words_ids = context_sub_words_ids + [[tokenizer.eos_token_id]]

    input_ids = [j for i in question_sub_words_ids + context_sub_words_ids for j in i]
    if len(input_ids) > tokenizer.model_max_length:
        valid = False

    words_lengths = [len(item) for item in question_sub_words_ids + context_sub_words_ids]

    return {
        "input_ids": input_ids,
        "words_lengths": words_lengths,
        "valid": valid
    }
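# Illustrative sketch of what tokenize_function returns for a toy input (the ids below are
# made up; real values depend on the Roberta_Law vocabulary):
#   tokenize_function({'question': 'Luật là gì ?', 'context': 'Luật là ...'}, roberta_tokenizer)
#   -> {'input_ids': [0, 1132, 58, 204, 68, 2, 1132, 58, 389, 2],   # <s> question </s> context </s>
#       'words_lengths': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1],            # sub-words per word, incl. specials
#       'valid': True}
# words_lengths is what data_collator below pads and what MRCQuestionAnswering uses to pool
# sub-word embeddings back into word-level embeddings.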
def data_collator(samples, tokenizer):
    if len(samples) == 0:
        return {}

    def collate_tokens(values, pad_idx, eos_idx=None, left_pad=False, move_eos_to_beginning=False):
        size = max(v.size(0) for v in values)
        res = values[0].new(len(values), size).fill_(pad_idx)

        def copy_tensor(src, dst):
            assert dst.numel() == src.numel()
            if move_eos_to_beginning:
                assert src[-1] == eos_idx
                dst[0] = eos_idx
                dst[1:] = src[:-1]
            else:
                dst.copy_(src)

        for i, v in enumerate(values):
            copy_tensor(v, res[i][size - len(v):] if left_pad else res[i][:len(v)])
        return res

    input_ids = collate_tokens([torch.tensor(item['input_ids']) for item in samples],
                               pad_idx=tokenizer.pad_token_id)
    attention_mask = torch.zeros_like(input_ids)
    for i in range(len(samples)):
        attention_mask[i][:len(samples[i]['input_ids'])] = 1
    words_lengths = collate_tokens([torch.tensor(item['words_lengths']) for item in samples], pad_idx=0)

    batch_samples = {
        'input_ids': input_ids,
        'attention_mask': attention_mask,
        'words_lengths': words_lengths,
    }
    return batch_samples


def extract_answer(inputs, outputs, tokenizer):
    plain_result = []
    for sample_input, start_logit, end_logit in zip(inputs, outputs.start_logits, outputs.end_logits):
        sample_words_length = sample_input['words_lengths']
        input_ids = sample_input['input_ids']
        answer_start = sum(sample_words_length[:torch.argmax(start_logit)])
        answer_end = sum(sample_words_length[:torch.argmax(end_logit) + 1])

        if answer_start <= answer_end:
            answer = tokenizer.convert_tokens_to_string(
                tokenizer.convert_ids_to_tokens(input_ids[answer_start:answer_end]))
            if answer == tokenizer.bos_token:
                answer = ''
        else:
            answer = ''

        score_start = torch.max(torch.softmax(start_logit, dim=-1)).cpu().detach().numpy().tolist()
        score_end = torch.max(torch.softmax(end_logit, dim=-1)).cpu().detach().numpy().tolist()

        plain_result.append({
            "answer": answer,
            "score_start": score_start,
            "score_end": score_end
        })
    return plain_result


# ======================= ViT5 fallback answer generator =======================
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
INPUT_MAX_LEN = 128   # Adjusted input length
OUTPUT_MAX_LEN = 256  # Adjusted output length

MODEL_NAME = "VietAI/vit5-base"
tokenizer = T5Tokenizer.from_pretrained(MODEL_NAME, model_max_length=INPUT_MAX_LEN)


class T5Model(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.model = T5ForConditionalGeneration.from_pretrained(MODEL_NAME, return_dict=True)

    def forward(self, input_ids, attention_mask, labels=None):
        output = self.model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            labels=labels
        )
        return output.loss, output.logits

    def training_step(self, batch, batch_idx):
        input_ids = batch["input_ids"].to(DEVICE)
        attention_mask = batch["attention_mask"].to(DEVICE)
        labels = batch["target"].to(DEVICE)
        loss, logits = self(input_ids, attention_mask, labels)
        self.log("train_loss", loss, prog_bar=True, logger=True)
        return {'loss': loss}

    def validation_step(self, batch, batch_idx):
        input_ids = batch["input_ids"].to(DEVICE)
        attention_mask = batch["attention_mask"].to(DEVICE)
        labels = batch["target"].to(DEVICE)
        loss, logits = self(input_ids, attention_mask, labels)
        self.log("val_loss", loss, prog_bar=True, logger=True)
        return {'val_loss': loss}

    def configure_optimizers(self):
        return AdamW(self.parameters(), lr=0.0001)


train_model = T5Model.load_from_checkpoint('./data-law/law-model-v1.ckpt')
train_model.freeze()
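# Note (assumption, not in the original code): the restored LightningModule normally stays on
# the CPU, while generate_question moves its encoded inputs to DEVICE. If DEVICE is cuda,
# moving the frozen model there as well avoids a device mismatch, e.g.:
#   train_model = train_model.to(DEVICE)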
def generate_question(question):
    print("tokenizer")
    inputs_encoding = tokenizer(
        question,
        add_special_tokens=True,
        max_length=INPUT_MAX_LEN,
        padding='max_length',
        truncation='only_first',
        return_attention_mask=True,
        return_tensors="pt"
    ).to(DEVICE)

    print("generate id")
    generate_ids = train_model.model.generate(
        input_ids=inputs_encoding["input_ids"],
        attention_mask=inputs_encoding["attention_mask"],
        max_length=INPUT_MAX_LEN,
        num_beams=4,
        num_return_sequences=1,
        no_repeat_ngram_size=2,
        early_stopping=True,
    )

    print("decode")
    preds = [
        tokenizer.decode(gen_id, skip_special_tokens=True, clean_up_tokenization_spaces=True)
        for gen_id in generate_ids
    ]
    response = " ".join(preds[0].split())
    print("T5 generation finished")
    return response


def get_response(text):
    # Replace this function with your own model to get the bot's answer
    # st.subheader("The Answer is:")
    # st.write(text)
    answer, context = chatRoberta(text)
    result = answer[0]['answer']
    if result == "":
        print("Launching T5")
        return generate_question(text)
    return result


# Read the CSV file and build a question -> answer dictionary from it
@st.cache_data
def load_qa_dict():
    df = pd.read_csv("./data-law/Data_law_2807.csv")  # Path to your CSV file
    qa_dict = dict(zip(df['question'], df['answer']))
    return qa_dict

qa_dict = load_qa_dict()

st.title("General Law Chatbot")

# Initialize the message history
if "messages" not in st.session_state:
    st.session_state.messages = []

# Display messages from the history
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])

# Get input from the user
if prompt := st.chat_input("What is up?"):
    # Add the user's message to the history
    st.session_state.messages.append({"role": "user", "content": prompt})

    # Display the user's message in the UI
    with st.chat_message("user"):
        st.markdown(prompt)

    # Check whether the prompt is in the dictionary
    if prompt in qa_dict:
        response = qa_dict[prompt]
    else:
        response = get_response(prompt)

    # Display the bot's answer in the UI
    with st.chat_message("assistant"):
        st.markdown(response)

    # Add the bot's answer to the history
    st.session_state.messages.append({"role": "assistant", "content": response})
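# Usage sketch (file name assumed): known questions are answered by the exact-match CSV lookup,
# recognized intents by the PhoBERT + Roberta extractive path, and anything that yields an empty
# span falls back to the ViT5 generator. Launch the app with:
#   streamlit run app.py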