"""FastAPI backend: video upload, inference runs over a WebSocket, report
generation/download, feedback capture, and static frontend hosting.

All per-video state is kept in module-level dictionaries, so it lives only as
long as this process does.
"""

import asyncio
import html
import json
import os
import shutil
import tempfile
import traceback
import uuid
import zipfile
from contextlib import asynccontextmanager, suppress
from datetime import datetime, timezone
from pathlib import Path

import cv2
import numpy as np
import resend
from fastapi import FastAPI, WebSocket, UploadFile, File, BackgroundTasks
from fastapi.responses import FileResponse, Response
from fastapi.staticfiles import StaticFiles

from model import load_model
from config import get_optimal_config
from engine import run
from pcu import MODEL_CLASSES
from visualize import generate_all

# Business category name -> list of model class indices grouped under it.
BUSINESS_MAP = {
    "Cars": [0, 1, 2, 3, 12],
    "Buses": [4, 9, 10],
    "Two-wheelers": [7, 11],
    "Three-wheelers": [6],
    "Trucks": [5, 8],
    "Others": [13],
}

BASE = Path(__file__).parent.parent
FRONTEND = BASE / "frontend"

UPLOAD_DIR = Path(tempfile.gettempdir()) / "funky_uploads"
UPLOAD_DIR.mkdir(exist_ok=True)
REPORT_DIR = Path(tempfile.gettempdir()) / "funky_reports"
REPORT_DIR.mkdir(exist_ok=True)
FEEDBACK_PATH = Path(tempfile.gettempdir()) / "urbanflow_feedback.json"

# In-memory state, keyed by video_id.
videos = {}       # video_id -> path of the uploaded file (str)
video_info = {}   # video_id -> original upload filename
run_results = {}  # video_id -> result dict of the last completed run
model = None      # set by lifespan() at startup

# File suffix -> media type served by get_report(); anything else is PNG.
_REPORT_MEDIA_TYPES = {
    ".pdf": "application/pdf",
    ".mp4": "video/mp4",
    ".json": "application/json",
    ".csv": "text/csv",
}

# (row label, key in the feedback "emojis" dict, option labels or None for
# the default Poor/Fair/Good/Great scale).
_FEEDBACK_METRICS = (
    ("Recommend Product", "fb-recommend", ["Unlikely", "Maybe", "Likely", "Highly"]),
    ("Security Assessment", "fb-security", None),
    ("Integration Readiness", "fb-integration", None),
    ("Ease of Use", "fb-ease", None),
)


def _resolve_inside(base, *parts):
    """Return ``base/parts`` fully resolved, or ``None`` if it escapes *base*.

    URL path parameters are attacker-controlled: a segment such as ``..``
    would otherwise let a request read or zip files outside the report
    directory. The base directory itself is also rejected, so only strict
    descendants are returned.
    """
    try:
        root = Path(base).resolve()
        candidate = root.joinpath(*parts).resolve()
    except (OSError, ValueError):
        # e.g. embedded NUL byte or a path the OS refuses to resolve
        return None
    if root not in candidate.parents:
        return None
    return candidate


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Load the model once at startup and run a warm-up inference."""
    global model
    model = load_model()
    # Warm up: run a dummy inference so OpenVINO compiles its graph now,
    # not on the first real user request
    try:
        dummy_img = np.zeros((736, 736, 3), dtype=np.uint8)
        model([dummy_img, dummy_img], verbose=False)  # list of 2 imgs triggers batch=2
        print("[BACKEND] Model warm-up complete.")
    except Exception as e:
        # Warm-up is best-effort; the server still starts without it.
        print(f"[BACKEND] Warm-up skipped: {e}")
    yield
    # Shutdown: nothing to clean up


app = FastAPI(lifespan=lifespan)


@app.get("/")
def index():
    """Serve the frontend landing page."""
    return FileResponse(FRONTEND / "initial.html")


def _purge_previous_uploads():
    """Delete earlier uploads and forget the video ids that pointed at them."""
    for old_path in UPLOAD_DIR.glob("*.mp4"):
        try:
            old_path.unlink()
        except OSError as e:
            # Best-effort cleanup: a file that cannot be removed is kept.
            print(f"[BACKEND] Could not remove stale upload {old_path}: {e}")
    # Drop ids whose file is gone so later lookups fail cleanly as
    # "unknown video" instead of handing a dead path to OpenCV / the engine.
    for stale_id in [vid for vid, p in videos.items() if not os.path.exists(p)]:
        del videos[stale_id]


@app.post("/upload")
async def upload(file: UploadFile = File(...)):
    """Store an uploaded video and return its freshly generated ``video_id``.

    Any previously uploaded video is removed first. Returns a 500 response
    carrying the error text if the file cannot be written.
    """
    video_id = str(uuid.uuid4())[:8]
    path = UPLOAD_DIR / f"{video_id}.mp4"

    # Clean up any previous temp uploads to avoid stale state
    _purge_previous_uploads()

    print(f"[BACKEND] Received upload request: {file.filename}")
    try:
        with open(path, "wb") as f:
            shutil.copyfileobj(file.file, f)
        file_size = os.path.getsize(path)
        print(f"[BACKEND] Successfully stored: {path} ({file_size} bytes)")
        videos[video_id] = str(path)
        video_info[video_id] = file.filename
        return {"video_id": video_id}
    except Exception as e:
        print(f"[BACKEND] Upload failed: {str(e)}")
        # Do not leave a truncated file behind.
        with suppress(OSError):
            path.unlink()
        return Response(content=str(e), status_code=500)


@app.get("/config/{video_id}")
def config_endpoint(video_id: str):
    """Return the engine configuration computed for an uploaded video."""
    path = videos.get(video_id)
    if not path:
        return Response(
            content=f"Unknown video_id: {video_id}",
            status_code=404,
            media_type="text/plain",
        )
    return get_optimal_config(path)


@app.get("/first-frame/{video_id}")
def first_frame(video_id: str):
    """Return the first frame of an uploaded video as a JPEG image."""
    path = videos.get(video_id)
    if not path:
        return Response(
            content=f"Unknown video_id: {video_id}",
            status_code=404,
            media_type="text/plain",
        )
    cap = cv2.VideoCapture(path)
    try:
        ret, frame = cap.read()
    finally:
        cap.release()
    if not ret or frame is None:
        # Unreadable/empty video: encoding a missing frame would raise.
        return Response(
            content="Could not read a frame from the video",
            status_code=422,
            media_type="text/plain",
        )
    encoded, buf = cv2.imencode(".jpg", frame)
    if not encoded:
        return Response(
            content="Could not encode the frame",
            status_code=500,
            media_type="text/plain",
        )
    return Response(content=buf.tobytes(), media_type="image/jpeg")


@app.get("/constants")
def constants():
    """Expose the model class list and the business category mapping."""
    return {"classes": MODEL_CLASSES, "business_map": BUSINESS_MAP}


@app.post("/reports/{video_id}")
def generate_reports(video_id: str):
    """Generate report files for a completed run and list their names.

    Returns ``{"error": "no results", "files": []}`` when no run result is
    stored for *video_id*.
    """
    data = run_results.get(video_id)
    if not data:
        return {"error": "no results", "files": []}
    out_dir = str(REPORT_DIR / video_id)
    report_format = data.get("report_format", "png")
    files = generate_all(data, MODEL_CLASSES, out_dir, report_format)

    # Include annotated video if it exists
    annotated_src = data.get("annotated_video")
    if annotated_src and os.path.exists(annotated_src):
        os.makedirs(out_dir, exist_ok=True)
        dest = os.path.join(out_dir, "annotated.mp4")
        shutil.copy2(annotated_src, dest)  # Always overwrite — never skip
        files.append("annotated.mp4")
    return {"files": files}


@app.get("/reports/{video_id}/{name}")
def get_report(video_id: str, name: str):
    """Serve a single generated report file, or 404 if it does not exist."""
    path = _resolve_inside(REPORT_DIR, video_id, name)
    if path is None or not path.is_file():
        return Response(status_code=404)
    media = _REPORT_MEDIA_TYPES.get(path.suffix, "image/png")
    return FileResponse(str(path), media_type=media)


@app.get("/bundle/{video_id}")
def download_all_reports(video_id: str):
    """Zip every report file of a run and return the archive as a download."""
    print(f"[BACKEND] ZIP request for {video_id}")
    base_path = _resolve_inside(REPORT_DIR, video_id)
    if base_path is None or not base_path.is_dir():
        print(f"[BACKEND] Error: {REPORT_DIR / video_id} not found")
        return Response(
            content=f"Report directory not found for {video_id}",
            status_code=404,
            media_type="text/plain",
        )

    try:
        # base_path.name equals the validated video_id, so the archive
        # always lands directly inside REPORT_DIR.
        zip_path = REPORT_DIR / f"bundle_{base_path.name}.zip"
        if zip_path.exists():
            zip_path.unlink()

        print(f"[BACKEND] Creating ZIP: {zip_path}")
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for root, _, files in os.walk(base_path):
                for file in files:
                    file_path = os.path.join(root, file)
                    arcname = os.path.relpath(file_path, base_path)
                    zipf.write(file_path, arcname)

        if not zip_path.exists():
            raise Exception("Zip file was not created")

        # The stored filename may be None (upload without a name) and may
        # reduce to an empty stem (e.g. ".mp4"); fall back in both cases.
        original_name = video_info.get(video_id) or "UrbanFlow_Analysis"
        safe_name = "".join(
            x for x in original_name if x.isalnum() or x in "._-"
        ).rsplit(".", 1)[0] or "UrbanFlow_Analysis"

        print(f"[BACKEND] Serving ZIP: {zip_path}")
        return FileResponse(
            str(zip_path),
            media_type="application/zip",
            filename=f"{safe_name}_UrbanFlow.zip",
        )
    except Exception as e:
        print(f"[BACKEND] ZIP Error: {str(e)}\n{traceback.format_exc()}")
        return Response(content=str(e), status_code=500)


def _coerce_rating(value):
    """Convert a submitted rating to an int clamped to the 0..5 star range."""
    try:
        rating = int(value)
    except (TypeError, ValueError, OverflowError):
        return 0
    return max(0, min(5, rating))


def _emoji_row(label, choice, custom_options=None):
    """Render one metric as a table row, emphasising the selected option."""
    options = custom_options or ["Poor", "Fair", "Good", "Great"]
    selected = str(choice or "").title()
    cells = "".join(
        f"<td><strong>{html.escape(opt)}</strong></td>"
        if opt == selected
        else f"<td>{html.escape(opt)}</td>"
        for opt in options
    )
    return f"<tr><th>{html.escape(label)}</th>{cells}</tr>"


def _render_feedback_email(feedback):
    """Build ``(subject, html_body)`` for a feedback notification email.

    Every user-supplied value is HTML-escaped before it is placed in the
    body, because the feedback payload comes straight from the client.

    NOTE(review): the inline markup of the original template is not present
    in this copy of the file (only the text survived). The tags below are a
    minimal, unstyled reconstruction around that original text — restore the
    designed styling if it is available elsewhere.
    """
    fb_type = str(feedback.get("type") or "General")
    rating = _coerce_rating(feedback.get("rating", 0))
    details = str(feedback.get("details") or "")
    usecase = str(feedback.get("usecase") or "Not specified")
    emojis = feedback.get("emojis") or {}
    if not isinstance(emojis, dict):
        emojis = {}
    priorities = feedback.get("priorities") or []
    if not isinstance(priorities, (list, tuple)):
        priorities = [priorities]

    # Check if it's stars-only (no emojis, no priorities, no text)
    has_emojis = any(emojis.values())
    has_priorities = len(priorities) > 0
    has_text = bool(details.strip())
    is_stars_only = not (has_emojis or has_priorities or has_text)

    # Header with Rating
    parts = [
        "<h1>UrbanFlow Intelligence</h1>",
        f"<p>{'★' * rating}{'☆' * (5 - rating)}</p>",
        f"<p>Overall Experience: {rating}/5 Stars</p>",
    ]

    if is_stars_only:
        parts.append(
            "<p>User submitted an overall star rating with no additional details.</p>"
        )
    else:
        # Section 1: Experience Metrics
        metric_rows = "".join(
            _emoji_row(label, emojis.get(key), options)
            for label, key, options in _FEEDBACK_METRICS
        )
        parts.append(f"<table>{metric_rows}</table>")

        # Categorization
        parts.append(
            "<table>"
            f"<tr><th>Primary Use Case</th><td>{html.escape(usecase)}</td></tr>"
            f"<tr><th>Feedback Category</th><td>{html.escape(fb_type)}</td></tr>"
            "</table>"
        )

        # Section 2: Priorities
        if has_priorities:
            items = "".join(f"<li>{html.escape(str(p))}</li>" for p in priorities)
            parts.append(f"<h3>Feature Prioritization:</h3><ul>{items}</ul>")

        if has_text:
            parts.append(
                f"<h3>Detailed Word Feedback</h3><p>{html.escape(details)}</p>"
            )

    parts.append("<p>Inference Engine Feedback Capture</p>")
    parts.append("<p>© 2026 UrbanFlow. All rights reserved.</p>")

    subject = f"Feedback: {fb_type} - {rating}/5 Stars"
    return subject, "<div>" + "".join(parts) + "</div>"


def send_feedback_email(api_key, feedback):
    """Send one feedback entry to the support inbox through Resend.

    Runs as a background task, so failures are logged and never raised.

    Args:
        api_key: Resend API key.
        feedback: Feedback payload as submitted by the client.
    """
    try:
        resend.api_key = api_key
        subject, html_body = _render_feedback_email(feedback)
        resend.Emails.send({
            # NOTE(review): the sender looks truncated here (no address
            # after the display name) — confirm against the deployed value.
            "from": "UrbanFlow ",
            "to": "support.urbanflow365@gmail.com",
            "subject": subject,
            "html": html_body,
        })
        print(f"[BACKEND] Feedback email successfully transmitted via Resend.")
    except Exception as e:
        print(f"[BACKEND] Resend Error: {str(e)}")


def _load_feedback_entries():
    """Return the stored feedback list, or ``[]`` if missing or unusable.

    A corrupt or non-list file is moved aside (best-effort) rather than
    letting every later submission fail on it.
    """
    if not FEEDBACK_PATH.exists():
        return []
    try:
        entries = json.loads(FEEDBACK_PATH.read_text(encoding="utf-8"))
        if isinstance(entries, list):
            return entries
        problem = "top-level value is not a list"
    except (OSError, ValueError) as e:
        problem = str(e)
    print(f"[BACKEND] Feedback store unreadable ({problem}); starting a new one.")
    with suppress(OSError):
        FEEDBACK_PATH.replace(FEEDBACK_PATH.with_suffix(".corrupt.json"))
    return []


@app.post("/api/feedback")
async def submit_feedback(background_tasks: BackgroundTasks, request_data: dict = None):
    """Append a feedback entry to the JSON store and queue the email.

    The entry is stamped with a UTC ISO-8601 ``timestamp``. The email is
    only queued when the ``RESEND_API_KEY`` environment variable is set.
    """
    feedback = request_data or {}
    entries = _load_feedback_entries()
    feedback["timestamp"] = datetime.now(timezone.utc).isoformat()
    entries.append(feedback)
    FEEDBACK_PATH.write_text(json.dumps(entries, indent=2), encoding="utf-8")

    # Trigger Email via Resend if API key is present
    resend_key = os.getenv("RESEND_API_KEY")
    if resend_key:
        background_tasks.add_task(send_feedback_email, resend_key, feedback)
        print(f"[BACKEND] Email task queued for support.urbanflow365@gmail.com")
    else:
        print(f"[BACKEND] Resend skipped: RESEND_API_KEY secret not found.")

    return {"status": "ok"}


@app.websocket("/ws/run")
async def ws_run(ws: WebSocket):
    """Run the engine on an uploaded video, streaming progress to the client.

    The first message must be a JSON object with ``video_id``, ``line`` and
    ``config``; optional keys are ``annotated_video``, ``annotated_options``,
    ``report_format``, ``export_json`` and ``export_csv``. Each progress
    update from the engine is forwarded as a JSON message, followed by a
    final ``{"done": true, ...}`` summary. Failures are reported as an
    ``{"error": ...}`` message before the socket is closed.
    """
    await ws.accept()
    try:
        data = json.loads(await ws.receive_text())
        video_id = data["video_id"]
        line = data["line"]
        cfg = data["config"]
        if not isinstance(cfg, dict):
            raise TypeError("config must be an object")
    except (ValueError, KeyError, TypeError) as e:
        # Malformed request: answer with an error instead of dropping the socket.
        await ws.send_text(json.dumps({"error": f"Invalid run request: {e}"}))
        await ws.close()
        return

    save_annotated = data.get("annotated_video", False)
    annotated_options = data.get(
        "annotated_options", {"bbox": True, "track_id": True, "spatial": True}
    )
    report_format = data.get("report_format", "png")

    # Non-string ids (e.g. a list) are unhashable and can never be valid.
    path = videos.get(video_id) if isinstance(video_id, str) else None
    if not path:
        await ws.send_text(json.dumps({"error": f"Unknown video_id: {video_id}"}))
        await ws.close()
        return

    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()

    def on_frame(update):
        # Called from the worker thread; hand the update to the event loop.
        loop.call_soon_threadsafe(queue.put_nowait, update)

    task = loop.run_in_executor(
        None, run, model, path, line, cfg, on_frame, save_annotated, annotated_options
    )

    try:
        while True:
            # Sample completion BEFORE draining, so updates queued up to the
            # moment the engine finished are still forwarded.
            done = task.done()
            while not queue.empty():
                update = queue.get_nowait()
                await ws.send_text(json.dumps(update))
            if done:
                break
            await asyncio.sleep(0.05)

        result = task.result()  # re-raises any exception from the engine
        result["report_format"] = report_format
        result["video_path"] = path
        result["video_meta"] = {
            "filename": video_info.get(video_id, "unknown"),
            "fps": cfg.get("video_fps", 0),
            "frames": cfg.get("frames", 0),
            "duration": cfg.get("duration", 0),
            "resolution": cfg.get("resolution", [0, 0]),
            "pixels": cfg.get("pixels", 0),
        }
        result["engine_config"] = cfg
        result["export_json"] = data.get("export_json", False)
        result["export_csv"] = data.get("export_csv", False)
        run_results[video_id] = result

        await ws.send_text(json.dumps({
            "done": True,
            "video_id": video_id,
            "processing_time": result["processing_time"],
            "actual_fps": result["actual_fps"],
            "speed_vs_realtime": result["speed_vs_realtime"],
            "pcu": result.get("pcu", {}),
        }))
        await ws.close()
    except Exception as e:
        err_msg = traceback.format_exc()
        print(f"[BACKEND] Engine error for {video_id}:\n{err_msg}")
        try:
            await ws.send_text(json.dumps({"error": str(e), "detail": err_msg}))
            await ws.close()
        except Exception:
            # The client is already gone; there is nobody left to notify.
            pass


class SmartCacheStaticFiles(StaticFiles):
    """Static file server that never caches HTML and caches assets for 1 hour."""

    def is_not_modified(self, response_headers, request_headers) -> bool:
        # Always report "modified" so a full response is sent every time.
        return False

    async def get_response(self, path: str, scope):
        resp = await super().get_response(path, scope)
        if path.endswith(".html") or path in ("", "/"):
            resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
        else:
            resp.headers["Cache-Control"] = "public, max-age=3600"
        return resp


# Mounted last so the API routes registered above take precedence.
app.mount("/", SmartCacheStaticFiles(directory=str(FRONTEND)), name="frontend")


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)