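"""Hugging Face Space worker: downloads the codeparrot/github-code-clean
parquet files one by one, tokenizes every source file, regroups the samples
into ~50M-token JSONL shards per language under /data/by-language, and pushes
the result to the Neon-coding/github-code-raw dataset repo. Progress is
checkpointed to /data/progress_state.json so the Space can resume after a
restart."""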
import os
import json
import time
import socket
import threading
import io
import requests
import pandas as pd
from pathlib import Path
from tokenizers import Tokenizer
from huggingface_hub import HfApi
# ── Config ───────────────────────────────────────────────────────────────────
HF_TOKEN = os.environ.get("HF_TOKEN")
DATASET_REPO = "Neon-coding/github-code-raw"
TOK_PATH = "/data/tokenizer.json"
OUT_DIR = "/data/by-language"
STATE_FILE = "/data/progress_state.json"
TOTAL_PARQUETS = 880
SHARD_TOKENS = 50_000_000 # 50M tokens per shard
PARQUET_URL = (
    "https://huggingface.co/datasets/codeparrot/github-code-clean"
    "/resolve/main/data/train-{i:05d}-of-00880.parquet"
)
os.makedirs(OUT_DIR, exist_ok=True)
api = HfApi(token=HF_TOKEN)
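# For reference, the layout this script produces under OUT_DIR (language and
# shard names are illustrative):
#
#   /data/by-language/
#     Python/
#       shard_000000.jsonl
#       shard_000001.jsonl
#       meta.json
#     JavaScript/
#       ...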
# ── Port 7860 — keeps Space green ────────────────────────────────────────────
def serve():
    """Minimal HTTP responder so the Space health check sees port 7860 open."""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(("0.0.0.0", 7860))
    s.listen(5)
    print("✓ Listening on port 7860")
    while True:
        conn, _ = s.accept()
        # sendall (not send) so the full response goes out even on a partial write
        conn.sendall(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK")
        conn.close()
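# A quick way to check the responder from inside the container (hypothetical
# usage, not part of the pipeline):
#
#   curl -i http://localhost:7860/
#   # HTTP/1.1 200 OK
#   # Content-Length: 2
#   #
#   # OK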
# ── State ────────────────────────────────────────────────────────────────────
def load_state():
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE) as f:
            state = json.load(f)
        print(f"Resuming — {len(state['done'])} / {TOTAL_PARQUETS} parquets done")
    else:
        state = {
            "done": [],
            "lang_shards": {},
            "lang_tokens": {},
        }
        print("Starting fresh")
    return state
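# The state file is plain JSON; a minimal example of its shape (values are
# illustrative):
#
#   {
#     "done": [0, 1, 2],
#     "lang_shards": {"Python": 3},
#     "lang_tokens": {"Python": 150000000}
#   }
#
# "done" lists finished parquet indices, "lang_shards" counts flushed shards
# per language, and "lang_tokens" accumulates token totals per language.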
def save_state(state, retries=3, delay=5):
    # Write to a temp file first, then rename atomically, so a crash mid-write
    # cannot leave a truncated state file behind.
    tmp_path = STATE_FILE + ".tmp"
    for attempt in range(retries):
        try:
            with open(tmp_path, "w") as f:
                json.dump(state, f, indent=2)
            os.replace(tmp_path, STATE_FILE)
            return
        except OSError as e:
            print(f"  ⚠ State save attempt {attempt + 1} failed: {e}")
            if attempt < retries - 1:
                time.sleep(delay)
    print("  ✗ State save failed after all retries — continuing")
# ── Shard buffers — global per language, persist across parquets ─────────────
buffers = {}

def get_buffer(lang):
    if lang not in buffers:
        buffers[lang] = {"rows": [], "token_count": 0}
    return buffers[lang]
def flush_shard(lang, rows, state):
    shard_idx = state["lang_shards"].get(lang, 0)
    lang_dir = Path(OUT_DIR) / lang
    lang_dir.mkdir(parents=True, exist_ok=True)
    shard_name = f"shard_{shard_idx:06d}.jsonl"
    shard_path = lang_dir / shard_name
    with open(shard_path, "w", encoding="utf-8") as f:
        for row in rows:
            f.write(json.dumps(row, ensure_ascii=False) + "\n")
    tok_in_shard = sum(r["token_count"] for r in rows)
    state["lang_shards"][lang] = shard_idx + 1
    state["lang_tokens"][lang] = state["lang_tokens"].get(lang, 0) + tok_in_shard
    print(f"  ✓ {lang}/{shard_name} | {len(rows)} samples | {tok_in_shard:,} tokens")
# ── Main processing loop ─────────────────────────────────────────────────────
def process(tokenizer, state):
    for i in range(TOTAL_PARQUETS):
        if i in state["done"]:
            print(f"[{i:06d}/{TOTAL_PARQUETS}] SKIP")
            continue
        url = PARQUET_URL.format(i=i)
        print(f"[{i:06d}/{TOTAL_PARQUETS}] Downloading...")
        try:
            resp = requests.get(
                url,
                headers={"Authorization": f"Bearer {HF_TOKEN}"},
                timeout=180,
            )
            resp.raise_for_status()
            df = pd.read_parquet(io.BytesIO(resp.content))
        except Exception as e:
            print(f"[{i:06d}] Download error: {e} — skipping")
            continue
        print(f"[{i:06d}] {len(df):,} rows | {df['language'].nunique()} languages")
        # row by row — constant memory
        for row_tuple in df.itertuples(index=False):
            lang = row_tuple.language
            # Guard against None/NaN in the code column
            text = row_tuple.code if isinstance(row_tuple.code, str) else ""
            repo = row_tuple.repo_name
            fpath = row_tuple.path
            lic = row_tuple.license
            if not text.strip():
                continue
            enc = tokenizer.encode(text)
            token_count = len(enc.ids)
            if token_count < 2:
                continue
            buf = get_buffer(lang)
            row = {
                "text": text,
                "token_count": token_count,
                "repo": repo,
                "path": fpath,
                "license": lic,
            }
            # Rotate the shard before it would exceed the 50M-token cap
            if buf["token_count"] + token_count > SHARD_TOKENS and buf["rows"]:
                flush_shard(lang, buf["rows"], state)
                save_state(state)
                buf["rows"] = []
                buf["token_count"] = 0
            buf["rows"].append(row)
            buf["token_count"] += token_count
        del df
        state["done"].append(i)
        save_state(state)
        print(f"[{i:06d}] ✓ Complete")
    # ── Flush remaining partial shards ────────────────────────────────────────
    print("\nFlushing remaining buffers...")
    for lang, buf in buffers.items():
        if buf["rows"]:
            flush_shard(lang, buf["rows"], state)
    save_state(state)
    # ── Write meta.json per language ──────────────────────────────────────────
    print("\nWriting meta.json per language...")
    for lang in state["lang_tokens"]:
        meta = {
            "language": lang,
            "total_tokens": state["lang_tokens"][lang],
            "total_shards": state["lang_shards"].get(lang, 0),
        }
        meta_path = Path(OUT_DIR) / lang / "meta.json"
        with open(meta_path, "w") as f:
            json.dump(meta, f, indent=2)
        print(f"  {lang}: {meta['total_tokens']:,} tokens | {meta['total_shards']} shards")
    # ── Push everything to HF dataset repo ───────────────────────────────────
    print(f"\nPushing to {DATASET_REPO}...")
    api.upload_folder(
        folder_path=OUT_DIR,
        repo_id=DATASET_REPO,
        repo_type="dataset",
        token=HF_TOKEN,
    )
    print("\n✓ All done!")
# ── Entry point ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
    threading.Thread(target=serve, daemon=True).start()
    print("✓ Loading tokenizer from /data/tokenizer.json...")
    tokenizer = Tokenizer.from_file(TOK_PATH)
    print(f"✓ Tokenizer loaded | vocab: {tokenizer.get_vocab_size():,}")
    state = load_state()
    threading.Thread(target=process, args=(tokenizer, state), daemon=True).start()
    # Keep the main thread alive; both worker threads are daemons
    while True:
        time.sleep(60)