# Source: crowncode-backend/app/training/train_deep_classifiers.py
# Revision: 9b6c85d (author: Rthur2003)
# Commit message: "fix: update evaluation to use optimal threshold and adjust
# BCEWithLogitsLoss for class imbalance"
"""
Deep learning classifier comparison for AURIS.
Trains and evaluates multiple neural network architectures on
the 47 extracted audio features using stratified k-fold CV.
Architectures:
1. Deep MLP (512-256-128-64) with BatchNorm + Dropout
2. 1D-CNN on feature vector (treats features as 1D signal)
3. Residual MLP (skip connections)
4. Attention MLP (multi-head self-attention over four copies of the projected feature vector)
Usage:
python -m app.training.train_deep_classifiers ../DataSet/features.csv
"""
from __future__ import annotations
import csv
import json
import sys
import time
from pathlib import Path
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, roc_auc_score, f1_score, precision_score, recall_score, roc_curve
# Training configuration shared by the cross-validation and final-model code below.
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
SEED = 42  # base random seed (CV splitter, hold-out split, per-fold seeding)
N_FOLDS = 5  # number of stratified cross-validation folds
EPOCHS = 100  # upper bound on training epochs per model
PATIENCE = 10  # epochs without validation-AUC improvement before early stopping
BATCH_SIZE = 64  # mini-batch size of the training DataLoader
LR = 1e-3  # initial AdamW learning rate
def _optimal_threshold(y_true: np.ndarray, y_prob: np.ndarray) -> float:
    """Return the decision threshold that maximises Youden's J statistic.

    Youden's J is ``sensitivity + specificity - 1``, i.e. ``tpr - fpr`` on the
    ROC curve.

    Args:
        y_true: Binary ground-truth labels.
        y_prob: Predicted scores/probabilities for the positive class.

    Returns:
        A threshold taken from the observed scores. Falls back to ``0.5`` when
        the ROC curve contains no real operating point.
    """
    fpr, tpr, thresholds = roc_curve(y_true, y_prob)
    # roc_curve prepends a sentinel point (fpr=0, tpr=0) whose threshold is
    # +inf (max(score) + 1 on older scikit-learn). Its J is always 0, so
    # argmax would select it whenever no real threshold beats chance, which
    # yields an unusable threshold (every sample predicted negative, and
    # ``Infinity`` once serialised to JSON). Skip it.
    if len(thresholds) < 2:
        return 0.5
    j = tpr[1:] - fpr[1:]
    return float(thresholds[1:][np.argmax(j)])
def set_seed(seed: int = SEED) -> None:
    """Seed the NumPy and PyTorch random number generators.

    Args:
        seed: Value passed to every generator. CUDA generators are seeded
            only when a CUDA device is available.
    """
    for seed_fn in (np.random.seed, torch.manual_seed):
        seed_fn(seed)
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)
class DeepMLP(nn.Module):
    """Plain feed-forward classifier: 512-256-128-64 hidden units, one logit out.

    The first three hidden layers are Linear -> BatchNorm1d -> ReLU -> Dropout
    (dropout 0.4, 0.3, 0.2); the last hidden layer is Linear -> ReLU only.
    """

    def __init__(self, n_features: int) -> None:
        super().__init__()
        widths = (n_features, 512, 256, 128)
        dropouts = (0.4, 0.3, 0.2)

        layers: list[nn.Module] = []
        # Layers are created in network order so parameter initialisation and
        # the ``net.<index>`` state-dict keys stay exactly as before.
        for in_dim, out_dim, p_drop in zip(widths, widths[1:], dropouts):
            layers.extend(
                [
                    nn.Linear(in_dim, out_dim),
                    nn.BatchNorm1d(out_dim),
                    nn.ReLU(),
                    nn.Dropout(p_drop),
                ]
            )
        layers.extend([nn.Linear(128, 64), nn.ReLU(), nn.Linear(64, 1)])
        self.net = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return one raw logit per sample (trailing unit dimension removed)."""
        logits = self.net(x)
        return logits.squeeze(-1)
class Conv1DClassifier(nn.Module):
    """1D-CNN that treats the feature vector as a single-channel signal.

    Two Conv1d -> BatchNorm1d -> ReLU stages (64 then 128 channels) are
    followed by global average pooling and a small fully connected head
    that emits one logit. ``n_features`` is accepted for a uniform
    constructor signature but is not needed to size any layer.
    """

    def __init__(self, n_features: int) -> None:
        super().__init__()
        conv_layers: list[nn.Module] = []
        # (in_channels, out_channels, kernel); padding = kernel // 2 keeps length.
        for c_in, c_out, kernel in ((1, 64, 5), (64, 128, 3)):
            conv_layers.append(
                nn.Conv1d(c_in, c_out, kernel_size=kernel, padding=kernel // 2)
            )
            conv_layers.append(nn.BatchNorm1d(c_out))
            conv_layers.append(nn.ReLU())
        conv_layers.append(nn.AdaptiveAvgPool1d(1))
        self.conv = nn.Sequential(*conv_layers)

        head_layers = [
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(64, 1),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return one raw logit per sample for a ``(B, F)`` input."""
        signal = torch.unsqueeze(x, 1)  # (B, 1, F)
        pooled = self.conv(signal)  # (B, 128, 1)
        embedding = pooled.squeeze(-1)  # (B, 128)
        logits = self.fc(embedding)
        return logits.squeeze(-1)
class ResidualBlock(nn.Module):
    """Width-preserving residual unit: ``relu(x + F(x))``.

    ``F`` is Linear -> BatchNorm1d -> ReLU -> Dropout -> Linear -> BatchNorm1d,
    all at width ``dim``.
    """

    def __init__(self, dim: int, dropout: float = 0.2) -> None:
        super().__init__()
        transform = [
            nn.Linear(dim, dim),
            nn.BatchNorm1d(dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(dim, dim),
            nn.BatchNorm1d(dim),
        ]
        self.block = nn.Sequential(*transform)
        self.relu = nn.ReLU()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Add the transformed input back onto ``x`` and apply ReLU."""
        residual = self.block(x)
        summed = x + residual
        return self.relu(summed)
class ResidualMLP(nn.Module):
    """MLP with skip connections.

    The input is projected to width 256, passed through three residual
    blocks (dropout 0.3, 0.2, 0.1) and reduced to one logit by a 256-64-1 head.
    """

    def __init__(self, n_features: int) -> None:
        super().__init__()
        hidden = 256
        self.input_proj = nn.Sequential(
            nn.Linear(n_features, hidden),
            nn.BatchNorm1d(hidden),
            nn.ReLU(),
        )
        # Blocks are instantiated in order so initialisation matches the
        # hand-written sequence of three ResidualBlock(...) calls.
        self.res_blocks = nn.Sequential(
            *[ResidualBlock(hidden, p_drop) for p_drop in (0.3, 0.2, 0.1)]
        )
        self.head = nn.Sequential(
            nn.Linear(hidden, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return one raw logit per sample."""
        hidden_state = self.res_blocks(self.input_proj(x))
        logits = self.head(hidden_state)
        return logits.squeeze(-1)
class AttentionMLP(nn.Module):
    """Projection -> multi-head self-attention -> LayerNorm -> MLP head.

    NOTE(review): the attention "sequence" is the same projected vector
    repeated four times with no positional signal, so every token sees
    identical keys and the attention weights come out uniform. Changing
    this would alter the trained behaviour, so it is only flagged here.
    """

    def __init__(self, n_features: int) -> None:
        super().__init__()
        embed_dim = 256
        self.proj = nn.Linear(n_features, embed_dim)
        self.attn = nn.MultiheadAttention(embed_dim, num_heads=4, batch_first=True)
        self.norm = nn.LayerNorm(embed_dim)
        self.head = nn.Sequential(
            nn.Linear(embed_dim, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, 1),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return one raw logit per sample for a ``(B, F)`` input."""
        projected = self.proj(x)  # (B, 256)
        # Repeat the single token four times to form the attention sequence.
        tokens = projected.unsqueeze(1).expand(-1, 4, -1)  # (B, 4, 256)
        attended, _ = self.attn(tokens, tokens, tokens)
        pooled = attended.mean(dim=1)  # (B, 256)
        logits = self.head(self.norm(pooled))
        return logits.squeeze(-1)
def load_data(csv_path: str | Path) -> tuple[np.ndarray, np.ndarray, list[str]]:
    """Load the feature matrix and labels from a features CSV.

    Every column except ``file_path``, ``label_int``, ``duration_sec`` and
    ``sample_rate`` is treated as a feature. Cells that are missing,
    unparseable or non-finite (NaN, +/-inf) are replaced by ``0.0``.

    Args:
        csv_path: Path to a CSV file with a header row and a ``label_int``
            column.

    Returns:
        ``(X, y, feature_cols)`` where ``X`` is a float32 array of shape
        ``(n_rows, n_features)``, ``y`` is an int32 array of labels and
        ``feature_cols`` lists the feature column names in file order.

    Raises:
        ValueError: If the file has no header row.
    """
    _EXCLUDE = {"file_path", "label_int", "duration_sec", "sample_rate"}
    rows: list[list[float]] = []
    labels: list[int] = []
    # newline="" is what the csv module expects so that quoted fields with
    # embedded newlines are parsed correctly.
    with open(csv_path, "r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        if reader.fieldnames is None:
            raise ValueError(f"{csv_path}: CSV file is empty (no header row)")
        feature_cols = [c for c in reader.fieldnames if c not in _EXCLUDE]
        for row in reader:
            vals = []
            for col in feature_cols:
                try:
                    vals.append(float(row[col]))
                except (ValueError, KeyError, TypeError):
                    # TypeError: DictReader pads short rows with None.
                    vals.append(0.0)
            rows.append(vals)
            labels.append(int(row["label_int"]))
    # reshape keeps X two-dimensional even when the file has no data rows.
    X = np.array(rows, dtype=np.float32).reshape(len(rows), len(feature_cols))
    # Map infinities to 0.0 as well: the nan_to_num default would turn them
    # into ~3.4e38, which wrecks the mean/variance used for standardisation.
    X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
    y = np.array(labels, dtype=np.int32)
    return X, y, feature_cols
def train_one_fold(
    model: nn.Module,
    X_train: np.ndarray, y_train: np.ndarray,
    X_val: np.ndarray, y_val: np.ndarray,
    epochs: int = EPOCHS,
    patience: int = PATIENCE,
) -> tuple[float, np.ndarray]:
    """Train ``model`` on one CV fold with early stopping on validation AUC.

    Features are standardised with statistics from the training split only.
    The loss is ``BCEWithLogitsLoss`` with ``pos_weight = n_neg / n_pos`` to
    compensate for class imbalance; the learning rate is halved when the
    validation AUC plateaus.

    NOTE(review): the validation fold drives both early stopping and the
    reported score, so the returned AUC is the best epoch on that same fold.

    Args:
        model: Freshly initialised network; it is moved to ``DEVICE``.
        X_train, y_train: Training features and binary labels.
        X_val, y_val: Validation features and binary labels.
        epochs: Maximum number of training epochs.
        patience: Epochs without AUC improvement before stopping.

    Returns:
        ``(best_auc, best_probs)``: the highest validation AUC seen and the
        validation probabilities from that epoch.
    """
    scaler = StandardScaler()
    X_tr = scaler.fit_transform(X_train)
    X_v = scaler.transform(X_val)

    train_ds = TensorDataset(
        torch.tensor(X_tr, dtype=torch.float32),
        torch.tensor(y_train, dtype=torch.float32),
    )
    val_X = torch.tensor(X_v, dtype=torch.float32).to(DEVICE)

    # BatchNorm1d raises in training mode when a batch holds one sample, so
    # drop the trailing batch when it would contain exactly one.
    n_train = len(train_ds)
    singleton_tail = n_train > BATCH_SIZE and n_train % BATCH_SIZE == 1
    loader = DataLoader(
        train_ds, batch_size=BATCH_SIZE, shuffle=True, drop_last=singleton_tail
    )

    model = model.to(DEVICE)
    optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=1e-4)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode="max", factor=0.5, patience=5
    )

    # pos_weight compensates for class imbalance (n_neg / n_pos)
    n_pos = max(int(y_train.sum()), 1)
    n_neg = len(y_train) - n_pos
    pos_weight = torch.tensor([n_neg / n_pos], dtype=torch.float32).to(DEVICE)
    criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)

    def _val_probs() -> np.ndarray:
        """Return validation probabilities from the model in eval mode."""
        model.eval()
        with torch.no_grad():
            return torch.sigmoid(model(val_X)).cpu().numpy()

    # Start below any attainable AUC so the first epoch always records
    # probabilities; starting at 0.0 could leave best_probs as None.
    best_auc = float("-inf")
    best_probs = None
    patience_ctr = 0

    for _ in range(epochs):
        model.train()
        for bx, by in loader:
            bx, by = bx.to(DEVICE), by.to(DEVICE)
            optimizer.zero_grad()
            logits = model(bx)
            loss = criterion(logits, by)
            loss.backward()
            optimizer.step()

        v_probs = _val_probs()
        auc = roc_auc_score(y_val, v_probs)
        scheduler.step(auc)

        if auc > best_auc:
            best_auc = auc
            best_probs = v_probs.copy()
            patience_ctr = 0
        else:
            patience_ctr += 1
            if patience_ctr >= patience:
                break

    if best_probs is None:
        # No epoch ran (epochs <= 0): score the untrained model instead of
        # handing None back to the caller.
        best_probs = _val_probs()
        best_auc = roc_auc_score(y_val, best_probs)

    return float(best_auc), best_probs
def evaluate_cv(
    model_class: type,
    X: np.ndarray, y: np.ndarray,
    n_features: int,
) -> dict:
    """Cross-validate ``model_class`` and summarise out-of-fold performance.

    A fresh model is trained per stratified fold (seeded with ``SEED + fold``).
    The out-of-fold probabilities are pooled, a single threshold is picked
    with ``_optimal_threshold`` on the pooled predictions, and the metrics
    are computed against that threshold.

    Returns:
        Dict with ``accuracy``, ``precision``, ``recall``, ``f1``, ``roc_auc``,
        ``optimal_threshold``, ``fold_aucs`` and ``train_time_sec``.
    """
    splitter = StratifiedKFold(n_splits=N_FOLDS, shuffle=True, random_state=SEED)
    oof_probs = np.zeros(len(y))
    fold_scores = []
    started = time.time()

    for fold, (train_idx, val_idx) in enumerate(splitter.split(X, y)):
        # Seed before building the model so weight init is reproducible.
        set_seed(SEED + fold)
        auc, fold_probs = train_one_fold(
            model_class(n_features),
            X[train_idx], y[train_idx],
            X[val_idx], y[val_idx],
        )
        oof_probs[val_idx] = fold_probs
        fold_scores.append(auc)
        print(f" Fold {fold+1}: AUC={auc:.4f}")

    elapsed = time.time() - started
    threshold = _optimal_threshold(y, oof_probs)
    y_pred = (oof_probs >= threshold).astype(int)

    report = {"accuracy": round(float(accuracy_score(y, y_pred)), 4)}
    thresholded_metrics = (
        ("precision", precision_score),
        ("recall", recall_score),
        ("f1", f1_score),
    )
    for key, metric_fn in thresholded_metrics:
        report[key] = round(float(metric_fn(y, y_pred, zero_division=0)), 4)
    report["roc_auc"] = round(float(roc_auc_score(y, oof_probs)), 4)
    report["optimal_threshold"] = round(threshold, 4)
    report["fold_aucs"] = [round(score, 4) for score in fold_scores]
    report["train_time_sec"] = round(elapsed, 1)
    return report
class TorchSklearnWrapper:
    """
    Sklearn-style wrapper exposing ``predict_proba`` for a trained PyTorch model.

    The wrapper stores the model class *name*, the state dict and the fitted
    scaler, so it can be pickled; the class object itself is dropped on
    pickling and looked up again by name on load. The network is built
    lazily on the first prediction and reused afterwards.
    """

    def __init__(
        self,
        model_class: type,
        n_features: int,
        state_dict: dict,
        scaler: StandardScaler,
    ) -> None:
        """Store everything needed to rebuild the network.

        Args:
            model_class: ``nn.Module`` subclass constructed as
                ``model_class(n_features)``.
            n_features: Number of input features.
            state_dict: Trained parameters for ``model_class``.
            scaler: Fitted scaler applied to inputs before prediction.
        """
        self.model_class_name = model_class.__name__
        self._model_class = model_class
        self.n_features = n_features
        self.state_dict = state_dict
        self.scaler = scaler
        self.n_features_in_ = n_features
        # Built lazily; never pickled (see __getstate__).
        self._model = None

    def _build_model(self) -> nn.Module:
        """Instantiate the network, load the stored weights, set eval mode.

        Raises:
            RuntimeError: If the model class could not be resolved after
                unpickling.
        """
        if self._model_class is None:
            raise RuntimeError(
                f"Unknown model class {self.model_class_name!r}; "
                "cannot rebuild the network for prediction"
            )
        model = self._model_class(self.n_features)
        model.load_state_dict(self.state_dict)
        model.eval()
        return model

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return class probabilities of shape ``(n_samples, 2)``.

        Column 0 is ``1 - p`` and column 1 is ``p``, the sigmoid of the
        model's logit. Inference runs on the CPU.
        """
        # Reuse the network across calls instead of rebuilding it and
        # reloading the weights for every prediction.
        if getattr(self, "_model", None) is None:
            self._model = self._build_model().to("cpu")
        X_scaled = self.scaler.transform(X)
        x_t = torch.tensor(X_scaled, dtype=torch.float32)
        with torch.no_grad():
            logits = self._model(x_t)
        probs = torch.sigmoid(logits).numpy().flatten()
        return np.column_stack([1.0 - probs, probs])

    def __getstate__(self) -> dict:
        """Return picklable state without the class object and cached network."""
        state = self.__dict__.copy()
        state.pop("_model_class", None)
        state.pop("_model", None)
        return state

    def __setstate__(self, state: dict) -> None:
        """Restore state and re-attach the model class by its stored name."""
        self.__dict__.update(state)
        known_classes = (DeepMLP, Conv1DClassifier, ResidualMLP, AttentionMLP)
        class_map = {cls.__name__: cls for cls in known_classes}
        # An unknown name resolves to None rather than silently falling back
        # to DeepMLP: a mismatched architecture cannot load the stored
        # weights, so _build_model reports the real cause instead.
        self._model_class = class_map.get(self.model_class_name)
        self._model = None
def train_final_model(
    model_class: type,
    X: np.ndarray,
    y: np.ndarray,
    epochs: int = EPOCHS,
    patience: int = PATIENCE,
) -> TorchSklearnWrapper:
    """Train the deployable model and return it in a picklable wrapper.

    90% of the data is used for fitting; a stratified 10% hold-out drives
    early stopping on validation AUC (so this is not a fit on every row).
    The recipe mirrors the cross-validation training: standardisation fitted
    on the training part, AdamW, ``BCEWithLogitsLoss`` with
    ``pos_weight = n_neg / n_pos`` and a ReduceLROnPlateau schedule.

    Args:
        model_class: ``nn.Module`` subclass constructed as
            ``model_class(n_features)``.
        X: Feature matrix of shape ``(n_samples, n_features)``.
        y: Binary labels.
        epochs: Maximum number of training epochs.
        patience: Epochs without AUC improvement before stopping.

    Returns:
        A ``TorchSklearnWrapper`` holding the best-epoch weights (on CPU)
        and the fitted scaler.
    """
    from sklearn.model_selection import train_test_split

    scaler = StandardScaler()
    X_tr_raw, X_val_raw, y_tr, y_val = train_test_split(
        X, y, test_size=0.1, stratify=y, random_state=SEED
    )
    X_tr = scaler.fit_transform(X_tr_raw)
    X_v = scaler.transform(X_val_raw)

    n_features = X.shape[1]
    model = model_class(n_features).to(DEVICE)
    optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=1e-4)
    # Same plateau schedule as the cross-validation training, so the saved
    # model is trained the way the reported metrics were obtained.
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode="max", factor=0.5, patience=5
    )

    # pos_weight compensates for class imbalance (n_neg / n_pos)
    n_pos = max(int(y_tr.sum()), 1)
    n_neg = len(y_tr) - n_pos
    pos_weight = torch.tensor([n_neg / n_pos], dtype=torch.float32).to(DEVICE)
    criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)

    train_ds = TensorDataset(
        torch.tensor(X_tr, dtype=torch.float32),
        torch.tensor(y_tr, dtype=torch.float32),
    )
    # BatchNorm1d raises in training mode when a batch holds one sample, so
    # drop the trailing batch when it would contain exactly one.
    n_train = len(train_ds)
    singleton_tail = n_train > BATCH_SIZE and n_train % BATCH_SIZE == 1
    loader = DataLoader(
        train_ds, batch_size=BATCH_SIZE, shuffle=True, drop_last=singleton_tail
    )
    val_X = torch.tensor(X_v, dtype=torch.float32).to(DEVICE)

    def _cpu_state() -> dict:
        """Snapshot the current weights as detached CPU tensors."""
        return {k: v.cpu().clone() for k, v in model.state_dict().items()}

    # Start below any attainable AUC so the first epoch always snapshots.
    best_auc = float("-inf")
    best_state = None
    patience_ctr = 0

    for _ in range(epochs):
        model.train()
        for bx, by in loader:
            bx, by = bx.to(DEVICE), by.to(DEVICE)
            optimizer.zero_grad()
            criterion(model(bx), by).backward()
            optimizer.step()

        model.eval()
        with torch.no_grad():
            probs = torch.sigmoid(model(val_X)).cpu().numpy()
        auc = roc_auc_score(y_val, probs)
        scheduler.step(auc)

        if auc > best_auc:
            best_auc = auc
            best_state = _cpu_state()
            patience_ctr = 0
        else:
            patience_ctr += 1
            if patience_ctr >= patience:
                break

    if best_state is None:
        # No epoch ran: still hand back CPU tensors so the pickle can be
        # loaded on machines without a GPU.
        best_state = _cpu_state()

    return TorchSklearnWrapper(model_class, n_features, best_state, scaler)
def _safe_name(name: str) -> str:
    """Turn a display name into a lowercase, filename-friendly slug.

    Spaces and hyphens become underscores and parentheses are removed.
    """
    substitutions = str.maketrans({" ": "_", "-": "_", "(": None, ")": None})
    lowered = name.lower()
    return lowered.translate(substitutions)
def main() -> None:
    """Run the architecture comparison from the command line.

    ``argv[1]`` is the features CSV (default ``../DataSet/features.csv``) and
    ``argv[2]`` the output directory (default ``models``). For every
    architecture the script cross-validates, trains a final model, pickles
    it, and finally writes all metrics to ``deep_learning_results.json``
    and prints a summary sorted by ROC AUC.
    """
    import pickle

    cli_args = sys.argv[1:]
    csv_path = cli_args[0] if cli_args else "../DataSet/features.csv"
    out_dir = Path(cli_args[1]) if len(cli_args) >= 2 else Path("models")
    out_dir.mkdir(parents=True, exist_ok=True)

    print(f"Device: {DEVICE}")
    print(f"Loading: {csv_path}")
    X, y, feature_cols = load_data(csv_path)
    n_features = X.shape[1]
    print(f"Samples: {len(y)}, Features: {n_features}")
    print(f"AI: {np.sum(y == 1)}, Human: {np.sum(y == 0)}")

    model_classes = {
        "Deep MLP (512-256-128-64)": DeepMLP,
        "1D-CNN": Conv1DClassifier,
        "Residual MLP (3 blocks)": ResidualMLP,
        "Attention MLP": AttentionMLP,
    }

    all_results = {}
    for name, cls in model_classes.items():
        print(f"\n{'='*60}")
        print(f" {name}")
        print(f"{'='*60}")

        # Cross-validated metrics first, then the model that gets saved.
        result = evaluate_cv(cls, X, y, n_features)
        entry = dict(result)
        entry["type"] = "deep_learning"
        all_results[name] = entry
        print(f" => Acc={result['accuracy']:.4f} AUC={result['roc_auc']:.4f} "
              f"F1={result['f1']:.4f} Time={result['train_time_sec']:.0f}s")

        print(f" Training final model for {name}...")
        wrapper = train_final_model(cls, X, y)
        pkl_path = out_dir / f"model_dl_{_safe_name(name)}.pkl"
        with open(pkl_path, "wb") as model_file:
            pickle.dump(wrapper, model_file)
        entry["model_path"] = str(pkl_path)
        print(f" Saved: {pkl_path}")

    out_path = out_dir / "deep_learning_results.json"
    with open(out_path, "w") as results_file:
        json.dump(all_results, results_file, indent=2)
    print(f"\nResults saved: {out_path}")

    print(f"\n{'='*60}")
    print(" SUMMARY")
    print(f"{'='*60}")
    ranked = sorted(
        all_results.items(), key=lambda item: item[1]["roc_auc"], reverse=True
    )
    for name, r in ranked:
        print(f" {name:35s} AUC={r['roc_auc']:.4f} Acc={r['accuracy']:.4f}")
# Run the comparison only when executed as a script, not when imported.
if __name__ == "__main__":
    main()