"""Zone-based counting and dwell-time tracking for video frames.

Detections come from an Ultralytics YOLO model, are tracked with
supervision's ByteTrack, and are then tested against user-defined polygon
zones. For every zone the monitor reports the current count and the time each
tracked ID has accumulated inside the zone.
"""

import logging
import os
import time
from typing import Dict, List, Optional, Tuple

import cv2
import numpy as np
import supervision as sv
from ultralytics import YOLO

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class QueueMonitor:
    """Count tracked detections inside polygon zones and time their stay.

    Typical use: construct, call :meth:`setup_zones` once, then call
    :meth:`process_frame` for every frame in order. Time is derived from the
    frame counter and ``fps``, not from the wall clock.
    """

    def __init__(
        self,
        weights: str = "yolov8s.pt",
        confidence: float = 0.3,
        fps: float = 30.0,
        hf_token: Optional[str] = None,
    ):
        """Load the model and create the tracker and annotators.

        Args:
            weights: Path or name of the YOLO weights passed to ``YOLO``.
            confidence: Confidence threshold forwarded to the model as
                ``conf`` on every inference call.
            fps: Frame rate used to convert frame counts into seconds. When
                it is not positive, each frame counts as one time unit.
            hf_token: Optional token; falls back to the ``HF_TOKEN``
                environment variable. It is only stored on the instance;
                nothing in this file consumes it.

        Raises:
            Exception: Any error raised while building the model, tracker or
                annotators is logged and re-raised unchanged.
        """
        try:
            if hf_token is None:
                hf_token = os.getenv("HF_TOKEN")
            self.hf_token = hf_token

            self.model = YOLO(weights)
            self.tracker = sv.ByteTrack()
            self.confidence = confidence
            self.fps = fps
            self.frame_count = 0

            self.colors = sv.ColorPalette.from_hex(
                ["#E6194B", "#3CB44B", "#FFE119", "#3C76D1"]
            )
            self.color_annotator = sv.ColorAnnotator(color=self.colors)
            self.label_annotator = sv.LabelAnnotator(
                color=self.colors,
                text_color=sv.Color.from_hex("#000000"),
            )

            self.zones = []
            self.zone_annotators = []
            # zone index -> {tracker_id -> {"start_time", "total_time", "visits"}}
            # Created here (not only in setup_zones) so the attribute always exists.
            self.time_tracking = {}
            # Not read anywhere in this file; kept so external code that
            # touches the attribute keeps working.
            self.time_in_zone_trackers = {}

            logger.info(
                "QueueMonitor initialized with model: %s, confidence: %s",
                weights,
                confidence,
            )
        except Exception as e:
            logger.error("Failed to initialize QueueMonitor: %s", e)
            raise

    def setup_zones(self, polygons: List[np.ndarray]):
        """Replace the configured zones with one zone per polygon.

        The new configuration is built completely before it is stored, so a
        validation failure leaves the previous zones (and their accumulated
        dwell times) untouched.

        Args:
            polygons: Non-empty sequence of polygons, each an array-like of
                shape ``(N, 2)`` with ``N >= 3`` vertices.

        Raises:
            ValueError: If no polygon is given, a polygon has fewer than
                three points, or a polygon is not of shape ``(N, 2)``.
        """
        try:
            # `not polygons` would raise for ndarray input, so test explicitly.
            if polygons is None or len(polygons) == 0:
                raise ValueError("At least one zone polygon is required")

            zones = []
            zone_annotators = []
            time_tracking = {}
            for idx, polygon in enumerate(polygons):
                polygon = np.asarray(polygon)
                if polygon.ndim == 0 or polygon.shape[0] < 3:
                    raise ValueError(
                        f"Zone {idx} polygon must have at least 3 points"
                    )
                if polygon.ndim != 2 or polygon.shape[1] != 2:
                    raise ValueError(
                        f"Zone {idx} polygon must have shape (N, 2)"
                    )

                zone = sv.PolygonZone(
                    polygon=polygon,
                    triggering_anchors=(sv.Position.CENTER,),
                )
                zones.append(zone)
                zone_annotators.append(
                    sv.PolygonZoneAnnotator(
                        zone=zone,
                        color=self.colors.by_idx(idx),
                        thickness=2,
                        text_thickness=2,
                        text_scale=0.5,
                    )
                )
                time_tracking[idx] = {}

            # Commit only after every polygon has been validated and built.
            self.zones = zones
            self.zone_annotators = zone_annotators
            self.time_tracking = time_tracking
            logger.info("Setup %d zones successfully", len(self.zones))
        except Exception as e:
            logger.error("Failed to setup zones: %s", e)
            raise

    @staticmethod
    def _empty_zone_stats(idx: int) -> Dict:
        """Return a zeroed stats entry for zone ``idx``."""
        return {
            "zone_id": idx,
            "count": 0,
            "tracker_ids": [],
            "time_in_zone_seconds": {},
            "avg_time_seconds": 0.0,
            "max_time_seconds": 0.0,
            "total_visits": 0,
        }

    def _update_zone_stats(
        self,
        idx: int,
        detections_in_zone,
        current_time: float,
        frame_time: float,
    ) -> Dict:
        """Advance the dwell-time records of zone ``idx`` and build its stats."""
        if detections_in_zone.tracker_id is not None:
            tracker_ids = detections_in_zone.tracker_id.tolist()
        else:
            tracker_ids = []
        present = set(tracker_ids)
        records = self.time_tracking[idx]

        for tid in present:
            record = records.get(tid)
            if record is None:
                # First sighting contributes no time; later frames add one
                # frame duration each. "visits" is never incremented in this
                # file, so it stays 1 per tracked ID.
                records[tid] = {
                    "start_time": current_time,
                    "total_time": 0.0,
                    "visits": 1,
                }
            else:
                record["total_time"] += frame_time

        # IDs that left without accumulating any time are dropped; IDs with
        # accumulated time are retained and keep contributing to avg/max.
        stale = [
            tid
            for tid, record in records.items()
            if tid not in present and record["total_time"] <= 0
        ]
        for tid in stale:
            del records[tid]

        time_data = {
            str(tid): round(records[tid]["total_time"], 2)
            for tid in tracker_ids
            if tid in records
        }
        time_values = [record["total_time"] for record in records.values()]
        avg_time = float(np.mean(time_values)) if time_values else 0.0
        max_time = max(time_values) if time_values else 0.0
        total_visits = sum(record.get("visits", 1) for record in records.values())

        return {
            "zone_id": idx,
            "count": len(detections_in_zone),
            "tracker_ids": tracker_ids,
            "time_in_zone_seconds": time_data,
            "avg_time_seconds": round(avg_time, 2),
            "max_time_seconds": round(max_time, 2),
            "total_visits": total_visits,
        }

    def _annotate_zone_detections(
        self,
        scene: np.ndarray,
        detections_in_zone,
        idx: int,
        time_data: Dict,
    ) -> np.ndarray:
        """Draw colour overlays and ``#id (time)`` labels for one zone."""
        if len(detections_in_zone) == 0:
            return scene

        # Every detection in this zone is drawn with the zone's palette index.
        color_lookup = np.full(detections_in_zone.class_id.shape, idx)
        scene = self.color_annotator.annotate(
            scene=scene,
            detections=detections_in_zone,
            custom_color_lookup=color_lookup,
        )

        if detections_in_zone.tracker_id is None:
            return scene

        labels = []
        for tid in detections_in_zone.tracker_id.tolist():
            key = str(tid)
            if key in time_data:
                labels.append(f"#{tid} ({time_data[key]:.1f}s)")
            else:
                labels.append(f"#{tid}")

        return self.label_annotator.annotate(
            scene=scene,
            detections=detections_in_zone,
            labels=labels,
            custom_color_lookup=color_lookup,
        )

    def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]:
        """Detect, track and annotate one frame, returning per-zone stats.

        Only detections with ``class_id == 0`` are kept (presumably the
        "person" class of COCO-trained weights; confirm for custom weights).

        Args:
            frame: Image array to process; it is copied before drawing.

        Returns:
            A tuple ``(annotated_frame, zone_stats)``. ``zone_stats`` holds
            exactly one dict per configured zone, in zone order, with keys
            ``zone_id``, ``count``, ``tracker_ids``, ``time_in_zone_seconds``,
            ``avg_time_seconds``, ``max_time_seconds`` and ``total_visits``.
            If a zone fails, its entry also carries an ``error`` message; the
            other zones are still processed.

        Raises:
            ValueError: If the frame is ``None`` or empty, or no zones have
                been configured.
            Exception: Errors from the model or tracker are logged and
                re-raised.
        """
        try:
            if frame is None or frame.size == 0:
                raise ValueError("Invalid frame: frame is None or empty")
            if len(self.zones) == 0:
                raise ValueError("No zones configured. Please setup zones first.")

            self.frame_count += 1
            if self.fps > 0:
                current_time = self.frame_count / self.fps
                frame_time = 1.0 / self.fps
            else:
                current_time = self.frame_count
                frame_time = 1.0

            results = self.model(frame, verbose=False, conf=self.confidence)[0]
            detections = sv.Detections.from_ultralytics(results)
            detections = detections[detections.class_id == 0]
            # The tracker is updated even with no detections so that its
            # internal state advances every frame.
            detections = self.tracker.update_with_detections(detections)

            annotated_frame = frame.copy()
            zone_stats = []
            for idx, (zone, zone_annotator) in enumerate(
                zip(self.zones, self.zone_annotators)
            ):
                entry = None
                try:
                    annotated_frame = zone_annotator.annotate(scene=annotated_frame)
                    detections_in_zone = detections[zone.trigger(detections)]
                    entry = self._update_zone_stats(
                        idx, detections_in_zone, current_time, frame_time
                    )
                    annotated_frame = self._annotate_zone_detections(
                        annotated_frame,
                        detections_in_zone,
                        idx,
                        entry["time_in_zone_seconds"],
                    )
                except Exception as e:
                    logger.warning("Error processing zone %s: %s", idx, e)
                    # Keep stats that were already computed; never emit a
                    # second entry for the same zone.
                    if entry is None:
                        entry = self._empty_zone_stats(idx)
                    entry["error"] = str(e)
                zone_stats.append(entry)

            return annotated_frame, zone_stats
        except Exception as e:
            logger.error("Error processing frame: %s", e)
            raise


def main() -> None:
    """Run the monitor once on a blank frame with one rectangular zone."""
    # Example usage with a dummy frame
    monitor = QueueMonitor()
    dummy_frame = np.zeros((720, 1280, 3), dtype=np.uint8)
    # Define a simple rectangular zone
    polygon = np.array([[100, 100], [600, 100], [600, 600], [100, 600]])
    monitor.setup_zones([polygon])
    processed, stats = monitor.process_frame(dummy_frame)
    print(f"Stats: {stats}")


if __name__ == "__main__":
    main()