Spaces:
Sleeping
Sleeping
| import time | |
| from typing import Any, Dict, List | |
| import cloudinary | |
| import cloudinary.uploader | |
| import cloudinary.api | |
| from pinecone import Pinecone, ServerlessSpec | |
| from src.core.config import IDX_FACES, IDX_OBJECTS | |
class PineconePool:
    """Cache of Pinecone clients keyed by the API key used to create them."""

    def __init__(self):
        # Maps api_key -> Pinecone client built with that key.
        self._clients = {}

    def get(self, api_key: str) -> Pinecone:
        """Return the client for ``api_key``, constructing it on first use."""
        client = self._clients.get(api_key)
        if client is None:
            client = Pinecone(api_key=api_key)
            self._clients[api_key] = client
        return client


# Module-level pool instance.
pinecone_pool = PineconePool()
def _set_cld_config(creds: dict):
    """Apply the account in ``creds`` to the Cloudinary SDK configuration.

    Reads the ``cloud_name``, ``api_key`` and ``api_secret`` entries of
    ``creds`` (a missing entry is passed through as ``None``) and always
    sets ``secure=True``.
    """
    account = {
        field: creds.get(field)
        for field in ("cloud_name", "api_key", "api_secret")
    }
    cloudinary.config(secure=True, **account)
def cld_ping(creds: dict):
    """Configure Cloudinary with ``creds`` and call the Admin API ``ping``.

    The ping response is discarded; any exception raised by the SDK
    propagates to the caller.
    """
    _set_cld_config(creds)
    admin_api = cloudinary.api
    admin_api.ping()
def cld_upload(file_obj, folder: str, creds: dict) -> dict:
    """Upload ``file_obj`` into ``folder`` using the account in ``creds``.

    Returns whatever ``cloudinary.uploader.upload`` returns.
    """
    _set_cld_config(creds)
    upload_options = {"folder": folder}
    response = cloudinary.uploader.upload(file_obj, **upload_options)
    return response
def cld_root_folders(creds: dict) -> dict:
    """Return the result of ``cloudinary.api.root_folders()`` for ``creds``."""
    _set_cld_config(creds)
    listing = cloudinary.api.root_folders()
    return listing
def cld_list_folder_images(folder: str, creds: dict, cursor: str = None, page_size: int = 100) -> dict:
    """Fetch one page of uploaded resources whose public ID starts with ``folder/``.

    Args:
        folder: Folder name; a trailing ``/`` is appended to form the prefix.
        creds: Cloudinary credentials applied before the call.
        cursor: Pagination cursor from a previous page; omitted from the
            request when falsy.
        page_size: Passed to the API as ``max_results``.

    Returns:
        The response of ``cloudinary.api.resources``.
    """
    _set_cld_config(creds)
    paging = {"next_cursor": cursor} if cursor else {}
    return cloudinary.api.resources(
        **{"type": "upload", "prefix": f"{folder}/", "max_results": page_size},
        **paging,
    )
def cld_delete_resource(public_id: str, creds: dict):
    """Call ``cloudinary.uploader.destroy`` on ``public_id`` using ``creds``.

    The SDK response is discarded; nothing is returned.
    """
    _set_cld_config(creds)
    uploader = cloudinary.uploader
    uploader.destroy(public_id)
def cld_delete_folder_resources(folder: str, creds: dict):
    """Delete every resource whose public ID starts with ``folder/``.

    Delegates to ``cloudinary.api.delete_resources_by_prefix``; the SDK
    response is discarded.
    """
    _set_cld_config(creds)
    prefix = f"{folder}/"
    cloudinary.api.delete_resources_by_prefix(prefix)
def cld_remove_folder(folder: str, creds: dict):
    """Best-effort removal of a Cloudinary folder.

    Configures the SDK with ``creds`` and calls
    ``cloudinary.api.delete_folder(folder)``. A failure of the delete call
    never propagates to the caller, but it is logged as a warning (with
    traceback) instead of being swallowed silently, so that bad credentials
    or API errors remain visible.

    Args:
        folder: Folder path to remove.
        creds: Cloudinary credentials applied before the call.
    """
    # Function-scope import keeps this block self-contained.
    import logging

    _set_cld_config(creds)
    try:
        cloudinary.api.delete_folder(folder)
    except Exception:
        # Deliberately non-fatal: callers treat folder removal as optional
        # cleanup, so record the failure and carry on.
        logging.getLogger(__name__).warning(
            "Could not remove Cloudinary folder %r", folder, exc_info=True
        )
def cld_delete_all_paginated(creds: dict) -> int:
    """Delete all ``type="upload"`` resources of the account, page by page.

    Resources are listed up to 500 at a time, then deleted in batches of at
    most 100 public IDs, because the Admin API ``delete_resources`` call
    accepts no more than 100 IDs per request (sending a full 500-item page
    in one call exceeds that limit).

    Args:
        creds: Cloudinary credentials applied before the first call.

    Returns:
        The number of public IDs submitted for deletion. This counts
        requests, not confirmed deletions; the API responses are not
        inspected.
    """
    _set_cld_config(creds)

    list_page_size = 500    # page size requested from resources()
    delete_batch_size = 100  # documented per-call cap of delete_resources()

    deleted = 0
    cursor = None
    while True:
        kwargs = {"type": "upload", "max_results": list_page_size}
        if cursor:
            kwargs["next_cursor"] = cursor
        res = cloudinary.api.resources(**kwargs)

        pids = [r["public_id"] for r in res.get("resources", [])]
        if not pids:
            break

        for start in range(0, len(pids), delete_batch_size):
            cloudinary.api.delete_resources(pids[start:start + delete_batch_size])
        deleted += len(pids)

        cursor = res.get("next_cursor")
        if not cursor:
            break
    return deleted
def ensure_indexes(pc: Pinecone) -> List[str]:
    """Create the faces/objects indexes that do not exist yet.

    Each missing index is created as a serverless index (``aws`` /
    ``us-east-1``) with the cosine metric; the faces index uses dimension
    1024 and the objects index 1536.

    Args:
        pc: Pinecone client used to list and create indexes.

    Returns:
        Names of the indexes created by this call, in creation order.
    """
    present = {index.name for index in pc.list_indexes()}
    newly_created = []
    for name in (IDX_FACES, IDX_OBJECTS):
        if name in present:
            continue
        if name == IDX_FACES:
            dimension = 1024
        else:
            dimension = 1536
        serverless = ServerlessSpec(cloud="aws", region="us-east-1")
        pc.create_index(
            name=name,
            dimension=dimension,
            metric="cosine",
            spec=serverless,
        )
        newly_created.append(name)
    return newly_created
# Reset the vector indexes: delete whichever of IDX_FACES / IDX_OBJECTS are
# currently listed by the client, wait 5 seconds, then call ensure_indexes(pc)
# to create them again.
# NOTE(review): indentation was lost in this copy of the file, so it is unclear
# whether the 5-second sleep runs once per deleted index or once after the
# loop — confirm against the original source before restructuring.
| def delete_and_recreate_indexes(pc: Pinecone): | |
| existing = [idx.name for idx in pc.list_indexes()] | |
| for name in [IDX_FACES, IDX_OBJECTS]: | |
| if name in existing: | |
| pc.delete_index(name) | |
| time.sleep(5) | |
| ensure_indexes(pc) | |
def search_faces(idx, vec: List[float], det_score: float) -> Dict[str, Any]:
    """Query the face index and keep the best-scoring hit per image URL.

    Args:
        idx: Index object exposing ``query(vector=, top_k=, include_metadata=)``
            whose response supports ``.get("matches")``.
        vec: Query embedding.
        det_score: Accepted for interface compatibility; not used by this
            function.

    Returns:
        A dict mapping each image URL to
        ``{"raw_score", "face_crop", "folder"}`` taken from the highest-scoring
        match for that URL. Matches without a URL are skipped.
    """
    res = idx.query(vector=vec, top_k=50, include_metadata=True)

    image_map: Dict[str, Any] = {}
    for match in res.get("matches", []):
        # ``metadata`` may be present but null; ``.get(key, {})`` would then
        # return None and the lookups below would raise.
        meta = match.get("metadata") or {}
        url = meta.get("url")
        if not url:
            continue

        score = match.get("score", 0)
        best = image_map.get(url)
        # Strict comparison: on a tie the earlier match is kept.
        if best is None or best["raw_score"] < score:
            image_map[url] = {
                "raw_score": score,
                "face_crop": meta.get("face_crop", ""),
                "folder": meta.get("folder", "uncategorized"),
            }
    return image_map
| import numpy as np | |
def search_objects(idx, vec: List[float], filter_dict: dict = None) -> List[Dict[str, Any]]:
    """Query the object index and return the matches as plain dicts.

    Args:
        idx: Index object exposing ``query(**kwargs)`` whose response supports
            ``.get("matches")``.
        vec: Query embedding.
        filter_dict: Optional metadata filter; sent as ``filter`` only when
            truthy.

    Returns:
        One dict per match with ``url``, ``score`` (rounded to 4 decimals),
        ``raw_score`` and ``folder``, in the order the index returned them.
        An empty list is returned when there are no matches or when the
        result set is rejected by the flat-low-score heuristic below.
    """
    query_kwargs = {"vector": vec, "top_k": 50, "include_metadata": True}
    if filter_dict:
        query_kwargs["filter"] = filter_dict

    matches = idx.query(**query_kwargs).get("matches", [])
    if not matches:
        return []

    # Heuristic rejection of weak result sets: with at least five matches, if
    # the first score is below 0.65 and the first and fifth scores are less
    # than 0.05 apart, no match stands out from the rest, so the whole result
    # set is discarded. This relies on the index returning matches ordered by
    # descending score.
    if len(matches) >= 5:
        top_score = matches[0].get("score", 0)
        fifth_score = matches[4].get("score", 0)
        if top_score < 0.65 and top_score - fifth_score < 0.05:
            return []

    results = []
    for match in matches:
        # ``metadata`` may be present but null; fall back to an empty dict so
        # the defaults below apply instead of raising.
        meta = match.get("metadata") or {}
        score = match.get("score", 0)
        results.append({
            "url": meta.get("url", ""),
            "score": round(score, 4),
            "raw_score": score,
            "folder": meta.get("folder", "uncategorized"),
        })
    return results
def merge_face_results(groups: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Combine the ``matches`` of several groups into one ranked list.

    For every URL only the match with the highest ``score`` is kept (the
    first one seen wins a tie). The result is ordered by descending score.
    """
    best_by_url = {}
    for group in groups:
        for candidate in group.get("matches", []):
            key = candidate["url"]
            current = best_by_url.get(key)
            if current is None or current["score"] < candidate["score"]:
                best_by_url[key] = candidate

    ranked = list(best_by_url.values())
    ranked.sort(key=lambda item: item["score"], reverse=True)
    return ranked
def merge_object_results(nested_results: List[List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    """Flatten several result lists into one list, deduplicated by URL.

    For every URL only the match with the highest ``score`` survives (the
    first one seen wins a tie). The result is ordered by descending score.
    """
    best_by_url = {}
    flattened = (entry for batch in nested_results for entry in batch)
    for entry in flattened:
        key = entry["url"]
        if key in best_by_url and not best_by_url[key]["score"] < entry["score"]:
            continue
        best_by_url[key] = entry

    ranked = list(best_by_url.values())
    ranked.sort(key=lambda item: item["score"], reverse=True)
    return ranked