# Multi-AutoML-Interface / src/lale_utils.py
# PedroM2626 — 9244b7e: Add ONNX export utilities, pipeline parser, and PyCaret integration
import os
import logging
import traceback
import queue
import time
import pandas as pd
import numpy as np
import joblib
from typing import Dict, Any, Optional
import mlflow
# Lale core imports
import lale
from lale.lib.lale import Hyperopt
from lale.lib.sklearn import LogisticRegression, RandomForestClassifier
from lale.lib.sklearn import MinMaxScaler, PCA
from sklearn.preprocessing import LabelEncoder, OrdinalEncoder
from src.mlflow_utils import safe_set_experiment
def _preprocess_for_lale(X: pd.DataFrame, y: pd.Series, task_type: str = "Classification"):
"""
Encode non-numeric features so that sklearn estimators can handle them.
Returns (X_encoded, y_encoded, encoders) where encoders can be used for inverse transforms.
"""
X = X.copy()
# Encode categorical / object columns
col_encoders = {}
for col in X.columns:
if X[col].dtype == object or str(X[col].dtype) == 'category':
le = OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1)
X[col] = le.fit_transform(X[[col]]).ravel()
col_encoders[col] = le
# Fill any remaining NaNs
for col in X.columns:
if X[col].isna().any():
X[col] = X[col].fillna(X[col].median() if pd.api.types.is_numeric_dtype(X[col]) else 0)
# Encode target if classification (Regression target should remain continuous)
y_encoder = None
if task_type != "Regression":
if y.dtype == object or str(y.dtype) == 'category':
y_encoder = LabelEncoder()
y = pd.Series(y_encoder.fit_transform(y), name=y.name)
return X, y, col_encoders, y_encoder
def _define_planned_pipeline(task_type: str):
    """Return ``(planned_pipeline, scoring_metric)`` for the given task type."""
    if task_type == "Regression":
        from lale.lib.sklearn import LinearRegression, RandomForestRegressor
        planned = (MinMaxScaler | PCA) >> (LinearRegression | RandomForestRegressor)
        return planned, "r2"
    planned = (MinMaxScaler | PCA) >> (LogisticRegression | RandomForestClassifier)
    return planned, "accuracy"


def _best_cv_score(trained_optimizer, logger: logging.Logger) -> float:
    """Return the best cross-validated score recorded by a fitted Lale Hyperopt.

    The trial loss is the negated score (hence the sign flip), and the summary
    table is not guaranteed to be sorted by loss. The best score is therefore
    the negated minimum loss, taken over successful (``status == 'ok'``) trials
    when that column is present. Falls back to 0.0 when no usable loss exists.
    """
    try:
        summary = trained_optimizer.summary()
    except Exception as exc:
        logger.warning(f"Could not read Hyperopt summary: {exc}")
        return 0.0
    if 'loss' not in summary.columns:
        return 0.0
    losses = summary['loss']
    if 'status' in summary.columns:
        ok_losses = losses[summary['status'] == 'ok']
        if not ok_losses.empty:
            losses = ok_losses
    losses = pd.to_numeric(losses, errors='coerce').dropna()
    if losses.empty:
        return 0.0
    return float(-losses.min())


def run_lale_experiment(
    train_df: pd.DataFrame,
    target_col: str,
    run_name: str,
    time_limit: Optional[int],
    log_queue: queue.Queue,
    stop_event=None,
    val_df: Optional[pd.DataFrame] = None,
    task_type: str = "Classification",
    **kwargs
) -> Dict[str, Any]:
    """
    Run Lale experiment using scikit-learn compatible pipelines via Hyperopt.
    Handles text/categorical features with automatic encoding.

    The planned pipeline is (MinMaxScaler | PCA) followed by a linear or
    random-forest model. It is tuned with 3-fold CV. The best pipeline and its
    encoders are pickled under ``models/`` and logged to MLflow.

    Args:
        train_df: Training data including the target column.
        target_col: Target column name. Rows with a missing target are dropped.
        run_name: MLflow run name; also used in the saved model filename.
        time_limit: Search budget in seconds. ``None`` or a non-positive value
            sets no time cap. Any non-None value below 300 reduces the number
            of Hyperopt evaluations from 10 to 5.
        log_queue: Accepted for interface compatibility; not used here.
        stop_event: Optional object exposing ``is_set()``. When it is set, the
            run is cancelled at the next checkpoint.
        val_df: Accepted for interface compatibility; not used here.
        task_type: ``"Regression"``, otherwise treated as classification.
        **kwargs: Ignored.

    Returns:
        Dict with ``success``, ``predictor`` (model + encoders bundle),
        ``run_id``, ``type``, ``model_path`` and ``metrics``.

    Raises:
        StopIteration: When ``stop_event`` is set before completion.
        RuntimeError: When Hyperopt yields no trained pipeline.
        Exception: Any other failure is logged and re-raised.
    """
    logger = logging.getLogger("lale")
    logger.info(f"Starting Lale experiment: {run_name} (Task: {task_type})")
    logger.info(f"Dataset shape: {train_df.shape}, Target: {target_col}")

    # Drop rows whose target is missing.
    train_df_c = train_df.dropna(subset=[target_col])
    X_raw = train_df_c.drop(columns=[target_col])
    y_raw = train_df_c[target_col]

    # Pre-process: encode categoricals/text for sklearn compatibility.
    logger.info("Step: Encoding categorical/text features...")
    X, y, col_encoders, y_encoder = _preprocess_for_lale(X_raw, y_raw, task_type)
    unique_classes_log = ""
    if task_type != "Regression":
        unique_classes_log = f" | Classes: {y.unique()[:5].tolist()}"
    logger.info(f"Features after encoding: {list(X.columns)}{unique_classes_log}")

    safe_set_experiment("Multi_AutoML_Project")
    # Best-effort: end any dangling run (Hyperopt can leave runs open).
    try:
        mlflow.end_run()
    except Exception:
        pass

    if stop_event and stop_event.is_set():
        raise StopIteration("Experiment cancelled before setup.")

    try:
        with mlflow.start_run(run_name=run_name) as run:
            run_id = run.info.run_id
            logger.info(f"MLflow Run ID: {run_id}")
            mlflow.log_param("model_type", "lale")
            mlflow.log_param("n_features", X.shape[1])
            mlflow.log_param("n_samples", X.shape[0])
            mlflow.log_param("task_type", task_type)

            # 1. Pipeline definition (only numeric-friendly preprocessors).
            logger.info("Step: Defining Lale Planned Pipeline...")
            planned_pipeline, scoring_metric = _define_planned_pipeline(task_type)

            if stop_event and stop_event.is_set():
                raise StopIteration("Experiment cancelled before Hyperopt setup.")

            # 2. Hyperparameter tuning.
            logger.info("Step: Tuning with Hyperopt...")
            max_evals = 10 if time_limit is None or time_limit >= 300 else 5
            time_args = {}
            if time_limit and time_limit > 0:
                # max_eval_time caps a single evaluation only. max_opt_time is
                # needed so the whole search respects the budget.
                time_args['max_opt_time'] = time_limit
                time_args['max_eval_time'] = time_limit
            optimizer = Hyperopt(
                estimator=planned_pipeline,
                max_evals=max_evals,
                cv=3,
                scoring=scoring_metric,
                show_progressbar=False,
                verbose=True,  # show per-trial info so we can debug failures
                **time_args
            )

            # 3. Fit.
            logger.info(f"Step: Fitting Lale Optimizer (evals={max_evals})...")
            start_time = time.time()
            trained_optimizer = optimizer.fit(X.values, y.values)

            if stop_event and stop_event.is_set():
                raise StopIteration("Experiment cancelled after fitting.")

            best_model = trained_optimizer.get_pipeline()
            if best_model is None:
                raise RuntimeError(
                    "Lale Hyperopt did not produce a trained pipeline "
                    "(all trials failed or the time budget was exhausted)."
                )
            best_score = _best_cv_score(trained_optimizer, logger)
            elapsed_time = time.time() - start_time
            logger.info(f"Best Score (CV {scoring_metric}): {best_score:.4f}")
            logger.info(f"Optimization time: {elapsed_time:.1f}s")

            # 4. Save the model together with the encoders needed at inference.
            logger.info("Step: Saving model locally...")
            bundle = {"model": best_model, "col_encoders": col_encoders, "y_encoder": y_encoder, "task_type": task_type}
            model_dir = "models"
            os.makedirs(model_dir, exist_ok=True)
            model_path = os.path.join(model_dir, f"{run_name}_lale_model.pkl")
            joblib.dump(bundle, model_path)

            mlflow.log_metric(f"best_cv_{scoring_metric}", best_score)
            mlflow.log_metric("optimization_time", elapsed_time)
            mlflow.log_param("max_evals", max_evals)
            mlflow.log_artifact(model_path, artifact_path="model")
            logger.info("Lale experiment completed successfully.")

            # 5. Return bundle.
            return {
                "success": True,
                "predictor": bundle,
                "run_id": run_id,
                "type": "lale",
                "model_path": model_path,
                "metrics": {f"best_cv_{scoring_metric}": best_score}
            }
    except StopIteration as si:
        logger.warning(f"Cancelled: {si}")
        raise
    except Exception as e:
        logger.error(f"Lale Error: {e}")
        logger.error(traceback.format_exc())
        raise