|
|
""" |
|
|
Evidence Verifier |
|
|
|
|
|
Verifies that claims are supported by document evidence. |
|
|
Cross-references extracted information with source documents. |
|
|
""" |
|
|
|
|
|
from typing import List, Optional, Dict, Any, Tuple |
|
|
from enum import Enum |
|
|
from pydantic import BaseModel, Field |
|
|
from loguru import logger |
|
|
import re |
|
|
|
|
|
|
|
|
class EvidenceStrength(str, Enum):
    """Evidence strength levels.

    Assigned from a combined match score by EvidenceVerifier._find_matches,
    using the thresholds in VerifierConfig (strong/moderate/weak; anything
    below the weak threshold is NONE).
    """

    STRONG = "strong"
    MODERATE = "moderate"
    WEAK = "weak"
    NONE = "none"
|
|
|
|
|
|
|
|
class VerifierConfig(BaseModel):
    """Configuration for evidence verifier.

    The three thresholds partition a combined match score (see
    EvidenceVerifier._find_matches) into EvidenceStrength buckets:
    score >= strong_threshold is STRONG, >= moderate_threshold is
    MODERATE, >= weak_threshold is WEAK, otherwise NONE.
    """

    # Matching behaviour
    fuzzy_match: bool = Field(default=True, description="Enable fuzzy matching")
    case_sensitive: bool = Field(default=False, description="Case-sensitive matching")
    min_match_ratio: float = Field(
        default=0.6,
        ge=0.0,
        le=1.0,
        description="Minimum match ratio for fuzzy matching"
    )

    # Strength thresholds (inclusive lower bounds on the combined score)
    strong_threshold: float = Field(default=0.9, ge=0.0, le=1.0)
    moderate_threshold: float = Field(default=0.7, ge=0.0, le=1.0)
    weak_threshold: float = Field(default=0.5, ge=0.0, le=1.0)

    # Evidence collection limits
    max_evidence_per_claim: int = Field(default=5, ge=1)
    context_window: int = Field(default=100, description="Characters around match")
|
|
|
|
|
|
|
|
class EvidenceMatch(BaseModel):
    """A match between claim and evidence."""

    # Core match data: the matched span, its 0.0-1.0 score, and the
    # strength bucket derived from that score.
    evidence_text: str
    match_score: float
    strength: EvidenceStrength

    # Source location, copied from the chunk metadata when available;
    # position is the character offset of the match within the chunk text.
    chunk_id: Optional[str] = None
    page: Optional[int] = None
    position: Optional[int] = None

    # Surrounding text for display; truncated to 50 characters each side
    # by EvidenceVerifier._create_match.
    context_before: Optional[str] = None
    context_after: Optional[str] = None
|
|
|
|
|
|
|
|
class VerificationResult(BaseModel):
    """Result of evidence verification."""

    # verified is True only when the best match is moderate-or-stronger
    # AND no contradiction was detected; confidence is the best match's score.
    claim: str
    verified: bool
    strength: EvidenceStrength
    confidence: float

    # Supporting evidence, sorted strongest first and capped at
    # VerifierConfig.max_evidence_per_claim entries.
    evidence_matches: List[EvidenceMatch]
    best_match: Optional[EvidenceMatch] = None

    # coverage_score is the fraction of claim terms found in the evidence.
    coverage_score: float
    contradiction_found: bool = False
    notes: Optional[str] = None
|
|
|
|
|
|
|
|
class EvidenceVerifier: |
|
|
""" |
|
|
Verifies claims against document evidence. |
|
|
|
|
|
Features: |
|
|
- Text matching (exact and fuzzy) |
|
|
- Evidence strength scoring |
|
|
- Contradiction detection |
|
|
- Context extraction |
|
|
""" |
|
|
|
|
|
def __init__(self, config: Optional[VerifierConfig] = None): |
|
|
"""Initialize evidence verifier.""" |
|
|
self.config = config or VerifierConfig() |
|
|
|
|
|
def verify_claim( |
|
|
self, |
|
|
claim: str, |
|
|
evidence_chunks: List[Dict[str, Any]], |
|
|
) -> VerificationResult: |
|
|
""" |
|
|
Verify a claim against evidence. |
|
|
|
|
|
Args: |
|
|
claim: The claim to verify |
|
|
evidence_chunks: List of evidence chunks with text |
|
|
|
|
|
Returns: |
|
|
VerificationResult |
|
|
""" |
|
|
if not claim or not evidence_chunks: |
|
|
return VerificationResult( |
|
|
claim=claim, |
|
|
verified=False, |
|
|
strength=EvidenceStrength.NONE, |
|
|
confidence=0.0, |
|
|
evidence_matches=[], |
|
|
coverage_score=0.0, |
|
|
) |
|
|
|
|
|
|
|
|
matches = [] |
|
|
for chunk in evidence_chunks: |
|
|
chunk_text = chunk.get("text", "") |
|
|
if not chunk_text: |
|
|
continue |
|
|
|
|
|
chunk_matches = self._find_matches(claim, chunk_text, chunk) |
|
|
matches.extend(chunk_matches) |
|
|
|
|
|
|
|
|
matches.sort(key=lambda m: m.match_score, reverse=True) |
|
|
top_matches = matches[:self.config.max_evidence_per_claim] |
|
|
|
|
|
|
|
|
if top_matches: |
|
|
best_match = top_matches[0] |
|
|
overall_strength = best_match.strength |
|
|
confidence = best_match.match_score |
|
|
coverage_score = self._calculate_coverage(claim, top_matches) |
|
|
else: |
|
|
best_match = None |
|
|
overall_strength = EvidenceStrength.NONE |
|
|
confidence = 0.0 |
|
|
coverage_score = 0.0 |
|
|
|
|
|
|
|
|
verified = ( |
|
|
overall_strength in [EvidenceStrength.STRONG, EvidenceStrength.MODERATE] |
|
|
and confidence >= self.config.moderate_threshold |
|
|
) |
|
|
|
|
|
|
|
|
contradiction_found = self._check_contradictions(claim, evidence_chunks) |
|
|
|
|
|
return VerificationResult( |
|
|
claim=claim, |
|
|
verified=verified and not contradiction_found, |
|
|
strength=overall_strength, |
|
|
confidence=confidence, |
|
|
evidence_matches=top_matches, |
|
|
best_match=best_match, |
|
|
coverage_score=coverage_score, |
|
|
contradiction_found=contradiction_found, |
|
|
) |
|
|
|
|
|
def verify_multiple( |
|
|
self, |
|
|
claims: List[str], |
|
|
evidence_chunks: List[Dict[str, Any]], |
|
|
) -> List[VerificationResult]: |
|
|
""" |
|
|
Verify multiple claims against evidence. |
|
|
|
|
|
Args: |
|
|
claims: List of claims to verify |
|
|
evidence_chunks: Evidence chunks |
|
|
|
|
|
Returns: |
|
|
List of VerificationResult |
|
|
""" |
|
|
return [self.verify_claim(claim, evidence_chunks) for claim in claims] |
|
|
|
|
|
def verify_extraction( |
|
|
self, |
|
|
extraction: Dict[str, Any], |
|
|
evidence_chunks: List[Dict[str, Any]], |
|
|
) -> Dict[str, VerificationResult]: |
|
|
""" |
|
|
Verify extracted fields as claims. |
|
|
|
|
|
Args: |
|
|
extraction: Dictionary of field -> value |
|
|
evidence_chunks: Evidence chunks |
|
|
|
|
|
Returns: |
|
|
Dictionary of field -> VerificationResult |
|
|
""" |
|
|
results = {} |
|
|
|
|
|
for field, value in extraction.items(): |
|
|
if value is None: |
|
|
continue |
|
|
|
|
|
|
|
|
claim = f"{field}: {value}" |
|
|
results[field] = self.verify_claim(claim, evidence_chunks) |
|
|
|
|
|
return results |
|
|
|
|
|
def _find_matches( |
|
|
self, |
|
|
claim: str, |
|
|
text: str, |
|
|
chunk: Dict[str, Any], |
|
|
) -> List[EvidenceMatch]: |
|
|
"""Find matches for claim in text.""" |
|
|
matches = [] |
|
|
|
|
|
|
|
|
claim_normalized = claim.lower() if not self.config.case_sensitive else claim |
|
|
text_normalized = text.lower() if not self.config.case_sensitive else text |
|
|
|
|
|
|
|
|
terms = self._extract_terms(claim_normalized) |
|
|
|
|
|
|
|
|
if claim_normalized in text_normalized: |
|
|
pos = text_normalized.find(claim_normalized) |
|
|
match = self._create_match( |
|
|
text, pos, len(claim), chunk, |
|
|
score=1.0, strength=EvidenceStrength.STRONG |
|
|
) |
|
|
matches.append(match) |
|
|
|
|
|
|
|
|
term_scores = [] |
|
|
for term in terms: |
|
|
if term in text_normalized: |
|
|
pos = text_normalized.find(term) |
|
|
term_scores.append((term, pos, 1.0)) |
|
|
elif self.config.fuzzy_match: |
|
|
|
|
|
fuzzy_score, fuzzy_pos = self._fuzzy_find(term, text_normalized) |
|
|
if fuzzy_score >= self.config.min_match_ratio: |
|
|
term_scores.append((term, fuzzy_pos, fuzzy_score)) |
|
|
|
|
|
if term_scores: |
|
|
|
|
|
avg_score = sum(s[2] for s in term_scores) / len(terms) if terms else 0 |
|
|
coverage = len(term_scores) / len(terms) if terms else 0 |
|
|
combined_score = (avg_score * 0.7) + (coverage * 0.3) |
|
|
|
|
|
|
|
|
if combined_score >= self.config.strong_threshold: |
|
|
strength = EvidenceStrength.STRONG |
|
|
elif combined_score >= self.config.moderate_threshold: |
|
|
strength = EvidenceStrength.MODERATE |
|
|
elif combined_score >= self.config.weak_threshold: |
|
|
strength = EvidenceStrength.WEAK |
|
|
else: |
|
|
strength = EvidenceStrength.NONE |
|
|
|
|
|
|
|
|
if strength != EvidenceStrength.NONE: |
|
|
best_term = max(term_scores, key=lambda t: t[2]) |
|
|
match = self._create_match( |
|
|
text, best_term[1], len(best_term[0]), chunk, |
|
|
score=combined_score, strength=strength |
|
|
) |
|
|
matches.append(match) |
|
|
|
|
|
return matches |
|
|
|
|
|
def _create_match( |
|
|
self, |
|
|
text: str, |
|
|
position: int, |
|
|
length: int, |
|
|
chunk: Dict[str, Any], |
|
|
score: float, |
|
|
strength: EvidenceStrength, |
|
|
) -> EvidenceMatch: |
|
|
"""Create an evidence match with context.""" |
|
|
|
|
|
window = self.config.context_window |
|
|
start = max(0, position - window) |
|
|
end = min(len(text), position + length + window) |
|
|
|
|
|
context_before = text[start:position] if position > 0 else "" |
|
|
evidence_text = text[position:position + length] |
|
|
context_after = text[position + length:end] if position + length < len(text) else "" |
|
|
|
|
|
return EvidenceMatch( |
|
|
evidence_text=evidence_text, |
|
|
match_score=score, |
|
|
strength=strength, |
|
|
chunk_id=chunk.get("chunk_id"), |
|
|
page=chunk.get("page"), |
|
|
position=position, |
|
|
context_before=context_before[-50:] if context_before else None, |
|
|
context_after=context_after[:50] if context_after else None, |
|
|
) |
|
|
|
|
|
def _extract_terms(self, text: str) -> List[str]: |
|
|
"""Extract key terms from text.""" |
|
|
|
|
|
stop_words = { |
|
|
"the", "a", "an", "is", "are", "was", "were", "be", "been", |
|
|
"being", "have", "has", "had", "do", "does", "did", "will", |
|
|
"would", "could", "should", "may", "might", "must", "shall", |
|
|
"can", "need", "dare", "ought", "used", "to", "of", "in", |
|
|
"for", "on", "with", "at", "by", "from", "as", "into", "through", |
|
|
"during", "before", "after", "above", "below", "between", |
|
|
"and", "but", "if", "or", "because", "until", "while", |
|
|
} |
|
|
|
|
|
|
|
|
words = re.findall(r'\b\w+\b', text.lower()) |
|
|
|
|
|
|
|
|
terms = [w for w in words if w not in stop_words and len(w) > 2] |
|
|
|
|
|
return terms |
|
|
|
|
|
def _fuzzy_find(self, term: str, text: str) -> Tuple[float, int]: |
|
|
"""Find term in text with fuzzy matching.""" |
|
|
|
|
|
best_score = 0.0 |
|
|
best_pos = 0 |
|
|
|
|
|
term_len = len(term) |
|
|
for i in range(len(text) - term_len + 1): |
|
|
window = text[i:i + term_len] |
|
|
|
|
|
matches = sum(1 for a, b in zip(term, window) if a == b) |
|
|
score = matches / term_len |
|
|
|
|
|
if score > best_score: |
|
|
best_score = score |
|
|
best_pos = i |
|
|
|
|
|
return best_score, best_pos |
|
|
|
|
|
def _calculate_coverage( |
|
|
self, |
|
|
claim: str, |
|
|
matches: List[EvidenceMatch], |
|
|
) -> float: |
|
|
"""Calculate how much of the claim is covered by evidence.""" |
|
|
claim_terms = set(self._extract_terms(claim.lower())) |
|
|
if not claim_terms: |
|
|
return 0.0 |
|
|
|
|
|
covered_terms = set() |
|
|
for match in matches: |
|
|
match_terms = set(self._extract_terms(match.evidence_text.lower())) |
|
|
covered_terms.update(match_terms.intersection(claim_terms)) |
|
|
|
|
|
return len(covered_terms) / len(claim_terms) |
|
|
|
|
|
def _check_contradictions( |
|
|
self, |
|
|
claim: str, |
|
|
evidence_chunks: List[Dict[str, Any]], |
|
|
) -> bool: |
|
|
"""Check if evidence contains contradictions to the claim.""" |
|
|
|
|
|
negation_patterns = [ |
|
|
r'\bnot\b', r'\bno\b', r'\bnever\b', r'\bnone\b', |
|
|
r'\bwithout\b', r'\bfailed\b', r'\bdenied\b', |
|
|
] |
|
|
|
|
|
claim_lower = claim.lower() |
|
|
claim_terms = set(self._extract_terms(claim_lower)) |
|
|
|
|
|
for chunk in evidence_chunks: |
|
|
text = chunk.get("text", "").lower() |
|
|
|
|
|
|
|
|
for term in claim_terms: |
|
|
if term in text: |
|
|
|
|
|
for pattern in negation_patterns: |
|
|
matches = list(re.finditer(pattern, text)) |
|
|
for match in matches: |
|
|
|
|
|
term_pos = text.find(term) |
|
|
if abs(match.start() - term_pos) < 30: |
|
|
return True |
|
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
|
|
# Module-level singleton, managed by get_evidence_verifier() / reset_evidence_verifier().
_evidence_verifier: Optional[EvidenceVerifier] = None
|
|
|
|
|
|
|
|
def get_evidence_verifier(
    config: Optional[VerifierConfig] = None,
) -> EvidenceVerifier:
    """Get or create singleton evidence verifier.

    NOTE: `config` is consulted only on the call that creates the
    singleton; subsequent calls return the existing instance unchanged.
    """
    global _evidence_verifier
    if _evidence_verifier is not None:
        return _evidence_verifier
    _evidence_verifier = EvidenceVerifier(config)
    return _evidence_verifier
|
|
|
|
|
|
|
|
def reset_evidence_verifier():
    """Reset the global verifier instance.

    The next call to get_evidence_verifier() will build a fresh
    EvidenceVerifier (and will honor any config passed to that call).
    """
    global _evidence_verifier
    _evidence_verifier = None
|
|
|