feat: improve face tracking and matching logic in ZoneTracker
- Introduce unique face ID generation and enhance face matching based on proximity and size - Refactor face ID generation to use centroids and size for better accuracy - Update tracked face data structure to include centroid, zone, timestamp, and size - Improve comments for clarity on face tracking and matching processes
This commit is contained in:
@@ -39,13 +39,16 @@ class ZoneTracker:
|
|||||||
self.total_exited = 0
|
self.total_exited = 0
|
||||||
|
|
||||||
# Track faces with timestamps to prevent double-counting
|
# Track faces with timestamps to prevent double-counting
|
||||||
# Key: face_id (centroid hash), Value: (zone, timestamp)
|
# Key: face_id (unique ID), Value: {'centroid': (x, y), 'zone': zone, 'timestamp': time, 'size': (w, h)}
|
||||||
self.tracked_faces = {}
|
self.tracked_faces = {}
|
||||||
self.face_cooldowns = defaultdict(float)
|
self.face_cooldowns = defaultdict(float)
|
||||||
|
|
||||||
# Track last seen zone for each face (to detect zone transitions)
|
# Track last seen zone for each face (to detect zone transitions)
|
||||||
self.last_zone = {}
|
self.last_zone = {}
|
||||||
|
|
||||||
|
# Unique face ID counter
|
||||||
|
self.next_face_id = 1
|
||||||
|
|
||||||
def get_zone(self, face_x, face_w):
|
def get_zone(self, face_x, face_w):
|
||||||
"""
|
"""
|
||||||
Determine which zone a face is in based on its position.
|
Determine which zone a face is in based on its position.
|
||||||
@@ -75,24 +78,46 @@ class ZoneTracker:
|
|||||||
# In the middle zone (between entry/exit and center buffer)
|
# In the middle zone (between entry/exit and center buffer)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _get_face_id(self, face_x, face_y, face_w, face_h):
|
def _calculate_centroid(self, face_x, face_y, face_w, face_h):
|
||||||
|
"""Calculate the centroid of a face bounding box."""
|
||||||
|
return (face_x + face_w // 2, face_y + face_h // 2)
|
||||||
|
|
||||||
|
def _calculate_distance(self, pt1, pt2):
|
||||||
|
"""Calculate Euclidean distance between two points."""
|
||||||
|
return ((pt1[0] - pt2[0])**2 + (pt1[1] - pt2[1])**2)**0.5
|
||||||
|
|
||||||
|
def _match_face_to_tracked(self, centroid, size):
|
||||||
"""
|
"""
|
||||||
Generate a simple ID for a face based on its position and size.
|
Match a detected face to an existing tracked face based on proximity.
|
||||||
This is a basic approach - in production, use proper tracking algorithms.
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
face_x, face_y: Top-left coordinates
|
centroid: (x, y) centroid of the detected face
|
||||||
face_w, face_h: Width and height
|
size: (w, h) size of the detected face
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
A simple hash-like ID for tracking
|
face_id if matched, None if new face
|
||||||
"""
|
"""
|
||||||
# Use approximate position and size to create a simple ID
|
max_distance = 100 # Maximum pixel distance to consider it the same face
|
||||||
# This helps group similar detections as the same person
|
max_size_diff = 50 # Maximum size difference to consider it the same face
|
||||||
grid_x = face_x // 50
|
|
||||||
grid_y = face_y // 50
|
for face_id, face_data in self.tracked_faces.items():
|
||||||
size_category = (face_w + face_h) // 50
|
# Skip if face hasn't been seen recently (within last 2 seconds)
|
||||||
return f"{grid_x}_{grid_y}_{size_category}"
|
time_since_seen = time.time() - face_data.get('timestamp', 0)
|
||||||
|
if time_since_seen > 2.0:
|
||||||
|
continue
|
||||||
|
|
||||||
|
tracked_centroid = face_data.get('centroid')
|
||||||
|
tracked_size = face_data.get('size', (0, 0))
|
||||||
|
|
||||||
|
if tracked_centroid:
|
||||||
|
distance = self._calculate_distance(centroid, tracked_centroid)
|
||||||
|
size_diff = abs(size[0] + size[1] - tracked_size[0] - tracked_size[1])
|
||||||
|
|
||||||
|
# Match if close enough in position and size
|
||||||
|
if distance < max_distance and size_diff < max_size_diff:
|
||||||
|
return face_id
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
def process_faces(self, faces):
|
def process_faces(self, faces):
|
||||||
"""
|
"""
|
||||||
@@ -110,24 +135,41 @@ class ZoneTracker:
|
|||||||
# Process each detected face
|
# Process each detected face
|
||||||
for face in faces:
|
for face in faces:
|
||||||
face_x, face_y, face_w, face_h, confidence = face
|
face_x, face_y, face_w, face_h, confidence = face
|
||||||
face_id = self._get_face_id(face_x, face_y, face_w, face_h)
|
centroid = self._calculate_centroid(face_x, face_y, face_w, face_h)
|
||||||
zone = self.get_zone(face_x, face_w)
|
zone = self.get_zone(face_x, face_w)
|
||||||
|
|
||||||
if zone is None or zone == 'center':
|
if zone is None or zone == 'center':
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# Try to match this face to an existing tracked face
|
||||||
|
face_id = self._match_face_to_tracked(centroid, (face_w, face_h))
|
||||||
|
|
||||||
|
if face_id is None:
|
||||||
|
# New face - assign a new ID
|
||||||
|
face_id = self.next_face_id
|
||||||
|
self.next_face_id += 1
|
||||||
|
|
||||||
current_zones[face_id] = zone
|
current_zones[face_id] = zone
|
||||||
|
|
||||||
|
# Update tracked face data
|
||||||
|
self.tracked_faces[face_id] = {
|
||||||
|
'centroid': centroid,
|
||||||
|
'zone': zone,
|
||||||
|
'timestamp': current_time,
|
||||||
|
'size': (face_w, face_h)
|
||||||
|
}
|
||||||
|
|
||||||
# Check if this face is in cooldown
|
# Check if this face is in cooldown
|
||||||
if face_id in self.face_cooldowns:
|
if face_id in self.face_cooldowns:
|
||||||
if current_time - self.face_cooldowns[face_id] < self.cooldown_seconds:
|
if current_time - self.face_cooldowns[face_id] < self.cooldown_seconds:
|
||||||
continue # Still in cooldown, skip
|
# Still in cooldown, update zone but don't count
|
||||||
|
self.last_zone[face_id] = zone
|
||||||
|
continue
|
||||||
|
|
||||||
# Check for zone transitions or first detection
|
# Check for zone transitions or first detection
|
||||||
if face_id not in self.last_zone:
|
if face_id not in self.last_zone:
|
||||||
# First time seeing this face - count if in entry/exit zone
|
# First time seeing this face - count if in entry/exit zone
|
||||||
self.last_zone[face_id] = zone
|
self.last_zone[face_id] = zone
|
||||||
self.tracked_faces[face_id] = (zone, current_time)
|
|
||||||
|
|
||||||
# Count on first detection in entry/exit zones
|
# Count on first detection in entry/exit zones
|
||||||
if zone == 'entry':
|
if zone == 'entry':
|
||||||
@@ -155,14 +197,17 @@ class ZoneTracker:
|
|||||||
self.total_exited += 1
|
self.total_exited += 1
|
||||||
self.face_cooldowns[face_id] = current_time
|
self.face_cooldowns[face_id] = current_time
|
||||||
self.last_zone[face_id] = zone
|
self.last_zone[face_id] = zone
|
||||||
|
else:
|
||||||
|
# Same zone or transition we don't care about - just update
|
||||||
|
self.last_zone[face_id] = zone
|
||||||
|
|
||||||
# Clean up old tracking data for faces no longer detected
|
# Clean up old tracking data for faces no longer detected
|
||||||
faces_to_remove = []
|
faces_to_remove = []
|
||||||
for face_id in self.last_zone:
|
for face_id in list(self.last_zone.keys()):
|
||||||
if face_id not in current_zones:
|
if face_id not in current_zones:
|
||||||
# Face no longer detected, but keep in memory for a bit
|
# Face no longer detected, but keep in memory for a bit
|
||||||
if face_id in self.tracked_faces:
|
if face_id in self.tracked_faces:
|
||||||
last_seen = self.tracked_faces[face_id][1]
|
last_seen = self.tracked_faces[face_id].get('timestamp', 0)
|
||||||
if current_time - last_seen > 5.0: # Remove after 5 seconds
|
if current_time - last_seen > 5.0: # Remove after 5 seconds
|
||||||
faces_to_remove.append(face_id)
|
faces_to_remove.append(face_id)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user