File size: 8,959 Bytes
24a5e7e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
import os
import glob

import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# Optional torch dependency: fall back to lightweight stand-ins so this
# module stays importable by non-PyTorch consumers (presumably the
# XGBoost pipeline mentioned below — confirm) when torch is absent.
try:
    import torch
    from torch.utils.data import Dataset, DataLoader
except ImportError:  # pragma: no cover
    torch = None

    # Minimal base class so the FeatureVectorDataset definition still parses.
    class Dataset:  # type: ignore
        pass

    # Stand-in that fails loudly only when a DataLoader is actually constructed.
    class _MissingTorchDataLoader:  # type: ignore
        def __init__(self, *args, **kwargs):
            raise ImportError(
                "PyTorch not installed"
            )

    DataLoader = _MissingTorchDataLoader  # type: ignore

# Default data root: ../data relative to this file.
DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data")

# Per-model feature subsets, selected by name from the raw .npz feature
# columns. Loaders keep only the columns listed here (in this order),
# silently skipping names absent from a given file.
SELECTED_FEATURES = {
    "face_orientation": [
        'head_deviation', 's_face', 's_eye', 'h_gaze', 'pitch',
        'ear_left', 'ear_avg', 'ear_right', 'gaze_offset', 'perclos'
    ],
    "eye_behaviour": [
        'ear_left', 'ear_right', 'ear_avg', 'mar',
        'blink_rate', 'closure_duration', 'perclos', 'yawn_duration'
    ]
}


class FeatureVectorDataset(Dataset):
    """In-memory dataset over pre-extracted feature vectors.

    The numpy inputs are converted (copied) to float32 / int64 tensors
    once at construction, so indexing is a cheap tensor slice.
    """

    def __init__(self, features: np.ndarray, labels: np.ndarray):
        # torch.tensor copies, so later mutation of the numpy inputs
        # cannot alias into the dataset.
        self.features = torch.tensor(features, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        return self.labels.shape[0]

    def __getitem__(self, idx):
        return (self.features[idx], self.labels[idx])


# ── Low-level helpers ────────────────────────────────────────────────────

def _clean_npz(raw, names):
    """Apply clipping rules in-place. Shared by all loaders."""
    for col, lo, hi in [('yaw', -45, 45), ('pitch', -30, 30), ('roll', -30, 30)]:
        if col in names:
            raw[:, names.index(col)] = np.clip(raw[:, names.index(col)], lo, hi)
    for feat in ['ear_left', 'ear_right', 'ear_avg']:
        if feat in names:
            raw[:, names.index(feat)] = np.clip(raw[:, names.index(feat)], 0, 0.85)
    return raw


def _load_one_npz(npz_path, target_features):
    """Load one .npz file, clip outliers, and keep only the target features.

    Returns (X, y, selected_feature_names); the name list preserves the
    order of target_features, minus any columns absent from this file.
    """
    archive = np.load(npz_path, allow_pickle=True)
    names = list(archive['feature_names'])
    matrix = _clean_npz(archive['features'].astype(np.float32), names)
    labels = archive['labels'].astype(np.int64)
    keep = [feat for feat in target_features if feat in names]
    columns = [names.index(feat) for feat in keep]
    return matrix[:, columns], labels, keep


# ── Public data loaders ──────────────────────────────────────────────────

def load_all_pooled(model_name: str = "face_orientation", data_dir: str = None):
    """Load all collected_*/*.npz, clean, select features, concatenate.

    Unknown model names fall back to the "face_orientation" feature set;
    when no files match, synthetic data is generated instead.

    Returns (X_all, y_all, all_feature_names).
    """
    root = data_dir or DATA_DIR
    wanted = SELECTED_FEATURES.get(model_name, SELECTED_FEATURES["face_orientation"])
    files = sorted(glob.glob(os.path.join(root, "collected_*", "*.npz")))

    if not files:
        print("[DATA] Warning: No .npz files found. Falling back to synthetic.")
        X, y = _generate_synthetic_data(model_name)
        return X, y, wanted

    chunks_X, chunks_y = [], []
    feature_names = None
    for path in files:
        X, y, names = _load_one_npz(path, wanted)
        # Feature names are taken from the first file only.
        if feature_names is None:
            feature_names = names
        chunks_X.append(X)
        chunks_y.append(y)
        print(f"[DATA]   + {os.path.basename(path)}: {X.shape[0]} samples")

    X_all = np.concatenate(chunks_X, axis=0)
    y_all = np.concatenate(chunks_y, axis=0)
    print(f"[DATA] Loaded {len(files)} file(s) for '{model_name}': "
          f"{X_all.shape[0]} total samples, {X_all.shape[1]} features")
    return X_all, y_all, feature_names


def load_per_person(model_name: str = "face_orientation", data_dir: str = None):
    """Load collected_*/*.npz grouped by person (folder name).

    The person identity is the folder name with its "collected_" prefix
    stripped; each person's file chunks are concatenated into single arrays.

    Returns a 3-tuple (by_person, X_all, y_all) where by_person maps
    person_name -> (X, y) numpy arrays and X_all/y_all are the pooled data.

    Raises FileNotFoundError when no matching .npz files exist (unlike
    load_all_pooled, there is no synthetic fallback here).
    """
    data_dir = data_dir or DATA_DIR
    target_features = SELECTED_FEATURES.get(model_name, SELECTED_FEATURES["face_orientation"])
    pattern = os.path.join(data_dir, "collected_*", "*.npz")
    npz_files = sorted(glob.glob(pattern))

    if not npz_files:
        raise FileNotFoundError(f"No .npz files matching {pattern}")

    by_person = {}
    all_X, all_y = [], []
    for npz_path in npz_files:
        # Folder name "collected_<person>" encodes the person identity.
        folder = os.path.basename(os.path.dirname(npz_path))
        person = folder.replace("collected_", "", 1)
        X, y, _ = _load_one_npz(npz_path, target_features)
        all_X.append(X)
        all_y.append(y)
        by_person.setdefault(person, []).append((X, y))
        print(f"[DATA]   + {person}/{os.path.basename(npz_path)}: {X.shape[0]} samples")

    # Merge each person's per-file chunks into single (X, y) arrays.
    for person, chunks in by_person.items():
        by_person[person] = (
            np.concatenate([c[0] for c in chunks], axis=0),
            np.concatenate([c[1] for c in chunks], axis=0),
        )

    X_all = np.concatenate(all_X, axis=0)
    y_all = np.concatenate(all_y, axis=0)
    print(f"[DATA] {len(by_person)} persons, {X_all.shape[0]} total samples, {X_all.shape[1]} features")
    return by_person, X_all, y_all


def load_raw_npz(npz_path):
    """Load a single .npz verbatim -- no cleaning, no feature selection.

    Intended for exploration notebooks.

    Returns (features, labels, names) as (float32 array, int64 array, list).
    """
    archive = np.load(npz_path, allow_pickle=True)
    return (
        archive['features'].astype(np.float32),
        archive['labels'].astype(np.int64),
        list(archive['feature_names']),
    )


# ── Legacy helpers (used by models/mlp/train.py and models/xgboost/train.py) ─

def _load_real_data(model_name: str):
    """Legacy shim: pooled (X, y) without the feature-name list."""
    features, labels, _names = load_all_pooled(model_name)
    return features, labels


def _generate_synthetic_data(model_name: str, n: int = 500, num_classes: int = 2, seed: int = 42):
    """Generate a reproducible random dataset shaped like the real features.

    Used as a fallback when no collected .npz files exist. The previous
    hard-coded sample count, class count, and seed are now keyword
    parameters with the original values as defaults (backward compatible).

    Parameters
    ----------
    model_name : str
        Which SELECTED_FEATURES entry defines the feature count; unknown
        names fall back to "face_orientation".
    n : int
        Number of samples to generate.
    num_classes : int
        Number of distinct label values (0..num_classes-1).
    seed : int
        RNG seed for reproducibility.

    Returns
    -------
    (features, labels) : float32 array of shape (n, d), int64 array of shape (n,)
    """
    target_features = SELECTED_FEATURES.get(model_name, SELECTED_FEATURES["face_orientation"])
    d = len(target_features)
    rng = np.random.RandomState(seed)
    features = rng.randn(n, d).astype(np.float32)
    labels = rng.randint(0, num_classes, size=n).astype(np.int64)
    print(f"[DATA] Using synthetic data for '{model_name}': {n} samples, {d} features, {num_classes} classes")
    return features, labels


def _split_and_scale(features, labels, split_ratios, seed, scale):
    """Stratified train/val/test split with optional standardisation.

    split_ratios is (train, val, test). The scaler, when requested, is
    fitted on the training split only, so no statistics leak from
    val/test into training.

    Returns (splits_dict, scaler_or_None).
    """
    train_frac, val_frac, test_frac = split_ratios

    # First carve off the test set, then split the remainder into
    # train/val using the ratio of val within (train + val).
    X_rest, X_test, y_rest, y_test = train_test_split(
        features, labels, test_size=test_frac, random_state=seed, stratify=labels,
    )
    rel_val = val_frac / (train_frac + val_frac)
    X_train, X_val, y_train, y_val = train_test_split(
        X_rest, y_rest, test_size=rel_val, random_state=seed, stratify=y_rest,
    )

    scaler = StandardScaler() if scale else None
    if scaler is not None:
        X_train = scaler.fit_transform(X_train)
        X_val = scaler.transform(X_val)
        X_test = scaler.transform(X_test)
        print("[DATA] Applied StandardScaler (fitted on training split)")

    print(f"[DATA] Split (stratified): train={len(y_train)}, val={len(y_val)}, test={len(y_test)}")
    return {
        "X_train": X_train, "y_train": y_train,
        "X_val": X_val,     "y_val": y_val,
        "X_test": X_test,   "y_test": y_test,
    }, scaler


def get_numpy_splits(model_name: str, split_ratios=(0.7, 0.15, 0.15), seed: int = 42, scale: bool = True):
    """Return raw numpy splits for non-PyTorch models (e.g. XGBoost).

    Returns (splits_dict, num_features, num_classes, scaler_or_None).
    """
    X, y = _load_real_data(model_name)
    # Labels are assumed to be dense 0..max; class count follows directly.
    num_features, num_classes = X.shape[1], int(y.max()) + 1
    splits, scaler = _split_and_scale(X, y, split_ratios, seed, scale)
    return splits, num_features, num_classes, scaler


def get_dataloaders(model_name: str, batch_size: int = 32, split_ratios=(0.7, 0.15, 0.15), seed: int = 42, scale: bool = True):
    """Return PyTorch DataLoaders for neural-network models.

    Returns (train_loader, val_loader, test_loader, num_features,
    num_classes, scaler_or_None). Only the training loader shuffles.
    """
    X, y = _load_real_data(model_name)
    num_features, num_classes = X.shape[1], int(y.max()) + 1
    splits, scaler = _split_and_scale(X, y, split_ratios, seed, scale)

    def _make_loader(part, shuffle):
        # One DataLoader per split; batch size is shared across all three.
        ds = FeatureVectorDataset(splits[f"X_{part}"], splits[f"y_{part}"])
        return DataLoader(ds, batch_size=batch_size, shuffle=shuffle)

    return (
        _make_loader("train", True),
        _make_loader("val", False),
        _make_loader("test", False),
        num_features,
        num_classes,
        scaler,
    )