"""prepare_dataset.py — dataset preparation utilities (IntegrationTest/data_preparation).

NOTE(review): the original header here was web-scrape residue (repository path,
author caption, commit message, hash 24a5e7e); converted to this docstring so
the module parses as valid Python.
"""
import os
import glob
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
# Optional PyTorch dependency: provide lightweight stand-ins so the pure-numpy
# loaders in this module can still be imported when torch is not installed.
try:
    import torch
    from torch.utils.data import Dataset, DataLoader
except ImportError:  # pragma: no cover
    torch = None
    class Dataset:  # type: ignore
        # Minimal placeholder base class so FeatureVectorDataset can be defined.
        pass
    class _MissingTorchDataLoader:  # type: ignore
        # Defers the failure: raises only if a caller actually constructs a DataLoader.
        def __init__(self, *args, **kwargs):
            raise ImportError(
                "PyTorch not installed"
            )
    DataLoader = _MissingTorchDataLoader  # type: ignore
# "data" directory one level above this file; holds the collected_* folders.
DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data")

# Per-model feature subsets selected from the raw .npz columns. The list
# order fixes the column order of the X matrices returned by the loaders.
SELECTED_FEATURES = {
    "face_orientation": [
        'head_deviation', 's_face', 's_eye', 'h_gaze', 'pitch',
        'ear_left', 'ear_avg', 'ear_right', 'gaze_offset', 'perclos'
    ],
    "eye_behaviour": [
        'ear_left', 'ear_right', 'ear_avg', 'mar',
        'blink_rate', 'closure_duration', 'perclos', 'yawn_duration'
    ]
}
class FeatureVectorDataset(Dataset):
    """Torch Dataset over pre-extracted feature vectors and integer labels."""

    def __init__(self, features: np.ndarray, labels: np.ndarray):
        # Convert once up front: float32 inputs for the model, int64 targets
        # for CrossEntropy-style losses.
        self.features = torch.tensor(features, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        # One label per sample row.
        return self.labels.shape[0]

    def __getitem__(self, idx):
        sample = self.features[idx]
        target = self.labels[idx]
        return sample, target
# ── Low-level helpers ────────────────────────────────────────────────────
def _clean_npz(raw, names):
"""Apply clipping rules in-place. Shared by all loaders."""
for col, lo, hi in [('yaw', -45, 45), ('pitch', -30, 30), ('roll', -30, 30)]:
if col in names:
raw[:, names.index(col)] = np.clip(raw[:, names.index(col)], lo, hi)
for feat in ['ear_left', 'ear_right', 'ear_avg']:
if feat in names:
raw[:, names.index(feat)] = np.clip(raw[:, names.index(feat)], 0, 0.85)
return raw
def _load_one_npz(npz_path, target_features):
    """Load one .npz file, clean it, and select the requested features.

    Returns (X, y, selected_feature_names); the selected names keep the order
    of `target_features`, restricted to columns actually present in the file.
    """
    archive = np.load(npz_path, allow_pickle=True)
    matrix = archive['features'].astype(np.float32)
    targets = archive['labels'].astype(np.int64)
    column_names = list(archive['feature_names'])
    matrix = _clean_npz(matrix, column_names)
    # Intersect in target order so every file yields the same column layout.
    keep = [name for name in target_features if name in column_names]
    cols = [column_names.index(name) for name in keep]
    return matrix[:, cols], targets, keep
# ── Public data loaders ──────────────────────────────────────────────────
def load_all_pooled(model_name: str = "face_orientation", data_dir: str = None):
    """Load all collected_*/*.npz files, clean, select features, concatenate.

    Falls back to synthetic data (with a warning) when no files are found.

    Args:
        model_name: key into SELECTED_FEATURES; unknown keys fall back to
            "face_orientation".
        data_dir: override for DATA_DIR.

    Returns:
        (X_all, y_all, all_feature_names)

    Raises:
        ValueError: if two files expose different subsets of the requested
            features (pooling them would misreport the column names).
    """
    data_dir = data_dir or DATA_DIR
    target_features = SELECTED_FEATURES.get(model_name, SELECTED_FEATURES["face_orientation"])
    pattern = os.path.join(data_dir, "collected_*", "*.npz")
    npz_files = sorted(glob.glob(pattern))
    if not npz_files:
        print("[DATA] Warning: No .npz files found. Falling back to synthetic.")
        X, y = _generate_synthetic_data(model_name)
        return X, y, target_features
    all_X, all_y = [], []
    all_names = None
    for npz_path in npz_files:
        X, y, names = _load_one_npz(npz_path, target_features)
        if all_names is None:
            all_names = names
        elif names != all_names:
            # Without this guard np.concatenate would fail with an opaque
            # shape error (or all_names would silently describe only file #1).
            raise ValueError(
                f"Feature mismatch in {npz_path}: {names} != {all_names}"
            )
        all_X.append(X)
        all_y.append(y)
        print(f"[DATA] + {os.path.basename(npz_path)}: {X.shape[0]} samples")
    X_all = np.concatenate(all_X, axis=0)
    y_all = np.concatenate(all_y, axis=0)
    print(f"[DATA] Loaded {len(npz_files)} file(s) for '{model_name}': "
          f"{X_all.shape[0]} total samples, {X_all.shape[1]} features")
    return X_all, y_all, all_names
def load_per_person(model_name: str = "face_orientation", data_dir: str = None):
    """Load collected_*/*.npz files grouped by person (folder name).

    Args:
        model_name: key into SELECTED_FEATURES; unknown keys fall back to
            "face_orientation".
        data_dir: override for DATA_DIR.

    Returns:
        (by_person, X_all, y_all) where by_person maps person name to a
        per-person (X, y) pair and X_all/y_all are the pooled arrays.

    Raises:
        FileNotFoundError: when no matching .npz files exist.
    """
    data_dir = data_dir or DATA_DIR
    target_features = SELECTED_FEATURES.get(model_name, SELECTED_FEATURES["face_orientation"])
    pattern = os.path.join(data_dir, "collected_*", "*.npz")
    npz_files = sorted(glob.glob(pattern))
    if not npz_files:
        raise FileNotFoundError(f"No .npz files matching {pattern}")
    by_person = {}
    all_X, all_y = [], []
    for npz_path in npz_files:
        folder = os.path.basename(os.path.dirname(npz_path))
        # Folder naming convention "collected_<person>" encodes the person.
        person = folder.replace("collected_", "", 1)
        X, y, _ = _load_one_npz(npz_path, target_features)
        all_X.append(X)
        all_y.append(y)
        by_person.setdefault(person, []).append((X, y))
        print(f"[DATA] + {person}/{os.path.basename(npz_path)}: {X.shape[0]} samples")
    # Collapse each person's file chunks into single pooled arrays.
    for person, chunks in by_person.items():
        by_person[person] = (
            np.concatenate([c[0] for c in chunks], axis=0),
            np.concatenate([c[1] for c in chunks], axis=0),
        )
    X_all = np.concatenate(all_X, axis=0)
    y_all = np.concatenate(all_y, axis=0)
    print(f"[DATA] {len(by_person)} persons, {X_all.shape[0]} total samples, {X_all.shape[1]} features")
    return by_person, X_all, y_all
def load_raw_npz(npz_path):
    """Load a single .npz as-is (no cleaning, no feature selection).

    Intended for exploration notebooks. Returns (features, labels, names)
    with features cast to float32 and labels to int64.
    """
    archive = np.load(npz_path, allow_pickle=True)
    return (
        archive['features'].astype(np.float32),
        archive['labels'].astype(np.int64),
        list(archive['feature_names']),
    )
# ── Legacy helpers (used by models/mlp/train.py and models/xgboost/train.py) ─
def _load_real_data(model_name: str):
    """Pooled (X, y) for `model_name`; thin wrapper over load_all_pooled."""
    features, labels, _names = load_all_pooled(model_name)
    return features, labels
def _generate_synthetic_data(model_name: str, n_samples: int = 500, num_classes: int = 2, seed: int = 42):
    """Generate a reproducible random dataset shaped like the real one.

    Used as a fallback when no collected .npz files exist. The feature count
    is taken from SELECTED_FEATURES for `model_name`. Defaults reproduce the
    historical behaviour (500 samples, 2 classes, seed 42) exactly.

    Args:
        model_name: key into SELECTED_FEATURES; unknown keys fall back to
            "face_orientation".
        n_samples: number of synthetic rows to generate.
        num_classes: number of distinct label values.
        seed: RNG seed; fixed default keeps the fallback deterministic.

    Returns:
        (features, labels) as float32 / int64 arrays.
    """
    target_features = SELECTED_FEATURES.get(model_name, SELECTED_FEATURES["face_orientation"])
    d = len(target_features)
    rng = np.random.RandomState(seed)
    features = rng.randn(n_samples, d).astype(np.float32)
    labels = rng.randint(0, num_classes, size=n_samples).astype(np.int64)
    print(f"[DATA] Using synthetic data for '{model_name}': {n_samples} samples, {d} features, {num_classes} classes")
    return features, labels
def _split_and_scale(features, labels, split_ratios, seed, scale):
    """Stratified train/val/test split with optional standard scaling.

    Performs two chained stratified splits: first carve off the test set,
    then divide the remainder into train and validation. When `scale` is
    truthy, a StandardScaler is fitted on the training split only and
    applied to all three splits.

    Returns (splits_dict, scaler_or_None).
    """
    test_frac = split_ratios[2]
    # Validation fraction is expressed relative to the train+val remainder.
    rel_val_frac = split_ratios[1] / (split_ratios[0] + split_ratios[1])
    X_train_val, X_test, y_train_val, y_test = train_test_split(
        features, labels, test_size=test_frac, random_state=seed, stratify=labels,
    )
    X_train, X_val, y_train, y_val = train_test_split(
        X_train_val, y_train_val, test_size=rel_val_frac, random_state=seed, stratify=y_train_val,
    )
    scaler = None
    if scale:
        # Fit on train only so val/test statistics never leak into scaling.
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_val = scaler.transform(X_val)
        X_test = scaler.transform(X_test)
        print("[DATA] Applied StandardScaler (fitted on training split)")
    splits = {
        "X_train": X_train, "y_train": y_train,
        "X_val": X_val, "y_val": y_val,
        "X_test": X_test, "y_test": y_test,
    }
    print(f"[DATA] Split (stratified): train={len(y_train)}, val={len(y_val)}, test={len(y_test)}")
    return splits, scaler
def get_numpy_splits(model_name: str, split_ratios=(0.7, 0.15, 0.15), seed: int = 42, scale: bool = True):
    """Return raw numpy splits for non-PyTorch models (e.g. XGBoost).

    Returns (splits_dict, num_features, num_classes, scaler_or_None).
    """
    X, y = _load_real_data(model_name)
    n_features = X.shape[1]
    n_classes = int(y.max()) + 1  # labels assumed to be 0..max contiguous
    splits, scaler = _split_and_scale(X, y, split_ratios, seed, scale)
    return splits, n_features, n_classes, scaler
def get_dataloaders(model_name: str, batch_size: int = 32, split_ratios=(0.7, 0.15, 0.15), seed: int = 42, scale: bool = True):
    """Return PyTorch DataLoaders for neural-network models.

    Returns (train_loader, val_loader, test_loader, num_features,
    num_classes, scaler_or_None). Only the training loader shuffles.
    """
    X, y = _load_real_data(model_name)
    n_features = X.shape[1]
    n_classes = int(y.max()) + 1  # labels assumed to be 0..max contiguous
    splits, scaler = _split_and_scale(X, y, split_ratios, seed, scale)
    loaders = {}
    for split_name, shuffle in (("train", True), ("val", False), ("test", False)):
        dataset = FeatureVectorDataset(splits[f"X_{split_name}"], splits[f"y_{split_name}"])
        loaders[split_name] = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)
    return loaders["train"], loaders["val"], loaders["test"], n_features, n_classes, scaler