deepfake-server / src /data /extract_eye_sequences.py
DevQueen's picture
Sync from GitHub via hub-sync
1dc2504 verified
Raw
History Blame Contribute Delete
4.28 kB
from __future__ import annotations

import argparse
from pathlib import Path
from typing import List, Optional, Tuple

import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
from tqdm import tqdm
# Face-mesh landmark indices sampled for each eye. All four entries of a list
# are used to bound that eye's crop box; entries [2] and [3] are the pair whose
# vertical gap process_frame averages into its eye-openness proxy.
# NOTE(review): presumably ordered (corner, corner, upper lid, lower lid) in
# MediaPipe's face-mesh topology, and "left"/"right" may refer to image side
# rather than the subject's anatomy — confirm against the landmark map.
LEFT_EYE_IDX = [33, 133, 159, 145]
RIGHT_EYE_IDX = [362, 263, 386, 374]
def eye_bbox(landmarks, image_w: int, image_h: int, indices: List[int], pad: int = 8) -> Tuple[int, int, int, int]:
    """Return a padded pixel bounding box around the selected landmarks.

    Args:
        landmarks: Indexable collection whose items expose ``x`` and ``y``
            attributes expressed as fractions of the image width/height
            (they are multiplied by ``image_w`` / ``image_h`` here).
        image_w: Image width in pixels.
        image_h: Image height in pixels.
        indices: Positions in ``landmarks`` to enclose.
        pad: Margin, in pixels, added on every side before clamping.

    Returns:
        ``(x1, y1, x2, y2)`` as plain Python ints, every coordinate clamped to
        ``[0, image_w]`` (x) or ``[0, image_h]`` (y). The box is empty
        (``x2 <= x1`` or ``y2 <= y1``) when the landmarks lie entirely outside
        the image.

    Raises:
        ValueError: If ``indices`` is empty.
    """
    xs = [int(landmarks[i].x * image_w) for i in indices]
    ys = [int(landmarks[i].y * image_h) for i in indices]

    def clamp(value: int, upper: int) -> int:
        # Clamp on BOTH sides: a negative end coordinate would otherwise be
        # interpreted as "count from the end" when used as a slice bound.
        return max(0, min(value, upper))

    return (
        clamp(min(xs) - pad, image_w),
        clamp(min(ys) - pad, image_h),
        clamp(max(xs) + pad, image_w),
        clamp(max(ys) + pad, image_h),
    )
def process_frame(frame: np.ndarray, face_mesh, eye_size: int = 112) -> Tuple[Optional[np.ndarray], float]:
    """Crop both eyes from a frame and measure how open they are.

    Args:
        frame: Image array; it is converted with ``COLOR_BGR2RGB`` before being
            handed to the face mesh, so it is treated as BGR.
        face_mesh: Object with a ``process(rgb)`` method whose result exposes
            ``multi_face_landmarks`` (e.g. a MediaPipe ``FaceMesh``).
        eye_size: Side length, in pixels, each eye crop is resized to.

    Returns:
        ``(eye_pair, ear_proxy)``. ``eye_pair`` is the two resized crops placed
        side by side (the ``LEFT_EYE_IDX`` crop first), and ``ear_proxy`` is the
        mean absolute difference of the ``y`` attributes of landmarks [2] and
        [3] of each eye list. Returns ``(None, 0.0)`` when no face is found or
        either eye box is empty.
    """
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    res = face_mesh.process(rgb)
    if not res.multi_face_landmarks:
        return None, 0.0

    h, w = frame.shape[:2]
    lm = res.multi_face_landmarks[0].landmark

    crops = []
    for indices in (LEFT_EYE_IDX, RIGHT_EYE_IDX):
        x1, y1, x2, y2 = eye_bbox(lm, w, h, indices)
        # Reject degenerate boxes before slicing: a negative or inverted end
        # coordinate would wrap around and silently crop the wrong region
        # instead of producing an empty array.
        if x1 < 0 or y1 < 0 or x2 <= x1 or y2 <= y1:
            return None, 0.0
        crop = frame[y1:y2, x1:x2]
        if crop.size == 0:
            return None, 0.0
        crops.append(cv2.resize(crop, (eye_size, eye_size)))

    eye_pair = np.concatenate(crops, axis=1)

    # Openness proxy: gap between the two lid landmarks, in the landmarks' own
    # coordinate units (not divided by eye width, so it scales with face size).
    left_open = abs(lm[LEFT_EYE_IDX[2]].y - lm[LEFT_EYE_IDX[3]].y)
    right_open = abs(lm[RIGHT_EYE_IDX[2]].y - lm[RIGHT_EYE_IDX[3]].y)
    ear_proxy = float((left_open + right_open) / 2.0)
    return eye_pair, ear_proxy
def sample_sequences(frames: List[np.ndarray], ear: List[float], seq_len: int) -> Tuple[List[np.ndarray], List[np.ndarray]]:
    """Cut per-frame data into consecutive, non-overlapping fixed-length windows.

    Args:
        frames: Per-frame arrays, all stackable along a new leading axis.
        ear: Per-frame scalar values aligned with ``frames``.
        seq_len: Number of frames per window; trailing frames that do not fill
            a whole window are dropped.

    Returns:
        Two parallel lists: stacked frame windows, and the matching ``ear``
        slices as ``float32`` arrays.
    """
    window_starts = range(0, len(frames) - seq_len + 1, seq_len)
    clips = [np.stack(frames[start : start + seq_len], axis=0) for start in window_starts]
    traces = [np.array(ear[start : start + seq_len], dtype=np.float32) for start in window_starts]
    return clips, traces
def _extract_video_eyes(frame_dir: Path) -> Tuple[List[np.ndarray], List[float]]:
    """Run the face mesh over one video's ``*.jpg`` frames, in sorted filename order.

    Frames that cannot be read, or for which ``process_frame`` finds no usable
    eyes, are skipped, so the returned lists may be shorter than the number of
    files and are not guaranteed to be temporally contiguous.
    """
    frame_paths = sorted(frame_dir.glob("*.jpg"))
    eye_frames: List[np.ndarray] = []
    ear_values: List[float] = []
    if not frame_paths:
        return eye_frames, ear_values

    # A fresh FaceMesh per video: with static_image_mode=False the solution
    # treats its input as one continuous stream, so sharing an instance would
    # carry tracking state from the previous video into this one. The context
    # manager also releases the underlying graph when the video is done.
    with mp.solutions.face_mesh.FaceMesh(
        static_image_mode=False,
        max_num_faces=1,
        refine_landmarks=True,
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5,
    ) as mesh:
        for fp in frame_paths:
            frame = cv2.imread(str(fp))
            if frame is None:
                continue
            eye_img, ear = process_frame(frame, mesh)
            if eye_img is None:
                continue
            eye_frames.append(eye_img)
            ear_values.append(ear)
    return eye_frames, ear_values


def main() -> None:
    """Extract fixed-length eye-crop sequences for every video in a metadata CSV.

    Command-line arguments:
        --metadata: CSV with at least the columns ``frame_dir``, ``video_path``,
            ``dataset``, ``split``, ``label`` and ``identity``.
        --out-root: Output directory. Sequences are written to
            ``<out-root>/sequences/<dataset>/<split>/<video stem>/NNN.npz``
            (arrays ``frames``, ``blink``, ``label``) and a summary to
            ``<out-root>/metadata_sequences.csv``.
        --sequence-length: Frames per saved sequence (default 32).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--metadata", type=Path, required=True)
    parser.add_argument("--out-root", type=Path, required=True)
    parser.add_argument("--sequence-length", type=int, default=32)
    args = parser.parse_args()
    if args.sequence_length < 1:
        # Zero would crash inside range(); negatives would silently yield nothing.
        parser.error("--sequence-length must be a positive integer")

    df = pd.read_csv(args.metadata)

    # Create the root up front so the summary CSV can be written even when no
    # video produces a full sequence.
    args.out_root.mkdir(parents=True, exist_ok=True)
    sequences_root = args.out_root / "sequences"

    columns = [
        "dataset",
        "video_path",
        "identity",
        "split",
        "label",
        "sequence_count",
        "sequence_dir",
    ]
    records = []
    for row in tqdm(df.to_dict(orient="records"), desc="Extracting eye sequences"):
        eye_frames, ear_values = _extract_video_eyes(Path(row["frame_dir"]))
        sequences, blink = sample_sequences(eye_frames, ear_values, args.sequence_length)
        if not sequences:
            continue

        video_id = Path(row["video_path"]).stem
        out_dir = sequences_root / row["dataset"] / row["split"] / video_id
        out_dir.mkdir(parents=True, exist_ok=True)
        for idx, (seq, blink_seq) in enumerate(zip(sequences, blink)):
            out_path = out_dir / f"{idx:03d}.npz"
            np.savez_compressed(out_path, frames=seq, blink=blink_seq, label=row["label"])

        records.append(
            {
                "dataset": row["dataset"],
                "video_path": row["video_path"],
                "identity": row["identity"],
                "split": row["split"],
                "label": row["label"],
                "sequence_count": len(sequences),
                "sequence_dir": str(out_dir.resolve()),
            }
        )

    out_csv = args.out_root / "metadata_sequences.csv"
    # Explicit columns keep the header present even when `records` is empty.
    pd.DataFrame(records, columns=columns).to_csv(out_csv, index=False)
    print(f"Saved sequence metadata: {out_csv}")
# Run the extraction only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()