""" Flask Application for People Counter Web App Provides video streaming and real-time count updates via WebSocket. """ from flask import Flask, render_template, Response from flask_socketio import SocketIO, emit import time import threading from camera import Camera app = Flask(__name__) app.config['SECRET_KEY'] = 'people-counter-secret-key' socketio = SocketIO(app, cors_allowed_origins="*") # Global camera instance camera = None count_update_interval = 0.5 # Update counts every 0.5 seconds def initialize_camera(): """Initialize the camera.""" global camera try: camera = Camera(camera_index=0, process_every_n_frames=3) camera.start() print("Camera initialized successfully") return True except Exception as e: print(f"Failed to initialize camera: {e}") return False def count_update_thread(): """Background thread to periodically send count updates via WebSocket.""" while True: time.sleep(count_update_interval) if camera: counts = camera.get_counts() socketio.emit('count_update', counts) else: # Send zero counts if camera not available socketio.emit('count_update', { 'total_entered': 0, 'total_exited': 0, 'current_occupancy': 0 }) @app.route('/') def index(): """Serve the main page.""" return render_template('index.html') def generate_frames(): """Generator function for MJPEG video streaming.""" while True: if camera: frame = camera.get_frame() if frame: yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n') else: time.sleep(0.1) else: time.sleep(0.1) @app.route('/video_feed') def video_feed(): """Video streaming route.""" return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame') @socketio.on('connect') def handle_connect(): """Handle WebSocket connection.""" print('Client connected') if camera: counts = camera.get_counts() emit('count_update', counts) @socketio.on('disconnect') def handle_disconnect(): """Handle WebSocket disconnection.""" print('Client disconnected') @socketio.on('reset_counts') def handle_reset_counts(): """Handle reset counts request.""" if camera: camera.reset_counts() counts = camera.get_counts() emit('count_update', counts) emit('reset_confirmation', {'status': 'success'}) if __name__ == '__main__': # Initialize camera if initialize_camera(): # Start background thread for count updates update_thread = threading.Thread(target=count_update_thread, daemon=True) update_thread.start() # Run Flask app try: socketio.run(app, host='0.0.0.0', port=5000, debug=False, allow_unsafe_werkzeug=True) except KeyboardInterrupt: print("\nShutting down...") finally: if camera: camera.stop() else: print("Failed to initialize camera. Exiting.")