Spaces:
Sleeping
Sleeping
| import os | |
| import logging | |
| import traceback | |
| import queue | |
| import time | |
| import pandas as pd | |
| import numpy as np | |
| import joblib | |
| from typing import Dict, Any, Optional | |
| import mlflow | |
| # Lale core imports | |
| import lale | |
| from lale.lib.lale import Hyperopt | |
| from lale.lib.sklearn import LogisticRegression, RandomForestClassifier | |
| from lale.lib.sklearn import MinMaxScaler, PCA | |
| from sklearn.preprocessing import LabelEncoder, OrdinalEncoder | |
| from src.mlflow_utils import safe_set_experiment | |
| def _preprocess_for_lale(X: pd.DataFrame, y: pd.Series, task_type: str = "Classification"): | |
| """ | |
| Encode non-numeric features so that sklearn estimators can handle them. | |
| Returns (X_encoded, y_encoded, encoders) where encoders can be used for inverse transforms. | |
| """ | |
| X = X.copy() | |
| # Encode categorical / object columns | |
| col_encoders = {} | |
| for col in X.columns: | |
| if X[col].dtype == object or str(X[col].dtype) == 'category': | |
| le = OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1) | |
| X[col] = le.fit_transform(X[[col]]).ravel() | |
| col_encoders[col] = le | |
| # Fill any remaining NaNs | |
| for col in X.columns: | |
| if X[col].isna().any(): | |
| X[col] = X[col].fillna(X[col].median() if pd.api.types.is_numeric_dtype(X[col]) else 0) | |
| # Encode target if classification (Regression target should remain continuous) | |
| y_encoder = None | |
| if task_type != "Regression": | |
| if y.dtype == object or str(y.dtype) == 'category': | |
| y_encoder = LabelEncoder() | |
| y = pd.Series(y_encoder.fit_transform(y), name=y.name) | |
| return X, y, col_encoders, y_encoder | |
| def run_lale_experiment( | |
| train_df: pd.DataFrame, | |
| target_col: str, | |
| run_name: str, | |
| time_limit: Optional[int], | |
| log_queue: queue.Queue, | |
| stop_event=None, | |
| val_df: Optional[pd.DataFrame] = None, | |
| task_type: str = "Classification", | |
| **kwargs | |
| ) -> Dict[str, Any]: | |
| """ | |
| Run Lale experiment using scikit-learn compatible pipelines via Hyperopt. | |
| Handles text/categorical features with automatic encoding. | |
| """ | |
| logger = logging.getLogger("lale") | |
| logger.info(f"Starting Lale experiment: {run_name} (Task: {task_type})") | |
| logger.info(f"Dataset shape: {train_df.shape}, Target: {target_col}") | |
| # Drop NaNs on target | |
| train_df_c = train_df.dropna(subset=[target_col]) | |
| X_raw = train_df_c.drop(columns=[target_col]) | |
| y_raw = train_df_c[target_col] | |
| # Pre-process: encode categoricals/text for sklearn compatibility | |
| logger.info("Step: Encoding categorical/text features...") | |
| X, y, col_encoders, y_encoder = _preprocess_for_lale(X_raw, y_raw, task_type) | |
| unique_classes_log = "" | |
| if task_type != "Regression": | |
| unique_classes_log = f" | Classes: {y.unique()[:5].tolist()}" | |
| logger.info(f"Features after encoding: {list(X.columns)}{unique_classes_log}") | |
| # Validate MLflow tracking | |
| safe_set_experiment("Multi_AutoML_Project") | |
| # Always end any dangling run (Hyperopt can leave runs open) | |
| try: | |
| mlflow.end_run() | |
| except Exception: | |
| pass | |
| if stop_event and stop_event.is_set(): | |
| raise StopIteration("Experiment cancelled before setup.") | |
| try: | |
| with mlflow.start_run(run_name=run_name) as run: | |
| run_id = run.info.run_id | |
| logger.info(f"MLflow Run ID: {run_id}") | |
| mlflow.log_param("model_type", "lale") | |
| mlflow.log_param("n_features", X.shape[1]) | |
| mlflow.log_param("n_samples", X.shape[0]) | |
| mlflow.log_param("task_type", task_type) | |
| # 1. Pipeline Definition (only numeric-friendly preprocessors) | |
| logger.info("Step: Defining Lale Planned Pipeline...") | |
| if task_type == "Regression": | |
| from lale.lib.sklearn import LinearRegression, RandomForestRegressor | |
| planned_pipeline = ( | |
| (MinMaxScaler | PCA) >> | |
| (LinearRegression | RandomForestRegressor) | |
| ) | |
| scoring_metric = "r2" | |
| else: | |
| planned_pipeline = ( | |
| (MinMaxScaler | PCA) >> | |
| (LogisticRegression | RandomForestClassifier) | |
| ) | |
| scoring_metric = "accuracy" | |
| if stop_event and stop_event.is_set(): | |
| raise StopIteration("Experiment cancelled before Hyperopt setup.") | |
| # 2. Hyperparameter Tuning | |
| logger.info("Step: Tuning with Hyperopt...") | |
| max_evals = 10 if time_limit is None or time_limit >= 300 else 5 | |
| time_args = {} | |
| if time_limit and time_limit > 0: | |
| time_args['max_eval_time'] = time_limit | |
| optimizer = Hyperopt( | |
| estimator=planned_pipeline, | |
| max_evals=max_evals, | |
| cv=3, | |
| scoring=scoring_metric, | |
| show_progressbar=False, | |
| verbose=True, # show per-trial info so we can debug failures | |
| **time_args | |
| ) | |
| # 3. Fit Model | |
| logger.info(f"Step: Fitting Lale Optimizer (evals={max_evals})...") | |
| start_time = time.time() | |
| trained_optimizer = optimizer.fit(X.values, y.values) | |
| if stop_event and stop_event.is_set(): | |
| raise StopIteration("Experiment cancelled after fitting.") | |
| best_model = trained_optimizer.get_pipeline() | |
| # Extract score | |
| try: | |
| summary = trained_optimizer.summary() | |
| best_score = -summary.iloc[0]['loss'] if 'loss' in summary.columns else 0.0 | |
| except Exception: | |
| best_score = 0.0 | |
| elapsed_time = time.time() - start_time | |
| logger.info(f"Best Score (CV {scoring_metric}): {best_score:.4f}") | |
| logger.info(f"Optimization time: {elapsed_time:.1f}s") | |
| # 4. Save Model | |
| logger.info("Step: Saving model locally...") | |
| model_dir = "models" | |
| os.makedirs(model_dir, exist_ok=True) | |
| model_path = os.path.join(model_dir, f"{run_name}_lale_model.pkl") | |
| joblib.dump({"model": best_model, "col_encoders": col_encoders, "y_encoder": y_encoder, "task_type": task_type}, model_path) | |
| # Log metrics | |
| mlflow.log_metric(f"best_cv_{scoring_metric}", best_score) | |
| mlflow.log_metric("optimization_time", elapsed_time) | |
| mlflow.log_param("max_evals", max_evals) | |
| mlflow.log_artifact(model_path, artifact_path="model") | |
| logger.info("Lale experiment completed successfully.") | |
| # 5. Prepare return bundle | |
| bundle = {"model": best_model, "col_encoders": col_encoders, "y_encoder": y_encoder, "task_type": task_type} | |
| return { | |
| "success": True, | |
| "predictor": bundle, | |
| "run_id": run_id, | |
| "type": "lale", | |
| "model_path": model_path, | |
| "metrics": {f"best_cv_{scoring_metric}": best_score} | |
| } | |
| except StopIteration as si: | |
| logger.warning(f"Cancelled: {si}") | |
| raise | |
| except Exception as e: | |
| logger.error(f"Lale Error: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise e | |