""" |
|
|
Document Intelligence Bridge for RAG |
|
|
|
|
|
Bridges the document_intelligence subsystem with the RAG indexer/retriever. |
|
|
Converts ParseResult to a format compatible with DocumentIndexer. |
|
|
""" |

from typing import List, Optional, Dict, Any
from pathlib import Path

from pydantic import BaseModel
from loguru import logger

from .store import VectorStore, get_vector_store
from .embeddings import EmbeddingAdapter, get_embedding_adapter
from .indexer import IndexingResult, IndexerConfig

# document_intelligence is an optional dependency; degrade gracefully when it
# is not installed so the rest of the RAG stack keeps working.
try:
    from ..document_intelligence.chunks import (
        ParseResult,
        DocumentChunk,
        BoundingBox,
        EvidenceRef,
        ChunkType,
    )
    DOCINT_AVAILABLE = True
except ImportError:
    DOCINT_AVAILABLE = False
    logger.warning("document_intelligence module not available")


class DocIntIndexer:
    """
    Indexes ParseResult from document_intelligence into the vector store.

    This bridges the new document_intelligence subsystem with the existing
    RAG infrastructure.
    """

    def __init__(
        self,
        config: Optional[IndexerConfig] = None,
        vector_store: Optional[VectorStore] = None,
        embedding_adapter: Optional[EmbeddingAdapter] = None,
    ):
        self.config = config or IndexerConfig()
        self._store = vector_store
        self._embedder = embedding_adapter

    @property
    def store(self) -> VectorStore:
        if self._store is None:
            self._store = get_vector_store()
        return self._store

    @property
    def embedder(self) -> EmbeddingAdapter:
        if self._embedder is None:
            self._embedder = get_embedding_adapter()
        return self._embedder

    def index_parse_result(
        self,
        parse_result: "ParseResult",
        source_path: Optional[str] = None,
    ) -> IndexingResult:
        """
        Index a ParseResult from document_intelligence.

        Args:
            parse_result: ParseResult from DocumentParser
            source_path: Optional override for source path

        Returns:
            IndexingResult with indexing stats
        """
        if not DOCINT_AVAILABLE:
            return IndexingResult(
                document_id="unknown",
                source_path="unknown",
                num_chunks_indexed=0,
                num_chunks_skipped=0,
                success=False,
                error="document_intelligence module not available",
            )

        document_id = parse_result.doc_id
        source = source_path or parse_result.filename

        try:
            chunks_to_index = []
            skipped = 0

            for chunk in parse_result.chunks:
                # Skip chunks that are empty or shorter than the configured minimum.
                if self.config.skip_empty_chunks:
                    if not chunk.text or len(chunk.text.strip()) < self.config.min_chunk_length:
                        skipped += 1
                        continue

                chunk_data = {
                    "chunk_id": chunk.chunk_id,
                    "document_id": document_id,
                    "source_path": source,
                    "text": chunk.text,
                    "sequence_index": chunk.sequence_index,
                    "confidence": chunk.confidence,
                }

                if self.config.include_page:
                    chunk_data["page"] = chunk.page

                if self.config.include_chunk_type:
                    chunk_data["chunk_type"] = chunk.chunk_type.value

                if self.config.include_bbox and chunk.bbox:
                    chunk_data["bbox"] = {
                        "x_min": chunk.bbox.x_min,
                        "y_min": chunk.bbox.y_min,
                        "x_max": chunk.bbox.x_max,
                        "y_max": chunk.bbox.y_max,
                    }

                chunks_to_index.append(chunk_data)

            if not chunks_to_index:
                return IndexingResult(
                    document_id=document_id,
                    source_path=source,
                    num_chunks_indexed=0,
                    num_chunks_skipped=skipped,
                    success=True,
                )

            # Generate embeddings in batches to bound memory and request size.
            logger.info(f"Generating embeddings for {len(chunks_to_index)} chunks")
            texts = [c["text"] for c in chunks_to_index]

            embeddings = []
            batch_size = self.config.batch_size
            for i in range(0, len(texts), batch_size):
                batch = texts[i:i + batch_size]
                batch_embeddings = self.embedder.embed_batch(batch)
                embeddings.extend(batch_embeddings)

            # Persist the chunks together with their embeddings.
            logger.info(f"Storing {len(chunks_to_index)} chunks in vector store")
            self.store.add_chunks(chunks_to_index, embeddings)

            logger.info(
                f"Indexed document {document_id}: "
                f"{len(chunks_to_index)} chunks, {skipped} skipped"
            )

            return IndexingResult(
                document_id=document_id,
                source_path=source,
                num_chunks_indexed=len(chunks_to_index),
                num_chunks_skipped=skipped,
                success=True,
            )

        except Exception as e:
            logger.error(f"Failed to index parse result: {e}")
            return IndexingResult(
                document_id=document_id,
                source_path=source,
                num_chunks_indexed=0,
                num_chunks_skipped=0,
                success=False,
                error=str(e),
            )

    def index_document(
        self,
        path: str,
        max_pages: Optional[int] = None,
    ) -> IndexingResult:
        """
        Parse and index a document in one step.

        Args:
            path: Path to document file
            max_pages: Optional limit on pages to process

        Returns:
            IndexingResult
        """
        if not DOCINT_AVAILABLE:
            return IndexingResult(
                document_id=str(path),
                source_path=str(path),
                num_chunks_indexed=0,
                num_chunks_skipped=0,
                success=False,
                error="document_intelligence module not available",
            )

        try:
            # Imported lazily so the module loads even without document_intelligence.
            from ..document_intelligence import DocumentParser, ParserConfig

            config = ParserConfig(max_pages=max_pages)
            parser = DocumentParser(config=config)

            logger.info(f"Parsing document: {path}")
            parse_result = parser.parse(path)

            return self.index_parse_result(parse_result, source_path=str(path))

        except Exception as e:
            logger.error(f"Failed to parse and index document: {e}")
            return IndexingResult(
                document_id=str(path),
                source_path=str(path),
                num_chunks_indexed=0,
                num_chunks_skipped=0,
                success=False,
                error=str(e),
            )

    def delete_document(self, document_id: str) -> int:
        """Remove a document from the index."""
        return self.store.delete_document(document_id)

    def get_stats(self) -> Dict[str, Any]:
        """Get indexing statistics."""
        total_chunks = self.store.count()

        return {
            "total_chunks": total_chunks,
            "embedding_model": self.embedder.model_name,
            "embedding_dimension": self.embedder.embedding_dimension,
        }
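

# Sketch: indexing an already-parsed document with a custom configuration. The
# IndexerConfig fields named below (skip_empty_chunks, min_chunk_length,
# batch_size, include_bbox) are the ones index_parse_result reads; passing them
# as constructor keyword arguments is an assumption about IndexerConfig, and
# "contract.pdf" is a placeholder path.
#
#   from ..document_intelligence import DocumentParser, ParserConfig
#
#   config = IndexerConfig(
#       skip_empty_chunks=True, min_chunk_length=20, batch_size=32, include_bbox=True
#   )
#   indexer = DocIntIndexer(config=config)
#   parse_result = DocumentParser(config=ParserConfig(max_pages=5)).parse("contract.pdf")
#   result = indexer.index_parse_result(parse_result)
#   logger.info(f"indexed={result.num_chunks_indexed} skipped={result.num_chunks_skipped}")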


class DocIntRetriever:
    """
    Retriever with document_intelligence EvidenceRef support.

    Queries the vector store and converts results to document_intelligence
    types (EvidenceRef, BoundingBox).
    """

    def __init__(
        self,
        vector_store: Optional[VectorStore] = None,
        embedding_adapter: Optional[EmbeddingAdapter] = None,
        similarity_threshold: float = 0.5,
    ):
        self._store = vector_store
        self._embedder = embedding_adapter
        self.similarity_threshold = similarity_threshold

    @property
    def store(self) -> VectorStore:
        if self._store is None:
            self._store = get_vector_store()
        return self._store

    @property
    def embedder(self) -> EmbeddingAdapter:
        if self._embedder is None:
            self._embedder = get_embedding_adapter()
        return self._embedder

    def retrieve(
        self,
        query: str,
        top_k: int = 5,
        document_id: Optional[str] = None,
        chunk_types: Optional[List[str]] = None,
        page_range: Optional[tuple] = None,
    ) -> List[Dict[str, Any]]:
        """
        Retrieve relevant chunks.

        Args:
            query: Search query
            top_k: Number of results
            document_id: Filter by document
            chunk_types: Filter by chunk type(s)
            page_range: Filter by page range (start, end)

        Returns:
            List of chunk dicts with metadata
        """
        # Build metadata filters from the optional arguments.
        filters = {}

        if document_id:
            filters["document_id"] = document_id

        if chunk_types:
            filters["chunk_type"] = chunk_types

        if page_range:
            filters["page"] = {"min": page_range[0], "max": page_range[1]}

        # Embed the query and run the vector search.
        query_embedding = self.embedder.embed_text(query)

        results = self.store.search(
            query_embedding=query_embedding,
            top_k=top_k,
            filters=filters if filters else None,
        )

        # Keep only results at or above the similarity threshold.
        chunks = []
        for result in results:
            if result.similarity < self.similarity_threshold:
                continue

            chunk = {
                "chunk_id": result.chunk_id,
                "document_id": result.document_id,
                "text": result.text,
                "similarity": result.similarity,
                "page": result.page,
                "chunk_type": result.chunk_type,
                "bbox": result.bbox,
                "source_path": result.metadata.get("source_path"),
                "confidence": result.metadata.get("confidence"),
            }
            chunks.append(chunk)

        return chunks
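
    # Sketch: a filtered retrieval. The "table" chunk type is assumed (actual
    # values come from the ChunkType enum in document_intelligence), and
    # "doc-123" is a placeholder document id; page_range is a (start, end) pair.
    #
    #   retriever = DocIntRetriever(similarity_threshold=0.4)
    #   tables = retriever.retrieve(
    #       "quarterly revenue by region",
    #       top_k=5,
    #       document_id="doc-123",
    #       chunk_types=["table"],
    #       page_range=(1, 10),
    #   )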

    def retrieve_with_evidence(
        self,
        query: str,
        top_k: int = 5,
        document_id: Optional[str] = None,
        chunk_types: Optional[List[str]] = None,
        page_range: Optional[tuple] = None,
    ) -> tuple:
        """
        Retrieve chunks with EvidenceRef objects.

        Returns:
            Tuple of (chunks, evidence_refs)
        """
        chunks = self.retrieve(
            query, top_k, document_id, chunk_types, page_range
        )

        evidence_refs = []

        if DOCINT_AVAILABLE:
            for chunk in chunks:
                bbox = None
                if chunk.get("bbox"):
                    bbox_data = chunk["bbox"]
                    bbox = BoundingBox(
                        x_min=bbox_data.get("x_min", 0),
                        y_min=bbox_data.get("y_min", 0),
                        x_max=bbox_data.get("x_max", 1),
                        y_max=bbox_data.get("y_max", 1),
                        normalized=True,
                    )
                else:
                    # No stored bbox: fall back to a full-page bounding box.
                    bbox = BoundingBox(x_min=0, y_min=0, x_max=1, y_max=1)

                evidence = EvidenceRef(
                    chunk_id=chunk["chunk_id"],
                    doc_id=chunk["document_id"],
                    page=chunk.get("page", 1),
                    bbox=bbox,
                    source_type=chunk.get("chunk_type", "text"),
                    snippet=chunk["text"][:200],
                    confidence=chunk.get("confidence", chunk["similarity"]),
                )
                evidence_refs.append(evidence)

        return chunks, evidence_refs

    def build_context(
        self,
        chunks: List[Dict[str, Any]],
        max_length: int = 8000,
    ) -> str:
        """Build a context string from retrieved chunks."""
        if not chunks:
            return ""

        parts = []
        for i, chunk in enumerate(chunks, 1):
            # Number each chunk and annotate it with page, type, and similarity.
            header = f"[{i}]"
            if chunk.get("page"):
                header += f" Page {chunk['page']}"
            if chunk.get("chunk_type"):
                header += f" ({chunk['chunk_type']})"
            header += f" [sim={chunk['similarity']:.2f}]"

            parts.append(header)
            parts.append(chunk["text"])
            parts.append("")

        context = "\n".join(parts)

        if len(context) > max_length:
            context = context[:max_length] + "\n...[truncated]"

        return context
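

# Sketch: building an LLM prompt context with citation evidence, using only this
# module's API; the query string is illustrative.
#
#   retriever = get_docint_retriever()
#   chunks, evidence_refs = retriever.retrieve_with_evidence("termination clause", top_k=5)
#   context = retriever.build_context(chunks, max_length=4000)
#   # Each EvidenceRef carries doc_id, page, bbox, and a snippet, so an answer
#   # generated from `context` can be traced back to its source region.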


_docint_indexer: Optional[DocIntIndexer] = None
_docint_retriever: Optional[DocIntRetriever] = None


def get_docint_indexer(
    config: Optional[IndexerConfig] = None,
    vector_store: Optional[VectorStore] = None,
    embedding_adapter: Optional[EmbeddingAdapter] = None,
) -> DocIntIndexer:
    """Get or create singleton DocIntIndexer."""
    global _docint_indexer

    if _docint_indexer is None:
        _docint_indexer = DocIntIndexer(
            config=config,
            vector_store=vector_store,
            embedding_adapter=embedding_adapter,
        )

    return _docint_indexer


def get_docint_retriever(
    vector_store: Optional[VectorStore] = None,
    embedding_adapter: Optional[EmbeddingAdapter] = None,
    similarity_threshold: float = 0.5,
) -> DocIntRetriever:
    """Get or create singleton DocIntRetriever."""
    global _docint_retriever

    if _docint_retriever is None:
        _docint_retriever = DocIntRetriever(
            vector_store=vector_store,
            embedding_adapter=embedding_adapter,
            similarity_threshold=similarity_threshold,
        )

    return _docint_retriever


def reset_docint_components():
    """Reset singleton instances."""
    global _docint_indexer, _docint_retriever
    _docint_indexer = None
    _docint_retriever = None
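

# Sketch: tests can inject fakes through the getters and restore state afterwards.
# FakeVectorStore is a hypothetical stand-in, not part of this codebase.
#
#   reset_docint_components()
#   indexer = get_docint_indexer(vector_store=FakeVectorStore())
#   ...
#   reset_docint_components()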