Spaces:
Running
Running
| """ | |
| Optuna-based Hyperparameter Optimization for TFT-ASRO. | |
| Searches across model architecture, training, and ASRO loss parameters | |
| using Tree-structured Parzen Estimator (TPE) with early pruning. | |
| Usage: | |
| python -m deep_learning.training.hyperopt --n-trials 50 | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import logging | |
| import warnings | |
| from dataclasses import replace | |
| from pathlib import Path | |
| from typing import Optional | |
| import numpy as np | |
| warnings.filterwarnings( | |
| "ignore", | |
| message="X does not have valid feature names", | |
| category=UserWarning, | |
| module="sklearn", | |
| ) | |
| from deep_learning.config import ( | |
| ASROConfig, | |
| TFTASROConfig, | |
| TFTModelConfig, | |
| TrainingConfig, | |
| get_tft_config, | |
| ) | |
| logger = logging.getLogger(__name__) | |
| def create_trial_config(trial, base_cfg: TFTASROConfig) -> TFTASROConfig: | |
| """Map an Optuna trial to a TFT-ASRO configuration.""" | |
| model_cfg = TFTModelConfig( | |
| max_encoder_length=trial.suggest_int("max_encoder_length", 30, 90, step=10), | |
| max_prediction_length=base_cfg.model.max_prediction_length, | |
| # Post-MRMR pruning (~60-80 features), smaller models generalise better. | |
| # 24 is viable now that feature count dropped from 200+ to ~60-80. | |
| hidden_size=trial.suggest_int("hidden_size", 24, 48, step=8), | |
| attention_head_size=trial.suggest_int("attention_head_size", 1, 2), | |
| # Floor at 0.20: 313 samples with dropout<0.20 causes co-adaptation | |
| # and memorization (REG-2026-001). Cap at 0.35: dropout>0.35 with | |
| # small hidden_size collapses the output range. | |
| dropout=trial.suggest_float("dropout", 0.20, 0.35, step=0.05), | |
| # Paired reduction: with hidden=24-48 and ~60-80 features, | |
| # 8-16 is the sweet spot for continuous variable processing. | |
| hidden_continuous_size=trial.suggest_int("hidden_continuous_size", 8, 16, step=8), | |
| quantiles=base_cfg.model.quantiles, | |
| # Range [1e-4, 1e-3]: LR < 1e-4 produces near-zero pred_std (VR=0.14); | |
| # LR > 1e-3 causes 1-epoch divergence. This band is the stable zone. | |
| learning_rate=trial.suggest_float("learning_rate", 1e-4, 1e-3, log=True), | |
| reduce_on_plateau_patience=4, | |
| gradient_clip_val=trial.suggest_float("gradient_clip_val", 0.5, 2.0, step=0.5), | |
| weight_decay=trial.suggest_float("weight_decay", 1e-5, 1e-3, log=True), | |
| ) | |
| asro_cfg = ASROConfig( | |
| # Floor at 0.25: three Optuna runs consistently selected 0.30-0.35. | |
| # Lower values let the model collapse to near-zero pred_std. | |
| lambda_vol=trial.suggest_float("lambda_vol", 0.25, 0.45, step=0.05), | |
| # lambda_quantile is the explicit w_quantile weight (w_sharpe = 1 - w_q) | |
| # Capped at 0.40 to ensure Sharpe (directional) component always has | |
| # ≥60% weight. Higher values caused the "perfect calibration, coin-flip | |
| # direction" pathology where the model optimised volatility at the | |
| # expense of directional signal. | |
| lambda_quantile=trial.suggest_float("lambda_quantile", 0.2, 0.4, step=0.05), | |
| # MADL weight: how much the directional loss contributes relative to Sharpe. | |
| lambda_madl=trial.suggest_float("lambda_madl", 0.1, 0.5, step=0.1), | |
| risk_free_rate=0.0, | |
| ) | |
| training_cfg = TrainingConfig( | |
| # CI budget: 3h limit @ CPU-only. | |
| # 15 trials × 3 folds × 25 epochs ≈ 108 min → leaves 70 min for final trainer. | |
| # (Was 35/6, causing 3h+ timeout with 20 trials.) | |
| max_epochs=25, | |
| early_stopping_patience=4, | |
| # 16 gives 19 batches/epoch, 32 gives ~10. 64 produced only 4 | |
| # batches/epoch with noisy gradients — removed after REG-2026-001. | |
| batch_size=trial.suggest_categorical("batch_size", [16, 32]), | |
| val_ratio=base_cfg.training.val_ratio, | |
| test_ratio=base_cfg.training.test_ratio, | |
| lookback_days=base_cfg.training.lookback_days, | |
| seed=base_cfg.training.seed, | |
| num_workers=base_cfg.training.num_workers, | |
| optuna_n_trials=base_cfg.training.optuna_n_trials, | |
| checkpoint_dir=str(Path(base_cfg.training.checkpoint_dir) / f"trial_{trial.number}"), | |
| best_model_path=str(Path(base_cfg.training.checkpoint_dir) / f"trial_{trial.number}" / "best.ckpt"), | |
| ) | |
| return TFTASROConfig( | |
| embedding=base_cfg.embedding, | |
| sentiment=base_cfg.sentiment, | |
| lme=base_cfg.lme, | |
| model=model_cfg, | |
| asro=asro_cfg, | |
| training=training_cfg, | |
| feature_store=base_cfg.feature_store, | |
| ) | |
| def _objective(trial, base_cfg: TFTASROConfig, master_data: tuple) -> float: | |
| """ | |
| Single Optuna trial with Walk-Forward k-Fold Temporal CV. | |
| Each trial trains k models (one per fold) and returns the mean | |
| composite score. This prevents overfitting to a single validation | |
| window — the core structural issue identified in REG-2026-001. | |
| Composite score per fold (lower is better): | |
| fold_score = val_loss + vr_penalty | |
| Final score: | |
| mean(fold_scores) + consistency_penalty + da_penalty | |
| After each fold, an intermediate score is reported to Optuna so | |
| the MedianPruner can kill clearly-bad trials early (after 1 fold | |
| instead of waiting for all 3). | |
| """ | |
| try: | |
| import lightning.pytorch as pl | |
| from lightning.pytorch.callbacks import EarlyStopping | |
| except ImportError: | |
| import pytorch_lightning as pl # type: ignore[no-redef] | |
| from pytorch_lightning.callbacks import EarlyStopping # type: ignore[no-redef] | |
| import optuna | |
| import numpy as np | |
| import torch | |
| from deep_learning.data.dataset import build_cv_folds, create_dataloaders | |
| from deep_learning.models.tft_copper import create_tft_model | |
| trial_cfg = create_trial_config(trial, base_cfg) | |
| master_df, tv_unknown, tv_known, target_cols, _ = master_data | |
| n_folds = getattr(trial_cfg.training, "cv_n_folds", 3) | |
| try: | |
| cv_folds = build_cv_folds( | |
| master_df, tv_unknown, tv_known, target_cols, | |
| trial_cfg, n_folds=n_folds, | |
| ) | |
| except Exception as exc: | |
| logger.warning("Trial %d CV fold creation failed: %s", trial.number, exc) | |
| return float("inf") | |
| fold_scores: list[float] = [] | |
| fold_da_list: list[float] = [] | |
| fold_sharpe_list: list[float] = [] | |
| fold_vr_list: list[float] = [] | |
| for fold_idx, (fold_train_ds, fold_val_ds) in enumerate(cv_folds): | |
| # ---- setup ---- | |
| try: | |
| fold_train_dl, fold_val_dl, _ = create_dataloaders( | |
| fold_train_ds, fold_val_ds, cfg=trial_cfg, | |
| ) | |
| model = create_tft_model(fold_train_ds, trial_cfg, use_asro=True) | |
| except Exception as exc: | |
| logger.warning( | |
| "Trial %d fold %d setup failed: %s", | |
| trial.number, fold_idx, exc, | |
| ) | |
| return float("inf") | |
| callbacks = [ | |
| EarlyStopping( | |
| monitor="val_loss", | |
| patience=trial_cfg.training.early_stopping_patience, | |
| mode="min", | |
| ), | |
| ] | |
| ckpt_dir = Path(trial_cfg.training.checkpoint_dir) / f"fold_{fold_idx}" | |
| ckpt_dir.mkdir(parents=True, exist_ok=True) | |
| trainer = pl.Trainer( | |
| max_epochs=trial_cfg.training.max_epochs, | |
| accelerator="auto", | |
| gradient_clip_val=trial_cfg.model.gradient_clip_val, | |
| callbacks=callbacks, | |
| enable_progress_bar=False, | |
| enable_model_summary=False, | |
| log_every_n_steps=20, | |
| ) | |
| # ---- train ---- | |
| try: | |
| trainer.fit(model, train_dataloaders=fold_train_dl, val_dataloaders=fold_val_dl) | |
| except Exception as exc: | |
| logger.warning("Trial %d fold %d training failed: %s", trial.number, fold_idx, exc) | |
| return float("inf") | |
| val_loss = trainer.callback_metrics.get("val_loss") | |
| if val_loss is None: | |
| return float("inf") | |
| fold_val_loss = float(val_loss) | |
| # ---- per-fold metrics ---- | |
| fold_vr_penalty = 0.0 | |
| fold_da = 0.5 | |
| fold_sharpe = 0.0 | |
| fold_vr = 0.0 | |
| try: | |
| pred_tensor = model.predict(fold_val_dl, mode="quantiles") | |
| if hasattr(pred_tensor, "cpu"): | |
| pred_np = pred_tensor.cpu().numpy() | |
| else: | |
| pred_np = np.array(pred_tensor) | |
| median_idx = len(trial_cfg.model.quantiles) // 2 | |
| y_pred = pred_np[:, 0, median_idx] if pred_np.ndim == 3 else pred_np.flatten() | |
| y_actual_parts = [] | |
| for batch in fold_val_dl: | |
| y_actual_parts.append( | |
| batch[1][0] if isinstance(batch[1], (list, tuple)) else batch[1] | |
| ) | |
| y_actual = torch.cat(y_actual_parts).cpu().numpy().flatten() | |
| fn = min(len(y_actual), len(y_pred)) | |
| pred_std = float(y_pred[:fn].std()) | |
| actual_std = float(y_actual[:fn].std()) | |
| fold_vr = pred_std / actual_std if actual_std > 1e-9 else 0.0 | |
| if fold_vr < 0.5: | |
| fold_vr_penalty = 2.0 * (1.0 - fold_vr / 0.5) | |
| elif fold_vr > 1.5: | |
| fold_vr_penalty = 0.5 * (fold_vr - 1.5) | |
| pred_sign = np.sign(y_pred[:fn]) | |
| actual_sign = np.sign(y_actual[:fn]) | |
| fold_da = float(np.mean(pred_sign == actual_sign)) | |
| strategy_returns = np.sign(y_pred[:fn]) * y_actual[:fn] | |
| sr_mean = float(strategy_returns.mean()) | |
| sr_std = float(strategy_returns.std()) + 1e-9 | |
| fold_sharpe = sr_mean / sr_std | |
| except Exception as exc: | |
| logger.debug( | |
| "Trial %d fold %d metrics failed: %s", trial.number, fold_idx, exc | |
| ) | |
| fold_vr_list.append(fold_vr) | |
| fold_da_list.append(fold_da) | |
| fold_sharpe_list.append(fold_sharpe) | |
| # Incorporate DA directly into fold_score as a reward (not just penalty). | |
| # DA > 50% (coin-flip) is rewarded, < 50% penalised. | |
| # This ensures the hyperopt objective actively selects for directional | |
| # accuracy, not just low calibration loss. | |
| da_baseline = 0.50 | |
| da_adjustment = (fold_da - da_baseline) * 2.0 # reward when DA > 50%, penalty when < 50% | |
| fold_score = fold_val_loss + fold_vr_penalty - da_adjustment | |
| fold_scores.append(fold_score) | |
| logger.debug( | |
| "Trial %d fold %d/%d: val_loss=%.4f vr=%.3f da=%.1f%% sharpe=%.4f", | |
| trial.number, fold_idx + 1, n_folds, | |
| fold_val_loss, fold_vr, fold_da * 100, fold_sharpe, | |
| ) | |
| # Per-fold Sharpe pruning: if a fold has deeply negative Sharpe, | |
| # the trial is systematically predicting the wrong direction for | |
| # that market regime — no point continuing to subsequent folds. | |
| if fold_sharpe < -0.5 and fold_idx >= 1: | |
| logger.warning( | |
| "Trial %d PRUNED at fold %d: fold_sharpe=%.4f < -0.5", | |
| trial.number, fold_idx + 1, fold_sharpe, | |
| ) | |
| raise optuna.exceptions.TrialPruned() | |
| # Report running average so MedianPruner can kill bad trials early | |
| running_avg = float(np.mean(fold_scores)) | |
| trial.report(running_avg, fold_idx) | |
| if trial.should_prune(): | |
| raise optuna.exceptions.TrialPruned() | |
| # Free GPU memory between folds | |
| del model, trainer | |
| if torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| # ---- cross-fold aggregation ---- | |
| avg_score = float(np.mean(fold_scores)) | |
| avg_da = float(np.mean(fold_da_list)) if fold_da_list else 0.5 | |
| avg_sharpe = float(np.mean(fold_sharpe_list)) if fold_sharpe_list else 0.0 | |
| avg_vr = float(np.mean(fold_vr_list)) if fold_vr_list else 0.0 | |
| # High fold-score variance = trial is unreliable (works in one regime, fails in another) | |
| consistency_penalty = ( | |
| float(np.std(fold_scores)) * 0.5 if len(fold_scores) > 1 else 0.0 | |
| ) | |
| trial.set_user_attr("avg_variance_ratio", round(avg_vr, 4)) | |
| trial.set_user_attr("avg_directional_accuracy", round(avg_da, 4)) | |
| trial.set_user_attr("avg_val_sharpe", round(avg_sharpe, 4)) | |
| trial.set_user_attr( | |
| "fold_score_std", | |
| round(float(np.std(fold_scores)) if len(fold_scores) > 1 else 0.0, 4), | |
| ) | |
| # Hard prune: avg Sharpe negative across folds = systematically wrong | |
| if avg_sharpe < 0.0: | |
| logger.warning( | |
| "Trial %d PRUNED: avg_sharpe=%.4f < 0 across %d folds (DA=%.1f%%)", | |
| trial.number, avg_sharpe, n_folds, avg_da * 100, | |
| ) | |
| raise optuna.exceptions.TrialPruned() | |
| # Soft penalty: avg DA below coin-flip | |
| da_penalty = 2.0 * max(0.0, 0.50 - avg_da) if avg_da < 0.50 else 0.0 | |
| final_score = avg_score + consistency_penalty + da_penalty | |
| logger.info( | |
| "Trial %d [%d-fold CV]: avg_score=%.4f consistency=%.4f " | |
| "da_penalty=%.4f → final=%.4f | DA=%.1f%% Sharpe=%.3f VR=%.3f", | |
| trial.number, n_folds, avg_score, consistency_penalty, da_penalty, | |
| final_score, avg_da * 100, avg_sharpe, avg_vr, | |
| ) | |
| return final_score | |
| def run_hyperopt( | |
| base_cfg: Optional[TFTASROConfig] = None, | |
| n_trials: int = 50, | |
| study_name: str = "tft_asro_optuna", | |
| storage: Optional[str] = None, | |
| ) -> dict: | |
| """ | |
| Launch Optuna hyperparameter search. | |
| Returns: | |
| Dict with best params, best value, and study summary. | |
| """ | |
| import optuna | |
| try: | |
| import lightning.pytorch as pl | |
| except ImportError: | |
| import pytorch_lightning as pl # type: ignore[no-redef] | |
| from app.db import SessionLocal, init_db | |
| from deep_learning.data.feature_store import build_tft_dataframe | |
| if base_cfg is None: | |
| base_cfg = get_tft_config() | |
| init_db() | |
| pl.seed_everything(base_cfg.training.seed) | |
| logger.info("Building feature store for hyperopt ...") | |
| with SessionLocal() as session: | |
| master_data = build_tft_dataframe(session, base_cfg) | |
| study = optuna.create_study( | |
| study_name=study_name, | |
| direction="minimize", | |
| storage=storage, | |
| load_if_exists=True, | |
| pruner=optuna.pruners.MedianPruner(n_startup_trials=5, n_warmup_steps=5), | |
| ) | |
| study.optimize( | |
| lambda trial: _objective(trial, base_cfg, master_data), | |
| n_trials=n_trials, | |
| show_progress_bar=True, | |
| ) | |
| best = study.best_trial | |
| logger.info("Optuna best trial #%d: val_loss=%.6f", best.number, best.value) | |
| logger.info("Best params: %s", best.params) | |
| # Save alongside best_tft_asro.ckpt (tft/ root) so upload_tft_artifacts picks it up. | |
| results_path = Path(base_cfg.training.best_model_path).parent / "optuna_results.json" | |
| results_path.parent.mkdir(parents=True, exist_ok=True) | |
| results_path.write_text(json.dumps({ | |
| "best_trial": best.number, | |
| "best_value": best.value, | |
| "best_params": best.params, | |
| "n_trials": len(study.trials), | |
| }, indent=2)) | |
| return { | |
| "best_trial": best.number, | |
| "best_value": best.value, | |
| "best_params": best.params, | |
| "n_trials": len(study.trials), | |
| } | |
| # --------------------------------------------------------------------------- | |
| # CLI | |
| # --------------------------------------------------------------------------- | |
| if __name__ == "__main__": | |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") | |
| parser = argparse.ArgumentParser(description="TFT-ASRO hyperparameter optimisation") | |
| parser.add_argument("--n-trials", type=int, default=50) | |
| parser.add_argument("--study-name", default="tft_asro_optuna") | |
| args = parser.parse_args() | |
| result = run_hyperopt(n_trials=args.n_trials, study_name=args.study_name) | |
| print("\n" + "=" * 60) | |
| print("HYPEROPT COMPLETE") | |
| print("=" * 60) | |
| print(f"Best trial: #{result['best_trial']}") | |
| print(f"Best val_loss: {result['best_value']:.6f}") | |
| for k, v in result["best_params"].items(): | |
| print(f" {k}: {v}") | |