bbkdevops's picture
download
raw
9.09 kB
#!/usr/bin/env python
from __future__ import annotations
import hashlib
import json
from itertools import product
from pathlib import Path
from typing import Any
# Root of the distillation data tree; every input and output path hangs off it.
ROOT = Path(r"D:\ad\tinymind\data\distill")

# Input records (JSON Lines).
SOURCE = ROOT.joinpath("jsonl", "apexdistill_gold.jsonl")

# Generated artifacts: the dimension registry, the per-record sparse vectors,
# the enriched copy of the input records, and the audit summary.
REGISTRY = ROOT.joinpath("manifests", "dimension_registry_10000.json")
VECTORS = ROOT.joinpath("jsonl", "dimension_vectors.jsonl")
ENRICHED = ROOT.joinpath("jsonl", "apexdistill_gold_10000d.jsonl")
AUDIT = ROOT.joinpath("manifests", "dimension_audit.json")

# Vocabularies combined (as cartesian products) into registry dimensions.
# Order matters: it determines the index each dimension receives.
DOMAINS = [
    "tool_use",
    "data_engineering",
    "reverse_engineering",
    "tool_calling",
    "thai_data",
    "systems_performance",
    "windows_reliability",
    "agent_architecture",
    "software_engineering",
    "evaluation",
]
LENSES = [
    "architect",
    "operator",
    "critic",
    "security",
    "teacher",
    "evaluator",
    "base",
]
PRESSURES = [
    "clean",
    "ambiguous",
    "overreach",
    "unsafe_pressure",
    "resource_limited",
    "base",
]
AXES = [
    "precision",
    "adversarial",
    "thai",
    "tool_schema",
    "verification",
    "compression",
    "uncertainty",
    "rollback",
    "edge_cases",
    "curriculum",
    "base",
]
TOOL_CLASSES = [
    "filesystem",
    "shell",
    "git",
    "dataset",
    "web",
    "confirm",
    "none",
]
RISK_CLASSES = [
    "low",
    "medium",
    "high",
    "license",
    "privacy",
    "admin",
    "destructive",
    "unknown",
]
FAILURE_MODES = [
    "hallucination",
    "bad_schema",
    "unsafe_action",
    "missing_tool",
    "bad_path",
    "permission_denied",
    "license_violation",
    "overclaim",
    "underverify",
    "none",
]
EVIDENCE_TYPES = [
    "command_output",
    "official_doc",
    "manifest",
    "hash",
    "schema",
    "event_log",
    "source_registry",
    "static_analysis",
    "none",
]
VERIFICATION_TYPES = [
    "schema_check",
    "hash_check",
    "read_only_command",
    "unit_test",
    "audit_manifest",
    "official_source",
    "manual_review",
    "none",
]
OUTPUT_FORMS = [
    "jsonl",
    "markdown",
    "powershell",
    "tool_call",
    "report",
    "rubric",
    "plan",
    "summary",
]

# Depth buckets "d1" through "d20".
DEPTH_LEVELS = ["d" + str(level) for level in range(1, 21)]
def stable_id(text: str) -> str:
    """Return a short, deterministic identifier for *text*.

    The identifier is the first 16 hexadecimal characters of the SHA-256
    digest of the UTF-8 encoding of *text*.
    """
    hasher = hashlib.sha256()
    hasher.update(text.encode("utf-8"))
    full_hex = hasher.hexdigest()
    return full_hex[:16]
def make_dimension(index: int, category: str, values: dict[str, str]) -> dict[str, Any]:
    """Build one registry entry.

    Args:
        index: Position of the dimension in the registry.
        category: Name of the dimension family.
        values: Mapping of value keys to the value this dimension requires.

    Returns:
        A dict with "index", "id", "category", "values" and "description".
        The description is the category followed by the ``key=value`` pairs
        in sorted key order, joined with ``|``; the id embeds the zero-padded
        index and a short hash of that description.
    """
    parts = [category]
    for name in sorted(values):
        parts.append(f"{name}={values[name]}")
    description = "|".join(parts)

    dimension: dict[str, Any] = {}
    dimension["index"] = index
    dimension["id"] = f"dim_{index:05d}_{stable_id(description)}"
    dimension["category"] = category
    dimension["values"] = values
    dimension["description"] = description
    return dimension
def build_registry() -> list[dict[str, Any]]:
    """Build the registry of exactly 10,000 dimensions.

    Candidate dimensions are generated category by category, in this order:
    core_context, tool_risk_failure, evidence_verification_output,
    domain_evidence_verification. Only the first 10,000 candidates are kept;
    each is indexed by its position in the registry.

    NOTE(review): with the vocabularies as currently defined, core_context
    yields 4620 candidates and tool_risk_failure 5600, so the cap is reached
    partway through tool_risk_failure and the last two categories contribute
    no dimensions. Confirm whether that truncation is intended.

    Raises:
        RuntimeError: If fewer than 10,000 candidates are available.
    """

    def candidates():
        # Yield (category, values) pairs in registry order.
        for domain, axis, lens, pressure in product(DOMAINS, AXES, LENSES, PRESSURES):
            yield "core_context", {"domain": domain, "axis": axis, "lens": lens, "pressure": pressure}
        for domain, tool, risk, failure in product(DOMAINS, TOOL_CLASSES, RISK_CLASSES, FAILURE_MODES):
            yield "tool_risk_failure", {"domain": domain, "tool": tool, "risk": risk, "failure": failure}
        for evidence, verify, output, depth in product(EVIDENCE_TYPES, VERIFICATION_TYPES, OUTPUT_FORMS, DEPTH_LEVELS):
            yield "evidence_verification_output", {"evidence": evidence, "verification": verify, "output": output, "depth": depth}
        for domain, evidence, verify in product(DOMAINS, EVIDENCE_TYPES, VERIFICATION_TYPES):
            yield "domain_evidence_verification", {"domain": domain, "evidence": evidence, "verification": verify}

    registry: list[dict[str, Any]] = []
    for category, values in candidates():
        if len(registry) >= 10_000:
            # Registry is full; remaining candidates are dropped.
            break
        registry.append(make_dimension(len(registry), category, values))

    if len(registry) != 10_000:
        raise RuntimeError(f"Expected 10000 dimensions, got {len(registry)}")
    return registry
def infer_tool_classes(item: dict[str, Any]) -> set[str]:
    """Guess which tool classes a record touches.

    The record is serialized to JSON and lower-cased; a tool class is
    reported when any of its marker substrings occurs anywhere in that text.

    Returns:
        The set of matching tool-class names, or ``{"none"}`` when nothing
        matches.
    """
    haystack = json.dumps(item, ensure_ascii=False).lower()
    markers_by_class = {
        "filesystem": ["filesystem.", "file", "path"],
        "shell": ["powershell", "shell.", "powercfg", "get-ciminstance"],
        "git": ["git."],
        "dataset": ["jsonl", "dataset", "audit_jsonl"],
        "web": ["web.", "official_doc", "url"],
        "confirm": ["user.confirm", "confirmation", "approval"],
    }
    hits = {
        tool_class
        for tool_class, markers in markers_by_class.items()
        if any(marker in haystack for marker in markers)
    }
    if not hits:
        return {"none"}
    return hits
def infer_values(item: dict[str, Any]) -> dict[str, set[str]]:
    """Derive, for each dimension key, the set of values a record exhibits.

    Most values are found by substring search over the lower-cased JSON
    serialization of the whole record; a few come from explicit fields
    ("domain", "inputs", "quality", "synthesis", "verification").

    Sections that are missing, JSON ``null``, or not objects are treated as
    empty, and a missing or unusable quality score is treated as 0.0, so a
    malformed record degrades to defaults instead of raising.

    Args:
        item: One decoded record.

    Returns:
        A mapping with the keys "domain", "axis", "lens", "pressure", "tool",
        "risk", "failure", "evidence", "verification", "output" and "depth",
        each holding a non-empty set of candidate values.
    """

    def section(name: str) -> dict[str, Any]:
        # A key that is present but null (or a scalar/list) must not crash
        # the ``.get`` calls below.
        value = item.get(name)
        return value if isinstance(value, dict) else {}

    inputs = section("inputs")
    quality = section("quality")
    synthesis = section("synthesis")
    verification_info = section("verification")
    text = json.dumps(item, ensure_ascii=False).lower()

    # Risk: keywords anywhere in the record take precedence over the explicit
    # field; an unrecognized explicit value collapses to "unknown".
    risk = str(inputs.get("risk", item.get("risk", "low"))).lower()
    if "privacy" in text:
        risk = "privacy"
    elif "license" in text:
        risk = "license"
    elif "admin" in text:
        risk = "admin"
    elif risk not in RISK_CLASSES:
        risk = "unknown"

    # NOTE(review): vocabulary entries are searched with underscores turned
    # into spaces, so a snake_case token such as "bad_schema" in the record
    # does not match -- confirm this is intended.
    failures = {
        failure
        for failure in FAILURE_MODES
        if failure != "none" and failure.replace("_", " ") in text
    } or {"none"}

    # "manifest" and "schema" contain no underscore, so this search already
    # covers them; no separate checks are needed.
    evidence = {
        ev
        for ev in EVIDENCE_TYPES
        if ev != "none" and ev.replace("_", " ") in text
    } or {"none"}

    verification = {
        vt
        for vt in VERIFICATION_TYPES
        if vt != "none" and vt.replace("_", " ") in text
    }
    # Any recorded pass/fail result counts as a schema check.
    if verification_info.get("passes") is not None:
        verification.add("schema_check")
    if not verification:
        verification.add("none")

    output = {form for form in OUTPUT_FORMS if form in text}
    if synthesis.get("recommended_tool_calls"):
        output.add("tool_call")
    if not output:
        output.add("summary")

    # Depth bucket: score * 20 truncated and clamped to 1..20. The comparisons
    # are arranged so NaN falls through to the lowest bucket and infinities
    # never reach int().
    try:
        score = float(quality.get("score", 0.0))
    except (TypeError, ValueError):
        score = 0.0
    if score >= 1.0:
        depth_idx = 20
    elif score > 0.0:
        depth_idx = max(int(score * 20), 1)
    else:
        depth_idx = 1

    return {
        "domain": {item.get("domain", "evaluation")},
        "axis": {inputs.get("distill_axis", "base")},
        "lens": {inputs.get("lens", "base")},
        "pressure": {inputs.get("pressure", "base")},
        "tool": infer_tool_classes(item),
        "risk": {risk},
        "failure": failures,
        "evidence": evidence,
        "verification": verification,
        "output": output,
        "depth": {f"d{depth_idx}"},
    }
def encode(item: dict[str, Any], dims: list[dict[str, Any]]) -> dict[str, float]:
    """Encode a record as a sparse vector over the registry dimensions.

    A dimension is active (value 1.0) when every value it requires is among
    the values inferred for the record under the same key.

    Args:
        item: One decoded record.
        dims: Registry entries, each with "index", "category" and "values".

    Returns:
        A mapping from dimension index (as a string) to 1.0 for each active
        dimension. If nothing matches, the first core_context dimension with
        axis "base" for the record's inferred domain is activated as an
        anchor; the result is empty only when no such dimension exists.
    """
    values = infer_values(item)

    sparse: dict[str, float] = {
        str(dim["index"]): 1.0
        for dim in dims
        if all(val in values.get(key, ()) for key, val in dim["values"].items())
    }
    if sparse:
        return sparse

    # Fallback anchor. Use the inferred domain rather than the raw field so a
    # record without a "domain" key is anchored to the same default that
    # infer_values applied, instead of silently producing an empty vector.
    # NOTE(review): this takes the first axis="base" dimension in registry
    # order, whose lens and pressure are not necessarily "base" -- confirm
    # that is the intended anchor.
    domains = values.get("domain", set())
    for dim in dims:
        required = dim["values"]
        if (
            dim["category"] == "core_context"
            and required.get("axis") == "base"
            and required.get("domain") in domains
        ):
            sparse[str(dim["index"])] = 1.0
            break
    return sparse
def main() -> int:
    """Build the registry, encode every source record, and write all artifacts.

    Writes the registry JSON, one vector record and one enriched record per
    source record (JSON Lines), and an audit summary that includes SHA-256
    hashes of the three generated data files. The audit is also printed.

    Returns:
        0 on success.
    """
    dims = build_registry()
    dimension_count = len(dims)

    # Make sure the output directories exist before anything is written.
    for target in (REGISTRY, VECTORS, ENRICHED, AUDIT):
        target.parent.mkdir(parents=True, exist_ok=True)

    REGISTRY.write_text(json.dumps({"dimensions": dims}, indent=2, ensure_ascii=False), encoding="utf-8")

    # Iterate the file rather than calling str.splitlines(): splitlines also
    # breaks on U+2028, U+2029 and U+0085, which may appear unescaped inside
    # JSON strings and would cut a record in half. All records are parsed
    # before the outputs are opened so a malformed line fails early.
    with SOURCE.open("r", encoding="utf-8") as src:
        records = [json.loads(line) for line in src if line.strip()]

    active_counts: list[int] = []
    used_dims: set[str] = set()
    with VECTORS.open("w", encoding="utf-8") as vf, ENRICHED.open("w", encoding="utf-8") as ef:
        for item in records:
            vector = encode(item, dims)
            active_counts.append(len(vector))
            used_dims.update(vector)
            vector_record = {
                "id": item["id"],
                "dimension_count": dimension_count,
                "active_dimensions": vector,
            }
            enriched = {
                **item,
                "dimension_vector": {
                    "dimension_count": dimension_count,
                    "active_dimensions": vector,
                },
            }
            vf.write(json.dumps(vector_record, ensure_ascii=False) + "\n")
            ef.write(json.dumps(enriched, ensure_ascii=False) + "\n")

    audit = {
        "dimension_count": dimension_count,
        "records": len(records),
        "used_dimension_count": len(used_dims),
        # max(..., 1) avoids division by zero when there are no records.
        "avg_active_dimensions": round(sum(active_counts) / max(len(active_counts), 1), 3),
        "min_active_dimensions": min(active_counts) if active_counts else 0,
        "max_active_dimensions": max(active_counts) if active_counts else 0,
        "registry": str(REGISTRY),
        "vectors": str(VECTORS),
        "enriched": str(ENRICHED),
        "registry_sha256": hashlib.sha256(REGISTRY.read_bytes()).hexdigest(),
        "vectors_sha256": hashlib.sha256(VECTORS.read_bytes()).hexdigest(),
        "enriched_sha256": hashlib.sha256(ENRICHED.read_bytes()).hexdigest(),
    }
    AUDIT.write_text(json.dumps(audit, indent=2, ensure_ascii=False), encoding="utf-8")
    print(json.dumps(audit, indent=2, ensure_ascii=False))
    return 0
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())

Xet Storage Details

Size:
9.09 kB
·
Xet hash:
6a2d46440c52502d3353b94034c7c72f06450bde057bd6fa76ae6dadf84d6a8c

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.