import csv
import json
import os
import random
import sys

import numpy as np
import joblib
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    f1_score,
    precision_recall_fscore_support,
    roc_auc_score,
    roc_curve,
)

from data_preparation.prepare_dataset import get_dataloaders, SELECTED_FEATURES

_PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
USE_CLEARML = os.environ.get("USE_CLEARML", "0") == "1" or bool(os.environ.get("CLEARML_TASK_ID"))
CLEARML_QUEUE = os.environ.get("CLEARML_QUEUE", "")


def _load_cfg():
    """Build training config from config/default.yaml with fallbacks."""
    try:
        from config import get

        mlp = get("mlp") or {}
        data = get("data") or {}
        ratios = data.get("split_ratios", [0.7, 0.15, 0.15])
        return {
            "model_name": mlp.get("model_name", "face_orientation"),
            "epochs": mlp.get("epochs", 30),
            "batch_size": mlp.get("batch_size", 32),
            "lr": mlp.get("lr", 1e-3),
            "seed": mlp.get("seed", 42),
            "split_ratios": tuple(ratios),
            "hidden_sizes": mlp.get("hidden_sizes", [64, 32]),
            "checkpoints_dir": os.path.join(_PROJECT_ROOT, "checkpoints"),
            "logs_dir": os.path.join(_PROJECT_ROOT, "evaluation", "logs"),
        }
    except Exception:
        return {
            "model_name": "face_orientation",
            "epochs": 30,
            "batch_size": 32,
            "lr": 1e-3,
            "seed": 42,
            "split_ratios": (0.7, 0.15, 0.15),
            "hidden_sizes": [64, 32],
            "checkpoints_dir": os.path.join(_PROJECT_ROOT, "checkpoints"),
            "logs_dir": os.path.join(_PROJECT_ROOT, "evaluation", "logs"),
        }


CFG = _load_cfg()
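
# For reference, a config/default.yaml fragment that _load_cfg() would pick
# up. This snippet is illustrative only: the keys match what is read above,
# and the values shown are just the built-in fallbacks.
#
#   mlp:
#     model_name: face_orientation
#     epochs: 30
#     batch_size: 32
#     lr: 0.001
#     seed: 42
#     hidden_sizes: [64, 32]
#   data:
#     split_ratios: [0.7, 0.15, 0.15]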

# ==== ClearML: expose all config as task params, support remote execution ====
task = None
if USE_CLEARML:
    try:
        from clearml import Task
        from config import CLEARML_PROJECT_NAME, flatten_for_clearml

        task = Task.init(
            project_name=CLEARML_PROJECT_NAME,
            task_name="MLP Model Training",
            tags=["training", "mlp_model"],
        )
        from config.clearml_enrich import enrich_task, upload_repro_artifacts

        enrich_task(task, role="train_mlp")
        flat = flatten_for_clearml()
        flat["mlp/model_name"] = CFG.get("model_name", "face_orientation")
        flat["mlp/epochs"] = CFG.get("epochs", 30)
        flat["mlp/batch_size"] = CFG.get("batch_size", 32)
        flat["mlp/lr"] = CFG.get("lr", 1e-3)
        flat["mlp/seed"] = CFG.get("seed", 42)
        flat["mlp/hidden_sizes"] = str(CFG.get("hidden_sizes", [64, 32]))
        flat["mlp/split_ratios"] = str(CFG.get("split_ratios", (0.7, 0.15, 0.15)))
        task.connect(flat)
        upload_repro_artifacts(task)
        if CLEARML_QUEUE:
            print(f"[ClearML] Enqueuing to queue '{CLEARML_QUEUE}'. Agent will run training.")
            task.execute_remotely(queue_name=CLEARML_QUEUE)
            sys.exit(0)
    except ImportError:
        task = None
        USE_CLEARML = False


# ==== Model =============================================
def set_seed(seed: int) -> None:
    """Set random seed for numpy, torch, and Python RNG for reproducibility."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)


class BaseModel(nn.Module):
    """MLP classifier: num_features -> hidden_sizes -> num_classes.

    Used for the face_orientation focus."""

    def __init__(self, num_features: int, num_classes: int, hidden_sizes: list[int] | None = None):
        super().__init__()
        sizes = hidden_sizes or CFG.get("hidden_sizes", [64, 32])
        layers = []
        prev = num_features
        for h in sizes:
            layers.extend([nn.Linear(prev, h), nn.ReLU()])
            prev = h
        layers.append(nn.Linear(prev, num_classes))
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        return self.network(x)

    def training_step(self, loader, optimizer, criterion, device):
        self.train()
        total_loss = 0.0
        correct = 0
        total = 0
        for features, labels in loader:
            features, labels = features.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = self(features)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item() * features.size(0)
            correct += (outputs.argmax(dim=1) == labels).sum().item()
            total += features.size(0)
        return total_loss / total, correct / total

    @torch.no_grad()
    def validation_step(self, loader, criterion, device):
        self.eval()
        total_loss = 0.0
        correct = 0
        total = 0
        all_preds = []
        all_labels = []
        for features, labels in loader:
            features, labels = features.to(device), labels.to(device)
            outputs = self(features)
            loss = criterion(outputs, labels)
            total_loss += loss.item() * features.size(0)
            preds = outputs.argmax(dim=1)
            correct += (preds == labels).sum().item()
            total += features.size(0)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
        val_f1 = f1_score(np.array(all_labels), np.array(all_preds), average="weighted")
        return total_loss / total, correct / total, val_f1

    @torch.no_grad()
    def test_step(self, loader, criterion, device):
        self.eval()
        total_loss = 0.0
        correct = 0
        total = 0
        all_preds = []
        all_labels = []
        all_probs = []
        for features, labels in loader:
            features, labels = features.to(device), labels.to(device)
            outputs = self(features)
            loss = criterion(outputs, labels)
            total_loss += loss.item() * features.size(0)
            preds = outputs.argmax(dim=1)
            correct += (preds == labels).sum().item()
            total += features.size(0)
            probs = torch.softmax(outputs, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            all_probs.extend(probs.cpu().numpy())
        return total_loss / total, correct / total, np.array(all_probs), np.array(all_preds), np.array(all_labels)
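
# A quick smoke test for BaseModel (the shapes here are illustrative only,
# not taken from the real dataset):
#
#   model = BaseModel(num_features=10, num_classes=3, hidden_sizes=[64, 32])
#   logits = model(torch.randn(4, 10))  # -> tensor of shape (4, 3)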

def main() -> None:
    """Train MLP on face_orientation features, save best checkpoint and scaler to checkpoints/."""
    set_seed(CFG["seed"])
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"[TRAIN] Device: {device}")
    print(f"[TRAIN] Model: {CFG['model_name']}")

    train_loader, val_loader, test_loader, num_features, num_classes, scaler = get_dataloaders(
        model_name=CFG["model_name"],
        batch_size=CFG["batch_size"],
        split_ratios=CFG["split_ratios"],
        seed=CFG["seed"],
    )

    model = BaseModel(num_features, num_classes, hidden_sizes=CFG.get("hidden_sizes")).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=CFG["lr"])
    param_count = sum(p.numel() for p in model.parameters())
    print(f"[TRAIN] Parameters: {param_count:,}")

    ckpt_dir = CFG["checkpoints_dir"]
    os.makedirs(ckpt_dir, exist_ok=True)
    best_ckpt_path = os.path.join(ckpt_dir, "mlp_best.pt")

    history = {
        "model_name": CFG["model_name"],
        "param_count": param_count,
        "epochs": [],
        "train_loss": [],
        "train_acc": [],
        "val_loss": [],
        "val_acc": [],
        "val_f1": [],
    }
    best_val_f1 = 0.0
    best_val_acc = 0.0

    print(f"\n{'Epoch':>6} | {'Train Loss':>10} | {'Train Acc':>9} | {'Val Loss':>10} | {'Val Acc':>9} | {'Val F1':>8}")
    print("-" * 72)
    for epoch in range(1, CFG["epochs"] + 1):
        train_loss, train_acc = model.training_step(train_loader, optimizer, criterion, device)
        val_loss, val_acc, val_f1 = model.validation_step(val_loader, criterion, device)
        history["epochs"].append(epoch)
        history["train_loss"].append(round(train_loss, 4))
        history["train_acc"].append(round(train_acc, 4))
        history["val_loss"].append(round(val_loss, 4))
        history["val_acc"].append(round(val_acc, 4))
        history["val_f1"].append(round(val_f1, 4))
        current_lr = optimizer.param_groups[0]["lr"]
        if task is not None:
            task.logger.report_scalar("Loss", "Train", float(train_loss), iteration=epoch)
            task.logger.report_scalar("Accuracy", "Train", float(train_acc), iteration=epoch)
            task.logger.report_scalar("Loss", "Val", float(val_loss), iteration=epoch)
            task.logger.report_scalar("Accuracy", "Val", float(val_acc), iteration=epoch)
            task.logger.report_scalar("F1", "Val", float(val_f1), iteration=epoch)
            task.logger.report_scalar("Learning Rate", "LR", float(current_lr), iteration=epoch)
            task.logger.flush()
        marker = ""
        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            best_val_acc = val_acc
            torch.save(model.state_dict(), best_ckpt_path)
            marker = " *"
        print(
            f"{epoch:>6} | {train_loss:>10.4f} | {train_acc:>8.2%} | {val_loss:>10.4f} | "
            f"{val_acc:>8.2%} | {val_f1:>8.4f}{marker}"
        )

    print(f"\nBest validation F1: {best_val_f1:.4f} (accuracy at best F1: {best_val_acc:.2%})")
    print(f"Checkpoint saved to: {best_ckpt_path}")

    model.load_state_dict(torch.load(best_ckpt_path, weights_only=True))
    test_loss, test_acc, test_probs, test_preds, test_labels = model.test_step(test_loader, criterion, device)
    test_labels_np = np.asarray(test_labels)
    test_preds_np = np.asarray(test_preds)
    test_f1 = f1_score(test_labels_np, test_preds_np, average="weighted")
    if num_classes > 2:
        test_auc = roc_auc_score(test_labels_np, test_probs, multi_class="ovr", average="weighted")
    else:
        test_auc = roc_auc_score(test_labels_np, test_probs[:, 1])
    print(f"\n[TEST] Loss: {test_loss:.4f} | Accuracy: {test_acc:.2%}")
    print(f"[TEST] F1: {test_f1:.4f} | ROC-AUC: {test_auc:.4f}")
    history["test_loss"] = round(test_loss, 4)
    history["test_acc"] = round(test_acc, 4)
    history["test_f1"] = round(test_f1, 4)
    history["test_auc"] = round(test_auc, 4)

    # Dataset stats for ClearML
    train_labels = train_loader.dataset.labels.numpy()
    val_labels = val_loader.dataset.labels.numpy()
    dataset_stats = {
        "train_size": len(train_loader.dataset),
        "val_size": len(val_loader.dataset),
        "test_size": len(test_loader.dataset),
        "train_class_counts": np.bincount(train_labels, minlength=num_classes).tolist(),
        "val_class_counts": np.bincount(val_labels, minlength=num_classes).tolist(),
        "test_class_counts": np.bincount(test_labels_np, minlength=num_classes).tolist(),
    }
    history["dataset_stats"] = dataset_stats

    logs_dir = CFG["logs_dir"]
    os.makedirs(logs_dir, exist_ok=True)
    log_path = os.path.join(logs_dir, f"{CFG['model_name']}_training_log.json")
    with open(log_path, "w") as f:
        json.dump(history, f, indent=2)
    print(f"[LOG] Training history saved to: {log_path}")

    scaler_path = os.path.join(ckpt_dir, "scaler_mlp.joblib")
    joblib.dump(scaler, scaler_path)
    meta_path = os.path.join(ckpt_dir, "meta_mlp.npz")
    np.savez(meta_path, feature_names=np.array(SELECTED_FEATURES["face_orientation"]))
    print(f"[LOG] Scaler and meta saved to {ckpt_dir}")

    cm = confusion_matrix(test_labels_np, test_preds_np)
    pred_csv = os.path.join(logs_dir, f"{CFG['model_name']}_test_predictions.csv")
    with open(pred_csv, "w", newline="") as f:
        w = csv.writer(f)
        w.writerow(["y_true", "y_pred"] + [f"prob_{j}" for j in range(num_classes)])
        for i in range(len(test_labels_np)):
            w.writerow(
                [int(test_labels_np[i]), int(test_preds_np[i])]
                + [float(x) for x in test_probs[i]]
            )
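    # Each CSV row is: y_true, y_pred, prob_0 ... prob_{num_classes-1}
    # (per-class softmax probabilities for the same test sample).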
[f"prob_{j}" for j in range(num_classes)]) for i in range(len(test_labels_np)): w.writerow( [int(test_labels_np[i]), int(test_preds_np[i])] + [float(x) for x in test_probs[i]] ) summary_path = os.path.join(logs_dir, f"{CFG['model_name']}_test_metrics_summary.json") with open(summary_path, "w", encoding="utf-8") as f: json.dump( { "model": "mlp", "model_name": CFG["model_name"], "checkpoint": os.path.basename(best_ckpt_path), "test_loss": history["test_loss"], "test_accuracy": history["test_acc"], "test_f1_weighted": history["test_f1"], "test_roc_auc": history["test_auc"], "confusion_matrix": cm.tolist(), "classification_report": classification_report( test_labels_np, test_preds_np, digits=4 ), }, f, indent=2, ) print(f"[LOG] Test predictions → {pred_csv}") # ClearML: artifacts, confusion matrix, per-class metrics, registered model if task is not None: from clearml import OutputModel from config.clearml_enrich import attach_output_metrics, task_done_summary task.upload_artifact(name="mlp_checkpoint", artifact_object=best_ckpt_path) task.upload_artifact(name="training_log", artifact_object=log_path) task.upload_artifact(name="test_predictions", artifact_object=pred_csv) task.upload_artifact(name="test_metrics_summary", artifact_object=summary_path) task.upload_artifact(name="scaler_mlp", artifact_object=scaler_path) task.upload_artifact(name="meta_mlp", artifact_object=meta_path) out_model = OutputModel( task=task, name=f"MLP_{CFG['model_name']}", framework="PyTorch" ) out_model.update_weights( weights_filename=best_ckpt_path, auto_delete_file=False ) attach_output_metrics( out_model, { "test_accuracy": round(float(test_acc), 6), "test_f1_weighted": round(float(test_f1), 6), "test_roc_auc": round(float(test_auc), 6), }, ) task_done_summary( task, f"MLP {CFG['model_name']}: test acc={test_acc:.4f}, F1={test_f1:.4f}, ROC-AUC={test_auc:.4f}", ) task.logger.report_single_value("test/accuracy", test_acc) task.logger.report_single_value("test/f1_weighted", test_f1) task.logger.report_single_value("test/roc_auc", test_auc) for key, val in dataset_stats.items(): if isinstance(val, list): for i, v in enumerate(val): task.logger.report_single_value(f"dataset/{key}/{i}", float(v)) else: task.logger.report_single_value(f"dataset/{key}", float(val)) prec, rec, f1_per_class, _ = precision_recall_fscore_support( test_labels_np, test_preds_np, average=None, zero_division=0 ) for c in range(num_classes): task.logger.report_single_value(f"test/class_{c}_precision", float(prec[c])) task.logger.report_single_value(f"test/class_{c}_recall", float(rec[c])) task.logger.report_single_value(f"test/class_{c}_f1", float(f1_per_class[c])) import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt fig, ax = plt.subplots(figsize=(6, 5)) ax.imshow(cm, cmap="Blues") ax.set_xticks(range(num_classes)) ax.set_yticks(range(num_classes)) ax.set_xticklabels([f"Class {i}" for i in range(num_classes)]) ax.set_yticklabels([f"Class {i}" for i in range(num_classes)]) for i in range(num_classes): for j in range(num_classes): ax.text(j, i, str(cm[i, j]), ha="center", va="center", color="black") ax.set_xlabel("Predicted") ax.set_ylabel("True") ax.set_title("Test set confusion matrix") fig.tight_layout() task.logger.report_matplotlib_figure(title="Confusion Matrix", series="test", figure=fig, iteration=0) plt.close(fig) if num_classes == 2: fpr, tpr, _ = roc_curve(test_labels_np, test_probs[:, 1]) fig_r, ax_r = plt.subplots(figsize=(6, 5)) ax_r.plot(fpr, tpr, label=f"ROC-AUC = {test_auc:.4f}") ax_r.plot([0, 1], [0, 1], 
"k--", lw=1) ax_r.set_xlabel("False positive rate") ax_r.set_ylabel("True positive rate") ax_r.set_title("Test ROC (MLP)") ax_r.legend(loc="lower right") fig_r.tight_layout() task.logger.report_matplotlib_figure( title="ROC", series="test", figure=fig_r, iteration=0 ) plt.close(fig_r) task.logger.flush() if __name__ == "__main__": main()