"""Live webcam eye-state demo.

Detect a face, crop each eye, run the open/closed classifier, and show the
result on screen.

Requires: opencv-python, ultralytics, mediapipe (pip install mediapipe).
Press 'q' to quit.
"""

import logging
import urllib.request
from pathlib import Path

import cv2
import numpy as np
from ultralytics import YOLO

try:
    import mediapipe as mp

    _mp_has_solutions = hasattr(mp, "solutions")
except ImportError:
    mp = None
    _mp_has_solutions = False

logger = logging.getLogger(__name__)

# New MediaPipe Tasks API (Face Landmarker) eye indices
LEFT_EYE_INDICES_NEW = [263, 249, 390, 373, 374, 380, 381, 382, 362, 466, 388, 387, 386, 385, 384, 398]
RIGHT_EYE_INDICES_NEW = [33, 7, 163, 144, 145, 153, 154, 155, 133, 246, 161, 160, 159, 158, 157, 173]
# Old Face Mesh (solutions) indices
# NOTE(review): the *_OLD sets appear to use the opposite left/right
# convention from the *_NEW sets (33 is in RIGHT_NEW but LEFT_OLD, 263 is in
# LEFT_NEW but RIGHT_OLD), so the "L"/"R" overlay may swap between the two
# backends. Values are left untouched here -- confirm the intended convention.
LEFT_EYE_INDICES_OLD = [33, 160, 158, 133, 153, 144]
RIGHT_EYE_INDICES_OLD = [362, 385, 387, 263, 373, 380]

# Fraction of the eye's bounding box added as margin on every side of the crop.
EYE_PADDING = 0.35

_WINDOW_TITLE = "Eye open/closed (q to quit)"
_FACE_LANDMARKER_URL = (
    "https://storage.googleapis.com/mediapipe-models/face_landmarker/"
    "face_landmarker/float16/1/face_landmarker.task"
)
# Shown when an eye has no prediction. Must be ASCII: OpenCV's Hershey fonts
# draw '?' for any character they cannot render (the original em dash did).
_NO_PREDICTION_LABEL = "-"

# Exception types already reported by _classify_eye, so a persistent failure
# is logged once instead of on every frame.
_seen_classifier_errors: set[type] = set()


def find_weights(project_root: Path) -> Path | None:
    """Return the first existing classifier weights file, or None.

    Looks, in order, at ``weights/best.pt`` and then ``best.pt`` / ``last.pt``
    under ``runs/classify/runs_cls/eye_open_closed_cpu/weights`` inside
    *project_root*.
    """
    run_weights = project_root / "runs" / "classify" / "runs_cls" / "eye_open_closed_cpu" / "weights"
    candidates = [
        project_root / "weights" / "best.pt",
        run_weights / "best.pt",
        run_weights / "last.pt",
    ]
    return next((p for p in candidates if p.is_file()), None)


def get_eye_roi(frame: np.ndarray, landmarks, indices: list[int]) -> np.ndarray | None:
    """Crop a padded bounding box around the given landmarks.

    Args:
        frame: Image array; only its first two dimensions (height, width)
            are used for scaling and clamping.
        landmarks: Indexable collection whose items expose ``.x`` and ``.y``;
            these are multiplied by the frame width/height to get pixels.
        indices: Positions in *landmarks* that outline the eye.

    Returns:
        A copy of the cropped region, or None when the clamped box is empty.
    """
    h, w = frame.shape[:2]
    pts = np.array([(int(landmarks[i].x * w), int(landmarks[i].y * h)) for i in indices])
    x_min, y_min = pts.min(axis=0)
    x_max, y_max = pts.max(axis=0)
    # Pad proportionally, but by at least 8 px so tiny eyes still get context.
    dx = max(int((x_max - x_min) * EYE_PADDING), 8)
    dy = max(int((y_max - y_min) * EYE_PADDING), 8)
    x0 = max(0, x_min - dx)
    y0 = max(0, y_min - dy)
    x1 = min(w, x_max + dx)
    y1 = min(h, y_max + dy)
    if x1 <= x0 or y1 <= y0:
        return None
    return frame[y0:y1, x0:x1].copy()


def _classify_eye(model, roi: np.ndarray | None) -> tuple[str, float] | None:
    """Run the classifier on one eye crop; return (label, confidence) or None."""
    if roi is None or roi.size == 0:
        return None
    try:
        results = model.predict(roi, imgsz=224, device="cpu", verbose=False)
        if not results:
            return None
        probs = results[0].probs
        return model.names[int(probs.top1)], float(probs.top1conf)
    except Exception as exc:
        # Best effort: one bad frame must not kill the live loop, but the
        # failure should not be invisible either. Log each error type once.
        if type(exc) not in _seen_classifier_errors:
            _seen_classifier_errors.add(type(exc))
            logger.warning(
                "Eye classification failed (%s); repeats of this error type are suppressed.",
                type(exc).__name__,
                exc_info=True,
            )
        return None


def _draw_eye_state(frame: np.ndarray, prefix: str, prediction: tuple[str, float] | None, y: int) -> None:
    """Draw one '<prefix>: <label> (<conf>)' overlay line onto *frame* at height *y*."""
    label, conf = prediction if prediction is not None else (_NO_PREDICTION_LABEL, 0.0)
    cv2.putText(
        frame,
        f"{prefix}: {label} ({conf:.0%})",
        (20, y),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.9,
        (0, 255, 0),
        2,
    )


def _process_frame(frame: np.ndarray, model, landmarks, left_indices: list[int], right_indices: list[int]) -> bool:
    """Classify both eyes, draw the overlay, show the frame.

    *landmarks* may be None when no face was detected; both eyes then show the
    placeholder label. Returns False once the user pressed 'q'.
    """
    left = right = None
    if landmarks is not None:
        # Crop from the clean frame before any overlay text is drawn on it.
        left = _classify_eye(model, get_eye_roi(frame, landmarks, left_indices))
        right = _classify_eye(model, get_eye_roi(frame, landmarks, right_indices))
    _draw_eye_state(frame, "L", left, 40)
    _draw_eye_state(frame, "R", right, 80)
    cv2.imshow(_WINDOW_TITLE, frame)
    return cv2.waitKey(1) & 0xFF != ord("q")


def _run_with_solutions(mp, model, cap):
    """Run the live loop using the legacy ``mp.solutions`` Face Mesh API.

    Reads frames from *cap* until a read fails or the user presses 'q'.
    """
    # Context manager so the Face Mesh graph is closed when the loop exits.
    with mp.solutions.face_mesh.FaceMesh(
        static_image_mode=False,
        max_num_faces=1,
        refine_landmarks=True,
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5,
    ) as face_mesh:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = face_mesh.process(rgb)
            lm = results.multi_face_landmarks[0].landmark if results.multi_face_landmarks else None
            if not _process_frame(frame, model, lm, LEFT_EYE_INDICES_OLD, RIGHT_EYE_INDICES_OLD):
                break


def _ensure_face_landmarker_model(project_root: Path) -> Path:
    """Return the path to ``weights/face_landmarker.task``, downloading it if missing."""
    model_path = project_root / "weights" / "face_landmarker.task"
    if model_path.is_file():
        return model_path
    print("Downloading face_landmarker.task ...")
    # weights/ need not exist: the classifier weights may live under runs/.
    model_path.parent.mkdir(parents=True, exist_ok=True)
    # Download next to the target and rename on success, so an interrupted
    # download never leaves a truncated file that passes is_file() next run.
    partial = model_path.with_name(model_path.name + ".part")
    try:
        urllib.request.urlretrieve(_FACE_LANDMARKER_URL, partial)
        partial.replace(model_path)
    except BaseException:
        partial.unlink(missing_ok=True)
        raise
    print("Done.")
    return model_path


def _run_with_tasks(project_root: Path, model, cap):
    """Run the live loop using the MediaPipe Tasks Face Landmarker API.

    Downloads the landmarker model into ``<project_root>/weights`` when it is
    not already there, then reads frames from *cap* until a read fails or the
    user presses 'q'.
    """
    from mediapipe.tasks.python import BaseOptions
    from mediapipe.tasks.python.vision import FaceLandmarker, FaceLandmarkerOptions
    from mediapipe.tasks.python.vision.core import vision_task_running_mode as running_mode
    from mediapipe.tasks.python.vision.core import image as image_lib

    model_path = _ensure_face_landmarker_model(project_root)
    options = FaceLandmarkerOptions(
        base_options=BaseOptions(model_asset_path=str(model_path)),
        running_mode=running_mode.VisionTaskRunningMode.IMAGE,
        num_faces=1,
    )
    ImageFormat = image_lib.ImageFormat
    # Context manager so the landmarker's native resources are released.
    with FaceLandmarker.create_from_options(options) as face_landmarker:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            rgb_contiguous = np.ascontiguousarray(rgb)
            mp_image = image_lib.Image(ImageFormat.SRGB, rgb_contiguous)
            result = face_landmarker.detect(mp_image)
            lm = result.face_landmarks[0] if result.face_landmarks else None
            if not _process_frame(frame, model, lm, LEFT_EYE_INDICES_NEW, RIGHT_EYE_INDICES_NEW):
                break


def main():
    """Locate the weights, open the default webcam, and run the live demo."""
    project_root = Path(__file__).resolve().parent.parent
    weights = find_weights(project_root)
    if weights is None:
        print("Weights not found. Put best.pt in weights/ or runs/.../weights/ (from model team).")
        return
    if mp is None:
        print("MediaPipe required. Install: pip install mediapipe")
        return

    model = YOLO(str(weights))
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Could not open webcam.")
        return

    print("Live eye open/closed on your face. Press 'q' to quit.")
    try:
        # Older MediaPipe builds expose mp.solutions; otherwise use the Tasks API.
        if _mp_has_solutions:
            _run_with_solutions(mp, model, cap)
        else:
            _run_with_tasks(project_root, model, cap)
    finally:
        cap.release()
        cv2.destroyAllWindows()


if __name__ == "__main__":
    main()