# NOTE(review): the three lines below were a HuggingFace "Spaces: Running"
# status banner captured by the page scrape — not part of the module source.
# Converted to a comment so the file remains valid Python.
| from __future__ import annotations | |
| import os | |
| from pathlib import Path | |
| from threading import Lock | |
| from typing import Optional, Tuple | |
| from loguru import logger | |
| from config import settings | |
class ModelLoader:
    """Singleton holder for preloaded AI models. Thread-safe lazy init.

    Every ``load_*`` method is lazy: the heavy import and model download
    happen on first call and the result is cached on the singleton.
    Optional detectors that fail to load flip a ``*_unavailable`` flag so
    subsequent calls return ``None`` immediately instead of retrying.
    """

    _instance: Optional["ModelLoader"] = None
    _lock: Lock = Lock()

    def __new__(cls) -> "ModelLoader":
        # Double-checked locking: cheap unlocked test first, then the real
        # check under the lock so only one thread constructs the instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    # BUGFIX: initialize every model slot on a local BEFORE
                    # publishing to cls._instance — previously the instance was
                    # published first, so a racing thread could observe a
                    # partially-initialized singleton via the unlocked check.
                    instance._image_model = None
                    instance._image_processor = None
                    instance._general_image_model = None
                    instance._general_image_processor = None
                    instance._general_image_unavailable = False
                    instance._diffusion_image_model = None
                    instance._diffusion_image_processor = None
                    instance._diffusion_image_unavailable = False
                    instance._text_pipeline = None
                    instance._multilang_text_pipeline = None
                    instance._ocr_reader = None
                    instance._face_detector = None
                    instance._face_detector_unavailable = False
                    instance._spacy_nlp = None
                    instance._sentence_transformer = None
                    instance._efficientnet_detector = None
                    instance._ffpp_model = None
                    instance._ffpp_processor = None
                    cls._instance = instance
        return cls._instance

    @classmethod
    def get_instance(cls) -> "ModelLoader":
        """Return the process-wide singleton, creating it on first call.

        BUGFIX: was declared without ``@classmethod`` although it took
        ``cls``, so ``ModelLoader.get_instance()`` raised TypeError
        (missing positional argument).
        """
        return cls()

    # ---------- Image (ViT deepfake classifier) ----------
    def load_image_model(self) -> Tuple[object, object]:
        """Lazy-load the primary ViT deepfake classifier.

        Returns:
            (model, processor) tuple; model is moved to settings.DEVICE
            and put in eval mode.
        """
        if self._image_model is None:
            logger.info(f"Loading image model: {settings.IMAGE_MODEL_ID}")
            from transformers import AutoImageProcessor, AutoModelForImageClassification
            self._image_processor = AutoImageProcessor.from_pretrained(settings.IMAGE_MODEL_ID)
            model = AutoModelForImageClassification.from_pretrained(settings.IMAGE_MODEL_ID)
            model.to(settings.DEVICE)
            model.eval()
            self._image_model = model
            logger.info("Image model loaded")
        return self._image_model, self._image_processor

    # ---------- General AI image detector (no-face scenes / objects / art) ----------
    def load_general_image_model(self) -> Optional[Tuple[object, object]]:
        """Lazy-load the general AI-image detector.

        Returns:
            (model, processor) or None if a previous load attempt failed
            (failure is sticky via _general_image_unavailable).
        """
        if self._general_image_unavailable:
            return None
        if self._general_image_model is None:
            try:
                logger.info(f"Loading general AI image model: {settings.GENERAL_IMAGE_MODEL_ID}")
                from transformers import AutoImageProcessor, AutoModelForImageClassification
                self._general_image_processor = AutoImageProcessor.from_pretrained(settings.GENERAL_IMAGE_MODEL_ID)
                model = AutoModelForImageClassification.from_pretrained(settings.GENERAL_IMAGE_MODEL_ID)
                model.to(settings.DEVICE)
                model.eval()
                self._general_image_model = model
                logger.info("General AI image model loaded")
            except Exception as e:  # noqa: BLE001
                # Best-effort detector: mark unavailable so we never retry.
                self._general_image_unavailable = True
                logger.warning(f"General AI image model load failed: {e}")
                return None
        return self._general_image_model, self._general_image_processor

    # ---------- Diffusion AI-image detector (Phase C1/C2) ----------
    def load_diffusion_image_model(self) -> Optional[Tuple[object, object]]:
        """Lazy-load the diffusion-specialised AI-image detector.

        Uses DIFFUSION_IMAGE_MODEL_ID (default: haywoodsloan/ai-image-detector-deploy).
        Returns None when disabled, model ID is empty, or load fails.
        """
        if not settings.DIFFUSION_MODEL_ENABLED:
            return None
        model_id = settings.DIFFUSION_IMAGE_MODEL_ID.strip()
        if not model_id:
            return None
        if self._diffusion_image_unavailable:
            return None
        if self._diffusion_image_model is not None:
            return self._diffusion_image_model, self._diffusion_image_processor
        try:
            logger.info(f"Loading diffusion AI-image detector: {model_id}")
            from transformers import AutoImageProcessor, AutoModelForImageClassification
            self._diffusion_image_processor = AutoImageProcessor.from_pretrained(model_id)
            model = AutoModelForImageClassification.from_pretrained(model_id)
            model.to(settings.DEVICE)
            model.eval()
            self._diffusion_image_model = model
            logger.info("Diffusion AI-image detector loaded")
        except Exception as e:  # noqa: BLE001
            # Sticky failure flag — optional detector, keep serving without it.
            self._diffusion_image_unavailable = True
            logger.warning(f"Diffusion AI-image detector load failed (continuing without it): {e}")
            return None
        return self._diffusion_image_model, self._diffusion_image_processor

    # ---------- Text (BERT fake-news classifier — English) ----------
    def load_text_model(self):
        """Lazy-load the English fake-news text-classification pipeline."""
        if self._text_pipeline is None:
            logger.info(f"Loading text model: {settings.TEXT_MODEL_ID}")
            from transformers import pipeline
            self._text_pipeline = pipeline(
                "text-classification",
                model=settings.TEXT_MODEL_ID,
                # transformers pipeline convention: 0 = first CUDA device, -1 = CPU.
                device=0 if settings.DEVICE == "cuda" else -1,
            )
            logger.info("Text model loaded")
        return self._text_pipeline

    # ---------- Multilingual text model (Phase 13) ----------
    def load_multilang_text_model(self):
        """Load multilingual fake-news classifier. Falls back to English model if not configured."""
        model_id = settings.TEXT_MULTILANG_MODEL_ID
        if not model_id:
            logger.debug("TEXT_MULTILANG_MODEL_ID not set — falling back to English text model")
            return self.load_text_model()
        if self._multilang_text_pipeline is None:
            logger.info(f"Loading multilingual text model: {model_id}")
            from transformers import pipeline
            self._multilang_text_pipeline = pipeline(
                "text-classification",
                model=model_id,
                device=0 if settings.DEVICE == "cuda" else -1,
            )
            logger.info("Multilingual text model loaded")
        return self._multilang_text_pipeline

    # ---------- spaCy NLP (Phase 13 NER) ----------
    def load_spacy_nlp(self):
        """Lazy-load spaCy English NLP model. Returns None if spaCy is not installed."""
        if self._spacy_nlp is None:
            try:
                import spacy  # type: ignore
                try:
                    self._spacy_nlp = spacy.load("en_core_web_sm")
                    logger.info("spaCy en_core_web_sm loaded")
                except OSError:
                    # Package present but the model data was never downloaded.
                    logger.warning(
                        "spaCy model 'en_core_web_sm' not found. "
                        "Run: python -m spacy download en_core_web_sm"
                    )
                    return None
            except ImportError:
                logger.warning("spaCy not installed — NER keyword extraction disabled")
                return None
        return self._spacy_nlp

    # ---------- Sentence-Transformer (Phase 13 truth-override) ----------
    def load_sentence_transformer(self):
        """Lazy-load sentence-transformers/all-MiniLM-L6-v2. Returns None if not installed."""
        if self._sentence_transformer is None:
            try:
                from sentence_transformers import SentenceTransformer  # type: ignore
                self._sentence_transformer = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
                logger.info("Sentence-transformer (all-MiniLM-L6-v2) loaded")
            except ImportError:
                logger.warning("sentence-transformers not installed — truth-override disabled")
                return None
            except Exception as e:
                logger.warning(f"Sentence-transformer load failed: {e}")
                return None
        return self._sentence_transformer

    # ---------- OCR (EasyOCR) — Phase 13: use OCR_LANGS from config ----------
    def load_ocr_engine(self):
        """Lazy-load the EasyOCR reader with languages from settings.OCR_LANGS.

        OCR_LANGS is a comma-separated string; blank entries are dropped and
        an empty result falls back to English only.
        """
        if self._ocr_reader is None:
            langs = [l.strip() for l in settings.OCR_LANGS.split(",") if l.strip()]
            if not langs:
                langs = ["en"]
            logger.info(f"Loading EasyOCR reader (langs: {langs})")
            import easyocr  # type: ignore
            self._ocr_reader = easyocr.Reader(
                langs, gpu=(settings.DEVICE == "cuda"), verbose=False, download_enabled=True,
            )
            logger.info("EasyOCR loaded")
        return self._ocr_reader

    # ---------- Face detector (MediaPipe) ----------
    def load_face_detector(self):
        """Lazy-load MediaPipe FaceMesh. Returns None when mediapipe is
        missing or ships without the legacy ``solutions`` API (sticky failure)."""
        if self._face_detector_unavailable:
            return None
        if self._face_detector is None:
            logger.info("Loading MediaPipe FaceMesh")
            try:
                import mediapipe as mp  # type: ignore
                if not hasattr(mp, "solutions"):
                    raise ImportError("installed mediapipe package has no solutions API")
                self._face_detector = mp.solutions.face_mesh.FaceMesh(
                    static_image_mode=True,
                    max_num_faces=5,
                    min_detection_confidence=0.5,
                )
            except Exception as exc:  # noqa: BLE001
                self._face_detector_unavailable = True
                logger.warning(f"MediaPipe FaceMesh unavailable: {exc}")
                return None
            logger.info("MediaPipe FaceMesh loaded")
        return self._face_detector

    # ---------- EfficientNetAutoAttB4 (ICPR2020 / DeepShield1 merge) ----------
    def load_efficientnet(self):
        """Lazy-load EfficientNetAutoAttB4 detector. Returns None if deps are missing."""
        if self._efficientnet_detector is None:
            try:
                from services.efficientnet_service import EfficientNetDetector
                self._efficientnet_detector = EfficientNetDetector(
                    model_name=settings.EFFICIENTNET_MODEL,
                    train_db=settings.EFFICIENTNET_TRAIN_DB,
                    device=settings.DEVICE,
                )
            except Exception as e:
                # NOTE: no sticky flag here — a transient failure will be retried
                # on the next call (presumably intentional; confirm if retries
                # are too expensive).
                logger.warning(f"EfficientNet load failed (continuing without it): {e}")
                return None
        return self._efficientnet_detector

    # ---------- FFPP-fine-tuned ViT (Phase 11.3) ----------
    def _download_ffpp_checkpoint(self) -> Optional[Path]:
        """Download the FFPP checkpoint snapshot from the HF Hub.

        Returns the local snapshot directory, or None when no repo is
        configured, huggingface_hub is missing, the download fails, or the
        snapshot lacks config.json.
        """
        repo_id = settings.FFPP_MODEL_REPO_ID.strip()
        if not repo_id:
            return None
        try:
            from huggingface_hub import snapshot_download
        except Exception as e:
            logger.warning(f"huggingface_hub unavailable for FFPP checkpoint download: {e}")
            return None
        try:
            revision = settings.FFPP_MODEL_REVISION.strip() or "main"
            # HF_TOKEN may be unset or empty — normalize both to None.
            token = os.getenv("HF_TOKEN") or None
            logger.info(f"Downloading FFPP ViT checkpoint from Hub: {repo_id}@{revision}")
            snapshot_dir = snapshot_download(
                repo_id=repo_id,
                repo_type="model",
                revision=revision,
                # Only the weights + config are needed; skips README, etc.
                allow_patterns=["config.json", "model.safetensors"],
                token=token,
            )
            checkpoint_dir = Path(snapshot_dir)
            if not (checkpoint_dir / "config.json").exists():
                logger.warning(
                    f"Downloaded FFPP checkpoint from {repo_id} but config.json is missing"
                )
                return None
            return checkpoint_dir
        except Exception as e:
            logger.warning(f"FFPP checkpoint download failed from {repo_id}: {e}")
            return None

    def load_ffpp_model(self) -> Optional[Tuple[object, object]]:
        """Lazy-load the FaceForensics++ fine-tuned ViT from a local checkpoint.

        The checkpoint directory was exported from Colab with only
        `model.safetensors` + `config.json` (no preprocessor_config.json), so the
        image processor is loaded from the base google/vit-base-patch16-224-in21k
        — this matches the processor used during training.
        Returns None if disabled or the checkpoint is missing.
        """
        if not settings.FFPP_ENABLED:
            return None
        if self._ffpp_model is not None:
            return self._ffpp_model, self._ffpp_processor
        configured_path = Path(settings.FFPP_MODEL_PATH)
        repo_root = Path(__file__).resolve().parent.parent.parent
        # Relative paths are tried against repo root, CWD, then a default dir.
        candidates = [configured_path] if configured_path.is_absolute() else [
            (repo_root / configured_path).resolve(),
            (Path.cwd() / configured_path).resolve(),
            (repo_root / "trained_models").resolve(),
        ]
        ckpt_path = next((p for p in candidates if (p / "config.json").exists()), candidates[0])
        if not (ckpt_path / "config.json").exists():
            # No local checkpoint — fall back to downloading from the Hub.
            downloaded = self._download_ffpp_checkpoint()
            if downloaded is not None:
                ckpt_path = downloaded
            else:
                tried = ", ".join(str(p) for p in candidates)
                logger.warning(f"FFPP ViT checkpoint not found. Tried: {tried} — skipping")
                return None
        try:
            from transformers import AutoImageProcessor, AutoModelForImageClassification
            logger.info(f"Loading FFPP ViT model from {ckpt_path}")
            processor = AutoImageProcessor.from_pretrained(settings.FFPP_BASE_PROCESSOR_ID)
            model = AutoModelForImageClassification.from_pretrained(str(ckpt_path))
            model.to(settings.DEVICE)
            model.eval()
            self._ffpp_model = model
            self._ffpp_processor = processor
            logger.info("FFPP ViT model loaded")
            return self._ffpp_model, self._ffpp_processor
        except Exception as e:
            logger.warning(f"FFPP ViT load failed (continuing without it): {e}")
            return None

    # ---------- Preload ----------
    def preload_phase1(self) -> None:
        """Preload all core models to prevent lazy-loading delays during first analysis."""
        self.load_image_model()
        self.load_general_image_model()
        self.load_diffusion_image_model()
        self.load_face_detector()
        self.load_efficientnet()
        self.load_ffpp_model()
        self.load_ocr_engine()
        self.load_text_model()
        self.load_multilang_text_model()
        self.load_spacy_nlp()
        self.load_sentence_transformer()

    def is_ready(self) -> bool:
        """Phase 19.5 — readiness signal for /health/ready.

        When PRELOAD_MODELS is enabled, readiness = image model loaded.
        Otherwise the loader constructs successfully → ready (lazy-load on demand).
        """
        if settings.PRELOAD_MODELS:
            return self._image_model is not None
        return True
def get_model_loader() -> ModelLoader:
    """Module-level accessor for the process-wide ModelLoader singleton."""
    loader = ModelLoader.get_instance()
    return loader