Spaces:
Sleeping
Sleeping
| """ | |
| walk_forward.py — Strict time-series walk-forward cross-validation. | |
| Architecture: | |
| ┌─────────────────────────────────────────────────────────┐ | |
| │  FOLD 1: [=TRAIN=======|=VAL=|----TEST----]             │ | |
| │  FOLD 2: [=TRAIN============|=VAL=|--TEST--]            │ | |
| │  FOLD 3: [=TRAIN==================|=VAL=|TEST]          │ | |
| └─────────────────────────────────────────────────────────┘ | |
| Key anti-lookahead rules enforced here: | |
| 1. Train/val/test boundaries are strictly chronological | |
| 2. No future data ever seen during training or threshold search | |
| 3. Labels computed BEFORE fold construction (in labeler.py) | |
| 4. Threshold optimized on VAL set; reported metric on TEST set only | |
| 5. Model fitted fresh for each fold (no weight leakage) | |
| """ | |
| import json | |
| import logging | |
| from dataclasses import dataclass, field | |
| from typing import List, Tuple, Optional | |
| import numpy as np | |
| import pandas as pd | |
| from ml_config import ( | |
| WF_N_SPLITS, | |
| WF_TRAIN_FRAC, | |
| WF_MIN_TRAIN_OBS, | |
| LGBM_PARAMS, | |
| THRESHOLD_MIN, | |
| THRESHOLD_MAX, | |
| THRESHOLD_STEPS, | |
| THRESHOLD_OBJECTIVE, | |
| ROUND_TRIP_COST, | |
| TARGET_RR, | |
| FEATURE_COLUMNS, | |
| ) | |
| from model_backend import ModelBackend | |
| logger = logging.getLogger(__name__) | |
@dataclass
class FoldResult:
    """Metrics recorded for a single walk-forward fold.

    Declared as a dataclass so that instances can be built by keyword
    (as ``run_walk_forward`` does) and so that ``field(repr=False)`` keeps
    the importance array out of the generated ``repr``.
    """

    fold: int  # fold number (run_walk_forward numbers folds from 1)
    n_train: int  # rows in the training slice
    n_val: int  # rows in the validation slice
    n_test: int  # rows in the test slice
    train_win_rate: float  # mean of the labels in the training slice
    val_win_rate: float  # mean of the labels in the validation slice
    test_win_rate: float  # mean of the labels in the test slice (unfiltered)
    best_threshold: float  # probability cut-off selected on the validation slice
    val_objective: float  # objective on val (used to pick threshold)
    test_sharpe: float  # out-of-sample Sharpe after thresholding
    test_expectancy: float  # out-of-sample expectancy per trade
    test_precision: float  # win rate of filtered trades on test
    test_n_trades: int  # number of trades passing filter on test
    feature_importances: np.ndarray = field(repr=False)  # excluded from repr
def _compute_expectancy(y_true: np.ndarray, rr: float = TARGET_RR, cost: float = ROUND_TRIP_COST) -> float:
    """
    Per-trade expectancy in R units for an array of 0/1 outcomes.

        E = p_win * rr - (1 - p_win) - cost

    A win pays ``rr``, a loss costs 1, and ``cost`` is charged on every
    trade. An empty ``y_true`` yields the sentinel ``-999.0``.
    """
    if not len(y_true):
        return -999.0
    p_win = float(y_true.mean())
    p_loss = 1.0 - p_win
    gross = p_win * rr - p_loss
    return gross - cost
def _compute_sharpe(y_true: np.ndarray, rr: float = TARGET_RR, cost: float = ROUND_TRIP_COST) -> float:
    """
    Rough Sharpe ratio of per-trade PnL, scaled by sqrt(252).

    Each trade's PnL in R is ``+rr`` when the label equals 1 and ``-1``
    otherwise, minus ``cost``. Fewer than 5 outcomes yields the sentinel
    ``-999.0``; a (near-)zero standard deviation yields ``0.0``.
    """
    n_outcomes = len(y_true)
    if n_outcomes < 5:
        return -999.0

    trade_pnl = np.where(y_true == 1, rr, -1.0) - cost
    dispersion = trade_pnl.std()
    if dispersion < 1e-9:
        # Constant PnL: the ratio is undefined, report zero instead.
        return 0.0

    scale = np.sqrt(252)  # loose annualization factor
    return float(trade_pnl.mean() / dispersion * scale)
def _optimize_threshold(
    probs: np.ndarray,
    y_true: np.ndarray,
    objective: str = THRESHOLD_OBJECTIVE,
    min_trades: int = 10,
) -> Tuple[float, float]:
    """
    Grid-search the probability threshold on the VAL set.

    Candidate thresholds are ``THRESHOLD_STEPS`` evenly spaced values between
    ``THRESHOLD_MIN`` and ``THRESHOLD_MAX``. For each candidate, the samples
    with ``probs >= t`` are scored with the chosen objective and the best
    scoring threshold is kept. Ties keep the earliest candidate because the
    comparison is strict.

    Args:
        probs: Predicted win probabilities, aligned with ``y_true``.
        y_true: 0/1 labels.
        objective: ``"expectancy"``, ``"sharpe"`` or ``"precision_recall"``
            (F1); any other value falls back to the plain win rate of the
            filtered samples.
        min_trades: Minimum number of samples that must pass the threshold
            for it to be considered (default 10, the previous fixed value).

    Returns:
        ``(best_threshold, best_objective_value)``. When no candidate passes
        ``min_trades`` the result is ``(THRESHOLD_MIN, -inf)``.
    """
    thresholds = np.linspace(THRESHOLD_MIN, THRESHOLD_MAX, THRESHOLD_STEPS)
    best_thresh = THRESHOLD_MIN
    best_val = -np.inf
    # Loop-invariant: total positives, only used by the F1 objective.
    total_wins = y_true.sum()
    for t in thresholds:
        mask = probs >= t
        if mask.sum() < min_trades:  # too few trades to be meaningful
            continue
        y_filtered = y_true[mask]
        if objective == "expectancy":
            val = _compute_expectancy(y_filtered)
        elif objective == "sharpe":
            val = _compute_sharpe(y_filtered)
        elif objective == "precision_recall":
            prec = y_filtered.mean()
            recall = y_filtered.sum() / (total_wins + 1e-9)
            val = 2 * prec * recall / (prec + recall + 1e-9)  # F1
        else:
            val = y_filtered.mean()  # default: win rate
        if val > best_val:
            best_val = val
            best_thresh = t
    return float(best_thresh), float(best_val)
def _make_folds(
    n: int,
    n_splits: int = WF_N_SPLITS,
    train_frac: float = WF_TRAIN_FRAC,
    val_frac: float = 0.15,
) -> List[Tuple[range, range, range]]:
    """
    Generate (train, val, test) index ranges for walk-forward CV.

    The series is cut into ``n_splits + 1`` equal chunks of
    ``n // (n_splits + 1)`` rows. Each fold tests on one chunk, and the last
    fold's test chunk ends at row ``n``. Everything before the test chunk is
    split chronologically: the final ``val_frac`` share is the validation
    range and the rest, starting at row 0 (expanding window), is the
    training range.

    Args:
        n: Total number of chronologically ordered rows.
        n_splits: Number of folds to attempt.
        train_frac: Accepted for interface compatibility; not used by the
            fold construction.
        val_frac: Share of the pre-test rows reserved for validation
            (default 0.15, the previous fixed value).

    Returns:
        One ``(train, val, test)`` tuple of ``range`` objects per fold whose
        training range has at least ``WF_MIN_TRAIN_OBS`` rows. May be empty.

    Raises:
        ValueError: If ``val_frac`` is not strictly between 0 and 1.
    """
    if not 0.0 < val_frac < 1.0:
        raise ValueError(f"val_frac must be in (0, 1), got {val_frac}")
    if n_splits < 1:
        return []
    fold_size = n // (n_splits + 1)
    if fold_size < 1:
        # Not even one row per chunk: every test range would be empty.
        return []

    folds = []
    for i in range(n_splits):
        test_end = n - (n_splits - 1 - i) * fold_size
        test_start = test_end - fold_size
        val_end = test_start
        val_start = int(val_end * (1 - val_frac))
        train_end = val_start
        train_start = 0  # expanding window
        if train_end - train_start < WF_MIN_TRAIN_OBS:
            continue
        folds.append((
            range(train_start, train_end),
            range(val_start, val_end),
            range(test_start, test_end),
        ))
    return folds
def run_walk_forward(
    X: np.ndarray,
    y: np.ndarray,
    timestamps: Optional[np.ndarray] = None,
    params: Optional[dict] = None,
) -> List[FoldResult]:
    """
    Execute full walk-forward validation.

    For each fold produced by ``_make_folds`` a new ``ModelBackend`` is
    created and fitted on the training slice (the validation slice is also
    handed to ``fit``). The decision threshold is then chosen on the
    validation probabilities and all reported metrics are computed on the
    test slice using that threshold.

    Args:
        X: Feature matrix (N, n_features) — rows in chronological order
        y: Label array (N,) — 0/1 binary
        timestamps: Optional array of timestamps; accepted but currently not
            used by this function.
        params: Model hyperparameters (defaults to ml_config.LGBM_PARAMS)

    Returns:
        List of FoldResult, one per valid fold. Folds whose training labels
        contain a single class are skipped.

    Raises:
        ValueError: If no fold can be constructed from the data.
    """
    if params is None:
        params = LGBM_PARAMS

    folds = _make_folds(len(X), WF_N_SPLITS, WF_TRAIN_FRAC)
    if not folds:
        raise ValueError(f"Insufficient data for walk-forward CV. Need >= {WF_MIN_TRAIN_OBS * (WF_N_SPLITS + 1)} rows.")

    results: List[FoldResult] = []
    for fold_idx, (tr, va, te) in enumerate(folds, 1):
        X_tr, y_tr = X[tr], y[tr]
        X_va, y_va = X[va], y[va]
        X_te, y_te = X[te], y[te]

        if len(np.unique(y_tr)) < 2:
            logger.warning(f"Fold {fold_idx}: only one class in training set — skipping")
            continue

        logger.info(
            f"Fold {fold_idx}/{len(folds)}: "
            f"train={len(X_tr)} val={len(X_va)} test={len(X_te)} "
            f"(wr_tr={y_tr.mean():.3f} wr_va={y_va.mean():.3f} wr_te={y_te.mean():.3f})"
        )

        # Compute class weight to handle imbalance (crypto: ~35-45% win rate).
        # Inverse-frequency weights are only used when neither class is
        # extremely rare; otherwise the fit is unweighted.
        pos_frac = y_tr.mean()
        if 0.05 < pos_frac < 0.95:
            sample_weight = np.where(y_tr == 1, 1.0 / pos_frac, 1.0 / (1 - pos_frac))
        else:
            sample_weight = None

        # A fresh backend per fold, so nothing carries over between folds.
        backend = ModelBackend(params=params, calibrate=True)
        backend.fit(X_tr, y_tr, X_va, y_va, sample_weight=sample_weight)

        val_probs = backend.predict_win_prob(X_va)
        test_probs = backend.predict_win_prob(X_te)
        best_thresh, best_val_obj = _optimize_threshold(val_probs, y_va)

        # Evaluate on TEST set using threshold from VAL
        test_mask = test_probs >= best_thresh
        y_te_filtered = y_te[test_mask]
        n_test_trades = int(test_mask.sum())
        test_expectancy = _compute_expectancy(y_te_filtered) if n_test_trades > 0 else -999.0
        test_sharpe = _compute_sharpe(y_te_filtered) if n_test_trades > 0 else -999.0
        test_precision = float(y_te_filtered.mean()) if n_test_trades > 0 else 0.0

        result = FoldResult(
            fold=fold_idx,
            n_train=len(X_tr),
            n_val=len(X_va),
            n_test=len(X_te),
            train_win_rate=float(y_tr.mean()),
            val_win_rate=float(y_va.mean()),
            test_win_rate=float(y_te.mean()),
            best_threshold=best_thresh,
            val_objective=best_val_obj,
            test_sharpe=test_sharpe,
            test_expectancy=test_expectancy,
            test_precision=test_precision,
            test_n_trades=n_test_trades,
            feature_importances=backend.feature_importances_,
        )
        results.append(result)

        logger.info(
            f"Fold {fold_idx}: thresh={best_thresh:.3f} "
            f"test_expectancy={test_expectancy:.4f} "
            f"test_sharpe={test_sharpe:.3f} "
            f"test_prec={test_precision:.3f} "
            f"n_trades={n_test_trades}"
        )
    return results
def summarize_walk_forward(results: List[FoldResult]) -> dict:
    """
    Collapse per-fold results into one summary dictionary.

    Means and standard deviations are rounded to 4 decimals (the mean trade
    count to 1). Expectancy and Sharpe statistics ignore folds carrying the
    ``-999`` sentinel and are ``None`` when every fold carries it. An empty
    ``results`` list yields an empty dict.
    """
    if not results:
        return {}

    def _mean4(values):
        return round(float(np.mean(values)), 4)

    def _std4(values):
        return round(float(np.std(values)), 4)

    cutoffs, hit_rates, trade_counts, importance_rows = [], [], [], []
    usable_expectancies, usable_sharpes = [], []
    per_fold = []
    for res in results:
        cutoffs.append(res.best_threshold)
        hit_rates.append(res.test_precision)
        trade_counts.append(res.test_n_trades)
        importance_rows.append(res.feature_importances)
        # Sentinel values mark folds where the metric could not be computed.
        if res.test_expectancy > -999:
            usable_expectancies.append(res.test_expectancy)
        if res.test_sharpe > -999:
            usable_sharpes.append(res.test_sharpe)
        per_fold.append(
            {
                "fold": res.fold,
                "threshold": res.best_threshold,
                "test_expectancy": res.test_expectancy,
                "test_sharpe": res.test_sharpe,
                "test_precision": res.test_precision,
                "test_n_trades": res.test_n_trades,
            }
        )

    if usable_expectancies:
        mean_expectancy = _mean4(usable_expectancies)
        std_expectancy = _std4(usable_expectancies)
    else:
        mean_expectancy = None
        std_expectancy = None
    mean_sharpe = _mean4(usable_sharpes) if usable_sharpes else None
    mean_importance = np.mean(importance_rows, axis=0)

    summary = {
        "n_folds": len(results),
        "mean_threshold": _mean4(cutoffs),
        "std_threshold": _std4(cutoffs),
        "mean_expectancy": mean_expectancy,
        "std_expectancy": std_expectancy,
        "mean_sharpe": mean_sharpe,
        "mean_precision": _mean4(hit_rates),
        "mean_n_trades_per_fold": round(float(np.mean(trade_counts)), 1),
        "avg_feature_importance": mean_importance.tolist(),
        "fold_details": per_fold,
    }
    return summary