Here's the integrated code with enhanced rescuer/patient differentiation, visualization, and a main function:
```python
import cv2
import numpy as np
from collections import defaultdict
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort

# Initialize YOLOv8 Pose Estimation Model
pose_model = YOLO("yolov8n-pose.pt")  # Official Ultralytics model

# Initialize DeepSORT Tracker (from the deep-sort-realtime package)
tracker = DeepSort(max_age=5, n_init=3)

# Tracked persons storage
tracked_persons = defaultdict(lambda: {"age": 0, "last_bbox": None, "last_keypoints": None})
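# Each tracked_persons entry is keyed by DeepSORT track id and holds:
#   age            -- persistence score (incremented when seen, decayed when not)
#   last_bbox      -- most recent [x1, y1, x2, y2] box from the tracker
#   last_keypoints -- most recent 17 COCO keypoints matched to that track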

def calculate_iou(bbox1, bbox2):
    """Calculate Intersection over Union between two bounding boxes."""
    x1_1, y1_1, x2_1, y2_1 = bbox1
    x1_2, y1_2, x2_2, y2_2 = bbox2

    # Intersection area
    xi1, yi1 = max(x1_1, x1_2), max(y1_1, y1_2)
    xi2, yi2 = min(x2_1, x2_2), min(y2_1, y2_2)
    inter_area = max(0, xi2 - xi1) * max(0, yi2 - yi1)

    # Union area
    area1 = (x2_1 - x1_1) * (y2_1 - y1_1)
    area2 = (x2_2 - x1_2) * (y2_2 - y1_2)
    union_area = area1 + area2 - inter_area

    return inter_area / union_area if union_area != 0 else 0
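# Quick sanity check (comment-only example, not part of the pipeline):
# two unit squares offset by half a width overlap in a 0.5 x 1 strip,
# so IoU = 0.5 / (1 + 1 - 0.5) = 1/3:
#   calculate_iou((0, 0, 1, 1), (0.5, 0, 1.5, 1))  # -> 0.333...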

def is_rescuer(candidate, frame_height):
    """Identify rescuer using posture analysis and keypoint positions."""
    # Explicit None checks: last_bbox may be a NumPy array, whose truth value is ambiguous
    if candidate["last_bbox"] is None or candidate["last_keypoints"] is None:
        return False

    bbox = candidate["last_bbox"]
    keypoints = candidate["last_keypoints"]

    # Bounding box analysis
    x1, y1, x2, y2 = bbox
    width = x2 - x1
    height = y2 - y1
    if height <= 0:  # guard against degenerate boxes
        return False
    aspect_ratio = width / height

    # Keypoint analysis
    nose_y = keypoints[0][1]  # COCO nose index = 0

    # Rescuer criteria (vertical posture + high head position)
    return aspect_ratio < 0.65 and nose_y < frame_height * 0.3
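# Worked example of the heuristic (illustrative numbers, not real data):
# an upright rescuer with a 120x300 px box has aspect_ratio = 0.4 (< 0.65),
# while a supine patient with a 320x110 px box has aspect_ratio ~= 2.9, so
# only the vertical figure with its nose in the top 30% of the frame passes.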

def process_frame(frame):
    frame_height = frame.shape[0]

    # Step 1: YOLO Pose Detection
    results = pose_model(frame, verbose=False, conf=0.7)[0]
    detections = []
    for detection in results:
        detections.append({
            "bbox": detection.boxes.xyxy[0].cpu().numpy().tolist(),
            "keypoints": detection.keypoints.xy[0].cpu().numpy().tolist(),
            "confidence": detection.boxes.conf[0].cpu().item()
        })

    # Step 2: Update Tracker
    # deep-sort-realtime expects ([left, top, width, height], confidence, class) tuples
    raw_detections = [
        ([det["bbox"][0], det["bbox"][1],
          det["bbox"][2] - det["bbox"][0], det["bbox"][3] - det["bbox"][1]],
         det["confidence"], 0)
        for det in detections
    ]
    tracklets = tracker.update_tracks(raw_detections, frame=frame)
    # Step 3: Update Tracked Persons with Keypoints
    current_ids = set()
    for tracklet in tracklets:
        if not tracklet.is_confirmed():  # skip tentative tracks
            continue
        track_id = tracklet.track_id
        tracklet_bbox = tracklet.to_ltrb()  # [x1, y1, x2, y2]

        # Find matching detection with highest IoU
        best_match = None
        best_iou = 0.0
        for det in detections:
            iou = calculate_iou(tracklet_bbox, det["bbox"])
            if iou > best_iou:
                best_iou = iou
                best_match = det

        # Update tracked person data
        tracked_persons[track_id]["age"] += 1
        tracked_persons[track_id]["last_bbox"] = tracklet_bbox
        tracked_persons[track_id]["last_keypoints"] = best_match["keypoints"] if best_match else None
        current_ids.add(track_id)

    # Age decay for non-detected persons
    for track_id in list(tracked_persons.keys()):
        if track_id not in current_ids:
            tracked_persons[track_id]["age"] = max(0, tracked_persons[track_id]["age"] - 1)
    # Step 4: Find Rescuer and Patient
    rescuer = None
    patient = None
    candidates = [{"id": tid, **data} for tid, data in tracked_persons.items() if data["age"] > 2]
    for candidate in candidates:
        if is_rescuer(candidate, frame_height):
            if not rescuer or candidate["age"] > rescuer["age"]:
                rescuer = candidate
        else:
            if not patient or candidate["age"] > patient["age"]:
                patient = candidate

    return rescuer, patient

def draw_annotations(frame, rescuer, patient):
    # Draw rescuer (A) with green box
    if rescuer and rescuer["last_bbox"] is not None:
        x1, y1, x2, y2 = map(int, rescuer["last_bbox"])
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(frame, "Rescuer A", (x1, y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

    # Draw patient (B) with red box
    if patient and patient["last_bbox"] is not None:
        x1, y1, x2, y2 = map(int, patient["last_bbox"])
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
        cv2.putText(frame, "Patient B", (x1, y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

    return frame

def main():
    cap = cv2.VideoCapture(0)  # Use 0 for webcam or a file path for video

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Process frame
        rescuer, patient = process_frame(frame)

        # Add visualizations
        frame = draw_annotations(frame, rescuer, patient)

        # Display results
        cv2.imshow('CPR Monitoring', frame)

        # Exit on 'q'
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()


if __name__ == "__main__":
    main()
```
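If you also want to keep the annotated output for later review rather than only displaying it, OpenCV's `VideoWriter` can be dropped into `main()`. A minimal sketch, assuming a 640x480 source at 30 FPS (the output filename and codec are placeholder choices):

```python
import cv2

# Placeholder settings: match the FPS and frame size to your actual capture source.
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
writer = cv2.VideoWriter("cpr_annotated.mp4", fourcc, 30.0, (640, 480))

# In main(), after draw_annotations(...):  writer.write(frame)
# After the loop, alongside cap.release(): writer.release()
```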
Key improvements in this integration:

- Rescuer/patient differentiation via posture analysis (bounding-box aspect ratio plus nose height)
- Persistence filtering: only tracks with `age > 2` are considered, which suppresses flickering false detections
- Visualization: green "Rescuer A" and red "Patient B" boxes drawn on every frame
- A `main()` loop that runs the pipeline live on a webcam or video file

To run:

```bash
pip install ultralytics deep-sort-realtime opencv-python
python cpr_monitor.py
```

The system will detect people with the YOLOv8 pose model, track them across frames with DeepSORT, label the persistent upright figure as the rescuer and the other persistent track as the patient, and display the annotated feed until you press `q`.