Buckets:
| from __future__ import annotations | |
| import json | |
| import re | |
| from html.parser import HTMLParser | |
| from pathlib import Path | |
| from typing import Any | |
| from n21.config import write_json | |
| from observability.audit_log import utc_now | |
| from data_pipeline.pdf_warning_filter import suppress_known_pypdf_pointer_noise | |
class _VisibleTextHTMLParser(HTMLParser):
    """Collect visible text chunks from an HTML document.

    Text nested (at any depth) inside a ``<script>``, ``<style>``, ``<noscript>``,
    ``<svg>``, ``<nav>``, ``<footer>`` or ``<header>`` element is discarded. Every
    other data chunk that is not pure whitespace is stripped and appended to
    ``parts`` in document order.
    """

    # Elements whose contents are treated as non-visible or boilerplate.
    _SKIPPED_TAGS = frozenset({"script", "style", "noscript", "svg", "nav", "footer", "header"})

    def __init__(self) -> None:
        super().__init__()
        # Count of currently open skipped elements; text is kept only at depth 0.
        self._skip_depth = 0
        # Stripped, non-empty visible text chunks.
        self.parts: list[str] = []

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        if tag.lower() not in self._SKIPPED_TAGS:
            return
        self._skip_depth += 1

    def handle_endtag(self, tag: str) -> None:
        # A stray closing tag with nothing open is ignored so the depth never goes negative.
        if self._skip_depth and tag.lower() in self._SKIPPED_TAGS:
            self._skip_depth -= 1

    def handle_data(self, data: str) -> None:
        if self._skip_depth:
            return
        stripped = data.strip()
        if stripped:
            self.parts.append(stripped)
# Runs of horizontal whitespace (including bare carriage returns) collapse to one space.
_HORIZONTAL_WS_RE = re.compile(r"[ \t\r\f\v]+")
# Three or more line breaks (with any whitespace between them) collapse to one blank line.
_EXTRA_BLANK_LINES_RE = re.compile(r"\n\s*\n\s*\n+")


def clean_text(text: str) -> str:
    """Return *text* with NULs removed and whitespace normalized.

    NUL characters become spaces, horizontal whitespace runs become a single
    space, three or more newlines become exactly two, and the result is stripped.
    """
    without_nuls = text.replace("\x00", " ")
    single_spaced = _HORIZONTAL_WS_RE.sub(" ", without_nuls)
    return _EXTRA_BLANK_LINES_RE.sub("\n\n", single_spaced).strip()
def detect_format(path: Path, *, source_type: str | None = None, content_type: str | None = None) -> str:
    """Classify a raw source into a normalization format label.

    Precedence: a recognized ``source_type`` hint wins (``"markdown"`` is reported
    as ``"md"``). Otherwise the lower-cased file suffix and ``content_type`` are
    checked in the order pdf, jsonl, html, md, txt. Anything else returns the bare
    suffix without its dot, or ``"unknown"`` when there is no suffix.

    Args:
        path: Path of the raw file; only its final suffix is inspected.
        source_type: Optional explicit format hint (case-insensitive).
        content_type: Optional MIME type, matched by substring (case-insensitive).

    Returns:
        One of ``pdf``, ``jsonl``, ``html``, ``html_index``, ``txt``, ``md``, or a
        fallback derived from the suffix.
    """
    hint = (source_type or "").lower()
    suffix = path.suffix.lower()
    ctype = (content_type or "").lower()
    if hint in {"pdf", "jsonl", "html", "html_index", "txt", "md", "markdown"}:
        return "md" if hint == "markdown" else hint
    if suffix == ".pdf" or "application/pdf" in ctype:
        return "pdf"
    # Path.suffix is only the final extension, so "x.hf_finetune.jsonl" is matched here as ".jsonl".
    if suffix == ".jsonl":
        return "jsonl"
    if suffix in {".html", ".htm"} or "text/html" in ctype:
        return "html"
    # Keep suffix/content-type detection consistent with the "markdown" hint above.
    if suffix in {".md", ".markdown"} or "text/markdown" in ctype:
        return "md"
    if suffix == ".txt" or "text/plain" in ctype:
        return "txt"
    return suffix.lstrip(".") or "unknown"
def _extract_html(path: Path) -> tuple[str, dict[str, Any]]:
    """Extract visible text from an HTML file.

    The file is decoded as UTF-8 with undecodable bytes dropped. Text chunks are
    collected by ``_VisibleTextHTMLParser``, joined with newlines and passed
    through ``clean_text``.

    Returns:
        ``(text, metrics)``. ``metrics["boilerplate_ratio"]`` is
        ``1 - len(text) / len(raw_html)``, clamped at 0.0 and rounded to 4
        decimals, or 1.0 for an empty file.
    """
    raw = path.read_text(encoding="utf-8", errors="ignore")
    parser = _VisibleTextHTMLParser()
    parser.feed(raw)
    # feed() may hold back trailing data (e.g. text after the last tag that ends
    # in a possible entity such as "AT&T"); close() flushes it to handle_data.
    parser.close()
    text = clean_text("\n".join(parser.parts))
    markup_chars = len(raw)
    text_chars = len(text)
    boilerplate_ratio = 1.0 - (text_chars / markup_chars) if markup_chars else 1.0
    return text, {"extractor": "html_visible_text", "boilerplate_ratio": round(max(0.0, boilerplate_ratio), 4)}
def _extract_pdf(path: Path) -> tuple[str, dict[str, Any]]:
    """Extract text from a PDF page by page using pypdf.

    Each page's text is cleaned with ``clean_text``. Pages that yield no text are
    dropped; the others are prefixed with ``[Page N]`` (1-based) and joined with
    blank lines.

    Returns:
        ``(text, metrics)``. ``metrics["page_count_with_text"]`` is the number of
        pages that contributed text.

    Raises:
        RuntimeError: pypdf is not installed.
    """
    try:
        from pypdf import PdfReader
    except ModuleNotFoundError as exc:
        raise RuntimeError("Missing dependency pypdf. Install with: python -m pip install pypdf") from exc
    pages: list[str] = []
    # pypdf parses objects lazily, so malformed-pointer diagnostics can also be
    # raised while pages are accessed and extracted, not only in the constructor.
    # The suppression context therefore covers the whole read.
    # NOTE(review): assumes the context only filters warnings/log noise — confirm in pdf_warning_filter.
    with suppress_known_pypdf_pointer_noise():
        reader = PdfReader(str(path))
        for page_no, page in enumerate(reader.pages, start=1):
            text = clean_text(page.extract_text() or "")
            if text:
                pages.append(f"[Page {page_no}]\n{text}")
    return clean_text("\n\n".join(pages)), {"extractor": "pypdf", "page_count_with_text": len(pages)}
def _extract_jsonl(path: Path) -> tuple[str, dict[str, Any]]:
    """Concatenate chat-message contents from a JSONL file.

    The file is read as UTF-8 (a leading BOM is ignored). Blank lines are skipped.
    Every other line must be a JSON object. For each dict in its ``messages``
    list, a non-null ``content`` value is stringified and collected. Collected
    contents are joined with blank lines and passed through ``clean_text``.

    Returns:
        ``(text, metrics)``. ``metrics["record_count"]`` is the number of
        non-blank lines parsed.

    Raises:
        json.JSONDecodeError: a line is not valid JSON; the message is prefixed
            with ``<path>:<line>``.
        ValueError: a line decodes to something other than a JSON object.
    """
    parts: list[str] = []
    record_count = 0
    with path.open("r", encoding="utf-8-sig", newline="") as handle:
        for line_no, line in enumerate(handle, start=1):
            if not line.strip():
                continue
            try:
                item = json.loads(line)
            except json.JSONDecodeError as exc:
                # Keep the exception type callers may catch, but say where it happened.
                raise json.JSONDecodeError(f"{path}:{line_no}: {exc.msg}", exc.doc, exc.pos) from exc
            if not isinstance(item, dict):
                raise ValueError(f"{path}:{line_no}: expected a JSON object, got {type(item).__name__}")
            record_count += 1
            # "messages": null is treated like a missing list.
            for message in item.get("messages") or []:
                if not isinstance(message, dict):
                    continue
                content = message.get("content")
                # A null content must not leak into the corpus as the literal text "None".
                if content is None:
                    continue
                parts.append(str(content))
    return clean_text("\n\n".join(parts)), {"extractor": "jsonl_chat_passthrough", "record_count": record_count}
def extract_text(path: Path, source_format: str) -> tuple[str, dict[str, Any]]:
    """Dispatch *path* to the extractor for *source_format*.

    ``html`` and ``html_index`` share the HTML extractor. ``txt`` and ``md`` are
    read as UTF-8 (undecodable bytes dropped) and cleaned.

    Returns:
        ``(text, metrics)`` where ``metrics`` always carries an ``extractor`` label.

    Raises:
        ValueError: *source_format* has no extractor.
    """
    if source_format == "pdf":
        return _extract_pdf(path)
    if source_format == "jsonl":
        return _extract_jsonl(path)
    if source_format in ("html", "html_index"):
        return _extract_html(path)
    if source_format in ("txt", "md"):
        plain = path.read_text(encoding="utf-8", errors="ignore")
        return clean_text(plain), {"extractor": f"{source_format}_plain_text"}
    raise ValueError(f"unsupported source format for normalization: {source_format}")
def _format_policy(policy: dict[str, Any], source_format: str) -> dict[str, Any]:
    """Return a shallow copy of the per-format settings for *source_format*.

    Settings come from ``policy["formats"][source_format]``. When that entry is
    missing or null, ``policy["formats"]["unknown"]`` is used instead. An empty
    dict is returned when neither exists. A missing or null ``formats`` mapping is
    treated as empty.
    """
    formats = policy.get("formats") or {}
    entry = formats.get(source_format)
    if entry is None:
        entry = formats.get("unknown")
    # Copy so callers can't mutate the shared policy.
    return dict(entry or {})
def normalize_source(
    *,
    raw_path: Path,
    record: dict[str, Any],
    policy: dict[str, Any],
    output_dir: Path,
) -> dict[str, Any]:
    """Normalize one raw source file and write ``<stem>.normalized.json``.

    The format is detected from ``record["source_type"]``, ``record["content_type"]``
    and the file suffix. The matching per-format policy entry decides whether the
    file is normalized and which thresholds apply. Recognized policy keys, with
    their defaults:
    ``normalize`` (True for pdf/jsonl/html/txt/md), ``min_train_text_chars`` (500),
    ``min_validation_text_chars`` (the training minimum),
    ``max_boilerplate_ratio`` (1.0), ``train`` and ``validate`` (True for pdf/jsonl).

    Args:
        raw_path: Raw source file to normalize.
        record: Source metadata. Keys read: source_type, content_type, title, url,
            asset_class, role, license_status, decision, train_allowed,
            verification_allowed.
        policy: Normalization policy with an optional ``formats`` mapping.
        output_dir: Directory for the JSON output; created if missing.

    Returns:
        The dict that was written, always including ``ok``, ``eligibility`` and
        ``normalized_path``. When normalization is disabled for the format, ``ok``
        is False, ``error`` explains why, and no text is extracted.

    Raises:
        ValueError: from ``extract_text`` when normalization is enabled for a format
            it has no extractor for. Other extractor errors also propagate.
    """
    source_format = detect_format(
        raw_path,
        source_type=str(record.get("source_type") or ""),
        content_type=str(record.get("content_type") or ""),
    )
    format_policy = _format_policy(policy, source_format)
    output_dir.mkdir(parents=True, exist_ok=True)
    # NOTE(review): the name uses only the stem, so e.g. "a.pdf" and "a.html"
    # normalized into the same directory overwrite each other.
    output_path = output_dir / f"{raw_path.stem}.normalized.json"

    if not bool(format_policy.get("normalize", source_format in {"pdf", "jsonl", "html", "txt", "md"})):
        result = {
            "schema_version": "normalized_source_v1",
            "source_title": record.get("title"),
            "source_url": record.get("url"),
            "raw_path": str(raw_path),
            "format": source_format,
            "ok": False,
            "error": f"normalization disabled for format: {source_format}",
            # Same eligibility shape as enabled formats, so consumers can read it unconditionally.
            "eligibility": {
                "training": False,
                "validation": False,
                "reasons": [f"normalization_disabled:{source_format}"],
            },
            "created_at": utc_now(),
            "normalized_path": str(output_path),
        }
        write_json(output_path, result)
        return result

    text, metrics = extract_text(raw_path, source_format)
    text_chars = len(text)
    min_train_chars = int(format_policy.get("min_train_text_chars", 500))
    min_validation_chars = int(format_policy.get("min_validation_text_chars", min_train_chars))
    max_boilerplate = float(format_policy.get("max_boilerplate_ratio", 1.0))
    # Only the HTML extractor reports a boilerplate ratio; other formats count as 0.0.
    boilerplate_ratio = float(metrics.get("boilerplate_ratio", 0.0))
    train_allowed_by_format = bool(format_policy.get("train", source_format in {"pdf", "jsonl"}))
    validation_allowed_by_format = bool(format_policy.get("validate", source_format in {"pdf", "jsonl"}))
    train_allowed_by_source = bool(record.get("train_allowed"))
    # Sources allowed for training are implicitly allowed for validation as well.
    validation_allowed_by_source = bool(record.get("verification_allowed") or record.get("train_allowed"))

    extraction_ok = text_chars >= min_train_chars and boilerplate_ratio <= max_boilerplate
    training = train_allowed_by_source and train_allowed_by_format and extraction_ok
    # NOTE(review): extraction_ok already enforces the *training* minimum, so a
    # min_validation_text_chars lower than min_train_text_chars has no effect.
    validation = (
        validation_allowed_by_source
        and validation_allowed_by_format
        and text_chars >= min_validation_chars
        and extraction_ok
    )

    # One reason per failed check, so a False eligibility flag is always explained.
    reasons: list[str] = []
    if not train_allowed_by_source:
        reasons.append("source_policy_not_train_allowed")
    if not validation_allowed_by_source:
        reasons.append("source_policy_not_validation_allowed")
    if not train_allowed_by_format:
        reasons.append(f"format_not_training_enabled:{source_format}")
    if text_chars < min_train_chars:
        reasons.append(f"text_chars_below_training_minimum:{text_chars}<{min_train_chars}")
    if text_chars < min_validation_chars:
        reasons.append(f"text_chars_below_validation_minimum:{text_chars}<{min_validation_chars}")
    if boilerplate_ratio > max_boilerplate:
        reasons.append(f"boilerplate_ratio_above_max:{boilerplate_ratio}>{max_boilerplate}")
    if not validation_allowed_by_format:
        reasons.append(f"format_not_validation_enabled:{source_format}")

    normalized = {
        "schema_version": "normalized_source_v1",
        "asset_class": record.get("asset_class"),
        "role": record.get("role"),
        "source_title": record.get("title"),
        "source_url": record.get("url"),
        "source_type": record.get("source_type"),
        "license_status": record.get("license_status"),
        "decision": record.get("decision"),
        "raw_path": str(raw_path),
        "format": source_format,
        "text": text,
        "quality": {
            "text_chars": text_chars,
            "has_source_attribution": bool(record.get("title") or record.get("url")),
            **metrics,
        },
        "eligibility": {
            "training": training,
            "validation": validation,
            "reasons": reasons,
        },
        "ok": extraction_ok,
        "created_at": utc_now(),
        # Last key, as before; the file is now written once instead of twice.
        "normalized_path": str(output_path),
    }
    write_json(output_path, normalized)
    return normalized
Xet Storage Details
- Size: 8.23 kB
- Xet hash: 5d99170819568298ffcd0ab573af87fbb9fff2d1a665da6816c50aafaab2b015
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.