from tqdm import tqdm
import numpy as np
import torch
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score
from sklearn.utils import shuffle
import random
import datetime as dt
import os
from glob import glob
from spacy.lang.en import English
import inspect
def checkpoint_save(model, val_loss, checkpoint_dir=None, wandb_name=None):
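    """Save model.state_dict() under checkpoint_dir with the date, val_loss value, and
    wandb_name encoded in the file name, then clean up this run's other same-day
    checkpoints (see the comment below on which file is kept)."""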
if checkpoint_dir is None:
checkpoint_dir = './save_model'
if not os.path.isdir(checkpoint_dir):
os.mkdir(checkpoint_dir)
x = dt.datetime.now()
y = x.year
m = x.month
d = x.day
if wandb_name is None:
wandb_name = "testing"
    torch.save(model.state_dict(), os.path.join(checkpoint_dir, "{}_{}_{}_{:.4f}_{}.pt".format(y, m, d, val_loss, wandb_name)))
    # Remove this run's other same-day checkpoints, keeping only the one with the largest
    # value in its file name (argmax: the tracked quantity is treated as higher-is-better).
    saved_dict_list = glob(os.path.join(checkpoint_dir, '{}_{}_{}_*_{}.pt'.format(y, m, d, wandb_name)))
    val_loss_list = np.array([float(os.path.basename(path).split("_")[3]) for path in saved_dict_list])
    saved_dict_list.pop(val_loss_list.argmax())
for i in saved_dict_list:
os.remove(i)
def set_seed(seed):
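    """Seed PyTorch (CPU and all GPUs), NumPy, and Python's random module, and put cuDNN
    into deterministic mode so that runs are reproducible."""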
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
random.seed(seed)
def accuracy_per_class(preds, labels):
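    """Print per-class correct/total counts for the six section labels
    (Abstract, Intro, Main, Method, Summary, Caption)."""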
label_dict = {'Abstract':0, 'Intro':1, 'Main':2, 'Method':3, 'Summary':4, 'Caption':5}
label_dict_inverse = {v: k for k, v in label_dict.items()}
class_list = []
acc_list = []
for label in list(label_dict.values()):
y_preds = preds[labels==label]
y_true = labels[labels==label]
class_list.append(label_dict_inverse[label])
acc_list.append("{0}/{1}".format(len(y_preds[y_preds==label]), len(y_true)))
print("{:10} {:10} {:10} {:10} {:10} {:10}".format(class_list[0], class_list[1], class_list[2], class_list[3], class_list[4], class_list[5]))
print("{:10} {:10} {:10} {:10} {:10} {:10}".format(acc_list[0], acc_list[1], acc_list[2], acc_list[3], acc_list[4], acc_list[5]))
def compute_metrics(output, target, task_type='onehot'):
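    """Compute accuracy and macro-averaged precision/recall/F1 from model outputs.
    'onehot' targets are argmax-decoded, 'scalar' targets are used as-is; per-class counts
    are printed via accuracy_per_class. Returns [accuracy, precision, recall, f1]."""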
if task_type=='onehot':
pred=np.argmax(output, axis=1).flatten()
labels=np.argmax(target, axis=1).flatten()
elif task_type=='scalar':
pred=np.argmax(output, axis=1).flatten()
labels=np.array(target).flatten()
accuracy = accuracy_score(y_true=labels, y_pred=pred)
recall = recall_score(y_true=labels, y_pred=pred, average='macro')
precision = precision_score(y_true=labels, y_pred=pred, average='macro', zero_division=0)
f1 = f1_score(y_true=labels, y_pred=pred, average='macro')
accuracy_per_class(pred, labels)
return [accuracy, precision, recall, f1]
def input_check(input_dict, model):
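    """Keep only the entries of input_dict whose keys appear in model.forward's signature,
    so one batch dict can be fed to models with different argument lists."""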
model_inputs = inspect.signature(model.forward).parameters.keys()
inputs = {}
for key, val in input_dict.items():
if key in model_inputs:
inputs[key] = val
return inputs
def model_eval(model, device, loader, task_type='onehot', return_values=False, sentence_piece=False):
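    """Evaluate the model on loader without gradient tracking: average the loss over
    batches and compute accuracy/precision/recall/F1 over all predictions. The model is
    expected to take a dict of inputs and return (loss, logits, ...). With
    return_values=True the collected targets, outputs, and input texts are returned too."""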
model.eval()
error = 0
accuracy = 0
precision = 0
recall = 0
f1 = 0
eval_targets=[]
eval_outputs=[]
eval_texts=[]
with torch.no_grad():
for data in tqdm(loader):
eval_texts.extend(data['text'])
input_ids=data['input_ids'].to(device, dtype=torch.long)
mask = data['attention_mask'].to(device, dtype=torch.long)
token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
if task_type=='onehot':
targets=data['label_onehot'].to(device, dtype=torch.float)
elif task_type=='scalar':
targets=data['label'].to(device, dtype=torch.long)
position = data['position']
inputs = {'input_ids': input_ids, 'attention_mask': mask, 'token_type_ids': token_type_ids,
'labels': targets, 'position': position}
if sentence_piece:
sentence_batch = data['sentence_batch'].to(device, dtype=torch.long)
inputs = {'input_ids': input_ids, 'attention_mask': mask, 'token_type_ids': token_type_ids,
'labels': targets, 'sentence_batch': sentence_batch, 'position': position}
outputs = model(inputs)
output = outputs[1]
loss = outputs[0]
#loss=loss_fn(output, targets)
error+=loss
#output = torch.sigmoid(output)
eval_targets.extend(targets.detach().cpu().numpy())
eval_outputs.extend(output.detach().cpu().numpy())
error = error / len(loader)
accuracy, precision, recall, f1 = compute_metrics(eval_outputs, eval_targets, task_type=task_type)
if return_values:
return [error, accuracy, precision, recall, f1, eval_targets, eval_outputs, eval_texts]
else:
return [error, accuracy, precision, recall, f1]
def get_hidden(model, device, loader, task_type='onehot', sentence_piece=False):
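    """Run the model over loader and collect the hidden states (third element of the
    model output) together with the corresponding targets."""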
model.eval()
total_hidden_state = []
total_targets=[]
with torch.no_grad():
for data in tqdm(loader):
input_ids=data['input_ids'].to(device, dtype=torch.long)
mask = data['attention_mask'].to(device, dtype=torch.long)
token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
if task_type=='onehot':
targets=data['label_onehot'].to(device, dtype=torch.float)
elif task_type=='scalar':
targets=data['label'].to(device, dtype=torch.long)
position = data['position']
inputs = {'input_ids': input_ids, 'attention_mask': mask, 'token_type_ids': token_type_ids,
'labels': targets, 'position': position}
if sentence_piece:
sentence_batch = data['sentence_batch'].to(device, dtype=torch.long)
inputs = {'input_ids': input_ids, 'attention_mask': mask, 'token_type_ids': token_type_ids,
'labels': targets, 'sentence_batch': sentence_batch, 'position': position}
outputs = model(inputs)
hidden_state = outputs[2]
total_hidden_state.extend(hidden_state.detach().cpu().numpy())
total_targets.extend(targets.detach().cpu().numpy())
return total_hidden_state, total_targets
def sentencepiece(paragraph_list, spacy_nlp, tokenizer, max_length=512):
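    """Split each paragraph into sentences with spaCy, tokenize every sentence, strip the
    per-sentence padding, and concatenate the sentences back into a single sequence of at
    most max_length tokens. token_type_ids alternate 0/1 per sentence, sentence_batch holds
    a sentence index per token, and padding is re-added at the start or end depending on
    where the tokenizer pads."""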
    # Note: token_type_ids are filled manually here (alternating 0/1 per sentence) rather than
    # taken from the tokenizer; models such as XLNet, where e.g. CLS gets segment id 2, break
    # this rule, so this may need fixing later if it causes problems.
encode_datas = {'input_ids': [], 'token_type_ids': [], 'attention_mask': [], 'sentence_batch': []}
for paragraph in paragraph_list:
doc = spacy_nlp(paragraph)
sentence_encode = [sent.text for sent in doc.sents]
sentence_encode = tokenizer.batch_encode_plus(sentence_encode, max_length=max_length, padding='max_length', return_attention_mask=True, return_token_type_ids=True)
sentence_list = sentence_encode['input_ids']
mask_list = sentence_encode['attention_mask']
pad_token = None
pad_position = None
total_sentence = torch.tensor([], dtype=torch.int)
token_type_ids = []
s_batch = []
for n, s in enumerate(sentence_list):
            if pad_token is None:
                # Infer the padding token id from the first padded position
                # (assumes every tokenized sentence is shorter than max_length).
                pad_token = s[mask_list[n].index(0)]
if pad_position is None:
if s[0] == pad_token:
pad_position = 'start'
else:
pad_position = 'end'
s=torch.tensor(s, dtype=torch.int)
s = s[s!=pad_token]
total_length = len(total_sentence) + len(s)
if total_length > max_length:
break
total_sentence = torch.concat([total_sentence, s])
token_type_ids = token_type_ids + [n%2]*len(s)
s_batch = s_batch + [n]*len(s)
total_sentence = total_sentence.tolist()
pad_length = max_length - len(total_sentence)
attention_mask = [1]*len(total_sentence)
if pad_position == 'end':
total_sentence = total_sentence + [pad_token]*pad_length
attention_mask = attention_mask + [0]*pad_length
s_batch = s_batch + [max(s_batch)+1]*pad_length
if n%2 == 0:
token_type_ids = token_type_ids + [1]*pad_length
else:
token_type_ids = token_type_ids + [0]*pad_length
elif pad_position == 'start':
total_sentence = [pad_token]*pad_length + total_sentence
attention_mask = [0]*pad_length + attention_mask
s_batch = [max(s_batch)+1]*pad_length + s_batch
if n%2 == 0:
token_type_ids = [0]*pad_length + token_type_ids
else:
token_type_ids = [1]*pad_length + token_type_ids
encode_datas['input_ids'].append(total_sentence)
encode_datas['token_type_ids'].append(token_type_ids)
encode_datas['attention_mask'].append(attention_mask)
encode_datas['sentence_batch'].append(s_batch)
return encode_datas
class EarlyStopping:
    """Stop training early when the monitored score (here the validation F1) does not improve for `patience` consecutive evaluations."""
    def __init__(self, patience=7, verbose=False, delta=0):
        """
        Args:
            patience (int): How many evaluations to wait after the last improvement.
                            Default: 7
            verbose (bool): If True, print a message for each improvement.
                            Default: False
            delta (float): Minimum change in the monitored quantity to count as an improvement.
                            Default: 0
        """
self.patience = patience
self.verbose = verbose
self.counter = 0
self.best_score = None
self.early_stop = False
self.f1_score_max = 0.
self.delta = delta
def __call__(self, f1_score):
score = -f1_score
if self.best_score is None:
self.best_score = score
self.save_checkpoint(f1_score)
elif score > self.best_score + self.delta:
self.counter += 1
print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
if self.counter >= self.patience:
self.early_stop = True
else:
self.best_score = score
self.save_checkpoint(f1_score)
self.counter = 0
    def save_checkpoint(self, f1_score):
        '''Record the improved F1 score (printed when verbose is True).'''
        if self.verbose:
            print(f'F1 score increased ({self.f1_score_max:.6f} --> {f1_score:.6f}).')
        self.f1_score_max = f1_score
def model_freeze(model, freeze_layers=None):
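    """Freeze the word-embedding layer of model.pretrained_model.base_model and, unless
    freeze_layers is -1, also the first `freeze_layers` encoder layers; freeze_layers=0
    (or None) leaves the model untouched."""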
if freeze_layers == 0:
return model
if freeze_layers is not None:
for param in model.pretrained_model.base_model.word_embedding.parameters():
param.requires_grad = False
        if freeze_layers != -1:
            # If freeze_layers == -1, only the embedding layer is frozen (above);
            # otherwise the first `freeze_layers` encoder layers are frozen as well.
for layer in model.pretrained_model.base_model.layer[:freeze_layers]:
for param in layer.parameters():
param.requires_grad = False
return model
def pos_encoding(pos, d, n=10000):
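    """Return sinusoidal positional encodings for the positions in `pos`:
    P[2i] = sin(p / n^(2i/d)) and P[2i+1] = cos(p / n^(2i/d)), one d-dimensional row per
    position, as a tensor of shape (len(pos), d)."""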
encoding_list = []
for p in pos:
P = np.zeros(d)
for i in np.arange(int(d/2)):
denominator = np.power(n, 2*i/d)
P[2*i] = np.sin(p/denominator)
P[2*i+1] = np.cos(p/denominator)
encoding_list.append(P)
return torch.tensor(np.array(encoding_list))
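

if __name__ == "__main__":
    # Minimal self-contained usage sketch (illustrative only, not part of the training
    # pipeline); it exercises the dependency-free helpers above with dummy values.
    set_seed(42)
    # Sinusoidal encodings for three paragraph positions with embedding dimension 8.
    print(pos_encoding([0, 1, 2], d=8).shape)  # torch.Size([3, 8])
    # Early stopping driven by a dummy macro-F1 sequence (patience of 2 evaluations).
    stopper = EarlyStopping(patience=2, verbose=True)
    for f1 in [0.50, 0.55, 0.54, 0.53, 0.52]:
        stopper(f1)
        if stopper.early_stop:
            print("Stopping early.")
            break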