Spaces:
Sleeping
Sleeping
File size: 8,559 Bytes
ad1bda5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
import cv2
import numpy as np
from ultralytics import YOLO
import supervision as sv
from typing import List, Dict, Optional, Tuple
import time
import logging
import os
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class QueueMonitor:
    """Detect, track and time objects of class id 0 inside polygon zones.

    Each frame is run through a YOLO model, detections are filtered to class
    id 0 (the "person" class for COCO-trained weights such as the default
    ``yolov8s.pt`` -- confirm for custom weights), tracked with ByteTrack and
    tested against every configured polygon zone. Per-zone dwell times are
    accumulated from the frame counter and the configured ``fps``, not from
    wall-clock time.
    """

    def __init__(
        self,
        weights: str = "yolov8s.pt",
        confidence: float = 0.3,
        fps: float = 30.0,
        hf_token: Optional[str] = None,
    ):
        """Load the model and create the tracker and annotators.

        Args:
            weights: Path or name of the YOLO weights passed to ``YOLO``.
            confidence: Confidence threshold passed to the model per frame.
            fps: Frame rate used to convert frame counts into seconds.
            hf_token: Optional token; falls back to the ``HF_TOKEN``
                environment variable. It is only stored on the instance.

        Raises:
            Exception: Any initialization failure is logged and re-raised.
        """
        try:
            if hf_token is None:
                hf_token = os.getenv("HF_TOKEN")
            self.hf_token = hf_token
            self.model = YOLO(weights)
            self.tracker = sv.ByteTrack()
            self.confidence = confidence
            self.fps = fps
            self.frame_count = 0
            self.colors = sv.ColorPalette.from_hex(["#E6194B", "#3CB44B", "#FFE119", "#3C76D1"])
            self.color_annotator = sv.ColorAnnotator(color=self.colors)
            self.label_annotator = sv.LabelAnnotator(
                color=self.colors, text_color=sv.Color.from_hex("#000000")
            )
            self.zones = []
            self.zone_annotators = []
            # Not read anywhere in this class; kept so external code that
            # accesses the attribute keeps working.
            self.time_in_zone_trackers = {}
            # zone index -> {tracker id -> {"start_time", "total_time", "visits"}}
            self.time_tracking = {}
            logger.info(f"QueueMonitor initialized with model: {weights}, confidence: {confidence}")
        except Exception as e:
            logger.error(f"Failed to initialize QueueMonitor: {e}")
            raise

    def setup_zones(self, polygons: List[np.ndarray]):
        """Replace the configured zones with one zone per polygon.

        The new configuration is built completely before it is installed, so
        a failing call leaves the previously configured zones untouched.

        Args:
            polygons: Non-empty sequence of polygons, each with at least
                three points along its first axis.

        Raises:
            ValueError: If ``polygons`` is empty or a polygon has fewer than
                three points.
            Exception: Any other failure is logged and re-raised.
        """
        try:
            if polygons is None or len(polygons) == 0:
                raise ValueError("At least one zone polygon is required")
            zones = []
            zone_annotators = []
            time_tracking = {}
            for idx, polygon in enumerate(polygons):
                # Accept plain nested lists as well as arrays.
                polygon = np.asarray(polygon)
                if polygon.shape[0] < 3:
                    raise ValueError(f"Zone {idx} polygon must have at least 3 points")
                zone = sv.PolygonZone(
                    polygon=polygon,
                    triggering_anchors=(sv.Position.CENTER,),
                )
                zones.append(zone)
                zone_annotators.append(
                    sv.PolygonZoneAnnotator(
                        zone=zone,
                        color=self.colors.by_idx(idx),
                        thickness=2,
                        text_thickness=2,
                        text_scale=0.5,
                    )
                )
                time_tracking[idx] = {}
            # Install everything at once now that every polygon validated.
            self.zones = zones
            self.zone_annotators = zone_annotators
            self.time_tracking = time_tracking
            logger.info(f"Setup {len(self.zones)} zones successfully")
        except Exception as e:
            logger.error(f"Failed to setup zones: {e}")
            raise

    def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]:
        """Detect, track and annotate one frame and report per-zone stats.

        Args:
            frame: Image array to process.

        Returns:
            A tuple ``(annotated_frame, zone_stats)``. ``zone_stats`` holds
            exactly one dict per configured zone with the keys ``zone_id``,
            ``count``, ``tracker_ids``, ``time_in_zone_seconds``,
            ``avg_time_seconds``, ``max_time_seconds`` and ``total_visits``,
            plus ``error`` when processing that zone raised.

        Raises:
            ValueError: If the frame is ``None``/empty or no zones are set up.
            Exception: Model or tracker failures are logged and re-raised.
        """
        try:
            if frame is None or frame.size == 0:
                raise ValueError("Invalid frame: frame is None or empty")
            if len(self.zones) == 0:
                raise ValueError("No zones configured. Please setup zones first.")
            self.frame_count += 1
            # With a non-positive fps, fall back to counting in frames.
            if self.fps > 0:
                current_time = self.frame_count / self.fps
                frame_time = 1.0 / self.fps
            else:
                current_time = self.frame_count
                frame_time = 1.0
            results = self.model(frame, verbose=False, conf=self.confidence)[0]
            detections = sv.Detections.from_ultralytics(results)
            detections = detections[detections.class_id == 0]
            # The tracker is updated on every frame, including empty ones.
            detections = self.tracker.update_with_detections(detections)
            annotated_frame = frame.copy()
            zone_stats = []
            for idx, (zone, zone_annotator) in enumerate(zip(self.zones, self.zone_annotators)):
                stats = None
                try:
                    annotated_frame = zone_annotator.annotate(scene=annotated_frame)
                    detections_in_zone = detections[zone.trigger(detections)]
                    if detections_in_zone.tracker_id is not None:
                        tracker_ids = detections_in_zone.tracker_id.tolist()
                    else:
                        tracker_ids = []
                    time_data = self._update_dwell_times(idx, tracker_ids, current_time, frame_time)
                    stats = self._build_zone_stats(idx, len(detections_in_zone), tracker_ids, time_data)
                    annotated_frame = self._annotate_detections(
                        annotated_frame, detections_in_zone, idx, time_data
                    )
                except Exception as e:
                    # Best effort: one failing zone must not abort the frame.
                    logger.warning(f"Error processing zone {idx}: {e}")
                    if stats is None:
                        stats = {
                            "zone_id": idx,
                            "count": 0,
                            "tracker_ids": [],
                            "time_in_zone_seconds": {},
                            "avg_time_seconds": 0.0,
                            "max_time_seconds": 0.0,
                            "total_visits": 0,
                        }
                    # If only the drawing step failed, keep the computed
                    # stats and flag them instead of adding a second record.
                    stats["error"] = str(e)
                zone_stats.append(stats)
            return annotated_frame, zone_stats
        except Exception as e:
            logger.error(f"Error processing frame: {e}")
            raise

    def _update_dwell_times(self, zone_idx: int, tracker_ids: List[int],
                            current_time: float, frame_time: float) -> Dict[str, float]:
        """Advance the dwell-time records of one zone by a single frame.

        Returns a mapping of ``str(tracker_id)`` to accumulated seconds
        (rounded to 2 decimals) for the trackers currently in the zone.
        """
        records = self.time_tracking[zone_idx]
        present = set(tracker_ids)
        for tid in present:
            record = records.get(tid)
            if record is None:
                # First sighting starts at zero; time accrues from the next
                # frame on. "visits" is set here and never incremented.
                records[tid] = {"start_time": current_time, "total_time": 0.0, "visits": 1}
            else:
                record["total_time"] += frame_time
        # Forget trackers that left before accumulating any time; all other
        # records are kept so they still count towards the zone aggregates.
        departed = [
            tid for tid, record in records.items()
            if tid not in present and record["total_time"] <= 0
        ]
        for tid in departed:
            del records[tid]
        return {str(tid): round(records[tid]["total_time"], 2) for tid in tracker_ids}

    def _build_zone_stats(self, zone_idx: int, count: int, tracker_ids: List[int],
                          time_data: Dict[str, float]) -> Dict:
        """Assemble the stats record of one zone from its dwell-time records."""
        records = self.time_tracking[zone_idx].values()
        time_values = [record["total_time"] for record in records]
        avg_time = float(np.mean(time_values)) if time_values else 0.0
        max_time = max(time_values) if time_values else 0.0
        return {
            "zone_id": zone_idx,
            "count": count,
            "tracker_ids": tracker_ids,
            "time_in_zone_seconds": time_data,
            "avg_time_seconds": round(avg_time, 2),
            "max_time_seconds": round(max_time, 2),
            "total_visits": sum(record.get("visits", 1) for record in records),
        }

    def _annotate_detections(self, scene: np.ndarray, detections_in_zone, zone_idx: int,
                             time_data: Dict[str, float]) -> np.ndarray:
        """Draw the zone-coloured overlay and ``#id (time)`` labels; return the scene."""
        if len(detections_in_zone) == 0:
            return scene
        # Colour every detection by the zone it is in, not by class or track.
        custom_color_lookup = np.full(detections_in_zone.class_id.shape, zone_idx)
        scene = self.color_annotator.annotate(
            scene=scene,
            detections=detections_in_zone,
            custom_color_lookup=custom_color_lookup,
        )
        if detections_in_zone.tracker_id is not None:
            labels = [
                f"#{tid} ({time_data.get(str(tid), 0):.1f}s)"
                for tid in detections_in_zone.tracker_id
            ]
            scene = self.label_annotator.annotate(
                scene=scene,
                detections=detections_in_zone,
                labels=labels,
                custom_color_lookup=custom_color_lookup,
            )
        return scene
if __name__ == "__main__":
    # Smoke test: push a single all-black 1280x720 frame through the monitor.
    queue_monitor = QueueMonitor()
    blank_frame = np.zeros(shape=(720, 1280, 3), dtype=np.uint8)

    # One rectangular zone, given as its four corner points.
    zone_polygon = np.array([[100, 100], [600, 100], [600, 600], [100, 600]])
    queue_monitor.setup_zones([zone_polygon])

    # Only the stats are printed; the annotated frame is discarded.
    _annotated, stats = queue_monitor.process_frame(blank_frame)
    print(f"Stats: {stats}")
|