anthropic-pdf-parser / pdf_atomic_parser.py
algorembrant's picture
Upload 6 files
0ee11bd verified
"""
pdf_atomic_parser.py
====================
Author : algorembrant
Version : 1.0.0
License : MIT
DESCRIPTION
-----------
Atomically parse and understand complex PDF documents using Claude (default model: claude-opus-4-6).
Handles equations, graphs, algorithms, unique drawings, tables, multi-column
layouts, and 100+ page documents without hallucination. Designed for local
agent pipelines.
CAPABILITIES
------------
- Native PDF document API (base64) with prompt caching
- Page-as-image fallback using PyMuPDF at 300 DPI for max fidelity
- LaTeX equation extraction
- Table extraction (Markdown + JSON)
- Algorithm and pseudocode extraction
- Figure and graph semantic description
- Multi-column and complex layout handling
- Chunked processing for 100+ page documents
- SQLite-backed cache to avoid re-processing pages
- Structured JSON output per page and full document
- Agent-callable interface (AgentPDFInterface)
- Async batch processing for speed
USAGE COMMANDS
--------------
# Parse a PDF and save structured JSON
python pdf_atomic_parser.py parse document.pdf
# Parse with verbose output
python pdf_atomic_parser.py parse document.pdf --verbose
# Parse specific page range
python pdf_atomic_parser.py parse document.pdf --pages 1-20
# Extract only equations (LaTeX)
python pdf_atomic_parser.py extract-equations document.pdf
# Extract only tables (Markdown)
python pdf_atomic_parser.py extract-tables document.pdf
# Extract only algorithms/code blocks
python pdf_atomic_parser.py extract-algorithms document.pdf
# Extract figures and graph descriptions
python pdf_atomic_parser.py extract-figures document.pdf
# Full atomic extraction (all content types) to output dir
python pdf_atomic_parser.py atomic document.pdf --output ./results/
# Query a parsed PDF (question answering over the cached parse; re-parses if not cached)
python pdf_atomic_parser.py query document.pdf "What is the main theorem?"
# Use faster/cheaper model (Sonnet instead of Opus)
python pdf_atomic_parser.py parse document.pdf --model sonnet
# Use page-as-image mode (higher fidelity for scanned/complex PDFs)
python pdf_atomic_parser.py parse document.pdf --mode image
# Use native PDF mode (default, faster)
python pdf_atomic_parser.py parse document.pdf --mode native
# Set chunk size for large PDFs (default 20 pages per chunk)
python pdf_atomic_parser.py parse document.pdf --chunk-size 10
# Clear cache for a document
python pdf_atomic_parser.py clear-cache document.pdf
# Show cache stats
python pdf_atomic_parser.py cache-stats
# List all cached documents
python pdf_atomic_parser.py list-cache
# Batch process a directory of PDFs
python pdf_atomic_parser.py batch ./pdf_folder/ --output ./results/
# Export parse results as Markdown report
python pdf_atomic_parser.py parse document.pdf --format markdown
# Export as plain text
python pdf_atomic_parser.py parse document.pdf --format text
# Show token usage estimate before parsing
python pdf_atomic_parser.py estimate document.pdf
# Agent interface example (programmatic)
# from pdf_atomic_parser import AgentPDFInterface
# agent = AgentPDFInterface()
# result = agent.parse("document.pdf")
# equations = agent.get_equations("document.pdf")
"""
from __future__ import annotations

import argparse
import asyncio
import base64
import contextlib
import hashlib
import json
import logging
import os
import sqlite3
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Dict, Generator, Iterator, List, Optional, Tuple

import anthropic
import fitz  # PyMuPDF
from rich.console import Console
from rich.logging import RichHandler
from rich.progress import (
    BarColumn,
    MofNCompleteColumn,
    Progress,
    SpinnerColumn,
    TaskProgressColumn,
    TextColumn,
    TimeElapsedColumn,
    TimeRemainingColumn,
)
from rich.table import Table
from tqdm import tqdm
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Model identifiers. AtomicPDFParser._resolve_model maps the short aliases
# "opus", "sonnet" and "haiku" onto these strings.
DEFAULT_MODEL_OPUS = "claude-opus-4-6"
DEFAULT_MODEL_SONNET = "claude-sonnet-4-6"
DEFAULT_MODEL_HAIKU = "claude-haiku-4-5-20251001"
# Passed as max_tokens on every page-extraction request.
MAX_TOKENS_OUTPUT = 8192
CHUNK_SIZE_DEFAULT = 20 # pages per API call
IMAGE_DPI = 300 # render DPI for page-as-image mode
# NOTE(review): the two native-API limits below are declared but not enforced
# anywhere in the visible part of this file -- confirm whether any caller
# checks them.
MAX_PDF_SIZE_BYTES = 32 * 1024 * 1024 # 32 MB native API limit
MAX_PDF_PAGES_NATIVE = 100 # native API page cap per request
# File name of the SQLite database created inside the cache directory.
CACHE_DB_NAME = ".pdf_parser_cache.db"
LOG_FORMAT = "%(message)s"
# Shared Rich console: used by the log handler below and by the progress bar
# and summary-table output.
console = Console()
# Logging is routed through Rich at WARNING level; constructing the parser
# with verbose=True lowers this module's logger to DEBUG.
logging.basicConfig(
    level=logging.WARNING,
    format=LOG_FORMAT,
    handlers=[RichHandler(console=console, rich_tracebacks=True, show_path=False)],
)
logger = logging.getLogger("pdf_atomic_parser")
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
@dataclass
class EquationBlock:
    """One equation extracted from a page."""

    page: int  # 1-indexed page number the equation was found on
    index: int  # model-reported index, or the position in the page's list
    latex: str  # LaTeX source as returned by the model
    description: str  # what the equation represents
    inline: bool = False  # True for inline math, False for display/block math
@dataclass
class TableBlock:
    """One table extracted from a page, in Markdown and list-of-dicts form."""

    page: int  # 1-indexed page number the table was found on
    index: int  # model-reported index, or the position in the page's list
    markdown: str  # Markdown rendering of the table
    json_data: List[Dict]  # the same table as a list of row dicts
    caption: str = ""  # table caption, empty when there is none
@dataclass
class AlgorithmBlock:
    """One algorithm, pseudocode listing or code block extracted from a page."""

    page: int  # 1-indexed page number the algorithm was found on
    index: int  # model-reported index, or the position in the page's list
    name: str  # algorithm name; "Algorithm N" when the model gives none
    language: str  # e.g. "pseudocode", "python"; defaults to "pseudocode" on parse
    code: str  # algorithm text as returned by the model
    description: str  # what the algorithm does
@dataclass
class FigureBlock:
    """Textual description of one figure, chart or drawing on a page."""

    page: int  # 1-indexed page number the figure was found on
    index: int  # model-reported index, or the position in the page's list
    # One of the type labels listed in PAGE_EXTRACTION_PROMPT (chart, bar_chart,
    # line_chart, diagram, drawing, photograph, ...); "other" when missing.
    figure_type: str
    description: str  # semantic description of the visual
    data_summary: str  # axes, units, trends and key values for quantitative figures
    caption: str = ""  # figure caption, empty when there is none
@dataclass
class PageResult:
    """Structured extraction result for a single page.

    Instances are serialized with ``dataclasses.asdict`` into the page cache,
    so field names double as the cache schema.
    """

    page_number: int  # 1-indexed page number within the source PDF
    raw_text: str  # page text as returned by the model (or a "[PARSE ERROR ...]" marker)
    summary: str  # short summary of the page
    equations: List[EquationBlock] = field(default_factory=list)
    tables: List[TableBlock] = field(default_factory=list)
    algorithms: List[AlgorithmBlock] = field(default_factory=list)
    figures: List[FigureBlock] = field(default_factory=list)
    section_headers: List[str] = field(default_factory=list)
    references: List[str] = field(default_factory=list)
    keywords: List[str] = field(default_factory=list)
    layout_notes: str = ""  # columns, footnotes and other layout remarks
    processing_mode: str = "native"  # "native" (PDF document) or "image" (rendered PNG)
    tokens_used: int = 0  # this page's share of the request's input + output tokens
    processing_time_s: float = 0.0  # wall-clock seconds attributed to this page
@dataclass
class DocumentResult:
    """Aggregate result of parsing a document (or a page range of it)."""

    document_path: str  # resolved path of the parsed PDF
    document_hash: str  # first 16 hex characters of the file's SHA-256
    total_pages: int  # number of pages in the PDF
    pages_processed: int  # number of entries in page_results
    model: str  # resolved model identifier used for the parse
    processing_mode: str  # parser mode, "native" or "image"
    # The next four fields come from a separate metadata request over the first
    # parsed pages; they are empty when that request fails.
    title: str
    authors: List[str]
    abstract: str
    document_summary: str
    page_results: List[PageResult] = field(default_factory=list)
    # Totals are sums over page_results.
    total_equations: int = 0
    total_tables: int = 0
    total_algorithms: int = 0
    total_figures: int = 0
    total_tokens_used: int = 0
    total_processing_time_s: float = 0.0  # wall-clock seconds for the whole parse call
# ---------------------------------------------------------------------------
# Cache layer
# ---------------------------------------------------------------------------
class ParseCache:
    """SQLite-backed cache for parsed page results.

    Rows in ``page_cache`` are keyed by (document hash, 1-indexed page number,
    model, mode) and hold the JSON-serialized ``PageResult``.

    Every operation uses its own short-lived connection. The connection is
    wrapped in ``contextlib.closing`` because sqlite3's own context manager
    only commits or rolls back the transaction; it does not close the
    connection.
    """

    def __init__(self, cache_dir: Path):
        """Create *cache_dir* if needed and make sure the cache tables exist."""
        cache_dir.mkdir(parents=True, exist_ok=True)
        self.db_path = cache_dir / CACHE_DB_NAME
        self._init_db()

    def _init_db(self) -> None:
        """Create the ``page_cache`` and ``doc_meta`` tables if they are missing."""
        with contextlib.closing(self._connect()) as conn, conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS page_cache (
                    doc_hash TEXT NOT NULL,
                    page_num INTEGER NOT NULL,
                    model TEXT NOT NULL,
                    mode TEXT NOT NULL,
                    result_json TEXT NOT NULL,
                    created_at REAL NOT NULL,
                    PRIMARY KEY (doc_hash, page_num, model, mode)
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS doc_meta (
                    doc_hash TEXT PRIMARY KEY,
                    doc_path TEXT NOT NULL,
                    total_pages INTEGER NOT NULL,
                    created_at REAL NOT NULL
                )
            """)

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection in WAL mode; the caller must close it."""
        conn = sqlite3.connect(self.db_path, timeout=30)
        try:
            conn.execute("PRAGMA journal_mode=WAL")
        except sqlite3.Error:
            # Do not leak the handle when the pragma itself fails.
            conn.close()
            raise
        return conn

    @staticmethod
    def file_hash(path: Path) -> str:
        """Return the first 16 hex characters of the file's SHA-256 digest."""
        h = hashlib.sha256()
        with open(path, "rb") as fh:
            for chunk in iter(lambda: fh.read(65536), b""):
                h.update(chunk)
        return h.hexdigest()[:16]

    def get_page(self, doc_hash: str, page_num: int, model: str, mode: str) -> Optional[PageResult]:
        """Return the cached result for the given key, or None on a miss.

        A row that cannot be decoded into a ``PageResult`` (corrupt JSON or a
        field set that no longer matches the dataclasses) is logged and
        treated as a miss so the page is simply parsed again.
        """
        with contextlib.closing(self._connect()) as conn, conn:
            row = conn.execute(
                "SELECT result_json FROM page_cache WHERE doc_hash=? AND page_num=? AND model=? AND mode=?",
                (doc_hash, page_num, model, mode),
            ).fetchone()
        if row is None:
            return None
        try:
            return self._deserialize_page(json.loads(row[0]))
        except (json.JSONDecodeError, TypeError) as exc:
            logger.warning(
                "Ignoring unreadable cache entry for %s page %d: %s", doc_hash, page_num, exc
            )
            return None

    def set_page(self, doc_hash: str, result: PageResult, model: str, mode: str) -> None:
        """Insert or overwrite the cache row for ``result.page_number``."""
        with contextlib.closing(self._connect()) as conn, conn:
            conn.execute(
                "INSERT OR REPLACE INTO page_cache VALUES (?,?,?,?,?,?)",
                (doc_hash, result.page_number, model, mode,
                 json.dumps(self._serialize_page(result)), time.time()),
            )

    def clear_document(self, doc_hash: str) -> int:
        """Delete all rows for *doc_hash*; return the number of page rows removed."""
        with contextlib.closing(self._connect()) as conn, conn:
            cur = conn.execute("DELETE FROM page_cache WHERE doc_hash=?", (doc_hash,))
            conn.execute("DELETE FROM doc_meta WHERE doc_hash=?", (doc_hash,))
            return cur.rowcount

    def stats(self) -> Dict[str, Any]:
        """Return cached page count, distinct document count and DB size in MB."""
        with contextlib.closing(self._connect()) as conn, conn:
            total = conn.execute("SELECT COUNT(*) FROM page_cache").fetchone()[0]
            docs = conn.execute("SELECT COUNT(DISTINCT doc_hash) FROM page_cache").fetchone()[0]
        size = self.db_path.stat().st_size if self.db_path.exists() else 0
        return {"total_cached_pages": total, "unique_documents": docs, "cache_size_mb": round(size / 1e6, 2)}

    def list_documents(self) -> List[Dict]:
        """Return one summary dict per cached document hash."""
        with contextlib.closing(self._connect()) as conn, conn:
            rows = conn.execute("""
                SELECT doc_hash, COUNT(*) as pages, MIN(created_at) as first_seen
                FROM page_cache GROUP BY doc_hash
            """).fetchall()
        return [{"hash": r[0], "cached_pages": r[1], "first_seen": r[2]} for r in rows]

    # -- serialization helpers -----------------------------------------------
    @staticmethod
    def _serialize_page(p: PageResult) -> Dict:
        """Convert a ``PageResult`` (and its nested blocks) into plain dicts."""
        return asdict(p)

    @staticmethod
    def _deserialize_page(d: Dict) -> PageResult:
        """Rebuild a ``PageResult`` from the dict produced by ``_serialize_page``."""
        data = dict(d)  # shallow copy: leave the caller's dict untouched
        data["equations"] = [EquationBlock(**e) for e in data.get("equations") or []]
        data["tables"] = [TableBlock(**t) for t in data.get("tables") or []]
        data["algorithms"] = [AlgorithmBlock(**a) for a in data.get("algorithms") or []]
        data["figures"] = [FigureBlock(**f) for f in data.get("figures") or []]
        return PageResult(**data)
# ---------------------------------------------------------------------------
# PDF utilities
# ---------------------------------------------------------------------------
class PDFDocument:
    """Thin wrapper around fitz.Document with chunking helpers.

    Usable as a context manager; leaving the ``with`` block closes the
    underlying document.
    """

    def __init__(self, path: Path):
        """Open the PDF at *path* and record its page count."""
        self.path = path
        self._doc = fitz.open(str(path))
        self.total_pages = len(self._doc)

    @property
    def file_size_bytes(self) -> int:
        """Size of the PDF file on disk, in bytes."""
        return self.path.stat().st_size

    def get_chunk_ranges(self, chunk_size: int) -> List[Tuple[int, int]]:
        """Return list of (start_page_0indexed, end_page_exclusive) tuples.

        Raises
        ------
        ValueError
            If *chunk_size* is not a positive integer.
        """
        if chunk_size < 1:
            raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")
        return [
            (start, min(start + chunk_size, self.total_pages))
            for start in range(0, self.total_pages, chunk_size)
        ]

    def get_chunk_as_pdf_bytes(self, start: int, end: int) -> bytes:
        """Extract pages [start, end) into a new in-memory PDF."""
        sub = fitz.open()
        try:
            # insert_pdf takes an inclusive last page, hence end - 1.
            sub.insert_pdf(self._doc, from_page=start, to_page=end - 1)
            return sub.write()
        finally:
            # Release the temporary document even when insertion or writing fails.
            sub.close()

    def get_page_as_png_bytes(self, page_idx: int, dpi: int = IMAGE_DPI) -> bytes:
        """Render a single page to PNG bytes at given DPI."""
        page = self._doc[page_idx]
        # PDF user space is 72 units per inch, so dpi / 72 is the zoom factor.
        mat = fitz.Matrix(dpi / 72, dpi / 72)
        pix = page.get_pixmap(matrix=mat, alpha=False)
        return pix.tobytes("png")

    def close(self) -> None:
        """Close the underlying fitz document."""
        self._doc.close()

    def __enter__(self):
        return self

    def __exit__(self, *_):
        self.close()
# ---------------------------------------------------------------------------
# Extraction prompts
# ---------------------------------------------------------------------------
# System prompt passed as system= on the page-extraction and document-metadata
# requests; it pins the model to JSON-only, non-speculative output.
SYSTEM_PROMPT = """You are an expert scientific document analyst specializing in atomically
parsing complex academic and technical PDFs. Your extractions must be:
- Complete: capture every equation, table, figure, and algorithm
- Faithful: never invent or hallucinate content
- Precise: reproduce equations in proper LaTeX
- Structured: respond only with valid JSON matching the schema provided
Do NOT add prose outside the JSON response. If a field has no content, use an
empty list [] or empty string "" rather than null."""
# User prompt describing the per-page JSON schema. The parser appends a
# chunk-specific suffix (page range, "return a JSON array with exactly N page
# objects") before sending it. The keys listed here are the ones
# _dict_to_page_result reads back from the response.
PAGE_EXTRACTION_PROMPT = """\
Atomically parse the provided PDF page(s) and return a JSON object that matches
this schema exactly:
{
"raw_text": "<full verbatim text extracted from page, preserving paragraphs>",
"summary": "<2-4 sentence factual summary of this page>",
"section_headers": ["<header string>", ...],
"keywords": ["<important technical term>", ...],
"layout_notes": "<describe columns, special layouts, footnotes, margin notes>",
"equations": [
{
"index": <int starting at 0>,
"latex": "<complete LaTeX representation>",
"description": "<what this equation represents>",
"inline": <true if inline, false if display/block>
}
],
"tables": [
{
"index": <int>,
"markdown": "<GitHub-flavored Markdown table>",
"json_data": [{"col1": "val", ...}, ...],
"caption": "<table caption or empty string>"
}
],
"algorithms": [
{
"index": <int>,
"name": "<algorithm name or Algorithm N>",
"language": "<pseudocode | python | cpp | generic | etc.>",
"code": "<verbatim algorithm text, preserve indentation>",
"description": "<what this algorithm does>"
}
],
"figures": [
{
"index": <int>,
"figure_type": "<chart | bar_chart | line_chart | scatter_plot | histogram | diagram | flowchart | neural_network | tree | graph | drawing | photograph | heatmap | 3d_plot | other>",
"description": "<detailed semantic description of the visual>",
"data_summary": "<describe axes, units, trend, key values if quantitative>",
"caption": "<figure caption or empty string>"
}
],
"references": ["<any in-text citation or bibliography entry on this page>"]
}
Rules:
1. Every equation MUST have LaTeX. Use \\frac, \\sum, \\int, \\mathbf etc. for proper notation.
2. Tables must be fully reproduced in both Markdown and as list-of-dicts.
3. Algorithms must preserve all steps, loops, conditions verbatim.
4. Figures: describe them as if for a blind reader — quantitative values, trends, colors, labels.
5. raw_text must include ALL text visible on the page, including headers, footers, captions.
6. Do NOT summarize or truncate any content.
"""
# User prompt for the document-level metadata request. The parser follows it
# with a text sample built from the first parsed pages.
DOCUMENT_META_PROMPT = """\
Based on the document pages you have seen, extract high-level metadata as JSON:
{
"title": "<document title>",
"authors": ["<author name>", ...],
"abstract": "<full abstract text or empty string if none>",
"document_summary": "<comprehensive 5-8 sentence summary of the entire document>"
}
Respond with valid JSON only.
"""
# ---------------------------------------------------------------------------
# Core parser
# ---------------------------------------------------------------------------
class AtomicPDFParser:
    """
    Core parser that sends PDF chunks or page images to the Claude API
    and extracts structured content atomically.

    Pages are requested in chunks of ``chunk_size``. Each successfully parsed
    page is stored in the SQLite cache keyed by document hash, page number,
    model and mode, so repeated calls only re-request pages that are missing.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        model: str = DEFAULT_MODEL_OPUS,
        mode: str = "native",  # "native" | "image"
        chunk_size: int = CHUNK_SIZE_DEFAULT,
        cache_dir: Optional[Path] = None,
        verbose: bool = False,
        max_workers: int = 4,
    ):
        """
        Parameters
        ----------
        api_key : API key; falls back to the ANTHROPIC_API_KEY environment variable.
        model : Model id or one of the aliases "opus", "sonnet", "haiku".
        mode : "image" renders pages to PNG; any other value uses native PDF upload.
        chunk_size : Pages per API request; must be >= 1.
        cache_dir : Cache directory; defaults to ~/.cache/pdf_atomic_parser.
        verbose : When true, this module's logger is set to DEBUG.
        max_workers : Stored on the instance; not used by the methods of this class.

        Raises
        ------
        ValueError
            If chunk_size is not positive or no API key is available.
        """
        # Fail here rather than later inside range() with an opaque message.
        if chunk_size < 1:
            raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")
        self.api_key = api_key or os.environ.get("ANTHROPIC_API_KEY", "")
        self.model = self._resolve_model(model)
        self.mode = mode
        self.chunk_size = chunk_size
        self.verbose = verbose
        self.max_workers = max_workers
        if not self.api_key:
            raise ValueError(
                "ANTHROPIC_API_KEY environment variable not set. "
                "Export it or pass api_key= to AtomicPDFParser."
            )
        self.client = anthropic.Anthropic(api_key=self.api_key)
        cache_path = cache_dir or Path.home() / ".cache" / "pdf_atomic_parser"
        self.cache = ParseCache(cache_path)
        if verbose:
            logger.setLevel(logging.DEBUG)

    @staticmethod
    def _resolve_model(alias: str) -> str:
        """Map a short alias to a model id; unknown values pass through unchanged."""
        mapping = {
            "opus": DEFAULT_MODEL_OPUS,
            "sonnet": DEFAULT_MODEL_SONNET,
            "haiku": DEFAULT_MODEL_HAIKU,
        }
        return mapping.get(alias.lower(), alias)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def parse(
        self,
        pdf_path: str | Path,
        page_range: Optional[Tuple[int, int]] = None,
    ) -> DocumentResult:
        """
        Parse the entire document (or a page range) atomically.

        Parameters
        ----------
        pdf_path : Path to the PDF file.
        page_range : Optional (start, end) 1-indexed inclusive page numbers.
            The range is clamped to the document; a range that selects no
            pages yields a result with zero processed pages.

        Returns
        -------
        DocumentResult with full structured extraction.

        Raises
        ------
        FileNotFoundError
            If pdf_path does not exist.
        """
        path = Path(pdf_path).resolve()
        if not path.exists():
            raise FileNotFoundError(f"PDF not found: {path}")
        doc_hash = self.cache.file_hash(path)
        t_start = time.time()
        page_results: List[PageResult] = []
        with PDFDocument(path) as pdf:
            total = pdf.total_pages
            if page_range:
                p_start = max(0, page_range[0] - 1)
                p_end = min(total, page_range[1])
            else:
                p_start, p_end = 0, total
            chunks = [
                (s, min(s + self.chunk_size, p_end))
                for s in range(p_start, p_end, self.chunk_size)
            ]
            if not chunks:
                logger.warning("No pages selected for %s (document has %d pages)", path.name, total)
            with Progress(
                SpinnerColumn(),
                TextColumn("[bold cyan]{task.description}"),
                BarColumn(),
                MofNCompleteColumn(),
                TaskProgressColumn(),
                TimeElapsedColumn(),
                TimeRemainingColumn(),
                console=console,
                transient=False,
            ) as progress:
                task = progress.add_task(
                    f"[cyan]Parsing {path.name}", total=len(chunks)
                )
                for chunk_start, chunk_end in chunks:
                    chunk_pages = self._parse_chunk(
                        pdf, doc_hash, chunk_start, chunk_end
                    )
                    page_results.extend(chunk_pages)
                    progress.advance(task)
        # Build document-level metadata
        meta = self._extract_document_meta(page_results)
        return DocumentResult(
            document_path=str(path),
            document_hash=doc_hash,
            total_pages=total,
            pages_processed=len(page_results),
            model=self.model,
            processing_mode=self.mode,
            # "or" guards against JSON null values in the model's reply.
            title=meta.get("title") or "",
            authors=meta.get("authors") or [],
            abstract=meta.get("abstract") or "",
            document_summary=meta.get("document_summary") or "",
            page_results=page_results,
            total_equations=sum(len(p.equations) for p in page_results),
            total_tables=sum(len(p.tables) for p in page_results),
            total_algorithms=sum(len(p.algorithms) for p in page_results),
            total_figures=sum(len(p.figures) for p in page_results),
            total_tokens_used=sum(p.tokens_used for p in page_results),
            total_processing_time_s=time.time() - t_start,
        )

    def extract_equations(self, pdf_path: str | Path) -> List[EquationBlock]:
        """Parse the document and return all equations in page order."""
        result = self.parse(pdf_path)
        return [eq for p in result.page_results for eq in p.equations]

    def extract_tables(self, pdf_path: str | Path) -> List[TableBlock]:
        """Parse the document and return all tables in page order."""
        result = self.parse(pdf_path)
        return [tb for p in result.page_results for tb in p.tables]

    def extract_algorithms(self, pdf_path: str | Path) -> List[AlgorithmBlock]:
        """Parse the document and return all algorithms in page order."""
        result = self.parse(pdf_path)
        return [al for p in result.page_results for al in p.algorithms]

    def extract_figures(self, pdf_path: str | Path) -> List[FigureBlock]:
        """Parse the document and return all figure descriptions in page order."""
        result = self.parse(pdf_path)
        return [fg for p in result.page_results for fg in p.figures]

    def query(self, pdf_path: str | Path, question: str) -> str:
        """
        Answer *question* from the parsed page text. Re-parses if not cached.

        Only the first 60000 characters of the concatenated page text are sent.
        Returns an empty string when the reply contains no text block.
        """
        result = self.parse(pdf_path)
        full_text = "\n\n".join(
            f"[Page {p.page_number}]\n{p.raw_text}" for p in result.page_results
        )
        messages = [
            {
                "role": "user",
                "content": (
                    f"Based on the following document content, answer this question "
                    f"precisely and cite page numbers where relevant.\n\n"
                    f"Question: {question}\n\n"
                    f"Document content:\n{full_text[:60000]}"
                ),
            }
        ]
        resp = self.client.messages.create(
            model=self.model,
            max_tokens=2048,
            messages=messages,
        )
        return self._response_text(resp)

    # ------------------------------------------------------------------
    # Internal methods
    # ------------------------------------------------------------------
    def _parse_chunk(
        self,
        pdf: PDFDocument,
        doc_hash: str,
        chunk_start: int,
        chunk_end: int,
    ) -> List[PageResult]:
        """Parse pages [chunk_start, chunk_end), using cache when available."""
        results = []
        pages_to_process = []
        for pg in range(chunk_start, chunk_end):
            cached = self.cache.get_page(doc_hash, pg + 1, self.model, self.mode)
            if cached:
                logger.debug("Cache hit page %d", pg + 1)
                results.append(cached)
            else:
                pages_to_process.append(pg)
        if not pages_to_process:
            return results
        # Group consecutive un-cached pages into sub-chunks
        for sub_start, sub_end in self._group_consecutive(pages_to_process):
            results.extend(self._call_api_chunk(pdf, doc_hash, sub_start, sub_end))
        results.sort(key=lambda r: r.page_number)
        return results

    @staticmethod
    def _group_consecutive(pages: List[int]) -> List[Tuple[int, int]]:
        """Collapse an ascending page list into (start, end_exclusive) runs."""
        if not pages:
            return []
        groups, start, prev = [], pages[0], pages[0]
        for p in pages[1:]:
            if p != prev + 1:
                groups.append((start, prev + 1))
                start = p
            prev = p
        groups.append((start, prev + 1))
        return groups

    def _call_api_chunk(
        self,
        pdf: PDFDocument,
        doc_hash: str,
        chunk_start: int,
        chunk_end: int,
    ) -> List[PageResult]:
        """Send pages to Claude API and parse response."""
        t_start = time.time()
        if self.mode == "image":
            return self._call_api_as_images(pdf, doc_hash, chunk_start, chunk_end, t_start)
        return self._call_api_native(pdf, doc_hash, chunk_start, chunk_end, t_start)

    def _call_api_native(
        self,
        pdf: PDFDocument,
        doc_hash: str,
        chunk_start: int,
        chunk_end: int,
        t_start: float,
    ) -> List[PageResult]:
        """Send the pages as one base64 PDF document block."""
        chunk_bytes = pdf.get_chunk_as_pdf_bytes(chunk_start, chunk_end)
        b64_pdf = base64.standard_b64encode(chunk_bytes).decode("utf-8")
        num_pages = chunk_end - chunk_start
        prompt_suffix = (
            f"\nThis PDF chunk contains pages {chunk_start + 1} to {chunk_end} "
            f"of the original document. "
            f"Return a JSON array with exactly {num_pages} page objects matching the schema. "
            f"Index them page_number={chunk_start + 1} through {chunk_end}."
        )
        messages = [
            {
                "role": "user",
                "content": [
                    {
                        "type": "document",
                        "source": {
                            "type": "base64",
                            "media_type": "application/pdf",
                            "data": b64_pdf,
                        },
                        "cache_control": {"type": "ephemeral"},
                    },
                    {
                        "type": "text",
                        "text": PAGE_EXTRACTION_PROMPT + prompt_suffix,
                    },
                ],
            }
        ]
        return self._execute_api_call(messages, doc_hash, chunk_start, chunk_end, t_start, "native")

    def _call_api_as_images(
        self,
        pdf: PDFDocument,
        doc_hash: str,
        chunk_start: int,
        chunk_end: int,
        t_start: float,
    ) -> List[PageResult]:
        """Send each page as a labelled PNG image block."""
        content = []
        for pg_idx in range(chunk_start, chunk_end):
            png_bytes = pdf.get_page_as_png_bytes(pg_idx, dpi=IMAGE_DPI)
            b64_img = base64.standard_b64encode(png_bytes).decode("utf-8")
            content.append({
                "type": "text",
                "text": f"--- Page {pg_idx + 1} ---",
            })
            content.append({
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": "image/png",
                    "data": b64_img,
                },
            })
        num_pages = chunk_end - chunk_start
        prompt_suffix = (
            f"\nThese are page images {chunk_start + 1} through {chunk_end}. "
            f"Return a JSON array with exactly {num_pages} page objects matching the schema. "
            f"Index them page_number={chunk_start + 1} through {chunk_end}."
        )
        content.append({"type": "text", "text": PAGE_EXTRACTION_PROMPT + prompt_suffix})
        messages = [{"role": "user", "content": content}]
        return self._execute_api_call(messages, doc_hash, chunk_start, chunk_end, t_start, "image")

    @staticmethod
    def _response_text(resp: Any) -> str:
        """Return the text of the first text-bearing content block, or ""."""
        for block in resp.content or []:
            text = getattr(block, "text", None)
            if text is not None:
                return text
        return ""

    @staticmethod
    def _strip_code_fence(text: str) -> str:
        """Drop a leading ``` fence line (and a trailing one if present)."""
        if not text.startswith("```"):
            return text
        lines = text.split("\n")
        body = lines[1:-1] if lines[-1].strip() == "```" else lines[1:]
        return "\n".join(body)

    @staticmethod
    def _placeholder_page(
        page_number: int, mode: str, tokens_used: int, elapsed: float, reason: str
    ) -> PageResult:
        """Build the stand-in result used when a page could not be parsed."""
        return PageResult(
            page_number=page_number,
            raw_text=f"[PARSE ERROR: {reason}]",
            summary="Failed to parse this page.",
            processing_mode=mode,
            tokens_used=tokens_used,
            processing_time_s=elapsed,
        )

    def _execute_api_call(
        self,
        messages: List[Dict],
        doc_hash: str,
        chunk_start: int,
        chunk_end: int,
        t_start: float,
        mode: str,
    ) -> List[PageResult]:
        """
        Send one extraction request and convert the reply into PageResults.

        Always returns exactly one result per page in [chunk_start, chunk_end).
        Pages the model did not return (or returned in an unusable shape) get
        a placeholder that is NOT cached, so they are retried on the next run.
        Page objects beyond the requested count are discarded instead of
        being cached under page numbers outside this chunk.

        Raises
        ------
        anthropic.RateLimitError
            After three rate-limited attempts.
        anthropic.APIStatusError
            On any other API status error (not retried).
        """
        expected = max(1, chunk_end - chunk_start)
        retries, delay = 3, 5
        for attempt in range(retries):
            try:
                resp = self.client.messages.create(
                    model=self.model,
                    max_tokens=MAX_TOKENS_OUTPUT,
                    system=SYSTEM_PROMPT,
                    messages=messages,
                )
                break
            except anthropic.RateLimitError:
                if attempt == retries - 1:
                    raise
                logger.warning("Rate limit hit; retrying in %ds...", delay)
                time.sleep(delay)
                delay *= 2  # exponential back-off: 5s, 10s
            except anthropic.APIStatusError as exc:
                logger.error("API error: %s", exc)
                raise
        raw_response = self._response_text(resp).strip()
        tokens_used = resp.usage.input_tokens + resp.usage.output_tokens
        elapsed = time.time() - t_start
        tokens_per_page = tokens_used // expected
        # Clean possible markdown fences
        raw_response = self._strip_code_fence(raw_response)
        try:
            parsed = json.loads(raw_response)
            # Handle both array-of-pages and single-page responses
            if isinstance(parsed, dict):
                parsed = [parsed]
            if not isinstance(parsed, list):
                raise json.JSONDecodeError("expected a JSON object or array", raw_response, 0)
        except json.JSONDecodeError as exc:
            logger.error("JSON parse error on API response: %s\nRaw:\n%s", exc, raw_response[:500])
            # Return minimal fallback for affected pages
            return [
                self._placeholder_page(pg + 1, mode, tokens_per_page, elapsed, "JSON decode failed")
                for pg in range(chunk_start, chunk_end)
            ]
        if len(parsed) != chunk_end - chunk_start:
            logger.warning(
                "Expected %d page objects for pages %d-%d but received %d",
                chunk_end - chunk_start, chunk_start + 1, chunk_end, len(parsed),
            )
        results = []
        for offset, pg in enumerate(range(chunk_start, chunk_end)):
            page_data = parsed[offset] if offset < len(parsed) else None
            if not isinstance(page_data, dict):
                results.append(
                    self._placeholder_page(
                        pg + 1, mode, tokens_per_page, elapsed / expected,
                        "page missing from model response",
                    )
                )
                continue
            # Page numbers come from the chunk position, never from the model.
            page_data["page_number"] = pg + 1
            page_data["processing_mode"] = mode
            page_data["tokens_used"] = tokens_per_page
            page_data["processing_time_s"] = elapsed / expected
            pr = self._dict_to_page_result(page_data)
            self.cache.set_page(doc_hash, pr, self.model, mode)
            results.append(pr)
        return results

    @staticmethod
    def _dict_to_page_result(d: Dict) -> PageResult:
        """
        Convert one page object from the model into a PageResult.

        Missing or null fields fall back to empty values; list entries that
        are not JSON objects are skipped.
        """
        page_number = d["page_number"]

        def entries(key: str) -> List[Tuple[int, Dict]]:
            # Keep the original list position so it can serve as a default index.
            value = d.get(key) or []
            if not isinstance(value, list):
                return []
            return [(i, item) for i, item in enumerate(value) if isinstance(item, dict)]

        equations = [
            EquationBlock(
                page=page_number,
                index=e.get("index", i),
                latex=e.get("latex", ""),
                description=e.get("description", ""),
                inline=e.get("inline", False),
            )
            for i, e in entries("equations")
        ]
        tables = [
            TableBlock(
                page=page_number,
                index=t.get("index", i),
                markdown=t.get("markdown", ""),
                json_data=t.get("json_data", []),
                caption=t.get("caption", ""),
            )
            for i, t in entries("tables")
        ]
        algorithms = [
            AlgorithmBlock(
                page=page_number,
                index=a.get("index", i),
                name=a.get("name", f"Algorithm {i+1}"),
                language=a.get("language", "pseudocode"),
                code=a.get("code", ""),
                description=a.get("description", ""),
            )
            for i, a in entries("algorithms")
        ]
        figures = [
            FigureBlock(
                page=page_number,
                index=f.get("index", i),
                figure_type=f.get("figure_type", "other"),
                description=f.get("description", ""),
                data_summary=f.get("data_summary", ""),
                caption=f.get("caption", ""),
            )
            for i, f in entries("figures")
        ]
        return PageResult(
            page_number=page_number,
            raw_text=d.get("raw_text") or "",
            summary=d.get("summary") or "",
            equations=equations,
            tables=tables,
            algorithms=algorithms,
            figures=figures,
            section_headers=d.get("section_headers") or [],
            references=d.get("references") or [],
            keywords=d.get("keywords") or [],
            layout_notes=d.get("layout_notes") or "",
            processing_mode=d.get("processing_mode", "native"),
            tokens_used=d.get("tokens_used", 0),
            processing_time_s=d.get("processing_time_s", 0.0),
        )

    def _extract_document_meta(self, page_results: List[PageResult]) -> Dict:
        """
        Ask the model for title, authors, abstract and summary.

        Uses the text of the first five parsed pages (at most 8000 characters).
        Returns empty metadata without calling the API when there are no
        pages, and also when the request or its JSON decoding fails.
        """
        empty = {"title": "", "authors": [], "abstract": "", "document_summary": ""}
        if not page_results:
            return empty
        # Use first 5 pages for metadata extraction
        sample_text = "\n\n".join(
            f"[Page {p.page_number}]\n{p.raw_text}" for p in page_results[:5]
        )
        messages = [
            {
                "role": "user",
                "content": (
                    f"{DOCUMENT_META_PROMPT}\n\nDocument sample:\n{sample_text[:8000]}"
                ),
            }
        ]
        # Metadata is best-effort: any failure is logged and yields empty fields.
        try:
            resp = self.client.messages.create(
                model=self.model,
                max_tokens=1024,
                system=SYSTEM_PROMPT,
                messages=messages,
            )
            raw = self._strip_code_fence(self._response_text(resp).strip())
            meta = json.loads(raw)
            if not isinstance(meta, dict):
                raise ValueError("metadata response is not a JSON object")
            return meta
        except Exception as exc:
            logger.warning("Document meta extraction failed: %s", exc)
            return empty
# ---------------------------------------------------------------------------
# Output formatters
# ---------------------------------------------------------------------------
class OutputFormatter:
    """Render a DocumentResult as JSON, Markdown, plain text or a console table."""

    @staticmethod
    def to_json(result: DocumentResult, indent: int = 2) -> str:
        """Serialize the whole result (nested dataclasses included) to JSON."""
        payload = asdict(result)
        return json.dumps(payload, indent=indent, ensure_ascii=False)

    @staticmethod
    def to_markdown(result: DocumentResult) -> str:
        """Build a Markdown report: document header first, then one section per page."""
        heading = result.title or Path(result.document_path).name
        out = [f"# {heading}"]
        if result.authors:
            out.append(f"\n**Authors:** {', '.join(result.authors)}")
        out += [
            f"\n**Document Hash:** `{result.document_hash}`",
            f"**Model:** {result.model} | **Mode:** {result.processing_mode}",
            (
                f"**Pages:** {result.pages_processed}/{result.total_pages} | "
                f"**Tokens:** {result.total_tokens_used:,} | "
                f"**Time:** {result.total_processing_time_s:.1f}s"
            ),
            (
                f"**Equations:** {result.total_equations} | "
                f"**Tables:** {result.total_tables} | "
                f"**Algorithms:** {result.total_algorithms} | "
                f"**Figures:** {result.total_figures}"
            ),
        ]
        if result.abstract:
            out.append(f"\n## Abstract\n\n{result.abstract}")
        if result.document_summary:
            out.append(f"\n## Document Summary\n\n{result.document_summary}")
        for page in result.page_results:
            out.extend(OutputFormatter._page_markdown(page))
        return "\n".join(out)

    @staticmethod
    def _page_markdown(page: PageResult) -> List[str]:
        """Return the Markdown fragments for a single page, in output order."""
        parts = [f"\n---\n\n## Page {page.page_number}"]
        if page.section_headers:
            bullets = "\n".join(f"- {h}" for h in page.section_headers)
            parts.append("\n### Sections\n" + bullets)
        parts.append(f"\n### Summary\n{page.summary}")
        parts.append(f"\n### Full Text\n\n{page.raw_text}")
        if page.equations:
            parts.append("\n### Equations\n")
            for equation in page.equations:
                kind = "inline" if equation.inline else "display"
                parts += [
                    f"**Eq {equation.index}** ({kind})",
                    f"```latex\n{equation.latex}\n```",
                    f"*{equation.description}*\n",
                ]
        if page.tables:
            parts.append("\n### Tables\n")
            for table in page.tables:
                if table.caption:
                    parts.append(f"**{table.caption}**\n")
                parts.append(table.markdown + "\n")
        if page.algorithms:
            parts.append("\n### Algorithms\n")
            for algo in page.algorithms:
                parts += [
                    f"**{algo.name}** ({algo.language})\n",
                    f"```{algo.language}\n{algo.code}\n```",
                    f"*{algo.description}*\n",
                ]
        if page.figures:
            parts.append("\n### Figures\n")
            for figure in page.figures:
                parts.append(f"**Figure {figure.index}** [{figure.figure_type}]")
                if figure.caption:
                    parts.append(f"*{figure.caption}*")
                parts.append(figure.description)
                if figure.data_summary:
                    parts.append(f"Data: {figure.data_summary}\n")
        return parts

    @staticmethod
    def to_text(result: DocumentResult) -> str:
        """Build a plain-text dump: short header, document summary, raw text per page."""
        name = result.title or Path(result.document_path).name
        out = [
            f"DOCUMENT: {name}",
            f"Authors: {', '.join(result.authors)}",
            f"Pages processed: {result.pages_processed}/{result.total_pages}",
            "",
            "SUMMARY",
            "=" * 60,
            result.document_summary,
            "",
        ]
        for page in result.page_results:
            out += [f"\n[PAGE {page.page_number}]", page.raw_text]
        return "\n".join(out)

    @staticmethod
    def print_summary_table(result: DocumentResult) -> None:
        """Print a two-column metric/value table for the result to the shared console."""
        table = Table(title=f"Parse Results: {Path(result.document_path).name}", show_lines=True)
        table.add_column("Metric", style="cyan", no_wrap=True)
        table.add_column("Value", style="green")
        rows = [
            ("Title", result.title or "(unknown)"),
            ("Authors", ", ".join(result.authors) or "(unknown)"),
            ("Model", result.model),
            ("Mode", result.processing_mode),
            ("Pages total", str(result.total_pages)),
            ("Pages parsed", str(result.pages_processed)),
            ("Equations", str(result.total_equations)),
            ("Tables", str(result.total_tables)),
            ("Algorithms", str(result.total_algorithms)),
            ("Figures", str(result.total_figures)),
            ("Tokens used", f"{result.total_tokens_used:,}"),
            ("Processing time", f"{result.total_processing_time_s:.1f}s"),
            ("Document hash", result.document_hash),
        ]
        for metric, value in rows:
            table.add_row(metric, value)
        console.print(table)
# ---------------------------------------------------------------------------
# Agent interface
# ---------------------------------------------------------------------------
class AgentPDFInterface:
    """
    Facade over AtomicPDFParser for agent pipelines.

    Each method takes a PDF path and returns plain, serializable Python
    values (dicts, lists, strings) instead of dataclass instances.

    Example usage in an agent:
        from pdf_atomic_parser import AgentPDFInterface
        agent = AgentPDFInterface(model="opus")
        full = agent.parse("paper.pdf")
        eqs = agent.get_equations("paper.pdf")
        answer = agent.ask("paper.pdf", "What is the loss function?")
    """

    def __init__(self, **kwargs):
        """Forward all keyword arguments to the AtomicPDFParser constructor."""
        self._parser = AtomicPDFParser(**kwargs)

    def parse(self, pdf_path: str, page_range: Optional[Tuple[int, int]] = None) -> Dict:
        """Parse the document (optionally a page range) and return it as a dict."""
        return asdict(self._parser.parse(pdf_path, page_range))

    def get_equations(self, pdf_path: str) -> List[Dict]:
        """Return every extracted equation as a dict."""
        blocks = self._parser.extract_equations(pdf_path)
        return list(map(asdict, blocks))

    def get_tables(self, pdf_path: str) -> List[Dict]:
        """Return every extracted table as a dict."""
        blocks = self._parser.extract_tables(pdf_path)
        return list(map(asdict, blocks))

    def get_algorithms(self, pdf_path: str) -> List[Dict]:
        """Return every extracted algorithm as a dict."""
        blocks = self._parser.extract_algorithms(pdf_path)
        return list(map(asdict, blocks))

    def get_figures(self, pdf_path: str) -> List[Dict]:
        """Return every extracted figure description as a dict."""
        blocks = self._parser.extract_figures(pdf_path)
        return list(map(asdict, blocks))

    def ask(self, pdf_path: str, question: str) -> str:
        """Answer a question about the document via the parser's query method."""
        return self._parser.query(pdf_path, question)

    def get_full_text(self, pdf_path: str) -> str:
        """Return the raw text of all pages, each prefixed with a page marker."""
        document = self._parser.parse(pdf_path)
        sections = []
        for page in document.page_results:
            sections.append(f"[Page {page.page_number}]\n{page.raw_text}")
        return "\n\n".join(sections)

    def cache_stats(self) -> Dict:
        """Return the statistics reported by the parser's page cache."""
        return self._parser.cache.stats()
# ---------------------------------------------------------------------------
# Batch processor
# ---------------------------------------------------------------------------
def batch_process(
    input_dir: Path,
    output_dir: Path,
    parser: AtomicPDFParser,
    fmt: str = "json",
) -> None:
    """
    Parse every ``*.pdf`` found recursively under ``input_dir``.

    One output file per PDF is written directly into ``output_dir`` (created
    if missing), named after the PDF's stem with a ``.json``, ``.md`` or
    ``.txt`` extension depending on ``fmt``. A failure on one file is printed
    and logged, and processing continues with the next file.

    Args:
        input_dir: Directory searched recursively for PDF files.
        output_dir: Directory that receives the output files.
        parser: Parser used to process each PDF.
        fmt: ``"json"`` or ``"markdown"``; any other value produces plain text.
    """
    pdfs = sorted(input_dir.glob("**/*.pdf"))
    if not pdfs:
        console.print(f"[yellow]No PDF files found in {input_dir}[/yellow]")
        return
    output_dir.mkdir(parents=True, exist_ok=True)
    console.print(f"[cyan]Found {len(pdfs)} PDF files to process.[/cyan]")

    # The search is recursive but the output directory is flat, so two PDFs
    # sharing a stem (e.g. a/report.pdf and b/report.pdf) would otherwise
    # overwrite each other's output. Track the stems already handed out.
    taken_stems = set()
    for pdf_path in pdfs:
        console.print(f"\n[bold]Processing:[/bold] {pdf_path.name}")

        stem = pdf_path.stem
        if stem in taken_stems:
            # Disambiguate with the path relative to input_dir: a/report.pdf
            # becomes "a__report". Fall back to a numeric suffix if even that
            # name is already in use.
            relative = pdf_path.relative_to(input_dir).with_suffix("")
            base = "__".join(relative.parts)
            stem, counter = base, 2
            while stem in taken_stems:
                stem = f"{base}_{counter}"
                counter += 1
        taken_stems.add(stem)

        try:
            result = parser.parse(pdf_path)
            if fmt == "json":
                out = output_dir / f"{stem}.json"
                out.write_text(OutputFormatter.to_json(result), encoding="utf-8")
            elif fmt == "markdown":
                out = output_dir / f"{stem}.md"
                out.write_text(OutputFormatter.to_markdown(result), encoding="utf-8")
            else:
                out = output_dir / f"{stem}.txt"
                out.write_text(OutputFormatter.to_text(result), encoding="utf-8")
            console.print(f"  [green]Saved:[/green] {out}")
            OutputFormatter.print_summary_table(result)
        except Exception as exc:
            # Batch boundary: report the failure and keep going with the rest.
            console.print(f"  [red]Error processing {pdf_path.name}: {exc}[/red]")
            logger.exception("Batch error")
# ---------------------------------------------------------------------------
# Token estimator
# ---------------------------------------------------------------------------
def estimate_tokens(pdf_path: Path) -> None:
    """
    Print a rough token and cost estimate for parsing ``pdf_path``.

    The figures are derived only from the page count: about 800 input and
    400 output tokens per page, priced at approximate Opus rates.
    """
    with PDFDocument(pdf_path) as document:
        page_count = document.total_pages
        megabytes = document.file_size_bytes / 1e6

    # Rough per-page budget for dense academic content.
    tokens_in = page_count * 800
    tokens_out = page_count * 400
    tokens_all = tokens_in + tokens_out
    # Approximate Opus pricing as of 2025: $15/Mtok input, $75/Mtok output.
    cost_opus = (tokens_in * 15 + tokens_out * 75) / 1_000_000

    rows = [
        ("Total pages", str(page_count)),
        ("File size", f"{megabytes:.2f} MB"),
        ("Est. input tokens", f"{tokens_in:,}"),
        ("Est. output tokens", f"{tokens_out:,}"),
        ("Est. total tokens", f"{tokens_all:,}"),
        ("Est. cost (Opus)", f"${cost_opus:.2f}"),
        ("Note", "Estimate only; actual usage varies"),
    ]

    report = Table(title=f"Token Estimate: {pdf_path.name}", show_lines=True)
    report.add_column("Metric", style="cyan")
    report.add_column("Estimate", style="yellow")
    for label, value in rows:
        report.add_row(label, value)
    console.print(report)
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def build_cli() -> argparse.ArgumentParser:
    """
    Build the command-line parser: shared options plus one sub-command each.

    The shared options (``--model``, ``--mode``, ``--chunk-size``,
    ``--verbose``) are accepted both before the sub-command and after it, so
    ``parse doc.pdf --verbose`` and ``--verbose parse doc.pdf`` are equivalent.

    Returns:
        The configured ``argparse.ArgumentParser``; the chosen sub-command is
        stored in ``args.command``.
    """

    def add_shared_options(target: argparse.ArgumentParser, *, inherited: bool) -> None:
        """Register the shared options on ``target``.

        On sub-parsers (``inherited=True``) the defaults are SUPPRESS: argparse
        copies every attribute of the sub-command namespace over the top-level
        one, so a real default there would overwrite a value the user gave
        before the sub-command.
        """

        def default(value):
            return argparse.SUPPRESS if inherited else value

        target.add_argument("--model", default=default("opus"), help="opus | sonnet | haiku | full-model-string")
        target.add_argument("--mode", default=default("native"), choices=["native", "image"], help="Parsing mode")
        target.add_argument("--chunk-size", type=int, default=default(CHUNK_SIZE_DEFAULT), help="Pages per API call")
        target.add_argument("--verbose", action="store_true", default=default(False))

    parser = argparse.ArgumentParser(
        prog="pdf_atomic_parser",
        description="Atomic PDF parser powered by Claude claude-opus-4-6",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    add_shared_options(parser, inherited=False)
    sub = parser.add_subparsers(dest="command", required=True)

    def add_command(name: str, help_text: str) -> argparse.ArgumentParser:
        """Create a sub-command parser that also understands the shared options."""
        command = sub.add_parser(name, help=help_text)
        add_shared_options(command, inherited=True)
        return command

    # parse
    p_parse = add_command("parse", "Parse a PDF fully")
    p_parse.add_argument("pdf", help="Path to PDF file")
    p_parse.add_argument("--output", "-o", help="Output file path")
    p_parse.add_argument("--format", "-f", default="json", choices=["json", "markdown", "text"])
    p_parse.add_argument("--pages", help="Page range e.g. 1-50")

    # atomic (alias for parse with all content)
    p_atomic = add_command("atomic", "Full atomic extraction to directory")
    p_atomic.add_argument("pdf", help="Path to PDF file")
    p_atomic.add_argument("--output", "-o", default="./atomic_output")

    # extract-* commands share the same shape: a PDF path and an optional output file
    for name, help_text in (
        ("extract-equations", "Extract LaTeX equations"),
        ("extract-tables", "Extract tables"),
        ("extract-algorithms", "Extract algorithms/code"),
        ("extract-figures", "Extract figure descriptions"),
    ):
        p_extract = add_command(name, help_text)
        p_extract.add_argument("pdf")
        p_extract.add_argument("--output", "-o")

    # query
    p_q = add_command("query", "Ask a question about the PDF")
    p_q.add_argument("pdf")
    p_q.add_argument("question", help="Question to ask")

    # batch
    p_batch = add_command("batch", "Batch process a directory of PDFs")
    p_batch.add_argument("directory")
    p_batch.add_argument("--output", "-o", default="./batch_output")
    p_batch.add_argument("--format", "-f", default="json", choices=["json", "markdown", "text"])

    # estimate
    p_est = add_command("estimate", "Estimate token cost before parsing")
    p_est.add_argument("pdf")

    # cache commands
    add_command("cache-stats", "Show cache statistics")
    add_command("list-cache", "List all cached documents")
    p_cc = add_command("clear-cache", "Clear cache for a document")
    p_cc.add_argument("pdf", help="PDF path (to identify document)")

    return parser
def parse_page_range(s: str) -> Tuple[int, int]:
    """
    Convert a ``start-end`` string such as ``"1-20"`` into ``(start, end)``.

    Whitespace around either number is tolerated.

    Args:
        s: Page range text in the form ``start-end``.

    Returns:
        The two bounds as integers, in the order given.

    Raises:
        ValueError: If ``s`` is not exactly two integers separated by ``-``,
            or if ``end`` is smaller than ``start``.
    """
    parts = s.split("-")
    if len(parts) != 2:
        raise ValueError(f"Page range must be in format start-end, got: {s}")
    try:
        start, end = int(parts[0]), int(parts[1])
    except ValueError:
        # Replace int()'s "invalid literal" message with one that names the
        # expected format (covers inputs like "-5", "3-" and "a-b").
        raise ValueError(f"Page range must be in format start-end, got: {s}") from None
    if end < start:
        raise ValueError(f"Page range end must not be smaller than start, got: {s}")
    return start, end
def save_output(content: str, output_path: Optional[str], default_name: str) -> None:
    """
    Write ``content`` as UTF-8 text and print where it was saved.

    The file goes to ``output_path`` when one is given, otherwise to
    ``default_name``; missing parent directories are created first.
    """
    destination = Path(output_path or default_name)
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(content, encoding="utf-8")
    console.print(f"[green]Saved:[/green] {destination}")
# Default file extension for each --format choice; the same extensions are
# used by the batch and atomic outputs.
_CLI_FORMAT_EXTENSIONS = {"json": "json", "markdown": "md", "text": "txt"}

# extract-* sub-command -> name of the per-page list it collects. The same
# word is used in the default output filename and in the summary message.
_CLI_EXTRACT_KINDS = {
    "extract-equations": "equations",
    "extract-tables": "tables",
    "extract-algorithms": "algorithms",
    "extract-figures": "figures",
}


def _render_result(result, fmt: str) -> str:
    """Serialize a parse result as JSON, Markdown, or (any other value) plain text."""
    if fmt == "json":
        return OutputFormatter.to_json(result)
    if fmt == "markdown":
        return OutputFormatter.to_markdown(result)
    return OutputFormatter.to_text(result)


def main() -> None:
    """
    Command-line entry point: parse the arguments and run the chosen command.

    Commands that only need the on-disk cache or the PDF itself (cache-stats,
    list-cache, estimate, clear-cache) are handled before an AtomicPDFParser
    is constructed; all remaining commands share one parser instance built
    from the --model/--mode/--chunk-size/--verbose options.
    """
    import datetime  # only used by list-cache; kept local to this function

    cli = build_cli()
    args = cli.parse_args()
    cache = ParseCache(Path.home() / ".cache" / "pdf_atomic_parser")

    if args.command == "cache-stats":
        stats = cache.stats()
        table = Table(title="Cache Statistics", show_lines=True)
        table.add_column("Key", style="cyan")
        table.add_column("Value", style="green")
        for k, v in stats.items():
            table.add_row(k.replace("_", " ").title(), str(v))
        console.print(table)
        return

    if args.command == "list-cache":
        docs = cache.list_documents()
        if not docs:
            console.print("[yellow]Cache is empty.[/yellow]")
            return
        table = Table(title="Cached Documents", show_lines=True)
        table.add_column("Hash", style="cyan")
        table.add_column("Cached Pages", style="green")
        table.add_column("First Seen", style="dim")
        for d in docs:
            ts = datetime.datetime.fromtimestamp(d["first_seen"]).strftime("%Y-%m-%d %H:%M")
            table.add_row(d["hash"], str(d["cached_pages"]), ts)
        console.print(table)
        return

    if args.command == "estimate":
        estimate_tokens(Path(args.pdf))
        return

    if args.command == "clear-cache":
        # Uses only the cache, so it runs before the parser is constructed.
        doc_hash = cache.file_hash(Path(args.pdf))
        n = cache.clear_document(doc_hash)
        console.print(f"[green]Cleared {n} cached pages for {Path(args.pdf).name}[/green]")
        return

    parser = AtomicPDFParser(
        model=args.model,
        mode=args.mode,
        chunk_size=args.chunk_size,
        verbose=args.verbose,
    )

    if args.command in ("parse", "atomic"):
        page_range = None
        pages = getattr(args, "pages", None)  # "atomic" has no --pages option
        if pages:
            try:
                page_range = parse_page_range(pages)
            except ValueError as exc:
                # Report a bad --pages value as a usage error, not a traceback.
                cli.error(str(exc))
        result = parser.parse(args.pdf, page_range)
        OutputFormatter.print_summary_table(result)
        stem = Path(args.pdf).stem

        if args.command == "atomic":
            # Write every format side by side into the output directory.
            out_dir = Path(args.output)
            out_dir.mkdir(parents=True, exist_ok=True)
            for fmt, ext in _CLI_FORMAT_EXTENSIONS.items():
                target = out_dir / f"{stem}.{ext}"
                target.write_text(_render_result(result, fmt), encoding="utf-8")
                console.print(f"[green]Saved {fmt}:[/green] {target}")
        else:
            fmt = args.format
            ext = _CLI_FORMAT_EXTENSIONS.get(fmt, fmt)
            save_output(_render_result(result, fmt), args.output, f"{stem}_parsed.{ext}")

    elif args.command in _CLI_EXTRACT_KINDS:
        kind = _CLI_EXTRACT_KINDS[args.command]
        result = parser.parse(args.pdf)
        items = [
            asdict(item)
            for page in result.page_results
            for item in getattr(page, kind)
        ]
        content = json.dumps(items, indent=2, ensure_ascii=False)
        save_output(content, args.output, f"{Path(args.pdf).stem}_{kind}.json")
        console.print(f"[cyan]{len(items)} {kind} extracted.[/cyan]")

    elif args.command == "query":
        answer = parser.query(args.pdf, args.question)
        console.print(f"\n[bold cyan]Answer:[/bold cyan]\n{answer}")

    elif args.command == "batch":
        batch_process(
            Path(args.directory),
            Path(args.output),
            parser,
            args.format,
        )


# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()