"""Model loading and inference helpers for image, object-crop and face embeddings."""

import asyncio
import base64
import functools
import hashlib
import io
import logging
import threading
import traceback

import cv2
import numpy as np
import torch
import torch.nn.functional as F
from PIL import Image
from transformers import AutoImageProcessor, AutoModel, AutoProcessor
from ultralytics import YOLO
import insightface
from insightface.app import FaceAnalysis

from src.core.config import (
    MAX_IMAGE_SIZE,
    MAX_CROPS,
    YOLO_PERSON_CLASS_ID,
    YOLO_MIN_CROP_PX,
    YOLO_CONF_THRESHOLD,
    DET_SIZE_PRIMARY,
    DET_SCALES,
    IOU_DEDUP_THRESHOLD,
    MIN_FACE_SIZE,
    MAX_FACES_PER_IMAGE,
    FACE_QUALITY_GATE,
    FACE_DIM,
    ADAFACE_DIM,
    FUSED_FACE_DIM,
    FACE_CROP_THUMB_SIZE,
    FACE_CROP_QUALITY,
    FACE_CROP_PADDING,
    ADAFACE_CROP_PADDING,
    INFERENCE_CACHE_SIZE,
    ENABLE_ADAFACE,
    HF_TOKEN,
)

logger = logging.getLogger(__name__)


def _resize_pil(img: Image.Image, max_side: int = MAX_IMAGE_SIZE) -> Image.Image:
    """Downscale *img* so that its longer side is at most *max_side* pixels.

    The aspect ratio is preserved. Images that already fit are returned
    unchanged (the same object, not a copy).
    """
    w, h = img.size
    longest = max(w, h)
    if longest <= max_side:
        return img
    scale = max_side / longest
    # Clamp each side to 1 px: for very elongated images int() can round the
    # short side down to 0, which PIL's resize() rejects.
    new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
    return img.resize(new_size, Image.LANCZOS)


def _padded_crop(
    img_bgr: np.ndarray, x1: int, y1: int, x2: int, y2: int, padding: float
) -> np.ndarray:
    """Return the box ``(x1, y1, x2, y2)`` cut out of *img_bgr* with relative padding.

    The box is grown by ``padding`` times its width/height on each side and
    clipped to the image bounds. The result is a view into *img_bgr* and may
    be empty.
    """
    img_h, img_w = img_bgr.shape[:2]
    pad_x = int((x2 - x1) * padding)
    pad_y = int((y2 - y1) * padding)
    left, top = max(0, x1 - pad_x), max(0, y1 - pad_y)
    right, bottom = min(img_w, x2 + pad_x), min(img_h, y2 + pad_y)
    return img_bgr[top:bottom, left:right]


def _crop_to_b64(img_bgr: np.ndarray, x1: int, y1: int, x2: int, y2: int) -> str:
    """Encode a padded crop of *img_bgr* as a base64 JPEG thumbnail.

    The crop is padded by ``FACE_CROP_PADDING``, resized to a square of
    ``FACE_CROP_THUMB_SIZE`` pixels and saved with ``FACE_CROP_QUALITY``.

    Returns:
        The base64 text of the JPEG, or ``""`` when the crop is empty.
    """
    crop = _padded_crop(img_bgr, x1, y1, x2, y2, FACE_CROP_PADDING)
    if crop.size == 0:
        return ""
    # Reverse the channel axis (BGR -> RGB) before handing the data to PIL.
    thumb = Image.fromarray(crop[:, :, ::-1]).resize(
        (FACE_CROP_THUMB_SIZE, FACE_CROP_THUMB_SIZE), Image.LANCZOS
    )
    buf = io.BytesIO()
    thumb.save(buf, format="JPEG", quality=FACE_CROP_QUALITY)
    return base64.b64encode(buf.getvalue()).decode()


def _face_crop_for_adaface(
    img_bgr: np.ndarray, x1: int, y1: int, x2: int, y2: int
) -> np.ndarray | None:
    """Build the normalized 112x112 input array for the AdaFace model.

    The crop is padded by ``ADAFACE_CROP_PADDING``, its channel axis is
    reversed, it is resized to 112x112 and scaled from ``[0, 255]`` to
    ``[-1, 1]``.

    Returns:
        A float32 array in channel-first ``(3, 112, 112)`` layout, or ``None``
        when the crop is empty.
    """
    crop = _padded_crop(img_bgr, x1, y1, x2, y2, ADAFACE_CROP_PADDING)
    if crop.size == 0:
        return None
    rgb = crop[:, :, ::-1].copy()
    resized = Image.fromarray(rgb).resize((112, 112), Image.LANCZOS)
    arr = np.array(resized, dtype=np.float32) / 255.0
    arr = (arr - 0.5) / 0.5
    return arr.transpose(2, 0, 1)


def _clahe_enhance(bgr: np.ndarray) -> np.ndarray:
    """Apply CLAHE to the lightness channel of a BGR image and return BGR."""
    lab = cv2.cvtColor(bgr, cv2.COLOR_BGR2LAB)
    l_ch, a_ch, b_ch = cv2.split(lab)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    l_eq = clahe.apply(l_ch)
    return cv2.cvtColor(cv2.merge([l_eq, a_ch, b_ch]), cv2.COLOR_LAB2BGR)


def _iou(box_a: list, box_b: list) -> float:
    """Return the intersection-over-union of two ``[x1, y1, x2, y2]`` boxes."""
    xa, ya = max(box_a[0], box_b[0]), max(box_a[1], box_b[1])
    xb, yb = min(box_a[2], box_b[2]), min(box_a[3], box_b[3])
    inter = max(0, xb - xa) * max(0, yb - ya)
    if inter == 0:
        return 0.0
    area_a = (box_a[2] - box_a[0]) * (box_a[3] - box_a[1])
    area_b = (box_b[2] - box_b[0]) * (box_b[3] - box_b[1])
    union = area_a + area_b - inter
    # Defensive: malformed (inverted) boxes must not cause a division by zero.
    if union <= 0:
        return 0.0
    return inter / union


def _dedup_faces(faces_list: list, iou_thresh: float = IOU_DEDUP_THRESHOLD) -> list:
    """Drop overlapping detections, keeping the highest-scoring one of each group.

    Faces are visited in descending ``det_score`` order; a face is kept only
    if its integer bbox has IoU <= *iou_thresh* with every face kept so far.

    Args:
        faces_list: Detection objects exposing ``bbox`` (array-like with at
            least four values) and ``det_score``.
        iou_thresh: IoU above which two boxes count as the same face.

    Returns:
        The kept faces, ordered by descending ``det_score``.
    """
    if not faces_list:
        return []
    ranked = sorted(faces_list, key=lambda f: float(f.det_score), reverse=True)
    kept: list = []
    kept_boxes: list[list[int]] = []  # integer boxes of `kept`, converted once
    for face in ranked:
        box = face.bbox.astype(int)[:4].tolist()
        if any(_iou(box, other) > iou_thresh for other in kept_boxes):
            continue
        kept.append(face)
        kept_boxes.append(box)
    return kept


class AIModelManager:
    """Owns the vision models and turns raw image bytes into embedding records.

    Loaded on construction: SigLIP and DINOv2 (image/crop embeddings), a YOLO
    segmentation model (object crops), an insightface ``FaceAnalysis`` app
    (face detection + embedding) and, when ``ENABLE_ADAFACE`` is set, an
    AdaFace model whose embedding is concatenated to the face vector.
    """

    def __init__(self):
        """Load every model onto CUDA when available, otherwise onto the CPU."""
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        on_cuda = self.device == "cuda"

        # Guards `face_app` (its detector's input size is mutated per call).
        self._face_lock = threading.Lock()
        # Guards `_cache`, the in-memory result cache of process_image_bytes().
        self._cache_lock = threading.Lock()
        self._cache: dict[str, list] = {}

        self.siglip_processor = AutoProcessor.from_pretrained(
            "google/siglip-base-patch16-224", use_fast=True
        )
        self.siglip_model = (
            AutoModel.from_pretrained("google/siglip-base-patch16-224").to(self.device).eval()
        )
        self.dinov2_processor = AutoImageProcessor.from_pretrained("facebook/dinov2-base")
        self.dinov2_model = AutoModel.from_pretrained("facebook/dinov2-base").to(self.device).eval()
        if on_cuda:
            self.siglip_model = self.siglip_model.half()
            self.dinov2_model = self.dinov2_model.half()

        self.yolo = YOLO("yolo11n-seg.pt")

        self.face_app = FaceAnalysis(
            name="buffalo_l",
            providers=["CUDAExecutionProvider", "CPUExecutionProvider"]
            if on_cuda
            else ["CPUExecutionProvider"],
        )
        self.face_app.prepare(ctx_id=0 if on_cuda else -1, det_size=DET_SIZE_PRIMARY)
        # One call on a blank image; presumably a warm-up so the first real
        # request does not pay session start-up cost.
        self.face_app.get(np.zeros((112, 112, 3), dtype=np.uint8))

        self.adaface_model = None
        self._load_adaface()

    def _load_adaface(self) -> None:
        """Download (if needed) and load the AdaFace model into ``self.adaface_model``.

        Does nothing when ``ENABLE_ADAFACE`` is false. Any failure is logged
        and leaves ``self.adaface_model`` as ``None`` so that face encoding
        degrades to the detector embedding only.
        """
        if not ENABLE_ADAFACE:
            return
        import os
        import sys

        repo_id = "minchul/cvlface_adaface_ir50_ms1mv2"
        cache_path = os.path.expanduser("~/.cvlface_cache/minchul/cvlface_adaface_ir50_ms1mv2")
        try:
            from huggingface_hub import hf_hub_download
            from transformers import AutoModel as _HFAutoModel

            os.makedirs(cache_path, exist_ok=True)
            hf_hub_download(
                repo_id=repo_id,
                filename="files.txt",
                token=HF_TOKEN,
                local_dir=cache_path,
                local_dir_use_symlinks=False,
            )
            # files.txt lists the additional repository files, one per line.
            with open(os.path.join(cache_path, "files.txt")) as f:
                extra = [x.strip() for x in f.read().split("\n") if x.strip()]
            for fname in extra + ["config.json", "wrapper.py", "model.safetensors"]:
                if not os.path.exists(os.path.join(cache_path, fname)):
                    hf_hub_download(
                        repo_id=repo_id,
                        filename=fname,
                        token=HF_TOKEN,
                        local_dir=cache_path,
                        local_dir_use_symlinks=False,
                    )

            # NOTE(review): presumably the repo's remote code resolves its
            # sibling files relative to the cwd / import path — confirm.
            # Both process-wide changes are undone in `finally`.
            cwd = os.getcwd()
            os.chdir(cache_path)
            sys.path.insert(0, cache_path)
            try:
                model = _HFAutoModel.from_pretrained(
                    cache_path, trust_remote_code=True, token=HF_TOKEN
                )
            finally:
                os.chdir(cwd)
                if cache_path in sys.path:
                    sys.path.remove(cache_path)
            self.adaface_model = model.to(self.device).eval()
        except Exception:
            # Best effort: AdaFace is optional, so log and continue without it.
            logger.warning(
                "AdaFace model could not be loaded; continuing without it", exc_info=True
            )
            self.adaface_model = None

    def _adaface_embed(self, face_arr_chw: np.ndarray | None) -> np.ndarray | None:
        """Return the L2-normalized AdaFace embedding for one face crop.

        Args:
            face_arr_chw: Output of ``_face_crop_for_adaface`` or ``None``.

        Returns:
            A 1-D float32 array, or ``None`` when the model is not loaded, the
            crop is missing, or inference fails (the failure is logged).
        """
        if self.adaface_model is None or face_arr_chw is None:
            return None
        try:
            # Match the model's own dtype. The model is loaded without
            # `.half()`, so unconditionally casting the input to half on CUDA
            # raises a dtype mismatch and silently disables AdaFace.
            first_param = next(self.adaface_model.parameters(), None)
            dtype = first_param.dtype if first_param is not None else torch.float32
            t = torch.from_numpy(face_arr_chw).unsqueeze(0).to(device=self.device, dtype=dtype)
            with torch.no_grad():
                out = self.adaface_model(t)
            emb = out if isinstance(out, torch.Tensor) else out.embedding
            return F.normalize(emb.float(), p=2, dim=1)[0].cpu().numpy()
        except Exception:
            logger.warning("AdaFace embedding failed; face will use zero padding", exc_info=True)
            return None

    def _to_model_inputs(self, batch) -> dict:
        """Move processor outputs to ``self.device``; on CUDA cast float32 to half."""
        moved = {k: v.to(self.device) for k, v in batch.items()}
        if self.device == "cuda":
            moved = {k: v.half() if v.dtype == torch.float32 else v for k, v in moved.items()}
        return moved

    def _embed_crops_batch(self, crops: list[Image.Image]) -> list[np.ndarray]:
        """Embed each image with SigLIP and DINOv2 and fuse the two vectors.

        Each model's vector is L2-normalized, the two are concatenated, and
        the concatenation is L2-normalized again.

        Returns:
            One 1-D array per input image, in input order (empty list for
            empty input).
        """
        if not crops:
            return []
        with torch.no_grad():
            sig_in = self._to_model_inputs(
                self.siglip_processor(images=crops, return_tensors="pt", padding=True)
            )
            sig_out = self.siglip_model.get_image_features(**sig_in)
            # get_image_features may return a bare tensor or an output
            # object/tuple; unwrap whichever form is present.
            if hasattr(sig_out, "image_embeds"):
                sig_out = sig_out.image_embeds
            elif hasattr(sig_out, "pooler_output"):
                sig_out = sig_out.pooler_output
            elif hasattr(sig_out, "last_hidden_state"):
                sig_out = sig_out.last_hidden_state[:, 0, :]
            elif isinstance(sig_out, tuple):
                sig_out = sig_out[0]
            sig_vecs = F.normalize(sig_out.float(), p=2, dim=1).cpu()

            dino_in = self._to_model_inputs(
                self.dinov2_processor(images=crops, return_tensors="pt")
            )
            dino_out = self.dinov2_model(**dino_in)
            # First token of the last hidden state is used as the image vector.
            dino_vecs = F.normalize(
                dino_out.last_hidden_state[:, 0, :].float(), p=2, dim=1
            ).cpu()

            fused = F.normalize(torch.cat([sig_vecs, dino_vecs], dim=1), p=2, dim=1)
        return [row.numpy() for row in fused]

    def _run_face_detector(self, bgr: np.ndarray, input_size) -> list:
        """Run ``face_app`` on *bgr* with the detector set to *input_size*.

        Setting the size, detecting and restoring ``DET_SIZE_PRIMARY`` all
        happen while holding ``_face_lock``, so concurrent callers cannot
        observe each other's detector size.
        """
        with self._face_lock:
            detector = self.face_app.det_model
            detector.input_size = input_size
            try:
                return self.face_app.get(bgr)
            finally:
                detector.input_size = DET_SIZE_PRIMARY

    def _collect_raw_faces(self, bgr: np.ndarray) -> list:
        """Detect faces on *bgr* at every ``DET_SCALES`` entry and on a mirrored copy.

        All returned bboxes are expressed in the coordinates of *bgr*. A pass
        that fails is logged and skipped; the remaining passes still run.
        """
        img_h, img_w = bgr.shape[:2]
        raw_faces: list = []

        for scale in DET_SCALES:
            scale_w, scale_h = min(img_w, scale[0]), min(img_h, scale[1])
            if scale_w == img_w and scale_h == img_h:
                scaled = bgr
            else:
                scaled = cv2.resize(bgr, (scale_w, scale_h))
            try:
                faces = self._run_face_detector(scaled, scale)
                sx, sy = img_w / scale_w, img_h / scale_h
                if sx != 1.0 or sy != 1.0:
                    # Map boxes from the resized image back to full resolution.
                    for f in faces:
                        f.bbox[0] *= sx
                        f.bbox[1] *= sy
                        f.bbox[2] *= sx
                        f.bbox[3] *= sy
                raw_faces.extend(faces)
            except Exception:
                logger.warning("Face detection failed at scale %s", scale, exc_info=True)

        try:
            faces_flip = self._run_face_detector(cv2.flip(bgr, 1), DET_SIZE_PRIMARY)
            for f in faces_flip:
                x1, _, x2, _ = f.bbox
                # Mirror the x-extent back into un-flipped coordinates.
                f.bbox[0], f.bbox[2] = img_w - x2, img_w - x1
            raw_faces.extend(faces_flip)
        except Exception:
            logger.warning("Face detection failed on the mirrored image", exc_info=True)

        return raw_faces

    def _detect_and_encode_faces(self, img_np: np.ndarray) -> list[dict]:
        """Detect faces in an image array and build one record per accepted face.

        Args:
            img_np: ``(H, W, C)`` image array. Three-channel input has its
                channel axis reversed (the caller in this file passes RGB);
                non-uint8 input is multiplied by 255 and cast to uint8.

        Returns:
            Up to ``MAX_FACES_PER_IMAGE`` dicts with keys ``type``, ``vector``,
            ``face_idx``, ``bbox`` (``[x, y, w, h]``), ``face_crop``,
            ``det_score`` and ``face_width_px``. Returns ``[]`` when no face
            app is available or when processing fails (the failure is logged).
        """
        if self.face_app is None:
            return []
        try:
            if img_np.dtype != np.uint8:
                img_np = (img_np * 255).astype(np.uint8)
            bgr = img_np[:, :, ::-1].copy() if img_np.shape[2] == 3 else img_np.copy()

            # Detection runs on the contrast-enhanced image; crops and the
            # AdaFace input below are taken from the un-enhanced `bgr`.
            faces = _dedup_faces(self._collect_raw_faces(_clahe_enhance(bgr)))

            results: list[dict] = []
            for face in faces:
                if len(results) >= MAX_FACES_PER_IMAGE:
                    break
                x1, y1, x2, y2 = face.bbox.astype(int)
                x1, y1 = max(0, x1), max(0, y1)
                x2, y2 = min(bgr.shape[1], x2), min(bgr.shape[0], y2)
                w, h = x2 - x1, y2 - y1
                if w < MIN_FACE_SIZE or h < MIN_FACE_SIZE:
                    continue
                det_score = float(face.det_score) if hasattr(face, "det_score") else 1.0
                if det_score < FACE_QUALITY_GATE or face.embedding is None:
                    continue

                arcface_vec = face.embedding.astype(np.float32)
                norm = np.linalg.norm(arcface_vec)
                if norm > 0:
                    arcface_vec = arcface_vec / norm

                adaface_vec = self._adaface_embed(_face_crop_for_adaface(bgr, x1, y1, x2, y2))
                if adaface_vec is None:
                    # Keep the vector length fixed when AdaFace is unavailable.
                    adaface_vec = np.zeros(ADAFACE_DIM, dtype=np.float32)
                fused_raw = np.concatenate([arcface_vec, adaface_vec])
                fused_norm = np.linalg.norm(fused_raw)
                final_vec = (fused_raw / fused_norm) if fused_norm > 0 else fused_raw

                results.append({
                    "type": "face",
                    "vector": final_vec,
                    "face_idx": len(results),
                    "bbox": [int(x1), int(y1), int(w), int(h)],
                    "face_crop": _crop_to_b64(bgr, x1, y1, x2, y2),
                    "det_score": det_score,
                    "face_width_px": int(w),
                })
            return results
        except Exception:
            # Best effort: a face-pipeline failure must not fail the request.
            logger.exception("Face detection/encoding failed; returning no faces")
            return []

    def _collect_object_crops(self, image: Image.Image, skip_persons: bool) -> list[Image.Image]:
        """Run YOLO on *image* and return at most ``MAX_CROPS`` object crops.

        Segmentation masks are preferred (crop = bounding rectangle of the
        mask polygon); plain boxes are used when a result has no masks.
        Detections smaller than ``YOLO_MIN_CROP_PX`` on either side are
        dropped, as are person-class detections when *skip_persons* is true.
        """
        yolo = getattr(self, "yolo", None)
        if yolo is None or MAX_CROPS <= 0:
            return []

        crops: list[Image.Image] = []
        for r in yolo(image, conf=YOLO_CONF_THRESHOLD, verbose=False):
            if r.masks is not None:
                for seg_idx, mask_xy in enumerate(r.masks.xy):
                    cls_id = int(r.boxes.cls[seg_idx].item())
                    if skip_persons and cls_id == YOLO_PERSON_CLASS_ID:
                        continue
                    polygon = np.array(mask_xy, dtype=np.int32)
                    if len(polygon) < 3:
                        continue
                    x, y, w, h = cv2.boundingRect(polygon)
                    if w < YOLO_MIN_CROP_PX or h < YOLO_MIN_CROP_PX:
                        continue
                    crops.append(image.crop((x, y, x + w, y + h)))
                    if len(crops) >= MAX_CROPS:
                        return crops
            elif r.boxes is not None:
                for box in r.boxes:
                    cls_id = int(box.cls.item())
                    if skip_persons and cls_id == YOLO_PERSON_CLASS_ID:
                        continue
                    x1, y1, x2, y2 = box.xyxy[0].tolist()
                    if (x2 - x1) < YOLO_MIN_CROP_PX or (y2 - y1) < YOLO_MIN_CROP_PX:
                        continue
                    crops.append(image.crop((x1, y1, x2, y2)))
                    if len(crops) >= MAX_CROPS:
                        return crops
        return crops

    def process_image_bytes(self, image_bytes: bytes, detect_faces: bool = True) -> list[dict]:
        """Decode an image and return its face and object embedding records.

        Results are cached in memory, keyed by a digest of the *entire*
        payload plus *detect_faces*. The cache holds at most
        ``INFERENCE_CACHE_SIZE`` entries and evicts the oldest inserted entry
        first; it is disabled when that limit is not positive.

        Args:
            image_bytes: Encoded image data readable by PIL.
            detect_faces: Whether to run the face pipeline.

        Returns:
            Face records (see ``_detect_and_encode_faces``) followed by
            ``{"type": "object", "vector": ...}`` records: the first for the
            whole image, the rest for YOLO crops. Person-class crops are
            skipped when at least one face was found.

        Raises:
            Whatever PIL raises when *image_bytes* cannot be decoded.
        """
        # Hash the whole payload: hashing only a prefix makes distinct images
        # that share their leading bytes collide and return wrong results.
        digest = hashlib.sha256(image_bytes).hexdigest()
        cache_key = f"{digest}_{detect_faces}"
        with self._cache_lock:
            cached = self._cache.get(cache_key)
        if cached is not None:
            # Copy the record dicts so callers cannot mutate cached entries.
            return [dict(item) for item in cached]

        extracted: list[dict] = []
        original_pil = Image.open(io.BytesIO(image_bytes)).convert("RGB")

        faces_found = False
        if detect_faces and getattr(self, "face_app", None) is not None:
            face_results = self._detect_and_encode_faces(np.array(original_pil))
            if face_results:
                faces_found = True
                extracted.extend(face_results)

        crops = self._collect_object_crops(original_pil, skip_persons=faces_found)
        all_crops = [_resize_pil(c, MAX_IMAGE_SIZE) for c in [original_pil] + crops]
        obj_vecs = self._embed_crops_batch(all_crops)
        extracted.extend({"type": "object", "vector": v} for v in obj_vecs)

        if INFERENCE_CACHE_SIZE > 0:
            with self._cache_lock:
                if cache_key not in self._cache:
                    # dicts iterate in insertion order, so the first key is
                    # the oldest entry.
                    while len(self._cache) >= INFERENCE_CACHE_SIZE:
                        del self._cache[next(iter(self._cache))]
                self._cache[cache_key] = [dict(item) for item in extracted]
        return extracted

    async def process_image_bytes_async(
        self, image_bytes: bytes, detect_faces: bool = True
    ) -> list[dict]:
        """Run ``process_image_bytes`` in the event loop's default executor."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            functools.partial(self.process_image_bytes, image_bytes, detect_faces),
        )