from dotenv import load_dotenv load_dotenv(".env") import gradio as gr import os from llama_index.core import Settings, StorageContext, load_index_from_storage from documents_prep import load_json_documents, load_table_documents, load_image_documents from logger.my_logging import log_message, init_chunks_log, log_full_chunk_to_file from index_retriever import create_vector_index, create_query_engine import sys from config import ( HF_REPO_ID, HF_TOKEN, DOWNLOAD_DIR, CHUNKS_FILENAME, JSON_FILES_DIR, TABLE_DATA_DIR, IMAGE_DATA_DIR, DEFAULT_MODEL, AVAILABLE_MODELS, DEFAULT_RETRIEVAL_PARAMS ) from converters.converter import process_uploaded_file, convert_single_excel_to_json, convert_single_excel_to_csv from main_utils import * import shutil from config import INDEX_STORAGE_DIR retrieval_params = DEFAULT_RETRIEVAL_PARAMS.copy() def restart_system(): """Перезапуск системы для применения новых документов""" global query_engine, chunks_df, reranker, vector_index, current_model try: log_message("Начало перезапуска системы...") query_engine, chunks_df, reranker, vector_index, chunk_info = initialize_system( repo_id=HF_REPO_ID, hf_token=HF_TOKEN, download_dir=DOWNLOAD_DIR, json_files_dir=JSON_FILES_DIR, table_data_dir=TABLE_DATA_DIR, image_data_dir=IMAGE_DATA_DIR, use_json_instead_csv=True, force_rebuild=True ) if query_engine: log_message("Система успешно перезапущена") return "✅ Система успешно перезапущена! Новые документы загружены." else: return "❌ Ошибка при перезапуске системы" except Exception as e: error_msg = f"Ошибка перезапуска: {str(e)}" log_message(error_msg) return f"❌ {error_msg}" def initialize_system(repo_id, hf_token, download_dir, chunks_filename=None, json_files_dir=None, table_data_dir=None, image_data_dir=None, use_json_instead_csv=False, force_rebuild=False): try: log_message("Инициализация системы") from config import CHUNK_SIZE, CHUNK_OVERLAP from llama_index.core.text_splitter import TokenTextSplitter embed_model = get_embedding_model() llm = get_llm_model(DEFAULT_MODEL) reranker = get_reranker_model() Settings.embed_model = embed_model Settings.llm = llm Settings.text_splitter = TokenTextSplitter( chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP, separator=" ", backup_separators=["\n", ".", "!", "?"] ) vector_index = None all_documents = [] chunk_info = [] # --- ЛОГИКА ЗАГРУЗКИ / СОЗДАНИЯ ИНДЕКСА --- # Проверяем, существует ли индекс на диске index_exists = os.path.exists(INDEX_STORAGE_DIR) and os.listdir(INDEX_STORAGE_DIR) if index_exists and not force_rebuild: log_message(f"📂 Найден сохраненный индекс в {INDEX_STORAGE_DIR}. Загружаем...") try: # ЗАГРУЗКА С ДИСКА storage_context = StorageContext.from_defaults(persist_dir=INDEX_STORAGE_DIR) vector_index = load_index_from_storage(storage_context) log_message("✅ Индекс успешно загружен с диска (без пересборки).") # Восстанавливаем chunk_info из загруженного индекса (для UI) # Берем все узлы из docstore индекса docstore_nodes = vector_index.docstore.docs.values() all_documents = list(docstore_nodes) # Это будут Nodes, а не исходные Documents, но для UI пойдет except Exception as e: log_message(f"⚠️ Ошибка загрузки индекса: {e}. Будем строить заново.") force_rebuild = True # Если не загрузился, строим заново # Если индекса нет или попросили пересобрать if not index_exists or force_rebuild: log_message("🏗️ Построение индекса с нуля...") if os.path.exists(download_dir): shutil.rmtree(download_dir) os.makedirs(download_dir, exist_ok=True) if use_json_instead_csv and json_files_dir: log_message("Используем JSON файлы вместо CSV") from documents_prep import load_all_documents all_documents = load_all_documents( repo_id=repo_id, hf_token=hf_token, json_dir=json_files_dir, table_dir=table_data_dir if table_data_dir else "", image_dir=image_data_dir if image_data_dir else "" ) else: if chunks_filename: log_message("Загружаем данные из CSV") if table_data_dir: from documents_prep import load_table_documents table_chunks = load_table_documents(repo_id, hf_token, table_data_dir) log_message(f"Загружено {len(table_chunks)} табличных чанков") all_documents.extend(table_chunks) if image_data_dir: from documents_prep import load_image_documents image_documents = load_image_documents(repo_id, hf_token, image_data_dir) log_message(f"Загружено {len(image_documents)} документов изображений") all_documents.extend(image_documents) # --- 2. ОЧИСТКА МЕТАДАННЫХ (УДАЛЕНИЕ KEYWORDS) --- log_message("🧹 Очистка метаданных: удаление keywords и лишних полей...") for doc in all_documents: # 1. Удаляем keywords, если они есть if 'keywords' in doc.metadata: del doc.metadata['keywords'] # 2. ЖЕСТКО скрываем все служебные поля от эмбеддинга # Оставляем видимым для вектора только document_id (по умолчанию) doc.excluded_embed_metadata_keys = [ "table_identifier", "connection_type", "chunk_id", "section_id", "type", "image_number", "table_number", "row_start", "row_end", "is_complete_table", "file_path", "file_name", "section_path", "parent_section", "level", "table_title", "section", "keywords" # на случай если где-то остался ] # 3. Настраиваем метаданные для LLM (чтобы ответ был чище) doc.excluded_llm_metadata_keys = [ "section_path", "chunk_id", "connection_type", "table_identifier", "file_path", "is_complete_table" ] log_message(f"Метаданные очищены. Keywords удалены. Всего документов: {len(all_documents)}") # ----------------------------------------------------- # --- 📊 ОТЧЕТ О СОДЕРЖИМОМ БАЗЫ --- log_message("\n=== 📚 РЕЕСТР ДОКУМЕНТОВ В БАЗЕ ДАННЫХ ===") doc_stats = {} for doc in all_documents: doc_id = doc.metadata.get('document_id', 'UNKNOWN_ID') d_type = doc.metadata.get('type', 'text') # Нормализация типов для красивого отчета if 'table' in d_type: d_type = 'table' elif 'image' in d_type: d_type = 'image' else: d_type = 'text' if doc_id not in doc_stats: doc_stats[doc_id] = {'text': 0, 'table': 0, 'image': 0} doc_stats[doc_id][d_type] += 1 # Вывод таблицы в лог log_message(f"{'ДОКУМЕНТ (ID)':<40} | {'ТЕКСТ':<8} | {'ТАБЛИЦЫ':<8} | {'ИЗОБР.':<8}") log_message("-" * 75) sorted_ids = sorted(doc_stats.keys()) for doc_id in sorted_ids: s = doc_stats[doc_id] log_message(f"{doc_id:<40} | {s['text']:<8} | {s['table']:<8} | {s['image']:<8}") log_message(f"ИТОГО УНИКАЛЬНЫХ ДОКУМЕНТОВ: {len(sorted_ids)}") log_message("==========================================\n") # ---------------------------------- # --- 📝 ЗАПИСЬ ВСЕХ ЧАНКОВ В ФАЙЛ --- log_message("⏳ Начало записи всех чанков в all_chunks_debug.log...") init_chunks_log() for i, doc in enumerate(all_documents): log_full_chunk_to_file(doc, i, len(all_documents)) log_message("✅ Все чанки успешно записаны в лог-файл.") # ------------------------------------- vector_index = create_vector_index(all_documents) log_message(f"💾 Сохранение индекса на диск: {INDEX_STORAGE_DIR}...") vector_index.storage_context.persist(persist_dir=INDEX_STORAGE_DIR) log_message("✅ Индекс сохранен.") global retrieval_params log_message(f"Создание Query Engine с параметрами: {retrieval_params}") query_engine = create_query_engine( vector_index, vector_top_k=retrieval_params['vector_top_k'], bm25_top_k=retrieval_params['bm25_top_k'], similarity_cutoff=retrieval_params['similarity_cutoff'], hybrid_top_k=retrieval_params['hybrid_top_k'] ) chunk_info = [] for doc in all_documents: metadata = doc.metadata text_val = doc.text if hasattr(doc, 'text') else doc.get_content() chunk_info.append({ 'document_id': doc.metadata.get('document_id', 'unknown'), 'section_id': doc.metadata.get('section_id', 'unknown'), 'type': doc.metadata.get('type', 'text'), 'chunk_text': doc.text[:200] + '...' if len(doc.text) > 200 else doc.text, 'table_number': doc.metadata.get('table_number', ''), 'image_number': doc.metadata.get('image_number', ''), 'section': doc.metadata.get('section', ''), 'connection_type': doc.metadata.get('connection_type', '') }) log_message(f"Система успешно инициализирована") return query_engine, chunks_df, reranker, vector_index, chunk_info except Exception as e: log_message(f"Ошибка инициализации: {str(e)}") import traceback log_message(traceback.format_exc()) return None, None, None, None, [] def switch_model(model_name, vector_index): from llama_index.core import Settings from index_retriever import create_query_engine try: log_message(f"Переключение на модель: {model_name}") new_llm = get_llm_model(model_name) Settings.llm = new_llm if vector_index is not None: new_query_engine = create_query_engine(vector_index) log_message(f"Модель успешно переключена на: {model_name}") return new_query_engine, f"✅ Модель переключена на: {model_name}" else: return None, "❌ Ошибка: система не инициализирована" except Exception as e: error_msg = f"Ошибка переключения модели: {str(e)}" log_message(error_msg) return None, f"❌ {error_msg}" def create_query_engine(vector_index, vector_top_k=retrieval_params['vector_top_k'], bm25_top_k=retrieval_params['bm25_top_k'], similarity_cutoff=retrieval_params['similarity_cutoff'], hybrid_top_k=retrieval_params['hybrid_top_k'], ): try: from index_retriever import create_query_engine as create_index_query_engine # Передаем параметры дальше в реализацию из index_retriever query_engine = create_index_query_engine( vector_index=vector_index, vector_top_k=vector_top_k, bm25_top_k=bm25_top_k, similarity_cutoff=similarity_cutoff, hybrid_top_k=hybrid_top_k ) return query_engine except Exception as e: log_message(f"Ошибка создания query engine: {str(e)}") raise def main_answer_question(question): global query_engine, reranker, current_model, chunks_df, retrieval_params if not question.strip(): return ("
Пожалуйста, введите вопрос
", "
Источники появятся после обработки запроса
", "
Чанки появятся после обработки запроса
") try: answer_html, sources_html, chunks_html = answer_question( question, query_engine, reranker, current_model, chunks_df, rerank_top_k=retrieval_params['rerank_top_k'], similarity_cutoff=retrieval_params['similarity_cutoff'], rerank_threshold=retrieval_params['rerank_threshold'] ) return answer_html, sources_html, chunks_html except Exception as e: log_message(f"Ошибка при ответе на вопрос: {str(e)}") return (f"
Ошибка: {str(e)}
", "
Источники недоступны из-за ошибки
", "
Чанки недоступны из-за ошибки
") def update_retrieval_params(vector_top_k, bm25_top_k, similarity_cutoff, hybrid_top_k, rerank_top_k, rerank_threshold): global query_engine, vector_index, retrieval_params try: retrieval_params['vector_top_k'] = vector_top_k retrieval_params['bm25_top_k'] = bm25_top_k retrieval_params['similarity_cutoff'] = similarity_cutoff retrieval_params['hybrid_top_k'] = hybrid_top_k retrieval_params['rerank_top_k'] = rerank_top_k retrieval_params['rerank_threshold'] = rerank_threshold # Recreate query engine with new parameters if vector_index is not None: query_engine = create_query_engine( vector_index=vector_index, vector_top_k=vector_top_k, bm25_top_k=bm25_top_k, similarity_cutoff=similarity_cutoff, hybrid_top_k=hybrid_top_k ) log_message(f"Параметры поиска обновлены: vector_top_k={vector_top_k}, " f"bm25_top_k={bm25_top_k}, cutoff={similarity_cutoff}, " f"hybrid_top_k={hybrid_top_k}, rerank_top_k={rerank_top_k}") return f"✅ Параметры обновлены" else: return "❌ Система не инициализирована" except Exception as e: error_msg = f"Ошибка обновления параметров: {str(e)}" log_message(error_msg) return f"❌ {error_msg}" def retrieve_chunks(question: str, top_k: int = 20) -> list: from index_retriever import rerank_nodes global query_engine, reranker if query_engine is None: return [] try: retrieved_nodes = query_engine.retriever.retrieve(question) log_message(f"Получено {len(retrieved_nodes)} узлов") rerank_threshold = retrieval_params.get('rerank_threshold', 0.5) reranked_nodes = rerank_nodes( question, retrieved_nodes, reranker, top_k=top_k, rerank_threshold=rerank_threshold ) chunks_data = [] for i, node in enumerate(reranked_nodes): metadata = node.metadata if hasattr(node, 'metadata') else {} chunk = { 'rank': i + 1, 'document_id': metadata.get('document_id', 'unknown'), 'section_id': metadata.get('section_id', ''), 'section_path': metadata.get('section_path', ''), 'section_text': metadata.get('section_text', ''), 'type': metadata.get('type', 'text'), 'table_number': metadata.get('table_number', ''), 'image_number': metadata.get('image_number', ''), 'text': node.text } chunks_data.append(chunk) log_message(f"Возвращено {len(chunks_data)} чанков") return chunks_data except Exception as e: log_message(f"Ошибка получения чанков: {str(e)}") return [] def create_demo_interface(answer_question_func, switch_model_func, current_model, chunk_info=None): with gr.Blocks(title="AIEXP - AI Expert для нормативной документации", theme=gr.themes.Soft()) as demo: gr.api(retrieve_chunks, api_name="retrieve_chunks") gr.Markdown(""" # AIEXP - Artificial Intelligence Expert ## Инструмент для работы с нормативной документацией """) with gr.Tab("Поиск по нормативным документам"): gr.Markdown("### Задайте вопрос по нормативной документации") with gr.Row(): with gr.Column(scale=2): model_dropdown = gr.Dropdown( choices=list(AVAILABLE_MODELS.keys()), value=current_model, label="Выберите языковую модель", info="Выберите модель для генерации ответов" ) with gr.Column(scale=1): switch_btn = gr.Button("Переключить модель", variant="secondary") model_status = gr.Textbox( value=f"Текущая модель: {current_model}", label="Статус модели", interactive=False ) with gr.Row(): with gr.Column(scale=3): question_input = gr.Textbox( label="Ваш вопрос к базе знаний", placeholder="Введите вопрос по нормативным документам...", lines=3 ) ask_btn = gr.Button("Найти ответ", variant="primary", size="lg") gr.Examples( examples=[ "О чем этот рисунок: ГОСТ Р 50.04.07-2022 Приложение Л. Л.1.5 Рисунок Л.2", "Л.9 Формула в ГОСТ Р 50.04.07 - 2022 что и о чем там?", "Какой стандарт устанавливает порядок признания протоколов испытаний продукции в области использования атомной энергии?", "Кто несет ответственность за организацию и проведение признания протоколов испытаний продукции?", "В каких случаях могут быть признаны протоколы испытаний, проведенные лабораториями?", "В какой таблице можно найти информацию о методы исследований при аттестационных испытаниях технологии термической обработки заготовок из легированных сталей? Какой документ и какой раздел?" ], inputs=question_input ) with gr.Row(): with gr.Column(scale=2): answer_output = gr.HTML( label="", value=f"
Здесь появится ответ на ваш вопрос...
Текущая модель: {current_model}
", ) with gr.Column(scale=1): sources_output = gr.HTML( label="", value="
Здесь появятся релевантные чанки...
", ) with gr.Column(scale=1): chunks_output = gr.HTML( label="Релевантные чанки", value="
Здесь появятся релевантные чанки...
", ) with gr.Tab("⚙️ Параметры поиска"): gr.Markdown("### Настройка параметров векторного поиска и переранжирования") with gr.Row(): with gr.Column(): vector_top_k = gr.Slider( minimum=10, maximum=200, step=10, value=DEFAULT_RETRIEVAL_PARAMS['vector_top_k'], label="Vector Top K", info="Количество результатов из векторного поиска" ) with gr.Column(): bm25_top_k = gr.Slider( minimum=10, maximum=200, step=10, value=DEFAULT_RETRIEVAL_PARAMS['bm25_top_k'], label="BM25 Top K", info="Количество результатов из BM25 поиска" ) with gr.Row(): with gr.Column(): similarity_cutoff = gr.Slider( minimum=0.0, maximum=1.0, step=0.05, value=DEFAULT_RETRIEVAL_PARAMS['similarity_cutoff'], label="Similarity Cutoff", info="Минимальный порог схожести для векторного поиска" ) with gr.Column(): hybrid_top_k = gr.Slider( minimum=10, maximum=300, step=10, value=DEFAULT_RETRIEVAL_PARAMS['hybrid_top_k'], label="Hybrid Top K", info="Количество результатов из гибридного поиска" ) with gr.Row(): with gr.Column(): rerank_top_k = gr.Slider( minimum=5, maximum=100, step=5, value=DEFAULT_RETRIEVAL_PARAMS['rerank_top_k'], label="Rerank Top K", info="Количество результатов после переранжирования" ) with gr.Column(): rerank_threshold = gr.Slider( minimum=0.0, maximum=1.0, step=0.05, value=DEFAULT_RETRIEVAL_PARAMS['rerank_threshold'], label="Rerank Threshold (Stage 3)", info="Минимальная уверенность реранкера (0.0 - 1.0)" ) with gr.Row(): with gr.Column(): update_btn = gr.Button("Применить параметры", variant="primary") update_status = gr.Textbox( value="Параметры готовы к применению", label="Статус", interactive=False ) gr.Markdown(""" ### Рекомендации: - **Vector Top K**: Увеличьте для более полного поиска по семантике (50-100) - **BM25 Top K**: Увеличьте для лучшего поиска по ключевым словам (30-80) - **Similarity Cutoff**: Снизьте для более мягких критериев (0.3-0.6), повысьте для строгих (0.7-0.9) - **Hybrid Top K**: Объединённые результаты (100-150) - **Rerank Top K**: Финальные результаты (10-30) - **Rerank Threshold**: Снизьте для более широкого выбора (0.1-0.4), повысьте для точных ответов (0.5-0.8) """) update_btn.click( fn=update_retrieval_params, inputs=[vector_top_k, bm25_top_k, similarity_cutoff, hybrid_top_k, rerank_top_k, rerank_threshold], outputs=[update_status] ) gr.Markdown("### Текущие параметры:") current_params_display = gr.Textbox( value="", label="", interactive=False, lines=6 ) def display_current_params(): return f"""Vector Top K: {retrieval_params['vector_top_k']}\n BM25 Top K: {retrieval_params['bm25_top_k']}\n Similarity Cutoff: {retrieval_params['similarity_cutoff']}\n Hybrid Top K: {retrieval_params['hybrid_top_k']}\n Rerank Top K: {retrieval_params['rerank_top_k']}\n Rerank Threshold: {retrieval_params['rerank_threshold']} """ demo.load( fn=display_current_params, outputs=[current_params_display] ) update_btn.click( fn=display_current_params, outputs=[current_params_display] ) with gr.Tab("📤 Загрузка документов"): gr.Markdown(""" ### Загрузка новых документов в систему Выберите тип документа и загрузите файл. Система автоматически обработает и добавит его в базу знаний. """) with gr.Row(): with gr.Column(scale=2): file_type_radio = gr.Radio( choices=["Таблица", "Изображение (метаданные)", "JSON документ"], value="Таблица", label="Тип документа", info="Выберите тип загружаемого документа" ) file_upload = gr.File( label="Выберите файл", file_types=[".xlsx", ".xls", ".csv", ".json"], type="filepath" ) with gr.Row(): upload_btn = gr.Button("📤 Загрузить и обработать", variant="primary", size="lg") restart_btn = gr.Button("🔄 Перезапустить систему", variant="secondary", size="lg") upload_status = gr.Textbox( label="Статус загрузки", value="Ожидание загрузки файла...", interactive=False, lines=8 ) restart_status = gr.Textbox( label="Статус перезапуска", value="Система готова к работе", interactive=False, lines=2 ) with gr.Column(scale=1): gr.Markdown(""" ### Требования к файлам: **Таблицы (Excel → JSON):** - Формат: .xlsx или .xls - Обязательные колонки: - Номер таблицы - Обозначение документа - Раздел документа - Название таблицы **Изображения (Excel → CSV):** - Формат: .xlsx, .xls или .csv - Метаданные изображений **JSON документы:** - Формат: .json - Структурированные данные ### Процесс загрузки: 1. Выберите тип документа 2. Загрузите файл 3. Дождитесь обработки 4. Нажмите "Перезапустить систему" """) upload_btn.click( fn=process_uploaded_file, inputs=[file_upload, file_type_radio], outputs=[upload_status] ) restart_btn.click( fn=restart_system, inputs=[], outputs=[restart_status] ) switch_btn.click( fn=switch_model_func, inputs=[model_dropdown], outputs=[model_status] ) ask_btn.click( fn=answer_question_func, inputs=[question_input], outputs=[answer_output, sources_output, chunks_output] ) question_input.submit( fn=answer_question_func, inputs=[question_input], outputs=[answer_output, sources_output, chunks_output] ) return demo query_engine = None chunks_df = None reranker = None vector_index = None current_model = DEFAULT_MODEL def main_switch_model(model_name): global query_engine, vector_index, current_model new_query_engine, status_message = switch_model(model_name, vector_index) if new_query_engine: query_engine = new_query_engine current_model = model_name return status_message def main(): global query_engine, chunks_df, reranker, vector_index, current_model GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "") if GOOGLE_API_KEY: log_message("Использование Google API для модели генерации текста") else: log_message("Google API ключ не найден, использование локальной модели") log_message("Запуск AIEXP - AI Expert для нормативной документации") query_engine, chunks_df, reranker, vector_index, chunk_info = initialize_system( repo_id=HF_REPO_ID, hf_token=HF_TOKEN, download_dir=DOWNLOAD_DIR, json_files_dir=JSON_FILES_DIR, table_data_dir=TABLE_DATA_DIR, image_data_dir=IMAGE_DATA_DIR, use_json_instead_csv=True, ) if query_engine: log_message("Запуск веб-интерфейса") demo = create_demo_interface( answer_question_func=main_answer_question, switch_model_func=main_switch_model, current_model=current_model, chunk_info=chunk_info ) demo.api = "retrieve_chunks" demo.queue() demo.launch( server_name="0.0.0.0", server_port=7860, share=True, debug=False ) else: log_message("Невозможно запустить приложение из-за ошибки инициализации") sys.exit(1) if __name__ == "__main__": main()