import os import re import gradio as gr from src.core.engine import engine from src.utils.helpers import ( extract_search_query, extract_text_from_file, get_clean_text, web_search, ) def user_input(user_message, history): if not user_message: return None, history history = history or [] history.append({"role": "user", "content": str(user_message)}) return "", history def set_interactive(is_interactive): return ( gr.update( interactive=is_interactive, placeholder="Ожидайте, Агент думает..." if not is_interactive else "Напиши сообщение...", ), gr.update(interactive=is_interactive), ) def bot_response( history, system_prompt, temperature, max_tokens, use_search, uploaded_file ): # --- 1. ДИНАМИЧЕСКИЙ ПРОМПТ И ПРЕДОХРАНИТЕЛЬ --- actual_sys_prompt = system_prompt if use_search: system_guard = ( "\n\n[ВАЖНО]: Отвечай пользователю напрямую обычным текстом. " "КАТЕГОРИЧЕСКИ ЗАПРЕЩАЕТСЯ выводить технические команды, массивы или JSON-вызовы (например [{'type': 'search'}]). " "Если тебе нужно рассуждать перед ответом, оборачивай свои мысли СТРОГО в теги твои мысли." ) else: # Если галочка поиска выключена, удаляем из системного промпта призыв искать actual_sys_prompt = actual_sys_prompt.replace( "Если нужно узнать свежую информацию, используйте поиск.", "" ) system_guard = ( "\n\n[СИСТЕМНОЕ УВЕДОМЛЕНИЕ]: ВНИМАНИЕ! Функция поиска в интернете сейчас ОТКЛЮЧЕНА. " "Ты ДОЛЖЕН ответить, используя только свои внутренние знания. " "НИ В КОЕМ СЛУЧАЕ не пытайся генерировать команды поиска (например [{'type': 'search'}]). " "Просто дай лучший ответ, который можешь, основываясь на своей памяти. " "Если тебе нужно рассуждать, используй теги мысли." ) messages = [{"role": "system", "content": actual_sys_prompt + system_guard}] file_info, file_content = "", "" # --- 2. ОБРАБОТКА ФАЙЛА --- if uploaded_file and os.path.exists(uploaded_file): filename = os.path.basename(uploaded_file) size_kb = os.path.getsize(uploaded_file) / 1024 file_info = f"📎 **Файл прочитан:** `{filename}` ({size_kb:.1f} KB)" file_content = extract_text_from_file(uploaded_file)[:40000] if len(file_content) >= 40000: file_info += " *(загружен первый сегмент)*" if "[Ошибка" in file_content: file_info = f"❌ **Ошибка файла:** `{filename}`" # --- 3. ОЧИСТКА И ФОРМИРОВАНИЕ ИСТОРИИ --- for msg in history[-7:]: content = str(msg["content"]) if msg["role"] == "assistant": # Убираем плашки UI if "---\n\n" in content: content = content.split("---\n\n", 1)[-1] # Тотально вырезаем теги размышлений и HTML спойлеры content = re.sub(r".*?", "", content, flags=re.DOTALL) content = re.sub(r".*?", "", content, flags=re.DOTALL) # Вырезаем технические массивы поиска, если они проскочили в прошлых ответах content = re.sub( r"\[\s*\{.*?['\"]type['\"]\s*:\s*['\"]search['\"].*?\}\s*\]", "", content, flags=re.DOTALL, ) content = content.strip() if not content: content = "*(ответ скрыт)*" messages.append({"role": msg["role"], "content": get_clean_text(content)}) history.append({"role": "assistant", "content": "⏳ Инициализация..."}) yield history # --- 4. АГЕНТ ПОИСКА (РОУТЕР) --- if use_search: search_info = "" history[-1]["content"] = ( file_info + "\n" if file_info else "" ) + "🤔 Агент анализирует необходимость поиска..." yield history # Находим последний вопрос пользователя last_user_msg = "" for msg in reversed(history): if msg["role"] == "user": last_user_msg = msg["content"] break # Промпт Роутера: кладем инструкцию прямо в USER, чтобы модель не могла её проигнорировать. # Используем XML теги, LLM справляются с ними лучше, чем с JSON. agent_messages = [ { "role": "user", "content": ( f"Пользователь спросил: «{last_user_msg}»\n\n" "Определи, нужен ли поиск в интернете для ответа на этот запрос.\n" "Любые вопросы про недавние события, атаки (например, на НПЗ), новости и точные факты ТРЕБУЮТ поиска.\n\n" "ОТВЕТЬ СТРОГО ОДНИМ ИЗ ВАРИАНТОВ (без лишних слов):\n" "Вариант 1 (поиск нужен): твой поисковый запрос\n" "Вариант 2 (поиск не нужен): " ), } ] try: eval_response = engine.generate( messages=agent_messages, max_tokens=400, # <--- Увеличили, чтобы модель успела дописать тег поиска, если задумается temperature=0.0, stream=False, ) raw_response = eval_response["choices"][0]["message"]["content"] clean_query = extract_search_query(raw_response) if clean_query: search_info = f'🌐 Ищем: *"{clean_query}"*...' history[-1]["content"] = ( f"{file_info + '\n' if file_info else ''}{search_info}" ) yield history search_results = web_search(clean_query) if search_results: search_info = f'🌐 Найдено {len(search_results)} результатов по запросу *"{clean_query}"*' search_context = "СВЕЖИЕ РЕЗУЛЬТАТЫ ПОИСКА ИЗ ИНТЕРНЕТА:\n\n" for i, r in enumerate(search_results, 1): search_context += f"{i}. {r['title']} ({r['url']})\nСниппет: {r['snippet']}\n\n" messages[0]["content"] += ( f"\n\n{search_context}\n\n[УВЕДОМЛЕНИЕ]: Поиск выполнен. Сформулируй ответ на основе контекста выше." ) else: search_info = f'🌐 Поиск по запросу *"{clean_query}"* не дал результатов (возможно сработал лимит DuckDuckGo).' else: search_info = "⚡ Агент ответит из своих знаний (поиск не нужен)." except Exception as e: print(f"Ошибка Роутера: {e}") search_info = "⚡ Ошибка роутера. Отвечаю из базы знаний." # Добавляем контент файла в начало (чтобы модель опиралась на него при ответе) if file_content: messages[0]["content"] += ( f"\n\nСодержимое файла '{os.path.basename(uploaded_file)}':\n\n{file_content}" ) status_header = (file_info + "\n" if file_info else "") + ( search_info + "\n" if search_info else "" ) if status_header: status_header += "---\n\n" history[-1]["content"] = status_header + "⏳ Генерация ответа..." yield history # --- 5. СТРИМИНГ И УМНЫЙ UI --- try: stream = engine.generate( messages=messages, max_tokens=max_tokens, temperature=temperature, repeat_penalty=1.15, # Штраф за зацикливание текста stream=True, ) partial_text = "" for chunk in stream: delta = chunk["choices"][0].get("delta", {}) if delta.get("content"): partial_text += delta["content"] display_text = partial_text # Умный UI для тегов if "" in partial_text and "" not in partial_text: display_text = ( partial_text.replace( "", "
🧠 Идет анализ (в процессе)...\n\n", ) + "\n\n
" ) elif "
" in partial_text: display_text = partial_text.replace( "", "
🧠 Анализ завершен (нажмите, чтобы раскрыть)\n\n", ).replace("", "\n\n
\n\n") history[-1]["content"] = status_header + display_text yield history except Exception as e: history[-1]["content"] = status_header + f"\n\n❌ Ошибка генерации: {str(e)}" yield history