File size: 13,324 Bytes
d520909
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
"""
Evidence Verifier

Verifies that claims are supported by document evidence.
Cross-references extracted information with source documents.
"""

from typing import List, Optional, Dict, Any, Tuple
from enum import Enum
from pydantic import BaseModel, Field
from loguru import logger
import re


class EvidenceStrength(str, Enum):
    """How strongly a piece of evidence supports a claim.

    Ordered from best to worst:
      * STRONG   -- the claim is directly quoted/stated in the evidence.
      * MODERATE -- the claim is implied or paraphrased.
      * WEAK     -- the evidence is only tangentially related.
      * NONE     -- no supporting evidence was found.
    """

    STRONG = "strong"
    MODERATE = "moderate"
    WEAK = "weak"
    NONE = "none"


class VerifierConfig(BaseModel):
    """Configuration for evidence verifier.

    The three *_threshold values map a combined match score (0.0-1.0)
    onto an EvidenceStrength bucket (checked strong -> moderate -> weak);
    the matching flags control how claim terms are located in evidence.
    """
    # Matching settings
    fuzzy_match: bool = Field(default=True, description="Enable fuzzy matching")
    case_sensitive: bool = Field(default=False, description="Case-sensitive matching")
    # Minimum per-term similarity for a fuzzy hit to be counted at all.
    min_match_ratio: float = Field(
        default=0.6,
        ge=0.0,
        le=1.0,
        description="Minimum match ratio for fuzzy matching"
    )

    # Scoring thresholds: score >= strong_threshold -> STRONG, etc.
    # moderate_threshold doubles as the minimum confidence for "verified".
    strong_threshold: float = Field(default=0.9, ge=0.0, le=1.0)
    moderate_threshold: float = Field(default=0.7, ge=0.0, le=1.0)
    weak_threshold: float = Field(default=0.5, ge=0.0, le=1.0)

    # Processing
    # Cap on evidence matches kept per claim after score-sorting.
    max_evidence_per_claim: int = Field(default=5, ge=1)
    context_window: int = Field(default=100, description="Characters around match")


class EvidenceMatch(BaseModel):
    """A match between claim and evidence.

    Produced by EvidenceVerifier._find_matches: records the matched span of
    chunk text, its score/strength, and where in the chunk it was found.
    """
    evidence_text: str  # the matched span of the chunk's text
    match_score: float  # combined match score in [0, 1]
    strength: EvidenceStrength

    # Location (copied from the chunk dict when present)
    chunk_id: Optional[str] = None  # chunk["chunk_id"]
    page: Optional[int] = None  # chunk["page"]
    position: Optional[int] = None  # character offset of the match in chunk text

    # Context (truncated to 50 chars on each side by _create_match)
    context_before: Optional[str] = None
    context_after: Optional[str] = None


class VerificationResult(BaseModel):
    """Result of evidence verification for a single claim."""
    claim: str  # the claim that was checked
    verified: bool  # True when strength/confidence suffice AND no contradiction
    strength: EvidenceStrength  # strength of the best match (NONE if no matches)
    confidence: float  # score of the best match, in [0, 1]

    # Evidence
    evidence_matches: List[EvidenceMatch]  # top matches, sorted by score desc
    best_match: Optional[EvidenceMatch] = None  # highest-scoring match, if any

    # Analysis
    coverage_score: float  # How much of claim is covered (fraction of claim terms)
    contradiction_found: bool = False  # negation heuristic fired near a claim term
    notes: Optional[str] = None


class EvidenceVerifier:
    """
    Verifies claims against document evidence.

    The verifier works purely on text: it extracts key terms from a claim,
    looks for exact and fuzzy occurrences in the supplied evidence chunks,
    scores the matches against the configured thresholds, and runs a naive
    negation-based contradiction check.

    Features:
    - Text matching (exact and fuzzy)
    - Evidence strength scoring
    - Contradiction detection
    - Context extraction
    """

    # Stop words ignored during key-term extraction.  Declared once at class
    # level instead of being rebuilt on every _extract_terms call.
    _STOP_WORDS = frozenset({
        "the", "a", "an", "is", "are", "was", "were", "be", "been",
        "being", "have", "has", "had", "do", "does", "did", "will",
        "would", "could", "should", "may", "might", "must", "shall",
        "can", "need", "dare", "ought", "used", "to", "of", "in",
        "for", "on", "with", "at", "by", "from", "as", "into", "through",
        "during", "before", "after", "above", "below", "between",
        "and", "but", "if", "or", "because", "until", "while",
    })

    # Word tokenizer, compiled once (hoisted out of _extract_terms).
    _WORD_RE = re.compile(r'\b\w+\b')

    # Negation cues for the contradiction heuristic, merged into one
    # pre-compiled alternation instead of seven separate patterns.
    _NEGATION_RE = re.compile(
        r'\b(?:not|no|never|none|without|failed|denied)\b'
    )

    # Max character distance between a claim term and a negation cue for
    # the pair to count as a potential contradiction.
    _NEGATION_WINDOW = 30

    def __init__(self, config: Optional[VerifierConfig] = None):
        """Initialize evidence verifier.

        Args:
            config: Optional configuration; a default VerifierConfig is
                used when omitted.
        """
        self.config = config or VerifierConfig()

    def verify_claim(
        self,
        claim: str,
        evidence_chunks: List[Dict[str, Any]],
    ) -> VerificationResult:
        """
        Verify a claim against evidence.

        Args:
            claim: The claim to verify
            evidence_chunks: List of evidence chunks; each is a dict with at
                least a "text" key (optionally "chunk_id" and "page")

        Returns:
            VerificationResult
        """
        # No claim or no evidence: trivially unverified.
        if not claim or not evidence_chunks:
            return VerificationResult(
                claim=claim,
                verified=False,
                strength=EvidenceStrength.NONE,
                confidence=0.0,
                evidence_matches=[],
                coverage_score=0.0,
            )

        # Collect candidate matches across all chunks.
        matches: List[EvidenceMatch] = []
        for chunk in evidence_chunks:
            chunk_text = chunk.get("text", "")
            if not chunk_text:
                continue
            matches.extend(self._find_matches(claim, chunk_text, chunk))

        # Keep only the highest-scoring matches.
        matches.sort(key=lambda m: m.match_score, reverse=True)
        top_matches = matches[:self.config.max_evidence_per_claim]

        # Overall scores come from the single best match; coverage from all
        # retained matches.
        if top_matches:
            best_match = top_matches[0]
            overall_strength = best_match.strength
            confidence = best_match.match_score
            coverage_score = self._calculate_coverage(claim, top_matches)
        else:
            best_match = None
            overall_strength = EvidenceStrength.NONE
            confidence = 0.0
            coverage_score = 0.0

        # Verified = at least MODERATE evidence AND confidence clears the
        # moderate threshold.
        verified = (
            overall_strength in (EvidenceStrength.STRONG, EvidenceStrength.MODERATE)
            and confidence >= self.config.moderate_threshold
        )

        # A detected contradiction overrides a positive verification.
        contradiction_found = self._check_contradictions(claim, evidence_chunks)

        return VerificationResult(
            claim=claim,
            verified=verified and not contradiction_found,
            strength=overall_strength,
            confidence=confidence,
            evidence_matches=top_matches,
            best_match=best_match,
            coverage_score=coverage_score,
            contradiction_found=contradiction_found,
        )

    def verify_multiple(
        self,
        claims: List[str],
        evidence_chunks: List[Dict[str, Any]],
    ) -> List[VerificationResult]:
        """
        Verify multiple claims against the same evidence.

        Args:
            claims: List of claims to verify
            evidence_chunks: Evidence chunks

        Returns:
            List of VerificationResult, one per claim, in input order
        """
        return [self.verify_claim(claim, evidence_chunks) for claim in claims]

    def verify_extraction(
        self,
        extraction: Dict[str, Any],
        evidence_chunks: List[Dict[str, Any]],
    ) -> Dict[str, VerificationResult]:
        """
        Verify extracted fields as claims.

        Each non-None field is turned into the claim "field: value" and
        verified independently; None-valued fields are skipped.

        Args:
            extraction: Dictionary of field -> value
            evidence_chunks: Evidence chunks

        Returns:
            Dictionary of field -> VerificationResult
        """
        results: Dict[str, VerificationResult] = {}

        for field, value in extraction.items():
            if value is None:
                continue

            # Convert the field/value pair to a textual claim.
            claim = f"{field}: {value}"
            results[field] = self.verify_claim(claim, evidence_chunks)

        return results

    def _find_matches(
        self,
        claim: str,
        text: str,
        chunk: Dict[str, Any],
    ) -> List[EvidenceMatch]:
        """Find matches for claim in a single chunk's text.

        Produces up to two matches: an exact whole-claim substring match
        (always STRONG, score 1.0) and a combined key-term match whose
        strength comes from the configured thresholds.
        """
        matches: List[EvidenceMatch] = []

        # Normalize case unless configured otherwise.
        # NOTE(review): str.lower() can change string length for rare
        # Unicode characters, which would skew positions taken against the
        # original text; acceptable for typical Latin-script content.
        claim_normalized = claim if self.config.case_sensitive else claim.lower()
        text_normalized = text if self.config.case_sensitive else text.lower()

        # Extract key terms from the (normalized) claim.
        terms = self._extract_terms(claim_normalized)

        # 1) Exact substring match of the whole claim: strongest evidence.
        pos = text_normalized.find(claim_normalized)
        if pos != -1:
            matches.append(self._create_match(
                text, pos, len(claim), chunk,
                score=1.0, strength=EvidenceStrength.STRONG,
            ))

        # 2) Per-term matching: exact first, then fuzzy if enabled.
        term_scores: List[Tuple[str, int, float]] = []
        for term in terms:
            term_pos = text_normalized.find(term)
            if term_pos != -1:
                term_scores.append((term, term_pos, 1.0))
            elif self.config.fuzzy_match:
                fuzzy_score, fuzzy_pos = self._fuzzy_find(term, text_normalized)
                if fuzzy_score >= self.config.min_match_ratio:
                    term_scores.append((term, fuzzy_pos, fuzzy_score))

        if term_scores:
            # Blend average term quality (70%) with term coverage (30%).
            # Dividing the score sum by len(terms) (not len(term_scores))
            # already penalizes missing terms; kept for score stability.
            avg_score = sum(s[2] for s in term_scores) / len(terms) if terms else 0
            coverage = len(term_scores) / len(terms) if terms else 0
            combined_score = (avg_score * 0.7) + (coverage * 0.3)

            strength = self._strength_for(combined_score)

            # Anchor the match at the best-scoring term's position.
            if strength != EvidenceStrength.NONE:
                best_term = max(term_scores, key=lambda t: t[2])
                matches.append(self._create_match(
                    text, best_term[1], len(best_term[0]), chunk,
                    score=combined_score, strength=strength,
                ))

        return matches

    def _strength_for(self, score: float) -> EvidenceStrength:
        """Map a combined match score onto an EvidenceStrength bucket."""
        if score >= self.config.strong_threshold:
            return EvidenceStrength.STRONG
        if score >= self.config.moderate_threshold:
            return EvidenceStrength.MODERATE
        if score >= self.config.weak_threshold:
            return EvidenceStrength.WEAK
        return EvidenceStrength.NONE

    def _create_match(
        self,
        text: str,
        position: int,
        length: int,
        chunk: Dict[str, Any],
        score: float,
        strength: EvidenceStrength,
    ) -> EvidenceMatch:
        """Create an evidence match with surrounding context.

        Context is taken from config.context_window characters on each side
        of the match, then truncated to 50 characters per side.
        """
        window = self.config.context_window
        start = max(0, position - window)
        end = min(len(text), position + length + window)

        context_before = text[start:position] if position > 0 else ""
        evidence_text = text[position:position + length]
        context_after = text[position + length:end] if position + length < len(text) else ""

        return EvidenceMatch(
            evidence_text=evidence_text,
            match_score=score,
            strength=strength,
            chunk_id=chunk.get("chunk_id"),
            page=chunk.get("page"),
            position=position,
            context_before=context_before[-50:] if context_before else None,
            context_after=context_after[:50] if context_after else None,
        )

    def _extract_terms(self, text: str) -> List[str]:
        """Extract key terms: lowercase words longer than 2 chars that are
        not stop words. Duplicates are preserved in order of appearance."""
        words = self._WORD_RE.findall(text.lower())
        return [w for w in words if w not in self._STOP_WORDS and len(w) > 2]

    def _fuzzy_find(self, term: str, text: str) -> Tuple[float, int]:
        """Find the best approximate position of term in text.

        Slides a window of len(term) over text and scores each window by the
        fraction of aligned characters that match.  O(len(text) * len(term));
        adequate for the short terms produced by _extract_terms.

        Returns:
            (best_score, best_position); (0.0, 0) when term is empty or
            longer than text.
        """
        term_len = len(term)
        # Guard: an empty term would divide by zero below (the original
        # implementation had no such guard).
        if term_len == 0:
            return 0.0, 0

        best_score = 0.0
        best_pos = 0

        for i in range(len(text) - term_len + 1):
            window = text[i:i + term_len]
            matched = sum(1 for a, b in zip(term, window) if a == b)
            score = matched / term_len

            if score > best_score:
                best_score = score
                best_pos = i

        return best_score, best_pos

    def _calculate_coverage(
        self,
        claim: str,
        matches: List[EvidenceMatch],
    ) -> float:
        """Return the fraction of distinct claim terms that appear in any
        of the matched evidence spans (0.0 when the claim has no terms)."""
        claim_terms = set(self._extract_terms(claim.lower()))
        if not claim_terms:
            return 0.0

        covered_terms = set()
        for match in matches:
            match_terms = set(self._extract_terms(match.evidence_text.lower()))
            covered_terms.update(match_terms.intersection(claim_terms))

        return len(covered_terms) / len(claim_terms)

    def _check_contradictions(
        self,
        claim: str,
        evidence_chunks: List[Dict[str, Any]],
    ) -> bool:
        """Heuristically check whether evidence contradicts the claim.

        A contradiction is flagged when any occurrence of a claim term lies
        within _NEGATION_WINDOW characters of a negation cue ("not", "no",
        "never", ...).  Naive by design: it can produce false positives
        (the negation may belong to an unrelated phrase).

        Bug fix vs. the original: every occurrence of each term is checked,
        not just the first (text.find(term) alone missed negated later
        occurrences); negation positions are also computed once per chunk
        instead of re-running every pattern for every term.
        """
        claim_terms = set(self._extract_terms(claim.lower()))
        if not claim_terms:
            return False

        for chunk in evidence_chunks:
            text = chunk.get("text", "").lower()
            if not text:
                continue

            # Negation cue positions, computed once per chunk.
            negation_positions = [
                m.start() for m in self._NEGATION_RE.finditer(text)
            ]
            if not negation_positions:
                continue

            for term in claim_terms:
                start = 0
                while True:
                    term_pos = text.find(term, start)
                    if term_pos == -1:
                        break
                    if any(
                        abs(neg_pos - term_pos) < self._NEGATION_WINDOW
                        for neg_pos in negation_positions
                    ):
                        return True
                    start = term_pos + 1

        return False


# Module-level singleton and its factory
_evidence_verifier: Optional[EvidenceVerifier] = None


def get_evidence_verifier(
    config: Optional[VerifierConfig] = None,
) -> EvidenceVerifier:
    """Return the process-wide EvidenceVerifier, creating it on first use.

    Note: ``config`` only takes effect on the call that actually creates
    the instance; subsequent calls return the cached singleton unchanged.
    """
    global _evidence_verifier
    if _evidence_verifier is not None:
        return _evidence_verifier
    _evidence_verifier = EvidenceVerifier(config)
    return _evidence_verifier


def reset_evidence_verifier():
    """Discard the cached verifier so the next lookup builds a fresh one."""
    global _evidence_verifier
    _evidence_verifier = None