Spaces:
Sleeping
Sleeping
| """ | |
main.py — Gradio interface with integrated ML probability filter.

Pipeline:
    OHLCV Data
        │
        ▼
    Rule Engine (regime + volume + scoring + veto)
        │
        ├─► Vetoed → skip (no ML call, save compute)
        │
        └─► Approved by rules
                │
                ▼
            ML Filter (LightGBM / HGBM probability)
                │
                ├─► prob < threshold → FILTERED (shown as ML_FILT)
                │
                └─► prob >= threshold → Risk Engine → Final setup
                        │
                        ▼
            Ranked output with ML prob overlay
| """ | |
| import logging | |
| import sys | |
| import time | |
| from typing import List, Optional, Dict, Any | |
| import gradio as gr | |
| from config import ( | |
| DEFAULT_SYMBOLS, | |
| TOP_N_DEFAULT, | |
| DEFAULT_ACCOUNT_EQUITY, | |
| TIMEFRAME, | |
| CANDLE_LIMIT, | |
| ) | |
| from data_fetcher import fetch_multiple, fetch_instruments | |
| from regime import detect_regime | |
| from volume_analysis import analyze_volume | |
| from risk_engine import evaluate_risk | |
| from veto import apply_veto, veto_summary | |
| from scorer import compute_structure_score, score_token, rank_tokens, format_score_bar, quality_tier | |
| from feature_builder import build_feature_dict, validate_features | |
| from ml_filter import TradeFilter | |
# Send every log record to stdout with a timestamp, level and logger name.
logging.basicConfig(
    stream=sys.stdout,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("main")

# The ML filter is loaded a single time at import; it stays ``None`` when no
# trained model is available yet.
_TRADE_FILTER: Optional[TradeFilter] = TradeFilter.load_or_none()

# Glyph shown next to each trend label.
_TREND_ICON = dict(bullish="β²", ranging="β", bearish="βΌ")

# Short label for each breakout sign (1 = up, -1 = down, 0 = none).
_BREAK_LABEL = {
    1: "βUP",
    -1: "βDN",
    0: " β ",
}

# Human-readable name for each direction code.
_DIR_LABEL = {
    1: "LONG",
    -1: "SHORT",
    0: "NONE",
}
def infer_direction(trend: str, breakout: int) -> int:
    """Infer the trade direction from the trend label and the breakout sign.

    A directional trend always decides the result; the breakout sign is only
    consulted when the trend is neither ``"bullish"`` nor ``"bearish"``.

    The earlier implementation tested ``trend == "bullish" or breakout == 1``
    first, so a bearish trend combined with an upward breakout was reported as
    long, while the mirrored conflict (bullish trend, downward breakout) was
    also reported as long.  Conflicting signals now resolve symmetrically in
    favour of the trend.

    Args:
        trend: Trend label.  ``"bullish"`` and ``"bearish"`` are directional;
            any other value is treated as non-directional.
        breakout: ``1`` for an upward breakout, ``-1`` for a downward one;
            any other value means no breakout.

    Returns:
        ``1`` for long, ``-1`` for short, ``0`` when no direction can be inferred.
    """
    if trend == "bullish":
        return 1
    if trend == "bearish":
        return -1

    # Non-directional trend: fall back to the breakout sign.
    if breakout == 1:
        return 1
    if breakout == -1:
        return -1
    return 0
def analyze_single(
    symbol: str,
    df,
    account_equity: float,
    consec_losses: int = 0,
    equity_drawdown_pct: float = 0.0,
    use_ml: bool = True,
) -> Dict[str, Any]:
    """Run one symbol through the rule engine, the ML filter and the risk engine.

    Args:
        symbol: Instrument identifier; used for logging and echoed in the result.
        df: OHLCV frame for the symbol; its ``"close"`` column is read via ``.iloc[-1]``.
        account_equity: Forwarded to ``evaluate_risk``.
        consec_losses: Forwarded to ``evaluate_risk``.
        equity_drawdown_pct: Forwarded to ``evaluate_risk``.
        use_ml: When false the ML filter is skipped entirely.

    Returns:
        A flat dict with regime, volume, score, veto and ML fields plus a
        ``"risk"`` entry.  ``"risk"`` is an empty dict unless the setup is
        finally approved (not rule-vetoed and not rejected by the ML filter).
    """
    # -- Rule engine ---------------------------------------------------------
    regime = detect_regime(df)
    volume = analyze_volume(df, atr_series=regime["atr_series"])
    structure_score = compute_structure_score(regime)
    direction = infer_direction(regime["trend"], volume["breakout"])
    vetoed, veto_reason = apply_veto(regime, volume, structure_score, direction=direction)
    scores = score_token(regime, volume, vetoed)

    # -- ML filter -----------------------------------------------------------
    # ``ml_approved`` stays None (pass-through) when the filter is skipped, the
    # features fail validation, or the filter raises.
    ml_prob = None
    ml_approved = None
    ml_reject_reason = ""
    ml_enabled = use_ml and _TRADE_FILTER is not None and not vetoed
    if ml_enabled:
        try:
            features = build_feature_dict(regime, volume, scores)
            if validate_features(features):
                verdict = _TRADE_FILTER.predict(regime, volume, scores)
                ml_prob = verdict.probability
                ml_approved = verdict.approved
                ml_reject_reason = verdict.reject_reason
        except Exception as exc:
            logger.warning(f"{symbol}: ML filter error: {exc}")
            ml_approved = None

    # -- Risk engine ---------------------------------------------------------
    # Full risk parameters are only computed for setups that survived both the
    # rule veto and the ML filter.
    final_approved = (not vetoed) and (ml_approved is None or ml_approved)

    close = float(df["close"].iloc[-1])
    if final_approved:
        risk_data = evaluate_risk(
            close=close,
            atr=regime["atr"],
            atr_pct=regime["atr_pct"],
            regime_score=regime["regime_score"],
            vol_ratio=regime["vol_ratio"],
            volume_score=volume["volume_score"],
            regime_confidence=regime["regime_confidence"],
            vol_compressed=regime["vol_compressed"],
            consec_losses=consec_losses,
            equity_drawdown_pct=equity_drawdown_pct,
            account_equity=account_equity,
        )
    else:
        risk_data = {}

    # -- Assemble the flat result --------------------------------------------
    report: Dict[str, Any] = {"symbol": symbol, "close": close}

    for key in (
        "trend",
        "adx",
        "di_plus",
        "di_minus",
        "vol_ratio",
        "vol_compressed",
        "vol_expanding_from_base",
        "vol_expanding",
        "dist_atr",
    ):
        report[key] = regime[key]
    report["price_extended"] = regime["price_extended_long"] or regime["price_extended_short"]
    report["regime_confidence"] = regime["regime_confidence"]

    for key in ("spike", "climax", "absorption", "failed_breakout"):
        report[key] = volume[key]
    report["recent_failed"] = volume["recent_failed_count"]
    report["breakout"] = volume["breakout"]
    report["obv_slope"] = volume["obv_slope_norm"]
    report["delta_sign"] = volume["delta_sign"]

    report["direction"] = direction
    report["rule_vetoed"] = vetoed
    report["veto_reason"] = veto_reason
    report["ml_prob"] = ml_prob
    report["ml_approved"] = ml_approved
    report["ml_reject_reason"] = ml_reject_reason
    report["final_approved"] = final_approved

    for key in (
        "regime_score",
        "volume_score",
        "structure_score",
        "confidence_score",
        "total_score",
    ):
        report[key] = scores[key]

    report["risk"] = risk_data
    return report
| def _ml_status(d: Dict) -> str: | |
| if _TRADE_FILTER is None: | |
| return "NO_MODEL" | |
| if d["rule_vetoed"]: | |
| return "RULE_VET" | |
| if d["ml_prob"] is None: | |
| return "ML_ERR " | |
| prob_str = f"{d['ml_prob']:.3f}" | |
| return f"β{prob_str}" if d["ml_approved"] else f"β{prob_str}" | |
def build_ranked_table(ranked: list, top_n: int) -> str:
    """Format the first ``top_n`` ranked results as a fixed-width text table.

    Args:
        ranked: Sequence of ``(symbol, result_dict)`` pairs, best first.
        top_n: Number of leading entries to render.

    Returns:
        Header line, separator line and one line per rendered entry, each
        terminated by a newline.  The last column is ``RULE_VET`` for
        rule-vetoed rows, ``ML_FILT`` for rows that are not finally approved,
        and ``OK`` otherwise.
    """
    header = (
        f"{'#':>3} {'Symbol':<14} {'Score':>7} {'Tier':>4} "
        f"{'Regime':>6} {'Vol':>6} {'S':>5} {'C':>5} "
        f"{'Trend':>7} {'ADX':>5} {'VR':>5} "
        f"{'ML':>8} {'Status'}\n"
    )
    divider = "β" * 105 + "\n"

    parts = [header, divider]
    for position, (symbol, info) in enumerate(ranked[:top_n], start=1):
        trend_icon = _TREND_ICON.get(info["trend"], "?")
        tier = quality_tier(info["total_score"])
        ml_cell = _ml_status(info)

        if info["rule_vetoed"]:
            status = "RULE_VET"
        elif info["final_approved"]:
            status = "OK "
        else:
            status = "ML_FILT "

        parts.append(
            f"{position:>3} {symbol:<14} {info['total_score']:>7.4f} {tier:>4} "
            f"{info['regime_score']:>6.3f} {info['volume_score']:>6.3f} "
            f"{info['structure_score']:>5.3f} {info['confidence_score']:>5.3f} "
            f"{trend_icon} {info['trend']:<5} {info['adx']:>5.1f} {info['vol_ratio']:>5.2f} "
            f"{ml_cell:>8} {status}\n"
        )
    return "".join(parts)
def build_best_detail(data: Dict[str, Any]) -> str:
    """Render the multi-line detail card for one analysis result.

    The card has a title, trend and volatility summary, the five score bars,
    an ML section (only when a filter is loaded) and a risk section (only
    when ``data["risk"]`` is non-empty).

    Args:
        data: Result dict as produced by ``analyze_single``.

    Returns:
        The card as a single newline-joined string.
    """
    r = data.get("risk", {})
    sym = data["symbol"]
    icon = _TREND_ICON.get(data["trend"], "?")

    vol_state = []
    if data["vol_compressed"]:
        vol_state.append("COMPRESSED")
    if data["vol_expanding_from_base"]:
        vol_state.append("EXPANDING FROM BASE β")
    if data["vol_expanding"] and not data["vol_expanding_from_base"]:
        vol_state.append("EXPANDING (no base)")
    vol_state_str = " | ".join(vol_state) or "NORMAL"

    ml_section = ""
    if _TRADE_FILTER is not None:
        prob_str = f"{data['ml_prob']:.4f}" if data["ml_prob"] is not None else "N/A"
        thresh_str = f"{_TRADE_FILTER.threshold:.4f}"
        # ``ml_approved`` is None when the filter did not produce a verdict
        # (ML switched off, invalid features, or a filter error).  Such setups
        # pass through as approved, so they must not be labelled "FILTERED".
        if data["ml_approved"] is None:
            decision = "NOT EVALUATED (passed through)"
        elif data["ml_approved"]:
            decision = "APPROVED β"
        else:
            decision = "FILTERED β"
        ml_section = (
            f"\n ββ ML PROBABILITY FILTER βββββββββββββββββββββββββ\n"
            f" P(win): {prob_str}\n"
            f" Threshold: {thresh_str}\n"
            f" ML Decision: {decision}\n"
        )

    risk_section = ""
    if r:
        risk_section = (
            f"\n ββ RISK PARAMETERS ββββββββββββββββββββββββββββββββ\n"
            f" Entry: {r.get('entry_price', 0):.8f}\n"
            f" ATR: {r.get('atr', 0):.8f} ({r.get('atr_pct', 0):.3f}%)\n"
            f" Stop Mult: {r.get('stop_mult', 0):.1f}x ATR\n"
            f" LONG β Stop: {r.get('stop_long', 0):.8f} Target: {r.get('target_long', 0):.8f}\n"
            f" SHORT β Stop: {r.get('stop_short', 0):.8f} Target: {r.get('target_short', 0):.8f}\n"
            f" R:R Ratio: 1 : {r.get('rr_ratio', 2):.1f}\n"
            f" Risk Fraction: {r.get('risk_fraction', 0):.4f}%\n"
            f" $ At Risk: ${r.get('dollar_at_risk', 0):.2f}\n"
            f" Position Size: ${r.get('position_notional', 0):.2f} notional\n"
            f" Leverage (est): {r.get('leverage_implied', 0):.2f}x\n"
            f" Consec. Losses: {r.get('consec_losses', 0)}\n"
            f" Sizing Halted: {'YES β' if r.get('sizing_halted') else 'no'}\n"
        )

    lines = [
        "β" * 64,
        f" BEST APPROVED SETUP: {sym} [{_DIR_LABEL.get(data['direction'], '?')}]",
        "β" * 64,
        f" Trend: {icon} {data['trend'].upper()}",
        f" ADX: {data['adx']:.1f} (DI+ {data['di_plus']:.1f} / DI- {data['di_minus']:.1f})",
        f" Vol State: {vol_state_str}",
        f" Dist from Mean: {data['dist_atr']:.2f} ATR",
        f" Regime Confidence:{data['regime_confidence']:.3f}",
        "",
        " ββ SCORES ββββββββββββββββββββββββββββββββββββββββββ",
        f" Regime: {format_score_bar(data['regime_score'])}",
        f" Volume: {format_score_bar(data['volume_score'])}",
        f" Structure: {format_score_bar(data['structure_score'])}",
        f" Confidence: {format_score_bar(data['confidence_score'])}",
        f" TOTAL: {format_score_bar(data['total_score'])}",
    ]
    # Empty sections are still appended, so they show up as a blank line.
    lines.append(ml_section)
    lines.append(risk_section)
    lines.append("β" * 64)
    return "\n".join(lines)
def parse_symbols(raw: str) -> List[str]:
    """Parse a free-form symbol list into normalized instrument identifiers.

    Tokens may be separated by commas and/or any whitespace.  Each token is
    upper-cased, and ``-USDT`` is appended when it has no ``-`` of its own.
    Duplicates are dropped (first occurrence wins) so the same instrument is
    not analyzed twice.

    Args:
        raw: User-supplied text; ``None`` or blank input selects the defaults.

    Returns:
        The parsed symbols in input order, or a copy of ``DEFAULT_SYMBOLS``
        when nothing was supplied.  A copy is returned so callers cannot
        mutate the shared default list.
    """
    if not raw:
        return list(DEFAULT_SYMBOLS)

    # ``split()`` without arguments already splits on newlines, tabs and
    # repeated blanks, and never yields empty tokens.
    tokens = (tok.upper() for tok in raw.replace(",", " ").split())
    normalized = (tok if "-" in tok else f"{tok}-USDT" for tok in tokens)

    # dict.fromkeys removes duplicates while keeping first-seen order.
    symbols = list(dict.fromkeys(normalized))
    return symbols or list(DEFAULT_SYMBOLS)
def run_analysis(
    symbols_input: str,
    equity: float,
    consec_losses: int,
    drawdown_pct: float,
    top_n: int,
    use_live: bool,
    use_ml: bool,
    progress=gr.Progress(track_tqdm=False),
) -> str:
    """Run the full scan and return the text report shown in the UI.

    Steps: resolve the symbol list (live instrument list or parsed user
    input), fetch OHLCV data, analyze every fetched symbol, rank the results,
    then render the summary line, the ranked table and the detail card of the
    best finally-approved setup.

    Args:
        symbols_input: Raw symbol text; only used when ``use_live`` is false.
        equity: Account equity forwarded to the per-symbol analysis.
        consec_losses: Consecutive-loss count; converted with ``int()``.
        drawdown_pct: Drawdown in percent; divided by 100 before forwarding.
        top_n: Number of ranked rows to render; converted with ``int()``.
        use_live: Fetch the live instrument list instead of parsing the text.
        use_ml: Enable the ML filter for this run.
        progress: Gradio progress tracker updated during fetching.

    Returns:
        The complete report as one newline-joined string.
    """
    started = time.time()
    ml_active = _TRADE_FILTER is not None and use_ml

    if ml_active:
        ml_status_str = "ACTIVE"
    elif not use_ml:
        ml_status_str = "DISABLED"
    else:
        ml_status_str = "NOT TRAINED (run train.py)"

    report = [
        "β" * 68,
        " OKX QUANTITATIVE ANALYSIS ENGINE v3",
        f" ML Filter: {ml_status_str}",
        "β" * 68,
    ]
    if ml_active:
        report.append(
            f" ML threshold: {_TRADE_FILTER.threshold:.4f} | Stats: {_TRADE_FILTER.stats()}"
        )

    # -- Resolve the symbol universe ------------------------------------------
    if use_live:
        report.append("β³ Fetching live OKX instrument list...")
        symbols = fetch_instruments("SPOT") or DEFAULT_SYMBOLS
        report.append(f"β {len(symbols)} live USDT instruments")
    else:
        symbols = parse_symbols(symbols_input)
        report.append(f"β {len(symbols)} symbol(s)")

    loss_count = int(consec_losses)
    report.append(
        f" Equity: ${equity:,.0f} | Losses: {loss_count}"
        f" | DD: {drawdown_pct:.1f}% | TF: {TIMEFRAME}"
    )
    report.append("")

    # -- Fetch ------------------------------------------------------------------
    requested = len(symbols)

    def report_fetch_progress(i, t, sym):
        progress(i / t, desc=f"Fetching {sym} ({i}/{t})")

    ohlcv_map = fetch_multiple(symbols, min_bars=50, progress_callback=report_fetch_progress)
    report.append(f"β Fetched {len(ohlcv_map)}/{requested}")
    report.append("")

    # -- Analyze ----------------------------------------------------------------
    drawdown_fraction = drawdown_pct / 100.0
    all_results: Dict[str, Any] = {}
    failed = []
    for sym, df in ohlcv_map.items():
        try:
            all_results[sym] = analyze_single(
                sym,
                df,
                account_equity=equity,
                consec_losses=loss_count,
                equity_drawdown_pct=drawdown_fraction,
                use_ml=use_ml,
            )
        except Exception as exc:
            # One failing symbol must not abort the whole scan.
            logger.error(f"{sym}: {exc}", exc_info=True)
            failed.append(sym)
    if failed:
        report.append(f"β Errors: {', '.join(failed)}")

    # -- Rank and summarize -----------------------------------------------------
    ranked = rank_tokens(all_results)

    rule_vetoed_n = 0
    ml_filtered_n = 0
    approved_n = 0
    for _, info in ranked:
        if info["rule_vetoed"]:
            rule_vetoed_n += 1
        elif not info["final_approved"]:
            ml_filtered_n += 1
        if info["final_approved"]:
            approved_n += 1

    report.append(
        f" {len(all_results)} analyzed | {approved_n} approved | "
        f"{rule_vetoed_n} rule-vetoed | {ml_filtered_n} ML-filtered"
    )
    report.append("")
    report.append(" RANKED SETUPS")
    report.append("β" * 105)
    report.append(build_ranked_table(ranked, int(top_n)))

    # The best setup is the highest-ranked entry that is finally approved.
    best_data = next((info for _, info in ranked if info["final_approved"]), None)
    if best_data is not None:
        report.extend(["", build_best_detail(best_data)])
    else:
        report.extend(
            [
                "",
                " β No fully approved setups.",
                " Possible causes: market regime unfavorable, ML model not trained,",
                " or all signals vetoed by rule engine.",
            ]
        )

    if ml_active:
        report.extend(["", f" ML session stats: {_TRADE_FILTER.stats()}"])

    report.extend(["", f" β Complete in {time.time() - started:.1f}s", "β" * 68])
    return "\n".join(report)
def build_app() -> gr.Blocks:
    """Assemble the Gradio Blocks UI and wire the run button to ``run_analysis``.

    Returns:
        The constructed, not yet launched, ``gr.Blocks`` application.
    """
    theme = gr.themes.Base(
        primary_hue="slate",
        neutral_hue="zinc",
        font=[gr.themes.GoogleFont("JetBrains Mono"), "monospace"],
    )
    stylesheet = """
    body, .gradio-container {
        background: #060a10 !important;
        font-family: 'JetBrains Mono', monospace !important;
        max-width: 1280px !important;
    }
    .gr-button-primary {
        background: linear-gradient(90deg, #1a6bff, #0044cc) !important;
        border: none !important;
        font-weight: 700 !important;
        letter-spacing: 0.06em !important;
    }
    #output_box textarea {
        font-family: 'JetBrains Mono', monospace !important;
        font-size: 12px !important;
        line-height: 1.55 !important;
        background: #0a0e18 !important;
        color: #b0c4de !important;
        border: 1px solid #182030 !important;
        min-height: 740px !important;
    }
    label { color: #4a6080 !important; font-size: 11px !important; text-transform: uppercase !important; letter-spacing: 0.09em !important; }
    h1, h2 { color: #c0d4f0 !important; font-family: 'JetBrains Mono', monospace !important; }
    p { color: #384858 !important; font-size: 12px !important; }
    .gr-panel { background: #0c1020 !important; border: 1px solid #182030 !important; }
    """
    model_loaded = _TRADE_FILTER is not None

    with gr.Blocks(title="OKX Quant Engine v3", theme=theme, css=stylesheet) as app:
        # -- Header ---------------------------------------------------------------
        gr.Markdown("# β OKX QUANT ENGINE v3")
        gr.Markdown(
            "ADX Β· absorption detection Β· volatility compression Β· "
            "fake breakout filter Β· **LightGBM probability layer** Β· adaptive risk"
        )

        # -- Inputs ---------------------------------------------------------------
        # NOTE(review): the column nesting below is an assumption — confirm
        # against the intended layout.
        with gr.Row():
            with gr.Column(scale=2):
                symbols_box = gr.Textbox(
                    label="Symbols (comma / newline β blank = defaults)",
                    placeholder="BTC-USDT, ETH-USDT, SOL-USDT ...",
                    lines=4,
                    value="",
                )
            with gr.Column(scale=1):
                equity_slider = gr.Slider(
                    label="Account Equity ($)",
                    minimum=100,
                    maximum=1_000_000,
                    step=500,
                    value=DEFAULT_ACCOUNT_EQUITY,
                )
                top_n_slider = gr.Slider(
                    label="Top N to Display",
                    minimum=5,
                    maximum=100,
                    step=5,
                    value=TOP_N_DEFAULT,
                )
            with gr.Column(scale=1):
                consec_loss = gr.Slider(
                    label="Consecutive Losses", minimum=0, maximum=10, step=1, value=0
                )
                drawdown = gr.Slider(
                    label="Drawdown from Peak (%)", minimum=0.0, maximum=30.0, step=0.5, value=0.0
                )
                live_check = gr.Checkbox(label="Fetch live OKX instruments (100+)", value=False)
                # The ML toggle defaults to on only when a trained model was loaded.
                ml_check = gr.Checkbox(
                    label=f"Enable ML Filter (model: {'LOADED' if _TRADE_FILTER else 'NOT TRAINED'})",
                    value=model_loaded,
                )

        # -- Action and output ----------------------------------------------------
        run_btn = gr.Button("βΆ RUN ANALYSIS", variant="primary", size="lg")
        output_box = gr.Textbox(
            label="Analysis Output",
            lines=50,
            max_lines=150,
            interactive=False,
            elem_id="output_box",
        )

        # Input order must match the positional parameters of run_analysis.
        analysis_inputs = [
            symbols_box,
            equity_slider,
            consec_loss,
            drawdown,
            top_n_slider,
            live_check,
            ml_check,
        ]
        run_btn.click(fn=run_analysis, inputs=analysis_inputs, outputs=output_box)

        gr.Markdown(
            "**Research use only. Not financial advice.** "
            "Train the ML filter: `python train.py --use-defaults` | "
            "Re-optimize threshold: `python threshold_optimizer.py`"
        )
    return app
if __name__ == "__main__":
    # Script entry point: read the CLI options, then build and serve the UI.
    import argparse

    cli = argparse.ArgumentParser()
    cli.add_argument("--port", type=int, default=7860)
    cli.add_argument("--share", action="store_true")
    options = cli.parse_args()

    application = build_app()
    application.launch(
        server_name="0.0.0.0",
        server_port=options.port,
        share=options.share,
        show_error=True,
    )