| """
|
OHLC Data Background Worker - REAL DATA FROM MULTIPLE FREE APIs

CRITICAL RULES:
- MUST fetch REAL candlestick data from multiple sources with automatic fallback
- MUST store actual OHLC values, never fabricated data
- MUST use the actual timestamps from API responses
- NEVER generate or interpolate candles
- If the primary API fails, automatically try the alternative sources

SUPPORTED DATA SOURCES (in priority order):
1. CoinGecko (FREE, no API key, 365-day history; the API chooses the candle
   size from the requested day range, so not every interval is available)
2. Kraken (FREE, no API key, up to 720 candles per request)
3. Coinbase Exchange, formerly Coinbase Pro (FREE, no API key, up to 300
   candles per request)
4. Binance (FREE, but may be geo-restricted in some regions)
5. CoinPaprika (FREE, no API key, 366-day history) - not currently part of
   the fallback chain used by fetch_ohlc_with_fallback
|
| """
|
|
|
import asyncio
import logging
import os
import time
from datetime import datetime, timezone
from typing import List, Dict, Any, Optional

import httpx

from database.cache_queries import get_cache_queries
from database.db_manager import db_manager
from utils.logger import setup_logger
|
|
|
# Module-wide logger used by every provider fetcher and by the worker loop.
logger = setup_logger("ohlc_worker")

# Query helper bound to the shared database manager; save_ohlc_data_to_cache
# persists candles through it.
cache = get_cache_queries(db_manager)

# Optional mirroring of candles to HuggingFace Datasets. It is switched on only
# when a token is present in the environment AND the uploader module imports
# and initialises cleanly; any failure downgrades to "disabled" with a warning.
hf_uploader = None
HF_UPLOAD_ENABLED = bool(os.getenv("HF_TOKEN") or os.getenv("HF_API_TOKEN"))

if not HF_UPLOAD_ENABLED:
    logger.info("ℹ️ HuggingFace Dataset upload DISABLED (no HF_TOKEN)")
else:
    try:
        from hf_dataset_uploader import get_dataset_uploader

        hf_uploader = get_dataset_uploader()
        logger.info("✅ HuggingFace Dataset upload ENABLED for OHLC data")
    except Exception as e:
        logger.warning(f"HuggingFace Dataset upload disabled: {e}")
        HF_UPLOAD_ENABLED = False
        hf_uploader = None
|
|
|
|
|
# Base symbols the worker tracks; each one is fetched for every INTERVALS entry.
SYMBOLS = [
    "BTC", "ETH", "BNB", "XRP", "ADA", "SOL", "DOT", "DOGE", "MATIC", "AVAX",
    "LINK", "LTC", "UNI", "ALGO", "XLM", "ATOM", "TRX", "XMR", "ETC", "XTZ",
]

# Candle sizes requested from the providers.
INTERVALS = ["1h", "4h", "1d"]

# provider -> {base symbol -> provider-specific identifier}.
# A symbol missing from a provider's table makes that provider's fetcher
# return [] for it, so the fallback chain simply moves on to the next one.
SYMBOL_MAP = {
    # CoinGecko addresses coins by id rather than by ticker.
    "coingecko": {
        "BTC": "bitcoin",
        "ETH": "ethereum",
        "BNB": "binancecoin",
        "XRP": "ripple",
        "ADA": "cardano",
        "SOL": "solana",
        "DOT": "polkadot",
        "DOGE": "dogecoin",
        "MATIC": "matic-network",
        "AVAX": "avalanche-2",
        "LINK": "chainlink",
        "LTC": "litecoin",
        "UNI": "uniswap",
        "ALGO": "algorand",
        "XLM": "stellar",
        "ATOM": "cosmos",
        "TRX": "tron",
        "XMR": "monero",
        "ETC": "ethereum-classic",
        "XTZ": "tezos",
    },
    # Kraken pair names are irregular (some carry X/Z prefixes), so list them.
    "kraken": {
        "BTC": "XXBTZUSD",
        "ETH": "XETHZUSD",
        "XRP": "XXRPZUSD",
        "ADA": "ADAUSD",
        "SOL": "SOLUSD",
        "DOT": "DOTUSD",
        "DOGE": "XDGUSD",
        "LINK": "LINKUSD",
        "LTC": "XLTCZUSD",
        "UNI": "UNIUSD",
        "ALGO": "ALGOUSD",
        "XLM": "XXLMZUSD",
        "ATOM": "ATOMUSD",
        "TRX": "TRXUSD",
        "ETC": "XETCZUSD",
        "XTZ": "XTZUSD",
    },
    # Coinbase product ids follow the "<BASE>-USD" pattern.
    "coinbase": {
        base: f"{base}-USD"
        for base in (
            "BTC", "ETH", "XRP", "ADA", "SOL", "DOT", "DOGE", "LINK",
            "LTC", "UNI", "ALGO", "XLM", "ATOM", "MATIC", "AVAX",
        )
    },
    # Binance: every tracked symbol quoted against USDT.
    "binance": {base: f"{base}USDT" for base in SYMBOLS},
}
|
|
|
|
|
async def fetch_from_coingecko(symbol: str, interval: str, limit: int) -> List[Dict[str, Any]]:
    """
    Fetch OHLC data from CoinGecko (FREE, no API key required).

    CoinGecko's public ``/coins/{id}/ohlc`` endpoint does not accept a candle
    size. It derives the granularity from ``days`` (per CoinGecko's docs:
    1-2 days -> 30 minutes, 3-30 days -> 4 hours, 31+ days -> 4 days). Of the
    worker's intervals only '4h' can therefore be served. '1h' and '1d'
    return [] so the fallback chain moves on to a provider that offers them
    natively, instead of storing 4h/4d candles under a '1h'/'1d' label.
    As a safety net, the spacing of the returned candles is checked against
    the requested interval, and the whole batch is discarded on a mismatch.

    Args:
        symbol: Base symbol (e.g., 'BTC')
        interval: Interval ('1h', '4h', '1d'); only '4h' is served
        limit: Maximum number of candles to return (the most recent ones)

    Returns:
        List of OHLC candles, oldest first, or [] if CoinGecko cannot supply
        the interval or the request fails.
    """
    # Seconds per candle for each interval this provider can actually serve.
    supported_seconds = {"4h": 4 * 3600}

    if limit <= 0:
        return []

    try:
        coin_id = SYMBOL_MAP["coingecko"].get(symbol)
        if not coin_id:
            logger.debug(f"CoinGecko: No mapping for {symbol}")
            return []

        expected_step = supported_seconds.get(interval)
        if expected_step is None:
            logger.debug(f"CoinGecko: {interval} candles not available for {symbol}")
            return []

        # Six 4h candles per day. Round up to a documented ``days`` value that
        # stays inside the 3-30 day window in which CoinGecko returns 4h
        # candles (so at most ~30 days / 180 candles are available).
        days_needed = -(-limit // 6)  # ceiling division
        days = next((d for d in (7, 14, 30) if d >= days_needed), 30)

        url = f"https://api.coingecko.com/api/v3/coins/{coin_id}/ohlc"
        params = {"vs_currency": "usd", "days": days}

        logger.debug(f"Fetching from CoinGecko: {coin_id} ({symbol})")

        async with httpx.AsyncClient(timeout=15.0) as client:
            response = await client.get(url, params=params)
            response.raise_for_status()
            data = response.json()

        if not data or not isinstance(data, list):
            return []

        ohlc_data = []
        for candle in data:
            try:
                ohlc_data.append({
                    "symbol": symbol,
                    "interval": interval,
                    # Epoch milliseconds -> naive UTC datetime. The HF upload
                    # path appends "Z" to these timestamps, so they must be UTC
                    # rather than server-local time.
                    # NOTE(review): CoinGecko documents this timestamp as the
                    # candle's close time, while the exchange sources report
                    # open time; stored as-is per the "actual timestamps" rule.
                    "timestamp": datetime.fromtimestamp(
                        candle[0] / 1000, tz=timezone.utc
                    ).replace(tzinfo=None),
                    "open": float(candle[1]),
                    "high": float(candle[2]),
                    "low": float(candle[3]),
                    "close": float(candle[4]),
                    # The OHLC endpoint carries no volume field.
                    "volume": 0.0,
                    "provider": "coingecko"
                })
            except (KeyError, IndexError, TypeError, ValueError, OverflowError, OSError) as e:
                logger.debug(f"Error parsing CoinGecko candle: {e}")
                continue

        ohlc_data.sort(key=lambda c: c["timestamp"])

        # Reject the batch unless the typical (median) gap between candles
        # matches the requested interval, so candles are never mislabelled.
        steps = [
            (newer["timestamp"] - older["timestamp"]).total_seconds()
            for older, newer in zip(ohlc_data, ohlc_data[1:])
        ]
        if steps and sorted(steps)[len(steps) // 2] != expected_step:
            logger.debug(
                f"CoinGecko: candle spacing does not match {interval} for {symbol}, discarding"
            )
            return []

        ohlc_data = ohlc_data[-limit:]

        logger.info(f"✅ CoinGecko: Fetched {len(ohlc_data)} candles for {symbol}")
        return ohlc_data

    except httpx.HTTPStatusError as e:
        logger.debug(f"CoinGecko HTTP error for {symbol}: {e.response.status_code}")
        return []
    except Exception as e:
        logger.debug(f"CoinGecko error for {symbol}: {e}")
        return []
|
|
|
|
|
async def fetch_from_kraken(symbol: str, interval: str, limit: int) -> List[Dict[str, Any]]:
    """
    Fetch OHLC data from Kraken (FREE, no API key required).

    Kraken returns up to 720 candles in ascending time order, so the most
    recent ``limit`` candles are taken from the END of the list.

    Args:
        symbol: Base symbol (e.g., 'BTC')
        interval: Interval ('1h', '4h', '1d')
        limit: Number of most recent candles to return

    Returns:
        List of OHLC candles (oldest first), or [] on any failure.
    """
    # A non-positive limit would turn the tail slice below into the full list.
    if limit <= 0:
        return []

    try:
        pair = SYMBOL_MAP["kraken"].get(symbol)
        if not pair:
            logger.debug(f"Kraken: No mapping for {symbol}")
            return []

        # Kraken expresses the interval in minutes.
        interval_map = {"1h": "60", "4h": "240", "1d": "1440"}
        kraken_interval = interval_map.get(interval)
        if not kraken_interval:
            return []

        url = "https://api.kraken.com/0/public/OHLC"
        params = {"pair": pair, "interval": kraken_interval}

        logger.debug(f"Fetching from Kraken: {pair} ({symbol})")

        async with httpx.AsyncClient(timeout=15.0) as client:
            response = await client.get(url, params=params)
            response.raise_for_status()
            data = response.json()

        # Kraken reports failures in a non-empty "error" list even with HTTP 200.
        if data.get("error"):
            logger.debug(f"Kraken error for {symbol}: {data['error']}")
            return []

        result = data.get("result") or {}
        candles = result.get(pair)
        if candles is None:
            # Kraken may key the result by its own canonical pair name rather
            # than the one requested; take the first candle list returned
            # (the result also holds a non-list "last" polling cursor).
            candles = next(
                (
                    value
                    for key, value in result.items()
                    if key != "last" and isinstance(value, list)
                ),
                [],
            )

        if not candles:
            return []

        ohlc_data = []
        for candle in candles[-limit:]:
            try:
                # Row layout: [time, open, high, low, close, vwap, volume, count].
                ohlc_data.append({
                    "symbol": symbol,
                    "interval": interval,
                    # Epoch seconds -> naive UTC (uploads append "Z").
                    "timestamp": datetime.fromtimestamp(
                        int(candle[0]), tz=timezone.utc
                    ).replace(tzinfo=None),
                    "open": float(candle[1]),
                    "high": float(candle[2]),
                    "low": float(candle[3]),
                    "close": float(candle[4]),
                    "volume": float(candle[6]),
                    "provider": "kraken"
                })
            except (KeyError, IndexError, TypeError, ValueError, OverflowError, OSError) as e:
                logger.debug(f"Error parsing Kraken candle: {e}")
                continue

        logger.info(f"✅ Kraken: Fetched {len(ohlc_data)} candles for {symbol}")
        return ohlc_data

    except Exception as e:
        logger.debug(f"Kraken error for {symbol}: {e}")
        return []
|
|
|
|
|
async def fetch_from_coinbase(symbol: str, interval: str, limit: int) -> List[Dict[str, Any]]:
    """
    Fetch OHLC data from Coinbase Exchange (formerly Coinbase Pro; FREE, no API key).

    The candles endpoint accepts only fixed granularities (in seconds: 60,
    300, 900, 3600, 21600, 86400), and none of them is 4 hours. '4h'
    therefore returns [] rather than storing 6-hour (21600 s) candles under
    a '4h' label, and the fallback chain moves on to the next provider.

    Args:
        symbol: Base symbol (e.g., 'BTC')
        interval: Interval ('1h' or '1d'; '4h' is not available)
        limit: Number of candles (Coinbase returns at most 300 per request)

    Returns:
        List of OHLC candles in the order Coinbase returns them, or [] on
        any failure.
    """
    # A negative limit would otherwise drop candles from the end of the list.
    if limit <= 0:
        return []

    try:
        pair = SYMBOL_MAP["coinbase"].get(symbol)
        if not pair:
            logger.debug(f"Coinbase: No mapping for {symbol}")
            return []

        # Granularity in seconds. Only exact matches are listed; there is no
        # 14400 s (4h) option on this endpoint.
        interval_map = {"1h": "3600", "1d": "86400"}
        granularity = interval_map.get(interval)
        if not granularity:
            logger.debug(f"Coinbase: {interval} granularity not supported for {symbol}")
            return []

        url = f"https://api.exchange.coinbase.com/products/{pair}/candles"
        params = {"granularity": granularity}

        logger.debug(f"Fetching from Coinbase: {pair} ({symbol})")

        async with httpx.AsyncClient(timeout=15.0) as client:
            response = await client.get(url, params=params)
            response.raise_for_status()
            data = response.json()

        if not data or not isinstance(data, list):
            return []

        ohlc_data = []
        for candle in data[:limit]:
            try:
                # Row layout: [time, low, high, open, close, volume].
                ohlc_data.append({
                    "symbol": symbol,
                    "interval": interval,
                    # Epoch seconds -> naive UTC (uploads append "Z").
                    "timestamp": datetime.fromtimestamp(
                        int(candle[0]), tz=timezone.utc
                    ).replace(tzinfo=None),
                    "open": float(candle[3]),
                    "high": float(candle[2]),
                    "low": float(candle[1]),
                    "close": float(candle[4]),
                    "volume": float(candle[5]),
                    "provider": "coinbase"
                })
            except (KeyError, IndexError, TypeError, ValueError, OverflowError, OSError) as e:
                logger.debug(f"Error parsing Coinbase candle: {e}")
                continue

        logger.info(f"✅ Coinbase: Fetched {len(ohlc_data)} candles for {symbol}")
        return ohlc_data

    except Exception as e:
        logger.debug(f"Coinbase error for {symbol}: {e}")
        return []
|
|
|
|
|
async def fetch_from_binance(symbol: str, interval: str, limit: int) -> List[Dict[str, Any]]:
    """
    Fetch OHLC data from Binance (FREE, may be geo-restricted).

    Args:
        symbol: Base symbol (e.g., 'BTC')
        interval: Interval ('1h', '4h', '1d'), passed through as the kline interval
        limit: Number of candles; non-positive values return [] and values
            above Binance's documented maximum of 1000 are clamped

    Returns:
        List of OHLC candles, or [] on any failure (HTTP 451 is logged as a
        geo-restriction).
    """
    if limit <= 0:
        return []

    try:
        pair = SYMBOL_MAP["binance"].get(symbol)
        if not pair:
            logger.debug(f"Binance: No mapping for {symbol}")
            return []

        url = "https://api.binance.com/api/v3/klines"
        params = {"symbol": pair, "interval": interval, "limit": min(limit, 1000)}

        logger.debug(f"Fetching from Binance: {pair} ({symbol})")

        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(url, params=params)
            response.raise_for_status()
            data = response.json()

        if not data or not isinstance(data, list):
            return []

        ohlc_data = []
        for candle in data:
            try:
                # Kline layout starts: [open_time_ms, open, high, low, close, volume, ...].
                ohlc_data.append({
                    "symbol": symbol,
                    "interval": interval,
                    # Epoch milliseconds -> naive UTC (uploads append "Z").
                    "timestamp": datetime.fromtimestamp(
                        int(candle[0]) / 1000, tz=timezone.utc
                    ).replace(tzinfo=None),
                    "open": float(candle[1]),
                    "high": float(candle[2]),
                    "low": float(candle[3]),
                    "close": float(candle[4]),
                    "volume": float(candle[5]),
                    "provider": "binance"
                })
            except (KeyError, IndexError, TypeError, ValueError, OverflowError, OSError) as e:
                logger.debug(f"Error parsing Binance candle: {e}")
                continue

        logger.info(f"✅ Binance: Fetched {len(ohlc_data)} candles for {symbol}")
        return ohlc_data

    except httpx.HTTPStatusError as e:
        if e.response.status_code == 451:
            logger.debug(f"Binance geo-restricted for {symbol}")
        else:
            logger.debug(f"Binance HTTP error for {symbol}: {e.response.status_code}")
        return []
    except Exception as e:
        logger.debug(f"Binance error for {symbol}: {e}")
        return []
|
|
|
|
|
async def fetch_ohlc_with_fallback(symbol: str, interval: str, limit: int = 100) -> List[Dict[str, Any]]:
    """
    Query OHLC providers in priority order and return the first non-empty result.

    Priority order:
    1. CoinGecko (most reliable, no auth, no geo-restrictions)
    2. Kraken (reliable, no auth)
    3. Coinbase (reliable, no auth)
    4. Binance (may be geo-restricted)

    An exception or empty result from one provider just moves on to the next.
    Nothing is synthesised when they all fail.

    Args:
        symbol: Base symbol (e.g., 'BTC')
        interval: Interval ('1h', '4h', '1d')
        limit: Number of candles to fetch

    Returns:
        Candles from the first provider that returned any, else [].
    """
    providers = (
        ("CoinGecko", fetch_from_coingecko),
        ("Kraken", fetch_from_kraken),
        ("Coinbase", fetch_from_coinbase),
        ("Binance", fetch_from_binance),
    )

    for provider_name, fetcher in providers:
        try:
            candles = await fetcher(symbol, interval, limit)
            if not candles:
                continue
            logger.debug(f"✅ Successfully fetched {len(candles)} candles from {provider_name} for {symbol}")
            return candles
        except Exception as exc:
            logger.debug(f"❌ {provider_name} failed for {symbol}: {exc}")

    logger.warning(f"⚠️ All sources failed for {symbol} {interval}")
    return []
|
|
|
|
|
async def save_ohlc_data_to_cache(ohlc_data: List[Dict[str, Any]]) -> int:
    """
    Persist fetched candles to the local cache, then optionally mirror them to HuggingFace.

    Data flow:
    1. Each candle is written through ``cache.save_ohlc_candle``; a failing
       candle is logged and skipped.
    2. When HF upload is enabled and an uploader exists, the whole batch
       (with datetime timestamps rendered as ISO strings plus "Z") is
       appended to the HuggingFace dataset. Upload errors are logged only.

    Args:
        ohlc_data: Candle dicts as produced by the fetch_from_* functions

    Returns:
        int: Number of candles the cache reported as saved
    """
    saved_count = 0

    # NOTE(review): save_ohlc_candle is a synchronous call made from this
    # coroutine, so it runs on the event loop thread.
    for record in ohlc_data:
        try:
            if cache.save_ohlc_candle(
                symbol=record["symbol"],
                interval=record["interval"],
                timestamp=record["timestamp"],
                open_price=record["open"],
                high=record["high"],
                low=record["low"],
                close=record["close"],
                volume=record["volume"],
                provider=record["provider"]
            ):
                saved_count += 1
        except Exception as e:
            logger.error(f"Error saving OHLC data for {record.get('symbol')}: {e}")

    if not (HF_UPLOAD_ENABLED and hf_uploader and ohlc_data):
        return saved_count

    try:
        upload_data = []
        for record in ohlc_data:
            row = record.copy()
            stamp = row.get("timestamp")
            if isinstance(stamp, datetime):
                row["timestamp"] = stamp.isoformat() + "Z"
            upload_data.append(row)

        logger.info(f"📤 Uploading {len(upload_data)} OHLC records to HuggingFace Datasets...")
        uploaded = await hf_uploader.upload_ohlc_data(upload_data, append=True)

        if uploaded:
            logger.info(f"✅ Successfully uploaded OHLC data to HuggingFace Datasets")
        else:
            logger.warning(f"⚠️ Failed to upload OHLC data to HuggingFace Datasets")
    except Exception as e:
        logger.error(f"Error uploading OHLC to HuggingFace Datasets: {e}")

    return saved_count
|
|
|
|
|
async def fetch_and_cache_ohlc_for_symbol(symbol: str, interval: str, limit: int = 100) -> int:
    """
    Fetch candles for one symbol/interval via the provider fallback chain and cache them.

    Args:
        symbol: Base symbol (e.g., 'BTC')
        interval: Candle interval ('1h', '4h', '1d')
        limit: Number of candles to request. Defaults to 100, the value
            previously hard-coded for every interval.

    Returns:
        int: Number of candles saved; 0 when nothing was fetched or on error
        (errors are logged, never raised).
    """
    try:
        ohlc_data = await fetch_ohlc_with_fallback(symbol, interval, limit)

        if not ohlc_data:
            logger.debug(f"No OHLC data received for {symbol} {interval}")
            return 0

        saved_count = await save_ohlc_data_to_cache(ohlc_data)

        if saved_count > 0:
            logger.debug(f"Saved {saved_count}/{len(ohlc_data)} candles for {symbol} {interval}")
        return saved_count

    except Exception as e:
        logger.error(f"Error fetching OHLC for {symbol} {interval}: {e}")
        return 0
|
|
|
|
|
async def ohlc_data_worker_loop():
    """
    Run forever, refreshing cached candles for every SYMBOLS x INTERVALS pair.

    Each pass processes the pairs one at a time with
    ``fetch_and_cache_ohlc_for_symbol`` (which walks the provider fallback
    chain). It pauses 0.5 s after each pair that completes without raising,
    logs a summary, then sleeps 300 s before the next pass. Errors are logged
    and never end the loop: a failing pair is skipped, and a failing pass is
    retried after the same 300 s pause. No candles are ever synthesised.
    """
    logger.info("Starting OHLC data background worker with multi-source fallback")
    logger.info("📊 Data sources: CoinGecko, Kraken, Coinbase, Binance")

    pass_interval_seconds = 300
    iteration = 0

    while True:
        try:
            iteration += 1
            started_at = time.time()

            logger.info(f"[Iteration {iteration}] Fetching REAL OHLC data from multiple sources...")

            work_items = [(sym, ivl) for sym in SYMBOLS for ivl in INTERVALS]
            total_saved = 0
            successful_fetches = 0

            for sym, ivl in work_items:
                try:
                    saved = await fetch_and_cache_ohlc_for_symbol(sym, ivl)
                    total_saved += saved
                    successful_fetches += saved > 0
                    # Small gap between requests to go easy on the free APIs.
                    await asyncio.sleep(0.5)
                except Exception as e:
                    logger.error(f"Error processing {sym} {ivl}: {e}")

            elapsed = time.time() - started_at
            logger.info(
                f"[Iteration {iteration}] Successfully saved {total_saved} REAL OHLC candles "
                f"({successful_fetches}/{len(work_items)} symbol-intervals) in {elapsed:.2f}s"
            )

            await asyncio.sleep(pass_interval_seconds)

        except Exception as e:
            logger.error(f"[Iteration {iteration}] Worker error: {e}", exc_info=True)
            await asyncio.sleep(pass_interval_seconds)
|
|
|
|
|
# Strong reference to the background worker task. The event loop keeps only
# weak references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
_ohlc_worker_task: Optional[asyncio.Task] = None


async def start_ohlc_data_worker() -> Optional[asyncio.Task]:
    """
    Start the OHLC background worker (call once during application startup).

    Runs a warm-up fetch for the first five SYMBOLS across all INTERVALS, then
    schedules ``ohlc_data_worker_loop`` as a background task. The task is kept
    in a module-level reference so it cannot be garbage-collected mid-run. A
    repeated call while the worker is still running reuses that task instead
    of starting a second loop.

    Returns:
        The running worker task, or None if startup failed (the error is logged).
    """
    global _ohlc_worker_task

    if _ohlc_worker_task is not None and not _ohlc_worker_task.done():
        logger.info("OHLC data worker already running")
        return _ohlc_worker_task

    try:
        logger.info("Initializing OHLC data worker with multi-source fallback...")
        logger.info("📊 Supported sources: CoinGecko, Kraken, Coinbase, Binance")

        # Warm the cache for a handful of symbols before the loop takes over.
        logger.info("Running initial OHLC data fetch...")
        total_saved = 0

        for symbol in SYMBOLS[:5]:
            for interval in INTERVALS:
                saved = await fetch_and_cache_ohlc_for_symbol(symbol, interval)
                total_saved += saved
                await asyncio.sleep(0.5)

        logger.info(f"Initial fetch: Saved {total_saved} REAL OHLC candles")

        _ohlc_worker_task = asyncio.create_task(ohlc_data_worker_loop())
        logger.info("OHLC data worker started successfully")
        return _ohlc_worker_task

    except Exception as e:
        logger.error(f"Failed to start OHLC data worker: {e}", exc_info=True)
        return None
|
|
|
|
|
|
|
if __name__ == "__main__":
    import sys

    # NOTE(review): this runs only after the module-level imports above have
    # already executed, so it cannot help resolve them; it affects later
    # imports only.
    sys.path.append("/workspace")

    async def test():
        """Manual smoke test: fetch a few 1h candles for BTC and ETH, print them and cache them."""
        logger.info("Testing OHLC data worker with multi-source fallback...")

        separator = "=" * 60
        interval = "1h"

        for symbol in ["BTC", "ETH"]:
            logger.info(f"\n{separator}")
            logger.info(f"Testing {symbol}")
            logger.info(separator)

            data = await fetch_ohlc_with_fallback(symbol, interval, limit=10)
            logger.info(f"Fetched {len(data)} candles for {symbol} {interval}")

            if not data:
                logger.warning(f"No data retrieved for {symbol}")
                continue

            logger.info(f"Provider: {data[0].get('provider')}")
            for candle in data[:3]:
                logger.info(
                    f" {candle['timestamp']}: O={candle['open']:.2f} "
                    f"H={candle['high']:.2f} L={candle['low']:.2f} C={candle['close']:.2f}"
                )

            saved = await save_ohlc_data_to_cache(data)
            logger.info(f"Saved {saved} candles to database")

    asyncio.run(test())
|
|
|