Radha / app /services /vector_store.py
Coderadi's picture
first commit
521f25e
"""
VECTOR STORE SERVICE MODULE
===========================
This service builds and queries the FAISS vector index used for context retrieval.
Learning data (database/learning_data/*.txt) and past chats (database/chats_data/*.json)
are loaded at startup, split into chunks, embedded with HuggingFace, and stored in FAISS.
When the user ask a question we embed it and retrieve and k most similar chunks; only
those chunks are sent to the LLM, so token usage is bounded.
LIFECYCLE:
- create_vector_store(): Load all .txt and .json, chunk, embed, build FAISS, save to disk.
Called once at startup. Restart the server after adding new .txt files so they are included.
- get_retriever(k): Return a retriever that fetches k nearest chunks for a query string.
- save_vector_store(): Write the current FAISS index to database/vector_store/ (called after create).
Embeddings run locally (sentence-transformers); no extra API key. Groq and Realtime services
call get_retriever() for every request to get context.
"""
import json
import logging
from pathlib import Path
from typing import List, Optional
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from config import (
LEARNING_DATA_DIR,
CHATS_DATA_DIR,
VECTOR_STORE_DIR,
EMBEDDING_MODEL,
CHUNK_SIZE,
CHUNK_OVERLAP,
)
logger = logging.getLogger("J.A.R.V.I.S")
# =========================================================
# VECTOR STORE SERVICE CLASS
# =========================================================
class VectorStoreService:
"""
Builds a FAISS index from learning_data .txt files and chats_data .json files,
and provides a retriever to fetch the k most relevant chunks for a query.
"""
def __init__(self):
"""Create the embedding model (local) and text splitter; vector_store is set in create_vector_store()."""
# Embeddings run locally (no API key); used to convert text into vectors for similarity search.
self.embeddings = HuggingFaceEmbeddings(
model_name=EMBEDDING_MODEL,
model_kwargs={"device":"cpu"},
)
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=CHUNK_SIZE,
chunk_overlap=CHUNK_OVERLAP,
)
self.vector_store: Optional[FAISS] = None
self._retriever_cache: dict = {}
# ----------------------------------------------------------------------
# LoAD DOcUMENTS FROM DISK
# ----------------------------------------------------------------------
def load_learning_data(self) -> List[Document]:
"""Read all .text files in database/learning_data/ and return one Document per file (content + source name). """
documents = []
for file_path in list(LEARNING_DATA_DIR.glob("*.txt")):
try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read().strip()
if content:
documents.append(Document(page_content=content, metadata={"source": str(file_path.name)}))
logger.info("[VECTOR] Loaded learning data: %s (%s chars)", file_path.name, len(content))
except Exception as e:
logger.warning("Could not load learning data file %s: %s", file_path, e)
logger.info("[VECTOR] Total learning data files loaded: %d", len(documents))
return documents
def load_chat_history(self) -> List[Document]:
"""load all .json files in database/chats_data/; turn each into one Document (User:/Assistant: lines)."""
documents = []
for file_path in sorted(CHATS_DATA_DIR.glob("*.json")):
try:
with open(file_path, "r", encoding="utf-8") as f:
chat_data = json.load(f)
messages = chat_data.get("messages", [])
# Format as "User: ..." / "Assistant: ..." so the retriever can match past conversations.
chat_content = "\n".join([
f"User: {msg.get('content', '')}"if msg.get('role') == 'user'
else f"Assistant: {msg.get('content', '')}"
for msg in messages
])
if chat_content.strip():
documents.append(Document(page_content=chat_content, metadata={"source": f"chat_{file_path.stem}"}))
logger.info("[VECTOR] Loaded chat history: %s (%d messages)", file_path.name, len(messages))
except Exception as e:
logger.warning("Could not load chat history file %s: %s", file_path, e)
logger.info("[VECTOR] Total chat history files loaded: %d", len(documents))
return documents
# -------------------------------------------------------
# BUILD AND SAVE FAISS INDEX
# -------------------------------------------------------
def create_vector_store(self) -> FAISS:
"""
Load learning_data + chats_data, embed, build FAISS index, save to disk.
Called once at startup. If there are no documents we create a tiny placeholder index.
"""
learning_docs = self.load_learning_data()
chat_docs = self.load_chat_history()
all_documents = learning_docs + chat_docs
logger.info("[VECTOR] Total documents to index: %d (learning: %d, chat:%d)",
len(all_documents), len(learning_docs), len(chat_docs))
if not all_documents:
# Placeholder so get_retriever() never fails; return this single chunk for any query.
self.vector_store = FAISS.from_texts(["No data available yet."], self.embeddings)
logger.info("[VECTOR] No douments found, created placeholder index")
else:
chunks = self.text_splitter.split_documents(all_documents)
logger.info("[VECTOR] Split into %d chunks (chunk_size=%d, overlap=%d)",
len(chunks), CHUNK_SIZE, CHUNK_OVERLAP)
self.vector_store = FAISS.from_documents(chunks, self.embeddings)
logger.info("[VECTOR] FAISS index build successfully with %d vectors", len(chunks))
self._retriever_cache.clear()
self.save_vector_store()
return self.vector_store
def save_vector_store(self):
"""Write the current FAISS index to database/vector_store/. On error we only log."""
if self.vector_store:
try:
self.vector_store.save_local(str(VECTOR_STORE_DIR))
except Exception as e:
logger.error("failed to save vector store to disk: %s", e)
# ---------------------------------------------------------------------------
# RETRIEVER FOR CONTEXT
# ---------------------------------------------------------------------------
def get_retriever(self, k: int = 10):
"""Return a retriever that returns that k most similar chunks for a query string."""
if not self.vector_store:
raise RuntimeError("Vector store not initialized. This should not happen.")
if k not in self._retriever_cache:
self._retriever_cache[k] = self.vector_store.as_retriever(search_kwargs={"k": k})
return self._retriever_cache[k]