"""
Document Processor Pipeline

Main pipeline that orchestrates document processing:
1. Load document
2. OCR (PaddleOCR or Tesseract)
3. Layout detection
4. Reading order reconstruction
5. Semantic chunking
6. Grounding evidence

Outputs ProcessedDocument with all extracted information.
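
Example (a minimal sketch; the input path is illustrative):

    doc = process_document("reports/q3_summary.pdf")
    print(doc.metadata.num_pages, len(doc.chunks))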
"""

import time
from pathlib import Path
from typing import List, Optional, Dict, Any, Union
from datetime import datetime, timezone
from pydantic import BaseModel, Field
from loguru import logger
import numpy as np

from ..schemas.core import (
    ProcessedDocument,
    DocumentMetadata,
    DocumentChunk,
    OCRRegion,
    LayoutRegion,
)
from ..io.loader import load_document, LoadedDocument
from ..io.cache import get_document_cache
from ..ocr import get_ocr_engine, OCRConfig, OCRResult
from ..layout import get_layout_detector, LayoutConfig, LayoutResult
from ..reading_order import get_reading_order_reconstructor, ReadingOrderConfig
from ..chunking import get_document_chunker, ChunkerConfig


class PipelineConfig(BaseModel):
    """Configuration for the document processing pipeline."""
    # Component configs
    ocr: OCRConfig = Field(default_factory=OCRConfig)
    layout: LayoutConfig = Field(default_factory=LayoutConfig)
    reading_order: ReadingOrderConfig = Field(default_factory=ReadingOrderConfig)
    chunking: ChunkerConfig = Field(default_factory=ChunkerConfig)

    # Pipeline behavior
    render_dpi: int = Field(default=300, ge=72, description="DPI for PDF rendering")
    enable_caching: bool = Field(default=True, description="Cache rendered pages")
    parallel_pages: bool = Field(default=False, description="Process pages in parallel (reserved; process() currently iterates pages sequentially)")
    max_pages: Optional[int] = Field(default=None, description="Max pages to process")

    # Output options
    include_ocr_regions: bool = Field(default=True)
    include_layout_regions: bool = Field(default=True)
    generate_full_text: bool = Field(default=True)
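
# A hedged configuration sketch: override individual fields and keep the
# declared defaults for everything else. The values below are illustrative,
# not recommendations.
#
#   config = PipelineConfig(render_dpi=200, max_pages=10)
#   processor = DocumentProcessor(config)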


class DocumentProcessor:
    """
    Main document processing pipeline.

    Provides end-to-end document processing with:
    - Multi-format support (PDF, images)
    - Pluggable OCR engines
    - Layout detection
    - Reading order reconstruction
    - Semantic chunking
    """

    def __init__(self, config: Optional[PipelineConfig] = None):
        """
        Initialize document processor.

        Args:
            config: Pipeline configuration
        """
        self.config = config or PipelineConfig()
        self._initialized = False

        # Component instances (lazy initialization)
        self._ocr_engine = None
        self._layout_detector = None
        self._reading_order = None
        self._chunker = None

    def initialize(self):
        """Initialize all pipeline components."""
        if self._initialized:
            return

        logger.info("Initializing document processing pipeline...")

        # Initialize OCR
        self._ocr_engine = get_ocr_engine(
            engine_type=self.config.ocr.engine,
            config=self.config.ocr,
        )

        # Initialize layout detector (create new instance to respect config)
        from ..layout.detector import create_layout_detector
        self._layout_detector = create_layout_detector(self.config.layout, initialize=True)

        # Initialize reading order
        self._reading_order = get_reading_order_reconstructor(self.config.reading_order)

        # Initialize chunker
        self._chunker = get_document_chunker(self.config.chunking)

        self._initialized = True
        logger.info("Document processing pipeline initialized")

    def process(
        self,
        source: Union[str, Path],
        document_id: Optional[str] = None,
    ) -> ProcessedDocument:
        """
        Process a document through the full pipeline.

        Args:
            source: Path to document
            document_id: Optional document ID

        Returns:
            ProcessedDocument with all extracted information
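
        Example (sketch; the path is illustrative):
            processor = DocumentProcessor()
            doc = processor.process("contracts/msa_2021.pdf")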
        """
        if not self._initialized:
            self.initialize()

        start_time = time.time()
        source_path = str(Path(source).absolute())

        logger.info(f"Processing document: {source_path}")

        try:
            # Step 1: Load document
            loaded_doc = load_document(source_path, document_id)
            document_id = loaded_doc.document_id

            # Determine pages to process
            num_pages = loaded_doc.num_pages
            if self.config.max_pages is not None:
                num_pages = min(num_pages, self.config.max_pages)

            logger.info(f"Document loaded: {num_pages} pages")

            # Step 2: Process each page
            all_ocr_regions: List[OCRRegion] = []
            all_layout_regions: List[LayoutRegion] = []
            page_dimensions = []

            for page_num in range(num_pages):
                logger.debug(f"Processing page {page_num + 1}/{num_pages}")

                # Render page
                page_image = self._get_page_image(loaded_doc, page_num)
                height, width = page_image.shape[:2]
                page_dimensions.append((width, height))

                # OCR
                ocr_result = self._ocr_engine.recognize(page_image, page_num)
                if ocr_result.success:
                    all_ocr_regions.extend(ocr_result.regions)

                # Layout detection
                layout_result = self._layout_detector.detect(
                    page_image,
                    page_num,
                    ocr_result.regions if ocr_result.success else None,
                )
                if layout_result.success:
                    all_layout_regions.extend(layout_result.regions)

            # Step 3: Reading order reconstruction
            if all_ocr_regions:
                reading_result = self._reading_order.reconstruct(
                    all_ocr_regions,
                    all_layout_regions,
                    page_width=page_dimensions[0][0] if page_dimensions else None,
                    page_height=page_dimensions[0][1] if page_dimensions else None,
                )

                # Reorder OCR regions
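                # `order` is assumed to be a permutation of indices into the
                # flattened (all pages) region list.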
                if reading_result.success and reading_result.order:
                    all_ocr_regions = [all_ocr_regions[i] for i in reading_result.order]

            # Step 4: Chunking
            chunks = self._chunker.create_chunks(
                all_ocr_regions,
                all_layout_regions if self.config.include_layout_regions else None,
                document_id,
                source_path,
            )

            # Step 5: Generate full text
            full_text = ""
            if self.config.generate_full_text and all_ocr_regions:
                full_text = self._generate_full_text(all_ocr_regions)

            # Calculate quality metrics
            ocr_confidence_avg = None
            if all_ocr_regions:
                ocr_confidence_avg = sum(r.confidence for r in all_ocr_regions) / len(all_ocr_regions)

            layout_confidence_avg = None
            if all_layout_regions:
                layout_confidence_avg = sum(r.confidence for r in all_layout_regions) / len(all_layout_regions)

            # Build metadata
            metadata = DocumentMetadata(
                document_id=document_id,
                source_path=source_path,
                filename=loaded_doc.filename,
                file_type=loaded_doc.file_type,
                file_size_bytes=loaded_doc.file_size_bytes,
                num_pages=loaded_doc.num_pages,
                page_dimensions=page_dimensions,
                processed_at=datetime.now(timezone.utc),
                total_chunks=len(chunks),
                total_characters=len(full_text),
                ocr_confidence_avg=ocr_confidence_avg,
                layout_confidence_avg=layout_confidence_avg,
            )

            # Build result
            result = ProcessedDocument(
                metadata=metadata,
                ocr_regions=all_ocr_regions if self.config.include_ocr_regions else [],
                layout_regions=all_layout_regions if self.config.include_layout_regions else [],
                chunks=chunks,
                full_text=full_text,
                status="completed",
            )

            processing_time = time.time() - start_time
            logger.info(
                f"Document processed in {processing_time:.2f}s: "
                f"{len(all_ocr_regions)} OCR regions, "
                f"{len(all_layout_regions)} layout regions, "
                f"{len(chunks)} chunks"
            )

            return result

        except Exception as e:
            logger.error(f"Document processing failed: {e}")
            raise

        finally:
            # Clean up (loaded_doc is unbound if load_document itself raised)
            if 'loaded_doc' in locals():
                loaded_doc.close()

    def _get_page_image(
        self,
        doc: LoadedDocument,
        page_num: int,
    ) -> np.ndarray:
        """Get page image, using cache if enabled."""
        if self.config.enable_caching:
            cache = get_document_cache()
            cached = cache.get(doc.document_id, page_num, self.config.render_dpi)
            if cached is not None:
                return cached

        # Render page
        image = doc.get_page_image(page_num, self.config.render_dpi)

        # Cache if enabled
        if self.config.enable_caching:
            cache = get_document_cache()
            cache.put(doc.document_id, page_num, self.config.render_dpi, image)

        return image

    def _generate_full_text(self, ocr_regions: List[OCRRegion]) -> str:
        """Generate full text from OCR regions in reading order."""
        # Group by page
        by_page: Dict[int, List[OCRRegion]] = {}
        for r in ocr_regions:
            by_page.setdefault(r.page, []).append(r)

        # Build text page by page
        pages_text = []
        for page_num in sorted(by_page.keys()):
            page_regions = by_page[page_num]
            page_text = " ".join(r.text for r in page_regions)
            pages_text.append(page_text)

        return "\n\n".join(pages_text)

    def process_batch(
        self,
        sources: List[Union[str, Path]],
    ) -> List[ProcessedDocument]:
        """
        Process multiple documents.

        Args:
            sources: List of document paths

        Returns:
            List of ProcessedDocument; documents that fail to process are
            logged and skipped.
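
        Example (sketch; paths are illustrative):
            docs = processor.process_batch(["a.pdf", "b.png"])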
        """
        results = []
        for source in sources:
            try:
                result = self.process(source)
                results.append(result)
            except Exception as e:
                logger.error(f"Failed to process {source}: {e}")
                # Could append an error result here

        return results


# Global instance and factory functions
_document_processor: Optional[DocumentProcessor] = None


def get_document_processor(
    config: Optional[PipelineConfig] = None,
) -> DocumentProcessor:
    """Get or create singleton document processor."""
    global _document_processor
    if _document_processor is None:
        _document_processor = DocumentProcessor(config)
        _document_processor.initialize()
    return _document_processor


def process_document(
    source: Union[str, Path],
    document_id: Optional[str] = None,
    config: Optional[PipelineConfig] = None,
) -> ProcessedDocument:
    """
    Convenience function to process a document.

    Args:
        source: Document path
        document_id: Optional document ID
        config: Optional pipeline configuration

    Returns:
        ProcessedDocument
    """
    processor = get_document_processor(config)
    return processor.process(source, document_id)
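

if __name__ == "__main__":
    # Smoke-test sketch (assumes a local sample file; the path is illustrative).
    import sys

    sample = sys.argv[1] if len(sys.argv) > 1 else "sample.pdf"
    document = process_document(sample)
    print(
        f"{document.metadata.filename}: {document.metadata.num_pages} pages, "
        f"{len(document.chunks)} chunks, {len(document.full_text)} characters"
    )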