Neon-tech committed
Commit e7fd235 Β· verified Β· 1 Parent(s): c5e0d0e

Update app.py
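Replaces the multi-source shard pipeline (fineweb, wikipedia, openwebmath, code) with a one-off converter: download the four open-phi/programming_books_llama parquet shards, concatenate each row's topic/outline/queries/context/markdown fields into one JSONL document per entry, and register the output shard in /data/state.json so the existing workers pick it up.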

Files changed (1)
  1. app.py +118 -264
app.py CHANGED
@@ -1,286 +1,140 @@
  import os
  import json
- import time
- import socket
- import threading
  import requests
  import pyarrow.parquet as pq
  import gc
  from pathlib import Path
- from huggingface_hub import HfApi

  # ── Config ───────────────────────────────────────────────────────────────────
- HF_TOKEN = os.environ.get("HF_TOKEN")
- RAW_DIR = "/data/raw"
- STATE_FILE = "/data/state.json"
- WORKER_TIMEOUT = 700
- MAX_BUFFERED = 999999

- os.makedirs(RAW_DIR, exist_ok=True)
- api = HfApi(token=HF_TOKEN)
  AUTH_HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"}

- # ── Sources ───────────────────────────────────────────────────────────────────
- SOURCES = [
-     {
-         "name"    : "fineweb",
-         "type"    : "hf_list",
-         "repo"    : "HuggingFaceFW/fineweb-edu",
-         "prefix"  : "data/CC-MAIN-2025-26",
-         "skip"    : 5,
-         "take"    : 10,
-         "text_col": "text",
-     },
-     {
-         "name"    : "wikipedia",
-         "type"    : "hf_list",
-         "repo"    : "wikimedia/wikipedia",
-         "prefix"  : "20231101.en/train-",
-         "skip"    : 2,
-         "take"    : 18,
-         "text_col": "text",
-     },
-     {
-         "name"    : "openwebmath",
-         "type"    : "hf_list",
-         "repo"    : "open-web-math/open-web-math",
-         "prefix"  : "data/train-",
-         "skip"    : 0,
-         "take"    : 6,
-         "text_col": "text",
-     },
-     {
-         "name"    : "code",
-         "type"    : "url_list",
-         "text_col": "text",
-         "fmt"     : "jsonl",
-         "urls"    : [
-             f"https://huggingface.co/buckets/Neon-tech/Dataset-arranger/resolve/by-language/{lang}/shard_{str(i).zfill(6)}.jsonl?download=true"
-             for lang in ["C", "C++", "Java", "Go", "Rust", "Ruby", "PHP", "SQL", "C#", "Scala", "Lua", "Perl"]
-             for i in range(2)
-         ],
-     },
  ]

- # ── Keep-alive ────────────────────────────────────────────────────────────────
- def serve():
-     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-     s.bind(("0.0.0.0", 7860))
-     s.listen(5)
-     print("βœ“ Listening on port 7860")
-     while True:
-         conn, _ = s.accept()
-         conn.send(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK")
-         conn.close()
-
- # ── State ─────────────────────────────────────────────────────────────────────
- def load_state():
-     if os.path.exists(STATE_FILE):
-         with open(STATE_FILE) as f:
-             state = json.load(f)
-         shards = state["shards"]
-         queue = state.get("queue", [])
-         done = sum(1 for v in shards.values() if v["status"] == "done")
-         claimed = sum(1 for v in shards.values() if v["status"] == "claimed")
-         pending = sum(1 for v in shards.values() if v["status"] == "pending")
-         print(f"Resuming β€” {done} done / {claimed} claimed / {pending} buffered / {len(queue)} queued")
      else:
-         state = {"shards": {}, "queue": []}
-         print("Starting fresh")
-     return state
-
- def save_state(state):
-     tmp = STATE_FILE + ".tmp"
-     with open(tmp, "w") as f:
-         json.dump(state, f, indent=2)
-     os.replace(tmp, STATE_FILE)
-
- # ── Discover ──────────────────────────────────────────────────────────────────
- def discover_all(state):
-     known_urls = {v["url"] for v in state["shards"].values()} | {e["url"] for e in state.get("queue", [])}
-     new_count = 0
-
-     for src in SOURCES:
-         name = src["name"]
-         print(f"\nDiscovering: {name}")
-
-         if src["type"] == "hf_list":
-             all_files = sorted([
-                 f for f in api.list_repo_files(src["repo"], repo_type="dataset")
-                 if f.startswith(src["prefix"]) and f.endswith(".parquet")
-             ])
-             selected = all_files[src["skip"]: src["skip"] + src["take"]]
-             base_url = f"https://huggingface.co/datasets/{src['repo']}/resolve/main/"
-             urls = [base_url + f for f in selected]
-             fmt = "parquet"
-         else:
-             urls = src["urls"]
-             fmt = src.get("fmt", "parquet")
-
-         added = 0
-         for url in urls:
-             if url not in known_urls:
-                 state["queue"].append({
-                     "url"      : url,
-                     "source"   : name,
-                     "text_col" : src["text_col"],
-                     "fmt"      : fmt,
-                 })
-                 known_urls.add(url)
-                 new_count += 1
-                 added += 1
-
-         print(f" {name}: {len(urls)} files | {added} new added to queue")
-
-     save_state(state)
-     print(f"\nTotal queued: {len(state['queue'])} | In state: {len(state['shards'])}")
-
- # ── Reclaim stale ─────────────────────────────────────────────────────────────
- def reclaim_stale(state):
-     now = time.time()
-     reclaimed = 0
-     for name, info in state["shards"].items():
-         if info["status"] == "claimed" and info.get("claimed_at"):
-             if now - info["claimed_at"] > WORKER_TIMEOUT:
-                 print(f" ⚠ Reclaiming: {name}")
-                 info["status"] = "pending"
-                 info["worker"] = None
-                 info["claimed_at"] = None
-                 reclaimed += 1
-     if reclaimed:
-         save_state(state)
-
- # ── Parquet β†’ JSONL ───────────────────────────────────────────────────────────
- def parquet_to_jsonl(parquet_path, jsonl_path, text_col):
-     """Stream parquet batch by batch β†’ write one JSON line per doc. No full load."""
-     pf = pq.ParquetFile(parquet_path)
-     n_written = 0
-     with open(jsonl_path, "w", encoding="utf-8") as out:
-         for batch in pf.iter_batches(batch_size=1_000, columns=[text_col]):
-             texts = batch.column(text_col).to_pylist()
-             for text in texts:
-                 if text and isinstance(text, str) and text.strip():
-                     out.write(json.dumps({"text": text.strip()}, ensure_ascii=False) + "\n")
                      n_written += 1
-             del texts
-             gc.collect()
-     return n_written
-
- # ── Download loop ─────────────────────────────────────────────────────────────
- def download_loop(state):
-     while True:
-         try:
-             with open(STATE_FILE) as f:
-                 fresh = json.load(f)
-             state["shards"] = fresh["shards"]
-             state["queue"] = fresh.get("queue", [])
-         except Exception:
-             pass
-
-         reclaim_stale(state)
-
-         buffered = sum(1 for v in state["shards"].values() if v["status"] == "pending")
-         if buffered >= MAX_BUFFERED:
-             time.sleep(30)
-             continue
-
-         if not state["queue"]:
-             done = sum(1 for v in state["shards"].values() if v["status"] == "done")
-             total = len(state["shards"])
-             if done == total and total > 0:
-                 print("βœ“ All shards complete!")
-                 break
-             print(" Queue empty β€” sleeping...")
-             time.sleep(60)
-             continue
-
-         entry = state["queue"][0]
-         url = entry["url"]
-         source = entry["source"]
-         text_col = entry["text_col"]
-         fmt = entry.get("fmt", "parquet")

-         lang = url.split("?")[0].split("/")[-2]
-         base_name = url.split("?")[0].split("/")[-1].replace(".parquet", "").replace(".jsonl", "")
-         shard_name = f"{source}__{base_name}_{lang}.jsonl"
-         jsonl_path = Path(RAW_DIR) / shard_name
-         tmp_path = Path(RAW_DIR) / f"{shard_name}.tmp"

-         print(f" Downloading: {source} | {base_name}")
-         try:
-             resp = requests.get(url, headers=AUTH_HEADERS, timeout=300, stream=True)
-             resp.raise_for_status()
-             with open(tmp_path, "wb") as f:
-                 for chunk in resp.iter_content(chunk_size=8 * 1024 * 1024):
-                     f.write(chunk)
-         except Exception as e:
-             print(f" βœ— Download failed: {e} β€” retrying in 30s")
              tmp_path.unlink(missing_ok=True)
-             time.sleep(30)
-             continue

-         if fmt == "parquet":
-             print(f" Converting β†’ jsonl: {shard_name}")
-             try:
-                 n = parquet_to_jsonl(tmp_path, jsonl_path, text_col)
-                 tmp_path.unlink(missing_ok=True)
-                 print(f" βœ“ {n:,} docs")
-             except Exception as e:
-                 print(f" βœ— Convert failed: {e}")
-                 tmp_path.unlink(missing_ok=True)
-                 jsonl_path.unlink(missing_ok=True)
-                 time.sleep(30)
-                 continue
          else:
-             tmp_path.rename(jsonl_path)
-
-         state["queue"].pop(0)
-         state["shards"][shard_name] = {
-             "status"    : "pending",
-             "url"       : url,
-             "source"    : source,
-             "worker"    : None,
-             "claimed_at": None,
-             "error"     : None,
-         }
-         save_state(state)
-         print(f" βœ“ Ready: {shard_name}")
-         time.sleep(3)
-
- # ── Monitor ───────────────────────────────────────────────────────────────────
- def monitor_loop():
-     while True:
-         time.sleep(120)
-         try:
-             with open(STATE_FILE) as f:
-                 s = json.load(f)
-             shards = s["shards"]
-             queue = s.get("queue", [])
-             done = sum(1 for v in shards.values() if v["status"] == "done")
-             claimed = sum(1 for v in shards.values() if v["status"] == "claimed")
-             pending = sum(1 for v in shards.values() if v["status"] == "pending")
-             total = len(shards) + len(queue)
-             pct = (done / total * 100) if total else 0
-
-             src_done = {}
-             for v in shards.values():
-                 src = v.get("source", "?")
-                 if v["status"] == "done":
-                     src_done[src] = src_done.get(src, 0) + 1
-
-             print(f"[MONITOR] {done}/{total} ({pct:.1f}%) | {claimed} active | {pending} buffered | {len(queue)} queued")
-             for src, cnt in sorted(src_done.items()):
-                 print(f" {src}: {cnt} done")
-         except Exception:
-             pass

- # ── Entry point ───────────────────────────────────────────────────────────────
  if __name__ == "__main__":
-     threading.Thread(target=serve, daemon=True).start()
-     state = load_state()
-     discover_all(state)
-     threading.Thread(target=monitor_loop, daemon=True).start()
-     threading.Thread(target=download_loop, args=(state,), daemon=True).start()
-     while True:
-         time.sleep(60)

  import os
  import json
  import requests
  import pyarrow.parquet as pq
  import gc
  from pathlib import Path

  # ── Config ───────────────────────────────────────────────────────────────────
+ HF_TOKEN = os.environ.get("HF_TOKEN")
+ OUT_DIR = "/data/raw"
+ OUT_FILE = "/data/raw/phi__programming_books.jsonl"

+ os.makedirs(OUT_DIR, exist_ok=True)
  AUTH_HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"}

+ PHI_URLS = [
+     "https://huggingface.co/datasets/open-phi/programming_books_llama/resolve/main/data/train-00000-of-00004-ea05c5cb63b570a8.parquet?download=true",
+     "https://huggingface.co/datasets/open-phi/programming_books_llama/resolve/main/data/train-00001-of-00004-d99cbe052bab0d4e.parquet?download=true",
+     "https://huggingface.co/datasets/open-phi/programming_books_llama/resolve/main/data/train-00002-of-00004-2c25f0e11d537eaf.parquet?download=true",
+     "https://huggingface.co/datasets/open-phi/programming_books_llama/resolve/main/data/train-00003-of-00004-faa8dbb07e5f02e8.parquet?download=true",
  ]

+ # Fields to extract in order (model excluded)
+ FIELDS = ["topic", "outline", "queries", "context", "markdown"]
+
+ # ── Helpers ───────────────────────────────────────────────────────────────────
+ def download_file(url, path):
+     r = requests.get(url, headers=AUTH_HEADERS, timeout=300, stream=True)
+     r.raise_for_status()
+     with open(path, "wb") as f:
+         for chunk in r.iter_content(chunk_size=8 * 1024 * 1024):
+             f.write(chunk)
+     print(f" Downloaded: {os.path.getsize(path)/1e6:.0f} MB")
+
+ def field_to_text(field_name, value):
+     """Convert a field value to clean text."""
+     if not value:
+         return ""
+     if isinstance(value, str):
+         return value.strip()
+     if isinstance(value, list):
+         # Join list items with newline
+         return "\n".join(str(item).strip() for item in value if item)
+     return str(value).strip()
+
+ def entry_to_text(row):
+     """Concatenate all useful fields into one coherent document."""
+     parts = []
+     for field in FIELDS:
+         val = row.get(field, None)
+         text = field_to_text(field, val)
+         if text:
+             parts.append(text)
+     return "\n\n".join(parts)
+
+ # ── Main ──────────────────────────────────────────────────────────────────────
+ def process_phi():
+     tmp_path = Path(OUT_DIR) / "phi_tmp.parquet"
+     n_written = 0
+     n_skipped = 0
+
+     # Resume β€” if output already exists, count lines
+     if Path(OUT_FILE).exists():
+         with open(OUT_FILE) as f:
+             existing = sum(1 for _ in f)
+         print(f"Resuming β€” {existing:,} entries already written")
      else:
+         existing = 0
+
+     entry_idx = 0
+
+     with open(OUT_FILE, "a", encoding="utf-8") as fout:
+         for url in PHI_URLS:
+             fname = url.split("?")[0].split("/")[-1]
+             print(f"\nDownloading: {fname}")
+             download_file(url, tmp_path)
+
+             pf = pq.ParquetFile(tmp_path)
+             # Discover available columns
+             available = pf.schema_arrow.names
+             use_cols = [f for f in FIELDS if f in available]
+             print(f" Available fields: {available}")
+             print(f" Using: {use_cols}")
+
+             for batch in pf.iter_batches(batch_size=500, columns=use_cols):
+                 rows = batch.to_pydict()
+                 n = len(batch)
+                 for i in range(n):
+                     entry_idx += 1
+                     if entry_idx <= existing:
+                         # Skip already written. Note: resume counts written lines,
+                         # so if a prior interrupted run quality-skipped entries,
+                         # a few docs near the resume point may be re-written.
+                         continue
+
+                     row = {col: rows[col][i] for col in use_cols}
+                     text = entry_to_text(row)
+
+                     if not text or len(text.strip()) < 50:
+                         n_skipped += 1
+                         continue
+
+                     fout.write(json.dumps({"text": text}, ensure_ascii=False) + "\n")
                      n_written += 1
+
+                 del rows; gc.collect()
+
              tmp_path.unlink(missing_ok=True)
+             print(f" βœ“ {fname} done | written so far: {n_written + existing:,}")
+
+     total = n_written + existing
+     print("\nβœ“ Phi processing complete")
+     print(f" Total entries : {total:,}")
+     print(f" Skipped       : {n_skipped:,}")
+     print(f" Output        : {OUT_FILE}")
+     print(f" Size          : {os.path.getsize(OUT_FILE)/1e6:.0f} MB")
+
+     # Add to state.json so workers pick it up
+     state_file = "/data/state.json"
+     if os.path.exists(state_file):
+         with open(state_file) as f:
+             state = json.load(f)
+
+         shard_name = "phi__programming_books.jsonl"
+         if shard_name not in state["shards"]:
+             state["shards"][shard_name] = {
+                 "status"    : "pending",
+                 "url"       : PHI_URLS[0],  # reference only
+                 "source"    : "phi",
+                 "worker"    : None,
+                 "claimed_at": None,
+                 "error"     : None,
+             }
+             tmp = state_file + ".tmp"
+             with open(tmp, "w") as f:
+                 json.dump(state, f, indent=2)
+             os.replace(tmp, state_file)
+             print(" βœ“ Added to state.json β€” workers will pick up automatically")
          else:
+             print(" Already in state.json")

  if __name__ == "__main__":
+     process_phi()
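
For reference, after a successful run the script leaves an entry of this shape in /data/state.json (keys exactly as written by process_phi(); the url is the first PHI_URLS entry, stored for reference only; other shards omitted):

{
  "shards": {
    "phi__programming_books.jsonl": {
      "status"    : "pending",
      "url"       : "https://huggingface.co/datasets/open-phi/programming_books_llama/resolve/main/data/train-00000-of-00004-ea05c5cb63b570a8.parquet?download=true",
      "source"    : "phi",
      "worker"    : null,
      "claimed_at": null,
      "error"     : null
    }
  }
}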
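
The worker side is not part of this commit. As a sketch only, a claim step consistent with the state fields above could look like this (claim_next_shard and worker_id are hypothetical names; the tmp-then-replace write mirrors the save pattern used in app.py):

import json
import os
import time

STATE_FILE = "/data/state.json"

def claim_next_shard(worker_id):
    """Hypothetical worker helper: mark the first pending shard as claimed."""
    with open(STATE_FILE) as f:
        state = json.load(f)
    for name, info in state["shards"].items():
        if info["status"] == "pending":
            info["status"] = "claimed"
            info["worker"] = worker_id
            info["claimed_at"] = time.time()  # lets a stale-claim sweep re-queue dead workers
            tmp = STATE_FILE + ".tmp"
            with open(tmp, "w") as f:
                json.dump(state, f, indent=2)
            os.replace(tmp, STATE_FILE)  # atomic on POSIX
            return name
    return None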