| """ |
| Reading Order Reconstructor Implementation |
| |
| Rule-based reading order reconstruction for document elements. |
| """ |
|
|
| import time |
| from typing import List, Optional, Dict, Any, Tuple |
| from collections import defaultdict |
| import numpy as np |
| from loguru import logger |
|
|
| from .base import ReadingOrderReconstructor, ReadingOrderConfig, ReadingOrderResult |
| from ..schemas.core import BoundingBox, LayoutRegion, OCRRegion, LayoutType |
|
|
|
|
| class RuleBasedReadingOrder(ReadingOrderReconstructor): |
| """ |
| Rule-based reading order reconstruction. |
| |
| Handles: |
| - Single column documents |
| - Multi-column layouts |
| - Mixed layouts (text + figures) |
| - Headers and footers |
| """ |
|
|
| def initialize(self): |
| """Initialize (no model loading needed).""" |
| self._initialized = True |
| logger.info("Initialized rule-based reading order reconstructor") |
|
|
| def reconstruct( |
| self, |
| regions: List[Any], |
| layout_regions: Optional[List[LayoutRegion]] = None, |
| page_width: Optional[int] = None, |
| page_height: Optional[int] = None, |
| ) -> ReadingOrderResult: |
| """Reconstruct reading order using rule-based approach.""" |
| if not self._initialized: |
| self.initialize() |
|
|
| start_time = time.time() |
|
|
| if not regions: |
| return ReadingOrderResult(success=True) |
|
|
| |
| bboxes = self._extract_bboxes(regions) |
| if not bboxes: |
| return ReadingOrderResult(success=True) |
|
|
| |
| if page_width is None: |
| page_width = max(b.x_max for b in bboxes) |
| if page_height is None: |
| page_height = max(b.y_max for b in bboxes) |
|
|
| |
| num_columns, column_assignments = self._detect_columns(bboxes, page_width) |
|
|
| |
| if num_columns == 1: |
| order = self._sort_single_column(bboxes) |
| else: |
| order = self._sort_multi_column(bboxes, column_assignments, num_columns) |
|
|
| |
| if self.config.header_footer_separate and layout_regions: |
| order = self._adjust_for_headers_footers( |
| order, regions, layout_regions, page_height |
| ) |
|
|
| processing_time = (time.time() - start_time) * 1000 |
|
|
| return ReadingOrderResult( |
| order=order, |
| ordered_regions=[regions[i] for i in order], |
| num_columns=num_columns, |
| column_assignments=column_assignments, |
| processing_time_ms=processing_time, |
| success=True, |
| ) |
|
|
| def _extract_bboxes(self, regions: List[Any]) -> List[BoundingBox]: |
| """Extract bounding boxes from regions.""" |
| bboxes = [] |
| for r in regions: |
| if hasattr(r, 'bbox'): |
| bboxes.append(r.bbox) |
| elif isinstance(r, BoundingBox): |
| bboxes.append(r) |
| return bboxes |
|
|
| def _detect_columns( |
| self, |
| bboxes: List[BoundingBox], |
| page_width: int, |
| ) -> Tuple[int, Dict[int, int]]: |
| """Detect column structure in the document.""" |
| if not self.config.detect_columns or len(bboxes) < 4: |
| return 1, {i: 0 for i in range(len(bboxes))} |
|
|
| |
| x_centers = [(b.x_min + b.x_max) / 2 for b in bboxes] |
|
|
| |
| min_gap = page_width * self.config.column_gap_threshold |
| sorted_centers = sorted(set(x_centers)) |
|
|
| |
| gaps = [] |
| for i in range(len(sorted_centers) - 1): |
| gap = sorted_centers[i + 1] - sorted_centers[i] |
| if gap > min_gap: |
| gaps.append((sorted_centers[i] + sorted_centers[i + 1]) / 2) |
|
|
| |
| num_columns = min(len(gaps) + 1, self.config.max_columns) |
|
|
| if num_columns == 1: |
| return 1, {i: 0 for i in range(len(bboxes))} |
|
|
| |
| column_boundaries = [0] + sorted(gaps[:num_columns - 1]) + [page_width] |
| assignments = {} |
|
|
| for i, bbox in enumerate(bboxes): |
| center = (bbox.x_min + bbox.x_max) / 2 |
| for col in range(num_columns): |
| if column_boundaries[col] <= center < column_boundaries[col + 1]: |
| assignments[i] = col |
| break |
| else: |
| assignments[i] = num_columns - 1 |
|
|
| return num_columns, assignments |
|
|
| def _sort_single_column(self, bboxes: List[BoundingBox]) -> List[int]: |
| """Sort regions in single-column layout.""" |
| |
| indexed = list(enumerate(bboxes)) |
|
|
| if self.config.vertical_priority: |
| |
| indexed.sort(key=lambda x: (x[1].y_min, x[1].x_min)) |
| else: |
| |
| indexed.sort(key=lambda x: (x[1].x_min, x[1].y_min)) |
|
|
| if self.config.reading_direction == "rtl": |
| |
| |
| rows = self._group_by_y(indexed) |
| result = [] |
| for row in rows: |
| row.reverse() |
| result.extend([i for i, _ in row]) |
| return result |
|
|
| return [i for i, _ in indexed] |
|
|
| def _sort_multi_column( |
| self, |
| bboxes: List[BoundingBox], |
| column_assignments: Dict[int, int], |
| num_columns: int, |
| ) -> List[int]: |
| """Sort regions in multi-column layout.""" |
| |
| columns = defaultdict(list) |
| for i, bbox in enumerate(bboxes): |
| col = column_assignments.get(i, 0) |
| columns[col].append((i, bbox)) |
|
|
| |
| for col in columns: |
| columns[col].sort(key=lambda x: (x[1].y_min, x[1].x_min)) |
|
|
| |
| result = [] |
| if self.config.reading_direction == "ltr": |
| col_order = range(num_columns) |
| else: |
| col_order = range(num_columns - 1, -1, -1) |
|
|
| for col in col_order: |
| result.extend([i for i, _ in columns.get(col, [])]) |
|
|
| return result |
|
|
| def _group_by_y( |
| self, |
| indexed_bboxes: List[Tuple[int, BoundingBox]], |
| tolerance: float = 10.0, |
| ) -> List[List[Tuple[int, BoundingBox]]]: |
| """Group bboxes into rows by y position.""" |
| if not indexed_bboxes: |
| return [] |
|
|
| |
| sorted_items = sorted(indexed_bboxes, key=lambda x: x[1].y_min) |
|
|
| rows = [] |
| current_row = [sorted_items[0]] |
| current_y = sorted_items[0][1].y_min |
|
|
| for item in sorted_items[1:]: |
| if abs(item[1].y_min - current_y) <= tolerance: |
| current_row.append(item) |
| else: |
| |
| current_row.sort(key=lambda x: x[1].x_min) |
| rows.append(current_row) |
| current_row = [item] |
| current_y = item[1].y_min |
|
|
| if current_row: |
| current_row.sort(key=lambda x: x[1].x_min) |
| rows.append(current_row) |
|
|
| return rows |
|
|
| def _adjust_for_headers_footers( |
| self, |
| order: List[int], |
| regions: List[Any], |
| layout_regions: List[LayoutRegion], |
| page_height: int, |
| ) -> List[int]: |
| """Adjust order to put headers first and footers last.""" |
| |
| header_indices = set() |
| footer_indices = set() |
|
|
| header_y_threshold = page_height * 0.1 |
| footer_y_threshold = page_height * 0.9 |
|
|
| for layout_r in layout_regions: |
| if layout_r.type == LayoutType.HEADER: |
| for i, r in enumerate(regions): |
| if hasattr(r, 'bbox') and layout_r.bbox.contains(r.bbox): |
| header_indices.add(i) |
| elif layout_r.type == LayoutType.FOOTER: |
| for i, r in enumerate(regions): |
| if hasattr(r, 'bbox') and layout_r.bbox.contains(r.bbox): |
| footer_indices.add(i) |
|
|
| |
| for i, r in enumerate(regions): |
| if hasattr(r, 'bbox'): |
| if r.bbox.y_max < header_y_threshold: |
| header_indices.add(i) |
| elif r.bbox.y_min > footer_y_threshold: |
| footer_indices.add(i) |
|
|
| |
| headers = [i for i in order if i in header_indices] |
| footers = [i for i in order if i in footer_indices] |
| body = [i for i in order if i not in header_indices and i not in footer_indices] |
|
|
| return headers + body + footers |
|
|
|
|
| |
| _reading_order: Optional[ReadingOrderReconstructor] = None |
|
|
|
|
| def get_reading_order_reconstructor( |
| config: Optional[ReadingOrderConfig] = None, |
| ) -> ReadingOrderReconstructor: |
| """Get or create singleton reading order reconstructor.""" |
| global _reading_order |
| if _reading_order is None: |
| config = config or ReadingOrderConfig() |
| _reading_order = RuleBasedReadingOrder(config) |
| _reading_order.initialize() |
| return _reading_order |
|
|