import logging
import re
import pandas as pd
import numpy as np
import tensorflow as tf
import nltk
from Sastrawi.Stemmer.StemmerFactory import StemmerFactory
from transformers import AutoTokenizer, TFBertModel
from tensorflow.keras import backend as K
from tensorflow.keras.models import load_model
from tensorflow.keras.layers import Layer
from tensorflow_addons.optimizers import AdamW
import streamlit as st
from nltk.corpus import stopwords
from concurrent.futures import ThreadPoolExecutor
import kagglehub
import os
# Configure the root logger once at import time: DEBUG level, with each record
# formatted as "<timestamp> - <level name> - <message>".
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
def load_dataset():
    """Download the BRImo review dataset from Kaggle and load it as a DataFrame.

    The dataset is fetched with ``kagglehub.dataset_download`` and the file
    ``brimo_googleplaystore_review.csv`` inside the downloaded directory is
    read with its first column used as the index.

    Returns:
        pandas.DataFrame | None: The loaded reviews, or ``None`` when the
        download or the CSV parsing fails. Failures are logged and also
        reported to the user through ``st.error``.
    """
    try:
        path = kagglehub.dataset_download("dannytheodore/brimo-app-review")
        # os.path.join keeps the path valid regardless of the OS separator or
        # a trailing separator on the download directory.
        dataset_path = os.path.join(path, "brimo_googleplaystore_review.csv")
        return pd.read_csv(dataset_path, index_col=0)
    except Exception as e:
        # Best-effort boundary: any failure (network, missing file, bad CSV)
        # is surfaced to the UI instead of crashing the app.
        logging.error(f"Error loading dataset: {e}")
        st.error("Failed to load the dataset.")
        return None
def map_labels(score) -> int:
    """Convert a numeric review score into a sentiment class index.

    Args:
        score: Review rating; compared numerically against 4 and 3.

    Returns:
        int: ``2`` when ``score >= 4``, ``1`` when ``score == 3`` exactly,
        and ``0`` for every other value (including non-integer scores
        between 3 and 4).
    """
    if score >= 4:
        return 2
    if score == 3:
        return 1
    return 0
def preprocess_text(text, stop_words, stemmer):
    """Clean, filter and stem a single review string.

    Steps, in order: lowercase; replace ``@mentions``, ``#hashtags``,
    ``http...`` links and ``www.`` links with a space; replace every
    character that is not an ASCII letter, whitespace or apostrophe with a
    space; split on whitespace; drop tokens found in ``stop_words``; stem
    each remaining token with ``stemmer.stem``; re-join with single spaces.

    Args:
        text: The raw review text.
        stop_words: Container supporting ``in`` with the tokens to discard.
        stemmer: Object exposing a ``stem(word)`` method.

    Returns:
        The cleaned text. If any step raises (for example ``text`` is not a
        string), the error is logged and the original ``text`` is returned
        unchanged.
    """
    try:
        cleaned = text.lower()
        cleaned = re.sub(r"@[A-Za-z0-9_]+", " ", cleaned)
        cleaned = re.sub(r"#[A-Za-z0-9_]+", " ", cleaned)
        cleaned = re.sub(r"http\S+", " ", cleaned)
        # The dot is escaped so only real "www." links are removed; an
        # unescaped dot would also delete ordinary tokens such as "wwwabc".
        cleaned = re.sub(r"www\.\S+", " ", cleaned)
        cleaned = re.sub(r"[^A-Za-z\s']", " ", cleaned)
        tokens = [
            stemmer.stem(word)
            for word in cleaned.split()
            if word not in stop_words
        ]
        return ' '.join(tokens)
    except Exception as e:
        # Deliberate best-effort: one bad review must not abort a batch.
        logging.error(f"Error processing text: {text}\n{e}")
        return text
def preprocess_and_tokenize_reviews(reviews, tokenizer, stop_words, stemmer, max_length=128):
    """Clean a batch of reviews and tokenize them for the model.

    Each review is passed through ``preprocess_text`` using a thread pool;
    ``executor.map`` yields the results in the same order as ``reviews``.
    The cleaned strings are then tokenized in one call.

    Args:
        reviews: Iterable of raw review strings.
        tokenizer: Callable tokenizer accepting a list of strings plus the
            ``padding``, ``truncation``, ``max_length`` and
            ``return_tensors`` keyword arguments.
        stop_words: Stop-word container forwarded to ``preprocess_text``.
        stemmer: Stemmer forwarded to ``preprocess_text``.
        max_length: Length every sequence is padded/truncated to.

    Returns:
        Whatever ``tokenizer`` returns for the cleaned reviews, requested
        as TensorFlow tensors (``return_tensors='tf'``).
    """
    with ThreadPoolExecutor() as executor:
        cleaned_reviews = list(
            executor.map(lambda review: preprocess_text(review, stop_words, stemmer), reviews)
        )
    return tokenizer(
        cleaned_reviews,
        padding='max_length',
        truncation=True,
        max_length=max_length,
        return_tensors='tf',
    )
class BertLayer(Layer):
    """Keras layer that wraps a transformer model and exposes its hidden states.

    Args:
        base_model: Model called with ``input_ids`` and ``attention_mask``
            keyword arguments; its result must expose ``last_hidden_state``.
        **kwargs: Forwarded to ``Layer.__init__``.
    """

    def __init__(self, base_model, **kwargs):
        super().__init__(**kwargs)
        self.base_model = base_model

    def call(self, inputs):
        """Run the wrapped model.

        Args:
            inputs: A pair ``(input_ids, attention_mask)``.

        Returns:
            The ``last_hidden_state`` attribute of the wrapped model's output.
        """
        input_ids, attention_mask = inputs
        outputs = self.base_model(input_ids=input_ids, attention_mask=attention_mask)
        return outputs.last_hidden_state

    def get_config(self):
        """Return the layer config extended with the ``base_model`` entry."""
        config = super().get_config()
        # NOTE(review): the model object itself is stored in the config; this
        # is presumably not JSON-serializable -- confirm that saving and
        # reloading the model works with the formats in use.
        config.update({"base_model": self.base_model})
        return config
class PoolerLayer(Layer):
    """Keras layer that pools a sequence by taking its first position.

    The layer has no parameters of its own; ``**kwargs`` are forwarded to
    ``Layer.__init__``.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, inputs):
        """Select index 0 along the second axis and apply ``tanh``.

        Args:
            inputs: Rank-3 tensor (indexed as ``inputs[:, 0, :]``);
                presumably ``(batch, sequence, hidden)`` with the [CLS]
                token first -- confirm against the upstream layer.

        Returns:
            The ``tanh`` activation of the selected slice.
        """
        cls_token = inputs[:, 0, :]
        return tf.keras.activations.tanh(cls_token)
class F1Score(tf.keras.metrics.Metric):
    """Streaming F1 metric that treats class index 0 as the non-counted class.

    Both ``y_true`` and ``y_pred`` are reduced with ``argmax`` over the last
    axis, so they are expected to carry one score per class in that axis.
    Three running counters are kept:

    * true positives  -- prediction equals the label and the label is not 0;
    * false positives -- prediction differs from the label and the
      prediction is not 0;
    * false negatives -- prediction differs from the label and the label is
      not 0.
    """

    def __init__(self, name="f1_score", **kwargs):
        super().__init__(name=name, **kwargs)
        self.true_positives = self.add_weight(name="tp", initializer="zeros")
        self.false_positives = self.add_weight(name="fp", initializer="zeros")
        self.false_negatives = self.add_weight(name="fn", initializer="zeros")

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate the counts for one batch.

        Args:
            y_true: Labels with one entry per class in the last axis.
            y_pred: Predicted scores with one entry per class in the last axis.
            sample_weight: Accepted for API compatibility; not used.
        """
        y_pred = tf.argmax(y_pred, axis=-1)
        y_true = tf.argmax(y_true, axis=-1)

        matches = tf.equal(y_true, y_pred)
        mismatches = tf.logical_not(matches)
        true_is_counted = tf.not_equal(y_true, 0)
        pred_is_counted = tf.not_equal(y_pred, 0)

        tp = tf.reduce_sum(tf.cast(tf.logical_and(matches, true_is_counted), tf.float32))
        fp = tf.reduce_sum(tf.cast(tf.logical_and(mismatches, pred_is_counted), tf.float32))
        fn = tf.reduce_sum(tf.cast(tf.logical_and(mismatches, true_is_counted), tf.float32))

        # Without these updates the counters stay at zero and result() is
        # always 0, regardless of the predictions.
        self.true_positives.assign_add(tp)
        self.false_positives.assign_add(fp)
        self.false_negatives.assign_add(fn)

    def result(self):
        """Return the F1 score computed from the accumulated counters.

        ``K.epsilon()`` is added to each denominator to avoid division by
        zero when a counter sum is empty.
        """
        precision = self.true_positives / (self.true_positives + self.false_positives + K.epsilon())
        recall = self.true_positives / (self.true_positives + self.false_negatives + K.epsilon())
        return 2 * (precision * recall) / (precision + recall + K.epsilon())

    def reset_state(self):
        """Zero all counters so the metric can start a fresh accumulation."""
        self.true_positives.assign(0.0)
        self.false_positives.assign(0.0)
        self.false_negatives.assign(0.0)
def load_model_and_tokenizer():
    """Load the saved Keras model and the IndoBERT tokenizer.

    The model is read from ``best_model.h5`` in the current working
    directory with the custom classes it needs, then recompiled with
    ``AdamW`` and the ``F1Score`` metric. The tokenizer is fetched with
    ``AutoTokenizer.from_pretrained('indobenchmark/indobert-base-p1')``.

    Returns:
        tuple: ``(model, tokenizer)`` on success, or ``(None, None)`` when
        the model file is missing, the model fails to load, or the
        tokenizer fails to load. Each failure is reported via ``st.error``.
    """
    model_path = 'best_model.h5'
    if not os.path.exists(model_path):
        st.error("Model file not found. Please check the file path.")
        return None, None

    try:
        model = load_model(
            model_path,
            custom_objects={
                'TFBertModel': TFBertModel,
                'BertLayer': BertLayer,
                'PoolerLayer': PoolerLayer,
                'F1Score': F1Score,
            },
        )
    except Exception as e:
        logging.error(f"Error loading model: {e}")
        st.error("Failed to load the model. Please check the model file and try again.")
        return None, None

    optimizer = AdamW(learning_rate=2e-5, weight_decay=1e-5)
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=[F1Score()])

    try:
        tokenizer = AutoTokenizer.from_pretrained('indobenchmark/indobert-base-p1')
    except Exception as e:
        logging.error(f"Error loading tokenizer: {e}")
        st.error("Failed to load the tokenizer. Please check the tokenizer files.")
        return None, None

    return model, tokenizer
# Display name for each predicted class index (0, 1, 2).
sentiment_map = {0: 'Negative', 1: 'Neutral', 2: 'Positive'}
def run(model, tokenizer, stop_words, stemmer):
    """Render the Streamlit page and classify the review the user submits.

    Shows a form with a text area and a submit button. After submission the
    review is cleaned and tokenized with ``preprocess_and_tokenize_reviews``,
    passed to ``model.predict``, and the class with the highest score is
    displayed using ``sentiment_map``.

    Args:
        model: Loaded Keras model, or ``None`` if loading failed.
        tokenizer: Tokenizer forwarded to ``preprocess_and_tokenize_reviews``.
        stop_words: Stop-word container forwarded to preprocessing.
        stemmer: Stemmer forwarded to preprocessing.
    """
    st.title('Sentiment Analysis using IndoBERT')
    st.subheader('This application analyzes the sentiment of user-provided reviews.')

    with st.form(key='review_form'):
        review_input = st.text_area("Enter Review:", height=150)
        submit_button = st.form_submit_button("Analyze Sentiment")

    if not submit_button:
        return

    # Whitespace-only input carries no review text, so treat it as empty.
    if not review_input or not review_input.strip():
        st.error("Please enter a review to analyze.")
        return

    # Check the model before doing any preprocessing work.
    if model is None:
        st.error("Model is not loaded. Please check the model file and try again.")
        return

    tokenized_review = preprocess_and_tokenize_reviews([review_input], tokenizer, stop_words, stemmer)
    predictions = model.predict({
        'input_ids': tokenized_review['input_ids'],
        'attention_mask': tokenized_review['attention_mask'],
    })
    # Only one review is submitted, so the first row holds its prediction.
    predicted_label = int(np.argmax(predictions, axis=-1)[0])
    sentiment = sentiment_map[predicted_label]
    st.write(f"### Predicted Sentiment: {sentiment}")
if __name__ == "__main__":
    # Entry point: load every resource, then start the UI only when all of
    # them are available; otherwise report which resource failed.
    df = load_dataset()
    model, tokenizer = load_model_and_tokenizer()

    if df is not None and model is not None and tokenizer is not None:
        manual_stopwords = ["di", "ke", "dari", "yang", "dan", "atau", "dengan", "untuk", "ini", "itu", "aja", "saja", "lah", "bri", "brimo", "aplikasi", "rekening", "coba", "yg", "ke", "untuk", "nya", "saya", "dia", "dan", "sangat", "video", "login", "apk", "jadi", "akun", "malah", "uang", "banget", "dalam", "atm", "padahal"]
        try:
            stop_words = set(stopwords.words('indonesian'))
        except LookupError:
            # NLTK raises LookupError when the corpus is not installed
            # locally; fetch it once and retry.
            nltk.download('stopwords', quiet=True)
            stop_words = set(stopwords.words('indonesian'))
        # The domain-specific list was defined but never applied; merge it so
        # those words are actually filtered during preprocessing.
        stop_words.update(manual_stopwords)

        factory = StemmerFactory()
        stemmer = factory.create_stemmer()

        df['label'] = df['score'].apply(map_labels)
        run(model, tokenizer, stop_words, stemmer)
    else:
        if df is None:
            logging.error("Failed to load dataset.")
            st.error("Failed to load the dataset. Please check the dataset file.")
        if model is None or tokenizer is None:
            logging.error("Failed to load model or tokenizer.")
            st.error("Failed to load the model or tokenizer. Please check the model file.")