# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""End-to-end: spawn E2B, install opencode, write 5 sorting algorithms, verify.
Talks to E2B and the LLM endpoints directly via the e2b SDK and httpx — no
imports from the ``opencode_env`` package at runtime. The proxy that captures
per-token logprobs (Mode B) is uploaded into the sandbox as a standalone
source file from ``../interception.py``.
For each endpoint configured in ``envs/opencode_env/.env`` (vLLM / OpenAI /
HF Router) this script:
1. Creates a fresh E2B sandbox
2. Installs opencode (``curl https://opencode.ai/install | bash``)
3. (Mode B only) uploads + starts the in-sandbox logprob-capture proxy
4. Writes ``opencode.json`` pointing at the proxy (or the LLM directly)
5. Runs ``opencode run --format json "<instruction>"`` to completion
6. Runs an in-sandbox verifier that imports each sort module and tests it
7. Reads back: per-file pass/fail, file contents, proxy logprob stats,
wall time, sandbox id
Default usage (runs every endpoint that has the required vars in .env)::
.venv/bin/python envs/opencode_env/tests/test_five_sorts_e2e.py
Common flags::
--endpoint vllm|openai|hf_router|all (default: all)
--mode transparent_proxy|black_box (default: transparent_proxy)
--agent-timeout 600 (seconds before opencode is killed)
--max-tokens-cap 4096 (per-turn max_tokens clamp)
--save-artifacts (dump JSON per run to tests/_artifacts/)
--instruction-override "..." (custom instruction)
Requires ``E2B_API_KEY`` in the environment plus per-endpoint creds in .env.
Each rollout takes 1–7 minutes of wall time plus ~10s of sandbox cold start.
"""
from __future__ import annotations
import argparse
import json
import os
import secrets
import sys
import time
from dataclasses import asdict, dataclass, field
from pathlib import Path
from statistics import mean
from typing import Any
import httpx
from e2b import Sandbox
# ---------------------------------------------------------------------------
# .env loader — minimal, no python-dotenv dep.
# ---------------------------------------------------------------------------
_THIS_DIR = Path(__file__).resolve().parent
_ENV_DIR = _THIS_DIR.parent
_DOTENV_PATH = _ENV_DIR / ".env"
_PROXY_SOURCE_PATH = _ENV_DIR / "sandbox" / "interception.py"
def _load_env(path: Path) -> None:
if not path.exists():
return
for raw in path.read_text().splitlines():
line = raw.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, _, v = line.partition("=")
k = k.strip()
v = v.strip().strip('"').strip("'")
if k and k not in os.environ:
os.environ[k] = v
_load_env(_DOTENV_PATH)
# ---------------------------------------------------------------------------
# Endpoint specs — three flavors, all OAI-compatible.
# ---------------------------------------------------------------------------
@dataclass
class Endpoint:
label: str
base_url: str
model: str
api_key: str
    # Inject ``chat_template_kwargs.enable_thinking=false`` on forwarded
    # requests. Needed for Qwen3 served via vLLM (otherwise the model
    # spends its token budget on reasoning). OpenAI rejects this field with
    # HTTP 400 ("Unrecognized request argument"); HF Router's Instruct
    # variant doesn't need it. Defaults per endpoint, overridable via CLI.
disable_thinking_default: bool = False
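# When thinking is disabled, the proxy is expected to merge roughly the
# following into each forwarded /v1/chat/completions body (illustrative
# sketch only; the exact merge logic lives in interception.py):
#
#     {"chat_template_kwargs": {"enable_thinking": false}}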
def _resolve_endpoints() -> tuple[list[Endpoint], list[str]]:
"""Return (configured, skipped_reasons) from current process env."""
specs = [
# (label, base_url_env, default_base_url, model_env, default_model,
# api_key_env, default_api_key)
(
"vllm",
"VLLM_URL",
"",
"VLLM_MODEL",
"Qwen/Qwen3.5-4B",
"VLLM_API_KEY",
"intercepted",
),
(
"openai",
"OPENAI_BASE_URL",
"https://api.openai.com/v1",
"OPENAI_MODEL",
"gpt-4o-mini",
"OPENAI_API_KEY",
"",
),
(
"hf_router",
"HF_ROUTER_BASE_URL",
"https://router.huggingface.co/v1",
"HF_ROUTER_MODEL",
"Qwen/Qwen3-4B-Instruct-2507:nscale",
"HF_ROUTER_API_KEY",
"",
),
]
chosen: list[Endpoint] = []
skipped: list[str] = []
for label, bu_env, bu_default, mdl_env, mdl_default, ak_env, ak_default in specs:
base = os.environ.get(bu_env) or bu_default
model = os.environ.get(mdl_env) or mdl_default
api_key = os.environ.get(ak_env) or ak_default
if not (base and model and api_key):
skipped.append(
f"{label} (need {bu_env} / {mdl_env} / {ak_env} in .env)"
)
continue
# Always normalize to a /v1 base URL — opencode + the proxy expect it.
base = base.rstrip("/")
if not base.endswith("/v1"):
base = f"{base}/v1"
chosen.append(
Endpoint(
label=label,
base_url=base,
model=model,
api_key=api_key,
disable_thinking_default=(label == "vllm"),
)
)
return chosen, skipped
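# An illustrative ``envs/opencode_env/.env`` that configures all three
# endpoints plus E2B (placeholder values, not real credentials; only the
# variables without defaults above are strictly required):
#
#     E2B_API_KEY=e2b_...
#     VLLM_URL=http://my-vllm-host:8000/v1
#     VLLM_MODEL=<model id served by that vLLM instance>
#     OPENAI_API_KEY=sk-...
#     HF_ROUTER_API_KEY=hf_...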
# ---------------------------------------------------------------------------
# The locked task: instruction + verifier source. Identical for all endpoints.
# ---------------------------------------------------------------------------
MODULES = ["bubble_sort", "merge_sort", "quick_sort"]
INSTRUCTION = (
"Create THREE Python files in the current working directory, one per "
"sorting algorithm. Use RELATIVE paths — do NOT write to absolute paths "
"like `/bubble_sort.py`. Files (one algorithm each):\n"
" - bubble_sort.py -> bubble sort\n"
" - merge_sort.py -> merge sort\n"
" - quick_sort.py -> quicksort\n\n"
"Each file MUST expose exactly one function with this signature:\n"
" def sort(arr: list[int]) -> list[int]\n\n"
"It must return a NEW list sorted in non-decreasing order (do not mutate "
"the input). Each file must implement the algorithm named for it — do "
"NOT call `sorted()` or `list.sort()`, and do NOT import third-party "
"libraries. Handle edge cases: empty list, single element, duplicates, "
"already-sorted, reverse-sorted, negative numbers. Do not write tests, "
"a main block, README, or any other files."
)
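# For context, a minimal module that would satisfy the verifier defined next
# looks roughly like this (illustrative sketch only; the agent has to write
# its own implementation inside the sandbox):
#
#     # bubble_sort.py
#     def sort(arr: list[int]) -> list[int]:
#         out = list(arr)  # work on a copy so the caller's list is untouched
#         for i in range(len(out)):
#             for j in range(len(out) - 1 - i):
#                 if out[j] > out[j + 1]:
#                     out[j], out[j + 1] = out[j + 1], out[j]
#         return out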
VERIFIER_SOURCE = '''\
"""Verifier for the three-sorts E2E test. Runs inside the sandbox."""
import importlib
import json
import re
import shutil
import sys
import traceback
from pathlib import Path
WORKDIR = Path("/home/user/workdir")
LOG_DIR = Path("/home/user/logs/verifier")
LOG_DIR.mkdir(parents=True, exist_ok=True)
WORKDIR.mkdir(parents=True, exist_ok=True)
sys.path.insert(0, str(WORKDIR))
MODULES = ["bubble_sort", "merge_sort", "quick_sort"]
# Some models (notably Qwen3 served via vLLM) ignore "use relative paths"
# and write files to ``/<name>.py``. With ``--dangerously-skip-permissions``
# opencode allows it, so we relocate any stray files into WORKDIR so the
# import side below is path-uniform.
for name in MODULES:
stray = Path("/") / f"{name}.py"
target = WORKDIR / f"{name}.py"
if stray.exists() and not target.exists():
shutil.move(str(stray), str(target))
# Each test case: (input, expected_sorted_output)
CASES = [
([3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5], [1, 1, 2, 3, 3, 4, 5, 5, 5, 6, 9]),
([], []),
([42], [42]),
([2, 1], [1, 2]),
([10, 9, 8, 7, 6, 5, 4, 3, 2, 1], [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
([1, 2, 3, 4, 5], [1, 2, 3, 4, 5]),
([-3, -1, -2, 0, 5, 4], [-3, -2, -1, 0, 4, 5]),
([7, 7, 7, 7, 7], [7, 7, 7, 7, 7]),
]
# Catch a model that calls ``sorted()`` / ``.sort()`` while pretending to
# "implement" the named algorithm.
SOURCE_FORBIDDEN = re.compile(r"\\b(sorted\\s*\\(|\\.sort\\s*\\()")
results = {}
for name in MODULES:
fpath = WORKDIR / f"{name}.py"
if not fpath.exists():
results[name] = "missing"
continue
try:
src = fpath.read_text()
if SOURCE_FORBIDDEN.search(src):
results[name] = "cheat: uses sorted()/list.sort()"
continue
sys.modules.pop(name, None)
mod = importlib.import_module(name)
# Accept either ``sort`` (per spec) or the algorithm-named function
# (a common drift — e.g. gpt-4o-mini emits ``def bubble_sort(...)``).
fn = getattr(mod, "sort", None) or getattr(mod, name, None)
if fn is None:
results[name] = "no_sort_or_named_function"
continue
all_pass = True
for inp, expected in CASES:
inp_copy = list(inp)
actual = fn(list(inp))
if actual != expected:
all_pass = False
results[name] = (
f"fail: {fn.__name__}({inp!r}) -> {actual!r}, "
f"expected {expected!r}"
)
break
# The callee should not mutate the caller's list.
if list(inp) != inp_copy:
all_pass = False
results[name] = (
f"fail: {fn.__name__} mutated input {inp!r} -> {inp_copy!r}"
)
break
if all_pass:
results[name] = "pass"
except Exception:
tb = traceback.format_exc()
results[name] = f"error: {tb.splitlines()[-1]}"
passed = sum(1 for v in results.values() if v == "pass")
reward = passed / len(MODULES)
(LOG_DIR / "reward.txt").write_text(f"{reward:.4f}")
(LOG_DIR / "results.json").write_text(json.dumps(results, indent=2))
print(f"REWARD={reward:.4f} PASSED={passed}/{len(MODULES)}")
print(f"RESULTS={json.dumps(results)}")
'''
# ---------------------------------------------------------------------------
# Sandbox paths.
# ---------------------------------------------------------------------------
HOME = "/home/user"
WORKDIR = f"{HOME}/workdir"
OPENCODE_BIN = f"{HOME}/.opencode/bin/opencode"
OPENCODE_CONFIG = f"{HOME}/.config/opencode/opencode.json"
INSTRUCTION_PATH = f"{HOME}/task/instruction.md"
VERIFIER_PATH = f"{HOME}/test.py"
AGENT_LOG = f"{HOME}/logs/agent/opencode.jsonl"
PROXY_LOG = f"{HOME}/logs/agent/proxy.log"
PROXY_TRACE = f"{HOME}/logs/agent/proxy_trace.jsonl"
PROXY_SCRIPT_PATH = f"{HOME}/proxy/interception.py"
REWARD_FILE = f"{HOME}/logs/verifier/reward.txt"
RESULTS_FILE = f"{HOME}/logs/verifier/results.json"
PROXY_PORT = 7000
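# Resulting layout under /home/user (see the constants above):
#
#     workdir/                 agent cwd; the three sort modules land here
#     task/instruction.md      the task prompt passed to ``opencode run``
#     proxy/interception.py    logprob-capture proxy source (Mode B only)
#     logs/agent/              opencode.jsonl, proxy.log, proxy_trace.jsonl
#     logs/verifier/           reward.txt, results.json
#     .config/opencode/        opencode.json provider config
#     test.py                  verifier script written from VERIFIER_SOURCE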
# ---------------------------------------------------------------------------
# Result types.
# ---------------------------------------------------------------------------
@dataclass
class LogprobStats:
n_turns: int = 0
productive_turns: int = 0
total_completion_tokens: int = 0
tokens_per_turn: list[int] = field(default_factory=list)
mean_logprob: float | None = None
first_token: str = ""
first_logprob: float | None = None
last_token: str = ""
last_logprob: float | None = None
finish_reasons: dict[str, int] = field(default_factory=dict)
@dataclass
class RunResult:
endpoint: str
model: str
base_url: str
sandbox_id: str = ""
reward: float | None = None
tests: dict[str, str] = field(default_factory=dict)
files: dict[str, str] = field(default_factory=dict)
files_extra: list[str] = field(default_factory=list)
logprobs: LogprobStats = field(default_factory=LogprobStats)
wall_s: float = 0.0
agent_exit_code: int | None = None
error: str = ""
proxy_log_tail: str = ""
agent_log_tail: str = ""
verifier_stdout: str = ""
# Raw per-turn dump (request body + response body, truncated). Saved
# into artifacts so failures can be debugged without re-running.
raw_turns: list[dict[str, Any]] = field(default_factory=list)
@property
def passed(self) -> int:
return sum(1 for v in self.tests.values() if v == "pass")
# ---------------------------------------------------------------------------
# Sandbox helpers — thin wrappers around e2b SDK.
# ---------------------------------------------------------------------------
def _exec(
sbx: Sandbox,
cmd: str,
*,
envs: dict[str, str] | None = None,
cwd: str | None = None,
timeout: float = 60,
) -> tuple[int, str, str]:
"""Synchronous shell exec. Returns (exit_code, stdout, stderr)."""
from e2b.sandbox.commands.command_handle import CommandExitException
try:
r = sbx.commands.run(
cmd, envs=envs, cwd=cwd, timeout=timeout, background=False
)
return r.exit_code, r.stdout or "", r.stderr or ""
except CommandExitException as exc:
return (
int(getattr(exc, "exit_code", 1)),
str(getattr(exc, "stdout", "") or ""),
str(getattr(exc, "stderr", "") or str(exc)),
)
def _exec_bg_with_timeout(
sbx: Sandbox,
cmd: str,
*,
envs: dict[str, str] | None = None,
cwd: str | None = None,
timeout_s: float = 600,
poll_interval_s: float = 1.0,
) -> int:
"""Run ``cmd`` in the background and poll until it writes a marker file.
Returns the command's exit code. Raises ``TimeoutError`` if the marker
does not appear within ``timeout_s``. ``timeout=0`` is passed to E2B so
the server-side 60s deadline does not kill the process.
"""
marker = f"/tmp/cmd_done_{secrets.token_hex(4)}"
wrapped = f"({cmd}); echo $? > {marker}"
sbx.commands.run(
wrapped, envs=envs, cwd=cwd, background=True, timeout=0
)
deadline = time.time() + timeout_s
while time.time() < deadline:
try:
if sbx.files.exists(marker):
code_str = sbx.files.read(marker).strip()
return int(code_str) if code_str else -1
except Exception:
pass
time.sleep(poll_interval_s)
raise TimeoutError(f"command did not finish within {timeout_s}s")
def _safe_read(sbx: Sandbox, path: str) -> str:
try:
return sbx.files.read(path)
except Exception:
return ""
def _write_text(sbx: Sandbox, path: str, content: str) -> None:
parent = str(Path(path).parent)
if parent not in ("", "/"):
sbx.files.make_dir(parent)
sbx.files.write(path, content)
# ---------------------------------------------------------------------------
# Bootstrap: install opencode, write config, optionally start proxy.
# ---------------------------------------------------------------------------
def _wait_for_sandbox_ready(sbx: Sandbox, *, attempts: int = 15) -> None:
for _ in range(attempts):
code, out, _ = _exec(sbx, "echo ok", timeout=5)
if code == 0 and "ok" in out:
return
time.sleep(1)
raise RuntimeError("sandbox did not become ready within ~15s")
def _install_opencode(sbx: Sandbox) -> None:
cmd = (
"set -e && "
f"mkdir -p {HOME}/.config/opencode {HOME}/logs/agent "
f"{HOME}/logs/verifier {HOME}/task {WORKDIR} {HOME}/proxy && "
"curl -fsSL https://opencode.ai/install | bash && "
f'export PATH="{HOME}/.opencode/bin:$PATH" && '
"opencode --version"
)
last_stderr = ""
for attempt in range(3):
code, _, err = _exec(sbx, cmd, timeout=240)
if code == 0:
return
last_stderr = err
time.sleep(3 * (attempt + 1))
raise RuntimeError(f"opencode install failed: {last_stderr[-1000:]}")
def _ensure_dirs_exist(sbx: Sandbox) -> None:
"""When using a pre-baked template, dirs already exist. This is a no-op
safety net that ensures the layout is present (cheap mkdir -p)."""
_exec(
sbx,
f"mkdir -p {HOME}/.config/opencode {HOME}/logs/agent "
f"{HOME}/logs/verifier {HOME}/task {WORKDIR} {HOME}/proxy",
timeout=30,
)
def _start_proxy(
sbx: Sandbox,
upstream_url: str,
upstream_api_key: str,
upstream_model: str,
*,
top_logprobs: int,
max_tokens_cap: int,
disable_thinking: bool,
skip_install: bool = False,
) -> str:
"""Upload + start the logprob-capture proxy, return its baseURL.
Returns the URL opencode should hit (``http://127.0.0.1:7000/v1``).
When ``skip_install`` is True (pre-baked template), the proxy source
and pip deps are assumed to already be present.
"""
if not skip_install:
if not _PROXY_SOURCE_PATH.exists():
raise RuntimeError(
f"proxy source not found at {_PROXY_SOURCE_PATH} — needed "
"for transparent_proxy mode"
)
_write_text(sbx, PROXY_SCRIPT_PATH, _PROXY_SOURCE_PATH.read_text())
code, _, err = _exec(
sbx,
"pip install --quiet 'fastapi>=0.104' 'uvicorn[standard]>=0.24' "
"'httpx>=0.27' 2>&1 | tail -20",
timeout=180,
)
if code != 0:
raise RuntimeError(f"proxy deps install failed: {err[-800:]}")
flags = (
f"--upstream-url {upstream_url} "
f"--upstream-api-key {upstream_api_key} "
f"--trace {PROXY_TRACE} "
f"--port {PROXY_PORT} "
f"--top-logprobs {top_logprobs} "
f"--max-tokens-cap {max_tokens_cap} "
f"--model-override '{upstream_model}' "
)
if disable_thinking:
flags += "--disable-thinking "
cmd = (
f"cd {HOME}/proxy && "
f"python interception.py {flags}> {PROXY_LOG} 2>&1"
)
sbx.commands.run(cmd, background=True, timeout=0)
# Wait for healthz.
for _ in range(120):
code, _, _ = _exec(
sbx, f"curl -sf http://127.0.0.1:{PROXY_PORT}/healthz", timeout=5
)
if code == 0:
return f"http://127.0.0.1:{PROXY_PORT}/v1"
time.sleep(0.5)
log = _safe_read(sbx, PROXY_LOG)
raise RuntimeError(f"proxy did not start within 60s. log:\n{log[-2000:]}")
def _write_opencode_json(
sbx: Sandbox,
base_url: str,
api_key: str,
model: str,
request_timeout_ms: int = 600_000,
) -> None:
"""Stage opencode.json for ``@ai-sdk/openai-compatible``.
All three endpoints route through the OAI-compatible adapter — the proxy
serves ``/v1/chat/completions`` and so does each upstream we target.
"""
inner_model = model.split("/", 1)[-1]
doc = {
"$schema": "https://opencode.ai/config.json",
"model": f"intercepted/{inner_model}",
"provider": {
"intercepted": {
"npm": "@ai-sdk/openai-compatible",
"name": "Intercepted",
"options": {
"baseURL": base_url,
"apiKey": api_key,
"timeout": request_timeout_ms,
},
"models": {inner_model: {"name": "Intercepted Model"}},
}
},
"tools": {"webfetch": False, "question": False},
}
_write_text(sbx, OPENCODE_CONFIG, json.dumps(doc, indent=2))
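# As a concrete (hypothetical) example, calling this with
# base_url="http://127.0.0.1:7000/v1", api_key="intercepted" and
# model="Qwen/Qwen3-4B" stages roughly the following (``$schema``, ``name``
# and ``tools`` keys omitted for brevity):
#
#     {
#       "model": "intercepted/Qwen3-4B",
#       "provider": {
#         "intercepted": {
#           "npm": "@ai-sdk/openai-compatible",
#           "options": {"baseURL": "http://127.0.0.1:7000/v1",
#                       "apiKey": "intercepted", "timeout": 600000},
#           "models": {"Qwen3-4B": {"name": "Intercepted Model"}}
#         }
#       }
#     }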
# ---------------------------------------------------------------------------
# Run + verify + collect.
# ---------------------------------------------------------------------------
def _run_agent(
sbx: Sandbox,
*,
instruction_path: str,
base_url: str,
api_key: str,
timeout_s: float,
) -> int:
"""Invoke ``opencode run`` synchronously, return its exit code."""
envs = {
"OPENAI_BASE_URL": base_url,
"OPENAI_API_KEY": api_key,
"OPENCODE_CONFIG": OPENCODE_CONFIG,
"PATH": f"{HOME}/.opencode/bin:/usr/local/bin:/usr/bin:/bin",
}
cmd = (
f'export PATH="{HOME}/.opencode/bin:$PATH" && '
f"cd {WORKDIR} && "
f"opencode run --format json --dangerously-skip-permissions "
f'"$(cat {instruction_path})" 2>&1 | tee {AGENT_LOG}'
)
return _exec_bg_with_timeout(
sbx, cmd, envs=envs, timeout_s=timeout_s
)
def _run_verifier(sbx: Sandbox) -> tuple[float | None, dict[str, str], str]:
cmd = f"mkdir -p {HOME}/logs/verifier && python {VERIFIER_PATH}"
code, out, err = _exec(sbx, cmd, timeout=120)
reward_str = _safe_read(sbx, REWARD_FILE).strip()
results_str = _safe_read(sbx, RESULTS_FILE)
try:
reward = float(reward_str) if reward_str else None
except ValueError:
reward = None
try:
tests = json.loads(results_str) if results_str.strip() else {}
except json.JSONDecodeError:
tests = {}
combined = (out + ("\n" + err if err else "")).strip()
return reward, tests, combined[-3000:]
def _collect_files(sbx: Sandbox) -> tuple[dict[str, str], list[str]]:
files: dict[str, str] = {}
for name in MODULES:
path = f"{WORKDIR}/{name}.py"
try:
if sbx.files.exists(path):
files[f"{name}.py"] = sbx.files.read(path)[:8000]
except Exception:
pass
code, out, _ = _exec(
sbx,
f"find {WORKDIR} -maxdepth 1 -type f -printf '%f\\n' 2>/dev/null",
timeout=10,
)
extras: list[str] = []
expected = {f"{m}.py" for m in MODULES}
for line in (out or "").splitlines():
n = line.strip()
if n and n not in expected and not n.startswith("."):
extras.append(n)
return files, extras
def _read_proxy_trace(sbx: Sandbox) -> list[dict[str, Any]]:
raw = _safe_read(sbx, PROXY_TRACE)
out: list[dict[str, Any]] = []
for line in raw.splitlines():
line = line.strip()
if not line:
continue
try:
out.append(json.loads(line))
except Exception:
pass
return out
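# Each JSONL trace line emitted by interception.py is assumed to carry at
# least the fields consumed below, roughly:
#
#     {"turn": 2, "finish_reason": "tool_calls", "latency_s": 1.9,
#      "completion_tokens": ["{\"", "path", "..."],
#      "per_token_logps": [-0.03, -0.42],
#      "request": {"messages": [...], "tools": [...], "max_tokens": 4096},
#      "response": {"choices": [...], "upstream_status": 200}}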
def _logprob_stats(turns: list[dict[str, Any]]) -> LogprobStats:
s = LogprobStats(n_turns=len(turns))
if not turns:
return s
all_lps: list[float] = []
finish: dict[str, int] = {}
for t in turns:
toks = t.get("completion_tokens") or []
lps = t.get("per_token_logps") or []
s.tokens_per_turn.append(len(toks))
s.total_completion_tokens += len(toks)
if toks:
s.productive_turns += 1
all_lps.extend(float(x) for x in lps if x is not None)
fr = t.get("finish_reason") or "unknown"
finish[fr] = finish.get(fr, 0) + 1
s.finish_reasons = finish
if all_lps:
s.mean_logprob = mean(all_lps)
first = next((t for t in turns if t.get("completion_tokens")), None)
last = next(
(t for t in reversed(turns) if t.get("completion_tokens")), None
)
if first:
s.first_token = str(first["completion_tokens"][0])
lp = (first.get("per_token_logps") or [None])[0]
if lp is not None:
s.first_logprob = float(lp)
if last:
s.last_token = str(last["completion_tokens"][-1])
lp = (last.get("per_token_logps") or [None])[-1]
if lp is not None:
s.last_logprob = float(lp)
return s
# ---------------------------------------------------------------------------
# One full rollout.
# ---------------------------------------------------------------------------
def run_one(
ep: Endpoint,
*,
mode: str,
agent_timeout_s: float,
max_tokens_cap: int,
top_logprobs: int,
disable_thinking: bool,
instruction: str,
e2b_api_key: str,
template: str | None = None,
) -> RunResult:
print(
f"[{ep.label}] launching base_url={ep.base_url} model={ep.model} "
f"mode={mode} template={template or '(default)'}",
flush=True,
)
res = RunResult(endpoint=ep.label, model=ep.model, base_url=ep.base_url)
started = time.time()
sbx = Sandbox.create(
template=template,
timeout=int(agent_timeout_s) + 300,
api_key=e2b_api_key,
)
res.sandbox_id = sbx.sandbox_id
print(f"[{ep.label}] sandbox={sbx.sandbox_id}", flush=True)
try:
_wait_for_sandbox_ready(sbx)
if template:
_ensure_dirs_exist(sbx)
else:
_install_opencode(sbx)
_write_text(sbx, INSTRUCTION_PATH, instruction)
_write_text(sbx, VERIFIER_PATH, VERIFIER_SOURCE)
if mode == "transparent_proxy":
base_url = _start_proxy(
sbx,
upstream_url=ep.base_url,
upstream_api_key=ep.api_key,
upstream_model=ep.model,
top_logprobs=top_logprobs,
max_tokens_cap=max_tokens_cap,
disable_thinking=disable_thinking,
skip_install=bool(template),
)
else:
base_url = ep.base_url
_write_opencode_json(
sbx,
base_url=base_url,
api_key=ep.api_key if mode == "black_box" else "intercepted",
model=ep.model,
)
try:
res.agent_exit_code = _run_agent(
sbx,
instruction_path=INSTRUCTION_PATH,
base_url=base_url,
api_key=ep.api_key if mode == "black_box" else "intercepted",
timeout_s=agent_timeout_s,
)
print(
f"[{ep.label}] agent exit_code={res.agent_exit_code}",
flush=True,
)
except TimeoutError as exc:
res.error = f"agent timeout: {exc}"
print(f"[{ep.label}] {res.error}", flush=True)
reward, tests, vstdout = _run_verifier(sbx)
res.reward, res.tests, res.verifier_stdout = reward, tests, vstdout
res.files, res.files_extra = _collect_files(sbx)
turns = _read_proxy_trace(sbx)
res.logprobs = _logprob_stats(turns)
# Capture truncated request/response per turn for debugging. Strip
# large/noisy fields (full token logprobs, raw bytes) to keep the
# artifact readable.
for t in turns:
req = t.get("request") or {}
resp = t.get("response") or {}
res.raw_turns.append(
{
"turn": t.get("turn"),
"finish_reason": t.get("finish_reason"),
"latency_s": t.get("latency_s"),
"request_messages": req.get("messages", [])[-6:],
"request_tools": [
(tool.get("function") or {}).get("name", "?")
for tool in (req.get("tools") or [])
],
"request_temperature": req.get("temperature"),
"request_max_tokens": req.get("max_tokens")
or req.get("max_completion_tokens"),
"response_choices": [
{
"finish_reason": ch.get("finish_reason"),
"message_content": (ch.get("message") or {}).get(
"content"
),
"tool_calls": [
{
"name": (tc.get("function") or {}).get(
"name"
),
"arguments": str(
(tc.get("function") or {}).get(
"arguments", ""
)
)[:500],
}
for tc in (
(ch.get("message") or {}).get("tool_calls")
or []
)
],
}
for ch in (resp.get("choices") or [])
],
"upstream_status": resp.get("upstream_status"),
"upstream_error": resp.get("upstream_error"),
}
)
res.proxy_log_tail = _safe_read(sbx, PROXY_LOG)[-2000:]
res.agent_log_tail = _safe_read(sbx, AGENT_LOG)[-4000:]
except Exception as exc: # noqa: BLE001
res.error = f"{type(exc).__name__}: {exc}"
print(f"[{ep.label}] ERROR {res.error}", flush=True)
finally:
try:
sbx.kill()
except Exception:
pass
res.wall_s = time.time() - started
return res
# ---------------------------------------------------------------------------
# Reporting.
# ---------------------------------------------------------------------------
def _format_summary(results: list[RunResult]) -> str:
lines: list[str] = []
sep = "-" * 110
lines.append(sep)
lines.append(
f"{'endpoint':<10} {'model':<42} {'reward':<8} {'pass':<6} "
f"{'turns':<6} {'tokens':<8} {'mean-logp':<11} {'wall':<8}"
)
lines.append(sep)
for r in results:
reward = f"{r.reward:.2f}" if r.reward is not None else "-"
pass_str = f"{r.passed}/{len(MODULES)}"
mean_lp = (
f"{r.logprobs.mean_logprob:+.3f}"
if r.logprobs.mean_logprob is not None
else "-"
)
lines.append(
f"{r.endpoint:<10} {r.model[:42]:<42} {reward:<8} {pass_str:<6} "
f"{r.logprobs.n_turns:<6} {r.logprobs.total_completion_tokens:<8} "
f"{mean_lp:<11} {r.wall_s:<7.1f}s"
)
lines.append(sep)
lines.append("")
lines.append("per-file results:")
for r in results:
per_file = " ".join(
f"{m}={r.tests.get(m, '?')}" for m in MODULES
)
lines.append(f" {r.endpoint:<10} {per_file}")
if r.files_extra:
lines.append(
f" {' ':<10} extras: {', '.join(sorted(r.files_extra))}"
)
if r.error:
lines.append(f" {' ':<10} ERROR: {r.error[:200]}")
return "\n".join(lines)
def _save_artifact(r: RunResult, out_dir: Path) -> Path:
out_dir.mkdir(parents=True, exist_ok=True)
ts = int(time.time())
target = out_dir / f"sorting_{r.endpoint}_{ts}.json"
target.write_text(json.dumps(asdict(r), indent=2, default=str))
return target
# ---------------------------------------------------------------------------
# CLI.
# ---------------------------------------------------------------------------
def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
p = argparse.ArgumentParser(
prog="test_five_sorts_e2e",
description=(
"Run opencode end-to-end against vLLM / OpenAI / HF Router, "
"write 5 sorting algorithms in 5 files, verify them, return "
"logprobs + tests + filesystem."
),
formatter_class=argparse.RawDescriptionHelpFormatter,
)
p.add_argument(
"--endpoint",
choices=["vllm", "openai", "hf_router", "all"],
default="all",
help="Which endpoint to test (default: all configured endpoints).",
)
p.add_argument(
"--mode",
choices=["transparent_proxy", "black_box"],
default="transparent_proxy",
help=(
"transparent_proxy captures per-token logprobs; black_box skips "
"the proxy. Default: transparent_proxy."
),
)
p.add_argument(
"--agent-timeout",
type=float,
default=600.0,
help="Seconds to wait for opencode to finish (default 600).",
)
p.add_argument(
"--max-tokens-cap",
type=int,
default=4096,
help="Per-turn max_tokens clamp on forwarded requests (default 4096).",
)
p.add_argument(
"--top-logprobs",
type=int,
default=5,
help="Top-k logprobs requested from the upstream (HF Router cap is 5).",
)
p.add_argument(
"--disable-thinking",
choices=["auto", "on", "off"],
default="auto",
help=(
"Inject ``chat_template_kwargs.enable_thinking=false`` on "
"forwarded requests. ``auto`` = on for vllm, off for openai / "
"hf_router (default). ``on`` / ``off`` forces it for every "
"endpoint."
),
)
p.add_argument(
"--save-artifacts",
action="store_true",
help="Dump per-run JSON to envs/opencode_env/tests/_artifacts/.",
)
p.add_argument(
"--instruction-override",
default=None,
help="Replace the default 5-sorts instruction.",
)
p.add_argument(
"--no-summary-files",
action="store_true",
help="Skip printing file contents in the summary.",
)
p.add_argument(
"--template",
default=None,
help=(
"E2B template name to use (e.g. ``opencode-rl`` after running "
"build_e2b_template.py). When set, skips opencode install + "
"pip-deps install (already in the template) — saves ~2 min "
"per rollout."
),
)
return p.parse_args(argv)
def main(argv: list[str] | None = None) -> int:
args = _parse_args(argv)
e2b_api_key = os.environ.get("E2B_API_KEY")
if not e2b_api_key:
print(
"ERROR: E2B_API_KEY is required (set it in .env or your shell).",
file=sys.stderr,
)
return 2
print(f"Loading env from {_DOTENV_PATH}")
endpoints, skipped = _resolve_endpoints()
if args.endpoint != "all":
endpoints = [e for e in endpoints if e.label == args.endpoint]
instruction = args.instruction_override or INSTRUCTION
runs: list[RunResult] = []
for ep in endpoints:
if args.disable_thinking == "on":
disable_thinking = True
elif args.disable_thinking == "off":
disable_thinking = False
else:
disable_thinking = ep.disable_thinking_default
runs.append(
run_one(
ep,
mode=args.mode,
agent_timeout_s=args.agent_timeout,
max_tokens_cap=args.max_tokens_cap,
top_logprobs=args.top_logprobs,
disable_thinking=disable_thinking,
instruction=instruction,
e2b_api_key=e2b_api_key,
template=args.template,
)
)
print()
print(_format_summary(runs))
if skipped:
print("\nSkipped (not configured):")
for s in skipped:
print(f" - {s}")
if not args.no_summary_files:
for r in runs:
print(f"\n=== files written by {r.endpoint} ({r.model}) ===")
for fname, src in r.files.items():
head = "\n".join(src.splitlines()[:20])
print(f"--- {fname} (first 20 lines) ---")
print(head)
if src.count("\n") > 20:
print(f"... ({src.count(chr(10)) - 20} more lines)")
if args.save_artifacts:
out_dir = _ENV_DIR / "tests" / "_artifacts"
for r in runs:
print(f"saved {_save_artifact(r, out_dir)}")
if not runs:
print("\nNo endpoints ran. Fill in .env and re-run.")
return 2
failed = [r for r in runs if r.reward is None or r.reward < 1.0 or r.error]
if failed:
print(f"\n{len(failed)}/{len(runs)} endpoint(s) did not reach reward=1.0.")
return 1
print(f"\nAll {len(runs)} endpoint(s) reached reward=1.0.")
return 0
if __name__ == "__main__":
sys.exit(main())