동기
CNN의 다양한 모델들에 대해 공부하면서, RNN과 관련된 시계열 데이터 관련 모델 뿐만 아니라 생성형 모델, 자연어 처리등에 대한 사전 지식의 필요성을 느꼈습니다. DETR을 공부하면서 Panoptic Segmentation을 구현할 할 때에, Transformer(="2017 Attention is all you need")가 활용되는 것만 해도 알 수 있죠. 그리고 교수님 말로는 요즘 모델은 CNN이 단독으로 쓰이는게 아니라 다양한 분야의 모델들이 결합해서 더 좋은 성능 혹은 발견을 하게 된다면 그것이 논문으로 나오게 되는 것이라고 하셨습니다.
그리고 대부분의 CV 분야의 SOTA모델에 Transformer이 들어갈 정도로 매우 중요한 모델이기 때문에, 그 기초가 되는 seq2seq모델에서 하나는 attention없이, 하나는 addictive attention(=concatenate; badanau)을 사용하고 전자의 encoder-decoder에는 LSTM, 후자의 encoder-decoder에는 GRU를 사용하는 쪽으로 코드를 짜보았습니다.
https://tutorials.pytorch.kr/beginner/translation_transformer.html
위 자료와 pytorch 공식 문서를 참고하여 짰으니 참고 바랍니다. 그리고 현재 Mutli30k를 다운로드하는 서버가 맛이 갔는지는 몰라도 제가 임의로 url을 지정해주어 train, valid dataset을 다운했다는 점 참고 바랍니다.
그리고 설명은 주석에 텐서의 크기와 사용 용도를 간략하게 적어 놓았으니 이해하는데에는 편할겁니다.
구글 드라이브 마운트
from google.colab import drive
drive.mount("/content/drive")
import os
import sys
from datetime import datetime
drive_project_root = "/content/drive/MyDrive/DeepLearning/fastcampus"
sys.path.append(drive_project_root)
!pip install -r "/content/drive/MyDrive/DeepLearning/fastcampus/requirements.txt"
늘 하던데로 구글 드라이브에 마운트 하고, drive_project_root를 통해 작업 폴더를 정의하고 requirement를 다운 받아줍니다.
requirements.txt
torch
pytorch-lightning
torch-optimizer
hydra-core
wandb
torchtext
torchvision
spacy
efficientnet_pytorch
onnx
onnxruntime
onnx_tf
tf2onnx
portalocker
위 패키지를 모두 최신 버전으로 받아서 사용했습니다.
GPU 확인 (V100)
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
print('Select the Runtime > "Change runtime type" menu to enable a GPU accelerator, ')
print('and then re-execute this cell.')
else:
print(gpu_info)
저는 Colab을 결제하였기 때문에, V100 16G GPU를 사용하고 있습니다.
필요 라이브러리 Import
# For data loading.
from typing import List
from typing import Dict
from typing import Union
from typing import Any
from typing import Optional
from typing import Iterable
from typing import Callable
from abc import abstractmethod
from abc import ABC
from datetime import datetime
from functools import partial
from collections import Counter
from collections import OrderedDict
import random
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader
import pytorch_lightning as pl
from pprint import pprint
# pytorch에서 제공하는 텍스트 데이터
from torchtext import data
from torchtext import datasets
from torchtext.datasets import Multi30k, multi30k
from torchtext.data.utils import get_tokenizer
from torchtext.data.functional import to_map_style_dataset
from torchtext.vocab import Vocab, build_vocab_from_iterator, vocab
import spacy
# For configuration
from omegaconf import DictConfig
from omegaconf import OmegaConf
import hydra
from hydra.core.config_store import ConfigStore
# For logger
from torch.utils.tensorboard import SummaryWriter
import wandb
os.environ["WANDB_START_METHOD"]="thread"
우선 저희는 pytorch lightning, hydra를 사용하여 config를 작성하고 config를 작성하는 것을 통해 구현할 2가지 버전의 모델을 체계적으로 관리할 것입니다. 위에서 사용되는 라이브러리를 간단히 소개하겠습니다.
- Multi30k: de -> en(=독일어 -> 영어) 번역 데이터를 통해서 seq2seq 모델을 학습시킬 것입니다.
- Spacy: 이를 통해 단어를 알맞게 tokenizing할 것입니다.
- Omegaconf, hydra: 모델 설정 관련 라이브러리 입니다.
- wandb: tensorboard와 같은 다양한 matric을 확인할 수 있게 도와주는 라이브러리입니다.
필요 외부작성 스크립트 불러오기
from data_utils import dataset_split
from config_utils import flatten_dict
from config_utils import register_config
from config_utils import configure_optimizers_from_cfg
from config_utils import get_loggers
from config_utils import get_callbacks
from custom_math import softmax
data_utils.py
from typing import List, Dict
import numpy as np
import torch
def dataset_split(
dataset: torch.utils.data.Dataset,
split: List[float] = [0.9, 0.1],
random_train_val_split: bool = False,
) -> Dict[str, torch.utils.data.dataset.Subset]:
"""split torch.utils.data.Dataset by given split ratio.
Written by Jungbae Park.
Args:
dataset (torch.utils.data.Dataset): input interaction dataset.
split (List[float], optional): split ratio.
len(split) should be in [2, 3] & sum(split) should be 1.0
if len(split) == 2:
return {
"train": train_dataset,
"val": val_dataset
}
elif len(split) == 3:
return {
"train": train_dataset,
"val": val_dataset,
"test": test_dataset
}
Defaults to [0.9, 0.1].
random_train_val_split (bool, optional):
if it's True, will randomly mix mix of train, val indices.
In that case, test_dataset will remain
as the last portion of all_dataset.
else, will keep order for splits (sequential split)
Defaults to False.
Returns:
Dict[str, torch.utils.data.dataset.Subset]:
return subset of datasets as dictionaries.
i.e. {
"train": train_dataset,
"val": val_dataset,
"test": test_dataset
}
"""
assert len(split) in [2, 3]
assert sum(split) == 1.0
for frac in split:
assert frac >= 0.0
indices = list(range(len(dataset)))
modes = ["train", "val", "test"][: len(split)]
sizes = np.array(np.cumsum([0] + list(split)) * len(dataset), dtype=int)
# sizes = [0, train_size, train_size+val_size, len(datasets)]
if random_train_val_split:
train_and_val_idx = indices[: sizes[2]]
random.shuffle(train_and_val_idx)
indices = train_and_val_idx + indices[sizes[2] :]
datasets = {
mode: torch.utils.data.Subset(
dataset, indices[sizes[i] : sizes[i + 1]]
)
for i, mode in enumerate(modes)
}
return datasets
이는 데이터 셋을 나누기 위한 미리 작성된 스크립트입니다. 저도 오픈소스로 가져온거라 그냥 쓰시면됩니다.
config_utils.py
import os
from typing import Union
from omegaconf import DictConfig
import torch
from torch import optim
from torch_optimizer import RAdam
from torch_optimizer import AdamP
import pytorch_lightning as pl
import hydra
from hydra.core.config_store import ConfigStore
import wandb
def flatten_dict(
input_dict: Union[dict, DictConfig],
separator: str = '_',
prefix: str = ''
):
"""flattening dict,
used in wandb log.
"""
if isinstance(input_dict, DictConfig):
input_dict = dict(input_dict)
return {
prefix + separator + k if prefix else k : v
for kk, vv in input_dict.items()
for k, v in flatten_dict(vv, separator, kk).items()
} if isinstance(input_dict, dict) else {prefix: input_dict}
def register_config(configs_dict: dict) -> None:
"""hydra register configuration"""
cs = ConfigStore.instance()
for k, merged_cfg in configs_dict.items():
cs.store(name=k, node=merged_cfg)
def configure_optimizers_from_cfg(cfg: DictConfig, model):
optimizers = []
schedulers = []
# schedulers are optional but, if they're given, the below should be same.
if len(cfg.opt.lr_schedulers) > 0:
assert len(cfg.opt.lr_schedulers) == len(cfg.opt.optimizers)
# setup optimizer
for opt_cfg in cfg.opt.optimizers:
if opt_cfg.name == "RAdam":
optimizers.append(
RAdam(model.parameters(), **opt_cfg.kwargs)
)
elif opt_cfg.name == "SGD":
optimizers.append(
SGD(model.parameters(), **opt_cfg.kwargs)
)
elif opt_cfg.name == "AdamP":
optimizers.append(
AdamP(model.parameters(), **opt_cfg.kwargs)
)
elif opt_cfg.name == "Adam":
optimizers.append(
Adam(model.parameters(), **opt_cfg.kwargs)
)
else:
raise NotImplementedError(f"Not supported optimizer: {opt_cfg.name}")
# setup lr scheduler
for idx, lr_sch_cfg in enumerate(cfg.opt.lr_schedulers):
if lr_sch_cfg.name is None or lr_sch_cfg.name == "":
pass
elif lr_sch_cfg.name == "LinearWarmupLR":
schedulers.append(
LinearWarmupLR(optimizers[idx], **lr_sch_cfg.kwargs)
)
else:
raise NotImplementedError(f"Not supported lr_scheduler: {lr_sch_cfg.name}")
return optimizers, schedulers
def configure_optimizer_element(
opt_cfg: DictConfig, lr_sch_cfg: DictConfig, model
):
optimizer = None
scheduler = None
# setup optimizer
if opt_cfg.name == "RAdam":
optimizer = RAdam(model.parameters(), **opt_cfg.kwargs)
elif opt_cfg.name == "SGD":
optimizer = SGD(model.parameters(), **opt_cfg.kwargs)
elif opt_cfg.name == "AdamP":
optimizer = AdamP(model.parameters(), **opt_cfg.kwargs)
elif opt_cfg.name == "Adam":
optimizer = Adam(model.parameters(), **opt_cfg.kwargs)
else:
raise NotImplementedError(f"Not supported optimizer: {opt_cfg.name}")
# setup lr scheduler
if lr_sch_cfg.name is None or lr_sch_cfg.name == "":
pass
elif lr_sch_cfg.name == "LinearWarmupLR":
scheduler = LinearWarmupLR(optimizer, **lr_sch_cfg.kwargs)
else:
raise NotImplementedError(f"Not supported lr_scheduler: {lr_sch_cfg.name}")
return optimizer, scheduler
# loggers
def get_loggers(cfg: DictConfig):
logger = []
loggers_cfg = cfg.log.loggers
for name, kwargs_dict in loggers_cfg.items():
if name == "WandbLogger":
wandb.finish()
os.makedirs(kwargs_dict.save_dir, exist_ok=True)
logger.append(pl.loggers.WandbLogger(
config=flatten_dict(cfg),
# reinit=True,
settings=wandb.Settings(start_method="thread"),
**kwargs_dict,
))
elif name == "TensorBoardLogger":
logger.append(pl.loggers.TensorBoardLogger(
**kwargs_dict
))
else:
raise NotImplementedError(f"invalid loggers_cfg name {name}")
return logger
# callbacks
def get_callbacks(cfg: DictConfig):
callbacks = []
callbacks_cfg = cfg.log.callbacks
for name, kwargs_dict in callbacks_cfg.items():
if name == "ModelCheckpoint":
callbacks.append(pl.callbacks.ModelCheckpoint(
**kwargs_dict,
))
elif name == "EarlyStopping":
callbacks.append(pl.callbacks.EarlyStopping(
**kwargs_dict
))
else:
raise NotImplementedError(f"invalid callbacks_cfg name {name}")
return callbacks
이는 그냥 config파일에서 다양한 옵션들을 편하게 가져오기 위한 함수를 외부 수크립트로 따로 작성한 것입니다. 아래 코드를 보면서, 해당 코드가 어떻게 활용되었는지 보는걸 추천합니다.
custom_math.py
import numpy as np
def softmax(x, axis=0):
"numpy softmax"
max = np.max(x, axis=axis, keepdims=True)
e_x = np.exp(x - max)
sum = np.sum(e_x, axis=axis, keepdims=True)
f_x = e_x / sum
return f_x
여기에는 softmax를 axis에 맞게 해주는 함수가 정의되어 있습니다.
# download eng/d data.
!python -m spacy download en
!python -m spacy download en_core_web_sm
!python -m spacy download de
!python -m spacy download de_core_news_sm
그리고 위와같이 spacy를 필요한 모듈만 다운받아 줍니다.
data configuration 정의하기
data_spacy_de_en_cfg = {
"name": "spacy_de_en",
"data_root": os.path.join(os.getcwd(), "data"),
"tokenizer": "spacy",
"src_lang": "de",
"tgt_lang": "en",
"src_index": 0,
"tgt_index": 1,
"vocab": {
"special_symbol2index": {
"<unk>": 0,
"<pad>": 1,
"<bos>": 2,
"<eos>": 3,
},
"special_first": True,
"min_freq": 2
}
}
data_cfg = OmegaConf.create(data_spacy_de_en_cfg)
print(OmegaConf.to_yaml(data_cfg))
pprint(dict(data_cfg))
위와같이 de -> en으로 가기 위한 data configuration 파일을 작성하고 OmegaConf로 확인해보았습니다. 여기에는 src_lang, tgt_lang, index들, vocab.special_symbol2index에 special token들을 정의하고 자세한 옵션까지 정의했습니다.
외부 저장소 등록
multi30k.URL["train"] = "https://raw.githubusercontent.com/neychev/small_DL_repo/master/datasets/Multi30k/training.tar.gz"
multi30k.URL["valid"] = "https://raw.githubusercontent.com/neychev/small_DL_repo/master/datasets/Multi30k/validation.tar.gz"
# multi30k.URL["test"] = "https://github.com/multi30k/dataset/blob/master/data/task1/raw/test_2016_flickr.de.gz"
위에서 말했다 싶이 multi30k의 train, valid 데이터에 대한 외부 저장소를 등록합니다.
Transform process
# 1. token_transform
'''
spacy로 tokenizer를 진행
'''
def get_token_transform(data_cfg: DictConfig) -> dict:
token_transform: dict = {}
token_transform[data_cfg.src_lang] = get_tokenizer(
data_cfg.tokenizer, language=data_cfg.src_lang + "_core_news_sm")
token_transform[data_cfg.tgt_lang] = get_tokenizer(
data_cfg.tokenizer, language=data_cfg.tgt_lang + "_core_web_sm")
return token_transform
token_transform = get_token_transform(data_cfg)
# 2. vocab_transform
'''
vocab_transform: 토크화된 단어들을 vocabulary(=단어장)을 만들어서
토큰화한 문자들을 이에 맞는 indices로 바꾸어준다.
'''
def yield_tokens(
data_iter: Iterable, lang: str, lang2index: Dict[str, int]
) -> List[str]:
"""help function to yield list of tokens"""
for data_sample in data_iter:
yield token_transform[lang](data_sample[lang2index[lang]])
def get_vocab_transform(data_cfg: DictConfig) -> dict:
vocab_transform: dict = {}
for ln in [data_cfg.src_lang, data_cfg.tgt_lang]:
# Training data Iterator
# --> 단어장은 Multi30k의 train데이터로 만들어 버린다. 다른걸로 해도 좋음
train_iter = Multi30k(
split='train', language_pair=(data_cfg.src_lang, data_cfg.tgt_lang)
)
# Create torchtext's Vocab object
vocab_transform[ln] = build_vocab_from_iterator(
yield_tokens(
train_iter,
ln,
{
data_cfg.src_lang: data_cfg.src_index,
data_cfg.tgt_lang: data_cfg.tgt_index
}
),
min_freq=data_cfg.vocab.min_freq,
specials=list(data_cfg.vocab.special_symbol2index.keys()),
special_first=data_cfg.vocab.special_first,
)
# set UNKNOWN as the default index. --> index가 unknown으로 return되면 : token이 찾아지지 않은 경우
# 만약 세팅되지 않으면, runtime error가 날 수 있다.
for ln in [data_cfg.src_lang, data_cfg.tgt_lang]:
vocab_transform[ln].set_default_index(data_cfg.vocab.special_symbol2index["<unk>"])
return vocab_transform
vocab_transform = get_vocab_transform(data_cfg)
코드에 대한 설명은 읽어보면 이해가 가실겁니다. token transform -> vocab_transform과정입니다. 그냥 토크나이징 한다음에 vocabulary를 만들었다고 보시면 됩니다.
print(vocab_transform["de"]["<bos>"]) # 2
print(vocab_transform["en"]["<unk>"]) # 0 (default)
print(vocab_transform["en"]["hello"], vocab_transform["en"]["world"]) # 5465 1870
# 3. integrated transforms
# --> text_transform [token_transform --> vocab_transform --> torch.tensor transform]
# helper function for collate_fn
def sequential_transform(*transforms):
def func(txt_input):
for transform in transforms:
txt_input = transform(txt_input)
return txt_input
return func
# convert to torch.tensor with bos & eos
def tensor_transform(token_ids: List[int], bos_index: int, eos_index: int):
return torch.cat(
(torch.tensor([bos_index]),
torch.tensor(token_ids),
torch.tensor([eos_index]))
)
# src & tgt lang language text_transforms to convert raw strings -> tensor indices
def get_text_transform(data_cfg: DictConfig):
text_transform = {}
for ln in [data_cfg.src_lang, data_cfg.tgt_lang]:
text_transform[ln] = sequential_transform(
token_transform[ln],
vocab_transform[ln],
partial(
tensor_transform,
bos_index=data_cfg.vocab.special_symbol2index["<bos>"],
eos_index=data_cfg.vocab.special_symbol2index["<eos>"],
)
)
return text_transform
text_transform = get_text_transform(data_cfg)
print(text_transform["en"]("hello"))
print(text_transform["en"]("hello,"))
print(text_transform["en"]("hello, how"))
print(text_transform["en"]("hello, how are you ?")) # tensor([ 2, 5465, 15, 889, 17, 1328, 2470, 3])
위와같이 문장을 vocab에 등록되어 있는 단어로 잘 바꾸어 주는 것을 확인할 수 있습니다.
# function to collate data samples into batch tesors
def collate_fn(batch, data_cfg: DictConfig):
src_batch, tgt_batch = [], []
for src_sample, tgt_sample in batch:
src_batch.append(text_transform[data_cfg.src_lang](src_sample.rstrip("\n")))
tgt_batch.append(text_transform[data_cfg.tgt_lang](tgt_sample.rstrip("\n")))
# pad_sequence: T x B x * (T is the length of the longes sequence)
# default is batch_first=False because (for input to the LSTM)
# 즉 배치 사이즈 내에서 크기를 맞추어 주어야 한다.
src_batch = pad_sequence(src_batch, padding_value=data_cfg.vocab.special_symbol2index["<pad>"])
tgt_batch = pad_sequence(tgt_batch, padding_value=data_cfg.vocab.special_symbol2index["<pad>"])
return src_batch, tgt_batch
def get_collate_fn(cfg: DictConfig):
return partial(collate_fn, data_cfg=cfg.data)
def get_multi30k_dataloader(
split_mode: str, language_pair, batch_size: int, collate_fn
):
iter = Multi30k(split=split_mode, language_pair=language_pair)
dataset = to_map_style_dataset(iter)
# dataloader를 정의할 때, collate_fn을 통해서
dataloader = torch.utils.data.DataLoader(
dataset, batch_size=batch_size, collate_fn=collate_fn
)
return dataloader
이제 collate_fn함수를 통해서 배치 내에서의 다른 sequence들의 크기를 조정해주고 torchtext의 pad_squence를 통해 batch내에서 가장 큰 크기를 가진 시퀀스를 토대로 "<pad>"를 추가해줍니다.
그리고 get_multi30k_dataloader를 통해서 이를 가져옵니다. 그리고 이를 직접 아래와 같이 찍어보고 Multi30k를 가져오고 위에서 정의한 transform과정을 잘 하고 있는지 확인해줍니다.
test_dataloader = get_multi30k_dataloader("valid",
(data_cfg.src_lang, data_cfg.tgt_lang),
3,
collate_fn=partial(collate_fn, data_cfg=data_cfg))
for i in test_dataloader:
print(i)
break
그리고 멱등성 보장을 위해 SEED를 설정해줍니다.
# 멱등성 보장을 위해 시드를 설정해준다.
SEED = 1234
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.backends.cudnn.deterministic = True
python lightning을 사용하면 더 쉽게 할 수 있다는데 일단 Pass하고 이렇게 하겠습니다.
BaseTranslateLightningModule 작성
def _text_postprocessing(res: List[str]) -> str:
if "<eos>" in res:
res = res[:res.index("<eos>")]
if "<pad>" in res:
res = res[:res.index("<pad>")]
res = " ".join(res).replace("<bos>", "")
return res
class BaseTranslateLightningModule(pl.LightningModule):
def __init__(self, cfg: DictConfig):
super().__init__()
self.cfg = cfg
self.loss_function = torch.nn.CrossEntropyLoss(
ignore_index=cfg.data.vocab.special_symbol2index["<pad>"]
)
def configure_optimizers(self):
self._optimizers, self._schedulers = configure_optimizers_from_cfg(
self.cfg, self
)
return self._optimizers, self._schedulers
@abstractmethod
def forward(self, src, tgt, mode: str, teacher_forcing_ratio: float):
raise NotImplementedError()
def _forward(self, src, tgt, mode: str, teacher_forcing_ratio: float = 0.5):
# teacher forcing:
# seq2seq 에서 많이 쓰인다.
# src -> tgt autogressive 학습하면, 맨 최초는 학습을 빠르게 한다. 근데, 미래부분 학습은?
# 랜덤으로 미래 정보도 조금 둬서 뒤에 있는 정보도 학습이 가능하게 하자
# 0.5는 --> 0.5 확률로 teacher_forcing을 하겠다.
assert mode in ["train", "val", "test"]
# get predictions
# teacher_forcing용 input
tgt_inputs = tgt[:-1, :] # delete ends for teacher forcing inputs.
outputs = self(src, tgt_inputs, teacher_forcing_ratio=teacher_forcing_ratio)
tgt_outputs = tgt[1:, :] # delete start points
loss = self.loss_function(
outputs.reshape(-1, outputs.shape[-1]), #[[batch X seq_size], other_output_shape]
tgt_outputs.reshape(-1),
)
# 학습이 어떻게 되고 있는지 확인하기 위해 찍어둔다.
log_detail = {
f"{mode}_src": src,
f"{mode}_tgt": tgt,
f"{mode}_results": outputs,
}
if mode in ["val", "test"]:
_, tgt_results = torch.max(outputs, dim=2)
src_texts = []
tgt_texts = []
res_texts = []
# convert [L X B X others] --> [B X L X others]
for src_i in torch.transpose(src, 0, 1).detach().cpu().numpy().tolist():
res = vocab_transform[self.cfg.data.src_lang].lookup_tokens(src_i) # indices -> token
src_texts.append(_text_postprocessing(res))
# convert [L X B X others] --> [B X L X others]
for tgt_i in torch.transpose(tgt, 0, 1).detach().cpu().numpy().tolist():
res = vocab_transform[self.cfg.data.tgt_lang].lookup_tokens(tgt_i) # indices -> token
tgt_texts.append(_text_postprocessing(res))
# convert [L X B X others] --> [B X L X others]
for tgt_res_i in torch.transpose(tgt_results, 0, 1).detach().cpu().numpy().tolist():
res = vocab_transform[self.cfg.data.tgt_lang].lookup_tokens(tgt_res_i) # indices -> token
res_texts.append(_text_postprocessing(res))
text_result_summary = {
f"{mode}_src_text": src_texts,
f"{mode}_tgt_text": tgt_texts,
f"{mode}_results_text": res_texts,
}
print(f"{self.global_step} step: \n src_text: {src_texts[0]}, \n tgt_text: {tgt_texts[0]}, \n result_text: {res_texts[0]}")
log_detail.update(text_result_summary)
return {f"{mode}_loss": loss}, log_detail
def training_step(self, batch, batch_idx):
src, tgt = batch[0], batch[1]
logs, _ = self._forward(src, tgt, "train", self.cfg.model.teacher_forcing_ratio)
self.log_dict(logs)
logs["loss"] = logs["train_loss"]
return logs
def validation_step(self, batch, batch_idx):
src, tgt = batch[0], batch[1]
logs, logs_detail = self._forward(src, tgt, "val", 0.0)
self.log_dict(logs)
logs["loss"] = logs["val_loss"]
logs.update(logs_detail)
return logs
def test_step(self, batch, batch_idx):
src, tgt = batch[0], batch[1]
logs, logs_detail = self._forward(src, tgt, "test", 0.0)
self.log_dict(logs)
logs["loss"] = logs["test_loss"]
logs.update(logs_detail)
return logs
위와같이 lightning module을 이용하여 Trainer로 돌릴 로직을 작성해줍니다. 참고로 _text_postprocessing은 <pad> 이런걸 없애고 공백 없애고 <eos>, <bos>를 번역한 문장에서 없애주는 후처리기라고 생각하면 될거 같습니다. 이렇게 안하면 매우 지저분해집니다. (참고로 <unk>는 유지)
# utils for initialize model weights
def init_weights(model: Union[nn.Module, pl.LightningModule]):
for name, param in model.named_parameters():
nn.init.uniform_(param.data, -0.08, 0.08)
그리고 모델의 학습가능한 가중치를 초기화 하는 로직을 간단히 짜주었습니다.
Model 정의 (Vanilla LSTMSeq2Seq)
# model definition
# 1. encoder
'''
lstm에서 출력결과는 각 시점에 대해 append한 값이 될 것이다.
pytorch 상에서의 input, output dim을 살펴보면
input: (L, N, H_{in}) <- (h0, c0)은 입력으로 주지 않으면 0으로 초기화
output: (L, N, D * H_{out}) <- bidirection lstm의 경우 D는 2 그렇지 않으면 1일 것이다.
--> (h_n, c_n)이 튜플형태로 또 나오게 된다.
--> h_n: (D * num_layers, N, H_out)
--> c_n: (D * num_layers, N, H_cell)
--> 여기서 H_cell과 H_out은 proj_size가 없는 이상 hidden size와 동일하다. 이는 lstm 논문을 보면 이해 가능하다.
'''
class LSTMEncoder(nn.Module):
def __init__(
self,
input_dim: int, # 인코더에서 사용할 입력층
embed_dim: int, # 인코더에서 사용할 임베딩 계층
hidden_dim: int, # 인코더에서 사용할 은닉층(이전 은닉층)
n_layers: int, # 인코더에서 사용할 LSTM의 계층 개수
dropout: float,
):
super().__init__()
self.hidden_dim = hidden_dim
self.n_layers = n_layers
# embed_dim으로 임베딩을 수행한다.
self.embedding = nn.Embedding(input_dim, embed_dim)
self.rnn = nn.LSTM(embed_dim, hidden_dim, n_layers, dropout=dropout)
self.dropout = nn.Dropout(dropout)
# nn.Module에 있는 모든 parameter를 초기화한다.
self.apply(init_weights)
def forward(self, src):
# src = [seq_len, batch_size]
embedded = self.dropout(self.embedding(src)) # [seq_len, batch_size, embed_dim]
outputs, (hidden, cell) = self.rnn(embedded)
# outputs = [seq_len, batch_size, hidden_dim * n directional]
# hidden, cell = [n layers * n directions, batch_size, hidden_dim]
# outputs will be used from top hidden layers
return hidden, cell
# 2. decoder
class LSTMDecoder(nn.Module):
def __init__(
self,
output_dim: int,
embed_dim: int,
hidden_dim: int,
n_layers: int,
dropout: float,
):
super().__init__()
self.hidden_dim = hidden_dim
self.n_layers = n_layers
self.output_dim = output_dim
self.embedding = nn.Embedding(output_dim, embed_dim)
self.rnn = nn.LSTM(embed_dim, hidden_dim, n_layers, dropout=dropout)
self.fc_out = nn.Linear(hidden_dim, output_dim)
self.dropout = nn.Dropout(dropout)
self.softmax = nn.LogSoftmax(dim=1)
def forward(self, input, hidden, cell):
# input: [batch size, ....] <- start_token
# outputs = [seq_len, batch_size, hidden_dim * n directional]
# hidden, cell = [n layers * n directions, batch_size, hidden_dim]
input = input.unsqueeze(0) # <- [1, batch_size, ...] 입력을 (1, 배치크기)로 변경해야 한다.
embedded = self.dropout(self.embedding(input))
# embedding = [1, batch_size, embed_dim]
output, (hidden, cell) = self.rnn(embedded, (hidden, cell))
# output = [1, batch_size, hidden_dim]
# hidden, cell = [n layers * 1 directions, batch_size, hidden_dim] <- not bidirectional
prediction = self.softmax(self.fc_out(output.squeeze(0))) # [batch_size, output_dim]
return prediction, hidden, cell
# 3. seq2seq <-- encoder + decoder
class LSTMSeq2Seq(BaseTranslateLightningModule):
def __init__(self, cfg: DictConfig):
super().__init__(cfg)
# encoder, decoder, device
self.encoder = LSTMEncoder(**cfg.model.enc)
self.decoder = LSTMDecoder(**cfg.model.dec)
assert self.encoder.hidden_dim == self.decoder.hidden_dim, \
"Hidden dimensions of encoder and decoer must be equal"
assert self.encoder.n_layers == self.decoder.n_layers, \
"Encoder and decoder must have equal number of layers" # -> lstm에서 나온 텐서의 크기를 일치 시켜주기 위함이다.
# --> [n_layers * D, batch_size, hidden_dim]..(1) 이런식으로 텐서의 크기가 맞추어 지기 때문이다.
# parameters들 init.
self.apply(init_weights)
def forward(self, src, tgt, teacher_forcing_ratio: float = 0.5):
# src, tgt = [seq_len (can be different), batch_size]
# for val, test teacher forcing should be 0.0
# e.g. if teacher_forcing_ratio is 0.75 we use ground-truth inputs 75% of the time
batch_size = tgt.shape[1]
tgt_len = tgt.shape[0]
tgt_vocab_size = self.decoder.output_dim
# tensor to store decoder outputs
outputs = torch.zeros(tgt_len, batch_size, tgt_vocab_size).to(self.device)
# last hidden state of the encoder is used as the initial hidden state of the decoder
hidden, cell = self.encoder(src) # (1) 텐서의 형태를 가질 것이다. -> decoder의 입력으로 들어간다.
# first input to the decoder is the <sos> tokens
input = tgt[0, :] # [<sos>, batch_size]
# Autogressive 만큼 tgt_len만큼 반복을 할 것이다.
for t in range(1, tgt_len):
# get one cell's output
output, hidden, cell = self.decoder(input, hidden, cell)
# set to all output result
outputs[t] = output
# decide whether going to use teacher forcing or not.
teacher_force = random.random() < teacher_forcing_ratio
# get the highest predicted token from our predictions
top1 = output.argmax(1)
input = tgt[t] if teacher_force else top1
return outputs
읽으면 잘 이해되실 겁니다. 아주 상세히 설명을 적어두었습니다. 나중에 시간이 된다면 그림으로 직접 그려보도록 하겠습니다.
Model 정의 (Additive Attention Based GRUSeq2Seq)
# Concat; Additive Attention 기반의 모델 새로 정의
# encoder, decoder rnn이 다를 수 있다.
# --> original seq2seq은 같은 lstm을 사용했지만, 같으리라는 법은 없다! 다르게 정의해보자.
class BidirectionalGRUEncoder(nn.Module):
def __init__(
self,
input_dim: int,
embed_dim: int,
enc_hidden_dim: int,
dec_hidden_dim: int,
n_layers: int,
dropout: float,
):
super().__init__()
self.input_dim = input_dim
self.n_layers = n_layers
self.embedding = nn.Embedding(input_dim, embed_dim)
self.rnn = nn.GRU(embed_dim, enc_hidden_dim, n_layers, dropout=dropout, bidirectional=True)
self.fc = nn.Linear(enc_hidden_dim * 2, dec_hidden_dim)
self.dropout = nn.Dropout(dropout)
# nn.Module에 있는 모든 parameter를 초기화한다.
self.apply(init_weights)
def forward(self, src):
# src = [seq_len, batch_size]
embedded = self.dropout(self.embedding(src)) # [seq_len, batch_size, embed_dim]
outputs, hidden = self.rnn(embedded)
# outputs = [seq_len, batch_size, hidden_dim * n directional]
# hidden, cell = [n layers * n directions, batch_size, hidden_dim]
# hidden -> [forward_1, backward_1, forward_2, backward_2, ...]
# 우리가 필요한 건 맨 마지막 layer의 forward backward 두개 concat한 것이 필요. -> 이걸 가지고 decoder에 넘겨야 하니까
# --> bidirectional GRU의 구조를 보면 이해할 수 있을것임
# encoder RNNs fed through a linear layer to connect decoder
hidden = torch.tanh(self.fc(
torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
))
return outputs, hidden
'''Bahdanau Attention
Query: t-1 시점의 은닉상태(=s_{t-1})를 사용함
score(s_{t-1}, h_{i}) = Wa^{T}*tanh(Wb*s_{t-1} + WcH) 으로 나타낼 수 있음
---> Wa, Wb, Wc는 학습 가능한 가중치 행렬, H는 h1, h2, h3, h4의 열백터로 이루어져있는 행렬임.
'''
class ConcatAttention(nn.Module):
def __init__(self, enc_hidden_dim: int, dec_hidden_dim: int):
super().__init__()
self.attn = nn.Linear((enc_hidden_dim * 2) + dec_hidden_dim, dec_hidden_dim)
self.v = nn.Linear(dec_hidden_dim, 1, bias=False)
def forward(self, hidden, encoder_outputs):
# hidden = [batch_size, dec_hidden_dim] (query)
# encoder_output = [src_len, batch_size, enc_hidden_dim * 2] (key, value)
batch_size = encoder_outputs.shape[1]
src_len = encoder_outputs.shape[0]
# repeat decoder hidden state src_len times ... -> concate해주기 위해 길이를 맞추어 주는 과정이다.
hidden = hidden.unsqueeze(1).repeat(1, src_len, 1) # [batch_size, src_len, dec_hidden_dim]
encoder_outputs = encoder_outputs.permute(1, 0, 2) # [batch_size, src_len, enc_hidden_dim * 2]
energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2))) # [batch_size, src_len, dec_hidden_dim]
attention = self.v(energy).squeeze(2) # [batch_size, src_len]
return F.softmax(attention, dim=1)
class AttentionRNNDecoder(nn.Module):
def __init__(
self,
output_dim: int,
embed_dim: int,
enc_hidden_dim: int,
dec_hidden_dim: int,
n_layers: int,
dropout: float,
attention: nn.Module,
):
super().__init__()
self.output_dim = output_dim
self.attention = attention
self.embedding = nn.Embedding(output_dim, embed_dim)
self.rnn = nn.GRU((enc_hidden_dim * 2) + embed_dim, dec_hidden_dim, n_layers, dropout=dropout)
self.fc_out = nn.Linear((enc_hidden_dim*2) + dec_hidden_dim + embed_dim, output_dim)
self.dropout = nn.Dropout(dropout)
def forward(self, input, hidden, encoder_outputs):
# input [batch_size] <-- start_token
# hidden [batch_size, dec_hidden_dim]
# encoder_outputs [src_len, batch_size, enc_hidden_dim * 2]
input = input.unsqueeze(0) # [1, batch_size]
embedded = self.dropout(self.embedding(input)) # [1, batch_size, embed_dim]
a = self.attention(hidden, encoder_outputs) # [batch_size, src_len]
a = a.unsqueeze(1) # [batch_size, 1, src_len]
encoder_outputs = encoder_outputs.permute(1, 0, 2) # [batch_size, src_len, enc_hidden_dim * 2]
weighted = torch.bmm(a, encoder_outputs) # [batch_size, 1, enc_hidden_dim * 2] -> [batch_size, (attention context_vector)] 라고 할 수 있다.
weighted = weighted.permute(1, 0, 2) # [1, batch_size, enc_hidden_dim * 2]
rnn_input = torch.cat((embedded, weighted), dim=2) # [1, batch_size, (enc_hidden_dim * 2, embed_dim)]
# hidden_unsqueeze: [1, batch_size, dec_hidden_dim]
output, hidden = self.rnn(rnn_input, hidden.unsqueeze(0))
# output = [seq_len, batch_size, dec_hidden_dim * n_directions] => [1, batch_size, dec_hidden_dim]
# hidden = [n layers * n_directions, batch_size, dec_hidden_dims] => [1, batch_size, dec_hidden_dim]
if not (output == hidden).all():
raise AssertionError()
# sequence dim을 없애고 concat할 전처리를 해준다. 바나다우 어텐션에서 fully-connected layer에 넣어주기 위해 해주어야 한다.
embedded = embedded.squeeze(0)
output = output.squeeze(0)
weighted = weighted.squeeze(0)
prediction = self.fc_out(torch.cat((output, weighted, embedded), dim=1)) # [batch_size, output_dim]
return prediction, hidden.squeeze(0)
class AttentionBasedSeq2Seq(BaseTranslateLightningModule):
def __init__(self, cfg: DictConfig):
super().__init__(cfg)
self.encoder = BidirectionalGRUEncoder(**cfg.model.enc)
self.attention = ConcatAttention(**cfg.model.attention)
self.decoder = AttentionRNNDecoder(
attention=self.attention,
**cfg.model.dec
)
def forward(self, src, tgt, teacher_forcing_ratio: float = 0.5):
# src, tgt = [seq_len (can be different), batch_size]
# for val, test teacher forcing should be 0.0
# e.g. if teacher_forcing_ratio is 0.75 we use ground-truth inputs 75% of the time
batch_size = tgt.shape[1]
tgt_len = tgt.shape[0]
tgt_vocab_size = self.decoder.output_dim
outputs = torch.zeros(tgt_len, batch_size, tgt_vocab_size).to(self.device)
encoder_outputs, hidden = self.encoder(src)
input = tgt[0, :] # [<sos>, batch_size]
# Autogressive 만큼 tgt_len만큼 반복을 할 것이다.
for t in range(1, tgt_len):
output, hidden = self.decoder(input, hidden, encoder_outputs)
# set to all output result
outputs[t] = output
# decide whether going to use teacher forcing or not.
teacher_force = random.random() < teacher_forcing_ratio
# get the highest predicted token from our predictions
top1 = output.argmax(1)
input = tgt[t] if teacher_force else top1
return outputs
이는 아까 말했다 싶이 Attention 을 GRUSeq2Seq에 적용해 구현한 모델입니다. 위 2개에 대한 config를 작성해보고, 비교분석해보겠습니다.
Configuration (data, model, opt, train, log)
data_spacy_de_en_cfg = {
"name": "spacy_de_en",
"data_root": os.path.join(os.getcwd(), "data"),
"tokenizer": "spacy",
"src_lang": "de",
"tgt_lang": "en",
"src_index": 0,
"tgt_index": 1,
"vocab": {
"special_symbol2index": {
"<unk>": 0,
"<pad>": 1,
"<bos>": 2,
"<eos>": 3,
},
"special_first": True,
"min_freq": 2
}
}
data_cfg = OmegaConf.create(data_spacy_de_en_cfg)
# get_dataset
train_iter = Multi30k(split='train', language_pair=(data_cfg.src_lang, data_cfg.tgt_lang))
valid_iter = Multi30k(split='valid', language_pair=(data_cfg.src_lang, data_cfg.tgt_lang))
token_transform = get_token_transform(data_cfg)
vocab_transform = get_vocab_transform(data_cfg)
먼저 위에서 작성한 data configuration파일을 가지고 dataset을 얻습니다.
# model config
model_translate_lstm_seq2seq_cfg = {
"name": "LSTMSeq2Seq",
"enc": {
"input_dim": len(vocab_transform[data_cfg.src_lang]),
"embed_dim": 256,
"hidden_dim": 256,
"n_layers": 2,
"dropout": 0.5,
},
"dec": {
"output_dim": len(vocab_transform[data_cfg.tgt_lang]),
"embed_dim": 256,
"hidden_dim": 256,
"n_layers": 2,
"dropout": 0.5,
},
"teacher_forcing_ratio": 0.5
}
model_translate_attention_based_seq2seq_cfg = {
"name": "AttentionBasedSeq2Seq",
"enc": {
"input_dim": len(vocab_transform[data_cfg.src_lang]),
"embed_dim": 256,
"enc_hidden_dim": 512,
"dec_hidden_dim": 512,
"n_layers": 2,
"dropout": 0.5,
},
"dec": {
"output_dim": len(vocab_transform[data_cfg.tgt_lang]),
"embed_dim": 256,
"enc_hidden_dim": 512,
"dec_hidden_dim": 512,
"n_layers": 1,
"dropout": 0.5,
},
"attention": {
"enc_hidden_dim": 512,
"dec_hidden_dim": 512,
},
"teacher_forcing_ratio": 0.5
}
# opt_config
opt_cfg = {
"optimizers": [
{
"name": "RAdam",
"kwargs": {
"lr": 1e-3,
}
}
],
"lr_schedulers": [
{
"name": None,
"kwars": {
"warmup_end_stage": 1000
}
}
]
}
_merged_cfg_presets = {
"LSTM_seq2seq_de_en_translate": {
"opt": opt_cfg,
"data": data_spacy_de_en_cfg,
"model": model_translate_lstm_seq2seq_cfg,
},
"attention_based_seq2seq_de_en_translate": {
"opt": opt_cfg,
"data": data_spacy_de_en_cfg,
"model": model_translate_attention_based_seq2seq_cfg,
}
}
# clear config hydra instance first
hydra.core.global_hydra.GlobalHydra.instance().clear()
# register preset configs
register_config(_merged_cfg_presets)
# initialization & compose configs
hydra.initialize(config_path=None, version_base="1.1")
cfg = hydra.compose("attention_based_seq2seq_de_en_translate")
# override some cfg
run_name = f"{datetime.now().isoformat(timespec='seconds')}-{cfg.model.name}-{cfg.data.name}"
project_root_dir = os.path.join(drive_project_root, "runs", "de_en_translate_tutorials")
save_dir = os.path.join(project_root_dir, run_name)
run_root_dir = os.path.join(project_root_dir, run_name)
train_cfg = {
"train_batch_size": 128,
"val_batch_size": 32,
"test_batch_size": 32,
"train_val_split": [0.9, 0.1],
"run_root_dir": run_root_dir,
"trainer_kwargs": {
# "accelerator": "dp",
# "gpus": "0",
"max_epochs": 50,
"val_check_interval": 1.0,
"log_every_n_steps": 100,
# "flush_logs_every_n_steps": 100,
}
}
# logger config
log_cfg = {
"loggers": {
"WandbLogger": {
"project": "fastcampus_de_en_translate_tutorials",
"name": run_name,
"tags": ["fastcampus_de_en_translate_tutorials"],
"save_dir": run_root_dir,
},
"TensorBoardLogger": {
"save_dir": project_root_dir,
"name": run_name,
"log_graph": True,
}
},
"callbacks": {
"ModelCheckpoint": {
"save_top_k": 3,
"monitor": "val_loss",
"mode": "min",
"verbose": True,
"dirpath": os.path.join(run_root_dir, "weights"),
"filename": "{epoch}-{val_loss:.3f}",
},
"EarlyStopping": {
"monitor": "val_loss",
"mode": "min",
"patience": 3,
"verbose": True,
}
}
}
# unlock config & set train_cfg & log_cfg
OmegaConf.set_struct(cfg, False)
cfg.train = train_cfg
cfg.log = log_cfg
# lock config
OmegaConf.set_struct(cfg, True)
print(OmegaConf.to_yaml(cfg))
그리고 위와같이 필요한 Configuration 파일을 정의해줍니다. 당연히 위를 잘 보시면, 2가지 버전에 따른 다른 model configuration 을 작성해서 merge한 것을 볼 수 있습니다.
일단 attention 기반의 configuration 내용을 hydra를 이용해 선택하였습니다.
그리고 opt와 같은 경우는 맨 위에서 우리가 외부 스크립트로 작성된 것을 토대로 cfg.opt.name = "RAdam"이러면 RAdam 관련 optimizer module을 base lightning module에서 configure_optimizers함수에서 주입합니다. 예를 들어 아래와 같이 말이죠.
이제 좀 왜 config_utils 외부 스크립트를 작성한지 이해가 되시죠. configuration을 더 용이하게 하기 위합니다. 나중에 하이퍼파라미터 튜닝을 해서 실험을 하는 경우에 매우 용이하게 사용됩니다.
저같은 경우는 model_translate_lstm_seq2seq_cfg에는 hidden_dim을 256, n_layers를 2로 두었습니다.
그리고 model_translate_attention_based_seq2seq_cfg는 아래와 같이 encoder-dec dim을 512로 두고 n_layers도 encoder, decoder 다르게 두었답니다.
코드를 보시면 아시겠지만 Attention 기반에서는 encoder는 BidirectionalGRUEncoder로서, hidden_dim을 512로 두었습니다. 이에 대한 텐서 계산이 조금 많이 빡셌습니다. 하지만 pytorch 공식 문서에 자세히 나와있으므로 참고 바랍니다.
https://pytorch.org/docs/stable/generated/torch.nn.LSTM.html
https://pytorch.org/docs/stable/generated/torch.nn.GRU.html?highlight=gru#torch.nn.GRU
Dataloader 정의
# dataloader def
train_dataloader = get_multi30k_dataloader(
"train",
(cfg.data.src_lang, cfg.data.tgt_lang),
cfg.train.train_batch_size,
collate_fn=get_collate_fn(cfg)
)
val_dataloader = get_multi30k_dataloader(
"valid",
(cfg.data.src_lang, cfg.data.tgt_lang),
cfg.train.val_batch_size,
collate_fn=get_collate_fn(cfg)
)
# test_dataloader = get_multi30k_dataloader(
# "test",
# (cfg.data.src_lang, cfg.data.tgt_lang),
# cfg.train.test_batch_size,
# collate_fn=get_collate_fn(cfg)
# )
test dataloader는 일단 만들지 않았습니다.
Get Model 함수 정의
def get_pl_model(cfg: DictConfig, checkpoint_path: Optional[str] = None):
if cfg.model.name == "LSTMSeq2Seq":
model = LSTMSeq2Seq(cfg)
elif cfg.model.name == "AttentionBasedSeq2Seq":
model = AttentionBasedSeq2Seq(cfg)
else:
raise NotImplementedError("Not implemented model")
if checkpoint_path is not None:
model = model.load_from_checkpoint(cfg, checkpoint_path=checkpoint_path)
return model
model = None
model = get_pl_model(cfg)
print(model)
이제 우리가 작성한 모든 설정파일을 토대로 cfg.model.name에 따라서 서로 다른 모델을 가져오도록 합니다. 그리고 checkpoint가 있다면 이를 가져오도록 합니다.
우리가 최종적으로 돌릴 모델은 위와같습니다.
Train!!
# pytorch lightning trainer def
logger = get_loggers(cfg)
callbacks = get_callbacks(cfg)
trainer = pl.Trainer(
callbacks=callbacks,
logger=logger,
default_root_dir=cfg.train.run_root_dir,
num_sanity_val_steps=2,
**cfg.train.trainer_kwargs,
)
최종적으로 logger, callbacks를 정의하고 pl.Trainer에
이를 다 박습니다.
trainer.fit(model, train_dataloader, val_dataloader)
위와같이 배치마다 저장되는 한개씩의 값을 1epoch마다 아래의 로직으로 찍어보았습니다.
그리고 이전에 작성한 LSTMSeq2Seq을 돌렸었는데, 이와 비교한 Wandb의 로그를 보겠습니다.
파란색이 Attention기반인데, 훨씬 성능이 좋은거 같다는 생각이 팍팍 들죠??
하지만 여전히 번역이 잘 되는거 같나요?
2724 step)
- src_text: Zwei schwarz gekleidete Männer mit einer grünen und einer roten Fliege treten vor einer Menschenmenge auf .
- tgt_text: Two men in black clothes with blue and red bowties are performing in front of a crowd .
- result_text: <unk> men in black and green green and red are a crowd a crowd
뭔가 맥락은 잘 학습하는거 같지만, 상업적 용도로 이를 사용하면 해당 제품은 바로 망할거 같습니다.. 당연히 제가 하이퍼파라미터 값을 제대로 설정하지 않은 탓이기도 하겠지만 이는 모델의 Capacity가 너무 적고 추후에 발전된 모델들이 더 많이 나옵니다. Seq2Seq는 학습속도가 AutoRegressive하기 때문에 느리고, 고정된 Context-vector를 사용하기 때문에 한계점이 많습니다.
그래서 다음에 AutoRegressive와 관련된내용, 생성형모델(=AutoEncoder,..) 등을 살펴보고 최종적으로 Transformer를 실제 구현해보도록 하겠습니다.