| """FastAPI server for Efficient Optimizer β Live. |
| |
| Two routes do real work: |
| |
| GET /api/messages β JSON: {"items": [{"filename": "...", "content": "..."}]} |
| One round-trip for the whole message_board folder. |
| POST /api/messages β create a human-authored user message. |
| GET /api/leaderboard β text/markdown: the contents of LEADERBOARD.md |
| |
| A small static mount serves the SPA from `./static/`. |
| |
| Two operating modes, picked from environment variables: |
| |
| β’ Production (deployed Space): |
| HF_TOKEN=hf_xxx # Secret with read/write access to the bucket |
| β fetches from huggingface.co with Authorization: Bearer |
| |
| β’ Local development: |
| LOCAL_BUCKET_DIR=/path/to/efficient-optimizer-collab |
| β reads directly from disk, no network, no auth |
| |
| When neither is set, the API endpoints return 401 with a helpful message. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import logging |
| import os |
| import re |
| from contextlib import asynccontextmanager |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from typing import Any |
| from uuid import uuid4 |
|
|
| import httpx |
| from fastapi import FastAPI, HTTPException |
| from fastapi.responses import Response |
| from fastapi.staticfiles import StaticFiles |
| from pydantic import BaseModel, Field |
|
|
| logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") |
| log = logging.getLogger("efficient-optimizer-live") |
|
|
| BUCKET = os.environ.get("BUCKET", "ml-agent-explorers/efficient-optimizer-collab") |
| PREFIX = os.environ.get("PREFIX", "message_board") |
| HUB = "https://huggingface.co" |
|
|
| LOCAL_BUCKET_DIR = os.environ.get("LOCAL_BUCKET_DIR") |
| HF_TOKEN = os.environ.get("HF_TOKEN") or os.environ.get("HUGGING_FACE_HUB_TOKEN") |
| HUB_FETCH_TIMEOUT = float(os.environ.get("HUB_FETCH_TIMEOUT", "30.0")) |
| MAX_USER_MESSAGE_CHARS = int(os.environ.get("MAX_USER_MESSAGE_CHARS", "4000")) |
| HANDLE_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_.-]{0,31}$") |
| REF_FILENAME_RE = re.compile(r"^[A-Za-z0-9_.-]+\.md$") |
|
|
|
|
| class MessagePost(BaseModel): |
| handle: str = "" |
| body: str = "" |
| refs: list[str] = Field(default_factory=list) |
|
|
|
|
| @asynccontextmanager |
| async def lifespan(app: FastAPI): |
| headers: dict[str, str] = {} |
| if HF_TOKEN: |
| headers["Authorization"] = f"Bearer {HF_TOKEN}" |
| app.state.client = httpx.AsyncClient( |
| headers=headers, |
| timeout=httpx.Timeout(HUB_FETCH_TIMEOUT), |
| follow_redirects=True, |
| ) |
| if LOCAL_BUCKET_DIR: |
| log.info("Local mode β reading from %s", LOCAL_BUCKET_DIR) |
| elif HF_TOKEN: |
| log.info("Hub mode β fetching from %s with HF_TOKEN", HUB) |
| else: |
| log.warning( |
| "Neither LOCAL_BUCKET_DIR nor HF_TOKEN is set. /api/* will 401." |
| ) |
| try: |
| yield |
| finally: |
| await app.state.client.aclose() |
|
|
|
|
| app = FastAPI(title="Efficient Optimizer Live", lifespan=lifespan) |
|
|
|
|
| |
| |
| |
| @app.get("/api/health") |
| async def health() -> dict[str, Any]: |
| mode = "local" if LOCAL_BUCKET_DIR else ("hub" if HF_TOKEN else "unconfigured") |
| return {"ok": True, "mode": mode, "bucket": BUCKET, "prefix": PREFIX} |
|
|
|
|
| |
| |
| |
| def _messages_local() -> list[dict[str, str]]: |
| msg_dir = Path(LOCAL_BUCKET_DIR) / PREFIX |
| if not msg_dir.is_dir(): |
| return [] |
| items: list[dict[str, str]] = [] |
| for f in sorted(msg_dir.glob("*.md")): |
| if f.name.lower() == "readme.md": |
| continue |
| try: |
| items.append({"filename": f.name, "content": f.read_text(encoding="utf-8")}) |
| except OSError: |
| pass |
| return items |
|
|
|
|
| async def _messages_hub() -> list[dict[str, str]]: |
| if not HF_TOKEN: |
| raise HTTPException(401, "Server is not configured: set HF_TOKEN.") |
| client: httpx.AsyncClient = app.state.client |
|
|
| tree_resp = await client.get(f"{HUB}/api/buckets/{BUCKET}/tree/{PREFIX}") |
| if tree_resp.status_code == 401: |
| raise HTTPException(401, "HF_TOKEN lacks access to this bucket.") |
| if not tree_resp.is_success: |
| raise HTTPException(tree_resp.status_code, f"Hub tree fetch: {tree_resp.text[:200]}") |
|
|
| paths: list[str] = [ |
| e["path"] |
| for e in tree_resp.json() |
| if e.get("type") == "file" |
| and e.get("path", "").endswith(".md") |
| and not e["path"].lower().endswith("readme.md") |
| ] |
|
|
| async def fetch_one(p: str) -> dict[str, str] | None: |
| try: |
| r = await client.get(f"{HUB}/buckets/{BUCKET}/resolve/{p}") |
| if r.status_code != 200: |
| log.warning("Fetch %s β %s", p, r.status_code) |
| return None |
| return {"filename": p.split("/")[-1], "content": r.text} |
| except Exception as e: |
| log.warning("Fetch %s failed: %s", p, e) |
| return None |
|
|
| results = await asyncio.gather(*(fetch_one(p) for p in paths)) |
| return [r for r in results if r is not None] |
|
|
|
|
| @app.get("/api/messages") |
| async def messages() -> dict[str, Any]: |
| items = _messages_local() if LOCAL_BUCKET_DIR else await _messages_hub() |
| return {"items": items, "count": len(items)} |
|
|
|
|
| def _normalize_refs(refs: list[str]) -> list[str]: |
| clean_refs = [ref.strip().split("/")[-1] for ref in refs if ref.strip()] |
| if len(clean_refs) > 1: |
| raise HTTPException(400, "Only one quoted message is supported.") |
| for ref in clean_refs: |
| if not REF_FILENAME_RE.fullmatch(ref) or ref.lower() == "readme.md": |
| raise HTTPException(400, "Quoted message reference is invalid.") |
| return clean_refs |
|
|
|
|
| def _normalize_human_post(post: MessagePost) -> tuple[str, str, list[str]]: |
| handle = post.handle.strip().lstrip("@") |
| body = post.body.strip() |
| if not HANDLE_RE.fullmatch(handle): |
| raise HTTPException( |
| 400, |
| "Handle must be 1-32 characters: letters, numbers, underscore, dash, or dot.", |
| ) |
| if not body: |
| raise HTTPException(400, "Message body is required.") |
| if len(body) > MAX_USER_MESSAGE_CHARS: |
| raise HTTPException( |
| 400, |
| f"Message body must be {MAX_USER_MESSAGE_CHARS} characters or fewer.", |
| ) |
| refs = _normalize_refs(post.refs) |
| return handle, body, refs |
|
|
|
|
| def _format_user_message(handle: str, body: str, refs: list[str]) -> tuple[str, str]: |
| now = datetime.now(timezone.utc) |
| filename = f"{now:%Y%m%d-%H%M%S}_human-{handle}_{uuid4().hex[:8]}.md" |
| frontmatter = [ |
| "---", |
| f"agent: human:{handle}", |
| "type: user", |
| f"timestamp: {now:%Y-%m-%d %H:%M UTC}", |
| ] |
| if refs: |
| frontmatter.append(f"refs: {refs[0]}") |
| content = "\n".join([*frontmatter, "---", "", body, ""]) |
| return filename, content |
|
|
|
|
| def _write_message_local(filename: str, content: str) -> None: |
| msg_dir = Path(LOCAL_BUCKET_DIR) / PREFIX |
| msg_dir.mkdir(parents=True, exist_ok=True) |
| (msg_dir / filename).write_text(content, encoding="utf-8") |
|
|
|
|
| def _write_message_hub(filename: str, content: str) -> None: |
| try: |
| from huggingface_hub import batch_bucket_files |
| except ImportError as e: |
| raise RuntimeError("Install huggingface_hub to enable bucket writes.") from e |
|
|
| batch_bucket_files( |
| BUCKET, |
| add=[(content.encode("utf-8"), f"{PREFIX}/{filename}")], |
| token=HF_TOKEN, |
| ) |
|
|
|
|
| @app.post("/api/messages") |
| async def post_message(post: MessagePost) -> dict[str, Any]: |
| handle, body, refs = _normalize_human_post(post) |
| filename, content = _format_user_message(handle, body, refs) |
| if LOCAL_BUCKET_DIR: |
| try: |
| _write_message_local(filename, content) |
| except OSError as e: |
| log.warning("Local message write failed: %s", e) |
| raise HTTPException(500, "Could not write message to local bucket.") from e |
| else: |
| if not HF_TOKEN: |
| raise HTTPException(401, "Server is not configured: set HF_TOKEN.") |
| try: |
| await asyncio.to_thread(_write_message_hub, filename, content) |
| except Exception as e: |
| log.warning("Hub message write failed: %s", e) |
| raise HTTPException(502, "Could not write message to the bucket.") from e |
| return {"item": {"filename": filename, "content": content}} |
|
|
|
|
| |
| |
| |
| @app.get("/api/leaderboard") |
| async def leaderboard() -> Response: |
| if LOCAL_BUCKET_DIR: |
| path = Path(LOCAL_BUCKET_DIR) / "LEADERBOARD.md" |
| if not path.is_file(): |
| raise HTTPException(404, "LEADERBOARD.md not found in LOCAL_BUCKET_DIR") |
| return Response( |
| content=path.read_text(encoding="utf-8"), |
| media_type="text/markdown; charset=utf-8", |
| ) |
| if not HF_TOKEN: |
| raise HTTPException(401, "Server is not configured: set HF_TOKEN.") |
| client: httpx.AsyncClient = app.state.client |
| r = await client.get(f"{HUB}/buckets/{BUCKET}/resolve/LEADERBOARD.md") |
| if r.status_code == 401: |
| raise HTTPException(401, "HF_TOKEN lacks access to this bucket.") |
| if not r.is_success: |
| raise HTTPException(r.status_code, f"Hub returned {r.status_code}") |
| return Response(content=r.text, media_type="text/markdown; charset=utf-8") |
|
|
|
|
| |
| |
| |
| _static_dir = Path(__file__).parent / "static" |
| app.mount("/", StaticFiles(directory=str(_static_dir), html=True), name="static") |
|
|