from tqdm import tqdm
import numpy as np
import torch
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score
from sklearn.utils import shuffle
import random
import datetime as dt
import os
from glob import glob
from spacy.lang.en import English
import inspect
def checkpoint_save(model, val_loss, checkpoint_dir=None, wandb_name=None):
    if checkpoint_dir is None:
        checkpoint_dir = './save_model'
    if not os.path.isdir(checkpoint_dir):
        os.mkdir(checkpoint_dir)
    x = dt.datetime.now()
    y = x.year
    m = x.month
    d = x.day
    if wandb_name is None:
        wandb_name = "testing"
    torch.save(model.state_dict(),
               os.path.join(checkpoint_dir, "{}_{}_{}_{:.4f}_{}.pt".format(y, m, d, val_loss, wandb_name)))
    # Collect every checkpoint saved today for this run, keep the one whose recorded value
    # is highest (appropriate when the tracked value is a score such as F1), and delete the rest.
    saved_dict_list = glob(os.path.join(checkpoint_dir, '{}_{}_{}_*_{}.pt'.format(y, m, d, wandb_name)))
    val_loss_list = np.array([float(os.path.basename(path).split("_")[3]) for path in saved_dict_list])
    saved_dict_list.pop(val_loss_list.argmax())
    for stale_checkpoint in saved_dict_list:
        os.remove(stale_checkpoint)
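# Example usage (sketch): the tiny Linear module, metric values, and run name below are
# purely illustrative. As written above, checkpoint_save keeps the file whose recorded
# value is highest for that day/run and deletes the others.
def _example_checkpoint_save():
    dummy_model = torch.nn.Linear(4, 2)
    checkpoint_save(dummy_model, val_loss=0.3512, wandb_name="demo-run")
    checkpoint_save(dummy_model, val_loss=0.2841, wandb_name="demo-run")
    # After the second call only <year>_<month>_<day>_0.3512_demo-run.pt remains in ./save_model.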
def set_seed(seed):
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
def accuracy_per_class(preds, labels):
    label_dict = {'Abstract': 0, 'Intro': 1, 'Main': 2, 'Method': 3, 'Summary': 4, 'Caption': 5}
    label_dict_inverse = {v: k for k, v in label_dict.items()}
    class_list = []
    acc_list = []
    for label in list(label_dict.values()):
        y_preds = preds[labels == label]
        y_true = labels[labels == label]
        class_list.append(label_dict_inverse[label])
        # "correct/total" for this class
        acc_list.append("{0}/{1}".format(len(y_preds[y_preds == label]), len(y_true)))
    print(" ".join("{:10}".format(name) for name in class_list))
    print(" ".join("{:10}".format(acc) for acc in acc_list))
def compute_metrics(output, target, task_type='onehot'):
    if task_type == 'onehot':
        pred = np.argmax(output, axis=1).flatten()
        labels = np.argmax(target, axis=1).flatten()
    elif task_type == 'scalar':
        pred = np.argmax(output, axis=1).flatten()
        labels = np.array(target).flatten()
    accuracy = accuracy_score(y_true=labels, y_pred=pred)
    recall = recall_score(y_true=labels, y_pred=pred, average='macro')
    precision = precision_score(y_true=labels, y_pred=pred, average='macro', zero_division=0)
    f1 = f1_score(y_true=labels, y_pred=pred, average='macro')
    accuracy_per_class(pred, labels)
    return [accuracy, precision, recall, f1]
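# Example (sketch): calling compute_metrics on random data just to illustrate the expected
# shapes for task_type='onehot' (outputs and targets are both n_samples x 6); the numbers
# themselves are meaningless.
def _example_compute_metrics():
    rng = np.random.default_rng(0)
    fake_logits = rng.random((30, 6))                       # one score row per sample
    fake_onehot = np.eye(6)[rng.integers(0, 6, size=30)]    # one-hot ground-truth labels
    return compute_metrics(fake_logits, fake_onehot, task_type='onehot')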
def input_check(input_dict, model):
    # Keep only the keys that the model's forward() signature actually accepts.
    model_inputs = inspect.signature(model.forward).parameters.keys()
    inputs = {}
    for key, val in input_dict.items():
        if key in model_inputs:
            inputs[key] = val
    return inputs
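# Example (sketch): input_check drops any keys that the model's forward() does not accept.
# The toy module below is hypothetical and exists only to show the filtering.
def _example_input_check():
    class _Toy(torch.nn.Module):
        def forward(self, input_ids, attention_mask):
            return input_ids, attention_mask

    candidate = {'input_ids': 1, 'attention_mask': 2, 'position': 3}
    return input_check(candidate, _Toy())   # -> {'input_ids': 1, 'attention_mask': 2}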
def model_eval(model, device, loader, task_type='onehot', return_values=False, sentence_piece=False):
    model.eval()
    error = 0
    accuracy = 0
    precision = 0
    recall = 0
    f1 = 0
    eval_targets = []
    eval_outputs = []
    eval_texts = []
    with torch.no_grad():
        for data in tqdm(loader):
            eval_texts.extend(data['text'])
            input_ids = data['input_ids'].to(device, dtype=torch.long)
            mask = data['attention_mask'].to(device, dtype=torch.long)
            token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
            if task_type == 'onehot':
                targets = data['label_onehot'].to(device, dtype=torch.float)
            elif task_type == 'scalar':
                targets = data['label'].to(device, dtype=torch.long)
            position = data['position']
            inputs = {'input_ids': input_ids, 'attention_mask': mask, 'token_type_ids': token_type_ids,
                      'labels': targets, 'position': position}
            if sentence_piece:
                inputs['sentence_batch'] = data['sentence_batch'].to(device, dtype=torch.long)
            outputs = model(inputs)
            output = outputs[1]
            loss = outputs[0]
            #loss=loss_fn(output, targets)
            error += loss
            #output = torch.sigmoid(output)
            eval_targets.extend(targets.detach().cpu().numpy())
            eval_outputs.extend(output.detach().cpu().numpy())
    error = error / len(loader)
    accuracy, precision, recall, f1 = compute_metrics(eval_outputs, eval_targets, task_type=task_type)
    if return_values:
        return [error, accuracy, precision, recall, f1, eval_targets, eval_outputs, eval_texts]
    else:
        return [error, accuracy, precision, recall, f1]
def get_hidden(model, device, loader, task_type='onehot', sentence_piece=False):
    model.eval()
    total_hidden_state = []
    total_targets = []
    with torch.no_grad():
        for data in tqdm(loader):
            input_ids = data['input_ids'].to(device, dtype=torch.long)
            mask = data['attention_mask'].to(device, dtype=torch.long)
            token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
            if task_type == 'onehot':
                targets = data['label_onehot'].to(device, dtype=torch.float)
            elif task_type == 'scalar':
                targets = data['label'].to(device, dtype=torch.long)
            position = data['position']
            inputs = {'input_ids': input_ids, 'attention_mask': mask, 'token_type_ids': token_type_ids,
                      'labels': targets, 'position': position}
            if sentence_piece:
                inputs['sentence_batch'] = data['sentence_batch'].to(device, dtype=torch.long)
            outputs = model(inputs)
            hidden_state = outputs[2]
            total_hidden_state.extend(hidden_state.detach().cpu().numpy())
            total_targets.extend(targets.detach().cpu().numpy())
    return total_hidden_state, total_targets
def sentencepiece(paragraph_list, spacy_nlp, tokenizer, max_length=512):
    # For now the token type ids are not taken from the tokenizer; they are filled in manually,
    # alternating 0 and 1 per sentence. Models such as XLNet break this rule (e.g. the CLS token
    # gets token type id 2), so revisit this if it causes problems later.
    encode_datas = {'input_ids': [], 'token_type_ids': [], 'attention_mask': [], 'sentence_batch': []}
    for paragraph in paragraph_list:
        doc = spacy_nlp(paragraph)
        sentence_encode = [sent.text for sent in doc.sents]
        sentence_encode = tokenizer.batch_encode_plus(sentence_encode, max_length=max_length,
                                                      padding='max_length', return_attention_mask=True,
                                                      return_token_type_ids=True)
        sentence_list = sentence_encode['input_ids']
        mask_list = sentence_encode['attention_mask']
        pad_token = None
        pad_position = None
        total_sentence = torch.tensor([], dtype=torch.int)
        token_type_ids = []
        s_batch = []
        for n, s in enumerate(sentence_list):
            if pad_token is None:
                # The pad id is whatever token sits at the first masked-out position.
                pad_token = s[mask_list[n].index(0)]
            if pad_position is None:
                # Tokenizers pad either at the start (e.g. XLNet) or at the end (e.g. BERT).
                if s[0] == pad_token:
                    pad_position = 'start'
                else:
                    pad_position = 'end'
            s = torch.tensor(s, dtype=torch.int)
            s = s[s != pad_token]   # strip padding so sentences can be packed together
            total_length = len(total_sentence) + len(s)
            if total_length > max_length:
                break
            total_sentence = torch.cat([total_sentence, s])
            token_type_ids = token_type_ids + [n % 2] * len(s)
            s_batch = s_batch + [n] * len(s)
        total_sentence = total_sentence.tolist()
        pad_length = max_length - len(total_sentence)
        attention_mask = [1] * len(total_sentence)
        if pad_position == 'end':
            total_sentence = total_sentence + [pad_token] * pad_length
            attention_mask = attention_mask + [0] * pad_length
            s_batch = s_batch + [max(s_batch) + 1] * pad_length
            if n % 2 == 0:
                token_type_ids = token_type_ids + [1] * pad_length
            else:
                token_type_ids = token_type_ids + [0] * pad_length
        elif pad_position == 'start':
            total_sentence = [pad_token] * pad_length + total_sentence
            attention_mask = [0] * pad_length + attention_mask
            s_batch = [max(s_batch) + 1] * pad_length + s_batch
            if n % 2 == 0:
                token_type_ids = [0] * pad_length + token_type_ids
            else:
                token_type_ids = [1] * pad_length + token_type_ids
        encode_datas['input_ids'].append(total_sentence)
        encode_datas['token_type_ids'].append(token_type_ids)
        encode_datas['attention_mask'].append(attention_mask)
        encode_datas['sentence_batch'].append(s_batch)
    return encode_datas
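# Example usage (sketch): a blank spaCy English pipeline with the rule-based sentencizer
# plus any Hugging Face tokenizer; "bert-base-uncased" and the sample paragraphs are
# illustrative choices, not requirements of this module.
def _example_sentencepiece():
    from transformers import AutoTokenizer
    nlp = English()
    nlp.add_pipe("sentencizer")     # required so that doc.sents is populated
    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
    paragraphs = ["First sentence. Second sentence.", "Another short paragraph."]
    return sentencepiece(paragraphs, nlp, tokenizer, max_length=64)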
class EarlyStopping:
    """Stop training early when the monitored score (here the validation F1) has not improved for `patience` checks."""
    def __init__(self, patience=7, verbose=False, delta=0):
        """
        Args:
            patience (int): how many validation checks to wait after the last improvement.
                            Default: 7
            verbose (bool): if True, print a message for every improvement.
                            Default: False
            delta (float): minimum change in the monitored quantity to count as an improvement.
                            Default: 0
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.f1_score_max = 0.
        self.delta = delta

    def __call__(self, f1_score):
        score = -f1_score
        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(f1_score)
        elif score > self.best_score + self.delta:
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(f1_score)
            self.counter = 0

    def save_checkpoint(self, f1_score):
        '''Record the new best F1 score and, if verbose, print the improvement.'''
        if self.verbose:
            print(f'F1 score increased ({self.f1_score_max:.6f} --> {f1_score:.6f}).')
        self.f1_score_max = f1_score
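# Example usage (sketch): feed the validation F1 score to the stopper once per epoch;
# the scores below are made up just to show when early_stop flips to True (patience=2).
def _example_early_stopping():
    stopper = EarlyStopping(patience=2, verbose=True)
    for f1 in [0.60, 0.65, 0.64, 0.63, 0.62]:   # no improvement after 0.65
        stopper(f1)
        if stopper.early_stop:
            break
    return stopper.early_stop   # True after two checks without improvement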
def model_freeze(model, freeze_layers=None):
    if freeze_layers == 0:
        return model
    if freeze_layers is not None:
        # Always freeze the embedding layer when any freezing is requested.
        for param in model.pretrained_model.base_model.word_embedding.parameters():
            param.requires_grad = False
        if freeze_layers != -1:
            # freeze_layers == -1 freezes only the embedding layer;
            # otherwise the first `freeze_layers` encoder layers are frozen as well.
            for layer in model.pretrained_model.base_model.layer[:freeze_layers]:
                for param in layer.parameters():
                    param.requires_grad = False
    return model
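# Sketch of the attribute layout model_freeze expects (the dummy classes are hypothetical;
# the real model is defined elsewhere): the wrapper must expose
# pretrained_model.base_model.word_embedding and pretrained_model.base_model.layer,
# as in XLNet-style backbones.
def _example_model_freeze():
    import torch.nn as nn

    class _DummyBase(nn.Module):
        def __init__(self):
            super().__init__()
            self.word_embedding = nn.Embedding(10, 8)
            self.layer = nn.ModuleList([nn.Linear(8, 8) for _ in range(4)])

    class _DummyBackbone(nn.Module):
        def __init__(self):
            super().__init__()
            self.base_model = _DummyBase()

    class _DummyClassifier(nn.Module):
        def __init__(self):
            super().__init__()
            self.pretrained_model = _DummyBackbone()

    frozen = model_freeze(_DummyClassifier(), freeze_layers=2)
    # The embedding and the first two encoder layers now have requires_grad == False.
    return [p.requires_grad for p in frozen.parameters()]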
def pos_encoding(pos, d, n=10000):
    # Sinusoidal positional encoding: PE[p, 2i] = sin(p / n**(2i/d)), PE[p, 2i+1] = cos(p / n**(2i/d)).
    encoding_list = []
    for p in pos:
        P = np.zeros(d)
        for i in np.arange(int(d / 2)):
            denominator = np.power(n, 2 * i / d)
            P[2 * i] = np.sin(p / denominator)
            P[2 * i + 1] = np.cos(p / denominator)
        encoding_list.append(P)
    return torch.tensor(np.array(encoding_list))
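# Example (sketch): encodings for three positions with an embedding size of 8; each row
# follows the sinusoidal formula noted above.
def _example_pos_encoding():
    encodings = pos_encoding(pos=[0, 1, 2], d=8)
    return encodings.shape   # torch.Size([3, 8])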