Neon-tech committed on
Commit
528f3a5
Β·
verified Β·
1 Parent(s): e7fd235

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +264 -118
app.py CHANGED
@@ -1,140 +1,286 @@
1
  import os
2
  import json
 
 
 
3
  import requests
4
  import pyarrow.parquet as pq
5
  import gc
6
  from pathlib import Path
 
7
 
8
  # ── Config ───────────────────────────────────────────────────────────────────
9
- HF_TOKEN = os.environ.get("HF_TOKEN")
10
- OUT_DIR = "/data/raw"
11
- OUT_FILE = "/data/raw/phi__programming_books.jsonl"
 
 
12
 
13
- os.makedirs(OUT_DIR, exist_ok=True)
 
14
  AUTH_HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"}
15
 
16
- PHI_URLS = [
17
- "https://huggingface.co/datasets/open-phi/programming_books_llama/resolve/main/data/train-00000-of-00004-ea05c5cb63b570a8.parquet?download=true",
18
- "https://huggingface.co/datasets/open-phi/programming_books_llama/resolve/main/data/train-00001-of-00004-d99cbe052bab0d4e.parquet?download=true",
19
- "https://huggingface.co/datasets/open-phi/programming_books_llama/resolve/main/data/train-00002-of-00004-2c25f0e11d537eaf.parquet?download=true",
20
- "https://huggingface.co/datasets/open-phi/programming_books_llama/resolve/main/data/train-00003-of-00004-faa8dbb07e5f02e8.parquet?download=true",
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
21
  ]
22
 
23
- # Fields to extract in order (model excluded)
24
- FIELDS = ["topic", "outline", "queries", "context", "markdown"]
25
-
26
- # ── Helpers ───────────────────────────────────────────────────────────────────
27
def download_file(url, path):
    """Stream *url* to *path* in 8 MiB chunks, authenticating with AUTH_HEADERS.

    Raises ``requests.HTTPError`` on a non-2xx response. The response is
    opened as a context manager so the underlying HTTP connection is
    released deterministically (the original leaked it on every call).
    """
    with requests.get(url, headers=AUTH_HEADERS, timeout=300, stream=True) as r:
        r.raise_for_status()
        with open(path, "wb") as f:
            for chunk in r.iter_content(chunk_size=8 * 1024 * 1024):
                f.write(chunk)
    print(f" Downloaded: {os.path.getsize(path)/1e6:.0f} MB")
34
-
35
def field_to_text(field_name, value):
    """Render a single dataset field as stripped plain text.

    Falsy values become "", strings are stripped, lists are joined with
    newlines (falsy items dropped), and anything else is stringified then
    stripped. ``field_name`` is accepted for caller parity but unused.
    """
    if not value:
        return ""
    if isinstance(value, list):
        kept = [str(item).strip() for item in value if item]
        return "\n".join(kept)
    text = value if isinstance(value, str) else str(value)
    return text.strip()
45
-
46
def entry_to_text(row):
    """Join every populated field of *row*, in FIELDS order, into one document.

    Fields missing from the row or rendering to empty text are skipped;
    the survivors are separated by blank lines.
    """
    rendered = (field_to_text(name, row.get(name)) for name in FIELDS)
    return "\n\n".join(chunk for chunk in rendered if chunk)
55
-
56
- # ── Main ──────────────────────────────────────────────────────────────────────
57
def process_phi():
    """Download the open-phi programming-books parquet shards, flatten each
    row into one text document, and append JSONL records to OUT_FILE.

    Resumable: the line count of an existing OUT_FILE is taken as the number
    of input rows already processed, and that many rows are skipped on the
    next run (assumes PHI_URLS order and row order are stable — the skip
    counter spans ALL files). Documents shorter than 50 chars are dropped.
    Finally the output shard is registered in /data/state.json so workers
    pick it up.
    """
    tmp_path = Path(OUT_DIR) / "phi_tmp.parquet"
    n_written = 0
    n_skipped = 0

    # Resume — if output already exists, count lines
    if Path(OUT_FILE).exists():
        with open(OUT_FILE) as f:
            existing = sum(1 for _ in f)
        print(f"Resuming — {existing:,} entries already written")
    else:
        existing = 0

    # Global row counter across all parquet files; compared to `existing`
    # to skip rows that were written by a previous run.
    entry_idx = 0

    with open(OUT_FILE, "a", encoding="utf-8") as fout:
        for url in PHI_URLS:
            fname = url.split("?")[0].split("/")[-1]
            print(f"\nDownloading: {fname}")
            download_file(url, tmp_path)

            pf = pq.ParquetFile(tmp_path)
            # Discover available columns; only request the FIELDS that exist.
            available = pf.schema_arrow.names
            use_cols = [f for f in FIELDS if f in available]
            print(f" Available fields: {available}")
            print(f" Using: {use_cols}")

            # Stream batch-by-batch so the whole parquet never sits in RAM.
            for batch in pf.iter_batches(batch_size=500, columns=use_cols):
                rows = batch.to_pydict()
                n = len(batch)
                for i in range(n):
                    entry_idx += 1
                    if entry_idx <= existing:
                        continue  # skip already written

                    row = {col: rows[col][i] for col in use_cols}
                    text = entry_to_text(row)

                    # Drop near-empty documents (< 50 chars after strip).
                    if not text or len(text.strip()) < 50:
                        n_skipped += 1
                        continue

                    fout.write(json.dumps({"text": text}, ensure_ascii=False) + "\n")
                    n_written += 1

                del rows; gc.collect()

            tmp_path.unlink(missing_ok=True)
            print(f" ✓ {fname} done | written so far: {n_written + existing:,}")

    total = n_written + existing
    print(f"\n✓ Phi processing complete")
    print(f" Total entries : {total:,}")
    print(f" Skipped : {n_skipped:,}")
    print(f" Output : {OUT_FILE}")
    print(f" Size : {os.path.getsize(OUT_FILE)/1e6:.0f} MB")

    # Add to state.json so workers pick it up
    state_file = "/data/state.json"
    if os.path.exists(state_file):
        with open(state_file) as f:
            state = json.load(f)

        shard_name = "phi__programming_books.jsonl"
        if shard_name not in state["shards"]:
            state["shards"][shard_name] = {
                "status" : "pending",
                "url" : PHI_URLS[0], # reference only
                "source" : "phi",
                "worker" : None,
                "claimed_at": None,
                "error" : None,
            }
            # Atomic write: dump to a temp file, then rename over the original.
            tmp = state_file + ".tmp"
            with open(tmp, "w") as f:
                json.dump(state, f, indent=2)
            os.replace(tmp, state_file)
            print(f" ✓ Added to state.json — workers will pick up automatically")
        else:
            print(f" Already in state.json")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
138
 
 
139
  if __name__ == "__main__":
140
- process_phi()
 
 
 
 
 
 
 
1
  import os
2
  import json
3
+ import time
4
+ import socket
5
+ import threading
6
  import requests
7
  import pyarrow.parquet as pq
8
  import gc
9
  from pathlib import Path
10
+ from huggingface_hub import HfApi
11
 
12
  # ── Config ───────────────────────────────────────────────────────────────────
13
+ HF_TOKEN = os.environ.get("HF_TOKEN")
14
+ RAW_DIR = "/data/raw"
15
+ STATE_FILE = "/data/state.json"
16
+ WORKER_TIMEOUT = 700
17
+ MAX_BUFFERED = 999999
18
 
19
+ os.makedirs(RAW_DIR, exist_ok=True)
20
+ api = HfApi(token=HF_TOKEN)
21
  AUTH_HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"}
22
 
23
+ # ── Sources ───────────────────────────────────────────────────────────────────
24
+ SOURCES = [
25
+ {
26
+ "name" : "fineweb",
27
+ "type" : "hf_list",
28
+ "repo" : "HuggingFaceFW/fineweb-edu",
29
+ "prefix" : "data/CC-MAIN-2025-26",
30
+ "skip" : 5,
31
+ "take" : 10,
32
+ "text_col": "text",
33
+ },
34
+ {
35
+ "name" : "wikipedia",
36
+ "type" : "hf_list",
37
+ "repo" : "wikimedia/wikipedia",
38
+ "prefix" : "20231101.en/train-",
39
+ "skip" : 2,
40
+ "take" : 18,
41
+ "text_col": "text",
42
+ },
43
+ {
44
+ "name" : "openwebmath",
45
+ "type" : "hf_list",
46
+ "repo" : "open-web-math/open-web-math",
47
+ "prefix" : "data/train-",
48
+ "skip" : 0,
49
+ "take" : 6,
50
+ "text_col": "text",
51
+ },
52
+ {
53
+ "name" : "code",
54
+ "type" : "url_list",
55
+ "text_col": "text",
56
+ "fmt" : "jsonl",
57
+ "urls" : [
58
+ f"https://huggingface.co/buckets/Neon-tech/Dataset-arranger/resolve/by-language/{lang}/shard_{str(i).zfill(6)}.jsonl?download=true"
59
+ for lang in ["C", "C++", "Java", "Go", "Rust", "Ruby", "PHP", "SQL", "C#", "Scala", "Lua", "Perl", "CSS"]
60
+ for i in range(2)
61
+ ],
62
+ },
63
  ]
64
 
65
+ # ── Keep-alive ────────────────────────────────────────────────────────────────
66
def serve():
    """Minimal keep-alive HTTP responder on port 7860 (Space health check).

    Accepts each connection, answers with a fixed 200 "OK" body, and closes.
    Runs forever; intended to be started on a daemon thread.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(("0.0.0.0", 7860))
    s.listen(5)
    print("✓ Listening on port 7860")
    while True:
        conn, _ = s.accept()
        try:
            # sendall: plain send() may write fewer bytes than given.
            conn.sendall(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK")
        except OSError:
            # A client that disconnects mid-write must not kill the
            # keep-alive thread; the original loop would have crashed here.
            pass
        finally:
            conn.close()
76
+
77
+ # ── State ─────────────────────────────────────────────────────────────────────
78
def load_state():
    """Load persisted pipeline state from STATE_FILE, or start fresh.

    Prints a one-line resume/fresh summary and returns the state dict of
    shape {"shards": {...}, "queue": [...]}.
    """
    if not os.path.exists(STATE_FILE):
        print("Starting fresh")
        return {"shards": {}, "queue": []}

    with open(STATE_FILE) as fh:
        state = json.load(fh)

    tally = {"done": 0, "claimed": 0, "pending": 0}
    for info in state["shards"].values():
        status = info["status"]
        if status in tally:
            tally[status] += 1
    queued = len(state.get("queue", []))
    print(
        f"Resuming — {tally['done']} done / {tally['claimed']} claimed / "
        f"{tally['pending']} buffered / {queued} queued"
    )
    return state
92
+
93
def save_state(state):
    """Atomically persist *state* as indented JSON to STATE_FILE.

    Writes to a sibling ".tmp" file first, then renames over the target so
    concurrent readers never observe a partially written file.
    """
    scratch = STATE_FILE + ".tmp"
    with open(scratch, "w") as fh:
        json.dump(state, fh, indent=2)
    os.replace(scratch, STATE_FILE)
98
+
99
+ # ── Discover ──────────────────────────────────────────────────────────────────
100
def discover_all(state):
    """Append every not-yet-known source file to state["queue"], then save.

    "hf_list" sources are enumerated via the HF API and a skip/take window
    of .parquet files under the configured prefix is selected; "url_list"
    sources supply explicit URLs. URLs already present in state["shards"]
    or the queue are skipped.
    """
    # An older on-disk state.json (written before the queue existed) may
    # lack the key; create it once so the appends below cannot KeyError.
    queue = state.setdefault("queue", [])
    known_urls = {v["url"] for v in state["shards"].values()} | {e["url"] for e in queue}
    new_count = 0

    for src in SOURCES:
        name = src["name"]
        print(f"\nDiscovering: {name}")

        if src["type"] == "hf_list":
            all_files = sorted(
                f for f in api.list_repo_files(src["repo"], repo_type="dataset")
                if f.startswith(src["prefix"]) and f.endswith(".parquet")
            )
            selected = all_files[src["skip"]: src["skip"] + src["take"]]
            base_url = f"https://huggingface.co/datasets/{src['repo']}/resolve/main/"
            urls = [base_url + f for f in selected]
            fmt = "parquet"
        else:
            urls = src["urls"]
            fmt = src.get("fmt", "parquet")

        added = 0
        for url in urls:
            if url in known_urls:
                continue
            queue.append({
                "url"     : url,
                "source"  : name,
                "text_col": src["text_col"],
                "fmt"     : fmt,
            })
            known_urls.add(url)
            new_count += 1
            added += 1

        print(f" {name}: {len(urls)} files | {added} new added to queue")

    save_state(state)
    print(f"\nTotal queued: {len(state['queue'])} | In state: {len(state['shards'])}")
138
+
139
+ # ── Reclaim stale ─────────────────────────────────────────────────────────────
140
def reclaim_stale(state):
    """Return shards claimed more than WORKER_TIMEOUT seconds ago to pending.

    A stale claim (worker presumed dead) has its status reset and its
    worker/claimed_at cleared; state is saved only if something changed.
    """
    now = time.time()
    stale = [
        (name, info)
        for name, info in state["shards"].items()
        if info["status"] == "claimed"
        and info.get("claimed_at")
        and now - info["claimed_at"] > WORKER_TIMEOUT
    ]
    for name, info in stale:
        print(f" ⚠ Reclaiming: {name}")
        info["status"] = "pending"
        info["worker"] = None
        info["claimed_at"] = None
    if stale:
        save_state(state)
153
+
154
+ # ── Parquet β†’ JSONL ────────────────────────────────────��──────────────────────
155
def parquet_to_jsonl(parquet_path, jsonl_path, text_col):
    """Stream a parquet file into JSONL, one {"text": ...} object per doc.

    Reads *text_col* in 1,000-row batches (the full file is never loaded),
    keeps only non-empty string values (stripped), and returns the number
    of lines written.
    """
    reader = pq.ParquetFile(parquet_path)
    count = 0
    with open(jsonl_path, "w", encoding="utf-8") as sink:
        for batch in reader.iter_batches(batch_size=1_000, columns=[text_col]):
            values = batch.column(text_col).to_pylist()
            for value in values:
                if isinstance(value, str) and value.strip():
                    sink.write(json.dumps({"text": value.strip()}, ensure_ascii=False) + "\n")
                    count += 1
            # Drop the batch's Python objects eagerly to cap peak memory.
            del values
            gc.collect()
    return count
169
+
170
+ # ── Download loop ─────────────────────────────────────────────────────────────
171
def download_loop(state):
    """Producer loop: pull URLs off the queue, download, convert, publish.

    Runs until every known shard is "done" and the queue is empty. Each
    iteration: re-sync state from disk, reclaim stale worker claims,
    honor the buffer cap, then process the head-of-queue entry. On any
    download/convert failure the entry stays at the head of the queue and
    is retried after 30s.
    """
    while True:
        # Re-read state from disk so status changes made by worker
        # processes become visible here; best-effort — keep the in-memory
        # copy if the file is missing or mid-write.
        try:
            with open(STATE_FILE) as f:
                fresh = json.load(f)
            state["shards"] = fresh["shards"]
            state["queue"] = fresh.get("queue", [])
        except Exception:
            pass

        reclaim_stale(state)

        # Back-pressure: pause downloading while too many converted shards
        # are waiting for workers.
        buffered = sum(1 for v in state["shards"].values() if v["status"] == "pending")
        if buffered >= MAX_BUFFERED:
            time.sleep(30)
            continue

        if not state["queue"]:
            done = sum(1 for v in state["shards"].values() if v["status"] == "done")
            total = len(state["shards"])
            if done == total and total > 0:
                print("✓ All shards complete!")
                break
            print(" Queue empty — sleeping...")
            time.sleep(60)
            continue

        # Peek (don't pop) the head entry — it is only removed after the
        # shard has been fully published below.
        entry = state["queue"][0]
        url = entry["url"]
        source = entry["source"]
        text_col = entry["text_col"]
        fmt = entry.get("fmt", "parquet")

        # NOTE(review): "lang" is just the last path directory before the
        # file name — meaningful for the by-language code URLs, arbitrary
        # for the parquet sources; it only disambiguates the shard name.
        lang = url.split("?")[0].split("/")[-2]
        base_name = url.split("?")[0].split("/")[-1].replace(".parquet", "").replace(".jsonl", "")
        shard_name = f"{source}__{base_name}_{lang}.jsonl"
        jsonl_path = Path(RAW_DIR) / shard_name
        tmp_path = Path(RAW_DIR) / f"{shard_name}.tmp"

        print(f" Downloading: {source} | {base_name}")
        try:
            resp = requests.get(url, headers=AUTH_HEADERS, timeout=300, stream=True)
            resp.raise_for_status()
            with open(tmp_path, "wb") as f:
                for chunk in resp.iter_content(chunk_size=8 * 1024 * 1024):
                    f.write(chunk)
        except Exception as e:
            # Entry stays at the head of the queue, so it is retried.
            print(f" ✗ Download failed: {e} — retrying in 30s")
            tmp_path.unlink(missing_ok=True)
            time.sleep(30)
            continue

        if fmt == "parquet":
            print(f" Converting → jsonl: {shard_name}")
            try:
                n = parquet_to_jsonl(tmp_path, jsonl_path, text_col)
                tmp_path.unlink(missing_ok=True)
                print(f" ✓ {n:,} docs")
            except Exception as e:
                # Remove both the partial output and the download, then retry.
                print(f" ✗ Convert failed: {e}")
                tmp_path.unlink(missing_ok=True)
                jsonl_path.unlink(missing_ok=True)
                time.sleep(30)
                continue
        else:
            # Already JSONL — just move it into place.
            tmp_path.rename(jsonl_path)

        # Publish: drop from queue, register as a pending shard for workers.
        state["queue"].pop(0)
        state["shards"][shard_name] = {
            "status" : "pending",
            "url" : url,
            "source" : source,
            "worker" : None,
            "claimed_at": None,
            "error" : None,
        }
        save_state(state)
        print(f" ✓ Ready: {shard_name}")
        time.sleep(3)
250
+
251
+ # ── Monitor ───────────────────────────────────────────────────────────────────
252
def monitor_loop():
    """Every 120s, print overall progress and per-source completion counts.

    Any failure (e.g. the state file missing or mid-write) is silently
    skipped; the next tick simply retries. Intended for a daemon thread.
    """
    while True:
        time.sleep(120)
        try:
            with open(STATE_FILE) as fh:
                snapshot = json.load(fh)
            shards = snapshot["shards"]
            queue = snapshot.get("queue", [])

            tally = {"done": 0, "claimed": 0, "pending": 0}
            per_source = {}
            for info in shards.values():
                status = info["status"]
                if status in tally:
                    tally[status] += 1
                if status == "done":
                    src = info.get("source", "?")
                    per_source[src] = per_source.get(src, 0) + 1

            total = len(shards) + len(queue)
            pct = (tally["done"] / total * 100) if total else 0
            print(f"[MONITOR] {tally['done']}/{total} ({pct:.1f}%) | {tally['claimed']} active | {tally['pending']} buffered | {len(queue)} queued")
            for src, cnt in sorted(per_source.items()):
                print(f" {src}: {cnt} done")
        except Exception:
            pass
277
 
278
# ── Entry point ───────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Start the health-check server first so the Space is marked alive
    # immediately, before discovery (which can take a while) runs.
    threading.Thread(target=serve, daemon=True).start()
    state = load_state()
    discover_all(state)
    threading.Thread(target=monitor_loop, daemon=True).start()
    threading.Thread(target=download_loop, args=(state,), daemon=True).start()
    # All workers are daemon threads; park the main thread forever so the
    # process (and hence the daemons) stays up.
    while True:
        time.sleep(60)