# visual-search-api/src/api/upload.py
import asyncio
import io
import time
import uuid
from typing import List
from fastapi import APIRouter, File, Form, HTTPException, Query, Request, UploadFile, Depends
from src.core.config import (
IDX_FACES, IDX_OBJECTS,
IDX_FACES_ARCFACE, IDX_FACES_ADAFACE,
MAX_FILES_PER_UPLOAD, USE_SPLIT_FACE_INDEXES,
USE_ASYNC_UPLOADS, CLUSTER_AUTO_TRIGGER_EVERY,
)
from src.core.security import get_verified_keys
from src.services.db_client import cld_upload, pinecone_pool, ensure_indexes
from src.core.logging import log
from src.common.utils import get_ip, standardize_category_name, to_list
router = APIRouter()
def chunker(seq, size):
return (seq[pos:pos + size] for pos in range(0, len(seq), size))
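# Minimal illustration (not executed): chunker yields consecutive slices of at
# most `size` items, e.g. list(chunker([1, 2, 3, 4, 5], 2)) -> [[1, 2], [3, 4], [5]].
# batched_upsert below relies on it to keep every Pinecone upsert at <= 200 vectors.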
# ──────────────────────────────────────────────────────────────
# Per-file processor — Cloudinary upload + AI inference only.
# Vectors are RETURNED, not upserted here. Caller batches all
# files' vectors into single Pinecone upserts (same as Phase 2).
# ──────────────────────────────────────────────────────────────
async def _process_one_file(
*,
file_bytes: bytes,
folder: str,
detect_faces: bool,
keys: dict,
ai,
sem,
) -> tuple[str, str, list]:
"""Returns (file_id, image_url, vectors). Mirrors Phase 2 signature."""
file_id = uuid.uuid4().hex
async def _run_ai():
async with sem:
return await ai.process_image_bytes_async(file_bytes, detect_faces=detect_faces)
cld_task = asyncio.to_thread(
cld_upload, io.BytesIO(file_bytes), folder, keys["cloudinary_creds"]
)
ai_task = _run_ai()
cld_res, vectors = await asyncio.gather(cld_task, ai_task)
return file_id, cld_res["secure_url"], vectors
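# Rough shape of the `vectors` list consumed by _batch_upsert_all below. The
# exact fields come from ai.process_image_bytes_async; the values here are
# illustrative, not authoritative:
#
#   face entry:   {"type": "face", "arcface_vector": [...], "adaface_vector": [...],
#                  "has_adaface": True, "vector": [...],  # legacy single-index mode
#                  "face_crop": "<url or b64>", "det_score": 0.97,
#                  "face_width_px": 182, "blur_score": 143.5}
#   object entry: {"type": "object", "vector": [...]}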
# ──────────────────────────────────────────────────────────────
# Shared batch-upsert logic — used by sync upload AND job worker
# ──────────────────────────────────────────────────────────────
async def _batch_upsert_all(
*, results: list, folder: str, pc,
) -> dict:
"""
Takes [(file_id, url, vectors), ...] from all files, groups them by
target index, and upserts in one batch per index (single Pinecone
call per index, not per-file).
"""
arcface_upserts = []
adaface_upserts = []
legacy_face_upserts = []
object_upserts = []
uploaded_urls = []
for file_id, image_url, vectors in results:
uploaded_urls.append(image_url)
for i, v in enumerate(vectors):
vector_id = f"{file_id}_{i}"
if v["type"] == "face":
meta_common = {
"url": image_url,
"folder": folder,
"face_crop": v.get("face_crop", ""),
"det_score": float(v.get("det_score", 1.0)),
"face_width_px": int(v.get("face_width_px", 0)),
"blur_score": float(v.get("blur_score", 100.0)),
}
if USE_SPLIT_FACE_INDEXES:
arcface_upserts.append({
"id": vector_id,
"values": to_list(v["arcface_vector"]),
"metadata": meta_common,
})
if v.get("has_adaface"):
adaface_upserts.append({
"id": vector_id,
"values": to_list(v["adaface_vector"]),
"metadata": meta_common,
})
else:
legacy_face_upserts.append({
"id": vector_id,
"values": to_list(v["vector"]),
"metadata": meta_common,
})
else:
object_upserts.append({
"id": vector_id,
"values": to_list(v["vector"]),
"metadata": {"url": image_url, "folder": folder},
})
idx_obj = pc.Index(IDX_OBJECTS)
if USE_SPLIT_FACE_INDEXES:
idx_arcface = pc.Index(IDX_FACES_ARCFACE)
idx_adaface = pc.Index(IDX_FACES_ADAFACE)
else:
idx_face_legacy = pc.Index(IDX_FACES)
def batched_upsert(index, vectors):
for batch in chunker(vectors, 200):
index.upsert(vectors=batch)
db_tasks = []
if USE_SPLIT_FACE_INDEXES:
if arcface_upserts:
db_tasks.append(asyncio.to_thread(batched_upsert, idx_arcface, arcface_upserts))
if adaface_upserts:
db_tasks.append(asyncio.to_thread(batched_upsert, idx_adaface, adaface_upserts))
else:
if legacy_face_upserts:
db_tasks.append(asyncio.to_thread(batched_upsert, idx_face_legacy, legacy_face_upserts))
if object_upserts:
db_tasks.append(asyncio.to_thread(batched_upsert, idx_obj, object_upserts))
if db_tasks:
await asyncio.gather(*db_tasks)
return {
"uploaded_urls": uploaded_urls,
"arcface_vecs": len(arcface_upserts),
"adaface_vecs": len(adaface_upserts),
"legacy_face_vecs": len(legacy_face_upserts),
"object_vecs": len(object_upserts),
}
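# Example summary returned above (counts depend on USE_SPLIT_FACE_INDEXES; the
# URL is illustrative):
#   {"uploaded_urls": ["https://res.cloudinary.com/<cloud>/.../abc.jpg"],
#    "arcface_vecs": 3, "adaface_vecs": 2, "legacy_face_vecs": 0, "object_vecs": 5}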
# ──────────────────────────────────────────────────────────────
# Upload endpoint
# ──────────────────────────────────────────────────────────────
@router.post("/api/upload")
async def upload_images(
request: Request,
files: List[UploadFile] = File(...),
folder_name: str = Form(...),
detect_faces: bool = Form(True),
user_id: str = Form(""),
async_mode: bool = Query(False, alias="async"),
keys: dict = Depends(get_verified_keys),
):
ip = get_ip(request)
start = time.perf_counter()
if len(files) > MAX_FILES_PER_UPLOAD:
raise HTTPException(400, f"Too many files. Max {MAX_FILES_PER_UPLOAD} per request.")
folder = standardize_category_name(folder_name)
pc = pinecone_pool.get(keys["pinecone_key"])
# Auto-create indexes if missing. Idempotent.
try:
created = await asyncio.to_thread(ensure_indexes, pc)
if created:
log("INFO", "upload.indexes_auto_created",
user_id=user_id or "anonymous", ip=ip, created=created)
            await asyncio.sleep(8)  # give newly created indexes a moment to become ready before the first upserts
except Exception as e:
log("ERROR", "upload.ensure_indexes_failed",
user_id=user_id or "anonymous", ip=ip, error=str(e))
raise HTTPException(500, f"Failed to initialize indexes: {e}")
# ── Async mode: enqueue job, return immediately ──────────────
if async_mode and USE_ASYNC_UPLOADS:
from src.services.jobs import create_job
files_data = []
for f in files:
b = await f.read()
            # bytes -> list of ints, presumably so the job payload stays JSON-serializable
            files_data.append({"bytes": list(b), "filename": f.filename})
job_payload = {
"files_data": files_data,
"folder": folder,
"detect_faces": detect_faces,
"user_id": user_id or "anonymous",
"keys": {
"pinecone_key": keys["pinecone_key"],
"cloudinary_creds": keys["cloudinary_creds"],
},
}
job_id = await create_job(
user_id=user_id or "anonymous",
folder=folder,
total_files=len(files),
job_payload=job_payload,
)
log("INFO", "upload.async_enqueued",
user_id=user_id or "anonymous", ip=ip,
job_id=job_id, files=len(files), folder=folder)
return {
"message": "Upload queued",
"job_id": job_id,
"status_url": f"/api/jobs/{job_id}",
"total_files": len(files),
}
# ── Synchronous mode (default, matches original Phase 2 perf) ─
ai = request.app.state.ai
sem = request.app.state.ai_semaphore
# Read all files in parallel first, THEN fan out to _process_one_file.
# Doing `await f.read()` inside the list-comp would serialize reads.
file_bytes_list = await asyncio.gather(*[f.read() for f in files])
results = await asyncio.gather(*[
_process_one_file(
file_bytes=fb,
folder=folder,
detect_faces=detect_faces,
keys=keys,
ai=ai,
sem=sem,
)
for fb in file_bytes_list
])
summary = await _batch_upsert_all(results=results, folder=folder, pc=pc)
duration_ms = round((time.perf_counter() - start) * 1000)
log(
"INFO", "upload.complete",
user_id=user_id or "anonymous", ip=ip,
files=len(files), folder=folder, duration_ms=duration_ms,
mode="split" if USE_SPLIT_FACE_INDEXES else "legacy",
arcface_vecs=summary["arcface_vecs"],
adaface_vecs=summary["adaface_vecs"],
legacy_face_vecs=summary["legacy_face_vecs"],
object_vecs=summary["object_vecs"],
)
# Log this sync upload to upload_jobs so the table isn't empty.
# Sync uploads bypass the job queue entirely; this fire-and-forget task
# writes a completed row for visibility without changing the upload flow.
asyncio.create_task(
_log_sync_upload(user_id=user_id or "anonymous", folder=folder, summary=summary)
)
    # Auto-trigger clustering once the per-key upload counter crosses the threshold
    # (fire and forget; only uploads that produced ArcFace vectors are counted).
if CLUSTER_AUTO_TRIGGER_EVERY > 0 and summary["arcface_vecs"] > 0:
asyncio.create_task(
_maybe_trigger_clustering(pc, user_id, keys["pinecone_key"])
)
return {
"message": "Done!",
"urls": summary["uploaded_urls"],
"summary": {
"files": len(files),
"face_vectors": summary["arcface_vecs"] or summary["legacy_face_vecs"],
"adaface_vectors": summary["adaface_vecs"],
"object_vectors": summary["object_vecs"],
"index_mode": "split" if USE_SPLIT_FACE_INDEXES else "legacy",
},
}
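# Illustrative client call (a sketch, not executed; the auth header name is
# hypothetical -- the real headers are whatever get_verified_keys expects):
#
#   import httpx
#   resp = httpx.post(
#       "https://<host>/api/upload",
#       params={"async": "false"},
#       headers={"X-Api-Key": "<key>"},          # hypothetical header name
#       files=[("files", open("a.jpg", "rb")), ("files", open("b.jpg", "rb"))],
#       data={"folder_name": "Wedding 2024", "detect_faces": "true", "user_id": "alice"},
#   )
#
# With ?async=true (and USE_ASYNC_UPLOADS enabled) the response is the queued-job
# stub with a status_url instead of the upload summary.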
async def _log_sync_upload(user_id: str, folder: str, summary: dict) -> None:
"""Write a completed row to upload_jobs for sync upload visibility.
    Sync uploads skip the job queue; without this row the table stays empty,
    which makes it impossible to audit what was indexed."""
import json
from src.services.jobs import _supa_insert
row = {
"job_id": uuid.uuid4().hex,
"user_id": user_id,
"folder": folder,
"status": "completed",
"total_files": len(summary["uploaded_urls"]),
"processed_files": len(summary["uploaded_urls"]),
"result": json.dumps({
"face_vectors": summary["arcface_vecs"] or summary["legacy_face_vecs"],
"adaface_vectors": summary["adaface_vecs"],
"object_vectors": summary["object_vecs"],
}),
}
try:
await _supa_insert("upload_jobs", row)
except Exception:
        pass  # best-effort write (e.g. Supabase not configured); never fail the upload over logging
async def _maybe_trigger_clustering(pc, user_id: str, pinecone_key: str) -> None:
"""Background auto-cluster trigger when CLUSTER_AUTO_TRIGGER_EVERY crossed."""
try:
from src.services.cache import cache
from src.services.clustering import run_clustering
import hashlib
uid = hashlib.sha256(pinecone_key.encode()).hexdigest()[:16]
counter_key = f"upload_count:{uid}"
count = await cache.incr(counter_key)
if count >= CLUSTER_AUTO_TRIGGER_EVERY:
await cache.delete(counter_key)
log("INFO", "upload.auto_cluster_triggered",
user_id=user_id or "anonymous", trigger_count=count)
await run_clustering(pc, uid)
except Exception as e:
log("ERROR", "upload.auto_cluster_error", error=str(e))
# ──────────────────────────────────────────────────────────────
# Exported for jobs.py worker — same batched upsert path
# ──────────────────────────────────────────────────────────────
__all__ = ["upload_images", "_process_one_file", "_batch_upsert_all"]