HuggingWizards / app.py
Quazim0t0's picture
Upload 127 files
34f4ada verified
Raw
History Blame Contribute Delete
14.8 kB
"""HuggingWizards — co-op pixel wizard arena with a Nemotron-4B Game Master.
Single process:
* FastAPI serves the canvas client (`/`) and a WebSocket (`/ws`).
* A ~20 Hz asyncio task runs the authoritative simulation on CPU and
broadcasts snapshots to every connected client (players + spectators).
* At each round boundary the Game Master (Nemotron, burst GPU) decides rewards
and the next round; every decision is logged as an agent trace.
* A Gradio dashboard is mounted at `/dashboard` (live traces + leaderboard).
"""
from __future__ import annotations
import asyncio
import contextlib
import json
import os
import time
import uuid
# Gradio 5+ auto-enables SSR on Spaces: a Node server takes port 7860 and
# pushes the Python server to 7861, colliding with our own uvicorn bind
# ("[Errno 98] address already in use"). We serve Gradio mounted INSIDE
# FastAPI, so SSR must be off. Set before gradio import; assignment (not
# setdefault) because Spaces exports GRADIO_SSR_MODE=true.
os.environ["GRADIO_SSR_MODE"] = "false"
# On ZeroGPU, `spaces` must be imported before any CUDA-touching library.
with contextlib.suppress(Exception): # not installed in local dev
import spaces # noqa: F401
import gradio as gr
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from game import gamemaster
from game.engine import Engine, MAX_PLAYERS, TICK_HZ
HERE = os.path.dirname(__file__)
engine = Engine()
# Active websocket connections: ws -> {"pid", "role", "name", "char"}
clients: dict[WebSocket, dict] = {}
# Ordered waiting line for when the 8-slot arena is full.
queue: list[WebSocket] = []
# ---- persistent leaderboard ----------------------------------------------
LEADERBOARD_FILE = os.path.join(HERE, "traces", "leaderboard.json")
_leaderboard: list[dict] = []
def _load_leaderboard():
global _leaderboard
try:
with open(LEADERBOARD_FILE, encoding="utf-8") as f:
_leaderboard = json.load(f)
except Exception:
_leaderboard = []
def _save_leaderboard():
with contextlib.suppress(Exception):
with open(LEADERBOARD_FILE, "w", encoding="utf-8") as f:
json.dump(_leaderboard[:100], f, indent=2)
def add_score(name: str, gold: int, level: int, wave: int) -> list[dict]:
_leaderboard.append({
"name": str(name)[:16] or "Wizard", "gold": int(gold),
"level": int(level), "wave": int(wave),
"ts": time.strftime("%Y-%m-%d %H:%M"),
})
_leaderboard.sort(key=lambda e: (e["gold"], e["wave"], e["level"]), reverse=True)
del _leaderboard[100:]
_save_leaderboard()
return _leaderboard[:20]
_load_leaderboard()
async def broadcast(payload: dict):
if not clients:
return
msg = json.dumps(payload)
dead = []
for ws in list(clients.keys()):
try:
await ws.send_text(msg)
except Exception:
dead.append(ws)
for ws in dead:
_drop(ws)
def _drop(ws: WebSocket):
info = clients.pop(ws, None)
if info and info.get("pid"):
engine.remove_player(info["pid"])
if not engine.players and engine.status != "lobby":
engine.status = "lobby"
engine.status_msg = "All wizards left. Waiting for players..."
async def _send(ws: WebSocket, payload: dict):
with contextlib.suppress(Exception):
await ws.send_text(json.dumps(payload))
async def handle_skill_request(ws: WebSocket, pid: str, prompt: str):
"""The round's top wizard asks the Game Master for a custom power-up."""
if engine.status != "intermission" or engine.skill_request_used \
or engine.skill_request_pid != pid or not prompt.strip():
await _send(ws, {"type": "skill_result", "ok": False,
"reason": "Not available right now."})
return
engine.skill_request_used = True # one wish per intermission
ctx = {"round": engine.round, "player_level": engine.players[pid].level
if pid in engine.players else 1}
spec = await asyncio.to_thread(gamemaster.generate_skill, prompt, ctx)
granted = engine.add_runtime_skill(pid, spec) if spec else None
await _send(ws, {"type": "skill_result", "ok": bool(granted), "skill": granted,
"reason": None if granted else "The arcane energies fizzled — try again."})
def _ws_for_pid(pid: str):
for ws, info in clients.items():
if info.get("pid") == pid:
return ws
return None
async def _broadcast_queue():
"""Tell each queued connection its current place in line."""
for i, ws in enumerate(list(queue)):
await _send(ws, {"type": "queue", "pos": i + 1, "size": len(queue)})
async def promote_queue():
"""Fill open arena slots from the front of the queue."""
while engine.can_join() and queue:
ws = queue.pop(0)
info = clients.get(ws)
if info is None: # disconnected while waiting
continue
pid = uuid.uuid4().hex[:8]
engine.add_player(pid, info.get("name", "Wizard"), info.get("char", "warrior"))
clients[ws] = {**info, "pid": pid, "role": "player"}
await _send(ws, {"type": "welcome", "pid": pid, "role": "player", "reason": "from_queue"})
async def manage_slots():
"""Eliminate out-of-lives wizards, then promote the queue into free slots."""
for pid, p in list(engine.players.items()):
if p.lives <= 0 and not p.alive:
ws = _ws_for_pid(pid)
engine.remove_player(pid)
if ws is not None:
info = clients.get(ws, {})
clients[ws] = {**info, "pid": None, "role": "spectator"}
await _send(ws, {"type": "eliminated",
"score": {"gold": p.gold, "level": p.level, "wave": engine.round}})
if not engine.players and engine.status != "lobby":
engine.status = "lobby"
engine.status_msg = "Waiting for wizards to join..."
await promote_queue()
await _broadcast_queue()
async def game_loop():
last = time.time()
prev_status = engine.status
gm_task: asyncio.Task | None = None
gm_applied = False
interval = 1.0 / TICK_HZ
slot_accum = 0.0
while True:
now = time.time()
dt = min(0.1, now - last)
last = now
engine.tick(dt)
# eliminate / promote queue / broadcast positions a few times a second
slot_accum += dt
if slot_accum >= 0.4:
slot_accum = 0.0
await manage_slots()
# Round just ended -> ask the Game Master (off the loop thread).
if engine.status == "intermission" and prev_status == "active":
summary = engine.round_summary()
prev_cfg = dict(engine.cfg)
gm_task = asyncio.create_task(
asyncio.to_thread(gamemaster.decide, summary, prev_cfg)
)
gm_applied = False
# Apply the decision as soon as it's ready (enables shopping this break).
if gm_task is not None and gm_task.done() and not gm_applied:
with contextlib.suppress(Exception):
engine.apply_gm_decision(gm_task.result())
gm_applied = True
# Start the next round once the break is over AND the decision is in.
if engine.status == "intermission" and gm_applied and time.time() >= engine.intermission_until:
if engine.players:
engine.start_round()
else:
engine.status = "lobby"
engine.status_msg = "Waiting for wizards to join..."
gm_task = None
gm_applied = False
prev_status = engine.status
await broadcast(engine.snapshot())
await asyncio.sleep(max(0, interval - (time.time() - now)))
# --------------------------------------------------------------------------
# FastAPI app
# --------------------------------------------------------------------------
app = FastAPI(title="HuggingWizards")
_loop_task: asyncio.Task | None = None
def ensure_loop():
"""Start the game loop once. Robust to Gradio overriding the app lifespan."""
global _loop_task
if _loop_task is None or _loop_task.done():
_loop_task = asyncio.create_task(game_loop())
@app.get("/")
async def index():
return FileResponse(os.path.join(HERE, "static", "index.html"))
@app.get("/traces")
async def traces():
"""Recent Game Master agent traces, trimmed for the in-game panel."""
out = []
for t in gamemaster.RECENT_TRACES[:15]:
out.append({
"trace_id": t.get("trace_id"), "round": t.get("round"),
"ts": t.get("ts"), "model": t.get("model"),
"source": t.get("source"), "latency_sec": t.get("latency_sec"),
"kind": t.get("kind", "round_decision"), "error": t.get("error"),
"mercy": t.get("mercy"),
"raw_output": str(t.get("raw_output", ""))[:600],
"decision": t.get("decision"),
})
return {"traces": out}
@app.websocket("/ws")
async def ws_endpoint(ws: WebSocket):
await ws.accept()
ensure_loop()
clients[ws] = {"pid": None, "role": "spectator", "name": "Wizard", "char": "warrior"}
try:
while True:
data = json.loads(await ws.receive_text())
mtype = data.get("type")
if mtype == "join":
name = str(data.get("name", "Wizard"))[:16] or "Wizard"
char = str(data.get("char", "warrior"))[:16] or "soldier"
clients[ws]["name"] = name
clients[ws]["char"] = char
if clients[ws]["pid"]:
continue
if engine.can_join():
pid = uuid.uuid4().hex[:8]
engine.add_player(pid, name, char)
clients[ws] = {**clients[ws], "pid": pid, "role": "player"}
await _send(ws, {"type": "welcome", "pid": pid, "role": "player"})
else:
# game full -> join the queue
if ws not in queue:
queue.append(ws)
pos = queue.index(ws) + 1
await _send(ws, {"type": "welcome", "pid": None, "role": "spectator",
"reason": "queued", "pos": pos, "size": len(queue)})
elif mtype == "spectate":
clients[ws]["name"] = str(data.get("name", "Wizard"))[:16] or "Wizard"
clients[ws]["char"] = str(data.get("char", "warrior"))[:16] or "soldier"
elif mtype == "leave_queue":
if ws in queue:
queue.remove(ws)
elif mtype == "get_leaderboard":
await _send(ws, {"type": "leaderboard", "top": _leaderboard[:20]})
elif mtype == "submit_score":
top = add_score(str(data.get("name", "Wizard")), int(data.get("gold", 0)),
int(data.get("level", 1)), int(data.get("wave", 0)))
await _send(ws, {"type": "leaderboard", "top": top, "submitted": True})
elif mtype == "input" and clients[ws]["pid"]:
engine.set_input(clients[ws]["pid"], data)
elif mtype == "choose_card" and clients[ws]["pid"]:
engine.choose_card(clients[ws]["pid"], data.get("key", ""))
elif mtype == "request_skill" and clients[ws]["pid"]:
await handle_skill_request(ws, clients[ws]["pid"], str(data.get("prompt", "")))
elif mtype == "start" and clients[ws]["pid"]:
if engine.status == "lobby":
engine.start_round()
except WebSocketDisconnect:
pass
except Exception:
pass
finally:
if ws in queue:
queue.remove(ws)
_drop(ws)
app.mount("/static", StaticFiles(directory=os.path.join(HERE, "static")), name="static")
# --------------------------------------------------------------------------
# Gradio dashboard (mounted at /dashboard)
# --------------------------------------------------------------------------
def _dashboard_state():
players = sorted(engine.players.values(), key=lambda p: p.gold, reverse=True)
if players:
lb = "| Wizard | Level | Gold | Last-round dmg | Kills |\n|---|---|---|---|---|\n"
lb += "\n".join(
f"| {p.name} | {p.level} | {p.gold} | {round(p.dmg_dealt)} | {p.kills} |"
for p in players
)
else:
lb = "_No wizards in the arena yet._"
header = (f"### Round {engine.round} — `{engine.status}`\n"
f"{engine.status_msg}\n\n"
f"Players: {len(engine.players)}/{MAX_PLAYERS} · "
f"GM: _{engine.gm_message or '—'}_")
traces = gamemaster.RECENT_TRACES[:10]
return header, lb, traces
with gr.Blocks(title="HuggingWizards — Game Master Dashboard") as demo:
gr.Markdown("# 🧙 HuggingWizards — Game Master Dashboard\n"
"Play at the [arena](/). Below: live state, leaderboard, and the "
"per-round **agent traces** produced by Nemotron-4B.")
header_md = gr.Markdown()
with gr.Row():
leaderboard_md = gr.Markdown()
gr.Markdown("## Agent traces (most recent first)")
traces_json = gr.JSON()
timer = gr.Timer(2.0)
timer.tick(_dashboard_state, outputs=[header_md, leaderboard_md, traces_json])
demo.load(_dashboard_state, outputs=[header_md, leaderboard_md, traces_json])
app = gr.mount_gradio_app(app, demo, path="/dashboard", ssr_mode=False)
# ---- ZeroGPU startup report ------------------------------------------------
# The `spaces` package sends its "this app has @spaces.GPU functions" report
# to the ZeroGPU controller via a hook it installs on gr.Blocks.launch(). We
# serve everything through uvicorn instead of demo.launch(), so that hook
# never fires and ZeroGPU kills the Space with "No @spaces.GPU function
# detected during startup". Invoke the same startup task manually. It packs
# the (already loaded) model for fast GPU transfer and phones home; off
# Spaces, `spaces.zero` doesn't define `startup` and this is a no-op.
with contextlib.suppress(Exception):
from spaces import zero as _spaces_zero # type: ignore
if hasattr(_spaces_zero, "startup"):
_spaces_zero.startup()
print("[app] ZeroGPU startup report sent")
if __name__ == "__main__":
import uvicorn
# Spaces expects the app on 7860 (it exports GRADIO_SERVER_PORT).
port = int(os.environ.get("PORT") or os.environ.get("GRADIO_SERVER_PORT") or 7860)
uvicorn.run(app, host="0.0.0.0", port=port)