|
|
""" |
|
|
PDF Document Loading and Rendering |
|
|
|
|
|
Uses PyMuPDF (fitz) for PDF operations. |
|
|
Falls back to pdf2image + poppler if needed. |
|
|
""" |
|
|
|
|
|
import logging |
|
|
from pathlib import Path |
|
|
from typing import Iterator, List, Optional, Tuple, Union |
|
|
|
|
|
import numpy as np |
|
|
from PIL import Image |
|
|
|
|
|
from .base import ( |
|
|
DocumentFormat, |
|
|
DocumentInfo, |
|
|
DocumentLoader, |
|
|
PageInfo, |
|
|
PageRenderer, |
|
|
RenderOptions, |
|
|
) |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class PDFLoader(DocumentLoader):
    """
    PDF document loader using PyMuPDF.

    Opens a PDF, extracts document-level metadata and builds one
    ``PageInfo`` record per page. The underlying ``fitz`` document stays
    open until :meth:`close` is called or another file is loaded.
    """

    def __init__(self):
        self._doc = None
        self._info: Optional[DocumentInfo] = None
        self._path: Optional[Path] = None

    def load(self, path: Union[str, Path]) -> DocumentInfo:
        """
        Load a PDF and extract its metadata.

        Any previously loaded document is closed first. If extraction fails
        after the file has been opened, the file is closed again before the
        error propagates, so the loader is never left half-initialised.

        Args:
            path: Location of the PDF file.

        Returns:
            The ``DocumentInfo`` describing the loaded document.

        Raises:
            ImportError: PyMuPDF is not installed.
            FileNotFoundError: ``path`` does not exist.
        """
        try:
            import fitz
        except ImportError as exc:
            raise ImportError(
                "PyMuPDF (fitz) is required for PDF loading. "
                "Install with: pip install pymupdf"
            ) from exc

        # Validate before touching any state so a bad path does not clobber
        # the bookkeeping of a document that is still loaded.
        pdf_path = Path(path)
        if not pdf_path.exists():
            raise FileNotFoundError(f"PDF file not found: {pdf_path}")

        # Drop the previous document together with its now-stale metadata.
        self.close()
        self._info = None
        self._path = pdf_path

        self._doc = fitz.open(str(pdf_path))
        try:
            self._info = self._build_info(self._doc, pdf_path)
        except Exception:
            # Do not keep a file handle open for a document we failed to
            # describe.
            self.close()
            raise

        return self._info

    def _build_info(self, doc, pdf_path: Path) -> DocumentInfo:
        """Scan every page of ``doc`` once and assemble its DocumentInfo."""
        metadata = doc.metadata or {}

        pages = []
        has_text_layer = False
        has_images = False
        has_annotations = False

        for page_number, page in enumerate(doc, start=1):
            rect = page.rect

            page_has_text = bool(page.get_text().strip())
            page_has_images = bool(page.get_images(full=True))

            has_text_layer = has_text_layer or page_has_text
            has_images = has_images or page_has_images
            # Page.annots() returns a generator, so len() on it raises
            # TypeError; first_annot is None when the page has none.
            has_annotations = has_annotations or page.first_annot is not None

            pages.append(
                PageInfo(
                    page_number=page_number,
                    # Page rectangles are in points, i.e. pixels at 72 DPI.
                    width_pixels=int(rect.width),
                    height_pixels=int(rect.height),
                    width_points=rect.width,
                    height_points=rect.height,
                    dpi=72,
                    rotation=page.rotation,
                    has_text=page_has_text,
                    has_images=page_has_images,
                )
            )

        # Images on the pages but no extractable text anywhere: treat the
        # document as a scan.
        is_scanned = has_images and not has_text_layer

        return DocumentInfo(
            path=pdf_path,
            format=DocumentFormat.PDF,
            num_pages=len(doc),
            pages=pages,
            title=metadata.get("title"),
            author=metadata.get("author"),
            subject=metadata.get("subject"),
            creator=metadata.get("creator"),
            creation_date=metadata.get("creationDate"),
            modification_date=metadata.get("modDate"),
            file_size_bytes=pdf_path.stat().st_size,
            is_encrypted=doc.is_encrypted,
            has_text_layer=has_text_layer,
            is_scanned=is_scanned,
            has_forms=doc.is_form_pdf,
            has_annotations=has_annotations,
        )

    def close(self) -> None:
        """Close the PDF document. Safe to call when nothing is loaded."""
        if self._doc is not None:
            self._doc.close()
            self._doc = None

    def is_loaded(self) -> bool:
        """Check if a document is loaded."""
        return self._doc is not None

    @property
    def info(self) -> Optional[DocumentInfo]:
        """Get document info (``None`` until a document has been loaded)."""
        return self._info

    @property
    def document(self):
        """Get the underlying fitz document (for advanced use)."""
        return self._doc
|
|
|
|
|
|
|
|
class PDFRenderer(PageRenderer):
    """
    PDF page renderer using PyMuPDF.

    Renders pages of the document held by a ``PDFLoader`` to numpy image
    arrays at a requested DPI.
    """

    def __init__(self, loader: PDFLoader):
        self._loader = loader

    def render_page(
        self,
        page_number: int,
        options: Optional[RenderOptions] = None
    ) -> np.ndarray:
        """
        Render a PDF page to an image.

        Args:
            page_number: 1-based page number.
            options: Render settings; a default ``RenderOptions()`` is used
                when omitted.

        Returns:
            A ``uint8`` array shaped ``(height, width)`` when
            ``options.color_mode`` is ``"L"``, otherwise
            ``(height, width, channels)`` with 4 channels for ``"RGBA"``
            and 3 for every other mode. The array is built with
            ``np.frombuffer`` over the pixmap samples, so copy it before
            modifying it in place.

        Raises:
            RuntimeError: No document is loaded.
            ValueError: ``page_number`` is outside ``1..page count``.
        """
        if not self._loader.is_loaded():
            raise RuntimeError("No document loaded")

        options = options or RenderOptions()
        doc = self._loader.document

        if page_number < 1 or page_number > len(doc):
            raise ValueError(f"Invalid page number: {page_number}")

        page = doc[page_number - 1]

        # PDF user space is 72 units per inch, so this ratio is the scale
        # factor that yields the requested DPI.
        zoom = options.dpi / 72.0
        matrix = self._get_matrix(zoom)

        grayscale = options.color_mode == "L"
        if grayscale:
            colorspace = self._get_grayscale_colorspace()
        else:
            colorspace = self._get_rgb_colorspace()

        try:
            pixmap = page.get_pixmap(
                matrix=matrix,
                colorspace=colorspace,
                alpha=options.color_mode == "RGBA"
            )

            samples = np.frombuffer(pixmap.samples, dtype=np.uint8)
            if grayscale:
                return samples.reshape(pixmap.height, pixmap.width)
            # pixmap.n is the number of components per pixel: 3 for RGB,
            # 4 once the alpha channel is requested.
            return samples.reshape(pixmap.height, pixmap.width, pixmap.n)

        except Exception as e:
            logger.error("Error rendering page %s: %s", page_number, e)
            raise

    def _get_matrix(self, zoom: float):
        """Get transformation matrix for rendering (uniform x/y scale)."""
        import fitz
        return fitz.Matrix(zoom, zoom)

    def _get_rgb_colorspace(self):
        """Get RGB colorspace."""
        import fitz
        return fitz.csRGB

    def _get_grayscale_colorspace(self):
        """Get grayscale colorspace."""
        import fitz
        return fitz.csGRAY

    def render_pages(
        self,
        page_numbers: Optional[List[int]] = None,
        options: Optional[RenderOptions] = None
    ) -> Iterator[Tuple[int, np.ndarray]]:
        """
        Render multiple pages lazily.

        This is a generator: nothing (including the "document loaded"
        check) runs until the first item is requested.

        Args:
            page_numbers: 1-based page numbers to render, in the order
                given. ``None`` renders every page of the document.
            options: Render settings forwarded to :meth:`render_page`.

        Yields:
            ``(page_number, image)`` pairs.

        Raises:
            RuntimeError: No document is loaded.
            ValueError: A requested page number is out of range.
        """
        if not self._loader.is_loaded():
            raise RuntimeError("No document loaded")

        if page_numbers is None:
            # Ask the open document for its length instead of loader.info,
            # which may be None.
            page_numbers = range(1, len(self._loader.document) + 1)

        for page_num in page_numbers:
            yield page_num, self.render_page(page_num, options)
|
|
|
|
|
|
|
|
class PDFTextExtractor:
    """
    Extract text and text positions from PDF.

    Useful for PDFs with embedded text layer. All page numbers are 1-based.
    """

    def __init__(self, loader: "PDFLoader"):
        self._loader = loader

    def _get_page(self, page_number: int):
        """Return the page object for a 1-based page number.

        Raises:
            RuntimeError: No document is loaded.
            ValueError: ``page_number`` is below 1.
        """
        if not self._loader.is_loaded():
            raise RuntimeError("No document loaded")

        # Without this guard, page 0 becomes index -1 and silently returns
        # the last page. Numbers past the end are still rejected by the
        # document's own indexing, as before.
        if page_number < 1:
            raise ValueError(f"Invalid page number: {page_number}")

        return self._loader.document[page_number - 1]

    def extract_text(self, page_number: int) -> str:
        """Extract plain text from a page.

        Raises:
            RuntimeError: No document is loaded.
            ValueError: ``page_number`` is below 1.
        """
        return self._get_page(page_number).get_text()

    def extract_text_with_positions(
        self,
        page_number: int
    ) -> List[dict]:
        """
        Extract text spans with bounding box positions.

        Only text blocks (type 0) are reported; other block types are
        skipped. One entry is produced per span, not per word.

        Returns list of dicts with:
        - text: The span's text content
        - bbox: (x0, y0, x1, y1) in page coordinates
        - block_no: Block number
        - line_no: Line index within block
        - word_no: Span index within line (key name kept for
          compatibility)
        - font: Font name of the span
        - size: Font size of the span
        - flags: Font flags of the span

        Raises:
            RuntimeError: No document is loaded.
            ValueError: ``page_number`` is below 1.
        """
        page = self._get_page(page_number)
        text_dict = page.get_text("dict")

        words = []
        for block in text_dict.get("blocks", []):
            if block.get("type") != 0:
                continue

            block_no = block.get("number", 0)

            for line_no, line in enumerate(block.get("lines", [])):
                for word_no, span in enumerate(line.get("spans", [])):
                    words.append({
                        "text": span.get("text", ""),
                        "bbox": span.get("bbox", (0, 0, 0, 0)),
                        "block_no": block_no,
                        "line_no": line_no,
                        "word_no": word_no,
                        "font": span.get("font", ""),
                        "size": span.get("size", 0),
                        "flags": span.get("flags", 0),
                    })

        return words

    def get_page_dimensions(self, page_number: int) -> Tuple[float, float]:
        """Get page dimensions as ``(width, height)`` in points.

        Raises:
            RuntimeError: No document is loaded.
            ValueError: ``page_number`` is below 1.
        """
        rect = self._get_page(page_number).rect
        return rect.width, rect.height
|
|
|
|
|
|
|
|
def load_pdf(path: Union[str, Path]) -> Tuple[PDFLoader, PDFRenderer]:
    """
    Convenience function to load a PDF.

    Creates a ``PDFLoader``, loads ``path`` into it and wraps it in a
    ``PDFRenderer``. The caller owns the returned loader and should call
    ``loader.close()`` when finished with the document.

    Args:
        path: Location of the PDF file.

    Returns:
        Tuple of (loader, renderer)

    Raises:
        Whatever ``PDFLoader.load`` raises; the loader is closed before the
        exception propagates.

    Example:
        loader, renderer = load_pdf("document.pdf")
        info = loader.info
        for page_num in range(1, info.num_pages + 1):
            image = renderer.render_page(page_num)
    """
    loader = PDFLoader()
    try:
        loader.load(path)
    except Exception:
        # The loader never reaches the caller on failure, so release any
        # document it managed to open here.
        loader.close()
        raise
    return loader, PDFRenderer(loader)
|
|
|