import asyncio
import io
import time
import uuid
from typing import List
from fastapi import APIRouter, File, Form, HTTPException, Request, UploadFile, Depends
from src.core.config import IDX_FACES, IDX_OBJECTS, MAX_FILES_PER_UPLOAD
from src.core.security import get_verified_keys
from src.services.db_client import cld_upload, pinecone_pool
from src.core.logging import log
from src.common.utils import get_ip, standardize_category_name, to_list
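
# Upload endpoint: stores images in Cloudinary and indexes their face/object
# embedding vectors in Pinecone.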
router = APIRouter()

def chunker(seq, size):
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))
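# Illustration (not part of the endpoint): chunker yields lazy fixed-size slices,
# e.g. list(chunker([1, 2, 3, 4, 5], 2)) == [[1, 2], [3, 4], [5]].
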
@router.post("/api/upload")
async def upload_images(
    request: Request,
    files: List[UploadFile] = File(...),
    folder_name: str = Form(...),
    detect_faces: bool = Form(True),
    user_id: str = Form(""),
    keys: dict = Depends(get_verified_keys),
):
    ip = get_ip(request)
    start = time.perf_counter()
    # Reject oversized batches before doing any work.
    if len(files) > MAX_FILES_PER_UPLOAD:
        raise HTTPException(400, f"Too many files. Max {MAX_FILES_PER_UPLOAD} per request.")
    folder = standardize_category_name(folder_name)
    # Pinecone client for this caller's API key, obtained via the pool.
    pc = pinecone_pool.get(keys["pinecone_key"])
    idx_obj = pc.Index(IDX_OBJECTS)
    idx_face = pc.Index(IDX_FACES)
    ai_manager = request.app.state.ai
    sem = request.app.state.ai_semaphore
    all_face_upserts: list[dict] = []
    all_object_upserts: list[dict] = []
    uploaded_urls: list[str] = []

    async def _process_file(file: UploadFile) -> tuple[str, str, list]:
        file_bytes = await file.read()
        file_id = uuid.uuid4().hex

        async def _run_ai():
            # The shared semaphore bounds concurrent AI inference across requests.
            async with sem:
                return await ai_manager.process_image_bytes_async(file_bytes, detect_faces=detect_faces)

        # Run the blocking Cloudinary upload in a worker thread while AI inference runs.
        cld_task = asyncio.to_thread(cld_upload, io.BytesIO(file_bytes), folder, keys["cloudinary_creds"])
        ai_task = _run_ai()
        cld_res, vectors = await asyncio.gather(cld_task, ai_task)
        return file_id, cld_res["secure_url"], vectors

    # Fan out: process every uploaded file concurrently.
    results = await asyncio.gather(*[_process_file(f) for f in files])
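
    # Collate per-file results into the face and object upsert buffers.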
    for file_id, image_url, vectors in results:
        uploaded_urls.append(image_url)
        for i, v in enumerate(vectors):
            vector_id = f"{file_id}_{i}"
            if v["type"] == "face":
                all_face_upserts.append({
                    "id": vector_id,
                    "values": to_list(v["vector"]),
                    "metadata": {
                        "url": image_url,
                        "folder": folder,
                        "face_crop": v.get("face_crop", ""),
                        "det_score": float(v.get("det_score", 1.0)),
                        "face_width_px": int(v.get("face_width_px", 0)),
                    },
                })
            else:
                all_object_upserts.append({
                    "id": vector_id,
                    "values": to_list(v["vector"]),
                    "metadata": {"url": image_url, "folder": folder},
                })
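
    # Persist vectors to the appropriate Pinecone index, in batches, off the event loop.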
    db_tasks = []

    def batched_upsert(index, vectors):
        # Upsert 200 vectors per request to keep payloads within Pinecone's limits.
        for batch in chunker(vectors, 200):
            index.upsert(vectors=batch)

    if all_face_upserts:
        db_tasks.append(asyncio.to_thread(batched_upsert, idx_face, all_face_upserts))
    if all_object_upserts:
        db_tasks.append(asyncio.to_thread(batched_upsert, idx_obj, all_object_upserts))
    if db_tasks:
        try:
            await asyncio.gather(*db_tasks)
        except Exception as e:
            raise HTTPException(500, f"Database insertion failed: {e}")

    duration_ms = round((time.perf_counter() - start) * 1000)
    log("INFO", "upload.complete", user_id=user_id or "anonymous", ip=ip,
        files=len(files), folder=folder, duration_ms=duration_ms)

    return {
        "message": "Done!",
        "urls": uploaded_urls,
        "summary": {
            "files": len(files),
            "face_vectors": len(all_face_upserts),
            "object_vectors": len(all_object_upserts),
        },
    }
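
# Example request (a sketch; host, port, and auth headers depend on deployment and
# on what get_verified_keys expects):
#   curl -X POST http://localhost:8000/api/upload \
#        -F "files=@photo1.jpg" -F "files=@photo2.jpg" \
#        -F "folder_name=holiday_photos" -F "detect_faces=true"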