"""
Document Parser
Main orchestrator for the document parsing pipeline.
Coordinates OCR, layout detection, and chunk generation.
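
Typical usage (a minimal sketch; the OCR/layout/table/chart models are
optional and must be constructed by the caller):

    parser = DocumentParser(config=ParserConfig(render_dpi=150))
    result = parser.parse("report.pdf", page_range=(1, 5))
    print(result.markdown_full)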
"""
import logging
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
import numpy as np
from ..chunks.models import (
BoundingBox,
ChunkType,
DocumentChunk,
PageResult,
ParseResult,
TableChunk,
ChartChunk,
)
from ..io import (
    RenderOptions,
    load_document,
    get_document_cache,
)
from ..models import (
OCRModel,
OCRResult,
LayoutModel,
LayoutResult,
LayoutRegion,
LayoutRegionType,
TableModel,
TableStructure,
ChartModel,
ChartStructure,
)
logger = logging.getLogger(__name__)
@dataclass
class ParserConfig:
"""Configuration for document parser."""
# Rendering
render_dpi: int = 200
max_pages: Optional[int] = None
# OCR
ocr_enabled: bool = True
ocr_languages: List[str] = field(default_factory=lambda: ["en"])
ocr_min_confidence: float = 0.5
# Layout
layout_enabled: bool = True
reading_order_enabled: bool = True
# Specialized extraction
table_extraction_enabled: bool = True
chart_extraction_enabled: bool = True
# Chunking
merge_adjacent_text: bool = True
min_chunk_chars: int = 10
max_chunk_chars: int = 4000
# Caching
cache_enabled: bool = True
# Output
include_markdown: bool = True
include_raw_ocr: bool = False
class DocumentParser:
"""
Main document parsing orchestrator.
Coordinates the full pipeline:
1. Load document and render pages
2. Run OCR on each page
3. Detect layout regions
4. Extract tables and charts
5. Generate semantic chunks
6. Build reading order
7. Produce final ParseResult
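
    Example (model instances here are hypothetical placeholders):
        parser = DocumentParser(ocr_model=my_ocr, layout_model=my_layout)
        result = parser.parse("invoice.pdf")
        for chunk in result.chunks:
            print(chunk.sequence_index, chunk.chunk_type, chunk.text[:60])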
"""
def __init__(
self,
config: Optional[ParserConfig] = None,
ocr_model: Optional[OCRModel] = None,
layout_model: Optional[LayoutModel] = None,
table_model: Optional[TableModel] = None,
chart_model: Optional[ChartModel] = None,
):
self.config = config or ParserConfig()
self.ocr_model = ocr_model
self.layout_model = layout_model
self.table_model = table_model
self.chart_model = chart_model
self._cache = get_document_cache() if self.config.cache_enabled else None
def parse(
self,
path: Union[str, Path],
page_range: Optional[Tuple[int, int]] = None,
) -> ParseResult:
"""
Parse a document and return structured results.
Args:
path: Path to document file
page_range: Optional (start, end) page range (1-indexed, inclusive)
Returns:
ParseResult with chunks and metadata
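
        Example (illustrative):
            # Parse only pages 2-4 (1-indexed, inclusive)
            result = parser.parse("manual.pdf", page_range=(2, 4))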
"""
path = Path(path)
start_time = time.time()
logger.info(f"Parsing document: {path}")
# Load document
loader, renderer = load_document(path)
doc_info = loader.info
        # Use the document's stable identifier assigned by the loader
doc_id = doc_info.doc_id
# Determine pages to process
        # Clamp any caller-supplied range to the document's actual bounds
        start_page = max(page_range[0], 1) if page_range else 1
        end_page = min(page_range[1], doc_info.num_pages) if page_range else doc_info.num_pages
if self.config.max_pages:
end_page = min(end_page, start_page + self.config.max_pages - 1)
page_numbers = list(range(start_page, end_page + 1))
logger.info(f"Processing pages {start_page}-{end_page} of {doc_info.num_pages}")
# Process each page
page_results: List[PageResult] = []
all_chunks: List[DocumentChunk] = []
markdown_by_page: Dict[int, str] = {}
sequence_index = 0
render_options = RenderOptions(dpi=self.config.render_dpi)
        try:
            for page_num, page_image in renderer.render_pages(page_numbers, render_options):
                logger.debug(f"Processing page {page_num}")
                # Process single page
                page_result, page_chunks = self._process_page(
                    page_image=page_image,
                    page_number=page_num,
                    doc_id=doc_id,
                    sequence_start=sequence_index,
                )
                page_results.append(page_result)
                all_chunks.extend(page_chunks)
                sequence_index += len(page_chunks)
                # Generate page markdown
                if self.config.include_markdown:
                    markdown_by_page[page_num] = self._generate_page_markdown(page_chunks)
        finally:
            # Close the document even if a page fails mid-pipeline
            loader.close()
# Build full markdown
markdown_full = "\n\n---\n\n".join(
f"## Page {p}\n\n{md}"
for p, md in sorted(markdown_by_page.items())
)
processing_time = time.time() - start_time
logger.info(f"Parsed {len(all_chunks)} chunks in {processing_time:.2f}s")
return ParseResult(
doc_id=doc_id,
source_path=str(path.absolute()),
filename=path.name,
num_pages=doc_info.num_pages,
pages=page_results,
chunks=all_chunks,
markdown_full=markdown_full,
markdown_by_page=markdown_by_page,
processing_time_ms=processing_time * 1000,
metadata={
"format": doc_info.format.value,
"has_text_layer": doc_info.has_text_layer,
"is_scanned": doc_info.is_scanned,
"render_dpi": self.config.render_dpi,
}
)
def _process_page(
self,
page_image: np.ndarray,
page_number: int,
doc_id: str,
sequence_start: int,
) -> Tuple[PageResult, List[DocumentChunk]]:
"""Process a single page."""
height, width = page_image.shape[:2]
chunks: List[DocumentChunk] = []
sequence_index = sequence_start
# Run OCR
ocr_result: Optional[OCRResult] = None
if self.config.ocr_enabled and self.ocr_model:
ocr_result = self.ocr_model.recognize(page_image)
# Run layout detection
layout_result: Optional[LayoutResult] = None
if self.config.layout_enabled and self.layout_model:
layout_result = self.layout_model.detect(page_image)
# Process layout regions or fall back to OCR blocks
if layout_result and layout_result.regions:
for region in layout_result.get_ordered_regions():
region_chunks = self._process_region(
page_image=page_image,
region=region,
ocr_result=ocr_result,
page_number=page_number,
doc_id=doc_id,
sequence_index=sequence_index,
image_size=(width, height),
)
chunks.extend(region_chunks)
sequence_index += len(region_chunks)
        elif ocr_result and ocr_result.blocks:
            # Fall back to raw OCR blocks, honoring the configured confidence floor
            for block in ocr_result.blocks:
                if block.confidence < self.config.ocr_min_confidence:
                    continue
                chunk = self._create_text_chunk(
                    text=block.text,
                    bbox=block.bbox,
                    confidence=block.confidence,
                    page_number=page_number,
                    doc_id=doc_id,
                    sequence_index=sequence_index,
                    chunk_type=ChunkType.PARAGRAPH,
                )
                chunks.append(chunk)
                sequence_index += 1
        # Merge adjacent text chunks if enabled
        if self.config.merge_adjacent_text:
            chunks = self._merge_adjacent_chunks(chunks)
            # Renumber so sequence indices stay contiguous: merging keeps only
            # the first index of each group, which would otherwise collide with
            # the next page's sequence_start (assumes DocumentChunk is mutable)
            for offset, chunk in enumerate(chunks):
                chunk.sequence_index = sequence_start + offset
# Build page result
page_result = PageResult(
page_number=page_number,
width=width,
height=height,
chunks=[c.chunk_id for c in chunks],
ocr_confidence=ocr_result.confidence if ocr_result else None,
)
return page_result, chunks
def _process_region(
self,
page_image: np.ndarray,
region: LayoutRegion,
ocr_result: Optional[OCRResult],
page_number: int,
doc_id: str,
sequence_index: int,
image_size: Tuple[int, int],
) -> List[DocumentChunk]:
"""Process a single layout region."""
chunks: List[DocumentChunk] = []
width, height = image_size
# Normalize bbox if needed
bbox = region.bbox
if not bbox.normalized:
bbox = bbox.to_normalized(width, height)
# Handle different region types
if region.region_type == LayoutRegionType.TABLE:
table_chunk = self._extract_table(
page_image=page_image,
region=region,
page_number=page_number,
doc_id=doc_id,
sequence_index=sequence_index,
)
if table_chunk:
chunks.append(table_chunk)
elif region.region_type in {LayoutRegionType.CHART, LayoutRegionType.FIGURE}:
# Try chart extraction first
chart_chunk = self._extract_chart(
page_image=page_image,
region=region,
page_number=page_number,
doc_id=doc_id,
sequence_index=sequence_index,
)
if chart_chunk:
chunks.append(chart_chunk)
else:
# Fall back to figure chunk
text = self._get_region_text(region, ocr_result) or "[Figure]"
chunk = self._create_text_chunk(
text=text,
bbox=bbox,
confidence=region.confidence,
page_number=page_number,
doc_id=doc_id,
sequence_index=sequence_index,
chunk_type=ChunkType.FIGURE,
)
chunks.append(chunk)
else:
# Text-based region
text = self._get_region_text(region, ocr_result)
if text and len(text.strip()) >= self.config.min_chunk_chars:
chunk_type = region.region_type.to_chunk_type()
chunk = self._create_text_chunk(
text=text,
bbox=bbox,
confidence=region.confidence,
page_number=page_number,
doc_id=doc_id,
sequence_index=sequence_index,
chunk_type=chunk_type,
)
chunks.append(chunk)
return chunks
def _get_region_text(
self,
region: LayoutRegion,
ocr_result: Optional[OCRResult],
) -> str:
"""Get text for a region from OCR result."""
if not ocr_result:
return ""
        # 0.3: minimum overlap assumed to be required between an OCR block's
        # bbox and the region for its text to be attributed to the region
        return ocr_result.get_text_in_region(region.bbox, threshold=0.3)
def _extract_table(
self,
page_image: np.ndarray,
region: LayoutRegion,
page_number: int,
doc_id: str,
sequence_index: int,
) -> Optional[TableChunk]:
"""Extract table structure from a region."""
if not self.config.table_extraction_enabled or not self.table_model:
return None
try:
            table_structure: TableStructure = self.table_model.extract_structure(
                page_image,
                region.bbox,
            )
if table_structure.num_rows > 0:
return table_structure.to_table_chunk(
doc_id=doc_id,
page=page_number,
sequence_index=sequence_index,
)
except Exception as e:
logger.warning(f"Table extraction failed: {e}")
return None
def _extract_chart(
self,
page_image: np.ndarray,
region: LayoutRegion,
page_number: int,
doc_id: str,
sequence_index: int,
) -> Optional[ChartChunk]:
"""Extract chart data from a region."""
if not self.config.chart_extraction_enabled or not self.chart_model:
return None
try:
            chart_structure: ChartStructure = self.chart_model.extract_chart(
                page_image,
                region.bbox,
            )
if chart_structure.chart_type.value != "unknown":
return chart_structure.to_chart_chunk(
doc_id=doc_id,
page=page_number,
sequence_index=sequence_index,
)
except Exception as e:
logger.warning(f"Chart extraction failed: {e}")
return None
def _create_text_chunk(
self,
text: str,
bbox: BoundingBox,
confidence: float,
page_number: int,
doc_id: str,
sequence_index: int,
chunk_type: ChunkType,
) -> DocumentChunk:
"""Create a text chunk."""
chunk_id = DocumentChunk.generate_chunk_id(
doc_id=doc_id,
page=page_number,
bbox=bbox,
chunk_type_str=chunk_type.value,
)
return DocumentChunk(
chunk_id=chunk_id,
doc_id=doc_id,
chunk_type=chunk_type,
text=text,
page=page_number,
bbox=bbox,
confidence=confidence,
sequence_index=sequence_index,
)
def _merge_adjacent_chunks(
self,
chunks: List[DocumentChunk],
) -> List[DocumentChunk]:
"""Merge adjacent text chunks of the same type."""
if len(chunks) <= 1:
return chunks
merged: List[DocumentChunk] = []
current: Optional[DocumentChunk] = None
mergeable_types = {
ChunkType.TEXT,
ChunkType.PARAGRAPH,
}
for chunk in chunks:
if current is None:
current = chunk
continue
            # Merge only same-type text chunks on the same page that are
            # vertically adjacent
            can_merge = (
                current.chunk_type == chunk.chunk_type and
                current.chunk_type in mergeable_types and
                current.page == chunk.page and
                self._chunks_adjacent(current, chunk)
            )
if can_merge:
# Merge chunks
merged_text = current.text + "\n" + chunk.text
if len(merged_text) <= self.config.max_chunk_chars:
current = DocumentChunk(
chunk_id=current.chunk_id, # Keep first ID
doc_id=current.doc_id,
chunk_type=current.chunk_type,
text=merged_text,
page=current.page,
bbox=self._merge_bboxes(current.bbox, chunk.bbox),
confidence=min(current.confidence, chunk.confidence),
sequence_index=current.sequence_index,
)
else:
merged.append(current)
current = chunk
else:
merged.append(current)
current = chunk
if current:
merged.append(current)
return merged
def _chunks_adjacent(
self,
chunk1: DocumentChunk,
chunk2: DocumentChunk,
gap_threshold: float = 0.05,
) -> bool:
"""Check if two chunks are vertically adjacent."""
# Check vertical gap
gap = chunk2.bbox.y_min - chunk1.bbox.y_max
return 0 <= gap <= gap_threshold
def _merge_bboxes(
self,
bbox1: BoundingBox,
bbox2: BoundingBox,
) -> BoundingBox:
"""Merge two bounding boxes."""
return BoundingBox(
x_min=min(bbox1.x_min, bbox2.x_min),
y_min=min(bbox1.y_min, bbox2.y_min),
x_max=max(bbox1.x_max, bbox2.x_max),
y_max=max(bbox1.y_max, bbox2.y_max),
normalized=bbox1.normalized,
)
def _generate_page_markdown(
self,
chunks: List[DocumentChunk],
) -> str:
"""Generate markdown for page chunks."""
lines: List[str] = []
for chunk in chunks:
# Add anchor comment
lines.append(f"<!-- chunk:{chunk.chunk_id} -->")
# Format based on chunk type
if chunk.chunk_type == ChunkType.TITLE:
lines.append(f"# {chunk.text}")
elif chunk.chunk_type == ChunkType.HEADING:
lines.append(f"## {chunk.text}")
elif chunk.chunk_type == ChunkType.TABLE:
if isinstance(chunk, TableChunk):
lines.append(chunk.to_markdown())
else:
lines.append(chunk.text)
elif chunk.chunk_type == ChunkType.LIST:
# Format as list items
for item in chunk.text.split("\n"):
if item.strip():
lines.append(f"- {item.strip()}")
elif chunk.chunk_type == ChunkType.CODE:
lines.append(f"```\n{chunk.text}\n```")
elif chunk.chunk_type == ChunkType.FIGURE:
lines.append(f"[Figure: {chunk.text}]")
elif chunk.chunk_type == ChunkType.CHART:
if isinstance(chunk, ChartChunk):
lines.append(f"[Chart: {chunk.title or chunk.chart_type}]")
lines.append(chunk.text)
else:
lines.append(f"[Chart: {chunk.text}]")
else:
lines.append(chunk.text)
lines.append("") # Blank line between chunks
return "\n".join(lines)
def parse_document(
path: Union[str, Path],
config: Optional[ParserConfig] = None,
) -> ParseResult:
"""
Convenience function to parse a document.
Args:
path: Path to document
config: Optional parser configuration
Returns:
ParseResult with extracted chunks
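
    Example (illustrative):
        result = parse_document("paper.pdf", config=ParserConfig(max_pages=3))
        print(f"Extracted {len(result.chunks)} chunks")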
"""
parser = DocumentParser(config=config)
return parser.parse(path)