"""Strict-output inference runtime for OpenEnv validators."""

from __future__ import annotations

from typing import Any

from compat import install_openenv_fastmcp_compat

from app.agents.review_agent import ReviewAgent
from app.models.inference import AgentDecision, InferenceConfig
from app.services.openai_service import OpenAIActionPlanner
from app.utils.runtime import (
    compact_text,
    format_bool,
    format_error,
    format_reward,
    observation_attr,
    parse_task_ids,
    suppress_output,
)

install_openenv_fastmcp_compat()

try:
    from models import PythonCodeReviewAction
    from server.env import PythonCodeReviewEnvironment
except ImportError:  # pragma: no cover
    from python_env.models import PythonCodeReviewAction  # type: ignore[no-redef]
    from python_env.server.env import PythonCodeReviewEnvironment  # type: ignore[no-redef]


class InferenceRunner:
    """Run benchmark tasks with strict single-line progress output."""

    def __init__(self, config: InferenceConfig) -> None:
        self.config = config
        self.agent = ReviewAgent(OpenAIActionPlanner(config))

    def run(self) -> int:
        for task_name in parse_task_ids():
            self.run_task(task_name)
        return 0

    def run_task(self, task_name: str) -> None:
        """Run one task episode, emitting the [START]/[STEP]/[END] progress lines."""
        rewards: list[str] = []
        step_count = 0
        success = False
        fatal_error: str | None = None
        final_score = 0.0

        self._emit_start(task_name)

        try:
            env = self._create_env()
            observation = self._reset_env(env, task_name)
            done = bool(observation_attr(observation, "done", False))
            final_score = float(observation_attr(observation, "score", 0.0) or 0.0)
            # Clamp the step budget to the environment's reported attempts remaining.
            attempts_remaining = int(
                observation_attr(observation, "attempts_remaining", self.config.max_episode_steps)
                or self.config.max_episode_steps
            )
            max_steps = max(1, min(self.config.max_episode_steps, attempts_remaining))
            while not done and step_count < max_steps:
                decision = self.agent.act(observation)
                observation, reward, done, info = self._step_env(env, decision)
                step_count += 1
                final_score = float(observation_attr(observation, "score", final_score) or final_score)
                rewards.append(format_reward(reward))
                step_error = self._resolve_step_error(info, observation, decision)
                self._emit_step(step_count, decision.action_type, reward, done, step_error)

            if not done and step_count >= max_steps:
                fatal_error = "step budget exhausted"
            success = bool(done) and fatal_error is None and final_score >= self.config.success_threshold
        except Exception as exc:
            fatal_error = compact_text(f"{type(exc).__name__}: {exc}", default="runtime failure")
        finally:
            self._emit_end(success=success, step_count=step_count, rewards=rewards)

    def _create_env(self) -> PythonCodeReviewEnvironment:
        with suppress_output():
            return PythonCodeReviewEnvironment(verbose=False)

    def _reset_env(self, env: PythonCodeReviewEnvironment, task_name: str) -> Any:
        with suppress_output():
            return env.reset(task_id=task_name)

    def _step_env(
        self,
        env: PythonCodeReviewEnvironment,
        decision: AgentDecision,
    ) -> tuple[Any, float, bool, dict[str, Any]]:
        action = PythonCodeReviewAction(action_type=decision.action_type, code=decision.code)
        with suppress_output():
            observation, reward, done, info = env.step_result(action)
        return observation, float(reward), bool(done), dict(info or {})

    def _resolve_step_error(
        self,
        info: dict[str, Any],
        observation: Any,
        decision: AgentDecision,
    ) -> str | None:
        """Prefer the environment-reported action error, then fall back to the planner's error."""
        env_error = compact_text(
            info.get("last_action_error") or observation_attr(observation, "last_action_error", None),
            default="",
        )
        if env_error:
            return env_error
        if decision.error:
            return compact_text(decision.error, default="")
        return None

    def _emit_start(self, task_name: str) -> None:
        print(
            f"[START] task={task_name} env={self.config.benchmark_name} model={self.config.model_name}",
            flush=True,
        )

    def _emit_step(self, step_count: int, action: str, reward: float, done: bool, error: str | None) -> None:
        print(
            f"[STEP]  step={step_count} action={compact_text(action, default='analyze_code')} "
            f"reward={format_reward(reward)} done={format_bool(done)} error={format_error(error)}",
            flush=True,
        )

    def _emit_end(self, *, success: bool, step_count: int, rewards: list[str]) -> None:
        print(
            f"[END]   success={format_bool(success)} steps={step_count} rewards={','.join(rewards)}",
            flush=True,
        )


def main() -> int:
    """Entrypoint used by the root-level inference wrapper."""

    return InferenceRunner(InferenceConfig.from_env()).run()
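

# Assumed convenience guard: the root-level wrapper normally imports and calls
# main(), but this also lets the module be executed directly as a script.
if __name__ == "__main__":
    raise SystemExit(main())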