Spaces:
Running on Zero
Running on Zero
| """ | |
| modular_mind.py -- numpy inference for the trained Modular Mind boss brain. | |
| Loads the weights produced by train.py (mm_weights.npz) and runs the exact same | |
| forward pass as mm_torch.ModularMindPolicy, in pure numpy (no torch needed at game | |
| runtime -> tiny, fast, instant cold-start on a HuggingFace Space). | |
| decide(state) returns the chosen boss action plus rich telemetry (per-specialist | |
| drives, the shared latent, the coordinator's modulation) so the game can VISUALISE | |
| the Modular Mind making each decision -- including the two modulator specialists | |
| (Punisher, Enrage) whose only influence is the latent they write into the bridge. | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import numpy as np | |
| from features import ACTIONS, NF, extract_features, legal_mask | |
# Specialist roster -- must match mm_torch.SPEC_DEFS.
# Each entry is (name, owned action, display colour). A specialist whose owned
# action is None is a modulator: it has no drive head and only contributes a latent.
SPEC_DEFS = [
    ("Aggressor", "CLEAVE", "#ff4d4d"),
    ("Stalker", "APPROACH", "#4da6ff"),
    ("Survivor", "RETREAT", "#9b59b6"),
    ("Baiter", "IDLE", "#f1c40f"),
    ("Defender", "BLOCK", "#48b0c4"),
    ("Punisher", None, "#e67e22"),
    ("Enrage", None, "#c0392b"),
]

# Default weight file: mm_weights.npz sitting next to this module.
WEIGHTS_PATH = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "mm_weights.npz",
)

# Per-specialist hidden width and the width of the shared latent bridge.
H = 24
D_LATENT = 12
def _layernorm(x, w, b, eps=1e-5):
    """Normalise ``x`` to zero mean / unit variance, then apply scale ``w`` and shift ``b``.

    The mean and (biased) variance are taken over every element of ``x``; ``eps``
    is added to the variance before the square root to avoid dividing by zero.
    """
    centred = x - x.mean()
    variance = (centred ** 2).mean()
    return centred / np.sqrt(variance + eps) * w + b
def _relu(x):
    """Element-wise rectifier: negative entries of ``x`` become 0.0."""
    floor = 0.0
    return np.maximum(x, floor)
class ModularMind:
    """Pure-numpy forward pass of the Modular Mind boss policy.

    Attributes:
        W: dict mapping weight names (``s{i}_fc1_w``, ``s{i}_lat_w``,
            ``s{i}_drv_w``, ``link_*``, ``coord_*`` ...) to numpy arrays.
        trained: True when ``W`` was read from ``weights_path``; False when that
            file was missing and seeded random weights were generated instead.
    """

    def __init__(self, weights_path=WEIGHTS_PATH):
        """Load weights from ``weights_path``, or fall back to random weights.

        Args:
            weights_path: path to an ``.npz`` archive of named weight arrays.
        """
        if os.path.exists(weights_path):
            # np.load on an .npz returns a lazy NpzFile that keeps the archive
            # open; copy every array out and close the handle deterministically.
            with np.load(weights_path) as archive:
                self.W = {key: archive[key] for key in archive.files}
            self.trained = True
        else:  # fallback so the game still runs before training finishes
            self.W = self._random_weights()
            self.trained = False

    def _random_weights(self):
        """Build a full weight dict from a fixed seed (0), so it is reproducible.

        Returns:
            dict with the same key layout ``decide`` reads: per-specialist
            ``fc1``/``lat`` layers (plus a ``drv`` head for specialists that own
            an action), the ``link_*`` bridge tensors and the ``coord_*`` read-out.
        """
        rng = np.random.default_rng(0)
        W = {}
        for i, (_, owns, _) in enumerate(SPEC_DEFS):
            W[f"s{i}_fc1_w"] = rng.normal(0, .3, (H, NF))
            W[f"s{i}_fc1_b"] = np.zeros(H)
            W[f"s{i}_lat_w"] = rng.normal(0, .3, (D_LATENT, H))
            W[f"s{i}_lat_b"] = np.zeros(D_LATENT)
            if owns is not None:
                # only action-owning specialists get a scalar drive head
                W[f"s{i}_drv_w"] = rng.normal(0, .3, (1, H))
                W[f"s{i}_drv_b"] = np.zeros(1)
        W["link_ni_w"] = np.ones(D_LATENT)
        W["link_ni_b"] = np.zeros(D_LATENT)
        W["link_v"] = rng.normal(0, .3, (2 * D_LATENT, D_LATENT))
        W["link_g"] = rng.normal(0, .3, (2 * D_LATENT, D_LATENT))
        W["link_d"] = rng.normal(0, .3, (D_LATENT, 2 * D_LATENT))
        W["link_no_w"] = np.ones(D_LATENT)
        W["link_no_b"] = np.zeros(D_LATENT)
        W["coord_w"] = rng.normal(0, .3, (len(ACTIONS), D_LATENT))
        W["coord_b"] = np.zeros(len(ACTIONS))
        return W

    def decide(self, state: dict, explore: float = 0.06):
        """Pick a boss action for ``state`` and return it with telemetry.

        Args:
            state: game-state dict handed to ``extract_features`` and
                ``legal_mask``; the optional ``"bossHP"`` key (default 1.0) is
                read here to report the phase.
            explore: mistake rate -- the probability of replacing the greedy
                action with a uniformly random legal one.

        Returns:
            dict with keys ``action``, ``phase``, ``trained``, ``specialists``,
            ``base_drive``, ``modulation``, ``final_drive``, ``probs``,
            ``legal`` and ``shared_latent`` (numeric values rounded to 3 places).
        """
        W = self.W
        f = extract_features(state).astype(np.float64)
        drives = np.zeros(len(ACTIONS))
        latents = []
        per_spec = []
        for i, (name, owns, color) in enumerate(SPEC_DEFS):
            h = np.tanh(W[f"s{i}_fc1_w"] @ f + W[f"s{i}_fc1_b"])
            lat = W[f"s{i}_lat_w"] @ h + W[f"s{i}_lat_b"]
            latents.append(lat)
            drv = None
            if owns is not None:
                # the drive head is (1, H), so the product is a 1-element array
                drv = float((W[f"s{i}_drv_w"] @ h + W[f"s{i}_drv_b"]).item())
                drives[ACTIONS.index(owns)] += drv
            per_spec.append({
                "name": name, "owns": owns, "color": color,
                "drive": round(drv, 3) if drv is not None else None,
                "latent_norm": round(float(np.linalg.norm(lat)), 3),
            })

        # RecursiveLink: shared latent bridge (sum -> norm -> ReGLU -> residual -> norm)
        z = np.sum(latents, axis=0)
        zn = _layernorm(z, W["link_ni_w"], W["link_ni_b"])
        reglu = _relu(W["link_g"] @ zn) * (W["link_v"] @ zn)
        out = W["link_d"] @ reglu
        shared = _layernorm(out + z, W["link_no_w"], W["link_no_b"])

        # Coordinator read-out (modulator influence flows in here)
        modulation = W["coord_w"] @ shared + W["coord_b"]
        logits = drives + modulation
        mask = legal_mask(state)
        masked = np.where(mask > 0.5, logits, -1e9)
        # subtract the max before exp for a numerically stable softmax
        probs = np.exp(masked - masked.max())
        probs = probs / probs.sum()

        # `explore` is a true mistake-rate: with prob `explore` take a uniformly
        # random LEGAL action (the difficulty dial), else the best action. (Sampling
        # from `probs` barely degrades play -- the policy is too confident -- so we
        # use uniform mistakes to make easier tiers actually easier.)
        legal_idx = np.where(mask > 0.5)[0]
        # The size check comes last so the RNG is consumed exactly as before; it
        # stops np.random.choice from raising on an empty legal set, in which
        # case we fall through to the greedy pick like the non-explore path.
        if explore > 0 and np.random.random() < explore and legal_idx.size > 0:
            choice = int(np.random.choice(legal_idx))
        else:
            choice = int(np.argmax(masked))

        phase = 2 if state.get("bossHP", 1.0) < 0.5 else 1
        return {
            "action": ACTIONS[choice],
            "phase": phase,
            "trained": self.trained,
            "specialists": per_spec,
            "base_drive": {a: round(float(d), 3) for a, d in zip(ACTIONS, drives)},
            "modulation": {a: round(float(m), 3) for a, m in zip(ACTIONS, modulation)},
            "final_drive": {a: round(float(d), 3) for a, d in zip(ACTIONS, logits)},
            "probs": {a: round(float(p), 3) for a, p in zip(ACTIONS, probs)},
            "legal": {a: bool(m > 0.5) for a, m in zip(ACTIONS, mask)},
            "shared_latent": [round(float(x), 3) for x in shared],
        }
# ---- difficulty tiers --------------------------------------------------------
# Each tier names a weight file and an `explore` value. `explore` is the
# mistake-rate (prob of a random legal action); higher = the boss makes more
# exploitable mistakes and leaves more openings for the player.
#
# All three tiers currently point at the same fully-trained weights, so decision
# noise is the only thing that separates them. (Once the boss learned to BLOCK it
# dominates the sim even early in training, so a "weaker checkpoint" no longer
# yields an easy boss -- noise is the controllable dial: vs a near-optimal dodger
# these give boss win-rates ~0.35 / ~0.65 / ~0.95.)
# A tier whose weight file is missing falls back to the hard tier's file.
_DIR = os.path.dirname(os.path.abspath(__file__))

DIFFICULTY = {
    "easy": {
        "file": "mm_weights.npz",
        "explore": 0.50,
    },
    "normal": {
        "file": "mm_weights.npz",
        "explore": 0.22,
    },
    "hard": {
        "file": "mm_weights.npz",
        "explore": 0.04,
    },
}

# Per-tier cache of already constructed brains, filled lazily.
_MINDS: dict[str, "ModularMind"] = {}
def get_mind(difficulty: str = "hard") -> ModularMind:
    """Return the cached brain for ``difficulty``, building it on first use.

    Unknown tier names are treated as ``"hard"``. If the tier's weight file does
    not exist, the hard tier's file is used instead.
    """
    tier = difficulty if difficulty in DIFFICULTY else "hard"
    if tier in _MINDS:
        return _MINDS[tier]

    weights_file = os.path.join(_DIR, DIFFICULTY[tier]["file"])
    if not os.path.exists(weights_file):  # tier not trained yet
        weights_file = os.path.join(_DIR, DIFFICULTY["hard"]["file"])
    brain = ModularMind(weights_file)

    # The HARD tier shares the online learner's live (player-adapted) weights,
    # so finetuning from real fights takes effect immediately. (Lazy import to
    # avoid an import cycle: online -> mm_grad -> modular_mind.)
    if tier == "hard":
        try:
            import online
            if online.ENABLED:
                brain.W = online.live_weights()
                brain.trained = True
        except Exception as e:
            # best effort: keep the file-loaded weights and just report why
            print(f"[modular_mind] online weights not shared ({e})")

    _MINDS[tier] = brain
    return brain
def decide(state: dict) -> dict:
    """Route a decision to the brain for the requested difficulty tier.

    The tier is read from ``state["difficulty"]`` (default ``"hard"``; unknown
    values also map to ``"hard"``). The tier's ``explore`` mistake-rate from
    ``DIFFICULTY`` is passed to the brain, and the resolved tier name is added to
    the returned telemetry under ``"difficulty"``.
    """
    requested = str(state.get("difficulty", "hard"))
    tier = requested if requested in DIFFICULTY else "hard"
    mistake_rate = DIFFICULTY[tier]["explore"]
    result = get_mind(tier).decide(state, explore=mistake_rate)
    result["difficulty"] = tier
    return result