Let's implement a minimal yet effective integration of real-time camera handling while maintaining your existing analysis structure. I'll provide clear code blocks showing exactly where to make changes.
1. Add the Threaded Camera Class (at the top of the file):
# ================== ADD THIS CLASS ==================
import threading
from queue import Queue
class ThreadedCamera:
def __init__(self, source, fps):
self.cap = cv2.VideoCapture(source)
self.fps = fps
self.q = Queue(maxsize=30) # Small buffer to prevent frame drops
self.running = True
# Set camera FPS if supported
self.cap.set(cv2.CAP_PROP_FPS, self.fps)
self.thread = threading.Thread(target=self._reader)
self.thread.daemon = True
self.thread.start()
def _reader(self):
while self.running:
ret, frame = self.cap.read()
if ret:
self.q.put(frame)
def read(self):
return self.q.get()
def release(self):
self.running = False
self.thread.join()
self.cap.release()
# ====================================================
2. Modify the CPRAnalyzer Initialization (replace existing video capture):
class CPRAnalyzer:
def __init__(self, camera_source, camera_fps): # CHANGED SIGNATURE
print(f"\\\\n[INIT] Initializing CPR Analyzer for camera (FPS: {camera_fps})")
# ========== REPLACE VIDEO CAPTURE ==========
self.cap = ThreadedCamera(camera_source, camera_fps)
self.fps = camera_fps # Use provided FPS
# ===========================================
# Keep existing initialization below
self.base = "camera_output" # Simplified path
self.output_video_path = os.path.abspath(f"{self.base}_processed.mp4")
self.video_writer = None
self._writer_initialized = False
# Keep original derived parameters (now using provided FPS)
self.sampling_interval_frames = int(round(self.fps * self.SAMPLING_INTERVAL))
self.error_threshold_frames = int(self.MIN_ERROR_DURATION / self.SAMPLING_INTERVAL)
self.reporting_interval_frames = int(self.REPORTING_INTERVAL / self.SAMPLING_INTERVAL)
# Rest of your existing initialization remains unchanged...
3. Update the Main Processing Loop (modify frame acquisition):
def run_analysis(self):
try:
print("\\\\n[RUN ANALYSIS] Starting analysis")
main_loop_start_time = time.time()
# Original variables
first_time_to_have_a_proccessed_frame = True
waiting_to_start_new_chunk = False
chunk_start_frame_index = 0
frame_counter = 0 # Track processed frames
while True:
# ========== MODIFIED FRAME ACQUISITION ==========
# Get frame from camera queue
frame = self.cap.read()
frame_counter += 1
# ===============================================
# Original processing continues unchanged
print(f"\\\\n[FRAME {frame_counter}]")
# Skip frame logic (now uses camera-based counter)
if frame_counter % self.sampling_interval_frames != 0:
formatted_warnings = self._format_warnings()
print(f"[SKIP FRAME] Skipping frame {frame_counter}")
continue
# Original processing pipeline
frame = self._handle_frame_rotation(frame)
is_complete_chunk, accept_frame, posture_warnings = self._process_frame(frame)
processed_frame = self._compose_frame(frame, accept_frame)
# Original chunk handling logic remains unchanged
# ...
# Video writing (modified initialization)
if not self._writer_initialized:
self._initialize_video_writer(processed_frame)
if self._writer_initialized:
self.video_writer.write(processed_frame)
# Original exit check
if self._check_exit():
break
# Original cleanup remains
4. Update Video Writer Initialization:
def _initialize_video_writer(self, frame):
"""Initialize with camera FPS"""
height, width = frame.shape[:2]
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
self.video_writer = cv2.VideoWriter(
self.output_video_path,
fourcc,
self.fps, # Use camera FPS
(width, height)
self._writer_initialized = True
print(f"[VIDEO WRITER] Initialized at {self.fps} FPS")
Key Integration Points:
Usage Example:
if __name__ == "__main__":
# For webcam with known FPS (e.g., 30 FPS)
analyzer = CPRAnalyzer(camera_source=0, camera_fps=30)
analyzer.run_analysis()
This implementation: