File size: 3,986 Bytes
1db146d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import asyncio
import io
import time
import uuid
from typing import List

from fastapi import APIRouter, File, Form, HTTPException, Request, UploadFile, Depends

from src.core.config import IDX_FACES, IDX_OBJECTS, MAX_FILES_PER_UPLOAD
from src.core.security import get_verified_keys
from src.services.db_client import cld_upload, pinecone_pool
from src.core.logging import log
from src.common.utils import get_ip, standardize_category_name, to_list

router = APIRouter()

def chunker(seq, size):
    """Yield successive slices of *seq*, each holding at most *size* items.

    The final slice may be shorter when ``len(seq)`` is not a multiple of
    ``size``. An empty sequence yields nothing.
    """
    for start in range(0, len(seq), size):
        yield seq[start:start + size]

@router.post("/api/upload")
async def upload_images(
    request: Request,
    files: List[UploadFile] = File(...),
    folder_name: str = Form(...),
    detect_faces: bool = Form(True),
    user_id: str = Form(""),
    keys: dict = Depends(get_verified_keys)
):
    ip = get_ip(request)
    start = time.perf_counter()

    if len(files) > MAX_FILES_PER_UPLOAD:
        raise HTTPException(400, f"Too many files. Max {MAX_FILES_PER_UPLOAD} per request.")

    folder = standardize_category_name(folder_name)
    pc = pinecone_pool.get(keys["pinecone_key"])
    idx_obj = pc.Index(IDX_OBJECTS)
    idx_face = pc.Index(IDX_FACES)

    ai_manager = request.app.state.ai
    sem = request.app.state.ai_semaphore

    all_face_upserts: list[dict] = []
    all_object_upserts: list[dict] = []
    uploaded_urls: list[str] = []

    async def _process_file(file: UploadFile) -> tuple[str, str, list]:
        file_bytes = await file.read()
        file_id = uuid.uuid4().hex

        async def _run_ai():
            async with sem:
                return await ai_manager.process_image_bytes_async(file_bytes, detect_faces=detect_faces)

        cld_task = asyncio.to_thread(cld_upload, io.BytesIO(file_bytes), folder, keys["cloudinary_creds"])
        ai_task = _run_ai()
        
        cld_res, vectors = await asyncio.gather(cld_task, ai_task)
        return file_id, cld_res["secure_url"], vectors

    results = await asyncio.gather(*[_process_file(f) for f in files])

    for file_id, image_url, vectors in results:
        uploaded_urls.append(image_url)
        for i, v in enumerate(vectors):
            vector_id = f"{file_id}_{i}"
            if v["type"] == "face":
                all_face_upserts.append({
                    "id": vector_id,
                    "values": to_list(v["vector"]),
                    "metadata": {
                        "url": image_url,
                        "folder": folder,
                        "face_crop": v.get("face_crop", ""),
                        "det_score": float(v.get("det_score", 1.0)),
                        "face_width_px": int(v.get("face_width_px", 0)),
                    },
                })
            else:
                all_object_upserts.append({
                    "id": vector_id,
                    "values": to_list(v["vector"]),
                    "metadata": {"url": image_url, "folder": folder},
                })

    db_tasks = []
    def batched_upsert(index, vectors):
        for batch in chunker(vectors, 200):
            index.upsert(vectors=batch)

    if all_face_upserts:
        db_tasks.append(asyncio.to_thread(batched_upsert, idx_face, all_face_upserts))
    if all_object_upserts:
        db_tasks.append(asyncio.to_thread(batched_upsert, idx_obj, all_object_upserts))
        
    if db_tasks:
        try:
            await asyncio.gather(*db_tasks)
        except Exception as e:
            raise HTTPException(500, f"Database insertion failed: {e}")

    duration_ms = round((time.perf_counter() - start) * 1000)
    log("INFO", "upload.complete", user_id=user_id or "anonymous", ip=ip,
        files=len(files), folder=folder, duration_ms=duration_ms)

    return {
        "message": "Done!",
        "urls": uploaded_urls,
        "summary": {
            "files": len(files),
            "face_vectors": len(all_face_upserts),
            "object_vectors": len(all_object_upserts),
        },
    }