"""
Document Intelligence CLI Commands

CLI interface for the document_intelligence subsystem.
"""

import json
import sys
from pathlib import Path
from typing import Optional

import click


@click.group(name="docint")
def docint_cli():
    """Document Intelligence commands."""
    pass


@docint_cli.command()
@click.argument("path", type=click.Path(exists=True))
@click.option("--output", "-o", type=click.Path(), help="Output JSON file")
@click.option("--max-pages", type=int, help="Maximum pages to process")
@click.option("--dpi", type=int, default=200, help="Render DPI (default: 200)")
@click.option("--format", "output_format", type=click.Choice(["json", "markdown", "text"]),
              default="json", help="Output format")
def parse(path: str, output: Optional[str], max_pages: Optional[int],
          dpi: int, output_format: str):
    """
    Parse a document into semantic chunks.

    Example:
        sparknet docint parse invoice.pdf -o result.json
        sparknet docint parse document.pdf --format markdown
    """
    from src.document_intelligence import (
        DocumentParser,
        ParserConfig,
    )

    config = ParserConfig(
        render_dpi=dpi,
        max_pages=max_pages,
    )
    parser = DocumentParser(config=config)

    click.echo(f"Parsing: {path}")

    try:
        result = parser.parse(path)

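        # Each chunk carries its provenance (page, bbox, confidence), so
        # the JSON output can be traced back to regions on the page.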
        if output_format == "json":
            output_data = {
                "doc_id": result.doc_id,
                "filename": result.filename,
                "num_pages": result.num_pages,
                "chunks": [
                    {
                        "chunk_id": c.chunk_id,
                        "type": c.chunk_type.value,
                        "text": c.text,
                        "page": c.page,
                        "bbox": c.bbox.xyxy,
                        "confidence": c.confidence,
                    }
                    for c in result.chunks
                ],
                "processing_time_ms": result.processing_time_ms,
            }

            if output:
                with open(output, "w") as f:
                    json.dump(output_data, f, indent=2)
                click.echo(f"Output written to: {output}")
            else:
                click.echo(json.dumps(output_data, indent=2))

        elif output_format == "markdown":
            if output:
                with open(output, "w") as f:
                    f.write(result.markdown_full)
                click.echo(f"Markdown written to: {output}")
            else:
                click.echo(result.markdown_full)

        else:
            for chunk in result.chunks:
                click.echo(f"[Page {chunk.page}, {chunk.chunk_type.value}]")
                click.echo(chunk.text)
                click.echo()

        click.echo(f"\nParsed {len(result.chunks)} chunks in {result.processing_time_ms:.0f}ms")

    except Exception as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)


@docint_cli.command()
@click.argument("path", type=click.Path(exists=True))
@click.option("--field", "-f", multiple=True, help="Field to extract (can specify multiple)")
@click.option("--schema", "-s", type=click.Path(exists=True), help="JSON schema file")
@click.option("--preset", type=click.Choice(["invoice", "receipt", "contract"]),
              help="Use preset schema")
@click.option("--output", "-o", type=click.Path(), help="Output JSON file")
def extract(path: str, field: tuple, schema: Optional[str], preset: Optional[str],
            output: Optional[str]):
    """
    Extract fields from a document.

    Example:
        sparknet docint extract invoice.pdf --preset invoice
        sparknet docint extract doc.pdf -f vendor_name -f total_amount
        sparknet docint extract doc.pdf --schema my_schema.json
    """
    from src.document_intelligence import (
        DocumentParser,
        FieldExtractor,
        ExtractionSchema,
        create_invoice_schema,
        create_receipt_schema,
        create_contract_schema,
    )

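    # Resolve the extraction schema: a named preset wins, then a JSON
    # schema file, then ad-hoc string fields built from --field.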
    if preset:
        if preset == "invoice":
            extraction_schema = create_invoice_schema()
        elif preset == "receipt":
            extraction_schema = create_receipt_schema()
        elif preset == "contract":
            extraction_schema = create_contract_schema()
    elif schema:
        with open(schema) as f:
            schema_dict = json.load(f)
        extraction_schema = ExtractionSchema.from_json_schema(schema_dict)
    elif field:
        extraction_schema = ExtractionSchema(name="custom")
        for field_name in field:
            extraction_schema.add_string_field(field_name, required=True)
    else:
        click.echo("Error: Specify --field, --schema, or --preset", err=True)
        sys.exit(1)

    click.echo(f"Extracting from: {path}")
    click.echo(f"Fields: {', '.join(f.name for f in extraction_schema.fields)}")

    try:
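        # Parse first, then run field extraction over the parsed chunks.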
        parser = DocumentParser()
        parse_result = parser.parse(path)

        extractor = FieldExtractor()
        result = extractor.extract(parse_result, extraction_schema)

        output_data = {
            "doc_id": parse_result.doc_id,
            "filename": parse_result.filename,
            "extracted_data": result.data,
            "confidence": result.overall_confidence,
            "abstained_fields": result.abstained_fields,
            "evidence": [
                {
                    "chunk_id": e.chunk_id,
                    "page": e.page,
                    "bbox": e.bbox.xyxy,
                    "snippet": e.snippet,
                }
                for e in result.evidence
            ],
        }

        if output:
            with open(output, "w") as f:
                json.dump(output_data, f, indent=2)
            click.echo(f"Output written to: {output}")
        else:
            click.echo("\nExtracted Data:")
            for key, value in result.data.items():
                status = " [ABSTAINED]" if key in result.abstained_fields else ""
                click.echo(f"  {key}: {value}{status}")

            click.echo(f"\nConfidence: {result.overall_confidence:.2f}")

            if result.abstained_fields:
                click.echo(f"Abstained: {', '.join(result.abstained_fields)}")

    except Exception as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)


@docint_cli.command()
@click.argument("path", type=click.Path(exists=True))
@click.argument("question")
@click.option("--verbose", "-v", is_flag=True, help="Show evidence details")
@click.option("--use-rag", is_flag=True, help="Use RAG for retrieval (requires indexed document)")
@click.option("--document-id", "-d", help="Document ID for RAG retrieval")
@click.option("--top-k", "-k", type=int, default=5, help="Number of chunks to consider")
@click.option("--chunk-type", "-t", multiple=True, help="Filter by chunk type (can specify multiple)")
@click.option("--page-start", type=int, help="Filter by page range start")
@click.option("--page-end", type=int, help="Filter by page range end")
def ask(path: str, question: str, verbose: bool, use_rag: bool,
        document_id: Optional[str], top_k: int, chunk_type: tuple,
        page_start: Optional[int], page_end: Optional[int]):
    """
    Ask a question about a document.

    Example:
        sparknet docint ask invoice.pdf "What is the total amount?"
        sparknet docint ask doc.pdf "Find claims" --use-rag --top-k 10
        sparknet docint ask doc.pdf "What do the tables show?" -t table --use-rag
    """
    from src.document_intelligence import DocumentParser

    click.echo(f"Document: {path}")
    click.echo(f"Question: {question}")

    if use_rag:
        click.echo("Mode: RAG (semantic retrieval)")
    else:
        click.echo("Mode: Keyword search")

    click.echo()

    try:
        if use_rag:
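            # RAG path: answer from the pre-built vector index (the
            # document must have been indexed with `docint index`).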
            from src.document_intelligence.tools import get_rag_tool

            tool = get_rag_tool("rag_answer")

            page_range = None
            if page_start is not None and page_end is not None:
                page_range = (page_start, page_end)

            result = tool.execute(
                question=question,
                document_id=document_id,
                top_k=top_k,
                chunk_types=list(chunk_type) if chunk_type else None,
                page_range=page_range,
            )
        else:
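            # Keyword path: parse the file on the fly and search its
            # chunks directly; no prior indexing required.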
            from src.document_intelligence.tools import get_tool

            parser = DocumentParser()
            parse_result = parser.parse(path)

            tool = get_tool("answer_question")
            result = tool.execute(
                parse_result=parse_result,
                question=question,
                top_k=top_k,
            )

        if result.success:
            data = result.data
            click.echo(f"Answer: {data.get('answer', 'No answer found')}")
            click.echo(f"Confidence: {data.get('confidence', 0):.2f}")

            if data.get('abstained'):
                click.echo("Note: The system abstained due to low confidence.")

            if verbose and result.evidence:
                click.echo("\nEvidence:")
                for ev in result.evidence:
                    click.echo(f"  - Page {ev.get('page', '?')}: {ev.get('snippet', '')[:100]}...")

            if data.get('citations'):
                click.echo("\nCitations:")
                for cit in data['citations']:
                    click.echo(f"  [{cit['index']}] {cit.get('text', '')[:80]}...")
        else:
            click.echo(f"Error: {result.error}", err=True)

    except Exception as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)


@docint_cli.command()
@click.argument("path", type=click.Path(exists=True))
@click.option("--output", "-o", type=click.Path(), help="Output JSON file")
def classify(path: str, output: Optional[str]):
    """
    Classify a document's type.

    Example:
        sparknet docint classify document.pdf
    """
    from src.document_intelligence import DocumentParser

    click.echo(f"Classifying: {path}")

    try:
        parser = DocumentParser()
        parse_result = parser.parse(path)

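        # Keyword heuristic: sample up to five first-page chunks and scan
        # the first 200 characters of each for type-specific vocabulary.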
        first_page_chunks = [c for c in parse_result.chunks if c.page == 1][:5]
        content = " ".join(c.text[:200] for c in first_page_chunks).lower()

        doc_type = "other"
        confidence = 0.5

        type_keywords = {
            "invoice": ["invoice", "bill", "payment due", "amount due", "invoice number"],
            "contract": ["agreement", "contract", "party", "whereas", "terms and conditions"],
            "receipt": ["receipt", "paid", "transaction", "thank you for your purchase"],
            "form": ["form", "fill in", "checkbox", "signature line"],
            "letter": ["dear", "sincerely", "regards", "to whom it may concern"],
            "report": ["report", "findings", "conclusion", "summary", "analysis"],
            "patent": ["patent", "claims", "invention", "embodiment", "disclosed"],
        }

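        # The first type with at least two keyword hits wins; confidence
        # grows with the hit count and is capped at 0.95.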
        for dtype, keywords in type_keywords.items():
            matches = sum(1 for k in keywords if k in content)
            if matches >= 2:
                doc_type = dtype
                confidence = min(0.95, 0.5 + matches * 0.15)
                break

        output_data = {
            "doc_id": parse_result.doc_id,
            "filename": parse_result.filename,
            "document_type": doc_type,
            "confidence": confidence,
        }

        if output:
            with open(output, "w") as f:
                json.dump(output_data, f, indent=2)
            click.echo(f"Output written to: {output}")
        else:
            click.echo(f"Type: {doc_type}")
            click.echo(f"Confidence: {confidence:.2f}")

    except Exception as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)


@docint_cli.command()
@click.argument("path", type=click.Path(exists=True))
@click.option("--query", "-q", help="Search query")
@click.option("--type", "chunk_type", help="Filter by chunk type")
@click.option("--top", "-k", type=int, default=10, help="Number of results")
def search(path: str, query: Optional[str], chunk_type: Optional[str], top: int):
    """
    Search document content.

    Example:
        sparknet docint search document.pdf -q "payment terms"
        sparknet docint search document.pdf --type table
    """
    from src.document_intelligence import DocumentParser
    from src.document_intelligence.tools import get_tool

    click.echo(f"Searching: {path}")

    try:
        parser = DocumentParser()
        parse_result = parser.parse(path)

        if query:
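            # Scored keyword search over the parsed chunks.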
            tool = get_tool("search_chunks")
            result = tool.execute(
                parse_result=parse_result,
                query=query,
                chunk_types=[chunk_type] if chunk_type else None,
                top_k=top,
            )

            if result.success:
                results = result.data.get("results", [])
                click.echo(f"Found {len(results)} results:\n")

                for i, r in enumerate(results, 1):
                    click.echo(f"{i}. [Page {r['page']}, {r['type']}] (score: {r['score']:.2f})")
                    click.echo(f"   {r['text'][:200]}...")
                    click.echo()
            else:
                click.echo(f"Error: {result.error}", err=True)

        elif chunk_type:
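            # No query given: list chunks of the requested type.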
            matching = [c for c in parse_result.chunks if c.chunk_type.value == chunk_type]
            click.echo(f"Found {len(matching)} {chunk_type} chunks:\n")

            for i, chunk in enumerate(matching[:top], 1):
                click.echo(f"{i}. [Page {chunk.page}] {chunk.chunk_id}")
                click.echo(f"   {chunk.text[:200]}...")
                click.echo()

        else:
            click.echo(f"Total chunks: {len(parse_result.chunks)}\n")

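            # Neither query nor type filter: print a per-type summary.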
            by_type = {}
            for chunk in parse_result.chunks:
                t = chunk.chunk_type.value
                by_type[t] = by_type.get(t, 0) + 1

            click.echo("Chunk types:")
            for t, count in sorted(by_type.items()):
                click.echo(f"  {t}: {count}")

    except Exception as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)


@docint_cli.command()
@click.argument("path", type=click.Path(exists=True))
@click.option("--page", "-p", type=int, default=1, help="Page number")
@click.option("--output-dir", "-d", type=click.Path(), default="./crops",
              help="Output directory for crops")
@click.option("--annotate", "-a", is_flag=True, help="Create annotated page image")
def visualize(path: str, page: int, output_dir: str, annotate: bool):
    """
    Visualize document regions.

    Example:
        sparknet docint visualize document.pdf --page 1 --annotate
    """
    from src.document_intelligence import (
        DocumentParser,
        load_document,
        RenderOptions,
    )
    from src.document_intelligence.grounding import create_annotated_image, CropManager
    from PIL import Image

    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    click.echo(f"Processing: {path}, page {page}")

    try:
        parser = DocumentParser()
        parse_result = parser.parse(path)

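        # Re-render the requested page; dpi=200 matches the parser's
        # default render DPI so chunk bboxes line up with this image.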
        loader, renderer = load_document(path)
        page_image = renderer.render_page(page, RenderOptions(dpi=200))
        loader.close()

        page_chunks = [c for c in parse_result.chunks if c.page == page]

        if annotate:
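            # One annotated overview image with a labeled box per region.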
            bboxes = [c.bbox for c in page_chunks]
            labels = [c.chunk_type.value[:10] for c in page_chunks]

            annotated = create_annotated_image(page_image, bboxes, labels)

            output_file = output_path / f"annotated_page_{page}.png"
            Image.fromarray(annotated).save(output_file)
            click.echo(f"Saved annotated image: {output_file}")

        else:
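            # One cropped image per chunk region.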
            crop_manager = CropManager(output_path)

            for chunk in page_chunks:
                crop_path = crop_manager.save_crop(
                    page_image,
                    parse_result.doc_id,
                    page,
                    chunk.bbox,
                )
                click.echo(f"Saved crop: {crop_path}")

        click.echo(f"\nProcessed {len(page_chunks)} chunks from page {page}")

    except Exception as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)


@docint_cli.command()
@click.argument("paths", nargs=-1, type=click.Path(exists=True), required=True)
@click.option("--max-pages", type=int, help="Maximum pages to process per document")
@click.option("--batch-size", type=int, default=32, help="Embedding batch size")
@click.option("--min-length", type=int, default=10, help="Minimum chunk text length")
def index(paths: tuple, max_pages: Optional[int], batch_size: int, min_length: int):
    """
    Index documents into the vector store for RAG.

    Example:
        sparknet docint index document.pdf
        sparknet docint index *.pdf --max-pages 50
        sparknet docint index doc1.pdf doc2.pdf doc3.pdf
    """
    from src.document_intelligence.tools import get_rag_tool

    click.echo(f"Indexing {len(paths)} document(s)...")
    click.echo()

    try:
        tool = get_rag_tool("index_document")

        total_indexed = 0
        total_skipped = 0
        errors = []

        for path in paths:
            click.echo(f"Processing: {path}")

            result = tool.execute(
                path=path,
                max_pages=max_pages,
                # Assumption: the index_document tool accepts these tuning
                # options; they were declared as CLI flags but never passed.
                batch_size=batch_size,
                min_length=min_length,
            )

            if result.success:
                data = result.data
                indexed = data.get("chunks_indexed", 0)
                skipped = data.get("chunks_skipped", 0)
                total_indexed += indexed
                total_skipped += skipped
                click.echo(f"  Indexed: {indexed} chunks, Skipped: {skipped}")
                click.echo(f"  Document ID: {data.get('document_id', 'unknown')}")
            else:
                errors.append((path, result.error))
                click.echo(f"  Error: {result.error}", err=True)

        click.echo()
        click.echo("=" * 40)
        click.echo(f"Total documents: {len(paths)}")
        click.echo(f"Total chunks indexed: {total_indexed}")
        click.echo(f"Total chunks skipped: {total_skipped}")

        if errors:
            click.echo(f"Errors: {len(errors)}")
            for path, err in errors:
                click.echo(f"  - {path}: {err}")

    except Exception as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)


@docint_cli.command(name="index-stats")
def index_stats():
    """
    Show statistics about the vector store index.

    Example:
        sparknet docint index-stats
    """
    from src.document_intelligence.tools import get_rag_tool

    try:
        tool = get_rag_tool("get_index_stats")
        result = tool.execute()

        if result.success:
            data = result.data
            click.echo("Vector Store Statistics:")
            click.echo(f"  Total chunks: {data.get('total_chunks', 0)}")
            click.echo(f"  Embedding model: {data.get('embedding_model', 'unknown')}")
            click.echo(f"  Embedding dimension: {data.get('embedding_dimension', 'unknown')}")
        else:
            click.echo(f"Error: {result.error}", err=True)

    except Exception as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)


@docint_cli.command(name="delete-index")
@click.argument("document_id")
@click.option("--yes", "-y", is_flag=True, help="Skip confirmation prompt")
def delete_index(document_id: str, yes: bool):
    """
    Delete a document from the vector store index.

    Example:
        sparknet docint delete-index doc_abc123
    """
    from src.document_intelligence.tools import get_rag_tool

    if not yes:
        click.confirm(f"Delete document '{document_id}' from index?", abort=True)

    try:
        tool = get_rag_tool("delete_document")
        result = tool.execute(document_id=document_id)

        if result.success:
            data = result.data
            click.echo(f"Deleted {data.get('chunks_deleted', 0)} chunks for document: {document_id}")
        else:
            click.echo(f"Error: {result.error}", err=True)

    except Exception as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)


@docint_cli.command(name="retrieve")
@click.argument("query")
@click.option("--top-k", "-k", type=int, default=5, help="Number of results")
@click.option("--document-id", "-d", help="Filter by document ID")
@click.option("--chunk-type", "-t", multiple=True, help="Filter by chunk type")
@click.option("--page-start", type=int, help="Filter by page range start")
@click.option("--page-end", type=int, help="Filter by page range end")
@click.option("--verbose", "-v", is_flag=True, help="Show full chunk text")
def retrieve(query: str, top_k: int, document_id: Optional[str],
             chunk_type: tuple, page_start: Optional[int],
             page_end: Optional[int], verbose: bool):
    """
    Retrieve relevant chunks from the vector store.

    Example:
        sparknet docint retrieve "payment terms"
        sparknet docint retrieve "claims" -d doc_abc123 -t paragraph -k 10
    """
    from src.document_intelligence.tools import get_rag_tool

    click.echo(f"Query: {query}")
    click.echo()

    try:
        tool = get_rag_tool("retrieve_chunks")

        page_range = None
        if page_start is not None and page_end is not None:
            page_range = (page_start, page_end)

        result = tool.execute(
            query=query,
            top_k=top_k,
            document_id=document_id,
            chunk_types=list(chunk_type) if chunk_type else None,
            page_range=page_range,
        )

        if result.success:
            data = result.data
            chunks = data.get("chunks", [])
            click.echo(f"Found {len(chunks)} results:\n")

            for i, chunk in enumerate(chunks, 1):
                click.echo(f"{i}. [sim={chunk['similarity']:.3f}] Page {chunk.get('page', '?')}, {chunk.get('chunk_type', 'text')}")
                click.echo(f"   Document: {chunk['document_id']}")

                text = chunk['text']
                if verbose:
                    click.echo(f"   Text: {text}")
                else:
                    click.echo(f"   Text: {text[:150]}...")
                click.echo()
        else:
            click.echo(f"Error: {result.error}", err=True)

    except Exception as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)


def register_commands(cli):
    """Register docint commands with main CLI."""
    cli.add_command(docint_cli)
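

# A minimal wiring sketch, assuming a top-level click group named
# `sparknet` (the group name and this module's import path are
# assumptions, not confirmed by this file):
#
#     import click
#     from <this_module> import register_commands  # adjust to real path
#
#     @click.group()
#     def sparknet():
#         """SparkNet CLI."""
#
#     register_commands(sparknet)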