# SPARKNET/src/rag/docint_bridge.py
"""
Document Intelligence Bridge for RAG
Bridges the document_intelligence subsystem with the RAG indexer/retriever.
Converts ParseResult to a format compatible with DocumentIndexer.
"""
from typing import List, Optional, Dict, Any
from loguru import logger
from .store import VectorStore, get_vector_store
from .embeddings import EmbeddingAdapter, get_embedding_adapter
from .indexer import IndexingResult, IndexerConfig
# Try to import document_intelligence types
try:
from ..document_intelligence.chunks import (
ParseResult,
DocumentChunk,
BoundingBox,
EvidenceRef,
ChunkType,
)
DOCINT_AVAILABLE = True
except ImportError:
DOCINT_AVAILABLE = False
logger.warning("document_intelligence module not available")
class DocIntIndexer:
"""
Indexes ParseResult from document_intelligence into the vector store.
This bridges the new document_intelligence subsystem with the existing
RAG infrastructure.
"""
def __init__(
self,
config: Optional[IndexerConfig] = None,
vector_store: Optional[VectorStore] = None,
embedding_adapter: Optional[EmbeddingAdapter] = None,
):
self.config = config or IndexerConfig()
self._store = vector_store
self._embedder = embedding_adapter
@property
def store(self) -> VectorStore:
if self._store is None:
self._store = get_vector_store()
return self._store
@property
def embedder(self) -> EmbeddingAdapter:
if self._embedder is None:
self._embedder = get_embedding_adapter()
return self._embedder
def index_parse_result(
self,
parse_result: "ParseResult",
source_path: Optional[str] = None,
) -> IndexingResult:
"""
Index a ParseResult from document_intelligence.
Args:
parse_result: ParseResult from DocumentParser
source_path: Optional override for source path
Returns:
IndexingResult with indexing stats
"""
if not DOCINT_AVAILABLE:
return IndexingResult(
document_id="unknown",
source_path="unknown",
num_chunks_indexed=0,
num_chunks_skipped=0,
success=False,
error="document_intelligence module not available",
)
document_id = parse_result.doc_id
source = source_path or parse_result.filename
try:
chunks_to_index = []
skipped = 0
for chunk in parse_result.chunks:
# Skip empty or short chunks
if self.config.skip_empty_chunks:
if not chunk.text or len(chunk.text.strip()) < self.config.min_chunk_length:
skipped += 1
continue
chunk_data = {
"chunk_id": chunk.chunk_id,
"document_id": document_id,
"source_path": source,
"text": chunk.text,
"sequence_index": chunk.sequence_index,
"confidence": chunk.confidence,
}
if self.config.include_page:
chunk_data["page"] = chunk.page
if self.config.include_chunk_type:
chunk_data["chunk_type"] = chunk.chunk_type.value
if self.config.include_bbox and chunk.bbox:
chunk_data["bbox"] = {
"x_min": chunk.bbox.x_min,
"y_min": chunk.bbox.y_min,
"x_max": chunk.bbox.x_max,
"y_max": chunk.bbox.y_max,
}
chunks_to_index.append(chunk_data)
if not chunks_to_index:
return IndexingResult(
document_id=document_id,
source_path=source,
num_chunks_indexed=0,
num_chunks_skipped=skipped,
success=True,
)
# Generate embeddings in batches
logger.info(f"Generating embeddings for {len(chunks_to_index)} chunks")
texts = [c["text"] for c in chunks_to_index]
embeddings = []
batch_size = self.config.batch_size
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
batch_embeddings = self.embedder.embed_batch(batch)
embeddings.extend(batch_embeddings)
# Store in vector database
logger.info(f"Storing {len(chunks_to_index)} chunks in vector store")
self.store.add_chunks(chunks_to_index, embeddings)
logger.info(
f"Indexed document {document_id}: "
f"{len(chunks_to_index)} chunks, {skipped} skipped"
)
return IndexingResult(
document_id=document_id,
source_path=source,
num_chunks_indexed=len(chunks_to_index),
num_chunks_skipped=skipped,
success=True,
)
except Exception as e:
logger.error(f"Failed to index parse result: {e}")
return IndexingResult(
document_id=document_id,
source_path=source,
num_chunks_indexed=0,
num_chunks_skipped=0,
success=False,
error=str(e),
)
def index_document(
self,
path: str,
max_pages: Optional[int] = None,
) -> IndexingResult:
"""
Parse and index a document in one step.
Args:
path: Path to document file
max_pages: Optional limit on pages to process
Returns:
IndexingResult
"""
if not DOCINT_AVAILABLE:
return IndexingResult(
document_id=str(path),
source_path=str(path),
num_chunks_indexed=0,
num_chunks_skipped=0,
success=False,
error="document_intelligence module not available",
)
try:
from ..document_intelligence import DocumentParser, ParserConfig
config = ParserConfig(max_pages=max_pages)
parser = DocumentParser(config=config)
logger.info(f"Parsing document: {path}")
parse_result = parser.parse(path)
return self.index_parse_result(parse_result, source_path=str(path))
except Exception as e:
logger.error(f"Failed to parse and index document: {e}")
return IndexingResult(
document_id=str(path),
source_path=str(path),
num_chunks_indexed=0,
num_chunks_skipped=0,
success=False,
error=str(e),
)
def delete_document(self, document_id: str) -> int:
"""Remove a document from the index."""
return self.store.delete_document(document_id)
def get_stats(self) -> Dict[str, Any]:
"""Get indexing statistics."""
total_chunks = self.store.count()
return {
"total_chunks": total_chunks,
"embedding_model": self.embedder.model_name,
"embedding_dimension": self.embedder.embedding_dimension,
}
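
# Illustrative sketch (not part of the public API): how DocIntIndexer is
# typically driven. The file path below is a hypothetical placeholder; only
# the index_document() and get_stats() methods defined above are used.
def _example_index_document() -> None:
    """Minimal usage sketch for DocIntIndexer; assumes a sample PDF exists."""
    indexer = DocIntIndexer()  # default IndexerConfig, lazy store/embedder
    result = indexer.index_document("sample_report.pdf", max_pages=10)
    if result.success:
        logger.info(
            f"Indexed {result.num_chunks_indexed} chunks "
            f"({result.num_chunks_skipped} skipped) from {result.source_path}"
        )
        logger.info(f"Index stats: {indexer.get_stats()}")
    else:
        logger.error(f"Indexing failed: {result.error}")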
class DocIntRetriever:
"""
Retriever with document_intelligence EvidenceRef support.
Wraps DocumentRetriever with conversions to document_intelligence types.
"""
def __init__(
self,
vector_store: Optional[VectorStore] = None,
embedding_adapter: Optional[EmbeddingAdapter] = None,
similarity_threshold: float = 0.5,
):
self._store = vector_store
self._embedder = embedding_adapter
self.similarity_threshold = similarity_threshold
@property
def store(self) -> VectorStore:
if self._store is None:
self._store = get_vector_store()
return self._store
@property
def embedder(self) -> EmbeddingAdapter:
if self._embedder is None:
self._embedder = get_embedding_adapter()
return self._embedder
def retrieve(
self,
query: str,
top_k: int = 5,
document_id: Optional[str] = None,
chunk_types: Optional[List[str]] = None,
page_range: Optional[tuple] = None,
) -> List[Dict[str, Any]]:
"""
Retrieve relevant chunks.
Args:
query: Search query
top_k: Number of results
document_id: Filter by document
chunk_types: Filter by chunk type(s)
page_range: Filter by page range (start, end)
Returns:
List of chunk dicts with metadata
"""
# Build filters
filters = {}
if document_id:
filters["document_id"] = document_id
if chunk_types:
filters["chunk_type"] = chunk_types
if page_range:
filters["page"] = {"min": page_range[0], "max": page_range[1]}
# Embed query
query_embedding = self.embedder.embed_text(query)
# Search
results = self.store.search(
query_embedding=query_embedding,
top_k=top_k,
filters=filters if filters else None,
)
# Convert to dicts
chunks = []
for result in results:
if result.similarity < self.similarity_threshold:
continue
chunk = {
"chunk_id": result.chunk_id,
"document_id": result.document_id,
"text": result.text,
"similarity": result.similarity,
"page": result.page,
"chunk_type": result.chunk_type,
"bbox": result.bbox,
"source_path": result.metadata.get("source_path"),
"confidence": result.metadata.get("confidence"),
}
chunks.append(chunk)
return chunks
def retrieve_with_evidence(
self,
query: str,
top_k: int = 5,
document_id: Optional[str] = None,
chunk_types: Optional[List[str]] = None,
page_range: Optional[tuple] = None,
) -> tuple:
"""
Retrieve chunks with EvidenceRef objects.
Returns:
Tuple of (chunks, evidence_refs)
"""
chunks = self.retrieve(
query, top_k, document_id, chunk_types, page_range
)
evidence_refs = []
if DOCINT_AVAILABLE:
for chunk in chunks:
bbox = None
if chunk.get("bbox"):
bbox_data = chunk["bbox"]
bbox = BoundingBox(
x_min=bbox_data.get("x_min", 0),
y_min=bbox_data.get("y_min", 0),
x_max=bbox_data.get("x_max", 1),
y_max=bbox_data.get("y_max", 1),
normalized=True,
)
else:
bbox = BoundingBox(x_min=0, y_min=0, x_max=1, y_max=1)
                evidence = EvidenceRef(
                    chunk_id=chunk["chunk_id"],
                    doc_id=chunk["document_id"],
                    # Stored metadata may contain explicit None values, so
                    # dict.get defaults alone are not enough; fall back explicitly.
                    page=chunk.get("page") or 1,
                    bbox=bbox,
                    source_type=chunk.get("chunk_type") or "text",
                    snippet=chunk["text"][:200],
                    confidence=(
                        chunk["confidence"]
                        if chunk.get("confidence") is not None
                        else chunk["similarity"]
                    ),
                )
evidence_refs.append(evidence)
return chunks, evidence_refs
def build_context(
self,
chunks: List[Dict[str, Any]],
max_length: int = 8000,
) -> str:
"""Build context string from retrieved chunks."""
if not chunks:
return ""
parts = []
for i, chunk in enumerate(chunks, 1):
header = f"[{i}]"
if chunk.get("page"):
header += f" Page {chunk['page']}"
if chunk.get("chunk_type"):
header += f" ({chunk['chunk_type']})"
header += f" [sim={chunk['similarity']:.2f}]"
parts.append(header)
parts.append(chunk["text"])
parts.append("")
context = "\n".join(parts)
if len(context) > max_length:
context = context[:max_length] + "\n...[truncated]"
return context
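
# Illustrative sketch (not part of the public API): end-to-end retrieval with
# DocIntRetriever. The query string is a made-up example; only retrieve_with_evidence()
# and build_context() defined above are used.
def _example_retrieve_with_context(query: str = "What were the Q3 findings?") -> str:
    """Minimal usage sketch: retrieve chunks, collect evidence, build an LLM context."""
    retriever = DocIntRetriever(similarity_threshold=0.4)
    chunks, evidence_refs = retriever.retrieve_with_evidence(query, top_k=5)
    logger.info(f"Retrieved {len(chunks)} chunks, {len(evidence_refs)} evidence refs")
    # Context string suitable for prompting a downstream LLM.
    return retriever.build_context(chunks, max_length=4000)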
# Singleton instances
_docint_indexer: Optional[DocIntIndexer] = None
_docint_retriever: Optional[DocIntRetriever] = None
def get_docint_indexer(
config: Optional[IndexerConfig] = None,
vector_store: Optional[VectorStore] = None,
embedding_adapter: Optional[EmbeddingAdapter] = None,
) -> DocIntIndexer:
"""Get or create singleton DocIntIndexer."""
global _docint_indexer
if _docint_indexer is None:
_docint_indexer = DocIntIndexer(
config=config,
vector_store=vector_store,
embedding_adapter=embedding_adapter,
)
return _docint_indexer
def get_docint_retriever(
vector_store: Optional[VectorStore] = None,
embedding_adapter: Optional[EmbeddingAdapter] = None,
similarity_threshold: float = 0.5,
) -> DocIntRetriever:
"""Get or create singleton DocIntRetriever."""
global _docint_retriever
if _docint_retriever is None:
_docint_retriever = DocIntRetriever(
vector_store=vector_store,
embedding_adapter=embedding_adapter,
similarity_threshold=similarity_threshold,
)
return _docint_retriever
def reset_docint_components():
"""Reset singleton instances."""
global _docint_indexer, _docint_retriever
_docint_indexer = None
_docint_retriever = None
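

# Hedged demo entry point (a sketch, not a test): wires the pieces above
# together. The document path is a placeholder supplied on the command line;
# run as a module (e.g. `python -m src.rag.docint_bridge <file>`) so the
# relative imports resolve, and expect IndexingResult.error to explain any
# failure (missing parser backend, unreadable file, ...).
if __name__ == "__main__":
    import sys

    doc_path = sys.argv[1] if len(sys.argv) > 1 else "sample_report.pdf"
    indexing = get_docint_indexer().index_document(doc_path)
    print(
        f"Indexed {indexing.num_chunks_indexed} chunks "
        f"({indexing.num_chunks_skipped} skipped) from {indexing.source_path}"
    )
    if indexing.success:
        retriever = get_docint_retriever()
        chunks = retriever.retrieve("high-level summary of the document", top_k=3)
        print(retriever.build_context(chunks, max_length=2000))
    else:
        print(f"Indexing failed: {indexing.error}")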