"""
Document Parser

Main orchestrator for the document parsing pipeline.
Coordinates OCR, layout detection, and chunk generation.
"""

import logging
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union

import numpy as np

from ..chunks.models import (
    BoundingBox,
    ChartChunk,
    ChunkType,
    DocumentChunk,
    PageResult,
    ParseResult,
    TableChunk,
)
from ..io import (
    DocumentFormat,
    DocumentInfo,
    RenderOptions,
    get_document_cache,
    load_document,
)
from ..models import (
    ChartModel,
    ChartStructure,
    LayoutModel,
    LayoutRegion,
    LayoutRegionType,
    LayoutResult,
    OCRModel,
    OCRResult,
    TableModel,
    TableStructure,
)

logger = logging.getLogger(__name__)


@dataclass
class ParserConfig:
    """Configuration for the document parser."""

    # Rendering
    render_dpi: int = 200
    max_pages: Optional[int] = None

    # OCR
    ocr_enabled: bool = True
    ocr_languages: List[str] = field(default_factory=lambda: ["en"])
    ocr_min_confidence: float = 0.5

    # Layout analysis
    layout_enabled: bool = True
    reading_order_enabled: bool = True

    # Structured extraction
    table_extraction_enabled: bool = True
    chart_extraction_enabled: bool = True

    # Text chunking
    merge_adjacent_text: bool = True
    min_chunk_chars: int = 10
    max_chunk_chars: int = 4000

    # Caching
    cache_enabled: bool = True

    # Output
    include_markdown: bool = True
    include_raw_ocr: bool = False
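
# Illustrative configuration sketch (the values are placeholders, not tuned
# defaults): a faster, text-only profile that skips structured extraction.
#
#     fast_config = ParserConfig(
#         render_dpi=150,
#         max_pages=20,
#         table_extraction_enabled=False,
#         chart_extraction_enabled=False,
#     )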


class DocumentParser:
    """
    Main document parsing orchestrator.

    Coordinates the full pipeline:
    1. Load document and render pages
    2. Run OCR on each page
    3. Detect layout regions
    4. Extract tables and charts
    5. Generate semantic chunks
    6. Build reading order
    7. Produce final ParseResult
    """

    def __init__(
        self,
        config: Optional[ParserConfig] = None,
        ocr_model: Optional[OCRModel] = None,
        layout_model: Optional[LayoutModel] = None,
        table_model: Optional[TableModel] = None,
        chart_model: Optional[ChartModel] = None,
    ):
        self.config = config or ParserConfig()
        self.ocr_model = ocr_model
        self.layout_model = layout_model
        self.table_model = table_model
        self.chart_model = chart_model

        self._cache = get_document_cache() if self.config.cache_enabled else None
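
    # Illustrative wiring (the model variables here are hypothetical; pass
    # whatever OCRModel / LayoutModel implementations your setup provides):
    #
    #     parser = DocumentParser(
    #         config=ParserConfig(render_dpi=300),
    #         ocr_model=my_ocr_model,
    #         layout_model=my_layout_model,
    #     )
    #     result = parser.parse("report.pdf")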

    def parse(
        self,
        path: Union[str, Path],
        page_range: Optional[Tuple[int, int]] = None,
    ) -> ParseResult:
        """
        Parse a document and return structured results.

        Args:
            path: Path to the document file.
            page_range: Optional (start, end) page range (1-indexed, inclusive).

        Returns:
            ParseResult with chunks and metadata.
        """
        path = Path(path)
        start_time = time.time()

        logger.info(f"Parsing document: {path}")

        # Load the document
        loader, renderer = load_document(path)
        doc_info = loader.info
        doc_id = doc_info.doc_id

        # Resolve the page range
        start_page = page_range[0] if page_range else 1
        end_page = page_range[1] if page_range else doc_info.num_pages

        if self.config.max_pages:
            end_page = min(end_page, start_page + self.config.max_pages - 1)

        page_numbers = list(range(start_page, end_page + 1))

        logger.info(f"Processing pages {start_page}-{end_page} of {doc_info.num_pages}")

        # Process each rendered page
        page_results: List[PageResult] = []
        all_chunks: List[DocumentChunk] = []
        markdown_by_page: Dict[int, str] = {}
        sequence_index = 0

        render_options = RenderOptions(dpi=self.config.render_dpi)

        try:
            for page_num, page_image in renderer.render_pages(page_numbers, render_options):
                logger.debug(f"Processing page {page_num}")

                page_result, page_chunks = self._process_page(
                    page_image=page_image,
                    page_number=page_num,
                    doc_id=doc_id,
                    sequence_start=sequence_index,
                )

                page_results.append(page_result)
                all_chunks.extend(page_chunks)
                sequence_index += len(page_chunks)

                if self.config.include_markdown:
                    markdown_by_page[page_num] = self._generate_page_markdown(page_chunks)
        finally:
            # Release document resources even if a page fails mid-parse
            loader.close()

        # Assemble the full-document markdown
        markdown_full = "\n\n---\n\n".join(
            f"## Page {p}\n\n{md}"
            for p, md in sorted(markdown_by_page.items())
        )

        processing_time = time.time() - start_time
        logger.info(f"Parsed {len(all_chunks)} chunks in {processing_time:.2f}s")

        return ParseResult(
            doc_id=doc_id,
            source_path=str(path.absolute()),
            filename=path.name,
            num_pages=doc_info.num_pages,
            pages=page_results,
            chunks=all_chunks,
            markdown_full=markdown_full,
            markdown_by_page=markdown_by_page,
            processing_time_ms=processing_time * 1000,
            metadata={
                "format": doc_info.format.value,
                "has_text_layer": doc_info.has_text_layer,
                "is_scanned": doc_info.is_scanned,
                "render_dpi": self.config.render_dpi,
            },
        )
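
    # Illustrative call (assumes a parser with models attached): parse only
    # pages 2-5 inclusive; config.max_pages may cap the range further.
    #
    #     result = parser.parse("report.pdf", page_range=(2, 5))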

    def _process_page(
        self,
        page_image: np.ndarray,
        page_number: int,
        doc_id: str,
        sequence_start: int,
    ) -> Tuple[PageResult, List[DocumentChunk]]:
        """Process a single page."""
        height, width = page_image.shape[:2]
        chunks: List[DocumentChunk] = []
        sequence_index = sequence_start

        # Run OCR on the full page
        ocr_result: Optional[OCRResult] = None
        if self.config.ocr_enabled and self.ocr_model:
            ocr_result = self.ocr_model.recognize(page_image)

        # Detect layout regions
        layout_result: Optional[LayoutResult] = None
        if self.config.layout_enabled and self.layout_model:
            layout_result = self.layout_model.detect(page_image)

        # Prefer layout-driven chunking when regions are available
        if layout_result and layout_result.regions:
            for region in layout_result.get_ordered_regions():
                region_chunks = self._process_region(
                    page_image=page_image,
                    region=region,
                    ocr_result=ocr_result,
                    page_number=page_number,
                    doc_id=doc_id,
                    sequence_index=sequence_index,
                    image_size=(width, height),
                )
                chunks.extend(region_chunks)
                sequence_index += len(region_chunks)

        elif ocr_result and ocr_result.blocks:
            # Fallback: no layout available, emit one chunk per OCR block
            for block in ocr_result.blocks:
                chunk = self._create_text_chunk(
                    text=block.text,
                    bbox=block.bbox,
                    confidence=block.confidence,
                    page_number=page_number,
                    doc_id=doc_id,
                    sequence_index=sequence_index,
                    chunk_type=ChunkType.PARAGRAPH,
                )
                chunks.append(chunk)
                sequence_index += 1

        if self.config.merge_adjacent_text:
            chunks = self._merge_adjacent_chunks(chunks)

        page_result = PageResult(
            page_number=page_number,
            width=width,
            height=height,
            chunks=[c.chunk_id for c in chunks],
            ocr_confidence=ocr_result.confidence if ocr_result else None,
        )

        return page_result, chunks

    def _process_region(
        self,
        page_image: np.ndarray,
        region: LayoutRegion,
        ocr_result: Optional[OCRResult],
        page_number: int,
        doc_id: str,
        sequence_index: int,
        image_size: Tuple[int, int],
    ) -> List[DocumentChunk]:
        """Process a single layout region."""
        chunks: List[DocumentChunk] = []
        width, height = image_size

        # Normalize the bounding box if it is still in pixel coordinates
        bbox = region.bbox
        if not bbox.normalized:
            bbox = bbox.to_normalized(width, height)

        if region.region_type == LayoutRegionType.TABLE:
            table_chunk = self._extract_table(
                page_image=page_image,
                region=region,
                page_number=page_number,
                doc_id=doc_id,
                sequence_index=sequence_index,
            )
            if table_chunk:
                chunks.append(table_chunk)

        elif region.region_type in {LayoutRegionType.CHART, LayoutRegionType.FIGURE}:
            chart_chunk = self._extract_chart(
                page_image=page_image,
                region=region,
                page_number=page_number,
                doc_id=doc_id,
                sequence_index=sequence_index,
            )
            if chart_chunk:
                chunks.append(chart_chunk)
            else:
                # Chart extraction disabled or failed: fall back to a figure chunk
                text = self._get_region_text(region, ocr_result) or "[Figure]"
                chunk = self._create_text_chunk(
                    text=text,
                    bbox=bbox,
                    confidence=region.confidence,
                    page_number=page_number,
                    doc_id=doc_id,
                    sequence_index=sequence_index,
                    chunk_type=ChunkType.FIGURE,
                )
                chunks.append(chunk)

        else:
            # Text-like region: take its text from the page OCR result
            text = self._get_region_text(region, ocr_result)
            if text and len(text.strip()) >= self.config.min_chunk_chars:
                chunk_type = region.region_type.to_chunk_type()
                chunk = self._create_text_chunk(
                    text=text,
                    bbox=bbox,
                    confidence=region.confidence,
                    page_number=page_number,
                    doc_id=doc_id,
                    sequence_index=sequence_index,
                    chunk_type=chunk_type,
                )
                chunks.append(chunk)

        return chunks

    def _get_region_text(
        self,
        region: LayoutRegion,
        ocr_result: Optional[OCRResult],
    ) -> str:
        """Get text for a region from the OCR result."""
        if not ocr_result:
            return ""

        return ocr_result.get_text_in_region(region.bbox, threshold=0.3)

    def _extract_table(
        self,
        page_image: np.ndarray,
        region: LayoutRegion,
        page_number: int,
        doc_id: str,
        sequence_index: int,
    ) -> Optional[TableChunk]:
        """Extract table structure from a region."""
        if not self.config.table_extraction_enabled or not self.table_model:
            return None

        try:
            table_structure = self.table_model.extract_structure(
                page_image,
                region.bbox,
            )

            if table_structure.num_rows > 0:
                return table_structure.to_table_chunk(
                    doc_id=doc_id,
                    page=page_number,
                    sequence_index=sequence_index,
                )
        except Exception as e:
            logger.warning(f"Table extraction failed: {e}")

        return None

    def _extract_chart(
        self,
        page_image: np.ndarray,
        region: LayoutRegion,
        page_number: int,
        doc_id: str,
        sequence_index: int,
    ) -> Optional[ChartChunk]:
        """Extract chart data from a region."""
        if not self.config.chart_extraction_enabled or not self.chart_model:
            return None

        try:
            chart_structure = self.chart_model.extract_chart(
                page_image,
                region.bbox,
            )

            if chart_structure.chart_type.value != "unknown":
                return chart_structure.to_chart_chunk(
                    doc_id=doc_id,
                    page=page_number,
                    sequence_index=sequence_index,
                )
        except Exception as e:
            logger.warning(f"Chart extraction failed: {e}")

        return None

    def _create_text_chunk(
        self,
        text: str,
        bbox: BoundingBox,
        confidence: float,
        page_number: int,
        doc_id: str,
        sequence_index: int,
        chunk_type: ChunkType,
    ) -> DocumentChunk:
        """Create a text chunk."""
        chunk_id = DocumentChunk.generate_chunk_id(
            doc_id=doc_id,
            page=page_number,
            bbox=bbox,
            chunk_type_str=chunk_type.value,
        )

        return DocumentChunk(
            chunk_id=chunk_id,
            doc_id=doc_id,
            chunk_type=chunk_type,
            text=text,
            page=page_number,
            bbox=bbox,
            confidence=confidence,
            sequence_index=sequence_index,
        )

    def _merge_adjacent_chunks(
        self,
        chunks: List[DocumentChunk],
    ) -> List[DocumentChunk]:
        """Merge adjacent text chunks of the same type."""
        if len(chunks) <= 1:
            return chunks

        merged: List[DocumentChunk] = []
        current: Optional[DocumentChunk] = None

        mergeable_types = {
            ChunkType.TEXT,
            ChunkType.PARAGRAPH,
        }

        for chunk in chunks:
            if current is None:
                current = chunk
                continue

            # Only merge same-type text chunks on the same page that are
            # vertically adjacent
            can_merge = (
                current.chunk_type in mergeable_types
                and chunk.chunk_type in mergeable_types
                and current.chunk_type == chunk.chunk_type
                and current.page == chunk.page
                and self._chunks_adjacent(current, chunk)
            )

            if can_merge:
                # Merge, unless the combined text would exceed the size cap
                merged_text = current.text + "\n" + chunk.text
                if len(merged_text) <= self.config.max_chunk_chars:
                    current = DocumentChunk(
                        chunk_id=current.chunk_id,
                        doc_id=current.doc_id,
                        chunk_type=current.chunk_type,
                        text=merged_text,
                        page=current.page,
                        bbox=self._merge_bboxes(current.bbox, chunk.bbox),
                        confidence=min(current.confidence, chunk.confidence),
                        sequence_index=current.sequence_index,
                    )
                else:
                    merged.append(current)
                    current = chunk
            else:
                merged.append(current)
                current = chunk

        if current:
            merged.append(current)

        return merged

    def _chunks_adjacent(
        self,
        chunk1: DocumentChunk,
        chunk2: DocumentChunk,
        gap_threshold: float = 0.05,
    ) -> bool:
        """Check whether two chunks are vertically adjacent."""
        # Gap between the bottom of chunk1 and the top of chunk2, in the
        # bbox coordinate space (normalized page units)
        gap = chunk2.bbox.y_min - chunk1.bbox.y_max
        return 0 <= gap <= gap_threshold
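
    # Worked example (normalized coordinates): if chunk1 ends at y_max=0.42
    # and chunk2 starts at y_min=0.45, the gap is 0.03 <= 0.05, so they count
    # as adjacent; a gap of 0.08, or an overlap (negative gap), does not.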

    def _merge_bboxes(
        self,
        bbox1: BoundingBox,
        bbox2: BoundingBox,
    ) -> BoundingBox:
        """Merge two bounding boxes into their union."""
        return BoundingBox(
            x_min=min(bbox1.x_min, bbox2.x_min),
            y_min=min(bbox1.y_min, bbox2.y_min),
            x_max=max(bbox1.x_max, bbox2.x_max),
            y_max=max(bbox1.y_max, bbox2.y_max),
            normalized=bbox1.normalized,
        )
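
    # Worked example: merging (0.1, 0.30, 0.9, 0.40) with (0.1, 0.42, 0.9, 0.55)
    # yields the union box (0.1, 0.30, 0.9, 0.55).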

    def _generate_page_markdown(
        self,
        chunks: List[DocumentChunk],
    ) -> str:
        """Generate markdown for a page's chunks."""
        lines: List[str] = []

        for chunk in chunks:
            # Anchor each block to its source chunk for traceability
            lines.append(f"<!-- chunk:{chunk.chunk_id} -->")

            # Render by chunk type
            if chunk.chunk_type == ChunkType.TITLE:
                lines.append(f"# {chunk.text}")
            elif chunk.chunk_type == ChunkType.HEADING:
                lines.append(f"## {chunk.text}")
            elif chunk.chunk_type == ChunkType.TABLE:
                if isinstance(chunk, TableChunk):
                    lines.append(chunk.to_markdown())
                else:
                    lines.append(chunk.text)
            elif chunk.chunk_type == ChunkType.LIST:
                # One bullet per non-empty line
                for item in chunk.text.split("\n"):
                    if item.strip():
                        lines.append(f"- {item.strip()}")
            elif chunk.chunk_type == ChunkType.CODE:
                lines.append(f"```\n{chunk.text}\n```")
            elif chunk.chunk_type == ChunkType.FIGURE:
                lines.append(f"[Figure: {chunk.text}]")
            elif chunk.chunk_type == ChunkType.CHART:
                if isinstance(chunk, ChartChunk):
                    lines.append(f"[Chart: {chunk.title or chunk.chart_type}]")
                    lines.append(chunk.text)
                else:
                    lines.append(f"[Chart: {chunk.text}]")
            else:
                lines.append(chunk.text)

            lines.append("")

        return "\n".join(lines)
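
    # Illustrative output shape for a page with one heading and one paragraph
    # (chunk ids elided; their format comes from DocumentChunk.generate_chunk_id):
    #
    #     <!-- chunk:... -->
    #     ## Quarterly Results
    #
    #     <!-- chunk:... -->
    #     Revenue grew in the third quarter...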


def parse_document(
    path: Union[str, Path],
    config: Optional[ParserConfig] = None,
) -> ParseResult:
    """
    Convenience function to parse a document.

    Note that no models are attached here, so the OCR, layout, table, and
    chart stages are skipped; construct a DocumentParser directly to supply them.

    Args:
        path: Path to the document.
        config: Optional parser configuration.

    Returns:
        ParseResult with extracted chunks.
    """
    parser = DocumentParser(config=config)
    return parser.parse(path)
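

if __name__ == "__main__":
    # Minimal smoke test (illustrative): parse a path given on the command
    # line and print a summary. All attributes used here are defined above.
    import sys

    result = parse_document(sys.argv[1], config=ParserConfig(max_pages=5))
    print(f"{result.filename}: {len(result.chunks)} chunks across "
          f"{result.num_pages} pages in {result.processing_time_ms:.0f} ms")
    for page in result.pages:
        print(f"  page {page.page_number}: {len(page.chunks)} chunks")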