import re
from concurrent.futures import ThreadPoolExecutor, as_completed

import numpy as np
from openai import OpenAI
from sklearn.metrics.pairwise import cosine_similarity


class GroqJudge:
    """LLM judge backed by OpenRouter's OpenAI-compatible chat completions API.

    Despite the class name, requests are sent to OpenRouter (see ``base_url``
    in ``__init__``). The class exposes the ``.generate(prompt)`` interface
    expected by RAGEvaluator and transparently falls back to other models when
    the preferred one is rate-limited, unavailable, or returns no content.
    """

    def __init__(self, api_key: str, model: str = "deepseek/deepseek-v3.2"):
        """
        Args:
            api_key: Your OpenRouter API key (https://openrouter.ai)
            model: OpenRouter model to try first; the entries of
                ``fallback_models`` are tried after it, in order.
        """
        self.client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=api_key,
        )
        self.model = model

        # Ordered list of models tried after ``self.model``.
        self.fallback_models = [
            "deepseek/deepseek-v3.2",
            "qwen/qwen3.6-plus-preview:free",
            "stepfun/step-3.5-flash:free",
            "nvidia/nemotron-3-super-120b-a12b:free",
            "z-ai/glm-4.5-air:free",
            "nvidia/nemotron-3-nano-30b-a3b:free",
            "arcee-ai/trinity-mini:free",
            "xiaomi/mimo-v2-flash",
        ]

    @staticmethod
    def _is_retryable(error: Exception) -> bool:
        """Return True when the error text suggests another model may succeed.

        The match is a deliberately broad, best-effort text heuristic: HTTP 429,
        a "rate_limit" marker, or any message mentioning "model".
        """
        text = str(error).lower()
        return "429" in text or "rate_limit" in text or "model" in text

    def generate(self, prompt: str) -> str:
        """Generate a response, falling back through the configured models.

        Args:
            prompt: User message sent as the only chat message.

        Returns:
            The stripped text content of the first usable completion.

        Raises:
            Exception: The API error is re-raised immediately when it does not
                look retryable (see ``_is_retryable``).
            ValueError: When the last model tried returned no content.
                Otherwise the last retryable error is raised once every model
                has been tried.
        """
        last_error = None

        # Preferred model first; dict.fromkeys removes duplicates while
        # preserving order so no model is queried twice.
        models_to_try = list(dict.fromkeys([self.model, *self.fallback_models]))

        for model_name in models_to_try:
            try:
                response = self.client.chat.completions.create(
                    model=model_name,
                    messages=[{"role": "user", "content": prompt}],
                )
            except Exception as e:
                last_error = e
                if self._is_retryable(e):
                    continue
                raise

            # A missing/empty ``choices`` list or a None message body is an
            # unusable answer from this model: move on to the next one.
            choices = getattr(response, "choices", None)
            content = choices[0].message.content if choices else None
            if content is None:
                last_error = ValueError(f"Model {model_name} returned None content")
                continue
            return content.strip()

        # Every model failed; surface the most recent failure.
        raise last_error
class RAGEvaluator:
    """Scores RAG answers with an LLM judge and an embedding model.

    Two metrics are provided:
      * faithfulness - share of the answer's factual claims that the judge
        finds explicitly supported by the retrieved context (0-100);
      * relevancy - mean cosine similarity between the original query and
        questions the judge generates from the answer.
    """

    # Upper bound on concurrent judge calls per thread pool.
    _MAX_WORKERS = 5
    # Character budget for the concatenated context in batch verification.
    _MAX_CONTEXT_CHARS = 6000
    # Leading bullet ("- ", "* ", "• ") or numbering ("1. ", "2) ") on a line.
    _LIST_MARKER = re.compile(r'^(?:[-*•]|\d+[.)])\s+')
    # "<n>: YES" / "<n>. NO" / "<n>) YES", tolerating markdown decoration.
    _VERDICT_LINE = re.compile(r'\W*(\d+)\s*[:.)\-]\s*\W*(YES|NO)\b')
    # Standalone YES token (so "YESTERDAY" or "EYES" do not count).
    _YES_TOKEN = re.compile(r'\bYES\b')

    def __init__(self, judge_model: str, embedding_model, api_key: str, verbose: bool = True):
        """
        Args:
            judge_model: Model name string passed to GroqJudge, must match cfg.gen['judge_model']
                e.g. "stepfun/step-3.5-flash:free", "nvidia/nemotron-3-super-120b-a12b:free"
            embedding_model: The proc.encoder (SentenceTransformer) for similarity checks;
                only its ``.encode(list_of_str)`` method is used.
            api_key: OpenRouter API key (https://openrouter.ai)
            verbose: If True, prints progress via internal helpers
        """
        self.judge = GroqJudge(api_key=api_key, model=judge_model)
        self.encoder = embedding_model
        self.verbose = verbose

    # ------------------------------------------------------------------
    # Faithfulness
    # ------------------------------------------------------------------

    def evaluate_faithfulness(self, answer: str, context_list: list[str], strict: bool = True) -> dict:
        """Measure how many claims in ``answer`` are supported by the context.

        Args:
            answer: Generated answer to audit.
            context_list: Retrieved context chunks.
            strict: If True, verifies each claim against chunks individually
                (more API calls but catches vague batch verdicts).
                If False, uses single batched verification call.

        Returns:
            ``{"score": percentage of supported claims (0-100),
               "details": [{"claim": str, "verdict": "Supported" | "Not Supported"}, ...]}``.
            When no claim can be extracted the score is 0 and details is empty.
        """
        if self.verbose:
            self._print_extraction_header(len(answer), strict=strict)

        extraction_prompt = (
            "Extract a list of independent factual claims from the following answer.\n"
            "Rules:\n"
            "- Each claim must be specific and verifiable — include numbers, names, or concrete details where present\n"
            "- Vague claims like 'the model performs well' or 'this improves results' are NOT acceptable\n"
            "- Do NOT include claims about what the context does or does not contain\n"
            "- Do NOT include introductory text, numbering, or bullet points\n"
            "- Do NOT rephrase or merge claims\n"
            "- One claim per line only\n\n"
            f"Answer: {answer}"
        )
        raw_claims = self.judge.generate(extraction_prompt)

        # Drop short fragments and header-like lines such as "Here are the claims:".
        claims = [
            claim for claim in self._clean_lines(raw_claims, min_len=20)
            if not claim.endswith(':')
        ]
        if not claims:
            return {"score": 0, "details": []}

        if strict:
            verdicts = self._verify_claims_strict(claims, context_list)
        else:
            verdicts = self._verify_claims_batch(claims, context_list)

        # A claim without a parsed verdict counts as not supported.
        details = [
            {
                "claim": claim,
                "verdict": "Supported" if verdicts.get(i, False) else "Not Supported",
            }
            for i, claim in enumerate(claims)
        ]
        verified_count = sum(d["verdict"] == "Supported" for d in details)
        score = (verified_count / len(claims)) * 100

        if self.verbose:
            self._print_faithfulness_results(claims, details, score)

        return {"score": score, "details": details}

    @classmethod
    def _clean_lines(cls, raw: str, min_len: int) -> list[str]:
        """Split judge output into lines longer than ``min_len``, minus list markers.

        The length test is applied to the whitespace-stripped line before the
        bullet/number prefix is removed.
        """
        cleaned = []
        for line in raw.split('\n'):
            text = line.strip()
            if len(text) > min_len:
                cleaned.append(cls._LIST_MARKER.sub('', text))
        return cleaned

    def _verify_claims_strict(self, claims: list[str], context_list: list[str]) -> dict[int, bool]:
        """Verify every claim chunk-by-chunk, running claims concurrently."""
        with ThreadPoolExecutor(max_workers=min(len(claims), self._MAX_WORKERS)) as executor:
            # map() yields results in claim order and re-raises worker errors.
            results = executor.map(
                lambda claim: self._verify_claim_against_chunks(claim, context_list),
                claims,
            )
            return dict(enumerate(results))

    def _verify_claims_batch(self, claims: list[str], context_list: list[str]) -> dict[int, bool]:
        """Verify all claims in a single judge call against the joined context."""
        combined_context = "\n".join(context_list)[:self._MAX_CONTEXT_CHARS]
        claims_formatted = "\n".join(f"{i}. {c}" for i, c in enumerate(claims, start=1))

        batch_prompt = (
            f"Context:\n{combined_context}\n\n"
            "For each claim, respond YES only if the claim is EXPLICITLY and DIRECTLY "
            "supported by the context above. Respond NO if the claim is inferred, assumed, "
            "or not clearly stated in the context.\n\n"
            "Format strictly as:\n"
            "1: YES\n"
            "2: NO\n\n"
            f"Claims:\n{claims_formatted}"
        )
        raw_verdicts = self.judge.generate(batch_prompt)

        verdicts = {}
        for line in raw_verdicts.split('\n'):
            match = self._VERDICT_LINE.match(line.strip().upper())
            if match:
                # Judge numbering is 1-based; claim indices are 0-based.
                verdicts[int(match.group(1)) - 1] = match.group(2) == "YES"
        return verdicts

    def _verify_claim_against_chunks(self, claim: str, context_list: list[str]) -> bool:
        """Verify a single claim against each chunk individually. Returns True if any chunk supports it."""
        # Nothing to check against; also avoids ThreadPoolExecutor(max_workers=0),
        # which raises ValueError.
        if not context_list:
            return False

        def verify_single_chunk(chunk):
            prompt = (
                f"Context:\n{chunk}\n\n"
                f"Claim: {claim}\n\n"
                "Is this claim EXPLICITLY and DIRECTLY stated in the context above? "
                "Do not infer or assume. Respond with YES or NO only."
            )
            result = self.judge.generate(prompt)
            return self._YES_TOKEN.search(result.upper()) is not None

        with ThreadPoolExecutor(max_workers=min(len(context_list), self._MAX_WORKERS)) as executor:
            futures = [executor.submit(verify_single_chunk, chunk) for chunk in context_list]
            for future in as_completed(futures):
                if future.result():
                    # One supporting chunk is enough: drop checks not yet started.
                    for pending in futures:
                        pending.cancel()
                    return True
        return False

    # ------------------------------------------------------------------
    # Relevancy
    # ------------------------------------------------------------------

    def evaluate_relevancy(self, query: str, answer: str) -> dict:
        """Measure how well ``answer`` addresses ``query``.

        The judge generates up to three questions the answer addresses; the
        score is the mean cosine similarity between their embeddings and the
        embedding of the original query.

        Returns:
            ``{"score": float, "queries": list[str]}``; score is 0 and queries
            is empty when the judge yields no usable question.
        """
        if self.verbose:
            self._print_relevancy_header()

        gen_prompt = (
            "Generate 3 distinct questions that the following answer addresses.\n"
            "Rules:\n"
            f"- Do NOT rephrase or repeat this question: '{query}'\n"
            "- Each question must end with a '?'\n"
            "- One question per line, no numbering or bullet points\n\n"
            f"Answer: {answer}"
        )
        raw_gen = self.judge.generate(gen_prompt)

        gen_queries = self._clean_lines(raw_gen, min_len=10)[:3]
        if not gen_queries:
            return {"score": 0, "queries": []}

        # Encode everything in one call; row 0 is the original query.
        all_vecs = self.encoder.encode([query] + gen_queries)
        original_vec = all_vecs[0:1]
        generated_vecs = all_vecs[1:]

        similarities = cosine_similarity(original_vec, generated_vecs)[0]
        avg_score = float(np.mean(similarities))

        if self.verbose:
            self._print_relevancy_results(query, gen_queries, similarities, avg_score)

        return {"score": avg_score, "queries": gen_queries}

    # ------------------------------------------------------------------
    # Dataset-level evaluation
    # ------------------------------------------------------------------

    def evaluate_dataset(self, test_cases: list[dict], strict: bool = False) -> dict:
        """
        Runs faithfulness + relevancy over a full test set and aggregates results.

        Args:
            test_cases: List of dicts, each with keys:
                - "query":    str
                - "answer":   str
                - "contexts": List[str]
            strict: If True, passes strict=True to evaluate_faithfulness
                (per-chunk verification, more API calls, harder to pass).
                Note the default here is False, unlike evaluate_faithfulness.

        Returns:
            {
                "avg_faithfulness": float,
                "avg_relevancy":    float,
                "per_query":        List[dict]
            }
            Both averages are 0.0 when ``test_cases`` is empty.
        """
        faithfulness_scores = []
        relevancy_scores = []
        per_query = []

        for position, case in enumerate(test_cases, start=1):
            if self.verbose:
                print(f"\n{'='*60}")
                print(f"Query {position}/{len(test_cases)}: {case['query']}")
                print('='*60)

            f_result = self.evaluate_faithfulness(case['answer'], case['contexts'], strict=strict)
            r_result = self.evaluate_relevancy(case['query'], case['answer'])

            faithfulness_scores.append(f_result['score'])
            relevancy_scores.append(r_result['score'])
            per_query.append({
                "query": case['query'],
                "faithfulness": f_result,
                "relevancy": r_result,
            })

        # np.mean([]) yields NaN plus a RuntimeWarning; report 0.0 instead.
        results = {
            "avg_faithfulness": float(np.mean(faithfulness_scores)) if faithfulness_scores else 0.0,
            "avg_relevancy": float(np.mean(relevancy_scores)) if relevancy_scores else 0.0,
            "per_query": per_query,
        }

        if self.verbose:
            self._print_dataset_summary(results)

        return results

    # ------------------------------------------------------------------
    # Console reporting helpers
    # ------------------------------------------------------------------

    def _print_extraction_header(self, length, strict=False):
        """Announce the start of a faithfulness run and its verification mode."""
        mode = "strict per-chunk" if strict else "batch"
        print(f"\n[EVAL] Analyzing Faithfulness ({mode})...")
        print(f" - Extracting claims from answer ({length} chars)")

    def _print_faithfulness_results(self, claims, details, score):
        """Print one line per claim with its verdict, then the overall score."""
        print(f" - Verifying {len(claims)} claims against context...")
        for i, detail in enumerate(details):
            # Verdicts are exactly "Supported" or "Not Supported".
            status = "✅" if detail['verdict'] == "Supported" else "❌"
            print(f" {status} Claim {i+1}: {detail['claim'][:75]}...")
        print(f" 🎯 Faithfulness Score: {score:.1f}%")

    def _print_relevancy_header(self):
        """Announce the start of a relevancy run."""
        print("\n[EVAL] Analyzing Relevancy...")
        print(" - Generating 3 distinct questions addressed by the answer")

    def _print_relevancy_results(self, query, gen_queries, similarities, avg):
        """Print each generated question with its similarity, then the average."""
        print(f" - Comparing to original query: '{query}'")
        for i, (q, sim) in enumerate(zip(gen_queries, similarities)):
            print(f" Q{i+1}: {q} (Sim: {sim:.2f})")
        print(f" 🎯 Average Relevancy: {avg:.2f}")

    def _print_dataset_summary(self, results):
        """Print the aggregated dataset metrics."""
        print(f"\n{'='*60}")
        print(" DATASET EVALUATION SUMMARY")
        print(f"{'='*60}")
        print(f" Avg Faithfulness : {results['avg_faithfulness']:.1f}%")
        print(f" Avg Relevancy : {results['avg_relevancy']:.2f}")
        print(f" Queries Evaluated: {len(results['per_query'])}")
        print(f"{'='*60}")