# visual-search-api2 / src/api/upload.py
# Author: AdarshDRC — "Create upload.py" (commit 1db146d, verified)
import asyncio
import io
import time
import uuid
from typing import List
from fastapi import APIRouter, File, Form, HTTPException, Request, UploadFile, Depends
from src.core.config import IDX_FACES, IDX_OBJECTS, MAX_FILES_PER_UPLOAD
from src.core.security import get_verified_keys
from src.services.db_client import cld_upload, pinecone_pool
from src.core.logging import log
from src.common.utils import get_ip, standardize_category_name, to_list
router = APIRouter()
def chunker(seq, size):
    """Lazily yield consecutive slices of ``seq``, each at most ``size`` items.

    The final slice may be shorter than ``size`` when ``len(seq)`` is not an
    exact multiple of it. Slices preserve the sequence's own type (lists stay
    lists, strings stay strings).
    """
    for start in range(0, len(seq), size):
        yield seq[start:start + size]
@router.post("/api/upload")
async def upload_images(
request: Request,
files: List[UploadFile] = File(...),
folder_name: str = Form(...),
detect_faces: bool = Form(True),
user_id: str = Form(""),
keys: dict = Depends(get_verified_keys)
):
ip = get_ip(request)
start = time.perf_counter()
if len(files) > MAX_FILES_PER_UPLOAD:
raise HTTPException(400, f"Too many files. Max {MAX_FILES_PER_UPLOAD} per request.")
folder = standardize_category_name(folder_name)
pc = pinecone_pool.get(keys["pinecone_key"])
idx_obj = pc.Index(IDX_OBJECTS)
idx_face = pc.Index(IDX_FACES)
ai_manager = request.app.state.ai
sem = request.app.state.ai_semaphore
all_face_upserts: list[dict] = []
all_object_upserts: list[dict] = []
uploaded_urls: list[str] = []
async def _process_file(file: UploadFile) -> tuple[str, str, list]:
file_bytes = await file.read()
file_id = uuid.uuid4().hex
async def _run_ai():
async with sem:
return await ai_manager.process_image_bytes_async(file_bytes, detect_faces=detect_faces)
cld_task = asyncio.to_thread(cld_upload, io.BytesIO(file_bytes), folder, keys["cloudinary_creds"])
ai_task = _run_ai()
cld_res, vectors = await asyncio.gather(cld_task, ai_task)
return file_id, cld_res["secure_url"], vectors
results = await asyncio.gather(*[_process_file(f) for f in files])
for file_id, image_url, vectors in results:
uploaded_urls.append(image_url)
for i, v in enumerate(vectors):
vector_id = f"{file_id}_{i}"
if v["type"] == "face":
all_face_upserts.append({
"id": vector_id,
"values": to_list(v["vector"]),
"metadata": {
"url": image_url,
"folder": folder,
"face_crop": v.get("face_crop", ""),
"det_score": float(v.get("det_score", 1.0)),
"face_width_px": int(v.get("face_width_px", 0)),
},
})
else:
all_object_upserts.append({
"id": vector_id,
"values": to_list(v["vector"]),
"metadata": {"url": image_url, "folder": folder},
})
db_tasks = []
def batched_upsert(index, vectors):
for batch in chunker(vectors, 200):
index.upsert(vectors=batch)
if all_face_upserts:
db_tasks.append(asyncio.to_thread(batched_upsert, idx_face, all_face_upserts))
if all_object_upserts:
db_tasks.append(asyncio.to_thread(batched_upsert, idx_obj, all_object_upserts))
if db_tasks:
try:
await asyncio.gather(*db_tasks)
except Exception as e:
raise HTTPException(500, f"Database insertion failed: {e}")
duration_ms = round((time.perf_counter() - start) * 1000)
log("INFO", "upload.complete", user_id=user_id or "anonymous", ip=ip,
files=len(files), folder=folder, duration_ms=duration_ms)
return {
"message": "Done!",
"urls": uploaded_urls,
"summary": {
"files": len(files),
"face_vectors": len(all_face_upserts),
"object_vectors": len(all_object_upserts),
},
}