| """ |
| Human Evaluation Analysis Module |
| |
| Analyzes human evaluation data to compute: |
| - Intra-rater reliability (Cohen's kappa for self-agreement) |
| - Inter-rater reliability (Krippendorff's alpha for multi-rater agreement) |
| - Descriptive statistics |
| - Correlation with MSCI scores (aggregated across raters) |
| """ |
|
|
| from __future__ import annotations |
|
|
| from collections import defaultdict |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Dict, List, Optional, Tuple, Any |
| import json |
| import numpy as np |
| from scipy import stats |
|
|
| from src.evaluation.human_eval_schema import ( |
| EvaluationSession, |
| HumanEvaluation, |
| ReliabilityMetrics, |
| ) |
|
|
|
|
def compute_cohens_kappa(ratings1: List[int], ratings2: List[int]) -> float:
    """
    Return Cohen's kappa for two paired lists of categorical ratings.

    Args:
        ratings1: Ratings from the first pass.
        ratings2: Ratings from the second pass, aligned item-by-item with
            ``ratings1``.

    Returns:
        Chance-corrected agreement ``(p_o - p_e) / (1 - p_e)``. Empty input
        yields 0.0; when the expected agreement is exactly 1 (only one
        category was ever used) the result is 1.0.

    Raises:
        ValueError: If the two lists differ in length.
    """
    if len(ratings1) != len(ratings2):
        raise ValueError("Rating lists must have the same length")

    total = len(ratings1)
    if not total:
        return 0.0

    # Index every category that occurs in either list, in sorted order.
    labels = sorted(set(ratings1).union(ratings2))
    position = {label: idx for idx, label in enumerate(labels)}

    # Contingency table: rows follow ratings1, columns follow ratings2.
    table = np.zeros((len(labels), len(labels)))
    for first, second in zip(ratings1, ratings2):
        table[position[first], position[second]] += 1

    # Observed agreement is the share of items on the diagonal.
    observed = np.trace(table) / total

    # Expected agreement under independence, from the table marginals.
    row_totals = table.sum(axis=1)
    col_totals = table.sum(axis=0)
    expected = np.sum(row_totals * col_totals) / (total * total)

    # Avoid 0/0 when chance agreement is already perfect.
    if expected == 1.0:
        return 1.0
    return (observed - expected) / (1 - expected)
|
|
|
|
def compute_weighted_kappa(
    ratings1: List[int], ratings2: List[int], weights: str = "quadratic"
) -> float:
    """
    Compute weighted Cohen's kappa for ordinal (integer) ratings.

    The category scale is every integer between the smallest and largest
    rating seen in either list, so unused intermediate scores still count
    towards the distance between categories.

    Args:
        ratings1: First set of integer ratings.
        ratings2: Second set of integer ratings, aligned with ``ratings1``.
        weights: "linear" for linear disagreement weights; any other value
            selects quadratic weights.

    Returns:
        Weighted kappa coefficient. Empty input yields 0.0. When every
        rating in both lists is the same value the result is 1.0.

    Raises:
        ValueError: If the two lists differ in length.
    """
    if len(ratings1) != len(ratings2):
        raise ValueError("Rating lists must have the same length")

    n = len(ratings1)
    if n == 0:
        return 0.0

    lowest = min(min(ratings1), min(ratings2))
    highest = max(max(ratings1), max(ratings2))
    k = highest - lowest + 1

    # A single category means the raters agree everywhere; returning here
    # also avoids dividing by (k - 1) == 0 when building the weights.
    if k == 1:
        return 1.0

    # Joint distribution of (ratings1, ratings2) as proportions.
    confusion = np.zeros((k, k))
    for r1, r2 in zip(ratings1, ratings2):
        confusion[r1 - lowest, r2 - lowest] += 1
    confusion = confusion / n

    # Disagreement weights: 0 on the diagonal, 1 at the scale extremes.
    positions = np.arange(k)
    steps = np.abs(np.subtract.outer(positions, positions))
    if weights == "linear":
        weight_matrix = steps / (k - 1)
    else:
        weight_matrix = (steps ** 2) / ((k - 1) ** 2)

    # Expected joint distribution if the two passes were independent.
    expected = np.outer(confusion.sum(axis=1), confusion.sum(axis=0))

    w_observed = np.sum(weight_matrix * confusion)
    w_expected = np.sum(weight_matrix * expected)

    if w_expected == 0:
        return 1.0
    return float(1 - (w_observed / w_expected))
|
|
|
|
def compute_intra_rater_reliability(
    session: EvaluationSession,
) -> Optional[ReliabilityMetrics]:
    """
    Measure how consistently the session's rater scored re-rated samples.

    Only samples listed in ``session.rerating_sample_ids`` are considered.
    Scores from the four coherence dimensions are pooled into one pair of
    rating lists before the agreement statistics are computed.

    Args:
        session: Evaluation session containing evaluations

    Returns:
        ReliabilityMetrics, or None when no sample has both a first rating
        and a re-rating.
    """
    rating_fields = (
        "text_image_coherence",
        "text_audio_coherence",
        "image_audio_coherence",
        "overall_coherence",
    )

    # Split evaluations of re-rating samples by pass. A later evaluation
    # for the same sample and pass replaces the earlier one.
    originals: Dict[str, HumanEvaluation] = {}
    repeats: Dict[str, HumanEvaluation] = {}
    for record in session.evaluations:
        if record.sample_id not in session.rerating_sample_ids:
            continue
        bucket = repeats if record.is_rerating else originals
        bucket[record.sample_id] = record

    # Keep only samples that were scored in both passes.
    paired_ids = set(originals.keys()) & set(repeats.keys())
    if not paired_ids:
        return None

    # Pool every dimension of every paired sample, keeping both lists aligned.
    all_first = [
        getattr(originals[sample_id], field)
        for sample_id in paired_ids
        for field in rating_fields
    ]
    all_second = [
        getattr(repeats[sample_id], field)
        for sample_id in paired_ids
        for field in rating_fields
    ]

    kappa = compute_cohens_kappa(all_first, all_second)
    weighted_kappa = compute_weighted_kappa(all_first, all_second, weights="quadratic")

    # Exact-match rate, expressed as a percentage.
    matches = sum(1 for before, after in zip(all_first, all_second) if before == after)
    percent_agreement = matches / len(all_first) * 100

    # Mean absolute difference between the two passes.
    mad = np.mean([abs(before - after) for before, after in zip(all_first, all_second)])

    return ReliabilityMetrics(
        kappa=kappa,
        percent_agreement=percent_agreement,
        weighted_kappa=weighted_kappa,
        mean_absolute_difference=mad,
        n_reratings=len(paired_ids),
    )
|
|
|
|
@dataclass
class HumanEvalSummary:
    """Container for descriptive statistics of one set of human evaluations."""

    # Sample and evaluation counts supplied by the caller.
    n_samples: int
    n_evaluations: int

    # Mean / standard deviation for each rated dimension.
    text_image_mean: float
    text_image_std: float
    text_audio_mean: float
    text_audio_std: float
    image_audio_mean: float
    image_audio_std: float
    overall_mean: float
    overall_std: float

    # Mean / standard deviation of the weighted score.
    mean_weighted_score: float
    std_weighted_score: float

    # Intra-rater reliability, or None when unavailable.
    reliability: Optional[ReliabilityMetrics]

    def to_dict(self) -> Dict[str, Any]:
        """Return the summary as a nested dictionary."""

        def stat_pair(mean: float, std: float) -> Dict[str, float]:
            return {"mean": mean, "std": std}

        reliability = None
        if self.reliability:
            reliability = self.reliability.to_dict()

        summary: Dict[str, Any] = {
            "n_samples": self.n_samples,
            "n_evaluations": self.n_evaluations,
        }
        summary["text_image"] = stat_pair(self.text_image_mean, self.text_image_std)
        summary["text_audio"] = stat_pair(self.text_audio_mean, self.text_audio_std)
        summary["image_audio"] = stat_pair(self.image_audio_mean, self.image_audio_std)
        summary["overall"] = stat_pair(self.overall_mean, self.overall_std)
        summary["weighted_score"] = stat_pair(
            self.mean_weighted_score, self.std_weighted_score
        )
        summary["reliability"] = reliability
        return summary
|
|
|
|
def compute_human_eval_summary(session: EvaluationSession) -> HumanEvalSummary:
    """
    Build descriptive statistics for a session's first-pass evaluations.

    Re-ratings are excluded from the means and standard deviations; they
    are only used for the attached intra-rater reliability metrics.

    Args:
        session: Evaluation session

    Returns:
        HumanEvalSummary with descriptive statistics

    Raises:
        ValueError: If the session has no first-pass evaluations.
    """
    first_pass = [record for record in session.evaluations if not record.is_rerating]
    if len(first_pass) == 0:
        raise ValueError("No evaluations found in session")

    # One score column per dimension, plus the weighted score.
    text_image = [record.text_image_coherence for record in first_pass]
    text_audio = [record.text_audio_coherence for record in first_pass]
    image_audio = [record.image_audio_coherence for record in first_pass]
    overall = [record.overall_coherence for record in first_pass]
    weighted = [record.weighted_score() for record in first_pass]

    # None when the session contains no usable re-rating pairs.
    reliability = compute_intra_rater_reliability(session)

    distinct_samples = {record.sample_id for record in first_pass}

    return HumanEvalSummary(
        n_samples=len(distinct_samples),
        n_evaluations=len(first_pass),
        text_image_mean=np.mean(text_image),
        text_image_std=np.std(text_image),
        text_audio_mean=np.mean(text_audio),
        text_audio_std=np.std(text_audio),
        image_audio_mean=np.mean(image_audio),
        image_audio_std=np.std(image_audio),
        overall_mean=np.mean(overall),
        overall_std=np.std(overall),
        mean_weighted_score=np.mean(weighted),
        std_weighted_score=np.std(weighted),
        reliability=reliability,
    )
|
|
|
|
def compute_human_msci_correlation(
    session: EvaluationSession,
    msci_scores: Optional[Dict[str, float]] = None,
) -> Dict[str, Any]:
    """
    Correlate a session's human ratings with MSCI scores.

    Each first-pass evaluation whose sample has an MSCI score contributes
    one data point; re-ratings are skipped.

    Args:
        session: Evaluation session with human ratings
        msci_scores: Optional dict mapping sample_id to MSCI score.
            If None (or an empty mapping), the msci_score stored on each
            sample in ``session.samples`` is used instead.

    Returns:
        Dictionary with Spearman and Pearson statistics for the weighted
        and overall human scores, or an ``"error"`` dictionary when fewer
        than three paired data points are available.
    """
    # Resolve the sample_id -> MSCI lookup.
    if msci_scores:
        lookup = msci_scores
    else:
        lookup = {
            sample.sample_id: sample.msci_score
            for sample in session.samples
            if sample.msci_score is not None
        }

    human_weighted = []
    human_overall = []
    msci_values = []
    for record in session.evaluations:
        if record.is_rerating or record.sample_id not in lookup:
            continue
        human_weighted.append(record.weighted_score())
        # Overall rating divided by 5.0 (presumably rescales a 5-point
        # rating to at most 1 -- confirm against the rating schema).
        human_overall.append(record.overall_coherence / 5.0)
        msci_values.append(lookup[record.sample_id])

    n_paired = len(msci_values)
    if n_paired < 3:
        return {
            "error": "Insufficient paired data for correlation",
            "n_paired": n_paired,
        }

    series = {
        "msci_vs_weighted_human": human_weighted,
        "msci_vs_overall_human": human_overall,
    }
    rank_corr = {
        name: stats.spearmanr(msci_values, human) for name, human in series.items()
    }
    linear_corr = {
        name: stats.pearsonr(msci_values, human) for name, human in series.items()
    }

    report: Dict[str, Any] = {"n_paired": n_paired}
    for name in series:
        report[name] = {
            "spearman_rho": rank_corr[name].correlation,
            "spearman_p": rank_corr[name].pvalue,
            "pearson_r": linear_corr[name].statistic,
            "pearson_p": linear_corr[name].pvalue,
        }

    # The headline interpretation is based on the weighted-score Spearman test.
    headline = rank_corr["msci_vs_weighted_human"]
    report["interpretation"] = _interpret_correlation(
        headline.correlation, headline.pvalue
    )
    return report
|
|
|
|
| def _interpret_correlation(rho: float, p: float, alpha: float = 0.05) -> str: |
| """Generate human-readable interpretation of correlation.""" |
| if p >= alpha: |
| return f"No significant correlation (ρ={rho:.3f}, p={p:.4f} ≥ {alpha})" |
|
|
| strength = "weak" if abs(rho) < 0.3 else "moderate" if abs(rho) < 0.6 else "strong" |
| direction = "positive" if rho > 0 else "negative" |
|
|
| return f"Significant {strength} {direction} correlation (ρ={rho:.3f}, p={p:.4f})" |
|
|
|
|
def analyze_by_condition(session: EvaluationSession) -> Dict[str, Dict[str, Any]]:
    """
    Summarize first-pass human ratings per experimental condition.

    Evaluations whose sample is not listed in ``session.samples`` are
    grouped under the condition name "unknown". Re-ratings are ignored.

    Args:
        session: Evaluation session

    Returns:
        Dictionary mapping each condition to its evaluation count and the
        mean, standard deviation and median of the weighted score and of
        the overall coherence rating.
    """
    condition_of = {}
    for sample in session.samples:
        condition_of[sample.sample_id] = sample.condition

    grouped: Dict[str, List[HumanEvaluation]] = defaultdict(list)
    for record in session.evaluations:
        if not record.is_rerating:
            grouped[condition_of.get(record.sample_id, "unknown")].append(record)

    def describe(values: List[float]) -> Dict[str, Any]:
        return {
            "mean": np.mean(values),
            "std": np.std(values),
            "median": np.median(values),
        }

    # Groups are only created when an evaluation is appended, so every
    # group has at least one member.
    results = {}
    for condition, members in grouped.items():
        weighted = [member.weighted_score() for member in members]
        overall = [member.overall_coherence for member in members]
        results[condition] = {
            "n": len(members),
            "weighted_score": describe(weighted),
            "overall_coherence": describe(overall),
        }
    return results
|
|
|
|
def generate_analysis_report(
    session: EvaluationSession,
    output_path: Optional[Path] = None,
) -> Dict[str, Any]:
    """
    Assemble the full single-session analysis and optionally save it.

    The report combines session metadata with the summary statistics, the
    per-condition breakdown and the human-vs-MSCI correlation.

    Args:
        session: Evaluation session
        output_path: Optional path to save JSON report. Missing parent
            directories are created.

    Returns:
        Dictionary with complete analysis
    """
    metadata_fields = ("session_id", "evaluator_id", "started_at", "completed_at")
    report: Dict[str, Any] = {name: getattr(session, name) for name in metadata_fields}
    report["summary"] = compute_human_eval_summary(session).to_dict()
    report["by_condition"] = analyze_by_condition(session)
    report["msci_correlation"] = compute_human_msci_correlation(session)

    if not output_path:
        return report

    # Persist as indented UTF-8 JSON without escaping non-ASCII characters.
    destination = Path(output_path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    with destination.open("w", encoding="utf-8") as handle:
        json.dump(report, handle, indent=2, ensure_ascii=False)

    return report
|
|
|
|
| |
| |
| |
|
|
def compute_krippendorff_alpha(
    data_matrix: np.ndarray,
    level: str = "ordinal",
) -> float:
    """
    Compute Krippendorff's alpha for inter-rater reliability.

    The statistic is built from the coincidence matrix: within each item,
    every ordered pair of ratings from different raters is counted with
    weight ``1 / (m_u - 1)``, where ``m_u`` is the number of ratings the
    item received. This keeps items with more raters from dominating the
    observed disagreement when some ratings are missing. Items with fewer
    than two ratings are ignored.

    Args:
        data_matrix: Shape (n_raters, n_items). Use np.nan for missing values.
        level: "nominal", "ordinal", or "interval" measurement level. Any
            other value is treated as "interval". The ordinal metric is
            frequency-based: the squared number of ratings lying between
            two categories, counting each end category by half.

    Returns:
        Krippendorff's alpha coefficient (-1 to 1, >0.667 acceptable).
        Returns 0.0 when no item has at least two ratings, and 1.0 when
        the expected disagreement is zero (all ratings identical).

    Raises:
        ValueError: If ``data_matrix`` is not two-dimensional.
    """
    data = np.asarray(data_matrix, dtype=float)
    if data.ndim != 2:
        raise ValueError("data_matrix must be 2-D with shape (n_raters, n_items)")

    # Keep only items that at least two raters scored.
    units = []
    for column in data.T:
        rated = column[~np.isnan(column)]
        if rated.size >= 2:
            units.append(rated)

    if not units:
        return 0.0

    pairable = np.concatenate(units)
    values, totals = np.unique(pairable, return_counts=True)
    totals = totals.astype(float)
    n_values = values.size
    n_total = float(pairable.size)

    # Coincidence matrix: value pairings within items, each item scaled
    # by 1 / (m_u - 1).
    coincidence = np.zeros((n_values, n_values))
    for rated in units:
        counts = np.bincount(
            np.searchsorted(values, rated), minlength=n_values
        ).astype(float)
        # Ordered pairs of ratings from different raters.
        ordered_pairs = np.outer(counts, counts) - np.diag(counts)
        coincidence += ordered_pairs / (rated.size - 1)

    # Squared-distance matrix between categories for the chosen level.
    if level == "nominal":
        delta = 1.0 - np.eye(n_values)
    elif level == "ordinal":
        # Cumulative frequency up to the middle of each category; the
        # difference of two midpoints equals
        # sum_{g=c..k} n_g - (n_c + n_k) / 2.
        midpoints = np.cumsum(totals) - totals / 2.0
        delta = np.subtract.outer(midpoints, midpoints) ** 2
    else:
        delta = np.subtract.outer(values, values) ** 2

    # Observed and expected disagreement.
    observed = np.sum(coincidence * delta) / n_total
    expected = np.sum(np.outer(totals, totals) * delta) / (n_total * (n_total - 1))

    if expected == 0:
        return 1.0

    return float(1.0 - observed / expected)
|
|
|
|
def aggregate_multi_rater_sessions(
    sessions: List[EvaluationSession],
) -> Dict[str, Dict[str, Any]]:
    """
    Pool first-pass evaluations from several raters, sample by sample.

    Args:
        sessions: List of completed evaluation sessions (same sample set).

    Returns:
        Dictionary mapping sample_id to the number of ratings pooled, the
        mean and standard deviation of each dimension and of the weighted
        score, and the evaluator ids that contributed.
    """
    score_fields = (
        ("text_image", "text_image_coherence"),
        ("text_audio", "text_audio_coherence"),
        ("image_audio", "image_audio_coherence"),
        ("overall", "overall_coherence"),
    )

    def mean_std(values: List[float]) -> Dict[str, float]:
        return {"mean": float(np.mean(values)), "std": float(np.std(values))}

    # Gather every non-re-rating evaluation under its sample id.
    pooled: Dict[str, List[HumanEvaluation]] = defaultdict(list)
    first_pass = (
        record
        for rater_session in sessions
        for record in rater_session.evaluations
        if not record.is_rerating
    )
    for record in first_pass:
        pooled[record.sample_id].append(record)

    aggregated = {}
    for sample_id, members in pooled.items():
        entry: Dict[str, Any] = {"n_raters": len(members)}
        for key, attr in score_fields:
            entry[key] = mean_std([getattr(member, attr) for member in members])
        entry["weighted_score"] = mean_std(
            [member.weighted_score() for member in members]
        )
        entry["evaluator_ids"] = [member.evaluator_id for member in members]
        aggregated[sample_id] = entry

    return aggregated
|
|
|
|
def compute_inter_rater_reliability(
    sessions: List[EvaluationSession],
) -> Dict[str, Any]:
    """
    Compute agreement between evaluators on the samples they all rated.

    Only samples with a first-pass evaluation in every session are used.
    Each coherence dimension is scored with ordinal Krippendorff's alpha
    and the weighted score with interval alpha.

    Args:
        sessions: List of evaluation sessions (same sample set).

    Returns:
        Dictionary with Krippendorff's alpha per dimension and overall, or
        an ``"error"`` dictionary when fewer than three samples are common
        to all sessions.
    """
    # Samples each rater scored (re-ratings excluded), then their overlap.
    per_rater_ids = [
        {record.sample_id for record in rater_session.evaluations if not record.is_rerating}
        for rater_session in sessions
    ]
    common_ids = sorted(set.intersection(*per_rater_ids)) if per_rater_ids else []

    if len(common_ids) < 3:
        return {
            "error": "Too few common samples for reliability analysis",
            "n_common": len(common_ids),
        }

    column_of = {sample_id: col for col, sample_id in enumerate(common_ids)}
    shape = (len(sessions), len(common_ids))

    def rating_matrix(read_score):
        # One row per session, one column per common sample. Cells start
        # as NaN and are filled from first-pass evaluations.
        grid = np.full(shape, np.nan)
        for row, rater_session in enumerate(sessions):
            for record in rater_session.evaluations:
                if record.is_rerating or record.sample_id not in column_of:
                    continue
                grid[row, column_of[record.sample_id]] = read_score(record)
        return grid

    def alpha_entry(grid, level):
        alpha = compute_krippendorff_alpha(grid, level=level)
        return {
            "krippendorff_alpha": round(alpha, 4),
            "interpretation": _interpret_alpha(alpha),
        }

    dimensions = (
        ("text_image", "text_image_coherence"),
        ("text_audio", "text_audio_coherence"),
        ("image_audio", "image_audio_coherence"),
        ("overall", "overall_coherence"),
    )

    results: Dict[str, Any] = {"n_raters": shape[0], "n_common_samples": shape[1]}
    for dim_name, attr_name in dimensions:
        grid = rating_matrix(lambda record, attr=attr_name: getattr(record, attr))
        results[dim_name] = alpha_entry(grid, "ordinal")

    # The weighted score is treated as interval-level data.
    weighted_grid = rating_matrix(lambda record: record.weighted_score())
    results["weighted_score"] = alpha_entry(weighted_grid, "interval")

    return results
|
|
|
|
| def _interpret_alpha(alpha: float) -> str: |
| """Interpret Krippendorff's alpha value.""" |
| if alpha >= 0.80: |
| return "good agreement" |
| elif alpha >= 0.667: |
| return "acceptable agreement" |
| elif alpha >= 0.40: |
| return "moderate agreement" |
| else: |
| return "poor agreement" |
|
|
|
|
def compute_multi_rater_msci_correlation(
    sessions: List[EvaluationSession],
    sample_msci: Dict[str, float],
) -> Dict[str, Any]:
    """
    Compute Spearman correlation between average human scores and MSCI.

    Human scores are the per-sample mean weighted score across raters.
    The confidence interval is a percentile bootstrap over samples with a
    fixed seed, so repeated calls give the same interval.

    Args:
        sessions: List of evaluation sessions.
        sample_msci: Mapping sample_id -> MSCI score.

    Returns:
        Correlation statistics with bootstrap 95% CI, or an ``"error"``
        dictionary when fewer than five samples have both a human score
        and an MSCI score. The CI bounds are NaN only if no bootstrap
        replicate produced a defined correlation.
    """
    aggregated = aggregate_multi_rater_sessions(sessions)

    # Pair each sample's mean weighted human score with its MSCI score.
    human_scores = []
    msci_scores = []
    for sample_id, agg in aggregated.items():
        if sample_id in sample_msci:
            human_scores.append(agg["weighted_score"]["mean"])
            msci_scores.append(sample_msci[sample_id])

    n_paired = len(human_scores)
    if n_paired < 5:
        return {"error": "Too few paired samples", "n_paired": n_paired}

    human_arr = np.array(human_scores)
    msci_arr = np.array(msci_scores)

    spearman = stats.spearmanr(msci_arr, human_arr)
    pearson = stats.pearsonr(msci_arr, human_arr)

    # Percentile bootstrap for the Spearman coefficient.
    n_boot = 10000
    rng = np.random.default_rng(42)
    boot_rhos = np.empty(n_boot)
    for b in range(n_boot):
        idx = rng.choice(n_paired, size=n_paired, replace=True)
        boot_rhos[b] = stats.spearmanr(msci_arr[idx], human_arr[idx]).correlation

    # A resample in which either variable is constant has no defined rank
    # correlation (NaN). Drop those replicates, since a single NaN would
    # otherwise turn both percentile bounds into NaN.
    defined_rhos = boot_rhos[~np.isnan(boot_rhos)]
    if defined_rhos.size:
        ci_lower = float(np.percentile(defined_rhos, 2.5))
        ci_upper = float(np.percentile(defined_rhos, 97.5))
    else:
        ci_lower = ci_upper = float("nan")

    return {
        "n_paired": n_paired,
        "spearman_rho": round(float(spearman.correlation), 4),
        "spearman_p": float(spearman.pvalue),
        "spearman_95ci": [round(ci_lower, 4), round(ci_upper, 4)],
        "pearson_r": round(float(pearson.statistic), 4),
        "pearson_p": float(pearson.pvalue),
        "interpretation": _interpret_correlation(
            float(spearman.correlation), float(spearman.pvalue)
        ),
    }
|
|