# NLP-RAG: retriever/evaluator.py
import re
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from openai import OpenAI
from concurrent.futures import ThreadPoolExecutor, as_completed
# ------------------------------------------------------------------
# OpenRouter Judge Wrapper
# ------------------------------------------------------------------
class OpenRouterJudge:
    def __init__(self, api_key: str, model: str = "deepseek/deepseek-v3.2"):
"""
Wraps OpenRouter's chat completions to match the .generate(prompt) interface
expected by RAGEvaluator.
Args:
api_key: Your OpenRouter API key (https://openrouter.ai)
model: OpenRouter model to use (primary model with fallback support)
"""
self.client = OpenAI(
base_url="https://openrouter.ai/api/v1",
api_key=api_key,
)
self.model = model
# Fallback models in order of preference (OpenRouter free models)
self.fallback_models = [
"deepseek/deepseek-v3.2",
"qwen/qwen3.6-plus-preview:free",
"stepfun/step-3.5-flash:free",
"nvidia/nemotron-3-super-120b-a12b:free",
"z-ai/glm-4.5-air:free",
"nvidia/nemotron-3-nano-30b-a3b:free",
"arcee-ai/trinity-mini:free",
"xiaomi/mimo-v2-flash"
]
def generate(self, prompt: str) -> str:
"""Generate response with fallback support for multiple models."""
last_error = None
# Try primary model first, then fallbacks
models_to_try = [self.model] + [m for m in self.fallback_models if m != self.model]
for model_name in models_to_try:
try:
response = self.client.chat.completions.create(
model=model_name,
messages=[{"role": "user", "content": prompt}],
)
content = response.choices[0].message.content
if content is None:
raise ValueError(f"Model {model_name} returned None content")
return content.strip()
except Exception as e:
last_error = e
                # If rate limited, the model is unavailable, or the error mentions the
                # model (including the empty-content case above), fall through to the next model
if "429" in str(e) or "rate_limit" in str(e).lower() or "model" in str(e).lower():
continue
# For other errors, raise immediately
raise
# If all models fail, raise the last error
raise last_error
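# Minimal usage sketch for the judge wrapper (illustrative; assumes an
# OPENROUTER_API_KEY environment variable is set):
#
#   import os
#   judge = OpenRouterJudge(api_key=os.environ["OPENROUTER_API_KEY"])
#   print(judge.generate("Answer with YES or NO only: is Paris the capital of France?"))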
# ------------------------------------------------------------------
# RAG Evaluator
# ------------------------------------------------------------------
class RAGEvaluator:
def __init__(self, judge_model: str, embedding_model, api_key: str, verbose=True):
"""
judge_model: Model name string passed to OpenRouterJudge, must match cfg.gen['judge_model']
e.g. "stepfun/step-3.5-flash:free", "nvidia/nemotron-3-super-120b-a12b:free"
embedding_model: The proc.encoder (SentenceTransformer) for similarity checks
api_key: OpenRouter API key (https://openrouter.ai)
verbose: If True, prints progress via internal helpers
"""
        self.judge = OpenRouterJudge(api_key=api_key, model=judge_model)
self.encoder = embedding_model
self.verbose = verbose
# ------------------------------------------------------------------
# 1. FAITHFULNESS: Claim Extraction & Verification
# ------------------------------------------------------------------
def evaluate_faithfulness(self, answer: str, context_list: list[str], strict: bool = True) -> dict:
"""
Args:
strict: If True, verifies each claim against chunks individually
(more API calls but catches vague batch verdicts).
                If False, uses a single batched verification call.
"""
if self.verbose:
self._print_extraction_header(len(answer), strict=strict)
# --- Step A: Extraction ---
extraction_prompt = (
"Extract a list of independent factual claims from the following answer.\n"
"Rules:\n"
"- Each claim must be specific and verifiable — include numbers, names, or concrete details where present\n"
"- Vague claims like 'the model performs well' or 'this improves results' are NOT acceptable\n"
"- Do NOT include claims about what the context does or does not contain\n"
"- Do NOT include introductory text, numbering, or bullet points\n"
"- Do NOT rephrase or merge claims\n"
"- One claim per line only\n\n"
f"Answer: {answer}"
)
raw_claims = self.judge.generate(extraction_prompt)
# Filter out short lines, preamble, and lines ending with ':'
claims = [
c.strip() for c in raw_claims.split('\n')
if len(c.strip()) > 20 and not c.strip().endswith(':')
]
if not claims:
return {"score": 0, "details": []}
# --- Step B: Verification ---
if strict:
# Per-chunk: claim must be explicitly supported by at least one chunk
# Parallelize across claims as well
def verify_claim_wrapper(args):
i, claim = args
return i, self._verify_claim_against_chunks(claim, context_list)
with ThreadPoolExecutor(max_workers=min(len(claims), 5)) as executor:
futures = [executor.submit(verify_claim_wrapper, (i, claim)) for i, claim in enumerate(claims)]
                verdicts = dict(future.result() for future in as_completed(futures))
else:
# Batch: all chunks joined, strict burden-of-proof prompt
combined_context = "\n".join(context_list)
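            # Rough character cap, presumably to keep the batched prompt within the judge's context window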
if len(combined_context) > 6000:
combined_context = combined_context[:6000]
claims_formatted = "\n".join([f"{i+1}. {c}" for i, c in enumerate(claims)])
batch_prompt = (
f"Context:\n{combined_context}\n\n"
f"For each claim, respond YES only if the claim is EXPLICITLY and DIRECTLY "
f"supported by the context above. Respond NO if the claim is inferred, assumed, "
f"or not clearly stated in the context.\n\n"
f"Format strictly as:\n"
f"1: YES\n"
f"2: NO\n\n"
f"Claims:\n{claims_formatted}"
)
raw_verdicts = self.judge.generate(batch_prompt)
verdicts = {}
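            # Parse judge output lines of the form "3: YES" into a 0-indexed verdict map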
for line in raw_verdicts.split('\n'):
match = re.match(r'(\d+)\s*:\s*(YES|NO)', line.strip().upper())
if match:
verdicts[int(match.group(1)) - 1] = match.group(2) == "YES"
# --- Step C: Scoring & Details ---
verified_count = 0
details = []
for i, claim in enumerate(claims):
is_supported = verdicts.get(i, False)
if is_supported:
verified_count += 1
details.append({
"claim": claim,
"verdict": "Supported" if is_supported else "Not Supported"
})
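        # Faithfulness = percentage of extracted claims the judge marked as supported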
score = (verified_count / len(claims)) * 100
if self.verbose:
self._print_faithfulness_results(claims, details, score)
return {"score": score, "details": details}
def _verify_claim_against_chunks(self, claim: str, context_list: list[str]) -> bool:
"""Verify a single claim against each chunk individually. Returns True if any chunk supports it."""
def verify_single_chunk(chunk):
prompt = (
f"Context:\n{chunk}\n\n"
f"Claim: {claim}\n\n"
f"Is this claim EXPLICITLY and DIRECTLY stated in the context above? "
f"Do not infer or assume. Respond with YES or NO only."
)
result = self.judge.generate(prompt)
return "YES" in result.upper()
# Use ThreadPoolExecutor for parallel verification
with ThreadPoolExecutor(max_workers=min(len(context_list), 5)) as executor:
futures = [executor.submit(verify_single_chunk, chunk) for chunk in context_list]
for future in as_completed(futures):
if future.result():
return True
return False
# ------------------------------------------------------------------
# 2. RELEVANCY: Alternate Query Generation
# ------------------------------------------------------------------
def evaluate_relevancy(self, query: str, answer: str) -> dict:
if self.verbose:
self._print_relevancy_header()
# --- Step A: Generation ---
# Explicitly ask the judge NOT to rephrase the original query
gen_prompt = (
f"Generate 3 distinct questions that the following answer addresses.\n"
f"Rules:\n"
f"- Do NOT rephrase or repeat this question: '{query}'\n"
f"- Each question must end with a '?'\n"
f"- One question per line, no numbering or bullet points\n\n"
f"Answer: {answer}"
)
raw_gen = self.judge.generate(gen_prompt)
# Filter by length rather than just '?' presence
gen_queries = [
q.strip() for q in raw_gen.split('\n')
if len(q.strip()) > 10
][:3]
if not gen_queries:
return {"score": 0, "queries": []}
# --- Step B: Similarity (single batched encode call) ---
all_vecs = self.encoder.encode([query] + gen_queries)
original_vec = all_vecs[0:1]
generated_vecs = all_vecs[1:]
similarities = cosine_similarity(original_vec, generated_vecs)[0]
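        # Relevancy is the mean cosine similarity between the original query and the
        # generated questions; higher means the answer stays on-topic for the query.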
avg_score = float(np.mean(similarities))
if self.verbose:
self._print_relevancy_results(query, gen_queries, similarities, avg_score)
return {"score": avg_score, "queries": gen_queries}
# ------------------------------------------------------------------
# 3. DATASET-LEVEL EVALUATION
# ------------------------------------------------------------------
def evaluate_dataset(self, test_cases: list[dict], strict: bool = False) -> dict:
"""
Runs faithfulness + relevancy over a full test set and aggregates results.
Args:
test_cases: List of dicts, each with keys:
- "query": str
- "answer": str
- "contexts": List[str]
strict: If True, passes strict=True to evaluate_faithfulness
(per-chunk verification, more API calls, harder to pass)
Returns:
{
"avg_faithfulness": float,
"avg_relevancy": float,
"per_query": List[dict]
}
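        Example (illustrative):
            test_cases = [{
                "query": "What optimizer does the paper use?",
                "answer": "The paper uses AdamW with a cosine learning-rate schedule.",
                "contexts": ["...retrieved chunk 1...", "...retrieved chunk 2..."],
            }]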
"""
faithfulness_scores = []
relevancy_scores = []
per_query = []
for i, case in enumerate(test_cases):
if self.verbose:
print(f"\n{'='*60}")
print(f"Query {i+1}/{len(test_cases)}: {case['query']}")
print('='*60)
f_result = self.evaluate_faithfulness(case['answer'], case['contexts'], strict=strict)
r_result = self.evaluate_relevancy(case['query'], case['answer'])
faithfulness_scores.append(f_result['score'])
relevancy_scores.append(r_result['score'])
per_query.append({
"query": case['query'],
"faithfulness": f_result,
"relevancy": r_result,
})
results = {
"avg_faithfulness": float(np.mean(faithfulness_scores)),
"avg_relevancy": float(np.mean(relevancy_scores)),
"per_query": per_query,
}
if self.verbose:
self._print_dataset_summary(results)
return results
# ------------------------------------------------------------------
# 4. PRINT HELPERS
# ------------------------------------------------------------------
def _print_extraction_header(self, length, strict=False):
mode = "strict per-chunk" if strict else "batch"
print(f"\n[EVAL] Analyzing Faithfulness ({mode})...")
print(f" - Extracting claims from answer ({length} chars)")
def _print_faithfulness_results(self, claims, details, score):
print(f" - Verifying {len(claims)} claims against context...")
for i, detail in enumerate(details):
            status = "✅" if detail['verdict'] == "Supported" else "❌"
print(f" {status} Claim {i+1}: {detail['claim'][:75]}...")
print(f" 🎯 Faithfulness Score: {score:.1f}%")
def _print_relevancy_header(self):
print(f"\n[EVAL] Analyzing Relevancy...")
print(f" - Generating 3 distinct questions addressed by the answer")
def _print_relevancy_results(self, query, gen_queries, similarities, avg):
print(f" - Comparing to original query: '{query}'")
for i, (q, sim) in enumerate(zip(gen_queries, similarities)):
print(f" Q{i+1}: {q} (Sim: {sim:.2f})")
print(f" 🎯 Average Relevancy: {avg:.2f}")
def _print_dataset_summary(self, results):
print(f"\n{'='*60}")
print(f" DATASET EVALUATION SUMMARY")
print(f"{'='*60}")
print(f" Avg Faithfulness : {results['avg_faithfulness']:.1f}%")
print(f" Avg Relevancy : {results['avg_relevancy']:.2f}")
print(f" Queries Evaluated: {len(results['per_query'])}")
print(f"{'='*60}")
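
# ------------------------------------------------------------------
# Example wiring (illustrative sketch, not part of the evaluation library).
# Assumes an OPENROUTER_API_KEY environment variable and uses a generic
# SentenceTransformer ("all-MiniLM-L6-v2") as a stand-in for proc.encoder.
# ------------------------------------------------------------------
if __name__ == "__main__":
    import os
    from sentence_transformers import SentenceTransformer

    evaluator = RAGEvaluator(
        judge_model="deepseek/deepseek-v3.2",
        embedding_model=SentenceTransformer("all-MiniLM-L6-v2"),
        api_key=os.environ["OPENROUTER_API_KEY"],
        verbose=True,
    )

    # A single hand-written test case in the format expected by evaluate_dataset
    test_cases = [
        {
            "query": "What does the retriever return?",
            "answer": "The retriever returns the top-k most similar chunks for a query.",
            "contexts": [
                "The retriever encodes the query and returns the top-k most similar chunks.",
            ],
        }
    ]

    results = evaluator.evaluate_dataset(test_cases, strict=False)
    print(results["avg_faithfulness"], results["avg_relevancy"])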