|
|
""" |
|
|
Document Loader

Loads and renders PDF and image documents for processing.
Supports page-by-page rendering with configurable DPI.
|
|
""" |
|
|
|
|
|
import os |
|
|
import hashlib |
|
|
from pathlib import Path |
|
|
from typing import List, Tuple, Optional, Union, BinaryIO |
|
|
from dataclasses import dataclass |
|
|
from loguru import logger |
|
|
|
|
|
import numpy as np |
|
|
from PIL import Image |
|
|
|
|
|
|
|
|
# Optional PDF backends. Each flag records whether the corresponding import
# succeeded so the rest of the module can check availability before use.
try:
    import fitz
except ImportError:
    HAS_PYMUPDF = False
    logger.warning("PyMuPDF not installed. PDF support disabled. Install with: pip install pymupdf")
else:
    HAS_PYMUPDF = True

try:
    from pdf2image import convert_from_path, convert_from_bytes
except ImportError:
    # A missing pdf2image is recorded silently; no warning is logged for it.
    HAS_PDF2IMAGE = False
else:
    HAS_PDF2IMAGE = True
|
|
|
|
|
|
|
|
@dataclass
class PageInfo:
    """Metadata record describing a single page of a loaded document.

    This is a plain value holder: it performs no validation and has no
    behavior beyond what ``@dataclass`` generates.
    """

    # Index of the page within its document.
    page_number: int
    # Page width as reported by the loader that created this record.
    width: int
    # Page height as reported by the loader that created this record.
    height: int
    # Resolution the width/height values are associated with.
    dpi: int
    # Whether the page was found to contain extractable text.
    has_text: bool = False
    # Page rotation as reported by the backend; 0 when not provided.
    rotation: int = 0
|
|
|
|
|
|
|
|
@dataclass
class LoadedDocument:
    """
    A loaded document ready for processing.

    Base record for every document kind. It carries identifying metadata
    plus an optional backend handle, and defines the interface
    (``get_page_image`` / ``close``) that concrete document types override.

    Instances can be used as context managers; ``close()`` is called when
    the ``with`` block exits, whether or not an exception was raised::

        with loader.load(path) as doc:
            image = doc.get_page_image(0)
    """

    document_id: str
    source_path: str
    filename: str
    file_type: str
    file_size_bytes: int
    num_pages: int
    pages_info: List[PageInfo]

    # Backend-specific handle owned by the document (None when the document
    # kind needs no handle or after the document has been closed).
    _doc_handle: Optional[object] = None

    def get_page_image(self, page_number: int, dpi: int = 300) -> np.ndarray:
        """Render a specific page as an image.

        Args:
            page_number: Index of the page to render.
            dpi: Requested rendering resolution.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError("Subclasses must implement get_page_image")

    def close(self) -> None:
        """Close document handle and free resources.

        The base implementation does nothing; subclasses override it.
        """

    def __enter__(self) -> "LoadedDocument":
        """Return the document itself so it can be bound by ``with ... as``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Release resources on leaving the ``with`` block.

        Returns None, so any exception raised inside the block propagates.
        """
        self.close()
|
|
|
|
|
|
|
|
class PDFDocument(LoadedDocument):
    """Loaded PDF document with PyMuPDF backend.

    ``_doc_handle`` holds the open PyMuPDF document; it is reset to None by
    ``close()``, after which rendering raises and text extraction returns "".
    """

    def get_page_image(self, page_number: int, dpi: int = 300) -> np.ndarray:
        """Render PDF page as numpy array.

        Args:
            page_number: Zero-based page index.
            dpi: Target resolution; must be positive.

        Returns:
            A writable ``uint8`` array of shape ``(height, width, 3)``.

        Raises:
            RuntimeError: If PyMuPDF is unavailable or the document is closed.
            ValueError: If ``page_number`` is out of range or ``dpi`` is not
                positive.
        """
        if not HAS_PYMUPDF or self._doc_handle is None:
            raise RuntimeError("PyMuPDF not available or document not loaded")

        if page_number < 0 or page_number >= self.num_pages:
            raise ValueError(f"Page {page_number} out of range (0-{self.num_pages - 1})")

        # A zero or negative DPI would produce a degenerate or mirrored
        # transform, so reject it up front with a clear message.
        if dpi <= 0:
            raise ValueError(f"DPI must be positive, got {dpi}")

        page = self._doc_handle[page_number]

        # The zoom factor is expressed relative to a 72 DPI baseline.
        zoom = dpi / 72.0
        pixmap = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom), alpha=False)

        # alpha=False above means three samples per pixel.
        pixels = np.frombuffer(pixmap.samples, dtype=np.uint8).reshape(
            pixmap.height, pixmap.width, 3
        )

        # frombuffer only wraps the sample buffer (read-only when that buffer
        # is a bytes object). Copy so the caller receives an independent,
        # writable array, matching what ImageDocument hands out.
        return pixels.copy()

    def get_page_text(self, page_number: int) -> str:
        """Extract text from PDF page using PyMuPDF.

        Unlike ``get_page_image``, this method does not raise on bad input:
        it returns an empty string when PyMuPDF is unavailable, the document
        is closed, or ``page_number`` is out of range.
        """
        if not HAS_PYMUPDF or self._doc_handle is None:
            return ""

        if page_number < 0 or page_number >= self.num_pages:
            return ""

        return self._doc_handle[page_number].get_text()

    def close(self):
        """Close PDF document.

        Safe to call more than once: the handle is cleared after closing.
        """
        if self._doc_handle is not None:
            self._doc_handle.close()
            self._doc_handle = None
|
|
|
|
|
|
|
|
class ImageDocument(LoadedDocument):
    """Single-page document backed by an image file."""

    # Decoded RGB pixels; filled in on first request and cleared by close().
    _image: Optional[np.ndarray] = None

    def get_page_image(self, page_number: int = 0, dpi: int = 300) -> np.ndarray:
        """Return the decoded image; only page 0 exists.

        The file at ``source_path`` is opened on the first call, converted
        to RGB when it is in another mode, and the resulting array is kept
        for later calls. ``dpi`` is accepted for interface compatibility
        and is not used.

        Raises:
            ValueError: If ``page_number`` is anything other than 0.
        """
        if page_number != 0:
            raise ValueError("Image documents have only one page (page 0)")

        # Fast path: already decoded.
        if self._image is not None:
            return self._image

        with Image.open(self.source_path) as source:
            rgb = source if source.mode == "RGB" else source.convert("RGB")
            self._image = np.array(rgb)

        return self._image

    def close(self):
        """Drop the cached pixel array so it can be garbage collected."""
        self._image = None
|
|
|
|
|
|
|
|
class DocumentLoader:
    """
    Document loader with support for PDF and image files.

    The file kind is chosen from the (lower-cased) file extension via
    ``SUPPORTED_EXTENSIONS``. PDFs are opened with PyMuPDF; images are
    inspected with Pillow.
    """

    # Maps a lower-case file suffix to the document kind used for dispatch.
    SUPPORTED_EXTENSIONS = {
        ".pdf": "pdf",
        ".png": "image",
        ".jpg": "image",
        ".jpeg": "image",
        ".tiff": "image",
        ".tif": "image",
        ".bmp": "image",
        ".webp": "image",
    }

    def __init__(self, default_dpi: int = 300, cache_enabled: bool = True):
        """
        Initialize document loader.

        Args:
            default_dpi: Default DPI for PDF rendering
            cache_enabled: Whether to cache rendered pages
        """
        self.default_dpi = default_dpi
        # NOTE(review): stored but not read anywhere in this class.
        self.cache_enabled = cache_enabled

        # NOTE(review): _load_pdf requires PyMuPDF specifically, so having
        # only pdf2image installed suppresses this warning even though PDF
        # loading will still fail.
        if not HAS_PYMUPDF and not HAS_PDF2IMAGE:
            logger.warning("No PDF backend available. PDF loading will fail.")

    def load(
        self,
        source: Union[str, Path, BinaryIO],
        document_id: Optional[str] = None,
    ) -> LoadedDocument:
        """
        Load a document from file path or file object.

        Args:
            source: File path or file-like object
            document_id: Optional document ID (generated from hash if not provided)

        Returns:
            LoadedDocument instance

        Raises:
            ValueError: If ``source`` is not a path (file-like objects are
                not supported yet) or the file extension is unsupported.
            FileNotFoundError: If the path does not exist.
            RuntimeError: If a PDF is requested and PyMuPDF is missing.
        """
        if not isinstance(source, (str, Path)):
            raise ValueError("File-like objects not yet supported. Please provide a file path.")

        path = Path(source)
        if not path.exists():
            raise FileNotFoundError(f"Document not found: {path}")

        source_path = str(path.absolute())
        filename = path.name
        file_size = path.stat().st_size
        ext = path.suffix.lower()

        if document_id is None:
            document_id = self._generate_doc_id(source_path)

        if ext not in self.SUPPORTED_EXTENSIONS:
            raise ValueError(f"Unsupported file type: {ext}")

        file_type = self.SUPPORTED_EXTENSIONS[ext]
        if file_type == "pdf":
            return self._load_pdf(source_path, filename, file_size, document_id)
        return self._load_image(source_path, filename, file_size, document_id)

    def _load_pdf(
        self,
        source_path: str,
        filename: str,
        file_size: int,
        document_id: str,
    ) -> PDFDocument:
        """Load a PDF document.

        Opens the file with PyMuPDF and records per-page metadata. The
        opened document is handed to the returned ``PDFDocument``, which
        owns it from then on.

        Raises:
            RuntimeError: If PyMuPDF is not installed.
        """
        if not HAS_PYMUPDF:
            raise RuntimeError("PyMuPDF required for PDF loading. Install with: pip install pymupdf")

        logger.info(f"Loading PDF: {filename}")

        doc = fitz.open(source_path)
        try:
            num_pages = len(doc)

            pages_info = []
            for i in range(num_pages):
                page = doc[i]
                rect = page.rect
                pages_info.append(PageInfo(
                    page_number=i,
                    width=int(rect.width),
                    height=int(rect.height),
                    dpi=72,
                    # A page counts as having text if anything but
                    # whitespace can be extracted from it.
                    has_text=bool(page.get_text().strip()),
                    rotation=page.rotation,
                ))

            return PDFDocument(
                document_id=document_id,
                source_path=source_path,
                filename=filename,
                file_type="pdf",
                file_size_bytes=file_size,
                num_pages=num_pages,
                pages_info=pages_info,
                _doc_handle=doc,
            )
        except Exception:
            # Nobody else holds the handle yet, so close it here; otherwise
            # a failure while inspecting pages would leave the file open.
            doc.close()
            raise

    def _load_image(
        self,
        source_path: str,
        filename: str,
        file_size: int,
        document_id: str,
    ) -> ImageDocument:
        """Load an image document.

        Only the image dimensions are read here; the file is closed again
        before returning, and the pixel data is not kept.
        """
        logger.info(f"Loading image: {filename}")

        with Image.open(source_path) as img:
            width, height = img.size

        pages_info = [PageInfo(
            page_number=0,
            width=width,
            height=height,
            dpi=self.default_dpi,
            has_text=False,
        )]

        return ImageDocument(
            document_id=document_id,
            source_path=source_path,
            filename=filename,
            file_type="image",
            file_size_bytes=file_size,
            num_pages=1,
            pages_info=pages_info,
        )

    def _generate_doc_id(self, source_path: str) -> str:
        """Generate document ID from file path, modification time and size.

        Returns:
            The first 16 hex characters of the SHA-256 digest of
            ``"<path>:<mtime>:<size>"``.
        """
        stat = os.stat(source_path)
        content = f"{source_path}:{stat.st_mtime}:{stat.st_size}"
        return hashlib.sha256(content.encode()).hexdigest()[:16]
|
|
|
|
|
|
|
|
|
|
|
# Module-wide loader instance, created on first use by get_loader().
_default_loader: Optional[DocumentLoader] = None


def get_loader() -> DocumentLoader:
    """Return the shared default loader, creating it on the first call."""
    global _default_loader
    if _default_loader is not None:
        return _default_loader

    _default_loader = DocumentLoader()
    return _default_loader
|
|
|
|
|
|
|
|
def load_document(
    source: Union[str, Path, BinaryIO],
    document_id: Optional[str] = None,
) -> LoadedDocument:
    """Load ``source`` through the shared default loader.

    Args:
        source: Passed unchanged to the default loader's ``load``.
        document_id: Optional explicit ID, passed through unchanged.

    Returns:
        Whatever the default loader's ``load`` returns.
    """
    loader = get_loader()
    return loader.load(source, document_id)
|
|
|
|
|
|
|
|
def load_pdf(source: Union[str, Path], document_id: Optional[str] = None) -> PDFDocument:
    """Load a PDF document.

    Args:
        source: Path to the file.
        document_id: Optional explicit document ID.

    Returns:
        The loaded ``PDFDocument``.

    Raises:
        ValueError: If the loaded document is not a PDF. The rejected
            document is closed before the error is raised.
    """
    doc = load_document(source, document_id)
    if isinstance(doc, PDFDocument):
        return doc

    # The caller never receives this document, so release it here.
    file_type = doc.file_type
    doc.close()
    raise ValueError(f"Expected PDF, got {file_type}")
|
|
|
|
|
|
|
|
def load_image(source: Union[str, Path], document_id: Optional[str] = None) -> ImageDocument:
    """Load an image document.

    Args:
        source: Path to the file.
        document_id: Optional explicit document ID.

    Returns:
        The loaded ``ImageDocument``.

    Raises:
        ValueError: If the loaded document is not an image. The rejected
            document is closed before the error is raised.
    """
    doc = load_document(source, document_id)
    if isinstance(doc, ImageDocument):
        return doc

    # A PDF loaded by mistake holds an open backend handle that the caller
    # can never reach; close it instead of leaving it open.
    file_type = doc.file_type
    doc.close()
    raise ValueError(f"Expected image, got {file_type}")
|
|
|
|
|
|
|
|
def render_page(
    document: LoadedDocument,
    page_number: int,
    dpi: int = 300,
) -> np.ndarray:
    """Render one page of ``document`` by delegating to its ``get_page_image``.

    Args:
        document: The document to render from.
        page_number: Page index, forwarded unchanged.
        dpi: Requested resolution, forwarded unchanged.

    Returns:
        The array produced by ``document.get_page_image``.
    """
    image = document.get_page_image(page_number, dpi)
    return image
|
|
|