import os
import glob
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
# Optional PyTorch dependency: keep this module importable without torch so
# numpy-only consumers (e.g. the XGBoost training path) still work. When torch
# is missing, Dataset becomes an inert placeholder and DataLoader raises a
# clear ImportError only if something actually tries to construct one.
try:
    import torch
    from torch.utils.data import Dataset, DataLoader
except ImportError:  # pragma: no cover
    torch = None

    class Dataset:  # type: ignore
        # Placeholder base class so FeatureVectorDataset can still be defined.
        pass

    class _MissingTorchDataLoader:  # type: ignore
        # Deferred failure: error surfaces at first use, not at import time.
        def __init__(self, *args, **kwargs):
            raise ImportError(
                "PyTorch not installed"
            )

    DataLoader = _MissingTorchDataLoader  # type: ignore
# Default data root: the "data" directory one level above this module.
DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data")

# Feature subsets per model head. Keys are the model names accepted by the
# public loaders; values list the column names to select from the recorded
# .npz files (selection preserves this order for columns that exist).
SELECTED_FEATURES = {
    "face_orientation": [
        'head_deviation', 's_face', 's_eye', 'h_gaze', 'pitch',
        'ear_left', 'ear_avg', 'ear_right', 'gaze_offset', 'perclos'
    ],
    "eye_behaviour": [
        'ear_left', 'ear_right', 'ear_avg', 'mar',
        'blink_rate', 'closure_duration', 'perclos', 'yawn_duration'
    ]
}
class FeatureVectorDataset(Dataset):
    """In-memory torch Dataset over pre-computed feature vectors.

    The numpy inputs are converted once at construction time:
    features become a float32 tensor, labels an int64 tensor.
    """

    def __init__(self, features: np.ndarray, labels: np.ndarray):
        self.features = torch.tensor(features, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        # One label per sample row.
        return self.labels.shape[0]

    def __getitem__(self, idx):
        sample, target = self.features[idx], self.labels[idx]
        return sample, target
# ── Low-level helpers ────────────────────────────────────────────────────
def _clean_npz(raw, names):
"""Apply clipping rules in-place. Shared by all loaders."""
for col, lo, hi in [('yaw', -45, 45), ('pitch', -30, 30), ('roll', -30, 30)]:
if col in names:
raw[:, names.index(col)] = np.clip(raw[:, names.index(col)], lo, hi)
for feat in ['ear_left', 'ear_right', 'ear_avg']:
if feat in names:
raw[:, names.index(feat)] = np.clip(raw[:, names.index(feat)], 0, 0.85)
return raw
def _load_one_npz(npz_path, target_features):
    """Load one .npz recording, clean it, and keep only *target_features*.

    Returns (X, y, selected_feature_names) where the name list keeps the
    order of target_features, restricted to columns present in the file.
    """
    archive = np.load(npz_path, allow_pickle=True)
    names = list(archive['feature_names'])
    cleaned = _clean_npz(archive['features'].astype(np.float32), names)
    labels = archive['labels'].astype(np.int64)
    selected = [feat for feat in target_features if feat in names]
    columns = [names.index(feat) for feat in selected]
    return cleaned[:, columns], labels, selected
# ── Public data loaders ──────────────────────────────────────────────────
def load_all_pooled(model_name: str = "face_orientation", data_dir: str = None):
    """Load all collected_*/*.npz, clean, select features, concatenate.

    Parameters
    ----------
    model_name : key into SELECTED_FEATURES; unknown names fall back to
        the "face_orientation" subset.
    data_dir : optional override of DATA_DIR (None -> default location).

    Returns (X_all, y_all, all_feature_names). Falls back to synthetic
    data when no recordings are found.

    Raises ValueError if two recordings expose different feature subsets —
    previously such files were concatenated silently, which could misalign
    columns whenever the counts happened to match.
    """
    data_dir = data_dir or DATA_DIR
    target_features = SELECTED_FEATURES.get(model_name, SELECTED_FEATURES["face_orientation"])
    pattern = os.path.join(data_dir, "collected_*", "*.npz")
    npz_files = sorted(glob.glob(pattern))
    if not npz_files:
        print("[DATA] Warning: No .npz files found. Falling back to synthetic.")
        X, y = _generate_synthetic_data(model_name)
        return X, y, target_features
    all_X, all_y = [], []
    all_names = None
    for npz_path in npz_files:
        X, y, names = _load_one_npz(npz_path, target_features)
        if all_names is None:
            all_names = names
        elif names != all_names:
            # Fail loudly instead of pooling misaligned feature columns.
            raise ValueError(
                f"Feature mismatch in {npz_path}: {names} != {all_names}"
            )
        all_X.append(X)
        all_y.append(y)
        print(f"[DATA] + {os.path.basename(npz_path)}: {X.shape[0]} samples")
    X_all = np.concatenate(all_X, axis=0)
    y_all = np.concatenate(all_y, axis=0)
    print(f"[DATA] Loaded {len(npz_files)} file(s) for '{model_name}': "
          f"{X_all.shape[0]} total samples, {X_all.shape[1]} features")
    return X_all, y_all, all_names
def load_per_person(model_name: str = "face_orientation", data_dir: str = None):
    """Load collected_*/*.npz grouped by person (taken from the folder name).

    Returns a dict { person_name: (X, y) } of per-person numpy arrays,
    followed by the pooled (X_all, y_all) over every file.
    Raises FileNotFoundError when no recordings match.
    """
    data_dir = data_dir or DATA_DIR
    target_features = SELECTED_FEATURES.get(model_name, SELECTED_FEATURES["face_orientation"])
    pattern = os.path.join(data_dir, "collected_*", "*.npz")
    npz_files = sorted(glob.glob(pattern))
    if not npz_files:
        raise FileNotFoundError(f"No .npz files matching {pattern}")
    chunks_by_person = {}
    pooled_X, pooled_y = [], []
    for npz_path in npz_files:
        # Person identifier = containing folder name minus the "collected_" prefix.
        person = os.path.basename(os.path.dirname(npz_path)).replace("collected_", "", 1)
        X, y, _ = _load_one_npz(npz_path, target_features)
        pooled_X.append(X)
        pooled_y.append(y)
        chunks_by_person.setdefault(person, []).append((X, y))
        print(f"[DATA] + {person}/{os.path.basename(npz_path)}: {X.shape[0]} samples")
    # Collapse each person's chunk list into a single pair of arrays.
    by_person = {
        person: (
            np.concatenate([X for X, _ in chunks], axis=0),
            np.concatenate([y for _, y in chunks], axis=0),
        )
        for person, chunks in chunks_by_person.items()
    }
    X_all = np.concatenate(pooled_X, axis=0)
    y_all = np.concatenate(pooled_y, axis=0)
    print(f"[DATA] {len(by_person)} persons, {X_all.shape[0]} total samples, {X_all.shape[1]} features")
    return by_person, X_all, y_all
def load_raw_npz(npz_path):
    """Load a single .npz as-is — no cleaning, no feature selection.

    Intended for exploration notebooks.
    Returns (features float32 array, labels int64 array, feature-name list).
    """
    archive = np.load(npz_path, allow_pickle=True)
    return (
        archive['features'].astype(np.float32),
        archive['labels'].astype(np.int64),
        list(archive['feature_names']),
    )
# ── Legacy helpers (used by models/mlp/train.py and models/xgboost/train.py) ─
def _load_real_data(model_name: str):
    """Pooled (X, y) for *model_name*, discarding the feature-name list."""
    features, labels, _names = load_all_pooled(model_name)
    return features, labels
def _generate_synthetic_data(model_name: str):
    """Deterministic random stand-in data sized to the model's feature set.

    Used when no recorded .npz files are available; the fixed seed keeps
    runs reproducible. Returns (features float32, labels int64).
    """
    feats = SELECTED_FEATURES.get(model_name, SELECTED_FEATURES["face_orientation"])
    n, d, c = 500, len(feats), 2
    rng = np.random.RandomState(42)
    X = rng.randn(n, d).astype(np.float32)
    y = rng.randint(0, c, size=n).astype(np.int64)
    print(f"[DATA] Using synthetic data for '{model_name}': {n} samples, {d} features, {c} classes")
    return X, y
def _split_and_scale(features, labels, split_ratios, seed, scale):
    """Stratified train/val/test split with optional standardisation.

    split_ratios is (train, val, test) fractions. When *scale* is true a
    StandardScaler is fitted on the training portion only and applied to
    all three splits. Returns (splits_dict, scaler_or_None).
    """
    train_frac, val_frac, test_frac = split_ratios
    # First carve off the test set, then split the remainder into train/val.
    X_rest, X_test, y_rest, y_test = train_test_split(
        features, labels,
        test_size=test_frac,
        random_state=seed,
        stratify=labels,
    )
    X_train, X_val, y_train, y_val = train_test_split(
        X_rest, y_rest,
        test_size=val_frac / (train_frac + val_frac),
        random_state=seed,
        stratify=y_rest,
    )
    scaler = StandardScaler() if scale else None
    if scaler is not None:
        X_train = scaler.fit_transform(X_train)
        X_val = scaler.transform(X_val)
        X_test = scaler.transform(X_test)
        print("[DATA] Applied StandardScaler (fitted on training split)")
    splits = {
        "X_train": X_train, "y_train": y_train,
        "X_val": X_val, "y_val": y_val,
        "X_test": X_test, "y_test": y_test,
    }
    print(f"[DATA] Split (stratified): train={len(y_train)}, val={len(y_val)}, test={len(y_test)}")
    return splits, scaler
def get_numpy_splits(model_name: str, split_ratios=(0.7, 0.15, 0.15), seed: int = 42, scale: bool = True):
    """Return raw numpy splits for non-PyTorch models (e.g. XGBoost).

    Returns (splits_dict, num_features, num_classes, scaler); scaler is
    None when *scale* is false.
    """
    X, y = _load_real_data(model_name)
    n_features = X.shape[1]
    n_classes = int(y.max()) + 1  # labels assumed to be 0..max contiguous
    splits, scaler = _split_and_scale(X, y, split_ratios, seed, scale)
    return splits, n_features, n_classes, scaler
def get_dataloaders(model_name: str, batch_size: int = 32, split_ratios=(0.7, 0.15, 0.15), seed: int = 42, scale: bool = True):
    """Return PyTorch DataLoaders for neural-network models.

    Returns (train_loader, val_loader, test_loader, num_features,
    num_classes, scaler). Only the training loader shuffles.
    """
    X, y = _load_real_data(model_name)
    n_features = X.shape[1]
    n_classes = int(y.max()) + 1  # labels assumed to be 0..max contiguous
    splits, scaler = _split_and_scale(X, y, split_ratios, seed, scale)
    loaders = []
    for part, shuffle in (("train", True), ("val", False), ("test", False)):
        dataset = FeatureVectorDataset(splits[f"X_{part}"], splits[f"y_{part}"])
        loaders.append(DataLoader(dataset, batch_size=batch_size, shuffle=shuffle))
    train_loader, val_loader, test_loader = loaders
    return train_loader, val_loader, test_loader, n_features, n_classes, scaler