Spaces:
Sleeping
Sleeping
| import os | |
| import logging | |
| import traceback | |
| import queue | |
| import time | |
| import pandas as pd | |
| from typing import Dict, Any, Optional | |
| import mlflow | |
| from src.mlflow_utils import safe_set_experiment | |
| from src.onnx_utils import export_to_onnx | |
def _raise_if_cancelled(stop_event, message: str) -> None:
    """Raise ``StopIteration(message)`` when *stop_event* is provided and set."""
    if stop_event and stop_event.is_set():
        raise StopIteration(message)


def _log_score_grid_metrics(score_grid: pd.DataFrame) -> None:
    """Log the numeric entries of one score-grid row to the active MLflow run."""
    import numbers  # local import: only needed here, keeps the file's import block untouched

    if score_grid.empty:
        return
    # PyCaret cross-validation grids label the aggregated row "Mean"; the first
    # row is only a single fold.  Fall back to the first row for grids that do
    # not carry a "Mean" label.
    if "Mean" in score_grid.index:
        row = score_grid.loc["Mean"]
    else:
        row = score_grid.iloc[0]
    for name, value in row.items():
        # numbers.Real also matches NumPy integer/float scalars, which a plain
        # isinstance(value, (int, float)) check misses for np.int64 & co.
        if isinstance(value, numbers.Real):
            mlflow.log_metric(str(name).lower().replace(" ", "_"), float(value))


def run_pycaret_experiment(
    train_df: pd.DataFrame,
    target_col: str,
    run_name: str,
    time_limit: Optional[int],
    log_queue: queue.Queue,
    stop_event=None,
    val_df: Optional[pd.DataFrame] = None,
    task_type: str = "Classification",
    n_jobs: int = 1,
    **kwargs
) -> Dict[str, Any]:
    """
    Run a PyCaret experiment and track it in MLflow.

    The PyCaret module (classification, regression or time_series) is imported
    dynamically depending on ``task_type``.  The pipeline is: setup ->
    compare_models (top 3) -> tune the best model -> blend (when more than one
    model was selected) -> save -> log metrics/artifacts -> ONNX export.

    Args:
        train_df: Training data, including the target column.
        target_col: Name of the target column in ``train_df``.
        run_name: MLflow run name; also used to build the saved model file names.
        time_limit: Only used to pick the number of tuning iterations
            (10 when ``None`` or >= 300, otherwise 5).
        log_queue: Accepted for interface compatibility; not used in this function.
        stop_event: Optional object exposing ``is_set()``; checked between steps
            to cancel the experiment.
        val_df: Optional hold-out data passed to PyCaret as ``test_data``
            (ignored for time series).
        task_type: ``"Regression"``, ``"Time Series Forecasting"``; any other
            value is treated as classification.
        n_jobs: Forwarded to PyCaret ``setup``.
        **kwargs: ``fh`` and ``seasonal_period`` (both default 12) are read for
            time-series tasks.

    Returns:
        Dict with keys ``success``, ``predictor`` (final model), ``run_id``,
        ``type`` (``"pycaret"``) and ``model_path`` (path of the saved ``.pkl``).

    Raises:
        StopIteration: When ``stop_event`` is set at one of the checkpoints.
        Exception: Any other error is logged with its traceback and re-raised.
    """
    logger = logging.getLogger("pycaret")
    logger.info(f"Starting PyCaret experiment: {run_name} (Task: {task_type})")
    logger.info(f"Dataset shape: {train_df.shape}, Target: {target_col}")

    is_time_series = task_type == "Time Series Forecasting"

    # Dynamic imports based on task_type
    if task_type == "Regression":
        from pycaret.regression import setup, compare_models, pull, tune_model, blend_models, save_model
        sort_metric = "R2"
        include_models = ["lr", "rf", "et", "lightgbm"]
    elif is_time_series:
        from pycaret.time_series import setup, compare_models, pull, tune_model, blend_models, save_model
        sort_metric = "MASE"
        include_models = ["naive", "snaive", "arima", "ets"]
    else:
        from pycaret.classification import setup, compare_models, pull, tune_model, blend_models, save_model
        sort_metric = "F1"
        include_models = ["lr", "nb", "rf", "et", "lightgbm"]

    # Always end any dangling MLflow run to avoid conflicts (best effort).
    try:
        mlflow.end_run()
    except Exception:
        pass

    # 1. Prepare MLflow Tracking
    safe_set_experiment("Multi_AutoML_Project")

    _raise_if_cancelled(stop_event, "Experiment cancelled before setup.")

    try:
        # 2. PyCaret Setup
        logger.info("Step: Setting up PyCaret environment...")
        setup_kwargs = {
            "data": train_df,
            "target": target_col,
            "session_id": 42,
            "verbose": False,
            "fold": 3,
            "log_experiment": False,
            "system_log": False,
            "n_jobs": n_jobs,
        }
        if is_time_series:
            setup_kwargs["fh"] = kwargs.get("fh", 12)
            setup_kwargs["seasonal_period"] = kwargs.get("seasonal_period", 12)
        else:
            setup_kwargs["test_data"] = val_df
            setup_kwargs["normalize"] = True
            setup_kwargs["index"] = False
            setup_kwargs["feature_selection"] = False
            setup_kwargs["memory"] = False
        setup(**setup_kwargs)

        _raise_if_cancelled(stop_event, "Experiment cancelled after setup.")

        # 3. Start our own MLflow run AFTER PyCaret setup
        with mlflow.start_run(run_name=run_name) as run:
            run_id = run.info.run_id
            logger.info(f"MLflow Run ID: {run_id}")
            mlflow.log_param("framework", "pycaret")
            mlflow.log_param("model_type", "pycaret")
            mlflow.log_param("task_type", task_type)

            # 4. Model Comparison
            logger.info("Step: Comparing models...")
            n_select = 3
            logger.info(f"Including models: {include_models} (Sorting by {sort_metric})")
            best_models = compare_models(
                n_select=n_select,
                sort=sort_metric,
                verbose=False,
                include=include_models,
            )
            comparison_df = pull()
            if not comparison_df.empty:
                top_model_name = comparison_df.iloc[0]['Model']
                logger.info(f"Best model found: {top_model_name}")

            _raise_if_cancelled(stop_event, "Experiment cancelled after model comparison.")

            # compare_models may hand back a single estimator; normalise to a list.
            if not isinstance(best_models, list):
                best_models = [best_models]
            best_model = best_models[0]

            # 5. Tuning (Time Series tuning might require different params, keeping generic)
            logger.info("Step: Tuning best model...")
            n_iter = 10 if time_limit is None or time_limit >= 300 else 5
            tune_kwargs = {
                "estimator": best_model,
                "optimize": sort_metric,
                "n_iter": n_iter,
                "verbose": False,
                "choose_better": True,
            }
            # search_library / search_algorithm must not be passed to pycaret.time_series
            if not is_time_series:
                tune_kwargs["search_library"] = "scikit-learn"
                tune_kwargs["search_algorithm"] = "random"
            tuned_model = tune_model(**tune_kwargs)

            _raise_if_cancelled(stop_event, "Experiment cancelled after tuning.")

            # 6. Blending (only if we have multiple models)
            if len(best_models) > 1:
                logger.info("Step: Blending top models...")
                # Blend the tuned winner with the remaining candidates; blending
                # the untuned list would discard the tuning work done above.
                blend_candidates = [tuned_model] + list(best_models[1:])
                final_model = blend_models(
                    estimator_list=blend_candidates,
                    optimize=sort_metric,
                    verbose=False,
                )
            else:
                final_model = tuned_model
                logger.info("Step: Skipping blend (only one model selected).")

            # 7. Save model
            model_dir = "models"
            os.makedirs(model_dir, exist_ok=True)
            model_path_base = os.path.join(model_dir, f"{run_name}_pycaret_model")
            logger.info(f"Saving model to {model_path_base}.pkl...")
            save_model(final_model, model_path_base)

            # 8. Log metrics to our MLflow run (best effort: never fail the run here)
            try:
                _log_score_grid_metrics(pull())
            except Exception as me:
                logger.warning(f"Could not pull metrics: {me}")

            # Log model artifact
            model_pkl = f"{model_path_base}.pkl"
            if os.path.exists(model_pkl):
                mlflow.log_artifact(model_pkl, artifact_path="model")

            # ONNX Export (best effort: a failed export only produces a warning)
            try:
                onnx_path = os.path.join(model_dir, f"{run_name}_pycaret.onnx")
                export_to_onnx(final_model, "pycaret", target_col, onnx_path, input_sample=train_df[:1])
                mlflow.log_artifact(onnx_path, artifact_path="model")
            except Exception as e:
                logger.warning(f"Failed to export PyCaret model to ONNX: {e}")

            logger.info("PyCaret experiment completed successfully.")
            return {
                "success": True,
                "predictor": final_model,
                "run_id": run_id,
                "type": "pycaret",
                "model_path": model_pkl,
            }
    except StopIteration as si:
        logger.warning(f"Cancelled: {si}")
        raise
    except Exception as e:
        logger.error(f"PyCaret Error: {e}")
        logger.error(traceback.format_exc())
        # Bare raise keeps the original traceback intact.
        raise
    finally:
        # Always clean up any dangling run (best effort).
        try:
            mlflow.end_run()
        except Exception:
            pass