import os
import re
import gradio as gr
from src.core.engine import engine
from src.utils.helpers import (
extract_search_query,
extract_text_from_file,
get_clean_text,
web_search,
)
def user_input(user_message, history):
if not user_message:
return None, history
history = history or []
history.append({"role": "user", "content": str(user_message)})
return "", history
def set_interactive(is_interactive):
return (
gr.update(
interactive=is_interactive,
placeholder="Ожидайте, Агент думает..."
if not is_interactive
else "Напиши сообщение...",
),
gr.update(interactive=is_interactive),
)
def bot_response(
history, system_prompt, temperature, max_tokens, use_search, uploaded_file
):
# --- 1. ДИНАМИЧЕСКИЙ ПРОМПТ И ПРЕДОХРАНИТЕЛЬ ---
actual_sys_prompt = system_prompt
if use_search:
system_guard = (
"\n\n[ВАЖНО]: Отвечай пользователю напрямую обычным текстом. "
"КАТЕГОРИЧЕСКИ ЗАПРЕЩАЕТСЯ выводить технические команды, массивы или JSON-вызовы (например [{'type': 'search'}]). "
"Если тебе нужно рассуждать перед ответом, оборачивай свои мысли СТРОГО в теги твои мысли."
)
else:
# Если галочка поиска выключена, удаляем из системного промпта призыв искать
actual_sys_prompt = actual_sys_prompt.replace(
"Если нужно узнать свежую информацию, используйте поиск.", ""
)
system_guard = (
"\n\n[СИСТЕМНОЕ УВЕДОМЛЕНИЕ]: ВНИМАНИЕ! Функция поиска в интернете сейчас ОТКЛЮЧЕНА. "
"Ты ДОЛЖЕН ответить, используя только свои внутренние знания. "
"НИ В КОЕМ СЛУЧАЕ не пытайся генерировать команды поиска (например [{'type': 'search'}]). "
"Просто дай лучший ответ, который можешь, основываясь на своей памяти. "
"Если тебе нужно рассуждать, используй теги мысли."
)
messages = [{"role": "system", "content": actual_sys_prompt + system_guard}]
file_info, file_content = "", ""
# --- 2. ОБРАБОТКА ФАЙЛА ---
if uploaded_file and os.path.exists(uploaded_file):
filename = os.path.basename(uploaded_file)
size_kb = os.path.getsize(uploaded_file) / 1024
file_info = f"📎 **Файл прочитан:** `{filename}` ({size_kb:.1f} KB)"
file_content = extract_text_from_file(uploaded_file)[:40000]
if len(file_content) >= 40000:
file_info += " *(загружен первый сегмент)*"
if "[Ошибка" in file_content:
file_info = f"❌ **Ошибка файла:** `{filename}`"
# --- 3. ОЧИСТКА И ФОРМИРОВАНИЕ ИСТОРИИ ---
for msg in history[-7:]:
content = str(msg["content"])
if msg["role"] == "assistant":
# Убираем плашки UI
if "---\n\n" in content:
content = content.split("---\n\n", 1)[-1]
# Тотально вырезаем теги размышлений и HTML спойлеры
content = re.sub(r".*?", "", content, flags=re.DOTALL)
content = re.sub(r".*?", "", content, flags=re.DOTALL)
# Вырезаем технические массивы поиска, если они проскочили в прошлых ответах
content = re.sub(
r"\[\s*\{.*?['\"]type['\"]\s*:\s*['\"]search['\"].*?\}\s*\]",
"",
content,
flags=re.DOTALL,
)
content = content.strip()
if not content:
content = "*(ответ скрыт)*"
messages.append({"role": msg["role"], "content": get_clean_text(content)})
history.append({"role": "assistant", "content": "⏳ Инициализация..."})
yield history
# --- 4. АГЕНТ ПОИСКА (РОУТЕР) ---
if use_search:
search_info = ""
history[-1]["content"] = (
file_info + "\n" if file_info else ""
) + "🤔 Агент анализирует необходимость поиска..."
yield history
# Находим последний вопрос пользователя
last_user_msg = ""
for msg in reversed(history):
if msg["role"] == "user":
last_user_msg = msg["content"]
break
# Промпт Роутера: кладем инструкцию прямо в USER, чтобы модель не могла её проигнорировать.
# Используем XML теги, LLM справляются с ними лучше, чем с JSON.
agent_messages = [
{
"role": "user",
"content": (
f"Пользователь спросил: «{last_user_msg}»\n\n"
"Определи, нужен ли поиск в интернете для ответа на этот запрос.\n"
"Любые вопросы про недавние события, атаки (например, на НПЗ), новости и точные факты ТРЕБУЮТ поиска.\n\n"
"ОТВЕТЬ СТРОГО ОДНИМ ИЗ ВАРИАНТОВ (без лишних слов):\n"
"Вариант 1 (поиск нужен): твой поисковый запрос\n"
"Вариант 2 (поиск не нужен): "
),
}
]
try:
eval_response = engine.generate(
messages=agent_messages,
max_tokens=400, # <--- Увеличили, чтобы модель успела дописать тег поиска, если задумается
temperature=0.0,
stream=False,
)
raw_response = eval_response["choices"][0]["message"]["content"]
clean_query = extract_search_query(raw_response)
if clean_query:
search_info = f'🌐 Ищем: *"{clean_query}"*...'
history[-1]["content"] = (
f"{file_info + '\n' if file_info else ''}{search_info}"
)
yield history
search_results = web_search(clean_query)
if search_results:
search_info = f'🌐 Найдено {len(search_results)} результатов по запросу *"{clean_query}"*'
search_context = "СВЕЖИЕ РЕЗУЛЬТАТЫ ПОИСКА ИЗ ИНТЕРНЕТА:\n\n"
for i, r in enumerate(search_results, 1):
search_context += f"{i}. {r['title']} ({r['url']})\nСниппет: {r['snippet']}\n\n"
messages[0]["content"] += (
f"\n\n{search_context}\n\n[УВЕДОМЛЕНИЕ]: Поиск выполнен. Сформулируй ответ на основе контекста выше."
)
else:
search_info = f'🌐 Поиск по запросу *"{clean_query}"* не дал результатов (возможно сработал лимит DuckDuckGo).'
else:
search_info = "⚡ Агент ответит из своих знаний (поиск не нужен)."
except Exception as e:
print(f"Ошибка Роутера: {e}")
search_info = "⚡ Ошибка роутера. Отвечаю из базы знаний."
# Добавляем контент файла в начало (чтобы модель опиралась на него при ответе)
if file_content:
messages[0]["content"] += (
f"\n\nСодержимое файла '{os.path.basename(uploaded_file)}':\n\n{file_content}"
)
status_header = (file_info + "\n" if file_info else "") + (
search_info + "\n" if search_info else ""
)
if status_header:
status_header += "---\n\n"
history[-1]["content"] = status_header + "⏳ Генерация ответа..."
yield history
# --- 5. СТРИМИНГ И УМНЫЙ UI ---
try:
stream = engine.generate(
messages=messages,
max_tokens=max_tokens,
temperature=temperature,
repeat_penalty=1.15, # Штраф за зацикливание текста
stream=True,
)
partial_text = ""
for chunk in stream:
delta = chunk["choices"][0].get("delta", {})
if delta.get("content"):
partial_text += delta["content"]
display_text = partial_text
# Умный UI для тегов
if "" in partial_text and "" not in partial_text:
display_text = (
partial_text.replace(
"",
"🧠 Идет анализ (в процессе)...
\n\n",
)
+ "\n\n "
)
elif "" in partial_text:
display_text = partial_text.replace(
"",
"🧠 Анализ завершен (нажмите, чтобы раскрыть)
\n\n",
).replace(" ", "\n\n\n\n")
history[-1]["content"] = status_header + display_text
yield history
except Exception as e:
history[-1]["content"] = status_header + f"\n\n❌ Ошибка генерации: {str(e)}"
yield history