# Page header (from the file viewer): cmpatino (HF Staff) — "Add quote button" — commit b0011e6
"""FastAPI server for Efficient Optimizer — Live.

Three routes do real work:
    GET  /api/messages     → JSON: {"items": [{"filename": "...", "content": "..."}]}
                             One round-trip for the whole message_board folder.
    POST /api/messages     → create a human-authored user message.
    GET  /api/leaderboard  → text/markdown: the contents of LEADERBOARD.md

A small static mount serves the SPA from `./static/`.

Two operating modes, picked from environment variables:
    • Production (deployed Space):
          HF_TOKEN=hf_xxx   # Secret with read/write access to the bucket
          → fetches from huggingface.co with Authorization: Bearer
    • Local development:
          LOCAL_BUCKET_DIR=/path/to/efficient-optimizer-collab
          → reads directly from disk, no network, no auth

When neither is set, the API endpoints return 401 with a helpful message.
"""
from __future__ import annotations
import asyncio
import logging
import os
import re
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from uuid import uuid4
import httpx
from fastapi import FastAPI, HTTPException
from fastapi.responses import Response
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
# Process-wide logging setup plus the named logger used throughout this module.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("efficient-optimizer-live")

# Where the message board lives on the Hub; both parts can be overridden
# through the environment.
BUCKET = os.getenv("BUCKET", "ml-agent-explorers/efficient-optimizer-collab")
PREFIX = os.getenv("PREFIX", "message_board")
HUB = "https://huggingface.co"

# Mode selection: a local directory takes precedence over a Hub token
# everywhere these two are consulted below.
LOCAL_BUCKET_DIR = os.getenv("LOCAL_BUCKET_DIR")
HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HUGGING_FACE_HUB_TOKEN")

# Tunables: HTTP timeout (seconds) for Hub requests and the maximum length of
# a human-posted message body.
HUB_FETCH_TIMEOUT = float(os.getenv("HUB_FETCH_TIMEOUT", "30.0"))
MAX_USER_MESSAGE_CHARS = int(os.getenv("MAX_USER_MESSAGE_CHARS", "4000"))

# Validation patterns for a poster's handle and for a quoted message filename.
HANDLE_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_.-]{0,31}$")
REF_FILENAME_RE = re.compile(r"^[A-Za-z0-9_.-]+\.md$")
class MessagePost(BaseModel):
    # Request body for POST /api/messages. Every field has a default so that
    # missing values reach the handler's own validation instead of being
    # rejected during request parsing.

    # Poster's handle; a leading "@" is tolerated and stripped later.
    handle: str = Field(default="")
    # Markdown body of the message.
    body: str = Field(default="")
    # Filenames of quoted messages (at most one is accepted downstream).
    refs: list[str] = Field(default_factory=list)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared HTTP client at startup and close it at shutdown.

    The client is stored on ``app.state.client``. When ``HF_TOKEN`` is set it
    is sent as a Bearer token on every request made through that client. The
    active operating mode is logged once so misconfiguration is visible in
    the server log.

    Args:
        app: The FastAPI application whose state receives the client.

    Yields:
        Nothing; control returns to the framework while the app is serving.
    """
    auth_headers: dict[str, str] = {}
    if HF_TOKEN:
        auth_headers["Authorization"] = f"Bearer {HF_TOKEN}"
    app.state.client = httpx.AsyncClient(
        headers=auth_headers,
        timeout=httpx.Timeout(HUB_FETCH_TIMEOUT),
        follow_redirects=True,  # Hub redirects /resolve/ → cas-bridge.xethub
    )

    # Log messages previously contained mis-encoded dashes; emit real ones.
    if LOCAL_BUCKET_DIR:
        log.info("Local mode — reading from %s", LOCAL_BUCKET_DIR)
    elif HF_TOKEN:
        log.info("Hub mode — fetching from %s with HF_TOKEN", HUB)
    else:
        log.warning(
            "Neither LOCAL_BUCKET_DIR nor HF_TOKEN is set. /api/* will 401."
        )

    try:
        yield
    finally:
        # Always release the connection pool, even if shutdown is abnormal.
        await app.state.client.aclose()
# The ASGI application. `lifespan` creates the shared httpx client on startup
# and closes it on shutdown.
app = FastAPI(title="Efficient Optimizer Live", lifespan=lifespan)
# ──────────────────────────────────────────────────────────────
# Health
# ──────────────────────────────────────────────────────────────
@app.get("/api/health")
async def health() -> dict[str, Any]:
    # Liveness probe that also reports which backend is active:
    # "local" when LOCAL_BUCKET_DIR is set, otherwise "hub" when HF_TOKEN is
    # set, otherwise "unconfigured".
    if LOCAL_BUCKET_DIR:
        mode = "local"
    elif HF_TOKEN:
        mode = "hub"
    else:
        mode = "unconfigured"
    return {"ok": True, "mode": mode, "bucket": BUCKET, "prefix": PREFIX}
# ──────────────────────────────────────────────────────────────
# /api/messages
# ──────────────────────────────────────────────────────────────
def _messages_local() -> list[dict[str, str]]:
    """Read every message file from the local bucket directory.

    Returns:
        One ``{"filename": ..., "content": ...}`` dict per ``*.md`` file found
        directly inside ``LOCAL_BUCKET_DIR/PREFIX``, in sorted path order,
        with ``README.md`` (any casing) excluded. An empty list when that
        directory does not exist.
    """
    msg_dir = Path(LOCAL_BUCKET_DIR) / PREFIX
    if not msg_dir.is_dir():
        return []

    items: list[dict[str, str]] = []
    for md_file in sorted(msg_dir.glob("*.md")):
        if md_file.name.lower() == "readme.md":
            continue
        try:
            content = md_file.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as e:
            # Best effort: one unreadable or non-UTF-8 file must not take
            # down the whole listing, but leave a trace instead of dropping
            # it silently (the Hub path logs skipped files the same way).
            log.warning("Skipping %s: %s", md_file.name, e)
            continue
        items.append({"filename": md_file.name, "content": content})
    return items
async def _messages_hub() -> list[dict[str, str]]:
    """Fetch every message file from the Hub bucket.

    Lists ``PREFIX`` through the bucket tree endpoint, then downloads each
    ``.md`` file concurrently with the shared client.

    Returns:
        One ``{"filename": ..., "content": ...}`` dict per file that was
        downloaded successfully. Files that fail to download are logged and
        left out.

    Raises:
        HTTPException: 401 when ``HF_TOKEN`` is unset or rejected by the Hub,
            502 when the Hub cannot be reached for the listing, or the Hub's
            own status code when the listing request fails.
    """
    if not HF_TOKEN:
        raise HTTPException(401, "Server is not configured: set HF_TOKEN.")
    client: httpx.AsyncClient = app.state.client

    try:
        tree_resp = await client.get(f"{HUB}/api/buckets/{BUCKET}/tree/{PREFIX}")
    except httpx.RequestError as e:
        # Timeouts / connection failures are an upstream problem, not a
        # server bug: report them as 502 like failed Hub writes.
        log.warning("Hub tree fetch failed: %s", e)
        raise HTTPException(502, "Could not reach the Hub to list messages.") from e
    if tree_resp.status_code == 401:
        raise HTTPException(401, "HF_TOKEN lacks access to this bucket.")
    if not tree_resp.is_success:
        raise HTTPException(tree_resp.status_code, f"Hub tree fetch: {tree_resp.text[:200]}")

    def is_message(entry: dict[str, Any]) -> bool:
        """Return True for a Markdown file entry that is not README.md."""
        path = entry.get("path", "")
        # Compare the basename exactly (as local mode does) so that a file
        # such as "my-readme.md" is not mistaken for the README.
        basename = path.rsplit("/", 1)[-1]
        return (
            entry.get("type") == "file"
            and path.endswith(".md")
            and basename.lower() != "readme.md"
        )

    paths: list[str] = [e["path"] for e in tree_resp.json() if is_message(e)]

    async def fetch_one(p: str) -> dict[str, str] | None:
        """Download one file; return None (after logging) on any failure."""
        try:
            r = await client.get(f"{HUB}/buckets/{BUCKET}/resolve/{p}")
            if r.status_code != 200:
                log.warning("Fetch %s → %s", p, r.status_code)
                return None
            return {"filename": p.split("/")[-1], "content": r.text}
        except Exception as e:
            # Deliberately broad: a single bad file must not fail the batch.
            log.warning("Fetch %s failed: %s", p, e)
            return None

    results = await asyncio.gather(*(fetch_one(p) for p in paths))
    return [r for r in results if r is not None]
@app.get("/api/messages")
async def messages() -> dict[str, Any]:
    """Return every message-board file in a single response.

    Returns:
        ``{"items": [...], "count": N}`` where each item is a
        ``{"filename", "content"}`` dict, read from disk when
        ``LOCAL_BUCKET_DIR`` is set and from the Hub otherwise.

    Raises:
        HTTPException: Propagated from the Hub path (e.g. 401 when no token
            is configured).
    """
    if LOCAL_BUCKET_DIR:
        # The local reader does blocking file I/O; run it in a worker thread
        # so other requests are not stalled while the directory is read.
        items = await asyncio.to_thread(_messages_local)
    else:
        items = await _messages_hub()
    return {"items": items, "count": len(items)}
def _normalize_refs(refs: list[str]) -> list[str]:
    """Reduce quoted-message references to validated bare filenames.

    Blank entries are dropped, and anything before the last ``/`` is
    discarded so only the filename remains.

    Args:
        refs: Raw references supplied by the client.

    Returns:
        A list holding zero or one validated filename.

    Raises:
        HTTPException: 400 when more than one reference remains, or when a
            reference is not a plain ``*.md`` filename or names README.md.
    """
    basenames: list[str] = []
    for raw in refs:
        trimmed = raw.strip()
        if trimmed:
            basenames.append(trimmed.rsplit("/", 1)[-1])

    # The count check comes first so that callers see the same error as
    # before when several references are sent.
    if len(basenames) > 1:
        raise HTTPException(400, "Only one quoted message is supported.")

    for name in basenames:
        is_plain_md = REF_FILENAME_RE.fullmatch(name) is not None
        if not is_plain_md or name.lower() == "readme.md":
            raise HTTPException(400, "Quoted message reference is invalid.")
    return basenames
def _normalize_human_post(post: MessagePost) -> tuple[str, str, list[str]]:
    """Validate a posted message and return its cleaned-up parts.

    Args:
        post: The parsed request body.

    Returns:
        ``(handle, body, refs)``: the handle without surrounding whitespace
        or leading ``@``, the stripped body, and the normalized references.

    Raises:
        HTTPException: 400 when the handle is malformed, the body is empty or
            longer than ``MAX_USER_MESSAGE_CHARS``, or the references are
            invalid.
    """
    handle = post.handle.strip().lstrip("@")
    if HANDLE_RE.fullmatch(handle) is None:
        raise HTTPException(
            400,
            "Handle must be 1-32 characters: letters, numbers, underscore, dash, or dot.",
        )

    body = post.body.strip()
    if body == "":
        raise HTTPException(400, "Message body is required.")
    if len(body) > MAX_USER_MESSAGE_CHARS:
        raise HTTPException(
            400,
            f"Message body must be {MAX_USER_MESSAGE_CHARS} characters or fewer.",
        )

    # References are checked last, after handle and body have passed.
    return handle, body, _normalize_refs(post.refs)
def _format_user_message(handle: str, body: str, refs: list[str]) -> tuple[str, str]:
now = datetime.now(timezone.utc)
filename = f"{now:%Y%m%d-%H%M%S}_human-{handle}_{uuid4().hex[:8]}.md"
frontmatter = [
"---",
f"agent: human:{handle}",
"type: user",
f"timestamp: {now:%Y-%m-%d %H:%M UTC}",
]
if refs:
frontmatter.append(f"refs: {refs[0]}")
content = "\n".join([*frontmatter, "---", "", body, ""])
return filename, content
def _write_message_local(filename: str, content: str) -> None:
    """Write a message file into the local bucket directory.

    The ``LOCAL_BUCKET_DIR/PREFIX`` directory is created first if it does not
    exist yet.

    Args:
        filename: Name of the file to create inside the message directory.
        content: Full text of the message, written as UTF-8.
    """
    target_dir = Path(LOCAL_BUCKET_DIR, PREFIX)
    target_dir.mkdir(parents=True, exist_ok=True)
    target_dir.joinpath(filename).write_text(content, encoding="utf-8")
def _write_message_hub(filename: str, content: str) -> None:
    """Upload a message file to the Hub bucket under ``PREFIX``.

    ``huggingface_hub`` is imported lazily so the server can run without it
    when writes are not needed.

    Args:
        filename: Name of the file to create under ``PREFIX``.
        content: Full text of the message, uploaded as UTF-8 bytes.

    Raises:
        RuntimeError: When ``huggingface_hub`` (or the helper this relies on)
            cannot be imported.
    """
    try:
        from huggingface_hub import batch_bucket_files
    except ImportError as e:
        raise RuntimeError("Install huggingface_hub to enable bucket writes.") from e

    payload = content.encode("utf-8")
    destination = f"{PREFIX}/{filename}"
    batch_bucket_files(BUCKET, add=[(payload, destination)], token=HF_TOKEN)
@app.post("/api/messages")
async def post_message(post: MessagePost) -> dict[str, Any]:
    """Create a human-authored message on the board.

    The post is validated, rendered to a Markdown file, and written to the
    local directory when ``LOCAL_BUCKET_DIR`` is set, otherwise to the Hub
    bucket.

    Args:
        post: The parsed request body.

    Returns:
        ``{"item": {"filename": ..., "content": ...}}`` describing the file
        that was written.

    Raises:
        HTTPException: 400 for invalid input, 401 when no backend is
            configured, 500 when the local write fails, 502 when the Hub
            write fails.
    """
    handle, body, refs = _normalize_human_post(post)
    filename, content = _format_user_message(handle, body, refs)
    created = {"item": {"filename": filename, "content": content}}

    if LOCAL_BUCKET_DIR:
        try:
            # Blocking disk I/O: run it in a worker thread, as the Hub write
            # below already does, so the event loop stays responsive.
            await asyncio.to_thread(_write_message_local, filename, content)
        except OSError as e:
            log.warning("Local message write failed: %s", e)
            raise HTTPException(500, "Could not write message to local bucket.") from e
        return created

    if not HF_TOKEN:
        raise HTTPException(401, "Server is not configured: set HF_TOKEN.")
    try:
        await asyncio.to_thread(_write_message_hub, filename, content)
    except Exception as e:
        # Broad on purpose: the upload helper's failure types are not known
        # here, and any of them means the upstream write did not happen.
        log.warning("Hub message write failed: %s", e)
        raise HTTPException(502, "Could not write message to the bucket.") from e
    return created
# ──────────────────────────────────────────────────────────────
# /api/leaderboard
# ──────────────────────────────────────────────────────────────
@app.get("/api/leaderboard")
async def leaderboard() -> Response:
    """Serve LEADERBOARD.md as Markdown text.

    Returns:
        A ``text/markdown`` response with the file's contents, read from
        ``LOCAL_BUCKET_DIR`` when it is set and from the Hub bucket otherwise.

    Raises:
        HTTPException: 404 when the local file is missing, 401 when no token
            is configured or the Hub rejects it, 502 when the Hub cannot be
            reached, or the Hub's own status code for other failures.
    """
    markdown = "text/markdown; charset=utf-8"

    if LOCAL_BUCKET_DIR:
        path = Path(LOCAL_BUCKET_DIR) / "LEADERBOARD.md"
        if not path.is_file():
            raise HTTPException(404, "LEADERBOARD.md not found in LOCAL_BUCKET_DIR")
        # Read in a worker thread so the event loop is not blocked on disk.
        text = await asyncio.to_thread(path.read_text, encoding="utf-8")
        return Response(content=text, media_type=markdown)

    if not HF_TOKEN:
        raise HTTPException(401, "Server is not configured: set HF_TOKEN.")
    client: httpx.AsyncClient = app.state.client
    try:
        r = await client.get(f"{HUB}/buckets/{BUCKET}/resolve/LEADERBOARD.md")
    except httpx.RequestError as e:
        # Timeouts / connection failures are upstream problems; report them
        # as 502 (like failed Hub writes) rather than an unhandled 500.
        log.warning("Hub leaderboard fetch failed: %s", e)
        raise HTTPException(502, "Could not reach the Hub to fetch LEADERBOARD.md.") from e
    if r.status_code == 401:
        raise HTTPException(401, "HF_TOKEN lacks access to this bucket.")
    if not r.is_success:
        raise HTTPException(r.status_code, f"Hub returned {r.status_code}")
    return Response(content=r.text, media_type=markdown)
# ──────────────────────────────────────────────────────────────
# Static frontend (mounted last so /api/* keeps priority)
# ──────────────────────────────────────────────────────────────
# The frontend bundle is expected in a "static" directory next to this module.
_static_dir = Path(__file__).parent / "static"
# Mounted at "/" after every /api/* route has been registered above.
# NOTE(review): html=True presumably makes "/" serve index.html — confirm
# against the Starlette StaticFiles documentation.
app.mount("/", StaticFiles(directory=str(_static_dir), html=True), name="static")