""" Evidence Building and Management Creates and manages evidence references for extracted data. Links every extraction to its visual source. """ import hashlib from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional, Union from ..chunks.models import ( BoundingBox, DocumentChunk, EvidenceRef, TableChunk, ChartChunk, ) @dataclass class EvidenceConfig: """Configuration for evidence building.""" # Crop settings crop_enabled: bool = True crop_output_dir: Optional[Path] = None crop_format: str = "png" crop_padding_percent: float = 0.02 # 2% padding around bbox # Evidence settings include_snippet: bool = True max_snippet_length: int = 200 include_context: bool = True context_chars: int = 50 class EvidenceBuilder: """ Builds evidence references for extractions. Creates links between extracted values and their visual sources in the document. """ def __init__(self, config: Optional[EvidenceConfig] = None): self.config = config or EvidenceConfig() self._crop_counter = 0 def create_evidence( self, chunk: DocumentChunk, value: Any, field_name: Optional[str] = None, crop_image: Optional[Any] = None, ) -> EvidenceRef: """ Create an evidence reference from a chunk. Args: chunk: Source chunk value: Extracted value field_name: Optional field name being extracted crop_image: Optional cropped image for this evidence Returns: EvidenceRef linking to the source """ # Generate crop path if image provided crop_path = None if crop_image is not None and self.config.crop_enabled: crop_path = self._save_crop(crop_image, chunk) # Create snippet snippet = self._create_snippet(chunk.text, str(value)) # Determine source type if isinstance(chunk, TableChunk): source_type = "table" elif isinstance(chunk, ChartChunk): source_type = "chart" else: source_type = chunk.chunk_type.value return EvidenceRef( chunk_id=chunk.chunk_id, doc_id=chunk.doc_id, page=chunk.page, bbox=chunk.bbox, source_type=source_type, snippet=snippet, confidence=chunk.confidence, crop_path=crop_path, ) def create_evidence_from_bbox( self, doc_id: str, page: int, bbox: BoundingBox, source_text: str, confidence: float = 1.0, source_type: str = "region", crop_image: Optional[Any] = None, ) -> EvidenceRef: """ Create evidence from a bounding box. Args: doc_id: Document ID page: Page number bbox: Bounding box of evidence source_text: Text content confidence: Confidence score source_type: Type of source (text, table, chart, etc.) crop_image: Optional cropped image Returns: EvidenceRef for the region """ # Generate chunk_id for the region chunk_id = self._generate_region_id(doc_id, page, bbox) # Generate crop path if image provided crop_path = None if crop_image is not None and self.config.crop_enabled: crop_path = self._save_crop_direct( crop_image, doc_id, page, chunk_id, ) return EvidenceRef( chunk_id=chunk_id, doc_id=doc_id, page=page, bbox=bbox, source_type=source_type, snippet=source_text[:self.config.max_snippet_length], confidence=confidence, crop_path=crop_path, ) def create_table_cell_evidence( self, table_chunk: TableChunk, row: int, col: int, crop_image: Optional[Any] = None, ) -> Optional[EvidenceRef]: """ Create evidence for a specific table cell. Args: table_chunk: Source table row: Cell row (0-indexed) col: Cell column (0-indexed) crop_image: Optional cropped cell image Returns: EvidenceRef for the cell, or None if cell not found """ cell = table_chunk.get_cell(row, col) if cell is None: return None cell_id = f"r{row}c{col}" # Generate crop path crop_path = None if crop_image is not None and self.config.crop_enabled: crop_path = self._save_crop_direct( crop_image, table_chunk.doc_id, table_chunk.page, f"{table_chunk.chunk_id}_{cell_id}", ) return EvidenceRef( chunk_id=table_chunk.chunk_id, doc_id=table_chunk.doc_id, page=table_chunk.page, bbox=cell.bbox, source_type="table_cell", snippet=cell.text[:self.config.max_snippet_length], confidence=cell.confidence, cell_id=cell_id, crop_path=crop_path, ) def merge_evidence( self, evidence_list: List[EvidenceRef], ) -> List[EvidenceRef]: """ Merge overlapping evidence references. Combines evidence that refers to the same region. """ if len(evidence_list) <= 1: return evidence_list merged = [] used = set() for i, ev1 in enumerate(evidence_list): if i in used: continue # Find overlapping evidence group = [ev1] for j, ev2 in enumerate(evidence_list[i + 1:], start=i + 1): if j in used: continue if (ev1.doc_id == ev2.doc_id and ev1.page == ev2.page and ev1.bbox.iou(ev2.bbox) > 0.5): group.append(ev2) used.add(j) # Merge group if len(group) == 1: merged.append(ev1) else: merged.append(self._merge_evidence_group(group)) used.add(i) return merged def _merge_evidence_group( self, group: List[EvidenceRef], ) -> EvidenceRef: """Merge a group of overlapping evidence.""" # Take the one with highest confidence best = max(group, key=lambda e: e.confidence) # Merge bounding boxes merged_bbox = BoundingBox( x_min=min(e.bbox.x_min for e in group), y_min=min(e.bbox.y_min for e in group), x_max=max(e.bbox.x_max for e in group), y_max=max(e.bbox.y_max for e in group), normalized=best.bbox.normalized, ) # Combine snippets snippets = list(set(e.snippet for e in group if e.snippet)) combined_snippet = " | ".join(snippets)[:self.config.max_snippet_length] return EvidenceRef( chunk_id=best.chunk_id, doc_id=best.doc_id, page=best.page, bbox=merged_bbox, source_type=best.source_type, snippet=combined_snippet, confidence=max(e.confidence for e in group), cell_id=best.cell_id, crop_path=best.crop_path, ) def _create_snippet( self, full_text: str, value: str, ) -> str: """Create a text snippet highlighting the value.""" if not self.config.include_snippet: return "" # Try to find value in text value_lower = value.lower() text_lower = full_text.lower() idx = text_lower.find(value_lower) if idx >= 0 and self.config.include_context: # Add context around value start = max(0, idx - self.config.context_chars) end = min(len(full_text), idx + len(value) + self.config.context_chars) snippet = full_text[start:end] if start > 0: snippet = "..." + snippet if end < len(full_text): snippet = snippet + "..." return snippet[:self.config.max_snippet_length] # Return start of text return full_text[:self.config.max_snippet_length] def _generate_region_id( self, doc_id: str, page: int, bbox: BoundingBox, ) -> str: """Generate a stable ID for a region.""" content = f"{doc_id}_{page}_{bbox.xyxy}" return hashlib.md5(content.encode()).hexdigest()[:16] def _save_crop( self, image: Any, chunk: DocumentChunk, ) -> Optional[str]: """Save a crop image for a chunk.""" return self._save_crop_direct( image, chunk.doc_id, chunk.page, chunk.chunk_id, ) def _save_crop_direct( self, image: Any, doc_id: str, page: int, identifier: str, ) -> Optional[str]: """Save a crop image directly.""" if self.config.crop_output_dir is None: return None try: from PIL import Image import numpy as np # Convert to PIL if needed if isinstance(image, np.ndarray): pil_image = Image.fromarray(image) elif isinstance(image, Image.Image): pil_image = image else: return None # Create output path output_dir = Path(self.config.crop_output_dir) output_dir.mkdir(parents=True, exist_ok=True) filename = f"{doc_id}_{page}_{identifier}.{self.config.crop_format}" output_path = output_dir / filename pil_image.save(output_path) return str(output_path) except Exception: return None class EvidenceTracker: """ Tracks evidence references during extraction. Maintains a collection of evidence and provides methods for querying and validation. """ def __init__(self): self._evidence: List[EvidenceRef] = [] self._by_field: Dict[str, List[EvidenceRef]] = {} self._by_chunk: Dict[str, List[EvidenceRef]] = {} def add( self, evidence: EvidenceRef, field_name: Optional[str] = None, ) -> None: """Add an evidence reference.""" self._evidence.append(evidence) # Index by chunk if evidence.chunk_id not in self._by_chunk: self._by_chunk[evidence.chunk_id] = [] self._by_chunk[evidence.chunk_id].append(evidence) # Index by field if field_name: if field_name not in self._by_field: self._by_field[field_name] = [] self._by_field[field_name].append(evidence) def get_all(self) -> List[EvidenceRef]: """Get all evidence references.""" return self._evidence.copy() def get_for_field(self, field_name: str) -> List[EvidenceRef]: """Get evidence for a specific field.""" return self._by_field.get(field_name, []).copy() def get_for_chunk(self, chunk_id: str) -> List[EvidenceRef]: """Get evidence from a specific chunk.""" return self._by_chunk.get(chunk_id, []).copy() def get_by_page(self, page: int) -> List[EvidenceRef]: """Get evidence from a specific page.""" return [e for e in self._evidence if e.page == page] def get_high_confidence(self, threshold: float = 0.8) -> List[EvidenceRef]: """Get evidence above confidence threshold.""" return [e for e in self._evidence if e.confidence >= threshold] def validate_field( self, field_name: str, min_evidence: int = 1, min_confidence: float = 0.5, ) -> bool: """ Validate that a field has sufficient evidence. Args: field_name: Field to validate min_evidence: Minimum number of evidence references min_confidence: Minimum confidence score Returns: True if field has sufficient evidence """ field_evidence = self.get_for_field(field_name) if len(field_evidence) < min_evidence: return False # Check confidence max_confidence = max((e.confidence for e in field_evidence), default=0) return max_confidence >= min_confidence def clear(self) -> None: """Clear all evidence.""" self._evidence = [] self._by_field = {} self._by_chunk = {}