from doctest import DocFileCase
from tqdm import tqdm
import numpy as np
import torch
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score
from sklearn.utils import shuffle
import random
import datetime as dt
import os
from glob import glob
from spacy.lang.en import English
import inspect
def checkpoint_save(model, val_loss, checkpoint_dir=None, wandb_name=None):
    if checkpoint_dir is None:
        checkpoint_dir = './save_model'
    if not os.path.isdir(checkpoint_dir):
        os.mkdir(checkpoint_dir)
    x = dt.datetime.now()
    y = x.year
    m = x.month
    d = x.day
    if wandb_name is None:
        wandb_name = "testing"
    torch.save(model.state_dict(), os.path.join(checkpoint_dir, "{}_{}_{}_{:.4f}_{}.pt".format(y, m, d, val_loss, wandb_name)))
    #saved_dict_list = glob(os.path.join(checkpoint_dir, '*.pt'))
    saved_dict_list = glob(os.path.join(checkpoint_dir, '{}_{}_{}_*_{}.pt'.format(y, m, d, wandb_name)))
    # Parse the metric value embedded in each file name, keep only the checkpoint
    # with the largest value for this day/run, and delete the rest.
    val_loss_list = np.array([float(os.path.basename(loss).split("_")[3]) for loss in saved_dict_list])
    saved_dict_list.pop(val_loss_list.argmax())
    for i in saved_dict_list:
        os.remove(i)
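# Hypothetical usage sketch for checkpoint_save (the `model` and `val_f1` names
# are placeholders, not defined in this module): each call writes a dated
# <year>_<month>_<day>_<metric>_<run-name>.pt file and prunes same-day files
# from the same run, keeping only the one with the largest recorded value.
# >>> checkpoint_save(model, val_f1, checkpoint_dir='./save_model', wandb_name='section_clf')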
def set_seed(seed):
    # Fix all relevant RNGs (PyTorch CPU/CUDA, NumPy, Python) for reproducibility.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
def accuracy_per_class(preds, labels):
    label_dict = {'Abstract': 0, 'Intro': 1, 'Main': 2, 'Method': 3, 'Summary': 4, 'Caption': 5}
    label_dict_inverse = {v: k for k, v in label_dict.items()}
    class_list = []
    acc_list = []
    # Count correct predictions per class and report them as "correct/total".
    for label in list(label_dict.values()):
        y_preds = preds[labels == label]
        y_true = labels[labels == label]
        class_list.append(label_dict_inverse[label])
        acc_list.append("{0}/{1}".format(len(y_preds[y_preds == label]), len(y_true)))
    print("{:10} {:10} {:10} {:10} {:10} {:10}".format(class_list[0], class_list[1], class_list[2], class_list[3], class_list[4], class_list[5]))
    print("{:10} {:10} {:10} {:10} {:10} {:10}".format(acc_list[0], acc_list[1], acc_list[2], acc_list[3], acc_list[4], acc_list[5]))
def compute_metrics(output, target, task_type='onehot'):
    if task_type == 'onehot':
        pred = np.argmax(output, axis=1).flatten()
        labels = np.argmax(target, axis=1).flatten()
    elif task_type == 'scalar':
        pred = np.argmax(output, axis=1).flatten()
        labels = np.array(target).flatten()
    accuracy = accuracy_score(y_true=labels, y_pred=pred)
    recall = recall_score(y_true=labels, y_pred=pred, average='macro')
    precision = precision_score(y_true=labels, y_pred=pred, average='macro', zero_division=0)
    f1 = f1_score(y_true=labels, y_pred=pred, average='macro')
    accuracy_per_class(pred, labels)
    return [accuracy, precision, recall, f1]
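# Illustrative sketch of the two input formats compute_metrics accepts (toy
# values, not real results): with task_type='onehot' both arguments are
# (N, num_classes) arrays, with task_type='scalar' the target is a flat list of
# class indices.
# >>> logits = np.array([[0.9, 0.1, 0.0, 0.0, 0.0, 0.0], [0.1, 0.8, 0.1, 0.0, 0.0, 0.0]])
# >>> compute_metrics(logits, np.eye(6)[[0, 1]], task_type='onehot')
# >>> compute_metrics(logits, [0, 1], task_type='scalar')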
def input_check(input_dict, model):
    # Keep only the keys that model.forward actually accepts.
    model_inputs = inspect.signature(model.forward).parameters.keys()
    inputs = {}
    for key, val in input_dict.items():
        if key in model_inputs:
            inputs[key] = val
    return inputs
def model_eval(model, device, loader, task_type='onehot', return_values=False, sentence_piece=False):
    model.eval()
    error = 0
    accuracy = 0
    precision = 0
    recall = 0
    f1 = 0
    eval_targets = []
    eval_outputs = []
    eval_texts = []
    with torch.no_grad():
        for data in tqdm(loader):
            eval_texts.extend(data['text'])
            input_ids = data['input_ids'].to(device, dtype=torch.long)
            mask = data['attention_mask'].to(device, dtype=torch.long)
            token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
            if task_type == 'onehot':
                targets = data['label_onehot'].to(device, dtype=torch.float)
            elif task_type == 'scalar':
                targets = data['label'].to(device, dtype=torch.long)
            position = data['position']
            inputs = {'input_ids': input_ids, 'attention_mask': mask, 'token_type_ids': token_type_ids,
                      'labels': targets, 'position': position}
            if sentence_piece:
                # Models that consume sentence-level batches also need the sentence index tensor.
                sentence_batch = data['sentence_batch'].to(device, dtype=torch.long)
                inputs = {'input_ids': input_ids, 'attention_mask': mask, 'token_type_ids': token_type_ids,
                          'labels': targets, 'sentence_batch': sentence_batch, 'position': position}
            outputs = model(inputs)
            output = outputs[1]
            loss = outputs[0]
            #loss=loss_fn(output, targets)
            error += loss
            #output = torch.sigmoid(output)
            eval_targets.extend(targets.detach().cpu().numpy())
            eval_outputs.extend(output.detach().cpu().numpy())
    error = error / len(loader)
    accuracy, precision, recall, f1 = compute_metrics(eval_outputs, eval_targets, task_type=task_type)
    if return_values:
        return [error, accuracy, precision, recall, f1, eval_targets, eval_outputs, eval_texts]
    else:
        return [error, accuracy, precision, recall, f1]
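# Hypothetical call sketch for model_eval (the `model`, `device`, and
# `val_loader` objects are placeholders defined elsewhere); the loader is
# expected to yield dicts with 'text', 'input_ids', 'attention_mask',
# 'token_type_ids', 'position', and either 'label_onehot' or 'label'
# (plus 'sentence_batch' when sentence_piece=True).
# >>> error, acc, prec, rec, f1 = model_eval(model, device, val_loader, task_type='onehot')
# >>> error, acc, prec, rec, f1, targets, outputs, texts = model_eval(
# ...     model, device, val_loader, task_type='onehot', return_values=True)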
def get_hidden(model, device, loader, task_type='onehot', sentence_piece=False):
    # Collect the hidden states (third model output) and targets for every batch.
    model.eval()
    total_hidden_state = []
    total_targets = []
    with torch.no_grad():
        for data in tqdm(loader):
            input_ids = data['input_ids'].to(device, dtype=torch.long)
            mask = data['attention_mask'].to(device, dtype=torch.long)
            token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
            if task_type == 'onehot':
                targets = data['label_onehot'].to(device, dtype=torch.float)
            elif task_type == 'scalar':
                targets = data['label'].to(device, dtype=torch.long)
            position = data['position']
            inputs = {'input_ids': input_ids, 'attention_mask': mask, 'token_type_ids': token_type_ids,
                      'labels': targets, 'position': position}
            if sentence_piece:
                sentence_batch = data['sentence_batch'].to(device, dtype=torch.long)
                inputs = {'input_ids': input_ids, 'attention_mask': mask, 'token_type_ids': token_type_ids,
                          'labels': targets, 'sentence_batch': sentence_batch, 'position': position}
            outputs = model(inputs)
            hidden_state = outputs[2]
            total_hidden_state.extend(hidden_state.detach().cpu().numpy())
            total_targets.extend(targets.detach().cpu().numpy())
    return total_hidden_state, total_targets
def sentencepiece(paragraph_list, spacy_nlp, tokenizer, max_length=512):
    # Note: the token_type_ids here are not the ones produced by the tokenizer;
    # they are filled in manually, alternating 0/1 per sentence. Models such as
    # XLNet (where the CLS token gets token type 2) break this convention, so
    # revisit this if it causes problems later.
    encode_datas = {'input_ids': [], 'token_type_ids': [], 'attention_mask': [], 'sentence_batch': []}
    for paragraph in paragraph_list:
        # Split the paragraph into sentences and encode each sentence separately.
        doc = spacy_nlp(paragraph)
        sentence_encode = [sent.text for sent in doc.sents]
        sentence_encode = tokenizer.batch_encode_plus(sentence_encode, max_length=max_length, padding='max_length', return_attention_mask=True, return_token_type_ids=True)
        sentence_list = sentence_encode['input_ids']
        mask_list = sentence_encode['attention_mask']
        pad_token = None
        pad_position = None
        total_sentence = torch.tensor([], dtype=torch.int)
        token_type_ids = []
        s_batch = []
        for n, s in enumerate(sentence_list):
            # Infer the pad token id and whether the tokenizer pads at the start or the end.
            if pad_token is None:
                pad_token = s[mask_list[n].index(0)]
            if pad_position is None:
                if s[0] == pad_token:
                    pad_position = 'start'
                else:
                    pad_position = 'end'
            # Strip padding and append the sentence until max_length would be exceeded.
            s = torch.tensor(s, dtype=torch.int)
            s = s[s != pad_token]
            total_length = len(total_sentence) + len(s)
            if total_length > max_length:
                break
            total_sentence = torch.concat([total_sentence, s])
            token_type_ids = token_type_ids + [n % 2] * len(s)
            s_batch = s_batch + [n] * len(s)
        # Re-pad the packed sequence back to max_length on the side the tokenizer uses.
        total_sentence = total_sentence.tolist()
        pad_length = max_length - len(total_sentence)
        attention_mask = [1] * len(total_sentence)
        if pad_position == 'end':
            total_sentence = total_sentence + [pad_token] * pad_length
            attention_mask = attention_mask + [0] * pad_length
            s_batch = s_batch + [max(s_batch) + 1] * pad_length
            if n % 2 == 0:
                token_type_ids = token_type_ids + [1] * pad_length
            else:
                token_type_ids = token_type_ids + [0] * pad_length
        elif pad_position == 'start':
            total_sentence = [pad_token] * pad_length + total_sentence
            attention_mask = [0] * pad_length + attention_mask
            s_batch = [max(s_batch) + 1] * pad_length + s_batch
            if n % 2 == 0:
                token_type_ids = [0] * pad_length + token_type_ids
            else:
                token_type_ids = [1] * pad_length + token_type_ids
        encode_datas['input_ids'].append(total_sentence)
        encode_datas['token_type_ids'].append(token_type_ids)
        encode_datas['attention_mask'].append(attention_mask)
        encode_datas['sentence_batch'].append(s_batch)
    return encode_datas
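# Hedged usage sketch for sentencepiece (the transformers AutoTokenizer import
# and the 'xlnet-base-cased' checkpoint are illustrative assumptions, not
# requirements of this module): a spaCy pipeline with a sentencizer splits each
# paragraph into sentences before they are re-packed into a single sequence.
# >>> from transformers import AutoTokenizer
# >>> nlp = English()
# >>> nlp.add_pipe('sentencizer')
# >>> tok = AutoTokenizer.from_pretrained('xlnet-base-cased')
# >>> batch = sentencepiece(["First sentence. Second sentence."], nlp, tok, max_length=512)
# >>> list(batch.keys())
# ['input_ids', 'token_type_ids', 'attention_mask', 'sentence_batch']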
class EarlyStopping:
    """Stop training early if the monitored F1 score does not improve for `patience` epochs."""
    def __init__(self, patience=7, verbose=False, delta=0):
        """
        Args:
            patience (int): How long to wait after the last improvement in the monitored score.
                            Default: 7
            verbose (bool): If True, prints a message for each improvement.
                            Default: False
            delta (float): Minimum change in the monitored quantity to qualify as an improvement.
                            Default: 0
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.f1_score_max = 0.
        self.delta = delta
    def __call__(self, f1_score):
        score = -f1_score
        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(f1_score)
        elif score > self.best_score + self.delta:
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(f1_score)
            self.counter = 0
    def save_checkpoint(self, f1_score):
        '''Print a message when the F1 score increases.'''
        if self.verbose:
            print(f'F1 score increase ({self.f1_score_max:.6f} --> {f1_score:.6f}). ')
        self.f1_score_max = f1_score
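# Minimal usage sketch for EarlyStopping inside a training loop (the
# `train_one_epoch`, `num_epochs`, `model`, `device`, and `val_loader` names
# are placeholders): the object is called once per epoch with the validation F1
# score and the loop stops once the score has not improved for `patience` epochs.
# >>> early_stopping = EarlyStopping(patience=5, verbose=True)
# >>> for epoch in range(num_epochs):
# ...     train_one_epoch(model)
# ...     _, _, _, _, val_f1 = model_eval(model, device, val_loader)
# ...     early_stopping(val_f1)
# ...     if early_stopping.early_stop:
# ...         break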
def model_freeze(model, freeze_layers=None):
    # freeze_layers == 0 means no freezing at all.
    if freeze_layers == 0:
        return model
    if freeze_layers is not None:
        for param in model.pretrained_model.base_model.word_embedding.parameters():
            param.requires_grad = False
        if freeze_layers != -1:
            # If freeze_layers == -1, only the embedding layer is frozen;
            # otherwise the first `freeze_layers` encoder layers are frozen as well.
            for layer in model.pretrained_model.base_model.layer[:freeze_layers]:
                for param in layer.parameters():
                    param.requires_grad = False
    return model
def pos_encoding(pos, d, n=10000):
    # Standard sinusoidal positional encoding:
    # P[2i] = sin(p / n^(2i/d)), P[2i+1] = cos(p / n^(2i/d)) for each position p.
    encoding_list = []
    for p in pos:
        P = np.zeros(d)
        for i in np.arange(int(d/2)):
            denominator = np.power(n, 2*i/d)
            P[2*i] = np.sin(p/denominator)
            P[2*i+1] = np.cos(p/denominator)
        encoding_list.append(P)
    return torch.tensor(np.array(encoding_list))
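# Small self-contained check of pos_encoding (illustrative only, not part of the
# training pipeline): position 0 should alternate 0 and 1 across the embedding
# dimension, and the result has shape (len(pos), d).
if __name__ == "__main__":
    demo = pos_encoding([0, 1, 2, 3], d=8)
    print(demo.shape)  # torch.Size([4, 8])
    print(demo[0])     # tensor([0., 1., 0., 1., 0., 1., 0., 1.], dtype=torch.float64)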