bbkdevops's picture
download
raw
22.9 kB
"""Policy-gated Sandbox Tool Core for TinyMind agents.
The core exposes a small, auditable tool registry over SandboxRLRuntime. It is
designed for training/eval evidence: every call has a stable request id, input
hash, output hash, policy snapshot, and pass/fail status.
"""
from __future__ import annotations
from dataclasses import asdict, dataclass
import hashlib
import json
from pathlib import Path
import time
from typing import Any, Callable
from model.sandbox_rl import ALLOWED_CMD, SandboxRLRuntime
from model.sandbox_proxy import LocalNetworkProxy, SandboxProxyPolicy, snapshot_workspace
from model.sandbox_environment import SandboxEnvironmentManager, SandboxEnvironmentPolicy
from train.adapter_compactor import choose_lora_rank_plan, compact_lora_adapter
def _canonical_json(payload: Any) -> str:
return json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
def _sha256_text(text: str) -> str:
return hashlib.sha256(text.encode("utf-8")).hexdigest()
@dataclass(frozen=True)
class SandboxToolPolicy:
allow_cmd: bool = True
allow_write: bool = True
allow_project_create: bool = True
allow_lua: bool = True
allow_read: bool = True
max_write_bytes: int = 1_000_000
max_files_per_project: int = 128
cmd_timeout_s: float = 5.0
command_allowlist: tuple[str, ...] = tuple(sorted(ALLOWED_CMD))
allow_network_proxy: bool = True
allow_public_internet: bool = False
allow_model_weight_ops: bool = True
network_timeout_s: float = 3.0
max_proxy_response_bytes: int = 256_000
@dataclass(frozen=True)
class SandboxToolSpec:
name: str
description: str
required_args: tuple[str, ...]
mutates_filesystem: bool = False
executes_process: bool = False
def to_dict(self) -> dict:
return asdict(self)
class SandboxToolCore:
"""Single entry point for safe TinyMind tool calls."""
def __init__(
self,
root: str | Path,
policy: SandboxToolPolicy | None = None,
ledger_name: str = "sandbox_tool_core_ledger.jsonl",
):
self.root = Path(root).resolve()
self.root.mkdir(parents=True, exist_ok=True)
self.policy = policy or SandboxToolPolicy()
self.runtime = SandboxRLRuntime(self.root)
self.ledger_path = self.root / ledger_name
self.proxy = LocalNetworkProxy(
self.root,
policy=SandboxProxyPolicy(
allow_public_internet=self.policy.allow_public_internet,
timeout_s=self.policy.network_timeout_s,
max_response_bytes=self.policy.max_proxy_response_bytes,
),
)
self.env_manager = SandboxEnvironmentManager(
self.root / "isolated_envs",
policy=SandboxEnvironmentPolicy(
command_allowlist=self.policy.command_allowlist,
timeout_s=self.policy.cmd_timeout_s,
max_runtime_duration_s=self.policy.cmd_timeout_s,
),
)
self._handlers: dict[str, Callable[[dict], dict]] = {
"lua.eval": self._lua_eval,
"sandbox.run_code": self._run_code,
"sandbox.proxy": self._proxy_request,
"sandbox.env.create": self._env_create,
"sandbox.env.run": self._env_run,
"sandbox.env.run_detached": self._env_run_detached,
"sandbox.env.job_status": self._env_job_status,
"sandbox.env.job_stream": self._env_job_stream,
"sandbox.env.copy_in": self._env_copy_in,
"sandbox.env.copy_out": self._env_copy_out,
"sandbox.env.file_put": self._env_file_put,
"sandbox.env.file_get": self._env_file_get,
"sandbox.env.snapshot": self._env_snapshot,
"sandbox.env.restore": self._env_restore,
"sandbox.env.fork": self._env_fork,
"sandbox.env.stop": self._env_stop,
"sandbox.env.dashboard": self._env_dashboard,
"sandbox.env.resources": self._env_resources,
"fs.write": self._fs_write,
"fs.read": self._fs_read,
"project.create": self._project_create,
"cmd.run": self._cmd_run,
"core.manifest": self._manifest,
}
self._specs = {
"lua.eval": SandboxToolSpec("lua.eval", "Evaluate the safe Lua arithmetic subset.", ("code",)),
"sandbox.run_code": SandboxToolSpec(
"sandbox.run_code",
"Run safe Lua with sandbox_http_get/post helpers routed through the local proxy.",
("code",),
),
"sandbox.proxy": SandboxToolSpec(
"sandbox.proxy",
"Issue a policy-gated HTTP request to localhost/private LAN through the sandbox proxy.",
("method", "url"),
),
"sandbox.env.create": SandboxToolSpec(
"sandbox.env.create",
"Create an isolated filesystem sandbox environment.",
("name",),
True,
),
"sandbox.env.run": SandboxToolSpec(
"sandbox.env.run",
"Run an allowlisted command inside an isolated sandbox environment.",
("name", "argv"),
True,
True,
),
"sandbox.env.run_detached": SandboxToolSpec(
"sandbox.env.run_detached",
"Start an allowlisted command and stream stdout/stderr logs later.",
("name", "argv"),
True,
True,
),
"sandbox.env.job_status": SandboxToolSpec(
"sandbox.env.job_status",
"Return status for a detached sandbox command.",
("job_id",),
),
"sandbox.env.job_stream": SandboxToolSpec(
"sandbox.env.job_stream",
"Read a chunk from a detached command stdout/stderr stream.",
("job_id",),
),
"sandbox.env.copy_in": SandboxToolSpec(
"sandbox.env.copy_in",
"Copy a file or directory from host into an isolated sandbox environment.",
("name", "src", "dest"),
True,
),
"sandbox.env.copy_out": SandboxToolSpec(
"sandbox.env.copy_out",
"Copy a file or directory from an isolated sandbox environment to host.",
("name", "src", "dest"),
),
"sandbox.env.file_put": SandboxToolSpec(
"sandbox.env.file_put",
"Upload text/base64 content directly into a sandbox file.",
("name", "path", "content"),
True,
),
"sandbox.env.file_get": SandboxToolSpec(
"sandbox.env.file_get",
"Download text/base64 content directly from a sandbox file.",
("name", "path"),
),
"sandbox.env.snapshot": SandboxToolSpec(
"sandbox.env.snapshot",
"Save an isolated sandbox environment snapshot for reuse.",
("name", "snapshot"),
True,
),
"sandbox.env.restore": SandboxToolSpec(
"sandbox.env.restore",
"Restore an isolated sandbox environment from a saved snapshot.",
("name", "snapshot"),
True,
),
"sandbox.env.fork": SandboxToolSpec(
"sandbox.env.fork",
"Create a child sandbox from the latest snapshot of another sandbox.",
("source", "child"),
True,
),
"sandbox.env.stop": SandboxToolSpec(
"sandbox.env.stop",
"Stop a sandbox and optionally write an automatic snapshot.",
("name",),
True,
),
"sandbox.env.dashboard": SandboxToolSpec(
"sandbox.env.dashboard",
"Return observability dashboard data for sandboxes, commands, snapshots, and usage.",
(),
),
"sandbox.env.resources": SandboxToolSpec(
"sandbox.env.resources",
"Return sandbox resource accounting and limits.",
(),
),
"fs.write": SandboxToolSpec("fs.write", "Write UTF-8 text inside the sandbox root.", ("path", "content"), True),
"fs.read": SandboxToolSpec("fs.read", "Read UTF-8 text from inside the sandbox root.", ("path",)),
"project.create": SandboxToolSpec(
"project.create",
"Create a project directory from a mapping of relative files.",
("name", "files"),
True,
),
"cmd.run": SandboxToolSpec(
"cmd.run",
"Run an allowlisted command without shell expansion inside the sandbox root.",
("argv",),
False,
True,
),
"core.manifest": SandboxToolSpec("core.manifest", "Return tool specs and active policy.", ()),
}
def manifest(self) -> dict:
return {
"schema_version": "tinymind-sandbox-tool-core-v1",
"root": str(self.root),
"policy": asdict(self.policy),
"tools": {name: spec.to_dict() for name, spec in sorted(self._specs.items())},
"guarantees": {
"path_containment": True,
"shell_expansion": False,
"audit_hashes": True,
"workspace_snapshot_before_run_code": True,
"network_default": "not_exposed",
"proxy_allowed_destinations": "localhost, loopback, RFC1918 private LAN, .local",
"public_internet_default_allowed": self.policy.allow_public_internet,
"isolated_env_snapshots": True,
"resource_accounting": True,
"detached_command_streaming": True,
"file_api": True,
"fork_from_latest_snapshot": True,
"stop_auto_snapshot": True,
"lua_evo_weight_ops": self.policy.allow_model_weight_ops,
"raw_gguf_patch_without_parser": False,
},
}
def call(self, tool: str, args: dict | None = None) -> dict:
args = args or {}
started = time.time()
request_payload = {"tool": tool, "args": args}
input_hash = _sha256_text(_canonical_json(request_payload))
request_id = input_hash[:16]
if tool not in self._handlers:
result = {"ok": False, "action": tool, "error": "unknown_tool"}
else:
missing = [name for name in self._specs[tool].required_args if name not in args]
if missing:
result = {"ok": False, "action": tool, "error": "missing_args", "missing": missing}
else:
try:
result = self._handlers[tool](args)
except Exception as exc: # defensive boundary for agent-facing calls
result = {"ok": False, "action": tool, "error": f"tool_exception:{type(exc).__name__}"}
elapsed_ms = (time.time() - started) * 1000.0
output_hash = _sha256_text(_canonical_json(result))
record = {
"request_id": request_id,
"timestamp": time.time(),
"tool": tool,
"input_sha256": input_hash,
"output_sha256": output_hash,
"elapsed_ms": elapsed_ms,
"policy": asdict(self.policy),
"result": result,
"sandboxed": True,
}
with self.ledger_path.open("a", encoding="utf-8", newline="\n") as f:
f.write(json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n")
response = dict(result)
response.update(
{
"request_id": request_id,
"input_sha256": input_hash,
"output_sha256": output_hash,
"elapsed_ms": elapsed_ms,
"sandboxed": True,
}
)
return response
def _lua_eval(self, args: dict) -> dict:
if not self.policy.allow_lua:
return {"ok": False, "action": "lua.eval", "error": "lua_disabled"}
result = self.runtime.run_lua(str(args["code"]))
result["action"] = "lua.eval"
return result
def _run_code(self, args: dict) -> dict:
if not self.policy.allow_lua:
return {"ok": False, "action": "sandbox.run_code", "error": "lua_disabled"}
before = snapshot_workspace(self.root, max_files=512)
def http_handler(method: str, url: str, body: str | None) -> dict:
if not self.policy.allow_network_proxy:
return {"ok": False, "method": method, "url": url, "error": "sandbox_proxy_disabled"}
return self.proxy.request(method, url, body)
def evo_handler(method: str, values: tuple[Any, ...]) -> Any:
if not self.policy.allow_model_weight_ops:
return {"ok": False, "error": "model_weight_ops_disabled"}
if method == "compact_lora":
adapter, out, target_rank = values
report = compact_lora_adapter(adapter, out, int(target_rank))
return {
"ok": True,
"method": "compact_lora",
"manifest_path": report["manifest_path"],
"output_adapter": report["output_adapter"],
"target_rank": report["target_rank"],
"size": report["size"],
"claim_gate": report["claim_gate"],
}
if method == "choose_lora_rank":
adapter, min_energy_retained = values
plan = choose_lora_rank_plan(adapter, float(min_energy_retained))
plan["ok"] = True
return plan
if method == "compact_lora_auto":
adapter, out, min_energy_retained = values
plan = choose_lora_rank_plan(adapter, float(min_energy_retained))
report = compact_lora_adapter(
adapter,
out,
int(plan["recommended_rank"]),
adaptive_rank_plan=plan,
)
return {
"ok": True,
"method": "compact_lora_auto",
"adaptive_rank": True,
"rank_plan": plan,
"manifest_path": report["manifest_path"],
"output_adapter": report["output_adapter"],
"target_rank": report["target_rank"],
"global_energy_retained_ratio": report["global_energy_retained_ratio"],
"size": report["size"],
"claim_gate": report["claim_gate"],
}
if method == "patch_gguf":
gguf = values[0]
return {
"ok": False,
"method": "patch_gguf",
"gguf": str(gguf),
"error": "raw_gguf_patch_blocked_parser_required",
"safe_path": "Use trained adapter export or a GGUF tensor parser with checksum validation before writing GGUF weights.",
}
return {"ok": False, "error": "unknown_evo_weight_method"}
result = self.runtime.run_lua(str(args["code"]), http_handler=http_handler, evo_handler=evo_handler)
result["action"] = "sandbox.run_code"
result["workspace_snapshot_before"] = before
result["proxy_ledger_path"] = str(self.proxy.ledger_path)
return result
def _proxy_request(self, args: dict) -> dict:
if not self.policy.allow_network_proxy:
return {"ok": False, "action": "sandbox.proxy", "error": "sandbox_proxy_disabled"}
method = str(args["method"]).upper()
if method not in {"GET", "POST"}:
return {"ok": False, "action": "sandbox.proxy", "error": "unsupported_proxy_method"}
result = self.proxy.request(method, str(args["url"]), args.get("body"))
result["action"] = "sandbox.proxy"
return result
def _env_create(self, args: dict) -> dict:
result = self.env_manager.create(str(args["name"]), args.get("from_snapshot"))
result["action"] = "sandbox.env.create"
return result
def _env_run(self, args: dict) -> dict:
argv = args["argv"]
if not isinstance(argv, list) or not all(isinstance(part, str) for part in argv):
return {"ok": False, "action": "sandbox.env.run", "error": "argv_must_be_string_list"}
result = self.env_manager.run(str(args["name"]), argv, timeout_s=args.get("timeout_s"))
result["action"] = "sandbox.env.run"
return result
def _env_run_detached(self, args: dict) -> dict:
argv = args["argv"]
if not isinstance(argv, list) or not all(isinstance(part, str) for part in argv):
return {"ok": False, "action": "sandbox.env.run_detached", "error": "argv_must_be_string_list"}
result = self.env_manager.run_detached(str(args["name"]), argv, timeout_s=args.get("timeout_s"))
result["action"] = "sandbox.env.run_detached"
return result
def _env_job_status(self, args: dict) -> dict:
result = self.env_manager.job_status(str(args["job_id"]))
result["action"] = "sandbox.env.job_status"
return result
def _env_job_stream(self, args: dict) -> dict:
result = self.env_manager.job_stream(
str(args["job_id"]),
stream=str(args.get("stream", "stdout")),
offset=int(args.get("offset", 0)),
max_bytes=int(args.get("max_bytes", 64_000)),
)
result["action"] = "sandbox.env.job_stream"
return result
def _env_copy_in(self, args: dict) -> dict:
result = self.env_manager.copy_in(str(args["name"]), str(args["src"]), str(args["dest"]))
result["action"] = "sandbox.env.copy_in"
return result
def _env_copy_out(self, args: dict) -> dict:
result = self.env_manager.copy_out(str(args["name"]), str(args["src"]), str(args["dest"]))
result["action"] = "sandbox.env.copy_out"
return result
def _env_file_put(self, args: dict) -> dict:
result = self.env_manager.file_put(
str(args["name"]),
str(args["path"]),
str(args["content"]),
encoding=str(args.get("encoding", "utf-8")),
)
result["action"] = "sandbox.env.file_put"
return result
def _env_file_get(self, args: dict) -> dict:
result = self.env_manager.file_get(
str(args["name"]),
str(args["path"]),
encoding=str(args.get("encoding", "utf-8")),
)
result["action"] = "sandbox.env.file_get"
return result
def _env_snapshot(self, args: dict) -> dict:
result = self.env_manager.save_snapshot(str(args["name"]), str(args["snapshot"]))
result["action"] = "sandbox.env.snapshot"
return result
def _env_restore(self, args: dict) -> dict:
result = self.env_manager.restore_snapshot(str(args["name"]), str(args["snapshot"]), replace=bool(args.get("replace", True)))
result["action"] = "sandbox.env.restore"
return result
def _env_fork(self, args: dict) -> dict:
result = self.env_manager.fork(str(args["source"]), str(args["child"]))
result["action"] = "sandbox.env.fork"
return result
def _env_stop(self, args: dict) -> dict:
result = self.env_manager.stop(str(args["name"]), snapshot=bool(args.get("snapshot", True)))
result["action"] = "sandbox.env.stop"
return result
def _env_dashboard(self, args: dict) -> dict:
return {"ok": True, "action": "sandbox.env.dashboard", "result": self.env_manager.dashboard()}
def _env_resources(self, args: dict) -> dict:
return {"ok": True, "action": "sandbox.env.resources", "result": self.env_manager.resource_report()}
def _fs_write(self, args: dict) -> dict:
if not self.policy.allow_write:
return {"ok": False, "action": "fs.write", "error": "write_disabled"}
content = str(args["content"])
if len(content.encode("utf-8")) > self.policy.max_write_bytes:
return {"ok": False, "action": "fs.write", "error": "write_too_large"}
result = self.runtime.write_file(str(args["path"]), content)
result["action"] = "fs.write"
return result
def _fs_read(self, args: dict) -> dict:
if not self.policy.allow_read:
return {"ok": False, "action": "fs.read", "error": "read_disabled"}
result = self.runtime.read_file(str(args["path"]))
result["action"] = "fs.read"
return result
def _project_create(self, args: dict) -> dict:
if not self.policy.allow_project_create:
return {"ok": False, "action": "project.create", "error": "project_create_disabled"}
files = args["files"]
if not isinstance(files, dict):
return {"ok": False, "action": "project.create", "error": "files_must_be_object"}
if len(files) > self.policy.max_files_per_project:
return {"ok": False, "action": "project.create", "error": "too_many_files"}
total_bytes = sum(len(str(content).encode("utf-8")) for content in files.values())
if total_bytes > self.policy.max_write_bytes:
return {"ok": False, "action": "project.create", "error": "project_too_large"}
result = self.runtime.create_project(str(args["name"]), {str(k): str(v) for k, v in files.items()})
result["action"] = "project.create"
return result
def _cmd_run(self, args: dict) -> dict:
if not self.policy.allow_cmd:
return {"ok": False, "action": "cmd.run", "error": "cmd_disabled"}
argv = args["argv"]
if not isinstance(argv, list) or not all(isinstance(part, str) for part in argv):
return {"ok": False, "action": "cmd.run", "error": "argv_must_be_string_list"}
executable = Path(argv[0]).name.lower() if argv else ""
if executable not in self.policy.command_allowlist:
return {"ok": False, "action": "cmd.run", "error": "command_not_allowlisted"}
result = self.runtime.run_cmd(argv, timeout_s=self.policy.cmd_timeout_s)
result["action"] = "cmd.run"
return result
def _manifest(self, args: dict) -> dict:
return {"ok": True, "action": "core.manifest", "result": self.manifest()}

Xet Storage Details

Size:
22.9 kB
·
Xet hash:
c46ecdf6f79ce864509568348795684f7d6c822c54c0c51f0cb2b691796960de

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.