"""Live game runtime: lazily builds the in-process llama.cpp backend, generates cases, and holds live ``Session`` objects per run. Single-flight is MANDATORY: ``llama_cpp.Llama`` is not thread-safe, so every model call (generation + interrogation) runs under one lock - never concurrently, on any machine. Background case generation runs on EVERY box, but it can never make a player wait: each generation call holds the lock for just that one call (never the whole pipeline), and on a small box it streams with an interrupt check between tokens - the moment a player asks a question the in-flight generation aborts within a token, the lock frees, and the turn runs. Generation resumes once the table has been idle for a while. Fresh AI cases land in the buffer (served on the very next New Case) AND join the shuffled rotation, so the pool of mysteries grows for as long as the Space stays up. """ from __future__ import annotations import random import threading import time import uuid from collections.abc import Iterator from dataclasses import dataclass from ..config import effective_cpus, get_settings from ..engine.session import Session from ..generator.pipeline import generate_case from ..llm.backend import GenParams, LLMBackend, LLMError, make_backend from ..persistence.case_store import load_case, save_runtime_case from ..persistence.paths import prebaked_cases_dir, runtime_cases_dir from ..schemas.accusation import Accusation from ..schemas.case import CaseFile from ..schemas.enums import Relevance from .case_adapter import casefile_to_public from .public_view import PublicCase # How long the table must be quiet (no interrogation turn) before a small box starts - # or resumes - a background generation. _IDLE_SECS = 90.0 class _SharedLockBackend: """Per-call single-flight wrapper. Each model call holds the runtime lock for JUST that call, so a player's turn waits behind at most one in-flight call - never a whole multi-call generation. With an ``interrupt`` event the call streams internally and aborts between tokens the moment a player shows up, freeing the lock at once.""" def __init__(self, inner: LLMBackend, lock: threading.Lock, interrupt: threading.Event | None = None) -> None: self._inner = inner self._lock = lock self._interrupt = interrupt def generate(self, prompt: str, params: GenParams) -> str: with self._lock: if self._interrupt is None: return self._inner.generate(prompt, params) if self._interrupt.is_set(): raise LLMError("generation interrupted by player") parts: list[str] = [] for delta in self._inner.stream(prompt, params): parts.append(delta) if self._interrupt.is_set(): raise LLMError("generation interrupted by player") return "".join(parts) def stream(self, prompt: str, params: GenParams) -> Iterator[str]: with self._lock: yield from self._inner.stream(prompt, params) @dataclass class LiveRun: run_id: str case: CaseFile session: Session public: PublicCase baselines: dict[str, int] class GameRuntime: def __init__(self) -> None: self._lock = threading.Lock() # MANDATORY single-flight over all model calls self._backend: LLMBackend | None = None self._backend_failed = False self._runs: dict[str, LiveRun] = {} self._buffer: CaseFile | None = None self._buffer_lock = threading.Lock() self._seed = int(time.time()) % 900_000 + 1000 self._rng = random.Random(self._seed) # Pre-baked pool: full, model-authored cases shipped with the Space, served instantly # on New Case so nobody waits ~2 min for live generation. Interrogation is still live. self._prebaked: list[CaseFile] = [] self._prebaked_idx = 0 self._prebaked_loaded = False # Background generation: a fast box generates immediately; a small box (the 2-vCPU # Space) waits for an idle table and aborts between tokens when a player shows up. self._fast_box = effective_cpus() > 4 self._gen_interrupt = threading.Event() self._gen_running = False self._gen_flag_lock = threading.Lock() self._last_player_ts = 0.0 # ---- backend ---- def _get_backend(self) -> LLMBackend | None: if self._backend is None and not self._backend_failed: try: self._backend = make_backend(get_settings()) except LLMError: self._backend_failed = True return self._backend def available(self) -> bool: return self._get_backend() is not None def _next_seed(self) -> int: self._seed += 1 return self._seed # ---- generation ---- def _generate(self, seed: int, *, interruptible: bool = False) -> CaseFile: backend = self._get_backend() if backend is None: raise LLMError("no backend") wrapped = _SharedLockBackend(backend, self._lock, self._gen_interrupt if interruptible else None) result = generate_case(wrapped, seed=seed) save_runtime_case(result.case) return result.case def _prebuild(self) -> None: """Generate ONE fresh AI case in the background, yielding to players. On a small box: wait until the table is idle, abort between tokens if a player interrupts, then wait for idle again and retry. The finished case is served on the very next New Case and joins the rotation for good.""" try: for _ in range(8): try: if not self._fast_box: while time.time() - self._last_player_ts < _IDLE_SECS: time.sleep(5) case = self._generate(self._next_seed(), interruptible=not self._fast_box) except LLMError: continue # interrupted by a player, or malformed output - try again except Exception: break with self._buffer_lock: self._buffer = case self._prebaked.append(case) break finally: with self._gen_flag_lock: self._gen_running = False def _spawn_gen(self) -> None: if not self.available(): return with self._gen_flag_lock: if self._gen_running: return self._gen_running = True threading.Thread(target=self._prebuild, daemon=True).start() def _load_prebaked(self) -> None: if self._prebaked_loaded: return self._prebaked_loaded = True pool_dir = prebaked_cases_dir() if not pool_dir.is_dir(): return for path in sorted(pool_dir.glob("*.json")): try: self._prebaked.append(load_case(path)) except Exception: continue # Shuffle per process (the seed is time-based) so the FIRST case of every fresh # visit/restart is randomized - never the same opening mystery twice in a row. self._rng.shuffle(self._prebaked) def start_buffer(self) -> None: """Make the first New Case instant (shipped pool, shuffled) and start growing the pool with a fresh AI-generated case in the background.""" self._load_prebaked() self._spawn_gen() def _take_buffered(self) -> CaseFile | None: with self._buffer_lock: case = self._buffer self._buffer = None return case def _take_prebaked(self) -> CaseFile | None: self._load_prebaked() if not self._prebaked: return None if self._prebaked_idx >= len(self._prebaked): # Bag exhausted: reshuffle for a fresh order on the next lap. self._rng.shuffle(self._prebaked) self._prebaked_idx = 0 case = self._prebaked[self._prebaked_idx] self._prebaked_idx += 1 return case def _maybe_refill(self) -> None: """Keep one fresh AI case cooking in the background whenever the buffer is empty.""" if self._buffer is None: self._spawn_gen() def new_generated_run(self) -> tuple[PublicCase, str] | None: if not self.available(): return None # Prefer a freshly generated case if one is ready; else serve the pre-baked pool # instantly; only with neither do we generate synchronously (first run, no pool). case = self._take_buffered() or self._take_prebaked() if case is None: try: case = self._generate(self._next_seed()) except Exception: return None self._maybe_refill() return self._register(case) def load_generated_run(self, case_id: str) -> tuple[PublicCase, str] | None: if not self.available(): return None self._load_prebaked() case = next((c for c in self._prebaked if c.case_id == case_id), None) if case is None: for directory in (prebaked_cases_dir(), runtime_cases_dir()): path = directory / f"{case_id}.json" if path.exists(): try: case = load_case(path) except Exception: case = None break if case is None: return None return self._register(case) def _register(self, case: CaseFile) -> tuple[PublicCase, str]: public = casefile_to_public(case) session = Session(case, self._get_backend()) # type: ignore[arg-type] run_id = uuid.uuid4().hex baselines = {s.id: s.baseline_suspicion for s in public.suspects} self._runs[run_id] = LiveRun(run_id, case, session, public, baselines) return public, run_id def get(self, run_id: str) -> LiveRun | None: return self._runs.get(run_id) # ---- live turn / verdict ---- def _suspicion(self, run: LiveRun, sus_id: str) -> int: st = run.session.state.state_for(sus_id) base = run.baselines.get(sus_id, 25) val = base + round(st.stress * 55) + (20 if st.broken_lie_ids else 0) return max(0, min(100, val)) def interrogate_live( self, run: LiveRun, sus_id: str, question: str, clue_id: str | None ) -> dict: prev = self._suspicion(run, sus_id) # Tell any in-flight background generation to yield the lock NOW (it aborts # between tokens), then take the table. self._gen_interrupt.set() self._last_player_ts = time.time() with self._lock: self._gen_interrupt.clear() final = None for ev in run.session.interrogate(sus_id, question, presented_clue_id=clue_id): if ev.final is not None: final = ev.final self._last_player_ts = time.time() reply = final.turn.spoken if final else "…I have nothing to say to that." after = self._suspicion(run, sus_id) adj = final.adjudication if final else None rattled = bool(adj and adj.relevance in (Relevance.DIRECT, Relevance.BREAKING)) cornered = bool(adj and adj.is_contradiction) return { "reply": reply, "suspicionDelta": after - prev, "suspicion": after, "flags": {"rattled": rattled, "contradictionExposed": cornered, "cornered": cornered}, } def accuse_live(self, run: LiveRun, suspect_id: str, motive_id: str, evidence_ids: list[str]) -> dict: verdict = run.session.accuse( Accusation(accused_sus_id=suspect_id, motive_id=motive_id, cited_clue_ids=tuple(evidence_ids)) ) culprit_id = run.case.culprit.sus_id killer = run.case.suspect(culprit_id) if verdict.culprit_correct: truth = verdict.rationale or run.case.culprit.method_narrative else: accused = run.case.suspect(suspect_id).name if any(s.sus_id == suspect_id for s in run.case.suspects) else "the accused" truth = ( f"You charged {accused}. The case held for a night - but the evidence led past " f"them to {killer.name}, who walked out into the rain." ) return { "correct": verdict.culprit_correct, "verdict": { "stamp": "CASE CLOSED" if verdict.culprit_correct else "MISTRIAL", "killerId": culprit_id, "killerName": killer.name, "truth": truth, }, "score": { "points": verdict.score, "max": 100, "killerCorrect": verdict.culprit_correct, "motiveCorrect": verdict.motive_correct, "evidenceHits": len(evidence_ids), }, "stats": [], } RUNTIME = GameRuntime()