Multi-AutoML-Interface / src /tpot_utils.py
PedroM2626's picture
feat: add support for multiple AutoML frameworks (TPOT, H2O, AutoGluon, FLAML) including data preprocessing and MLflow integration.
9c720d9
import os
import pandas as pd
import numpy as np
import mlflow
import mlflow.sklearn
import shutil
import logging
import time
import warnings
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, LabelEncoder, OrdinalEncoder
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, f1_score, classification_report, mean_squared_error, r2_score
# Monkeypatch for scikit-learn >= 1.2 compatibility with TPOT
import sklearn.metrics
if not hasattr(sklearn.metrics, 'SCORERS'):
try:
from sklearn.metrics import get_scorer_names
sklearn.metrics.SCORERS = {name: name for name in get_scorer_names()}
except ImportError:
pass
from tpot import TPOTClassifier, TPOTRegressor
from src.mlflow_utils import safe_set_experiment
logger = logging.getLogger(__name__)
def detect_problem_type(y):
"""Detect if problem is classification or regression"""
if pd.api.types.is_numeric_dtype(y):
unique_values = y.nunique()
if unique_values <= 20 and all(y % 1 == 0 for val in y.dropna()):
return 'classification'
else:
return 'regression'
else:
return 'classification'
def create_feature_pipeline(df, target_column, text_columns=None, tfidf_max_features=500, tfidf_ngram_range=(1, 2)):
"""Create feature engineering pipeline for TPOT"""
# Identify column types
categorical_columns = df.select_dtypes(include=['object', 'category']).columns.tolist()
numerical_columns = df.select_dtypes(include=[np.number]).columns.tolist()
# Remove target column from features
if target_column in categorical_columns:
categorical_columns.remove(target_column)
if target_column in numerical_columns:
numerical_columns.remove(target_column)
# Handle text columns separately
if text_columns is None:
text_columns = []
# Auto-detect text columns (high cardinality object columns)
for col in categorical_columns:
if df[col].nunique() > len(df) * 0.5 and df[col].dtype == 'object':
text_columns.append(col)
categorical_columns.remove(col)
transformers = []
# Text processing
if text_columns:
for col in text_columns:
transformers.append((f'tfidf_{col}', TfidfVectorizer(
max_features=tfidf_max_features,
ngram_range=tfidf_ngram_range,
stop_words='english',
dtype=np.float64,
token_pattern=r'(?u)\b\w+\b' # Handle empty strings better
), col))
# Numerical features
if numerical_columns:
transformers.append(('num', StandardScaler(), numerical_columns))
# Categorical features (non-text)
if categorical_columns:
# For TPOT, we use OrdinalEncoder to prevent dimension explosion on high cardinality
transformers.append(('cat', OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1), categorical_columns))
if transformers:
# TPOT requires dense arrays for most operations, so we force sparse_threshold=0
preprocessor = ColumnTransformer(transformers, remainder='drop', sparse_threshold=0)
else:
preprocessor = None
return preprocessor, text_columns, categorical_columns, numerical_columns
def prepare_data_for_tpot(df, target_column, test_data=None, test_size=0.2, random_state=42):
"""Prepare data for TPOT training"""
# Drop rows with missing target
df_clean = df.dropna(subset=[target_column]).copy()
# Handle missing values in features
# For text columns, fill with empty string
# For numerical columns, fill with median
# For categorical columns, fill with mode
for col in df_clean.columns:
if col != target_column:
if df_clean[col].dtype == 'object':
df_clean[col] = df_clean[col].fillna('').astype(str)
else:
df_clean[col] = df_clean[col].fillna(df_clean[col].median())
# Convert all object columns to string to avoid mixed types
for col in df_clean.select_dtypes(include=['object']).columns:
if col != target_column:
df_clean[col] = df_clean[col].astype(str)
# Process test_data if provided
if test_data is not None:
if target_column not in test_data.columns:
raise ValueError(f"Target column '{target_column}' not found in Test data.")
test_clean = test_data.dropna(subset=[target_column]).copy()
for col in test_clean.columns:
if col != target_column:
if test_clean[col].dtype == 'object':
test_clean[col] = test_clean[col].fillna('').astype(str)
else:
test_clean[col] = test_clean[col].fillna(test_clean[col].median())
for col in test_clean.select_dtypes(include=['object']).columns:
if col != target_column:
test_clean[col] = test_clean[col].astype(str)
# Split features and target
X_train = df_clean.drop(columns=[target_column])
y_train = df_clean[target_column].copy()
if test_data is not None:
X_test = test_clean.drop(columns=[target_column])
y_test = test_clean[target_column].copy()
# Handle target encoding for classification
problem_type = detect_problem_type(y_train)
label_encoder = None
if problem_type == 'classification' and y_train.dtype == 'object':
label_encoder = LabelEncoder()
y_train = label_encoder.fit_transform(y_train)
if test_data is not None:
# Handle unknown labels in test by assigning them to a special class or throwing error
# For simplicity in AutoML we fit_transform on train and transform on test, catching unknown label cases if needed
try:
y_test = label_encoder.transform(y_test)
except ValueError:
# If there are new labels in test, handle them gracefully by forcing a combined fit
combined_y = pd.concat([df_clean[target_column], test_clean[target_column]])
label_encoder.fit(combined_y)
y_train = label_encoder.transform(df_clean[target_column])
y_test = label_encoder.transform(test_clean[target_column])
# Split data if test_data is not explicitly provided
if test_data is None:
X_train, X_test, y_train, y_test = train_test_split(
X_train, y_train, test_size=test_size, random_state=random_state, stratify=y_train if problem_type == 'classification' else None
)
return X_train, X_test, y_train, y_test, problem_type, label_encoder
def train_tpot_model(df, target_column, run_name,
valid_data=None, test_data=None,
generations=5, population_size=20, cv=5,
scoring=None, max_time_mins=30, max_eval_time_mins=5, random_state=42,
verbosity=2, n_jobs=-1, config_dict='TPOT sparse',
tfidf_max_features=500, tfidf_ngram_range=(1, 2)):
"""
Train TPOT model with MLflow tracking
"""
try:
# TPOT handles validation automatically via CV. If validation is passed, concatenate to train for larger pool
if valid_data is not None:
if target_column not in valid_data.columns:
raise ValueError(f"Target column '{target_column}' not found in Validation data.")
df = pd.concat([df, valid_data], ignore_index=True)
mlflow.log_param("has_validation_data", True)
# Setup experiment
safe_set_experiment("TPOT_Experiments")
# Prepare data
X_train, X_test, y_train, y_test, problem_type, label_encoder = prepare_data_for_tpot(
df, target_column, test_data=test_data, random_state=random_state
)
# Create feature pipeline
preprocessor, text_columns, cat_columns, num_columns = create_feature_pipeline(
X_train, target_column,
tfidf_max_features=tfidf_max_features,
tfidf_ngram_range=tfidf_ngram_range
)
# Process features
if preprocessor is not None:
X_train_processed = preprocessor.fit_transform(X_train)
X_test_processed = preprocessor.transform(X_test)
else:
X_train_processed = X_train
X_test_processed = X_test
logger.info(f"Problem type: {problem_type}")
logger.info(f"Training data shape: {X_train_processed.shape}")
logger.info(f"Test data shape: {X_test_processed.shape}")
# Check for high dimensional data and adjust parameters
n_features = X_train_processed.shape[1]
n_samples = X_train_processed.shape[0]
if n_features > 10000:
logger.warning(f"High dimensional data detected: {n_features} features")
# Reduce complexity for high dimensional data
generations = min(generations, 2)
population_size = min(population_size, 10)
max_eval_time_mins = min(max_eval_time_mins, 2)
config_dict = 'TPOT light'
logger.info(f"Adjusted parameters for high dimensional data: generations={generations}, population_size={population_size}")
if n_samples > 50000:
logger.warning(f"Large dataset detected: {n_samples} samples")
# Reduce complexity for large datasets
cv = min(cv, 3)
max_eval_time_mins = min(max_eval_time_mins, 1)
logger.info(f"Adjusted parameters for large dataset: cv={cv}, max_eval_time_mins={max_eval_time_mins}")
# Default scoring based on problem type
if scoring is None:
if problem_type == 'classification':
scoring = 'f1_macro'
else:
scoring = 'neg_mean_squared_error'
# Ensure there are no loose active runs that could cause errors on start
while mlflow.active_run():
mlflow.end_run()
with mlflow.start_run(run_name=run_name) as run:
logger.info(f"Starting TPOT training for run: {run_name}")
# Choose TPOT class based on problem type
if problem_type == 'classification':
tpot = TPOTClassifier(
generations=generations,
population_size=population_size,
cv=cv,
scoring=scoring,
max_time_mins=max_time_mins,
max_eval_time_mins=max_eval_time_mins,
random_state=random_state,
verbosity=verbosity,
n_jobs=n_jobs,
config_dict=config_dict
)
else:
tpot = TPOTRegressor(
generations=generations,
population_size=population_size,
cv=cv,
scoring=scoring,
max_time_mins=max_time_mins,
max_eval_time_mins=max_eval_time_mins,
random_state=random_state,
verbosity=verbosity,
n_jobs=n_jobs,
config_dict=config_dict
)
# Log parameters
mlflow.log_param("problem_type", problem_type)
mlflow.log_param("generations", generations)
mlflow.log_param("population_size", population_size)
mlflow.log_param("cv", cv)
mlflow.log_param("scoring", scoring)
mlflow.log_param("max_time_mins", max_time_mins)
mlflow.log_param("max_eval_time_mins", max_eval_time_mins)
mlflow.log_param("random_state", random_state)
mlflow.log_param("config_dict", config_dict)
mlflow.log_param("n_jobs", n_jobs)
mlflow.log_param("target", target_column)
mlflow.log_param("n_features", n_features)
mlflow.log_param("n_samples", n_samples)
mlflow.log_param("text_columns", text_columns)
mlflow.log_param("categorical_columns", cat_columns)
mlflow.log_param("numerical_columns", num_columns)
# Train model with error handling
try:
start_time = time.time()
tpot.fit(X_train_processed, y_train)
training_duration = time.time() - start_time
logger.info(f"Training completed in {training_duration:.2f} seconds")
except Exception as tpot_error:
logger.error(f"Error during TPOT training: {tpot_error}")
# Try with simpler configuration
logger.info("Trying with simpler configuration...")
tpot = TPOTClassifier(
generations=1,
population_size=5,
cv=2,
scoring='accuracy' if problem_type == 'classification' else 'neg_mean_squared_error',
max_time_mins=min(max_time_mins, 5),
max_eval_time_mins=1,
random_state=random_state,
verbosity=1,
n_jobs=1,
config_dict='TPOT light'
)
tpot.fit(X_train_processed, y_train)
training_duration = time.time() - start_time
logger.info(f"Simplified training completed in {training_duration:.2f} seconds")
# Predictions
y_pred = tpot.predict(X_test_processed)
# Calculate metrics
if problem_type == 'classification':
accuracy = accuracy_score(y_test, y_pred)
f1_macro = f1_score(y_test, y_pred, average='macro')
f1_weighted = f1_score(y_test, y_pred, average='weighted')
logger.info(f"Accuracy: {accuracy:.4f}")
logger.info(f"F1-Score (macro): {f1_macro:.4f}")
logger.info(f"F1-Score (weighted): {f1_weighted:.4f}")
# Log metrics
mlflow.log_metric("accuracy", accuracy)
mlflow.log_metric("f1_macro", f1_macro)
mlflow.log_metric("f1_weighted", f1_weighted)
# Classification report
try:
if label_encoder is not None:
# Convert back to original labels for report
y_test_orig = label_encoder.inverse_transform(y_test)
y_pred_orig = label_encoder.inverse_transform(y_pred)
class_names = label_encoder.classes_
else:
y_test_orig = y_test
y_pred_orig = y_pred
class_names = [f"Class_{i}" for i in np.unique(y_test)]
report = classification_report(y_test_orig, y_pred_orig, target_names=class_names)
logger.info(f"\nClassification Report:\n{report}")
# Save classification report
report_path = f"classification_report_{run_name}.txt"
with open(report_path, "w") as f:
f.write(f"TPOT AutoML Classification Report\n")
f.write(f"{'='*50}\n\n")
f.write(f"Problem Type: {problem_type}\n")
f.write(f"Best Pipeline: {tpot.fitted_pipeline_}\n\n")
f.write(report)
mlflow.log_artifact(report_path)
except Exception as e:
logger.warning(f"Could not generate classification report: {e}")
else: # Regression
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
r2 = r2_score(y_test, y_pred)
logger.info(f"MSE: {mse:.4f}")
logger.info(f"RMSE: {rmse:.4f}")
logger.info(f"R²: {r2:.4f}")
# Log metrics
mlflow.log_metric("mse", mse)
mlflow.log_metric("rmse", rmse)
mlflow.log_metric("r2", r2)
mlflow.log_metric("training_duration", training_duration)
# Create complete pipeline for saving
if preprocessor is not None:
final_pipeline = Pipeline([
('preprocessor', preprocessor),
('classifier', tpot.fitted_pipeline_)
])
else:
final_pipeline = tpot.fitted_pipeline_
# Save model info
model_info = {
"problem_type": problem_type,
"best_pipeline": str(tpot.fitted_pipeline_),
"generations": generations,
"population_size": population_size,
"cv": cv,
"scoring": scoring,
"training_duration": training_duration,
"n_features": n_features,
"n_samples": n_samples
}
if problem_type == 'classification':
model_info.update({
"accuracy": accuracy if 'accuracy' in locals() else None,
"f1_macro": f1_macro if 'f1_macro' in locals() else None,
"f1_weighted": f1_weighted if 'f1_weighted' in locals() else None
})
else:
model_info.update({
"mse": mse if 'mse' in locals() else None,
"rmse": rmse if 'rmse' in locals() else None,
"r2": r2 if 'r2' in locals() else None
})
# Export pipeline
pipeline_path = f"tpot_models/best_pipeline_{run_name}.py"
os.makedirs("tpot_models", exist_ok=True)
tpot.export(pipeline_path)
logger.info(f"Pipeline exported to {pipeline_path}")
# Save model info
info_path = f"tpot_models/model_info_{run_name}.txt"
with open(info_path, "w") as f:
f.write("TPOT AutoML Model Information\n")
f.write(f"{'='*50}\n\n")
for key, value in model_info.items():
f.write(f"{key}: {value}\n")
mlflow.log_artifact(pipeline_path)
mlflow.log_artifact(info_path)
# Log the fitted pipeline
mlflow.sklearn.log_model(final_pipeline, "model", registered_model_name=f"TPOT_{run_name}")
logger.info("TPOT model successfully registered in MLflow")
return tpot, final_pipeline, run.info.run_id, model_info
except Exception as e:
logger.error(f"Error during TPOT training: {e}")
raise
def load_tpot_model(run_id, model_path="model"):
"""Load TPOT model from MLflow"""
try:
model = mlflow.sklearn.load_model(f"runs:/{run_id}/{model_path}")
return model
except Exception as e:
logger.error(f"Error loading TPOT model: {e}")
raise
def predict_with_tpot(model, data, preprocessor=None):
"""Make predictions with TPOT model"""
try:
if preprocessor is not None:
data_processed = preprocessor.transform(data)
else:
data_processed = data
predictions = model.predict(data_processed)
return predictions
except Exception as e:
logger.error(f"Error during TPOT prediction: {e}")
raise