Spaces:
Sleeping
Sleeping
File size: 7,317 Bytes
9244b7e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 | import os
import logging
import traceback
import queue
import time
import pandas as pd
import numpy as np
import joblib
from typing import Dict, Any, Optional
import mlflow
# Lale core imports
import lale
from lale.lib.lale import Hyperopt
from lale.lib.sklearn import LogisticRegression, RandomForestClassifier
from lale.lib.sklearn import MinMaxScaler, PCA
from sklearn.preprocessing import LabelEncoder, OrdinalEncoder
from src.mlflow_utils import safe_set_experiment
def _preprocess_for_lale(X: pd.DataFrame, y: pd.Series, task_type: str = "Classification"):
"""
Encode non-numeric features so that sklearn estimators can handle them.
Returns (X_encoded, y_encoded, encoders) where encoders can be used for inverse transforms.
"""
X = X.copy()
# Encode categorical / object columns
col_encoders = {}
for col in X.columns:
if X[col].dtype == object or str(X[col].dtype) == 'category':
le = OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1)
X[col] = le.fit_transform(X[[col]]).ravel()
col_encoders[col] = le
# Fill any remaining NaNs
for col in X.columns:
if X[col].isna().any():
X[col] = X[col].fillna(X[col].median() if pd.api.types.is_numeric_dtype(X[col]) else 0)
# Encode target if classification (Regression target should remain continuous)
y_encoder = None
if task_type != "Regression":
if y.dtype == object or str(y.dtype) == 'category':
y_encoder = LabelEncoder()
y = pd.Series(y_encoder.fit_transform(y), name=y.name)
return X, y, col_encoders, y_encoder
def run_lale_experiment(
train_df: pd.DataFrame,
target_col: str,
run_name: str,
time_limit: Optional[int],
log_queue: queue.Queue,
stop_event=None,
val_df: Optional[pd.DataFrame] = None,
task_type: str = "Classification",
**kwargs
) -> Dict[str, Any]:
"""
Run Lale experiment using scikit-learn compatible pipelines via Hyperopt.
Handles text/categorical features with automatic encoding.
"""
logger = logging.getLogger("lale")
logger.info(f"Starting Lale experiment: {run_name} (Task: {task_type})")
logger.info(f"Dataset shape: {train_df.shape}, Target: {target_col}")
# Drop NaNs on target
train_df_c = train_df.dropna(subset=[target_col])
X_raw = train_df_c.drop(columns=[target_col])
y_raw = train_df_c[target_col]
# Pre-process: encode categoricals/text for sklearn compatibility
logger.info("Step: Encoding categorical/text features...")
X, y, col_encoders, y_encoder = _preprocess_for_lale(X_raw, y_raw, task_type)
unique_classes_log = ""
if task_type != "Regression":
unique_classes_log = f" | Classes: {y.unique()[:5].tolist()}"
logger.info(f"Features after encoding: {list(X.columns)}{unique_classes_log}")
# Validate MLflow tracking
safe_set_experiment("Multi_AutoML_Project")
# Always end any dangling run (Hyperopt can leave runs open)
try:
mlflow.end_run()
except Exception:
pass
if stop_event and stop_event.is_set():
raise StopIteration("Experiment cancelled before setup.")
try:
with mlflow.start_run(run_name=run_name) as run:
run_id = run.info.run_id
logger.info(f"MLflow Run ID: {run_id}")
mlflow.log_param("model_type", "lale")
mlflow.log_param("n_features", X.shape[1])
mlflow.log_param("n_samples", X.shape[0])
mlflow.log_param("task_type", task_type)
# 1. Pipeline Definition (only numeric-friendly preprocessors)
logger.info("Step: Defining Lale Planned Pipeline...")
if task_type == "Regression":
from lale.lib.sklearn import LinearRegression, RandomForestRegressor
planned_pipeline = (
(MinMaxScaler | PCA) >>
(LinearRegression | RandomForestRegressor)
)
scoring_metric = "r2"
else:
planned_pipeline = (
(MinMaxScaler | PCA) >>
(LogisticRegression | RandomForestClassifier)
)
scoring_metric = "accuracy"
if stop_event and stop_event.is_set():
raise StopIteration("Experiment cancelled before Hyperopt setup.")
# 2. Hyperparameter Tuning
logger.info("Step: Tuning with Hyperopt...")
max_evals = 10 if time_limit is None or time_limit >= 300 else 5
time_args = {}
if time_limit and time_limit > 0:
time_args['max_eval_time'] = time_limit
optimizer = Hyperopt(
estimator=planned_pipeline,
max_evals=max_evals,
cv=3,
scoring=scoring_metric,
show_progressbar=False,
verbose=True, # show per-trial info so we can debug failures
**time_args
)
# 3. Fit Model
logger.info(f"Step: Fitting Lale Optimizer (evals={max_evals})...")
start_time = time.time()
trained_optimizer = optimizer.fit(X.values, y.values)
if stop_event and stop_event.is_set():
raise StopIteration("Experiment cancelled after fitting.")
best_model = trained_optimizer.get_pipeline()
# Extract score
try:
summary = trained_optimizer.summary()
best_score = -summary.iloc[0]['loss'] if 'loss' in summary.columns else 0.0
except Exception:
best_score = 0.0
elapsed_time = time.time() - start_time
logger.info(f"Best Score (CV {scoring_metric}): {best_score:.4f}")
logger.info(f"Optimization time: {elapsed_time:.1f}s")
# 4. Save Model
logger.info("Step: Saving model locally...")
model_dir = "models"
os.makedirs(model_dir, exist_ok=True)
model_path = os.path.join(model_dir, f"{run_name}_lale_model.pkl")
joblib.dump({"model": best_model, "col_encoders": col_encoders, "y_encoder": y_encoder, "task_type": task_type}, model_path)
# Log metrics
mlflow.log_metric(f"best_cv_{scoring_metric}", best_score)
mlflow.log_metric("optimization_time", elapsed_time)
mlflow.log_param("max_evals", max_evals)
mlflow.log_artifact(model_path, artifact_path="model")
logger.info("Lale experiment completed successfully.")
# 5. Prepare return bundle
bundle = {"model": best_model, "col_encoders": col_encoders, "y_encoder": y_encoder, "task_type": task_type}
return {
"success": True,
"predictor": bundle,
"run_id": run_id,
"type": "lale",
"model_path": model_path,
"metrics": {f"best_cv_{scoring_metric}": best_score}
}
except StopIteration as si:
logger.warning(f"Cancelled: {si}")
raise
except Exception as e:
logger.error(f"Lale Error: {e}")
logger.error(traceback.format_exc())
raise e
|