|
|
""" |
|
|
RAG CLI Commands |
|
|
|
|
|
Commands: |
|
|
sparknet rag index <file> - Index document for retrieval |
|
|
sparknet rag search <query> - Search indexed documents |
|
|
sparknet rag ask <question> - Answer question using RAG |
|
|
sparknet rag status - Show index status |
|
|
sparknet rag delete <document_id> - Delete document from index |
|
|
""" |
|
|
|
|
|
import typer |
|
|
from typing import Optional, List |
|
|
from pathlib import Path |
|
|
import json |
|
|
import sys |
|
|
|
|
|
|
|
|
# Typer sub-application that groups every RAG command under the "rag" name;
# the command functions below register themselves on it via @rag_app.command.
rag_app = typer.Typer(name="rag", help="RAG and retrieval commands")
|
|
|
|
|
|
|
|
@rag_app.command("index")
def index_document(
    files: List[Path] = typer.Argument(..., help="Document file(s) to index"),
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
    embedding_model: str = typer.Option("nomic-embed-text", "--model", "-m", help="Embedding model"),
):
    """
    Index document(s) for RAG retrieval.

    Example:
        sparknet rag index document.pdf
        sparknet rag index *.pdf --collection contracts
    """
    # Flow: drop paths that do not exist (warning on stderr), exit with
    # status 1 if nothing is left, then hand the remaining paths to the
    # indexer in one batch and print a per-document summary.
    # Any failure inside the indexing step is reported on stderr and turned
    # into exit status 1.
    valid_files = []
    for f in files:
        if f.exists():
            valid_files.append(f)
        else:
            typer.echo(f"Warning: File not found, skipping: {f}", err=True)

    if not valid_files:
        typer.echo("Error: No valid files to index", err=True)
        raise typer.Exit(1)

    typer.echo(f"Indexing {len(valid_files)} document(s)...")

    try:
        # Imported lazily so that a missing optional dependency is reported
        # through the ImportError handler below instead of a raw traceback.
        from ..rag import (
            VectorStoreConfig,
            EmbeddingConfig,
            get_document_indexer,
        )

        # NOTE(review): these configs are built from --collection / --model
        # but are not passed to get_document_indexer() below, so the two
        # options appear to have no effect on where/how documents are indexed.
        # Confirm the factory's signature and wire them through.
        store_config = VectorStoreConfig(collection_name=collection)
        embed_config = EmbeddingConfig(ollama_model=embedding_model)

        indexer = get_document_indexer()

        # The indexer is given plain string paths, one result per input.
        results = indexer.index_batch([str(f) for f in valid_files])

        successful = sum(1 for r in results if r.success)
        total_chunks = sum(r.num_chunks_indexed for r in results)

        typer.echo("\nIndexing complete:")
        typer.echo(f" Documents: {successful}/{len(results)} successful")
        typer.echo(f" Chunks indexed: {total_chunks}")

        for r in results:
            status = "✓" if r.success else "✗"
            typer.echo(f" [{status}] {r.source_path}: {r.num_chunks_indexed} chunks")
            if r.error:
                typer.echo(f" Error: {r.error}")

    except ImportError as e:
        typer.echo(f"Error: Missing dependency - {e}", err=True)
        raise typer.Exit(1)
    except Exception as e:
        # Top-level CLI boundary: report the failure and exit non-zero.
        typer.echo(f"Error indexing documents: {e}", err=True)
        raise typer.Exit(1)
|
|
|
|
|
|
|
|
@rag_app.command("search")
def search_documents(
    query: str = typer.Argument(..., help="Search query"),
    top_k: int = typer.Option(5, "--top", "-k", help="Number of results"),
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
    document_id: Optional[str] = typer.Option(None, "--document", "-d", help="Filter by document ID"),
    chunk_type: Optional[str] = typer.Option(None, "--type", "-t", help="Filter by chunk type"),
    output: Optional[Path] = typer.Option(None, "--output", "-o", help="Output JSON file"),
):
    """
    Search indexed documents.

    Example:
        sparknet rag search "payment terms" --top 10
        sparknet rag search "table data" --type table
    """
    # Flow: build a retriever, apply the optional document/type filters,
    # then either write the hits to the --output JSON file or print a short
    # preview of each hit. Any failure is reported on stderr with exit
    # status 1.
    typer.echo(f"Searching: {query}")

    try:
        from ..rag import get_document_retriever, RetrieverConfig

        # NOTE(review): `collection` is accepted on the command line but is
        # not used anywhere in this command — confirm how the retriever
        # should be pointed at a non-default collection.
        config = RetrieverConfig(default_top_k=top_k)
        retriever = get_document_retriever(config)

        # Only the filters the user actually supplied are forwarded.
        filters = {}
        if document_id:
            filters["document_id"] = document_id
        if chunk_type:
            filters["chunk_type"] = chunk_type

        # An empty filter dict is passed as None.
        chunks = retriever.retrieve(query, top_k=top_k, filters=filters or None)

        if not chunks:
            typer.echo("No results found.")
            return

        output_data = {
            "query": query,
            "num_results": len(chunks),
            "results": [
                {
                    "chunk_id": c.chunk_id,
                    "document_id": c.document_id,
                    "page": c.page,
                    "chunk_type": c.chunk_type,
                    "similarity": c.similarity,
                    # Text is capped at 500 characters in the JSON output.
                    "text": c.text[:500] + "..." if len(c.text) > 500 else c.text,
                }
                for c in chunks
            ],
        }

        if output:
            with open(output, "w", encoding="utf-8") as f:
                json.dump(output_data, f, indent=2)
            typer.echo(f"Results written to: {output}")
        else:
            typer.echo(f"\nFound {len(chunks)} results:\n")
            for i, c in enumerate(chunks, 1):
                typer.echo(f"[{i}] Similarity: {c.similarity:.3f}")
                if c.page is not None:
                    # Page is shown as page + 1 (presumably stored
                    # zero-based — confirm against the retriever).
                    typer.echo(f" Page: {c.page + 1}, Type: {c.chunk_type or 'text'}")
                # Add the ellipsis only when the preview is really cut off,
                # so short chunks are not shown as if they were truncated.
                preview = c.text[:200] + "..." if len(c.text) > 200 else c.text
                typer.echo(f" {preview}")
                typer.echo()

    except Exception as e:
        # Top-level CLI boundary: report the failure and exit non-zero.
        typer.echo(f"Error searching: {e}", err=True)
        raise typer.Exit(1)
|
|
|
|
|
|
|
|
@rag_app.command("ask")
def ask_question(
    question: str = typer.Argument(..., help="Question to answer"),
    top_k: int = typer.Option(5, "--top", "-k", help="Number of context chunks"),
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
    document_id: Optional[str] = typer.Option(None, "--document", "-d", help="Filter by document ID"),
    output: Optional[Path] = typer.Option(None, "--output", "-o", help="Output JSON file"),
    show_evidence: bool = typer.Option(True, "--evidence/--no-evidence", help="Show evidence sources"),
):
    """
    Answer a question using RAG.

    Example:
        sparknet rag ask "What are the payment terms?"
        sparknet rag ask "What is the contract value?" --document contract123
    """
    # Flow: build a grounded generator, ask it the question (optionally
    # restricted to one document), then either write the full result to the
    # --output JSON file or print the answer, confidence and citations.
    # Any failure is reported on stderr with exit status 1.
    typer.echo(f"Question: {question}")
    typer.echo("Processing...")

    try:
        from ..rag import get_grounded_generator, GeneratorConfig

        # NOTE(review): `collection` is accepted on the command line but is
        # not used anywhere in this command — confirm how the generator
        # should be pointed at a non-default collection.
        config = GeneratorConfig()
        generator = get_grounded_generator(config)

        filters = {"document_id": document_id} if document_id else None

        result = generator.answer_question(question, top_k=top_k, filters=filters)

        # The JSON payload always carries the citations; --no-evidence only
        # affects the console output below.
        output_data = {
            "question": question,
            "answer": result.answer,
            "confidence": result.confidence,
            "abstained": result.abstained,
            "abstain_reason": result.abstain_reason,
            "citations": [
                {
                    "index": c.index,
                    "page": c.page,
                    "snippet": c.text_snippet,
                    "confidence": c.confidence,
                }
                for c in result.citations
            ],
            "num_chunks_used": result.num_chunks_used,
        }

        if output:
            with open(output, "w", encoding="utf-8") as f:
                json.dump(output_data, f, indent=2)
            typer.echo(f"Results written to: {output}")
        else:
            typer.echo(f"\nAnswer: {result.answer}")
            typer.echo(f"\nConfidence: {result.confidence:.2f}")

            if result.abstained:
                typer.echo(f"Note: {result.abstain_reason}")

            if show_evidence and result.citations:
                typer.echo(f"\nSources ({len(result.citations)}):")
                for c in result.citations:
                    # Leave the page part out entirely when no page is
                    # known, instead of printing a dangling "[n] :".
                    label = f"[{c.index}]"
                    if c.page is not None:
                        # Shown as page + 1 (presumably stored zero-based —
                        # confirm against the generator).
                        label = f"{label} Page {c.page + 1}"
                    # Add the ellipsis only when the snippet is really cut.
                    snippet = c.text_snippet
                    if len(snippet) > 80:
                        snippet = snippet[:80] + "..."
                    typer.echo(f" {label}: {snippet}")

    except Exception as e:
        # Top-level CLI boundary: report the failure and exit non-zero.
        typer.echo(f"Error generating answer: {e}", err=True)
        raise typer.Exit(1)
|
|
|
|
|
|
|
|
@rag_app.command("status")
def show_status(
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
):
    """
    Show RAG index status.

    Example:
        sparknet rag status
        sparknet rag status --collection contracts
    """
    # Prints the total chunk count for the collection and, when the store
    # exposes list_documents(), a per-document chunk count for the first ten
    # documents. Any failure is reported on stderr with exit status 1.
    typer.echo("RAG Index Status")
    typer.echo("=" * 40)

    try:
        from ..rag import get_vector_store, VectorStoreConfig

        vector_store = get_vector_store(VectorStoreConfig(collection_name=collection))
        chunk_total = vector_store.count()

        typer.echo(f"Collection: {collection}")
        typer.echo(f"Total chunks: {chunk_total}")

        # Per-document details are optional: not every store offers them.
        if not hasattr(vector_store, 'list_documents'):
            return

        documents = vector_store.list_documents()
        typer.echo(f"Documents indexed: {len(documents)}")
        if not documents:
            return

        typer.echo("\nDocuments:")
        for doc in documents[:10]:
            per_doc = vector_store.count(doc)
            typer.echo(f" - {doc}: {per_doc} chunks")

        # Summarise whatever did not fit into the ten-entry listing.
        remaining = len(documents) - 10
        if remaining > 0:
            typer.echo(f" ... and {remaining} more")

    except Exception as exc:
        typer.echo(f"Error getting status: {exc}", err=True)
        raise typer.Exit(1)
|
|
|
|
|
|
|
|
@rag_app.command("delete")
def delete_document(
    document_id: str = typer.Argument(..., help="Document ID to delete"),
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
    force: bool = typer.Option(False, "--force", "-f", help="Skip confirmation"),
):
    """
    Delete a document from the index.

    Example:
        sparknet rag delete doc123
        sparknet rag delete doc123 --force
    """
    # Unless --force is given the user is asked to confirm first; declining
    # prints "Cancelled." and returns without touching the store. Any failure
    # while deleting is reported on stderr with exit status 1.
    if not force and not typer.confirm(f"Delete document '{document_id}' from index?"):
        typer.echo("Cancelled.")
        return

    try:
        from ..rag import get_vector_store, VectorStoreConfig

        store_cfg = VectorStoreConfig(collection_name=collection)
        vector_store = get_vector_store(store_cfg)

        removed = vector_store.delete_document(document_id)
        typer.echo(f"Deleted {removed} chunks for document: {document_id}")

    except Exception as exc:
        typer.echo(f"Error deleting document: {exc}", err=True)
        raise typer.Exit(1)
|
|
|