| # import faiss | |
| import numpy as np | |
| import hnswlib | |
| from src.utils import print_rank | |
| # class FAISSIndex: | |
| # """ | |
| # Manages FAISS indices for different candidate types and their associated keys. | |
| # """ | |
| # # BUG: incompatible with numpy >= 2.0.0 | |
| # def __init__(self, ngpus=None): | |
| # self.indices = {} # Stores FAISS indices for each candidate type | |
| # self.keys_dict = {} # Stores candidate keys for each candidate type | |
| # self.ngpus = ngpus or faiss.get_num_gpus() | |
| # print_rank(f"FAISS Index initialized with {self.ngpus} GPUs") | |
| # def create_index(self, cand_type, cand_vectors, cand_keys): | |
| # """ | |
| # Create a multi-GPU FAISS index for a candidate type. | |
| # Args: | |
| # cand_type (str): Candidate type (state, trajectory, interval) | |
| # cand_vectors (np.ndarray): Embeddings for the candidates | |
| # cand_keys (list): List of candidate IDs | |
| # """ | |
| # print_rank(f"Building FAISS index for {cand_type}") | |
| # assert len(cand_keys) == cand_vectors.shape[0] | |
| # # Store candidate keys for this type | |
| # self.keys_dict[cand_type] = cand_keys | |
| # # Normalize vectors for cosine similarity | |
| # vectors = cand_vectors.astype(np.float32).copy() | |
| # faiss.normalize_L2(vectors) | |
| # # Create CPU index | |
| # d = vectors.shape[1] # Embedding dimension | |
| # cpu_index = faiss.IndexFlatIP(d) # Inner product similarity | |
| # cpu_index.add(vectors) | |
| # # Distribute the index across multiple GPUs | |
| # co = faiss.GpuMultipleClonerOptions() | |
| # co.shard = True # Shard the index across GPUs | |
| # gpu_index = faiss.index_cpu_to_all_gpus(cpu_index, co=co, ngpu=self.ngpus) | |
| # # Store the GPU index | |
| # self.indices[cand_type] = gpu_index | |
| # def search(self, cand_type, query_vectors, k): | |
| # """ | |
| # Search for nearest neighbors in the index for a specific candidate type. | |
| # Args: | |
| # cand_type (str): Candidate type (state, trajectory, interval) | |
| # query_vector (np.ndarray): Query embedding(s) | |
| # k (int): Number of results to retrieve | |
| # Returns: | |
| # tuple: (scores, predictions) where: | |
| # - scores is a list of lists of similarity scores | |
| # - predictions is a list of lists of candidate IDs | |
| # """ | |
| # if cand_type not in self.indices: | |
| # raise ValueError(f"Index for {cand_type} not found") | |
| # if len(query_vectors.shape) == 1: | |
| # q = query_vectors.reshape(1, -1).astype(np.float32) | |
| # else: | |
| # q = query_vectors.astype(np.float32) | |
| # # Normalize vectors for cosine similarity | |
| # faiss.normalize_L2(q) | |
| # assert q.shape[1] == self.indices[cand_type].d, \ | |
| # f"Query dimension {q.shape[1]} doesn't match index dimension {self.indices[cand_type].d}" | |
| # # Search in the appropriate index | |
| # scores, indices = self.indices[cand_type].search(q, k) | |
| # # Process results - create a list of predictions for each query | |
| # all_predictions = [] | |
| # for i in range(indices.shape[0]): | |
| # predictions = [self.keys_dict[cand_type][int(idx)] for idx in indices[i]] | |
| # all_predictions.append(predictions) | |
| # return scores.tolist(), all_predictions | |
| class HNSWIndex: | |
| """ | |
| Manages HNSW indices for different candidate types and their associated keys. | |
| This implementation provides functionality similar to FAISSIndex. | |
| """ | |
| def __init__(self, ef_construction=200, M=48): | |
| self.indices = {} # Stores HNSW indices for each candidate type | |
| self.keys_dict = {} # Stores candidate keys for each candidate type | |
| self.dimensions = {} # Stores embedding dimensions for each candidate type | |
| self.ef_construction = ef_construction # Controls index quality | |
| self.M = M # Controls graph connectivity | |
| print_rank(f"HNSW Index initialized with ef_construction={ef_construction}, M={M}") | |
| def create_index(self, cand_type, cand_vectors, cand_keys): | |
| """ | |
| Create an HNSW index for a candidate type. | |
| Args: | |
| cand_type (str): Candidate type (state, trajectory, interval) | |
| cand_vectors (np.ndarray): Embeddings for the candidates | |
| cand_keys (list): List of candidate IDs | |
| """ | |
| print_rank(f"Building HNSW index for {cand_type}") | |
| assert len(cand_keys) == cand_vectors.shape[0] | |
| # Store candidate keys for this type | |
| self.keys_dict[cand_type] = cand_keys | |
| # Normalize vectors for cosine similarity | |
| vectors = cand_vectors.astype(np.float32).copy() | |
| # Equivalent to faiss.normalize_L2 | |
| norms = np.linalg.norm(vectors, axis=1, keepdims=True) | |
| assert not np.any(norms == 0), "Zero norm found in candidate vectors" | |
| vectors = vectors / norms | |
| num_elements, dim = vectors.shape | |
| # Initialize the index; using cosine metric (distance = 1 - cosine similarity) | |
| index = hnswlib.Index(space='cosine', dim=dim) | |
| index.init_index(max_elements=num_elements, ef_construction=self.ef_construction, M=self.M) | |
| # Add all vectors with their IDs | |
| index.add_items(vectors, np.arange(num_elements)) | |
| # Set search quality parameter | |
| index.set_ef(100) | |
| # Store the index | |
| self.indices[cand_type] = index | |
| def search(self, cand_type, query_vectors, k): | |
| """ | |
| Search for nearest neighbors in the index for a specific candidate type. | |
| Args: | |
| cand_type (str): Candidate type (state, trajectory, interval) | |
| query_vector (np.ndarray): Query embedding(s) | |
| k (int): Number of results to retrieve | |
| Returns: | |
| tuple: (scores, predictions) where: | |
| - scores is a list of lists of similarity scores | |
| - predictions is a list of lists of candidate IDs | |
| """ | |
| if cand_type not in self.indices: | |
| raise ValueError(f"Index for {cand_type} not found") | |
| if len(query_vectors.shape) == 1: | |
| q = query_vectors.reshape(1, -1).astype(np.float32) | |
| else: | |
| q = query_vectors.astype(np.float32) | |
| # Normalize query vectors | |
| norms = np.linalg.norm(q, axis=1, keepdims=True) | |
| assert not np.any(norms == 0), "Zero norm found in query vectors" | |
| q = q / norms | |
| assert q.shape[1] == self.indices[cand_type].dim, \ | |
| f"Query dimension {q.shape[1]} doesn't match index dimension {self.indices[cand_type].dim}" | |
| # Search in the HNSW index | |
| indices, distances = self.indices[cand_type].knn_query(q, k=k) | |
| # Convert distances to similarity scores | |
| scores = 1 - distances | |
| # Process results - create a list of predictions for each query | |
| all_predictions = [] | |
| for i in range(indices.shape[0]): | |
| predictions = [self.keys_dict[cand_type][int(idx)] for idx in indices[i]] | |
| all_predictions.append(predictions) | |
| return scores.tolist(), all_predictions |