Spaces:
Running
Running
| import os | |
| import time | |
| import tempfile | |
| import threading | |
| import queue | |
| import numpy as np | |
| import cv2 | |
| from collections import defaultdict | |
| from pcu import compute_pcu, MODEL_CLASSES | |
| from tracker_config import get_tracker_path | |
| from speed import estimate_speeds | |
| def _side(p, a, b): | |
| return np.sign((b[0] - a[0]) * (p[1] - a[1]) - (b[1] - a[1]) * (p[0] - a[0])) | |
| def _point_to_segment_dist(px, py, ax, ay, bx, by): | |
| A = np.array([ax, ay], dtype=float) | |
| B = np.array([bx, by], dtype=float) | |
| P = np.array([px, py], dtype=float) | |
| AB = B - A | |
| t = np.clip(np.dot(P - A, AB) / np.dot(AB, AB), 0, 1) | |
| return np.linalg.norm(P - (A + t * AB)) | |
# Lightweight drawing colors (BGR for OpenCV — note: NOT RGB order)
_CLR_BOX = (230, 180, 50)  # teal-ish — bounding-box outline
_CLR_LINE = (80, 220, 100)  # green — counting line
_CLR_TEXT_BG = (30, 30, 30)  # dark bg for text — label background fill
class ThreadedVideoWriter:
    """Asynchronous MP4 writer: frames are queued by write() and encoded on a
    background daemon thread.

    The queue is bounded (128 frames) so a slow encoder applies backpressure
    to the producer instead of growing memory without limit.
    """

    def __init__(self, path, fps, size):
        """
        path: output file path
        fps: playback frame rate for the output container
        size: (width, height) — every frame passed to write() must match
        """
        self.path = path
        self.fps = fps
        self.size = size
        self.queue = queue.Queue(maxsize=128)
        self.stopped = False
        self.writer = cv2.VideoWriter(path, cv2.VideoWriter_fourcc(*"mp4v"), fps, size)
        if not self.writer.isOpened():
            # Surface codec/path failures immediately instead of silently
            # producing an empty file.
            print(f"[BACKEND] Writer queue error: VideoWriter failed to open {path}")
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def _run(self):
        # Keep draining until stop() has been called AND the queue is empty,
        # so no queued frame is dropped at shutdown.
        while not self.stopped or not self.queue.empty():
            try:
                frame = self.queue.get(timeout=1.0)
            except queue.Empty:
                continue
            try:
                if frame is not None:
                    self.writer.write(frame)
            except Exception as e:
                # A bad frame must not kill the drain thread: a dead consumer
                # would leave producers blocked forever on a full queue.
                print(f"[BACKEND] Writer queue error: {e}")
            finally:
                # Always balance get() — previously task_done() was skipped
                # when write() raised, corrupting the queue's unfinished count.
                self.queue.task_done()
        self.writer.release()
        print(f"[BACKEND] Threaded writer finished: {self.path}")

    def write(self, frame):
        """Enqueue a frame; blocks when the queue is full (intentional backpressure)."""
        if not self.stopped:
            try:
                self.queue.put(frame)  # no copy — frame ownership transfers to queue
            except Exception as e:
                print(f"[BACKEND] Writer queue error: {e}")

    def stop(self):
        """Signal shutdown and wait until every queued frame has been written."""
        self.stopped = True
        self.thread.join()
def _draw_annotations(frame, boxes, ids, clses, line_pts, options):
    """Overlay the counting line, bounding boxes, and ID/class labels on frame in-place.

    options toggles each overlay: "spatial" (counting line), "bbox",
    "track_id", "class_name", "class_id" (class_id only shown when
    class_name is off).
    """
    # Counting line (Spatial Boundary)
    if options.get("spatial", True):
        cv2.line(frame, tuple(line_pts[0]), tuple(line_pts[1]), _CLR_LINE, 3, cv2.LINE_AA)
    if boxes is None or ids is None or clses is None:
        return
    for box, obj_id, cls_idx in zip(boxes, ids, clses):
        x1, y1, x2, y2 = (int(v) for v in box)
        # Bounding Box
        if options.get("bbox", True):
            cv2.rectangle(frame, (x1, y1), (x2, y2), _CLR_BOX, 2)
        # Labels
        parts = []
        if options.get("track_id", True):
            parts.append(f"ID:{int(obj_id)}")
        if options.get("class_name", True):
            parts.append(MODEL_CLASSES.get(int(cls_idx), "Unknown"))
        elif options.get("class_id", False):
            parts.append(f"C:{int(cls_idx)}")
        if not parts:
            continue
        label_text = " | ".join(parts)
        (tw, th), _ = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1)
        cv2.rectangle(frame, (x1, y1 - th - 6), (x1 + tw + 6, y1), _CLR_TEXT_BG, -1)
        cv2.putText(frame, label_text, (x1 + 3, y1 - 4), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1, cv2.LINE_AA)
def run(model, video_path, line, config, on_frame, save_annotated=False, annotated_options=None):
    """
    Runs YOLO tracking on video. Calls on_frame(update_dict) after each processed frame.

    Args:
        model: Ultralytics YOLO model (OpenVINO export — imgsz/batch are fixed, see below).
        video_path: path to the input video.
        line: counting line as [[x1,y1], [x2,y2]] in pixel coordinates.
        config: dict with required "detect_stride" and optional "conf"/"iou".
        on_frame: progress callback; receives a small dict after every processed frame.
        save_annotated: if True, writes annotated MP4 with boxes + IDs + counting line.
        annotated_options: dict of toggleable visual overlays (never mutated — copied).

    Returns:
        dict with per-class IN/OUT counts, congestion series, flow timestamps,
        confidence scores, heatmap points, raw events, timing stats, speed and
        PCU data, plus "annotated_video" when an annotated file was written.
    """
    if annotated_options is None:
        annotated_options = {"bbox": True, "track_id": True, "spatial": True}
    else:
        # Copy so the forced "bbox" below never mutates the caller's dict.
        annotated_options = dict(annotated_options)
    # Force bbox to True if export is enabled (user requirement)
    if save_annotated:
        annotated_options["bbox"] = True

    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    out_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    out_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    cap.release()
    if fps <= 0:
        # Some containers report FPS=0; fall back so the timestamp math
        # (frame_idx * stride / fps) cannot raise ZeroDivisionError.
        fps = 30.0

    # Dynamic crossing threshold: 5% of frame height, min 40px
    cross_dist = max(40, int(out_h * 0.05))
    stride = config["detect_stride"]
    total_iters = total // stride

    # Annotated video writer (temp directory — auto-cleaned on container shutdown)
    annotated_path = None
    writer = None
    if save_annotated:
        annotated_dir = os.path.join(tempfile.gettempdir(), "funky_reports")
        os.makedirs(annotated_dir, exist_ok=True)
        annotated_path = os.path.join(annotated_dir, f"annotated_{os.path.basename(video_path)}.mp4")
        writer_fps = max(1.0, fps / stride)  # output plays at the strided rate, min 1 fps
        writer = ThreadedVideoWriter(annotated_path, writer_fps, (out_w, out_h))

    prev_side = {}       # track id -> last non-zero side of the counting line
    counted_ids = set()  # ids already counted — each vehicle crosses at most once
    class_in = defaultdict(int)
    class_out = defaultdict(int)
    congestion = []      # active-track count per processed frame
    flow_times = []      # timestamps (sec) of each counted crossing
    conf_scores = []
    heatmap_points = []
    track_positions = defaultdict(list)  # id -> [(frame_idx, cx, cy), ...] for speed estimation
    raw_events = [["frame_index", "timestamp_sec", "vehicle_id", "class_name", "direction"]]
    start = time.time()

    # https://docs.ultralytics.com/modes/predict/#inference-sources
    # https://docs.ultralytics.com/modes/track/#why-choose-ultralytics-yolo-for-object-tracking
    # ExecuTorch: https://docs.ultralytics.com/integrations/executorch/#what-are-the-system-requirements-for-executorch-export
    results = model.track(
        source=video_path,
        tracker=get_tracker_path(),
        imgsz=736,  # MUST match OpenVINO export imgsz — compiled graph is fixed shape
        conf=config.get("conf", 0.12),
        iou=config.get("iou", 0.6),
        vid_stride=stride,
        stream=True,
        verbose=False,
        persist=False,  # MUST be False — True causes ByteTracker state leak across runs
        batch=2  # MUST match OpenVINO export batch size
    )

    a = line[0]
    b = line[1]
    iterator = iter(enumerate(results))
    while True:
        try:
            frame_idx, r = next(iterator)
        except StopIteration:
            break
        except RuntimeError as e:
            # The fixed-shape OpenVINO graph rejects a partial final batch;
            # that trailing failure is expected and safe to swallow.
            if "incompatible" in str(e) and "shape=" in str(e):
                print(f"[BACKEND] Ignored OpenVINO shape mismatch on final trailing batch.")
                break
            raise e

        active = 0
        cur_boxes = None
        cur_ids = None
        if r.boxes.id is not None:
            ids = r.boxes.id.cpu().numpy()
            cls = r.boxes.cls.cpu().numpy()
            xyxy = r.boxes.xyxy.cpu().numpy()
            active = len(ids)
            confs = r.boxes.conf.cpu().numpy().tolist()
            conf_scores.extend(confs)
            cur_boxes = xyxy
            cur_ids = ids
            for obj_id, c, box in zip(ids, cls, xyxy):
                cx = int((box[0] + box[2]) / 2)
                cy = int((box[1] + box[3]) / 2)
                heatmap_points.append([cx, cy])
                # NOTE(review): keys are numpy floats from the tracker;
                # estimate_speeds presumably accepts them — confirm before changing.
                track_positions[obj_id].append((frame_idx, cx, cy))
                current = _side((cx, cy), a, b)
                # Skip if centroid is exactly on the line (cross-product == 0)
                # — avoids misfired crossings due to floating-point boundary hits
                if current == 0:
                    continue
                if obj_id in prev_side and obj_id not in counted_ids:
                    if prev_side[obj_id] != current:
                        # Sign flipped: candidate crossing. Require proximity to
                        # the segment so far-away sign flips don't count.
                        dist = _point_to_segment_dist(cx, cy, a[0], a[1], b[0], b[1])
                        if dist < cross_dist:
                            # frame_idx indexes strided results, so scale by
                            # stride to recover the source-video timestamp.
                            t = frame_idx * stride / fps
                            flow_times.append(round(t, 2))
                            if current > 0:
                                class_in[int(c)] += 1
                                raw_events.append([frame_idx + 1, round(t, 2), int(obj_id), MODEL_CLASSES.get(int(c), f"cls_{int(c)}"), "IN"])
                            else:
                                class_out[int(c)] += 1
                                raw_events.append([frame_idx + 1, round(t, 2), int(obj_id), MODEL_CLASSES.get(int(c), f"cls_{int(c)}"), "OUT"])
                            counted_ids.add(obj_id)
                prev_side[obj_id] = current

        # Write annotated frame
        cur_clses = cls if r.boxes.id is not None else None
        if writer is not None:
            frame = r.orig_img.copy()
            _draw_annotations(frame, cur_boxes, cur_ids, cur_clses, [a, b], annotated_options)
            writer.write(frame)

        congestion.append(active)
        elapsed = time.time() - start
        update = {
            "frame_index": frame_idx + 1,
            "total_iters": total_iters,
            "total_frames": total,
            "active": active,
            "congestion_len": len(congestion),  # just the length, not the full list
            "congestion_last": congestion[-1] if congestion else 0,  # only latest value
            "class_in": {str(k): v for k, v in class_in.items()},
            "class_out": {str(k): v for k, v in class_out.items()},
            "flow_count": len(flow_times),  # just the count
            "elapsed": round(elapsed, 2),
            "fps": round((frame_idx + 1) / elapsed, 2) if elapsed > 0 else 0,
        }
        on_frame(update)

    if writer is not None:
        writer.stop()  # blocks until all queued frames are flushed to disk

    processing_time = round(time.time() - start, 2)
    actual_fps = round(total / processing_time, 2) if processing_time > 0 else 0
    speed_vs_rt = round(actual_fps / fps, 2) if fps > 0 else 0
    speed_data = estimate_speeds(dict(track_positions))
    pcu_data = compute_pcu(
        {str(k): v for k, v in class_in.items()},
        {str(k): v for k, v in class_out.items()},
    )
    result = {
        "class_in": dict(class_in),
        "class_out": dict(class_out),
        "congestion": congestion,
        "flow_times": flow_times,
        "conf_scores": conf_scores,
        "heatmap_points": heatmap_points,
        "raw_events": raw_events,
        "processing_time": processing_time,
        "actual_fps": actual_fps,
        "speed_vs_realtime": speed_vs_rt,
        "speed": speed_data,
        "pcu": pcu_data,
    }
    if annotated_path and os.path.exists(annotated_path):
        result["annotated_video"] = annotated_path
    return result