""" Feature importance and leave-one-feature-out ablation for the 10 face_orientation features. Run: python -m evaluation.feature_importance Outputs: - XGBoost gain-based importance (from trained checkpoint) - Leave-one-feature-out LOPO F1 (ablation): drop each feature in turn, report mean LOPO F1. - Writes evaluation/feature_selection_justification.md """ import os import sys import numpy as np from sklearn.preprocessing import StandardScaler from sklearn.metrics import f1_score from xgboost import XGBClassifier _PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) if _PROJECT_ROOT not in sys.path: sys.path.insert(0, _PROJECT_ROOT) from data_preparation.prepare_dataset import load_per_person, SELECTED_FEATURES SEED = 42 FEATURES = SELECTED_FEATURES["face_orientation"] def _resolve_xgb_path(): return os.path.join(_PROJECT_ROOT, "checkpoints", "xgboost_face_orientation_best.json") def xgb_feature_importance(): """Load trained XGBoost and return gain-based importance for the 10 features.""" path = _resolve_xgb_path() if not os.path.isfile(path): print(f"[WARN] No XGBoost checkpoint at {path}; skip importance.") return None model = XGBClassifier() model.load_model(path) imp = model.get_booster().get_score(importance_type="gain") # Booster uses f0, f1, ...; we use same order as FEATURES (training order) by_idx = {int(k.replace("f", "")): v for k, v in imp.items() if k.startswith("f")} order = [by_idx.get(i, 0.0) for i in range(len(FEATURES))] return dict(zip(FEATURES, order)) def run_ablation_lopo(): """Leave-one-feature-out: for each feature, train XGBoost on the other 9 with LOPO, report mean F1.""" by_person, _, _ = load_per_person("face_orientation") persons = sorted(by_person.keys()) n_folds = len(persons) results = {} for drop_feat in FEATURES: idx_keep = [i for i, f in enumerate(FEATURES) if f != drop_feat] f1s = [] for held_out in persons: train_X = np.concatenate([by_person[p][0] for p in persons if p != held_out]) train_y = np.concatenate([by_person[p][1] for p in persons if p != held_out]) X_test, y_test = by_person[held_out] X_tr = train_X[:, idx_keep] X_te = X_test[:, idx_keep] scaler = StandardScaler().fit(X_tr) X_tr_sc = scaler.transform(X_tr) X_te_sc = scaler.transform(X_te) xgb = XGBClassifier( n_estimators=600, max_depth=8, learning_rate=0.05, subsample=0.8, colsample_bytree=0.8, reg_alpha=0.1, reg_lambda=1.0, eval_metric="logloss", random_state=SEED, verbosity=0, ) xgb.fit(X_tr_sc, train_y) pred = xgb.predict(X_te_sc) f1s.append(f1_score(y_test, pred, average="weighted")) results[drop_feat] = np.mean(f1s) return results def run_baseline_lopo_f1(): """Full 10-feature LOPO mean F1 for reference.""" by_person, _, _ = load_per_person("face_orientation") persons = sorted(by_person.keys()) f1s = [] for held_out in persons: train_X = np.concatenate([by_person[p][0] for p in persons if p != held_out]) train_y = np.concatenate([by_person[p][1] for p in persons if p != held_out]) X_test, y_test = by_person[held_out] scaler = StandardScaler().fit(train_X) X_tr_sc = scaler.transform(train_X) X_te_sc = scaler.transform(X_test) xgb = XGBClassifier( n_estimators=600, max_depth=8, learning_rate=0.05, subsample=0.8, colsample_bytree=0.8, reg_alpha=0.1, reg_lambda=1.0, eval_metric="logloss", random_state=SEED, verbosity=0, ) xgb.fit(X_tr_sc, train_y) pred = xgb.predict(X_te_sc) f1s.append(f1_score(y_test, pred, average="weighted")) return np.mean(f1s) # Channel subsets for ablation (subset name -> list of feature names) CHANNEL_SUBSETS = { "head_pose": ["head_deviation", "s_face", "pitch"], "eye_state": ["ear_left", "ear_avg", "ear_right", "perclos"], "gaze": ["h_gaze", "gaze_offset", "s_eye"], } def run_channel_ablation(): """LOPO XGBoost with head-only, eye-only, gaze-only, and all 10. Returns dict subset_name -> mean F1.""" by_person, _, _ = load_per_person("face_orientation") persons = sorted(by_person.keys()) results = {} for subset_name, feat_list in CHANNEL_SUBSETS.items(): idx_keep = [FEATURES.index(f) for f in feat_list] f1s = [] for held_out in persons: train_X = np.concatenate([by_person[p][0] for p in persons if p != held_out]) train_y = np.concatenate([by_person[p][1] for p in persons if p != held_out]) X_test, y_test = by_person[held_out] X_tr = train_X[:, idx_keep] X_te = X_test[:, idx_keep] scaler = StandardScaler().fit(X_tr) X_tr_sc = scaler.transform(X_tr) X_te_sc = scaler.transform(X_te) xgb = XGBClassifier( n_estimators=600, max_depth=8, learning_rate=0.05, subsample=0.8, colsample_bytree=0.8, reg_alpha=0.1, reg_lambda=1.0, eval_metric="logloss", random_state=SEED, verbosity=0, ) xgb.fit(X_tr_sc, train_y) pred = xgb.predict(X_te_sc) f1s.append(f1_score(y_test, pred, average="weighted")) results[subset_name] = np.mean(f1s) baseline = run_baseline_lopo_f1() results["all_10"] = baseline return results def main(): print("=== Feature importance (XGBoost gain) ===") imp = xgb_feature_importance() if imp: for name in FEATURES: print(f" {name}: {imp.get(name, 0):.2f}") order = sorted(imp.items(), key=lambda x: -x[1]) print(" Top-5 by gain:", [x[0] for x in order[:5]]) print("\n=== Leave-one-feature-out ablation (LOPO mean F1) ===") baseline = run_baseline_lopo_f1() print(f" Baseline (all 10 features) mean LOPO F1: {baseline:.4f}") ablation = run_ablation_lopo() for feat in FEATURES: delta = baseline - ablation[feat] print(f" drop {feat}: F1={ablation[feat]:.4f} (Δ={delta:+.4f})") worst_drop = min(ablation.items(), key=lambda x: x[1]) print(f" Largest F1 drop when dropping: {worst_drop[0]} (F1={worst_drop[1]:.4f})") print("\n=== Channel ablation (LOPO mean F1) ===") channel_f1 = run_channel_ablation() for name, f1 in channel_f1.items(): print(f" {name}: {f1:.4f}") out_dir = os.path.join(_PROJECT_ROOT, "evaluation") out_path = os.path.join(out_dir, "feature_selection_justification.md") lines = [ "# Feature selection justification", "", "The face_orientation model uses 10 of 17 extracted features. This document summarises empirical support.", "", "## 1. Domain rationale", "", "The 10 features were chosen to cover three channels:", "- **Head pose:** head_deviation, s_face, pitch", "- **Eye state:** ear_left, ear_right, ear_avg, perclos", "- **Gaze:** h_gaze, gaze_offset, s_eye", "", "Excluded: v_gaze (noisy), mar (rare events), yaw/roll (redundant with head_deviation/s_face), blink_rate/closure_duration/yawn_duration (temporal overlap with perclos).", "", "## 2. XGBoost feature importance (gain)", "", "From the trained XGBoost checkpoint (gain on the 10 features):", "", "| Feature | Gain |", "|---------|------|", ] if imp: for name in FEATURES: lines.append(f"| {name} | {imp.get(name, 0):.2f} |") order = sorted(imp.items(), key=lambda x: -x[1]) lines.append("") lines.append(f"**Top 5 by gain:** {', '.join(x[0] for x in order[:5])}.") else: lines.append("(Run with XGBoost checkpoint to populate.)") lines.extend([ "", "## 3. Leave-one-feature-out ablation (LOPO)", "", f"Baseline (all 10 features) mean LOPO F1: **{baseline:.4f}**.", "", "| Feature dropped | Mean LOPO F1 | Δ vs baseline |", "|------------------|--------------|---------------|", ]) for feat in FEATURES: delta = baseline - ablation[feat] lines.append(f"| {feat} | {ablation[feat]:.4f} | {delta:+.4f} |") worst_drop = min(ablation.items(), key=lambda x: x[1]) lines.append("") lines.append(f"Dropping **{worst_drop[0]}** hurts most (F1={worst_drop[1]:.4f}), consistent with it being important.") lines.append("") lines.append("## 4. Conclusion") lines.append("") lines.append("Selection is supported by (1) domain rationale (three attention channels), (2) XGBoost gain importance, and (3) leave-one-out ablation. SHAP or correlation-based pruning can be added in future work.") lines.append("") with open(out_path, "w", encoding="utf-8") as f: f.write("\n".join(lines)) print(f"\nReport written to {out_path}") if __name__ == "__main__": main()