# deprec / environment.py
# Last commit 5510ae2 (exploring-solver): updated logic to use qwen with intended tasks
"""
Core DevOpsEnv environment logic.
Simulates a broken Linux server with:
- Task 1: Crashed Nginx service needing restart
- Task 2: Misconfigured Docker container
- Task 3: Memory leak in Python mock API
Manages episode lifecycle:
reset() → Observation
step(action) → StepResult
get_state() → State
grade() → (score, breakdown, feedback)
"""
from __future__ import annotations
import uuid
import json
import re
from typing import Any, Dict, Optional, Tuple, List
from data import TASK_META
from graders import grade_task
from models import (
Action,
Observation,
Reward,
State,
StepResult,
SystemState,
)
# In-memory store: episode_id → EpisodeState dict.
# Entries are created by reset(), mutated in place by step()/grade(), and are
# never removed anywhere in this module.
_EPISODES: Dict[str, Dict[str, Any]] = {}
# ---------------------------------------------------------------------------
# Mock filesystem and system state
# ---------------------------------------------------------------------------
def _create_initial_state_task1() -> Dict[str, Any]:
    """Build the starting system snapshot for task 1 (nginx is down).

    All containers are constructed inside the call, so each episode gets its
    own copy that can be mutated freely.

    Returns:
        A dict with the keys ``running_processes``, ``service_status``,
        ``http_ports_open``, ``docker_containers``, ``logs``, ``files``,
        ``cpu_usage`` and ``memory_usage_mb``.
    """
    nginx_conf = """
user nginx;
worker_processes auto;
error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;
events {
worker_connections 1024;
}
http {
include /etc/nginx/mime.types;
default_type application/octet-stream;
sendfile on;
keepalive_timeout 65;
server {
listen 80 default_server;
server_name _;
location / {
return 200 "OK\\n";
}
}
}"""
    nginx_unit = """
[Unit]
Description=The NGINX HTTP and reverse proxy server
After=network.target
[Service]
Type=forking
PIDFile=/var/run/nginx.pid
ExecStartPre=/usr/sbin/nginx -t
ExecStart=/usr/sbin/nginx
ExecReload=/bin/kill -s HUP $MAINPID
ExecStop=/bin/kill -s QUIT $MAINPID
PrivateTmp=true
[Install]
WantedBy=multi-user.target"""

    # Only the base daemons are listed: there is deliberately no nginx entry.
    processes = [
        {"pid": 100, "name": "systemd"},
        {"pid": 105, "name": "sshd"},
    ]
    services = {
        "nginx": "inactive",
        "docker": "active",
        "mockapi": "active",
    }
    files = {
        NGINX_CONFIG_PATH: nginx_conf,
        "/etc/systemd/system/nginx.service": nginx_unit,
    }
    return {
        "running_processes": processes,
        "service_status": services,
        "http_ports_open": [8080],  # port 80 is absent while nginx is down
        "docker_containers": [],
        "logs": "2026-03-29 01:30:00 nginx crashed\nCore dump detected.\n",
        "files": files,
        "cpu_usage": 45.2,
        "memory_usage_mb": 256,
    }
def _create_initial_state_task2() -> Dict[str, Any]:
    """Build the starting system snapshot for task 2 (bad Docker port mapping).

    The compose file maps host port 8000 to container port 3000 and the
    ``mockapi`` service is reported as inactive. All containers are built
    inside the call, so every episode receives an independent copy.

    Returns:
        A dict with the keys ``running_processes``, ``service_status``,
        ``http_ports_open``, ``docker_containers``, ``logs``, ``files``,
        ``cpu_usage`` and ``memory_usage_mb``.
    """
    compose_yaml = """
version: '3.8'
services:
mockapi:
image: mockapi:latest
ports:
- "8000:3000"
environment:
- PORT=3000
volumes:
- ./app.py:/app/app.py"""

    processes = [
        {"pid": 100, "name": "systemd"},
        {"pid": 105, "name": "sshd"},
        {"pid": 200, "name": "dockerd"},
    ]
    services = {
        "nginx": "active",
        "docker": "active",
        "mockapi": "inactive",
    }
    # The single container carries the 8000->3000 port mapping.
    containers = [
        {
            "id": "abc123",
            "name": "mockapi-svc",
            "status": "running",
            "ports": "8000->3000/tcp",
        },
    ]
    return {
        "running_processes": processes,
        "service_status": services,
        "http_ports_open": [80],
        "docker_containers": containers,
        "logs": "docker: port 3000 already in use\n",
        "files": {"/srv/docker-compose.yml": compose_yaml},
        "cpu_usage": 62.0,
        "memory_usage_mb": 1024,
    }
def _create_initial_state_task3() -> Dict[str, Any]:
    """Build the starting system snapshot for task 3 (leaking mock API).

    A ``python3`` process (pid 300) is listed with 2048 MB resident memory and
    the stored ``app.py`` appends to a module-level list on every request.
    All containers are built inside the call, so every episode receives an
    independent copy.

    Returns:
        A dict with the keys ``running_processes``, ``service_status``,
        ``http_ports_open``, ``docker_containers``, ``logs``, ``files``,
        ``cpu_usage`` and ``memory_usage_mb``.
    """
    app_source = """
import json
from flask import Flask
app = Flask(__name__)
# BUG: This list grows unbounded
request_cache = []
@app.route('/api/data', methods=['GET'])
def get_data():
data = {"timestamp": 123456, "value": 42}
request_cache.append(data) # MEMORY LEAK!
return json.dumps(data)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
"""

    # One log entry per simulated minute; each entry ends with a newline.
    log_entries = [
        "2026-03-29 01:45:00 mockapi started",
        "2026-03-29 01:46:00 memory usage: 512 MB",
        "2026-03-29 01:47:00 memory usage: 1024 MB",
        "2026-03-29 01:48:00 memory usage: 1536 MB (WARNING: HIGH)",
        "2026-03-29 01:49:00 memory usage: 2048 MB (CRITICAL)",
    ]
    processes = [
        {"pid": 100, "name": "systemd"},
        {"pid": 105, "name": "sshd"},
        # The leaking process.
        {"pid": 300, "name": "python3", "rss_mb": 2048, "user": "appuser"},
    ]
    services = {
        "nginx": "active",
        "docker": "active",
        "mockapi": "active",
    }
    return {
        "running_processes": processes,
        "service_status": services,
        "http_ports_open": [80, 5000],
        "docker_containers": [],
        "logs": "".join(f"{entry}\n" for entry in log_entries),
        "files": {"/opt/mockapi/app.py": app_source},
        "cpu_usage": 85.5,
        "memory_usage_mb": 2048,
    }
# Keys of the simulated files inside a system state's "files" mapping.
# NGINX_CONFIG_PATH is referenced by _create_initial_state_task1 above; that is
# safe because the name is only looked up when the function is called, after
# the module has finished loading.
NGINX_CONFIG_PATH = "/etc/nginx/nginx.conf"
# Compose file inspected by the task 2 command/edit simulation and reward logic.
DOCKER_COMPOSE_PATH = "/srv/docker-compose.yml"
# Mock API source inspected by the task 3 command/edit simulation and reward logic.
MOCK_API_PATH = "/opt/mockapi/app.py"
def _build_system_state(task_id: str, ep_state: Dict[str, Any]) -> SystemState:
    """Convert an episode's raw ``system_state`` dict into a ``SystemState``.

    Args:
        task_id: Identifier copied onto the resulting object.
        ep_state: Episode record; only ``ep_state["system_state"]`` is read.

    Returns:
        A ``SystemState`` whose ``filesystem_snapshot`` is the JSON encoding of
        the ``files`` mapping. Keys missing from the raw state fall back to an
        empty list / dict / string or zero.
    """
    raw = ep_state["system_state"]

    # (key, fallback) pairs copied verbatim from the raw state; the tuple is
    # rebuilt on every call so the mutable fallbacks are never shared.
    passthrough = (
        ("running_processes", []),
        ("service_status", {}),
        ("logs", ""),
        ("http_ports_open", []),
        ("docker_containers", []),
        ("cpu_usage", 0.0),
        ("memory_usage_mb", 0),
    )
    copied = {key: raw.get(key, fallback) for key, fallback in passthrough}

    files_json = json.dumps(dict(raw.get("files", {})))
    return SystemState(
        task_id=task_id,
        available_commands=["systemctl", "nginx", "docker", "curl", "ps", "cat", "vim"],
        filesystem_snapshot=files_json,
        **copied,
    )
# ---------------------------------------------------------------------------
# Dynamic execution simulation
# ---------------------------------------------------------------------------
def _simulate_bash_cmd(cmd: str, task_id: str, ep_state: Dict[str, Any]) -> str:
    """Simulate running a shell command against the episode's mock system.

    Commands are recognised by case-insensitive substring matching, and only
    the commands belonging to the episode's task are recognised. Recognised
    commands may mutate ``ep_state["system_state"]`` in place.

    Args:
        cmd: The raw command line issued by the agent.
        task_id: ``"task1"``, ``"task2"`` or ``"task3"``; any other value
            yields the generic fallback output.
        ep_state: Episode record holding the mutable ``system_state`` dict.

    Returns:
        The simulated command output. Unrecognised commands return
        ``"Command '<cmd>' executed (simulated)"``.
    """
    state_dict = ep_state["system_state"]
    lower_cmd = cmd.lower()

    # Task 1: Nginx commands
    if task_id == "task1":
        if "systemctl restart nginx" in lower_cmd or "systemctl start nginx" in lower_cmd:
            state_dict["service_status"]["nginx"] = "active"
            processes = state_dict["running_processes"]
            # Starting an already-running service must not add a second
            # nginx entry to the process table.
            if not any(proc.get("name") == "nginx" for proc in processes):
                processes.append({"pid": 999, "name": "nginx"})
            state_dict["http_ports_open"] = [80]
            return "Job for nginx.service started successfully."
        if "systemctl status nginx" in lower_cmd:
            if state_dict["service_status"]["nginx"] == "active":
                return "● nginx.service - NGINX HTTP Server\n Loaded: loaded (/etc/systemd/system/nginx.service)\n Active: active (running)"
            return "● nginx.service - NGINX HTTP Server\n Active: inactive (dead)"
        if "nginx -t" in lower_cmd:
            return "nginx: the configuration file /etc/nginx/nginx.conf syntax is ok\nnginx: configuration file /etc/nginx/nginx.conf test is successful"
        # Also covers "curl http://localhost:80", which contains this prefix.
        if "curl http://localhost" in lower_cmd:
            if 80 in state_dict["http_ports_open"]:
                return "OK"
            return "curl: (7) Failed to connect to localhost port 80: Connection refused"

    # Task 2: Docker commands
    elif task_id == "task2":
        if "docker-compose up -d" in lower_cmd:
            compose_content = state_dict["files"].get(DOCKER_COMPOSE_PATH)
            # Bring-up only succeeds once the port mapping has been corrected.
            if (
                compose_content is not None
                and "3000:3000" in compose_content
                and "8000:3000" not in compose_content
            ):
                state_dict["docker_containers"] = [
                    {"id": "xyz789", "name": "mockapi-svc", "status": "running", "ports": "3000:3000/tcp"}
                ]
                state_dict["service_status"]["mockapi"] = "active"
                return "Creating mockapi ... done"
            return "ERROR: docker-compose.yml not found or invalid"
        if "docker ps" in lower_cmd:
            containers = state_dict["docker_containers"]
            if containers:
                return "\n".join(f"{c['id']} {c['name']} {c['status']}" for c in containers)
            return "No containers running"

    # Task 3: Process/memory commands
    elif task_id == "task3":
        # Also covers "ps aux grep python", which contains this prefix.
        if "ps aux" in lower_cmd:
            rows = [
                f"appuser {proc['pid']} 85.5 {proc.get('rss_mb', 512)} python3 /opt/mockapi/app.py\n"
                for proc in state_dict["running_processes"]
                if proc.get("name") == "python3"
            ]
            return "".join(rows) if rows else "No python processes found"
        if "kill" in lower_cmd:
            if "300" in lower_cmd or "python" in lower_cmd:
                state_dict["running_processes"] = [
                    proc for proc in state_dict["running_processes"] if proc.get("name") != "python3"
                ]
                state_dict["service_status"]["mockapi"] = "inactive"
                state_dict["memory_usage_mb"] = 1100
                return "Process killed"
            return "Process not found"
        # Also covers the backgrounded form ending in " &".
        if "python3 /opt/mockapi/app.py" in lower_cmd:
            app_content = state_dict.get("files", {}).get(MOCK_API_PATH, "")
            leak_fixed = "request_cache.append" not in app_content
            rss_mb = 256 if leak_fixed else 1700
            # Replace any entry left by an earlier simulated start so that
            # repeated launches do not pile up duplicate pid-301 processes.
            processes = [
                proc for proc in state_dict["running_processes"] if proc.get("pid") != 301
            ]
            processes.append({"pid": 301, "name": "python3", "rss_mb": rss_mb, "user": "appuser"})
            state_dict["running_processes"] = processes
            state_dict["service_status"]["mockapi"] = "active"
            state_dict["http_ports_open"] = [80, 5000]
            state_dict["memory_usage_mb"] = 700 if leak_fixed else 1800
            return "Application started"

    return f"Command '{cmd}' executed (simulated)"
def _simulate_file_edit(file_path: str, new_content: str, ep_state: Dict[str, Any]) -> str:
    """Overwrite a file in the episode's mock filesystem.

    Only files that already exist in ``system_state["files"]`` can be edited;
    the new content is always stored for those, and only the wording of the
    returned message depends on the content.

    NOTE(review): the mock-API branch keys on the literal ``request_cache = []``
    while the restart simulation and the reward logic key on
    ``request_cache.append`` - confirm which marker is intended.

    Args:
        file_path: Key of the file inside the mock filesystem.
        new_content: Full replacement text.
        ep_state: Episode record holding the mutable ``system_state`` dict.

    Returns:
        ``"ERROR: File <path> not found"`` when the path is unknown, otherwise
        a ``"File <path> ..."`` confirmation.
    """
    files = ep_state["system_state"].get("files", {})
    if file_path not in files:
        return f"ERROR: File {file_path} not found"

    # Every accepted edit is persisted, whatever its content.
    files[file_path] = new_content

    if file_path == DOCKER_COMPOSE_PATH and "3000:3000" in new_content:
        outcome = "updated successfully"
    elif file_path == MOCK_API_PATH and "request_cache = []" not in new_content:
        outcome = "patched successfully"
    else:
        outcome = "edited"
    return f"File {file_path} {outcome}"
# ---------------------------------------------------------------------------
# Reward calculation
# ---------------------------------------------------------------------------
def _calculate_step_reward(task_id: str, action: Action, ep_state: Dict[str, Any]) -> Tuple[float, str]:
    """Score a single action and explain the score.

    Every step costs 0.02. Repeating the previous action verbatim costs a
    further 0.05, and task-specific bonuses are added on top. The function
    expects the entry for ``action`` (including the simulator's ``output`` /
    ``result``) to already be the last item of ``ep_state["action_history"]``.

    Args:
        task_id: ``"task1"``, ``"task2"`` or ``"task3"``.
        action: The action being scored; ``action_type``, ``command``,
            ``file_path`` and ``file_content`` are read.
        ep_state: Episode record; ``action_history`` and, for the task 3
            restart bonus, ``system_state`` are read.

    Returns:
        ``(reward, explanation)``. When the repeat penalty was applied, the
        explanation ends with ``" | repeated identical action penalty"``.
    """
    base_step_cost = -0.02
    reward = base_step_cost

    history = ep_state.get("action_history", [])
    # Tolerate an empty history so the function can be called on its own.
    last_entry = history[-1] if history else {}

    # An action only counts as a repeat when every user-controlled field
    # matches; comparing file_content keeps successive *different* edits of
    # the same file from being penalised.
    repeated = False
    if len(history) >= 2:
        prev, curr = history[-2], history[-1]
        repeated = all(
            prev.get(field) == curr.get(field)
            for field in ("action_type", "command", "file_path", "file_content")
        )
    if repeated:
        reward -= 0.05
    # Appended at the end so the branch-specific text cannot overwrite it.
    repeat_note = " | repeated identical action penalty" if repeated else ""

    if action.action_type == "bash_cmd":
        cmd = action.command or ""
        lower_cmd = cmd.lower()
        output = str(last_entry.get("output", ""))
        reward += 0.05
        explanation = f"Executed: {cmd[:50]}"
        if task_id == "task1" and "nginx -t" in lower_cmd:
            reward += 0.05
            explanation += " | validated nginx config"
        if task_id == "task1" and "curl" in lower_cmd and "OK" in output:
            reward += 0.08
            explanation += " | verified HTTP health"
        if task_id == "task2" and "docker-compose up -d" in lower_cmd:
            lowered_output = output.lower()
            if "done" in lowered_output or "creating" in lowered_output:
                reward += 0.1
                explanation += " | compose bring-up success"
        # Pay the bonus only when the simulator confirmed the kill, in line
        # with the output checks used for curl and docker-compose above.
        if task_id == "task3" and "kill" in lower_cmd and "Process killed" in output:
            reward += 0.07
            explanation += " | terminated leaky process"
        if task_id == "task3" and "python3 /opt/mockapi/app.py" in lower_cmd:
            mem = ep_state["system_state"].get("memory_usage_mb", 2048)
            if mem < 1024:
                reward += 0.12
                explanation += " | restarted with lower memory"
        return reward, explanation + repeat_note

    if action.action_type == "file_edit":
        reward += 0.03
        explanation = f"Edited: {action.file_path}"
        result = str(last_entry.get("result", ""))
        if "ERROR" in result:
            reward -= 0.12
            explanation += " | invalid edit target"
        elif task_id == "task2" and action.file_path == DOCKER_COMPOSE_PATH:
            content = action.file_content or ""
            if "3000:3000" in content and "8000:3000" not in content:
                reward += 0.12
                explanation += " | corrected port mapping"
        elif task_id == "task3" and action.file_path == MOCK_API_PATH:
            content = action.file_content or ""
            if "request_cache.append" not in content:
                reward += 0.12
                explanation += " | removed leak pattern"
        return reward, explanation + repeat_note

    if action.action_type == "submit":
        reward += 0.1
        return reward, "Episode submitted for grading" + repeat_note

    return reward, "Step taken" + repeat_note
# ---------------------------------------------------------------------------
# Core API functions
# ---------------------------------------------------------------------------
def reset(task_id: str) -> Observation:
    """Start a fresh episode for ``task_id`` and return its first observation.

    A new UUID-keyed record is stored in ``_EPISODES`` with zeroed counters,
    an empty action history and the task's initial system state.

    Args:
        task_id: Must be a key of ``TASK_META``.

    Returns:
        The step-0 ``Observation`` carrying the new ``episode_id``.

    Raises:
        ValueError: If ``task_id`` is not present in ``TASK_META``.
    """
    if task_id not in TASK_META:
        raise ValueError(f"Unknown task_id {task_id!r}. Valid: {list(TASK_META)}")
    task_meta = TASK_META[task_id]

    # Tasks known to TASK_META but without a dedicated builder start from an
    # empty system state.
    state_builders = {
        "task1": _create_initial_state_task1,
        "task2": _create_initial_state_task2,
        "task3": _create_initial_state_task3,
    }
    make_state = state_builders.get(task_id)
    starting_state = make_state() if make_state is not None else {}

    new_id = str(uuid.uuid4())
    episode = {
        "task_id": task_id,
        "step_number": 0,
        "max_steps": task_meta["max_steps"],
        "done": False,
        "total_reward": 0.0,
        "action_history": [],
        "final_score": None,
        "system_state": starting_state,
    }
    # Register the episode before building the observation.
    _EPISODES[new_id] = episode

    initial_view = _build_system_state(task_id, episode)
    return Observation(
        task_id=task_id,
        task_description=task_meta["description"],
        episode_id=new_id,
        system_state=initial_view,
        thread_history=[],
        available_actions=task_meta["available_actions"],
        step_number=0,
        max_steps=task_meta["max_steps"],
        hint="Start by diagnosing the system state with basic commands.",
    )
def step(episode_id: str, action: Action) -> StepResult:
    """Apply one agent action to an episode and report the outcome.

    The action is appended to the history, executed against the simulated
    system, and scored. The episode ends on a ``submit`` action or when the
    step budget is exhausted; on the final step the grader runs and half of
    its score is added to the step reward.

    Args:
        episode_id: Identifier returned by ``reset``.
        action: The action to apply.

    Returns:
        A ``StepResult`` with the new observation, the reward, the ``done``
        flag and an ``info`` dict (``step``, plus ``final_score`` when done).

    Raises:
        KeyError: If the episode does not exist.
        ValueError: If the episode has already finished.
    """
    episode = _EPISODES.get(episode_id)
    if episode is None:
        raise KeyError(f"Episode {episode_id} not found")
    if episode["done"]:
        raise ValueError(f"Episode {episode_id} is already done.")

    task_id = episode["task_id"]
    task_meta = TASK_META[task_id]

    # Record the action first: the simulator result is attached to this very
    # entry, and the reward calculation reads it back from the history.
    episode["step_number"] += 1
    entry = action.model_dump()
    episode["action_history"].append(entry)

    action_kind = action.action_type
    if action_kind == "bash_cmd":
        entry["output"] = _simulate_bash_cmd(action.command or "", task_id, episode)
    elif action_kind == "file_edit":
        entry["result"] = _simulate_file_edit(
            action.file_path or "", action.file_content or "", episode
        )

    # Finished on an explicit submit or once the step budget is used up.
    done = action_kind == "submit" or episode["step_number"] >= episode["max_steps"]

    step_reward, explanation = _calculate_step_reward(task_id, action, episode)

    final_score = None
    if done:
        final_score, _breakdown, _grader_feedback = grade_task(task_id, episode)
        episode["final_score"] = final_score
        bonus = final_score * 0.5
        step_reward += bonus
        explanation += f" | Grader score: {final_score:.3f} (+{bonus:.3f} bonus)"

    episode["total_reward"] = round(episode["total_reward"] + step_reward, 4)
    episode["done"] = done

    observation = Observation(
        task_id=task_id,
        task_description=task_meta["description"],
        episode_id=episode_id,
        system_state=_build_system_state(task_id, episode),
        thread_history=[
            {"role": "agent", "content": str(past)} for past in episode["action_history"]
        ],
        available_actions=[] if done else task_meta["available_actions"],
        step_number=episode["step_number"],
        max_steps=episode["max_steps"],
        hint=None if done else "Continue diagnosing and fixing the issue.",
    )
    reward = Reward(
        step_reward=round(step_reward, 4),
        total_reward=episode["total_reward"],
        explanation=explanation,
    )

    info = {"step": episode["step_number"]}
    if done:
        info["final_score"] = final_score
    return StepResult(observation=observation, reward=reward, done=done, info=info)
def get_state(episode_id: str) -> State:
    """Return a ``State`` summary of the episode's bookkeeping fields.

    Args:
        episode_id: Identifier returned by ``reset``.

    Raises:
        KeyError: If the episode does not exist.
    """
    episode = _EPISODES.get(episode_id)
    if episode is None:
        raise KeyError(f"Episode {episode_id} not found")

    # Fields copied across under the same name.
    same_name = ("task_id", "step_number", "max_steps", "done", "total_reward")
    copied = {name: episode[name] for name in same_name}
    return State(
        episode_id=episode_id,
        history=episode["action_history"],
        final_score=episode.get("final_score"),
        **copied,
    )
def grade(episode_id: str) -> Tuple[float, Dict[str, float], str]:
    """Run the grader on a finished episode and cache its score.

    Args:
        episode_id: Identifier returned by ``reset``.

    Returns:
        ``(score, breakdown, feedback)`` as produced by ``grade_task``; the
        score is also stored on the episode as ``final_score``.

    Raises:
        KeyError: If the episode does not exist.
        ValueError: If the episode has not finished yet.
    """
    episode = _EPISODES.get(episode_id)
    if episode is None:
        raise KeyError(f"Episode {episode_id} not found")
    if not episode.get("done"):
        raise ValueError(f"Episode {episode_id} is not done yet")

    final_score, per_item, notes = grade_task(episode["task_id"], episode)
    episode["final_score"] = final_score
    return final_score, per_item, notes