Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| from st_copy_to_clipboard import st_copy_to_clipboard | |
| import re | |
| import numpy as np | |
| from doc_preprocessing import process_files, get_embeddings | |
| from vector_DB import VectorDatabase # Import the class | |
| from llm_interaction import get_answer | |
# Module-level vector store (FAISS) shared by the functions in this file.
vector_database = VectorDatabase()

# Metadata for the indexed chunks; main() rebinds this after files are processed.
chunks_metadata = list()
def process_query(query, k=10):
    """Look up the chunks closest to *query* in the module-level vector database.

    Args:
        query: Free-text question entered by the user.
        k: Number of results requested from ``vector_database.query``.
            Defaults to 10, the value that used to be hard-coded.

    Returns:
        Whatever ``vector_database.query`` returns for the query embedding,
        or the string ``"Please upload files first."`` when the database is
        empty. Callers must check for the string case before iterating.
    """
    if vector_database.is_empty():
        return "Please upload files first."

    print('Query:', query)
    # get_embeddings works on a batch, so wrap the query and unwrap the result.
    query_embedding = get_embeddings([query])[0]
    print('Asking Queries..................')
    return vector_database.query(query_embedding, k=k)
def normalize_line_breaks(text):
    """Turn escaped newlines into real ones.

    Every two-character sequence made of a backslash followed by ``n`` is
    replaced by a real newline padded with one space on each side. Real
    newline characters already present in *text* are left untouched.
    """
    escaped_newline = "\\n"
    pieces = text.split(escaped_newline)
    return " \n ".join(pieces)
def _chunk_at(chunks, index):
    """Return ``chunks[index]``, or ``None`` when *index* is out of range."""
    if 0 <= index < len(chunks):
        return chunks[index]
    return None


def display_results(results, chunks):
    """Render every result whose score is below 0.5 in the Streamlit page.

    For each displayed result this writes a numbered heading, the source
    file name with a percentage derived from the score, the chunk text and
    a copy-to-clipboard widget. Two buttons let the user prepend the
    previous chunk or append the next one when such a neighbour exists.

    Args:
        results: Iterable of mappings with the keys ``'score'``,
            ``'file_name'``, ``'chunk_text'`` and ``'chunk_index'``. A plain
            string (the message ``process_query`` returns when nothing is
            indexed) is shown as a warning instead.
        chunks: Sequence of chunk texts indexed by ``result['chunk_index']``.

    Returns:
        None.
    """
    # process_query hands back a message string when the database is empty;
    # iterating over it would fail with TypeError on result['score'].
    if isinstance(results, str):
        st.warning(results)
        return

    cpt = 1
    for result in results:
        # NOTE(review): score looks like a distance (lower is better) - confirm
        # against vector_DB. Only results under the 0.5 threshold are shown.
        if not result['score'] < 0.5:
            continue

        st.subheader(f"Réponse {cpt} :")
        similarity = round(1. / (1 + result['score']) * 100, 2)
        st.write(f"Source File: {result['file_name']}, Score: {similarity}%")

        text_to_display = result['chunk_text']
        # Explicit bounds checks replace the former silent `except IndexError`.
        previous_chunk_text = _chunk_at(chunks, result['chunk_index'] - 1)
        next_chunk_text = _chunk_at(chunks, result['chunk_index'] + 1)

        col1, col2 = st.columns(2)
        with col1:
            if previous_chunk_text is not None and st.button(
                "Ajouter la portion de texte précédente", key=f"before_{cpt}"
            ):
                # Presumably neighbouring chunks overlap by 50 characters, so
                # the shared tail is dropped - TODO confirm with the chunker.
                text_to_display = previous_chunk_text[:-50] + result['chunk_text']
        with col2:
            if next_chunk_text is not None and st.button(
                "Ajouter la portion de texte suivante", key=f"after_{cpt}"
            ):
                text_to_display = result['chunk_text'] + next_chunk_text[50:]

        st.write("Citations depuis le document :")
        normalized_text = normalize_line_breaks(text_to_display)
        st.write(normalized_text)
        st_copy_to_clipboard(normalized_text)
        cpt += 1
def main():
    """Run the Streamlit page: upload documents, index them, answer a query.

    Uploaded PDF/Word files are passed to ``process_files`` and the
    resulting embeddings, chunks and metadata are added to the module-level
    vector database. When a query is entered, the matches are displayed; if
    ``process_query`` returns its message string instead of results, that
    message is shown as a warning.
    """
    global chunks_metadata

    st.title("Document Query App")
    uploaded_files = st.file_uploader(
        "Upload PDF or Word files", accept_multiple_files=True, type=["pdf", "docx"]
    )
    query = st.text_input("Enter your query:")

    # Bind up front: a query typed before any upload used to hit an
    # UnboundLocalError on all_chunks further down.
    all_chunks = []
    if uploaded_files:
        all_chunks, all_embeddings, chunks_metadata = process_files(uploaded_files)
        vector_database.add_data(all_embeddings, all_chunks, chunks_metadata)
        st.session_state.files_processed = True

    if query:
        results = process_query(query)
        if isinstance(results, str):
            # process_query signals "nothing indexed yet" with a plain message.
            st.warning(results)
        else:
            display_results(results, all_chunks)


if __name__ == "__main__":
    main()