"""
RAG CLI Commands

Commands:
    sparknet rag index  - Index document for retrieval
    sparknet rag search - Search indexed documents
    sparknet rag ask    - Answer question using RAG
    sparknet rag status - Show index status
    sparknet rag delete - Delete a document from the index
"""

import typer
from typing import Optional, List
from pathlib import Path
import json
import sys

# Create RAG sub-app
rag_app = typer.Typer(
    name="rag",
    help="RAG and retrieval commands",
)

# NOTE: the docstrings of the command functions below are rendered by Typer as
# the `--help` text, so they are kept short and user-facing; per-option
# documentation lives in the `help=` arguments.


def _truncate(text: str, limit: int) -> str:
    """Return *text* cut to *limit* characters, adding "..." only when cut."""
    if len(text) <= limit:
        return text
    return text[:limit] + "..."


def _write_json(path: Path, payload: dict) -> None:
    """Serialize *payload* as indented JSON into *path* (UTF-8)."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2)


@rag_app.command("index")
def index_document(
    files: List[Path] = typer.Argument(..., help="Document file(s) to index"),
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
    embedding_model: str = typer.Option("nomic-embed-text", "--model", "-m", help="Embedding model"),
):
    """
    Index document(s) for RAG retrieval.

    Example:
        sparknet rag index document.pdf
        sparknet rag index *.pdf --collection contracts
    """
    # Validate files: missing paths are reported and skipped, not fatal,
    # unless nothing usable remains.
    valid_files = []
    for f in files:
        if f.exists():
            valid_files.append(f)
        else:
            typer.echo(f"Warning: File not found, skipping: {f}", err=True)

    if not valid_files:
        typer.echo("Error: No valid files to index", err=True)
        raise typer.Exit(1)

    typer.echo(f"Indexing {len(valid_files)} document(s)...")

    try:
        # Imported lazily so the CLI can start (and show --help) even when
        # the RAG dependencies are not installed.
        from ..rag import (
            VectorStoreConfig,
            EmbeddingConfig,
            get_document_indexer,
        )

        # Configure
        # NOTE(review): these configs are built but never handed to
        # get_document_indexer(), so --collection and --model currently look
        # like they have no effect. Confirm the factory's signature and wire
        # them through.
        store_config = VectorStoreConfig(collection_name=collection)
        embed_config = EmbeddingConfig(ollama_model=embedding_model)

        # Get indexer
        indexer = get_document_indexer()

        # Index documents
        results = indexer.index_batch([str(f) for f in valid_files])

        # Summary
        successful = sum(1 for r in results if r.success)
        total_chunks = sum(r.num_chunks_indexed for r in results)

        typer.echo("\nIndexing complete:")
        typer.echo(f" Documents: {successful}/{len(results)} successful")
        typer.echo(f" Chunks indexed: {total_chunks}")

        for r in results:
            status = "✓" if r.success else "✗"
            typer.echo(f" [{status}] {r.source_path}: {r.num_chunks_indexed} chunks")
            if r.error:
                typer.echo(f" Error: {r.error}")

    except ImportError as e:
        typer.echo(f"Error: Missing dependency - {e}", err=True)
        raise typer.Exit(1)
    except Exception as e:
        # Top-level CLI boundary: report any failure and exit non-zero.
        typer.echo(f"Error indexing documents: {e}", err=True)
        raise typer.Exit(1)


@rag_app.command("search")
def search_documents(
    query: str = typer.Argument(..., help="Search query"),
    top_k: int = typer.Option(5, "--top", "-k", help="Number of results"),
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
    document_id: Optional[str] = typer.Option(None, "--document", "-d", help="Filter by document ID"),
    chunk_type: Optional[str] = typer.Option(None, "--type", "-t", help="Filter by chunk type"),
    output: Optional[Path] = typer.Option(None, "--output", "-o", help="Output JSON file"),
):
    """
    Search indexed documents.

    Example:
        sparknet rag search "payment terms" --top 10
        sparknet rag search "table data" --type table
    """
    typer.echo(f"Searching: {query}")

    try:
        from ..rag import get_document_retriever, RetrieverConfig

        # Configure
        # NOTE(review): `collection` is accepted but not passed to the
        # retriever here — confirm whether RetrieverConfig should receive it.
        config = RetrieverConfig(default_top_k=top_k)
        retriever = get_document_retriever(config)

        # Build filters (only the ones the user actually supplied)
        filters = {}
        if document_id:
            filters["document_id"] = document_id
        if chunk_type:
            filters["chunk_type"] = chunk_type

        # Search
        chunks = retriever.retrieve(query, top_k=top_k, filters=filters if filters else None)

        if not chunks:
            typer.echo("No results found.")
            return

        # Format output
        output_data = {
            "query": query,
            "num_results": len(chunks),
            "results": [
                {
                    "chunk_id": c.chunk_id,
                    "document_id": c.document_id,
                    "page": c.page,
                    "chunk_type": c.chunk_type,
                    "similarity": c.similarity,
                    "text": _truncate(c.text, 500),
                }
                for c in chunks
            ],
        }

        if output:
            _write_json(output, output_data)
            typer.echo(f"Results written to: {output}")
        else:
            typer.echo(f"\nFound {len(chunks)} results:\n")
            for i, c in enumerate(chunks, 1):
                typer.echo(f"[{i}] Similarity: {c.similarity:.3f}")
                if c.page is not None:
                    # Pages are shown 1-based (stored value + 1).
                    typer.echo(f" Page: {c.page + 1}, Type: {c.chunk_type or 'text'}")
                typer.echo(f" {_truncate(c.text, 200)}")
                typer.echo()

    except Exception as e:
        # Top-level CLI boundary: report any failure and exit non-zero.
        typer.echo(f"Error searching: {e}", err=True)
        raise typer.Exit(1)


@rag_app.command("ask")
def ask_question(
    question: str = typer.Argument(..., help="Question to answer"),
    top_k: int = typer.Option(5, "--top", "-k", help="Number of context chunks"),
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
    document_id: Optional[str] = typer.Option(None, "--document", "-d", help="Filter by document ID"),
    output: Optional[Path] = typer.Option(None, "--output", "-o", help="Output JSON file"),
    show_evidence: bool = typer.Option(True, "--evidence/--no-evidence", help="Show evidence sources"),
):
    """
    Answer a question using RAG.

    Example:
        sparknet rag ask "What are the payment terms?"
        sparknet rag ask "What is the contract value?" --document contract123
    """
    typer.echo(f"Question: {question}")
    typer.echo("Processing...")

    try:
        from ..rag import get_grounded_generator, GeneratorConfig

        # Configure
        # NOTE(review): `collection` is accepted but not passed to the
        # generator here — confirm whether GeneratorConfig should receive it.
        config = GeneratorConfig()
        generator = get_grounded_generator(config)

        # Build filters
        filters = {"document_id": document_id} if document_id else None

        # Generate answer
        result = generator.answer_question(question, top_k=top_k, filters=filters)

        # Format output
        output_data = {
            "question": question,
            "answer": result.answer,
            "confidence": result.confidence,
            "abstained": result.abstained,
            "abstain_reason": result.abstain_reason,
            "citations": [
                {
                    "index": c.index,
                    "page": c.page,
                    "snippet": c.text_snippet,
                    "confidence": c.confidence,
                }
                for c in result.citations
            ],
            "num_chunks_used": result.num_chunks_used,
        }

        if output:
            _write_json(output, output_data)
            typer.echo(f"Results written to: {output}")
        else:
            typer.echo(f"\nAnswer: {result.answer}")
            typer.echo(f"\nConfidence: {result.confidence:.2f}")

            if result.abstained:
                typer.echo(f"Note: {result.abstain_reason}")

            if show_evidence and result.citations:
                typer.echo(f"\nSources ({len(result.citations)}):")
                for c in result.citations:
                    # Pages are shown 1-based (stored value + 1).
                    page_info = f"Page {c.page + 1}" if c.page is not None else ""
                    typer.echo(f" [{c.index}] {page_info}: {_truncate(c.text_snippet, 80)}")

    except Exception as e:
        # Top-level CLI boundary: report any failure and exit non-zero.
        typer.echo(f"Error generating answer: {e}", err=True)
        raise typer.Exit(1)


@rag_app.command("status")
def show_status(
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
):
    """
    Show RAG index status.

    Example:
        sparknet rag status
        sparknet rag status --collection contracts
    """
    typer.echo("RAG Index Status")
    typer.echo("=" * 40)

    try:
        from ..rag import get_vector_store, VectorStoreConfig

        config = VectorStoreConfig(collection_name=collection)
        store = get_vector_store(config)

        # Get stats
        total_chunks = store.count()

        typer.echo(f"Collection: {collection}")
        typer.echo(f"Total chunks: {total_chunks}")

        # List documents (only if the store backend offers list_documents)
        if hasattr(store, 'list_documents'):
            doc_ids = store.list_documents()
            typer.echo(f"Documents indexed: {len(doc_ids)}")

            if doc_ids:
                typer.echo("\nDocuments:")
                # Show at most the first 10 documents to keep output short.
                for doc_id in doc_ids[:10]:
                    chunk_count = store.count(doc_id)
                    typer.echo(f" - {doc_id}: {chunk_count} chunks")

                if len(doc_ids) > 10:
                    typer.echo(f" ... and {len(doc_ids) - 10} more")

    except Exception as e:
        # Top-level CLI boundary: report any failure and exit non-zero.
        typer.echo(f"Error getting status: {e}", err=True)
        raise typer.Exit(1)


@rag_app.command("delete")
def delete_document(
    document_id: str = typer.Argument(..., help="Document ID to delete"),
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
    force: bool = typer.Option(False, "--force", "-f", help="Skip confirmation"),
):
    """
    Delete a document from the index.

    Example:
        sparknet rag delete doc123
        sparknet rag delete doc123 --force
    """
    # Ask before deleting unless --force was given.
    if not force:
        confirm = typer.confirm(f"Delete document '{document_id}' from index?")
        if not confirm:
            typer.echo("Cancelled.")
            return

    try:
        from ..rag import get_vector_store, VectorStoreConfig

        config = VectorStoreConfig(collection_name=collection)
        store = get_vector_store(config)

        deleted = store.delete_document(document_id)
        typer.echo(f"Deleted {deleted} chunks for document: {document_id}")

    except Exception as e:
        # Top-level CLI boundary: report any failure and exit non-zero.
        typer.echo(f"Error deleting document: {e}", err=True)
        raise typer.Exit(1)