"""FastAPI server for Efficient Optimizer — Live. Two routes do real work: GET /api/messages → JSON: {"items": [{"filename": "...", "content": "..."}]} One round-trip for the whole message_board folder. POST /api/messages → create a human-authored user message. GET /api/leaderboard → text/markdown: the contents of LEADERBOARD.md A small static mount serves the SPA from `./static/`. Two operating modes, picked from environment variables: • Production (deployed Space): HF_TOKEN=hf_xxx # Secret with read/write access to the bucket → fetches from huggingface.co with Authorization: Bearer • Local development: LOCAL_BUCKET_DIR=/path/to/efficient-optimizer-collab → reads directly from disk, no network, no auth When neither is set, the API endpoints return 401 with a helpful message. """ from __future__ import annotations import asyncio import logging import os import re from contextlib import asynccontextmanager from datetime import datetime, timezone from pathlib import Path from typing import Any from uuid import uuid4 import httpx from fastapi import FastAPI, HTTPException from fastapi.responses import Response from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, Field logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") log = logging.getLogger("efficient-optimizer-live") BUCKET = os.environ.get("BUCKET", "ml-agent-explorers/efficient-optimizer-collab") PREFIX = os.environ.get("PREFIX", "message_board") HUB = "https://huggingface.co" LOCAL_BUCKET_DIR = os.environ.get("LOCAL_BUCKET_DIR") HF_TOKEN = os.environ.get("HF_TOKEN") or os.environ.get("HUGGING_FACE_HUB_TOKEN") HUB_FETCH_TIMEOUT = float(os.environ.get("HUB_FETCH_TIMEOUT", "30.0")) MAX_USER_MESSAGE_CHARS = int(os.environ.get("MAX_USER_MESSAGE_CHARS", "4000")) HANDLE_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_.-]{0,31}$") REF_FILENAME_RE = re.compile(r"^[A-Za-z0-9_.-]+\.md$") class MessagePost(BaseModel): handle: str = "" body: str = "" refs: list[str] = Field(default_factory=list) @asynccontextmanager async def lifespan(app: FastAPI): headers: dict[str, str] = {} if HF_TOKEN: headers["Authorization"] = f"Bearer {HF_TOKEN}" app.state.client = httpx.AsyncClient( headers=headers, timeout=httpx.Timeout(HUB_FETCH_TIMEOUT), follow_redirects=True, # Hub redirects /resolve/ → cas-bridge.xethub ) if LOCAL_BUCKET_DIR: log.info("Local mode — reading from %s", LOCAL_BUCKET_DIR) elif HF_TOKEN: log.info("Hub mode — fetching from %s with HF_TOKEN", HUB) else: log.warning( "Neither LOCAL_BUCKET_DIR nor HF_TOKEN is set. /api/* will 401." ) try: yield finally: await app.state.client.aclose() app = FastAPI(title="Efficient Optimizer Live", lifespan=lifespan) # ────────────────────────────────────────────────────────────── # Health # ────────────────────────────────────────────────────────────── @app.get("/api/health") async def health() -> dict[str, Any]: mode = "local" if LOCAL_BUCKET_DIR else ("hub" if HF_TOKEN else "unconfigured") return {"ok": True, "mode": mode, "bucket": BUCKET, "prefix": PREFIX} # ────────────────────────────────────────────────────────────── # /api/messages # ────────────────────────────────────────────────────────────── def _messages_local() -> list[dict[str, str]]: msg_dir = Path(LOCAL_BUCKET_DIR) / PREFIX if not msg_dir.is_dir(): return [] items: list[dict[str, str]] = [] for f in sorted(msg_dir.glob("*.md")): if f.name.lower() == "readme.md": continue try: items.append({"filename": f.name, "content": f.read_text(encoding="utf-8")}) except OSError: pass return items async def _messages_hub() -> list[dict[str, str]]: if not HF_TOKEN: raise HTTPException(401, "Server is not configured: set HF_TOKEN.") client: httpx.AsyncClient = app.state.client tree_resp = await client.get(f"{HUB}/api/buckets/{BUCKET}/tree/{PREFIX}") if tree_resp.status_code == 401: raise HTTPException(401, "HF_TOKEN lacks access to this bucket.") if not tree_resp.is_success: raise HTTPException(tree_resp.status_code, f"Hub tree fetch: {tree_resp.text[:200]}") paths: list[str] = [ e["path"] for e in tree_resp.json() if e.get("type") == "file" and e.get("path", "").endswith(".md") and not e["path"].lower().endswith("readme.md") ] async def fetch_one(p: str) -> dict[str, str] | None: try: r = await client.get(f"{HUB}/buckets/{BUCKET}/resolve/{p}") if r.status_code != 200: log.warning("Fetch %s → %s", p, r.status_code) return None return {"filename": p.split("/")[-1], "content": r.text} except Exception as e: log.warning("Fetch %s failed: %s", p, e) return None results = await asyncio.gather(*(fetch_one(p) for p in paths)) return [r for r in results if r is not None] @app.get("/api/messages") async def messages() -> dict[str, Any]: items = _messages_local() if LOCAL_BUCKET_DIR else await _messages_hub() return {"items": items, "count": len(items)} def _normalize_refs(refs: list[str]) -> list[str]: clean_refs = [ref.strip().split("/")[-1] for ref in refs if ref.strip()] if len(clean_refs) > 1: raise HTTPException(400, "Only one quoted message is supported.") for ref in clean_refs: if not REF_FILENAME_RE.fullmatch(ref) or ref.lower() == "readme.md": raise HTTPException(400, "Quoted message reference is invalid.") return clean_refs def _normalize_human_post(post: MessagePost) -> tuple[str, str, list[str]]: handle = post.handle.strip().lstrip("@") body = post.body.strip() if not HANDLE_RE.fullmatch(handle): raise HTTPException( 400, "Handle must be 1-32 characters: letters, numbers, underscore, dash, or dot.", ) if not body: raise HTTPException(400, "Message body is required.") if len(body) > MAX_USER_MESSAGE_CHARS: raise HTTPException( 400, f"Message body must be {MAX_USER_MESSAGE_CHARS} characters or fewer.", ) refs = _normalize_refs(post.refs) return handle, body, refs def _format_user_message(handle: str, body: str, refs: list[str]) -> tuple[str, str]: now = datetime.now(timezone.utc) filename = f"{now:%Y%m%d-%H%M%S}_human-{handle}_{uuid4().hex[:8]}.md" frontmatter = [ "---", f"agent: human:{handle}", "type: user", f"timestamp: {now:%Y-%m-%d %H:%M UTC}", ] if refs: frontmatter.append(f"refs: {refs[0]}") content = "\n".join([*frontmatter, "---", "", body, ""]) return filename, content def _write_message_local(filename: str, content: str) -> None: msg_dir = Path(LOCAL_BUCKET_DIR) / PREFIX msg_dir.mkdir(parents=True, exist_ok=True) (msg_dir / filename).write_text(content, encoding="utf-8") def _write_message_hub(filename: str, content: str) -> None: try: from huggingface_hub import batch_bucket_files except ImportError as e: raise RuntimeError("Install huggingface_hub to enable bucket writes.") from e batch_bucket_files( BUCKET, add=[(content.encode("utf-8"), f"{PREFIX}/{filename}")], token=HF_TOKEN, ) @app.post("/api/messages") async def post_message(post: MessagePost) -> dict[str, Any]: handle, body, refs = _normalize_human_post(post) filename, content = _format_user_message(handle, body, refs) if LOCAL_BUCKET_DIR: try: _write_message_local(filename, content) except OSError as e: log.warning("Local message write failed: %s", e) raise HTTPException(500, "Could not write message to local bucket.") from e else: if not HF_TOKEN: raise HTTPException(401, "Server is not configured: set HF_TOKEN.") try: await asyncio.to_thread(_write_message_hub, filename, content) except Exception as e: log.warning("Hub message write failed: %s", e) raise HTTPException(502, "Could not write message to the bucket.") from e return {"item": {"filename": filename, "content": content}} # ────────────────────────────────────────────────────────────── # /api/leaderboard # ────────────────────────────────────────────────────────────── @app.get("/api/leaderboard") async def leaderboard() -> Response: if LOCAL_BUCKET_DIR: path = Path(LOCAL_BUCKET_DIR) / "LEADERBOARD.md" if not path.is_file(): raise HTTPException(404, "LEADERBOARD.md not found in LOCAL_BUCKET_DIR") return Response( content=path.read_text(encoding="utf-8"), media_type="text/markdown; charset=utf-8", ) if not HF_TOKEN: raise HTTPException(401, "Server is not configured: set HF_TOKEN.") client: httpx.AsyncClient = app.state.client r = await client.get(f"{HUB}/buckets/{BUCKET}/resolve/LEADERBOARD.md") if r.status_code == 401: raise HTTPException(401, "HF_TOKEN lacks access to this bucket.") if not r.is_success: raise HTTPException(r.status_code, f"Hub returned {r.status_code}") return Response(content=r.text, media_type="text/markdown; charset=utf-8") # ────────────────────────────────────────────────────────────── # Static frontend (mounted last so /api/* keeps priority) # ────────────────────────────────────────────────────────────── _static_dir = Path(__file__).parent / "static" app.mount("/", StaticFiles(directory=str(_static_dir), html=True), name="static")