FocusGuardBaseModel / ui /pipeline.py
Kexin-251202's picture
Deploy base model
c86c45b verified
import collections
import glob
import json
import math
import os
import sys
import numpy as np
import joblib
_PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _PROJECT_ROOT not in sys.path:
sys.path.insert(0, _PROJECT_ROOT)
from models.face_mesh import FaceMeshDetector
from models.head_pose import HeadPoseEstimator
from models.eye_scorer import EyeBehaviourScorer, compute_mar, MAR_YAWN_THRESHOLD
from models.eye_crop import extract_eye_crops
from models.eye_classifier import load_eye_classifier, GeometricOnlyClassifier
from models.collect_features import FEATURE_NAMES, TemporalTracker, extract_features
# Lookup from base feature name to its position in FEATURE_NAMES, so feature
# vectors can be indexed by name instead of by hard-coded column number.
_FEAT_IDX = {name: i for i, name in enumerate(FEATURE_NAMES)}
def _clip_features(vec):
    """Return a copy of *vec* with each raw feature limited to its training range.

    The input vector is not modified. ``head_deviation`` is not clipped itself;
    it is recomputed from the already-clipped yaw and pitch so the three values
    stay mutually consistent.
    """
    # (feature name, lower bound, upper bound)
    bounds = (
        ("yaw", -45, 45),
        ("pitch", -30, 30),
        ("roll", -30, 30),
        ("ear_left", 0, 0.85),
        ("ear_right", 0, 0.85),
        ("ear_avg", 0, 0.85),
        ("mar", 0, 1.0),
        ("gaze_offset", 0, 0.50),
        ("perclos", 0, 0.80),
        ("blink_rate", 0, 30.0),
        ("closure_duration", 0, 10.0),
        ("yawn_duration", 0, 10.0),
    )

    clipped = vec.copy()
    for feature, lower, upper in bounds:
        col = _FEAT_IDX[feature]
        clipped[col] = np.clip(clipped[col], lower, upper)

    # Derived from the clipped angles, so it must come after the loop above.
    yaw = float(clipped[_FEAT_IDX["yaw"]])
    pitch = float(clipped[_FEAT_IDX["pitch"]])
    clipped[_FEAT_IDX["head_deviation"]] = math.sqrt(yaw ** 2 + pitch ** 2)
    return clipped
class _OutputSmoother:
"""EMA smoothing on focus score with no-face grace period."""
def __init__(self, alpha: float = 0.3, grace_frames: int = 15):
self._alpha = alpha
self._grace = grace_frames
self._score = 0.5
self._no_face = 0
def update(self, raw_score: float, face_detected: bool) -> float:
if face_detected:
self._no_face = 0
self._score += self._alpha * (raw_score - self._score)
else:
self._no_face += 1
if self._no_face > self._grace:
self._score *= 0.85
return self._score
# Fallback settings for HybridFocusPipeline; _load_hybrid_config() starts from
# a copy of this dict and overrides keys found in the JSON config file.
DEFAULT_HYBRID_CONFIG = {
    # Weights of the MLP probability and the geometric score in the final
    # focus score (the loader rescales them so they sum to 1).
    "w_mlp": 0.7,
    "w_geo": 0.3,
    # Smoothed focus score at or above this value counts as "focused".
    "threshold": 0.55,
    # When true, a detected yawn forces the geometric score to 0.
    "use_yawn_veto": True,
    # Weights of the head-pose score and the eye score inside the geometric score.
    "geo_face_weight": 0.4,
    "geo_eye_weight": 0.6,
    # MAR above this value is treated as a yawn.
    "mar_yawn_threshold": float(MAR_YAWN_THRESHOLD),
}
class _RuntimeFeatureEngine:
"""Runtime feature engineering (magnitudes, velocities, variances) with EMA baselines."""
_MAG_FEATURES = ["pitch", "yaw", "head_deviation", "gaze_offset", "v_gaze", "h_gaze"]
_VEL_FEATURES = ["pitch", "yaw", "h_gaze", "v_gaze", "head_deviation", "gaze_offset"]
_VAR_FEATURES = ["h_gaze", "v_gaze", "pitch"]
_VAR_WINDOW = 30
_WARMUP = 15
def __init__(self, base_feature_names, norm_features=None):
self._base_names = list(base_feature_names)
self._norm_features = list(norm_features) if norm_features else []
tracked = set(self._MAG_FEATURES) | set(self._norm_features)
self._ema_mean = {f: 0.0 for f in tracked}
self._ema_var = {f: 1.0 for f in tracked}
self._n = 0
self._prev = None
self._var_bufs = {
f: collections.deque(maxlen=self._VAR_WINDOW) for f in self._VAR_FEATURES
}
self._ext_names = (
list(self._base_names)
+ [f"{f}_mag" for f in self._MAG_FEATURES]
+ [f"{f}_vel" for f in self._VEL_FEATURES]
+ [f"{f}_var" for f in self._VAR_FEATURES]
)
@property
def extended_names(self):
return list(self._ext_names)
def transform(self, base_vec):
self._n += 1
raw = {name: float(base_vec[i]) for i, name in enumerate(self._base_names)}
alpha = 2.0 / (min(self._n, 120) + 1)
for feat in self._ema_mean:
if feat not in raw:
continue
v = raw[feat]
if self._n == 1:
self._ema_mean[feat] = v
self._ema_var[feat] = 0.0
else:
self._ema_mean[feat] += alpha * (v - self._ema_mean[feat])
self._ema_var[feat] += alpha * (
(v - self._ema_mean[feat]) ** 2 - self._ema_var[feat]
)
out = base_vec.copy().astype(np.float32)
if self._n > self._WARMUP:
for feat in self._norm_features:
if feat in raw:
idx = self._base_names.index(feat)
std = max(math.sqrt(self._ema_var[feat]), 1e-6)
out[idx] = (raw[feat] - self._ema_mean[feat]) / std
mag = np.zeros(len(self._MAG_FEATURES), dtype=np.float32)
for i, feat in enumerate(self._MAG_FEATURES):
if feat in raw:
mag[i] = abs(raw[feat] - self._ema_mean.get(feat, raw[feat]))
vel = np.zeros(len(self._VEL_FEATURES), dtype=np.float32)
if self._prev is not None:
for i, feat in enumerate(self._VEL_FEATURES):
if feat in raw and feat in self._prev:
vel[i] = abs(raw[feat] - self._prev[feat])
self._prev = dict(raw)
for feat in self._VAR_FEATURES:
if feat in raw:
self._var_bufs[feat].append(raw[feat])
var = np.zeros(len(self._VAR_FEATURES), dtype=np.float32)
for i, feat in enumerate(self._VAR_FEATURES):
buf = self._var_bufs[feat]
if len(buf) >= 2:
arr = np.array(buf)
var[i] = float(arr.var())
return np.concatenate([out, mag, vel, var])
class FaceMeshPipeline:
    """Geometric focus pipeline: weighted head-pose and eye scores, smoothed over time.

    Usable as a context manager; leaving the ``with`` block calls :meth:`close`.
    """

    def __init__(
        self,
        max_angle: float = 22.0,
        alpha: float = 0.4,
        beta: float = 0.6,
        threshold: float = 0.55,
        eye_model_path: str | None = None,
        eye_backend: str = "yolo",
        eye_blend: float = 0.5,
        detector=None,
    ):
        """Build the pipeline.

        Args:
            max_angle: Forwarded to ``HeadPoseEstimator``.
            alpha: Weight of the head-pose score in the raw focus score.
            beta: Weight of the eye score in the raw focus score.
            threshold: Smoothed score at or above this counts as focused.
            eye_model_path: Path to an eye classifier; ignored (treated as
                ``None``) when the path does not exist.
            eye_backend: Backend name forwarded to ``load_eye_classifier``.
            eye_blend: Share of the classifier score in the blended eye score;
                the geometric score gets ``1 - eye_blend``.
            detector: Existing face-mesh detector to reuse. When ``None`` the
                pipeline creates its own and closes it in :meth:`close`.
        """
        # Ownership and fallback must use the same test: with `detector or ...`
        # a falsy non-None detector would be replaced yet never closed.
        self._owns_detector = detector is None
        self.detector = FaceMeshDetector() if self._owns_detector else detector
        self.head_pose = HeadPoseEstimator(max_angle=max_angle)
        self.eye_scorer = EyeBehaviourScorer()
        self.alpha = alpha
        self.beta = beta
        self.threshold = threshold
        self.eye_blend = eye_blend

        self.eye_classifier = load_eye_classifier(
            path=eye_model_path if eye_model_path and os.path.exists(eye_model_path) else None,
            backend=eye_backend,
            device="cpu",
        )
        self._has_eye_model = not isinstance(self.eye_classifier, GeometricOnlyClassifier)
        if self._has_eye_model:
            print(f"[PIPELINE] Eye model: {self.eye_classifier.name}")
        self._smoother = _OutputSmoother()

    def process_frame(self, bgr_frame: np.ndarray) -> dict:
        """Score one frame.

        Args:
            bgr_frame: Image array; its first two dimensions are read as
                height and width.

        Returns:
            Dict with keys ``landmarks``, ``s_face``, ``s_eye``, ``raw_score``
            (the smoothed score), ``is_focused``, ``yaw``, ``pitch``, ``roll``,
            ``mar``, ``is_yawning``, ``left_bbox`` and ``right_bbox``. When no
            face is found only ``raw_score`` and ``is_focused`` are updated
            from their defaults.
        """
        landmarks = self.detector.process(bgr_frame)
        h, w = bgr_frame.shape[:2]
        out = {
            "landmarks": landmarks,
            "s_face": 0.0,
            "s_eye": 0.0,
            "raw_score": 0.0,
            "is_focused": False,
            "yaw": None,
            "pitch": None,
            "roll": None,
            "mar": None,
            "is_yawning": False,
            "left_bbox": None,
            "right_bbox": None,
        }

        if landmarks is None:
            # No face: let the smoother hold or decay the previous score.
            smoothed = self._smoother.update(0.0, False)
            out["raw_score"] = smoothed
            out["is_focused"] = smoothed >= self.threshold
            return out

        angles = self.head_pose.estimate(landmarks, w, h)
        if angles is not None:
            out["yaw"], out["pitch"], out["roll"] = angles
        out["s_face"] = self.head_pose.score(landmarks, w, h)

        s_eye_geo = self.eye_scorer.score(landmarks)
        if self._has_eye_model:
            left_crop, right_crop, left_bbox, right_bbox = extract_eye_crops(bgr_frame, landmarks)
            out["left_bbox"] = left_bbox
            out["right_bbox"] = right_bbox
            s_eye_model = self.eye_classifier.predict_score([left_crop, right_crop])
            out["s_eye"] = (1.0 - self.eye_blend) * s_eye_geo + self.eye_blend * s_eye_model
        else:
            out["s_eye"] = s_eye_geo

        out["mar"] = compute_mar(landmarks)
        out["is_yawning"] = out["mar"] > MAR_YAWN_THRESHOLD

        raw = self.alpha * out["s_face"] + self.beta * out["s_eye"]
        if out["is_yawning"]:
            # A yawn vetoes the frame regardless of pose and eye scores.
            raw = 0.0

        out["raw_score"] = self._smoother.update(raw, True)
        out["is_focused"] = out["raw_score"] >= self.threshold
        return out

    @property
    def has_eye_model(self) -> bool:
        """True when a non-geometric eye classifier was loaded."""
        return self._has_eye_model

    def close(self):
        """Close the detector if this pipeline created it."""
        if self._owns_detector:
            self.detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
def _latest_model_artifacts(model_dir):
model_files = sorted(glob.glob(os.path.join(model_dir, "model_*.joblib")))
if not model_files:
model_files = sorted(glob.glob(os.path.join(model_dir, "mlp_*.joblib")))
if not model_files:
return None, None, None
basename = os.path.basename(model_files[-1])
for prefix in ("model_", "mlp_"):
if basename.startswith(prefix):
tag = basename[len(prefix) :].replace(".joblib", "")
break
scaler_path = os.path.join(model_dir, f"scaler_{tag}.joblib")
meta_path = os.path.join(model_dir, f"meta_{tag}.npz")
if not os.path.isfile(scaler_path) or not os.path.isfile(meta_path):
return None, None, None
return model_files[-1], scaler_path, meta_path
def _load_hybrid_config(model_dir: str, config_path: str | None = None):
    """Read the hybrid-pipeline settings, falling back to the built-in defaults.

    Args:
        model_dir: Directory searched for ``hybrid_focus_config.json`` when
            ``config_path`` is not given.
        config_path: Explicit path to the JSON config file.

    Returns:
        ``(config, path)``: the settings dict and the path they were read
        from, or ``None`` as the path when the file is missing and the
        defaults are returned unchanged.

    Raises:
        ValueError: If ``w_mlp + w_geo`` is not positive.
    """
    resolved = config_path or os.path.join(model_dir, "hybrid_focus_config.json")
    settings = dict(DEFAULT_HYBRID_CONFIG)

    if not os.path.isfile(resolved):
        print(f"[HYBRID] No config found at {resolved}; using defaults")
        return settings, None

    with open(resolved, "r", encoding="utf-8") as handle:
        overrides = json.load(handle)

    # Only keys known from the defaults are accepted; anything else is ignored.
    settings.update(
        {key: overrides[key] for key in DEFAULT_HYBRID_CONFIG if key in overrides}
    )

    w_mlp = float(settings["w_mlp"])
    w_geo = float(settings["w_geo"])
    total = w_mlp + w_geo
    if total <= 0:
        raise ValueError("[HYBRID] Invalid config: w_mlp + w_geo must be > 0")
    # Rescale so the two blend weights sum to 1.
    settings["w_mlp"] = w_mlp / total
    settings["w_geo"] = w_geo / total

    settings["threshold"] = float(settings["threshold"])
    settings["use_yawn_veto"] = bool(settings["use_yawn_veto"])
    for key in ("geo_face_weight", "geo_eye_weight", "mar_yawn_threshold"):
        settings[key] = float(settings[key])

    print(f"[HYBRID] Loaded config: {resolved}")
    return settings, resolved
class MLPPipeline:
    """Focus pipeline driven by a saved classifier on engineered features.

    Usable as a context manager; leaving the ``with`` block calls :meth:`close`.
    """

    def __init__(self, model_dir=None, detector=None):
        """Load the newest checkpoint and set up per-stream state.

        Args:
            model_dir: Directory holding the model, scaler and meta files;
                defaults to ``<project root>/checkpoints``.
            detector: Existing face-mesh detector to reuse. When ``None`` the
                pipeline creates its own and closes it in :meth:`close`.

        Raises:
            FileNotFoundError: If no complete artifact set is found.
        """
        if model_dir is None:
            model_dir = os.path.join(_PROJECT_ROOT, "checkpoints")

        mlp_path, scaler_path, meta_path = _latest_model_artifacts(model_dir)
        if mlp_path is None:
            raise FileNotFoundError(f"No MLP artifacts in {model_dir}")

        # NOTE(review): joblib.load and np.load(allow_pickle=True) unpickle
        # arbitrary objects; only point this at trusted checkpoints.
        self._mlp = joblib.load(mlp_path)
        self._scaler = joblib.load(scaler_path)
        # NpzFile keeps the archive open until closed; read what is needed
        # inside a context manager so the handle is released.
        with np.load(meta_path, allow_pickle=True) as meta:
            self._feature_names = list(meta["feature_names"])
            norm_feats = list(meta["norm_features"]) if "norm_features" in meta else []

        self._engine = _RuntimeFeatureEngine(FEATURE_NAMES, norm_features=norm_feats)
        ext_names = self._engine.extended_names
        # Columns of the extended vector that the model expects, in model order.
        self._indices = [ext_names.index(n) for n in self._feature_names]

        # Ownership and fallback must use the same test: with `detector or ...`
        # a falsy non-None detector would be replaced yet never closed.
        self._owns_detector = detector is None
        self._detector = FaceMeshDetector() if self._owns_detector else detector
        self._head_pose = HeadPoseEstimator()
        self.head_pose = self._head_pose  # public alias of the same estimator
        self._eye_scorer = EyeBehaviourScorer()
        self._temporal = TemporalTracker()
        self._smoother = _OutputSmoother()
        self._threshold = 0.5
        print(f"[MLP] Loaded {mlp_path} | {len(self._feature_names)} features")

    def process_frame(self, bgr_frame):
        """Score one frame.

        Args:
            bgr_frame: Image array; its first two dimensions are read as
                height and width.

        Returns:
            Dict with keys ``landmarks``, ``is_focused``, ``s_face``,
            ``s_eye``, ``raw_score`` (the smoothed score), ``mlp_prob``,
            ``mar``, ``yaw``, ``pitch`` and ``roll``. When no face is found
            only ``raw_score`` and ``is_focused`` are updated from their
            defaults.
        """
        landmarks = self._detector.process(bgr_frame)
        h, w = bgr_frame.shape[:2]
        out = {
            "landmarks": landmarks,
            "is_focused": False,
            "s_face": 0.0,
            "s_eye": 0.0,
            "raw_score": 0.0,
            "mlp_prob": 0.0,
            "mar": None,
            "yaw": None,
            "pitch": None,
            "roll": None,
        }

        if landmarks is None:
            # No face: let the smoother hold or decay the previous score.
            smoothed = self._smoother.update(0.0, False)
            out["raw_score"] = smoothed
            out["is_focused"] = smoothed >= self._threshold
            return out

        vec = extract_features(landmarks, w, h, self._head_pose, self._eye_scorer, self._temporal)
        vec = _clip_features(vec)

        # Reported values are the clipped ones fed to the model.
        for key in ("yaw", "pitch", "roll", "s_face", "s_eye", "mar"):
            out[key] = float(vec[_FEAT_IDX[key]])

        ext_vec = self._engine.transform(vec)
        X = ext_vec[self._indices].reshape(1, -1).astype(np.float64)
        X_sc = self._scaler.transform(X)
        if hasattr(self._mlp, "predict_proba"):
            mlp_prob = float(self._mlp.predict_proba(X_sc)[0, 1])
        else:
            # Models without probabilities contribute a hard 0/1 vote.
            mlp_prob = float(self._mlp.predict(X_sc)[0] == 1)
        out["mlp_prob"] = float(np.clip(mlp_prob, 0.0, 1.0))

        out["raw_score"] = self._smoother.update(out["mlp_prob"], True)
        out["is_focused"] = out["raw_score"] >= self._threshold
        return out

    def close(self):
        """Close the detector if this pipeline created it."""
        if self._owns_detector:
            self._detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
class HybridFocusPipeline:
    """Focus pipeline blending a saved classifier's probability with a geometric score.

    The final score is ``w_mlp * mlp_prob + w_geo * geo_score`` with the
    weights taken from the hybrid config, then smoothed over time. Usable as a
    context manager; leaving the ``with`` block calls :meth:`close`.
    """

    def __init__(
        self,
        model_dir=None,
        config_path: str | None = None,
        eye_model_path: str | None = None,
        eye_backend: str = "yolo",
        eye_blend: float = 0.5,
        max_angle: float = 22.0,
        detector=None,
    ):
        """Load model artifacts, config and optional eye classifier.

        Args:
            model_dir: Directory holding the model, scaler and meta files;
                defaults to ``<project root>/checkpoints``.
            config_path: Explicit hybrid config file; when omitted the config
                is looked up inside ``model_dir``.
            eye_model_path: Path to an eye classifier; ignored (treated as
                ``None``) when the path does not exist.
            eye_backend: Backend name forwarded to ``load_eye_classifier``.
            eye_blend: Share of the classifier score in the blended eye score;
                the geometric score gets ``1 - eye_blend``.
            max_angle: Forwarded to ``HeadPoseEstimator``.
            detector: Existing face-mesh detector to reuse. When ``None`` the
                pipeline creates its own and closes it in :meth:`close`.

        Raises:
            FileNotFoundError: If no complete artifact set is found.
        """
        if model_dir is None:
            model_dir = os.path.join(_PROJECT_ROOT, "checkpoints")

        mlp_path, scaler_path, meta_path = _latest_model_artifacts(model_dir)
        if mlp_path is None:
            raise FileNotFoundError(f"No MLP artifacts in {model_dir}")

        # NOTE(review): joblib.load and np.load(allow_pickle=True) unpickle
        # arbitrary objects; only point this at trusted checkpoints.
        self._mlp = joblib.load(mlp_path)
        self._scaler = joblib.load(scaler_path)
        # NpzFile keeps the archive open until closed; read what is needed
        # inside a context manager so the handle is released.
        with np.load(meta_path, allow_pickle=True) as meta:
            self._feature_names = list(meta["feature_names"])
            norm_feats = list(meta["norm_features"]) if "norm_features" in meta else []

        self._engine = _RuntimeFeatureEngine(FEATURE_NAMES, norm_features=norm_feats)
        ext_names = self._engine.extended_names
        # Columns of the extended vector that the model expects, in model order.
        self._indices = [ext_names.index(n) for n in self._feature_names]

        self._cfg, self._cfg_path = _load_hybrid_config(model_dir=model_dir, config_path=config_path)

        # Ownership and fallback must use the same test: with `detector or ...`
        # a falsy non-None detector would be replaced yet never closed.
        self._owns_detector = detector is None
        self._detector = FaceMeshDetector() if self._owns_detector else detector
        self._head_pose = HeadPoseEstimator(max_angle=max_angle)
        self._eye_scorer = EyeBehaviourScorer()
        self._temporal = TemporalTracker()
        self._eye_blend = eye_blend

        self.eye_classifier = load_eye_classifier(
            path=eye_model_path if eye_model_path and os.path.exists(eye_model_path) else None,
            backend=eye_backend,
            device="cpu",
        )
        self._has_eye_model = not isinstance(self.eye_classifier, GeometricOnlyClassifier)
        if self._has_eye_model:
            print(f"[HYBRID] Eye model: {self.eye_classifier.name}")

        self.head_pose = self._head_pose  # public alias of the same estimator
        self._smoother = _OutputSmoother()

        print(
            f"[HYBRID] Loaded {mlp_path} | {len(self._feature_names)} features | "
            f"w_mlp={self._cfg['w_mlp']:.2f}, w_geo={self._cfg['w_geo']:.2f}, "
            f"threshold={self._cfg['threshold']:.2f}"
        )

    @property
    def has_eye_model(self) -> bool:
        """True when a non-geometric eye classifier was loaded."""
        return self._has_eye_model

    @property
    def config(self) -> dict:
        """A copy of the active hybrid settings."""
        return dict(self._cfg)

    def process_frame(self, bgr_frame: np.ndarray) -> dict:
        """Score one frame.

        Args:
            bgr_frame: Image array; its first two dimensions are read as
                height and width.

        Returns:
            Dict with keys ``landmarks``, ``is_focused``, ``focus_score`` and
            ``raw_score`` (both the smoothed blended score), ``mlp_prob``,
            ``geo_score``, ``s_face``, ``s_eye``, ``mar``, ``is_yawning``,
            ``yaw``, ``pitch``, ``roll``, ``left_bbox`` and ``right_bbox``.
            When no face is found only the score fields and ``is_focused``
            are updated from their defaults.
        """
        landmarks = self._detector.process(bgr_frame)
        h, w = bgr_frame.shape[:2]
        out = {
            "landmarks": landmarks,
            "is_focused": False,
            "focus_score": 0.0,
            "mlp_prob": 0.0,
            "geo_score": 0.0,
            "raw_score": 0.0,
            "s_face": 0.0,
            "s_eye": 0.0,
            "mar": None,
            "is_yawning": False,
            "yaw": None,
            "pitch": None,
            "roll": None,
            "left_bbox": None,
            "right_bbox": None,
        }

        if landmarks is None:
            # No face: let the smoother hold or decay the previous score.
            smoothed = self._smoother.update(0.0, False)
            out["focus_score"] = smoothed
            out["raw_score"] = smoothed
            out["is_focused"] = smoothed >= self._cfg["threshold"]
            return out

        angles = self._head_pose.estimate(landmarks, w, h)
        if angles is not None:
            out["yaw"], out["pitch"], out["roll"] = angles
        out["s_face"] = self._head_pose.score(landmarks, w, h)

        s_eye_geo = self._eye_scorer.score(landmarks)
        if self._has_eye_model:
            left_crop, right_crop, left_bbox, right_bbox = extract_eye_crops(bgr_frame, landmarks)
            out["left_bbox"] = left_bbox
            out["right_bbox"] = right_bbox
            s_eye_model = self.eye_classifier.predict_score([left_crop, right_crop])
            out["s_eye"] = (1.0 - self._eye_blend) * s_eye_geo + self._eye_blend * s_eye_model
        else:
            out["s_eye"] = s_eye_geo

        geo_score = (
            self._cfg["geo_face_weight"] * out["s_face"]
            + self._cfg["geo_eye_weight"] * out["s_eye"]
        )
        geo_score = float(np.clip(geo_score, 0.0, 1.0))

        out["mar"] = compute_mar(landmarks)
        out["is_yawning"] = out["mar"] > self._cfg["mar_yawn_threshold"]
        if self._cfg["use_yawn_veto"] and out["is_yawning"]:
            # The veto zeroes only the geometric part; the MLP part still counts.
            geo_score = 0.0
        out["geo_score"] = geo_score

        # Values already computed above are handed to the extractor. Note that
        # the eye score passed on is the geometric one, not the blended one.
        pre = {
            "angles": angles,
            "s_face": out["s_face"],
            "s_eye": s_eye_geo,
            "mar": out["mar"],
        }
        vec = extract_features(
            landmarks, w, h, self._head_pose, self._eye_scorer, self._temporal, _pre=pre
        )
        vec = _clip_features(vec)

        ext_vec = self._engine.transform(vec)
        X = ext_vec[self._indices].reshape(1, -1).astype(np.float64)
        X_sc = self._scaler.transform(X)
        if hasattr(self._mlp, "predict_proba"):
            mlp_prob = float(self._mlp.predict_proba(X_sc)[0, 1])
        else:
            # Models without probabilities contribute a hard 0/1 vote.
            mlp_prob = float(self._mlp.predict(X_sc)[0] == 1)
        out["mlp_prob"] = float(np.clip(mlp_prob, 0.0, 1.0))

        focus_score = self._cfg["w_mlp"] * out["mlp_prob"] + self._cfg["w_geo"] * out["geo_score"]
        out["focus_score"] = self._smoother.update(float(np.clip(focus_score, 0.0, 1.0)), True)
        out["raw_score"] = out["focus_score"]
        out["is_focused"] = out["focus_score"] >= self._cfg["threshold"]
        return out

    def close(self):
        """Close the detector if this pipeline created it."""
        if self._owns_detector:
            self._detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
# ---------------------------------------------------------------------------
# GRU Pipeline
# ---------------------------------------------------------------------------
def _load_gru_artifacts(model_dir=None):
if model_dir is None:
model_dir = os.path.join(_PROJECT_ROOT, "checkpoints")
pt_path = os.path.join(model_dir, "gru_best.pt")
scaler_path = os.path.join(model_dir, "gru_scaler_best.npz")
meta_path = os.path.join(model_dir, "gru_meta_best.npz")
if not all(os.path.isfile(p) for p in [pt_path, scaler_path, meta_path]):
return None, None, None
return pt_path, scaler_path, meta_path
class _AttentionGRU:
def __init__(self, pt_path, input_size, hidden_size=64, num_layers=2, dropout=0.3):
import torch
import torch.nn as nn
class _GRUNet(nn.Module):
def __init__(self, in_sz, h_sz, n_layers, drop):
super().__init__()
self.gru = nn.GRU(
input_size=in_sz, hidden_size=h_sz,
num_layers=n_layers, batch_first=True,
dropout=drop if n_layers > 1 else 0.0,
)
self.classifier = nn.Sequential(
nn.Dropout(drop),
nn.Linear(h_sz, 32),
nn.ReLU(),
nn.Dropout(drop * 0.5),
nn.Linear(32, 1),
)
def forward(self, x):
gru_out, _ = self.gru(x)
return self.classifier(gru_out[:, -1, :])
self._device = torch.device("cpu")
self._model = _GRUNet(input_size, hidden_size, num_layers, dropout)
checkpoint = torch.load(pt_path, map_location=self._device, weights_only=False)
if isinstance(checkpoint, dict) and "model_state_dict" in checkpoint:
self._model.load_state_dict(checkpoint["model_state_dict"])
else:
self._model.load_state_dict(checkpoint)
self._model.eval()
def predict_proba(self, x_np):
"""x_np: (1, window, features) numpy array -> float probability of focused."""
import torch
with torch.no_grad():
t = torch.tensor(x_np, dtype=torch.float32, device=self._device)
logit = self._model(t)
prob = torch.sigmoid(logit).item()
return prob
class GRUPipeline:
    """Focus pipeline driven by a GRU over a sliding window of scaled features.

    Usable as a context manager; leaving the ``with`` block calls :meth:`close`.
    """

    def __init__(self, model_dir=None, detector=None):
        """Load the GRU checkpoint, its scaler and metadata.

        Args:
            model_dir: Directory holding ``gru_best.pt``,
                ``gru_scaler_best.npz`` and ``gru_meta_best.npz``; defaults to
                ``<project root>/checkpoints``.
            detector: Existing face-mesh detector to reuse. When ``None`` the
                pipeline creates its own and closes it in :meth:`close`.

        Raises:
            FileNotFoundError: If any of the three artifact files is missing.
        """
        pt_path, scaler_path, meta_path = _load_gru_artifacts(model_dir)
        if pt_path is None:
            d = model_dir or os.path.join(_PROJECT_ROOT, "checkpoints")
            raise FileNotFoundError(f"No GRU artifacts in {d}")

        # NpzFile keeps the archive open until closed; read what is needed
        # inside a context manager so the handle is released.
        # NOTE(review): allow_pickle=True unpickles arbitrary objects; only
        # load trusted checkpoints.
        with np.load(meta_path, allow_pickle=True) as meta:
            self._feature_names = list(meta["feature_names"])
            self._window_size = int(meta["window_size"])
            hidden_size = int(meta["hidden_size"])
            num_layers = int(meta["num_layers"])
            dropout = float(meta["dropout"])
            self._threshold = float(meta["default_threshold"])

        with np.load(scaler_path) as sc:
            self._sc_mean = sc["mean"]
            self._sc_scale = sc["scale"]

        self._gru = _AttentionGRU(
            pt_path,
            input_size=len(self._feature_names),
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=dropout,
        )

        # Columns of the base feature vector that the model expects, in model order.
        self._feat_indices = [FEATURE_NAMES.index(n) for n in self._feature_names]

        # Ownership and fallback must use the same test: with `detector or ...`
        # a falsy non-None detector would be replaced yet never closed.
        self._owns_detector = detector is None
        self._detector = FaceMeshDetector() if self._owns_detector else detector
        self._head_pose = HeadPoseEstimator()
        self.head_pose = self._head_pose  # public alias of the same estimator
        self._eye_scorer = EyeBehaviourScorer()
        self._temporal = TemporalTracker()
        self._smoother = _OutputSmoother(alpha=0.6, grace_frames=10)

        # Most recent scaled feature rows, oldest first.
        self._buffer = collections.deque(maxlen=self._window_size)

        print(
            f"[GRU] Loaded {pt_path} | {len(self._feature_names)} features | "
            f"window={self._window_size} | threshold={self._threshold:.3f}"
        )

    def process_frame(self, bgr_frame):
        """Score one frame.

        Args:
            bgr_frame: Image array; its first two dimensions are read as
                height and width.

        Returns:
            Dict with keys ``landmarks``, ``is_focused``, ``raw_score`` (the
            smoothed score), ``gru_prob``, ``s_face``, ``s_eye``, ``mar``,
            ``yaw``, ``pitch`` and ``roll``. When no face is found only
            ``raw_score`` and ``is_focused`` are updated from their defaults,
            and the feature window is left untouched.
        """
        landmarks = self._detector.process(bgr_frame)
        h, w = bgr_frame.shape[:2]
        out = {
            "landmarks": landmarks,
            "is_focused": False,
            "raw_score": 0.0,
            "gru_prob": 0.0,
            "s_face": 0.0,
            "s_eye": 0.0,
            "mar": None,
            "yaw": None,
            "pitch": None,
            "roll": None,
        }

        if landmarks is None:
            # No face: let the smoother hold or decay the previous score.
            smoothed = self._smoother.update(0.0, False)
            out["raw_score"] = smoothed
            out["is_focused"] = smoothed >= self._threshold
            return out

        vec = extract_features(landmarks, w, h, self._head_pose, self._eye_scorer, self._temporal)
        vec = _clip_features(vec)

        # Reported values are the clipped ones fed to the model.
        for key in ("yaw", "pitch", "roll", "s_face", "s_eye", "mar"):
            out[key] = float(vec[_FEAT_IDX[key]])

        selected = vec[self._feat_indices].astype(np.float64)
        # Floor the scale so a zero-variance feature does not divide by zero.
        scaled = (selected - self._sc_mean) / np.maximum(self._sc_scale, 1e-8)
        scaled_f32 = scaled.astype(np.float32)

        # Pad buffer on first frame so GRU can predict immediately
        if len(self._buffer) == 0:
            self._buffer.extend([scaled_f32] * self._window_size)
        else:
            self._buffer.append(scaled_f32)

        window = np.array(self._buffer)[np.newaxis, :, :]  # (1, W, F)
        gru_prob = self._gru.predict_proba(window)
        out["gru_prob"] = float(np.clip(gru_prob, 0.0, 1.0))

        out["raw_score"] = self._smoother.update(out["gru_prob"], True)
        out["is_focused"] = out["raw_score"] >= self._threshold
        return out

    def close(self):
        """Close the detector if this pipeline created it."""
        if self._owns_detector:
            self._detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()