Spaces:
Sleeping
Sleeping
import os
import glob
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# PyTorch is optional: if it is missing, the module stays importable and only
# fails when a DataLoader is actually constructed (see get_dataloaders).
try:
    import torch
    from torch.utils.data import Dataset, DataLoader
except ImportError:  # pragma: no cover
    torch = None

    class Dataset:  # type: ignore
        # Minimal stand-in so the FeatureVectorDataset class below still parses.
        pass

    class _MissingTorchDataLoader:  # type: ignore
        # Defers the failure to instantiation time instead of import time.
        def __init__(self, *args, **kwargs):
            raise ImportError(
                "PyTorch not installed"
            )

    DataLoader = _MissingTorchDataLoader  # type: ignore
# Root of the collected capture files (collected_<person>/*.npz), resolved
# relative to this module's location.
DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data")

# Feature subset per model name; list order fixes the column order of the
# arrays returned by the loaders below.
SELECTED_FEATURES = {
    "face_orientation": [
        'head_deviation', 's_face', 's_eye', 'h_gaze', 'pitch',
        'ear_left', 'ear_avg', 'ear_right', 'gaze_offset', 'perclos'
    ],
    "eye_behaviour": [
        'ear_left', 'ear_right', 'ear_avg', 'mar',
        'blink_rate', 'closure_duration', 'perclos', 'yawn_duration'
    ]
}
class FeatureVectorDataset(Dataset):
    """PyTorch ``Dataset`` over paired feature/label numpy arrays.

    Features are held as float32 tensors and labels as int64 (long)
    tensors; each item is a ``(feature_row, label)`` tensor pair.
    """

    def __init__(self, features: np.ndarray, labels: np.ndarray):
        self.features = torch.tensor(features, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        # Length of the first axis — same as len() on the label tensor.
        return self.labels.shape[0]

    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]
# ── Low-level helpers ────────────────────────────────────────────────────
| def _clean_npz(raw, names): | |
| """Apply clipping rules in-place. Shared by all loaders.""" | |
| for col, lo, hi in [('yaw', -45, 45), ('pitch', -30, 30), ('roll', -30, 30)]: | |
| if col in names: | |
| raw[:, names.index(col)] = np.clip(raw[:, names.index(col)], lo, hi) | |
| for feat in ['ear_left', 'ear_right', 'ear_avg']: | |
| if feat in names: | |
| raw[:, names.index(feat)] = np.clip(raw[:, names.index(feat)], 0, 0.85) | |
| return raw | |
def _load_one_npz(npz_path, target_features):
    """Load a single .npz file, clean and select features. Returns (X, y, selected_feature_names)."""
    archive = np.load(npz_path, allow_pickle=True)
    feature_matrix = archive['features'].astype(np.float32)
    label_vector = archive['labels'].astype(np.int64)
    column_names = list(archive['feature_names'])
    feature_matrix = _clean_npz(feature_matrix, column_names)
    # Keep only the requested features actually present, preserving request order.
    keep = [(f, column_names.index(f)) for f in target_features if f in column_names]
    selected_names = [f for f, _ in keep]
    selected_idx = [j for _, j in keep]
    return feature_matrix[:, selected_idx], label_vector, selected_names
# ── Public data loaders ──────────────────────────────────────────────────
def load_all_pooled(model_name: str = "face_orientation", data_dir: str = None):
    """Load all collected_*/*.npz, clean, select features, concatenate.

    Falls back to synthetic data when no capture files are found.

    Args:
        model_name: key into SELECTED_FEATURES; unknown names fall back to
            "face_orientation".
        data_dir: search root override; defaults to DATA_DIR.

    Returns:
        (X_all, y_all, all_feature_names).

    Raises:
        ValueError: if the files do not all expose the same selected feature
            columns — pooling such files would silently misalign columns.
    """
    data_dir = data_dir or DATA_DIR
    target_features = SELECTED_FEATURES.get(model_name, SELECTED_FEATURES["face_orientation"])
    pattern = os.path.join(data_dir, "collected_*", "*.npz")
    npz_files = sorted(glob.glob(pattern))
    if not npz_files:
        print("[DATA] Warning: No .npz files found. Falling back to synthetic.")
        X, y = _generate_synthetic_data(model_name)
        return X, y, target_features
    all_X, all_y = [], []
    all_names = None
    for npz_path in npz_files:
        X, y, names = _load_one_npz(npz_path, target_features)
        if all_names is None:
            all_names = names
        elif names != all_names:
            # Guard: a file with a different feature subset must not be pooled,
            # otherwise columns with different meanings would be concatenated.
            raise ValueError(
                f"Feature mismatch in {npz_path}: {names} != {all_names}"
            )
        all_X.append(X)
        all_y.append(y)
        print(f"[DATA] + {os.path.basename(npz_path)}: {X.shape[0]} samples")
    X_all = np.concatenate(all_X, axis=0)
    y_all = np.concatenate(all_y, axis=0)
    print(f"[DATA] Loaded {len(npz_files)} file(s) for '{model_name}': "
          f"{X_all.shape[0]} total samples, {X_all.shape[1]} features")
    return X_all, y_all, all_names
def load_per_person(model_name: str = "face_orientation", data_dir: str = None):
    """Load collected_*/*.npz grouped by person (folder name).

    Returns dict { person_name: (X, y) } where X/y are per-person numpy arrays.
    Also returns (X_all, y_all) as pooled data.
    """
    data_dir = data_dir or DATA_DIR
    target_features = SELECTED_FEATURES.get(model_name, SELECTED_FEATURES["face_orientation"])
    pattern = os.path.join(data_dir, "collected_*", "*.npz")
    npz_files = sorted(glob.glob(pattern))
    if not npz_files:
        raise FileNotFoundError(f"No .npz files matching {pattern}")
    chunks_by_person = {}
    pooled_X, pooled_y = [], []
    for npz_path in npz_files:
        # Person identity is the enclosing "collected_<name>" folder name.
        folder = os.path.basename(os.path.dirname(npz_path))
        person = folder.replace("collected_", "", 1)
        X, y, _ = _load_one_npz(npz_path, target_features)
        pooled_X.append(X)
        pooled_y.append(y)
        chunks_by_person.setdefault(person, []).append((X, y))
        print(f"[DATA] + {person}/{os.path.basename(npz_path)}: {X.shape[0]} samples")
    # Collapse each person's chunks into single (X, y) arrays.
    by_person = {
        person: (
            np.concatenate([X for X, _ in chunks], axis=0),
            np.concatenate([y for _, y in chunks], axis=0),
        )
        for person, chunks in chunks_by_person.items()
    }
    X_all = np.concatenate(pooled_X, axis=0)
    y_all = np.concatenate(pooled_y, axis=0)
    print(f"[DATA] {len(by_person)} persons, {X_all.shape[0]} total samples, {X_all.shape[1]} features")
    return by_person, X_all, y_all
def load_raw_npz(npz_path):
    """Load a single .npz without cleaning or feature selection. For exploration notebooks."""
    archive = np.load(npz_path, allow_pickle=True)
    return (
        archive['features'].astype(np.float32),
        archive['labels'].astype(np.int64),
        list(archive['feature_names']),
    )
# ── Legacy helpers (used by models/mlp/train.py and models/xgboost/train.py) ─
def _load_real_data(model_name: str):
    """Pooled (X, y) for *model_name*; thin wrapper that drops the feature names."""
    features, labels, _names = load_all_pooled(model_name)
    return features, labels
def _generate_synthetic_data(model_name: str):
    """Deterministic random (features, labels) used when no real capture data exists."""
    feature_list = SELECTED_FEATURES.get(model_name, SELECTED_FEATURES["face_orientation"])
    n, d, c = 500, len(feature_list), 2
    rng = np.random.RandomState(42)  # fixed seed keeps runs reproducible
    synthetic_X = rng.randn(n, d).astype(np.float32)
    synthetic_y = rng.randint(0, c, size=n).astype(np.int64)
    print(f"[DATA] Using synthetic data for '{model_name}': {n} samples, {d} features, {c} classes")
    return synthetic_X, synthetic_y
def _split_and_scale(features, labels, split_ratios, seed, scale):
    """Split data into train/val/test (stratified) and optionally scale.

    ``split_ratios`` is (train, val, test); the StandardScaler, when enabled,
    is fitted on the training split only and applied to val/test.
    """
    test_frac = split_ratios[2]
    # After the test portion is removed, val must be this fraction of what remains.
    val_frac_of_rest = split_ratios[1] / (split_ratios[0] + split_ratios[1])
    X_rest, X_test, y_rest, y_test = train_test_split(
        features, labels, test_size=test_frac, random_state=seed, stratify=labels,
    )
    X_train, X_val, y_train, y_val = train_test_split(
        X_rest, y_rest, test_size=val_frac_of_rest, random_state=seed, stratify=y_rest,
    )
    scaler = None
    if scale:
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_val = scaler.transform(X_val)
        X_test = scaler.transform(X_test)
        print("[DATA] Applied StandardScaler (fitted on training split)")
    splits = {
        "X_train": X_train, "y_train": y_train,
        "X_val": X_val, "y_val": y_val,
        "X_test": X_test, "y_test": y_test,
    }
    print(f"[DATA] Split (stratified): train={len(y_train)}, val={len(y_val)}, test={len(y_test)}")
    return splits, scaler
def get_numpy_splits(model_name: str, split_ratios=(0.7, 0.15, 0.15), seed: int = 42, scale: bool = True):
    """Return raw numpy arrays for non-PyTorch models (e.g. XGBoost)."""
    features, labels = _load_real_data(model_name)
    num_features = features.shape[1]
    # Classes are assumed to be labelled 0..max contiguously.
    num_classes = int(labels.max()) + 1
    splits, scaler = _split_and_scale(features, labels, split_ratios, seed, scale)
    return splits, num_features, num_classes, scaler
def get_dataloaders(model_name: str, batch_size: int = 32, split_ratios=(0.7, 0.15, 0.15), seed: int = 42, scale: bool = True):
    """Return PyTorch DataLoaders for neural-network models."""
    features, labels = _load_real_data(model_name)
    num_features = features.shape[1]
    # Classes are assumed to be labelled 0..max contiguously.
    num_classes = int(labels.max()) + 1
    splits, scaler = _split_and_scale(features, labels, split_ratios, seed, scale)
    loaders = []
    # Only the training loader shuffles; val/test keep a fixed order.
    for part, shuffle in (("train", True), ("val", False), ("test", False)):
        dataset = FeatureVectorDataset(splits[f"X_{part}"], splits[f"y_{part}"])
        loaders.append(DataLoader(dataset, batch_size=batch_size, shuffle=shuffle))
    train_loader, val_loader, test_loader = loaders
    return train_loader, val_loader, test_loader, num_features, num_classes, scaler