| """PyMuPDF-based PDF parsing utilities.""" |
|
|
| from __future__ import annotations |
|
|
| from dataclasses import dataclass |
| from typing import List, Tuple |
|
|
| import fitz |
| from PIL import Image |
|
|
| SPARSE_TEXT_THRESHOLD = 100 |
| _LINE_Y_TOLERANCE = 4.0 |
| _SPACE_POINTS = 3.5 |
|
|
|
|
@dataclass
class PDFPage:
    """A single PDF page: its embedded text plus a rendered image."""

    # Page number within the document (extract_pdf_pages numbers from 1).
    page_number: int
    # Text returned by the PDF's own text layer for this page.
    embedded_text: str
    # Rasterised rendering of the page.
    image: Image.Image
    # True when the embedded text is too short to be considered usable.
    is_sparse: bool
|
|
|
|
def extract_pdf_pages(file_bytes: bytes, dpi_scale: float = 2.0) -> List[PDFPage]:
    """Extract the embedded text and a rendered image for every PDF page.

    Args:
        file_bytes: Raw bytes of the PDF document.
        dpi_scale: Zoom factor applied to both axes when rasterising a page.

    Returns:
        One ``PDFPage`` per page, in document order, numbered from 1. A page
        is marked sparse when its stripped embedded text is shorter than
        ``SPARSE_TEXT_THRESHOLD`` characters; the stored text itself is kept
        unstripped.
    """
    # The zoom matrix is identical for every page, so build it only once.
    zoom = fitz.Matrix(dpi_scale, dpi_scale)
    pages: List[PDFPage] = []
    doc = fitz.open(stream=file_bytes, filetype="pdf")
    try:
        for page_number, page in enumerate(doc, start=1):
            embedded_text = page.get_text("text")
            pix = page.get_pixmap(matrix=zoom)
            img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
            pages.append(
                PDFPage(
                    page_number=page_number,
                    embedded_text=embedded_text,
                    image=img,
                    is_sparse=len(embedded_text.strip()) < SPARSE_TEXT_THRESHOLD,
                )
            )
    finally:
        # Always release the document, even if a page fails to render.
        doc.close()
    return pages
|
|
|
|
| def _group_blocks_into_lines( |
| blocks: List[Tuple[float, float, float, str]], |
| ) -> List[List[Tuple[float, float, str]]]: |
| blocks.sort(key=lambda item: (round(item[0], 1), item[1])) |
| lines: List[List[Tuple[float, float, str]]] = [] |
| current_y: float | None = None |
| current_line: List[Tuple[float, float, str]] = [] |
|
|
| for y0, x0, x1, text in blocks: |
| if current_y is None or abs(y0 - current_y) > _LINE_Y_TOLERANCE: |
| if current_line: |
| lines.append(current_line) |
| current_line = [(x0, x1, text)] |
| current_y = y0 |
| else: |
| current_line.append((x0, x1, text)) |
|
|
| if current_line: |
| lines.append(current_line) |
| return lines |
|
|
|
|
def extract_page_spatial_text(page: fitz.Page) -> str:
    """Rebuild page text with column spacing from native PDF text blocks.

    Each text block is flattened to one line of text, blocks are grouped into
    visual lines, and horizontal gaps are turned into runs of spaces (one
    space per ``_SPACE_POINTS`` of distance). When the page yields no usable
    text block, the sorted plain-text extraction is returned instead.
    """
    positioned: List[Tuple[float, float, float, str]] = []
    for entry in page.get_text("blocks"):
        # Item 6 of a block tuple is its type; only type 0 (text) is kept.
        if entry[6] != 0:
            continue
        left, top, right, _bottom, raw = entry[:5]
        flattened = raw.replace("\n", " ").strip()
        if flattened:
            positioned.append((top, left, right, flattened))

    if not positioned:
        return page.get_text("text", sort=True).strip()

    rendered: List[str] = []
    for row in _group_blocks_into_lines(positioned):
        pieces: List[str] = []
        previous_right = 0.0
        for left, right, content in sorted(row, key=lambda item: item[0]):
            if pieces:
                # Between blocks: at least one space, more for wider gaps.
                spaces = max(1, int((left - previous_right) / _SPACE_POINTS))
            else:
                # First block on the line: indent by its distance from x = 0.
                spaces = max(0, int(left / _SPACE_POINTS))
            if spaces:
                pieces.append(" " * spaces)
            pieces.append(content)
            previous_right = right
        rendered.append("".join(pieces).rstrip())

    return "\n".join(rendered).strip()
|
|
|
|
def extract_pdf_spatial_pages(file_bytes: bytes) -> List[Tuple[int, str, bool]]:
    """Return (page_num, spatial_text, is_sparse) for each PDF page.

    Page numbers start at 1. A page is sparse when its stripped embedded text
    has fewer than ``SPARSE_TEXT_THRESHOLD`` characters.
    """
    results: List[Tuple[int, str, bool]] = []
    document = fitz.open(stream=file_bytes, filetype="pdf")
    try:
        for number, page in enumerate(document, start=1):
            plain_text = page.get_text("text").strip()
            sparse = len(plain_text) < SPARSE_TEXT_THRESHOLD
            # Sparse pages keep their plain embedded text; only pages with
            # enough text go through the spatial reconstruction.
            text = plain_text if sparse else extract_page_spatial_text(page)
            results.append((number, text, sparse))
    finally:
        document.close()
    return results
|
|
|
|
def render_page_image(
    file_bytes: bytes, page_num: int, dpi_scale: float = 2.0
) -> Image.Image:
    """Render a single PDF page — used only when chart OCR is needed.

    Args:
        file_bytes: Raw bytes of the PDF document.
        page_num: 1-based number of the page to render.
        dpi_scale: Zoom factor applied to both axes when rasterising.

    Returns:
        The rendered page as an RGB image.

    Raises:
        IndexError: If ``page_num`` is less than 1. Numbers past the last
            page are rejected by PyMuPDF's own page lookup.
    """
    if page_num < 1:
        # doc[page_num - 1] would otherwise become a negative index and
        # silently render a page counted from the end of the document.
        raise IndexError(f"page_num must be >= 1, got {page_num}")

    doc = fitz.open(stream=file_bytes, filetype="pdf")
    try:
        page = doc[page_num - 1]
        pix = page.get_pixmap(matrix=fitz.Matrix(dpi_scale, dpi_scale))
        return Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
    finally:
        # Always release the document, even if rendering fails.
        doc.close()
|
|
|
|
def render_page_png_base64(
    file_bytes: bytes, page_num: int = 1, dpi_scale: float = 2.0
) -> str:
    """Render one PDF page and return it as a base64-encoded PNG string.

    Args:
        file_bytes: Raw bytes of the PDF document.
        page_num: 1-based number of the page to render.
        dpi_scale: Zoom factor applied to both axes when rasterising.

    Returns:
        The PNG image data, base64-encoded and decoded to an ASCII ``str``.

    Raises:
        IndexError: If ``page_num`` is less than 1. Numbers past the last
            page are rejected by PyMuPDF's own page lookup.
    """
    import base64

    if page_num < 1:
        # doc[page_num - 1] would otherwise become a negative index and
        # silently render a page counted from the end of the document.
        raise IndexError(f"page_num must be >= 1, got {page_num}")

    doc = fitz.open(stream=file_bytes, filetype="pdf")
    try:
        page = doc[page_num - 1]
        pix = page.get_pixmap(matrix=fitz.Matrix(dpi_scale, dpi_scale))
        return base64.b64encode(pix.tobytes("png")).decode("ascii")
    finally:
        # Always release the document, even if rendering fails.
        doc.close()
|
|