"""
Evidence Verifier
Verifies that claims are supported by document evidence.
Cross-references extracted information with source documents.
"""
from typing import List, Optional, Dict, Any, Tuple
from enum import Enum
from pydantic import BaseModel, Field
import re
class EvidenceStrength(str, Enum):
"""Evidence strength levels."""
STRONG = "strong" # Directly quoted/stated
MODERATE = "moderate" # Implied or paraphrased
WEAK = "weak" # Tangentially related
NONE = "none" # No supporting evidence
class VerifierConfig(BaseModel):
"""Configuration for evidence verifier."""
# Matching settings
fuzzy_match: bool = Field(default=True, description="Enable fuzzy matching")
case_sensitive: bool = Field(default=False, description="Case-sensitive matching")
min_match_ratio: float = Field(
default=0.6,
ge=0.0,
le=1.0,
description="Minimum match ratio for fuzzy matching"
)
# Scoring
strong_threshold: float = Field(default=0.9, ge=0.0, le=1.0)
moderate_threshold: float = Field(default=0.7, ge=0.0, le=1.0)
weak_threshold: float = Field(default=0.5, ge=0.0, le=1.0)
# Processing
max_evidence_per_claim: int = Field(default=5, ge=1)
context_window: int = Field(default=100, description="Characters around match")
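
# Example configuration (illustrative sketch; the values below are
# arbitrary, not recommended defaults):
#
#     config = VerifierConfig(fuzzy_match=True, min_match_ratio=0.75,
#                             strong_threshold=0.85)
#     verifier = EvidenceVerifier(config)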
class EvidenceMatch(BaseModel):
"""A match between claim and evidence."""
evidence_text: str
match_score: float
strength: EvidenceStrength
# Location
chunk_id: Optional[str] = None
page: Optional[int] = None
position: Optional[int] = None
# Context
context_before: Optional[str] = None
context_after: Optional[str] = None
class VerificationResult(BaseModel):
"""Result of evidence verification."""
claim: str
verified: bool
strength: EvidenceStrength
confidence: float
# Evidence
evidence_matches: List[EvidenceMatch]
best_match: Optional[EvidenceMatch] = None
# Analysis
coverage_score: float # How much of claim is covered
contradiction_found: bool = False
notes: Optional[str] = None
class EvidenceVerifier:
"""
Verifies claims against document evidence.
Features:
- Text matching (exact and fuzzy)
- Evidence strength scoring
- Contradiction detection
- Context extraction
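
    Example (illustrative sketch; chunk dicts are assumed to carry a
    "text" key plus optional "chunk_id" and "page"):

        verifier = EvidenceVerifier()
        chunks = [{"text": "Revenue grew 12% in 2023.", "page": 4}]
        result = verifier.verify_claim("Revenue grew 12% in 2023", chunks)
        # result.verified is True: the claim appears verbatim in the text.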
"""
def __init__(self, config: Optional[VerifierConfig] = None):
"""Initialize evidence verifier."""
self.config = config or VerifierConfig()
def verify_claim(
self,
claim: str,
evidence_chunks: List[Dict[str, Any]],
) -> VerificationResult:
"""
Verify a claim against evidence.
Args:
claim: The claim to verify
            evidence_chunks: Chunk dicts carrying a "text" key, plus
                optional "chunk_id" and "page" metadata
Returns:
VerificationResult
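
        Example (illustrative sketch):

            chunks = [{"text": "The contract was signed on 2021-03-01."}]
            result = verifier.verify_claim("contract signed 2021-03-01", chunks)
            # All key terms match, so result.strength is
            # EvidenceStrength.STRONG even without a verbatim quote.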
"""
        if not claim or not evidence_chunks:
            # Nothing to verify: return an unverified result instead of raising.
            return VerificationResult(
                claim=claim or "",
                verified=False,
                strength=EvidenceStrength.NONE,
                confidence=0.0,
                evidence_matches=[],
                coverage_score=0.0,
            )
# Find matches in evidence
matches = []
for chunk in evidence_chunks:
chunk_text = chunk.get("text", "")
if not chunk_text:
continue
chunk_matches = self._find_matches(claim, chunk_text, chunk)
matches.extend(chunk_matches)
# Sort by score and take top matches
matches.sort(key=lambda m: m.match_score, reverse=True)
top_matches = matches[:self.config.max_evidence_per_claim]
# Calculate overall scores
if top_matches:
best_match = top_matches[0]
overall_strength = best_match.strength
confidence = best_match.match_score
coverage_score = self._calculate_coverage(claim, top_matches)
else:
best_match = None
overall_strength = EvidenceStrength.NONE
confidence = 0.0
coverage_score = 0.0
# Determine verification status
verified = (
overall_strength in [EvidenceStrength.STRONG, EvidenceStrength.MODERATE]
and confidence >= self.config.moderate_threshold
)
# Check for contradictions
contradiction_found = self._check_contradictions(claim, evidence_chunks)
return VerificationResult(
claim=claim,
verified=verified and not contradiction_found,
strength=overall_strength,
confidence=confidence,
evidence_matches=top_matches,
best_match=best_match,
coverage_score=coverage_score,
contradiction_found=contradiction_found,
)
def verify_multiple(
self,
claims: List[str],
evidence_chunks: List[Dict[str, Any]],
) -> List[VerificationResult]:
"""
Verify multiple claims against evidence.
Args:
claims: List of claims to verify
evidence_chunks: Evidence chunks
Returns:
List of VerificationResult
"""
return [self.verify_claim(claim, evidence_chunks) for claim in claims]
def verify_extraction(
self,
extraction: Dict[str, Any],
evidence_chunks: List[Dict[str, Any]],
) -> Dict[str, VerificationResult]:
"""
Verify extracted fields as claims.
Args:
extraction: Dictionary of field -> value
evidence_chunks: Evidence chunks
Returns:
Dictionary of field -> VerificationResult
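
        Example (sketch; each field/value pair becomes the claim
        "field: value" before verification):

            extraction = {"invoice_total": "$1,200", "currency": "USD"}
            per_field = verifier.verify_extraction(extraction, chunks)
            unsupported = [f for f, r in per_field.items() if not r.verified]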
"""
results = {}
for field, value in extraction.items():
if value is None:
continue
# Convert to claim
claim = f"{field}: {value}"
results[field] = self.verify_claim(claim, evidence_chunks)
return results
def _find_matches(
self,
claim: str,
text: str,
chunk: Dict[str, Any],
) -> List[EvidenceMatch]:
"""Find matches for claim in text."""
matches = []
# Normalize texts
claim_normalized = claim.lower() if not self.config.case_sensitive else claim
text_normalized = text.lower() if not self.config.case_sensitive else text
# Extract key terms from claim
terms = self._extract_terms(claim_normalized)
        # Try an exact substring match first; nothing can score higher,
        # so skip term matching and avoid duplicate matches for the chunk.
        if claim_normalized in text_normalized:
            pos = text_normalized.find(claim_normalized)
            match = self._create_match(
                text, pos, len(claim), chunk,
                score=1.0, strength=EvidenceStrength.STRONG,
            )
            matches.append(match)
            return matches
# Try term matching
term_scores = []
for term in terms:
if term in text_normalized:
pos = text_normalized.find(term)
term_scores.append((term, pos, 1.0))
elif self.config.fuzzy_match:
# Try fuzzy match
fuzzy_score, fuzzy_pos = self._fuzzy_find(term, text_normalized)
if fuzzy_score >= self.config.min_match_ratio:
term_scores.append((term, fuzzy_pos, fuzzy_score))
        if term_scores:
            # Average over the terms that actually matched; coverage below
            # separately penalises claim terms with no match at all.
            avg_score = sum(s[2] for s in term_scores) / len(term_scores)
            coverage = len(term_scores) / len(terms) if terms else 0.0
            combined_score = (avg_score * 0.7) + (coverage * 0.3)
# Determine strength
if combined_score >= self.config.strong_threshold:
strength = EvidenceStrength.STRONG
elif combined_score >= self.config.moderate_threshold:
strength = EvidenceStrength.MODERATE
elif combined_score >= self.config.weak_threshold:
strength = EvidenceStrength.WEAK
else:
strength = EvidenceStrength.NONE
            # Anchor the match at the best-scoring term's position
if strength != EvidenceStrength.NONE:
best_term = max(term_scores, key=lambda t: t[2])
match = self._create_match(
text, best_term[1], len(best_term[0]), chunk,
score=combined_score, strength=strength
)
matches.append(match)
return matches
def _create_match(
self,
text: str,
position: int,
length: int,
chunk: Dict[str, Any],
score: float,
strength: EvidenceStrength,
) -> EvidenceMatch:
"""Create an evidence match with context."""
# Extract context
window = self.config.context_window
start = max(0, position - window)
end = min(len(text), position + length + window)
context_before = text[start:position] if position > 0 else ""
evidence_text = text[position:position + length]
context_after = text[position + length:end] if position + length < len(text) else ""
return EvidenceMatch(
evidence_text=evidence_text,
match_score=score,
strength=strength,
chunk_id=chunk.get("chunk_id"),
page=chunk.get("page"),
position=position,
            # The slices above already honour config.context_window, so no
            # further hard-coded trimming is applied here.
            context_before=context_before or None,
            context_after=context_after or None,
)
def _extract_terms(self, text: str) -> List[str]:
"""Extract key terms from text."""
# Remove common stop words and punctuation
stop_words = {
"the", "a", "an", "is", "are", "was", "were", "be", "been",
"being", "have", "has", "had", "do", "does", "did", "will",
"would", "could", "should", "may", "might", "must", "shall",
"can", "need", "dare", "ought", "used", "to", "of", "in",
"for", "on", "with", "at", "by", "from", "as", "into", "through",
"during", "before", "after", "above", "below", "between",
"and", "but", "if", "or", "because", "until", "while",
}
        # Tokenize; keep the original case when case-sensitive matching is
        # enabled so extracted terms can be located in the unlowered text.
        source = text if self.config.case_sensitive else text.lower()
        words = re.findall(r'\b\w+\b', source)
        # Filter stop words (case-insensitively) and very short tokens.
        terms = [w for w in words if w.lower() not in stop_words and len(w) > 2]
        return terms
    def _fuzzy_find(self, term: str, text: str) -> Tuple[float, int]:
        """
        Find the best approximate occurrence of term in text.

        Uses a character-level sliding window (a Hamming-style ratio),
        not true edit distance, so insertions and deletions score lower
        than substitutions.
        """
        best_score = 0.0
        best_pos = 0
        term_len = len(term)
        if term_len == 0 or len(text) < term_len:
            return best_score, best_pos
        for i in range(len(text) - term_len + 1):
            window = text[i:i + term_len]
            # Fraction of aligned positions whose characters agree.
            matches = sum(1 for a, b in zip(term, window) if a == b)
            score = matches / term_len
            if score > best_score:
                best_score = score
                best_pos = i
        return best_score, best_pos
def _calculate_coverage(
self,
claim: str,
matches: List[EvidenceMatch],
) -> float:
"""Calculate how much of the claim is covered by evidence."""
claim_terms = set(self._extract_terms(claim.lower()))
if not claim_terms:
return 0.0
covered_terms = set()
for match in matches:
match_terms = set(self._extract_terms(match.evidence_text.lower()))
covered_terms.update(match_terms.intersection(claim_terms))
return len(covered_terms) / len(claim_terms)
    def _check_contradictions(
        self,
        claim: str,
        evidence_chunks: List[Dict[str, Any]],
    ) -> bool:
        """Heuristically check whether evidence contradicts the claim."""
        # Shallow lexical heuristic: a negation word near a claim term.
        # This is not natural-language inference and can both miss and
        # over-report contradictions.
        negation_patterns = [
            r'\bnot\b', r'\bno\b', r'\bnever\b', r'\bnone\b',
            r'\bwithout\b', r'\bfailed\b', r'\bdenied\b',
        ]
        claim_terms = set(self._extract_terms(claim.lower()))
        for chunk in evidence_chunks:
            text = chunk.get("text", "").lower()
            negation_positions = [
                m.start()
                for pattern in negation_patterns
                for m in re.finditer(pattern, text)
            ]
            if not negation_positions:
                continue
            for term in claim_terms:
                # Check every occurrence of the term, not just the first.
                for term_match in re.finditer(re.escape(term), text):
                    if any(abs(neg - term_match.start()) < 30
                           for neg in negation_positions):
                        return True
        return False
# Global instance and factory
_evidence_verifier: Optional[EvidenceVerifier] = None
def get_evidence_verifier(
config: Optional[VerifierConfig] = None,
) -> EvidenceVerifier:
"""Get or create singleton evidence verifier."""
global _evidence_verifier
if _evidence_verifier is None:
_evidence_verifier = EvidenceVerifier(config)
return _evidence_verifier
def reset_evidence_verifier():
"""Reset the global verifier instance."""
global _evidence_verifier
_evidence_verifier = None
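

# Minimal usage sketch. The sample chunk below is invented for
# illustration; real callers would pass retrieved document chunks.
if __name__ == "__main__":
    verifier = get_evidence_verifier()
    sample_chunks = [
        {
            "text": "The study enrolled 120 patients between 2019 and 2021.",
            "chunk_id": "chunk-1",
            "page": 3,
        }
    ]
    result = verifier.verify_claim(
        "The study enrolled 120 patients", sample_chunks
    )
    print(
        f"verified={result.verified} "
        f"strength={result.strength.value} "
        f"confidence={result.confidence:.2f} "
        f"coverage={result.coverage_score:.2f}"
    )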