# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
| """End-to-end: spawn E2B, install opencode, write 5 sorting algorithms, verify. | |
| Talks to E2B and the LLM endpoints directly via the e2b SDK and httpx — no | |
| imports from the ``opencode_env`` package at runtime. The proxy that captures | |
| per-token logprobs (Mode B) is uploaded into the sandbox as a standalone | |
| source file from ``../interception.py``. | |
| For each endpoint configured in ``envs/opencode_env/.env`` (vLLM / OpenAI / | |
| HF Router) this script: | |
| 1. Creates a fresh E2B sandbox | |
| 2. Installs opencode (``curl https://opencode.ai/install | bash``) | |
| 3. (Mode B only) uploads + starts the in-sandbox logprob-capture proxy | |
| 4. Writes ``opencode.json`` pointing at the proxy (or the LLM directly) | |
| 5. Runs ``opencode run --format json "<instruction>"`` to completion | |
| 6. Runs an in-sandbox verifier that imports each sort module and tests it | |
| 7. Reads back: per-file pass/fail, file contents, proxy logprob stats, | |
| wall time, sandbox id | |
| Default usage (runs every endpoint that has the required vars in .env):: | |
| .venv/bin/python envs/opencode_env/tests/test_five_sorts_e2e.py | |
| Common flags:: | |
| --endpoint vllm|openai|hf_router|all (default: all) | |
| --mode transparent_proxy|black_box (default: transparent_proxy) | |
| --agent-timeout 600 (seconds before opencode is killed) | |
| --max-tokens-cap 4096 (per-turn max_tokens clamp) | |
| --save-artifacts (dump JSON per run to tests/_artifacts/) | |
| --instruction-override "..." (custom instruction) | |
| Requires ``E2B_API_KEY`` in the environment plus per-endpoint creds in .env. | |
| Each rollout takes 1–7 minutes of wall plus ~10s sandbox cold start. | |
| """ | |
from __future__ import annotations

import argparse
import json
import os
import secrets
import sys
import time
from dataclasses import asdict, dataclass, field
from pathlib import Path
from statistics import mean
from typing import Any

from e2b import Sandbox


# ---------------------------------------------------------------------------
# .env loader — minimal, no python-dotenv dep.
# ---------------------------------------------------------------------------
_THIS_DIR = Path(__file__).resolve().parent
_ENV_DIR = _THIS_DIR.parent
_DOTENV_PATH = _ENV_DIR / ".env"
_PROXY_SOURCE_PATH = _ENV_DIR / "sandbox" / "interception.py"


def _load_env(path: Path) -> None:
    if not path.exists():
        return
    for raw in path.read_text().splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        k, _, v = line.partition("=")
        k = k.strip()
        v = v.strip().strip('"').strip("'")
        # Existing process env wins over .env values.
        if k and k not in os.environ:
            os.environ[k] = v


_load_env(_DOTENV_PATH)
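

# A minimal .env accepted by the loader above (keys consumed by
# _resolve_endpoints below; values are placeholders). Lines starting with
# "#" are skipped, and existing process env always wins:
#   E2B_API_KEY=e2b_...
#   VLLM_URL=http://my-vllm-host:8000
#   VLLM_MODEL=Qwen/Qwen3-4B
#   VLLM_API_KEY=intercepted
#   OPENAI_API_KEY=sk-...
#   HF_ROUTER_API_KEY=hf_...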


# ---------------------------------------------------------------------------
# Endpoint specs — three flavors, all OAI-compatible.
# ---------------------------------------------------------------------------
@dataclass
class Endpoint:
    label: str
    base_url: str
    model: str
    api_key: str
    # Inject ``chat_template_kwargs.enable_thinking=false`` on forwarded
    # requests. Needed for Qwen3 served via vLLM (otherwise the model
    # spends its budget on reasoning). OpenAI rejects this field with HTTP
    # 400 ("Unrecognized request argument"); HF Router's Instruct variant
    # doesn't need it. Default per-endpoint, overridable via CLI.
    disable_thinking_default: bool = False
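

# When ``disable_thinking`` is on, the proxy is expected to merge something
# like the following into each forwarded chat-completions body (shape
# inferred from the ``--disable-thinking`` flag passed to interception.py):
#   body["chat_template_kwargs"] = {"enable_thinking": False}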


def _resolve_endpoints() -> tuple[list[Endpoint], list[str]]:
    """Return (configured, skipped_reasons) from current process env."""
    specs = [
        # (label, base_url_env, default_base_url, model_env, default_model,
        #  api_key_env, default_api_key)
        (
            "vllm",
            "VLLM_URL",
            "",
            "VLLM_MODEL",
            "Qwen/Qwen3-4B",
            "VLLM_API_KEY",
            "intercepted",
        ),
        (
            "openai",
            "OPENAI_BASE_URL",
            "https://api.openai.com/v1",
            "OPENAI_MODEL",
            "gpt-4o-mini",
            "OPENAI_API_KEY",
            "",
        ),
        (
            "hf_router",
            "HF_ROUTER_BASE_URL",
            "https://router.huggingface.co/v1",
            "HF_ROUTER_MODEL",
            "Qwen/Qwen3-4B-Instruct-2507:nscale",
            "HF_ROUTER_API_KEY",
            "",
        ),
    ]
    chosen: list[Endpoint] = []
    skipped: list[str] = []
    for label, bu_env, bu_default, mdl_env, mdl_default, ak_env, ak_default in specs:
        base = os.environ.get(bu_env) or bu_default
        model = os.environ.get(mdl_env) or mdl_default
        api_key = os.environ.get(ak_env) or ak_default
        if not (base and model and api_key):
            skipped.append(
                f"{label} (need {bu_env} / {mdl_env} / {ak_env} in .env)"
            )
            continue
        # Always normalize to a /v1 base URL — opencode + the proxy expect it.
        base = base.rstrip("/")
        if not base.endswith("/v1"):
            base = f"{base}/v1"
        chosen.append(
            Endpoint(
                label=label,
                base_url=base,
                model=model,
                api_key=api_key,
                disable_thinking_default=(label == "vllm"),
            )
        )
    return chosen, skipped
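

# Example: with only OPENAI_API_KEY set, _resolve_endpoints() returns,
# given the defaults above:
#   chosen  = [Endpoint(label="openai",
#                       base_url="https://api.openai.com/v1",
#                       model="gpt-4o-mini", api_key="sk-...")]
#   skipped = ["vllm (need VLLM_URL / VLLM_MODEL / VLLM_API_KEY in .env)",
#              "hf_router (need HF_ROUTER_BASE_URL / HF_ROUTER_MODEL / "
#              "HF_ROUTER_API_KEY in .env)"]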


# ---------------------------------------------------------------------------
# The locked task: instruction + verifier source. Identical for all endpoints.
# ---------------------------------------------------------------------------
MODULES = ["bubble_sort", "merge_sort", "quick_sort"]

INSTRUCTION = (
    "Create THREE Python files in the current working directory, one per "
    "sorting algorithm. Use RELATIVE paths — do NOT write to absolute paths "
    "like `/bubble_sort.py`. Files (one algorithm each):\n"
    " - bubble_sort.py -> bubble sort\n"
    " - merge_sort.py -> merge sort\n"
    " - quick_sort.py -> quicksort\n\n"
    "Each file MUST expose exactly one function with this signature:\n"
    " def sort(arr: list[int]) -> list[int]\n\n"
    "It must return a NEW list sorted in non-decreasing order (do not mutate "
    "the input). Each file must implement the algorithm named for it — do "
    "NOT call `sorted()` or `list.sort()`, and do NOT import third-party "
    "libraries. Handle edge cases: empty list, single element, duplicates, "
    "already-sorted, reverse-sorted, negative numbers. Do not write tests, "
    "a main block, README, or any other files."
)

VERIFIER_SOURCE = '''\
"""Verifier for the three-sorts E2E test. Runs inside the sandbox."""
import importlib
import json
import re
import shutil
import sys
import traceback
from pathlib import Path

WORKDIR = Path("/home/user/workdir")
LOG_DIR = Path("/home/user/logs/verifier")
LOG_DIR.mkdir(parents=True, exist_ok=True)
WORKDIR.mkdir(parents=True, exist_ok=True)
sys.path.insert(0, str(WORKDIR))

MODULES = ["bubble_sort", "merge_sort", "quick_sort"]

# Some models (notably Qwen3 served via vLLM) ignore "use relative paths"
# and write files to ``/<name>.py``. With ``--dangerously-skip-permissions``
# opencode allows it, so we relocate any stray files into WORKDIR so the
# import side below is path-uniform.
for name in MODULES:
    stray = Path("/") / f"{name}.py"
    target = WORKDIR / f"{name}.py"
    if stray.exists() and not target.exists():
        shutil.move(str(stray), str(target))

# Each test case: (input, expected_sorted_output)
CASES = [
    ([3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5], [1, 1, 2, 3, 3, 4, 5, 5, 5, 6, 9]),
    ([], []),
    ([42], [42]),
    ([2, 1], [1, 2]),
    ([10, 9, 8, 7, 6, 5, 4, 3, 2, 1], [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
    ([1, 2, 3, 4, 5], [1, 2, 3, 4, 5]),
    ([-3, -1, -2, 0, 5, 4], [-3, -2, -1, 0, 4, 5]),
    ([7, 7, 7, 7, 7], [7, 7, 7, 7, 7]),
]

# Catch a model that calls ``sorted()`` / ``.sort()`` while pretending to
# "implement" the named algorithm.
SOURCE_FORBIDDEN = re.compile(r"\\b(sorted\\s*\\(|\\.sort\\s*\\()")

results = {}
for name in MODULES:
    fpath = WORKDIR / f"{name}.py"
    if not fpath.exists():
        results[name] = "missing"
        continue
    try:
        src = fpath.read_text()
        if SOURCE_FORBIDDEN.search(src):
            results[name] = "cheat: uses sorted()/list.sort()"
            continue
        sys.modules.pop(name, None)
        mod = importlib.import_module(name)
        # Accept either ``sort`` (per spec) or the algorithm-named function
        # (a common drift — e.g. gpt-4o-mini emits ``def bubble_sort(...)``).
        fn = getattr(mod, "sort", None) or getattr(mod, name, None)
        if fn is None:
            results[name] = "no_sort_or_named_function"
            continue
        all_pass = True
        for inp, expected in CASES:
            # Hand fn its own copy so mutation is detectable against inp.
            inp_copy = list(inp)
            actual = fn(inp_copy)
            if actual != expected:
                all_pass = False
                results[name] = (
                    f"fail: {fn.__name__}({inp!r}) -> {actual!r}, "
                    f"expected {expected!r}"
                )
                break
            # The callee should not mutate the caller's list.
            if inp_copy != list(inp):
                all_pass = False
                results[name] = (
                    f"fail: {fn.__name__} mutated input {inp!r} -> {inp_copy!r}"
                )
                break
        if all_pass:
            results[name] = "pass"
    except Exception:
        tb = traceback.format_exc()
        results[name] = f"error: {tb.splitlines()[-1]}"

passed = sum(1 for v in results.values() if v == "pass")
reward = passed / len(MODULES)
(LOG_DIR / "reward.txt").write_text(f"{reward:.4f}")
(LOG_DIR / "results.json").write_text(json.dumps(results, indent=2))
print(f"REWARD={reward:.4f} PASSED={passed}/{len(MODULES)}")
print(f"RESULTS={json.dumps(results)}")
'''
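

# A fully passing run leaves these artifacts behind in the sandbox
# (contents illustrative), which _run_verifier() reads back below:
#   /home/user/logs/verifier/reward.txt    -> "1.0000"
#   /home/user/logs/verifier/results.json  -> {"bubble_sort": "pass",
#                                              "merge_sort": "pass",
#                                              "quick_sort": "pass"}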


# ---------------------------------------------------------------------------
# Sandbox paths.
# ---------------------------------------------------------------------------
HOME = "/home/user"
WORKDIR = f"{HOME}/workdir"
OPENCODE_BIN = f"{HOME}/.opencode/bin/opencode"
OPENCODE_CONFIG = f"{HOME}/.config/opencode/opencode.json"
INSTRUCTION_PATH = f"{HOME}/task/instruction.md"
VERIFIER_PATH = f"{HOME}/test.py"
AGENT_LOG = f"{HOME}/logs/agent/opencode.jsonl"
PROXY_LOG = f"{HOME}/logs/agent/proxy.log"
PROXY_TRACE = f"{HOME}/logs/agent/proxy_trace.jsonl"
PROXY_SCRIPT_PATH = f"{HOME}/proxy/interception.py"
REWARD_FILE = f"{HOME}/logs/verifier/reward.txt"
RESULTS_FILE = f"{HOME}/logs/verifier/results.json"
PROXY_PORT = 7000
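

# Resulting layout inside the sandbox (derived from the constants above):
#   /home/user/
#     workdir/                        agent cwd; the three sort files land here
#     task/instruction.md             the locked instruction
#     test.py                         the verifier source
#     proxy/interception.py           logprob-capture proxy (Mode B only)
#     .config/opencode/opencode.json  provider config staged per run
#     logs/agent/                     opencode.jsonl, proxy.log, proxy_trace.jsonl
#     logs/verifier/                  reward.txt, results.json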


# ---------------------------------------------------------------------------
# Result types.
# ---------------------------------------------------------------------------
@dataclass
class LogprobStats:
    n_turns: int = 0
    productive_turns: int = 0
    total_completion_tokens: int = 0
    tokens_per_turn: list[int] = field(default_factory=list)
    mean_logprob: float | None = None
    first_token: str = ""
    first_logprob: float | None = None
    last_token: str = ""
    last_logprob: float | None = None
    finish_reasons: dict[str, int] = field(default_factory=dict)


@dataclass
class RunResult:
    endpoint: str
    model: str
    base_url: str
    sandbox_id: str = ""
    reward: float | None = None
    tests: dict[str, str] = field(default_factory=dict)
    files: dict[str, str] = field(default_factory=dict)
    files_extra: list[str] = field(default_factory=list)
    logprobs: LogprobStats = field(default_factory=LogprobStats)
    wall_s: float = 0.0
    agent_exit_code: int | None = None
    error: str = ""
    proxy_log_tail: str = ""
    agent_log_tail: str = ""
    verifier_stdout: str = ""
    # Raw per-turn dump (request body + response body, truncated). Saved
    # into artifacts so failures can be debugged without re-running.
    raw_turns: list[dict[str, Any]] = field(default_factory=list)

    @property
    def passed(self) -> int:
        return sum(1 for v in self.tests.values() if v == "pass")


# ---------------------------------------------------------------------------
# Sandbox helpers — thin wrappers around e2b SDK.
# ---------------------------------------------------------------------------
def _exec(
    sbx: Sandbox,
    cmd: str,
    *,
    envs: dict[str, str] | None = None,
    cwd: str | None = None,
    timeout: float = 60,
) -> tuple[int, str, str]:
    """Synchronous shell exec. Returns (exit_code, stdout, stderr)."""
    from e2b.sandbox.commands.command_handle import CommandExitException

    try:
        r = sbx.commands.run(
            cmd, envs=envs, cwd=cwd, timeout=timeout, background=False
        )
        return r.exit_code, r.stdout or "", r.stderr or ""
    except CommandExitException as exc:
        return (
            int(getattr(exc, "exit_code", 1)),
            str(getattr(exc, "stdout", "") or ""),
            str(getattr(exc, "stderr", "") or str(exc)),
        )


def _exec_bg_with_timeout(
    sbx: Sandbox,
    cmd: str,
    *,
    envs: dict[str, str] | None = None,
    cwd: str | None = None,
    timeout_s: float = 600,
    poll_interval_s: float = 1.0,
) -> int:
    """Run ``cmd`` in the background and poll until it writes a marker file.

    Returns the command's exit code. Raises ``TimeoutError`` if the marker
    does not appear within ``timeout_s``. ``timeout=0`` is passed to E2B so
    the server-side 60s deadline does not kill the process.
    """
    marker = f"/tmp/cmd_done_{secrets.token_hex(4)}"
    wrapped = f"({cmd}); echo $? > {marker}"
    sbx.commands.run(
        wrapped, envs=envs, cwd=cwd, background=True, timeout=0
    )
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        try:
            if sbx.files.exists(marker):
                code_str = sbx.files.read(marker).strip()
                return int(code_str) if code_str else -1
        except Exception:
            pass
        time.sleep(poll_interval_s)
    raise TimeoutError(f"command did not finish within {timeout_s}s")
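

# Illustration of the marker pattern (token value hypothetical): the agent
# invocation in _run_agent() gets wrapped as
#   (opencode run ... | tee ...); echo $? > /tmp/cmd_done_1a2b3c4d
# so the exit code survives in a file we can poll, rather than relying on
# E2B's server-side command timeout.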


def _safe_read(sbx: Sandbox, path: str) -> str:
    try:
        return sbx.files.read(path)
    except Exception:
        return ""


def _write_text(sbx: Sandbox, path: str, content: str) -> None:
    parent = str(Path(path).parent)
    if parent not in ("", "/"):
        sbx.files.make_dir(parent)
    sbx.files.write(path, content)


# ---------------------------------------------------------------------------
# Bootstrap: install opencode, write config, optionally start proxy.
# ---------------------------------------------------------------------------
def _wait_for_sandbox_ready(sbx: Sandbox, *, attempts: int = 15) -> None:
    for _ in range(attempts):
        code, out, _ = _exec(sbx, "echo ok", timeout=5)
        if code == 0 and "ok" in out:
            return
        time.sleep(1)
    raise RuntimeError("sandbox did not become ready within ~15s")


def _install_opencode(sbx: Sandbox) -> None:
    cmd = (
        "set -e && "
        f"mkdir -p {HOME}/.config/opencode {HOME}/logs/agent "
        f"{HOME}/logs/verifier {HOME}/task {WORKDIR} {HOME}/proxy && "
        "curl -fsSL https://opencode.ai/install | bash && "
        f'export PATH="{HOME}/.opencode/bin:$PATH" && '
        "opencode --version"
    )
    last_stderr = ""
    for attempt in range(3):
        code, _, err = _exec(sbx, cmd, timeout=240)
        if code == 0:
            return
        last_stderr = err
        time.sleep(3 * (attempt + 1))
    raise RuntimeError(f"opencode install failed: {last_stderr[-1000:]}")


def _ensure_dirs_exist(sbx: Sandbox) -> None:
    """When using a pre-baked template, dirs already exist. This is a no-op
    safety net that ensures the layout is present (cheap mkdir -p)."""
    _exec(
        sbx,
        f"mkdir -p {HOME}/.config/opencode {HOME}/logs/agent "
        f"{HOME}/logs/verifier {HOME}/task {WORKDIR} {HOME}/proxy",
        timeout=30,
    )


def _start_proxy(
    sbx: Sandbox,
    upstream_url: str,
    upstream_api_key: str,
    upstream_model: str,
    *,
    top_logprobs: int,
    max_tokens_cap: int,
    disable_thinking: bool,
    skip_install: bool = False,
) -> str:
    """Upload + start the logprob-capture proxy inside the sandbox.

    Returns the base URL opencode should hit (``http://127.0.0.1:7000/v1``).
    When ``skip_install`` is True (pre-baked template), the proxy source
    and pip deps are assumed to already be present.
    """
    if not skip_install:
        if not _PROXY_SOURCE_PATH.exists():
            raise RuntimeError(
                f"proxy source not found at {_PROXY_SOURCE_PATH} — needed "
                "for transparent_proxy mode"
            )
        _write_text(sbx, PROXY_SCRIPT_PATH, _PROXY_SOURCE_PATH.read_text())
        # pipefail so the pipe through tail does not mask a pip failure;
        # stderr is merged into stdout, so report stdout on error.
        code, out, err = _exec(
            sbx,
            "set -o pipefail && "
            "pip install --quiet 'fastapi>=0.104' 'uvicorn[standard]>=0.24' "
            "'httpx>=0.27' 2>&1 | tail -20",
            timeout=180,
        )
        if code != 0:
            raise RuntimeError(
                f"proxy deps install failed: {(out or err)[-800:]}"
            )
    flags = (
        f"--upstream-url {upstream_url} "
        f"--upstream-api-key {upstream_api_key} "
        f"--trace {PROXY_TRACE} "
        f"--port {PROXY_PORT} "
        f"--top-logprobs {top_logprobs} "
        f"--max-tokens-cap {max_tokens_cap} "
        f"--model-override '{upstream_model}' "
    )
    if disable_thinking:
        flags += "--disable-thinking "
    cmd = (
        f"cd {HOME}/proxy && "
        f"python interception.py {flags}> {PROXY_LOG} 2>&1"
    )
    sbx.commands.run(cmd, background=True, timeout=0)
    # Wait for healthz: 120 polls x 0.5s = 60s budget.
    for _ in range(120):
        code, _, _ = _exec(
            sbx, f"curl -sf http://127.0.0.1:{PROXY_PORT}/healthz", timeout=5
        )
        if code == 0:
            return f"http://127.0.0.1:{PROXY_PORT}/v1"
        time.sleep(0.5)
    log = _safe_read(sbx, PROXY_LOG)
    raise RuntimeError(f"proxy did not start within 60s. log:\n{log[-2000:]}")


def _write_opencode_json(
    sbx: Sandbox,
    base_url: str,
    api_key: str,
    model: str,
    request_timeout_ms: int = 600_000,
) -> None:
    """Stage opencode.json for ``@ai-sdk/openai-compatible``.

    All three endpoints route through the OAI-compatible adapter — the proxy
    serves ``/v1/chat/completions`` and so does each upstream we target.
    """
    inner_model = model.split("/", 1)[-1]
    doc = {
        "$schema": "https://opencode.ai/config.json",
        "model": f"intercepted/{inner_model}",
        "provider": {
            "intercepted": {
                "npm": "@ai-sdk/openai-compatible",
                "name": "Intercepted",
                "options": {
                    "baseURL": base_url,
                    "apiKey": api_key,
                    "timeout": request_timeout_ms,
                },
                "models": {inner_model: {"name": "Intercepted Model"}},
            }
        },
        "tools": {"webfetch": False, "question": False},
    }
    _write_text(sbx, OPENCODE_CONFIG, json.dumps(doc, indent=2))
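

# For example, with base_url="http://127.0.0.1:7000/v1",
# api_key="intercepted" and model="Qwen/Qwen3-4B", the staged config is:
#   {
#     "$schema": "https://opencode.ai/config.json",
#     "model": "intercepted/Qwen3-4B",
#     "provider": {"intercepted": {
#       "npm": "@ai-sdk/openai-compatible",
#       "name": "Intercepted",
#       "options": {"baseURL": "http://127.0.0.1:7000/v1",
#                   "apiKey": "intercepted", "timeout": 600000},
#       "models": {"Qwen3-4B": {"name": "Intercepted Model"}}}},
#     "tools": {"webfetch": false, "question": false}
#   }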


# ---------------------------------------------------------------------------
# Run + verify + collect.
# ---------------------------------------------------------------------------
def _run_agent(
    sbx: Sandbox,
    *,
    instruction_path: str,
    base_url: str,
    api_key: str,
    timeout_s: float,
) -> int:
    """Invoke ``opencode run`` synchronously, return its exit code."""
    envs = {
        "OPENAI_BASE_URL": base_url,
        "OPENAI_API_KEY": api_key,
        "OPENCODE_CONFIG": OPENCODE_CONFIG,
        "PATH": f"{HOME}/.opencode/bin:/usr/local/bin:/usr/bin:/bin",
    }
    cmd = (
        f'export PATH="{HOME}/.opencode/bin:$PATH" && '
        f"cd {WORKDIR} && "
        f"opencode run --format json --dangerously-skip-permissions "
        f'"$(cat {instruction_path})" 2>&1 | tee {AGENT_LOG}'
    )
    return _exec_bg_with_timeout(
        sbx, cmd, envs=envs, timeout_s=timeout_s
    )


def _run_verifier(sbx: Sandbox) -> tuple[float | None, dict[str, str], str]:
    cmd = f"mkdir -p {HOME}/logs/verifier && python {VERIFIER_PATH}"
    code, out, err = _exec(sbx, cmd, timeout=120)
    reward_str = _safe_read(sbx, REWARD_FILE).strip()
    results_str = _safe_read(sbx, RESULTS_FILE)
    try:
        reward = float(reward_str) if reward_str else None
    except ValueError:
        reward = None
    try:
        tests = json.loads(results_str) if results_str.strip() else {}
    except json.JSONDecodeError:
        tests = {}
    combined = (out + ("\n" + err if err else "")).strip()
    return reward, tests, combined[-3000:]


def _collect_files(sbx: Sandbox) -> tuple[dict[str, str], list[str]]:
    files: dict[str, str] = {}
    for name in MODULES:
        path = f"{WORKDIR}/{name}.py"
        try:
            if sbx.files.exists(path):
                files[f"{name}.py"] = sbx.files.read(path)[:8000]
        except Exception:
            pass
    code, out, _ = _exec(
        sbx,
        f"find {WORKDIR} -maxdepth 1 -type f -printf '%f\\n' 2>/dev/null",
        timeout=10,
    )
    extras: list[str] = []
    expected = {f"{m}.py" for m in MODULES}
    for line in (out or "").splitlines():
        n = line.strip()
        if n and n not in expected and not n.startswith("."):
            extras.append(n)
    return files, extras


def _read_proxy_trace(sbx: Sandbox) -> list[dict[str, Any]]:
    raw = _safe_read(sbx, PROXY_TRACE)
    out: list[dict[str, Any]] = []
    for line in raw.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            out.append(json.loads(line))
        except Exception:
            pass
    return out
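

# Each trace record is assumed to carry at least the fields consumed by
# _logprob_stats() and run_one() below (written by interception.py; values
# illustrative):
#   {"turn": 1, "finish_reason": "tool_calls", "latency_s": 2.31,
#    "completion_tokens": ["Sure", ",", ...],
#    "per_token_logps": [-0.02, -1.37, ...],
#    "request": {...}, "response": {...}}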


def _logprob_stats(turns: list[dict[str, Any]]) -> LogprobStats:
    s = LogprobStats(n_turns=len(turns))
    if not turns:
        return s
    all_lps: list[float] = []
    finish: dict[str, int] = {}
    for t in turns:
        toks = t.get("completion_tokens") or []
        lps = t.get("per_token_logps") or []
        s.tokens_per_turn.append(len(toks))
        s.total_completion_tokens += len(toks)
        if toks:
            s.productive_turns += 1
        all_lps.extend(float(x) for x in lps if x is not None)
        fr = t.get("finish_reason") or "unknown"
        finish[fr] = finish.get(fr, 0) + 1
    s.finish_reasons = finish
    if all_lps:
        s.mean_logprob = mean(all_lps)
    first = next((t for t in turns if t.get("completion_tokens")), None)
    last = next(
        (t for t in reversed(turns) if t.get("completion_tokens")), None
    )
    if first:
        s.first_token = str(first["completion_tokens"][0])
        lp = (first.get("per_token_logps") or [None])[0]
        if lp is not None:
            s.first_logprob = float(lp)
    if last:
        s.last_token = str(last["completion_tokens"][-1])
        lp = (last.get("per_token_logps") or [None])[-1]
        if lp is not None:
            s.last_logprob = float(lp)
    return s


# ---------------------------------------------------------------------------
# One full rollout.
# ---------------------------------------------------------------------------
def run_one(
    ep: Endpoint,
    *,
    mode: str,
    agent_timeout_s: float,
    max_tokens_cap: int,
    top_logprobs: int,
    disable_thinking: bool,
    instruction: str,
    e2b_api_key: str,
    template: str | None = None,
) -> RunResult:
    print(
        f"[{ep.label}] launching base_url={ep.base_url} model={ep.model} "
        f"mode={mode} template={template or '(default)'}",
        flush=True,
    )
    res = RunResult(endpoint=ep.label, model=ep.model, base_url=ep.base_url)
    started = time.time()
    sbx = Sandbox.create(
        template=template,
        timeout=int(agent_timeout_s) + 300,
        api_key=e2b_api_key,
    )
    res.sandbox_id = sbx.sandbox_id
    print(f"[{ep.label}] sandbox={sbx.sandbox_id}", flush=True)
    try:
        _wait_for_sandbox_ready(sbx)
        if template:
            _ensure_dirs_exist(sbx)
        else:
            _install_opencode(sbx)
        _write_text(sbx, INSTRUCTION_PATH, instruction)
        _write_text(sbx, VERIFIER_PATH, VERIFIER_SOURCE)
        if mode == "transparent_proxy":
            base_url = _start_proxy(
                sbx,
                upstream_url=ep.base_url,
                upstream_api_key=ep.api_key,
                upstream_model=ep.model,
                top_logprobs=top_logprobs,
                max_tokens_cap=max_tokens_cap,
                disable_thinking=disable_thinking,
                skip_install=bool(template),
            )
        else:
            base_url = ep.base_url
        _write_opencode_json(
            sbx,
            base_url=base_url,
            api_key=ep.api_key if mode == "black_box" else "intercepted",
            model=ep.model,
        )
        try:
            res.agent_exit_code = _run_agent(
                sbx,
                instruction_path=INSTRUCTION_PATH,
                base_url=base_url,
                api_key=ep.api_key if mode == "black_box" else "intercepted",
                timeout_s=agent_timeout_s,
            )
            print(
                f"[{ep.label}] agent exit_code={res.agent_exit_code}",
                flush=True,
            )
        except TimeoutError as exc:
            res.error = f"agent timeout: {exc}"
            print(f"[{ep.label}] {res.error}", flush=True)
        reward, tests, vstdout = _run_verifier(sbx)
        res.reward, res.tests, res.verifier_stdout = reward, tests, vstdout
        res.files, res.files_extra = _collect_files(sbx)
        turns = _read_proxy_trace(sbx)
        res.logprobs = _logprob_stats(turns)
        # Capture truncated request/response per turn for debugging. Strip
        # large/noisy fields (full token logprobs, raw bytes) to keep the
        # artifact readable.
        for t in turns:
            req = t.get("request") or {}
            resp = t.get("response") or {}
            res.raw_turns.append(
                {
                    "turn": t.get("turn"),
                    "finish_reason": t.get("finish_reason"),
                    "latency_s": t.get("latency_s"),
                    "request_messages": req.get("messages", [])[-6:],
                    "request_tools": [
                        (tool.get("function") or {}).get("name", "?")
                        for tool in (req.get("tools") or [])
                    ],
                    "request_temperature": req.get("temperature"),
                    "request_max_tokens": req.get("max_tokens")
                    or req.get("max_completion_tokens"),
                    "response_choices": [
                        {
                            "finish_reason": ch.get("finish_reason"),
                            "message_content": (ch.get("message") or {}).get(
                                "content"
                            ),
                            "tool_calls": [
                                {
                                    "name": (tc.get("function") or {}).get(
                                        "name"
                                    ),
                                    "arguments": str(
                                        (tc.get("function") or {}).get(
                                            "arguments", ""
                                        )
                                    )[:500],
                                }
                                for tc in (
                                    (ch.get("message") or {}).get("tool_calls")
                                    or []
                                )
                            ],
                        }
                        for ch in (resp.get("choices") or [])
                    ],
                    "upstream_status": resp.get("upstream_status"),
                    "upstream_error": resp.get("upstream_error"),
                }
            )
        res.proxy_log_tail = _safe_read(sbx, PROXY_LOG)[-2000:]
        res.agent_log_tail = _safe_read(sbx, AGENT_LOG)[-4000:]
    except Exception as exc:  # noqa: BLE001
        res.error = f"{type(exc).__name__}: {exc}"
        print(f"[{ep.label}] ERROR {res.error}", flush=True)
    finally:
        try:
            sbx.kill()
        except Exception:
            pass
        res.wall_s = time.time() - started
    return res


# ---------------------------------------------------------------------------
# Reporting.
# ---------------------------------------------------------------------------
def _format_summary(results: list[RunResult]) -> str:
    lines: list[str] = []
    sep = "-" * 110
    lines.append(sep)
    lines.append(
        f"{'endpoint':<10} {'model':<42} {'reward':<8} {'pass':<6} "
        f"{'turns':<6} {'tokens':<8} {'mean-logp':<11} {'wall':<8}"
    )
    lines.append(sep)
    for r in results:
        reward = f"{r.reward:.2f}" if r.reward is not None else "-"
        pass_str = f"{r.passed}/{len(MODULES)}"
        mean_lp = (
            f"{r.logprobs.mean_logprob:+.3f}"
            if r.logprobs.mean_logprob is not None
            else "-"
        )
        lines.append(
            f"{r.endpoint:<10} {r.model[:42]:<42} {reward:<8} {pass_str:<6} "
            f"{r.logprobs.n_turns:<6} {r.logprobs.total_completion_tokens:<8} "
            f"{mean_lp:<11} {r.wall_s:<7.1f}s"
        )
    lines.append(sep)
    lines.append("")
    lines.append("per-file results:")
    for r in results:
        per_file = " ".join(
            f"{m}={r.tests.get(m, '?')}" for m in MODULES
        )
        lines.append(f" {r.endpoint:<10} {per_file}")
        if r.files_extra:
            lines.append(
                f" {' ':<10} extras: {', '.join(sorted(r.files_extra))}"
            )
        if r.error:
            lines.append(f" {' ':<10} ERROR: {r.error[:200]}")
    return "\n".join(lines)


def _save_artifact(r: RunResult, out_dir: Path) -> Path:
    out_dir.mkdir(parents=True, exist_ok=True)
    ts = int(time.time())
    target = out_dir / f"sorting_{r.endpoint}_{ts}.json"
    target.write_text(json.dumps(asdict(r), indent=2, default=str))
    return target


# ---------------------------------------------------------------------------
# CLI.
# ---------------------------------------------------------------------------
def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    p = argparse.ArgumentParser(
        prog="test_five_sorts_e2e",
        description=(
            "Run opencode end-to-end against vLLM / OpenAI / HF Router, "
            "write three sorting algorithms in three files, verify them, "
            "and report logprobs + tests + filesystem contents."
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    p.add_argument(
        "--endpoint",
        choices=["vllm", "openai", "hf_router", "all"],
        default="all",
        help="Which endpoint to test (default: all configured endpoints).",
    )
    p.add_argument(
        "--mode",
        choices=["transparent_proxy", "black_box"],
        default="transparent_proxy",
        help=(
            "transparent_proxy captures per-token logprobs; black_box skips "
            "the proxy. Default: transparent_proxy."
        ),
    )
    p.add_argument(
        "--agent-timeout",
        type=float,
        default=600.0,
        help="Seconds to wait for opencode to finish (default 600).",
    )
    p.add_argument(
        "--max-tokens-cap",
        type=int,
        default=4096,
        help="Per-turn max_tokens clamp on forwarded requests (default 4096).",
    )
    p.add_argument(
        "--top-logprobs",
        type=int,
        default=5,
        help="Top-k logprobs requested from the upstream (HF Router cap is 5).",
    )
    p.add_argument(
        "--disable-thinking",
        choices=["auto", "on", "off"],
        default="auto",
        help=(
            "Inject ``chat_template_kwargs.enable_thinking=false`` on "
            "forwarded requests. ``auto`` = on for vllm, off for openai / "
            "hf_router (default). ``on`` / ``off`` forces it for every "
            "endpoint."
        ),
    )
    p.add_argument(
        "--save-artifacts",
        action="store_true",
        help="Dump per-run JSON to envs/opencode_env/tests/_artifacts/.",
    )
    p.add_argument(
        "--instruction-override",
        default=None,
        help="Replace the default three-sorts instruction.",
    )
    p.add_argument(
        "--no-summary-files",
        action="store_true",
        help="Skip printing file contents in the summary.",
    )
    p.add_argument(
        "--template",
        default=None,
        help=(
            "E2B template name to use (e.g. ``opencode-rl`` after running "
            "build_e2b_template.py). When set, skips opencode install + "
            "pip-deps install (already in the template) — saves ~2 min "
            "per rollout."
        ),
    )
    return p.parse_args(argv)


def main(argv: list[str] | None = None) -> int:
    args = _parse_args(argv)
    e2b_api_key = os.environ.get("E2B_API_KEY")
    if not e2b_api_key:
        print(
            "ERROR: E2B_API_KEY is required (set it in .env or your shell).",
            file=sys.stderr,
        )
        return 2
    print(f"Loading env from {_DOTENV_PATH}")
    endpoints, skipped = _resolve_endpoints()
    if args.endpoint != "all":
        endpoints = [e for e in endpoints if e.label == args.endpoint]
    instruction = args.instruction_override or INSTRUCTION
    runs: list[RunResult] = []
    for ep in endpoints:
        if args.disable_thinking == "on":
            disable_thinking = True
        elif args.disable_thinking == "off":
            disable_thinking = False
        else:
            disable_thinking = ep.disable_thinking_default
        runs.append(
            run_one(
                ep,
                mode=args.mode,
                agent_timeout_s=args.agent_timeout,
                max_tokens_cap=args.max_tokens_cap,
                top_logprobs=args.top_logprobs,
                disable_thinking=disable_thinking,
                instruction=instruction,
                e2b_api_key=e2b_api_key,
                template=args.template,
            )
        )
    print()
    print(_format_summary(runs))
    if skipped:
        print("\nSkipped (not configured):")
        for s in skipped:
            print(f" - {s}")
    if not args.no_summary_files:
        for r in runs:
            print(f"\n=== files written by {r.endpoint} ({r.model}) ===")
            for fname, src in r.files.items():
                head = "\n".join(src.splitlines()[:20])
                print(f"--- {fname} (first 20 lines) ---")
                print(head)
                if src.count("\n") > 20:
                    print(f"... ({src.count(chr(10)) - 20} more lines)")
    if args.save_artifacts:
        out_dir = _ENV_DIR / "tests" / "_artifacts"
        for r in runs:
            print(f"saved {_save_artifact(r, out_dir)}")
    if not runs:
        print("\nNo endpoints ran. Fill in .env and re-run.")
        return 2
    failed = [r for r in runs if r.reward is None or r.reward < 1.0 or r.error]
    if failed:
        print(f"\n{len(failed)}/{len(runs)} endpoint(s) did not reach reward=1.0.")
        return 1
    print(f"\nAll {len(runs)} endpoint(s) reached reward=1.0.")
    return 0


if __name__ == "__main__":
    sys.exit(main())