# NOTE: Hugging Face page residue removed — original lines read
# "Yingtao-Zheng's picture / Upload partially updated files / 8bbb872";
# kept here as a comment so the module parses.
"""
Live webcam: detect face, crop each eye, run open/closed classifier, show on screen.
Requires: opencv-python, ultralytics, mediapipe (pip install mediapipe).
Press 'q' to quit.
"""
import urllib.request
from pathlib import Path
import cv2
import numpy as np
from ultralytics import YOLO
try:
import mediapipe as mp
_mp_has_solutions = hasattr(mp, "solutions")
except ImportError:
mp = None
_mp_has_solutions = False
# New MediaPipe Tasks API (Face Landmarker) eye indices
LEFT_EYE_INDICES_NEW = [263, 249, 390, 373, 374, 380, 381, 382, 362, 466, 388, 387, 386, 385, 384, 398]
RIGHT_EYE_INDICES_NEW = [33, 7, 163, 144, 145, 153, 154, 155, 133, 246, 161, 160, 159, 158, 157, 173]
# Old Face Mesh (solutions) indices
LEFT_EYE_INDICES_OLD = [33, 160, 158, 133, 153, 144]
RIGHT_EYE_INDICES_OLD = [362, 385, 387, 263, 373, 380]
EYE_PADDING = 0.35
def find_weights(project_root: Path) -> Path | None:
candidates = [
project_root / "weights" / "best.pt",
project_root / "runs" / "classify" / "runs_cls" / "eye_open_closed_cpu" / "weights" / "best.pt",
project_root / "runs" / "classify" / "runs_cls" / "eye_open_closed_cpu" / "weights" / "last.pt",
]
return next((p for p in candidates if p.is_file()), None)
def get_eye_roi(frame: np.ndarray, landmarks, indices: list[int]) -> np.ndarray | None:
h, w = frame.shape[:2]
pts = np.array([(int(landmarks[i].x * w), int(landmarks[i].y * h)) for i in indices])
x_min, y_min = pts.min(axis=0)
x_max, y_max = pts.max(axis=0)
dx = max(int((x_max - x_min) * EYE_PADDING), 8)
dy = max(int((y_max - y_min) * EYE_PADDING), 8)
x0 = max(0, x_min - dx)
y0 = max(0, y_min - dy)
x1 = min(w, x_max + dx)
y1 = min(h, y_max + dy)
if x1 <= x0 or y1 <= y0:
return None
return frame[y0:y1, x0:x1].copy()
def _run_with_solutions(mp, model, cap):
    """Webcam loop using the legacy MediaPipe ``solutions`` Face Mesh API.

    For each frame: detect at most one face, crop each eye with the
    6-point mesh indices, classify open/closed with the YOLO model, and
    overlay the labels. Exits when the stream ends or 'q' is pressed.

    Args:
        mp: the imported mediapipe module.
        model: ultralytics YOLO classification model.
        cap: an opened cv2.VideoCapture.
    """
    # FaceMesh is a context manager; `with` guarantees its graph resources
    # are released even if the loop raises (original never closed it).
    with mp.solutions.face_mesh.FaceMesh(
        static_image_mode=False,
        max_num_faces=1,
        refine_landmarks=True,
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5,
    ) as face_mesh:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            # MediaPipe expects RGB; OpenCV delivers BGR.
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = face_mesh.process(rgb)
            left_label, left_conf = "—", 0.0
            right_label, right_conf = "—", 0.0
            if results.multi_face_landmarks:
                lm = results.multi_face_landmarks[0].landmark
                # Fix: the original also bound an `indices` element in each
                # tuple that was never used.
                for roi, side in [
                    (get_eye_roi(frame, lm, LEFT_EYE_INDICES_OLD), "left"),
                    (get_eye_roi(frame, lm, RIGHT_EYE_INDICES_OLD), "right"),
                ]:
                    if roi is not None and roi.size > 0:
                        try:
                            pred = model.predict(roi, imgsz=224, device="cpu", verbose=False)
                            if pred:
                                r = pred[0]
                                label = model.names[int(r.probs.top1)]
                                conf = float(r.probs.top1conf)
                                if side == "left":
                                    left_label, left_conf = label, conf
                                else:
                                    right_label, right_conf = label, conf
                        except Exception:
                            # Best-effort per frame: a failed classification
                            # just leaves the placeholder label.
                            pass
            cv2.putText(frame, f"L: {left_label} ({left_conf:.0%})", (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
            cv2.putText(frame, f"R: {right_label} ({right_conf:.0%})", (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
            cv2.imshow("Eye open/closed (q to quit)", frame)
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
def _run_with_tasks(project_root: Path, model, cap):
    """Webcam loop using the MediaPipe Tasks Face Landmarker API.

    Downloads the landmarker model file on first use, then for each frame:
    detect at most one face, crop each eye with the 16-point contour
    indices, classify open/closed with the YOLO model, and overlay the
    labels. Exits when the stream ends or 'q' is pressed.

    Args:
        project_root: project directory; the .task model is cached under
            project_root/weights/.
        model: ultralytics YOLO classification model.
        cap: an opened cv2.VideoCapture.
    """
    from mediapipe.tasks.python import BaseOptions
    from mediapipe.tasks.python.vision import FaceLandmarker, FaceLandmarkerOptions
    from mediapipe.tasks.python.vision.core import vision_task_running_mode as running_mode
    from mediapipe.tasks.python.vision.core import image as image_lib

    model_path = project_root / "weights" / "face_landmarker.task"
    if not model_path.is_file():
        # Fix: urlretrieve does not create directories — ensure weights/
        # exists before downloading into it.
        model_path.parent.mkdir(parents=True, exist_ok=True)
        print("Downloading face_landmarker.task ...")
        url = "https://storage.googleapis.com/mediapipe-models/face_landmarker/face_landmarker/float16/1/face_landmarker.task"
        urllib.request.urlretrieve(url, model_path)
        print("Done.")
    options = FaceLandmarkerOptions(
        base_options=BaseOptions(model_asset_path=str(model_path)),
        running_mode=running_mode.VisionTaskRunningMode.IMAGE,
        num_faces=1,
    )
    face_landmarker = FaceLandmarker.create_from_options(options)
    ImageFormat = image_lib.ImageFormat
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            left_label, left_conf = "—", 0.0
            right_label, right_conf = "—", 0.0
            # MediaPipe expects a contiguous RGB buffer.
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            rgb_contiguous = np.ascontiguousarray(rgb)
            mp_image = image_lib.Image(ImageFormat.SRGB, rgb_contiguous)
            result = face_landmarker.detect(mp_image)
            if result.face_landmarks:
                lm = result.face_landmarks[0]
                for roi, side in [
                    (get_eye_roi(frame, lm, LEFT_EYE_INDICES_NEW), "left"),
                    (get_eye_roi(frame, lm, RIGHT_EYE_INDICES_NEW), "right"),
                ]:
                    if roi is not None and roi.size > 0:
                        try:
                            pred = model.predict(roi, imgsz=224, device="cpu", verbose=False)
                            if pred:
                                r = pred[0]
                                label = model.names[int(r.probs.top1)]
                                conf = float(r.probs.top1conf)
                                if side == "left":
                                    left_label, left_conf = label, conf
                                else:
                                    right_label, right_conf = label, conf
                        except Exception:
                            # Best-effort per frame: a failed classification
                            # just leaves the placeholder label.
                            pass
            cv2.putText(frame, f"L: {left_label} ({left_conf:.0%})", (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
            cv2.putText(frame, f"R: {right_label} ({right_conf:.0%})", (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
            cv2.imshow("Eye open/closed (q to quit)", frame)
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
    finally:
        # Fix: release the landmarker's native resources (original leaked it).
        face_landmarker.close()
def main():
    """Entry point: load classifier weights, open the webcam, and dispatch
    to whichever MediaPipe backend is available in this environment."""
    project_root = Path(__file__).resolve().parent.parent

    # Guard clauses: bail out early with a human-readable message.
    weights_path = find_weights(project_root)
    if weights_path is None:
        print("Weights not found. Put best.pt in weights/ or runs/.../weights/ (from model team).")
        return
    if mp is None:
        print("MediaPipe required. Install: pip install mediapipe")
        return

    classifier = YOLO(str(weights_path))
    capture = cv2.VideoCapture(0)
    if not capture.isOpened():
        print("Could not open webcam.")
        return

    print("Live eye open/closed on your face. Press 'q' to quit.")
    try:
        # Old solutions API when present, otherwise the newer Tasks API.
        if _mp_has_solutions:
            _run_with_solutions(mp, classifier, capture)
        else:
            _run_with_tasks(project_root, classifier, capture)
    finally:
        # Always release the camera and windows, even on error or 'q'.
        capture.release()
        cv2.destroyAllWindows()


if __name__ == "__main__":
    main()