Buckets:
bbkdevops/unicosys-hypergraph-bucket / tinymind-native-colab-handoff /bundle /model /sandbox_tool_core.py
| """Policy-gated Sandbox Tool Core for TinyMind agents. | |
| The core exposes a small, auditable tool registry over SandboxRLRuntime. It is | |
| designed for training/eval evidence: every call has a stable request id, input | |
| hash, output hash, policy snapshot, and pass/fail status. | |
| """ | |
| from __future__ import annotations | |
| from dataclasses import asdict, dataclass | |
| import hashlib | |
| import json | |
| from pathlib import Path | |
| import time | |
| from typing import Any, Callable | |
| from model.sandbox_rl import ALLOWED_CMD, SandboxRLRuntime | |
| from model.sandbox_proxy import LocalNetworkProxy, SandboxProxyPolicy, snapshot_workspace | |
| from model.sandbox_environment import SandboxEnvironmentManager, SandboxEnvironmentPolicy | |
| from train.adapter_compactor import choose_lora_rank_plan, compact_lora_adapter | |
| def _canonical_json(payload: Any) -> str: | |
| return json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":")) | |
| def _sha256_text(text: str) -> str: | |
| return hashlib.sha256(text.encode("utf-8")).hexdigest() | |
| class SandboxToolPolicy: | |
| allow_cmd: bool = True | |
| allow_write: bool = True | |
| allow_project_create: bool = True | |
| allow_lua: bool = True | |
| allow_read: bool = True | |
| max_write_bytes: int = 1_000_000 | |
| max_files_per_project: int = 128 | |
| cmd_timeout_s: float = 5.0 | |
| command_allowlist: tuple[str, ...] = tuple(sorted(ALLOWED_CMD)) | |
| allow_network_proxy: bool = True | |
| allow_public_internet: bool = False | |
| allow_model_weight_ops: bool = True | |
| network_timeout_s: float = 3.0 | |
| max_proxy_response_bytes: int = 256_000 | |
| class SandboxToolSpec: | |
| name: str | |
| description: str | |
| required_args: tuple[str, ...] | |
| mutates_filesystem: bool = False | |
| executes_process: bool = False | |
| def to_dict(self) -> dict: | |
| return asdict(self) | |
| class SandboxToolCore: | |
| """Single entry point for safe TinyMind tool calls.""" | |
| def __init__( | |
| self, | |
| root: str | Path, | |
| policy: SandboxToolPolicy | None = None, | |
| ledger_name: str = "sandbox_tool_core_ledger.jsonl", | |
| ): | |
| self.root = Path(root).resolve() | |
| self.root.mkdir(parents=True, exist_ok=True) | |
| self.policy = policy or SandboxToolPolicy() | |
| self.runtime = SandboxRLRuntime(self.root) | |
| self.ledger_path = self.root / ledger_name | |
| self.proxy = LocalNetworkProxy( | |
| self.root, | |
| policy=SandboxProxyPolicy( | |
| allow_public_internet=self.policy.allow_public_internet, | |
| timeout_s=self.policy.network_timeout_s, | |
| max_response_bytes=self.policy.max_proxy_response_bytes, | |
| ), | |
| ) | |
| self.env_manager = SandboxEnvironmentManager( | |
| self.root / "isolated_envs", | |
| policy=SandboxEnvironmentPolicy( | |
| command_allowlist=self.policy.command_allowlist, | |
| timeout_s=self.policy.cmd_timeout_s, | |
| max_runtime_duration_s=self.policy.cmd_timeout_s, | |
| ), | |
| ) | |
| self._handlers: dict[str, Callable[[dict], dict]] = { | |
| "lua.eval": self._lua_eval, | |
| "sandbox.run_code": self._run_code, | |
| "sandbox.proxy": self._proxy_request, | |
| "sandbox.env.create": self._env_create, | |
| "sandbox.env.run": self._env_run, | |
| "sandbox.env.run_detached": self._env_run_detached, | |
| "sandbox.env.job_status": self._env_job_status, | |
| "sandbox.env.job_stream": self._env_job_stream, | |
| "sandbox.env.copy_in": self._env_copy_in, | |
| "sandbox.env.copy_out": self._env_copy_out, | |
| "sandbox.env.file_put": self._env_file_put, | |
| "sandbox.env.file_get": self._env_file_get, | |
| "sandbox.env.snapshot": self._env_snapshot, | |
| "sandbox.env.restore": self._env_restore, | |
| "sandbox.env.fork": self._env_fork, | |
| "sandbox.env.stop": self._env_stop, | |
| "sandbox.env.dashboard": self._env_dashboard, | |
| "sandbox.env.resources": self._env_resources, | |
| "fs.write": self._fs_write, | |
| "fs.read": self._fs_read, | |
| "project.create": self._project_create, | |
| "cmd.run": self._cmd_run, | |
| "core.manifest": self._manifest, | |
| } | |
| self._specs = { | |
| "lua.eval": SandboxToolSpec("lua.eval", "Evaluate the safe Lua arithmetic subset.", ("code",)), | |
| "sandbox.run_code": SandboxToolSpec( | |
| "sandbox.run_code", | |
| "Run safe Lua with sandbox_http_get/post helpers routed through the local proxy.", | |
| ("code",), | |
| ), | |
| "sandbox.proxy": SandboxToolSpec( | |
| "sandbox.proxy", | |
| "Issue a policy-gated HTTP request to localhost/private LAN through the sandbox proxy.", | |
| ("method", "url"), | |
| ), | |
| "sandbox.env.create": SandboxToolSpec( | |
| "sandbox.env.create", | |
| "Create an isolated filesystem sandbox environment.", | |
| ("name",), | |
| True, | |
| ), | |
| "sandbox.env.run": SandboxToolSpec( | |
| "sandbox.env.run", | |
| "Run an allowlisted command inside an isolated sandbox environment.", | |
| ("name", "argv"), | |
| True, | |
| True, | |
| ), | |
| "sandbox.env.run_detached": SandboxToolSpec( | |
| "sandbox.env.run_detached", | |
| "Start an allowlisted command and stream stdout/stderr logs later.", | |
| ("name", "argv"), | |
| True, | |
| True, | |
| ), | |
| "sandbox.env.job_status": SandboxToolSpec( | |
| "sandbox.env.job_status", | |
| "Return status for a detached sandbox command.", | |
| ("job_id",), | |
| ), | |
| "sandbox.env.job_stream": SandboxToolSpec( | |
| "sandbox.env.job_stream", | |
| "Read a chunk from a detached command stdout/stderr stream.", | |
| ("job_id",), | |
| ), | |
| "sandbox.env.copy_in": SandboxToolSpec( | |
| "sandbox.env.copy_in", | |
| "Copy a file or directory from host into an isolated sandbox environment.", | |
| ("name", "src", "dest"), | |
| True, | |
| ), | |
| "sandbox.env.copy_out": SandboxToolSpec( | |
| "sandbox.env.copy_out", | |
| "Copy a file or directory from an isolated sandbox environment to host.", | |
| ("name", "src", "dest"), | |
| ), | |
| "sandbox.env.file_put": SandboxToolSpec( | |
| "sandbox.env.file_put", | |
| "Upload text/base64 content directly into a sandbox file.", | |
| ("name", "path", "content"), | |
| True, | |
| ), | |
| "sandbox.env.file_get": SandboxToolSpec( | |
| "sandbox.env.file_get", | |
| "Download text/base64 content directly from a sandbox file.", | |
| ("name", "path"), | |
| ), | |
| "sandbox.env.snapshot": SandboxToolSpec( | |
| "sandbox.env.snapshot", | |
| "Save an isolated sandbox environment snapshot for reuse.", | |
| ("name", "snapshot"), | |
| True, | |
| ), | |
| "sandbox.env.restore": SandboxToolSpec( | |
| "sandbox.env.restore", | |
| "Restore an isolated sandbox environment from a saved snapshot.", | |
| ("name", "snapshot"), | |
| True, | |
| ), | |
| "sandbox.env.fork": SandboxToolSpec( | |
| "sandbox.env.fork", | |
| "Create a child sandbox from the latest snapshot of another sandbox.", | |
| ("source", "child"), | |
| True, | |
| ), | |
| "sandbox.env.stop": SandboxToolSpec( | |
| "sandbox.env.stop", | |
| "Stop a sandbox and optionally write an automatic snapshot.", | |
| ("name",), | |
| True, | |
| ), | |
| "sandbox.env.dashboard": SandboxToolSpec( | |
| "sandbox.env.dashboard", | |
| "Return observability dashboard data for sandboxes, commands, snapshots, and usage.", | |
| (), | |
| ), | |
| "sandbox.env.resources": SandboxToolSpec( | |
| "sandbox.env.resources", | |
| "Return sandbox resource accounting and limits.", | |
| (), | |
| ), | |
| "fs.write": SandboxToolSpec("fs.write", "Write UTF-8 text inside the sandbox root.", ("path", "content"), True), | |
| "fs.read": SandboxToolSpec("fs.read", "Read UTF-8 text from inside the sandbox root.", ("path",)), | |
| "project.create": SandboxToolSpec( | |
| "project.create", | |
| "Create a project directory from a mapping of relative files.", | |
| ("name", "files"), | |
| True, | |
| ), | |
| "cmd.run": SandboxToolSpec( | |
| "cmd.run", | |
| "Run an allowlisted command without shell expansion inside the sandbox root.", | |
| ("argv",), | |
| False, | |
| True, | |
| ), | |
| "core.manifest": SandboxToolSpec("core.manifest", "Return tool specs and active policy.", ()), | |
| } | |
| def manifest(self) -> dict: | |
| return { | |
| "schema_version": "tinymind-sandbox-tool-core-v1", | |
| "root": str(self.root), | |
| "policy": asdict(self.policy), | |
| "tools": {name: spec.to_dict() for name, spec in sorted(self._specs.items())}, | |
| "guarantees": { | |
| "path_containment": True, | |
| "shell_expansion": False, | |
| "audit_hashes": True, | |
| "workspace_snapshot_before_run_code": True, | |
| "network_default": "not_exposed", | |
| "proxy_allowed_destinations": "localhost, loopback, RFC1918 private LAN, .local", | |
| "public_internet_default_allowed": self.policy.allow_public_internet, | |
| "isolated_env_snapshots": True, | |
| "resource_accounting": True, | |
| "detached_command_streaming": True, | |
| "file_api": True, | |
| "fork_from_latest_snapshot": True, | |
| "stop_auto_snapshot": True, | |
| "lua_evo_weight_ops": self.policy.allow_model_weight_ops, | |
| "raw_gguf_patch_without_parser": False, | |
| }, | |
| } | |
| def call(self, tool: str, args: dict | None = None) -> dict: | |
| args = args or {} | |
| started = time.time() | |
| request_payload = {"tool": tool, "args": args} | |
| input_hash = _sha256_text(_canonical_json(request_payload)) | |
| request_id = input_hash[:16] | |
| if tool not in self._handlers: | |
| result = {"ok": False, "action": tool, "error": "unknown_tool"} | |
| else: | |
| missing = [name for name in self._specs[tool].required_args if name not in args] | |
| if missing: | |
| result = {"ok": False, "action": tool, "error": "missing_args", "missing": missing} | |
| else: | |
| try: | |
| result = self._handlers[tool](args) | |
| except Exception as exc: # defensive boundary for agent-facing calls | |
| result = {"ok": False, "action": tool, "error": f"tool_exception:{type(exc).__name__}"} | |
| elapsed_ms = (time.time() - started) * 1000.0 | |
| output_hash = _sha256_text(_canonical_json(result)) | |
| record = { | |
| "request_id": request_id, | |
| "timestamp": time.time(), | |
| "tool": tool, | |
| "input_sha256": input_hash, | |
| "output_sha256": output_hash, | |
| "elapsed_ms": elapsed_ms, | |
| "policy": asdict(self.policy), | |
| "result": result, | |
| "sandboxed": True, | |
| } | |
| with self.ledger_path.open("a", encoding="utf-8", newline="\n") as f: | |
| f.write(json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n") | |
| response = dict(result) | |
| response.update( | |
| { | |
| "request_id": request_id, | |
| "input_sha256": input_hash, | |
| "output_sha256": output_hash, | |
| "elapsed_ms": elapsed_ms, | |
| "sandboxed": True, | |
| } | |
| ) | |
| return response | |
| def _lua_eval(self, args: dict) -> dict: | |
| if not self.policy.allow_lua: | |
| return {"ok": False, "action": "lua.eval", "error": "lua_disabled"} | |
| result = self.runtime.run_lua(str(args["code"])) | |
| result["action"] = "lua.eval" | |
| return result | |
| def _run_code(self, args: dict) -> dict: | |
| if not self.policy.allow_lua: | |
| return {"ok": False, "action": "sandbox.run_code", "error": "lua_disabled"} | |
| before = snapshot_workspace(self.root, max_files=512) | |
| def http_handler(method: str, url: str, body: str | None) -> dict: | |
| if not self.policy.allow_network_proxy: | |
| return {"ok": False, "method": method, "url": url, "error": "sandbox_proxy_disabled"} | |
| return self.proxy.request(method, url, body) | |
| def evo_handler(method: str, values: tuple[Any, ...]) -> Any: | |
| if not self.policy.allow_model_weight_ops: | |
| return {"ok": False, "error": "model_weight_ops_disabled"} | |
| if method == "compact_lora": | |
| adapter, out, target_rank = values | |
| report = compact_lora_adapter(adapter, out, int(target_rank)) | |
| return { | |
| "ok": True, | |
| "method": "compact_lora", | |
| "manifest_path": report["manifest_path"], | |
| "output_adapter": report["output_adapter"], | |
| "target_rank": report["target_rank"], | |
| "size": report["size"], | |
| "claim_gate": report["claim_gate"], | |
| } | |
| if method == "choose_lora_rank": | |
| adapter, min_energy_retained = values | |
| plan = choose_lora_rank_plan(adapter, float(min_energy_retained)) | |
| plan["ok"] = True | |
| return plan | |
| if method == "compact_lora_auto": | |
| adapter, out, min_energy_retained = values | |
| plan = choose_lora_rank_plan(adapter, float(min_energy_retained)) | |
| report = compact_lora_adapter( | |
| adapter, | |
| out, | |
| int(plan["recommended_rank"]), | |
| adaptive_rank_plan=plan, | |
| ) | |
| return { | |
| "ok": True, | |
| "method": "compact_lora_auto", | |
| "adaptive_rank": True, | |
| "rank_plan": plan, | |
| "manifest_path": report["manifest_path"], | |
| "output_adapter": report["output_adapter"], | |
| "target_rank": report["target_rank"], | |
| "global_energy_retained_ratio": report["global_energy_retained_ratio"], | |
| "size": report["size"], | |
| "claim_gate": report["claim_gate"], | |
| } | |
| if method == "patch_gguf": | |
| gguf = values[0] | |
| return { | |
| "ok": False, | |
| "method": "patch_gguf", | |
| "gguf": str(gguf), | |
| "error": "raw_gguf_patch_blocked_parser_required", | |
| "safe_path": "Use trained adapter export or a GGUF tensor parser with checksum validation before writing GGUF weights.", | |
| } | |
| return {"ok": False, "error": "unknown_evo_weight_method"} | |
| result = self.runtime.run_lua(str(args["code"]), http_handler=http_handler, evo_handler=evo_handler) | |
| result["action"] = "sandbox.run_code" | |
| result["workspace_snapshot_before"] = before | |
| result["proxy_ledger_path"] = str(self.proxy.ledger_path) | |
| return result | |
| def _proxy_request(self, args: dict) -> dict: | |
| if not self.policy.allow_network_proxy: | |
| return {"ok": False, "action": "sandbox.proxy", "error": "sandbox_proxy_disabled"} | |
| method = str(args["method"]).upper() | |
| if method not in {"GET", "POST"}: | |
| return {"ok": False, "action": "sandbox.proxy", "error": "unsupported_proxy_method"} | |
| result = self.proxy.request(method, str(args["url"]), args.get("body")) | |
| result["action"] = "sandbox.proxy" | |
| return result | |
| def _env_create(self, args: dict) -> dict: | |
| result = self.env_manager.create(str(args["name"]), args.get("from_snapshot")) | |
| result["action"] = "sandbox.env.create" | |
| return result | |
| def _env_run(self, args: dict) -> dict: | |
| argv = args["argv"] | |
| if not isinstance(argv, list) or not all(isinstance(part, str) for part in argv): | |
| return {"ok": False, "action": "sandbox.env.run", "error": "argv_must_be_string_list"} | |
| result = self.env_manager.run(str(args["name"]), argv, timeout_s=args.get("timeout_s")) | |
| result["action"] = "sandbox.env.run" | |
| return result | |
| def _env_run_detached(self, args: dict) -> dict: | |
| argv = args["argv"] | |
| if not isinstance(argv, list) or not all(isinstance(part, str) for part in argv): | |
| return {"ok": False, "action": "sandbox.env.run_detached", "error": "argv_must_be_string_list"} | |
| result = self.env_manager.run_detached(str(args["name"]), argv, timeout_s=args.get("timeout_s")) | |
| result["action"] = "sandbox.env.run_detached" | |
| return result | |
| def _env_job_status(self, args: dict) -> dict: | |
| result = self.env_manager.job_status(str(args["job_id"])) | |
| result["action"] = "sandbox.env.job_status" | |
| return result | |
| def _env_job_stream(self, args: dict) -> dict: | |
| result = self.env_manager.job_stream( | |
| str(args["job_id"]), | |
| stream=str(args.get("stream", "stdout")), | |
| offset=int(args.get("offset", 0)), | |
| max_bytes=int(args.get("max_bytes", 64_000)), | |
| ) | |
| result["action"] = "sandbox.env.job_stream" | |
| return result | |
| def _env_copy_in(self, args: dict) -> dict: | |
| result = self.env_manager.copy_in(str(args["name"]), str(args["src"]), str(args["dest"])) | |
| result["action"] = "sandbox.env.copy_in" | |
| return result | |
| def _env_copy_out(self, args: dict) -> dict: | |
| result = self.env_manager.copy_out(str(args["name"]), str(args["src"]), str(args["dest"])) | |
| result["action"] = "sandbox.env.copy_out" | |
| return result | |
| def _env_file_put(self, args: dict) -> dict: | |
| result = self.env_manager.file_put( | |
| str(args["name"]), | |
| str(args["path"]), | |
| str(args["content"]), | |
| encoding=str(args.get("encoding", "utf-8")), | |
| ) | |
| result["action"] = "sandbox.env.file_put" | |
| return result | |
| def _env_file_get(self, args: dict) -> dict: | |
| result = self.env_manager.file_get( | |
| str(args["name"]), | |
| str(args["path"]), | |
| encoding=str(args.get("encoding", "utf-8")), | |
| ) | |
| result["action"] = "sandbox.env.file_get" | |
| return result | |
| def _env_snapshot(self, args: dict) -> dict: | |
| result = self.env_manager.save_snapshot(str(args["name"]), str(args["snapshot"])) | |
| result["action"] = "sandbox.env.snapshot" | |
| return result | |
| def _env_restore(self, args: dict) -> dict: | |
| result = self.env_manager.restore_snapshot(str(args["name"]), str(args["snapshot"]), replace=bool(args.get("replace", True))) | |
| result["action"] = "sandbox.env.restore" | |
| return result | |
| def _env_fork(self, args: dict) -> dict: | |
| result = self.env_manager.fork(str(args["source"]), str(args["child"])) | |
| result["action"] = "sandbox.env.fork" | |
| return result | |
| def _env_stop(self, args: dict) -> dict: | |
| result = self.env_manager.stop(str(args["name"]), snapshot=bool(args.get("snapshot", True))) | |
| result["action"] = "sandbox.env.stop" | |
| return result | |
| def _env_dashboard(self, args: dict) -> dict: | |
| return {"ok": True, "action": "sandbox.env.dashboard", "result": self.env_manager.dashboard()} | |
| def _env_resources(self, args: dict) -> dict: | |
| return {"ok": True, "action": "sandbox.env.resources", "result": self.env_manager.resource_report()} | |
| def _fs_write(self, args: dict) -> dict: | |
| if not self.policy.allow_write: | |
| return {"ok": False, "action": "fs.write", "error": "write_disabled"} | |
| content = str(args["content"]) | |
| if len(content.encode("utf-8")) > self.policy.max_write_bytes: | |
| return {"ok": False, "action": "fs.write", "error": "write_too_large"} | |
| result = self.runtime.write_file(str(args["path"]), content) | |
| result["action"] = "fs.write" | |
| return result | |
| def _fs_read(self, args: dict) -> dict: | |
| if not self.policy.allow_read: | |
| return {"ok": False, "action": "fs.read", "error": "read_disabled"} | |
| result = self.runtime.read_file(str(args["path"])) | |
| result["action"] = "fs.read" | |
| return result | |
| def _project_create(self, args: dict) -> dict: | |
| if not self.policy.allow_project_create: | |
| return {"ok": False, "action": "project.create", "error": "project_create_disabled"} | |
| files = args["files"] | |
| if not isinstance(files, dict): | |
| return {"ok": False, "action": "project.create", "error": "files_must_be_object"} | |
| if len(files) > self.policy.max_files_per_project: | |
| return {"ok": False, "action": "project.create", "error": "too_many_files"} | |
| total_bytes = sum(len(str(content).encode("utf-8")) for content in files.values()) | |
| if total_bytes > self.policy.max_write_bytes: | |
| return {"ok": False, "action": "project.create", "error": "project_too_large"} | |
| result = self.runtime.create_project(str(args["name"]), {str(k): str(v) for k, v in files.items()}) | |
| result["action"] = "project.create" | |
| return result | |
| def _cmd_run(self, args: dict) -> dict: | |
| if not self.policy.allow_cmd: | |
| return {"ok": False, "action": "cmd.run", "error": "cmd_disabled"} | |
| argv = args["argv"] | |
| if not isinstance(argv, list) or not all(isinstance(part, str) for part in argv): | |
| return {"ok": False, "action": "cmd.run", "error": "argv_must_be_string_list"} | |
| executable = Path(argv[0]).name.lower() if argv else "" | |
| if executable not in self.policy.command_allowlist: | |
| return {"ok": False, "action": "cmd.run", "error": "command_not_allowlisted"} | |
| result = self.runtime.run_cmd(argv, timeout_s=self.policy.cmd_timeout_s) | |
| result["action"] = "cmd.run" | |
| return result | |
| def _manifest(self, args: dict) -> dict: | |
| return {"ok": True, "action": "core.manifest", "result": self.manifest()} | |
Xet Storage Details
- Size:
- 22.9 kB
- Xet hash:
- c46ecdf6f79ce864509568348795684f7d6c822c54c0c51f0cb2b691796960de
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.