ishaq101's picture
feat/Catalog Retrieval System (#1)
6bff5d9
"""Service for processing documents and ingesting to vector store."""
import asyncio
import sys
from datetime import datetime, timezone, timedelta
from io import BytesIO
from typing import List, Optional

import docx
import pytesseract
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document as LangChainDocument
from pdf2image import convert_from_bytes
from sqlalchemy.ext.asyncio import AsyncSession

from src.db.postgres.models import Document as DBDocument
from src.db.postgres.vector_store import get_vector_store
from src.middlewares.logging import get_logger
from src.storage.az_blob.az_blob import blob_storage
# Fixed UTC+07:00 offset (Jakarta); used for the ISO-8601 "updated_at"
# timestamps attached to every ingested chunk.
_JAKARTA_TZ = timezone(offset=timedelta(minutes=7 * 60))

# Module logger shared by the knowledge-processing service.
logger = get_logger("knowledge_processing")
class KnowledgeProcessingService:
    """Service for processing documents and ingesting to vector store.

    A stored document is downloaded from blob storage and its text extracted:
    PDFs via Tesseract OCR, DOCX via python-docx paragraphs, TXT via UTF-8
    decoding. The text is split into overlapping chunks, and each chunk is
    added to the vector store with user/document metadata.
    """

    def __init__(self):
        # Chunks of up to 1000 characters (measured with len) with 200
        # characters of overlap between consecutive chunks.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
        )

    async def process_document(self, db_doc: DBDocument, db: AsyncSession) -> int:
        """Process document and ingest to vector store.

        Args:
            db_doc: Database record of the document. Its ``id``, ``blob_name``,
                ``file_type``, ``user_id`` and ``filename`` attributes are read.
            db: Database session. Not used by this method; kept for interface
                compatibility with callers.

        Returns:
            Number of chunks ingested.

        Raises:
            ValueError: If the file type is unsupported or no text could be
                extracted from the document.
            Exception: Any failure (download, extraction, vector store) is
                logged and re-raised unchanged.
        """
        try:
            logger.info(f"Processing document {db_doc.id}")
            content = await blob_storage.download_file(db_doc.blob_name)

            if db_doc.file_type == "pdf":
                documents = await self._build_pdf_documents(content, db_doc)
            else:
                text = self._extract_text(content, db_doc.file_type)
                if not text.strip():
                    raise ValueError("No text extracted from document")
                # One timestamp per ingestion so all chunks of this document agree.
                updated_at = datetime.now(_JAKARTA_TZ).isoformat()
                documents = [
                    LangChainDocument(
                        page_content=chunk,
                        metadata=self._chunk_metadata(db_doc, i, updated_at),
                    )
                    for i, chunk in enumerate(self.text_splitter.split_text(text))
                ]

            # Covers PDFs where OCR produced no text on any page.
            if not documents:
                raise ValueError("No text extracted from document")

            vector_store = get_vector_store()
            await vector_store.aadd_documents(documents)
            logger.info(f"Processed {db_doc.id}: {len(documents)} chunks ingested")
            return len(documents)
        except Exception as e:
            logger.error(f"Failed to process document {db_doc.id}", error=str(e))
            raise

    async def _build_pdf_documents(
        self, content: bytes, db_doc: DBDocument
    ) -> List[LangChainDocument]:
        """Build LangChain documents from PDF with page_label metadata using Tesseract OCR.

        Args:
            content: Raw PDF bytes.
            db_doc: Database record whose fields populate the chunk metadata.

        Returns:
            One document per text chunk. ``chunk_index`` counts chunks across
            the whole PDF (0-based); ``page_label`` is the 1-based page number.
            Pages whose OCR output is blank are skipped.
        """
        # Rasterizing and OCR are blocking, CPU-heavy calls; run them in the
        # default executor so the event loop stays responsive meanwhile.
        loop = asyncio.get_running_loop()
        page_texts = await loop.run_in_executor(None, self._ocr_pdf_pages, content)

        # One timestamp per ingestion so all chunks of this document agree.
        updated_at = datetime.now(_JAKARTA_TZ).isoformat()
        documents: List[LangChainDocument] = []
        for page_num, page_text in enumerate(page_texts, start=1):
            if not page_text.strip():
                continue
            for chunk in self.text_splitter.split_text(page_text):
                documents.append(LangChainDocument(
                    page_content=chunk,
                    metadata=self._chunk_metadata(
                        db_doc, len(documents), updated_at, page_label=page_num
                    ),
                ))
        return documents

    @staticmethod
    def _ocr_pdf_pages(content: bytes) -> List[str]:
        """Rasterize every PDF page and OCR it; return the page texts in page order.

        Synchronous and blocking; intended to be run off the event loop.
        """
        poppler_path = None
        if sys.platform == "win32":
            # On Windows, use the Tesseract/Poppler binaries bundled under
            # ./software (paths are relative to the working directory).
            pytesseract.pytesseract.tesseract_cmd = r"./software/Tesseract-OCR/tesseract.exe"
            poppler_path = "./software/poppler-24.08.0/Library/bin"
        images = convert_from_bytes(content, poppler_path=poppler_path)
        logger.info(f"Tesseract OCR: converting {len(images)} pages")
        return [pytesseract.image_to_string(image) for image in images]

    @staticmethod
    def _chunk_metadata(
        db_doc: DBDocument,
        chunk_index: int,
        updated_at: str,
        page_label: Optional[int] = None,
    ) -> dict:
        """Build the vector-store metadata for one chunk (page_label only for PDFs)."""
        data = {
            "document_id": db_doc.id,
            "filename": db_doc.filename,
            "file_type": db_doc.file_type,
            "chunk_index": chunk_index,
        }
        if page_label is not None:
            data["page_label"] = page_label
        return {
            "user_id": db_doc.user_id,
            "source_type": "document",
            "updated_at": updated_at,
            "data": data,
        }

    def _extract_text(self, content: bytes, file_type: str) -> str:
        """Extract text from DOCX or TXT content.

        Args:
            content: Raw file bytes.
            file_type: ``"docx"`` or ``"txt"``.

        Returns:
            For DOCX, the paragraph texts joined with newlines (only
            ``doc.paragraphs`` is read, so table cells are not included).
            For TXT, the bytes decoded as UTF-8 with any leading BOM removed.

        Raises:
            ValueError: For any other file type.
            UnicodeDecodeError: If TXT content is not valid UTF-8.
        """
        if file_type == "docx":
            doc = docx.Document(BytesIO(content))
            return "\n".join(p.text for p in doc.paragraphs)
        if file_type == "txt":
            # "utf-8-sig" decodes plain UTF-8 identically but drops a leading
            # byte-order mark, which would otherwise leak into the first chunk.
            return content.decode("utf-8-sig")
        raise ValueError(f"Unsupported file type: {file_type}")
# Module-level shared instance of the service, created at import time.
knowledge_processor: KnowledgeProcessingService = KnowledgeProcessingService()