""" Optuna-based Hyperparameter Optimization for TFT-ASRO. Searches across model architecture, training, and ASRO loss parameters using Tree-structured Parzen Estimator (TPE) with early pruning. Usage: python -m deep_learning.training.hyperopt --n-trials 50 """ from __future__ import annotations import argparse import json import logging import warnings from dataclasses import replace from pathlib import Path from typing import Optional import numpy as np warnings.filterwarnings( "ignore", message="X does not have valid feature names", category=UserWarning, module="sklearn", ) from deep_learning.config import ( ASROConfig, TFTASROConfig, TFTModelConfig, TrainingConfig, get_tft_config, ) logger = logging.getLogger(__name__) def create_trial_config(trial, base_cfg: TFTASROConfig) -> TFTASROConfig: """Map an Optuna trial to a TFT-ASRO configuration.""" model_cfg = TFTModelConfig( max_encoder_length=trial.suggest_int("max_encoder_length", 30, 90, step=10), max_prediction_length=base_cfg.model.max_prediction_length, # Post-MRMR pruning (~60-80 features), smaller models generalise better. # 24 is viable now that feature count dropped from 200+ to ~60-80. hidden_size=trial.suggest_int("hidden_size", 24, 48, step=8), attention_head_size=trial.suggest_int("attention_head_size", 1, 2), # Floor at 0.20: 313 samples with dropout<0.20 causes co-adaptation # and memorization (REG-2026-001). Cap at 0.35: dropout>0.35 with # small hidden_size collapses the output range. dropout=trial.suggest_float("dropout", 0.20, 0.35, step=0.05), # Paired reduction: with hidden=24-48 and ~60-80 features, # 8-16 is the sweet spot for continuous variable processing. hidden_continuous_size=trial.suggest_int("hidden_continuous_size", 8, 16, step=8), quantiles=base_cfg.model.quantiles, # Range [1e-4, 1e-3]: LR < 1e-4 produces near-zero pred_std (VR=0.14); # LR > 1e-3 causes 1-epoch divergence. This band is the stable zone. learning_rate=trial.suggest_float("learning_rate", 1e-4, 1e-3, log=True), reduce_on_plateau_patience=4, gradient_clip_val=trial.suggest_float("gradient_clip_val", 0.5, 2.0, step=0.5), weight_decay=trial.suggest_float("weight_decay", 1e-5, 1e-3, log=True), ) asro_cfg = ASROConfig( # Floor at 0.25: three Optuna runs consistently selected 0.30-0.35. # Lower values let the model collapse to near-zero pred_std. lambda_vol=trial.suggest_float("lambda_vol", 0.25, 0.45, step=0.05), # lambda_quantile is the explicit w_quantile weight (w_sharpe = 1 - w_q) # Capped at 0.40 to ensure Sharpe (directional) component always has # ≥60% weight. Higher values caused the "perfect calibration, coin-flip # direction" pathology where the model optimised volatility at the # expense of directional signal. lambda_quantile=trial.suggest_float("lambda_quantile", 0.2, 0.4, step=0.05), # MADL weight: how much the directional loss contributes relative to Sharpe. lambda_madl=trial.suggest_float("lambda_madl", 0.1, 0.5, step=0.1), risk_free_rate=0.0, ) training_cfg = TrainingConfig( # CI budget: 3h limit @ CPU-only. # 15 trials × 3 folds × 25 epochs ≈ 108 min → leaves 70 min for final trainer. # (Was 35/6, causing 3h+ timeout with 20 trials.) max_epochs=25, early_stopping_patience=4, # 16 gives 19 batches/epoch, 32 gives ~10. 64 produced only 4 # batches/epoch with noisy gradients — removed after REG-2026-001. batch_size=trial.suggest_categorical("batch_size", [16, 32]), val_ratio=base_cfg.training.val_ratio, test_ratio=base_cfg.training.test_ratio, lookback_days=base_cfg.training.lookback_days, seed=base_cfg.training.seed, num_workers=base_cfg.training.num_workers, optuna_n_trials=base_cfg.training.optuna_n_trials, checkpoint_dir=str(Path(base_cfg.training.checkpoint_dir) / f"trial_{trial.number}"), best_model_path=str(Path(base_cfg.training.checkpoint_dir) / f"trial_{trial.number}" / "best.ckpt"), ) return TFTASROConfig( embedding=base_cfg.embedding, sentiment=base_cfg.sentiment, lme=base_cfg.lme, model=model_cfg, asro=asro_cfg, training=training_cfg, feature_store=base_cfg.feature_store, ) def _objective(trial, base_cfg: TFTASROConfig, master_data: tuple) -> float: """ Single Optuna trial with Walk-Forward k-Fold Temporal CV. Each trial trains k models (one per fold) and returns the mean composite score. This prevents overfitting to a single validation window — the core structural issue identified in REG-2026-001. Composite score per fold (lower is better): fold_score = val_loss + vr_penalty Final score: mean(fold_scores) + consistency_penalty + da_penalty After each fold, an intermediate score is reported to Optuna so the MedianPruner can kill clearly-bad trials early (after 1 fold instead of waiting for all 3). """ try: import lightning.pytorch as pl from lightning.pytorch.callbacks import EarlyStopping except ImportError: import pytorch_lightning as pl # type: ignore[no-redef] from pytorch_lightning.callbacks import EarlyStopping # type: ignore[no-redef] import optuna import numpy as np import torch from deep_learning.data.dataset import build_cv_folds, create_dataloaders from deep_learning.models.tft_copper import create_tft_model trial_cfg = create_trial_config(trial, base_cfg) master_df, tv_unknown, tv_known, target_cols, _ = master_data n_folds = getattr(trial_cfg.training, "cv_n_folds", 3) try: cv_folds = build_cv_folds( master_df, tv_unknown, tv_known, target_cols, trial_cfg, n_folds=n_folds, ) except Exception as exc: logger.warning("Trial %d CV fold creation failed: %s", trial.number, exc) return float("inf") fold_scores: list[float] = [] fold_da_list: list[float] = [] fold_sharpe_list: list[float] = [] fold_vr_list: list[float] = [] for fold_idx, (fold_train_ds, fold_val_ds) in enumerate(cv_folds): # ---- setup ---- try: fold_train_dl, fold_val_dl, _ = create_dataloaders( fold_train_ds, fold_val_ds, cfg=trial_cfg, ) model = create_tft_model(fold_train_ds, trial_cfg, use_asro=True) except Exception as exc: logger.warning( "Trial %d fold %d setup failed: %s", trial.number, fold_idx, exc, ) return float("inf") callbacks = [ EarlyStopping( monitor="val_loss", patience=trial_cfg.training.early_stopping_patience, mode="min", ), ] ckpt_dir = Path(trial_cfg.training.checkpoint_dir) / f"fold_{fold_idx}" ckpt_dir.mkdir(parents=True, exist_ok=True) trainer = pl.Trainer( max_epochs=trial_cfg.training.max_epochs, accelerator="auto", gradient_clip_val=trial_cfg.model.gradient_clip_val, callbacks=callbacks, enable_progress_bar=False, enable_model_summary=False, log_every_n_steps=20, ) # ---- train ---- try: trainer.fit(model, train_dataloaders=fold_train_dl, val_dataloaders=fold_val_dl) except Exception as exc: logger.warning("Trial %d fold %d training failed: %s", trial.number, fold_idx, exc) return float("inf") val_loss = trainer.callback_metrics.get("val_loss") if val_loss is None: return float("inf") fold_val_loss = float(val_loss) # ---- per-fold metrics ---- fold_vr_penalty = 0.0 fold_da = 0.5 fold_sharpe = 0.0 fold_vr = 0.0 try: pred_tensor = model.predict(fold_val_dl, mode="quantiles") if hasattr(pred_tensor, "cpu"): pred_np = pred_tensor.cpu().numpy() else: pred_np = np.array(pred_tensor) median_idx = len(trial_cfg.model.quantiles) // 2 y_pred = pred_np[:, 0, median_idx] if pred_np.ndim == 3 else pred_np.flatten() y_actual_parts = [] for batch in fold_val_dl: y_actual_parts.append( batch[1][0] if isinstance(batch[1], (list, tuple)) else batch[1] ) y_actual = torch.cat(y_actual_parts).cpu().numpy().flatten() fn = min(len(y_actual), len(y_pred)) pred_std = float(y_pred[:fn].std()) actual_std = float(y_actual[:fn].std()) fold_vr = pred_std / actual_std if actual_std > 1e-9 else 0.0 if fold_vr < 0.5: fold_vr_penalty = 2.0 * (1.0 - fold_vr / 0.5) elif fold_vr > 1.5: fold_vr_penalty = 0.5 * (fold_vr - 1.5) pred_sign = np.sign(y_pred[:fn]) actual_sign = np.sign(y_actual[:fn]) fold_da = float(np.mean(pred_sign == actual_sign)) strategy_returns = np.sign(y_pred[:fn]) * y_actual[:fn] sr_mean = float(strategy_returns.mean()) sr_std = float(strategy_returns.std()) + 1e-9 fold_sharpe = sr_mean / sr_std except Exception as exc: logger.debug( "Trial %d fold %d metrics failed: %s", trial.number, fold_idx, exc ) fold_vr_list.append(fold_vr) fold_da_list.append(fold_da) fold_sharpe_list.append(fold_sharpe) # Incorporate DA directly into fold_score as a reward (not just penalty). # DA > 50% (coin-flip) is rewarded, < 50% penalised. # This ensures the hyperopt objective actively selects for directional # accuracy, not just low calibration loss. da_baseline = 0.50 da_adjustment = (fold_da - da_baseline) * 2.0 # reward when DA > 50%, penalty when < 50% fold_score = fold_val_loss + fold_vr_penalty - da_adjustment fold_scores.append(fold_score) logger.debug( "Trial %d fold %d/%d: val_loss=%.4f vr=%.3f da=%.1f%% sharpe=%.4f", trial.number, fold_idx + 1, n_folds, fold_val_loss, fold_vr, fold_da * 100, fold_sharpe, ) # Per-fold Sharpe pruning: if a fold has deeply negative Sharpe, # the trial is systematically predicting the wrong direction for # that market regime — no point continuing to subsequent folds. if fold_sharpe < -0.5 and fold_idx >= 1: logger.warning( "Trial %d PRUNED at fold %d: fold_sharpe=%.4f < -0.5", trial.number, fold_idx + 1, fold_sharpe, ) raise optuna.exceptions.TrialPruned() # Report running average so MedianPruner can kill bad trials early running_avg = float(np.mean(fold_scores)) trial.report(running_avg, fold_idx) if trial.should_prune(): raise optuna.exceptions.TrialPruned() # Free GPU memory between folds del model, trainer if torch.cuda.is_available(): torch.cuda.empty_cache() # ---- cross-fold aggregation ---- avg_score = float(np.mean(fold_scores)) avg_da = float(np.mean(fold_da_list)) if fold_da_list else 0.5 avg_sharpe = float(np.mean(fold_sharpe_list)) if fold_sharpe_list else 0.0 avg_vr = float(np.mean(fold_vr_list)) if fold_vr_list else 0.0 # High fold-score variance = trial is unreliable (works in one regime, fails in another) consistency_penalty = ( float(np.std(fold_scores)) * 0.5 if len(fold_scores) > 1 else 0.0 ) trial.set_user_attr("avg_variance_ratio", round(avg_vr, 4)) trial.set_user_attr("avg_directional_accuracy", round(avg_da, 4)) trial.set_user_attr("avg_val_sharpe", round(avg_sharpe, 4)) trial.set_user_attr( "fold_score_std", round(float(np.std(fold_scores)) if len(fold_scores) > 1 else 0.0, 4), ) # Hard prune: avg Sharpe negative across folds = systematically wrong if avg_sharpe < 0.0: logger.warning( "Trial %d PRUNED: avg_sharpe=%.4f < 0 across %d folds (DA=%.1f%%)", trial.number, avg_sharpe, n_folds, avg_da * 100, ) raise optuna.exceptions.TrialPruned() # Soft penalty: avg DA below coin-flip da_penalty = 2.0 * max(0.0, 0.50 - avg_da) if avg_da < 0.50 else 0.0 final_score = avg_score + consistency_penalty + da_penalty logger.info( "Trial %d [%d-fold CV]: avg_score=%.4f consistency=%.4f " "da_penalty=%.4f → final=%.4f | DA=%.1f%% Sharpe=%.3f VR=%.3f", trial.number, n_folds, avg_score, consistency_penalty, da_penalty, final_score, avg_da * 100, avg_sharpe, avg_vr, ) return final_score def run_hyperopt( base_cfg: Optional[TFTASROConfig] = None, n_trials: int = 50, study_name: str = "tft_asro_optuna", storage: Optional[str] = None, ) -> dict: """ Launch Optuna hyperparameter search. Returns: Dict with best params, best value, and study summary. """ import optuna try: import lightning.pytorch as pl except ImportError: import pytorch_lightning as pl # type: ignore[no-redef] from app.db import SessionLocal, init_db from deep_learning.data.feature_store import build_tft_dataframe if base_cfg is None: base_cfg = get_tft_config() init_db() pl.seed_everything(base_cfg.training.seed) logger.info("Building feature store for hyperopt ...") with SessionLocal() as session: master_data = build_tft_dataframe(session, base_cfg) study = optuna.create_study( study_name=study_name, direction="minimize", storage=storage, load_if_exists=True, pruner=optuna.pruners.MedianPruner(n_startup_trials=5, n_warmup_steps=5), ) study.optimize( lambda trial: _objective(trial, base_cfg, master_data), n_trials=n_trials, show_progress_bar=True, ) best = study.best_trial logger.info("Optuna best trial #%d: val_loss=%.6f", best.number, best.value) logger.info("Best params: %s", best.params) # Save alongside best_tft_asro.ckpt (tft/ root) so upload_tft_artifacts picks it up. results_path = Path(base_cfg.training.best_model_path).parent / "optuna_results.json" results_path.parent.mkdir(parents=True, exist_ok=True) results_path.write_text(json.dumps({ "best_trial": best.number, "best_value": best.value, "best_params": best.params, "n_trials": len(study.trials), }, indent=2)) return { "best_trial": best.number, "best_value": best.value, "best_params": best.params, "n_trials": len(study.trials), } # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- if __name__ == "__main__": logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") parser = argparse.ArgumentParser(description="TFT-ASRO hyperparameter optimisation") parser.add_argument("--n-trials", type=int, default=50) parser.add_argument("--study-name", default="tft_asro_optuna") args = parser.parse_args() result = run_hyperopt(n_trials=args.n_trials, study_name=args.study_name) print("\n" + "=" * 60) print("HYPEROPT COMPLETE") print("=" * 60) print(f"Best trial: #{result['best_trial']}") print(f"Best val_loss: {result['best_value']:.6f}") for k, v in result["best_params"].items(): print(f" {k}: {v}")