File size: 5,110 Bytes
e2ec8a2
 
 
 
 
 
 
 
 
 
 
bbbfba8
 
e2ec8a2
 
 
bbbfba8
 
 
e2ec8a2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bbbfba8
 
 
 
e2ec8a2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""Structural validator — checks internal consistency of a CanonicalDocument.

Checks:
  - ID uniqueness across the entire document
  - reading_order references existing region IDs
  - bbox containment: word ⊂ line ⊂ region ⊂ page (with tolerance)
  - spatial ordering: words in a line, lines in a region
    (NOTE: validate_structure does not currently run this check)
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from src.app.domain.errors import Severity, ValidationEntry, ValidationReport
from src.app.geometry.bbox import contains

if TYPE_CHECKING:
    from src.app.domain.models import CanonicalDocument

# Value placed in the `validator` field of every ValidationEntry this module emits.
VALIDATOR_NAME = "structural"


def validate_structure(
    doc: CanonicalDocument,
    *,
    bbox_tolerance: float = 5.0,
) -> ValidationReport:
    """Validate the internal structure of *doc* and collect the findings.

    Three passes run in a fixed order, all appending to one shared report:
    ID uniqueness, reading-order references, then bbox containment.

    Args:
        doc: The document to inspect.
        bbox_tolerance: Tolerance forwarded to the bbox containment pass
            (keyword-only).

    Returns:
        A newly created ValidationReport holding every entry the passes added.
    """
    findings = ValidationReport()
    # The first two passes share a signature; containment also takes the tolerance.
    for check in (_check_id_uniqueness, _check_reading_order):
        check(doc, findings)
    _check_bbox_containment(doc, findings, bbox_tolerance)
    return findings


def _check_id_uniqueness(doc: CanonicalDocument, report: ValidationReport) -> None:
    """Register the ID of every node in *doc* so that repeats get reported.

    Nodes are visited page by page: the page itself, then each text region
    followed by its lines and their words, and finally the page's non-text
    regions. Each ID is handed to ``_register_id`` together with the
    index-based path of the node that carries it.
    """

    def _walk():
        # Yield (node id, path) pairs in document order.
        for page_no, page in enumerate(doc.pages):
            page_path = f"pages[{page_no}]"
            yield page.id, page_path
            for region_no, region in enumerate(page.text_regions):
                region_path = f"{page_path}.text_regions[{region_no}]"
                yield region.id, region_path
                for line_no, line in enumerate(region.lines):
                    line_path = f"{region_path}.lines[{line_no}]"
                    yield line.id, line_path
                    for word_no, word in enumerate(line.words):
                        yield word.id, f"{line_path}.words[{word_no}]"
            for ntr_no, ntr in enumerate(page.non_text_regions):
                yield ntr.id, f"{page_path}.non_text_regions[{ntr_no}]"

    first_seen: dict[str, str] = {}  # id → path of its first occurrence
    for node_id, node_path in _walk():
        _register_id(node_id, node_path, first_seen, report)


def _register_id(
    node_id: str, path: str, seen: dict[str, str], report: ValidationReport
) -> None:
    """Record *node_id* at *path*, or report it as a duplicate.

    Args:
        node_id: The ID to register.
        path: Path of the node carrying the ID.
        seen: Mapping of already registered IDs to the path where each was
            first seen; updated in place when *node_id* is new.
        report: Receives an ERROR entry with code ``duplicate_id`` when
            *node_id* is already in *seen*.
    """
    if node_id not in seen:
        # First occurrence: remember where it was found, nothing to report.
        seen[node_id] = path
        return

    # Repeat occurrence: the first path stays in `seen` and is cited in the message.
    duplicate = ValidationEntry(
        validator=VALIDATOR_NAME,
        severity=Severity.ERROR,
        path=path,
        message=f"Duplicate ID '{node_id}', first seen at {seen[node_id]}",
        code="duplicate_id",
    )
    report.add(duplicate)


def _check_reading_order(doc: CanonicalDocument, report: ValidationReport) -> None:
    """Report reading_order entries that match no text region on their page.

    For each page, every entry of ``page.reading_order`` is looked up among
    the IDs of that page's ``text_regions``. Each entry that is not found
    adds an ERROR entry with code ``invalid_reading_order_ref`` to *report*.
    """
    for page_no, page in enumerate(doc.pages):
        # Only this page's own text regions count as valid targets.
        known_ids = set()
        for region in page.text_regions:
            known_ids.add(region.id)

        for position, ref_id in enumerate(page.reading_order):
            if ref_id in known_ids:
                continue
            dangling = ValidationEntry(
                validator=VALIDATOR_NAME,
                severity=Severity.ERROR,
                path=f"pages[{page_no}].reading_order[{position}]",
                message=f"reading_order references unknown region ID '{ref_id}'",
                code="invalid_reading_order_ref",
            )
            report.add(dangling)


def _check_bbox_containment(
    doc: CanonicalDocument, report: ValidationReport, tolerance: float
) -> None:
    """Warn about every bbox that is not contained in its parent's bbox.

    Three parent/child pairs are tested with ``contains``: page/region,
    region/line and line/word. Each failed test adds one WARNING entry to
    *report*; children of a failing node are still checked.

    Args:
        doc: The document whose text regions, lines and words are checked.
        report: Receives one entry per failed containment test.
        tolerance: Passed unchanged as the third argument of ``contains``.
    """

    def _warn(path: str, code: str, message: str) -> None:
        # All containment findings share the validator name and severity.
        report.add(ValidationEntry(
            validator=VALIDATOR_NAME,
            severity=Severity.WARNING,
            path=path,
            message=message,
            code=code,
        ))

    for page_no, page in enumerate(doc.pages):
        # Page bounds as handed to contains(): (0.0, 0.0, width, height).
        page_bounds = (0.0, 0.0, page.width, page.height)

        for region_no, region in enumerate(page.text_regions):
            region_path = f"pages[{page_no}].text_regions[{region_no}]"

            region_fits = contains(page_bounds, region.geometry.bbox, tolerance)
            if not region_fits:
                _warn(
                    region_path,
                    "region_exceeds_page",
                    (
                        f"Region bbox {region.geometry.bbox} exceeds page bounds"
                        f" ({page.width}x{page.height}) beyond tolerance {tolerance}px"
                    ),
                )

            for line_no, line in enumerate(region.lines):
                line_path = f"{region_path}.lines[{line_no}]"

                line_fits = contains(region.geometry.bbox, line.geometry.bbox, tolerance)
                if not line_fits:
                    _warn(
                        line_path,
                        "line_exceeds_region",
                        f"Line bbox exceeds region bbox beyond tolerance {tolerance}px",
                    )

                for word_no, word in enumerate(line.words):
                    word_fits = contains(line.geometry.bbox, word.geometry.bbox, tolerance)
                    if not word_fits:
                        _warn(
                            f"{line_path}.words[{word_no}]",
                            "word_exceeds_line",
                            f"Word bbox exceeds line bbox beyond tolerance {tolerance}px",
                        )