# File size: 7,317 Bytes
# 9244b7e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import os
import logging
import traceback
import queue
import time
import pandas as pd
import numpy as np
import joblib
from typing import Dict, Any, Optional

import mlflow

# Lale core imports
import lale
from lale.lib.lale import Hyperopt
from lale.lib.sklearn import LogisticRegression, RandomForestClassifier
from lale.lib.sklearn import MinMaxScaler, PCA
from sklearn.preprocessing import LabelEncoder, OrdinalEncoder

from src.mlflow_utils import safe_set_experiment


def _preprocess_for_lale(X: pd.DataFrame, y: pd.Series, task_type: str = "Classification"):
    """
    Encode non-numeric features so that sklearn estimators can handle them.
    Returns (X_encoded, y_encoded, encoders) where encoders can be used for inverse transforms.
    """
    X = X.copy()

    # Encode categorical / object columns
    col_encoders = {}
    for col in X.columns:
        if X[col].dtype == object or str(X[col].dtype) == 'category':
            le = OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1)
            X[col] = le.fit_transform(X[[col]]).ravel()
            col_encoders[col] = le

    # Fill any remaining NaNs
    for col in X.columns:
        if X[col].isna().any():
            X[col] = X[col].fillna(X[col].median() if pd.api.types.is_numeric_dtype(X[col]) else 0)

    # Encode target if classification (Regression target should remain continuous)
    y_encoder = None
    if task_type != "Regression":
        if y.dtype == object or str(y.dtype) == 'category':
            y_encoder = LabelEncoder()
            y = pd.Series(y_encoder.fit_transform(y), name=y.name)

    return X, y, col_encoders, y_encoder


def _best_score_from_summary(summary: pd.DataFrame) -> float:
    """Return the best cross-validated score recorded in a Lale Hyperopt trial summary.

    The summary has one row per trial, with a ``loss`` column where
    ``loss = -score``. Rows are not guaranteed to be sorted by loss, so the best
    trial is found from the minimum loss, not from the first row. When a
    ``status`` column is present, only successful (``"ok"``) trials count.

    Returns 0.0 when no usable loss is available.
    """
    if summary is None or "loss" not in summary.columns:
        return 0.0
    losses = summary["loss"]
    if "status" in summary.columns:
        losses = losses[summary["status"] == "ok"]
    losses = pd.to_numeric(losses, errors="coerce").dropna()
    if losses.empty:
        return 0.0
    return float(-losses.min())


def run_lale_experiment(
    train_df: pd.DataFrame,
    target_col: str,
    run_name: str,
    time_limit: Optional[int],
    log_queue: queue.Queue,
    stop_event=None,
    val_df: Optional[pd.DataFrame] = None,
    task_type: str = "Classification",
    **kwargs
) -> Dict[str, Any]:
    """
    Run Lale experiment using scikit-learn compatible pipelines via Hyperopt.
    Handles text/categorical features with automatic encoding.

    Rows whose target is NaN are dropped. The features are then encoded with
    ``_preprocess_for_lale``. Hyperopt searches the space
    ``(MinMaxScaler | PCA) >> estimator`` with 3-fold CV, where:
    - Regression uses LinearRegression | RandomForestRegressor, scored by r2;
    - any other task uses LogisticRegression | RandomForestClassifier, scored
      by accuracy.
    The best pipeline is saved to ``models/{run_name}_lale_model.pkl`` and
    logged to the MLflow experiment "Multi_AutoML_Project".

    Args:
        train_df: Training frame that includes ``target_col``.
        target_col: Name of the target column.
        run_name: MLflow run name; also used as the model file prefix.
        time_limit: Search budget in seconds. ``None`` or a value <= 0 means no
            time bound. It also selects the number of evaluations: 10 when
            ``None`` or >= 300, otherwise 5.
        log_queue: Unused here.
        stop_event: Optional object exposing ``is_set()``. It is checked before
            setup, before the search, and after fitting.
        val_df: Unused here.
        task_type: "Regression" or a classification label.
        **kwargs: Ignored.

    Returns:
        A dict with these keys:
        - ``success``: always True;
        - ``predictor``: a bundle with the model, encoders, and task type;
        - ``run_id``;
        - ``type``: "lale";
        - ``model_path``;
        - ``metrics``: ``{"best_cv_<metric>": score}``.

    Raises:
        StopIteration: if ``stop_event`` is set at a checkpoint.
        RuntimeError: if Hyperopt produced no trained pipeline.
        Exception: any MLflow/Lale error, re-raised after being logged.
    """
    logger = logging.getLogger("lale")
    logger.info(f"Starting Lale experiment: {run_name} (Task: {task_type})")
    logger.info(f"Dataset shape: {train_df.shape}, Target: {target_col}")

    # Drop NaNs on target
    train_df_c = train_df.dropna(subset=[target_col])
    X_raw = train_df_c.drop(columns=[target_col])
    y_raw = train_df_c[target_col]

    # Pre-process: encode categoricals/text for sklearn compatibility
    logger.info("Step: Encoding categorical/text features...")
    X, y, col_encoders, y_encoder = _preprocess_for_lale(X_raw, y_raw, task_type)

    unique_classes_log = ""
    if task_type != "Regression":
        unique_classes_log = f" | Classes: {y.unique()[:5].tolist()}"
    logger.info(f"Features after encoding: {list(X.columns)}{unique_classes_log}")

    # Validate MLflow tracking
    safe_set_experiment("Multi_AutoML_Project")

    # Best-effort: end any dangling active run (Hyperopt can leave runs open),
    # because MLflow will not start a new non-nested run while one is active.
    try:
        mlflow.end_run()
    except Exception:
        pass

    if stop_event and stop_event.is_set():
        raise StopIteration("Experiment cancelled before setup.")

    try:
        with mlflow.start_run(run_name=run_name) as run:
            run_id = run.info.run_id
            logger.info(f"MLflow Run ID: {run_id}")
            mlflow.log_param("model_type", "lale")
            mlflow.log_param("n_features", X.shape[1])
            mlflow.log_param("n_samples", X.shape[0])
            mlflow.log_param("task_type", task_type)

            # 1. Pipeline Definition (only numeric-friendly preprocessors)
            logger.info("Step: Defining Lale Planned Pipeline...")

            if task_type == "Regression":
                from lale.lib.sklearn import LinearRegression, RandomForestRegressor
                planned_pipeline = (
                    (MinMaxScaler | PCA) >>
                    (LinearRegression | RandomForestRegressor)
                )
                scoring_metric = "r2"
            else:
                planned_pipeline = (
                    (MinMaxScaler | PCA) >>
                    (LogisticRegression | RandomForestClassifier)
                )
                scoring_metric = "accuracy"

            if stop_event and stop_event.is_set():
                raise StopIteration("Experiment cancelled before Hyperopt setup.")

            # 2. Hyperparameter Tuning
            logger.info("Step: Tuning with Hyperopt...")
            max_evals = 10 if time_limit is None or time_limit >= 300 else 5
            time_args = {}
            if time_limit and time_limit > 0:
                # max_opt_time bounds the whole search. max_eval_time only
                # bounds a single trial, so on its own it would let the search
                # run for up to max_evals * time_limit seconds.
                time_args['max_opt_time'] = time_limit
                time_args['max_eval_time'] = time_limit

            optimizer = Hyperopt(
                estimator=planned_pipeline,
                max_evals=max_evals,
                cv=3,
                scoring=scoring_metric,
                show_progressbar=False,
                verbose=True,   # show per-trial info so we can debug failures
                **time_args
            )

            # 3. Fit Model
            logger.info(f"Step: Fitting Lale Optimizer (evals={max_evals})...")
            start_time = time.time()
            trained_optimizer = optimizer.fit(X.values, y.values)

            if stop_event and stop_event.is_set():
                raise StopIteration("Experiment cancelled after fitting.")

            best_model = trained_optimizer.get_pipeline()
            if best_model is None:
                # Don't persist/report a missing model as a successful run.
                raise RuntimeError(
                    "Lale Hyperopt did not produce a trained pipeline "
                    "(all trials may have failed or timed out)."
                )

            # Extract score: the best (minimum-loss) successful trial, not the first row.
            try:
                best_score = _best_score_from_summary(trained_optimizer.summary())
            except Exception as exc:
                # The score is informational; don't fail a finished run over it.
                logger.warning(f"Could not extract best CV score from Hyperopt summary: {exc}")
                best_score = 0.0

            elapsed_time = time.time() - start_time
            logger.info(f"Best Score (CV {scoring_metric}): {best_score:.4f}")
            logger.info(f"Optimization time: {elapsed_time:.1f}s")

            # 4. Save Model
            logger.info("Step: Saving model locally...")
            model_dir = "models"
            os.makedirs(model_dir, exist_ok=True)
            model_path = os.path.join(model_dir, f"{run_name}_lale_model.pkl")
            joblib.dump({"model": best_model, "col_encoders": col_encoders, "y_encoder": y_encoder, "task_type": task_type}, model_path)

            # Log metrics
            mlflow.log_metric(f"best_cv_{scoring_metric}", best_score)
            mlflow.log_metric("optimization_time", elapsed_time)
            mlflow.log_param("max_evals", max_evals)
            mlflow.log_artifact(model_path, artifact_path="model")

            logger.info("Lale experiment completed successfully.")

            # 5. Prepare return bundle
            bundle = {"model": best_model, "col_encoders": col_encoders, "y_encoder": y_encoder, "task_type": task_type}

            return {
                "success": True,
                "predictor": bundle,
                "run_id": run_id,
                "type": "lale",
                "model_path": model_path,
                "metrics": {f"best_cv_{scoring_metric}": best_score}
            }

    except StopIteration as si:
        logger.warning(f"Cancelled: {si}")
        raise
    except Exception as e:
        logger.error(f"Lale Error: {e}")
        logger.error(traceback.format_exc())
        raise