bbkdevops's picture
download
raw
30.7 kB
"""Policy-gated Sandbox Tool Core for TinyMind agents.
The core exposes a small, auditable tool registry over SandboxRLRuntime. It is
designed for training/eval evidence: every call has a stable request id, input
hash, output hash, policy snapshot, and pass/fail status.
"""
from __future__ import annotations
from dataclasses import asdict, dataclass
import hashlib
import json
from pathlib import Path
import time
from typing import Any, Callable
from model.sandbox_rl import ALLOWED_CMD, SandboxRLRuntime
from model.sandbox_proxy import LocalNetworkProxy, SandboxProxyPolicy, snapshot_workspace
from model.sandbox_environment import SandboxEnvironmentManager, SandboxEnvironmentPolicy
from train.adapter_compactor import choose_lora_rank_plan, compact_lora_adapter
def _canonical_json(payload: Any) -> str:
return json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
def _sha256_text(text: str) -> str:
return hashlib.sha256(text.encode("utf-8")).hexdigest()
@dataclass(frozen=True)
class SandboxToolPolicy:
    """Immutable switches and limits that SandboxToolCore consults per call.

    A snapshot of this policy (via ``asdict``) is stored in every ledger
    record and in the manifest.
    """

    # Gates the ``cmd.run`` tool.
    allow_cmd: bool = True
    # Gates the ``fs.write`` tool.
    allow_write: bool = True
    # Gates the ``project.create`` tool.
    allow_project_create: bool = True
    # Gates both ``lua.eval`` and ``sandbox.run_code``.
    allow_lua: bool = True
    # Gates the ``fs.read`` tool.
    allow_read: bool = True
    # Upper bound on UTF-8 bytes for one ``fs.write`` and for the summed
    # file contents of one ``project.create``.
    max_write_bytes: int = 1_000_000
    # Upper bound on the number of entries in a ``project.create`` file map.
    max_files_per_project: int = 128
    # Timeout passed to the runtime for ``cmd.run`` and to the isolated
    # environment policy.
    cmd_timeout_s: float = 5.0
    # Executable names accepted by ``cmd.run``; also handed to the isolated
    # environment policy. The default is computed once, when the class is
    # defined, from ALLOWED_CMD.
    command_allowlist: tuple[str, ...] = tuple(sorted(ALLOWED_CMD))
    # Gates ``sandbox.proxy`` and the HTTP helper exposed to ``sandbox.run_code``.
    allow_network_proxy: bool = True
    # Forwarded to the proxy policy; also reported in the manifest.
    allow_public_internet: bool = False
    # Gates the Evo weight helpers exposed to ``sandbox.run_code``.
    allow_model_weight_ops: bool = True
    # Gates the DeepResearch / DeepRL / Canvas helpers in ``sandbox.run_code``.
    allow_cognitive_tools: bool = True
    # Forwarded to the proxy policy as its request timeout.
    network_timeout_s: float = 3.0
    # Forwarded to the proxy policy as its response size cap.
    max_proxy_response_bytes: int = 256_000
@dataclass(frozen=True)
class SandboxToolSpec:
    """Static, immutable description of one registered tool.

    ``required_args`` lists the argument names that must be present in a
    call; the two flags record whether the tool is declared as writing to
    the filesystem or as starting a process.
    """

    name: str
    description: str
    required_args: tuple[str, ...]
    mutates_filesystem: bool = False
    executes_process: bool = False

    def to_dict(self) -> dict:
        """Return the spec as a plain dict keyed by field name."""
        as_mapping = asdict(self)
        return as_mapping
class SandboxToolCore:
"""Single entry point for safe TinyMind tool calls."""
def __init__(
    self,
    root: str | Path,
    policy: SandboxToolPolicy | None = None,
    ledger_name: str = "sandbox_tool_core_ledger.jsonl",
):
    """Set up the sandbox root, collaborators, and the tool registry.

    Args:
        root: Directory used as the sandbox root; it is resolved and
            created (with parents) if missing.
        policy: Active policy; a default ``SandboxToolPolicy`` is used when
            omitted.
        ledger_name: File name of the JSONL audit ledger, placed directly
            under ``root``.
    """
    self.root = Path(root).resolve()
    self.root.mkdir(parents=True, exist_ok=True)
    self.policy = policy or SandboxToolPolicy()
    self.runtime = SandboxRLRuntime(self.root)
    self.ledger_path = self.root / ledger_name

    # The proxy and the isolated-environment manager each receive their own
    # policy object derived from the tool policy.
    proxy_policy = SandboxProxyPolicy(
        allow_public_internet=self.policy.allow_public_internet,
        timeout_s=self.policy.network_timeout_s,
        max_response_bytes=self.policy.max_proxy_response_bytes,
    )
    self.proxy = LocalNetworkProxy(self.root, policy=proxy_policy)
    env_policy = SandboxEnvironmentPolicy(
        command_allowlist=self.policy.command_allowlist,
        timeout_s=self.policy.cmd_timeout_s,
        max_runtime_duration_s=self.policy.cmd_timeout_s,
    )
    self.env_manager = SandboxEnvironmentManager(self.root / "isolated_envs", policy=env_policy)

    # One row per tool:
    # (name, handler, description, required_args, mutates_filesystem, executes_process)
    registry = (
        ("lua.eval", self._lua_eval, "Evaluate the safe Lua arithmetic subset.", ("code",), False, False),
        (
            "sandbox.run_code",
            self._run_code,
            "Run safe Lua with proxy, Evo, DeepResearch, DeepRL, and Canvas helpers routed through policy gates.",
            ("code",),
            False,
            False,
        ),
        (
            "sandbox.proxy",
            self._proxy_request,
            "Issue a policy-gated HTTP request to localhost/private LAN through the sandbox proxy.",
            ("method", "url"),
            False,
            False,
        ),
        (
            "sandbox.env.create",
            self._env_create,
            "Create an isolated filesystem sandbox environment.",
            ("name",),
            True,
            False,
        ),
        (
            "sandbox.env.run",
            self._env_run,
            "Run an allowlisted command inside an isolated sandbox environment.",
            ("name", "argv"),
            True,
            True,
        ),
        (
            "sandbox.env.run_detached",
            self._env_run_detached,
            "Start an allowlisted command and stream stdout/stderr logs later.",
            ("name", "argv"),
            True,
            True,
        ),
        (
            "sandbox.env.job_status",
            self._env_job_status,
            "Return status for a detached sandbox command.",
            ("job_id",),
            False,
            False,
        ),
        (
            "sandbox.env.job_stream",
            self._env_job_stream,
            "Read a chunk from a detached command stdout/stderr stream.",
            ("job_id",),
            False,
            False,
        ),
        (
            "sandbox.env.copy_in",
            self._env_copy_in,
            "Copy a file or directory from host into an isolated sandbox environment.",
            ("name", "src", "dest"),
            True,
            False,
        ),
        (
            "sandbox.env.copy_out",
            self._env_copy_out,
            "Copy a file or directory from an isolated sandbox environment to host.",
            ("name", "src", "dest"),
            False,
            False,
        ),
        (
            "sandbox.env.file_put",
            self._env_file_put,
            "Upload text/base64 content directly into a sandbox file.",
            ("name", "path", "content"),
            True,
            False,
        ),
        (
            "sandbox.env.file_get",
            self._env_file_get,
            "Download text/base64 content directly from a sandbox file.",
            ("name", "path"),
            False,
            False,
        ),
        (
            "sandbox.env.snapshot",
            self._env_snapshot,
            "Save an isolated sandbox environment snapshot for reuse.",
            ("name", "snapshot"),
            True,
            False,
        ),
        (
            "sandbox.env.restore",
            self._env_restore,
            "Restore an isolated sandbox environment from a saved snapshot.",
            ("name", "snapshot"),
            True,
            False,
        ),
        (
            "sandbox.env.fork",
            self._env_fork,
            "Create a child sandbox from the latest snapshot of another sandbox.",
            ("source", "child"),
            True,
            False,
        ),
        (
            "sandbox.env.stop",
            self._env_stop,
            "Stop a sandbox and optionally write an automatic snapshot.",
            ("name",),
            True,
            False,
        ),
        (
            "sandbox.env.dashboard",
            self._env_dashboard,
            "Return observability dashboard data for sandboxes, commands, snapshots, and usage.",
            (),
            False,
            False,
        ),
        (
            "sandbox.env.resources",
            self._env_resources,
            "Return sandbox resource accounting and limits.",
            (),
            False,
            False,
        ),
        ("fs.write", self._fs_write, "Write UTF-8 text inside the sandbox root.", ("path", "content"), True, False),
        ("fs.read", self._fs_read, "Read UTF-8 text from inside the sandbox root.", ("path",), False, False),
        (
            "project.create",
            self._project_create,
            "Create a project directory from a mapping of relative files.",
            ("name", "files"),
            True,
            False,
        ),
        (
            "cmd.run",
            self._cmd_run,
            "Run an allowlisted command without shell expansion inside the sandbox root.",
            ("argv",),
            False,
            True,
        ),
        ("core.manifest", self._manifest, "Return tool specs and active policy.", (), False, False),
    )

    # Both lookup tables are built from the same rows so they cannot drift.
    self._handlers: dict[str, Callable[[dict], dict]] = {row[0]: row[1] for row in registry}
    self._specs = {
        name: SandboxToolSpec(name, description, required, mutates, executes)
        for name, _handler, description, required, mutates, executes in registry
    }
def manifest(self) -> dict:
    """Describe the core: schema version, root, policy, tools, guarantees.

    Tool specs are listed in name order. Guarantee entries that depend on
    the policy are read from ``self.policy`` at call time.
    """
    active_policy = self.policy

    tool_table = {}
    for tool_name in sorted(self._specs):
        tool_table[tool_name] = self._specs[tool_name].to_dict()

    guarantees = {
        "path_containment": True,
        "shell_expansion": False,
        "audit_hashes": True,
        "workspace_snapshot_before_run_code": True,
        "network_default": "not_exposed",
        "proxy_allowed_destinations": "localhost, loopback, RFC1918 private LAN, .local",
        "public_internet_default_allowed": active_policy.allow_public_internet,
        "isolated_env_snapshots": True,
        "resource_accounting": True,
        "detached_command_streaming": True,
        "file_api": True,
        "fork_from_latest_snapshot": True,
        "stop_auto_snapshot": True,
        "lua_evo_weight_ops": active_policy.allow_model_weight_ops,
        "lua_deepresearch_deeprl_canvas_tools": active_policy.allow_cognitive_tools,
        "raw_gguf_patch_without_parser": False,
    }
    return {
        "schema_version": "tinymind-sandbox-tool-core-v1",
        "root": str(self.root),
        "policy": asdict(active_policy),
        "tools": tool_table,
        "guarantees": guarantees,
    }
def call(self, tool: str, args: dict | None = None) -> dict:
    """Dispatch one tool call, append it to the ledger, and return the result.

    The request ``{"tool", "args"}`` is hashed to produce ``input_sha256``;
    its first 16 hex characters are the ``request_id``, so identical
    requests share an id. The handler result is hashed to ``output_sha256``.
    One JSON line per call is appended to ``self.ledger_path``.

    Failures are reported as ``{"ok": False, "error": ...}`` results rather
    than raised: unknown tool, missing required arguments, an exception in
    the handler, and arguments or results that cannot be serialised for
    hashing.

    Args:
        tool: Registered tool name.
        args: Tool arguments; ``None`` or any falsy value means ``{}``.

    Returns:
        A copy of the handler result extended with ``request_id``,
        ``input_sha256``, ``output_sha256``, ``elapsed_ms`` and
        ``sandboxed``.
    """
    args = args or {}
    # perf_counter is monotonic: elapsed_ms cannot go negative or jump when
    # the wall clock is adjusted while a tool is running.
    started = time.perf_counter()
    request_payload = {"tool": tool, "args": args}
    try:
        input_hash = _sha256_text(_canonical_json(request_payload))
        request_hashable = True
    except (TypeError, ValueError):
        # Not JSON-serialisable or not encodable. ascii() always yields
        # encodable text, so the call still gets a best-effort fingerprint
        # and a ledger entry instead of escaping this boundary.
        input_hash = _sha256_text(ascii(request_payload))
        request_hashable = False
    request_id = input_hash[:16]

    if not request_hashable:
        result = {"ok": False, "action": tool, "error": "unserializable_args"}
    elif tool not in self._handlers:
        result = {"ok": False, "action": tool, "error": "unknown_tool"}
    else:
        missing = [name for name in self._specs[tool].required_args if name not in args]
        if missing:
            result = {"ok": False, "action": tool, "error": "missing_args", "missing": missing}
        else:
            try:
                result = self._handlers[tool](args)
            except Exception as exc:  # defensive boundary for agent-facing calls
                result = {"ok": False, "action": tool, "error": f"tool_exception:{type(exc).__name__}"}
    elapsed_ms = (time.perf_counter() - started) * 1000.0

    try:
        output_hash = _sha256_text(_canonical_json(result))
    except (TypeError, ValueError):
        # The result cannot be hashed or written to the ledger; replace it
        # with an auditable error so the call is still recorded.
        safe_action = tool if isinstance(tool, str) and tool.isascii() else ascii(tool)
        result = {"ok": False, "action": safe_action, "error": "unserializable_result"}
        output_hash = _sha256_text(_canonical_json(result))

    record = {
        "request_id": request_id,
        "timestamp": time.time(),
        "tool": tool,
        "input_sha256": input_hash,
        "output_sha256": output_hash,
        "elapsed_ms": elapsed_ms,
        "policy": asdict(self.policy),
        "result": result,
        "sandboxed": True,
    }
    # backslashreplace keeps the audit write from failing on characters that
    # strict UTF-8 cannot encode (e.g. lone surrogates echoed in a result).
    with self.ledger_path.open("a", encoding="utf-8", errors="backslashreplace", newline="\n") as f:
        f.write(json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n")

    response = dict(result)
    response.update(
        {
            "request_id": request_id,
            "input_sha256": input_hash,
            "output_sha256": output_hash,
            "elapsed_ms": elapsed_ms,
            "sandboxed": True,
        }
    )
    return response
def _lua_eval(self, args: dict) -> dict:
if not self.policy.allow_lua:
return {"ok": False, "action": "lua.eval", "error": "lua_disabled"}
result = self.runtime.run_lua(str(args["code"]))
result["action"] = "lua.eval"
return result
def _run_code(self, args: dict) -> dict:
    """Handle ``sandbox.run_code``: run Lua with policy-gated helper callbacks.

    A workspace snapshot (capped at 512 files) is taken before execution.
    Three callbacks are handed to the runtime; each re-checks the policy on
    every invocation:

    * ``http_handler`` forwards to the sandbox proxy,
    * ``evo_handler`` dispatches the LoRA compaction helpers and always
      refuses ``patch_gguf``,
    * ``tool_handler`` dispatches the DeepResearch / DeepRL / Canvas helpers.

    The runtime result is tagged with the action name, the pre-run snapshot
    and the proxy ledger path.
    """
    if not self.policy.allow_lua:
        return {"ok": False, "action": "sandbox.run_code", "error": "lua_disabled"}

    workspace_before = snapshot_workspace(self.root, max_files=512)

    def http_handler(method: str, url: str, body: str | None) -> dict:
        if self.policy.allow_network_proxy:
            return self.proxy.request(method, url, body)
        return {"ok": False, "method": method, "url": url, "error": "sandbox_proxy_disabled"}

    def evo_handler(method: str, values: tuple[Any, ...]) -> Any:
        if not self.policy.allow_model_weight_ops:
            return {"ok": False, "error": "model_weight_ops_disabled"}

        if method == "compact_lora":
            adapter_path, output_path, rank = values
            summary = compact_lora_adapter(adapter_path, output_path, int(rank))
            copied = ("manifest_path", "output_adapter", "target_rank", "size", "claim_gate")
            return {
                "ok": True,
                "method": "compact_lora",
                **{key: summary[key] for key in copied},
            }

        if method == "choose_lora_rank":
            adapter_path, energy_floor = values
            rank_plan = choose_lora_rank_plan(adapter_path, float(energy_floor))
            rank_plan["ok"] = True
            return rank_plan

        if method == "compact_lora_auto":
            adapter_path, output_path, energy_floor = values
            rank_plan = choose_lora_rank_plan(adapter_path, float(energy_floor))
            summary = compact_lora_adapter(
                adapter_path,
                output_path,
                int(rank_plan["recommended_rank"]),
                adaptive_rank_plan=rank_plan,
            )
            copied = (
                "manifest_path",
                "output_adapter",
                "target_rank",
                "global_energy_retained_ratio",
                "size",
                "claim_gate",
            )
            return {
                "ok": True,
                "method": "compact_lora_auto",
                "adaptive_rank": True,
                "rank_plan": rank_plan,
                **{key: summary[key] for key in copied},
            }

        if method == "patch_gguf":
            # Raw GGUF patching is refused unconditionally.
            return {
                "ok": False,
                "method": "patch_gguf",
                "gguf": str(values[0]),
                "error": "raw_gguf_patch_blocked_parser_required",
                "safe_path": "Use trained adapter export or a GGUF tensor parser with checksum validation before writing GGUF weights.",
            }

        return {"ok": False, "error": "unknown_evo_weight_method"}

    def tool_handler(method: str, values: tuple[Any, ...]) -> Any:
        if not self.policy.allow_cognitive_tools:
            return {"ok": False, "error": "cognitive_tools_disabled"}

        if method == "deepresearch_plan":
            query, depth = values
            return self._deepresearch_plan(str(query), str(depth))
        if method == "deeprl_choose":
            state, actions, rewards = values
            return self._deeprl_choose(str(state), str(actions), str(rewards))
        if method == "canvas_create":
            [title] = values
            return self._canvas_create(str(title))
        if method == "canvas_add_node":
            canvas, node_id, label, kind = values
            return self._canvas_add_node(canvas, str(node_id), str(label), str(kind))
        if method == "canvas_link":
            canvas, source, target, relation = values
            return self._canvas_link(canvas, str(source), str(target), str(relation))
        if method == "canvas_snapshot":
            [canvas] = values
            return self._canvas_snapshot(canvas)
        if method == "tool_introspect":
            helper_names = [
                "sandbox_deepresearch_plan",
                "sandbox_deeprl_choose",
                "sandbox_canvas_create",
                "sandbox_canvas_add_node",
                "sandbox_canvas_link",
                "sandbox_canvas_snapshot",
            ]
            return {
                "ok": True,
                "tools": helper_names,
                "policy": {
                    "public_internet_allowed": self.policy.allow_public_internet,
                    "path_containment": True,
                    "audited": True,
                },
            }
        return {"ok": False, "error": "unknown_cognitive_tool"}

    outcome = self.runtime.run_lua(
        str(args["code"]),
        http_handler=http_handler,
        evo_handler=evo_handler,
        tool_handler=tool_handler,
    )
    outcome["action"] = "sandbox.run_code"
    outcome["workspace_snapshot_before"] = workspace_before
    outcome["proxy_ledger_path"] = str(self.proxy.ledger_path)
    return outcome
def _deepresearch_plan(self, query: str, depth: str) -> dict:
    """Build a fixed, fingerprinted research plan for *query*.

    *depth* is lower-cased and stripped; an empty value becomes
    ``"standard"``. The depth selects how many leading phases of the fixed
    phase list are used (fast 4, standard 6, deep 8, frontier 10; any other
    value 6). ``plan_sha256`` hashes the query, depth and selected phases.
    """
    normalized_depth = depth.lower().strip()
    if not normalized_depth:
        normalized_depth = "standard"

    budget_by_depth = {"fast": 4, "standard": 6, "deep": 8, "frontier": 10}
    phase_budget = budget_by_depth.get(normalized_depth, 6)

    phase_catalogue = [
        ("intent", "Restate the task, required evidence, and forbidden claims."),
        ("source_map", "List primary sources, local files, benchmarks, and exact artifacts to inspect."),
        ("retrieve", "Use approved local/proxy retrieval only; public internet remains blocked unless policy changes."),
        ("triangulate", "Compare at least two independent evidence paths before synthesis."),
        ("sandbox_verify", "Run commands/tests in isolated sandbox and snapshot state before execution."),
        ("synthesize", "Write answer with uncertainty, raw evidence paths, and next actions."),
        ("red_team", "Attack the conclusion for leakage, stale cache, and unsupported world-best claims."),
        ("regression", "Persist checks so future runs can reproduce the claim gate."),
        ("handoff", "Emit machine-readable JSON/Markdown evidence for UI and training."),
        ("self_assess", "Score confidence, evidence coverage, and remaining blockers."),
    ]
    chosen = phase_catalogue[:phase_budget]

    plan_hash = _sha256_text(
        _canonical_json({"query": query, "depth": normalized_depth, "steps": chosen})
    )

    numbered_steps = []
    for position, (phase, action) in enumerate(chosen, start=1):
        numbered_steps.append({"id": position, "phase": phase, "action": action})

    return {
        "ok": True,
        "method": "deepresearch_plan",
        "query": query,
        "depth": normalized_depth,
        "steps": numbered_steps,
        "evidence_policy": {
            "exact_artifacts_required": True,
            "public_internet_default": self.policy.allow_public_internet,
            "claim_gate_required": True,
        },
        "plan_sha256": plan_hash,
    }
def _deeprl_choose(self, state: str, actions: str, rewards: str) -> dict:
action_list = [item.strip() for item in actions.split("|") if item.strip()]
if not action_list:
return {"ok": False, "error": "empty_actions"}
reward_values: list[float] = []
if rewards.strip():
for item in rewards.split("|"):
try:
reward_values.append(float(item.strip()))
except ValueError:
reward_values.append(0.0)
scores = []
for idx, action in enumerate(action_list):
reward = reward_values[idx] if idx < len(reward_values) else 0.0
low = action.lower()
evidence_bonus = 0.0
for marker in ("test", "verify", "evidence", "snapshot", "hash", "audit", "probe", "eval"):
if marker in low:
evidence_bonus += 0.15
risk_penalty = 0.35 if any(marker in low for marker in ("unrestricted", "claim world", "disable gate")) else 0.0
score = reward + evidence_bonus - risk_penalty + min(len(state), 512) / 4096.0
scores.append({"index": idx, "action": action, "score": round(score, 6)})
selected = max(scores, key=lambda row: (row["score"], -row["index"]))
return {
"ok": True,
"method": "deeprl_choose",
"state_sha256": _sha256_text(state),
"scores": scores,
"selected_index": selected["index"],
"selected_action": selected["action"],
"policy": "deterministic_evidence_weighted_bandit_v1",
}
def _canvas_create(self, title: str) -> dict:
return {
"ok": True,
"schema": "tinymind.canvas.v1",
"title": title,
"nodes": [],
"edges": [],
"invariants": ["acyclic_preferred", "evidence_edges_required_for_claims"],
}
def _canvas_add_node(self, canvas: Any, node_id: str, label: str, kind: str) -> dict:
if not isinstance(canvas, dict) or canvas.get("schema") != "tinymind.canvas.v1":
return {"ok": False, "error": "bad_canvas"}
updated = json.loads(json.dumps(canvas, ensure_ascii=False))
if any(node.get("id") == node_id for node in updated.get("nodes", [])):
return {"ok": False, "error": "duplicate_node"}
updated.setdefault("nodes", []).append({"id": node_id, "label": label, "kind": kind})
updated["ok"] = True
return updated
def _canvas_link(self, canvas: Any, source: str, target: str, relation: str) -> dict:
if not isinstance(canvas, dict) or canvas.get("schema") != "tinymind.canvas.v1":
return {"ok": False, "error": "bad_canvas"}
node_ids = {node.get("id") for node in canvas.get("nodes", [])}
if source not in node_ids or target not in node_ids:
return {"ok": False, "error": "missing_node"}
updated = json.loads(json.dumps(canvas, ensure_ascii=False))
updated.setdefault("edges", []).append({"source": source, "target": target, "relation": relation})
updated["ok"] = True
return updated
def _canvas_snapshot(self, canvas: Any) -> dict:
if not isinstance(canvas, dict) or canvas.get("schema") != "tinymind.canvas.v1":
return {"ok": False, "error": "bad_canvas"}
payload = {
"schema": canvas.get("schema"),
"title": canvas.get("title"),
"node_count": len(canvas.get("nodes", [])),
"edge_count": len(canvas.get("edges", [])),
"canvas_sha256": _sha256_text(_canonical_json(canvas)),
}
return {"ok": True, "method": "canvas_snapshot", **payload}
def _proxy_request(self, args: dict) -> dict:
if not self.policy.allow_network_proxy:
return {"ok": False, "action": "sandbox.proxy", "error": "sandbox_proxy_disabled"}
method = str(args["method"]).upper()
if method not in {"GET", "POST"}:
return {"ok": False, "action": "sandbox.proxy", "error": "unsupported_proxy_method"}
result = self.proxy.request(method, str(args["url"]), args.get("body"))
result["action"] = "sandbox.proxy"
return result
def _env_create(self, args: dict) -> dict:
result = self.env_manager.create(str(args["name"]), args.get("from_snapshot"))
result["action"] = "sandbox.env.create"
return result
def _env_run(self, args: dict) -> dict:
argv = args["argv"]
if not isinstance(argv, list) or not all(isinstance(part, str) for part in argv):
return {"ok": False, "action": "sandbox.env.run", "error": "argv_must_be_string_list"}
result = self.env_manager.run(str(args["name"]), argv, timeout_s=args.get("timeout_s"))
result["action"] = "sandbox.env.run"
return result
def _env_run_detached(self, args: dict) -> dict:
argv = args["argv"]
if not isinstance(argv, list) or not all(isinstance(part, str) for part in argv):
return {"ok": False, "action": "sandbox.env.run_detached", "error": "argv_must_be_string_list"}
result = self.env_manager.run_detached(str(args["name"]), argv, timeout_s=args.get("timeout_s"))
result["action"] = "sandbox.env.run_detached"
return result
def _env_job_status(self, args: dict) -> dict:
result = self.env_manager.job_status(str(args["job_id"]))
result["action"] = "sandbox.env.job_status"
return result
def _env_job_stream(self, args: dict) -> dict:
result = self.env_manager.job_stream(
str(args["job_id"]),
stream=str(args.get("stream", "stdout")),
offset=int(args.get("offset", 0)),
max_bytes=int(args.get("max_bytes", 64_000)),
)
result["action"] = "sandbox.env.job_stream"
return result
def _env_copy_in(self, args: dict) -> dict:
result = self.env_manager.copy_in(str(args["name"]), str(args["src"]), str(args["dest"]))
result["action"] = "sandbox.env.copy_in"
return result
def _env_copy_out(self, args: dict) -> dict:
result = self.env_manager.copy_out(str(args["name"]), str(args["src"]), str(args["dest"]))
result["action"] = "sandbox.env.copy_out"
return result
def _env_file_put(self, args: dict) -> dict:
result = self.env_manager.file_put(
str(args["name"]),
str(args["path"]),
str(args["content"]),
encoding=str(args.get("encoding", "utf-8")),
)
result["action"] = "sandbox.env.file_put"
return result
def _env_file_get(self, args: dict) -> dict:
result = self.env_manager.file_get(
str(args["name"]),
str(args["path"]),
encoding=str(args.get("encoding", "utf-8")),
)
result["action"] = "sandbox.env.file_get"
return result
def _env_snapshot(self, args: dict) -> dict:
result = self.env_manager.save_snapshot(str(args["name"]), str(args["snapshot"]))
result["action"] = "sandbox.env.snapshot"
return result
def _env_restore(self, args: dict) -> dict:
result = self.env_manager.restore_snapshot(str(args["name"]), str(args["snapshot"]), replace=bool(args.get("replace", True)))
result["action"] = "sandbox.env.restore"
return result
def _env_fork(self, args: dict) -> dict:
result = self.env_manager.fork(str(args["source"]), str(args["child"]))
result["action"] = "sandbox.env.fork"
return result
def _env_stop(self, args: dict) -> dict:
result = self.env_manager.stop(str(args["name"]), snapshot=bool(args.get("snapshot", True)))
result["action"] = "sandbox.env.stop"
return result
def _env_dashboard(self, args: dict) -> dict:
return {"ok": True, "action": "sandbox.env.dashboard", "result": self.env_manager.dashboard()}
def _env_resources(self, args: dict) -> dict:
return {"ok": True, "action": "sandbox.env.resources", "result": self.env_manager.resource_report()}
def _fs_write(self, args: dict) -> dict:
if not self.policy.allow_write:
return {"ok": False, "action": "fs.write", "error": "write_disabled"}
content = str(args["content"])
if len(content.encode("utf-8")) > self.policy.max_write_bytes:
return {"ok": False, "action": "fs.write", "error": "write_too_large"}
result = self.runtime.write_file(str(args["path"]), content)
result["action"] = "fs.write"
return result
def _fs_read(self, args: dict) -> dict:
if not self.policy.allow_read:
return {"ok": False, "action": "fs.read", "error": "read_disabled"}
result = self.runtime.read_file(str(args["path"]))
result["action"] = "fs.read"
return result
def _project_create(self, args: dict) -> dict:
if not self.policy.allow_project_create:
return {"ok": False, "action": "project.create", "error": "project_create_disabled"}
files = args["files"]
if not isinstance(files, dict):
return {"ok": False, "action": "project.create", "error": "files_must_be_object"}
if len(files) > self.policy.max_files_per_project:
return {"ok": False, "action": "project.create", "error": "too_many_files"}
total_bytes = sum(len(str(content).encode("utf-8")) for content in files.values())
if total_bytes > self.policy.max_write_bytes:
return {"ok": False, "action": "project.create", "error": "project_too_large"}
result = self.runtime.create_project(str(args["name"]), {str(k): str(v) for k, v in files.items()})
result["action"] = "project.create"
return result
def _cmd_run(self, args: dict) -> dict:
if not self.policy.allow_cmd:
return {"ok": False, "action": "cmd.run", "error": "cmd_disabled"}
argv = args["argv"]
if not isinstance(argv, list) or not all(isinstance(part, str) for part in argv):
return {"ok": False, "action": "cmd.run", "error": "argv_must_be_string_list"}
executable = Path(argv[0]).name.lower() if argv else ""
if executable not in self.policy.command_allowlist:
return {"ok": False, "action": "cmd.run", "error": "command_not_allowlisted"}
result = self.runtime.run_cmd(argv, timeout_s=self.policy.cmd_timeout_s)
result["action"] = "cmd.run"
return result
def _manifest(self, args: dict) -> dict:
return {"ok": True, "action": "core.manifest", "result": self.manifest()}

Xet Storage Details

Size:
30.7 kB
·
Xet hash:
3c76acc155126d1a72eba1a76634d1f3a93fdc7a798194c9ba002f3ef9c5718e

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.