"""
RAG Tools for Document Intelligence

Provides RAG-powered tools for:
- IndexDocumentTool: Index documents into the vector store
- RetrieveChunksTool: Semantic retrieval with metadata filters
- RAGAnswerTool: Answer questions using RAG with grounded citations
- DeleteDocumentTool: Remove a document from the index
- GetIndexStatsTool: Inspect vector store statistics
"""

import logging
from typing import Any, Dict, List, Optional

from .document_tools import DocumentTool, ToolResult

logger = logging.getLogger(__name__)

try:
    from ...rag import (
        get_docint_indexer,
        get_docint_retriever,
        get_grounded_generator,
        GeneratorConfig,
    )
    from ...rag.indexer import IndexerConfig
    RAG_AVAILABLE = True
except ImportError:
    RAG_AVAILABLE = False
    logger.warning("RAG module not available")


class IndexDocumentTool(DocumentTool):
    """
    Index a document into the vector store for RAG.

    Input:
        parse_result: Previously parsed document (ParseResult)
        OR
        path: Path to document file (will be parsed first)
        max_pages: Optional maximum pages to process

    Output:
        IndexingResult with stats
    """

    name = "index_document"
    description = "Index a document into the vector store for semantic retrieval"

    def __init__(self, indexer_config: Optional[Any] = None):
        self.indexer_config = indexer_config

    def execute(
        self,
        parse_result: Optional[Any] = None,
        path: Optional[str] = None,
        max_pages: Optional[int] = None,
        **kwargs,
    ) -> ToolResult:
        if not RAG_AVAILABLE:
            return ToolResult(
                success=False,
                error="RAG module not available. Install chromadb: pip install chromadb"
            )

        try:
            indexer = get_docint_indexer(config=self.indexer_config)

            if parse_result is not None:
                # Reuse an existing parse so the document is not parsed twice.
                result = indexer.index_parse_result(parse_result)
            elif path is not None:
                # Parse the file from disk, then index the resulting chunks.
                result = indexer.index_document(path, max_pages=max_pages)
            else:
                return ToolResult(
                    success=False,
                    error="Either parse_result or path must be provided"
                )

            return ToolResult(
                success=result.success,
                data={
                    "document_id": result.document_id,
                    "source_path": result.source_path,
                    "chunks_indexed": result.num_chunks_indexed,
                    "chunks_skipped": result.num_chunks_skipped,
                },
                error=result.error,
            )

        except Exception as e:
            logger.error(f"Index document failed: {e}")
            return ToolResult(success=False, error=str(e))
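

# Usage sketch for IndexDocumentTool (the file path below is hypothetical;
# assumes chromadb and the RAG backend are installed and configured):
#
#     tool = IndexDocumentTool()
#     result = tool.execute(path="reports/q3_report.pdf", max_pages=50)
#     if result.success:
#         print(f"Indexed {result.data['chunks_indexed']} chunks "
#               f"as {result.data['document_id']}")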


class RetrieveChunksTool(DocumentTool):
    """
    Retrieve relevant chunks using semantic search.

    Input:
        query: Search query
        top_k: Number of results (default: 5)
        document_id: Filter by document ID
        chunk_types: Filter by chunk type(s) (e.g., ["paragraph", "table"])
        page_range: Filter by page range (start, end)
        include_evidence: Also return evidence references (default: True)

    Output:
        List of relevant chunks with similarity scores
    """

    name = "retrieve_chunks"
    description = "Retrieve relevant document chunks using semantic search"

    def __init__(self, similarity_threshold: float = 0.5):
        self.similarity_threshold = similarity_threshold

    def execute(
        self,
        query: str,
        top_k: int = 5,
        document_id: Optional[str] = None,
        chunk_types: Optional[List[str]] = None,
        page_range: Optional[tuple] = None,
        include_evidence: bool = True,
        **kwargs,
    ) -> ToolResult:
        if not RAG_AVAILABLE:
            return ToolResult(
                success=False,
                error="RAG module not available. Install chromadb: pip install chromadb"
            )

        try:
            retriever = get_docint_retriever(
                similarity_threshold=self.similarity_threshold
            )

            if include_evidence:
                chunks, evidence_refs = retriever.retrieve_with_evidence(
                    query=query,
                    top_k=top_k,
                    document_id=document_id,
                    chunk_types=chunk_types,
                    page_range=page_range,
                )
                evidence = [
                    {
                        "chunk_id": ev.chunk_id,
                        "page": ev.page,
                        "bbox": ev.bbox.xyxy if ev.bbox else None,
                        "snippet": ev.snippet,
                        "confidence": ev.confidence,
                    }
                    for ev in evidence_refs
                ]
            else:
                chunks = retriever.retrieve(
                    query=query,
                    top_k=top_k,
                    document_id=document_id,
                    chunk_types=chunk_types,
                    page_range=page_range,
                )
                evidence = []

            return ToolResult(
                success=True,
                data={
                    "query": query,
                    "num_results": len(chunks),
                    "chunks": [
                        {
                            "chunk_id": c["chunk_id"],
                            "document_id": c["document_id"],
                            # Truncate text so the tool payload stays compact.
                            "text": c["text"][:500],
                            "similarity": c["similarity"],
                            "page": c.get("page"),
                            "chunk_type": c.get("chunk_type"),
                        }
                        for c in chunks
                    ],
                },
                evidence=evidence,
            )

        except Exception as e:
            logger.error(f"Retrieve chunks failed: {e}")
            return ToolResult(success=False, error=str(e))
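

# Usage sketch for RetrieveChunksTool (the query and filters are hypothetical):
#
#     tool = RetrieveChunksTool(similarity_threshold=0.6)
#     result = tool.execute(
#         query="quarterly revenue by region",
#         top_k=3,
#         chunk_types=["table"],
#     )
#     for chunk in result.data["chunks"]:
#         print(chunk["page"], round(chunk["similarity"], 2), chunk["text"][:80])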


class RAGAnswerTool(DocumentTool):
    """
    Answer a question using RAG (Retrieval-Augmented Generation).

    Input:
        question: Question to answer
        document_id: Filter to a specific document
        top_k: Number of chunks to retrieve (default: 5)
        chunk_types: Filter by chunk type(s)
        page_range: Filter by page range

    Output:
        Answer with citations and evidence
    """

    name = "rag_answer"
    description = "Answer a question using RAG with grounded citations"

    def __init__(
        self,
        llm_client: Optional[Any] = None,
        min_confidence: float = 0.5,
        abstain_threshold: float = 0.3,
    ):
        self.llm_client = llm_client
        self.min_confidence = min_confidence
        self.abstain_threshold = abstain_threshold

    def execute(
        self,
        question: str,
        document_id: Optional[str] = None,
        top_k: int = 5,
        chunk_types: Optional[List[str]] = None,
        page_range: Optional[tuple] = None,
        **kwargs,
    ) -> ToolResult:
        if not RAG_AVAILABLE:
            return ToolResult(
                success=False,
                error="RAG module not available. Install chromadb: pip install chromadb"
            )

        try:
            # Retrieve supporting chunks along with evidence references.
            retriever = get_docint_retriever()
            chunks, evidence_refs = retriever.retrieve_with_evidence(
                query=question,
                top_k=top_k,
                document_id=document_id,
                chunk_types=chunk_types,
                page_range=page_range,
            )

            if not chunks:
                # Abstain rather than answer without supporting evidence.
                return ToolResult(
                    success=True,
                    data={
                        "question": question,
                        "answer": "I could not find relevant information to answer this question.",
                        "confidence": 0.0,
                        "abstained": True,
                        "reason": "No relevant chunks found",
                    },
                )

            context = retriever.build_context(chunks)

            if self.llm_client is None:
                # Extractive fallback: return the best-matching chunk verbatim.
                best_chunk = chunks[0]
                return ToolResult(
                    success=True,
                    data={
                        "question": question,
                        "answer": f"Based on the document: {best_chunk['text'][:500]}",
                        "confidence": best_chunk["similarity"],
                        "abstained": False,
                        "context_chunks": len(chunks),
                    },
                    evidence=[
                        {
                            "chunk_id": ev.chunk_id,
                            "page": ev.page,
                            "bbox": ev.bbox.xyxy if ev.bbox else None,
                            "snippet": ev.snippet,
                        }
                        for ev in evidence_refs
                    ],
                )

            # Grounded generation: the generator cites retrieved chunks and
            # abstains when confidence falls below the configured threshold.
            generator_config = GeneratorConfig(
                min_confidence=self.min_confidence,
                abstain_on_low_confidence=True,
                abstain_threshold=self.abstain_threshold,
            )
            generator = get_grounded_generator(
                config=generator_config,
                llm_client=self.llm_client,
            )

            answer = generator.generate_answer(
                question=question,
                context=context,
                chunks=chunks,
            )

            return ToolResult(
                success=True,
                data={
                    "question": question,
                    "answer": answer.text,
                    "confidence": answer.confidence,
                    "abstained": answer.abstained,
                    "citations": [
                        {
                            "index": c.index,
                            "chunk_id": c.chunk_id,
                            "text": c.text,
                        }
                        for c in (answer.citations or [])
                    ],
                },
                evidence=[
                    {
                        "chunk_id": ev.chunk_id,
                        "page": ev.page,
                        "bbox": ev.bbox.xyxy if ev.bbox else None,
                        "snippet": ev.snippet,
                    }
                    for ev in evidence_refs
                ],
            )

        except Exception as e:
            logger.error(f"RAG answer failed: {e}")
            return ToolResult(success=False, error=str(e))
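

# Usage sketch for RAGAnswerTool. Without an llm_client the tool falls back
# to returning the best-matching chunk verbatim; with a client, answers are
# generated with citations (the question below is hypothetical):
#
#     tool = RAGAnswerTool(min_confidence=0.5)
#     result = tool.execute(question="What was the net income in 2023?")
#     print(result.data["answer"])
#     print("confidence:", result.data["confidence"],
#           "abstained:", result.data["abstained"])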


class DeleteDocumentTool(DocumentTool):
    """
    Delete a document from the vector store index.

    Input:
        document_id: ID of document to delete

    Output:
        Number of chunks deleted
    """

    name = "delete_document"
    description = "Remove a document from the vector store index"

    def execute(self, document_id: str, **kwargs) -> ToolResult:
        if not RAG_AVAILABLE:
            return ToolResult(
                success=False,
                error="RAG module not available"
            )

        try:
            indexer = get_docint_indexer()
            deleted_count = indexer.delete_document(document_id)

            return ToolResult(
                success=True,
                data={
                    "document_id": document_id,
                    "chunks_deleted": deleted_count,
                },
            )

        except Exception as e:
            logger.error(f"Delete document failed: {e}")
            return ToolResult(success=False, error=str(e))
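

# Usage sketch for DeleteDocumentTool (the document ID is hypothetical):
#
#     result = DeleteDocumentTool().execute(document_id="doc-2024-001")
#     print(f"Deleted {result.data['chunks_deleted']} chunks")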


class GetIndexStatsTool(DocumentTool):
    """
    Get statistics about the vector store index.

    Output:
        Index statistics (total chunks, embedding model, etc.)
    """

    name = "get_index_stats"
    description = "Get statistics about the vector store index"

    def execute(self, **kwargs) -> ToolResult:
        if not RAG_AVAILABLE:
            return ToolResult(
                success=False,
                error="RAG module not available"
            )

        try:
            indexer = get_docint_indexer()
            stats = indexer.get_stats()

            return ToolResult(
                success=True,
                data=stats,
            )

        except Exception as e:
            logger.error(f"Get index stats failed: {e}")
            return ToolResult(success=False, error=str(e))


RAG_TOOLS = {
    "index_document": IndexDocumentTool,
    "retrieve_chunks": RetrieveChunksTool,
    "rag_answer": RAGAnswerTool,
    "delete_document": DeleteDocumentTool,
    "get_index_stats": GetIndexStatsTool,
}


def get_rag_tool(name: str, **kwargs) -> DocumentTool:
    """Get a RAG tool instance by name."""
    if name not in RAG_TOOLS:
        raise ValueError(f"Unknown RAG tool: {name}")
    return RAG_TOOLS[name](**kwargs)


def list_rag_tools() -> List[Dict[str, str]]:
    """List all available RAG tools."""
    return [
        {"name": name, "description": cls.description}
        for name, cls in RAG_TOOLS.items()
    ]
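

# Usage sketch for the registry helpers (tool kwargs are hypothetical):
#
#     for info in list_rag_tools():
#         print(f"{info['name']}: {info['description']}")
#
#     answer_tool = get_rag_tool("rag_answer", min_confidence=0.6)
#     stats = get_rag_tool("get_index_stats").execute()
#     print(stats.data)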