""" Document Intelligence CLI Commands CLI interface for the document_intelligence subsystem. """ import json import sys from pathlib import Path from typing import List, Optional import click @click.group(name="docint") def docint_cli(): """Document Intelligence commands.""" pass @docint_cli.command() @click.argument("path", type=click.Path(exists=True)) @click.option("--output", "-o", type=click.Path(), help="Output JSON file") @click.option("--max-pages", type=int, help="Maximum pages to process") @click.option("--dpi", type=int, default=200, help="Render DPI (default: 200)") @click.option("--format", "output_format", type=click.Choice(["json", "markdown", "text"]), default="json", help="Output format") def parse(path: str, output: Optional[str], max_pages: Optional[int], dpi: int, output_format: str): """ Parse a document into semantic chunks. Example: sparknet docint parse invoice.pdf -o result.json sparknet docint parse document.pdf --format markdown """ from src.document_intelligence import ( DocumentParser, ParserConfig, ) config = ParserConfig( render_dpi=dpi, max_pages=max_pages, ) parser = DocumentParser(config=config) click.echo(f"Parsing: {path}") try: result = parser.parse(path) if output_format == "json": output_data = { "doc_id": result.doc_id, "filename": result.filename, "num_pages": result.num_pages, "chunks": [ { "chunk_id": c.chunk_id, "type": c.chunk_type.value, "text": c.text, "page": c.page, "bbox": c.bbox.xyxy, "confidence": c.confidence, } for c in result.chunks ], "processing_time_ms": result.processing_time_ms, } if output: with open(output, "w") as f: json.dump(output_data, f, indent=2) click.echo(f"Output written to: {output}") else: click.echo(json.dumps(output_data, indent=2)) elif output_format == "markdown": if output: with open(output, "w") as f: f.write(result.markdown_full) click.echo(f"Markdown written to: {output}") else: click.echo(result.markdown_full) else: # text for chunk in result.chunks: click.echo(f"[Page {chunk.page}, {chunk.chunk_type.value}]") click.echo(chunk.text) click.echo() click.echo(f"\nParsed {len(result.chunks)} chunks in {result.processing_time_ms:.0f}ms") except Exception as e: click.echo(f"Error: {e}", err=True) sys.exit(1) @docint_cli.command() @click.argument("path", type=click.Path(exists=True)) @click.option("--field", "-f", multiple=True, help="Field to extract (can specify multiple)") @click.option("--schema", "-s", type=click.Path(exists=True), help="JSON schema file") @click.option("--preset", type=click.Choice(["invoice", "receipt", "contract"]), help="Use preset schema") @click.option("--output", "-o", type=click.Path(), help="Output JSON file") def extract(path: str, field: tuple, schema: Optional[str], preset: Optional[str], output: Optional[str]): """ Extract fields from a document. 


@docint_cli.command()
@click.argument("path", type=click.Path(exists=True))
@click.option("--field", "-f", multiple=True, help="Field to extract (can specify multiple)")
@click.option("--schema", "-s", type=click.Path(exists=True), help="JSON schema file")
@click.option("--preset", type=click.Choice(["invoice", "receipt", "contract"]),
              help="Use preset schema")
@click.option("--output", "-o", type=click.Path(), help="Output JSON file")
def extract(path: str, field: tuple, schema: Optional[str],
            preset: Optional[str], output: Optional[str]):
    """
    Extract fields from a document.

    Example:
        sparknet docint extract invoice.pdf --preset invoice
        sparknet docint extract doc.pdf -f vendor_name -f total_amount
        sparknet docint extract doc.pdf --schema my_schema.json
    """
    from src.document_intelligence import (
        DocumentParser,
        FieldExtractor,
        ExtractionSchema,
        create_invoice_schema,
        create_receipt_schema,
        create_contract_schema,
    )

    # Build the extraction schema from a preset, a schema file, or ad-hoc fields
    if preset:
        if preset == "invoice":
            extraction_schema = create_invoice_schema()
        elif preset == "receipt":
            extraction_schema = create_receipt_schema()
        else:  # contract (click.Choice guarantees one of the three)
            extraction_schema = create_contract_schema()
    elif schema:
        with open(schema) as f:
            schema_dict = json.load(f)
        extraction_schema = ExtractionSchema.from_json_schema(schema_dict)
    elif field:
        extraction_schema = ExtractionSchema(name="custom")
        for field_name in field:
            extraction_schema.add_string_field(field_name, required=True)
    else:
        click.echo("Error: Specify --field, --schema, or --preset", err=True)
        sys.exit(1)

    click.echo(f"Extracting from: {path}")
    click.echo(f"Fields: {', '.join(f.name for f in extraction_schema.fields)}")

    try:
        # Parse document
        parser = DocumentParser()
        parse_result = parser.parse(path)

        # Extract fields
        extractor = FieldExtractor()
        result = extractor.extract(parse_result, extraction_schema)

        output_data = {
            "doc_id": parse_result.doc_id,
            "filename": parse_result.filename,
            "extracted_data": result.data,
            "confidence": result.overall_confidence,
            "abstained_fields": result.abstained_fields,
            "evidence": [
                {
                    "chunk_id": e.chunk_id,
                    "page": e.page,
                    "bbox": e.bbox.xyxy,
                    "snippet": e.snippet,
                }
                for e in result.evidence
            ],
        }

        if output:
            with open(output, "w") as f:
                json.dump(output_data, f, indent=2)
            click.echo(f"Output written to: {output}")
        else:
            click.echo("\nExtracted Data:")
            for key, value in result.data.items():
                status = "" if key not in result.abstained_fields else " [ABSTAINED]"
                click.echo(f"  {key}: {value}{status}")
            click.echo(f"\nConfidence: {result.overall_confidence:.2f}")
            if result.abstained_fields:
                click.echo(f"Abstained: {', '.join(result.abstained_fields)}")

    except Exception as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)
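

# Example of a schema file for `--schema`. The exact format is whatever
# ExtractionSchema.from_json_schema() accepts; a standard JSON-Schema-style
# object is assumed here purely for illustration:
#
# {
#   "title": "custom_invoice",
#   "type": "object",
#   "required": ["vendor_name"],
#   "properties": {
#     "vendor_name": {"type": "string"},
#     "total_amount": {"type": "number"}
#   }
# }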


@docint_cli.command()
@click.argument("path", type=click.Path(exists=True))
@click.argument("question")
@click.option("--verbose", "-v", is_flag=True, help="Show evidence details")
@click.option("--use-rag", is_flag=True, help="Use RAG for retrieval (requires indexed document)")
@click.option("--document-id", "-d", help="Document ID for RAG retrieval")
@click.option("--top-k", "-k", type=int, default=5, help="Number of chunks to consider")
@click.option("--chunk-type", "-t", multiple=True, help="Filter by chunk type (can specify multiple)")
@click.option("--page-start", type=int, help="Filter by page range start")
@click.option("--page-end", type=int, help="Filter by page range end")
def ask(path: str, question: str, verbose: bool, use_rag: bool,
        document_id: Optional[str], top_k: int, chunk_type: tuple,
        page_start: Optional[int], page_end: Optional[int]):
    """
    Ask a question about a document.

    Example:
        sparknet docint ask invoice.pdf "What is the total amount?"
        sparknet docint ask doc.pdf "Find claims" --use-rag --top-k 10
        sparknet docint ask doc.pdf "What tables show?" -t table --use-rag
    """
    from src.document_intelligence import DocumentParser

    click.echo(f"Document: {path}")
    click.echo(f"Question: {question}")
    if use_rag:
        click.echo("Mode: RAG (semantic retrieval)")
    else:
        click.echo("Mode: Keyword search")
    click.echo()

    try:
        if use_rag:
            # Use RAG-based answering
            from src.document_intelligence.tools import get_rag_tool

            tool = get_rag_tool("rag_answer")

            # Build page range if specified
            page_range = None
            if page_start is not None and page_end is not None:
                page_range = (page_start, page_end)

            result = tool.execute(
                question=question,
                document_id=document_id,
                top_k=top_k,
                chunk_types=list(chunk_type) if chunk_type else None,
                page_range=page_range,
            )
        else:
            # Parse document and use keyword-based search
            from src.document_intelligence.tools import get_tool

            parser = DocumentParser()
            parse_result = parser.parse(path)

            tool = get_tool("answer_question")
            result = tool.execute(
                parse_result=parse_result,
                question=question,
                top_k=top_k,
            )

        if result.success:
            data = result.data
            click.echo(f"Answer: {data.get('answer', 'No answer found')}")
            click.echo(f"Confidence: {data.get('confidence', 0):.2f}")

            if data.get('abstained'):
                click.echo("Note: The system abstained due to low confidence.")

            if verbose and result.evidence:
                click.echo("\nEvidence:")
                for ev in result.evidence:
                    click.echo(f"  - Page {ev.get('page', '?')}: {ev.get('snippet', '')[:100]}...")

            if data.get('citations'):
                click.echo("\nCitations:")
                for cit in data['citations']:
                    click.echo(f"  [{cit['index']}] {cit.get('text', '')[:80]}...")
        else:
            click.echo(f"Error: {result.error}", err=True)

    except Exception as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)
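

# Illustrative console output for
# `sparknet docint ask invoice.pdf "What is the total amount?"` (values are
# hypothetical; answer/confidence come from the tool result handled above):
#
#   Answer: $1,234.56
#   Confidence: 0.87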


@docint_cli.command()
@click.argument("path", type=click.Path(exists=True))
@click.option("--output", "-o", type=click.Path(), help="Output JSON file")
def classify(path: str, output: Optional[str]):
    """
    Classify a document's type.

    Example:
        sparknet docint classify document.pdf
    """
    from src.document_intelligence import DocumentParser

    click.echo(f"Classifying: {path}")

    try:
        # Parse document
        parser = DocumentParser()
        parse_result = parser.parse(path)

        # Simple classification based on keywords from the start of page 1
        first_page_chunks = [c for c in parse_result.chunks if c.page == 1][:5]
        content = " ".join(c.text[:200] for c in first_page_chunks).lower()

        doc_type = "other"
        confidence = 0.5

        type_keywords = {
            "invoice": ["invoice", "bill", "payment due", "amount due", "invoice number"],
            "contract": ["agreement", "contract", "party", "whereas", "terms and conditions"],
            "receipt": ["receipt", "paid", "transaction", "thank you for your purchase"],
            "form": ["form", "fill in", "checkbox", "signature line"],
            "letter": ["dear", "sincerely", "regards", "to whom it may concern"],
            "report": ["report", "findings", "conclusion", "summary", "analysis"],
            "patent": ["patent", "claims", "invention", "embodiment", "disclosed"],
        }

        for dtype, keywords in type_keywords.items():
            matches = sum(1 for k in keywords if k in content)
            if matches >= 2:
                doc_type = dtype
                confidence = min(0.95, 0.5 + matches * 0.15)
                break

        output_data = {
            "doc_id": parse_result.doc_id,
            "filename": parse_result.filename,
            "document_type": doc_type,
            "confidence": confidence,
        }

        if output:
            with open(output, "w") as f:
                json.dump(output_data, f, indent=2)
            click.echo(f"Output written to: {output}")
        else:
            click.echo(f"Type: {doc_type}")
            click.echo(f"Confidence: {confidence:.2f}")

    except Exception as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)
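

# Worked example of the heuristic above: if the first-page text contains two
# "invoice" keywords, confidence = min(0.95, 0.5 + 2 * 0.15) = 0.80; three or
# more matches saturate at the 0.95 cap.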


@docint_cli.command()
@click.argument("path", type=click.Path(exists=True))
@click.option("--query", "-q", help="Search query")
@click.option("--type", "chunk_type", help="Filter by chunk type")
@click.option("--top", "-k", type=int, default=10, help="Number of results")
def search(path: str, query: Optional[str], chunk_type: Optional[str], top: int):
    """
    Search document content.

    Example:
        sparknet docint search document.pdf -q "payment terms"
        sparknet docint search document.pdf --type table
    """
    from src.document_intelligence import DocumentParser
    from src.document_intelligence.tools import get_tool

    click.echo(f"Searching: {path}")

    try:
        # Parse document
        parser = DocumentParser()
        parse_result = parser.parse(path)

        if query:
            # Search by query
            tool = get_tool("search_chunks")
            result = tool.execute(
                parse_result=parse_result,
                query=query,
                chunk_types=[chunk_type] if chunk_type else None,
                top_k=top,
            )

            if result.success:
                results = result.data.get("results", [])
                click.echo(f"Found {len(results)} results:\n")
                for i, r in enumerate(results, 1):
                    click.echo(f"{i}. [Page {r['page']}, {r['type']}] (score: {r['score']:.2f})")
                    click.echo(f"   {r['text'][:200]}...")
                    click.echo()
            else:
                click.echo(f"Error: {result.error}", err=True)

        elif chunk_type:
            # Filter by type
            matching = [c for c in parse_result.chunks if c.chunk_type.value == chunk_type]
            click.echo(f"Found {len(matching)} {chunk_type} chunks:\n")
            for i, chunk in enumerate(matching[:top], 1):
                click.echo(f"{i}. [Page {chunk.page}] {chunk.chunk_id}")
                click.echo(f"   {chunk.text[:200]}...")
                click.echo()

        else:
            # List all chunks
            click.echo(f"Total chunks: {len(parse_result.chunks)}\n")

            # Group by type
            by_type = {}
            for chunk in parse_result.chunks:
                t = chunk.chunk_type.value
                by_type[t] = by_type.get(t, 0) + 1

            click.echo("Chunk types:")
            for t, count in sorted(by_type.items()):
                click.echo(f"  {t}: {count}")

    except Exception as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)


@docint_cli.command()
@click.argument("path", type=click.Path(exists=True))
@click.option("--page", "-p", type=int, default=1, help="Page number")
@click.option("--output-dir", "-d", type=click.Path(), default="./crops",
              help="Output directory for crops")
@click.option("--annotate", "-a", is_flag=True, help="Create annotated page image")
def visualize(path: str, page: int, output_dir: str, annotate: bool):
    """
    Visualize document regions.

    Example:
        sparknet docint visualize document.pdf --page 1 --annotate
    """
    from src.document_intelligence import (
        DocumentParser,
        load_document,
        RenderOptions,
    )
    from src.document_intelligence.grounding import create_annotated_image, CropManager
    from PIL import Image

    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    click.echo(f"Processing: {path}, page {page}")

    try:
        # Parse document
        parser = DocumentParser()
        parse_result = parser.parse(path)

        # Load and render page
        loader, renderer = load_document(path)
        page_image = renderer.render_page(page, RenderOptions(dpi=200))
        loader.close()

        # Get page chunks
        page_chunks = [c for c in parse_result.chunks if c.page == page]

        if annotate:
            # Create annotated image
            bboxes = [c.bbox for c in page_chunks]
            labels = [f"{c.chunk_type.value[:10]}" for c in page_chunks]
            annotated = create_annotated_image(page_image, bboxes, labels)

            output_file = output_path / f"annotated_page_{page}.png"
            Image.fromarray(annotated).save(output_file)
            click.echo(f"Saved annotated image: {output_file}")
        else:
            # Save individual crops
            crop_manager = CropManager(output_path)
            for chunk in page_chunks:
                crop_path = crop_manager.save_crop(
                    page_image,
                    parse_result.doc_id,
                    page,
                    chunk.bbox,
                )
                click.echo(f"Saved crop: {crop_path}")

        click.echo(f"\nProcessed {len(page_chunks)} chunks from page {page}")

    except Exception as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)
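

# Typical end-to-end RAG workflow using the commands below (the document ID is
# assigned at index time and printed by `index`; the ID shown is hypothetical):
#
#   sparknet docint index report.pdf
#   sparknet docint retrieve "payment terms" -d doc_abc123
#   sparknet docint ask report.pdf "What are the payment terms?" --use-rag -d doc_abc123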


@docint_cli.command()
@click.argument("paths", nargs=-1, type=click.Path(exists=True), required=True)
@click.option("--max-pages", type=int, help="Maximum pages to process per document")
@click.option("--batch-size", type=int, default=32, help="Embedding batch size")
@click.option("--min-length", type=int, default=10, help="Minimum chunk text length")
def index(paths: tuple, max_pages: Optional[int], batch_size: int, min_length: int):
    """
    Index documents into the vector store for RAG.

    Example:
        sparknet docint index document.pdf
        sparknet docint index *.pdf --max-pages 50
        sparknet docint index doc1.pdf doc2.pdf doc3.pdf
    """
    from src.document_intelligence.tools import get_rag_tool

    click.echo(f"Indexing {len(paths)} document(s)...")
    click.echo()

    try:
        tool = get_rag_tool("index_document")

        total_indexed = 0
        total_skipped = 0
        errors = []

        for path in paths:
            click.echo(f"Processing: {path}")
            # NOTE: --batch-size and --min-length are parsed but not currently
            # forwarded to the indexing tool.
            result = tool.execute(
                path=path,
                max_pages=max_pages,
            )

            if result.success:
                data = result.data
                indexed = data.get("chunks_indexed", 0)
                skipped = data.get("chunks_skipped", 0)
                total_indexed += indexed
                total_skipped += skipped
                click.echo(f"  Indexed: {indexed} chunks, Skipped: {skipped}")
                click.echo(f"  Document ID: {data.get('document_id', 'unknown')}")
            else:
                errors.append((path, result.error))
                click.echo(f"  Error: {result.error}", err=True)
            click.echo()

        click.echo("=" * 40)
        click.echo(f"Total documents: {len(paths)}")
        click.echo(f"Total chunks indexed: {total_indexed}")
        click.echo(f"Total chunks skipped: {total_skipped}")
        if errors:
            click.echo(f"Errors: {len(errors)}")
            for path, err in errors:
                click.echo(f"  - {path}: {err}")

    except Exception as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)


@docint_cli.command(name="index-stats")
def index_stats():
    """
    Show statistics about the vector store index.

    Example:
        sparknet docint index-stats
    """
    from src.document_intelligence.tools import get_rag_tool

    try:
        tool = get_rag_tool("get_index_stats")
        result = tool.execute()

        if result.success:
            data = result.data
            click.echo("Vector Store Statistics:")
            click.echo(f"  Total chunks: {data.get('total_chunks', 0)}")
            click.echo(f"  Embedding model: {data.get('embedding_model', 'unknown')}")
            click.echo(f"  Embedding dimension: {data.get('embedding_dimension', 'unknown')}")
        else:
            click.echo(f"Error: {result.error}", err=True)

    except Exception as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)


@docint_cli.command(name="delete-index")
@click.argument("document_id")
@click.option("--yes", "-y", is_flag=True, help="Skip confirmation prompt")
def delete_index(document_id: str, yes: bool):
    """
    Delete a document from the vector store index.

    Example:
        sparknet docint delete-index doc_abc123
    """
    from src.document_intelligence.tools import get_rag_tool

    if not yes:
        click.confirm(f"Delete document '{document_id}' from index?", abort=True)

    try:
        tool = get_rag_tool("delete_document")
        result = tool.execute(document_id=document_id)

        if result.success:
            data = result.data
            click.echo(f"Deleted {data.get('chunks_deleted', 0)} chunks for document: {document_id}")
        else:
            click.echo(f"Error: {result.error}", err=True)

    except Exception as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)


@docint_cli.command(name="retrieve")
@click.argument("query")
@click.option("--top-k", "-k", type=int, default=5, help="Number of results")
@click.option("--document-id", "-d", help="Filter by document ID")
@click.option("--chunk-type", "-t", multiple=True, help="Filter by chunk type")
@click.option("--page-start", type=int, help="Filter by page range start")
@click.option("--page-end", type=int, help="Filter by page range end")
@click.option("--verbose", "-v", is_flag=True, help="Show full chunk text")
def retrieve(query: str, top_k: int, document_id: Optional[str], chunk_type: tuple,
             page_start: Optional[int], page_end: Optional[int], verbose: bool):
    """
    Retrieve relevant chunks from the vector store.

    Example:
        sparknet docint retrieve "payment terms"
        sparknet docint retrieve "claims" -d doc_abc123 -t paragraph -k 10
    """
    from src.document_intelligence.tools import get_rag_tool

    click.echo(f"Query: {query}")
    click.echo()

    try:
        tool = get_rag_tool("retrieve_chunks")

        page_range = None
        if page_start is not None and page_end is not None:
            page_range = (page_start, page_end)

        result = tool.execute(
            query=query,
            top_k=top_k,
            document_id=document_id,
            chunk_types=list(chunk_type) if chunk_type else None,
            page_range=page_range,
        )

        if result.success:
            data = result.data
            chunks = data.get("chunks", [])
            click.echo(f"Found {len(chunks)} results:\n")
            for i, chunk in enumerate(chunks, 1):
                click.echo(f"{i}. [sim={chunk['similarity']:.3f}] Page {chunk.get('page', '?')}, {chunk.get('chunk_type', 'text')}")
                click.echo(f"   Document: {chunk['document_id']}")
                text = chunk['text']
                if verbose:
                    click.echo(f"   Text: {text}")
                else:
                    click.echo(f"   Text: {text[:150]}...")
                click.echo()
        else:
            click.echo(f"Error: {result.error}", err=True)

    except Exception as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)


# Register with main CLI
def register_commands(cli):
    """Register docint commands with main CLI."""
    cli.add_command(docint_cli)
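

# Minimal standalone entry point (a convenience sketch: in normal use these
# commands are attached to the main `sparknet` CLI via register_commands above):
if __name__ == "__main__":
    docint_cli()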