Spaces:
Running
Running
File size: 11,841 Bytes
2b50ae3 ec47b7f 2b50ae3 f5f43a0 2b50ae3 f5f43a0 2b50ae3 5955ef8 2b50ae3 f5f43a0 2b50ae3 f5f43a0 2ba4c4a f5f43a0 2b50ae3 72ceb13 2b50ae3 72ceb13 ec47b7f 72ceb13 ec47b7f 72ceb13 ec47b7f 72ceb13 2b50ae3 72ceb13 2b50ae3 76adee7 2b50ae3 7eedd82 2b50ae3 7eedd82 2b50ae3 7eedd82 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 | """FastAPI 백엔드 — /api/ask 4구성 호출.
루트 구조:
GET /api/health — 헬스체크
GET /api/dataset/summary — 데이터셋 메타 (검토건 표, GP 표)
POST /api/ask — 질문 → 3개 답변(Python/Sonnet/Gemma) + route
POST /api/register — 사용자 등록 검토건 → 세션 내 그래프 merge
GET /api/download/{type} — paper/graph/ttl 정적 파일
환경변수:
- LLM_BACKEND (ollama|hf_inference|transformers_local)
- ANTHROPIC_API_KEY
- HF_TOKEN
- HF_GEMMA_MODEL (default: google/gemma-3-4b-it)
- OLLAMA_MODEL (default: gemma4:e4b)
"""
from __future__ import annotations
import os
import sys
import threading
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel
# 경로 셋업 — Docker 컨테이너(/app/api/main.py)와 로컬 개발(active/hf_app/api/main.py) 둘 다 지원
API_DIR = Path(__file__).resolve().parent
HF_APP_DIR = API_DIR.parent # 컨테이너: /app, 로컬: active/hf_app
# code/rag_engine 위치 — 컨테이너는 /app/code, 로컬은 active/code
_container_code = HF_APP_DIR / "code"
_local_code = HF_APP_DIR.parent / "code"
CODE_DIR = _container_code if _container_code.exists() else _local_code
sys.path.insert(0, str(CODE_DIR))
# api 폴더 (llm_adapters.py 위치) — 같은 폴더지만 uvicorn 모듈 import 형식이라 명시 필요
sys.path.insert(0, str(API_DIR))
# 데이터 위치 — 컨테이너는 /app/active/ontology, 로컬은 active/ontology
_container_data = HF_APP_DIR / "active" / "ontology"
_local_data = HF_APP_DIR.parent / "ontology"
# Dockerfile에서도 data/를 /app/active/ontology로 COPY함 (HF_APP_DIR/active/ontology)
ONTOLOGY_DIR = _container_data if _container_data.exists() else _local_data
import rag_engine
import semantic_search as ss
import llm_adapters
# 자산 파일 (다운로드 카드용 — assets/는 LFS 추가 전까지 미존재)
PAPER_PATH = HF_APP_DIR / "assets" / "paper_v6.pdf"
ONT_GRAPH_PNG = HF_APP_DIR / "assets" / "ontology_class_hierarchy_v1_10.png"
TTL_PATH = ONTOLOGY_DIR / "investment_ontology_v1_10.ttl"
JSONL_PATH = ONTOLOGY_DIR / "regulations_chunks_v14.jsonl"
ALIAS_PATH = ONTOLOGY_DIR / "alias_dictionary.json"
LOOKUP_PATH = ONTOLOGY_DIR / "risk_weight_lookup.json"
# ============================================================
# 앱 + 데이터 로드
# ============================================================
app = FastAPI(title="LP출자 온톨로지 LLM 프로토타입 API", version="1.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 개발 편의. 운영 시 도메인 제한.
allow_credentials=True, allow_methods=["*"], allow_headers=["*"],
)
# 전역 데이터 — 가벼운 것(TTL/청크/alias/lookup)만 startup에 로드.
# KoSimCSE 임베딩은 메모리 스파이크가 커서 첫 질문 때 lazy 로드 (OOM 회피).
print(f"[startup] LLM_BACKEND = {llm_adapters.LLM_BACKEND}")
print(f"[startup] 데이터 로드: {TTL_PATH}")
GRAPH = rag_engine.load_ttl(TTL_PATH)
CHUNKS = rag_engine.load_chunks(JSONL_PATH)
ALIAS = rag_engine.load_alias(ALIAS_PATH)
LOOKUP = rag_engine.load_lookup(LOOKUP_PATH)
print(f"[startup] 트리플 {len(GRAPH):,}, 청크 {len(CHUNKS)}")
print(f"[startup] ready (KoSimCSE는 첫 질문 시 lazy 로드)")
_kosimcse_warmed = False
_kosimcse_lock = threading.Lock()
def _ensure_kosimcse():
"""KoSimCSE 모델·임베딩을 첫 사용 시 한 번만 로드.
double-checked locking — 락 없이 플래그만 체크하면 동시 요청 2개가
동시에 `if not _kosimcse_warmed`를 통과해 warm_up을 중복 실행한다.
그러면 274청크를 CPU로 두 번 임베딩 → 메모리 2배 스파이크 → OOM 위험.
락으로 직렬화해 warm_up이 정확히 한 번만 돌게 한다.
"""
global _kosimcse_warmed
if _kosimcse_warmed:
return
with _kosimcse_lock:
if _kosimcse_warmed:
return
print("[lazy] KoSimCSE warm-up 시작...")
ss.warm_up(CHUNKS)
_kosimcse_warmed = True
print("[lazy] KoSimCSE warm-up 완료")
# ============================================================
# 스키마
# ============================================================
class AskRequest(BaseModel):
question: str
mode: str = "axisB" # "axisB" | "keyword"
class AnswerCol(BaseModel):
answer: str
route: str
elapsed_sec: float = 0.0
class AskResponse(BaseModel):
question: str
mode: str
route: str
python: AnswerCol # raw 컨텍스트 (LLM 호출 없음)
sonnet: AnswerCol
gemma: AnswerCol
# ============================================================
# 엔드포인트
# ============================================================
@app.get("/api/health")
def health():
return {
"ok": True,
"triples": len(GRAPH),
"chunks": len(CHUNKS),
"llm_backend": llm_adapters.LLM_BACKEND,
}
@app.get("/api/dataset/summary")
def dataset_summary():
"""데이터셋 요약 — 검토건 표, GP 표."""
from rag_engine import (
query_all_investments_with_label, query_investment_branches,
query_investment_meta,
)
invs = query_all_investments_with_label(GRAPH)
fund_rows, gp_set = [], set()
for inv in invs:
if inv.get("n_branches", 0) == 0:
continue
meta = query_investment_meta(GRAPH, inv["iri"])
branches = query_investment_branches(GRAPH, inv["iri"])
amt = sum(int(float(b.get("amount", 0))) for b in branches) // 100000000
first_stage = branches[0].get("stage_label", "-") if branches else "-"
fund_rows.append({
"id": str(inv["iri"]).split("_")[-1],
"fund": inv["fund_label"][:25],
"amount_eok": amt,
"stage": first_stage,
"branches": len(branches),
})
if meta.get("gp_label"):
gp_set.add(meta["gp_label"])
return {
"funds": fund_rows,
"gps": [{"name": n, "id": f"gp-{i+1:03d}"} for i, n in enumerate(sorted(gp_set))],
}
@app.post("/api/ask", response_model=AskResponse)
def ask(req: AskRequest):
q = req.question.strip()
if not q:
raise HTTPException(400, "question is empty")
mode = req.mode if req.mode in ("axisB", "keyword") else "axisB"
# axisB 모드는 KoSimCSE 의미검색 필요 → 첫 사용 시 lazy 로드
if mode == "axisB":
_ensure_kosimcse()
# Python column: LLM 답변 생성 없이 raw 컨텍스트
try:
if mode == "axisB":
r_py = rag_engine.answer_question_llm(
q, GRAPH, CHUNKS, ALIAS, LOOKUP,
use_anthropic=True, use_gemma_gen=False, use_semantic=True,
)
else:
r_py = rag_engine.answer_question(
q, GRAPH, CHUNKS, ALIAS, LOOKUP, use_gemma=False,
)
except Exception as e:
r_py = {"answer": f"오류: {e}", "route": "error"}
route_str = r_py.get("route", "")
# Sonnet column
if os.environ.get("ANTHROPIC_API_KEY"):
r_son = _call_sonnet(q, r_py.get("answer", ""))
else:
r_son = AnswerCol(answer="⚠️ ANTHROPIC_API_KEY 미설정", route="no_api", elapsed_sec=0)
# Gemma column
r_gem = _call_gemma_col(q, r_py.get("answer", ""))
return AskResponse(
question=q, mode=mode, route=route_str,
python=AnswerCol(answer=r_py.get("answer", ""), route=route_str, elapsed_sec=0.0),
sonnet=r_son,
gemma=r_gem,
)
@app.get("/api/download/{kind}")
def download(kind: str):
path_map = {
"paper": (PAPER_PATH, "LP출자_온톨로지_RAG_paper.pdf", "application/pdf"),
"graph": (ONT_GRAPH_PNG, "ontology_graph.png", "image/png"),
"ttl": (TTL_PATH, "investment_ontology_v1_10.ttl", "text/turtle"),
}
if kind not in path_map:
raise HTTPException(404, f"unknown kind: {kind}")
path, filename, media_type = path_map[kind]
if not path.exists():
# HF Docker 빌드는 LFS 파일(assets)을 빌드 컨텍스트에 실제 내용으로 넣지 않을 수 있다.
# 이 경우 런타임에 Space 레포에서 직접 fetch (HF_TOKEN). 빌드/LFS smudge 의존 제거.
try:
from huggingface_hub import hf_hub_download
fetched = hf_hub_download(
repo_id="ForStream/ontology-prototype", repo_type="space",
filename=f"assets/{path.name}", token=os.environ.get("HF_TOKEN") or None,
)
return FileResponse(fetched, filename=filename, media_type=media_type)
except Exception as e:
raise HTTPException(404, f"file not found: {path} (런타임 fetch 실패: {e})")
return FileResponse(str(path), filename=filename, media_type=media_type)
# ============================================================
# 내부 헬퍼
# ============================================================
def _call_sonnet(question: str, context: str) -> AnswerCol:
import anthropic
start = time.time()
try:
client = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])
system = (
"당신은 한국 LP출자 도메인 전문 금융회사 직원의 보조 AI입니다. "
"아래 컨텍스트의 사실을 그대로 유지하면서 자연스러운 한국어로 답변을 다듬어 주세요."
)
resp = client.messages.create(
model=os.environ.get("ANTHROPIC_MODEL", "claude-sonnet-4-6"),
max_tokens=1500, temperature=0.3, system=system,
messages=[{"role": "user", "content": f"[컨텍스트]\n{context}\n\n[질문]\n{question}\n\n[답변]"}],
)
text = "".join(b.text for b in resp.content if hasattr(b, "text"))
return AnswerCol(answer=text, route="sonnet", elapsed_sec=time.time() - start)
except Exception as e:
return AnswerCol(answer=f"Sonnet 오류: {e}", route="error", elapsed_sec=time.time() - start)
def _call_gemma_col(question: str, context: str) -> AnswerCol:
start = time.time()
answer, ok, info = llm_adapters.call_gemma(question, context)
if ok:
return AnswerCol(answer=answer, route="gemma", elapsed_sec=time.time() - start)
return AnswerCol(answer=f"Gemma 오류: {info}", route="error", elapsed_sec=time.time() - start)
# ============================================================
# 정적 파일 — React 빌드 결과 (컨테이너/로컬 경로 모두 탐색)
# ============================================================
_dist_candidates = [
HF_APP_DIR / "web" / "dist", # 로컬: active/hf_app/web/dist
HF_APP_DIR / "hf_app" / "web" / "dist", # 컨테이너 현 Dockerfile: /app/hf_app/web/dist
Path("/app/hf_app/web/dist"),
Path("/app/web/dist"),
]
WEB_DIST = next((p for p in _dist_candidates if p.exists()), None)
print(f"[startup] WEB_DIST = {WEB_DIST}")
if WEB_DIST is not None:
from fastapi.staticfiles import StaticFiles
app.mount("/", StaticFiles(directory=str(WEB_DIST), html=True), name="web")
else:
# dist 못 찾아도 헬스체크가 통과하도록 루트 라우트 제공 (재시작 루프 방지)
from fastapi.responses import HTMLResponse
@app.get("/")
def _root_fallback():
return HTMLResponse(
"<h1>LP출자 온톨로지 프로토타입 API</h1>"
"<p>프론트엔드 빌드(web/dist)를 찾지 못했습니다. API는 /api/* 에서 동작합니다.</p>"
f"<p>탐색 경로: {[str(p) for p in _dist_candidates]}</p>"
)
|