Spaces:
Sleeping
Sleeping
| """Structural validator β checks internal consistency of a CanonicalDocument. | |
| Checks: | |
| - ID uniqueness across the entire document | |
| - reading_order references existing region IDs | |
| - bbox containment: word β line β region β page (with tolerance) | |
| - spatial ordering: words in a line, lines in a region | |
| """ | |
| from __future__ import annotations | |
| from typing import TYPE_CHECKING | |
| from src.app.domain.errors import Severity, ValidationEntry, ValidationReport | |
| from src.app.geometry.bbox import contains | |
| if TYPE_CHECKING: | |
| from src.app.domain.models import CanonicalDocument | |
| VALIDATOR_NAME = "structural" | |
| def validate_structure( | |
| doc: CanonicalDocument, | |
| *, | |
| bbox_tolerance: float = 5.0, | |
| ) -> ValidationReport: | |
| """Run all structural checks on a CanonicalDocument.""" | |
| report = ValidationReport() | |
| _check_id_uniqueness(doc, report) | |
| _check_reading_order(doc, report) | |
| _check_bbox_containment(doc, report, bbox_tolerance) | |
| return report | |
| def _check_id_uniqueness(doc: CanonicalDocument, report: ValidationReport) -> None: | |
| """Every ID in the document must be unique.""" | |
| seen: dict[str, str] = {} # id β first path | |
| for pi, page in enumerate(doc.pages): | |
| _register_id(page.id, f"pages[{pi}]", seen, report) | |
| for ri, region in enumerate(page.text_regions): | |
| rpath = f"pages[{pi}].text_regions[{ri}]" | |
| _register_id(region.id, rpath, seen, report) | |
| for li, line in enumerate(region.lines): | |
| lpath = f"{rpath}.lines[{li}]" | |
| _register_id(line.id, lpath, seen, report) | |
| for wi, word in enumerate(line.words): | |
| wpath = f"{lpath}.words[{wi}]" | |
| _register_id(word.id, wpath, seen, report) | |
| for ni, ntr in enumerate(page.non_text_regions): | |
| npath = f"pages[{pi}].non_text_regions[{ni}]" | |
| _register_id(ntr.id, npath, seen, report) | |
| def _register_id( | |
| node_id: str, path: str, seen: dict[str, str], report: ValidationReport | |
| ) -> None: | |
| if node_id in seen: | |
| report.add(ValidationEntry( | |
| validator=VALIDATOR_NAME, | |
| severity=Severity.ERROR, | |
| path=path, | |
| message=f"Duplicate ID '{node_id}', first seen at {seen[node_id]}", | |
| code="duplicate_id", | |
| )) | |
| else: | |
| seen[node_id] = path | |
| def _check_reading_order(doc: CanonicalDocument, report: ValidationReport) -> None: | |
| """reading_order entries must reference existing region IDs.""" | |
| for pi, page in enumerate(doc.pages): | |
| region_ids = {r.id for r in page.text_regions} | |
| for idx, ref_id in enumerate(page.reading_order): | |
| if ref_id not in region_ids: | |
| report.add(ValidationEntry( | |
| validator=VALIDATOR_NAME, | |
| severity=Severity.ERROR, | |
| path=f"pages[{pi}].reading_order[{idx}]", | |
| message=f"reading_order references unknown region ID '{ref_id}'", | |
| code="invalid_reading_order_ref", | |
| )) | |
| def _check_bbox_containment( | |
| doc: CanonicalDocument, report: ValidationReport, tolerance: float | |
| ) -> None: | |
| """Check that child bboxes are contained within parent bboxes.""" | |
| for pi, page in enumerate(doc.pages): | |
| page_bbox = (0.0, 0.0, page.width, page.height) | |
| for ri, region in enumerate(page.text_regions): | |
| rpath = f"pages[{pi}].text_regions[{ri}]" | |
| if not contains(page_bbox, region.geometry.bbox, tolerance): | |
| report.add(ValidationEntry( | |
| validator=VALIDATOR_NAME, | |
| severity=Severity.WARNING, | |
| path=rpath, | |
| message=( | |
| f"Region bbox {region.geometry.bbox} exceeds page bounds" | |
| f" ({page.width}x{page.height}) beyond tolerance {tolerance}px" | |
| ), | |
| code="region_exceeds_page", | |
| )) | |
| for li, line in enumerate(region.lines): | |
| lpath = f"{rpath}.lines[{li}]" | |
| if not contains(region.geometry.bbox, line.geometry.bbox, tolerance): | |
| report.add(ValidationEntry( | |
| validator=VALIDATOR_NAME, | |
| severity=Severity.WARNING, | |
| path=lpath, | |
| message=f"Line bbox exceeds region bbox beyond tolerance {tolerance}px", | |
| code="line_exceeds_region", | |
| )) | |
| for wi, word in enumerate(line.words): | |
| wpath = f"{lpath}.words[{wi}]" | |
| if not contains(line.geometry.bbox, word.geometry.bbox, tolerance): | |
| report.add(ValidationEntry( | |
| validator=VALIDATOR_NAME, | |
| severity=Severity.WARNING, | |
| path=wpath, | |
| message=f"Word bbox exceeds line bbox beyond tolerance {tolerance}px", | |
| code="word_exceeds_line", | |
| )) | |