# Spaces:
# Sleeping
# Sleeping
# File size: 6,105 Bytes
import logging
import time
from typing import Any, Dict, List

import cloudinary
import cloudinary.api
import cloudinary.uploader
from pinecone import Pinecone, ServerlessSpec

from src.core.config import IDX_FACES, IDX_OBJECTS
class PineconePool:
    """Cache of Pinecone clients keyed by API key.

    A client is built the first time a key is requested; later requests for
    the same key get that same instance back.
    """

    def __init__(self):
        # api_key -> Pinecone client created for that key
        self._clients = {}

    def get(self, api_key: str) -> Pinecone:
        """Return the client stored for *api_key*, creating it on first use."""
        client = self._clients.get(api_key)
        if client is None:
            client = Pinecone(api_key=api_key)
            self._clients[api_key] = client
        return client
# Module-level PineconePool instance, created once when this module is imported.
pinecone_pool = PineconePool()
def _set_cld_config(creds: dict):
    """Pass the account credentials in *creds* to ``cloudinary.config``.

    The ``cloud_name``, ``api_key`` and ``api_secret`` entries are read with
    ``dict.get``, so a missing entry is forwarded as ``None``. ``secure`` is
    always set to ``True``.
    """
    account = {
        field: creds.get(field)
        for field in ("cloud_name", "api_key", "api_secret")
    }
    cloudinary.config(secure=True, **account)
def cld_ping(creds: dict):
    """Configure Cloudinary from *creds* and call ``cloudinary.api.ping``.

    Nothing is returned and no exception is caught here, so a failing ping
    propagates to the caller.
    """
    _set_cld_config(creds)
    cloudinary.api.ping()
def cld_upload(file_obj, folder: str, creds: dict) -> dict:
    """Upload *file_obj* into *folder* using the account in *creds*.

    Returns:
        Whatever ``cloudinary.uploader.upload`` returns for the upload.
    """
    _set_cld_config(creds)
    upload_result = cloudinary.uploader.upload(file_obj, folder=folder)
    return upload_result
def cld_root_folders(creds: dict) -> dict:
    """Return the ``cloudinary.api.root_folders`` response for *creds*."""
    _set_cld_config(creds)
    listing = cloudinary.api.root_folders()
    return listing
def cld_list_folder_images(folder: str, creds: dict, cursor: str = None, page_size: int = 100) -> dict:
    """Fetch one page of ``upload`` resources whose public ID starts with ``folder/``.

    Args:
        folder: Folder name; a trailing ``/`` is appended to form the prefix.
        creds: Cloudinary credentials applied before the request.
        cursor: Continuation cursor from a previous page. It is only sent
            when truthy.
        page_size: Value passed as ``max_results``.

    Returns:
        The ``cloudinary.api.resources`` response for that page.
    """
    _set_cld_config(creds)
    paging = {"next_cursor": cursor} if cursor else {}
    return cloudinary.api.resources(
        type="upload",
        prefix=f"{folder}/",
        max_results=page_size,
        **paging,
    )
def cld_delete_resource(public_id: str, creds: dict):
    """Configure Cloudinary from *creds* and destroy the asset *public_id*.

    The response of ``cloudinary.uploader.destroy`` is discarded; nothing is
    returned and no exception is caught here.
    """
    _set_cld_config(creds)
    cloudinary.uploader.destroy(public_id)
def cld_delete_folder_resources(folder: str, creds: dict):
    """Delete the resources whose public ID starts with ``folder/``.

    The API response is discarded and nothing is returned.
    """
    _set_cld_config(creds)
    folder_prefix = f"{folder}/"
    cloudinary.api.delete_resources_by_prefix(folder_prefix)
def cld_remove_folder(folder: str, creds: dict):
    """Best-effort removal of the Cloudinary folder *folder*.

    Failures never propagate to the caller, but they are logged as a warning
    instead of being discarded, so a folder that could not be removed leaves
    a trace.

    Args:
        folder: Path of the folder to delete.
        creds: Cloudinary credentials applied before the request.
    """
    _set_cld_config(creds)
    try:
        cloudinary.api.delete_folder(folder)
    except Exception as exc:
        # Deliberately broad: removal stays best-effort, it is just no
        # longer silent.
        logging.getLogger(__name__).warning(
            "Cloudinary folder %r could not be removed: %s", folder, exc
        )
def cld_delete_all_paginated(creds: dict) -> int:
    """Delete every resource returned by the paginated ``upload`` listing.

    Resources are listed 500 at a time and each page is deleted in batches,
    because the Admin API's ``delete_resources`` accepts at most 100 public
    IDs per call. Sending a full listing page in one call exceeds that limit.

    Args:
        creds: Cloudinary credentials applied before the requests.

    Returns:
        The number of public IDs submitted for deletion.
    """
    _set_cld_config(creds)
    list_page_size = 500
    delete_batch_size = 100  # per-call cap of delete_resources

    deleted = 0
    cursor = None
    while True:
        kwargs = {"type": "upload", "max_results": list_page_size}
        if cursor:
            kwargs["next_cursor"] = cursor
        res = cloudinary.api.resources(**kwargs)

        pids = [r["public_id"] for r in res.get("resources", [])]
        if not pids:
            break

        for start in range(0, len(pids), delete_batch_size):
            batch = pids[start:start + delete_batch_size]
            cloudinary.api.delete_resources(batch)
            deleted += len(batch)

        # Stop once the listing reports no further page.
        cursor = res.get("next_cursor")
        if not cursor:
            break
    return deleted
def ensure_indexes(pc: Pinecone) -> List[str]:
    """Create whichever of the faces/objects indexes do not exist yet.

    The faces index is created with dimension 1024 and the objects index with
    dimension 1536; both use the cosine metric on a serverless spec
    (``aws`` / ``us-east-1``).

    Returns:
        Names of the indexes created by this call, in creation order.
    """
    present = {index.name for index in pc.list_indexes()}
    missing = [name for name in (IDX_FACES, IDX_OBJECTS) if name not in present]

    for index_name in missing:
        if index_name == IDX_FACES:
            dimension = 1024
        else:
            dimension = 1536
        pc.create_index(
            name=index_name,
            dimension=dimension,
            metric="cosine",
            spec=ServerlessSpec(cloud="aws", region="us-east-1"),
        )
    return missing
def delete_and_recreate_indexes(pc: Pinecone):
    """Delete the faces/objects indexes that exist, then recreate them.

    Only indexes reported by ``pc.list_indexes()`` are deleted. Afterwards
    ``ensure_indexes`` is called to create whatever is missing.
    """
    existing = [idx.name for idx in pc.list_indexes()]
    for name in [IDX_FACES, IDX_OBJECTS]:
        if name in existing:
            pc.delete_index(name)
    # NOTE(review): assumes a single pause after all deletions rather than
    # one per deleted index — confirm. Presumably it gives the deletions time
    # to take effect before the indexes are recreated.
    time.sleep(5)
    ensure_indexes(pc)
def search_faces(idx, vec: List[float], det_score: float) -> Dict[str, Any]:
    """Query *idx* for the 50 nearest faces and keep the best hit per image URL.

    Args:
        idx: Object exposing ``query(vector=..., top_k=..., include_metadata=...)``.
        vec: Query embedding.
        det_score: Accepted for interface compatibility; not used here.

    Returns:
        Mapping of image URL to ``{"raw_score", "face_crop", "folder"}`` for
        the highest-scoring match of that URL. Matches without a URL in their
        metadata are skipped.
    """
    response = idx.query(vector=vec, top_k=50, include_metadata=True)

    best_by_url = {}
    for hit in response.get("matches", []):
        metadata = hit.get("metadata", {})
        url = metadata.get("url")
        if url:
            similarity = hit.get("score", 0)
            # On equal scores the first hit seen for a URL is kept.
            if url in best_by_url and not best_by_url[url]["raw_score"] < similarity:
                continue
            best_by_url[url] = {
                "raw_score": similarity,
                "face_crop": metadata.get("face_crop", ""),
                "folder": metadata.get("folder", "uncategorized"),
            }
    return best_by_url
import numpy as np
def search_objects(idx, vec: List[float], filter_dict: dict = None) -> List[Dict[str, Any]]:
    """Query *idx* for the 50 nearest objects and format the matches.

    Args:
        idx: Object exposing ``query(**kwargs)``.
        vec: Query embedding.
        filter_dict: Optional metadata filter; only sent when truthy.

    Returns:
        One dict per match, in the order returned by the query, with ``url``,
        ``score`` (rounded to 4 places), ``raw_score`` and ``folder``. An
        empty list is returned when there are no matches or when the result
        set is rejected as weak and flat (see the comment in the body).
    """
    params = {"vector": vec, "top_k": 50, "include_metadata": True}
    if filter_dict:
        params["filter"] = filter_dict

    hits = idx.query(**params).get("matches", [])
    if not hits:
        return []

    # Weak-and-flat rejection: with at least five matches, a best score under
    # 0.65 that is also within 0.05 of the fifth score is treated as an
    # undifferentiated group of distant neighbours rather than a real match,
    # so nothing is returned. This relies on the matches arriving best-first.
    if len(hits) >= 5:
        best = hits[0].get("score", 0)
        fifth = hits[4].get("score", 0)
        if best < 0.65 and best - fifth < 0.05:
            return []

    return [
        {
            "url": metadata.get("url", ""),
            "score": round(raw, 4),
            "raw_score": raw,
            "folder": metadata.get("folder", "uncategorized"),
        }
        for metadata, raw in (
            (hit.get("metadata", {}), hit.get("score", 0)) for hit in hits
        )
    ]
def merge_face_results(groups: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Flatten the ``matches`` of every group into one ranked, de-duplicated list.

    For each URL only the match with the highest ``score`` is kept; on equal
    scores the first one seen wins. Groups without a ``matches`` key
    contribute nothing.

    Returns:
        The surviving matches sorted by ``score``, highest first.
    """
    best_by_url = {}
    candidates = (hit for group in groups for hit in group.get("matches", []))
    for hit in candidates:
        url = hit["url"]
        if url in best_by_url and not best_by_url[url]["score"] < hit["score"]:
            continue
        best_by_url[url] = hit

    ranked = list(best_by_url.values())
    ranked.sort(key=lambda entry: entry["score"], reverse=True)
    return ranked
def merge_object_results(nested_results: List[List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    """Combine several result lists into one ranked, de-duplicated list.

    For each URL only the match with the highest ``score`` is kept; on equal
    scores the first one seen wins.

    Returns:
        The surviving matches sorted by ``score``, highest first.
    """
    best_by_url = {}
    candidates = (hit for batch in nested_results for hit in batch)
    for hit in candidates:
        url = hit["url"]
        if url in best_by_url and not best_by_url[url]["score"] < hit["score"]:
            continue
        best_by_url[url] = hit

    ranked = list(best_by_url.values())
    ranked.sort(key=lambda entry: entry["score"], reverse=True)
    return ranked