| import gc |
| import uuid |
|
|
| import chromadb |
| import numpy as np |
| import torch |
| import torch.nn.functional as F |
| from PIL import Image |
| from transformers import AutoModel, AutoImageProcessor |
|
|
| from src.utils.utils import extract_images_from_file |
|
|
|
|
| |
| |
| |
|
|
class is_conf_image:
    """Embed images with a vision model and match them against a Chroma collection.

    The instance owns the image processor and model
    ("nomic-ai/nomic-embed-vision-v1.5"), a persistent Chroma client rooted at
    "../db/image", and two call counters (`cnt` for indexing calls,
    `cnt_infer` for lookup calls) that drive a periodic memory cleanup.
    """

    # A gc / CUDA-cache sweep is triggered once every this many calls.
    _CLEANUP_INTERVAL = 200

    def __init__(self):
        """Load the model onto the available device and open the collection."""
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.feature_extractor = AutoImageProcessor.from_pretrained(
            "nomic-ai/nomic-embed-vision-v1.5",
            cache_dir="../weights",
            use_fast=True,
            trust_remote_code=True,
        )
        self.model = (
            AutoModel.from_pretrained(
                "nomic-ai/nomic-embed-vision-v1.5",
                cache_dir="../weights",
                trust_remote_code=True,
            )
            .eval()
            .to(self.device)
        )

        self.client = chromadb.PersistentClient(path="../db/image")
        # NOTE(review): Chroma documents the distance metric under the
        # "hnsw:space" metadata key. The "hnsw" key used here is presumably
        # not recognised, which would leave the index on its default metric
        # even though results are labelled "cosine distance". Left unchanged
        # on purpose: switching the metric changes what `threshold` means and
        # needs a deliberate migration of already-created collections.
        self.collection = self.client.get_or_create_collection(
            name="image_embedding",
            metadata={"hnsw": "cosine"},
        )
        self.max_size: int = 800
        self.cnt: int = 0
        self.cnt_infer: int = 0

    async def making_embedding_vector(self, image_path: str, category: int):
        """Embed one image, store it in the collection and return the embedding.

        Args:
            image_path: Path of the image to index; also stored as metadata.
            category: Label stored next to the embedding.

        Returns:
            The embedding exactly as produced by `inference` (a nested list).
        """
        image = self._load_rgb(image_path)
        embedding_vector = self.inference(image)
        self.add_vectors(embedding_vector, {"image": image_path, "category": category})

        self.cnt += 1
        self._cleanup_if_due(self.cnt)
        return embedding_vector

    async def infer_image(self, image_path: str, threshold: float = 0.45, top_k: int = 2):
        """Look up stored images similar to the one at `image_path`.

        PDFs and images whose height or width exceeds `self.max_size` are
        handed to `extract_images_from_file`, and every extracted image is
        queried separately.

        Args:
            image_path: Path of the image or PDF to look up.
            threshold: Largest distance that still counts as a match.
            top_k: Number of neighbours requested from the collection.

        Returns:
            A single result dict (see `finding_from_db`) for a regular image,
            or a list of such dicts, one per extracted image, for PDFs and
            oversized images.
        """
        # Decide on the extension before touching PIL: Pillow cannot read
        # PDFs, so opening first would raise and the PDF path could never run.
        is_pdf = image_path.lower().endswith(".pdf")
        image = None if is_pdf else self._load_rgb(image_path)

        if is_pdf or max(image.shape[:2]) > self.max_size:
            results = [
                self.finding_from_db(self.inference(self._load_rgb(part)), threshold, top_k)
                for part in extract_images_from_file(image_path, max_size=self.max_size)
            ]
        else:
            results = self.finding_from_db(self.inference(image), threshold, top_k)

        # Count every call, including the multi-image path, which is the one
        # that allocates the most and previously skipped the cleanup entirely.
        self.cnt_infer += 1
        self._cleanup_if_due(self.cnt_infer)
        return results

    def finding_from_db(self, embedding_vector, threshold: float, top_k: int) -> dict:
        """Query the collection and keep the neighbours within `threshold`.

        Args:
            embedding_vector: Query embedding(s) as returned by `inference`.
            threshold: Largest distance that still counts as a match.
            top_k: Number of neighbours requested from the collection.

        Returns:
            A flat dict with the keys "similar_image<i>", "category<i>" and
            "cosine distance<i>" for each accepted neighbour, where <i> counts
            accepted neighbours from 0. Empty when nothing is close enough.
        """
        # Stored embeddings are never read here, so they are not requested.
        results = self.collection.query(
            query_embeddings=embedding_vector,
            n_results=top_k,
            include=["metadatas", "distances"],
        )

        # Only the first query's neighbours are inspected.
        accepted = (
            (metadata, distance)
            for metadata, distance in zip(results["metadatas"][0], results["distances"][0])
            if distance <= threshold
        )

        result_out = {}
        # enumerate gives each match its own suffix; a counter that is never
        # advanced would make later matches overwrite the earlier ones.
        for idx, (metadata, distance) in enumerate(accepted):
            result_out[f"similar_image{idx}"] = metadata["image"]
            result_out[f"category{idx}"] = metadata["category"]
            result_out[f"cosine distance{idx}"] = distance
        return result_out

    @torch.inference_mode()
    def inference(self, image: np.ndarray):
        """Return the L2-normalised embedding of `image` as a nested list.

        Args:
            image: Image array as produced by `np.array` on an RGB PIL image.

        Returns:
            Nested Python list holding one embedding row per input image.
        """
        inputs = self.feature_extractor(images=image, return_tensors="pt").to(self.device)
        hidden = self.model(**inputs).last_hidden_state
        # Position 0 of the sequence is used as the image embedding
        # (presumably the CLS token - confirm against the model card).
        embedding = F.normalize(hidden[:, 0], p=2, dim=1)
        return embedding.cpu().tolist()

    def add_vectors(self, vectors, metadatas):
        """Store the first embedding of `vectors` under a fresh random id.

        Args:
            vectors: Nested list as returned by `inference`; only `vectors[0]`
                is stored.
            metadatas: Metadata dict saved alongside the embedding.
        """
        self.collection.add(
            embeddings=vectors[0],
            metadatas=metadatas,
            ids=str(uuid.uuid4()),
        )

    @staticmethod
    def _load_rgb(source) -> np.ndarray:
        """Open `source` with PIL, convert it to RGB and return it as an array."""
        # The context manager releases the underlying file handle right away
        # instead of leaving it to the garbage collector.
        with Image.open(source) as img:
            return np.array(img.convert("RGB"))

    def _cleanup_if_due(self, calls: int) -> None:
        """Run gc (and empty the CUDA cache) on every `_CLEANUP_INTERVAL`-th call."""
        if calls % self._CLEANUP_INTERVAL == 0:
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
|
|