Spaces:
Sleeping
Sleeping
| """ | |
| inference.py β CodeReviewEnv baseline inference script. | |
| Mandatory env vars: | |
| API_BASE_URL The API endpoint for the LLM. | |
| MODEL_NAME The model identifier to use for inference. | |
| HF_TOKEN Your Hugging Face / API key. | |
| STDOUT format (strictly followed): | |
| [START] task=<task_name> env=<benchmark> model=<model_name> | |
| [STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null> | |
| [END] success=<true|false> steps=<n> score=<score> rewards=<r1,r2,...,rn> | |
| """ | |
| import json | |
| import os | |
| import sys | |
| import textwrap | |
| from typing import Any, Dict, List, Optional | |
| from openai import OpenAI | |
| sys.path.insert(0, os.path.dirname(__file__)) | |
| from env import CodeReviewEnv, TASK_IDS | |
| from models import ReviewAction | |
# ── Env vars ──────────────────────────────────────────────────────────────────
# API credential: prefer HF_TOKEN, fall back to a generic API_KEY.
API_KEY = os.getenv("HF_TOKEN") or os.getenv("API_KEY")
# OpenAI-compatible endpoint; defaults to the Hugging Face inference router.
API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
# Model identifier passed to chat.completions.create.
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
# Benchmark label emitted in the [START] log line.
BENCHMARK = "code-review-env"
# An episode counts as a success when its final score reaches this threshold.
SUCCESS_SCORE_THRESHOLD = 0.5
# ── Logging helpers ───────────────────────────────────────────────────────────
def log_start(task: str, env: str, model: str) -> None:
    """Emit the opening [START] line of the strict stdout protocol."""
    fields = f"task={task} env={env} model={model}"
    print(f"[START] {fields}", flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Emit one [STEP] line; the action text is flattened and capped at 120 chars."""
    shown_error = "null" if not error else error
    flat_action = action.replace("\n", " ").replace("\r", "")[:120]
    line = (
        f"[STEP] step={step} action={flat_action} "
        f"reward={reward:.2f} done={str(done).lower()} error={shown_error}"
    )
    print(line, flush=True)
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Emit the closing [END] line with the comma-joined per-step reward trace."""
    trace = ",".join(format(r, ".2f") for r in rewards)
    flag = "true" if success else "false"
    print(f"[END] success={flag} steps={steps} score={score:.3f} rewards={trace}", flush=True)
# ── Prompts ───────────────────────────────────────────────────────────────────
# System prompt resent on every chat call: instructs the model to emit exactly
# one JSON action per response following the schema below. Parsed downstream by
# extract_json, so the "single valid JSON object" requirement is load-bearing.
SYSTEM_PROMPT = textwrap.dedent("""
    You are an expert software engineer performing a thorough code review.
    Your job is to:
    1. Identify ALL bugs, security vulnerabilities, performance issues, and logic errors.
    2. For each issue, output a JSON action with action_type="review".
    3. After identifying all issues, output a patch with action_type="patch".
    4. Finally, output action_type="submit" with your verdict.
    Each response must be a single valid JSON object. No markdown, no explanation outside JSON.
    Schema:
    {
      "action_type": "review" | "patch" | "comment" | "submit",
      "severity": "critical" | "major" | "minor" | "info",
      "issue_type": "bug" | "security" | "performance" | "logic" | "style",
      "line_number": <int or null>,
      "description": "<description of the issue>",
      "patched_code": "<full corrected code>",
      "comment": "<optional>",
      "verdict": "approve" | "request_changes" | "reject",
      "confidence": <0.0-1.0>
    }
    Output ONE JSON object per response. Be precise and thorough.
""").strip()
def build_user_prompt(obs: Dict[str, Any]) -> str:
    """Render an observation dict into the user prompt for the review model.

    Args:
        obs: Observation with keys ``review_context`` (PR metadata plus
            ``files_changed``), ``step``, ``max_steps``, and optionally
            ``issues_found_so_far``.

    Returns:
        The prompt string: PR header, changed files, previously reported
        issues (to avoid repeats), and a step-dependent instruction line.
    """
    ctx = obs["review_context"]
    files_text = "\n\n".join(
        f"=== {f['filename']} ({f['language']}) ===\n{f['content']}"
        for f in ctx["files_changed"]
    )
    issues_so_far = obs.get("issues_found_so_far", [])
    # Build the header from explicit lines instead of textwrap.dedent over an
    # interpolated f-string: dedent runs AFTER interpolation, so any column-0
    # line inside files_text defeats the dedent and leaves stray template
    # indentation in the prompt.
    prompt = "\n".join([
        f"Pull Request: {ctx['pull_request_title']}",
        f"Author: {ctx['author']}",
        f"Description: {ctx['description']}",
        f"Linter: {ctx.get('linter_output', 'N/A')}",
        f"Tests: {ctx.get('test_results', 'N/A')}",
        "--- CODE ---",
        files_text,
        "--- END CODE ---",
        f"Step: {obs['step']} / {obs['max_steps']}",
        f"Issues reported so far: {len(issues_so_far)}",
    ])
    if issues_so_far:
        prompt += "\n\nIssues already reported (do NOT repeat these):"
        for iss in issues_so_far:
            prompt += f"\n - [{iss.get('severity','?')}] line {iss.get('line','?')}: {iss.get('description','')}"
    steps_left = obs['max_steps'] - obs['step']
    # Step-dependent instruction: urgency near the limit, kickoff at step 0.
    if steps_left <= 2:
        prompt += "\n\nYou are almost out of steps. Submit your patch and verdict NOW."
    elif obs['step'] == 0:
        prompt += "\n\nBegin your review. Output your first action as JSON."
    else:
        prompt += "\n\nContinue reviewing or submit if done. Output next action as JSON."
    return prompt
# ── JSON extraction ───────────────────────────────────────────────────────────
def extract_json(text: str) -> Dict[str, Any]:
    """Extract the first JSON object from a model response.

    Handles three cases, in order: a fenced ``` block (closing fence optional),
    a response that is pure JSON, and JSON embedded in surrounding prose.

    Args:
        text: Raw model output.

    Returns:
        The parsed JSON object.

    Raises:
        ValueError: If no JSON object is found or the braces never balance.
        json.JSONDecodeError: If the balanced span is not valid JSON.
    """
    text = text.strip()
    if text.startswith("```"):
        lines = text.split("\n")
        if len(lines) > 1:
            # Drop the opening fence; drop the closing fence only when it is
            # actually there (previously a missing closer ate a content line).
            if lines[-1].strip().startswith("```"):
                text = "\n".join(lines[1:-1])
            else:
                text = "\n".join(lines[1:])
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    start = text.find("{")
    if start == -1:
        raise ValueError("No JSON object found in response")
    # Balanced-brace scan that is aware of JSON string literals, so braces
    # inside values like {"a": "b}c"} do not corrupt the depth counter.
    depth = 0
    in_string = False
    escaped = False
    for i in range(start, len(text)):
        ch = text[i]
        if escaped:
            escaped = False
            continue
        if in_string:
            if ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch == "{":
            depth += 1
        elif ch == "}":
            depth -= 1
            if depth == 0:
                return json.loads(text[start:i + 1])
    raise ValueError("Unbalanced JSON in response")
# ── Episode runner ────────────────────────────────────────────────────────────
def run_episode(client: OpenAI, task_id: str) -> Dict[str, Any]:
    """Run one review episode against CodeReviewEnv, logging the strict protocol.

    Args:
        client: OpenAI-compatible chat client used for model calls.
        task_id: Task identifier passed to ``env.reset``.

    Returns:
        Summary dict with keys ``task_id``, ``score``, ``steps``, ``success``.
    """
    env = CodeReviewEnv()
    obs_obj = env.reset(task_id)
    obs = obs_obj.model_dump()
    log_start(task=task_id, env=BENCHMARK, model=MODEL_NAME)
    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False
    # Full chat transcript (user/assistant turns) re-sent on every model call.
    history: List[Dict[str, str]] = []
    patch_submitted = False
    error_msg: Optional[str] = None
    try:
        for step in range(1, obs_obj.max_steps + 1):
            if obs.get("done"):
                break
            error_msg = None
            steps_left = obs["max_steps"] - obs["step"]
            # Force patch then submit near step limit
            # NOTE(review): if steps_left reaches 0 with no patch submitted, the
            # first branch still forces a patch, so the forced "submit" below is
            # only reachable after a patch was sent — confirm this is intended.
            if steps_left <= 1 and not patch_submitted:
                action_dict = {
                    "action_type": "patch",
                    # Fallback patch: first changed file, content unchanged.
                    "patched_code": obs["review_context"]["files_changed"][0]["content"],
                }
            elif steps_left <= 0:
                action_dict = {
                    "action_type": "submit",
                    "verdict": "request_changes",
                    "confidence": 0.5,
                }
            else:
                user_msg = build_user_prompt(obs)
                history.append({"role": "user", "content": user_msg})
                try:
                    completion = client.chat.completions.create(
                        model=MODEL_NAME,
                        messages=[{"role": "system", "content": SYSTEM_PROMPT}] + history,
                        max_tokens=1024,
                        temperature=0.2,
                        stream=False,
                    )
                    raw = (completion.choices[0].message.content or "").strip()
                    history.append({"role": "assistant", "content": raw})
                    action_dict = extract_json(raw)
                except Exception as exc:
                    # API/parse failure: degrade to a conservative submit so the
                    # episode still terminates cleanly.
                    error_msg = str(exc)[:80]
                    action_dict = {
                        "action_type": "submit",
                        "verdict": "request_changes",
                        "confidence": 0.3,
                    }
            if action_dict.get("action_type") == "patch":
                patch_submitted = True
            # Validate action
            try:
                action = ReviewAction(**action_dict)
            except Exception as exc:
                # Schema-invalid action from the model: fall back to a submit.
                error_msg = str(exc)[:80]
                action = ReviewAction(
                    action_type="submit",
                    verdict="request_changes",
                    confidence=0.3,
                )
            # Step environment
            obs_obj, reward_obj, done, info = env.step(action)
            obs = obs_obj.model_dump()
            reward = reward_obj.value
            rewards.append(reward)
            steps_taken = step
            action_summary = f"{action.action_type}:{(action.description or action.verdict or '')[:60]}"
            log_step(step=step, action=action_summary, reward=reward, done=done, error=error_msg)
            if done:
                # Only the terminal step carries the episode's final score;
                # if the loop exhausts max_steps without done, score stays 0.0.
                score = info.get("final_score", 0.0)
                break
        success = score >= SUCCESS_SCORE_THRESHOLD
    finally:
        # [END] is always emitted, even if the loop body raised.
        log_end(success=success, steps=steps_taken, score=score, rewards=rewards)
    return {"task_id": task_id, "score": score, "steps": steps_taken, "success": success}
# ── Main ──────────────────────────────────────────────────────────────────────
def main() -> None:
    """Entry point: validate credentials, run every selected task, summarize."""
    if not API_KEY:
        print("[ERROR] HF_TOKEN environment variable not set.", flush=True)
        sys.exit(1)
    client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)
    # TASK_IDS env var (comma-separated) can narrow the default task list.
    raw_ids = os.getenv("TASK_IDS", ",".join(TASK_IDS)).split(",")
    selected = [tid.strip() for tid in raw_ids if tid.strip()]
    all_results = [run_episode(client, tid) for tid in selected]
    # The per-run summary goes to stderr so stdout keeps the strict log format.
    print("\n[SUMMARY]", file=sys.stderr)
    for r in all_results:
        print(f" {r['task_id']}: score={r['score']:.3f} steps={r['steps']} success={r['success']}", file=sys.stderr)
    if all_results:
        avg = sum(r["score"] for r in all_results) / len(all_results)
        print(f" aggregate: {avg:.3f}", file=sys.stderr)


if __name__ == "__main__":
    main()