""" Document Processor Pipeline Main pipeline that orchestrates document processing: 1. Load document 2. OCR (PaddleOCR or Tesseract) 3. Layout detection 4. Reading order reconstruction 5. Semantic chunking 6. Grounding evidence Outputs ProcessedDocument with all extracted information. """ import time from pathlib import Path from typing import List, Optional, Dict, Any, Union from datetime import datetime from pydantic import BaseModel, Field from loguru import logger import numpy as np from ..schemas.core import ( ProcessedDocument, DocumentMetadata, DocumentChunk, OCRRegion, LayoutRegion, ) from ..io.loader import load_document, LoadedDocument from ..io.cache import get_document_cache from ..ocr import get_ocr_engine, OCRConfig, OCRResult from ..layout import get_layout_detector, LayoutConfig, LayoutResult from ..reading_order import get_reading_order_reconstructor, ReadingOrderConfig from ..chunking import get_document_chunker, ChunkerConfig class PipelineConfig(BaseModel): """Configuration for the document processing pipeline.""" # Component configs ocr: OCRConfig = Field(default_factory=OCRConfig) layout: LayoutConfig = Field(default_factory=LayoutConfig) reading_order: ReadingOrderConfig = Field(default_factory=ReadingOrderConfig) chunking: ChunkerConfig = Field(default_factory=ChunkerConfig) # Pipeline behavior render_dpi: int = Field(default=300, ge=72, description="DPI for PDF rendering") enable_caching: bool = Field(default=True, description="Cache rendered pages") parallel_pages: bool = Field(default=False, description="Process pages in parallel") max_pages: Optional[int] = Field(default=None, description="Max pages to process") # Output options include_ocr_regions: bool = Field(default=True) include_layout_regions: bool = Field(default=True) generate_full_text: bool = Field(default=True) class DocumentProcessor: """ Main document processing pipeline. Provides end-to-end document processing with: - Multi-format support (PDF, images) - Pluggable OCR engines - Layout detection - Reading order reconstruction - Semantic chunking """ def __init__(self, config: Optional[PipelineConfig] = None): """ Initialize document processor. Args: config: Pipeline configuration """ self.config = config or PipelineConfig() self._initialized = False # Component instances (lazy initialization) self._ocr_engine = None self._layout_detector = None self._reading_order = None self._chunker = None def initialize(self): """Initialize all pipeline components.""" if self._initialized: return logger.info("Initializing document processing pipeline...") # Initialize OCR self._ocr_engine = get_ocr_engine( engine_type=self.config.ocr.engine, config=self.config.ocr, ) # Initialize layout detector (create new instance to respect config) from ..layout.detector import create_layout_detector self._layout_detector = create_layout_detector(self.config.layout, initialize=True) # Initialize reading order self._reading_order = get_reading_order_reconstructor(self.config.reading_order) # Initialize chunker self._chunker = get_document_chunker(self.config.chunking) self._initialized = True logger.info("Document processing pipeline initialized") def process( self, source: Union[str, Path], document_id: Optional[str] = None, ) -> ProcessedDocument: """ Process a document through the full pipeline. 

        Args:
            source: Path to document
            document_id: Optional document ID

        Returns:
            ProcessedDocument with all extracted information
        """
        if not self._initialized:
            self.initialize()

        start_time = time.time()
        source_path = str(Path(source).absolute())

        logger.info(f"Processing document: {source_path}")

        try:
            # Step 1: Load document
            loaded_doc = load_document(source_path, document_id)
            document_id = loaded_doc.document_id

            # Determine pages to process
            num_pages = loaded_doc.num_pages
            if self.config.max_pages:
                num_pages = min(num_pages, self.config.max_pages)

            logger.info(f"Document loaded: {num_pages} pages")

            # Step 2: Process each page
            all_ocr_regions: List[OCRRegion] = []
            all_layout_regions: List[LayoutRegion] = []
            page_dimensions = []

            for page_num in range(num_pages):
                logger.debug(f"Processing page {page_num + 1}/{num_pages}")

                # Render page
                page_image = self._get_page_image(loaded_doc, page_num)
                height, width = page_image.shape[:2]
                page_dimensions.append((width, height))

                # OCR
                ocr_result = self._ocr_engine.recognize(page_image, page_num)
                if ocr_result.success:
                    all_ocr_regions.extend(ocr_result.regions)

                # Layout detection
                layout_result = self._layout_detector.detect(
                    page_image,
                    page_num,
                    ocr_result.regions if ocr_result.success else None,
                )
                if layout_result.success:
                    all_layout_regions.extend(layout_result.regions)

            # Step 3: Reading order reconstruction
            if all_ocr_regions:
                reading_result = self._reading_order.reconstruct(
                    all_ocr_regions,
                    all_layout_regions,
                    page_width=page_dimensions[0][0] if page_dimensions else None,
                    page_height=page_dimensions[0][1] if page_dimensions else None,
                )

                # Reorder OCR regions
                if reading_result.success and reading_result.order:
                    all_ocr_regions = [all_ocr_regions[i] for i in reading_result.order]

            # Step 4: Chunking
            chunks = self._chunker.create_chunks(
                all_ocr_regions,
                all_layout_regions if self.config.include_layout_regions else None,
                document_id,
                source_path,
            )

            # Step 5: Generate full text
            full_text = ""
            if self.config.generate_full_text and all_ocr_regions:
                full_text = self._generate_full_text(all_ocr_regions)

            # Calculate quality metrics
            ocr_confidence_avg = None
            if all_ocr_regions:
                ocr_confidence_avg = sum(r.confidence for r in all_ocr_regions) / len(all_ocr_regions)

            layout_confidence_avg = None
            if all_layout_regions:
                layout_confidence_avg = sum(r.confidence for r in all_layout_regions) / len(all_layout_regions)

            # Build metadata
            metadata = DocumentMetadata(
                document_id=document_id,
                source_path=source_path,
                filename=loaded_doc.filename,
                file_type=loaded_doc.file_type,
                file_size_bytes=loaded_doc.file_size_bytes,
                num_pages=loaded_doc.num_pages,
                page_dimensions=page_dimensions,
                processed_at=datetime.utcnow(),
                total_chunks=len(chunks),
                total_characters=len(full_text),
                ocr_confidence_avg=ocr_confidence_avg,
                layout_confidence_avg=layout_confidence_avg,
            )

            # Build result
            result = ProcessedDocument(
                metadata=metadata,
                ocr_regions=all_ocr_regions if self.config.include_ocr_regions else [],
                layout_regions=all_layout_regions if self.config.include_layout_regions else [],
                chunks=chunks,
                full_text=full_text,
                status="completed",
            )

            processing_time = time.time() - start_time
            logger.info(
                f"Document processed in {processing_time:.2f}s: "
                f"{len(all_ocr_regions)} OCR regions, "
                f"{len(all_layout_regions)} layout regions, "
                f"{len(chunks)} chunks"
            )

            return result

        except Exception as e:
            logger.error(f"Document processing failed: {e}")
            raise
        finally:
            # Clean up
            if 'loaded_doc' in locals():
                loaded_doc.close()

    def _get_page_image(
        self,
        doc: LoadedDocument,
        page_num: int,
    ) -> np.ndarray:
        """Get page image, using cache if enabled."""
        if self.config.enable_caching:
            cache = get_document_cache()
            cached = cache.get(doc.document_id, page_num, self.config.render_dpi)
            if cached is not None:
                return cached

        # Render page
        image = doc.get_page_image(page_num, self.config.render_dpi)

        # Cache if enabled
        if self.config.enable_caching:
            cache = get_document_cache()
            cache.put(doc.document_id, page_num, self.config.render_dpi, image)

        return image

    def _generate_full_text(self, ocr_regions: List[OCRRegion]) -> str:
        """Generate full text from OCR regions in reading order."""
        # Group by page
        by_page: Dict[int, List[OCRRegion]] = {}
        for r in ocr_regions:
            if r.page not in by_page:
                by_page[r.page] = []
            by_page[r.page].append(r)

        # Build text page by page
        pages_text = []
        for page_num in sorted(by_page.keys()):
            page_regions = by_page[page_num]
            page_text = " ".join(r.text for r in page_regions)
            pages_text.append(page_text)

        return "\n\n".join(pages_text)

    def process_batch(
        self,
        sources: List[Union[str, Path]],
    ) -> List[ProcessedDocument]:
        """
        Process multiple documents.

        Args:
            sources: List of document paths

        Returns:
            List of ProcessedDocument
        """
        results = []
        for source in sources:
            try:
                result = self.process(source)
                results.append(result)
            except Exception as e:
                logger.error(f"Failed to process {source}: {e}")
                # Could append an error result here

        return results


# Global instance and factory functions
_document_processor: Optional[DocumentProcessor] = None


def get_document_processor(
    config: Optional[PipelineConfig] = None,
) -> DocumentProcessor:
    """Get or create singleton document processor."""
    global _document_processor
    if _document_processor is None:
        _document_processor = DocumentProcessor(config)
        _document_processor.initialize()
    return _document_processor


def process_document(
    source: Union[str, Path],
    document_id: Optional[str] = None,
    config: Optional[PipelineConfig] = None,
) -> ProcessedDocument:
    """
    Convenience function to process a document.

    Args:
        source: Document path
        document_id: Optional document ID
        config: Optional pipeline configuration

    Returns:
        ProcessedDocument
    """
    processor = get_document_processor(config)
    return processor.process(source, document_id)
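

if __name__ == "__main__":
    # Minimal usage sketch, not part of the pipeline itself: run the full
    # pipeline on one document via the process_document() convenience function
    # and print a short summary from the resulting metadata. The default
    # "sample.pdf" path below is a placeholder; pass a real PDF or image path
    # as the first CLI argument to try it.
    import sys

    sample_path = sys.argv[1] if len(sys.argv) > 1 else "sample.pdf"  # placeholder path
    doc = process_document(
        sample_path,
        config=PipelineConfig(max_pages=2),  # limit pages for a quick smoke test
    )
    print(
        f"Processed {doc.metadata.filename}: "
        f"{doc.metadata.num_pages} pages, "
        f"{doc.metadata.total_chunks} chunks, "
        f"{doc.metadata.total_characters} characters"
    )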