# deepshield/models/model_loader.py
# Uploaded by ar07xd: "Sync from GitHub via hub-sync" (commit 711bdfc, verified)
from __future__ import annotations
import os
from pathlib import Path
from threading import Lock
from typing import Optional, Tuple
from loguru import logger
from config import settings
class ModelLoader:
    """Process-wide singleton that lazily loads and caches the AI models.

    Only construction of the singleton is guarded by ``_lock``. The individual
    ``load_*`` methods take no lock, so two threads that request the same model
    before it has been cached may both perform the load.
    """

    _instance: Optional["ModelLoader"] = None
    _lock: Lock = Lock()

    def __new__(cls) -> "ModelLoader":
        """Return the shared instance, creating and initialising it on first use."""
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    inst = super().__new__(cls)
                    inst._image_model = None
                    inst._image_processor = None
                    inst._general_image_model = None
                    inst._general_image_processor = None
                    inst._general_image_unavailable = False
                    inst._diffusion_image_model = None
                    inst._diffusion_image_processor = None
                    inst._diffusion_image_unavailable = False
                    inst._text_pipeline = None
                    inst._multilang_text_pipeline = None
                    inst._ocr_reader = None
                    inst._face_detector = None
                    inst._face_detector_unavailable = False
                    inst._spacy_nlp = None
                    inst._sentence_transformer = None
                    inst._efficientnet_detector = None
                    inst._ffpp_model = None
                    inst._ffpp_processor = None
                    # Publish last: the outer `is None` check runs without the
                    # lock, so the instance must be fully initialised before
                    # any other thread can observe it.
                    cls._instance = inst
        return cls._instance

    @classmethod
    def get_instance(cls) -> "ModelLoader":
        """Return the shared instance (same object as calling ``ModelLoader()``)."""
        return cls()

    # ---------- Shared helpers ----------
    @staticmethod
    def _load_image_classifier(model_source: str, processor_source: str) -> Tuple[object, object]:
        """Load an image-classification model and its processor via transformers.

        The model is moved to ``settings.DEVICE`` and switched to eval mode.
        Returns ``(model, processor)``; any import or load error propagates.
        """
        from transformers import AutoImageProcessor, AutoModelForImageClassification

        processor = AutoImageProcessor.from_pretrained(processor_source)
        model = AutoModelForImageClassification.from_pretrained(model_source)
        model.to(settings.DEVICE)
        model.eval()
        return model, processor

    @staticmethod
    def _build_text_pipeline(model_id: str):
        """Build a transformers ``text-classification`` pipeline for ``model_id``."""
        from transformers import pipeline

        return pipeline(
            "text-classification",
            model=model_id,
            # Device index 0 when DEVICE is exactly "cuda"; any other value -> -1.
            device=0 if settings.DEVICE == "cuda" else -1,
        )

    # ---------- Image (ViT deepfake classifier) ----------
    def load_image_model(self) -> Tuple[object, object]:
        """Return ``(model, processor)`` for ``settings.IMAGE_MODEL_ID``, loading once.

        Load errors are not caught here; they propagate to the caller.
        """
        if self._image_model is None:
            logger.info(f"Loading image model: {settings.IMAGE_MODEL_ID}")
            model, processor = self._load_image_classifier(
                settings.IMAGE_MODEL_ID, settings.IMAGE_MODEL_ID
            )
            # Cache only after both parts loaded; processor first so that a
            # non-None model always has its processor alongside it.
            self._image_processor = processor
            self._image_model = model
            logger.info("Image model loaded")
        return self._image_model, self._image_processor

    # ---------- General AI image detector (no-face scenes / objects / art) ----------
    def load_general_image_model(self) -> Optional[Tuple[object, object]]:
        """Return ``(model, processor)`` for ``settings.GENERAL_IMAGE_MODEL_ID``.

        Returns None if loading fails; the failure is remembered, so later
        calls return None without retrying.
        """
        if self._general_image_unavailable:
            return None
        if self._general_image_model is None:
            try:
                logger.info(f"Loading general AI image model: {settings.GENERAL_IMAGE_MODEL_ID}")
                model, processor = self._load_image_classifier(
                    settings.GENERAL_IMAGE_MODEL_ID, settings.GENERAL_IMAGE_MODEL_ID
                )
                self._general_image_processor = processor
                self._general_image_model = model
                logger.info("General AI image model loaded")
            except Exception as e:  # noqa: BLE001
                self._general_image_unavailable = True
                logger.warning(f"General AI image model load failed: {e}")
                return None
        return self._general_image_model, self._general_image_processor

    # ---------- Diffusion AI-image detector (Phase C1/C2) ----------
    def load_diffusion_image_model(self) -> Optional[Tuple[object, object]]:
        """Lazy-load the diffusion-specialised AI-image detector.

        Uses DIFFUSION_IMAGE_MODEL_ID (default: haywoodsloan/ai-image-detector-deploy).
        Returns None when disabled, model ID is empty, or load fails. A failed
        load is remembered and not retried.
        """
        if not settings.DIFFUSION_MODEL_ENABLED:
            return None
        model_id = settings.DIFFUSION_IMAGE_MODEL_ID.strip()
        if not model_id:
            return None
        if self._diffusion_image_unavailable:
            return None
        if self._diffusion_image_model is not None:
            return self._diffusion_image_model, self._diffusion_image_processor
        try:
            logger.info(f"Loading diffusion AI-image detector: {model_id}")
            model, processor = self._load_image_classifier(model_id, model_id)
            self._diffusion_image_processor = processor
            self._diffusion_image_model = model
            logger.info("Diffusion AI-image detector loaded")
        except Exception as e:  # noqa: BLE001
            self._diffusion_image_unavailable = True
            logger.warning(f"Diffusion AI-image detector load failed (continuing without it): {e}")
            return None
        return self._diffusion_image_model, self._diffusion_image_processor

    # ---------- Text (BERT fake-news classifier — English) ----------
    def load_text_model(self):
        """Return the text-classification pipeline for ``settings.TEXT_MODEL_ID``.

        Loaded once and cached. Load errors propagate to the caller.
        """
        if self._text_pipeline is None:
            logger.info(f"Loading text model: {settings.TEXT_MODEL_ID}")
            self._text_pipeline = self._build_text_pipeline(settings.TEXT_MODEL_ID)
            logger.info("Text model loaded")
        return self._text_pipeline

    # ---------- Multilingual text model (Phase 13) ----------
    def load_multilang_text_model(self):
        """Load multilingual fake-news classifier. Falls back to English model if not configured."""
        model_id = settings.TEXT_MULTILANG_MODEL_ID
        if not model_id:
            logger.debug("TEXT_MULTILANG_MODEL_ID not set — falling back to English text model")
            return self.load_text_model()
        if self._multilang_text_pipeline is None:
            logger.info(f"Loading multilingual text model: {model_id}")
            self._multilang_text_pipeline = self._build_text_pipeline(model_id)
            logger.info("Multilingual text model loaded")
        return self._multilang_text_pipeline

    # ---------- spaCy NLP (Phase 13 NER) ----------
    def load_spacy_nlp(self):
        """Lazy-load spaCy English NLP model.

        Returns None if spaCy is not installed or ``en_core_web_sm`` is missing.
        A failure is not remembered, so every call retries the load.
        """
        if self._spacy_nlp is None:
            try:
                import spacy  # type: ignore

                try:
                    self._spacy_nlp = spacy.load("en_core_web_sm")
                    logger.info("spaCy en_core_web_sm loaded")
                except OSError:
                    logger.warning(
                        "spaCy model 'en_core_web_sm' not found. "
                        "Run: python -m spacy download en_core_web_sm"
                    )
                    return None
            except ImportError:
                logger.warning("spaCy not installed — NER keyword extraction disabled")
                return None
        return self._spacy_nlp

    # ---------- Sentence-Transformer (Phase 13 truth-override) ----------
    def load_sentence_transformer(self):
        """Lazy-load sentence-transformers/all-MiniLM-L6-v2.

        Returns None if the package is not installed or the load fails. A
        failure is not remembered, so every call retries the load.
        """
        if self._sentence_transformer is None:
            try:
                from sentence_transformers import SentenceTransformer  # type: ignore

                self._sentence_transformer = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
                logger.info("Sentence-transformer (all-MiniLM-L6-v2) loaded")
            except ImportError:
                logger.warning("sentence-transformers not installed — truth-override disabled")
                return None
            except Exception as e:
                logger.warning(f"Sentence-transformer load failed: {e}")
                return None
        return self._sentence_transformer

    # ---------- OCR (EasyOCR) — Phase 13: use OCR_LANGS from config ----------
    def load_ocr_engine(self):
        """Return the cached EasyOCR reader, creating it on first use.

        Languages come from the comma-separated ``settings.OCR_LANGS``; blank
        entries are dropped and an empty result falls back to ``["en"]``.
        Import and construction errors propagate to the caller.
        """
        if self._ocr_reader is None:
            langs = [
                lang_code.strip()
                for lang_code in settings.OCR_LANGS.split(",")
                if lang_code.strip()
            ]
            if not langs:
                langs = ["en"]
            logger.info(f"Loading EasyOCR reader (langs: {langs})")
            import easyocr  # type: ignore

            self._ocr_reader = easyocr.Reader(
                langs, gpu=(settings.DEVICE == "cuda"), verbose=False, download_enabled=True,
            )
            logger.info("EasyOCR loaded")
        return self._ocr_reader

    # ---------- Face detector (MediaPipe) ----------
    def load_face_detector(self):
        """Return the cached MediaPipe FaceMesh detector, creating it on first use.

        Returns None if MediaPipe cannot be imported or initialised; the
        failure is remembered, so later calls return None without retrying.
        """
        if self._face_detector_unavailable:
            return None
        if self._face_detector is None:
            logger.info("Loading MediaPipe FaceMesh")
            try:
                import mediapipe as mp  # type: ignore

                # Treat a mediapipe install without `solutions` as unavailable.
                if not hasattr(mp, "solutions"):
                    raise ImportError("installed mediapipe package has no solutions API")
                self._face_detector = mp.solutions.face_mesh.FaceMesh(
                    static_image_mode=True,
                    max_num_faces=5,
                    min_detection_confidence=0.5,
                )
            except Exception as exc:  # noqa: BLE001
                self._face_detector_unavailable = True
                logger.warning(f"MediaPipe FaceMesh unavailable: {exc}")
                return None
            logger.info("MediaPipe FaceMesh loaded")
        return self._face_detector

    # ---------- EfficientNetAutoAttB4 (ICPR2020 / DeepShield1 merge) ----------
    def load_efficientnet(self):
        """Lazy-load EfficientNetAutoAttB4 detector.

        Returns None if the import or construction fails. A failure is not
        remembered, so every call retries the load.
        """
        if self._efficientnet_detector is None:
            try:
                from services.efficientnet_service import EfficientNetDetector

                self._efficientnet_detector = EfficientNetDetector(
                    model_name=settings.EFFICIENTNET_MODEL,
                    train_db=settings.EFFICIENTNET_TRAIN_DB,
                    device=settings.DEVICE,
                )
            except Exception as e:
                logger.warning(f"EfficientNet load failed (continuing without it): {e}")
                return None
        return self._efficientnet_detector

    # ---------- FFPP-fine-tuned ViT (Phase 11.3) ----------
    def _download_ffpp_checkpoint(self) -> Optional[Path]:
        """Download the FFPP checkpoint snapshot from the Hugging Face Hub.

        Uses ``settings.FFPP_MODEL_REPO_ID`` at ``settings.FFPP_MODEL_REVISION``
        (``"main"`` when blank) and the ``HF_TOKEN`` environment variable when
        set. Returns the snapshot directory, or None when no repo is
        configured, ``huggingface_hub`` cannot be imported, the download
        fails, or the snapshot has no ``config.json``.
        """
        repo_id = settings.FFPP_MODEL_REPO_ID.strip()
        if not repo_id:
            return None
        try:
            from huggingface_hub import snapshot_download
        except Exception as e:
            logger.warning(f"huggingface_hub unavailable for FFPP checkpoint download: {e}")
            return None
        try:
            revision = settings.FFPP_MODEL_REVISION.strip() or "main"
            token = os.getenv("HF_TOKEN") or None
            logger.info(f"Downloading FFPP ViT checkpoint from Hub: {repo_id}@{revision}")
            snapshot_dir = snapshot_download(
                repo_id=repo_id,
                repo_type="model",
                revision=revision,
                # Only these two files are requested from the repo.
                allow_patterns=["config.json", "model.safetensors"],
                token=token,
            )
            checkpoint_dir = Path(snapshot_dir)
            if not (checkpoint_dir / "config.json").exists():
                logger.warning(
                    f"Downloaded FFPP checkpoint from {repo_id} but config.json is missing"
                )
                return None
            return checkpoint_dir
        except Exception as e:
            logger.warning(f"FFPP checkpoint download failed from {repo_id}: {e}")
            return None

    def load_ffpp_model(self) -> Optional[Tuple[object, object]]:
        """Lazy-load the FaceForensics++ fine-tuned ViT.

        The checkpoint is looked up locally first and, if no candidate
        directory contains ``config.json``, downloaded from the Hub via
        ``_download_ffpp_checkpoint``.

        The checkpoint directory was exported from Colab with only
        `model.safetensors` + `config.json` (no preprocessor_config.json), so the
        image processor is loaded from ``settings.FFPP_BASE_PROCESSOR_ID``
        (the base google/vit-base-patch16-224-in21k) — this matches the
        processor used during training.

        Returns None if disabled, the checkpoint cannot be found or
        downloaded, or the load fails. Failures are not remembered, so every
        call retries (including the download attempt).
        """
        if not settings.FFPP_ENABLED:
            return None
        if self._ffpp_model is not None:
            return self._ffpp_model, self._ffpp_processor

        configured_path = Path(settings.FFPP_MODEL_PATH)
        repo_root = Path(__file__).resolve().parent.parent.parent
        if configured_path.is_absolute():
            candidates = [configured_path]
        else:
            # Relative path: try it against the repo root and the working
            # directory, then fall back to <repo_root>/trained_models.
            candidates = [
                (repo_root / configured_path).resolve(),
                (Path.cwd() / configured_path).resolve(),
                (repo_root / "trained_models").resolve(),
            ]

        ckpt_path = next((p for p in candidates if (p / "config.json").exists()), None)
        if ckpt_path is None:
            ckpt_path = self._download_ffpp_checkpoint()
            if ckpt_path is None:
                tried = ", ".join(str(p) for p in candidates)
                logger.warning(f"FFPP ViT checkpoint not found. Tried: {tried} — skipping")
                return None

        try:
            logger.info(f"Loading FFPP ViT model from {ckpt_path}")
            model, processor = self._load_image_classifier(
                str(ckpt_path), settings.FFPP_BASE_PROCESSOR_ID
            )
        except Exception as e:
            logger.warning(f"FFPP ViT load failed (continuing without it): {e}")
            return None
        self._ffpp_processor = processor
        self._ffpp_model = model
        logger.info("FFPP ViT model loaded")
        return self._ffpp_model, self._ffpp_processor

    # ---------- Preload ----------
    def preload_phase1(self) -> None:
        """Preload all core models to prevent lazy-loading delays during first analysis.

        Loaders that do not catch their own errors (image, OCR, text,
        multilingual text) let exceptions propagate, which aborts the
        remaining preloads.
        """
        self.load_image_model()
        self.load_general_image_model()
        self.load_diffusion_image_model()
        self.load_face_detector()
        self.load_efficientnet()
        self.load_ffpp_model()
        self.load_ocr_engine()
        self.load_text_model()
        self.load_multilang_text_model()
        self.load_spacy_nlp()
        self.load_sentence_transformer()

    def is_ready(self) -> bool:
        """Phase 19.5 — readiness signal for /health/ready.

        When PRELOAD_MODELS is enabled, readiness = image model loaded.
        Otherwise the loader constructs successfully → ready (lazy-load on demand).
        """
        if settings.PRELOAD_MODELS:
            return self._image_model is not None
        return True
def get_model_loader() -> ModelLoader:
    """Return the shared ``ModelLoader`` singleton via ``ModelLoader.get_instance()``."""
    loader = ModelLoader.get_instance()
    return loader