https://github.com/eunoiahyunseo/rofydeo-model-archiving/tree/main/models/ViT
해당 github 주소에 코드들은 올려 놓았습니다.
모델 구현
# pytocrh와 기타 util라이브러리를 import해온다. import torch import torch.nn.functional as F import matplotlib.pyplot as plt from torch import nn from torch import Tensor from PIL import Image from torchvision.transforms import Compose, Resize, ToTensor # 텐서의 차원관리를 해주는, einops from einops import rearrange, reduce, repeat from einops.layers.torch import Rearrange, Reduce # pytorch모델의 구조도와 요약을 확인할 수 있다. from torchsummary import summary
위와같이 구현에 필요한 모듈들을 불러와줍니다. 여기서 우리는 텐서의 차원관리를 einops를 통해 해보겠습니다. 한번 사용해보니까 매우 편하고 앞으로 애용하게 될 것 같습니다.
Patch Embedding
class PatchEmbedding(nn.Module): def __init__(self, in_channels: int = 3, patch_size: int = 16, emb_size: int = 768): self.patch_size = patch_size super().__init__() self.projection = nn.Sequential( # ViT논문에서는 Conv2d를 사용하는게, Linear레이어 하나를 더 추가하는 것보다 더 계산 효율적이라고 했음 # 최종적으로 [batch_size, (h//patch_size)*(w//patch_size), embed_size)]크기의 텐서가 된다. nn.Conv2d(in_channels, emb_size, kernel_size=patch_size, stride=patch_size), Rearrange('b e (h) (w) -> b (h w) e') # Rearrange('b c (h s1) (w s2) -> b (h w) (s1 s2 c)', s1=patch_size, s2=patch_size), # nn.Linear(patch_size * patch_size * in_channels, emb_size) # linear projection ) def forward(self, x: Tensor) -> Tensor: x = self.projection(x) return x PatchEmbedding()(x).shape
patchEmbedding을 우선 구현한 부분입니다. 이는 학습 가능한 행렬로서 emb_size로 patch의 sequence들을 투영시킵니다. 하지만 논문의 Appendix에 구현상으로는 Conv2d를 쓰는 것이 더욱 계산 효율적이라고 되어있습니다.
CLS TOKEN
class PatchEmbedding(nn.Module): def __init__(self, in_channels: int = 3, patch_size: int = 16, emb_size: int = 768): self.patch_size = patch_size super().__init__() self.projection = nn.Sequential( # ViT논문에서는 Conv2d를 사용하는게, Linear레이어 하나를 더 추가하는 것보다 더 계산 효율적이라고 했음 # 최종적으로 [batch_size, (h//patch_size)*(w//patch_size), embed_size)]크기의 텐서가 된다. nn.Conv2d(in_channels, emb_size, kernel_size=patch_size, stride=patch_size), Rearrange('b e (h) (w) -> b (h w) e') # Rearrange('b c (h s1) (w s2) -> b (h w) (s1 s2 c)', s1=patch_size, s2=patch_size), # nn.Linear(patch_size * patch_size * in_channels, emb_size) # linear projection ) # nn.parameter는 모델에 학습 가능한 파라미터를 추가할 때 텐서로 추가하는 방법이다. self.cls_token = nn.Parameter(torch.randn(1, 1, emb_size)) def forward(self, x: Tensor) -> Tensor: b, _, _, _ = x.shape x = self.projection(x) cls_token = repeat(self.cls_token, '() n e -> b n e', b=b) x = torch.cat([cls_token, x], dim=1) return x PatchEmbedding()(x).shape
CLS TOKEN을 추가해주었습니다. nn.Parameter로 학습가능한 가중치로서 모델에 z0부분에 추가해줍니다. 제가 주석으로 코드의 결과의 행렬 사이즈를 상세히 서술했으니 참고하시길 바랍니다.
POSITION EMBEDDING
class PatchEmbedding(nn.Module): def __init__(self, in_channels: int = 3, patch_size: int = 16, emb_size: int = 768, img_size: int = 224): self.patch_size = patch_size super().__init__() self.projection = nn.Sequential( # ViT논문에서는 Conv2d를 사용하는게, Linear레이어 하나를 더 추가하는 것보다 더 계산 효율적이라고 했음 # 최종적으로 [batch_size, (h//patch_size)*(w//patch_size), embed_size)]크기의 텐서가 된다. nn.Conv2d(in_channels, emb_size, kernel_size=patch_size, stride=patch_size), Rearrange('b e (h) (w) -> b (h w) e') # Rearrange('b c (h s1) (w s2) -> b (h w) (s1 s2 c)', s1=patch_size, s2=patch_size), # nn.Linear(patch_size * patch_size * in_channels, emb_size) # linear projection ) # nn.parameter는 모델에 학습 가능한 파라미터를 추가할 때 텐서로 추가하는 방법이다. self.cls_token = nn.Parameter(torch.randn(1, 1, emb_size)) self.positions = nn.Parameter(torch.randn((img_size // patch_size) ** 2 + 1, emb_size)) def forward(self, x: Tensor) -> Tensor: b, _, _, _ = x.shape x = self.projection(x) cls_token = repeat(self.cls_token, '() n e -> b n e', b=b) x = torch.cat([cls_token, x], dim=1) x += self.positions return x PatchEmbedding()(x).shape
learnable한 position embedding을 추가해줍니다. 논문에 나와있는 수치 그대로를 nn.Parameter로 정의해주고 그대로 이전 결괏값과 더해줍니다. 이는 패치의 위치정보를 ViT에서 확실히 학습할 수 있게됩니다.
Transformer
''' 원래 트랜스포머에서 Wq, Wk, Wv 벡터의 차원은 d_model보다 작은 차원을 갖는다. [emb_size, d_model // num_heads]의 차원을 가지고 추후에 MSA의 끝단에서 concatenate하게 된다. ''' class MultiHeadAttention(nn.Module): def __init__(self, emb_size: int = 768, num_heads: int = 8, dropout: float = 0): super().__init__() self.emb_size = emb_size self.num_heads = num_heads self.keys = nn.Linear(emb_size, emb_size) self.queries = nn.Linear(emb_size, emb_size) self.values = nn.Linear(emb_size, emb_size) self.att_drop = nn.Dropout(dropout) self.projection = nn.Linear(emb_size, emb_size) self.scaling = (self.emb_size // num_heads) ** -0.5 def forward(self, x: Tensor, mask: Tensor = None) -> Tensor: # 위에서 말했던 것처럼 num_heads로 keys, queries, values를 쪼갠다. # [batch, heads, seq_len, emb_size] 크기의 텐서가 된다. # [1, 8, 197, 96] -> x는 나누기 전인 [1, 197, 768] queries = rearrange(self.queries(x), "b n (h d) -> b h n d", h=self.num_heads) keys = rearrange(self.keys(x), "b n (h d) -> b h n d", h=self.num_heads) values = rearrange(self.values(x), "b n (h d) -> b h n d", h=self.num_heads) # print('qureis shape -> ', queries.shape) # print('keys shape -> ', keys.shape) # print('values shape -> ', values.shape) # queries, keys를 이제 행렬곱 해주어야 한다. # 아래 코드와같이 하면 자동으로 transpose되고 내적이 된다. # [batch, heads, query_len, key_len] 크기의 텐서가 된다. # [1, 8, 197, 197] energy = torch.einsum('bhqd, bhkd -> bhqk', queries, keys) # print('energy shape -> ', energy.shape) if mask is not None: fill_value = torch.finfo(torch.float32).min # -max energy.mask_fill(~mask, fill_value) att = F.softmax(energy, dim=-1) * self.scaling # scaling된 attention score att = self.att_drop(att) # print('att shape -> ', att.shape) out = torch.einsum('bhal, bhlv -> bhav', att, values) out = rearrange(out, "b h n d -> b n (h d)") out = self.projection(out) return out patches_embedded = PatchEmbedding()(x) MultiHeadAttention()(patches_embedded).shape
transformer부분을 간단히 MSA부분만 구현해주었습니다.
Residual Network
class ResidualAdd(nn.Module): def __init__(self, fn): super().__init__() self.fn = fn def forward(self, x, **kwargs): res = x x = self.fn(x, **kwargs) x += res return x
ResNet구조를 구현했습니다.
MLP
class FeedForwardBlock(nn.Sequential): def __init__(self, emb_size: int, expansion: int = 4, drop_p: float = 0.): super().__init__( # upsample을 expansion ratio만큼 해주었음 원래 트랜스 포머에서도 d_model=512 -> dfff=2048 (expansion ratio = 4) nn.Linear(emb_size, expansion * emb_size), nn.GELU(), # Dropout은 원래 과적합이 일어나기 쉬운 Dense, Fully Connected Layer뒤에 적용하는 것이 일반적이다. # Attention Layer뒤에도 Dropout을 사용하는데, 이는 모델이 특정 헤드에 지나치게 의존하는 것을 방지한다. -> 위에 적용했음 nn.Dropout(drop_p), nn.Linear(expansion * emb_size, emb_size) )
MLP부분을 GELU activation function과 expansion ratio를 통해 bottle-neck구조로 만들어 주었습니다.
TransformerEncoderBlock
class TransformerEncoderBlock(nn.Sequential): def __init__(self, emb_size: int = 768, drop_p: float = 0, forward_expansion: int = 4, forward_drop_p: float = 0., **kwargs): super().__init__( ResidualAdd( nn.Sequential( nn.LayerNorm(emb_size), MultiHeadAttention(emb_size, **kwargs), nn.Dropout(drop_p))), ResidualAdd( nn.Sequential( nn.LayerNorm(emb_size), # layer normalization FeedForwardBlock( emb_size, expansion=forward_expansion, drop_p=forward_drop_p), nn.Dropout(drop_p))) ) patches_embedded = PatchEmbedding()(x) TransformerEncoderBlock()(patches_embedded).shape
Encoder Block을 nn.Sequential로 간단히 구현해줍니다.
TransformerEncoder
class TransformerEncoder(nn.Sequential): def __init__(self, depth: int = 12, **kwargs): super().__init__(*[TransformerEncoderBlock(**kwargs) for _ in range(depth)])
Encoder를 depth와함께 반복해 최종적으로 만들어줍니다.
Classification Head
class ClassificationHead(nn.Sequential): def __init__(self, emb_size: int = 768, n_classes: int = 1000): super().__init__( Reduce('b n e -> b e', reduction='mean'), nn.LayerNorm(emb_size), nn.Linear(emb_size, n_classes) )
classification head를 만들어줍니다.
ViT
class ViT(nn.Sequential): def __init__(self, in_channels: int = 3, patch_size: int = 16, emb_size: int = 768, img_size: int = 224, depth: int = 12, n_classes: int = 1000, **kwargs): super().__init__( PatchEmbedding(in_channels, patch_size, emb_size, img_size), TransformerEncoder(depth, emb_size=emb_size, **kwargs), ClassificationHead(emb_size, n_classes) )
만들어준 head까지 붙혀서 ViT를 만들어줍니다.
학습
https://arxiv.org/pdf/2112.13492.pdf
해당 논문을 기반으로 트랜스포머의 configuration을 설정했습니다.
import os import time from tqdm import tqdm import argparse import torchvision.transforms as tfs from torch.utils.data import DataLoader from timm.models.layers import trunc_normal_ from torchvision.datasets.cifar import CIFAR10 from torch.utils.tensorboard import SummaryWriter
늘 그렇듯, 필요한 코드를 불러오고, CIFAR10데이터셋에 대해 training을 50epoch만 시켜볼 것이므로 이와 관련된 모듈을 불러옵니다.
class ArgumentParser(): def __init__(self, epoch: int = 50, batch_size: int = 128, lr: float = 1e-3, step_size: int = 100, root: str = './CIFAR10', log_dir: str = './log', name: str = 'vit_cifar10', rank: int = 0): self.epoch = epoch self.batch_size = batch_size self.lr = lr self.step_size = step_size self.root = root self.log_dir = log_dir self.name = name self.rank = rank return
관련 파라미터들을 하나의 클래스에 때려박아주었습니다.
vit_cifar_input: dict = { "img_size": 32, "patch_size": 4, "n_classes": 10, "emb_size": 192, "forward_expansion": 2 } ops = ArgumentParser() # device 셋팅 device = torch.device('cuda:{}'.format(0) if torch.cuda.is_available() else 'cpu') # dataset / dataloader 정의를 해준다. transform_cifar = tfs.Compose([ tfs.RandomCrop(32, padding=4), tfs.RandomHorizontalFlip(), tfs.ToTensor(), tfs.Normalize(mean=(0.4914, 0.4822, 0.4465), std=(0.2023, 0.1994, 0.2010)) ]) train_set = CIFAR10(root=ops.root, train=True, download=True, transform=transform_cifar) test_set = CIFAR10(root=ops.root, train=False, download=True, transform=transform_cifar) train_loader = DataLoader(dataset=train_set, shuffle=True, batch_size=ops.batch_size) test_loader = DataLoader(dataset=test_set, shuffle=True, batch_size=ops.batch_size) # model 정의 model = ViT(**vit_cifar_input).to(device) # criterion 정의 criterion = nn.CrossEntropyLoss() # optimizer 정의 optimizer = torch.optim.Adam(model.parameters(), lr=ops.lr, weight_decay=5e-5) # scheduler 정의 scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=ops.epoch, eta_min=1e-5) # logger 정의 os.makedirs(ops.log_dir, exist_ok=True)
그리고 dataloader와, model, criterion, optimizer, scheduler, logger를 정의해줍니다.
# training writer = SummaryWriter() print("training....") best_accuracy = 0.0 for epoch in range(ops.epoch): model.train() tic = time.time() for idx, (img, target) in enumerate(tqdm(train_loader)): img = img.to(device) # [N, 3, 32, 32] <- cifar with batch size target = target.to(device) # [N] output = model(img) # classification_head의 출력이니까 [N, 10] -> cifar10이니까 class=10 loss = criterion(output, target) # crossentropy 값 계산 -> 단순히 분류 문제이기 때문 optimizer.zero_grad() loss.backward() optimizer.step() for param_group in optimizer.param_groups: lr = param_group['lr'] if idx % ops.step_size == 0: writer.add_scalar('Training loss', loss, epoch * len(train_loader) + idx) print('Epoch : {}\t' 'step : [{}/{}]\t' 'loss : {}\t' 'lr : {}\t' 'time {}\t' .format(epoch, idx, len(train_loader), loss, lr, time.time() - tic)) save_path = os.path.join(ops.log_dir, ops.name, 'saves') os.makedirs(save_path, exist_ok=True) # test print('Validation of epoch[{}]'.format(epoch)) model.eval() correct = 0 val_avg_loss = 0 total = 0 with torch.no_grad(): for idx, (img, target) in enumerate(tqdm(test_loader)): img = img.to(device) target = target.to(device) output = model(img) loss = criterion(output, target) output = torch.softmax(output, dim=1) pred, idx_ = output.max(-1) correct += torch.eq(target, idx_).sum().item() total += target.size(0) val_avg_loss += loss.item() print('Epoch {} test : '.format(epoch)) accuracy = correct / total print("accuracy : {:.4f}%".format(accuracy * 100.)) val_avg_loss = val_avg_loss / len(test_loader) if epoch % 5 == 0 and accuracy > best_accuracy: best_accuracy = accuracy save_path = os.path.join(ops.log_dir, ops.name, 'saves') os.makedirs(save_path, exist_ok=True) checkpoint = { 'epoch': epoch, 'model_state_dict': model.state_dict(), 'optimizer_state_dict': optimizer.state_dict(), 'scheduler_state_dict': scheduler.state_dict(), 'best_accuracy': best_accuracy } torch.save(checkpoint, os.path.join(save_path, ops.name + '.{}.pth.tar'.format(epoch))) writer.add_scalar('Test loss', loss, epoch) writer.add_scalar('Tert accuracy', val_avg_loss, epoch) scheduler.step()
마지막으로 train 코드를 돌려줍니다. 그리고 ipynb파일을 python파일로 변경하고 background에서 모델을 돌려주어 tensorboard와 함께 모니터링 해주었습니다.
$jupyter nbconvert --to script {파일명}.ipynb $nohup python {파일명}.py &

이제 직접 학습시킨 모델을 통해 제가 다운받은 이미지를 예측시켜보도록 하겠습니다.
import matplotlib.pyplot as plt def display_image_with_label(img, label): plt.imshow(img ) plt.axis('off') # 축 제거 # 레이블(클래스 이름) 표시 plt.title(f"Predicted label: {label}") # 이미지와 레이블 함께 출력 plt.show() model = ViT(**vit_cifar_input) checkpoint_path = './log/vit_cifar10/saves/vit_cifar10.48.pth.tar' checkpoint = torch.load(checkpoint_path) model.load_state_dict(checkpoint['model_state_dict']) cifar10_transform = Compose([Resize((32, 32)), ToTensor()]) origin_img = Image.open('./image.jpg') # 분류하고자 하는 이미지 파일 img = cifar10_transform(origin_img) img = img.unsqueeze(0) # 배치 차원 추가 with torch.no_grad(): # 그래디언트 계산 비활성화 model.eval() outputs = model(img) _, predicted = torch.max(outputs, 1) classes = [ "airplane", # 비행기 "automobile", # 자동차 "bird", # 새 "cat", # 고양이 "deer", # 사슴 "dog", # 개 "frog", # 개구리 "horse", # 말 "ship", # 배 "truck" # 트럭 ] display_image_with_label(origin_img, classes[predicted[0]])

ViT로 정확도는 좀 낮지만, 잘 분류하는 것을 볼 수 있습니다.
그 외에도 hugging space를 이용해 fine-tuning한 코드도 있으니 한번 git들어가서 확인해보시길 바랍니다!
'AIML > 딥러닝 최신 트렌드 알고리즘' 카테고리의 다른 글
[ 딥러닝 최신알고리즘 - PRMI Lab ] - DeiT (data-efficient image transformers & distillation through attention) (1) | 2024.01.12 |
---|---|
[ 딥러닝 최신 알고리즘 - PRMI Lab ] - KD: Knowledge Distillation (0) | 2024.01.08 |
[ 딥러닝 최신 알고리즘 - PRMI Lab ] - ViT: Vision Transformer(2021) (1) | 2024.01.06 |
[ 딥러닝 최신 트랜드 - PRMI Lab ] - Variations of Transformers (0) | 2023.08.06 |
[ 딥러닝 구현 - PRMI Lab ] - 트랜스포머(Transformer)의 구현 (0) | 2023.08.03 |