# ai-queue-management / queue_monitor.py
# Agent
# Initial commit: AI Queue Management System
# ad1bda5
import cv2
import numpy as np
from ultralytics import YOLO
import supervision as sv
from typing import List, Dict, Optional, Tuple
import time
import logging
import os
# Configure the root logger at INFO level as an import-time side effect of
# this module, then create the module-level logger used by QueueMonitor.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class QueueMonitor:
    """Detect and track objects of class id 0 and report per-zone queue statistics.

    Each frame is run through a YOLO model, filtered to class id 0
    (presumably "person" for COCO-trained weights -- confirm for custom
    weights), tracked with ByteTrack, and then tested against every
    configured polygon zone. For each zone the monitor keeps a per-tracker
    dwell-time record and returns counts and timing statistics.

    Attributes:
        zones: Configured ``sv.PolygonZone`` objects, in setup order.
        zone_annotators: One ``sv.PolygonZoneAnnotator`` per zone.
        time_tracking: ``{zone_index: {tracker_id: record}}`` where each
            record holds ``start_time``, ``total_time`` and ``visits``.
        time_in_zone_trackers: Legacy attribute; created for backward
            compatibility but not read by this class.
    """

    def __init__(
        self,
        weights: str = "yolov8s.pt",
        confidence: float = 0.3,
        fps: float = 30.0,
        hf_token: Optional[str] = None,
    ):
        """Load the detection model and create the tracker and annotators.

        Args:
            weights: Model weights identifier passed to ``YOLO``.
            confidence: Confidence threshold passed to the model on each frame.
            fps: Frame rate used to convert frame counts into seconds.
            hf_token: Optional token; falls back to the ``HF_TOKEN``
                environment variable. It is only stored on the instance here.

        Raises:
            Exception: Any initialization failure is logged and re-raised.
        """
        try:
            if hf_token is None:
                hf_token = os.getenv("HF_TOKEN")
            self.hf_token = hf_token
            self.model = YOLO(weights)
            self.tracker = sv.ByteTrack()
            self.confidence = confidence
            self.fps = fps
            self.frame_count = 0
            self.colors = sv.ColorPalette.from_hex(["#E6194B", "#3CB44B", "#FFE119", "#3C76D1"])
            self.color_annotator = sv.ColorAnnotator(color=self.colors)
            self.label_annotator = sv.LabelAnnotator(
                color=self.colors, text_color=sv.Color.from_hex("#000000")
            )
            self.zones = []
            self.zone_annotators = []
            # This is the attribute setup_zones/process_frame actually use;
            # define it here so it always exists on a constructed instance.
            self.time_tracking = {}
            # Kept only so external code that reads the old name keeps working.
            self.time_in_zone_trackers = {}
            logger.info(f"QueueMonitor initialized with model: {weights}, confidence: {confidence}")
        except Exception as e:
            logger.error(f"Failed to initialize QueueMonitor: {e}")
            raise

    def setup_zones(self, polygons: List[np.ndarray]):
        """Replace the configured zones with one zone per polygon.

        The new configuration is built completely before it is installed, so
        a validation failure leaves the previous configuration untouched.

        Args:
            polygons: Non-empty sequence of polygons, each a sequence/array
                of at least three points.

        Raises:
            ValueError: If no polygons are given or a polygon has fewer than
                three points.
            Exception: Any other failure is logged and re-raised.
        """
        try:
            # ``len`` works for lists, tuples and arrays alike, whereas
            # ``not polygons`` is ambiguous for numpy arrays.
            if polygons is None or len(polygons) == 0:
                raise ValueError("At least one zone polygon is required")

            new_zones = []
            new_annotators = []
            new_tracking = {}
            for idx, polygon in enumerate(polygons):
                polygon = np.asarray(polygon)  # also accept plain lists of points
                if polygon.ndim < 2 or polygon.shape[0] < 3:
                    raise ValueError(f"Zone {idx} polygon must have at least 3 points")
                zone = sv.PolygonZone(
                    polygon=polygon,
                    triggering_anchors=(sv.Position.CENTER,),
                )
                new_zones.append(zone)
                new_annotators.append(
                    sv.PolygonZoneAnnotator(
                        zone=zone,
                        color=self.colors.by_idx(idx),
                        thickness=2,
                        text_thickness=2,
                        text_scale=0.5,
                    )
                )
                new_tracking[idx] = {}

            # Commit only after every polygon validated and built successfully.
            self.zones = new_zones
            self.zone_annotators = new_annotators
            self.time_tracking = new_tracking
            logger.info(f"Setup {len(self.zones)} zones successfully")
        except Exception as e:
            logger.error(f"Failed to setup zones: {e}")
            raise

    @staticmethod
    def _update_dwell_times(
        zone_time_tracking: Dict,
        tracker_ids: List,
        current_time: float,
        frame_time: float,
    ) -> Dict[str, float]:
        """Advance one zone's dwell-time records by one frame (in place).

        A tracker seen for the first time gets a record with zero dwell time;
        a tracker already known gains ``frame_time``. Records of trackers that
        are absent and never accumulated any time are dropped. ``visits`` is
        set to 1 on creation and not changed afterwards, so re-entries are not
        counted separately.

        Returns:
            ``{str(tracker_id): dwell_seconds_rounded_to_2dp}`` for the
            trackers currently in the zone.
        """
        present = set(tracker_ids)
        for tid in present:
            record = zone_time_tracking.get(tid)
            if record is None:
                zone_time_tracking[tid] = {
                    "start_time": current_time,
                    "total_time": 0.0,
                    "visits": 1,
                }
            else:
                record["total_time"] += frame_time

        # Iterate over a snapshot because entries are deleted while scanning.
        for tid in list(zone_time_tracking):
            if tid not in present and zone_time_tracking[tid]["total_time"] <= 0:
                del zone_time_tracking[tid]

        return {
            str(tid): round(zone_time_tracking[tid]["total_time"], 2)
            for tid in tracker_ids
            if tid in zone_time_tracking
        }

    @staticmethod
    def _build_zone_stats(
        idx: int,
        count: int,
        tracker_ids: List,
        time_data: Dict[str, float],
        zone_time_tracking: Dict,
    ) -> Dict:
        """Assemble the statistics dict for one zone.

        Average, maximum and visit totals cover every record still held for
        the zone, including trackers that are not in the zone this frame.
        """
        time_values = [record["total_time"] for record in zone_time_tracking.values()]
        avg_time = float(np.mean(time_values)) if time_values else 0.0
        max_time = max(time_values) if time_values else 0.0
        total_visits = sum(record.get("visits", 1) for record in zone_time_tracking.values())
        return {
            "zone_id": idx,
            "count": count,
            "tracker_ids": tracker_ids,
            "time_in_zone_seconds": time_data,
            "avg_time_seconds": round(avg_time, 2),
            "max_time_seconds": round(max_time, 2),
            "total_visits": total_visits,
        }

    @staticmethod
    def _error_zone_stats(idx: int, error: Exception) -> Dict:
        """Return a zeroed statistics dict carrying the error text for a zone."""
        return {
            "zone_id": idx,
            "count": 0,
            "tracker_ids": [],
            "time_in_zone_seconds": {},
            "avg_time_seconds": 0.0,
            "max_time_seconds": 0.0,
            "total_visits": 0,
            "error": str(error),
        }

    def _annotate_zone(self, scene, idx, zone_annotator, detections_in_zone, tracker_ids, time_data):
        """Draw one zone's outline plus color overlays and labels for its detections."""
        scene = zone_annotator.annotate(scene=scene)
        if len(detections_in_zone) == 0:
            return scene

        # Color every detection in this zone with the zone's palette index.
        custom_color_lookup = np.full(detections_in_zone.class_id.shape, idx)
        scene = self.color_annotator.annotate(
            scene=scene,
            detections=detections_in_zone,
            custom_color_lookup=custom_color_lookup,
        )
        if detections_in_zone.tracker_id is not None:
            # Every tracker currently in the zone has an entry in time_data.
            labels = [f"#{tid} ({time_data.get(str(tid), 0):.1f}s)" for tid in tracker_ids]
            scene = self.label_annotator.annotate(
                scene=scene,
                detections=detections_in_zone,
                labels=labels,
                custom_color_lookup=custom_color_lookup,
            )
        return scene

    def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]:
        """Detect, track and measure queue statistics for a single frame.

        Args:
            frame: Image array to analyse; it is copied before drawing.

        Returns:
            ``(annotated_frame, zone_stats)`` where ``zone_stats`` holds
            exactly one dict per configured zone with the keys ``zone_id``,
            ``count``, ``tracker_ids``, ``time_in_zone_seconds``,
            ``avg_time_seconds``, ``max_time_seconds`` and ``total_visits``.
            An ``error`` key is added when processing or drawing that zone
            failed.

        Raises:
            ValueError: If the frame is ``None``/empty or no zones are set up.
            Exception: Detection or tracking failures are logged and re-raised.
        """
        try:
            if frame is None or frame.size == 0:
                raise ValueError("Invalid frame: frame is None or empty")
            if len(self.zones) == 0:
                raise ValueError("No zones configured. Please setup zones first.")

            self.frame_count += 1
            # Time is derived from the frame counter; with a non-positive fps
            # each frame counts as one time unit.
            if self.fps > 0:
                current_time = self.frame_count / self.fps
                frame_time = 1.0 / self.fps
            else:
                current_time = self.frame_count
                frame_time = 1.0

            results = self.model(frame, verbose=False, conf=self.confidence)[0]
            detections = sv.Detections.from_ultralytics(results)
            detections = detections[detections.class_id == 0]
            # The tracker is updated on every frame, including empty ones.
            detections = self.tracker.update_with_detections(detections)

            annotated_frame = frame.copy()
            zone_stats = []
            for idx, (zone, zone_annotator) in enumerate(zip(self.zones, self.zone_annotators)):
                try:
                    # Trigger before drawing so the zone's own counter is
                    # current when the annotator renders it (per supervision's
                    # documented usage order).
                    detections_in_zone = detections[zone.trigger(detections)]
                    if detections_in_zone.tracker_id is not None:
                        tracker_ids = detections_in_zone.tracker_id.tolist()
                    else:
                        tracker_ids = []
                    time_data = self._update_dwell_times(
                        self.time_tracking[idx], tracker_ids, current_time, frame_time
                    )
                    stats = self._build_zone_stats(
                        idx,
                        len(detections_in_zone),
                        tracker_ids,
                        time_data,
                        self.time_tracking[idx],
                    )
                except Exception as e:
                    logger.warning(f"Error processing zone {idx}: {e}")
                    zone_stats.append(self._error_zone_stats(idx, e))
                    continue

                # Drawing is handled separately so a rendering failure cannot
                # produce a second stats entry for the same zone.
                try:
                    annotated_frame = self._annotate_zone(
                        annotated_frame,
                        idx,
                        zone_annotator,
                        detections_in_zone,
                        tracker_ids,
                        time_data,
                    )
                except Exception as e:
                    logger.warning(f"Error processing zone {idx}: {e}")
                    stats["error"] = str(e)
                zone_stats.append(stats)

            return annotated_frame, zone_stats
        except Exception as e:
            logger.error(f"Error processing frame: {e}")
            raise
if __name__ == "__main__":
    # Smoke test: push one all-black frame through a monitor configured with
    # a single square zone and print the resulting per-zone statistics.
    queue_monitor = QueueMonitor()

    frame_height, frame_width = 720, 1280
    blank_frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8)

    # Square zone described by its four corner points.
    square_zone = np.array([[100, 100], [600, 100], [600, 600], [100, 600]])
    queue_monitor.setup_zones([square_zone])

    # The annotated image is not needed for this demo, only the statistics.
    _, zone_stats = queue_monitor.process_frame(blank_frame)
    print(f"Stats: {zone_stats}")