# Hugging Face artifact header (uploader: pratik-250620, commit 358d3bc, verified)
# — converted to a comment so the module parses; not part of the original source.
"""
Calibrated Multimodal Semantic Coherence Index (cMSCI) Engine.
Replaces fixed weighted averaging (MSCI) with a principled pipeline:
1. Gramian Volume: geometric coherence of embedding vectors
2. Distribution Normalization: z-score calibration per channel
3. Contrastive Margin: comparison against hard negatives
4. Cross-Space Alignment: Ex-MCR projects CLAP→CLIP for 3-way GRAM
5. Probabilistic Uncertainty: MC sampling for confidence intervals
The CalibratedCoherenceEngine runs alongside CoherenceEngine (not replacing
it) and returns both legacy MSCI and new cMSCI scores for comparison.
Variant progression:
A: MSCI (baseline, weighted cosine average)
B: GRAM-only (geometric, no calibration)
C: GRAM + z-norm (normalized geometric)
D: GRAM + z-norm + contrastive (calibrated geometric)
E: GRAM + z-norm + contrastive + Ex-MCR (3-way calibrated)
F: Full cMSCI (probabilistic + calibrated + 3-way)
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional
import numpy as np
from src.coherence.gram_volume import (
gram_volume_2d,
gram_volume_3d,
gram_volume_nd,
normalized_gram_coherence,
)
from src.config.settings import (
CMSCI_MARGIN_ALPHA,
CMSCI_CHANNEL_WEIGHT_TI,
CMSCI_CALIBRATION_MODE,
CMSCI_W_3D,
CMSCI_GAMMA,
)
from src.embeddings.aligned_embeddings import AlignedEmbedder
from src.embeddings.similarity import cosine_similarity
logger = logging.getLogger(__name__)
class CalibratedCoherenceEngine:
    """
    Uncertainty-aware, geometrically-grounded tri-modal coherence engine.

    Computes the calibrated cMSCI score while also reporting the legacy
    MSCI score so the two can be compared side by side.

    Usage:
        engine = CalibratedCoherenceEngine()
        result = engine.evaluate("A beach at sunset", "beach.jpg", "waves.wav")
        print(result["cmsci"])           # Calibrated score
        print(result["msci"])            # Legacy score (for comparison)
        print(result["variant_scores"])  # Scores for each variant A-F
    """

    def __init__(
        self,
        target_dim: int = 512,
        calibration_path: Optional[str] = None,
        exmcr_weights_path: Optional[str] = None,
        bridge_path: Optional[str] = None,
        prob_clip_adapter_path: Optional[str] = None,
        prob_clap_adapter_path: Optional[str] = None,
        negative_bank_enabled: bool = True,
    ):
        """
        Build the engine, lazily loading each optional pipeline stage.

        Every artifact path is optional: a stage whose weights are absent
        is simply left as ``None`` and the corresponding cMSCI variant
        falls back to the previous one at evaluation time.

        Args:
            target_dim: Embedding dimensionality for the aligned embedder.
            calibration_path: Saved CalibrationStore (Phase 2 z-norm stats).
            exmcr_weights_path: Ex-MCR projector weights (Phase 3).
            bridge_path: CrossSpaceBridge weights (CLIP image + CLAP audio
                to a shared 256-d space).
            prob_clip_adapter_path: ProbVLM adapter for the CLIP channel.
            prob_clap_adapter_path: ProbVLM adapter for the CLAP channel.
            negative_bank_enabled: Whether to try loading the negative bank.
        """

        def _on_disk(path: Optional[str]) -> bool:
            # True only when a path was supplied AND it exists on disk.
            # (Truthiness first so an empty string never reaches Path().)
            return bool(path) and Path(path).exists()

        self.embedder = AlignedEmbedder(target_dim=target_dim)

        # Calibration store (Phase 2): per-channel normalization statistics.
        self._calibration = None
        if _on_disk(calibration_path):
            from src.coherence.calibration import CalibrationStore
            self._calibration = CalibrationStore.load(calibration_path)
            logger.info("Calibration loaded from %s", calibration_path)

        # Negative bank (Phase 2): best-effort — any failure just disables it.
        self._negative_bank = None
        if negative_bank_enabled:
            try:
                from src.coherence.negative_bank import NegativeBank
                self._negative_bank = NegativeBank()
            except Exception as exc:
                logger.warning("Negative bank disabled: %s", exc)

        # Ex-MCR projector (Phase 3 — projects CLAP into CLIP space).
        # Note: constructed from the path alone, without an existence check,
        # because the projector itself falls back to identity mode.
        self._exmcr = None
        if exmcr_weights_path:
            from src.embeddings.space_alignment import ExMCRProjector
            self._exmcr = ExMCRProjector(weights_path=exmcr_weights_path)
            mode_msg = (
                "Ex-MCR in identity mode (no weights)"
                if self._exmcr.is_identity
                else "Ex-MCR projector active"
            )
            logger.info(mode_msg)

        # Cross-Space Bridge (projects CLIP image + CLAP audio -> shared 256-d).
        self._bridge = None
        if _on_disk(bridge_path):
            from src.embeddings.cross_space_bridge import CrossSpaceBridge
            self._bridge = CrossSpaceBridge.load(bridge_path)
            logger.info("CrossSpaceBridge loaded from %s", bridge_path)

        # Probabilistic adapters (Phase 4): one per embedding space.
        self._prob_clip = None
        self._prob_clap = None
        if _on_disk(prob_clip_adapter_path):
            from src.embeddings.probabilistic_adapter import ProbabilisticAdapter
            self._prob_clip = ProbabilisticAdapter.load(prob_clip_adapter_path)
            logger.info("CLIP probabilistic adapter loaded")
        if _on_disk(prob_clap_adapter_path):
            from src.embeddings.probabilistic_adapter import ProbabilisticAdapter
            self._prob_clap = ProbabilisticAdapter.load(prob_clap_adapter_path)
            logger.info("CLAP probabilistic adapter loaded")
    def evaluate(
        self,
        text: str,
        image_path: Optional[str] = None,
        audio_path: Optional[str] = None,
        domain: str = "",
        n_mc_samples: int = 100,
    ) -> Dict[str, Any]:
        """
        Evaluate multimodal coherence with full cMSCI pipeline.

        Returns both legacy MSCI and cMSCI scores along with all
        intermediate computations for ablation analysis. Each variant
        (A-F) builds on the previous one and silently falls back to it
        when its required component (calibration, negative bank, Ex-MCR,
        probabilistic adapters) is not loaded.

        Args:
            text: Text prompt.
            image_path: Path to image file.
            audio_path: Path to audio file.
            domain: Domain hint for negative bank (e.g., "nature").
            n_mc_samples: Number of MC samples for uncertainty.

        Returns:
            Dict with keys:
                msci: Legacy MSCI score (weighted cosine average)
                cmsci: Calibrated cMSCI score
                active_variant: Letter of the variant that produced cmsci
                scores: Raw pairwise scores (st_i, st_a, si_a)
                gram: Gramian volume scores
                calibration: Z-normalized scores and pipeline hyperparameters
                contrastive: Contrastive margin results
                uncertainty: MC sampling uncertainty (if adapters loaded)
                variant_scores: Scores for each variant A-F
        """
        # ── Embed ──────────────────────────────────────────────
        # The CLAP-space text embedding is only computed when audio is
        # supplied, since it is used solely for the text-audio channel.
        emb_text_clip = self.embedder.embed_text(text)
        emb_text_clap = self.embedder.embed_text_for_audio(text) if audio_path else None
        emb_image = self.embedder.embed_image(image_path) if image_path else None
        emb_audio = self.embedder.embed_audio(audio_path) if audio_path else None
        # ── Legacy MSCI (Variant A) ────────────────────────────
        st_i = None  # text-image cosine (CLIP space)
        st_a = None  # text-audio cosine (CLAP space)
        si_a = None  # image-audio cosine; only filled later if Ex-MCR projects audio
        if emb_text_clip is not None and emb_image is not None:
            st_i = float(round(cosine_similarity(emb_text_clip, emb_image), 4))
        if emb_text_clap is not None and emb_audio is not None:
            st_a = float(round(cosine_similarity(emb_text_clap, emb_audio), 4))
        available = {}
        if st_i is not None:
            available["st_i"] = st_i
        if st_a is not None:
            available["st_a"] = st_a
        # Fixed legacy weights. NOTE(review): "si_a" is never placed in
        # `available`, so its 0.10 weight is dead here — kept only to
        # document the original MSCI weighting scheme.
        weights = {"st_i": 0.45, "st_a": 0.45, "si_a": 0.10}
        if len(available) >= 2:
            # Weighted average renormalized over the channels present.
            total_w = sum(weights[k] for k in available if k in weights)
            msci = sum(available[k] * weights[k] for k in available if k in weights) / max(total_w, 1e-6)
        elif len(available) == 1:
            msci = list(available.values())[0]
        else:
            msci = None
        variant_a = msci
        # ── Gramian Volume (Variant B) ─────────────────────────
        gram_ti = None
        gram_ta = None
        gram_tia = None
        gram_coherence_2way = None
        if emb_text_clip is not None and emb_image is not None:
            gram_ti = gram_volume_2d(emb_text_clip, emb_image)
        if emb_text_clap is not None and emb_audio is not None:
            gram_ta = gram_volume_2d(emb_text_clap, emb_audio)
        # 2-way GRAM coherence (average of text-image and text-audio gram coherences)
        gram_coherences = []
        if gram_ti is not None:
            gram_coherences.append(normalized_gram_coherence(gram_ti))
        if gram_ta is not None:
            gram_coherences.append(normalized_gram_coherence(gram_ta))
        if gram_coherences:
            gram_coherence_2way = float(np.mean(gram_coherences))
        variant_b = gram_coherence_2way
        # ── Z-Score Normalization (Variant C) ──────────────────
        z_st_i = None
        z_st_a = None
        z_gram_ti = None
        z_gram_ta = None
        variant_c = variant_b  # default to B if no calibration
        # Channel weight from settings (optimized via LOO-CV)
        w_ti = CMSCI_CHANNEL_WEIGHT_TI
        cal_mode = CMSCI_CALIBRATION_MODE
        if self._calibration is not None:
            # NOTE(review): normalize() is assumed to return a z-score for
            # the named channel — confirm against CalibrationStore.
            if st_i is not None:
                z_st_i = self._calibration.normalize("st_i", st_i)
            if st_a is not None:
                z_st_a = self._calibration.normalize("st_a", st_a)
            # GRAM coherence z-scores (for gram calibration mode)
            if gram_ti is not None:
                gram_coh_ti = normalized_gram_coherence(gram_ti)
                z_gram_ti = self._calibration.normalize("gram_coh_ti", gram_coh_ti)
            if gram_ta is not None:
                gram_coh_ta = normalized_gram_coherence(gram_ta)
                z_gram_ta = self._calibration.normalize("gram_coh_ta", gram_coh_ta)
            # Select calibration mode: cosine z-scores or gram coherence z-scores
            if cal_mode == "gram" and z_gram_ti is not None and z_gram_ta is not None:
                z_mean = w_ti * z_gram_ti + (1.0 - w_ti) * z_gram_ta
            else:
                # Cosine mode (original behavior) with weighted channels
                z_coherences = []
                z_weights = []
                if z_st_i is not None:
                    z_coherences.append(z_st_i)
                    z_weights.append(w_ti)
                if z_st_a is not None:
                    z_coherences.append(z_st_a)
                    z_weights.append(1.0 - w_ti)
                if z_coherences:
                    total_w = sum(z_weights)
                    z_mean = sum(z * wt for z, wt in zip(z_coherences, z_weights)) / total_w
                else:
                    z_mean = None
            if z_mean is not None:
                # Map z-scores back to [0,1] via sigmoid for interpretability
                variant_c = float(1.0 / (1.0 + np.exp(-z_mean)))
        # ── Contrastive Margin (Variant D) ─────────────────────
        contrastive_result = None
        variant_d = variant_c  # default to C if no negatives
        margin_alpha = CMSCI_MARGIN_ALPHA
        if self._negative_bank is not None and gram_coherence_2way is not None:
            # At least one of gram_ti / gram_ta is non-None here, because
            # gram_coherence_2way was derived from them above.
            matched_volume = float(np.mean([v for v in [gram_ti, gram_ta] if v is not None]))
            contrastive_result = self._negative_bank.compute_contrastive_margin(
                matched_volume=matched_volume,
                text_clip_emb=emb_text_clip,
                image_emb=emb_image,
                text_clap_emb=emb_text_clap,
                audio_emb=emb_audio,
                domain=domain,
                k=5,
            )
            if contrastive_result["n_negatives"] > 0:
                # cMSCI_D = sigmoid(z_mean + alpha * margin)
                # alpha amplifies the contrastive signal at the sigmoid operating point
                margin = contrastive_result["margin"]
                # Use the same calibration mode and weighting as Variant C
                if cal_mode == "gram" and z_gram_ti is not None and z_gram_ta is not None:
                    z_mean_d = w_ti * z_gram_ti + (1.0 - w_ti) * z_gram_ta
                else:
                    # Unlike Variant C, raw cosines stand in for a channel's
                    # z-score when calibration did not cover it (elif arms).
                    z_coherences_d = []
                    z_weights_d = []
                    if z_st_i is not None:
                        z_coherences_d.append(z_st_i)
                        z_weights_d.append(w_ti)
                    elif st_i is not None:
                        z_coherences_d.append(st_i)
                        z_weights_d.append(w_ti)
                    if z_st_a is not None:
                        z_coherences_d.append(z_st_a)
                        z_weights_d.append(1.0 - w_ti)
                    elif st_a is not None:
                        z_coherences_d.append(st_a)
                        z_weights_d.append(1.0 - w_ti)
                    if z_coherences_d:
                        total_wd = sum(z_weights_d)
                        z_mean_d = sum(z * wt for z, wt in zip(z_coherences_d, z_weights_d)) / total_wd
                    else:
                        z_mean_d = None
                if z_mean_d is not None:
                    variant_d = float(1.0 / (1.0 + np.exp(-(z_mean_d + margin_alpha * margin))))
                else:
                    variant_d = variant_c
        # ── Cross-Space Complementarity — Variant E ──────────
        # COMPLEMENTARITY: E = sigmoid(z_2d + w_3d * z_compl + alpha * margin)
        # ExMCR projects CLAP audio → CLIP space, enabling measurement of
        # image-audio complementarity (Gramian dispersion in unified space).
        # High complementarity = image and audio contribute unique perspectives.
        # Low complementarity = redundant cross-modal information.
        # z_compl = z_normalize(gram_volume_ia) — positive z = more complementary.
        # w_3d=0 recovers D exactly (safety guarantee).
        audio_projected = None
        variant_e = variant_d  # default to D if no projector
        z_compl = None  # z-normalized complementarity (exported for optimizer)
        gram_ia_volume = None  # raw image-audio Gramian volume
        w_3d = CMSCI_W_3D
        # Reconstruct D's pre-margin z-score (z_2d) for composition
        z_2d = None
        margin = 0.0
        if contrastive_result is not None and contrastive_result["n_negatives"] > 0:
            margin = contrastive_result["margin"]
        # NOTE(review): the cosine fallback below requires BOTH z-scores,
        # whereas D accepted single channels and raw cosines — confirm the
        # asymmetry is intentional.
        if cal_mode == "gram" and z_gram_ti is not None and z_gram_ta is not None:
            z_2d = w_ti * z_gram_ti + (1.0 - w_ti) * z_gram_ta
        elif z_st_i is not None and z_st_a is not None:
            z_2d = w_ti * z_st_i + (1.0 - w_ti) * z_st_a
        # Project audio into CLIP space via ExMCR and compute complementarity
        if self._exmcr is not None and not self._exmcr.is_identity:
            if emb_audio is not None:
                audio_projected = self._exmcr.project_audio(emb_audio)
                if emb_image is not None:
                    si_a = float(round(cosine_similarity(emb_image, audio_projected), 4))
                    # Image-audio Gramian volume = dispersion = complementarity
                    gram_ia_volume = gram_volume_2d(emb_image, audio_projected)
            if emb_text_clip is not None and emb_image is not None and audio_projected is not None:
                gram_tia = gram_volume_3d(emb_text_clip, emb_image, audio_projected)
        # Z-normalize complementarity (volume, NOT coherence)
        # z_compl = -z_gram_ia_coherence (flipped: high volume = high complementarity)
        # NOTE(review): despite the banner above, the z-score is computed on
        # the *coherence* and then negated; the net effect still maps higher
        # volume to higher z_compl.
        if gram_ia_volume is not None and self._calibration is not None:
            gram_ia_coherence = normalized_gram_coherence(gram_ia_volume)
            z_gram_ia_coh = self._calibration.normalize("gram_coh_ia_exmcr", gram_ia_coherence)
            z_compl = -z_gram_ia_coh  # flip: positive = more complementary
        # Compose: E = sigmoid(z_2d + w_3d * z_compl + alpha * margin)
        if z_2d is not None:
            logit_e = z_2d + margin_alpha * margin
            if z_compl is not None:
                logit_e += w_3d * z_compl
            variant_e = float(1.0 / (1.0 + np.exp(-logit_e)))
        # ── Probabilistic Adaptive Weighting (Variant F) ──────
        # ProbVLM drives per-sample channel weights instead of fixed w_ti.
        # adaptive_w = (1/u_ti) / (1/u_ti + 1/u_ta) — trust more confident channel
        # w_ti_final = (1 - gamma) * base_w + gamma * adaptive_w
        # gamma=0 → w_ti_final = base_w → recovers E exactly (safety guarantee)
        # MC sampling remains metadata only (confidence intervals, not scoring).
        uncertainty_result = None
        variant_f = variant_e  # default to E
        u_ti = None  # per-channel uncertainty (exported for optimizer)
        u_ta = None
        adaptive_w_ti = None
        gamma = CMSCI_GAMMA
        if self._prob_clip is not None or self._prob_clap is not None:
            mc_volumes = []
            # Per-channel uncertainty from ProbVLM adapters
            if self._prob_clip is not None and emb_text_clip is not None and emb_image is not None:
                u_text_clip = self._prob_clip.uncertainty(emb_text_clip)
                u_image_clip = self._prob_clip.uncertainty(emb_image)
                u_ti = float(np.mean([u_text_clip, u_image_clip]))
                # MC samples for confidence interval metadata
                text_samples = self._prob_clip.sample(emb_text_clip, n_mc_samples)
                image_samples = self._prob_clip.sample(emb_image, n_mc_samples)
                for t_s, i_s in zip(text_samples, image_samples):
                    mc_volumes.append(gram_volume_2d(t_s, i_s))
            if self._prob_clap is not None and emb_text_clap is not None and emb_audio is not None:
                u_text_clap = self._prob_clap.uncertainty(emb_text_clap)
                u_audio_clap = self._prob_clap.uncertainty(emb_audio)
                u_ta = float(np.mean([u_text_clap, u_audio_clap]))
                text_samples = self._prob_clap.sample(emb_text_clap, n_mc_samples)
                audio_samples = self._prob_clap.sample(emb_audio, n_mc_samples)
                for t_s, a_s in zip(text_samples, audio_samples):
                    mc_volumes.append(gram_volume_2d(t_s, a_s))
            # Compute adaptive channel weight from uncertainty
            # (inverse-uncertainty weighting, blended with the base weight)
            if u_ti is not None and u_ta is not None and u_ti > 0 and u_ta > 0 and gamma > 0:
                inv_ti = 1.0 / u_ti
                inv_ta = 1.0 / u_ta
                adaptive_w = inv_ti / (inv_ti + inv_ta)
                w_ti_final = (1.0 - gamma) * w_ti + gamma * adaptive_w
                adaptive_w_ti = float(w_ti_final)
                # Recompute z_2d with adaptive weights
                if cal_mode == "gram" and z_gram_ti is not None and z_gram_ta is not None:
                    z_2d_adaptive = w_ti_final * z_gram_ti + (1.0 - w_ti_final) * z_gram_ta
                elif z_st_i is not None and z_st_a is not None:
                    z_2d_adaptive = w_ti_final * z_st_i + (1.0 - w_ti_final) * z_st_a
                else:
                    z_2d_adaptive = None
                if z_2d_adaptive is not None:
                    logit_f = z_2d_adaptive + margin_alpha * margin
                    if z_compl is not None:
                        logit_f += w_3d * z_compl
                    variant_f = float(1.0 / (1.0 + np.exp(-logit_f)))
            # MC sampling for confidence intervals (metadata, NOT scoring)
            if mc_volumes:
                mc_coherences = [normalized_gram_coherence(v) for v in mc_volumes]
                mc_mean = float(np.mean(mc_coherences))
                mc_std = float(np.std(mc_coherences))
                mc_ci_lower = float(np.percentile(mc_coherences, 2.5))
                mc_ci_upper = float(np.percentile(mc_coherences, 97.5))
            else:
                mc_mean = mc_std = mc_ci_lower = mc_ci_upper = None
            uncertainty_result = {
                "mc_mean": round(mc_mean, 4) if mc_mean is not None else None,
                "mc_std": round(mc_std, 4) if mc_std is not None else None,
                "mc_ci_lower": round(mc_ci_lower, 4) if mc_ci_lower is not None else None,
                "mc_ci_upper": round(mc_ci_upper, 4) if mc_ci_upper is not None else None,
                "u_ti": round(u_ti, 6) if u_ti is not None else None,
                "u_ta": round(u_ta, 6) if u_ta is not None else None,
                "adaptive_w_ti": round(adaptive_w_ti, 4) if adaptive_w_ti is not None else None,
                "gamma": gamma,
                "n_samples": n_mc_samples,
            }
        # ── Assemble cMSCI ─────────────────────────────────────
        # cMSCI is the highest available variant.
        # NOTE(review): equality between consecutive variants is used as a
        # proxy for "that stage was inactive"; a genuine coincidence of
        # scores would mislabel active_variant (cosmetic only — cmsci itself
        # is unaffected).
        cmsci = variant_f
        active_variant = "F"
        if variant_f == variant_e:
            active_variant = "E" if variant_e != variant_d else "D"
            if variant_e == variant_d:
                active_variant = "D" if variant_d != variant_c else "C"
                if variant_d == variant_c:
                    active_variant = "C" if variant_c != variant_b else "B"
                    if variant_c == variant_b:
                        active_variant = "B" if variant_b is not None else "A"
        # Final cMSCI: use the most sophisticated available variant
        if cmsci is None:
            cmsci = msci  # fallback to legacy
            active_variant = "A"
        logger.info(
            "cMSCI = %.4f (variant %s) | MSCI = %s",
            cmsci if cmsci is not None else 0.0,
            active_variant,
            msci,
        )
        # NOTE(review): self._bridge is loaded in __init__ but never consulted
        # in this method — confirm whether bridge scoring is still pending.
        return {
            "cmsci": round(cmsci, 4) if cmsci is not None else None,
            "msci": round(msci, 4) if msci is not None else None,
            "active_variant": active_variant,
            "scores": {
                "st_i": st_i,
                "st_a": st_a,
                "si_a": si_a,
            },
            "gram": {
                "text_image": round(gram_ti, 4) if gram_ti is not None else None,
                "text_audio": round(gram_ta, 4) if gram_ta is not None else None,
                "text_image_audio": round(gram_tia, 4) if gram_tia is not None else None,
                "coherence_2way": round(gram_coherence_2way, 4) if gram_coherence_2way is not None else None,
            },
            "calibration": {
                "z_st_i": round(z_st_i, 4) if z_st_i is not None else None,
                "z_st_a": round(z_st_a, 4) if z_st_a is not None else None,
                "z_gram_ti": round(z_gram_ti, 4) if z_gram_ti is not None else None,
                "z_gram_ta": round(z_gram_ta, 4) if z_gram_ta is not None else None,
                "z_compl": round(z_compl, 4) if z_compl is not None else None,
                "gram_ia_volume": round(gram_ia_volume, 4) if gram_ia_volume is not None else None,
                "u_ti": round(u_ti, 6) if u_ti is not None else None,
                "u_ta": round(u_ta, 6) if u_ta is not None else None,
                "adaptive_w_ti": round(adaptive_w_ti, 4) if adaptive_w_ti is not None else None,
                "cal_mode": cal_mode if self._calibration is not None else None,
                "w_ti": w_ti,
                "w_3d": w_3d,
                "gamma": gamma,
                "margin_alpha": CMSCI_MARGIN_ALPHA if contrastive_result else None,
            },
            "contrastive": contrastive_result,
            "uncertainty": uncertainty_result,
            "variant_scores": {
                "A_msci": round(variant_a, 4) if variant_a is not None else None,
                "B_gram": round(variant_b, 4) if variant_b is not None else None,
                "C_gram_znorm": round(variant_c, 4) if variant_c is not None else None,
                "D_gram_znorm_contrastive": round(variant_d, 4) if variant_d is not None else None,
                "E_gram_znorm_contrastive_exmcr": round(variant_e, 4) if variant_e is not None else None,
                "F_full_cmsci": round(variant_f, 4) if variant_f is not None else None,
            },
        }
def evaluate_batch(
self,
items: List[Dict[str, str]],
n_mc_samples: int = 100,
) -> List[Dict[str, Any]]:
"""
Evaluate a batch of (text, image_path, audio_path) triples.
Args:
items: List of dicts with keys "text", "image_path", "audio_path", "domain".
n_mc_samples: MC samples per item.
Returns:
List of result dicts from evaluate().
"""
results = []
for item in items:
result = self.evaluate(
text=item.get("text", ""),
image_path=item.get("image_path"),
audio_path=item.get("audio_path"),
domain=item.get("domain", ""),
n_mc_samples=n_mc_samples,
)
results.append(result)
return results