본문 바로가기

AI 인공지능

인공지능 수첩 ( segmentation / Mask R-CNN / RoIPool Layer / RoIAlign / FCN / U-Net / semantic segmentation )

반응형

 

segmentation

- 픽셀 수준에서 이미지의 각 부분이 어떤 영역인지 분리해 내는 방법


Mask R-CNN

- Instance Segmentation 모델

- 2-Stage Object Detection의 Faster-R-CNN을 계승한 것

- object detection 모델로 각 개체를 구분하고 이후에 각 개체 별로 시맨틱 세그멘테이션을 수행


 

RoIPool Layer

- 다양한 RoI 영역을 Pooling을 통해 동일한 크기의 Feature map으로 추출해 내는 레이어

- 이 고정 사이즈의 Feature map을 바탕으로 바운딩 박스와 object의 클래스를 추론해냄.


 

RoIAlign

- Quantization하지 않고도 RoI를 처리할 고정 사이즈의 Feature map을 생성할 수 있도록 함.

- RoI 영역을 pooling layer의 크기에 맞추어 등분한 후, RoIPool을 했을 때의 quantization 영역 중 가까운 것들과의 bilinear interpolation을 통해 생성해야 할 Feature Map을 계산해냄.


FCN (Fully Convolutional Network)

- Semantic Segmentation 모델

- 네트워크 뒷단에 fully connected layer 대신 CNN을 붙여줌.

- 위치정보를 유지하면서 클래스 단위의 히트맵(heat map)을 얻어 세그멘테이션을 하기 위해, fully connected layer를 CNN으로 대체함.

- 위치의 특성을 유지하면서 이미지 분류를 하기 위해서 마지막 CNN은 1x1의 커널 크기(kernel size)와 클래스의 개수만큼의 채널을 가짐.

- CNN을 거치면 클래스 히트맵을 얻을 수 있음.

- Upsampling을 위해 FCN에서는 DeconvolutionInterpolation 방식을 활용함.

- FCN은 입력 이미지의 크기를 조정하여 세그멘테이션 맵을 얻어냄.


U-Net

- Semantic Segmentation 모델

- FCN에서 upsampling을 통해서 특성 맵을 키운 것을 입력값과 대칭적으로 만들어 준 것

- 네트워크 구조를 좌측의 Contracting path와 우측의 Expansive path 두 가지로 구분함.

- 좌측의 Contracting path는 일반적으로 우리가 사용해왔던 Convolution network와 유사한 구조를 가짐.

- 두 Path에서 크기가 같은 블록의 출력과 입력은 skip connection처럼 연결해 주어 low-level의 feature를 활용할 수 있도록함.

- 타일 방식을 사용해서 어느 정도 서로 겹치는 구간으로 타일을 나누어 네트워크를 추론함.

- 클래스 간 데이터 양의 불균형을 해결해 주기 위해서 분포를 고려한 weight map을 학습 때 사용함.


 

semantic segmentation_Unet

# 데이터셋 구성

from albumentations import HorizontalFlip, RandomSizedCrop, Compose, OneOf, Resize

def build_augmentation( is_train=True ):

if is_train: # 훈련용 데이터일 경우

return Compose([ // Compose()함수Albumentation에서 다양한 augmentation기법을 확률적으로 적용함.

HorizontalFlip( p=0.5 ), # 50%의 확률로 좌우대칭

RandomSizedCrop( # 50%의 확률로 RandomSizedCrop

min_max_height=(300, 370),

w2h_ratio=370/1242,

height=224,

width=224,

p=0.5

),

Resize( # 입력이미지를 224X224로 resize

width=224,

height=224

)

])

return Compose([ # 테스트용 데이터일 경우에는 224X224로 resize만 수행

Resize(

width=224,

height=224

)

])

dir_path = os.getenv('HOME') + '/aiffel/semantic_segmentation/data/training'

augmentation_train = build_augmentation()

augmentation_test = build_augmentation( is_train=False )

input_images = glob (os.path.join( dir_path, "image_2", "*.png" ))

# 훈련 데이터셋에서 5개만 가져와 augmentation적용

plt.figure(figsize=(12, 20))

for i in range(5):

image = imread( input_images[i] )

image_data = {"image" : image}

resized = augmentation_test(**image_data)

processed = augmentation_train(**image_data)

plt.subplot(5, 2, 2*i+1)

plt.imshow( resized["image"] ) # 원본이미지

plt.subplot(5, 2, 2*i+2)

plt.imshow( processed["image"] ) # augment된 이미지

plt.show()

class KittiGenerator( tf.keras.utils.Sequence ): # KittiGenerator는 tf.keras.utils.Sequence를 상속받음.

def __init__(self,

dir_path,

batch_size = 16,

img_size = (224, 224, 3),

output_size = (224, 224),

is_train = True,

augmentation = None):

self.dir_path = dir_path

self.batch_size = batch_size

self.is_train = is_train

self.dir_path = dir_path

self.augmentation = augmentation

self.img_size = img_size

self.output_size = output_size

self.data = self.load_dataset()

# load_dataset()을 통해서 kitti dataset의 directory path에서 라벨과 이미지를 확인함.

def load_dataset(self):

input_images = glob(os.path.join(self.dir_path, "image_2", "*.png"))

label_images = glob(os.path.join(self.dir_path, "semantic", "*.png"))

input_images.sort()

label_images.sort()

assert len(input_images) == len(label_images)

data = [ _ for _ in zip( input_images, label_images )]

if self.is_train:

return data[:-30]

return data[-30:]

 

def __len__(self):

# Generator의 length로서 전체 dataset을 batch_size로 나누고, 소숫점 첫째자리에서 올림한 값을 반환함.

return math.ceil( len(self.data) / self.batch_size )

def __getitem__(self, index):

batch_data = self.data[

index*self.batch_size:

(index + 1)*self.batch_size

]

inputs = np.zeros([self.batch_size, *self.img_size])

outputs = np.zeros([self.batch_size, *self.output_size])

 

for i, data in enumerate(batch_data):

input_img_path, output_path = data

_input = imread(input_img_path)

_output = imread(output_path)

_output = (_output==7).astype(np.uint8)*1

data = {

"image": _input,

"mask": _output,

}

augmented = self.augmentation(**data)

inputs[i] = augmented["image"] / 255

outputs[i] = augmented["mask"]

return inputs, outputs

# 입력은 resize및 augmentation이 적용된 input image이고, 출력은 semantic label임.

def on_epoch_end(self): # 한 epoch가 끝나면 실행되는 함수

self.indexes = np.arange(len(self.data))

if self.is_train == True :

np.random.shuffle(self.indexes)

return self.indexes

augmentation = build_augmentation()

test_preproc = build_augmentation(is_train=False)

 

train_generator = KittiGenerator(

dir_path,

augmentation=augmentation,

)

test_generator = KittiGenerator(

dir_path,

augmentation=test_preproc,

is_train=False

)

# U-Net 모델 구조 만들기

def build_model(input_shape=(224, 224, 3)):

inputs = Input(input_shape)

# Contracting Path

conv1 = Conv2D(64, 3, activation='relu', padding='same',kernel_initializer='he_normal')(inputs)

conv1 = Conv2D(64, 3, activation='relu', padding='same',kernel_initializer='he_normal')(conv1)

pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)

conv2 = Conv2D(128, 3, activation='relu', padding='same',kernel_initializer='he_normal')(pool1)

conv2 = Conv2D(128, 3, activation='relu', padding='same',kernel_initializer='he_normal')(conv2)

pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)

conv3 = Conv2D(256, 3, activation='relu', padding='same',kernel_initializer='he_normal')(pool2)

conv3 = Conv2D(256, 3, activation='relu', padding='same',kernel_initializer='he_normal')(conv3)

pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)

conv4 = Conv2D(512, 3, activation='relu', padding='same',kernel_initializer='he_normal')(pool3)

conv4 = Conv2D(512, 3, activation='relu', padding='same',kernel_initializer='he_normal')(conv4)

drop4 = Dropout(0.5)(conv4)

pool4 = MaxPooling2D(pool_size=(2, 2))(drop4)

 

conv5 = Conv2D(1024, 3, activation='relu', padding='same',kernel_initializer='he_normal')(pool4)

conv5 = Conv2D(1024, 3, activation='relu', padding='same',kernel_initializer='he_normal')(conv5)

 

# Expanding Path

drop5 = Dropout(0.5)(conv5)

up6 = Conv2DTranspose(512, 2, activation='relu', strides=(2,2), kernel_initializer='he_normal')(drop5)

merge6 = concatenate([drop4,up6], axis = 3)

conv6 = Conv2D(512, 3, activation='relu', padding='same',kernel_initializer='he_normal')(merge6)

conv6 = Conv2D(512, 3, activation='relu', padding='same',kernel_initializer='he_normal')(conv6)

up7 = Conv2DTranspose(256, 2, activation='relu', strides=(2,2), kernel_initializer='he_normal')(conv6)

merge7 = concatenate([conv3,up7], axis = 3)

conv7 = Conv2D(256, 3, activation='relu', padding='same',kernel_initializer='he_normal')(merge7)

conv7 = Conv2D(256, 3, activation='relu', padding='same',kernel_initializer='he_normal')(conv7)

up8 = Conv2DTranspose(128, 2, activation='relu', strides=(2,2), kernel_initializer='he_normal')(conv7)

merge8 = concatenate([conv2,up8], axis = 3)

conv8 = Conv2D(128, 3, activation='relu', padding='same',kernel_initializer='he_normal')(merge8)

conv8 = Conv2D(128, 3, activation='relu', padding='same',kernel_initializer='he_normal')(conv8)

up9 = Conv2DTranspose(64, 2, activation='relu', strides=(2,2), kernel_initializer='he_normal')(conv8)

merge9 = concatenate([conv1,up9], axis = 3)

conv9 = Conv2D(64, 3, activation='relu', padding='same',kernel_initializer='he_normal')(merge9)

conv9 = Conv2D(64, 3, activation='relu', padding='same',kernel_initializer='he_normal')(conv9)

conv9 = Conv2D(2, 3, activation='relu', padding='same',kernel_initializer='he_normal')(conv9)

conv10 = Conv2D( 1, 1, activation='sigmoid' )(conv9)

model = Model( inputs = inputs, outputs = conv10 )

return model

# 모델 학습하기

model_path = os.getenv('HOME')+'/aiffel/semantic_segmentation/seg_model_unet.h5'

model = build_model()

model.compile(optimizer = Adam(1e-4), loss = 'binary_crossentropy')

model.fit(

train_generator,

validation_data = test_generator,

steps_per_epoch = len(train_generator),

epochs = 100,

)

model.save(model_path)

# 모델 시각화

model_path = dir_path + '/seg_model_unet.h5'

model = tf.keras.models.load_model(model_path)

def get_output( model, preproc, image_path, output_path ):

origin_img = imread(image_path)

data = {"image":origin_img}

processed = preproc(**data)

output = model( np.expand_dims( processed["image"] / 255, axis=0))

output = (output[0].numpy() > 0.5).astype(np.uint8).squeeze(-1)*255 # threshold를 변경하면 도로인식 결과범위가 달라짐.

output = Image.fromarray(output)

background = Image.fromarray(origin_img).convert('RGBA')

output = output.resize((origin_img.shape[1], origin_img.shape[0])).convert('RGBA')

output = Image.blend(background, output, alpha=0.5) # 입력 이미지와 라벨을 한번에 볼 수 있도록 오버레이해줌.

output.show()

return output

i = 1

get_output(

model,

test_preproc,

image_path=dir_path + f'/image_2/00{str(i).zfill(4)}_10.png',

output_path=dir_path + f'./result_{str(i).zfill(3)}.png'

)

def calculate_iou_score(target, prediction):

intersection = np.logical_and(target, prediction)

union = np.logical_or(target, prediction)

iou_score = float(np.sum(intersection)) / float(np.sum(union))

print('IoU : %f' % iou_score )

return iou_score

def get_output(model, preproc, image_path, output_path, label_path):

origin_img = imread(image_path)

data = {"image":origin_img}

processed = preproc(**data)

output = model(np.expand_dims(processed["image"]/255,axis=0))

output = (output[0].numpy()>=0.5).astype(np.uint8).squeeze(-1)*255

prediction = output / 255 # 도로로 판단한 영역

 

output = Image.fromarray(output)

background = Image.fromarray(origin_img).convert('RGBA')

output = output.resize((origin_img.shape[1], origin_img.shape[0])).convert('RGBA')

output = Image.blend( background, output, alpha=0.5 )

output.show() # 도로로 판단한 영역을 시각화함.

 

if label_path:

label_img = imread(label_path)

label_data = {"image":label_img}

label_processed = preproc(**label_data)

label_processed = label_processed["image"]

target = (label_processed == 7).astype(np.uint8)*1 # 라벨에서 도로로 기재된 영역

return output, prediction, target

else:

return output, prediction, _

i = 1

output, prediction, target = get_output(

model,

test_preproc,

image_path=dir_path + f'/image_2/00{str(i).zfill(4)}_10.png',

output_path=dir_path + f'./result_{str(i).zfill(3)}.png',

label_path=dir_path + f'/semantic/00{str(i).zfill(4)}_10.png'

)

calculate_iou_score(target, prediction)

 

반응형