| """ |
| AgentDebuggerEnv β Sandboxed Code Execution (Gold Standard) |
| ============================================================ |
| Isolated execution environment for user-submitted code. |
| Implements multi-layered security: |
| 1. AST-based static analysis (blocks dangerous builtins & dunders) |
| 3. Subprocess isolation with strict timeouts |
| 4. Resource limits (memory/CPU) |
| """ |
|
|
| import subprocess |
| import tempfile |
| import os |
| import time |
| import ast |
| from typing import Tuple |
|
|
| BLOCKED_IMPORTS = [ |
| "os", "sys", "subprocess", "socket", "importlib", "shutil", |
| "pathlib", "glob", "pickle", "shelve", "dbm", "sqlite3", |
| "ftplib", "http", "urllib", "requests", "httpx", "asyncio", |
| "multiprocessing", "threading", |
| "ctypes", "cffi", "resource", "signal", "mmap", "gc" |
| ] |
|
|
| DANGEROUS_BUILTINS = [ |
| "eval", "exec", "compile", "getattr", "setattr", "delattr", |
| "input", "breakpoint", "help", "open" |
| ] |
|
|
| EXECUTION_TIMEOUT_SECONDS = 10 |
| MEMORY_LIMIT_MB = 256 |
|
|
|
|
| def _build_security_prelude(blocked_imports: list[str]) -> str: |
| """Build a Python script snippet that hardens the environment before user code runs.""" |
| blocked_repr = repr(blocked_imports) |
| builtins_repr = repr(DANGEROUS_BUILTINS) |
| |
| return f''' |
| import ast as _ast |
| import sys as _sys |
| import builtins as _builtins |
| |
| # ββ 1. Resource Limits ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| try: |
| import resource as _resource |
| # Limit memory usage (Address Space) to 256MB |
| _mem_limit = {MEMORY_LIMIT_MB} * 1024 * 1024 |
| _resource.setrlimit(_resource.RLIMIT_AS, (_mem_limit, _mem_limit)) |
| except Exception: |
| pass |
| |
| # ββ 2. AST Static Analysis βββββββββββββββββββββββββββββββββββββββββββββββββββ |
| _BLOCKED_IMPORTS = {blocked_repr} |
| _DANGEROUS_BUILTINS = {builtins_repr} |
| |
| # We use _builtins.open because it might be nullified later in the user's scope |
| try: |
| _source_to_check = _builtins.open(__file__).read() |
| # Find the marker line and only check code after it |
| _marker = "# --- USER CODE START ---" |
| _marker_pos = _source_to_check.find(_marker) |
| if _marker_pos != -1: |
| _source_to_check = _source_to_check[_marker_pos + len(_marker):] |
| |
| _tree = _ast.parse(_source_to_check) |
| for _node in _ast.walk(_tree): |
| # Block dangerous imports |
| if isinstance(_node, (_ast.Import, _ast.ImportFrom)): |
| _names = [] |
| if isinstance(_node, _ast.Import): |
| _names = [a.name.split('.')[0] for a in _node.names] |
| else: |
| if _node.module: |
| _names = [_node.module.split('.')[0]] |
| |
| for _name in _names: |
| if _name in _BLOCKED_IMPORTS: |
| print(f"BLOCKED IMPORT: '{{_name}}' is not allowed in the sandbox.") |
| _sys.exit(1) |
| |
| # Block dangerous builtins (static names) |
| if isinstance(_node, _ast.Name) and _node.id in _DANGEROUS_BUILTINS: |
| print(f"SECURITY ERROR: Use of '{{_node.id}}' is prohibited.") |
| _sys.exit(1) |
| |
| # Block Dunder attribute access and leading underscores (reflection) |
| if isinstance(_node, _ast.Attribute): |
| if _node.attr.startswith('_'): |
| print(f"SECURITY ERROR: Access to internal attribute '{{_node.attr}}' is prohibited.") |
| _sys.exit(1) |
| except SyntaxError: |
| pass # Let the actual execution catch syntax errors |
| except Exception as e: |
| # Any other error during check is a sandbox failure |
| # print(f"SANDBOX INTERNALS ERROR: {{str(e)}}") |
| pass |
| |
| # ββ 3. Runtime Protection ββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| # Block __import__ to catch dynamic imports at runtime |
| _orig_import = _builtins.__import__ |
| def _restricted_import(name, *args, _orig_import=_orig_import, _blocked=_BLOCKED_IMPORTS, **kwargs): |
| _top = name.split(".")[0] |
| if _top in _blocked: |
| raise ImportError(f"BLOCKED IMPORT: '{{name}}' is not allowed in the sandbox.") |
| return _orig_import(name, *args, **kwargs) |
| _builtins.__import__ = _restricted_import |
| |
| # Nullify dangerous builtins |
| for _b in _DANGEROUS_BUILTINS: |
| if _b not in ('setattr', 'getattr', 'delattr'): |
| _builtins.__dict__[_b] = None |
| |
| # Clean up namespace gracefully |
| for _v in ["_ast", "_sys", "_builtins", "_source_to_check", "_tree", "_node", "_marker", "_marker_pos", "_b", "_orig_import", "_restricted_import"]: |
| if _v in locals(): |
| del locals()[_v] |
| ''' |
|
|
|
|
| def execute_code(code: str, test_code: str, allow_threading: bool = False) -> Tuple[str, bool, int]: |
| """ |
| Execute code + test_code in a sandboxed subprocess. |
| |
| Returns: |
| (output: str, timed_out: bool, execution_time_ms: int) |
| """ |
| |
| blocked = [b for b in BLOCKED_IMPORTS if not (b == "threading" and allow_threading)] |
|
|
| |
| prelude = _build_security_prelude(blocked) |
| full_script = prelude + "\n# --- USER CODE START ---\n" + code + "\n" + test_code |
|
|
| tmp_path = None |
| try: |
| |
| with tempfile.NamedTemporaryFile( |
| mode='w', suffix='.py', prefix='sandbox_', |
| delete=False, dir=tempfile.gettempdir() |
| ) as tmp: |
| tmp.write(full_script) |
| tmp_path = tmp.name |
|
|
| |
| start_time = time.time() |
| try: |
| result = subprocess.run( |
| ["python3", tmp_path], |
| capture_output=True, |
| text=True, |
| timeout=EXECUTION_TIMEOUT_SECONDS, |
| env={ |
| "PATH": os.environ.get("PATH", "/usr/bin:/usr/local/bin"), |
| "HOME": os.environ.get("HOME", "/tmp"), |
| "PYTHONDONTWRITEBYTECODE": "1", |
| } |
| ) |
| elapsed_ms = int((time.time() - start_time) * 1000) |
| output = result.stdout + result.stderr |
| return (output.strip(), False, elapsed_ms) |
|
|
| except subprocess.TimeoutExpired: |
| elapsed_ms = int((time.time() - start_time) * 1000) |
| return ( |
| f"TIMEOUT: Code execution exceeded {EXECUTION_TIMEOUT_SECONDS} second limit.", |
| True, |
| elapsed_ms |
| ) |
|
|
| except Exception as e: |
| return (f"SANDBOX ERROR: {str(e)}", False, 0) |
|
|
| finally: |
| |
| if tmp_path and os.path.exists(tmp_path): |
| try: |
| os.unlink(tmp_path) |
| except OSError: |
| pass |
|
|