FCOS Core Code Reading Notes

Contents
  1. fcos_core/modeling/rpn/fcos/fcos.py
  2. fcos_core/modeling/rpn/fcos/loss.py
  3. fcos_core/modeling/rpn/fcos/inference.py
  4. References

fcos_core/modeling/rpn/fcos/fcos.py

This file mainly contains the FCOS network structure; the detector is trained with three losses: loss_cls, loss_reg, and loss_centerness.

[Figure: FCOS architecture]

One of the key functions is compute_locations:

def compute_locations(self, features):
    locations = []
    for level, feature in enumerate(features):
        h, w = feature.size()[-2:]
        locations_per_level = self.compute_locations_per_level(
            h, w, self.fpn_strides[level],
            feature.device
        )
        locations.append(locations_per_level)
    return locations

def compute_locations_per_level(self, h, w, stride, device):
    shifts_x = torch.arange(
        0, w * stride, step=stride,
        dtype=torch.float32, device=device
    )
    shifts_y = torch.arange(
        0, h * stride, step=stride,
        dtype=torch.float32, device=device
    )
    shift_y, shift_x = torch.meshgrid(shifts_y, shifts_x)
    shift_x = shift_x.reshape(-1)
    shift_y = shift_y.reshape(-1)
    locations = torch.stack((shift_x, shift_y), dim=1) + stride // 2
    return locations

For the five FPN feature maps P3, P4, P5, P6, P7, this code maps each point on a feature map back to its position in the original image, i.e. it builds a 2D grid (meshgrid) of image-plane coordinates.

Adding stride // 2 in the last line of compute_locations_per_level compensates for the rounding error introduced by downsampling, so that each mapped point lies as close as possible to the center of the receptive field of location (x, y).

The resulting locations is a list with five entries, one per FPN level, each holding the original-image coordinates of every point on that level's feature map.
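
As a quick sanity check, the standalone sketch below reproduces the same logic for a hypothetical 2x3 feature map with stride 8; the resulting coordinates are the centers of the 8x8 cells on the input image.

import torch

def locations_per_level(h, w, stride, device="cpu"):
    # same logic as compute_locations_per_level above
    shifts_x = torch.arange(0, w * stride, step=stride, dtype=torch.float32, device=device)
    shifts_y = torch.arange(0, h * stride, step=stride, dtype=torch.float32, device=device)
    shift_y, shift_x = torch.meshgrid(shifts_y, shifts_x)
    return torch.stack((shift_x.reshape(-1), shift_y.reshape(-1)), dim=1) + stride // 2

print(locations_per_level(2, 3, 8))
# tensor([[ 4.,  4.], [12.,  4.], [20.,  4.],
#         [ 4., 12.], [12., 12.], [20., 12.]])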

fcos_core/modeling/rpn/fcos/loss.py

The locations computed above are passed to prepare_targets as its first argument.

def prepare_targets(self, points, targets):
    object_sizes_of_interest = [
        [-1, 64],
        [64, 128],
        [128, 256],
        [256, 512],
        [512, INF],
    ]
    expanded_object_sizes_of_interest = []
    for l, points_per_level in enumerate(points):
        # a.new_tensor(...) has the same dtype and device as a
        object_sizes_of_interest_per_level = \
            points_per_level.new_tensor(object_sizes_of_interest[l])
        expanded_object_sizes_of_interest.append(
            object_sizes_of_interest_per_level[None].expand(len(points_per_level), -1)
        )

    expanded_object_sizes_of_interest = torch.cat(expanded_object_sizes_of_interest, dim=0)
    num_points_per_level = [len(points_per_level) for points_per_level in points]
    self.num_points_per_level = num_points_per_level
    points_all_level = torch.cat(points, dim=0)
    # shapes: (P, N) and (P, N, 4), where P is the number of images
    labels, reg_targets = self.compute_targets_for_locations(
        points_all_level, targets, expanded_object_sizes_of_interest
    )

    # after the split, labels[i] is a tuple of per-level tensors
    for i in range(len(labels)):
        labels[i] = torch.split(labels[i], num_points_per_level, dim=0)
        reg_targets[i] = torch.split(reg_targets[i], num_points_per_level, dim=0)

    labels_level_first = []
    reg_targets_level_first = []
    # labels_level_first[level] has P*N_level elements
    # reg_targets_level_first[level] has shape (P*N_level, 4)
    for level in range(len(points)):
        labels_level_first.append(
            torch.cat([labels_per_im[level] for labels_per_im in labels], dim=0)
        )

        reg_targets_per_level = torch.cat([
            reg_targets_per_im[level]
            for reg_targets_per_im in reg_targets
        ], dim=0)

        if self.norm_reg_targets:
            reg_targets_per_level = reg_targets_per_level / self.fpn_strides[level]
        reg_targets_level_first.append(reg_targets_per_level)

    return labels_level_first, reg_targets_level_first

First, an expanded_object_sizes_of_interest variable is built: every sampled location needs its own sizes_of_interest range. For each level, that level's sizes_of_interest is expanded to all of the level's locations, and the per-level tensors are then merged with torch.cat into shape (N, 2), where N is the total number of locations across all levels.

num_points_per_level is the number of locations on each level; it is needed for the later per-level split.

points_all_level collects all sampled locations; as with expanded_object_sizes_of_interest, the per-level tensors are merged with torch.cat into shape (N, 2).
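
The expand-and-concatenate pattern can be illustrated in isolation; the sketch below uses made-up point counts for two levels.

import torch

points = [torch.rand(100, 2), torch.rand(25, 2)]           # two levels of locations
sizes = [[-1, 64], [64, 128]]

expanded = [
    p.new_tensor(s)[None].expand(len(p), -1)                # (N_level, 2)
    for p, s in zip(points, sizes)
]
expanded = torch.cat(expanded, dim=0)                       # (125, 2)
points_all_level = torch.cat(points, dim=0)                 # (125, 2)
num_points_per_level = [len(p) for p in points]             # [100, 25]
print(expanded.shape, points_all_level.shape, num_points_per_level)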

compute_targets_for_locations uses points_all_level, targets and expanded_object_sizes_of_interest to compute the classification and regression targets, labels and reg_targets, of shapes (P, N) and (P, N, 4) respectively, where P is the number of images (concretely, lists of length P holding per-image tensors).

The labels and reg_targets returned by compute_targets_for_locations have all levels concatenated together. They are now split back according to num_points_per_level and regrouped into level-first lists. In the end, labels_level_first[level] has P*N_level elements and reg_targets_level_first[level] has shape (P*N_level, 4), where N_level is the number of locations on that level, as illustrated by the sketch below.
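
A minimal sketch of the split-and-regroup step, assuming toy shapes (two images, two levels):

import torch

num_points_per_level = [100, 25]                 # N_level for two levels
# per-image labels over all N = 125 locations, for P = 2 images
labels = [torch.zeros(125), torch.ones(125)]

# image-first list of tensors -> image-first list of (per-level) tuples
labels = [torch.split(l, num_points_per_level, dim=0) for l in labels]

# regroup level-first: each entry stacks that level's targets over all images
labels_level_first = [
    torch.cat([labels_per_im[level] for labels_per_im in labels], dim=0)
    for level in range(len(num_points_per_level))
]
print([t.shape for t in labels_level_first])     # [torch.Size([200]), torch.Size([50])]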

def compute_targets_for_locations(self, locations, targets, object_sizes_of_interest):
    labels = []
    reg_targets = []
    xs, ys = locations[:, 0], locations[:, 1]

    for im_i in range(len(targets)):
        targets_per_im = targets[im_i]
        assert targets_per_im.mode == "xyxy"
        bboxes = targets_per_im.bbox
        labels_per_im = targets_per_im.get_field("labels")
        area = targets_per_im.area()

        l = xs[:, None] - bboxes[:, 0][None]
        t = ys[:, None] - bboxes[:, 1][None]
        r = bboxes[:, 2][None] - xs[:, None]
        b = bboxes[:, 3][None] - ys[:, None]
        # shape: (N, M, 4)
        reg_targets_per_im = torch.stack([l, t, r, b], dim=2)

        if self.center_sampling_radius > 0:
            is_in_boxes = self.get_sample_region(
                bboxes,
                self.fpn_strides,
                self.num_points_per_level,
                xs, ys,
                radius=self.center_sampling_radius
            )
        else:
            # no center sampling: use all locations within a ground-truth box
            is_in_boxes = reg_targets_per_im.min(dim=2)[0] > 0

        max_reg_targets_per_im = reg_targets_per_im.max(dim=2)[0]
        # limit the regression range for each location
        is_cared_in_the_level = \
            (max_reg_targets_per_im >= object_sizes_of_interest[:, [0]]) & \
            (max_reg_targets_per_im <= object_sizes_of_interest[:, [1]])

        locations_to_gt_area = area[None].repeat(len(locations), 1)
        locations_to_gt_area[is_in_boxes == 0] = INF
        locations_to_gt_area[is_cared_in_the_level == 0] = INF

        # if there is still more than one object for a location,
        # choose the one with minimal area

        # shape: (N,)
        locations_to_min_area, locations_to_gt_inds = locations_to_gt_area.min(dim=1)

        reg_targets_per_im = reg_targets_per_im[range(len(locations)), locations_to_gt_inds]
        labels_per_im = labels_per_im[locations_to_gt_inds]
        labels_per_im[locations_to_min_area == INF] = 0

        labels.append(labels_per_im)
        reg_targets.append(reg_targets_per_im)

    return labels, reg_targets

Since each image may contain a different number of boxes, the labels and bbox fields of targets are first read per image, building labels_per_im and reg_targets_per_im (shape (N, M, 4), where N is the total number of locations and M is the number of ground-truth boxes).

Following Section 3.2 of the paper, ground-truth boxes of different scales are assigned to different FPN levels; a box outside a level's size range is ignored at that level. If, after this filtering, a location still falls inside multiple boxes, the box with the smallest area is chosen. In this implementation, the area of every box that is invalid for a location is set to infinity: locations_to_gt_area maps each location to the area of each ground-truth box, and the subsequent locations_to_gt_area.min(dim=1) picks, for each location, the smallest-area box for reg_targets_per_im. Each location likewise gets one class label, and locations that fall inside no ground-truth box are labelled as background (0). The final labels_per_im has shape (N,) and reg_targets_per_im has shape (N, 4).
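
A toy assignment with two boxes and three locations (made-up numbers) shows how the INF masking plus min(dim=1) resolves the ambiguity:

import torch

INF = 100000000
# areas of M = 2 ground-truth boxes, repeated for N = 3 locations
locations_to_gt_area = torch.tensor([
    [900., 400.],   # location 0: valid for both boxes -> smaller box (index 1) wins
    [900., 400.],   # location 1: only valid for box 0 at this level
    [900., 400.],   # location 2: inside no box -> background
])
valid = torch.tensor([[True, True], [True, False], [False, False]])
locations_to_gt_area[~valid] = INF

min_area, gt_inds = locations_to_gt_area.min(dim=1)
labels = torch.tensor([3, 7])[gt_inds]      # pretend class labels of the two boxes
labels[min_area == INF] = 0                 # background
print(gt_inds.tolist(), labels.tolist())    # [1, 0, 0] [7, 3, 0]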

def __call__(self, locations, box_cls, box_regression, centerness, targets):
    """
    Arguments:
        locations (list[Tensor])
        box_cls (list[Tensor])
        box_regression (list[Tensor])
        centerness (list[Tensor])
        targets (list[BoxList])

    Returns:
        cls_loss (Tensor)
        reg_loss (Tensor)
        centerness_loss (Tensor)
    """
    N = box_cls[0].size(0)
    num_classes = box_cls[0].size(1)
    labels, reg_targets = self.prepare_targets(locations, targets)

    box_cls_flatten = []
    box_regression_flatten = []
    centerness_flatten = []
    labels_flatten = []
    reg_targets_flatten = []
    for l in range(len(labels)):
        box_cls_flatten.append(box_cls[l].permute(0, 2, 3, 1).reshape(-1, num_classes))
        box_regression_flatten.append(box_regression[l].permute(0, 2, 3, 1).reshape(-1, 4))
        labels_flatten.append(labels[l].reshape(-1))
        reg_targets_flatten.append(reg_targets[l].reshape(-1, 4))
        centerness_flatten.append(centerness[l].reshape(-1))

    box_cls_flatten = torch.cat(box_cls_flatten, dim=0)
    box_regression_flatten = torch.cat(box_regression_flatten, dim=0)
    centerness_flatten = torch.cat(centerness_flatten, dim=0)
    labels_flatten = torch.cat(labels_flatten, dim=0)
    reg_targets_flatten = torch.cat(reg_targets_flatten, dim=0)

    pos_inds = torch.nonzero(labels_flatten > 0).squeeze(1)

    box_regression_flatten = box_regression_flatten[pos_inds]
    reg_targets_flatten = reg_targets_flatten[pos_inds]
    centerness_flatten = centerness_flatten[pos_inds]

    num_gpus = get_num_gpus()
    # sync num_pos from all gpus
    total_num_pos = reduce_sum(pos_inds.new_tensor([pos_inds.numel()])).item()
    num_pos_avg_per_gpu = max(total_num_pos / float(num_gpus), 1.0)

    cls_loss = self.cls_loss_func(
        box_cls_flatten,
        labels_flatten.int()
    ) / num_pos_avg_per_gpu

    if pos_inds.numel() > 0:
        centerness_targets = self.compute_centerness_targets(reg_targets_flatten)

        # average sum_centerness_targets from all gpus,
        # which is used to normalize centerness-weighed reg loss
        sum_centerness_targets_avg_per_gpu = \
            reduce_sum(centerness_targets.sum()).item() / float(num_gpus)

        reg_loss = self.box_reg_loss_func(
            box_regression_flatten,
            reg_targets_flatten,
            centerness_targets
        ) / sum_centerness_targets_avg_per_gpu
        centerness_loss = self.centerness_loss_func(
            centerness_flatten,
            centerness_targets
        ) / num_pos_avg_per_gpu
    else:
        reg_loss = box_regression_flatten.sum()
        reduce_sum(centerness_flatten.new_tensor([0.0]))
        centerness_loss = centerness_flatten.sum()

    return cls_loss, reg_loss, centerness_loss
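
__call__ flattens the per-level predictions and targets, keeps only positive locations (label > 0) for the regression and centerness losses, and normalizes by the number of positives averaged across GPUs. The centerness target produced by compute_centerness_targets is not shown in this excerpt; a minimal sketch following Eq. (3) of the paper (not copied from the repository) is:

import torch

def centerness_targets(reg_targets):
    # reg_targets: (num_pos, 4) distances (l, t, r, b) from a location to the box sides
    left_right = reg_targets[:, [0, 2]]
    top_bottom = reg_targets[:, [1, 3]]
    # Eq. (3): sqrt( min(l,r)/max(l,r) * min(t,b)/max(t,b) )
    centerness = (left_right.min(dim=-1)[0] / left_right.max(dim=-1)[0]) * \
                 (top_bottom.min(dim=-1)[0] / top_bottom.max(dim=-1)[0])
    return torch.sqrt(centerness)

print(centerness_targets(torch.tensor([[10., 10., 10., 10.],    # at the box center -> 1.0
                                       [ 2., 10., 18., 10.]])))  # off-center -> ~0.33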

fcos_core/modeling/rpn/fcos/inference.py

def forward_for_single_feature_map(
        self, locations, box_cls,
        box_regression, centerness,
        image_sizes):
    """
    Arguments:
        anchors: list[BoxList]
        box_cls: tensor of size N, A * C, H, W
        box_regression: tensor of size N, A * 4, H, W
    """
    N, C, H, W = box_cls.shape

    # put in the same format as locations
    box_cls = box_cls.view(N, C, H, W).permute(0, 2, 3, 1)
    box_cls = box_cls.reshape(N, -1, C).sigmoid()
    box_regression = box_regression.view(N, 4, H, W).permute(0, 2, 3, 1)
    box_regression = box_regression.reshape(N, -1, 4)
    centerness = centerness.view(N, 1, H, W).permute(0, 2, 3, 1)
    centerness = centerness.reshape(N, -1).sigmoid()

    candidate_inds = box_cls > self.pre_nms_thresh
    pre_nms_top_n = candidate_inds.view(N, -1).sum(1)
    pre_nms_top_n = pre_nms_top_n.clamp(max=self.pre_nms_top_n)

    # multiply the classification scores with centerness scores
    box_cls = box_cls * centerness[:, :, None]

    results = []
    for i in range(N):
        per_box_cls = box_cls[i]
        per_candidate_inds = candidate_inds[i]
        per_box_cls = per_box_cls[per_candidate_inds]

        per_candidate_nonzeros = per_candidate_inds.nonzero()
        per_box_loc = per_candidate_nonzeros[:, 0]
        per_class = per_candidate_nonzeros[:, 1] + 1

        per_box_regression = box_regression[i]
        per_box_regression = per_box_regression[per_box_loc]
        per_locations = locations[per_box_loc]

        per_pre_nms_top_n = pre_nms_top_n[i]

        if per_candidate_inds.sum().item() > per_pre_nms_top_n.item():
            per_box_cls, top_k_indices = \
                per_box_cls.topk(per_pre_nms_top_n, sorted=False)
            per_class = per_class[top_k_indices]
            per_box_regression = per_box_regression[top_k_indices]
            per_locations = per_locations[top_k_indices]

        detections = torch.stack([
            per_locations[:, 0] - per_box_regression[:, 0],
            per_locations[:, 1] - per_box_regression[:, 1],
            per_locations[:, 0] + per_box_regression[:, 2],
            per_locations[:, 1] + per_box_regression[:, 3],
        ], dim=1)

        h, w = image_sizes[i]
        boxlist = BoxList(detections, (int(w), int(h)), mode="xyxy")
        boxlist.add_field("labels", per_class)
        boxlist.add_field("scores", torch.sqrt(per_box_cls))
        boxlist = boxlist.clip_to_image(remove_empty=False)
        boxlist = remove_small_boxes(boxlist, self.min_size)
        results.append(boxlist)

    return results

This file handles the inference stage; forward_for_single_feature_map post-processes the predictions of a single FPN level.

It takes the network outputs box_cls, box_regression, centerness together with locations as input, thresholds the classification scores with pre_nms_thresh, and obtains candidate_inds, the indices of the points above the threshold.

pre_nms_top_n is clamped to at most self.pre_nms_top_n, i.e. at most that many points are kept per image. If the number of candidates (the sum of candidate_inds) exceeds pre_nms_top_n, only the top-k results of box_cls, class, box_regression and locations are kept. The regression outputs (l, t, r, b) are then decoded into xyxy boxes, and the results are packed into a BoxList; note that the classification score has been multiplied by centerness, and the stored score is the square root of that product (their geometric mean).
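
In isolation, the threshold-then-top-k pattern looks like this (toy scores and hypothetical config values):

import torch

pre_nms_thresh, pre_nms_top_n = 0.05, 2                       # hypothetical config values
box_cls = torch.tensor([[0.90, 0.01],                         # (H*W, C) scores for one image
                        [0.30, 0.02],
                        [0.10, 0.60]])

candidate_inds = box_cls > pre_nms_thresh                     # boolean mask, 4 candidates here
per_box_cls = box_cls[candidate_inds]                         # flattened candidate scores
nonzeros = candidate_inds.nonzero()
per_box_loc, per_class = nonzeros[:, 0], nonzeros[:, 1] + 1   # location index, 1-based class

keep_n = min(int(candidate_inds.sum()), pre_nms_top_n)
per_box_cls, topk = per_box_cls.topk(keep_n, sorted=False)
print(per_box_loc[topk].tolist(), per_class[topk].tolist(), per_box_cls.tolist())
# e.g. keeps locations [0, 2] with classes [1, 2] and scores [0.9, 0.6]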

def select_over_all_levels(self, boxlists):
    num_images = len(boxlists)
    results = []
    for i in range(num_images):
        # multiclass nms
        result = boxlist_ml_nms(boxlists[i], self.nms_thresh)
        number_of_detections = len(result)

        # Limit to max_per_image detections **over all classes**
        if number_of_detections > self.fpn_post_nms_top_n > 0:
            cls_scores = result.get_field("scores")
            image_thresh, _ = torch.kthvalue(
                cls_scores.cpu(),
                number_of_detections - self.fpn_post_nms_top_n + 1
            )
            keep = cls_scores >= image_thresh.item()
            keep = torch.nonzero(keep).squeeze(1)
            result = result[keep]
        results.append(result)
    return results

select_over_all_levels runs multi-class NMS (boxlist_ml_nms, backed by a C++/CUDA kernel) on the detections gathered from all levels, then keeps only the fpn_post_nms_top_n highest-scoring detections per image, using torch.kthvalue to find the score threshold.
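
The kthvalue trick can be checked on toy scores (made-up numbers): to keep the top n of m detections, the threshold is the (m - n + 1)-th smallest score.

import torch

scores = torch.tensor([0.9, 0.1, 0.5, 0.7, 0.3])
fpn_post_nms_top_n = 3
m = scores.numel()

# the (m - n + 1)-th smallest value is the n-th largest, i.e. the cutoff score
image_thresh, _ = torch.kthvalue(scores, m - fpn_post_nms_top_n + 1)
keep = torch.nonzero(scores >= image_thresh.item()).squeeze(1)
print(image_thresh.item(), keep.tolist())    # 0.5 [0, 2, 3]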

References

[Paper] [Code] Tian, Zhi, et al. “FCOS: Fully Convolutional One-Stage Object Detection.” arXiv preprint arXiv:1904.01355 (2019).