Code_LLM / src /api /routes.py
AnatoliiG
fix api bugs
6eebe14
Raw
History Blame Contribute Delete
4.48 kB
import asyncio
import json
import logging
import threading
from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
from src.core.config import settings
from src.core.engine import engine
from src.utils.helpers import get_clean_text
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/chat/completions")
async def chat_completions(request: Request):
if not engine.llm:
raise HTTPException(status_code=500, detail="Model not loaded")
try:
data = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Invalid JSON")
# Бережно собираем сообщения, сохраняя служебные поля для Tool Calling
messages = []
for m in data.get("messages", []):
msg = {
"role": m.get("role", "user"),
"content": get_clean_text(m.get("content")),
}
if "tool_calls" in m:
msg["tool_calls"] = m["tool_calls"]
if "tool_call_id" in m:
msg["tool_call_id"] = m["tool_call_id"]
messages.append(msg)
# Параметры от агента
is_stream = data.get("stream", False)
stop = data.get("stop", [])
if isinstance(stop, str):
stop = [stop]
default_stops = ["<|im_end|>", "<|endoftext|>", "<|file_sep|>"]
for s in default_stops:
if s not in stop:
stop.append(s)
gen_kwargs = {
"max_tokens": data.get("max_tokens", settings.DEFAULT_MAX_TOKENS),
"temperature": data.get("temperature", settings.DEFAULT_TEMP),
"top_p": data.get("top_p", 0.95),
"stop": stop,
"stream": is_stream,
"tools": data.get("tools", None),
"tool_choice": data.get("tool_choice", None),
}
# Если Агент просит ответ целиком (stream=False)
if not is_stream:
loop = asyncio.get_running_loop()
try:
# Выполняем синхронный код в пуле потоков, чтобы не заблокировать FastAPI
response = await loop.run_in_executor(
None, lambda: engine.generate(messages, **gen_kwargs)
)
return JSONResponse(content=response)
except Exception as e:
logger.error(f"Sync generation error: {e}")
raise HTTPException(status_code=500, detail=str(e))
# Если это Чат (stream=True)
abort_event = threading.Event()
async def stream_generator():
queue = asyncio.Queue()
loop = asyncio.get_running_loop()
def worker():
try:
gen_kwargs["abort_event"] = abort_event
for chunk in engine.generate(messages, **gen_kwargs):
loop.call_soon_threadsafe(queue.put_nowait, chunk)
loop.call_soon_threadsafe(queue.put_nowait, None)
except Exception as e:
if not abort_event.is_set():
logger.error(f"Generation error: {e}")
loop.call_soon_threadsafe(queue.put_nowait, {"error": str(e)})
loop.run_in_executor(None, worker)
try:
while True:
if await request.is_disconnected():
abort_event.set()
break
try:
chunk = await asyncio.wait_for(queue.get(), timeout=0.1)
except asyncio.TimeoutError:
continue
if chunk is None:
yield "data: [DONE]\n\n"
break
if isinstance(chunk, dict) and "error" in chunk:
if abort_event.is_set():
break
err_json = json.dumps(
{"error": {"message": chunk["error"], "type": "internal_error"}}
)
yield f"data: {err_json}\n\n"
break
yield f"data: {json.dumps(chunk)}\n\n"
except asyncio.CancelledError:
abort_event.set()
raise
return StreamingResponse(
stream_generator(),
media_type="text/event-stream",
headers={
"X-Accel-Buffering": "no",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
)