딥러닝 CV&LLM

[딥러닝 컴퓨터비전]불균형 데이터셋에 대한 샘플링 전략 적용법 (Pytorch)

컴퓨터비전 LCK 2024. 5. 6. 21:54

 

안녕하세요, 오늘은 [논문리뷰]를 진행하며 알아본 불균형 데이터셋에서의 샘플링 전략 적용을 위한 코드 구현법에 대해서 알아보겠습니다.

 

 

[논문 리뷰] Decoupling representation and classifier for long-tailed recognition(롱테일 인식을 위한 특징 추출기

1. Introduction ImageNet과 같은 거대한 데이터셋들을 주로 활용하면서, 딥 CNN신경망과 함께 이미지 분류 모델은 엄청난 속도로 발전해왔습니다. 이러한 데이터셋들은 모델 훈련시 준수한 성능을 낼

deep-learning00.tistory.com

 

 

해당 논문에서는 데이터셋  클래스간 불균형을 극복하기 위해 리샘플링 전략을 사용했습니다.

 

re-sampling 전략

 

 

가공 전의 데이터셋은 클래스마다 샘플의 숫자가 다르기 때문에 샘플이 많은 클래스들이 미니배치에 뽑힐 확률이 높습니다. 위 사진과 같은 수학적 모델링을 통해 모든 클래스들이 균형있게 미니배치에 포함되도록 확률을 조정하는 기법을 resampling이라고 합니다.

 

이러한 resampling 기법들을 코드로 구현하는 방법을 예시 코드와 함께 알아보겠습니다.(전체 실행코드는 맨 밑에 첨부했습니다!)

 

import torch
from torch.utils.data import DataLoader
from torch.utils.data.sampler import WeightedRandomSampler

sampler = WeightedRandomSampler(weights_tensor, len(weights), replacement=True)
Dataloader_wrs = DataLoader(dataset, batch_size=5, sampler=sampler) # sampler 매개변수를 통해 조절 가능

 

(중간의 코드를 뜯어온 것입니다.)

 

기존에 Dataloader로 데이터들을 불러오게 되면 위 사진에서의 Instance-balanced 샘플링과 같이 클래스간의 샘플 수 격차에 따라 미니배치에 포함되는 샘플들의 분포의 차이가 발생하게 되지만, sampler를 적절히 내가 원하는 전략을 적용시킨 메커니즘으로 바꿔줌으로써 Dataloader가 데이터를 불러오는 확률을 조정해줄 수 있습니다.

 

WeightedRandomSampler를 사용하면 해당 샘플이 조정 전 미니배치에 포함될 확률에 weight가 곱해져 최종적으로 미니배치에 포함될 확률이 산출됩니다. 이제 weight를 만드는 법을 알아보겠습니다.

 

import numpy as np

def make_weights(labels, nclasses):
    labels = np.array(labels)
    weight_list = []
    
    for cls in range(nclasses):
        idx = np.where(labels == cls)[0]
        count = len(idx)  # count는 해당 레이블의 샘플 개수
        weight = 1 / count
        weights = [weight] * count  # 가중치를 count 수만큼 반복하여 리스트에 저장
        weight_list += weights
        
    return weight_list

 

데이터셋의 레이블이 0부터 N까지 순차적으로 정렬된 데이터셋을 사용할 시의 코드입니다.

 

weight=1/count 로 적용이 되고 count는 해당 레이블의 샘플 개수를 의미하므로, 샘플 개수가 많을 수록 weight값은 작아지게 됩니다.(+각 레이블 샘플들의 weight를 모두 합하면 1이 됩니다.)

 

따라서 이는 모든 클래스가 같은 샘플링 확률을 가지는 class-balanced sampling을 의미한다고 볼 수 있습니다.

 

이런식으로 각각의 샘플들이 샘플링될 확률을 조정하는 weight를 만들고 이 weight를 sampler에 넣어준 후 해당 sampler를 dataloader의 매개변수로 넣어줌으로써 re-sampling 알고리즘이 완성됩니다.

 

다음의 데이터셋을 활용하여 전체 코드를 진행시켜 보도록 하겠습니다.

 

import numpy as np

# 클래스 레이블 생성 (0: 30개, 1: 10개, 2: 5개, 3: 5개)
labels = np.array([0] * 30 + [1] * 10 + [2] * 5 + [3] * 5)

# 입력 데이터 생성 (예시로 랜덤한 값 사용)
X = np.random.rand(len(labels), 2)  # 2차원 입력 데이터

 

0 레이블의 샘플 수 → 30

1 레이블의 샘플 수 → 10

2 레이블의 샘플 수 →  5

3 레이블의 샘플 수 →  5

 

와 같은 불균형 데이터셋을 사용하였습니다.

 

import torch
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.sampler import WeightedRandomSampler

def make_weights(labels, nclasses):
    labels = np.array(labels)
    weight_list = []
    
    for cls in range(nclasses):
        idx = np.where(labels == cls)[0]
        count = len(idx)
        weight = 1 / count
        weights = [weight] * count
        weight_list += weights
        
    return weight_list

 

class-balanced sampling을 위한 weight maker 함수를 선언합니다.

 

# 클래스 별 가중치 생성
weights = make_weights(labels, 4)
print(weights)
[0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.03333333333333333, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.2, 0.2, 0.2, 0.2, 0.2, 0.2, 0.2, 0.2, 0.2, 0.2]

 

밑은 print(weights)를 한 결과입니다. 0.333...의 weight를 가지는 샘플들이 30개, 0.1의 weight를 가지는 샘플들이 10개, 0.2의 weight를 가지는 샘플들이 10개(레이블 2의 샘플 5개+레이블 3의 샘플 5개)라는 것을 알 수 있습니다.

 

각 레이블의 샘플들의 weight를 모두 합하면 역시나 1이 됩니다.

 

# 가중치를 토치 텐서로 변환
weights_tensor = torch.tensor(weights, dtype=torch.double)

# TensorDataset 생성
dataset = TensorDataset(torch.tensor(X, dtype=torch.float32), torch.tensor(labels, dtype=torch.long))

# WeightedRandomSampler 생성
sampler = WeightedRandomSampler(weights_tensor, len(weights), replacement=True)

# DataLoader 생성 > 비교를 위해 random sampler도 생성
DataLoader_wrs = DataLoader(dataset, batch_size=5, sampler=sampler)
DataLoader_rs = DataLoader(dataset, batch_size=5, shuffle=True)

 

sampler를 만들어주고 비교를 위해 sampler를 적용한 Dataloader와 sampler를 적용하지 않은 Dataloader 두가지를 만들어 주었습니다.

 

# wrs로 실험
epochs = 5
for epoch in range(epochs):
    print(f"Epoch {epoch + 1}")
    print("---------")
    # DataLoader로부터 데이터를 가져와 출력
    for inputs, labels in DataLoader_wrs:
        print("입력 데이터:")
        print(inputs)
        print("클래스 레이블:")
        print(labels)
        print()

 

입력 데이터:
tensor([[0.2597, 0.2171],
        [0.2410, 0.3000],
        [0.0573, 0.1691],
        [0.9524, 0.6731],
        [0.7562, 0.4262]])
클래스 레이블:
tensor([3, 0, 3, 2, 2])

입력 데이터:
tensor([[0.9524, 0.6731],
        [0.8963, 0.0296],
        [0.2539, 0.7088],
        [0.0690, 0.6228],
        [0.0690, 0.6228]])
클래스 레이블:
tensor([2, 2, 1, 3, 3])

입력 데이터:
tensor([[0.6400, 0.6870],
        [0.2597, 0.2171],
        [0.2597, 0.2171],
        [0.8643, 0.4195],
        [0.7562, 0.4262]])
클래스 레이블:
tensor([0, 3, 3, 0, 2])

입력 데이터:
tensor([[8.9631e-01, 2.9606e-02],
        [5.2682e-01, 1.9270e-01],
        [6.9023e-02, 6.2277e-01],
        [1.9427e-04, 6.5817e-01],
        [9.5244e-01, 6.7315e-01]])
클래스 레이블:
tensor([2, 2, 3, 3, 2])

입력 데이터:
tensor([[1.9427e-04, 6.5817e-01],
        [6.3996e-01, 6.8701e-01],
        [7.5622e-01, 4.2617e-01],
        [5.5971e-01, 5.0291e-01],
        [8.8036e-01, 8.4322e-01]])
클래스 레이블:
tensor([3, 0, 2, 0, 1])

입력 데이터:
tensor([[3.9912e-01, 8.8486e-01],
        [1.9427e-04, 6.5817e-01],
        [6.9023e-02, 6.2277e-01],
        [5.2682e-01, 1.9270e-01],
        [8.8240e-01, 1.1885e-01]])
클래스 레이블:
tensor([0, 3, 3, 2, 3])

 

샘플링된 결과를 print한 일부분입니다. 레이블 0,1,2,3이 골고루 샘플링 된것을 확인할 수 있습니다.

 

다음으로는 random sampling(Instance-balanced sampling)한 결과를 살펴보겠습니다.

 

# rs로 실험
epochs = 5
for epoch in range(epochs):
    print(f"Epoch {epoch + 1}")
    print("---------")
    # DataLoader로부터 데이터를 가져와 출력
    for inputs, labels in DataLoader_rs:
        print("입력 데이터:")
        print(inputs)
        print("클래스 레이블:")
        print(labels)
        print()
입력 데이터:
tensor([[0.6792, 0.0640],
        [0.2754, 0.3197],
        [0.5638, 0.9923],
        [0.3154, 0.2666],
        [0.5196, 0.7962]])
클래스 레이블:
tensor([0, 0, 1, 1, 2])

입력 데이터:
tensor([[0.5574, 0.6954],
        [0.8270, 0.2186],
        [0.0834, 0.6174],
        [0.8095, 0.3751],
        [0.9175, 0.6190]])
클래스 레이블:
tensor([0, 0, 1, 2, 3])

입력 데이터:
tensor([[0.4511, 0.7954],
        [0.8480, 0.6067],
        [0.7565, 0.4878],
        [0.4409, 0.3619],
        [0.1070, 0.9318]])
클래스 레이블:
tensor([0, 0, 3, 0, 0])

입력 데이터:
tensor([[0.1735, 0.4621],
        [0.1942, 0.4984],
        [0.8648, 0.6007],
        [0.9101, 0.1342],
        [0.8330, 0.3381]])
클래스 레이블:
tensor([0, 0, 3, 3, 0])

입력 데이터:
tensor([[0.8903, 0.0760],
        [0.6902, 0.7394],
        [0.3147, 0.7318],
        [0.5444, 0.1538],
        [0.9653, 0.4251]])
클래스 레이블:
tensor([0, 0, 1, 2, 0])

입력 데이터:
tensor([[0.5280, 0.8519],
        [0.8961, 0.4116],
        [0.4583, 0.6146],
        [0.8919, 0.8983],
        [0.6356, 0.0208]])
클래스 레이블:
tensor([0, 2, 0, 0, 0])

 

0 레이블의 샘플들이 많은 비율로 샘플링 되었음을 확인할 수 있습니다.

 

이와 같은 방법으로 re-sampling을 코드 상으로 구현할 수 있습니다.

 

make_weights 함수를 바꾸어 줌으로써 나머지 두가지 방법도 구현해낼 수 있습니다.

 

square-root sampling같은 경우 1/sqrt(count)로 구현할 수 있고, progressively-balanced sampling 같은 경우 class-balanced weight maker와 Instance-balance weight maker를 만들고 epoch를 전역변수로 선언한 후 만들어 볼 수 있습니다.

 

마지막으로 전체코드 첨부하며 포스팅 마무리하도록 하겠습니다. 감사합니다!

 

import numpy as np

# 클래스 레이블 생성 (0: 30개, 1: 10개, 2: 5개, 3: 5개)
labels = np.array([0] * 30 + [1] * 10 + [2] * 5 + [3] * 5)

# 입력 데이터 생성 (예시로 랜덤한 값 사용)
X = np.random.rand(len(labels), 2)  # 2차원 입력 데이터

import torch
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.sampler import WeightedRandomSampler

def make_weights(labels, nclasses):
    labels = np.array(labels)
    weight_list = []
    
    for cls in range(nclasses):
        idx = np.where(labels == cls)[0]
        count = len(idx)
        weight = 1 / count
        weights = [weight] * count
        weight_list += weights
        
    return weight_list

# 클래스 별 가중치 생성
weights = make_weights(labels, 4)
print(weights)

# 가중치를 토치 텐서로 변환
weights_tensor = torch.tensor(weights, dtype=torch.double)

# TensorDataset 생성
dataset = TensorDataset(torch.tensor(X, dtype=torch.float32), torch.tensor(labels, dtype=torch.long))

# WeightedRandomSampler 생성
sampler = WeightedRandomSampler(weights_tensor, len(weights), replacement=True)

# DataLoader 생성
DataLoader_wrs = DataLoader(dataset, batch_size=5, sampler=sampler)
DataLoader_rs=DataLoader(dataset, batch_size=5, shuffle=True)

# wrs로 실험
epochs = 5
for epoch in range(epochs):
    print(f"Epoch {epoch + 1}")
    print("---------")
    # DataLoader로부터 데이터를 가져와 출력
    for inputs, labels in DataLoader_wrs:
        print("클래스 레이블:")
        print(labels)
        print()
        
# rs로 실험
epochs = 5
for epoch in range(epochs):
    print(f"Epoch {epoch + 1}")
    print("---------")
    # DataLoader로부터 데이터를 가져와 출력
    for inputs, labels in DataLoader_rs:
        print("입력 데이터:")
        print(inputs)
        print("클래스 레이블:")
        print(labels)
        print()