# Data-Science-Agent/src/tools/model_training.py
"""
Model Training Tools
Tools for training machine learning models and generating reports.
"""
import polars as pl
import numpy as np
from typing import Dict, Any, List, Optional
from pathlib import Path
import sys
import os
import joblib
import json
import time
import tempfile
# Add parent directory to path for imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Import artifact store
try:
from storage.helpers import save_model_with_store
ARTIFACT_STORE_AVAILABLE = True
except ImportError:
ARTIFACT_STORE_AVAILABLE = False
print("⚠️ Artifact store not available, using local paths")
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression, Ridge, Lasso
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from xgboost import XGBClassifier, XGBRegressor
from lightgbm import LGBMClassifier, LGBMRegressor
from catboost import CatBoostClassifier, CatBoostRegressor
from sklearn.metrics import (
accuracy_score, precision_score, recall_score, f1_score,
confusion_matrix, classification_report,
mean_squared_error, mean_absolute_error, r2_score
)
import shap
try:
from .visualization_engine import (
generate_model_performance_plots,
generate_feature_importance_plot
)
VISUALIZATION_AVAILABLE = True
except ImportError as e:
VISUALIZATION_AVAILABLE = False
print(f"⚠️ Visualization engine not available: {e}")
from ..utils.polars_helpers import (
load_dataframe,
get_numeric_columns,
split_features_target,
)
from ..utils.validation import (
validate_file_exists,
validate_file_format,
validate_dataframe,
validate_column_exists,
validate_target_column,
)
def train_baseline_models(file_path: str, target_col: str,
task_type: str = "auto",
test_size: float = 0.2,
random_state: int = 42) -> Dict[str, Any]:
"""
Train multiple baseline models and compare performance.
Args:
file_path: Path to prepared dataset
target_col: Name of target column
task_type: 'classification', 'regression', or 'auto'
test_size: Proportion for test split
random_state: Random seed
Returns:
Dictionary with training results and best model
"""
# Validation
validate_file_exists(file_path)
validate_file_format(file_path)
# Load data
df = load_dataframe(file_path)
validate_dataframe(df)
validate_column_exists(df, target_col)
# Infer task type if auto
if task_type == "auto":
task_type = validate_target_column(df, target_col)
# Split features and target
X, y = split_features_target(df, target_col)
# Convert to numpy for sklearn
# Only keep numeric columns for X
numeric_cols = get_numeric_columns(X)
if len(numeric_cols) == 0:
return {
"status": "error",
"message": "No numeric features found. Please encode categorical variables first."
}
X_numeric = X.select(numeric_cols)
X_np = X_numeric.to_numpy()
y_np = y.to_numpy()
# Handle missing values (simple imputation with mean)
from sklearn.impute import SimpleImputer
imputer = SimpleImputer(strategy='mean')
X_np = imputer.fit_transform(X_np)
# Train-test split
X_train, X_test, y_train, y_test = train_test_split(
X_np, y_np, test_size=test_size, random_state=random_state
)
results = {
"task_type": task_type,
"n_features": X_np.shape[1],
"n_samples": len(X_np),
"train_size": len(X_train),
"test_size": len(X_test),
"feature_names": numeric_cols,
"models": {}
}
# Train models based on task type
    print(f"\n🚀 Training {4 if task_type == 'classification' else 5} baseline models...", flush=True)
print(f" 📊 Training set: {len(X_train):,} samples × {X_train.shape[1]} features", flush=True)
print(f" 📊 Test set: {len(X_test):,} samples", flush=True)
print(f" ⚡ Note: Random Forest excluded to optimize compute resources", flush=True)
sys.stdout.flush()
if task_type == "classification":
models = {
"logistic_regression": LogisticRegression(max_iter=1000, random_state=random_state),
"xgboost": XGBClassifier(n_estimators=100, random_state=random_state, n_jobs=-1),
"lightgbm": LGBMClassifier(n_estimators=100, random_state=random_state, n_jobs=-1, verbose=-1),
"catboost": CatBoostClassifier(iterations=100, random_state=random_state, verbose=0, allow_writing_files=False)
}
for idx, (model_name, model) in enumerate(models.items(), 1):
try:
# Train
print(f"\n [{idx}/{len(models)}] Training {model_name}...", flush=True)
                sys.stdout.flush()
                start_time = time.time()
model.fit(X_train, y_train)
elapsed = time.time() - start_time
print(f" ✓ {model_name} trained in {elapsed:.1f}s", flush=True)
sys.stdout.flush()
# Predict
y_pred_train = model.predict(X_train)
y_pred_test = model.predict(X_test)
# Metrics
results["models"][model_name] = {
"train_metrics": {
"accuracy": float(accuracy_score(y_train, y_pred_train)),
"precision": float(precision_score(y_train, y_pred_train, average='weighted', zero_division=0)),
"recall": float(recall_score(y_train, y_pred_train, average='weighted', zero_division=0)),
"f1": float(f1_score(y_train, y_pred_train, average='weighted', zero_division=0))
},
"test_metrics": {
"accuracy": float(accuracy_score(y_test, y_pred_test)),
"precision": float(precision_score(y_test, y_pred_test, average='weighted', zero_division=0)),
"recall": float(recall_score(y_test, y_pred_test, average='weighted', zero_division=0)),
"f1": float(f1_score(y_test, y_pred_test, average='weighted', zero_division=0))
}
}
# Save model using artifact store
if ARTIFACT_STORE_AVAILABLE:
model_path = save_model_with_store(
model_data={
"model": model,
"imputer": imputer,
"feature_names": numeric_cols
},
filename=f"{model_name}.pkl",
metadata={
"model_name": model_name,
"task_type": "classification",
"train_accuracy": float(accuracy_score(y_train, y_pred_train)),
"test_accuracy": float(accuracy_score(y_test, y_pred_test)),
"features": numeric_cols
}
)
else:
model_path = f"./outputs/models/{model_name}.pkl"
Path(model_path).parent.mkdir(parents=True, exist_ok=True)
joblib.dump({
"model": model,
"imputer": imputer,
"feature_names": numeric_cols
}, model_path)
results["models"][model_name]["model_path"] = model_path
except Exception as e:
results["models"][model_name] = {
"status": "error",
"message": str(e)
}
else: # regression
models = {
"ridge": Ridge(random_state=random_state),
"lasso": Lasso(random_state=random_state),
"xgboost": XGBRegressor(n_estimators=100, random_state=random_state, n_jobs=-1),
"lightgbm": LGBMRegressor(n_estimators=100, random_state=random_state, n_jobs=-1, verbose=-1),
"catboost": CatBoostRegressor(iterations=100, random_state=random_state, verbose=0, allow_writing_files=False)
}
for idx, (model_name, model) in enumerate(models.items(), 1):
try:
# Train
                print(f"\n [{idx}/{len(models)}] Training {model_name}...", flush=True)
                sys.stdout.flush()
                start_time = time.time()
model.fit(X_train, y_train)
elapsed = time.time() - start_time
print(f" ✓ {model_name} trained in {elapsed:.1f}s", flush=True)
sys.stdout.flush()
# Predict
y_pred_train = model.predict(X_train)
y_pred_test = model.predict(X_test)
# Metrics
results["models"][model_name] = {
"train_metrics": {
"mse": float(mean_squared_error(y_train, y_pred_train)),
"rmse": float(np.sqrt(mean_squared_error(y_train, y_pred_train))),
"mae": float(mean_absolute_error(y_train, y_pred_train)),
"r2": float(r2_score(y_train, y_pred_train))
},
"test_metrics": {
"mse": float(mean_squared_error(y_test, y_pred_test)),
"rmse": float(np.sqrt(mean_squared_error(y_test, y_pred_test))),
"mae": float(mean_absolute_error(y_test, y_pred_test)),
"r2": float(r2_score(y_test, y_pred_test))
}
}
# Save model using artifact store
if ARTIFACT_STORE_AVAILABLE:
model_path = save_model_with_store(
model_data={
"model": model,
"imputer": imputer,
"feature_names": numeric_cols
},
filename=f"{model_name}.pkl",
metadata={
"model_name": model_name,
"task_type": "regression",
"train_r2": float(r2_score(y_train, y_pred_train)),
"test_r2": float(r2_score(y_test, y_pred_test)),
"features": numeric_cols
}
)
else:
model_path = f"./outputs/models/{model_name}.pkl"
Path(model_path).parent.mkdir(parents=True, exist_ok=True)
joblib.dump({
"model": model,
"imputer": imputer,
"feature_names": numeric_cols
}, model_path)
results["models"][model_name]["model_path"] = model_path
except Exception as e:
results["models"][model_name] = {
"status": "error",
"message": str(e)
}
# Determine best model
best_model_name = None
best_score = -float('inf')
for model_name, model_results in results["models"].items():
if "test_metrics" in model_results:
if task_type == "classification":
score = model_results["test_metrics"]["f1"]
else:
score = model_results["test_metrics"]["r2"]
if score > best_score:
best_score = score
best_model_name = model_name
results["best_model"] = {
"name": best_model_name,
"score": best_score,
"model_path": results["models"][best_model_name]["model_path"] if best_model_name else None
}
# ⚠️ Add guidance for hyperparameter tuning on large datasets
if results["n_samples"] > 100000:
# Recommend faster models for large datasets
fast_models = ["xgboost", "lightgbm"]
if best_model_name in fast_models:
results["tuning_recommendation"] = {
"suggested_model": best_model_name,
"reason": f"{best_model_name} is optimal for large datasets - fast training and good performance"
}
        elif best_model_name is not None:
            # Best model is not one of the fast models; find the best fast alternative
            metric_key = "r2" if task_type == "regression" else "f1"
            fast_model_scores = {
                name: results["models"][name]["test_metrics"].get(metric_key, 0)
                for name in fast_models
                if name in results["models"] and "test_metrics" in results["models"][name]
            }
            if fast_model_scores:
                alt_model = max(fast_model_scores, key=fast_model_scores.get)
                alt_score = fast_model_scores[alt_model]
                score_diff = abs(best_score - alt_score)
                if score_diff < 0.05:  # Scores within 0.05 of each other
                    results["tuning_recommendation"] = {
                        "suggested_model": alt_model,
                        "reason": f"For large datasets, {alt_model} is 5-10x faster than {best_model_name} with similar performance (score difference: {score_diff:.4f})"
                    }
# Generate visualizations for best model
if VISUALIZATION_AVAILABLE and best_model_name:
try:
print(f"\n🎨 Generating visualizations for {best_model_name}...")
            # Use the in-memory best model for predictions (no need to reload from disk)
            best_model = models[best_model_name]
y_pred_test = best_model.predict(X_test)
y_pred_proba = None
if hasattr(best_model, "predict_proba") and task_type == "classification":
y_pred_proba = best_model.predict_proba(X_test)
# Generate model performance plots
plot_dir = "./outputs/plots/model_performance"
perf_plots = generate_model_performance_plots(
y_true=y_test,
y_pred=y_pred_test,
y_pred_proba=y_pred_proba,
task_type=task_type,
model_name=best_model_name,
output_dir=plot_dir
)
results["performance_plots"] = perf_plots["plot_paths"]
# Generate feature importance plot if available
if hasattr(best_model, "feature_importances_"):
feature_importance = dict(zip(numeric_cols, best_model.feature_importances_))
importance_plot = generate_feature_importance_plot(
feature_importances=feature_importance,
output_path=f"{plot_dir}/feature_importance_{best_model_name}.png"
)
results["feature_importance_plot"] = importance_plot
print(f" ✓ Generated {len(perf_plots.get('plot_paths', []))} performance plots")
results["visualization_generated"] = True
except Exception as e:
print(f" ⚠️ Could not generate visualizations: {str(e)}")
results["visualization_generated"] = False
else:
results["visualization_generated"] = False
# Print final summary
print(f"\n{'='*60}")
print(f"✅ TRAINING COMPLETE")
print(f"{'='*60}")
print(f"📊 Best Model: {best_model_name}")
if task_type == "regression":
print(f"📈 Test R²: {best_score:.4f}")
print(f"📉 Test RMSE: {results['models'][best_model_name]['test_metrics']['rmse']:.4f}")
else:
print(f"📈 Test F1: {best_score:.4f}")
print(f"📉 Test Accuracy: {results['models'][best_model_name]['test_metrics']['accuracy']:.4f}")
print(f"💾 Model saved: {results['best_model']['model_path']}")
print(f"{'='*60}\\n")
return results
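# Hedged usage sketch (not part of the original pipeline): shows how train_baseline_models
# might be invoked and how its result dict can be inspected. The file path and the
# "target" column name below are illustrative assumptions; adjust them to your dataset.
def _example_train_baseline(prepared_path: str = "./outputs/prepared/train_ready.parquet") -> None:
    """Minimal illustration of calling train_baseline_models and reading its result dict."""
    results = train_baseline_models(
        file_path=prepared_path,   # hypothetical prepared dataset (numeric features only)
        target_col="target",       # hypothetical target column name
        task_type="auto",          # let the helper infer classification vs. regression
        test_size=0.2,
        random_state=42,
    )
    if results.get("status") == "error":
        print(results["message"])
        return
    best = results["best_model"]
    print(f"Best model: {best['name']} (score={best['score']:.4f})")
    print(f"Saved to: {best['model_path']}")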
def generate_model_report(model_path: str, test_data_path: str,
target_col: str, output_path: str) -> Dict[str, Any]:
"""
Generate comprehensive model evaluation report.
Args:
model_path: Path to saved model file
test_data_path: Path to test dataset
target_col: Name of target column
output_path: Path to save report JSON
Returns:
Dictionary with model report
"""
# Validation
validate_file_exists(model_path)
validate_file_exists(test_data_path)
# Load model
model_data = joblib.load(model_path)
model = model_data["model"]
imputer = model_data["imputer"]
feature_names = model_data["feature_names"]
# Load test data
df = load_dataframe(test_data_path)
validate_dataframe(df)
validate_column_exists(df, target_col)
# Prepare features
X = df.select(feature_names)
y = df[target_col].to_numpy()
X_np = imputer.transform(X.to_numpy())
# Predict
y_pred = model.predict(X_np)
# Determine task type
if hasattr(model, "predict_proba"):
task_type = "classification"
else:
task_type = "regression"
report = {
"model_path": model_path,
"task_type": task_type,
"n_features": len(feature_names),
"n_samples": len(X_np)
}
# Calculate metrics
if task_type == "classification":
report["metrics"] = {
"accuracy": float(accuracy_score(y, y_pred)),
"precision": float(precision_score(y, y_pred, average='weighted', zero_division=0)),
"recall": float(recall_score(y, y_pred, average='weighted', zero_division=0)),
"f1": float(f1_score(y, y_pred, average='weighted', zero_division=0))
}
# Confusion matrix
cm = confusion_matrix(y, y_pred)
report["confusion_matrix"] = cm.tolist()
# Classification report
class_report = classification_report(y, y_pred, output_dict=True, zero_division=0)
report["classification_report"] = class_report
else: # regression
report["metrics"] = {
"mse": float(mean_squared_error(y, y_pred)),
"rmse": float(np.sqrt(mean_squared_error(y, y_pred))),
"mae": float(mean_absolute_error(y, y_pred)),
"r2": float(r2_score(y, y_pred))
}
# Feature importance
if hasattr(model, "feature_importances_"):
importances = model.feature_importances_
feature_importance = [
{"feature": name, "importance": float(imp)}
for name, imp in zip(feature_names, importances)
]
feature_importance.sort(key=lambda x: x["importance"], reverse=True)
report["feature_importance"] = feature_importance[:20] # Top 20
    # SHAP values (computed on a sample of rows; top 10 features reported)
try:
# Use TreeExplainer for tree-based models
if hasattr(model, "feature_importances_"):
explainer = shap.TreeExplainer(model)
else:
# Use KernelExplainer for other models (sample for speed)
sample_size = min(100, len(X_np))
explainer = shap.KernelExplainer(
model.predict,
X_np[:sample_size]
)
shap_values = explainer.shap_values(X_np[:100]) # First 100 samples
# Calculate mean absolute SHAP values
        if isinstance(shap_values, list):  # Multi-class: one array per class; use the first class
            shap_values = shap_values[0]
mean_shap = np.abs(shap_values).mean(axis=0)
shap_importance = [
{"feature": name, "shap_value": float(val)}
for name, val in zip(feature_names, mean_shap)
]
shap_importance.sort(key=lambda x: x["shap_value"], reverse=True)
report["shap_feature_importance"] = shap_importance[:10] # Top 10
except Exception as e:
report["shap_error"] = f"Could not compute SHAP values: {str(e)}"
# Save report
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'w') as f:
json.dump(report, f, indent=2)
report["output_path"] = output_path
return report
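# Hedged usage sketch (illustrative only): shows how generate_model_report might be called
# on a model saved by train_baseline_models. The .pkl, holdout, and output paths plus the
# "target" column name are assumptions, not files this module guarantees to exist.
if __name__ == "__main__":
    example_report = generate_model_report(
        model_path="./outputs/models/xgboost.pkl",             # hypothetical saved model
        test_data_path="./outputs/prepared/holdout.parquet",   # hypothetical holdout set
        target_col="target",                                   # hypothetical target column
        output_path="./outputs/reports/model_report.json",
    )
    print(f"Report written to: {example_report['output_path']}")
    print(f"Test metrics: {example_report['metrics']}")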