""" |
|
|
Core Data Models for Document Intelligence |
|
|
|
|
|
Comprehensive Pydantic models for: |
|
|
- Bounding boxes and spatial data |
|
|
- Document chunks (text, table, chart, form fields) |
|
|
- Evidence references for grounding |
|
|
- Parse results and document metadata |
|
|
|
|
|
Design principles: |
|
|
- Vision-first: treat documents as visual objects |
|
|
- Grounding: every extraction has evidence pointers |
|
|
- Stable IDs: reproducible, hash-based chunk identifiers |
|
|
- Schema-compatible: JSON export/import, Pydantic validation |
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import hashlib |
|
|
import json |
|
|
from datetime import datetime |
|
|
from enum import Enum |
|
|
from pathlib import Path |
|
|
from typing import Any, Dict, List, Optional, Tuple, Union |
|
|
|
|
|
from pydantic import BaseModel, Field, field_validator, model_validator |
class BoundingBox(BaseModel):
    """
    Bounding box in XYXY format (x_min, y_min, x_max, y_max).

    Supports both pixel coordinates and normalized (0-1) coordinates.
    All spatial grounding uses this as the standard format.
    """

    x_min: float = Field(..., description="Left edge (x1)")
    y_min: float = Field(..., description="Top edge (y1)")
    x_max: float = Field(..., description="Right edge (x2)")
    y_max: float = Field(..., description="Bottom edge (y2)")

    normalized: bool = Field(default=False, description="True if 0-1 normalized")
    page_width: Optional[int] = Field(default=None, description="Page width in pixels")
    page_height: Optional[int] = Field(default=None, description="Page height in pixels")

    @field_validator("x_max")
    @classmethod
    def validate_x_max(cls, v, info):
        if "x_min" in info.data and v < info.data["x_min"]:
            raise ValueError("x_max must be >= x_min")
        return v

    @field_validator("y_max")
    @classmethod
    def validate_y_max(cls, v, info):
        if "y_min" in info.data and v < info.data["y_min"]:
            raise ValueError("y_max must be >= y_min")
        return v

    @property
    def width(self) -> float:
        return self.x_max - self.x_min

    @property
    def height(self) -> float:
        return self.y_max - self.y_min

    @property
    def area(self) -> float:
        return self.width * self.height

    @property
    def center(self) -> Tuple[float, float]:
        return ((self.x_min + self.x_max) / 2, (self.y_min + self.y_max) / 2)

    @property
    def xyxy(self) -> Tuple[float, float, float, float]:
        """Return as (x_min, y_min, x_max, y_max)."""
        return (self.x_min, self.y_min, self.x_max, self.y_max)

    @property
    def xywh(self) -> Tuple[float, float, float, float]:
        """Return as (x, y, width, height)."""
        return (self.x_min, self.y_min, self.width, self.height)

    def to_pixel(self, width: int, height: int) -> BoundingBox:
        """Convert to pixel coordinates."""
        if not self.normalized:
            return self
        return BoundingBox(
            x_min=int(self.x_min * width),
            y_min=int(self.y_min * height),
            x_max=int(self.x_max * width),
            y_max=int(self.y_max * height),
            normalized=False,
            page_width=width,
            page_height=height,
        )

    def to_normalized(self, width: int, height: int) -> BoundingBox:
        """Convert to normalized (0-1) coordinates."""
        if self.normalized:
            return self
        return BoundingBox(
            x_min=self.x_min / width,
            y_min=self.y_min / height,
            x_max=self.x_max / width,
            y_max=self.y_max / height,
            normalized=True,
            page_width=width,
            page_height=height,
        )

    def iou(self, other: BoundingBox) -> float:
        """Calculate Intersection over Union."""
        x1 = max(self.x_min, other.x_min)
        y1 = max(self.y_min, other.y_min)
        x2 = min(self.x_max, other.x_max)
        y2 = min(self.y_max, other.y_max)

        if x2 < x1 or y2 < y1:
            return 0.0

        intersection = (x2 - x1) * (y2 - y1)
        union = self.area + other.area - intersection
        return intersection / union if union > 0 else 0.0

    def contains(self, other: BoundingBox) -> bool:
        """Check if this bbox fully contains another."""
        return (
            self.x_min <= other.x_min
            and self.y_min <= other.y_min
            and self.x_max >= other.x_max
            and self.y_max >= other.y_max
        )

    def expand(self, margin: float) -> BoundingBox:
        """Expand the bbox by a margin, in the box's own coordinate units."""
        return BoundingBox(
            x_min=max(0, self.x_min - margin),
            y_min=max(0, self.y_min - margin),
            x_max=self.x_max + margin,
            y_max=self.y_max + margin,
            normalized=self.normalized,
            page_width=self.page_width,
            page_height=self.page_height,
        )

    def clip(self, max_width: float, max_height: float) -> BoundingBox:
        """Clip bbox to image boundaries."""
        return BoundingBox(
            x_min=max(0, self.x_min),
            y_min=max(0, self.y_min),
            x_max=min(max_width, self.x_max),
            y_max=min(max_height, self.y_max),
            normalized=self.normalized,
            page_width=self.page_width,
            page_height=self.page_height,
        )

    @classmethod
    def from_xyxy(cls, xyxy: Tuple[float, float, float, float], **kwargs) -> BoundingBox:
        """Create from (x_min, y_min, x_max, y_max) tuple."""
        return cls(x_min=xyxy[0], y_min=xyxy[1], x_max=xyxy[2], y_max=xyxy[3], **kwargs)

    @classmethod
    def from_xywh(cls, xywh: Tuple[float, float, float, float], **kwargs) -> BoundingBox:
        """Create from (x, y, width, height) tuple."""
        x, y, w, h = xywh
        return cls(x_min=x, y_min=y, x_max=x + w, y_max=y + h, **kwargs)

    def __hash__(self):
        return hash((self.x_min, self.y_min, self.x_max, self.y_max))


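# Illustrative sketch (not part of the public API): how BoundingBox construction,
# normalization, and IoU compose. All values here are invented for demonstration.
def _demo_bounding_box_usage() -> None:
    # Build a 200x80 box from XYWH, then normalize against a 1000x800 page.
    box = BoundingBox.from_xywh((100.0, 50.0, 200.0, 80.0))
    norm = box.to_normalized(1000, 800)
    assert norm.normalized and abs(norm.x_min - 0.1) < 1e-9
    # A box always has IoU 1.0 with itself.
    assert box.iou(box) == 1.0

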
class ChunkType(str, Enum):
    """
    Semantic chunk types for document segmentation.

    Covers text, tables, figures, charts, forms, and structural elements.
    Used for routing chunks to specialized extraction logic.
    """

    # Textual content
    TEXT = "text"
    TITLE = "title"
    HEADING = "heading"
    PARAGRAPH = "paragraph"
    LIST = "list"
    LIST_ITEM = "list_item"

    # Structured and visual content
    TABLE = "table"
    TABLE_CELL = "table_cell"
    FIGURE = "figure"
    CHART = "chart"
    FORMULA = "formula"
    CODE = "code"

    # Form elements
    FORM_FIELD = "form_field"
    CHECKBOX = "checkbox"
    SIGNATURE = "signature"
    STAMP = "stamp"
    HANDWRITING = "handwriting"

    # Layout and page furniture
    HEADER = "header"
    FOOTER = "footer"
    PAGE_NUMBER = "page_number"
    CAPTION = "caption"
    FOOTNOTE = "footnote"
    WATERMARK = "watermark"
    LOGO = "logo"

    # Fallbacks
    METADATA = "metadata"
    UNKNOWN = "unknown"


class ConfidenceLevel(str, Enum):
    """Confidence level classification."""

    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    VERY_LOW = "very_low"

    @classmethod
    def from_score(cls, score: float) -> ConfidenceLevel:
        """Map a numeric confidence score to a coarse level."""
        if score >= 0.9:
            return cls.HIGH
        elif score >= 0.7:
            return cls.MEDIUM
        elif score >= 0.5:
            return cls.LOW
        else:
            return cls.VERY_LOW


class DocumentChunk(BaseModel):
    """
    Base document chunk with text and grounding evidence.

    This is the fundamental unit for retrieval and extraction.
    Every chunk has:
    - Stable, reproducible chunk_id (hash-based)
    - Precise spatial grounding (page, bbox)
    - Confidence score for quality assessment
    """

    chunk_id: str = Field(..., description="Unique, stable chunk identifier")
    doc_id: str = Field(..., description="Parent document identifier")

    chunk_type: ChunkType = Field(..., description="Semantic type")
    text: str = Field(..., description="Text content")

    page: int = Field(..., ge=0, description="Zero-indexed page number")
    bbox: BoundingBox = Field(..., description="Bounding box on page")

    confidence: float = Field(default=1.0, ge=0.0, le=1.0, description="Extraction confidence")
    sequence_index: int = Field(default=0, ge=0, description="Position in reading order")
    source_path: Optional[str] = Field(default=None, description="Original file path")

    parent_id: Optional[str] = Field(default=None, description="Parent chunk ID")
    children_ids: List[str] = Field(default_factory=list, description="Child chunk IDs")

    caption: Optional[str] = Field(default=None, description="Caption if applicable")
    warnings: List[str] = Field(default_factory=list, description="Quality warnings")
    extra: Dict[str, Any] = Field(default_factory=dict, description="Type-specific metadata")

    embedding: Optional[List[float]] = Field(default=None, exclude=True)

    @property
    def confidence_level(self) -> ConfidenceLevel:
        return ConfidenceLevel.from_score(self.confidence)

    @property
    def needs_review(self) -> bool:
        """Check if chunk needs human review."""
        return self.confidence < 0.7 or len(self.warnings) > 0

    def content_hash(self) -> str:
        """Generate hash of chunk content for deduplication."""
        content = f"{self.doc_id}:{self.page}:{self.chunk_type.value}:{self.text[:200]}"
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    @staticmethod
    def generate_chunk_id(
        doc_id: str,
        page: int,
        bbox: BoundingBox,
        chunk_type: ChunkType,
    ) -> str:
        """
        Generate a stable, reproducible chunk ID.

        Uses hash of (doc_id, page, bbox, type) for reproducibility.
        """
        bbox_str = f"{bbox.x_min:.2f},{bbox.y_min:.2f},{bbox.x_max:.2f},{bbox.y_max:.2f}"
        content = f"{doc_id}:p{page}:{bbox_str}:{chunk_type.value}"
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def to_retrieval_metadata(self) -> Dict[str, Any]:
        """Convert to metadata dict for vector store."""
        return {
            "chunk_id": self.chunk_id,
            "doc_id": self.doc_id,
            "chunk_type": self.chunk_type.value,
            "page": self.page,
            "bbox_xyxy": list(self.bbox.xyxy),
            "confidence": self.confidence,
            "sequence_index": self.sequence_index,
            "source_path": self.source_path,
        }

    def __hash__(self):
        return hash(self.chunk_id)


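# Illustrative sketch (not part of the public API): chunk IDs are deterministic,
# so re-parsing the same document reproduces the same identifiers.
def _demo_stable_chunk_id() -> None:
    bbox = BoundingBox(x_min=10, y_min=20, x_max=110, y_max=60)
    first = DocumentChunk.generate_chunk_id("doc-1", 0, bbox, ChunkType.PARAGRAPH)
    second = DocumentChunk.generate_chunk_id("doc-1", 0, bbox, ChunkType.PARAGRAPH)
    # Same (doc_id, page, bbox, type) -> same 16-hex-char ID.
    assert first == second and len(first) == 16

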
class TableCell(BaseModel):
    """A single cell in a table."""

    cell_id: str = Field(..., description="Unique cell identifier")
    row: int = Field(..., ge=0, description="Row index (0-based)")
    col: int = Field(..., ge=0, description="Column index (0-based)")
    text: str = Field(default="", description="Cell text content")
    bbox: Optional[BoundingBox] = Field(default=None, description="Cell bounding box")

    rowspan: int = Field(default=1, ge=1, description="Number of rows spanned")
    colspan: int = Field(default=1, ge=1, description="Number of columns spanned")

    is_header: bool = Field(default=False, description="Is header cell")
    confidence: float = Field(default=1.0, ge=0.0, le=1.0)


class TableChunk(DocumentChunk):
    """
    Specialized chunk for tables with structured cell data.

    Preserves row/column structure and supports merged cells.
    """

    chunk_type: ChunkType = Field(default=ChunkType.TABLE)

    cells: List[TableCell] = Field(default_factory=list, description="All table cells")
    num_rows: int = Field(default=0, ge=0, description="Number of rows")
    num_cols: int = Field(default=0, ge=0, description="Number of columns")

    header_rows: List[int] = Field(default_factory=list, description="Header row indices")
    header_cols: List[int] = Field(default_factory=list, description="Header column indices")

    has_merged_cells: bool = Field(default=False)
    table_title: Optional[str] = Field(default=None)

    def get_cell(self, row: int, col: int) -> Optional[TableCell]:
        """Get the cell covering a specific position, including merged spans."""
        for cell in self.cells:
            # The span check also covers exact matches, since rowspan/colspan >= 1.
            if (cell.row <= row < cell.row + cell.rowspan and
                    cell.col <= col < cell.col + cell.colspan):
                return cell
        return None

    def get_row(self, row: int) -> List[TableCell]:
        """Get all cells in a row."""
        return [c for c in self.cells if c.row == row]

    def get_column(self, col: int) -> List[TableCell]:
        """Get all cells in a column."""
        return [c for c in self.cells if c.col == col]

    def to_csv(self) -> str:
        """Export table to CSV format."""
        import csv
        import io

        output = io.StringIO()
        writer = csv.writer(output)

        for row_idx in range(self.num_rows):
            row_data = []
            for col_idx in range(self.num_cols):
                cell = self.get_cell(row_idx, col_idx)
                row_data.append(cell.text if cell else "")
            writer.writerow(row_data)

        return output.getvalue()

    def to_markdown(self) -> str:
        """Export table to Markdown format."""
        lines = []
        # The Markdown separator row is emitted exactly once, after the last
        # header row (or after the first row if no header rows are marked).
        separator_after = max(self.header_rows) if self.header_rows else 0

        for row_idx in range(self.num_rows):
            row_cells = []
            for col_idx in range(self.num_cols):
                cell = self.get_cell(row_idx, col_idx)
                row_cells.append(cell.text if cell else "")
            lines.append("| " + " | ".join(row_cells) + " |")

            if row_idx == separator_after:
                lines.append("| " + " | ".join(["---"] * self.num_cols) + " |")

        return "\n".join(lines)

    def to_structured_json(self) -> Dict[str, Any]:
        """Export table to structured JSON with headers."""
        headers = []
        if self.header_rows:
            for col_idx in range(self.num_cols):
                cell = self.get_cell(self.header_rows[0], col_idx)
                headers.append(cell.text if cell else f"col_{col_idx}")
        else:
            headers = [f"col_{i}" for i in range(self.num_cols)]

        data_start = max(self.header_rows) + 1 if self.header_rows else 0
        rows = []

        for row_idx in range(data_start, self.num_rows):
            row_dict = {}
            for col_idx, header in enumerate(headers):
                cell = self.get_cell(row_idx, col_idx)
                row_dict[header] = cell.text if cell else ""
            rows.append(row_dict)

        return {
            "headers": headers,
            "rows": rows,
            "num_rows": self.num_rows - len(self.header_rows),
            "num_cols": self.num_cols,
        }


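# Illustrative sketch (not part of the public API): building a tiny 2x2 table
# and exporting it. All values here are invented for demonstration.
def _demo_table_export() -> None:
    table = TableChunk(
        chunk_id="t1",
        doc_id="doc-1",
        text="Item Qty / Widget 3",
        page=0,
        bbox=BoundingBox(x_min=0, y_min=0, x_max=100, y_max=50),
        cells=[
            TableCell(cell_id="c00", row=0, col=0, text="Item", is_header=True),
            TableCell(cell_id="c01", row=0, col=1, text="Qty", is_header=True),
            TableCell(cell_id="c10", row=1, col=0, text="Widget"),
            TableCell(cell_id="c11", row=1, col=1, text="3"),
        ],
        num_rows=2,
        num_cols=2,
        header_rows=[0],
    )
    # Header row 0 supplies the keys; row 1 becomes the single data row.
    assert table.to_structured_json()["rows"] == [{"Item": "Widget", "Qty": "3"}]

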
class ChartDataPoint(BaseModel):
    """A data point in a chart."""

    label: Optional[str] = None
    value: Optional[float] = None
    category: Optional[str] = None
    series: Optional[str] = None
    confidence: float = Field(default=1.0, ge=0.0, le=1.0)


class ChartChunk(DocumentChunk):
    """
    Specialized chunk for charts/graphs with structured interpretation.

    Extracts title, axes, series, and key values from visualizations.
    """

    chunk_type: ChunkType = Field(default=ChunkType.CHART)

    chart_type: Optional[str] = Field(default=None, description="bar, line, pie, scatter, etc.")
    title: Optional[str] = Field(default=None)

    x_axis_label: Optional[str] = Field(default=None)
    y_axis_label: Optional[str] = Field(default=None)
    x_axis_unit: Optional[str] = Field(default=None)
    y_axis_unit: Optional[str] = Field(default=None)

    series_names: List[str] = Field(default_factory=list)
    data_points: List[ChartDataPoint] = Field(default_factory=list)

    key_values: Dict[str, Any] = Field(default_factory=dict, description="Key numeric values")
    trends: List[str] = Field(default_factory=list, description="Identified trends")
    summary: Optional[str] = Field(default=None, description="Natural language summary")

    def to_structured_json(self) -> Dict[str, Any]:
        """Export chart data as structured JSON."""
        return {
            "chart_type": self.chart_type,
            "title": self.title,
            "axes": {
                "x": {"label": self.x_axis_label, "unit": self.x_axis_unit},
                "y": {"label": self.y_axis_label, "unit": self.y_axis_unit},
            },
            "series": self.series_names,
            "data_points": [dp.model_dump() for dp in self.data_points],
            "key_values": self.key_values,
            "trends": self.trends,
            "summary": self.summary,
        }


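# Illustrative sketch (not part of the public API): a minimal bar chart with
# two extracted data points. The figures are invented for demonstration.
def _demo_chart_structured_json() -> None:
    chart = ChartChunk(
        chunk_id="ch1",
        doc_id="doc-1",
        text="Revenue by quarter",
        page=0,
        bbox=BoundingBox(x_min=0, y_min=0, x_max=400, y_max=300),
        chart_type="bar",
        title="Revenue by quarter",
        y_axis_label="Revenue",
        y_axis_unit="$M",
        data_points=[
            ChartDataPoint(label="Q1", value=1.2),
            ChartDataPoint(label="Q2", value=1.5),
        ],
    )
    structured = chart.to_structured_json()
    assert structured["chart_type"] == "bar"
    assert structured["axes"]["y"]["unit"] == "$M"

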
class FormFieldChunk(DocumentChunk):
    """
    Specialized chunk for form fields.

    Handles text fields, checkboxes, radio buttons, signatures.
    """

    chunk_type: ChunkType = Field(default=ChunkType.FORM_FIELD)

    field_name: Optional[str] = Field(default=None, description="Field label/name")
    field_value: Optional[str] = Field(default=None, description="Extracted value")
    field_type: str = Field(default="text", description="text, checkbox, signature, date, etc.")

    is_checked: Optional[bool] = Field(default=None)
    options: List[str] = Field(default_factory=list)

    is_required: bool = Field(default=False)
    is_filled: bool = Field(default=False)


class EvidenceRef(BaseModel):
    """
    Evidence reference for grounding extractions.

    Links every extracted value back to its source in the document.
    Required for auditability and trust.
    """

    chunk_id: str = Field(..., description="Source chunk ID")
    doc_id: str = Field(..., description="Document ID")
    page: int = Field(..., ge=0, description="Page number (0-indexed)")
    bbox: BoundingBox = Field(..., description="Bounding box of evidence")

    source_type: str = Field(..., description="text, table, chart, form_field, etc.")
    snippet: str = Field(..., max_length=1000, description="Text snippet as evidence")

    confidence: float = Field(..., ge=0.0, le=1.0, description="Evidence confidence")

    cell_id: Optional[str] = Field(default=None, description="Table cell ID if applicable")

    crop_path: Optional[str] = Field(default=None, description="Path to cropped image")
    image_base64: Optional[str] = Field(default=None, description="Base64 encoded crop")

    warnings: List[str] = Field(default_factory=list)

    @property
    def needs_review(self) -> bool:
        return self.confidence < 0.7 or len(self.warnings) > 0

    def to_citation(self, include_bbox: bool = False) -> str:
        """Format as human-readable citation."""
        citation = f"[Page {self.page + 1}, {self.source_type}]"
        if include_bbox:
            citation += f" @ ({self.bbox.x_min:.0f}, {self.bbox.y_min:.0f})"
        if len(self.snippet) > 100:
            citation += f': "{self.snippet[:100]}..."'
        else:
            citation += f': "{self.snippet}"'
        return citation


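# Illustrative sketch (not part of the public API): rendering an evidence
# reference as a citation. The snippet and coordinates are invented.
def _demo_evidence_citation() -> None:
    evidence = EvidenceRef(
        chunk_id="abc123",
        doc_id="doc-1",
        page=2,  # zero-indexed; prints as "Page 3"
        bbox=BoundingBox(x_min=72, y_min=144, x_max=300, y_max=160),
        source_type="text",
        snippet="Total due: $1,250.00",
        confidence=0.93,
    )
    assert evidence.to_citation() == '[Page 3, text]: "Total due: $1,250.00"'
    assert not evidence.needs_review  # 0.93 >= 0.7 and no warnings

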
class PageResult(BaseModel):
    """Result of parsing a single page."""

    page_num: int = Field(..., ge=0, description="Page number (0-indexed)")
    width: int = Field(..., gt=0, description="Page width in pixels")
    height: int = Field(..., gt=0, description="Page height in pixels")

    chunks: List[DocumentChunk] = Field(default_factory=list)
    markdown: str = Field(default="", description="Page content as Markdown")

    ocr_confidence: Optional[float] = Field(default=None)
    layout_confidence: Optional[float] = Field(default=None)

    image_path: Optional[str] = Field(default=None, description="Path to rendered page image")


class ParseResult(BaseModel):
    """
    Complete result of document parsing.

    Contains all parsed content with metadata for downstream processing.
    """

    doc_id: str = Field(..., description="Unique document identifier")
    source_path: str = Field(..., description="Original file path")
    filename: str = Field(..., description="Original filename")

    file_type: str = Field(..., description="pdf, png, jpg, tiff, etc.")
    file_size_bytes: int = Field(default=0, ge=0)
    file_hash: Optional[str] = Field(default=None, description="SHA256 of file content")

    num_pages: int = Field(..., ge=1)
    pages: List[PageResult] = Field(default_factory=list)

    chunks: List[DocumentChunk] = Field(default_factory=list)

    markdown_full: str = Field(default="", description="Full document as Markdown")
    markdown_by_page: Dict[int, str] = Field(default_factory=dict)

    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp.
    parsed_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    processing_time_ms: float = Field(default=0.0)

    avg_ocr_confidence: Optional[float] = Field(default=None)
    avg_layout_confidence: Optional[float] = Field(default=None)

    detected_language: Optional[str] = Field(default=None)

    models_used: Dict[str, str] = Field(default_factory=dict, description="Model name -> version")

    warnings: List[str] = Field(default_factory=list)
    errors: List[str] = Field(default_factory=list)

    metadata: Dict[str, Any] = Field(default_factory=dict)

    @property
    def is_successful(self) -> bool:
        return len(self.errors) == 0 and len(self.chunks) > 0

    @property
    def has_tables(self) -> bool:
        return any(c.chunk_type == ChunkType.TABLE for c in self.chunks)

    @property
    def has_charts(self) -> bool:
        return any(c.chunk_type == ChunkType.CHART for c in self.chunks)

    def get_chunks_by_type(self, chunk_type: ChunkType) -> List[DocumentChunk]:
        return [c for c in self.chunks if c.chunk_type == chunk_type]

    def get_chunks_by_page(self, page: int) -> List[DocumentChunk]:
        return [c for c in self.chunks if c.page == page]

    def get_tables(self) -> List[TableChunk]:
        return [c for c in self.chunks if isinstance(c, TableChunk)]

    def get_charts(self) -> List[ChartChunk]:
        return [c for c in self.chunks if isinstance(c, ChartChunk)]

    def to_json(self, indent: int = 2) -> str:
        """Serialize to JSON."""
        return self.model_dump_json(indent=indent)

    @classmethod
    def from_json(cls, json_str: str) -> ParseResult:
        """Deserialize from JSON."""
        return cls.model_validate_json(json_str)

    def save(self, path: Union[str, Path]) -> None:
        """Save to JSON file."""
        Path(path).write_text(self.to_json(), encoding="utf-8")

    @classmethod
    def load(cls, path: Union[str, Path]) -> ParseResult:
        """Load from JSON file."""
        return cls.from_json(Path(path).read_text(encoding="utf-8"))


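# Illustrative sketch (not part of the public API): ParseResult survives a
# JSON round trip, which is what save()/load() rely on. Values are invented.
def _demo_parse_result_round_trip() -> None:
    result = ParseResult(
        doc_id="doc-1",
        source_path="/tmp/invoice.pdf",
        filename="invoice.pdf",
        file_type="pdf",
        num_pages=1,
    )
    restored = ParseResult.from_json(result.to_json())
    assert restored.doc_id == "doc-1"
    # No chunks were extracted, so the parse does not count as successful.
    assert restored.is_successful is False

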
class FieldExtraction(BaseModel):
    """Single extracted field with evidence."""

    field_name: str = Field(..., description="Schema field name")
    value: Any = Field(..., description="Extracted value")
    value_type: str = Field(..., description="string, number, boolean, array, object")

    evidence: List[EvidenceRef] = Field(default_factory=list)
    confidence: float = Field(default=1.0, ge=0.0, le=1.0)

    is_valid: bool = Field(default=True)
    validation_errors: List[str] = Field(default_factory=list)

    abstained: bool = Field(default=False)
    abstain_reason: Optional[str] = Field(default=None)


class ExtractionResult(BaseModel):
    """Complete extraction result with data, evidence, and validation."""

    data: Dict[str, Any] = Field(default_factory=dict)
    fields: List[FieldExtraction] = Field(default_factory=list)

    evidence: List[EvidenceRef] = Field(default_factory=list)

    overall_confidence: float = Field(default=1.0, ge=0.0, le=1.0)

    validation_passed: bool = Field(default=True)
    validation_errors: List[str] = Field(default_factory=list)
    validation_warnings: List[str] = Field(default_factory=list)

    abstained_fields: List[str] = Field(default_factory=list)

    processing_time_ms: float = Field(default=0.0)
    model_used: Optional[str] = Field(default=None)

    @property
    def is_grounded(self) -> bool:
        """Check if all non-abstained fields have evidence."""
        return all(f.evidence for f in self.fields if not f.abstained)

    @property
    def needs_review(self) -> bool:
        """Check if result needs human review."""
        return (
            self.overall_confidence < 0.7
            or len(self.abstained_fields) > 0
            or not self.validation_passed
        )


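# Illustrative sketch (not part of the public API): an abstained field keeps
# the result grounded but still routes it to human review.
def _demo_abstention_routes_to_review() -> None:
    field = FieldExtraction(
        field_name="total",
        value=None,
        value_type="number",
        abstained=True,
        abstain_reason="low OCR confidence on the amount",
    )
    result = ExtractionResult(fields=[field], abstained_fields=["total"])
    # Abstained fields are exempt from the evidence requirement...
    assert result.is_grounded
    # ...but any abstention forces review.
    assert result.needs_review

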
class DocumentType(str, Enum):
    """Document type classifications."""

    INVOICE = "invoice"
    CONTRACT = "contract"
    AGREEMENT = "agreement"
    PATENT = "patent"
    RESEARCH_PAPER = "research_paper"
    REPORT = "report"
    LETTER = "letter"
    FORM = "form"
    RECEIPT = "receipt"
    BANK_STATEMENT = "bank_statement"
    TAX_DOCUMENT = "tax_document"
    ID_DOCUMENT = "id_document"
    MEDICAL_RECORD = "medical_record"
    LEGAL_DOCUMENT = "legal_document"
    TECHNICAL_SPEC = "technical_spec"
    PRESENTATION = "presentation"
    SPREADSHEET = "spreadsheet"
    EMAIL = "email"
    OTHER = "other"
    UNKNOWN = "unknown"


class ClassificationResult(BaseModel):
    """Document classification result."""

    doc_id: str
    doc_type: DocumentType
    confidence: float = Field(ge=0.0, le=1.0)

    alternatives: List[Tuple[DocumentType, float]] = Field(default_factory=list)

    evidence: List[EvidenceRef] = Field(default_factory=list)
    reasoning: Optional[str] = Field(default=None)

    is_confident: bool = Field(default=True)