머신러닝&딥러닝

UNET 구조 구현하기 #1: 데이터 저장하기 ~ 모델 클래스 구현

seungbeomdo 2023. 1. 24. 14:32

Pytorch 기반의 딥러닝 모델 구현을 연습해보기.

 

한요섭님 유튜브 강의에서 다룬 코드들을 꼼꼼하게 리뷰하면서

딥러닝 모델 구현의.. 일종의 메뉴얼을 습득해보려고 한다.

 

hanyoseob

의료데이터 분석 및 인공지능 개발자, 한요섭 입니다 :D 머신러닝/딥러닝, 신호처리, 병렬 컴퓨팅 (CUDA), 블록체인 (Solidity) 그리고 논문작성 꿀팁 등을 공돌이 관점에서 실습을 통해 알아보도록

www.youtube.com


1. 데이터 저장하기

1.1. 라이브러리 임포트 및 데이터 불러오기

#구글 드라이브 마운트
from google.colab import drive
drive.mount('/content/drive') 

#필요한 라이브러리 임포트
import os
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt

#데이터 불러오기
dir_data = '/content/drive/MyDrive/DSL/OOP/data' #데이터 위치

name_label = 'train-labels.tif' #라벨 
name_input = 'train-volume.tif' #인풋
img_label = Image.open(os.path.join(dir_data, name_label)) #라벨 이미지로 열기
img_input = Image.open(os.path.join(dir_data, name_input)) #인풋 이미지로 열기

ny, nx = img_label.size #이미지 사이즈 512*512 셀
nframe = img_label.n_frames #30개의 이미지

1.2. Train, Validation, Test Set 분리하여 폴더별로 정리하기

#각 데이터셋에 포함할 이미지 개수
nframe_train = 24
nframe_val = 3
nframe_test = 3

#각 데이터셋이 저장되는 위치
dir_save_train = os.path.join(dir_data, 'train') 
dir_save_val = os.path.join(dir_data, 'val')
dir_save_test = os.path.join(dir_data, 'test')

#각 데이터셋을 담을 폴더 만들기
if not os.path.exists(dir_save_train): #'dir_save_train'이라는 위치가 없는 경우
    os.makedirs(dir_save_train) #그 위치를 생성
if not os.path.exists(dir_save_val):
    os.makedirs(dir_save_val)
if not os.path.exists(dir_save_test):
    os.makedirs(dir_save_test)
#원 데이터셋으로부터 랜덤하게 이미지를 뽑아 
#각 폴더로 분배하기 위하여 ID를 셔플하기
id_frame = np.arange(nframe)
np.random.shuffle(id_frame)

#train set 저장 코드
offset_nframe = 0 #id_frame에서 0번째 id부터 시작

##총 24개의 이미지를 저장
for i in range(nframe_train):

    ###라벨 이미지 중 주어진 id에 해당하는 이미지 찾기
    img_label.seek(id_frame[i + offset_nframe]) 

    ###인풋 이미지 중 주어진 id에 해당하는 이미지 찾기
    img_input.seek(id_frame[i + offset_nframe]) 

    ###array로 변환
	label_ = np.asarray(img_label) 
    input_ = np.asarray(img_input)

    ###저장
    np.save(os.path.join(dir_save_train, 'label_%03d.npy' % i), label_) 
    np.save(os.path.join(dir_save_train, 'input_%03d.npy' % i), input_)
    
#validation set 저장 코드

##24번째 id부터 시작
##(train set에 포함된 이미지 제외하기 위해)
offset_nframe = nframe_train 
for i in range(nframe_val):
    img_label.seek(id_frame[i + offset_nframe])
    img_input.seek(id_frame[i + offset_nframe])

    label_ = np.asarray(img_label)
    input_ = np.asarray(img_input)

    np.save(os.path.join(dir_save_val, 'label_%03d.npy' % i), label_)
    np.save(os.path.join(dir_save_val, 'input_%03d.npy' % i), input_)
    
#test set 저장 코드

##28번째 id부터 시작
offset_nframe = nframe_train + nframe_val 

for i in range(nframe_test):
    img_label.seek(id_frame[i + offset_nframe])
    img_input.seek(id_frame[i + offset_nframe])

    label_ = np.asarray(img_label)
    input_ = np.asarray(img_input)

    np.save(os.path.join(dir_save_test, 'label_%03d.npy' % i), label_)
    np.save(os.path.join(dir_save_test, 'input_%03d.npy' % i), input_)


2. UNET 구조 구현하기

2.1. 필요한 라이브러리 임포트

#라이브러리 임포트
import argparse
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms, datasets

#GPU 사용
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') 

#훈련 기록 저장할 폴더
data_dir = '/content/drive/MyDrive/DSL/OOP/data'
ckpt_dir = '/content/drive/MyDrive/DSL/OOP/checkpoint'
log_dir = '/content/drive/MyDrive/DSL/OOP/log'

2.2. 훈련 파라미터

lr = 1e-3 #학습률
batch_size = 4 #배치 사이즈
num_epoch = 100 #에포크 수

2.3. UNET 클래스 만들기

아래의 구조 이미지에 나온 설명대로 차근차근 클래스를 만들면 된다.

#UNET 구조 만들기

class UNet(nn.Module): #nn.module 상속
  
    def __init__(self):
        super(UNet, self).__init__()

        ##CBR2d 레이어를 추가해주는 메소드
        
        def CBR2d(in_channels, out_channels, kernel_size=3, 
        		  stride=1, padding=1, bias=True):
            layers = []
            
            ###컨볼루션
            layers += [nn.Conv2d(in_channels=in_channels, 
            		   out_channels=out_channels,
                       kernel_size=kernel_size, 
                       stride=stride, padding=padding,
                       bias=bias)]
                                 
            ###배치정규화
            layers += [nn.BatchNorm2d(num_features=out_channels)]
            
            ###ReLU 함수로 activate
            layers += [nn.ReLU()]

            cbr = nn.Sequential(*layers)

            return cbr

        ## Contracting path
        
        ###encoder의 1번째 층의 1번째 스테이지에서의 컨볼루션
        self.enc1_1 = CBR2d(in_channels=1, out_channels=64) 
       
        ###encoder의 1번째 층의 2번째 스테이지에서의 컨볼루션
        self.enc1_2 = CBR2d(in_channels=64, out_channels=64)

        ###첫번째 풀링: 2*2 사이즈로, max pooling
        self.pool1 = nn.MaxPool2d(kernel_size=2) #pooling(붉은 화살표)

        self.enc2_1 = CBR2d(in_channels=64, out_channels=128)
        self.enc2_2 = CBR2d(in_channels=128, out_channels=128)

        self.pool2 = nn.MaxPool2d(kernel_size=2)

        self.enc3_1 = CBR2d(in_channels=128, out_channels=256)
        self.enc3_2 = CBR2d(in_channels=256, out_channels=256)

        self.pool3 = nn.MaxPool2d(kernel_size=2)

        self.enc4_1 = CBR2d(in_channels=256, out_channels=512)
        self.enc4_2 = CBR2d(in_channels=512, out_channels=512)

        self.pool4 = nn.MaxPool2d(kernel_size=2)

        self.enc5_1 = CBR2d(in_channels=512, out_channels=1024) 
        ###encoder 파트 완성

        ##Expansive path
        
        ###디코더의 5번째 층의 1번째 컨볼루션
        self.dec5_1 = CBR2d(in_channels=1024, out_channels=512)

        ###언풀링
        self.unpool4 = nn.ConvTranspose2d(in_channels=512, out_channels=512,
                                          kernel_size=2, stride=2, 
                                          padding=0, bias=True)

        ###언풀링된 레이어와, 
        ###4번째층에서 컨볼루션된 레이어의 concatenate가 이루어지므로 
        ###input size가 1024
        self.dec4_2 = CBR2d(in_channels=2 * 512, out_channels=512) 
        self.dec4_1 = CBR2d(in_channels=512, out_channels=256)

        self.unpool3 = nn.ConvTranspose2d(in_channels=256, out_channels=256,
                                          kernel_size=2, stride=2, 
                                          padding=0, bias=True)

        self.dec3_2 = CBR2d(in_channels=2 * 256, out_channels=256)
        self.dec3_1 = CBR2d(in_channels=256, out_channels=128)

        self.unpool2 = nn.ConvTranspose2d(in_channels=128, out_channels=128,
                                          kernel_size=2, stride=2, padding=0, 
                                          bias=True)

        self.dec2_2 = CBR2d(in_channels=2 * 128, out_channels=128)
        self.dec2_1 = CBR2d(in_channels=128, out_channels=64)

        self.unpool1 = nn.ConvTranspose2d(in_channels=64, out_channels=64,
                                          kernel_size=2, stride=2, 
                                          padding=0, bias=True)

        self.dec1_2 = CBR2d(in_channels=2 * 64, out_channels=64)
        self.dec1_1 = CBR2d(in_channels=64, out_channels=64)

        self.fc = nn.Conv2d(in_channels=64, out_channels=1, kernel_size=1, 
                            stride=1, padding=0, bias=True)


    ##뉴럴넷의 foward-path
    def forward(self, x): 
        enc1_1 = self.enc1_1(x)
        enc1_2 = self.enc1_2(enc1_1)
        pool1 = self.pool1(enc1_2)

        enc2_1 = self.enc2_1(pool1)
        enc2_2 = self.enc2_2(enc2_1)
        pool2 = self.pool2(enc2_2)

        enc3_1 = self.enc3_1(pool2)
        enc3_2 = self.enc3_2(enc3_1)
        pool3 = self.pool3(enc3_2)

        enc4_1 = self.enc4_1(pool3)
        enc4_2 = self.enc4_2(enc4_1)
        pool4 = self.pool4(enc4_2)

        enc5_1 = self.enc5_1(pool4)

        dec5_1 = self.dec5_1(enc5_1)

        unpool4 = self.unpool4(dec5_1)
        cat4 = torch.cat((unpool4, enc4_2), dim=1)
        dec4_2 = self.dec4_2(cat4)
        dec4_1 = self.dec4_1(dec4_2)

        unpool3 = self.unpool3(dec4_1)
        cat3 = torch.cat((unpool3, enc3_2), dim=1)
        dec3_2 = self.dec3_2(cat3)
        dec3_1 = self.dec3_1(dec3_2)

        unpool2 = self.unpool2(dec3_1)
        cat2 = torch.cat((unpool2, enc2_2), dim=1)
        dec2_2 = self.dec2_2(cat2)
        dec2_1 = self.dec2_1(dec2_2)

        unpool1 = self.unpool1(dec2_1)
        cat1 = torch.cat((unpool1, enc1_2), dim=1)
        dec1_2 = self.dec1_2(cat1)
        dec1_1 = self.dec1_1(dec1_2)

        x = self.fc(dec1_1)

        return x