"""
Evidence Verifier
Verifies that claims are supported by document evidence.
Cross-references extracted information with source documents.
"""
from typing import List, Optional, Dict, Any, Tuple
from enum import Enum
from pydantic import BaseModel, Field
from loguru import logger
import re


class EvidenceStrength(str, Enum):
    """Evidence strength levels."""

    STRONG = "strong"      # Directly quoted/stated
    MODERATE = "moderate"  # Implied or paraphrased
    WEAK = "weak"          # Tangentially related
    NONE = "none"          # No supporting evidence


class VerifierConfig(BaseModel):
    """Configuration for evidence verifier."""

    # Matching settings
    fuzzy_match: bool = Field(default=True, description="Enable fuzzy matching")
    case_sensitive: bool = Field(default=False, description="Case-sensitive matching")
    min_match_ratio: float = Field(
        default=0.6,
        ge=0.0,
        le=1.0,
        description="Minimum match ratio for fuzzy matching",
    )

    # Scoring
    strong_threshold: float = Field(default=0.9, ge=0.0, le=1.0)
    moderate_threshold: float = Field(default=0.7, ge=0.0, le=1.0)
    weak_threshold: float = Field(default=0.5, ge=0.0, le=1.0)

    # Processing
    max_evidence_per_claim: int = Field(default=5, ge=1)
    context_window: int = Field(default=100, description="Characters of context around a match")
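
# A hypothetical tuning sketch: a stricter deployment might require closer
# fuzzy matches and higher strength cut-offs, e.g.
#   strict_config = VerifierConfig(
#       min_match_ratio=0.8,
#       strong_threshold=0.95,
#       moderate_threshold=0.8,
#   )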


class EvidenceMatch(BaseModel):
    """A match between claim and evidence."""

    evidence_text: str
    match_score: float
    strength: EvidenceStrength

    # Location
    chunk_id: Optional[str] = None
    page: Optional[int] = None
    position: Optional[int] = None

    # Context
    context_before: Optional[str] = None
    context_after: Optional[str] = None


class VerificationResult(BaseModel):
    """Result of evidence verification."""

    claim: str
    verified: bool
    strength: EvidenceStrength
    confidence: float

    # Evidence
    evidence_matches: List[EvidenceMatch]
    best_match: Optional[EvidenceMatch] = None

    # Analysis
    coverage_score: float  # Fraction of claim terms covered by evidence
    contradiction_found: bool = False
    notes: Optional[str] = None


class EvidenceVerifier:
    """
    Verifies claims against document evidence.

    Features:
    - Text matching (exact and fuzzy)
    - Evidence strength scoring
    - Contradiction detection
    - Context extraction
    """

    def __init__(self, config: Optional[VerifierConfig] = None):
        """Initialize the evidence verifier."""
        self.config = config or VerifierConfig()

    def verify_claim(
        self,
        claim: str,
        evidence_chunks: List[Dict[str, Any]],
    ) -> VerificationResult:
        """
        Verify a claim against evidence.

        Args:
            claim: The claim to verify
            evidence_chunks: List of evidence chunks with text

        Returns:
            VerificationResult
        """
        if not claim or not evidence_chunks:
            return VerificationResult(
                claim=claim,
                verified=False,
                strength=EvidenceStrength.NONE,
                confidence=0.0,
                evidence_matches=[],
                coverage_score=0.0,
            )

        # Find matches in evidence
        matches = []
        for chunk in evidence_chunks:
            chunk_text = chunk.get("text", "")
            if not chunk_text:
                continue
            chunk_matches = self._find_matches(claim, chunk_text, chunk)
            matches.extend(chunk_matches)

        # Sort by score and take the top matches
        matches.sort(key=lambda m: m.match_score, reverse=True)
        top_matches = matches[: self.config.max_evidence_per_claim]

        # Calculate overall scores
        if top_matches:
            best_match = top_matches[0]
            overall_strength = best_match.strength
            confidence = best_match.match_score
            coverage_score = self._calculate_coverage(claim, top_matches)
        else:
            best_match = None
            overall_strength = EvidenceStrength.NONE
            confidence = 0.0
            coverage_score = 0.0

        # Determine verification status
        verified = (
            overall_strength in (EvidenceStrength.STRONG, EvidenceStrength.MODERATE)
            and confidence >= self.config.moderate_threshold
        )

        # Check for contradictions
        contradiction_found = self._check_contradictions(claim, evidence_chunks)

        return VerificationResult(
            claim=claim,
            verified=verified and not contradiction_found,
            strength=overall_strength,
            confidence=confidence,
            evidence_matches=top_matches,
            best_match=best_match,
            coverage_score=coverage_score,
            contradiction_found=contradiction_found,
        )

    def verify_multiple(
        self,
        claims: List[str],
        evidence_chunks: List[Dict[str, Any]],
    ) -> List[VerificationResult]:
        """
        Verify multiple claims against the same evidence.

        Args:
            claims: List of claims to verify
            evidence_chunks: Evidence chunks

        Returns:
            List of VerificationResult, one per claim
        """
        return [self.verify_claim(claim, evidence_chunks) for claim in claims]

    def verify_extraction(
        self,
        extraction: Dict[str, Any],
        evidence_chunks: List[Dict[str, Any]],
    ) -> Dict[str, VerificationResult]:
        """
        Verify extracted fields as claims.

        Args:
            extraction: Dictionary of field -> value
            evidence_chunks: Evidence chunks

        Returns:
            Dictionary of field -> VerificationResult
        """
        results = {}
        for field, value in extraction.items():
            if value is None:
                continue
            # Phrase the field/value pair as a claim
            claim = f"{field}: {value}"
            results[field] = self.verify_claim(claim, evidence_chunks)
        return results

    def _find_matches(
        self,
        claim: str,
        text: str,
        chunk: Dict[str, Any],
    ) -> List[EvidenceMatch]:
        """Find matches for a claim in a chunk of text."""
        matches = []

        # Normalize texts
        claim_normalized = claim if self.config.case_sensitive else claim.lower()
        text_normalized = text if self.config.case_sensitive else text.lower()

        # Extract key terms from the claim
        terms = self._extract_terms(claim_normalized)

        # Try an exact substring match first
        if claim_normalized in text_normalized:
            pos = text_normalized.find(claim_normalized)
            match = self._create_match(
                text, pos, len(claim), chunk,
                score=1.0, strength=EvidenceStrength.STRONG,
            )
            matches.append(match)

        # Try term-level matching
        term_scores = []
        for term in terms:
            if term in text_normalized:
                pos = text_normalized.find(term)
                term_scores.append((term, pos, 1.0))
            elif self.config.fuzzy_match:
                # Fall back to fuzzy matching
                fuzzy_score, fuzzy_pos = self._fuzzy_find(term, text_normalized)
                if fuzzy_score >= self.config.min_match_ratio:
                    term_scores.append((term, fuzzy_pos, fuzzy_score))

        if term_scores:
            # Average the scores of the terms that matched; coverage already
            # penalizes missing terms, so dividing by the full term count here
            # would double-count coverage.
            avg_score = sum(s[2] for s in term_scores) / len(term_scores)
            coverage = len(term_scores) / len(terms) if terms else 0.0
            combined_score = (avg_score * 0.7) + (coverage * 0.3)

            # Map the combined score to an evidence strength
            if combined_score >= self.config.strong_threshold:
                strength = EvidenceStrength.STRONG
            elif combined_score >= self.config.moderate_threshold:
                strength = EvidenceStrength.MODERATE
            elif combined_score >= self.config.weak_threshold:
                strength = EvidenceStrength.WEAK
            else:
                strength = EvidenceStrength.NONE

            # Anchor the match at the best-scoring term's position
            if strength != EvidenceStrength.NONE:
                best_term = max(term_scores, key=lambda t: t[2])
                match = self._create_match(
                    text, best_term[1], len(best_term[0]), chunk,
                    score=combined_score, strength=strength,
                )
                matches.append(match)

        return matches

    def _create_match(
        self,
        text: str,
        position: int,
        length: int,
        chunk: Dict[str, Any],
        score: float,
        strength: EvidenceStrength,
    ) -> EvidenceMatch:
        """Create an evidence match with surrounding context."""
        # Extract context on either side of the match, bounded by the
        # configured context window (the slice bounds already enforce it,
        # so no further truncation is needed)
        window = self.config.context_window
        start = max(0, position - window)
        end = min(len(text), position + length + window)

        context_before = text[start:position]
        evidence_text = text[position:position + length]
        context_after = text[position + length:end]

        return EvidenceMatch(
            evidence_text=evidence_text,
            match_score=score,
            strength=strength,
            chunk_id=chunk.get("chunk_id"),
            page=chunk.get("page"),
            position=position,
            context_before=context_before or None,
            context_after=context_after or None,
        )

    def _extract_terms(self, text: str) -> List[str]:
        """Extract key terms from text."""
        # Common stop words to ignore
        stop_words = {
            "the", "a", "an", "is", "are", "was", "were", "be", "been",
            "being", "have", "has", "had", "do", "does", "did", "will",
            "would", "could", "should", "may", "might", "must", "shall",
            "can", "need", "dare", "ought", "used", "to", "of", "in",
            "for", "on", "with", "at", "by", "from", "as", "into", "through",
            "during", "before", "after", "above", "below", "between",
            "and", "but", "if", "or", "because", "until", "while",
        }

        # Tokenize
        words = re.findall(r'\b\w+\b', text.lower())

        # Keep words that are not stop words and longer than two characters
        return [w for w in words if w not in stop_words and len(w) > 2]

    def _fuzzy_find(self, term: str, text: str) -> Tuple[float, int]:
        """Find a term in text with fuzzy matching.

        Slides a window over the text and scores each window by its
        positional character-match ratio (Hamming-style), which is cheap
        but cruder than true edit distance.
        """
        best_score = 0.0
        best_pos = 0
        term_len = len(term)
        if term_len == 0:
            return 0.0, 0

        for i in range(len(text) - term_len + 1):
            window = text[i:i + term_len]
            # Fraction of positions where the characters agree
            matched = sum(1 for a, b in zip(term, window) if a == b)
            score = matched / term_len
            if score > best_score:
                best_score = score
                best_pos = i

        return best_score, best_pos
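
    # Design note: the positional ratio above misses insertions/deletions.
    # A drop-in alternative (stdlib, heavier) would be difflib, e.g.
    #   from difflib import SequenceMatcher
    #   score = SequenceMatcher(None, term, window).ratio()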

    def _calculate_coverage(
        self,
        claim: str,
        matches: List[EvidenceMatch],
    ) -> float:
        """Calculate the fraction of claim terms covered by the evidence."""
        claim_terms = set(self._extract_terms(claim.lower()))
        if not claim_terms:
            return 0.0

        covered_terms = set()
        for match in matches:
            match_terms = set(self._extract_terms(match.evidence_text.lower()))
            covered_terms.update(match_terms.intersection(claim_terms))

        return len(covered_terms) / len(claim_terms)

    def _check_contradictions(
        self,
        claim: str,
        evidence_chunks: List[Dict[str, Any]],
    ) -> bool:
        """Check whether the evidence contradicts the claim."""
        # Simple negation patterns
        negation_patterns = [
            r'\bnot\b', r'\bno\b', r'\bnever\b', r'\bnone\b',
            r'\bwithout\b', r'\bfailed\b', r'\bdenied\b',
        ]
        claim_terms = set(self._extract_terms(claim.lower()))

        for chunk in evidence_chunks:
            text = chunk.get("text", "").lower()
            if not text:
                continue
            # Collect the positions of all negation words in this chunk
            negation_positions = [
                m.start()
                for pattern in negation_patterns
                for m in re.finditer(pattern, text)
            ]
            if not negation_positions:
                continue
            # Flag a contradiction when any occurrence of a claim term
            # (not just the first) sits close to a negation word
            for term in claim_terms:
                for occurrence in re.finditer(re.escape(term), text):
                    if any(abs(p - occurrence.start()) < 30 for p in negation_positions):
                        return True
        return False


# Global instance and factory
_evidence_verifier: Optional[EvidenceVerifier] = None


def get_evidence_verifier(
    config: Optional[VerifierConfig] = None,
) -> EvidenceVerifier:
    """Get or create the singleton evidence verifier.

    Note: the config is only applied when the singleton is first created;
    later calls return the existing instance unchanged.
    """
    global _evidence_verifier
    if _evidence_verifier is None:
        _evidence_verifier = EvidenceVerifier(config)
    return _evidence_verifier


def reset_evidence_verifier():
    """Reset the global verifier instance."""
    global _evidence_verifier
    _evidence_verifier = None
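

# A minimal usage sketch with hypothetical data: evidence chunks are plain
# dicts that only need a "text" key ("chunk_id" and "page" are optional).
if __name__ == "__main__":
    sample_chunks = [
        {
            "chunk_id": "c1",
            "page": 2,
            "text": "The contract was signed on 12 March 2021 by both parties.",
        },
        {
            "chunk_id": "c2",
            "page": 5,
            "text": "Payment terms are net 30 days from the invoice date.",
        },
    ]

    verifier = get_evidence_verifier()

    # Single-claim verification
    result = verifier.verify_claim(
        "contract signed on 12 March 2021", sample_chunks
    )
    print(result.verified, result.strength, f"{result.confidence:.2f}")

    # Field-level verification of a structured extraction
    field_results = verifier.verify_extraction(
        {"payment_terms": "net 30 days"}, sample_chunks
    )
    for field, res in field_results.items():
        print(field, res.verified, f"{res.coverage_score:.2f}")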