SPARKNET / src /cli /rag.py
MHamdan's picture
Initial commit: SPARKNET framework
d520909
"""
RAG CLI Commands

Commands:
    sparknet rag index <file>          - Index document(s) for retrieval
    sparknet rag search <query>        - Search indexed documents
    sparknet rag ask <question>        - Answer question using RAG
    sparknet rag status                - Show index status
    sparknet rag delete <document_id>  - Delete a document from the index
"""
import typer
from typing import Optional, List
from pathlib import Path
import json
import sys
# Typer sub-application; the commands below register themselves on it via
# @rag_app.command(...).
rag_app = typer.Typer(name="rag", help="RAG and retrieval commands")
@rag_app.command("index")
def index_document(
    files: List[Path] = typer.Argument(..., help="Document file(s) to index"),
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
    embedding_model: str = typer.Option("nomic-embed-text", "--model", "-m", help="Embedding model"),
):
    """
    Index document(s) for RAG retrieval.

    Example:
        sparknet rag index document.pdf
        sparknet rag index *.pdf --collection contracts
    """
    # Maintainer notes (kept out of the docstring, which Typer shows as --help):
    #   files           - paths to index; paths that do not exist are skipped
    #                     with a warning on stderr.
    #   collection      - collection name (see NOTE(review) below).
    #   embedding_model - embedding model name (see NOTE(review) below).
    # Exits with status 1 when none of the given paths exist, when a
    # dependency cannot be imported, or when indexing raises.

    # Keep only the paths that exist; warn about the rest.
    valid_files = []
    for path in files:
        if path.exists():
            valid_files.append(path)
        else:
            typer.echo(f"Warning: File not found, skipping: {path}", err=True)

    if not valid_files:
        typer.echo("Error: No valid files to index", err=True)
        raise typer.Exit(1)

    typer.echo(f"Indexing {len(valid_files)} document(s)...")

    try:
        # Imported inside the try so a missing dependency is reported through
        # the ImportError handler below instead of a raw traceback.
        from ..rag import (
            VectorStoreConfig,
            EmbeddingConfig,
            get_document_indexer,
        )

        # NOTE(review): these two configs are built but never handed to
        # get_document_indexer(), so --collection and --model do not reach the
        # indexer call in this function. Wire them in once the indexer
        # factory's signature is confirmed.
        store_config = VectorStoreConfig(collection_name=collection)
        embed_config = EmbeddingConfig(ollama_model=embedding_model)

        indexer = get_document_indexer()
        results = indexer.index_batch([str(path) for path in valid_files])

        # Summary totals across all per-document results.
        successful = sum(1 for r in results if r.success)
        total_chunks = sum(r.num_chunks_indexed for r in results)

        typer.echo("\nIndexing complete:")
        typer.echo(f" Documents: {successful}/{len(results)} successful")
        typer.echo(f" Chunks indexed: {total_chunks}")

        # One line per document, followed by its error message if it has one.
        for r in results:
            status = "✓" if r.success else "✗"
            typer.echo(f" [{status}] {r.source_path}: {r.num_chunks_indexed} chunks")
            if r.error:
                typer.echo(f" Error: {r.error}")

    except ImportError as e:
        typer.echo(f"Error: Missing dependency - {e}", err=True)
        raise typer.Exit(1)
    except Exception as e:
        typer.echo(f"Error indexing documents: {e}", err=True)
        raise typer.Exit(1)
@rag_app.command("search")
def search_documents(
    query: str = typer.Argument(..., help="Search query"),
    top_k: int = typer.Option(5, "--top", "-k", help="Number of results"),
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
    document_id: Optional[str] = typer.Option(None, "--document", "-d", help="Filter by document ID"),
    chunk_type: Optional[str] = typer.Option(None, "--type", "-t", help="Filter by chunk type"),
    output: Optional[Path] = typer.Option(None, "--output", "-o", help="Output JSON file"),
):
    """
    Search indexed documents.

    Example:
        sparknet rag search "payment terms" --top 10
        sparknet rag search "table data" --type table
    """
    # Maintainer notes (kept out of the docstring, which Typer shows as --help):
    #   query       - text passed to the retriever.
    #   top_k       - used both as the retriever's default_top_k and as the
    #                 per-call top_k.
    #   document_id - optional "document_id" filter.
    #   chunk_type  - optional "chunk_type" filter.
    #   output      - when given, results are written there as JSON instead of
    #                 being printed.
    # Exits with status 1 if anything inside the try block raises.
    # NOTE(review): `collection` is accepted but not used anywhere in this
    # function.
    typer.echo(f"Searching: {query}")

    try:
        from ..rag import get_document_retriever, RetrieverConfig

        config = RetrieverConfig(default_top_k=top_k)
        retriever = get_document_retriever(config)

        # Only the filters the user actually supplied are forwarded.
        filters = {}
        if document_id:
            filters["document_id"] = document_id
        if chunk_type:
            filters["chunk_type"] = chunk_type

        # An empty filter dict is passed as None.
        chunks = retriever.retrieve(query, top_k=top_k, filters=filters or None)

        if not chunks:
            typer.echo("No results found.")
            return

        output_data = {
            "query": query,
            "num_results": len(chunks),
            "results": [
                {
                    "chunk_id": c.chunk_id,
                    "document_id": c.document_id,
                    "page": c.page,
                    "chunk_type": c.chunk_type,
                    "similarity": c.similarity,
                    # Text is capped at 500 characters; "..." marks truncation.
                    "text": c.text[:500] + "..." if len(c.text) > 500 else c.text,
                }
                for c in chunks
            ],
        }

        if output:
            with open(output, "w", encoding="utf-8") as f:
                json.dump(output_data, f, indent=2)
            typer.echo(f"Results written to: {output}")
        else:
            typer.echo(f"\nFound {len(chunks)} results:\n")
            for i, c in enumerate(chunks, 1):
                typer.echo(f"[{i}] Similarity: {c.similarity:.3f}")
                if c.page is not None:
                    # Displayed as page + 1 (stored pages look 0-based - confirm).
                    typer.echo(f" Page: {c.page + 1}, Type: {c.chunk_type or 'text'}")
                # Append the ellipsis only when the preview was actually cut,
                # matching how the JSON "text" field is handled above.
                preview = c.text[:200] + "..." if len(c.text) > 200 else c.text
                typer.echo(f" {preview}")
                typer.echo()

    except Exception as e:
        typer.echo(f"Error searching: {e}", err=True)
        raise typer.Exit(1)
@rag_app.command("ask")
def ask_question(
    question: str = typer.Argument(..., help="Question to answer"),
    top_k: int = typer.Option(5, "--top", "-k", help="Number of context chunks"),
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
    document_id: Optional[str] = typer.Option(None, "--document", "-d", help="Filter by document ID"),
    output: Optional[Path] = typer.Option(None, "--output", "-o", help="Output JSON file"),
    show_evidence: bool = typer.Option(True, "--evidence/--no-evidence", help="Show evidence sources"),
):
    """
    Answer a question using RAG.

    Example:
        sparknet rag ask "What are the payment terms?"
        sparknet rag ask "What is the contract value?" --document contract123
    """
    # Maintainer notes (kept out of the docstring, which Typer shows as --help):
    #   question      - passed to the generator's answer_question().
    #   top_k         - forwarded as top_k to answer_question().
    #   document_id   - optional "document_id" filter.
    #   output        - when given, the result is written there as JSON instead
    #                   of being printed.
    #   show_evidence - controls whether citations are listed in console output.
    # Exits with status 1 if anything inside the try block raises.
    # NOTE(review): `collection` is accepted but not used anywhere in this
    # function.
    typer.echo(f"Question: {question}")
    typer.echo("Processing...")

    try:
        from ..rag import get_grounded_generator, GeneratorConfig

        config = GeneratorConfig()
        generator = get_grounded_generator(config)

        # Restrict to a single document only when one was requested.
        filters = {"document_id": document_id} if document_id else None

        result = generator.answer_question(question, top_k=top_k, filters=filters)

        output_data = {
            "question": question,
            "answer": result.answer,
            "confidence": result.confidence,
            "abstained": result.abstained,
            "abstain_reason": result.abstain_reason,
            "citations": [
                {
                    "index": c.index,
                    "page": c.page,
                    "snippet": c.text_snippet,
                    "confidence": c.confidence,
                }
                for c in result.citations
            ],
            "num_chunks_used": result.num_chunks_used,
        }

        if output:
            with open(output, "w", encoding="utf-8") as f:
                json.dump(output_data, f, indent=2)
            typer.echo(f"Results written to: {output}")
        else:
            typer.echo(f"\nAnswer: {result.answer}")
            typer.echo(f"\nConfidence: {result.confidence:.2f}")

            if result.abstained:
                typer.echo(f"Note: {result.abstain_reason}")

            if show_evidence and result.citations:
                typer.echo(f"\nSources ({len(result.citations)}):")
                for c in result.citations:
                    label = f"[{c.index}]"
                    # Add the page label only when a page exists, so a
                    # page-less citation does not print a dangling ":".
                    # Displayed as page + 1 (stored pages look 0-based - confirm).
                    if c.page is not None:
                        label += f" Page {c.page + 1}:"
                    # Mark truncation only when the snippet was actually cut.
                    snippet = c.text_snippet
                    if len(snippet) > 80:
                        snippet = snippet[:80] + "..."
                    typer.echo(f" {label} {snippet}")

    except Exception as e:
        typer.echo(f"Error generating answer: {e}", err=True)
        raise typer.Exit(1)
@rag_app.command("status")
def show_status(
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
):
    """
    Show RAG index status.

    Example:
        sparknet rag status
        sparknet rag status --collection contracts
    """
    # Prints the collection name and total chunk count, then (if the store
    # exposes list_documents) the document count and up to ten per-document
    # chunk counts. Exits with status 1 if anything in the try block raises.
    typer.echo("RAG Index Status")
    typer.echo("=" * 40)

    try:
        from ..rag import get_vector_store, VectorStoreConfig

        store = get_vector_store(VectorStoreConfig(collection_name=collection))

        # The count is fetched before anything else is printed for the store.
        chunk_total = store.count()
        typer.echo(f"Collection: {collection}")
        typer.echo(f"Total chunks: {chunk_total}")

        # Document listing is optional: skipped when the store lacks the method.
        if not hasattr(store, 'list_documents'):
            return

        known_ids = store.list_documents()
        typer.echo(f"Documents indexed: {len(known_ids)}")
        if not known_ids:
            return

        typer.echo("\nDocuments:")
        for doc_id in known_ids[:10]:
            per_doc = store.count(doc_id)
            typer.echo(f" - {doc_id}: {per_doc} chunks")

        # Summarise whatever did not fit in the first ten entries.
        if len(known_ids) > 10:
            remaining = len(known_ids) - 10
            typer.echo(f" ... and {remaining} more")

    except Exception as e:
        typer.echo(f"Error getting status: {e}", err=True)
        raise typer.Exit(1)
@rag_app.command("delete")
def delete_document(
    document_id: str = typer.Argument(..., help="Document ID to delete"),
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
    force: bool = typer.Option(False, "--force", "-f", help="Skip confirmation"),
):
    """
    Delete a document from the index.

    Example:
        sparknet rag delete doc123
        sparknet rag delete doc123 --force
    """
    # Without --force the user is asked to confirm; declining prints
    # "Cancelled." and returns without touching the store.
    if not force and not typer.confirm(f"Delete document '{document_id}' from index?"):
        typer.echo("Cancelled.")
        return

    try:
        from ..rag import get_vector_store, VectorStoreConfig

        store_cfg = VectorStoreConfig(collection_name=collection)
        vector_store = get_vector_store(store_cfg)

        # The store's return value is reported as the number of chunks removed.
        removed = vector_store.delete_document(document_id)
        typer.echo(f"Deleted {removed} chunks for document: {document_id}")

    except Exception as e:
        # Any failure is reported on stderr and turned into exit status 1.
        typer.echo(f"Error deleting document: {e}", err=True)
        raise typer.Exit(1)