|
|
""" |
|
|
Evidence Building and Management

Creates and manages evidence references for extracted data.
Links every extraction to its visual source.
|
|
""" |
|
|
|
|
|
import hashlib |
|
|
from dataclasses import dataclass, field |
|
|
from pathlib import Path |
|
|
from typing import Any, Dict, List, Optional, Union |
|
|
|
|
|
from ..chunks.models import ( |
|
|
BoundingBox, |
|
|
DocumentChunk, |
|
|
EvidenceRef, |
|
|
TableChunk, |
|
|
ChartChunk, |
|
|
) |
|
|
|
|
|
|
|
|
@dataclass
class EvidenceConfig:
    """Settings that control how evidence references are built.

    The fields fall into two groups: options for saving cropped images of
    the evidence region, and options for the text snippet attached to each
    evidence reference.
    """

    # --- Crop settings ---------------------------------------------------
    # Master switch: crops are only saved when this is True.
    crop_enabled: bool = True
    # Directory crops are written to; when None, no crop is saved.
    crop_output_dir: Optional[Path] = None
    # File extension used for saved crop files.
    crop_format: str = "png"
    # NOTE(review): not read by the builder in this module; presumably the
    # fractional padding applied around a region before cropping -- confirm
    # against the code that produces the crop images.
    crop_padding_percent: float = 0.02

    # --- Snippet settings ------------------------------------------------
    # Whether evidence carries a text snippet at all.
    include_snippet: bool = True
    # Upper bound on snippet length, in characters.
    max_snippet_length: int = 200
    # Whether the snippet is a window centred on the extracted value
    # (True) or simply the leading part of the source text (False).
    include_context: bool = True
    # Characters kept on each side of the value when include_context is set.
    context_chars: int = 50
|
|
|
|
|
|
|
|
class EvidenceBuilder:
    """
    Builds evidence references for extractions.

    Creates links between extracted values and their
    visual sources in the document. Optionally saves a cropped image of
    the source region and attaches a short text snippet, both governed by
    the supplied ``EvidenceConfig``.
    """

    def __init__(self, config: Optional[EvidenceConfig] = None):
        """
        Args:
            config: Evidence settings; a default ``EvidenceConfig`` is used
                when omitted.
        """
        self.config = config or EvidenceConfig()
        # Not read anywhere in this class; kept so existing code that
        # touches the attribute keeps working.
        self._crop_counter = 0

    def create_evidence(
        self,
        chunk: DocumentChunk,
        value: Any,
        field_name: Optional[str] = None,
        crop_image: Optional[Any] = None,
    ) -> EvidenceRef:
        """
        Create an evidence reference from a chunk.

        Args:
            chunk: Source chunk
            value: Extracted value; its string form is searched for in the
                chunk text to centre the snippet
            field_name: Optional field name being extracted (accepted for
                API compatibility; not used when building the reference)
            crop_image: Optional cropped image for this evidence

        Returns:
            EvidenceRef linking to the source
        """
        crop_path = None
        if crop_image is not None and self.config.crop_enabled:
            crop_path = self._save_crop(crop_image, chunk)

        snippet = self._create_snippet(chunk.text, str(value))

        # Tables and charts get a fixed label; every other chunk reports
        # the value of its own chunk type.
        if isinstance(chunk, TableChunk):
            source_type = "table"
        elif isinstance(chunk, ChartChunk):
            source_type = "chart"
        else:
            source_type = chunk.chunk_type.value

        return EvidenceRef(
            chunk_id=chunk.chunk_id,
            doc_id=chunk.doc_id,
            page=chunk.page,
            bbox=chunk.bbox,
            source_type=source_type,
            snippet=snippet,
            confidence=chunk.confidence,
            crop_path=crop_path,
        )

    def create_evidence_from_bbox(
        self,
        doc_id: str,
        page: int,
        bbox: BoundingBox,
        source_text: str,
        confidence: float = 1.0,
        source_type: str = "region",
        crop_image: Optional[Any] = None,
    ) -> EvidenceRef:
        """
        Create evidence from a bounding box.

        Args:
            doc_id: Document ID
            page: Page number
            bbox: Bounding box of evidence
            source_text: Text content
            confidence: Confidence score
            source_type: Type of source (text, table, chart, etc.)
            crop_image: Optional cropped image

        Returns:
            EvidenceRef for the region
        """
        # The region has no chunk of its own, so derive a deterministic id
        # from its location.
        chunk_id = self._generate_region_id(doc_id, page, bbox)

        crop_path = None
        if crop_image is not None and self.config.crop_enabled:
            crop_path = self._save_crop_direct(
                crop_image,
                doc_id,
                page,
                chunk_id,
            )

        return EvidenceRef(
            chunk_id=chunk_id,
            doc_id=doc_id,
            page=page,
            bbox=bbox,
            source_type=source_type,
            snippet=self._truncate_snippet(source_text),
            confidence=confidence,
            crop_path=crop_path,
        )

    def create_table_cell_evidence(
        self,
        table_chunk: TableChunk,
        row: int,
        col: int,
        crop_image: Optional[Any] = None,
    ) -> Optional[EvidenceRef]:
        """
        Create evidence for a specific table cell.

        Args:
            table_chunk: Source table
            row: Cell row (0-indexed)
            col: Cell column (0-indexed)
            crop_image: Optional cropped cell image

        Returns:
            EvidenceRef for the cell, or None if cell not found
        """
        cell = table_chunk.get_cell(row, col)
        if cell is None:
            return None

        cell_id = f"r{row}c{col}"

        crop_path = None
        if crop_image is not None and self.config.crop_enabled:
            crop_path = self._save_crop_direct(
                crop_image,
                table_chunk.doc_id,
                table_chunk.page,
                f"{table_chunk.chunk_id}_{cell_id}",
            )

        return EvidenceRef(
            chunk_id=table_chunk.chunk_id,
            doc_id=table_chunk.doc_id,
            page=table_chunk.page,
            bbox=cell.bbox,
            source_type="table_cell",
            snippet=self._truncate_snippet(cell.text),
            confidence=cell.confidence,
            cell_id=cell_id,
            crop_path=crop_path,
        )

    def merge_evidence(
        self,
        evidence_list: List[EvidenceRef],
    ) -> List[EvidenceRef]:
        """
        Merge overlapping evidence references.

        Combines evidence that refers to the same region: two references
        are grouped when they share document and page and their boxes have
        an IoU above 0.5 with the first reference of the group.

        Args:
            evidence_list: Evidence references to merge

        Returns:
            A new list with one entry per group, in order of each group's
            first member. The input list is never returned or modified.
        """
        if len(evidence_list) <= 1:
            # Copy so the result never aliases the caller's list, matching
            # the multi-element path below.
            return list(evidence_list)

        merged: List[EvidenceRef] = []
        used = set()

        for i, ev1 in enumerate(evidence_list):
            if i in used:
                continue

            # Collect every later, still-unclaimed reference overlapping ev1.
            group = [ev1]
            for j in range(i + 1, len(evidence_list)):
                if j in used:
                    continue

                ev2 = evidence_list[j]
                if (ev1.doc_id == ev2.doc_id and
                        ev1.page == ev2.page and
                        ev1.bbox.iou(ev2.bbox) > 0.5):
                    group.append(ev2)
                    used.add(j)

            if len(group) == 1:
                merged.append(ev1)
            else:
                merged.append(self._merge_evidence_group(group))

        return merged

    def _merge_evidence_group(
        self,
        group: List[EvidenceRef],
    ) -> EvidenceRef:
        """
        Merge a group of overlapping evidence.

        Identity fields (chunk, document, page, source type, cell, crop)
        come from the highest-confidence member; the box is the union of
        all member boxes and the snippet joins the distinct member snippets.

        Args:
            group: Non-empty list of evidence references to combine

        Returns:
            A single EvidenceRef covering the whole group
        """
        best = max(group, key=lambda e: e.confidence)

        # Smallest box enclosing every member box.
        # NOTE(review): assumes all boxes in the group share one coordinate
        # space; the `normalized` flag is simply taken from `best`.
        boxes = [e.bbox for e in group]
        merged_bbox = BoundingBox(
            x_min=min(b.x_min for b in boxes),
            y_min=min(b.y_min for b in boxes),
            x_max=max(b.x_max for b in boxes),
            y_max=max(b.y_max for b in boxes),
            normalized=best.bbox.normalized,
        )

        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # combined snippet is the same on every run (a set would reorder
        # strings under hash randomisation).
        snippets = list(dict.fromkeys(e.snippet for e in group if e.snippet))
        combined_snippet = " | ".join(snippets)[:self.config.max_snippet_length]

        return EvidenceRef(
            chunk_id=best.chunk_id,
            doc_id=best.doc_id,
            page=best.page,
            bbox=merged_bbox,
            source_type=best.source_type,
            snippet=combined_snippet,
            confidence=best.confidence,
            cell_id=best.cell_id,
            crop_path=best.crop_path,
        )

    def _truncate_snippet(self, text: Optional[str]) -> str:
        """
        Return `text` cut to the configured snippet length.

        Yields an empty string when snippets are disabled in the config or
        when there is no text, so every evidence constructor treats the
        ``include_snippet`` switch the same way.
        """
        if not self.config.include_snippet or not text:
            return ""
        return text[:self.config.max_snippet_length]

    def _create_snippet(
        self,
        full_text: str,
        value: str,
    ) -> str:
        """
        Create a text snippet highlighting the value.

        Args:
            full_text: Text of the source chunk (None is treated as empty)
            value: String form of the extracted value

        Returns:
            An empty string when snippets are disabled. Otherwise, if
            context is enabled and the value occurs in the text
            (case-insensitively), a window of ``context_chars`` on either
            side of the first occurrence, with "..." marking cut-off ends.
            In all other cases the leading part of the text. The result
            never exceeds ``max_snippet_length`` characters.
        """
        if not self.config.include_snippet:
            return ""

        full_text = full_text or ""
        limit = self.config.max_snippet_length

        # An empty value "matches" at offset 0 and would yield a pointless
        # context window, so only search for non-empty values.
        if value and self.config.include_context:
            idx = full_text.lower().find(value.lower())
            if idx >= 0:
                start = max(0, idx - self.config.context_chars)
                end = min(
                    len(full_text),
                    idx + len(value) + self.config.context_chars,
                )

                snippet = full_text[start:end]
                if start > 0:
                    snippet = "..." + snippet
                if end < len(full_text):
                    snippet = snippet + "..."

                return snippet[:limit]

        # Value not found (or context disabled): fall back to the start of
        # the text.
        return full_text[:limit]

    def _generate_region_id(
        self,
        doc_id: str,
        page: int,
        bbox: BoundingBox,
    ) -> str:
        """
        Generate a stable ID for a region.

        The ID is the first 16 hex digits of the MD5 of the document id,
        page and box coordinates. MD5 is used purely as a fingerprint here,
        not for security; changing it would change existing region ids.
        """
        content = f"{doc_id}_{page}_{bbox.xyxy}"
        return hashlib.md5(content.encode()).hexdigest()[:16]

    def _save_crop(
        self,
        image: Any,
        chunk: DocumentChunk,
    ) -> Optional[str]:
        """Save a crop image for a chunk, named after the chunk's ids."""
        return self._save_crop_direct(
            image,
            chunk.doc_id,
            chunk.page,
            chunk.chunk_id,
        )

    @staticmethod
    def _safe_filename_part(part: Any) -> str:
        """Return `part` as text with path separators replaced by "_"."""
        return str(part).replace("/", "_").replace("\\", "_")

    def _save_crop_direct(
        self,
        image: Any,
        doc_id: str,
        page: int,
        identifier: str,
    ) -> Optional[str]:
        """
        Save a crop image directly.

        Saving is best-effort: any failure is logged as a warning and
        reported as None rather than raised.

        Args:
            image: A PIL image or a numpy array; other types are ignored
            doc_id: Document ID, used in the file name
            page: Page number, used in the file name
            identifier: Chunk/region/cell identifier, used in the file name

        Returns:
            Path of the written file as a string, or None when no output
            directory is configured, the image type is unsupported, or
            saving failed.
        """
        if self.config.crop_output_dir is None:
            return None

        import logging

        logger = logging.getLogger(__name__)

        try:
            from PIL import Image
        except ImportError:
            logger.warning("Pillow is not installed; evidence crop not saved")
            return None

        try:
            if isinstance(image, Image.Image):
                pil_image = image
            else:
                # numpy is only needed for array input, so a missing numpy
                # must not prevent saving PIL images.
                try:
                    import numpy as np
                except ImportError:
                    np = None

                if np is None or not isinstance(image, np.ndarray):
                    return None
                pil_image = Image.fromarray(image)

            output_dir = Path(self.config.crop_output_dir)
            output_dir.mkdir(parents=True, exist_ok=True)

            # Ids derived from file paths may contain separators, which
            # would point the save at a non-existent subdirectory.
            filename = (
                f"{self._safe_filename_part(doc_id)}_{page}_"
                f"{self._safe_filename_part(identifier)}"
                f".{self.config.crop_format}"
            )
            output_path = output_dir / filename

            pil_image.save(output_path)
            return str(output_path)

        except Exception:
            # Deliberately broad: a failed crop must never abort extraction,
            # but the failure should be visible in the logs.
            logger.warning(
                "Failed to save evidence crop for doc %r page %r (%r)",
                doc_id,
                page,
                identifier,
                exc_info=True,
            )
            return None
|
|
|
|
|
|
|
|
class EvidenceTracker:
    """
    Tracks evidence references during extraction.

    Keeps every reference in insertion order and indexes it by chunk id
    and, when given, by field name. Query methods hand back fresh lists,
    so callers cannot disturb the tracker's internal state through them.
    """

    def __init__(self):
        # Insertion-ordered master list plus two lookup indexes.
        self._evidence: List[EvidenceRef] = []
        self._by_field: Dict[str, List[EvidenceRef]] = {}
        self._by_chunk: Dict[str, List[EvidenceRef]] = {}

    def add(
        self,
        evidence: EvidenceRef,
        field_name: Optional[str] = None,
    ) -> None:
        """
        Record an evidence reference.

        Args:
            evidence: Reference to store; indexed under its ``chunk_id``
            field_name: When non-empty, the reference is also indexed
                under this field name
        """
        self._evidence.append(evidence)
        self._by_chunk.setdefault(evidence.chunk_id, []).append(evidence)

        if field_name:
            self._by_field.setdefault(field_name, []).append(evidence)

    def get_all(self) -> List[EvidenceRef]:
        """Return a new list of every reference, in insertion order."""
        return list(self._evidence)

    def get_for_field(self, field_name: str) -> List[EvidenceRef]:
        """Return a new list of references recorded for `field_name`."""
        return list(self._by_field.get(field_name, []))

    def get_for_chunk(self, chunk_id: str) -> List[EvidenceRef]:
        """Return a new list of references that came from `chunk_id`."""
        return list(self._by_chunk.get(chunk_id, []))

    def get_by_page(self, page: int) -> List[EvidenceRef]:
        """Return the references whose page equals `page`."""
        on_page = []
        for ref in self._evidence:
            if ref.page == page:
                on_page.append(ref)
        return on_page

    def get_high_confidence(self, threshold: float = 0.8) -> List[EvidenceRef]:
        """Return the references whose confidence is at least `threshold`."""
        confident = []
        for ref in self._evidence:
            if ref.confidence >= threshold:
                confident.append(ref)
        return confident

    def validate_field(
        self,
        field_name: str,
        min_evidence: int = 1,
        min_confidence: float = 0.5,
    ) -> bool:
        """
        Check whether a field is backed by enough evidence.

        Args:
            field_name: Field to validate
            min_evidence: Minimum number of references required
            min_confidence: Confidence the strongest reference must reach

        Returns:
            True when the field has at least `min_evidence` references and
            its highest confidence is at least `min_confidence`
        """
        refs = self.get_for_field(field_name)
        if len(refs) < min_evidence:
            return False

        # Only the strongest reference is compared to the threshold; with
        # no references at all the best confidence counts as 0.
        best = max((ref.confidence for ref in refs), default=0)
        return best >= min_confidence

    def clear(self) -> None:
        """Forget every recorded reference and reset both indexes."""
        self._evidence, self._by_field, self._by_chunk = [], {}, {}
|
|
|