linvest21's picture
download
raw
8.23 kB
from __future__ import annotations
import json
import re
from html.parser import HTMLParser
from pathlib import Path
from typing import Any
from n21.config import write_json
from observability.audit_log import utc_now
from data_pipeline.pdf_warning_filter import suppress_known_pypdf_pointer_noise
class _VisibleTextHTMLParser(HTMLParser):
def __init__(self) -> None:
super().__init__()
self._skip_depth = 0
self.parts: list[str] = []
def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
if tag.lower() in {"script", "style", "noscript", "svg", "nav", "footer", "header"}:
self._skip_depth += 1
def handle_endtag(self, tag: str) -> None:
if tag.lower() in {"script", "style", "noscript", "svg", "nav", "footer", "header"} and self._skip_depth:
self._skip_depth -= 1
def handle_data(self, data: str) -> None:
if not self._skip_depth and data.strip():
self.parts.append(data.strip())
def clean_text(text: str) -> str:
    """Normalize whitespace in extracted text.

    NUL characters become spaces. Each run of horizontal whitespace (space,
    tab, CR, FF, VT) is squeezed to one space. Three or more newlines,
    possibly with whitespace between them, collapse to a single blank line.
    The result is stripped at both ends.
    """
    cleaned = text.replace("\x00", " ")
    for pattern, replacement in (
        (r"[ \t\r\f\v]+", " "),  # squeeze horizontal whitespace
        (r"\n\s*\n\s*\n+", "\n\n"),  # cap consecutive blank lines at one
    ):
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()
def detect_format(path: Path, *, source_type: str | None = None, content_type: str | None = None) -> str:
hint = (source_type or "").lower()
suffix = path.suffix.lower()
ctype = (content_type or "").lower()
if hint in {"pdf", "jsonl", "html", "html_index", "txt", "md", "markdown"}:
return "md" if hint == "markdown" else hint
if suffix == ".pdf" or "application/pdf" in ctype:
return "pdf"
if suffix in {".jsonl", ".hf_finetune.jsonl"}:
return "jsonl"
if suffix in {".html", ".htm"} or "text/html" in ctype:
return "html"
if suffix == ".md":
return "md"
if suffix == ".txt" or "text/plain" in ctype:
return "txt"
return suffix.lstrip(".") or "unknown"
def _extract_html(path: Path) -> tuple[str, dict[str, Any]]:
    """Extract visible text from an HTML file and estimate its boilerplate share.

    The file is decoded as UTF-8, dropping undecodable bytes, and parsed with
    ``_VisibleTextHTMLParser``. The collected fragments are joined one per
    line and passed through ``clean_text``.

    Returns:
        ``(text, metrics)``. ``metrics["boilerplate_ratio"]`` is
        ``1 - len(text) / len(raw_markup)``, clamped at 0 and rounded to four
        decimals; it is 1.0 for an empty file.
    """
    raw = path.read_text(encoding="utf-8", errors="ignore")
    parser = _VisibleTextHTMLParser()
    parser.feed(raw)
    # feed() may hold back trailing input, e.g. text ending in an unterminated
    # "&..." reference or an incomplete tag. close() flushes it so the tail of
    # the document is not silently lost.
    parser.close()
    text = clean_text("\n".join(parser.parts))
    markup_chars = len(raw)
    boilerplate_ratio = 1.0 - (len(text) / markup_chars) if markup_chars else 1.0
    return text, {"extractor": "html_visible_text", "boilerplate_ratio": round(max(0.0, boilerplate_ratio), 4)}
def _extract_pdf(path: Path) -> tuple[str, dict[str, Any]]:
    """Extract text from a PDF with pypdf, tagging each non-empty page.

    Each page's text is cleaned, and pages that yield nothing are skipped.
    Every surviving page is prefixed with a ``[Page N]`` line, where N is the
    1-based page index in the document. Pages are separated by blank lines.

    Returns:
        ``(text, metrics)``. ``metrics["page_count_with_text"]`` counts the
        pages that contributed text.

    Raises:
        RuntimeError: pypdf is not installed.
    """
    try:
        from pypdf import PdfReader
    except ModuleNotFoundError as exc:
        raise RuntimeError("Missing dependency pypdf. Install with: python -m pip install pypdf") from exc
    pages: list[str] = []
    # pypdf loads page objects on access, so parser warnings can also surface
    # while iterating and extracting pages. Keep that work inside the
    # known-noise filter, not just the reader construction.
    with suppress_known_pypdf_pointer_noise():
        reader = PdfReader(str(path))
        for page_no, page in enumerate(reader.pages, start=1):
            text = clean_text(page.extract_text() or "")
            if text:
                pages.append(f"[Page {page_no}]\n{text}")
    return clean_text("\n\n".join(pages)), {"extractor": "pypdf", "page_count_with_text": len(pages)}
def _extract_jsonl(path: Path) -> tuple[str, dict[str, Any]]:
    """Concatenate the chat message contents of a JSONL fine-tuning file.

    Each non-blank line is parsed as one JSON record. The ``content`` of every
    dict entry in the record's ``messages`` list is collected. All contents
    are joined with blank lines and passed through ``clean_text``.

    Returns:
        ``(text, metrics)``. ``metrics["record_count"]`` is the number of
        non-blank lines parsed.

    Raises:
        json.JSONDecodeError: A line is not valid JSON. The message is
            prefixed with the file path and the 1-based line number.
    """
    parts: list[str] = []
    record_count = 0
    with path.open("r", encoding="utf-8-sig", newline="") as handle:
        for line_no, line in enumerate(handle, start=1):
            if not line.strip():
                continue
            try:
                item = json.loads(line)
            except json.JSONDecodeError as exc:
                # Same exception type, but point at the offending file/line.
                raise json.JSONDecodeError(f"{path}:{line_no}: {exc.msg}", exc.doc, exc.pos) from exc
            record_count += 1
            # Non-object records, or records whose "messages" is missing or
            # null, contribute no text but still count as records.
            messages = (item.get("messages") or []) if isinstance(item, dict) else []
            for message in messages:
                if not isinstance(message, dict):
                    continue
                content = message.get("content")
                # A null content (e.g. chat turns that only carry tool calls)
                # must not leak the literal string "None" into the corpus.
                if content is not None:
                    parts.append(str(content))
    return clean_text("\n\n".join(parts)), {"extractor": "jsonl_chat_passthrough", "record_count": record_count}
def extract_text(path: Path, source_format: str) -> tuple[str, dict[str, Any]]:
    """Dispatch ``path`` to the extractor registered for ``source_format``.

    ``html_index`` shares the HTML extractor. ``txt`` and ``md`` are read as
    UTF-8, dropping undecodable bytes, and only whitespace-cleaned.

    Returns:
        ``(text, metrics)`` from the selected extractor.

    Raises:
        ValueError: ``source_format`` has no extractor.
    """
    if source_format == "pdf":
        return _extract_pdf(path)
    if source_format == "jsonl":
        return _extract_jsonl(path)
    if source_format in {"html", "html_index"}:
        return _extract_html(path)
    if source_format not in {"txt", "md"}:
        raise ValueError(f"unsupported source format for normalization: {source_format}")
    plain = path.read_text(encoding="utf-8", errors="ignore")
    return clean_text(plain), {"extractor": f"{source_format}_plain_text"}
def _format_policy(policy: dict[str, Any], source_format: str) -> dict[str, Any]:
formats = policy.get("formats", {})
return dict(formats.get(source_format, formats.get("unknown", {})))
def normalize_source(
    *,
    raw_path: Path,
    record: dict[str, Any],
    policy: dict[str, Any],
    output_dir: Path,
) -> dict[str, Any]:
    """Normalize one raw source file and persist the result as JSON.

    The format is detected from ``raw_path`` and the record's
    ``source_type``/``content_type`` hints, and the matching per-format policy
    is looked up.

    If that policy disables normalization, a stub result with ``ok=False`` and
    an ``error`` message is written. Otherwise the text is extracted and
    checked against the policy thresholds to decide training and validation
    eligibility. Every eligibility failure is explained in
    ``eligibility.reasons``.

    Both kinds of result are written once to
    ``<output_dir>/<raw_path.stem>.normalized.json`` and carry that location in
    ``normalized_path``.

    Args:
        raw_path: Downloaded source file.
        record: Catalogue entry. Detection and eligibility read
            ``source_type``, ``content_type``, ``train_allowed`` and
            ``verification_allowed``. Descriptive fields such as ``title``,
            ``url`` and ``license_status`` are copied into the result.
        policy: Normalization policy with a per-format ``formats`` mapping.
        output_dir: Directory for the output JSON; created if missing.

    Returns:
        The normalized result dict that was written.
    """
    source_format = detect_format(
        raw_path,
        source_type=str(record.get("source_type") or ""),
        content_type=str(record.get("content_type") or ""),
    )
    format_policy = _format_policy(policy, source_format)
    output_dir.mkdir(parents=True, exist_ok=True)
    # Named after the stem only, so e.g. "a.pdf" and "a.html" in the same
    # output_dir map to the same file; kept as-is for downstream compatibility.
    output_path = output_dir / f"{raw_path.stem}.normalized.json"

    if not bool(format_policy.get("normalize", source_format in {"pdf", "jsonl", "html", "txt", "md"})):
        result = {
            "schema_version": "normalized_source_v1",
            "source_title": record.get("title"),
            "source_url": record.get("url"),
            "raw_path": str(raw_path),
            "format": source_format,
            "ok": False,
            "error": f"normalization disabled for format: {source_format}",
            "created_at": utc_now(),
            # Report the output location here too, as the success path does.
            "normalized_path": str(output_path),
        }
        write_json(output_path, result)
        return result

    text, metrics = extract_text(raw_path, source_format)
    text_chars = len(text)
    min_train_chars = int(format_policy.get("min_train_text_chars", 500))
    min_validation_chars = int(format_policy.get("min_validation_text_chars", min_train_chars))
    max_boilerplate = float(format_policy.get("max_boilerplate_ratio", 1.0))
    # Extractors that report no ratio (in this file, all but HTML) count as 0.
    boilerplate_ratio = float(metrics.get("boilerplate_ratio", 0.0))
    source_train_ok = bool(record.get("train_allowed"))
    source_validation_ok = bool(record.get("verification_allowed") or record.get("train_allowed"))
    train_allowed_by_format = bool(format_policy.get("train", source_format in {"pdf", "jsonl"}))
    validation_allowed_by_format = bool(format_policy.get("validate", source_format in {"pdf", "jsonl"}))

    # "ok" and both eligibility flags require the training minimum and the
    # boilerplate cap. The validation minimum is an extra gate on top of that.
    extraction_ok = text_chars >= min_train_chars and boilerplate_ratio <= max_boilerplate
    training = source_train_ok and train_allowed_by_format and extraction_ok
    validation = (
        source_validation_ok
        and validation_allowed_by_format
        and text_chars >= min_validation_chars
        and extraction_ok
    )

    reasons: list[str] = []
    if not source_train_ok:
        reasons.append("source_policy_not_train_allowed")
    if not train_allowed_by_format:
        reasons.append(f"format_not_training_enabled:{source_format}")
    if text_chars < min_train_chars:
        reasons.append(f"text_chars_below_training_minimum:{text_chars}<{min_train_chars}")
    if boilerplate_ratio > max_boilerplate:
        reasons.append(f"boilerplate_ratio_above_max:{boilerplate_ratio}>{max_boilerplate}")
    if not validation_allowed_by_format:
        reasons.append(f"format_not_validation_enabled:{source_format}")
    # Explain validation rejections the checks above do not cover: the record
    # policy, and a validation minimum stricter than the training minimum.
    if not source_validation_ok:
        reasons.append("source_policy_not_validation_allowed")
    if min_train_chars <= text_chars < min_validation_chars:
        reasons.append(f"text_chars_below_validation_minimum:{text_chars}<{min_validation_chars}")

    normalized = {
        "schema_version": "normalized_source_v1",
        "asset_class": record.get("asset_class"),
        "role": record.get("role"),
        "source_title": record.get("title"),
        "source_url": record.get("url"),
        "source_type": record.get("source_type"),
        "license_status": record.get("license_status"),
        "decision": record.get("decision"),
        "raw_path": str(raw_path),
        "format": source_format,
        "text": text,
        "quality": {
            "text_chars": text_chars,
            "has_source_attribution": bool(record.get("title") or record.get("url")),
            **metrics,
        },
        "eligibility": {
            "training": training,
            "validation": validation,
            "reasons": reasons,
        },
        "ok": extraction_ok,
        "created_at": utc_now(),
        # Set before the (single) write so the file and the return value agree.
        "normalized_path": str(output_path),
    }
    write_json(output_path, normalized)
    return normalized

Xet Storage Details

Size:
8.23 kB
·
Xet hash:
5d99170819568298ffcd0ab573af87fbb9fff2d1a665da6816c50aafaab2b015

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.