"""Render report artifacts (charts, heatmap, CSV/JSON exports) from traffic data."""

import csv
import os

import cv2
import matplotlib

# Select the non-interactive backend before pyplot is imported so figures can
# be rendered without a display.
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.ticker import MaxNLocator

from export_json import export_json

# Formal MIS palette
C_PRIMARY = "#1e293b"
C_ACCENT = "#334155"
C_IN = "#059669"
C_OUT = "#dc2626"
C_FLOW = "#2563eb"
C_CONG = "#d97706"
C_CONF = "#7c3aed"
C_BAR = "#0f766e"
C_GRID = "#e2e8f0"
C_BG = "#ffffff"


def _style(ax, title, xlabel="", ylabel=""):
    """Apply the shared title, axis-label, spine and grid styling to *ax*."""
    ax.set_title(title, fontsize=13, fontweight="700", color=C_PRIMARY, pad=14)
    if xlabel:
        ax.set_xlabel(xlabel, fontsize=9, fontweight="600", color=C_ACCENT)
    if ylabel:
        ax.set_ylabel(ylabel, fontsize=9, fontweight="600", color=C_ACCENT)
    ax.tick_params(labelsize=8, colors=C_ACCENT)
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    ax.spines["left"].set_color(C_GRID)
    ax.spines["bottom"].set_color(C_GRID)
    ax.yaxis.grid(True, color=C_GRID, linewidth=0.6, alpha=0.8)
    ax.set_axisbelow(True)


def _save(fig, path, fmt="png"):
    """Write *fig* to *path* and close it.

    When *fmt* is ``"pdf"`` the extension of *path* is replaced by ``.pdf``;
    any other value leaves *path* untouched.
    """
    if fmt == "pdf":
        path = path.rsplit(".", 1)[0] + ".pdf"
    fig.savefig(path, dpi=200, bbox_inches="tight", facecolor=C_BG,
                edgecolor="none")
    plt.close(fig)


def _finish(fig, out_dir, stem, fmt):
    """Save *fig* as ``<stem>.png`` / ``<stem>.pdf`` in *out_dir*.

    Returns the bare file name (no directory) that was written.
    """
    _save(fig, os.path.join(out_dir, f"{stem}.png"), fmt)
    ext = fmt if fmt == "pdf" else "png"
    return f"{stem}.{ext}"


def _stat_box(ax, text):
    """Draw the small rounded summary box in the top-right corner of *ax*."""
    ax.text(
        0.98, 0.95, text,
        transform=ax.transAxes, ha="right", va="top",
        fontsize=8, color=C_ACCENT,
        bbox=dict(boxstyle="round,pad=0.4", facecolor="#f8fafc",
                  edgecolor=C_GRID),
    )


def direction_pie(total_in, total_out, out_dir, fmt="png"):
    """Render the incoming/outgoing pie chart.

    Returns the written file name, or ``None`` when both totals sum to zero.
    """
    total = total_in + total_out
    if total == 0:
        return None

    fig, ax = plt.subplots(figsize=(5, 5), facecolor=C_BG)
    # A wedge border is only drawn when both wedges exist; with a single
    # wedge the border would show up as a stray seam.
    border = 2.5 if (total_in > 0 and total_out > 0) else 0
    _, _, autotexts = ax.pie(
        [total_in, total_out],
        labels=[f"Incoming ({total_in})", f"Outgoing ({total_out})"],
        autopct="%1.1f%%",
        startangle=90,
        colors=[C_IN, C_OUT],
        wedgeprops={"edgecolor": C_BG, "linewidth": border},
        textprops={"fontsize": 10, "fontweight": "600", "color": C_PRIMARY},
    )
    for label in autotexts:
        label.set_fontsize(11)
        label.set_fontweight("700")
        label.set_color(C_BG)

    ax.set_title("Directional Split", fontsize=13, fontweight="700",
                 color=C_PRIMARY, pad=16)
    ax.text(0, -1.35, f"Total: {total} vehicles", ha="center", fontsize=9,
            color=C_ACCENT, fontweight="500")
    return _finish(fig, out_dir, "direction_pie", fmt)


def flow_histogram(flow_times, out_dir, fmt="png"):
    """Render a histogram of crossing times.

    Returns the written file name, or ``None`` when *flow_times* is empty.
    """
    if not flow_times:
        return None

    fig, ax = plt.subplots(figsize=(9, 4), facecolor=C_BG)
    # Between 5 and 30 bins, scaled by the number of distinct timestamps.
    bins = min(30, max(5, len(set(flow_times))))
    counts, edges, _ = ax.hist(flow_times, bins=bins, color=C_FLOW,
                               alpha=0.85, edgecolor=C_BG, linewidth=0.8)
    ax.yaxis.set_major_locator(MaxNLocator(integer=True))
    _style(ax, "Traffic Flow Over Time", "Time (seconds)", "Vehicles Crossed")

    peak_idx = int(np.argmax(counts))
    peak_time = (edges[peak_idx] + edges[peak_idx + 1]) / 2
    _stat_box(
        ax, f"Peak: {int(counts[peak_idx])} vehicles at {peak_time:.1f}s")
    return _finish(fig, out_dir, "flow_over_time", fmt)


def congestion_chart(congestion, out_dir, fmt="png"):
    """Render the per-sample congestion series with a moving-average overlay.

    Returns the written file name, or ``None`` when *congestion* is empty.
    """
    if not congestion:
        return None

    n = len(congestion)
    fig, ax = plt.subplots(figsize=(10, 4), facecolor=C_BG)
    x = range(n)
    ax.fill_between(x, congestion, alpha=0.08, color=C_CONG)
    ax.plot(x, congestion, alpha=0.25, color=C_CONG, linewidth=0.5)

    # np.convolve(mode="same") returns max(len(a), len(kernel)) samples, so
    # the window must never exceed the series length or the smoothed curve
    # would no longer line up with x (and plotting would fail).
    win = min(30, max(3, n // 10), n)
    smooth = np.convolve(congestion, np.ones(win) / win, mode="same")
    ax.plot(x, smooth, linewidth=2, color=C_CONG)

    ax.yaxis.set_major_locator(MaxNLocator(integer=True))
    _style(ax, "Congestion Index", "Frame", "Active Vehicles")

    avg = np.mean(congestion)
    peak = max(congestion)
    ax.axhline(avg, color=C_ACCENT, linewidth=0.8, linestyle="--", alpha=0.5)
    _stat_box(ax, f"Peak: {peak} | Avg: {avg:.1f}")
    return _finish(fig, out_dir, "congestion_index", fmt)


def class_dominance(class_in, class_out, model_classes, out_dir, fmt="png"):
    """Render a bar chart of combined in+out counts per class.

    *class_in* / *class_out* map a class id to a count; *model_classes* maps
    ``int(class id)`` to a display name (``cls_<id>`` is used when missing).
    Returns the written file name, or ``None`` when there is nothing to plot.
    """
    totals = {
        key: class_in.get(key, 0) + class_out.get(key, 0)
        for key in set(class_in) | set(class_out)
    }
    if not totals or sum(totals.values()) == 0:
        return None

    # Largest first; ties are broken on the key so the bar order does not
    # depend on set iteration order.
    ranked = sorted(totals.items(), key=lambda kv: (-kv[1], str(kv[0])))
    classes = [model_classes.get(int(key), f"cls_{key}") for key, _ in ranked]
    values = [count for _, count in ranked]

    fig, ax = plt.subplots(figsize=(10, 4.5), facecolor=C_BG)
    n = len(classes)
    bar_width = min(0.45, max(0.15, 0.6 / max(n, 1)))
    bars = ax.bar(range(n), values, width=bar_width, color=C_BAR,
                  edgecolor=C_BG, linewidth=0.5, zorder=3)
    ax.set_xticks(range(n))
    ax.set_xticklabels(classes, rotation=35, ha="right", fontsize=9,
                       fontweight="500")
    ax.yaxis.set_major_locator(MaxNLocator(integer=True))

    # Value label just above each bar.
    for bar, value in zip(bars, values):
        ax.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.15,
                str(value), ha="center", va="bottom", fontsize=9,
                fontweight="700", color=C_PRIMARY)

    _style(ax, "Class Dominance", "", "Vehicle Count")
    _stat_box(
        ax, f"Total: {sum(values)} vehicles | {n} classes detected")
    return _finish(fig, out_dir, "class_dominance", fmt)


def confidence_dist(conf_scores, out_dir, fmt="png"):
    """Render a 30-bin histogram of detection confidence scores.

    Returns the written file name, or ``None`` when *conf_scores* is empty.
    """
    if not conf_scores:
        return None

    fig, ax = plt.subplots(figsize=(9, 4), facecolor=C_BG)
    ax.hist(conf_scores, bins=30, color=C_CONF, alpha=0.85, edgecolor=C_BG,
            linewidth=0.8)
    ax.yaxis.set_major_locator(MaxNLocator(integer=True))
    _style(ax, "Detection Confidence Distribution", "Confidence Score",
           "Detections")

    mean_c = np.mean(conf_scores)
    median_c = np.median(conf_scores)
    ax.axvline(mean_c, color=C_PRIMARY, linewidth=1, linestyle="--",
               alpha=0.6)
    _stat_box(
        ax,
        f"Mean: {mean_c:.3f} | Median: {median_c:.3f} | N={len(conf_scores)}",
    )
    return _finish(fig, out_dir, "confidence_dist", fmt)


def export_csv(raw_events, out_dir):
    """Write *raw_events* (a sequence of rows) to ``raw_data.csv``.

    Returns ``"raw_data.csv"``, or ``None`` when *raw_events* holds at most
    one row (nothing is written in that case).
    """
    if not raw_events or len(raw_events) <= 1:
        return None
    path = os.path.join(out_dir, "raw_data.csv")
    # Explicit encoding keeps the output identical across platforms.
    with open(path, mode="w", newline="", encoding="utf-8") as f:
        csv.writer(f).writerows(raw_events)
    return "raw_data.csv"


def spatial_heatmap(heatmap_points, video_path, out_dir, fmt="png"):
    """Overlay a density heatmap of *heatmap_points* on the video's first frame.

    Each point is read as ``(x, y)`` via ``pt[0]`` / ``pt[1]``; points outside
    the frame are ignored. Returns the written file name, or ``None`` when
    there are no points, the video is missing/unreadable, or no point falls
    inside the frame.
    """
    if not heatmap_points or not video_path or not os.path.exists(video_path):
        return None

    cap = cv2.VideoCapture(video_path)
    try:
        ret, frame = cap.read()
    finally:
        cap.release()
    if not ret:
        return None

    h, w = frame.shape[:2]
    density = np.zeros((h, w), dtype=np.float32)
    for pt in heatmap_points:
        cx, cy = int(pt[0]), int(pt[1])
        if 0 <= cx < w and 0 <= cy < h:
            cv2.circle(density, (cx, cy), 15, 1.0, -1)
    density = cv2.GaussianBlur(density, (75, 75), 0)

    max_val = float(np.max(density))
    if max_val <= 0:
        # Every point was outside the frame: there is nothing to overlay.
        return None
    density = ((density / max_val) * 255.0).astype(np.uint8)

    heatmap = cv2.applyColorMap(density, cv2.COLORMAP_JET)
    mask = density > 10
    # Blend the whole frame once, then copy only the "hot" pixels across.
    blended = cv2.addWeighted(frame, 0.3, heatmap, 0.7, 0)
    overlay = frame.copy()
    overlay[mask] = blended[mask]

    if fmt == "pdf":
        # Wrap OpenCV image in matplotlib for PDF export
        overlay_rgb = cv2.cvtColor(overlay, cv2.COLOR_BGR2RGB)
        fig, ax = plt.subplots(figsize=(10, 6), facecolor=C_BG)
        ax.imshow(overlay_rgb)
        ax.set_title("Spatial Density Heatmap", fontsize=13, fontweight="700",
                     color=C_PRIMARY, pad=14)
        ax.axis("off")
        return _finish(fig, out_dir, "heatmap", fmt)

    cv2.imwrite(os.path.join(out_dir, "heatmap.png"), overlay)
    return "heatmap.png"


def generate_all(data, model_classes, out_dir, report_format="png"):
    """Generate every report artifact into *out_dir*.

    *data* must contain ``"class_in"`` and ``"class_out"`` (KeyError
    otherwise). The keys ``"flow_times"``, ``"congestion"``, ``"conf_scores"``,
    ``"heatmap_points"``, ``"video_path"``, ``"raw_events"``, ``"video_meta"``
    and ``"engine_config"`` are optional, as are the ``"export_csv"`` /
    ``"export_json"`` flags that enable the respective exports.

    Returns the list of file names actually produced, in generation order.
    """
    os.makedirs(out_dir, exist_ok=True)
    plt.rcParams.update({
        "font.family": "sans-serif",
        "font.sans-serif": ["DejaVu Sans", "Arial", "Helvetica"],
        "axes.unicode_minus": False,
    })

    class_in = data["class_in"]
    class_out = data["class_out"]
    total_in = sum(class_in.values())
    total_out = sum(class_out.values())
    fmt = report_format
    video_path = data.get("video_path")
    heatmap_points = data.get("heatmap_points", [])
    raw_events = data.get("raw_events", [])

    tasks = [
        lambda: direction_pie(total_in, total_out, out_dir, fmt),
        lambda: flow_histogram(data.get("flow_times", []), out_dir, fmt),
        lambda: congestion_chart(data.get("congestion", []), out_dir, fmt),
        lambda: class_dominance(class_in, class_out, model_classes, out_dir,
                                fmt),
        lambda: confidence_dist(data.get("conf_scores", []), out_dir, fmt),
        lambda: spatial_heatmap(heatmap_points, video_path, out_dir, fmt),
    ]
    if data.get("export_csv", False):
        tasks.append(lambda: export_csv(raw_events, out_dir))
    if data.get("export_json", False):
        tasks.append(lambda: export_json(
            data,
            data.get("video_meta", {}),
            data.get("engine_config", {}),
            out_dir,
        ))

    # Generators return None when they have nothing to draw; skip those.
    return [name for name in (task() for task in tasks) if name]