import os
import json
import time
import socket
import threading
import io

import requests
import pandas as pd
from pathlib import Path
from tokenizers import Tokenizer
from huggingface_hub import HfApi

# ── Config ───────────────────────────────────────────────────────────────────
HF_TOKEN = os.environ.get("HF_TOKEN")
DATASET_REPO = "Neon-coding/github-code-raw"
TOK_PATH = "/data/tokenizer.json"
OUT_DIR = "/data/by-language"
STATE_FILE = "/data/progress_state.json"
TOTAL_PARQUETS = 880
SHARD_TOKENS = 50_000_000  # 50M tokens per shard
PARQUET_URL = (
    "https://huggingface.co/datasets/codeparrot/github-code-clean"
    "/resolve/main/data/train-{i:05d}-of-00880.parquet"
)

os.makedirs(OUT_DIR, exist_ok=True)
api = HfApi(token=HF_TOKEN)


# ── Port 7860 — keeps Space green ────────────────────────────────────────────
def serve():
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(("0.0.0.0", 7860))
    s.listen(5)
    print("✓ Listening on port 7860")
    while True:
        conn, _ = s.accept()
        conn.send(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK")
        conn.close()


# ── State ────────────────────────────────────────────────────────────────────
def load_state():
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE) as f:
            state = json.load(f)
        print(f"Resuming — {len(state['done'])} / {TOTAL_PARQUETS} parquets done")
    else:
        state = {
            "done": [],
            "lang_shards": {},
            "lang_tokens": {},
        }
        print("Starting fresh")
    return state


def save_state(state, retries=3, delay=5):
    for attempt in range(retries):
        try:
            with open(STATE_FILE, "w") as f:
                json.dump(state, f, indent=2)
            return
        except OSError as e:
            print(f"  ⚠ State save attempt {attempt + 1} failed: {e}")
            if attempt < retries - 1:
                time.sleep(delay)
    print("  ✗ State save failed after all retries — continuing")


# ── Shard buffers — global per language, persist across parquets ─────────────
buffers = {}


def get_buffer(lang):
    if lang not in buffers:
        buffers[lang] = {"rows": [], "token_count": 0}
    return buffers[lang]


def flush_shard(lang, rows, state):
    shard_idx = state["lang_shards"].get(lang, 0)
    lang_dir = Path(OUT_DIR) / lang
    lang_dir.mkdir(parents=True, exist_ok=True)
    shard_name = f"shard_{shard_idx:06d}.jsonl"
    shard_path = lang_dir / shard_name

    with open(shard_path, "w", encoding="utf-8") as f:
        for row in rows:
            f.write(json.dumps(row, ensure_ascii=False) + "\n")

    tok_in_shard = sum(r["token_count"] for r in rows)
    state["lang_shards"][lang] = shard_idx + 1
    state["lang_tokens"][lang] = state["lang_tokens"].get(lang, 0) + tok_in_shard
    print(f"  ✓ {lang}/{shard_name} | {len(rows)} samples | {tok_in_shard:,} tokens")


# ── Main processing loop ─────────────────────────────────────────────────────
def process(tokenizer, state):
    for i in range(TOTAL_PARQUETS):
        if i in state["done"]:
            print(f"[{i:06d}/{TOTAL_PARQUETS}] SKIP")
            continue

        url = PARQUET_URL.format(i=i)
        print(f"[{i:06d}/{TOTAL_PARQUETS}] Downloading...")
        try:
            resp = requests.get(
                url,
                headers={"Authorization": f"Bearer {HF_TOKEN}"},
                timeout=180,
            )
            resp.raise_for_status()
            df = pd.read_parquet(io.BytesIO(resp.content))
        except Exception as e:
            print(f"[{i:06d}] Download error: {e} — skipping")
            continue

        print(f"[{i:06d}] {len(df):,} rows | {df['language'].nunique()} languages")

        # row by row — constant memory
        for row_tuple in df.itertuples(index=False):
            lang = row_tuple.language
            text = row_tuple.code if row_tuple.code else ""
            repo = row_tuple.repo_name
            fpath = row_tuple.path
            lic = row_tuple.license

            if not text.strip():
                continue

            enc = tokenizer.encode(text)
            token_count = len(enc.ids)
            if token_count < 2:
                continue

            buf = get_buffer(lang)
            row = {
                "text": text,
                "token_count": token_count,
                "repo": repo,
                "path": fpath,
                "license": lic,
            }

            if buf["token_count"] + token_count > SHARD_TOKENS and buf["rows"]:
                flush_shard(lang, buf["rows"], state)
                save_state(state)
                buf["rows"] = []
                buf["token_count"] = 0

            buf["rows"].append(row)
            buf["token_count"] += token_count

        del df
        state["done"].append(i)
        save_state(state)
        print(f"[{i:06d}] ✓ Complete")

    # ── Flush remaining partial shards ────────────────────────────────────────
    print("\nFlushing remaining buffers...")
    for lang, buf in buffers.items():
        if buf["rows"]:
            flush_shard(lang, buf["rows"], state)
    save_state(state)

    # ── Write meta.json per language ──────────────────────────────────────────
    print("\nWriting meta.json per language...")
    for lang in state["lang_tokens"]:
        meta = {
            "language": lang,
            "total_tokens": state["lang_tokens"][lang],
            "total_shards": state["lang_shards"].get(lang, 0),
        }
        meta_path = Path(OUT_DIR) / lang / "meta.json"
        with open(meta_path, "w") as f:
            json.dump(meta, f, indent=2)
        print(f"  {lang}: {meta['total_tokens']:,} tokens | {meta['total_shards']} shards")

    # ── Push everything to HF dataset repo ───────────────────────────────────
    print(f"\nPushing to {DATASET_REPO}...")
    api.upload_folder(
        folder_path=OUT_DIR,
        repo_id=DATASET_REPO,
        repo_type="dataset",
        token=HF_TOKEN,
    )
    print("\n✓ All done!")


# ── Entry point ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
    threading.Thread(target=serve, daemon=True).start()

    print("✓ Loading tokenizer from /data/tokenizer.json...")
    tokenizer = Tokenizer.from_file(TOK_PATH)
    print(f"✓ Tokenizer loaded | vocab: {tokenizer.get_vocab_size():,}")

    state = load_state()
    threading.Thread(target=process, args=(tokenizer, state), daemon=True).start()

    while True:
        time.sleep(60)