""" Comprehensive multi-model training pipeline for AURIS. Trains and evaluates multiple classifier families on extracted audio features using stratified cross-validation, then selects the best model and exports it for production use. Models compared: - Logistic Regression - Random Forest - Gradient Boosting - Support Vector Machine (RBF) - Multi-Layer Perceptron - XGBoost (optional) - LightGBM (optional) Usage: python -m app.training.train_classifier data/training/features.csv Outputs: models/auris_classifier_v1.pkl - best trained model models/feature_scaler_v1.pkl - fitted StandardScaler models/feature_columns_v1.json - ordered feature column names models/training_results.json - model metrics and metadata """ from __future__ import annotations import csv import json import pickle import sys import time import warnings from pathlib import Path from typing import Any import numpy as np from sklearn.base import clone from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier from sklearn.exceptions import ConvergenceWarning from sklearn.linear_model import LogisticRegression from sklearn.calibration import CalibratedClassifierCV from sklearn.metrics import ( accuracy_score, f1_score, precision_score, recall_score, roc_auc_score, roc_curve, ) from sklearn.model_selection import StratifiedKFold, cross_val_predict, train_test_split from sklearn.neural_network import MLPClassifier from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler from sklearn.svm import SVC # Optional: XGBoost try: import xgboost as xgb HAS_XGB = True except ImportError: HAS_XGB = False # Optional: LightGBM try: import lightgbm as lgb HAS_LGBM = True except ImportError: HAS_LGBM = False sys.path.insert(0, str(Path(__file__).resolve().parents[2])) from app.training.evaluate import evaluate_predictions, load_features_csv _EXCLUDED_COLUMNS = {"file_path", "label_int", "duration_sec", "sample_rate"} _TUNED_PARAM_KEYS: dict[str, tuple[str, ...]] = { "Logistic Regression": ("C", "class_weight", "max_iter"), "Random Forest": ( "n_estimators", "max_depth", "min_samples_leaf", "min_samples_split", "class_weight", "max_features", ), "Gradient Boosting": ( "n_estimators", "max_depth", "learning_rate", "subsample", "min_samples_leaf", "min_samples_split", ), "SVM (RBF)": ("C", "gamma", "class_weight"), "MLP Neural Network": ( "hidden_layer_sizes", "alpha", "max_iter", "validation_fraction", ), "XGBoost": ( "n_estimators", "max_depth", "learning_rate", "subsample", "colsample_bytree", "min_child_weight", "reg_alpha", "reg_lambda", "gamma", ), "LightGBM": ( "n_estimators", "max_depth", "learning_rate", "num_leaves", "subsample", "colsample_bytree", "min_child_samples", "reg_alpha", "reg_lambda", ), } def train( features_csv: str | Path, models_dir: str | Path = "models", n_folds: int = 5, ) -> dict[str, Any]: """ Train and evaluate all classifier candidates. Returns: Dict with per-model metrics, best model info, and saved paths. """ features_csv = Path(features_csv) models_dir = Path(models_dir) models_dir.mkdir(parents=True, exist_ok=True) X, y = load_features_csv(features_csv) feature_cols = _load_feature_columns(features_csv) X = np.nan_to_num(X, nan=0.0, posinf=1.0, neginf=-1.0) selected_candidates, tuning_results = _select_best_candidates(X, y) cv = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=42) scaler = StandardScaler() X_scaled = scaler.fit_transform(X) best_name = "" best_auc = -1.0 all_results: dict[str, dict[str, Any]] = {} for name, model in selected_candidates: print("\n" + "-" * 56) print(f"Training: {name}") print("-" * 56) t0 = time.time() pipeline = _build_eval_pipeline(model) with warnings.catch_warnings(): warnings.simplefilter("ignore", category=ConvergenceWarning) y_prob = cross_val_predict( pipeline, X, y, cv=cv, method="predict_proba", )[:, 1] threshold = _optimal_threshold(y, y_prob) y_pred = (y_prob >= threshold).astype(int) cv_time = time.time() - t0 acc = accuracy_score(y, y_pred) prec = precision_score(y, y_pred, zero_division=0) rec = recall_score(y, y_pred, zero_division=0) f1 = f1_score(y, y_pred, zero_division=0) auc = roc_auc_score(y, y_prob) tuning_meta = tuning_results.get(name, {}) print(f" Validation AUC: {tuning_meta.get('validation_auc', 0.0):.4f}") print(f" CV Accuracy: {acc:.4f}") print(f" CV Precision: {prec:.4f}") print(f" CV Recall: {rec:.4f}") print(f" CV F1 Score: {f1:.4f}") print(f" CV ROC-AUC: {auc:.4f}") print(f" CV Time: {cv_time:.1f}s") all_results[name] = { "accuracy": round(acc, 4), "precision": round(prec, 4), "recall": round(rec, 4), "f1": round(f1, 4), "roc_auc": round(auc, 4), "optimal_threshold": round(threshold, 4), "validation_auc": round(tuning_meta.get("validation_auc", 0.0), 4), "selection_time_sec": round(tuning_meta.get("selection_time_sec", 0.0), 2), "train_time_sec": round(cv_time, 2), "selected_params": tuning_meta.get("selected_params", {}), "y_true": y.tolist(), "y_pred": y_pred.tolist(), "y_prob": y_prob.tolist(), } if auc > best_auc: best_auc = auc best_name = name print("\n" + "=" * 64) print(f"BEST MODEL: {best_name} (ROC-AUC = {best_auc:.4f})") print("=" * 64) y_prob_best = np.array(all_results[best_name]["y_prob"]) y_pred_best = np.array(all_results[best_name]["y_pred"]) evaluate_predictions(y, y_pred_best, y_prob_best, title=f"Best: {best_name}") fitted_models: dict[str, Any] = {} all_model_paths: dict[str, str] = {} for name, model in selected_candidates: print(f"\nFitting final {name} on all {len(y)} samples...") final_model = clone(model) with warnings.catch_warnings(): warnings.simplefilter("ignore", category=ConvergenceWarning) final_model.fit(X_scaled, y) fitted_models[name] = final_model model_pkl = models_dir / f"model_{_safe_model_name(name)}.pkl" with open(model_pkl, "wb") as f: pickle.dump(final_model, f) all_model_paths[name] = str(model_pkl) print(f" Saved: {model_pkl}") best_model = fitted_models[best_name] importance_data = _extract_importance(best_model, feature_cols) if importance_data: print("\nTop 15 features:") for fname, imp in importance_data[:15]: print(f" {fname:<35} {imp:.4f}") model_path = models_dir / "auris_classifier_v1.pkl" scaler_path = models_dir / "feature_scaler_v1.pkl" columns_path = models_dir / "feature_columns_v1.json" results_path = models_dir / "training_results.json" with open(model_path, "wb") as f: pickle.dump(best_model, f) with open(scaler_path, "wb") as f: pickle.dump(scaler, f) with open(columns_path, "w", encoding="utf-8") as f: json.dump(feature_cols, f, indent=2) json_results: dict[str, Any] = {} for name, data in all_results.items(): json_results[name] = { key: value for key, value in data.items() if key not in ("y_true", "y_pred", "y_prob") } json_results["_best_model"] = best_name json_results["_n_samples"] = len(y) json_results["_n_features"] = X.shape[1] json_results["_n_folds"] = n_folds json_results["_dataset_path"] = str(features_csv) json_results["_class_balance"] = { "ai": int(np.sum(y == 1)), "human": int(np.sum(y == 0)), } json_results["_data_leakage_fix"] = ( "duration_sec and sample_rate removed from features; scaler fitted per fold during CV" ) json_results["_model_paths"] = all_model_paths if importance_data: json_results["_feature_importance"] = { feature_name: round(imp, 6) for feature_name, imp in importance_data } with open(results_path, "w", encoding="utf-8") as f: json.dump(json_results, f, indent=2) print("\nSaved artifacts:") print(f" Model: {model_path}") print(f" Scaler: {scaler_path}") print(f" Columns: {columns_path}") print(f" Results: {results_path}") return { "best_model": best_name, "best_auc": best_auc, "all_results": all_results, "feature_cols": feature_cols, "model_path": str(model_path), } def _load_feature_columns(features_csv: Path) -> list[str]: with open(features_csv, "r", encoding="utf-8") as f: reader = csv.DictReader(f) return [ column for column in (reader.fieldnames or []) if column not in _EXCLUDED_COLUMNS ] def _select_best_candidates( X: np.ndarray, y: np.ndarray, ) -> tuple[list[tuple[str, Any]], dict[str, dict[str, Any]]]: """ Pick one tuned configuration per model family using a stratified holdout. """ X_train, X_val, y_train, y_val = train_test_split( X, y, test_size=0.2, stratify=y, random_state=42, ) selected: list[tuple[str, Any]] = [] tuning_results: dict[str, dict[str, Any]] = {} for name, variants in _build_candidate_families(y_train).items(): print("\n" + "." * 56) print(f"Selecting hyperparameters for: {name}") print("." * 56) best_model = None best_auc = -1.0 best_params: dict[str, Any] = {} selection_start = time.time() for idx, model in enumerate(variants, start=1): pipeline = _build_eval_pipeline(model) with warnings.catch_warnings(): warnings.simplefilter("ignore", category=ConvergenceWarning) pipeline.fit(X_train, y_train) y_prob = pipeline.predict_proba(X_val)[:, 1] auc = roc_auc_score(y_val, y_prob) params = _summarize_selected_params(name, model) print(f" Candidate {idx}: holdout AUC={auc:.4f} | params={params}") if auc > best_auc: best_auc = auc best_model = model best_params = params if best_model is None: raise RuntimeError(f"No valid candidate selected for {name}") tuning_results[name] = { "validation_auc": float(best_auc), "selected_params": best_params, "selection_time_sec": time.time() - selection_start, } selected.append((name, best_model)) print(f" Selected {name}: AUC={best_auc:.4f}") return selected, tuning_results def _class_ratio(y: np.ndarray) -> float: """Returns n_negative / n_positive for scale_pos_weight in XGBoost.""" n_pos = int(np.sum(y == 1)) n_neg = int(np.sum(y == 0)) return n_neg / n_pos if n_pos > 0 else 1.0 def _build_candidate_families(y: np.ndarray) -> dict[str, list[Any]]: families: dict[str, list[Any]] = { "Logistic Regression": [ LogisticRegression( C=value, max_iter=2500, class_weight="balanced", random_state=42, ) for value in (0.25, 0.5, 1.0, 2.0) ], "Random Forest": [ RandomForestClassifier( n_estimators=300, max_depth=12, min_samples_leaf=4, min_samples_split=8, max_features="sqrt", class_weight="balanced_subsample", random_state=42, n_jobs=-1, ), RandomForestClassifier( n_estimators=450, max_depth=18, min_samples_leaf=2, min_samples_split=4, max_features="sqrt", class_weight="balanced_subsample", random_state=42, n_jobs=-1, ), RandomForestClassifier( n_estimators=500, max_depth=None, min_samples_leaf=1, min_samples_split=2, max_features="log2", class_weight="balanced_subsample", random_state=42, n_jobs=-1, ), ], "Gradient Boosting": [ GradientBoostingClassifier( n_estimators=200, max_depth=3, learning_rate=0.05, subsample=0.8, min_samples_leaf=10, min_samples_split=20, random_state=42, ), GradientBoostingClassifier( n_estimators=260, max_depth=2, learning_rate=0.04, subsample=0.85, min_samples_leaf=12, min_samples_split=24, random_state=42, ), GradientBoostingClassifier( n_estimators=180, max_depth=4, learning_rate=0.07, subsample=0.75, min_samples_leaf=8, min_samples_split=16, random_state=42, ), ], "SVM (RBF)": [ CalibratedClassifierCV( SVC(kernel="rbf", C=c, gamma=g, class_weight="balanced", random_state=42), method="isotonic", cv=3, ) for c, g in ((1.0, "scale"), (3.0, "scale"), (6.0, 0.02), (10.0, 0.05)) ], "MLP Neural Network": [ MLPClassifier( hidden_layer_sizes=(128, 64), activation="relu", solver="adam", alpha=0.0005, learning_rate="adaptive", max_iter=500, early_stopping=True, validation_fraction=0.15, random_state=42, ), MLPClassifier( hidden_layer_sizes=(192, 96, 32), activation="relu", solver="adam", alpha=0.001, learning_rate="adaptive", max_iter=600, early_stopping=True, validation_fraction=0.15, random_state=42, ), MLPClassifier( hidden_layer_sizes=(256, 128), activation="relu", solver="adam", alpha=0.002, learning_rate="adaptive", max_iter=700, early_stopping=True, validation_fraction=0.15, random_state=42, ), ], } if HAS_XGB: _spw = _class_ratio(y) families["XGBoost"] = [ xgb.XGBClassifier( n_estimators=300, max_depth=4, learning_rate=0.05, subsample=0.8, colsample_bytree=0.8, min_child_weight=4, reg_alpha=0.2, reg_lambda=1.2, gamma=0.1, scale_pos_weight=_spw, eval_metric="logloss", tree_method="hist", random_state=42, n_jobs=-1, verbosity=0, ), xgb.XGBClassifier( n_estimators=500, max_depth=3, learning_rate=0.03, subsample=0.9, colsample_bytree=0.8, min_child_weight=2, reg_alpha=0.1, reg_lambda=1.0, gamma=0.0, scale_pos_weight=_spw, eval_metric="logloss", tree_method="hist", random_state=42, n_jobs=-1, verbosity=0, ), xgb.XGBClassifier( n_estimators=240, max_depth=5, learning_rate=0.06, subsample=0.75, colsample_bytree=0.75, min_child_weight=6, reg_alpha=0.4, reg_lambda=1.5, gamma=0.2, scale_pos_weight=_spw, eval_metric="logloss", tree_method="hist", random_state=42, n_jobs=-1, verbosity=0, ), ] if HAS_LGBM: families["LightGBM"] = [ lgb.LGBMClassifier( n_estimators=300, max_depth=-1, learning_rate=0.05, num_leaves=31, subsample=0.8, colsample_bytree=0.8, min_child_samples=20, reg_alpha=0.1, reg_lambda=1.0, class_weight="balanced", random_state=42, verbose=-1, ), lgb.LGBMClassifier( n_estimators=500, max_depth=8, learning_rate=0.03, num_leaves=24, subsample=0.9, colsample_bytree=0.8, min_child_samples=30, reg_alpha=0.2, reg_lambda=1.2, class_weight="balanced", random_state=42, verbose=-1, ), lgb.LGBMClassifier( n_estimators=220, max_depth=6, learning_rate=0.07, num_leaves=18, subsample=0.75, colsample_bytree=0.75, min_child_samples=24, reg_alpha=0.3, reg_lambda=1.5, class_weight="balanced", random_state=42, verbose=-1, ), ] return families def _optimal_threshold(y_true: np.ndarray, y_prob: np.ndarray) -> float: """Youden's J statistic: threshold that maximises sensitivity + specificity - 1.""" fpr, tpr, thresholds = roc_curve(y_true, y_prob) j_scores = tpr - fpr return float(thresholds[np.argmax(j_scores)]) def _build_eval_pipeline(model: Any) -> Pipeline: return Pipeline( [ ("scaler", StandardScaler()), ("model", clone(model)), ] ) def _safe_model_name(name: str) -> str: return ( name.lower() .replace(" ", "_") .replace("(", "") .replace(")", "") .replace("/", "_") ) def _summarize_selected_params(name: str, model: Any) -> dict[str, Any]: tuned_keys = _TUNED_PARAM_KEYS.get(name, ()) params = model.get_params() # CalibratedClassifierCV nests params as "estimator__" flat: dict[str, Any] = {} for key, value in params.items(): flat_key = key.split("__")[-1] if flat_key not in flat: flat[flat_key] = value return {key: flat[key] for key in tuned_keys if key in flat} def _extract_importance( model: Any, feature_cols: list[str], ) -> list[tuple[str, float]]: importances = None if hasattr(model, "feature_importances_"): importances = model.feature_importances_ elif hasattr(model, "coef_"): importances = np.abs(model.coef_[0]) if importances is None: return [] total = np.sum(importances) if total > 0: importances = importances / total return sorted( zip(feature_cols, importances.tolist()), key=lambda item: item[1], reverse=True, ) if __name__ == "__main__": csv_path = sys.argv[1] if len(sys.argv) > 1 else "data/training/features.csv" model_dir = sys.argv[2] if len(sys.argv) > 2 else "models" train(csv_path, model_dir)