Spaces:
Running
Running
| """FastAPI 백엔드 — /api/ask 4구성 호출. | |
| 루트 구조: | |
| GET /api/health — 헬스체크 | |
| GET /api/dataset/summary — 데이터셋 메타 (검토건 표, GP 표) | |
| POST /api/ask — 질문 → 3개 답변(Python/Sonnet/Gemma) + route | |
| POST /api/register — 사용자 등록 검토건 → 세션 내 그래프 merge | |
| GET /api/download/{type} — paper/graph/ttl 정적 파일 | |
| 환경변수: | |
| - LLM_BACKEND (ollama|hf_inference|transformers_local) | |
| - ANTHROPIC_API_KEY | |
| - HF_TOKEN | |
| - HF_GEMMA_MODEL (default: google/gemma-3-4b-it) | |
| - OLLAMA_MODEL (default: gemma4:e4b) | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import sys | |
| import threading | |
| import time | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from pydantic import BaseModel | |
| # 경로 셋업 — Docker 컨테이너(/app/api/main.py)와 로컬 개발(active/hf_app/api/main.py) 둘 다 지원 | |
| API_DIR = Path(__file__).resolve().parent | |
| HF_APP_DIR = API_DIR.parent # 컨테이너: /app, 로컬: active/hf_app | |
| # code/rag_engine 위치 — 컨테이너는 /app/code, 로컬은 active/code | |
| _container_code = HF_APP_DIR / "code" | |
| _local_code = HF_APP_DIR.parent / "code" | |
| CODE_DIR = _container_code if _container_code.exists() else _local_code | |
| sys.path.insert(0, str(CODE_DIR)) | |
| # api 폴더 (llm_adapters.py 위치) — 같은 폴더지만 uvicorn 모듈 import 형식이라 명시 필요 | |
| sys.path.insert(0, str(API_DIR)) | |
| # 데이터 위치 — 컨테이너는 /app/active/ontology, 로컬은 active/ontology | |
| _container_data = HF_APP_DIR / "active" / "ontology" | |
| _local_data = HF_APP_DIR.parent / "ontology" | |
| # Dockerfile에서도 data/를 /app/active/ontology로 COPY함 (HF_APP_DIR/active/ontology) | |
| ONTOLOGY_DIR = _container_data if _container_data.exists() else _local_data | |
| import rag_engine | |
| import semantic_search as ss | |
| import llm_adapters | |
| # 자산 파일 (다운로드 카드용 — assets/는 LFS 추가 전까지 미존재) | |
| PAPER_PATH = HF_APP_DIR / "assets" / "paper_v6.pdf" | |
| ONT_GRAPH_PNG = HF_APP_DIR / "assets" / "ontology_class_hierarchy_v1_10.png" | |
| TTL_PATH = ONTOLOGY_DIR / "investment_ontology_v1_10.ttl" | |
| JSONL_PATH = ONTOLOGY_DIR / "regulations_chunks_v14.jsonl" | |
| ALIAS_PATH = ONTOLOGY_DIR / "alias_dictionary.json" | |
| LOOKUP_PATH = ONTOLOGY_DIR / "risk_weight_lookup.json" | |
| # ============================================================ | |
| # 앱 + 데이터 로드 | |
| # ============================================================ | |
| app = FastAPI(title="LP출자 온톨로지 LLM 프로토타입 API", version="1.0") | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], # 개발 편의. 운영 시 도메인 제한. | |
| allow_credentials=True, allow_methods=["*"], allow_headers=["*"], | |
| ) | |
| # 전역 데이터 — 가벼운 것(TTL/청크/alias/lookup)만 startup에 로드. | |
| # KoSimCSE 임베딩은 메모리 스파이크가 커서 첫 질문 때 lazy 로드 (OOM 회피). | |
| print(f"[startup] LLM_BACKEND = {llm_adapters.LLM_BACKEND}") | |
| print(f"[startup] 데이터 로드: {TTL_PATH}") | |
| GRAPH = rag_engine.load_ttl(TTL_PATH) | |
| CHUNKS = rag_engine.load_chunks(JSONL_PATH) | |
| ALIAS = rag_engine.load_alias(ALIAS_PATH) | |
| LOOKUP = rag_engine.load_lookup(LOOKUP_PATH) | |
| print(f"[startup] 트리플 {len(GRAPH):,}, 청크 {len(CHUNKS)}") | |
| print(f"[startup] ready (KoSimCSE는 첫 질문 시 lazy 로드)") | |
| _kosimcse_warmed = False | |
| _kosimcse_lock = threading.Lock() | |
| def _ensure_kosimcse(): | |
| """KoSimCSE 모델·임베딩을 첫 사용 시 한 번만 로드. | |
| double-checked locking — 락 없이 플래그만 체크하면 동시 요청 2개가 | |
| 동시에 `if not _kosimcse_warmed`를 통과해 warm_up을 중복 실행한다. | |
| 그러면 274청크를 CPU로 두 번 임베딩 → 메모리 2배 스파이크 → OOM 위험. | |
| 락으로 직렬화해 warm_up이 정확히 한 번만 돌게 한다. | |
| """ | |
| global _kosimcse_warmed | |
| if _kosimcse_warmed: | |
| return | |
| with _kosimcse_lock: | |
| if _kosimcse_warmed: | |
| return | |
| print("[lazy] KoSimCSE warm-up 시작...") | |
| ss.warm_up(CHUNKS) | |
| _kosimcse_warmed = True | |
| print("[lazy] KoSimCSE warm-up 완료") | |
| # ============================================================ | |
| # 스키마 | |
| # ============================================================ | |
| class AskRequest(BaseModel): | |
| question: str | |
| mode: str = "axisB" # "axisB" | "keyword" | |
| class AnswerCol(BaseModel): | |
| answer: str | |
| route: str | |
| elapsed_sec: float = 0.0 | |
| class AskResponse(BaseModel): | |
| question: str | |
| mode: str | |
| route: str | |
| python: AnswerCol # raw 컨텍스트 (LLM 호출 없음) | |
| sonnet: AnswerCol | |
| gemma: AnswerCol | |
| # ============================================================ | |
| # 엔드포인트 | |
| # ============================================================ | |
| def health(): | |
| return { | |
| "ok": True, | |
| "triples": len(GRAPH), | |
| "chunks": len(CHUNKS), | |
| "llm_backend": llm_adapters.LLM_BACKEND, | |
| } | |
| def dataset_summary(): | |
| """데이터셋 요약 — 검토건 표, GP 표.""" | |
| from rag_engine import ( | |
| query_all_investments_with_label, query_investment_branches, | |
| query_investment_meta, | |
| ) | |
| invs = query_all_investments_with_label(GRAPH) | |
| fund_rows, gp_set = [], set() | |
| for inv in invs: | |
| if inv.get("n_branches", 0) == 0: | |
| continue | |
| meta = query_investment_meta(GRAPH, inv["iri"]) | |
| branches = query_investment_branches(GRAPH, inv["iri"]) | |
| amt = sum(int(float(b.get("amount", 0))) for b in branches) // 100000000 | |
| first_stage = branches[0].get("stage_label", "-") if branches else "-" | |
| fund_rows.append({ | |
| "id": str(inv["iri"]).split("_")[-1], | |
| "fund": inv["fund_label"][:25], | |
| "amount_eok": amt, | |
| "stage": first_stage, | |
| "branches": len(branches), | |
| }) | |
| if meta.get("gp_label"): | |
| gp_set.add(meta["gp_label"]) | |
| return { | |
| "funds": fund_rows, | |
| "gps": [{"name": n, "id": f"gp-{i+1:03d}"} for i, n in enumerate(sorted(gp_set))], | |
| } | |
| def ask(req: AskRequest): | |
| q = req.question.strip() | |
| if not q: | |
| raise HTTPException(400, "question is empty") | |
| mode = req.mode if req.mode in ("axisB", "keyword") else "axisB" | |
| # axisB 모드는 KoSimCSE 의미검색 필요 → 첫 사용 시 lazy 로드 | |
| if mode == "axisB": | |
| _ensure_kosimcse() | |
| # Python column: LLM 답변 생성 없이 raw 컨텍스트 | |
| try: | |
| if mode == "axisB": | |
| r_py = rag_engine.answer_question_llm( | |
| q, GRAPH, CHUNKS, ALIAS, LOOKUP, | |
| use_anthropic=True, use_gemma_gen=False, use_semantic=True, | |
| ) | |
| else: | |
| r_py = rag_engine.answer_question( | |
| q, GRAPH, CHUNKS, ALIAS, LOOKUP, use_gemma=False, | |
| ) | |
| except Exception as e: | |
| r_py = {"answer": f"오류: {e}", "route": "error"} | |
| route_str = r_py.get("route", "") | |
| # Sonnet column | |
| if os.environ.get("ANTHROPIC_API_KEY"): | |
| r_son = _call_sonnet(q, r_py.get("answer", "")) | |
| else: | |
| r_son = AnswerCol(answer="⚠️ ANTHROPIC_API_KEY 미설정", route="no_api", elapsed_sec=0) | |
| # Gemma column | |
| r_gem = _call_gemma_col(q, r_py.get("answer", "")) | |
| return AskResponse( | |
| question=q, mode=mode, route=route_str, | |
| python=AnswerCol(answer=r_py.get("answer", ""), route=route_str, elapsed_sec=0.0), | |
| sonnet=r_son, | |
| gemma=r_gem, | |
| ) | |
| def download(kind: str): | |
| path_map = { | |
| "paper": (PAPER_PATH, "LP출자_온톨로지_RAG_paper.pdf", "application/pdf"), | |
| "graph": (ONT_GRAPH_PNG, "ontology_graph.png", "image/png"), | |
| "ttl": (TTL_PATH, "investment_ontology_v1_10.ttl", "text/turtle"), | |
| } | |
| if kind not in path_map: | |
| raise HTTPException(404, f"unknown kind: {kind}") | |
| path, filename, media_type = path_map[kind] | |
| if not path.exists(): | |
| # HF Docker 빌드는 LFS 파일(assets)을 빌드 컨텍스트에 실제 내용으로 넣지 않을 수 있다. | |
| # 이 경우 런타임에 Space 레포에서 직접 fetch (HF_TOKEN). 빌드/LFS smudge 의존 제거. | |
| try: | |
| from huggingface_hub import hf_hub_download | |
| fetched = hf_hub_download( | |
| repo_id="ForStream/ontology-prototype", repo_type="space", | |
| filename=f"assets/{path.name}", token=os.environ.get("HF_TOKEN") or None, | |
| ) | |
| return FileResponse(fetched, filename=filename, media_type=media_type) | |
| except Exception as e: | |
| raise HTTPException(404, f"file not found: {path} (런타임 fetch 실패: {e})") | |
| return FileResponse(str(path), filename=filename, media_type=media_type) | |
| # ============================================================ | |
| # 내부 헬퍼 | |
| # ============================================================ | |
| def _call_sonnet(question: str, context: str) -> AnswerCol: | |
| import anthropic | |
| start = time.time() | |
| try: | |
| client = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"]) | |
| system = ( | |
| "당신은 한국 LP출자 도메인 전문 금융회사 직원의 보조 AI입니다. " | |
| "아래 컨텍스트의 사실을 그대로 유지하면서 자연스러운 한국어로 답변을 다듬어 주세요." | |
| ) | |
| resp = client.messages.create( | |
| model=os.environ.get("ANTHROPIC_MODEL", "claude-sonnet-4-6"), | |
| max_tokens=1500, temperature=0.3, system=system, | |
| messages=[{"role": "user", "content": f"[컨텍스트]\n{context}\n\n[질문]\n{question}\n\n[답변]"}], | |
| ) | |
| text = "".join(b.text for b in resp.content if hasattr(b, "text")) | |
| return AnswerCol(answer=text, route="sonnet", elapsed_sec=time.time() - start) | |
| except Exception as e: | |
| return AnswerCol(answer=f"Sonnet 오류: {e}", route="error", elapsed_sec=time.time() - start) | |
| def _call_gemma_col(question: str, context: str) -> AnswerCol: | |
| start = time.time() | |
| answer, ok, info = llm_adapters.call_gemma(question, context) | |
| if ok: | |
| return AnswerCol(answer=answer, route="gemma", elapsed_sec=time.time() - start) | |
| return AnswerCol(answer=f"Gemma 오류: {info}", route="error", elapsed_sec=time.time() - start) | |
| # ============================================================ | |
| # 정적 파일 — React 빌드 결과 (컨테이너/로컬 경로 모두 탐색) | |
| # ============================================================ | |
| _dist_candidates = [ | |
| HF_APP_DIR / "web" / "dist", # 로컬: active/hf_app/web/dist | |
| HF_APP_DIR / "hf_app" / "web" / "dist", # 컨테이너 현 Dockerfile: /app/hf_app/web/dist | |
| Path("/app/hf_app/web/dist"), | |
| Path("/app/web/dist"), | |
| ] | |
| WEB_DIST = next((p for p in _dist_candidates if p.exists()), None) | |
| print(f"[startup] WEB_DIST = {WEB_DIST}") | |
| if WEB_DIST is not None: | |
| from fastapi.staticfiles import StaticFiles | |
| app.mount("/", StaticFiles(directory=str(WEB_DIST), html=True), name="web") | |
| else: | |
| # dist 못 찾아도 헬스체크가 통과하도록 루트 라우트 제공 (재시작 루프 방지) | |
| from fastapi.responses import HTMLResponse | |
| def _root_fallback(): | |
| return HTMLResponse( | |
| "<h1>LP출자 온톨로지 프로토타입 API</h1>" | |
| "<p>프론트엔드 빌드(web/dist)를 찾지 못했습니다. API는 /api/* 에서 동작합니다.</p>" | |
| f"<p>탐색 경로: {[str(p) for p in _dist_candidates]}</p>" | |
| ) | |