import asyncio
import time
import traceback

from fastapi import APIRouter, File, Form, HTTPException, Request, UploadFile, Depends

from src.core.config import DEFAULT_PINECONE_KEY, IDX_FACES, IDX_OBJECTS
from src.core.security import get_verified_keys
from src.services.db_client import (
    merge_face_results,
    merge_object_results,
    pinecone_pool,
    search_faces,
    search_objects,
)
from src.core.logging import log
from src.common.utils import face_ui_score, get_ip, is_default_key, to_list

router = APIRouter()

# Maximum number of matches returned per query face (after sorting by UI score).
_MAX_FACE_MATCHES = 50


@router.post("/api/search")
async def search_database(
    request: Request,
    file: UploadFile = File(...),
    detect_faces: bool = Form(True),
    user_id: str = Form(""),
    keys: dict = Depends(get_verified_keys),
):
    """Embed an uploaded image and search the Pinecone face/object indexes.

    The image bytes are turned into vectors by ``request.app.state.ai``
    (concurrency bounded by ``request.app.state.ai_semaphore``). When
    ``detect_faces`` is true and at least one face vector is produced, the
    face search runs (together with any object vectors); otherwise only the
    object search runs.

    Args:
        request: Incoming request; used for the client IP and app state.
        file: Uploaded image.
        detect_faces: Forwarded to the AI manager; also gates the face search.
        user_id: Caller-supplied id used only for logging ("anonymous" if empty).
        keys: Verified key dict from ``get_verified_keys``; ``keys["pinecone_key"]``
            selects the Pinecone client and the guest/personal log mode.

    Returns:
        The response dict built by ``_run_face_search`` or ``_run_object_search``.

    Raises:
        HTTPException: re-raised unchanged if raised downstream (e.g. 404 when a
            Pinecone index is missing); any other exception becomes a 500.
    """
    ip = get_ip(request)
    start = time.perf_counter()
    uid = user_id or "anonymous"
    mode = "guest" if is_default_key(keys["pinecone_key"], DEFAULT_PINECONE_KEY) else "personal"
    log("INFO", "search.start", user_id=uid, ip=ip, mode=mode,
        filename=file.filename, detect_faces=detect_faces)

    try:
        file_bytes = await file.read()
        ai_manager = request.app.state.ai
        sem = request.app.state.ai_semaphore
        async with sem:
            vectors = await ai_manager.process_image_bytes_async(file_bytes, detect_faces=detect_faces)
        # Measured from `start`, so this includes the upload read and any wait
        # on the semaphore, not only model time.
        inference_ms = round((time.perf_counter() - start) * 1000)

        face_vectors = [v for v in vectors if v["type"] == "face"]
        object_vectors = [v for v in vectors if v["type"] == "object"]
        # sorted() keeps the logged lane order stable across runs (set
        # iteration order of str values depends on hash randomization).
        lanes_used = sorted({v["type"] for v in vectors})

        log("INFO", "search.inference_done", user_id=uid, ip=ip, mode=mode,
            face_vecs=len(face_vectors), obj_vecs=len(object_vectors), inference_ms=inference_ms)

        pc = pinecone_pool.get(keys["pinecone_key"])
        idx_obj = pc.Index(IDX_OBJECTS)
        idx_face = pc.Index(IDX_FACES)

        if detect_faces and face_vectors:
            return await _run_face_search(face_vectors, object_vectors, idx_face, idx_obj,
                                          start, user_id, ip, mode, lanes_used)
        return await _run_object_search(object_vectors, idx_obj, start, user_id, ip, mode, lanes_used)

    except HTTPException:
        raise
    except Exception as e:
        log("ERROR", "search.error", user_id=uid, ip=ip, mode=mode,
            error=str(e), traceback=traceback.format_exc()[-800:])
        # NOTE(review): this echoes the internal error text to the client.
        raise HTTPException(500, str(e)) from e


async def _query_face_vector(idx_face, fv: dict) -> dict:
    """Search one query face in ``idx_face`` and build its result group.

    ``search_faces`` is run in a worker thread. Its result is iterated as a
    mapping of url -> dict with ``raw_score``, ``face_crop`` and ``folder``.
    Matches are sorted by ``face_ui_score(raw_score)`` (descending) and
    truncated to ``_MAX_FACE_MATCHES``.

    Raises:
        HTTPException: 404 if the underlying error message contains "404".
    """
    vec = to_list(fv["vector"])
    det_score = fv.get("det_score", 1.0)
    try:
        image_map = await asyncio.to_thread(search_faces, idx_face, vec, det_score)
    except Exception as e:
        if "404" in str(e):
            raise HTTPException(404, "Pinecone index not found. Go to Settings → Verify & Save.") from e
        raise

    matches = [
        {
            "url": url,
            "score": face_ui_score(d["raw_score"]),
            "raw_score": round(d["raw_score"], 4),
            "face_crop": d["face_crop"],
            "folder": d["folder"],
            "caption": "👤 Verified Identity",
        }
        for url, d in image_map.items()
    ]
    matches.sort(key=lambda m: m["score"], reverse=True)

    return {
        "query_face_idx": fv.get("face_idx", 0),
        "query_face_crop": fv.get("face_crop", ""),
        "query_bbox": fv.get("bbox", []),
        "det_score": det_score,
        "face_width_px": fv.get("face_width_px", 0),
        "matches": matches[:_MAX_FACE_MATCHES],
    }


async def _query_object_vector(idx_obj, ov: dict) -> list:
    """Run ``search_objects`` for one object vector in a worker thread.

    Raises:
        HTTPException: 404 if the underlying error message contains "404".
    """
    vec = to_list(ov["vector"])
    try:
        return await asyncio.to_thread(search_objects, idx_obj, vec)
    except Exception as e:
        if "404" in str(e):
            raise HTTPException(404, "Pinecone index not found.") from e
        raise


async def _run_face_search(face_vectors, object_vectors, idx_face, idx_obj, start, user_id, ip, mode, lanes_used) -> dict:
    """Query every face and object vector concurrently and build a "face" response.

    Returns:
        ``{"mode": "face", "face_groups": [...], "results": [...], "object_results": [...]}``.
        ``face_groups`` keeps only per-face groups that have at least one match.
        ``results`` is ``merge_face_results`` over all groups (including empty ones).
    """
    face_tasks = [_query_face_vector(idx_face, fv) for fv in face_vectors]
    obj_tasks = [_query_object_vector(idx_obj, ov) for ov in object_vectors]
    all_results = await asyncio.gather(*face_tasks, *obj_tasks)

    # gather() preserves argument order: face results first, then object results.
    n_face = len(face_tasks)
    raw_groups = list(all_results[:n_face])
    obj_nested = list(all_results[n_face:])

    merged_face = merge_face_results(raw_groups)
    merged_objects = merge_object_results(obj_nested)
    face_groups = [g for g in raw_groups if g.get("matches")]

    duration_ms = round((time.perf_counter() - start) * 1000)
    log("INFO", "search.complete", user_id=user_id or "anonymous", ip=ip, mode=mode,
        lanes=lanes_used, face_groups=len(face_groups), face_results=len(merged_face),
        object_results=len(merged_objects), duration_ms=duration_ms)

    return {
        "mode": "face",
        "face_groups": face_groups,
        "results": merged_face,
        "object_results": merged_objects,
    }


async def _run_object_search(object_vectors, idx_obj, start, user_id, ip, mode, lanes_used) -> dict:
    """Query every object vector concurrently and build an "object" response.

    With no object vectors, no index query is made and ``results`` is empty.
    The ``search.complete`` log line is emitted in both cases.

    Returns:
        ``{"mode": "object", "results": [...], "face_groups": []}``.
    """
    if object_vectors:
        nested = await asyncio.gather(*(_query_object_vector(idx_obj, ov) for ov in object_vectors))
        final = merge_object_results(nested)
    else:
        final = []

    duration_ms = round((time.perf_counter() - start) * 1000)
    log("INFO", "search.complete", user_id=user_id or "anonymous", ip=ip, mode=mode,
        lanes=lanes_used, results=len(final), duration_ms=duration_ms)

    return {"mode": "object", "results": final, "face_groups": []}