Spaces:
Sleeping
Sleeping
| """ | |
| Model Evaluator for Data-Centric RL Environment. | |
| Uses sklearn RandomForestClassifier (primary) + LogisticRegression (secondary) | |
| with hash-based caching. Trains on working_copy, evaluates on held-out | |
| ground_truth test split. | |
| Primary accuracy (RF) drives all rewards and grading. | |
| Secondary accuracy (LR) is diagnostic β shows whether improvements generalise | |
| beyond the RF decision boundary (overfitting detection). | |
| """ | |
| import hashlib | |
| from typing import Dict, List, Optional, Tuple | |
| import numpy as np | |
| import pandas as pd | |
| class ModelEvaluator: | |
| """Caching dual-classifier evaluator. | |
| Args: | |
| fast_mode: If True, uses n_estimators=20 (for GRPO rollouts, ~4x faster). | |
| If False, uses n_estimators=100 (for final eval, more accurate). | |
| """ | |
| def __init__(self, fast_mode: bool = False): | |
| self._cache_hash: Optional[str] = None | |
| self._cached_accuracy: float = 0.0 | |
| self._cached_per_class: Dict = {} | |
| self._cached_lr_accuracy: float = 0.0 | |
| self._cached_feature_importance: Dict[str, float] = {} | |
| self._cached = False | |
| self._fast_mode = fast_mode | |
| self._n_estimators = 20 if fast_mode else 100 | |
| def _compute_hash(self, df: pd.DataFrame) -> str: | |
| try: | |
| return hashlib.md5( | |
| pd.util.hash_pandas_object(df, index=True).values.tobytes() | |
| ).hexdigest() | |
| except Exception: | |
| return hashlib.md5(df.to_json().encode()).hexdigest() | |
| def evaluate( | |
| self, | |
| working_copy: pd.DataFrame, | |
| ground_truth: pd.DataFrame, | |
| test_size: float = 0.25, | |
| seed: int = 42, | |
| ) -> Tuple[float, Dict, bool, float]: | |
| """ | |
| Train on working_copy; evaluate on held-out ground_truth test split. | |
| Returns: | |
| rf_accuracy β float (primary, used for rewards) | |
| per_class β dict from classification_report | |
| from_cache β True if result came from cache (no retrain) | |
| lr_accuracy β float (secondary, diagnostic only) | |
| """ | |
| from sklearn.ensemble import RandomForestClassifier | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.metrics import classification_report | |
| from sklearn.model_selection import train_test_split | |
| current_hash = self._compute_hash(working_copy) | |
| if self._cached and current_hash == self._cache_hash: | |
| return self._cached_accuracy, self._cached_per_class, True, self._cached_lr_accuracy | |
| # Prepare train set from working_copy | |
| wc = working_copy.copy() | |
| wc = wc.dropna(subset=["target"]) | |
| for col in wc.columns: | |
| if col != "target": | |
| wc[col] = pd.to_numeric(wc[col], errors="coerce") | |
| wc = wc.dropna() | |
| _empty = (0.0, {}, False, 0.0) | |
| if len(wc) < 10: | |
| self._cache_hash = current_hash | |
| self._cached_accuracy = 0.0 | |
| self._cached_per_class = {} | |
| self._cached_lr_accuracy = 0.0 | |
| self._cached_feature_importance = {} | |
| self._cached = True | |
| return _empty | |
| X_train = wc.drop("target", axis=1).values | |
| y_train = wc["target"].astype(int).values | |
| # Build test set from ground_truth | |
| gt = ground_truth.copy().dropna() | |
| for col in gt.columns: | |
| if col != "target": | |
| gt[col] = pd.to_numeric(gt[col], errors="coerce") | |
| gt = gt.dropna() | |
| if len(gt) < 10: | |
| self._cache_hash = current_hash | |
| self._cached_accuracy = 0.0 | |
| self._cached_per_class = {} | |
| self._cached_lr_accuracy = 0.0 | |
| self._cached_feature_importance = {} | |
| self._cached = True | |
| return _empty | |
| _, X_test_df, _, y_test_df = train_test_split( | |
| gt.drop("target", axis=1), | |
| gt["target"], | |
| test_size=test_size, | |
| random_state=seed, | |
| stratify=gt["target"] if gt["target"].nunique() > 1 else None, | |
| ) | |
| y_test = y_test_df.astype(int).values | |
| # Align columns | |
| train_cols = list(wc.drop("target", axis=1).columns) | |
| test_cols = list(X_test_df.columns) | |
| shared = [c for c in train_cols if c in test_cols] | |
| if not shared: | |
| self._cache_hash = current_hash | |
| self._cached_accuracy = 0.0 | |
| self._cached_per_class = {} | |
| self._cached_lr_accuracy = 0.0 | |
| self._cached_feature_importance = {} | |
| self._cached = True | |
| return _empty | |
| X_train_arr = wc[shared].values | |
| X_test_arr = X_test_df[shared].values | |
| # ββ Primary: Random Forest ββββββββββββββββββββββββββββββββββββββββββ | |
| rf = RandomForestClassifier( | |
| n_estimators=self._n_estimators, | |
| random_state=42, | |
| n_jobs=1, | |
| ) | |
| rf.fit(X_train_arr, y_train) | |
| y_pred_rf = rf.predict(X_test_arr) | |
| rf_accuracy = float(rf.score(X_test_arr, y_test)) | |
| try: | |
| per_class = classification_report( | |
| y_test, y_pred_rf, output_dict=True, zero_division=0 | |
| ) | |
| except Exception: | |
| per_class = {} | |
| # Feature importance (RF gives this for free) | |
| feature_importance: Dict[str, float] = {} | |
| if hasattr(rf, "feature_importances_"): | |
| importances = rf.feature_importances_ | |
| for col, imp in zip(shared, importances): | |
| feature_importance[col] = round(float(imp), 4) | |
| # ββ Secondary: Logistic Regression (diagnostic) βββββββββββββββββββββ | |
| lr_accuracy = 0.0 | |
| if not self._fast_mode: # skip LR in fast_mode to keep GRPO rollouts quick | |
| try: | |
| lr = LogisticRegression(max_iter=500, random_state=42, n_jobs=1) | |
| lr.fit(X_train_arr, y_train) | |
| lr_accuracy = float(lr.score(X_test_arr, y_test)) | |
| except Exception: | |
| lr_accuracy = 0.0 | |
| else: | |
| # In fast_mode, reuse RF accuracy as placeholder (not shown to agent) | |
| lr_accuracy = rf_accuracy | |
| # Update cache | |
| self._cache_hash = current_hash | |
| self._cached_accuracy = rf_accuracy | |
| self._cached_per_class = per_class | |
| self._cached_lr_accuracy = lr_accuracy | |
| self._cached_feature_importance = feature_importance | |
| self._cached = True | |
| return rf_accuracy, per_class, False, lr_accuracy | |
| def agreement_signal(self, rf_acc: float, lr_acc: float, | |
| prev_rf: float, prev_lr: float) -> str: | |
| """ | |
| Compare RF vs LR improvement direction. | |
| Returns a signal string for the agent to reason about. | |
| """ | |
| rf_improved = rf_acc > prev_rf + 0.005 | |
| lr_improved = lr_acc > prev_lr + 0.005 | |
| rf_declined = rf_acc < prev_rf - 0.005 | |
| lr_declined = lr_acc < prev_lr - 0.005 | |
| if rf_improved and lr_improved: | |
| return "BOTH_AGREE_IMPROVE β fix is robust and generalises" | |
| elif rf_improved and lr_declined: | |
| return "DISAGREE β RF improved but LR declined (possible RF-specific overfitting)" | |
| elif rf_declined and lr_declined: | |
| return "BOTH_DECLINED β last change hurt both classifiers, consider undo" | |
| elif not rf_improved and not rf_declined: | |
| return "NO_CHANGE β last operation had no measurable effect" | |
| else: | |
| return "MIXED β marginal changes, continue and validate again" | |
| def feature_importance_text(self, top_n: int = 5) -> str: | |
| """Return formatted feature importance string for agent observation.""" | |
| if not self._cached_feature_importance: | |
| return "" | |
| sorted_feats = sorted( | |
| self._cached_feature_importance.items(), | |
| key=lambda x: -x[1] | |
| )[:top_n] | |
| parts = [f"{col} ({imp:.3f})" for col, imp in sorted_feats] | |
| return "Feature importance: " + " > ".join(parts) | |
| def invalidate_cache(self): | |
| self._cached = False | |
| self._cache_hash = None | |
| def last_accuracy(self) -> float: | |
| return self._cached_accuracy if self._cached else 0.0 | |
| def last_lr_accuracy(self) -> float: | |
| return self._cached_lr_accuracy if self._cached else 0.0 | |
| def last_feature_importance(self) -> Dict[str, float]: | |
| return self._cached_feature_importance if self._cached else {} | |