from sentence_transformers import SentenceTransformer
from llama_index.core import SimpleDirectoryReader
from huggingface_hub import login
from typing import List, Tuple
from dotenv import load_dotenv
from docx import Document
import numpy as np
import os
import tempfile
import logging
|
|
|
|
# Point the Hugging Face cache at a writable location (useful on read-only or ephemeral filesystems).
os.environ["HF_HOME"] = "/tmp/huggingface_cache"

cache_dir = os.environ["HF_HOME"]
if not os.path.exists(cache_dir):
    os.makedirs(cache_dir)

# Load environment variables from a .env file (if present) and authenticate with the Hugging Face Hub.
load_dotenv()
huggingface_token = os.getenv('HUGGINGFACE_HUB_TOKEN')

if huggingface_token:
    login(token=huggingface_token, add_to_git_credential=True, write_permission=True)
else:
    raise ValueError("Hugging Face token is not set. Please set the HUGGINGFACE_HUB_TOKEN environment variable.")
|
|
# Embedding model ('nvidia/NV-Embed-v1' was the earlier candidate; nomic-embed-text-v1.5 is used here).
model_name = 'nomic-ai/nomic-embed-text-v1.5'
model = SentenceTransformer(model_name, trust_remote_code=True)
model.max_seq_length = 4096
model.tokenizer.padding_side = "right"
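
# Quick sanity check (sketch, commented out so it does not run on import): the sample sentence
# below is illustrative only.
# sample_vec = model.encode("Semantic chunking groups related sentences together.")
# print(sample_vec.shape)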
|
|
|
|
def read_document(file_content: bytes, file_id: str, file_format: str) -> str:
    """Extract text content from a document file depending on its format."""
    try:
        # Write the raw bytes to a temporary file so the format-specific readers can open it by path.
        with tempfile.TemporaryDirectory() as temp_dir:
            file_path = os.path.join(temp_dir, f"document_{file_id}.{file_format}")

            with open(file_path, 'wb') as temp_file:
                temp_file.write(file_content)

            # Dispatch to the appropriate extractor based on the file extension.
            if file_format.lower() == 'docx':
                text_content = extract_text_from_docx(file_path)
            elif file_format.lower() == 'pdf':
                text_content = extract_text_from_pdf(file_path)
            elif file_format.lower() in ['txt', 'md', 'csv', 'xlsx', 'pptx']:
                reader = SimpleDirectoryReader(input_files=[file_path])
                documents = reader.load_data()
                text_content = documents[0].text if documents else ''
            else:
                raise ValueError(f"Unsupported file format: {file_format}")

        if text_content:
            return text_content
        else:
            raise ValueError("No content extracted from the document.")

    except Exception as e:
        logging.error(f"Error reading document: {e}")
        raise
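
# Example usage (sketch, commented out): 'sample.pdf' and 'doc-001' are illustrative placeholders,
# not files or ids defined elsewhere in this module.
# with open('sample.pdf', 'rb') as f:
#     raw_bytes = f.read()
# text = read_document(raw_bytes, file_id='doc-001', file_format='pdf')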
|
|
def extract_text_from_docx(file_path: str) -> str:
    """Extract text from a DOCX file."""
    try:
        doc = Document(file_path)
        full_text = [para.text for para in doc.paragraphs]
        return '\n'.join(full_text)
    except Exception as e:
        logging.error(f"Error extracting text from DOCX file: {e}")
        raise
|
|
def extract_text_from_pdf(file_path: str) -> str:
    """Extract text from a PDF file."""
    import pdfplumber
    try:
        with pdfplumber.open(file_path) as pdf:
            # extract_text() returns None for pages with no extractable text, so substitute an empty string.
            full_text = [page.extract_text() or '' for page in pdf.pages]
        return '\n'.join(full_text).strip()
    except Exception as e:
        logging.error(f"Error extracting text from PDF file: {e}")
        raise
|
|
|
|
|
|
def cumulative_semantic_chunking(text: str, max_chunk_size: int, similarity_threshold: float,
                                 embedding_model: SentenceTransformer = model) -> List[str]:
    """Cumulative semantic chunking using sentence embeddings."""
    # Naive sentence split on full stops; empty fragments are dropped.
    sentences = [s.strip() for s in text.split('.') if s.strip()]
    if not sentences:
        return []

    sentence_embeddings = embedding_model.encode(sentences)

    chunks = []
    current_chunk = sentences[0]
    current_embedding = sentence_embeddings[0]

    for sentence, embedding in zip(sentences[1:], sentence_embeddings[1:]):
        combined_chunk = current_chunk + '. ' + sentence

        # Word-count-weighted average of the running chunk embedding and the new sentence embedding.
        current_len = len(current_chunk.split())
        sentence_len = len(sentence.split())
        combined_embedding = (current_embedding * current_len + embedding * sentence_len) / (current_len + sentence_len)

        # Cosine similarity between the running chunk embedding and the candidate combined embedding.
        similarity = np.dot(current_embedding, combined_embedding) / (
            np.linalg.norm(current_embedding) * np.linalg.norm(combined_embedding))

        if similarity >= similarity_threshold and len(combined_chunk) <= max_chunk_size:
            # Keep growing the current chunk while it stays semantically coherent and within the size limit.
            current_chunk = combined_chunk
            current_embedding = combined_embedding
        else:
            chunks.append(current_chunk.strip())
            current_chunk = sentence
            current_embedding = embedding

    if current_chunk:
        chunks.append(current_chunk.strip())

    return chunks
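
# Example usage (sketch, commented out): the text, size limit, and threshold below are illustrative values.
# chunks = cumulative_semantic_chunking(
#     text="First sentence. Second sentence on the same topic. A completely different topic.",
#     max_chunk_size=1000,
#     similarity_threshold=0.7,
# )
# print(chunks)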
|
|
|
|
def embed_chunks(chunks: List[str]) -> Tuple[List[np.ndarray], int]:
    """Embed the chunks with the SentenceTransformer model and return the embeddings and total token count."""
    total_tokens = 0
    embeddings = []

    for chunk in chunks:
        # Count tokens with the model's own tokenizer so the total reflects what the model actually processes.
        tokens = model.tokenizer.encode(chunk, add_special_tokens=False)
        total_tokens += len(tokens)
        embedding = model.encode(chunk)
        embeddings.append(embedding)

    return embeddings, total_tokens
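

# Minimal end-to-end sketch, runnable as a script. The sample text, chunk size, and 0.7 threshold are
# illustrative assumptions, not values prescribed elsewhere in this module.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    sample_text = (
        "Semantic chunking groups related sentences into a single chunk. "
        "It compares sentence embeddings to decide where one chunk ends and the next begins. "
        "Cooking pasta, by contrast, requires boiling water and a pinch of salt."
    )
    sample_chunks = cumulative_semantic_chunking(sample_text, max_chunk_size=500, similarity_threshold=0.7)
    sample_embeddings, token_count = embed_chunks(sample_chunks)
    print(f"{len(sample_chunks)} chunks, {token_count} tokens total")
    for chunk, emb in zip(sample_chunks, sample_embeddings):
        print(f"- ({emb.shape[0]}-dim embedding) {chunk[:60]}...")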