"""
Zone-based Entry/Exit Tracker

Tracks people entering and exiting based on zone detection with a cooldown
mechanism.
"""

import time
from collections import defaultdict

try:
    import cv2
except ImportError:
    # OpenCV is only needed by ZoneTracker.draw_zones(); the counting logic is
    # pure Python and stays importable (e.g. on a headless box) without it.
    cv2 = None


class ZoneTracker:
    """Count faces appearing in a left "entry" zone and a right "exit" zone.

    The frame is split horizontally: an entry zone on the left, an exit zone
    on the right and an ignored buffer around the centre.  Detections are
    associated with previously seen faces by centroid proximity and box size,
    and each face is counted when it first appears in (or moves into) the
    entry or exit zone, subject to a per-face cooldown.
    """

    # Association / housekeeping thresholds.  Class-level so that a subclass
    # or an instance can override them without touching the constructor.
    MATCH_MAX_DISTANCE = 100      # max centroid distance (pixels) for a match
    MATCH_MAX_SIZE_DIFF = 50      # max difference of (w + h) for a match
    MATCH_MAX_AGE_SECONDS = 2.0   # tracks not seen for longer are not matched
    STALE_AFTER_SECONDS = 5.0     # tracks not seen for longer are dropped

    def __init__(self, frame_width, entry_zone_percent=0.4, exit_zone_percent=0.4,
                 cooldown_seconds=2.0, center_buffer_percent=0.1):
        """
        Initialize the zone tracker.

        Despite the ``*_percent`` names, the zone arguments are fractions of
        the frame width (e.g. ``0.4`` means 40 %), because they are multiplied
        directly with ``frame_width``.

        Args:
            frame_width: Width of the video frame in pixels
            entry_zone_percent: Fraction of frame width for entry zone (left side)
            exit_zone_percent: Fraction of frame width for exit zone (right side)
            cooldown_seconds: Time in seconds before same person can be counted again
            center_buffer_percent: Fraction of frame width, centred on the
                middle of the frame, to ignore (prevents false counts)
        """
        self.frame_width = frame_width
        self.entry_zone_percent = entry_zone_percent
        self.exit_zone_percent = exit_zone_percent
        self.cooldown_seconds = cooldown_seconds
        self.center_buffer_percent = center_buffer_percent

        # Calculate zone boundaries (all in pixels along the x axis)
        self.entry_zone_end = int(frame_width * entry_zone_percent)
        buffer_width = int(frame_width * center_buffer_percent)
        self.center_start = int(frame_width / 2 - buffer_width / 2)
        self.center_end = int(frame_width / 2 + buffer_width / 2)
        self.exit_zone_start = int(frame_width * (1 - exit_zone_percent))

        # Counters
        self.total_entered = 0
        self.total_exited = 0

        # Track faces with timestamps to prevent double-counting
        # Key: face_id (unique ID)
        # Value: {'centroid': (x, y), 'zone': zone, 'timestamp': time, 'size': (w, h)}
        self.tracked_faces = {}
        # face_id -> time of the last counted event for that face
        self.face_cooldowns = defaultdict(float)

        # Track last seen zone for each face (to detect zone transitions)
        self.last_zone = {}

        # Unique face ID counter
        self.next_face_id = 1

    def get_zone(self, face_x, face_w):
        """
        Determine which zone a face is in based on its horizontal centre.

        The centre buffer is tested first, so it takes precedence when the
        configured zones overlap it.

        Args:
            face_x: X coordinate of face (left edge)
            face_w: Width of face bounding box

        Returns:
            'entry' if in entry zone, 'exit' if in exit zone,
            'center' if in buffer, None otherwise
        """
        face_center = face_x + face_w // 2

        # Check if in center buffer zone (ignore)
        if self.center_start <= face_center <= self.center_end:
            return 'center'

        # Check entry zone (left side)
        if face_center < self.entry_zone_end:
            return 'entry'

        # Check exit zone (right side)
        if face_center > self.exit_zone_start:
            return 'exit'

        # In the middle zone (between entry/exit and center buffer)
        return None

    def _calculate_centroid(self, face_x, face_y, face_w, face_h):
        """Calculate the centroid of a face bounding box."""
        return (face_x + face_w // 2, face_y + face_h // 2)

    def _calculate_distance(self, pt1, pt2):
        """Calculate Euclidean distance between two points."""
        return ((pt1[0] - pt2[0]) ** 2 + (pt1[1] - pt2[1]) ** 2) ** 0.5

    def _match_face_to_tracked(self, centroid, size, now=None, exclude_ids=None):
        """
        Match a detected face to the closest eligible tracked face.

        A tracked face is eligible when it was seen within
        ``MATCH_MAX_AGE_SECONDS``, its (w + h) differs from the detection's by
        less than ``MATCH_MAX_SIZE_DIFF`` and its centroid is closer than
        ``MATCH_MAX_DISTANCE``.  Among eligible faces the nearest one wins, so
        the result no longer depends on dictionary order.

        Args:
            centroid: (x, y) centroid of the detected face
            size: (w, h) size of the detected face
            now: Reference time in the same clock as the stored timestamps;
                defaults to ``time.time()``
            exclude_ids: Optional container of face IDs that must not be
                returned (e.g. IDs already assigned in the current frame)

        Returns:
            face_id if matched, None if new face
        """
        if now is None:
            now = time.time()
        excluded = () if exclude_ids is None else exclude_ids

        best_id = None
        # Starting at the threshold keeps the original strict "< max" rule.
        best_distance = self.MATCH_MAX_DISTANCE

        for face_id, face_data in self.tracked_faces.items():
            if face_id in excluded:
                continue

            # Skip if face hasn't been seen recently
            if now - face_data.get('timestamp', 0) > self.MATCH_MAX_AGE_SECONDS:
                continue

            tracked_centroid = face_data.get('centroid')
            if not tracked_centroid:
                continue

            tracked_size = face_data.get('size', (0, 0))
            size_diff = abs(size[0] + size[1] - tracked_size[0] - tracked_size[1])
            if size_diff >= self.MATCH_MAX_SIZE_DIFF:
                continue

            distance = self._calculate_distance(centroid, tracked_centroid)
            if distance < best_distance:
                best_id, best_distance = face_id, distance

        return best_id

    def _count_crossing(self, zone, face_id, current_time):
        """Increment the counter for `zone` and start the face's cooldown."""
        if zone == 'entry':
            self.total_entered += 1
        elif zone == 'exit':
            self.total_exited += 1
        else:
            return
        self.face_cooldowns[face_id] = current_time

    def process_faces(self, faces, timestamp=None):
        """
        Process detected faces and update entry/exit counts.

        Faces whose centre lies in the centre buffer or in the uncovered
        middle band are ignored entirely (they neither create nor update a
        track).  Every other face is matched to a track (or given a new ID)
        and counted when its zone differs from the zone it was last seen in,
        unless the face is still inside its cooldown window.

        Args:
            faces: Iterable of face boxes ``(x, y, w, h)`` or
                ``(x, y, w, h, confidence)``; only the first four values are
                used.  ``None`` is treated as "no faces".
            timestamp: Time of this frame in seconds.  Defaults to
                ``time.time()``; pass frame timestamps when processing
                recorded video so cooldowns follow video time.

        Returns:
            Dictionary with 'total_entered', 'total_exited',
            'current_occupancy' and 'zones' (face_id -> zone for the faces
            handled in this call)
        """
        current_time = time.time() if timestamp is None else timestamp
        current_zones = {}

        for face in (() if faces is None else faces):
            face_x, face_y, face_w, face_h = face[:4]

            zone = self.get_zone(face_x, face_w)
            if zone is None or zone == 'center':
                continue

            centroid = self._calculate_centroid(face_x, face_y, face_w, face_h)

            # One detection per track per frame: IDs already handed out in
            # this frame are excluded, otherwise two nearby people would
            # collapse into a single ID and the second would never be counted.
            face_id = self._match_face_to_tracked(
                centroid, (face_w, face_h),
                now=current_time, exclude_ids=current_zones,
            )
            if face_id is None:
                # New face - assign a new ID
                face_id = self.next_face_id
                self.next_face_id += 1

            current_zones[face_id] = zone

            # Update tracked face data
            self.tracked_faces[face_id] = {
                'centroid': centroid,
                'zone': zone,
                'timestamp': current_time,
                'size': (face_w, face_h),
            }

            # The last-seen zone is refreshed on every path, including while
            # the face is cooling down.
            previous_zone = self.last_zone.get(face_id)
            self.last_zone[face_id] = zone

            # Still in cooldown: zone was updated above, but don't count
            if (face_id in self.face_cooldowns
                    and current_time - self.face_cooldowns[face_id] < self.cooldown_seconds):
                continue

            # First detection (previous_zone is None) or a move into a
            # different counted zone.
            if zone != previous_zone:
                self._count_crossing(zone, face_id, current_time)

        # Clean up tracking data for faces not seen for a while
        stale_ids = [
            face_id for face_id, data in self.tracked_faces.items()
            if face_id not in current_zones
            and current_time - data.get('timestamp', 0) > self.STALE_AFTER_SECONDS
        ]
        for face_id in stale_ids:
            self.tracked_faces.pop(face_id, None)
            self.last_zone.pop(face_id, None)
            self.face_cooldowns.pop(face_id, None)

        return {**self.get_counts(), 'zones': current_zones}

    def get_counts(self):
        """Get current count statistics.

        Returns:
            Dictionary with 'total_entered', 'total_exited' and
            'current_occupancy' (entered minus exited; may be negative).
        """
        return {
            'total_entered': self.total_entered,
            'total_exited': self.total_exited,
            'current_occupancy': self.total_entered - self.total_exited,
        }

    def reset_counts(self):
        """Reset all counters and tracking data.

        The face ID counter is intentionally left running so IDs stay unique
        across resets.
        """
        self.total_entered = 0
        self.total_exited = 0
        self.tracked_faces.clear()
        self.face_cooldowns.clear()
        self.last_zone.clear()

    def draw_zones(self, frame):
        """
        Draw zone boundaries on a copy of the frame for visualization.

        Args:
            frame: Frame to draw on (left unmodified)

        Returns:
            Frame with zone boundaries drawn

        Raises:
            ImportError: If OpenCV (cv2) is not installed
        """
        if cv2 is None:
            raise ImportError("OpenCV (cv2) is required for ZoneTracker.draw_zones()")

        result_frame = frame.copy()
        h = frame.shape[0]

        # Draw entry zone (left, green)
        cv2.rectangle(result_frame, (0, 0), (self.entry_zone_end, h), (0, 255, 0), 2)
        cv2.putText(result_frame, "ENTRY", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

        # Draw exit zone (right, red)
        cv2.rectangle(result_frame, (self.exit_zone_start, 0),
                      (self.frame_width, h), (0, 0, 255), 2)
        cv2.putText(result_frame, "EXIT", (self.exit_zone_start + 10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

        # Draw center buffer (yellow, semi-transparent): fill it on a copy,
        # then blend the copy back into the result at 20 % opacity.
        overlay = result_frame.copy()
        cv2.rectangle(overlay, (self.center_start, 0), (self.center_end, h),
                      (0, 255, 255), -1)
        cv2.addWeighted(overlay, 0.2, result_frame, 0.8, 0, result_frame)

        return result_frame