Buckets:
bbkdevops/unicosys-hypergraph-bucket / tinymind-native-8b-remote-handoff /bundle /model /sandbox_tool_core.py
| """Policy-gated Sandbox Tool Core for TinyMind agents. | |
| The core exposes a small, auditable tool registry over SandboxRLRuntime. It is | |
| designed for training/eval evidence: every call has a stable request id, input | |
| hash, output hash, policy snapshot, and pass/fail status. | |
| """ | |
| from __future__ import annotations | |
| from dataclasses import asdict, dataclass | |
| import hashlib | |
| import json | |
| from pathlib import Path | |
| import time | |
| from typing import Any, Callable | |
| from model.sandbox_rl import ALLOWED_CMD, SandboxRLRuntime | |
| from model.sandbox_proxy import LocalNetworkProxy, SandboxProxyPolicy, snapshot_workspace | |
| from model.sandbox_environment import SandboxEnvironmentManager, SandboxEnvironmentPolicy | |
| from train.adapter_compactor import choose_lora_rank_plan, compact_lora_adapter | |
| def _canonical_json(payload: Any) -> str: | |
| return json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":")) | |
| def _sha256_text(text: str) -> str: | |
| return hashlib.sha256(text.encode("utf-8")).hexdigest() | |
| class SandboxToolPolicy: | |
| allow_cmd: bool = True | |
| allow_write: bool = True | |
| allow_project_create: bool = True | |
| allow_lua: bool = True | |
| allow_read: bool = True | |
| max_write_bytes: int = 1_000_000 | |
| max_files_per_project: int = 128 | |
| cmd_timeout_s: float = 5.0 | |
| command_allowlist: tuple[str, ...] = tuple(sorted(ALLOWED_CMD)) | |
| allow_network_proxy: bool = True | |
| allow_public_internet: bool = False | |
| allow_model_weight_ops: bool = True | |
| allow_cognitive_tools: bool = True | |
| network_timeout_s: float = 3.0 | |
| max_proxy_response_bytes: int = 256_000 | |
| class SandboxToolSpec: | |
| name: str | |
| description: str | |
| required_args: tuple[str, ...] | |
| mutates_filesystem: bool = False | |
| executes_process: bool = False | |
| def to_dict(self) -> dict: | |
| return asdict(self) | |
| class SandboxToolCore: | |
| """Single entry point for safe TinyMind tool calls.""" | |
| def __init__( | |
| self, | |
| root: str | Path, | |
| policy: SandboxToolPolicy | None = None, | |
| ledger_name: str = "sandbox_tool_core_ledger.jsonl", | |
| ): | |
| self.root = Path(root).resolve() | |
| self.root.mkdir(parents=True, exist_ok=True) | |
| self.policy = policy or SandboxToolPolicy() | |
| self.runtime = SandboxRLRuntime(self.root) | |
| self.ledger_path = self.root / ledger_name | |
| self.proxy = LocalNetworkProxy( | |
| self.root, | |
| policy=SandboxProxyPolicy( | |
| allow_public_internet=self.policy.allow_public_internet, | |
| timeout_s=self.policy.network_timeout_s, | |
| max_response_bytes=self.policy.max_proxy_response_bytes, | |
| ), | |
| ) | |
| self.env_manager = SandboxEnvironmentManager( | |
| self.root / "isolated_envs", | |
| policy=SandboxEnvironmentPolicy( | |
| command_allowlist=self.policy.command_allowlist, | |
| timeout_s=self.policy.cmd_timeout_s, | |
| max_runtime_duration_s=self.policy.cmd_timeout_s, | |
| ), | |
| ) | |
| self._handlers: dict[str, Callable[[dict], dict]] = { | |
| "lua.eval": self._lua_eval, | |
| "sandbox.run_code": self._run_code, | |
| "sandbox.proxy": self._proxy_request, | |
| "sandbox.env.create": self._env_create, | |
| "sandbox.env.run": self._env_run, | |
| "sandbox.env.run_detached": self._env_run_detached, | |
| "sandbox.env.job_status": self._env_job_status, | |
| "sandbox.env.job_stream": self._env_job_stream, | |
| "sandbox.env.copy_in": self._env_copy_in, | |
| "sandbox.env.copy_out": self._env_copy_out, | |
| "sandbox.env.file_put": self._env_file_put, | |
| "sandbox.env.file_get": self._env_file_get, | |
| "sandbox.env.snapshot": self._env_snapshot, | |
| "sandbox.env.restore": self._env_restore, | |
| "sandbox.env.fork": self._env_fork, | |
| "sandbox.env.stop": self._env_stop, | |
| "sandbox.env.dashboard": self._env_dashboard, | |
| "sandbox.env.resources": self._env_resources, | |
| "fs.write": self._fs_write, | |
| "fs.read": self._fs_read, | |
| "project.create": self._project_create, | |
| "cmd.run": self._cmd_run, | |
| "core.manifest": self._manifest, | |
| } | |
| self._specs = { | |
| "lua.eval": SandboxToolSpec("lua.eval", "Evaluate the safe Lua arithmetic subset.", ("code",)), | |
| "sandbox.run_code": SandboxToolSpec( | |
| "sandbox.run_code", | |
| "Run safe Lua with proxy, Evo, DeepResearch, DeepRL, and Canvas helpers routed through policy gates.", | |
| ("code",), | |
| ), | |
| "sandbox.proxy": SandboxToolSpec( | |
| "sandbox.proxy", | |
| "Issue a policy-gated HTTP request to localhost/private LAN through the sandbox proxy.", | |
| ("method", "url"), | |
| ), | |
| "sandbox.env.create": SandboxToolSpec( | |
| "sandbox.env.create", | |
| "Create an isolated filesystem sandbox environment.", | |
| ("name",), | |
| True, | |
| ), | |
| "sandbox.env.run": SandboxToolSpec( | |
| "sandbox.env.run", | |
| "Run an allowlisted command inside an isolated sandbox environment.", | |
| ("name", "argv"), | |
| True, | |
| True, | |
| ), | |
| "sandbox.env.run_detached": SandboxToolSpec( | |
| "sandbox.env.run_detached", | |
| "Start an allowlisted command and stream stdout/stderr logs later.", | |
| ("name", "argv"), | |
| True, | |
| True, | |
| ), | |
| "sandbox.env.job_status": SandboxToolSpec( | |
| "sandbox.env.job_status", | |
| "Return status for a detached sandbox command.", | |
| ("job_id",), | |
| ), | |
| "sandbox.env.job_stream": SandboxToolSpec( | |
| "sandbox.env.job_stream", | |
| "Read a chunk from a detached command stdout/stderr stream.", | |
| ("job_id",), | |
| ), | |
| "sandbox.env.copy_in": SandboxToolSpec( | |
| "sandbox.env.copy_in", | |
| "Copy a file or directory from host into an isolated sandbox environment.", | |
| ("name", "src", "dest"), | |
| True, | |
| ), | |
| "sandbox.env.copy_out": SandboxToolSpec( | |
| "sandbox.env.copy_out", | |
| "Copy a file or directory from an isolated sandbox environment to host.", | |
| ("name", "src", "dest"), | |
| ), | |
| "sandbox.env.file_put": SandboxToolSpec( | |
| "sandbox.env.file_put", | |
| "Upload text/base64 content directly into a sandbox file.", | |
| ("name", "path", "content"), | |
| True, | |
| ), | |
| "sandbox.env.file_get": SandboxToolSpec( | |
| "sandbox.env.file_get", | |
| "Download text/base64 content directly from a sandbox file.", | |
| ("name", "path"), | |
| ), | |
| "sandbox.env.snapshot": SandboxToolSpec( | |
| "sandbox.env.snapshot", | |
| "Save an isolated sandbox environment snapshot for reuse.", | |
| ("name", "snapshot"), | |
| True, | |
| ), | |
| "sandbox.env.restore": SandboxToolSpec( | |
| "sandbox.env.restore", | |
| "Restore an isolated sandbox environment from a saved snapshot.", | |
| ("name", "snapshot"), | |
| True, | |
| ), | |
| "sandbox.env.fork": SandboxToolSpec( | |
| "sandbox.env.fork", | |
| "Create a child sandbox from the latest snapshot of another sandbox.", | |
| ("source", "child"), | |
| True, | |
| ), | |
| "sandbox.env.stop": SandboxToolSpec( | |
| "sandbox.env.stop", | |
| "Stop a sandbox and optionally write an automatic snapshot.", | |
| ("name",), | |
| True, | |
| ), | |
| "sandbox.env.dashboard": SandboxToolSpec( | |
| "sandbox.env.dashboard", | |
| "Return observability dashboard data for sandboxes, commands, snapshots, and usage.", | |
| (), | |
| ), | |
| "sandbox.env.resources": SandboxToolSpec( | |
| "sandbox.env.resources", | |
| "Return sandbox resource accounting and limits.", | |
| (), | |
| ), | |
| "fs.write": SandboxToolSpec("fs.write", "Write UTF-8 text inside the sandbox root.", ("path", "content"), True), | |
| "fs.read": SandboxToolSpec("fs.read", "Read UTF-8 text from inside the sandbox root.", ("path",)), | |
| "project.create": SandboxToolSpec( | |
| "project.create", | |
| "Create a project directory from a mapping of relative files.", | |
| ("name", "files"), | |
| True, | |
| ), | |
| "cmd.run": SandboxToolSpec( | |
| "cmd.run", | |
| "Run an allowlisted command without shell expansion inside the sandbox root.", | |
| ("argv",), | |
| False, | |
| True, | |
| ), | |
| "core.manifest": SandboxToolSpec("core.manifest", "Return tool specs and active policy.", ()), | |
| } | |
| def manifest(self) -> dict: | |
| return { | |
| "schema_version": "tinymind-sandbox-tool-core-v1", | |
| "root": str(self.root), | |
| "policy": asdict(self.policy), | |
| "tools": {name: spec.to_dict() for name, spec in sorted(self._specs.items())}, | |
| "guarantees": { | |
| "path_containment": True, | |
| "shell_expansion": False, | |
| "audit_hashes": True, | |
| "workspace_snapshot_before_run_code": True, | |
| "network_default": "not_exposed", | |
| "proxy_allowed_destinations": "localhost, loopback, RFC1918 private LAN, .local", | |
| "public_internet_default_allowed": self.policy.allow_public_internet, | |
| "isolated_env_snapshots": True, | |
| "resource_accounting": True, | |
| "detached_command_streaming": True, | |
| "file_api": True, | |
| "fork_from_latest_snapshot": True, | |
| "stop_auto_snapshot": True, | |
| "lua_evo_weight_ops": self.policy.allow_model_weight_ops, | |
| "lua_deepresearch_deeprl_canvas_tools": self.policy.allow_cognitive_tools, | |
| "raw_gguf_patch_without_parser": False, | |
| }, | |
| } | |
| def call(self, tool: str, args: dict | None = None) -> dict: | |
| args = args or {} | |
| started = time.time() | |
| request_payload = {"tool": tool, "args": args} | |
| input_hash = _sha256_text(_canonical_json(request_payload)) | |
| request_id = input_hash[:16] | |
| if tool not in self._handlers: | |
| result = {"ok": False, "action": tool, "error": "unknown_tool"} | |
| else: | |
| missing = [name for name in self._specs[tool].required_args if name not in args] | |
| if missing: | |
| result = {"ok": False, "action": tool, "error": "missing_args", "missing": missing} | |
| else: | |
| try: | |
| result = self._handlers[tool](args) | |
| except Exception as exc: # defensive boundary for agent-facing calls | |
| result = {"ok": False, "action": tool, "error": f"tool_exception:{type(exc).__name__}"} | |
| elapsed_ms = (time.time() - started) * 1000.0 | |
| output_hash = _sha256_text(_canonical_json(result)) | |
| record = { | |
| "request_id": request_id, | |
| "timestamp": time.time(), | |
| "tool": tool, | |
| "input_sha256": input_hash, | |
| "output_sha256": output_hash, | |
| "elapsed_ms": elapsed_ms, | |
| "policy": asdict(self.policy), | |
| "result": result, | |
| "sandboxed": True, | |
| } | |
| with self.ledger_path.open("a", encoding="utf-8", newline="\n") as f: | |
| f.write(json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n") | |
| response = dict(result) | |
| response.update( | |
| { | |
| "request_id": request_id, | |
| "input_sha256": input_hash, | |
| "output_sha256": output_hash, | |
| "elapsed_ms": elapsed_ms, | |
| "sandboxed": True, | |
| } | |
| ) | |
| return response | |
| def _lua_eval(self, args: dict) -> dict: | |
| if not self.policy.allow_lua: | |
| return {"ok": False, "action": "lua.eval", "error": "lua_disabled"} | |
| result = self.runtime.run_lua(str(args["code"])) | |
| result["action"] = "lua.eval" | |
| return result | |
| def _run_code(self, args: dict) -> dict: | |
| if not self.policy.allow_lua: | |
| return {"ok": False, "action": "sandbox.run_code", "error": "lua_disabled"} | |
| before = snapshot_workspace(self.root, max_files=512) | |
| def http_handler(method: str, url: str, body: str | None) -> dict: | |
| if not self.policy.allow_network_proxy: | |
| return {"ok": False, "method": method, "url": url, "error": "sandbox_proxy_disabled"} | |
| return self.proxy.request(method, url, body) | |
| def evo_handler(method: str, values: tuple[Any, ...]) -> Any: | |
| if not self.policy.allow_model_weight_ops: | |
| return {"ok": False, "error": "model_weight_ops_disabled"} | |
| if method == "compact_lora": | |
| adapter, out, target_rank = values | |
| report = compact_lora_adapter(adapter, out, int(target_rank)) | |
| return { | |
| "ok": True, | |
| "method": "compact_lora", | |
| "manifest_path": report["manifest_path"], | |
| "output_adapter": report["output_adapter"], | |
| "target_rank": report["target_rank"], | |
| "size": report["size"], | |
| "claim_gate": report["claim_gate"], | |
| } | |
| if method == "choose_lora_rank": | |
| adapter, min_energy_retained = values | |
| plan = choose_lora_rank_plan(adapter, float(min_energy_retained)) | |
| plan["ok"] = True | |
| return plan | |
| if method == "compact_lora_auto": | |
| adapter, out, min_energy_retained = values | |
| plan = choose_lora_rank_plan(adapter, float(min_energy_retained)) | |
| report = compact_lora_adapter( | |
| adapter, | |
| out, | |
| int(plan["recommended_rank"]), | |
| adaptive_rank_plan=plan, | |
| ) | |
| return { | |
| "ok": True, | |
| "method": "compact_lora_auto", | |
| "adaptive_rank": True, | |
| "rank_plan": plan, | |
| "manifest_path": report["manifest_path"], | |
| "output_adapter": report["output_adapter"], | |
| "target_rank": report["target_rank"], | |
| "global_energy_retained_ratio": report["global_energy_retained_ratio"], | |
| "size": report["size"], | |
| "claim_gate": report["claim_gate"], | |
| } | |
| if method == "patch_gguf": | |
| gguf = values[0] | |
| return { | |
| "ok": False, | |
| "method": "patch_gguf", | |
| "gguf": str(gguf), | |
| "error": "raw_gguf_patch_blocked_parser_required", | |
| "safe_path": "Use trained adapter export or a GGUF tensor parser with checksum validation before writing GGUF weights.", | |
| } | |
| return {"ok": False, "error": "unknown_evo_weight_method"} | |
| def tool_handler(method: str, values: tuple[Any, ...]) -> Any: | |
| if not self.policy.allow_cognitive_tools: | |
| return {"ok": False, "error": "cognitive_tools_disabled"} | |
| if method == "deepresearch_plan": | |
| query, depth = values | |
| return self._deepresearch_plan(str(query), str(depth)) | |
| if method == "deeprl_choose": | |
| state, actions, rewards = values | |
| return self._deeprl_choose(str(state), str(actions), str(rewards)) | |
| if method == "canvas_create": | |
| (title,) = values | |
| return self._canvas_create(str(title)) | |
| if method == "canvas_add_node": | |
| canvas, node_id, label, kind = values | |
| return self._canvas_add_node(canvas, str(node_id), str(label), str(kind)) | |
| if method == "canvas_link": | |
| canvas, source, target, relation = values | |
| return self._canvas_link(canvas, str(source), str(target), str(relation)) | |
| if method == "canvas_snapshot": | |
| (canvas,) = values | |
| return self._canvas_snapshot(canvas) | |
| if method == "tool_introspect": | |
| return { | |
| "ok": True, | |
| "tools": [ | |
| "sandbox_deepresearch_plan", | |
| "sandbox_deeprl_choose", | |
| "sandbox_canvas_create", | |
| "sandbox_canvas_add_node", | |
| "sandbox_canvas_link", | |
| "sandbox_canvas_snapshot", | |
| ], | |
| "policy": { | |
| "public_internet_allowed": self.policy.allow_public_internet, | |
| "path_containment": True, | |
| "audited": True, | |
| }, | |
| } | |
| return {"ok": False, "error": "unknown_cognitive_tool"} | |
| result = self.runtime.run_lua( | |
| str(args["code"]), | |
| http_handler=http_handler, | |
| evo_handler=evo_handler, | |
| tool_handler=tool_handler, | |
| ) | |
| result["action"] = "sandbox.run_code" | |
| result["workspace_snapshot_before"] = before | |
| result["proxy_ledger_path"] = str(self.proxy.ledger_path) | |
| return result | |
| def _deepresearch_plan(self, query: str, depth: str) -> dict: | |
| depth = depth.lower().strip() or "standard" | |
| step_count = {"fast": 4, "standard": 6, "deep": 8, "frontier": 10}.get(depth, 6) | |
| base_steps = [ | |
| ("intent", "Restate the task, required evidence, and forbidden claims."), | |
| ("source_map", "List primary sources, local files, benchmarks, and exact artifacts to inspect."), | |
| ("retrieve", "Use approved local/proxy retrieval only; public internet remains blocked unless policy changes."), | |
| ("triangulate", "Compare at least two independent evidence paths before synthesis."), | |
| ("sandbox_verify", "Run commands/tests in isolated sandbox and snapshot state before execution."), | |
| ("synthesize", "Write answer with uncertainty, raw evidence paths, and next actions."), | |
| ("red_team", "Attack the conclusion for leakage, stale cache, and unsupported world-best claims."), | |
| ("regression", "Persist checks so future runs can reproduce the claim gate."), | |
| ("handoff", "Emit machine-readable JSON/Markdown evidence for UI and training."), | |
| ("self_assess", "Score confidence, evidence coverage, and remaining blockers."), | |
| ] | |
| selected = base_steps[:step_count] | |
| fingerprint = _sha256_text(_canonical_json({"query": query, "depth": depth, "steps": selected})) | |
| return { | |
| "ok": True, | |
| "method": "deepresearch_plan", | |
| "query": query, | |
| "depth": depth, | |
| "steps": [{"id": idx + 1, "phase": phase, "action": action} for idx, (phase, action) in enumerate(selected)], | |
| "evidence_policy": { | |
| "exact_artifacts_required": True, | |
| "public_internet_default": self.policy.allow_public_internet, | |
| "claim_gate_required": True, | |
| }, | |
| "plan_sha256": fingerprint, | |
| } | |
| def _deeprl_choose(self, state: str, actions: str, rewards: str) -> dict: | |
| action_list = [item.strip() for item in actions.split("|") if item.strip()] | |
| if not action_list: | |
| return {"ok": False, "error": "empty_actions"} | |
| reward_values: list[float] = [] | |
| if rewards.strip(): | |
| for item in rewards.split("|"): | |
| try: | |
| reward_values.append(float(item.strip())) | |
| except ValueError: | |
| reward_values.append(0.0) | |
| scores = [] | |
| for idx, action in enumerate(action_list): | |
| reward = reward_values[idx] if idx < len(reward_values) else 0.0 | |
| low = action.lower() | |
| evidence_bonus = 0.0 | |
| for marker in ("test", "verify", "evidence", "snapshot", "hash", "audit", "probe", "eval"): | |
| if marker in low: | |
| evidence_bonus += 0.15 | |
| risk_penalty = 0.35 if any(marker in low for marker in ("unrestricted", "claim world", "disable gate")) else 0.0 | |
| score = reward + evidence_bonus - risk_penalty + min(len(state), 512) / 4096.0 | |
| scores.append({"index": idx, "action": action, "score": round(score, 6)}) | |
| selected = max(scores, key=lambda row: (row["score"], -row["index"])) | |
| return { | |
| "ok": True, | |
| "method": "deeprl_choose", | |
| "state_sha256": _sha256_text(state), | |
| "scores": scores, | |
| "selected_index": selected["index"], | |
| "selected_action": selected["action"], | |
| "policy": "deterministic_evidence_weighted_bandit_v1", | |
| } | |
| def _canvas_create(self, title: str) -> dict: | |
| return { | |
| "ok": True, | |
| "schema": "tinymind.canvas.v1", | |
| "title": title, | |
| "nodes": [], | |
| "edges": [], | |
| "invariants": ["acyclic_preferred", "evidence_edges_required_for_claims"], | |
| } | |
| def _canvas_add_node(self, canvas: Any, node_id: str, label: str, kind: str) -> dict: | |
| if not isinstance(canvas, dict) or canvas.get("schema") != "tinymind.canvas.v1": | |
| return {"ok": False, "error": "bad_canvas"} | |
| updated = json.loads(json.dumps(canvas, ensure_ascii=False)) | |
| if any(node.get("id") == node_id for node in updated.get("nodes", [])): | |
| return {"ok": False, "error": "duplicate_node"} | |
| updated.setdefault("nodes", []).append({"id": node_id, "label": label, "kind": kind}) | |
| updated["ok"] = True | |
| return updated | |
| def _canvas_link(self, canvas: Any, source: str, target: str, relation: str) -> dict: | |
| if not isinstance(canvas, dict) or canvas.get("schema") != "tinymind.canvas.v1": | |
| return {"ok": False, "error": "bad_canvas"} | |
| node_ids = {node.get("id") for node in canvas.get("nodes", [])} | |
| if source not in node_ids or target not in node_ids: | |
| return {"ok": False, "error": "missing_node"} | |
| updated = json.loads(json.dumps(canvas, ensure_ascii=False)) | |
| updated.setdefault("edges", []).append({"source": source, "target": target, "relation": relation}) | |
| updated["ok"] = True | |
| return updated | |
| def _canvas_snapshot(self, canvas: Any) -> dict: | |
| if not isinstance(canvas, dict) or canvas.get("schema") != "tinymind.canvas.v1": | |
| return {"ok": False, "error": "bad_canvas"} | |
| payload = { | |
| "schema": canvas.get("schema"), | |
| "title": canvas.get("title"), | |
| "node_count": len(canvas.get("nodes", [])), | |
| "edge_count": len(canvas.get("edges", [])), | |
| "canvas_sha256": _sha256_text(_canonical_json(canvas)), | |
| } | |
| return {"ok": True, "method": "canvas_snapshot", **payload} | |
| def _proxy_request(self, args: dict) -> dict: | |
| if not self.policy.allow_network_proxy: | |
| return {"ok": False, "action": "sandbox.proxy", "error": "sandbox_proxy_disabled"} | |
| method = str(args["method"]).upper() | |
| if method not in {"GET", "POST"}: | |
| return {"ok": False, "action": "sandbox.proxy", "error": "unsupported_proxy_method"} | |
| result = self.proxy.request(method, str(args["url"]), args.get("body")) | |
| result["action"] = "sandbox.proxy" | |
| return result | |
| def _env_create(self, args: dict) -> dict: | |
| result = self.env_manager.create(str(args["name"]), args.get("from_snapshot")) | |
| result["action"] = "sandbox.env.create" | |
| return result | |
| def _env_run(self, args: dict) -> dict: | |
| argv = args["argv"] | |
| if not isinstance(argv, list) or not all(isinstance(part, str) for part in argv): | |
| return {"ok": False, "action": "sandbox.env.run", "error": "argv_must_be_string_list"} | |
| result = self.env_manager.run(str(args["name"]), argv, timeout_s=args.get("timeout_s")) | |
| result["action"] = "sandbox.env.run" | |
| return result | |
| def _env_run_detached(self, args: dict) -> dict: | |
| argv = args["argv"] | |
| if not isinstance(argv, list) or not all(isinstance(part, str) for part in argv): | |
| return {"ok": False, "action": "sandbox.env.run_detached", "error": "argv_must_be_string_list"} | |
| result = self.env_manager.run_detached(str(args["name"]), argv, timeout_s=args.get("timeout_s")) | |
| result["action"] = "sandbox.env.run_detached" | |
| return result | |
| def _env_job_status(self, args: dict) -> dict: | |
| result = self.env_manager.job_status(str(args["job_id"])) | |
| result["action"] = "sandbox.env.job_status" | |
| return result | |
| def _env_job_stream(self, args: dict) -> dict: | |
| result = self.env_manager.job_stream( | |
| str(args["job_id"]), | |
| stream=str(args.get("stream", "stdout")), | |
| offset=int(args.get("offset", 0)), | |
| max_bytes=int(args.get("max_bytes", 64_000)), | |
| ) | |
| result["action"] = "sandbox.env.job_stream" | |
| return result | |
| def _env_copy_in(self, args: dict) -> dict: | |
| result = self.env_manager.copy_in(str(args["name"]), str(args["src"]), str(args["dest"])) | |
| result["action"] = "sandbox.env.copy_in" | |
| return result | |
| def _env_copy_out(self, args: dict) -> dict: | |
| result = self.env_manager.copy_out(str(args["name"]), str(args["src"]), str(args["dest"])) | |
| result["action"] = "sandbox.env.copy_out" | |
| return result | |
| def _env_file_put(self, args: dict) -> dict: | |
| result = self.env_manager.file_put( | |
| str(args["name"]), | |
| str(args["path"]), | |
| str(args["content"]), | |
| encoding=str(args.get("encoding", "utf-8")), | |
| ) | |
| result["action"] = "sandbox.env.file_put" | |
| return result | |
| def _env_file_get(self, args: dict) -> dict: | |
| result = self.env_manager.file_get( | |
| str(args["name"]), | |
| str(args["path"]), | |
| encoding=str(args.get("encoding", "utf-8")), | |
| ) | |
| result["action"] = "sandbox.env.file_get" | |
| return result | |
| def _env_snapshot(self, args: dict) -> dict: | |
| result = self.env_manager.save_snapshot(str(args["name"]), str(args["snapshot"])) | |
| result["action"] = "sandbox.env.snapshot" | |
| return result | |
| def _env_restore(self, args: dict) -> dict: | |
| result = self.env_manager.restore_snapshot(str(args["name"]), str(args["snapshot"]), replace=bool(args.get("replace", True))) | |
| result["action"] = "sandbox.env.restore" | |
| return result | |
| def _env_fork(self, args: dict) -> dict: | |
| result = self.env_manager.fork(str(args["source"]), str(args["child"])) | |
| result["action"] = "sandbox.env.fork" | |
| return result | |
| def _env_stop(self, args: dict) -> dict: | |
| result = self.env_manager.stop(str(args["name"]), snapshot=bool(args.get("snapshot", True))) | |
| result["action"] = "sandbox.env.stop" | |
| return result | |
| def _env_dashboard(self, args: dict) -> dict: | |
| return {"ok": True, "action": "sandbox.env.dashboard", "result": self.env_manager.dashboard()} | |
| def _env_resources(self, args: dict) -> dict: | |
| return {"ok": True, "action": "sandbox.env.resources", "result": self.env_manager.resource_report()} | |
| def _fs_write(self, args: dict) -> dict: | |
| if not self.policy.allow_write: | |
| return {"ok": False, "action": "fs.write", "error": "write_disabled"} | |
| content = str(args["content"]) | |
| if len(content.encode("utf-8")) > self.policy.max_write_bytes: | |
| return {"ok": False, "action": "fs.write", "error": "write_too_large"} | |
| result = self.runtime.write_file(str(args["path"]), content) | |
| result["action"] = "fs.write" | |
| return result | |
| def _fs_read(self, args: dict) -> dict: | |
| if not self.policy.allow_read: | |
| return {"ok": False, "action": "fs.read", "error": "read_disabled"} | |
| result = self.runtime.read_file(str(args["path"])) | |
| result["action"] = "fs.read" | |
| return result | |
| def _project_create(self, args: dict) -> dict: | |
| if not self.policy.allow_project_create: | |
| return {"ok": False, "action": "project.create", "error": "project_create_disabled"} | |
| files = args["files"] | |
| if not isinstance(files, dict): | |
| return {"ok": False, "action": "project.create", "error": "files_must_be_object"} | |
| if len(files) > self.policy.max_files_per_project: | |
| return {"ok": False, "action": "project.create", "error": "too_many_files"} | |
| total_bytes = sum(len(str(content).encode("utf-8")) for content in files.values()) | |
| if total_bytes > self.policy.max_write_bytes: | |
| return {"ok": False, "action": "project.create", "error": "project_too_large"} | |
| result = self.runtime.create_project(str(args["name"]), {str(k): str(v) for k, v in files.items()}) | |
| result["action"] = "project.create" | |
| return result | |
| def _cmd_run(self, args: dict) -> dict: | |
| if not self.policy.allow_cmd: | |
| return {"ok": False, "action": "cmd.run", "error": "cmd_disabled"} | |
| argv = args["argv"] | |
| if not isinstance(argv, list) or not all(isinstance(part, str) for part in argv): | |
| return {"ok": False, "action": "cmd.run", "error": "argv_must_be_string_list"} | |
| executable = Path(argv[0]).name.lower() if argv else "" | |
| if executable not in self.policy.command_allowlist: | |
| return {"ok": False, "action": "cmd.run", "error": "command_not_allowlisted"} | |
| result = self.runtime.run_cmd(argv, timeout_s=self.policy.cmd_timeout_s) | |
| result["action"] = "cmd.run" | |
| return result | |
| def _manifest(self, args: dict) -> dict: | |
| return {"ok": True, "action": "core.manifest", "result": self.manifest()} | |
Xet Storage Details
- Size:
- 30.7 kB
- Xet hash:
- 3c76acc155126d1a72eba1a76634d1f3a93fdc7a798194c9ba002f3ef9c5718e
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.