# UrbanFlow: backend/engine.py
import os
import time
import tempfile
import threading
import queue
import numpy as np
import cv2
from collections import defaultdict
from pcu import compute_pcu, MODEL_CLASSES
from tracker_config import get_tracker_path
from speed import estimate_speeds


def _side(p, a, b):
    """Sign (+1.0 / -1.0 / 0.0) of point p relative to the directed line a->b
    (2D cross product of a->b with a->p)."""
    return np.sign((b[0] - a[0]) * (p[1] - a[1]) - (b[1] - a[1]) * (p[0] - a[0]))


def _point_to_segment_dist(px, py, ax, ay, bx, by):
    """Shortest distance from point (px, py) to the segment (ax, ay)-(bx, by)."""
    A = np.array([ax, ay], dtype=float)
    B = np.array([bx, by], dtype=float)
    P = np.array([px, py], dtype=float)
    AB = B - A
    # Clamp the projection parameter to [0, 1] so the endpoints are handled correctly
    t = np.clip(np.dot(P - A, AB) / np.dot(AB, AB), 0, 1)
    return np.linalg.norm(P - (A + t * AB))
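

# Worked example (hypothetical values) for the two geometry helpers above:
#   _side((5, 3), (0, 0), (10, 0))  -> 1.0   (one side of the line)
#   _side((5, -3), (0, 0), (10, 0)) -> -1.0  (the other side)
#   _point_to_segment_dist(5, 3, 0, 0, 10, 0) -> 3.0
# A sign flip between consecutive frames is what run() treats as a line crossing.
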
# Lightweight drawing colors (BGR for OpenCV)
_CLR_BOX = (230, 180, 50) # teal-ish
_CLR_LINE = (80, 220, 100) # green
_CLR_TEXT_BG = (30, 30, 30) # dark bg for text


class ThreadedVideoWriter:
    """Encodes frames on a background thread so video I/O stays off the tracking loop."""

    def __init__(self, path, fps, size):
self.path = path
self.fps = fps
self.size = size
self.queue = queue.Queue(maxsize=128)
self.stopped = False
self.writer = cv2.VideoWriter(path, cv2.VideoWriter_fourcc(*"mp4v"), fps, size)
self.thread = threading.Thread(target=self._run, daemon=True)
self.thread.start()

    def _run(self):
        # Keep draining until stop() is requested and the queue is empty
while not self.stopped or not self.queue.empty():
try:
frame = self.queue.get(timeout=1.0)
if frame is not None:
self.writer.write(frame)
self.queue.task_done()
except queue.Empty:
continue
self.writer.release()
print(f"[BACKEND] Threaded writer finished: {self.path}")

    def write(self, frame):
        if not self.stopped:
            try:
                # No copy: frame ownership transfers to the queue, so callers must not
                # mutate it afterwards. put() blocks while the 128-slot queue is full.
                self.queue.put(frame)
            except Exception as e:
                print(f"[BACKEND] Writer queue error: {e}")

    def stop(self):
        self.stopped = True
        self.thread.join()
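

# Usage sketch (comments only, so nothing executes at import; the path is hypothetical):
#   writer = ThreadedVideoWriter("/tmp/out.mp4", fps=30.0, size=(1920, 1080))
#   writer.write(frame)  # enqueues without copying; blocks if the 128-slot queue is full
#   writer.stop()        # drains the queue, releases the cv2.VideoWriter, joins the thread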


def _draw_annotations(frame, boxes, ids, clses, line_pts, options):
"""Draw bounding boxes, track IDs, and counting line on frame in-place."""
# Counting line (Spatial Boundary)
if options.get("spatial", True):
cv2.line(frame, tuple(line_pts[0]), tuple(line_pts[1]), _CLR_LINE, 3, cv2.LINE_AA)
if boxes is not None and ids is not None and clses is not None:
for box, obj_id, cls_idx in zip(boxes, ids, clses):
x1, y1, x2, y2 = map(int, box)
# Bounding Box
if options.get("bbox", True):
cv2.rectangle(frame, (x1, y1), (x2, y2), _CLR_BOX, 2)
# Labels
labels = []
if options.get("track_id", True):
labels.append(f"ID:{int(obj_id)}")
if options.get("class_name", True):
labels.append(MODEL_CLASSES.get(int(cls_idx), "Unknown"))
elif options.get("class_id", False):
labels.append(f"C:{int(cls_idx)}")
if labels:
label_text = " | ".join(labels)
(tw, th), _ = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1)
cv2.rectangle(frame, (x1, y1 - th - 6), (x1 + tw + 6, y1), _CLR_TEXT_BG, -1)
cv2.putText(frame, label_text, (x1 + 3, y1 - 4), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1, cv2.LINE_AA)
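

# Overlay toggles _draw_annotations understands (defaults inferred from the .get()
# calls above; "class_name" takes precedence over "class_id" when both are truthy):
#   {"spatial": True, "bbox": True, "track_id": True, "class_name": True, "class_id": False}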


def run(model, video_path, line, config, on_frame, save_annotated=False, annotated_options=None):
"""
Runs YOLO tracking on video. Calls on_frame(update_dict) after each processed frame.
line: [[x1,y1], [x2,y2]]
save_annotated: if True, writes annotated MP4 with boxes + IDs + counting line
annotated_options: dict of toggleable visual overlays
"""
if annotated_options is None:
annotated_options = {"bbox": True, "track_id": True, "spatial": True}
# Force bbox to True if export is enabled (user requirement)
if save_annotated:
annotated_options["bbox"] = True
cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0  # some containers report 0 fps; 30 is an assumed fallback
total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
out_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
out_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
# Dynamic crossing threshold: 5% of frame height, min 40px
cross_dist = max(40, int(out_h * 0.05))
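    # e.g. a 1080p frame gives cross_dist = max(40, 54) = 54 px, while a 480p frame
    # computes 24 px and is clamped up to the 40 px floor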
stride = config["detect_stride"]
    total_iters = (total + stride - 1) // stride  # ceil: number of frames actually processed with vid_stride
# Annotated video writer (temp directory — auto-cleaned on container shutdown)
annotated_path = None
writer = None
if save_annotated:
annotated_dir = os.path.join(tempfile.gettempdir(), "funky_reports")
os.makedirs(annotated_dir, exist_ok=True)
        base = os.path.splitext(os.path.basename(video_path))[0]
        annotated_path = os.path.join(annotated_dir, f"annotated_{base}.mp4")  # avoids "foo.mp4.mp4"
        writer_fps = max(1.0, fps / stride)  # only every stride-th frame is written, so scale fps to keep real-time playback
writer = ThreadedVideoWriter(annotated_path, writer_fps, (out_w, out_h))
    # Per-track state and aggregate statistics
    prev_side = {}
counted_ids = set()
class_in = defaultdict(int)
class_out = defaultdict(int)
congestion = []
flow_times = []
conf_scores = []
heatmap_points = []
track_positions = defaultdict(list)
raw_events = [["frame_index", "timestamp_sec", "vehicle_id", "class_name", "direction"]]
start = time.time()
# https://docs.ultralytics.com/modes/predict/#inference-sources
# https://docs.ultralytics.com/modes/track/#why-choose-ultralytics-yolo-for-object-tracking
# ExecuTorch: https://docs.ultralytics.com/integrations/executorch/#what-are-the-system-requirements-for-executorch-export
results = model.track(
source=video_path,
tracker=get_tracker_path(),
imgsz=736, # MUST match OpenVINO export imgsz — compiled graph is fixed shape
conf=config.get("conf", 0.12),
iou=config.get("iou", 0.6),
vid_stride=stride,
stream=True,
verbose=False,
persist=False, # MUST be False — True causes ByteTracker state leak across runs
batch=2 # MUST match OpenVINO export batch size
)
a = line[0]
b = line[1]
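    # stream=True makes model.track() a lazy generator; iterating manually with next()
    # lets us catch the RuntimeError OpenVINO raises on a final partial batch
    # (a for-loop cannot intercept exceptions raised inside the iterator itself).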
iterator = iter(enumerate(results))
while True:
try:
frame_idx, r = next(iterator)
except StopIteration:
break
        except RuntimeError as e:
            # OpenVINO compiled graphs are fixed-shape; the trailing batch of a video
            # can be smaller than the export batch size and raise a shape mismatch.
            if "incompatible" in str(e) and "shape=" in str(e):
                print("[BACKEND] Ignored OpenVINO shape mismatch on final trailing batch.")
                break
            raise
        active = 0
        cur_boxes = None
        cur_ids = None
        cur_clses = None
if r.boxes.id is not None:
ids = r.boxes.id.cpu().numpy()
cls = r.boxes.cls.cpu().numpy()
xyxy = r.boxes.xyxy.cpu().numpy()
active = len(ids)
confs = r.boxes.conf.cpu().numpy().tolist()
conf_scores.extend(confs)
            cur_boxes = xyxy
            cur_ids = ids
            cur_clses = cls
for obj_id, c, box in zip(ids, cls, xyxy):
cx = int((box[0] + box[2]) / 2)
cy = int((box[1] + box[3]) / 2)
heatmap_points.append([cx, cy])
track_positions[obj_id].append((frame_idx, cx, cy))
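                # IN vs OUT below is purely the sign of the cross product relative to
                # the a->b direction of the counting line; swapping the two endpoints
                # of `line` flips which direction is labelled IN.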
current = _side((cx, cy), a, b)
# Skip if centroid is exactly on the line (cross-product == 0)
# — avoids misfired crossings due to floating-point boundary hits
if current == 0:
continue
if obj_id in prev_side and obj_id not in counted_ids:
if prev_side[obj_id] != current:
dist = _point_to_segment_dist(cx, cy, a[0], a[1], b[0], b[1])
if dist < cross_dist:
t = frame_idx * stride / fps
flow_times.append(round(t, 2))
if current > 0:
class_in[int(c)] += 1
raw_events.append([frame_idx + 1, round(t, 2), int(obj_id), MODEL_CLASSES.get(int(c), f"cls_{int(c)}"), "IN"])
else:
class_out[int(c)] += 1
raw_events.append([frame_idx + 1, round(t, 2), int(obj_id), MODEL_CLASSES.get(int(c), f"cls_{int(c)}"), "OUT"])
counted_ids.add(obj_id)
prev_side[obj_id] = current
# Write annotated frame
if writer is not None:
frame = r.orig_img.copy()
_draw_annotations(frame, cur_boxes, cur_ids, cur_clses, [a, b], annotated_options)
writer.write(frame)
congestion.append(active)
elapsed = time.time() - start
update = {
"frame_index": frame_idx + 1,
"total_iters": total_iters,
"total_frames": total,
"active": active,
"congestion_len": len(congestion), # just the length, not the full list
"congestion_last": congestion[-1] if congestion else 0, # only latest value
"class_in": {str(k): v for k, v in class_in.items()},
"class_out": {str(k): v for k, v in class_out.items()},
"flow_count": len(flow_times), # just the count
"elapsed": round(elapsed, 2),
"fps": round((frame_idx + 1) / elapsed, 2) if elapsed > 0 else 0,
}
on_frame(update)
if writer is not None:
writer.stop()
processing_time = round(time.time() - start, 2)
actual_fps = round(total / processing_time, 2) if processing_time > 0 else 0
speed_vs_rt = round(actual_fps / fps, 2) if fps > 0 else 0
speed_data = estimate_speeds(dict(track_positions))
pcu_data = compute_pcu(
{str(k): v for k, v in class_in.items()},
{str(k): v for k, v in class_out.items()},
)
result = {
"class_in": dict(class_in),
"class_out": dict(class_out),
"congestion": congestion,
"flow_times": flow_times,
"conf_scores": conf_scores,
"heatmap_points": heatmap_points,
"raw_events": raw_events,
"processing_time": processing_time,
"actual_fps": actual_fps,
"speed_vs_realtime": speed_vs_rt,
"speed": speed_data,
"pcu": pcu_data,
}
if annotated_path and os.path.exists(annotated_path):
result["annotated_video"] = annotated_path
return result
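

if __name__ == "__main__":
    # Minimal smoke test. This is a sketch only: the weights file, the sample video,
    # and the counting-line coordinates below are assumptions, not part of the app.
    from ultralytics import YOLO

    def _print_progress(update):
        print(f"\rframe {update['frame_index']}/{update['total_iters']} "
              f"active={update['active']} fps={update['fps']}", end="")

    demo_model = YOLO("yolo11n.pt")  # hypothetical weights; the app supplies its own model
    stats = run(
        demo_model,
        video_path="sample.mp4",          # hypothetical input clip
        line=[[100, 400], [1180, 400]],   # horizontal counting line across the frame
        config={"detect_stride": 2, "conf": 0.12, "iou": 0.6},
        on_frame=_print_progress,
    )
    print("\nIN:", stats["class_in"], "OUT:", stats["class_out"])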