# visual-search-api2 / src/services/db_client.py
# AdarshDRC's picture
# Update src/services/db_client.py
# c56ede7 verified
import time
from typing import Any, Dict, List
import cloudinary
import cloudinary.uploader
import cloudinary.api
from pinecone import Pinecone, ServerlessSpec
from src.core.config import IDX_FACES, IDX_OBJECTS
class PineconePool:
    """Cache of ``Pinecone`` clients, one per API key.

    A client is constructed the first time a key is requested and the same
    instance is handed back on every later request for that key.
    """

    def __init__(self):
        # Maps API key -> Pinecone client built for that key.
        self._clients = dict()

    def get(self, api_key: str) -> Pinecone:
        """Return the client for ``api_key``, creating and caching it if absent."""
        client = self._clients.get(api_key)
        if client is None:
            client = Pinecone(api_key=api_key)
            self._clients[api_key] = client
        return client
# Module-level pool instance, created once when this module is imported.
pinecone_pool = PineconePool()
def _set_cld_config(creds: dict):
    """Configure the ``cloudinary`` module from a credentials mapping.

    Reads ``cloud_name``, ``api_key`` and ``api_secret`` from ``creds``
    (missing keys are passed through as ``None``) and always sets
    ``secure=True``.

    NOTE(review): ``cloudinary.config`` looks like process-wide state, so
    concurrent callers with different credentials may interfere — confirm.
    """
    settings = {
        field: creds.get(field)
        for field in ("cloud_name", "api_key", "api_secret")
    }
    cloudinary.config(secure=True, **settings)
def cld_ping(creds: dict):
    """Apply ``creds`` and call the Cloudinary ping endpoint.

    The ping response is discarded; any exception raised by the call
    propagates to the caller.
    """
    _set_cld_config(creds)
    # The call is made only for its success/failure; the result is not returned.
    cloudinary.api.ping()
def cld_upload(file_obj, folder: str, creds: dict) -> dict:
    """Upload ``file_obj`` into ``folder`` and return the uploader's response."""
    _set_cld_config(creds)
    upload_response = cloudinary.uploader.upload(file_obj, folder=folder)
    return upload_response
def cld_root_folders(creds: dict) -> dict:
    """Apply ``creds`` and return the response of ``cloudinary.api.root_folders``."""
    _set_cld_config(creds)
    folders_response = cloudinary.api.root_folders()
    return folders_response
def cld_list_folder_images(folder: str, creds: dict, cursor: str = None, page_size: int = 100) -> dict:
    """Return one page of uploaded resources whose public ID starts with ``folder/``.

    Args:
        folder: Folder name; a trailing ``/`` is appended to form the prefix.
        creds: Cloudinary credentials mapping applied before the request.
        cursor: Pagination cursor from a previous page; omitted from the
            request when falsy.
        page_size: Value sent as ``max_results``.

    Returns:
        The response of ``cloudinary.api.resources`` as returned by the library.
    """
    _set_cld_config(creds)
    # Only add the cursor when one was supplied, so the first page has none.
    paging = {"next_cursor": cursor} if cursor else {}
    request = {
        "type": "upload",
        "prefix": f"{folder}/",
        "max_results": page_size,
        **paging,
    }
    return cloudinary.api.resources(**request)
def cld_delete_resource(public_id: str, creds: dict):
    """Apply ``creds`` and destroy the resource identified by ``public_id``.

    The uploader's response is discarded and nothing is returned.
    """
    _set_cld_config(creds)
    # Response intentionally ignored; errors raised by the call propagate.
    cloudinary.uploader.destroy(public_id)
def cld_delete_folder_resources(folder: str, creds: dict):
    """Apply ``creds`` and delete resources whose public ID starts with ``folder/``.

    The API response is discarded and nothing is returned.
    """
    _set_cld_config(creds)
    folder_prefix = f"{folder}/"
    cloudinary.api.delete_resources_by_prefix(folder_prefix)
def cld_remove_folder(folder: str, creds: dict):
    """Best-effort removal of a Cloudinary folder.

    Applies ``creds`` and asks Cloudinary to delete ``folder``. A failure of
    the delete call does not propagate (callers rely on this being
    best-effort), but it is logged as a warning instead of being silently
    discarded.

    Args:
        folder: Name of the folder to delete.
        creds: Cloudinary credentials mapping applied before the request.
    """
    import logging

    _set_cld_config(creds)
    try:
        cloudinary.api.delete_folder(folder)
    except Exception as exc:
        # Deliberately non-fatal: keep the original best-effort contract,
        # but leave a trace so failed clean-ups can be diagnosed.
        logging.getLogger(__name__).warning(
            "Could not remove Cloudinary folder %r: %s", folder, exc
        )
def cld_delete_all_paginated(creds: dict) -> int:
    """Delete every listed ``upload``-type resource, page by page.

    Resources are listed in pages of up to 500 and removed with
    ``cloudinary.api.delete_resources``. Each page is deleted in chunks of
    at most 100 public IDs, the documented per-call limit of that endpoint,
    rather than in one oversized request.

    NOTE(review): ``resources`` is called without ``resource_type``, so
    presumably only the library's default resource type is covered — confirm.

    Args:
        creds: Cloudinary credentials mapping applied before the requests.

    Returns:
        Number of public IDs submitted for deletion.
    """
    _set_cld_config(creds)
    delete_batch_size = 100  # Admin API limit for public_ids per delete call.
    deleted = 0
    cursor = None
    while True:
        kwargs = {"type": "upload", "max_results": 500}
        if cursor:
            kwargs["next_cursor"] = cursor
        res = cloudinary.api.resources(**kwargs)
        resources = res.get("resources", [])
        if not resources:
            break
        pids = [r["public_id"] for r in resources]
        # Split the page so no single delete request exceeds the API limit.
        for start in range(0, len(pids), delete_batch_size):
            cloudinary.api.delete_resources(pids[start:start + delete_batch_size])
        deleted += len(pids)
        cursor = res.get("next_cursor")
        if not cursor:
            break
    return deleted
def ensure_indexes(pc: Pinecone) -> List[str]:
    """Create the faces and objects indexes if they are not already listed.

    Missing indexes are created with the cosine metric on a serverless spec
    (``aws`` / ``us-east-1``); the faces index uses dimension 1024 and the
    objects index 1536.

    Args:
        pc: Pinecone client used to list and create indexes.

    Returns:
        Names of the indexes created by this call, in creation order.
    """
    present = [idx.name for idx in pc.list_indexes()]
    newly_created = []
    for index_name in (IDX_FACES, IDX_OBJECTS):
        if index_name in present:
            continue
        dims = 1024 if index_name == IDX_FACES else 1536
        pc.create_index(
            name=index_name,
            dimension=dims,
            metric="cosine",
            spec=ServerlessSpec(cloud="aws", region="us-east-1"),
        )
        newly_created.append(index_name)
    return newly_created
def delete_and_recreate_indexes(pc: Pinecone):
    """Drop the faces/objects indexes that exist, pause, then recreate them.

    Args:
        pc: Pinecone client used to list, delete and create indexes.
    """
    present = [idx.name for idx in pc.list_indexes()]
    stale = (name for name in (IDX_FACES, IDX_OBJECTS) if name in present)
    for index_name in stale:
        pc.delete_index(index_name)
    # Fixed pause before recreation; presumably gives the deletions time to
    # take effect on the service side — confirm.
    time.sleep(5)
    ensure_indexes(pc)
def search_faces(idx, vec: List[float], det_score: float) -> Dict[str, Any]:
    """Query the face index and keep the best-scoring match per image URL.

    Args:
        idx: Index object exposing ``query(vector=..., top_k=..., include_metadata=...)``.
        vec: Query embedding.
        det_score: Accepted for interface compatibility; not used here.

    Returns:
        Mapping of image URL to ``{"raw_score", "face_crop", "folder"}`` for
        the highest-scoring match of that URL. Matches whose metadata has no
        ``url`` are skipped.
    """
    response = idx.query(vector=vec, top_k=50, include_metadata=True)
    best_by_url = {}
    for hit in response.get("matches", []):
        metadata = hit.get("metadata", {})
        image_url = metadata.get("url")
        if not image_url:
            continue
        similarity = hit.get("score", 0)
        known = best_by_url.get(image_url)
        # Replace only on a strictly higher score, so the first of equal
        # scores wins.
        if known is None or known["raw_score"] < similarity:
            best_by_url[image_url] = {
                "raw_score": similarity,
                "face_crop": metadata.get("face_crop", ""),
                "folder": metadata.get("folder", "uncategorized"),
            }
    return best_by_url
import numpy as np
def search_objects(
    idx,
    vec: List[float],
    filter_dict: dict = None,
    *,
    top_k: int = 50,
    min_top_score: float = 0.65,
    min_gradient: float = 0.05,
) -> List[Dict[str, Any]]:
    """Query the object index and map matches to result dictionaries.

    When at least five matches come back, the whole result set is rejected
    if the first match scores below ``min_top_score`` and the drop from the
    first to the fifth match is smaller than ``min_gradient``: a mediocre
    best score with no clear drop-off is treated as "nothing relevant
    found". With fewer than five matches no such check is applied.

    Args:
        idx: Index object exposing ``query(**kwargs)``.
        vec: Query embedding.
        filter_dict: Optional metadata filter; sent as ``filter`` only when truthy.
        top_k: Number of matches requested from the index.
        min_top_score: Best-match score below which rejection is considered.
        min_gradient: Minimum first-to-fifth score drop that keeps a
            low-scoring result set.

    Returns:
        One dict per match with ``url``, ``score`` (rounded to 4 places),
        ``raw_score`` and ``folder``; an empty list when there are no
        matches or the result set is rejected.
    """
    query_kwargs = {"vector": vec, "top_k": top_k, "include_metadata": True}
    if filter_dict:
        query_kwargs["filter"] = filter_dict
    matches = idx.query(**query_kwargs).get("matches", [])
    if not matches:
        return []

    # The first entry is treated as the best match; this assumes the index
    # returns matches ordered by descending score — confirm for the backend.
    if len(matches) >= 5:
        top_score = matches[0].get("score", 0)
        fifth_score = matches[4].get("score", 0)
        if top_score < min_top_score and top_score - fifth_score < min_gradient:
            return []

    results = []
    for match in matches:
        meta = match.get("metadata", {})
        raw_score = match.get("score", 0)
        results.append({
            "url": meta.get("url", ""),
            "score": round(raw_score, 4),
            "raw_score": raw_score,
            "folder": meta.get("folder", "uncategorized"),
        })
    return results
def merge_face_results(groups: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Merge per-group face matches into one list, best score per URL first.

    Args:
        groups: Dicts that may carry a ``matches`` list; each match must
            have ``url`` and ``score`` keys.

    Returns:
        One match per distinct URL (the highest-scoring one; the first seen
        wins ties), sorted by ``score`` in descending order.
    """
    best_by_url = {}
    candidates = (hit for group in groups for hit in group.get("matches", []))
    for hit in candidates:
        incumbent = best_by_url.get(hit["url"])
        if incumbent is None or incumbent["score"] < hit["score"]:
            best_by_url[hit["url"]] = hit
    ranked = list(best_by_url.values())
    ranked.sort(key=lambda item: item["score"], reverse=True)
    return ranked
def merge_object_results(nested_results: List[List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    """Flatten several object-result lists, keeping the best match per URL.

    Args:
        nested_results: Lists of match dicts; each match must have ``url``
            and ``score`` keys.

    Returns:
        One match per distinct URL (the highest-scoring one; the first seen
        wins ties), sorted by ``score`` in descending order.
    """
    top_per_url = {}
    for entry in (item for batch in nested_results for item in batch):
        key = entry["url"]
        current = top_per_url.get(key)
        if current is None or current["score"] < entry["score"]:
            top_per_url[key] = entry
    ordered = list(top_per_url.values())
    ordered.sort(key=lambda item: item["score"], reverse=True)
    return ordered