""" Deep learning classifier comparison for AURIS. Trains and evaluates multiple neural network architectures on the 47 extracted audio features using stratified k-fold CV. Architectures: 1. Deep MLP (512-256-128-64) with BatchNorm + Dropout 2. 1D-CNN on feature vector (treats features as 1D signal) 3. Residual MLP (skip connections) 4. Attention MLP (self-attention over feature groups) Usage: python -m app.training.train_deep_classifiers ../DataSet/features.csv """ from __future__ import annotations import csv import json import sys import time from pathlib import Path import numpy as np import torch import torch.nn as nn from torch.utils.data import DataLoader, TensorDataset from sklearn.model_selection import StratifiedKFold from sklearn.preprocessing import StandardScaler from sklearn.metrics import accuracy_score, roc_auc_score, f1_score, precision_score, recall_score, roc_curve DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") SEED = 42 N_FOLDS = 5 EPOCHS = 100 PATIENCE = 10 BATCH_SIZE = 64 LR = 1e-3 def _optimal_threshold(y_true: np.ndarray, y_prob: np.ndarray) -> float: """Youden's J: threshold maximising sensitivity + specificity - 1.""" fpr, tpr, thresholds = roc_curve(y_true, y_prob) j = tpr - fpr return float(thresholds[np.argmax(j)]) def set_seed(seed: int = SEED) -> None: np.random.seed(seed) torch.manual_seed(seed) if torch.cuda.is_available(): torch.cuda.manual_seed_all(seed) class DeepMLP(nn.Module): def __init__(self, n_features: int) -> None: super().__init__() self.net = nn.Sequential( nn.Linear(n_features, 512), nn.BatchNorm1d(512), nn.ReLU(), nn.Dropout(0.4), nn.Linear(512, 256), nn.BatchNorm1d(256), nn.ReLU(), nn.Dropout(0.3), nn.Linear(256, 128), nn.BatchNorm1d(128), nn.ReLU(), nn.Dropout(0.2), nn.Linear(128, 64), nn.ReLU(), nn.Linear(64, 1), ) def forward(self, x: torch.Tensor) -> torch.Tensor: return self.net(x).squeeze(-1) class Conv1DClassifier(nn.Module): def __init__(self, n_features: int) -> None: super().__init__() self.conv = nn.Sequential( nn.Conv1d(1, 64, kernel_size=5, padding=2), nn.BatchNorm1d(64), nn.ReLU(), nn.Conv1d(64, 128, kernel_size=3, padding=1), nn.BatchNorm1d(128), nn.ReLU(), nn.AdaptiveAvgPool1d(1), ) self.fc = nn.Sequential( nn.Linear(128, 64), nn.ReLU(), nn.Dropout(0.3), nn.Linear(64, 1), ) def forward(self, x: torch.Tensor) -> torch.Tensor: x = x.unsqueeze(1) # (B, 1, F) x = self.conv(x).squeeze(-1) # (B, 128) return self.fc(x).squeeze(-1) class ResidualBlock(nn.Module): def __init__(self, dim: int, dropout: float = 0.2) -> None: super().__init__() self.block = nn.Sequential( nn.Linear(dim, dim), nn.BatchNorm1d(dim), nn.ReLU(), nn.Dropout(dropout), nn.Linear(dim, dim), nn.BatchNorm1d(dim), ) self.relu = nn.ReLU() def forward(self, x: torch.Tensor) -> torch.Tensor: return self.relu(x + self.block(x)) class ResidualMLP(nn.Module): def __init__(self, n_features: int) -> None: super().__init__() self.input_proj = nn.Sequential( nn.Linear(n_features, 256), nn.BatchNorm1d(256), nn.ReLU(), ) self.res_blocks = nn.Sequential( ResidualBlock(256, 0.3), ResidualBlock(256, 0.2), ResidualBlock(256, 0.1), ) self.head = nn.Sequential( nn.Linear(256, 64), nn.ReLU(), nn.Linear(64, 1), ) def forward(self, x: torch.Tensor) -> torch.Tensor: x = self.input_proj(x) x = self.res_blocks(x) return self.head(x).squeeze(-1) class AttentionMLP(nn.Module): def __init__(self, n_features: int) -> None: super().__init__() self.proj = nn.Linear(n_features, 256) self.attn = nn.MultiheadAttention(256, num_heads=4, batch_first=True) self.norm = nn.LayerNorm(256) self.head = nn.Sequential( nn.Linear(256, 128), nn.ReLU(), nn.Dropout(0.3), nn.Linear(128, 1), ) def forward(self, x: torch.Tensor) -> torch.Tensor: x = self.proj(x) x = x.unsqueeze(1) # (B, 1, 256) x_chunk = x.expand(-1, 4, -1) # (B, 4, 256) - create sequence attn_out, _ = self.attn(x_chunk, x_chunk, x_chunk) x = self.norm(attn_out.mean(dim=1)) # (B, 256) return self.head(x).squeeze(-1) def load_data(csv_path: str | Path) -> tuple[np.ndarray, np.ndarray, list[str]]: _EXCLUDE = {"file_path", "label_int", "duration_sec", "sample_rate"} rows, labels = [], [] with open(csv_path, "r", encoding="utf-8") as f: reader = csv.DictReader(f) feature_cols = [c for c in reader.fieldnames if c not in _EXCLUDE] for row in reader: vals = [] for col in feature_cols: try: vals.append(float(row[col])) except (ValueError, KeyError): vals.append(0.0) rows.append(vals) labels.append(int(row["label_int"])) X = np.nan_to_num(np.array(rows, dtype=np.float32), nan=0.0) y = np.array(labels, dtype=np.int32) return X, y, feature_cols def train_one_fold( model: nn.Module, X_train: np.ndarray, y_train: np.ndarray, X_val: np.ndarray, y_val: np.ndarray, ) -> tuple[float, np.ndarray]: scaler = StandardScaler() X_tr = scaler.fit_transform(X_train) X_v = scaler.transform(X_val) train_ds = TensorDataset( torch.tensor(X_tr, dtype=torch.float32), torch.tensor(y_train, dtype=torch.float32), ) val_X = torch.tensor(X_v, dtype=torch.float32).to(DEVICE) val_y = torch.tensor(y_val, dtype=torch.float32) loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True) model = model.to(DEVICE) optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=1e-4) scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau( optimizer, mode="max", factor=0.5, patience=5 ) # pos_weight compensates for class imbalance (n_neg / n_pos) n_pos = max(int(y_train.sum()), 1) n_neg = len(y_train) - n_pos pos_weight = torch.tensor([n_neg / n_pos], dtype=torch.float32).to(DEVICE) criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight) best_auc = 0.0 best_probs = None patience_ctr = 0 for epoch in range(EPOCHS): model.train() for bx, by in loader: bx, by = bx.to(DEVICE), by.to(DEVICE) optimizer.zero_grad() logits = model(bx) loss = criterion(logits, by) loss.backward() optimizer.step() model.eval() with torch.no_grad(): v_logits = model(val_X) v_probs = torch.sigmoid(v_logits).cpu().numpy() auc = roc_auc_score(y_val, v_probs) scheduler.step(auc) if auc > best_auc: best_auc = auc best_probs = v_probs.copy() patience_ctr = 0 else: patience_ctr += 1 if patience_ctr >= PATIENCE: break return best_auc, best_probs def evaluate_cv( model_class: type, X: np.ndarray, y: np.ndarray, n_features: int, ) -> dict: cv = StratifiedKFold(n_splits=N_FOLDS, shuffle=True, random_state=SEED) all_probs = np.zeros(len(y)) aucs = [] t0 = time.time() for fold, (train_idx, val_idx) in enumerate(cv.split(X, y)): set_seed(SEED + fold) model = model_class(n_features) auc, probs = train_one_fold( model, X[train_idx], y[train_idx], X[val_idx], y[val_idx], ) all_probs[val_idx] = probs aucs.append(auc) print(f" Fold {fold+1}: AUC={auc:.4f}") elapsed = time.time() - t0 threshold = _optimal_threshold(y, all_probs) y_pred = (all_probs >= threshold).astype(int) return { "accuracy": round(float(accuracy_score(y, y_pred)), 4), "precision": round(float(precision_score(y, y_pred, zero_division=0)), 4), "recall": round(float(recall_score(y, y_pred, zero_division=0)), 4), "f1": round(float(f1_score(y, y_pred, zero_division=0)), 4), "roc_auc": round(float(roc_auc_score(y, all_probs)), 4), "optimal_threshold": round(threshold, 4), "fold_aucs": [round(a, 4) for a in aucs], "train_time_sec": round(elapsed, 1), } class TorchSklearnWrapper: """ Sklearn-compatible wrapper for trained PyTorch classifiers. Saves model class name + state dict so it can be pickled and reloaded. """ def __init__( self, model_class: type, n_features: int, state_dict: dict, scaler: StandardScaler, ) -> None: self.model_class_name = model_class.__name__ self._model_class = model_class self.n_features = n_features self.state_dict = state_dict self.scaler = scaler self.n_features_in_ = n_features def _build_model(self) -> nn.Module: model = self._model_class(self.n_features) model.load_state_dict(self.state_dict) model.eval() return model def predict_proba(self, X: np.ndarray) -> np.ndarray: model = self._build_model().to("cpu") X_scaled = self.scaler.transform(X) x_t = torch.tensor(X_scaled, dtype=torch.float32) with torch.no_grad(): logits = model(x_t) probs = torch.sigmoid(logits).numpy().flatten() return np.column_stack([1.0 - probs, probs]) def __getstate__(self) -> dict: state = self.__dict__.copy() state.pop("_model_class", None) return state def __setstate__(self, state: dict) -> None: self.__dict__.update(state) # Re-attach class from global lookup _CLASS_MAP = { "DeepMLP": DeepMLP, "Conv1DClassifier": Conv1DClassifier, "ResidualMLP": ResidualMLP, "AttentionMLP": AttentionMLP, } self._model_class = _CLASS_MAP.get(self.model_class_name, DeepMLP) def train_final_model( model_class: type, X: np.ndarray, y: np.ndarray, epochs: int = EPOCHS, patience: int = PATIENCE, ) -> TorchSklearnWrapper: """Train model on full dataset and return sklearn-compatible wrapper.""" from sklearn.model_selection import train_test_split scaler = StandardScaler() X_tr_raw, X_val_raw, y_tr, y_val = train_test_split( X, y, test_size=0.1, stratify=y, random_state=SEED ) X_tr = scaler.fit_transform(X_tr_raw) X_v = scaler.transform(X_val_raw) n_features = X.shape[1] model = model_class(n_features).to(DEVICE) optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=1e-4) n_pos = max(int(y_tr.sum()), 1) n_neg = len(y_tr) - n_pos pos_weight = torch.tensor([n_neg / n_pos], dtype=torch.float32).to(DEVICE) criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight) loader = DataLoader( TensorDataset( torch.tensor(X_tr, dtype=torch.float32), torch.tensor(y_tr, dtype=torch.float32), ), batch_size=BATCH_SIZE, shuffle=True, ) val_X = torch.tensor(X_v, dtype=torch.float32).to(DEVICE) val_y = torch.tensor(y_val, dtype=torch.float32) best_auc = 0.0 best_state = None patience_ctr = 0 for epoch in range(epochs): model.train() for bx, by in loader: bx, by = bx.to(DEVICE), by.to(DEVICE) optimizer.zero_grad() criterion(model(bx), by).backward() optimizer.step() model.eval() with torch.no_grad(): probs = torch.sigmoid(model(val_X)).cpu().numpy() auc = roc_auc_score(val_y.numpy(), probs) if auc > best_auc: best_auc = auc best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()} patience_ctr = 0 else: patience_ctr += 1 if patience_ctr >= patience: break return TorchSklearnWrapper(model_class, n_features, best_state or model.state_dict(), scaler) def _safe_name(name: str) -> str: return name.lower().replace(" ", "_").replace("(", "").replace(")", "").replace("-", "_") def main() -> None: import pickle csv_path = sys.argv[1] if len(sys.argv) > 1 else "../DataSet/features.csv" out_dir = Path(sys.argv[2]) if len(sys.argv) > 2 else Path("models") out_dir.mkdir(parents=True, exist_ok=True) print(f"Device: {DEVICE}") print(f"Loading: {csv_path}") X, y, feature_cols = load_data(csv_path) n_features = X.shape[1] print(f"Samples: {len(y)}, Features: {n_features}") print(f"AI: {np.sum(y == 1)}, Human: {np.sum(y == 0)}") model_classes = { "Deep MLP (512-256-128-64)": DeepMLP, "1D-CNN": Conv1DClassifier, "Residual MLP (3 blocks)": ResidualMLP, "Attention MLP": AttentionMLP, } all_results = {} for name, cls in model_classes.items(): print(f"\n{'='*60}") print(f" {name}") print(f"{'='*60}") result = evaluate_cv(cls, X, y, n_features) all_results[name] = {**result, "type": "deep_learning"} print(f" => Acc={result['accuracy']:.4f} AUC={result['roc_auc']:.4f} " f"F1={result['f1']:.4f} Time={result['train_time_sec']:.0f}s") print(f" Training final model for {name}...") wrapper = train_final_model(cls, X, y) pkl_path = out_dir / f"model_dl_{_safe_name(name)}.pkl" with open(pkl_path, "wb") as f: pickle.dump(wrapper, f) all_results[name]["model_path"] = str(pkl_path) print(f" Saved: {pkl_path}") out_path = out_dir / "deep_learning_results.json" with open(out_path, "w") as f: json.dump(all_results, f, indent=2) print(f"\nResults saved: {out_path}") print(f"\n{'='*60}") print(" SUMMARY") print(f"{'='*60}") for name, r in sorted(all_results.items(), key=lambda x: -x[1]["roc_auc"]): print(f" {name:35s} AUC={r['roc_auc']:.4f} Acc={r['accuracy']:.4f}") if __name__ == "__main__": main()