feat: initial implementation of People Counter web app
- Add Flask application with MJPEG video streaming - Implement OpenCV DNN face detection module - Add zone-based entry/exit tracking with cooldown mechanism - Create web interface with real-time WebSocket updates - Add model download script and comprehensive README - Include OpenCV DNN model files for face detection
This commit is contained in:
218
zone_tracker.py
Normal file
218
zone_tracker.py
Normal file
@@ -0,0 +1,218 @@
|
||||
"""
|
||||
Zone-based Entry/Exit Tracker
|
||||
Tracks people entering and exiting based on zone detection with cooldown mechanism.
|
||||
"""
|
||||
|
||||
import time
|
||||
import cv2
|
||||
from collections import defaultdict
|
||||
|
||||
|
||||
class ZoneTracker:
|
||||
def __init__(self, frame_width, entry_zone_percent=0.4, exit_zone_percent=0.4,
|
||||
cooldown_seconds=2.0, center_buffer_percent=0.1):
|
||||
"""
|
||||
Initialize the zone tracker.
|
||||
|
||||
Args:
|
||||
frame_width: Width of the video frame in pixels
|
||||
entry_zone_percent: Percentage of frame width for entry zone (left side)
|
||||
exit_zone_percent: Percentage of frame width for exit zone (right side)
|
||||
cooldown_seconds: Time in seconds before same person can be counted again
|
||||
center_buffer_percent: Percentage of center to ignore (prevents false counts)
|
||||
"""
|
||||
self.frame_width = frame_width
|
||||
self.entry_zone_percent = entry_zone_percent
|
||||
self.exit_zone_percent = exit_zone_percent
|
||||
self.cooldown_seconds = cooldown_seconds
|
||||
self.center_buffer_percent = center_buffer_percent
|
||||
|
||||
# Calculate zone boundaries
|
||||
self.entry_zone_end = int(frame_width * entry_zone_percent)
|
||||
buffer_width = int(frame_width * center_buffer_percent)
|
||||
self.center_start = int(frame_width / 2 - buffer_width / 2)
|
||||
self.center_end = int(frame_width / 2 + buffer_width / 2)
|
||||
self.exit_zone_start = int(frame_width * (1 - exit_zone_percent))
|
||||
|
||||
# Counters
|
||||
self.total_entered = 0
|
||||
self.total_exited = 0
|
||||
|
||||
# Track faces with timestamps to prevent double-counting
|
||||
# Key: face_id (centroid hash), Value: (zone, timestamp)
|
||||
self.tracked_faces = {}
|
||||
self.face_cooldowns = defaultdict(float)
|
||||
|
||||
# Track last seen zone for each face (to detect zone transitions)
|
||||
self.last_zone = {}
|
||||
|
||||
def get_zone(self, face_x, face_w):
|
||||
"""
|
||||
Determine which zone a face is in based on its position.
|
||||
|
||||
Args:
|
||||
face_x: X coordinate of face (left edge)
|
||||
face_w: Width of face bounding box
|
||||
face_center: Center X of the face
|
||||
|
||||
Returns:
|
||||
'entry' if in entry zone, 'exit' if in exit zone, 'center' if in buffer, None otherwise
|
||||
"""
|
||||
face_center = face_x + face_w // 2
|
||||
|
||||
# Check if in center buffer zone (ignore)
|
||||
if self.center_start <= face_center <= self.center_end:
|
||||
return 'center'
|
||||
|
||||
# Check entry zone (left side)
|
||||
if face_center < self.entry_zone_end:
|
||||
return 'entry'
|
||||
|
||||
# Check exit zone (right side)
|
||||
if face_center > self.exit_zone_start:
|
||||
return 'exit'
|
||||
|
||||
# In the middle zone (between entry/exit and center buffer)
|
||||
return None
|
||||
|
||||
def _get_face_id(self, face_x, face_y, face_w, face_h):
|
||||
"""
|
||||
Generate a simple ID for a face based on its position and size.
|
||||
This is a basic approach - in production, use proper tracking algorithms.
|
||||
|
||||
Args:
|
||||
face_x, face_y: Top-left coordinates
|
||||
face_w, face_h: Width and height
|
||||
|
||||
Returns:
|
||||
A simple hash-like ID for tracking
|
||||
"""
|
||||
# Use approximate position and size to create a simple ID
|
||||
# This helps group similar detections as the same person
|
||||
grid_x = face_x // 50
|
||||
grid_y = face_y // 50
|
||||
size_category = (face_w + face_h) // 50
|
||||
return f"{grid_x}_{grid_y}_{size_category}"
|
||||
|
||||
def process_faces(self, faces):
|
||||
"""
|
||||
Process detected faces and update entry/exit counts.
|
||||
|
||||
Args:
|
||||
faces: List of tuples (x, y, w, h, confidence) from face detector
|
||||
|
||||
Returns:
|
||||
Dictionary with updated counts and zone info
|
||||
"""
|
||||
current_time = time.time()
|
||||
current_zones = {}
|
||||
|
||||
# Process each detected face
|
||||
for face in faces:
|
||||
face_x, face_y, face_w, face_h, confidence = face
|
||||
face_id = self._get_face_id(face_x, face_y, face_w, face_h)
|
||||
zone = self.get_zone(face_x, face_w)
|
||||
|
||||
if zone is None or zone == 'center':
|
||||
continue
|
||||
|
||||
current_zones[face_id] = zone
|
||||
|
||||
# Check if this face is in cooldown
|
||||
if face_id in self.face_cooldowns:
|
||||
if current_time - self.face_cooldowns[face_id] < self.cooldown_seconds:
|
||||
continue # Still in cooldown, skip
|
||||
|
||||
# Check for zone transitions or first detection
|
||||
if face_id not in self.last_zone:
|
||||
# First time seeing this face - mark the zone
|
||||
self.last_zone[face_id] = zone
|
||||
self.tracked_faces[face_id] = (zone, current_time)
|
||||
else:
|
||||
# Face has been seen before - check for valid transition
|
||||
last_zone = self.last_zone[face_id]
|
||||
|
||||
# Only count if we have a clear zone assignment
|
||||
# Entry: person appears in entry zone
|
||||
# Exit: person appears in exit zone
|
||||
if zone == 'entry' and last_zone != 'entry':
|
||||
# Person entered
|
||||
self.total_entered += 1
|
||||
self.face_cooldowns[face_id] = current_time
|
||||
self.last_zone[face_id] = zone
|
||||
elif zone == 'exit' and last_zone != 'exit':
|
||||
# Person exited
|
||||
self.total_exited += 1
|
||||
self.face_cooldowns[face_id] = current_time
|
||||
self.last_zone[face_id] = zone
|
||||
|
||||
# Clean up old tracking data for faces no longer detected
|
||||
faces_to_remove = []
|
||||
for face_id in self.last_zone:
|
||||
if face_id not in current_zones:
|
||||
# Face no longer detected, but keep in memory for a bit
|
||||
if face_id in self.tracked_faces:
|
||||
last_seen = self.tracked_faces[face_id][1]
|
||||
if current_time - last_seen > 5.0: # Remove after 5 seconds
|
||||
faces_to_remove.append(face_id)
|
||||
|
||||
for face_id in faces_to_remove:
|
||||
if face_id in self.last_zone:
|
||||
del self.last_zone[face_id]
|
||||
if face_id in self.tracked_faces:
|
||||
del self.tracked_faces[face_id]
|
||||
if face_id in self.face_cooldowns:
|
||||
del self.face_cooldowns[face_id]
|
||||
|
||||
return {
|
||||
'total_entered': self.total_entered,
|
||||
'total_exited': self.total_exited,
|
||||
'current_occupancy': self.total_entered - self.total_exited,
|
||||
'zones': current_zones
|
||||
}
|
||||
|
||||
def get_counts(self):
|
||||
"""Get current count statistics."""
|
||||
return {
|
||||
'total_entered': self.total_entered,
|
||||
'total_exited': self.total_exited,
|
||||
'current_occupancy': self.total_entered - self.total_exited
|
||||
}
|
||||
|
||||
def reset_counts(self):
|
||||
"""Reset all counters and tracking data."""
|
||||
self.total_entered = 0
|
||||
self.total_exited = 0
|
||||
self.tracked_faces.clear()
|
||||
self.face_cooldowns.clear()
|
||||
self.last_zone.clear()
|
||||
|
||||
def draw_zones(self, frame):
|
||||
"""
|
||||
Draw zone boundaries on the frame for visualization.
|
||||
|
||||
Args:
|
||||
frame: Frame to draw on
|
||||
|
||||
Returns:
|
||||
Frame with zone boundaries drawn
|
||||
"""
|
||||
result_frame = frame.copy()
|
||||
h = frame.shape[0]
|
||||
|
||||
# Draw entry zone (left, green)
|
||||
cv2.rectangle(result_frame, (0, 0), (self.entry_zone_end, h), (0, 255, 0), 2)
|
||||
cv2.putText(result_frame, "ENTRY", (10, 30),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
|
||||
|
||||
# Draw exit zone (right, red)
|
||||
cv2.rectangle(result_frame, (self.exit_zone_start, 0), (self.frame_width, h), (0, 0, 255), 2)
|
||||
cv2.putText(result_frame, "EXIT", (self.exit_zone_start + 10, 30),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
|
||||
|
||||
# Draw center buffer (yellow, semi-transparent)
|
||||
overlay = result_frame.copy()
|
||||
cv2.rectangle(overlay, (self.center_start, 0), (self.center_end, h), (0, 255, 255), -1)
|
||||
cv2.addWeighted(overlay, 0.2, result_frame, 0.8, 0, result_frame)
|
||||
|
||||
return result_frame
|
||||
Reference in New Issue
Block a user