"""Re-shard the codeparrot/github-code-clean dataset by programming language.

Downloads each training parquet over HTTP, tokenizes every file with a local
tokenizer, buckets rows by language into JSONL shards of roughly SHARD_TOKENS
tokens, checkpoints progress so the job can resume after a restart, and
finally uploads the output tree to DATASET_REPO on the Hugging Face Hub.
"""
import os
import json
import time
import socket
import threading
import io
import requests
import pandas as pd
from pathlib import Path
from tokenizers import Tokenizer
from huggingface_hub import HfApi

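# Configuration: credentials, source/destination repos, paths, and shard size.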
HF_TOKEN = os.environ.get("HF_TOKEN")
DATASET_REPO = "Neon-coding/github-code-raw"
TOK_PATH = "/data/tokenizer.json"
OUT_DIR = "/data/by-language"
STATE_FILE = "/data/progress_state.json"
TOTAL_PARQUETS = 880
SHARD_TOKENS = 50_000_000

PARQUET_URL = (
    "https://huggingface.co/datasets/codeparrot/github-code-clean"
    "/resolve/main/data/train-{i:05d}-of-00880.parquet"
)

os.makedirs(OUT_DIR, exist_ok=True)

api = HfApi(token=HF_TOKEN)

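# Tiny HTTP server: answers every connection on port 7860 with a static 200 OK,
# presumably so a platform health check (7860 is the Hugging Face Spaces default
# port) keeps seeing the job as alive.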
def serve():
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(("0.0.0.0", 7860))
    s.listen(5)
    print("Listening on port 7860")
    while True:
        conn, _ = s.accept()
        # sendall() keeps writing until the whole response is sent, unlike send().
        conn.sendall(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK")
        conn.close()

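# Progress state: completed parquet indices plus per-language shard/token counters.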
def load_state():
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE) as f:
            state = json.load(f)
        print(f"Resuming: {len(state['done'])} / {TOTAL_PARQUETS} parquets done")
    else:
        state = {
            "done": [],
            "lang_shards": {},
            "lang_tokens": {},
        }
        print("Starting fresh")
    return state


def save_state(state, retries=3, delay=5):
    for attempt in range(retries):
        try:
            with open(STATE_FILE, "w") as f:
                json.dump(state, f, indent=2)
            return
        except OSError as e:
            print(f"  State save attempt {attempt + 1} failed: {e}")
            if attempt < retries - 1:
                time.sleep(delay)
    print("  State save failed after all retries; continuing")

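# Per-language row buffers, flushed to a new JSONL shard once they reach SHARD_TOKENS.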
buffers = {}


def get_buffer(lang):
    if lang not in buffers:
        buffers[lang] = {"rows": [], "token_count": 0}
    return buffers[lang]


def flush_shard(lang, rows, state):
    shard_idx = state["lang_shards"].get(lang, 0)
    lang_dir = Path(OUT_DIR) / lang
    lang_dir.mkdir(parents=True, exist_ok=True)
    shard_name = f"shard_{shard_idx:06d}.jsonl"
    shard_path = lang_dir / shard_name

    with open(shard_path, "w", encoding="utf-8") as f:
        for row in rows:
            f.write(json.dumps(row, ensure_ascii=False) + "\n")

    tok_in_shard = sum(r["token_count"] for r in rows)
    state["lang_shards"][lang] = shard_idx + 1
    state["lang_tokens"][lang] = state["lang_tokens"].get(lang, 0) + tok_in_shard
    print(f"  -> {lang}/{shard_name} | {len(rows)} samples | {tok_in_shard:,} tokens")

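# Main pipeline: download each parquet, tokenize every file, bucket rows by
# language, and flush full shards as it goes.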
def process(tokenizer, state):
    for i in range(TOTAL_PARQUETS):
        if i in state["done"]:
            print(f"[{i:06d}/{TOTAL_PARQUETS}] SKIP")
            continue

        url = PARQUET_URL.format(i=i)
        print(f"[{i:06d}/{TOTAL_PARQUETS}] Downloading...")

        try:
            resp = requests.get(
                url,
                headers={"Authorization": f"Bearer {HF_TOKEN}"},
                timeout=180,
            )
            resp.raise_for_status()
            df = pd.read_parquet(io.BytesIO(resp.content))
        except Exception as e:
            print(f"[{i:06d}] Download error: {e}; skipping")
            continue

        print(f"[{i:06d}] {len(df):,} rows | {df['language'].nunique()} languages")

        for row_tuple in df.itertuples(index=False):
            lang = row_tuple.language
            text = row_tuple.code if row_tuple.code else ""
            repo = row_tuple.repo_name
            fpath = row_tuple.path
            lic = row_tuple.license

            if not text.strip():
                continue

            enc = tokenizer.encode(text)
            token_count = len(enc.ids)

            if token_count < 2:
                continue

            buf = get_buffer(lang)
            row = {
                "text": text,
                "token_count": token_count,
                "repo": repo,
                "path": fpath,
                "license": lic,
            }

            # Flush before this row would push the shard past the token budget.
            if buf["token_count"] + token_count > SHARD_TOKENS and buf["rows"]:
                flush_shard(lang, buf["rows"], state)
                save_state(state)
                buf["rows"] = []
                buf["token_count"] = 0

            buf["rows"].append(row)
            buf["token_count"] += token_count

        del df

        state["done"].append(i)
        save_state(state)
        print(f"[{i:06d}] Complete")

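    # Everything downloaded: flush whatever is still buffered for each language.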
    print("\nFlushing remaining buffers...")
    for lang, buf in buffers.items():
        if buf["rows"]:
            flush_shard(lang, buf["rows"], state)
            save_state(state)

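    # Write a per-language meta.json with total token and shard counts.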
    print("\nWriting meta.json per language...")
    for lang in state["lang_tokens"]:
        meta = {
            "language": lang,
            "total_tokens": state["lang_tokens"][lang],
            "total_shards": state["lang_shards"].get(lang, 0),
        }
        meta_path = Path(OUT_DIR) / lang / "meta.json"
        with open(meta_path, "w") as f:
            json.dump(meta, f, indent=2)
        print(f"  {lang}: {meta['total_tokens']:,} tokens | {meta['total_shards']} shards")

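    # Push the whole output tree to the dataset repo in a single upload.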
    print(f"\nPushing to {DATASET_REPO}...")
    api.upload_folder(
        folder_path=OUT_DIR,
        repo_id=DATASET_REPO,
        repo_type="dataset",
        token=HF_TOKEN,
    )
    print("\nAll done!")

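# Entry point: start the health server and the worker, then keep the main thread alive.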
if __name__ == "__main__":
    threading.Thread(target=serve, daemon=True).start()

    print(f"Loading tokenizer from {TOK_PATH}...")
    tokenizer = Tokenizer.from_file(TOK_PATH)
    print(f"Tokenizer loaded | vocab: {tokenizer.get_vocab_size():,}")

    state = load_state()

    threading.Thread(target=process, args=(tokenizer, state), daemon=True).start()

    # The worker runs as a daemon thread, so the main thread must stay alive
    # for the job to keep running; it simply sleeps forever.
    while True:
        time.sleep(60)