# Origin: Hugging Face repository file opencode-env / harness.py
# Uploaded by AdithyaSK (HF Staff) — "Upload folder using huggingface_hub",
# commit 70f2179 (verified).
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""OpenCode session factory + session implementation.
Implements the :class:`ResourceSessionFactory` / :class:`ResourceSession`
contracts from ``openenv.core.harness`` (PR #471). The session wraps one
sandbox running the ``opencode`` CLI agent.
Two operating modes:
- ``mode="black_box"`` (the factory default; "Mode A") — opencode talks
  directly to ``config.base_url``. No proxy, no logprob capture. Use for
  smoke tests / SFT / eval.
- ``mode="transparent_proxy"`` ("Mode B") — an in-sandbox FastAPI proxy
  sits between opencode and the upstream LLM. It injects ``logprobs=true``
  on every request and writes per-turn ``(messages, completion_tokens,
  per_token_logps)`` to ``proxy_trace.jsonl`` for GRPO consumption.
Single driver path: opencode is started as a background subprocess via
``opencode run --format json --dangerously-skip-permissions ...`` and we
poll its exit code. The previous ``opencode serve`` driver was removed —
opencode CLI is the only path now.
"""
from __future__ import annotations

import json
import logging
import shlex
import time
from pathlib import Path
from typing import Any, Callable, Literal

from openenv.core.env_server.mcp_types import Tool
from openenv.core.harness import (
    Message,
    ResourceSession,
    ResourceSessionFactory,
    ToolResult,
    VerifyResult,
)

from .config import OpenCodeConfig
from .opencode_runtime import (
    agent_log_path,
    build_env_vars,
    build_install_cmd,
    build_opencode_json,
    build_run_cmd,
    instruction_path,
    opencode_config_path,
    system_prompt_path,
)
from .sandbox.base import BgJob, SandboxBackend, SandboxHandle
from .task import OpenCodeTask
# --- Mode B (``transparent_proxy``) proxy wiring ------------------------
# TCP port the interception proxy listens on inside the sandbox.
_PROXY_PORT = 7000
# Both proxy output files live under the agent log directory.
_AGENT_LOG_DIR = "/home/user/logs/agent"
_PROXY_TRACE_PATH = f"{_AGENT_LOG_DIR}/proxy_trace.jsonl"  # per-turn JSONL records
_PROXY_LOG_PATH = f"{_AGENT_LOG_DIR}/proxy.log"  # proxy stdout + stderr
# Repo-local copy of the proxy module. It is uploaded to
# /home/user/proxy/interception.py before each rollout, unless the sandbox
# template already ships that file.
_PROXY_SOURCE_PATH = Path(__file__).parent.joinpath("sandbox", "interception.py")

# A verifier receives the sandbox and the task after the rollout and
# returns a VerifyResult for it.
Verifier = Callable[[SandboxHandle, OpenCodeTask], VerifyResult]
class OpenCodeSession(ResourceSession):
    """One live OpenCode rollout inside a sandbox.

    The session is created already-running: :meth:`OpenCodeSessionFactory.create`
    calls :meth:`start_agent` before returning. Typical usage::

        session = factory.create(task)
        session.wait_for_completion()
        result = session.verify([])
        session.close()
    """

    def __init__(
        self,
        *,
        sandbox: SandboxHandle,
        config: OpenCodeConfig,
        task: OpenCodeTask,
        verifier: Verifier | None = None,
        base_url_override: str | None = None,
        proxy_trace_path: str | None = None,
        proxy_bg_job: BgJob | None = None,
    ) -> None:
        """Bind a provisioned sandbox to its config/task.

        Args:
            sandbox: Sandbox the agent runs in; killed by :meth:`close`.
            config: OpenCode configuration used to build the run command/env.
            task: The task whose ``instruction`` seeds :meth:`initial_messages`.
            verifier: Optional scorer invoked by :meth:`verify`.
            base_url_override: Forwarded to ``build_env_vars`` when starting
                the agent (the factory passes the proxy URL in proxy mode).
            proxy_trace_path: In-sandbox JSONL trace path; ``None`` disables
                :meth:`fetch_proxy_trace` (it then returns ``[]``).
            proxy_bg_job: Background proxy process, killed by :meth:`close`.
        """
        self.sandbox = sandbox
        self.config = config
        self.task = task
        self._verifier = verifier
        self._base_url_override = base_url_override
        self._bg_job: BgJob | None = None
        self._proxy_trace_path = proxy_trace_path
        self._proxy_bg_job = proxy_bg_job
        # Set once the sandbox has been killed so close() is idempotent.
        self._closed = False

    # ------------------------------------------------------------------
    # ResourceSession contract (PR #471)
    # ------------------------------------------------------------------
    def initial_messages(self) -> list[Message]:
        """Return the single user turn carrying the task instruction."""
        return [{"role": "user", "content": self.task.instruction}]

    def list_tools(self) -> list[Tool]:
        """Return no tools: OpenCode owns its own tool loop."""
        return []

    def call_tool(self, name: str, arguments: dict[str, Any]) -> ToolResult:
        """Reject every external tool call with an explanatory error."""
        return ToolResult(
            error=(
                "OpenCodeSession does not expose external tool calls; the "
                "CLI agent owns its own tool loop."
            )
        )

    def verify(
        self,
        transcript: list[Message],
        final_state: Any | None = None,
    ) -> VerifyResult:
        """Score the rollout with the configured verifier.

        ``transcript`` and ``final_state`` are ignored; the verifier is given
        the sandbox and task directly. Without a verifier, returns
        ``VerifyResult(env_reward=None, done=True)``.
        """
        if self._verifier is None:
            return VerifyResult(env_reward=None, done=True)
        return self._verifier(self.sandbox, self.task)

    def close(self) -> None:
        """Kill the agent, the proxy (if any), and the sandbox.

        Failures killing the background jobs are ignored so the sandbox itself
        is always killed. Calling ``close`` again after a successful close is a
        no-op instead of a second ``sandbox.kill()``.
        """
        if self._closed:
            return
        for attr in ("_bg_job", "_proxy_bg_job"):
            job = getattr(self, attr)
            if job is None:
                continue
            try:
                job.kill()
            except Exception:  # noqa: BLE001 - best effort; sandbox kill follows
                pass
            setattr(self, attr, None)
        self.sandbox.kill()
        self._closed = True

    # ------------------------------------------------------------------
    # OpenCode-specific session API
    # ------------------------------------------------------------------
    def start_agent(self) -> None:
        """Launch ``opencode run`` as a background subprocess in the sandbox.

        No-op if the agent has already been started.
        """
        if self._bg_job is not None:
            return
        cmd = build_run_cmd(self.config)
        envs = build_env_vars(self.config, base_url_override=self._base_url_override)
        self._bg_job = self.sandbox.start_bg(cmd, envs=envs)

    def wait_for_completion(self, timeout_s: float | None = None) -> int:
        """Block until the agent exits, returning its exit code.

        Args:
            timeout_s: Wait budget; defaults to ``config.agent_timeout_s``.

        Raises:
            RuntimeError: if :meth:`start_agent` has not been called.
        """
        if self._bg_job is None:
            raise RuntimeError("Agent not started; call start_agent() first.")
        budget = timeout_s if timeout_s is not None else self.config.agent_timeout_s
        return self._bg_job.wait(timeout=budget)

    def fetch_trace(self) -> str:
        """Return the raw ``opencode run`` log (JSON-lines when ``run_format=json``)."""
        return self.sandbox.read_text(agent_log_path(self.config))

    def fetch_proxy_trace(self) -> list[dict[str, Any]]:
        """Return per-turn proxy-captured records (Mode B only).

        Each entry has ``request``, ``response``, ``completion_tokens``,
        ``completion_token_ids``, ``per_token_logps``, ``finish_reason``,
        and ``latency_s``. Returns ``[]`` in Mode A or when the trace file
        cannot be read.

        A malformed *final* line is dropped: the proxy may still be appending
        when this is called (e.g. after a timed-out wait), so the last line
        can be a partial write. A malformed line anywhere else raises
        ``json.JSONDecodeError``.
        """
        if self._proxy_trace_path is None:
            return []
        try:
            content = self.sandbox.read_text(self._proxy_trace_path)
        except Exception:  # noqa: BLE001 - missing/unreadable trace → empty
            return []
        lines = [ln for ln in (raw.strip() for raw in content.splitlines()) if ln]
        records: list[dict[str, Any]] = []
        for idx, line in enumerate(lines):
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError:
                if idx == len(lines) - 1:
                    break  # torn trailing write
                raise
        return records
class OpenCodeSessionFactory(ResourceSessionFactory):
    """Produce isolated per-rollout :class:`OpenCodeSession` instances.

    The factory owns sandbox provisioning, opencode install, config injection,
    and (Mode B / ``transparent_proxy``) proxy startup. Each :meth:`create`
    call returns a fresh sandbox with a running agent. If any provisioning
    step fails, the proxy job (if started) and the sandbox are killed before
    the error propagates, so failed rollouts do not leak sandboxes.
    """

    def __init__(
        self,
        *,
        config: OpenCodeConfig,
        sandbox_backend: SandboxBackend,
        mode: Literal["black_box", "transparent_proxy"] = "black_box",
        verifier: Verifier | None = None,
        install_timeout_s: int = 240,
        setup_timeout_s: int = 300,
    ) -> None:
        """Configure the factory.

        Args:
            config: OpenCode configuration shared by every rollout.
            sandbox_backend: Backend whose ``create`` provisions sandboxes.
            mode: ``"black_box"`` (direct upstream) or ``"transparent_proxy"``
                (in-sandbox logprob-capturing proxy).
            verifier: Optional scorer handed to every session.
            install_timeout_s: Per-attempt timeout for the opencode install.
            setup_timeout_s: Timeout for ``extra_setup_shell`` / ``setup_shell``.

        Raises:
            ValueError: if ``mode`` is not one of the two supported values.
        """
        if mode not in {"black_box", "transparent_proxy"}:
            raise ValueError(f"Unknown mode: {mode!r}")
        self._config = config
        self._backend = sandbox_backend
        self._mode = mode
        self._verifier = verifier
        self._install_timeout_s = install_timeout_s
        self._setup_timeout_s = setup_timeout_s

    def create(
        self,
        task: Any,
        seed: int | None = None,
        episode_id: str | None = None,
    ) -> OpenCodeSession:
        """Provision a sandbox, bootstrap opencode, and start the agent.

        Args:
            task: Anything accepted by ``OpenCodeTask.coerce``.
            seed: Part of the factory contract; not used here.
            episode_id: When given, forwarded as sandbox metadata.

        Returns:
            A session whose agent has already been started.

        Raises:
            Whatever provisioning raised (e.g. ``RuntimeError`` from bootstrap
            or proxy startup). The sandbox is killed before re-raising.
        """
        log = logging.getLogger(__name__)
        oc_task = OpenCodeTask.coerce(task)
        # Sandbox lifetime = agent budget + 300s of headroom (presumably for
        # bootstrap and verification).
        sandbox_timeout = int(self._config.agent_timeout_s) + 300
        log.info(
            "factory.create: creating sandbox timeout=%ds mode=%s",
            sandbox_timeout, self._mode,
        )
        sandbox = self._backend.create(
            timeout_s=sandbox_timeout,
            metadata={"episode_id": episode_id} if episode_id else None,
        )
        sid = (
            getattr(sandbox, "sandbox_id", None)
            or getattr(getattr(sandbox, "raw", None), "sandbox_id", "?")
        )
        log.info("factory.create: sandbox=%s — bootstrapping…", sid)

        proxy_bg_job: BgJob | None = None
        try:
            self._bootstrap_sandbox(sandbox, oc_task)
            base_url_override: str | None = None
            proxy_trace_path: str | None = None
            if self._mode == "transparent_proxy":
                log.info(
                    "factory.create: starting interception proxy on :%d → %s",
                    _PROXY_PORT, self._config.base_url,
                )
                proxy_bg_job, base_url_override, proxy_trace_path = self._start_proxy(
                    sandbox
                )
                log.info("factory.create: proxy up at %s", base_url_override)
                # Rewrite opencode.json so opencode points at the proxy. Force
                # ``openai_compatible`` so opencode hits ``/v1/chat/completions``
                # (which the proxy serves) rather than provider-specific paths.
                proxy_cfg = OpenCodeConfig(
                    **{
                        **self._config.model_dump(),
                        "provider": "openai_compatible",
                        "base_url": base_url_override,
                    }
                )
                sandbox.write_text(
                    opencode_config_path(self._config),
                    build_opencode_json(proxy_cfg),
                )
            session = OpenCodeSession(
                sandbox=sandbox,
                config=self._config,
                task=oc_task,
                verifier=self._verifier,
                base_url_override=base_url_override,
                proxy_trace_path=proxy_trace_path,
                proxy_bg_job=proxy_bg_job,
            )
            session.start_agent()
        except BaseException as exc:  # noqa: BLE001 - cleanup, then re-raise
            log.error("factory.create: sandbox=%s setup failed: %r", sid, exc)
            self._discard_sandbox(sandbox, proxy_bg_job)
            raise
        return session

    # ------------------------------------------------------------------
    @staticmethod
    def _discard_sandbox(sandbox: SandboxHandle, proxy_bg_job: BgJob | None) -> None:
        """Best-effort teardown of a half-provisioned rollout; never raises."""
        log = logging.getLogger(__name__)
        if proxy_bg_job is not None:
            try:
                proxy_bg_job.kill()
            except Exception:  # noqa: BLE001
                log.warning("factory.create: failed to kill proxy job", exc_info=True)
        try:
            sandbox.kill()
        except Exception:  # noqa: BLE001
            log.warning("factory.create: failed to kill sandbox", exc_info=True)

    def _wait_for_sandbox_ready(
        self,
        sandbox: SandboxHandle,
        *,
        attempts: int = 15,
        delay_s: float = 1.0,
    ) -> None:
        """Probe the sandbox until ``echo ok`` succeeds.

        E2B (and other backends) sometimes return the handle before the
        guest is fully ready. Issue ``echo ok`` (5s timeout each) up to
        ``attempts`` times, sleeping ``delay_s`` between probes.

        Raises:
            RuntimeError: if no probe succeeds; carries the last error seen.
        """
        last_err = ""
        for attempt in range(attempts):
            try:
                r = sandbox.exec("echo ok", timeout=5)
            except Exception as exc:  # noqa: BLE001
                last_err = f"{type(exc).__name__}: {exc}"
            else:
                if r.exit_code == 0 and "ok" in (r.stdout or ""):
                    return
                last_err = (r.stderr or r.stdout or "").strip() or f"exit={r.exit_code}"
            # Sleeping after the final probe would only delay the error.
            if attempt + 1 < attempts:
                time.sleep(delay_s)
        raise RuntimeError(
            f"sandbox did not become ready after {attempts} probes "
            f"(last error: {last_err})"
        )

    def _exec_with_retry(
        self,
        sandbox: SandboxHandle,
        cmd: str,
        *,
        timeout: float,
        attempts: int = 3,
        backoff_s: float = 3.0,
        label: str = "cmd",
    ):
        """Run ``sandbox.exec`` with exponential backoff on transient failure.

        Transient = ``exit_code != 0`` AND empty stderr (SIGKILL / network
        blip signature) OR an exception during exec. A failure with non-empty
        stderr is treated as deterministic and stops retrying immediately.

        Returns:
            The successful exec result.

        Raises:
            RuntimeError: with the number of attempts actually made, the last
                exit code (``-1`` for an exception), stderr and stdout tail.
        """
        last_stdout = ""
        last_stderr = ""
        last_exit = 0
        tries = 0
        for i in range(attempts):
            tries = i + 1
            try:
                r = sandbox.exec(cmd, timeout=timeout)
            except Exception as exc:  # noqa: BLE001
                last_stdout = ""
                last_stderr = f"{type(exc).__name__}: {exc}"
                last_exit = -1
            else:
                if r.exit_code == 0:
                    return r
                last_stdout = r.stdout or ""
                last_stderr = r.stderr or ""
                last_exit = r.exit_code
                if last_stderr.strip():
                    break
            if tries < attempts:
                time.sleep(backoff_s * (2**i))
        raise RuntimeError(
            f"{label} failed after {tries} attempt(s) "
            f"(exit={last_exit}, stderr={last_stderr!r}, stdout_tail={last_stdout[-400:]!r})"
        )

    def _opencode_already_installed(self, sandbox: SandboxHandle) -> bool:
        """Cheap probe — returns True if opencode is on disk in the sandbox.

        Runs ``/home/user/.opencode/bin/opencode --version`` (10s timeout).
        Used to skip the slow ``curl install`` step when running against a
        prebaked template that already ships opencode. Any exception counts
        as "not installed".
        """
        try:
            r = sandbox.exec(
                "/home/user/.opencode/bin/opencode --version",
                timeout=10,
            )
            return r.exit_code == 0
        except Exception:  # noqa: BLE001
            return False

    def _bootstrap_sandbox(
        self,
        sandbox: SandboxHandle,
        task: OpenCodeTask,
    ) -> None:
        """Install opencode, write config + task files, run optional setup.

        Order: readiness probe → opencode install (skipped when present) →
        ``opencode.json`` / instruction / optional system prompt → task
        upload files → ``config.extra_setup_shell`` (retried) →
        ``task.setup_shell`` (run once, not retried).

        Raises:
            RuntimeError: if the readiness probe, the install, or a setup
                command fails.
        """
        # Stage 1: wait for the sandbox to be responsive.
        self._wait_for_sandbox_ready(sandbox)
        # Stage 2: install opencode (skipped if a prebaked template already
        # has it). curl|bash is flaky — retry with backoff.
        if not self._opencode_already_installed(sandbox):
            self._exec_with_retry(
                sandbox,
                build_install_cmd(self._config),
                timeout=self._install_timeout_s,
                attempts=3,
                backoff_s=3.0,
                label="opencode install",
            )
        # Stage 3: config + task files.
        sandbox.write_text(
            opencode_config_path(self._config),
            build_opencode_json(self._config),
        )
        sandbox.write_text(instruction_path(self._config), task.instruction)
        if self._config.system_prompt:
            sandbox.write_text(
                system_prompt_path(self._config),
                self._config.system_prompt,
            )
        for remote_path, content in task.upload_files.items():
            sandbox.write_text(remote_path, content)
        # Stage 4: optional setup commands.
        if self._config.extra_setup_shell:
            self._exec_with_retry(
                sandbox,
                self._config.extra_setup_shell,
                timeout=self._setup_timeout_s,
                attempts=2,
                backoff_s=2.0,
                label="extra_setup_shell",
            )
        if task.setup_shell:
            r = sandbox.exec(task.setup_shell, timeout=self._setup_timeout_s)
            if r.exit_code != 0:
                raise RuntimeError(
                    f"task.setup_shell failed ({r.exit_code}): {r.stderr} "
                    f"stdout_tail={(r.stdout or '')[-400:]!r}"
                )

    def _start_proxy(
        self,
        sandbox: SandboxHandle,
    ) -> tuple[BgJob, str, str]:
        """Install proxy deps + start the proxy as a bg job inside the sandbox.

        Returns ``(proxy_bg_job, base_url_override, proxy_trace_path)``.
        Skips the pip install + source-upload steps when the prebaked
        template already has ``/home/user/proxy/interception.py``.

        Raises:
            RuntimeError: if the dependency install fails, or the proxy does
                not answer ``/healthz`` in time (the proxy job is killed first
                and the tail of its log is included in the message).
        """
        if not sandbox.exists("/home/user/proxy/interception.py"):
            # Install proxy deps (idempotent on retries). Output goes to a log
            # file rather than ``| tail``: a plain ``cmd | tail`` pipeline exits
            # with tail's status, which silently masked pip failures.
            self._exec_with_retry(
                sandbox,
                "pip install --quiet 'fastapi>=0.104' 'uvicorn[standard]>=0.24' "
                "'httpx>=0.27' > /tmp/proxy_pip.log 2>&1; rc=$?; "
                "tail -20 /tmp/proxy_pip.log; test $rc -eq 0",
                timeout=180,
                attempts=3,
                backoff_s=2.0,
                label="proxy deps install",
            )
            # Upload the proxy module into the sandbox.
            sandbox.write_text(
                "/home/user/proxy/interception.py",
                _PROXY_SOURCE_PATH.read_text(encoding="utf-8"),
            )
            sandbox.write_text("/home/user/proxy/__init__.py", "")

        # Build argv as a list and shell-quote every element: URLs with query
        # strings, keys, or model ids containing shell metacharacters would
        # otherwise break (or inject into) the command line.
        # NOTE(review): the API key is visible in the sandbox process list.
        proxy_args = [
            "--upstream-url", str(self._config.base_url),
            "--upstream-api-key", str(self._config.api_key),
            "--trace", _PROXY_TRACE_PATH,
            "--port", str(_PROXY_PORT),
            "--top-logprobs", str(self._config.proxy_top_logprobs),
        ]
        if self._config.proxy_max_tokens_cap is not None:
            proxy_args += ["--max-tokens-cap", str(self._config.proxy_max_tokens_cap)]
        if self._config.proxy_disable_thinking:
            proxy_args.append("--disable-thinking")
        # Force the upstream model id on every forwarded request — opencode's
        # internal title-gen call sometimes strips the provider prefix.
        if self._config.model:
            proxy_args += ["--model-override", str(self._config.model)]
        proxy_cmd = (
            "cd /home/user/proxy && python interception.py "
            + " ".join(shlex.quote(arg) for arg in proxy_args)
            + f" > {_PROXY_LOG_PATH} 2>&1"
        )
        proxy_job = sandbox.start_bg(proxy_cmd)

        # Wait for the proxy to start listening. Cold uvicorn boot inside
        # E2B can take anywhere from <1s to ~30s depending on cache state.
        health_cmd = f"curl -sf http://127.0.0.1:{_PROXY_PORT}/healthz"
        attempts = 120
        interval_s = 0.5
        for _ in range(attempts):
            try:
                probe = sandbox.exec(health_cmd, timeout=5)
            except Exception:  # noqa: BLE001 - transient exec failure; keep polling
                probe = None
            if probe is not None and probe.exit_code == 0:
                break
            time.sleep(interval_s)
        else:
            proxy_log = ""
            try:
                proxy_log = sandbox.read_text(_PROXY_LOG_PATH)
            except Exception:  # noqa: BLE001
                pass
            try:
                proxy_job.kill()
            except Exception:  # noqa: BLE001 - don't mask the startup error
                pass
            raise RuntimeError(
                f"proxy did not start within {attempts * interval_s:.0f}s. "
                f"log:\n{proxy_log[-2000:]}"
            )
        base_url_override = f"http://127.0.0.1:{_PROXY_PORT}/v1"
        return proxy_job, base_url_override, _PROXY_TRACE_PATH
# Public surface of this module; ``OpenCodeTask`` is re-exported from
# ``.task`` alongside the session/factory classes and the Verifier alias.
__all__ = ["OpenCodeSession", "OpenCodeSessionFactory", "OpenCodeTask", "Verifier"]