bbkdevops's picture
download
raw
7.04 kB
"""Extract greedy/bloated records before high-purity training.
"Greedy" records consume token budget without adding proportional knowledge:
repetition, prompt stuffing, secrets, overlong low-density text, or a dominant
domain overeating the mix.
"""
from __future__ import annotations
from collections import Counter
import json
from hashlib import sha256
from pathlib import Path
import re
from typing import Any, Iterable
# Credential-shaped tokens that must never reach a training set.
# `hf_` and `ghp_` tokens use an underscore after the prefix. `sk` and `glpat`
# keys are commonly issued with a hyphen instead (`sk-...`, `sk-or-v1-...`,
# `glpat-...`), so either separator is accepted for those prefixes. The body
# must be at least 20 characters of [A-Za-z0-9_-] to avoid flagging short
# identifiers.
SECRET_RE = re.compile(
    r"\b(?:(?:hf|ghp)_|(?:sk|sk-or|glpat)[_-])[A-Za-z0-9_\-]{20,}\b"
)

# Lower-case phrases that mark a record as prompt stuffing. They are compared
# against lower-cased, whitespace-collapsed record text.
PROMPT_STUFFING_MARKERS = (
    "ignore previous instruction",
    "ignore all previous",
    "developer mode",
    "jailbreak",
    "system prompt",
)
def _record_text(record: dict[str, Any]) -> str:
if isinstance(record.get("messages"), list):
parts = []
for message in record["messages"]:
if isinstance(message, dict):
parts.append(str(message.get("content", "")))
return "\n".join(parts)
return str(
record.get("text")
or record.get("prompt")
or record.get("instruction")
or record.get("question")
or ""
) + "\n" + str(record.get("completion") or record.get("answer") or record.get("response") or record.get("output") or "")
def _domain(record: dict[str, Any]) -> str:
return str(record.get("domain") or record.get("category") or "general")
class DataGreedExtractor:
    """Split JSONL training records into a "pure" set and a "greed" quarantine.

    A record is quarantined when it looks like a secret, contains a
    prompt-stuffing marker, is too short, is lexically repetitive, is long with
    low lexical diversity, is mostly a bullet list, duplicates the text of an
    already kept record, or would push its domain above ``max_domain_share``
    of the kept records.

    Args:
        max_chars: Stripped length above which the long-text density check
            applies.
        max_domain_share: Largest fraction of kept records one domain may hold.
        min_unique_ratio: Minimum unique-word / total-word ratio for any record.
        min_chars: Minimum stripped length of a record.
        long_text_min_unique_ratio: Minimum unique-word ratio required of
            records longer than ``max_chars``.
    """

    # Word tokenizer for the lexical-diversity check: word characters, hyphens
    # and the Thai range ก-๙.
    _WORD_RE = re.compile(r"[\wก-๙-]+")
    # A line that opens with bullet characters or digits, an optional "." or
    # ")", and then whitespace.
    _BULLET_RE = re.compile(r"^[-*•\d]+[.)]?\s+")

    def __init__(
        self,
        *,
        max_chars: int = 12_000,
        max_domain_share: float = 0.35,
        min_unique_ratio: float = 0.32,
        min_chars: int = 32,
        long_text_min_unique_ratio: float = 0.55,
    ):
        self.max_chars = int(max_chars)
        self.max_domain_share = float(max_domain_share)
        self.min_unique_ratio = float(min_unique_ratio)
        self.min_chars = int(min_chars)
        self.long_text_min_unique_ratio = float(long_text_min_unique_ratio)

    def filter(self, sources: Iterable[str | Path], out_dir: str | Path) -> dict[str, Any]:
        """Scan JSONL sources and write the pure set, quarantine and manifest.

        Three files are written under ``out_dir`` (created if needed):
        ``anti_greed_pure.jsonl`` (kept records, unchanged),
        ``greed_quarantine.jsonl`` (one summary row per rejected record) and
        ``data_greed_manifest.json`` (the returned report).

        Missing source files and lines that are not valid JSON are only tallied
        in ``greed_counts``; they produce no quarantine row. A JSON value that
        is not an object is wrapped as ``{"text": str(value)}``.

        Args:
            sources: Paths of JSONL files to scan. Any iterable is accepted,
                including a one-shot iterator.
            out_dir: Directory that receives the three output files.

        Returns:
            The manifest dictionary that was written to disk.
        """
        out = Path(out_dir)
        out.mkdir(parents=True, exist_ok=True)
        pure_path = out / "anti_greed_pure.jsonl"
        greed_path = out / "greed_quarantine.jsonl"
        manifest_path = out / "data_greed_manifest.json"

        # Materialise once: `sources` is needed again for the manifest, and a
        # generator would already be exhausted by then.
        source_paths = [Path(path) for path in sources]

        pure_rows: list[dict[str, Any]] = []
        greed_rows: list[dict[str, Any]] = []
        greed_counts: Counter[str] = Counter()
        domain_counts: Counter[str] = Counter()
        seen_hashes: set[str] = set()
        scanned = 0

        for source in source_paths:
            if not source.exists():
                greed_counts["missing_source"] += 1
                continue
            # Iterating the file handle splits on newline characters only.
            # str.splitlines() would also split on U+2028, U+2029, form feed
            # and similar, which may appear unescaped inside JSON strings and
            # would cut a valid record into invalid fragments. "utf-8-sig"
            # drops a leading BOM that would otherwise make the first record
            # undecodable.
            with source.open(encoding="utf-8-sig", errors="ignore") as handle:
                for line_no, line in enumerate(handle, start=1):
                    if not line.strip():
                        continue
                    scanned += 1
                    try:
                        record = json.loads(line)
                    except json.JSONDecodeError:
                        greed_counts["invalid_json"] += 1
                        continue
                    if not isinstance(record, dict):
                        record = {"text": str(record)}

                    text = _record_text(record)
                    domain = _domain(record)
                    digest = sha256(text.encode("utf-8", errors="ignore")).hexdigest()

                    # Content checks run first, then exact-duplicate
                    # detection, then the domain cap; the first hit wins.
                    reason = self._greed_reason(text)
                    if not reason and digest in seen_hashes:
                        reason = "duplicate_memory"
                    if not reason and self._domain_overeats(domain, domain_counts, len(pure_rows) + 1):
                        reason = "domain_overeating"

                    if reason:
                        greed_counts[reason] += 1
                        greed_rows.append(
                            {
                                "reason": reason,
                                "domain": domain,
                                "source_file": str(source),
                                "line": line_no,
                                "sha256": digest,
                                "preview": text[:280],
                            }
                        )
                        continue

                    # Only kept records feed the duplicate set and the domain
                    # tallies.
                    seen_hashes.add(digest)
                    domain_counts[domain] += 1
                    pure_rows.append(record)

        self._write_jsonl(pure_path, pure_rows)
        self._write_jsonl(greed_path, greed_rows)

        report = {
            "schema_version": "tinymind-data-greed-extractor-v1",
            "sources": [str(path) for path in source_paths],
            "scanned_records": scanned,
            "kept_records": len(pure_rows),
            "greedy_records": len(greed_rows),
            "greed_counts": dict(sorted(greed_counts.items())),
            "domain_counts": dict(sorted(domain_counts.items())),
            "pure_output_jsonl": str(pure_path),
            "greed_quarantine_jsonl": str(greed_path),
            "manifest_path": str(manifest_path),
            "claim_gate": {
                "anti_greed_filter_applied": True,
                "pure_training_input_ready": bool(pure_rows),
                "greed_quarantine_not_trainable": True,
                "raw_memory_replay_allowed": False,
                "world_best_claim_allowed": False,
            },
        }
        manifest_path.write_text(
            json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True),
            encoding="utf-8",
        )
        return report

    @staticmethod
    def _write_jsonl(path: Path, rows: list[dict[str, Any]]) -> None:
        """Write ``rows`` as key-sorted JSON lines; an empty list gives an empty file."""
        body = "\n".join(json.dumps(row, ensure_ascii=False, sort_keys=True) for row in rows)
        path.write_text(body + ("\n" if rows else ""), encoding="utf-8")

    def _greed_reason(self, text: str) -> str | None:
        """Return the first content-based rejection reason for ``text``, or None.

        Checks run in a fixed order: secret, prompt stuffing, minimum length,
        repetition, long low-density text, list bloat.
        """
        stripped = text.strip()
        # Collapse whitespace so markers still match across line breaks.
        lowered = " ".join(stripped.lower().split())
        if SECRET_RE.search(stripped):
            return "secret_like"
        if any(marker in lowered for marker in PROMPT_STUFFING_MARKERS):
            return "prompt_stuffing"
        if len(stripped) < self.min_chars:
            return "too_small_to_extract"
        # Tokenising a long record is the costly step, so do it once and reuse
        # the ratio for both density checks.
        unique_ratio = self._unique_ratio(stripped)
        if unique_ratio < self.min_unique_ratio:
            return "repetition_bloat"
        if len(stripped) > self.max_chars and unique_ratio < self.long_text_min_unique_ratio:
            return "long_low_density_bloat"
        if self._list_bloat(stripped):
            return "list_bloat"
        return None

    def _domain_overeats(self, domain: str, domain_counts: Counter[str], next_total: int) -> bool:
        """Tell whether keeping one more ``domain`` record breaks the share cap.

        Args:
            domain: Domain label of the candidate record.
            domain_counts: Kept-record counts per domain so far.
            next_total: Number of kept records if the candidate is accepted.

        Returns:
            True when the domain's share would exceed ``max_domain_share``.
            The first record of a domain is always allowed.
        """
        # NOTE: the cap is applied from the very first records. With the
        # default share of 0.35, a domain's second record is accepted only
        # once at least five records are already kept (2 / 6 <= 0.35).
        next_count = domain_counts[domain] + 1
        return (next_count / next_total) > self.max_domain_share and next_count > 1

    @staticmethod
    def _unique_ratio(text: str) -> float:
        """Return unique words divided by total words, or 0.0 when there are none."""
        words = DataGreedExtractor._WORD_RE.findall(text.lower())
        if not words:
            return 0.0
        return len(set(words)) / len(words)

    @staticmethod
    def _list_bloat(text: str) -> bool:
        """Tell whether ``text`` is mostly list items.

        Only texts with at least 24 non-blank lines are considered; the text is
        list bloat when more than 75% of those lines open with a bullet or
        number marker.
        """
        lines = [line.strip() for line in text.splitlines() if line.strip()]
        if len(lines) < 24:
            return False
        bullet_lines = sum(1 for line in lines if DataGreedExtractor._BULLET_RE.match(line))
        return bullet_lines / len(lines) > 0.75

Xet Storage Details

Size:
7.04 kB
·
Xet hash:
eca1170ced4f9c12c62c10d3d92871aa397e9daa0a9a7ff83f5f5623a2c4cdb6

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.