"""Data-loading utilities for drowsiness-detection feature datasets.

Loads pre-extracted feature vectors from ``collected_*/*.npz`` archives,
applies range-clipping rules, selects the per-model feature subset, and
exposes pooled / per-person views plus stratified train/val/test splits
as raw numpy arrays or PyTorch ``DataLoader`` objects.
"""

import glob
import os
from typing import Optional

import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

try:
    import torch
    from torch.utils.data import DataLoader, Dataset
except ImportError:  # pragma: no cover
    torch = None

    class Dataset:  # type: ignore
        """Stand-in base class so this module imports without PyTorch."""
        pass

    class _MissingTorchDataLoader:  # type: ignore
        """Raises a clear error if a DataLoader is built without PyTorch."""

        def __init__(self, *args, **kwargs):
            raise ImportError(
                "PyTorch not installed"
            )

    DataLoader = _MissingTorchDataLoader  # type: ignore

# Root directory holding the collected_<person>/ folders of .npz files.
DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data")

# Per-model feature subsets; list order defines the model's column order.
SELECTED_FEATURES = {
    "face_orientation": [
        'head_deviation', 's_face', 's_eye', 'h_gaze', 'pitch',
        'ear_left', 'ear_avg', 'ear_right', 'gaze_offset', 'perclos'
    ],
    "eye_behaviour": [
        'ear_left', 'ear_right', 'ear_avg', 'mar', 'blink_rate',
        'closure_duration', 'perclos', 'yawn_duration'
    ]
}


class FeatureVectorDataset(Dataset):
    """Wrap (features, labels) numpy arrays as a PyTorch ``Dataset``.

    Features are stored as float32 tensors and labels as int64 (long),
    the dtypes expected by typical classification losses.
    """

    def __init__(self, features: np.ndarray, labels: np.ndarray):
        self.features = torch.tensor(features, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]


# ── Low-level helpers ────────────────────────────────────────────────────

def _clean_npz(raw, names):
    """Apply clipping rules in-place. Shared by all loaders.

    Args:
        raw: (n_samples, n_features) float array, modified in place.
        names: list of feature names, aligned with ``raw``'s columns.

    Returns:
        The same ``raw`` array, for call-chaining convenience.
    """
    # Head-pose angles: clip to plausible ranges (degrees).
    for col, lo, hi in [('yaw', -45, 45), ('pitch', -30, 30), ('roll', -30, 30)]:
        if col in names:
            j = names.index(col)  # hoisted: avoid a second O(n) lookup
            raw[:, j] = np.clip(raw[:, j], lo, hi)
    # Eye-aspect ratios: non-negative, capped at 0.85 to suppress outliers.
    for feat in ['ear_left', 'ear_right', 'ear_avg']:
        if feat in names:
            j = names.index(feat)
            raw[:, j] = np.clip(raw[:, j], 0, 0.85)
    return raw


def _load_one_npz(npz_path, target_features):
    """Load a single .npz file, clean and select features.

    Returns (X, y, selected_feature_names)."""
    # NOTE(security): allow_pickle=True executes pickled objects on load —
    # only open .npz files produced by this project's own collection tools.
    data = np.load(npz_path, allow_pickle=True)
    raw = data['features'].astype(np.float32)
    labels = data['labels'].astype(np.int64)
    names = list(data['feature_names'])
    raw = _clean_npz(raw, names)
    # Keep only the features the model wants, preserving target order;
    # features absent from this file are silently skipped.
    selected = [f for f in target_features if f in names]
    idx = [names.index(f) for f in selected]
    return raw[:, idx], labels, selected


# ── Public data loaders ──────────────────────────────────────────────────

def load_all_pooled(model_name: str = "face_orientation", data_dir: Optional[str] = None):
    """Load all collected_*/*.npz, clean, select features, concatenate.

    Falls back to synthetic data when no files are found.

    Returns (X_all, y_all, all_feature_names).
    """
    data_dir = data_dir or DATA_DIR
    target_features = SELECTED_FEATURES.get(model_name,
                                            SELECTED_FEATURES["face_orientation"])
    pattern = os.path.join(data_dir, "collected_*", "*.npz")
    npz_files = sorted(glob.glob(pattern))
    if not npz_files:
        print("[DATA] Warning: No .npz files found. Falling back to synthetic.")
        X, y = _generate_synthetic_data(model_name)
        return X, y, target_features
    all_X, all_y = [], []
    all_names = None
    for npz_path in npz_files:
        X, y, names = _load_one_npz(npz_path, target_features)
        if all_names is None:
            all_names = names
        elif names != all_names:
            # Guard against silent column misalignment: files recorded with
            # different feature sets must not be pooled column-wise.
            raise ValueError(
                f"Feature mismatch in {npz_path}: {names} != {all_names}"
            )
        all_X.append(X)
        all_y.append(y)
        print(f"[DATA] + {os.path.basename(npz_path)}: {X.shape[0]} samples")
    X_all = np.concatenate(all_X, axis=0)
    y_all = np.concatenate(all_y, axis=0)
    print(f"[DATA] Loaded {len(npz_files)} file(s) for '{model_name}': "
          f"{X_all.shape[0]} total samples, {X_all.shape[1]} features")
    return X_all, y_all, all_names


def load_per_person(model_name: str = "face_orientation", data_dir: Optional[str] = None):
    """Load collected_*/*.npz grouped by person (folder name).

    Returns dict { person_name: (X, y) } where X/y are per-person
    numpy arrays. Also returns (X_all, y_all) as pooled data.

    Raises:
        FileNotFoundError: if no .npz files match the search pattern.
    """
    data_dir = data_dir or DATA_DIR
    target_features = SELECTED_FEATURES.get(model_name,
                                            SELECTED_FEATURES["face_orientation"])
    pattern = os.path.join(data_dir, "collected_*", "*.npz")
    npz_files = sorted(glob.glob(pattern))
    if not npz_files:
        raise FileNotFoundError(f"No .npz files matching {pattern}")
    by_person = {}
    all_X, all_y = [], []
    for npz_path in npz_files:
        # Person identity comes from the enclosing "collected_<name>" folder.
        folder = os.path.basename(os.path.dirname(npz_path))
        person = folder.replace("collected_", "", 1)
        X, y, _ = _load_one_npz(npz_path, target_features)
        all_X.append(X)
        all_y.append(y)
        if person not in by_person:
            by_person[person] = []
        by_person[person].append((X, y))
        print(f"[DATA] + {person}/{os.path.basename(npz_path)}: {X.shape[0]} samples")
    # Collapse each person's list of per-file chunks into single arrays.
    for person, chunks in by_person.items():
        by_person[person] = (
            np.concatenate([c[0] for c in chunks], axis=0),
            np.concatenate([c[1] for c in chunks], axis=0),
        )
    X_all = np.concatenate(all_X, axis=0)
    y_all = np.concatenate(all_y, axis=0)
    print(f"[DATA] {len(by_person)} persons, {X_all.shape[0]} total samples, {X_all.shape[1]} features")
    return by_person, X_all, y_all


def load_raw_npz(npz_path):
    """Load a single .npz without cleaning or feature selection.

    For exploration notebooks."""
    # NOTE(security): allow_pickle=True — trusted project files only.
    data = np.load(npz_path, allow_pickle=True)
    features = data['features'].astype(np.float32)
    labels = data['labels'].astype(np.int64)
    names = list(data['feature_names'])
    return features, labels, names


# ── Legacy helpers (used by models/mlp/train.py and models/xgboost/train.py) ─

def _load_real_data(model_name: str):
    """Return (X, y) pooled over all files; kept for legacy callers."""
    X, y, _ = load_all_pooled(model_name)
    return X, y


def _generate_synthetic_data(model_name: str):
    """Generate reproducible random (features, labels) matching the
    selected feature count for *model_name*; used when no real data exists."""
    target_features = SELECTED_FEATURES.get(model_name,
                                            SELECTED_FEATURES["face_orientation"])
    n = 500
    d = len(target_features)
    c = 2
    # Fixed-seed legacy RandomState kept deliberately: changing to
    # default_rng would alter the generated values.
    rng = np.random.RandomState(42)
    features = rng.randn(n, d).astype(np.float32)
    labels = rng.randint(0, c, size=n).astype(np.int64)
    print(f"[DATA] Using synthetic data for '{model_name}': {n} samples, {d} features, {c} classes")
    return features, labels


def _split_and_scale(features, labels, split_ratios, seed, scale):
    """Split data into train/val/test (stratified) and optionally scale.

    Args:
        features, labels: full dataset arrays.
        split_ratios: (train, val, test) fractions summing to 1.
        seed: random_state for both splits.
        scale: when True, fit a StandardScaler on train and apply to all.

    Returns:
        (splits_dict, scaler) where scaler is None when scale is False.
    """
    test_ratio = split_ratios[2]
    # val fraction relative to the remaining train+val pool.
    val_ratio = split_ratios[1] / (split_ratios[0] + split_ratios[1])
    X_train_val, X_test, y_train_val, y_test = train_test_split(
        features, labels, test_size=test_ratio, random_state=seed,
        stratify=labels,
    )
    X_train, X_val, y_train, y_val = train_test_split(
        X_train_val, y_train_val, test_size=val_ratio, random_state=seed,
        stratify=y_train_val,
    )
    scaler = None
    if scale:
        # Fit on train only to avoid leaking val/test statistics.
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_val = scaler.transform(X_val)
        X_test = scaler.transform(X_test)
        print("[DATA] Applied StandardScaler (fitted on training split)")
    splits = {
        "X_train": X_train, "y_train": y_train,
        "X_val": X_val, "y_val": y_val,
        "X_test": X_test, "y_test": y_test,
    }
    print(f"[DATA] Split (stratified): train={len(y_train)}, val={len(y_val)}, test={len(y_test)}")
    return splits, scaler


def get_numpy_splits(model_name: str, split_ratios=(0.7, 0.15, 0.15),
                     seed: int = 42, scale: bool = True):
    """Return raw numpy arrays for non-PyTorch models (e.g.

    XGBoost).

    Returns (splits_dict, num_features, num_classes, scaler).
    """
    features, labels = _load_real_data(model_name)
    num_features = features.shape[1]
    # Assumes labels are 0..max contiguous class ids.
    num_classes = int(labels.max()) + 1
    splits, scaler = _split_and_scale(features, labels, split_ratios, seed, scale)
    return splits, num_features, num_classes, scaler


def get_dataloaders(model_name: str, batch_size: int = 32,
                    split_ratios=(0.7, 0.15, 0.15), seed: int = 42,
                    scale: bool = True):
    """Return PyTorch DataLoaders for neural-network models.

    Returns (train_loader, val_loader, test_loader,
    num_features, num_classes, scaler).
    """
    features, labels = _load_real_data(model_name)
    num_features = features.shape[1]
    num_classes = int(labels.max()) + 1
    splits, scaler = _split_and_scale(features, labels, split_ratios, seed, scale)
    train_ds = FeatureVectorDataset(splits["X_train"], splits["y_train"])
    val_ds = FeatureVectorDataset(splits["X_val"], splits["y_val"])
    test_ds = FeatureVectorDataset(splits["X_test"], splits["y_test"])
    # Shuffle only the training set; evaluation order is deterministic.
    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_ds, batch_size=batch_size, shuffle=False)
    return train_loader, val_loader, test_loader, num_features, num_classes, scaler