| | """ |
| | pdf_atomic_parser.py |
| | ==================== |
| | Author : algorembrant |
| | Version : 1.0.0 |
| | License : MIT |
| | |
| | DESCRIPTION |
| | ----------- |
Atomically parse and understand complex PDF documents using the Claude claude-opus-4-6 model.
| | Handles equations, graphs, algorithms, unique drawings, tables, multi-column |
| | layouts, and 100+ page documents without hallucination. Designed for local |
| | agent pipelines. |
| | |
| | CAPABILITIES |
| | ------------ |
| | - Native PDF document API (base64) with prompt caching |
| | - Page-as-image fallback using PyMuPDF at 300 DPI for max fidelity |
| | - LaTeX equation extraction |
| | - Table extraction (Markdown + JSON) |
| | - Algorithm and pseudocode extraction |
| | - Figure and graph semantic description |
| | - Multi-column and complex layout handling |
| | - Chunked processing for 100+ page documents |
| | - SQLite-backed cache to avoid re-processing pages |
| | - Structured JSON output per page and full document |
| | - Agent-callable interface (AgentPDFInterface) |
| | - Async batch processing for speed |
| | |
| | USAGE COMMANDS |
| | -------------- |
| | # Parse a PDF and save structured JSON |
| | python pdf_atomic_parser.py parse document.pdf |
| | |
| | # Parse with verbose output |
| | python pdf_atomic_parser.py parse document.pdf --verbose |
| | |
| | # Parse specific page range |
| | python pdf_atomic_parser.py parse document.pdf --pages 1-20 |
| | |
| | # Extract only equations (LaTeX) |
| | python pdf_atomic_parser.py extract-equations document.pdf |
| | |
| | # Extract only tables (Markdown) |
| | python pdf_atomic_parser.py extract-tables document.pdf |
| | |
| | # Extract only algorithms/code blocks |
| | python pdf_atomic_parser.py extract-algorithms document.pdf |
| | |
| | # Extract figures and graph descriptions |
| | python pdf_atomic_parser.py extract-figures document.pdf |
| | |
| | # Full atomic extraction (all content types) to output dir |
| | python pdf_atomic_parser.py atomic document.pdf --output ./results/ |
| | |
| | # Query a parsed PDF (semantic search over cached parse) |
| | python pdf_atomic_parser.py query document.pdf "What is the main theorem?" |
| | |
| | # Use faster/cheaper model (Sonnet instead of Opus) |
| | python pdf_atomic_parser.py parse document.pdf --model sonnet |
| | |
| | # Use page-as-image mode (higher fidelity for scanned/complex PDFs) |
| | python pdf_atomic_parser.py parse document.pdf --mode image |
| | |
| | # Use native PDF mode (default, faster) |
| | python pdf_atomic_parser.py parse document.pdf --mode native |
| | |
| | # Set chunk size for large PDFs (default 20 pages per chunk) |
| | python pdf_atomic_parser.py parse document.pdf --chunk-size 10 |
| | |
| | # Clear cache for a document |
| | python pdf_atomic_parser.py clear-cache document.pdf |
| | |
| | # Show cache stats |
| | python pdf_atomic_parser.py cache-stats |
| | |
| | # List all cached documents |
| | python pdf_atomic_parser.py list-cache |
| | |
| | # Batch process a directory of PDFs |
| | python pdf_atomic_parser.py batch ./pdf_folder/ --output ./results/ |
| | |
| | # Export parse results as Markdown report |
| | python pdf_atomic_parser.py parse document.pdf --format markdown |
| | |
| | # Export as plain text |
| | python pdf_atomic_parser.py parse document.pdf --format text |
| | |
| | # Show token usage estimate before parsing |
| | python pdf_atomic_parser.py estimate document.pdf |
| | |
| | # Agent interface example (programmatic) |
| | # from pdf_atomic_parser import AgentPDFInterface |
| | # agent = AgentPDFInterface() |
| | # result = agent.parse("document.pdf") |
| | # equations = agent.get_equations("document.pdf") |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import argparse |
| | import asyncio |
| | import base64 |
| | import hashlib |
| | import json |
| | import logging |
| | import os |
| | import sqlite3 |
| | import sys |
| | import time |
| | from concurrent.futures import ThreadPoolExecutor, as_completed |
| | from dataclasses import asdict, dataclass, field |
| | from pathlib import Path |
| | from typing import Any, Dict, Generator, Iterator, List, Optional, Tuple |
| |
|
| | import anthropic |
| | import fitz |
| | from rich.console import Console |
| | from rich.logging import RichHandler |
| | from rich.progress import ( |
| | BarColumn, |
| | MofNCompleteColumn, |
| | Progress, |
| | SpinnerColumn, |
| | TaskProgressColumn, |
| | TextColumn, |
| | TimeElapsedColumn, |
| | TimeRemainingColumn, |
| | ) |
| | from rich.table import Table |
| | from tqdm import tqdm |
| |
|
| |
|
| | |
| | |
| | |
| |
|
# Model aliases; resolved from short names by AtomicPDFParser._resolve_model().
DEFAULT_MODEL_OPUS = "claude-opus-4-6"
DEFAULT_MODEL_SONNET = "claude-sonnet-4-6"
DEFAULT_MODEL_HAIKU = "claude-haiku-4-5-20251001"

MAX_TOKENS_OUTPUT = 8192          # max_tokens passed to every extraction request
CHUNK_SIZE_DEFAULT = 20           # pages per API chunk for large documents
IMAGE_DPI = 300                   # render DPI for page-as-image mode
MAX_PDF_SIZE_BYTES = 32 * 1024 * 1024  # NOTE(review): declared but not enforced anywhere visible in this file — confirm intended use
MAX_PDF_PAGES_NATIVE = 100             # NOTE(review): declared but not enforced anywhere visible in this file — confirm intended use
CACHE_DB_NAME = ".pdf_parser_cache.db"  # SQLite filename created inside the cache directory
LOG_FORMAT = "%(message)s"

# Single Rich console shared by logging handlers and progress bars.
console = Console()

# Route module logging through Rich; AtomicPDFParser(verbose=True) raises the
# level of this logger to DEBUG at construction time.
logging.basicConfig(
    level=logging.WARNING,
    format=LOG_FORMAT,
    handlers=[RichHandler(console=console, rich_tracebacks=True, show_path=False)],
)
logger = logging.getLogger("pdf_atomic_parser")
| |
|
| |
|
| | |
| | |
| | |
| |
|
@dataclass
class EquationBlock:
    """One equation extracted from a single PDF page."""

    page: int             # 1-indexed page number the equation appears on
    index: int            # 0-based position of the equation within its page
    latex: str            # LaTeX reproduction of the equation
    description: str      # model-written explanation of what the equation represents
    inline: bool = False  # True for inline math, False for display/block math
| |
|
| |
|
@dataclass
class TableBlock:
    """One table extracted from a single PDF page, in two parallel formats."""

    page: int              # 1-indexed page number the table appears on
    index: int             # 0-based position of the table within its page
    markdown: str          # GitHub-flavored Markdown rendering of the table
    json_data: List[Dict]  # same table as a list of row dicts (column -> value)
    caption: str = ""      # table caption, empty string when absent
| |
|
| |
|
@dataclass
class AlgorithmBlock:
    """One algorithm or pseudocode listing extracted from a single PDF page."""

    page: int         # 1-indexed page number the listing appears on
    index: int        # 0-based position of the listing within its page
    name: str         # algorithm name, or a fallback like "Algorithm N"
    language: str     # e.g. "pseudocode", "python", "cpp", "generic"
    code: str         # verbatim listing text, indentation preserved
    description: str  # model-written explanation of what the algorithm does
| |
|
| |
|
@dataclass
class FigureBlock:
    """One figure/graph extracted from a single PDF page, described semantically."""

    page: int          # 1-indexed page number the figure appears on
    index: int         # 0-based position of the figure within its page
    figure_type: str   # category from the extraction schema (chart, diagram, ...)
    description: str   # detailed semantic description of the visual
    data_summary: str  # axes, units, trends, key values for quantitative figures
    caption: str = ""  # figure caption, empty string when absent
| |
|
| |
|
@dataclass
class PageResult:
    """Complete structured extraction for one PDF page."""

    page_number: int  # 1-indexed page number within the original document
    raw_text: str     # full verbatim text of the page
    summary: str      # short factual summary of the page
    equations: List[EquationBlock] = field(default_factory=list)
    tables: List[TableBlock] = field(default_factory=list)
    algorithms: List[AlgorithmBlock] = field(default_factory=list)
    figures: List[FigureBlock] = field(default_factory=list)
    section_headers: List[str] = field(default_factory=list)
    references: List[str] = field(default_factory=list)   # citations seen on this page
    keywords: List[str] = field(default_factory=list)     # important technical terms
    layout_notes: str = ""          # columns, footnotes, margin notes, etc.
    processing_mode: str = "native" # "native" (PDF chunk) or "image" (page PNG)
    tokens_used: int = 0            # per-page share of the API call's token usage
    processing_time_s: float = 0.0  # per-page share of the API call's wall time
| |
|
| |
|
@dataclass
class DocumentResult:
    """Aggregate result for a whole parsed document: metadata, per-page results, totals."""

    document_path: str    # resolved filesystem path to the source PDF
    document_hash: str    # truncated SHA-256 of the file (cache key)
    total_pages: int      # page count of the source PDF
    pages_processed: int  # pages actually parsed (may be fewer with --pages)
    model: str            # resolved model id used for extraction
    processing_mode: str  # "native" or "image"
    title: str
    authors: List[str]
    abstract: str
    document_summary: str
    page_results: List[PageResult] = field(default_factory=list)
    total_equations: int = 0
    total_tables: int = 0
    total_algorithms: int = 0
    total_figures: int = 0
    total_tokens_used: int = 0
    total_processing_time_s: float = 0.0
| |
|
| |
|
| | |
| | |
| | |
| |
|
class ParseCache:
    """SQLite-backed cache for parsed page results.

    Pages are keyed by (doc_hash, page_num, model, mode) so the same document
    can be cached independently per model and per processing mode.

    Fix: the previous implementation used ``with self._connect() as conn:``
    alone.  sqlite3's connection context manager only scopes the transaction
    (commit on success, rollback on error) — it does NOT close the connection,
    so every cache call leaked an open connection (and its WAL file handles).
    Each method now closes its connection explicitly in a ``finally`` block.
    """

    def __init__(self, cache_dir: Path):
        """Open (creating if necessary) the cache database under *cache_dir*."""
        cache_dir.mkdir(parents=True, exist_ok=True)
        self.db_path = cache_dir / CACHE_DB_NAME
        self._init_db()

    def _init_db(self) -> None:
        """Create the cache tables if they do not already exist (idempotent)."""
        conn = self._connect()
        try:
            with conn:  # transaction scope: commit on success, rollback on error
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS page_cache (
                        doc_hash TEXT NOT NULL,
                        page_num INTEGER NOT NULL,
                        model TEXT NOT NULL,
                        mode TEXT NOT NULL,
                        result_json TEXT NOT NULL,
                        created_at REAL NOT NULL,
                        PRIMARY KEY (doc_hash, page_num, model, mode)
                    )
                """)
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS doc_meta (
                        doc_hash TEXT PRIMARY KEY,
                        doc_path TEXT NOT NULL,
                        total_pages INTEGER NOT NULL,
                        created_at REAL NOT NULL
                    )
                """)
        finally:
            conn.close()

    def _connect(self) -> sqlite3.Connection:
        """Return a new WAL-mode connection; the caller must close it."""
        conn = sqlite3.connect(self.db_path, timeout=30)
        conn.execute("PRAGMA journal_mode=WAL")
        return conn

    @staticmethod
    def file_hash(path: Path) -> str:
        """Return the first 16 hex chars of the file's SHA-256, read in 64 KiB chunks."""
        h = hashlib.sha256()
        with open(path, "rb") as fh:
            for chunk in iter(lambda: fh.read(65536), b""):
                h.update(chunk)
        return h.hexdigest()[:16]

    def get_page(self, doc_hash: str, page_num: int, model: str, mode: str) -> Optional[PageResult]:
        """Return the cached PageResult for the key, or None on a cache miss."""
        conn = self._connect()
        try:
            row = conn.execute(
                "SELECT result_json FROM page_cache WHERE doc_hash=? AND page_num=? AND model=? AND mode=?",
                (doc_hash, page_num, model, mode),
            ).fetchone()
        finally:
            conn.close()
        if row:
            return self._deserialize_page(json.loads(row[0]))
        return None

    def set_page(self, doc_hash: str, result: PageResult, model: str, mode: str) -> None:
        """Insert or overwrite the cached result for this page/model/mode key."""
        conn = self._connect()
        try:
            with conn:
                conn.execute(
                    "INSERT OR REPLACE INTO page_cache VALUES (?,?,?,?,?,?)",
                    (doc_hash, result.page_number, model, mode,
                     json.dumps(self._serialize_page(result)), time.time()),
                )
        finally:
            conn.close()

    def clear_document(self, doc_hash: str) -> int:
        """Delete all cached pages and metadata for a document; return pages removed."""
        conn = self._connect()
        try:
            with conn:
                cur = conn.execute("DELETE FROM page_cache WHERE doc_hash=?", (doc_hash,))
                conn.execute("DELETE FROM doc_meta WHERE doc_hash=?", (doc_hash,))
                return cur.rowcount
        finally:
            conn.close()

    def stats(self) -> Dict[str, Any]:
        """Return aggregate cache statistics: page count, document count, DB size (MB)."""
        conn = self._connect()
        try:
            total = conn.execute("SELECT COUNT(*) FROM page_cache").fetchone()[0]
            docs = conn.execute("SELECT COUNT(DISTINCT doc_hash) FROM page_cache").fetchone()[0]
        finally:
            conn.close()
        size = self.db_path.stat().st_size if self.db_path.exists() else 0
        return {"total_cached_pages": total, "unique_documents": docs, "cache_size_mb": round(size / 1e6, 2)}

    def list_documents(self) -> List[Dict]:
        """List cached documents with their page counts and first-seen timestamps."""
        conn = self._connect()
        try:
            rows = conn.execute("""
                SELECT doc_hash, COUNT(*) as pages, MIN(created_at) as first_seen
                FROM page_cache GROUP BY doc_hash
            """).fetchall()
        finally:
            conn.close()
        return [{"hash": r[0], "cached_pages": r[1], "first_seen": r[2]} for r in rows]

    @staticmethod
    def _serialize_page(p: PageResult) -> Dict:
        """Convert a PageResult (and nested dataclasses) to a JSON-safe dict."""
        return asdict(p)

    @staticmethod
    def _deserialize_page(d: Dict) -> PageResult:
        """Rebuild a PageResult, reconstructing the nested block dataclasses."""
        d["equations"] = [EquationBlock(**e) for e in d.get("equations", [])]
        d["tables"] = [TableBlock(**t) for t in d.get("tables", [])]
        d["algorithms"] = [AlgorithmBlock(**a) for a in d.get("algorithms", [])]
        d["figures"] = [FigureBlock(**f) for f in d.get("figures", [])]
        return PageResult(**d)
| |
|
| |
|
| | |
| | |
| | |
| |
|
class PDFDocument:
    """Lightweight context-manager wrapper around ``fitz.Document``.

    Adds chunk-range computation, chunk-to-PDF extraction, and page-to-PNG
    rendering helpers used by the parser.
    """

    def __init__(self, path: Path):
        self.path = path
        self._doc = fitz.open(str(path))
        self.total_pages = len(self._doc)

    @property
    def file_size_bytes(self) -> int:
        """Size of the PDF file on disk, in bytes."""
        return self.path.stat().st_size

    def get_chunk_ranges(self, chunk_size: int) -> List[Tuple[int, int]]:
        """Return list of (start_page_0indexed, end_page_exclusive) tuples."""
        return [
            (lo, min(lo + chunk_size, self.total_pages))
            for lo in range(0, self.total_pages, chunk_size)
        ]

    def get_chunk_as_pdf_bytes(self, start: int, end: int) -> bytes:
        """Extract pages [start, end) into a new in-memory PDF."""
        subset = fitz.open()
        # fitz's to_page is inclusive, hence end - 1.
        subset.insert_pdf(self._doc, from_page=start, to_page=end - 1)
        return subset.write()

    def get_page_as_png_bytes(self, page_idx: int, dpi: int = IMAGE_DPI) -> bytes:
        """Render a single page to PNG bytes at given DPI."""
        zoom = dpi / 72  # PDF user space is 72 units per inch
        pixmap = self._doc[page_idx].get_pixmap(matrix=fitz.Matrix(zoom, zoom), alpha=False)
        return pixmap.tobytes("png")

    def close(self) -> None:
        """Release the underlying fitz document."""
        self._doc.close()

    def __enter__(self):
        return self

    def __exit__(self, *_):
        self.close()
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | SYSTEM_PROMPT = """You are an expert scientific document analyst specializing in atomically |
| | parsing complex academic and technical PDFs. Your extractions must be: |
| | - Complete: capture every equation, table, figure, and algorithm |
| | - Faithful: never invent or hallucinate content |
| | - Precise: reproduce equations in proper LaTeX |
| | - Structured: respond only with valid JSON matching the schema provided |
| | |
| | Do NOT add prose outside the JSON response. If a field has no content, use an |
| | empty list [] or empty string "" rather than null.""" |
| |
|
| | PAGE_EXTRACTION_PROMPT = """\ |
| | Atomically parse the provided PDF page(s) and return a JSON object that matches |
| | this schema exactly: |
| | |
| | { |
| | "raw_text": "<full verbatim text extracted from page, preserving paragraphs>", |
| | "summary": "<2-4 sentence factual summary of this page>", |
| | "section_headers": ["<header string>", ...], |
| | "keywords": ["<important technical term>", ...], |
| | "layout_notes": "<describe columns, special layouts, footnotes, margin notes>", |
| | "equations": [ |
| | { |
| | "index": <int starting at 0>, |
| | "latex": "<complete LaTeX representation>", |
| | "description": "<what this equation represents>", |
| | "inline": <true if inline, false if display/block> |
| | } |
| | ], |
| | "tables": [ |
| | { |
| | "index": <int>, |
| | "markdown": "<GitHub-flavored Markdown table>", |
| | "json_data": [{"col1": "val", ...}, ...], |
| | "caption": "<table caption or empty string>" |
| | } |
| | ], |
| | "algorithms": [ |
| | { |
| | "index": <int>, |
| | "name": "<algorithm name or Algorithm N>", |
| | "language": "<pseudocode | python | cpp | generic | etc.>", |
| | "code": "<verbatim algorithm text, preserve indentation>", |
| | "description": "<what this algorithm does>" |
| | } |
| | ], |
| | "figures": [ |
| | { |
| | "index": <int>, |
| | "figure_type": "<chart | bar_chart | line_chart | scatter_plot | histogram | diagram | flowchart | neural_network | tree | graph | drawing | photograph | heatmap | 3d_plot | other>", |
| | "description": "<detailed semantic description of the visual>", |
| | "data_summary": "<describe axes, units, trend, key values if quantitative>", |
| | "caption": "<figure caption or empty string>" |
| | } |
| | ], |
| | "references": ["<any in-text citation or bibliography entry on this page>"] |
| | } |
| | |
| | Rules: |
| | 1. Every equation MUST have LaTeX. Use \\frac, \\sum, \\int, \\mathbf etc. for proper notation. |
| | 2. Tables must be fully reproduced in both Markdown and as list-of-dicts. |
| | 3. Algorithms must preserve all steps, loops, conditions verbatim. |
| | 4. Figures: describe them as if for a blind reader — quantitative values, trends, colors, labels. |
| | 5. raw_text must include ALL text visible on the page, including headers, footers, captions. |
| | 6. Do NOT summarize or truncate any content. |
| | """ |
| |
|
| | DOCUMENT_META_PROMPT = """\ |
| | Based on the document pages you have seen, extract high-level metadata as JSON: |
| | |
| | { |
| | "title": "<document title>", |
| | "authors": ["<author name>", ...], |
| | "abstract": "<full abstract text or empty string if none>", |
| | "document_summary": "<comprehensive 5-8 sentence summary of the entire document>" |
| | } |
| | |
| | Respond with valid JSON only. |
| | """ |
| |
|
| |
|
| | |
| | |
| | |
| |
|
class AtomicPDFParser:
    """
    Core parser that sends PDF chunks or page images to the Claude API
    and extracts structured content atomically.

    Two processing modes:
      - "native": pages are sliced into sub-PDFs and sent via the base64
        document API with prompt caching (``cache_control: ephemeral``).
      - "image": each page is rendered to a 300-DPI PNG and sent as an image.

    Every successfully parsed page is stored in a SQLite ParseCache keyed by
    (document hash, page, model, mode), so repeated runs only pay for the
    pages that are not yet cached.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        model: str = DEFAULT_MODEL_OPUS,
        mode: str = "native",
        chunk_size: int = CHUNK_SIZE_DEFAULT,
        cache_dir: Optional[Path] = None,
        verbose: bool = False,
        max_workers: int = 4,
    ):
        """
        Parameters
        ----------
        api_key : API key; falls back to the ANTHROPIC_API_KEY env var.
        model : Alias ("opus"/"sonnet"/"haiku") or a full model identifier.
        mode : "native" (base64 PDF chunks) or "image" (page PNGs).
        chunk_size : Pages per API request when chunking large documents.
        cache_dir : Location of the SQLite page cache; defaults to
            ~/.cache/pdf_atomic_parser.
        verbose : When True, raises the module logger to DEBUG.
        max_workers : NOTE(review): stored but not used by any code visible in
            this file — presumably for a concurrent/batch path elsewhere; confirm.

        Raises
        ------
        ValueError if no API key is available.
        """
        self.api_key = api_key or os.environ.get("ANTHROPIC_API_KEY", "")
        self.model = self._resolve_model(model)
        self.mode = mode
        self.chunk_size = chunk_size
        self.verbose = verbose
        self.max_workers = max_workers

        if not self.api_key:
            raise ValueError(
                "ANTHROPIC_API_KEY environment variable not set. "
                "Export it or pass api_key= to AtomicPDFParser."
            )

        self.client = anthropic.Anthropic(api_key=self.api_key)

        cache_path = cache_dir or Path.home() / ".cache" / "pdf_atomic_parser"
        self.cache = ParseCache(cache_path)

        if verbose:
            logger.setLevel(logging.DEBUG)

    @staticmethod
    def _resolve_model(alias: str) -> str:
        """Map a short alias to a full model id; pass unknown strings through unchanged."""
        mapping = {
            "opus": DEFAULT_MODEL_OPUS,
            "sonnet": DEFAULT_MODEL_SONNET,
            "haiku": DEFAULT_MODEL_HAIKU,
        }
        return mapping.get(alias.lower(), alias)

    def parse(
        self,
        pdf_path: str | Path,
        page_range: Optional[Tuple[int, int]] = None,
    ) -> DocumentResult:
        """
        Parse the entire document (or a page range) atomically.

        Parameters
        ----------
        pdf_path : Path to the PDF file.
        page_range : Optional (start, end) 1-indexed inclusive page numbers.

        Returns
        -------
        DocumentResult with full structured extraction.

        Raises
        ------
        FileNotFoundError if the PDF does not exist.
        """
        path = Path(pdf_path).resolve()
        if not path.exists():
            raise FileNotFoundError(f"PDF not found: {path}")

        doc_hash = self.cache.file_hash(path)
        t_start = time.time()

        with PDFDocument(path) as pdf:
            total = pdf.total_pages
            # Convert the 1-indexed inclusive user range to 0-indexed,
            # end-exclusive internal indices, clamped to the document.
            if page_range:
                p_start = max(0, page_range[0] - 1)
                p_end = min(total, page_range[1])
            else:
                p_start, p_end = 0, total

            # Split the selected range into chunk_size slices for the API.
            chunks = []
            for s in range(p_start, p_end, self.chunk_size):
                e = min(s + self.chunk_size, p_end)
                chunks.append((s, e))

            page_results: List[PageResult] = []

            with Progress(
                SpinnerColumn(),
                TextColumn("[bold cyan]{task.description}"),
                BarColumn(),
                MofNCompleteColumn(),
                TaskProgressColumn(),
                TimeElapsedColumn(),
                TimeRemainingColumn(),
                console=console,
                transient=False,
            ) as progress:
                task = progress.add_task(
                    f"[cyan]Parsing {path.name}", total=len(chunks)
                )

                # Chunks are processed sequentially; _parse_chunk consults the
                # cache and only calls the API for uncached pages.
                for chunk_start, chunk_end in chunks:
                    chunk_pages = self._parse_chunk(
                        pdf, doc_hash, chunk_start, chunk_end
                    )
                    page_results.extend(chunk_pages)
                    progress.advance(task)

        # Document-level metadata (title/authors/abstract/summary) is derived
        # from the already-extracted text, so the PDF can be closed first.
        meta = self._extract_document_meta(page_results)

        doc_result = DocumentResult(
            document_path = str(path),
            document_hash = doc_hash,
            total_pages = total,
            pages_processed = len(page_results),
            model = self.model,
            processing_mode = self.mode,
            title = meta.get("title", ""),
            authors = meta.get("authors", []),
            abstract = meta.get("abstract", ""),
            document_summary = meta.get("document_summary", ""),
            page_results = page_results,
            total_equations = sum(len(p.equations) for p in page_results),
            total_tables = sum(len(p.tables) for p in page_results),
            total_algorithms = sum(len(p.algorithms) for p in page_results),
            total_figures = sum(len(p.figures) for p in page_results),
            total_tokens_used = sum(p.tokens_used for p in page_results),
            total_processing_time_s = time.time() - t_start,
        )
        return doc_result

    def extract_equations(self, pdf_path: str | Path) -> List[EquationBlock]:
        """Parse (or load from cache) and return all equations across the document."""
        result = self.parse(pdf_path)
        return [eq for p in result.page_results for eq in p.equations]

    def extract_tables(self, pdf_path: str | Path) -> List[TableBlock]:
        """Parse (or load from cache) and return all tables across the document."""
        result = self.parse(pdf_path)
        return [tb for p in result.page_results for tb in p.tables]

    def extract_algorithms(self, pdf_path: str | Path) -> List[AlgorithmBlock]:
        """Parse (or load from cache) and return all algorithm listings."""
        result = self.parse(pdf_path)
        return [al for p in result.page_results for al in p.algorithms]

    def extract_figures(self, pdf_path: str | Path) -> List[FigureBlock]:
        """Parse (or load from cache) and return all figure descriptions."""
        result = self.parse(pdf_path)
        return [fg for p in result.page_results for fg in p.figures]

    def query(self, pdf_path: str | Path, question: str) -> str:
        """
        Semantic query over cached parse results. Re-parses if not cached.
        """
        result = self.parse(pdf_path)
        full_text = "\n\n".join(
            f"[Page {p.page_number}]\n{p.raw_text}" for p in result.page_results
        )
        messages = [
            {
                "role": "user",
                "content": (
                    f"Based on the following document content, answer this question "
                    f"precisely and cite page numbers where relevant.\n\n"
                    f"Question: {question}\n\n"
                    # Hard character cap on context; content past 60k chars is
                    # silently dropped from the question context.
                    f"Document content:\n{full_text[:60000]}"
                ),
            }
        ]
        resp = self.client.messages.create(
            model=self.model,
            max_tokens=2048,
            messages=messages,
        )
        return resp.content[0].text

    def _parse_chunk(
        self,
        pdf: PDFDocument,
        doc_hash: str,
        chunk_start: int,
        chunk_end: int,
    ) -> List[PageResult]:
        """Parse a range of pages, using cache when available."""
        results = []
        pages_to_process = []

        # Split the range into cache hits and misses (cache keys are 1-indexed).
        for pg in range(chunk_start, chunk_end):
            cached = self.cache.get_page(doc_hash, pg + 1, self.model, self.mode)
            if cached:
                logger.debug("Cache hit page %d", pg + 1)
                results.append(cached)
            else:
                pages_to_process.append(pg)

        if not pages_to_process:
            return results

        # Group the misses into consecutive runs so each run becomes one API call.
        sub_chunks = self._group_consecutive(pages_to_process)
        for sub_start, sub_end in sub_chunks:
            sub_results = self._call_api_chunk(pdf, doc_hash, sub_start, sub_end)
            results.extend(sub_results)

        # Cached and freshly parsed pages may interleave; restore page order.
        results.sort(key=lambda r: r.page_number)
        return results

    @staticmethod
    def _group_consecutive(pages: List[int]) -> List[Tuple[int, int]]:
        """Collapse a sorted page list into (start, end_exclusive) runs of consecutive pages."""
        if not pages:
            return []
        groups, start, prev = [], pages[0], pages[0]
        for p in pages[1:]:
            if p != prev + 1:
                groups.append((start, prev + 1))
                start = p
            prev = p
        groups.append((start, prev + 1))
        return groups

    def _call_api_chunk(
        self,
        pdf: PDFDocument,
        doc_hash: str,
        chunk_start: int,
        chunk_end: int,
    ) -> List[PageResult]:
        """Send pages to Claude API and parse response."""
        t_start = time.time()

        # Dispatch on configured mode; both paths share _execute_api_call.
        if self.mode == "image":
            return self._call_api_as_images(pdf, doc_hash, chunk_start, chunk_end, t_start)
        else:
            return self._call_api_native(pdf, doc_hash, chunk_start, chunk_end, t_start)

    def _call_api_native(
        self,
        pdf: PDFDocument,
        doc_hash: str,
        chunk_start: int,
        chunk_end: int,
        t_start: float,
    ) -> List[PageResult]:
        """Build a base64-PDF document message for pages [chunk_start, chunk_end) and send it."""
        chunk_bytes = pdf.get_chunk_as_pdf_bytes(chunk_start, chunk_end)
        b64_pdf = base64.standard_b64encode(chunk_bytes).decode("utf-8")
        num_pages = chunk_end - chunk_start

        # Tell the model the absolute page numbering so page_number fields in
        # the response line up with the original document, not the sub-PDF.
        prompt_suffix = (
            f"\nThis PDF chunk contains pages {chunk_start + 1} to {chunk_end} "
            f"of the original document. "
            f"Return a JSON array with exactly {num_pages} page objects matching the schema. "
            f"Index them page_number={chunk_start + 1} through {chunk_end}."
        )

        messages = [
            {
                "role": "user",
                "content": [
                    {
                        "type": "document",
                        "source": {
                            "type": "base64",
                            "media_type": "application/pdf",
                            "data": b64_pdf,
                        },
                        # Prompt-cache the document block so retries/follow-ups
                        # of the same chunk don't re-bill the full PDF tokens.
                        "cache_control": {"type": "ephemeral"},
                    },
                    {
                        "type": "text",
                        "text": PAGE_EXTRACTION_PROMPT + prompt_suffix,
                    },
                ],
            }
        ]

        return self._execute_api_call(messages, doc_hash, chunk_start, chunk_end, t_start, "native")

    def _call_api_as_images(
        self,
        pdf: PDFDocument,
        doc_hash: str,
        chunk_start: int,
        chunk_end: int,
        t_start: float,
    ) -> List[PageResult]:
        """Render pages [chunk_start, chunk_end) to PNGs and send them as one image message."""
        content = []
        for pg_idx in range(chunk_start, chunk_end):
            png_bytes = pdf.get_page_as_png_bytes(pg_idx, dpi=IMAGE_DPI)
            b64_img = base64.standard_b64encode(png_bytes).decode("utf-8")
            # A text separator before each image labels it with its 1-indexed page.
            content.append({
                "type": "text",
                "text": f"--- Page {pg_idx + 1} ---",
            })
            content.append({
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": "image/png",
                    "data": b64_img,
                },
            })

        num_pages = chunk_end - chunk_start
        prompt_suffix = (
            f"\nThese are page images {chunk_start + 1} through {chunk_end}. "
            f"Return a JSON array with exactly {num_pages} page objects matching the schema. "
            f"Index them page_number={chunk_start + 1} through {chunk_end}."
        )
        content.append({"type": "text", "text": PAGE_EXTRACTION_PROMPT + prompt_suffix})

        messages = [{"role": "user", "content": content}]
        return self._execute_api_call(messages, doc_hash, chunk_start, chunk_end, t_start, "image")

    def _execute_api_call(
        self,
        messages: List[Dict],
        doc_hash: str,
        chunk_start: int,
        chunk_end: int,
        t_start: float,
        mode: str,
    ) -> List[PageResult]:
        """Call the API with retry-on-rate-limit, parse the JSON reply into PageResults, and cache them.

        Retries only anthropic.RateLimitError (3 attempts, exponential backoff
        starting at 5 s); other API errors are logged and re-raised.  `resp` is
        always bound after the loop: success breaks, and the final failed
        attempt re-raises.
        """
        retries, delay = 3, 5
        for attempt in range(retries):
            try:
                resp = self.client.messages.create(
                    model=self.model,
                    max_tokens=MAX_TOKENS_OUTPUT,
                    system=SYSTEM_PROMPT,
                    messages=messages,
                )
                break
            except anthropic.RateLimitError:
                if attempt == retries - 1:
                    raise
                logger.warning("Rate limit hit; retrying in %ds...", delay)
                time.sleep(delay)
                delay *= 2
            except anthropic.APIStatusError as exc:
                logger.error("API error: %s", exc)
                raise

        raw_response = resp.content[0].text.strip()
        tokens_used = resp.usage.input_tokens + resp.usage.output_tokens
        elapsed = time.time() - t_start

        # Strip a Markdown code fence if the model wrapped its JSON in one.
        if raw_response.startswith("```"):
            lines = raw_response.split("\n")
            raw_response = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:])

        try:
            parsed = json.loads(raw_response)
        except json.JSONDecodeError as exc:
            logger.error("JSON parse error on API response: %s\nRaw:\n%s", exc, raw_response[:500])
            # Degrade gracefully: emit placeholder PageResults for the whole
            # chunk (not cached) so parsing can continue past a bad response.
            return [
                PageResult(
                    page_number=pg + 1,
                    raw_text="[PARSE ERROR: JSON decode failed]",
                    summary="Failed to parse this page.",
                    processing_mode=mode,
                    tokens_used=tokens_used // max(1, chunk_end - chunk_start),
                    processing_time_s=elapsed,
                )
                for pg in range(chunk_start, chunk_end)
            ]

        # Single-page chunks may come back as a bare object; normalize to a list.
        if isinstance(parsed, dict):
            parsed = [parsed]

        results = []
        for i, page_data in enumerate(parsed):
            # Page numbers are assigned positionally, overriding whatever the
            # model returned.  NOTE(review): if the model returns fewer/more
            # objects than pages in the chunk, numbering silently drifts —
            # no length check against (chunk_end - chunk_start) here; confirm.
            pg_num = chunk_start + i + 1
            page_data["page_number"] = pg_num
            page_data["processing_mode"] = mode
            # Token/time accounting is split evenly across returned pages.
            page_data["tokens_used"] = tokens_used // len(parsed)
            page_data["processing_time_s"] = elapsed / len(parsed)

            pr = self._dict_to_page_result(page_data)
            self.cache.set_page(doc_hash, pr, self.model, mode)
            results.append(pr)

        return results

    @staticmethod
    def _dict_to_page_result(d: Dict) -> PageResult:
        """Convert one schema-shaped response dict into a PageResult.

        Every field access is defensive (``.get`` with defaults) so partially
        conforming model output still produces a usable PageResult.
        """
        equations = [
            EquationBlock(
                page=d["page_number"],
                index=e.get("index", i),
                latex=e.get("latex", ""),
                description=e.get("description", ""),
                inline=e.get("inline", False),
            )
            for i, e in enumerate(d.get("equations", []))
        ]
        tables = [
            TableBlock(
                page=d["page_number"],
                index=t.get("index", i),
                markdown=t.get("markdown", ""),
                json_data=t.get("json_data", []),
                caption=t.get("caption", ""),
            )
            for i, t in enumerate(d.get("tables", []))
        ]
        algorithms = [
            AlgorithmBlock(
                page=d["page_number"],
                index=a.get("index", i),
                name=a.get("name", f"Algorithm {i+1}"),
                language=a.get("language", "pseudocode"),
                code=a.get("code", ""),
                description=a.get("description", ""),
            )
            for i, a in enumerate(d.get("algorithms", []))
        ]
        figures = [
            FigureBlock(
                page=d["page_number"],
                index=f.get("index", i),
                figure_type=f.get("figure_type", "other"),
                description=f.get("description", ""),
                data_summary=f.get("data_summary", ""),
                caption=f.get("caption", ""),
            )
            for i, f in enumerate(d.get("figures", []))
        ]
        return PageResult(
            page_number = d["page_number"],
            raw_text = d.get("raw_text", ""),
            summary = d.get("summary", ""),
            equations = equations,
            tables = tables,
            algorithms = algorithms,
            figures = figures,
            section_headers = d.get("section_headers", []),
            references = d.get("references", []),
            keywords = d.get("keywords", []),
            layout_notes = d.get("layout_notes", ""),
            processing_mode = d.get("processing_mode", "native"),
            tokens_used = d.get("tokens_used", 0),
            processing_time_s = d.get("processing_time_s", 0.0),
        )

    def _extract_document_meta(self, page_results: List[PageResult]) -> Dict:
        """Extract title/authors/abstract/summary from the first pages' text.

        Uses only the first 5 pages (capped at 8000 chars), so the
        "document_summary" reflects the document's opening, not all pages.
        Best-effort: any failure logs a warning and returns empty metadata.
        """
        sample_text = "\n\n".join(
            f"[Page {p.page_number}]\n{p.raw_text}" for p in page_results[:5]
        )
        messages = [
            {
                "role": "user",
                "content": (
                    f"{DOCUMENT_META_PROMPT}\n\nDocument sample:\n{sample_text[:8000]}"
                ),
            }
        ]
        try:
            resp = self.client.messages.create(
                model=self.model,
                max_tokens=1024,
                system=SYSTEM_PROMPT,
                messages=messages,
            )
            raw = resp.content[0].text.strip()
            # Strip a Markdown code fence if present, as in _execute_api_call.
            if raw.startswith("```"):
                lines = raw.split("\n")
                raw = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:])
            return json.loads(raw)
        except Exception as exc:
            # Deliberate broad catch: metadata is optional and must not fail the parse.
            logger.warning("Document meta extraction failed: %s", exc)
            return {"title": "", "authors": [], "abstract": "", "document_summary": ""}
| |
|
| |
|
| | |
| | |
| | |
| |
|
class OutputFormatter:
    """Serializers that render a DocumentResult as JSON, Markdown, plain text,
    or a rich console table."""

    @staticmethod
    def to_json(result: DocumentResult, indent: int = 2) -> str:
        """Serialize the full result tree (nested dataclasses) to a JSON string."""
        return json.dumps(asdict(result), indent=indent, ensure_ascii=False)

    @staticmethod
    def _page_markdown(page) -> List[str]:
        """Markdown fragment lines for a single parsed page."""
        chunk: List[str] = [f"\n---\n\n## Page {page.page_number}"]
        if page.section_headers:
            chunk.append("\n### Sections\n" + "\n".join(f"- {h}" for h in page.section_headers))
        chunk.append(f"\n### Summary\n{page.summary}")
        chunk.append(f"\n### Full Text\n\n{page.raw_text}")

        if page.equations:
            chunk.append("\n### Equations\n")
            for equation in page.equations:
                chunk.append(f"**Eq {equation.index}** ({('inline' if equation.inline else 'display')})")
                chunk.append(f"```latex\n{equation.latex}\n```")
                chunk.append(f"*{equation.description}*\n")

        if page.tables:
            chunk.append("\n### Tables\n")
            for tbl in page.tables:
                if tbl.caption:
                    chunk.append(f"**{tbl.caption}**\n")
                chunk.append(tbl.markdown + "\n")

        if page.algorithms:
            chunk.append("\n### Algorithms\n")
            for algo in page.algorithms:
                chunk.append(f"**{algo.name}** ({algo.language})\n")
                chunk.append(f"```{algo.language}\n{algo.code}\n```")
                chunk.append(f"*{algo.description}*\n")

        if page.figures:
            chunk.append("\n### Figures\n")
            for fig in page.figures:
                chunk.append(f"**Figure {fig.index}** [{fig.figure_type}]")
                if fig.caption:
                    chunk.append(f"*{fig.caption}*")
                chunk.append(fig.description)
                if fig.data_summary:
                    chunk.append(f"Data: {fig.data_summary}\n")
        return chunk

    @staticmethod
    def to_markdown(result: DocumentResult) -> str:
        """Render a full Markdown report: metadata header, abstract/summary,
        then one section per parsed page."""
        doc: List[str] = [f"# {result.title or Path(result.document_path).name}"]
        if result.authors:
            doc.append(f"\n**Authors:** {', '.join(result.authors)}")
        doc.append(f"\n**Document Hash:** `{result.document_hash}`")
        doc.append(f"**Model:** {result.model} | **Mode:** {result.processing_mode}")
        doc.append(
            f"**Pages:** {result.pages_processed}/{result.total_pages} | "
            f"**Tokens:** {result.total_tokens_used:,} | "
            f"**Time:** {result.total_processing_time_s:.1f}s"
        )
        doc.append(
            f"**Equations:** {result.total_equations} | "
            f"**Tables:** {result.total_tables} | "
            f"**Algorithms:** {result.total_algorithms} | "
            f"**Figures:** {result.total_figures}"
        )
        if result.abstract:
            doc.append(f"\n## Abstract\n\n{result.abstract}")
        if result.document_summary:
            doc.append(f"\n## Document Summary\n\n{result.document_summary}")

        for page in result.page_results:
            doc.extend(OutputFormatter._page_markdown(page))

        return "\n".join(doc)

    @staticmethod
    def to_text(result: DocumentResult) -> str:
        """Plain-text dump: header block followed by the raw text of each page."""
        header = [
            f"DOCUMENT: {result.title or Path(result.document_path).name}",
            f"Authors: {', '.join(result.authors)}",
            f"Pages processed: {result.pages_processed}/{result.total_pages}",
            "",
            "SUMMARY",
            "=" * 60,
            result.document_summary,
            "",
        ]
        # One entry per page; the embedded "\n" reproduces the original
        # two-line [PAGE n] / raw-text layout under the final join.
        body = [f"\n[PAGE {p.page_number}]\n{p.raw_text}" for p in result.page_results]
        return "\n".join(header + body)

    @staticmethod
    def print_summary_table(result: DocumentResult) -> None:
        """Print a rich summary table of parse metrics to the console."""
        summary = Table(title=f"Parse Results: {Path(result.document_path).name}", show_lines=True)
        summary.add_column("Metric", style="cyan", no_wrap=True)
        summary.add_column("Value", style="green")

        rows = [
            ("Title", result.title or "(unknown)"),
            ("Authors", ", ".join(result.authors) or "(unknown)"),
            ("Model", result.model),
            ("Mode", result.processing_mode),
            ("Pages total", str(result.total_pages)),
            ("Pages parsed", str(result.pages_processed)),
            ("Equations", str(result.total_equations)),
            ("Tables", str(result.total_tables)),
            ("Algorithms", str(result.total_algorithms)),
            ("Figures", str(result.total_figures)),
            ("Tokens used", f"{result.total_tokens_used:,}"),
            ("Processing time", f"{result.total_processing_time_s:.1f}s"),
            ("Document hash", result.document_hash),
        ]
        for metric, value in rows:
            summary.add_row(metric, value)

        console.print(summary)
| |
|
| |
|
| | |
| | |
| | |
| |
|
class AgentPDFInterface:
    """
    Thin, agent-friendly facade over AtomicPDFParser.

    Every method takes a file path and returns plain serializable Python
    objects (dicts, lists, strings), so results can be passed straight
    through an agent pipeline.

    Example:
        from pdf_atomic_parser import AgentPDFInterface

        agent = AgentPDFInterface(model="opus")
        full = agent.parse("paper.pdf")
        eqs = agent.get_equations("paper.pdf")
        answer = agent.ask("paper.pdf", "What is the loss function?")
    """

    def __init__(self, **kwargs):
        # All keyword arguments are forwarded unchanged to AtomicPDFParser.
        self._parser = AtomicPDFParser(**kwargs)

    def parse(self, pdf_path: str, page_range: Optional[Tuple[int, int]] = None) -> Dict:
        """Parse the whole document (or a page range) into one serializable dict."""
        return asdict(self._parser.parse(pdf_path, page_range))

    def get_equations(self, pdf_path: str) -> List[Dict]:
        """All equations in the document, as plain dicts."""
        return list(map(asdict, self._parser.extract_equations(pdf_path)))

    def get_tables(self, pdf_path: str) -> List[Dict]:
        """All tables in the document, as plain dicts."""
        return list(map(asdict, self._parser.extract_tables(pdf_path)))

    def get_algorithms(self, pdf_path: str) -> List[Dict]:
        """All algorithms/code blocks in the document, as plain dicts."""
        return list(map(asdict, self._parser.extract_algorithms(pdf_path)))

    def get_figures(self, pdf_path: str) -> List[Dict]:
        """All figure descriptions in the document, as plain dicts."""
        return list(map(asdict, self._parser.extract_figures(pdf_path)))

    def ask(self, pdf_path: str, question: str) -> str:
        """Answer a free-form question about the document."""
        return self._parser.query(pdf_path, question)

    def get_full_text(self, pdf_path: str) -> str:
        """Concatenated raw text of every parsed page, labeled by page number."""
        parsed = self._parser.parse(pdf_path)
        chunks = [f"[Page {p.page_number}]\n{p.raw_text}" for p in parsed.page_results]
        return "\n\n".join(chunks)

    def cache_stats(self) -> Dict:
        """Statistics from the underlying page cache."""
        return self._parser.cache.stats()
| |
|
| |
|
| | |
| | |
| | |
| |
|
def batch_process(
    input_dir: Path,
    output_dir: Path,
    parser: AtomicPDFParser,
    fmt: str = "json",
) -> None:
    """Parse every PDF under *input_dir* and write one output file per document.

    Unknown *fmt* values fall back to plain text, matching the original
    if/elif/else chain. Per-document failures are reported and logged but do
    not abort the batch.
    """
    # Suffix + renderer per output format; anything else falls back to text.
    renderers = {
        "json": (".json", OutputFormatter.to_json),
        "markdown": (".md", OutputFormatter.to_markdown),
    }
    suffix, render = renderers.get(fmt, (".txt", OutputFormatter.to_text))

    pdfs = sorted(input_dir.glob("**/*.pdf"))
    if not pdfs:
        console.print(f"[yellow]No PDF files found in {input_dir}[/yellow]")
        return

    output_dir.mkdir(parents=True, exist_ok=True)
    console.print(f"[cyan]Found {len(pdfs)} PDF files to process.[/cyan]")

    for pdf_path in pdfs:
        console.print(f"\n[bold]Processing:[/bold] {pdf_path.name}")
        try:
            result = parser.parse(pdf_path)
            out = output_dir / (pdf_path.stem + suffix)
            out.write_text(render(result), encoding="utf-8")
            console.print(f" [green]Saved:[/green] {out}")
            OutputFormatter.print_summary_table(result)
        except Exception as exc:
            console.print(f" [red]Error processing {pdf_path.name}: {exc}[/red]")
            logger.exception("Batch error")
| |
|
| |
|
| | |
| | |
| | |
| |
|
def estimate_tokens(
    pdf_path: Path,
    tokens_in_per_page: int = 800,
    tokens_out_per_page: int = 400,
) -> None:
    """Print a rough token-usage and cost estimate table for *pdf_path*.

    Makes no API calls; the figure is a simple per-page heuristic. The
    per-page token counts were previously hard-coded and are now defaulted
    parameters so callers can tune them for dense or sparse documents.

    Args:
        pdf_path: PDF file to estimate.
        tokens_in_per_page: Assumed input tokens consumed per page (heuristic).
        tokens_out_per_page: Assumed output tokens produced per page (heuristic).
    """
    with PDFDocument(pdf_path) as pdf:
        total = pdf.total_pages
        size_mb = pdf.file_size_bytes / 1e6

    est_tokens_in = total * tokens_in_per_page
    est_tokens_out = total * tokens_out_per_page
    est_total = est_tokens_in + est_tokens_out

    # Assumed Opus pricing: $15 per 1M input tokens, $75 per 1M output tokens.
    # NOTE(review): verify against currently published model pricing.
    est_cost_opus = (est_tokens_in * 15 + est_tokens_out * 75) / 1_000_000

    table = Table(title=f"Token Estimate: {pdf_path.name}", show_lines=True)
    table.add_column("Metric", style="cyan")
    table.add_column("Estimate", style="yellow")

    table.add_row("Total pages", str(total))
    table.add_row("File size", f"{size_mb:.2f} MB")
    table.add_row("Est. input tokens", f"{est_tokens_in:,}")
    table.add_row("Est. output tokens", f"{est_tokens_out:,}")
    table.add_row("Est. total tokens", f"{est_total:,}")
    table.add_row("Est. cost (Opus)", f"${est_cost_opus:.2f}")
    table.add_row("Note", "Estimate only; actual usage varies")

    console.print(table)
| |
|
| |
|
| | |
| | |
| | |
| |
|
def build_cli() -> argparse.ArgumentParser:
    """Build the argparse command-line interface with all subcommands."""
    parser = argparse.ArgumentParser(
        prog="pdf_atomic_parser",
        description="Atomic PDF parser powered by Claude claude-opus-4-6",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    # Global options shared by every subcommand.
    parser.add_argument("--model", default="opus", help="opus | sonnet | haiku | full-model-string")
    parser.add_argument("--mode", default="native", choices=["native", "image"], help="Parsing mode")
    parser.add_argument("--chunk-size", type=int, default=CHUNK_SIZE_DEFAULT, help="Pages per API call")
    parser.add_argument("--verbose", action="store_true")

    sub = parser.add_subparsers(dest="command", required=True)

    p_parse = sub.add_parser("parse", help="Parse a PDF fully")
    p_parse.add_argument("pdf", help="Path to PDF file")
    p_parse.add_argument("--output", "-o", help="Output file path")
    p_parse.add_argument("--format", "-f", default="json", choices=["json", "markdown", "text"])
    p_parse.add_argument("--pages", help="Page range e.g. 1-50")

    p_atomic = sub.add_parser("atomic", help="Full atomic extraction to directory")
    p_atomic.add_argument("pdf", help="Path to PDF file")
    p_atomic.add_argument("--output", "-o", default="./atomic_output")

    # The four single-type extractors share an identical argument shape.
    extractors = [
        ("extract-equations", "Extract LaTeX equations"),
        ("extract-tables", "Extract tables"),
        ("extract-algorithms", "Extract algorithms/code"),
        ("extract-figures", "Extract figure descriptions"),
    ]
    for cmd, help_text in extractors:
        p_ex = sub.add_parser(cmd, help=help_text)
        p_ex.add_argument("pdf")
        p_ex.add_argument("--output", "-o")

    p_q = sub.add_parser("query", help="Ask a question about the PDF")
    p_q.add_argument("pdf")
    p_q.add_argument("question", help="Question to ask")

    p_batch = sub.add_parser("batch", help="Batch process a directory of PDFs")
    p_batch.add_argument("directory")
    p_batch.add_argument("--output", "-o", default="./batch_output")
    p_batch.add_argument("--format", "-f", default="json", choices=["json", "markdown", "text"])

    p_est = sub.add_parser("estimate", help="Estimate token cost before parsing")
    p_est.add_argument("pdf")

    # Cache maintenance commands.
    sub.add_parser("cache-stats", help="Show cache statistics")
    sub.add_parser("list-cache", help="List all cached documents")
    p_cc = sub.add_parser("clear-cache", help="Clear cache for a document")
    p_cc.add_argument("pdf", help="PDF path (to identify document)")

    return parser
| |
|
| |
|
def parse_page_range(s: str) -> Tuple[int, int]:
    """Parse a "start-end" page-range string into an inclusive (start, end) tuple.

    Args:
        s: Range string such as "1-20".

    Returns:
        (start, end) as integers.

    Raises:
        ValueError: if *s* is not two dash-separated integers, or the range
            is inverted or starts below page 1 (new validation; previously
            inverted ranges were silently accepted).
    """
    parts = s.split("-")
    if len(parts) != 2:
        raise ValueError(f"Page range must be in format start-end, got: {s}")
    try:
        start, end = int(parts[0]), int(parts[1])
    except ValueError:
        raise ValueError(f"Page range must be in format start-end, got: {s}") from None
    if start < 1 or end < start:
        raise ValueError(f"Invalid page range (expected 1 <= start <= end): {s}")
    return start, end
| |
|
| |
|
def save_output(content: str, output_path: Optional[str], default_name: str) -> None:
    """Write *content* to *output_path*, falling back to *default_name*.

    Parent directories are created as needed; the destination is echoed
    to the console.
    """
    target = Path(output_path or default_name)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(content, encoding="utf-8")
    console.print(f"[green]Saved:[/green] {target}")
| |
|
| |
|
def main() -> None:
    """CLI entry point: parse arguments and dispatch to the chosen subcommand.

    Fixes over the previous version:
      * ``parse --format text`` now writes a ``.txt`` file (was ``.text``,
        inconsistent with the ``atomic`` and ``batch`` commands);
      * ``import datetime`` hoisted out of the list-cache loop;
      * the four extract-* branches are unified into one.
    """
    cli = build_cli()
    args = cli.parse_args()
    cache = ParseCache(Path.home() / ".cache" / "pdf_atomic_parser")

    # Renderer + file extension per output format, shared by parse/atomic.
    formatters = {
        "json": (OutputFormatter.to_json, "json"),
        "markdown": (OutputFormatter.to_markdown, "md"),
        "text": (OutputFormatter.to_text, "txt"),
    }

    if args.command == "cache-stats":
        stats = cache.stats()
        table = Table(title="Cache Statistics", show_lines=True)
        table.add_column("Key", style="cyan")
        table.add_column("Value", style="green")
        for k, v in stats.items():
            table.add_row(k.replace("_", " ").title(), str(v))
        console.print(table)
        return

    if args.command == "list-cache":
        import datetime  # local import (only command that needs it), hoisted out of the loop

        docs = cache.list_documents()
        if not docs:
            console.print("[yellow]Cache is empty.[/yellow]")
            return
        table = Table(title="Cached Documents", show_lines=True)
        table.add_column("Hash", style="cyan")
        table.add_column("Cached Pages", style="green")
        table.add_column("First Seen", style="dim")
        for d in docs:
            ts = datetime.datetime.fromtimestamp(d["first_seen"]).strftime("%Y-%m-%d %H:%M")
            table.add_row(d["hash"], str(d["cached_pages"]), ts)
        console.print(table)
        return

    if args.command == "estimate":
        estimate_tokens(Path(args.pdf))
        return

    parser = AtomicPDFParser(
        model=args.model,
        mode=args.mode,
        chunk_size=args.chunk_size,
        verbose=args.verbose,
    )

    if args.command == "clear-cache":
        doc_hash = cache.file_hash(Path(args.pdf))
        n = cache.clear_document(doc_hash)
        console.print(f"[green]Cleared {n} cached pages for {Path(args.pdf).name}[/green]")
        return

    if args.command in ("parse", "atomic"):
        page_range = None
        # "atomic" defines no --pages flag, hence the hasattr guard.
        if hasattr(args, "pages") and args.pages:
            page_range = parse_page_range(args.pages)

        result = parser.parse(args.pdf, page_range)
        OutputFormatter.print_summary_table(result)

        stem = Path(args.pdf).stem
        if args.command == "atomic":
            # Write all three formats side by side in the output directory.
            out_dir = Path(args.output)
            out_dir.mkdir(parents=True, exist_ok=True)
            for fmt, (render, ext) in formatters.items():
                out_file = out_dir / f"{stem}.{ext}"
                out_file.write_text(render(result), encoding="utf-8")
                console.print(f"[green]Saved {fmt}:[/green] {out_file}")
        else:
            render, ext = formatters[args.format]
            save_output(render(result), getattr(args, "output", None), f"{stem}_parsed.{ext}")

    elif args.command.startswith("extract-"):
        # extract-equations / extract-tables / extract-algorithms / extract-figures:
        # the trailing word doubles as the PageResult attribute name and the label.
        kind = args.command.split("-", 1)[1]
        result = parser.parse(args.pdf)
        items = [asdict(x) for p in result.page_results for x in getattr(p, kind)]
        content = json.dumps(items, indent=2, ensure_ascii=False)
        save_output(content, args.output, f"{Path(args.pdf).stem}_{kind}.json")
        console.print(f"[cyan]{len(items)} {kind} extracted.[/cyan]")

    elif args.command == "query":
        answer = parser.query(args.pdf, args.question)
        console.print(f"\n[bold cyan]Answer:[/bold cyan]\n{answer}")

    elif args.command == "batch":
        batch_process(
            Path(args.directory),
            Path(args.output),
            parser,
            getattr(args, "format", "json"),
        )
| |
|
| |
|
# Entry-point guard: run the CLI only when executed as a script,
# never on import (the module is also used as a library via AgentPDFInterface).
if __name__ == "__main__":
    main()
| |
|