Vortex_QF / app.py
GoshawkVortexAI's picture
Update app.py
d7398a5 verified
Raw
History Blame
41.1 kB
# app.py - PART 1/5
# ========================================================================
# Imports and OKX REST API Client
# ========================================================================
import os
import numpy as np
import pandas as pd
import gradio as gr
import requests
import json
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')
# Machine Learning
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor, AdaBoostRegressor
from sklearn.linear_model import Ridge, Lasso, ElasticNet
from sklearn.svm import SVR
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
# Visualization
import plotly.graph_objects as go
from plotly.subplots import make_subplots
# ================================
# OKX REST API CLIENT
# ================================
class OKXClient:
    """Small client for the public OKX v5 market-data REST endpoints.

    Both public methods print a short diagnostic and return ``None`` when a
    request fails, so callers only need to check the result for ``None``.
    """

    def __init__(self):
        self.base_url = "https://www.okx.com"
        # A single session so the headers below accompany every request.
        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json',
            'User-Agent': 'Mozilla/5.0'
        })

    def _get_rows(self, path, params):
        """Send a GET request and return the ``data`` list of the reply.

        Args:
            path: Endpoint path appended to ``self.base_url``.
            params: Query parameters for the request.

        Returns:
            The payload's ``data`` list (possibly empty), or ``None`` after
            printing the reason when the HTTP status is not 200 or the
            payload's ``code`` field is not ``'0'``.
        """
        response = self.session.get(
            f"{self.base_url}{path}", params=params, timeout=10
        )
        if response.status_code != 200:
            print(f"HTTP Error: {response.status_code}")
            return None
        payload = response.json()
        if payload.get('code') != '0':
            print(f"API Error: {payload.get('msg')}")
            return None
        return payload.get('data') or []

    def get_candlesticks(self, instId='BTC-USDT', bar='1H', limit=300):
        """
        Get candlestick data from OKX

        Args:
            instId: Instrument ID (default: BTC-USDT)
            bar: Bar size (1m, 5m, 15m, 1H, 4H, 1D)
            limit: Number of candles (max 300)

        Returns:
            A DataFrame sorted by ascending ``timestamp`` with the columns
            ``timestamp, open, high, low, close, volume`` (prices and volume
            as floats), or ``None`` if the request failed or returned no
            candles.
        """
        numeric_cols = ['open', 'high', 'low', 'close', 'volume']
        try:
            rows = self._get_rows('/api/v5/market/candles', {
                'instId': instId,
                'bar': bar,
                'limit': str(limit)
            })
            if rows is None:
                return None
            if not rows:
                # An empty frame would only fail later in the callers.
                print("API Error: no candle data returned")
                return None
            df = pd.DataFrame(rows, columns=[
                'timestamp', 'open', 'high', 'low', 'close',
                'volume', 'volCcy', 'volCcyQuote', 'confirm'
            ])
            # The API delivers every field as a string; timestamps are in ms.
            df['timestamp'] = pd.to_datetime(df['timestamp'].astype(float), unit='ms')
            df = df.astype({col: float for col in numeric_cols})
            df = df.sort_values('timestamp').reset_index(drop=True)
            return df[['timestamp'] + numeric_cols]
        except Exception as e:
            print(f"Error fetching data: {str(e)}")
            return None

    def get_ticker(self, instId='BTC-USDT'):
        """Get current ticker data.

        Returns:
            A dict with float ``last``, ``bid``, ``ask``, ``volume_24h`` and
            a local ``timestamp`` taken when the reply was parsed, or
            ``None`` if the request failed or no ticker was returned.
        """
        try:
            rows = self._get_rows('/api/v5/market/ticker', {'instId': instId})
            if not rows:
                return None
            ticker = rows[0]
            return {
                'last': float(ticker['last']),
                'bid': float(ticker['bidPx']),
                'ask': float(ticker['askPx']),
                'volume_24h': float(ticker['vol24h']),
                'timestamp': datetime.now()
            }
        except Exception as e:
            print(f"Error fetching ticker: {str(e)}")
            return None
# app.py - PART 2/5
# ========================================================================
# Feature Engineering Module
# ========================================================================
class FeatureEngineer:
    """Derive model features from an OHLCV candle frame.

    Every method is static, works on a copy of its input and returns that
    copy with extra columns; the frame passed in is never modified.
    """

    @staticmethod
    def add_technical_indicators(df):
        """Add price, trend, momentum, volatility and volume indicators.

        Args:
            df: Frame with ``open``, ``high``, ``low``, ``close`` and
                ``volume`` columns.

        Returns:
            A copy of ``df`` with the indicator columns appended. Rows at
            the start contain NaN wherever a rolling window is not yet
            full; ratio columns may contain inf/NaN when a denominator is 0.
        """
        df = df.copy()

        # Basic features
        df['returns'] = df['close'].pct_change()
        df['log_returns'] = np.log(df['close'] / df['close'].shift(1))
        df['price_range'] = df['high'] - df['low']
        df['price_change'] = df['close'] - df['open']
        df['body'] = abs(df['close'] - df['open'])
        df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
        df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']

        # Moving Averages
        for window in [5, 10, 20, 50, 100]:
            df[f'sma_{window}'] = df['close'].rolling(window=window).mean()
            df[f'ema_{window}'] = df['close'].ewm(span=window, adjust=False).mean()
            df[f'price_to_sma_{window}'] = df['close'] / df[f'sma_{window}']

        # MACD (12/26 EMA difference with a 9-period signal line)
        exp1 = df['close'].ewm(span=12, adjust=False).mean()
        exp2 = df['close'].ewm(span=26, adjust=False).mean()
        df['macd'] = exp1 - exp2
        df['macd_signal'] = df['macd'].ewm(span=9, adjust=False).mean()
        df['macd_diff'] = df['macd'] - df['macd_signal']

        # RSI (simple rolling means of gains and losses)
        delta = df['close'].diff()
        for period in [14, 28]:
            gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
            loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
            rs = gain / loss
            df[f'rsi_{period}'] = 100 - (100 / (1 + rs))

        # Bollinger Bands (mean +/- 2 standard deviations)
        for window in [20, 50]:
            rolling_mean = df['close'].rolling(window=window).mean()
            rolling_std = df['close'].rolling(window=window).std()
            df[f'bb_upper_{window}'] = rolling_mean + (rolling_std * 2)
            df[f'bb_lower_{window}'] = rolling_mean - (rolling_std * 2)
            df[f'bb_width_{window}'] = df[f'bb_upper_{window}'] - df[f'bb_lower_{window}']
            df[f'bb_position_{window}'] = (df['close'] - df[f'bb_lower_{window}']) / df[f'bb_width_{window}']

        # ATR: rolling mean of the true range
        prev_close = df['close'].shift()
        ranges = pd.concat([
            df['high'] - df['low'],
            np.abs(df['high'] - prev_close),
            np.abs(df['low'] - prev_close),
        ], axis=1)
        true_range = ranges.max(axis=1)
        df['atr_14'] = true_range.rolling(14).mean()

        # Stochastic Oscillator
        low_14 = df['low'].rolling(window=14).min()
        high_14 = df['high'].rolling(window=14).max()
        df['stoch_k'] = 100 * ((df['close'] - low_14) / (high_14 - low_14))
        df['stoch_d'] = df['stoch_k'].rolling(window=3).mean()

        # Volume features
        df['volume_sma_20'] = df['volume'].rolling(window=20).mean()
        df['volume_ratio'] = df['volume'] / df['volume_sma_20']
        df['volume_price_trend'] = df['volume'] * df['returns']

        # OBV: cumulative volume signed by the direction of the close change
        df['obv'] = (np.sign(df['close'].diff()) * df['volume']).fillna(0).cumsum()

        # Momentum
        for period in [5, 10, 20]:
            df[f'momentum_{period}'] = df['close'].diff(period)
            df[f'roc_{period}'] = df['close'].pct_change(period)

        # Volatility
        for window in [5, 10, 20, 30]:
            df[f'volatility_{window}'] = df['returns'].rolling(window=window).std()

        # Statistical features
        for window in [10, 20]:
            df[f'skew_{window}'] = df['returns'].rolling(window=window).skew()
            df[f'kurt_{window}'] = df['returns'].rolling(window=window).kurt()

        return df

    @staticmethod
    def add_lag_features(df, n_lags=5):
        """Add ``close``, ``volume`` and ``returns`` shifted by 1..n_lags rows.

        Args:
            df: Frame with ``close`` and ``volume`` columns. A ``returns``
                column is used when present; otherwise returns are derived
                from ``close`` so the method also works on raw candles.
            n_lags: Number of lags to generate.

        Returns:
            A copy of ``df`` with ``*_lag_<k>`` columns appended.
        """
        df = df.copy()
        if 'returns' in df.columns:
            returns = df['returns']
        else:
            returns = df['close'].pct_change()
        for lag in range(1, n_lags + 1):
            df[f'close_lag_{lag}'] = df['close'].shift(lag)
            df[f'volume_lag_{lag}'] = df['volume'].shift(lag)
            df[f'returns_lag_{lag}'] = returns.shift(lag)
        return df

    @staticmethod
    def add_time_features(df):
        """Add calendar features taken from the ``timestamp`` column.

        Args:
            df: Frame whose ``timestamp`` column is datetime-typed.

        Returns:
            A copy of ``df`` with hour, weekday, day-of-month and month
            columns plus sine/cosine encodings of hour and weekday.
        """
        df = df.copy()
        stamps = df['timestamp'].dt
        df['hour'] = stamps.hour
        df['day_of_week'] = stamps.dayofweek
        df['day_of_month'] = stamps.day
        df['month'] = stamps.month

        # Cyclical encoding so that 23h sits next to 0h and Sunday next to Monday
        df['hour_sin'] = np.sin(2 * np.pi * df['hour'] / 24)
        df['hour_cos'] = np.cos(2 * np.pi * df['hour'] / 24)
        df['day_sin'] = np.sin(2 * np.pi * df['day_of_week'] / 7)
        df['day_cos'] = np.cos(2 * np.pi * df['day_of_week'] / 7)
        return df
# app.py - PART 3/5
# ========================================================================
# Ensemble Model
# ========================================================================
class EnsemblePredictor:
    """Weighted ensemble of six scikit-learn regressors.

    The tree-based models are fed StandardScaler output and the linear
    models RobustScaler output. After training, each model's weight is
    proportional to the inverse of its validation MSE.
    """

    # (model key, label printed while training, scaler key) in training order.
    _MODEL_PLAN = (
        ('random_forest', 'Random Forest', 'standard'),
        ('gradient_boosting', 'Gradient Boosting', 'standard'),
        ('adaboost', 'AdaBoost', 'standard'),
        ('ridge', 'Ridge', 'robust'),
        ('lasso', 'Lasso', 'robust'),
        ('elastic_net', 'Elastic Net', 'robust'),
    )

    def __init__(self):
        self.models = {}
        self.weights = {}
        self.scalers = {}
        self.feature_columns = None
        self.is_trained = False

    def initialize_models(self):
        """Create fresh, unfitted models and give them equal weights."""
        self.models['random_forest'] = RandomForestRegressor(
            n_estimators=200,
            max_depth=15,
            min_samples_split=5,
            random_state=42,
            n_jobs=-1
        )
        self.models['gradient_boosting'] = GradientBoostingRegressor(
            n_estimators=200,
            learning_rate=0.05,
            max_depth=5,
            random_state=42
        )
        self.models['adaboost'] = AdaBoostRegressor(
            n_estimators=100,
            learning_rate=0.1,
            random_state=42
        )
        self.models['ridge'] = Ridge(alpha=1.0)
        self.models['lasso'] = Lasso(alpha=0.1, max_iter=2000)
        self.models['elastic_net'] = ElasticNet(alpha=0.1, l1_ratio=0.5, max_iter=2000)
        for model_name in self.models.keys():
            self.weights[model_name] = 1.0 / len(self.models)

    def prepare_data(self, df, target_col='close'):
        """Split a feature frame into a feature matrix and a target vector.

        Every column except ``timestamp`` and ``target_col`` is a feature.
        Infinite values are treated as missing; gaps are forward-filled,
        then back-filled, and anything still missing becomes 0.

        Args:
            df: Frame holding the features and the target column.
            target_col: Name of the column to predict.

        Returns:
            ``(X, y)`` as NumPy arrays. ``self.feature_columns`` is set to
            the feature names in column order.
        """
        exclude_cols = ['timestamp', target_col]
        feature_cols = [col for col in df.columns if col not in exclude_cols]
        # DataFrame.ffill/bfill replace fillna(method=...), which pandas has
        # deprecated and newer releases reject.
        cleaned = df[feature_cols + [target_col]].replace([np.inf, -np.inf], np.nan)
        cleaned = cleaned.ffill().bfill().fillna(0)
        X = cleaned[feature_cols].values
        y = cleaned[target_col].values
        self.feature_columns = feature_cols
        return X, y

    def _scaled_inputs(self, X, fit=False):
        """Return ``{scaler key: scaled X}``; fit the scalers first if asked."""
        if fit:
            return {key: scaler.fit_transform(X) for key, scaler in self.scalers.items()}
        return {key: scaler.transform(X) for key, scaler in self.scalers.items()}

    def train(self, X_train, y_train, X_val, y_val):
        """Fit the scalers and all models, then derive the ensemble weights.

        Args:
            X_train, y_train: Training features and targets.
            X_val, y_val: Validation data used only to score the fitted
                models for weighting.

        Returns:
            Dict mapping each model name to its validation predictions.
        """
        self.initialize_models()
        self.scalers = {
            'standard': StandardScaler(),
            'robust': RobustScaler(),
        }
        train_inputs = self._scaled_inputs(X_train, fit=True)
        val_inputs = self._scaled_inputs(X_val)

        predictions_val = {}
        for model_name, label, scaler_key in self._MODEL_PLAN:
            print(f"Training {label}...")
            model = self.models[model_name]
            model.fit(train_inputs[scaler_key], y_train)
            predictions_val[model_name] = model.predict(val_inputs[scaler_key])

        self.optimize_weights(predictions_val, y_val)
        self.is_trained = True
        return predictions_val

    def optimize_weights(self, predictions_val, y_val):
        """Set each model's weight to its normalised inverse validation MSE."""
        performances = {}
        for model_name, preds in predictions_val.items():
            mse = mean_squared_error(y_val, preds)
            # The small constant keeps a perfect fit from dividing by zero.
            performances[model_name] = 1.0 / (mse + 1e-10)
        total_performance = sum(performances.values())
        for model_name in performances:
            self.weights[model_name] = performances[model_name] / total_performance
        print("\n=== Optimized Weights ===")
        for model_name, weight in self.weights.items():
            print(f"{model_name}: {weight:.4f}")

    def predict(self, X):
        """Predict with every model and combine the results by weight.

        Args:
            X: Feature matrix with the columns used for training.

        Returns:
            ``(ensemble_pred, predictions)`` where ``predictions`` maps each
            model name to its own prediction array.

        Raises:
            ValueError: If ``train`` has not completed yet.
        """
        if not self.is_trained:
            raise ValueError("Model must be trained first")
        inputs = self._scaled_inputs(X)
        predictions = {
            model_name: self.models[model_name].predict(inputs[scaler_key])
            for model_name, _, scaler_key in self._MODEL_PLAN
        }
        ensemble_pred = np.zeros(len(X))
        for model_name, preds in predictions.items():
            ensemble_pred += self.weights[model_name] * preds
        return ensemble_pred, predictions

    def evaluate(self, X_test, y_test):
        """Score the ensemble and each member model on held-out data.

        Returns:
            ``(metrics, ensemble_pred)``. ``metrics['ensemble']`` holds MSE,
            RMSE, MAE, R2 and MAPE; every other key is a model name mapped
            to MSE, RMSE, MAE and R2.
        """
        ensemble_pred, individual_preds = self.predict(X_test)
        mse = mean_squared_error(y_test, ensemble_pred)
        mae = mean_absolute_error(y_test, ensemble_pred)
        rmse = np.sqrt(mse)
        r2 = r2_score(y_test, ensemble_pred)

        # Percentage error is undefined where the target is 0; skip those rows.
        actual = np.asarray(y_test, dtype=float)
        nonzero = actual != 0
        if nonzero.any():
            mape = np.mean(np.abs(
                (actual[nonzero] - ensemble_pred[nonzero]) / actual[nonzero]
            )) * 100
        else:
            mape = float('nan')

        metrics = {
            'ensemble': {
                'MSE': mse,
                'RMSE': rmse,
                'MAE': mae,
                'R2': r2,
                'MAPE': mape
            }
        }
        for model_name, preds in individual_preds.items():
            mse_ind = mean_squared_error(y_test, preds)
            metrics[model_name] = {
                'MSE': mse_ind,
                'RMSE': np.sqrt(mse_ind),
                'MAE': mean_absolute_error(y_test, preds),
                'R2': r2_score(y_test, preds)
            }
        return metrics, ensemble_pred
# app.py - PART 4/5
# ========================================================================
# Visualization and Main Pipeline
# ========================================================================
class Visualizer:
    """Static helpers that build the Plotly figures shown in the UI."""

    @staticmethod
    def plot_predictions(y_true, y_pred, timestamps=None, title="BTC/USDT Predictions"):
        """Overlay the actual and the predicted series on one line chart.

        When ``timestamps`` is omitted the x axis is the sample index.
        """
        x_axis = list(range(len(y_true))) if timestamps is None else timestamps

        actual_trace = go.Scatter(
            x=x_axis,
            y=y_true,
            mode='lines',
            name='Actual',
            line=dict(color='cyan', width=2)
        )
        predicted_trace = go.Scatter(
            x=x_axis,
            y=y_pred,
            mode='lines',
            name='Predicted',
            line=dict(color='magenta', width=2, dash='dash')
        )

        fig = go.Figure()
        fig.add_trace(actual_trace)
        fig.add_trace(predicted_trace)
        fig.update_layout(
            title=title,
            xaxis_title='Time',
            yaxis_title='Price (USDT)',
            template='plotly_dark',
            hovermode='x unified',
            height=500
        )
        return fig

    @staticmethod
    def plot_candlestick(df, n_candles=100):
        """Draw the last ``n_candles`` candles above a volume bar chart."""
        recent = df.tail(n_candles).copy()
        fig = make_subplots(
            rows=2, cols=1,
            shared_xaxes=True,
            vertical_spacing=0.05,
            subplot_titles=('Price', 'Volume'),
            row_heights=[0.7, 0.3]
        )

        candles = go.Candlestick(
            x=recent['timestamp'],
            open=recent['open'],
            high=recent['high'],
            low=recent['low'],
            close=recent['close'],
            name='OHLC'
        )
        fig.add_trace(candles, row=1, col=1)

        # Red bar when the candle closed below its open, green otherwise.
        bar_colors = [
            'red' if close_px < open_px else 'green'
            for close_px, open_px in zip(recent['close'], recent['open'])
        ]
        volume_bars = go.Bar(
            x=recent['timestamp'],
            y=recent['volume'],
            name='Volume',
            marker_color=bar_colors
        )
        fig.add_trace(volume_bars, row=2, col=1)

        fig.update_layout(
            title='BTC/USDT Chart',
            template='plotly_dark',
            xaxis_rangeslider_visible=False,
            height=700
        )
        return fig

    @staticmethod
    def plot_feature_importance(model, feature_names, top_n=20):
        """Horizontal bar chart of the ``top_n`` largest feature importances.

        Returns ``None`` for models without ``feature_importances_``.
        """
        if not hasattr(model, 'feature_importances_'):
            return None

        scores = model.feature_importances_
        # argsort is ascending, so the tail holds the most important features.
        top = np.argsort(scores)[-top_n:]
        bars = go.Bar(
            x=scores[top],
            y=[feature_names[i] for i in top],
            orientation='h',
            marker_color='lightblue'
        )
        fig = go.Figure(bars)
        fig.update_layout(
            title=f'Top {top_n} Feature Importances',
            xaxis_title='Importance',
            yaxis_title='Features',
            template='plotly_dark',
            height=600
        )
        return fig
# ================================
# MAIN PIPELINE
# ================================
class BTCPredictionPipeline:
    """Fetch candles, build features, train the ensemble and forecast.

    ``raw_data`` holds the most recently fetched candles and
    ``processed_data`` the feature frame built from them by
    ``prepare_features``.
    """

    def __init__(self):
        self.okx_client = OKXClient()
        self.feature_engineer = FeatureEngineer()
        self.ensemble_model = EnsemblePredictor()
        self.visualizer = Visualizer()
        self.raw_data = None
        self.processed_data = None

    def fetch_data(self, bar='1H', limit=300):
        """Download BTC-USDT candles and keep them as ``raw_data``.

        Returns:
            The candle DataFrame, or ``None`` when the download failed or
            returned no rows (``raw_data`` is left untouched in that case).
        """
        print(f"Fetching {limit} candles from OKX...")
        df = self.okx_client.get_candlesticks(instId='BTC-USDT', bar=bar, limit=limit)
        if df is None or len(df) == 0:
            print("Failed to fetch data")
            return None
        self.raw_data = df
        print(f"Fetched {len(df)} candles")
        return df

    def prepare_features(self):
        """Build the feature frame from ``raw_data``.

        Rows with any missing feature (mostly the warm-up period of the
        rolling windows) are dropped.

        Returns:
            The feature DataFrame, also stored as ``processed_data``.

        Raises:
            ValueError: If no data has been fetched, or if no complete row
                remains after dropping rows with missing values.
        """
        if self.raw_data is None:
            raise ValueError("No data available")
        print("Engineering features...")
        df = self.feature_engineer.add_technical_indicators(self.raw_data)
        df = self.feature_engineer.add_lag_features(df, n_lags=5)
        df = self.feature_engineer.add_time_features(df)
        df = df.dropna()
        if len(df) == 0:
            # Fail here with a clear message instead of deep inside sklearn.
            raise ValueError(
                "No complete samples left after feature engineering; "
                "fetch more candles"
            )
        self.processed_data = df
        print(f"Features: {len(df.columns)}, Samples: {len(df)}")
        return df

    def train_model(self, test_size=0.2, val_size=0.1):
        """Train and evaluate the ensemble on a chronological split.

        The data is split without shuffling into train, validation and test
        parts, in that order, so the test set is always the newest rows.

        Args:
            test_size: Fraction of all samples used for testing.
            val_size: Fraction of all samples used for validation.

        Returns:
            ``(metrics, predictions, y_test)`` for the test set.

        Raises:
            ValueError: If features are not prepared or the two fractions
                leave no room for training data.
        """
        if self.processed_data is None:
            raise ValueError("Features not prepared")
        if not (0 < test_size < 1 and 0 < val_size and test_size + val_size < 1):
            raise ValueError("test_size and val_size must be positive and sum to less than 1")

        X, y = self.ensemble_model.prepare_data(self.processed_data)
        X_temp, X_test, y_temp, y_test = train_test_split(
            X, y, test_size=test_size, shuffle=False
        )
        # val_size is a share of the whole set; rescale it to the remainder.
        X_train, X_val, y_train, y_val = train_test_split(
            X_temp, y_temp, test_size=val_size / (1 - test_size), shuffle=False
        )
        print(f"\nTrain: {len(X_train)}, Val: {len(X_val)}, Test: {len(X_test)}")

        print("\nTraining ensemble...")
        self.ensemble_model.train(X_train, y_train, X_val, y_val)

        print("\nEvaluating...")
        metrics, predictions = self.ensemble_model.evaluate(X_test, y_test)
        print("\n=== Ensemble Performance ===")
        for metric_name, value in metrics['ensemble'].items():
            print(f"{metric_name}: {value:.4f}")
        return metrics, predictions, y_test

    def predict_future(self, n_steps=24):
        """Produce ``n_steps`` forecast points after the last known candle.

        NOTE: the model is evaluated once, on the last feature row. Every
        returned value is that single prediction multiplied by independent
        Gaussian noise (sigma 0.5%), so the series is not a rolled-forward
        forecast and differs between calls. Timestamps are spaced one hour
        apart regardless of the bar size of the fetched data.

        Returns:
            ``(future_times, predictions)`` as two lists of length
            ``n_steps``.

        Raises:
            ValueError: If the model is not trained or no feature rows are
                available.
        """
        if not self.ensemble_model.is_trained:
            raise ValueError("Model not trained")
        if self.processed_data is None or len(self.processed_data) == 0:
            raise ValueError("Features not prepared")

        last_data = self.processed_data.iloc[-1:].copy()
        X_last, _ = self.ensemble_model.prepare_data(last_data)
        pred, _ = self.ensemble_model.predict(X_last)

        last_time = self.processed_data['timestamp'].iloc[-1]
        future_times = [last_time + timedelta(hours=i + 1) for i in range(n_steps)]
        predictions = [pred[0] * (1 + np.random.normal(0, 0.005)) for _ in range(n_steps)]
        return future_times, predictions
# app.py - PART 5/5
# ========================================================================
# Gradio Interface
# ========================================================================
# Global pipeline instance
# Shared by every UI callback below; it holds the fetched candles, the
# engineered features and the trained ensemble between button clicks.
pipeline = BTCPredictionPipeline()
# Set to True by train_model_ui once a training run succeeds; the prediction
# and feature-importance callbacks check it before doing any work.
training_complete = False
def fetch_data_ui(bar_size, num_candles):
    """UI callback: download candles and summarise them.

    Returns a ``(info text, candlestick figure, last-10-rows table)`` tuple;
    on failure the text carries the error and the other two are ``None``.
    """
    table_columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
    try:
        candles = pipeline.fetch_data(bar=bar_size, limit=int(num_candles))
        if candles is None:
            return "โŒ Failed to fetch data", None, None

        stamps = candles['timestamp']
        closes = candles['close']
        info = "".join([
            f"โœ… Successfully fetched {len(candles)} candles\n\n",
            f"Time range: {stamps.min()} to {stamps.max()}\n",
            f"Price range: ${closes.min():.2f} - ${closes.max():.2f}\n",
            f"Current price: ${closes.iloc[-1]:.2f}",
        ])

        fig = pipeline.visualizer.plot_candlestick(candles)

        # Copy before formatting so the pipeline's frame keeps real datetimes.
        latest = candles.tail(10)[table_columns].copy()
        latest['timestamp'] = latest['timestamp'].dt.strftime('%Y-%m-%d %H:%M')
        return info, fig, latest
    except Exception as e:
        return f"โŒ Error: {e!s}", None, None
def train_model_ui(test_size, val_size):
    """UI callback: build features, train the ensemble and report metrics.

    Returns ``(metrics text, test-set figure, status text)``. Sets the
    module-level ``training_complete`` flag once training has succeeded.
    """
    global training_complete
    try:
        pipeline.prepare_features()
        metrics, predictions, y_test = pipeline.train_model(
            test_size=test_size,
            val_size=val_size
        )
        training_complete = True

        # Assemble the report piece by piece, then join once.
        report = ["=== ENSEMBLE MODEL PERFORMANCE ===\n\n"]
        report.extend(
            f"{name}: {score:.4f}\n" for name, score in metrics['ensemble'].items()
        )
        report.append("\n\n=== INDIVIDUAL MODELS ===\n\n")
        for model_name, model_metrics in metrics.items():
            if model_name == 'ensemble':
                continue
            report.append(f"\n{model_name.upper()}:\n")
            report.extend(
                f" {name}: {score:.4f}\n" for name, score in model_metrics.items()
            )
        metrics_text = "".join(report)

        # The split is chronological, so the test rows are the newest ones.
        features = pipeline.processed_data
        first_test_row = len(features) - len(y_test)
        test_timestamps = features['timestamp'].iloc[first_test_row:].values

        fig = pipeline.visualizer.plot_predictions(
            y_test,
            predictions,
            test_timestamps,
            "Test Set Predictions"
        )
        return metrics_text, fig, "โœ… Training complete!"
    except Exception as e:
        return f"โŒ Error: {e!s}", None, "Training failed"
def predict_future_ui(n_hours):
    """UI callback: forecast the next ``n_hours`` points.

    Returns ``(prediction table, figure, status text)`` in the order of the
    Gradio outputs it is wired to. Whenever no forecast is produced the
    first two items are ``None`` and the message goes to the status slot.
    """
    if not training_complete:
        # The message belongs in the status slot (third output); putting it
        # first would hand a string to the Dataframe component.
        return None, None, "โš ๏ธ Please train model first"
    try:
        future_times, predictions = pipeline.predict_future(n_steps=int(n_hours))

        pred_df = pd.DataFrame({
            'Timestamp': [t.strftime('%Y-%m-%d %H:%M') for t in future_times],
            'Predicted Price (USDT)': [f"${p:,.2f}" for p in predictions]
        })

        fig = go.Figure()
        fig.add_trace(go.Scatter(
            x=future_times,
            y=predictions,
            mode='lines+markers',
            name='Predicted Price',
            line=dict(color='green', width=3),
            marker=dict(size=8)
        ))
        fig.update_layout(
            title=f'BTC/USDT Price Prediction - Next {n_hours} Hours',
            xaxis_title='Time',
            yaxis_title='Price (USDT)',
            template='plotly_dark',
            hovermode='x unified',
            height=500
        )
        return pred_df, fig, f"โœ… Predicted next {n_hours} hours"
    except Exception as e:
        return None, None, f"โŒ Error: {str(e)}"
def get_current_price_ui():
    """UI callback: return a text block with the live BTC-USDT ticker."""
    try:
        ticker = pipeline.okx_client.get_ticker('BTC-USDT')
        if not ticker:
            return "โŒ Failed to fetch current price"

        updated = ticker['timestamp'].strftime('%Y-%m-%d %H:%M:%S')
        lines = [
            "๐Ÿ”ด LIVE BTC/USDT PRICE\n",
            f"Last Price: ${ticker['last']:,.2f}",
            f"Bid: ${ticker['bid']:,.2f}",
            f"Ask: ${ticker['ask']:,.2f}",
            f"24h Volume: {ticker['volume_24h']:,.2f} BTC",
            f"Updated: {updated}",
        ]
        return "\n".join(lines)
    except Exception as e:
        return f"โŒ Error: {e!s}"
def show_feature_importance_ui():
    """UI callback: chart and list the random forest's top 30 features.

    Returns ``(figure, text)``; the figure is ``None`` when the model has
    not been trained yet or an error occurs.
    """
    if not training_complete:
        return None, "โš ๏ธ Please train model first"
    try:
        forest = pipeline.ensemble_model.models['random_forest']
        names = pipeline.ensemble_model.feature_columns

        fig = pipeline.visualizer.plot_feature_importance(
            forest,
            names,
            top_n=30
        )

        scores = forest.feature_importances_
        # argsort is ascending: take the last 30 and flip to list best first.
        ranked = np.argsort(scores)[-30:][::-1]
        listing = [
            f"{rank}. {names[idx]}: {scores[idx]:.6f}\n"
            for rank, idx in enumerate(ranked, 1)
        ]
        importance_text = "=== TOP 30 FEATURES ===\n\n" + "".join(listing)
        return fig, importance_text
    except Exception as e:
        return None, f"โŒ Error: {e!s}"
def analyze_market_ui():
    """UI callback: build the technical-analysis dashboard and a summary.

    Uses the last 200 rows of the pipeline's feature frame. If candles have
    been fetched but features were never built (they are otherwise only
    built during training), the features are built here first.

    Returns:
        ``(figure, summary text)``; the figure is ``None`` when there is
        nothing to analyse or an error occurs.
    """
    if pipeline.processed_data is None and pipeline.raw_data is None:
        return None, "โš ๏ธ Please load data first"
    try:
        if pipeline.processed_data is None:
            pipeline.prepare_features()
        df = pipeline.processed_data.tail(200)
        if len(df) == 0:
            return None, "โš ๏ธ Not enough data to analyze; fetch more candles"

        fig = make_subplots(
            rows=4, cols=1,
            shared_xaxes=True,
            vertical_spacing=0.05,
            subplot_titles=('Price & MA', 'RSI', 'MACD', 'Volume'),
            row_heights=[0.4, 0.2, 0.2, 0.2]
        )

        # Price and Moving Averages
        fig.add_trace(
            go.Scatter(x=df['timestamp'], y=df['close'],
                       name='Close', line=dict(color='white', width=2)),
            row=1, col=1
        )
        fig.add_trace(
            go.Scatter(x=df['timestamp'], y=df['sma_20'],
                       name='SMA 20', line=dict(color='orange', width=1)),
            row=1, col=1
        )
        fig.add_trace(
            go.Scatter(x=df['timestamp'], y=df['sma_50'],
                       name='SMA 50', line=dict(color='blue', width=1)),
            row=1, col=1
        )

        # RSI with the 70/30 overbought/oversold guides
        fig.add_trace(
            go.Scatter(x=df['timestamp'], y=df['rsi_14'],
                       name='RSI', line=dict(color='purple', width=2)),
            row=2, col=1
        )
        fig.add_hline(y=70, line_dash="dash", line_color="red", row=2, col=1)
        fig.add_hline(y=30, line_dash="dash", line_color="green", row=2, col=1)

        # MACD
        fig.add_trace(
            go.Scatter(x=df['timestamp'], y=df['macd'],
                       name='MACD', line=dict(color='blue', width=1)),
            row=3, col=1
        )
        fig.add_trace(
            go.Scatter(x=df['timestamp'], y=df['macd_signal'],
                       name='Signal', line=dict(color='red', width=1)),
            row=3, col=1
        )
        fig.add_trace(
            go.Bar(x=df['timestamp'], y=df['macd_diff'],
                   name='Histogram', marker_color='gray'),
            row=3, col=1
        )

        # Volume: red when the candle closed below its open, green otherwise
        colors = ['red' if close_px < open_px else 'green'
                  for close_px, open_px in zip(df['close'], df['open'])]
        fig.add_trace(
            go.Bar(x=df['timestamp'], y=df['volume'],
                   name='Volume', marker_color=colors),
            row=4, col=1
        )

        fig.update_layout(
            title='Market Technical Analysis',
            template='plotly_dark',
            height=900,
            showlegend=True,
            hovermode='x unified'
        )

        # Market summary
        current_price = df['close'].iloc[-1]
        rsi = df['rsi_14'].iloc[-1]
        macd_signal = "Bullish" if df['macd_diff'].iloc[-1] > 0 else "Bearish"

        summary = "=== MARKET ANALYSIS ===\n\n"
        summary += f"Current Price: ${current_price:,.2f}\n"
        summary += f"RSI (14): {rsi:.2f} - "
        if rsi > 70:
            summary += "Overbought โš ๏ธ\n"
        elif rsi < 30:
            summary += "Oversold โš ๏ธ\n"
        else:
            summary += "Neutral โœ…\n"
        summary += f"MACD Signal: {macd_signal}\n"
        summary += f"SMA 20: ${df['sma_20'].iloc[-1]:,.2f}\n"
        summary += f"SMA 50: ${df['sma_50'].iloc[-1]:,.2f}\n"
        # 24 bars back from the last row is position -25 (-24 is only 23 bars
        # back). This equals 24 hours only for 1H candles.
        if len(df) >= 25:
            change = (current_price / df['close'].iloc[-25] - 1) * 100
            summary += f"24h Change: {change:.2f}%\n"
        else:
            summary += "24h Change: n/a\n"
        summary += f"Volatility (20): {df['volatility_20'].iloc[-1]:.6f}\n"
        return fig, summary
    except Exception as e:
        return None, f"โŒ Error: {str(e)}"
# ================================
# GRADIO APP
# ================================
# Declarative Gradio layout: one tab per workflow step, each wiring a button
# to one of the *_ui callbacks defined above.
# NOTE(review): the source had lost its indentation, so which components sit
# inside each gr.Row() is reconstructed here - confirm against the live app.
with gr.Blocks(title="OKX BTC/USDT Ensemble Predictor", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # ๐Ÿš€ OKX BTC/USDT Ensemble Price Predictor
    **Advanced Machine Learning System for Bitcoin Price Prediction**
    This application uses an ensemble of 6 machine learning models:
    - Random Forest
    - Gradient Boosting
    - AdaBoost
    - Ridge Regression
    - Lasso Regression
    - Elastic Net
    **Features:**
    - Real-time data from OKX API
    - 100+ technical indicators
    - Weighted ensemble predictions
    - Advanced visualization
    """)
    # TAB 1: DATA FETCHING
    with gr.Tab("๐Ÿ“Š Data Fetching"):
        gr.Markdown("### Fetch Historical BTC/USDT Data from OKX")
        with gr.Row():
            bar_size = gr.Dropdown(
                choices=['1m', '5m', '15m', '30m', '1H', '2H', '4H', '1D'],
                value='1H',
                label="Timeframe"
            )
            num_candles = gr.Slider(
                minimum=100,
                maximum=300,
                value=300,
                step=10,
                label="Number of Candles"
            )
        fetch_btn = gr.Button("๐Ÿ”„ Fetch Data", variant="primary", size="lg")
        with gr.Row():
            data_info = gr.Textbox(label="Data Info", lines=6)
        data_chart = gr.Plot(label="Price Chart")
        data_table = gr.Dataframe(label="Latest Data (Last 10 Candles)")
        # Output order must match the tuple returned by fetch_data_ui.
        fetch_btn.click(
            fn=fetch_data_ui,
            inputs=[bar_size, num_candles],
            outputs=[data_info, data_chart, data_table]
        )
    # TAB 2: MODEL TRAINING
    with gr.Tab("๐Ÿค– Model Training"):
        gr.Markdown("### Train Ensemble Model")
        with gr.Row():
            test_size_slider = gr.Slider(
                minimum=0.1,
                maximum=0.3,
                value=0.2,
                step=0.05,
                label="Test Set Size"
            )
            val_size_slider = gr.Slider(
                minimum=0.05,
                maximum=0.2,
                value=0.1,
                step=0.05,
                label="Validation Set Size"
            )
        train_btn = gr.Button("๐Ÿš€ Train Model", variant="primary", size="lg")
        train_status = gr.Textbox(label="Training Status", lines=1)
        train_metrics = gr.Textbox(label="Model Performance Metrics", lines=20)
        train_plot = gr.Plot(label="Predictions vs Actual")
        # train_model_ui returns (metrics text, figure, status) in this order.
        train_btn.click(
            fn=train_model_ui,
            inputs=[test_size_slider, val_size_slider],
            outputs=[train_metrics, train_plot, train_status]
        )
    # TAB 3: PREDICTIONS
    with gr.Tab("๐Ÿ”ฎ Future Predictions"):
        gr.Markdown("### Predict Future BTC/USDT Prices")
        n_hours_slider = gr.Slider(
            minimum=1,
            maximum=72,
            value=24,
            step=1,
            label="Prediction Horizon (Hours)"
        )
        predict_btn = gr.Button("๐Ÿ”ฎ Predict Future", variant="primary", size="lg")
        predict_status = gr.Textbox(label="Prediction Status", lines=1)
        predict_table = gr.Dataframe(label="Predicted Prices")
        predict_plot = gr.Plot(label="Future Price Prediction")
        # predict_future_ui must return (table, figure, status) in this order.
        predict_btn.click(
            fn=predict_future_ui,
            inputs=[n_hours_slider],
            outputs=[predict_table, predict_plot, predict_status]
        )
    # TAB 4: LIVE PRICE
    with gr.Tab("๐Ÿ’ฐ Live Price"):
        gr.Markdown("### Real-time BTC/USDT Price from OKX")
        refresh_btn = gr.Button("๐Ÿ”„ Refresh Price", variant="primary", size="lg")
        live_price_info = gr.Textbox(label="Current Market Data", lines=8)
        refresh_btn.click(
            fn=get_current_price_ui,
            inputs=[],
            outputs=[live_price_info]
        )
    # TAB 5: FEATURE IMPORTANCE
    with gr.Tab("๐Ÿ“ˆ Feature Importance"):
        gr.Markdown("### Top Features Contributing to Predictions")
        feature_btn = gr.Button("๐Ÿ“Š Show Feature Importance", variant="primary", size="lg")
        feature_plot = gr.Plot(label="Feature Importance Chart")
        feature_text = gr.Textbox(label="Top 30 Features", lines=35)
        feature_btn.click(
            fn=show_feature_importance_ui,
            inputs=[],
            outputs=[feature_plot, feature_text]
        )
    # TAB 6: MARKET ANALYSIS
    with gr.Tab("๐Ÿ“‰ Market Analysis"):
        gr.Markdown("### Technical Analysis Dashboard")
        analyze_btn = gr.Button("๐Ÿ“Š Analyze Market", variant="primary", size="lg")
        analysis_plot = gr.Plot(label="Technical Indicators")
        analysis_summary = gr.Textbox(label="Market Summary", lines=12)
        analyze_btn.click(
            fn=analyze_market_ui,
            inputs=[],
            outputs=[analysis_plot, analysis_summary]
        )
    # TAB 7: ABOUT
    with gr.Tab("โ„น๏ธ About"):
        gr.Markdown("""
        ## About This Application
        ### Ensemble Model Architecture
        This application uses a sophisticated ensemble learning approach combining:
        1. **Random Forest** - Handles non-linear relationships and feature interactions
        2. **Gradient Boosting** - Sequential learning for complex patterns
        3. **AdaBoost** - Adaptive boosting for improved accuracy
        4. **Ridge Regression** - Linear model with L2 regularization
        5. **Lasso Regression** - Linear model with L1 regularization and feature selection
        6. **Elastic Net** - Combines L1 and L2 regularization
        ### Feature Engineering (100+ Features)
        - **Price Features**: Returns, log returns, price ranges, candlestick patterns
        - **Moving Averages**: SMA and EMA (5, 10, 20, 50, 100 periods)
        - **Momentum Indicators**: MACD, RSI, ROC, Stochastic Oscillator
        - **Volatility Indicators**: ATR, Bollinger Bands, rolling volatility
        - **Volume Indicators**: OBV, volume ratios, volume-price trends
        - **Statistical Features**: Skewness, kurtosis, quantiles
        - **Lag Features**: Historical prices and volumes (1-5 periods)
        - **Time Features**: Hour, day, month with cyclical encoding
        ### Data Source
        Real-time and historical data fetched from **OKX Exchange** via REST API:
        - Endpoint: `https://www.okx.com/api/v5/market/candles`
        - Instrument: BTC-USDT
        - Supported timeframes: 1m, 5m, 15m, 30m, 1H, 2H, 4H, 1D
        ### Model Training Process
        1. **Data Collection**: Fetch historical OHLCV data from OKX
        2. **Feature Engineering**: Generate 100+ technical indicators
        3. **Data Preprocessing**: Handle missing values, normalize features
        4. **Train/Val/Test Split**: Time-series aware splitting
        5. **Model Training**: Train 6 models independently
        6. **Weight Optimization**: Calculate optimal ensemble weights based on validation performance
        7. **Evaluation**: Test on unseen data with multiple metrics
        ### Performance Metrics
        - **MSE** (Mean Squared Error): Average squared prediction error
        - **RMSE** (Root Mean Squared Error): Square root of MSE, in price units
        - **MAE** (Mean Absolute Error): Average absolute prediction error
        - **Rยฒ** (R-squared): Proportion of variance explained
        - **MAPE** (Mean Absolute Percentage Error): Average percentage error
        ### Usage Instructions
        1. **Fetch Data**: Go to "Data Fetching" tab and load historical data
        2. **Train Model**: Navigate to "Model Training" and train the ensemble
        3. **Make Predictions**: Use "Future Predictions" to forecast prices
        4. **Monitor Live**: Check "Live Price" for real-time market data
        5. **Analyze**: Explore "Feature Importance" and "Market Analysis"
        ### Limitations & Disclaimer
        โš ๏ธ **Important**: This tool is for educational and research purposes only.
        - Cryptocurrency markets are highly volatile and unpredictable
        - Past performance does not guarantee future results
        - Model predictions should NOT be used as sole basis for trading decisions
        - Always conduct your own research and consult financial advisors
        - The authors are not responsible for any financial losses
        ### Technical Stack
        - **Python 3.10+**
        - **Gradio**: Web interface
        - **Scikit-learn**: Machine learning models
        - **Pandas & NumPy**: Data manipulation
        - **Plotly**: Interactive visualizations
        - **Requests**: API communication
        ### Version
        **v1.0.0** - Initial Release
        ---
        Made with โค๏ธ for the crypto community
        **GitHub**: [Your Repository Link]
        **Documentation**: [Your Docs Link]
        **Contact**: [Your Contact Info]
        """)
# ================================
# LAUNCH APP
# ================================
if __name__ == "__main__":
    # Start the web server only when this file is run as a script.
    launch_options = dict(
        server_name="0.0.0.0",  # listen on every network interface
        server_port=7860,
        share=False,
        show_error=True,
    )
    demo.launch(**launch_options)