| import numpy as np |
| import torch |
| import json |
| import pandas as pd |
| from tqdm import tqdm |
| from typing import List, Dict, Tuple, Set, Union, Optional |
| from langchain.docstore.document import Document |
| from langchain_community.vectorstores import FAISS |
| from langchain_community.vectorstores.faiss import DistanceStrategy |
| from langchain_core.embeddings.embeddings import Embeddings |
| from FlagEmbedding import BGEM3FlagModel |
|
|
| def setup_gpu_info() -> None: |
| print(f"Số lượng GPU khả dụng: {torch.cuda.device_count()}") |
| print(f"GPU hiện tại: {torch.cuda.current_device()}") |
| print(f"Tên GPU: {torch.cuda.get_device_name(0)}") |
|
|
| def load_model(model_name: str, use_fp16: bool = False) -> BGEM3FlagModel: |
| return BGEM3FlagModel(model_name, use_fp16=use_fp16) |
|
|
| def load_json_file(file_path: str) -> dict: |
| with open(file_path, 'r', encoding='utf-8') as f: |
| return json.load(f) |
|
|
| def load_jsonl_file(file_path: str) -> List[Dict]: |
| corpus = [] |
| with open(file_path, "r", encoding="utf-8") as file: |
| for line in file: |
| data = json.loads(line.strip()) |
| corpus.append(data) |
| return corpus |
|
|
| def extract_corpus_from_legal_documents(legal_data: dict) -> List[Dict]: |
| corpus = [] |
| for document in legal_data: |
| for article in document['articles']: |
| chunk = { |
| "law_id": document['law_id'], |
| "article_id": article['article_id'], |
| "title": article['title'], |
| "text": article['title'] + '\n' + article['text'] |
| } |
| corpus.append(chunk) |
| return corpus |
|
|
| def convert_corpus_to_documents(corpus: List[Dict[str, str]]) -> List[Document]: |
| documents = [] |
| for i in tqdm(range(len(corpus)), desc="Converting corpus to documents"): |
| context = corpus[i]['text'] |
| metadata = { |
| 'law_id': corpus[i]['law_id'], |
| 'article_id': corpus[i]['article_id'], |
| 'title': corpus[i]['title'] |
| } |
| documents.append(Document(page_content=context, metadata=metadata)) |
| return documents |
|
|
| class CustomEmbedding(Embeddings): |
| """Custom embedding class that uses the BGEM3FlagModel.""" |
| |
| def __init__(self, model: BGEM3FlagModel, batch_size: int = 1): |
| self.model = model |
| self.batch_size = batch_size |
| |
| def embed_documents(self, texts: List[str]) -> List[List[float]]: |
| embeddings = [] |
| for i in tqdm(range(0, len(texts), self.batch_size), desc="Embedding documents"): |
| batch_texts = texts[i:i+self.batch_size] |
| batch_embeddings = self._get_batch_embeddings(batch_texts) |
| embeddings.extend(batch_embeddings) |
| torch.cuda.empty_cache() |
| return np.vstack(embeddings) |
|
|
| def embed_query(self, text: str) -> List[float]: |
| embedding = self.model.encode(text, max_length=256)['dense_vecs'] |
| return embedding |
|
|
| def _get_batch_embeddings(self, texts: List[str]) -> List[List[float]]: |
| with torch.no_grad(): |
| outputs = self.model.encode(texts, batch_size=self.batch_size, max_length=2048)['dense_vecs'] |
| batch_embeddings = outputs |
| del outputs |
| return batch_embeddings |
|
|
|
|
| class VectorDB: |
| """Vector database for document retrieval.""" |
| |
| def __init__( |
| self, |
| documents: List[Document], |
| embedding: Embeddings, |
| vector_db=FAISS, |
| index_path: Optional[str] = None |
| ) -> None: |
| self.vector_db = vector_db |
| self.embedding = embedding |
| self.index_path = index_path |
| self.db = self._build_db(documents) |
|
|
| def _build_db(self, documents: List[Document]): |
| if self.index_path: |
| db = self.vector_db.load_local( |
| self.index_path, |
| self.embedding, |
| allow_dangerous_deserialization=True |
| ) |
| else: |
| db = self.vector_db.from_documents( |
| documents=documents, |
| embedding=self.embedding, |
| distance_strategy=DistanceStrategy.DOT_PRODUCT |
| ) |
| return db |
| |
| def get_retriever(self, search_type: str = "similarity", search_kwargs: dict = {"k": 10}): |
| retriever = self.db.as_retriever(search_type=search_type, search_kwargs=search_kwargs) |
| return retriever |
| |
| def save_local(self, folder_path: str) -> None: |
| self.db.save_local(folder_path) |
|
|
|
|
| def process_sample(sample: dict, retriever) -> List[int]: |
| question = sample['question'] |
| docs = retriever.invoke(question) |
| retrieved_article_full_ids = [ |
| docs[i].metadata['law_id'] + "#" + docs[i].metadata['article_id'] |
| for i in range(len(docs)) |
| ] |
| indexes = [] |
| for article in sample['relevant_articles']: |
| article_full_id = article['law_id'] + "#" + article['article_id'] |
| if article_full_id in retrieved_article_full_ids: |
| idx = retrieved_article_full_ids.index(article_full_id) + 1 |
| indexes.append(idx) |
| else: |
| indexes.append(0) |
| return indexes |
|
|
| def calculate_metrics(all_indexes: List[List[int]], num_samples: int, selected_keys: Set[str]) -> Dict[str, float]: |
| count = [len(indexes) for indexes in all_indexes] |
| result = {} |
| |
| for thres in [1, 3, 5, 10, 100]: |
| found = [[y for y in x if 0 < y <= thres] for x in all_indexes] |
| found_count = [len(x) for x in found] |
| acc = sum(1 for i in range(num_samples) if found_count[i] > 0) / num_samples |
| rec = sum(found_count[i] / count[i] for i in range(num_samples)) / num_samples |
| pre = sum(found_count[i] / thres for i in range(num_samples)) / num_samples |
| mrr = sum(1 / min(x) if x else 0 for x in found) / num_samples |
|
|
| if f"Accuracy@{thres}" in selected_keys: |
| result[f"Accuracy@{thres}"] = acc |
| if f"MRR@{thres}" in selected_keys: |
| result[f"MRR@{thres}"] = mrr |
| |
| return result |
|
|
|
|
| def save_results(result: Dict[str, float], output_path: str) -> None: |
| with open(output_path, "w", encoding="utf-8") as f: |
| json.dump(result, f, indent=4, ensure_ascii=False) |
| print(f"Results saved to {output_path}") |
|
|
|
|
| def main(): |
| setup_gpu_info() |
| model = load_model('AITeamVN/Vietnamese_Embedding', use_fp16=False) |
| samples = load_json_file('zalo_kaggle/train_question_answer.json')['items'] |
| legal_data = load_json_file('zalo_kaggle/legal_corpus.json') |
| |
| corpus = extract_corpus_from_legal_documents(legal_data) |
| documents = convert_corpus_to_documents(corpus) |
| embedding = CustomEmbedding(model, batch_size=1) |
| vectordb = VectorDB( |
| documents=documents, |
| embedding=embedding, |
| vector_db=FAISS, |
| index_path=None |
| ) |
| retriever = vectordb.get_retriever(search_type="similarity", search_kwargs={"k": 100}) |
| all_indexes = [] |
| for sample in tqdm(samples, desc="Processing samples"): |
| all_indexes.append(process_sample(sample, retriever)) |
| selected_keys = {"Accuracy@1", "Accuracy@3", "Accuracy@5", "Accuracy@10", "MRR@10", "Accuracy@100"} |
| result = calculate_metrics(all_indexes, len(samples), selected_keys) |
| print(result) |
| save_results(result, "zalo_kaggle/Vietnamese_Embedding.json") |
| if __name__ == "__main__": |
| main() |