""" pdf_atomic_parser.py ==================== Author : algorembrant Version : 1.0.0 License : MIT DESCRIPTION ----------- Atomically parse and understand complex PDF documents using Claude claude-opus-4-6. Handles equations, graphs, algorithms, unique drawings, tables, multi-column layouts, and 100+ page documents without hallucination. Designed for local agent pipelines. CAPABILITIES ------------ - Native PDF document API (base64) with prompt caching - Page-as-image fallback using PyMuPDF at 300 DPI for max fidelity - LaTeX equation extraction - Table extraction (Markdown + JSON) - Algorithm and pseudocode extraction - Figure and graph semantic description - Multi-column and complex layout handling - Chunked processing for 100+ page documents - SQLite-backed cache to avoid re-processing pages - Structured JSON output per page and full document - Agent-callable interface (AgentPDFInterface) - Async batch processing for speed USAGE COMMANDS -------------- # Parse a PDF and save structured JSON python pdf_atomic_parser.py parse document.pdf # Parse with verbose output python pdf_atomic_parser.py parse document.pdf --verbose # Parse specific page range python pdf_atomic_parser.py parse document.pdf --pages 1-20 # Extract only equations (LaTeX) python pdf_atomic_parser.py extract-equations document.pdf # Extract only tables (Markdown) python pdf_atomic_parser.py extract-tables document.pdf # Extract only algorithms/code blocks python pdf_atomic_parser.py extract-algorithms document.pdf # Extract figures and graph descriptions python pdf_atomic_parser.py extract-figures document.pdf # Full atomic extraction (all content types) to output dir python pdf_atomic_parser.py atomic document.pdf --output ./results/ # Query a parsed PDF (semantic search over cached parse) python pdf_atomic_parser.py query document.pdf "What is the main theorem?" 
# Use faster/cheaper model (Sonnet instead of Opus) python pdf_atomic_parser.py parse document.pdf --model sonnet # Use page-as-image mode (higher fidelity for scanned/complex PDFs) python pdf_atomic_parser.py parse document.pdf --mode image # Use native PDF mode (default, faster) python pdf_atomic_parser.py parse document.pdf --mode native # Set chunk size for large PDFs (default 20 pages per chunk) python pdf_atomic_parser.py parse document.pdf --chunk-size 10 # Clear cache for a document python pdf_atomic_parser.py clear-cache document.pdf # Show cache stats python pdf_atomic_parser.py cache-stats # List all cached documents python pdf_atomic_parser.py list-cache # Batch process a directory of PDFs python pdf_atomic_parser.py batch ./pdf_folder/ --output ./results/ # Export parse results as Markdown report python pdf_atomic_parser.py parse document.pdf --format markdown # Export as plain text python pdf_atomic_parser.py parse document.pdf --format text # Show token usage estimate before parsing python pdf_atomic_parser.py estimate document.pdf # Agent interface example (programmatic) # from pdf_atomic_parser import AgentPDFInterface # agent = AgentPDFInterface() # result = agent.parse("document.pdf") # equations = agent.get_equations("document.pdf") """ from __future__ import annotations import argparse import asyncio import base64 import hashlib import json import logging import os import sqlite3 import sys import time from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import asdict, dataclass, field from pathlib import Path from typing import Any, Dict, Generator, Iterator, List, Optional, Tuple import anthropic import fitz # PyMuPDF from rich.console import Console from rich.logging import RichHandler from rich.progress import ( BarColumn, MofNCompleteColumn, Progress, SpinnerColumn, TaskProgressColumn, TextColumn, TimeElapsedColumn, TimeRemainingColumn, ) from rich.table import Table from tqdm import tqdm # 
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

DEFAULT_MODEL_OPUS = "claude-opus-4-6"
DEFAULT_MODEL_SONNET = "claude-sonnet-4-6"
DEFAULT_MODEL_HAIKU = "claude-haiku-4-5-20251001"

MAX_TOKENS_OUTPUT = 8192
CHUNK_SIZE_DEFAULT = 20                  # pages per API call
IMAGE_DPI = 300                          # render DPI for page-as-image mode
MAX_PDF_SIZE_BYTES = 32 * 1024 * 1024    # 32 MB native API limit
MAX_PDF_PAGES_NATIVE = 100               # native API page cap per request
CACHE_DB_NAME = ".pdf_parser_cache.db"
LOG_FORMAT = "%(message)s"

console = Console()
logging.basicConfig(
    level=logging.WARNING,
    format=LOG_FORMAT,
    handlers=[RichHandler(console=console, rich_tracebacks=True, show_path=False)],
)
logger = logging.getLogger("pdf_atomic_parser")


# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------

@dataclass
class EquationBlock:
    """A single equation extracted from one page."""
    page: int
    index: int
    latex: str            # LaTeX source of the equation
    description: str      # short prose description of what it expresses
    inline: bool = False  # True for inline math, False for display math


@dataclass
class TableBlock:
    """A table reproduced both as Markdown and as list-of-dict rows."""
    page: int
    index: int
    markdown: str
    json_data: List[Dict]
    caption: str = ""


@dataclass
class AlgorithmBlock:
    """An algorithm / pseudocode listing captured verbatim."""
    page: int
    index: int
    name: str
    language: str  # e.g. "pseudocode", "python"
    code: str
    description: str


@dataclass
class FigureBlock:
    """Semantic description of a figure, chart, or drawing."""
    page: int
    index: int
    figure_type: str  # chart | diagram | drawing | photograph | plot
    description: str
    data_summary: str
    caption: str = ""


@dataclass
class PageResult:
    """Structured extraction result for a single page."""
    page_number: int
    raw_text: str
    summary: str
    equations: List[EquationBlock] = field(default_factory=list)
    tables: List[TableBlock] = field(default_factory=list)
    algorithms: List[AlgorithmBlock] = field(default_factory=list)
    figures: List[FigureBlock] = field(default_factory=list)
    section_headers: List[str] = field(default_factory=list)
    references: List[str] = field(default_factory=list)
    keywords: List[str] = field(default_factory=list)
    layout_notes: str = ""
    processing_mode: str = "native"  # "native" | "image"
    tokens_used: int = 0
    processing_time_s: float = 0.0


@dataclass
class DocumentResult:
    """Aggregate result for a whole document (or a parsed page range)."""
    document_path: str
    document_hash: str
    total_pages: int
    pages_processed: int
    model: str
    processing_mode: str
    title: str
    authors: List[str]
    abstract: str
    document_summary: str
    page_results: List[PageResult] = field(default_factory=list)
    total_equations: int = 0
    total_tables: int = 0
    total_algorithms: int = 0
    total_figures: int = 0
    total_tokens_used: int = 0
    total_processing_time_s: float = 0.0


# ---------------------------------------------------------------------------
# Cache layer
# ---------------------------------------------------------------------------

class ParseCache:
    """SQLite-backed cache for parsed page results.

    Entries are keyed by (doc_hash, page_num, model, mode), so the same
    document can be cached under several models/modes simultaneously.

    FIX: ``with sqlite3.connect(...)`` only manages the transaction (commit/
    rollback); it never closes the connection, so every call previously
    leaked one.  All methods now close their connection in a ``finally``.
    """

    def __init__(self, cache_dir: Path):
        cache_dir.mkdir(parents=True, exist_ok=True)
        self.db_path = cache_dir / CACHE_DB_NAME
        self._init_db()

    def _init_db(self) -> None:
        """Create the cache tables if they do not already exist (idempotent)."""
        conn = self._connect()
        try:
            with conn:  # commit the DDL
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS page_cache (
                        doc_hash    TEXT NOT NULL,
                        page_num    INTEGER NOT NULL,
                        model       TEXT NOT NULL,
                        mode        TEXT NOT NULL,
                        result_json TEXT NOT NULL,
                        created_at  REAL NOT NULL,
                        PRIMARY KEY (doc_hash, page_num, model, mode)
                    )
                """)
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS doc_meta (
                        doc_hash    TEXT PRIMARY KEY,
                        doc_path    TEXT NOT NULL,
                        total_pages INTEGER NOT NULL,
                        created_at  REAL NOT NULL
                    )
                """)
        finally:
            conn.close()

    def _connect(self) -> sqlite3.Connection:
        """Open a connection with WAL journaling for better concurrent reads."""
        conn = sqlite3.connect(self.db_path, timeout=30)
        conn.execute("PRAGMA journal_mode=WAL")
        return conn

    @staticmethod
    def file_hash(path: Path) -> str:
        """Return a 16-hex-char SHA-256 prefix identifying the file content."""
        h = hashlib.sha256()
        with open(path, "rb") as fh:
            # Stream in 64 KiB chunks so huge PDFs are not read into memory.
            for chunk in iter(lambda: fh.read(65536), b""):
                h.update(chunk)
        return h.hexdigest()[:16]

    def get_page(self, doc_hash: str, page_num: int, model: str, mode: str) -> Optional[PageResult]:
        """Return the cached PageResult for this key, or None on a miss."""
        conn = self._connect()
        try:
            row = conn.execute(
                "SELECT result_json FROM page_cache WHERE doc_hash=? AND page_num=? AND model=? AND mode=?",
                (doc_hash, page_num, model, mode),
            ).fetchone()
        finally:
            conn.close()
        if row:
            return self._deserialize_page(json.loads(row[0]))
        return None

    def set_page(self, doc_hash: str, result: PageResult, model: str, mode: str) -> None:
        """Insert or overwrite the cached result for one page."""
        conn = self._connect()
        try:
            with conn:  # commit the write
                conn.execute(
                    "INSERT OR REPLACE INTO page_cache VALUES (?,?,?,?,?,?)",
                    (doc_hash, result.page_number, model, mode,
                     json.dumps(self._serialize_page(result)), time.time()),
                )
        finally:
            conn.close()

    def clear_document(self, doc_hash: str) -> int:
        """Delete all cached pages for a document; return how many were removed."""
        conn = self._connect()
        try:
            with conn:  # commit both deletes atomically
                cur = conn.execute("DELETE FROM page_cache WHERE doc_hash=?", (doc_hash,))
                conn.execute("DELETE FROM doc_meta WHERE doc_hash=?", (doc_hash,))
                deleted = cur.rowcount
        finally:
            conn.close()
        return deleted

    def stats(self) -> Dict[str, Any]:
        """Return aggregate cache statistics (pages, documents, size on disk)."""
        conn = self._connect()
        try:
            total = conn.execute("SELECT COUNT(*) FROM page_cache").fetchone()[0]
            docs = conn.execute("SELECT COUNT(DISTINCT doc_hash) FROM page_cache").fetchone()[0]
        finally:
            conn.close()
        size = self.db_path.stat().st_size if self.db_path.exists() else 0
        return {"total_cached_pages": total, "unique_documents": docs,
                "cache_size_mb": round(size / 1e6, 2)}

    def list_documents(self) -> List[Dict]:
        """Return one summary dict per cached document."""
        conn = self._connect()
        try:
            rows = conn.execute("""
                SELECT doc_hash, COUNT(*) as pages, MIN(created_at) as first_seen
                FROM page_cache GROUP BY doc_hash
            """).fetchall()
        finally:
            conn.close()
        return [{"hash": r[0], "cached_pages": r[1], "first_seen": r[2]} for r in rows]

    # -- serialization helpers -----------------------------------------------

    @staticmethod
    def _serialize_page(p: PageResult) -> Dict:
        # asdict recurses into the nested block dataclasses.
        return asdict(p)

    @staticmethod
    def _deserialize_page(d: Dict) -> PageResult:
        # Rebuild the nested dataclasses that the JSON round-trip flattened.
        d["equations"] = [EquationBlock(**e) for e in d.get("equations", [])]
        d["tables"] = [TableBlock(**t) for t in d.get("tables", [])]
        d["algorithms"] = [AlgorithmBlock(**a) for a in d.get("algorithms", [])]
        d["figures"] = [FigureBlock(**f) for f in d.get("figures", [])]
        return PageResult(**d)
# ---------------------------------------------------------------------------
# PDF utilities
# ---------------------------------------------------------------------------

class PDFDocument:
    """Thin wrapper around fitz.Document with chunking helpers.

    FIXES:
    - ``get_chunk_as_pdf_bytes`` now closes the temporary sub-document
      instead of leaking it.
    - ``get_page_as_png_bytes`` late-binds its DPI default (``None`` means
      ``IMAGE_DPI``) so the module constant is read at call time; passing an
      explicit integer behaves exactly as before.
    """

    def __init__(self, path: Path):
        self.path = path
        self._doc = fitz.open(str(path))
        self.total_pages = len(self._doc)

    @property
    def file_size_bytes(self) -> int:
        """Size of the PDF file on disk, in bytes."""
        return self.path.stat().st_size

    def get_chunk_ranges(self, chunk_size: int) -> List[Tuple[int, int]]:
        """Return list of (start_page_0indexed, end_page_exclusive) tuples."""
        ranges = []
        for start in range(0, self.total_pages, chunk_size):
            end = min(start + chunk_size, self.total_pages)
            ranges.append((start, end))
        return ranges

    def get_chunk_as_pdf_bytes(self, start: int, end: int) -> bytes:
        """Extract pages [start, end) into a new in-memory PDF and return its bytes."""
        sub = fitz.open()
        try:
            # insert_pdf uses inclusive page indices, hence end - 1.
            sub.insert_pdf(self._doc, from_page=start, to_page=end - 1)
            return sub.write()
        finally:
            sub.close()  # release the temporary document's resources

    def get_page_as_png_bytes(self, page_idx: int, dpi: Optional[int] = None) -> bytes:
        """Render a single page to PNG bytes at the given DPI (default IMAGE_DPI)."""
        if dpi is None:
            dpi = IMAGE_DPI
        page = self._doc[page_idx]
        mat = fitz.Matrix(dpi / 72, dpi / 72)  # 72 dpi is the PDF base resolution
        pix = page.get_pixmap(matrix=mat, alpha=False)
        return pix.tobytes("png")

    def close(self) -> None:
        self._doc.close()

    def __enter__(self):
        return self

    def __exit__(self, *_):
        self.close()


# ---------------------------------------------------------------------------
# Extraction prompts
# ---------------------------------------------------------------------------

SYSTEM_PROMPT = """You are an expert scientific document analyst specializing in atomically parsing complex academic and technical PDFs.

Your extractions must be:
- Complete: capture every equation, table, figure, and algorithm
- Faithful: never invent or hallucinate content
- Precise: reproduce equations in proper LaTeX
- Structured: respond only with valid JSON matching the schema provided

Do NOT add prose outside the JSON response.
If a field has no content, use an empty list [] or empty string "" rather than null."""

# NOTE(review): the angle-bracket placeholders in this schema were corrupted
# in the recovered source (e.g. `"index": ,`); they have been reconstructed
# to match the one surviving example ("<2-4 sentence ...>") — confirm against
# the original prompt if available.
PAGE_EXTRACTION_PROMPT = """\
Atomically parse the provided PDF page(s) and return a JSON object that matches this schema exactly:

{
  "raw_text": "<all text visible on the page>",
  "summary": "<2-4 sentence factual summary of this page>",
  "section_headers": ["<header>", ...],
  "keywords": ["<keyword>", ...],
  "layout_notes": "<notes on columns, sidebars, footnotes, etc.>",
  "equations": [
    { "index": <int>, "latex": "<LaTeX source>", "description": "<what it expresses>", "inline": <true|false> }
  ],
  "tables": [
    { "index": <int>, "markdown": "<Markdown table>", "json_data": [{"col1": "val", ...}, ...], "caption": "<caption>" }
  ],
  "algorithms": [
    { "index": <int>, "name": "<name>", "language": "<language>", "code": "<verbatim code>", "description": "<purpose>" }
  ],
  "figures": [
    { "index": <int>, "figure_type": "<chart|diagram|drawing|photograph|plot>", "description": "<description>", "data_summary": "<quantitative summary>", "caption": "<caption>" }
  ],
  "references": ["<reference>"]
}

Rules:
1. Every equation MUST have LaTeX. Use \\frac, \\sum, \\int, \\mathbf etc. for proper notation.
2. Tables must be fully reproduced in both Markdown and as list-of-dicts.
3. Algorithms must preserve all steps, loops, conditions verbatim.
4. Figures: describe them as if for a blind reader — quantitative values, trends, colors, labels.
5. raw_text must include ALL text visible on the page, including headers, footers, captions.
6. Do NOT summarize or truncate any content.
"""

DOCUMENT_META_PROMPT = """\
Based on the document pages you have seen, extract high-level metadata as JSON:

{
  "title": "",
  "authors": ["", ...],
  "abstract": "",
  "document_summary": ""
}

Respond with valid JSON only.
"""
# ---------------------------------------------------------------------------
# Core parser
# ---------------------------------------------------------------------------

class AtomicPDFParser:
    """
    Core parser that sends PDF chunks or page images to the Claude API and
    extracts structured content atomically.

    Results are cached per (document, page, model, mode) in a local SQLite
    database, so re-runs only pay for pages not yet processed.

    FIXES:
    - ``_execute_api_call`` no longer raises ZeroDivisionError when the model
      returns an empty JSON array.
    - ``_extract_document_meta`` skips the API call when no pages were parsed.
    - ``_call_api_native`` warns when a chunk exceeds MAX_PDF_SIZE_BYTES.
    - Constant-valued defaults (model, chunk_size) are late-bound so the
      signature no longer evaluates module constants at class-definition
      time; resolved values are identical to before.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        model: str = "opus",             # alias or full model string ("opus" -> DEFAULT_MODEL_OPUS)
        mode: str = "native",            # "native" | "image"
        chunk_size: Optional[int] = None,  # None -> CHUNK_SIZE_DEFAULT
        cache_dir: Optional[Path] = None,
        verbose: bool = False,
        max_workers: int = 4,            # stored but not currently used by this class
    ):
        self.api_key = api_key or os.environ.get("ANTHROPIC_API_KEY", "")
        self.model = self._resolve_model(model)
        self.mode = mode
        self.chunk_size = CHUNK_SIZE_DEFAULT if chunk_size is None else chunk_size
        self.verbose = verbose
        self.max_workers = max_workers
        if not self.api_key:
            raise ValueError(
                "ANTHROPIC_API_KEY environment variable not set. "
                "Export it or pass api_key= to AtomicPDFParser."
            )
        self.client = anthropic.Anthropic(api_key=self.api_key)
        cache_path = cache_dir or Path.home() / ".cache" / "pdf_atomic_parser"
        self.cache = ParseCache(cache_path)
        if verbose:
            logger.setLevel(logging.DEBUG)

    @staticmethod
    def _resolve_model(alias: str) -> str:
        """Map short aliases to full model IDs; pass unknown strings through."""
        mapping = {
            "opus": DEFAULT_MODEL_OPUS,
            "sonnet": DEFAULT_MODEL_SONNET,
            "haiku": DEFAULT_MODEL_HAIKU,
        }
        return mapping.get(alias.lower(), alias)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def parse(
        self,
        pdf_path: str | Path,
        page_range: Optional[Tuple[int, int]] = None,
    ) -> DocumentResult:
        """
        Parse the entire document (or a page range) atomically.

        Parameters
        ----------
        pdf_path : Path to the PDF file.
        page_range : Optional (start, end) 1-indexed inclusive page numbers.

        Returns
        -------
        DocumentResult with full structured extraction.

        Raises
        ------
        FileNotFoundError if the PDF does not exist.
        """
        path = Path(pdf_path).resolve()
        if not path.exists():
            raise FileNotFoundError(f"PDF not found: {path}")
        doc_hash = self.cache.file_hash(path)
        t_start = time.time()

        with PDFDocument(path) as pdf:
            total = pdf.total_pages
            if page_range:
                # Convert 1-indexed inclusive range to 0-indexed half-open.
                p_start = max(0, page_range[0] - 1)
                p_end = min(total, page_range[1])
            else:
                p_start, p_end = 0, total

            chunks = []
            for s in range(p_start, p_end, self.chunk_size):
                e = min(s + self.chunk_size, p_end)
                chunks.append((s, e))

            page_results: List[PageResult] = []
            with Progress(
                SpinnerColumn(),
                TextColumn("[bold cyan]{task.description}"),
                BarColumn(),
                MofNCompleteColumn(),
                TaskProgressColumn(),
                TimeElapsedColumn(),
                TimeRemainingColumn(),
                console=console,
                transient=False,
            ) as progress:
                task = progress.add_task(
                    f"[cyan]Parsing {path.name}", total=len(chunks)
                )
                for chunk_start, chunk_end in chunks:
                    chunk_pages = self._parse_chunk(
                        pdf, doc_hash, chunk_start, chunk_end
                    )
                    page_results.extend(chunk_pages)
                    progress.advance(task)

        # Build document-level metadata from the parsed pages.
        meta = self._extract_document_meta(page_results)
        doc_result = DocumentResult(
            document_path=str(path),
            document_hash=doc_hash,
            total_pages=total,
            pages_processed=len(page_results),
            model=self.model,
            processing_mode=self.mode,
            title=meta.get("title", ""),
            authors=meta.get("authors", []),
            abstract=meta.get("abstract", ""),
            document_summary=meta.get("document_summary", ""),
            page_results=page_results,
            total_equations=sum(len(p.equations) for p in page_results),
            total_tables=sum(len(p.tables) for p in page_results),
            total_algorithms=sum(len(p.algorithms) for p in page_results),
            total_figures=sum(len(p.figures) for p in page_results),
            total_tokens_used=sum(p.tokens_used for p in page_results),
            total_processing_time_s=time.time() - t_start,
        )
        return doc_result

    def extract_equations(self, pdf_path: str | Path) -> List[EquationBlock]:
        """Parse (or reuse cache) and return every equation in the document."""
        result = self.parse(pdf_path)
        return [eq for p in result.page_results for eq in p.equations]

    def extract_tables(self, pdf_path: str | Path) -> List[TableBlock]:
        """Parse (or reuse cache) and return every table in the document."""
        result = self.parse(pdf_path)
        return [tb for p in result.page_results for tb in p.tables]

    def extract_algorithms(self, pdf_path: str | Path) -> List[AlgorithmBlock]:
        """Parse (or reuse cache) and return every algorithm in the document."""
        result = self.parse(pdf_path)
        return [al for p in result.page_results for al in p.algorithms]

    def extract_figures(self, pdf_path: str | Path) -> List[FigureBlock]:
        """Parse (or reuse cache) and return every figure in the document."""
        result = self.parse(pdf_path)
        return [fg for p in result.page_results for fg in p.figures]

    def query(self, pdf_path: str | Path, question: str) -> str:
        """
        Semantic query over cached parse results. Re-parses if not cached.
        """
        result = self.parse(pdf_path)
        full_text = "\n\n".join(
            f"[Page {p.page_number}]\n{p.raw_text}" for p in result.page_results
        )
        messages = [
            {
                "role": "user",
                "content": (
                    f"Based on the following document content, answer this question "
                    f"precisely and cite page numbers where relevant.\n\n"
                    f"Question: {question}\n\n"
                    # Truncated to keep the request within context limits.
                    f"Document content:\n{full_text[:60000]}"
                ),
            }
        ]
        resp = self.client.messages.create(
            model=self.model,
            max_tokens=2048,
            messages=messages,
        )
        return resp.content[0].text

    # ------------------------------------------------------------------
    # Internal methods
    # ------------------------------------------------------------------

    def _parse_chunk(
        self,
        pdf: PDFDocument,
        doc_hash: str,
        chunk_start: int,
        chunk_end: int,
    ) -> List[PageResult]:
        """Parse a range of pages, using cache when available."""
        results = []
        pages_to_process = []
        for pg in range(chunk_start, chunk_end):
            cached = self.cache.get_page(doc_hash, pg + 1, self.model, self.mode)
            if cached:
                logger.debug("Cache hit page %d", pg + 1)
                results.append(cached)
            else:
                pages_to_process.append(pg)

        if not pages_to_process:
            return results

        # Group consecutive un-cached pages into sub-chunks so one API call
        # covers each contiguous run.
        sub_chunks = self._group_consecutive(pages_to_process)
        for sub_start, sub_end in sub_chunks:
            sub_results = self._call_api_chunk(pdf, doc_hash, sub_start, sub_end)
            results.extend(sub_results)
        results.sort(key=lambda r: r.page_number)
        return results

    @staticmethod
    def _group_consecutive(pages: List[int]) -> List[Tuple[int, int]]:
        """Collapse a sorted page list into half-open (start, end) runs."""
        if not pages:
            return []
        groups, start, prev = [], pages[0], pages[0]
        for p in pages[1:]:
            if p != prev + 1:
                groups.append((start, prev + 1))
                start = p
            prev = p
        groups.append((start, prev + 1))
        return groups

    def _call_api_chunk(
        self,
        pdf: PDFDocument,
        doc_hash: str,
        chunk_start: int,
        chunk_end: int,
    ) -> List[PageResult]:
        """Send pages to Claude API and parse response."""
        t_start = time.time()
        if self.mode == "image":
            return self._call_api_as_images(pdf, doc_hash, chunk_start, chunk_end, t_start)
        else:
            return self._call_api_native(pdf, doc_hash, chunk_start, chunk_end, t_start)

    def _call_api_native(
        self,
        pdf: PDFDocument,
        doc_hash: str,
        chunk_start: int,
        chunk_end: int,
        t_start: float,
    ) -> List[PageResult]:
        """Send a page range as a base64 PDF document block (native mode)."""
        chunk_bytes = pdf.get_chunk_as_pdf_bytes(chunk_start, chunk_end)
        if len(chunk_bytes) > MAX_PDF_SIZE_BYTES:
            # The native document API rejects payloads over this size; surface
            # the problem before the request fails.
            logger.warning(
                "Chunk pages %d-%d is %.1f MB, exceeding the %d MB native API "
                "limit; use a smaller --chunk-size or --mode image.",
                chunk_start + 1, chunk_end,
                len(chunk_bytes) / 1e6, MAX_PDF_SIZE_BYTES // (1024 * 1024),
            )
        b64_pdf = base64.standard_b64encode(chunk_bytes).decode("utf-8")
        num_pages = chunk_end - chunk_start
        prompt_suffix = (
            f"\nThis PDF chunk contains pages {chunk_start + 1} to {chunk_end} "
            f"of the original document. "
            f"Return a JSON array with exactly {num_pages} page objects matching the schema. "
            f"Index them page_number={chunk_start + 1} through {chunk_end}."
        )
        messages = [
            {
                "role": "user",
                "content": [
                    {
                        "type": "document",
                        "source": {
                            "type": "base64",
                            "media_type": "application/pdf",
                            "data": b64_pdf,
                        },
                        # Cache the document block so retries / follow-ups are cheap.
                        "cache_control": {"type": "ephemeral"},
                    },
                    {
                        "type": "text",
                        "text": PAGE_EXTRACTION_PROMPT + prompt_suffix,
                    },
                ],
            }
        ]
        return self._execute_api_call(messages, doc_hash, chunk_start, chunk_end, t_start, "native")

    def _call_api_as_images(
        self,
        pdf: PDFDocument,
        doc_hash: str,
        chunk_start: int,
        chunk_end: int,
        t_start: float,
    ) -> List[PageResult]:
        """Send a page range as rendered PNG images (image mode)."""
        content = []
        for pg_idx in range(chunk_start, chunk_end):
            png_bytes = pdf.get_page_as_png_bytes(pg_idx, dpi=IMAGE_DPI)
            b64_img = base64.standard_b64encode(png_bytes).decode("utf-8")
            # Label each image so the model can index pages reliably.
            content.append({
                "type": "text",
                "text": f"--- Page {pg_idx + 1} ---",
            })
            content.append({
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": "image/png",
                    "data": b64_img,
                },
            })
        num_pages = chunk_end - chunk_start
        prompt_suffix = (
            f"\nThese are page images {chunk_start + 1} through {chunk_end}. "
            f"Return a JSON array with exactly {num_pages} page objects matching the schema. "
            f"Index them page_number={chunk_start + 1} through {chunk_end}."
        )
        content.append({"type": "text", "text": PAGE_EXTRACTION_PROMPT + prompt_suffix})
        messages = [{"role": "user", "content": content}]
        return self._execute_api_call(messages, doc_hash, chunk_start, chunk_end, t_start, "image")

    def _execute_api_call(
        self,
        messages: List[Dict],
        doc_hash: str,
        chunk_start: int,
        chunk_end: int,
        t_start: float,
        mode: str,
    ) -> List[PageResult]:
        """Run the API request with retry, parse the JSON reply, cache pages.

        Pages that cannot be decoded are returned as fallback PageResults
        (and deliberately NOT cached, so a re-run retries them).
        """
        retries, delay = 3, 5
        for attempt in range(retries):
            try:
                resp = self.client.messages.create(
                    model=self.model,
                    max_tokens=MAX_TOKENS_OUTPUT,
                    system=SYSTEM_PROMPT,
                    messages=messages,
                )
                break
            except anthropic.RateLimitError:
                if attempt == retries - 1:
                    raise
                logger.warning("Rate limit hit; retrying in %ds...", delay)
                time.sleep(delay)
                delay *= 2  # exponential backoff
            except anthropic.APIStatusError as exc:
                logger.error("API error: %s", exc)
                raise

        raw_response = resp.content[0].text.strip()
        tokens_used = resp.usage.input_tokens + resp.usage.output_tokens
        elapsed = time.time() - t_start

        # Strip a possible markdown code fence around the JSON.
        if raw_response.startswith("```"):
            lines = raw_response.split("\n")
            raw_response = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:])

        def _fallback(reason: str) -> List[PageResult]:
            # One placeholder result per requested page; not cached.
            return [
                PageResult(
                    page_number=pg + 1,
                    raw_text=f"[PARSE ERROR: {reason}]",
                    summary="Failed to parse this page.",
                    processing_mode=mode,
                    tokens_used=tokens_used // max(1, chunk_end - chunk_start),
                    processing_time_s=elapsed,
                )
                for pg in range(chunk_start, chunk_end)
            ]

        try:
            parsed = json.loads(raw_response)
        except json.JSONDecodeError as exc:
            logger.error("JSON parse error on API response: %s\nRaw:\n%s", exc, raw_response[:500])
            return _fallback("JSON decode failed")

        # Handle both array-of-pages and single-page responses.
        if isinstance(parsed, dict):
            parsed = [parsed]
        if not parsed:
            # FIX: previously tokens_used // len(parsed) raised ZeroDivisionError.
            logger.error("API returned an empty page array for pages %d-%d",
                         chunk_start + 1, chunk_end)
            return _fallback("empty response")

        results = []
        for i, page_data in enumerate(parsed):
            pg_num = chunk_start + i + 1
            page_data["page_number"] = pg_num
            page_data["processing_mode"] = mode
            # Apportion usage evenly across the pages of this call.
            page_data["tokens_used"] = tokens_used // len(parsed)
            page_data["processing_time_s"] = elapsed / len(parsed)
            pr = self._dict_to_page_result(page_data)
            self.cache.set_page(doc_hash, pr, self.model, mode)
            results.append(pr)
        return results

    @staticmethod
    def _dict_to_page_result(d: Dict) -> PageResult:
        """Convert one model-produced page dict into a PageResult, tolerating
        missing optional keys."""
        equations = [
            EquationBlock(
                page=d["page_number"],
                index=e.get("index", i),
                latex=e.get("latex", ""),
                description=e.get("description", ""),
                inline=e.get("inline", False),
            )
            for i, e in enumerate(d.get("equations", []))
        ]
        tables = [
            TableBlock(
                page=d["page_number"],
                index=t.get("index", i),
                markdown=t.get("markdown", ""),
                json_data=t.get("json_data", []),
                caption=t.get("caption", ""),
            )
            for i, t in enumerate(d.get("tables", []))
        ]
        algorithms = [
            AlgorithmBlock(
                page=d["page_number"],
                index=a.get("index", i),
                name=a.get("name", f"Algorithm {i+1}"),
                language=a.get("language", "pseudocode"),
                code=a.get("code", ""),
                description=a.get("description", ""),
            )
            for i, a in enumerate(d.get("algorithms", []))
        ]
        figures = [
            FigureBlock(
                page=d["page_number"],
                index=f.get("index", i),
                figure_type=f.get("figure_type", "other"),
                description=f.get("description", ""),
                data_summary=f.get("data_summary", ""),
                caption=f.get("caption", ""),
            )
            for i, f in enumerate(d.get("figures", []))
        ]
        return PageResult(
            page_number=d["page_number"],
            raw_text=d.get("raw_text", ""),
            summary=d.get("summary", ""),
            equations=equations,
            tables=tables,
            algorithms=algorithms,
            figures=figures,
            section_headers=d.get("section_headers", []),
            references=d.get("references", []),
            keywords=d.get("keywords", []),
            layout_notes=d.get("layout_notes", ""),
            processing_mode=d.get("processing_mode", "native"),
            tokens_used=d.get("tokens_used", 0),
            processing_time_s=d.get("processing_time_s", 0.0),
        )

    def _extract_document_meta(self, page_results: List[PageResult]) -> Dict:
        """Extract title/authors/abstract/summary from the first pages.

        Best-effort: returns empty metadata on any failure rather than
        aborting the whole parse.
        """
        empty = {"title": "", "authors": [], "abstract": "", "document_summary": ""}
        if not page_results:
            # FIX: nothing was parsed; avoid a pointless API call.
            return empty
        # Use first 5 pages for metadata extraction.
        sample_text = "\n\n".join(
            f"[Page {p.page_number}]\n{p.raw_text}" for p in page_results[:5]
        )
        messages = [
            {
                "role": "user",
                "content": (
                    f"{DOCUMENT_META_PROMPT}\n\nDocument sample:\n{sample_text[:8000]}"
                ),
            }
        ]
        try:
            resp = self.client.messages.create(
                model=self.model,
                max_tokens=1024,
                system=SYSTEM_PROMPT,
                messages=messages,
            )
            raw = resp.content[0].text.strip()
            if raw.startswith("```"):
                lines = raw.split("\n")
                raw = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:])
            return json.loads(raw)
        except Exception as exc:
            logger.warning("Document meta extraction failed: %s", exc)
            return empty
# ---------------------------------------------------------------------------
# Output formatters
# ---------------------------------------------------------------------------

class OutputFormatter:
    """Render a DocumentResult as JSON, Markdown, plain text, or a rich table."""

    @staticmethod
    def to_json(result: DocumentResult, indent: int = 2) -> str:
        """Serialize the full result tree to a JSON string."""
        payload = asdict(result)
        return json.dumps(payload, indent=indent, ensure_ascii=False)

    @staticmethod
    def to_markdown(result: DocumentResult) -> str:
        """Render the result as a Markdown report, one section per page."""
        out: List[str] = []
        emit = out.append

        emit(f"# {result.title or Path(result.document_path).name}")
        if result.authors:
            emit(f"\n**Authors:** {', '.join(result.authors)}")
        emit(f"\n**Document Hash:** `{result.document_hash}`")
        emit(f"**Model:** {result.model} | **Mode:** {result.processing_mode}")
        emit(
            f"**Pages:** {result.pages_processed}/{result.total_pages} | "
            f"**Tokens:** {result.total_tokens_used:,} | "
            f"**Time:** {result.total_processing_time_s:.1f}s"
        )
        emit(
            f"**Equations:** {result.total_equations} | "
            f"**Tables:** {result.total_tables} | "
            f"**Algorithms:** {result.total_algorithms} | "
            f"**Figures:** {result.total_figures}"
        )
        if result.abstract:
            emit(f"\n## Abstract\n\n{result.abstract}")
        if result.document_summary:
            emit(f"\n## Document Summary\n\n{result.document_summary}")

        for page in result.page_results:
            emit(f"\n---\n\n## Page {page.page_number}")
            if page.section_headers:
                emit("\n### Sections\n" + "\n".join(f"- {h}" for h in page.section_headers))
            emit(f"\n### Summary\n{page.summary}")
            emit(f"\n### Full Text\n\n{page.raw_text}")

            if page.equations:
                emit("\n### Equations\n")
                for equation in page.equations:
                    emit(f"**Eq {equation.index}** ({('inline' if equation.inline else 'display')})")
                    emit(f"```latex\n{equation.latex}\n```")
                    emit(f"*{equation.description}*\n")

            if page.tables:
                emit("\n### Tables\n")
                for table_block in page.tables:
                    if table_block.caption:
                        emit(f"**{table_block.caption}**\n")
                    emit(table_block.markdown + "\n")

            if page.algorithms:
                emit("\n### Algorithms\n")
                for algo in page.algorithms:
                    emit(f"**{algo.name}** ({algo.language})\n")
                    emit(f"```{algo.language}\n{algo.code}\n```")
                    emit(f"*{algo.description}*\n")

            if page.figures:
                emit("\n### Figures\n")
                for fig in page.figures:
                    emit(f"**Figure {fig.index}** [{fig.figure_type}]")
                    if fig.caption:
                        emit(f"*{fig.caption}*")
                    emit(fig.description)
                    if fig.data_summary:
                        emit(f"Data: {fig.data_summary}\n")

        return "\n".join(out)

    @staticmethod
    def to_text(result: DocumentResult) -> str:
        """Render the result as a plain-text dump (header + raw page text)."""
        out = [
            f"DOCUMENT: {result.title or Path(result.document_path).name}",
            f"Authors: {', '.join(result.authors)}",
            f"Pages processed: {result.pages_processed}/{result.total_pages}",
            "",
            "SUMMARY",
            "=" * 60,
            result.document_summary,
            "",
        ]
        for page in result.page_results:
            out.append(f"\n[PAGE {page.page_number}]")
            out.append(page.raw_text)
        return "\n".join(out)

    @staticmethod
    def print_summary_table(result: DocumentResult) -> None:
        """Print a two-column metric/value summary to the console."""
        summary = Table(title=f"Parse Results: {Path(result.document_path).name}", show_lines=True)
        summary.add_column("Metric", style="cyan", no_wrap=True)
        summary.add_column("Value", style="green")
        rows = [
            ("Title", result.title or "(unknown)"),
            ("Authors", ", ".join(result.authors) or "(unknown)"),
            ("Model", result.model),
            ("Mode", result.processing_mode),
            ("Pages total", str(result.total_pages)),
            ("Pages parsed", str(result.pages_processed)),
            ("Equations", str(result.total_equations)),
            ("Tables", str(result.total_tables)),
            ("Algorithms", str(result.total_algorithms)),
            ("Figures", str(result.total_figures)),
            ("Tokens used", f"{result.total_tokens_used:,}"),
            ("Processing time", f"{result.total_processing_time_s:.1f}s"),
            ("Document hash", result.document_hash),
        ]
        for metric, value in rows:
            summary.add_row(metric, value)
        console.print(summary)


# ---------------------------------------------------------------------------
# Agent interface
# ---------------------------------------------------------------------------

class AgentPDFInterface:
    """
    High-level interface designed for use within agent pipelines.
    All methods accept a file path and return serializable Python objects.

    Example usage in an agent:

        from pdf_atomic_parser import AgentPDFInterface
        agent = AgentPDFInterface(model="opus")
        full = agent.parse("paper.pdf")
        eqs = agent.get_equations("paper.pdf")
        answer = agent.ask("paper.pdf", "What is the loss function?")
    """

    def __init__(self, **kwargs):
        # All keyword arguments are forwarded to AtomicPDFParser unchanged.
        self._parser = AtomicPDFParser(**kwargs)

    def parse(self, pdf_path: str, page_range: Optional[Tuple[int, int]] = None) -> Dict:
        """Parse and return the full document result as a plain dict."""
        parsed = self._parser.parse(pdf_path, page_range)
        return asdict(parsed)

    def get_equations(self, pdf_path: str) -> List[Dict]:
        """Return every equation in the document as plain dicts."""
        blocks = self._parser.extract_equations(pdf_path)
        return [asdict(block) for block in blocks]

    def get_tables(self, pdf_path: str) -> List[Dict]:
        """Return every table in the document as plain dicts."""
        blocks = self._parser.extract_tables(pdf_path)
        return [asdict(block) for block in blocks]

    def get_algorithms(self, pdf_path: str) -> List[Dict]:
        """Return every algorithm in the document as plain dicts."""
        blocks = self._parser.extract_algorithms(pdf_path)
        return [asdict(block) for block in blocks]

    def get_figures(self, pdf_path: str) -> List[Dict]:
        """Return every figure description in the document as plain dicts."""
        blocks = self._parser.extract_figures(pdf_path)
        return [asdict(block) for block in blocks]

    def ask(self, pdf_path: str, question: str) -> str:
        """Answer a free-form question about the document's content."""
        return self._parser.query(pdf_path, question)

    def get_full_text(self, pdf_path: str) -> str:
        """Return the concatenated raw text of all parsed pages."""
        parsed = self._parser.parse(pdf_path)
        pieces = (f"[Page {p.page_number}]\n{p.raw_text}" for p in parsed.page_results)
        return "\n\n".join(pieces)

    def cache_stats(self) -> Dict:
        """Expose the underlying parse-cache statistics."""
        return self._parser.cache.stats()
Dict: return self._parser.cache.stats() # --------------------------------------------------------------------------- # Batch processor # --------------------------------------------------------------------------- def batch_process( input_dir: Path, output_dir: Path, parser: AtomicPDFParser, fmt: str = "json", ) -> None: pdfs = sorted(input_dir.glob("**/*.pdf")) if not pdfs: console.print(f"[yellow]No PDF files found in {input_dir}[/yellow]") return output_dir.mkdir(parents=True, exist_ok=True) console.print(f"[cyan]Found {len(pdfs)} PDF files to process.[/cyan]") for pdf_path in pdfs: console.print(f"\n[bold]Processing:[/bold] {pdf_path.name}") try: result = parser.parse(pdf_path) stem = pdf_path.stem if fmt == "json": out = output_dir / f"{stem}.json" out.write_text(OutputFormatter.to_json(result), encoding="utf-8") elif fmt == "markdown": out = output_dir / f"{stem}.md" out.write_text(OutputFormatter.to_markdown(result), encoding="utf-8") else: out = output_dir / f"{stem}.txt" out.write_text(OutputFormatter.to_text(result), encoding="utf-8") console.print(f" [green]Saved:[/green] {out}") OutputFormatter.print_summary_table(result) except Exception as exc: console.print(f" [red]Error processing {pdf_path.name}: {exc}[/red]") logger.exception("Batch error") # --------------------------------------------------------------------------- # Token estimator # --------------------------------------------------------------------------- def estimate_tokens(pdf_path: Path) -> None: with PDFDocument(pdf_path) as pdf: total = pdf.total_pages size_mb = pdf.file_size_bytes / 1e6 # Rough estimate: ~800 tokens per page for dense academic content est_tokens_in = total * 800 est_tokens_out = total * 400 est_total = est_tokens_in + est_tokens_out # Pricing approximate (Opus: $15/Mtok in, $75/Mtok out as of 2025) est_cost_opus = (est_tokens_in * 15 + est_tokens_out * 75) / 1_000_000 table = Table(title=f"Token Estimate: {pdf_path.name}", show_lines=True) table.add_column("Metric", 
style="cyan") table.add_column("Estimate", style="yellow") table.add_row("Total pages", str(total)) table.add_row("File size", f"{size_mb:.2f} MB") table.add_row("Est. input tokens", f"{est_tokens_in:,}") table.add_row("Est. output tokens", f"{est_tokens_out:,}") table.add_row("Est. total tokens", f"{est_total:,}") table.add_row("Est. cost (Opus)", f"${est_cost_opus:.2f}") table.add_row("Note", "Estimate only; actual usage varies") console.print(table) # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def build_cli() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( prog="pdf_atomic_parser", description="Atomic PDF parser powered by Claude claude-opus-4-6", formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("--model", default="opus", help="opus | sonnet | haiku | full-model-string") parser.add_argument("--mode", default="native", choices=["native", "image"], help="Parsing mode") parser.add_argument("--chunk-size", type=int, default=CHUNK_SIZE_DEFAULT, help="Pages per API call") parser.add_argument("--verbose", action="store_true") sub = parser.add_subparsers(dest="command", required=True) # parse p_parse = sub.add_parser("parse", help="Parse a PDF fully") p_parse.add_argument("pdf", help="Path to PDF file") p_parse.add_argument("--output", "-o", help="Output file path") p_parse.add_argument("--format", "-f", default="json", choices=["json", "markdown", "text"]) p_parse.add_argument("--pages", help="Page range e.g. 
1-50") # atomic (alias for parse with all content) p_atomic = sub.add_parser("atomic", help="Full atomic extraction to directory") p_atomic.add_argument("pdf", help="Path to PDF file") p_atomic.add_argument("--output", "-o", default="./atomic_output") # extract-equations p_eq = sub.add_parser("extract-equations", help="Extract LaTeX equations") p_eq.add_argument("pdf") p_eq.add_argument("--output", "-o") # extract-tables p_tb = sub.add_parser("extract-tables", help="Extract tables") p_tb.add_argument("pdf") p_tb.add_argument("--output", "-o") # extract-algorithms p_al = sub.add_parser("extract-algorithms", help="Extract algorithms/code") p_al.add_argument("pdf") p_al.add_argument("--output", "-o") # extract-figures p_fg = sub.add_parser("extract-figures", help="Extract figure descriptions") p_fg.add_argument("pdf") p_fg.add_argument("--output", "-o") # query p_q = sub.add_parser("query", help="Ask a question about the PDF") p_q.add_argument("pdf") p_q.add_argument("question", help="Question to ask") # batch p_batch = sub.add_parser("batch", help="Batch process a directory of PDFs") p_batch.add_argument("directory") p_batch.add_argument("--output", "-o", default="./batch_output") p_batch.add_argument("--format", "-f", default="json", choices=["json", "markdown", "text"]) # estimate p_est = sub.add_parser("estimate", help="Estimate token cost before parsing") p_est.add_argument("pdf") # cache commands sub.add_parser("cache-stats", help="Show cache statistics") sub.add_parser("list-cache", help="List all cached documents") p_cc = sub.add_parser("clear-cache", help="Clear cache for a document") p_cc.add_argument("pdf", help="PDF path (to identify document)") return parser def parse_page_range(s: str) -> Tuple[int, int]: parts = s.split("-") if len(parts) != 2: raise ValueError(f"Page range must be in format start-end, got: {s}") return int(parts[0]), int(parts[1]) def save_output(content: str, output_path: Optional[str], default_name: str) -> None: path = 
Path(output_path) if output_path else Path(default_name) path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content, encoding="utf-8") console.print(f"[green]Saved:[/green] {path}") def main() -> None: cli = build_cli() args = cli.parse_args() cache = ParseCache(Path.home() / ".cache" / "pdf_atomic_parser") if args.command == "cache-stats": stats = cache.stats() table = Table(title="Cache Statistics", show_lines=True) table.add_column("Key", style="cyan") table.add_column("Value", style="green") for k, v in stats.items(): table.add_row(k.replace("_", " ").title(), str(v)) console.print(table) return if args.command == "list-cache": docs = cache.list_documents() if not docs: console.print("[yellow]Cache is empty.[/yellow]") return table = Table(title="Cached Documents", show_lines=True) table.add_column("Hash", style="cyan") table.add_column("Cached Pages", style="green") table.add_column("First Seen", style="dim") for d in docs: import datetime ts = datetime.datetime.fromtimestamp(d["first_seen"]).strftime("%Y-%m-%d %H:%M") table.add_row(d["hash"], str(d["cached_pages"]), ts) console.print(table) return if args.command == "estimate": estimate_tokens(Path(args.pdf)) return parser = AtomicPDFParser( model=args.model, mode=args.mode, chunk_size=args.chunk_size, verbose=args.verbose, ) if args.command == "clear-cache": doc_hash = cache.file_hash(Path(args.pdf)) n = cache.clear_document(doc_hash) console.print(f"[green]Cleared {n} cached pages for {Path(args.pdf).name}[/green]") return if args.command in ("parse", "atomic"): page_range = None if hasattr(args, "pages") and args.pages: page_range = parse_page_range(args.pages) result = parser.parse(args.pdf, page_range) OutputFormatter.print_summary_table(result) if args.command == "atomic": out_dir = Path(args.output) stem = Path(args.pdf).stem for fmt, fn in [("json", f"{stem}.json"), ("markdown", f"{stem}.md"), ("text", f"{stem}.txt")]: (out_dir / fn).parent.mkdir(parents=True, exist_ok=True) if fmt == 
"json": content = OutputFormatter.to_json(result) elif fmt == "markdown": content = OutputFormatter.to_markdown(result) else: content = OutputFormatter.to_text(result) (out_dir / fn).write_text(content, encoding="utf-8") console.print(f"[green]Saved {fmt}:[/green] {out_dir / fn}") else: fmt = args.format if fmt == "json": content = OutputFormatter.to_json(result) elif fmt == "markdown": content = OutputFormatter.to_markdown(result) else: content = OutputFormatter.to_text(result) stem = Path(args.pdf).stem save_output(content, getattr(args, "output", None), f"{stem}_parsed.{fmt if fmt != 'markdown' else 'md'}") elif args.command == "extract-equations": result = parser.parse(args.pdf) eqs = [asdict(e) for p in result.page_results for e in p.equations] content = json.dumps(eqs, indent=2, ensure_ascii=False) save_output(content, args.output, f"{Path(args.pdf).stem}_equations.json") console.print(f"[cyan]{len(eqs)} equations extracted.[/cyan]") elif args.command == "extract-tables": result = parser.parse(args.pdf) tables = [asdict(t) for p in result.page_results for t in p.tables] content = json.dumps(tables, indent=2, ensure_ascii=False) save_output(content, args.output, f"{Path(args.pdf).stem}_tables.json") console.print(f"[cyan]{len(tables)} tables extracted.[/cyan]") elif args.command == "extract-algorithms": result = parser.parse(args.pdf) algos = [asdict(a) for p in result.page_results for a in p.algorithms] content = json.dumps(algos, indent=2, ensure_ascii=False) save_output(content, args.output, f"{Path(args.pdf).stem}_algorithms.json") console.print(f"[cyan]{len(algos)} algorithms extracted.[/cyan]") elif args.command == "extract-figures": result = parser.parse(args.pdf) figures = [asdict(f) for p in result.page_results for f in p.figures] content = json.dumps(figures, indent=2, ensure_ascii=False) save_output(content, args.output, f"{Path(args.pdf).stem}_figures.json") console.print(f"[cyan]{len(figures)} figures extracted.[/cyan]") elif args.command == 
"query": answer = parser.query(args.pdf, args.question) console.print(f"\n[bold cyan]Answer:[/bold cyan]\n{answer}") elif args.command == "batch": batch_process( Path(args.directory), Path(args.output), parser, getattr(args, "format", "json"), ) if __name__ == "__main__": main()