# import faiss
import numpy as np
import hnswlib
from src.utils import print_rank

# class FAISSIndex:
# """
# Manages FAISS indices for different candidate types and their associated keys.
# """
# # BUG: incompatible with numpy >= 2.0.0
# def __init__(self, ngpus=None):
# self.indices = {} # Stores FAISS indices for each candidate type
# self.keys_dict = {} # Stores candidate keys for each candidate type
# self.ngpus = ngpus or faiss.get_num_gpus()
# print_rank(f"FAISS Index initialized with {self.ngpus} GPUs")
# def create_index(self, cand_type, cand_vectors, cand_keys):
# """
# Create a multi-GPU FAISS index for a candidate type.
# Args:
# cand_type (str): Candidate type (state, trajectory, interval)
# cand_vectors (np.ndarray): Embeddings for the candidates
# cand_keys (list): List of candidate IDs
# """
# print_rank(f"Building FAISS index for {cand_type}")
# assert len(cand_keys) == cand_vectors.shape[0]
# # Store candidate keys for this type
# self.keys_dict[cand_type] = cand_keys
# # Normalize vectors for cosine similarity
# vectors = cand_vectors.astype(np.float32).copy()
# faiss.normalize_L2(vectors)
# # Create CPU index
# d = vectors.shape[1] # Embedding dimension
# cpu_index = faiss.IndexFlatIP(d) # Inner product similarity
# cpu_index.add(vectors)
# # Distribute the index across multiple GPUs
# co = faiss.GpuMultipleClonerOptions()
# co.shard = True # Shard the index across GPUs
# gpu_index = faiss.index_cpu_to_all_gpus(cpu_index, co=co, ngpu=self.ngpus)
# # Store the GPU index
# self.indices[cand_type] = gpu_index
# def search(self, cand_type, query_vectors, k):
# """
# Search for nearest neighbors in the index for a specific candidate type.
# Args:
# cand_type (str): Candidate type (state, trajectory, interval)
#             query_vectors (np.ndarray): Query embedding(s)
# k (int): Number of results to retrieve
# Returns:
# tuple: (scores, predictions) where:
# - scores is a list of lists of similarity scores
# - predictions is a list of lists of candidate IDs
# """
# if cand_type not in self.indices:
# raise ValueError(f"Index for {cand_type} not found")
# if len(query_vectors.shape) == 1:
# q = query_vectors.reshape(1, -1).astype(np.float32)
# else:
# q = query_vectors.astype(np.float32)
# # Normalize vectors for cosine similarity
# faiss.normalize_L2(q)
# assert q.shape[1] == self.indices[cand_type].d, \
# f"Query dimension {q.shape[1]} doesn't match index dimension {self.indices[cand_type].d}"
# # Search in the appropriate index
# scores, indices = self.indices[cand_type].search(q, k)
# # Process results - create a list of predictions for each query
# all_predictions = []
# for i in range(indices.shape[0]):
# predictions = [self.keys_dict[cand_type][int(idx)] for idx in indices[i]]
# all_predictions.append(predictions)
# return scores.tolist(), all_predictions
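
# The HNSW index below replaces the disabled FAISS index above. Note that
# hnswlib performs approximate (rather than exact) nearest-neighbour search,
# so results can differ slightly from FAISS's exact inner-product search over
# the same normalized vectors.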
class HNSWIndex:
    """
    Manages HNSW indices for different candidate types and their associated keys.
    Serves as a drop-in replacement for the FAISS-based FAISSIndex above.
    """
    def __init__(self, ef_construction=200, M=48):
        self.indices = {}    # Stores HNSW indices for each candidate type
        self.keys_dict = {}  # Stores candidate keys for each candidate type
        self.dimensions = {} # Stores embedding dimensions for each candidate type
        self.ef_construction = ef_construction  # Build-time accuracy/speed trade-off
        self.M = M  # Maximum number of outgoing graph links per node
        print_rank(f"HNSW Index initialized with ef_construction={ef_construction}, M={M}")

    def create_index(self, cand_type, cand_vectors, cand_keys):
"""
Create an HNSW index for a candidate type.
Args:
cand_type (str): Candidate type (state, trajectory, interval)
cand_vectors (np.ndarray): Embeddings for the candidates
cand_keys (list): List of candidate IDs
"""
print_rank(f"Building HNSW index for {cand_type}")
assert len(cand_keys) == cand_vectors.shape[0]
# Store candidate keys for this type
self.keys_dict[cand_type] = cand_keys
# Normalize vectors for cosine similarity
vectors = cand_vectors.astype(np.float32).copy()
# Equivalent to faiss.normalize_L2
norms = np.linalg.norm(vectors, axis=1, keepdims=True)
assert not np.any(norms == 0), "Zero norm found in candidate vectors"
vectors = vectors / norms
        num_elements, dim = vectors.shape
        self.dimensions[cand_type] = dim
        # Initialize the index with hnswlib's cosine metric (distance = 1 - cosine
        # similarity); on the L2-normalized vectors above this is equivalent to the
        # inner-product search used by the FAISS version
index = hnswlib.Index(space='cosine', dim=dim)
index.init_index(max_elements=num_elements, ef_construction=self.ef_construction, M=self.M)
# Add all vectors with their IDs
index.add_items(vectors, np.arange(num_elements))
        # Set the default search-time quality parameter (ef); hnswlib requires
        # ef >= k at query time, so search() raises it when a larger k is requested
        index.set_ef(100)
# Store the index
self.indices[cand_type] = index

    def search(self, cand_type, query_vectors, k):
"""
Search for nearest neighbors in the index for a specific candidate type.
Args:
cand_type (str): Candidate type (state, trajectory, interval)
            query_vectors (np.ndarray): Query embedding(s)
k (int): Number of results to retrieve
Returns:
tuple: (scores, predictions) where:
- scores is a list of lists of similarity scores
- predictions is a list of lists of candidate IDs
"""
if cand_type not in self.indices:
raise ValueError(f"Index for {cand_type} not found")
if len(query_vectors.shape) == 1:
q = query_vectors.reshape(1, -1).astype(np.float32)
else:
q = query_vectors.astype(np.float32)
# Normalize query vectors
norms = np.linalg.norm(q, axis=1, keepdims=True)
assert not np.any(norms == 0), "Zero norm found in query vectors"
q = q / norms
assert q.shape[1] == self.indices[cand_type].dim, \
f"Query dimension {q.shape[1]} doesn't match index dimension {self.indices[cand_type].dim}"
        # hnswlib requires the search-time ef to be at least k, so bump it when
        # more neighbours are requested than the default ef of 100 covers
        if k > 100:
            self.indices[cand_type].set_ef(k)
        # Search in the HNSW index (returns internal labels and cosine distances)
        indices, distances = self.indices[cand_type].knn_query(q, k=k)
        # Convert cosine distances back to similarities (distance = 1 - similarity)
        scores = 1 - distances
# Process results - create a list of predictions for each query
all_predictions = []
for i in range(indices.shape[0]):
predictions = [self.keys_dict[cand_type][int(idx)] for idx in indices[i]]
all_predictions.append(predictions)
return scores.tolist(), all_predictions
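

# Minimal smoke test sketching the intended usage: build an index over random
# embeddings and query it. The candidate keys ("cand_0", ...) and the 64-d
# embedding size are illustrative assumptions, not values from the training code.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    dim, num_cands = 64, 1000  # hypothetical embedding size / corpus size
    cand_vectors = rng.standard_normal((num_cands, dim)).astype(np.float32)
    cand_keys = [f"cand_{i}" for i in range(num_cands)]

    index = HNSWIndex()
    index.create_index("state", cand_vectors, cand_keys)

    # Query with the first two candidates; each should retrieve itself first
    # with similarity close to 1.0 (up to HNSW's approximation error).
    scores, predictions = index.search("state", cand_vectors[:2], k=5)
    print(predictions[0][0], round(scores[0][0], 3))  # expected: cand_0 ~1.0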