Spaces:
Running
Running
| """ | |
| Deep learning classifier comparison for AURIS. | |
| Trains and evaluates multiple neural network architectures on | |
| the 47 extracted audio features using stratified k-fold CV. | |
| Architectures: | |
| 1. Deep MLP (512-256-128-64) with BatchNorm + Dropout | |
| 2. 1D-CNN on feature vector (treats features as 1D signal) | |
| 3. Residual MLP (skip connections) | |
| 4. Attention MLP (self-attention over four replicated copies of the projected feature vector) | |
| Usage: | |
| python -m app.training.train_deep_classifiers ../DataSet/features.csv | |
| """ | |
| from __future__ import annotations | |
| import csv | |
| import json | |
| import sys | |
| import time | |
| from pathlib import Path | |
| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| from torch.utils.data import DataLoader, TensorDataset | |
| from sklearn.model_selection import StratifiedKFold | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.metrics import accuracy_score, roc_auc_score, f1_score, precision_score, recall_score, roc_curve | |
# Training hyper-parameters shared by cross-validation and final-model training.

# Use the GPU when PyTorch can see one, otherwise stay on the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

SEED = 42  # base RNG seed (fold splitting, per-fold seeding, hold-out split)
N_FOLDS = 5  # number of stratified CV folds
EPOCHS = 100  # upper bound on training epochs per run
PATIENCE = 10  # epochs without validation-AUC improvement before stopping early
BATCH_SIZE = 64  # mini-batch size of the training DataLoader
LR = 1e-3  # initial AdamW learning rate
def _optimal_threshold(y_true: np.ndarray, y_prob: np.ndarray) -> float:
    """Return the decision threshold that maximises Youden's J statistic.

    J = sensitivity + specificity - 1 = TPR - FPR, evaluated at every
    candidate threshold returned by ``roc_curve``.

    Args:
        y_true: Ground-truth binary labels.
        y_prob: Predicted scores/probabilities for the positive class.

    Returns:
        The finite threshold with the largest J. Falls back to ``0.5`` when
        no finite candidate exists.
    """
    fpr, tpr, thresholds = roc_curve(y_true, y_prob)
    j = tpr - fpr

    # roc_curve prepends a sentinel threshold meaning "predict nothing
    # positive"; recent scikit-learn versions set it to +inf. For a classifier
    # at or below chance level J is never positive, argmax would select that
    # sentinel, and the resulting `inf` is not a usable (or JSON-serialisable)
    # threshold. Only finite candidates with a defined J are considered.
    usable = np.isfinite(thresholds) & np.isfinite(j)
    if not usable.any():
        return 0.5
    j = np.where(usable, j, -np.inf)
    return float(thresholds[np.argmax(j)])
def set_seed(seed: int = SEED) -> None:
    """Seed NumPy and PyTorch (CPU, plus all CUDA devices when available)."""
    np.random.seed(seed)
    torch.manual_seed(seed)
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)
class DeepMLP(nn.Module):
    """Feed-forward binary classifier: n_features -> 512 -> 256 -> 128 -> 64 -> 1.

    The first three stages are Linear + BatchNorm + ReLU + Dropout (dropout
    0.4 / 0.3 / 0.2); the last two Linear layers have no normalisation.
    ``forward`` returns raw logits of shape (B,), not probabilities.
    """

    def __init__(self, n_features: int) -> None:
        super().__init__()
        # (input width, output width, dropout probability) per regularised stage.
        stages = ((n_features, 512, 0.4), (512, 256, 0.3), (256, 128, 0.2))
        layers: list[nn.Module] = []
        for width_in, width_out, drop_p in stages:
            layers.extend(
                (
                    nn.Linear(width_in, width_out),
                    nn.BatchNorm1d(width_out),
                    nn.ReLU(),
                    nn.Dropout(drop_p),
                )
            )
        layers.extend((nn.Linear(128, 64), nn.ReLU(), nn.Linear(64, 1)))
        # Layer order is kept so that state-dict keys stay "net.0" ... "net.14".
        self.net = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        logits = self.net(x)  # (B, 1)
        return logits.squeeze(-1)  # (B,)
class Conv1DClassifier(nn.Module):
    """1D-CNN that treats the feature vector as a single-channel signal.

    Two Conv1d + BatchNorm + ReLU stages (64 then 128 channels) are followed
    by global average pooling and a small fully connected head.
    ``n_features`` is accepted so all models share one constructor signature;
    it is not used because the adaptive pooling collapses any input length.
    ``forward`` returns raw logits of shape (B,).
    """

    def __init__(self, n_features: int) -> None:
        super().__init__()
        extractor = [
            nn.Conv1d(1, 64, kernel_size=5, padding=2),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.Conv1d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.AdaptiveAvgPool1d(1),
        ]
        classifier = [
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(64, 1),
        ]
        self.conv = nn.Sequential(*extractor)
        self.fc = nn.Sequential(*classifier)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        signal = torch.unsqueeze(x, 1)  # (B, 1, F)
        pooled = self.conv(signal)  # (B, 128, 1)
        embedding = pooled.squeeze(-1)  # (B, 128)
        logits = self.fc(embedding)  # (B, 1)
        return logits.squeeze(-1)
class ResidualBlock(nn.Module):
    """Width-preserving residual unit: ReLU(x + F(x)).

    F is Linear -> BatchNorm -> ReLU -> Dropout -> Linear -> BatchNorm, all of
    width ``dim``.
    """

    def __init__(self, dim: int, dropout: float = 0.2) -> None:
        super().__init__()
        transform = [
            nn.Linear(dim, dim),
            nn.BatchNorm1d(dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(dim, dim),
            nn.BatchNorm1d(dim),
        ]
        self.block = nn.Sequential(*transform)
        self.relu = nn.ReLU()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        skip = x
        transformed = self.block(x)
        return self.relu(skip + transformed)
class ResidualMLP(nn.Module):
    """MLP with skip connections.

    The input is projected to 256 units, passed through three
    ``ResidualBlock``s (dropout 0.3 / 0.2 / 0.1) and a 256 -> 64 -> 1 head.
    ``forward`` returns raw logits of shape (B,).
    """

    def __init__(self, n_features: int) -> None:
        super().__init__()
        self.input_proj = nn.Sequential(
            nn.Linear(n_features, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
        )
        # Dropout decreases with depth; the blocks are created in this order.
        self.res_blocks = nn.Sequential(
            *[ResidualBlock(256, drop_p) for drop_p in (0.3, 0.2, 0.1)]
        )
        self.head = nn.Sequential(
            nn.Linear(256, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        hidden = self.res_blocks(self.input_proj(x))  # (B, 256)
        logits = self.head(hidden)  # (B, 1)
        return logits.squeeze(-1)
class AttentionMLP(nn.Module):
    """Linear projection + multi-head self-attention + MLP head.

    The features are projected to 256 dimensions, replicated into a 4-token
    sequence, passed through 4-head self-attention, mean-pooled,
    layer-normalised and classified. ``forward`` returns raw logits (B,).

    NOTE(review): the four tokens are identical copies of one vector, so every
    query sees identical keys and values. The attention step therefore cannot
    weight different parts of the input against each other. Confirm whether
    real feature groups were intended.
    """

    def __init__(self, n_features: int) -> None:
        super().__init__()
        self.proj = nn.Linear(n_features, 256)
        self.attn = nn.MultiheadAttention(256, num_heads=4, batch_first=True)
        self.norm = nn.LayerNorm(256)
        classifier = [
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, 1),
        ]
        self.head = nn.Sequential(*classifier)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        embedded = self.proj(x)  # (B, 256)
        tokens = embedded.unsqueeze(1).expand(-1, 4, -1)  # (B, 4, 256)
        attended, _ = self.attn(tokens, tokens, tokens)
        pooled = attended.mean(dim=1)  # (B, 256)
        logits = self.head(self.norm(pooled))  # (B, 1)
        return logits.squeeze(-1)
def load_data(csv_path: str | Path) -> tuple[np.ndarray, np.ndarray, list[str]]:
    """Load the feature matrix and labels from a features CSV.

    Every column except ``file_path``, ``label_int``, ``duration_sec`` and
    ``sample_rate`` is treated as a feature. Cells that are missing or cannot
    be parsed as a float, and NaN values, are replaced by ``0.0``.

    Args:
        csv_path: Path to a CSV file with a header row and a ``label_int``
            column.

    Returns:
        ``(X, y, feature_cols)`` where ``X`` is a float32 array of shape
        (n_rows, n_features), ``y`` an int32 array of shape (n_rows,) and
        ``feature_cols`` the feature column names in file order.

    Raises:
        ValueError: If the file is empty / has no header row, or a label
            cannot be parsed as an integer.
        KeyError: If the ``label_int`` column is absent.
    """
    _EXCLUDE = {"file_path", "label_int", "duration_sec", "sample_rate"}
    rows: list[list[float]] = []
    labels: list[int] = []
    # newline="" is what the csv module expects, so that quoted fields with
    # embedded newlines are parsed correctly.
    with open(csv_path, "r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        header = reader.fieldnames
        if not header:
            # fieldnames is None for an empty file; fail with a clear message
            # instead of an opaque "NoneType is not iterable".
            raise ValueError(f"{csv_path}: CSV file is empty or has no header row")
        feature_cols = [c for c in header if c not in _EXCLUDE]
        for row in reader:
            vals = []
            for col in feature_cols:
                try:
                    vals.append(float(row[col]))
                except (ValueError, TypeError, KeyError):
                    # TypeError: DictReader fills the missing trailing cells
                    # of a short row with None.
                    vals.append(0.0)
            rows.append(vals)
            labels.append(int(row["label_int"]))

    # Explicit reshape keeps X two-dimensional even with zero rows or columns.
    X = np.array(rows, dtype=np.float32).reshape(len(rows), len(feature_cols))
    X = np.nan_to_num(X, nan=0.0)
    y = np.array(labels, dtype=np.int32)
    return X, y, feature_cols
def train_one_fold(
    model: nn.Module,
    X_train: np.ndarray, y_train: np.ndarray,
    X_val: np.ndarray, y_val: np.ndarray,
) -> tuple[float, np.ndarray]:
    """Train ``model`` on one CV fold and return its best validation result.

    Features are standardised with statistics fitted on the training split
    only. The model is optimised with AdamW and a class-weighted
    ``BCEWithLogitsLoss``; the learning rate is halved when the validation AUC
    plateaus, and training stops after ``PATIENCE`` epochs without improvement.

    Note that the best epoch is selected on the validation fold itself, so the
    returned AUC is an optimistic estimate.

    Args:
        model: Freshly initialised network producing one logit per sample.
        X_train, y_train: Training features and binary labels.
        X_val, y_val: Validation features and binary labels.

    Returns:
        ``(best_auc, best_probs)``: the highest validation ROC-AUC seen and
        the validation probabilities of the epoch that achieved it.
    """
    scaler = StandardScaler()
    X_tr = scaler.fit_transform(X_train)
    X_v = scaler.transform(X_val)

    train_ds = TensorDataset(
        torch.tensor(X_tr, dtype=torch.float32),
        torch.tensor(y_train, dtype=torch.float32),
    )
    val_X = torch.tensor(X_v, dtype=torch.float32).to(DEVICE)

    # BatchNorm1d cannot compute batch statistics from a single sample in
    # training mode, so a trailing mini-batch of size 1 is dropped. Shuffling
    # means a different sample is skipped each epoch.
    n_train = len(train_ds)
    drop_last = n_train > 1 and n_train % BATCH_SIZE == 1
    loader = DataLoader(
        train_ds, batch_size=BATCH_SIZE, shuffle=True, drop_last=drop_last
    )

    model = model.to(DEVICE)
    optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=1e-4)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode="max", factor=0.5, patience=5
    )

    # pos_weight compensates for class imbalance (n_neg / n_pos); the
    # denominator is clamped so a fold without positives does not divide by 0.
    n_pos = int(y_train.sum())
    n_neg = len(y_train) - n_pos
    pos_weight = torch.tensor([n_neg / max(n_pos, 1)], dtype=torch.float32).to(DEVICE)
    criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)

    # Start below any attainable AUC so the first epoch always records its
    # probabilities; starting at 0.0 could leave best_probs as None.
    best_auc = float("-inf")
    best_probs = None
    patience_ctr = 0

    for _ in range(EPOCHS):
        model.train()
        for bx, by in loader:
            bx, by = bx.to(DEVICE), by.to(DEVICE)
            optimizer.zero_grad()
            logits = model(bx)
            loss = criterion(logits, by)
            loss.backward()
            optimizer.step()

        model.eval()
        with torch.no_grad():
            v_logits = model(val_X)
            v_probs = torch.sigmoid(v_logits).cpu().numpy()
        auc = roc_auc_score(y_val, v_probs)
        scheduler.step(auc)

        if auc > best_auc:
            best_auc = auc
            best_probs = v_probs.copy()
            patience_ctr = 0
        else:
            patience_ctr += 1
            if patience_ctr >= PATIENCE:
                break

    return best_auc, best_probs
def evaluate_cv(
    model_class: type,
    X: np.ndarray, y: np.ndarray,
    n_features: int,
) -> dict:
    """Run stratified k-fold CV for one architecture and summarise the metrics.

    A fresh model is built and seeded per fold. The out-of-fold probabilities
    are pooled, a single Youden-optimal threshold is chosen on the pooled
    predictions, and accuracy / precision / recall / F1 / ROC-AUC are computed
    with it.

    Args:
        model_class: Network class, instantiated as ``model_class(n_features)``.
        X: Feature matrix.
        y: Binary labels.
        n_features: Input width passed to the model constructor.

    Returns:
        Dict of rounded metrics, per-fold AUCs, the chosen threshold and the
        wall-clock training time in seconds.
    """
    splitter = StratifiedKFold(n_splits=N_FOLDS, shuffle=True, random_state=SEED)
    oof_probs = np.zeros(len(y))
    fold_scores = []
    started = time.time()

    for fold, (tr_idx, va_idx) in enumerate(splitter.split(X, y)):
        # A different but deterministic seed per fold.
        set_seed(SEED + fold)
        net = model_class(n_features)
        auc, probs = train_one_fold(
            net,
            X[tr_idx], y[tr_idx],
            X[va_idx], y[va_idx],
        )
        oof_probs[va_idx] = probs
        fold_scores.append(auc)
        print(f" Fold {fold+1}: AUC={auc:.4f}")

    elapsed = time.time() - started

    threshold = _optimal_threshold(y, oof_probs)
    y_pred = (oof_probs >= threshold).astype(int)

    def _rounded(value) -> float:
        return round(float(value), 4)

    summary = {
        "accuracy": _rounded(accuracy_score(y, y_pred)),
        "precision": _rounded(precision_score(y, y_pred, zero_division=0)),
        "recall": _rounded(recall_score(y, y_pred, zero_division=0)),
        "f1": _rounded(f1_score(y, y_pred, zero_division=0)),
        "roc_auc": _rounded(roc_auc_score(y, oof_probs)),
        "optimal_threshold": round(threshold, 4),
        "fold_aucs": [round(a, 4) for a in fold_scores],
        "train_time_sec": round(elapsed, 1),
    }
    return summary
class TorchSklearnWrapper:
    """
    Sklearn-compatible wrapper for trained PyTorch classifiers.

    Stores the model class name, its state dict and the fitted scaler so the
    object can be pickled and reloaded; the network itself is rebuilt lazily
    on CPU and is never part of the pickled state.

    Exposes the usual classifier surface: ``predict_proba``, ``predict``,
    ``classes_`` and ``n_features_in_``.
    """

    def __init__(
        self,
        model_class: type,
        n_features: int,
        state_dict: dict,
        scaler: StandardScaler,
    ) -> None:
        self.model_class_name = model_class.__name__
        self._model_class = model_class
        self.n_features = n_features
        self.state_dict = state_dict
        self.scaler = scaler
        self.n_features_in_ = n_features
        # Lazily built network, cached so repeated predictions do not
        # re-instantiate the model and reload its weights every call.
        self._model = None

    @property
    def classes_(self) -> np.ndarray:
        """Class labels, in the column order of ``predict_proba``."""
        # A property (not an instance attribute) so that wrappers pickled
        # before this attribute existed expose it as well.
        return np.array([0, 1])

    def _build_model(self) -> nn.Module:
        """Instantiate the network, load the stored weights, set eval mode."""
        model = self._model_class(self.n_features)
        model.load_state_dict(self.state_dict)
        model.eval()
        return model

    def _get_model(self) -> nn.Module:
        """Return the cached CPU network, building it on first use."""
        model = getattr(self, "_model", None)
        if model is None:
            model = self._build_model().to("cpu")
            self._model = model
        return model

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return class probabilities of shape (n_samples, 2).

        Column 0 is P(class 0), column 1 is P(class 1). ``X`` is scaled with
        the stored scaler before inference.
        """
        model = self._get_model()
        X_scaled = self.scaler.transform(X)
        x_t = torch.tensor(X_scaled, dtype=torch.float32)
        with torch.no_grad():
            logits = model(x_t)
        probs = torch.sigmoid(logits).numpy().flatten()
        return np.column_stack([1.0 - probs, probs])

    def predict(self, X: np.ndarray, threshold: float = 0.5) -> np.ndarray:
        """Return hard 0/1 labels: 1 where P(class 1) >= ``threshold``."""
        return (self.predict_proba(X)[:, 1] >= threshold).astype(int)

    def __getstate__(self) -> dict:
        state = self.__dict__.copy()
        # Neither the class object nor the live network is pickled; both are
        # restored from `model_class_name` / `state_dict` after loading.
        state.pop("_model_class", None)
        state.pop("_model", None)
        return state

    def __setstate__(self, state: dict) -> None:
        self.__dict__.update(state)
        self._model = None
        # Re-attach class from global lookup
        _CLASS_MAP = {
            "DeepMLP": DeepMLP,
            "Conv1DClassifier": Conv1DClassifier,
            "ResidualMLP": ResidualMLP,
            "AttentionMLP": AttentionMLP,
        }
        # Unknown names fall back to DeepMLP; load_state_dict will then fail
        # if the stored weights do not fit that architecture.
        self._model_class = _CLASS_MAP.get(self.model_class_name, DeepMLP)
def train_final_model(
    model_class: type,
    X: np.ndarray,
    y: np.ndarray,
    epochs: int = EPOCHS,
    patience: int = PATIENCE,
) -> TorchSklearnWrapper:
    """Train the deployable model and return an sklearn-compatible wrapper.

    A stratified 10% of the data is held out for early stopping; the scaler
    and the network are fitted on the remaining 90%. Optimiser, class
    weighting and learning-rate schedule mirror ``train_one_fold`` so that the
    saved model follows the same recipe the CV metrics were measured with.

    Args:
        model_class: Network class, instantiated as ``model_class(n_features)``.
        X: Feature matrix.
        y: Binary labels.
        epochs: Maximum number of training epochs.
        patience: Epochs without hold-out AUC improvement before stopping.

    Returns:
        ``TorchSklearnWrapper`` holding the best weights (as CPU tensors) and
        the fitted scaler.
    """
    from sklearn.model_selection import train_test_split

    scaler = StandardScaler()
    X_tr_raw, X_val_raw, y_tr, y_val = train_test_split(
        X, y, test_size=0.1, stratify=y, random_state=SEED
    )
    X_tr = scaler.fit_transform(X_tr_raw)
    X_v = scaler.transform(X_val_raw)

    n_features = X.shape[1]
    model = model_class(n_features).to(DEVICE)
    optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=1e-4)
    # Same schedule as the cross-validation training loop.
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode="max", factor=0.5, patience=5
    )

    # pos_weight = n_neg / n_pos, denominator clamped against division by 0.
    n_pos = int(y_tr.sum())
    n_neg = len(y_tr) - n_pos
    pos_weight = torch.tensor([n_neg / max(n_pos, 1)], dtype=torch.float32).to(DEVICE)
    criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)

    train_ds = TensorDataset(
        torch.tensor(X_tr, dtype=torch.float32),
        torch.tensor(y_tr, dtype=torch.float32),
    )
    # BatchNorm1d cannot normalise a single-sample batch in training mode, so
    # a trailing mini-batch of size 1 is dropped.
    n_train = len(train_ds)
    drop_last = n_train > 1 and n_train % BATCH_SIZE == 1
    loader = DataLoader(
        train_ds, batch_size=BATCH_SIZE, shuffle=True, drop_last=drop_last
    )
    val_X = torch.tensor(X_v, dtype=torch.float32).to(DEVICE)

    def _cpu_snapshot() -> dict:
        # Detached CPU copies: independent of further training and loadable
        # on machines without a GPU once pickled.
        return {k: v.cpu().clone() for k, v in model.state_dict().items()}

    # Start below any attainable AUC so the first epoch is always recorded.
    best_auc = float("-inf")
    best_state = None
    patience_ctr = 0

    for _ in range(epochs):
        model.train()
        for bx, by in loader:
            bx, by = bx.to(DEVICE), by.to(DEVICE)
            optimizer.zero_grad()
            criterion(model(bx), by).backward()
            optimizer.step()

        model.eval()
        with torch.no_grad():
            probs = torch.sigmoid(model(val_X)).cpu().numpy()
        auc = roc_auc_score(y_val, probs)
        scheduler.step(auc)

        if auc > best_auc:
            best_auc = auc
            best_state = _cpu_snapshot()
            patience_ctr = 0
        else:
            patience_ctr += 1
            if patience_ctr >= patience:
                break

    if best_state is None:
        # Only reachable when no epoch ran (epochs <= 0): keep the initial
        # weights, still as CPU copies rather than live device tensors.
        best_state = _cpu_snapshot()

    return TorchSklearnWrapper(model_class, n_features, best_state, scaler)
| def _safe_name(name: str) -> str: | |
| return name.lower().replace(" ", "_").replace("(", "").replace(")", "").replace("-", "_") | |
def main() -> None:
    """Cross-validate every architecture, train and pickle a final model each.

    Command line: ``[features_csv] [output_dir]``; defaults are
    ``../DataSet/features.csv`` and ``models``. Writes one
    ``model_dl_<name>.pkl`` per architecture plus
    ``deep_learning_results.json`` to the output directory, then prints a
    summary sorted by ROC-AUC.
    """
    import pickle

    cli_args = sys.argv[1:]
    csv_path = cli_args[0] if cli_args else "../DataSet/features.csv"
    out_dir = Path(cli_args[1]) if len(cli_args) > 1 else Path("models")
    out_dir.mkdir(parents=True, exist_ok=True)

    print(f"Device: {DEVICE}")
    print(f"Loading: {csv_path}")
    X, y, feature_cols = load_data(csv_path)
    n_features = X.shape[1]
    print(f"Samples: {len(y)}, Features: {n_features}")
    print(f"AI: {np.sum(y == 1)}, Human: {np.sum(y == 0)}")

    # Display name -> network class; the display name also determines the
    # pickle file name.
    model_classes = {
        "Deep MLP (512-256-128-64)": DeepMLP,
        "1D-CNN": Conv1DClassifier,
        "Residual MLP (3 blocks)": ResidualMLP,
        "Attention MLP": AttentionMLP,
    }

    all_results = {}
    for name in model_classes:
        cls = model_classes[name]
        print(f"\n{'='*60}")
        print(f" {name}")
        print(f"{'='*60}")

        result = evaluate_cv(cls, X, y, n_features)
        entry = {**result, "type": "deep_learning"}
        all_results[name] = entry
        print(f" => Acc={result['accuracy']:.4f} AUC={result['roc_auc']:.4f} "
              f"F1={result['f1']:.4f} Time={result['train_time_sec']:.0f}s")

        print(f" Training final model for {name}...")
        wrapper = train_final_model(cls, X, y)
        pkl_path = out_dir / f"model_dl_{_safe_name(name)}.pkl"
        with pkl_path.open("wb") as f:
            pickle.dump(wrapper, f)
        entry["model_path"] = str(pkl_path)
        print(f" Saved: {pkl_path}")

    out_path = out_dir / "deep_learning_results.json"
    with out_path.open("w") as f:
        json.dump(all_results, f, indent=2)
    print(f"\nResults saved: {out_path}")

    print(f"\n{'='*60}")
    print(" SUMMARY")
    print(f"{'='*60}")
    ranked = sorted(
        all_results.items(), key=lambda item: item[1]["roc_auc"], reverse=True
    )
    for name, r in ranked:
        print(f" {name:35s} AUC={r['roc_auc']:.4f} Acc={r['accuracy']:.4f}")


if __name__ == "__main__":
    main()