# syscred_duplicate / syscred / eval_metrics.py
# Author: Dominique Loyer
# Deploy SysCRED v2.3.1 - GraphRAG + LIAR benchmark + TREC integration
# Commit: 8e97fc5
# -*- coding: utf-8 -*-
"""
Evaluation Metrics Module - SysCRED
====================================
Information Retrieval evaluation metrics for TREC-style experiments.
Metrics:
- MAP (Mean Average Precision)
- NDCG (Normalized Discounted Cumulative Gain)
- P@K (Precision at K)
- Recall@K
- MRR (Mean Reciprocal Rank)
Based on pytrec_eval for official TREC evaluation.
(c) Dominique S. Loyer - PhD Thesis Prototype
Citation Key: loyerEvaluationModelesRecherche2025
"""
import math
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple
# Check for pytrec_eval
try:
import pytrec_eval
HAS_PYTREC_EVAL = True
except ImportError:
HAS_PYTREC_EVAL = False
print("[EvalMetrics] pytrec_eval not installed. Using built-in metrics.")
class EvaluationMetrics:
    """
    IR evaluation metrics using pytrec_eval or built-in implementations.

    Supports TREC-style evaluation with:
    - Official pytrec_eval (if available)
    - Fallback pure-Python implementations
    """

    def __init__(self):
        """Initialize the metrics calculator; prefer pytrec_eval when installed."""
        self.use_pytrec = HAS_PYTREC_EVAL

    # --- Built-in Metric Implementations ---

    @staticmethod
    def precision_at_k(retrieved: List[str], relevant: set, k: int) -> float:
        """
        Calculate Precision@K.

        P@K = |relevant ∩ retrieved[:k]| / k

        Args:
            retrieved: Ranked list of retrieved document ids (best first).
            relevant: Set of relevant document ids.
            k: Rank cutoff.

        Returns:
            Precision in [0, 1]; 0.0 when k <= 0. Follows the TREC
            convention of dividing by k even if fewer than k documents
            were retrieved.
        """
        if k <= 0:
            return 0.0
        hits = sum(1 for doc in retrieved[:k] if doc in relevant)
        return hits / k

    @staticmethod
    def recall_at_k(retrieved: List[str], relevant: set, k: int) -> float:
        """
        Calculate Recall@K.

        R@K = |relevant ∩ retrieved[:k]| / |relevant|

        Args:
            retrieved: Ranked list of retrieved document ids (best first).
            relevant: Set of relevant document ids.
            k: Rank cutoff.

        Returns:
            Recall in [0, 1]; 0.0 when there are no relevant documents
            (recall is undefined in that case).
        """
        if not relevant:
            return 0.0
        hits = sum(1 for doc in retrieved[:k] if doc in relevant)
        return hits / len(relevant)

    @staticmethod
    def average_precision(retrieved: List[str], relevant: set) -> float:
        """
        Calculate Average Precision for a single query.

        AP = (1/|relevant|) × Σ (P@k × rel(k))

        Args:
            retrieved: Full ranked list of retrieved document ids.
            relevant: Set of relevant document ids.

        Returns:
            AP in [0, 1]; 0.0 when there are no relevant documents.
        """
        if not relevant:
            return 0.0
        hits = 0
        sum_precision = 0.0
        for rank, doc in enumerate(retrieved, start=1):
            if doc in relevant:
                hits += 1
                sum_precision += hits / rank
        return sum_precision / len(relevant)

    @staticmethod
    def dcg_at_k(retrieved: List[str], relevance: Dict[str, int], k: int) -> float:
        """
        Calculate DCG@K (Discounted Cumulative Gain).

        DCG@K = Σ (2^rel(i) - 1) / log2(i + 2)

        Documents absent from `relevance` are treated as unjudged
        (grade 0) and contribute no gain.
        """
        dcg = 0.0
        for i, doc in enumerate(retrieved[:k]):
            rel = relevance.get(doc, 0)
            dcg += (2 ** rel - 1) / math.log2(i + 2)
        return dcg

    @staticmethod
    def ndcg_at_k(retrieved: List[str], relevance: Dict[str, int], k: int) -> float:
        """
        Calculate NDCG@K (Normalized DCG).

        NDCG@K = DCG@K / IDCG@K, where IDCG@K is the DCG of the ideal
        ranking (all judged documents sorted by decreasing grade).

        Returns:
            NDCG in [0, 1]; 0.0 when IDCG is 0 (no positively judged docs).
        """
        dcg = EvaluationMetrics.dcg_at_k(retrieved, relevance, k)
        # Ideal DCG: judged grades in descending order, truncated at k.
        ideal_rels = sorted(relevance.values(), reverse=True)[:k]
        idcg = sum((2 ** rel - 1) / math.log2(i + 2)
                   for i, rel in enumerate(ideal_rels))
        return dcg / idcg if idcg > 0 else 0.0

    @staticmethod
    def reciprocal_rank(retrieved: List[str], relevant: set) -> float:
        """
        Calculate Reciprocal Rank.

        RR = 1 / rank of first relevant document; 0.0 if no relevant
        document was retrieved.
        """
        for i, doc in enumerate(retrieved):
            if doc in relevant:
                return 1.0 / (i + 1)
        return 0.0

    @staticmethod
    def mrr(retrieved: List[str], relevant: set) -> float:
        """
        Alias for reciprocal_rank (Mean Reciprocal Rank for single query).

        MRR = 1 / rank of first relevant document
        """
        return EvaluationMetrics.reciprocal_rank(retrieved, relevant)

    # --- TREC-Style Evaluation ---

    def evaluate_run(
        self,
        run: Dict[str, List[Tuple[str, float]]],
        qrels: Dict[str, Dict[str, int]],
        metrics: Optional[List[str]] = None
    ) -> Dict[str, Dict[str, float]]:
        """
        Evaluate a run against qrels (relevance judgments).

        Args:
            run: {query_id: [(doc_id, score), ...]}
            qrels: {query_id: {doc_id: relevance}}
            metrics: List of metrics to compute, e.g.
                ['map', 'ndcg', 'P_5', 'P_10', 'recall_100'].
                Defaults to a standard TREC selection.

        Returns:
            {query_id: {metric: value}}
        """
        if metrics is None:
            metrics = ['map', 'ndcg', 'P_5', 'P_10', 'P_20', 'recall_100', 'recip_rank']
        # Re-check HAS_PYTREC_EVAL so a manually-set use_pytrec flag cannot
        # reach _evaluate_pytrec when the package is absent.
        if self.use_pytrec and HAS_PYTREC_EVAL:
            return self._evaluate_pytrec(run, qrels, metrics)
        return self._evaluate_builtin(run, qrels, metrics)

    def _evaluate_pytrec(
        self,
        run: Dict[str, List[Tuple[str, float]]],
        qrels: Dict[str, Dict[str, int]],
        metrics: List[str]
    ) -> Dict[str, Dict[str, float]]:
        """Evaluate using the official pytrec_eval package."""
        # pytrec_eval expects {qid: {doc_id: score}} rather than ranked tuples.
        pytrec_run = {
            qid: {doc_id: score for doc_id, score in docs}
            for qid, docs in run.items()
        }
        evaluator = pytrec_eval.RelevanceEvaluator(qrels, set(metrics))
        return evaluator.evaluate(pytrec_run)

    def _evaluate_builtin(
        self,
        run: Dict[str, List[Tuple[str, float]]],
        qrels: Dict[str, Dict[str, int]],
        metrics: List[str]
    ) -> Dict[str, Dict[str, float]]:
        """
        Evaluate using the built-in pure-Python implementations.

        Queries present in `run` but missing from `qrels` are skipped.
        Metric names mirror trec_eval: 'map', 'ndcg', 'ndcg_cut_K',
        'P_K', 'recall_K', 'recip_rank'.
        """
        results: Dict[str, Dict[str, float]] = {}
        for qid, docs_scores in run.items():
            if qid not in qrels:
                continue
            q_results: Dict[str, float] = {}
            retrieved = [doc_id for doc_id, _ in docs_scores]
            relevance = qrels[qid]
            # Binary relevance set: any positive grade counts as relevant.
            relevant = set(doc_id for doc_id, rel in relevance.items() if rel > 0)
            for metric in metrics:
                if metric == 'map':
                    q_results['map'] = self.average_precision(retrieved, relevant)
                elif metric == 'ndcg':
                    # trec_eval's un-cut 'ndcg' uses a deep cutoff.
                    q_results['ndcg'] = self.ndcg_at_k(retrieved, relevance, 1000)
                elif metric.startswith('ndcg_cut_'):
                    k = int(metric.split('_')[-1])
                    q_results[metric] = self.ndcg_at_k(retrieved, relevance, k)
                elif metric.startswith('P_'):
                    k = int(metric.split('_')[-1])
                    q_results[metric] = self.precision_at_k(retrieved, relevant, k)
                elif metric.startswith('recall_'):
                    k = int(metric.split('_')[-1])
                    q_results[metric] = self.recall_at_k(retrieved, relevant, k)
                elif metric == 'recip_rank':
                    q_results['recip_rank'] = self.reciprocal_rank(retrieved, relevant)
            results[qid] = q_results
        return results

    def compute_aggregate(
        self,
        results: Dict[str, Dict[str, float]]
    ) -> Dict[str, float]:
        """
        Compute aggregate metrics across all queries.

        Args:
            results: Per-query metrics as returned by evaluate_run.

        Returns:
            Mean value for each metric; empty dict for empty input.
        """
        if not results:
            return {}
        aggregated = defaultdict(list)
        for qid, metrics in results.items():
            for metric, value in metrics.items():
                aggregated[metric].append(value)
        return {metric: sum(values) / len(values)
                for metric, values in aggregated.items()}

    def format_results(
        self,
        results: Dict[str, Dict[str, float]],
        include_per_query: bool = False
    ) -> str:
        """
        Format results as a readable string.

        Args:
            results: Per-query metrics as returned by evaluate_run.
            include_per_query: Also list each query's metrics when True.

        Returns:
            Multi-line report with aggregate (and optionally per-query) values.
        """
        lines = []
        # Aggregate section.
        agg = self.compute_aggregate(results)
        lines.append("=" * 50)
        lines.append("AGGREGATE METRICS")
        lines.append("=" * 50)
        for metric, value in sorted(agg.items()):
            lines.append(f"  {metric:20s}: {value:.4f}")
        # Per-query section (optional).
        if include_per_query:
            lines.append("")
            lines.append("=" * 50)
            lines.append("PER-QUERY METRICS")
            lines.append("=" * 50)
            for qid in sorted(results.keys()):
                lines.append(f"\nQuery {qid}:")
                for metric, value in sorted(results[qid].items()):
                    lines.append(f"  {metric:20s}: {value:.4f}")
        return '\n'.join(lines)
def parse_qrels_file(filepath: str) -> Dict[str, Dict[str, int]]:
    """
    Parse a TREC qrels file into nested dicts.

    Format: query_id 0 doc_id relevance

    Lines with fewer than four whitespace-separated fields are ignored.
    """
    judgments: Dict[str, Dict[str, int]] = defaultdict(dict)
    with open(filepath, 'r') as handle:
        for raw_line in handle:
            fields = raw_line.strip().split()
            if len(fields) < 4:
                continue
            query_id, _, doc_id, grade = fields[:4]
            judgments[query_id][doc_id] = int(grade)
    return dict(judgments)
def parse_run_file(filepath: str) -> Dict[str, List[Tuple[str, float]]]:
    """
    Parse a TREC run file into per-query ranked lists.

    Format: query_id Q0 doc_id rank score run_tag

    Lines with fewer than five whitespace-separated fields are ignored.
    Each query's list is re-sorted by descending score (the file's rank
    column is not trusted).
    """
    rankings: Dict[str, List[Tuple[str, float]]] = defaultdict(list)
    with open(filepath, 'r') as handle:
        for raw_line in handle:
            fields = raw_line.strip().split()
            if len(fields) < 5:
                continue
            query_id, _, doc_id, _rank, score = fields[:5]
            rankings[query_id].append((doc_id, float(score)))
    # Order every ranking best-first by score.
    for query_id in rankings:
        rankings[query_id].sort(key=lambda pair: pair[1], reverse=True)
    return dict(rankings)
# --- Testing ---
if __name__ == "__main__":
    separator = "=" * 60
    print(separator)
    print("SysCRED Evaluation Metrics - Tests")
    print(separator)
    evaluator = EvaluationMetrics()
    print(f"\nUsing pytrec_eval: {evaluator.use_pytrec}")
    # Sample ranking with binary and graded judgments.
    ranked_docs = ['doc1', 'doc2', 'doc3', 'doc4', 'doc5', 'doc6', 'doc7', 'doc8', 'doc9', 'doc10']
    rel_set = {'doc1', 'doc3', 'doc5', 'doc8'}
    rel_grades = {'doc1': 2, 'doc3': 1, 'doc5': 2, 'doc8': 1}
    # Exercise each built-in metric on the sample ranking.
    print("\n--- Built-in Metrics Tests ---")
    print(f"P@5:     {evaluator.precision_at_k(ranked_docs, rel_set, 5):.4f}")
    print(f"P@10:    {evaluator.precision_at_k(ranked_docs, rel_set, 10):.4f}")
    print(f"R@5:     {evaluator.recall_at_k(ranked_docs, rel_set, 5):.4f}")
    print(f"R@10:    {evaluator.recall_at_k(ranked_docs, rel_set, 10):.4f}")
    print(f"AP:      {evaluator.average_precision(ranked_docs, rel_set):.4f}")
    print(f"NDCG@10: {evaluator.ndcg_at_k(ranked_docs, rel_grades, 10):.4f}")
    print(f"RR:      {evaluator.reciprocal_rank(ranked_docs, rel_set):.4f}")
    # End-to-end evaluation of a two-query run.
    print("\n--- Run Evaluation Test ---")
    sample_run = {
        'Q1': [(doc, 10 - i) for i, doc in enumerate(ranked_docs)],
        'Q2': [('doc2', 10), ('doc1', 9), ('doc4', 8), ('doc3', 7)]
    }
    sample_qrels = {
        'Q1': rel_grades,
        'Q2': {'doc1': 1, 'doc3': 2}
    }
    per_query = evaluator.evaluate_run(sample_run, sample_qrels)
    print(evaluator.format_results(per_query))
    print("\n" + separator)
    print("Tests complete!")
    print(separator)