import asyncio
import io
import time
import uuid
from typing import List

from fastapi import (
    APIRouter,
    Depends,
    File,
    Form,
    HTTPException,
    Request,
    UploadFile,
)

from src.common.utils import get_ip, standardize_category_name, to_list
from src.core.config import IDX_FACES, IDX_OBJECTS, MAX_FILES_PER_UPLOAD
from src.core.logging import log
from src.core.security import get_verified_keys
from src.services.db_client import cld_upload, pinecone_pool

router = APIRouter()

# Max vectors sent to Pinecone per upsert call (was an inline magic number).
_UPSERT_BATCH_SIZE = 200


def chunker(seq, size):
    """Yield successive slices of *seq* containing at most *size* elements."""
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))


@router.post("/api/upload")
async def upload_images(
    request: Request,
    files: List[UploadFile] = File(...),
    folder_name: str = Form(...),
    detect_faces: bool = Form(True),
    user_id: str = Form(""),
    keys: dict = Depends(get_verified_keys),
):
    """Upload images to Cloudinary and index their embeddings in Pinecone.

    For each file, the raw bytes are uploaded to Cloudinary in a worker
    thread while the app's AI manager extracts face/object vectors
    concurrently (bounded by ``app.state.ai_semaphore``). The resulting
    vectors are then upserted into the face or object Pinecone index in
    fixed-size batches.

    Raises:
        HTTPException(400): more than MAX_FILES_PER_UPLOAD files were sent.
        HTTPException(500): image processing/upload or DB insertion failed.
    """
    ip = get_ip(request)
    start = time.perf_counter()

    if len(files) > MAX_FILES_PER_UPLOAD:
        raise HTTPException(400, f"Too many files. Max {MAX_FILES_PER_UPLOAD} per request.")

    folder = standardize_category_name(folder_name)

    pc = pinecone_pool.get(keys["pinecone_key"])
    idx_obj = pc.Index(IDX_OBJECTS)
    idx_face = pc.Index(IDX_FACES)

    ai_manager = request.app.state.ai
    sem = request.app.state.ai_semaphore  # caps concurrent AI jobs

    all_face_upserts: list[dict] = []
    all_object_upserts: list[dict] = []
    uploaded_urls: list[str] = []

    async def _process_file(file: UploadFile) -> tuple[str, str, list]:
        """Upload one file and run AI extraction on it concurrently.

        Returns ``(file_id, secure_url, vectors)``.
        """
        file_bytes = await file.read()
        file_id = uuid.uuid4().hex

        async def _run_ai():
            async with sem:
                return await ai_manager.process_image_bytes_async(
                    file_bytes, detect_faces=detect_faces
                )

        # cld_upload is blocking -> offload to a thread; AI stays on the loop.
        cld_task = asyncio.to_thread(
            cld_upload, io.BytesIO(file_bytes), folder, keys["cloudinary_creds"]
        )
        cld_res, vectors = await asyncio.gather(cld_task, _run_ai())
        return file_id, cld_res["secure_url"], vectors

    # FIX: previously an unhandled failure here escaped as an opaque
    # framework 500; surface it explicitly, mirroring the DB phase below.
    try:
        results = await asyncio.gather(*(_process_file(f) for f in files))
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(500, f"Image processing failed: {e}")

    for file_id, image_url, vectors in results:
        uploaded_urls.append(image_url)
        for i, v in enumerate(vectors):
            vector_id = f"{file_id}_{i}"
            if v["type"] == "face":
                all_face_upserts.append({
                    "id": vector_id,
                    "values": to_list(v["vector"]),
                    "metadata": {
                        "url": image_url,
                        "folder": folder,
                        "face_crop": v.get("face_crop", ""),
                        "det_score": float(v.get("det_score", 1.0)),
                        "face_width_px": int(v.get("face_width_px", 0)),
                    },
                })
            else:
                all_object_upserts.append({
                    "id": vector_id,
                    "values": to_list(v["vector"]),
                    "metadata": {"url": image_url, "folder": folder},
                })

    def batched_upsert(index, vectors):
        """Blocking helper: upsert *vectors* into *index* in batches."""
        for batch in chunker(vectors, _UPSERT_BATCH_SIZE):
            index.upsert(vectors=batch)

    db_tasks = []
    if all_face_upserts:
        db_tasks.append(asyncio.to_thread(batched_upsert, idx_face, all_face_upserts))
    if all_object_upserts:
        db_tasks.append(asyncio.to_thread(batched_upsert, idx_obj, all_object_upserts))

    if db_tasks:
        try:
            await asyncio.gather(*db_tasks)
        except Exception as e:
            raise HTTPException(500, f"Database insertion failed: {e}")

    duration_ms = round((time.perf_counter() - start) * 1000)
    log(
        "INFO",
        "upload.complete",
        user_id=user_id or "anonymous",
        ip=ip,
        files=len(files),
        folder=folder,
        duration_ms=duration_ms,
    )

    return {
        "message": "Done!",
        "urls": uploaded_urls,
        "summary": {
            "files": len(files),
            "face_vectors": len(all_face_upserts),
            "object_vectors": len(all_object_upserts),
        },
    }