```python
import numpy as np

# BODY_PARTS_KPT_IDS (keypoint-type pairs per limb), BODY_PARTS_PAF_IDS (PAF channels per
# limb) and connections_nms (suppression of incompatible candidate connections) are
# module-level definitions used by this function.
def group_keypoints(all_keypoints_by_type, pafs, pose_entry_size=20, min_paf_score=0.05):
    pose_entries = []
    all_keypoints = np.array([item for sublist in all_keypoints_by_type for item in sublist])
    points_per_limb = 10
    grid = np.arange(points_per_limb, dtype=np.float32).reshape(1, -1, 1)
    all_keypoints_by_type = [np.array(keypoints, np.float32) for keypoints in all_keypoints_by_type]
    for part_id in range(len(BODY_PARTS_PAF_IDS)):
        part_pafs = pafs[:, :, BODY_PARTS_PAF_IDS[part_id]]
        kpts_a = all_keypoints_by_type[BODY_PARTS_KPT_IDS[part_id][0]]
        kpts_b = all_keypoints_by_type[BODY_PARTS_KPT_IDS[part_id][1]]
        n = len(kpts_a)
        m = len(kpts_b)
        if n == 0 or m == 0:
            continue

        # Get vectors between all pairs of keypoints, i.e. candidate limb vectors.
        a = kpts_a[:, :2]
        a = np.broadcast_to(a[None], (m, n, 2))
        b = kpts_b[:, :2]
        vec_raw = (b[:, None, :] - a).reshape(-1, 1, 2)

        # Sample points along every candidate limb vector.
        steps = (1 / (points_per_limb - 1) * vec_raw)
        points = steps * grid + a.reshape(-1, 1, 2)
        points = points.round().astype(dtype=np.int32)
        x = points[..., 0].ravel()
        y = points[..., 1].ravel()

        # Compute affinity score between candidate limb vectors and part affinity field.
        field = part_pafs[y, x].reshape(-1, points_per_limb, 2)
        vec_norm = np.linalg.norm(vec_raw, ord=2, axis=-1, keepdims=True)
        vec = vec_raw / (vec_norm + 1e-6)
        affinity_scores = (field * vec).sum(-1).reshape(-1, points_per_limb)
        valid_affinity_scores = affinity_scores > min_paf_score
        valid_num = valid_affinity_scores.sum(1)
        affinity_scores = (affinity_scores * valid_affinity_scores).sum(1) / (valid_num + 1e-6)
        success_ratio = valid_num / points_per_limb

        # Get a list of limbs according to the obtained affinity score.
        valid_limbs = np.where(np.logical_and(affinity_scores > 0, success_ratio > 0.8))[0]
        if len(valid_limbs) == 0:
            continue
        b_idx, a_idx = np.divmod(valid_limbs, n)
        affinity_scores = affinity_scores[valid_limbs]

        # Suppress incompatible connections.
        a_idx, b_idx, affinity_scores = connections_nms(a_idx, b_idx, affinity_scores)
        connections = list(zip(kpts_a[a_idx, 3].astype(np.int32),
                               kpts_b[b_idx, 3].astype(np.int32),
                               affinity_scores))
        if len(connections) == 0:
            continue

        if part_id == 0:
            # First limb type: every connection starts a new pose entry.
            pose_entries = [np.ones(pose_entry_size) * -1 for _ in range(len(connections))]
            for i in range(len(connections)):
                pose_entries[i][BODY_PARTS_KPT_IDS[0][0]] = connections[i][0]
                pose_entries[i][BODY_PARTS_KPT_IDS[0][1]] = connections[i][1]
                pose_entries[i][-1] = 2
                pose_entries[i][-2] = np.sum(all_keypoints[connections[i][0:2], 2]) + connections[i][2]
        elif part_id == 17 or part_id == 18:
            # The last two limb types only fill missing keypoints of existing poses
            # and never create new ones.
            kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0]
            kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1]
            for i in range(len(connections)):
                for j in range(len(pose_entries)):
                    if pose_entries[j][kpt_a_id] == connections[i][0] and pose_entries[j][kpt_b_id] == -1:
                        pose_entries[j][kpt_b_id] = connections[i][1]
                    elif pose_entries[j][kpt_b_id] == connections[i][1] and pose_entries[j][kpt_a_id] == -1:
                        pose_entries[j][kpt_a_id] = connections[i][0]
            continue
        else:
            # Remaining limb types: extend a pose that already contains keypoint A,
            # otherwise start a new pose entry from this connection.
            kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0]
            kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1]
            for i in range(len(connections)):
                num = 0
                for j in range(len(pose_entries)):
                    if pose_entries[j][kpt_a_id] == connections[i][0]:
                        pose_entries[j][kpt_b_id] = connections[i][1]
                        num += 1
                        pose_entries[j][-1] += 1
                        pose_entries[j][-2] += all_keypoints[connections[i][1], 2] + connections[i][2]
                if num == 0:
                    pose_entry = np.ones(pose_entry_size) * -1
                    pose_entry[kpt_a_id] = connections[i][0]
                    pose_entry[kpt_b_id] = connections[i][1]
                    pose_entry[-1] = 2
                    pose_entry[-2] = np.sum(all_keypoints[connections[i][0:2], 2]) + connections[i][2]
                    pose_entries.append(pose_entry)

    # Keep only poses with at least 3 keypoints and a reasonable average score.
    filtered_entries = []
    for i in range(len(pose_entries)):
        if pose_entries[i][-1] < 3 or (pose_entries[i][-2] / pose_entries[i][-1] < 0.2):
            continue
        filtered_entries.append(pose_entries[i])
    pose_entries = np.asarray(filtered_entries)
    return pose_entries, all_keypoints
```
The `group_keypoints` function is responsible for assembling detected keypoints into full human poses. It uses the detected keypoints, the Part Affinity Fields (PAFs), and a predefined table of body part connections to group keypoints into pose entries, each of which represents one person in the image.
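As a rough, non-authoritative sketch of how this function is typically driven, the snippet below builds a tiny synthetic input by hand and calls it. The per-keypoint layout (x, y, confidence, global id) follows the column indexing in the listing above, but the number of keypoint types, the PAF resolution, and the channel count are assumptions made only for this example; `group_keypoints` and the module-level tables it relies on (`BODY_PARTS_KPT_IDS`, `BODY_PARTS_PAF_IDS`, `connections_nms`) are assumed to already be in scope.

```python
import numpy as np

# Hypothetical example input: two keypoint types with one detection each.
# Each keypoint is (x, y, confidence, global_id); the global id is the row index
# of that keypoint in the flattened all_keypoints array.
all_keypoints_by_type = [[] for _ in range(18)]      # 18 COCO-style keypoint types (assumed)
all_keypoints_by_type[1] = [(64.0, 40.0, 0.9, 0)]    # e.g. a neck candidate, id 0
all_keypoints_by_type[2] = [(80.0, 60.0, 0.8, 1)]    # e.g. a shoulder candidate, id 1

# PAFs: one H x W x C array whose channels are selected via BODY_PARTS_PAF_IDS.
pafs = np.zeros((128, 128, 38), dtype=np.float32)    # 19 limbs x 2 channels, assumed

pose_entries, all_keypoints = group_keypoints(all_keypoints_by_type, pafs)
# With all-zero PAFs no connection passes the affinity check, so pose_entries is empty;
# the call mainly illustrates the expected input shapes.
print(pose_entries.shape, all_keypoints.shape)
```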
```python
def group_keypoints(all_keypoints_by_type, pafs, pose_entry_size=20, min_paf_score=0.05):
```

Parameters:

- `all_keypoints_by_type`: A list of detected keypoints grouped by type (e.g., nose, shoulders, elbows, etc.).
- `pafs`: The Part Affinity Fields (PAFs), which encode the direction and strength of connections between keypoints.
- `pose_entry_size`: The size of each pose entry (default is 20, which covers 18 keypoint slots plus the pose score and the keypoint count).
- `min_paf_score`: The minimum PAF score required to consider a connection valid.

Returns:

- `pose_entries`: A list of pose entries, where each entry represents a detected human pose.
- `all_keypoints`: A flattened array of all detected keypoints.
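To make the returned structures concrete, here is a minimal, hypothetical helper that reads one pose entry. The slot layout it assumes (one slot per keypoint type holding an index into `all_keypoints`, the accumulated score at index -2, and the keypoint count at index -1) is taken from how the function fills the entries; the helper itself is not part of the original code.

```python
import numpy as np

def describe_pose(pose_entry, all_keypoints):
    """Illustrative helper: print the keypoints referenced by one pose entry."""
    num_keypoints = int(pose_entry[-1])      # how many keypoints this pose collected
    pose_score = pose_entry[-2]              # accumulated keypoint + PAF confidence
    print(f"{num_keypoints} keypoints, score {pose_score:.2f}")
    for kpt_type, kpt_id in enumerate(pose_entry[:-2]):
        if kpt_id == -1:                     # -1 means this keypoint type was not found
            continue
        x, y, conf = all_keypoints[int(kpt_id), 0:3]
        print(f"  type {kpt_type}: ({x:.0f}, {y:.0f}), confidence {conf:.2f}")

# Hypothetical usage, assuming pose_entries and all_keypoints came from group_keypoints:
# for entry in pose_entries:
#     describe_pose(entry, all_keypoints)
```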
The function begins by initializing its working structures:

```python
pose_entries = []
all_keypoints = np.array([item for sublist in all_keypoints_by_type for item in sublist])
points_per_limb = 10
grid = np.arange(points_per_limb, dtype=np.float32).reshape(1, -1, 1)
all_keypoints_by_type = [np.array(keypoints, np.float32) for keypoints in all_keypoints_by_type]
```

- `pose_entries`: A list to store the final grouped poses.
- `all_keypoints`: A flattened array of all detected keypoints (one row per keypoint: x, y, score, global id).
- `points_per_limb`: The number of points sampled along each candidate limb.
- `grid`: A column of `points_per_limb` evenly spaced positions used to interpolate sample points along each candidate limb vector.
- `all_keypoints_by_type`: The per-type keypoint lists converted to NumPy arrays for efficient processing.
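Because the role of `grid` and `points_per_limb` is easier to see in isolation, the following self-contained sketch (with made-up coordinates) repeats the broadcasting arithmetic the function later uses to sample ten evenly spaced points between a candidate keypoint pair.

```python
import numpy as np

points_per_limb = 10
grid = np.arange(points_per_limb, dtype=np.float32).reshape(1, -1, 1)

# Two made-up keypoints: one candidate for each end of a limb.
a = np.array([[10.0, 20.0]], dtype=np.float32)   # keypoint A (x, y)
b = np.array([[40.0, 80.0]], dtype=np.float32)   # keypoint B (x, y)

# Same arithmetic as in group_keypoints: step vector times grid, offset by A.
vec_raw = (b - a).reshape(-1, 1, 2)              # shape (1, 1, 2)
steps = (1 / (points_per_limb - 1)) * vec_raw
points = steps * grid + a.reshape(-1, 1, 2)      # shape (1, 10, 2)

print(points[0, 0], points[0, -1])               # [10. 20.] ... [40. 80.]
print(points.shape)                              # (1, 10, 2): 10 samples along the limb
```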