linvest21's picture
download
raw
19.8 kB
from __future__ import annotations
import hashlib
import json
import re
import shutil
import urllib.request
import os
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import urlparse
from n21.config import load_structured, write_json
from n21.settings import CONFIG_ROOT, REPO_ROOT
from observability.audit_log import utc_now
from data_pipeline.source_normalization import normalize_source
from data_pipeline.source_quality_certifier import certify_normalized_source_content
def sha256_file(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at ``path``.

    The file is consumed in 1 MiB chunks so that large downloads are never
    loaded into memory in one piece.
    """
    chunk_size = 1024 * 1024
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            chunk = stream.read(chunk_size)
            if not chunk:
                # An empty read means end of file.
                break
            hasher.update(chunk)
    return hasher.hexdigest()
def safe_name(value: str, fallback: str = "source") -> str:
    """Reduce ``value`` to a conservative file-name fragment.

    Every run of characters outside ``A-Z a-z 0-9 . _ -`` becomes a single
    underscore, leading/trailing dots and underscores are removed, and the
    result is capped at 120 characters. ``fallback`` is returned when nothing
    usable is left.
    """
    collapsed = re.sub(r"[^A-Za-z0-9._-]+", "_", value)
    trimmed = collapsed.strip("._")[:120]
    if trimmed:
        return trimmed
    return fallback
def load_catalog(path: Path | None = None) -> dict[str, Any]:
    """Read the public source catalog as parsed JSON.

    Args:
        path: Catalog file to read. When omitted, the file
            ``data/public_source_catalog.json`` under ``CONFIG_ROOT`` is used.

    Returns:
        The decoded JSON document.
    """
    if path:
        catalog_path = path
    else:
        catalog_path = CONFIG_ROOT / "data" / "public_source_catalog.json"
    raw_text = catalog_path.read_text(encoding="utf-8")
    return json.loads(raw_text)
def load_policy(path: Path | None = None) -> dict[str, Any]:
    """Load the source policy through ``load_structured``.

    Args:
        path: Policy file to read. When omitted, the file
            ``data/source_policy.yaml`` under ``CONFIG_ROOT`` is used.
    """
    policy_path = path if path else CONFIG_ROOT / "data" / "source_policy.yaml"
    return load_structured(policy_path)
def license_status(text: str, url: str = "") -> str:
    """Classify a source's licence from a free-text hint and its URL.

    ``text`` and ``url`` are joined, lower-cased and scanned for marker
    substrings. Rules are evaluated in order and the first rule with a
    matching marker decides the result, so the restrictive categories
    (credentialed access, no-derivatives, non-commercial, download
    prohibitions) are listed before the permissive ones.

    Args:
        text: Licence hint, e.g. a catalog ``license_hint`` value.
        url: Source URL; it is searched with the same markers.

    Returns:
        One of ``"credentialed"``, ``"no_derivatives"``, ``"non_commercial"``,
        ``"restricted_download"``, ``"public_domain"``, ``"government_public"``,
        ``"permissive"``, ``"copyright_notice"`` or ``"unknown"``.
    """
    haystack = f"{text}\n{url}".lower()
    rules = (
        ("credentialed", ("login", "signin", "checkout", "paywall", "subscription required")),
        # The hyphenated / compact NoDerivatives spellings must be caught here:
        # they also contain "cc-by" or "creative commons attribution" and would
        # otherwise fall through to the permissive rule further down.
        (
            "no_derivatives",
            ("no derivatives", "nd license", "cc-by-nd", "no-derivatives", "noderivatives", "noderivs"),
        ),
        ("non_commercial", ("non-commercial", "noncommercial", "cc-by-nc")),
        ("restricted_download", ("do not download", "no downloading", "download is prohibited")),
        ("public_domain", ("public domain",)),
        ("government_public", ("government public", "sec.gov", "federalreserve.gov", ".gov/")),
        ("permissive", ("creative commons attribution", "cc-by", "apache license", "mit license")),
        ("copyright_notice", ("copyright", "all rights reserved")),
    )
    for status, markers in rules:
        if any(marker in haystack for marker in markers):
            return status
    return "unknown"
def source_gate(status: str, policy: dict[str, Any]) -> dict[str, Any]:
    """Translate a licence status into a gate decision using ``policy``.

    The policy lists are consulted in priority order: approved, then blocked,
    then review. A status that appears on no list gets the same outcome as a
    review-listed one. Only an approved status permits training, and every
    other outcome is flagged for human review.

    Returns:
        A dict with ``license_status``, ``decision``, ``train_allowed`` and
        ``requires_human_review``.
    """
    tiers = (
        (set(policy.get("approved_license_statuses", [])), "approved_for_training", True),
        (set(policy.get("blocked_license_statuses", [])), "blocked", False),
        (set(policy.get("review_license_statuses", [])), "downloaded_for_review_not_training", False),
    )
    # Outcome for a status that is on none of the lists.
    decision, train_allowed = "downloaded_for_review_not_training", False
    for members, tier_decision, tier_allowed in tiers:
        if status in members:
            decision, train_allowed = tier_decision, tier_allowed
            break
    return {
        "license_status": status,
        "decision": decision,
        "train_allowed": train_allowed,
        "requires_human_review": not train_allowed,
    }
def apply_ai_certification_gate(record: dict[str, Any], gate: dict[str, Any], policy: dict[str, Any]) -> dict[str, Any]:
    """Tighten a licence gate with the record's AI source certification.

    ``gate`` is modified in place and also returned.

    * When the policy's ``source_quality_certification.required_before_download``
      is falsy, verification simply mirrors the gate's ``train_allowed``.
    * A missing or empty ``ai_certification`` on the record blocks the source.
    * A licence-approved source certified for training keeps its decision, and
      verification follows the certificate's ``verification_eligible``.
    * A licence-approved source certified only for verification is downgraded
      to verification-only.
    * Everything else is blocked and flagged for human review.
    """
    certification = record.get("ai_certification") or {}
    certification_policy = policy.get("source_quality_certification", {})
    if not certification_policy.get("required_before_download", True):
        gate["verification_allowed"] = bool(gate.get("train_allowed"))
        return gate

    if not certification:
        outcome = {
            "decision": "blocked_missing_ai_source_certification",
            "train_allowed": False,
            "verification_allowed": False,
            "requires_human_review": True,
        }
    else:
        can_train = bool(certification.get("training_eligible"))
        can_verify = bool(certification.get("verification_eligible"))
        license_ok = gate.get("train_allowed")
        if license_ok and can_train:
            # Decision and train_allowed from the licence gate stay as they are.
            gate["verification_allowed"] = can_verify
            return gate
        if license_ok and can_verify:
            outcome = {
                "decision": "downloaded_for_verification_not_training",
                "train_allowed": False,
                "verification_allowed": True,
                "requires_human_review": False,
            }
        else:
            outcome = {
                "decision": "blocked_ai_certification_not_training_or_verification",
                "train_allowed": False,
                "verification_allowed": False,
                "requires_human_review": True,
            }
    gate.update(outcome)
    return gate
def _download(url: str, output_path: Path, *, max_bytes: int) -> tuple[int, str | None]:
    """Fetch ``url`` into ``output_path`` while enforcing a size ceiling.

    ``file:`` URLs are copied from the local filesystem. Any other URL is
    requested through ``urllib`` with a 30 second timeout and a User-Agent
    taken from the ``SHFT_HTTP_USER_AGENT`` environment variable (a built-in
    default is used when it is unset).

    Args:
        url: Location to fetch.
        output_path: Destination file.
        max_bytes: Largest payload accepted.

    Returns:
        ``(bytes_written, content_type)``. The content type is ``None`` for
        ``file:`` URLs and otherwise the response's ``content-type`` header.

    Raises:
        ValueError: The source is larger than ``max_bytes``. Nothing is
            written to ``output_path`` in that case.
    """
    parts = urlparse(url)
    if parts.scheme == "file":
        local_source = Path(urllib.request.url2pathname(parts.path))
        if local_source.stat().st_size > max_bytes:
            raise ValueError(f"source exceeds max_download_bytes: {local_source}")
        shutil.copyfile(local_source, output_path)
        return output_path.stat().st_size, None

    default_agent = "Linvest21-SHFT-source-intake/1.0 (public-source transparency manifest; contact=linvest21)"
    headers = {
        "User-Agent": os.environ.get("SHFT_HTTP_USER_AGENT", default_agent),
        "Accept": "text/html,application/pdf,text/plain,application/json,*/*;q=0.8",
    }
    request = urllib.request.Request(url, headers=headers)
    with urllib.request.urlopen(request, timeout=30) as response:
        # Ask for one byte beyond the limit so an oversized body is detectable
        # without reading it in full.
        payload = response.read(max_bytes + 1)
        if len(payload) > max_bytes:
            raise ValueError(f"download exceeds max_download_bytes: {url}")
        output_path.write_bytes(payload)
        content_type = response.headers.get("content-type")
    return output_path.stat().st_size, content_type
def select_sources(
    *,
    asset_class: str,
    role: str,
    catalog: dict[str, Any],
    max_sources: int,
) -> list[dict[str, Any]]:
    """Return catalog entries that apply to ``asset_class`` and ``role``.

    An entry applies when its ``asset_class`` equals the requested one or is
    ``"all"``, and likewise for ``role``. Entries whose URL (trimmed,
    case-insensitive) repeats an earlier selected entry are dropped; entries
    with no URL are always kept. Catalog order is preserved.

    ``max_sources`` is accepted for interface compatibility but is not
    consulted here: every applicable entry is returned.
    """
    chosen: list[dict[str, Any]] = []
    urls_taken: set[str] = set()
    for entry in catalog.get("sources", []):
        applies = entry.get("asset_class") in {asset_class, "all"} and entry.get("role") in {role, "all"}
        if not applies:
            continue
        normalized_url = str(entry.get("url", "")).strip().lower()
        if normalized_url:
            if normalized_url in urls_taken:
                continue
            urls_taken.add(normalized_url)
        chosen.append(entry)
    return chosen
def _existing_training_urls(training_dir: Path) -> set[str]:
    """Collect the source URLs already present in ``training_dir``.

    Every ``*.normalized.json`` file directly inside ``training_dir`` is read
    and its ``source_url`` value is recorded, trimmed and lower-cased. The scan
    is best effort: a file that cannot be read, is not valid UTF-8, is not
    valid JSON, or whose top-level value is not a JSON object is skipped
    instead of aborting the whole scan.

    Args:
        training_dir: Directory holding promoted normalized documents.

    Returns:
        The set of normalized URLs; empty when the directory does not exist.
    """
    urls: set[str] = set()
    if not training_dir.exists():
        return urls
    for path in training_dir.glob("*.normalized.json"):
        try:
            # utf-8-sig tolerates a leading byte-order mark.
            item = json.loads(path.read_text(encoding="utf-8-sig"))
        except (OSError, UnicodeDecodeError, json.JSONDecodeError):
            continue
        if not isinstance(item, dict):
            # Valid JSON that is not an object (list, string, number, ...)
            # has no source_url to offer.
            continue
        url = str(item.get("source_url") or "").strip().lower()
        if url:
            urls.add(url)
    return urls
def intake_public_sources(
    *,
    asset_class: str,
    role: str,
    catalog_path: Path | None = None,
    policy_path: Path | None = None,
    max_sources: int | None = None,
    promote_approved: bool | None = None,
    intake_root: Path | None = None,
    training_root: Path | None = None,
    quality_errors: list[str] | None = None,
) -> dict[str, Any]:
    """Download, gate, normalize and file catalog sources for one asset class and role.

    For every source returned by ``select_sources`` a record is built and the
    source is, in order: skipped if its URL is already in the training
    directory, skipped if the policy disables automatic downloads, rejected if
    its URL scheme or URL text is disallowed, and otherwise downloaded, gated
    by licence status and AI certification, normalized, content-certified and
    copied into the matching intake sub-directories. Two JSON files
    (``source_intake_manifest.json`` and ``license_manifest.json``) are written
    into the intake directory.

    Args:
        asset_class: Asset class to select sources for; also a path component
            of the intake and training directories.
        role: Role to select sources for; also a path component.
        catalog_path: Catalog file; ``load_catalog`` applies its default when
            this is ``None``.
        policy_path: Policy file; ``load_policy`` applies its default when
            this is ``None``.
        max_sources: Number of sources copied into the training directory
            after which the loop stops. A falsy value (``None`` or ``0``) is
            replaced by the policy's
            ``stall_breakout.max_new_sources_per_round`` (default 10).
        promote_approved: Whether training-eligible documents are copied into
            the training directory. ``None`` defers to the policy key
            ``promote_approved_sources_to_training`` (default ``True``).
        intake_root: Base of the intake tree; defaults to
            ``REPO_ROOT / "data" / "learning_intake"``.
        training_root: Base of the training tree; defaults to
            ``REPO_ROOT / "data" / "learning"``.
        quality_errors: Stored in the policy copy under ``_quality_errors`` and
            passed to ``certify_normalized_source_content``.

    Returns:
        The manifest dict that was written to ``source_intake_manifest.json``.
    """
    # Work on a shallow copy so the extra "_quality_errors" key is not added to
    # the mapping returned by load_policy.
    policy = dict(load_policy(policy_path))
    policy["_quality_errors"] = list(quality_errors or [])
    catalog = load_catalog(catalog_path)
    max_sources = int(max_sources or policy.get("stall_breakout", {}).get("max_new_sources_per_round", 10))
    auto_download = bool(policy.get("auto_download_public_sources", True))
    promote_approved = bool(
        policy.get("promote_approved_sources_to_training", True) if promote_approved is None else promote_approved
    )
    # Download risk limits from the policy's "source_risk" section.
    allowed_schemes = set(policy.get("source_risk", {}).get("allowed_schemes", ["https"]))
    disallowed_hints = [str(item).lower() for item in policy.get("source_risk", {}).get("disallowed_url_hints", [])]
    max_bytes = int(policy.get("source_risk", {}).get("max_download_bytes", 50_000_000))
    # Intake tree: <intake_root>/<asset_class>/<role>/{raw,normalized,...}.
    intake_dir = (intake_root or (REPO_ROOT / "data" / "learning_intake")) / asset_class / role
    raw_dir = intake_dir / "raw"
    normalized_dir = intake_dir / "normalized"
    approved_dir = intake_dir / "approved_for_training"
    validation_dir = intake_dir / "approved_for_validation"
    review_dir = intake_dir / "review_required"
    raw_dir.mkdir(parents=True, exist_ok=True)
    normalized_dir.mkdir(parents=True, exist_ok=True)
    approved_dir.mkdir(parents=True, exist_ok=True)
    validation_dir.mkdir(parents=True, exist_ok=True)
    review_dir.mkdir(parents=True, exist_ok=True)
    # The training directory is only created later, when a document is promoted.
    training_dir = (training_root or (REPO_ROOT / "data" / "learning")) / asset_class / role
    existing_training_urls = _existing_training_urls(training_dir)
    records: list[dict[str, Any]] = []
    for index, source in enumerate(select_sources(asset_class=asset_class, role=role, catalog=catalog, max_sources=max_sources), start=1):
        url = str(source.get("url", ""))
        parsed = urlparse(url)
        title = str(source.get("title") or f"source_{index}")
        record: dict[str, Any] = {
            "asset_class": asset_class,
            "role": role,
            "title": title,
            "url": url,
            "source_type": source.get("source_type", "unknown"),
            "rationale": source.get("rationale"),
            "license_hint": source.get("license_hint", ""),
            "ai_certification": source.get("ai_certification"),
            "retrieved_at": utc_now(),
        }
        # Skip sources whose URL is already present in the training directory.
        url_key = url.strip().lower()
        if url_key and url_key in existing_training_urls:
            record.update(
                {
                    "downloaded": False,
                    "decision": "already_promoted_to_training",
                    "train_allowed": False,
                    "requires_human_review": False,
                    "already_in_training": True,
                }
            )
            records.append(record)
            continue
        # Policy switch: record the source but do not fetch anything.
        if not auto_download:
            record.update(
                {
                    "downloaded": False,
                    "decision": "download_skipped_by_policy",
                    "train_allowed": False,
                    "requires_human_review": True,
                }
            )
            records.append(record)
            continue
        # The gate outcome for "blocked_scheme" depends on which policy list,
        # if any, contains that status (see source_gate).
        if parsed.scheme not in allowed_schemes:
            status = "blocked_scheme"
            record.update({"downloaded": False, "error": f"scheme not allowed: {parsed.scheme}", **source_gate(status, policy)})
            records.append(record)
            continue
        # URLs containing a disallowed hint are gated as "credentialed".
        if any(hint in url.lower() for hint in disallowed_hints):
            status = "credentialed"
            record.update({"downloaded": False, "error": "url contains disallowed access hint", **source_gate(status, policy)})
            records.append(record)
            continue
        # Raw file name: zero-padded position, sanitized title, and the URL
        # path's suffix (".html" when the path has none).
        suffix = Path(parsed.path).suffix or ".html"
        filename = f"{index:03d}_{safe_name(title)}{suffix}"
        raw_path = raw_dir / filename
        try:
            byte_count, content_type = _download(url, raw_path, max_bytes=max_bytes)
            # Licence classification uses the catalog hint and the URL only,
            # not the downloaded content.
            status = license_status(str(source.get("license_hint", "")), url)
            gate = apply_ai_certification_gate(record, source_gate(status, policy), policy)
            record.update(
                {
                    "downloaded": True,
                    "raw_path": str(raw_path),
                    "sha256": sha256_file(raw_path),
                    "bytes": byte_count,
                    "content_type": content_type,
                    **gate,
                }
            )
            if gate["train_allowed"] or gate.get("verification_allowed"):
                # normalize_source is a project helper; its result is read with
                # .get(), so it is presumably a dict - confirm against the helper.
                normalized = normalize_source(raw_path=raw_path, record=record, policy=policy, output_dir=normalized_dir)
                record["normalized"] = {
                    "path": normalized.get("normalized_path"),
                    "format": normalized.get("format"),
                    "ok": normalized.get("ok"),
                    "quality": normalized.get("quality", {}),
                    "eligibility": normalized.get("eligibility", {}),
                }
                normalized_path = Path(str(normalized.get("normalized_path"))) if normalized.get("normalized_path") else None
                content_certification = None
                if normalized.get("ok") and normalized.get("text"):
                    # Second certification pass, this time on the normalized text.
                    content_certification = certify_normalized_source_content(
                        asset_class=asset_class,
                        role=role,
                        title=title,
                        url=url,
                        source_type=str(record.get("source_type") or ""),
                        text=str(normalized.get("text") or ""),
                        quality_errors=policy.get("_quality_errors", []),
                        policy=policy,
                    )
                    record["content_ai_certification"] = content_certification
                    normalized["content_ai_certification"] = content_certification
                    normalized.setdefault("quality", {})["content_ai_score"] = content_certification.get("score")
                    normalized.setdefault("quality", {})["content_reasoning_density"] = content_certification.get("metrics", {}).get(
                        "reasoning_density"
                    )
                    # Content certification can only withdraw eligibility, never grant it.
                    if content_certification.get("training_eligible") is not True:
                        normalized.setdefault("eligibility", {})["training"] = False
                        normalized.setdefault("eligibility", {}).setdefault("reasons", []).append(
                            f"content_ai_not_training:{content_certification.get('intended_use')}"
                        )
                    if content_certification.get("verification_eligible") is not True and content_certification.get("training_eligible") is not True:
                        normalized.setdefault("eligibility", {})["validation"] = False
                        normalized.setdefault("eligibility", {}).setdefault("reasons", []).append(
                            f"content_ai_not_validation:{content_certification.get('intended_use')}"
                        )
                    # Persist the updated document and refresh the record's summary.
                    # NOTE(review): nesting reconstructed from a listing without
                    # indentation; confirm these two statements belong inside the
                    # content-certification branch.
                    if normalized_path:
                        write_json(normalized_path, normalized)
                    record["normalized"] = {
                        "path": normalized.get("normalized_path"),
                        "format": normalized.get("format"),
                        "ok": normalized.get("ok"),
                        "quality": normalized.get("quality", {}),
                        "eligibility": normalized.get("eligibility", {}),
                    }
                # Copy the normalized document wherever both the gate and the
                # document's own eligibility flags allow.
                if normalized_path and gate.get("train_allowed") and normalized.get("eligibility", {}).get("training"):
                    approved_path = approved_dir / normalized_path.name
                    shutil.copyfile(normalized_path, approved_path)
                    record["approved_copy_path"] = str(approved_path)
                if normalized_path and gate.get("verification_allowed") and normalized.get("eligibility", {}).get("validation"):
                    validation_path = validation_dir / normalized_path.name
                    shutil.copyfile(normalized_path, validation_path)
                    record["validation_copy_path"] = str(validation_path)
                if promote_approved and normalized_path and gate.get("train_allowed") and normalized.get("eligibility", {}).get("training"):
                    training_dir.mkdir(parents=True, exist_ok=True)
                    training_path = training_dir / normalized_path.name
                    shutil.copyfile(normalized_path, training_path)
                    record["training_path"] = str(training_path)
            else:
                # Neither training nor verification is allowed: keep the raw file for review.
                # NOTE(review): nesting reconstructed from a listing without
                # indentation; confirm this `else` pairs with the gate check above.
                review_path = review_dir / filename
                shutil.copyfile(raw_path, review_path)
                record["review_copy_path"] = str(review_path)
        except HTTPError as exc:
            record.update(
                {
                    "downloaded": False,
                    "decision": "download_failed",
                    "train_allowed": False,
                    "error": str(exc),
                    "http_status": exc.code,
                    "failure_class": "http_error",
                }
            )
        except URLError as exc:
            record.update(
                {
                    "downloaded": False,
                    "decision": "download_failed",
                    "train_allowed": False,
                    "error": str(exc),
                    "failure_class": "url_error",
                }
            )
        except Exception as exc:
            # NOTE(review): a failure after a successful download (for example
            # inside normalize_source or a file copy) also lands here, so the
            # record is labelled "download_failed" while keys merged from the
            # gate earlier (such as "verification_allowed") keep their values.
            record.update({"downloaded": False, "decision": "download_failed", "train_allowed": False, "error": str(exc)})
        records.append(record)
        # Stop once max_sources documents have been copied into the training
        # directory; skipped, failed and review-only records do not count.
        # NOTE(review): "training_path" is only set when promote_approved is
        # true, so with promotion disabled this limit never triggers.
        trainable_count = sum(1 for item in records if item.get("train_allowed") and item.get("training_path"))
        if trainable_count >= max_sources:
            break
    # Summary counters are derived from the per-source records collected above.
    manifest = {
        "schema_version": "source_intake_manifest_v1",
        "asset_class": asset_class,
        "role": role,
        "policy": policy,
        "catalog_path": str(catalog_path or (CONFIG_ROOT / "data" / "public_source_catalog.json")),
        "intake_dir": str(intake_dir),
        "training_dir": str(training_dir),
        "source_count": len(records),
        "downloaded_count": sum(1 for item in records if item.get("downloaded")),
        "approved_for_training_count": sum(1 for item in records if item.get("train_allowed")),
        "approved_for_verification_count": sum(1 for item in records if item.get("verification_allowed")),
        "ai_training_certified_count": sum(1 for item in records if (item.get("ai_certification") or {}).get("training_eligible")),
        "ai_verification_certified_count": sum(
            1 for item in records if (item.get("ai_certification") or {}).get("verification_eligible")
        ),
        "ai_rejected_count": sum(
            1 for item in records if (item.get("ai_certification") or {}).get("intended_use") == "reject"
        ),
        "content_ai_training_certified_count": sum(
            1 for item in records if (item.get("content_ai_certification") or {}).get("training_eligible")
        ),
        "content_ai_verification_certified_count": sum(
            1 for item in records if (item.get("content_ai_certification") or {}).get("verification_eligible")
        ),
        "content_ai_rejected_count": sum(
            1 for item in records if (item.get("content_ai_certification") or {}).get("intended_use") == "reject"
        ),
        "normalized_count": sum(1 for item in records if item.get("normalized", {}).get("path")),
        "training_eligible_count": sum(1 for item in records if item.get("train_allowed") and item.get("training_path")),
        "validation_eligible_count": sum(1 for item in records if item.get("verification_allowed") and item.get("validation_copy_path")),
        "review_required_count": sum(1 for item in records if item.get("requires_human_review")),
        "records": records,
        "created_at": utc_now(),
    }
    write_json(intake_dir / "source_intake_manifest.json", manifest)
    # The licence manifest carries the same records without the summary counters.
    write_json(intake_dir / "license_manifest.json", {"records": records, "created_at": utc_now()})
    return manifest

Xet Storage Details

Size:
19.8 kB
·
Xet hash:
3462823560f504d813b13493ec5be5d470b3e4965ea695abbe705b032b9a5ce9

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.