# Gradio app: classify Korean news articles as clickbait and, when clickbait
# is detected, generate a new headline with KoBART.
import pandas as pd
import numpy as np
# from konlpy.tag import Okt
from string import whitespace, punctuation
import re
import unicodedata
from sentence_transformers import SentenceTransformer, util
import gradio as gr
import pytorch_lightning as pl
import torch
from transformers import PreTrainedTokenizerFast, BartForConditionalGeneration
from transformers.optimization import get_cosine_schedule_with_warmup
from torch.utils.data import DataLoader, Dataset


# Classification
def CleanEnd(text):
    """Strip e-mail addresses, URLs, trailing byline/copyright phrases, and
    leading bracketed tags from Korean news text."""
    email = re.compile(
        r'[-_0-9a-z]+@[-_0-9a-z]+(?:\.[0-9a-z]+)+', flags=re.IGNORECASE)
    url = re.compile(
        r'(?:https?:\/\/)?[-_0-9a-z]+(?:\.[-_0-9a-z]+)+', flags=re.IGNORECASE)
    etc = re.compile(
        r'\.([^\.]*(?:기자|특파원|교수|작가|대표|논설|고문|주필|부문장|팀장|장관|원장|연구원|이사장|위원|실장|차장|부장|에세이|화백|사설|소장|단장|과장|기획자|큐레이터|저작권|평론가|©|ⓒ|\@|\/|=|▶|무단|전재|재배포|금지|\[|\]|\(\))[^\.]*)$')
    bracket = re.compile(r'^((?:\[.+\])|(?:【.+】)|(?:<.+>)|(?:‘.+’)\s)')
    result = email.sub('', text)
    result = url.sub('', result)
    result = etc.sub('.', result)
    result = bracket.sub('', result).strip()
    return result
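

# Illustrative behaviour of CleanEnd (hypothetical input, shown for clarity):
#   CleanEnd('[단독] 제목입니다. 홍길동 기자 hong@news.com')
# drops the e-mail address, the trailing reporter byline, and the leading
# bracket tag, leaving roughly '제목입니다.'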


def TextFilter(text):
    """Collapse whitespace/punctuation and keep only Hangul, spaces, and '%'."""
    punct = ''.join([ch for ch in punctuation if ch != '%'])
    filtering = re.compile(f'[{whitespace}{punct}]+')
    onlyText = re.compile(r'[^\% ㄱ-ㅣ가-힣]+')
    result = filtering.sub(' ', text)
    result = onlyText.sub(' ', result).strip()
    result = filtering.sub(' ', result)
    return result
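

# Illustrative behaviour of TextFilter (hypothetical input):
#   TextFilter('물가 3.5% 상승!!')  ->  roughly '물가 % 상승'
# Digits and punctuation other than '%' fall outside the allowed character
# set and are replaced by single spaces.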


def is_clickbait(title, content, threshold=0.815):
    """Return (label, similarity): label 0 means clickbait, 1 means non-clickbait.

    Title and body are cleaned, embedded with the fine-tuned
    SentenceTransformer, and compared by cosine similarity; a similarity
    below `threshold` is treated as clickbait.
    """
    # Note: the model is reloaded on every call; the original behaviour is
    # kept here, but caching it at module level would be faster.
    model = SentenceTransformer('./model/onlineContrastive')
    pattern_whitespace = re.compile(f'[{whitespace}]+')
    title = unicodedata.normalize('NFC', re.sub(
        pattern_whitespace, ' ', title)).strip()
    title = CleanEnd(title)
    title = TextFilter(title)
    content = unicodedata.normalize('NFC', re.sub(
        pattern_whitespace, ' ', content)).strip()
    content = CleanEnd(content)
    content = TextFilter(content)
    # Noun extraction (disabled; requires konlpy)
    # okt = Okt()
    # title = ' '.join(okt.nouns(title))
    # content = ' '.join(okt.nouns(content))
    # Compute embeddings
    embeddings1 = model.encode(title, convert_to_tensor=True)
    embeddings2 = model.encode(content, convert_to_tensor=True)
    # Compute cosine similarity between title and body embeddings
    cosine_score = util.cos_sim(embeddings1, embeddings2)
    similarity = cosine_score.cpu().numpy()[0][0]  # .cpu() keeps this safe if a GPU is used
    if similarity < threshold:
        return 0, similarity  # clickbait
    else:
        return 1, similarity  # non-clickbait
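

# Usage sketch (illustrative only; assumes the local checkpoint
# './model/onlineContrastive' loaded above is available):
#   label, score = is_clickbait('제목 예시', '본문 예시입니다.')
#   label == 0  ->  treated as clickbait, so a new headline will be generated.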


# Generation
# Placeholder frame: the Lightning Trainer below is configured with
# max_epochs=0, so these dummy rows only satisfy the DataModule interface.
df_train = pd.DataFrame()
df_train['input_text'] = ['1', '2']
df_train['target_text'] = ['1', '2']


def CleanEnd_g(text):
    """Lighter cleanup for generated headlines: only e-mail addresses are
    removed; the URL/byline/bracket rules from CleanEnd stay disabled."""
    email = re.compile(
        r'[-_0-9a-z]+@[-_0-9a-z]+(?:\.[0-9a-z]+)+', flags=re.IGNORECASE)
    # url = re.compile(r'(?:https?:\/\/)?[-_0-9a-z]+(?:\.[-_0-9a-z]+)+', flags=re.IGNORECASE)
    result = email.sub('', text)
    # result = url.sub('', result)
    return result


class DatasetFromDataframe(Dataset):
    """Wraps a DataFrame with 'input_text'/'target_text' columns for KoBART."""

    def __init__(self, df, dataset_args):
        self.data = df
        self.max_length = dataset_args['max_length']
        self.tokenizer = dataset_args['tokenizer']
        self.start_token = '<s>'
        self.end_token = '</s>'

    def __len__(self):
        return len(self.data)

    def create_tokens(self, text):
        # Encode '<s>' + text + '</s>', then pad with pad_token_id (or truncate
        # and re-append the end token) so every sample is exactly max_length.
        tokens = self.tokenizer.encode(
            self.start_token + text + self.end_token)
        tokenLength = len(tokens)
        remain = self.max_length - tokenLength
        if remain >= 0:
            tokens = tokens + [self.tokenizer.pad_token_id] * remain
            attention_mask = [1] * tokenLength + [0] * remain
        else:
            tokens = tokens[: self.max_length - 1] + \
                self.tokenizer.encode(self.end_token)
            attention_mask = [1] * self.max_length
        return tokens, attention_mask

    def __getitem__(self, index):
        record = self.data.iloc[index]
        question, answer = record['input_text'], record['target_text']
        input_id, input_mask = self.create_tokens(question)
        output_id, output_mask = self.create_tokens(answer)
        # Labels are the decoder inputs shifted left by one position; padding
        # positions are set to -100 so the loss ignores them.
        label = output_id[1:(self.max_length + 1)]
        label = label + (self.max_length - len(label)) * [-100]
        return {
            'input_ids': torch.LongTensor(input_id),
            'attention_mask': torch.LongTensor(input_mask),
            'decoder_input_ids': torch.LongTensor(output_id),
            'decoder_attention_mask': torch.LongTensor(output_mask),
            'labels': torch.LongTensor(label)
        }
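

# Illustrative instantiation (hypothetical names; `tokenizer` is only defined
# inside generation() below):
#   ds = DatasetFromDataframe(df_train, {'tokenizer': tokenizer, 'max_length': 128})
#   ds[0]  ->  dict of five LongTensors, each of length 128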


class OneSourceDataModule(pl.LightningDataModule):
    def __init__(self, **kwargs):
        super().__init__()
        self.data = kwargs.get('data')
        self.dataset_args = kwargs.get('dataset_args')
        self.batch_size = kwargs.get('batch_size') or 32
        self.train_size = kwargs.get('train_size') or 0.9

    def setup(self, stage=""):
        # No real split is performed: the same placeholder frame backs both
        # the train and validation sets.
        # trainset, testset = train_test_split(df_train, train_size=self.train_size, shuffle=True)
        self.trainset = DatasetFromDataframe(df_train, self.dataset_args)
        self.testset = DatasetFromDataframe(df_train, self.dataset_args)

    def train_dataloader(self):
        return DataLoader(self.trainset, batch_size=self.batch_size)

    def val_dataloader(self):
        return DataLoader(self.testset, batch_size=self.batch_size)

    def test_dataloader(self):
        return DataLoader(self.testset, batch_size=self.batch_size)
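

# Illustrative wiring (hypothetical values, mirroring generation() below):
#   dm = OneSourceDataModule(data=df_train, batch_size=32,
#                            dataset_args={'tokenizer': tokenizer, 'max_length': 128})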


class KoBARTConditionalGeneration(pl.LightningModule):
    def __init__(self, hparams, **kwargs):
        super().__init__()
        self.hparams.update(hparams)
        self.model = kwargs['model']
        self.tokenizer = kwargs['tokenizer']
        self.model.train()

    def configure_optimizers(self):
        # AdamW with weight decay on everything except biases and LayerNorm.
        param_optimizer = list(self.model.named_parameters())
        no_decay = ['bias', 'LayerNorm.bias', 'LayerNorm.weight']
        optimizer_grouped_parameters = [{
            'params': [
                p for n, p in param_optimizer if not any(nd in n for nd in no_decay)
            ],
            'weight_decay': 0.01
        }, {
            'params': [
                p for n, p in param_optimizer if any(nd in n for nd in no_decay)
            ],
            'weight_decay': 0.0
        }]
        optimizer = torch.optim.AdamW(
            optimizer_grouped_parameters,
            lr=self.hparams.lr
        )
        data_len = len(self.train_dataloader().dataset)
        print(f'학습 데이터 수: {data_len}')  # number of training samples
        num_train_steps = int(
            data_len / self.hparams.batch_size * self.hparams.max_epochs)
        print(f'Step 수: {num_train_steps}')  # total optimisation steps
        num_warmup_steps = int(num_train_steps * self.hparams.warmup_ratio)
        print(f'Warmup Step 수: {num_warmup_steps}')
        scheduler = get_cosine_schedule_with_warmup(
            optimizer,
            num_warmup_steps=num_warmup_steps,
            num_training_steps=num_train_steps
        )
        lr_scheduler = {
            'scheduler': scheduler,
            'monitor': 'loss',
            'interval': 'step',
            'frequency': 1
        }
        return [optimizer], [lr_scheduler]
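
    # Note: with 'interval': 'step', the cosine schedule advances once per
    # optimisation step; warmup_ratio=0.1 therefore ramps the learning rate
    # up over roughly the first 10% of the computed steps before decaying it.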

    def forward(self, inputs):
        return self.model(
            input_ids=inputs['input_ids'],
            attention_mask=inputs['attention_mask'],
            decoder_input_ids=inputs['decoder_input_ids'],
            decoder_attention_mask=inputs['decoder_attention_mask'],
            labels=inputs['labels'],
            return_dict=True
        )

    def training_step(self, batch, batch_idx):
        loss = self(batch).loss
        return loss

    def validation_step(self, batch, batch_idx):
        loss = self(batch).loss
        return loss

    def test(self, text):
        # Inference helper: beam-search generation for a single input string.
        tokens = self.tokenizer.encode('<s>' + text + '</s>')
        tokenLength = len(tokens)
        remain = self.hparams.max_length - tokenLength
        if remain >= 0:
            tokens = tokens + [self.tokenizer.pad_token_id] * remain
            attention_mask = [1] * tokenLength + [0] * remain
        else:
            tokens = tokens[: self.hparams.max_length - 1] + \
                self.tokenizer.encode('</s>')
            attention_mask = [1] * self.hparams.max_length
        tokens = torch.LongTensor([tokens])
        attention_mask = torch.LongTensor([attention_mask])
        result = self.model.generate(
            tokens,
            max_length=self.hparams.max_length,
            attention_mask=attention_mask,
            num_beams=10
        )[0]
        return self.tokenizer.decode(result)
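
    # Illustrative call (hypothetical): once the module is built in
    # generation() below, model2.test('요약문 예시') returns a decoded
    # headline string of at most max_length tokens.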


def generation(szContent):
    """Summarise the article body with KoBART, then turn the summary into a
    new headline with the fine-tuned KoBART headline model."""
    tokenizer = PreTrainedTokenizerFast.from_pretrained(
        "gogamza/kobart-summarization")
    model1 = BartForConditionalGeneration.from_pretrained(
        "gogamza/kobart-summarization")
    # Keep the summariser input short: only the first 500 characters are used.
    if len(szContent) > 500:
        input_ids = tokenizer.encode(szContent[:500], return_tensors="pt")
    else:
        input_ids = tokenizer.encode(szContent, return_tensors="pt")
    summary = model1.generate(
        input_ids=input_ids,
        bos_token_id=model1.config.bos_token_id,
        eos_token_id=model1.config.eos_token_id,
        length_penalty=.3,  # bigger than 1 = longer, smaller than 1 = shorter summary
        max_length=35,
        min_length=25,
        num_beams=5)
    szSummary = tokenizer.decode(summary[0], skip_special_tokens=True)
    print(szSummary)
    KoBARTModel = BartForConditionalGeneration.from_pretrained(
        './model/final2.h5')
    BATCH_SIZE = 32
    MAX_LENGTH = 128
    EPOCHS = 0  # no fine-tuning happens at inference time (see trainer.fit below)
    model2 = KoBARTConditionalGeneration({
        "lr": 5e-6,
        "warmup_ratio": 0.1,
        "batch_size": BATCH_SIZE,
        "max_length": MAX_LENGTH,
        "max_epochs": EPOCHS
    },
        tokenizer=tokenizer,
        model=KoBARTModel
    )
    dm = OneSourceDataModule(
        data=df_train,
        batch_size=BATCH_SIZE,
        train_size=0.9,
        dataset_args={
            "tokenizer": tokenizer,
            "max_length": MAX_LENGTH,
        }
    )
    trainer = pl.Trainer(
        max_epochs=EPOCHS,
        gpus=0
    )
    # With max_epochs=0 this performs no weight updates; it only wires the
    # LightningModule and DataModule together so model2.test() can run.
    trainer.fit(model2, dm)
    szTitle = model2.test(szSummary)
    df = pd.DataFrame()
    df['newTitle'] = [szTitle]
    df['content'] = [szContent]
    # Whitespace and punctuation cleanup on the generated headline
    pattern_whitespace = re.compile(f'[{whitespace}]+')
    df['newTitle'] = df.newTitle.fillna('').replace(pattern_whitespace, ' ').map(
        lambda x: unicodedata.normalize('NFC', x)).str.strip()
    df['newTitle'] = df.newTitle.map(CleanEnd_g)
    df['newTitle'] = df.newTitle.map(TextFilter)
    return df.newTitle[0]
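

# Illustrative call (hypothetical article text; downloads
# gogamza/kobart-summarization on first use and expects ./model/final2.h5
# to be present locally):
#   new_title = generation('기사 본문 ...')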


def new_headline(title, content):
    """Gradio callback: classify the (title, content) pair and, if it looks
    like clickbait, return a newly generated headline."""
    label = is_clickbait(title, content)
    if label[0] == 0:
        return generation(content)
    elif label[0] == 1:
        return '낚시성 기사가 아닙니다.'  # "This is not a clickbait article."


# Gradio UI
with gr.Blocks() as demo1:
    gr.Markdown(
        """
        <h1 align="center">
        Clickbait news classifier and new headline generator
        </h1>
        """)
    # Korean description: "Enter a news title and body; the app classifies
    # whether the article is clickbait and, if so, generates a new headline."
    gr.Markdown(
        """
        뉴스 기사 제목과 본문을 입력하면 낚시성 기사인지 분류하고,
        낚시성 기사이면 새로운 제목을 생성해주는 프로그램입니다.
        """)
    with gr.Row():
        with gr.Column():
            inputs = [gr.Textbox(placeholder="뉴스기사 제목을 입력해주세요", label='headline'),
                      gr.Textbox(
                          lines=10, placeholder="뉴스기사 본문을 입력해주세요", label='content')]
            with gr.Row():
                btn = gr.Button("결과 출력")  # "Show result"
        with gr.Column():
            output = gr.Text(label='Result')
    btn.click(fn=new_headline, inputs=inputs, outputs=output)

if __name__ == "__main__":
    demo1.launch()