"""
Document Processor Pipeline

Main pipeline that orchestrates document processing:

1. Load document
2. OCR (PaddleOCR or Tesseract)
3. Layout detection
4. Reading order reconstruction
5. Semantic chunking
6. Grounding evidence

Outputs ProcessedDocument with all extracted information.
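
Example (illustrative sketch; the file path and config values are placeholders):

    config = PipelineConfig(render_dpi=200, max_pages=10)
    processor = DocumentProcessor(config)
    result = processor.process("sample.pdf")
    print(result.metadata.num_pages, len(result.chunks))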
"""

import time
from pathlib import Path
from typing import List, Optional, Dict, Any, Union
from datetime import datetime

from pydantic import BaseModel, Field
from loguru import logger
import numpy as np

from ..schemas.core import (
    ProcessedDocument,
    DocumentMetadata,
    DocumentChunk,
    OCRRegion,
    LayoutRegion,
)
from ..io.loader import load_document, LoadedDocument
from ..io.cache import get_document_cache
from ..ocr import get_ocr_engine, OCRConfig, OCRResult
from ..layout import get_layout_detector, LayoutConfig, LayoutResult
from ..reading_order import get_reading_order_reconstructor, ReadingOrderConfig
from ..chunking import get_document_chunker, ChunkerConfig


class PipelineConfig(BaseModel):
    """Configuration for the document processing pipeline."""

    # Per-stage component configuration
    ocr: OCRConfig = Field(default_factory=OCRConfig)
    layout: LayoutConfig = Field(default_factory=LayoutConfig)
    reading_order: ReadingOrderConfig = Field(default_factory=ReadingOrderConfig)
    chunking: ChunkerConfig = Field(default_factory=ChunkerConfig)

    # Rendering and page-handling options
    render_dpi: int = Field(default=300, ge=72, description="DPI for PDF rendering")
    enable_caching: bool = Field(default=True, description="Cache rendered pages")
    parallel_pages: bool = Field(default=False, description="Process pages in parallel")
    max_pages: Optional[int] = Field(default=None, description="Max pages to process")

    # Output options
    include_ocr_regions: bool = Field(default=True)
    include_layout_regions: bool = Field(default=True)
    generate_full_text: bool = Field(default=True)


class DocumentProcessor:
    """
    Main document processing pipeline.

    Provides end-to-end document processing with:
    - Multi-format support (PDF, images)
    - Pluggable OCR engines
    - Layout detection
    - Reading order reconstruction
    - Semantic chunking
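
    Example (sketch; the OCR engine name and input path are illustrative):

        config = PipelineConfig(ocr=OCRConfig(engine="tesseract"))
        processor = DocumentProcessor(config)
        processor.initialize()
        result = processor.process("scan.png")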
    """

    def __init__(self, config: Optional[PipelineConfig] = None):
        """
        Initialize document processor.

        Args:
            config: Pipeline configuration
        """
        self.config = config or PipelineConfig()
        self._initialized = False

        # Pipeline components (created lazily in initialize())
        self._ocr_engine = None
        self._layout_detector = None
        self._reading_order = None
        self._chunker = None

    def initialize(self):
        """Initialize all pipeline components."""
        if self._initialized:
            return

        logger.info("Initializing document processing pipeline...")

        # OCR engine
        self._ocr_engine = get_ocr_engine(
            engine_type=self.config.ocr.engine,
            config=self.config.ocr,
        )

        # Layout detector
        from ..layout.detector import create_layout_detector
        self._layout_detector = create_layout_detector(self.config.layout, initialize=True)

        # Reading order reconstructor
        self._reading_order = get_reading_order_reconstructor(self.config.reading_order)

        # Semantic chunker
        self._chunker = get_document_chunker(self.config.chunking)

        self._initialized = True
        logger.info("Document processing pipeline initialized")

    def process(
        self,
        source: Union[str, Path],
        document_id: Optional[str] = None,
    ) -> ProcessedDocument:
        """
        Process a document through the full pipeline.

        Args:
            source: Path to document
            document_id: Optional document ID

        Returns:
            ProcessedDocument with all extracted information
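
        Example (sketch; the path is a placeholder):

            result = processor.process("report.pdf")
            print(result.metadata.total_chunks, "chunks")
            print(result.full_text[:200])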
        """
        if not self._initialized:
            self.initialize()

        start_time = time.time()
        source_path = str(Path(source).absolute())

        logger.info(f"Processing document: {source_path}")

        try:
            # Load the document
            loaded_doc = load_document(source_path, document_id)
            document_id = loaded_doc.document_id

            # Optionally cap the number of pages to process
            num_pages = loaded_doc.num_pages
            if self.config.max_pages:
                num_pages = min(num_pages, self.config.max_pages)

            logger.info(f"Document loaded: {num_pages} pages")

            # Accumulators for per-page results
            all_ocr_regions: List[OCRRegion] = []
            all_layout_regions: List[LayoutRegion] = []
            page_dimensions = []

            for page_num in range(num_pages):
                logger.debug(f"Processing page {page_num + 1}/{num_pages}")

                # Render the page image (cached if enabled)
                page_image = self._get_page_image(loaded_doc, page_num)
                height, width = page_image.shape[:2]
                page_dimensions.append((width, height))

                # OCR
                ocr_result = self._ocr_engine.recognize(page_image, page_num)
                if ocr_result.success:
                    all_ocr_regions.extend(ocr_result.regions)

                # Layout detection (OCR regions passed as hints when available)
                layout_result = self._layout_detector.detect(
                    page_image,
                    page_num,
                    ocr_result.regions if ocr_result.success else None,
                )
                if layout_result.success:
                    all_layout_regions.extend(layout_result.regions)

            # Reading order reconstruction across all pages
            if all_ocr_regions:
                reading_result = self._reading_order.reconstruct(
                    all_ocr_regions,
                    all_layout_regions,
                    page_width=page_dimensions[0][0] if page_dimensions else None,
                    page_height=page_dimensions[0][1] if page_dimensions else None,
                )

                # Reorder OCR regions according to the reconstructed order
                if reading_result.success and reading_result.order:
                    all_ocr_regions = [all_ocr_regions[i] for i in reading_result.order]

            # Semantic chunking
            chunks = self._chunker.create_chunks(
                all_ocr_regions,
                all_layout_regions if self.config.include_layout_regions else None,
                document_id,
                source_path,
            )

            # Full-text generation
            full_text = ""
            if self.config.generate_full_text and all_ocr_regions:
                full_text = self._generate_full_text(all_ocr_regions)

            # Average confidences for metadata
            ocr_confidence_avg = None
            if all_ocr_regions:
                ocr_confidence_avg = sum(r.confidence for r in all_ocr_regions) / len(all_ocr_regions)

            layout_confidence_avg = None
            if all_layout_regions:
                layout_confidence_avg = sum(r.confidence for r in all_layout_regions) / len(all_layout_regions)

            # Assemble metadata
            metadata = DocumentMetadata(
                document_id=document_id,
                source_path=source_path,
                filename=loaded_doc.filename,
                file_type=loaded_doc.file_type,
                file_size_bytes=loaded_doc.file_size_bytes,
                num_pages=loaded_doc.num_pages,
                page_dimensions=page_dimensions,
                processed_at=datetime.utcnow(),
                total_chunks=len(chunks),
                total_characters=len(full_text),
                ocr_confidence_avg=ocr_confidence_avg,
                layout_confidence_avg=layout_confidence_avg,
            )

            # Assemble the final document
            result = ProcessedDocument(
                metadata=metadata,
                ocr_regions=all_ocr_regions if self.config.include_ocr_regions else [],
                layout_regions=all_layout_regions if self.config.include_layout_regions else [],
                chunks=chunks,
                full_text=full_text,
                status="completed",
            )

            processing_time = time.time() - start_time
            logger.info(
                f"Document processed in {processing_time:.2f}s: "
                f"{len(all_ocr_regions)} OCR regions, "
                f"{len(all_layout_regions)} layout regions, "
                f"{len(chunks)} chunks"
            )

            return result

        except Exception as e:
            logger.error(f"Document processing failed: {e}")
            raise

        finally:
            # Release document resources
            if 'loaded_doc' in locals():
                loaded_doc.close()

    def _get_page_image(
        self,
        doc: LoadedDocument,
        page_num: int,
    ) -> np.ndarray:
        """Get page image, using cache if enabled."""
        if self.config.enable_caching:
            cache = get_document_cache()
            cached = cache.get(doc.document_id, page_num, self.config.render_dpi)
            if cached is not None:
                return cached

        # Render the page at the configured DPI
        image = doc.get_page_image(page_num, self.config.render_dpi)

        # Store in cache for subsequent passes
        if self.config.enable_caching:
            cache = get_document_cache()
            cache.put(doc.document_id, page_num, self.config.render_dpi, image)

        return image

    def _generate_full_text(self, ocr_regions: List[OCRRegion]) -> str:
        """Generate full text from OCR regions in reading order."""
        # Group regions by page
        by_page: Dict[int, List[OCRRegion]] = {}
        for r in ocr_regions:
            if r.page not in by_page:
                by_page[r.page] = []
            by_page[r.page].append(r)

        # Join region text per page, then join pages with blank lines
        pages_text = []
        for page_num in sorted(by_page.keys()):
            page_regions = by_page[page_num]
            page_text = " ".join(r.text for r in page_regions)
            pages_text.append(page_text)

        return "\n\n".join(pages_text)

    def process_batch(
        self,
        sources: List[Union[str, Path]],
    ) -> List[ProcessedDocument]:
        """
        Process multiple documents.

        Args:
            sources: List of document paths

        Returns:
            List of ProcessedDocument (documents that fail to process are skipped)
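
        Example (sketch; paths are placeholders):

            results = processor.process_batch(["a.pdf", "b.pdf"])
            print(f"{len(results)} documents processed")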
        """
        results = []
        for source in sources:
            try:
                result = self.process(source)
                results.append(result)
            except Exception as e:
                logger.error(f"Failed to process {source}: {e}")

        return results


# Module-level singleton processor
_document_processor: Optional[DocumentProcessor] = None


def get_document_processor(
    config: Optional[PipelineConfig] = None,
) -> DocumentProcessor:
    """Get or create the singleton document processor.

    Note: ``config`` is only applied the first time the processor is created.
    """
    global _document_processor
    if _document_processor is None:
        _document_processor = DocumentProcessor(config)
        _document_processor.initialize()
    return _document_processor


def process_document(
    source: Union[str, Path],
    document_id: Optional[str] = None,
    config: Optional[PipelineConfig] = None,
) -> ProcessedDocument:
    """
    Convenience function to process a document.

    Args:
        source: Document path
        document_id: Optional document ID
        config: Optional pipeline configuration (applied only if the singleton
            processor has not been created yet)

    Returns:
        ProcessedDocument
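
    Example (sketch; the path is a placeholder):

        doc = process_document("invoice.pdf")
        print(doc.metadata.filename, doc.metadata.num_pages)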
    """
    processor = get_document_processor(config)
    return processor.process(source, document_id)