Here's the integrated and corrected code combining YOLOv8 pose estimation with DeepSORT tracking, ensuring keypoints are properly linked to tracked IDs:
import numpy as np
from collections import defaultdict
from ultralytics import YOLO
from deep_sort import DeepSort
# Initialize YOLOv8 Pose Estimation Model
pose_model = YOLO("yolov8n-pose.pt") # Official Ultralytics model
# Initialize DeepSORT Tracker
tracker = DeepSort(max_age=5, n_init=3)
# Tracked persons storage
tracked_persons = defaultdict(lambda: {"age": 0, "last_bbox": None, "last_keypoints": None})
def calculate_iou(bbox1, bbox2):
"""Calculate Intersection over Union between two bounding boxes."""
x1_1, y1_1, x2_1, y2_1 = bbox1
x1_2, y1_2, x2_2, y2_2 = bbox2
# Intersection area
xi1, yi1 = max(x1_1, x1_2), max(y1_1, y1_2)
xi2, yi2 = min(x2_1, x2_2), min(y2_1, y2_2)
inter_area = max(0, xi2 - xi1) * max(0, yi2 - yi1)
# Union area
area1 = (x2_1 - x1_1) * (y2_1 - y1_1)
area2 = (x2_2 - x1_2) * (y2_2 - y1_2)
union_area = area1 + area2 - inter_area
return inter_area / union_area if union_area != 0 else 0
def process_frame(frame):
# Step 1: YOLO Pose Detection
results = pose_model(frame, verbose=False, conf=0.7)[0]
detections = []
for detection in results:
detections.append({
"bbox": detection.boxes.xyxy[0].cpu().numpy().tolist(),
"keypoints": detection.keypoints.xy[0].cpu().numpy().tolist(),
"confidence": detection.boxes.conf[0].cpu().item()
})
# Step 2: Update Tracker
bboxes = np.array([det["bbox"] for det in detections], dtype=np.float32)
confidences = np.array([det["confidence"] for det in detections], dtype=np.float32)
tracklets = tracker.update(bboxes, confidences, frame)
# Step 3: Update Tracked Persons with Keypoints
current_ids = set()
for tracklet in tracklets:
track_id = tracklet.track_id
tracklet_bbox = tracklet.to_tlbr()
# Find matching detection with highest IoU
best_match = None
best_iou = 0.0
for det in detections:
iou = calculate_iou(tracklet_bbox, det["bbox"])
if iou > best_iou:
best_iou = iou
best_match = det
# Update tracked person data
tracked_persons[track_id]["age"] += 1
tracked_persons[track_id]["last_bbox"] = tracklet_bbox
tracked_persons[track_id]["last_keypoints"] = best_match["keypoints"] if best_match else None
current_ids.add(track_id)
# Age decay for non-detected persons
for track_id in list(tracked_persons.keys()):
if track_id not in current_ids:
tracked_persons[track_id]["age"] = max(0, tracked_persons[track_id]["age"] - 1)
# Step 4: Select Top 2 Candidates
active_candidates = [{"id": tid, **data} for tid, data in tracked_persons.items() if data["age"] > 0]
candidates = sorted(active_candidates, key=lambda x: -x["age"])[:2]
# Step 5: Assign CPR Roles (A = Leftmost, B = Rightmost)
if len(candidates) >= 2:
candidates.sort(key=lambda x: x["last_bbox"][0]) # Sort by leftmost x-coordinate
person_a, person_b = candidates[0], candidates[1]
# CPR-specific validation (optional)
if person_a["last_keypoints"] and person_b["last_keypoints"]:
# Example: Check if Person A has lower wrist positions (compressions)
a_right_wrist_y = person_a["last_keypoints"][10][1] # COCO keypoint index 10
b_right_wrist_y = person_b["last_keypoints"][10][1]
if b_right_wrist_y < a_right_wrist_y:
person_a, person_b = person_b, person_a # Swap roles
else:
# Fallback to last known positions
person_a, person_b = None, None
return person_a, person_b
Install requirements:
pip install ultralytics deep-sort-realtime
For real-time webcam use:
cap = cv2.VideoCapture(0)
while True:
ret, frame = cap.read()
person_a, person_b = process_frame(frame)
# Add visualization/CPR guidance logic
Keypoint indices follow COCO format (0=nose, 5-6=shoulders, 9-10=wrists, etc.)
This implementation maintains stable tracking of CPR participants while ensuring pose data integrity for medical guidance applications.