
SSD (Single Shot MultiBox Detector)
- Uses feature maps of multiple scales -> equivalent to varying the grid size on the original image
- Sets default boxes with different aspect ratios at each convolution
- Takes the feature maps from each layer, runs object detection on all of them, combines the results to compute a localization loss and a confidence loss, and then trains the whole network on that combined loss
- Before training, default boxes whose jaccard overlap (IoU) with a ground truth box is at least 0.5 are pre-matched and set as positive samples (see the sketch after this list)
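
The snippet below is a minimal sketch of that matching rule, not the implementation used for training (the TensorFlow version appears later as _jaccard/encode_tf); the box coordinates and the 0.5 threshold are illustrative.

# Minimal IoU matching sketch (boxes are [x_min, y_min, x_max, y_max], normalized).
def iou(box_a, box_b):
    # Intersection rectangle
    x1 = max(box_a[0], box_b[0])
    y1 = max(box_a[1], box_b[1])
    x2 = min(box_a[2], box_b[2])
    y2 = min(box_a[3], box_b[3])
    inter = max(0.0, x2 - x1) * max(0.0, y2 - y1)
    area_a = (box_a[2] - box_a[0]) * (box_a[3] - box_a[1])
    area_b = (box_b[2] - box_b[0]) * (box_b[3] - box_b[1])
    return inter / (area_a + area_b - inter)

# A default box becomes a positive sample when its IoU with a ground truth box is >= 0.5.
gt = [0.2, 0.2, 0.6, 0.6]
for default in [[0.25, 0.25, 0.65, 0.65], [0.7, 0.7, 0.9, 0.9]]:
    print(default, 'positive' if iou(gt, default) >= 0.5 else 'negative')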


TinaFace
- Uses ResNet50 as the backbone and builds on existing modules (Feature Pyramid Network, Inception block, FCN, etc.)

RetinaFace
- Performs pixel-wise face localization across a wide range of face scales
- Adds extra-supervised and self-supervised multi-task learning on top of the existing box classification and regression branches

DSFD (Dual Shot Face Detector)
- Improves performance with three techniques: Feature Enhance Module (FEM), Progressive Anchor Loss (PAL), and Improved Anchor Matching (IAM)

S3FD
- Detects faces across a wide range of scales with a single deep neural network
- To find faces of various sizes, anchors are tiled in a grid over a wide range of layers, and the anchor scales are newly designed
-----------------------------------------------------------------
Implementing a multi-face detector with the SSD model
import os, cv2, time
import tensorflow as tf
import tqdm
import numpy as np
import math
from itertools import product
import matplotlib.pyplot as plt
PROJECT_PATH = os.getenv('HOME')+'/aiffel/face_detector'
DATA_PATH = os.path.join(PROJECT_PATH, 'widerface')
MODEL_PATH = os.path.join(PROJECT_PATH, 'checkpoints')
TRAIN_TFRECORD_PATH = os.path.join(PROJECT_PATH, 'dataset', 'train_mask.tfrecord')
VALID_TFRECORD_PATH = os.path.join(PROJECT_PATH, 'dataset', 'val_mask.tfrecord')
CHECKPOINT_PATH = os.path.join(PROJECT_PATH, 'checkpoints')
DATASET_LEN = 12880
BATCH_SIZE = 32
IMAGE_WIDTH = 320
IMAGE_HEIGHT = 256
IMAGE_LABELS = ['background', 'face']
# Parse one bounding box entry from the annotation file
def parse_box(data):
    x0 = int(data[0])
    y0 = int(data[1])
    w = int(data[2])
    h = int(data[3])
    return x0, y0, w, h
# Parse the per-image bounding box annotations from wider_face_train_bbx_gt.txt into a list
def parse_widerface(file):
    infos = []
    with open(file) as fp:
        line = fp.readline()
        while line:
            n_object = int(fp.readline())
            boxes = []
            for i in range(n_object):
                box = fp.readline().split(' ')
                x0, y0, w, h = parse_box(box)
                if (w == 0) or (h == 0):  # skip degenerate boxes
                    continue
                boxes.append([x0, y0, w, h])
            if n_object == 0:  # images with no faces still carry one all-zero box line
                box = fp.readline().split(' ')
                x0, y0, w, h = parse_box(box)
                boxes.append([x0, y0, w, h])
            infos.append((line.strip(), boxes))
            line = fp.readline()
    return infos
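# A quick sanity check of the parser (illustrative only), assuming the WIDER FACE
# split files have been unpacked under DATA_PATH as in the TFRecord conversion below.
sample_infos = parse_widerface(os.path.join(DATA_PATH, 'wider_face_split', 'wider_face_val_bbx_gt.txt'))
print(len(sample_infos))   # number of annotated images
print(sample_infos[0])     # (relative image path, [[x, y, w, h], ...])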
# Read and decode an image, returning an error flag for corrupt files
def process_image(image_file):
    image_string = tf.io.read_file(image_file)
    try:
        image_data = tf.image.decode_jpeg(image_string, channels=3)
        return 0, image_string, image_data
    except tf.errors.InvalidArgumentError:
        return 1, image_string, None

# The bounding box info is stored as [x, y, w, h]; convert it to corner coordinates [x_min, y_min, x_max, y_max]
def xywh_to_voc(file_name, boxes, image_data):
    shape = image_data.shape
    image_info = {}
    image_info['filename'] = file_name
    image_info['width'] = shape[1]
    image_info['height'] = shape[0]
    image_info['depth'] = 3
    difficult = []
    classes = []
    xmin, ymin, xmax, ymax = [], [], [], []
    for box in boxes:
        classes.append(1)
        difficult.append(0)
        xmin.append(box[0])
        ymin.append(box[1])
        xmax.append(box[0] + box[2])
        ymax.append(box[1] + box[3])
    image_info['class'] = classes
    image_info['xmin'] = xmin
    image_info['ymin'] = ymin
    image_info['xmax'] = xmax
    image_info['ymax'] = ymax
    image_info['difficult'] = difficult
    return image_info
# Build a tf.train.Example instance from the image bytes and its box info
def make_example(image_string, image_infos):
    for info in image_infos:  # called with a single-element list
        filename = info['filename']
        width = info['width']
        height = info['height']
        depth = info['depth']
        classes = info['class']
        xmin = info['xmin']
        ymin = info['ymin']
        xmax = info['xmax']
        ymax = info['ymax']
    if isinstance(image_string, type(tf.constant(0))):
        encoded_image = [image_string.numpy()]
    else:
        encoded_image = [image_string]
    base_name = [tf.compat.as_bytes(os.path.basename(filename))]
    example = tf.train.Example(features=tf.train.Features(feature={
        'filename': tf.train.Feature(bytes_list=tf.train.BytesList(value=base_name)),
        'height': tf.train.Feature(int64_list=tf.train.Int64List(value=[height])),
        'width': tf.train.Feature(int64_list=tf.train.Int64List(value=[width])),
        'classes': tf.train.Feature(int64_list=tf.train.Int64List(value=classes)),
        'x_mins': tf.train.Feature(float_list=tf.train.FloatList(value=xmin)),
        'y_mins': tf.train.Feature(float_list=tf.train.FloatList(value=ymin)),
        'x_maxes': tf.train.Feature(float_list=tf.train.FloatList(value=xmax)),
        'y_maxes': tf.train.Feature(float_list=tf.train.FloatList(value=ymax)),
        'image_raw': tf.train.Feature(bytes_list=tf.train.BytesList(value=encoded_image))
    }))
    return example
# Serialize each example and write it to a TFRecord file
for split in ['train', 'val']:
    if split == 'train':
        output_file = TRAIN_TFRECORD_PATH
        anno_txt = 'wider_face_train_bbx_gt.txt'
        file_path = 'WIDER_train'
    else:
        output_file = VALID_TFRECORD_PATH
        anno_txt = 'wider_face_val_bbx_gt.txt'
        file_path = 'WIDER_val'
    with tf.io.TFRecordWriter(output_file) as writer:
        for info in tqdm.tqdm(parse_widerface(os.path.join(DATA_PATH, 'wider_face_split', anno_txt))):
            image_file = os.path.join(DATA_PATH, file_path, 'images', info[0])
            error, image_string, image_data = process_image(image_file)
            if not error:  # only convert boxes once the image has decoded successfully
                boxes = xywh_to_voc(image_file, info[1], image_data)
                tf_example = make_example(image_string, [boxes])
                writer.write(tf_example.SerializeToString())
# Keep the default box configuration in global variables.
BOX_MIN_SIZES = [[10, 16, 24], [32, 48], [64, 96], [128, 192, 256]]
BOX_STEPS = [8, 16, 32, 64]
# Compute the sizes of the 4 feature maps
image_sizes = (IMAGE_HEIGHT, IMAGE_WIDTH)
min_sizes = BOX_MIN_SIZES
steps = BOX_STEPS
feature_maps = [
    [math.ceil(image_sizes[0] / step), math.ceil(image_sizes[1] / step)]
    for step in steps
]
feature_maps
# Walk over each feature map and generate its default boxes (center x, center y, width, height; all normalized)
boxes = []
for k, f in enumerate(feature_maps):
    for i, j in product(range(f[0]), range(f[1])):
        for min_size in min_sizes[k]:
            s_kx = min_size / image_sizes[1]
            s_ky = min_size / image_sizes[0]
            cx = (j + 0.5) * steps[k] / image_sizes[1]
            cy = (i + 0.5) * steps[k] / image_sizes[0]
            boxes += [cx, cy, s_kx, s_ky]
len(boxes)
pretty_boxes = np.asarray(boxes).reshape([-1, 4])
print(pretty_boxes.shape)
print(pretty_boxes)
# Build the feature maps and the default boxes tied to them
def default_box():
    image_sizes = (IMAGE_HEIGHT, IMAGE_WIDTH)
    min_sizes = BOX_MIN_SIZES
    steps = BOX_STEPS
    feature_maps = [
        [math.ceil(image_sizes[0] / step), math.ceil(image_sizes[1] / step)]
        for step in steps
    ]
    boxes = []
    for k, f in enumerate(feature_maps):
        for i, j in product(range(f[0]), range(f[1])):
            for min_size in min_sizes[k]:
                s_kx = min_size / image_sizes[1]
                s_ky = min_size / image_sizes[0]
                cx = (j + 0.5) * steps[k] / image_sizes[1]
                cy = (i + 0.5) * steps[k] / image_sizes[0]
                boxes += [cx, cy, s_kx, s_ky]
    boxes = np.asarray(boxes).reshape([-1, 4])
    return boxes
# Building the SSD model
def _conv_block(inputs, filters, kernel=(3, 3), strides=(1, 1)):
    block_id = tf.keras.backend.get_uid()
    if strides == (2, 2):
        x = tf.keras.layers.ZeroPadding2D(padding=((1, 1), (1, 1)), name='conv_pad_%d' % block_id)(inputs)
        x = tf.keras.layers.Conv2D(filters, kernel,
                                   padding='valid',
                                   use_bias=False,
                                   strides=strides,
                                   name='conv_%d' % block_id)(x)
    else:
        x = tf.keras.layers.Conv2D(filters, kernel,
                                   padding='same',
                                   use_bias=False,
                                   strides=strides,
                                   name='conv_%d' % block_id)(inputs)
    x = tf.keras.layers.BatchNormalization(name='conv_bn_%d' % block_id)(x)
    return tf.keras.layers.ReLU(name='conv_relu_%d' % block_id)(x)
def _depthwise_conv_block(inputs, filters, strides=(1, 1)):
    block_id = tf.keras.backend.get_uid()
    if strides == (1, 1):
        x = inputs
    else:
        x = tf.keras.layers.ZeroPadding2D(((1, 1), (1, 1)), name='conv_pad_%d' % block_id)(inputs)
    x = tf.keras.layers.DepthwiseConv2D((3, 3),
                                        padding='same' if strides == (1, 1) else 'valid',
                                        strides=strides,
                                        use_bias=False,
                                        name='conv_dw_%d' % block_id)(x)
    x = tf.keras.layers.BatchNormalization(name='conv_dw_%d_bn' % block_id)(x)
    x = tf.keras.layers.ReLU(name='conv_dw_%d_relu' % block_id)(x)
    x = tf.keras.layers.Conv2D(filters, (1, 1),
                               padding='same',
                               use_bias=False,
                               strides=(1, 1),
                               name='conv_pw_%d' % block_id)(x)
    x = tf.keras.layers.BatchNormalization(name='conv_pw_%d_bn' % block_id)(x)
    return tf.keras.layers.ReLU(name='conv_pw_%d_relu' % block_id)(x)
def _branch_block(inputs, filters):
    x = tf.keras.layers.Conv2D(filters, kernel_size=(3, 3), padding='same')(inputs)
    x = tf.keras.layers.LeakyReLU()(x)
    x = tf.keras.layers.Conv2D(filters, kernel_size=(3, 3), padding='same')(x)
    x1 = tf.keras.layers.Conv2D(filters * 2, kernel_size=(3, 3), padding='same')(inputs)
    x = tf.keras.layers.Concatenate(axis=-1)([x, x1])
    return tf.keras.layers.ReLU()(x)

def _create_head_block(inputs, filters):
    x = tf.keras.layers.Conv2D(filters, kernel_size=(3, 3), strides=(1, 1), padding='same')(inputs)
    return x

# One head per feature map: class confidences and box offsets
def _compute_heads(inputs, num_class, num_cell):
    conf = _create_head_block(inputs, num_cell * num_class)
    conf = tf.keras.layers.Reshape((-1, num_class))(conf)
    loc = _create_head_block(inputs, num_cell * 4)
    loc = tf.keras.layers.Reshape((-1, 4))(loc)
    return conf, loc
# Assemble the SSD model
def SsdModel():
    base_channel = 16
    num_cells = [3, 2, 2, 3]
    num_class = len(IMAGE_LABELS)
    x = inputs = tf.keras.layers.Input(shape=[IMAGE_HEIGHT, IMAGE_WIDTH, 3], name='input_image')
    x = _conv_block(x, base_channel, strides=(2, 2))
    x = _conv_block(x, base_channel * 2, strides=(1, 1))
    x = _conv_block(x, base_channel * 2, strides=(2, 2))
    x = _conv_block(x, base_channel * 2, strides=(1, 1))
    x = _conv_block(x, base_channel * 4, strides=(2, 2))
    x = _conv_block(x, base_channel * 4, strides=(1, 1))
    x = _conv_block(x, base_channel * 4, strides=(1, 1))
    x = _conv_block(x, base_channel * 4, strides=(1, 1))
    x1 = _branch_block(x, base_channel)
    x = _conv_block(x, base_channel * 8, strides=(2, 2))
    x = _conv_block(x, base_channel * 8, strides=(1, 1))
    x = _conv_block(x, base_channel * 8, strides=(1, 1))
    x2 = _branch_block(x, base_channel)
    x = _depthwise_conv_block(x, base_channel * 16, strides=(2, 2))
    x = _depthwise_conv_block(x, base_channel * 16, strides=(1, 1))
    x3 = _branch_block(x, base_channel)
    x = _depthwise_conv_block(x, base_channel * 16, strides=(2, 2))
    x4 = _branch_block(x, base_channel)
    extra_layers = [x1, x2, x3, x4]
    confs = []
    locs = []
    for layer, num_cell in zip(extra_layers, num_cells):
        conf, loc = _compute_heads(layer, num_class, num_cell)
        confs.append(conf)
        locs.append(loc)
    confs = tf.keras.layers.Concatenate(axis=1, name="face_classes")(confs)
    locs = tf.keras.layers.Concatenate(axis=1, name="face_boxes")(locs)
    predictions = tf.keras.layers.Concatenate(axis=2, name='predictions')([locs, confs])
    model = tf.keras.Model(inputs=inputs, outputs=predictions, name='ssd_model')
    return model
# Instantiate the model
model = SsdModel()
print("the number of model layers: ", len(model.layers))
model.summary()
# Applying default boxes
def _intersect(box_a, box_b):
    A = tf.shape(box_a)[0]
    B = tf.shape(box_b)[0]
    max_xy = tf.minimum(
        tf.broadcast_to(tf.expand_dims(box_a[:, 2:], 1), [A, B, 2]),
        tf.broadcast_to(tf.expand_dims(box_b[:, 2:], 0), [A, B, 2]))
    min_xy = tf.maximum(
        tf.broadcast_to(tf.expand_dims(box_a[:, :2], 1), [A, B, 2]),
        tf.broadcast_to(tf.expand_dims(box_b[:, :2], 0), [A, B, 2]))
    inter = tf.clip_by_value(max_xy - min_xy, 0.0, 512.0)
    return inter[:, :, 0] * inter[:, :, 1]

def _jaccard(box_a, box_b):
    inter = _intersect(box_a, box_b)
    area_a = tf.broadcast_to(
        tf.expand_dims(
            (box_a[:, 2] - box_a[:, 0]) * (box_a[:, 3] - box_a[:, 1]), 1),
        tf.shape(inter))  # [A, B]
    area_b = tf.broadcast_to(
        tf.expand_dims(
            (box_b[:, 2] - box_b[:, 0]) * (box_b[:, 3] - box_b[:, 1]), 0),
        tf.shape(inter))  # [A, B]
    union = area_a + area_b - inter
    return inter / union  # [A, B]
# Use _jaccard to find, for each ground truth bbox in the label, the default box with the highest overlap.
# _encode_bbox rescales the matched bbox so all boxes share the same scale.
# For every default box, whether a ground truth bounding box overlaps it above a threshold (positive/negative) is concatenated in as the updated label.
def _encode_bbox(matched, boxes, variances=[0.1, 0.2]):
    g_cxcy = (matched[:, :2] + matched[:, 2:]) / 2 - boxes[:, :2]
    g_cxcy /= (variances[0] * boxes[:, 2:])
    g_wh = (matched[:, 2:] - matched[:, :2]) / boxes[:, 2:]
    g_wh = tf.math.log(g_wh) / variances[1]
    g_wh = tf.where(tf.math.is_inf(g_wh), 0.0, g_wh)
    return tf.concat([g_cxcy, g_wh], 1)
def encode_tf(labels, boxes):
    match_threshold = 0.45
    boxes = tf.cast(boxes, tf.float32)
    bbox = labels[:, :4]
    conf = labels[:, -1]
    # jaccard index
    overlaps = _jaccard(bbox, boxes)
    best_box_overlap = tf.reduce_max(overlaps, 1)
    best_box_idx = tf.argmax(overlaps, 1, tf.int32)
    best_truth_overlap = tf.reduce_max(overlaps, 0)
    best_truth_idx = tf.argmax(overlaps, 0, tf.int32)
    # Force each ground truth box to keep its best-matching default box
    best_truth_overlap = tf.tensor_scatter_nd_update(
        best_truth_overlap, tf.expand_dims(best_box_idx, 1),
        tf.ones_like(best_box_idx, tf.float32) * 2.)
    best_truth_idx = tf.tensor_scatter_nd_update(
        best_truth_idx, tf.expand_dims(best_box_idx, 1),
        tf.range(tf.size(best_box_idx), dtype=tf.int32))
    # Scale ground-truth boxes
    matches_bbox = tf.gather(bbox, best_truth_idx)
    loc_t = _encode_bbox(matches_bbox, boxes)
    conf_t = tf.gather(conf, best_truth_idx)
    conf_t = tf.where(tf.less(best_truth_overlap, match_threshold), tf.zeros_like(conf_t), conf_t)
    return tf.concat([loc_t, conf_t[..., tf.newaxis]], axis=1)
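# A quick shape check of the encoder, assuming default_box() from above (illustrative only):
# one normalized ground truth box with class 1 is encoded against every default box.
sample_label = tf.constant([[0.2, 0.2, 0.6, 0.6, 1.0]], tf.float32)
encoded = encode_tf(sample_label, default_box())
print(encoded.shape)  # (num_default_boxes, 5): 4 box offsets + 1 class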
# Transform the dataset by applying augmentation and encoding the labels
# (assumes the augmentation helpers _crop, _pad_to_square, _resize, _flip, and _distort are defined elsewhere)
def _transform_data(train, boxes):
    def transform_data(img, labels):
        img = tf.cast(img, tf.float32)
        if train:
            img, labels = _crop(img, labels)
        img = _pad_to_square(img)
        img, labels = _resize(img, labels)
        if train:
            img, labels = _flip(img, labels)
            img = _distort(img)
        labels = encode_tf(labels, boxes)
        img = img / 255.0
        return img, labels
    return transform_data
# Create a closure that applies _transform_data to each TFRecord
def _parse_tfrecord(train, boxes):
    def parse_tfrecord(tfrecord):
        features = {
            'filename': tf.io.FixedLenFeature([], tf.string),
            'height': tf.io.FixedLenFeature([], tf.int64),
            'width': tf.io.FixedLenFeature([], tf.int64),
            'classes': tf.io.VarLenFeature(tf.int64),
            'x_mins': tf.io.VarLenFeature(tf.float32),
            'y_mins': tf.io.VarLenFeature(tf.float32),
            'x_maxes': tf.io.VarLenFeature(tf.float32),
            'y_maxes': tf.io.VarLenFeature(tf.float32),
            'difficult': tf.io.VarLenFeature(tf.int64),
            'image_raw': tf.io.FixedLenFeature([], tf.string),
        }
        parsed_example = tf.io.parse_single_example(tfrecord, features)
        img = tf.image.decode_jpeg(parsed_example['image_raw'], channels=3)
        width = tf.cast(parsed_example['width'], tf.float32)
        height = tf.cast(parsed_example['height'], tf.float32)
        labels = tf.sparse.to_dense(parsed_example['classes'])
        labels = tf.cast(labels, tf.float32)
        labels = tf.stack(
            [tf.sparse.to_dense(parsed_example['x_mins']),
             tf.sparse.to_dense(parsed_example['y_mins']),
             tf.sparse.to_dense(parsed_example['x_maxes']),
             tf.sparse.to_dense(parsed_example['y_maxes']), labels], axis=1)
        img, labels = _transform_data(train, boxes)(img, labels)
        return img, labels
    return parse_tfrecord
# Main dataset-building function: applies _parse_tfrecord via tf.data.TFRecordDataset.map()
def load_tfrecord_dataset(tfrecord_name, train=True, boxes=None, buffer_size=1024):
    raw_dataset = tf.data.TFRecordDataset(tfrecord_name)
    raw_dataset = raw_dataset.cache()
    if train:
        raw_dataset = raw_dataset.repeat()
        raw_dataset = raw_dataset.shuffle(buffer_size=buffer_size)
    dataset = raw_dataset.map(_parse_tfrecord(train, boxes), num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    return dataset
# Final entry point that builds the train or validation dataset through load_tfrecord_dataset
def load_dataset(boxes, train=True, buffer_size=1024):
    if train:
        dataset = load_tfrecord_dataset(
            tfrecord_name=TRAIN_TFRECORD_PATH,
            train=train,
            boxes=boxes,
            buffer_size=buffer_size)
    else:
        dataset = load_tfrecord_dataset(
            tfrecord_name=VALID_TFRECORD_PATH,
            train=train,
            boxes=boxes,
            buffer_size=buffer_size)
    return dataset
# Apply a different learning rate per training step, with a warm-up phase early on so the learning rate increases gradually
class PiecewiseConstantWarmUpDecay(tf.keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, boundaries, values, warmup_steps, min_lr, name=None):
        super(PiecewiseConstantWarmUpDecay, self).__init__()
        if len(boundaries) != len(values) - 1:
            raise ValueError(
                "The length of boundaries should be 1 less than the "
                "length of values")
        self.boundaries = boundaries
        self.values = values
        self.name = name
        self.warmup_steps = warmup_steps
        self.min_lr = min_lr

    def __call__(self, step):
        with tf.name_scope(self.name or "PiecewiseConstantWarmUp"):
            step = tf.cast(tf.convert_to_tensor(step), tf.float32)
            pred_fn_pairs = []
            warmup_steps = self.warmup_steps
            boundaries = self.boundaries
            values = self.values
            min_lr = self.min_lr
            # Linear warm-up from min_lr to the first value
            pred_fn_pairs.append(
                (step <= warmup_steps,
                 lambda: min_lr + step * (values[0] - min_lr) / warmup_steps))
            pred_fn_pairs.append(
                (tf.logical_and(step <= boundaries[0],
                                step > warmup_steps),
                 lambda: tf.constant(values[0])))
            pred_fn_pairs.append(
                (step > boundaries[-1], lambda: tf.constant(values[-1])))
            for low, high, v in zip(boundaries[:-1], boundaries[1:],
                                    values[1:-1]):
                pred = (step > low) & (step <= high)
                # Bind v as a default argument so each lambda keeps its own value
                pred_fn_pairs.append((pred, lambda v=v: tf.constant(v)))
            return tf.case(pred_fn_pairs, lambda: tf.constant(values[0]),
                           exclusive=True)
def MultiStepWarmUpLR(initial_learning_rate, lr_steps, lr_rate,
                      warmup_steps=0., min_lr=0.,
                      name='MultiStepWarmUpLR'):
    assert warmup_steps <= lr_steps[0]
    assert min_lr <= initial_learning_rate
    lr_steps_value = [initial_learning_rate]
    for _ in range(len(lr_steps)):
        lr_steps_value.append(lr_steps_value[-1] * lr_rate)
    return PiecewiseConstantWarmUpDecay(
        boundaries=lr_steps, values=lr_steps_value, warmup_steps=warmup_steps,
        min_lr=min_lr)
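# A small check of the schedule shape, with illustrative step counts (not the training
# settings used below): the rate warms up linearly over the first 100 steps, then drops
# by a factor of lr_rate at each boundary.
demo_lr = MultiStepWarmUpLR(initial_learning_rate=1e-2, lr_steps=[500, 800],
                            lr_rate=0.1, warmup_steps=100, min_lr=1e-4)
for s in [0, 50, 100, 400, 600, 900]:
    print(s, float(demo_lr(s)))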
# Hard negative mining: keep all positives and the top-k highest-loss negatives (k = neg_ratio * num_pos)
def hard_negative_mining(loss, class_truth, neg_ratio):
    pos_idx = class_truth > 0
    num_pos = tf.math.reduce_sum(tf.cast(pos_idx, tf.int32), axis=1)
    num_neg = num_pos * neg_ratio
    rank = tf.argsort(loss, axis=1, direction='DESCENDING')
    rank = tf.argsort(rank, axis=1)  # double argsort yields each element's rank by loss
    neg_idx = rank < tf.expand_dims(num_neg, 1)
    return pos_idx, neg_idx
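# A tiny worked example of the double-argsort rank trick (illustrative numbers):
# for losses [0.1, 0.9, 0.4, 0.7] the descending ranks are [3, 0, 2, 1], so with
# num_neg = 2 only the two highest-loss entries (0.9 and 0.7) are selected.
demo_loss = tf.constant([[0.1, 0.9, 0.4, 0.7]])
demo_rank = tf.argsort(tf.argsort(demo_loss, axis=1, direction='DESCENDING'), axis=1)
print(demo_rank.numpy())        # [[3 0 2 1]]
print((demo_rank < 2).numpy())  # [[False  True False  True]]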
def MultiBoxLoss(num_class, neg_pos_ratio=3.0):
    def multi_loss(y_true, y_pred):
        num_batch = tf.shape(y_true)[0]
        loc_pred, class_pred = y_pred[..., :4], y_pred[..., 4:]
        loc_truth, class_truth = y_true[..., :4], tf.squeeze(y_true[..., 4:])
        cross_entropy = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')
        temp_loss = cross_entropy(class_truth, class_pred)
        pos_idx, neg_idx = hard_negative_mining(temp_loss, class_truth, neg_pos_ratio)
        # Classification loss over positives plus mined negatives
        cross_entropy = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='sum')
        loss_class = cross_entropy(
            class_truth[tf.math.logical_or(pos_idx, neg_idx)],
            class_pred[tf.math.logical_or(pos_idx, neg_idx)])
        # Localization loss (smooth L1) over positives only
        smooth_l1_loss = tf.keras.losses.Huber(reduction='sum')
        loss_loc = smooth_l1_loss(loc_truth[pos_idx], loc_pred[pos_idx])
        num_pos = tf.math.reduce_sum(tf.cast(pos_idx, tf.float32))
        loss_class = loss_class / num_pos
        loss_loc = loss_loc / num_pos
        return loss_loc, loss_class
    return multi_loss
# Training
boxes = default_box()
train_dataset = load_dataset(boxes, train=True)
model = SsdModel()
model.summary()
tf.keras.utils.plot_model(
    model,
    to_file=os.path.join(os.getcwd(), 'model.png'),
    show_shapes=True,
    show_layer_names=True
)
steps_per_epoch = DATASET_LEN // BATCH_SIZE
learning_rate = MultiStepWarmUpLR(
    initial_learning_rate=1e-2,
    lr_steps=[e * steps_per_epoch for e in [50, 70]],
    lr_rate=0.1,
    warmup_steps=5 * steps_per_epoch,
    min_lr=1e-4
)
optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate, momentum=0.9, nesterov=True)
multi_loss = MultiBoxLoss(len(IMAGE_LABELS), neg_pos_ratio=3)
@tf.function
def train_step(inputs, labels):
    with tf.GradientTape() as tape:
        predictions = model(inputs, training=True)
        losses = {}
        losses['reg'] = tf.reduce_sum(model.losses)  # regularization loss (zero here; kept in case the network is redefined with regularizers)
        losses['loc'], losses['class'] = multi_loss(labels, predictions)
        total_loss = tf.add_n([l for l in losses.values()])
    grads = tape.gradient(total_loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    return total_loss, losses
EPOCHS = 1
for epoch in range(0, EPOCHS):
    for step, (inputs, labels) in enumerate(train_dataset.take(steps_per_epoch)):
        load_t0 = time.time()
        total_loss, losses = train_step(inputs, labels)
        load_t1 = time.time()
        batch_time = load_t1 - load_t0
        print(f"\rEpoch: {epoch + 1}/{EPOCHS} | Batch {step + 1}/{steps_per_epoch} | Batch time {batch_time:.3f} || Loss: {total_loss:.6f} | loc loss: {losses['loc']:.6f} | class loss: {losses['class']:.6f}", end='', flush=True)
    filepath = os.path.join(CHECKPOINT_PATH, f'weights_epoch_{(epoch + 1):03d}.h5')
    model.save_weights(filepath)
# Implementing NMS
def compute_nms(boxes, scores, nms_threshold=0.4, limit=200):
    if boxes.shape[0] == 0:
        return tf.constant([], dtype=tf.int32)
    selected = [0]
    idx = tf.argsort(scores, direction='DESCENDING')
    idx = idx[:limit]
    boxes = tf.gather(boxes, idx)
    iou = _jaccard(boxes, boxes)
    while True:
        row = iou[selected[-1]]
        next_indices = row <= nms_threshold
        # Mask out boxes that overlap the last selected box too strongly
        iou = tf.where(
            tf.expand_dims(tf.math.logical_not(next_indices), 0),
            tf.ones_like(iou, dtype=tf.float32),
            iou
        )
        if not tf.math.reduce_any(next_indices):
            break
        # The next selected box is the highest-scored survivor
        selected.append(tf.argsort(
            tf.dtypes.cast(next_indices, tf.int32), direction='DESCENDING')[0].numpy())
    return tf.gather(idx, selected)
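# A toy run of the NMS above with three hand-made boxes (illustrative values):
# the first two boxes overlap heavily, so only the higher-scored one survives.
demo_boxes = tf.constant([[0.10, 0.10, 0.40, 0.40],
                          [0.12, 0.12, 0.42, 0.42],
                          [0.60, 0.60, 0.90, 0.90]])
demo_scores = tf.constant([0.9, 0.8, 0.7])
print(compute_nms(demo_boxes, demo_scores).numpy())  # expected: [0 2]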
# Decode the model's predictions and, based on the predicted probabilities, run NMS to produce the final boxes and scores
def decode_bbox_tf(predicts, boxes, variances=[0.1, 0.2]):
    centers = boxes[:, :2] + predicts[:, :2] * variances[0] * boxes[:, 2:]
    sides = boxes[:, 2:] * tf.math.exp(predicts[:, 2:] * variances[1])
    return tf.concat([centers - sides / 2, centers + sides / 2], axis=1)
def parse_predict(predictions, boxes):
    label_classes = IMAGE_LABELS
    bbox_predictions, confidences = tf.split(predictions[0], [4, -1], axis=-1)
    boxes = decode_bbox_tf(bbox_predictions, boxes)
    scores = tf.math.softmax(confidences, axis=-1)
    out_boxes = []
    out_labels = []
    out_scores = []
    for c in range(1, len(label_classes)):
        cls_scores = scores[:, c]
        score_idx = cls_scores > 0.5
        cls_boxes = boxes[score_idx]
        cls_scores = cls_scores[score_idx]
        nms_idx = compute_nms(cls_boxes, cls_scores)
        cls_boxes = tf.gather(cls_boxes, nms_idx)
        cls_scores = tf.gather(cls_scores, nms_idx)
        cls_labels = [c] * cls_boxes.shape[0]
        out_boxes.append(cls_boxes)
        out_labels.extend(cls_labels)
        out_scores.append(cls_scores)
    out_boxes = tf.concat(out_boxes, axis=0)
    out_scores = tf.concat(out_scores, axis=0)
    boxes = tf.clip_by_value(out_boxes, 0.0, 1.0).numpy()
    classes = np.array(out_labels)
    scores = out_scores.numpy()
    return boxes, classes, scores
# Functions to add and undo image padding
def pad_input_image(img, max_steps):
    img_h, img_w, _ = img.shape
    img_pad_h = 0
    if img_h % max_steps > 0:
        img_pad_h = max_steps - img_h % max_steps
    img_pad_w = 0
    if img_w % max_steps > 0:
        img_pad_w = max_steps - img_w % max_steps
    padd_val = np.mean(img, axis=(0, 1)).astype(np.uint8)
    img = cv2.copyMakeBorder(img, 0, img_pad_h, 0, img_pad_w,
                             cv2.BORDER_CONSTANT, value=padd_val.tolist())
    pad_params = (img_h, img_w, img_pad_h, img_pad_w)
    return img, pad_params

def recover_pad(boxes, pad_params):
    img_h, img_w, img_pad_h, img_pad_w = pad_params
    # Rescale normalized coordinates from the padded image back to the original image
    recovered = np.reshape(boxes, [-1, 2, 2]) * [(img_pad_w + img_w) / img_w, (img_pad_h + img_h) / img_h]
    return np.reshape(recovered, [-1, 4])
# Draw a detection box on the image
def draw_box_on_face(img, boxes, classes, scores, box_index, class_list):
    img_height = img.shape[0]
    img_width = img.shape[1]
    x_min = int(boxes[box_index][0] * img_width)
    y_min = int(boxes[box_index][1] * img_height)
    x_max = int(boxes[box_index][2] * img_width)
    y_max = int(boxes[box_index][3] * img_height)
    if classes[box_index] == 1:
        color = (0, 255, 0)
    else:
        color = (0, 0, 255)
    cv2.rectangle(img, (x_min, y_min), (x_max, y_max), color, 2)
    if len(scores) > box_index:
        score = "{:.4f}".format(scores[box_index])
        class_name = class_list[classes[box_index]]
        label = '{} {}'.format(class_name, score)
        position = (x_min, y_min - 4)
        cv2.putText(img, label, position, cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255))
# Results on a test image stored in the cloud
filepath = os.path.join(PROJECT_PATH, 'checkpoints', 'weights_epoch_008.h5')
model.load_weights(filepath)
TEST_IMAGE_PATH = os.path.join(PROJECT_PATH, 'image_people.png')
img_raw = cv2.imread(TEST_IMAGE_PATH)
img_raw = cv2.resize(img_raw, (IMAGE_WIDTH, IMAGE_HEIGHT))
img = np.float32(img_raw.copy())
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img, pad_params = pad_input_image(img, max_steps=max(BOX_STEPS))
img = img / 255.0
boxes = default_box()
boxes = tf.cast(boxes, tf.float32)
predictions = model.predict(img[np.newaxis, ...])
pred_boxes, labels, scores = parse_predict(predictions, boxes)
pred_boxes = recover_pad(pred_boxes, pad_params)
for box_index in range(len(pred_boxes)):
    draw_box_on_face(img_raw, pred_boxes, labels, scores, box_index, IMAGE_LABELS)
plt.imshow(cv2.cvtColor(img_raw, cv2.COLOR_BGR2RGB))
plt.show()