Buckets:
| from __future__ import annotations | |
| import hashlib | |
| import json | |
| import re | |
| import shutil | |
| import urllib.request | |
| import os | |
| from pathlib import Path | |
| from typing import Any | |
| from urllib.error import HTTPError, URLError | |
| from urllib.parse import urlparse | |
| from n21.config import load_structured, write_json | |
| from n21.settings import CONFIG_ROOT, REPO_ROOT | |
| from observability.audit_log import utc_now | |
| from data_pipeline.source_normalization import normalize_source | |
| from data_pipeline.source_quality_certifier import certify_normalized_source_content | |
def sha256_file(path: Path) -> str:
    """Return the hex-encoded SHA-256 digest of the file at *path*.

    The file is consumed in 1 MiB chunks, so the whole content is never held
    in memory at once.
    """
    chunk_size = 1024 * 1024
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            chunk = stream.read(chunk_size)
            if not chunk:
                # An empty read signals end of file.
                break
            hasher.update(chunk)
    return hasher.hexdigest()
def safe_name(value: str, fallback: str = "source") -> str:
    """Turn *value* into a filesystem-friendly name.

    Every run of characters outside ``A-Z a-z 0-9 . _ -`` becomes a single
    underscore, leading/trailing dots and underscores are stripped, and the
    result is capped at 120 characters. *fallback* is returned when nothing
    is left.
    """
    collapsed = re.sub(r"[^A-Za-z0-9._-]+", "_", value)
    trimmed = collapsed.strip("._")
    shortened = trimmed[:120]
    if shortened:
        return shortened
    return fallback
def load_catalog(path: Path | None = None) -> dict[str, Any]:
    """Read the public source catalog as parsed JSON.

    When *path* is not given, the catalog is read from
    ``CONFIG_ROOT / "data" / "public_source_catalog.json"``.
    """
    if path:
        catalog_path = path
    else:
        catalog_path = CONFIG_ROOT / "data" / "public_source_catalog.json"
    raw_text = catalog_path.read_text(encoding="utf-8")
    return json.loads(raw_text)
def load_policy(path: Path | None = None) -> dict[str, Any]:
    """Load the source policy through ``load_structured``.

    When *path* is not given, the policy is read from
    ``CONFIG_ROOT / "data" / "source_policy.yaml"``.
    """
    policy_path = path if path else CONFIG_ROOT / "data" / "source_policy.yaml"
    return load_structured(policy_path)
def license_status(text: str, url: str = "") -> str:
    """Classify a source's license/access status from free-text hints.

    *text* and *url* are joined, lower-cased and scanned for marker
    substrings. Checks run from most to least restrictive and the first match
    wins, so the order of the rules below is significant.

    Returns one of ``"credentialed"``, ``"no_derivatives"``,
    ``"non_commercial"``, ``"restricted_download"``, ``"public_domain"``,
    ``"government_public"``, ``"permissive"``, ``"copyright_notice"`` or
    ``"unknown"``.
    """
    haystack = f"{text}\n{url}".lower()

    def mentions(*terms: str) -> bool:
        """Return True when any of *terms* occurs in the combined text."""
        return any(term in haystack for term in terms)

    if mentions("login", "signin", "checkout", "paywall", "subscription required"):
        return "credentialed"
    # The Creative Commons ND spellings ("cc-by-nd", "NoDerivatives",
    # "NoDerivs") must be caught here: they contain the "cc-by" /
    # "creative commons attribution" markers and would otherwise fall through
    # to the permissive rule further down.
    if mentions(
        "no derivatives",
        "nd license",
        "no-derivatives",
        "noderivatives",
        "noderivs",
        "cc-by-nd",
        "cc-by-nc-nd",
    ):
        return "no_derivatives"
    if mentions("non-commercial", "noncommercial", "cc-by-nc"):
        return "non_commercial"
    if mentions("do not download", "no downloading", "download is prohibited"):
        return "restricted_download"
    if mentions("public domain"):
        return "public_domain"
    if mentions("government public", "sec.gov", "federalreserve.gov", ".gov/"):
        return "government_public"
    if mentions("creative commons attribution", "cc-by", "apache license", "mit license"):
        return "permissive"
    if mentions("copyright", "all rights reserved"):
        return "copyright_notice"
    return "unknown"
def source_gate(status: str, policy: dict[str, Any]) -> dict[str, Any]:
    """Translate a license status into a training decision.

    The policy lists ``approved_license_statuses``, ``review_license_statuses``
    and ``blocked_license_statuses`` are consulted. An approved status permits
    training; a blocked status yields ``"blocked"``; everything else, whether
    listed for review or not listed at all, is kept for review only. Human
    review is required whenever training is not allowed.
    """
    approved_statuses, _review_statuses, blocked_statuses = (
        set(policy.get(key, []))
        for key in (
            "approved_license_statuses",
            "review_license_statuses",
            "blocked_license_statuses",
        )
    )

    train_allowed = status in approved_statuses
    if train_allowed:
        decision = "approved_for_training"
    elif status in blocked_statuses:
        decision = "blocked"
    else:
        # Review-listed and unlisted statuses share the same outcome.
        decision = "downloaded_for_review_not_training"

    return {
        "license_status": status,
        "decision": decision,
        "train_allowed": train_allowed,
        "requires_human_review": not train_allowed,
    }
def apply_ai_certification_gate(record: dict[str, Any], gate: dict[str, Any], policy: dict[str, Any]) -> dict[str, Any]:
    """Tighten a license gate using the record's AI source certification.

    *gate* is modified in place and also returned. Outcomes:

    * Policy does not require certification
      (``source_quality_certification.required_before_download`` is falsy):
      ``verification_allowed`` mirrors ``train_allowed``.
    * Certification is required but ``record["ai_certification"]`` is missing
      or empty: blocked, human review required.
    * License allows training and the certification is training-eligible:
      the gate is kept and ``verification_allowed`` follows the certification.
    * License allows training but the certification is only
      verification-eligible: downgraded to verification use.
    * Anything else: blocked, human review required.
    """

    def _overwrite(decision: str, *, verification_allowed: bool, requires_human_review: bool) -> dict[str, Any]:
        """Replace the gate decision; training is never allowed on this path."""
        gate.update(
            {
                "decision": decision,
                "train_allowed": False,
                "verification_allowed": verification_allowed,
                "requires_human_review": requires_human_review,
            }
        )
        return gate

    certification = record.get("ai_certification") or {}
    certification_policy = policy.get("source_quality_certification", {})
    if not certification_policy.get("required_before_download", True):
        gate["verification_allowed"] = bool(gate.get("train_allowed"))
        return gate

    if not certification:
        return _overwrite(
            "blocked_missing_ai_source_certification",
            verification_allowed=False,
            requires_human_review=True,
        )

    can_train = bool(certification.get("training_eligible"))
    can_verify = bool(certification.get("verification_eligible"))
    license_permits_training = gate.get("train_allowed")

    if license_permits_training and can_train:
        gate["verification_allowed"] = can_verify
        return gate
    if license_permits_training and can_verify:
        return _overwrite(
            "downloaded_for_verification_not_training",
            verification_allowed=True,
            requires_human_review=False,
        )
    return _overwrite(
        "blocked_ai_certification_not_training_or_verification",
        verification_allowed=False,
        requires_human_review=True,
    )
| def _download(url: str, output_path: Path, *, max_bytes: int) -> tuple[int, str | None]: | |
| parsed = urlparse(url) | |
| if parsed.scheme == "file": | |
| source_path = Path(urllib.request.url2pathname(parsed.path)) | |
| if source_path.stat().st_size > max_bytes: | |
| raise ValueError(f"source exceeds max_download_bytes: {source_path}") | |
| shutil.copyfile(source_path, output_path) | |
| return output_path.stat().st_size, None | |
| user_agent = os.environ.get( | |
| "SHFT_HTTP_USER_AGENT", | |
| "Linvest21-SHFT-source-intake/1.0 (public-source transparency manifest; contact=linvest21)", | |
| ) | |
| request = urllib.request.Request( | |
| url, | |
| headers={ | |
| "User-Agent": user_agent, | |
| "Accept": "text/html,application/pdf,text/plain,application/json,*/*;q=0.8", | |
| }, | |
| ) | |
| with urllib.request.urlopen(request, timeout=30) as response: | |
| content = response.read(max_bytes + 1) | |
| if len(content) > max_bytes: | |
| raise ValueError(f"download exceeds max_download_bytes: {url}") | |
| output_path.write_bytes(content) | |
| content_type = response.headers.get("content-type") | |
| return output_path.stat().st_size, content_type | |
def select_sources(
    *,
    asset_class: str,
    role: str,
    catalog: dict[str, Any],
    max_sources: int,
) -> list[dict[str, Any]]:
    """Return the catalog sources that apply to *asset_class* and *role*.

    A source qualifies when its ``asset_class`` is *asset_class* or ``"all"``
    and its ``role`` is *role* or ``"all"``. Sources repeating an earlier URL
    (compared trimmed and case-insensitively) are dropped; sources without a
    URL are always kept. Catalog order is preserved.

    *max_sources* is accepted for interface compatibility but is not applied
    here: every matching source is returned.
    """
    accepted_assets = {asset_class, "all"}
    accepted_roles = {role, "all"}
    picked: list[dict[str, Any]] = []
    urls_taken: set[str] = set()

    for candidate in catalog.get("sources", []):
        if candidate.get("asset_class") not in accepted_assets:
            continue
        if candidate.get("role") not in accepted_roles:
            continue
        normalized_url = str(candidate.get("url", "")).strip().lower()
        if normalized_url:
            if normalized_url in urls_taken:
                continue
            urls_taken.add(normalized_url)
        picked.append(candidate)

    return picked
| def _existing_training_urls(training_dir: Path) -> set[str]: | |
| urls: set[str] = set() | |
| if not training_dir.exists(): | |
| return urls | |
| for path in training_dir.glob("*.normalized.json"): | |
| try: | |
| item = json.loads(path.read_text(encoding="utf-8-sig")) | |
| except (OSError, json.JSONDecodeError): | |
| continue | |
| url = str(item.get("source_url") or "").strip().lower() | |
| if url: | |
| urls.add(url) | |
| return urls | |
# intake_public_sources: run one intake round for a single asset class / role.
#
# For every catalog source returned by select_sources the function builds a record,
# skips sources already present in the training directory, applies the policy checks
# (auto-download switch, allowed URL schemes, disallowed URL hints), downloads the
# source, gates it on license status and AI certification, normalizes and content-
# certifies it, copies artifacts into the intake / training folders, and finally writes
# "source_intake_manifest.json" and "license_manifest.json" into the intake directory.
# The manifest dict is returned.
#
# Parameters (all keyword-only):
#   asset_class, role  - filter for the catalog and path components of the output dirs.
#   catalog_path       - optional override passed to load_catalog.
#   policy_path        - optional override passed to load_policy.
#   max_sources        - falsy (None or 0) falls back to the policy value
#                        stall_breakout.max_new_sources_per_round (default 10). The loop
#                        stops once this many records have been promoted to training.
#   promote_approved   - None defers to policy "promote_approved_sources_to_training"
#                        (default True).
#   intake_root        - defaults to REPO_ROOT / "data" / "learning_intake".
#   training_root      - defaults to REPO_ROOT / "data" / "learning".
#   quality_errors     - stored on the policy copy as "_quality_errors" and forwarded to
#                        certify_normalized_source_content.
#
# NOTE(review): this listing has lost its indentation (every line is a table row), so the
# nesting of the statements below cannot be read off the text. Comments that depend on the
# nesting are marked as assumptions - confirm against the original file.
| def intake_public_sources( | |
| *, | |
| asset_class: str, | |
| role: str, | |
| catalog_path: Path | None = None, | |
| policy_path: Path | None = None, | |
| max_sources: int | None = None, | |
| promote_approved: bool | None = None, | |
| intake_root: Path | None = None, | |
| training_root: Path | None = None, | |
| quality_errors: list[str] | None = None, | |
| ) -> dict[str, Any]: | |
# Shallow-copy the loaded policy before adding the private "_quality_errors" key.
# The same copy (including that key) is embedded in the manifest under "policy".
| policy = dict(load_policy(policy_path)) | |
| policy["_quality_errors"] = list(quality_errors or []) | |
| catalog = load_catalog(catalog_path) | |
| max_sources = int(max_sources or policy.get("stall_breakout", {}).get("max_new_sources_per_round", 10)) | |
| auto_download = bool(policy.get("auto_download_public_sources", True)) | |
| promote_approved = bool( | |
| policy.get("promote_approved_sources_to_training", True) if promote_approved is None else promote_approved | |
| ) | |
# Download risk settings: only "https" is allowed unless the policy says otherwise;
# the size cap defaults to 50,000,000 bytes.
| allowed_schemes = set(policy.get("source_risk", {}).get("allowed_schemes", ["https"])) | |
| disallowed_hints = [str(item).lower() for item in policy.get("source_risk", {}).get("disallowed_url_hints", [])] | |
| max_bytes = int(policy.get("source_risk", {}).get("max_download_bytes", 50_000_000)) | |
# Output layout under <intake_root>/<asset_class>/<role>/; all five folders are created
# up front, whether or not anything ends up in them.
| intake_dir = (intake_root or (REPO_ROOT / "data" / "learning_intake")) / asset_class / role | |
| raw_dir = intake_dir / "raw" | |
| normalized_dir = intake_dir / "normalized" | |
| approved_dir = intake_dir / "approved_for_training" | |
| validation_dir = intake_dir / "approved_for_validation" | |
| review_dir = intake_dir / "review_required" | |
| raw_dir.mkdir(parents=True, exist_ok=True) | |
| normalized_dir.mkdir(parents=True, exist_ok=True) | |
| approved_dir.mkdir(parents=True, exist_ok=True) | |
| validation_dir.mkdir(parents=True, exist_ok=True) | |
| review_dir.mkdir(parents=True, exist_ok=True) | |
# The training directory is only created later, when a source is actually promoted.
| training_dir = (training_root or (REPO_ROOT / "data" / "learning")) / asset_class / role | |
| existing_training_urls = _existing_training_urls(training_dir) | |
| records: list[dict[str, Any]] = [] | |
# One record per candidate source; index (1-based) is also used in the raw file name.
| for index, source in enumerate(select_sources(asset_class=asset_class, role=role, catalog=catalog, max_sources=max_sources), start=1): | |
| url = str(source.get("url", "")) | |
| parsed = urlparse(url) | |
| title = str(source.get("title") or f"source_{index}") | |
| record: dict[str, Any] = { | |
| "asset_class": asset_class, | |
| "role": role, | |
| "title": title, | |
| "url": url, | |
| "source_type": source.get("source_type", "unknown"), | |
| "rationale": source.get("rationale"), | |
| "license_hint": source.get("license_hint", ""), | |
| "ai_certification": source.get("ai_certification"), | |
| "retrieved_at": utc_now(), | |
| } | |
# Skip sources whose URL (trimmed, lower-cased) already appears in the training dir.
| url_key = url.strip().lower() | |
| if url_key and url_key in existing_training_urls: | |
| record.update( | |
| { | |
| "downloaded": False, | |
| "decision": "already_promoted_to_training", | |
| "train_allowed": False, | |
| "requires_human_review": False, | |
| "already_in_training": True, | |
| } | |
| ) | |
| records.append(record) | |
| continue | |
# Policy switch: record the source but do not fetch it.
| if not auto_download: | |
| record.update( | |
| { | |
| "downloaded": False, | |
| "decision": "download_skipped_by_policy", | |
| "train_allowed": False, | |
| "requires_human_review": True, | |
| } | |
| ) | |
| records.append(record) | |
| continue | |
# Disallowed URL scheme: gated with the synthetic status "blocked_scheme".
| if parsed.scheme not in allowed_schemes: | |
| status = "blocked_scheme" | |
| record.update({"downloaded": False, "error": f"scheme not allowed: {parsed.scheme}", **source_gate(status, policy)}) | |
| records.append(record) | |
| continue | |
# URL contains one of the policy's disallowed hints: gated as "credentialed".
| if any(hint in url.lower() for hint in disallowed_hints): | |
| status = "credentialed" | |
| record.update({"downloaded": False, "error": "url contains disallowed access hint", **source_gate(status, policy)}) | |
| records.append(record) | |
| continue | |
# Raw file name: zero-padded index + sanitized title + the URL path's suffix (".html"
# when the path has none).
| suffix = Path(parsed.path).suffix or ".html" | |
| filename = f"{index:03d}_{safe_name(title)}{suffix}" | |
| raw_path = raw_dir / filename | |
# Everything from here to the first "except" runs inside one try block, so the handlers
# below see failures from the download as well as from normalization, certification and
# the file copies.
| try: | |
| byte_count, content_type = _download(url, raw_path, max_bytes=max_bytes) | |
# License status is derived from the catalog's license_hint and the URL, not from the
# downloaded content.
| status = license_status(str(source.get("license_hint", "")), url) | |
| gate = apply_ai_certification_gate(record, source_gate(status, policy), policy) | |
| record.update( | |
| { | |
| "downloaded": True, | |
| "raw_path": str(raw_path), | |
| "sha256": sha256_file(raw_path), | |
| "bytes": byte_count, | |
| "content_type": content_type, | |
| **gate, | |
| } | |
| ) | |
# Only sources cleared for training or verification are normalized.
| if gate["train_allowed"] or gate.get("verification_allowed"): | |
| normalized = normalize_source(raw_path=raw_path, record=record, policy=policy, output_dir=normalized_dir) | |
| record["normalized"] = { | |
| "path": normalized.get("normalized_path"), | |
| "format": normalized.get("format"), | |
| "ok": normalized.get("ok"), | |
| "quality": normalized.get("quality", {}), | |
| "eligibility": normalized.get("eligibility", {}), | |
| } | |
| normalized_path = Path(str(normalized.get("normalized_path"))) if normalized.get("normalized_path") else None | |
| content_certification = None | |
# Content-level certification runs only when normalization succeeded and produced text.
| if normalized.get("ok") and normalized.get("text"): | |
| content_certification = certify_normalized_source_content( | |
| asset_class=asset_class, | |
| role=role, | |
| title=title, | |
| url=url, | |
| source_type=str(record.get("source_type") or ""), | |
| text=str(normalized.get("text") or ""), | |
| quality_errors=policy.get("_quality_errors", []), | |
| policy=policy, | |
| ) | |
| record["content_ai_certification"] = content_certification | |
| normalized["content_ai_certification"] = content_certification | |
| normalized.setdefault("quality", {})["content_ai_score"] = content_certification.get("score") | |
| normalized.setdefault("quality", {})["content_reasoning_density"] = content_certification.get("metrics", {}).get( | |
| "reasoning_density" | |
| ) | |
# Content not certified for training: clear eligibility.training and record the reason.
| if content_certification.get("training_eligible") is not True: | |
| normalized.setdefault("eligibility", {})["training"] = False | |
| normalized.setdefault("eligibility", {}).setdefault("reasons", []).append( | |
| f"content_ai_not_training:{content_certification.get('intended_use')}" | |
| ) | |
# Content certified for neither verification nor training: clear eligibility.validation.
| if content_certification.get("verification_eligible") is not True and content_certification.get("training_eligible") is not True: | |
| normalized.setdefault("eligibility", {})["validation"] = False | |
| normalized.setdefault("eligibility", {}).setdefault("reasons", []).append( | |
| f"content_ai_not_validation:{content_certification.get('intended_use')}" | |
| ) | |
# Persist the (possibly updated) normalized payload and refresh the record's summary of it.
# NOTE(review): presumably these two statements sit inside the content-certification branch
# above; the nesting is not recoverable from this listing - confirm.
| if normalized_path: | |
| write_json(normalized_path, normalized) | |
| record["normalized"] = { | |
| "path": normalized.get("normalized_path"), | |
| "format": normalized.get("format"), | |
| "ok": normalized.get("ok"), | |
| "quality": normalized.get("quality", {}), | |
| "eligibility": normalized.get("eligibility", {}), | |
| } | |
# Copy into approved_for_training when both the gate and the normalized eligibility agree.
| if normalized_path and gate.get("train_allowed") and normalized.get("eligibility", {}).get("training"): | |
| approved_path = approved_dir / normalized_path.name | |
| shutil.copyfile(normalized_path, approved_path) | |
| record["approved_copy_path"] = str(approved_path) | |
# Same for approved_for_validation, keyed on verification_allowed / eligibility.validation.
| if normalized_path and gate.get("verification_allowed") and normalized.get("eligibility", {}).get("validation"): | |
| validation_path = validation_dir / normalized_path.name | |
| shutil.copyfile(normalized_path, validation_path) | |
| record["validation_copy_path"] = str(validation_path) | |
# Promotion into the live training directory is additionally gated on promote_approved.
| if promote_approved and normalized_path and gate.get("train_allowed") and normalized.get("eligibility", {}).get("training"): | |
| training_dir.mkdir(parents=True, exist_ok=True) | |
| training_path = training_dir / normalized_path.name | |
| shutil.copyfile(normalized_path, training_path) | |
| record["training_path"] = str(training_path) | |
# The raw download is copied to review_required.
# NOTE(review): presumably this "else" pairs with the
# `if gate["train_allowed"] or gate.get("verification_allowed")` check above (sources
# cleared for neither use); confirm the pairing against the original file.
| else: | |
| review_path = review_dir / filename | |
| shutil.copyfile(raw_path, review_path) | |
| record["review_copy_path"] = str(review_path) | |
# Failure handling: the record is marked "download_failed" and not trainable.
# HTTPError additionally records the HTTP status code.
| except HTTPError as exc: | |
| record.update( | |
| { | |
| "downloaded": False, | |
| "decision": "download_failed", | |
| "train_allowed": False, | |
| "error": str(exc), | |
| "http_status": exc.code, | |
| "failure_class": "http_error", | |
| } | |
| ) | |
| except URLError as exc: | |
| record.update( | |
| { | |
| "downloaded": False, | |
| "decision": "download_failed", | |
| "train_allowed": False, | |
| "error": str(exc), | |
| "failure_class": "url_error", | |
| } | |
| ) | |
# NOTE(review): this catch-all also handles errors raised after a successful download
# (normalization, certification, file copies). In that case the record reports
# downloaded=False / "download_failed" while fields set earlier from the gate, such as
# "verification_allowed" and "raw_path", are left untouched, so the manifest's
# approved_for_verification_count can include a failed source. Worth a follow-up.
| except Exception as exc: | |
| record.update({"downloaded": False, "decision": "download_failed", "train_allowed": False, "error": str(exc)}) | |
# Stop early once max_sources records have been promoted to training (train_allowed set
# and a training_path recorded). The count is recomputed over all records each time.
| records.append(record) | |
| trainable_count = sum(1 for item in records if item.get("train_allowed") and item.get("training_path")) | |
| if trainable_count >= max_sources: | |
| break | |
# Manifest: run settings, per-category counts derived from the records, and the records
# themselves.
| manifest = { | |
| "schema_version": "source_intake_manifest_v1", | |
| "asset_class": asset_class, | |
| "role": role, | |
| "policy": policy, | |
| "catalog_path": str(catalog_path or (CONFIG_ROOT / "data" / "public_source_catalog.json")), | |
| "intake_dir": str(intake_dir), | |
| "training_dir": str(training_dir), | |
| "source_count": len(records), | |
| "downloaded_count": sum(1 for item in records if item.get("downloaded")), | |
| "approved_for_training_count": sum(1 for item in records if item.get("train_allowed")), | |
| "approved_for_verification_count": sum(1 for item in records if item.get("verification_allowed")), | |
| "ai_training_certified_count": sum(1 for item in records if (item.get("ai_certification") or {}).get("training_eligible")), | |
| "ai_verification_certified_count": sum( | |
| 1 for item in records if (item.get("ai_certification") or {}).get("verification_eligible") | |
| ), | |
| "ai_rejected_count": sum( | |
| 1 for item in records if (item.get("ai_certification") or {}).get("intended_use") == "reject" | |
| ), | |
| "content_ai_training_certified_count": sum( | |
| 1 for item in records if (item.get("content_ai_certification") or {}).get("training_eligible") | |
| ), | |
| "content_ai_verification_certified_count": sum( | |
| 1 for item in records if (item.get("content_ai_certification") or {}).get("verification_eligible") | |
| ), | |
| "content_ai_rejected_count": sum( | |
| 1 for item in records if (item.get("content_ai_certification") or {}).get("intended_use") == "reject" | |
| ), | |
| "normalized_count": sum(1 for item in records if item.get("normalized", {}).get("path")), | |
| "training_eligible_count": sum(1 for item in records if item.get("train_allowed") and item.get("training_path")), | |
| "validation_eligible_count": sum(1 for item in records if item.get("verification_allowed") and item.get("validation_copy_path")), | |
| "review_required_count": sum(1 for item in records if item.get("requires_human_review")), | |
| "records": records, | |
| "created_at": utc_now(), | |
| } | |
# Two files are written to the intake directory: the full manifest and a license manifest
# holding only the records and a timestamp.
| write_json(intake_dir / "source_intake_manifest.json", manifest) | |
| write_json(intake_dir / "license_manifest.json", {"records": records, "created_at": utc_now()}) | |
| return manifest | |
Xet Storage Details
- Size: 19.8 kB
- Xet hash: 3462823560f504d813b13493ec5be5d470b3e4965ea695abbe705b032b9a5ce9
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.