"""
Document Processor Pipeline
Main pipeline that orchestrates document processing:
1. Load document
2. OCR (PaddleOCR or Tesseract)
3. Layout detection
4. Reading order reconstruction
5. Semantic chunking
6. Grounding evidence
Outputs ProcessedDocument with all extracted information.
"""
import time
from pathlib import Path
from typing import List, Optional, Dict, Any, Union
from datetime import datetime
from pydantic import BaseModel, Field
from loguru import logger
import numpy as np
from ..schemas.core import (
    ProcessedDocument,
    DocumentMetadata,
    DocumentChunk,
    OCRRegion,
    LayoutRegion,
)
from ..io.loader import load_document, LoadedDocument
from ..io.cache import get_document_cache
from ..ocr import get_ocr_engine, OCRConfig, OCRResult
from ..layout import get_layout_detector, LayoutConfig, LayoutResult
from ..reading_order import get_reading_order_reconstructor, ReadingOrderConfig
from ..chunking import get_document_chunker, ChunkerConfig

class PipelineConfig(BaseModel):
    """Configuration for the document processing pipeline."""

    # Component configs
    ocr: OCRConfig = Field(default_factory=OCRConfig)
    layout: LayoutConfig = Field(default_factory=LayoutConfig)
    reading_order: ReadingOrderConfig = Field(default_factory=ReadingOrderConfig)
    chunking: ChunkerConfig = Field(default_factory=ChunkerConfig)

    # Pipeline behavior
    render_dpi: int = Field(default=300, ge=72, description="DPI for PDF rendering")
    enable_caching: bool = Field(default=True, description="Cache rendered pages")
    parallel_pages: bool = Field(default=False, description="Process pages in parallel")
    max_pages: Optional[int] = Field(default=None, description="Max pages to process")

    # Output options
    include_ocr_regions: bool = Field(default=True)
    include_layout_regions: bool = Field(default=True)
    generate_full_text: bool = Field(default=True)
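
# Illustrative configuration override (a sketch; the values below are arbitrary
# examples, not recommended defaults, and the nested component configs keep
# their own defaults unless overridden explicitly):
#
#     fast_config = PipelineConfig(
#         render_dpi=150,            # lower-resolution rendering for speed
#         max_pages=10,              # only process the first 10 pages
#         include_ocr_regions=False,
#     )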

class DocumentProcessor:
    """
    Main document processing pipeline.

    Provides end-to-end document processing with:
    - Multi-format support (PDF, images)
    - Pluggable OCR engines
    - Layout detection
    - Reading order reconstruction
    - Semantic chunking
    """

    def __init__(self, config: Optional[PipelineConfig] = None):
        """
        Initialize document processor.

        Args:
            config: Pipeline configuration
        """
        self.config = config or PipelineConfig()
        self._initialized = False

        # Component instances (lazy initialization)
        self._ocr_engine = None
        self._layout_detector = None
        self._reading_order = None
        self._chunker = None

    def initialize(self):
        """Initialize all pipeline components."""
        if self._initialized:
            return

        logger.info("Initializing document processing pipeline...")

        # Initialize OCR
        self._ocr_engine = get_ocr_engine(
            engine_type=self.config.ocr.engine,
            config=self.config.ocr,
        )

        # Initialize layout detector (create new instance to respect config)
        from ..layout.detector import create_layout_detector
        self._layout_detector = create_layout_detector(self.config.layout, initialize=True)

        # Initialize reading order
        self._reading_order = get_reading_order_reconstructor(self.config.reading_order)

        # Initialize chunker
        self._chunker = get_document_chunker(self.config.chunking)

        self._initialized = True
        logger.info("Document processing pipeline initialized")

    def process(
        self,
        source: Union[str, Path],
        document_id: Optional[str] = None,
    ) -> ProcessedDocument:
        """
        Process a document through the full pipeline.

        Args:
            source: Path to document
            document_id: Optional document ID

        Returns:
            ProcessedDocument with all extracted information
        """
        if not self._initialized:
            self.initialize()

        start_time = time.time()
        source_path = str(Path(source).absolute())
        logger.info(f"Processing document: {source_path}")

        try:
            # Step 1: Load document
            loaded_doc = load_document(source_path, document_id)
            document_id = loaded_doc.document_id

            # Determine pages to process
            num_pages = loaded_doc.num_pages
            if self.config.max_pages:
                num_pages = min(num_pages, self.config.max_pages)
            logger.info(f"Document loaded: {num_pages} pages")

            # Step 2: Process each page
            all_ocr_regions: List[OCRRegion] = []
            all_layout_regions: List[LayoutRegion] = []
            page_dimensions = []

            for page_num in range(num_pages):
                logger.debug(f"Processing page {page_num + 1}/{num_pages}")

                # Render page
                page_image = self._get_page_image(loaded_doc, page_num)
                height, width = page_image.shape[:2]
                page_dimensions.append((width, height))

                # OCR
                ocr_result = self._ocr_engine.recognize(page_image, page_num)
                if ocr_result.success:
                    all_ocr_regions.extend(ocr_result.regions)

                # Layout detection
                layout_result = self._layout_detector.detect(
                    page_image,
                    page_num,
                    ocr_result.regions if ocr_result.success else None,
                )
                if layout_result.success:
                    all_layout_regions.extend(layout_result.regions)

            # Step 3: Reading order reconstruction
            if all_ocr_regions:
                reading_result = self._reading_order.reconstruct(
                    all_ocr_regions,
                    all_layout_regions,
                    page_width=page_dimensions[0][0] if page_dimensions else None,
                    page_height=page_dimensions[0][1] if page_dimensions else None,
                )
                # Reorder OCR regions
                if reading_result.success and reading_result.order:
                    all_ocr_regions = [all_ocr_regions[i] for i in reading_result.order]

            # Step 4: Chunking
            chunks = self._chunker.create_chunks(
                all_ocr_regions,
                all_layout_regions if self.config.include_layout_regions else None,
                document_id,
                source_path,
            )

            # Step 5: Generate full text
            full_text = ""
            if self.config.generate_full_text and all_ocr_regions:
                full_text = self._generate_full_text(all_ocr_regions)

            # Calculate quality metrics
            ocr_confidence_avg = None
            if all_ocr_regions:
                ocr_confidence_avg = sum(r.confidence for r in all_ocr_regions) / len(all_ocr_regions)

            layout_confidence_avg = None
            if all_layout_regions:
                layout_confidence_avg = sum(r.confidence for r in all_layout_regions) / len(all_layout_regions)

            # Build metadata
            metadata = DocumentMetadata(
                document_id=document_id,
                source_path=source_path,
                filename=loaded_doc.filename,
                file_type=loaded_doc.file_type,
                file_size_bytes=loaded_doc.file_size_bytes,
                num_pages=loaded_doc.num_pages,
                page_dimensions=page_dimensions,
                processed_at=datetime.utcnow(),
                total_chunks=len(chunks),
                total_characters=len(full_text),
                ocr_confidence_avg=ocr_confidence_avg,
                layout_confidence_avg=layout_confidence_avg,
            )

            # Build result
            result = ProcessedDocument(
                metadata=metadata,
                ocr_regions=all_ocr_regions if self.config.include_ocr_regions else [],
                layout_regions=all_layout_regions if self.config.include_layout_regions else [],
                chunks=chunks,
                full_text=full_text,
                status="completed",
            )

            processing_time = time.time() - start_time
            logger.info(
                f"Document processed in {processing_time:.2f}s: "
                f"{len(all_ocr_regions)} OCR regions, "
                f"{len(all_layout_regions)} layout regions, "
                f"{len(chunks)} chunks"
            )
            return result

        except Exception as e:
            logger.error(f"Document processing failed: {e}")
            raise

        finally:
            # Clean up
            if 'loaded_doc' in locals():
                loaded_doc.close()
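
    # Result access sketch (the path is hypothetical; field names match the
    # ProcessedDocument and DocumentMetadata objects built in process() above):
    #
    #     doc = processor.process("reports/q3.pdf")
    #     print(doc.metadata.num_pages, doc.metadata.ocr_confidence_avg)
    #     for chunk in doc.chunks:
    #         print(chunk)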

    def _get_page_image(
        self,
        doc: LoadedDocument,
        page_num: int,
    ) -> np.ndarray:
        """Get page image, using cache if enabled."""
        if self.config.enable_caching:
            cache = get_document_cache()
            cached = cache.get(doc.document_id, page_num, self.config.render_dpi)
            if cached is not None:
                return cached

        # Render page
        image = doc.get_page_image(page_num, self.config.render_dpi)

        # Cache if enabled
        if self.config.enable_caching:
            cache = get_document_cache()
            cache.put(doc.document_id, page_num, self.config.render_dpi, image)

        return image

    def _generate_full_text(self, ocr_regions: List[OCRRegion]) -> str:
        """Generate full text from OCR regions in reading order."""
        # Group by page
        by_page: Dict[int, List[OCRRegion]] = {}
        for r in ocr_regions:
            if r.page not in by_page:
                by_page[r.page] = []
            by_page[r.page].append(r)

        # Build text page by page
        pages_text = []
        for page_num in sorted(by_page.keys()):
            page_regions = by_page[page_num]
            page_text = " ".join(r.text for r in page_regions)
            pages_text.append(page_text)

        return "\n\n".join(pages_text)

    def process_batch(
        self,
        sources: List[Union[str, Path]],
    ) -> List[ProcessedDocument]:
        """
        Process multiple documents.

        Args:
            sources: List of document paths

        Returns:
            List of ProcessedDocument
        """
        results = []
        for source in sources:
            try:
                result = self.process(source)
                results.append(result)
            except Exception as e:
                logger.error(f"Failed to process {source}: {e}")
                # Could append an error result here
        return results
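
    # Batch usage sketch (hypothetical paths; documents that raise are logged
    # and skipped, so the result list may be shorter than the input list):
    #
    #     results = processor.process_batch(["a.pdf", "b.png", "c.tiff"])
    #     print(f"{len(results)} of 3 documents processed")
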
# Global instance and factory functions
_document_processor: Optional[DocumentProcessor] = None

def get_document_processor(
    config: Optional[PipelineConfig] = None,
) -> DocumentProcessor:
    """
    Get or create the singleton document processor.

    Note: the config is applied only when the singleton is first created;
    subsequent calls return the existing instance and ignore the argument.
    """
    global _document_processor
    if _document_processor is None:
        _document_processor = DocumentProcessor(config)
        _document_processor.initialize()
    return _document_processor

def process_document(
    source: Union[str, Path],
    document_id: Optional[str] = None,
    config: Optional[PipelineConfig] = None,
) -> ProcessedDocument:
    """
    Convenience function to process a document.

    Args:
        source: Document path
        document_id: Optional document ID
        config: Optional pipeline configuration

    Returns:
        ProcessedDocument
    """
    processor = get_document_processor(config)
    return processor.process(source, document_id)
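

if __name__ == "__main__":
    # Minimal command-line sketch (an illustration, not part of the original
    # module; assumes the package is installed so the relative imports above
    # resolve, and that the configured OCR/layout models are available).
    import sys

    if len(sys.argv) < 2:
        print("usage: <module> <document_path>")
        raise SystemExit(1)

    doc = process_document(sys.argv[1])
    print(
        f"{doc.metadata.filename}: {doc.metadata.num_pages} pages, "
        f"{len(doc.chunks)} chunks, {len(doc.full_text)} characters"
    )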