# Vector-search service layer: Cloudinary storage helpers plus Pinecone
# index management and face/object search.
| import time | |
| from typing import Any, Dict, List | |
| import cloudinary | |
| import cloudinary.uploader | |
| import cloudinary.api | |
| from pinecone import Pinecone, ServerlessSpec | |
| from src.core.config import ( | |
| IDX_FACES, IDX_OBJECTS, | |
| IDX_FACES_ARCFACE, IDX_FACES_ADAFACE, | |
| USE_SPLIT_FACE_INDEXES, | |
| ARCFACE_WEIGHT, ADAFACE_WEIGHT, | |
| FACE_MATCH_THRESHOLD, FUSED_MATCH_THRESHOLD, ARCFACE_SOLO_THRESHOLD, | |
| FACE_SEARCH_TOP_K, OBJECT_SEARCH_TOP_K, | |
| FACE_RESULTS_PER_QUERY_CAP, | |
| FACE_DIM, ADAFACE_DIM, FUSED_FACE_DIM, | |
| FACE_BLUR_THRESHOLD, | |
| ) | |
# ──────────────────────────────────────────────────────────────
# Pinecone client pool
# ──────────────────────────────────────────────────────────────
class PineconePool:
    """Caches one Pinecone client per API key so repeated calls reuse it."""

    def __init__(self):
        # Maps api_key -> Pinecone client.
        self._clients: Dict[str, Pinecone] = {}

    def get(self, api_key: str) -> Pinecone:
        """Return the cached client for *api_key*, creating it on first use."""
        try:
            return self._clients[api_key]
        except KeyError:
            client = Pinecone(api_key=api_key)
            self._clients[api_key] = client
            return client


# Module-level singleton shared by the app.
pinecone_pool = PineconePool()
# ──────────────────────────────────────────────────────────────
# Cloudinary helpers (unchanged from Phase 1)
# ──────────────────────────────────────────────────────────────
def _set_cld_config(creds: dict):
    """Point the process-wide cloudinary config at the account in *creds*.

    Missing keys become None, which cloudinary treats as unset.
    """
    cloud_name = creds.get("cloud_name")
    api_key = creds.get("api_key")
    api_secret = creds.get("api_secret")
    cloudinary.config(
        cloud_name=cloud_name,
        api_key=api_key,
        api_secret=api_secret,
        secure=True,
    )
def cld_ping(creds: dict):
    """Verify the Cloudinary credentials in *creds* via an Admin API ping.

    Raises whatever cloudinary.api raises on bad credentials or network
    failure. Returns the ping response so callers can inspect it
    (previously the response was discarded; returning it is
    backward-compatible).
    """
    _set_cld_config(creds)
    return cloudinary.api.ping()
def cld_upload(file_obj, folder: str, creds: dict) -> dict:
    """Upload *file_obj* into *folder* on the Cloudinary account in *creds*."""
    _set_cld_config(creds)
    result = cloudinary.uploader.upload(file_obj, folder=folder)
    return result
def cld_root_folders(creds: dict) -> dict:
    """Return the top-level folder listing for the account in *creds*."""
    _set_cld_config(creds)
    folders = cloudinary.api.root_folders()
    return folders
def cld_list_folder_images(folder: str, creds: dict, cursor: str = None, page_size: int = 100) -> dict:
    """Return one page (at most *page_size*) of uploads under *folder*.

    Pass the previous response's next_cursor as *cursor* to fetch the
    following page.
    """
    _set_cld_config(creds)
    params = dict(type="upload", prefix=f"{folder}/", max_results=page_size)
    if cursor:
        params["next_cursor"] = cursor
    return cloudinary.api.resources(**params)
def cld_delete_resource(public_id: str, creds: dict):
    """Delete a single uploaded asset by its Cloudinary public_id.

    Returns the destroy response (e.g. {"result": "ok"}) so callers can
    check the outcome; previously it was discarded (returning it is
    backward-compatible).
    """
    _set_cld_config(creds)
    return cloudinary.uploader.destroy(public_id)
def cld_delete_folder_resources(folder: str, creds: dict):
    """Bulk-delete every asset whose public_id starts with '<folder>/'."""
    _set_cld_config(creds)
    prefix = f"{folder}/"
    cloudinary.api.delete_resources_by_prefix(prefix)
def cld_remove_folder(folder: str, creds: dict):
    """Best-effort removal of the Cloudinary folder itself; errors ignored."""
    _set_cld_config(creds)
    try:
        cloudinary.api.delete_folder(folder)
    except Exception:
        # Folder may be non-empty or already gone — deletion is best-effort.
        return
def cld_delete_all_paginated(creds: dict) -> int:
    """Delete every uploaded resource in the account, 500 per page.

    Returns the total number of resources deleted.
    """
    _set_cld_config(creds)
    total = 0
    cursor = None
    while True:
        params = {"type": "upload", "max_results": 500}
        if cursor:
            params["next_cursor"] = cursor
        page = cloudinary.api.resources(**params)
        batch = [item["public_id"] for item in page.get("resources", [])]
        if not batch:
            break
        cloudinary.api.delete_resources(batch)
        total += len(batch)
        cursor = page.get("next_cursor")
        if not cursor:
            break
    return total
# ──────────────────────────────────────────────────────────────
# Index management
# ──────────────────────────────────────────────────────────────
def ensure_indexes(pc: Pinecone) -> List[str]:
    """Create any missing Pinecone indexes; return the names created.

    - Objects index: 1536d (unchanged)
    - Legacy faces index: FUSED_FACE_DIM (kept for backward compat; only
      created on first run if missing)
    - Split face indexes (when USE_SPLIT_FACE_INDEXES): ArcFace and
      AdaFace, each at its own dimension
    """
    existing = {idx.name for idx in pc.list_indexes()}
    wanted = [
        (IDX_OBJECTS, 1536),
        (IDX_FACES, FUSED_FACE_DIM),
    ]
    if USE_SPLIT_FACE_INDEXES:
        wanted += [
            (IDX_FACES_ARCFACE, FACE_DIM),
            (IDX_FACES_ADAFACE, ADAFACE_DIM),
        ]
    created: List[str] = []
    for name, dim in wanted:
        if name in existing:
            continue
        pc.create_index(
            name=name,
            dimension=dim,
            metric="cosine",
            spec=ServerlessSpec(cloud="aws", region="us-east-1"),
        )
        created.append(name)
    return created
def delete_and_recreate_indexes(pc: Pinecone):
    """Used by /api/reset-database. Drops the faces/objects indexes (and the
    split face indexes when enabled), then recreates them all."""
    existing = {idx.name for idx in pc.list_indexes()}
    targets = [IDX_FACES, IDX_OBJECTS]
    if USE_SPLIT_FACE_INDEXES:
        targets += [IDX_FACES_ARCFACE, IDX_FACES_ADAFACE]
    for name in targets:
        if name not in existing:
            continue
        pc.delete_index(name)
        # Give Pinecone time to finish each deletion before recreating.
        time.sleep(5)
    ensure_indexes(pc)
# ──────────────────────────────────────────────────────────────
# LEGACY face search (for backward compat / fallback)
# ──────────────────────────────────────────────────────────────
def search_faces(idx, vec: List[float], det_score: float, filter_dict: dict = None) -> Dict[str, Any]:
    """Query the legacy fused-embedding face index.

    Returns {url: {"raw_score", "face_crop", "folder"}}, keeping only the
    highest-scoring match per url. Matches below the legacy 0.45 cosine
    threshold, or lacking a url in metadata, are dropped.

    *det_score* is accepted for interface compatibility but unused here.
    """
    legacy_threshold = 0.45  # on old fused 1024-d vector
    kwargs = {"vector": vec, "top_k": FACE_SEARCH_TOP_K, "include_metadata": True}
    if filter_dict:
        kwargs["filter"] = filter_dict
    response = idx.query(**kwargs)
    best_by_url: Dict[str, Any] = {}
    for m in response.get("matches", []):
        score = m.get("score", 0)
        if score < legacy_threshold:
            continue
        meta = m.get("metadata", {})
        url = meta.get("url")
        if not url:
            continue
        current = best_by_url.get(url)
        if current is None or current["raw_score"] < score:
            best_by_url[url] = {
                "raw_score": score,
                "face_crop": meta.get("face_crop", ""),
                "folder": meta.get("folder", "uncategorized"),
            }
    return best_by_url
# ──────────────────────────────────────────────────────────────
# PHASE 2: Split-index face search with score fusion
# ──────────────────────────────────────────────────────────────
def search_faces_split(
    idx_arcface, idx_adaface,
    arcface_vec: List[float], adaface_vec: List[float],
    filter_dict: dict = None,
) -> Dict[str, Any]:
    """
    Query BOTH face indexes, fuse scores per vector_id, and return a map
    keyed by url holding the best fused score per image.

    Fusion (when a vector id appears in both result sets):
        fused = ARCFACE_WEIGHT * arc_score + ADAFACE_WEIGHT * ada_score
    When a vector id appears only in the ArcFace results, the raw ArcFace
    score is used as the fused score but must clear the stricter
    ARCFACE_SOLO_THRESHOLD. (An earlier docstring described scaling the
    solo score toward the missing side's average ~0.15 — the code does NOT
    do that; it uses the raw ArcFace score unchanged.)

    A candidate must pass ALL of:
      1. arc_score >= FACE_MATCH_THRESHOLD (hard floor on the primary signal)
      2. ARCFACE_SOLO_THRESHOLD (solo) / FUSED_MATCH_THRESHOLD (fused)
      3. metadata contains a url
      4. metadata blur_score >= FACE_BLUR_THRESHOLD (missing counts as sharp)

    The returned map is truncated to the FACE_RESULTS_PER_QUERY_CAP entries
    with the highest fused scores.
    """
    query_kwargs_base = {"top_k": FACE_SEARCH_TOP_K, "include_metadata": True}
    if filter_dict:
        query_kwargs_base["filter"] = filter_dict
    # NOTE(review): the two index queries below run sequentially in this
    # function; any parallelism would have to come from the caller.
    arc_res = idx_arcface.query(vector=arcface_vec, **query_kwargs_base)
    # Only query AdaFace if we have a usable vector (not None / all-zero,
    # e.g. when AdaFace embedding failed at upload time).
    has_ada = adaface_vec is not None and any(abs(x) > 1e-6 for x in adaface_vec)
    if has_ada:
        ada_res = idx_adaface.query(vector=adaface_vec, **query_kwargs_base)
    else:
        ada_res = {"matches": []}
    # AdaFace cosine score per vector_id, for O(1) lookup during fusion.
    ada_by_id = {
        m["id"]: m.get("score", 0.0)
        for m in ada_res.get("matches", [])
    }
    # AdaFace metadata per vector_id (in case a vector_id is only in AdaFace).
    # NOTE(review): ada_meta_by_id is built but never read in this function —
    # possibly intended for a future AdaFace-only pass; confirm before removing.
    ada_meta_by_id = {
        m["id"]: m.get("metadata", {})
        for m in ada_res.get("matches", [])
    }
    image_map: Dict[str, Any] = {}
    # NOTE(review): seen_vector_ids is populated but never read here.
    seen_vector_ids = set()
    # ── Pass 1: ArcFace matches (the primary signal) ─────────────
    for match in arc_res.get("matches", []):
        vid = match["id"]
        seen_vector_ids.add(vid)
        arc_score = match.get("score", 0.0)
        # Hard floor: if ArcFace says no, it's no. This kills imposters.
        if arc_score < FACE_MATCH_THRESHOLD:
            continue
        ada_score = ada_by_id.get(vid, None)
        if ada_score is None:
            # No AdaFace confirmation — apply stricter solo threshold.
            if arc_score < ARCFACE_SOLO_THRESHOLD:
                continue
            fused = arc_score
        else:
            fused = ARCFACE_WEIGHT * arc_score + ADAFACE_WEIGHT * ada_score
        if fused < FUSED_MATCH_THRESHOLD:
            continue
        meta = match.get("metadata", {})
        url = meta.get("url")
        if not url:
            continue
        # Skip blurry face crops; a missing blur_score defaults to 100.0,
        # i.e. absent metadata is treated as sharp enough.
        if meta.get("blur_score", 100.0) < FACE_BLUR_THRESHOLD:
            continue
        existing = image_map.get(url)
        # Keep only the best fused score per url across all matches.
        if not existing or existing["fused_score"] < fused:
            image_map[url] = {
                "fused_score": fused,
                "arcface_score": arc_score,
                "adaface_score": ada_score if ada_score is not None else 0.0,
                "raw_score": arc_score,  # for UI back-compat
                "face_crop": meta.get("face_crop", ""),
                "folder": meta.get("folder", "uncategorized"),
                "vector_id": vid,
            }
    # Cap at most N results per query face
    if len(image_map) > FACE_RESULTS_PER_QUERY_CAP:
        top = sorted(
            image_map.items(),
            key=lambda kv: kv[1]["fused_score"],
            reverse=True,
        )[:FACE_RESULTS_PER_QUERY_CAP]
        image_map = dict(top)
    return image_map
# ──────────────────────────────────────────────────────────────
# Object search (unchanged)
# ──────────────────────────────────────────────────────────────
def search_objects(idx, vec: List[float]) -> List[Dict[str, Any]]:
    """Query the object index; return [{url, score, raw_score, folder}, ...].

    "score" is the cosine similarity rounded to 4 decimals for display;
    "raw_score" keeps the unrounded value.
    """
    response = idx.query(vector=vec, top_k=OBJECT_SEARCH_TOP_K, include_metadata=True)
    out: List[Dict[str, Any]] = []
    for m in response.get("matches", []):
        meta = m.get("metadata", {})
        raw = m.get("score", 0)
        out.append({
            "url": meta.get("url", ""),
            "score": round(raw, 4),
            "raw_score": raw,
            "folder": meta.get("folder", "uncategorized"),
        })
    return out
# ──────────────────────────────────────────────────────────────
# Result merging
# ──────────────────────────────────────────────────────────────
def merge_face_results(groups: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Dedupe across multiple query faces (or augmentations).

    Keeps the highest-"score" match per url; returns matches sorted by
    score, descending.
    """
    best: Dict[str, Dict[str, Any]] = {}
    for group in groups:
        for m in group.get("matches", []):
            key = m["url"]
            prev = best.get(key)
            if prev is None or prev["score"] < m["score"]:
                best[key] = m
    return sorted(best.values(), key=lambda m: m["score"], reverse=True)
def merge_object_results(nested_results: List[List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    """Flatten per-query object results, keeping the best "score" per url,
    sorted descending."""
    best: Dict[str, Dict[str, Any]] = {}
    for result_page in nested_results:
        for m in result_page:
            key = m["url"]
            prev = best.get(key)
            if prev is None or prev["score"] < m["score"]:
                best[key] = m
    return sorted(best.values(), key=lambda m: m["score"], reverse=True)