지금까지 PyTorch로 ResNet, VGGNet 등을 구현해 보았는데, 이번에는 가장 기본적인 CNN 뼈대를 체계적으로 구축하는 방법을 정리하겠습니다. 그리고 이를 바탕으로 EfficientNet-b1을 아래 git에서 내려받은 다음, Transfer learning으로 Fine-tuning하여 우리가 직접 구축한 모델과 성능을 비교해 보겠습니다.
https://github.com/lukemelas/EfficientNet-PyTorch
https://pytorch.org/docs/stable/nn.html#convolution-layers
그리고 당연하지만 PyTorch 2.0 을 기준으로 설명하겠습니다.
CNN 구축
Google Drive Mount
# Mount Google Drive so the Colab runtime can read and write files stored there.
from google.colab import drive
drive.mount("/content/drive")
import os
import sys
from datetime import datetime
# Project folder on the mounted Drive; reused later when building the log directory.
drive_project_path = "/content/drive/MyDrive/DeepLearning/fastcampus"
# Add the project folder to the import path
# (presumably so the project-local `data_utils` module can be imported — verify).
sys.path.append(drive_project_path)
# Notebook shell magic: install the packages listed in the project's requirements file.
!pip install -r "/content/drive/MyDrive/DeepLearning/fastcampus/requirements.txt"
우선 구글 코랩에서 작업하기 때문에, drive_project_path를 정의해준다음, 재사용해줍니다.
Import Package
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
from omegaconf import OmegaConf
from omegaconf import DictConfig
import torch
from datetime import datetime
from torch import nn
import torch.nn.functional as F
from torch import optim
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import random_split
from torchvision.datasets import FashionMNIST
from torchvision import transforms
import wandb
from data_utils import dataset_split
Transform Function (with fashion-mnist-dataset)
# Folder (under the current working directory) where the dataset files are stored.
data_root = os.path.join(os.getcwd(), "data")

# Preprocessing pipeline: convert each image to a tensor, then normalize it
# with mean 0.5 and std 0.5.
transform = transforms.Compose(
    transforms=[
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5]),
    ]
)

# Training split of Fashion-MNIST, downloaded into data_root when missing.
fashion_mnist_dataset = FashionMNIST(
    root=data_root,
    train=True,
    transform=transform,
    download=True,
)
Define Dataloader
# Split the training data with the project helper; the result is indexed by
# "train" and "val" (split ratios 0.9 / 0.1 are passed through as given).
datasets = dataset_split(fashion_mnist_dataset, split=[0.9, 0.1])
train_dataset, val_dataset = datasets["train"], datasets["val"]

train_batch_size, val_batch_size = 100, 10

# Training batches are reshuffled every epoch.
train_dataloader = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=train_batch_size,
    shuffle=True,
    num_workers=1,
)
# Validation batches keep a fixed order.
val_dataloader = torch.utils.data.DataLoader(
    dataset=val_dataset,
    batch_size=val_batch_size,
    shuffle=False,
    num_workers=1,
)
Define CNN
# Hyper-parameters of the CNN, declared as a plain dictionary.
_cnn_cfg_dict: dict = {
    "layer_1": {
        "conv2d_in_channel": 1,
        "conv2d_out_channel": 32,
        "conv2d_kernel_size": 3,
        "conv2d_padding": 1,
        "maxpool2d_kernel_size": 2,
        "maxpool2d_stride": 2,
    },
    "layer_2": {
        "conv2d_in_channel": 32,
        "conv2d_out_channel": 64,
        "conv2d_kernel_size": 3,
        "conv2d_padding": 0,
        "maxpool2d_kernel_size": 2,
        "maxpool2d_stride": 1,
    },
    "fc_1": {
        # For 28x28 inputs: 28 -> conv(k3, p1) 28 -> pool(k2, s2) 14
        # -> conv(k3, p0) 12 -> pool(k2, s1) 11, so 64 * 11 * 11 = 7744.
        "in_features": 7744,
        "out_features": 512,
    },
    "fc_2": {
        "in_features": 512,
        "out_features": 128,
    },
    "fc_3": {
        "in_features": 128,
        "out_features": 10,
    },
    "dropout_prob": 0.25,
}
_cnn_cfg = OmegaConf.create(_cnn_cfg_dict)

# Persist the config as YAML and read it back. The original ended with a bare
# `OmegaConf.load` expression that did nothing; the saved file is now actually
# loaded so the YAML round-trip is exercised.
with open("cnn_test.yaml", "w") as f:
    OmegaConf.save(_cnn_cfg, f)
_cnn_cfg = OmegaConf.load("cnn_test.yaml")
class CNN(nn.Module):
    """Two conv blocks followed by a three-layer fully connected classifier.

    Each conv block is Conv2d -> BatchNorm2d -> ReLU -> MaxPool2d, with all
    sizes read from ``cfg``.

    Changes from the first version:
      * ``nn.Dropout2d`` was applied to flattened (batch, features)
        activations; plain ``nn.Dropout`` is the element-wise variant meant
        for that shape.
      * Dropout is no longer applied to the final logits, where it randomly
        zeroed class scores during training.
      * A ReLU follows fc1 and fc2; without a non-linearity the three linear
        layers collapse into a single linear map.

    Parameter names (layer1, layer2, fc1, fc2, fc3) are unchanged, so
    ``state_dict`` keys stay the same.

    Args:
        cfg: Config with ``layer_1``, ``layer_2``, ``fc_1``, ``fc_2``,
            ``fc_3`` and ``dropout_prob`` entries.
    """

    def __init__(self, cfg: DictConfig = _cnn_cfg):
        super().__init__()
        self.layer1 = self._conv_block(cfg.layer_1)
        self.layer2 = self._conv_block(cfg.layer_2)
        self.fc1 = nn.Linear(
            in_features=cfg.fc_1.in_features,
            out_features=cfg.fc_1.out_features,
        )
        self.fc2 = nn.Linear(
            in_features=cfg.fc_2.in_features,
            out_features=cfg.fc_2.out_features,
        )
        self.fc3 = nn.Linear(
            in_features=cfg.fc_3.in_features,
            out_features=cfg.fc_3.out_features,
        )
        self.dropout = nn.Dropout(cfg.dropout_prob)

    @staticmethod
    def _conv_block(layer_cfg) -> nn.Sequential:
        """Build one Conv2d -> BatchNorm2d -> ReLU -> MaxPool2d block."""
        return nn.Sequential(
            nn.Conv2d(
                in_channels=layer_cfg.conv2d_in_channel,
                out_channels=layer_cfg.conv2d_out_channel,
                kernel_size=layer_cfg.conv2d_kernel_size,
                padding=layer_cfg.conv2d_padding,
            ),
            nn.BatchNorm2d(layer_cfg.conv2d_out_channel),
            nn.ReLU(),
            nn.MaxPool2d(
                kernel_size=layer_cfg.maxpool2d_kernel_size,
                stride=layer_cfg.maxpool2d_stride,
            ),
        )

    def forward(self, x):
        """Return the raw class logits for a batch of images."""
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.view(out.size(0), -1)  # keep the batch dim, flatten the rest
        out = self.dropout(F.relu(self.fc1(out)))
        out = self.dropout(F.relu(self.fc2(out)))
        # Logits are returned untouched: CrossEntropyLoss expects raw scores.
        return self.fc3(out)


CNN()
CNN 모듈을 정의하는 부분입니다. 모델의 하이퍼 파라미터 관리를 용이하게 하기 위해 OmegaConf를 통해서 "layer_1", "layer_2", "fc_1", "fc_2", "fc_3"의 설정값을 정의하였고, 이를 각 레이어의 매개변수로 넘겨주었습니다.
Define Model, Loss Function, Optimizer, Tensorboard Logger
# gpu setup: use CUDA device 0 when one is available, otherwise run on the CPU
# (gpu = None). The training loop already branches on `gpu is not None`.
gpu = 0 if torch.cuda.is_available() else None

# define model.
# model = MLP(28*28, 128, 64, 10)
# model = MLPWithDropout(28*28, 128, 64, 10, dropout_prob=0.3)
model = CNN(cfg=_cnn_cfg)
if gpu is not None:
    # Previously called unconditionally, which fails on a CPU-only runtime.
    model.cuda(gpu)
model_name = type(model).__name__

# define loss
loss_function = nn.CrossEntropyLoss()
lr = 1e-3

# define optimizer
optimizer = torch.optim.RAdam(model.parameters(), lr=lr)
optimizer_name = type(optimizer).__name__

# define scheduler
scheduler = None
scheduler_name = type(scheduler).__name__ if scheduler is not None else "no"
max_epoch = 50

# define tensorboard logger: one run directory per timestamp/model/optimizer combination.
run_name = f"{datetime.now().isoformat(timespec='seconds')}-{model_name}-{optimizer_name}_optim_{lr}_lr_with_{scheduler_name}"
run_dirname = "dnn-tutorial-fashion-mnist-runs"
log_dir = os.path.join(drive_project_path, "runs", run_dirname, run_name)
writer = SummaryWriter(log_dir=log_dir)
log_interval = 100

# define wandb
# project_name = "fastcampus_fashion_mnist_tutorials"
# run_tags = [project_name]
# wandb.init(
# project=project_name,
# name=run_name,
# tags=run_tags,
# config={"lr": lr, "model_name": model_name, "optimizer_name": optimizer_name, "scheduler_name": scheduler_name},
# reinit=True,
# )

# set save model path
log_model_path = os.path.join(log_dir, "models")
os.makedirs(log_model_path, exist_ok=True)
우선 모델을 gpu에 할당해주었습니다. loss function으로는 cross entropy loss를 사용해 주었고, Optimizer는 RAdam을 lr=1e-3으로 주었습니다. scheduler는 정의하지 않았으며, tensorboard logger를 google drive내에 모델의 형식에 따라 이름을 다르게 하여 정의하였습니다.
그리고 wandb와 관련된 설정은 생략해주었습니다.
Define Early Stopping callback Object Class
# With some modifications, source is from https://github.com/Bjarten/early-stopping-pytorch
class EarlyStopping:
    """Stop training when the validation loss stops improving.

    Call the instance once per validation pass. Every improvement saves a
    checkpoint; after ``patience`` consecutive calls without an improvement
    ``early_stop`` becomes True.
    """

    def __init__(self, patience=7, verbose=False, delta=0, path='checkpoint.ckpt', trace_func=print):
        """
        Args:
            patience (int): How many calls without improvement to tolerate.
                Default: 7
            verbose (bool): If True, report every validation loss improvement.
                Default: False
            delta (float): Minimum change in the monitored quantity to qualify as an improvement.
                Default: 0
            path (str): Base path for checkpoints; the validation loss is
                prefixed to the file name on save.
                Default: 'checkpoint.ckpt'
            trace_func (function): Function used to emit messages.
                Default: print
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        # float("inf") instead of np.Inf: that alias was removed in NumPy 2.0.
        self.val_loss_min = float("inf")
        self.delta = delta
        self.path = path
        self.trace_func = trace_func

    def __call__(self, val_loss, model):
        """Record one validation result and update the stopping state.

        Args:
            val_loss: Validation loss of the current epoch (lower is better).
            model: Object to checkpoint when the loss improved.
        """
        # Negate so that "higher score" means "better".
        score = -val_loss

        if self.best_score is None:
            # First call: nothing to compare against yet.
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            return

        if score < self.best_score + self.delta:
            self.counter += 1
            self.trace_func(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
            return

        self.best_score = score
        self.save_checkpoint(val_loss, model)
        self.counter = 0

    def save_checkpoint(self, val_loss, model):
        """Save ``model`` next to ``path`` with the loss prefixed to the file name."""
        if self.verbose:
            self.trace_func(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model ...')
        # os.path.split handles the platform's separator, unlike a manual split on '/'.
        save_dir, filename = os.path.split(self.path)
        if save_dir:
            os.makedirs(save_dir, exist_ok=True)
        torch.save(model, os.path.join(save_dir, f"val_loss-{val_loss}-{filename}"))
        self.val_loss_min = val_loss
EarlyStopping을 위한 모듈을 오픈소스를 참조하여 정의해줍니다. 이는 validation loss가 patience 횟수만큼 연속으로 개선되지 않으면(더 이상 떨어지지 않으면) 학습을 중단합니다.
Training with Validation
%load_ext tensorboard
%tensorboard --logdir /content/drive/MyDrive/DeepLearning/fastcampus/runs/dnn-tutorial-fashion-mnist-runs/
# define EarlyStopping.
early_stopper = EarlyStopping(
    patience=3, verbose=True, path=os.path.join(log_model_path, "model.ckpt")
)

# Train with a validation pass at the start of every epoch.
#
# Fixes compared with the first version:
#   * CrossEntropyLoss already returns the batch mean, so the validation loss
#     is no longer divided by the batch size a second time.
#   * The running train loss/accuracy are no longer reset on every batch, and
#     they are averaged over the batches/samples actually accumulated.
#   * cur_lr is always defined, also when a scheduler is configured.
#   * The model graph is written to TensorBoard once instead of at every
#     logging step.
# NOTE(review): a configured scheduler is still never stepped here; add
# scheduler.step() at the granularity the chosen scheduler expects.
train_step = 0
graph_logged = False
for epoch in range(1, max_epoch + 1):
    # ---- validation step ----
    model.eval()
    val_loss_sum = 0.0
    val_correct = 0
    val_seen = 0
    with torch.no_grad():
        for val_images, val_labels in tqdm(
            val_dataloader, position=0, leave=True, desc="validation"
        ):
            if gpu is not None:
                val_images = val_images.cuda(gpu)
                val_labels = val_labels.cuda(gpu)

            # forward
            val_outputs = model(val_images)
            val_preds = val_outputs.argmax(dim=1)

            # Weight the batch-mean loss by the batch size so the epoch value
            # is a true per-sample mean even when the last batch is smaller.
            batch_size = val_labels.size(0)
            val_loss_sum += loss_function(val_outputs, val_labels).item() * batch_size
            val_correct += (val_preds == val_labels).sum().item()
            val_seen += batch_size

    # valid step logging
    val_epoch_loss = val_loss_sum / max(val_seen, 1)
    val_epoch_acc = val_correct / max(val_seen, 1)
    print(
        f"{epoch} epoch, {train_step} step: val_loss: {val_epoch_loss}, val_acc: {val_epoch_acc}"
    )
    writer.add_scalar("Loss/val", val_epoch_loss, train_step)
    writer.add_scalar("Acc/val", val_epoch_acc, train_step)
    writer.add_images("Images/val", val_images, train_step)

    # Check the early stopping point; the model is saved whenever it reaches
    # its best validation loss so far.
    early_stopper(val_epoch_loss, model)
    if early_stopper.early_stop:
        break

    # ---- train step ----
    model.train()
    running_loss = 0.0
    running_correct = 0
    running_samples = 0
    running_batches = 0
    for images, labels in tqdm(
        train_dataloader, position=0, leave=True, desc="training"
    ):
        if gpu is not None:
            images = images.cuda(gpu)
            labels = labels.cuda(gpu)

        # Forward: predictions and loss
        outputs = model(images)
        preds = outputs.argmax(dim=1)
        loss = loss_function(outputs, labels)

        # Backpropagation: reset gradients, backward pass, parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        running_correct += (preds == labels).sum().item()
        running_samples += labels.size(0)
        running_batches += 1

        if train_step % log_interval == 0:
            train_loss = running_loss / running_batches
            train_acc = running_correct / running_samples
            print(
                f"{train_step}: train_loss: {train_loss}, train_acc: {train_acc}"
            )
            cur_lr = (
                optimizer.param_groups[0]["lr"]
                if scheduler is None
                else scheduler.get_last_lr()[0]
            )
            writer.add_scalar("Loss/train", train_loss, train_step)
            writer.add_scalar("Acc/train", train_acc, train_step)
            writer.add_images("Images/train", images, train_step)
            writer.add_scalar("Learning Rate", cur_lr, train_step)
            if not graph_logged:
                # The architecture does not change, so one graph is enough.
                writer.add_graph(model, images)
                graph_logged = True
            # Start a fresh logging window.
            running_loss = 0.0
            running_correct = 0
            running_samples = 0
            running_batches = 0
        train_step += 1
이에 대한 결과는 나중에 Efficient Net의 tensorboard결과와 같이 살펴보겠습니다.
Load Model with Testing
우리는 위 format대로 모델의 checkpoint를 저장해주었습니다.
# save model
torch.save(model, os.path.join(log_model_path, "model.ckpt"))

# load model
# map_location="cpu" lets the checkpoint load on a machine without a GPU.
# weights_only=False is required because the file holds a whole pickled module,
# not just a state_dict.
# NOTE: unpickling runs arbitrary code; only load checkpoints you created yourself.
loaded_model = torch.load(
    os.path.join(log_model_path, "model.ckpt"),
    map_location="cpu",
    weights_only=False,
)
loaded_model.eval()
loaded_model.cpu()
print(loaded_model)
def softmax(x, axis=0):
    """Return the softmax of ``x`` along ``axis``, computed with NumPy.

    The maximum along ``axis`` is subtracted before exponentiating so that
    large inputs do not overflow; this does not change the result.

    Args:
        x: Array-like of scores.
        axis: Axis along which the outputs sum to 1.

    Returns:
        Array with the same shape as ``x``.
    """
    x = np.asarray(x)
    # Local names no longer shadow the builtins ``max`` and ``sum``.
    shifted = x - np.max(x, axis=axis, keepdims=True)
    exp_shifted = np.exp(shifted)
    return exp_shifted / np.sum(exp_shifted, axis=axis, keepdims=True)
softmax 함수도 직접 정의해줍니다.
test_batch_size = 100
# Evaluate with the same `transform` that was used for the training data.
# The first version passed only ToTensor(), so the test inputs skipped the
# normalization the model was trained on.
test_dataset = FashionMNIST(data_root, download=True, train=False, transform=transform)
test_dataloader = torch.utils.data.DataLoader(
    test_dataset, batch_size=test_batch_size, shuffle=False, num_workers=1
)

test_labels_list = []
test_preds_list = []
test_outputs_list = []

# Inference only: no gradients are needed.
with torch.no_grad():
    for test_images, test_labels in tqdm(
        test_dataloader, position=0, leave=True, desc="testing"
    ):
        # forward
        test_outputs = loaded_model(test_images)
        test_preds = test_outputs.argmax(dim=1)

        # Turn the logits into per-class probabilities (used for the ROC curve).
        final_outs = softmax(test_outputs.cpu().numpy(), axis=1)
        test_outputs_list.extend(final_outs)
        test_preds_list.extend(test_preds.cpu().numpy())
        test_labels_list.extend(test_labels.cpu().numpy())

test_preds_list = np.array(test_preds_list)
test_labels_list = np.array(test_labels_list)
print(f"\nacc: {np.mean(test_preds_list == test_labels_list)*100}%")
test dataloader를 통해 테스트를 해보았습니다. 출력결과가 없어져서 보여주지 못하지만, accuracy가 0.4정도 나왔는데, 아마 overfitting이 되지 않았나 싶습니다.
# ROC Curve
from sklearn.metrics import roc_curve
from sklearn.metrics import roc_auc_score

n_class = 10
# Convert the per-sample probability rows to one array up front instead of
# rebuilding it for every class inside the loop.
test_scores = np.array(test_outputs_list)

fpr = {}
tpr = {}
thresh = {}
# One-vs-rest ROC curve per class: class i is the positive label.
for i in range(n_class):
    fpr[i], tpr[i], thresh[i] = roc_curve(test_labels_list, test_scores[:, i], pos_label=i)

# plot.
for i in range(n_class):
    plt.plot(fpr[i], tpr[i], linestyle="--", label=f"Class {i} vs Rest")
plt.title("Multi-class ROC Curve")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.legend(loc="best")
plt.show()

print("auc_score", roc_auc_score(test_labels_list, test_scores, multi_class="ovo", average="macro"))
ROC curve도 그려서 확인해봅니다. AUC 점수는 0.9 정도가 나왔었습니다.
Transfer learning with EfficientNet
이제 기존 CNN모델과 비교하기 위해 이전에 살펴본 AutoML로 최적의 해상도 조합을 찾은 EfficientNet을 fashion-mnist에 대해 fine-tuning을 진행해보겠습니다.
이를 위해서 requirements.txt에 efficientnet_pytorch==0.7.1 을 넣어주고 돌립니다.
그리고 colab의 gpu를 확인하고, 우리의 15G Mem을 가지는 P8 GPU 1개가 버틸 수 있을거 같은 efficientnet-b0을 선택해줍니다
위와 같이 efficientnet-b{n}에서 n이 커질수록 #params가 늘어나 연산이 느려지지만(FPS 감소), 그만큼 Top-1 Acc는 높아집니다.
그리고 우리는 ImageNet에 pre-trained된 efficientnet-b0을 사용해야 하기 때문에 fashion-mnist-dataset을 이에 맞게 transform해주어야 합니다.
data_root = os.path.join(os.getcwd(), "data")

# Preprocessing and dataset definition for the ImageNet-pretrained EfficientNet.
transform = transforms.Compose(
    [
        transforms.Resize(224),  # upscale so the input becomes 3x224x224 after channel replication
        transforms.ToTensor(),
        transforms.Lambda(lambda x: x.repeat(3, 1, 1)),  # copy the single grayscale channel into 3 channels
        # ImageNet channel statistics; the first std value was mistyped as 0.228.
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),  # mean, std
    ]
)
fashion_mnist_dataset = FashionMNIST(data_root, download=True, train=True, transform=transform)
그리고 EfficientNet을 정의해주겠습니다.
# Settings for the fine-tuned EfficientNet: which pretrained variant to load
# and how many output classes the classifier gets.
_efficient_finetune_dict: dict = dict(
    efficient_net_model_name="efficientnet-b1",
    num_classes=10,
)
_efficient_finetune_cfg = OmegaConf.create(_efficient_finetune_dict)

# Show the resulting config as YAML.
_efficient_finetune_yaml = OmegaConf.to_yaml(_efficient_finetune_cfg)
print(_efficient_finetune_yaml)
class EfficientNetFinetune(nn.Module):
    """Thin wrapper around a pretrained EfficientNet for fine-tuning.

    Args:
        cfg: Config providing ``efficient_net_model_name`` (the variant passed
            to ``EfficientNet.from_pretrained``) and ``num_classes``.
    """

    def __init__(self, cfg: DictConfig = _efficient_finetune_cfg):
        super().__init__()
        # The file never imports EfficientNet, so the name was undefined here.
        # Imported locally from the efficientnet_pytorch package that the
        # project's requirements already list.
        from efficientnet_pytorch import EfficientNet

        self.efficientnet = EfficientNet.from_pretrained(
            cfg.efficient_net_model_name,
            num_classes=cfg.num_classes,
        )

    def forward(self, x):
        """Return the wrapped network's output for the batch ``x``."""
        return self.efficientnet(x)
그리고 기본적으로 모델을 불러오는 방법은 위에 첨부한 git에 있으니 참고하시길 바랍니다.
# gpu setup: use CUDA device 0 when one is available, otherwise run on the CPU
# (gpu = None). The training loop already branches on `gpu is not None`.
gpu = 0 if torch.cuda.is_available() else None

# define model.
# model = MLP(28*28, 128, 64, 10)
# model = MLPWithDropout(28*28, 128, 64, 10, dropout_prob=0.3)
# model = CNN(cfg=_cnn_cfg)
model = EfficientNetFinetune(cfg=_efficient_finetune_cfg)
if gpu is not None:
    # Previously called unconditionally, which fails on a CPU-only runtime.
    model.cuda(gpu)
model_name = type(model).__name__

# define loss
loss_function = nn.CrossEntropyLoss()
lr = 1e-3

# define optimizer
optimizer = torch.optim.RAdam(model.parameters(), lr=lr)
optimizer_name = type(optimizer).__name__

# define scheduler
scheduler = None
scheduler_name = type(scheduler).__name__ if scheduler is not None else "no"
max_epoch = 50

# define tensorboard logger: one run directory per timestamp/model/optimizer combination.
run_name = f"{datetime.now().isoformat(timespec='seconds')}-{model_name}-{optimizer_name}_optim_{lr}_lr_with_{scheduler_name}"
run_dirname = "dnn-tutorial-fashion-mnist-runs"
log_dir = os.path.join(drive_project_path, "runs", run_dirname, run_name)
writer = SummaryWriter(log_dir=log_dir)
log_interval = 100

# define wandb
# project_name = "fastcampus_fashion_mnist_tutorials"
# run_tags = [project_name]
# wandb.init(
# project=project_name,
# name=run_name,
# tags=run_tags,
# config={"lr": lr, "model_name": model_name, "optimizer_name": optimizer_name, "scheduler_name": scheduler_name},
# reinit=True,
# )

# set save model path
log_model_path = os.path.join(log_dir, "models")
os.makedirs(log_model_path, exist_ok=True)
그리고 이전과 동일하게 GPU에 붙여서 돌려봅니다.
실제로 돌려보니, 우리가 간단히 짠 CNN보다 Batch를 학습하는데, 훨씬 오래걸립니다. 체감 1epoch당 20분. 그래서 4epoch까지만 돌려보겠습니다.
이전에 돌린 CNN과 지금 돌리고 있는 efficientnet-b0을 tensorboard에서 표현한 그래프입니다.
확실히 Train accuracy가 빠르게 높아지는 걸 볼 수 있습니다.
validation accuracy는 비슷한 것 같습니다.
train loss도 훨씬 빨리 떨어지지만, validation loss는 비슷하지만 약간 더 빨리 떨어지는거 같습니다.
실제 모델을 찍어보면 위와같이 나오는데, 벌써 어질어질합니다. 매우 복잡합니다.
이에 대한 test 결과를 찍어보면 램이 터져서 캡쳐는 못했지만, 0.6정도가 나옵니다. 4-epoch만을 돌렸는데도 말이죠.