broadfield-dev committed · verified
Commit 2fcce3e · 1 Parent(s): a738515

Update processor.py

Files changed (1):
  1. processor.py +194 -103
processor.py CHANGED
@@ -14,49 +14,61 @@ class DatasetCommandCenter:
         self.token = token
         self.api = HfApi(token=token)
 
-    # --- 1. METADATA & INSPECTION ---
 
     def get_dataset_metadata(self, dataset_id):
-        configs = []
-        splits = []
         license_name = "unknown"
 
-        # 1. Get Configs
         try:
-            configs = get_dataset_config_names(dataset_id, token=self.token)
-        except Exception as e:
-            logger.warning(f"Could not fetch configs: {e}")
-            configs = ['default']
-
-        # 2. Get Splits & License
-        try:
-            selected_config = configs[0] if configs else 'default'
-            infos = get_dataset_infos(dataset_id, token=self.token)
-
-            info_obj = None
-            if selected_config in infos:
-                info_obj = infos[selected_config]
-            elif 'default' in infos:
-                info_obj = infos['default']
-            elif len(infos) > 0:
-                info_obj = list(infos.values())[0]
 
-            if info_obj:
-                splits = list(info_obj.splits.keys())
-                license_name = info_obj.license or "unknown"
 
-        except Exception as e:
-            logger.warning(f"Metadata fetch fallback: {e}")
-            splits = ['train', 'test', 'validation']
 
-        return {
-            "status": "success",
-            "configs": configs if configs else ['default'],
-            "splits": splits if splits else ['train'],
-            "license_detected": license_name
-        }
 
     def get_splits_for_config(self, dataset_id, config_name):
         try:
             infos = get_dataset_infos(dataset_id, config_name=config_name, token=self.token)
             if config_name in infos:
@@ -65,112 +77,165 @@ class DatasetCommandCenter:
                 splits = list(infos.values())[0].splits.keys()
             else:
                 splits = ['train', 'test']
         except:
-            splits = ['train', 'test', 'validation']
 
-        return {"status": "success", "splits": splits}
 
     def inspect_dataset(self, dataset_id, config, split):
         try:
             conf = config if config != 'default' else None
             ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
 
             sample_rows = []
-            schema_map = {}
 
             for i, row in enumerate(ds_stream):
                 if i >= 10: break
 
-                # Create clean sample for UI
                 clean_row = {}
                 for k, v in row.items():
-                    # Convert objects to strings for display safety
                     if not isinstance(v, (str, int, float, bool, list, dict, type(None))):
                         clean_row[k] = str(v)
                     else:
                         clean_row[k] = v
                 sample_rows.append(clean_row)
 
-                # Analyze Schema
                 for k, v in row.items():
-                    if k not in schema_map:
-                        schema_map[k] = {"is_list": False, "keys": set()}
 
                     val = v
-                    # Check for JSON string
                     if isinstance(val, str):
-                        try:
-                            val = json.loads(val)
                         except: pass
 
-                    if isinstance(val, list):
-                        schema_map[k]["is_list"] = True
-                        if len(val) > 0 and isinstance(val[0], dict):
-                            schema_map[k]["keys"].update(val[0].keys())
-                    elif isinstance(val, dict):
-                        schema_map[k]["keys"].update(val.keys())
-
-            formatted_schema = {}
-            for k, info in schema_map.items():
-                formatted_schema[k] = {
-                    "type": "List" if info["is_list"] else "Object",
-                    "keys": list(info["keys"])
-                }
 
             return {
                 "status": "success",
                 "samples": sample_rows,
-                "schema": formatted_schema,
                 "dataset_id": dataset_id
             }
         except Exception as e:
             return {"status": "error", "message": str(e)}
 
-    # --- 2. EXTRACTION LOGIC ---
 
     def _get_value_by_path(self, obj, path):
         if not path: return obj
         keys = path.split('.')
         current = obj
 
         for key in keys:
             if isinstance(current, str):
                 s = current.strip()
                 if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
                     try:
                         current = json.loads(s)
-                    except: pass
 
             if isinstance(current, dict) and key in current:
                 current = current[key]
             else:
-                return None
         return current
 
     def _extract_from_list_logic(self, row, source_col, filter_key, filter_val, target_path):
         data = row.get(source_col)
         if isinstance(data, str):
             try:
                 data = json.loads(data)
-            except: return None
 
         if not isinstance(data, list):
             return None
 
         matched_item = None
         for item in data:
             if str(item.get(filter_key, '')) == str(filter_val):
                 matched_item = item
                 break
 
         if matched_item:
             return self._get_value_by_path(matched_item, target_path)
         return None
 
     def _apply_projection(self, row, recipe):
         new_row = {}
 
-        # Setup Context for Python/Eval
         eval_context = row.copy()
         eval_context['row'] = row
         eval_context['json'] = json
@@ -194,21 +259,25 @@ class DatasetCommandCenter:
                     )
 
                 elif t_type == 'python':
                     expression = col_def['expression']
                     val = eval(expression, {}, eval_context)
                     new_row[target_col] = val
 
             except Exception as e:
-                # Fail Fast: Raise error to stop the generator
                 raise ValueError(f"Column '{target_col}' failed: {str(e)}")
 
         return new_row
 
-    # --- 3. DOCUMENTATION (CARD) ---
 
     def _generate_card(self, source_id, target_id, recipe, license_name):
-        logger.info(f"Generating card for {target_id} with license {license_name}")
-
         card_data = DatasetCardData(
             language="en",
             license=license_name,
@@ -226,20 +295,22 @@ It was generated using the **Hugging Face Dataset Command Center**.
 
 The following operations were applied to the source data:
 
-| Target Column | Source | Type | Logic / Filter |
-|---------------|--------|------|----------------|
 """
         for col in recipe['columns']:
             c_type = col.get('type', 'simple')
             c_name = col['name']
-            c_src = col.get('source', '-')
 
             logic = "-"
-            if c_type == 'simple': logic = "Direct Mapping"
-            elif c_type == 'list_search': logic = f"Get `{col['target_key']}` where `{col['filter_key']} == {col['filter_val']}`"
-            elif c_type == 'python': logic = f"`{col.get('expression')}`"
 
-            content += f"| **{c_name}** | `{c_src}` | {c_type} | {logic} |\n"
 
         if recipe.get('filter_rule'):
             content += f"\n### Row Filtering\n**Filter Applied:** `{recipe['filter_rule']}`\n"
@@ -249,7 +320,9 @@ The following operations were applied to the source data:
         card = DatasetCard.from_template(card_data, content=content)
         return card
 
-    # --- 4. EXECUTION ---
 
     def process_and_push(self, source_id, config, split, target_id, recipe, max_rows=None, new_license=None):
         logger.info(f"Job started: {source_id} -> {target_id}")
@@ -259,39 +332,42 @@ The following operations were applied to the source data:
             ds_stream = load_dataset(source_id, name=conf, split=split, streaming=True, token=self.token)
             count = 0
             for i, row in enumerate(ds_stream):
-                if max_rows and count >= int(max_rows): break
 
-                # Filter
                 if recipe.get('filter_rule'):
                     try:
                         ctx = row.copy()
                         ctx['row'] = row
                         if not eval(recipe['filter_rule'], {}, ctx):
                             continue
                     except Exception as e:
                         raise ValueError(f"Filter crashed on row {i}: {e}")
 
-                # Projection
                 try:
                     yield self._apply_projection(row, recipe)
                     count += 1
                 except ValueError as ve:
                     raise ve
                 except Exception as e:
-                    raise ValueError(f"Crash on row {i}: {e}")
 
         try:
-            # 1. Push Data
            new_dataset = datasets.Dataset.from_generator(gen)
            new_dataset.push_to_hub(target_id, token=self.token)
 
-            # 2. Push Card
            try:
                card = self._generate_card(source_id, target_id, recipe, new_license or "unknown")
                card.push_to_hub(target_id, token=self.token)
            except Exception as e:
                logger.error(f"Failed to push Dataset Card: {e}")
-                # We do NOT fail the whole job, but we log it.
 
            return {"status": "success", "rows_processed": len(new_dataset)}
 
@@ -299,27 +375,42 @@ The following operations were applied to the source data:
             logger.error(f"Job Failed: {e}")
             return {"status": "failed", "error": str(e)}
 
-    # --- 5. PREVIEW ---
 
     def preview_transform(self, dataset_id, config, split, recipe):
         conf = config if config != 'default' else None
-        ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
-        processed = []
-        for row in ds_stream:
-            if len(processed) >= 5: break
-
-            # Filter
-            passed = True
-            if recipe.get('filter_rule'):
-                try:
-                    ctx = row.copy()
-                    ctx['row'] = row
-                    if not eval(recipe['filter_rule'], {}, ctx): passed = False
-                except: passed = False
 
-            if passed:
-                try:
-                    processed.append(self._apply_projection(row, recipe))
-                except Exception as e:
-                    processed.append({"error": str(e)}) # Show error in preview
-
-        return processed
@@ -14,49 +14,61 @@ class DatasetCommandCenter:
         self.token = token
         self.api = HfApi(token=token)
 
+    # ==========================================
+    # 1. METADATA & INSPECTION
+    # ==========================================
 
     def get_dataset_metadata(self, dataset_id):
+        """
+        Fetches available Configs (subsets), Splits, and License info
+        without downloading the actual data rows.
+        """
+        configs = ['default']
+        splits = ['train', 'test', 'validation']
         license_name = "unknown"
 
         try:
+            # 1. Fetch Configs
+            try:
+                found_configs = get_dataset_config_names(dataset_id, token=self.token)
+                if found_configs:
+                    configs = found_configs
+            except Exception:
+                pass # Keep default
 
+            # 2. Fetch Metadata (Splits & License)
+            try:
+                selected = configs[0]
+                # This API call can fail on some datasets, so we wrap it safely
+                infos = get_dataset_infos(dataset_id, token=self.token)
 
+                info = None
+                if selected in infos:
+                    info = infos[selected]
+                elif 'default' in infos:
+                    info = infos['default']
+                elif infos:
+                    info = list(infos.values())[0]
+
+                if info:
+                    splits = list(info.splits.keys())
+                    license_name = info.license or "unknown"
+            except Exception:
+                pass # Keep defaults if metadata fails
 
+            return {
+                "status": "success",
+                "configs": configs,
+                "splits": splits,
+                "license_detected": license_name
+            }
+        except Exception as e:
+            return {"status": "error", "message": str(e)}
 
     def get_splits_for_config(self, dataset_id, config_name):
+        """
+        Updates the Split dropdown when the user changes the Config.
+        """
         try:
             infos = get_dataset_infos(dataset_id, config_name=config_name, token=self.token)
             if config_name in infos:
@@ -65,112 +77,165 @@ class DatasetCommandCenter:
                 splits = list(infos.values())[0].splits.keys()
             else:
                 splits = ['train', 'test']
+            return {"status": "success", "splits": splits}
         except:
+            return {"status": "success", "splits": ['train', 'test', 'validation']}
+
+    def _flatten_object(self, obj, parent_key='', sep='.'):
+        """
+        Recursively finds all keys in nested dicts or JSON strings
+        to populate the 'Simple Path' dropdown in the UI.
+        """
+        items = {}
+
+        # Transparently parse JSON strings
+        if isinstance(obj, str):
+            s = obj.strip()
+            if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
+                try:
+                    obj = json.loads(s)
+                except:
+                    pass # Keep as string if parse fails
+
+        if isinstance(obj, dict):
+            for k, v in obj.items():
+                new_key = f"{parent_key}{sep}{k}" if parent_key else k
+                items.update(self._flatten_object(v, new_key, sep=sep))
+        elif isinstance(obj, list):
+            # We mark lists but do not recurse infinitely
+            new_key = f"{parent_key}" if parent_key else "list_content"
+            items[new_key] = "List"
+        else:
+            # Leaf node
+            items[parent_key] = type(obj).__name__
 
+        return items
 
     def inspect_dataset(self, dataset_id, config, split):
+        """
+        Scans the first 10 rows to build a Schema Tree for the UI.
+        """
         try:
             conf = config if config != 'default' else None
             ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
 
             sample_rows = []
+            available_paths = set()
+            schema_map = {} # Used for List Mode detection
 
             for i, row in enumerate(ds_stream):
                 if i >= 10: break
 
+                # 1. Clean row for UI Preview (convert objects to strings)
                 clean_row = {}
                 for k, v in row.items():
                     if not isinstance(v, (str, int, float, bool, list, dict, type(None))):
                         clean_row[k] = str(v)
                     else:
                         clean_row[k] = v
                 sample_rows.append(clean_row)
 
+                # 2. Deep Flattening for "Simple Path" dropdowns
+                flattened = self._flatten_object(row)
+                available_paths.update(flattened.keys())
+
+                # 3. Top Level Analysis for "List Mode" detection
                 for k, v in row.items():
+                    if k not in schema_map:
+                        schema_map[k] = {"type": "Object"}
 
                     val = v
                     if isinstance(val, str):
+                        try: val = json.loads(val)
                         except: pass
 
+                    if isinstance(val, list):
+                        schema_map[k]["type"] = "List"
+
+            # Reconstruct Schema Tree for UI grouping
+            sorted_paths = sorted(list(available_paths))
+            schema_tree = {}
+            for path in sorted_paths:
+                root = path.split('.')[0]
+                if root not in schema_tree:
+                    schema_tree[root] = []
+                schema_tree[root].append(path)
 
             return {
                 "status": "success",
                 "samples": sample_rows,
+                "schema_tree": schema_tree, # Used by Simple Path Dropdown
+                "schema": schema_map, # Used by List Mode Dropdown
                 "dataset_id": dataset_id
             }
         except Exception as e:
             return {"status": "error", "message": str(e)}
 
+    # ==========================================
+    # 2. CORE EXTRACTION LOGIC
+    # ==========================================
 
     def _get_value_by_path(self, obj, path):
+        """
+        Navigates dot notation (meta.user.id), automatically parsing
+        JSON strings if encountered along the path.
+        """
         if not path: return obj
         keys = path.split('.')
         current = obj
 
         for key in keys:
+            # Auto-parse JSON string if encountered
             if isinstance(current, str):
                 s = current.strip()
                 if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
                     try:
                         current = json.loads(s)
+                    except:
+                        pass
 
             if isinstance(current, dict) and key in current:
                 current = current[key]
             else:
+                return None # Path broken
         return current
 
     def _extract_from_list_logic(self, row, source_col, filter_key, filter_val, target_path):
+        """
+        Logic for: FROM source_col FIND ITEM WHERE filter_key == filter_val EXTRACT target_path
+        """
         data = row.get(source_col)
+
+        # Parse if string
         if isinstance(data, str):
             try:
                 data = json.loads(data)
+            except:
+                return None
 
         if not isinstance(data, list):
             return None
 
         matched_item = None
         for item in data:
+            # String comparison for safety
             if str(item.get(filter_key, '')) == str(filter_val):
                 matched_item = item
                 break
 
         if matched_item:
             return self._get_value_by_path(matched_item, target_path)
+
         return None
 
     def _apply_projection(self, row, recipe):
+        """
+        Builds the new row based on the recipe.
+        Raises ValueError if user Python code fails (Fail Fast).
+        """
         new_row = {}
 
+        # Setup Eval Context (Variables available in Python Mode)
         eval_context = row.copy()
         eval_context['row'] = row
         eval_context['json'] = json
@@ -194,21 +259,25 @@ class DatasetCommandCenter:
                     )
 
                 elif t_type == 'python':
+                    # Execute user code
                     expression = col_def['expression']
                     val = eval(expression, {}, eval_context)
                     new_row[target_col] = val
 
             except Exception as e:
+                # Fail Fast: Stop the generator immediately if a column fails
                 raise ValueError(f"Column '{target_col}' failed: {str(e)}")
 
         return new_row
 
+    # ==========================================
+    # 3. DOCUMENTATION (MODEL CARD)
+    # ==========================================
 
     def _generate_card(self, source_id, target_id, recipe, license_name):
+        """
+        Creates a README.md for the new dataset.
+        """
         card_data = DatasetCardData(
             language="en",
             license=license_name,
@@ -226,20 +295,22 @@ It was generated using the **Hugging Face Dataset Command Center**.
 
 The following operations were applied to the source data:
 
+| Target Column | Operation Type | Logic |
+|---------------|----------------|-------|
 """
         for col in recipe['columns']:
             c_type = col.get('type', 'simple')
             c_name = col['name']
 
             logic = "-"
+            if c_type == 'simple':
+                logic = f"Mapped from `{col.get('source')}`"
+            elif c_type == 'list_search':
+                logic = f"Extracted `{col['target_key']}` where `{col['filter_key']} == {col['filter_val']}`"
+            elif c_type == 'python':
+                logic = f"Python: `{col.get('expression')}`"
 
+            content += f"| **{c_name}** | {c_type} | {logic} |\n"
 
         if recipe.get('filter_rule'):
             content += f"\n### Row Filtering\n**Filter Applied:** `{recipe['filter_rule']}`\n"
@@ -249,7 +320,9 @@ The following operations were applied to the source data:
         card = DatasetCard.from_template(card_data, content=content)
         return card
 
+    # ==========================================
+    # 4. EXECUTION
+    # ==========================================
 
     def process_and_push(self, source_id, config, split, target_id, recipe, max_rows=None, new_license=None):
         logger.info(f"Job started: {source_id} -> {target_id}")
@@ -259,39 +332,42 @@ The following operations were applied to the source data:
             ds_stream = load_dataset(source_id, name=conf, split=split, streaming=True, token=self.token)
             count = 0
             for i, row in enumerate(ds_stream):
+                if max_rows and count >= int(max_rows):
+                    break
 
+                # 1. Filter
                 if recipe.get('filter_rule'):
                     try:
                         ctx = row.copy()
                         ctx['row'] = row
+                        ctx['json'] = json
+                        ctx['re'] = re
                         if not eval(recipe['filter_rule'], {}, ctx):
                             continue
                     except Exception as e:
                         raise ValueError(f"Filter crashed on row {i}: {e}")
 
+                # 2. Projection
                 try:
                     yield self._apply_projection(row, recipe)
                     count += 1
                 except ValueError as ve:
+                    # Pass the specific column error up
                    raise ve
                 except Exception as e:
+                    raise ValueError(f"Unexpected crash on row {i}: {e}")
 
         try:
+            # 1. Process & Push Data
            new_dataset = datasets.Dataset.from_generator(gen)
            new_dataset.push_to_hub(target_id, token=self.token)
 
+            # 2. Generate & Push Card
            try:
                card = self._generate_card(source_id, target_id, recipe, new_license or "unknown")
                card.push_to_hub(target_id, token=self.token)
            except Exception as e:
                logger.error(f"Failed to push Dataset Card: {e}")
 
            return {"status": "success", "rows_processed": len(new_dataset)}
 
@@ -299,27 +375,42 @@ The following operations were applied to the source data:
             logger.error(f"Job Failed: {e}")
             return {"status": "failed", "error": str(e)}
 
+    # ==========================================
+    # 5. PREVIEW
+    # ==========================================
+
     def preview_transform(self, dataset_id, config, split, recipe):
         conf = config if config != 'default' else None
+
+        try:
+            ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
+            processed = []
 
+            for i, row in enumerate(ds_stream):
+                if len(processed) >= 5: break
+
+                # Check Filter
+                passed = True
+                if recipe.get('filter_rule'):
+                    try:
+                        ctx = row.copy()
+                        ctx['row'] = row
+                        ctx['json'] = json
+                        ctx['re'] = re
+                        if not eval(recipe['filter_rule'], {}, ctx):
+                            passed = False
+                    except:
+                        passed = False # Skip invalid rows in preview
+
+                if passed:
+                    try:
+                        new_row = self._apply_projection(row, recipe)
+                        processed.append(new_row)
+                    except Exception as e:
+                        # In preview, we want to see the error, not crash
+                        processed.append({"_preview_error": f"Error: {str(e)}"})
+
+            return processed
+        except Exception as e:
+            # Return global error if loading fails
+            raise e
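
Usage sketch (not part of the commit): a minimal, hypothetical driver for the class this diff modifies. It assumes processor.py exposes DatasetCommandCenter, that the constructor takes only a Hub token (only the body of __init__ is visible above), and that recipes use the keys read by _apply_projection and _generate_card (a "columns" list with name/type plus source or expression, and an optional "filter_rule"). The dataset id, column names, and token are placeholders.

    from processor import DatasetCommandCenter  # assumed import path

    center = DatasetCommandCenter(token="hf_...")  # placeholder Hub token

    # Look up configs, splits, and the detected license without streaming any rows.
    meta = center.get_dataset_metadata("username/source-dataset")
    print(meta["configs"], meta["splits"], meta["license_detected"])

    # A small recipe exercising the 'simple' and 'python' column types.
    recipe = {
        "columns": [
            {"name": "text", "type": "simple", "source": "content"},
            {"name": "n_words", "type": "python", "expression": "len(str(row['content']).split())"},
        ],
        "filter_rule": "len(str(row['content'])) > 0",
    }

    # Preview the first few transformed rows before pushing anything to the Hub.
    for preview_row in center.preview_transform("username/source-dataset", "default", "train", recipe):
        print(preview_row)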