# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""End-to-end: spawn E2B, install opencode, write three sorting algorithms, verify.

Talks to E2B and the LLM endpoints directly via the e2b SDK and httpx — no
imports from the ``opencode_env`` package at runtime. The proxy that captures
per-token logprobs (Mode B) is uploaded into the sandbox as a standalone
source file from ``../interception.py``.

For each endpoint configured in ``envs/opencode_env/.env`` (vLLM / OpenAI /
HF Router) this script:

1. Creates a fresh E2B sandbox
2. Installs opencode (``curl https://opencode.ai/install | bash``)
3. (Mode B only) uploads + starts the in-sandbox logprob-capture proxy
4. Writes ``opencode.json`` pointing at the proxy (or the LLM directly)
5. Runs ``opencode run --format json ""`` to completion
6. Runs an in-sandbox verifier that imports each sort module and tests it
7. Reads back: per-file pass/fail, file contents, proxy logprob stats,
   wall time, sandbox id

Default usage (runs every endpoint that has the required vars in .env)::

    .venv/bin/python envs/opencode_env/tests/test_five_sorts_e2e.py

Common flags::

    --endpoint vllm|openai|hf_router|all   (default: all)
    --mode transparent_proxy|black_box     (default: transparent_proxy)
    --agent-timeout 600                    (seconds before opencode is killed)
    --max-tokens-cap 4096                  (per-turn max_tokens clamp)
    --save-artifacts                       (dump JSON per run to tests/_artifacts/)
    --instruction-override "..."           (custom instruction)

Requires ``E2B_API_KEY`` in the environment plus per-endpoint creds in .env.
Each rollout takes 1–7 minutes of wall time plus ~10s of sandbox cold start.
""" from __future__ import annotations import argparse import json import os import secrets import sys import time from dataclasses import asdict, dataclass, field from pathlib import Path from statistics import mean from typing import Any import httpx from e2b import Sandbox # --------------------------------------------------------------------------- # .env loader — minimal, no python-dotenv dep. # --------------------------------------------------------------------------- _THIS_DIR = Path(__file__).resolve().parent _ENV_DIR = _THIS_DIR.parent _DOTENV_PATH = _ENV_DIR / ".env" _PROXY_SOURCE_PATH = _ENV_DIR / "sandbox" / "interception.py" def _load_env(path: Path) -> None: if not path.exists(): return for raw in path.read_text().splitlines(): line = raw.strip() if not line or line.startswith("#") or "=" not in line: continue k, _, v = line.partition("=") k = k.strip() v = v.strip().strip('"').strip("'") if k and k not in os.environ: os.environ[k] = v _load_env(_DOTENV_PATH) # --------------------------------------------------------------------------- # Endpoint specs — three flavors, all OAI-compatible. # --------------------------------------------------------------------------- @dataclass class Endpoint: label: str base_url: str model: str api_key: str # Inject ``chat_template_kwargs.enable_thinking=false`` on forwarded # requests. Needed for Qwen3.5 served via vLLM (otherwise the model # spends its budget on reasoning). OpenAI rejects this field with HTTP # 400 ("Unrecognized request argument"); HF Router's Instruct variant # doesn't need it. Default per-endpoint, overridable via CLI. 
disable_thinking_default: bool = False def _resolve_endpoints() -> tuple[list[Endpoint], list[str]]: """Return (configured, skipped_reasons) from current process env.""" specs = [ # (label, base_url_env, default_base_url, model_env, default_model, # api_key_env, default_api_key) ( "vllm", "VLLM_URL", "", "VLLM_MODEL", "Qwen/Qwen3.5-4B", "VLLM_API_KEY", "intercepted", ), ( "openai", "OPENAI_BASE_URL", "https://api.openai.com/v1", "OPENAI_MODEL", "gpt-4o-mini", "OPENAI_API_KEY", "", ), ( "hf_router", "HF_ROUTER_BASE_URL", "https://router.huggingface.co/v1", "HF_ROUTER_MODEL", "Qwen/Qwen3-4B-Instruct-2507:nscale", "HF_ROUTER_API_KEY", "", ), ] chosen: list[Endpoint] = [] skipped: list[str] = [] for label, bu_env, bu_default, mdl_env, mdl_default, ak_env, ak_default in specs: base = os.environ.get(bu_env) or bu_default model = os.environ.get(mdl_env) or mdl_default api_key = os.environ.get(ak_env) or ak_default if not (base and model and api_key): skipped.append( f"{label} (need {bu_env} / {mdl_env} / {ak_env} in .env)" ) continue # Always normalize to a /v1 base URL — opencode + the proxy expect it. base = base.rstrip("/") if not base.endswith("/v1"): base = f"{base}/v1" chosen.append( Endpoint( label=label, base_url=base, model=model, api_key=api_key, disable_thinking_default=(label == "vllm"), ) ) return chosen, skipped # --------------------------------------------------------------------------- # The locked task: instruction + verifier source. Identical for all endpoints. # --------------------------------------------------------------------------- MODULES = ["bubble_sort", "merge_sort", "quick_sort"] INSTRUCTION = ( "Create THREE Python files in the current working directory, one per " "sorting algorithm. Use RELATIVE paths — do NOT write to absolute paths " "like `/bubble_sort.py`. 
Files (one algorithm each):\n" " - bubble_sort.py -> bubble sort\n" " - merge_sort.py -> merge sort\n" " - quick_sort.py -> quicksort\n\n" "Each file MUST expose exactly one function with this signature:\n" " def sort(arr: list[int]) -> list[int]\n\n" "It must return a NEW list sorted in non-decreasing order (do not mutate " "the input). Each file must implement the algorithm named for it — do " "NOT call `sorted()` or `list.sort()`, and do NOT import third-party " "libraries. Handle edge cases: empty list, single element, duplicates, " "already-sorted, reverse-sorted, negative numbers. Do not write tests, " "a main block, README, or any other files." ) VERIFIER_SOURCE = '''\ """Verifier for the three-sorts E2E test. Runs inside the sandbox.""" import importlib import json import re import shutil import sys import traceback from pathlib import Path WORKDIR = Path("/home/user/workdir") LOG_DIR = Path("/home/user/logs/verifier") LOG_DIR.mkdir(parents=True, exist_ok=True) WORKDIR.mkdir(parents=True, exist_ok=True) sys.path.insert(0, str(WORKDIR)) MODULES = ["bubble_sort", "merge_sort", "quick_sort"] # Some models (notably Qwen3 served via vLLM) ignore "use relative paths" # and write files to ``/.py``. With ``--dangerously-skip-permissions`` # opencode allows it, so we relocate any stray files into WORKDIR so the # import side below is path-uniform. 
for name in MODULES: stray = Path("/") / f"{name}.py" target = WORKDIR / f"{name}.py" if stray.exists() and not target.exists(): shutil.move(str(stray), str(target)) # Each test case: (input, expected_sorted_output) CASES = [ ([3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5], [1, 1, 2, 3, 3, 4, 5, 5, 5, 6, 9]), ([], []), ([42], [42]), ([2, 1], [1, 2]), ([10, 9, 8, 7, 6, 5, 4, 3, 2, 1], [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), ([1, 2, 3, 4, 5], [1, 2, 3, 4, 5]), ([-3, -1, -2, 0, 5, 4], [-3, -2, -1, 0, 4, 5]), ([7, 7, 7, 7, 7], [7, 7, 7, 7, 7]), ] # Catch a model that calls ``sorted()`` / ``.sort()`` while pretending to # "implement" the named algorithm. SOURCE_FORBIDDEN = re.compile(r"\\b(sorted\\s*\\(|\\.sort\\s*\\()") results = {} for name in MODULES: fpath = WORKDIR / f"{name}.py" if not fpath.exists(): results[name] = "missing" continue try: src = fpath.read_text() if SOURCE_FORBIDDEN.search(src): results[name] = "cheat: uses sorted()/list.sort()" continue sys.modules.pop(name, None) mod = importlib.import_module(name) # Accept either ``sort`` (per spec) or the algorithm-named function # (a common drift — e.g. gpt-4o-mini emits ``def bubble_sort(...)``). fn = getattr(mod, "sort", None) or getattr(mod, name, None) if fn is None: results[name] = "no_sort_or_named_function" continue all_pass = True for inp, expected in CASES: inp_copy = list(inp) actual = fn(list(inp)) if actual != expected: all_pass = False results[name] = ( f"fail: {fn.__name__}({inp!r}) -> {actual!r}, " f"expected {expected!r}" ) break # The callee should not mutate the caller's list. 
if list(inp) != inp_copy: all_pass = False results[name] = ( f"fail: {fn.__name__} mutated input {inp!r} -> {inp_copy!r}" ) break if all_pass: results[name] = "pass" except Exception: tb = traceback.format_exc() results[name] = f"error: {tb.splitlines()[-1]}" passed = sum(1 for v in results.values() if v == "pass") reward = passed / len(MODULES) (LOG_DIR / "reward.txt").write_text(f"{reward:.4f}") (LOG_DIR / "results.json").write_text(json.dumps(results, indent=2)) print(f"REWARD={reward:.4f} PASSED={passed}/{len(MODULES)}") print(f"RESULTS={json.dumps(results)}") ''' # --------------------------------------------------------------------------- # Sandbox paths. # --------------------------------------------------------------------------- HOME = "/home/user" WORKDIR = f"{HOME}/workdir" OPENCODE_BIN = f"{HOME}/.opencode/bin/opencode" OPENCODE_CONFIG = f"{HOME}/.config/opencode/opencode.json" INSTRUCTION_PATH = f"{HOME}/task/instruction.md" VERIFIER_PATH = f"{HOME}/test.py" AGENT_LOG = f"{HOME}/logs/agent/opencode.jsonl" PROXY_LOG = f"{HOME}/logs/agent/proxy.log" PROXY_TRACE = f"{HOME}/logs/agent/proxy_trace.jsonl" PROXY_SCRIPT_PATH = f"{HOME}/proxy/interception.py" REWARD_FILE = f"{HOME}/logs/verifier/reward.txt" RESULTS_FILE = f"{HOME}/logs/verifier/results.json" PROXY_PORT = 7000 # --------------------------------------------------------------------------- # Result types. 
# ---------------------------------------------------------------------------


@dataclass
class LogprobStats:
    """Aggregated per-token logprob statistics over the proxy's turn trace."""

    # Total chat-completion turns seen by the proxy (including empty ones).
    n_turns: int = 0
    # Turns that produced at least one completion token.
    productive_turns: int = 0
    total_completion_tokens: int = 0
    # One entry per turn: number of completion tokens in that turn.
    tokens_per_turn: list[int] = field(default_factory=list)
    # Mean over every captured per-token logprob; None when nothing captured.
    mean_logprob: float | None = None
    # First/last completion token (and its logprob) across the whole rollout.
    first_token: str = ""
    first_logprob: float | None = None
    last_token: str = ""
    last_logprob: float | None = None
    # Histogram: finish_reason -> count of turns ending with it.
    finish_reasons: dict[str, int] = field(default_factory=dict)


@dataclass
class RunResult:
    """Everything collected from one endpoint rollout (one sandbox)."""

    endpoint: str
    model: str
    base_url: str
    sandbox_id: str = ""
    # Verifier reward in [0, 1]; None when the verifier never produced one.
    reward: float | None = None
    # module name -> "pass" / failure description, straight from the verifier.
    tests: dict[str, str] = field(default_factory=dict)
    # filename -> file content (truncated) read back from the sandbox.
    files: dict[str, str] = field(default_factory=dict)
    # Unexpected extra files the agent left in the workdir.
    files_extra: list[str] = field(default_factory=list)
    logprobs: LogprobStats = field(default_factory=LogprobStats)
    wall_s: float = 0.0
    agent_exit_code: int | None = None
    error: str = ""
    proxy_log_tail: str = ""
    agent_log_tail: str = ""
    verifier_stdout: str = ""
    # Raw per-turn dump (request body + response body, truncated). Saved
    # into artifacts so failures can be debugged without re-running.
    raw_turns: list[dict[str, Any]] = field(default_factory=list)

    @property
    def passed(self) -> int:
        """Number of sort modules the verifier marked ``pass``."""
        return sum(1 for v in self.tests.values() if v == "pass")


# ---------------------------------------------------------------------------
# Sandbox helpers — thin wrappers around e2b SDK.
# ---------------------------------------------------------------------------


def _exec(
    sbx: Sandbox,
    cmd: str,
    *,
    envs: dict[str, str] | None = None,
    cwd: str | None = None,
    timeout: float = 60,
) -> tuple[int, str, str]:
    """Synchronous shell exec. Returns (exit_code, stdout, stderr)."""
    # Imported lazily so this module stays importable even when the e2b
    # internals move around; only needed on the failure path below.
    from e2b.sandbox.commands.command_handle import CommandExitException

    try:
        r = sbx.commands.run(
            cmd, envs=envs, cwd=cwd, timeout=timeout, background=False
        )
        return r.exit_code, r.stdout or "", r.stderr or ""
    except CommandExitException as exc:
        # Non-zero exit raises in the SDK; normalize it back into the
        # (code, stdout, stderr) tuple so callers have one code path.
        return (
            int(getattr(exc, "exit_code", 1)),
            str(getattr(exc, "stdout", "") or ""),
            str(getattr(exc, "stderr", "") or str(exc)),
        )


def _exec_bg_with_timeout(
    sbx: Sandbox,
    cmd: str,
    *,
    envs: dict[str, str] | None = None,
    cwd: str | None = None,
    timeout_s: float = 600,
    poll_interval_s: float = 1.0,
) -> int:
    """Run ``cmd`` in the background and poll until it writes a marker file.

    Returns the command's exit code. Raises ``TimeoutError`` if the marker
    does not appear within ``timeout_s``. ``timeout=0`` is passed to E2B so
    the server-side 60s deadline does not kill the process.
    """
    marker = f"/tmp/cmd_done_{secrets.token_hex(4)}"
    # Subshell so a failing cmd still reaches the `echo $?` that writes the marker.
    wrapped = f"({cmd}); echo $? > {marker}"
    sbx.commands.run(
        wrapped, envs=envs, cwd=cwd, background=True, timeout=0
    )
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        try:
            if sbx.files.exists(marker):
                code_str = sbx.files.read(marker).strip()
                # Empty marker means the echo was interrupted mid-write.
                return int(code_str) if code_str else -1
        except Exception:
            # Transient SDK/network hiccups during polling are retried.
            pass
        time.sleep(poll_interval_s)
    raise TimeoutError(f"command did not finish within {timeout_s}s")


def _safe_read(sbx: Sandbox, path: str) -> str:
    """Read a sandbox file; return "" instead of raising when absent/unreadable."""
    try:
        return sbx.files.read(path)
    except Exception:
        return ""


def _write_text(sbx: Sandbox, path: str, content: str) -> None:
    """Write ``content`` to ``path`` in the sandbox, creating parent dirs."""
    parent = str(Path(path).parent)
    if parent not in ("", "/"):
        sbx.files.make_dir(parent)
    sbx.files.write(path, content)


# ---------------------------------------------------------------------------
# Bootstrap: install opencode, write config, optionally start proxy.
# ---------------------------------------------------------------------------


def _wait_for_sandbox_ready(sbx: Sandbox, *, attempts: int = 15) -> None:
    """Poll ``echo ok`` (~1s/attempt) until the sandbox answers; raise after ~15s."""
    for _ in range(attempts):
        code, out, _ = _exec(sbx, "echo ok", timeout=5)
        if code == 0 and "ok" in out:
            return
        time.sleep(1)
    raise RuntimeError("sandbox did not become ready within ~15s")


def _install_opencode(sbx: Sandbox) -> None:
    """Create the standard dir layout and install opencode, retrying up to 3x.

    Raises ``RuntimeError`` with the last stderr tail if all attempts fail.
    """
    cmd = (
        "set -e && "
        f"mkdir -p {HOME}/.config/opencode {HOME}/logs/agent "
        f"{HOME}/logs/verifier {HOME}/task {WORKDIR} {HOME}/proxy && "
        "curl -fsSL https://opencode.ai/install | bash && "
        f'export PATH="{HOME}/.opencode/bin:$PATH" && '
        "opencode --version"
    )
    last_stderr = ""
    for attempt in range(3):
        code, _, err = _exec(sbx, cmd, timeout=240)
        if code == 0:
            return
        last_stderr = err
        time.sleep(3 * (attempt + 1))  # linear backoff: 3s, 6s, 9s
    raise RuntimeError(f"opencode install failed: {last_stderr[-1000:]}")


def _ensure_dirs_exist(sbx: Sandbox) -> None:
    """When using a pre-baked template, dirs already exist. This is a no-op
    safety net that ensures the layout is present (cheap mkdir -p)."""
    _exec(
        sbx,
        f"mkdir -p {HOME}/.config/opencode {HOME}/logs/agent "
        f"{HOME}/logs/verifier {HOME}/task {WORKDIR} {HOME}/proxy",
        timeout=30,
    )


def _start_proxy(
    sbx: Sandbox,
    upstream_url: str,
    upstream_api_key: str,
    upstream_model: str,
    *,
    top_logprobs: int,
    max_tokens_cap: int,
    disable_thinking: bool,
    skip_install: bool = False,
) -> str:
    """Upload + start the logprob-capture proxy, return its baseURL.

    Returns the URL opencode should hit (``http://127.0.0.1:7000/v1``).
    When ``skip_install`` is True (pre-baked template), the proxy source
    and pip deps are assumed to already be present.
    """
    if not skip_install:
        if not _PROXY_SOURCE_PATH.exists():
            raise RuntimeError(
                f"proxy source not found at {_PROXY_SOURCE_PATH} — needed "
                "for transparent_proxy mode"
            )
        _write_text(sbx, PROXY_SCRIPT_PATH, _PROXY_SOURCE_PATH.read_text())
        # NOTE: do NOT pipe pip straight into `tail` — the pipeline's exit
        # code would be tail's (always 0), silently masking install
        # failures. Capture to a log, then re-raise pip's own status.
        code, out, err = _exec(
            sbx,
            "pip install --quiet 'fastapi>=0.104' 'uvicorn[standard]>=0.24' "
            "'httpx>=0.27' > /tmp/pip_install.log 2>&1; "
            "status=$?; tail -20 /tmp/pip_install.log; exit $status",
            timeout=180,
        )
        if code != 0:
            raise RuntimeError(
                f"proxy deps install failed: {(out or err)[-800:]}"
            )
    flags = (
        f"--upstream-url {upstream_url} "
        f"--upstream-api-key {upstream_api_key} "
        f"--trace {PROXY_TRACE} "
        f"--port {PROXY_PORT} "
        f"--top-logprobs {top_logprobs} "
        f"--max-tokens-cap {max_tokens_cap} "
        f"--model-override '{upstream_model}' "
    )
    if disable_thinking:
        flags += "--disable-thinking "
    cmd = (
        f"cd {HOME}/proxy && "
        f"python interception.py {flags}> {PROXY_LOG} 2>&1"
    )
    sbx.commands.run(cmd, background=True, timeout=0)
    # Wait for healthz: 120 polls x 0.5s = 60s budget.
    for _ in range(120):
        code, _, _ = _exec(
            sbx, f"curl -sf http://127.0.0.1:{PROXY_PORT}/healthz", timeout=5
        )
        if code == 0:
            return f"http://127.0.0.1:{PROXY_PORT}/v1"
        time.sleep(0.5)
    log = _safe_read(sbx, PROXY_LOG)
    raise RuntimeError(f"proxy did not start within 60s. log:\n{log[-2000:]}")


def _write_opencode_json(
    sbx: Sandbox,
    base_url: str,
    api_key: str,
    model: str,
    request_timeout_ms: int = 600_000,
) -> None:
    """Stage opencode.json for ``@ai-sdk/openai-compatible``.

    All three endpoints route through the OAI-compatible adapter — the proxy
    serves ``/v1/chat/completions`` and so does each upstream we target.
    """
    # opencode model ids may not contain the org prefix ("Qwen/...").
    inner_model = model.split("/", 1)[-1]
    doc = {
        "$schema": "https://opencode.ai/config.json",
        "model": f"intercepted/{inner_model}",
        "provider": {
            "intercepted": {
                "npm": "@ai-sdk/openai-compatible",
                "name": "Intercepted",
                "options": {
                    "baseURL": base_url,
                    "apiKey": api_key,
                    "timeout": request_timeout_ms,
                },
                "models": {inner_model: {"name": "Intercepted Model"}},
            }
        },
        "tools": {"webfetch": False, "question": False},
    }
    _write_text(sbx, OPENCODE_CONFIG, json.dumps(doc, indent=2))


# ---------------------------------------------------------------------------
# Run + verify + collect.
# ---------------------------------------------------------------------------


def _run_agent(
    sbx: Sandbox,
    *,
    instruction_path: str,
    base_url: str,
    api_key: str,
    timeout_s: float,
) -> int:
    """Invoke ``opencode run`` synchronously, return its exit code.

    Raises ``TimeoutError`` (from ``_exec_bg_with_timeout``) if the agent
    does not finish within ``timeout_s``.
    """
    envs = {
        "OPENAI_BASE_URL": base_url,
        "OPENAI_API_KEY": api_key,
        "OPENCODE_CONFIG": OPENCODE_CONFIG,
        "PATH": f"{HOME}/.opencode/bin:/usr/local/bin:/usr/bin:/bin",
    }
    cmd = (
        f'export PATH="{HOME}/.opencode/bin:$PATH" && '
        f"cd {WORKDIR} && "
        f"opencode run --format json --dangerously-skip-permissions "
        f'"$(cat {instruction_path})" 2>&1 | tee {AGENT_LOG}'
    )
    return _exec_bg_with_timeout(
        sbx, cmd, envs=envs, timeout_s=timeout_s
    )


def _run_verifier(sbx: Sandbox) -> tuple[float | None, dict[str, str], str]:
    """Run the in-sandbox verifier; return (reward, per-module results, output tail)."""
    cmd = f"mkdir -p {HOME}/logs/verifier && python {VERIFIER_PATH}"
    code, out, err = _exec(sbx, cmd, timeout=120)
    # The verifier communicates through files; stdout is kept for debugging.
    reward_str = _safe_read(sbx, REWARD_FILE).strip()
    results_str = _safe_read(sbx, RESULTS_FILE)
    try:
        reward = float(reward_str) if reward_str else None
    except ValueError:
        reward = None
    try:
        tests = json.loads(results_str) if results_str.strip() else {}
    except json.JSONDecodeError:
        tests = {}
    combined = (out + ("\n" + err if err else "")).strip()
    return reward, tests, combined[-3000:]


def _collect_files(sbx: Sandbox) -> tuple[dict[str, str], list[str]]:
    """Read back the expected sort modules plus any stray workdir files.

    Returns (expected filename -> content truncated to 8000 chars,
    list of unexpected non-hidden filenames).
    """
    files: dict[str, str] = {}
    for name in MODULES:
        path = f"{WORKDIR}/{name}.py"
        try:
            if sbx.files.exists(path):
                files[f"{name}.py"] = sbx.files.read(path)[:8000]
        except Exception:
            pass
    code, out, _ = _exec(
        sbx,
        f"find {WORKDIR} -maxdepth 1 -type f -printf '%f\\n' 2>/dev/null",
        timeout=10,
    )
    extras: list[str] = []
    expected = {f"{m}.py" for m in MODULES}
    for line in (out or "").splitlines():
        n = line.strip()
        if n and n not in expected and not n.startswith("."):
            extras.append(n)
    return files, extras


def _read_proxy_trace(sbx: Sandbox) -> list[dict[str, Any]]:
    """Parse the proxy's JSONL trace; malformed/blank lines are skipped."""
    raw = _safe_read(sbx, PROXY_TRACE)
    out: list[dict[str, Any]] = []
    for line in raw.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            out.append(json.loads(line))
        except Exception:
            pass
    return out


def _logprob_stats(turns: list[dict[str, Any]]) -> LogprobStats:
    """Fold the raw proxy turns into a ``LogprobStats`` summary."""
    s = LogprobStats(n_turns=len(turns))
    if not turns:
        return s
    all_lps: list[float] = []
    finish: dict[str, int] = {}
    for t in turns:
        toks = t.get("completion_tokens") or []
        lps = t.get("per_token_logps") or []
        s.tokens_per_turn.append(len(toks))
        s.total_completion_tokens += len(toks)
        if toks:
            s.productive_turns += 1
        all_lps.extend(float(x) for x in lps if x is not None)
        fr = t.get("finish_reason") or "unknown"
        finish[fr] = finish.get(fr, 0) + 1
    s.finish_reasons = finish
    if all_lps:
        s.mean_logprob = mean(all_lps)
    # First/last token across the rollout, skipping empty turns.
    first = next((t for t in turns if t.get("completion_tokens")), None)
    last = next(
        (t for t in reversed(turns) if t.get("completion_tokens")), None
    )
    if first:
        s.first_token = str(first["completion_tokens"][0])
        lp = (first.get("per_token_logps") or [None])[0]
        if lp is not None:
            s.first_logprob = float(lp)
    if last:
        s.last_token = str(last["completion_tokens"][-1])
        lp = (last.get("per_token_logps") or [None])[-1]
        if lp is not None:
            s.last_logprob = float(lp)
    return s


# ---------------------------------------------------------------------------
# One full rollout.
# --------------------------------------------------------------------------- def run_one( ep: Endpoint, *, mode: str, agent_timeout_s: float, max_tokens_cap: int, top_logprobs: int, disable_thinking: bool, instruction: str, e2b_api_key: str, template: str | None = None, ) -> RunResult: print( f"[{ep.label}] launching base_url={ep.base_url} model={ep.model} " f"mode={mode} template={template or '(default)'}", flush=True, ) res = RunResult(endpoint=ep.label, model=ep.model, base_url=ep.base_url) started = time.time() sbx = Sandbox.create( template=template, timeout=int(agent_timeout_s) + 300, api_key=e2b_api_key, ) res.sandbox_id = sbx.sandbox_id print(f"[{ep.label}] sandbox={sbx.sandbox_id}", flush=True) try: _wait_for_sandbox_ready(sbx) if template: _ensure_dirs_exist(sbx) else: _install_opencode(sbx) _write_text(sbx, INSTRUCTION_PATH, instruction) _write_text(sbx, VERIFIER_PATH, VERIFIER_SOURCE) if mode == "transparent_proxy": base_url = _start_proxy( sbx, upstream_url=ep.base_url, upstream_api_key=ep.api_key, upstream_model=ep.model, top_logprobs=top_logprobs, max_tokens_cap=max_tokens_cap, disable_thinking=disable_thinking, skip_install=bool(template), ) else: base_url = ep.base_url _write_opencode_json( sbx, base_url=base_url, api_key=ep.api_key if mode == "black_box" else "intercepted", model=ep.model, ) try: res.agent_exit_code = _run_agent( sbx, instruction_path=INSTRUCTION_PATH, base_url=base_url, api_key=ep.api_key if mode == "black_box" else "intercepted", timeout_s=agent_timeout_s, ) print( f"[{ep.label}] agent exit_code={res.agent_exit_code}", flush=True, ) except TimeoutError as exc: res.error = f"agent timeout: {exc}" print(f"[{ep.label}] {res.error}", flush=True) reward, tests, vstdout = _run_verifier(sbx) res.reward, res.tests, res.verifier_stdout = reward, tests, vstdout res.files, res.files_extra = _collect_files(sbx) turns = _read_proxy_trace(sbx) res.logprobs = _logprob_stats(turns) # Capture truncated request/response per turn for debugging. 
Strip # large/noisy fields (full token logprobs, raw bytes) to keep the # artifact readable. for t in turns: req = t.get("request") or {} resp = t.get("response") or {} res.raw_turns.append( { "turn": t.get("turn"), "finish_reason": t.get("finish_reason"), "latency_s": t.get("latency_s"), "request_messages": req.get("messages", [])[-6:], "request_tools": [ (tool.get("function") or {}).get("name", "?") for tool in (req.get("tools") or []) ], "request_temperature": req.get("temperature"), "request_max_tokens": req.get("max_tokens") or req.get("max_completion_tokens"), "response_choices": [ { "finish_reason": ch.get("finish_reason"), "message_content": (ch.get("message") or {}).get( "content" ), "tool_calls": [ { "name": (tc.get("function") or {}).get( "name" ), "arguments": str( (tc.get("function") or {}).get( "arguments", "" ) )[:500], } for tc in ( (ch.get("message") or {}).get("tool_calls") or [] ) ], } for ch in (resp.get("choices") or []) ], "upstream_status": resp.get("upstream_status"), "upstream_error": resp.get("upstream_error"), } ) res.proxy_log_tail = _safe_read(sbx, PROXY_LOG)[-2000:] res.agent_log_tail = _safe_read(sbx, AGENT_LOG)[-4000:] except Exception as exc: # noqa: BLE001 res.error = f"{type(exc).__name__}: {exc}" print(f"[{ep.label}] ERROR {res.error}", flush=True) finally: try: sbx.kill() except Exception: pass res.wall_s = time.time() - started return res # --------------------------------------------------------------------------- # Reporting. 
# --------------------------------------------------------------------------- def _format_summary(results: list[RunResult]) -> str: lines: list[str] = [] sep = "-" * 110 lines.append(sep) lines.append( f"{'endpoint':<10} {'model':<42} {'reward':<8} {'pass':<6} " f"{'turns':<6} {'tokens':<8} {'mean-logp':<11} {'wall':<8}" ) lines.append(sep) for r in results: reward = f"{r.reward:.2f}" if r.reward is not None else "-" pass_str = f"{r.passed}/{len(MODULES)}" mean_lp = ( f"{r.logprobs.mean_logprob:+.3f}" if r.logprobs.mean_logprob is not None else "-" ) lines.append( f"{r.endpoint:<10} {r.model[:42]:<42} {reward:<8} {pass_str:<6} " f"{r.logprobs.n_turns:<6} {r.logprobs.total_completion_tokens:<8} " f"{mean_lp:<11} {r.wall_s:<7.1f}s" ) lines.append(sep) lines.append("") lines.append("per-file results:") for r in results: per_file = " ".join( f"{m}={r.tests.get(m, '?')}" for m in MODULES ) lines.append(f" {r.endpoint:<10} {per_file}") if r.files_extra: lines.append( f" {' ':<10} extras: {', '.join(sorted(r.files_extra))}" ) if r.error: lines.append(f" {' ':<10} ERROR: {r.error[:200]}") return "\n".join(lines) def _save_artifact(r: RunResult, out_dir: Path) -> Path: out_dir.mkdir(parents=True, exist_ok=True) ts = int(time.time()) target = out_dir / f"sorting_{r.endpoint}_{ts}.json" target.write_text(json.dumps(asdict(r), indent=2, default=str)) return target # --------------------------------------------------------------------------- # CLI. # --------------------------------------------------------------------------- def _parse_args(argv: list[str] | None = None) -> argparse.Namespace: p = argparse.ArgumentParser( prog="test_five_sorts_e2e", description=( "Run opencode end-to-end against vLLM / OpenAI / HF Router, " "write 5 sorting algorithms in 5 files, verify them, return " "logprobs + tests + filesystem." 
), formatter_class=argparse.RawDescriptionHelpFormatter, ) p.add_argument( "--endpoint", choices=["vllm", "openai", "hf_router", "all"], default="all", help="Which endpoint to test (default: all configured endpoints).", ) p.add_argument( "--mode", choices=["transparent_proxy", "black_box"], default="transparent_proxy", help=( "transparent_proxy captures per-token logprobs; black_box skips " "the proxy. Default: transparent_proxy." ), ) p.add_argument( "--agent-timeout", type=float, default=600.0, help="Seconds to wait for opencode to finish (default 600).", ) p.add_argument( "--max-tokens-cap", type=int, default=4096, help="Per-turn max_tokens clamp on forwarded requests (default 4096).", ) p.add_argument( "--top-logprobs", type=int, default=5, help="Top-k logprobs requested from the upstream (HF Router cap is 5).", ) p.add_argument( "--disable-thinking", choices=["auto", "on", "off"], default="auto", help=( "Inject ``chat_template_kwargs.enable_thinking=false`` on " "forwarded requests. ``auto`` = on for vllm, off for openai / " "hf_router (default). ``on`` / ``off`` forces it for every " "endpoint." ), ) p.add_argument( "--save-artifacts", action="store_true", help="Dump per-run JSON to envs/opencode_env/tests/_artifacts/.", ) p.add_argument( "--instruction-override", default=None, help="Replace the default 5-sorts instruction.", ) p.add_argument( "--no-summary-files", action="store_true", help="Skip printing file contents in the summary.", ) p.add_argument( "--template", default=None, help=( "E2B template name to use (e.g. ``opencode-rl`` after running " "build_e2b_template.py). When set, skips opencode install + " "pip-deps install (already in the template) — saves ~2 min " "per rollout." 
), ) return p.parse_args(argv) def main(argv: list[str] | None = None) -> int: args = _parse_args(argv) e2b_api_key = os.environ.get("E2B_API_KEY") if not e2b_api_key: print( "ERROR: E2B_API_KEY is required (set it in .env or your shell).", file=sys.stderr, ) return 2 print(f"Loading env from {_DOTENV_PATH}") endpoints, skipped = _resolve_endpoints() if args.endpoint != "all": endpoints = [e for e in endpoints if e.label == args.endpoint] instruction = args.instruction_override or INSTRUCTION runs: list[RunResult] = [] for ep in endpoints: if args.disable_thinking == "on": disable_thinking = True elif args.disable_thinking == "off": disable_thinking = False else: disable_thinking = ep.disable_thinking_default runs.append( run_one( ep, mode=args.mode, agent_timeout_s=args.agent_timeout, max_tokens_cap=args.max_tokens_cap, top_logprobs=args.top_logprobs, disable_thinking=disable_thinking, instruction=instruction, e2b_api_key=e2b_api_key, template=args.template, ) ) print() print(_format_summary(runs)) if skipped: print("\nSkipped (not configured):") for s in skipped: print(f" - {s}") if not args.no_summary_files: for r in runs: print(f"\n=== files written by {r.endpoint} ({r.model}) ===") for fname, src in r.files.items(): head = "\n".join(src.splitlines()[:20]) print(f"--- {fname} (first 20 lines) ---") print(head) if src.count("\n") > 20: print(f"... ({src.count(chr(10)) - 20} more lines)") if args.save_artifacts: out_dir = _ENV_DIR / "tests" / "_artifacts" for r in runs: print(f"saved {_save_artifact(r, out_dir)}") if not runs: print("\nNo endpoints ran. Fill in .env and re-run.") return 2 failed = [r for r in runs if r.reward is None or r.reward < 1.0 or r.error] if failed: print(f"\n{len(failed)}/{len(runs)} endpoint(s) did not reach reward=1.0.") return 1 print(f"\nAll {len(runs)} endpoint(s) reached reward=1.0.") return 0 if __name__ == "__main__": sys.exit(main())