Spaces:
Sleeping
Sleeping
File size: 8,559 Bytes
ad1bda5 |
|
import cv2
import numpy as np
from ultralytics import YOLO
import supervision as sv
from typing import List, Dict, Optional, Tuple
import time
import logging
import os
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class QueueMonitor:
def __init__(self, weights: str = "yolov8s.pt", confidence: float = 0.3, fps: float = 30.0, hf_token: str = None):
try:
if hf_token is None:
hf_token = os.getenv("HF_TOKEN")
self.hf_token = hf_token
self.model = YOLO(weights)
self.tracker = sv.ByteTrack()
self.confidence = confidence
self.fps = fps
self.frame_count = 0
self.colors = sv.ColorPalette.from_hex(["#E6194B", "#3CB44B", "#FFE119", "#3C76D1"])
self.color_annotator = sv.ColorAnnotator(color=self.colors)
self.label_annotator = sv.LabelAnnotator(
color=self.colors, text_color=sv.Color.from_hex("#000000")
)
self.zones = []
self.time_in_zone_trackers = {}
self.zone_annotators = []
logger.info(f"QueueMonitor initialized with model: {weights}, confidence: {confidence}")
except Exception as e:
logger.error(f"Failed to initialize QueueMonitor: {e}")
raise
def setup_zones(self, polygons: List[np.ndarray]):
try:
if not polygons or len(polygons) == 0:
raise ValueError("At least one zone polygon is required")
self.zones = []
self.zone_annotators = []
self.time_tracking = {}
for idx, polygon in enumerate(polygons):
if polygon.shape[0] < 3:
raise ValueError(f"Zone {idx} polygon must have at least 3 points")
zone = sv.PolygonZone(
polygon=polygon,
triggering_anchors=(sv.Position.CENTER,),
)
self.zones.append(zone)
zone_annotator = sv.PolygonZoneAnnotator(
zone=zone,
color=self.colors.by_idx(idx),
thickness=2,
text_thickness=2,
text_scale=0.5
)
self.zone_annotators.append(zone_annotator)
self.time_tracking[idx] = {}
logger.info(f"Setup {len(self.zones)} zones successfully")
except Exception as e:
logger.error(f"Failed to setup zones: {e}")
raise
def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, Dict]:
try:
if frame is None or frame.size == 0:
raise ValueError("Invalid frame: frame is None or empty")
if len(self.zones) == 0:
raise ValueError("No zones configured. Please setup zones first.")
self.frame_count += 1
current_time = self.frame_count / self.fps if self.fps > 0 else self.frame_count
results = self.model(frame, verbose=False, conf=self.confidence)[0]
detections = sv.Detections.from_ultralytics(results)
detections = detections[detections.class_id == 0]
if len(detections) == 0:
detections = self.tracker.update_with_detections(detections)
else:
detections = self.tracker.update_with_detections(detections)
annotated_frame = frame.copy()
zone_stats = []
for idx, (zone, zone_annotator) in enumerate(zip(self.zones, self.zone_annotators)):
try:
annotated_frame = zone_annotator.annotate(scene=annotated_frame)
mask = zone.trigger(detections)
detections_in_zone = detections[mask]
current_count = len(detections_in_zone)
tracker_ids = detections_in_zone.tracker_id.tolist() if detections_in_zone.tracker_id is not None else []
frame_time = 1.0 / self.fps if self.fps > 0 else 1.0
current_trackers_in_zone = set(tracker_ids)
zone_time_tracking = self.time_tracking[idx]
for tid in current_trackers_in_zone:
if tid not in zone_time_tracking:
zone_time_tracking[tid] = {"start_time": current_time, "total_time": 0.0, "visits": 1}
else:
zone_time_tracking[tid]["total_time"] += frame_time
for tid in list(zone_time_tracking.keys()):
if tid not in current_trackers_in_zone:
if zone_time_tracking[tid]["total_time"] <= 0:
del zone_time_tracking[tid]
time_data = {}
for tid in tracker_ids:
if tid in zone_time_tracking:
time_data[str(tid)] = round(zone_time_tracking[tid]["total_time"], 2)
time_values = [tracking["total_time"] for tracking in zone_time_tracking.values()]
avg_time = np.mean(time_values) if time_values else 0.0
max_time = max(time_values) if time_values else 0.0
total_unique_visits = sum(tracking.get("visits", 1) for tracking in zone_time_tracking.values())
zone_stats.append({
"zone_id": idx,
"count": current_count,
"tracker_ids": tracker_ids,
"time_in_zone_seconds": time_data,
"avg_time_seconds": round(avg_time, 2),
"max_time_seconds": round(max_time, 2),
"total_visits": total_unique_visits
})
if len(detections_in_zone) > 0:
custom_color_lookup = np.full(detections_in_zone.class_id.shape, idx)
annotated_frame = self.color_annotator.annotate(
scene=annotated_frame,
detections=detections_in_zone,
custom_color_lookup=custom_color_lookup,
)
if detections_in_zone.tracker_id is not None:
labels = []
for tid in detections_in_zone.tracker_id:
time_str = f"{time_data.get(str(tid), 0):.1f}s" if str(tid) in time_data else f"#{tid}"
labels.append(f"#{tid} ({time_str})")
annotated_frame = self.label_annotator.annotate(
scene=annotated_frame,
detections=detections_in_zone,
labels=labels,
custom_color_lookup=custom_color_lookup,
)
except Exception as e:
logger.warning(f"Error processing zone {idx}: {e}")
zone_stats.append({
"zone_id": idx,
"count": 0,
"tracker_ids": [],
"time_in_zone_seconds": {},
"avg_time_seconds": 0.0,
"max_time_seconds": 0.0,
"total_visits": 0,
"error": str(e)
})
return annotated_frame, zone_stats
except Exception as e:
logger.error(f"Error processing frame: {e}")
raise
if __name__ == "__main__":
# Example usage with a dummy frame
monitor = QueueMonitor()
dummy_frame = np.zeros((720, 1280, 3), dtype=np.uint8)
# Define a simple rectangular zone
polygon = np.array([[100, 100], [600, 100], [600, 600], [100, 600]])
monitor.setup_zones([polygon])
processed, stats = monitor.process_frame(dummy_frame)
print(f"Stats: {stats}")
|