Buckets:
bbkdevops/unicosys-hypergraph-bucket / tinymind-native-colab-handoff /bundle /model /sandbox_proxy.py
| """Local-only HTTP proxy substrate for Sandbox Tool Core.""" | |
| from __future__ import annotations | |
| from dataclasses import asdict, dataclass | |
| import hashlib | |
| import ipaddress | |
| import json | |
| from pathlib import Path | |
| import time | |
| from typing import Any | |
| from urllib import request | |
| from urllib.parse import urlparse | |
| class SandboxProxyPolicy: | |
| allow_localhost: bool = True | |
| allow_private_lan: bool = True | |
| allow_dot_local: bool = True | |
| allow_public_internet: bool = False | |
| timeout_s: float = 3.0 | |
| max_response_bytes: int = 256_000 | |
| max_snapshot_files: int = 512 | |
| def _sha256_bytes(payload: bytes) -> str: | |
| return hashlib.sha256(payload).hexdigest() | |
| def _host_allowed(host: str, policy: SandboxProxyPolicy) -> tuple[bool, str]: | |
| normalized = host.strip("[]").lower() | |
| if normalized in {"localhost", "127.0.0.1", "::1"}: | |
| return policy.allow_localhost, "localhost" | |
| if normalized.endswith(".local"): | |
| return policy.allow_dot_local, "dot_local" | |
| try: | |
| ip = ipaddress.ip_address(normalized) | |
| except ValueError: | |
| return policy.allow_public_internet, "hostname_public_or_unresolved" | |
| if ip.is_loopback: | |
| return policy.allow_localhost, "localhost_ip" | |
| if ip.is_private: | |
| return policy.allow_private_lan, "private_lan" | |
| return policy.allow_public_internet, "public_ip" | |
| def snapshot_workspace(root: str | Path, max_files: int = 512) -> dict: | |
| base = Path(root).resolve() | |
| files: list[dict] = [] | |
| for path in sorted(p for p in base.rglob("*") if p.is_file()): | |
| if len(files) >= max_files: | |
| break | |
| rel = path.relative_to(base).as_posix() | |
| try: | |
| data = path.read_bytes() | |
| except OSError: | |
| continue | |
| files.append({"path": rel, "bytes": len(data), "sha256": _sha256_bytes(data)}) | |
| manifest_bytes = json.dumps(files, sort_keys=True, separators=(",", ":")).encode("utf-8") | |
| return { | |
| "root": str(base), | |
| "file_count": len(files), | |
| "truncated": len(files) >= max_files, | |
| "sha256": _sha256_bytes(manifest_bytes), | |
| "files": files, | |
| } | |
| class LocalNetworkProxy: | |
| """HTTP client with local/private-network policy and JSONL audit log.""" | |
| def __init__( | |
| self, | |
| root: str | Path, | |
| policy: SandboxProxyPolicy | None = None, | |
| ledger_name: str = "sandbox_proxy_ledger.jsonl", | |
| ): | |
| self.root = Path(root).resolve() | |
| self.root.mkdir(parents=True, exist_ok=True) | |
| self.policy = policy or SandboxProxyPolicy() | |
| self.ledger_path = self.root / ledger_name | |
| def get(self, url: str) -> dict: | |
| return self.request("GET", url) | |
| def post(self, url: str, body: str = "") -> dict: | |
| return self.request("POST", url, body=body) | |
| def request(self, method: str, url: str, body: str | bytes | None = None) -> dict: | |
| started = time.time() | |
| workspace_snapshot = snapshot_workspace(self.root, self.policy.max_snapshot_files) | |
| parsed = urlparse(url) | |
| host = parsed.hostname or "" | |
| scheme_ok = parsed.scheme in {"http", "https"} | |
| host_ok, host_class = _host_allowed(host, self.policy) | |
| allowed = scheme_ok and host_ok | |
| if not allowed: | |
| result = { | |
| "ok": False, | |
| "method": method.upper(), | |
| "url": url, | |
| "host": host, | |
| "host_class": host_class, | |
| "status": None, | |
| "error": "proxy_policy_blocked", | |
| "elapsed_ms": (time.time() - started) * 1000.0, | |
| "workspace_snapshot": workspace_snapshot, | |
| } | |
| self._record(result) | |
| return result | |
| payload = body.encode("utf-8") if isinstance(body, str) else body | |
| req = request.Request(url, data=payload, method=method.upper()) | |
| if payload is not None: | |
| req.add_header("Content-Type", "text/plain; charset=utf-8") | |
| try: | |
| with request.urlopen(req, timeout=self.policy.timeout_s) as resp: | |
| data = resp.read(self.policy.max_response_bytes + 1) | |
| truncated = len(data) > self.policy.max_response_bytes | |
| data = data[: self.policy.max_response_bytes] | |
| text = data.decode("utf-8", errors="replace") | |
| result = { | |
| "ok": True, | |
| "method": method.upper(), | |
| "url": url, | |
| "host": host, | |
| "host_class": host_class, | |
| "status": int(resp.status), | |
| "elapsed_ms": (time.time() - started) * 1000.0, | |
| "response_text": text, | |
| "response_bytes": len(data), | |
| "response_sha256": _sha256_bytes(data), | |
| "truncated": truncated, | |
| "workspace_snapshot": workspace_snapshot, | |
| } | |
| except Exception as exc: | |
| result = { | |
| "ok": False, | |
| "method": method.upper(), | |
| "url": url, | |
| "host": host, | |
| "host_class": host_class, | |
| "status": None, | |
| "error": f"proxy_error:{type(exc).__name__}", | |
| "elapsed_ms": (time.time() - started) * 1000.0, | |
| "workspace_snapshot": workspace_snapshot, | |
| } | |
| self._record(result) | |
| return result | |
| def _record(self, result: dict[str, Any]) -> None: | |
| record = { | |
| "timestamp": time.time(), | |
| "policy": asdict(self.policy), | |
| "destination": { | |
| "method": result.get("method"), | |
| "url": result.get("url"), | |
| "host": result.get("host"), | |
| "host_class": result.get("host_class"), | |
| "status": result.get("status"), | |
| }, | |
| "elapsed_ms": result.get("elapsed_ms"), | |
| "ok": result.get("ok"), | |
| "error": result.get("error"), | |
| "workspace_snapshot_sha256": result.get("workspace_snapshot", {}).get("sha256"), | |
| } | |
| with self.ledger_path.open("a", encoding="utf-8", newline="\n") as f: | |
| f.write(json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n") | |
Xet Storage Details
- Size:
- 6.29 kB
- Xet hash:
- f2c1c5fdd35bdb329cb7a05dfd1a9bcf83dab5ff9cd8cab4c22104af9d979ff6
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.