Spaces:
Sleeping
Sleeping
| # LOPO threshold/weight analysis. Run: python -m evaluation.justify_thresholds | |
| # ClearML logging: set USE_CLEARML=1 env var or pass --clearml flag | |
| import glob | |
| import os | |
| import sys | |
| import numpy as np | |
| import matplotlib | |
| matplotlib.use("Agg") | |
| import matplotlib.pyplot as plt | |
| from sklearn.neural_network import MLPClassifier | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.metrics import roc_curve, roc_auc_score, f1_score | |
| from xgboost import XGBClassifier | |
| _PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) | |
| sys.path.insert(0, _PROJECT_ROOT) | |
| from data_preparation.prepare_dataset import load_per_person, SELECTED_FEATURES | |
| PLOTS_DIR = os.path.join(os.path.dirname(__file__), "plots") | |
| REPORT_PATH = os.path.join(os.path.dirname(__file__), "THRESHOLD_JUSTIFICATION.md") | |
| SEED = 42 | |
| # ClearML | |
| # start logging with: USE_CLEARML=1 python -m evaluation.justify_thresholds or: python -m evaluation.justify_thresholds --clearml | |
| _USE_CLEARML = os.environ.get("USE_CLEARML", "0") == "1" or "--clearml" in sys.argv | |
| _task = None | |
| _logger = None | |
| if _USE_CLEARML: | |
| try: | |
| from clearml import Task | |
| _task = Task.init( | |
| project_name="Focus Guard", | |
| task_name="Threshold Justification", | |
| tags=["evaluation", "thresholds"], | |
| ) | |
| _task.connect({"SEED": SEED, "n_participants": 9}) | |
| _logger = _task.get_logger() | |
| print("ClearML enabled — logging to project 'Focus Guard'") | |
| except ImportError: | |
| print("WARNING: ClearML not installed. Continuing without logging.") | |
| _USE_CLEARML = False | |
| def _youdens_j(y_true, y_prob): | |
| fpr, tpr, thresholds = roc_curve(y_true, y_prob) | |
| j = tpr - fpr | |
| idx = j.argmax() | |
| auc = roc_auc_score(y_true, y_prob) | |
| return float(thresholds[idx]), fpr, tpr, thresholds, float(auc) | |
| def _f1_at_threshold(y_true, y_prob, threshold): | |
| return f1_score(y_true, (y_prob >= threshold).astype(int), zero_division=0) | |
| def _plot_roc(fpr, tpr, auc, opt_thresh, opt_idx, title, path, clearml_title=None): | |
| fig, ax = plt.subplots(figsize=(6, 5)) | |
| ax.plot(fpr, tpr, lw=2, label=f"ROC (AUC = {auc:.4f})") | |
| ax.plot(fpr[opt_idx], tpr[opt_idx], "ro", markersize=10, | |
| label=f"Youden's J optimum (t = {opt_thresh:.3f})") | |
| ax.plot([0, 1], [0, 1], "k--", lw=1, alpha=0.5) | |
| ax.set_xlabel("False Positive Rate") | |
| ax.set_ylabel("True Positive Rate") | |
| ax.set_title(title) | |
| ax.legend(loc="lower right") | |
| fig.tight_layout() | |
| # Log to ClearML before closing the figure | |
| if _logger and clearml_title: | |
| _logger.report_matplotlib_figure( | |
| title=clearml_title, series="ROC", figure=fig, iteration=0 | |
| ) | |
| fig.savefig(path, dpi=150) | |
| plt.close(fig) | |
| print(f" saved {path}") | |
| def run_lopo_models(): | |
| print("\n=== LOPO: MLP and XGBoost ===") | |
| by_person, _, _ = load_per_person("face_orientation") | |
| persons = sorted(by_person.keys()) | |
| results = {"mlp": {"y": [], "p": []}, "xgb": {"y": [], "p": []}} | |
| for i, held_out in enumerate(persons): | |
| X_test, y_test = by_person[held_out] | |
| train_X = np.concatenate([by_person[p][0] for p in persons if p != held_out]) | |
| train_y = np.concatenate([by_person[p][1] for p in persons if p != held_out]) | |
| scaler = StandardScaler().fit(train_X) | |
| X_tr_sc = scaler.transform(train_X) | |
| X_te_sc = scaler.transform(X_test) | |
| mlp = MLPClassifier( | |
| hidden_layer_sizes=(64, 32), activation="relu", | |
| max_iter=200, early_stopping=True, validation_fraction=0.15, | |
| random_state=SEED, verbose=False, | |
| ) | |
| mlp.fit(X_tr_sc, train_y) | |
| mlp_prob = mlp.predict_proba(X_te_sc)[:, 1] | |
| results["mlp"]["y"].append(y_test) | |
| results["mlp"]["p"].append(mlp_prob) | |
| xgb = XGBClassifier( | |
| n_estimators=600, max_depth=8, learning_rate=0.05, | |
| subsample=0.8, colsample_bytree=0.8, | |
| reg_alpha=0.1, reg_lambda=1.0, | |
| use_label_encoder=False, eval_metric="logloss", | |
| random_state=SEED, verbosity=0, | |
| ) | |
| xgb.fit(X_tr_sc, train_y) | |
| xgb_prob = xgb.predict_proba(X_te_sc)[:, 1] | |
| results["xgb"]["y"].append(y_test) | |
| results["xgb"]["p"].append(xgb_prob) | |
| print(f" fold {i+1}/{len(persons)}: held out {held_out} " | |
| f"({X_test.shape[0]} samples)") | |
| for key in results: | |
| results[key]["y"] = np.concatenate(results[key]["y"]) | |
| results[key]["p"] = np.concatenate(results[key]["p"]) | |
| return results | |
| def analyse_model_thresholds(results): | |
| print("\n=== Model threshold analysis ===") | |
| model_stats = {} | |
| for name, label in [("mlp", "MLP"), ("xgb", "XGBoost")]: | |
| y, p = results[name]["y"], results[name]["p"] | |
| opt_t, fpr, tpr, thresholds, auc = _youdens_j(y, p) | |
| j = tpr - fpr | |
| opt_idx = j.argmax() | |
| f1_opt = _f1_at_threshold(y, p, opt_t) | |
| f1_50 = _f1_at_threshold(y, p, 0.50) | |
| path = os.path.join(PLOTS_DIR, f"roc_{name}.png") | |
| _plot_roc(fpr, tpr, auc, opt_t, opt_idx, | |
| f"LOPO ROC — {label} (9 folds, 144k samples)", path, | |
| clearml_title=f"ROC_{label}") | |
| model_stats[name] = { | |
| "label": label, "auc": auc, | |
| "opt_threshold": opt_t, "f1_opt": f1_opt, "f1_50": f1_50, | |
| } | |
| print(f" {label}: AUC={auc:.4f}, optimal threshold={opt_t:.3f} " | |
| f"(F1={f1_opt:.4f}), F1@0.50={f1_50:.4f}") | |
| # Log scalars to ClearML | |
| if _logger: | |
| _logger.report_single_value(f"{label} Optimal Threshold", opt_t) | |
| _logger.report_single_value(f"{label} AUC", auc) | |
| _logger.report_single_value(f"{label} F1 @ Optimal", f1_opt) | |
| _logger.report_single_value(f"{label} F1 @ 0.5", f1_50) | |
| return model_stats | |
| def run_geo_weight_search(): | |
| print("\n=== Geometric weight grid search ===") | |
| by_person, _, _ = load_per_person("face_orientation") | |
| persons = sorted(by_person.keys()) | |
| features = SELECTED_FEATURES["face_orientation"] | |
| sf_idx = features.index("s_face") | |
| se_idx = features.index("s_eye") | |
| alphas = np.arange(0.2, 0.85, 0.1).round(1) | |
| alpha_f1 = {a: [] for a in alphas} | |
| for held_out in persons: | |
| X_test, y_test = by_person[held_out] | |
| sf = X_test[:, sf_idx] | |
| se = X_test[:, se_idx] | |
| train_X = np.concatenate([by_person[p][0] for p in persons if p != held_out]) | |
| train_y = np.concatenate([by_person[p][1] for p in persons if p != held_out]) | |
| sf_tr = train_X[:, sf_idx] | |
| se_tr = train_X[:, se_idx] | |
| for a in alphas: | |
| score_tr = a * sf_tr + (1.0 - a) * se_tr | |
| opt_t, *_ = _youdens_j(train_y, score_tr) | |
| score_te = a * sf + (1.0 - a) * se | |
| f1 = _f1_at_threshold(y_test, score_te, opt_t) | |
| alpha_f1[a].append(f1) | |
| mean_f1 = {a: np.mean(f1s) for a, f1s in alpha_f1.items()} | |
| best_alpha = max(mean_f1, key=mean_f1.get) | |
| fig, ax = plt.subplots(figsize=(7, 4)) | |
| ax.bar([f"{a:.1f}" for a in alphas], | |
| [mean_f1[a] for a in alphas], color="steelblue") | |
| ax.set_xlabel("Face weight (alpha); eye weight = 1 - alpha") | |
| ax.set_ylabel("Mean LOPO F1") | |
| ax.set_title("Geometric Pipeline: Face vs Eye Weight Search") | |
| ax.set_ylim(bottom=max(0, min(mean_f1.values()) - 0.05)) | |
| for i, a in enumerate(alphas): | |
| ax.text(i, mean_f1[a] + 0.003, f"{mean_f1[a]:.3f}", | |
| ha="center", va="bottom", fontsize=8) | |
| fig.tight_layout() | |
| # Log to ClearML before closing | |
| if _logger: | |
| _logger.report_matplotlib_figure( | |
| title="Geo Weight Search", series="F1 vs Alpha", figure=fig, iteration=0 | |
| ) | |
| path = os.path.join(PLOTS_DIR, "geo_weight_search.png") | |
| fig.savefig(path, dpi=150) | |
| plt.close(fig) | |
| print(f" saved {path}") | |
| print(f" Best alpha (face weight) = {best_alpha:.1f}, " | |
| f"mean LOPO F1 = {mean_f1[best_alpha]:.4f}") | |
| # Log scalars to ClearML | |
| if _logger: | |
| _logger.report_single_value("Geo Best Alpha", best_alpha) | |
| for i, a in enumerate(sorted(alphas)): | |
| _logger.report_scalar( | |
| "Geo Weight Search", "Mean LOPO F1", | |
| iteration=i, value=mean_f1[a] | |
| ) | |
| return dict(mean_f1), best_alpha | |
| def run_hybrid_weight_search(lopo_results): | |
| print("\n=== Hybrid weight grid search ===") | |
| by_person, _, _ = load_per_person("face_orientation") | |
| persons = sorted(by_person.keys()) | |
| features = SELECTED_FEATURES["face_orientation"] | |
| sf_idx = features.index("s_face") | |
| se_idx = features.index("s_eye") | |
| GEO_FACE_W = 0.7 | |
| GEO_EYE_W = 0.3 | |
| w_mlps = np.arange(0.3, 0.85, 0.1).round(1) | |
| wmf1 = {w: [] for w in w_mlps} | |
| mlp_p = lopo_results["mlp"]["p"] | |
| offset = 0 | |
| for held_out in persons: | |
| X_test, y_test = by_person[held_out] | |
| n = X_test.shape[0] | |
| mlp_prob_fold = mlp_p[offset:offset + n] | |
| offset += n | |
| sf = X_test[:, sf_idx] | |
| se = X_test[:, se_idx] | |
| geo_score = np.clip(GEO_FACE_W * sf + GEO_EYE_W * se, 0, 1) | |
| train_X = np.concatenate([by_person[p][0] for p in persons if p != held_out]) | |
| train_y = np.concatenate([by_person[p][1] for p in persons if p != held_out]) | |
| sf_tr = train_X[:, sf_idx] | |
| se_tr = train_X[:, se_idx] | |
| geo_tr = np.clip(GEO_FACE_W * sf_tr + GEO_EYE_W * se_tr, 0, 1) | |
| scaler = StandardScaler().fit(train_X) | |
| mlp_tr = MLPClassifier( | |
| hidden_layer_sizes=(64, 32), activation="relu", | |
| max_iter=200, early_stopping=True, validation_fraction=0.15, | |
| random_state=SEED, verbose=False, | |
| ) | |
| mlp_tr.fit(scaler.transform(train_X), train_y) | |
| mlp_prob_tr = mlp_tr.predict_proba(scaler.transform(train_X))[:, 1] | |
| for w in w_mlps: | |
| combo_tr = w * mlp_prob_tr + (1.0 - w) * geo_tr | |
| opt_t, *_ = _youdens_j(train_y, combo_tr) | |
| combo_te = w * mlp_prob_fold + (1.0 - w) * geo_score | |
| f1 = _f1_at_threshold(y_test, combo_te, opt_t) | |
| wmf1[w].append(f1) | |
| mean_f1 = {w: np.mean(f1s) for w, f1s in wmf1.items()} | |
| best_w = max(mean_f1, key=mean_f1.get) | |
| fig, ax = plt.subplots(figsize=(7, 4)) | |
| ax.bar([f"{w:.1f}" for w in w_mlps], | |
| [mean_f1[w] for w in w_mlps], color="darkorange") | |
| ax.set_xlabel("MLP weight (w_mlp); geo weight = 1 - w_mlp") | |
| ax.set_ylabel("Mean LOPO F1") | |
| ax.set_title("Hybrid Pipeline: MLP vs Geometric Weight Search") | |
| ax.set_ylim(bottom=max(0, min(mean_f1.values()) - 0.05)) | |
| for i, w in enumerate(w_mlps): | |
| ax.text(i, mean_f1[w] + 0.003, f"{mean_f1[w]:.3f}", | |
| ha="center", va="bottom", fontsize=8) | |
| fig.tight_layout() | |
| # Log to ClearML before closing | |
| if _logger: | |
| _logger.report_matplotlib_figure( | |
| title="Hybrid Weight Search", series="F1 vs w_mlp", figure=fig, iteration=0 | |
| ) | |
| path = os.path.join(PLOTS_DIR, "hybrid_weight_search.png") | |
| fig.savefig(path, dpi=150) | |
| plt.close(fig) | |
| print(f" saved {path}") | |
| print(f" Best w_mlp = {best_w:.1f}, mean LOPO F1 = {mean_f1[best_w]:.4f}") | |
| # Log scalars to ClearML | |
| if _logger: | |
| _logger.report_single_value("Hybrid Best w_mlp", best_w) | |
| for i, w in enumerate(sorted(w_mlps)): | |
| _logger.report_scalar( | |
| "Hybrid Weight Search", "Mean LOPO F1", | |
| iteration=i, value=mean_f1[w] | |
| ) | |
| return dict(mean_f1), best_w | |
| def plot_distributions(): | |
| print("\n=== EAR / MAR distributions ===") | |
| npz_files = sorted(glob.glob(os.path.join(_PROJECT_ROOT, "data", "collected_*", "*.npz"))) | |
| all_ear_l, all_ear_r, all_mar, all_labels = [], [], [], [] | |
| for f in npz_files: | |
| d = np.load(f, allow_pickle=True) | |
| names = list(d["feature_names"]) | |
| feat = d["features"].astype(np.float32) | |
| lab = d["labels"].astype(np.int64) | |
| all_ear_l.append(feat[:, names.index("ear_left")]) | |
| all_ear_r.append(feat[:, names.index("ear_right")]) | |
| all_mar.append(feat[:, names.index("mar")]) | |
| all_labels.append(lab) | |
| ear_l = np.concatenate(all_ear_l) | |
| ear_r = np.concatenate(all_ear_r) | |
| mar = np.concatenate(all_mar) | |
| labels = np.concatenate(all_labels) | |
| ear_min = np.minimum(ear_l, ear_r) | |
| ear_plot = np.clip(ear_min, 0, 0.85) | |
| mar_plot = np.clip(mar, 0, 1.5) | |
| # EAR distribution plot | |
| fig_ear, ax = plt.subplots(figsize=(7, 4)) | |
| ax.hist(ear_plot[labels == 1], bins=100, alpha=0.6, label="Focused (1)", density=True) | |
| ax.hist(ear_plot[labels == 0], bins=100, alpha=0.6, label="Unfocused (0)", density=True) | |
| for val, lbl, c in [ | |
| (0.16, "ear_closed = 0.16", "red"), | |
| (0.21, "EAR_BLINK = 0.21", "orange"), | |
| (0.30, "ear_open = 0.30", "green"), | |
| ]: | |
| ax.axvline(val, color=c, ls="--", lw=1.5, label=lbl) | |
| ax.set_xlabel("min(left_EAR, right_EAR)") | |
| ax.set_ylabel("Density") | |
| ax.set_title("EAR Distribution by Class (144k samples)") | |
| ax.legend(fontsize=8) | |
| fig_ear.tight_layout() | |
| # Log to ClearML before closing | |
| if _logger: | |
| _logger.report_matplotlib_figure( | |
| title="EAR Distribution", series="by class", figure=fig_ear, iteration=0 | |
| ) | |
| path = os.path.join(PLOTS_DIR, "ear_distribution.png") | |
| fig_ear.savefig(path, dpi=150) | |
| plt.close(fig_ear) | |
| print(f" saved {path}") | |
| # MAR distribution plot | |
| fig_mar, ax = plt.subplots(figsize=(7, 4)) | |
| ax.hist(mar_plot[labels == 1], bins=100, alpha=0.6, label="Focused (1)", density=True) | |
| ax.hist(mar_plot[labels == 0], bins=100, alpha=0.6, label="Unfocused (0)", density=True) | |
| ax.axvline(0.55, color="red", ls="--", lw=1.5, label="MAR_YAWN = 0.55") | |
| ax.set_xlabel("Mouth Aspect Ratio (MAR)") | |
| ax.set_ylabel("Density") | |
| ax.set_title("MAR Distribution by Class (144k samples)") | |
| ax.legend(fontsize=8) | |
| fig_mar.tight_layout() | |
| # Log to ClearML before closing | |
| if _logger: | |
| _logger.report_matplotlib_figure( | |
| title="MAR Distribution", series="by class", figure=fig_mar, iteration=0 | |
| ) | |
| path = os.path.join(PLOTS_DIR, "mar_distribution.png") | |
| fig_mar.savefig(path, dpi=150) | |
| plt.close(fig_mar) | |
| print(f" saved {path}") | |
| closed_pct = np.mean(ear_min < 0.16) * 100 | |
| blink_pct = np.mean(ear_min < 0.21) * 100 | |
| open_pct = np.mean(ear_min >= 0.30) * 100 | |
| yawn_pct = np.mean(mar > 0.55) * 100 | |
| stats = { | |
| "ear_below_016": closed_pct, | |
| "ear_below_021": blink_pct, | |
| "ear_above_030": open_pct, | |
| "mar_above_055": yawn_pct, | |
| "n_samples": len(ear_min), | |
| } | |
| print(f" EAR<0.16 (closed): {closed_pct:.1f}% | EAR<0.21 (blink): {blink_pct:.1f}% | " | |
| f"EAR>=0.30 (open): {open_pct:.1f}%") | |
| print(f" MAR>0.55 (yawn): {yawn_pct:.1f}%") | |
| return stats | |
| def write_report(model_stats, geo_f1, best_alpha, hybrid_f1, best_w, dist_stats): | |
| lines = [] | |
| lines.append("# Threshold Justification Report") | |
| lines.append("") | |
| lines.append("Auto-generated by `evaluation/justify_thresholds.py` using LOPO cross-validation " | |
| "over 9 participants (~145k samples).") | |
| lines.append("") | |
| lines.append("## 1. ML Model Decision Thresholds") | |
| lines.append("") | |
| lines.append("Thresholds selected via **Youden's J statistic** (J = sensitivity + specificity - 1) " | |
| "on pooled LOPO held-out predictions.") | |
| lines.append("") | |
| lines.append("| Model | LOPO AUC | Optimal Threshold (Youden's J) | F1 @ Optimal | F1 @ 0.50 |") | |
| lines.append("|-------|----------|-------------------------------|--------------|-----------|") | |
| for key in ("mlp", "xgb"): | |
| s = model_stats[key] | |
| lines.append(f"| {s['label']} | {s['auc']:.4f} | **{s['opt_threshold']:.3f}** | " | |
| f"{s['f1_opt']:.4f} | {s['f1_50']:.4f} |") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("## 2. Geometric Pipeline Weights (s_face vs s_eye)") | |
| lines.append("") | |
| lines.append("Grid search over face weight alpha in {0.2 ... 0.8}. " | |
| "Eye weight = 1 - alpha. Threshold per fold via Youden's J.") | |
| lines.append("") | |
| lines.append("| Face Weight (alpha) | Mean LOPO F1 |") | |
| lines.append("|--------------------:|-------------:|") | |
| for a in sorted(geo_f1.keys()): | |
| marker = " **<-- selected**" if a == best_alpha else "" | |
| lines.append(f"| {a:.1f} | {geo_f1[a]:.4f}{marker} |") | |
| lines.append("") | |
| lines.append(f"**Best:** alpha = {best_alpha:.1f} (face {best_alpha*100:.0f}%, " | |
| f"eye {(1-best_alpha)*100:.0f}%)") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("## 3. Hybrid Pipeline Weights (MLP vs Geometric)") | |
| lines.append("") | |
| lines.append("Grid search over w_mlp in {0.3 ... 0.8}. w_geo = 1 - w_mlp. " | |
| "Geometric sub-score uses same weights as geometric pipeline (face=0.7, eye=0.3). " | |
| "If you change geometric weights, re-run this script — optimal w_mlp can shift.") | |
| lines.append("") | |
| lines.append("| MLP Weight (w_mlp) | Mean LOPO F1 |") | |
| lines.append("|-------------------:|-------------:|") | |
| for w in sorted(hybrid_f1.keys()): | |
| marker = " **<-- selected**" if w == best_w else "" | |
| lines.append(f"| {w:.1f} | {hybrid_f1[w]:.4f}{marker} |") | |
| lines.append("") | |
| lines.append(f"**Best:** w_mlp = {best_w:.1f} (MLP {best_w*100:.0f}%, " | |
| f"geometric {(1-best_w)*100:.0f}%)") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("## 4. Eye and Mouth Aspect Ratio Thresholds") | |
| lines.append("") | |
| lines.append("### EAR (Eye Aspect Ratio)") | |
| lines.append("") | |
| lines.append("Reference: Soukupova & Cech, \"Real-Time Eye Blink Detection Using Facial " | |
| "Landmarks\" (2016) established EAR ~ 0.2 as a blink threshold.") | |
| lines.append("") | |
| lines.append("Our thresholds define a linear interpolation zone around this established value:") | |
| lines.append("") | |
| lines.append("| Constant | Value | Justification |") | |
| lines.append("|----------|------:|---------------|") | |
| lines.append(f"| `ear_closed` | 0.16 | Below this, eyes are fully shut. " | |
| f"{dist_stats['ear_below_016']:.1f}% of samples fall here. |") | |
| lines.append(f"| `EAR_BLINK_THRESH` | 0.21 | Blink detection point; close to the 0.2 reference. " | |
| f"{dist_stats['ear_below_021']:.1f}% of samples below. |") | |
| lines.append(f"| `ear_open` | 0.30 | Above this, eyes are fully open. " | |
| f"{dist_stats['ear_above_030']:.1f}% of samples here. |") | |
| lines.append("") | |
| lines.append("Between 0.16 and 0.30 the `_ear_score` function linearly interpolates from 0 to 1, " | |
| "providing a smooth transition rather than a hard binary cutoff.") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("### MAR (Mouth Aspect Ratio)") | |
| lines.append("") | |
| lines.append(f"| Constant | Value | Justification |") | |
| lines.append("|----------|------:|---------------|") | |
| lines.append(f"| `MAR_YAWN_THRESHOLD` | 0.55 | Only {dist_stats['mar_above_055']:.1f}% of " | |
| f"samples exceed this, confirming it captures genuine yawns without false positives. |") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("") | |
| lines.append("## 5. Other Constants") | |
| lines.append("") | |
| lines.append("| Constant | Value | Rationale |") | |
| lines.append("|----------|------:|-----------|") | |
| lines.append("| `gaze_max_offset` | 0.28 | Max iris displacement (normalised) before gaze score " | |
| "drops to zero. Corresponds to ~56% of the eye width; beyond this the iris is at " | |
| "the extreme edge. |") | |
| lines.append("| `max_angle` | 22.0 deg | Head deviation beyond which face score = 0. Based on " | |
| "typical monitor-viewing cone: at 60 cm distance and a 24\" monitor, the viewing " | |
| "angle is ~20-25 degrees. |") | |
| lines.append("| `roll_weight` | 0.5 | Roll is less indicative of inattention than yaw/pitch " | |
| "(tilting head doesn't mean looking away), so it's down-weighted by 50%. |") | |
| lines.append("| `EMA alpha` | 0.3 | Smoothing factor for focus score. " | |
| "Gives ~3-4 frame effective window; balances responsiveness vs flicker. |") | |
| lines.append("| `grace_frames` | 15 | ~0.5 s at 30 fps before penalising no-face. Allows brief " | |
| "occlusions (e.g. hand gesture) without dropping score. |") | |
| lines.append("| `PERCLOS_WINDOW` | 60 frames | 2 s at 30 fps; standard PERCLOS measurement " | |
| "window (Dinges & Grace, 1998). |") | |
| lines.append("| `BLINK_WINDOW_SEC` | 30 s | Blink rate measured over 30 s; typical spontaneous " | |
| "blink rate is 15-20/min (Bentivoglio et al., 1997). |") | |
| lines.append("") | |
| with open(REPORT_PATH, "w", encoding="utf-8") as f: | |
| f.write("\n".join(lines)) | |
| print(f"\nReport written to {REPORT_PATH}") | |
| def main(): | |
| os.makedirs(PLOTS_DIR, exist_ok=True) | |
| lopo_results = run_lopo_models() | |
| model_stats = analyse_model_thresholds(lopo_results) | |
| geo_f1, best_alpha = run_geo_weight_search() | |
| hybrid_f1, best_w = run_hybrid_weight_search(lopo_results) | |
| dist_stats = plot_distributions() | |
| write_report(model_stats, geo_f1, best_alpha, hybrid_f1, best_w, dist_stats) | |
| # Close ClearML task | |
| if _task: | |
| _task.close() | |
| print("ClearML task closed.") | |
| print("\nDone.") | |
| if __name__ == "__main__": | |
| main() | |