| """ |
| Production-Ready AI Content Detector (v3 - Enhanced Ensemble) |
| ============================================================== |
| Multi-modal detection: Image, Audio, Text |
| |
| Uses trained meta-classifiers (LogReg) that combine multiple models + features |
| per modality for maximum accuracy. v3 adds: |
| - Bombek1 SigLIP2+DINOv2 image detector (0.9997 AUC, JPEG-robust) |
| - DF_Arena_1B audio model (Speech DF Arena, 8 training datasets) |
| - fakespot-ai RoBERTa text detector (Mozilla-backed, catches GPT technical) |
| |
| Usage: |
| detector = AIContentDetector() |
| result = detector.detect_image("photo.jpg") |
| result = detector.detect_audio("voice.wav") |
| result = detector.detect_text("Some text to analyze...") |
| result = detector.detect_video("clip.mp4") # frames + audio analysis |
| results = detector.detect_images_batch(["img1.jpg", "img2.png"]) |
| """ |
|
|
| import sys, os |
| sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) |
| try: |
| import fix_torchcodec |
| except ImportError: |
| pass |
|
|
| import torch |
| import numpy as np |
| import soundfile as sf |
| from PIL import Image |
| from typing import Union, List, Dict, Optional |
| import io |
| import math |
| from collections import Counter |
| from torchvision import transforms as tv_transforms |
|
|
|
|
| |
| |
| |
| _IMG_SCALER_MEAN = [0.46721075337286583, 0.4332848905084707, 0.34848470501282125, 0.7513610315914312, -2.7428234702735845, 1.4757695660114816e-05, 0.47213903127932083, 0.5310949190042461] |
| _IMG_SCALER_SCALE = [0.4562829992667211, 0.4653274721438903, 0.2594560381028844, 0.2566914952700282, 0.31761878154208484, 1.745336794888413e-05, 0.4468171423032323, 0.4707389622737817] |
| _IMG_LR_COEF = [0.6488963010751596, 0.19470730198227582, 0.3669096091179738, -1.1058065882150858, -0.47635552888598026, -0.015401252102331365, 2.5029078795863406, 1.237011726618108] |
| _IMG_LR_INTERCEPT = -0.7403570533419102 |
|
|
| |
| |
| _AUD_SCALER_MEAN = [0.5667607612050348, 0.2773010993612484, 0.23310774392822925, 0.03141037016224877, 1807.2398348786571, 897.18004887457, 0.12301036345108962, 6620.40736210088, 0.5433762406366287] |
| _AUD_SCALER_SCALE = [0.48680867334512096, 0.29197482864644153, 0.4211570130989059, 0.024618810573647662, 459.40344999868597, 394.8528855416117, 0.046570088698838365, 829.6553459300637, 0.4155082795685684] |
| _AUD_LR_COEF = [0.7845433297452213, -0.25601227158569434, 0.38715143588917217, 0.5305971113288093, 0.14191280089652655, 1.7648106776858394, -1.6174243839603224, -1.09787021389514, 1.092684667819162] |
| _AUD_LR_INTERCEPT = 0.39250921446958165 |
|
|
| |
| _TXT_SCALER_MEAN = [1.1353826005329457, 0.33250804246780497, -0.48164806951384675, 5.916446148470062, 0.6490103211442594, 0.5124573713819743, 5.220866125485708, 0.6364287314816944] |
| _TXT_SCALER_SCALE = [0.19535976595611237, 0.45007809250809544, 0.21119484430166974, 1.1937958293169302, 0.19352867829552858, 0.21389850106439456, 1.2135677101079925, 0.43094435530407293] |
| _TXT_LR_COEF = [-0.6243579398646565, 0.389259232075374, -0.5040499517552531, -0.21291399657541557, -0.08360375807827485, -0.014109874794709326, 0.22446151217916235, 1.2266905154327146] |
| _TXT_LR_INTERCEPT = 0.1964292008569683 |
|
|
|
|
| def _logistic_predict(features, scaler_mean, scaler_scale, coef, intercept): |
| """Apply StandardScaler + LogisticRegression prediction.""" |
| x = np.array(features, dtype=np.float64) |
| x_scaled = (x - np.array(scaler_mean)) / np.array(scaler_scale) |
| logit = float(np.dot(x_scaled, np.array(coef)) + intercept) |
| prob = 1.0 / (1.0 + math.exp(-logit)) |
| return prob |
|
|
|
|
| class AIContentDetector: |
| """Production-ready multi-modal AI content detector with stacking ensembles.""" |
|
|
| def __init__(self, device: str = "auto", load_image=True, load_audio=True, load_text=True, |
| quantize_text: bool = True, compile_models: bool = True): |
| """ |
| Initialize detector. Only loads models for requested modalities. |
| |
| Args: |
| device: "auto", "cuda", or "cpu" |
| load_image: Load image detection models (4 ViT classifiers) |
| load_audio: Load audio detection models (2 wav2vec2 classifiers) |
| load_text: Load text detection models (Falcon-7B pair + RoBERTa) |
| quantize_text: Use INT8 for Falcon-7B (halves VRAM: 26GBβ13GB) |
| compile_models: Use torch.compile for 10-30% speedup (slow first call) |
| """ |
| if device == "auto": |
| self.device = "cuda" if torch.cuda.is_available() else "cpu" |
| else: |
| self.device = device |
| self._quantize_text = quantize_text |
| self._compile_models = compile_models |
|
|
| self._image_models = None |
| self._audio_models = None |
| self._text_models = None |
|
|
| if load_image: |
| self._load_image_models() |
| if load_audio: |
| self._load_audio_models() |
| if load_text: |
| self._load_text_models() |
|
|
| |
|
|
def _load_image_models(self):
    """Load all image detectors onto ``self.device``.

    Populates:
        self._image_models: four Hugging Face image-classification pipelines,
            in the order the image meta-classifier expects.
        self._bombek_model: Bombek1 SigLIP2+DINOv2 detector, or None if it
            failed to load (a warning is printed).
        self._spai_model / self._spai_feat_batch / self._spai_to_tensor:
            optional SPAI model loaded from ``spai_repo`` next to this file;
            ``_spai_model`` stays None when weights are missing or loading fails.
    """
    from transformers import pipeline as hf_pipeline
    from transformers import AutoModelForImageClassification
    print("Loading 4 ViT + SPAI + Bombek1 image detectors...")
    dev = 0 if self.device == "cuda" else -1

    def _load_image_pipeline(model_id):
        """Load image-classification pipeline with transformers 5.x compatibility."""
        try:
            return hf_pipeline("image-classification", model=model_id, device=dev)
        except (ValueError, OSError):
            # Fall back to wiring the processor and model explicitly.
            from transformers import ViTImageProcessor
            img_proc = ViTImageProcessor.from_pretrained(model_id)
            model = AutoModelForImageClassification.from_pretrained(model_id)
            return hf_pipeline("image-classification", model=model, image_processor=img_proc, device=dev)

    self._image_models = [
        _load_image_pipeline("NYUAD-ComNets/NYUAD_AI-generated_images_detector"),
        _load_image_pipeline("Organika/sdxl-detector"),
        _load_image_pipeline("umm-maybe/AI-image-detector"),
        _load_image_pipeline("dima806/ai_vs_real_image_detection"),
    ]

    # Bombek1: the repo ships its own model.py, which is imported dynamically.
    # NOTE(review): this executes code downloaded from the Hub; pin a revision
    # if the repository is not fully trusted.
    self._bombek_model = None
    try:
        from huggingface_hub import hf_hub_download
        import importlib.util
        model_pt = hf_hub_download(
            repo_id="Bombek1/ai-image-detector-siglip-dinov2",
            filename="pytorch_model.pt"
        )
        model_py = hf_hub_download(
            repo_id="Bombek1/ai-image-detector-siglip-dinov2",
            filename="model.py"
        )
        spec = importlib.util.spec_from_file_location("bombek_model", model_py)
        bombek_mod = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(bombek_mod)
        self._bombek_model = bombek_mod.AIImageDetector(model_pt, device=self.device)
        print(" Bombek1 SigLIP2+DINOv2 loaded (0.9997 AUC)")
    except Exception as e:
        print(f" Warning: Bombek1 failed to load: {e}")

    # SPAI: optional, only when local weights are present.
    self._spai_model = None
    self._spai_to_tensor = tv_transforms.ToTensor()
    spai_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "spai_repo")
    spai_weights = os.path.join(spai_dir, "weights", "spai.pth")
    if os.path.exists(spai_weights):
        try:
            # Avoid stacking duplicate entries when models are reloaded.
            if spai_dir not in sys.path:
                sys.path.insert(0, spai_dir)
            from spai.config import get_custom_config
            from spai.models.build import build_cls_model
            from spai.utils import load_pretrained
            import logging
            spai_logger = logging.getLogger("spai_load")
            spai_logger.setLevel(logging.WARNING)

            config = get_custom_config(os.path.join(spai_dir, "configs", "spai.yaml"))
            config.defrost()
            config.PRETRAINED = spai_weights
            config.freeze()

            self._spai_model = build_cls_model(config)
            # Honour the configured device instead of unconditionally using
            # CUDA, so SPAI is not silently dropped on CPU-only hosts.
            self._spai_model.to(self.device)
            self._spai_model.eval()
            load_pretrained(config, self._spai_model, spai_logger)
            self._spai_feat_batch = config.MODEL.FEATURE_EXTRACTION_BATCH
            print(" SPAI model loaded (139.9M params, CVPR 2025)")
        except Exception as e:
            print(f" Warning: SPAI failed to load: {e}")
            self._spai_model = None
    else:
        print(f" SPAI weights not found at {spai_weights}, skipping")

    print("Image models loaded!")
|
|
| def _extract_image_features(self, img: Image.Image) -> list: |
| """Extract 4 model scores + 2 FFT features for meta-classifier.""" |
| feats = [] |
|
|
| |
| for p in self._image_models: |
| result = p(img) |
| ai_score = 0.0 |
| for r in result: |
| lab = r["label"].lower() |
| if lab in ["sd", "dalle", "artificial", "fake", "ai"]: |
| ai_score = r["score"] |
| break |
| feats.append(ai_score) |
|
|
| |
| img_gray = np.array(img.convert('L').resize((256, 256)), dtype=np.float64) |
| f_shift = np.fft.fftshift(np.fft.fft2(img_gray)) |
| power = np.abs(f_shift) ** 2 |
| h, w = power.shape |
| cy, cx = h // 2, w // 2 |
| Y, X = np.ogrid[:h, :w] |
| r = np.sqrt((X - cx)**2 + (Y - cy)**2).astype(int) |
| max_r = min(cx, cy) |
| radial_psd = np.zeros(max_r) |
| for i in range(max_r): |
| mask = r == i |
| if mask.any(): |
| radial_psd[i] = power[mask].mean() |
| log_psd = np.log(radial_psd + 1e-10) |
| freqs = np.arange(1, len(log_psd)) |
| slope, _ = np.polyfit(np.log(freqs), log_psd[1:], 1) |
| mid = len(radial_psd) // 2 |
| hf_ratio = np.sum(radial_psd[mid:]) / (np.sum(radial_psd) + 1e-10) |
|
|
| feats.append(slope) |
| feats.append(hf_ratio) |
| return feats |
|
|
| def _spai_score(self, img: Image.Image) -> float: |
| """Get SPAI (CVPR 2025) AI probability score for an image.""" |
| if self._spai_model is None: |
| return -1.0 |
| try: |
| |
| if img.size[0] < 224 or img.size[1] < 224: |
| img = img.resize((max(224, img.size[0]), max(224, img.size[1]))) |
| t = self._spai_to_tensor(img).unsqueeze(0).cuda() |
| with torch.no_grad(): |
| out = self._spai_model([t], self._spai_feat_batch) |
| return float(torch.sigmoid(out).item()) |
| except Exception: |
| return -1.0 |
|
|
| def _bombek_score(self, img: Image.Image) -> float: |
| """Get Bombek1 SigLIP2+DINOv2 AI probability score.""" |
| if self._bombek_model is None: |
| return -1.0 |
| try: |
| result = self._bombek_model.predict(img) |
| return float(result["probability"]) |
| except Exception: |
| return -1.0 |
|
|
def detect_image(self, image: Union[str, Image.Image]) -> Dict:
    """
    Detect if an image is AI-generated using stacking meta-classifier + SPAI + Bombek1.

    Args:
        image: File path (str or os.PathLike) or PIL Image. Provenance
            metadata is only checked when a path is given.

    Returns:
        {"is_ai": bool, "confidence": float, "ai_probability": float, "label": str, "details": dict}

    Raises:
        RuntimeError: If image models were not loaded.
    """
    if self._image_models is None:
        raise RuntimeError("Image models not loaded. Initialize with load_image=True")

    provenance = None
    if isinstance(image, (str, os.PathLike)):
        path = os.fspath(image)
        provenance = self.check_provenance(path)
        # convert() materialises the pixels, so the file handle can be
        # closed as soon as the RGB copy exists.
        with Image.open(path) as opened:
            img = opened.convert("RGB")
    else:
        img = image.convert("RGB")

    feats6 = self._extract_image_features(img)
    spai = self._spai_score(img)
    bombek = self._bombek_score(img)

    # Unavailable scorers report -1.0; the meta-classifier receives 0.0 for them.
    feats = feats6 + [max(0.0, bombek), max(0.0, spai)]
    raw_prob = _logistic_predict(feats, _IMG_SCALER_MEAN, _IMG_SCALER_SCALE, _IMG_LR_COEF, _IMG_LR_INTERCEPT)

    # Count how many individual detectors vote "AI" (score above 0.5).
    model_scores = feats6[:4]
    n_ai_models = sum(1 for s in model_scores if s > 0.5)
    n_ai_models += sum(1 for s in (spai, bombek) if s > 0.5)

    ai_prob = raw_prob
    is_ai = ai_prob > 0.5

    model_names = [
        "NYUAD_AI-generated_images_detector",
        "sdxl-detector",
        "AI-image-detector",
        "ai_vs_real_image_detection",
    ]
    details = {name: round(score, 4) for name, score in zip(model_names, model_scores)}
    details["fft_slope"] = round(feats[4], 4)
    details["fft_hf_ratio"] = round(feats[5], 8)
    if spai >= 0:
        details["SPAI"] = round(spai, 4)
    if bombek >= 0:
        details["Bombek1_SigLIP2_DINOv2"] = round(bombek, 4)
    details["models_agreeing_ai"] = n_ai_models

    if provenance and provenance["has_provenance"]:
        details["provenance"] = {
            "source": provenance["source"],
            "ai_signals": provenance["ai_signals"],
            "camera_signals": provenance["camera_signals"],
        }

        if provenance["ai_signals"]:
            # Explicit AI metadata wins: force an AI verdict with prob >= 0.85.
            ai_prob = max(ai_prob, 0.85)
            is_ai = True
        elif provenance["camera_signals"]:
            # Camera metadata overrides an AI verdict unless at least four
            # detectors agree the image is AI.
            if ai_prob > 0.5 and n_ai_models < 4:
                details["provenance_override"] = f"Camera metadata found, reducing AI probability from {ai_prob:.4f}"
                ai_prob = min(ai_prob, 0.45)
                is_ai = False

    confidence = abs(ai_prob - 0.5) * 2

    return {
        "is_ai": is_ai,
        "confidence": round(confidence, 3),
        "ai_probability": round(ai_prob, 4),
        "label": "AI-Generated" if is_ai else "Real",
        "details": details,
    }
|
|
| def detect_images_batch(self, images: List[Union[str, Image.Image]]) -> List[Dict]: |
| """Batch process multiple images.""" |
| return [self.detect_image(img) for img in images] |
|
|
| |
|
|
| @staticmethod |
| def check_provenance(image_path: str) -> Dict: |
| """ |
| Check image provenance metadata for AI generation signals. |
| |
| Checks C2PA (if library available), EXIF, and XMP metadata for |
| known AI tool signatures or real camera provenance. |
| |
| Args: |
| image_path: Path to image file |
| |
| Returns: |
| {"has_provenance": bool, "source": str|None, "ai_signals": list, "camera_signals": list} |
| """ |
| result = {"has_provenance": False, "source": None, "ai_signals": [], "camera_signals": [], "details": {}} |
|
|
| |
| ai_keywords = ["dall-e", "dalle", "chatgpt", "openai", "midjourney", "stable diffusion", |
| "firefly", "adobe firefly", "imagen", "gemini", "flux", "ideogram", |
| "leonardo", "playground", "nightcafe", "artbreeder"] |
|
|
| |
| try: |
| import c2pa |
| reader = c2pa.Reader(image_path) |
| import json |
| manifest_data = json.loads(reader.json()) |
| result["has_provenance"] = True |
| result["source"] = "c2pa" |
| result["details"]["c2pa"] = manifest_data |
|
|
| active = manifest_data.get("active_manifest", "") |
| if active and active in manifest_data.get("manifests", {}): |
| m = manifest_data["manifests"][active] |
| gen = m.get("claim_generator", "") |
| result["details"]["claim_generator"] = gen |
|
|
| |
| for assertion in m.get("assertions", []): |
| if "c2pa.actions" in assertion.get("label", ""): |
| for action in assertion.get("data", {}).get("actions", []): |
| dst = action.get("digitalSourceType", "") |
| if "trainedAlgorithmicMedia" in dst: |
| result["ai_signals"].append(f"c2pa:trainedAlgorithmicMedia") |
| elif "digitalCapture" in dst: |
| result["camera_signals"].append(f"c2pa:digitalCapture") |
|
|
| if any(kw in gen.lower() for kw in ai_keywords): |
| result["ai_signals"].append(f"c2pa:generator={gen}") |
| except ImportError: |
| pass |
| except Exception: |
| pass |
|
|
| |
| try: |
| img = Image.open(image_path) |
| exif = img.getexif() |
| if exif: |
| |
| software = exif.get(305, "") |
| make = exif.get(271, "") |
| model = exif.get(272, "") |
|
|
| if software or make or model: |
| result["has_provenance"] = True |
| result["details"]["exif_software"] = software |
| result["details"]["exif_make"] = make |
| result["details"]["exif_model"] = model |
|
|
| sw_lower = software.lower() |
| if any(kw in sw_lower for kw in ai_keywords): |
| result["ai_signals"].append(f"exif:software={software}") |
| if make and make.lower() not in ["", "unknown"]: |
| result["camera_signals"].append(f"exif:make={make}") |
| if model and model.lower() not in ["", "unknown"]: |
| result["camera_signals"].append(f"exif:model={model}") |
| except Exception: |
| pass |
|
|
| |
| try: |
| with open(image_path, 'rb') as f: |
| data = f.read(min(65536, os.path.getsize(image_path))) |
| |
| xmp_start = data.find(b'<x:xmpmeta') |
| if xmp_start >= 0: |
| xmp_end = data.find(b'</x:xmpmeta>', xmp_start) |
| if xmp_end >= 0: |
| xmp = data[xmp_start:xmp_end + 13].decode('utf-8', errors='ignore') |
| result["details"]["has_xmp"] = True |
| xmp_lower = xmp.lower() |
| for kw in ai_keywords: |
| if kw in xmp_lower: |
| result["ai_signals"].append(f"xmp:contains={kw}") |
| result["has_provenance"] = True |
| |
| if "trainedalgorithmicmedia" in xmp_lower: |
| result["ai_signals"].append("xmp:trainedAlgorithmicMedia") |
| result["has_provenance"] = True |
| if "digitalcapture" in xmp_lower: |
| result["camera_signals"].append("xmp:digitalCapture") |
| result["has_provenance"] = True |
| except Exception: |
| pass |
|
|
| if not result["source"]: |
| if result["ai_signals"]: |
| result["source"] = "metadata" |
| elif result["camera_signals"]: |
| result["source"] = "exif" |
|
|
| return result |
|
|
| |
|
|
def _load_audio_models(self):
    """Load the audio classifiers and the optional DF_Arena_1B pipeline.

    ``self._audio_models`` ends up with three slots in a fixed order
    (DavidCombei-1B, Gustking, mo-thecreator); the third is None when that
    model fails to load. ``self._arena_pipe`` is None when DF_Arena_1B
    fails to load.
    """
    from transformers import AutoFeatureExtractor, AutoModelForAudioClassification
    print("Loading 3 audio detectors + DF_Arena_1B...")
    self._audio_models = []
    slots = self._audio_models

    wav2vec_specs = [
        ("DavidCombei/wav2vec2-xls-r-1b-DeepFake-AI4TRUST", "DavidCombei-1B"),
        ("Gustking/wav2vec2-large-xlsr-deepfake-audio-classification", "Gustking"),
    ]
    for repo_id, short_name in wav2vec_specs:
        extractor = AutoFeatureExtractor.from_pretrained(repo_id)
        classifier = AutoModelForAudioClassification.from_pretrained(repo_id).eval().to(self.device)
        if self._compile_models:
            try:
                classifier = torch.compile(classifier)
            except Exception:
                # Compilation is optional; keep the eager model.
                pass
        slots.append({"feat": extractor, "model": classifier, "fake_idx": 1, "name": short_name})

    # Third classifier is optional; its "fake" class index is looked up from
    # the model config, defaulting to 1.
    mo_repo = "mo-thecreator/Deepfake-audio-detection"
    try:
        mo_feat = AutoFeatureExtractor.from_pretrained(mo_repo)
        mo_model = AutoModelForAudioClassification.from_pretrained(mo_repo).eval().to(self.device)
        fake_words = ['fake', 'spoof', 'deepfake', 'synthetic']
        label_map = getattr(mo_model.config, 'id2label', {})
        fake_idx = next(
            (int(idx) for idx, label in label_map.items()
             if any(word in str(label).lower() for word in fake_words)),
            1,
        )
        slots.append({"feat": mo_feat, "model": mo_model, "fake_idx": fake_idx, "name": "mo-thecreator"})
        print(" mo-thecreator Deepfake-audio-detection loaded (In-the-Wild specialist)")
    except Exception as e:
        print(f" Warning: mo-thecreator failed to load: {e}")
        slots.append(None)

    self._arena_pipe = None
    try:
        from transformers import pipeline as hf_pipeline
        self._arena_pipe = hf_pipeline(
            "antispoofing",
            model="Speech-Arena-2025/DF_Arena_1B_V_1",
            trust_remote_code=True,
            device=self.device
        )
        print(" DF_Arena_1B loaded (1B params, Speech DF Arena 2025)")
    except Exception as e:
        print(f" Warning: DF_Arena_1B failed to load: {e}")

    print("Audio models loaded!")
|
|
| def _arena_score(self, audio_arr: np.ndarray) -> float: |
| """Get DF_Arena_1B spoof probability score.""" |
| if self._arena_pipe is None: |
| return -1.0 |
| try: |
| result = self._arena_pipe(audio_arr) |
| return float(result.get("all_scores", {}).get("spoof", 0.0)) |
| except Exception: |
| return -1.0 |
|
|
def _extract_audio_features(self, audio_arr: np.ndarray, sr: int) -> list:
    """Build the audio feature vector: one score per model slot + 5 spectral stats.

    Feature order: [DavidCombei, Gustking, mo-thecreator, spec_flat,
    centroid_mean, centroid_std, zcr, rolloff]. A model slot that is None
    contributes a neutral 0.5.
    """
    import librosa

    model_scores = []
    for entry in self._audio_models:
        if entry is None:
            model_scores.append(0.5)
            continue
        encoded = entry["feat"](audio_arr, sampling_rate=sr, return_tensors="pt", padding=True)
        on_device = {key: tensor.to(self.device) for key, tensor in encoded.items()}
        with torch.no_grad():
            logits = entry["model"](**on_device).logits
        class_probs = torch.softmax(logits, dim=-1).cpu().numpy()[0]
        model_scores.append(float(class_probs[entry["fake_idx"]]))

    flatness = librosa.feature.spectral_flatness(y=audio_arr, n_fft=2048, hop_length=512)
    centroid = librosa.feature.spectral_centroid(y=audio_arr, sr=sr)
    crossings = librosa.feature.zero_crossing_rate(audio_arr)
    rolloff = librosa.feature.spectral_rolloff(y=audio_arr, sr=sr, roll_percent=0.99)

    spectral_stats = [
        np.mean(flatness),
        np.mean(centroid),
        np.std(centroid),
        np.mean(crossings),
        np.mean(rolloff),
    ]
    return model_scores + [float(value) for value in spectral_stats]
|
|
def detect_audio(self, audio: Union[str, np.ndarray], sr: int = 16000, max_duration: float = 4.0) -> Dict:
    """
    Detect if audio is AI-generated/deepfake using stacking meta-classifier.

    The signal is reduced to its first channel, resampled to 16 kHz,
    truncated to ``max_duration`` seconds and peak-normalised before
    feature extraction.

    Args:
        audio: File path (str or os.PathLike) or array of samples
        sr: Sample rate (if an array is given; ignored for files)
        max_duration: Max seconds to analyze

    Returns:
        {"is_ai": bool, "confidence": float, "ai_probability": float, "label": str, "details": dict}

    Raises:
        RuntimeError: If audio models were not loaded.
        ValueError: If there are no samples to analyze.
    """
    if self._audio_models is None:
        raise RuntimeError("Audio models not loaded. Initialize with load_audio=True")

    import librosa

    if isinstance(audio, (str, os.PathLike)):
        audio_arr, sr = sf.read(os.fspath(audio))
        audio_arr = audio_arr.astype(np.float32)
    else:
        audio_arr = np.asarray(audio).astype(np.float32)

    # Multi-channel input: keep the first channel only.
    if len(audio_arr.shape) > 1:
        audio_arr = audio_arr[:, 0]

    if sr != 16000:
        audio_arr = librosa.resample(audio_arr, orig_sr=sr, target_sr=16000)
        sr = 16000

    max_samples = int(max_duration * sr)
    audio_arr = audio_arr[:max_samples]

    # Fail with a clear message instead of numpy's zero-size reduction error.
    if audio_arr.size == 0:
        raise ValueError("Audio contains no samples to analyze")

    # Peak-normalise; an all-zero signal is left untouched.
    peak = np.abs(audio_arr).max()
    if peak > 0:
        audio_arr = audio_arr / peak

    feats8 = self._extract_audio_features(audio_arr, sr)
    arena_score = self._arena_score(audio_arr)

    # An unavailable DF_Arena score (-1.0) is fed to the classifier as 0.0.
    feats = feats8 + [max(0.0, arena_score)]
    raw_prob = _logistic_predict(feats, _AUD_SCALER_MEAN, _AUD_SCALER_SCALE, _AUD_LR_COEF, _AUD_LR_INTERCEPT)

    spec_flat = feats[3]
    centroid_mean = feats[4]
    centroid_std = feats[5]
    rolloff = feats[7]

    # Informational only: reported in details, not used for the verdict.
    spectral_real_votes = sum((
        centroid_mean > 2000,
        centroid_std > 1000,
        spec_flat > 0.04,
        rolloff > 6500,
    ))

    ai_prob = raw_prob
    is_ai = ai_prob > 0.5
    confidence = abs(ai_prob - 0.5) * 2

    details = {
        "DavidCombei-1B": round(feats[0], 4),
        "Gustking": round(feats[1], 4),
        "mo-thecreator": round(feats[2], 4),
        "spectral_flatness": round(feats[3], 6),
        "centroid_mean": round(feats[4], 2),
        "centroid_std": round(feats[5], 2),
        "zcr": round(feats[6], 6),
        "rolloff_99": round(feats[7], 2),
        "spectral_real_votes": spectral_real_votes,
    }
    if arena_score >= 0:
        details["DF_Arena_1B"] = round(arena_score, 4)

    return {
        "is_ai": is_ai,
        "confidence": round(confidence, 3),
        "ai_probability": round(ai_prob, 4),
        "label": "AI-Generated" if is_ai else "Real",
        "details": details,
    }
|
|
| def detect_audio_batch(self, audio_files: List[str]) -> List[Dict]: |
| """Batch process multiple audio files.""" |
| return [self.detect_audio(f) for f in audio_files] |
|
|
| |
|
|
def _load_text_models(self):
    """Load the Falcon-7B observer/performer pair and the RoBERTa classifiers.

    Sets ``_tokenizer``, ``_observer``, ``_performer``, ``_roberta_clf`` and
    ``_fakespot_clf`` (None if fakespot fails to load), then marks the text
    modality as loaded via ``_text_models = True``.
    """
    from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline as hf_pipeline
    print("Loading text detectors (Binoculars + RoBERTa + fakespot)...")

    observer_name = "tiiuae/falcon-7b"
    performer_name = "tiiuae/falcon-7b-instruct"

    self._tokenizer = AutoTokenizer.from_pretrained(observer_name)
    if self._tokenizer.pad_token is None:
        # Reuse EOS as the padding token when the tokenizer defines none.
        self._tokenizer.pad_token = self._tokenizer.eos_token

    def _load_pair(**load_kwargs):
        # Observer first, then performer, both with identical options.
        self._observer = AutoModelForCausalLM.from_pretrained(observer_name, **load_kwargs)
        self._performer = AutoModelForCausalLM.from_pretrained(performer_name, **load_kwargs)

    if not self._quantize_text:
        _load_pair(torch_dtype=torch.float16, device_map="auto")
    else:
        print(" Using INT8 quantization for Falcon-7B")
        try:
            from transformers import BitsAndBytesConfig
            bnb_config = BitsAndBytesConfig(load_in_8bit=True)
            _load_pair(quantization_config=bnb_config, device_map="auto")
        except (ImportError, TypeError):
            # Fall back to passing load_in_8bit directly.
            _load_pair(load_in_8bit=True, device_map="auto")
    self._observer.eval()
    self._performer.eval()

    dev = 0 if self.device == "cuda" else -1
    self._roberta_clf = hf_pipeline(
        "text-classification", model="Hello-SimpleAI/chatgpt-detector-roberta", device=dev, top_k=None
    )

    self._fakespot_clf = None
    try:
        self._fakespot_clf = hf_pipeline(
            "text-classification", model="fakespot-ai/roberta-base-ai-text-detection-v1",
            device=dev, top_k=None
        )
        print(" fakespot-ai RoBERTa loaded (Mozilla-backed)")
    except Exception as e:
        print(f" Warning: fakespot-ai failed to load: {e}")

    self._text_models = True
    print("Text models loaded!")
|
|
| def _binoculars_score(self, text: str) -> float: |
| """Compute Binoculars score: lower = more likely AI""" |
| inputs = self._tokenizer(text, return_tensors="pt", truncation=True, max_length=512, padding=True) |
| inputs = {k: v.to(self._observer.device) for k, v in inputs.items()} |
|
|
| with torch.no_grad(): |
| obs_logits = self._observer(**inputs).logits |
| per_logits = self._performer(**inputs).logits |
|
|
| pobs = torch.log_softmax(obs_logits[:, :-1], dim=-1) |
| pper = torch.log_softmax(per_logits[:, :-1], dim=-1) |
|
|
| ids = inputs["input_ids"][:, 1:] |
| log_obs = pobs.gather(-1, ids.unsqueeze(-1)).squeeze(-1) |
| log_per = pper.gather(-1, ids.unsqueeze(-1)).squeeze(-1) |
|
|
| mask = inputs.get("attention_mask", torch.ones_like(inputs["input_ids"]))[:, 1:] |
| log_obs = (log_obs * mask).sum() / mask.sum() |
| log_per = (log_per * mask).sum() / mask.sum() |
|
|
| return float(torch.exp(log_obs - log_per)) |
|
|
| def _roberta_ai_score(self, text: str) -> float: |
| """Get RoBERTa ChatGPT detector score.""" |
| result = self._roberta_clf(text[:512]) |
| |
| if result and isinstance(result[0], list): |
| result = result[0] |
| for r in result: |
| if r["label"].lower() in ["chatgpt", "fake", "ai", "1", "label_1"]: |
| return r["score"] |
| return 0.0 |
|
|
| def _fakespot_ai_score(self, text: str) -> float: |
| """Get fakespot-ai RoBERTa AI score. Returns -1 if not loaded.""" |
| if self._fakespot_clf is None: |
| return -1.0 |
| try: |
| result = self._fakespot_clf(text[:512]) |
| if result and isinstance(result[0], list): |
| result = result[0] |
| for r in result: |
| if r["label"].lower() in ["machine", "ai", "fake", "generated", "1", "label_1"]: |
| return r["score"] |
| return 0.0 |
| except Exception: |
| return -1.0 |
|
|
| @staticmethod |
| def _text_stats(text: str) -> list: |
| """Compute statistical text features: burstiness, entropy, ttr, hapax, avg_word_len.""" |
| words = text.split() |
| sentences = [s.strip() for s in text.replace('!', '.').replace('?', '.').split('.') if len(s.strip()) > 5] |
| if len(words) < 10 or len(sentences) < 2: |
| return [0.0] * 5 |
| sent_lens = [len(s.split()) for s in sentences] |
| mean_l, std_l = np.mean(sent_lens), np.std(sent_lens) |
| burstiness = (std_l - mean_l) / (std_l + mean_l) if (std_l + mean_l) > 0 else 0 |
| freq = Counter(w.lower() for w in words) |
| entropy = -sum((c / len(words)) * math.log2(c / len(words)) for c in freq.values()) |
| ttr = len(set(w.lower() for w in words)) / len(words) |
| hapax = sum(1 for c in freq.values() if c == 1) / len(words) |
| avg_word_len = np.mean([len(w) for w in words]) |
| return [burstiness, entropy, ttr, hapax, avg_word_len] |
|
|
| def _extract_text_features(self, text: str) -> list: |
| """Extract Binoculars + RoBERTa + stats for meta-classifier.""" |
| feats = [] |
| feats.append(self._binoculars_score(text[:1000])) |
| feats.append(self._roberta_ai_score(text)) |
| feats.extend(self._text_stats(text[:2000])) |
| return feats |
|
|
| def detect_text(self, text: str) -> Dict: |
| """ |
| Detect if text is AI-generated using stacking meta-classifier + fakespot. |
| |
| Args: |
| text: Text to analyze (min ~100 chars for reliable results) |
| |
| Returns: |
| {"is_ai": bool, "confidence": float, "ai_probability": float, "label": str, "details": dict} |
| """ |
| if self._text_models is None: |
| raise RuntimeError("Text models not loaded. Initialize with load_text=True") |
|
|
| if len(text) < 50: |
| return {"is_ai": False, "confidence": 0.0, "ai_probability": 0.0, |
| "label": "Too short", "warning": "Text too short for reliable detection"} |
|
|
| feats7 = self._extract_text_features(text) |
| word_count = len(text.split()) |
|
|
| |
| fakespot = self._fakespot_ai_score(text) |
| feats = feats7 + [max(0.0, fakespot)] |
|
|
| |
| |
| if word_count < 100: |
| bino = feats[0] |
| roberta = feats[1] |
| bino_ai = max(0.0, min(1.0, (1.10 - bino) / 0.15)) |
| if fakespot >= 0: |
| ai_prob = bino_ai * 0.50 + roberta * 0.25 + fakespot * 0.25 |
| else: |
| ai_prob = bino_ai * 0.65 + roberta * 0.35 |
| ai_prob = max(0.0, min(1.0, ai_prob)) |
| else: |
| |
| ai_prob = _logistic_predict(feats, _TXT_SCALER_MEAN, _TXT_SCALER_SCALE, _TXT_LR_COEF, _TXT_LR_INTERCEPT) |
|
|
| is_ai = ai_prob > 0.5 |
| confidence = abs(ai_prob - 0.5) * 2 |
|
|
| details = { |
| "binoculars_score": round(feats[0], 4), |
| "roberta_ai_score": round(feats[1], 4), |
| "burstiness": round(feats[2], 4), |
| "entropy": round(feats[3], 4), |
| "ttr": round(feats[4], 4), |
| "hapax_ratio": round(feats[5], 4), |
| "avg_word_len": round(feats[6], 4), |
| } |
| if fakespot >= 0: |
| details["fakespot_ai_score"] = round(fakespot, 4) |
| if word_count < 100: |
| details["short_text_mode"] = True |
|
|
| return { |
| "is_ai": is_ai, |
| "confidence": round(confidence, 3), |
| "ai_probability": round(ai_prob, 4), |
| "label": "AI-Generated" if is_ai else "Human-Written", |
| "details": details, |
| } |
|
|
| def detect_text_batch(self, texts: List[str]) -> List[Dict]: |
| """Batch process multiple texts.""" |
| return [self.detect_text(t) for t in texts] |
|
|
| |
|
|
def detect_video(self, video: str, num_frames: int = 8, analyze_audio: bool = True) -> Dict:
    """
    Detect if a video is AI-generated by analyzing frames + audio track.

    Runs ``detect_image`` on frames sampled evenly from the video and,
    when requested and audio models are loaded, runs audio detection on
    the audio track extracted via ffmpeg. Returns separate results for
    video (frames) and audio, plus a combined probability.

    Args:
        video: Path to video file (mp4, avi, webm, etc.)
        num_frames: Number of frames to sample (default 8). Fewer frames
            are analyzed when the video has fewer distinct frames or when
            some frames cannot be decoded.
        analyze_audio: Also extract and analyze audio track (default True)

    Returns:
        {"is_ai": bool, "ai_probability": float, "confidence": float, "label": str,
         "video": {...frames analysis...},
         "audio": {...audio analysis or None...},
         "combined_ai_probability": float}

    Raises:
        RuntimeError: If the image models are not loaded.
        ValueError: If ``num_frames`` is < 1, the video cannot be opened,
            its frame count cannot be read, or no frame could be decoded.
    """
    if self._image_models is None:
        raise RuntimeError("Image models not loaded. Initialize with load_image=True")
    if num_frames < 1:
        # Fail fast with a clear message instead of the misleading
        # "Could not read any frames" produced by an empty sample.
        raise ValueError(f"num_frames must be >= 1, got {num_frames}")

    # Imported lazily so OpenCV is only required for video analysis.
    import cv2

    cap = cv2.VideoCapture(video)
    try:
        if not cap.isOpened():
            raise ValueError(f"Cannot open video: {video}")

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total_frames <= 0:
            raise ValueError(f"Cannot read frame count: {video}")

        # Sample from the middle 90% of the video; fall back to the full
        # range when the clip is too short for that window to be non-empty.
        start = int(total_frames * 0.05)
        end = int(total_frames * 0.95)
        if end <= start:
            start, end = 0, total_frames
        # Integer truncation yields repeated indices on short clips;
        # de-duplicate so no frame is analyzed (and weighted) twice.
        indices = np.unique(np.linspace(start, end - 1, num_frames, dtype=int))

        frame_results = []
        for idx in indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
            ret, frame = cap.read()
            if not ret:
                # Undecodable frame: skip it rather than abort the analysis.
                continue
            # OpenCV decodes to BGR; convert to RGB before building the PIL image.
            pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            frame_results.append(self.detect_image(pil_img))
    finally:
        # Always release the capture handle, including on error paths.
        cap.release()

    if not frame_results:
        raise ValueError(f"Could not read any frames from: {video}")

    # Frame-level verdict is a strict majority vote; the probability is the
    # mean of the per-frame probabilities.
    ai_count = sum(1 for r in frame_results if r["is_ai"])
    video_prob = float(np.mean([r["ai_probability"] for r in frame_results]))
    video_is_ai = ai_count > len(frame_results) / 2

    video_result = {
        "is_ai": video_is_ai,
        "ai_probability": round(video_prob, 4),
        "frames_analyzed": len(frame_results),
        "frames_ai": ai_count,
        "label": "AI-Generated" if video_is_ai else "Real",
        "details": {f"frame_{i}": round(r["ai_probability"], 4) for i, r in enumerate(frame_results)},
    }

    # Audio is best-effort: the helper returns None when extraction or
    # analysis fails, in which case only the frames drive the verdict.
    audio_result = None
    if analyze_audio and self._audio_models is not None:
        audio_result = self._extract_and_analyze_audio(video)

    # Equal-weight average of frame and audio probabilities when both exist.
    if audio_result is not None:
        audio_prob = audio_result["ai_probability"]
        combined_prob = float(0.5 * video_prob + 0.5 * audio_prob)
    else:
        combined_prob = video_prob

    is_ai = combined_prob > 0.5
    # Distance from the 0.5 decision boundary, rescaled to [0, 1].
    confidence = abs(combined_prob - 0.5) * 2

    return {
        "is_ai": is_ai,
        "ai_probability": round(combined_prob, 4),
        "confidence": round(confidence, 3),
        "label": "AI-Generated" if is_ai else "Real",
        "video": video_result,
        "audio": audio_result,
        "combined_ai_probability": round(combined_prob, 4),
    }
|
|
def _extract_and_analyze_audio(self, video_path: str) -> Optional[Dict]:
    """Extract audio track from video via ffmpeg and run audio detection.

    The track is written to a temporary mono 16 kHz WAV file, passed to
    ``detect_audio``, and the temporary file is removed afterwards.

    This is deliberately best-effort: any failure yields ``None`` so the
    caller can fall back to a frames-only verdict. Failures are logged
    instead of being silently discarded.

    Args:
        video_path: Path to the video file to read the audio track from.

    Returns:
        The ``detect_audio`` result dict, or ``None`` when ffmpeg fails or
        times out, the extracted file is smaller than 1000 bytes, or audio
        detection raises.
    """
    import logging
    import subprocess
    import tempfile

    log = logging.getLogger(__name__)

    wav_path = None
    try:
        # Reserve a unique path and close the handle right away so ffmpeg
        # can (over)write the file itself.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as handle:
            wav_path = handle.name

        try:
            proc = subprocess.run(
                ["ffmpeg", "-y", "-i", video_path, "-vn", "-ac", "1", "-ar", "16000", "-f", "wav", wav_path],
                stdin=subprocess.DEVNULL,  # keep ffmpeg from reading the parent's stdin
                capture_output=True,
                timeout=30,
            )
        except (OSError, subprocess.TimeoutExpired) as exc:
            # OSError covers a missing/unrunnable ffmpeg binary.
            log.warning("ffmpeg audio extraction failed for %s: %s", video_path, exc)
            return None

        if proc.returncode != 0:
            log.debug("ffmpeg exited with code %s for %s", proc.returncode, video_path)
            return None

        # Treat a near-empty output file as "no usable audio".
        if os.path.getsize(wav_path) < 1000:
            return None

        return self.detect_audio(wav_path)
    except Exception:
        # Keep the best-effort contract, but leave a trace of what went wrong.
        log.warning("Audio analysis failed for %s", video_path, exc_info=True)
        return None
    finally:
        if wav_path and os.path.exists(wav_path):
            os.unlink(wav_path)
|
|
def detect_video_batch(self, video_files: List[str], num_frames: int = 8, analyze_audio: bool = True) -> List[Dict]:
    """Batch process multiple videos.

    Videos are processed one after another with ``detect_video``.

    Args:
        video_files: Paths of the videos to analyze.
        num_frames: Number of frames to sample per video (default 8).
        analyze_audio: Forwarded to ``detect_video``; set to False to skip
            audio track analysis (default True).

    Returns:
        One ``detect_video`` result dict per input path, in input order.

    Raises:
        Whatever ``detect_video`` raises for the first failing video; no
        partial results are returned in that case.
    """
    return [
        self.detect_video(path, num_frames=num_frames, analyze_audio=analyze_audio)
        for path in video_files
    ]
|
|
| |
|
|
def unload(self, modality: str = "all") -> None:
    """Free GPU memory for a modality: 'image', 'audio', 'text', or 'all'

    Drops this object's references to the models of the selected modality
    by resetting the corresponding attributes to ``None``, then runs the
    garbage collector and asks PyTorch to release its cached CUDA memory.
    Any other ``modality`` value releases no models.

    Calling this repeatedly is safe: already-released attributes stay
    ``None`` rather than being deleted from the instance.

    Args:
        modality: Which models to release (default "all").
    """
    import gc

    if modality in ("image", "all"):
        self._image_models = None
        self._bombek_model = None

    if modality in ("audio", "all"):
        # Remove the model from each entry as well, so entries that are
        # still referenced elsewhere do not keep the model alive.
        for entry in self._audio_models or []:
            entry.pop("model", None)
        self._audio_models = None
        self._arena_pipe = None

    if modality in ("text", "all") and self._text_models:
        # Reset to None instead of `del` so later attribute access does not
        # raise AttributeError and the state matches the other modalities.
        self._observer = None
        self._performer = None
        self._roberta_clf = None
        self._fakespot_clf = None
        self._text_models = None

    # Collect unreachable objects (e.g. reference cycles) first so their
    # memory can actually be handed back by empty_cache().
    gc.collect()
    torch.cuda.empty_cache()
|
|
|
|
| |
if __name__ == "__main__":
    # Manual smoke test: loads the image and audio detectors (text is
    # skipped) and runs them on a few samples. The local sample directories
    # are optional; each check is skipped when its inputs are unavailable.
    print("=" * 60)
    print("AI Content Detector v3 - Stacking Ensemble Validation")
    print("=" * 60)

    detector = AIContentDetector(load_text=False)

    # 1) Single image from the local AI-generated sample directory.
    ai_dir = "/home/jupyter/ai-detection/image/ai_generated"
    if os.path.isdir(ai_dir):
        # Sorted so the same file is picked on every run.
        files = sorted(f for f in os.listdir(ai_dir) if f.endswith(".png"))
        if files:
            result = detector.detect_image(os.path.join(ai_dir, files[0]))
            print(f"\nImage test (AI-generated): {result['label']} (prob={result['ai_probability']}, conf={result['confidence']})")

    # 2) Batch of real photos from CIFAR-10. This needs the optional
    #    `datasets` package and possibly network access, so a failure here
    #    must not prevent the audio check below from running.
    try:
        from datasets import load_dataset
        ds = load_dataset("uoft-cs/cifar10", split="test[:5]")
    except Exception as exc:
        print(f"\nImage batch test skipped (could not load CIFAR-10): {exc}")
    else:
        results = detector.detect_images_batch([img["img"].resize((512, 512)) for img in ds])
        real_count = sum(1 for r in results if not r["is_ai"])
        total = len(results)
        print(f"Image batch ({total} real CIFAR-10): {real_count}/{total} correctly identified as Real")

    # 3) Single audio file from the local test directory, excluding files
    #    whose names contain "synth" or "real_speech_".
    audio_dir = "/home/jupyter/ai-detection/audio/test_audio"
    if os.path.isdir(audio_dir):
        wav_files = [
            f for f in sorted(os.listdir(audio_dir))
            if f.endswith(".wav") and "synth" not in f and "real_speech_" not in f
        ]
        if wav_files:
            result = detector.detect_audio(os.path.join(audio_dir, wav_files[0]))
            print(f"\nAudio test ({wav_files[0]}): {result['label']} (prob={result['ai_probability']})")

    print("\nDone! Import with: from detector import AIContentDetector")
|
|