| """ | |
| VECTOR STORE SERVICE MODULE | |
| =========================== | |
| This service builds and queries the FAISS vector index used for context retrieval. | |
| Learning data (database/learning_data/*.txt) and past chats (database/chats_data/*.json) | |
| are loaded at startup, split into chunks, embedded with HuggingFace, and stored in FAISS. | |
| When the user ask a question we embed it and retrieve and k most similar chunks; only | |
| those chunks are sent to the LLM, so token usage is bounded. | |
| LIFECYCLE: | |
| - create_vector_store(): Load all .txt and .json, chunk, embed, build FAISS, save to disk. | |
| Called once at startup. Restart the server after adding new .txt files so they are included. | |
| - get_retriever(k): Return a retriever that fetches k nearest chunks for a query string. | |
| - save_vector_store(): Write the current FAISS index to database/vector_store/ (called after create). | |
| Embeddings run locally (sentence-transformers); no extra API key. Groq and Realtime services | |
| call get_retriever() for every request to get context. | |
| """ | |
import json
import logging
from pathlib import Path
from typing import List, Optional

from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document

from config import (
    LEARNING_DATA_DIR,
    CHATS_DATA_DIR,
    VECTOR_STORE_DIR,
    EMBEDDING_MODEL,
    CHUNK_SIZE,
    CHUNK_OVERLAP,
)

logger = logging.getLogger("J.A.R.V.I.S")

# =========================================================
# VECTOR STORE SERVICE CLASS
# =========================================================
class VectorStoreService:
    """
    Builds a FAISS index from learning_data .txt files and chats_data .json files,
    and provides a retriever to fetch the k most relevant chunks for a query.
    """

    def __init__(self):
        """Create the embedding model (local) and text splitter; vector_store is set in create_vector_store()."""
        # Embeddings run locally (no API key); used to convert text into vectors for similarity search.
        self.embeddings = HuggingFaceEmbeddings(
            model_name=EMBEDDING_MODEL,
            model_kwargs={"device": "cpu"},
        )
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=CHUNK_SIZE,
            chunk_overlap=CHUNK_OVERLAP,
        )
        self.vector_store: Optional[FAISS] = None
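        # Maps k -> retriever so get_retriever() can reuse one retriever per k
        # instead of constructing a new one on every request.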
        self._retriever_cache: dict = {}

    # ----------------------------------------------------------------------
    # LOAD DOCUMENTS FROM DISK
    # ----------------------------------------------------------------------
    def load_learning_data(self) -> List[Document]:
        """Read all .txt files in database/learning_data/ and return one Document per file (content + source name)."""
        documents = []
        for file_path in LEARNING_DATA_DIR.glob("*.txt"):
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    content = f.read().strip()
                if content:
                    documents.append(Document(page_content=content, metadata={"source": file_path.name}))
                    logger.info("[VECTOR] Loaded learning data: %s (%d chars)", file_path.name, len(content))
            except Exception as e:
                logger.warning("Could not load learning data file %s: %s", file_path, e)
        logger.info("[VECTOR] Total learning data files loaded: %d", len(documents))
        return documents

    def load_chat_history(self) -> List[Document]:
        """Load all .json files in database/chats_data/ and turn each into one Document (User:/Assistant: lines)."""
        documents = []
        for file_path in sorted(CHATS_DATA_DIR.glob("*.json")):
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    chat_data = json.load(f)
                messages = chat_data.get("messages", [])
                # Format as "User: ..." / "Assistant: ..." so the retriever can match past conversations.
                chat_content = "\n".join([
                    f"User: {msg.get('content', '')}" if msg.get('role') == 'user'
                    else f"Assistant: {msg.get('content', '')}"
                    for msg in messages
                ])
                if chat_content.strip():
                    documents.append(Document(page_content=chat_content, metadata={"source": f"chat_{file_path.stem}"}))
                    logger.info("[VECTOR] Loaded chat history: %s (%d messages)", file_path.name, len(messages))
            except Exception as e:
                logger.warning("Could not load chat history file %s: %s", file_path, e)
        logger.info("[VECTOR] Total chat history files loaded: %d", len(documents))
        return documents

    # -------------------------------------------------------
    # BUILD AND SAVE FAISS INDEX
    # -------------------------------------------------------
    def create_vector_store(self) -> FAISS:
        """
        Load learning_data + chats_data, embed, build the FAISS index, and save it to disk.
        Called once at startup. If there are no documents we create a tiny placeholder index.
        """
        learning_docs = self.load_learning_data()
        chat_docs = self.load_chat_history()
        all_documents = learning_docs + chat_docs
        logger.info("[VECTOR] Total documents to index: %d (learning: %d, chat: %d)",
                    len(all_documents), len(learning_docs), len(chat_docs))
        if not all_documents:
            # Placeholder so get_retriever() never fails; this single chunk is returned for any query.
            self.vector_store = FAISS.from_texts(["No data available yet."], self.embeddings)
            logger.info("[VECTOR] No documents found, created placeholder index")
        else:
            chunks = self.text_splitter.split_documents(all_documents)
            logger.info("[VECTOR] Split into %d chunks (chunk_size=%d, overlap=%d)",
                        len(chunks), CHUNK_SIZE, CHUNK_OVERLAP)
            self.vector_store = FAISS.from_documents(chunks, self.embeddings)
            logger.info("[VECTOR] FAISS index built successfully with %d vectors", len(chunks))
        self._retriever_cache.clear()
        self.save_vector_store()
        return self.vector_store

    def save_vector_store(self):
        """Write the current FAISS index to database/vector_store/. On error we only log."""
        if self.vector_store:
            try:
                self.vector_store.save_local(str(VECTOR_STORE_DIR))
            except Exception as e:
                logger.error("Failed to save vector store to disk: %s", e)

    # ---------------------------------------------------------------------------
    # RETRIEVER FOR CONTEXT
    # ---------------------------------------------------------------------------
    def get_retriever(self, k: int = 10):
        """Return a retriever that fetches the k most similar chunks for a query string."""
        if not self.vector_store:
            raise RuntimeError("Vector store not initialized. Call create_vector_store() first.")
        if k not in self._retriever_cache:
            self._retriever_cache[k] = self.vector_store.as_retriever(search_kwargs={"k": k})
        return self._retriever_cache[k]
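

# ---------------------------------------------------------------------------
# MANUAL SMOKE TEST (sketch)
# ---------------------------------------------------------------------------
# A minimal sketch of the lifecycle described in the module docstring, assuming
# the config paths exist and that the langchain-core retriever exposes .invoke()
# (Runnable interface). Not used by the server; run this file directly to try it.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    service = VectorStoreService()
    service.create_vector_store()            # load .txt/.json, chunk, embed, build + save FAISS
    retriever = service.get_retriever(k=3)   # cached per k
    for doc in retriever.invoke("What did we talk about last time?"):
        print(doc.metadata.get("source"), "->", doc.page_content[:80])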