"""Per-frame focus-scoring pipelines built on face-mesh landmarks.

Each pipeline exposes ``process_frame(bgr_frame) -> dict`` plus ``close()`` and
context-manager support, and differs only in how the focus score is produced.
"""

import collections
import glob
import json
import math
import os
import sys

import numpy as np
import joblib

# The project root must be importable before the ``models`` package is imported.
_PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _PROJECT_ROOT not in sys.path:
    sys.path.insert(0, _PROJECT_ROOT)

from models.face_mesh import FaceMeshDetector
from models.head_pose import HeadPoseEstimator
from models.eye_scorer import EyeBehaviourScorer, compute_mar, MAR_YAWN_THRESHOLD
from models.eye_crop import extract_eye_crops
from models.eye_classifier import load_eye_classifier, GeometricOnlyClassifier
from models.collect_features import FEATURE_NAMES, TemporalTracker, extract_features

# Feature name -> position inside a base feature vector.
_FEAT_IDX = {name: i for i, name in enumerate(FEATURE_NAMES)}


def _clip_features(vec):
    """Clip raw features to the same ranges used during training.

    Args:
        vec: Feature vector indexed by the positions of ``FEATURE_NAMES``.

    Returns:
        A clipped copy of ``vec``; the input array is not modified.
    """
    out = vec.copy()
    _i = _FEAT_IDX

    out[_i["yaw"]] = np.clip(out[_i["yaw"]], -45, 45)
    out[_i["pitch"]] = np.clip(out[_i["pitch"]], -30, 30)
    out[_i["roll"]] = np.clip(out[_i["roll"]], -30, 30)
    # head_deviation is recomputed from the *clipped* yaw/pitch instead of
    # being clipped on its own, so it stays consistent with them.
    out[_i["head_deviation"]] = math.sqrt(
        float(out[_i["yaw"]]) ** 2 + float(out[_i["pitch"]]) ** 2
    )

    for f in ("ear_left", "ear_right", "ear_avg"):
        out[_i[f]] = np.clip(out[_i[f]], 0, 0.85)

    out[_i["mar"]] = np.clip(out[_i["mar"]], 0, 1.0)
    out[_i["gaze_offset"]] = np.clip(out[_i["gaze_offset"]], 0, 0.50)
    out[_i["perclos"]] = np.clip(out[_i["perclos"]], 0, 0.80)
    out[_i["blink_rate"]] = np.clip(out[_i["blink_rate"]], 0, 30.0)
    out[_i["closure_duration"]] = np.clip(out[_i["closure_duration"]], 0, 10.0)
    out[_i["yawn_duration"]] = np.clip(out[_i["yawn_duration"]], 0, 10.0)
    return out


class _OutputSmoother:
    """EMA smoothing on focus score with no-face grace period."""

    def __init__(self, alpha: float = 0.3, grace_frames: int = 15):
        self._alpha = alpha
        self._grace = grace_frames
        self._score = 0.5  # neutral starting point
        self._no_face = 0  # consecutive frames without a detected face

    def update(self, raw_score: float, face_detected: bool) -> float:
        """Fold one frame into the smoothed score and return it.

        While a face is visible the score moves towards ``raw_score`` by a
        fraction ``alpha``.  Without a face, ``raw_score`` is ignored: the
        score is held for ``grace_frames`` frames and then decays by 15% on
        every further frame.
        """
        if face_detected:
            self._no_face = 0
            self._score += self._alpha * (raw_score - self._score)
        else:
            self._no_face += 1
            if self._no_face > self._grace:
                self._score *= 0.85
        return self._score


DEFAULT_HYBRID_CONFIG = {
    "w_mlp": 0.7,
    "w_geo": 0.3,
    "threshold": 0.55,
    "use_yawn_veto": True,
    "geo_face_weight": 0.4,
    "geo_eye_weight": 0.6,
    "mar_yawn_threshold": float(MAR_YAWN_THRESHOLD),
}


class _RuntimeFeatureEngine:
    """Runtime feature engineering (magnitudes, velocities, variances) with EMA baselines."""

    _MAG_FEATURES = ["pitch", "yaw", "head_deviation", "gaze_offset", "v_gaze", "h_gaze"]
    _VEL_FEATURES = ["pitch", "yaw", "h_gaze", "v_gaze", "head_deviation", "gaze_offset"]
    _VAR_FEATURES = ["h_gaze", "v_gaze", "pitch"]
    _VAR_WINDOW = 30  # frames kept per rolling-variance buffer
    _WARMUP = 15  # frames before z-normalisation of norm_features starts

    def __init__(self, base_feature_names, norm_features=None):
        """
        Args:
            base_feature_names: Names of the entries of the vectors later
                passed to :meth:`transform`, in order.
            norm_features: Optional names of base features to z-normalise
                against their running EMA mean/variance after warm-up.
        """
        self._base_names = list(base_feature_names)
        # Built once so transform() does not run list.index() on every frame.
        # setdefault keeps the first position for a repeated name, exactly as
        # list.index() would.
        self._base_index = {}
        for i, name in enumerate(self._base_names):
            self._base_index.setdefault(name, i)

        self._norm_features = list(norm_features) if norm_features else []

        tracked = set(self._MAG_FEATURES) | set(self._norm_features)
        self._ema_mean = {f: 0.0 for f in tracked}
        self._ema_var = {f: 1.0 for f in tracked}
        self._n = 0  # number of frames seen so far
        self._prev = None  # raw feature dict of the previous frame
        self._var_bufs = {
            f: collections.deque(maxlen=self._VAR_WINDOW) for f in self._VAR_FEATURES
        }
        self._ext_names = (
            list(self._base_names)
            + [f"{f}_mag" for f in self._MAG_FEATURES]
            + [f"{f}_vel" for f in self._VEL_FEATURES]
            + [f"{f}_var" for f in self._VAR_FEATURES]
        )

    @property
    def extended_names(self):
        """Names of the entries of the vector returned by :meth:`transform`."""
        return list(self._ext_names)

    def transform(self, base_vec):
        """Extend one base feature vector with engineered features.

        Stateful: every call advances the EMA baselines, the previous-frame
        snapshot and the rolling-variance buffers.

        Returns:
            float32 array laid out as ``extended_names``: the (optionally
            normalised) base features, then ``*_mag``, ``*_vel``, ``*_var``.
        """
        self._n += 1
        raw = {name: float(base_vec[i]) for i, name in enumerate(self._base_names)}

        # The EMA span grows with the frame count and is capped at 120 frames.
        alpha = 2.0 / (min(self._n, 120) + 1)
        for feat in self._ema_mean:
            if feat not in raw:
                continue
            v = raw[feat]
            if self._n == 1:
                # Seed the baseline with the first observation.
                self._ema_mean[feat] = v
                self._ema_var[feat] = 0.0
            else:
                self._ema_mean[feat] += alpha * (v - self._ema_mean[feat])
                # The variance update deliberately uses the already-updated mean.
                self._ema_var[feat] += alpha * (
                    (v - self._ema_mean[feat]) ** 2 - self._ema_var[feat]
                )

        out = base_vec.astype(np.float32)  # astype() returns a fresh array
        if self._n > self._WARMUP:
            for feat in self._norm_features:
                if feat in raw:
                    idx = self._base_index[feat]
                    std = max(math.sqrt(self._ema_var[feat]), 1e-6)
                    out[idx] = (raw[feat] - self._ema_mean[feat]) / std

        # Magnitude: absolute deviation from the running baseline.
        mag = np.zeros(len(self._MAG_FEATURES), dtype=np.float32)
        for i, feat in enumerate(self._MAG_FEATURES):
            if feat in raw:
                mag[i] = abs(raw[feat] - self._ema_mean.get(feat, raw[feat]))

        # Velocity: absolute frame-to-frame change (zero on the first frame).
        vel = np.zeros(len(self._VEL_FEATURES), dtype=np.float32)
        if self._prev is not None:
            for i, feat in enumerate(self._VEL_FEATURES):
                if feat in raw and feat in self._prev:
                    vel[i] = abs(raw[feat] - self._prev[feat])
        self._prev = dict(raw)

        # Variance over the last _VAR_WINDOW frames (needs at least 2 samples).
        for feat in self._VAR_FEATURES:
            if feat in raw:
                self._var_bufs[feat].append(raw[feat])
        var = np.zeros(len(self._VAR_FEATURES), dtype=np.float32)
        for i, feat in enumerate(self._VAR_FEATURES):
            buf = self._var_bufs[feat]
            if len(buf) >= 2:
                var[i] = float(np.array(buf).var())

        return np.concatenate([out, mag, vel, var])


class FaceMeshPipeline:
    """Geometric pipeline: weighted head-pose and eye scores, with a yawn veto."""

    def __init__(
        self,
        max_angle: float = 22.0,
        alpha: float = 0.4,
        beta: float = 0.6,
        threshold: float = 0.55,
        eye_model_path: str | None = None,
        eye_backend: str = "yolo",
        eye_blend: float = 0.5,
        detector=None,
    ):
        """
        Args:
            max_angle: Forwarded to ``HeadPoseEstimator``.
            alpha: Weight of the head-pose score ``s_face``.
            beta: Weight of the eye score ``s_eye``.
            threshold: Smoothed score at or above which a frame is "focused".
            eye_model_path: Optional eye-classifier weights; ignored when the
                path does not exist.
            eye_backend: Backend name forwarded to ``load_eye_classifier``.
            eye_blend: Share of the model score when blending it with the
                geometric eye score.
            detector: Optional shared detector.  When omitted, a
                ``FaceMeshDetector`` is created and closed by :meth:`close`.
        """
        self.detector = detector or FaceMeshDetector()
        self._owns_detector = detector is None
        self.head_pose = HeadPoseEstimator(max_angle=max_angle)
        self.eye_scorer = EyeBehaviourScorer()
        self.alpha = alpha
        self.beta = beta
        self.threshold = threshold
        self.eye_blend = eye_blend

        self.eye_classifier = load_eye_classifier(
            path=eye_model_path if eye_model_path and os.path.exists(eye_model_path) else None,
            backend=eye_backend,
            device="cpu",
        )
        self._has_eye_model = not isinstance(self.eye_classifier, GeometricOnlyClassifier)
        if self._has_eye_model:
            print(f"[PIPELINE] Eye model: {self.eye_classifier.name}")
        self._smoother = _OutputSmoother()

    def process_frame(self, bgr_frame: np.ndarray) -> dict:
        """Score one frame and return a dict of per-frame results.

        ``raw_score`` in the returned dict is the *smoothed* score, and
        ``is_focused`` is that score compared against ``self.threshold``.
        """
        landmarks = self.detector.process(bgr_frame)
        h, w = bgr_frame.shape[:2]

        out = {
            "landmarks": landmarks,
            "s_face": 0.0,
            "s_eye": 0.0,
            "raw_score": 0.0,
            "is_focused": False,
            "yaw": None,
            "pitch": None,
            "roll": None,
            "mar": None,
            "is_yawning": False,
            "left_bbox": None,
            "right_bbox": None,
        }

        if landmarks is None:
            # No face: let the smoother hold and then decay the last score.
            smoothed = self._smoother.update(0.0, False)
            out["raw_score"] = smoothed
            out["is_focused"] = smoothed >= self.threshold
            return out

        angles = self.head_pose.estimate(landmarks, w, h)
        if angles is not None:
            out["yaw"], out["pitch"], out["roll"] = angles
        out["s_face"] = self.head_pose.score(landmarks, w, h)

        s_eye_geo = self.eye_scorer.score(landmarks)
        if self._has_eye_model:
            left_crop, right_crop, left_bbox, right_bbox = extract_eye_crops(bgr_frame, landmarks)
            out["left_bbox"] = left_bbox
            out["right_bbox"] = right_bbox
            s_eye_model = self.eye_classifier.predict_score([left_crop, right_crop])
            out["s_eye"] = (1.0 - self.eye_blend) * s_eye_geo + self.eye_blend * s_eye_model
        else:
            out["s_eye"] = s_eye_geo

        out["mar"] = compute_mar(landmarks)
        out["is_yawning"] = out["mar"] > MAR_YAWN_THRESHOLD

        raw = self.alpha * out["s_face"] + self.beta * out["s_eye"]
        if out["is_yawning"]:
            raw = 0.0  # a yawn vetoes the frame outright

        out["raw_score"] = self._smoother.update(raw, True)
        out["is_focused"] = out["raw_score"] >= self.threshold
        return out

    @property
    def has_eye_model(self) -> bool:
        """True when a non-geometric eye classifier was loaded."""
        return self._has_eye_model

    def close(self):
        """Close the detector, but only if this pipeline created it."""
        if self._owns_detector:
            self.detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()


def _latest_model_artifacts(model_dir):
    """Locate the newest MLP model together with its scaler and metadata.

    ``model_*.joblib`` files take precedence over ``mlp_*.joblib``; within a
    family the lexicographically last file name is treated as the latest.

    Returns:
        ``(model_path, scaler_path, meta_path)``, or ``(None, None, None)``
        when no model exists or its ``scaler_<tag>.joblib`` /
        ``meta_<tag>.npz`` companion is missing.
    """
    model_files = sorted(glob.glob(os.path.join(model_dir, "model_*.joblib")))
    if not model_files:
        model_files = sorted(glob.glob(os.path.join(model_dir, "mlp_*.joblib")))
    if not model_files:
        return None, None, None

    model_path = model_files[-1]
    # splitext removes only the trailing extension, so a tag that itself
    # contains ".joblib" survives intact.
    stem, _ext = os.path.splitext(os.path.basename(model_path))
    for prefix in ("model_", "mlp_"):
        # Compared case-insensitively because glob may match a differently
        # cased name on case-insensitive filesystems.
        if stem.lower().startswith(prefix):
            tag = stem[len(prefix):]
            break
    else:
        return None, None, None

    scaler_path = os.path.join(model_dir, f"scaler_{tag}.joblib")
    meta_path = os.path.join(model_dir, f"meta_{tag}.npz")
    if not os.path.isfile(scaler_path) or not os.path.isfile(meta_path):
        return None, None, None
    return model_path, scaler_path, meta_path


def _load_hybrid_config(model_dir: str, config_path: str | None = None):
    """Load the hybrid-pipeline configuration, layered over the defaults.

    Args:
        model_dir: Directory searched for ``hybrid_focus_config.json`` when
            ``config_path`` is not given.
        config_path: Explicit path to a JSON config file.

    Returns:
        ``(cfg, resolved_path)``; ``resolved_path`` is ``None`` when no file
        was found and the defaults are used unchanged.

    Raises:
        ValueError: If ``w_mlp + w_geo`` is not strictly positive.
    """
    cfg = dict(DEFAULT_HYBRID_CONFIG)
    resolved = config_path or os.path.join(model_dir, "hybrid_focus_config.json")
    if not os.path.isfile(resolved):
        print(f"[HYBRID] No config found at {resolved}; using defaults")
        return cfg, None

    with open(resolved, "r", encoding="utf-8") as f:
        file_cfg = json.load(f)

    # Only known keys are taken over; anything else in the file is ignored.
    for key in DEFAULT_HYBRID_CONFIG:
        if key in file_cfg:
            cfg[key] = file_cfg[key]

    cfg["w_mlp"] = float(cfg["w_mlp"])
    cfg["w_geo"] = float(cfg["w_geo"])
    weight_sum = cfg["w_mlp"] + cfg["w_geo"]
    if weight_sum <= 0:
        raise ValueError("[HYBRID] Invalid config: w_mlp + w_geo must be > 0")
    # Normalise so that the two blend weights sum to 1.
    cfg["w_mlp"] /= weight_sum
    cfg["w_geo"] /= weight_sum

    cfg["threshold"] = float(cfg["threshold"])
    cfg["use_yawn_veto"] = bool(cfg["use_yawn_veto"])
    cfg["geo_face_weight"] = float(cfg["geo_face_weight"])
    cfg["geo_eye_weight"] = float(cfg["geo_eye_weight"])
    cfg["mar_yawn_threshold"] = float(cfg["mar_yawn_threshold"])

    print(f"[HYBRID] Loaded config: {resolved}")
    return cfg, resolved


def _load_mlp_bundle(model_dir):
    """Load the latest MLP, its scaler and the feature metadata from ``model_dir``.

    Returns:
        ``(mlp, scaler, feature_names, norm_features, mlp_path)``.

    Raises:
        FileNotFoundError: If no complete set of artifacts is found.
    """
    mlp_path, scaler_path, meta_path = _latest_model_artifacts(model_dir)
    if mlp_path is None:
        raise FileNotFoundError(f"No MLP artifacts in {model_dir}")

    # SECURITY: joblib.load and np.load(allow_pickle=True) unpickle arbitrary
    # objects.  Only point model_dir at checkpoints from a trusted source.
    mlp = joblib.load(mlp_path)
    scaler = joblib.load(scaler_path)
    # NpzFile keeps its file open until closed, so everything is copied out
    # inside the with-block.
    with np.load(meta_path, allow_pickle=True) as meta:
        feature_names = list(meta["feature_names"])
        norm_feats = list(meta["norm_features"]) if "norm_features" in meta else []
    return mlp, scaler, feature_names, norm_feats, mlp_path


def _positive_class_probability(model, X_sc):
    """Return the model's class-1 probability for a single scaled sample, in [0, 1].

    Models without ``predict_proba`` fall back to a hard 0/1 from ``predict``.
    """
    if hasattr(model, "predict_proba"):
        prob = float(model.predict_proba(X_sc)[0, 1])
    else:
        prob = float(model.predict(X_sc)[0] == 1)
    return float(np.clip(prob, 0.0, 1.0))


class MLPPipeline:
    """Pipeline whose focus score is the probability predicted by a trained MLP."""

    def __init__(self, model_dir=None, detector=None):
        """
        Args:
            model_dir: Directory with the model artifacts; defaults to
                ``<project root>/checkpoints``.
            detector: Optional shared detector.  When omitted, a
                ``FaceMeshDetector`` is created and closed by :meth:`close`.

        Raises:
            FileNotFoundError: If no complete set of MLP artifacts is found.
        """
        if model_dir is None:
            model_dir = os.path.join(_PROJECT_ROOT, "checkpoints")

        (
            self._mlp,
            self._scaler,
            self._feature_names,
            norm_feats,
            mlp_path,
        ) = _load_mlp_bundle(model_dir)

        self._engine = _RuntimeFeatureEngine(FEATURE_NAMES, norm_features=norm_feats)
        ext_names = self._engine.extended_names
        # Positions of the model's input features inside the extended vector.
        self._indices = [ext_names.index(n) for n in self._feature_names]

        self._detector = detector or FaceMeshDetector()
        self._owns_detector = detector is None
        self._head_pose = HeadPoseEstimator()
        self.head_pose = self._head_pose  # public alias
        self._eye_scorer = EyeBehaviourScorer()
        self._temporal = TemporalTracker()
        self._smoother = _OutputSmoother()
        self._threshold = 0.5
        print(f"[MLP] Loaded {mlp_path} | {len(self._feature_names)} features")

    def process_frame(self, bgr_frame):
        """Score one frame and return a dict of per-frame results.

        ``mlp_prob`` is the unsmoothed model output; ``raw_score`` is the
        smoothed value that ``is_focused`` is derived from.
        """
        landmarks = self._detector.process(bgr_frame)
        h, w = bgr_frame.shape[:2]

        out = {
            "landmarks": landmarks,
            "is_focused": False,
            "s_face": 0.0,
            "s_eye": 0.0,
            "raw_score": 0.0,
            "mlp_prob": 0.0,
            "mar": None,
            "yaw": None,
            "pitch": None,
            "roll": None,
        }

        if landmarks is None:
            smoothed = self._smoother.update(0.0, False)
            out["raw_score"] = smoothed
            out["is_focused"] = smoothed >= self._threshold
            return out

        vec = extract_features(landmarks, w, h, self._head_pose, self._eye_scorer, self._temporal)
        vec = _clip_features(vec)

        # Reported angles are the clipped values that the model also sees.
        out["yaw"] = float(vec[_FEAT_IDX["yaw"]])
        out["pitch"] = float(vec[_FEAT_IDX["pitch"]])
        out["roll"] = float(vec[_FEAT_IDX["roll"]])
        out["s_face"] = float(vec[_FEAT_IDX["s_face"]])
        out["s_eye"] = float(vec[_FEAT_IDX["s_eye"]])
        out["mar"] = float(vec[_FEAT_IDX["mar"]])

        ext_vec = self._engine.transform(vec)
        X = ext_vec[self._indices].reshape(1, -1).astype(np.float64)
        X_sc = self._scaler.transform(X)
        out["mlp_prob"] = _positive_class_probability(self._mlp, X_sc)

        out["raw_score"] = self._smoother.update(out["mlp_prob"], True)
        out["is_focused"] = out["raw_score"] >= self._threshold
        return out

    def close(self):
        """Close the detector, but only if this pipeline created it."""
        if self._owns_detector:
            self._detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()


class HybridFocusPipeline:
    """Pipeline blending the MLP probability with a geometric face/eye score."""

    def __init__(
        self,
        model_dir=None,
        config_path: str | None = None,
        eye_model_path: str | None = None,
        eye_backend: str = "yolo",
        eye_blend: float = 0.5,
        max_angle: float = 22.0,
        detector=None,
    ):
        """
        Args:
            model_dir: Directory with the MLP artifacts (and, by default, the
                hybrid config); defaults to ``<project root>/checkpoints``.
            config_path: Optional explicit path to the hybrid JSON config.
            eye_model_path: Optional eye-classifier weights; ignored when the
                path does not exist.
            eye_backend: Backend name forwarded to ``load_eye_classifier``.
            eye_blend: Share of the model score when blending it with the
                geometric eye score.
            max_angle: Forwarded to ``HeadPoseEstimator``.
            detector: Optional shared detector.  When omitted, a
                ``FaceMeshDetector`` is created and closed by :meth:`close`.

        Raises:
            FileNotFoundError: If no complete set of MLP artifacts is found.
            ValueError: If the config's blend weights do not sum to > 0.
        """
        if model_dir is None:
            model_dir = os.path.join(_PROJECT_ROOT, "checkpoints")

        (
            self._mlp,
            self._scaler,
            self._feature_names,
            norm_feats,
            mlp_path,
        ) = _load_mlp_bundle(model_dir)

        self._engine = _RuntimeFeatureEngine(FEATURE_NAMES, norm_features=norm_feats)
        ext_names = self._engine.extended_names
        # Positions of the model's input features inside the extended vector.
        self._indices = [ext_names.index(n) for n in self._feature_names]

        self._cfg, self._cfg_path = _load_hybrid_config(model_dir=model_dir, config_path=config_path)

        self._detector = detector or FaceMeshDetector()
        self._owns_detector = detector is None
        self._head_pose = HeadPoseEstimator(max_angle=max_angle)
        self._eye_scorer = EyeBehaviourScorer()
        self._temporal = TemporalTracker()
        self._eye_blend = eye_blend

        self.eye_classifier = load_eye_classifier(
            path=eye_model_path if eye_model_path and os.path.exists(eye_model_path) else None,
            backend=eye_backend,
            device="cpu",
        )
        self._has_eye_model = not isinstance(self.eye_classifier, GeometricOnlyClassifier)
        if self._has_eye_model:
            print(f"[HYBRID] Eye model: {self.eye_classifier.name}")

        self.head_pose = self._head_pose  # public alias
        self._smoother = _OutputSmoother()

        print(
            f"[HYBRID] Loaded {mlp_path} | {len(self._feature_names)} features | "
            f"w_mlp={self._cfg['w_mlp']:.2f}, w_geo={self._cfg['w_geo']:.2f}, "
            f"threshold={self._cfg['threshold']:.2f}"
        )

    @property
    def has_eye_model(self) -> bool:
        """True when a non-geometric eye classifier was loaded."""
        return self._has_eye_model

    @property
    def config(self) -> dict:
        """A copy of the effective configuration."""
        return dict(self._cfg)

    def process_frame(self, bgr_frame: np.ndarray) -> dict:
        """Score one frame and return a dict of per-frame results.

        ``focus_score`` (mirrored in ``raw_score``) is the smoothed blend of
        ``mlp_prob`` and ``geo_score``; ``is_focused`` compares it against the
        configured threshold.
        """
        landmarks = self._detector.process(bgr_frame)
        h, w = bgr_frame.shape[:2]

        out = {
            "landmarks": landmarks,
            "is_focused": False,
            "focus_score": 0.0,
            "mlp_prob": 0.0,
            "geo_score": 0.0,
            "raw_score": 0.0,
            "s_face": 0.0,
            "s_eye": 0.0,
            "mar": None,
            "is_yawning": False,
            "yaw": None,
            "pitch": None,
            "roll": None,
            "left_bbox": None,
            "right_bbox": None,
        }

        if landmarks is None:
            smoothed = self._smoother.update(0.0, False)
            out["focus_score"] = smoothed
            out["raw_score"] = smoothed
            out["is_focused"] = smoothed >= self._cfg["threshold"]
            return out

        angles = self._head_pose.estimate(landmarks, w, h)
        if angles is not None:
            out["yaw"], out["pitch"], out["roll"] = angles
        out["s_face"] = self._head_pose.score(landmarks, w, h)

        s_eye_geo = self._eye_scorer.score(landmarks)
        if self._has_eye_model:
            left_crop, right_crop, left_bbox, right_bbox = extract_eye_crops(bgr_frame, landmarks)
            out["left_bbox"] = left_bbox
            out["right_bbox"] = right_bbox
            s_eye_model = self.eye_classifier.predict_score([left_crop, right_crop])
            out["s_eye"] = (1.0 - self._eye_blend) * s_eye_geo + self._eye_blend * s_eye_model
        else:
            out["s_eye"] = s_eye_geo

        geo_score = (
            self._cfg["geo_face_weight"] * out["s_face"]
            + self._cfg["geo_eye_weight"] * out["s_eye"]
        )
        geo_score = float(np.clip(geo_score, 0.0, 1.0))

        out["mar"] = compute_mar(landmarks)
        out["is_yawning"] = out["mar"] > self._cfg["mar_yawn_threshold"]
        if self._cfg["use_yawn_veto"] and out["is_yawning"]:
            geo_score = 0.0  # the veto zeroes only the geometric half
        out["geo_score"] = geo_score

        # Values already computed above are handed to extract_features via
        # _pre.  Note that the geometric eye score is passed here, not the
        # blended out["s_eye"].
        pre = {
            "angles": angles,
            "s_face": out["s_face"],
            "s_eye": s_eye_geo,
            "mar": out["mar"],
        }
        vec = extract_features(
            landmarks, w, h, self._head_pose, self._eye_scorer, self._temporal, _pre=pre
        )
        vec = _clip_features(vec)

        ext_vec = self._engine.transform(vec)
        X = ext_vec[self._indices].reshape(1, -1).astype(np.float64)
        X_sc = self._scaler.transform(X)
        out["mlp_prob"] = _positive_class_probability(self._mlp, X_sc)

        focus_score = self._cfg["w_mlp"] * out["mlp_prob"] + self._cfg["w_geo"] * out["geo_score"]
        out["focus_score"] = self._smoother.update(float(np.clip(focus_score, 0.0, 1.0)), True)
        out["raw_score"] = out["focus_score"]
        out["is_focused"] = out["focus_score"] >= self._cfg["threshold"]
        return out

    def close(self):
        """Close the detector, but only if this pipeline created it."""
        if self._owns_detector:
            self._detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
# ---------------------------------------------------------------------------
# GRU Pipeline
# ---------------------------------------------------------------------------


def _load_gru_artifacts(model_dir=None):
    """Return the paths of the GRU checkpoint, scaler and metadata files.

    Args:
        model_dir: Directory to search; defaults to
            ``<project root>/checkpoints``.

    Returns:
        ``(pt_path, scaler_path, meta_path)``, or ``(None, None, None)`` when
        any of the three files is missing.
    """
    if model_dir is None:
        model_dir = os.path.join(_PROJECT_ROOT, "checkpoints")

    pt_path = os.path.join(model_dir, "gru_best.pt")
    scaler_path = os.path.join(model_dir, "gru_scaler_best.npz")
    meta_path = os.path.join(model_dir, "gru_meta_best.npz")
    if not all(os.path.isfile(p) for p in [pt_path, scaler_path, meta_path]):
        return None, None, None
    return pt_path, scaler_path, meta_path


class _AttentionGRU:
    """CPU inference wrapper around a GRU sequence classifier.

    Despite the name, the network defined here has no attention layer: the
    classifier head reads only the GRU output of the last time step.
    """

    def __init__(self, pt_path, input_size, hidden_size=64, num_layers=2, dropout=0.3):
        """
        Args:
            pt_path: Checkpoint file: either a bare state dict or a dict
                holding one under ``"model_state_dict"``.
            input_size: Number of features per time step.
            hidden_size: GRU hidden size.
            num_layers: Number of stacked GRU layers.
            dropout: Dropout rate; irrelevant at inference, since the model is
                put in eval mode, but needed to rebuild the same module layout.
        """
        # torch is imported lazily so that the module can be imported, and the
        # other pipelines used, without torch installed.
        import torch
        import torch.nn as nn

        class _GRUNet(nn.Module):
            # The attribute names ``gru`` and ``classifier`` are part of the
            # checkpoint's state-dict keys and must not be renamed.
            def __init__(self, in_sz, h_sz, n_layers, drop):
                super().__init__()
                self.gru = nn.GRU(
                    input_size=in_sz,
                    hidden_size=h_sz,
                    num_layers=n_layers,
                    batch_first=True,
                    # nn.GRU only applies dropout between stacked layers.
                    dropout=drop if n_layers > 1 else 0.0,
                )
                self.classifier = nn.Sequential(
                    nn.Dropout(drop),
                    nn.Linear(h_sz, 32),
                    nn.ReLU(),
                    nn.Dropout(drop * 0.5),
                    nn.Linear(32, 1),
                )

            def forward(self, x):
                gru_out, _ = self.gru(x)
                return self.classifier(gru_out[:, -1, :])

        self._device = torch.device("cpu")
        self._model = _GRUNet(input_size, hidden_size, num_layers, dropout)

        # SECURITY: weights_only=False lets torch.load unpickle arbitrary
        # objects.  Only load checkpoints from a trusted source.
        checkpoint = torch.load(pt_path, map_location=self._device, weights_only=False)
        if isinstance(checkpoint, dict) and "model_state_dict" in checkpoint:
            self._model.load_state_dict(checkpoint["model_state_dict"])
        else:
            self._model.load_state_dict(checkpoint)
        self._model.eval()

    def predict_proba(self, x_np):
        """x_np: (1, window, features) numpy array -> float probability of focused."""
        import torch

        with torch.no_grad():
            t = torch.tensor(x_np, dtype=torch.float32, device=self._device)
            logit = self._model(t)
            # .item() requires a single-element result, i.e. a batch of one.
            prob = torch.sigmoid(logit).item()
        return prob


class GRUPipeline:
    """Pipeline whose focus score comes from a GRU over a sliding feature window."""

    def __init__(self, model_dir=None, detector=None):
        """
        Args:
            model_dir: Directory with ``gru_best.pt``, ``gru_scaler_best.npz``
                and ``gru_meta_best.npz``; defaults to
                ``<project root>/checkpoints``.
            detector: Optional shared detector.  When omitted, a
                ``FaceMeshDetector`` is created and closed by :meth:`close`.

        Raises:
            FileNotFoundError: If any of the three artifact files is missing.
        """
        pt_path, scaler_path, meta_path = _load_gru_artifacts(model_dir)
        if pt_path is None:
            d = model_dir or os.path.join(_PROJECT_ROOT, "checkpoints")
            raise FileNotFoundError(f"No GRU artifacts in {d}")

        # NpzFile keeps its file open until closed, so everything is copied
        # out inside the with-blocks.
        # SECURITY: allow_pickle=True unpickles arbitrary objects.  Only load
        # metadata from a trusted source.
        with np.load(meta_path, allow_pickle=True) as meta:
            self._feature_names = list(meta["feature_names"])
            self._window_size = int(meta["window_size"])
            hidden_size = int(meta["hidden_size"])
            num_layers = int(meta["num_layers"])
            dropout = float(meta["dropout"])
            self._threshold = float(meta["default_threshold"])

        with np.load(scaler_path) as sc:
            self._sc_mean = sc["mean"]
            self._sc_scale = sc["scale"]

        self._gru = _AttentionGRU(
            pt_path,
            input_size=len(self._feature_names),
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=dropout,
        )

        # Positions of the model's input features inside a base feature vector.
        self._feat_indices = [FEATURE_NAMES.index(n) for n in self._feature_names]

        self._detector = detector or FaceMeshDetector()
        self._owns_detector = detector is None
        self._head_pose = HeadPoseEstimator()
        self.head_pose = self._head_pose  # public alias
        self._eye_scorer = EyeBehaviourScorer()
        self._temporal = TemporalTracker()
        self._smoother = _OutputSmoother(alpha=0.6, grace_frames=10)
        # Sliding window of the most recent scaled feature vectors.
        self._buffer = collections.deque(maxlen=self._window_size)

        print(
            f"[GRU] Loaded {pt_path} | {len(self._feature_names)} features | "
            f"window={self._window_size} | threshold={self._threshold:.3f}"
        )

    def process_frame(self, bgr_frame):
        """Score one frame and return a dict of per-frame results.

        ``gru_prob`` is the unsmoothed model output; ``raw_score`` is the
        smoothed value that ``is_focused`` is derived from.  Frames without a
        face do not touch the feature window.
        """
        landmarks = self._detector.process(bgr_frame)
        h, w = bgr_frame.shape[:2]

        out = {
            "landmarks": landmarks,
            "is_focused": False,
            "raw_score": 0.0,
            "gru_prob": 0.0,
            "s_face": 0.0,
            "s_eye": 0.0,
            "mar": None,
            "yaw": None,
            "pitch": None,
            "roll": None,
        }

        if landmarks is None:
            smoothed = self._smoother.update(0.0, False)
            out["raw_score"] = smoothed
            out["is_focused"] = smoothed >= self._threshold
            return out

        vec = extract_features(landmarks, w, h, self._head_pose, self._eye_scorer, self._temporal)
        vec = _clip_features(vec)

        # Reported angles are the clipped values that the model also sees.
        out["yaw"] = float(vec[_FEAT_IDX["yaw"]])
        out["pitch"] = float(vec[_FEAT_IDX["pitch"]])
        out["roll"] = float(vec[_FEAT_IDX["roll"]])
        out["s_face"] = float(vec[_FEAT_IDX["s_face"]])
        out["s_eye"] = float(vec[_FEAT_IDX["s_eye"]])
        out["mar"] = float(vec[_FEAT_IDX["mar"]])

        selected = vec[self._feat_indices].astype(np.float64)
        # Standardise with the stored statistics; the floor guards against
        # division by a zero scale.
        scaled = (selected - self._sc_mean) / np.maximum(self._sc_scale, 1e-8)
        scaled_f32 = scaled.astype(np.float32)

        # Pad buffer on first frame so GRU can predict immediately
        if len(self._buffer) == 0:
            self._buffer.extend([scaled_f32] * self._window_size)
        else:
            self._buffer.append(scaled_f32)

        window = np.array(self._buffer)[np.newaxis, :, :]  # (1, W, F)
        gru_prob = self._gru.predict_proba(window)
        out["gru_prob"] = float(np.clip(gru_prob, 0.0, 1.0))

        out["raw_score"] = self._smoother.update(out["gru_prob"], True)
        out["is_focused"] = out["raw_score"] >= self._threshold
        return out

    def close(self):
        """Close the detector, but only if this pipeline created it."""
        if self._owns_detector:
            self._detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()