"""
Document Processor Pipeline
Main pipeline that orchestrates document processing:
1. Load document
2. OCR (PaddleOCR or Tesseract)
3. Layout detection
4. Reading order reconstruction
5. Semantic chunking
6. Grounding evidence
Outputs ProcessedDocument with all extracted information.
"""
import time
from pathlib import Path
from typing import List, Optional, Dict, Any, Union
from datetime import datetime
from pydantic import BaseModel, Field
from loguru import logger
import numpy as np
from ..schemas.core import (
ProcessedDocument,
DocumentMetadata,
DocumentChunk,
OCRRegion,
LayoutRegion,
)
from ..io.loader import load_document, LoadedDocument
from ..io.cache import get_document_cache
from ..ocr import get_ocr_engine, OCRConfig, OCRResult
from ..layout import get_layout_detector, LayoutConfig, LayoutResult
from ..reading_order import get_reading_order_reconstructor, ReadingOrderConfig
from ..chunking import get_document_chunker, ChunkerConfig
class PipelineConfig(BaseModel):
"""Configuration for the document processing pipeline."""
# Component configs
ocr: OCRConfig = Field(default_factory=OCRConfig)
layout: LayoutConfig = Field(default_factory=LayoutConfig)
reading_order: ReadingOrderConfig = Field(default_factory=ReadingOrderConfig)
chunking: ChunkerConfig = Field(default_factory=ChunkerConfig)
# Pipeline behavior
render_dpi: int = Field(default=300, ge=72, description="DPI for PDF rendering")
enable_caching: bool = Field(default=True, description="Cache rendered pages")
parallel_pages: bool = Field(default=False, description="Process pages in parallel")
max_pages: Optional[int] = Field(default=None, description="Max pages to process")
# Output options
include_ocr_regions: bool = Field(default=True)
include_layout_regions: bool = Field(default=True)
generate_full_text: bool = Field(default=True)
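    # Example (a hypothetical configuration sketch; field values are illustrative):
    #
    #   config = PipelineConfig(
    #       render_dpi=200,        # trade render quality for speed
    #       max_pages=10,          # cap pages for quick previews
    #       enable_caching=False,  # skip the page-image cache
    #   )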
class DocumentProcessor:
"""
Main document processing pipeline.
Provides end-to-end document processing with:
- Multi-format support (PDF, images)
- Pluggable OCR engines
- Layout detection
- Reading order reconstruction
- Semantic chunking
"""
def __init__(self, config: Optional[PipelineConfig] = None):
"""
Initialize document processor.
Args:
config: Pipeline configuration
"""
self.config = config or PipelineConfig()
self._initialized = False
# Component instances (lazy initialization)
self._ocr_engine = None
self._layout_detector = None
self._reading_order = None
self._chunker = None
def initialize(self):
"""Initialize all pipeline components."""
if self._initialized:
return
logger.info("Initializing document processing pipeline...")
# Initialize OCR
self._ocr_engine = get_ocr_engine(
engine_type=self.config.ocr.engine,
config=self.config.ocr,
)
# Initialize layout detector (create new instance to respect config)
from ..layout.detector import create_layout_detector
self._layout_detector = create_layout_detector(self.config.layout, initialize=True)
# Initialize reading order
self._reading_order = get_reading_order_reconstructor(self.config.reading_order)
# Initialize chunker
self._chunker = get_document_chunker(self.config.chunking)
self._initialized = True
logger.info("Document processing pipeline initialized")
def process(
self,
source: Union[str, Path],
document_id: Optional[str] = None,
) -> ProcessedDocument:
"""
Process a document through the full pipeline.
Args:
source: Path to document
document_id: Optional document ID
Returns:
ProcessedDocument with all extracted information
"""
if not self._initialized:
self.initialize()
start_time = time.time()
source_path = str(Path(source).absolute())
logger.info(f"Processing document: {source_path}")
try:
# Step 1: Load document
loaded_doc = load_document(source_path, document_id)
document_id = loaded_doc.document_id
# Determine pages to process
num_pages = loaded_doc.num_pages
if self.config.max_pages:
num_pages = min(num_pages, self.config.max_pages)
logger.info(f"Document loaded: {num_pages} pages")
# Step 2: Process each page
all_ocr_regions: List[OCRRegion] = []
all_layout_regions: List[LayoutRegion] = []
page_dimensions = []
for page_num in range(num_pages):
logger.debug(f"Processing page {page_num + 1}/{num_pages}")
# Render page
page_image = self._get_page_image(loaded_doc, page_num)
height, width = page_image.shape[:2]
page_dimensions.append((width, height))
# OCR
ocr_result = self._ocr_engine.recognize(page_image, page_num)
if ocr_result.success:
all_ocr_regions.extend(ocr_result.regions)
# Layout detection
layout_result = self._layout_detector.detect(
page_image,
page_num,
ocr_result.regions if ocr_result.success else None,
)
if layout_result.success:
all_layout_regions.extend(layout_result.regions)
# Step 3: Reading order reconstruction
if all_ocr_regions:
reading_result = self._reading_order.reconstruct(
all_ocr_regions,
all_layout_regions,
page_width=page_dimensions[0][0] if page_dimensions else None,
page_height=page_dimensions[0][1] if page_dimensions else None,
)
# Reorder OCR regions
if reading_result.success and reading_result.order:
all_ocr_regions = [all_ocr_regions[i] for i in reading_result.order]
# Step 4: Chunking
chunks = self._chunker.create_chunks(
all_ocr_regions,
all_layout_regions if self.config.include_layout_regions else None,
document_id,
source_path,
)
# Step 5: Generate full text
full_text = ""
if self.config.generate_full_text and all_ocr_regions:
full_text = self._generate_full_text(all_ocr_regions)
# Calculate quality metrics
ocr_confidence_avg = None
if all_ocr_regions:
ocr_confidence_avg = sum(r.confidence for r in all_ocr_regions) / len(all_ocr_regions)
layout_confidence_avg = None
if all_layout_regions:
layout_confidence_avg = sum(r.confidence for r in all_layout_regions) / len(all_layout_regions)
# Build metadata
metadata = DocumentMetadata(
document_id=document_id,
source_path=source_path,
filename=loaded_doc.filename,
file_type=loaded_doc.file_type,
file_size_bytes=loaded_doc.file_size_bytes,
num_pages=loaded_doc.num_pages,
page_dimensions=page_dimensions,
processed_at=datetime.utcnow(),
total_chunks=len(chunks),
total_characters=len(full_text),
ocr_confidence_avg=ocr_confidence_avg,
layout_confidence_avg=layout_confidence_avg,
)
# Build result
result = ProcessedDocument(
metadata=metadata,
ocr_regions=all_ocr_regions if self.config.include_ocr_regions else [],
layout_regions=all_layout_regions if self.config.include_layout_regions else [],
chunks=chunks,
full_text=full_text,
status="completed",
)
processing_time = time.time() - start_time
logger.info(
f"Document processed in {processing_time:.2f}s: "
f"{len(all_ocr_regions)} OCR regions, "
f"{len(all_layout_regions)} layout regions, "
f"{len(chunks)} chunks"
)
return result
except Exception as e:
logger.error(f"Document processing failed: {e}")
raise
finally:
# Clean up
if 'loaded_doc' in locals():
loaded_doc.close()
def _get_page_image(
self,
doc: LoadedDocument,
page_num: int,
) -> np.ndarray:
"""Get page image, using cache if enabled."""
if self.config.enable_caching:
cache = get_document_cache()
cached = cache.get(doc.document_id, page_num, self.config.render_dpi)
if cached is not None:
return cached
# Render page
image = doc.get_page_image(page_num, self.config.render_dpi)
# Cache if enabled
if self.config.enable_caching:
cache = get_document_cache()
cache.put(doc.document_id, page_num, self.config.render_dpi, image)
return image
def _generate_full_text(self, ocr_regions: List[OCRRegion]) -> str:
"""Generate full text from OCR regions in reading order."""
# Group by page
by_page: Dict[int, List[OCRRegion]] = {}
for r in ocr_regions:
if r.page not in by_page:
by_page[r.page] = []
by_page[r.page].append(r)
# Build text page by page
pages_text = []
for page_num in sorted(by_page.keys()):
page_regions = by_page[page_num]
page_text = " ".join(r.text for r in page_regions)
pages_text.append(page_text)
return "\n\n".join(pages_text)
def process_batch(
self,
sources: List[Union[str, Path]],
) -> List[ProcessedDocument]:
"""
Process multiple documents.
Args:
sources: List of document paths
Returns:
List of ProcessedDocument
"""
results = []
for source in sources:
try:
result = self.process(source)
results.append(result)
except Exception as e:
logger.error(f"Failed to process {source}: {e}")
# Could append an error result here
return results
# Global instance and factory functions
_document_processor: Optional[DocumentProcessor] = None
def get_document_processor(
config: Optional[PipelineConfig] = None,
) -> DocumentProcessor:
"""Get or create singleton document processor."""
global _document_processor
if _document_processor is None:
_document_processor = DocumentProcessor(config)
_document_processor.initialize()
return _document_processor
def process_document(
source: Union[str, Path],
document_id: Optional[str] = None,
config: Optional[PipelineConfig] = None,
) -> ProcessedDocument:
"""
Convenience function to process a document.
Args:
source: Document path
document_id: Optional document ID
config: Optional pipeline configuration
Returns:
ProcessedDocument
"""
processor = get_document_processor(config)
return processor.process(source, document_id)
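# A minimal command-line sketch (assumes the package context is available,
# e.g. run via "python -m <package>.pipeline <document-path>"; the path is
# supplied by the caller and only validated by load_document itself).
if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        print("usage: python -m <package>.pipeline <document-path>")
        sys.exit(1)

    result = process_document(sys.argv[1])
    print(
        f"Processed {result.metadata.filename}: "
        f"{result.metadata.num_pages} pages, "
        f"{len(result.chunks)} chunks, "
        f"{result.metadata.total_characters} characters"
    )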