Spaces:
Running on Zero
Running on Zero
| """HuggingWizards — co-op pixel wizard arena with a Nemotron-4B Game Master. | |
| Single process: | |
| * FastAPI serves the canvas client (`/`) and a WebSocket (`/ws`). | |
| * A ~20 Hz asyncio task runs the authoritative simulation on CPU and | |
| broadcasts snapshots to every connected client (players + spectators). | |
| * At each round boundary the Game Master (Nemotron, burst GPU) decides rewards | |
| and the next round; every decision is logged as an agent trace. | |
| * A Gradio dashboard is mounted at `/dashboard` (live traces + leaderboard). | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import contextlib | |
| import json | |
| import os | |
| import time | |
| import uuid | |
| # Gradio 5+ auto-enables SSR on Spaces: a Node server takes port 7860 and | |
| # pushes the Python server to 7861, colliding with our own uvicorn bind | |
| # ("[Errno 98] address already in use"). We serve Gradio mounted INSIDE | |
| # FastAPI, so SSR must be off. Set before gradio import; assignment (not | |
| # setdefault) because Spaces exports GRADIO_SSR_MODE=true. | |
| os.environ["GRADIO_SSR_MODE"] = "false" | |
| # On ZeroGPU, `spaces` must be imported before any CUDA-touching library. | |
| with contextlib.suppress(Exception): # not installed in local dev | |
| import spaces # noqa: F401 | |
| import gradio as gr | |
| from fastapi import FastAPI, WebSocket, WebSocketDisconnect | |
| from fastapi.responses import FileResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from game import gamemaster | |
| from game.engine import Engine, MAX_PLAYERS, TICK_HZ | |
# Directory holding this module; static assets and trace files live beside it.
HERE = os.path.dirname(__file__)

# The one simulation instance shared by every connection.
engine = Engine()

# Live websocket connections:
#   ws -> {"pid", "role", "name", "char"}
clients: dict[WebSocket, dict] = {}

# Waiting line (front = next to play) used while the 8-slot arena is full.
queue: list[WebSocket] = []


# ---- persistent leaderboard ----------------------------------------------
# JSON file backing the leaderboard, plus its in-memory copy.
LEADERBOARD_FILE = os.path.join(HERE, "traces", "leaderboard.json")
_leaderboard: list[dict] = []
def _load_leaderboard():
    """Load the persisted leaderboard from ``LEADERBOARD_FILE`` into memory.

    Best-effort: a missing, unreadable or corrupt file yields an empty
    leaderboard instead of raising, because this runs at import time.

    Only entries that are dicts carrying the ``gold``, ``wave`` and ``level``
    keys are kept. ``add_score`` sorts on exactly those keys, so a file that
    is valid JSON but has the wrong shape (e.g. a top-level object, or a list
    of strings) must not reach it.
    """
    global _leaderboard
    try:
        with open(LEADERBOARD_FILE, encoding="utf-8") as f:
            loaded = json.load(f)
    except Exception:
        loaded = []
    if not isinstance(loaded, list):
        loaded = []
    required = ("gold", "wave", "level")
    _leaderboard = [
        entry for entry in loaded
        if isinstance(entry, dict) and all(key in entry for key in required)
    ]
def _save_leaderboard():
    """Write the top 100 leaderboard entries to ``LEADERBOARD_FILE``.

    Still best-effort (a failure never propagates to the caller), but:

    * the parent directory is created first, so a fresh checkout without a
      ``traces/`` folder does not silently lose every score;
    * the data is written to a sibling temp file and moved into place with
      ``os.replace``, so an interrupted write cannot leave a truncated file;
    * failures are reported on stdout instead of vanishing.
    """
    tmp_path = LEADERBOARD_FILE + ".tmp"
    try:
        os.makedirs(os.path.dirname(LEADERBOARD_FILE), exist_ok=True)
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(_leaderboard[:100], f, indent=2)
        os.replace(tmp_path, LEADERBOARD_FILE)
    except Exception as exc:
        print(f"[app] leaderboard save failed: {exc!r}")
def add_score(name: str, gold: int, level: int, wave: int) -> list[dict]:
    """Record one finished run and return the current top 20.

    The name is cut to 16 characters (falling back to "Wizard" when empty)
    and the numeric fields are coerced with ``int``. The board is kept sorted
    by gold, then wave, then level (highest first), trimmed to 100 entries,
    and saved after every insertion.
    """
    entry = {
        "name": str(name)[:16] or "Wizard",
        "gold": int(gold),
        "level": int(level),
        "wave": int(wave),
        "ts": time.strftime("%Y-%m-%d %H:%M"),
    }
    _leaderboard.append(entry)
    _leaderboard.sort(
        key=lambda row: (row["gold"], row["wave"], row["level"]),
        reverse=True,
    )
    # Mutate in place: other code reads the same list object.
    del _leaderboard[100:]
    _save_leaderboard()
    return _leaderboard[:20]
# Populate the in-memory leaderboard once, at import time.
_load_leaderboard()
async def broadcast(payload: dict):
    """Send ``payload`` (JSON-encoded once) to every connected client.

    Sockets whose send raises are collected and dropped after the pass.
    """
    if not clients:
        return
    encoded = json.dumps(payload)
    failed = []
    # Iterate over a copy: `clients` can change while we await a send.
    for sock in list(clients):
        try:
            await sock.send_text(encoded)
        except Exception:
            failed.append(sock)
    for sock in failed:
        _drop(sock)
def _drop(ws: WebSocket):
    """Forget a connection and release everything it held.

    Removes ``ws`` from ``clients`` and from the waiting ``queue``, removes
    its player (if it had one) from the engine, and puts the engine back into
    the lobby state when no players remain.

    The queue removal matters for sockets dropped by ``broadcast`` after a
    failed send: without it a dead socket keeps its place in line and skews
    the positions/sizes reported to everyone else until a slot opens.
    """
    info = clients.pop(ws, None)
    with contextlib.suppress(ValueError):  # not queued -> nothing to remove
        queue.remove(ws)
    if info and info.get("pid"):
        engine.remove_player(info["pid"])
    if not engine.players and engine.status != "lobby":
        engine.status = "lobby"
        engine.status_msg = "All wizards left. Waiting for players..."
async def _send(ws: WebSocket, payload: dict):
    """Send one JSON message to a single socket, ignoring any failure."""
    try:
        await ws.send_text(json.dumps(payload))
    except Exception:
        # Best-effort by design: failed sockets are dropped elsewhere.
        pass
async def handle_skill_request(ws: WebSocket, pid: str, prompt: str):
    """The round's top wizard asks the Game Master for a custom power-up.

    The request is honoured only during an intermission, only once per
    intermission, only for the player the engine designated
    (``engine.skill_request_pid``) and only with a non-blank prompt; otherwise
    a ``skill_result`` with ``ok: False`` is sent back.

    Skill generation runs in a worker thread. Any exception raised while
    generating or granting the skill is reported to the player as a failed
    ``skill_result`` instead of propagating: an escaping exception would be
    caught by the websocket handler's catch-all and disconnect the player.
    """
    unavailable = (
        engine.status != "intermission"
        or engine.skill_request_used
        or engine.skill_request_pid != pid
        or not prompt.strip()
    )
    if unavailable:
        await _send(ws, {"type": "skill_result", "ok": False,
                         "reason": "Not available right now."})
        return

    engine.skill_request_used = True  # one wish per intermission
    ctx = {
        "round": engine.round,
        "player_level": engine.players[pid].level if pid in engine.players else 1,
    }
    try:
        spec = await asyncio.to_thread(gamemaster.generate_skill, prompt, ctx)
        granted = engine.add_runtime_skill(pid, spec) if spec else None
    except Exception as exc:
        print(f"[app] skill request failed for {pid}: {exc!r}")
        granted = None

    await _send(ws, {"type": "skill_result", "ok": bool(granted), "skill": granted,
                     "reason": None if granted else "The arcane energies fizzled — try again."})
def _ws_for_pid(pid: str):
    """Return the first socket whose client record has this ``pid``, else None."""
    matches = (sock for sock, meta in clients.items() if meta.get("pid") == pid)
    return next(matches, None)
async def _broadcast_queue():
    """Tell each queued connection its current place in line."""
    waiting = list(queue)  # snapshot: the queue may change while we await
    for position, sock in enumerate(waiting, start=1):
        # `size` is read live on every send, as the queue can shrink or grow.
        await _send(sock, {"type": "queue", "pos": position, "size": len(queue)})
async def promote_queue():
    """Fill open arena slots from the front of the queue."""
    while engine.can_join() and queue:
        sock = queue.pop(0)
        meta = clients.get(sock)
        if meta is not None:
            new_pid = uuid.uuid4().hex[:8]
            engine.add_player(new_pid, meta.get("name", "Wizard"), meta.get("char", "warrior"))
            clients[sock] = {**meta, "pid": new_pid, "role": "player"}
            await _send(sock, {"type": "welcome", "pid": new_pid, "role": "player",
                               "reason": "from_queue"})
        # else: the socket disconnected while waiting; try the next in line.
async def manage_slots():
    """Eliminate out-of-lives wizards, then promote the queue into free slots."""
    for pid, wizard in list(engine.players.items()):
        if wizard.lives > 0 or wizard.alive:
            continue  # not eliminated
        sock = _ws_for_pid(pid)
        engine.remove_player(pid)
        if sock is None:
            continue
        # The connection stays open; it is just demoted to a spectator.
        meta = clients.get(sock, {})
        clients[sock] = {**meta, "pid": None, "role": "spectator"}
        final_score = {"gold": wizard.gold, "level": wizard.level, "wave": engine.round}
        await _send(sock, {"type": "eliminated", "score": final_score})

    arena_empty = not engine.players
    if arena_empty and engine.status != "lobby":
        engine.status = "lobby"
        engine.status_msg = "Waiting for wizards to join..."

    await promote_queue()
    await _broadcast_queue()
async def game_loop():
    """Run the authoritative simulation forever, ``TICK_HZ`` times per second.

    Each iteration:

    1. advances the engine by the elapsed time (capped at 0.1 s);
    2. roughly every 0.4 s, eliminates/promotes players via ``manage_slots``;
    3. on the active -> intermission transition, asks the Game Master for a
       decision in a worker thread;
    4. applies that decision as soon as it is ready;
    5. once the intermission deadline has passed and the decision is applied,
       starts the next round (or returns to the lobby if nobody is left);
    6. broadcasts a snapshot to every client and sleeps for the rest of the
       tick interval.

    Frame timing uses ``time.monotonic()`` so that a wall-clock adjustment
    (NTP step, manual change) can never yield a negative or inflated ``dt``.
    The comparison with ``engine.intermission_until`` keeps using
    ``time.time()``, unchanged from before.
    """
    interval = 1.0 / TICK_HZ
    last = time.monotonic()
    prev_status = engine.status
    gm_task: asyncio.Task | None = None
    gm_applied = False
    slot_accum = 0.0

    while True:
        frame_start = time.monotonic()
        dt = min(0.1, frame_start - last)  # cap long stalls at 100 ms
        last = frame_start
        engine.tick(dt)

        # eliminate / promote queue / broadcast positions a few times a second
        slot_accum += dt
        if slot_accum >= 0.4:
            slot_accum = 0.0
            await manage_slots()

        # Round just ended -> ask the Game Master (off the loop thread).
        if engine.status == "intermission" and prev_status == "active":
            summary = engine.round_summary()
            prev_cfg = dict(engine.cfg)
            gm_task = asyncio.create_task(
                asyncio.to_thread(gamemaster.decide, summary, prev_cfg)
            )
            gm_applied = False

        # Apply the decision as soon as it's ready (enables shopping this break).
        if gm_task is not None and gm_task.done() and not gm_applied:
            # A failed decision is ignored; the round still proceeds.
            with contextlib.suppress(Exception):
                engine.apply_gm_decision(gm_task.result())
            gm_applied = True

        # Start the next round once the break is over AND the decision is in.
        if (engine.status == "intermission" and gm_applied
                and time.time() >= engine.intermission_until):
            if engine.players:
                engine.start_round()
            else:
                engine.status = "lobby"
                engine.status_msg = "Waiting for wizards to join..."
            gm_task = None
            gm_applied = False

        prev_status = engine.status
        await broadcast(engine.snapshot())
        # Sleep only for what is left of this tick's time budget.
        await asyncio.sleep(max(0, interval - (time.monotonic() - frame_start)))
| # -------------------------------------------------------------------------- | |
| # FastAPI app | |
| # -------------------------------------------------------------------------- | |
# The ASGI application that the HTTP and websocket handlers attach to.
app = FastAPI(title="HuggingWizards")

# Handle of the background simulation task; None until it is first started.
_loop_task: asyncio.Task | None = None
def ensure_loop():
    """Start the game loop once. Robust to Gradio overriding the app lifespan.

    A new task is also created when the previous one has finished.
    """
    global _loop_task
    running = _loop_task is not None and not _loop_task.done()
    if not running:
        _loop_task = asyncio.create_task(game_loop())
# The module docstring says the canvas client is served at `/`; without a
# route registration this handler is never reachable.
@app.get("/")
async def index():
    """Serve the canvas client page (``static/index.html``)."""
    return FileResponse(os.path.join(HERE, "static", "index.html"))
# NOTE(review): no route decorator is visible for this handler here; confirm
# which path the in-game panel requests before registering one.
async def traces():
    """Recent Game Master agent traces, trimmed for the in-game panel.

    Returns ``{"traces": [...]}`` built from the first 15 entries of
    ``gamemaster.RECENT_TRACES``; ``raw_output`` is stringified and cut to
    600 characters, and ``kind`` defaults to "round_decision".
    """
    recent = gamemaster.RECENT_TRACES[:15]
    trimmed = [
        {
            "trace_id": t.get("trace_id"),
            "round": t.get("round"),
            "ts": t.get("ts"),
            "model": t.get("model"),
            "source": t.get("source"),
            "latency_sec": t.get("latency_sec"),
            "kind": t.get("kind", "round_decision"),
            "error": t.get("error"),
            "mercy": t.get("mercy"),
            "raw_output": str(t.get("raw_output", ""))[:600],
            "decision": t.get("decision"),
        }
        for t in recent
    ]
    return {"traces": trimmed}
# The module docstring says the game socket lives at `/ws`; without a route
# registration this handler is never reachable.
@app.websocket("/ws")
async def ws_endpoint(ws: WebSocket):
    """Handle one client connection for its whole lifetime.

    Every connection starts as a spectator. Messages are JSON objects
    dispatched on their ``type`` field:

    * ``join``            - take an arena slot, or enter the queue if full
    * ``spectate``        - only update the stored name/char
    * ``leave_queue``     - step out of the waiting line
    * ``get_leaderboard`` - reply with the top 20
    * ``submit_score``    - record a score and reply with the top 20
    * ``input`` / ``choose_card`` / ``request_skill`` / ``start``
                          - player-only actions forwarded to the engine

    A frame that is not valid JSON, is not a JSON object, or carries
    non-numeric score fields is skipped; it no longer ends the connection.
    On exit the socket is removed from the queue and dropped.
    """
    await ws.accept()
    ensure_loop()
    clients[ws] = {"pid": None, "role": "spectator", "name": "Wizard", "char": "warrior"}
    try:
        while True:
            raw = await ws.receive_text()
            try:
                data = json.loads(raw)
            except ValueError:
                continue  # malformed frame: ignore it, keep the connection
            if not isinstance(data, dict):
                continue  # valid JSON but not a message object

            mtype = data.get("type")
            if mtype == "join":
                name = str(data.get("name", "Wizard"))[:16] or "Wizard"
                # NOTE(review): the empty-string fallback is "soldier" while
                # every other default in this file is "warrior" - confirm
                # which one the engine expects.
                char = str(data.get("char", "warrior"))[:16] or "soldier"
                clients[ws]["name"] = name
                clients[ws]["char"] = char
                if clients[ws]["pid"]:
                    continue  # already playing; only name/char were refreshed
                if engine.can_join():
                    pid = uuid.uuid4().hex[:8]
                    engine.add_player(pid, name, char)
                    clients[ws] = {**clients[ws], "pid": pid, "role": "player"}
                    await _send(ws, {"type": "welcome", "pid": pid, "role": "player"})
                else:
                    # game full -> join the queue
                    if ws not in queue:
                        queue.append(ws)
                    pos = queue.index(ws) + 1
                    await _send(ws, {"type": "welcome", "pid": None, "role": "spectator",
                                     "reason": "queued", "pos": pos, "size": len(queue)})
            elif mtype == "spectate":
                clients[ws]["name"] = str(data.get("name", "Wizard"))[:16] or "Wizard"
                clients[ws]["char"] = str(data.get("char", "warrior"))[:16] or "soldier"
            elif mtype == "leave_queue":
                if ws in queue:
                    queue.remove(ws)
            elif mtype == "get_leaderboard":
                await _send(ws, {"type": "leaderboard", "top": _leaderboard[:20]})
            elif mtype == "submit_score":
                # NOTE(review): these values come straight from the client and
                # are not checked against engine state, so scores can be forged.
                try:
                    gold = int(data.get("gold", 0))
                    level = int(data.get("level", 1))
                    wave = int(data.get("wave", 0))
                except (TypeError, ValueError, OverflowError):
                    continue  # non-numeric score fields: ignore the submission
                top = add_score(str(data.get("name", "Wizard")), gold, level, wave)
                await _send(ws, {"type": "leaderboard", "top": top, "submitted": True})
            elif mtype == "input" and clients[ws]["pid"]:
                engine.set_input(clients[ws]["pid"], data)
            elif mtype == "choose_card" and clients[ws]["pid"]:
                engine.choose_card(clients[ws]["pid"], data.get("key", ""))
            elif mtype == "request_skill" and clients[ws]["pid"]:
                await handle_skill_request(ws, clients[ws]["pid"], str(data.get("prompt", "")))
            elif mtype == "start" and clients[ws]["pid"]:
                if engine.status == "lobby":
                    engine.start_round()
    except WebSocketDisconnect:
        pass  # normal client departure
    except Exception as exc:
        # Anything else still ends this connection, but leaves a trace.
        print(f"[app] websocket handler error: {exc!r}")
    finally:
        if ws in queue:
            queue.remove(ws)
        _drop(ws)
# Expose the client's static assets (the `static/` folder) under /static.
app.mount(
    "/static",
    StaticFiles(directory=os.path.join(HERE, "static")),
    name="static",
)
| # -------------------------------------------------------------------------- | |
| # Gradio dashboard (mounted at /dashboard) | |
| # -------------------------------------------------------------------------- | |
def _dashboard_state():
    """Build the three values shown on the Gradio dashboard.

    Returns:
        A ``(header, leaderboard, traces)`` tuple: a Markdown status header,
        a Markdown table of current players sorted by gold (or a placeholder
        when the arena is empty), and the first 10 entries of
        ``gamemaster.RECENT_TRACES``.

    Player names are chosen by clients, so pipes are escaped and line breaks
    flattened before they go into the table; otherwise a name containing
    ``|`` or a newline would break its row.
    """
    def _cell(value) -> str:
        # Keep arbitrary text inside a single Markdown table cell.
        return str(value).replace("|", "\\|").replace("\r", " ").replace("\n", " ")

    players = sorted(engine.players.values(), key=lambda p: p.gold, reverse=True)
    if players:
        lb = "| Wizard | Level | Gold | Last-round dmg | Kills |\n|---|---|---|---|---|\n"
        lb += "\n".join(
            f"| {_cell(p.name)} | {p.level} | {p.gold} | {round(p.dmg_dealt)} | {p.kills} |"
            for p in players
        )
    else:
        lb = "_No wizards in the arena yet._"
    header = (f"### Round {engine.round} — `{engine.status}`\n"
              f"{engine.status_msg}\n\n"
              f"Players: {len(engine.players)}/{MAX_PLAYERS} · "
              f"GM: _{engine.gm_message or '—'}_")
    traces = gamemaster.RECENT_TRACES[:10]
    return header, lb, traces
# Dashboard layout: intro text, live status header, player table, GM traces.
with gr.Blocks(title="HuggingWizards — Game Master Dashboard") as demo:
    gr.Markdown(
        "# 🧙 HuggingWizards — Game Master Dashboard\n"
        "Play at the [arena](/). Below: live state, leaderboard, and the "
        "per-round **agent traces** produced by Nemotron-4B."
    )
    header_md = gr.Markdown()
    with gr.Row():
        leaderboard_md = gr.Markdown()
    gr.Markdown("## Agent traces (most recent first)")
    traces_json = gr.JSON()

    # Refresh all three components every 2 seconds, and once on page load.
    timer = gr.Timer(2.0)
    timer.tick(
        _dashboard_state,
        outputs=[header_md, leaderboard_md, traces_json],
    )
    demo.load(
        _dashboard_state,
        outputs=[header_md, leaderboard_md, traces_json],
    )

# Attach the dashboard to the FastAPI app (SSR explicitly disabled).
app = gr.mount_gradio_app(app, demo, path="/dashboard", ssr_mode=False)
| # ---- ZeroGPU startup report ------------------------------------------------ | |
| # The `spaces` package sends its "this app has @spaces.GPU functions" report | |
| # to the ZeroGPU controller via a hook it installs on gr.Blocks.launch(). We | |
| # serve everything through uvicorn instead of demo.launch(), so that hook | |
| # never fires and ZeroGPU kills the Space with "No @spaces.GPU function | |
| # detected during startup". Invoke the same startup task manually. It packs | |
| # the (already loaded) model for fast GPU transfer and phones home; off | |
| # Spaces, `spaces.zero` doesn't define `startup` and this is a no-op. | |
# Best-effort: any failure here (package missing, hook raising) is ignored.
try:
    from spaces import zero as _spaces_zero  # type: ignore

    if hasattr(_spaces_zero, "startup"):
        _spaces_zero.startup()
        print("[app] ZeroGPU startup report sent")
except Exception:
    pass
if __name__ == "__main__":
    import uvicorn

    # Spaces expects the app on 7860 (it exports GRADIO_SERVER_PORT).
    # First non-empty value wins: PORT, then GRADIO_SERVER_PORT, then 7860.
    port = int(
        os.environ.get("PORT")
        or os.environ.get("GRADIO_SERVER_PORT")
        or 7860
    )
    uvicorn.run(app, host="0.0.0.0", port=port)