Spaces:
Sleeping
Sleeping
"""
regime.py — Market regime detection with ADX, volatility compression,
distance-from-mean filter, and regime confidence scoring.

Key fixes vs prior version:
- STRUCTURE_LOOKBACK halved (10) to reduce entry lag
- True ATR (not EWM-only) with percentile-based compression detection
- ADX for objective trend strength (replaces pure HH/HL heuristic)
- Regime confidence: composite of trend + structure + vol alignment
- Distance-from-mean filter to avoid entering extended moves
"""
| from typing import Dict, Any | |
| import numpy as np | |
| import pandas as pd | |
| from config import ( | |
| ATR_PERIOD, | |
| STRUCTURE_LOOKBACK, | |
| STRUCTURE_CONFIRM_BARS, | |
| VOLATILITY_EXPANSION_MULT, | |
| VOLATILITY_CONTRACTION_MULT, | |
| VOL_COMPRESSION_LOOKBACK, | |
| VOL_COMPRESSION_PERCENTILE, | |
| VOL_EXPANSION_CONFIRM_MULT, | |
| ADX_PERIOD, | |
| ADX_TREND_THRESHOLD, | |
| ADX_STRONG_THRESHOLD, | |
| DIST_FROM_MEAN_MA, | |
| DIST_FROM_MEAN_ATR_MAX, | |
| REGIME_CONFIDENCE_MIN, | |
| ) | |
def compute_atr(df: pd.DataFrame, period: int = ATR_PERIOD) -> pd.Series:
    """
    Return the smoothed Average True Range of an OHLC frame.

    The true range of a bar is the largest of: high - low,
    |high - previous close| and |low - previous close|. The first bar has
    no previous close, so the row-wise max (which skips NaN) falls back to
    high - low there.

    Args:
        df: Frame with "high", "low" and "close" columns.
        period: Smoothing length; the EWM uses alpha = 1 / period.

    Returns:
        Series of ATR values on the same index as ``df``.
    """
    last_close = df["close"].shift(1)
    candidates = pd.concat(
        [
            df["high"] - df["low"],
            (df["high"] - last_close).abs(),
            (df["low"] - last_close).abs(),
        ],
        axis=1,
    )
    true_range = candidates.max(axis=1)
    # Wilder's smoothing (RMA) written as a recursive EWM with alpha = 1/period.
    return true_range.ewm(alpha=1.0 / period, adjust=False).mean()
def compute_adx(df: pd.DataFrame, period: int = ADX_PERIOD) -> pd.DataFrame:
    """
    Compute ADX together with the +DI / -DI directional indicators.

    Directional movement follows Wilder's rule: a bar contributes +DM only
    when its up-move is strictly larger than its down-move and positive,
    and -DM only in the mirrored case. On an exact tie, or when either move
    is undefined (the first bar, or bars with missing data), both are 0.
    Using 0.0 for both sides on undefined rows keeps the two smoothed
    series seeded on the same bar, so neither indicator gets a head start.

    Args:
        df: Frame with "high", "low" and "close" columns.
        period: Smoothing length; every EWM uses alpha = 1 / period.

    Returns:
        DataFrame with columns "adx", "di_plus" and "di_minus" on the same
        index as ``df``. DI values are NaN where the smoothed true range is
        zero.
    """
    high, low, close = df["high"], df["low"], df["close"]
    prev_close = close.shift(1)

    up_move = high - high.shift(1)
    down_move = low.shift(1) - low

    # Comparisons against NaN are False, so undefined rows become 0.0 on
    # both sides instead of leaving one series NaN and the other 0.
    dm_plus = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
    dm_minus = down_move.where((down_move > up_move) & (down_move > 0), 0.0)

    tr = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)

    alpha = 1.0 / period
    atr_w = tr.ewm(alpha=alpha, adjust=False).mean()
    sdm_plus = dm_plus.ewm(alpha=alpha, adjust=False).mean()
    sdm_minus = dm_minus.ewm(alpha=alpha, adjust=False).mean()

    # A zero denominator is mapped to NaN so the division never yields inf.
    safe_atr = atr_w.replace(0, np.nan)
    di_plus = 100 * sdm_plus / safe_atr
    di_minus = 100 * sdm_minus / safe_atr

    di_sum = (di_plus + di_minus).replace(0, np.nan)
    dx = 100 * (di_plus - di_minus).abs() / di_sum
    adx = dx.ewm(alpha=alpha, adjust=False).mean()

    return pd.DataFrame({"adx": adx, "di_plus": di_plus, "di_minus": di_minus})
def compute_structure(df: pd.DataFrame, lookback: int = STRUCTURE_LOOKBACK) -> pd.Series:
    """
    Label each bar's price structure as +1, -1 or 0.

    The rolling high and rolling low over ``lookback`` bars are compared
    with their own values ``max(1, lookback // 2)`` bars earlier:

    * +1 when both the rolling high and the rolling low are higher,
    * -1 when both are lower,
    *  0 otherwise (including bars where a comparison involves NaN).

    Args:
        df: Frame with "high" and "low" columns.
        lookback: Rolling window length.

    Returns:
        Integer Series on the same index as ``df``.
    """
    window_high = df["high"].rolling(lookback).max()
    window_low = df["low"].rolling(lookback).min()

    offset = max(1, lookback // 2)
    earlier_high = window_high.shift(offset)
    earlier_low = window_low.shift(offset)

    rising = window_high.gt(earlier_high) & window_low.gt(earlier_low)
    falling = window_high.lt(earlier_high) & window_low.lt(earlier_low)

    structure = pd.Series(0, index=df.index)
    structure.loc[rising] = 1
    structure.loc[falling] = -1
    return structure
def compute_volatility_compression(
    atr_series: pd.Series,
    lookback: int = VOL_COMPRESSION_LOOKBACK,
    percentile: float = VOL_COMPRESSION_PERCENTILE,
) -> pd.Series:
    """
    Flag bars whose ATR sits below a rolling percentile of itself.

    Args:
        atr_series: ATR values.
        lookback: Window over which the rolling quantile is taken.
        percentile: Percentile on a 0-100 scale; it is divided by 100
            before being handed to the rolling quantile.

    Returns:
        Boolean Series, True where ATR is strictly below the rolling
        threshold. Bars without a full window compare against NaN and are
        therefore False.
    """
    quantile_level = percentile / 100.0
    threshold = atr_series.rolling(lookback).quantile(quantile_level)
    return atr_series.lt(threshold)
def compute_volatility_expanding_from_compression(
    atr_series: pd.Series,
    compressed_series: pd.Series,
    mult: float = VOL_EXPANSION_CONFIRM_MULT,
    lookback: int = 5,
) -> pd.Series:
    """
    Flag bars where ATR breaks out shortly after a compressed phase.

    A bar is flagged when both conditions hold:

    * its ATR exceeds ``mult`` times the minimum ATR of the ``lookback``
      bars that precede it, and
    * at least one of those preceding ``lookback`` bars was marked
      compressed.

    Args:
        atr_series: ATR values.
        compressed_series: Boolean compression flags, one per bar.
        mult: Multiplier applied to the prior minimum ATR.
        lookback: Number of preceding bars inspected by both conditions.

    Returns:
        Boolean Series; bars without a full preceding window are False.
    """
    # shift(1) excludes the current bar from both look-back windows.
    prior_floor = atr_series.rolling(lookback).min().shift(1)
    breakout = atr_series.gt(prior_floor * mult)

    prior_flags = compressed_series.shift(1)
    # NaN (incomplete window) compares False, same as filling with 0 first.
    recently_coiled = prior_flags.rolling(lookback).max().gt(0)

    return breakout & recently_coiled
def compute_distance_from_mean(
    df: pd.DataFrame,
    atr_series: pd.Series,
    ma_period: int = DIST_FROM_MEAN_MA,
    atr_max: float = DIST_FROM_MEAN_ATR_MAX,
) -> pd.Series:
    """
    Return how far close is from its simple moving average, in ATR units.

    Positive values mean close is above the average, negative below.

    Args:
        df: Frame with a "close" column.
        atr_series: ATR values used as the normaliser; zeros are treated
            as NaN so the division never produces inf.
        ma_period: Window of the simple moving average.
        atr_max: Not read inside this function; it is kept in the
            signature and the threshold comparison is left to the caller.

    Returns:
        Series of (close - SMA) / ATR on the same index as ``df``.
    """
    baseline = df["close"].rolling(ma_period).mean()
    safe_atr = atr_series.replace(0, np.nan)
    return (df["close"] - baseline) / safe_atr
def classify_trend(
    structure_series: pd.Series,
    adx_df: pd.DataFrame,
    lookback: int = STRUCTURE_CONFIRM_BARS,
) -> str:
    """
    Classify the latest bar as "bullish", "bearish" or "ranging".

    A directional label requires all three of:

    * the latest ADX is at or above ADX_TREND_THRESHOLD,
    * the matching DI line is the larger of the two, and
    * at least ``max(1, lookback // 2)`` of the last ``lookback``
      structure values point the same way.

    Anything else is "ranging". NaN readings for ADX / DI count as 0.0.

    Args:
        structure_series: Per-bar structure labels (+1 / 0 / -1).
        adx_df: Frame with "adx", "di_plus" and "di_minus" columns.
        lookback: Number of trailing structure bars to inspect.

    Returns:
        One of "bullish", "bearish", "ranging".
    """

    def _latest(column: str) -> float:
        # Last value of an ADX column, with NaN mapped to 0.0.
        value = adx_df[column].iloc[-1]
        return 0.0 if np.isnan(value) else float(value)

    window = structure_series.iloc[-lookback:]
    up_bars = (window == 1).sum()
    down_bars = (window == -1).sum()

    adx_val = _latest("adx")
    di_plus = _latest("di_plus")
    di_minus = _latest("di_minus")

    # Without sufficient trend strength no directional label is possible.
    if not (adx_val >= ADX_TREND_THRESHOLD):
        return "ranging"

    needed = max(1, lookback // 2)
    if di_plus > di_minus and up_bars >= needed:
        return "bullish"
    if di_minus > di_plus and down_bars >= needed:
        return "bearish"
    return "ranging"
def compute_regime_confidence(
    trend: str,
    adx_val: float,
    structure: int,
    vol_expanding_from_base: bool,
    vol_ratio: float,
    dist_atr: float,
) -> float:
    """
    Combine four checks into a single confidence value in [0, 1].

    Components and the points each can add:

    * ADX strength: 0.35 at or above ADX_STRONG_THRESHOLD, 0.20 at or
      above ADX_TREND_THRESHOLD, otherwise 0.05.
    * Structure: 0.25 when structure agrees with a bullish/bearish trend,
      0.10 when structure is 0, otherwise 0.
    * Volatility: 0.25 when expanding from a compressed base, 0.10 when
      ``vol_ratio`` lies strictly between 1.0 and
      VOLATILITY_EXPANSION_MULT, otherwise 0.
    * Extension: 0.15 when |dist_atr| < 1.0, 0.07 when below
      DIST_FROM_MEAN_ATR_MAX, otherwise 0. A NaN distance is treated
      as 0.0.

    Args:
        trend: "bullish", "bearish" or "ranging".
        adx_val: Latest ADX reading.
        structure: Latest structure label (+1 / 0 / -1).
        vol_expanding_from_base: Whether ATR is breaking out of compression.
        vol_ratio: Latest ATR divided by its moving average.
        dist_atr: ATR-normalised distance of close from its mean.

    Returns:
        The summed score, clipped to [0, 1].
    """
    # Trend strength.
    if adx_val >= ADX_STRONG_THRESHOLD:
        adx_part = 0.35
    elif adx_val >= ADX_TREND_THRESHOLD:
        adx_part = 0.20
    else:
        adx_part = 0.05

    # Agreement between the trend label and the latest structure.
    aligned = (trend == "bullish" and structure == 1) or (
        trend == "bearish" and structure == -1
    )
    if aligned:
        structure_part = 0.25
    elif structure == 0:
        structure_part = 0.10
    else:
        structure_part = 0.0

    # Volatility behaviour.
    if vol_expanding_from_base:
        vol_part = 0.25
    elif 1.0 < vol_ratio < VOLATILITY_EXPANSION_MULT:
        vol_part = 0.10
    else:
        vol_part = 0.0

    # How stretched price is relative to its mean.
    stretch = 0.0 if np.isnan(dist_atr) else abs(dist_atr)
    if stretch < 1.0:
        distance_part = 0.15
    elif stretch < DIST_FROM_MEAN_ATR_MAX:
        distance_part = 0.07
    else:
        distance_part = 0.0

    total = adx_part + structure_part + vol_part + distance_part
    return float(np.clip(total, 0.0, 1.0))
def detect_regime(df: pd.DataFrame) -> Dict[str, Any]:
    """
    Run every regime indicator on ``df`` and summarise the latest bar.

    The function computes ATR, ADX/DI, price structure, volatility
    compression / breakout and ATR-normalised distance from the mean,
    then derives from the final row:

    * ``trend`` via ``classify_trend``,
    * ``regime_confidence`` via ``compute_regime_confidence``,
    * ``regime_score``: a base value chosen from trend and volatility
      expansion, nudged by ADX strength and by the latest structure,
      and clipped to [0, 1],
    * ``atr_trend``: "rising" when the 20-bar ATR mean is above the
      50-bar one (the 20-bar value is reused when fewer than 50 rows
      exist), otherwise "falling".

    Args:
        df: Frame with "high", "low" and "close" columns and at least
            one row.

    Returns:
        Dict holding the scalar readings for the last bar plus the full
        indicator series under the ``*_series`` keys.
    """

    def _last_or(series: pd.Series, fallback: float) -> float:
        # Final value of a series as float, or `fallback` when it is NaN.
        value = series.iloc[-1]
        return fallback if np.isnan(value) else float(value)

    # --- indicator series -------------------------------------------------
    atr_series = compute_atr(df, ATR_PERIOD)
    adx_df = compute_adx(df, ADX_PERIOD)
    structure_series = compute_structure(df, STRUCTURE_LOOKBACK)
    compressed_series = compute_volatility_compression(atr_series)
    expanding_from_base = compute_volatility_expanding_from_compression(
        atr_series, compressed_series
    )
    dist_atr_series = compute_distance_from_mean(df, atr_series)

    # --- latest-bar readings ----------------------------------------------
    last_atr = float(atr_series.iloc[-1])
    last_close = float(df["close"].iloc[-1])
    last_structure = int(structure_series.iloc[-1])
    last_adx = _last_or(adx_df["adx"], 0.0)
    last_di_plus = _last_or(adx_df["di_plus"], 0.0)
    last_di_minus = _last_or(adx_df["di_minus"], 0.0)
    last_compressed = bool(compressed_series.iloc[-1])
    last_expanding_from_base = bool(expanding_from_base.iloc[-1])
    last_dist_atr = _last_or(dist_atr_series, 0.0)

    # --- volatility relative to its own average ---------------------------
    atr_ma = atr_series.rolling(ATR_PERIOD * 2).mean()
    # With too little history the average is NaN; fall back to the ATR
    # itself, which makes the ratio 1.0.
    last_atr_ma = _last_or(atr_ma, last_atr)
    if last_atr_ma > 0:
        vol_ratio = last_atr / last_atr_ma
    else:
        vol_ratio = 1.0
    vol_expanding = vol_ratio > VOLATILITY_EXPANSION_MULT
    vol_contracting = vol_ratio < VOLATILITY_CONTRACTION_MULT

    if last_close > 0:
        atr_pct = last_atr / last_close
    else:
        atr_pct = 0.0

    # --- trend, extension and confidence ----------------------------------
    trend = classify_trend(structure_series, adx_df, STRUCTURE_CONFIRM_BARS)
    price_too_extended_long = last_dist_atr > DIST_FROM_MEAN_ATR_MAX
    price_too_extended_short = last_dist_atr < -DIST_FROM_MEAN_ATR_MAX

    regime_confidence = compute_regime_confidence(
        trend=trend,
        adx_val=last_adx,
        structure=last_structure,
        vol_expanding_from_base=last_expanding_from_base,
        vol_ratio=vol_ratio,
        dist_atr=last_dist_atr,
    )

    # --- regime score: raw directional quality ----------------------------
    if trend == "bullish":
        regime_score = 0.55 if vol_expanding else 1.0
    elif trend == "ranging":
        regime_score = 0.25
    elif trend == "bearish" and not vol_expanding:
        regime_score = 0.15
    else:
        regime_score = 0.05

    # Strong ADX lifts the score, sub-threshold ADX lowers it.
    if last_adx >= ADX_STRONG_THRESHOLD:
        regime_score = min(1.0, regime_score + 0.1)
    elif last_adx < ADX_TREND_THRESHOLD:
        regime_score = max(0.0, regime_score - 0.15)

    # The latest structure label moves the score by a further 0.1.
    if last_structure == 1:
        regime_score = min(1.0, regime_score + 0.1)
    elif last_structure == -1:
        regime_score = max(0.0, regime_score - 0.1)

    # --- ATR trend direction ----------------------------------------------
    atr_ma_20 = atr_series.rolling(20).mean().iloc[-1]
    if len(df) >= 50:
        atr_ma_50 = atr_series.rolling(50).mean().iloc[-1]
    else:
        atr_ma_50 = atr_ma_20
    atr_trend_dir = "rising" if atr_ma_20 > atr_ma_50 else "falling"

    return {
        "atr": last_atr,
        "atr_pct": atr_pct,
        "atr_pct_pct": round(atr_pct * 100, 3),
        "structure": last_structure,
        "trend": trend,
        "vol_ratio": round(vol_ratio, 3),
        "vol_expanding": vol_expanding,
        "vol_contracting": vol_contracting,
        "vol_compressed": last_compressed,
        "vol_expanding_from_base": last_expanding_from_base,
        "adx": round(last_adx, 2),
        "di_plus": round(last_di_plus, 2),
        "di_minus": round(last_di_minus, 2),
        "dist_atr": round(last_dist_atr, 3),
        "price_extended_long": price_too_extended_long,
        "price_extended_short": price_too_extended_short,
        "regime_confidence": round(regime_confidence, 4),
        "regime_score": round(float(np.clip(regime_score, 0.0, 1.0)), 4),
        "atr_trend": atr_trend_dir,
        "atr_series": atr_series,
        "structure_series": structure_series,
        "adx_series": adx_df,
        "compressed_series": compressed_series,
        "dist_atr_series": dist_atr_series,
    }