Spaces:
Sleeping
Sleeping
| """ | |
| Live webcam: detect face, crop each eye, run open/closed classifier, show on screen. | |
| Requires: opencv-python, ultralytics, mediapipe (pip install mediapipe). | |
| Press 'q' to quit. | |
| """ | |
| import urllib.request | |
| from pathlib import Path | |
| import cv2 | |
| import numpy as np | |
| from ultralytics import YOLO | |
| try: | |
| import mediapipe as mp | |
| _mp_has_solutions = hasattr(mp, "solutions") | |
| except ImportError: | |
| mp = None | |
| _mp_has_solutions = False | |
| # New MediaPipe Tasks API (Face Landmarker) eye indices | |
| LEFT_EYE_INDICES_NEW = [263, 249, 390, 373, 374, 380, 381, 382, 362, 466, 388, 387, 386, 385, 384, 398] | |
| RIGHT_EYE_INDICES_NEW = [33, 7, 163, 144, 145, 153, 154, 155, 133, 246, 161, 160, 159, 158, 157, 173] | |
| # Old Face Mesh (solutions) indices | |
| LEFT_EYE_INDICES_OLD = [33, 160, 158, 133, 153, 144] | |
| RIGHT_EYE_INDICES_OLD = [362, 385, 387, 263, 373, 380] | |
| EYE_PADDING = 0.35 | |
| def find_weights(project_root: Path) -> Path | None: | |
| candidates = [ | |
| project_root / "weights" / "best.pt", | |
| project_root / "runs" / "classify" / "runs_cls" / "eye_open_closed_cpu" / "weights" / "best.pt", | |
| project_root / "runs" / "classify" / "runs_cls" / "eye_open_closed_cpu" / "weights" / "last.pt", | |
| ] | |
| return next((p for p in candidates if p.is_file()), None) | |
| def get_eye_roi(frame: np.ndarray, landmarks, indices: list[int]) -> np.ndarray | None: | |
| h, w = frame.shape[:2] | |
| pts = np.array([(int(landmarks[i].x * w), int(landmarks[i].y * h)) for i in indices]) | |
| x_min, y_min = pts.min(axis=0) | |
| x_max, y_max = pts.max(axis=0) | |
| dx = max(int((x_max - x_min) * EYE_PADDING), 8) | |
| dy = max(int((y_max - y_min) * EYE_PADDING), 8) | |
| x0 = max(0, x_min - dx) | |
| y0 = max(0, y_min - dy) | |
| x1 = min(w, x_max + dx) | |
| y1 = min(h, y_max + dy) | |
| if x1 <= x0 or y1 <= y0: | |
| return None | |
| return frame[y0:y1, x0:x1].copy() | |
| def _run_with_solutions(mp, model, cap): | |
| face_mesh = mp.solutions.face_mesh.FaceMesh( | |
| static_image_mode=False, | |
| max_num_faces=1, | |
| refine_landmarks=True, | |
| min_detection_confidence=0.5, | |
| min_tracking_confidence=0.5, | |
| ) | |
| while True: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| results = face_mesh.process(rgb) | |
| left_label, left_conf = "—", 0.0 | |
| right_label, right_conf = "—", 0.0 | |
| if results.multi_face_landmarks: | |
| lm = results.multi_face_landmarks[0].landmark | |
| for roi, indices, side in [ | |
| (get_eye_roi(frame, lm, LEFT_EYE_INDICES_OLD), LEFT_EYE_INDICES_OLD, "left"), | |
| (get_eye_roi(frame, lm, RIGHT_EYE_INDICES_OLD), RIGHT_EYE_INDICES_OLD, "right"), | |
| ]: | |
| if roi is not None and roi.size > 0: | |
| try: | |
| pred = model.predict(roi, imgsz=224, device="cpu", verbose=False) | |
| if pred: | |
| r = pred[0] | |
| label = model.names[int(r.probs.top1)] | |
| conf = float(r.probs.top1conf) | |
| if side == "left": | |
| left_label, left_conf = label, conf | |
| else: | |
| right_label, right_conf = label, conf | |
| except Exception: | |
| pass | |
| cv2.putText(frame, f"L: {left_label} ({left_conf:.0%})", (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2) | |
| cv2.putText(frame, f"R: {right_label} ({right_conf:.0%})", (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2) | |
| cv2.imshow("Eye open/closed (q to quit)", frame) | |
| if cv2.waitKey(1) & 0xFF == ord("q"): | |
| break | |
| def _run_with_tasks(project_root: Path, model, cap): | |
| from mediapipe.tasks.python import BaseOptions | |
| from mediapipe.tasks.python.vision import FaceLandmarker, FaceLandmarkerOptions | |
| from mediapipe.tasks.python.vision.core import vision_task_running_mode as running_mode | |
| from mediapipe.tasks.python.vision.core import image as image_lib | |
| model_path = project_root / "weights" / "face_landmarker.task" | |
| if not model_path.is_file(): | |
| print("Downloading face_landmarker.task ...") | |
| url = "https://storage.googleapis.com/mediapipe-models/face_landmarker/face_landmarker/float16/1/face_landmarker.task" | |
| urllib.request.urlretrieve(url, model_path) | |
| print("Done.") | |
| options = FaceLandmarkerOptions( | |
| base_options=BaseOptions(model_asset_path=str(model_path)), | |
| running_mode=running_mode.VisionTaskRunningMode.IMAGE, | |
| num_faces=1, | |
| ) | |
| face_landmarker = FaceLandmarker.create_from_options(options) | |
| ImageFormat = image_lib.ImageFormat | |
| while True: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| left_label, left_conf = "—", 0.0 | |
| right_label, right_conf = "—", 0.0 | |
| rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| rgb_contiguous = np.ascontiguousarray(rgb) | |
| mp_image = image_lib.Image(ImageFormat.SRGB, rgb_contiguous) | |
| result = face_landmarker.detect(mp_image) | |
| if result.face_landmarks: | |
| lm = result.face_landmarks[0] | |
| for roi, side in [ | |
| (get_eye_roi(frame, lm, LEFT_EYE_INDICES_NEW), "left"), | |
| (get_eye_roi(frame, lm, RIGHT_EYE_INDICES_NEW), "right"), | |
| ]: | |
| if roi is not None and roi.size > 0: | |
| try: | |
| pred = model.predict(roi, imgsz=224, device="cpu", verbose=False) | |
| if pred: | |
| r = pred[0] | |
| label = model.names[int(r.probs.top1)] | |
| conf = float(r.probs.top1conf) | |
| if side == "left": | |
| left_label, left_conf = label, conf | |
| else: | |
| right_label, right_conf = label, conf | |
| except Exception: | |
| pass | |
| cv2.putText(frame, f"L: {left_label} ({left_conf:.0%})", (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2) | |
| cv2.putText(frame, f"R: {right_label} ({right_conf:.0%})", (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2) | |
| cv2.imshow("Eye open/closed (q to quit)", frame) | |
| if cv2.waitKey(1) & 0xFF == ord("q"): | |
| break | |
| def main(): | |
| project_root = Path(__file__).resolve().parent.parent | |
| weights = find_weights(project_root) | |
| if weights is None: | |
| print("Weights not found. Put best.pt in weights/ or runs/.../weights/ (from model team).") | |
| return | |
| if mp is None: | |
| print("MediaPipe required. Install: pip install mediapipe") | |
| return | |
| model = YOLO(str(weights)) | |
| cap = cv2.VideoCapture(0) | |
| if not cap.isOpened(): | |
| print("Could not open webcam.") | |
| return | |
| print("Live eye open/closed on your face. Press 'q' to quit.") | |
| try: | |
| if _mp_has_solutions: | |
| _run_with_solutions(mp, model, cap) | |
| else: | |
| _run_with_tasks(project_root, model, cap) | |
| finally: | |
| cap.release() | |
| cv2.destroyAllWindows() | |
| if __name__ == "__main__": | |
| main() | |