Multi-AutoML-Interface / src /pipeline_parser.py
PedroM2626's picture
Add ONNX export utilities, pipeline parser, and PyCaret integration
9244b7e
"""
pipeline_parser.py β€” infer which AutoML pipeline step is active from live logs.
Each framework has a sequence of steps. This module parses log lines
to determine which step is "done", which is "active", and which is "pending".
"""
from typing import Optional
# ── Step definitions per framework ───────────────────────────────────────────
# Each step has:
# label β€” displayed name
# keywords β€” log keywords that signal this step has STARTED or is active
# done_kw β€” log keywords that signal this step is DONE (optional)
# description β€” tooltip / explainer text
_STEPS: dict[str, list[dict]] = {
"autogluon": [
{
"label": "Data Preparation",
"icon": "πŸ“Š",
"keywords": ["preprocessing", "converting", "fitting", "loading data", "train_data"],
"done_kw": ["beginning automl", "fitting model:"],
"description": "Validates and preprocesses the dataset. Handles missing values, categorical encoding and feature types.",
},
{
"label": "Fitting Models",
"icon": "πŸ€–",
"keywords": ["fitting model:", "training model for", "fitting with cpus"],
"done_kw": ["weightedensemble", "autogluon training complete"],
"description": "Trains each individual model (LightGBM, XGBoost, CatBoost, RF, etc.) within the time budget.",
},
{
"label": "Stacking / Ensembling",
"icon": "πŸ—οΈ",
"keywords": ["weightedensemble", "ensemble weights", "stacking"],
"done_kw": ["autogluon training complete"],
"description": "Combines the best models using weighted ensembling or multi-layer stacking.",
},
{
"label": "Evaluation",
"icon": "πŸ“",
"keywords": ["leaderboard", "best model:", "validation score", "score_val"],
"done_kw": ["tabularpredictor saved", "best model logged"],
"description": "Evaluates all models on the validation set and builds the final leaderboard.",
},
{
"label": "MLflow Logging",
"icon": "πŸ“",
"keywords": ["mlflow", "log_artifacts", "logged successfully", "artifacts logged"],
"done_kw": ["thread finished"],
"description": "Persists model artifacts, parameters, and metrics to MLflow for tracking and versioning.",
},
],
"flaml": [
{
"label": "Data Preparation",
"icon": "πŸ“Š",
"keywords": ["data ready", "preprocessing", "starting flaml"],
"done_kw": ["executing hyperparameter search"],
"description": "Validates the dataset, detects feature types, and prepares inputs for FLAML's optimizer.",
},
{
"label": "Hyperparameter Search",
"icon": "πŸ”",
"keywords": ["executing hyperparameter search", "automl.fit", "[flaml.automl", "trial", "best config"],
"done_kw": ["search finished"],
"description": "FLAML runs a cost-effective search over hyperparameter configurations using Bayesian optimization.",
},
{
"label": "Best Config Selection",
"icon": "πŸ†",
"keywords": ["search finished", "best estimator", "best loss", "best final"],
"done_kw": ["saving best model"],
"description": "Identifies the best-performing estimator and its configuration from the search results.",
},
{
"label": "Model Saving",
"icon": "πŸ’Ύ",
"keywords": ["saving best model", "model_path", "artifact_path"],
"done_kw": ["mlflow", "logged successfully"],
"description": "Serializes the trained model to disk using pickle.",
},
{
"label": "MLflow Logging",
"icon": "πŸ“",
"keywords": ["mlflow", "log_artifact", "logged successfully"],
"done_kw": ["thread finished"],
"description": "Persists model artifacts, parameters, and metrics to MLflow for tracking and versioning.",
},
],
"h2o": [
{
"label": "H2O Cluster Init",
"icon": "🌊",
"keywords": ["h2o cluster initialized", "initializing h2o", "h2o init"],
"done_kw": ["starting h2o automl"],
"description": "Starts the local H2O Java cluster and allocates memory for distributed model training.",
},
{
"label": "Data Preparation",
"icon": "πŸ“Š",
"keywords": ["preparing data", "h2oframe", "feature engineering", "asfactor"],
"done_kw": ["starting h2o automl training"],
"description": "Converts Pandas DataFrames to H2O frames and applies type casting for features/targets.",
},
{
"label": "AutoML Training",
"icon": "πŸ€–",
"keywords": ["starting h2o automl training", "automl session", "training completed", "aml.train"],
"done_kw": ["training completed in"],
"description": "H2O trains multiple model families (GBM, XGBoost, GLM, DRF, DeepLearning) and their variants.",
},
{
"label": "Leaderboard & Scoring",
"icon": "πŸ“",
"keywords": ["top 5 models", "leaderboard", "best model score", "auc", "total_models_trained"],
"done_kw": ["model saved at", "log model to mlflow"],
"description": "Ranks all trained models and evaluates the leader on the validation/test set.",
},
{
"label": "MLflow Logging",
"icon": "πŸ“",
"keywords": ["mlflow", "log_artifacts", "logged successfully", "artifacts logged"],
"done_kw": ["thread finished"],
"description": "Persists model artifacts, parameters, and metrics to MLflow for tracking and versioning.",
},
],
"tpot": [
{
"label": "Data Preparation",
"icon": "πŸ“Š",
"keywords": ["problem type:", "training data shape", "test data shape", "label encoder"],
"done_kw": ["starting tpot training"],
"description": "Applies feature engineering pipelines: TF-IDF for text, ordinal encoding, and standard scaling.",
},
{
"label": "Pipeline Generation (GA)",
"icon": "🧬",
"keywords": ["starting tpot training", "generation:", "pipeline score:", "optimizing pipeline"],
"done_kw": ["training completed"],
"description": "TPOT uses a Genetic Algorithm to evolve and select the best scikit-learn pipeline configurations.",
},
{
"label": "Pipeline Selection",
"icon": "πŸ†",
"keywords": ["training completed", "best pipeline", "fitted_pipeline_", "accuracy:", "f1_macro:"],
"done_kw": ["pipeline exported"],
"description": "Identifies the highest-scoring pipeline from the genetic search as the final model.",
},
{
"label": "Export & Analysis",
"icon": "πŸ“€",
"keywords": ["pipeline exported", "export", "classification report"],
"done_kw": ["mlflow"],
"description": "Exports the best pipeline as a .py file and generates a classification/regression report.",
},
{
"label": "MLflow Logging",
"icon": "πŸ“",
"keywords": ["mlflow", "tpot automl model", "registered_model_name", "logged successfully"],
"done_kw": ["thread finished"],
"description": "Persists model artifacts, parameters, and metrics to MLflow for tracking and versioning.",
},
],
"pycaret": [
{
"label": "Environment Setup",
"icon": "βš™οΈ",
"keywords": ["setting up pycaret", "dataset shape"],
"done_kw": ["comparing models", "step: comparing models..."],
"description": "Initializes the PyCaret setup, handling normalization, encoding, and train/test splits internally.",
},
{
"label": "Model Comparison",
"icon": "βš–οΈ",
"keywords": ["comparing models", "including fast/robust models"],
"done_kw": ["tuning best model", "step: tuning best model..."],
"description": "Trains and evaluates a fast baseline of multiple estimators to find the top candidates.",
},
{
"label": "Hyperparameter Tuning",
"icon": "πŸ”§",
"keywords": ["tuning best model", "step: tuning best model..."],
"done_kw": ["blending top models", "step: blending top models..."],
"description": "Applies randomized search to optimize hyperparameters of the best performing model.",
},
{
"label": "Model Blending",
"icon": "πŸŒͺ️",
"keywords": ["blending top models", "step: blending top models..."],
"done_kw": ["saving model", "pycaret experiment completed"],
"description": "Creates an ensemble of the top models to improve generalized performance via voting/averaging.",
},
{
"label": "MLflow Logging",
"icon": "πŸ“",
"keywords": ["saving model to", "pycaret experiment completed", "thread finished"],
"done_kw": ["thread finished"],
"description": "Persists model artifacts, parameters, and metrics to MLflow for tracking and versioning.",
},
],
"lale": [
{
"label": "Pipeline Definition",
"icon": "βš™οΈ",
"keywords": ["defining lale planned pipeline", "dataset shape"],
"done_kw": ["tuning with hyperopt", "step: tuning with hyperopt..."],
"description": "Maps a search space over transformers (PCA, Scalers) and estimators (LR, RF, KNN).",
},
{
"label": "Hyperopt Tuning",
"icon": "πŸ”§",
"keywords": ["tuning with hyperopt", "step: tuning with hyperopt..."],
"done_kw": ["fitting lale optimizer", "step: fitting lale optimizer"],
"description": "Configures Tree-structured Parzen Estimators (TPE) algorithm for intelligent hyperparameter search.",
},
{
"label": "Fitting Optimizer",
"icon": "πŸ•’",
"keywords": ["fitting lale optimizer", "step: fitting lale optimizer"],
"done_kw": ["saving model locally", "step: saving model locally"],
"description": "Executes identical cross-validation folds on generated pipelines within the set budget.",
},
{
"label": "Best Model Extraction",
"icon": "πŸ†",
"keywords": ["best pipeline structure:", "best f1 (macro) score"],
"done_kw": ["saving model locally", "step: saving model locally"],
"description": "Decodes the structure and metrics of the optimized pipeline graph.",
},
{
"label": "MLflow Logging",
"icon": "πŸ“",
"keywords": ["saving model locally", "lale experiment completed", "thread finished"],
"done_kw": ["thread finished"],
"description": "Persists model artifacts, parameters, and metrics to MLflow for tracking and versioning.",
},
],
}
# ── Public API ────────────────────────────────────────────────────────────────
def get_framework_steps(framework_key: str) -> list[dict]:
"""Return the step definitions for a given framework key."""
return _STEPS.get(framework_key.lower(), [])
def infer_pipeline_steps(framework_key: str, logs: list[str], status: str) -> list[dict]:
"""
Returns enriched step list with status attached:
status = "done" | "active" | "pending"
On completed/failed/cancelled runs, all matched steps are "done".
"""
steps = get_framework_steps(framework_key)
if not steps:
return []
log_blob = " ".join(logs).lower()
if status == "completed":
# Mark all steps done
return [{"label": s["label"], "icon": s["icon"], "description": s["description"], "status": "done"} for s in steps]
if status in ("failed", "cancelled"):
# Mark up to the last-seen step as done, rest pending, mark last active as failed
last_done_idx = -1
for i, step in enumerate(steps):
if any(kw in log_blob for kw in step["keywords"]):
last_done_idx = i
result = []
for i, step in enumerate(steps):
if i < last_done_idx:
st_val = "done"
elif i == last_done_idx:
st_val = "failed" if status == "failed" else "cancelled"
else:
st_val = "pending"
result.append({"label": step["label"], "icon": step["icon"], "description": step["description"], "status": st_val})
return result
# Running or queued: find the active step
last_done_idx = -1
for i, step in enumerate(steps):
done_signals = step.get("done_kw", [])
if any(kw in log_blob for kw in done_signals):
last_done_idx = i
# Active = first step after last_done
active_idx = min(last_done_idx + 1, len(steps) - 1)
result = []
for i, step in enumerate(steps):
if i <= last_done_idx:
st_val = "done"
elif i == active_idx and status == "running":
st_val = "active"
else:
st_val = "pending"
result.append({"label": step["label"], "icon": step["icon"], "description": step["description"], "status": st_val})
return result
def extract_best_tpot_pipeline(logs: list[str]) -> Optional[str]:
"""Extract the TPOT best pipeline string from logs."""
for line in reversed(logs):
if "best pipeline:" in line.lower() or "fitted_pipeline_" in line.lower():
return line.strip()
if "pipeline(" in line.lower():
return line.strip()
return None
def extract_autogluon_leaderboard_text(logs: list[str]) -> Optional[str]:
"""Extract leaderboard table text from AutoGluon logs."""
rows = []
capture = False
for line in logs:
if "model" in line.lower() and "score_val" in line.lower():
capture = True
if capture:
rows.append(line)
if len(rows) > 15:
break
return "\n".join(rows) if rows else None