# visual-search-api/src/services/db_client.py
# Last commit (AdarshDRC): "fix: Resolving backend" (29bfc1f)
import time
from typing import Any, Dict, List
import cloudinary
import cloudinary.uploader
import cloudinary.api
from pinecone import Pinecone, ServerlessSpec
from src.core.config import (
IDX_FACES, IDX_OBJECTS,
IDX_FACES_ARCFACE, IDX_FACES_ADAFACE,
USE_SPLIT_FACE_INDEXES,
ARCFACE_WEIGHT, ADAFACE_WEIGHT,
FACE_MATCH_THRESHOLD, FUSED_MATCH_THRESHOLD, ARCFACE_SOLO_THRESHOLD,
FACE_SEARCH_TOP_K, OBJECT_SEARCH_TOP_K,
FACE_RESULTS_PER_QUERY_CAP,
FACE_DIM, ADAFACE_DIM, FUSED_FACE_DIM,
FACE_BLUR_THRESHOLD,
)
# ──────────────────────────────────────────────────────────────
# Pinecone client pool
# ──────────────────────────────────────────────────────────────
class PineconePool:
    """Cache of Pinecone clients, one per API key.

    A client is constructed the first time a key is requested and the same
    instance is handed back on every later request for that key.
    """

    def __init__(self):
        # api_key -> Pinecone client built for that key
        self._clients = {}

    def get(self, api_key: str) -> Pinecone:
        """Return the cached client for ``api_key``, creating it on first use."""
        try:
            return self._clients[api_key]
        except KeyError:
            client = Pinecone(api_key=api_key)
            self._clients[api_key] = client
            return client


# Module-level pool instance.
pinecone_pool = PineconePool()
# ──────────────────────────────────────────────────────────────
# Cloudinary helpers (unchanged from Phase 1)
# ──────────────────────────────────────────────────────────────
def _set_cld_config(creds: dict):
    """Configure the Cloudinary SDK from a credentials dict.

    Reads ``cloud_name``, ``api_key`` and ``api_secret`` from ``creds``
    (a missing key is passed through as ``None``) and always sets
    ``secure=True``.

    NOTE(review): ``cloudinary.config`` looks like SDK-wide state; confirm this
    is safe if requests carrying different credentials can overlap.
    """
    settings = {
        field: creds.get(field)
        for field in ("cloud_name", "api_key", "api_secret")
    }
    settings["secure"] = True
    cloudinary.config(**settings)
def cld_ping(creds: dict):
    """Apply ``creds`` to the SDK and call the Cloudinary ping endpoint.

    The ping response is discarded and nothing is returned; any exception
    raised by the SDK propagates to the caller.
    """
    _set_cld_config(creds)
    cloudinary.api.ping()
    return None
def cld_upload(file_obj, folder: str, creds: dict) -> dict:
    """Upload ``file_obj`` into ``folder`` and return the uploader's response.

    ``creds`` is applied to the SDK before the upload is issued.
    """
    _set_cld_config(creds)
    upload_response = cloudinary.uploader.upload(file_obj, folder=folder)
    return upload_response
def cld_root_folders(creds: dict) -> dict:
    """Return the response of the Cloudinary ``root_folders`` call.

    ``creds`` is applied to the SDK before the request is made.
    """
    _set_cld_config(creds)
    folders_response = cloudinary.api.root_folders()
    return folders_response
def cld_list_folder_images(folder: str, creds: dict, cursor: str = None, page_size: int = 100) -> dict:
    """Return one page of uploaded resources whose public id starts with ``folder/``.

    ``page_size`` is sent as ``max_results``. ``cursor`` is sent as
    ``next_cursor`` only when it is truthy, so ``None`` or ``""`` requests the
    first page. The raw response of ``cloudinary.api.resources`` is returned.
    """
    _set_cld_config(creds)
    pagination = {"next_cursor": cursor} if cursor else {}
    return cloudinary.api.resources(
        **{"type": "upload", "prefix": f"{folder}/", "max_results": page_size},
        **pagination,
    )
def cld_delete_resource(public_id: str, creds: dict):
    """Destroy the single Cloudinary resource identified by ``public_id``.

    The SDK response is discarded and nothing is returned.
    """
    _set_cld_config(creds)
    cloudinary.uploader.destroy(public_id)
    return None
def cld_delete_folder_resources(folder: str, creds: dict):
    """Delete resources whose public id starts with ``folder/``.

    Issues one ``delete_resources_by_prefix`` call; its response is discarded
    and nothing is returned.
    """
    _set_cld_config(creds)
    prefix = f"{folder}/"
    cloudinary.api.delete_resources_by_prefix(prefix)
    return None
def cld_remove_folder(folder: str, creds: dict):
    """Best-effort removal of a Cloudinary folder.

    Applies ``creds`` to the SDK and calls ``cloudinary.api.delete_folder``.
    A failure never propagates to the caller, but it is logged as a warning
    with the traceback instead of being dropped silently.

    Args:
        folder: Folder path to remove.
        creds: Dict with ``cloud_name``, ``api_key`` and ``api_secret``.
    """
    # Local import: the module's import block does not include logging.
    import logging

    _set_cld_config(creds)
    try:
        cloudinary.api.delete_folder(folder)
    except Exception:
        # Still best-effort, but leave a trace so failed cleanups are visible.
        logging.getLogger(__name__).warning(
            "Cloudinary folder %r could not be removed", folder, exc_info=True
        )
def cld_delete_all_paginated(creds: dict) -> int:
    """Delete every listed ``upload``-type resource, page by page.

    Each page is fetched with ``cloudinary.api.resources`` (up to 500 items)
    and its public ids are removed with ``cloudinary.api.delete_resources``.
    Deletion is issued in batches because the Admin API documents a limit of
    100 public ids per ``delete_resources`` call, which a full 500-item page
    would exceed.

    Args:
        creds: Dict with ``cloud_name``, ``api_key`` and ``api_secret``.

    Returns:
        The number of public ids submitted for deletion (not verified against
        the per-id status in the delete responses).
    """
    _set_cld_config(creds)
    list_page_size = 500     # page size requested from the listing call
    delete_batch_size = 100  # documented per-call limit for delete_resources
    deleted = 0
    cursor = None
    while True:
        kwargs = {"type": "upload", "max_results": list_page_size}
        if cursor:
            kwargs["next_cursor"] = cursor
        res = cloudinary.api.resources(**kwargs)
        pids = [r["public_id"] for r in res.get("resources", [])]
        if not pids:
            break
        # Split the page so no single delete call exceeds the API limit.
        for start in range(0, len(pids), delete_batch_size):
            cloudinary.api.delete_resources(pids[start:start + delete_batch_size])
        deleted += len(pids)
        cursor = res.get("next_cursor")
        if not cursor:
            break
    return deleted
# ──────────────────────────────────────────────────────────────
# Index management
# ──────────────────────────────────────────────────────────────
def ensure_indexes(pc: Pinecone) -> List[str]:
    """
    Create any required Pinecone index that does not exist yet.

    Required indexes:
    - IDX_OBJECTS at 1536 dimensions
    - IDX_FACES at FUSED_FACE_DIM dimensions (legacy fused face index)
    - when USE_SPLIT_FACE_INDEXES is set: IDX_FACES_ARCFACE at FACE_DIM and
      IDX_FACES_ADAFACE at ADAFACE_DIM

    Every index is created with the cosine metric on a serverless spec
    (aws / us-east-1). Returns the names of the indexes created by this call,
    in creation order; an empty list means nothing was missing.
    """
    present = {index.name for index in pc.list_indexes()}

    wanted = [(IDX_OBJECTS, 1536), (IDX_FACES, FUSED_FACE_DIM)]
    if USE_SPLIT_FACE_INDEXES:
        wanted += [
            (IDX_FACES_ARCFACE, FACE_DIM),
            (IDX_FACES_ADAFACE, ADAFACE_DIM),
        ]

    newly_created = []
    for index_name, dimension in wanted:
        if index_name in present:
            continue
        pc.create_index(
            name=index_name,
            dimension=dimension,
            metric="cosine",
            spec=ServerlessSpec(cloud="aws", region="us-east-1"),
        )
        newly_created.append(index_name)
    return newly_created
def delete_and_recreate_indexes(pc: Pinecone):
    """Drop the face/object indexes that exist, pause, then recreate them.

    Targets IDX_FACES and IDX_OBJECTS, plus the two split face indexes when
    USE_SPLIT_FACE_INDEXES is set. Only indexes reported by
    ``pc.list_indexes()`` are deleted. After the deletions the function sleeps
    five seconds and then calls ``ensure_indexes``.
    """
    present = {index.name for index in pc.list_indexes()}

    doomed = [IDX_FACES, IDX_OBJECTS]
    if USE_SPLIT_FACE_INDEXES:
        doomed += [IDX_FACES_ARCFACE, IDX_FACES_ADAFACE]

    for index_name in doomed:
        if index_name not in present:
            continue
        pc.delete_index(index_name)

    # Single fixed pause between the deletions and the re-creation.
    time.sleep(5)
    ensure_indexes(pc)
# ──────────────────────────────────────────────────────────────
# LEGACY face search (for backward compat / fallback)
# ──────────────────────────────────────────────────────────────
def search_faces(idx, vec: List[float], det_score: float, filter_dict: dict = None) -> Dict[str, Any]:
    """Legacy single-index face search.

    Queries ``idx`` with ``vec`` (top FACE_SEARCH_TOP_K, metadata included,
    optional metadata ``filter_dict``) and keeps, per image url, the match with
    the highest score. Matches scoring below 0.45 or lacking a ``url`` in their
    metadata are ignored. ``det_score`` is accepted but not used here.

    Returns a dict keyed by url with ``raw_score``, ``face_crop`` and
    ``folder`` for the best match of that url.
    """
    LEGACY_THRESHOLD = 0.45  # on old fused 1024-d vector

    params = dict(vector=vec, top_k=FACE_SEARCH_TOP_K, include_metadata=True)
    if filter_dict:
        params["filter"] = filter_dict
    response = idx.query(**params)

    best_by_url = {}
    for hit in response.get("matches", []):
        score = hit.get("score", 0)
        if score < LEGACY_THRESHOLD:
            continue

        metadata = hit.get("metadata", {})
        url = metadata.get("url")
        if not url:
            continue

        # Keep the existing entry unless this hit scores strictly higher.
        if url in best_by_url and not (best_by_url[url]["raw_score"] < score):
            continue
        best_by_url[url] = {
            "raw_score": score,
            "face_crop": metadata.get("face_crop", ""),
            "folder": metadata.get("folder", "uncategorized"),
        }
    return best_by_url
# ──────────────────────────────────────────────────────────────
# PHASE 2: Split-index face search with score fusion
# ──────────────────────────────────────────────────────────────
def search_faces_split(
    idx_arcface, idx_adaface,
    arcface_vec: List[float], adaface_vec: List[float],
    filter_dict: dict = None,
) -> Dict[str, Any]:
    """
    Query the ArcFace and AdaFace indexes and fuse their scores per vector id.

    Only ArcFace matches are candidates. The AdaFace query supplies a second
    score that is looked up by the same vector id. For each ArcFace match:

    - it is dropped when its ArcFace score is below FACE_MATCH_THRESHOLD;
    - if an AdaFace score exists for the same id:
          fused = ARCFACE_WEIGHT * arcface_cos + ADAFACE_WEIGHT * adaface_cos
      and the match is dropped when fused < FUSED_MATCH_THRESHOLD;
    - if no AdaFace score exists, the match is dropped when its ArcFace score
      is below ARCFACE_SOLO_THRESHOLD, otherwise fused = arcface_cos;
    - it is dropped when its metadata has no ``url`` or when ``blur_score``
      (default 100.0 when absent) is below FACE_BLUR_THRESHOLD.

    The AdaFace index is queried only when ``adaface_vec`` is not None and has
    at least one component with magnitude above 1e-6.

    Args:
        idx_arcface: Index queried with ``arcface_vec``.
        idx_adaface: Index queried with ``adaface_vec``.
        arcface_vec: ArcFace query embedding.
        adaface_vec: AdaFace query embedding; None or all-zero skips AdaFace.
        filter_dict: Optional metadata filter forwarded to both queries.

    Returns:
        Dict keyed by url holding the best-fused entry for that url
        (``fused_score``, ``arcface_score``, ``adaface_score``, ``raw_score``,
        ``face_crop``, ``folder``, ``vector_id``). At most
        FACE_RESULTS_PER_QUERY_CAP entries are returned, highest fused first
        when the cap is applied.

    NOTE(review): vectors returned only by the AdaFace query are never
    surfaced; confirm that an AdaFace-only pass is intentionally absent.
    """
    query_kwargs_base = {"top_k": FACE_SEARCH_TOP_K, "include_metadata": True}
    if filter_dict:
        query_kwargs_base["filter"] = filter_dict

    # The two queries run one after the other here; any parallelism has to
    # come from how the caller schedules this function.
    arc_res = idx_arcface.query(vector=arcface_vec, **query_kwargs_base)

    # Only query AdaFace if we have a valid vector (not all zeros).
    has_ada = adaface_vec is not None and any(abs(x) > 1e-6 for x in adaface_vec)
    if has_ada:
        ada_res = idx_adaface.query(vector=adaface_vec, **query_kwargs_base)
    else:
        ada_res = {"matches": []}

    # AdaFace score by vector id, used to confirm ArcFace matches.
    ada_by_id = {
        m["id"]: m.get("score", 0.0)
        for m in ada_res.get("matches", [])
    }

    image_map: Dict[str, Any] = {}

    for match in arc_res.get("matches", []):
        vid = match["id"]
        arc_score = match.get("score", 0.0)

        # Hard floor: if ArcFace says no, it's no. This kills imposters.
        if arc_score < FACE_MATCH_THRESHOLD:
            continue

        ada_score = ada_by_id.get(vid)
        if ada_score is None:
            # No AdaFace confirmation — apply the solo threshold instead.
            if arc_score < ARCFACE_SOLO_THRESHOLD:
                continue
            fused = arc_score
        else:
            fused = ARCFACE_WEIGHT * arc_score + ADAFACE_WEIGHT * ada_score
            if fused < FUSED_MATCH_THRESHOLD:
                continue

        meta = match.get("metadata", {})
        url = meta.get("url")
        if not url:
            continue
        # A missing blur_score defaults to 100.0 before the threshold check.
        if meta.get("blur_score", 100.0) < FACE_BLUR_THRESHOLD:
            continue

        existing = image_map.get(url)
        if not existing or existing["fused_score"] < fused:
            image_map[url] = {
                "fused_score": fused,
                "arcface_score": arc_score,
                "adaface_score": ada_score if ada_score is not None else 0.0,
                "raw_score": arc_score,  # for UI back-compat
                "face_crop": meta.get("face_crop", ""),
                "folder": meta.get("folder", "uncategorized"),
                "vector_id": vid,
            }

    # Cap at most N results per query face, keeping the highest fused scores.
    if len(image_map) > FACE_RESULTS_PER_QUERY_CAP:
        top = sorted(
            image_map.items(),
            key=lambda kv: kv[1]["fused_score"],
            reverse=True,
        )[:FACE_RESULTS_PER_QUERY_CAP]
        image_map = dict(top)
    return image_map
# ──────────────────────────────────────────────────────────────
# Object search (unchanged)
# ──────────────────────────────────────────────────────────────
def search_objects(idx, vec: List[float]) -> List[Dict[str, Any]]:
    """Query the object index and flatten each match into a result dict.

    Requests the top OBJECT_SEARCH_TOP_K matches with metadata. Each result
    carries ``url`` (default ""), ``score`` (match score rounded to 4 places),
    ``raw_score`` (unrounded) and ``folder`` (default "uncategorized"), in the
    order the index returned the matches.
    """
    response = idx.query(vector=vec, top_k=OBJECT_SEARCH_TOP_K, include_metadata=True)

    def _as_result(hit):
        # Flatten one match into the dict shape returned to callers.
        metadata = hit.get("metadata", {})
        raw = hit.get("score", 0)
        return {
            "url": metadata.get("url", ""),
            "score": round(raw, 4),
            "raw_score": raw,
            "folder": metadata.get("folder", "uncategorized"),
        }

    return [_as_result(hit) for hit in response.get("matches", [])]
# ──────────────────────────────────────────────────────────────
# Result merging
# ──────────────────────────────────────────────────────────────
def merge_face_results(groups: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Collapse several face-result groups into one list, best score per URL.

    Each group may carry a ``"matches"`` list; groups without one contribute
    nothing. When a URL appears more than once, the entry with the strictly
    higher ``score`` is kept, so the first one seen wins a tie. The result is
    sorted by ``score`` in descending order.
    """
    best_by_url = {}
    for group in groups:
        for candidate in group.get("matches", []):
            key = candidate["url"]
            current = best_by_url.get(key)
            if current is None or current["score"] < candidate["score"]:
                best_by_url[key] = candidate

    ranked = list(best_by_url.values())
    ranked.sort(key=lambda entry: entry["score"], reverse=True)
    return ranked
def merge_object_results(nested_results: List[List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    """Flatten per-query object results and keep the best-scoring hit per URL.

    A later hit replaces an earlier one for the same URL only when its
    ``score`` is strictly higher. The merged hits are returned sorted by
    ``score``, highest first.
    """
    winners = {}
    flattened = (hit for batch in nested_results for hit in batch)
    for hit in flattened:
        url = hit["url"]
        if url in winners and not (winners[url]["score"] < hit["score"]):
            continue
        winners[url] = hit
    return sorted(winners.values(), key=lambda hit: hit["score"], reverse=True)