bbkdevops's picture
download
raw
6.29 kB
"""Local-only HTTP proxy substrate for Sandbox Tool Core."""
from __future__ import annotations

from dataclasses import asdict, dataclass
import hashlib
import ipaddress
import json
from pathlib import Path
import time
from typing import Any
from urllib import request
from urllib.error import HTTPError
from urllib.parse import urlparse
@dataclass(frozen=True)
class SandboxProxyPolicy:
    """Immutable settings for the sandbox proxy.

    The ``allow_*`` flags select which destination classes a request may
    reach; the remaining fields bound how much work one request may cause.
    """

    # --- destination classes ---
    # "localhost" and loopback addresses.
    allow_localhost: bool = True
    # IP literals that ``ipaddress`` reports as private.
    allow_private_lan: bool = True
    # Host names ending in ".local".
    allow_dot_local: bool = True
    # Every other host name or IP literal.
    allow_public_internet: bool = False

    # --- resource limits ---
    # Timeout, in seconds, handed to the HTTP client.
    timeout_s: float = 3.0
    # Response bodies are cut off after this many bytes.
    max_response_bytes: int = 256_000
    # Upper bound on files hashed per workspace snapshot.
    max_snapshot_files: int = 512
def _sha256_bytes(payload: bytes) -> str:
    """Return the SHA-256 digest of *payload* as a lowercase hex string."""
    hasher = hashlib.sha256()
    hasher.update(payload)
    return hasher.hexdigest()
def _host_allowed(host: str, policy: SandboxProxyPolicy) -> tuple[bool, str]:
    """Classify *host* and report whether *policy* permits contacting it.

    Args:
        host: Host component of a URL; IPv6 brackets and letter case are
            ignored.
        policy: Policy whose ``allow_*`` flags decide the verdict.

    Returns:
        ``(allowed, host_class)`` where ``host_class`` is one of
        ``"localhost"``, ``"dot_local"``, ``"localhost_ip"``,
        ``"private_lan"``, ``"public_ip"`` or
        ``"hostname_public_or_unresolved"``.

    Host names are never resolved here: anything that is not an IP literal,
    ``localhost`` or a ``.local`` name is treated as public.
    """
    normalized = host.strip("[]").lower()
    if normalized in {"localhost", "127.0.0.1", "::1"}:
        return policy.allow_localhost, "localhost"
    if normalized.endswith(".local"):
        return policy.allow_dot_local, "dot_local"

    try:
        ip = ipaddress.ip_address(normalized)
    except ValueError:
        return policy.allow_public_internet, "hostname_public_or_unresolved"

    # An IPv4-mapped IPv6 literal (``::ffff:a.b.c.d``) reaches the embedded
    # IPv4 host. Some Python versions report every such address as private
    # and never as loopback, so ``::ffff:8.8.8.8`` would pass as "private_lan".
    # Classify the embedded address instead.
    if isinstance(ip, ipaddress.IPv6Address) and ip.ipv4_mapped is not None:
        ip = ip.ipv4_mapped

    if ip.is_loopback:
        return policy.allow_localhost, "localhost_ip"
    if ip.is_private:
        return policy.allow_private_lan, "private_lan"
    return policy.allow_public_internet, "public_ip"
def snapshot_workspace(root: str | Path, max_files: int = 512) -> dict:
    """Hash the files under *root* into a deterministic manifest.

    Files are visited in sorted path order; unreadable files are skipped.

    Args:
        root: Directory to walk recursively.
        max_files: Maximum number of files to include in the manifest.

    Returns:
        A dict with the resolved ``root``, ``file_count``, a ``truncated``
        flag, the ``sha256`` of the compact JSON manifest, and ``files``
        (one ``{"path", "bytes", "sha256"}`` entry per file, with paths
        relative to ``root`` in POSIX form).
    """
    base = Path(root).resolve()
    files: list[dict] = []
    # True only when files were actually left out. Comparing the final count
    # with max_files would also flag a tree holding exactly max_files files.
    truncated = False
    for path in sorted(p for p in base.rglob("*") if p.is_file()):
        if len(files) >= max_files:
            truncated = True
            break
        try:
            data = path.read_bytes()
        except OSError:
            # Best effort: a file that vanished or is unreadable is omitted.
            continue
        files.append(
            {
                "path": path.relative_to(base).as_posix(),
                "bytes": len(data),
                "sha256": _sha256_bytes(data),
            }
        )
    manifest_bytes = json.dumps(files, sort_keys=True, separators=(",", ":")).encode("utf-8")
    return {
        "root": str(base),
        "file_count": len(files),
        "truncated": truncated,
        "sha256": _sha256_bytes(manifest_bytes),
        "files": files,
    }
class _PolicyRedirectHandler(request.HTTPRedirectHandler):
    """Redirect handler that refuses targets the sandbox policy would block.

    ``urllib`` follows 3xx responses automatically; without this guard an
    allowed server could bounce the client to a host the policy forbids.
    """

    def __init__(self, policy: SandboxProxyPolicy) -> None:
        self.policy = policy
        # Absolute URL of the redirect that was refused, if any.
        self.blocked_url = None

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        """Return the follow-up request, or ``None`` if *newurl* is not allowed."""
        target = urlparse(newurl)
        host_ok, _ = _host_allowed(target.hostname or "", self.policy)
        if target.scheme not in {"http", "https"} or not host_ok:
            self.blocked_url = newurl
            # Returning None makes urllib raise HTTPError for the 3xx response
            # instead of contacting the redirect target.
            return None
        return super().redirect_request(req, fp, code, msg, headers, newurl)


class LocalNetworkProxy:
    """HTTP client with local/private-network policy and JSONL audit log.

    Every call to :meth:`request` snapshots the workspace, checks the target
    against the policy, performs the request if allowed, and appends one
    record to the ledger file under ``root``.
    """

    def __init__(
        self,
        root: str | Path,
        policy: SandboxProxyPolicy | None = None,
        ledger_name: str = "sandbox_proxy_ledger.jsonl",
    ):
        """Create the workspace directory if needed and bind the policy.

        Args:
            root: Workspace directory; created (with parents) when missing.
            policy: Policy to enforce; a default ``SandboxProxyPolicy`` when
                omitted.
            ledger_name: File name of the JSONL audit log inside ``root``.
        """
        self.root = Path(root).resolve()
        self.root.mkdir(parents=True, exist_ok=True)
        self.policy = policy or SandboxProxyPolicy()
        self.ledger_path = self.root / ledger_name

    def get(self, url: str) -> dict:
        """Issue a GET request; see :meth:`request` for the result shape."""
        return self.request("GET", url)

    def post(self, url: str, body: str = "") -> dict:
        """Issue a POST request with *body*; see :meth:`request`."""
        return self.request("POST", url, body=body)

    def request(self, method: str, url: str, body: str | bytes | None = None) -> dict:
        """Perform one policy-checked HTTP request and log it to the ledger.

        Args:
            method: HTTP method; upper-cased before use.
            url: Target URL. Only ``http``/``https`` URLs whose host the
                policy allows are contacted.
            body: Optional request body; ``str`` is encoded as UTF-8.

        Returns:
            A dict that always has ``ok``, ``method``, ``url``, ``host``,
            ``host_class``, ``status``, ``elapsed_ms`` and
            ``workspace_snapshot``. Successful requests add
            ``response_text``, ``response_bytes``, ``response_sha256`` and
            ``truncated``. Failures add ``error``: ``"proxy_policy_blocked"``
            or ``"proxy_error:<ExceptionName>"``. ``status`` is the HTTP
            status when the server answered (including 4xx/5xx and refused
            redirects), otherwise ``None``. A refused redirect also adds
            ``blocked_redirect_url``.
        """
        # Monotonic clock: wall-clock adjustments must not distort durations.
        started = time.monotonic()
        verb = method.upper()
        workspace_snapshot = snapshot_workspace(self.root, self.policy.max_snapshot_files)
        parsed = urlparse(url)
        host = parsed.hostname or ""
        host_ok, host_class = _host_allowed(host, self.policy)
        common = {"method": verb, "url": url, "host": host, "host_class": host_class}

        def elapsed_ms() -> float:
            return (time.monotonic() - started) * 1000.0

        def failure(error: str, status: int | None = None) -> dict:
            return {
                "ok": False,
                **common,
                "status": status,
                "error": error,
                "elapsed_ms": elapsed_ms(),
                "workspace_snapshot": workspace_snapshot,
            }

        if parsed.scheme not in {"http", "https"} or not host_ok:
            result = failure("proxy_policy_blocked")
            self._record(result)
            return result

        payload = body.encode("utf-8") if isinstance(body, str) else body
        req = request.Request(url, data=payload, method=verb)
        if payload is not None:
            req.add_header("Content-Type", "text/plain; charset=utf-8")

        # A dedicated opener lets redirects be checked against the policy.
        # NOTE: an opener installed globally via urllib.request.install_opener
        # is therefore not consulted.
        redirect_guard = _PolicyRedirectHandler(self.policy)
        opener = request.build_opener(redirect_guard)
        limit = self.policy.max_response_bytes
        try:
            with opener.open(req, timeout=self.policy.timeout_s) as resp:
                status = int(resp.status)
                # Read one byte past the limit to detect truncation.
                data = resp.read(limit + 1)
        except HTTPError as exc:
            # The server answered, so keep its status code in the result.
            if redirect_guard.blocked_url is not None:
                result = failure("proxy_policy_blocked", status=exc.code)
                result["blocked_redirect_url"] = redirect_guard.blocked_url
            else:
                result = failure(f"proxy_error:{type(exc).__name__}", status=exc.code)
        except Exception as exc:
            # Boundary handler: any transport failure becomes a logged result.
            result = failure(f"proxy_error:{type(exc).__name__}")
        else:
            truncated = len(data) > limit
            data = data[:limit]
            result = {
                "ok": True,
                **common,
                "status": status,
                "elapsed_ms": elapsed_ms(),
                "response_text": data.decode("utf-8", errors="replace"),
                "response_bytes": len(data),
                "response_sha256": _sha256_bytes(data),
                "truncated": truncated,
                "workspace_snapshot": workspace_snapshot,
            }
        self._record(result)
        return result

    def _record(self, result: dict[str, Any]) -> None:
        """Append one audit record for *result* to the JSONL ledger.

        Only the destination, outcome, timing, active policy and the
        workspace snapshot digest are logged; response bodies are not.
        """
        record = {
            "timestamp": time.time(),
            "policy": asdict(self.policy),
            "destination": {
                "method": result.get("method"),
                "url": result.get("url"),
                "host": result.get("host"),
                "host_class": result.get("host_class"),
                "status": result.get("status"),
            },
            "elapsed_ms": result.get("elapsed_ms"),
            "ok": result.get("ok"),
            "error": result.get("error"),
            "workspace_snapshot_sha256": result.get("workspace_snapshot", {}).get("sha256"),
        }
        with self.ledger_path.open("a", encoding="utf-8", newline="\n") as f:
            f.write(json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n")

Xet Storage Details

Size:
6.29 kB
·
Xet hash:
f2c1c5fdd35bdb329cb7a05dfd1a9bcf83dab5ff9cd8cab4c22104af9d979ff6

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.