"""FastAPI 백엔드 — /api/ask 4구성 호출. 루트 구조: GET /api/health — 헬스체크 GET /api/dataset/summary — 데이터셋 메타 (검토건 표, GP 표) POST /api/ask — 질문 → 3개 답변(Python/Sonnet/Gemma) + route POST /api/register — 사용자 등록 검토건 → 세션 내 그래프 merge GET /api/download/{type} — paper/graph/ttl 정적 파일 환경변수: - LLM_BACKEND (ollama|hf_inference|transformers_local) - ANTHROPIC_API_KEY - HF_TOKEN - HF_GEMMA_MODEL (default: google/gemma-3-4b-it) - OLLAMA_MODEL (default: gemma4:e4b) """ from __future__ import annotations import os import sys import threading import time from pathlib import Path from typing import Any, Dict, List, Optional from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, JSONResponse from pydantic import BaseModel # 경로 셋업 — Docker 컨테이너(/app/api/main.py)와 로컬 개발(active/hf_app/api/main.py) 둘 다 지원 API_DIR = Path(__file__).resolve().parent HF_APP_DIR = API_DIR.parent # 컨테이너: /app, 로컬: active/hf_app # code/rag_engine 위치 — 컨테이너는 /app/code, 로컬은 active/code _container_code = HF_APP_DIR / "code" _local_code = HF_APP_DIR.parent / "code" CODE_DIR = _container_code if _container_code.exists() else _local_code sys.path.insert(0, str(CODE_DIR)) # api 폴더 (llm_adapters.py 위치) — 같은 폴더지만 uvicorn 모듈 import 형식이라 명시 필요 sys.path.insert(0, str(API_DIR)) # 데이터 위치 — 컨테이너는 /app/active/ontology, 로컬은 active/ontology _container_data = HF_APP_DIR / "active" / "ontology" _local_data = HF_APP_DIR.parent / "ontology" # Dockerfile에서도 data/를 /app/active/ontology로 COPY함 (HF_APP_DIR/active/ontology) ONTOLOGY_DIR = _container_data if _container_data.exists() else _local_data import rag_engine import semantic_search as ss import llm_adapters # 자산 파일 (다운로드 카드용 — assets/는 LFS 추가 전까지 미존재) PAPER_PATH = HF_APP_DIR / "assets" / "paper_v6.pdf" ONT_GRAPH_PNG = HF_APP_DIR / "assets" / "ontology_class_hierarchy_v1_10.png" TTL_PATH = ONTOLOGY_DIR / "investment_ontology_v1_10.ttl" JSONL_PATH = ONTOLOGY_DIR / "regulations_chunks_v14.jsonl" ALIAS_PATH = ONTOLOGY_DIR / "alias_dictionary.json" LOOKUP_PATH = ONTOLOGY_DIR / "risk_weight_lookup.json" # ============================================================ # 앱 + 데이터 로드 # ============================================================ app = FastAPI(title="LP출자 온톨로지 LLM 프로토타입 API", version="1.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], # 개발 편의. 운영 시 도메인 제한. allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # 전역 데이터 — 가벼운 것(TTL/청크/alias/lookup)만 startup에 로드. # KoSimCSE 임베딩은 메모리 스파이크가 커서 첫 질문 때 lazy 로드 (OOM 회피). print(f"[startup] LLM_BACKEND = {llm_adapters.LLM_BACKEND}") print(f"[startup] 데이터 로드: {TTL_PATH}") GRAPH = rag_engine.load_ttl(TTL_PATH) CHUNKS = rag_engine.load_chunks(JSONL_PATH) ALIAS = rag_engine.load_alias(ALIAS_PATH) LOOKUP = rag_engine.load_lookup(LOOKUP_PATH) print(f"[startup] 트리플 {len(GRAPH):,}, 청크 {len(CHUNKS)}") print(f"[startup] ready (KoSimCSE는 첫 질문 시 lazy 로드)") _kosimcse_warmed = False _kosimcse_lock = threading.Lock() def _ensure_kosimcse(): """KoSimCSE 모델·임베딩을 첫 사용 시 한 번만 로드. double-checked locking — 락 없이 플래그만 체크하면 동시 요청 2개가 동시에 `if not _kosimcse_warmed`를 통과해 warm_up을 중복 실행한다. 그러면 274청크를 CPU로 두 번 임베딩 → 메모리 2배 스파이크 → OOM 위험. 락으로 직렬화해 warm_up이 정확히 한 번만 돌게 한다. """ global _kosimcse_warmed if _kosimcse_warmed: return with _kosimcse_lock: if _kosimcse_warmed: return print("[lazy] KoSimCSE warm-up 시작...") ss.warm_up(CHUNKS) _kosimcse_warmed = True print("[lazy] KoSimCSE warm-up 완료") # ============================================================ # 스키마 # ============================================================ class AskRequest(BaseModel): question: str mode: str = "axisB" # "axisB" | "keyword" class AnswerCol(BaseModel): answer: str route: str elapsed_sec: float = 0.0 class AskResponse(BaseModel): question: str mode: str route: str python: AnswerCol # raw 컨텍스트 (LLM 호출 없음) sonnet: AnswerCol gemma: AnswerCol # ============================================================ # 엔드포인트 # ============================================================ @app.get("/api/health") def health(): return { "ok": True, "triples": len(GRAPH), "chunks": len(CHUNKS), "llm_backend": llm_adapters.LLM_BACKEND, } @app.get("/api/dataset/summary") def dataset_summary(): """데이터셋 요약 — 검토건 표, GP 표.""" from rag_engine import ( query_all_investments_with_label, query_investment_branches, query_investment_meta, ) invs = query_all_investments_with_label(GRAPH) fund_rows, gp_set = [], set() for inv in invs: if inv.get("n_branches", 0) == 0: continue meta = query_investment_meta(GRAPH, inv["iri"]) branches = query_investment_branches(GRAPH, inv["iri"]) amt = sum(int(float(b.get("amount", 0))) for b in branches) // 100000000 first_stage = branches[0].get("stage_label", "-") if branches else "-" fund_rows.append({ "id": str(inv["iri"]).split("_")[-1], "fund": inv["fund_label"][:25], "amount_eok": amt, "stage": first_stage, "branches": len(branches), }) if meta.get("gp_label"): gp_set.add(meta["gp_label"]) return { "funds": fund_rows, "gps": [{"name": n, "id": f"gp-{i+1:03d}"} for i, n in enumerate(sorted(gp_set))], } @app.post("/api/ask", response_model=AskResponse) def ask(req: AskRequest): q = req.question.strip() if not q: raise HTTPException(400, "question is empty") mode = req.mode if req.mode in ("axisB", "keyword") else "axisB" # axisB 모드는 KoSimCSE 의미검색 필요 → 첫 사용 시 lazy 로드 if mode == "axisB": _ensure_kosimcse() # Python column: LLM 답변 생성 없이 raw 컨텍스트 try: if mode == "axisB": r_py = rag_engine.answer_question_llm( q, GRAPH, CHUNKS, ALIAS, LOOKUP, use_anthropic=True, use_gemma_gen=False, use_semantic=True, ) else: r_py = rag_engine.answer_question( q, GRAPH, CHUNKS, ALIAS, LOOKUP, use_gemma=False, ) except Exception as e: r_py = {"answer": f"오류: {e}", "route": "error"} route_str = r_py.get("route", "") # Sonnet column if os.environ.get("ANTHROPIC_API_KEY"): r_son = _call_sonnet(q, r_py.get("answer", "")) else: r_son = AnswerCol(answer="⚠️ ANTHROPIC_API_KEY 미설정", route="no_api", elapsed_sec=0) # Gemma column r_gem = _call_gemma_col(q, r_py.get("answer", "")) return AskResponse( question=q, mode=mode, route=route_str, python=AnswerCol(answer=r_py.get("answer", ""), route=route_str, elapsed_sec=0.0), sonnet=r_son, gemma=r_gem, ) @app.get("/api/download/{kind}") def download(kind: str): path_map = { "paper": (PAPER_PATH, "LP출자_온톨로지_RAG_paper.pdf", "application/pdf"), "graph": (ONT_GRAPH_PNG, "ontology_graph.png", "image/png"), "ttl": (TTL_PATH, "investment_ontology_v1_10.ttl", "text/turtle"), } if kind not in path_map: raise HTTPException(404, f"unknown kind: {kind}") path, filename, media_type = path_map[kind] if not path.exists(): # HF Docker 빌드는 LFS 파일(assets)을 빌드 컨텍스트에 실제 내용으로 넣지 않을 수 있다. # 이 경우 런타임에 Space 레포에서 직접 fetch (HF_TOKEN). 빌드/LFS smudge 의존 제거. try: from huggingface_hub import hf_hub_download fetched = hf_hub_download( repo_id="ForStream/ontology-prototype", repo_type="space", filename=f"assets/{path.name}", token=os.environ.get("HF_TOKEN") or None, ) return FileResponse(fetched, filename=filename, media_type=media_type) except Exception as e: raise HTTPException(404, f"file not found: {path} (런타임 fetch 실패: {e})") return FileResponse(str(path), filename=filename, media_type=media_type) # ============================================================ # 내부 헬퍼 # ============================================================ def _call_sonnet(question: str, context: str) -> AnswerCol: import anthropic start = time.time() try: client = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"]) system = ( "당신은 한국 LP출자 도메인 전문 금융회사 직원의 보조 AI입니다. " "아래 컨텍스트의 사실을 그대로 유지하면서 자연스러운 한국어로 답변을 다듬어 주세요." ) resp = client.messages.create( model=os.environ.get("ANTHROPIC_MODEL", "claude-sonnet-4-6"), max_tokens=1500, temperature=0.3, system=system, messages=[{"role": "user", "content": f"[컨텍스트]\n{context}\n\n[질문]\n{question}\n\n[답변]"}], ) text = "".join(b.text for b in resp.content if hasattr(b, "text")) return AnswerCol(answer=text, route="sonnet", elapsed_sec=time.time() - start) except Exception as e: return AnswerCol(answer=f"Sonnet 오류: {e}", route="error", elapsed_sec=time.time() - start) def _call_gemma_col(question: str, context: str) -> AnswerCol: start = time.time() answer, ok, info = llm_adapters.call_gemma(question, context) if ok: return AnswerCol(answer=answer, route="gemma", elapsed_sec=time.time() - start) return AnswerCol(answer=f"Gemma 오류: {info}", route="error", elapsed_sec=time.time() - start) # ============================================================ # 정적 파일 — React 빌드 결과 (컨테이너/로컬 경로 모두 탐색) # ============================================================ _dist_candidates = [ HF_APP_DIR / "web" / "dist", # 로컬: active/hf_app/web/dist HF_APP_DIR / "hf_app" / "web" / "dist", # 컨테이너 현 Dockerfile: /app/hf_app/web/dist Path("/app/hf_app/web/dist"), Path("/app/web/dist"), ] WEB_DIST = next((p for p in _dist_candidates if p.exists()), None) print(f"[startup] WEB_DIST = {WEB_DIST}") if WEB_DIST is not None: from fastapi.staticfiles import StaticFiles app.mount("/", StaticFiles(directory=str(WEB_DIST), html=True), name="web") else: # dist 못 찾아도 헬스체크가 통과하도록 루트 라우트 제공 (재시작 루프 방지) from fastapi.responses import HTMLResponse @app.get("/") def _root_fallback(): return HTMLResponse( "
프론트엔드 빌드(web/dist)를 찾지 못했습니다. API는 /api/* 에서 동작합니다.
" f"탐색 경로: {[str(p) for p in _dist_candidates]}
" )