| import os |
| import re |
|
|
| import gradio as gr |
|
|
| from src.core.engine import engine |
| from src.utils.helpers import ( |
| extract_search_query, |
| extract_text_from_file, |
| get_clean_text, |
| web_search, |
| ) |
|
|
|
|
| def user_input(user_message, history): |
| if not user_message: |
| return None, history |
| history = history or [] |
| history.append({"role": "user", "content": str(user_message)}) |
| return "", history |
|
|
|
|
| def set_interactive(is_interactive): |
| return ( |
| gr.update( |
| interactive=is_interactive, |
| placeholder="Ожидайте, Агент думает..." |
| if not is_interactive |
| else "Напиши сообщение...", |
| ), |
| gr.update(interactive=is_interactive), |
| ) |
|
|
|
|
| def bot_response( |
| history, system_prompt, temperature, max_tokens, use_search, uploaded_file |
| ): |
| |
| actual_sys_prompt = system_prompt |
|
|
| if use_search: |
| system_guard = ( |
| "\n\n[ВАЖНО]: Отвечай пользователю напрямую обычным текстом. " |
| "КАТЕГОРИЧЕСКИ ЗАПРЕЩАЕТСЯ выводить технические команды, массивы или JSON-вызовы (например [{'type': 'search'}]). " |
| "Если тебе нужно рассуждать перед ответом, оборачивай свои мысли СТРОГО в теги <think>твои мысли</think>." |
| ) |
| else: |
| |
| actual_sys_prompt = actual_sys_prompt.replace( |
| "Если нужно узнать свежую информацию, используйте поиск.", "" |
| ) |
|
|
| system_guard = ( |
| "\n\n[СИСТЕМНОЕ УВЕДОМЛЕНИЕ]: ВНИМАНИЕ! Функция поиска в интернете сейчас ОТКЛЮЧЕНА. " |
| "Ты ДОЛЖЕН ответить, используя только свои внутренние знания. " |
| "НИ В КОЕМ СЛУЧАЕ не пытайся генерировать команды поиска (например [{'type': 'search'}]). " |
| "Просто дай лучший ответ, который можешь, основываясь на своей памяти. " |
| "Если тебе нужно рассуждать, используй теги <think>мысли</think>." |
| ) |
|
|
| messages = [{"role": "system", "content": actual_sys_prompt + system_guard}] |
| file_info, file_content = "", "" |
|
|
| |
| if uploaded_file and os.path.exists(uploaded_file): |
| filename = os.path.basename(uploaded_file) |
| size_kb = os.path.getsize(uploaded_file) / 1024 |
| file_info = f"📎 **Файл прочитан:** `{filename}` ({size_kb:.1f} KB)" |
|
|
| file_content = extract_text_from_file(uploaded_file)[:40000] |
| if len(file_content) >= 40000: |
| file_info += " *(загружен первый сегмент)*" |
| if "[Ошибка" in file_content: |
| file_info = f"❌ **Ошибка файла:** `{filename}`" |
|
|
| |
| for msg in history[-7:]: |
| content = str(msg["content"]) |
| if msg["role"] == "assistant": |
| |
| if "---\n\n" in content: |
| content = content.split("---\n\n", 1)[-1] |
|
|
| |
| content = re.sub(r"<details.*?>.*?</details>", "", content, flags=re.DOTALL) |
| content = re.sub(r"<think>.*?</think>", "", content, flags=re.DOTALL) |
|
|
| |
| content = re.sub( |
| r"\[\s*\{.*?['\"]type['\"]\s*:\s*['\"]search['\"].*?\}\s*\]", |
| "", |
| content, |
| flags=re.DOTALL, |
| ) |
|
|
| content = content.strip() |
| if not content: |
| content = "*(ответ скрыт)*" |
|
|
| messages.append({"role": msg["role"], "content": get_clean_text(content)}) |
|
|
| history.append({"role": "assistant", "content": "⏳ Инициализация..."}) |
| yield history |
|
|
| |
| if use_search: |
| search_info = "" |
| history[-1]["content"] = ( |
| file_info + "\n" if file_info else "" |
| ) + "🤔 Агент анализирует необходимость поиска..." |
| yield history |
|
|
| |
| last_user_msg = "" |
| for msg in reversed(history): |
| if msg["role"] == "user": |
| last_user_msg = msg["content"] |
| break |
|
|
| |
| |
| agent_messages = [ |
| { |
| "role": "user", |
| "content": ( |
| f"Пользователь спросил: «{last_user_msg}»\n\n" |
| "Определи, нужен ли поиск в интернете для ответа на этот запрос.\n" |
| "Любые вопросы про недавние события, атаки (например, на НПЗ), новости и точные факты ТРЕБУЮТ поиска.\n\n" |
| "ОТВЕТЬ СТРОГО ОДНИМ ИЗ ВАРИАНТОВ (без лишних слов):\n" |
| "Вариант 1 (поиск нужен): <search>твой поисковый запрос</search>\n" |
| "Вариант 2 (поиск не нужен): <no_search>" |
| ), |
| } |
| ] |
|
|
| try: |
| eval_response = engine.generate( |
| messages=agent_messages, |
| max_tokens=400, |
| temperature=0.0, |
| stream=False, |
| ) |
|
|
| raw_response = eval_response["choices"][0]["message"]["content"] |
| clean_query = extract_search_query(raw_response) |
|
|
| if clean_query: |
| search_info = f'🌐 Ищем: *"{clean_query}"*...' |
| history[-1]["content"] = ( |
| f"{file_info + '\n' if file_info else ''}{search_info}" |
| ) |
| yield history |
|
|
| search_results = web_search(clean_query) |
|
|
| if search_results: |
| search_info = f'🌐 Найдено {len(search_results)} результатов по запросу *"{clean_query}"*' |
| search_context = "СВЕЖИЕ РЕЗУЛЬТАТЫ ПОИСКА ИЗ ИНТЕРНЕТА:\n\n" |
| for i, r in enumerate(search_results, 1): |
| search_context += f"{i}. {r['title']} ({r['url']})\nСниппет: {r['snippet']}\n\n" |
|
|
| messages[0]["content"] += ( |
| f"\n\n{search_context}\n\n[УВЕДОМЛЕНИЕ]: Поиск выполнен. Сформулируй ответ на основе контекста выше." |
| ) |
| else: |
| search_info = f'🌐 Поиск по запросу *"{clean_query}"* не дал результатов (возможно сработал лимит DuckDuckGo).' |
| else: |
| search_info = "⚡ Агент ответит из своих знаний (поиск не нужен)." |
|
|
| except Exception as e: |
| print(f"Ошибка Роутера: {e}") |
| search_info = "⚡ Ошибка роутера. Отвечаю из базы знаний." |
|
|
| |
| if file_content: |
| messages[0]["content"] += ( |
| f"\n\nСодержимое файла '{os.path.basename(uploaded_file)}':\n\n{file_content}" |
| ) |
|
|
| status_header = (file_info + "\n" if file_info else "") + ( |
| search_info + "\n" if search_info else "" |
| ) |
| if status_header: |
| status_header += "---\n\n" |
|
|
| history[-1]["content"] = status_header + "⏳ Генерация ответа..." |
| yield history |
|
|
| |
| try: |
| stream = engine.generate( |
| messages=messages, |
| max_tokens=max_tokens, |
| temperature=temperature, |
| repeat_penalty=1.15, |
| stream=True, |
| ) |
| partial_text = "" |
| for chunk in stream: |
| delta = chunk["choices"][0].get("delta", {}) |
| if delta.get("content"): |
| partial_text += delta["content"] |
| display_text = partial_text |
|
|
| |
| if "<think>" in partial_text and "</think>" not in partial_text: |
| display_text = ( |
| partial_text.replace( |
| "<think>", |
| "<details open><summary><i>🧠 Идет анализ (в процессе)...</i></summary>\n\n", |
| ) |
| + "\n\n</details>" |
| ) |
| elif "</think>" in partial_text: |
| display_text = partial_text.replace( |
| "<think>", |
| "<details><summary><i>🧠 Анализ завершен (нажмите, чтобы раскрыть)</i></summary>\n\n", |
| ).replace("</think>", "\n\n</details>\n\n") |
|
|
| history[-1]["content"] = status_header + display_text |
| yield history |
|
|
| except Exception as e: |
| history[-1]["content"] = status_header + f"\n\n❌ Ошибка генерации: {str(e)}" |
| yield history |
|
|