import os import json import uuid import asyncio import tempfile import shutil from pathlib import Path import zipfile import cv2 from fastapi import FastAPI, WebSocket, UploadFile, File, BackgroundTasks from fastapi.responses import FileResponse, Response from fastapi.staticfiles import StaticFiles import resend from model import load_model from config import get_optimal_config from engine import run from pcu import MODEL_CLASSES from visualize import generate_all BUSINESS_MAP = { "Cars": [0, 1, 2, 3, 12], "Buses": [4, 9, 10], "Two-wheelers": [7, 11], "Three-wheelers": [6], "Trucks": [5, 8], "Others": [13] } from contextlib import asynccontextmanager import numpy as np BASE = Path(__file__).parent.parent FRONTEND = BASE / "frontend" UPLOAD_DIR = Path(tempfile.gettempdir()) / "funky_uploads" UPLOAD_DIR.mkdir(exist_ok=True) REPORT_DIR = Path(tempfile.gettempdir()) / "funky_reports" REPORT_DIR.mkdir(exist_ok=True) videos = {} video_info = {} run_results = {} model = None @asynccontextmanager async def lifespan(app: FastAPI): global model model = load_model() # Warm up: run a dummy inference so OpenVINO compiles its graph now, # not on the first real user request try: dummy_img = np.zeros((736, 736, 3), dtype=np.uint8) model([dummy_img, dummy_img], verbose=False) # list of 2 imgs triggers batch=2 print("[BACKEND] Model warm-up complete.") except Exception as e: print(f"[BACKEND] Warm-up skipped: {e}") yield # Shutdown: nothing to clean up app = FastAPI(lifespan=lifespan) @app.get("/") def index(): return FileResponse(FRONTEND / "initial.html") @app.post("/upload") async def upload(file: UploadFile = File(...)): video_id = str(uuid.uuid4())[:8] path = UPLOAD_DIR / f"{video_id}.mp4" # Clean up any previous temp uploads to avoid stale state for old_path in UPLOAD_DIR.glob("*.mp4"): try: old_path.unlink() except Exception: pass print(f"[BACKEND] Received upload request: {file.filename}") try: with open(path, "wb") as f: shutil.copyfileobj(file.file, f) file_size = os.path.getsize(path) print(f"[BACKEND] Successfully stored: {path} ({file_size} bytes)") videos[video_id] = str(path) video_info[video_id] = file.filename return {"video_id": video_id} except Exception as e: print(f"[BACKEND] Upload failed: {str(e)}") return Response(content=str(e), status_code=500) @app.get("/config/{video_id}") def config_endpoint(video_id: str): path = videos.get(video_id) cfg = get_optimal_config(path) return cfg @app.get("/first-frame/{video_id}") def first_frame(video_id: str): path = videos.get(video_id) cap = cv2.VideoCapture(path) ret, frame = cap.read() cap.release() _, buf = cv2.imencode(".jpg", frame) return Response(content=buf.tobytes(), media_type="image/jpeg") @app.get("/constants") def constants(): return {"classes": MODEL_CLASSES, "business_map": BUSINESS_MAP} @app.post("/reports/{video_id}") def generate_reports(video_id: str): data = run_results.get(video_id) if not data: return {"error": "no results", "files": []} out_dir = str(REPORT_DIR / video_id) report_format = data.get("report_format", "png") files = generate_all(data, MODEL_CLASSES, out_dir, report_format) # Include annotated video if it exists annotated_src = data.get("annotated_video") if annotated_src and os.path.exists(annotated_src): dest = os.path.join(out_dir, "annotated.mp4") shutil.copy2(annotated_src, dest) # Always overwrite — never skip files.append("annotated.mp4") return {"files": files} @app.get("/reports/{video_id}/{name}") def get_report(video_id: str, name: str): path = REPORT_DIR / video_id / name if not path.exists(): return Response(status_code=404) media = "image/png" if name.endswith(".pdf"): media = "application/pdf" elif name.endswith(".mp4"): media = "video/mp4" elif name.endswith(".json"): media = "application/json" elif name.endswith(".csv"): media = "text/csv" return FileResponse(str(path), media_type=media) @app.get("/bundle/{video_id}") def download_all_reports(video_id: str): print(f"[BACKEND] ZIP request for {video_id}") base_path = REPORT_DIR / video_id if not base_path.exists(): print(f"[BACKEND] Error: {base_path} not found") return Response(content=f"Report directory not found for {video_id}", status_code=404) try: zip_filename = f"bundle_{video_id}.zip" zip_path = REPORT_DIR / zip_filename if zip_path.exists(): zip_path.unlink() print(f"[BACKEND] Creating ZIP: {zip_path}") with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for root, _, files in os.walk(base_path): for file in files: file_path = os.path.join(root, file) arcname = os.path.relpath(file_path, base_path) zipf.write(file_path, arcname) if not zip_path.exists(): raise Exception("Zip file was not created") original_name = video_info.get(video_id, "UrbanFlow_Analysis") safe_name = "".join(x for x in original_name if x.isalnum() or x in "._-").rsplit(".", 1)[0] print(f"[BACKEND] Serving ZIP: {zip_path}") return FileResponse( str(zip_path), media_type="application/zip", filename=f"{safe_name}_UrbanFlow.zip" ) except Exception as e: import traceback print(f"[BACKEND] ZIP Error: {str(e)}\n{traceback.format_exc()}") return Response(content=str(e), status_code=500) FEEDBACK_PATH = Path(tempfile.gettempdir()) / "urbanflow_feedback.json" def send_feedback_email(api_key, feedback): try: resend.api_key = api_key fb_type = feedback.get('type') or 'General' rating = feedback.get('rating', 0) details = feedback.get('details') or "" usecase = feedback.get('usecase') or "Not specified" emojis = feedback.get('emojis', {}) priorities = feedback.get('priorities', []) # Check if it's stars-only (no emojis, no priorities, no text) has_emojis = any(v for v in emojis.values()) has_priorities = len(priorities) > 0 has_text = bool(details and details.strip()) is_stars_only = not (has_emojis or has_priorities or has_text) def get_emoji_row(label, choice, custom_options=None): options = custom_options or ["Poor", "Fair", "Good", "Great"] c = (choice or "").title() row = f"
Feature Prioritization:
" for p in priorities: priority_section += f"| Primary Use Case | {usecase} |
| Feedback Category | {fb_type} |
Detailed Word Feedback
Overall Experience: {rating}/5 Stars
Inference Engine Feedback Capture
© 2026 UrbanFlow. All rights reserved.