Compare commits

2 Commits

Author SHA1 Message Date
b68fa2614e feat: migrate from face detection to HOG person detection 2026-01-21 11:39:32 +01:00
cae56c40cc feat: improve face tracking and matching logic in ZoneTracker
- Introduce unique face ID generation and enhance face matching based on proximity and size
- Refactor face ID generation to use centroids and size for better accuracy
- Update tracked face data structure to include centroid, zone, timestamp, and size
- Improve comments for clarity on face tracking and matching processes
2026-01-20 00:54:57 +01:00
7 changed files with 219 additions and 2003 deletions

View File

@@ -6,7 +6,7 @@ Integrates face detection and zone tracking.
import cv2
import threading
import time
from face_detector import FaceDetector
from person_detector import PersonDetector
from zone_tracker import ZoneTracker
@@ -18,8 +18,8 @@ class Camera:
Args:
camera_index: Index of the USB camera (usually 0)
process_every_n_frames: Process face detection every N frames for performance
face_confidence: Confidence threshold for face detection
process_every_n_frames: Process detection every N frames for performance
face_confidence: Confidence threshold for person detection
frame_width: Desired frame width
frame_height: Desired frame height
"""
@@ -37,8 +37,8 @@ class Camera:
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, frame_width)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, frame_height)
# Initialize face detector and zone tracker
self.face_detector = FaceDetector(confidence_threshold=face_confidence)
# Initialize person detector and zone tracker
self.person_detector = PersonDetector(confidence_threshold=face_confidence)
self.zone_tracker = None # Will be initialized after first frame
# Frame processing state
@@ -95,7 +95,7 @@ class Camera:
with self.lock:
self.current_frame = frame.copy()
# Process face detection every N frames
# Process detection every N frames
if self.frame_counter % self.process_every_n_frames == 0:
processed_frame, counts = self._process_frame(frame)
with self.lock:
@@ -104,7 +104,7 @@ class Camera:
def _process_frame(self, frame):
"""
Process a single frame: detect faces, track zones, update counts.
Process a single frame: detect people, track zones, update counts.
Args:
frame: Input frame from camera
@@ -112,12 +112,12 @@ class Camera:
Returns:
Tuple of (processed_frame, counts_dict)
"""
# Detect faces
faces = self.face_detector.detect_faces(frame)
# Detect people
people = self.person_detector.detect_people(frame)
# Track zones and update counts
if self.zone_tracker:
counts = self.zone_tracker.process_faces(faces)
counts = self.zone_tracker.process_faces(people)
else:
counts = {
'total_entered': 0,
@@ -131,8 +131,8 @@ class Camera:
else:
processed_frame = frame.copy()
# Draw faces on frame
processed_frame = self.face_detector.draw_faces(processed_frame, faces)
# Draw people on frame
processed_frame = self.person_detector.draw_people(processed_frame, people)
# Draw count information on frame
text_y = 60

View File

@@ -1,77 +1,27 @@
#!/usr/bin/env python3
"""
Script to download OpenCV DNN face detection model files.
Downloads the required prototxt and caffemodel files for face detection.
Cleanup script for People Counter.
Removes unused model files since we switched to HOG detector.
"""
import os
import urllib.request
import shutil
from pathlib import Path
def download_file(url, destination):
"""Download a file from URL to destination."""
print(f"Downloading {os.path.basename(destination)}...")
try:
urllib.request.urlretrieve(url, destination)
print(f"✓ Successfully downloaded {os.path.basename(destination)}")
return True
except Exception as e:
print(f"✗ Error downloading {os.path.basename(destination)}: {e}")
return False
def main():
"""Main function to download model files."""
# Create models directory if it doesn't exist
print("Cleaning up unused model files...")
models_dir = Path("models")
models_dir.mkdir(exist_ok=True)
# Model file URLs
prototxt_url = "https://raw.githubusercontent.com/opencv/opencv/master/samples/dnn/face_detector/deploy.prototxt"
model_url = "https://raw.githubusercontent.com/opencv/opencv_3rdparty/dnn_samples_face_detector_20170830/res10_300x300_ssd_iter_140000.caffemodel"
# Destination paths
prototxt_path = models_dir / "deploy.prototxt"
model_path = models_dir / "res10_300x300_ssd_iter_140000.caffemodel"
print("=" * 60)
print("OpenCV DNN Face Detection Model Downloader")
print("=" * 60)
print()
# Check if files already exist
if prototxt_path.exists():
print(f"{prototxt_path.name} already exists. Skipping download.")
if models_dir.exists():
try:
shutil.rmtree(models_dir)
print("✓ Removed models directory")
except Exception as e:
print(f"✗ Failed to remove models directory: {e}")
else:
success = download_file(prototxt_url, prototxt_path)
if not success:
print("\nAlternative: You can manually download deploy.prototxt from:")
print("https://github.com/opencv/opencv/blob/master/samples/dnn/face_detector/deploy.prototxt")
print()
print("✓ No models directory to remove")
if model_path.exists():
print(f"{model_path.name} already exists. Skipping download.")
else:
success = download_file(model_url, model_path)
if not success:
print("\n⚠ Warning: The caffemodel file is large (~10MB) and may require manual download.")
print("Alternative download methods:")
print("1. Using wget:")
print(f" wget -O {model_path} {model_url}")
print("2. Using curl:")
print(f" curl -L -o {model_path} {model_url}")
print("3. Direct browser download:")
print(f" {model_url}")
print()
print()
print("=" * 60)
if prototxt_path.exists() and model_path.exists():
print("✓ All model files are ready!")
else:
print("⚠ Some files may be missing. Please check the files above.")
print("=" * 60)
print("\nSystem ready for HOG-based person detection.")
if __name__ == "__main__":

View File

@@ -1,118 +0,0 @@
"""
Face Detection Module using OpenCV DNN Face Detector
Uses pre-trained models for accurate face detection.
"""
import cv2
import numpy as np
import os
class FaceDetector:
def __init__(self, model_dir="models", confidence_threshold=0.5):
"""
Initialize the face detector with OpenCV DNN models.
Args:
model_dir: Directory containing the model files
confidence_threshold: Minimum confidence for face detection (0.0-1.0)
"""
self.confidence_threshold = confidence_threshold
self.model_dir = model_dir
# Paths to model files
self.prototxt_path = os.path.join(model_dir, "deploy.prototxt")
self.model_path = os.path.join(model_dir, "res10_300x300_ssd_iter_140000.caffemodel")
# Load the DNN face detector
self.net = None
self._load_model()
def _load_model(self):
"""Load the OpenCV DNN face detection model."""
if not os.path.exists(self.prototxt_path):
raise FileNotFoundError(
f"Model prototxt file not found: {self.prototxt_path}\n"
"Please download the model files first."
)
if not os.path.exists(self.model_path):
raise FileNotFoundError(
f"Model weights file not found: {self.model_path}\n"
"Please download the model files first."
)
self.net = cv2.dnn.readNetFromCaffe(self.prototxt_path, self.model_path)
def detect_faces(self, frame):
"""
Detect faces in a frame.
Args:
frame: BGR image frame from OpenCV
Returns:
List of tuples (x, y, w, h, confidence) for each detected face
where (x, y) is top-left corner, w and h are width and height
"""
if self.net is None:
return []
# Get frame dimensions
(h, w) = frame.shape[:2]
# Create blob from frame (preprocessing for DNN)
blob = cv2.dnn.blobFromImage(
cv2.resize(frame, (300, 300)),
1.0,
(300, 300),
(104.0, 177.0, 123.0)
)
# Pass blob through network
self.net.setInput(blob)
detections = self.net.forward()
faces = []
# Process detections
for i in range(0, detections.shape[2]):
confidence = detections[0, 0, i, 2]
# Filter weak detections
if confidence > self.confidence_threshold:
# Get bounding box coordinates
box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
(x1, y1, x2, y2) = box.astype("int")
# Ensure coordinates are within frame bounds
x1 = max(0, x1)
y1 = max(0, y1)
x2 = min(w, x2)
y2 = min(h, y2)
# Convert to (x, y, w, h) format
faces.append((x1, y1, x2 - x1, y2 - y1, confidence))
return faces
def draw_faces(self, frame, faces, color=(0, 255, 0), thickness=2):
"""
Draw bounding boxes around detected faces.
Args:
frame: Frame to draw on
faces: List of face detections from detect_faces()
color: BGR color tuple for bounding boxes
thickness: Line thickness
Returns:
Frame with bounding boxes drawn
"""
result_frame = frame.copy()
for (x, y, w, h, confidence) in faces:
cv2.rectangle(result_frame, (x, y), (x + w, y + h), color, thickness)
# Optionally draw confidence score
label = f"{confidence:.2f}"
cv2.putText(result_frame, label, (x, y - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
return result_frame

File diff suppressed because it is too large Load Diff

129
person_detector.py Normal file
View File

@@ -0,0 +1,129 @@
"""
Person Detection Module using OpenCV HOG (Histogram of Oriented Gradients).
Uses built-in OpenCV people detector - no external model files required.
"""
import cv2
import numpy as np
class PersonDetector:
def __init__(self, model_dir=None, confidence_threshold=0.6):
"""
Initialize the person detector with HOG descriptor.
Args:
model_dir: Ignored for HOG (kept for API compatibility)
confidence_threshold: Threshold for detection weights
"""
self.confidence_threshold = confidence_threshold
# Initialize HOG descriptor/person detector
self.hog = cv2.HOGDescriptor()
self.hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
print("Initialized HOG Person Detector")
def detect_people(self, frame):
"""
Detect people in a frame using HOG.
Args:
frame: BGR image frame from OpenCV
Returns:
List of tuples (x, y, w, h, confidence) for each detected person
"""
# Resize for faster processing (optional, but HOG is computationally expensive)
# Using a slightly smaller scale can speed things up significantly
scale = 1.0
if frame.shape[1] > 640:
scale = 640 / frame.shape[1]
frame_small = cv2.resize(frame, None, fx=scale, fy=scale)
else:
frame_small = frame
# Detect people
# winStride: step size in x and y
# padding: padding around the input
# scale: coefficient of the detection window increase
(rects, weights) = self.hog.detectMultiScale(
frame_small,
winStride=(4, 4),
padding=(8, 8),
scale=1.05,
hitThreshold=0.0 # Default
)
people = []
# Convert detected rectangles to our format
for i, (x, y, w, h) in enumerate(rects):
confidence = weights[i]
# HOG returns confidence scores, usually > 0.
# We can filter if needed.
check_conf = float(confidence) if isinstance(confidence, (float, np.float32, np.float64)) else float(confidence[0])
if check_conf > self.confidence_threshold:
# Scale back up if we resized
if scale != 1.0:
x = int(x / scale)
y = int(y / scale)
w = int(w / scale)
h = int(h / scale)
# Size filtering
# Ignore detections that are too small (noise) or too large (walls/windows)
# Assumes 640x480 or similar resolution
if w < 40 or w > 400 or h < 80 or h > 480:
continue
# Ensure coordinates are within frame bounds (simple clamp)
x = max(0, x)
y = max(0, y)
people.append((x, y, w, h, check_conf))
return people
detect_faces = detect_people # Alias for compatibility
def draw_people(self, frame, people, color=(0, 255, 0), thickness=2):
"""
Draw bounding boxes around detected people.
"""
result_frame = frame.copy()
for (x, y, w, h, confidence) in people:
cv2.rectangle(result_frame, (x, y), (x + w, y + h), color, thickness)
# Draw label
label = f"Person: {confidence:.2f}"
# Get label size
(label_w, label_h), baseline = cv2.getTextSize(
label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1
)
# Draw background rectangle for label
cv2.rectangle(
result_frame,
(x, y - label_h - 10),
(x + label_w, y),
color,
-1
)
# Draw text
cv2.putText(
result_frame,
label,
(x, y - 5),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(0, 0, 0),
1
)
return result_frame
draw_faces = draw_people # Alias for compatibility

View File

@@ -10,7 +10,7 @@ from collections import defaultdict
class ZoneTracker:
def __init__(self, frame_width, entry_zone_percent=0.4, exit_zone_percent=0.4,
cooldown_seconds=2.0, center_buffer_percent=0.1):
cooldown_seconds=4.0, center_buffer_percent=0.1):
"""
Initialize the zone tracker.
@@ -39,13 +39,16 @@ class ZoneTracker:
self.total_exited = 0
# Track faces with timestamps to prevent double-counting
# Key: face_id (centroid hash), Value: (zone, timestamp)
# Key: face_id (unique ID), Value: {'centroid': (x, y), 'zone': zone, 'timestamp': time, 'size': (w, h)}
self.tracked_faces = {}
self.face_cooldowns = defaultdict(float)
# Track last seen zone for each face (to detect zone transitions)
self.last_zone = {}
# Unique face ID counter
self.next_face_id = 1
def get_zone(self, face_x, face_w):
"""
Determine which zone a face is in based on its position.
@@ -75,24 +78,46 @@ class ZoneTracker:
# In the middle zone (between entry/exit and center buffer)
return None
def _get_face_id(self, face_x, face_y, face_w, face_h):
def _calculate_centroid(self, face_x, face_y, face_w, face_h):
"""Calculate the centroid of a face bounding box."""
return (face_x + face_w // 2, face_y + face_h // 2)
def _calculate_distance(self, pt1, pt2):
"""Calculate Euclidean distance between two points."""
return ((pt1[0] - pt2[0])**2 + (pt1[1] - pt2[1])**2)**0.5
def _match_face_to_tracked(self, centroid, size):
"""
Generate a simple ID for a face based on its position and size.
This is a basic approach - in production, use proper tracking algorithms.
Match a detected face to an existing tracked face based on proximity.
Args:
face_x, face_y: Top-left coordinates
face_w, face_h: Width and height
centroid: (x, y) centroid of the detected face
size: (w, h) size of the detected face
Returns:
A simple hash-like ID for tracking
face_id if matched, None if new face
"""
# Use approximate position and size to create a simple ID
# This helps group similar detections as the same person
grid_x = face_x // 50
grid_y = face_y // 50
size_category = (face_w + face_h) // 50
return f"{grid_x}_{grid_y}_{size_category}"
max_distance = 150 # Maximum pixel distance to consider it the same face
max_size_diff = 100 # Maximum size difference to consider it the same face
for face_id, face_data in self.tracked_faces.items():
# Skip if face hasn't been seen recently (within last 2 seconds)
time_since_seen = time.time() - face_data.get('timestamp', 0)
if time_since_seen > 2.0:
continue
tracked_centroid = face_data.get('centroid')
tracked_size = face_data.get('size', (0, 0))
if tracked_centroid:
distance = self._calculate_distance(centroid, tracked_centroid)
size_diff = abs(size[0] + size[1] - tracked_size[0] - tracked_size[1])
# Match if close enough in position and size
if distance < max_distance and size_diff < max_size_diff:
return face_id
return None
def process_faces(self, faces):
"""
@@ -110,24 +135,41 @@ class ZoneTracker:
# Process each detected face
for face in faces:
face_x, face_y, face_w, face_h, confidence = face
face_id = self._get_face_id(face_x, face_y, face_w, face_h)
centroid = self._calculate_centroid(face_x, face_y, face_w, face_h)
zone = self.get_zone(face_x, face_w)
if zone is None or zone == 'center':
continue
# Try to match this face to an existing tracked face
face_id = self._match_face_to_tracked(centroid, (face_w, face_h))
if face_id is None:
# New face - assign a new ID
face_id = self.next_face_id
self.next_face_id += 1
current_zones[face_id] = zone
# Update tracked face data
self.tracked_faces[face_id] = {
'centroid': centroid,
'zone': zone,
'timestamp': current_time,
'size': (face_w, face_h)
}
# Check if this face is in cooldown
if face_id in self.face_cooldowns:
if current_time - self.face_cooldowns[face_id] < self.cooldown_seconds:
continue # Still in cooldown, skip
# Still in cooldown, update zone but don't count
self.last_zone[face_id] = zone
continue
# Check for zone transitions or first detection
if face_id not in self.last_zone:
# First time seeing this face - count if in entry/exit zone
self.last_zone[face_id] = zone
self.tracked_faces[face_id] = (zone, current_time)
# Count on first detection in entry/exit zones
if zone == 'entry':
@@ -155,14 +197,17 @@ class ZoneTracker:
self.total_exited += 1
self.face_cooldowns[face_id] = current_time
self.last_zone[face_id] = zone
else:
# Same zone or transition we don't care about - just update
self.last_zone[face_id] = zone
# Clean up old tracking data for faces no longer detected
faces_to_remove = []
for face_id in self.last_zone:
for face_id in list(self.last_zone.keys()):
if face_id not in current_zones:
# Face no longer detected, but keep in memory for a bit
if face_id in self.tracked_faces:
last_seen = self.tracked_faces[face_id][1]
last_seen = self.tracked_faces[face_id].get('timestamp', 0)
if current_time - last_seen > 5.0: # Remove after 5 seconds
faces_to_remove.append(face_id)