| | """ |
| | Document Indexer for RAG |
| | |
| | Handles indexing processed documents into the vector store. |
| | """ |
| |
|
| | from typing import List, Optional, Dict, Any, Union |
| | from pathlib import Path |
| | from pydantic import BaseModel, Field |
| | from loguru import logger |
| |
|
| | from .store import VectorStore, get_vector_store |
| | from .embeddings import EmbeddingAdapter, get_embedding_adapter |
| |
|
# Optional dependency: the document-processing pipeline. When it cannot be
# imported, DOCUMENT_MODULE_AVAILABLE gates off index_document() so only
# already-processed documents can be indexed.
try:
    from ..document.schemas.core import ProcessedDocument, DocumentChunk
    from ..document.pipeline import process_document, PipelineConfig
    DOCUMENT_MODULE_AVAILABLE = True
except ImportError:
    DOCUMENT_MODULE_AVAILABLE = False
    logger.warning("Document module not available for indexing")
| |
|
| |
|
class IndexerConfig(BaseModel):
    """Configuration for document indexer.

    Controls embedding batch size, which per-chunk metadata fields are
    written to the vector store, and filtering of low-content chunks.
    """

    # Number of chunk texts embedded per embedding-adapter call.
    batch_size: int = Field(default=32, ge=1, description="Embedding batch size")

    # Metadata toggles: which optional chunk fields are copied into the store.
    include_bbox: bool = Field(default=True, description="Include bounding boxes")
    include_page: bool = Field(default=True, description="Include page numbers")
    include_chunk_type: bool = Field(default=True, description="Include chunk types")

    # Filtering: when skip_empty_chunks is True, chunks whose stripped text
    # is shorter than min_chunk_length are counted as skipped, not indexed.
    skip_empty_chunks: bool = Field(default=True, description="Skip empty text chunks")
    min_chunk_length: int = Field(default=10, ge=1, description="Minimum chunk text length")
| |
|
| |
|
class IndexingResult(BaseModel):
    """Result of a single document indexing operation."""

    # Identifier of the indexed document.
    document_id: str
    # Original source path of the document.
    source_path: str
    # Chunks embedded and written to the vector store.
    num_chunks_indexed: int
    # Chunks filtered out (empty / below minimum length).
    num_chunks_skipped: int
    # False when processing or storage failed; see `error`.
    success: bool
    # Error message when success is False, otherwise None.
    error: Optional[str] = None
| |
|
| |
|
class DocumentIndexer:
    """
    Indexes documents into the vector store for RAG.

    Workflow:
        1. Process document (if not already processed)
        2. Extract chunks with metadata
        3. Generate embeddings
        4. Store in vector database
    """

    def __init__(
        self,
        config: Optional[IndexerConfig] = None,
        vector_store: Optional[VectorStore] = None,
        embedding_adapter: Optional[EmbeddingAdapter] = None,
    ):
        """
        Initialize indexer.

        Args:
            config: Indexer configuration (defaults to ``IndexerConfig()``).
            vector_store: Vector store instance; resolved lazily via
                ``get_vector_store()`` when omitted.
            embedding_adapter: Embedding adapter instance; resolved lazily
                via ``get_embedding_adapter()`` when omitted.
        """
        self.config = config or IndexerConfig()
        # Backing fields for the lazy `store` / `embedder` properties, so
        # constructing an indexer never touches external services.
        self._store = vector_store
        self._embedder = embedding_adapter

    @property
    def store(self) -> VectorStore:
        """Get vector store (lazy initialization)."""
        if self._store is None:
            self._store = get_vector_store()
        return self._store

    @property
    def embedder(self) -> EmbeddingAdapter:
        """Get embedding adapter (lazy initialization)."""
        if self._embedder is None:
            self._embedder = get_embedding_adapter()
        return self._embedder

    def index_document(
        self,
        source: Union[str, Path],
        document_id: Optional[str] = None,
        pipeline_config: Optional[Any] = None,
    ) -> IndexingResult:
        """
        Process and index a document from file.

        Args:
            source: Path to document
            document_id: Optional document ID (defaults to the source path)
            pipeline_config: Optional pipeline configuration

        Returns:
            IndexingResult. Failures are reported through
            ``success``/``error`` rather than raised.
        """
        if not DOCUMENT_MODULE_AVAILABLE:
            return IndexingResult(
                document_id=document_id or str(source),
                source_path=str(source),
                num_chunks_indexed=0,
                num_chunks_skipped=0,
                success=False,
                error="Document processing module not available",
            )

        try:
            logger.info(f"Processing document: {source}")
            processed = process_document(source, document_id, pipeline_config)
            return self.index_processed_document(processed)
        except Exception as e:
            # logger.exception records the traceback, not just the message.
            logger.exception(f"Failed to index document: {e}")
            return IndexingResult(
                document_id=document_id or str(source),
                source_path=str(source),
                num_chunks_indexed=0,
                num_chunks_skipped=0,
                success=False,
                error=str(e),
            )

    def _should_skip(self, chunk: "DocumentChunk") -> bool:
        """Return True when empty-chunk filtering rejects this chunk."""
        if not self.config.skip_empty_chunks:
            return False
        return not chunk.text or len(chunk.text.strip()) < self.config.min_chunk_length

    def _chunk_payload(
        self,
        chunk: "DocumentChunk",
        document_id: str,
        source_path: str,
    ) -> Dict[str, Any]:
        """Build the metadata record stored alongside a chunk's embedding."""
        data: Dict[str, Any] = {
            "chunk_id": chunk.chunk_id,
            "document_id": document_id,
            "source_path": source_path,
            "text": chunk.text,
            "sequence_index": chunk.sequence_index,
            "confidence": chunk.confidence,
        }
        if self.config.include_page:
            data["page"] = chunk.page
        if self.config.include_chunk_type:
            data["chunk_type"] = chunk.chunk_type.value
        # bbox is only written when present on the chunk; pages without
        # layout information may not have one.
        if self.config.include_bbox and chunk.bbox:
            data["bbox"] = {
                "x_min": chunk.bbox.x_min,
                "y_min": chunk.bbox.y_min,
                "x_max": chunk.bbox.x_max,
                "y_max": chunk.bbox.y_max,
            }
        return data

    def index_processed_document(
        self,
        document: "ProcessedDocument",
    ) -> IndexingResult:
        """
        Index an already-processed document.

        Args:
            document: ProcessedDocument instance

        Returns:
            IndexingResult. Failures are reported through
            ``success``/``error`` rather than raised.
        """
        document_id = document.metadata.document_id
        source_path = document.metadata.source_path

        try:
            chunks_to_index: List[Dict[str, Any]] = []
            skipped = 0

            for chunk in document.chunks:
                if self._should_skip(chunk):
                    skipped += 1
                    continue
                chunks_to_index.append(
                    self._chunk_payload(chunk, document_id, source_path)
                )

            if not chunks_to_index:
                # Nothing to embed; still a successful (empty) indexing run.
                return IndexingResult(
                    document_id=document_id,
                    source_path=source_path,
                    num_chunks_indexed=0,
                    num_chunks_skipped=skipped,
                    success=True,
                )

            logger.info(f"Generating embeddings for {len(chunks_to_index)} chunks")
            texts = [c["text"] for c in chunks_to_index]
            embeddings = self.embedder.embed_batch(texts)

            logger.info(f"Storing {len(chunks_to_index)} chunks in vector store")
            self.store.add_chunks(chunks_to_index, embeddings)

            logger.info(
                f"Indexed document {document_id}: "
                f"{len(chunks_to_index)} chunks, {skipped} skipped"
            )

            return IndexingResult(
                document_id=document_id,
                source_path=source_path,
                num_chunks_indexed=len(chunks_to_index),
                num_chunks_skipped=skipped,
                success=True,
            )

        except Exception as e:
            # logger.exception records the traceback, not just the message.
            logger.exception(f"Failed to index processed document: {e}")
            return IndexingResult(
                document_id=document_id,
                source_path=source_path,
                num_chunks_indexed=0,
                num_chunks_skipped=0,
                success=False,
                error=str(e),
            )

    def index_batch(
        self,
        sources: List[Union[str, Path]],
        pipeline_config: Optional[Any] = None,
    ) -> List[IndexingResult]:
        """
        Index multiple documents sequentially.

        Args:
            sources: List of document paths
            pipeline_config: Optional pipeline configuration

        Returns:
            One IndexingResult per source, in input order.
        """
        results = [
            self.index_document(source, pipeline_config=pipeline_config)
            for source in sources
        ]

        successful = sum(1 for r in results if r.success)
        total_chunks = sum(r.num_chunks_indexed for r in results)

        logger.info(
            f"Batch indexing complete: "
            f"{successful}/{len(results)} documents, "
            f"{total_chunks} total chunks"
        )

        return results

    def delete_document(self, document_id: str) -> int:
        """
        Remove a document from the index.

        Args:
            document_id: Document ID to remove

        Returns:
            Number of chunks deleted
        """
        return self.store.delete_document(document_id)

    def get_index_stats(self) -> Dict[str, Any]:
        """
        Get indexing statistics.

        Returns:
            Dict with ``total_chunks``, ``num_documents`` (None when the
            store cannot enumerate documents), ``embedding_model`` and
            ``embedding_dimension``.
        """
        total_chunks = self.store.count()

        # Not every store implementation can enumerate documents; fall back
        # to None instead of failing the whole stats call.
        try:
            if hasattr(self.store, 'list_documents'):
                num_documents = len(self.store.list_documents())
            else:
                num_documents = None
        except Exception:
            # Narrowed from a bare `except:` so SystemExit/KeyboardInterrupt
            # are no longer swallowed here.
            num_documents = None

        return {
            "total_chunks": total_chunks,
            "num_documents": num_documents,
            "embedding_model": self.embedder.model_name,
            "embedding_dimension": self.embedder.embedding_dimension,
        }
| |
|
| |
|
| | |
# Module-level cache backing get_document_indexer(); cleared by
# reset_document_indexer().
_document_indexer: Optional[DocumentIndexer] = None


def get_document_indexer(
    config: Optional[IndexerConfig] = None,
    vector_store: Optional[VectorStore] = None,
    embedding_adapter: Optional[EmbeddingAdapter] = None,
) -> DocumentIndexer:
    """
    Get or create the singleton document indexer.

    Args:
        config: Indexer configuration
        vector_store: Optional vector store instance
        embedding_adapter: Optional embedding adapter

    Returns:
        DocumentIndexer instance. The arguments only take effect on the
        call that creates the singleton; later calls return the cached
        instance unchanged.
    """
    global _document_indexer

    if _document_indexer is not None:
        return _document_indexer

    _document_indexer = DocumentIndexer(
        config=config,
        vector_store=vector_store,
        embedding_adapter=embedding_adapter,
    )
    return _document_indexer
| |
|
| |
|
def reset_document_indexer():
    """Drop the cached singleton so the next get_document_indexer()
    call constructs a fresh indexer."""
    global _document_indexer
    _document_indexer = None
| |
|