"""Strict-output inference runtime for OpenEnv validators."""
from __future__ import annotations
from typing import Any
from compat import install_openenv_fastmcp_compat
from app.agents.review_agent import ReviewAgent
from app.models.inference import AgentDecision, InferenceConfig
from app.services.openai_service import OpenAIActionPlanner
from app.utils.runtime import (
compact_text,
format_bool,
format_error,
format_reward,
observation_attr,
parse_task_ids,
suppress_output,
)
install_openenv_fastmcp_compat()
try:
from models import PythonCodeReviewAction
from server.env import PythonCodeReviewEnvironment
except ImportError: # pragma: no cover
from python_env.models import PythonCodeReviewAction # type: ignore[no-redef]
from python_env.server.env import PythonCodeReviewEnvironment # type: ignore[no-redef]
class InferenceRunner:
    """Run benchmark tasks with strict single-line progress output.

    Emits exactly one ``[START]`` line per task, one ``[STEP]`` line per
    environment step, and one ``[END]`` summary line, each flushed
    immediately so a validator can consume them as a stream.
    """

    def __init__(self, config: InferenceConfig) -> None:
        self.config = config
        # The agent wraps the LLM planner and decides one action per step.
        self.agent = ReviewAgent(OpenAIActionPlanner(config))

    def run(self) -> int:
        """Run every task listed by ``parse_task_ids``.

        Always returns 0: per-task failures are reported on each task's
        ``[END]`` line rather than via the process exit code.
        """
        for task_name in parse_task_ids():
            self.run_task(task_name)
        return 0

    def run_task(self, task_name: str) -> None:
        """Run a single episode for *task_name*, emitting progress lines.

        Never raises: any runtime failure is captured and reported on the
        ``[END]`` line instead of propagating to the caller.
        """
        rewards: list[str] = []
        step_count = 0
        success = False
        fatal_error: str | None = None
        final_score = 0.0
        self._emit_start(task_name)
        try:
            env = self._create_env()
            # NOTE(review): env is never explicitly closed; confirm that
            # PythonCodeReviewEnvironment requires no teardown.
            observation = self._reset_env(env, task_name)
            done = bool(observation_attr(observation, "done", False))
            final_score = float(observation_attr(observation, "score", 0.0) or 0.0)
            # Respect both the configured cap and the environment's own
            # remaining-attempt budget, but always allow at least one step.
            max_steps = max(
                1,
                min(
                    self.config.max_episode_steps,
                    int(
                        observation_attr(
                            observation,
                            "attempts_remaining",
                            self.config.max_episode_steps,
                        )
                        or self.config.max_episode_steps
                    ),
                ),
            )
            while not done and step_count < max_steps:
                decision = self.agent.act(observation)
                observation, reward, done, info = self._step_env(env, decision)
                step_count += 1
                # Carry the latest score forward; fall back to the previous
                # value when the observation omits it.
                final_score = float(
                    observation_attr(observation, "score", final_score) or final_score
                )
                rewards.append(format_reward(reward))
                step_error = self._resolve_step_error(info, observation, decision)
                self._emit_step(step_count, decision.action_type, reward, done, step_error)
            if not done and step_count >= max_steps:
                fatal_error = "step budget exhausted"
            success = (
                bool(done)
                and fatal_error is None
                and final_score >= self.config.success_threshold
            )
        except Exception as exc:  # top-level task boundary: report, never propagate
            fatal_error = compact_text(f"{type(exc).__name__}: {exc}", default="runtime failure")
        finally:
            # BUG FIX: fatal_error was previously captured but never surfaced
            # anywhere, so a crashed task looked identical to an ordinary
            # failure. Report it on the END line (format_error handles None).
            self._emit_end(
                success=success,
                step_count=step_count,
                rewards=rewards,
                error=fatal_error,
            )

    def _create_env(self) -> PythonCodeReviewEnvironment:
        """Construct the environment with its stdout/stderr chatter suppressed."""
        with suppress_output():
            return PythonCodeReviewEnvironment(verbose=False)

    def _reset_env(self, env: PythonCodeReviewEnvironment, task_name: str) -> Any:
        """Reset *env* to the given task and return the initial observation."""
        with suppress_output():
            return env.reset(task_id=task_name)

    def _step_env(
        self,
        env: PythonCodeReviewEnvironment,
        decision: AgentDecision,
    ) -> tuple[Any, float, bool, dict[str, Any]]:
        """Apply *decision* to *env*; return (observation, reward, done, info).

        Normalizes the raw step result: reward to float, done to bool, and
        info to a plain dict (empty when the environment returns None).
        """
        action = PythonCodeReviewAction(action_type=decision.action_type, code=decision.code)
        with suppress_output():
            observation, reward, done, info = env.step_result(action)
        return observation, float(reward), bool(done), dict(info or {})

    def _resolve_step_error(
        self,
        info: dict[str, Any],
        observation: Any,
        decision: AgentDecision,
    ) -> str | None:
        """Pick the most authoritative error for a step, or None.

        Environment-reported errors (from ``info`` or the observation) win
        over the agent's own decision error.
        """
        env_error = compact_text(
            info.get("last_action_error") or observation_attr(observation, "last_action_error", None),
            default="",
        )
        if env_error:
            return env_error
        if decision.error:
            return compact_text(decision.error, default="")
        return None

    def _emit_start(self, task_name: str) -> None:
        """Print the single ``[START]`` line for a task."""
        print(
            f"[START] task={task_name} env={self.config.benchmark_name} model={self.config.model_name}",
            flush=True,
        )

    def _emit_step(self, step_count: int, action: str, reward: float, done: bool, error: str | None) -> None:
        """Print one ``[STEP]`` progress line."""
        print(
            f"[STEP] step={step_count} action={compact_text(action, default='analyze_code')} "
            f"reward={format_reward(reward)} done={format_bool(done)} error={format_error(error)}",
            flush=True,
        )

    def _emit_end(
        self,
        *,
        success: bool,
        step_count: int,
        rewards: list[str],
        error: str | None = None,
    ) -> None:
        """Print the final ``[END]`` summary line for the task.

        *error* defaults to None so existing call sites remain valid; it is
        rendered with the same ``format_error`` used by ``[STEP]`` lines.
        """
        print(
            f"[END] success={format_bool(success)} steps={step_count} "
            f"rewards={','.join(rewards)} error={format_error(error)}",
            flush=True,
        )
def main() -> int:
    """Entrypoint used by the root-level inference wrapper.

    Builds the runner from environment-derived configuration and returns
    its exit status.
    """
    config = InferenceConfig.from_env()
    runner = InferenceRunner(config)
    return runner.run()