"""
Document Intelligence CLI Commands

CLI interface for the document_intelligence subsystem.
"""

import json
import sys
from pathlib import Path
from typing import Optional
import click


@click.group(name="docint")
def docint_cli():
"""Document Intelligence commands."""
pass


@docint_cli.command()
@click.argument("path", type=click.Path(exists=True))
@click.option("--output", "-o", type=click.Path(), help="Output JSON file")
@click.option("--max-pages", type=int, help="Maximum pages to process")
@click.option("--dpi", type=int, default=200, help="Render DPI (default: 200)")
@click.option("--format", "output_format", type=click.Choice(["json", "markdown", "text"]),
default="json", help="Output format")
def parse(path: str, output: Optional[str], max_pages: Optional[int],
dpi: int, output_format: str):
"""
Parse a document into semantic chunks.

    Example:
sparknet docint parse invoice.pdf -o result.json
sparknet docint parse document.pdf --format markdown
"""
from src.document_intelligence import (
DocumentParser,
ParserConfig,
)
config = ParserConfig(
render_dpi=dpi,
max_pages=max_pages,
)
parser = DocumentParser(config=config)
click.echo(f"Parsing: {path}")
try:
result = parser.parse(path)
if output_format == "json":
output_data = {
"doc_id": result.doc_id,
"filename": result.filename,
"num_pages": result.num_pages,
"chunks": [
{
"chunk_id": c.chunk_id,
"type": c.chunk_type.value,
"text": c.text,
"page": c.page,
"bbox": c.bbox.xyxy,
"confidence": c.confidence,
}
for c in result.chunks
],
"processing_time_ms": result.processing_time_ms,
}
if output:
                with open(output, "w", encoding="utf-8") as f:
json.dump(output_data, f, indent=2)
click.echo(f"Output written to: {output}")
else:
click.echo(json.dumps(output_data, indent=2))
elif output_format == "markdown":
if output:
                with open(output, "w", encoding="utf-8") as f:
f.write(result.markdown_full)
click.echo(f"Markdown written to: {output}")
else:
click.echo(result.markdown_full)
else: # text
for chunk in result.chunks:
click.echo(f"[Page {chunk.page}, {chunk.chunk_type.value}]")
click.echo(chunk.text)
click.echo()
click.echo(f"\nParsed {len(result.chunks)} chunks in {result.processing_time_ms:.0f}ms")
except Exception as e:
click.echo(f"Error: {e}", err=True)
sys.exit(1)


@docint_cli.command()
@click.argument("path", type=click.Path(exists=True))
@click.option("--field", "-f", multiple=True, help="Field to extract (can specify multiple)")
@click.option("--schema", "-s", type=click.Path(exists=True), help="JSON schema file")
@click.option("--preset", type=click.Choice(["invoice", "receipt", "contract"]),
help="Use preset schema")
@click.option("--output", "-o", type=click.Path(), help="Output JSON file")
def extract(path: str, field: tuple, schema: Optional[str], preset: Optional[str],
output: Optional[str]):
"""
Extract fields from a document.

    Example:
sparknet docint extract invoice.pdf --preset invoice
sparknet docint extract doc.pdf -f vendor_name -f total_amount
sparknet docint extract doc.pdf --schema my_schema.json
"""
    from src.document_intelligence import (
        DocumentParser,
        FieldExtractor,
        ExtractionSchema,
        create_invoice_schema,
        create_receipt_schema,
        create_contract_schema,
    )
# Build schema
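    # Precedence: --preset overrides --schema, which overrides --field.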
    if preset:
        preset_builders = {
            "invoice": create_invoice_schema,
            "receipt": create_receipt_schema,
            "contract": create_contract_schema,
        }
        extraction_schema = preset_builders[preset]()
    elif schema:
        with open(schema, encoding="utf-8") as f:
            schema_dict = json.load(f)
        extraction_schema = ExtractionSchema.from_json_schema(schema_dict)
    elif field:
        extraction_schema = ExtractionSchema(name="custom")
        for field_name in field:  # avoid shadowing the file-handle name `f`
            extraction_schema.add_string_field(field_name, required=True)
else:
click.echo("Error: Specify --field, --schema, or --preset", err=True)
sys.exit(1)
click.echo(f"Extracting from: {path}")
click.echo(f"Fields: {', '.join(f.name for f in extraction_schema.fields)}")
try:
# Parse document
parser = DocumentParser()
parse_result = parser.parse(path)
# Extract fields
extractor = FieldExtractor()
result = extractor.extract(parse_result, extraction_schema)
output_data = {
"doc_id": parse_result.doc_id,
"filename": parse_result.filename,
"extracted_data": result.data,
"confidence": result.overall_confidence,
"abstained_fields": result.abstained_fields,
"evidence": [
{
"chunk_id": e.chunk_id,
"page": e.page,
"bbox": e.bbox.xyxy,
"snippet": e.snippet,
}
for e in result.evidence
],
}
if output:
            with open(output, "w", encoding="utf-8") as f:
json.dump(output_data, f, indent=2)
click.echo(f"Output written to: {output}")
else:
click.echo("\nExtracted Data:")
for key, value in result.data.items():
status = "" if key not in result.abstained_fields else " [ABSTAINED]"
click.echo(f" {key}: {value}{status}")
click.echo(f"\nConfidence: {result.overall_confidence:.2f}")
if result.abstained_fields:
click.echo(f"Abstained: {', '.join(result.abstained_fields)}")
except Exception as e:
click.echo(f"Error: {e}", err=True)
sys.exit(1)


@docint_cli.command()
@click.argument("path", type=click.Path(exists=True))
@click.argument("question")
@click.option("--verbose", "-v", is_flag=True, help="Show evidence details")
@click.option("--use-rag", is_flag=True, help="Use RAG for retrieval (requires indexed document)")
@click.option("--document-id", "-d", help="Document ID for RAG retrieval")
@click.option("--top-k", "-k", type=int, default=5, help="Number of chunks to consider")
@click.option("--chunk-type", "-t", multiple=True, help="Filter by chunk type (can specify multiple)")
@click.option("--page-start", type=int, help="Filter by page range start")
@click.option("--page-end", type=int, help="Filter by page range end")
def ask(path: str, question: str, verbose: bool, use_rag: bool,
document_id: Optional[str], top_k: int, chunk_type: tuple,
page_start: Optional[int], page_end: Optional[int]):
"""
Ask a question about a document.

    Example:
sparknet docint ask invoice.pdf "What is the total amount?"
sparknet docint ask doc.pdf "Find claims" --use-rag --top-k 10
sparknet docint ask doc.pdf "What tables show?" -t table --use-rag
"""
from src.document_intelligence import DocumentParser
click.echo(f"Document: {path}")
click.echo(f"Question: {question}")
if use_rag:
click.echo("Mode: RAG (semantic retrieval)")
else:
click.echo("Mode: Keyword search")
click.echo()
try:
if use_rag:
# Use RAG-based answering
from src.document_intelligence.tools import get_rag_tool
tool = get_rag_tool("rag_answer")
            # Build the page-range filter; both --page-start and --page-end
            # must be given, a single bound is ignored.
page_range = None
if page_start is not None and page_end is not None:
page_range = (page_start, page_end)
result = tool.execute(
question=question,
document_id=document_id,
top_k=top_k,
chunk_types=list(chunk_type) if chunk_type else None,
page_range=page_range,
)
else:
# Parse document and use keyword-based search
from src.document_intelligence.tools import get_tool
parser = DocumentParser()
parse_result = parser.parse(path)
tool = get_tool("answer_question")
result = tool.execute(
parse_result=parse_result,
question=question,
top_k=top_k,
)
if result.success:
data = result.data
click.echo(f"Answer: {data.get('answer', 'No answer found')}")
click.echo(f"Confidence: {data.get('confidence', 0):.2f}")
if data.get('abstained'):
click.echo("Note: The system abstained due to low confidence.")
if verbose and result.evidence:
click.echo("\nEvidence:")
for ev in result.evidence:
click.echo(f" - Page {ev.get('page', '?')}: {ev.get('snippet', '')[:100]}...")
if data.get('citations'):
click.echo("\nCitations:")
for cit in data['citations']:
click.echo(f" [{cit['index']}] {cit.get('text', '')[:80]}...")
else:
click.echo(f"Error: {result.error}", err=True)
except Exception as e:
click.echo(f"Error: {e}", err=True)
sys.exit(1)


@docint_cli.command()
@click.argument("path", type=click.Path(exists=True))
@click.option("--output", "-o", type=click.Path(), help="Output JSON file")
def classify(path: str, output: Optional[str]):
"""
Classify a document's type.

    Example:
sparknet docint classify document.pdf
"""
    from src.document_intelligence import DocumentParser
click.echo(f"Classifying: {path}")
try:
# Parse document
parser = DocumentParser()
parse_result = parser.parse(path)
        # Simple keyword-based classification over the first page's text
        first_page_chunks = [c for c in parse_result.chunks if c.page == 1][:5]
        content = " ".join(c.text[:200] for c in first_page_chunks).lower()
        type_keywords = {
            "invoice": ["invoice", "bill", "payment due", "amount due", "invoice number"],
            "contract": ["agreement", "contract", "party", "whereas", "terms and conditions"],
            "receipt": ["receipt", "paid", "transaction", "thank you for your purchase"],
            "form": ["form", "fill in", "checkbox", "signature line"],
            "letter": ["dear", "sincerely", "regards", "to whom it may concern"],
            "report": ["report", "findings", "conclusion", "summary", "analysis"],
            "patent": ["patent", "claims", "invention", "embodiment", "disclosed"],
        }
        # Score every candidate type and keep the strongest match; at least
        # two keyword hits are required to move off the "other" default.
        doc_type = "other"
        confidence = 0.5
        best_matches = 0
        for dtype, keywords in type_keywords.items():
            matches = sum(1 for k in keywords if k in content)
            if matches >= 2 and matches > best_matches:
                best_matches = matches
                doc_type = dtype
                confidence = min(0.95, 0.5 + matches * 0.15)
output_data = {
"doc_id": parse_result.doc_id,
"filename": parse_result.filename,
"document_type": doc_type,
"confidence": confidence,
}
if output:
            with open(output, "w", encoding="utf-8") as f:
json.dump(output_data, f, indent=2)
click.echo(f"Output written to: {output}")
else:
click.echo(f"Type: {doc_type}")
click.echo(f"Confidence: {confidence:.2f}")
except Exception as e:
click.echo(f"Error: {e}", err=True)
sys.exit(1)


@docint_cli.command()
@click.argument("path", type=click.Path(exists=True))
@click.option("--query", "-q", help="Search query")
@click.option("--type", "chunk_type", help="Filter by chunk type")
@click.option("--top", "-k", type=int, default=10, help="Number of results")
def search(path: str, query: Optional[str], chunk_type: Optional[str], top: int):
"""
Search document content.

    Example:
sparknet docint search document.pdf -q "payment terms"
sparknet docint search document.pdf --type table
"""
from src.document_intelligence import DocumentParser
from src.document_intelligence.tools import get_tool
click.echo(f"Searching: {path}")
try:
# Parse document
parser = DocumentParser()
parse_result = parser.parse(path)
if query:
# Search by query
tool = get_tool("search_chunks")
result = tool.execute(
parse_result=parse_result,
query=query,
chunk_types=[chunk_type] if chunk_type else None,
top_k=top,
)
if result.success:
results = result.data.get("results", [])
click.echo(f"Found {len(results)} results:\n")
for i, r in enumerate(results, 1):
click.echo(f"{i}. [Page {r['page']}, {r['type']}] (score: {r['score']:.2f})")
click.echo(f" {r['text'][:200]}...")
click.echo()
else:
click.echo(f"Error: {result.error}", err=True)
elif chunk_type:
# Filter by type
matching = [c for c in parse_result.chunks if c.chunk_type.value == chunk_type]
click.echo(f"Found {len(matching)} {chunk_type} chunks:\n")
for i, chunk in enumerate(matching[:top], 1):
click.echo(f"{i}. [Page {chunk.page}] {chunk.chunk_id}")
click.echo(f" {chunk.text[:200]}...")
click.echo()
else:
# List all chunks
click.echo(f"Total chunks: {len(parse_result.chunks)}\n")
# Group by type
by_type = {}
for chunk in parse_result.chunks:
t = chunk.chunk_type.value
by_type[t] = by_type.get(t, 0) + 1
click.echo("Chunk types:")
for t, count in sorted(by_type.items()):
click.echo(f" {t}: {count}")
except Exception as e:
click.echo(f"Error: {e}", err=True)
sys.exit(1)


@docint_cli.command()
@click.argument("path", type=click.Path(exists=True))
@click.option("--page", "-p", type=int, default=1, help="Page number")
@click.option("--output-dir", "-d", type=click.Path(), default="./crops",
help="Output directory for crops")
@click.option("--annotate", "-a", is_flag=True, help="Create annotated page image")
def visualize(path: str, page: int, output_dir: str, annotate: bool):
"""
Visualize document regions.

    Example:
sparknet docint visualize document.pdf --page 1 --annotate
"""
from src.document_intelligence import (
DocumentParser,
load_document,
RenderOptions,
)
from src.document_intelligence.grounding import create_annotated_image, CropManager
    from PIL import Image
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
click.echo(f"Processing: {path}, page {page}")
try:
# Parse document
parser = DocumentParser()
parse_result = parser.parse(path)
# Load and render page
loader, renderer = load_document(path)
page_image = renderer.render_page(page, RenderOptions(dpi=200))
loader.close()
# Get page chunks
page_chunks = [c for c in parse_result.chunks if c.page == page]
if annotate:
# Create annotated image
bboxes = [c.bbox for c in page_chunks]
            labels = [c.chunk_type.value[:10] for c in page_chunks]
annotated = create_annotated_image(page_image, bboxes, labels)
output_file = output_path / f"annotated_page_{page}.png"
Image.fromarray(annotated).save(output_file)
click.echo(f"Saved annotated image: {output_file}")
else:
# Save individual crops
crop_manager = CropManager(output_path)
for chunk in page_chunks:
crop_path = crop_manager.save_crop(
page_image,
parse_result.doc_id,
page,
chunk.bbox,
)
click.echo(f"Saved crop: {crop_path}")
click.echo(f"\nProcessed {len(page_chunks)} chunks from page {page}")
except Exception as e:
click.echo(f"Error: {e}", err=True)
sys.exit(1)


@docint_cli.command()
@click.argument("paths", nargs=-1, type=click.Path(exists=True), required=True)
@click.option("--max-pages", type=int, help="Maximum pages to process per document")
@click.option("--batch-size", type=int, default=32, help="Embedding batch size")
@click.option("--min-length", type=int, default=10, help="Minimum chunk text length")
def index(paths: tuple, max_pages: Optional[int], batch_size: int, min_length: int):
"""
Index documents into the vector store for RAG.

    Example:
sparknet docint index document.pdf
sparknet docint index *.pdf --max-pages 50
sparknet docint index doc1.pdf doc2.pdf doc3.pdf
"""
from src.document_intelligence.tools import get_rag_tool
click.echo(f"Indexing {len(paths)} document(s)...")
click.echo()
try:
tool = get_rag_tool("index_document")
total_indexed = 0
total_skipped = 0
errors = []
for path in paths:
click.echo(f"Processing: {path}")
result = tool.execute(
path=path,
max_pages=max_pages,
)
if result.success:
data = result.data
indexed = data.get("chunks_indexed", 0)
skipped = data.get("chunks_skipped", 0)
total_indexed += indexed
total_skipped += skipped
click.echo(f" Indexed: {indexed} chunks, Skipped: {skipped}")
click.echo(f" Document ID: {data.get('document_id', 'unknown')}")
else:
errors.append((path, result.error))
click.echo(f" Error: {result.error}", err=True)
click.echo()
click.echo("=" * 40)
click.echo(f"Total documents: {len(paths)}")
click.echo(f"Total chunks indexed: {total_indexed}")
click.echo(f"Total chunks skipped: {total_skipped}")
if errors:
click.echo(f"Errors: {len(errors)}")
for path, err in errors:
click.echo(f" - {path}: {err}")
except Exception as e:
click.echo(f"Error: {e}", err=True)
sys.exit(1)


@docint_cli.command(name="index-stats")
def index_stats():
"""
Show statistics about the vector store index.

    Example:
sparknet docint index-stats
"""
from src.document_intelligence.tools import get_rag_tool
try:
tool = get_rag_tool("get_index_stats")
result = tool.execute()
if result.success:
data = result.data
click.echo("Vector Store Statistics:")
click.echo(f" Total chunks: {data.get('total_chunks', 0)}")
click.echo(f" Embedding model: {data.get('embedding_model', 'unknown')}")
click.echo(f" Embedding dimension: {data.get('embedding_dimension', 'unknown')}")
else:
click.echo(f"Error: {result.error}", err=True)
except Exception as e:
click.echo(f"Error: {e}", err=True)
sys.exit(1)


@docint_cli.command(name="delete-index")
@click.argument("document_id")
@click.option("--yes", "-y", is_flag=True, help="Skip confirmation prompt")
def delete_index(document_id: str, yes: bool):
"""
Delete a document from the vector store index.

    Example:
sparknet docint delete-index doc_abc123
"""
from src.document_intelligence.tools import get_rag_tool
if not yes:
click.confirm(f"Delete document '{document_id}' from index?", abort=True)
try:
tool = get_rag_tool("delete_document")
result = tool.execute(document_id=document_id)
if result.success:
data = result.data
click.echo(f"Deleted {data.get('chunks_deleted', 0)} chunks for document: {document_id}")
else:
click.echo(f"Error: {result.error}", err=True)
except Exception as e:
click.echo(f"Error: {e}", err=True)
sys.exit(1)


@docint_cli.command(name="retrieve")
@click.argument("query")
@click.option("--top-k", "-k", type=int, default=5, help="Number of results")
@click.option("--document-id", "-d", help="Filter by document ID")
@click.option("--chunk-type", "-t", multiple=True, help="Filter by chunk type")
@click.option("--page-start", type=int, help="Filter by page range start")
@click.option("--page-end", type=int, help="Filter by page range end")
@click.option("--verbose", "-v", is_flag=True, help="Show full chunk text")
def retrieve(query: str, top_k: int, document_id: Optional[str],
chunk_type: tuple, page_start: Optional[int],
page_end: Optional[int], verbose: bool):
"""
Retrieve relevant chunks from the vector store.

    Example:
sparknet docint retrieve "payment terms"
sparknet docint retrieve "claims" -d doc_abc123 -t paragraph -k 10
"""
from src.document_intelligence.tools import get_rag_tool
click.echo(f"Query: {query}")
click.echo()
try:
tool = get_rag_tool("retrieve_chunks")
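        # Both --page-start and --page-end must be given; a single bound is ignored.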
page_range = None
if page_start is not None and page_end is not None:
page_range = (page_start, page_end)
result = tool.execute(
query=query,
top_k=top_k,
document_id=document_id,
chunk_types=list(chunk_type) if chunk_type else None,
page_range=page_range,
)
if result.success:
data = result.data
chunks = data.get("chunks", [])
click.echo(f"Found {len(chunks)} results:\n")
for i, chunk in enumerate(chunks, 1):
click.echo(f"{i}. [sim={chunk['similarity']:.3f}] Page {chunk.get('page', '?')}, {chunk.get('chunk_type', 'text')}")
click.echo(f" Document: {chunk['document_id']}")
text = chunk['text']
if verbose:
click.echo(f" Text: {text}")
else:
click.echo(f" Text: {text[:150]}...")
click.echo()
else:
click.echo(f"Error: {result.error}", err=True)
except Exception as e:
click.echo(f"Error: {e}", err=True)
sys.exit(1)


# Register with main CLI
def register_commands(cli):
"""Register docint commands with main CLI."""
cli.add_command(docint_cli)
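

# Convenience for ad-hoc runs during development (a sketch; the packaged
# `sparknet` entry point is assumed to wire this group in via register_commands).
if __name__ == "__main__":
    docint_cli()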