Spaces:
Running
Running
| import os | |
| import json | |
| import uuid | |
| import asyncio | |
| import tempfile | |
| import shutil | |
| from pathlib import Path | |
| import zipfile | |
| import cv2 | |
| from fastapi import FastAPI, WebSocket, UploadFile, File, BackgroundTasks | |
| from fastapi.responses import FileResponse, Response | |
| from fastapi.staticfiles import StaticFiles | |
| import resend | |
| from model import load_model | |
| from config import get_optimal_config | |
| from engine import run | |
| from pcu import MODEL_CLASSES | |
| from visualize import generate_all | |
| BUSINESS_MAP = { | |
| "Cars": [0, 1, 2, 3, 12], | |
| "Buses": [4, 9, 10], | |
| "Two-wheelers": [7, 11], | |
| "Three-wheelers": [6], | |
| "Trucks": [5, 8], | |
| "Others": [13] | |
| } | |
| from contextlib import asynccontextmanager | |
| import numpy as np | |
| BASE = Path(__file__).parent.parent | |
| FRONTEND = BASE / "frontend" | |
| UPLOAD_DIR = Path(tempfile.gettempdir()) / "funky_uploads" | |
| UPLOAD_DIR.mkdir(exist_ok=True) | |
| REPORT_DIR = Path(tempfile.gettempdir()) / "funky_reports" | |
| REPORT_DIR.mkdir(exist_ok=True) | |
| videos = {} | |
| video_info = {} | |
| run_results = {} | |
| model = None | |
| async def lifespan(app: FastAPI): | |
| global model | |
| model = load_model() | |
| # Warm up: run a dummy inference so OpenVINO compiles its graph now, | |
| # not on the first real user request | |
| try: | |
| dummy_img = np.zeros((736, 736, 3), dtype=np.uint8) | |
| model([dummy_img, dummy_img], verbose=False) # list of 2 imgs triggers batch=2 | |
| print("[BACKEND] Model warm-up complete.") | |
| except Exception as e: | |
| print(f"[BACKEND] Warm-up skipped: {e}") | |
| yield | |
| # Shutdown: nothing to clean up | |
| app = FastAPI(lifespan=lifespan) | |
| def index(): | |
| return FileResponse(FRONTEND / "initial.html") | |
| async def upload(file: UploadFile = File(...)): | |
| video_id = str(uuid.uuid4())[:8] | |
| path = UPLOAD_DIR / f"{video_id}.mp4" | |
| # Clean up any previous temp uploads to avoid stale state | |
| for old_path in UPLOAD_DIR.glob("*.mp4"): | |
| try: | |
| old_path.unlink() | |
| except Exception: | |
| pass | |
| print(f"[BACKEND] Received upload request: {file.filename}") | |
| try: | |
| with open(path, "wb") as f: | |
| shutil.copyfileobj(file.file, f) | |
| file_size = os.path.getsize(path) | |
| print(f"[BACKEND] Successfully stored: {path} ({file_size} bytes)") | |
| videos[video_id] = str(path) | |
| video_info[video_id] = file.filename | |
| return {"video_id": video_id} | |
| except Exception as e: | |
| print(f"[BACKEND] Upload failed: {str(e)}") | |
| return Response(content=str(e), status_code=500) | |
| def config_endpoint(video_id: str): | |
| path = videos.get(video_id) | |
| cfg = get_optimal_config(path) | |
| return cfg | |
| def first_frame(video_id: str): | |
| path = videos.get(video_id) | |
| cap = cv2.VideoCapture(path) | |
| ret, frame = cap.read() | |
| cap.release() | |
| _, buf = cv2.imencode(".jpg", frame) | |
| return Response(content=buf.tobytes(), media_type="image/jpeg") | |
| def constants(): | |
| return {"classes": MODEL_CLASSES, "business_map": BUSINESS_MAP} | |
| def generate_reports(video_id: str): | |
| data = run_results.get(video_id) | |
| if not data: | |
| return {"error": "no results", "files": []} | |
| out_dir = str(REPORT_DIR / video_id) | |
| report_format = data.get("report_format", "png") | |
| files = generate_all(data, MODEL_CLASSES, out_dir, report_format) | |
| # Include annotated video if it exists | |
| annotated_src = data.get("annotated_video") | |
| if annotated_src and os.path.exists(annotated_src): | |
| dest = os.path.join(out_dir, "annotated.mp4") | |
| shutil.copy2(annotated_src, dest) # Always overwrite — never skip | |
| files.append("annotated.mp4") | |
| return {"files": files} | |
| def get_report(video_id: str, name: str): | |
| path = REPORT_DIR / video_id / name | |
| if not path.exists(): | |
| return Response(status_code=404) | |
| media = "image/png" | |
| if name.endswith(".pdf"): | |
| media = "application/pdf" | |
| elif name.endswith(".mp4"): | |
| media = "video/mp4" | |
| elif name.endswith(".json"): | |
| media = "application/json" | |
| elif name.endswith(".csv"): | |
| media = "text/csv" | |
| return FileResponse(str(path), media_type=media) | |
| def download_all_reports(video_id: str): | |
| print(f"[BACKEND] ZIP request for {video_id}") | |
| base_path = REPORT_DIR / video_id | |
| if not base_path.exists(): | |
| print(f"[BACKEND] Error: {base_path} not found") | |
| return Response(content=f"Report directory not found for {video_id}", status_code=404) | |
| try: | |
| zip_filename = f"bundle_{video_id}.zip" | |
| zip_path = REPORT_DIR / zip_filename | |
| if zip_path.exists(): | |
| zip_path.unlink() | |
| print(f"[BACKEND] Creating ZIP: {zip_path}") | |
| with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: | |
| for root, _, files in os.walk(base_path): | |
| for file in files: | |
| file_path = os.path.join(root, file) | |
| arcname = os.path.relpath(file_path, base_path) | |
| zipf.write(file_path, arcname) | |
| if not zip_path.exists(): | |
| raise Exception("Zip file was not created") | |
| original_name = video_info.get(video_id, "UrbanFlow_Analysis") | |
| safe_name = "".join(x for x in original_name if x.isalnum() or x in "._-").rsplit(".", 1)[0] | |
| print(f"[BACKEND] Serving ZIP: {zip_path}") | |
| return FileResponse( | |
| str(zip_path), | |
| media_type="application/zip", | |
| filename=f"{safe_name}_UrbanFlow.zip" | |
| ) | |
| except Exception as e: | |
| import traceback | |
| print(f"[BACKEND] ZIP Error: {str(e)}\n{traceback.format_exc()}") | |
| return Response(content=str(e), status_code=500) | |
| FEEDBACK_PATH = Path(tempfile.gettempdir()) / "urbanflow_feedback.json" | |
| def send_feedback_email(api_key, feedback): | |
| try: | |
| resend.api_key = api_key | |
| fb_type = feedback.get('type') or 'General' | |
| rating = feedback.get('rating', 0) | |
| details = feedback.get('details') or "" | |
| usecase = feedback.get('usecase') or "Not specified" | |
| emojis = feedback.get('emojis', {}) | |
| priorities = feedback.get('priorities', []) | |
| # Check if it's stars-only (no emojis, no priorities, no text) | |
| has_emojis = any(v for v in emojis.values()) | |
| has_priorities = len(priorities) > 0 | |
| has_text = bool(details and details.strip()) | |
| is_stars_only = not (has_emojis or has_priorities or has_text) | |
| def get_emoji_row(label, choice, custom_options=None): | |
| options = custom_options or ["Poor", "Fair", "Good", "Great"] | |
| c = (choice or "").title() | |
| row = f"<div style='margin-bottom: 16px;'><span style='font-size: 10px; font-weight: 800; color: #888; text-transform: uppercase; letter-spacing: 1px;'>{label}</span><br><div style='margin-top: 6px;'>" | |
| for opt in options: | |
| if c == opt: | |
| row += f"<span style='display: inline-block; background: #c89a6c; color: #000; font-size: 10px; font-weight: bold; padding: 4px 12px; border-radius: 6px; margin-right: 8px;'>{opt}</span>" | |
| else: | |
| row += f"<span style='display: inline-block; background: #fff; color: #ccc; font-size: 10px; padding: 3px 11px; border-radius: 6px; border: 1px solid #eee; margin-right: 8px;'>{opt}</span>" | |
| row += "</div></div>" | |
| return row | |
| # Section 1: Experience Metrics (Above) | |
| metrics_html = "" | |
| if not is_stars_only: | |
| metrics_html += get_emoji_row("Recommend Product", emojis.get("fb-recommend"), ["Unlikely", "Maybe", "Likely", "Highly"]) | |
| metrics_html += get_emoji_row("Security Assessment", emojis.get("fb-security")) | |
| metrics_html += get_emoji_row("Integration Readiness", emojis.get("fb-integration")) | |
| metrics_html += get_emoji_row("Ease of Use", emojis.get("fb-ease")) | |
| # Section 2: Priorities (Below) | |
| priority_section = "" | |
| if has_priorities: | |
| priority_section = "<p style='font-size: 10px; font-weight: 800; color: #888; text-transform: uppercase; letter-spacing: 1px; margin-bottom: 10px;'>Feature Prioritization:</p>" | |
| for p in priorities: | |
| priority_section += f"<div style='background: #fff; border-left: 4px solid #c89a6c; padding: 10px 16px; font-size: 13px; font-weight: 600; margin-bottom: 8px; border-top: 1px solid #f0f0f0; border-right: 1px solid #f0f0f0; border-bottom: 1px solid #f0f0f0; border-radius: 0 8px 8px 0;'>{p}</div>" | |
| # Categorization & Feedback Content | |
| categorization_html = "" | |
| if not is_stars_only: | |
| categorization_html = f""" | |
| <div style="background: #fafafa; padding: 25px; border-radius: 16px; margin: 30px 0; border: 1px solid #f0f0f0;"> | |
| <table style="width: 100%; font-size: 13px;"> | |
| <tr><td style="color: #999; width: 40%; padding: 6px 0; text-transform: uppercase; font-size: 9px; font-weight: 800; letter-spacing: 0.5px;">Primary Use Case</td><td style="font-weight: 700; color: #333;">{usecase}</td></tr> | |
| <tr><td style="color: #999; padding: 6px 0; text-transform: uppercase; font-size: 9px; font-weight: 800; letter-spacing: 0.5px;">Feedback Category</td><td style="font-weight: 700; color: #333;">{fb_type}</td></tr> | |
| </table> | |
| </div> | |
| """ | |
| detailed_feedback_html = "" | |
| if has_text: | |
| detailed_feedback_html = f""" | |
| <div style="margin-top: 30px;"> | |
| <p style="font-size: 10px; font-weight: 800; color: #888; text-transform: uppercase; letter-spacing: 1px; margin-bottom: 12px;">Detailed Word Feedback</p> | |
| <div style="background: #fff; border: 1px solid #eee; padding: 25px; border-radius: 14px; font-size: 14px; color: #444; white-space: pre-wrap; line-height: 1.8; box-shadow: 0 2px 10px rgba(0,0,0,0.02);">{details}</div> | |
| </div> | |
| """ | |
| # Header with Rating | |
| header_rating = f""" | |
| <div style="text-align: center; margin-bottom: 40px; padding: 20px; background: linear-gradient(180deg, #fff 0%, #fafafa 100%); border-radius: 20px;"> | |
| <h2 style="color: #8b5e3c; margin: 0; font-size: 26px; font-weight: 900; letter-spacing: -1px;">UrbanFlow Intelligence</h2> | |
| <div style="margin-top: 15px; font-size: 22px; color: #c89a6c; letter-spacing: 4px;">{'★' * rating}{'☆' * (5-rating)}</div> | |
| <p style="color: #aaa; font-size: 10px; text-transform: uppercase; letter-spacing: 2px; margin-top: 10px; font-weight: 700;">Overall Experience: {rating}/5 Stars</p> | |
| </div> | |
| """ | |
| # Assemble the body | |
| final_content = header_rating | |
| if is_stars_only: | |
| final_content += "<div style='text-align: center; padding: 40px; color: #777; font-style: italic; font-size: 14px;'>User submitted an overall star rating with no additional details.</div>" | |
| else: | |
| final_content += f""" | |
| <div style="margin-bottom: 40px;">{metrics_html}</div> | |
| {categorization_html} | |
| <div style="margin-bottom: 40px;">{priority_section}</div> | |
| {detailed_feedback_html} | |
| """ | |
| html_body = f""" | |
| <!DOCTYPE html> | |
| <html> | |
| <head> | |
| <link href="https://fonts.googleapis.com/css2?family=Montserrat:wght@400;600;700;800;900&display=swap" rel="stylesheet"> | |
| </head> | |
| <body style="margin: 0; padding: 0; background-color: #ffffff;"> | |
| <div style="font-family: 'Montserrat', sans-serif; color: #333; max-width: 600px; margin: 40px auto; padding: 0 20px;"> | |
| {final_content} | |
| <div style="margin-top: 60px; padding-top: 30px; border-top: 1px solid #eee; text-align: center;"> | |
| <p style="font-size: 10px; color: #bbb; text-transform: uppercase; letter-spacing: 2px; font-weight: 700; margin: 0;">Inference Engine Feedback Capture</p> | |
| <p style="font-size: 9px; color: #ddd; margin-top: 5px;">© 2026 UrbanFlow. All rights reserved.</p> | |
| </div> | |
| </div> | |
| </body> | |
| </html> | |
| """ | |
| resend.Emails.send({ | |
| "from": "UrbanFlow <onboarding@resend.dev>", | |
| "to": "support.urbanflow365@gmail.com", | |
| "subject": f"Feedback: {fb_type} - {rating}/5 Stars", | |
| "html": html_body | |
| }) | |
| print(f"[BACKEND] Feedback email successfully transmitted via Resend.") | |
| except Exception as e: | |
| print(f"[BACKEND] Resend Error: {str(e)}") | |
| async def submit_feedback(background_tasks: BackgroundTasks, request_data: dict = None): | |
| from datetime import datetime, timezone | |
| feedback = request_data or {} | |
| entries = [] | |
| if FEEDBACK_PATH.exists(): | |
| entries = json.loads(FEEDBACK_PATH.read_text()) | |
| feedback["timestamp"] = datetime.now(timezone.utc).isoformat() | |
| entries.append(feedback) | |
| FEEDBACK_PATH.write_text(json.dumps(entries, indent=2)) | |
| # Trigger Email via Resend if API key is present | |
| resend_key = os.getenv("RESEND_API_KEY") | |
| if resend_key: | |
| background_tasks.add_task(send_feedback_email, resend_key, feedback) | |
| print(f"[BACKEND] Email task queued for support.urbanflow365@gmail.com") | |
| else: | |
| print(f"[BACKEND] Resend skipped: RESEND_API_KEY secret not found.") | |
| return {"status": "ok"} | |
| async def ws_run(ws: WebSocket): | |
| await ws.accept() | |
| data = json.loads(await ws.receive_text()) | |
| video_id = data["video_id"] | |
| line = data["line"] | |
| cfg = data["config"] | |
| save_annotated = data.get("annotated_video", False) | |
| annotated_options = data.get("annotated_options", {"bbox": True, "track_id": True, "spatial": True}) | |
| report_format = data.get("report_format", "png") | |
| path = videos.get(video_id) | |
| if not path: | |
| await ws.send_text(json.dumps({"error": f"Unknown video_id: {video_id}"})) | |
| await ws.close() | |
| return | |
| loop = asyncio.get_running_loop() | |
| queue = asyncio.Queue() | |
| def on_frame(update): | |
| loop.call_soon_threadsafe(queue.put_nowait, update) | |
| task = loop.run_in_executor( | |
| None, run, model, path, line, cfg, on_frame, save_annotated, annotated_options | |
| ) | |
| try: | |
| while True: | |
| done = task.done() | |
| while not queue.empty(): | |
| update = queue.get_nowait() | |
| await ws.send_text(json.dumps(update)) | |
| if done: | |
| break | |
| await asyncio.sleep(0.05) | |
| result = task.result() # re-raises any exception from the engine | |
| result["report_format"] = report_format | |
| result["video_path"] = path | |
| result["video_meta"] = { | |
| "filename": video_info.get(video_id, "unknown"), | |
| "fps": cfg.get("video_fps", 0), | |
| "frames": cfg.get("frames", 0), | |
| "duration": cfg.get("duration", 0), | |
| "resolution": cfg.get("resolution", [0, 0]), | |
| "pixels": cfg.get("pixels", 0), | |
| } | |
| result["engine_config"] = cfg | |
| result["export_json"] = data.get("export_json", False) | |
| result["export_csv"] = data.get("export_csv", False) | |
| run_results[video_id] = result | |
| await ws.send_text(json.dumps({ | |
| "done": True, | |
| "video_id": video_id, | |
| "processing_time": result["processing_time"], | |
| "actual_fps": result["actual_fps"], | |
| "speed_vs_realtime": result["speed_vs_realtime"], | |
| "pcu": result.get("pcu", {}), | |
| })) | |
| await ws.close() | |
| except Exception as e: | |
| import traceback | |
| err_msg = traceback.format_exc() | |
| print(f"[BACKEND] Engine error for {video_id}:\n{err_msg}") | |
| try: | |
| await ws.send_text(json.dumps({"error": str(e), "detail": err_msg})) | |
| await ws.close() | |
| except Exception: | |
| pass | |
| from fastapi.staticfiles import StaticFiles | |
| class SmartCacheStaticFiles(StaticFiles): | |
| def is_not_modified(self, response_headers, request_headers) -> bool: | |
| return False | |
| async def get_response(self, path: str, scope): | |
| resp = await super().get_response(path, scope) | |
| if path.endswith(".html") or path in ("", "/"): | |
| resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0" | |
| else: | |
| resp.headers["Cache-Control"] = "public, max-age=3600" | |
| return resp | |
| app.mount("/", SmartCacheStaticFiles(directory=str(FRONTEND)), name="frontend") | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=7860) | |