머신러닝&딥러닝

UNET 구조 구현하기 #2: 데이터 로더 ~ 모델 검증

seungbeomdo 2023. 1. 30. 12:52

Pytorch 기반의 딥러닝 모델 구현을 연습해보기.

 

한요섭님 유튜브 강의에서 다룬 코드들을 꼼꼼하게 리뷰하면서

딥러닝 모델 구현의.. 일종의 메뉴얼을 습득해보려고 한다.

 

hanyoseob

의료데이터 분석 및 인공지능 개발자, 한요섭 입니다 :D 머신러닝/딥러닝, 신호처리, 병렬 컴퓨팅 (CUDA), 블록체인 (Solidity) 그리고 논문작성 꿀팁 등을 공돌이 관점에서 실습을 통해 알아보도록

www.youtube.com

 

1편

 

UNET 구조 구현하기 #1: 데이터 저장하기 ~ 모델 클래스 구현

Pytorch 기반의 딥러닝 모델 구현을 연습해보기. 한요섭님 유튜브 강의에서 다룬 코드들을 꼼꼼하게 리뷰하면서 딥러닝 모델 구현의.. 일종의 메뉴얼을 습득해보려고 한다. hanyoseob 의료데이터 분

seungbeomdo.tistory.com


3. 데이터 로더(Loader) 구현

class Dataset(torch.utils.data.Dataset):

    def __init__(self, data_dir, transform=None):
    
    	#데이터셋이 위치한 폴더
        self.data_dir = data_dir 
        self.transform = transform

        #폴더 내 파일 리스트: 이미지들의 이름
        lst_data = os.listdir(self.data_dir)

        #만약 이미지 이름이 label로 시작하면 label에 포함
        lst_label = [f for f in lst_data if f.startswith('label')]
        #만약 이미지 이름이 input으로 시작하면 input에 포함
        lst_input = [f for f in lst_data if f.startswith('input')]

        #이름순 정렬
        lst_label.sort()
        lst_input.sort()

        self.lst_label = lst_label
        self.lst_input = lst_input

    def __len__(self):
        return len(self.lst_label)

    def __getitem__(self, index):
        label = np.load(os.path.join(self.data_dir, self.lst_label[index]))
        input = np.load(os.path.join(self.data_dir, self.lst_input[index]))

        label = label/255.0 #정규화
        input = input/255.0

        if label.ndim == 2:
            label = label[:, :, np.newaxis]
        if input.ndim == 2:
            input = input[:, :, np.newaxis]

        data = {'input': input, 'label': label}

        if self.transform:
            data = self.transform(data)

        return data


4. Transform 구현하기

넘파이 array 형태로 불러와진 이미지 데이터를 tensor로 변환하기

#Tensor로 변환하기
class ToTensor(object):
    def __call__(self, data):
        label, input = data['label'], data['input']

        label = label.transpose((2, 0, 1)).astype(np.float32)
        input = input.transpose((2, 0, 1)).astype(np.float32)

        data = {'label': torch.from_numpy(label), 
                'input': torch.from_numpy(input)}

        return data

#정규화
class Normalization(object):
    def __init__(self, mean=0.5, std=0.5):
        self.mean = mean
        self.std = std

    def __call__(self, data):
        label, input = data['label'], data['input']
        input = (input - self.mean) / self.std
        data = {'label': label, 'input': input}
        return data

class RandomFlip(object):
    def __call__(self, data):
        label, input = data['label'], data['input']

        #이미지 원본을 랜덤하게 좌우반전
        if np.random.rand() > 0.5:
            label = np.fliplr(label)
            input = np.fliplr(input)

        #이미지 원본을 랜덤하게 상하반전
        if np.random.rand() > 0.5:
            label = np.flipud(label)
            input = np.flipud(input)

        data = {'label': label, 'input': input}

        return data
transform = transforms.Compose([Normalization(mean=0.5, std=0.5), RandomFlip(), ToTensor()])

dataset_train = Dataset(data_dir = os.path.join(data_dir, 'train'), transform = transform)
loader_train = DataLoader(dataset_train, batch_size = batch_size, shuffle = True, num_workers = 8)

dataset_val = Dataset(data_dir = os.path.join(data_dir, 'val'), transform = transform)
loader_val = DataLoader(dataset_train, batch_size = batch_size, shuffle = True, num_workers = 8)

5. 모델 훈련하기

#네트워크 생성하기
net = UNet().to(device)

#손실함수 정의하기
fn_loss = nn.BCEWithLogitsLoss().to(device)

#Optimizer 설정하기
optim = torch.optim.Adam(net.parameters(), lr=lr)

#부수적인 variables 설정하기
##dataset의 크기
num_data_train = len(dataset_train)
num_data_val = len(dataset_val)

##배치의 개수 = (dataset 크기 / 배치 크기)의 정수 반올림
num_batch_train = np.ceil(num_data_train / batch_size)
num_batch_val = np.ceil(num_data_val / batch_size)

##부수적인 function 설정하기
###
fn_tonumpy = lambda x: x.to('cpu').detach().numpy().transpose(0, 2, 3, 1)
###표준화된 값을 다시 원래 값으로 돌리는 함수
fn_denorm = lambda x, mean, std: (x * std) + mean
###소득구간(><50K) 구분하기 위한 확률 임계값
fn_class = lambda x: 1.0 * (x > 0.5)

#Tensorboard 를 사용하기 위한 SummaryWriter 설정
writer_train = SummaryWriter(log_dir=os.path.join(log_dir, 'train'))
writer_val = SummaryWriter(log_dir=os.path.join(log_dir, 'val'))
#네트워크 저장하기
def save(ckpt_dir, net, optim, epoch):
    if not os.path.exists(ckpt_dir):
        os.makedirs(ckpt_dir)

    torch.save({'net': net.state_dict(), 'optim': optim.state_dict()},
               "%s/model_epoch%d.pth" % (ckpt_dir, epoch))

#네트워크 불러오기
def load(ckpt_dir, net, optim):
    if not os.path.exists(ckpt_dir):
        epoch = 0
        return net, optim, epoch

    ckpt_lst = os.listdir(ckpt_dir)
    ckpt_lst.sort(key=lambda f: int(''.join(filter(str.isdigit, f))))

    dict_model = torch.load('%s/%s' % (ckpt_dir, ckpt_lst[-1]))

    net.load_state_dict(dict_model['net'])
    optim.load_state_dict(dict_model['optim'])
    epoch = int(ckpt_lst[-1].split('epoch')[1].split('.pth')[0])

    return net, optim, epoch
#네트워크 학습시키기

st_epoch = 0

net, optim, st_epoch = load(ckpt_dir=ckpt_dir, net=net, optim=optim)

for epoch in range(st_epoch + 1, num_epoch + 1):

    net.train()
    loss_arr = []

    for batch, data in enumerate(loader_train, 1):
        # forward pass
        label = data['label'].to(device)
        input = data['input'].to(device)

        output = net(input)

        # backward pass
        optim.zero_grad()

        loss = fn_loss(output, label)
        loss.backward()

        optim.step()

        # 손실함수 계산
        loss_arr += [loss.item()]

        print("TRAIN: EPOCH %04d / %04d | BATCH %04d / %04d | LOSS %.4f" %
              (epoch, num_epoch, batch, num_batch_train, np.mean(loss_arr)))

        # Tensorboard 저장하기
        label = fn_tonumpy(label)
        input = fn_tonumpy(fn_denorm(input, mean=0.5, std=0.5))
        output = fn_tonumpy(fn_class(output))

        writer_train.add_image('label', label, num_batch_train * (epoch - 1) 
                               + batch, dataformats='NHWC')
        writer_train.add_image('input', input, num_batch_train * (epoch - 1) 
                               + batch, dataformats='NHWC')
        writer_train.add_image('output', output, num_batch_train * (epoch - 1) 
                               + batch, dataformats='NHWC')

    writer_train.add_scalar('loss', np.mean(loss_arr), epoch)

    with torch.no_grad():
            net.eval()
            loss_arr = []

            for batch, data in enumerate(loader_val, 1):
                # forward pass
                label = data['label'].to(device)
                input = data['input'].to(device)

                output = net(input)

                # 손실함수 계산하기
                loss = fn_loss(output, label)

                loss_arr += [loss.item()]

                print("VALID: EPOCH %04d / %04d | BATCH %04d / %04d | LOSS %.4f" %
                      (epoch, num_epoch, batch, num_batch_val, np.mean(loss_arr)))

                # Tensorboard 저장하기
                label = fn_tonumpy(label)
                input = fn_tonumpy(fn_denorm(input, mean=0.5, std=0.5))
                output = fn_tonumpy(fn_class(output))

                writer_val.add_image('label', label, num_batch_val * (epoch - 1) 
                                     + batch, dataformats='NHWC')
                writer_val.add_image('input', input, num_batch_val * (epoch - 1) 
                                     + batch, dataformats='NHWC')
                writer_val.add_image('output', output, num_batch_val * (epoch - 1) 
                                     + batch, dataformats='NHWC')

    writer_val.add_scalar('loss', np.mean(loss_arr), epoch)

    if epoch % 50 == 0:
        save(ckpt_dir=ckpt_dir, net=net, optim=optim, epoch=epoch)

writer_train.close()
writer_val.close()

6. 모델 검증

## 이미지 트랜스폼
transform = transforms.Compose([Normalization(mean=0.5, std=0.5), ToTensor()])

## 테스트셋
dataset_test = Dataset(data_dir=os.path.join(data_dir, 'test'), transform=transform)
loader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle=False, num_workers=8)

## 네트워크 생성하기
net = UNet().to(device)

## 손실함수 정의하기
fn_loss = nn.BCEWithLogitsLoss().to(device)

## Optimizer 설정하기
optim = torch.optim.Adam(net.parameters(), lr=lr)

## 그밖에 부수적인 variables 설정하기
num_data_test = len(dataset_test)

num_batch_test = np.ceil(num_data_test / batch_size)

## 그밖에 부수적인 functions 설정하기
fn_tonumpy = lambda x: x.to('cpu').detach().numpy().transpose(0, 2, 3, 1)
fn_denorm = lambda x, mean, std: (x * std) + mean
fn_class = lambda x: 1.0 * (x > 0.5)

## 네트워크 저장하기
def save(ckpt_dir, net, optim, epoch):
    if not os.path.exists(ckpt_dir):
        os.makedirs(ckpt_dir)

    torch.save({'net': net.state_dict(), 'optim': optim.state_dict()},
               "%s/model_epoch%d.pth" % (ckpt_dir, epoch))
    # torch.save({'net': net.state_dict(), 'optim': optim.state_dict()},
    #         "./%s/model_epoch%d.pth" % (ckpt_dir, epoch))

## 네트워크 불러오기
def load(ckpt_dir, net, optim):
    if not os.path.exists(ckpt_dir):
        epoch = 0
        return net, optim, epoch

    ckpt_lst = os.listdir(ckpt_dir)
    ckpt_lst.sort(key=lambda f: int(''.join(filter(str.isdigit, f))))

    # dict_model = torch.load('./%s/%s' % (ckpt_dir, ckpt_lst[-1]))
    dict_model = torch.load('%s/%s' % (ckpt_dir, ckpt_lst[-1]))

    net.load_state_dict(dict_model['net'])
    optim.load_state_dict(dict_model['optim'])
    epoch = int(ckpt_lst[-1].split('epoch')[1].split('.pth')[0])

    return net, optim, epoch

## 네트워크 학습시키기
st_epoch = 0
net, optim, st_epoch = load(ckpt_dir=ckpt_dir, net=net, optim=optim)

with torch.no_grad():
    net.eval()
    loss_arr = []

    for batch, data in enumerate(loader_test, 1):
        # forward pass
        label = data['label'].to(device)
        input = data['input'].to(device)

        output = net(input)

        # 손실함수 계산하기
        loss = fn_loss(output, label)

        loss_arr += [loss.item()]

        print("TEST: BATCH %04d / %04d | LOSS %.4f" %
              (batch, num_batch_test, np.mean(loss_arr)))

        # Tensorboard 저장하기
        label = fn_tonumpy(label)
        input = fn_tonumpy(fn_denorm(input, mean=0.5, std=0.5))
        output = fn_tonumpy(fn_class(output))

        for j in range(label.shape[0]):
            id = num_batch_test * (batch - 1) + j

            plt.imsave(os.path.join(result_dir, 'png', 'label_%04d.png' % id), label[j].squeeze(), cmap='gray')
            plt.imsave(os.path.join(result_dir, 'png', 'input_%04d.png' % id), input[j].squeeze(), cmap='gray')
            plt.imsave(os.path.join(result_dir, 'png', 'output_%04d.png' % id), output[j].squeeze(), cmap='gray')

            np.save(os.path.join(result_dir, 'numpy', 'label_%04d.npy' % id), label[j].squeeze())
            np.save(os.path.join(result_dir, 'numpy', 'input_%04d.npy' % id), input[j].squeeze())
            np.save(os.path.join(result_dir, 'numpy', 'output_%04d.npy' % id), output[j].squeeze())

print("AVERAGE TEST: BATCH %04d / %04d | LOSS %.4f" %
      (batch, num_batch_test, np.mean(loss_arr)))
#결과 보기

result_dir = '/content/drive/MyDrive/DSL/OOP/results/numpy'

lst_data = os.listdir(result_dir)

lst_label = [f for f in lst_data if f.startswith('label')]
lst_input = [f for f in lst_data if f.startswith('input')]
lst_output = [f for f in lst_data if f.startswith('output')]

lst_label.sort()
lst_input.sort()
lst_output.sort()

#첫번째 결과
id = 0
label = np.load(os.path.join(result_dir, lst_label[id]))
input = np.load(os.path.join(result_dir, lst_input[id]))
output = np.load(os.path.join(result_dir, lst_output[id]))

#결과 시각화
plt.subplot(131)
plt.imshow(input, cmap='gray')
plt.title('input')

plt.subplot(132)
plt.imshow(label, cmap='gray')
plt.title('label')

plt.subplot(133)
plt.imshow(output, cmap='gray')
plt.title('output')

plt.show()