""" Document Parser Main orchestrator for document parsing pipeline. Coordinates OCR, layout detection, and chunk generation. """ import logging import time from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, Iterator, List, Optional, Tuple, Union import numpy as np from ..chunks.models import ( BoundingBox, ChunkType, DocumentChunk, PageResult, ParseResult, TableChunk, ChartChunk, ) from ..io import ( DocumentFormat, DocumentInfo, RenderOptions, load_document, get_document_cache, ) from ..models import ( OCRModel, OCRResult, LayoutModel, LayoutResult, LayoutRegion, LayoutRegionType, TableModel, TableStructure, ChartModel, ChartStructure, ) logger = logging.getLogger(__name__) @dataclass class ParserConfig: """Configuration for document parser.""" # Rendering render_dpi: int = 200 max_pages: Optional[int] = None # OCR ocr_enabled: bool = True ocr_languages: List[str] = field(default_factory=lambda: ["en"]) ocr_min_confidence: float = 0.5 # Layout layout_enabled: bool = True reading_order_enabled: bool = True # Specialized extraction table_extraction_enabled: bool = True chart_extraction_enabled: bool = True # Chunking merge_adjacent_text: bool = True min_chunk_chars: int = 10 max_chunk_chars: int = 4000 # Caching cache_enabled: bool = True # Output include_markdown: bool = True include_raw_ocr: bool = False class DocumentParser: """ Main document parsing orchestrator. Coordinates the full pipeline: 1. Load document and render pages 2. Run OCR on each page 3. Detect layout regions 4. Extract tables and charts 5. Generate semantic chunks 6. Build reading order 7. Produce final ParseResult """ def __init__( self, config: Optional[ParserConfig] = None, ocr_model: Optional[OCRModel] = None, layout_model: Optional[LayoutModel] = None, table_model: Optional[TableModel] = None, chart_model: Optional[ChartModel] = None, ): self.config = config or ParserConfig() self.ocr_model = ocr_model self.layout_model = layout_model self.table_model = table_model self.chart_model = chart_model self._cache = get_document_cache() if self.config.cache_enabled else None def parse( self, path: Union[str, Path], page_range: Optional[Tuple[int, int]] = None, ) -> ParseResult: """ Parse a document and return structured results. 
class DocumentParser:
    """
    Main document parsing orchestrator.

    Coordinates the full pipeline:
    1. Load document and render pages
    2. Run OCR on each page
    3. Detect layout regions
    4. Extract tables and charts
    5. Generate semantic chunks
    6. Build reading order
    7. Produce final ParseResult
    """

    def __init__(
        self,
        config: Optional[ParserConfig] = None,
        ocr_model: Optional[OCRModel] = None,
        layout_model: Optional[LayoutModel] = None,
        table_model: Optional[TableModel] = None,
        chart_model: Optional[ChartModel] = None,
    ):
        self.config = config or ParserConfig()
        self.ocr_model = ocr_model
        self.layout_model = layout_model
        self.table_model = table_model
        self.chart_model = chart_model
        self._cache = get_document_cache() if self.config.cache_enabled else None

    def parse(
        self,
        path: Union[str, Path],
        page_range: Optional[Tuple[int, int]] = None,
    ) -> ParseResult:
        """
        Parse a document and return structured results.

        Args:
            path: Path to document file
            page_range: Optional (start, end) page range (1-indexed, inclusive)

        Returns:
            ParseResult with chunks and metadata
        """
        path = Path(path)
        start_time = time.time()
        logger.info(f"Parsing document: {path}")

        # Load document
        loader, renderer = load_document(path)
        doc_info = loader.info
        doc_id = doc_info.doc_id

        # Determine pages to process
        start_page = page_range[0] if page_range else 1
        end_page = page_range[1] if page_range else doc_info.num_pages
        if self.config.max_pages:
            end_page = min(end_page, start_page + self.config.max_pages - 1)
        page_numbers = list(range(start_page, end_page + 1))

        logger.info(f"Processing pages {start_page}-{end_page} of {doc_info.num_pages}")

        # Process each page
        page_results: List[PageResult] = []
        all_chunks: List[DocumentChunk] = []
        markdown_by_page: Dict[int, str] = {}
        sequence_index = 0

        render_options = RenderOptions(dpi=self.config.render_dpi)

        try:
            for page_num, page_image in renderer.render_pages(page_numbers, render_options):
                logger.debug(f"Processing page {page_num}")

                # Process single page
                page_result, page_chunks = self._process_page(
                    page_image=page_image,
                    page_number=page_num,
                    doc_id=doc_id,
                    sequence_start=sequence_index,
                )

                page_results.append(page_result)
                all_chunks.extend(page_chunks)
                sequence_index += len(page_chunks)

                # Generate page markdown
                if self.config.include_markdown:
                    markdown_by_page[page_num] = self._generate_page_markdown(page_chunks)
        finally:
            # Release the document even if a page fails mid-pipeline
            loader.close()

        # Build full markdown
        markdown_full = "\n\n---\n\n".join(
            f"## Page {p}\n\n{md}" for p, md in sorted(markdown_by_page.items())
        )

        processing_time = time.time() - start_time
        logger.info(f"Parsed {len(all_chunks)} chunks in {processing_time:.2f}s")

        return ParseResult(
            doc_id=doc_id,
            source_path=str(path.absolute()),
            filename=path.name,
            num_pages=doc_info.num_pages,
            pages=page_results,
            chunks=all_chunks,
            markdown_full=markdown_full,
            markdown_by_page=markdown_by_page,
            processing_time_ms=processing_time * 1000,
            metadata={
                "format": doc_info.format.value,
                "has_text_layer": doc_info.has_text_layer,
                "is_scanned": doc_info.is_scanned,
                "render_dpi": self.config.render_dpi,
            },
        )
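
    # Example call (the path and model objects below are hypothetical;
    # page_range is 1-indexed and inclusive):
    #
    #     parser = DocumentParser(ocr_model=my_ocr, layout_model=my_layout)
    #     result = parser.parse("report.pdf", page_range=(1, 10))
    #     print(result.markdown_full)
    #
    # With no models injected, parse() still runs end to end but produces no
    # OCR or layout chunks, since both pipeline stages are skipped.
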
    def _process_page(
        self,
        page_image: np.ndarray,
        page_number: int,
        doc_id: str,
        sequence_start: int,
    ) -> Tuple[PageResult, List[DocumentChunk]]:
        """Process a single page."""
        height, width = page_image.shape[:2]
        chunks: List[DocumentChunk] = []
        sequence_index = sequence_start

        # Run OCR
        ocr_result: Optional[OCRResult] = None
        if self.config.ocr_enabled and self.ocr_model:
            ocr_result = self.ocr_model.recognize(page_image)

        # Run layout detection
        layout_result: Optional[LayoutResult] = None
        if self.config.layout_enabled and self.layout_model:
            layout_result = self.layout_model.detect(page_image)

        # Process layout regions or fall back to OCR blocks
        if layout_result and layout_result.regions:
            for region in layout_result.get_ordered_regions():
                region_chunks = self._process_region(
                    page_image=page_image,
                    region=region,
                    ocr_result=ocr_result,
                    page_number=page_number,
                    doc_id=doc_id,
                    sequence_index=sequence_index,
                    image_size=(width, height),
                )
                chunks.extend(region_chunks)
                sequence_index += len(region_chunks)
        elif ocr_result and ocr_result.blocks:
            # Fall back to OCR blocks
            for block in ocr_result.blocks:
                chunk = self._create_text_chunk(
                    text=block.text,
                    bbox=block.bbox,
                    confidence=block.confidence,
                    page_number=page_number,
                    doc_id=doc_id,
                    sequence_index=sequence_index,
                    chunk_type=ChunkType.PARAGRAPH,
                )
                chunks.append(chunk)
                sequence_index += 1

        # Merge adjacent text chunks if enabled
        if self.config.merge_adjacent_text:
            chunks = self._merge_adjacent_chunks(chunks)
            # Reassign sequence indices so they stay contiguous after merging;
            # the caller advances its counter by len(chunks).
            for offset, chunk in enumerate(chunks):
                chunk.sequence_index = sequence_start + offset

        # Build page result
        page_result = PageResult(
            page_number=page_number,
            width=width,
            height=height,
            chunks=[c.chunk_id for c in chunks],
            ocr_confidence=ocr_result.confidence if ocr_result else None,
        )

        return page_result, chunks

    def _process_region(
        self,
        page_image: np.ndarray,
        region: LayoutRegion,
        ocr_result: Optional[OCRResult],
        page_number: int,
        doc_id: str,
        sequence_index: int,
        image_size: Tuple[int, int],
    ) -> List[DocumentChunk]:
        """Process a single layout region."""
        chunks: List[DocumentChunk] = []
        width, height = image_size

        # Normalize bbox if needed
        bbox = region.bbox
        if not bbox.normalized:
            bbox = bbox.to_normalized(width, height)

        # Handle different region types
        if region.region_type == LayoutRegionType.TABLE:
            table_chunk = self._extract_table(
                page_image=page_image,
                region=region,
                page_number=page_number,
                doc_id=doc_id,
                sequence_index=sequence_index,
            )
            if table_chunk:
                chunks.append(table_chunk)
        elif region.region_type in {LayoutRegionType.CHART, LayoutRegionType.FIGURE}:
            # Try chart extraction first
            chart_chunk = self._extract_chart(
                page_image=page_image,
                region=region,
                page_number=page_number,
                doc_id=doc_id,
                sequence_index=sequence_index,
            )
            if chart_chunk:
                chunks.append(chart_chunk)
            else:
                # Fall back to a figure chunk
                text = self._get_region_text(region, ocr_result) or "[Figure]"
                chunk = self._create_text_chunk(
                    text=text,
                    bbox=bbox,
                    confidence=region.confidence,
                    page_number=page_number,
                    doc_id=doc_id,
                    sequence_index=sequence_index,
                    chunk_type=ChunkType.FIGURE,
                )
                chunks.append(chunk)
        else:
            # Text-based region
            text = self._get_region_text(region, ocr_result)
            if text and len(text.strip()) >= self.config.min_chunk_chars:
                chunk_type = region.region_type.to_chunk_type()
                chunk = self._create_text_chunk(
                    text=text,
                    bbox=bbox,
                    confidence=region.confidence,
                    page_number=page_number,
                    doc_id=doc_id,
                    sequence_index=sequence_index,
                    chunk_type=chunk_type,
                )
                chunks.append(chunk)

        return chunks

    def _get_region_text(
        self,
        region: LayoutRegion,
        ocr_result: Optional[OCRResult],
    ) -> str:
        """Get text for a region from the OCR result."""
        if not ocr_result:
            return ""
        return ocr_result.get_text_in_region(region.bbox, threshold=0.3)

    def _extract_table(
        self,
        page_image: np.ndarray,
        region: LayoutRegion,
        page_number: int,
        doc_id: str,
        sequence_index: int,
    ) -> Optional[TableChunk]:
        """Extract table structure from a region."""
        if not self.config.table_extraction_enabled or not self.table_model:
            return None
        try:
            table_structure = self.table_model.extract_structure(
                page_image, region.bbox
            )
            if table_structure.num_rows > 0:
                return table_structure.to_table_chunk(
                    doc_id=doc_id,
                    page=page_number,
                    sequence_index=sequence_index,
                )
        except Exception as e:
            logger.warning(f"Table extraction failed: {e}")
        return None

    def _extract_chart(
        self,
        page_image: np.ndarray,
        region: LayoutRegion,
        page_number: int,
        doc_id: str,
        sequence_index: int,
    ) -> Optional[ChartChunk]:
        """Extract chart data from a region."""
        if not self.config.chart_extraction_enabled or not self.chart_model:
            return None
        try:
            chart_structure = self.chart_model.extract_chart(
                page_image, region.bbox
            )
            if chart_structure.chart_type.value != "unknown":
                return chart_structure.to_chart_chunk(
                    doc_id=doc_id,
                    page=page_number,
                    sequence_index=sequence_index,
                )
        except Exception as e:
            logger.warning(f"Chart extraction failed: {e}")
        return None
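
    # Coordinate note (an assumption about BoundingBox.to_normalized): pixel
    # coordinates are presumably divided by page size, so x=150 px on a
    # 1500-px-wide page becomes x=0.1. The 0.05 vertical-gap threshold used by
    # _chunks_adjacent below is expressed in these normalized units.
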
    def _create_text_chunk(
        self,
        text: str,
        bbox: BoundingBox,
        confidence: float,
        page_number: int,
        doc_id: str,
        sequence_index: int,
        chunk_type: ChunkType,
    ) -> DocumentChunk:
        """Create a text chunk."""
        chunk_id = DocumentChunk.generate_chunk_id(
            doc_id=doc_id,
            page=page_number,
            bbox=bbox,
            chunk_type_str=chunk_type.value,
        )
        return DocumentChunk(
            chunk_id=chunk_id,
            doc_id=doc_id,
            chunk_type=chunk_type,
            text=text,
            page=page_number,
            bbox=bbox,
            confidence=confidence,
            sequence_index=sequence_index,
        )

    def _merge_adjacent_chunks(
        self,
        chunks: List[DocumentChunk],
    ) -> List[DocumentChunk]:
        """Merge adjacent text chunks of the same type."""
        if len(chunks) <= 1:
            return chunks

        merged: List[DocumentChunk] = []
        current: Optional[DocumentChunk] = None

        mergeable_types = {
            ChunkType.TEXT,
            ChunkType.PARAGRAPH,
        }

        for chunk in chunks:
            if current is None:
                current = chunk
                continue

            # Check whether the two chunks can merge
            can_merge = (
                current.chunk_type in mergeable_types
                and chunk.chunk_type in mergeable_types
                and current.chunk_type == chunk.chunk_type
                and current.page == chunk.page
                and self._chunks_adjacent(current, chunk)
            )

            if can_merge:
                merged_text = current.text + "\n" + chunk.text
                if len(merged_text) <= self.config.max_chunk_chars:
                    current = DocumentChunk(
                        chunk_id=current.chunk_id,  # Keep first ID
                        doc_id=current.doc_id,
                        chunk_type=current.chunk_type,
                        text=merged_text,
                        page=current.page,
                        bbox=self._merge_bboxes(current.bbox, chunk.bbox),
                        confidence=min(current.confidence, chunk.confidence),
                        sequence_index=current.sequence_index,
                    )
                else:
                    merged.append(current)
                    current = chunk
            else:
                merged.append(current)
                current = chunk

        if current:
            merged.append(current)

        return merged

    def _chunks_adjacent(
        self,
        chunk1: DocumentChunk,
        chunk2: DocumentChunk,
        gap_threshold: float = 0.05,
    ) -> bool:
        """Check if two chunks are vertically adjacent."""
        gap = chunk2.bbox.y_min - chunk1.bbox.y_max
        return 0 <= gap <= gap_threshold

    def _merge_bboxes(
        self,
        bbox1: BoundingBox,
        bbox2: BoundingBox,
    ) -> BoundingBox:
        """Merge two bounding boxes."""
        return BoundingBox(
            x_min=min(bbox1.x_min, bbox2.x_min),
            y_min=min(bbox1.y_min, bbox2.y_min),
            x_max=max(bbox1.x_max, bbox2.x_max),
            y_max=max(bbox1.y_max, bbox2.y_max),
            normalized=bbox1.normalized,
        )

    def _generate_page_markdown(
        self,
        chunks: List[DocumentChunk],
    ) -> str:
        """Generate markdown for page chunks."""
        lines: List[str] = []

        for chunk in chunks:
            # Add anchor comment so chunks can be traced back from the markdown
            lines.append(f"<!-- chunk: {chunk.chunk_id} -->")

            # Format based on chunk type
            if chunk.chunk_type == ChunkType.TITLE:
                lines.append(f"# {chunk.text}")
            elif chunk.chunk_type == ChunkType.HEADING:
                lines.append(f"## {chunk.text}")
            elif chunk.chunk_type == ChunkType.TABLE:
                if isinstance(chunk, TableChunk):
                    lines.append(chunk.to_markdown())
                else:
                    lines.append(chunk.text)
            elif chunk.chunk_type == ChunkType.LIST:
                # Format as list items
                for item in chunk.text.split("\n"):
                    if item.strip():
                        lines.append(f"- {item.strip()}")
            elif chunk.chunk_type == ChunkType.CODE:
                lines.append(f"```\n{chunk.text}\n```")
            elif chunk.chunk_type == ChunkType.FIGURE:
                lines.append(f"[Figure: {chunk.text}]")
            elif chunk.chunk_type == ChunkType.CHART:
                if isinstance(chunk, ChartChunk):
                    lines.append(f"[Chart: {chunk.title or chunk.chart_type}]")
                    lines.append(chunk.text)
                else:
                    lines.append(f"[Chart: {chunk.text}]")
            else:
                lines.append(chunk.text)

            lines.append("")  # Blank line between chunks

        return "\n".join(lines)
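
# The per-page markdown generated above is joined into markdown_full with
# horizontal rules, so a two-page parse renders roughly as:
#
#     ## Page 1
#
#     <!-- chunk: ... -->
#     # Title text
#     ...
#
#     ---
#
#     ## Page 2
#     ...
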
def parse_document(
    path: Union[str, Path],
    config: Optional[ParserConfig] = None,
) -> ParseResult:
    """
    Convenience function to parse a document.

    Args:
        path: Path to document
        config: Optional parser configuration

    Returns:
        ParseResult with extracted chunks
    """
    parser = DocumentParser(config=config)
    return parser.parse(path)
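

if __name__ == "__main__":
    # Minimal smoke-test sketch: run via `python -m <package>.parser` so the
    # relative imports above resolve; "sample.pdf" is a hypothetical file.
    import sys

    target = sys.argv[1] if len(sys.argv) > 1 else "sample.pdf"
    logging.basicConfig(level=logging.INFO)
    result = parse_document(target, config=ParserConfig(max_pages=2))
    print(
        f"{result.filename}: {len(result.chunks)} chunks from "
        f"{result.num_pages} pages in {result.processing_time_ms:.0f} ms"
    )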