"""Storage helpers: Cloudinary asset management and Pinecone vector search."""

import time
from typing import Any, Dict, List, Optional

import cloudinary
import cloudinary.api
import cloudinary.uploader
# NOTE(review): numpy was imported mid-file in the original and is unused in
# this module; kept (hoisted to the top) in case other modules rely on it.
import numpy as np
from pinecone import Pinecone, ServerlessSpec

from src.core.config import IDX_FACES, IDX_OBJECTS


class PineconePool:
    """Cache one Pinecone client per API key so repeated calls reuse it."""

    def __init__(self) -> None:
        # Maps api_key -> Pinecone client, populated lazily by get().
        self._clients: Dict[str, Pinecone] = {}

    def get(self, api_key: str) -> Pinecone:
        """Return the cached client for ``api_key``, creating it on first use."""
        if api_key not in self._clients:
            self._clients[api_key] = Pinecone(api_key=api_key)
        return self._clients[api_key]


# Module-level singleton shared by all callers of this module.
pinecone_pool = PineconePool()


def _set_cld_config(creds: dict) -> None:
    """Point the Cloudinary SDK at the given credentials.

    NOTE(review): ``cloudinary.config`` is process-global state, so concurrent
    requests using different credentials can race each other — confirm this
    service handles one tenant per process (or serializes these calls).
    """
    cloudinary.config(
        cloud_name=creds.get("cloud_name"),
        api_key=creds.get("api_key"),
        api_secret=creds.get("api_secret"),
        secure=True,
    )


def cld_ping(creds: dict) -> None:
    """Raise if the credentials are invalid or the Cloudinary API is unreachable."""
    _set_cld_config(creds)
    cloudinary.api.ping()


def cld_upload(file_obj, folder: str, creds: dict) -> dict:
    """Upload ``file_obj`` into ``folder`` and return Cloudinary's response dict."""
    _set_cld_config(creds)
    return cloudinary.uploader.upload(file_obj, folder=folder)


def cld_root_folders(creds: dict) -> dict:
    """Return the account's root folders as reported by the Admin API."""
    _set_cld_config(creds)
    return cloudinary.api.root_folders()


def cld_list_folder_images(
    folder: str,
    creds: dict,
    cursor: Optional[str] = None,
    page_size: int = 100,
) -> dict:
    """List one page of uploaded resources under ``folder``.

    Args:
        folder: Folder name; matched as the public-id prefix ``"{folder}/"``.
        creds: Cloudinary credentials dict (cloud_name/api_key/api_secret).
        cursor: ``next_cursor`` from a previous page, or None for the first page.
        page_size: Maximum number of resources per page.

    Returns:
        The raw Admin API response, including ``resources`` and (when more
        pages remain) ``next_cursor``.
    """
    _set_cld_config(creds)
    kwargs: Dict[str, Any] = {
        "type": "upload",
        "prefix": f"{folder}/",
        "max_results": page_size,
    }
    if cursor:
        kwargs["next_cursor"] = cursor
    return cloudinary.api.resources(**kwargs)


def cld_delete_resource(public_id: str, creds: dict) -> None:
    """Delete a single uploaded resource by its public id."""
    _set_cld_config(creds)
    cloudinary.uploader.destroy(public_id)


def cld_delete_folder_resources(folder: str, creds: dict) -> None:
    """Delete every resource whose public id starts with ``"{folder}/"``."""
    _set_cld_config(creds)
    cloudinary.api.delete_resources_by_prefix(f"{folder}/")


def cld_remove_folder(folder: str, creds: dict) -> None:
    """Remove the (now empty) folder itself.

    Best-effort: Cloudinary raises when the folder is missing or still holds
    resources; either way the caller does not care, so the error is swallowed
    deliberately.
    """
    _set_cld_config(creds)
    try:
        cloudinary.api.delete_folder(folder)
    except Exception:
        pass  # deliberate best-effort cleanup; folder may not exist


def cld_delete_all_paginated(creds: dict) -> int:
    """Delete every uploaded resource in the account, page by page.

    Returns:
        Total number of resources deleted.
    """
    _set_cld_config(creds)
    deleted = 0
    cursor: Optional[str] = None
    while True:
        kwargs: Dict[str, Any] = {"type": "upload", "max_results": 500}
        if cursor:
            kwargs["next_cursor"] = cursor
        res = cloudinary.api.resources(**kwargs)
        resources = res.get("resources", [])
        if not resources:
            break
        pids = [r["public_id"] for r in resources]
        cloudinary.api.delete_resources(pids)
        deleted += len(pids)
        cursor = res.get("next_cursor")
        if not cursor:
            break
    return deleted


def ensure_indexes(pc: Pinecone) -> List[str]:
    """Create the face/object indexes that do not exist yet.

    Faces use 1024-dim vectors, objects 1536-dim; both cosine metric on an
    AWS us-east-1 serverless spec.

    Returns:
        Names of the indexes that were newly created (possibly empty).
    """
    created = []
    existing = [idx.name for idx in pc.list_indexes()]
    for name in [IDX_FACES, IDX_OBJECTS]:
        if name not in existing:
            pc.create_index(
                name=name,
                dimension=1024 if name == IDX_FACES else 1536,
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
            )
            created.append(name)
    return created


def delete_and_recreate_indexes(pc: Pinecone) -> None:
    """Drop both indexes (if present) and recreate them empty.

    The sleep gives Pinecone time to finish each deletion before the
    subsequent create — deletion is asynchronous on the server side.
    """
    existing = [idx.name for idx in pc.list_indexes()]
    for name in [IDX_FACES, IDX_OBJECTS]:
        if name in existing:
            pc.delete_index(name)
            time.sleep(5)
    ensure_indexes(pc)


def search_faces(idx, vec: List[float], det_score: float) -> Dict[str, Any]:
    """Query the face index and keep the best-scoring match per image URL.

    Args:
        idx: Pinecone index handle for faces.
        vec: Face embedding to search with.
        det_score: Detection confidence of the query face.
            NOTE(review): currently unused in the ranking — kept for interface
            compatibility; confirm whether it should gate low-quality queries.

    Returns:
        Mapping of image URL -> {raw_score, face_crop, folder}, deduplicated
        so each URL retains only its highest-scoring face match.
    """
    res = idx.query(vector=vec, top_k=50, include_metadata=True)
    image_map: Dict[str, Any] = {}
    for match in res.get("matches", []):
        meta = match.get("metadata", {})
        url = meta.get("url")
        if not url:
            continue  # matches without a URL cannot be surfaced to the UI
        score = match.get("score", 0)
        if url not in image_map or image_map[url]["raw_score"] < score:
            image_map[url] = {
                "raw_score": score,
                "face_crop": meta.get("face_crop", ""),
                "folder": meta.get("folder", "uncategorized"),
            }
    return image_map


def search_objects(idx, vec: List[float], filter_dict: Optional[dict] = None) -> List[Dict[str, Any]]:
    """Query the object index, rejecting low-confidence "flat" result sets.

    Heuristic: when the top score is mediocre (< 0.65) AND the scores barely
    drop from the 1st to the 5th match (gradient < 0.05), the query most
    likely landed in a cluster of equally-distant, unrelated neighbors
    (out-of-distribution input), so an empty list is returned rather than
    noise. NOTE(review): thresholds 0.65/0.05 are empirical — confirm against
    the embedding model in use.

    Args:
        idx: Pinecone index handle for objects.
        vec: Object/CLIP-style embedding to search with.
        filter_dict: Optional Pinecone metadata filter.

    Returns:
        List of {url, score, raw_score, folder} dicts in match order, or []
        when there are no matches or the flat-score heuristic rejects them.
    """
    query_kwargs: Dict[str, Any] = {"vector": vec, "top_k": 50, "include_metadata": True}
    if filter_dict:
        query_kwargs["filter"] = filter_dict
    res = idx.query(**query_kwargs)

    matches = res.get("matches", [])
    if not matches:
        return []

    scores = [m.get("score", 0) for m in matches]
    if len(scores) >= 5:
        gradient = scores[0] - scores[4]
        # Mediocre best match + no statistical "cliff" => likely hallucinated
        # neighborhood; return nothing instead of random-looking results.
        if scores[0] < 0.65 and gradient < 0.05:
            return []

    results = []
    for match in matches:
        meta = match.get("metadata", {})
        results.append({
            "url": meta.get("url", ""),
            "score": round(match.get("score", 0), 4),
            "raw_score": match.get("score", 0),
            "folder": meta.get("folder", "uncategorized"),
        })
    return results


def merge_face_results(groups: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Merge per-face match groups, keeping the best score per URL.

    Returns:
        Matches sorted by score, descending.
    """
    merged: Dict[str, Any] = {}
    for group in groups:
        for match in group.get("matches", []):
            url = match["url"]
            if url not in merged or merged[url]["score"] < match["score"]:
                merged[url] = match
    return sorted(merged.values(), key=lambda x: x["score"], reverse=True)


def merge_object_results(nested_results: List[List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    """Merge several object-result lists, keeping the best score per URL.

    Returns:
        Matches sorted by score, descending.
    """
    merged: Dict[str, Any] = {}
    for res_list in nested_results:
        for match in res_list:
            url = match["url"]
            if url not in merged or merged[url]["score"] < match["score"]:
                merged[url] = match
    return sorted(merged.values(), key=lambda x: x["score"], reverse=True)