import asyncio
import hashlib
import time
import traceback
from typing import Optional

from fastapi import APIRouter, Depends, File, Form, HTTPException, Request, UploadFile

from src.core.config import (
    DEFAULT_PINECONE_KEY,
    IDX_FACES,
    IDX_OBJECTS,
    IDX_FACES_ARCFACE,
    IDX_FACES_ADAFACE,
    USE_SPLIT_FACE_INDEXES,
    USE_CLUSTER_AWARE_SEARCH,
)
from src.core.security import get_verified_keys
from src.services.db_client import (
    merge_face_results,
    merge_object_results,
    pinecone_pool,
    search_faces,
    search_faces_split,
    search_objects,
    ensure_indexes,
)
from src.core.logging import log
from src.common.utils import face_ui_score, get_ip, is_default_key, to_list

router = APIRouter()

# Minimum fused score a match needs before its person-cluster is expanded.
# Lowered from 0.50 to 0.35 to catch borderline same-person cases while still
# rejecting imposters; most same-person matches score above 0.35, so this
# keeps photo galleries complete.
CLUSTER_EXPAND_MIN_SCORE = 0.35


def _derive_cluster_uid(pinecone_key: str) -> str:
    """Stable opaque user identity derived from the Pinecone key.

    Matches what clustering.py writes to Supabase so cluster lookups work.
    """
    return hashlib.sha256(pinecone_key.encode()).hexdigest()[:16]


async def _ensure_indexes_safe(pc, user_id: str, ip: str) -> None:
    """Best-effort auto-creation of missing Pinecone indexes.

    Self-heals the case where the user hasn't triggered verify-keys yet.
    Failures are logged but never abort the search.
    """
    try:
        created = await asyncio.to_thread(ensure_indexes, pc)
        if created:
            log("INFO", "search.indexes_auto_created",
                user_id=user_id or "anonymous", ip=ip, created=created)
            # Freshly created indexes need a moment before they are queryable.
            await asyncio.sleep(8)
    except Exception as e:
        log("ERROR", "search.ensure_indexes_failed",
            user_id=user_id or "anonymous", ip=ip, error=str(e))


def _open_face_indexes(pc):
    """Return (idx_arcface, idx_adaface, idx_face_legacy) per the split-index flag.

    Exactly one side of the tuple is populated: the split pair when
    USE_SPLIT_FACE_INDEXES is on, otherwise the legacy single index.
    """
    if USE_SPLIT_FACE_INDEXES:
        return pc.Index(IDX_FACES_ARCFACE), pc.Index(IDX_FACES_ADAFACE), None
    return None, None, pc.Index(IDX_FACES)


async def _query_objects_one(idx_obj, ov):
    """Query the object index with a single object vector.

    Translates a Pinecone 404 into a user-facing HTTPException; any other
    error propagates to the endpoint's generic handler.
    """
    vec = to_list(ov["vector"])
    try:
        return await asyncio.to_thread(search_objects, idx_obj, vec)
    except Exception as e:
        if "404" in str(e):
            raise HTTPException(404, "Pinecone index not found.")
        raise


@router.post("/api/search")
async def search_database(
    request: Request,
    file: UploadFile = File(...),
    detect_faces: bool = Form(True),
    user_id: str = Form(""),
    keys: dict = Depends(get_verified_keys),
):
    """Hybrid face + object search over a single uploaded image.

    Runs query inference on the upload, then fans out to the face and/or
    object Pinecone indexes. Responds with face groups plus merged object
    results (face mode) or merged object results only (object mode).

    Raises:
        HTTPException(500): on any unexpected inference/search failure.
    """
    ip = get_ip(request)
    start = time.perf_counter()
    mode = "guest" if is_default_key(keys["pinecone_key"], DEFAULT_PINECONE_KEY) else "personal"
    log("INFO", "search.start", user_id=user_id or "anonymous", ip=ip, mode=mode,
        filename=file.filename, detect_faces=detect_faces)
    try:
        file_bytes = await file.read()
        ai_manager = request.app.state.ai
        sem = request.app.state.ai_semaphore

        # Run query inference under the shared AI concurrency limit.
        async with sem:
            vectors = await ai_manager.process_image_bytes_async(
                file_bytes, detect_faces=detect_faces
            )
        inference_ms = round((time.perf_counter() - start) * 1000)

        face_vectors = [v for v in vectors if v["type"] == "face"]
        object_vectors = [v for v in vectors if v["type"] == "object"]
        log("INFO", "search.inference_done", user_id=user_id or "anonymous", ip=ip,
            mode=mode, face_vecs=len(face_vectors), obj_vecs=len(object_vectors),
            inference_ms=inference_ms)

        pc = pinecone_pool.get(keys["pinecone_key"])
        cluster_uid = _derive_cluster_uid(keys["pinecone_key"])

        # Auto-create indexes if missing (best-effort, never fatal).
        await _ensure_indexes_safe(pc, user_id, ip)

        idx_obj = pc.Index(IDX_OBJECTS)
        idx_arcface, idx_adaface, idx_face_legacy = _open_face_indexes(pc)

        if detect_faces and face_vectors:
            return await _run_face_search(
                face_vectors,
                object_vectors,
                idx_arcface,
                idx_adaface,
                idx_face_legacy,
                idx_obj,
                start,
                user_id,
                ip,
                mode,
                pc=pc,
                cluster_uid=cluster_uid,
            )
        return await _run_object_search(object_vectors, idx_obj, start, user_id, ip, mode)
    except HTTPException:
        raise
    except Exception as e:
        log("ERROR", "search.error", user_id=user_id or "anonymous", ip=ip, mode=mode,
            error=str(e), traceback=traceback.format_exc()[-800:])
        raise HTTPException(500, str(e))


async def _query_face_split(fv, idx_arcface, idx_adaface, pc=None, cluster_uid=None):
    """Parallel query to ArcFace + AdaFace indexes, then fuse.

    When USE_CLUSTER_AWARE_SEARCH is on, expands results to include every
    image in the matched person clusters for near-100% recall.
    """
    arcface_vec = to_list(fv["arcface_vector"])
    adaface_vec = to_list(fv.get("adaface_vector")) if fv.get("has_adaface") else None
    try:
        image_map = await asyncio.to_thread(
            search_faces_split,
            idx_arcface,
            idx_adaface,
            arcface_vec,
            adaface_vec,
        )
    except Exception as e:
        if "404" in str(e):
            raise HTTPException(
                404,
                "Face indexes not found. Go to Settings → Verify & Save to create them."
            )
        raise

    # Only matches above the expansion threshold seed cluster expansion.
    high_confidence = {
        url: d for url, d in image_map.items()
        if d.get("fused_score", 0.0) >= CLUSTER_EXPAND_MIN_SCORE
    }
    if USE_CLUSTER_AWARE_SEARCH and high_confidence and pc is not None and cluster_uid:
        # Lazy import: clustering pulls in Supabase deps only when needed.
        from src.services.clustering import search_cluster_aware
        image_map = await search_cluster_aware(pc, high_confidence, cluster_uid)

    return _format_face_group(fv, image_map, scoring="fused")


async def _query_face_legacy(fv, idx_face):
    """Legacy single-index query for pre-Phase-2 data."""
    vec = to_list(fv["vector"])
    det_score = fv.get("det_score", 1.0)
    try:
        image_map = await asyncio.to_thread(search_faces, idx_face, vec, det_score)
    except Exception as e:
        if "404" in str(e):
            raise HTTPException(404, "Pinecone index not found.")
        raise
    return _format_face_group(fv, image_map, scoring="legacy")


def _format_face_group(fv, image_map, scoring: str):
    """Shape the response the same way regardless of scoring backend.

    Args:
        fv: query face-vector dict (bbox, crop, det_score, ...).
        image_map: url -> match-detail dict from the search backend.
        scoring: "fused" (split indexes) or "legacy" (single index) —
            selects which raw score feeds the UI score.
    """
    matches = []
    for url, d in image_map.items():
        if scoring == "fused":
            display_score = face_ui_score(d["fused_score"], mode="fused")
            raw_score = round(d["fused_score"], 4)
        else:
            display_score = face_ui_score(d["raw_score"], mode="legacy")
            raw_score = round(d["raw_score"], 4)
        matches.append({
            "url": url,
            "score": display_score,
            "raw_score": raw_score,
            "arcface_score": round(d.get("arcface_score", 0), 4),
            "adaface_score": round(d.get("adaface_score", 0), 4),
            "face_crop": d["face_crop"],
            "folder": d["folder"],
            "caption": "👤 Verified Identity",
        })
    matches.sort(key=lambda x: x["score"], reverse=True)
    return {
        "query_face_idx": fv.get("face_idx", 0),
        "query_face_crop": fv.get("face_crop", ""),
        "query_bbox": fv.get("bbox", []),
        "det_score": fv.get("det_score", 1.0),
        "face_width_px": fv.get("face_width_px", 0),
        "matches": matches,
    }


async def _run_face_search(
    face_vectors,
    object_vectors,
    idx_arcface,
    idx_adaface,
    idx_face_legacy,
    idx_obj,
    start,
    user_id,
    ip,
    mode,
    pc=None,
    cluster_uid=None,
) -> dict:
    """Fan out all face + object queries concurrently and merge results."""
    # Build face query tasks against whichever index mode is active.
    if USE_SPLIT_FACE_INDEXES:
        face_tasks = [
            _query_face_split(fv, idx_arcface, idx_adaface, pc=pc, cluster_uid=cluster_uid)
            for fv in face_vectors
        ]
    else:
        face_tasks = [_query_face_legacy(fv, idx_face_legacy) for fv in face_vectors]

    # Object queries run in parallel with face queries.
    obj_tasks = [_query_objects_one(idx_obj, ov) for ov in object_vectors]

    all_results = await asyncio.gather(*face_tasks, *obj_tasks)
    raw_groups = list(all_results[:len(face_tasks)])
    obj_nested = list(all_results[len(face_tasks):])

    merged_face = merge_face_results(raw_groups)
    merged_objects = merge_object_results(obj_nested)
    face_groups = [g for g in raw_groups if g.get("matches")]

    duration_ms = round((time.perf_counter() - start) * 1000)
    log("INFO", "search.complete", user_id=user_id or "anonymous", ip=ip, mode=mode,
        lanes=["face", "object"], face_groups=len(face_groups),
        face_results=len(merged_face), object_results=len(merged_objects),
        duration_ms=duration_ms,
        index_mode="split" if USE_SPLIT_FACE_INDEXES else "legacy")
    return {
        "mode": "face",
        "face_groups": face_groups,
        "results": merged_face,
        "object_results": merged_objects,
    }


async def _run_object_search(object_vectors, idx_obj, start, user_id, ip, mode) -> dict:
    """Object-only lane: query each object vector concurrently and merge."""
    if not object_vectors:
        return {"mode": "object", "results": [], "face_groups": []}

    nested = await asyncio.gather(
        *[_query_objects_one(idx_obj, ov) for ov in object_vectors]
    )
    final = merge_object_results(nested)
    duration_ms = round((time.perf_counter() - start) * 1000)
    log("INFO", "search.complete", user_id=user_id or "anonymous", ip=ip, mode=mode,
        lanes=["object"], results=len(final), duration_ms=duration_ms)
    return {"mode": "object", "results": final, "face_groups": []}


@router.post("/api/search-by-face")
async def search_by_face(
    request: Request,
    front: UploadFile = File(...),
    left: Optional[UploadFile] = File(None),
    right: Optional[UploadFile] = File(None),
    user_id: str = Form(""),
    keys: dict = Depends(get_verified_keys),
):
    """
    Multi-angle face search: accepts 1-3 face images, fuses embeddings
    server-side, performs single Pinecone query.
    3x faster + lower quota usage vs 3 sequential queries.
    """
    import numpy as np

    ip = get_ip(request)
    start = time.perf_counter()
    mode = "guest" if is_default_key(keys["pinecone_key"], DEFAULT_PINECONE_KEY) else "personal"
    log("INFO", "search.search_by_face.start", user_id=user_id or "anonymous", ip=ip, mode=mode)
    try:
        ai_manager = request.app.state.ai
        sem = request.app.state.ai_semaphore
        log("DEBUG", "search.search_by_face.received_files",
            user_id=user_id or "anonymous", ip=ip,
            front=bool(front), left=bool(left), right=bool(right))

        # Read the bytes of each provided angle (sequential; uploads are small).
        images = {}
        for name, file in [("front", front), ("left", left), ("right", right)]:
            if file:
                file_bytes = await file.read()
                images[name] = file_bytes
                log("DEBUG", "search.search_by_face.file_read",
                    user_id=user_id or "anonymous", ip=ip,
                    angle=name, size_bytes=len(file_bytes))

        if not images:
            log("ERROR", "search.search_by_face.no_images",
                user_id=user_id or "anonymous", ip=ip)
            raise HTTPException(400, "At least front image required")

        # Process all images in parallel under the shared AI semaphore.
        async def process_img(name, data):
            async with sem:
                return name, await ai_manager.process_image_bytes_async(
                    data, detect_faces=True
                )

        results = await asyncio.gather(
            *[process_img(name, data) for name, data in images.items()],
            return_exceptions=True
        )

        # Extract face vectors from successful results.
        face_vectors_by_angle = {}
        for result in results:
            if isinstance(result, Exception):
                # format_exception(result) — not format_exc(), which only works
                # inside an active `except` block and would log "NoneType: None".
                tb_text = "".join(
                    traceback.format_exception(type(result), result, result.__traceback__)
                )
                log("WARNING", "search.search_by_face.process_error",
                    user_id=user_id or "anonymous", ip=ip,
                    error=str(result), traceback=tb_text[-500:])
                continue
            name, vectors = result
            face_vecs = [v for v in vectors if v["type"] == "face"]
            if face_vecs:
                # Keep only the first detected face per angle.
                face_vectors_by_angle[name] = face_vecs[0]
                log("DEBUG", "search.search_by_face.face_detected",
                    user_id=user_id or "anonymous", ip=ip, angle=name,
                    det_score=face_vecs[0].get("det_score", 0))
            else:
                log("WARNING", "search.search_by_face.no_face_in_angle",
                    user_id=user_id or "anonymous", ip=ip, angle=name,
                    vectors_count=len(vectors) if vectors else 0)

        if not face_vectors_by_angle:
            log("ERROR", "search.search_by_face.no_faces_detected",
                user_id=user_id or "anonymous", ip=ip)
            raise HTTPException(400, "No face detected in provided images")

        # Front face crop for results display; fall back to any angle's crop.
        front_face_crop = (
            face_vectors_by_angle.get("front", {}).get("face_crop", "")
            or next((v.get("face_crop", "") for v in face_vectors_by_angle.values()
                     if v.get("face_crop")), "")
        )

        # Fuse embeddings: front weighted higher than side angles.
        weights = {"front": 0.5, "left": 0.25, "right": 0.25}
        arcface_vectors = []
        adaface_vectors = []
        det_scores = []
        for angle, vec in face_vectors_by_angle.items():
            w = weights.get(angle, 0)
            if w > 0:
                arcface_vectors.append(np.array(to_list(vec["arcface_vector"])) * w)
                det_scores.append(vec.get("det_score", 1.0))
                if vec.get("has_adaface") and vec.get("adaface_vector") is not None:
                    adaface_vectors.append(np.array(to_list(vec["adaface_vector"])) * w)

        if not arcface_vectors:
            raise HTTPException(400, "Could not fuse face embeddings")

        # Fuse (weighted sum) and L2-normalize; epsilon guards zero vectors.
        fused_arcface = np.sum(arcface_vectors, axis=0)
        fused_arcface = fused_arcface / (np.linalg.norm(fused_arcface) + 1e-7)

        fused_adaface = None
        has_adaface = False
        if adaface_vectors:
            fused_adaface = np.sum(adaface_vectors, axis=0)
            fused_adaface = fused_adaface / (np.linalg.norm(fused_adaface) + 1e-7)
            has_adaface = True

        # Synthetic face-vector dict for the query (front crop for UI display).
        fv = {
            "face_idx": 0,
            "det_score": float(np.mean(det_scores)),
            "arcface_vector": fused_arcface.tolist(),
            "has_adaface": has_adaface,
            "adaface_vector": fused_adaface.tolist() if has_adaface else None,
            "bbox": [0, 0, 0, 0],
            "face_width_px": 0,
            "face_crop": front_face_crop,
        }

        inference_ms = round((time.perf_counter() - start) * 1000)
        log("INFO", "search.search_by_face.fused", user_id=user_id or "anonymous",
            ip=ip, angles=list(face_vectors_by_angle.keys()), inference_ms=inference_ms)

        pc = pinecone_pool.get(keys["pinecone_key"])
        cluster_uid = _derive_cluster_uid(keys["pinecone_key"])

        # Ensure indexes exist (best-effort, never fatal).
        await _ensure_indexes_safe(pc, user_id, ip)

        idx_arcface, idx_adaface, idx_face_legacy = _open_face_indexes(pc)

        # Single query with the fused vector.
        if USE_SPLIT_FACE_INDEXES:
            face_group = await _query_face_split(
                fv, idx_arcface, idx_adaface, pc=pc, cluster_uid=cluster_uid
            )
        else:
            face_group = await _query_face_legacy(fv, idx_face_legacy)

        duration_ms = round((time.perf_counter() - start) * 1000)
        log("INFO", "search.search_by_face.complete", user_id=user_id or "anonymous",
            ip=ip, results=len(face_group.get("matches", [])), duration_ms=duration_ms)
        return {
            "mode": "face",
            "face_groups": [face_group] if face_group.get("matches") else [],
            "results": [],
            "object_results": [],
        }
    except HTTPException:
        raise
    except Exception as e:
        log("ERROR", "search.search_by_face.error", user_id=user_id or "anonymous",
            ip=ip, mode=mode, error=str(e), traceback=traceback.format_exc()[-800:])
        raise HTTPException(500, str(e))