| | """ |
| | Calibrated Multimodal Semantic Coherence Index (cMSCI) Engine. |
| | |
| | Replaces fixed weighted averaging (MSCI) with a principled pipeline: |
| | 1. Gramian Volume: geometric coherence of embedding vectors |
| | 2. Distribution Normalization: z-score calibration per channel |
| | 3. Contrastive Margin: comparison against hard negatives |
4. Cross-Space Alignment: Ex-MCR projects CLAP -> CLIP for 3-way GRAM
| | 5. Probabilistic Uncertainty: MC sampling for confidence intervals |
| | |
| | The CalibratedCoherenceEngine runs alongside CoherenceEngine (not replacing |
| | it) and returns both legacy MSCI and new cMSCI scores for comparison. |
| | |
| | Variant progression: |
| | A: MSCI (baseline, weighted cosine average) |
| | B: GRAM-only (geometric, no calibration) |
| | C: GRAM + z-norm (normalized geometric) |
| | D: GRAM + z-norm + contrastive (calibrated geometric) |
| | E: GRAM + z-norm + contrastive + Ex-MCR (3-way calibrated) |
| | F: Full cMSCI (probabilistic + calibrated + 3-way) |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import logging |
| | from pathlib import Path |
| | from typing import Any, Dict, List, Optional |
| |
|
| | import numpy as np |
| |
|
| | from src.coherence.gram_volume import ( |
| | gram_volume_2d, |
| | gram_volume_3d, |
| | gram_volume_nd, |
| | normalized_gram_coherence, |
| | ) |
| | from src.config.settings import ( |
| | CMSCI_MARGIN_ALPHA, |
| | CMSCI_CHANNEL_WEIGHT_TI, |
| | CMSCI_CALIBRATION_MODE, |
| | CMSCI_W_3D, |
| | CMSCI_GAMMA, |
| | ) |
| | from src.embeddings.aligned_embeddings import AlignedEmbedder |
| | from src.embeddings.similarity import cosine_similarity |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
class CalibratedCoherenceEngine:
    """
    Uncertainty-aware, geometrically-grounded tri-modal coherence engine.

    Computes cMSCI alongside legacy MSCI for comparison.

    Usage:
        engine = CalibratedCoherenceEngine()
        result = engine.evaluate("A beach at sunset", "beach.jpg", "waves.wav")
        print(result["cmsci"])  # Calibrated score
        print(result["msci"])  # Legacy score (for comparison)
        print(result["variant_scores"])  # Scores for each variant A-F
    """

    def __init__(
        self,
        target_dim: int = 512,
        calibration_path: Optional[str] = None,
        exmcr_weights_path: Optional[str] = None,
        bridge_path: Optional[str] = None,
        prob_clip_adapter_path: Optional[str] = None,
        prob_clap_adapter_path: Optional[str] = None,
        negative_bank_enabled: bool = True,
    ):
        """
        Build the engine, loading each optional pipeline stage only when its
        artifact is available. Missing stages degrade gracefully: evaluate()
        falls back to earlier variants (see module docstring).

        Args:
            target_dim: Common embedding dimension for the aligned embedder.
            calibration_path: Path to a saved CalibrationStore (z-norm stats).
            exmcr_weights_path: Path to Ex-MCR projector weights (CLAP->CLIP).
            bridge_path: Path to a saved CrossSpaceBridge.
            prob_clip_adapter_path: Path to a CLIP ProbabilisticAdapter.
            prob_clap_adapter_path: Path to a CLAP ProbabilisticAdapter.
            negative_bank_enabled: Whether to attempt loading the negative bank.
        """
        self.embedder = AlignedEmbedder(target_dim=target_dim)

        # Calibration store (variant C+): per-channel z-score statistics.
        self._calibration = None
        if calibration_path and Path(calibration_path).exists():
            from src.coherence.calibration import CalibrationStore
            self._calibration = CalibrationStore.load(calibration_path)
            logger.info("Calibration loaded from %s", calibration_path)

        # Hard-negative bank (variant D+): source of contrastive margins.
        self._negative_bank = None
        if negative_bank_enabled:
            try:
                from src.coherence.negative_bank import NegativeBank
                self._negative_bank = NegativeBank()
            except Exception as e:
                # Best-effort: a missing/broken bank disables variant D only.
                logger.warning("Negative bank disabled: %s", e)

        # Ex-MCR projector (variant E+): maps CLAP audio into CLIP space.
        self._exmcr = None
        if exmcr_weights_path:
            from src.embeddings.space_alignment import ExMCRProjector
            self._exmcr = ExMCRProjector(weights_path=exmcr_weights_path)
            if self._exmcr.is_identity:
                logger.info("Ex-MCR in identity mode (no weights)")
            else:
                logger.info("Ex-MCR projector active")

        # Cross-space bridge: loaded when present; evaluate() does not
        # reference it directly (kept for downstream consumers).
        self._bridge = None
        if bridge_path and Path(bridge_path).exists():
            from src.embeddings.cross_space_bridge import CrossSpaceBridge
            self._bridge = CrossSpaceBridge.load(bridge_path)
            logger.info("CrossSpaceBridge loaded from %s", bridge_path)

        # Probabilistic adapters (variant F): MC uncertainty sampling.
        self._prob_clip = None
        self._prob_clap = None
        if prob_clip_adapter_path and Path(prob_clip_adapter_path).exists():
            from src.embeddings.probabilistic_adapter import ProbabilisticAdapter
            self._prob_clip = ProbabilisticAdapter.load(prob_clip_adapter_path)
            logger.info("CLIP probabilistic adapter loaded")
        if prob_clap_adapter_path and Path(prob_clap_adapter_path).exists():
            from src.embeddings.probabilistic_adapter import ProbabilisticAdapter
            self._prob_clap = ProbabilisticAdapter.load(prob_clap_adapter_path)
            logger.info("CLAP probabilistic adapter loaded")

    def evaluate(
        self,
        text: str,
        image_path: Optional[str] = None,
        audio_path: Optional[str] = None,
        domain: str = "",
        n_mc_samples: int = 100,
    ) -> Dict[str, Any]:
        """
        Evaluate multimodal coherence with full cMSCI pipeline.

        Returns both legacy MSCI and cMSCI scores along with all
        intermediate computations for ablation analysis.

        Args:
            text: Text prompt.
            image_path: Path to image file.
            audio_path: Path to audio file.
            domain: Domain hint for negative bank (e.g., "nature").
            n_mc_samples: Number of MC samples for uncertainty.

        Returns:
            Dict with keys:
                msci: Legacy MSCI score (weighted cosine average)
                cmsci: Calibrated cMSCI score
                scores: Raw pairwise scores (st_i, st_a, si_a)
                gram: Gramian volume scores
                calibration: Z-normalized scores
                contrastive: Contrastive margin results
                uncertainty: MC sampling uncertainty (if adapters loaded)
                variant_scores: Scores for each variant A-F
        """
        # --- Embeddings --------------------------------------------------
        # CLAP text embedding is only needed when audio participates.
        emb_text_clip = self.embedder.embed_text(text)
        emb_text_clap = self.embedder.embed_text_for_audio(text) if audio_path else None
        emb_image = self.embedder.embed_image(image_path) if image_path else None
        emb_audio = self.embedder.embed_audio(audio_path) if audio_path else None

        # --- Pairwise cosine scores --------------------------------------
        st_i = None
        st_a = None
        si_a = None  # filled later by the Ex-MCR stage, if active

        if emb_text_clip is not None and emb_image is not None:
            st_i = float(round(cosine_similarity(emb_text_clip, emb_image), 4))
        if emb_text_clap is not None and emb_audio is not None:
            st_a = float(round(cosine_similarity(emb_text_clap, emb_audio), 4))

        available = {}
        if st_i is not None:
            available["st_i"] = st_i
        if st_a is not None:
            available["st_a"] = st_a

        # --- Variant A: legacy MSCI (weighted cosine average) ------------
        # NOTE: si_a carries a nominal weight but is never in `available`
        # at this point (it is computed later by Ex-MCR), so the legacy
        # average only ever blends st_i and st_a.
        weights = {"st_i": 0.45, "st_a": 0.45, "si_a": 0.10}
        if len(available) >= 2:
            total_w = sum(weights[k] for k in available if k in weights)
            msci = sum(available[k] * weights[k] for k in available if k in weights) / max(total_w, 1e-6)
        elif len(available) == 1:
            msci = list(available.values())[0]
        else:
            msci = None

        variant_a = msci

        # --- Variant B: Gramian volume coherence -------------------------
        gram_ti = None
        gram_ta = None
        gram_tia = None
        gram_coherence_2way = None

        if emb_text_clip is not None and emb_image is not None:
            gram_ti = gram_volume_2d(emb_text_clip, emb_image)

        if emb_text_clap is not None and emb_audio is not None:
            gram_ta = gram_volume_2d(emb_text_clap, emb_audio)

        gram_coherences = []
        if gram_ti is not None:
            gram_coherences.append(normalized_gram_coherence(gram_ti))
        if gram_ta is not None:
            gram_coherences.append(normalized_gram_coherence(gram_ta))

        if gram_coherences:
            gram_coherence_2way = float(np.mean(gram_coherences))

        variant_b = gram_coherence_2way

        # --- Variant C: z-normalized (calibrated) coherence --------------
        z_st_i = None
        z_st_a = None
        z_gram_ti = None
        z_gram_ta = None
        variant_c = variant_b

        w_ti = CMSCI_CHANNEL_WEIGHT_TI
        cal_mode = CMSCI_CALIBRATION_MODE

        if self._calibration is not None:
            if st_i is not None:
                z_st_i = self._calibration.normalize("st_i", st_i)
            if st_a is not None:
                z_st_a = self._calibration.normalize("st_a", st_a)

            if gram_ti is not None:
                gram_coh_ti = normalized_gram_coherence(gram_ti)
                z_gram_ti = self._calibration.normalize("gram_coh_ti", gram_coh_ti)
            if gram_ta is not None:
                gram_coh_ta = normalized_gram_coherence(gram_ta)
                z_gram_ta = self._calibration.normalize("gram_coh_ta", gram_coh_ta)

        # Blend channels on gram-coherence z-scores ("gram" mode) or on
        # cosine z-scores. Without a calibration store every z-value is
        # None and z_mean stays None, leaving variant C == variant B.
        if cal_mode == "gram" and z_gram_ti is not None and z_gram_ta is not None:
            z_mean = w_ti * z_gram_ti + (1.0 - w_ti) * z_gram_ta
        else:
            z_coherences = []
            z_weights = []
            if z_st_i is not None:
                z_coherences.append(z_st_i)
                z_weights.append(w_ti)
            if z_st_a is not None:
                z_coherences.append(z_st_a)
                z_weights.append(1.0 - w_ti)

            if z_coherences:
                total_w = sum(z_weights)
                z_mean = sum(z * wt for z, wt in zip(z_coherences, z_weights)) / total_w
            else:
                z_mean = None

        if z_mean is not None:
            # Squash the z-score back to (0, 1) via the logistic function.
            variant_c = float(1.0 / (1.0 + np.exp(-z_mean)))

        # --- Variant D: contrastive margin -------------------------------
        contrastive_result = None
        variant_d = variant_c
        margin_alpha = CMSCI_MARGIN_ALPHA

        if self._negative_bank is not None and gram_coherence_2way is not None:
            # gram_coherence_2way is not None implies at least one volume exists.
            matched_volume = float(np.mean([v for v in [gram_ti, gram_ta] if v is not None]))
            contrastive_result = self._negative_bank.compute_contrastive_margin(
                matched_volume=matched_volume,
                text_clip_emb=emb_text_clip,
                image_emb=emb_image,
                text_clap_emb=emb_text_clap,
                audio_emb=emb_audio,
                domain=domain,
                k=5,
            )

        if contrastive_result is not None and contrastive_result["n_negatives"] > 0:
            margin = contrastive_result["margin"]

            # Recompute the 2-channel z-mean, falling back to the RAW
            # cosine for any channel missing calibration stats.
            if cal_mode == "gram" and z_gram_ti is not None and z_gram_ta is not None:
                z_mean_d = w_ti * z_gram_ti + (1.0 - w_ti) * z_gram_ta
            else:
                z_coherences_d = []
                z_weights_d = []
                if z_st_i is not None:
                    z_coherences_d.append(z_st_i)
                    z_weights_d.append(w_ti)
                elif st_i is not None:
                    z_coherences_d.append(st_i)
                    z_weights_d.append(w_ti)
                if z_st_a is not None:
                    z_coherences_d.append(z_st_a)
                    z_weights_d.append(1.0 - w_ti)
                elif st_a is not None:
                    z_coherences_d.append(st_a)
                    z_weights_d.append(1.0 - w_ti)

                if z_coherences_d:
                    total_wd = sum(z_weights_d)
                    z_mean_d = sum(z * wt for z, wt in zip(z_coherences_d, z_weights_d)) / total_wd
                else:
                    z_mean_d = None

            if z_mean_d is not None:
                # Margin shifts the logit before the logistic squash.
                variant_d = float(1.0 / (1.0 + np.exp(-(z_mean_d + margin_alpha * margin))))
            else:
                variant_d = variant_c

        # --- Variant E: Ex-MCR 3-way alignment ---------------------------
        audio_projected = None
        variant_e = variant_d
        z_compl = None
        gram_ia_volume = None
        w_3d = CMSCI_W_3D

        # Shared 2D logit components, reused by variants E and F.
        z_2d = None
        margin = 0.0
        if contrastive_result is not None and contrastive_result["n_negatives"] > 0:
            margin = contrastive_result["margin"]
        if cal_mode == "gram" and z_gram_ti is not None and z_gram_ta is not None:
            z_2d = w_ti * z_gram_ti + (1.0 - w_ti) * z_gram_ta
        elif z_st_i is not None and z_st_a is not None:
            z_2d = w_ti * z_st_i + (1.0 - w_ti) * z_st_a

        if self._exmcr is not None and not self._exmcr.is_identity:
            if emb_audio is not None:
                audio_projected = self._exmcr.project_audio(emb_audio)
                if emb_image is not None:
                    si_a = float(round(cosine_similarity(emb_image, audio_projected), 4))
                    gram_ia_volume = gram_volume_2d(emb_image, audio_projected)
            if emb_text_clip is not None and emb_image is not None and audio_projected is not None:
                gram_tia = gram_volume_3d(emb_text_clip, emb_image, audio_projected)

        # Image-audio coherence enters as a NEGATED z-score
        # (complementarity term) in the 3-way logit.
        if gram_ia_volume is not None and self._calibration is not None:
            gram_ia_coherence = normalized_gram_coherence(gram_ia_volume)
            z_gram_ia_coh = self._calibration.normalize("gram_coh_ia_exmcr", gram_ia_coherence)
            z_compl = -z_gram_ia_coh

        if z_2d is not None:
            logit_e = z_2d + margin_alpha * margin
            if z_compl is not None:
                logit_e += w_3d * z_compl
            variant_e = float(1.0 / (1.0 + np.exp(-logit_e)))

        # --- Variant F: probabilistic uncertainty (full cMSCI) -----------
        uncertainty_result = None
        variant_f = variant_e
        u_ti = None
        u_ta = None
        adaptive_w_ti = None
        gamma = CMSCI_GAMMA

        if self._prob_clip is not None or self._prob_clap is not None:
            mc_volumes = []

            # Text-image channel: uncertainty + MC volume samples.
            if self._prob_clip is not None and emb_text_clip is not None and emb_image is not None:
                u_text_clip = self._prob_clip.uncertainty(emb_text_clip)
                u_image_clip = self._prob_clip.uncertainty(emb_image)
                u_ti = float(np.mean([u_text_clip, u_image_clip]))

                text_samples = self._prob_clip.sample(emb_text_clip, n_mc_samples)
                image_samples = self._prob_clip.sample(emb_image, n_mc_samples)
                for t_s, i_s in zip(text_samples, image_samples):
                    mc_volumes.append(gram_volume_2d(t_s, i_s))

            # Text-audio channel: uncertainty + MC volume samples.
            if self._prob_clap is not None and emb_text_clap is not None and emb_audio is not None:
                u_text_clap = self._prob_clap.uncertainty(emb_text_clap)
                u_audio_clap = self._prob_clap.uncertainty(emb_audio)
                u_ta = float(np.mean([u_text_clap, u_audio_clap]))

                text_samples = self._prob_clap.sample(emb_text_clap, n_mc_samples)
                audio_samples = self._prob_clap.sample(emb_audio, n_mc_samples)
                for t_s, a_s in zip(text_samples, audio_samples):
                    mc_volumes.append(gram_volume_2d(t_s, a_s))

            # Inverse-uncertainty reweighting of the two channels,
            # interpolated toward the static weight by gamma.
            if u_ti is not None and u_ta is not None and u_ti > 0 and u_ta > 0 and gamma > 0:
                inv_ti = 1.0 / u_ti
                inv_ta = 1.0 / u_ta
                adaptive_w = inv_ti / (inv_ti + inv_ta)
                w_ti_final = (1.0 - gamma) * w_ti + gamma * adaptive_w
                adaptive_w_ti = float(w_ti_final)

                if cal_mode == "gram" and z_gram_ti is not None and z_gram_ta is not None:
                    z_2d_adaptive = w_ti_final * z_gram_ti + (1.0 - w_ti_final) * z_gram_ta
                elif z_st_i is not None and z_st_a is not None:
                    z_2d_adaptive = w_ti_final * z_st_i + (1.0 - w_ti_final) * z_st_a
                else:
                    z_2d_adaptive = None

                if z_2d_adaptive is not None:
                    logit_f = z_2d_adaptive + margin_alpha * margin
                    if z_compl is not None:
                        logit_f += w_3d * z_compl
                    variant_f = float(1.0 / (1.0 + np.exp(-logit_f)))

            # MC confidence interval over gram-coherence samples (95% CI).
            if mc_volumes:
                mc_coherences = [normalized_gram_coherence(v) for v in mc_volumes]
                mc_mean = float(np.mean(mc_coherences))
                mc_std = float(np.std(mc_coherences))
                mc_ci_lower = float(np.percentile(mc_coherences, 2.5))
                mc_ci_upper = float(np.percentile(mc_coherences, 97.5))
            else:
                mc_mean = mc_std = mc_ci_lower = mc_ci_upper = None

            uncertainty_result = {
                "mc_mean": round(mc_mean, 4) if mc_mean is not None else None,
                "mc_std": round(mc_std, 4) if mc_std is not None else None,
                "mc_ci_lower": round(mc_ci_lower, 4) if mc_ci_lower is not None else None,
                "mc_ci_upper": round(mc_ci_upper, 4) if mc_ci_upper is not None else None,
                "u_ti": round(u_ti, 6) if u_ti is not None else None,
                "u_ta": round(u_ta, 6) if u_ta is not None else None,
                "adaptive_w_ti": round(adaptive_w_ti, 4) if adaptive_w_ti is not None else None,
                "gamma": gamma,
                "n_samples": n_mc_samples,
            }

        # --- Final score and active-variant detection --------------------
        cmsci = variant_f
        active_variant = "F"

        # BUGFIX: the checks must be NESTED, descending only while each
        # later stage left the score unchanged. A flat `if` chain could
        # mislabel the active variant (e.g. report "C" when Ex-MCR fired
        # but the contrastive stage did not, because the D==C check ran
        # unconditionally and overwrote the correct "E").
        if variant_f == variant_e:
            active_variant = "E"
            if variant_e == variant_d:
                active_variant = "D"
                if variant_d == variant_c:
                    active_variant = "C"
                    if variant_c == variant_b:
                        active_variant = "B" if variant_b is not None else "A"

        # Fall back to legacy MSCI when no geometric score was computable.
        if cmsci is None:
            cmsci = msci
            active_variant = "A"

        logger.info(
            "cMSCI = %.4f (variant %s) | MSCI = %s",
            cmsci if cmsci is not None else 0.0,
            active_variant,
            msci,
        )

        return {
            "cmsci": round(cmsci, 4) if cmsci is not None else None,
            "msci": round(msci, 4) if msci is not None else None,
            "active_variant": active_variant,
            "scores": {
                "st_i": st_i,
                "st_a": st_a,
                "si_a": si_a,
            },
            "gram": {
                "text_image": round(gram_ti, 4) if gram_ti is not None else None,
                "text_audio": round(gram_ta, 4) if gram_ta is not None else None,
                "text_image_audio": round(gram_tia, 4) if gram_tia is not None else None,
                "coherence_2way": round(gram_coherence_2way, 4) if gram_coherence_2way is not None else None,
            },
            "calibration": {
                "z_st_i": round(z_st_i, 4) if z_st_i is not None else None,
                "z_st_a": round(z_st_a, 4) if z_st_a is not None else None,
                "z_gram_ti": round(z_gram_ti, 4) if z_gram_ti is not None else None,
                "z_gram_ta": round(z_gram_ta, 4) if z_gram_ta is not None else None,
                "z_compl": round(z_compl, 4) if z_compl is not None else None,
                "gram_ia_volume": round(gram_ia_volume, 4) if gram_ia_volume is not None else None,
                "u_ti": round(u_ti, 6) if u_ti is not None else None,
                "u_ta": round(u_ta, 6) if u_ta is not None else None,
                "adaptive_w_ti": round(adaptive_w_ti, 4) if adaptive_w_ti is not None else None,
                "cal_mode": cal_mode if self._calibration is not None else None,
                "w_ti": w_ti,
                "w_3d": w_3d,
                "gamma": gamma,
                "margin_alpha": CMSCI_MARGIN_ALPHA if contrastive_result else None,
            },
            "contrastive": contrastive_result,
            "uncertainty": uncertainty_result,
            "variant_scores": {
                "A_msci": round(variant_a, 4) if variant_a is not None else None,
                "B_gram": round(variant_b, 4) if variant_b is not None else None,
                "C_gram_znorm": round(variant_c, 4) if variant_c is not None else None,
                "D_gram_znorm_contrastive": round(variant_d, 4) if variant_d is not None else None,
                "E_gram_znorm_contrastive_exmcr": round(variant_e, 4) if variant_e is not None else None,
                "F_full_cmsci": round(variant_f, 4) if variant_f is not None else None,
            },
        }

    def evaluate_batch(
        self,
        items: List[Dict[str, str]],
        n_mc_samples: int = 100,
    ) -> List[Dict[str, Any]]:
        """
        Evaluate a batch of (text, image_path, audio_path) triples.

        Args:
            items: List of dicts with keys "text", "image_path", "audio_path", "domain".
            n_mc_samples: MC samples per item.

        Returns:
            List of result dicts from evaluate().
        """
        results = []
        for item in items:
            result = self.evaluate(
                text=item.get("text", ""),
                image_path=item.get("image_path"),
                audio_path=item.get("audio_path"),
                domain=item.get("domain", ""),
                n_mc_samples=n_mc_samples,
            )
            results.append(result)
        return results
| |
|