import argparse import json import os from typing import Any, Dict, List, Optional import requests from openai import OpenAI # LLM config (required by hackathon instructions) API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1") MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct") HF_TOKEN = os.getenv("HF_TOKEN", "") # Environment endpoint for OpenEnv server OPENENV_BASE_URL = os.getenv("OPENENV_BASE_URL", "http://localhost:7860").rstrip("/") BENCHMARK = "devopsenv" MAX_STEPS_CAP = 20 def _to_bool_str(value: bool) -> str: return str(bool(value)).lower() def _safe_action_text(action: Dict[str, Any]) -> str: text = json.dumps(action, separators=(",", ":"), ensure_ascii=True) return text.replace("\n", " ").replace("\r", " ") def log_start(task: str, env: str, model: str) -> None: print(f"[START] task={task} env={env} model={model}", flush=True) def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None: error_val = error if error else "null" print( f"[STEP] step={step} action={action} reward={reward:.2f} done={_to_bool_str(done)} error={error_val}", flush=True, ) def log_end(success: bool, steps: int, rewards: List[float]) -> None: rewards_str = ",".join(f"{value:.2f}" for value in rewards) print(f"[END] success={_to_bool_str(success)} steps={steps} rewards={rewards_str}", flush=True) def _llm_side_call(client: OpenAI, task_id: str, step_num: int, state_payload: Dict[str, Any]) -> None: """Lightweight LLM call for baseline parity using OpenAI client without changing deterministic actions.""" if not HF_TOKEN: return brief_state = { "task_id": task_id, "step_number": state_payload.get("step_number"), "done": state_payload.get("done"), "service_status": state_payload.get("system_state", {}).get("service_status", {}), } try: client.chat.completions.create( model=MODEL_NAME, messages=[ {"role": "system", "content": "You are a concise DevOps assistant."}, { "role": "user", "content": ( f"Task={task_id}, step={step_num}. " f"State={json.dumps(brief_state, ensure_ascii=True)}. " "Reply with one short sentence describing next best move." ), }, ], temperature=0.0, max_tokens=24, stream=False, ) except Exception: # Do not fail the submission run if the optional LLM call fails. return def _task_plan(task_id: str) -> List[Dict[str, Any]]: if task_id == "task1": return [ {"action_type": "bash_cmd", "command": "systemctl status nginx"}, {"action_type": "bash_cmd", "command": "nginx -t"}, {"action_type": "bash_cmd", "command": "systemctl restart nginx"}, {"action_type": "bash_cmd", "command": "curl http://localhost"}, {"action_type": "submit", "summary": "Nginx restored and verified"}, ] if task_id == "task2": return [ {"action_type": "bash_cmd", "command": "cat /srv/docker-compose.yml"}, { "action_type": "file_edit", "file_path": "/srv/docker-compose.yml", "file_content": ( "version: '3.8'\n" "services:\n" " mockapi:\n" " image: mockapi:latest\n" " ports:\n" " - \"3000:3000\"\n" " environment:\n" " - PORT=3000\n" " volumes:\n" " - ./app.py:/app/app.py" ), }, {"action_type": "bash_cmd", "command": "docker-compose up -d"}, {"action_type": "bash_cmd", "command": "docker ps"}, {"action_type": "submit", "summary": "Docker compose mapping fixed"}, ] return [ {"action_type": "bash_cmd", "command": "ps aux | grep python"}, {"action_type": "bash_cmd", "command": "kill 300"}, { "action_type": "file_edit", "file_path": "/opt/mockapi/app.py", "file_content": ( "import json\n" "from flask import Flask\n\n" "app = Flask(__name__)\n\n" "@app.route('/api/data', methods=['GET'])\n" "def get_data():\n" " data = {'timestamp': 123456, 'value': 42}\n" " return json.dumps(data)\n\n" "if __name__ == '__main__':\n" " app.run(host='0.0.0.0', port=5000)\n" ), }, {"action_type": "bash_cmd", "command": "python3 /opt/mockapi/app.py &"}, {"action_type": "submit", "summary": "Memory leak patched and service restarted"}, ] def run_task(client: OpenAI, task_id: str) -> float: rewards: List[float] = [] steps_taken = 0 success = False episode_id = "" log_start(task=task_id, env=BENCHMARK, model=MODEL_NAME) try: reset_response = requests.post( f"{OPENENV_BASE_URL}/reset", json={"task_id": task_id}, timeout=20 ) reset_response.raise_for_status() reset_payload = reset_response.json() episode_id = reset_payload["episode_id"] for step_num, action in enumerate(_task_plan(task_id), start=1): if step_num > MAX_STEPS_CAP: break state_response = requests.get( f"{OPENENV_BASE_URL}/state", params={"episode_id": episode_id}, timeout=20 ) state_response.raise_for_status() state_payload = state_response.json() _llm_side_call(client, task_id, step_num, state_payload) step_error = None try: step_response = requests.post( f"{OPENENV_BASE_URL}/step", json={"episode_id": episode_id, "action": action}, timeout=30, ) step_response.raise_for_status() step_payload = step_response.json() except Exception as exc: step_error = str(exc).replace("\n", " ").replace("\r", " ") log_step( step=step_num, action=_safe_action_text(action), reward=0.0, done=True, error=step_error, ) steps_taken = step_num break reward = float(step_payload.get("reward", {}).get("step_reward", 0.0)) done = bool(step_payload.get("done", False)) rewards.append(reward) steps_taken = step_num log_step( step=step_num, action=_safe_action_text(action), reward=reward, done=done, error=step_error, ) if done: break if episode_id: grade_response = requests.post( f"{OPENENV_BASE_URL}/grader", json={"episode_id": episode_id}, timeout=20 ) grade_response.raise_for_status() score = float(grade_response.json().get("score", 0.0)) success = score > 0.0 return score return 0.0 except Exception: success = False return 0.0 finally: log_end(success=success, steps=steps_taken, rewards=rewards) def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("--task", choices=["task1", "task2", "task3", "all"], default="all") args = parser.parse_args() client = OpenAI(api_key=HF_TOKEN, base_url=API_BASE_URL) tasks = [args.task] if args.task != "all" else ["task1", "task2", "task3"] for task_id in tasks: run_task(client, task_id) if __name__ == "__main__": main()