import asyncio
import io
import time
import uuid
from typing import List

from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, Request, UploadFile

from src.core.config import (
    IDX_FACES, IDX_OBJECTS,
    IDX_FACES_ARCFACE, IDX_FACES_ADAFACE,
    MAX_FILES_PER_UPLOAD, USE_SPLIT_FACE_INDEXES,
    USE_ASYNC_UPLOADS, CLUSTER_AUTO_TRIGGER_EVERY,
)
from src.core.security import get_verified_keys
from src.services.db_client import cld_upload, pinecone_pool, ensure_indexes
from src.core.logging import log
from src.common.utils import get_ip, standardize_category_name, to_list

router = APIRouter()


def chunker(seq, size):
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))
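# Example: list(chunker([1, 2, 3, 4, 5], 2)) -> [[1, 2], [3, 4], [5]]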

# ──────────────────────────────────────────────────────────────
# Per-file processor — Cloudinary upload + AI inference only.
# Vectors are RETURNED, not upserted here. The caller batches all
# files' vectors into single Pinecone upserts (same as Phase 2).
# ──────────────────────────────────────────────────────────────
async def _process_one_file(
    *,
    file_bytes: bytes,
    folder: str,
    detect_faces: bool,
    keys: dict,
    ai,
    sem,
) -> tuple[str, str, list]:
    """Returns (file_id, image_url, vectors). Mirrors the Phase 2 signature."""
    file_id = uuid.uuid4().hex

    async def _run_ai():
        # Gate inference on the shared semaphore to cap concurrent AI work.
        async with sem:
            return await ai.process_image_bytes_async(file_bytes, detect_faces=detect_faces)

    # The blocking Cloudinary upload runs in a worker thread while AI
    # inference runs on the event loop, so the two overlap per file.
    cld_task = asyncio.to_thread(
        cld_upload, io.BytesIO(file_bytes), folder, keys["cloudinary_creds"]
    )
    ai_task = _run_ai()
    cld_res, vectors = await asyncio.gather(cld_task, ai_task)
    return file_id, cld_res["secure_url"], vectors
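
# For reference, the shape of each item in `vectors`, inferred from the
# fields _batch_upsert_all reads below (the actual producer is the AI
# service, which is not shown in this module):
#   face:   {"type": "face", "arcface_vector": ..., "has_adaface": bool,
#            "adaface_vector": ..., "vector": ...,  # "vector" in legacy mode
#            "face_crop": str, "det_score": float,
#            "face_width_px": int, "blur_score": float}
#   object: {"type": "object", "vector": ...}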

# ──────────────────────────────────────────────────────────────
# Shared batch-upsert logic — used by sync upload AND job worker
# ──────────────────────────────────────────────────────────────
async def _batch_upsert_all(
    *, results: list, folder: str, pc,
) -> dict:
    """
    Takes [(file_id, url, vectors), ...] from all files, groups the vectors
    by target index, and upserts them in chunked batches — a handful of
    Pinecone calls per index rather than one call per file.
    """
    arcface_upserts = []
    adaface_upserts = []
    legacy_face_upserts = []
    object_upserts = []
    uploaded_urls = []

    for file_id, image_url, vectors in results:
        uploaded_urls.append(image_url)
        for i, v in enumerate(vectors):
            vector_id = f"{file_id}_{i}"
            if v["type"] == "face":
                meta_common = {
                    "url": image_url,
                    "folder": folder,
                    "face_crop": v.get("face_crop", ""),
                    "det_score": float(v.get("det_score", 1.0)),
                    "face_width_px": int(v.get("face_width_px", 0)),
                    "blur_score": float(v.get("blur_score", 100.0)),
                }
                if USE_SPLIT_FACE_INDEXES:
                    arcface_upserts.append({
                        "id": vector_id,
                        "values": to_list(v["arcface_vector"]),
                        "metadata": meta_common,
                    })
                    if v.get("has_adaface"):
                        adaface_upserts.append({
                            "id": vector_id,
                            "values": to_list(v["adaface_vector"]),
                            "metadata": meta_common,
                        })
                else:
                    legacy_face_upserts.append({
                        "id": vector_id,
                        "values": to_list(v["vector"]),
                        "metadata": meta_common,
                    })
            else:
                object_upserts.append({
                    "id": vector_id,
                    "values": to_list(v["vector"]),
                    "metadata": {"url": image_url, "folder": folder},
                })

    idx_obj = pc.Index(IDX_OBJECTS)
    if USE_SPLIT_FACE_INDEXES:
        idx_arcface = pc.Index(IDX_FACES_ARCFACE)
        idx_adaface = pc.Index(IDX_FACES_ADAFACE)
    else:
        idx_face_legacy = pc.Index(IDX_FACES)

    def batched_upsert(index, vectors):
        # Chunked so each upsert request stays within Pinecone's payload limits.
        for batch in chunker(vectors, 200):
            index.upsert(vectors=batch)

    db_tasks = []
    if USE_SPLIT_FACE_INDEXES:
        if arcface_upserts:
            db_tasks.append(asyncio.to_thread(batched_upsert, idx_arcface, arcface_upserts))
        if adaface_upserts:
            db_tasks.append(asyncio.to_thread(batched_upsert, idx_adaface, adaface_upserts))
    else:
        if legacy_face_upserts:
            db_tasks.append(asyncio.to_thread(batched_upsert, idx_face_legacy, legacy_face_upserts))
    if object_upserts:
        db_tasks.append(asyncio.to_thread(batched_upsert, idx_obj, object_upserts))
    if db_tasks:
        await asyncio.gather(*db_tasks)

    return {
        "uploaded_urls": uploaded_urls,
        "arcface_vecs": len(arcface_upserts),
        "adaface_vecs": len(adaface_upserts),
        "legacy_face_vecs": len(legacy_face_upserts),
        "object_vecs": len(object_upserts),
    }
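
# A minimal sketch of how the jobs.py worker can reuse the two helpers above
# (illustrative only — the worker's actual loop, and the names `job`, `ai`,
# `sem`, and `pc` below, are assumptions; they are not defined in this module):
#
#   results = []
#   for f in job["files_data"]:
#       results.append(await _process_one_file(
#           file_bytes=bytes(f["bytes"]), folder=job["folder"],
#           detect_faces=job["detect_faces"], keys=job["keys"], ai=ai, sem=sem,
#       ))
#   summary = await _batch_upsert_all(results=results, folder=job["folder"], pc=pc)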

# ──────────────────────────────────────────────────────────────
# Upload endpoint
# ──────────────────────────────────────────────────────────────
@router.post("/upload")  # NOTE: the route path "/upload" is an assumption
async def upload_images(
    request: Request,
    files: List[UploadFile] = File(...),
    folder_name: str = Form(...),
    detect_faces: bool = Form(True),
    user_id: str = Form(""),
    async_mode: bool = Query(False, alias="async"),
    keys: dict = Depends(get_verified_keys),
):
    ip = get_ip(request)
    start = time.perf_counter()

    if len(files) > MAX_FILES_PER_UPLOAD:
        raise HTTPException(400, f"Too many files. Max {MAX_FILES_PER_UPLOAD} per request.")

    folder = standardize_category_name(folder_name)
    pc = pinecone_pool.get(keys["pinecone_key"])

    # Auto-create indexes if missing. Idempotent.
    try:
        created = await asyncio.to_thread(ensure_indexes, pc)
        if created:
            log("INFO", "upload.indexes_auto_created",
                user_id=user_id or "anonymous", ip=ip, created=created)
            # Grace period so freshly created indexes are ready for upserts.
            await asyncio.sleep(8)
    except Exception as e:
        log("ERROR", "upload.ensure_indexes_failed",
            user_id=user_id or "anonymous", ip=ip, error=str(e))
        raise HTTPException(500, f"Failed to initialize indexes: {e}")
    # ── Async mode: enqueue a job, return immediately ────────────
    if async_mode and USE_ASYNC_UPLOADS:
        from src.services.jobs import create_job

        files_data = []
        for f in files:
            b = await f.read()
            # bytes -> list[int] so the job payload stays JSON-serializable.
            files_data.append({"bytes": list(b), "filename": f.filename})
        job_payload = {
            "files_data": files_data,
            "folder": folder,
            "detect_faces": detect_faces,
            "user_id": user_id or "anonymous",
            "keys": {
                "pinecone_key": keys["pinecone_key"],
                "cloudinary_creds": keys["cloudinary_creds"],
            },
        }
        job_id = await create_job(
            user_id=user_id or "anonymous",
            folder=folder,
            total_files=len(files),
            job_payload=job_payload,
        )
        log("INFO", "upload.async_enqueued",
            user_id=user_id or "anonymous", ip=ip,
            job_id=job_id, files=len(files), folder=folder)
        return {
            "message": "Upload queued",
            "job_id": job_id,
            "status_url": f"/api/jobs/{job_id}",
            "total_files": len(files),
        }
    # ── Synchronous mode (default; matches original Phase 2 performance) ─
    ai = request.app.state.ai
    sem = request.app.state.ai_semaphore

    # Read all files concurrently first, THEN fan out to _process_one_file.
    # Awaiting `f.read()` inside the comprehension would serialize the reads.
    file_bytes_list = await asyncio.gather(*[f.read() for f in files])
    results = await asyncio.gather(*[
        _process_one_file(
            file_bytes=fb,
            folder=folder,
            detect_faces=detect_faces,
            keys=keys,
            ai=ai,
            sem=sem,
        )
        for fb in file_bytes_list
    ])
    summary = await _batch_upsert_all(results=results, folder=folder, pc=pc)

    duration_ms = round((time.perf_counter() - start) * 1000)
    log(
        "INFO", "upload.complete",
        user_id=user_id or "anonymous", ip=ip,
        files=len(files), folder=folder, duration_ms=duration_ms,
        mode="split" if USE_SPLIT_FACE_INDEXES else "legacy",
        arcface_vecs=summary["arcface_vecs"],
        adaface_vecs=summary["adaface_vecs"],
        legacy_face_vecs=summary["legacy_face_vecs"],
        object_vecs=summary["object_vecs"],
    )

    # Log this sync upload to upload_jobs so the table isn't empty.
    # Sync uploads bypass the job queue entirely; this fire-and-forget task
    # writes a completed row for visibility without changing the upload flow.
    asyncio.create_task(
        _log_sync_upload(user_id=user_id or "anonymous", folder=folder, summary=summary)
    )

    # Auto-trigger clustering if the upload-count threshold was crossed
    # (fire and forget).
    if CLUSTER_AUTO_TRIGGER_EVERY > 0 and summary["arcface_vecs"] > 0:
        asyncio.create_task(
            _maybe_trigger_clustering(pc, user_id, keys["pinecone_key"])
        )

    return {
        "message": "Done!",
        "urls": summary["uploaded_urls"],
        "summary": {
            "files": len(files),
            "face_vectors": summary["arcface_vecs"] or summary["legacy_face_vecs"],
            "adaface_vectors": summary["adaface_vecs"],
            "object_vectors": summary["object_vecs"],
            "index_mode": "split" if USE_SPLIT_FACE_INDEXES else "legacy",
        },
    }
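
# Example request against this endpoint — a hedged sketch: it assumes the
# route path "/upload" guessed above and that the router is mounted under
# /api (the status_url "/api/jobs/{job_id}" hints at that prefix); whatever
# credentials get_verified_keys expects must be supplied as well:
#
#   curl -X POST "http://localhost:8000/api/upload?async=true" \
#        -F "files=@one.jpg" -F "files=@two.jpg" \
#        -F "folder_name=Summer Trip" \
#        -F "detect_faces=true"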

async def _log_sync_upload(user_id: str, folder: str, summary: dict) -> None:
    """Write a completed row to upload_jobs for sync-upload visibility.

    Sync uploads skip the job queue; without this row the table stays empty,
    making it impossible to audit what was indexed."""
    import json
    from src.services.jobs import _supa_insert

    row = {
        "job_id": uuid.uuid4().hex,
        "user_id": user_id,
        "folder": folder,
        "status": "completed",
        "total_files": len(summary["uploaded_urls"]),
        "processed_files": len(summary["uploaded_urls"]),
        "result": json.dumps({
            "face_vectors": summary["arcface_vecs"] or summary["legacy_face_vecs"],
            "adaface_vectors": summary["adaface_vecs"],
            "object_vectors": summary["object_vecs"],
        }),
    }
    try:
        await _supa_insert("upload_jobs", row)
    except Exception:
        pass  # Supabase not configured — silently skip; don't crash the upload.

async def _maybe_trigger_clustering(pc, user_id: str, pinecone_key: str) -> None:
    """Background auto-cluster trigger, fired once the per-user upload count
    crosses CLUSTER_AUTO_TRIGGER_EVERY."""
    try:
        from src.services.cache import cache
        from src.services.clustering import run_clustering
        import hashlib

        # Derive a stable, anonymous per-user id from the Pinecone key.
        uid = hashlib.sha256(pinecone_key.encode()).hexdigest()[:16]
        counter_key = f"upload_count:{uid}"
        count = await cache.incr(counter_key)
        if count >= CLUSTER_AUTO_TRIGGER_EVERY:
            await cache.delete(counter_key)
            log("INFO", "upload.auto_cluster_triggered",
                user_id=user_id or "anonymous", trigger_count=count)
            await run_clustering(pc, uid)
    except Exception as e:
        log("ERROR", "upload.auto_cluster_error", error=str(e))


# ──────────────────────────────────────────────────────────────
# Exported for the jobs.py worker — same batched-upsert path
# ──────────────────────────────────────────────────────────────
__all__ = ["upload_images", "_process_one_file", "_batch_upsert_all"]