Spaces:
Running
Running
import streamlit as st | |
import pandas as pd | |
from transformers import BartForConditionalGeneration, TapexTokenizer, T5ForConditionalGeneration, T5Tokenizer | |
import datetime | |
import sentencepiece as spm | |
# Load CSV file | |
df = pd.read_csv("anomalies_with_explanations_pt.csv", quotechar='"', encoding='utf-8') | |
df.rename(columns={"ds": "Ano e mês", "real": "Valor Monetário", "Group": "Grupo"}, inplace=True) | |
df.sort_values(by=['Ano e mês', 'Valor Monetário'], ascending=False, inplace=True) | |
df = df[df['Valor Monetário'] >= 10000000.] | |
df['Valor Monetário'] = df['Valor Monetário'].apply(lambda x: f"{x:.2f}") | |
df = df.fillna('').astype(str) | |
table_data = df | |
# Load translation models | |
pt_en_translator = T5ForConditionalGeneration.from_pretrained("unicamp-dl/translation-pt-en-t5") | |
en_pt_translator = T5ForConditionalGeneration.from_pretrained("unicamp-dl/translation-en-pt-t5") | |
tokenizer = T5Tokenizer.from_pretrained("unicamp-dl/translation-pt-en-t5") | |
# Load TAPEX model | |
tapex_model = BartForConditionalGeneration.from_pretrained("microsoft/tapex-large-finetuned-wtq") | |
tapex_tokenizer = TapexTokenizer.from_pretrained("microsoft/tapex-large-finetuned-wtq") | |
def translate(text, model, tokenizer, source_lang="pt", target_lang="en"): | |
input_ids = tokenizer.encode(text, return_tensors="pt", add_special_tokens=True) | |
outputs = model.generate(input_ids) | |
translated_text = tokenizer.decode(outputs[0], skip_special_tokens=True) | |
return translated_text | |
def find_previous_explanation(row, table_data): | |
current_date = pd.to_datetime(row['Ano e mês']) | |
current_group = row['Grupo'] | |
while True: | |
# Subtract one year to look for the previous year's same month | |
current_date = current_date - pd.DateOffset(years=1) | |
previous_row = table_data[ | |
(table_data['Ano e mês'] == current_date.strftime('%Y-%m-01')) & | |
(table_data['Grupo'] == current_group) | |
] | |
if not previous_row.empty and previous_row.iloc[0]['Explicação']: | |
return f"Em {current_date.strftime('%B de %Y')}, a explicação foi: {previous_row.iloc[0]['Explicação']}" | |
# Stop if we've searched 10 years back without finding anything | |
if current_date.year < pd.to_datetime(row['Ano e mês']).year - 10: | |
break | |
return "Não foi encontrada nenhuma explicação em anos anteriores." | |
def response(user_question, table_data): | |
# Translate question to English | |
question_en = translate(user_question, pt_en_translator, tokenizer, source_lang="pt", target_lang="en") | |
# Generate response in English | |
encoding = tapex_tokenizer(table=table_data, query=[question_en], padding=True, return_tensors="pt", truncation=True) | |
outputs = tapex_model.generate(**encoding) | |
response_en = tapex_tokenizer.batch_decode(outputs, skip_special_tokens=True)[0] | |
# Translate response to Portuguese | |
response_pt = translate(response_en, en_pt_translator, tokenizer, source_lang="en", target_lang="pt") | |
# Check if the response contains a request for an explanation | |
if "Explicação" in user_question: | |
row = table_data[table_data['Explicação'] == response_pt].iloc[0] | |
if not row['Explicação']: | |
response_pt = find_previous_explanation(row, table_data) | |
return response_pt | |
# Streamlit interface | |
st.dataframe(table_data.head()) | |
st.markdown(""" | |
<div style='display: flex; align-items: center;'> | |
<div style='width: 40px; height: 40px; background-color: green; border-radius: 50%; margin-right: 5px;'></div> | |
<div style='width: 40px; height: 40px; background-color: red; border-radius: 50%; margin-right: 5px;'></div> | |
<div style='width: 40px; height: 40px; background-color: yellow; border-radius: 50%; margin-right: 5px;'></div> | |
<span style='font-size: 40px; font-weight: bold;'>Chatbot do Tesouro RS</span> | |
</div> | |
""", unsafe_allow_html=True) | |
# Chat history | |
if 'history' not in st.session_state: | |
st.session_state['history'] = [] | |
# Input box for user question | |
user_question = st.text_input("Escreva sua questão aqui:", "") | |
if user_question: | |
# Add human emoji when user asks a question | |
st.session_state['history'].append(('👤', user_question)) | |
st.markdown(f"**👤 {user_question}**") | |
# Generate the response | |
bot_response = response(user_question, table_data) | |
# Add robot emoji when generating response and align to the right | |
st.session_state['history'].append(('🤖', bot_response)) | |
st.markdown(f"<div style='text-align: right'>**🤖 {bot_response}**</div>", unsafe_allow_html=True) | |
# Clear history button | |
if st.button("Limpar"): | |
st.session_state['history'] = [] | |
# Display chat history | |
for sender, message in st.session_state['history']: | |
if sender == '👤': | |
st.markdown(f"**👤 {message}**") | |
elif sender == '🤖': | |
st.markdown(f"<div style='text-align: right'>**🤖 {message}**</div>", unsafe_allow_html=True) |