2 Commits

b68fa2614e  feat: migrate from face detection to HOG person detection  (2026-01-21 11:39:32 +01:00)

cae56c40cc  feat: improve face tracking and matching logic in ZoneTracker  (2026-01-20 00:54:57 +01:00)
- Introduce unique face ID generation and enhance face matching based on proximity and size
- Refactor face ID generation to use centroids and size for better accuracy
- Update tracked face data structure to include centroid, zone, timestamp, and size
- Improve comments for clarity on face tracking and matching processes

7 changed files with 219 additions and 2003 deletions

Camera module (class Camera)

@@ -6,7 +6,7 @@ Integrates face detection and zone tracking.
 import cv2
 import threading
 import time
-from face_detector import FaceDetector
+from person_detector import PersonDetector
 from zone_tracker import ZoneTracker
@@ -18,8 +18,8 @@ class Camera:
         Args:
             camera_index: Index of the USB camera (usually 0)
-            process_every_n_frames: Process face detection every N frames for performance
-            face_confidence: Confidence threshold for face detection
+            process_every_n_frames: Process detection every N frames for performance
+            face_confidence: Confidence threshold for person detection
             frame_width: Desired frame width
             frame_height: Desired frame height
         """
@@ -37,8 +37,8 @@ class Camera:
         self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, frame_width)
         self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, frame_height)

-        # Initialize face detector and zone tracker
-        self.face_detector = FaceDetector(confidence_threshold=face_confidence)
+        # Initialize person detector and zone tracker
+        self.person_detector = PersonDetector(confidence_threshold=face_confidence)
         self.zone_tracker = None  # Will be initialized after first frame

         # Frame processing state
@@ -95,7 +95,7 @@ class Camera:
             with self.lock:
                 self.current_frame = frame.copy()

-            # Process face detection every N frames
+            # Process detection every N frames
             if self.frame_counter % self.process_every_n_frames == 0:
                 processed_frame, counts = self._process_frame(frame)
                 with self.lock:
@@ -104,7 +104,7 @@ class Camera:
     def _process_frame(self, frame):
         """
-        Process a single frame: detect faces, track zones, update counts.
+        Process a single frame: detect people, track zones, update counts.

         Args:
             frame: Input frame from camera
@@ -112,12 +112,12 @@ class Camera:
         Returns:
             Tuple of (processed_frame, counts_dict)
         """
-        # Detect faces
-        faces = self.face_detector.detect_faces(frame)
+        # Detect people
+        people = self.person_detector.detect_people(frame)

         # Track zones and update counts
         if self.zone_tracker:
-            counts = self.zone_tracker.process_faces(faces)
+            counts = self.zone_tracker.process_faces(people)
         else:
             counts = {
                 'total_entered': 0,
@@ -131,8 +131,8 @@ class Camera:
         else:
             processed_frame = frame.copy()

-        # Draw faces on frame
-        processed_frame = self.face_detector.draw_faces(processed_frame, faces)
+        # Draw people on frame
+        processed_frame = self.person_detector.draw_people(processed_frame, people)

         # Draw count information on frame
         text_y = 60
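For orientation, the flow that _process_frame() now implements is detect, then track, then draw. Below is a minimal standalone sketch of that same wiring, assuming the PersonDetector and ZoneTracker APIs shown in this diff; the capture loop, the window handling, and building the ZoneTracker from the first frame's width are illustrative assumptions, not code from this repository.

# Illustrative wiring sketch (not from this repo): detect -> track -> draw,
# mirroring what Camera._process_frame() does after this commit.
import cv2
from person_detector import PersonDetector
from zone_tracker import ZoneTracker

detector = PersonDetector(confidence_threshold=0.6)
tracker = None  # initialized after the first frame, as in Camera

cap = cv2.VideoCapture(0)
while True:
    ok, frame = cap.read()
    if not ok:
        break
    if tracker is None:
        # ZoneTracker takes the frame width as its first argument
        tracker = ZoneTracker(frame_width=frame.shape[1])
    people = detector.detect_people(frame)   # list of (x, y, w, h, confidence)
    counts = tracker.process_faces(people)   # method name kept for compatibility
    annotated = detector.draw_people(frame, people)
    cv2.imshow("people-counter", annotated)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break
cap.release()
cv2.destroyAllWindows()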

Model download script (rewritten as a cleanup script)

@@ -1,77 +1,27 @@
 #!/usr/bin/env python3
 """
-Script to download OpenCV DNN face detection model files.
-Downloads the required prototxt and caffemodel files for face detection.
+Cleanup script for People Counter.
+Removes unused model files since we switched to HOG detector.
 """
-import os
-import urllib.request
+import shutil
 from pathlib import Path

-def download_file(url, destination):
-    """Download a file from URL to destination."""
-    print(f"Downloading {os.path.basename(destination)}...")
-    try:
-        urllib.request.urlretrieve(url, destination)
-        print(f"✓ Successfully downloaded {os.path.basename(destination)}")
-        return True
-    except Exception as e:
-        print(f"✗ Error downloading {os.path.basename(destination)}: {e}")
-        return False

 def main():
-    """Main function to download model files."""
-    # Create models directory if it doesn't exist
+    print("Cleaning up unused model files...")
     models_dir = Path("models")
-    models_dir.mkdir(exist_ok=True)
-
-    # Model file URLs
-    prototxt_url = "https://raw.githubusercontent.com/opencv/opencv/master/samples/dnn/face_detector/deploy.prototxt"
-    model_url = "https://raw.githubusercontent.com/opencv/opencv_3rdparty/dnn_samples_face_detector_20170830/res10_300x300_ssd_iter_140000.caffemodel"
-
-    # Destination paths
-    prototxt_path = models_dir / "deploy.prototxt"
-    model_path = models_dir / "res10_300x300_ssd_iter_140000.caffemodel"
-
-    print("=" * 60)
-    print("OpenCV DNN Face Detection Model Downloader")
-    print("=" * 60)
-    print()
-
-    # Check if files already exist
-    if prototxt_path.exists():
-        print(f"{prototxt_path.name} already exists. Skipping download.")
+    if models_dir.exists():
+        try:
+            shutil.rmtree(models_dir)
+            print("✓ Removed models directory")
+        except Exception as e:
+            print(f"✗ Failed to remove models directory: {e}")
     else:
-        success = download_file(prototxt_url, prototxt_path)
-        if not success:
-            print("\nAlternative: You can manually download deploy.prototxt from:")
-            print("https://github.com/opencv/opencv/blob/master/samples/dnn/face_detector/deploy.prototxt")
-            print()
-
-    if model_path.exists():
-        print(f"{model_path.name} already exists. Skipping download.")
-    else:
-        success = download_file(model_url, model_path)
-        if not success:
-            print("\n⚠ Warning: The caffemodel file is large (~10MB) and may require manual download.")
-            print("Alternative download methods:")
-            print("1. Using wget:")
-            print(f"   wget -O {model_path} {model_url}")
-            print("2. Using curl:")
-            print(f"   curl -L -o {model_path} {model_url}")
-            print("3. Direct browser download:")
-            print(f"   {model_url}")
-            print()
-
-    print()
-    print("=" * 60)
-    if prototxt_path.exists() and model_path.exists():
-        print("✓ All model files are ready!")
-    else:
-        print("⚠ Some files may be missing. Please check the files above.")
-    print("=" * 60)
+        print("✓ No models directory to remove")
+
+    print("\nSystem ready for HOG-based person detection.")

 if __name__ == "__main__":
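The download step disappears entirely because the HOG detector ships inside OpenCV itself. A two-line check (a sketch, assuming any recent opencv-python build) confirms there is nothing left to fetch:

# Sanity check: the default people detector is bundled with OpenCV, no model files needed
import cv2

hog = cv2.HOGDescriptor()
hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
print("HOG people detector ready; SVM coefficients:", len(cv2.HOGDescriptor_getDefaultPeopleDetector()))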

face_detector.py (deleted)

@@ -1,118 +0,0 @@
"""
Face Detection Module using OpenCV DNN Face Detector
Uses pre-trained models for accurate face detection.
"""
import cv2
import numpy as np
import os
class FaceDetector:
def __init__(self, model_dir="models", confidence_threshold=0.5):
"""
Initialize the face detector with OpenCV DNN models.
Args:
model_dir: Directory containing the model files
confidence_threshold: Minimum confidence for face detection (0.0-1.0)
"""
self.confidence_threshold = confidence_threshold
self.model_dir = model_dir
# Paths to model files
self.prototxt_path = os.path.join(model_dir, "deploy.prototxt")
self.model_path = os.path.join(model_dir, "res10_300x300_ssd_iter_140000.caffemodel")
# Load the DNN face detector
self.net = None
self._load_model()
def _load_model(self):
"""Load the OpenCV DNN face detection model."""
if not os.path.exists(self.prototxt_path):
raise FileNotFoundError(
f"Model prototxt file not found: {self.prototxt_path}\n"
"Please download the model files first."
)
if not os.path.exists(self.model_path):
raise FileNotFoundError(
f"Model weights file not found: {self.model_path}\n"
"Please download the model files first."
)
self.net = cv2.dnn.readNetFromCaffe(self.prototxt_path, self.model_path)
def detect_faces(self, frame):
"""
Detect faces in a frame.
Args:
frame: BGR image frame from OpenCV
Returns:
List of tuples (x, y, w, h, confidence) for each detected face
where (x, y) is top-left corner, w and h are width and height
"""
if self.net is None:
return []
# Get frame dimensions
(h, w) = frame.shape[:2]
# Create blob from frame (preprocessing for DNN)
blob = cv2.dnn.blobFromImage(
cv2.resize(frame, (300, 300)),
1.0,
(300, 300),
(104.0, 177.0, 123.0)
)
# Pass blob through network
self.net.setInput(blob)
detections = self.net.forward()
faces = []
# Process detections
for i in range(0, detections.shape[2]):
confidence = detections[0, 0, i, 2]
# Filter weak detections
if confidence > self.confidence_threshold:
# Get bounding box coordinates
box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
(x1, y1, x2, y2) = box.astype("int")
# Ensure coordinates are within frame bounds
x1 = max(0, x1)
y1 = max(0, y1)
x2 = min(w, x2)
y2 = min(h, y2)
# Convert to (x, y, w, h) format
faces.append((x1, y1, x2 - x1, y2 - y1, confidence))
return faces
def draw_faces(self, frame, faces, color=(0, 255, 0), thickness=2):
"""
Draw bounding boxes around detected faces.
Args:
frame: Frame to draw on
faces: List of face detections from detect_faces()
color: BGR color tuple for bounding boxes
thickness: Line thickness
Returns:
Frame with bounding boxes drawn
"""
result_frame = frame.copy()
for (x, y, w, h, confidence) in faces:
cv2.rectangle(result_frame, (x, y), (x + w, y + h), color, thickness)
# Optionally draw confidence score
label = f"{confidence:.2f}"
cv2.putText(result_frame, label, (x, y - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
return result_frame

File diff suppressed because it is too large

person_detector.py (new file, 129 lines)

@@ -0,0 +1,129 @@
"""
Person Detection Module using OpenCV HOG (Histogram of Oriented Gradients).
Uses built-in OpenCV people detector - no external model files required.
"""
import cv2
import numpy as np
class PersonDetector:
def __init__(self, model_dir=None, confidence_threshold=0.6):
"""
Initialize the person detector with HOG descriptor.
Args:
model_dir: Ignored for HOG (kept for API compatibility)
confidence_threshold: Threshold for detection weights
"""
self.confidence_threshold = confidence_threshold
# Initialize HOG descriptor/person detector
self.hog = cv2.HOGDescriptor()
self.hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
print("Initialized HOG Person Detector")
def detect_people(self, frame):
"""
Detect people in a frame using HOG.
Args:
frame: BGR image frame from OpenCV
Returns:
List of tuples (x, y, w, h, confidence) for each detected person
"""
# Resize for faster processing (optional, but HOG is computationally expensive)
# Using a slightly smaller scale can speed things up significantly
scale = 1.0
if frame.shape[1] > 640:
scale = 640 / frame.shape[1]
frame_small = cv2.resize(frame, None, fx=scale, fy=scale)
else:
frame_small = frame
# Detect people
# winStride: step size in x and y
# padding: padding around the input
# scale: coefficient of the detection window increase
(rects, weights) = self.hog.detectMultiScale(
frame_small,
winStride=(4, 4),
padding=(8, 8),
scale=1.05,
hitThreshold=0.0 # Default
)
people = []
# Convert detected rectangles to our format
for i, (x, y, w, h) in enumerate(rects):
confidence = weights[i]
# HOG returns confidence scores, usually > 0.
# We can filter if needed.
check_conf = float(confidence) if isinstance(confidence, (float, np.float32, np.float64)) else float(confidence[0])
if check_conf > self.confidence_threshold:
# Scale back up if we resized
if scale != 1.0:
x = int(x / scale)
y = int(y / scale)
w = int(w / scale)
h = int(h / scale)
# Size filtering
# Ignore detections that are too small (noise) or too large (walls/windows)
# Assumes 640x480 or similar resolution
if w < 40 or w > 400 or h < 80 or h > 480:
continue
# Ensure coordinates are within frame bounds (simple clamp)
x = max(0, x)
y = max(0, y)
people.append((x, y, w, h, check_conf))
return people
detect_faces = detect_people # Alias for compatibility
def draw_people(self, frame, people, color=(0, 255, 0), thickness=2):
"""
Draw bounding boxes around detected people.
"""
result_frame = frame.copy()
for (x, y, w, h, confidence) in people:
cv2.rectangle(result_frame, (x, y), (x + w, y + h), color, thickness)
# Draw label
label = f"Person: {confidence:.2f}"
# Get label size
(label_w, label_h), baseline = cv2.getTextSize(
label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1
)
# Draw background rectangle for label
cv2.rectangle(
result_frame,
(x, y - label_h - 10),
(x + label_w, y),
color,
-1
)
# Draw text
cv2.putText(
result_frame,
label,
(x, y - 5),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(0, 0, 0),
1
)
return result_frame
draw_faces = draw_people # Alias for compatibility
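A quick way to exercise the new detector in isolation is a single-image smoke test. A minimal sketch, assuming a hypothetical test image at test.jpg; PersonDetector and its methods are used exactly as defined above.

# Smoke test for PersonDetector ("test.jpg" is a hypothetical path)
import cv2
from person_detector import PersonDetector

detector = PersonDetector(confidence_threshold=0.6)
frame = cv2.imread("test.jpg")  # any BGR image containing people
if frame is None:
    raise SystemExit("test.jpg not found")

people = detector.detect_people(frame)
print(f"Detected {len(people)} people")
for (x, y, w, h, conf) in people:
    print(f"  box=({x}, {y}, {w}, {h})  confidence={conf:.2f}")

cv2.imwrite("test_annotated.jpg", detector.draw_people(frame, people))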

zone_tracker.py

@@ -10,7 +10,7 @@ from collections import defaultdict
 class ZoneTracker:
     def __init__(self, frame_width, entry_zone_percent=0.4, exit_zone_percent=0.4,
-                 cooldown_seconds=2.0, center_buffer_percent=0.1):
+                 cooldown_seconds=4.0, center_buffer_percent=0.1):
         """
         Initialize the zone tracker.
@@ -39,12 +39,15 @@ class ZoneTracker:
         self.total_exited = 0

         # Track faces with timestamps to prevent double-counting
-        # Key: face_id (centroid hash), Value: (zone, timestamp)
+        # Key: face_id (unique ID), Value: {'centroid': (x, y), 'zone': zone, 'timestamp': time, 'size': (w, h)}
         self.tracked_faces = {}
         self.face_cooldowns = defaultdict(float)

         # Track last seen zone for each face (to detect zone transitions)
         self.last_zone = {}
+        # Unique face ID counter
+        self.next_face_id = 1

     def get_zone(self, face_x, face_w):
         """
@@ -75,24 +78,46 @@
         # In the middle zone (between entry/exit and center buffer)
         return None

-    def _get_face_id(self, face_x, face_y, face_w, face_h):
+    def _calculate_centroid(self, face_x, face_y, face_w, face_h):
+        """Calculate the centroid of a face bounding box."""
+        return (face_x + face_w // 2, face_y + face_h // 2)
+
+    def _calculate_distance(self, pt1, pt2):
+        """Calculate Euclidean distance between two points."""
+        return ((pt1[0] - pt2[0])**2 + (pt1[1] - pt2[1])**2)**0.5
+
+    def _match_face_to_tracked(self, centroid, size):
         """
-        Generate a simple ID for a face based on its position and size.
-        This is a basic approach - in production, use proper tracking algorithms.
+        Match a detected face to an existing tracked face based on proximity.

         Args:
-            face_x, face_y: Top-left coordinates
-            face_w, face_h: Width and height
+            centroid: (x, y) centroid of the detected face
+            size: (w, h) size of the detected face

         Returns:
-            A simple hash-like ID for tracking
+            face_id if matched, None if new face
         """
-        # Use approximate position and size to create a simple ID
-        # This helps group similar detections as the same person
-        grid_x = face_x // 50
-        grid_y = face_y // 50
-        size_category = (face_w + face_h) // 50
-        return f"{grid_x}_{grid_y}_{size_category}"
+        max_distance = 150  # Maximum pixel distance to consider it the same face
+        max_size_diff = 100  # Maximum size difference to consider it the same face
+
+        for face_id, face_data in self.tracked_faces.items():
+            # Skip if face hasn't been seen recently (within last 2 seconds)
+            time_since_seen = time.time() - face_data.get('timestamp', 0)
+            if time_since_seen > 2.0:
+                continue
+            tracked_centroid = face_data.get('centroid')
+            tracked_size = face_data.get('size', (0, 0))
+            if tracked_centroid:
+                distance = self._calculate_distance(centroid, tracked_centroid)
+                size_diff = abs(size[0] + size[1] - tracked_size[0] - tracked_size[1])
+                # Match if close enough in position and size
+                if distance < max_distance and size_diff < max_size_diff:
+                    return face_id
+        return None

     def process_faces(self, faces):
         """
@@ -110,24 +135,41 @@
         # Process each detected face
         for face in faces:
             face_x, face_y, face_w, face_h, confidence = face
-            face_id = self._get_face_id(face_x, face_y, face_w, face_h)
+            centroid = self._calculate_centroid(face_x, face_y, face_w, face_h)
             zone = self.get_zone(face_x, face_w)

             if zone is None or zone == 'center':
                 continue

+            # Try to match this face to an existing tracked face
+            face_id = self._match_face_to_tracked(centroid, (face_w, face_h))
+            if face_id is None:
+                # New face - assign a new ID
+                face_id = self.next_face_id
+                self.next_face_id += 1
+
             current_zones[face_id] = zone

+            # Update tracked face data
+            self.tracked_faces[face_id] = {
+                'centroid': centroid,
+                'zone': zone,
+                'timestamp': current_time,
+                'size': (face_w, face_h)
+            }
+
             # Check if this face is in cooldown
             if face_id in self.face_cooldowns:
                 if current_time - self.face_cooldowns[face_id] < self.cooldown_seconds:
-                    continue  # Still in cooldown, skip
+                    # Still in cooldown, update zone but don't count
+                    self.last_zone[face_id] = zone
+                    continue

             # Check for zone transitions or first detection
             if face_id not in self.last_zone:
                 # First time seeing this face - count if in entry/exit zone
                 self.last_zone[face_id] = zone
-                self.tracked_faces[face_id] = (zone, current_time)

                 # Count on first detection in entry/exit zones
                 if zone == 'entry':
@@ -155,14 +197,17 @@
                     self.total_exited += 1
                     self.face_cooldowns[face_id] = current_time
                 self.last_zone[face_id] = zone
+            else:
+                # Same zone or transition we don't care about - just update
+                self.last_zone[face_id] = zone

         # Clean up old tracking data for faces no longer detected
         faces_to_remove = []
-        for face_id in self.last_zone:
+        for face_id in list(self.last_zone.keys()):
             if face_id not in current_zones:
                 # Face no longer detected, but keep in memory for a bit
                 if face_id in self.tracked_faces:
-                    last_seen = self.tracked_faces[face_id][1]
+                    last_seen = self.tracked_faces[face_id].get('timestamp', 0)
                     if current_time - last_seen > 5.0:  # Remove after 5 seconds
                         faces_to_remove.append(face_id)
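To see what the new matcher buys over the old grid hash, consider one person drifting 40 px to the right between processed frames. A sketch, assuming ZoneTracker as defined in this diff and assuming a box near the left edge falls in the entry zone (entry_zone_percent defaults to 0.4); the detection tuples are fabricated for illustration.

# Illustrative only: the same person detected twice, 40 px apart
from zone_tracker import ZoneTracker

tracker = ZoneTracker(frame_width=640)

# Frame 1: box at x=40, size 60x120 -> new face ID, counted as an entry
tracker.process_faces([(40, 100, 60, 120, 0.9)])

# Frame 2: same box at x=80. The old grid hash (face_x // 50) jumps from 0 to 1,
# so the pre-change code would have minted a second ID and counted again.
# _match_face_to_tracked() reuses the ID instead: the centroid moved 40 px
# (< 150) with an unchanged size (< 100), and the cooldown suppresses a
# double count.
counts = tracker.process_faces([(80, 100, 60, 120, 0.9)])
print(counts['total_entered'])  # 1, not 2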