""" Document Intelligence Bridge for RAG Bridges the document_intelligence subsystem with the RAG indexer/retriever. Converts ParseResult to a format compatible with DocumentIndexer. """ from typing import List, Optional, Dict, Any from pathlib import Path from pydantic import BaseModel from loguru import logger from .store import VectorStore, get_vector_store from .embeddings import EmbeddingAdapter, get_embedding_adapter from .indexer import IndexingResult, IndexerConfig # Try to import document_intelligence types try: from ..document_intelligence.chunks import ( ParseResult, DocumentChunk, BoundingBox, EvidenceRef, ChunkType, ) DOCINT_AVAILABLE = True except ImportError: DOCINT_AVAILABLE = False logger.warning("document_intelligence module not available") class DocIntIndexer: """ Indexes ParseResult from document_intelligence into the vector store. This bridges the new document_intelligence subsystem with the existing RAG infrastructure. """ def __init__( self, config: Optional[IndexerConfig] = None, vector_store: Optional[VectorStore] = None, embedding_adapter: Optional[EmbeddingAdapter] = None, ): self.config = config or IndexerConfig() self._store = vector_store self._embedder = embedding_adapter @property def store(self) -> VectorStore: if self._store is None: self._store = get_vector_store() return self._store @property def embedder(self) -> EmbeddingAdapter: if self._embedder is None: self._embedder = get_embedding_adapter() return self._embedder def index_parse_result( self, parse_result: "ParseResult", source_path: Optional[str] = None, ) -> IndexingResult: """ Index a ParseResult from document_intelligence. Args: parse_result: ParseResult from DocumentParser source_path: Optional override for source path Returns: IndexingResult with indexing stats """ if not DOCINT_AVAILABLE: return IndexingResult( document_id="unknown", source_path="unknown", num_chunks_indexed=0, num_chunks_skipped=0, success=False, error="document_intelligence module not available", ) document_id = parse_result.doc_id source = source_path or parse_result.filename try: chunks_to_index = [] skipped = 0 for chunk in parse_result.chunks: # Skip empty or short chunks if self.config.skip_empty_chunks: if not chunk.text or len(chunk.text.strip()) < self.config.min_chunk_length: skipped += 1 continue chunk_data = { "chunk_id": chunk.chunk_id, "document_id": document_id, "source_path": source, "text": chunk.text, "sequence_index": chunk.sequence_index, "confidence": chunk.confidence, } if self.config.include_page: chunk_data["page"] = chunk.page if self.config.include_chunk_type: chunk_data["chunk_type"] = chunk.chunk_type.value if self.config.include_bbox and chunk.bbox: chunk_data["bbox"] = { "x_min": chunk.bbox.x_min, "y_min": chunk.bbox.y_min, "x_max": chunk.bbox.x_max, "y_max": chunk.bbox.y_max, } chunks_to_index.append(chunk_data) if not chunks_to_index: return IndexingResult( document_id=document_id, source_path=source, num_chunks_indexed=0, num_chunks_skipped=skipped, success=True, ) # Generate embeddings in batches logger.info(f"Generating embeddings for {len(chunks_to_index)} chunks") texts = [c["text"] for c in chunks_to_index] embeddings = [] batch_size = self.config.batch_size for i in range(0, len(texts), batch_size): batch = texts[i:i + batch_size] batch_embeddings = self.embedder.embed_batch(batch) embeddings.extend(batch_embeddings) # Store in vector database logger.info(f"Storing {len(chunks_to_index)} chunks in vector store") self.store.add_chunks(chunks_to_index, embeddings) logger.info( f"Indexed document {document_id}: " f"{len(chunks_to_index)} chunks, {skipped} skipped" ) return IndexingResult( document_id=document_id, source_path=source, num_chunks_indexed=len(chunks_to_index), num_chunks_skipped=skipped, success=True, ) except Exception as e: logger.error(f"Failed to index parse result: {e}") return IndexingResult( document_id=document_id, source_path=source, num_chunks_indexed=0, num_chunks_skipped=0, success=False, error=str(e), ) def index_document( self, path: str, max_pages: Optional[int] = None, ) -> IndexingResult: """ Parse and index a document in one step. Args: path: Path to document file max_pages: Optional limit on pages to process Returns: IndexingResult """ if not DOCINT_AVAILABLE: return IndexingResult( document_id=str(path), source_path=str(path), num_chunks_indexed=0, num_chunks_skipped=0, success=False, error="document_intelligence module not available", ) try: from ..document_intelligence import DocumentParser, ParserConfig config = ParserConfig(max_pages=max_pages) parser = DocumentParser(config=config) logger.info(f"Parsing document: {path}") parse_result = parser.parse(path) return self.index_parse_result(parse_result, source_path=str(path)) except Exception as e: logger.error(f"Failed to parse and index document: {e}") return IndexingResult( document_id=str(path), source_path=str(path), num_chunks_indexed=0, num_chunks_skipped=0, success=False, error=str(e), ) def delete_document(self, document_id: str) -> int: """Remove a document from the index.""" return self.store.delete_document(document_id) def get_stats(self) -> Dict[str, Any]: """Get indexing statistics.""" total_chunks = self.store.count() return { "total_chunks": total_chunks, "embedding_model": self.embedder.model_name, "embedding_dimension": self.embedder.embedding_dimension, } class DocIntRetriever: """ Retriever with document_intelligence EvidenceRef support. Wraps DocumentRetriever with conversions to document_intelligence types. """ def __init__( self, vector_store: Optional[VectorStore] = None, embedding_adapter: Optional[EmbeddingAdapter] = None, similarity_threshold: float = 0.5, ): self._store = vector_store self._embedder = embedding_adapter self.similarity_threshold = similarity_threshold @property def store(self) -> VectorStore: if self._store is None: self._store = get_vector_store() return self._store @property def embedder(self) -> EmbeddingAdapter: if self._embedder is None: self._embedder = get_embedding_adapter() return self._embedder def retrieve( self, query: str, top_k: int = 5, document_id: Optional[str] = None, chunk_types: Optional[List[str]] = None, page_range: Optional[tuple] = None, ) -> List[Dict[str, Any]]: """ Retrieve relevant chunks. Args: query: Search query top_k: Number of results document_id: Filter by document chunk_types: Filter by chunk type(s) page_range: Filter by page range (start, end) Returns: List of chunk dicts with metadata """ # Build filters filters = {} if document_id: filters["document_id"] = document_id if chunk_types: filters["chunk_type"] = chunk_types if page_range: filters["page"] = {"min": page_range[0], "max": page_range[1]} # Embed query query_embedding = self.embedder.embed_text(query) # Search results = self.store.search( query_embedding=query_embedding, top_k=top_k, filters=filters if filters else None, ) # Convert to dicts chunks = [] for result in results: if result.similarity < self.similarity_threshold: continue chunk = { "chunk_id": result.chunk_id, "document_id": result.document_id, "text": result.text, "similarity": result.similarity, "page": result.page, "chunk_type": result.chunk_type, "bbox": result.bbox, "source_path": result.metadata.get("source_path"), "confidence": result.metadata.get("confidence"), } chunks.append(chunk) return chunks def retrieve_with_evidence( self, query: str, top_k: int = 5, document_id: Optional[str] = None, chunk_types: Optional[List[str]] = None, page_range: Optional[tuple] = None, ) -> tuple: """ Retrieve chunks with EvidenceRef objects. Returns: Tuple of (chunks, evidence_refs) """ chunks = self.retrieve( query, top_k, document_id, chunk_types, page_range ) evidence_refs = [] if DOCINT_AVAILABLE: for chunk in chunks: bbox = None if chunk.get("bbox"): bbox_data = chunk["bbox"] bbox = BoundingBox( x_min=bbox_data.get("x_min", 0), y_min=bbox_data.get("y_min", 0), x_max=bbox_data.get("x_max", 1), y_max=bbox_data.get("y_max", 1), normalized=True, ) else: bbox = BoundingBox(x_min=0, y_min=0, x_max=1, y_max=1) evidence = EvidenceRef( chunk_id=chunk["chunk_id"], doc_id=chunk["document_id"], page=chunk.get("page", 1), bbox=bbox, source_type=chunk.get("chunk_type", "text"), snippet=chunk["text"][:200], confidence=chunk.get("confidence", chunk["similarity"]), ) evidence_refs.append(evidence) return chunks, evidence_refs def build_context( self, chunks: List[Dict[str, Any]], max_length: int = 8000, ) -> str: """Build context string from retrieved chunks.""" if not chunks: return "" parts = [] for i, chunk in enumerate(chunks, 1): header = f"[{i}]" if chunk.get("page"): header += f" Page {chunk['page']}" if chunk.get("chunk_type"): header += f" ({chunk['chunk_type']})" header += f" [sim={chunk['similarity']:.2f}]" parts.append(header) parts.append(chunk["text"]) parts.append("") context = "\n".join(parts) if len(context) > max_length: context = context[:max_length] + "\n...[truncated]" return context # Singleton instances _docint_indexer: Optional[DocIntIndexer] = None _docint_retriever: Optional[DocIntRetriever] = None def get_docint_indexer( config: Optional[IndexerConfig] = None, vector_store: Optional[VectorStore] = None, embedding_adapter: Optional[EmbeddingAdapter] = None, ) -> DocIntIndexer: """Get or create singleton DocIntIndexer.""" global _docint_indexer if _docint_indexer is None: _docint_indexer = DocIntIndexer( config=config, vector_store=vector_store, embedding_adapter=embedding_adapter, ) return _docint_indexer def get_docint_retriever( vector_store: Optional[VectorStore] = None, embedding_adapter: Optional[EmbeddingAdapter] = None, similarity_threshold: float = 0.5, ) -> DocIntRetriever: """Get or create singleton DocIntRetriever.""" global _docint_retriever if _docint_retriever is None: _docint_retriever = DocIntRetriever( vector_store=vector_store, embedding_adapter=embedding_adapter, similarity_threshold=similarity_threshold, ) return _docint_retriever def reset_docint_components(): """Reset singleton instances.""" global _docint_indexer, _docint_retriever _docint_indexer = None _docint_retriever = None