Spaces:
Sleeping
Sleeping
| import pypdf | |
| from docx import Document | |
| from transformers.pipelines import pipeline | |
| from sentence_transformers import SentenceTransformer | |
| import streamlit as st | |
| import numpy as np | |
| import os | |
| emb_model = "intfloat/multilingual-e5-base" | |
| def extract_text(file): | |
| text = "" | |
| # Check if the input is a file path (string) or a file-like object | |
| if isinstance(file, str): | |
| file_name = os.path.basename(file) | |
| try: | |
| with open(file, 'rb') as f: # Open in binary mode | |
| if file_name.endswith(".pdf"): | |
| print('Processing pdf file.................\n') | |
| reader = pypdf.PdfReader(f) | |
| for page in reader.pages: | |
| text += page.extract_text() + "\\n" | |
| elif file_name.endswith(".docx"): | |
| document = Document(f) | |
| print('Processing DOCX file.................\n') | |
| for paragraph in document.paragraphs: | |
| if paragraph.text.strip(): # Check if the paragraph is not empty | |
| text += paragraph.text + "\\n" | |
| except FileNotFoundError: | |
| st.error(f"Error: File not found at {file}") | |
| return "" | |
| except Exception as e: | |
| st.error(f"Error reading {file_name}: {e}") | |
| return "" | |
| else: # Assume it's a file-like object (e.g., from Streamlit file_uploader) | |
| file_name = file.name | |
| try: | |
| if file_name.endswith(".pdf"): | |
| reader = pypdf.PdfReader(file) | |
| for page in reader.pages: | |
| text += page.extract_text() + "\\n" | |
| elif file_name.endswith(".docx"): | |
| document = Document(file) | |
| for paragraph in document.paragraphs: | |
| text += paragraph.text + "\\n" | |
| except Exception as e: | |
| st.error(f"Error reading {file_name}: {e}") | |
| return "" | |
| return text | |
| def chunk_text(text, chunk_size=500, overlap=50): | |
| chunks = [] | |
| start = 0 | |
| while start < len(text): | |
| end = start + chunk_size | |
| chunk = text[start:end] | |
| chunks.append(chunk) | |
| start = end - overlap | |
| return chunks | |
| def get_embeddings(texts)-> np.ndarray: | |
| try: | |
| model = SentenceTransformer(emb_model, trust_remote_code=True) | |
| embeddings = model.encode(texts) | |
| print(f"Generated {len(embeddings)} embeddings.") | |
| return embeddings | |
| except Exception as e: | |
| st.error(f"Error generating embeddings: {e}") | |
| return np.array([]) | |
| def process_files(files): | |
| all_chunks = [] | |
| all_embeddings = [] | |
| chunks_metadata = [] | |
| for file in files: | |
| print(f"Processing file: {file.name if hasattr(file, 'name') else os.path.basename(file)}") | |
| text = extract_text(file) | |
| if not text: # Skip files that failed to process | |
| print(f"Skipping file {file.name if hasattr(file, 'name') else os.path.basename(file)} due to extraction error.") | |
| continue | |
| print(f"Chunking text...{file.name if hasattr(file, 'name') else os.path.basename(file)}\n") | |
| chunks = chunk_text(text) | |
| embeddings = get_embeddings(chunks) | |
| # if not embeddings: # Skip files that failed to embed | |
| # continue | |
| all_chunks.extend(chunks) | |
| all_embeddings.extend(embeddings) | |
| for i, chunk in enumerate(chunks): | |
| chunks_metadata.append({"file_name": file.name if hasattr(file, 'name') else os.path.basename(file), "chunk_index": i}) | |
| return all_chunks, all_embeddings, chunks_metadata |