Buckets:
bbkdevops/unicosys-hypergraph-bucket / tinymind-native-8b-remote-handoff /bundle /data /scripts /collect_gutenberg.py
| #!/usr/bin/env python | |
| """ | |
| Collect high-provenance public-domain-oriented text from Project Gutenberg via Gutendex. | |
| Outputs: | |
| - raw/gutenberg_{language}/{id}.txt | |
| - jsonl/gutenberg_{language}_seed.jsonl | |
| - manifests/gutenberg_{language}_manifest.jsonl | |
| - manifests/gutenberg_{language}_quality_report.json | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import hashlib | |
| import json | |
| import re | |
| import time | |
| from dataclasses import dataclass, asdict | |
| from pathlib import Path | |
| from typing import Any | |
| import requests | |
| from requests.adapters import HTTPAdapter | |
| from tqdm import tqdm | |
| from urllib3.util.retry import Retry | |
# Base endpoint of the Gutendex book-listing API; iter_books appends the query string.
GUTENDEX = "https://gutendex.com/books/"
# User-Agent header value that make_session installs on every session it creates.
USER_AGENT = "TinyMindDataFoundry/1.0 (provenance-first educational dataset builder)"
@dataclass
class QualityDecision:
    """Outcome of the quality gate applied to one cleaned book text.

    The ``@dataclass`` decorator is required: instances are built
    positionally (``QualityDecision(False, "too_short", ...)``) and are
    serialized with ``dataclasses.asdict`` when the manifest is written.
    """

    accepted: bool  # True when the text passed every check
    reason: str  # "accepted", or the name of the first check that failed
    chars: int  # length of the checked text, in characters
    words: int  # number of word tokens found in the text
    unique_ratio: float  # distinct/total words in the sampled prefix (0.0 if not computed)
def sha256_text(text: str) -> str:
    """Return the hexadecimal SHA-256 digest of ``text`` encoded as UTF-8.

    Characters that cannot be encoded (for example lone surrogates) are
    dropped before hashing rather than raising.
    """
    payload = text.encode("utf-8", errors="ignore")
    hasher = hashlib.sha256()
    hasher.update(payload)
    return hasher.hexdigest()
def normalize_text(text: str) -> str:
    """Return ``text`` with line endings and whitespace runs normalized.

    Steps, in order: convert CRLF and bare CR to LF, collapse each run of
    spaces/tabs to a single space, cap runs of four or more newlines at
    three, and trim leading/trailing whitespace.
    """
    unix_lines = text.replace("\r\n", "\n").replace("\r", "\n")
    single_spaced = re.sub(r"[ \t]+", " ", unix_lines)
    gap_capped = re.sub(r"\n{4,}", "\n\n\n", single_spaced)
    return gap_capped.strip()
def strip_gutenberg_boilerplate(text: str) -> str:
    """Cut the Project Gutenberg header/footer off ``text`` and normalize it.

    Everything up to and including the first ``*** START OF ... ***``
    marker is discarded, then everything from the first remaining
    ``*** END OF ... ***`` marker onward. A missing marker leaves that
    side of the text untouched. The result is passed through
    ``normalize_text``.
    """
    search_flags = re.IGNORECASE | re.DOTALL

    def first_hit(patterns, haystack):
        # Patterns are ordered most-specific first; the first one that matches wins.
        for candidate in patterns:
            hit = re.search(candidate, haystack, flags=search_flags)
            if hit is not None:
                return hit
        return None

    start_patterns = (
        r"\*\*\* START OF (?:THE|THIS) PROJECT GUTENBERG EBOOK .*?\*\*\*",
        r"\*\*\* START OF .*?\*\*\*",
    )
    end_patterns = (
        r"\*\*\* END OF (?:THE|THIS) PROJECT GUTENBERG EBOOK .*?\*\*\*",
        r"\*\*\* END OF .*?\*\*\*",
    )

    opening = first_hit(start_patterns, text)
    if opening is not None:
        text = text[opening.end():]

    # The end marker is searched only in what is left after the header was removed.
    closing = first_hit(end_patterns, text)
    if closing is not None:
        text = text[:closing.start()]

    return normalize_text(text)
def quality_check(
    text: str,
    *,
    min_chars: int = 20_000,
    min_words: int = 3_000,
    min_unique_ratio: float = 0.025,
    min_printable_ratio: float = 0.85,
    sample_words: int = 50_000,
) -> QualityDecision:
    """Decide whether a cleaned book text is good enough to keep.

    Checks run in order and the first failure determines ``reason``:

    1. ``too_short``        - fewer than ``min_chars`` characters.
    2. ``too_few_words``    - fewer than ``min_words`` ASCII-letter words.
    3. ``low_unique_ratio`` - distinct/total ratio of the first
       ``sample_words`` lower-cased words is below ``min_unique_ratio``.
    4. ``encoding_noise``   - share of printable-ASCII characters (plus
       newline and tab) is below ``min_printable_ratio``.

    The thresholds are keyword-only parameters whose defaults are the
    previously hard-coded values, so callers that pass only ``text`` see
    no change, while a caller may now align them with its own settings.

    Args:
        text: Cleaned text to evaluate.
        min_chars: Minimum text length in characters.
        min_words: Minimum number of word tokens.
        min_unique_ratio: Minimum distinct-word ratio in the sample.
        min_printable_ratio: Minimum printable-ASCII character share.
        sample_words: How many leading words are used for the ratio.

    Returns:
        A ``QualityDecision``; ``unique_ratio`` is ``0.0`` when a length
        check failed before the ratio was computed.
    """
    # Words are ASCII letters, optionally continued by letters, apostrophes or hyphens.
    words = re.findall(r"[A-Za-z][A-Za-z'-]+", text)
    word_count = len(words)
    chars = len(text)

    if chars < min_chars:
        return QualityDecision(False, "too_short", chars, word_count, 0.0)
    if word_count < min_words:
        return QualityDecision(False, "too_few_words", chars, word_count, 0.0)

    # Only a prefix is sampled so the ratio stays comparable across book lengths.
    lowered = [w.lower() for w in words[:sample_words]]
    unique_ratio = len(set(lowered)) / max(len(lowered), 1)
    if unique_ratio < min_unique_ratio:
        return QualityDecision(False, "low_unique_ratio", chars, word_count, unique_ratio)

    ascii_printable = sum(
        1 for ch in text if ch == "\n" or ch == "\t" or 32 <= ord(ch) <= 126
    )
    printable_ratio = ascii_printable / max(chars, 1)
    if printable_ratio < min_printable_ratio:
        return QualityDecision(False, "encoding_noise", chars, word_count, unique_ratio)

    return QualityDecision(True, "accepted", chars, word_count, unique_ratio)
def choose_plaintext_url(book: dict[str, Any]) -> str | None:
    """Pick the best HTTPS plain-text download URL from a book record.

    The record's ``formats`` mapping (MIME type -> URL) is searched in
    this order: ``text/plain; charset=utf-8``, then ``text/plain``, then
    any other key starting with ``text/plain``. Only string URLs that
    start with ``https://`` qualify.

    Malformed records are tolerated: a missing or non-dict ``formats``
    value and non-string URLs are skipped instead of raising.

    Args:
        book: One book record as returned by the listing API.

    Returns:
        The chosen URL, or ``None`` when no acceptable URL exists.
    """
    formats = book.get("formats")
    if not isinstance(formats, dict):
        return None

    def usable(url: object) -> bool:
        # Same acceptance rule for the preferred keys and the fallback scan.
        return isinstance(url, str) and url.startswith("https://")

    for key in ("text/plain; charset=utf-8", "text/plain"):
        url = formats.get(key)
        if usable(url):
            return url

    for key, url in formats.items():
        if isinstance(key, str) and key.startswith("text/plain") and usable(url):
            return url
    return None
def make_session() -> requests.Session:
    """Build a ``requests`` session with retrying adapters and the project User-Agent.

    GET requests are retried up to five times (connect and read errors
    included) with a backoff factor of 1.5 when the server answers 429,
    500, 502, 503 or 504. One adapter instance is mounted for both the
    ``https://`` and ``http://`` prefixes.
    """
    retry_policy = Retry(
        total=5,
        connect=5,
        read=5,
        backoff_factor=1.5,
        status_forcelist=(429, 500, 502, 503, 504),
        allowed_methods=("GET",),
    )
    retrying_adapter = HTTPAdapter(max_retries=retry_policy)

    http = requests.Session()
    for prefix in ("https://", "http://"):
        http.mount(prefix, retrying_adapter)
    http.headers.update({"User-Agent": USER_AGENT})
    return http
def fetch_json(url: str, session: requests.Session) -> dict[str, Any]:
    """GET ``url`` through ``session`` and return the decoded JSON body.

    The request uses a 90-second timeout; ``raise_for_status`` turns an
    HTTP error status into an exception before the body is parsed.
    """
    reply = session.get(url, timeout=90)
    reply.raise_for_status()
    payload = reply.json()
    return payload
def fetch_text(url: str, session: requests.Session) -> str:
    """GET ``url`` through ``session`` and return the body decoded to text.

    The request uses a 120-second timeout and ``raise_for_status`` turns
    an HTTP error status into an exception.

    Decoding: when the server declares a charset in ``Content-Type`` it is
    honoured. When it does not, the body is decoded as UTF-8 first and,
    if that fails, with the encoding detected from the content. Without
    this, ``response.encoding`` is pre-filled with ``ISO-8859-1`` for
    ``text/*`` responses lacking a charset, so an ``or "utf-8"`` fallback
    never applies and UTF-8 books come back as mojibake.

    Args:
        url: Address of the plain-text file.
        session: Session used to perform the request.

    Returns:
        The response body as ``str``.
    """
    response = session.get(url, timeout=120)
    response.raise_for_status()

    content_type = response.headers.get("Content-Type", "") or ""
    if "charset=" in content_type.lower():
        # Server stated the encoding explicitly; keep the library's choice.
        response.encoding = response.encoding or "utf-8"
        return response.text

    try:
        return response.content.decode("utf-8")
    except UnicodeDecodeError:
        # Not valid UTF-8: fall back to content-based detection.
        response.encoding = response.apparent_encoding or "utf-8"
        return response.text
def iter_books(limit: int, session: requests.Session, delay: float, language: str):
    """Yield up to ``limit`` book records from the paginated listing API.

    Pages are requested for the given ``language`` with the
    ``text/plain`` MIME filter, following each page's ``next`` link.
    The generator pauses ``delay`` seconds between page requests only;
    it no longer sleeps after the final page or once ``limit`` has been
    reached, since no further request follows.

    Args:
        limit: Maximum number of records to yield; ``<= 0`` yields nothing.
        session: Session used for the HTTP requests.
        delay: Seconds to wait before requesting the next page.
        language: Language code placed in the ``languages`` query parameter.

    Yields:
        One book record (a dict) per listed book.
    """
    url = f"{GUTENDEX}?languages={language}&mime_type=text/plain"
    yielded = 0
    while url and yielded < limit:
        data = fetch_json(url, session)
        for book in data.get("results", []):
            yield book
            yielded += 1
            if yielded >= limit:
                return
        url = data.get("next")
        if url:
            # Be polite to the API, but only when another request is coming.
            time.sleep(delay)
def main() -> int:
    """Collect, clean, filter and record book texts for one language.

    Under ``--root`` this creates ``raw/gutenberg_{language}/``,
    ``jsonl/``, ``manifests/`` and ``logs/``, then for each listed book:
    loads the cached raw text or downloads and caches it, strips the
    boilerplate, rejects texts that are too short, duplicated (by content
    hash, including hashes from an existing manifest) or that fail
    ``quality_check``, and appends accepted books to the seed JSONL and
    the manifest JSONL. A summary report is written and printed at the end.

    Returns:
        ``0``; the value is used as the process exit status.
    """
    parser = argparse.ArgumentParser()
    # NOTE(review): the default root is a machine-specific Windows path; pass --root elsewhere.
    parser.add_argument("--root", default=r"D:\ad\tinymind\data")
    parser.add_argument("--limit", type=int, default=100)
    parser.add_argument("--language", default="en")
    parser.add_argument("--delay", type=float, default=0.35)
    parser.add_argument("--min-chars", type=int, default=20_000)
    args = parser.parse_args()

    root = Path(args.root)
    source_key = f"gutenberg_{args.language}"
    raw_dir = root / "raw" / source_key
    jsonl_dir = root / "jsonl"
    manifest_dir = root / "manifests"
    log_dir = root / "logs"
    for path in (raw_dir, jsonl_dir, manifest_dir, log_dir):
        path.mkdir(parents=True, exist_ok=True)

    session = make_session()
    out_jsonl = jsonl_dir / f"{source_key}_seed.jsonl"
    out_manifest = manifest_dir / f"{source_key}_manifest.jsonl"
    quality_report = manifest_dir / f"{source_key}_quality_report.json"

    # Resume support: hashes already recorded in the manifest count as seen.
    seen_hashes: set[str] = set()
    if out_manifest.exists():
        # Iterate the file instead of str.splitlines(): splitlines() also breaks on
        # U+2028/U+2029/U+0085, which json.dumps(ensure_ascii=False) writes unescaped,
        # so a record containing one would be cut in half and fail to parse.
        with out_manifest.open(encoding="utf-8") as existing:
            for line in existing:
                if line.strip():
                    seen_hashes.add(json.loads(line)["content_sha256"])

    accepted = 0
    rejected = 0
    downloaded = 0

    with out_jsonl.open("a", encoding="utf-8") as data_f, out_manifest.open(
        "a", encoding="utf-8"
    ) as manifest_f:
        books = iter_books(args.limit, session, args.delay, args.language)
        for book in tqdm(books, total=args.limit, desc=source_key):
            book_id = book.get("id")
            text_url = choose_plaintext_url(book)
            if not book_id or not text_url:
                rejected += 1
                continue

            raw_path = raw_dir / f"{book_id}.txt"
            try:
                if raw_path.exists():
                    raw_text = raw_path.read_text(encoding="utf-8")
                else:
                    raw_text = fetch_text(text_url, session)
                    raw_path.write_text(raw_text, encoding="utf-8")
                    downloaded += 1
                    time.sleep(args.delay)
            except Exception as exc:
                # Best-effort per-book boundary: one bad book must not abort the run.
                rejected += 1
                print(f"reject fetch {book_id}: {exc}")
                continue

            clean_text = strip_gutenberg_boilerplate(raw_text)
            if len(clean_text) < args.min_chars:
                rejected += 1
                continue

            content_hash = sha256_text(clean_text)
            if content_hash in seen_hashes:
                rejected += 1
                continue

            decision = quality_check(clean_text)
            if not decision.accepted:
                rejected += 1
                continue

            seen_hashes.add(content_hash)
            authors = [a.get("name", "") for a in book.get("authors", []) if a.get("name")]
            record = {
                "id": f"gutenberg:{args.language}:{book_id}",
                "source": "Project Gutenberg via Gutendex",
                "source_url": text_url,
                "license_family": "public-domain",
                "title": book.get("title", ""),
                "authors": authors,
                "languages": book.get("languages", []),
                "subjects": book.get("subjects", []),
                "bookshelves": book.get("bookshelves", []),
                "download_count": book.get("download_count"),
                "content_sha256": content_hash,
                "text": clean_text,
            }
            # The manifest row is the record without its text, plus provenance details.
            manifest = {
                **{k: v for k, v in record.items() if k != "text"},
                "raw_path": str(raw_path),
                "quality": asdict(decision),
            }
            data_f.write(json.dumps(record, ensure_ascii=False) + "\n")
            manifest_f.write(json.dumps(manifest, ensure_ascii=False) + "\n")
            accepted += 1

    report = {
        "source": f"gutenberg_gutendex_{args.language}_plaintext",
        "limit": args.limit,
        "accepted_this_run": accepted,
        "rejected_this_run": rejected,
        "downloaded_this_run": downloaded,
        "total_unique_hashes_seen": len(seen_hashes),
        "jsonl": str(out_jsonl),
        "manifest": str(out_manifest),
    }
    quality_report.write_text(json.dumps(report, indent=2, ensure_ascii=False), encoding="utf-8")
    print(json.dumps(report, indent=2, ensure_ascii=False))
    return 0
# Script entry point: run main() and use its return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Xet Storage Details
- Size:
- 9.08 kB
- Xet hash:
- 5fd87e15f4bd8b76882057c8788d9b97ac7f521bee48165b188dcb829f3a3ba5
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.