# FocusGuard interactive webcam demo.
| import argparse | |
| import os | |
| import sys | |
| import time | |
| import cv2 | |
| import numpy as np | |
| from mediapipe.tasks.python.vision import FaceLandmarksConnections | |
| _PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | |
| if _PROJECT_ROOT not in sys.path: | |
| sys.path.insert(0, _PROJECT_ROOT) | |
| from ui.pipeline import ( | |
| FaceMeshPipeline, MLPPipeline, HybridFocusPipeline, GRUPipeline, | |
| _load_gru_artifacts, _latest_model_artifacts, | |
| ) | |
| from models.face_mesh import FaceMeshDetector | |
# HUD font used for all on-screen text.
FONT = cv2.FONT_HERSHEY_SIMPLEX

# Colors in OpenCV BGR order.
CYAN = (255, 255, 0)
GREEN = (0, 255, 0)
MAGENTA = (255, 0, 255)
ORANGE = (0, 165, 255)
RED = (0, 0, 255)
WHITE = (255, 255, 255)
YELLOW = (0, 255, 255)
LIGHT_GREEN = (144, 238, 144)

# Edge lists (start, end landmark index) from MediaPipe's face-landmark
# connection tables.
_TESSELATION = [(c.start, c.end) for c in FaceLandmarksConnections.FACE_LANDMARKS_TESSELATION]
_CONTOURS = [(c.start, c.end) for c in FaceLandmarksConnections.FACE_LANDMARKS_CONTOURS]

# Ordered landmark index paths drawn as open polylines in draw_contours().
# Presumably these follow the MediaPipe FaceMesh topology — verify against
# the canonical landmark map if indices are changed.
_LEFT_EYEBROW = [70, 63, 105, 66, 107, 55, 65, 52, 53, 46]
_RIGHT_EYEBROW = [300, 293, 334, 296, 336, 285, 295, 282, 283, 276]
_NOSE_BRIDGE = [6, 197, 195, 5, 4, 1, 19, 94, 2]
_LIPS_OUTER = [61, 146, 91, 181, 84, 17, 314, 405, 321, 375, 291, 409, 270, 269, 267, 0, 37, 39, 40, 185, 61]
_LIPS_INNER = [78, 95, 88, 178, 87, 14, 317, 402, 318, 324, 308, 415, 310, 311, 312, 13, 82, 81, 80, 191, 78]

# Six-point sets drawn as dots per eye (names suggest EAR sample points —
# TODO confirm against FaceMeshDetector's eye-aspect-ratio computation).
_LEFT_EAR_POINTS = [33, 160, 158, 133, 153, 145]
_RIGHT_EAR_POINTS = [362, 385, 387, 263, 373, 380]

# Mesh overlay modes, cycled with the 'm' key.
MESH_FULL = 0
MESH_CONTOURS = 1
MESH_OFF = 2
_MESH_NAMES = ["FULL MESH", "CONTOURS", "MESH OFF"]

# Pipeline modes, selected with keys 1-4.
MODE_GEO = 0
MODE_MLP = 1
MODE_GRU = 2
MODE_HYBRID = 3
_MODE_NAMES = ["GEOMETRIC", "MLP", "GRU", "HYBRID"]
_MODE_KEYS = {ord("1"): MODE_GEO, ord("2"): MODE_MLP, ord("3"): MODE_GRU, ord("4"): MODE_HYBRID}
| def _lm_to_px(landmarks, idx, w, h): | |
| return (int(landmarks[idx, 0] * w), int(landmarks[idx, 1] * h)) | |
def draw_tessellation(frame, landmarks, w, h):
    """Blend a faint full-face tessellation mesh onto `frame` in place."""
    layer = frame.copy()
    gray = (200, 200, 200)
    for start, end in _TESSELATION:
        a = _lm_to_px(landmarks, start, w, h)
        b = _lm_to_px(landmarks, end, w, h)
        cv2.line(layer, a, b, gray, 1, cv2.LINE_AA)
    # Composite 30% mesh over 70% original, written back into frame.
    cv2.addWeighted(layer, 0.3, frame, 0.7, 0, frame)
def draw_contours(frame, landmarks, w, h):
    """Draw facial outlines in place: MediaPipe contours, eyebrows, nose
    bridge, and lips.

    `landmarks` holds normalized coordinates; `w`/`h` are the frame size
    used to convert them to pixels.
    """

    def _polyline(indices, color, thickness):
        # Connect each consecutive pair of landmark indices with a segment.
        for i in range(len(indices) - 1):
            pt1 = _lm_to_px(landmarks, indices[i], w, h)
            pt2 = _lm_to_px(landmarks, indices[i + 1], w, h)
            cv2.line(frame, pt1, pt2, color, thickness, cv2.LINE_AA)

    # MediaPipe's canonical contour edge set (unordered pairs, drawn thin).
    for conn in _CONTOURS:
        pt1 = _lm_to_px(landmarks, conn[0], w, h)
        pt2 = _lm_to_px(landmarks, conn[1], w, h)
        cv2.line(frame, pt1, pt2, CYAN, 1, cv2.LINE_AA)

    # Feature paths, each an ordered index list (same drawing loop for all —
    # previously duplicated four times inline).
    _polyline(_LEFT_EYEBROW, LIGHT_GREEN, 2)
    _polyline(_RIGHT_EYEBROW, LIGHT_GREEN, 2)
    _polyline(_NOSE_BRIDGE, ORANGE, 1)
    _polyline(_LIPS_OUTER, MAGENTA, 1)
    _polyline(_LIPS_INNER, (200, 0, 200), 1)
def draw_eyes_and_irises(frame, landmarks, w, h):
    """Draw eye outlines, per-eye dot markers, iris circles, and gaze rays.

    Modifies `frame` in place. Assumes `landmarks` is an (N, 2+) array of
    normalized [0, 1] coordinates — TODO confirm against FaceMeshDetector's
    output format.
    """
    # Closed green polygon around each eye.
    left_pts = np.array(
        [_lm_to_px(landmarks, i, w, h) for i in FaceMeshDetector.LEFT_EYE_INDICES],
        dtype=np.int32,
    )
    cv2.polylines(frame, [left_pts], True, GREEN, 2, cv2.LINE_AA)
    right_pts = np.array(
        [_lm_to_px(landmarks, i, w, h) for i in FaceMeshDetector.RIGHT_EYE_INDICES],
        dtype=np.int32,
    )
    cv2.polylines(frame, [right_pts], True, GREEN, 2, cv2.LINE_AA)
    # Yellow dots on the six sample points of each eye.
    for indices in [_LEFT_EAR_POINTS, _RIGHT_EAR_POINTS]:
        for idx in indices:
            pt = _lm_to_px(landmarks, idx, w, h)
            cv2.circle(frame, pt, 3, YELLOW, -1, cv2.LINE_AA)
    # Per eye: iris circle plus a short gaze ray. The hard-coded indices
    # (133/33 and 362/263) are the eye corners used to locate each eye's
    # geometric center.
    for iris_indices, eye_inner, eye_outer in [
        (FaceMeshDetector.LEFT_IRIS_INDICES, 133, 33),
        (FaceMeshDetector.RIGHT_IRIS_INDICES, 362, 263),
    ]:
        iris_pts = np.array(
            [_lm_to_px(landmarks, i, w, h) for i in iris_indices],
            dtype=np.int32,
        )
        # Assumes the first iris landmark is the pupil center and the next
        # four lie on the iris rim — TODO confirm index order.
        center = iris_pts[0]
        if len(iris_pts) >= 5:
            radii = [np.linalg.norm(iris_pts[j] - center) for j in range(1, 5)]
            # Mean rim distance, clamped so the circle stays visible.
            radius = max(int(np.mean(radii)), 2)
            cv2.circle(frame, tuple(center), radius, MAGENTA, 2, cv2.LINE_AA)
            cv2.circle(frame, tuple(center), 2, WHITE, -1, cv2.LINE_AA)
        # Midpoint of the two eye corners, in pixels.
        eye_center_x = (landmarks[eye_inner, 0] + landmarks[eye_outer, 0]) / 2.0
        eye_center_y = (landmarks[eye_inner, 1] + landmarks[eye_outer, 1]) / 2.0
        eye_center = (int(eye_center_x * w), int(eye_center_y * h))
        # Red ray from the pupil, extending the (eye-center -> pupil) offset
        # 3x past the pupil as a coarse gaze-direction indicator.
        dx = center[0] - eye_center[0]
        dy = center[1] - eye_center[1]
        gaze_end = (int(center[0] + dx * 3), int(center[1] + dy * 3))
        cv2.line(frame, tuple(center), gaze_end, RED, 1, cv2.LINE_AA)
def _build_pipelines(args, model_dir, detector):
    """Construct every pipeline whose artifacts are available.

    Returns (pipelines, available_modes): a dict keyed by MODE_* constant
    and the ordered list of modes that initialized successfully. MODE_GEO
    needs no checkpoint and is always present; the others are best-effort
    (a failed load is reported and skipped, not fatal).
    """
    pipelines = {}
    available_modes = []

    pipelines[MODE_GEO] = FaceMeshPipeline(
        max_angle=args.max_angle,
        eye_model_path=args.eye_model,
        eye_backend=args.eye_backend,
        eye_blend=args.eye_blend,
        detector=detector,
    )
    available_modes.append(MODE_GEO)

    mlp_path, _, _ = _latest_model_artifacts(model_dir)
    if mlp_path is not None:
        try:
            pipelines[MODE_MLP] = MLPPipeline(model_dir=model_dir, detector=detector)
            available_modes.append(MODE_MLP)
        except Exception as e:
            print(f"[DEMO] MLP unavailable: {e}")

    try:
        pipelines[MODE_HYBRID] = HybridFocusPipeline(
            model_dir=model_dir,
            eye_model_path=args.eye_model,
            eye_backend=args.eye_backend,
            eye_blend=args.eye_blend,
            max_angle=args.max_angle,
            detector=detector,
        )
        available_modes.append(MODE_HYBRID)
    except Exception as e:
        print(f"[DEMO] Hybrid unavailable: {e}")

    gru_arts = _load_gru_artifacts(model_dir)
    if gru_arts[0] is not None:
        try:
            pipelines[MODE_GRU] = GRUPipeline(model_dir=model_dir, detector=detector)
            available_modes.append(MODE_GRU)
        except Exception as e:
            print(f"[DEMO] GRU unavailable: {e}")

    return pipelines, available_modes


def _detail_text(mode, result):
    """Build the per-mode HUD detail line from a pipeline result dict.

    Returns "" for an unknown mode. Missing score keys default to 0; MAR
    is appended only when present.
    """
    mar_s = f" MAR:{result['mar']:.2f}" if result.get("mar") is not None else ""
    if mode == MODE_GEO:
        sf = result.get("s_face", 0)
        se = result.get("s_eye", 0)
        rs = result.get("raw_score", 0)
        return f"S_face:{sf:.2f} S_eye:{se:.2f}{mar_s} score:{rs:.2f}"
    if mode == MODE_MLP:
        mp = result.get("mlp_prob", 0)
        rs = result.get("raw_score", 0)
        return f"mlp_prob:{mp:.2f} score:{rs:.2f}{mar_s}"
    if mode == MODE_GRU:
        gp = result.get("gru_prob", 0)
        rs = result.get("raw_score", 0)
        return f"gru_prob:{gp:.2f} score:{rs:.2f}{mar_s}"
    if mode == MODE_HYBRID:
        mp = result.get("mlp_prob", 0)
        gs = result.get("geo_score", 0)
        fs = result.get("focus_score", 0)
        return f"focus:{fs:.2f} mlp:{mp:.2f} geo:{gs:.2f}{mar_s}"
    return ""


def main():
    """Run the interactive webcam demo.

    Opens the camera, runs the active focus pipeline on every frame, draws
    landmark overlays plus a HUD, and handles keyboard input:
    q=quit, m=cycle mesh overlay, 1-4=switch pipeline mode.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--camera", type=int, default=0)
    parser.add_argument("--mlp-dir", type=str, default=None)
    parser.add_argument("--max-angle", type=float, default=22.0)
    parser.add_argument("--eye-model", type=str, default=None)
    parser.add_argument("--eye-backend", type=str, default="yolo", choices=["yolo", "geometric"])
    parser.add_argument("--eye-blend", type=float, default=0.5)
    args = parser.parse_args()

    model_dir = args.mlp_dir or os.path.join(_PROJECT_ROOT, "checkpoints")
    detector = FaceMeshDetector()
    pipelines, available_modes = _build_pipelines(args, model_dir, detector)
    current_mode = available_modes[0]
    pipeline = pipelines[current_mode]

    cap = cv2.VideoCapture(args.camera)
    if not cap.isOpened():
        print("[DEMO] ERROR: Cannot open camera")
        return

    mode_hint = " ".join(f"{k+1}:{_MODE_NAMES[k]}" for k in available_modes)
    print(f"[DEMO] Available modes: {mode_hint}")
    print(f"[DEMO] Active: {_MODE_NAMES[current_mode]}")
    print("[DEMO] q=quit m=mesh 1-4=switch mode")

    # monotonic() is immune to wall-clock adjustments (e.g. NTP sync) that
    # would corrupt frame deltas; time.time() can jump backwards.
    prev_time = time.monotonic()
    fps = 0.0
    mesh_mode = MESH_FULL
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            result = pipeline.process_frame(frame)

            # Exponential moving average of instantaneous FPS.
            now = time.monotonic()
            fps = 0.9 * fps + 0.1 * (1.0 / max(now - prev_time, 1e-6))
            prev_time = now

            h, w = frame.shape[:2]
            lm = result["landmarks"]
            if lm is not None:
                if mesh_mode == MESH_FULL:
                    draw_tessellation(frame, lm, w, h)
                    draw_contours(frame, lm, w, h)
                elif mesh_mode == MESH_CONTOURS:
                    draw_contours(frame, lm, w, h)
                draw_eyes_and_irises(frame, lm, w, h)
                if hasattr(pipeline, "head_pose"):
                    pipeline.head_pose.draw_axes(frame, lm)
            # Eye-crop boxes are only present for some pipelines/backends.
            if result.get("left_bbox") and result.get("right_bbox"):
                lx1, ly1, lx2, ly2 = result["left_bbox"]
                rx1, ry1, rx2, ry2 = result["right_bbox"]
                cv2.rectangle(frame, (lx1, ly1), (lx2, ly2), YELLOW, 1)
                cv2.rectangle(frame, (rx1, ry1), (rx2, ry2), YELLOW, 1)

            # --- HUD ---
            status = "FOCUSED" if result["is_focused"] else "NOT FOCUSED"
            status_color = GREEN if result["is_focused"] else RED
            cv2.rectangle(frame, (0, 0), (w, 55), (0, 0, 0), -1)
            cv2.putText(frame, status, (10, 28), FONT, 0.8, status_color, 2, cv2.LINE_AA)
            mode_label = _MODE_NAMES[current_mode]
            cv2.putText(frame, f"{mode_label} {_MESH_NAMES[mesh_mode]} FPS:{fps:.0f}",
                        (w - 340, 28), FONT, 0.45, WHITE, 1, cv2.LINE_AA)
            cv2.putText(frame, _detail_text(current_mode, result),
                        (10, 48), FONT, 0.45, WHITE, 1, cv2.LINE_AA)
            if result.get("is_yawning"):
                cv2.putText(frame, "YAWN", (10, 75), FONT, 0.7, ORANGE, 2, cv2.LINE_AA)
            if result.get("yaw") is not None:
                cv2.putText(
                    frame,
                    f"yaw:{result['yaw']:+.0f} pitch:{result['pitch']:+.0f} roll:{result['roll']:+.0f}",
                    (w - 280, 48), FONT, 0.4, (180, 180, 180), 1, cv2.LINE_AA,
                )
            cv2.putText(frame, f"q:quit m:mesh {mode_hint}",
                        (10, h - 10), FONT, 0.35, (150, 150, 150), 1, cv2.LINE_AA)

            cv2.imshow("FocusGuard", frame)
            key = cv2.waitKey(1) & 0xFF
            if key == ord("q"):
                break
            elif key == ord("m"):
                mesh_mode = (mesh_mode + 1) % 3
                print(f"[DEMO] Mesh: {_MESH_NAMES[mesh_mode]}")
            elif key in _MODE_KEYS:
                requested = _MODE_KEYS[key]
                if requested in pipelines:
                    current_mode = requested
                    pipeline = pipelines[current_mode]
                    print(f"[DEMO] Switched to {_MODE_NAMES[current_mode]}")
                else:
                    print(f"[DEMO] {_MODE_NAMES[requested]} not available (no checkpoint)")
    finally:
        # Release camera/UI first, then per-pipeline resources, then the
        # shared detector (pipelines may still reference it while closing).
        cap.release()
        cv2.destroyAllWindows()
        for p in pipelines.values():
            p.close()
        detector.close()
        print("[DEMO] Done")
# Script entry point: launch the webcam demo.
if __name__ == "__main__":
    main()