Files
PeopleCounter/zone_tracker.py

274 lines
11 KiB
Python

"""
Zone-based Entry/Exit Tracker
Tracks people entering and exiting based on zone detection with cooldown mechanism.
"""
import time
import cv2
from collections import defaultdict
class ZoneTracker:
def __init__(self, frame_width, entry_zone_percent=0.4, exit_zone_percent=0.4,
cooldown_seconds=4.0, center_buffer_percent=0.1):
"""
Initialize the zone tracker.
Args:
frame_width: Width of the video frame in pixels
entry_zone_percent: Percentage of frame width for entry zone (left side)
exit_zone_percent: Percentage of frame width for exit zone (right side)
cooldown_seconds: Time in seconds before same person can be counted again
center_buffer_percent: Percentage of center to ignore (prevents false counts)
"""
self.frame_width = frame_width
self.entry_zone_percent = entry_zone_percent
self.exit_zone_percent = exit_zone_percent
self.cooldown_seconds = cooldown_seconds
self.center_buffer_percent = center_buffer_percent
# Calculate zone boundaries
self.entry_zone_end = int(frame_width * entry_zone_percent)
buffer_width = int(frame_width * center_buffer_percent)
self.center_start = int(frame_width / 2 - buffer_width / 2)
self.center_end = int(frame_width / 2 + buffer_width / 2)
self.exit_zone_start = int(frame_width * (1 - exit_zone_percent))
# Counters
self.total_entered = 0
self.total_exited = 0
# Track faces with timestamps to prevent double-counting
# Key: face_id (unique ID), Value: {'centroid': (x, y), 'zone': zone, 'timestamp': time, 'size': (w, h)}
self.tracked_faces = {}
self.face_cooldowns = defaultdict(float)
# Track last seen zone for each face (to detect zone transitions)
self.last_zone = {}
# Unique face ID counter
self.next_face_id = 1
def get_zone(self, face_x, face_w):
"""
Determine which zone a face is in based on its position.
Args:
face_x: X coordinate of face (left edge)
face_w: Width of face bounding box
face_center: Center X of the face
Returns:
'entry' if in entry zone, 'exit' if in exit zone, 'center' if in buffer, None otherwise
"""
face_center = face_x + face_w // 2
# Check if in center buffer zone (ignore)
if self.center_start <= face_center <= self.center_end:
return 'center'
# Check entry zone (left side)
if face_center < self.entry_zone_end:
return 'entry'
# Check exit zone (right side)
if face_center > self.exit_zone_start:
return 'exit'
# In the middle zone (between entry/exit and center buffer)
return None
def _calculate_centroid(self, face_x, face_y, face_w, face_h):
"""Calculate the centroid of a face bounding box."""
return (face_x + face_w // 2, face_y + face_h // 2)
def _calculate_distance(self, pt1, pt2):
"""Calculate Euclidean distance between two points."""
return ((pt1[0] - pt2[0])**2 + (pt1[1] - pt2[1])**2)**0.5
def _match_face_to_tracked(self, centroid, size):
"""
Match a detected face to an existing tracked face based on proximity.
Args:
centroid: (x, y) centroid of the detected face
size: (w, h) size of the detected face
Returns:
face_id if matched, None if new face
"""
max_distance = 150 # Maximum pixel distance to consider it the same face
max_size_diff = 100 # Maximum size difference to consider it the same face
for face_id, face_data in self.tracked_faces.items():
# Skip if face hasn't been seen recently (within last 2 seconds)
time_since_seen = time.time() - face_data.get('timestamp', 0)
if time_since_seen > 2.0:
continue
tracked_centroid = face_data.get('centroid')
tracked_size = face_data.get('size', (0, 0))
if tracked_centroid:
distance = self._calculate_distance(centroid, tracked_centroid)
size_diff = abs(size[0] + size[1] - tracked_size[0] - tracked_size[1])
# Match if close enough in position and size
if distance < max_distance and size_diff < max_size_diff:
return face_id
return None
def process_faces(self, faces):
"""
Process detected faces and update entry/exit counts.
Args:
faces: List of tuples (x, y, w, h, confidence) from face detector
Returns:
Dictionary with updated counts and zone info
"""
current_time = time.time()
current_zones = {}
# Process each detected face
for face in faces:
face_x, face_y, face_w, face_h, confidence = face
centroid = self._calculate_centroid(face_x, face_y, face_w, face_h)
zone = self.get_zone(face_x, face_w)
if zone is None or zone == 'center':
continue
# Try to match this face to an existing tracked face
face_id = self._match_face_to_tracked(centroid, (face_w, face_h))
if face_id is None:
# New face - assign a new ID
face_id = self.next_face_id
self.next_face_id += 1
current_zones[face_id] = zone
# Update tracked face data
self.tracked_faces[face_id] = {
'centroid': centroid,
'zone': zone,
'timestamp': current_time,
'size': (face_w, face_h)
}
# Check if this face is in cooldown
if face_id in self.face_cooldowns:
if current_time - self.face_cooldowns[face_id] < self.cooldown_seconds:
# Still in cooldown, update zone but don't count
self.last_zone[face_id] = zone
continue
# Check for zone transitions or first detection
if face_id not in self.last_zone:
# First time seeing this face - count if in entry/exit zone
self.last_zone[face_id] = zone
# Count on first detection in entry/exit zones
if zone == 'entry':
# Person entered (first detected in entry zone)
self.total_entered += 1
self.face_cooldowns[face_id] = current_time
elif zone == 'exit':
# Person exited (first detected in exit zone)
self.total_exited += 1
self.face_cooldowns[face_id] = current_time
else:
# Face has been seen before - check for valid transition
last_zone = self.last_zone[face_id]
# Only count if we have a clear zone transition
# Entry: person transitions to entry zone from non-entry zone
# Exit: person transitions to exit zone from non-exit zone
if zone == 'entry' and last_zone != 'entry':
# Person entered (transitioned to entry zone)
self.total_entered += 1
self.face_cooldowns[face_id] = current_time
self.last_zone[face_id] = zone
elif zone == 'exit' and last_zone != 'exit':
# Person exited (transitioned to exit zone)
self.total_exited += 1
self.face_cooldowns[face_id] = current_time
self.last_zone[face_id] = zone
else:
# Same zone or transition we don't care about - just update
self.last_zone[face_id] = zone
# Clean up old tracking data for faces no longer detected
faces_to_remove = []
for face_id in list(self.last_zone.keys()):
if face_id not in current_zones:
# Face no longer detected, but keep in memory for a bit
if face_id in self.tracked_faces:
last_seen = self.tracked_faces[face_id].get('timestamp', 0)
if current_time - last_seen > 5.0: # Remove after 5 seconds
faces_to_remove.append(face_id)
for face_id in faces_to_remove:
if face_id in self.last_zone:
del self.last_zone[face_id]
if face_id in self.tracked_faces:
del self.tracked_faces[face_id]
if face_id in self.face_cooldowns:
del self.face_cooldowns[face_id]
return {
'total_entered': self.total_entered,
'total_exited': self.total_exited,
'current_occupancy': self.total_entered - self.total_exited,
'zones': current_zones
}
def get_counts(self):
"""Get current count statistics."""
return {
'total_entered': self.total_entered,
'total_exited': self.total_exited,
'current_occupancy': self.total_entered - self.total_exited
}
def reset_counts(self):
"""Reset all counters and tracking data."""
self.total_entered = 0
self.total_exited = 0
self.tracked_faces.clear()
self.face_cooldowns.clear()
self.last_zone.clear()
def draw_zones(self, frame):
"""
Draw zone boundaries on the frame for visualization.
Args:
frame: Frame to draw on
Returns:
Frame with zone boundaries drawn
"""
result_frame = frame.copy()
h = frame.shape[0]
# Draw entry zone (left, green)
cv2.rectangle(result_frame, (0, 0), (self.entry_zone_end, h), (0, 255, 0), 2)
cv2.putText(result_frame, "ENTRY", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
# Draw exit zone (right, red)
cv2.rectangle(result_frame, (self.exit_zone_start, 0), (self.frame_width, h), (0, 0, 255), 2)
cv2.putText(result_frame, "EXIT", (self.exit_zone_start + 10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
# Draw center buffer (yellow, semi-transparent)
overlay = result_frame.copy()
cv2.rectangle(overlay, (self.center_start, 0), (self.center_end, h), (0, 255, 255), -1)
cv2.addWeighted(overlay, 0.2, result_frame, 0.8, 0, result_frame)
return result_frame