"""Service for processing documents and ingesting to vector store.""" from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_core.documents import Document as LangChainDocument from src.db.postgres.vector_store import get_vector_store from src.storage.az_blob.az_blob import blob_storage from src.db.postgres.models import Document as DBDocument from sqlalchemy.ext.asyncio import AsyncSession from src.middlewares.logging import get_logger from typing import List from datetime import datetime, timezone, timedelta import sys import docx import pytesseract from pdf2image import convert_from_bytes from io import BytesIO _JAKARTA_TZ = timezone(timedelta(hours=7)) logger = get_logger("knowledge_processing") class KnowledgeProcessingService: """Service for processing documents and ingesting to vector store.""" def __init__(self): self.text_splitter = RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=200, length_function=len ) async def process_document(self, db_doc: DBDocument, db: AsyncSession) -> int: """Process document and ingest to vector store. Returns: Number of chunks ingested """ try: logger.info(f"Processing document {db_doc.id}") content = await blob_storage.download_file(db_doc.blob_name) if db_doc.file_type == "pdf": documents = await self._build_pdf_documents(content, db_doc) else: text = self._extract_text(content, db_doc.file_type) if not text.strip(): raise ValueError("No text extracted from document") chunks = self.text_splitter.split_text(text) documents = [ LangChainDocument( page_content=chunk, metadata={ "user_id": db_doc.user_id, "source_type": "document", "updated_at": datetime.now(_JAKARTA_TZ).isoformat(), "data": { "document_id": db_doc.id, "filename": db_doc.filename, "file_type": db_doc.file_type, "chunk_index": i, }, } ) for i, chunk in enumerate(chunks) ] if not documents: raise ValueError("No text extracted from document") vector_store = get_vector_store() await vector_store.aadd_documents(documents) logger.info(f"Processed {db_doc.id}: {len(documents)} chunks ingested") return len(documents) except Exception as e: logger.error(f"Failed to process document {db_doc.id}", error=str(e)) raise async def _build_pdf_documents( self, content: bytes, db_doc: DBDocument ) -> List[LangChainDocument]: """Build LangChain documents from PDF with page_label metadata using Tesseract OCR.""" documents: List[LangChainDocument] = [] poppler_path = None if sys.platform == "win32": pytesseract.pytesseract.tesseract_cmd = r"./software/Tesseract-OCR/tesseract.exe" poppler_path = "./software/poppler-24.08.0/Library/bin" images = convert_from_bytes(content, poppler_path=poppler_path) logger.info(f"Tesseract OCR: converting {len(images)} pages") for page_num, image in enumerate(images, start=1): page_text = pytesseract.image_to_string(image) if not page_text.strip(): continue for chunk in self.text_splitter.split_text(page_text): documents.append(LangChainDocument( page_content=chunk, metadata={ "user_id": db_doc.user_id, "source_type": "document", "updated_at": datetime.now(_JAKARTA_TZ).isoformat(), "data": { "document_id": db_doc.id, "filename": db_doc.filename, "file_type": db_doc.file_type, "chunk_index": len(documents), "page_label": page_num, }, } )) return documents def _extract_text(self, content: bytes, file_type: str) -> str: """Extract text from DOCX or TXT content.""" if file_type == "docx": doc = docx.Document(BytesIO(content)) return "\n".join(p.text for p in doc.paragraphs) elif file_type == "txt": return content.decode("utf-8") else: raise ValueError(f"Unsupported file type: {file_type}") knowledge_processor = KnowledgeProcessingService()